X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.c;h=0086d748f0c6fe17fd54e87ccffdf7384f2ae447;hp=82fe0168f2c85b11cac63390ddd427a763dc4149;hb=6b584c2ece62c620a86cf438bb358a9da7962b7e;hpb=b6921a1724c571a787e3600c5dae4d9efeef0290 diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 82fe0168f..0086d748f 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -1408,6 +1408,7 @@ void lttng_consumer_cleanup(void) { struct lttng_ht_iter iter; struct lttng_consumer_channel *channel; + unsigned int trace_chunks_left; rcu_read_lock(); @@ -1432,6 +1433,27 @@ void lttng_consumer_cleanup(void) */ lttng_ht_destroy(consumer_data.stream_list_ht); + /* + * Trace chunks in the registry may still exist if the session + * daemon has encountered an internal error and could not + * tear down its sessions and/or trace chunks properly. + * + * Release the session daemon's implicit reference to any remaining + * trace chunk and print an error if any trace chunk was found. Note + * that there are _no_ legitimate cases for trace chunks to be left, + * it is a leak. However, it can happen following a crash of the + * session daemon and not emptying the registry would cause an assertion + * to hit. + */ + trace_chunks_left = lttng_trace_chunk_registry_put_each_chunk( + consumer_data.chunk_registry); + if (trace_chunks_left) { + ERR("%u trace chunks are leaked by lttng-consumerd. " + "This can be caused by an internal error of the session daemon.", + trace_chunks_left); + } + /* Run all callbacks freeing each chunk. */ + rcu_barrier(); lttng_trace_chunk_registry_destroy(consumer_data.chunk_registry); } @@ -2264,7 +2286,8 @@ void lttng_consumer_close_all_metadata(void) void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht) { - struct lttng_consumer_channel *free_chan = NULL; + struct lttng_consumer_channel *channel = NULL; + bool free_channel = false; assert(stream); /* @@ -2276,11 +2299,17 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, DBG3("Consumer delete metadata stream %d", stream->wait_fd); pthread_mutex_lock(&consumer_data.lock); - pthread_mutex_lock(&stream->chan->lock); + /* + * Note that this assumes that a stream's channel is never changed and + * that the stream's lock doesn't need to be taken to sample its + * channel. + */ + channel = stream->chan; + pthread_mutex_lock(&channel->lock); pthread_mutex_lock(&stream->lock); - if (stream->chan->metadata_cache) { + if (channel->metadata_cache) { /* Only applicable to userspace consumers. */ - pthread_mutex_lock(&stream->chan->metadata_cache->lock); + pthread_mutex_lock(&channel->metadata_cache->lock); } /* Remove any reference to that stream. */ @@ -2292,28 +2321,29 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, consumer_stream_destroy_buffers(stream); /* Atomically decrement channel refcount since other threads can use it. */ - if (!uatomic_sub_return(&stream->chan->refcount, 1) - && !uatomic_read(&stream->chan->nb_init_stream_left)) { + if (!uatomic_sub_return(&channel->refcount, 1) + && !uatomic_read(&channel->nb_init_stream_left)) { /* Go for channel deletion! */ - free_chan = stream->chan; + free_channel = true; } + stream->chan = NULL; /* * Nullify the stream reference so it is not used after deletion. The * channel lock MUST be acquired before being able to check for a NULL * pointer value. */ - stream->chan->metadata_stream = NULL; + channel->metadata_stream = NULL; - if (stream->chan->metadata_cache) { - pthread_mutex_unlock(&stream->chan->metadata_cache->lock); + if (channel->metadata_cache) { + pthread_mutex_unlock(&channel->metadata_cache->lock); } pthread_mutex_unlock(&stream->lock); - pthread_mutex_unlock(&stream->chan->lock); + pthread_mutex_unlock(&channel->lock); pthread_mutex_unlock(&consumer_data.lock); - if (free_chan) { - consumer_del_channel(free_chan); + if (free_channel) { + consumer_del_channel(channel); } lttng_trace_chunk_put(stream->trace_chunk); @@ -4431,7 +4461,7 @@ enum lttcomm_return_code lttng_consumer_create_trace_chunk( { int ret; enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS; - struct lttng_trace_chunk *created_chunk, *published_chunk; + struct lttng_trace_chunk *created_chunk = NULL, *published_chunk = NULL; enum lttng_trace_chunk_status chunk_status; char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)]; char creation_timestamp_buffer[ISO8601_STR_LEN]; @@ -4486,7 +4516,7 @@ enum lttcomm_return_code lttng_consumer_create_trace_chunk( if (!created_chunk) { ERR("Failed to create trace chunk"); ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED; - goto end; + goto error; } if (chunk_override_name) { @@ -4494,7 +4524,7 @@ enum lttcomm_return_code lttng_consumer_create_trace_chunk( chunk_override_name); if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED; - goto end; + goto error; } } @@ -4504,7 +4534,7 @@ enum lttcomm_return_code lttng_consumer_create_trace_chunk( if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { ERR("Failed to set trace chunk credentials"); ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED; - goto end; + goto error; } /* * The consumer daemon has no ownership of the chunk output @@ -4515,7 +4545,7 @@ enum lttcomm_return_code lttng_consumer_create_trace_chunk( if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { ERR("Failed to set trace chunk's directory handle"); ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED; - goto end; + goto error; } } @@ -4527,7 +4557,7 @@ enum lttcomm_return_code lttng_consumer_create_trace_chunk( if (!published_chunk) { ERR("Failed to publish trace chunk"); ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED; - goto end; + goto error; } rcu_read_lock(); @@ -4550,11 +4580,13 @@ enum lttcomm_return_code lttng_consumer_create_trace_chunk( * channels. */ enum lttcomm_return_code close_ret; + char path[LTTNG_PATH_MAX]; DBG("Failed to set new trace chunk on existing channels, rolling back"); close_ret = lttng_consumer_close_trace_chunk(relayd_id, session_id, chunk_id, - chunk_creation_timestamp, NULL); + chunk_creation_timestamp, NULL, + path); if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) { ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64, session_id, chunk_id); @@ -4580,12 +4612,13 @@ enum lttcomm_return_code lttng_consumer_create_trace_chunk( if (!relayd || ret) { enum lttcomm_return_code close_ret; + char path[LTTNG_PATH_MAX]; close_ret = lttng_consumer_close_trace_chunk(relayd_id, session_id, chunk_id, chunk_creation_timestamp, - NULL); + NULL, path); if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) { ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64, session_id, @@ -4593,21 +4626,23 @@ enum lttcomm_return_code lttng_consumer_create_trace_chunk( } ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED; - goto error; + goto error_unlock; } } -error: +error_unlock: rcu_read_unlock(); +error: /* Release the reference returned by the "publish" operation. */ lttng_trace_chunk_put(published_chunk); -end: + lttng_trace_chunk_put(created_chunk); return ret_code; } enum lttcomm_return_code lttng_consumer_close_trace_chunk( const uint64_t *relayd_id, uint64_t session_id, uint64_t chunk_id, time_t chunk_close_timestamp, - const enum lttng_trace_chunk_command_type *close_command) + const enum lttng_trace_chunk_command_type *close_command, + char *path) { enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS; struct lttng_trace_chunk *chunk; @@ -4705,7 +4740,8 @@ enum lttcomm_return_code lttng_consumer_close_trace_chunk( if (relayd) { pthread_mutex_lock(&relayd->ctrl_sock_mutex); ret = relayd_close_trace_chunk( - &relayd->control_sock, chunk); + &relayd->control_sock, chunk, + path); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); } else { ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64, @@ -4736,12 +4772,11 @@ enum lttcomm_return_code lttng_consumer_trace_chunk_exists( { int ret; enum lttcomm_return_code ret_code; - struct lttng_trace_chunk *chunk; char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)]; const char *relayd_id_str = "(none)"; const bool is_local_trace = !relayd_id; struct consumer_relayd_sock_pair *relayd = NULL; - bool chunk_exists_remote; + bool chunk_exists_local, chunk_exists_remote; if (relayd_id) { int ret; @@ -4759,13 +4794,19 @@ enum lttcomm_return_code lttng_consumer_trace_chunk_exists( DBG("Consumer trace chunk exists command: relayd_id = %s" ", chunk_id = %" PRIu64, relayd_id_str, chunk_id); - chunk = lttng_trace_chunk_registry_find_chunk( + ret = lttng_trace_chunk_registry_chunk_exists( consumer_data.chunk_registry, session_id, - chunk_id); - DBG("Trace chunk %s locally", chunk ? "exists" : "does not exist"); - if (chunk) { + chunk_id, &chunk_exists_local); + if (ret) { + /* Internal error. */ + ERR("Failed to query the existence of a trace chunk"); + ret_code = LTTCOMM_CONSUMERD_FATAL; + goto end_rcu_unlock; + } + DBG("Trace chunk %s locally", + chunk_exists_local ? "exists" : "does not exist"); + if (chunk_exists_local) { ret_code = LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_LOCAL; - lttng_trace_chunk_put(chunk); goto end; } else if (is_local_trace) { ret_code = LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;