Fix: consumerd: live client receives incomplete metadata
[lttng-tools.git] / src / common / consumer / consumer-stream.c
index a705f1b18e72ff6610dce543d5eaa062e3ff99b2..bf3c2f807d65e98640edf3419e4faebb59366213 100644 (file)
@@ -31,6 +31,7 @@
 #include <common/utils.h>
 #include <common/consumer/consumer.h>
 #include <common/consumer/consumer-timer.h>
+#include <common/consumer/metadata-bucket.h>
 
 #include "consumer-stream.h"
 
@@ -162,7 +163,7 @@ static ssize_t consumer_stream_consume_mmap(
                        subbuffer->info.data.subbuf_size;
 
        return lttng_consumer_on_read_subbuffer_mmap(
-                       ctx, stream, &subbuffer->buffer.buffer, padding_size);
+                       stream, &subbuffer->buffer.buffer, padding_size);
 }
 
 static ssize_t consumer_stream_consume_splice(
@@ -404,6 +405,10 @@ int metadata_stream_check_version(struct lttng_consumer_stream *stream,
        stream->metadata_version = subbuffer->info.metadata.version;
        stream->reset_metadata_flag = 1;
 
+       if (stream->metadata_bucket) {
+               metadata_bucket_reset(stream->metadata_bucket);
+       }
+
        if (stream->read_subbuffer_ops.reset_metadata) {
                stream->read_subbuffer_ops.reset_metadata(stream);
        }
@@ -728,6 +733,7 @@ void consumer_stream_free(struct lttng_consumer_stream *stream)
 {
        assert(stream);
 
+       metadata_bucket_destroy(stream->metadata_bucket);
        call_rcu(&stream->node.head, free_stream_rcu);
 }
 
@@ -897,4 +903,60 @@ int consumer_stream_write_index(struct lttng_consumer_stream *stream,
 error:
        rcu_read_unlock();
        return ret;
-}
\ No newline at end of file
+}
+
+static ssize_t metadata_bucket_flush(
+               const struct stream_subbuffer *buffer, void *data)
+{
+       ssize_t ret;
+       struct lttng_consumer_stream *stream = data;
+
+       ret = consumer_stream_consume_mmap(NULL, stream, buffer);
+       if (ret < 0) {
+               goto end;
+       }
+end:
+       return ret;
+}
+
+static ssize_t metadata_bucket_consume(
+               struct lttng_consumer_local_data *unused,
+               struct lttng_consumer_stream *stream,
+               const struct stream_subbuffer *subbuffer)
+{
+       ssize_t ret;
+       enum metadata_bucket_status status;
+
+       status = metadata_bucket_fill(stream->metadata_bucket, subbuffer);
+       switch (status) {
+       case METADATA_BUCKET_STATUS_OK:
+               /* Return consumed size. */
+               ret = subbuffer->buffer.buffer.size;
+               break;
+       default:
+               ret = -1;
+       }
+
+       return ret;
+}
+
+int consumer_stream_enable_metadata_bucketization(
+               struct lttng_consumer_stream *stream)
+{
+       int ret = 0;
+
+       assert(stream->metadata_flag);
+       assert(!stream->metadata_bucket);
+       assert(stream->chan->output == CONSUMER_CHANNEL_MMAP);
+
+       stream->metadata_bucket = metadata_bucket_create(
+                       metadata_bucket_flush, stream);
+       if (!stream->metadata_bucket) {
+               ret = -1;
+               goto end;
+       }
+
+       stream->read_subbuffer_ops.consume_subbuffer = metadata_bucket_consume;
+end:
+       return ret;
+}
This page took 0.051043 seconds and 5 git commands to generate.