X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.c;h=b9d843714550c5821d3b091eb7fb6e390a3c6786;hp=e06f3b3e079aae44f494ba6d4409f02120018af4;hb=2463b7879c00298daa79744cdaae82ac061a4ed8;hpb=6f9449c22eef59294cf1e1dc3610a5cbf14baec0 diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index e06f3b3e0..b9d843714 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -36,7 +37,7 @@ #include #include #include -#include +#include #include "kernel-consumer.h" @@ -287,7 +288,7 @@ static int lttng_kconsumer_snapshot_channel( subbuf_view = lttng_buffer_view_init( subbuf_addr, 0, padded_len); - read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, + read_len = lttng_consumer_on_read_subbuffer_mmap( stream, &subbuf_view, padded_len - len); /* @@ -1206,8 +1207,8 @@ error_rotate_channel: case LTTNG_CONSUMER_CREATE_TRACE_CHUNK: { const struct lttng_credentials credentials = { - .uid = msg.u.create_trace_chunk.credentials.value.uid, - .gid = msg.u.create_trace_chunk.credentials.value.gid, + .uid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.uid), + .gid = LTTNG_OPTIONAL_INIT_VALUE(msg.u.create_trace_chunk.credentials.value.gid), }; const bool is_local_trace = !msg.u.create_trace_chunk.relayd_id.is_set; @@ -1310,6 +1311,24 @@ error_rotate_channel: msg.u.trace_chunk_exists.chunk_id); goto end_msg_sessiond; } + case LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS: + { + const uint64_t key = msg.u.open_channel_packets.key; + struct lttng_consumer_channel *channel = + consumer_find_channel(key); + + if (channel) { + pthread_mutex_lock(&channel->lock); + ret_code = lttng_consumer_open_channel_packets(channel); + pthread_mutex_unlock(&channel->lock); + } else { + WARN("Channel %" PRIu64 " not found", key); + ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; + } + + health_code_update(); + goto end_msg_sessiond; + } default: goto end_nosignal; } @@ -1347,36 +1366,38 @@ end: * metadata thread can consumer them. * * Metadata stream lock MUST be acquired. - * - * Return 0 if new metadatda is available, EAGAIN if the metadata stream - * is empty or a negative value on error. */ -int lttng_kconsumer_sync_metadata(struct lttng_consumer_stream *metadata) +enum sync_metadata_status lttng_kconsumer_sync_metadata( + struct lttng_consumer_stream *metadata) { int ret; + enum sync_metadata_status status; assert(metadata); ret = kernctl_buffer_flush(metadata->wait_fd); if (ret < 0) { ERR("Failed to flush kernel stream"); + status = SYNC_METADATA_STATUS_ERROR; goto end; } ret = kernctl_snapshot(metadata->wait_fd); if (ret < 0) { - if (ret != -EAGAIN) { + if (errno == EAGAIN) { + /* No new metadata, exit. */ + DBG("Sync metadata, no new kernel metadata"); + status = SYNC_METADATA_STATUS_NO_DATA; + } else { ERR("Sync metadata, taking kernel snapshot failed."); - goto end; + status = SYNC_METADATA_STATUS_ERROR; } - DBG("Sync metadata, no new kernel metadata"); - /* No new metadata, exit. */ - ret = ENODATA; - goto end; + } else { + status = SYNC_METADATA_STATUS_NEW_DATA; } end: - return ret; + return status; } static @@ -1558,6 +1579,42 @@ end: return ret; } +static +int get_next_subbuffer_metadata_check(struct lttng_consumer_stream *stream, + struct stream_subbuffer *subbuffer) +{ + int ret; + const char *addr; + bool coherent; + + ret = kernctl_get_next_subbuf_metadata_check(stream->wait_fd, + &coherent); + if (ret) { + goto end; + } + + ret = stream->read_subbuffer_ops.extract_subbuffer_info( + stream, subbuffer); + if (ret) { + goto end; + } + + LTTNG_OPTIONAL_SET(&subbuffer->info.metadata.coherent, coherent); + + ret = get_current_subbuf_addr(stream, &addr); + if (ret) { + goto end; + } + + subbuffer->buffer.buffer = lttng_buffer_view_init( + addr, 0, subbuffer->info.data.padded_subbuf_size); + DBG("Got metadata packet with padded_subbuf_size = %lu, coherent = %s", + subbuffer->info.metadata.padded_subbuf_size, + coherent ? "true" : "false"); +end: + return ret; +} + static int put_next_subbuffer(struct lttng_consumer_stream *stream, struct stream_subbuffer *subbuffer) @@ -1576,15 +1633,53 @@ int put_next_subbuffer(struct lttng_consumer_stream *stream, return ret; } -static void lttng_kconsumer_set_stream_ops( +static +bool is_get_next_check_metadata_available(int tracer_fd) +{ + return kernctl_get_next_subbuf_metadata_check(tracer_fd, NULL) != + -ENOTTY; +} + +static +int lttng_kconsumer_set_stream_ops( struct lttng_consumer_stream *stream) { - if (stream->chan->output == CONSUMER_CHANNEL_MMAP) { - stream->read_subbuffer_ops.get_next_subbuffer = - get_next_subbuffer_mmap; - } else { - stream->read_subbuffer_ops.get_next_subbuffer = - get_next_subbuffer_splice; + int ret = 0; + + if (stream->metadata_flag && stream->chan->is_live) { + DBG("Attempting to enable metadata bucketization for live consumers"); + if (is_get_next_check_metadata_available(stream->wait_fd)) { + DBG("Kernel tracer supports get_next_subbuffer_metadata_check, metadata will be accumulated until a coherent state is reached"); + stream->read_subbuffer_ops.get_next_subbuffer = + get_next_subbuffer_metadata_check; + ret = consumer_stream_enable_metadata_bucketization( + stream); + if (ret) { + goto end; + } + } else { + /* + * The kernel tracer version is too old to indicate + * when the metadata stream has reached a "coherent" + * (parseable) point. + * + * This means that a live viewer may see an incoherent + * sequence of metadata and fail to parse it. + */ + WARN("Kernel tracer does not support get_next_subbuffer_metadata_check which may cause live clients to fail to parse the metadata stream"); + metadata_bucket_destroy(stream->metadata_bucket); + stream->metadata_bucket = NULL; + } + } + + if (!stream->read_subbuffer_ops.get_next_subbuffer) { + if (stream->chan->output == CONSUMER_CHANNEL_MMAP) { + stream->read_subbuffer_ops.get_next_subbuffer = + get_next_subbuffer_mmap; + } else { + stream->read_subbuffer_ops.get_next_subbuffer = + get_next_subbuffer_splice; + } } if (stream->metadata_flag) { @@ -1600,6 +1695,8 @@ static void lttng_kconsumer_set_stream_ops( } stream->read_subbuffer_ops.put_next_subbuffer = put_next_subbuffer; +end: + return ret; } int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) @@ -1640,7 +1737,10 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream) } } - lttng_kconsumer_set_stream_ops(stream); + ret = lttng_kconsumer_set_stream_ops(stream); + if (ret) { + goto error_close_fd; + } /* we return 0 to let the library handle the FD internally */ return 0;