Fix: consumerd: live client receives incomplete metadata
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Thu, 14 May 2020 18:24:17 +0000 (14:24 -0400)
committerJonathan Rajotte <jonathan.rajotte-julien@efficios.com>
Tue, 26 May 2020 18:29:52 +0000 (14:29 -0400)
Observed issue
==============

Babeltrace 1.5.x and Babeltrace 2.x can both report errors (albeit
differently) when using the "lttng-live" protocol that imply that the
metadata they received is incomplete.

For instance, babeltrace 1.5.3 reports the following error:

```
[error] Error creating AST
[error] [Context] Cannot open_mmap_trace of format ctf.
[error] Error adding trace
[warning] [Context] Cannot open_trace of format lttng-live at path net://localhost:xxxx/host/session/live_session.
[warning] [Context] cannot open trace "net://localhost:xxxx/host/session/live_session" for reading.
[error] opening trace "net://localhost:xxxx/host/session/live_session" for reading.
[error] none of the specified trace paths could be opened.
```

While debugging both viewers, I noticed that both were attempting to
receive the available metadata before consuming the "data" streams'
content.

Typically, the following exchange between the relay daemon and the
lttng-live client occurs when the problem is observed:

bt lttng-live:
    emits LTTNG_VIEWER_GET_METADATA command
relayd:
    returns LTTNG_VIEWER_METADATA_OK, len = 4096 (default packet size)
bt lttng-live:
    consume 4096 bytes of metadata
    emits LTTNG_VIEWER_GET_METADATA command
relayd:
    returns LTTNG_VIEWER_NO_NEW_METADATA

When the lttng-live client receives the LTTNG_VIEWER_NO_NEW_METADATA
status code, it attempts to parse all the metadata it has received
since the last LTTNG_VIEWER_NO_NEW_METADATA reply. In effect, it is
expected that this forms a logical unit of metadata that is parseable
on its own.

If this is the first time metadata is received for that trace, the
metadata is expected to contain a trace declaration, packet header
declaration, etc.

If metadata was already received, it is expected that the newly parsed
declarations can be "appended" to the existing trace schema.

It appears that the relay daemon sends the
LTTNG_VIEWER_NO_NEW_METADATA while the metadata it has sent up to that
point is not parseable on its own.

The live protocol description does not require or imply that a viewer
should attempt to parse metadata packets until it hopefully succeeds
at some point. Anyhow:
  1) This would make it impossible for a live viewer to correctly
  handle a corrupted metadata stream beyond retrying forever,
  2) This behaviour is not implemented by the two reference
  implementations of the protocol.

Cause
=====

The relay daemon provides a guarantee that it will send any available
metadata before allowing a data stream packet to be served to the
client.

In other words, a client requesting a data packet will receive the
LTTNG_VIEWER_FLAG_NEW_METADATA status code (and no data) if it
attempts to get a data stream packet while the relay daemon has
metadata already available.

This guarantee is properly enforced as far as I can tell. However,
looking at the consumer daemon implementation, it appears that
metadata packets are sent as soon as they are available.

A metadata packet is not guaranteed to be parseable on its own. For
instance, it can end in the middle the an event declaration.

Hence, this hints at a race involving the tracer, the consumer daemon,
the relay daemon, and the lttng-live client.

Consider the following scenario:
  - Metadata packets (sub-buffers) are configured to be 4kB in size,
  - a large number of kernel events are enabled (e.g. --kernel --all),
  - the network connection between the consumer and relay daemons is
    slow

1) The kernel tracer will produce enough TSDL metadata to fill the
   first sub-buffer of the "metadata" ring-buffer and signal the
   consumer daemon that a buffer is ready. The tracer then starts
   writing the remaining data in the following available sub-buffers.

2) The consumer daemon metadata thread is woken up and consumes the
   first metadata sub-buffer and sends it to the relay daemon.

3) A live client establishes an lttng-live connection to the relay
   daemon and attempts to consume the available metadata. It receives
   the first packet and, since the relay daemon doesn't know about any
   follow-up metadata, receives LTTNG_VIEWER_NO_NEW_METADATA on the
   next attempt.

4) Having received LTTNG_VIEWER_NO_NEW_METADATA, the lttng-live client
   attempts to parse the metadata it has received and fails.

This scenario is easy to reproduce by inserting a "sleep(1)" at
src/bin/lttng-relayd/main.c:1978 (as of this revision). This simulates
a relay daemon that would be slow to receive/process metadata packets
from the consumer daemon.

This problem similarly applies to the user space tracer.

Solution
========

Having no knowledge of TSDL, the relay daemon can't "bundle" packets
of metadata until they form a "parseable unit" to send to the consumer
daemon.

