X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.c;h=a94749635246a04f195cf7c0ff577ec9abae3dd1;hb=5ab669088a63bfc0a138ad8418cea17a58789281;hp=5c0dce45d27e87337c3c3190551522f122c4b49f;hpb=fdf9986c9639bbe13935351a6e3c3137e3160383;p=lttng-tools.git diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 5c0dce45d..a94749635 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -110,7 +110,8 @@ int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, * Returns 0 on success, < 0 on error */ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, - uint64_t relayd_id, struct lttng_consumer_local_data *ctx) + uint64_t relayd_id, uint64_t max_stream_size, + struct lttng_consumer_local_data *ctx) { int ret; unsigned long consumed_pos, produced_pos; @@ -203,6 +204,15 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, } } + /* + * The original value is sent back if max stream size is larger than + * the possible size of the snapshot. Also, we asume that the session + * daemon should never send a maximum stream size that is lower than + * subbuffer size. + */ + consumed_pos = consumer_get_consumed_maxsize(consumed_pos, + produced_pos, max_stream_size); + while (consumed_pos < produced_pos) { ssize_t read_len; unsigned long len, padded_len; @@ -643,17 +653,38 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* Get the right pipe where the stream will be sent. */ if (new_stream->metadata_flag) { + ret = consumer_add_metadata_stream(new_stream); + if (ret) { + ERR("Consumer add metadata stream %" PRIu64 " failed. Continuing", + new_stream->key); + consumer_stream_free(new_stream); + goto end_nosignal; + } stream_pipe = ctx->consumer_metadata_pipe; } else { + ret = consumer_add_data_stream(new_stream); + if (ret) { + ERR("Consumer add stream %" PRIu64 " failed. Continuing", + new_stream->key); + consumer_stream_free(new_stream); + goto end_nosignal; + } stream_pipe = ctx->consumer_data_pipe; } + /* Vitible to other threads */ + new_stream->globally_visible = 1; + ret = lttng_pipe_write(stream_pipe, &new_stream, sizeof(new_stream)); if (ret < 0) { ERR("Consumer write %s stream to pipe %d", new_stream->metadata_flag ? "metadata" : "data", lttng_pipe_get_writefd(stream_pipe)); - consumer_stream_free(new_stream); + if (new_stream->metadata_flag) { + consumer_del_stream_for_metadata(new_stream); + } else { + consumer_del_stream_for_data(new_stream); + } goto end_nosignal; } @@ -737,7 +768,9 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } else { ret = lttng_kconsumer_snapshot_channel(msg.u.snapshot_channel.key, msg.u.snapshot_channel.pathname, - msg.u.snapshot_channel.relayd_id, ctx); + msg.u.snapshot_channel.relayd_id, + msg.u.snapshot_channel.max_stream_size, + ctx); if (ret < 0) { ERR("Snapshot channel failed"); ret_code = LTTNG_ERR_KERN_CHAN_FAIL; @@ -993,6 +1026,11 @@ int lttng_kconsumer_data_pending(struct lttng_consumer_stream *stream) assert(stream); + if (stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) { + ret = 0; + goto end; + } + ret = kernctl_get_next_subbuf(stream->wait_fd); if (ret == 0) { /* There is still data so let's put back this subbuffer. */