From e098433c90550d74288498f8c4474ef4c2daea68 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=A9mie=20Galarneau?= Date: Tue, 8 May 2018 21:41:08 -0400 Subject: [PATCH] Fix: propagate archive id to the consumer daemon on stream creation MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit This is the first of a series of fixes addressing a number of problems with the way session rotation completions handled. Those issues can result in: - A stop never completing, - A rotation never completing, - A rotation being marked as completed while the consumerd/relayd are still writing to the completed chunk's trace archive, resulting in a temporarily corrupted trace. This first commit performs a relatively simple modification to ensure that the session's current archive id is propagated to the consumer daemon. Detailed description of the problems --- At the core of the problem is the fact that in per-pid buffering, we are not guaranteed that the sessiond will be able to see an application's channel(s) if it was torn down before (or even during) the rotation. When an application is torn down, it is removed from the ust_app_ht. That doesn't mean its buffers were received by the relayd or even consumed by the consumerd. The session daemon issues a "flush channel" command, but there is no guarantee/synchronization to ensure the buffers have been consumed. The current design assumes that the sessiond knows all the channels to rotate and that we can monitor those channels for the completion of a rotation. Given that an application can disappear or appear while we iterate on the ust_app_ht, this assumption does not hold. We also don't want to prevent/delay applications from registering or exiting just because a rotation is ongoing. * Problem 1 * A rename can happen before the relay has received all data for a given chunk, leading to the data pending issue explained previously. Rename should be performed as the last action after the rotation has been completed since data can still be in-flight, causing the creation of indexes upon its arrival on the relayd's end. See: https://github.com/lttng/lttng-tools/blob/cea6c68/src/bin/lttng-sessiond/rotation-thread.c#L392 Currently, the rotation thread waits for all channels (known to the sessiond at the start of the rotation) to have reached their rotation point. More specifically, the consumer will write to the channel_rotation pipe everytime a channel's subbuffers have been read up to the point of the rotation position. This does not guarantee that the data has been commited to disk on the relay's end. At that point, the command to rename the destination folder is sent to the relayd and the sessiond checks for the pending rotation periodically (every 200ms) if the output was to a relayd. That check is assumed not to be needed when tracing locally since reaching the rotation point implies the contents being written to disk. This scheme is not safe. If the sessiond sees no channel to iterate on, it will issue the rename command immediately. If an application's buffers were being flushed by the consumerd, the relayd will receive the data, attempt to create index files, and fail since the folder has been moved. From an architectural standpoint, the rename command also leaves the 'path' of streams that were unknown to the sessiond pointing to a path that does not exist anymore. * Problem 2 * In per-pid tracing mode, an application can appear after the rotation was initiated and cause the rotate pending check to never complete. A RELAYD_ROTATE_PENDING command is applied to a unique session id and a chunk id. When handling a RELAYD_ROTATE_PENDING commands, the relayd will perform the following check: - Iterate on every stream known at that point: - Check if the stream is rotating (stream->rotate_at_seq_num != -1ULL) - If the stream is not rotating, "stream->chunk_id < chunk_id" is checked. - If true, the rotation is considered incomplete. See: https://github.com/lttng/lttng-tools/blob/cea6c68/src/bin/lttng-relayd/main.c#L2850 Given that streams, at their creation, are initialized with their current "chunk_id" set to 0, the rotation will never be considered complete if a stream is created between a ROTATE_STREAM and ROTATE_PENDING command. This can happen whenever an application is registered during a rotation. * Problem 3 * Since the sessiond can't accurately monitor the channels that have to be rotated, the "rotation completed" notification (and state, if queried with the lttng_rotation_handle_get_state() interface) is not reliable. A client could see that the rotation is marked as completed and attempt to read a trace archive that has not been completely written. Signed-off-by: Jérémie Galarneau --- src/bin/lttng-sessiond/consumer.c | 17 ++- src/bin/lttng-sessiond/consumer.h | 14 +- src/bin/lttng-sessiond/kernel-consumer.c | 80 ++++++++---- src/bin/lttng-sessiond/kernel.c | 14 +- src/bin/lttng-sessiond/ust-app.c | 127 +++++++++++-------- src/bin/lttng-sessiond/ust-consumer.c | 20 ++- src/bin/lttng-sessiond/ust-consumer.h | 8 +- src/common/consumer/consumer.c | 4 +- src/common/consumer/consumer.h | 8 +- src/common/kernel-consumer/kernel-consumer.c | 3 +- src/common/sessiond-comm/sessiond-comm.h | 20 +++ src/common/ust-consumer/ust-consumer.c | 31 +++-- 12 files changed, 230 insertions(+), 116 deletions(-) diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index 1ed1d7f81..bc973ddf8 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -834,7 +834,8 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, uint32_t ust_app_uid, int64_t blocking_timeout, const char *root_shm_path, - const char *shm_path) + const char *shm_path, + uint64_t trace_archive_id) { assert(msg); @@ -863,6 +864,7 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, msg->u.ask_channel.monitor = monitor; msg->u.ask_channel.ust_app_uid = ust_app_uid; msg->u.ask_channel.blocking_timeout = blocking_timeout; + msg->u.ask_channel.trace_archive_id = trace_archive_id; memcpy(msg->u.ask_channel.uuid, uuid, sizeof(msg->u.ask_channel.uuid)); @@ -939,20 +941,21 @@ void consumer_init_add_channel_comm_msg(struct lttcomm_consumer_msg *msg, /* * Init stream communication message structure. */ -void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg, - enum lttng_consumer_command cmd, +void consumer_init_add_stream_comm_msg(struct lttcomm_consumer_msg *msg, uint64_t channel_key, uint64_t stream_key, - int cpu) + int32_t cpu, + uint64_t trace_archive_id) { assert(msg); memset(msg, 0, sizeof(struct lttcomm_consumer_msg)); - msg->cmd_type = cmd; + msg->cmd_type = LTTNG_CONSUMER_ADD_STREAM; msg->u.stream.channel_key = channel_key; msg->u.stream.stream_key = stream_key; msg->u.stream.cpu = cpu; + msg->u.stream.trace_archive_id = trace_archive_id; } void consumer_init_streams_sent_comm_msg(struct lttcomm_consumer_msg *msg, @@ -1429,7 +1432,8 @@ end: */ int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key, struct snapshot_output *output, int metadata, uid_t uid, gid_t gid, - const char *session_path, int wait, uint64_t nb_packets_per_stream) + const char *session_path, int wait, uint64_t nb_packets_per_stream, + uint64_t trace_archive_id) { int ret; struct lttcomm_consumer_msg msg; @@ -1445,6 +1449,7 @@ int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key, msg.u.snapshot_channel.key = key; msg.u.snapshot_channel.nb_packets_per_stream = nb_packets_per_stream; msg.u.snapshot_channel.metadata = metadata; + msg.u.snapshot_channel.trace_archive_id = trace_archive_id; if (output->consumer->type == CONSUMER_DST_NET) { msg.u.snapshot_channel.relayd_id = output->consumer->net_seq_index; diff --git a/src/bin/lttng-sessiond/consumer.h b/src/bin/lttng-sessiond/consumer.h index 1018d7aa4..cdc48b7f3 100644 --- a/src/bin/lttng-sessiond/consumer.h +++ b/src/bin/lttng-sessiond/consumer.h @@ -276,12 +276,13 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg, uint32_t ust_app_uid, int64_t blocking_timeout, const char *root_shm_path, - const char *shm_path); -void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg, - enum lttng_consumer_command cmd, + const char *shm_path, + uint64_t trace_archive_id); +void consumer_init_add_stream_comm_msg(struct lttcomm_consumer_msg *msg, uint64_t channel_key, uint64_t stream_key, - int cpu); + int32_t cpu, + uint64_t trace_archive_id); void consumer_init_streams_sent_comm_msg(struct lttcomm_consumer_msg *msg, enum lttng_consumer_command cmd, uint64_t channel_key, uint64_t net_seq_idx); @@ -319,8 +320,9 @@ int consumer_get_lost_packets(uint64_t session_id, uint64_t channel_key, /* Snapshot command. */ int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key, - struct snapshot_output *output, int metadata, uid_t uid, gid_t gid, - const char *session_path, int wait, uint64_t nb_packets_per_stream); + struct snapshot_output *output, int metadata, + uid_t uid, gid_t gid, const char *session_path, int wait, + uint64_t nb_packets_per_stream, uint64_t trace_archive_id); int consumer_rotate_channel(struct consumer_socket *socket, uint64_t key, uid_t uid, gid_t gid, struct consumer_output *output, diff --git a/src/bin/lttng-sessiond/kernel-consumer.c b/src/bin/lttng-sessiond/kernel-consumer.c index 49adabb72..05e447754 100644 --- a/src/bin/lttng-sessiond/kernel-consumer.c +++ b/src/bin/lttng-sessiond/kernel-consumer.c @@ -173,6 +173,8 @@ int kernel_consumer_add_channel(struct consumer_socket *sock, rcu_read_lock(); session = session_find_by_id(ksession->id); assert(session); + assert(pthread_mutex_trylock(&session->lock)); + assert(session_trylock_list()); status = notification_thread_command_add_channel( notification_thread_handle, session->name, @@ -199,25 +201,30 @@ error: * The consumer socket lock must be held by the caller. */ int kernel_consumer_add_metadata(struct consumer_socket *sock, - struct ltt_kernel_session *session, unsigned int monitor) + struct ltt_kernel_session *ksession, unsigned int monitor) { int ret; char *pathname; struct lttcomm_consumer_msg lkm; struct consumer_output *consumer; + struct ltt_session *session; + + rcu_read_lock(); /* Safety net */ - assert(session); - assert(session->consumer); + assert(ksession); + assert(ksession->consumer); assert(sock); - DBG("Sending metadata %d to kernel consumer", session->metadata_stream_fd); + DBG("Sending metadata %d to kernel consumer", + ksession->metadata_stream_fd); /* Get consumer output pointer */ - consumer = session->consumer; + consumer = ksession->consumer; if (monitor) { - pathname = create_channel_path(consumer, session->uid, session->gid); + pathname = create_channel_path(consumer, + ksession->uid, ksession->gid); } else { /* Empty path. */ pathname = strdup(""); @@ -227,13 +234,18 @@ int kernel_consumer_add_metadata(struct consumer_socket *sock, goto error; } + session = session_find_by_id(ksession->id); + assert(session); + assert(pthread_mutex_trylock(&session->lock)); + assert(session_trylock_list()); + /* Prep channel message structure */ consumer_init_add_channel_comm_msg(&lkm, - session->metadata->key, - session->id, + ksession->metadata->key, + ksession->id, pathname, - session->uid, - session->gid, + ksession->uid, + ksession->gid, consumer->net_seq_index, DEFAULT_METADATA_NAME, 1, @@ -252,17 +264,17 @@ int kernel_consumer_add_metadata(struct consumer_socket *sock, health_code_update(); /* Prep stream message structure */ - consumer_init_stream_comm_msg(&lkm, - LTTNG_CONSUMER_ADD_STREAM, - session->metadata->key, - session->metadata_stream_fd, - 0); /* CPU: 0 for metadata. */ + consumer_init_add_stream_comm_msg(&lkm, + ksession->metadata->key, + ksession->metadata_stream_fd, + 0 /* CPU: 0 for metadata. */, + session->current_archive_id); health_code_update(); /* Send stream and file descriptor */ ret = consumer_send_stream(sock, consumer, &lkm, - &session->metadata_stream_fd, 1); + &ksession->metadata_stream_fd, 1); if (ret < 0) { goto error; } @@ -270,6 +282,7 @@ int kernel_consumer_add_metadata(struct consumer_socket *sock, health_code_update(); error: + rcu_read_unlock(); free(pathname); return ret; } @@ -279,8 +292,10 @@ error: */ static int kernel_consumer_add_stream(struct consumer_socket *sock, - struct ltt_kernel_channel *channel, struct ltt_kernel_stream *stream, - struct ltt_kernel_session *session, unsigned int monitor) + struct ltt_kernel_channel *channel, + struct ltt_kernel_stream *stream, + struct ltt_kernel_session *session, unsigned int monitor, + uint64_t trace_archive_id) { int ret; struct lttcomm_consumer_msg lkm; @@ -299,11 +314,11 @@ int kernel_consumer_add_stream(struct consumer_socket *sock, consumer = session->consumer; /* Prep stream consumer message */ - consumer_init_stream_comm_msg(&lkm, - LTTNG_CONSUMER_ADD_STREAM, + consumer_init_add_stream_comm_msg(&lkm, channel->key, stream->fd, - stream->cpu); + stream->cpu, + trace_archive_id); health_code_update(); @@ -359,20 +374,28 @@ error: * The consumer socket lock must be held by the caller. */ int kernel_consumer_send_channel_streams(struct consumer_socket *sock, - struct ltt_kernel_channel *channel, struct ltt_kernel_session *session, + struct ltt_kernel_channel *channel, struct ltt_kernel_session *ksession, unsigned int monitor) { int ret = LTTNG_OK; struct ltt_kernel_stream *stream; + struct ltt_session *session; /* Safety net */ assert(channel); - assert(session); - assert(session->consumer); + assert(ksession); + assert(ksession->consumer); assert(sock); + rcu_read_lock(); + + session = session_find_by_id(ksession->id); + assert(session); + assert(pthread_mutex_trylock(&session->lock)); + assert(session_trylock_list()); + /* Bail out if consumer is disabled */ - if (!session->consumer->enabled) { + if (!ksession->consumer->enabled) { ret = LTTNG_OK; goto error; } @@ -381,7 +404,7 @@ int kernel_consumer_send_channel_streams(struct consumer_socket *sock, channel->channel->name); if (!channel->sent_to_consumer) { - ret = kernel_consumer_add_channel(sock, channel, session, monitor); + ret = kernel_consumer_add_channel(sock, channel, ksession, monitor); if (ret < 0) { goto error; } @@ -395,8 +418,8 @@ int kernel_consumer_send_channel_streams(struct consumer_socket *sock, } /* Add stream on the kernel consumer side. */ - ret = kernel_consumer_add_stream(sock, channel, stream, session, - monitor); + ret = kernel_consumer_add_stream(sock, channel, stream, + ksession, monitor, session->current_archive_id); if (ret < 0) { goto error; } @@ -404,6 +427,7 @@ int kernel_consumer_send_channel_streams(struct consumer_socket *sock, } error: + rcu_read_unlock(); return ret; } diff --git a/src/bin/lttng-sessiond/kernel.c b/src/bin/lttng-sessiond/kernel.c index 19a325da1..fc817b2db 100644 --- a/src/bin/lttng-sessiond/kernel.c +++ b/src/bin/lttng-sessiond/kernel.c @@ -990,6 +990,8 @@ int kernel_snapshot_record(struct ltt_kernel_session *ksess, struct consumer_socket *socket; struct lttng_ht_iter iter; struct ltt_kernel_metadata *saved_metadata; + struct ltt_session *session; + uint64_t trace_archive_id; assert(ksess); assert(ksess->consumer); @@ -997,6 +999,12 @@ int kernel_snapshot_record(struct ltt_kernel_session *ksess, DBG("Kernel snapshot record started"); + session = session_find_by_id(ksess->id); + assert(session); + assert(pthread_mutex_trylock(&session->lock)); + assert(session_trylock_list()); + trace_archive_id = session->current_archive_id; + /* Save current metadata since the following calls will change it. */ saved_metadata = ksess->metadata; saved_metadata_fd = ksess->metadata_stream_fd; @@ -1044,7 +1052,8 @@ int kernel_snapshot_record(struct ltt_kernel_session *ksess, ret = consumer_snapshot_channel(socket, chan->key, output, 0, ksess->uid, ksess->gid, DEFAULT_KERNEL_TRACE_DIR, wait, - nb_packets_per_stream); + nb_packets_per_stream, + trace_archive_id); if (ret < 0) { ret = LTTNG_ERR_KERN_CONSUMER_FAIL; (void) kernel_consumer_destroy_metadata(socket, @@ -1056,7 +1065,8 @@ int kernel_snapshot_record(struct ltt_kernel_session *ksess, /* Snapshot metadata, */ ret = consumer_snapshot_channel(socket, ksess->metadata->key, output, 1, ksess->uid, ksess->gid, - DEFAULT_KERNEL_TRACE_DIR, wait, 0); + DEFAULT_KERNEL_TRACE_DIR, wait, 0, + trace_archive_id); if (ret < 0) { ret = LTTNG_ERR_KERN_CONSUMER_FAIL; goto error_consumer; diff --git a/src/bin/lttng-sessiond/ust-app.c b/src/bin/lttng-sessiond/ust-app.c index f10a3dadf..160f567c8 100644 --- a/src/bin/lttng-sessiond/ust-app.c +++ b/src/bin/lttng-sessiond/ust-app.c @@ -2483,7 +2483,8 @@ error: */ static int do_consumer_create_channel(struct ltt_ust_session *usess, struct ust_app_session *ua_sess, struct ust_app_channel *ua_chan, - int bitness, struct ust_registry_session *registry) + int bitness, struct ust_registry_session *registry, + uint64_t trace_archive_id) { int ret; unsigned int nb_fd = 0; @@ -2518,7 +2519,7 @@ static int do_consumer_create_channel(struct ltt_ust_session *usess, * stream we have to expect. */ ret = ust_consumer_ask_channel(ua_sess, ua_chan, usess->consumer, socket, - registry); + registry, trace_archive_id); if (ret < 0) { goto error_ask; } @@ -2857,6 +2858,9 @@ static int create_channel_per_uid(struct ust_app *app, int ret; struct buffer_reg_uid *reg_uid; struct buffer_reg_channel *reg_chan; + struct ltt_session *session; + enum lttng_error_code notification_ret; + struct ust_registry_channel *chan_reg; assert(app); assert(usess); @@ -2887,12 +2891,18 @@ static int create_channel_per_uid(struct ust_app *app, goto error; } + session = session_find_by_id(ua_sess->tracing_id); + assert(session); + assert(pthread_mutex_trylock(&session->lock)); + assert(session_trylock_list()); + /* * Create the buffers on the consumer side. This call populates the * ust app channel object with all streams and data object. */ ret = do_consumer_create_channel(usess, ua_sess, ua_chan, - app->bits_per_long, reg_uid->registry->reg.ust); + app->bits_per_long, reg_uid->registry->reg.ust, + session->current_archive_id); if (ret < 0) { ERR("Error creating UST channel \"%s\" on the consumer daemon", ua_chan->name); @@ -2918,39 +2928,26 @@ static int create_channel_per_uid(struct ust_app *app, goto error; } - { - enum lttng_error_code cmd_ret; - struct ltt_session *session; - uint64_t chan_reg_key; - struct ust_registry_channel *chan_reg; - - chan_reg_key = ua_chan->tracing_channel_id; + /* Notify the notification subsystem of the channel's creation. */ + pthread_mutex_lock(®_uid->registry->reg.ust->lock); + chan_reg = ust_registry_channel_find(reg_uid->registry->reg.ust, + ua_chan->tracing_channel_id); + assert(chan_reg); + chan_reg->consumer_key = ua_chan->key; + chan_reg = NULL; + pthread_mutex_unlock(®_uid->registry->reg.ust->lock); - pthread_mutex_lock(®_uid->registry->reg.ust->lock); - chan_reg = ust_registry_channel_find(reg_uid->registry->reg.ust, - chan_reg_key); - assert(chan_reg); - chan_reg->consumer_key = ua_chan->key; - chan_reg = NULL; - pthread_mutex_unlock(®_uid->registry->reg.ust->lock); - - session = session_find_by_id(ua_sess->tracing_id); - assert(session); - - assert(pthread_mutex_trylock(&session->lock)); - assert(session_trylock_list()); - cmd_ret = notification_thread_command_add_channel( - notification_thread_handle, session->name, - ua_sess->euid, ua_sess->egid, - ua_chan->name, - ua_chan->key, - LTTNG_DOMAIN_UST, - ua_chan->attr.subbuf_size * ua_chan->attr.num_subbuf); - if (cmd_ret != LTTNG_OK) { - ret = - (int) cmd_ret; - ERR("Failed to add channel to notification thread"); - goto error; - } + notification_ret = notification_thread_command_add_channel( + notification_thread_handle, session->name, + ua_sess->euid, ua_sess->egid, + ua_chan->name, + ua_chan->key, + LTTNG_DOMAIN_UST, + ua_chan->attr.subbuf_size * ua_chan->attr.num_subbuf); + if (notification_ret != LTTNG_OK) { + ret = - (int) notification_ret; + ERR("Failed to add channel to notification thread"); + goto error; } send_channel: @@ -3007,9 +3004,16 @@ static int create_channel_per_pid(struct ust_app *app, goto error; } + session = session_find_by_id(ua_sess->tracing_id); + assert(session); + + assert(pthread_mutex_trylock(&session->lock)); + assert(session_trylock_list()); + /* Create and get channel on the consumer side. */ ret = do_consumer_create_channel(usess, ua_sess, ua_chan, - app->bits_per_long, registry); + app->bits_per_long, registry, + session->current_archive_id); if (ret < 0) { ERR("Error creating UST channel \"%s\" on the consumer daemon", ua_chan->name); @@ -3024,9 +3028,6 @@ static int create_channel_per_pid(struct ust_app *app, goto error; } - session = session_find_by_id(ua_sess->tracing_id); - assert(session); - chan_reg_key = ua_chan->key; pthread_mutex_lock(®istry->lock); chan_reg = ust_registry_channel_find(registry, chan_reg_key); @@ -3034,9 +3035,6 @@ static int create_channel_per_pid(struct ust_app *app, chan_reg->consumer_key = ua_chan->key; pthread_mutex_unlock(®istry->lock); - assert(pthread_mutex_trylock(&session->lock)); - assert(session_trylock_list()); - cmd_ret = notification_thread_command_add_channel( notification_thread_handle, session->name, ua_sess->euid, ua_sess->egid, @@ -3242,6 +3240,7 @@ static int create_ust_app_metadata(struct ust_app_session *ua_sess, struct ust_app_channel *metadata; struct consumer_socket *socket; struct ust_registry_session *registry; + struct ltt_session *session; assert(ua_sess); assert(app); @@ -3291,6 +3290,12 @@ static int create_ust_app_metadata(struct ust_app_session *ua_sess, */ registry->metadata_key = metadata->key; + session = session_find_by_id(ua_sess->tracing_id); + assert(session); + + assert(pthread_mutex_trylock(&session->lock)); + assert(session_trylock_list()); + /* * Ask the metadata channel creation to the consumer. The metadata object * will be created by the consumer and kept their. However, the stream is @@ -3298,7 +3303,7 @@ static int create_ust_app_metadata(struct ust_app_session *ua_sess, * consumer. */ ret = ust_consumer_ask_channel(ua_sess, metadata, consumer, socket, - registry); + registry, session->current_archive_id); if (ret < 0) { /* Nullify the metadata key so we don't try to close it later on. */ registry->metadata_key = 0; @@ -5946,12 +5951,20 @@ int ust_app_snapshot_record(struct ltt_ust_session *usess, struct lttng_ht_iter iter; struct ust_app *app; char pathname[PATH_MAX]; + struct ltt_session *session; + uint64_t trace_archive_id; assert(usess); assert(output); rcu_read_lock(); + session = session_find_by_id(usess->id); + assert(session); + assert(pthread_mutex_trylock(&session->lock)); + assert(session_trylock_list()); + trace_archive_id = session->current_archive_id; + switch (usess->buffer_type) { case LTTNG_BUFFER_PER_UID: { @@ -5981,16 +5994,20 @@ int ust_app_snapshot_record(struct ltt_ust_session *usess, /* Add the UST default trace dir to path. */ cds_lfht_for_each_entry(reg->registry->channels->ht, &iter.iter, reg_chan, node.node) { - ret = consumer_snapshot_channel(socket, reg_chan->consumer_key, - output, 0, usess->uid, usess->gid, pathname, wait, - nb_packets_per_stream); + ret = consumer_snapshot_channel(socket, + reg_chan->consumer_key, + output, 0, usess->uid, + usess->gid, pathname, wait, + nb_packets_per_stream, + trace_archive_id); if (ret < 0) { goto error; } } ret = consumer_snapshot_channel(socket, reg->registry->reg.ust->metadata_key, output, 1, - usess->uid, usess->gid, pathname, wait, 0); + usess->uid, usess->gid, pathname, wait, 0, + trace_archive_id); if (ret < 0) { goto error; } @@ -6031,9 +6048,12 @@ int ust_app_snapshot_record(struct ltt_ust_session *usess, cds_lfht_for_each_entry(ua_sess->channels->ht, &chan_iter.iter, ua_chan, node.node) { - ret = consumer_snapshot_channel(socket, ua_chan->key, output, - 0, ua_sess->euid, ua_sess->egid, pathname, wait, - nb_packets_per_stream); + ret = consumer_snapshot_channel(socket, + ua_chan->key, output, + 0, ua_sess->euid, ua_sess->egid, + pathname, wait, + nb_packets_per_stream, + trace_archive_id); if (ret < 0) { goto error; } @@ -6045,8 +6065,11 @@ int ust_app_snapshot_record(struct ltt_ust_session *usess, ret = -1; goto error; } - ret = consumer_snapshot_channel(socket, registry->metadata_key, output, - 1, ua_sess->euid, ua_sess->egid, pathname, wait, 0); + ret = consumer_snapshot_channel(socket, + registry->metadata_key, output, + 1, ua_sess->euid, ua_sess->egid, + pathname, wait, 0, + trace_archive_id); if (ret < 0) { goto error; } diff --git a/src/bin/lttng-sessiond/ust-consumer.c b/src/bin/lttng-sessiond/ust-consumer.c index 01d71ce01..575593f03 100644 --- a/src/bin/lttng-sessiond/ust-consumer.c +++ b/src/bin/lttng-sessiond/ust-consumer.c @@ -108,8 +108,11 @@ error: * Consumer socket lock MUST be acquired before calling this. */ static int ask_channel_creation(struct ust_app_session *ua_sess, - struct ust_app_channel *ua_chan, struct consumer_output *consumer, - struct consumer_socket *socket, struct ust_registry_session *registry) + struct ust_app_channel *ua_chan, + struct consumer_output *consumer, + struct consumer_socket *socket, + struct ust_registry_session *registry, + uint64_t trace_archive_id) { int ret, output; uint32_t chan_id; @@ -201,7 +204,8 @@ static int ask_channel_creation(struct ust_app_session *ua_sess, ua_sess->output_traces, ua_sess->uid, ua_chan->attr.blocking_timeout, - root_shm_path, shm_path); + root_shm_path, shm_path, + trace_archive_id); health_code_update(); @@ -239,8 +243,11 @@ error: * Returns 0 on success else a negative value. */ int ust_consumer_ask_channel(struct ust_app_session *ua_sess, - struct ust_app_channel *ua_chan, struct consumer_output *consumer, - struct consumer_socket *socket, struct ust_registry_session *registry) + struct ust_app_channel *ua_chan, + struct consumer_output *consumer, + struct consumer_socket *socket, + struct ust_registry_session *registry, + uint64_t trace_archive_id) { int ret; @@ -257,7 +264,8 @@ int ust_consumer_ask_channel(struct ust_app_session *ua_sess, } pthread_mutex_lock(socket->lock); - ret = ask_channel_creation(ua_sess, ua_chan, consumer, socket, registry); + ret = ask_channel_creation(ua_sess, ua_chan, consumer, socket, registry, + trace_archive_id); pthread_mutex_unlock(socket->lock); if (ret < 0) { ERR("ask_channel_creation consumer command failed"); diff --git a/src/bin/lttng-sessiond/ust-consumer.h b/src/bin/lttng-sessiond/ust-consumer.h index ce0ab5193..b8bd65575 100644 --- a/src/bin/lttng-sessiond/ust-consumer.h +++ b/src/bin/lttng-sessiond/ust-consumer.h @@ -20,10 +20,14 @@ #include "consumer.h" #include "ust-app.h" +#include int ust_consumer_ask_channel(struct ust_app_session *ua_sess, - struct ust_app_channel *ua_chan, struct consumer_output *consumer, - struct consumer_socket *socket, struct ust_registry_session *registry); + struct ust_app_channel *ua_chan, + struct consumer_output *consumer, + struct consumer_socket *socket, + struct ust_registry_session *registry, + uint64_t trace_archive_id); int ust_consumer_get_channel(struct consumer_socket *socket, struct ust_app_channel *ua_chan); diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 5fa0ec4fa..3e991c8c7 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -562,7 +562,8 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, int cpu, int *alloc_ret, enum consumer_channel_type type, - unsigned int monitor) + unsigned int monitor, + uint64_t trace_archive_id) { int ret; struct lttng_consumer_stream *stream; @@ -589,6 +590,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE; stream->index_file = NULL; stream->last_sequence_number = -1ULL; + stream->trace_archive_id = trace_archive_id; pthread_mutex_init(&stream->lock, NULL); pthread_mutex_init(&stream->metadata_timer_lock, NULL); diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h index acdc4b839..190de54ed 100644 --- a/src/common/consumer/consumer.h +++ b/src/common/consumer/consumer.h @@ -416,6 +416,11 @@ struct lttng_consumer_stream { uint64_t last_discarded_events; /* Copy of the sequence number of the last packet extracted. */ uint64_t last_sequence_number; + /* + * Session's current trace archive id at the time of the creation of + * this stream. + */ + uint64_t trace_archive_id; /* * Index file object of the index file for this stream. */ @@ -728,7 +733,8 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, int cpu, int *alloc_ret, enum consumer_channel_type type, - unsigned int monitor); + unsigned int monitor, + uint64_t trace_archive_id); struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, uint64_t session_id, const char *pathname, diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index ec67e316d..66ae16bc8 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -644,7 +644,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.stream.cpu, &alloc_ret, channel->type, - channel->monitor); + channel->monitor, + msg.u.stream.trace_archive_id); if (new_stream == NULL) { switch (alloc_ret) { case -ENOMEM: diff --git a/src/common/sessiond-comm/sessiond-comm.h b/src/common/sessiond-comm/sessiond-comm.h index 8c4fe0ee7..59f7482eb 100644 --- a/src/common/sessiond-comm/sessiond-comm.h +++ b/src/common/sessiond-comm/sessiond-comm.h @@ -456,6 +456,13 @@ struct lttcomm_consumer_msg { int32_t cpu; /* On which CPU this stream is assigned. */ /* Tells the consumer if the stream should be or not monitored. */ uint32_t no_monitor; + /* + * The archive id that was "current" at the time this + * stream was created. This is used to determine + * whether a rotation request was sent before or after + * the creation of a stream. + */ + uint64_t trace_archive_id; } LTTNG_PACKED stream; /* Only used by Kernel. */ struct { uint64_t net_index; @@ -505,6 +512,13 @@ struct lttcomm_consumer_msg { */ uint32_t ust_app_uid; int64_t blocking_timeout; + /* + * The archive id that was "current" at the time this + * channel was created. This is used to determine + * whether a rotation request was sent before or after + * the creation of a channel. + */ + uint64_t trace_archive_id; char root_shm_path[PATH_MAX]; char shm_path[PATH_MAX]; } LTTNG_PACKED ask_channel; @@ -540,6 +554,12 @@ struct lttcomm_consumer_msg { uint64_t relayd_id; /* Relayd id if apply. */ uint64_t key; uint64_t nb_packets_per_stream; + /* + * The session's current trace archive id is propagated + * since a snapshot triggers the creation of an + * ephemeral metadata stream. + */ + uint64_t trace_archive_id; } LTTNG_PACKED snapshot_channel; struct { uint64_t channel_key; diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 70bd14067..cb5743fad 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -147,7 +147,8 @@ static struct lttng_consumer_channel *allocate_channel(uint64_t session_id, */ static struct lttng_consumer_stream *allocate_stream(int cpu, int key, struct lttng_consumer_channel *channel, - struct lttng_consumer_local_data *ctx, int *_alloc_ret) + struct lttng_consumer_local_data *ctx, int *_alloc_ret, + uint64_t trace_archive_id) { int alloc_ret; struct lttng_consumer_stream *stream = NULL; @@ -166,7 +167,8 @@ static struct lttng_consumer_stream *allocate_stream(int cpu, int key, cpu, &alloc_ret, channel->type, - channel->monitor); + channel->monitor, + trace_archive_id); if (stream == NULL) { switch (alloc_ret) { case -ENOENT: @@ -267,7 +269,8 @@ end: * Return 0 on success else a negative value. */ static int create_ust_streams(struct lttng_consumer_channel *channel, - struct lttng_consumer_local_data *ctx) + struct lttng_consumer_local_data *ctx, + uint64_t trace_archive_id) { int ret, cpu = 0; struct ustctl_consumer_stream *ustream; @@ -298,7 +301,8 @@ static int create_ust_streams(struct lttng_consumer_channel *channel, } /* Allocate consumer stream object. */ - stream = allocate_stream(cpu, wait_fd, channel, ctx, &ret); + stream = allocate_stream(cpu, wait_fd, channel, ctx, &ret, + trace_archive_id); if (!stream) { goto error_alloc; } @@ -641,7 +645,8 @@ error: */ static int ask_channel(struct lttng_consumer_local_data *ctx, struct lttng_consumer_channel *channel, - struct ustctl_consumer_channel_attr *attr) + struct ustctl_consumer_channel_attr *attr, + uint64_t trace_archive_id) { int ret; @@ -682,7 +687,7 @@ static int ask_channel(struct lttng_consumer_local_data *ctx, } /* Open all streams for this channel. */ - ret = create_ust_streams(channel, ctx); + ret = create_ust_streams(channel, ctx, trace_archive_id); if (ret < 0) { goto end; } @@ -986,7 +991,8 @@ end: * Returns 0 on success, < 0 on error */ static int snapshot_metadata(uint64_t key, char *path, uint64_t relayd_id, - struct lttng_consumer_local_data *ctx) + struct lttng_consumer_local_data *ctx, + uint64_t trace_archive_id) { int ret = 0; struct lttng_consumer_channel *metadata_channel; @@ -1026,7 +1032,7 @@ static int snapshot_metadata(uint64_t key, char *path, uint64_t relayd_id, * The metadata stream is NOT created in no monitor mode when the channel * is created on a sessiond ask channel command. */ - ret = create_ust_streams(metadata_channel, ctx); + ret = create_ust_streams(metadata_channel, ctx, trace_archive_id); if (ret < 0) { goto error; } @@ -1081,7 +1087,8 @@ error: * Returns 0 on success, < 0 on error */ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id, - uint64_t nb_packets_per_stream, struct lttng_consumer_local_data *ctx) + uint64_t nb_packets_per_stream, + struct lttng_consumer_local_data *ctx) { int ret; unsigned use_relayd = 0; @@ -1492,7 +1499,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, health_code_update(); - ret = ask_channel(ctx, channel, &attr); + ret = ask_channel(ctx, channel, &attr, + msg.u.ask_channel.trace_archive_id); if (ret < 0) { goto end_channel_error; } @@ -1747,7 +1755,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = snapshot_metadata(msg.u.snapshot_channel.key, msg.u.snapshot_channel.pathname, msg.u.snapshot_channel.relayd_id, - ctx); + ctx, + msg.u.snapshot_channel.trace_archive_id); if (ret < 0) { ERR("Snapshot metadata failed"); ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA; -- 2.34.1