To provide the parseability guarantee expected by the viewers, and by
the relay daemon implicitly, we need to ensure that the consumer
daemons only send "parseable units" of metadata to the relay daemon.

Unfortunately, the consumer daemons do not know how to parse TSDL
either. In fact, only the metadata producers are able to provide the
boundaries of the "parseable units" of metadata.

The general idea of the fix is to accumulate metadata up to a point
where a "parseable unit" boundary has been identified and send that
content in one request to the relay daemon. Note that the solution
described here only concerns the live mode. In other cases, the
mechanisms described are simply bypassed.

A "metadata bucket" is added to lttng_consumer_stream when it is
created from a live channel. This bucket is filled until the
consumption position reaches the "parseable unit" end position.

A refresher about the handling of metadata in live mode
-------------------------------------------------------

Three "events" are of interest here and can cause metadata to be
consumed more or less indirectly:
  1) A metadata packet is closed, causing the metadata thread to wake
     up
  2) The live timer expires
  3) A data sub-buffer is closed, causing the data thread to wake-up

1) The first case is simple and happens regardless of whether or not
   the tracing session is in live mode or not. Metadata is always
   consumed by the metadata thread in the same way. However, this
   scenario can be "caused" by (2) and (3). See [1]. A sub-buffer is
   "acquired" from the metadata ring-buffer and sent to the relayd
   daemon as the payload of a "RELAYD_SEND_METADATA" command.

2) When the live timer expires [2], the 'check_stream' function is
   called on all data streams of the session. As its name clearly
   implies, this function is responsible for flushing all streams or
   sending a "live beacon" (called an "empty index" in the code) if
   there is no data to flush. Any flushed data will result in (3).

3) When a data sub-buffer is ready to be consumed, [1] is invoked
   by the data thread. This function acquires a sub-buffer and sends
   it to the relay daemon through the data connection.

   Then, an important synchronization step takes place. The index of
   the newly-sent packet will be sent through the control
   connection. The relay daemon waits for both the data packet and its
   matching index before making the new packet visible to live
   viewers.

   Since a data packet could contain data that requires "newer"
   metadata to be decoded, the data thread flushes the metadata stream
   and enters a "waiting" phase to pause until all metadata present in
   the metadata ring buffer has been consumed [3].

   At the end of this waiting phase, the data thread sends the data
   packet's index to the relay daemon, allowing the relayd to make it
   visible to its live clients.

How to identify a "parseable unit" boundary?
--------------------------------------------

In the case of the kernel domain, the kernel tracer produces the
actual TSDL descriptions directly. The TSDL metadata is serialized to
a metadata cache and is flushed "just in time" to the metadata
ring-buffer when a "get next" operation is performed.

There is no way, from user space, to query whether or not the metadata
cache of the kernel tracer is empty. Hence, a new
RING_RING_BUFFER_GET_NEXT_SUBBUF_METADATA_CHECK command was added to
query whether or not the kernel tracer's metadata cache is empty when
acquiring a sub-buffer.

This allows the consumer daemon to identify a "coherent" position in
the metadata stream that is safe to use as a "parseable unit"
boundary.

As for the user space domain, since the session daemon is responsible
for generating the TSDL representation of the metadata, there is no
need to change LTTng-ust APIs.

