X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.c;h=3c51cfa6f40221cff9c2ee8ce65b8788c3ca7afc;hp=82be751c40ef82e83e47e10e785caa0d131cffbe;hb=1f8d1c146ba9c46338d04c416a75ee6d105c5c50;hpb=a6ef8ee6a852f9fa6394408fcc8d4d49891852d9 diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 82be751c4..3c51cfa6f 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -709,11 +709,6 @@ void consumer_add_data_stream(struct lttng_consumer_stream *stream) pthread_mutex_unlock(&consumer_data.lock); } -void consumer_del_data_stream(struct lttng_consumer_stream *stream) -{ - consumer_del_stream(stream, data_ht); -} - /* * Add relayd socket to global consumer data hashtable. RCU read side lock MUST * be acquired before calling this. @@ -1014,13 +1009,6 @@ int lttng_consumer_channel_set_trace_chunk( struct lttng_consumer_channel *channel, struct lttng_trace_chunk *new_trace_chunk) { - int ret = 0; - const bool is_local_trace = channel->relayd_id == -1ULL; - bool update_stream_trace_chunk; - struct cds_lfht_iter iter; - struct lttng_consumer_stream *stream; - unsigned long channel_hash; - pthread_mutex_lock(&channel->lock); if (channel->is_deleted) { /* @@ -1032,24 +1020,6 @@ int lttng_consumer_channel_set_trace_chunk( */ goto end; } - /* - * A stream can transition to a state where it and its channel - * no longer belong to a trace chunk. For instance, this happens when - * a session is rotated while it is inactive. After the rotation - * of an inactive session completes, the channel and its streams no - * longer belong to a trace chunk. - * - * However, if a session is stopped, rotated, and started again, - * the session daemon will create a new chunk and send it to its peers. - * In that case, the streams' transition to a new chunk can be performed - * immediately. - * - * This trace chunk transition could also be performed lazily when - * a buffer is consumed. However, creating the files here allows the - * consumer daemon to report any creation error to the session daemon - * and cause the start of the tracing session to fail. - */ - update_stream_trace_chunk = !channel->trace_chunk && new_trace_chunk; /* * The acquisition of the reference cannot fail (barring @@ -1065,59 +1035,9 @@ int lttng_consumer_channel_set_trace_chunk( lttng_trace_chunk_put(channel->trace_chunk); channel->trace_chunk = new_trace_chunk; - if (!is_local_trace || !new_trace_chunk) { - /* Not an error. */ - goto end; - } - - if (!update_stream_trace_chunk) { - goto end; - } - - channel_hash = consumer_data.stream_per_chan_id_ht->hash_fct( - &channel->key, lttng_ht_seed); - rcu_read_lock(); - cds_lfht_for_each_entry_duplicate(consumer_data.stream_per_chan_id_ht->ht, - channel_hash, - consumer_data.stream_per_chan_id_ht->match_fct, - &channel->key, &iter, stream, node_channel_id.node) { - bool acquired_reference, should_regenerate_metadata = false; - - acquired_reference = lttng_trace_chunk_get(channel->trace_chunk); - assert(acquired_reference); - - pthread_mutex_lock(&stream->lock); - - /* - * On a transition from "no-chunk" to a new chunk, a metadata - * stream's content must be entirely dumped. This must occcur - * _after_ the creation of the metadata stream's output files - * as the consumption thread (not necessarily the one executing - * this) may start to consume during the call to - * consumer_metadata_stream_dump(). - */ - should_regenerate_metadata = - stream->metadata_flag && - !stream->trace_chunk && channel->trace_chunk; - stream->trace_chunk = channel->trace_chunk; - ret = consumer_stream_create_output_files(stream, true); - if (ret) { - pthread_mutex_unlock(&stream->lock); - goto end_rcu_unlock; - } - if (should_regenerate_metadata) { - ret = consumer_metadata_stream_dump(stream); - } - pthread_mutex_unlock(&stream->lock); - if (ret) { - goto end_rcu_unlock; - } - } -end_rcu_unlock: - rcu_read_unlock(); end: pthread_mutex_unlock(&channel->lock); - return ret; + return 0; } /* @@ -1408,6 +1328,7 @@ void lttng_consumer_cleanup(void) { struct lttng_ht_iter iter; struct lttng_consumer_channel *channel; + unsigned int trace_chunks_left; rcu_read_lock(); @@ -1432,6 +1353,27 @@ void lttng_consumer_cleanup(void) */ lttng_ht_destroy(consumer_data.stream_list_ht); + /* + * Trace chunks in the registry may still exist if the session + * daemon has encountered an internal error and could not + * tear down its sessions and/or trace chunks properly. + * + * Release the session daemon's implicit reference to any remaining + * trace chunk and print an error if any trace chunk was found. Note + * that there are _no_ legitimate cases for trace chunks to be left, + * it is a leak. However, it can happen following a crash of the + * session daemon and not emptying the registry would cause an assertion + * to hit. + */ + trace_chunks_left = lttng_trace_chunk_registry_put_each_chunk( + consumer_data.chunk_registry); + if (trace_chunks_left) { + ERR("%u trace chunks are leaked by lttng-consumerd. " + "This can be caused by an internal error of the session daemon.", + trace_chunks_left); + } + /* Run all callbacks freeing each chunk. */ + rcu_barrier(); lttng_trace_chunk_registry_destroy(consumer_data.chunk_registry); } @@ -2232,6 +2174,7 @@ int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx, } } +static void lttng_consumer_close_all_metadata(void) { switch (consumer_data.type) { @@ -4413,7 +4356,7 @@ enum lttcomm_return_code lttng_consumer_init_command( const lttng_uuid sessiond_uuid) { enum lttcomm_return_code ret; - char uuid_str[UUID_STR_LEN]; + char uuid_str[LTTNG_UUID_STR_LEN]; if (ctx->sessiond_uuid.is_set) { ret = LTTCOMM_CONSUMERD_ALREADY_SET; @@ -4439,7 +4382,7 @@ enum lttcomm_return_code lttng_consumer_create_trace_chunk( { int ret; enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS; - struct lttng_trace_chunk *created_chunk, *published_chunk; + struct lttng_trace_chunk *created_chunk = NULL, *published_chunk = NULL; enum lttng_trace_chunk_status chunk_status; char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)]; char creation_timestamp_buffer[ISO8601_STR_LEN]; @@ -4494,7 +4437,7 @@ enum lttcomm_return_code lttng_consumer_create_trace_chunk( if (!created_chunk) { ERR("Failed to create trace chunk"); ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED; - goto end; + goto error; } if (chunk_override_name) { @@ -4502,7 +4445,7 @@ enum lttcomm_return_code lttng_consumer_create_trace_chunk( chunk_override_name); if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED; - goto end; + goto error; } } @@ -4512,7 +4455,7 @@ enum lttcomm_return_code lttng_consumer_create_trace_chunk( if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { ERR("Failed to set trace chunk credentials"); ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED; - goto end; + goto error; } /* * The consumer daemon has no ownership of the chunk output @@ -4523,7 +4466,7 @@ enum lttcomm_return_code lttng_consumer_create_trace_chunk( if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { ERR("Failed to set trace chunk's directory handle"); ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED; - goto end; + goto error; } } @@ -4535,7 +4478,7 @@ enum lttcomm_return_code lttng_consumer_create_trace_chunk( if (!published_chunk) { ERR("Failed to publish trace chunk"); ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED; - goto end; + goto error; } rcu_read_lock(); @@ -4558,11 +4501,13 @@ enum lttcomm_return_code lttng_consumer_create_trace_chunk( * channels. */ enum lttcomm_return_code close_ret; + char path[LTTNG_PATH_MAX]; DBG("Failed to set new trace chunk on existing channels, rolling back"); close_ret = lttng_consumer_close_trace_chunk(relayd_id, session_id, chunk_id, - chunk_creation_timestamp, NULL); + chunk_creation_timestamp, NULL, + path); if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) { ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64, session_id, chunk_id); @@ -4588,12 +4533,13 @@ enum lttcomm_return_code lttng_consumer_create_trace_chunk( if (!relayd || ret) { enum lttcomm_return_code close_ret; + char path[LTTNG_PATH_MAX]; close_ret = lttng_consumer_close_trace_chunk(relayd_id, session_id, chunk_id, chunk_creation_timestamp, - NULL); + NULL, path); if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) { ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64, session_id, @@ -4601,21 +4547,23 @@ enum lttcomm_return_code lttng_consumer_create_trace_chunk( } ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED; - goto error; + goto error_unlock; } } -error: +error_unlock: rcu_read_unlock(); +error: /* Release the reference returned by the "publish" operation. */ lttng_trace_chunk_put(published_chunk); -end: + lttng_trace_chunk_put(created_chunk); return ret_code; } enum lttcomm_return_code lttng_consumer_close_trace_chunk( const uint64_t *relayd_id, uint64_t session_id, uint64_t chunk_id, time_t chunk_close_timestamp, - const enum lttng_trace_chunk_command_type *close_command) + const enum lttng_trace_chunk_command_type *close_command, + char *path) { enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS; struct lttng_trace_chunk *chunk; @@ -4713,7 +4661,8 @@ enum lttcomm_return_code lttng_consumer_close_trace_chunk( if (relayd) { pthread_mutex_lock(&relayd->ctrl_sock_mutex); ret = relayd_close_trace_chunk( - &relayd->control_sock, chunk); + &relayd->control_sock, chunk, + path); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); } else { ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64, @@ -4744,12 +4693,11 @@ enum lttcomm_return_code lttng_consumer_trace_chunk_exists( { int ret; enum lttcomm_return_code ret_code; - struct lttng_trace_chunk *chunk; char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)]; const char *relayd_id_str = "(none)"; const bool is_local_trace = !relayd_id; struct consumer_relayd_sock_pair *relayd = NULL; - bool chunk_exists_remote; + bool chunk_exists_local, chunk_exists_remote; if (relayd_id) { int ret; @@ -4767,13 +4715,19 @@ enum lttcomm_return_code lttng_consumer_trace_chunk_exists( DBG("Consumer trace chunk exists command: relayd_id = %s" ", chunk_id = %" PRIu64, relayd_id_str, chunk_id); - chunk = lttng_trace_chunk_registry_find_chunk( + ret = lttng_trace_chunk_registry_chunk_exists( consumer_data.chunk_registry, session_id, - chunk_id); - DBG("Trace chunk %s locally", chunk ? "exists" : "does not exist"); - if (chunk) { + chunk_id, &chunk_exists_local); + if (ret) { + /* Internal error. */ + ERR("Failed to query the existence of a trace chunk"); + ret_code = LTTCOMM_CONSUMERD_FATAL; + goto end; + } + DBG("Trace chunk %s locally", + chunk_exists_local ? "exists" : "does not exist"); + if (chunk_exists_local) { ret_code = LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_LOCAL; - lttng_trace_chunk_put(chunk); goto end; } else if (is_local_trace) { ret_code = LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;