Fix: consumerd: live client receives incomplete metadata
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Thu, 14 May 2020 18:24:17 +0000 (14:24 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Tue, 26 May 2020 20:25:31 +0000 (16:25 -0400)
Observed issue
==============

Babeltrace 1.5.x and Babeltrace 2.x can both report errors (albeit
differently) when using the "lttng-live" protocol that imply that the
metadata they received is incomplete.

For instance, babeltrace 1.5.3 reports the following error:

```
[error] Error creating AST
[error] [Context] Cannot open_mmap_trace of format ctf.
[error] Error adding trace
[warning] [Context] Cannot open_trace of format lttng-live at path net://localhost:xxxx/host/session/live_session.
[warning] [Context] cannot open trace "net://localhost:xxxx/host/session/live_session" for reading.
[error] opening trace "net://localhost:xxxx/host/session/live_session" for reading.
[error] none of the specified trace paths could be opened.
```

While debugging both viewers, I noticed that both were attempting to
receive the available metadata before consuming the "data" streams'
content.

Typically, the following exchange between the relay daemon and the
lttng-live client occurs when the problem is observed:

bt lttng-live:
    emits LTTNG_VIEWER_GET_METADATA command
relayd:
    returns LTTNG_VIEWER_METADATA_OK, len = 4096 (default packet size)
bt lttng-live:
    consume 4096 bytes of metadata
    emits LTTNG_VIEWER_GET_METADATA command
relayd:
    returns LTTNG_VIEWER_NO_NEW_METADATA

When the lttng-live client receives the LTTNG_VIEWER_NO_NEW_METADATA
status code, it attempts to parse all the metadata it has received
since the last LTTNG_VIEWER_NO_NEW_METADATA reply. In effect, it is
expected that this forms a logical unit of metadata that is parseable
on its own.

If this is the first time metadata is received for that trace, the
metadata is expected to contain a trace declaration, packet header
declaration, etc.

If metadata was already received, it is expected that the newly parsed
declarations can be "appended" to the existing trace schema.

It appears that the relay daemon sends the
LTTNG_VIEWER_NO_NEW_METADATA while the metadata it has sent up to that
point is not parseable on its own.

The live protocol description does not require or imply that a viewer
should attempt to parse metadata packets until it hopefully succeeds
at some point. Anyhow:
  1) This would make it impossible for a live viewer to correctly
  handle a corrupted metadata stream beyond retrying forever,
  2) This behaviour is not implemented by the two reference
  implementations of the protocol.

Cause
=====

The relay daemon provides a guarantee that it will send any available
metadata before allowing a data stream packet to be served to the
client.

In other words, a client requesting a data packet will receive the
LTTNG_VIEWER_FLAG_NEW_METADATA status code (and no data) if it
attempts to get a data stream packet while the relay daemon has
metadata already available.

This guarantee is properly enforced as far as I can tell. However,
looking at the consumer daemon implementation, it appears that
metadata packets are sent as soon as they are available.

A metadata packet is not guaranteed to be parseable on its own. For
instance, it can end in the middle the an event declaration.

Hence, this hints at a race involving the tracer, the consumer daemon,
the relay daemon, and the lttng-live client.

Consider the following scenario:
  - Metadata packets (sub-buffers) are configured to be 4kB in size,
  - a large number of kernel events are enabled (e.g. --kernel --all),
  - the network connection between the consumer and relay daemons is
    slow

1) The kernel tracer will produce enough TSDL metadata to fill the
   first sub-buffer of the "metadata" ring-buffer and signal the
   consumer daemon that a buffer is ready. The tracer then starts
   writing the remaining data in the following available sub-buffers.

2) The consumer daemon metadata thread is woken up and consumes the
   first metadata sub-buffer and sends it to the relay daemon.

3) A live client establishes an lttng-live connection to the relay
   daemon and attempts to consume the available metadata. It receives
   the first packet and, since the relay daemon doesn't know about any
   follow-up metadata, receives LTTNG_VIEWER_NO_NEW_METADATA on the
   next attempt.

4) Having received LTTNG_VIEWER_NO_NEW_METADATA, the lttng-live client
   attempts to parse the metadata it has received and fails.

This scenario is easy to reproduce by inserting a "sleep(1)" at
src/bin/lttng-relayd/main.c:1978 (as of this revision). This simulates
a relay daemon that would be slow to receive/process metadata packets
from the consumer daemon.

This problem similarly applies to the user space tracer.

Solution
========

Having no knowledge of TSDL, the relay daemon can't "bundle" packets
of metadata until they form a "parseable unit" to send to the consumer
daemon.

To provide the parseability guarantee expected by the viewers, and by
the relay daemon implicitly, we need to ensure that the consumer
daemons only send "parseable units" of metadata to the relay daemon.

