From 3f505e4b7709766f5953a994f5425f74a48300e3 Mon Sep 17 00:00:00 2001 From: Mathieu Desnoyers Date: Wed, 3 Apr 2019 16:32:21 -0400 Subject: [PATCH] consumerd: Implement clear stream/session commands Signed-off-by: Mathieu Desnoyers --- src/common/consumer/consumer.c | 288 ++++++++++++++++--- src/common/consumer/consumer.h | 2 + src/common/kernel-consumer/kernel-consumer.c | 18 ++ src/common/relayd/relayd.c | 42 +++ src/common/relayd/relayd.h | 1 + src/common/sessiond-comm/sessiond-comm.h | 3 + src/common/ust-consumer/ust-consumer.c | 19 ++ 7 files changed, 335 insertions(+), 38 deletions(-) diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 3e3c30487..7b8782b2e 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -3821,34 +3821,167 @@ end: } static -int clear_unmonitored_channel(struct lttng_consumer_channel *channel) +int consumer_unlink_stream_files_rotation(struct lttng_consumer_stream *stream) { + uint64_t tracefile_size = stream->chan->tracefile_size; + uint64_t tracefile_count = stream->chan->tracefile_count; + uint64_t count; + int ret; + + /* + * If the channel is configured to have an open-ended number of tracefiles, + * use the current tracefile count number as upper-bound. + */ + if (!tracefile_count) { + tracefile_count = stream->tracefile_count_current + 1; + } + /* + * Try to unlink each file and each index for this stream. They may not exist, + * in which case ENOENT is fine. + */ + for (count = 0; count < tracefile_count; count++) { + ret = utils_unlink_stream_file(stream->chan->pathname, stream->name, + tracefile_size, count, stream->uid, stream->gid, NULL); + if (ret < 0 && errno != ENOENT) { + return LTTCOMM_CONSUMERD_FATAL; + } + if (stream->index_file) { + ret = lttng_index_file_unlink(stream->chan->pathname, stream->name, + stream->uid, stream->gid, tracefile_size, count); + if (ret < 0 && errno != ENOENT) { + return LTTCOMM_CONSUMERD_FATAL; + } + } + } + return LTTCOMM_CONSUMERD_SUCCESS; +} + +static +int consumer_unlink_stream_files(struct lttng_consumer_stream *stream) +{ + uint64_t tracefile_size = stream->chan->tracefile_size; int ret; - struct lttng_consumer_stream *stream; - assert(!channel->monitor); - assert(channel->type != CONSUMER_CHANNEL_TYPE_METADATA); + /* No tracefile rotation, a single file to unlink and re-create. */ + ret = utils_unlink_stream_file(stream->chan->pathname, stream->name, + tracefile_size, 0, stream->uid, stream->gid, 0); + if (ret < 0 && errno != ENOENT) { + return LTTCOMM_CONSUMERD_FATAL; + } + return LTTCOMM_CONSUMERD_SUCCESS; +} + +static +int consumer_clear_stream_files(struct lttng_consumer_stream *stream) +{ + int ret; + uint64_t tracefile_size = stream->chan->tracefile_size; + + /* + * If stream is sent over to a relay daemon, there are no local files + * to unlink. + */ + if (stream->net_seq_idx != (uint64_t) -1ULL) { + return LTTCOMM_CONSUMERD_SUCCESS; + } + + ret = close(stream->out_fd); + if (ret < 0) { + PERROR("Closing tracefile"); + return LTTCOMM_CONSUMERD_FATAL; + } + stream->out_fd = -1; + stream->out_fd_offset = 0; + stream->tracefile_size_current = 0; + + /* + * Re-creation of the index file takes care of clearing its + * content for non-tracefile-rotation streams. + * Rotation streams need to explicitly unlink each index file. + * We put the stream file, but keep the stream->index_file value + * as indication whether the stream has index (non-NULL) before + * overwriting it with an index creation. + */ + if (stream->index_file) { + lttng_index_file_put(stream->index_file); + } + + if (tracefile_size > 0) { + /* Tracefile rotation. */ + ret = consumer_unlink_stream_files_rotation(stream); + } else { + ret = consumer_unlink_stream_files(stream); + } + if (ret != LTTCOMM_CONSUMERD_SUCCESS) { + return ret; + } + + /* Create new files. */ + ret = utils_create_stream_file(stream->chan->pathname, stream->name, + tracefile_size, 0, stream->uid, stream->gid, 0); + if (ret < 0) { + return LTTCOMM_CONSUMERD_FATAL; + } + stream->out_fd = ret; + + if (stream->index_file) { + stream->index_file = lttng_index_file_create(stream->chan->pathname, + stream->name, stream->uid, stream->gid, tracefile_size, + 0, CTF_INDEX_MAJOR, CTF_INDEX_MINOR); + if (!stream->index_file) { + return LTTCOMM_CONSUMERD_FATAL; + } + } + + return LTTCOMM_CONSUMERD_SUCCESS; +} + +static +int consumer_clear_stream(struct lttng_consumer_stream *stream) +{ + int ret; + + ret = consumer_flush_buffer(stream, 1); + if (ret < 0) { + ERR("Failed to flush stream %" PRIu64 " during channel clear", + stream->key); + ret = LTTCOMM_CONSUMERD_FATAL; + goto error; + } + + ret = consumer_clear_buffer(stream); + if (ret < 0) { + ERR("Failed to clear stream %" PRIu64 " during channel clear", + stream->key); + ret = LTTCOMM_CONSUMERD_FATAL; + goto error; + } + + ret = consumer_clear_stream_files(stream); + if (ret != LTTCOMM_CONSUMERD_SUCCESS) { + ERR("Failed to clear stream %" PRIu64 " files during channel clear", + stream->key); + goto error; + } + ret = LTTCOMM_CONSUMERD_SUCCESS; +error: + return ret; +} + +static +int consumer_clear_unmonitored_channel(struct lttng_consumer_channel *channel) +{ + int ret; + struct lttng_consumer_stream *stream; rcu_read_lock(); pthread_mutex_lock(&channel->lock); cds_list_for_each_entry(stream, &channel->streams.head, send_node) { health_code_update(); pthread_mutex_lock(&stream->lock); - - ret = consumer_flush_buffer(stream, 1); - if (ret < 0) { - ERR("Failed to flush stream %" PRIu64 " during channel clear", - stream->key); - ret = LTTCOMM_CONSUMERD_FATAL; - goto error_unlock; - } - - ret = consumer_clear_buffer(stream); - if (ret < 0) { - ERR("Failed to clear stream %" PRIu64 " during channel clear", - stream->key); - ret = LTTCOMM_CONSUMERD_FATAL; + ret = consumer_clear_stream(stream); + if (ret) { goto error_unlock; } pthread_mutex_unlock(&stream->lock); @@ -3861,40 +3994,119 @@ error_unlock: pthread_mutex_unlock(&stream->lock); pthread_mutex_unlock(&channel->lock); rcu_read_unlock(); + if (ret) + goto error; + ret = LTTCOMM_CONSUMERD_SUCCESS; +error: return ret; + } -int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel) +static +int consumer_clear_monitored_channel(struct lttng_consumer_channel *channel) { + struct lttng_ht *ht; + struct lttng_consumer_stream *stream; + struct lttng_ht_iter iter; int ret; - DBG("Consumer clear channel %" PRIu64, channel->key); + ht = consumer_data.stream_per_chan_id_ht; - if (!channel->monitor) { - /* Snapshot mode */ - if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) { - /* - * Nothing to do for the metadata channel/stream. - * Snapshot mechanism already take care of the metadata - * handling/generation. - */ - goto end; + rcu_read_lock(); + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct(&channel->key, lttng_ht_seed), + ht->match_fct, &channel->key, + &iter.iter, stream, node_channel_id.node) { + /* + * Protect against teardown with mutex. + */ + pthread_mutex_lock(&stream->lock); + if (cds_lfht_is_node_deleted(&stream->node.node)) { + goto next; } - ret = clear_unmonitored_channel(channel); + ret = consumer_clear_stream(stream); if (ret) { - goto error; + goto error_unlock; } - } else { - /* TODO: - * Normal channel and relayd bound clear operation not supported - * for now + next: + pthread_mutex_unlock(&stream->lock); + } + rcu_read_unlock(); + return LTTCOMM_CONSUMERD_SUCCESS; + +error_unlock: + pthread_mutex_unlock(&stream->lock); + rcu_read_unlock(); + return ret; +} + +int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel) +{ + int ret; + + DBG("Consumer clear channel %" PRIu64, channel->key); + + if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) { + /* + * Nothing to do for the metadata channel/stream. + * Snapshot mechanism already take care of the metadata + * handling/generation, and monitored channels only need to + * have their data stream cleared.. */ - ret = LTTCOMM_CONSUMERD_FATAL; - goto error; + ret = LTTCOMM_CONSUMERD_SUCCESS; + goto end; } + if (!channel->monitor) { + ret = consumer_clear_unmonitored_channel(channel); + } else { + ret = consumer_clear_monitored_channel(channel); + } end: + return ret; +} + +int lttng_consumer_clear_session(uint64_t session_id) +{ + struct lttng_ht_iter iter; + struct lttng_consumer_stream *stream; + int ret; + + DBG("Consumer clear session %" PRIu64, session_id); + + rcu_read_lock(); + + /* Find first stream match in data_ht. */ + cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) { + if (stream->chan->session_id == session_id) { + struct consumer_relayd_sock_pair *relayd = NULL; + + if (stream->net_seq_idx == (uint64_t) -1ULL) { + /* + * Asking for a clear session on local session. + * No relayd to contact, nothing to do. + */ + break; /* Stop after first match. */ + } + relayd = consumer_find_relayd(stream->net_seq_idx); + if (relayd == NULL) { + /* + * relayd is shutting down. + */ + ret = LTTCOMM_CONSUMERD_SUCCESS; + goto end; + } + ret = relayd_clear_session(&relayd->control_sock); + if (ret < 0) { + ret = LTTCOMM_CONSUMERD_FATAL; + goto end; + } + break; /* Stop after first match. */ + + } + } ret = LTTCOMM_CONSUMERD_SUCCESS; -error: +end: + rcu_read_unlock(); return ret; } diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h index 3d4fb85ef..907aecf22 100644 --- a/src/common/consumer/consumer.h +++ b/src/common/consumer/consumer.h @@ -63,6 +63,7 @@ enum lttng_consumer_command { LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL, LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE, LTTNG_CONSUMER_CLEAR_CHANNEL, + LTTNG_CONSUMER_CLEAR_SESSION, }; /* State of each fd in consumer */ @@ -757,5 +758,6 @@ void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream); int consumer_create_index_file(struct lttng_consumer_stream *stream); void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd); int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel); +int lttng_consumer_clear_session(uint64_t session_id); #endif /* LIB_CONSUMER_H */ diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 7039b3b80..442c49631 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -1116,6 +1116,24 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } break; } + case LTTNG_CONSUMER_CLEAR_SESSION: + { + uint64_t session_id = msg.u.clear_session.session_id; + + ret = lttng_consumer_clear_session(session_id); + if (ret) { + ERR("Clear session failed session_id %" PRIu64, session_id); + ret_code = ret; + } + + health_code_update(); + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } + break; + } default: goto end_nosignal; } diff --git a/src/common/relayd/relayd.c b/src/common/relayd/relayd.c index b936a4d3e..9f0cf1e3e 100644 --- a/src/common/relayd/relayd.c +++ b/src/common/relayd/relayd.c @@ -941,3 +941,45 @@ int relayd_reset_metadata(struct lttcomm_relayd_sock *rsock, error: return ret; } + +/* + * Ask the relay to clear files associated with the socket session. + */ +int relayd_clear_session(struct lttcomm_relayd_sock *rsock) +{ + int ret; + struct lttcomm_relayd_generic_reply reply; + + /* Code flow error. Safety net. */ + assert(rsock); + + DBG("Relayd clear session"); + + /* Send command */ + ret = send_command(rsock, RELAYD_CLEAR_SESSION_CUSTOM_EFFICIOS, NULL, 0, 0); + if (ret < 0) { + goto error; + } + + /* Receive response */ + ret = recv_reply(rsock, (void *) &reply, sizeof(reply)); + if (ret < 0) { + goto error; + } + + reply.ret_code = be32toh(reply.ret_code); + + /* Return session id or negative ret code. */ + if (reply.ret_code != LTTNG_OK) { + ret = -1; + ERR("Relayd clear session replied error %d", reply.ret_code); + } else { + /* Success */ + ret = 0; + } + + DBG("Relayd clear session successful"); + +error: + return ret; +} diff --git a/src/common/relayd/relayd.h b/src/common/relayd/relayd.h index f090a0db6..2ece137a8 100644 --- a/src/common/relayd/relayd.h +++ b/src/common/relayd/relayd.h @@ -51,5 +51,6 @@ int relayd_send_index(struct lttcomm_relayd_sock *rsock, uint64_t net_seq_num); int relayd_reset_metadata(struct lttcomm_relayd_sock *rsock, uint64_t stream_id, uint64_t version); +int relayd_clear_session(struct lttcomm_relayd_sock *rsock); #endif /* _RELAYD_H */ diff --git a/src/common/sessiond-comm/sessiond-comm.h b/src/common/sessiond-comm/sessiond-comm.h index 59c4aad71..efca5cb9c 100644 --- a/src/common/sessiond-comm/sessiond-comm.h +++ b/src/common/sessiond-comm/sessiond-comm.h @@ -544,6 +544,9 @@ struct lttcomm_consumer_msg { struct { uint64_t key; } LTTNG_PACKED clear_channel; + struct { + uint64_t session_id; + } LTTNG_PACKED clear_session; } u; } LTTNG_PACKED; diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 473c94b23..87c121464 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -1948,6 +1948,25 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } break; } + case LTTNG_CONSUMER_CLEAR_SESSION: + { + uint64_t session_id = msg.u.clear_session.session_id; + + ret = lttng_consumer_clear_session(session_id); + if (ret) { + ERR("Clear session failed session_id %" PRIu64, session_id); + ret_code = ret; + } + + health_code_update(); + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } + break; + } + default: break; } -- 2.34.1