Fix: propagate archive id to the consumer daemon on stream creation
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Wed, 9 May 2018 01:41:08 +0000 (21:41 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Tue, 5 Jun 2018 15:25:45 +0000 (11:25 -0400)
This is the first of a series of fixes addressing a number of problems
with the way session rotation completions handled.

Those issues can result in:
  - A stop never completing,
  - A rotation never completing,
  - A rotation being marked as completed while the consumerd/relayd
      are still writing to the completed chunk's trace archive,
      resulting in a temporarily corrupted trace.

This first commit performs a relatively simple modification
to ensure that the session's current archive id is propagated to the
consumer daemon.

Detailed description of the problems
---

At the core of the problem is the fact that in per-pid buffering, we
are not guaranteed that the sessiond will be able to see an
application's channel(s) if it was torn down before (or even during)
the rotation.

When an application is torn down, it is removed from the ust_app_ht.
That doesn't mean its buffers were received by the relayd or even
consumed by the consumerd. The session daemon issues a "flush channel"
command, but there is no guarantee/synchronization to ensure the
buffers have been consumed.

The current design assumes that the sessiond knows all the channels to
rotate and that we can monitor those channels for the completion of
a rotation. Given that an application can disappear or appear while
we iterate on the ust_app_ht, this assumption does not hold. We also
don't want to prevent/delay applications from registering or exiting
just because a rotation is ongoing.

* Problem 1 *

A rename can happen before the relay has received all data for a given
chunk, leading to the data pending issue explained previously.

Rename should be performed as the last action after the rotation
has been completed since data can still be in-flight,
causing the creation of indexes upon its arrival on the relayd's end.

See: https://github.com/lttng/lttng-tools/blob/cea6c68/src/bin/lttng-sessiond/rotation-thread.c#L392

Currently, the rotation thread waits for all channels (known to the
sessiond at the start of the rotation) to have reached their rotation
point. More specifically, the consumer will write to the
channel_rotation pipe everytime a channel's subbuffers have been read
up to the point of the rotation position. This does not guarantee that
the data has been commited to disk on the relay's end.

At that point, the command to rename the destination folder is sent to
the relayd and the sessiond checks for the pending rotation
periodically (every 200ms) if the output was to a relayd.

That check is assumed not to be needed when tracing locally since
reaching the rotation point implies the contents being written to
disk.

This scheme is not safe. If the sessiond sees no channel to iterate
on, it will issue the rename command immediately. If an application's
buffers were being flushed by the consumerd, the relayd will receive
the data, attempt to create index files, and fail since the folder has
been moved.

From an architectural standpoint, the rename command also leaves the
'path' of streams that were unknown to the sessiond pointing to a path
that does not exist anymore.

* Problem 2 *

In per-pid tracing mode, an application can appear after the rotation
was initiated and cause the rotate pending check to never complete.

A RELAYD_ROTATE_PENDING command is applied to a unique session id and
a chunk id.

When handling a RELAYD_ROTATE_PENDING commands, the relayd will perform
the following check:
- Iterate on every stream known at that point:
- Check if the stream is rotating (stream->rotate_at_seq_num != -1ULL)
- If the stream is not rotating, "stream->chunk_id < chunk_id" is checked.
- If true, the rotation is considered incomplete.

See: https://github.com/lttng/lttng-tools/blob/cea6c68/src/bin/lttng-relayd/main.c#L2850

Given that streams, at their creation, are initialized with their
current "chunk_id" set to 0, the rotation will never be considered
complete if a stream is created between a ROTATE_STREAM and
ROTATE_PENDING command.

This can happen whenever an application is registered during a rotation.

* Problem 3 *

Since the sessiond can't accurately monitor the channels that have to
be rotated, the "rotation completed" notification (and state, if
queried with the lttng_rotation_handle_get_state() interface) is not
reliable.

A client could see that the rotation is marked as completed and
attempt to read a trace archive that has not been completely written.

Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
12 files changed:
src/bin/lttng-sessiond/consumer.c
src/bin/lttng-sessiond/consumer.h
src/bin/lttng-sessiond/kernel-consumer.c
src/bin/lttng-sessiond/kernel.c
src/bin/lttng-sessiond/ust-app.c
src/bin/lttng-sessiond/ust-consumer.c
src/bin/lttng-sessiond/ust-consumer.h
src/common/consumer/consumer.c
src/common/consumer/consumer.h
src/common/kernel-consumer/kernel-consumer.c
src/common/sessiond-comm/sessiond-comm.h
src/common/ust-consumer/ust-consumer.c

index 1ed1d7f81991422f9666541f7c24e85ec5549bf2..bc973ddf80e78e90d535036ac1de3427266ac8d3 100644 (file)
@@ -834,7 +834,8 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg,
                uint32_t ust_app_uid,
                int64_t blocking_timeout,
                const char *root_shm_path,
-               const char *shm_path)
+               const char *shm_path,
+               uint64_t trace_archive_id)
 {
        assert(msg);
 
@@ -863,6 +864,7 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg,
        msg->u.ask_channel.monitor = monitor;
        msg->u.ask_channel.ust_app_uid = ust_app_uid;
        msg->u.ask_channel.blocking_timeout = blocking_timeout;
+       msg->u.ask_channel.trace_archive_id = trace_archive_id;
 
        memcpy(msg->u.ask_channel.uuid, uuid, sizeof(msg->u.ask_channel.uuid));
 
@@ -939,20 +941,21 @@ void consumer_init_add_channel_comm_msg(struct lttcomm_consumer_msg *msg,
 /*
  * Init stream communication message structure.
  */
-void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg,
-               enum lttng_consumer_command cmd,
+void consumer_init_add_stream_comm_msg(struct lttcomm_consumer_msg *msg,
                uint64_t channel_key,
                uint64_t stream_key,
-               int cpu)
+               int32_t cpu,
+               uint64_t trace_archive_id)
 {
        assert(msg);
 
        memset(msg, 0, sizeof(struct lttcomm_consumer_msg));
 
-       msg->cmd_type = cmd;
+       msg->cmd_type = LTTNG_CONSUMER_ADD_STREAM;
        msg->u.stream.channel_key = channel_key;
        msg->u.stream.stream_key = stream_key;
        msg->u.stream.cpu = cpu;
+       msg->u.stream.trace_archive_id = trace_archive_id;
 }
 
 void consumer_init_streams_sent_comm_msg(struct lttcomm_consumer_msg *msg,
@@ -1429,7 +1432,8 @@ end:
  */
 int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key,
                struct snapshot_output *output, int metadata, uid_t uid, gid_t gid,
-               const char *session_path, int wait, uint64_t nb_packets_per_stream)
+               const char *session_path, int wait, uint64_t nb_packets_per_stream,
+               uint64_t trace_archive_id)
 {
        int ret;
        struct lttcomm_consumer_msg msg;
@@ -1445,6 +1449,7 @@ int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key,
        msg.u.snapshot_channel.key = key;
        msg.u.snapshot_channel.nb_packets_per_stream = nb_packets_per_stream;
        msg.u.snapshot_channel.metadata = metadata;
+       msg.u.snapshot_channel.trace_archive_id = trace_archive_id;
 
        if (output->consumer->type == CONSUMER_DST_NET) {
                msg.u.snapshot_channel.relayd_id = output->consumer->net_seq_index;
index 1018d7aa4f28ba83f612e54b63dd5024fa94d36f..cdc48b7f3131df6600af05bf3c31cd70a5046266 100644 (file)
@@ -276,12 +276,13 @@ void consumer_init_ask_channel_comm_msg(struct lttcomm_consumer_msg *msg,
                uint32_t ust_app_uid,
                int64_t blocking_timeout,
                const char *root_shm_path,
-               const char *shm_path);
-void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg,
-               enum lttng_consumer_command cmd,
+               const char *shm_path,
+               uint64_t trace_archive_id);
+void consumer_init_add_stream_comm_msg(struct lttcomm_consumer_msg *msg,
                uint64_t channel_key,
                uint64_t stream_key,
-               int cpu);
+               int32_t cpu,
+               uint64_t trace_archive_id);
 void consumer_init_streams_sent_comm_msg(struct lttcomm_consumer_msg *msg,
                enum lttng_consumer_command cmd,
                uint64_t channel_key, uint64_t net_seq_idx);