The session daemon generates coherent units of metadata and adds them
to its "registry" at once (protected by the registry's lock). It then
flushes the contents to the consumer daemon and waits for that data to
be consumed before proceeding further.

On the consumer daemon side, the metadata cache is filled with the
newly-produced contents. This is done atomically with respect to
accesses to the metadata cache as all accesses happen through a
dedicated metadata cache lock.

When the consumer's metadata polling thread is woken-up, it will
attempt to acquire (`get_next`) a sub-buffer from the metadata stream
ring-buffer. If it fails, it will flush a sub-buffer's worth of
metadata to the ring-buffer and attempt to acquire a sub-buffer again.

At this point, it is possible to determine if that sub-buffer is the
last one of a parseable metadata unit: the cache must be empty and the
ring-buffer must be empty following the consumption of this
sub-buffer. When those conditions are met, the resulting metadata
`stream_subbuffer` is tagged as being `coherent`.

Metadata bucket
---------------

A helper interface, metadata_bucket, is introduced as part of this
fix. A metadata_bucket is `fill`ed with `stream_subbuffer`s, and is
eventually `flushed` when it is filled by a `coherent` sub-buffer.

As older versions of LTTng-modules must remain supported, this new
helper is not used when the
RING_RING_BUFFER_GET_NEXT_SUBBUF_METADATA_CHECK operation is not
available. When the operation is available, the metadata stream's
bucketization is enabled, causing a bucket to be created and the
`consume` callback to be swapped.

The `consume` callback of the metadata streams is replaced by a new
implementation when the metadata bucketization is activated on the
stream. This implementation returns the padded size of the consumed
sub-buffer when they could be added to the bucket. When the bucket is
flushed, the regular `mmap`-based consumption function is called with
the bucket's contents.

Known drawbacks
===============

This implementation causes the consumer daemon to buffer the whole
initial unit of metadata before sending it. In practice, this is not
expected to be a problem since the largest metadata files we have seen
in real use are a couple of megabytes wide.

Beyond the (temporary) memory use, this causes the metadata thread to
block while this potentially large chunk of metadata is sent (rather
than blocking while sending 4kb at a time).

The second point is just a consequence of existing shortcomings of the
consumerd; slow IO should not affect other unrelated streams. The
fundamental problem is that blocking IO is used and we should switch
to non-blocking communication if this is a problem (as is done in the
relay daemon).

The first point is more problematic given the existing tracer APIs.
If the tracer could provide the boundary of a "parseable unit" of
metadata, we could send the header of the RELAYD_SEND_METADATA command
with that size and send the various metadata packets as they are made
available. This would make no difference to the relay daemon as it is
not blocking on that socket and will not make the metadata size change
visible to the "live server" until it has all been received.

This size can't be determined right now since it could exceed the
total size of the "metadata" ring buffer. In other words, we can't wait
for the production of metadata to complete before starting to consume.

Finally, while implementing this fix, I also realized that the
computation of the rotation position of the metadata streams is
erroneous. The rotation code makes use of the ring-buffer's positions
to determine the rotation position. However, since both user space and
kernel domains make use of a "cache" behind the ring-buffer, that
cached content must be taken into account when computing the metadata
stream's rotation position.

References
==========

[1] https://github.com/lttng/lttng-tools/blob/d5ccf8fe0/src/common/consumer/consumer.c#L3433
[2] https://github.com/lttng/lttng-tools/blob/d5ccf8fe0/src/common/consumer/consumer-timer.c#L312
[3] https://github.com/lttng/lttng-tools/blob/d5ccf8fe0/src/common/consumer/consumer-stream.c#L492

Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
Change-Id: I40ee07e5c344c72d9aae2b9b15dc36c00b21e5fa

src/common/consumer/Makefile.am
src/common/consumer/consumer-stream.c
src/common/consumer/consumer-stream.h
src/common/consumer/consumer.c
src/common/consumer/consumer.h
src/common/consumer/metadata-bucket.c [new file with mode: 0644]
src/common/consumer/metadata-bucket.h [new file with mode: 0644]
src/common/kernel-consumer/kernel-consumer.c
src/common/ust-consumer/ust-consumer.c

index c628312892c1b6adf44632659493f712f53a97a9..33f043cbc3233a9a31996b4428457619079b7b8a 100644 (file)
@@ -5,7 +5,8 @@ noinst_HEADERS = consumer-metadata-cache.h consumer-timer.h \
                 consumer-testpoint.h
 
 libconsumer_la_SOURCES = consumer.c consumer.h consumer-metadata-cache.c \
-                         consumer-timer.c consumer-stream.c consumer-stream.h
+                         consumer-timer.c consumer-stream.c consumer-stream.h \
+                         metadata-bucket.c metadata-bucket.h
 
 libconsumer_la_LIBADD = \
                $(top_builddir)/src/common/sessiond-comm/libsessiond-comm.la \
index a705f1b18e72ff6610dce543d5eaa062e3ff99b2..bf3c2f807d65e98640edf3419e4faebb59366213 100644 (file)
@@ -31,6 +31,7 @@
 #include <common/utils.h>
 #include <common/consumer/consumer.h>
 #include <common/consumer/consumer-timer.h>
+#include <common/consumer/metadata-bucket.h>
 
 #include "consumer-stream.h"
 
@@ -162,7 +163,7 @@ static ssize_t consumer_stream_consume_mmap(
                        subbuffer->info.data.subbuf_size;
 
        return lttng_consumer_on_read_subbuffer_mmap(
-                       ctx, stream, &subbuffer->buffer.buffer, padding_size);
+                       stream, &subbuffer->buffer.buffer, padding_size);
 }
 
 static ssize_t consumer_stream_consume_splice(
@@ -404,6 +405,10 @@ int metadata_stream_check_version(struct lttng_consumer_stream *stream,
        stream->metadata_version = subbuffer->info.metadata.version;
        stream->reset_metadata_flag = 1;
 
+       if (stream->metadata_bucket) {
+               metadata_bucket_reset(stream->metadata_bucket);
+       }
+
        if (stream->read_subbuffer_ops.reset_metadata) {
                stream->read_subbuffer_ops.reset_metadata(stream);
        }
@@ -728,6 +733,7 @@ void consumer_stream_free(struct lttng_consumer_stream *stream)
 {
        assert(stream);
 
+       metadata_bucket_destroy(stream->metadata_bucket);
        call_rcu(&stream->node.head, free_stream_rcu);
 }
 
@@ -897,4 +903,60 @@ int consumer_stream_write_index(struct lttng_consumer_stream *stream,
 error:
        rcu_read_unlock();
        return ret;
-}
\ No newline at end of file
+}
+
+static ssize_t metadata_bucket_flush(
+               const struct stream_subbuffer *buffer, void *data)
+{
+       ssize_t ret;
+       struct lttng_consumer_stream *stream = data;
+
+       ret = consumer_stream_consume_mmap(NULL, stream, buffer);
+       if (ret < 0) {
+               goto end;
+       }
+end:
+       return ret;
+}
+
+static ssize_t metadata_bucket_consume(
+               struct lttng_consumer_local_data *unused,
+               struct lttng_consumer_stream *stream,
+               const struct stream_subbuffer *subbuffer)
+{
+       ssize_t ret;
+       enum metadata_bucket_status status;
+
+       status = metadata_bucket_fill(stream->metadata_bucket, subbuffer);
+       switch (status) {
+       case METADATA_BUCKET_STATUS_OK:
+               /* Return consumed size. */
+               ret = subbuffer->buffer.buffer.size;
+               break;
+       default:
+               ret = -1;
+       }
+
+       return ret;
+}
+
+int consumer_stream_enable_metadata_bucketization(
+               struct lttng_consumer_stream *stream)
+{
+       int ret = 0;
+
+       assert(stream->metadata_flag);
+       assert(!stream->metadata_bucket);
+       assert(stream->chan->output == CONSUMER_CHANNEL_MMAP);
+
+       stream->metadata_bucket = metadata_bucket_create(
+                       metadata_bucket_flush, stream);
+       if (!stream->metadata_bucket) {
+               ret = -1;
+               goto end;
+       }
+
+       stream->read_subbuffer_ops.consume_subbuffer = metadata_bucket_consume;
+end:
+       return ret;
+}
index 52df0c2558daa4e5bdecee60272ff9ff8454fa70..1746c0fdd361d06d19d5b016421c4aad4aa5d1e7 100644 (file)
@@ -97,4 +97,13 @@ int consumer_stream_write_index(struct lttng_consumer_stream *stream,
 int consumer_stream_sync_metadata(struct lttng_consumer_local_data *ctx,
                uint64_t session_id);
 
+/*
+ * Enable metadata bucketization. This must only be enabled if the tracer
+ * provides a reliable metadata `coherent` flag.
+ *
+ * This must be called on initialization before any subbuffer is consumed.
+ */
+int consumer_stream_enable_metadata_bucketization(
+               struct lttng_consumer_stream *stream);
+
 #endif /* LTTNG_CONSUMER_STREAM_H */
index eac37654f244220492185121206892b6d2948d82..66234b84f28dd3bf657a75cb2aaafa37644cb540 100644 (file)
@@ -1423,7 +1423,6 @@ end:
  * Returns the number of bytes written
  */
 ssize_t lttng_consumer_on_read_subbuffer_mmap(
-               struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream,
                const struct lttng_buffer_view *buffer,
                unsigned long padding)
index e06e163bfdd50de210418692c20d04ac7f96c2b3..53ab4c577ce1668773902ef9072377a3517fc38c 100644 (file)
@@ -247,6 +247,14 @@ struct stream_subbuffer {
                        unsigned long subbuf_size;
                        unsigned long padded_subbuf_size;
                        uint64_t version;
+                       /*
+                        * Left unset when unsupported.
+                        *
+                        * Indicates that this is the last sub-buffer of
+                        * a series of sub-buffer that makes-up a coherent
+                        * (parseable) unit of metadata.
+                        */
+                       LTTNG_OPTIONAL(bool) coherent;
                } metadata;
                struct {
                        unsigned long subbuf_size;
@@ -577,6 +585,7 @@ struct lttng_consumer_stream {
                on_sleep_cb on_sleep;
                unlock_cb unlock;
        } read_subbuffer_ops;
+       struct metadata_bucket *metadata_bucket;
 };
 
 /*
@@ -863,7 +872,6 @@ struct lttng_consumer_local_data *lttng_consumer_create(
                int (*update_stream)(uint64_t sessiond_key, uint32_t state));
 void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx);
 ssize_t lttng_consumer_on_read_subbuffer_mmap(
-               struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream,
                const struct lttng_buffer_view *buffer,
                unsigned long padding);
diff --git a/src/common/consumer/metadata-bucket.c b/src/common/consumer/metadata-bucket.c
new file mode 100644 (file)
index 0000000..1ee5022
--- /dev/null
@@ -0,0 +1,150 @@
+/*
+ * Copyright (C) 2020 Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * SPDX-License-Identifier: GPL-2.0-only
+ *
+ */
+
+#include "metadata-bucket.h"
+
+#include <common/buffer-view.h>
+#include <common/consumer/consumer.h>
+#include <common/dynamic-buffer.h>
+#include <common/macros.h>
+#include <common/error.h>
+
+struct metadata_bucket {
+       struct lttng_dynamic_buffer content;
+       struct {
+               metadata_bucket_flush_cb fn;
+               void *data;
+       } flush;
+       unsigned int buffer_count;
+};
+
+struct metadata_bucket *metadata_bucket_create(
+               metadata_bucket_flush_cb flush, void *data)
+{
+       struct metadata_bucket *bucket;
+
+       bucket = zmalloc(sizeof(typeof(*bucket)));
+       if (!bucket) {
+               PERROR("Failed to allocate buffer bucket");
+               goto end;
+       }
+
+       bucket->flush.fn = flush;
+       bucket->flush.data = data;
+       lttng_dynamic_buffer_init(&bucket->content);
+end:
+       return bucket;
+}
+
+void metadata_bucket_destroy(struct metadata_bucket *bucket)
+{
+       if (!bucket) {
+               return;
+       }
+
+       if (bucket->content.size > 0) {
+               WARN("Stream metadata bucket destroyed with remaining data: size = %zu, buffer count = %u",
+                               bucket->content.size, bucket->buffer_count);
+       }
+
+       lttng_dynamic_buffer_reset(&bucket->content);
+       free(bucket);
+}
+
+void metadata_bucket_reset(struct metadata_bucket *bucket)
+{
+       lttng_dynamic_buffer_reset(&bucket->content);
+       lttng_dynamic_buffer_init(&bucket->content);
+       bucket->buffer_count = 0;
+}
+
+enum metadata_bucket_status metadata_bucket_fill(struct metadata_bucket *bucket,
+               const struct stream_subbuffer *buffer)
+{
+       ssize_t ret;
+       struct lttng_buffer_view flushed_view;
+       struct stream_subbuffer flushed_subbuffer;
+       enum metadata_bucket_status status;
+       const bool should_flush =
+                       LTTNG_OPTIONAL_GET(buffer->info.metadata.coherent);
+       const size_t padding_this_buffer =
+                       buffer->info.metadata.padded_subbuf_size -
+                       buffer->info.metadata.subbuf_size;
+       size_t flush_size;
+
+       DBG("Metadata bucket filled with %zu bytes buffer view, sub-buffer size: %lu, padded sub-buffer size: %lu, coherent: %s",
+                       buffer->buffer.buffer.size,
+                       buffer->info.metadata.subbuf_size,
+                       buffer->info.metadata.padded_subbuf_size,
+                       buffer->info.metadata.coherent.value ? "true" : "false");
+       /*
+        * If no metadata was accumulated and this buffer should be
+        * flushed, don't copy it unecessarily; just flush it directly.
+        */
+       if (!should_flush || bucket->buffer_count != 0) {
+               /*
+                * Append the _padded_ subbuffer since they are combined
+                * into a single "virtual" subbuffer that will be
+                * flushed at once.
+                *
+                * This means that some padding will be sent over the
+                * network, but should not represent a large amount
+                * of data as incoherent subbuffers are typically
+                * pretty full.
+                *
+                * The padding of the last subbuffer (coherent) added to
+                * the bucket is not sent, which is what really matters
+                * from an efficiency point of view.
+                */
+               ret = lttng_dynamic_buffer_append_view(
+                       &bucket->content, &buffer->buffer.buffer);
+               if (ret) {
+                       status = METADATA_BUCKET_STATUS_ERROR;
+                       goto end;
+               }
+       }
+
+       bucket->buffer_count++;
+       if (!should_flush) {
+               status = METADATA_BUCKET_STATUS_OK;
+               goto end;
+       }
+
+       flushed_view = bucket->content.size != 0 ?
+               lttng_buffer_view_from_dynamic_buffer(&bucket->content, 0, -1) :
+               lttng_buffer_view_from_view(&buffer->buffer.buffer, 0, -1);
+
+       /*
+        * The flush is done with the size of all padded sub-buffers, except
+        * for the last one which we can safely "trim". The padding of the last
+        * packet will be reconstructed by the relay daemon.
+        */
+       flush_size = flushed_view.size - padding_this_buffer;
+
+       flushed_subbuffer = (typeof(flushed_subbuffer)) {
+               .buffer.buffer = flushed_view,
+               .info.metadata.subbuf_size = flush_size,
+               .info.metadata.padded_subbuf_size = flushed_view.size,
+               .info.metadata.version = buffer->info.metadata.version,
+               .info.metadata.coherent = buffer->info.metadata.coherent,
+       };
+
+       DBG("Metadata bucket flushing %zu bytes (%u sub-buffer%s)",
+                       flushed_view.size, bucket->buffer_count,
+                       bucket->buffer_count > 1 ? "s" : "");
+       ret = bucket->flush.fn(&flushed_subbuffer, bucket->flush.data);
+       if (ret >= 0) {
+               status = METADATA_BUCKET_STATUS_OK;
+       } else {
+               status = METADATA_BUCKET_STATUS_ERROR;
+       }
+
+       metadata_bucket_reset(bucket);
+
+end:
+       return status;
+}
diff --git a/src/common/consumer/metadata-bucket.h b/src/common/consumer/metadata-bucket.h
new file mode 100644 (file)
index 0000000..0355eb3
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * Copyright (C) 2020 Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ *
+ * SPDX-License-Identifier: GPL-2.0-only
+ *
+ */
+
+#ifndef METADATA_BUCKET_H
+#define METADATA_BUCKET_H
+
+#include <common/consumer/consumer.h>
+
+struct metadata_bucket;
+
+typedef ssize_t (*metadata_bucket_flush_cb)(
+               const struct stream_subbuffer *buffer, void *data);
+
+enum metadata_bucket_status {
+       METADATA_BUCKET_STATUS_OK,
+       METADATA_BUCKET_STATUS_ERROR,
+};
+
+struct metadata_bucket *metadata_bucket_create(
+               metadata_bucket_flush_cb flush, void *data);
+
+void metadata_bucket_destroy(struct metadata_bucket *bucket);
+
+enum metadata_bucket_status metadata_bucket_fill(struct metadata_bucket *bucket,
+               const struct stream_subbuffer *buffer);
+
+void metadata_bucket_reset(struct metadata_bucket *bucket);
+
+#endif /* METADATA_BUCKET_H */
+
index 2281f5d8e60dca456c648e268f2b5a9d9e74d1e5..111d655881c7f35c915e83da4239d381df9e4970 100644 (file)
@@ -28,6 +28,7 @@
 #include <inttypes.h>
 #include <unistd.h>
 #include <sys/stat.h>
+#include <stdint.h>
 
 #include <bin/lttng-consumerd/health-consumerd.h>
 #include <common/common.h>
@@ -45,7 +46,7 @@
 #include <common/optional.h>
 #include <common/buffer-view.h>
 #include <common/consumer/consumer.h>
-#include <stdint.h>
+#include <common/consumer/metadata-bucket.h>
 
 #include "kernel-consumer.h"
 
@@ -298,7 +299,7 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
 
                        subbuf_view = lttng_buffer_view_init(
                                        subbuf_addr, 0, padded_len);
-                       read_len = lttng_consumer_on_read_subbuffer_mmap(ctx,
+                       read_len = lttng_consumer_on_read_subbuffer_mmap(
                                        stream, &subbuf_view,
                                        padded_len - len);
                        /*
@@ -1292,6 +1293,42 @@ end:
        return ret;
 }
 
+static
+int get_next_subbuffer_metadata_check(struct lttng_consumer_stream *stream,
+               struct stream_subbuffer *subbuffer)
+{
+       int ret;
+       const char *addr;
+       bool coherent;
+
+       ret = kernctl_get_next_subbuf_metadata_check(stream->wait_fd,
+                       &coherent);
+       if (ret) {
+               goto end;
+       }
+
+       ret = stream->read_subbuffer_ops.extract_subbuffer_info(
+                       stream, subbuffer);
+       if (ret) {
+               goto end;
+       }
+
+       LTTNG_OPTIONAL_SET(&subbuffer->info.metadata.coherent, coherent);
+
+       ret = get_current_subbuf_addr(stream, &addr);
+       if (ret) {
+               goto end;
+       }
+
+       subbuffer->buffer.buffer = lttng_buffer_view_init(
+                       addr, 0, subbuffer->info.data.padded_subbuf_size);
+       DBG("Got metadata packet with padded_subbuf_size = %lu, coherent = %s",
+                       subbuffer->info.metadata.padded_subbuf_size,
+                       coherent ? "true" : "false");
+end:
+       return ret;
+}
+
 static
 int put_next_subbuffer(struct lttng_consumer_stream *stream,
                struct stream_subbuffer *subbuffer)
@@ -1310,15 +1347,53 @@ int put_next_subbuffer(struct lttng_consumer_stream *stream,
        return ret;
 }
 
-static void lttng_kconsumer_set_stream_ops(
+static
+bool is_get_next_check_metadata_available(int tracer_fd)
+{
+       return kernctl_get_next_subbuf_metadata_check(tracer_fd, NULL) !=
+                       -ENOTTY;
+}
+
+static
+int lttng_kconsumer_set_stream_ops(
                struct lttng_consumer_stream *stream)
 {
-       if (stream->chan->output == CONSUMER_CHANNEL_MMAP) {
-               stream->read_subbuffer_ops.get_next_subbuffer =
-                               get_next_subbuffer_mmap;
-       } else {
-               stream->read_subbuffer_ops.get_next_subbuffer =
-                               get_next_subbuffer_splice;
+       int ret = 0;
+
+       if (stream->metadata_flag && stream->chan->is_live) {
+               DBG("Attempting to enable metadata bucketization for live consumers");
+               if (is_get_next_check_metadata_available(stream->wait_fd)) {
+                       DBG("Kernel tracer supports get_next_subbuffer_metadata_check, metadata will be accumulated until a coherent state is reached");
+                       stream->read_subbuffer_ops.get_next_subbuffer =
+                                       get_next_subbuffer_metadata_check;
+                       ret = consumer_stream_enable_metadata_bucketization(
+                                       stream);
+                       if (ret) {
+                               goto end;
+                       }
+               } else {
+                       /*
+                        * The kernel tracer version is too old to indicate
+                        * when the metadata stream has reached a "coherent"
+                        * (parseable) point.
+                        *
+                        * This means that a live viewer may see an incoherent
+                        * sequence of metadata and fail to parse it.
+                        */
+                       WARN("Kernel tracer does not support get_next_subbuffer_metadata_check which may cause live clients to fail to parse the metadata stream");
+                       metadata_bucket_destroy(stream->metadata_bucket);
+                       stream->metadata_bucket = NULL;
+               }
+       }
+
+       if (!stream->read_subbuffer_ops.get_next_subbuffer) {
+               if (stream->chan->output == CONSUMER_CHANNEL_MMAP) {
+                       stream->read_subbuffer_ops.get_next_subbuffer =
+                                       get_next_subbuffer_mmap;
+               } else {
+                       stream->read_subbuffer_ops.get_next_subbuffer =
+                                       get_next_subbuffer_splice;
+               }
        }
 
        if (stream->metadata_flag) {
@@ -1334,6 +1409,8 @@ static void lttng_kconsumer_set_stream_ops(
        }
 
        stream->read_subbuffer_ops.put_next_subbuffer = put_next_subbuffer;
+end:
+       return ret;
 }
 
 int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
@@ -1391,7 +1468,10 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
                }
        }
 
