X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=209931f5644a3d74dc27ed1a8f7dbb3d3863d387;hb=d1ba29d290281cf72ca3ec7b0222b336c747e925;hp=6d6690a32161a4cbc24e05157f31407fe9c1df74;hpb=f5ba75b4f0c0b44092c76bc931b25b24a2e62718;p=lttng-tools.git diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 6d6690a32..209931f56 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -78,6 +78,11 @@ static void destroy_channel(struct lttng_consumer_channel *channel) lttng_ustconsumer_del_channel(channel); lttng_ustconsumer_free_channel(channel); } + + if (channel->trace_chunk) { + lttng_trace_chunk_put(channel->trace_chunk); + } + free(channel); } @@ -2176,6 +2181,28 @@ end_rotate_channel_nosignal: msg.u.trace_chunk_exists.chunk_id); goto end_msg_sessiond; } + case LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS: + { + const uint64_t key = msg.u.open_channel_packets.key; + struct lttng_consumer_channel *channel = + consumer_find_channel(key); + + if (channel) { + pthread_mutex_lock(&channel->lock); + ret_code = lttng_consumer_open_channel_packets(channel); + pthread_mutex_unlock(&channel->lock); + } else { + /* + * The channel could have disappeared in per-pid + * buffering mode. + */ + DBG("Channel %" PRIu64 " not found", key); + ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; + } + + health_code_update(); + goto end_msg_sessiond; + } default: break; } @@ -2430,20 +2457,19 @@ int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream) } static -void metadata_stream_reset_cache(struct lttng_consumer_stream *stream) +void metadata_stream_reset_cache_consumed_position( + struct lttng_consumer_stream *stream) { DBG("Reset metadata cache of session %" PRIu64, stream->chan->session_id); stream->ust_metadata_pushed = 0; - stream->metadata_version = stream->chan->metadata_cache->version; - stream->reset_metadata_flag = 1; } /* * Write up to one packet from the metadata cache to the channel. * - * Returns the number of bytes pushed in the cache, or a negative value - * on error. + * Returns the number of bytes pushed from the cache into the ring buffer, or a + * negative value on error. */ static int commit_one_metadata_packet(struct lttng_consumer_stream *stream) @@ -2452,10 +2478,41 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream) int ret; pthread_mutex_lock(&stream->chan->metadata_cache->lock); - if (stream->chan->metadata_cache->max_offset - == stream->ust_metadata_pushed) { - ret = 0; - goto end; + if (stream->chan->metadata_cache->max_offset == + stream->ust_metadata_pushed) { + /* + * In the context of a user space metadata channel, a + * change in version can be detected in two ways: + * 1) During the pre-consume of the `read_subbuffer` loop, + * 2) When populating the metadata ring buffer (i.e. here). + * + * This function is invoked when there is no metadata + * available in the ring-buffer. If all data was consumed + * up to the size of the metadata cache, there is no metadata + * to insert in the ring-buffer. + * + * However, the metadata version could still have changed (a + * regeneration without any new data will yield the same cache + * size). + * + * The cache's version is checked for a version change and the + * consumed position is reset if one occurred. + * + * This check is only necessary for the user space domain as + * it has to manage the cache explicitly. If this reset was not + * performed, no metadata would be consumed (and no reset would + * occur as part of the pre-consume) until the metadata size + * exceeded the cache size. + */ + if (stream->metadata_version != + stream->chan->metadata_cache->version) { + metadata_stream_reset_cache_consumed_position(stream); + consumer_stream_metadata_set_version(stream, + stream->chan->metadata_cache->version); + } else { + ret = 0; + goto end; + } } write_len = ustctl_write_one_packet_to_channel(stream->chan->uchan, @@ -2496,15 +2553,13 @@ end: * awaiting on metadata to be pushed out. * * The RCU read side lock must be held by the caller. - * - * Return 0 if new metadatda is available, EAGAIN if the metadata stream - * is empty or a negative value on error. */ -int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx, +enum sync_metadata_status lttng_ustconsumer_sync_metadata( + struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *metadata_stream) { int ret; - int retry = 0; + enum sync_metadata_status status; struct lttng_consumer_channel *metadata_channel; assert(ctx); @@ -2519,6 +2574,7 @@ int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx, ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 0); pthread_mutex_lock(&metadata_stream->lock); if (ret < 0) { + status = SYNC_METADATA_STATUS_ERROR; goto end; } @@ -2536,38 +2592,30 @@ int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx, if (consumer_stream_is_deleted(metadata_stream)) { DBG("Metadata stream %" PRIu64 " was deleted during the metadata synchronization", metadata_stream->key); - ret = 0; + status = SYNC_METADATA_STATUS_NO_DATA; goto end; } ret = commit_one_metadata_packet(metadata_stream); - if (ret <= 0) { + if (ret < 0) { + status = SYNC_METADATA_STATUS_ERROR; goto end; } else if (ret > 0) { - retry = 1; + status = SYNC_METADATA_STATUS_NEW_DATA; + } else /* ret == 0 */ { + status = SYNC_METADATA_STATUS_NO_DATA; + goto end; } ret = ustctl_snapshot(metadata_stream->ustream); if (ret < 0) { - if (errno != EAGAIN) { - ERR("Sync metadata, taking UST snapshot"); - goto end; - } - DBG("No new metadata when syncing them."); - /* No new metadata, exit. */ - ret = ENODATA; + ERR("Failed to take a snapshot of the metadata ring-buffer positions, ret = %d", ret); + status = SYNC_METADATA_STATUS_ERROR; goto end; } - /* - * After this flush, we still need to extract metadata. - */ - if (retry) { - ret = EAGAIN; - } - end: - return ret; + return status; } /* @@ -2680,7 +2728,7 @@ static int extract_metadata_subbuffer_info(struct lttng_consumer_stream *stream, goto end; } - subbuf->info.metadata.version = stream->chan->metadata_cache->version; + subbuf->info.metadata.version = stream->metadata_version; end: return ret; @@ -2924,7 +2972,7 @@ static int lttng_ustconsumer_set_stream_ops( stream->read_subbuffer_ops.extract_subbuffer_info = extract_metadata_subbuffer_info; stream->read_subbuffer_ops.reset_metadata = - metadata_stream_reset_cache; + metadata_stream_reset_cache_consumed_position; if (stream->chan->is_live) { stream->read_subbuffer_ops.on_sleep = signal_metadata; ret = consumer_stream_enable_metadata_bucketization(