Unfortunately, the consumer daemons do not know how to parse TSDL
either. In fact, only the metadata producers are able to provide the
boundaries of the "parseable units" of metadata.

The general idea of the fix is to accumulate metadata up to a point
where a "parseable unit" boundary has been identified and send that
content in one request to the relay daemon. Note that the solution
described here only concerns the live mode. In other cases, the
mechanisms described are simply bypassed.

A "metadata bucket" is added to lttng_consumer_stream when it is
created from a live channel. This bucket is filled until the
consumption position reaches the "parseable unit" end position.

A refresher about the handling of metadata in live mode
-------------------------------------------------------

Three "events" are of interest here and can cause metadata to be
consumed more or less indirectly:
  1) A metadata packet is closed, causing the metadata thread to wake
     up
  2) The live timer expires
  3) A data sub-buffer is closed, causing the data thread to wake-up

1) The first case is simple and happens regardless of whether or not
   the tracing session is in live mode or not. Metadata is always
   consumed by the metadata thread in the same way. However, this
   scenario can be "caused" by (2) and (3). See [1]. A sub-buffer is
   "acquired" from the metadata ring-buffer and sent to the relayd
   daemon as the payload of a "RELAYD_SEND_METADATA" command.

2) When the live timer expires [2], the 'check_stream' function is
   called on all data streams of the session. As its name clearly
   implies, this function is responsible for flushing all streams or
   sending a "live beacon" (called an "empty index" in the code) if
   there is no data to flush. Any flushed data will result in (3).

3) When a data sub-buffer is ready to be consumed, [1] is invoked
   by the data thread. This function acquires a sub-buffer and sends
   it to the relay daemon through the data connection.

   Then, an important synchronization step takes place. The index of
   the newly-sent packet will be sent through the control
   connection. The relay daemon waits for both the data packet and its
   matching index before making the new packet visible to live
   viewers.

   Since a data packet could contain data that requires "newer"
   metadata to be decoded, the data thread flushes the metadata stream
   and enters a "waiting" phase to pause until all metadata present in
   the metadata ring buffer has been consumed [3].

   At the end of this waiting phase, the data thread sends the data
   packet's index to the relay daemon, allowing the relayd to make it
   visible to its live clients.

How to identify a "parseable unit" boundary?
--------------------------------------------

In the case of the kernel domain, the kernel tracer produces the
actual TSDL descriptions directly. The TSDL metadata is serialized to
a metadata cache and is flushed "just in time" to the metadata
ring-buffer when a "get next" operation is performed.

There is no way, from user space, to query whether or not the metadata
cache of the kernel tracer is empty. Hence, a new
RING_RING_BUFFER_GET_NEXT_SUBBUF_METADATA_CHECK command was added to
query whether or not the kernel tracer's metadata cache is empty when
acquiring a sub-buffer.

This allows the consumer daemon to identify a "coherent" position in
the metadata stream that is safe to use as a "parseable unit"
boundary.

As for the user space domain, since the session daemon is responsible
for generating the TSDL representation of the metadata, there is no
need to change LTTng-ust APIs.