-       lttng_kconsumer_set_stream_ops(stream);
+       ret = lttng_kconsumer_set_stream_ops(stream);
+       if (ret) {
+               goto error_close_fd;
+       }
 
        /* we return 0 to let the library handle the FD internally */
        return 0;
index 1cb56f283597f918df602a477efaf5b84ad02129..b78295c917c216f3be18fdf95ec0c7af652a8496 100644 (file)
@@ -46,6 +46,7 @@
 #include <common/utils.h>
 #include <common/index/index.h>
 #include <common/consumer/consumer.h>
+#include <common/optional.h>
 
 #include "ust-consumer.h"
 
@@ -1242,7 +1243,7 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id,
 
                        subbuf_view = lttng_buffer_view_init(
                                        subbuf_addr, 0, padded_len);
-                       read_len = lttng_consumer_on_read_subbuffer_mmap(ctx,
+                       read_len = lttng_consumer_on_read_subbuffer_mmap(
                                        stream, &subbuf_view, padded_len - len);
                        if (use_relayd) {
                                if (read_len != len) {
@@ -2169,7 +2170,7 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
        assert(write_len != 0);
        if (write_len < 0) {
                ERR("Writing one metadata packet");
-               ret = -1;
+               ret = write_len;
                goto end;
        }
        stream->ust_metadata_pushed += write_len;
@@ -2493,28 +2494,88 @@ static int get_next_subbuffer_metadata(struct lttng_consumer_stream *stream,
                struct stream_subbuffer *subbuffer)
 {
        int ret;
+       bool cache_empty;
+       bool got_subbuffer;
+       bool coherent;
+       bool buffer_empty;
+       unsigned long consumed_pos, produced_pos;
 
-       ret = ustctl_get_next_subbuf(stream->ustream);
-       if (ret) {
-               ret = commit_one_metadata_packet(stream);
-               if (ret < 0) {
-                       goto end;
-               } else if (ret == 0) {
-                       /* Not an error, the cache is empty. */
-                       ret = -ENODATA;
-                       goto end;
+       do {
+               ret = ustctl_get_next_subbuf(stream->ustream);
+               if (ret == 0) {
+                       got_subbuffer = true;
+               } else {
+                       got_subbuffer = false;
+                       if (ret != -EAGAIN) {
+                               /* Fatal error. */
+                               goto end;
+                       }
                }
 
-               ret = ustctl_get_next_subbuf(stream->ustream);
-               if (ret) {
-                       goto end;
+               /*
+                * Determine if the cache is empty and ensure that a sub-buffer
+                * is made available if the cache is not empty.
+                */
+               if (!got_subbuffer) {
+                       ret = commit_one_metadata_packet(stream);
+                       if (ret < 0 && ret != -ENOBUFS) {
+                               goto end;
+                       } else if (ret == 0) {
+                               /* Not an error, the cache is empty. */
+                               cache_empty = true;
+                               ret = -ENODATA;
+                               goto end;
+                       } else {
+                               cache_empty = false;
+                       }
+               } else {
+                       pthread_mutex_lock(&stream->chan->metadata_cache->lock);
+                       cache_empty = stream->chan->metadata_cache->max_offset ==
+                                     stream->ust_metadata_pushed;
+                       pthread_mutex_unlock(&stream->chan->metadata_cache->lock);
                }
-       }
+       } while (!got_subbuffer);
 
+       /* Populate sub-buffer infos and view. */
        ret = get_next_subbuffer_common(stream, subbuffer);
        if (ret) {
                goto end;
        }
+
+       ret = lttng_ustconsumer_take_snapshot(stream);
+       if (ret < 0) {
+               /*
+                * -EAGAIN is not expected since we got a sub-buffer and haven't
+                * pushed the consumption position yet (on put_next).
+                */
+               PERROR("Failed to take a snapshot of metadata buffer positions");
+               goto end;
+       }
+
+       ret = lttng_ustconsumer_get_consumed_snapshot(stream, &consumed_pos);
+       if (ret) {
+               PERROR("Failed to get metadata consumed position");
+               goto end;
+       }
+
+       ret = lttng_ustconsumer_get_produced_snapshot(stream, &produced_pos);
+       if (ret) {
+               PERROR("Failed to get metadata produced position");
+               goto end;
+       }
+
+       /* Last sub-buffer of the ring buffer ? */
+       buffer_empty = (consumed_pos + stream->max_sb_size) == produced_pos;
+
+       /*
+        * The sessiond registry lock ensures that coherent units of metadata
+        * are pushed to the consumer daemon at once. Hence, if a sub-buffer is
+        * acquired, the cache is empty, and it is the only available sub-buffer
+        * available, it is safe to assume that it is "coherent".
+        */
+       coherent = got_subbuffer && cache_empty && buffer_empty;
+
+       LTTNG_OPTIONAL_SET(&subbuffer->info.metadata.coherent, coherent);
 end:
        return ret;
 }
@@ -2534,9 +2595,11 @@ static int signal_metadata(struct lttng_consumer_stream *stream,
        return pthread_cond_broadcast(&stream->metadata_rdv) ? -errno : 0;
 }
 
-static void lttng_ustconsumer_set_stream_ops(
+static int lttng_ustconsumer_set_stream_ops(
                struct lttng_consumer_stream *stream)
 {
+       int ret = 0;
+
        stream->read_subbuffer_ops.on_wake_up = consumer_stream_ust_on_wake_up;
        if (stream->metadata_flag) {
                stream->read_subbuffer_ops.get_next_subbuffer =
@@ -2545,7 +2608,14 @@ static void lttng_ustconsumer_set_stream_ops(
                                extract_metadata_subbuffer_info;
                stream->read_subbuffer_ops.reset_metadata =
                                metadata_stream_reset_cache;
-               stream->read_subbuffer_ops.on_sleep = signal_metadata;
+               if (stream->chan->is_live) {
+                       stream->read_subbuffer_ops.on_sleep = signal_metadata;
+                       ret = consumer_stream_enable_metadata_bucketization(
+                                       stream);
+                       if (ret) {
+                               goto end;
+                       }
+               }
        } else {
                stream->read_subbuffer_ops.get_next_subbuffer =
                                get_next_subbuffer;
@@ -2559,6 +2629,8 @@ static void lttng_ustconsumer_set_stream_ops(
        }
 
        stream->read_subbuffer_ops.put_next_subbuffer = put_next_subbuffer;
+end:
+       return ret;
 }
 
 /*
This page took 0.038312 seconds and 5 git commands to generate.