X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.c;h=7875e890b6668ddf99ed6ca79bb107d99e5d17e3;hp=904462da68625905fb5293ad28aa269b4c11ab3f;hb=890d8fe47755c3bad936389cf48ffa141cff41c9;hpb=5a510c9fc5ac8b5292a497192bd47aeec07112e3 diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 904462da6..7875e890b 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -16,7 +16,7 @@ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#define _GNU_SOURCE +#define _LGPL_SOURCE #include #include #include @@ -114,7 +114,7 @@ int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, * Returns 0 on success, < 0 on error */ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, - uint64_t relayd_id, uint64_t max_stream_size, + uint64_t relayd_id, uint64_t nb_packets_per_stream, struct lttng_consumer_local_data *ctx) { int ret; @@ -220,14 +220,9 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, } } - /* - * The original value is sent back if max stream size is larger than - * the possible size of the snapshot. Also, we asume that the session - * daemon should never send a maximum stream size that is lower than - * subbuffer size. - */ - consumed_pos = consumer_get_consumed_maxsize(consumed_pos, - produced_pos, max_stream_size); + consumed_pos = consumer_get_consume_start_pos(consumed_pos, + produced_pos, nb_packets_per_stream, + stream->max_sb_size); while (consumed_pos < produced_pos) { ssize_t read_len; @@ -484,7 +479,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.channel.tracefile_size, msg.u.channel.tracefile_count, 0, msg.u.channel.monitor, - msg.u.channel.live_timer_interval); + msg.u.channel.live_timer_interval, + NULL, NULL); if (new_channel == NULL) { lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR); goto end_nosignal; @@ -642,6 +638,10 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, switch (channel->output) { case CONSUMER_CHANNEL_SPLICE: new_stream->output = LTTNG_EVENT_SPLICE; + ret = utils_create_pipe(new_stream->splice_pipe); + if (ret < 0) { + goto end_nosignal; + } break; case CONSUMER_CHANNEL_MMAP: new_stream->output = LTTNG_EVENT_MMAP; @@ -774,7 +774,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, * Send status code to session daemon. */ ret = consumer_send_status_msg(sock, ret_code); - if (ret < 0) { + if (ret < 0 || ret_code != LTTCOMM_CONSUMERD_SUCCESS) { /* Somehow, the session daemon is not responding anymore. */ goto end_nosignal; } @@ -881,7 +881,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = lttng_kconsumer_snapshot_channel(msg.u.snapshot_channel.key, msg.u.snapshot_channel.pathname, msg.u.snapshot_channel.relayd_id, - msg.u.snapshot_channel.max_stream_size, + msg.u.snapshot_channel.nb_packets_per_stream, ctx); if (ret < 0) { ERR("Snapshot channel failed"); @@ -1218,7 +1218,22 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, /* * In live, block until all the metadata is sent. */ + pthread_mutex_lock(&stream->metadata_timer_lock); + assert(!stream->missed_metadata_flush); + stream->waiting_on_metadata = true; + pthread_mutex_unlock(&stream->metadata_timer_lock); + err = consumer_stream_sync_metadata(ctx, stream->session_id); + + pthread_mutex_lock(&stream->metadata_timer_lock); + stream->waiting_on_metadata = false; + if (stream->missed_metadata_flush) { + stream->missed_metadata_flush = false; + pthread_mutex_unlock(&stream->metadata_timer_lock); + (void) consumer_flush_kernel_index(stream); + } else { + pthread_mutex_unlock(&stream->metadata_timer_lock); + } if (err < 0) { goto end; }