X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=32dfd5bb49633e1717f6f63627d0cbde58179c31;hp=514278d91fd930fac88603286e93fec16c7691a5;hb=cbf53d23cecaca9c6ec71582663c4a8254a9f285;hpb=5cfcab67c300abcc2dabe8891525572dfc42ff1d diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 514278d91..32dfd5bb4 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -1079,7 +1079,6 @@ error_stream: * Clean up the stream completly because the next snapshot will use a new * metadata stream. */ - pthread_mutex_lock(&metadata_stream->lock); consumer_stream_destroy(metadata_stream, NULL); cds_list_del(&metadata_stream->send_node); metadata_channel->metadata_stream = NULL; @@ -2040,8 +2039,7 @@ end_rotate_channel_nosignal: *msg.u.create_trace_chunk.override_name ? msg.u.create_trace_chunk.override_name : NULL; - LTTNG_OPTIONAL(struct lttng_directory_handle) chunk_directory_handle = - LTTNG_OPTIONAL_INIT; + struct lttng_directory_handle *chunk_directory_handle = NULL; /* * The session daemon will only provide a chunk directory file @@ -2066,17 +2064,15 @@ end_rotate_channel_nosignal: DBG("Received trace chunk directory fd (%d)", chunk_dirfd); - ret = lttng_directory_handle_init_from_dirfd( - &chunk_directory_handle.value, + chunk_directory_handle = lttng_directory_handle_create_from_dirfd( chunk_dirfd); - if (ret) { + if (!chunk_directory_handle) { ERR("Failed to initialize chunk directory handle from directory file descriptor"); if (close(chunk_dirfd)) { PERROR("Failed to close chunk directory file descriptor"); } goto error_fatal; } - chunk_directory_handle.is_set = true; } ret_code = lttng_consumer_create_trace_chunk( @@ -2089,14 +2085,8 @@ end_rotate_channel_nosignal: msg.u.create_trace_chunk.credentials.is_set ? &credentials : NULL, - chunk_directory_handle.is_set ? - &chunk_directory_handle.value : - NULL); - - if (chunk_directory_handle.is_set) { - lttng_directory_handle_fini( - &chunk_directory_handle.value); - } + chunk_directory_handle); + lttng_directory_handle_put(chunk_directory_handle); goto end_msg_sessiond; } case LTTNG_CONSUMER_CLOSE_TRACE_CHUNK: @@ -2171,7 +2161,6 @@ end_msg_sessiond: end_channel_error: if (channel) { - pthread_mutex_unlock(&channel->lock); /* * Free channel here since no one has a reference to it. We don't * free after that because a stream can store this pointer. @@ -2424,62 +2413,69 @@ static int get_index_values(struct ctf_packet_index *index, struct ustctl_consumer_stream *ustream) { int ret; + uint64_t packet_size, content_size, timestamp_begin, timestamp_end, + events_discarded, stream_id, stream_instance_id, + packet_seq_num; - ret = ustctl_get_timestamp_begin(ustream, &index->timestamp_begin); + ret = ustctl_get_timestamp_begin(ustream, ×tamp_begin); if (ret < 0) { PERROR("ustctl_get_timestamp_begin"); goto error; } - index->timestamp_begin = htobe64(index->timestamp_begin); - ret = ustctl_get_timestamp_end(ustream, &index->timestamp_end); + ret = ustctl_get_timestamp_end(ustream, ×tamp_end); if (ret < 0) { PERROR("ustctl_get_timestamp_end"); goto error; } - index->timestamp_end = htobe64(index->timestamp_end); - ret = ustctl_get_events_discarded(ustream, &index->events_discarded); + ret = ustctl_get_events_discarded(ustream, &events_discarded); if (ret < 0) { PERROR("ustctl_get_events_discarded"); goto error; } - index->events_discarded = htobe64(index->events_discarded); - ret = ustctl_get_content_size(ustream, &index->content_size); + ret = ustctl_get_content_size(ustream, &content_size); if (ret < 0) { PERROR("ustctl_get_content_size"); goto error; } - index->content_size = htobe64(index->content_size); - ret = ustctl_get_packet_size(ustream, &index->packet_size); + ret = ustctl_get_packet_size(ustream, &packet_size); if (ret < 0) { PERROR("ustctl_get_packet_size"); goto error; } - index->packet_size = htobe64(index->packet_size); - ret = ustctl_get_stream_id(ustream, &index->stream_id); + ret = ustctl_get_stream_id(ustream, &stream_id); if (ret < 0) { PERROR("ustctl_get_stream_id"); goto error; } - index->stream_id = htobe64(index->stream_id); - ret = ustctl_get_instance_id(ustream, &index->stream_instance_id); + ret = ustctl_get_instance_id(ustream, &stream_instance_id); if (ret < 0) { PERROR("ustctl_get_instance_id"); goto error; } - index->stream_instance_id = htobe64(index->stream_instance_id); - ret = ustctl_get_sequence_number(ustream, &index->packet_seq_num); + ret = ustctl_get_sequence_number(ustream, &packet_seq_num); if (ret < 0) { PERROR("ustctl_get_sequence_number"); goto error; } - index->packet_seq_num = htobe64(index->packet_seq_num); + + *index = (typeof(*index)) { + .offset = index->offset, + .packet_size = htobe64(packet_size), + .content_size = htobe64(content_size), + .timestamp_begin = htobe64(timestamp_begin), + .timestamp_end = htobe64(timestamp_end), + .events_discarded = htobe64(events_discarded), + .stream_id = htobe64(stream_id), + .stream_instance_id = htobe64(stream_instance_id), + .packet_seq_num = htobe64(packet_seq_num), + }; error: return ret; @@ -2577,37 +2573,59 @@ end: * interacting with sessiond, else we cause a deadlock with live * awaiting on metadata to be pushed out. * + * The RCU read side lock must be held by the caller. + * * Return 0 if new metadatda is available, EAGAIN if the metadata stream * is empty or a negative value on error. */ int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *metadata) + struct lttng_consumer_stream *metadata_stream) { int ret; int retry = 0; + struct lttng_consumer_channel *metadata_channel; assert(ctx); - assert(metadata); + assert(metadata_stream); - pthread_mutex_unlock(&metadata->lock); + metadata_channel = metadata_stream->chan; + pthread_mutex_unlock(&metadata_stream->lock); /* * Request metadata from the sessiond, but don't wait for the flush * because we locked the metadata thread. */ - ret = lttng_ustconsumer_request_metadata(ctx, metadata->chan, 0, 0); - pthread_mutex_lock(&metadata->lock); + ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 0); + pthread_mutex_lock(&metadata_stream->lock); if (ret < 0) { goto end; } - ret = commit_one_metadata_packet(metadata); + /* + * The metadata stream and channel can be deleted while the + * metadata stream lock was released. The streamed is checked + * for deletion before we use it further. + * + * Note that it is safe to access a logically-deleted stream since its + * existence is still guaranteed by the RCU read side lock. However, + * it should no longer be used. The close/deletion of the metadata + * channel and stream already guarantees that all metadata has been + * consumed. Therefore, there is nothing left to do in this function. + */ + if (consumer_stream_is_deleted(metadata_stream)) { + DBG("Metadata stream %" PRIu64 " was deleted during the metadata synchronization", + metadata_stream->key); + ret = 0; + goto end; + } + + ret = commit_one_metadata_packet(metadata_stream); if (ret <= 0) { goto end; } else if (ret > 0) { retry = 1; } - ret = ustctl_snapshot(metadata->ustream); + ret = ustctl_snapshot(metadata_stream->ustream); if (ret < 0) { if (errno != EAGAIN) { ERR("Sync metadata, taking UST snapshot");