X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.c;h=7b8782b2e12e2193ebb4f5a8e9062db10ba6ca57;hb=3f505e4b7709766f5953a994f5425f74a48300e3;hp=73ccbf6bd3b316e4606dd2acaeb628819099a3f7;hpb=2afa177e790cc3223abe9f1a591536017270e0ab;p=lttng-tools.git diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 73ccbf6bd..7b8782b2e 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -323,6 +323,7 @@ static void free_relayd_rcu(struct rcu_head *head) (void) relayd_close(&relayd->control_sock); (void) relayd_close(&relayd->data_sock); + pthread_mutex_destroy(&relayd->ctrl_sock_mutex); free(relayd); } @@ -464,14 +465,13 @@ static void update_endpoint_status_by_netidx(uint64_t net_seq_idx, * If a local data context is available, notify the threads that the streams' * state have changed. */ -static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd, - struct lttng_consumer_local_data *ctx) +void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd) { uint64_t netidx; assert(relayd); - DBG("Cleaning up relayd sockets"); + DBG("Cleaning up relayd object ID %"PRIu64, relayd->net_seq_idx); /* Save the net sequence index before destroying the object */ netidx = relayd->net_seq_idx; @@ -491,10 +491,8 @@ static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd, * memory barrier ordering the updates of the end point status from the * read of this status which happens AFTER receiving this notify. */ - if (ctx) { - notify_thread_lttng_pipe(ctx->consumer_data_pipe); - notify_thread_lttng_pipe(ctx->consumer_metadata_pipe); - } + notify_thread_lttng_pipe(relayd->ctx->consumer_data_pipe); + notify_thread_lttng_pipe(relayd->ctx->consumer_metadata_pipe); } /* @@ -802,6 +800,8 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, stream->chan->tracefile_size, stream->chan->tracefile_count); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { + ERR("Relayd add stream failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); goto end; } @@ -843,6 +843,8 @@ int consumer_send_relayd_streams_sent(uint64_t net_seq_idx) ret = relayd_streams_sent(&relayd->control_sock); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { + ERR("Relayd streams sent failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); goto end; } } else { @@ -1710,7 +1712,8 @@ write_error: * cleanup the relayd object and all associated streams. */ if (relayd && relayd_hang_up) { - cleanup_relayd(relayd, ctx); + ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); } end: @@ -1936,7 +1939,8 @@ write_error: * cleanup the relayd object and all associated streams. */ if (relayd && relayd_hang_up) { - cleanup_relayd(relayd, ctx); + ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); /* Skip splice error so the consumer does not fail */ goto end; } @@ -2159,7 +2163,7 @@ int consumer_add_metadata_stream(struct lttng_consumer_stream *stream) lttng_ht_add_unique_u64(ht, &stream->node); - lttng_ht_add_unique_u64(consumer_data.stream_per_chan_id_ht, + lttng_ht_add_u64(consumer_data.stream_per_chan_id_ht, &stream->node_channel_id); /* @@ -3522,6 +3526,7 @@ error: * Add relayd socket pair to consumer data hashtable. If object already * exists or on error, the function gracefully returns. */ + relayd->ctx = ctx; add_relayd(relayd); /* All good! */ @@ -3545,34 +3550,6 @@ error_nosignal: } } -/* - * Try to lock the stream mutex. - * - * On success, 1 is returned else 0 indicating that the mutex is NOT lock. - */ -static int stream_try_lock(struct lttng_consumer_stream *stream) -{ - int ret; - - assert(stream); - - /* - * Try to lock the stream mutex. On failure, we know that the stream is - * being used else where hence there is data still being extracted. - */ - ret = pthread_mutex_trylock(&stream->lock); - if (ret) { - /* For both EBUSY and EINVAL error, the mutex is NOT locked. */ - ret = 0; - goto end; - } - - ret = 1; - -end: - return ret; -} - /* * Search for a relayd associated to the session id and return the reference. * @@ -3639,28 +3616,11 @@ int consumer_data_pending(uint64_t id) /* Ease our life a bit */ ht = consumer_data.stream_list_ht; - relayd = find_relayd_by_session_id(id); - if (relayd) { - /* Send init command for data pending. */ - pthread_mutex_lock(&relayd->ctrl_sock_mutex); - ret = relayd_begin_data_pending(&relayd->control_sock, - relayd->relayd_session_id); - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); - if (ret < 0) { - /* Communication error thus the relayd so no data pending. */ - goto data_not_pending; - } - } - cds_lfht_for_each_entry_duplicate(ht->ht, ht->hash_fct(&id, lttng_ht_seed), ht->match_fct, &id, &iter.iter, stream, node_session_id.node) { - /* If this call fails, the stream is being used hence data pending. */ - ret = stream_try_lock(stream); - if (!ret) { - goto data_pending; - } + pthread_mutex_lock(&stream->lock); /* * A removed node from the hash table indicates that the stream has @@ -3678,9 +3638,27 @@ int consumer_data_pending(uint64_t id) } } - /* Relayd check */ - if (relayd) { - pthread_mutex_lock(&relayd->ctrl_sock_mutex); + pthread_mutex_unlock(&stream->lock); + } + + relayd = find_relayd_by_session_id(id); + if (relayd) { + unsigned int is_data_inflight = 0; + + /* Send init command for data pending. */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_begin_data_pending(&relayd->control_sock, + relayd->relayd_session_id); + if (ret < 0) { + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + /* Communication error thus the relayd so no data pending. */ + goto data_not_pending; + } + + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct(&id, lttng_ht_seed), + ht->match_fct, &id, + &iter.iter, stream, node_session_id.node) { if (stream->metadata_flag) { ret = relayd_quiescent_control(&relayd->control_sock, stream->relayd_stream_id); @@ -3689,24 +3667,25 @@ int consumer_data_pending(uint64_t id) stream->relayd_stream_id, stream->next_net_seq_num - 1); } - pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + if (ret == 1) { - pthread_mutex_unlock(&stream->lock); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); goto data_pending; + } else if (ret < 0) { + ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + goto data_not_pending; } } - pthread_mutex_unlock(&stream->lock); - } - if (relayd) { - unsigned int is_data_inflight = 0; - - /* Send init command for data pending. */ - pthread_mutex_lock(&relayd->ctrl_sock_mutex); + /* Send end command for data pending. */ ret = relayd_end_data_pending(&relayd->control_sock, relayd->relayd_session_id, &is_data_inflight); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { + ERR("Relayd end data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); goto data_not_pending; } if (is_data_inflight) { @@ -3788,3 +3767,346 @@ unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos, } return start_pos; } + +static +int consumer_flush_buffer(struct lttng_consumer_stream *stream, int producer_active) +{ + int ret = 0; + + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + ret = kernctl_buffer_flush(stream->wait_fd); + if (ret < 0) { + ERR("Failed to flush kernel stream"); + goto end; + } + break; + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + lttng_ustconsumer_flush_buffer(stream, producer_active); + break; + default: + ERR("Unknown consumer_data type"); + abort(); + } + +end: + return ret; +} + +static +int consumer_clear_buffer(struct lttng_consumer_stream *stream) +{ + int ret = 0; + + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + ret = kernctl_buffer_clear(stream->wait_fd); + if (ret < 0) { + ERR("Failed to flush kernel stream"); + goto end; + } + break; + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + lttng_ustconsumer_clear_buffer(stream); + break; + default: + ERR("Unknown consumer_data type"); + abort(); + } + +end: + return ret; +} + +static +int consumer_unlink_stream_files_rotation(struct lttng_consumer_stream *stream) +{ + uint64_t tracefile_size = stream->chan->tracefile_size; + uint64_t tracefile_count = stream->chan->tracefile_count; + uint64_t count; + int ret; + + /* + * If the channel is configured to have an open-ended number of tracefiles, + * use the current tracefile count number as upper-bound. + */ + if (!tracefile_count) { + tracefile_count = stream->tracefile_count_current + 1; + } + + /* + * Try to unlink each file and each index for this stream. They may not exist, + * in which case ENOENT is fine. + */ + for (count = 0; count < tracefile_count; count++) { + ret = utils_unlink_stream_file(stream->chan->pathname, stream->name, + tracefile_size, count, stream->uid, stream->gid, NULL); + if (ret < 0 && errno != ENOENT) { + return LTTCOMM_CONSUMERD_FATAL; + } + if (stream->index_file) { + ret = lttng_index_file_unlink(stream->chan->pathname, stream->name, + stream->uid, stream->gid, tracefile_size, count); + if (ret < 0 && errno != ENOENT) { + return LTTCOMM_CONSUMERD_FATAL; + } + } + } + return LTTCOMM_CONSUMERD_SUCCESS; +} + +static +int consumer_unlink_stream_files(struct lttng_consumer_stream *stream) +{ + uint64_t tracefile_size = stream->chan->tracefile_size; + int ret; + + /* No tracefile rotation, a single file to unlink and re-create. */ + ret = utils_unlink_stream_file(stream->chan->pathname, stream->name, + tracefile_size, 0, stream->uid, stream->gid, 0); + if (ret < 0 && errno != ENOENT) { + return LTTCOMM_CONSUMERD_FATAL; + } + return LTTCOMM_CONSUMERD_SUCCESS; +} + +static +int consumer_clear_stream_files(struct lttng_consumer_stream *stream) +{ + int ret; + uint64_t tracefile_size = stream->chan->tracefile_size; + + /* + * If stream is sent over to a relay daemon, there are no local files + * to unlink. + */ + if (stream->net_seq_idx != (uint64_t) -1ULL) { + return LTTCOMM_CONSUMERD_SUCCESS; + } + + ret = close(stream->out_fd); + if (ret < 0) { + PERROR("Closing tracefile"); + return LTTCOMM_CONSUMERD_FATAL; + } + stream->out_fd = -1; + stream->out_fd_offset = 0; + stream->tracefile_size_current = 0; + + /* + * Re-creation of the index file takes care of clearing its + * content for non-tracefile-rotation streams. + * Rotation streams need to explicitly unlink each index file. + * We put the stream file, but keep the stream->index_file value + * as indication whether the stream has index (non-NULL) before + * overwriting it with an index creation. + */ + if (stream->index_file) { + lttng_index_file_put(stream->index_file); + } + + if (tracefile_size > 0) { + /* Tracefile rotation. */ + ret = consumer_unlink_stream_files_rotation(stream); + } else { + ret = consumer_unlink_stream_files(stream); + } + if (ret != LTTCOMM_CONSUMERD_SUCCESS) { + return ret; + } + + /* Create new files. */ + ret = utils_create_stream_file(stream->chan->pathname, stream->name, + tracefile_size, 0, stream->uid, stream->gid, 0); + if (ret < 0) { + return LTTCOMM_CONSUMERD_FATAL; + } + stream->out_fd = ret; + + if (stream->index_file) { + stream->index_file = lttng_index_file_create(stream->chan->pathname, + stream->name, stream->uid, stream->gid, tracefile_size, + 0, CTF_INDEX_MAJOR, CTF_INDEX_MINOR); + if (!stream->index_file) { + return LTTCOMM_CONSUMERD_FATAL; + } + } + + return LTTCOMM_CONSUMERD_SUCCESS; +} + +static +int consumer_clear_stream(struct lttng_consumer_stream *stream) +{ + int ret; + + ret = consumer_flush_buffer(stream, 1); + if (ret < 0) { + ERR("Failed to flush stream %" PRIu64 " during channel clear", + stream->key); + ret = LTTCOMM_CONSUMERD_FATAL; + goto error; + } + + ret = consumer_clear_buffer(stream); + if (ret < 0) { + ERR("Failed to clear stream %" PRIu64 " during channel clear", + stream->key); + ret = LTTCOMM_CONSUMERD_FATAL; + goto error; + } + + ret = consumer_clear_stream_files(stream); + if (ret != LTTCOMM_CONSUMERD_SUCCESS) { + ERR("Failed to clear stream %" PRIu64 " files during channel clear", + stream->key); + goto error; + } + ret = LTTCOMM_CONSUMERD_SUCCESS; +error: + return ret; +} + +static +int consumer_clear_unmonitored_channel(struct lttng_consumer_channel *channel) +{ + int ret; + struct lttng_consumer_stream *stream; + + rcu_read_lock(); + pthread_mutex_lock(&channel->lock); + cds_list_for_each_entry(stream, &channel->streams.head, send_node) { + health_code_update(); + pthread_mutex_lock(&stream->lock); + ret = consumer_clear_stream(stream); + if (ret) { + goto error_unlock; + } + pthread_mutex_unlock(&stream->lock); + } + pthread_mutex_unlock(&channel->lock); + rcu_read_unlock(); + return 0; + +error_unlock: + pthread_mutex_unlock(&stream->lock); + pthread_mutex_unlock(&channel->lock); + rcu_read_unlock(); + if (ret) + goto error; + ret = LTTCOMM_CONSUMERD_SUCCESS; +error: + return ret; + +} + +static +int consumer_clear_monitored_channel(struct lttng_consumer_channel *channel) +{ + struct lttng_ht *ht; + struct lttng_consumer_stream *stream; + struct lttng_ht_iter iter; + int ret; + + ht = consumer_data.stream_per_chan_id_ht; + + rcu_read_lock(); + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct(&channel->key, lttng_ht_seed), + ht->match_fct, &channel->key, + &iter.iter, stream, node_channel_id.node) { + /* + * Protect against teardown with mutex. + */ + pthread_mutex_lock(&stream->lock); + if (cds_lfht_is_node_deleted(&stream->node.node)) { + goto next; + } + ret = consumer_clear_stream(stream); + if (ret) { + goto error_unlock; + } + next: + pthread_mutex_unlock(&stream->lock); + } + rcu_read_unlock(); + return LTTCOMM_CONSUMERD_SUCCESS; + +error_unlock: + pthread_mutex_unlock(&stream->lock); + rcu_read_unlock(); + return ret; +} + +int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel) +{ + int ret; + + DBG("Consumer clear channel %" PRIu64, channel->key); + + if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) { + /* + * Nothing to do for the metadata channel/stream. + * Snapshot mechanism already take care of the metadata + * handling/generation, and monitored channels only need to + * have their data stream cleared.. + */ + ret = LTTCOMM_CONSUMERD_SUCCESS; + goto end; + } + + if (!channel->monitor) { + ret = consumer_clear_unmonitored_channel(channel); + } else { + ret = consumer_clear_monitored_channel(channel); + } +end: + return ret; +} + +int lttng_consumer_clear_session(uint64_t session_id) +{ + struct lttng_ht_iter iter; + struct lttng_consumer_stream *stream; + int ret; + + DBG("Consumer clear session %" PRIu64, session_id); + + rcu_read_lock(); + + /* Find first stream match in data_ht. */ + cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) { + if (stream->chan->session_id == session_id) { + struct consumer_relayd_sock_pair *relayd = NULL; + + if (stream->net_seq_idx == (uint64_t) -1ULL) { + /* + * Asking for a clear session on local session. + * No relayd to contact, nothing to do. + */ + break; /* Stop after first match. */ + } + relayd = consumer_find_relayd(stream->net_seq_idx); + if (relayd == NULL) { + /* + * relayd is shutting down. + */ + ret = LTTCOMM_CONSUMERD_SUCCESS; + goto end; + } + ret = relayd_clear_session(&relayd->control_sock); + if (ret < 0) { + ret = LTTCOMM_CONSUMERD_FATAL; + goto end; + } + break; /* Stop after first match. */ + + } + } + ret = LTTCOMM_CONSUMERD_SUCCESS; +end: + rcu_read_unlock(); + return ret; +}