@@ -319,8 +320,9 @@ int consumer_get_lost_packets(uint64_t session_id, uint64_t channel_key,
 
 /* Snapshot command. */
 int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key,
-               struct snapshot_output *output, int metadata, uid_t uid, gid_t gid,
-               const char *session_path, int wait, uint64_t nb_packets_per_stream);
+               struct snapshot_output *output, int metadata,
+               uid_t uid, gid_t gid, const char *session_path, int wait,
+               uint64_t nb_packets_per_stream, uint64_t trace_archive_id);
 
 int consumer_rotate_channel(struct consumer_socket *socket, uint64_t key,
                uid_t uid, gid_t gid, struct consumer_output *output,
index 49adabb72336b11d0255e3a1f1d2462b0a70601a..05e447754bb2156a77b6ce3d8dc1555f84973864 100644 (file)
@@ -173,6 +173,8 @@ int kernel_consumer_add_channel(struct consumer_socket *sock,
        rcu_read_lock();
        session = session_find_by_id(ksession->id);
        assert(session);
+       assert(pthread_mutex_trylock(&session->lock));
+       assert(session_trylock_list());
 
        status = notification_thread_command_add_channel(
                        notification_thread_handle, session->name,
@@ -199,25 +201,30 @@ error:
  * The consumer socket lock must be held by the caller.
  */
 int kernel_consumer_add_metadata(struct consumer_socket *sock,
-               struct ltt_kernel_session *session, unsigned int monitor)
+               struct ltt_kernel_session *ksession, unsigned int monitor)
 {
        int ret;
        char *pathname;
        struct lttcomm_consumer_msg lkm;
        struct consumer_output *consumer;
+       struct ltt_session *session;
+
+       rcu_read_lock();
 
        /* Safety net */
-       assert(session);
-       assert(session->consumer);
+       assert(ksession);
+       assert(ksession->consumer);
        assert(sock);
 
-       DBG("Sending metadata %d to kernel consumer", session->metadata_stream_fd);
+       DBG("Sending metadata %d to kernel consumer",
+                       ksession->metadata_stream_fd);
 
        /* Get consumer output pointer */
-       consumer = session->consumer;
+       consumer = ksession->consumer;
 
        if (monitor) {
-               pathname = create_channel_path(consumer, session->uid, session->gid);
+               pathname = create_channel_path(consumer,
+                               ksession->uid, ksession->gid);
        } else {
                /* Empty path. */
                pathname = strdup("");
@@ -227,13 +234,18 @@ int kernel_consumer_add_metadata(struct consumer_socket *sock,
                goto error;
        }
 
+       session = session_find_by_id(ksession->id);
+       assert(session);
+       assert(pthread_mutex_trylock(&session->lock));
+       assert(session_trylock_list());
+
        /* Prep channel message structure */
        consumer_init_add_channel_comm_msg(&lkm,
-                       session->metadata->key,
-                       session->id,
+                       ksession->metadata->key,
+                       ksession->id,
                        pathname,
-                       session->uid,
-                       session->gid,
+                       ksession->uid,
+                       ksession->gid,
                        consumer->net_seq_index,
                        DEFAULT_METADATA_NAME,
                        1,
@@ -252,17 +264,17 @@ int kernel_consumer_add_metadata(struct consumer_socket *sock,
        health_code_update();
 
        /* Prep stream message structure */
-       consumer_init_stream_comm_msg(&lkm,
-                       LTTNG_CONSUMER_ADD_STREAM,
-                       session->metadata->key,
-                       session->metadata_stream_fd,
-                       0); /* CPU: 0 for metadata. */
+       consumer_init_add_stream_comm_msg(&lkm,
+                       ksession->metadata->key,
+                       ksession->metadata_stream_fd,
+                       0 /* CPU: 0 for metadata. */,
+                       session->current_archive_id);
 
        health_code_update();
 
        /* Send stream and file descriptor */
        ret = consumer_send_stream(sock, consumer, &lkm,
-                       &session->metadata_stream_fd, 1);
+                       &ksession->metadata_stream_fd, 1);
        if (ret < 0) {
                goto error;
        }
@@ -270,6 +282,7 @@ int kernel_consumer_add_metadata(struct consumer_socket *sock,
        health_code_update();
 
 error:
+       rcu_read_unlock();
        free(pathname);
        return ret;
 }
@@ -279,8 +292,10 @@ error:
  */
 static
 int kernel_consumer_add_stream(struct consumer_socket *sock,
-               struct ltt_kernel_channel *channel, struct ltt_kernel_stream *stream,
-               struct ltt_kernel_session *session, unsigned int monitor)
+               struct ltt_kernel_channel *channel,
+               struct ltt_kernel_stream *stream,
+               struct ltt_kernel_session *session, unsigned int monitor,
+               uint64_t trace_archive_id)
 {
        int ret;
        struct lttcomm_consumer_msg lkm;
@@ -299,11 +314,11 @@ int kernel_consumer_add_stream(struct consumer_socket *sock,
        consumer = session->consumer;
 
        /* Prep stream consumer message */
-       consumer_init_stream_comm_msg(&lkm,
-                       LTTNG_CONSUMER_ADD_STREAM,
+       consumer_init_add_stream_comm_msg(&lkm,
                        channel->key,
                        stream->fd,
-                       stream->cpu);
+                       stream->cpu,
+                       trace_archive_id);
 
        health_code_update();
 
@@ -359,20 +374,28 @@ error:
  * The consumer socket lock must be held by the caller.
  */
 int kernel_consumer_send_channel_streams(struct consumer_socket *sock,
-               struct ltt_kernel_channel *channel, struct ltt_kernel_session *session,
+               struct ltt_kernel_channel *channel, struct ltt_kernel_session *ksession,
                unsigned int monitor)
 {
        int ret = LTTNG_OK;
        struct ltt_kernel_stream *stream;
+       struct ltt_session *session;
 
        /* Safety net */
        assert(channel);
-       assert(session);
-       assert(session->consumer);
+       assert(ksession);
+       assert(ksession->consumer);
        assert(sock);
 
+       rcu_read_lock();
+
+       session = session_find_by_id(ksession->id);
+       assert(session);
+       assert(pthread_mutex_trylock(&session->lock));
+       assert(session_trylock_list());
+
        /* Bail out if consumer is disabled */
-       if (!session->consumer->enabled) {
+       if (!ksession->consumer->enabled) {
                ret = LTTNG_OK;
                goto error;
        }
@@ -381,7 +404,7 @@ int kernel_consumer_send_channel_streams(struct consumer_socket *sock,
                        channel->channel->name);
 
        if (!channel->sent_to_consumer) {
-               ret = kernel_consumer_add_channel(sock, channel, session, monitor);
+               ret = kernel_consumer_add_channel(sock, channel, ksession, monitor);
                if (ret < 0) {
                        goto error;
                }
@@ -395,8 +418,8 @@ int kernel_consumer_send_channel_streams(struct consumer_socket *sock,
                }
 
                /* Add stream on the kernel consumer side. */
-               ret = kernel_consumer_add_stream(sock, channel, stream, session,
-                               monitor);
+               ret = kernel_consumer_add_stream(sock, channel, stream,
+                               ksession, monitor, session->current_archive_id);
                if (ret < 0) {
                        goto error;
                }
@@ -404,6 +427,7 @@ int kernel_consumer_send_channel_streams(struct consumer_socket *sock,
        }
 
 error:
+       rcu_read_unlock();
        return ret;
 }
 
index 19a325da1f1421b27c73bfb7866ae5204b87ca7b..fc817b2db304187675a38e1c5d8904658fe670a5 100644 (file)
@@ -990,6 +990,8 @@ int kernel_snapshot_record(struct ltt_kernel_session *ksess,
        struct consumer_socket *socket;
        struct lttng_ht_iter iter;
        struct ltt_kernel_metadata *saved_metadata;
+       struct ltt_session *session;
+       uint64_t trace_archive_id;
 
        assert(ksess);
        assert(ksess->consumer);
@@ -997,6 +999,12 @@ int kernel_snapshot_record(struct ltt_kernel_session *ksess,
 
        DBG("Kernel snapshot record started");
 
+       session = session_find_by_id(ksess->id);
+       assert(session);
+       assert(pthread_mutex_trylock(&session->lock));
+       assert(session_trylock_list());
+       trace_archive_id = session->current_archive_id;
+
        /* Save current metadata since the following calls will change it. */
        saved_metadata = ksess->metadata;
        saved_metadata_fd = ksess->metadata_stream_fd;
@@ -1044,7 +1052,8 @@ int kernel_snapshot_record(struct ltt_kernel_session *ksess,
                        ret = consumer_snapshot_channel(socket, chan->key, output, 0,
                                        ksess->uid, ksess->gid,
                                        DEFAULT_KERNEL_TRACE_DIR, wait,
-                                       nb_packets_per_stream);
+                                       nb_packets_per_stream,
+                                       trace_archive_id);
                        if (ret < 0) {
                                ret = LTTNG_ERR_KERN_CONSUMER_FAIL;
                                (void) kernel_consumer_destroy_metadata(socket,
@@ -1056,7 +1065,8 @@ int kernel_snapshot_record(struct ltt_kernel_session *ksess,
                /* Snapshot metadata, */
                ret = consumer_snapshot_channel(socket, ksess->metadata->key, output,
                                1, ksess->uid, ksess->gid,
-                               DEFAULT_KERNEL_TRACE_DIR, wait, 0);
+                               DEFAULT_KERNEL_TRACE_DIR, wait, 0,
+                               trace_archive_id);
                if (ret < 0) {
                        ret = LTTNG_ERR_KERN_CONSUMER_FAIL;
                        goto error_consumer;
index f10a3dadf0133747484b8302bd078d1bf3b5b3cd..160f567c89b5c3ca755ef0735233c6b3a436a1d5 100644 (file)
@@ -2483,7 +2483,8 @@ error:
  */
 static int do_consumer_create_channel(struct ltt_ust_session *usess,
                struct ust_app_session *ua_sess, struct ust_app_channel *ua_chan,
-               int bitness, struct ust_registry_session *registry)
+               int bitness, struct ust_registry_session *registry,
+               uint64_t trace_archive_id)
 {
        int ret;
        unsigned int nb_fd = 0;
@@ -2518,7 +2519,7 @@ static int do_consumer_create_channel(struct ltt_ust_session *usess,
         * stream we have to expect.
         */
        ret = ust_consumer_ask_channel(ua_sess, ua_chan, usess->consumer, socket,
-                       registry);
+                       registry, trace_archive_id);
        if (ret < 0) {
                goto error_ask;
        }
@@ -2857,6 +2858,9 @@ static int create_channel_per_uid(struct ust_app *app,
        int ret;
        struct buffer_reg_uid *reg_uid;
        struct buffer_reg_channel *reg_chan;
+       struct ltt_session *session;
+       enum lttng_error_code notification_ret;
+       struct ust_registry_channel *chan_reg;
 
        assert(app);
        assert(usess);
@@ -2887,12 +2891,18 @@ static int create_channel_per_uid(struct ust_app *app,
                goto error;
        }
 
+       session = session_find_by_id(ua_sess->tracing_id);
+       assert(session);
+       assert(pthread_mutex_trylock(&session->lock));
+       assert(session_trylock_list());
+
        /*
         * Create the buffers on the consumer side. This call populates the
         * ust app channel object with all streams and data object.
         */
        ret = do_consumer_create_channel(usess, ua_sess, ua_chan,
-                       app->bits_per_long, reg_uid->registry->reg.ust);
+                       app->bits_per_long, reg_uid->registry->reg.ust,
+                       session->current_archive_id);
        if (ret < 0) {
                ERR("Error creating UST channel \"%s\" on the consumer daemon",
                                ua_chan->name);
@@ -2918,39 +2928,26 @@ static int create_channel_per_uid(struct ust_app *app,
                goto error;
        }
 
-       {
-               enum lttng_error_code cmd_ret;
-               struct ltt_session *session;
-               uint64_t chan_reg_key;
-               struct ust_registry_channel *chan_reg;
-
-               chan_reg_key = ua_chan->tracing_channel_id;
+       /* Notify the notification subsystem of the channel's creation. */
+       pthread_mutex_lock(&reg_uid->registry->reg.ust->lock);
+       chan_reg = ust_registry_channel_find(reg_uid->registry->reg.ust,
+                       ua_chan->tracing_channel_id);
+       assert(chan_reg);
+       chan_reg->consumer_key = ua_chan->key;
+       chan_reg = NULL;
+       pthread_mutex_unlock(&reg_uid->registry->reg.ust->lock);
 
-               pthread_mutex_lock(&reg_uid->registry->reg.ust->lock);
-               chan_reg = ust_registry_channel_find(reg_uid->registry->reg.ust,
-                               chan_reg_key);
-               assert(chan_reg);
-               chan_reg->consumer_key = ua_chan->key;
-               chan_reg = NULL;
-               pthread_mutex_unlock(&reg_uid->registry->reg.ust->lock);
-
-               session = session_find_by_id(ua_sess->tracing_id);
-               assert(session);
-
-               assert(pthread_mutex_trylock(&session->lock));
-               assert(session_trylock_list());
-               cmd_ret = notification_thread_command_add_channel(
-                               notification_thread_handle, session->name,
-                               ua_sess->euid, ua_sess->egid,
-                               ua_chan->name,
-                               ua_chan->key,
-                               LTTNG_DOMAIN_UST,
-                               ua_chan->attr.subbuf_size * ua_chan->attr.num_subbuf);
-               if (cmd_ret != LTTNG_OK) {
-                       ret = - (int) cmd_ret;
-                       ERR("Failed to add channel to notification thread");
-                       goto error;
-               }
+       notification_ret = notification_thread_command_add_channel(
+                       notification_thread_handle, session->name,
+                       ua_sess->euid, ua_sess->egid,
+                       ua_chan->name,
+                       ua_chan->key,
+                       LTTNG_DOMAIN_UST,
+                       ua_chan->attr.subbuf_size * ua_chan->attr.num_subbuf);
+       if (notification_ret != LTTNG_OK) {
+               ret = - (int) notification_ret;
+               ERR("Failed to add channel to notification thread");
+               goto error;
        }
 
 send_channel:
@@ -3007,9 +3004,16 @@ static int create_channel_per_pid(struct ust_app *app,
                goto error;
        }
 
+       session = session_find_by_id(ua_sess->tracing_id);
+       assert(session);
+
+       assert(pthread_mutex_trylock(&session->lock));
+       assert(session_trylock_list());
+
        /* Create and get channel on the consumer side. */
        ret = do_consumer_create_channel(usess, ua_sess, ua_chan,
-                       app->bits_per_long, registry);
+                       app->bits_per_long, registry,
+                       session->current_archive_id);
        if (ret < 0) {
                ERR("Error creating UST channel \"%s\" on the consumer daemon",
                        ua_chan->name);
@@ -3024,9 +3028,6 @@ static int create_channel_per_pid(struct ust_app *app,
                goto error;
        }
 
-       session = session_find_by_id(ua_sess->tracing_id);
-       assert(session);
-
        chan_reg_key = ua_chan->key;
        pthread_mutex_lock(&registry->lock);
        chan_reg = ust_registry_channel_find(registry, chan_reg_key);
@@ -3034,9 +3035,6 @@ static int create_channel_per_pid(struct ust_app *app,
        chan_reg->consumer_key = ua_chan->key;
        pthread_mutex_unlock(&registry->lock);
 
-       assert(pthread_mutex_trylock(&session->lock));
-       assert(session_trylock_list());
-
        cmd_ret = notification_thread_command_add_channel(
                        notification_thread_handle, session->name,
                        ua_sess->euid, ua_sess->egid,
@@ -3242,6 +3240,7 @@ static int create_ust_app_metadata(struct ust_app_session *ua_sess,
        struct ust_app_channel *metadata;
        struct consumer_socket *socket;
        struct ust_registry_session *registry;
+       struct ltt_session *session;
 
        assert(ua_sess);
        assert(app);
@@ -3291,6 +3290,12 @@ static int create_ust_app_metadata(struct ust_app_session *ua_sess,
         */
        registry->metadata_key = metadata->key;
 
+       session = session_find_by_id(ua_sess->tracing_id);
+       assert(session);
+
+       assert(pthread_mutex_trylock(&session->lock));
+       assert(session_trylock_list());
+
        /*
         * Ask the metadata channel creation to the consumer. The metadata object
         * will be created by the consumer and kept their. However, the stream is
@@ -3298,7 +3303,7 @@ static int create_ust_app_metadata(struct ust_app_session *ua_sess,
         * consumer.
         */
        ret = ust_consumer_ask_channel(ua_sess, metadata, consumer, socket,
-                       registry);
+                       registry, session->current_archive_id);
        if (ret < 0) {
                /* Nullify the metadata key so we don't try to close it later on. */
                registry->metadata_key = 0;
@@ -5946,12 +5951,20 @@ int ust_app_snapshot_record(struct ltt_ust_session *usess,
        struct lttng_ht_iter iter;
        struct ust_app *app;
        char pathname[PATH_MAX];
+       struct ltt_session *session;
+       uint64_t trace_archive_id;
 
        assert(usess);
        assert(output);
 
        rcu_read_lock();
 
+       session = session_find_by_id(usess->id);
+       assert(session);
+       assert(pthread_mutex_trylock(&session->lock));
+       assert(session_trylock_list());
+       trace_archive_id = session->current_archive_id;
+
        switch (usess->buffer_type) {
        case LTTNG_BUFFER_PER_UID:
        {
@@ -5981,16 +5994,20 @@ int ust_app_snapshot_record(struct ltt_ust_session *usess,
                        /* Add the UST default trace dir to path. */
                        cds_lfht_for_each_entry(reg->registry->channels->ht, &iter.iter,
                                        reg_chan, node.node) {
-                               ret = consumer_snapshot_channel(socket, reg_chan->consumer_key,
-                                               output, 0, usess->uid, usess->gid, pathname, wait,
-                                               nb_packets_per_stream);
+                               ret = consumer_snapshot_channel(socket,
+                                               reg_chan->consumer_key,
+                                               output, 0, usess->uid,
+                                               usess->gid, pathname, wait,
+                                               nb_packets_per_stream,
+                                               trace_archive_id);
                                if (ret < 0) {
                                        goto error;
                                }
                        }
                        ret = consumer_snapshot_channel(socket,
                                        reg->registry->reg.ust->metadata_key, output, 1,
-                                       usess->uid, usess->gid, pathname, wait, 0);
+                                       usess->uid, usess->gid, pathname, wait, 0,
+                                       trace_archive_id);
                        if (ret < 0) {
                                goto error;
                        }
@@ -6031,9 +6048,12 @@ int ust_app_snapshot_record(struct ltt_ust_session *usess,
 
                        cds_lfht_for_each_entry(ua_sess->channels->ht, &chan_iter.iter,
                                        ua_chan, node.node) {
-                               ret = consumer_snapshot_channel(socket, ua_chan->key, output,
-                                               0, ua_sess->euid, ua_sess->egid, pathname, wait,
-                                               nb_packets_per_stream);
+                               ret = consumer_snapshot_channel(socket,
+                                               ua_chan->key, output,
+                                               0, ua_sess->euid, ua_sess->egid,
+                                               pathname, wait,
+                                               nb_packets_per_stream,
+                                               trace_archive_id);
                                if (ret < 0) {
                                        goto error;
                                }
@@ -6045,8 +6065,11 @@ int ust_app_snapshot_record(struct ltt_ust_session *usess,
                                ret = -1;
                                goto error;
                        }
-                       ret = consumer_snapshot_channel(socket, registry->metadata_key, output,
-                                       1, ua_sess->euid, ua_sess->egid, pathname, wait, 0);
+                       ret = consumer_snapshot_channel(socket,
+                                       registry->metadata_key, output,
+                                       1, ua_sess->euid, ua_sess->egid,
+                                       pathname, wait, 0,
+                                       trace_archive_id);
                        if (ret < 0) {
                                goto error;
                        }
index 01d71ce01e34f9a58f13feca4ec6f520e9664bf7..575593f03d85139ada759695c92622e217b1f721 100644 (file)
@@ -108,8 +108,11 @@ error:
  * Consumer socket lock MUST be acquired before calling this.
  */
 static int ask_channel_creation(struct ust_app_session *ua_sess,
-               struct ust_app_channel *ua_chan, struct consumer_output *consumer,
-               struct consumer_socket *socket, struct ust_registry_session *registry)
+               struct ust_app_channel *ua_chan,
+               struct consumer_output *consumer,
+               struct consumer_socket *socket,
+               struct ust_registry_session *registry,
+               uint64_t trace_archive_id)
 {
        int ret, output;
        uint32_t chan_id;
@@ -201,7 +204,8 @@ static int ask_channel_creation(struct ust_app_session *ua_sess,
                        ua_sess->output_traces,
                        ua_sess->uid,
                        ua_chan->attr.blocking_timeout,
-                       root_shm_path, shm_path);
+                       root_shm_path, shm_path,
+                       trace_archive_id);
 
        health_code_update();
 
@@ -239,8 +243,11 @@ error:
  * Returns 0 on success else a negative value.
  */
 int ust_consumer_ask_channel(struct ust_app_session *ua_sess,
-               struct ust_app_channel *ua_chan, struct consumer_output *consumer,
-               struct consumer_socket *socket, struct ust_registry_session *registry)
+               struct ust_app_channel *ua_chan,
+               struct consumer_output *consumer,
+               struct consumer_socket *socket,
+               struct ust_registry_session *registry,
+               uint64_t trace_archive_id)
 {
        int ret;
 
@@ -257,7 +264,8 @@ int ust_consumer_ask_channel(struct ust_app_session *ua_sess,
        }
 
        pthread_mutex_lock(socket->lock);
-       ret = ask_channel_creation(ua_sess, ua_chan, consumer, socket, registry);
+       ret = ask_channel_creation(ua_sess, ua_chan, consumer, socket, registry,
+                       trace_archive_id);
        pthread_mutex_unlock(socket->lock);
        if (ret < 0) {
                ERR("ask_channel_creation consumer command failed");
index ce0ab51939449cb6ece3f940de7d8bf0c1e27aae..b8bd655758130997c9bc917863f14935f1e5670b 100644 (file)
 
 #include "consumer.h"
 #include "ust-app.h"
+#include <stdint.h>
 
 int ust_consumer_ask_channel(struct ust_app_session *ua_sess,
-               struct ust_app_channel *ua_chan, struct consumer_output *consumer,
-               struct consumer_socket *socket, struct ust_registry_session *registry);
+               struct ust_app_channel *ua_chan,
+               struct consumer_output *consumer,
+               struct consumer_socket *socket,
+               struct ust_registry_session *registry,
+               uint64_t trace_archive_id);
 
 int ust_consumer_get_channel(struct consumer_socket *socket,
                struct ust_app_channel *ua_chan);
index 5fa0ec4faa8a76cbb2950d18d4d576e2f4852dc2..3e991c8c71035f6ee9f43f768db27247209e8707 100644 (file)
@@ -562,7 +562,8 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
                int cpu,
                int *alloc_ret,
                enum consumer_channel_type type,
-               unsigned int monitor)
+               unsigned int monitor,
+               uint64_t trace_archive_id)
 {
        int ret;
        struct lttng_consumer_stream *stream;
@@ -589,6 +590,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
        stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
        stream->index_file = NULL;
        stream->last_sequence_number = -1ULL;
+       stream->trace_archive_id = trace_archive_id;
        pthread_mutex_init(&stream->lock, NULL);
        pthread_mutex_init(&stream->metadata_timer_lock, NULL);
 
index acdc4b839701c324e0d7887798fcd37ff7ec4890..190de54ed79f0c91e0e7abe6f5d9f864ed9795fc 100644 (file)
@@ -416,6 +416,11 @@ struct lttng_consumer_stream {
        uint64_t last_discarded_events;
        /* Copy of the sequence number of the last packet extracted. */
        uint64_t last_sequence_number;
+       /*
+        * Session's current trace archive id at the time of the creation of
+        * this stream.
+        */
+       uint64_t trace_archive_id;
        /*
         * Index file object of the index file for this stream.
         */
@@ -728,7 +733,8 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
                int cpu,
                int *alloc_ret,
                enum consumer_channel_type type,
-               unsigned int monitor);
+               unsigned int monitor,
+               uint64_t trace_archive_id);
 struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
                uint64_t session_id,
                const char *pathname,
index ec67e316d714bc1dc030dfb54b065f7e18352360..66ae16bc8f09c224ffd3e1130bf9c1fa0ca0fa13 100644 (file)
@@ -644,7 +644,8 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                msg.u.stream.cpu,
                                &alloc_ret,
                                channel->type,
-                               channel->monitor);
+                               channel->monitor,
+                               msg.u.stream.trace_archive_id);
                if (new_stream == NULL) {
                        switch (alloc_ret) {
                        case -ENOMEM:
index 8c4fe0ee7eed02293031b74ec9666fc375da72b6..59f7482eb729915ff6aceed83750a4732882d1c1 100644 (file)
@@ -456,6 +456,13 @@ struct lttcomm_consumer_msg {
                        int32_t cpu;    /* On which CPU this stream is assigned. */
                        /* Tells the consumer if the stream should be or not monitored. */
                        uint32_t no_monitor;
+                       /*
+                        * The archive id that was "current" at the time this
+                        * stream was created. This is used to determine
+                        * whether a rotation request was sent before or after
+                        * the creation of a stream.
+                        */
+                       uint64_t trace_archive_id;
                } LTTNG_PACKED stream;  /* Only used by Kernel. */
                struct {
                        uint64_t net_index;
@@ -505,6 +512,13 @@ struct lttcomm_consumer_msg {
                         */
                        uint32_t ust_app_uid;
                        int64_t blocking_timeout;
+                       /*
+                        * The archive id that was "current" at the time this
+                        * channel was created. This is used to determine
+                        * whether a rotation request was sent before or after
+                        * the creation of a channel.
+                        */
+                       uint64_t trace_archive_id;
                        char root_shm_path[PATH_MAX];
                        char shm_path[PATH_MAX];
                } LTTNG_PACKED ask_channel;
@@ -540,6 +554,12 @@ struct lttcomm_consumer_msg {
                        uint64_t relayd_id;             /* Relayd id if apply. */
                        uint64_t key;
                        uint64_t nb_packets_per_stream;
+                       /*
+                        * The session's current trace archive id is propagated
+                        * since a snapshot triggers the creation of an
+                        * ephemeral metadata stream.
+                        */
+                       uint64_t trace_archive_id;
                } LTTNG_PACKED snapshot_channel;
                struct {
                        uint64_t channel_key;
index 70bd140672e8ddb7aa5d4c3c39835148aee85ec9..cb5743fad626ba7a47e8130487ad94efa2172b07 100644 (file)
@@ -147,7 +147,8 @@ static struct lttng_consumer_channel *allocate_channel(uint64_t session_id,
  */
 static struct lttng_consumer_stream *allocate_stream(int cpu, int key,
                struct lttng_consumer_channel *channel,
-               struct lttng_consumer_local_data *ctx, int *_alloc_ret)
+               struct lttng_consumer_local_data *ctx, int *_alloc_ret,
+               uint64_t trace_archive_id)
 {
        int alloc_ret;
        struct lttng_consumer_stream *stream = NULL;
@@ -166,7 +167,8 @@ static struct lttng_consumer_stream *allocate_stream(int cpu, int key,
                        cpu,
                        &alloc_ret,
                        channel->type,
-                       channel->monitor);
+                       channel->monitor,
+                       trace_archive_id);
        if (stream == NULL) {
                switch (alloc_ret) {
                case -ENOENT:
@@ -267,7 +269,8 @@ end:
  * Return 0 on success else a negative value.
  */
 static int create_ust_streams(struct lttng_consumer_channel *channel,
-               struct lttng_consumer_local_data *ctx)
+               struct lttng_consumer_local_data *ctx,
+               uint64_t trace_archive_id)
 {
        int ret, cpu = 0;
        struct ustctl_consumer_stream *ustream;
@@ -298,7 +301,8 @@ static int create_ust_streams(struct lttng_consumer_channel *channel,
                }
 
                /* Allocate consumer stream object. */
-               stream = allocate_stream(cpu, wait_fd, channel, ctx, &ret);
+               stream = allocate_stream(cpu, wait_fd, channel, ctx, &ret,
+                               trace_archive_id);
                if (!stream) {
                        goto error_alloc;
                }
@@ -641,7 +645,8 @@ error:
  */
 static int ask_channel(struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_channel *channel,
-               struct ustctl_consumer_channel_attr *attr)
+               struct ustctl_consumer_channel_attr *attr,
+               uint64_t trace_archive_id)
 {
        int ret;
 
@@ -682,7 +687,7 @@ static int ask_channel(struct lttng_consumer_local_data *ctx,
        }
 
        /* Open all streams for this channel. */
-       ret = create_ust_streams(channel, ctx);
+       ret = create_ust_streams(channel, ctx, trace_archive_id);
        if (ret < 0) {
                goto end;
        }
@@ -986,7 +991,8 @@ end:
  * Returns 0 on success, < 0 on error
  */
 static int snapshot_metadata(uint64_t key, char *path, uint64_t relayd_id,
-               struct lttng_consumer_local_data *ctx)
+               struct lttng_consumer_local_data *ctx,
+               uint64_t trace_archive_id)
 {
        int ret = 0;
        struct lttng_consumer_channel *metadata_channel;
@@ -1026,7 +1032,7 @@ static int snapshot_metadata(uint64_t key, char *path, uint64_t relayd_id,
         * The metadata stream is NOT created in no monitor mode when the channel
         * is created on a sessiond ask channel command.
         */
-       ret = create_ust_streams(metadata_channel, ctx);
+       ret = create_ust_streams(metadata_channel, ctx, trace_archive_id);
        if (ret < 0) {
                goto error;
        }
@@ -1081,7 +1087,8 @@ error:
  * Returns 0 on success, < 0 on error
  */
 static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id,
-               uint64_t nb_packets_per_stream, struct lttng_consumer_local_data *ctx)
+               uint64_t nb_packets_per_stream,
+               struct lttng_consumer_local_data *ctx)
 {
        int ret;
        unsigned use_relayd = 0;
@@ -1492,7 +1499,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
 
                health_code_update();
 
-               ret = ask_channel(ctx, channel, &attr);
+               ret = ask_channel(ctx, channel, &attr,
+                               msg.u.ask_channel.trace_archive_id);
                if (ret < 0) {
                        goto end_channel_error;
                }
@@ -1747,7 +1755,8 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        ret = snapshot_metadata(msg.u.snapshot_channel.key,
                                        msg.u.snapshot_channel.pathname,
                                        msg.u.snapshot_channel.relayd_id,
-                                       ctx);
+                                       ctx,
+                                       msg.u.snapshot_channel.trace_archive_id);
                        if (ret < 0) {
                                ERR("Snapshot metadata failed");
                                ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
This page took 0.041839 seconds and 5 git commands to generate.