X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer-stream.c;h=bf3c2f807d65e98640edf3419e4faebb59366213;hp=a705f1b18e72ff6610dce543d5eaa062e3ff99b2;hb=d6ef77b34f2ba071e280bc8c25936e95bf1094d9;hpb=29d1a7ae5d5b46002e5883366a7b06f1d9dcc115 diff --git a/src/common/consumer/consumer-stream.c b/src/common/consumer/consumer-stream.c index a705f1b18..bf3c2f807 100644 --- a/src/common/consumer/consumer-stream.c +++ b/src/common/consumer/consumer-stream.c @@ -31,6 +31,7 @@ #include #include #include +#include #include "consumer-stream.h" @@ -162,7 +163,7 @@ static ssize_t consumer_stream_consume_mmap( subbuffer->info.data.subbuf_size; return lttng_consumer_on_read_subbuffer_mmap( - ctx, stream, &subbuffer->buffer.buffer, padding_size); + stream, &subbuffer->buffer.buffer, padding_size); } static ssize_t consumer_stream_consume_splice( @@ -404,6 +405,10 @@ int metadata_stream_check_version(struct lttng_consumer_stream *stream, stream->metadata_version = subbuffer->info.metadata.version; stream->reset_metadata_flag = 1; + if (stream->metadata_bucket) { + metadata_bucket_reset(stream->metadata_bucket); + } + if (stream->read_subbuffer_ops.reset_metadata) { stream->read_subbuffer_ops.reset_metadata(stream); } @@ -728,6 +733,7 @@ void consumer_stream_free(struct lttng_consumer_stream *stream) { assert(stream); + metadata_bucket_destroy(stream->metadata_bucket); call_rcu(&stream->node.head, free_stream_rcu); } @@ -897,4 +903,60 @@ int consumer_stream_write_index(struct lttng_consumer_stream *stream, error: rcu_read_unlock(); return ret; -} \ No newline at end of file +} + +static ssize_t metadata_bucket_flush( + const struct stream_subbuffer *buffer, void *data) +{ + ssize_t ret; + struct lttng_consumer_stream *stream = data; + + ret = consumer_stream_consume_mmap(NULL, stream, buffer); + if (ret < 0) { + goto end; + } +end: + return ret; +} + +static ssize_t metadata_bucket_consume( + struct lttng_consumer_local_data *unused, + struct lttng_consumer_stream *stream, + const struct stream_subbuffer *subbuffer) +{ + ssize_t ret; + enum metadata_bucket_status status; + + status = metadata_bucket_fill(stream->metadata_bucket, subbuffer); + switch (status) { + case METADATA_BUCKET_STATUS_OK: + /* Return consumed size. */ + ret = subbuffer->buffer.buffer.size; + break; + default: + ret = -1; + } + + return ret; +} + +int consumer_stream_enable_metadata_bucketization( + struct lttng_consumer_stream *stream) +{ + int ret = 0; + + assert(stream->metadata_flag); + assert(!stream->metadata_bucket); + assert(stream->chan->output == CONSUMER_CHANNEL_MMAP); + + stream->metadata_bucket = metadata_bucket_create( + metadata_bucket_flush, stream); + if (!stream->metadata_bucket) { + ret = -1; + goto end; + } + + stream->read_subbuffer_ops.consume_subbuffer = metadata_bucket_consume; +end: + return ret; +}