The session daemon generates coherent units of metadata and adds them
to its "registry" at once (protected by the registry's lock). It then
flushes the contents to the consumer daemon and waits for that data to
be consumed before proceeding further.

On the consumer daemon side, the metadata cache is filled with the
newly-produced contents. This is done atomically with respect to
accesses to the metadata cache as all accesses happen through a
dedicated metadata cache lock.

When the consumer's metadata polling thread is woken-up, it will
attempt to acquire (`get_next`) a sub-buffer from the metadata stream
ring-buffer. If it fails, it will flush a sub-buffer's worth of
metadata to the ring-buffer and attempt to acquire a sub-buffer again.

At this point, it is possible to determine if that sub-buffer is the
last one of a parseable metadata unit: the cache must be empty and the
ring-buffer must be empty following the consumption of this
sub-buffer. When those conditions are met, the resulting metadata
`stream_subbuffer` is tagged as being `coherent`.

Metadata bucket
---------------

A helper interface, metadata_bucket, is introduced as part of this
fix. A metadata_bucket is `fill`ed with `stream_subbuffer`s, and is
eventually `flushed` when it is filled by a `coherent` sub-buffer.

As older versions of LTTng-modules must remain supported, this new
helper is not used when the
RING_RING_BUFFER_GET_NEXT_SUBBUF_METADATA_CHECK operation is not
available. When the operation is available, the metadata stream's
bucketization is enabled, causing a bucket to be created and the
`consume` callback to be swapped.

The `consume` callback of the metadata streams is replaced by a new
implementation when the metadata bucketization is activated on the
stream. This implementation returns the padded size of the consumed
sub-buffer when they could be added to the bucket. When the bucket is
flushed, the regular `mmap`-based consumption function is called with
the bucket's contents.

Known drawbacks
===============

This implementation causes the consumer daemon to buffer the whole
initial unit of metadata before sending it. In practice, this is not
expected to be a problem since the largest metadata files we have seen
in real use are a couple of megabytes wide.

Beyond the (temporary) memory use, this causes the metadata thread to
block while this potentially large chunk of metadata is sent (rather
than blocking while sending 4kb at a time).

The second point is just a consequence of existing shortcomings of the
consumerd; slow IO should not affect other unrelated streams. The
fundamental problem is that blocking IO is used and we should switch
to non-blocking communication if this is a problem (as is done in the
relay daemon).

The first point is more problematic given the existing tracer APIs.
If the tracer could provide the boundary of a "parseable unit" of
metadata, we could send the header of the RELAYD_SEND_METADATA command
with that size and send the various metadata packets as they are made
available. This would make no difference to the relay daemon as it is
not blocking on that socket and will not make the metadata size change
visible to the "live server" until it has all been received.

This size can't be determined right now since it could exceed the
total size of the "metadata" ring buffer. In other words, we can't wait
for the production of metadata to complete before starting to consume.

Finally, while implementing this fix, I also realized that the
computation of the rotation position of the metadata streams is
erroneous. The rotation code makes use of the ring-buffer's positions
to determine the rotation position. However, since both user space and
kernel domains make use of a "cache" behind the ring-buffer, that
cached content must be taken into account when computing the metadata
stream's rotation position.

References
==========

[1] https://github.com/lttng/lttng-tools/blob/d5ccf8fe0/src/common/consumer/consumer.c#L3433
[2] https://github.com/lttng/lttng-tools/blob/d5ccf8fe0/src/common/consumer/consumer-timer.c#L312
[3] https://github.com/lttng/lttng-tools/blob/d5ccf8fe0/src/common/consumer/consumer-stream.c#L492

Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
Change-Id: I40ee07e5c344c72d9aae2b9b15dc36c00b21e5fa

src/common/consumer/Makefile.am
src/common/consumer/consumer-stream.c
src/common/consumer/consumer-stream.h
src/common/consumer/consumer.c
src/common/consumer/consumer.h
src/common/consumer/metadata-bucket.c [new file with mode: 0644]
src/common/consumer/metadata-bucket.h [new file with mode: 0644]
src/common/kernel-consumer/kernel-consumer.c
src/common/ust-consumer/ust-consumer.c

index 296301e6f360fa122dad866cb7943f5773c6f8fe..55e47b2101dcae6ac7ef0c4eab260689c601ba98 100644 (file)
@@ -6,7 +6,8 @@ noinst_HEADERS = consumer-metadata-cache.h consumer-timer.h \
                 consumer-testpoint.h
 
 libconsumer_la_SOURCES = consumer.c consumer.h consumer-metadata-cache.c \
-                         consumer-timer.c consumer-stream.c consumer-stream.h
+                         consumer-timer.c consumer-stream.c consumer-stream.h \
+                         metadata-bucket.c metadata-bucket.h
 
 libconsumer_la_LIBADD = \
                $(top_builddir)/src/common/sessiond-comm/libsessiond-comm.la \
index 5dc380e5e32aa9a23c6d75c878bb6d2a3e10086d..deebb58fe41b321cb5f41a76e4ee771c1b5ced4d 100644 (file)
@@ -21,6 +21,7 @@
 #include <common/utils.h>
 #include <common/consumer/consumer.h>
 #include <common/consumer/consumer-timer.h>
+#include <common/consumer/metadata-bucket.h>
 
 #include "consumer-stream.h"
 
@@ -153,7 +154,7 @@ static ssize_t consumer_stream_consume_mmap(
                        subbuffer->info.data.subbuf_size;
 
        return lttng_consumer_on_read_subbuffer_mmap(
-                       ctx, stream, &subbuffer->buffer.buffer, padding_size);
+                       stream, &subbuffer->buffer.buffer, padding_size);
 }
 
 static ssize_t consumer_stream_consume_splice(
@@ -395,6 +396,10 @@ int metadata_stream_check_version(struct lttng_consumer_stream *stream,
        stream->metadata_version = subbuffer->info.metadata.version;
        stream->reset_metadata_flag = 1;
 
+       if (stream->metadata_bucket) {
+               metadata_bucket_reset(stream->metadata_bucket);
+       }
+
        if (stream->read_subbuffer_ops.reset_metadata) {
                stream->read_subbuffer_ops.reset_metadata(stream);
        }
@@ -726,6 +731,7 @@ void consumer_stream_free(struct lttng_consumer_stream *stream)
 {
        assert(stream);
 
+       metadata_bucket_destroy(stream->metadata_bucket);
        call_rcu(&stream->node.head, free_stream_rcu);
 }
 
@@ -991,3 +997,59 @@ bool consumer_stream_is_deleted(struct lttng_consumer_stream *stream)
        assert(stream);
        return cds_lfht_is_node_deleted(&stream->node.node);
 }
+
+static ssize_t metadata_bucket_flush(
+               const struct stream_subbuffer *buffer, void *data)
+{
+       ssize_t ret;
+       struct lttng_consumer_stream *stream = data;
+
+       ret = consumer_stream_consume_mmap(NULL, stream, buffer);
+       if (ret < 0) {
+               goto end;
+       }
+end:
+       return ret;
+}
+
+static ssize_t metadata_bucket_consume(
+               struct lttng_consumer_local_data *unused,
+               struct lttng_consumer_stream *stream,
+               const struct stream_subbuffer *subbuffer)
+{
+       ssize_t ret;
+       enum metadata_bucket_status status;
+
+       status = metadata_bucket_fill(stream->metadata_bucket, subbuffer);
+       switch (status) {
+       case METADATA_BUCKET_STATUS_OK:
+               /* Return consumed size. */
+               ret = subbuffer->buffer.buffer.size;
+               break;
+       default:
+               ret = -1;
+       }
+
+       return ret;
+}
+
+int consumer_stream_enable_metadata_bucketization(
+               struct lttng_consumer_stream *stream)
+{
+       int ret = 0;
+
+       assert(stream->metadata_flag);
+       assert(!stream->metadata_bucket);
+       assert(stream->chan->output == CONSUMER_CHANNEL_MMAP);
+
+       stream->metadata_bucket = metadata_bucket_create(
+                       metadata_bucket_flush, stream);
+       if (!stream->metadata_bucket) {
+               ret = -1;
+               goto end;
+       }
+
+       stream->read_subbuffer_ops.consume_subbuffer = metadata_bucket_consume;
+end:
+       return ret;
+}
index eb00dac78849d62df66911b8423f4687ea4a4f5d..cb1dafe3951a4a004cfc2b03864f11471de7d130 100644 (file)
@@ -111,4 +111,13 @@ int consumer_stream_rotate_output_files(struct lttng_consumer_stream *stream);
  */
 bool consumer_stream_is_deleted(struct lttng_consumer_stream *stream);
 
+/*
+ * Enable metadata bucketization. This must only be enabled if the tracer
+ * provides a reliable metadata `coherent` flag.
+ *
+ * This must be called on initialization before any subbuffer is consumed.
+ */
+int consumer_stream_enable_metadata_bucketization(
+               struct lttng_consumer_stream *stream);
+
 #endif /* LTTNG_CONSUMER_STREAM_H */
index 5c211339d42bd2017a8888b590ff5b3941cbbadf..52b15867b968afdcd16848235093d37eb3bd5af8 100644 (file)
@@ -1583,7 +1583,6 @@ end:
  * Returns the number of bytes written
  */
 ssize_t lttng_consumer_on_read_subbuffer_mmap(
-               struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream,
                const struct lttng_buffer_view *buffer,
                unsigned long padding)
index aa8a401a0416efe089057f2e07b6187c38bf78d5..4770671c003a35f40d67ce4d85f199587323e93f 100644 (file)
@@ -265,6 +265,14 @@ struct stream_subbuffer {
                        unsigned long subbuf_size;
                        unsigned long padded_subbuf_size;
                        uint64_t version;
+                       /*
+                        * Left unset when unsupported.
+                        *
+                        * Indicates that this is the last sub-buffer of
+                        * a series of sub-buffer that makes-up a coherent
+                        * (parseable) unit of metadata.
+                        */
+                       LTTNG_OPTIONAL(bool) coherent;
                } metadata;
                struct {
                        unsigned long subbuf_size;
@@ -623,6 +631,7 @@ struct lttng_consumer_stream {
                on_sleep_cb on_sleep;
                unlock_cb unlock;
        } read_subbuffer_ops;
+       struct metadata_bucket *metadata_bucket;
 };
 
 /*
@@ -954,7 +963,6 @@ struct lttng_consumer_local_data *lttng_consumer_create(
                int (*update_stream)(uint64_t sessiond_key, uint32_t state));
 void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx);
 ssize_t lttng_consumer_on_read_subbuffer_mmap(
-               struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream,
                const struct lttng_buffer_view *buffer,
                unsigned long padding);
diff --git a/src/common/consumer/metadata-bucket.c b/src/common/consumer/metadata-bucket.c
new file mode 100644 (file)
index 0000000..1ee5022
--- /dev/null
@@ -0,0 +1,150 @@
+/*
+ * Copyright (C) 2020 Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * SPDX-License-Identifier: GPL-2.0-only
+ *
+ */
+
+#include "metadata-bucket.h"
+
+#include <common/buffer-view.h>
+#include <common/consumer/consumer.h>
+#include <common/dynamic-buffer.h>
+#include <common/macros.h>
+#include <common/error.h>
+
+struct metadata_bucket {
+       struct lttng_dynamic_buffer content;
+       struct {
+               metadata_bucket_flush_cb fn;
+               void *data;
+       } flush;
+       unsigned int buffer_count;
+};
+
+struct metadata_bucket *metadata_bucket_create(
+               metadata_bucket_flush_cb flush, void *data)
+{
+       struct metadata_bucket *bucket;
+
+       bucket = zmalloc(sizeof(typeof(*bucket)));
+       if (!bucket) {
+               PERROR("Failed to allocate buffer bucket");
+               goto end;
+       }
+
+       bucket->flush.fn = flush;
+       bucket->flush.data = data;
+       lttng_dynamic_buffer_init(&bucket->content);
+end:
+       return bucket;
+}
+
+void metadata_bucket_destroy(struct metadata_bucket *bucket)
+{
+       if (!bucket) {
+               return;
+       }
+
+       if (bucket->content.size > 0) {
+               WARN("Stream metadata bucket destroyed with remaining data: size = %zu, buffer count = %u",
+                               bucket->content.size, bucket->buffer_count);
+       }
+
+       lttng_dynamic_buffer_reset(&bucket->content);
+       free(bucket);
+}
+
+void metadata_bucket_reset(struct metadata_bucket *bucket)
+{
+       lttng_dynamic_buffer_reset(&bucket->content);
+       lttng_dynamic_buffer_init(&bucket->content);
+       bucket->buffer_count = 0;
+}
+
+enum metadata_bucket_status metadata_bucket_fill(struct metadata_bucket *bucket,
+               const struct stream_subbuffer *buffer)
+{
+       ssize_t ret;
+       struct lttng_buffer_view flushed_view;
+       struct stream_subbuffer flushed_subbuffer;
+       enum metadata_bucket_status status;
+       const bool should_flush =
+                       LTTNG_OPTIONAL_GET(buffer->info.metadata.coherent);
+       const size_t padding_this_buffer =
+                       buffer->info.metadata.padded_subbuf_size -
+                       buffer->info.metadata.subbuf_size;
+       size_t flush_size;
+
+       DBG("Metadata bucket filled with %zu bytes buffer view, sub-buffer size: %lu, padded sub-buffer size: %lu, coherent: %s",
+                       buffer->buffer.buffer.size,
+                       buffer->info.metadata.subbuf_size,
+                       buffer->info.metadata.padded_subbuf_size,
+                       buffer->info.metadata.coherent.value ? "true" : "false");
+       /*
+        * If no metadata was accumulated and this buffer should be
+        * flushed, don't copy it unecessarily; just flush it directly.
+        */
+       if (!should_flush || bucket->buffer_count != 0) {
+               /*
+                * Append the _padded_ subbuffer since they are combined
+                * into a single "virtual" subbuffer that will be
+                * flushed at once.
+                *
+                * This means that some padding will be sent over the
+                * network, but should not represent a large amount
+                * of data as incoherent subbuffers are typically
+                * pretty full.
+                *
+                * The padding of the last subbuffer (coherent) added to
+                * the bucket is not sent, which is what really matters
+                * from an efficiency point of view.
+                */
+               ret = lttng_dynamic_buffer_append_view(
+                       &bucket->content, &buffer->buffer.buffer);
+               if (ret) {
+                       status = METADATA_BUCKET_STATUS_ERROR;
+                       goto end;
+               }
+       }
+
+       bucket->buffer_count++;
+       if (!should_flush) {
+               status = METADATA_BUCKET_STATUS_OK;
+               goto end;
+       }
+
+       flushed_view = bucket->content.size != 0 ?
+               lttng_buffer_view_from_dynamic_buffer(&bucket->content, 0, -1) :
+               lttng_buffer_view_from_view(&buffer->buffer.buffer, 0, -1);
+
+       /*
+        * The flush is done with the size of all padded sub-buffers, except
+        * for the last one which we can safely "trim". The padding of the last
+        * packet will be reconstructed by the relay daemon.
+        */
+       flush_size = flushed_view.size - padding_this_buffer;
+
+       flushed_subbuffer = (typeof(flushed_subbuffer)) {
+               .buffer.buffer = flushed_view,
+               .info.metadata.subbuf_size = flush_size,
+               .info.metadata.padded_subbuf_size = flushed_view.size,
+               .info.metadata.version = buffer->info.metadata.version,
+               .info.metadata.coherent = buffer->info.metadata.coherent,
+       };
+
+       DBG("Metadata bucket flushing %zu bytes (%u sub-buffer%s)",
+                       flushed_view.size, bucket->buffer_count,
+                       bucket->buffer_count > 1 ? "s" : "");
+       ret = bucket->flush.fn(&flushed_subbuffer, bucket->flush.data);
+       if (ret >= 0) {
+               status = METADATA_BUCKET_STATUS_OK;
+       } else {
+               status = METADATA_BUCKET_STATUS_ERROR;
+       }
+
+       metadata_bucket_reset(bucket);
+
+end:
+       return status;
+}
diff --git a/src/common/consumer/metadata-bucket.h b/src/common/consumer/metadata-bucket.h
new file mode 100644 (file)
index 0000000..0355eb3
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * Copyright (C) 2020 Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * SPDX-License-Identifier: GPL-2.0-only
+ *
+ */
+
+#ifndef METADATA_BUCKET_H
+#define METADATA_BUCKET_H
+
+#include <common/consumer/consumer.h>
+
+struct metadata_bucket;
+
+typedef ssize_t (*metadata_bucket_flush_cb)(
+               const struct stream_subbuffer *buffer, void *data);
+
+enum metadata_bucket_status {
+       METADATA_BUCKET_STATUS_OK,
+       METADATA_BUCKET_STATUS_ERROR,
+};
+
+struct metadata_bucket *metadata_bucket_create(
+               metadata_bucket_flush_cb flush, void *data);
+
+void metadata_bucket_destroy(struct metadata_bucket *bucket);
+
+enum metadata_bucket_status metadata_bucket_fill(struct metadata_bucket *bucket,
+               const struct stream_subbuffer *buffer);
+
+void metadata_bucket_reset(struct metadata_bucket *bucket);
+
+#endif /* METADATA_BUCKET_H */
+
index e06f3b3e079aae44f494ba6d4409f02120018af4..dd5cf1761a4992c9e7384d48675038c5438ece0e 100644 (file)
@@ -19,6 +19,7 @@
 #include <inttypes.h>
 #include <unistd.h>
 #include <sys/stat.h>
+#include <stdint.h>
 
 #include <bin/lttng-consumerd/health-consumerd.h>
 #include <common/common.h>
@@ -36,7 +37,7 @@
 #include <common/optional.h>
 #include <common/buffer-view.h>
 #include <common/consumer/consumer.h>
-#include <stdint.h>
+#include <common/consumer/metadata-bucket.h>
 
 #include "kernel-consumer.h"
 
@@ -287,7 +288,7 @@ static int lttng_kconsumer_snapshot_channel(
 
                        subbuf_view = lttng_buffer_view_init(
                                        subbuf_addr, 0, padded_len);
-                       read_len = lttng_consumer_on_read_subbuffer_mmap(ctx,
+                       read_len = lttng_consumer_on_read_subbuffer_mmap(
                                        stream, &subbuf_view,
                                        padded_len - len);
                        /*
@@ -1558,6 +1559,42 @@ end:
        return ret;
 }
 
+static
+int get_next_subbuffer_metadata_check(struct lttng_consumer_stream *stream,
+               struct stream_subbuffer *subbuffer)
+{
+       int ret;
+       const char *addr;
+       bool coherent;
+
+       ret = kernctl_get_next_subbuf_metadata_check(stream->wait_fd,
+                       &coherent);
+       if (ret) {
+               goto end;
+       }
+
+       ret = stream->read_subbuffer_ops.extract_subbuffer_info(
+                       stream, subbuffer);
+       if (ret) {
+               goto end;
+       }
+
+       LTTNG_OPTIONAL_SET(&subbuffer->info.metadata.coherent, coherent);
+
+       ret = get_current_subbuf_addr(stream, &addr);
+       if (ret) {
+               goto end;
+       }
+
+       subbuffer->buffer.buffer = lttng_buffer_view_init(
+                       addr, 0, subbuffer->info.data.padded_subbuf_size);
+       DBG("Got metadata packet with padded_subbuf_size = %lu, coherent = %s",
+                       subbuffer->info.metadata.padded_subbuf_size,
+                       coherent ? "true" : "false");
+end:
+       return ret;
+}
+
 static
 int put_next_subbuffer(struct lttng_consumer_stream *stream,
                struct stream_subbuffer *subbuffer)
@@ -1576,15 +1613,53 @@ int put_next_subbuffer(struct lttng_consumer_stream *stream,
        return ret;
 }
 
-static void lttng_kconsumer_set_stream_ops(
+static
+bool is_get_next_check_metadata_available(int tracer_fd)
+{
+       return kernctl_get_next_subbuf_metadata_check(tracer_fd, NULL) !=
+                       -ENOTTY;
+}
+
+static
+int lttng_kconsumer_set_stream_ops(
                struct lttng_consumer_stream *stream)
 {
-       if (stream->chan->output == CONSUMER_CHANNEL_MMAP) {
-               stream->read_subbuffer_ops.get_next_subbuffer =
-                               get_next_subbuffer_mmap;
-       } else {
-               stream->read_subbuffer_ops.get_next_subbuffer =
-                               get_next_subbuffer_splice;
+       int ret = 0;
+
+       if (stream->metadata_flag && stream->chan->is_live) {
+               DBG("Attempting to enable metadata bucketization for live consumers");
+               if (is_get_next_check_metadata_available(stream->wait_fd)) {
+                       DBG("Kernel tracer supports get_next_subbuffer_metadata_check, metadata will be accumulated until a coherent state is reached");
+                       stream->read_subbuffer_ops.get_next_subbuffer =
+                                       get_next_subbuffer_metadata_check;
+                       ret = consumer_stream_enable_metadata_bucketization(
+                                       stream);
+                       if (ret) {
+                               goto end;
+                       }
+               } else {
+                       /*
+                        * The kernel tracer version is too old to indicate
+                        * when the metadata stream has reached a "coherent"
+                        * (parseable) point.
+                        *
+                        * This means that a live viewer may see an incoherent
+                        * sequence of metadata and fail to parse it.
+                        */
+                       WARN("Kernel tracer does not support get_next_subbuffer_metadata_check which may cause live clients to fail to parse the metadata stream");
+                       metadata_bucket_destroy(stream->metadata_bucket);
+                       stream->metadata_bucket = NULL;
+               }
+       }
+
+       if (!stream->read_subbuffer_ops.get_next_subbuffer) {
+               if (stream->chan->output == CONSUMER_CHANNEL_MMAP) {
+                       stream->read_subbuffer_ops.get_next_subbuffer =
+                                       get_next_subbuffer_mmap;
+               } else {
+                       stream->read_subbuffer_ops.get_next_subbuffer =
+                                       get_next_subbuffer_splice;
+               }
        }
 
        if (stream->metadata_flag) {
@@ -1600,6 +1675,8 @@ static void lttng_kconsumer_set_stream_ops(
        }
 
        stream->read_subbuffer_ops.put_next_subbuffer = put_next_subbuffer;
+end:
+       return ret;
 }
 
 int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
@@ -1640,7 +1717,10 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
                }
        }
 
-       lttng_kconsumer_set_stream_ops(stream);
+       ret = lttng_kconsumer_set_stream_ops(stream);
+       if (ret) {
+               goto error_close_fd;
+       }
 
        /* we return 0 to let the library handle the FD internally */
        return 0;
index 1af9840cd5184b0cfebad0bf680257fbbadac9a5..6d6690a32161a4cbc24e05157f31407fe9c1df74 100644 (file)
@@ -37,6 +37,7 @@
 #include <common/utils.h>
 #include <common/index/index.h>
 #include <common/consumer/consumer.h>
+#include <common/optional.h>
 
 #include "ust-consumer.h"
 
@@ -1230,7 +1231,7 @@ static int snapshot_channel(struct lttng_consumer_channel *channel,
 
                        subbuf_view = lttng_buffer_view_init(
                                        subbuf_addr, 0, padded_len);
-                       read_len = lttng_consumer_on_read_subbuffer_mmap(ctx,
+                       read_len = lttng_consumer_on_read_subbuffer_mmap(
                                        stream, &subbuf_view, padded_len - len);
                        if (use_relayd) {
                                if (read_len != len) {
@@ -2464,7 +2465,7 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
        assert(write_len != 0);
        if (write_len < 0) {
                ERR("Writing one metadata packet");
-               ret = -1;
+               ret = write_len;
                goto end;
        }
        stream->ust_metadata_pushed += write_len;
@@ -2810,28 +2811,88 @@ static int get_next_subbuffer_metadata(struct lttng_consumer_stream *stream,
                struct stream_subbuffer *subbuffer)
 {
        int ret;
+       bool cache_empty;
+       bool got_subbuffer;
+       bool coherent;
+       bool buffer_empty;
+       unsigned long consumed_pos, produced_pos;
 
-       ret = ustctl_get_next_subbuf(stream->ustream);
-       if (ret) {
-               ret = commit_one_metadata_packet(stream);
-               if (ret < 0) {
-                       goto end;
-               } else if (ret == 0) {
-                       /* Not an error, the cache is empty. */
-                       ret = -ENODATA;
-                       goto end;
+       do {
+               ret = ustctl_get_next_subbuf(stream->ustream);
+               if (ret == 0) {
+                       got_subbuffer = true;
+               } else {
+                       got_subbuffer = false;
+                       if (ret != -EAGAIN) {
+                               /* Fatal error. */
+                               goto end;
+                       }
                }
 
-               ret = ustctl_get_next_subbuf(stream->ustream);
-               if (ret) {
-                       goto end;
+               /*
+                * Determine if the cache is empty and ensure that a sub-buffer
+                * is made available if the cache is not empty.
+                */
+               if (!got_subbuffer) {
+                       ret = commit_one_metadata_packet(stream);
+                       if (ret < 0 && ret != -ENOBUFS) {
+                               goto end;
+                       } else if (ret == 0) {
+                               /* Not an error, the cache is empty. */
+                               cache_empty = true;
+                               ret = -ENODATA;
+                               goto end;
+                       } else {
+                               cache_empty = false;
+                       }
+               } else {
+                       pthread_mutex_lock(&stream->chan->metadata_cache->lock);
+                       cache_empty = stream->chan->metadata_cache->max_offset ==
+                                     stream->ust_metadata_pushed;
+                       pthread_mutex_unlock(&stream->chan->metadata_cache->lock);
                }
-       }
+       } while (!got_subbuffer);
 
+       /* Populate sub-buffer infos and view. */
        ret = get_next_subbuffer_common(stream, subbuffer);
        if (ret) {
                goto end;
        }
+
+       ret = lttng_ustconsumer_sample_snapshot_positions(stream);
+       if (ret < 0) {
+               /*
+                * -EAGAIN is not expected since we got a sub-buffer and haven't
+                * pushed the consumption position yet (on put_next).
+                */
+               PERROR("Failed to take a snapshot of metadata buffer positions");
+               goto end;
+       }
+
+       ret = lttng_ustconsumer_get_consumed_snapshot(stream, &consumed_pos);
+       if (ret) {
+               PERROR("Failed to get metadata consumed position");
+               goto end;
+       }
+
+       ret = lttng_ustconsumer_get_produced_snapshot(stream, &produced_pos);
+       if (ret) {
+               PERROR("Failed to get metadata produced position");
+               goto end;
+       }
+
+       /* Last sub-buffer of the ring buffer ? */
+       buffer_empty = (consumed_pos + stream->max_sb_size) == produced_pos;
+
+       /*
+        * The sessiond registry lock ensures that coherent units of metadata
+        * are pushed to the consumer daemon at once. Hence, if a sub-buffer is
+        * acquired, the cache is empty, and it is the only available sub-buffer
+        * available, it is safe to assume that it is "coherent".
+        */
+       coherent = got_subbuffer && cache_empty && buffer_empty;
+
+       LTTNG_OPTIONAL_SET(&subbuffer->info.metadata.coherent, coherent);
 end:
        return ret;
 }
@@ -2851,9 +2912,11 @@ static int signal_metadata(struct lttng_consumer_stream *stream,
        return pthread_cond_broadcast(&stream->metadata_rdv) ? -errno : 0;
 }
 
-static void lttng_ustconsumer_set_stream_ops(
+static int lttng_ustconsumer_set_stream_ops(
                struct lttng_consumer_stream *stream)
 {
+       int ret = 0;
+
        stream->read_subbuffer_ops.on_wake_up = consumer_stream_ust_on_wake_up;
        if (stream->metadata_flag) {
                stream->read_subbuffer_ops.get_next_subbuffer =
@@ -2862,7 +2925,14 @@ static void lttng_ustconsumer_set_stream_ops(
                                extract_metadata_subbuffer_info;
                stream->read_subbuffer_ops.reset_metadata =
                                metadata_stream_reset_cache;
-               stream->read_subbuffer_ops.on_sleep = signal_metadata;
+               if (stream->chan->is_live) {
+                       stream->read_subbuffer_ops.on_sleep = signal_metadata;
+                       ret = consumer_stream_enable_metadata_bucketization(
+                                       stream);
+                       if (ret) {
+                               goto end;
+                       }
+               }
        } else {
                stream->read_subbuffer_ops.get_next_subbuffer =
                                get_next_subbuffer;
@@ -2876,6 +2946,8 @@ static void lttng_ustconsumer_set_stream_ops(
        }
 
        stream->read_subbuffer_ops.put_next_subbuffer = put_next_subbuffer;
+end:
+       return ret;
 }
 
 /*
This page took 0.038777 seconds and 5 git commands to generate.