start to handle async rotation on the relay + basics for remote rotate_pending
authorJulien Desfossez <jdesfossez@efficios.com>
Wed, 6 Sep 2017 20:30:27 +0000 (16:30 -0400)
committerJulien Desfossez <jdesfossez@efficios.com>
Wed, 6 Sep 2017 20:30:27 +0000 (16:30 -0400)
Signed-off-by: Julien Desfossez <jdesfossez@efficios.com>
15 files changed:
src/bin/lttng-relayd/main.c
src/bin/lttng-relayd/stream.c
src/bin/lttng-relayd/stream.h
src/bin/lttng-sessiond/consumer.c
src/bin/lttng-sessiond/consumer.h
src/bin/lttng-sessiond/kernel.c
src/bin/lttng-sessiond/ust-app.c
src/common/consumer/consumer.c
src/common/consumer/consumer.h
src/common/kernel-consumer/kernel-consumer.c
src/common/relayd/relayd.c
src/common/relayd/relayd.h
src/common/sessiond-comm/relayd.h
src/common/sessiond-comm/sessiond-comm.h
src/common/ust-consumer/ust-consumer.c

index c1b044f5786ae9c98f777c0420ab502bcb371b3e..c6d61a186f095f339193a509c701f67e56d73bbf 100644 (file)
@@ -1507,6 +1507,100 @@ end:
        return ret;
 }
 
+static
+int rotate_index_file(struct relay_stream *stream)
+{
+       int ret;
+       uint32_t major, minor;
+
+       /* Put ref on previous index_file. */
+       if (stream->index_file) {
+               lttng_index_file_put(stream->index_file);
+               stream->index_file = NULL;
+       }
+       major = stream->trace->session->major;
+       minor = stream->trace->session->minor;
+       stream->index_file = lttng_index_file_create(stream->path_name,
+                       stream->channel_name,
+                       -1, -1, stream->tracefile_size,
+                       tracefile_array_get_file_index_head(stream->tfa),
+                       lttng_to_index_major(major, minor),
+                       lttng_to_index_minor(major, minor));
+       if (!stream->index_file) {
+               ret = -1;
+               goto end;
+       }
+
+       ret = 0;
+
+end:
+       return ret;
+}
+
+/*
+ * Check if a stream should perform a rotation (for session rotation).
+ * Must be called with the stream lock held.
+ *
+ * Return 0 on success, a negative value on error.
+ */
+static
+int check_rotate_stream(struct relay_stream *stream)
+{
+       int ret;
+
+       /* No rotation expected */
+       if (stream->rotate_at_seq_num == -1ULL) {
+               ret = 0;
+               goto end;
+       }
+
+       if (stream->prev_seq < stream->rotate_at_seq_num) {
+               DBG("Stream %" PRIu64 " no yet ready for rotation",
+                               stream->stream_handle);
+               fprintf(stderr, "Stream %" PRIu64 " no yet ready for rotation\n",
+                               stream->stream_handle);
+               ret = 0;
+               goto end;
+       } else if (stream->prev_seq > stream->rotate_at_seq_num) {
+               DBG("Rotation after too much data has been written in tracefile "
+                               "for stream %" PRIu64 ", need to truncate before "
+                               "rotating", stream->stream_handle);
+               fprintf(stderr, "Rotation after too much data has been written in tracefile "
+                               "for stream %" PRIu64 ", need to truncate before "
+                               "rotating\n", stream->stream_handle);
+               /* TODO */
+       } else {
+               DBG("Stream %" PRIu64 " ready for rotation", stream->stream_handle);
+               fprintf(stderr, "Stream %" PRIu64 " ready for rotation\n", stream->stream_handle);
+       }
+
+       /* Perform the stream rotation. */
+       ret = utils_rotate_stream_file(stream->path_name,
+                       stream->channel_name, stream->tracefile_size,
+                       stream->tracefile_count, -1,
+                       -1, stream->stream_fd->fd,
+                       NULL, &stream->stream_fd->fd);
+       if (ret < 0) {
+               ERR("Rotating stream output file");
+               goto end;
+       }
+       stream->tracefile_size_current = 0;
+
+       /* Rotate also the index if the stream is not a metadata stream. */
+       if (!stream->is_metadata) {
+               ret = rotate_index_file(stream);
+               if (ret < 0) {
+                       ERR("Failed to rotate index file");
+                       goto end;
+               }
+       }
+
+       stream->rotate_at_seq_num = -1ULL;
+
+end:
+       return ret;
+}
+
 /*
  * relay_recv_metadata: receive the metadata for the session.
  */
@@ -1590,6 +1684,12 @@ static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr,
        DBG2("Relay metadata written. Updated metadata_received %" PRIu64,
                metadata_stream->metadata_received);
 
+       ret = check_rotate_stream(metadata_stream);
+       if (ret < 0) {
+               ERR("Check rotate stream");
+               goto end_put;
+       }
+
 end_put:
        pthread_mutex_unlock(&metadata_stream->lock);
        stream_put(metadata_stream);
@@ -2122,36 +2222,6 @@ end_no_session:
        return ret;
 }
 
-static
-int rotate_index_file(struct relay_stream *stream)
-{
-       int ret;
-       uint32_t major, minor;
-
-       /* Put ref on previous index_file. */
-       if (stream->index_file) {
-               lttng_index_file_put(stream->index_file);
-               stream->index_file = NULL;
-       }
-       major = stream->trace->session->major;
-       minor = stream->trace->session->minor;
-       stream->index_file = lttng_index_file_create(stream->path_name,
-                       stream->channel_name,
-                       -1, -1, stream->tracefile_size,
-                       tracefile_array_get_file_index_head(stream->tfa),
-                       lttng_to_index_major(major, minor),
-                       lttng_to_index_minor(major, minor));
-       if (!stream->index_file) {
-               ret = -1;
-               goto end;
-       }
-
-       ret = 0;
-
-end:
-       return ret;
-}
-
 /*
  * relay_rotate_stream: rotate a stream to a new tracefile for the session
  * rotation feature (not the tracefile rotation feature).
@@ -2208,8 +2278,10 @@ static int relay_rotate_session_stream(struct lttcomm_relayd_hdr *recv_hdr,
                goto end;
        }
 
-       fprintf(stderr, "Rotating stream %lu to %s/\n",
-                       be64toh(stream_info.stream_id), stream_info.new_pathname);
+       fprintf(stderr, "Stream %lu needs to rotate to %s (%lu vs %lu)/\n",
+                       be64toh(stream_info.stream_id), stream_info.new_pathname,
+                       stream->prev_seq, be64toh(stream_info.rotate_at_seq_num));
+       fprintf(stderr, "metadata: %d\n", stream->is_metadata);
 
        pthread_mutex_lock(&stream->lock);
 
@@ -2226,25 +2298,15 @@ static int relay_rotate_session_stream(struct lttcomm_relayd_hdr *recv_hdr,
                ERR("relay creating output directory");
                goto end;
        }
-       ret = utils_rotate_stream_file(stream->path_name,
-                       stream->channel_name, stream->tracefile_size,
-                       stream->tracefile_count, -1,
-                       -1, stream->stream_fd->fd,
-                       NULL, &stream->stream_fd->fd);
+       stream->rotate_at_seq_num = be64toh(stream_info.rotate_at_seq_num);
+       stream->chunk_id = be64toh(stream_info.new_chunk_id);
+       /* FIXME: YOU ARE HERE
+        * metadata stream does not have a valid seq_num, but is sent on the control connection */
+       ret = check_rotate_stream(stream);
        if (ret < 0) {
-               ERR("Rotating stream output file");
+               ERR("Check rotate stream");
                goto end_stream_unlock;
        }
-       stream->tracefile_size_current = 0;
-
-       /* Rotate also the index if the stream is not a metadata stream. */
-       if (!stream->is_metadata) {
-               ret = rotate_index_file(stream);
-               if (ret < 0) {
-                       ERR("Failed to rotate index file");
-                       goto end_stream_unlock;
-               }
-       }
 
 end_stream_unlock:
        pthread_mutex_unlock(&stream->lock);
@@ -2643,6 +2705,12 @@ static int relay_process_data(struct relay_connection *conn)
 
        stream->prev_seq = net_seq_num;
 
+       ret = check_rotate_stream(stream);
+       if (ret < 0) {
+               ERR("Check rotate stream");
+               goto end_stream_unlock;
+       }
+
 end_stream_unlock:
        close_requested = stream->close_requested;
        pthread_mutex_unlock(&stream->lock);
index 2bc815813613054f58f39810b9c8df0dcade0117..5ed37c58ec1d2c0956b550ac837bac314d1e33a0 100644 (file)
@@ -88,6 +88,7 @@ struct relay_stream *stream_create(struct ctf_trace *trace,
        stream->tracefile_count = tracefile_count;
        stream->path_name = path_name;
        stream->channel_name = channel_name;
+       stream->rotate_at_seq_num = -1ULL;
        lttng_ht_node_init_u64(&stream->node, stream->stream_handle);
        pthread_mutex_init(&stream->lock, NULL);
        urcu_ref_init(&stream->ref);
index e385032cb71f98d4994dfad241e915f61e0cb28a..e9ddb5f21833e839e9d3f88002aa3b435423365f 100644 (file)
@@ -129,6 +129,22 @@ struct relay_stream {
        struct lttng_ht_node_u64 node;
        bool in_stream_ht;              /* is stream in stream hash table. */
        struct rcu_head rcu_node;       /* For call_rcu teardown. */
+       /*
+        * When we have written the data and index corresponding to this
+        * seq_num, rotate the tracefile (session rotation). The path_name is
+        * already up-to-date.
+        * This is set to -1ULL when no rotation is pending. Always
+        * read/updated with stream lock held.
+        */
+       uint64_t rotate_at_seq_num;
+       /*
+        * This is the id of the chunk where we are writing to if no rotate is
+        * pending (rotate_at_seq_num == -1ULL). If a rotate is pending, this
+        * is the chunk_id we will have after the rotation. It must be updated
+        * atomically with rotate_at_seq_num. Always read/updated with stream
+        * lock held.
+        */
+       uint64_t chunk_id;
 };
 
 struct relay_stream *stream_create(struct ctf_trace *trace,
index 23103ffa728211e42347e97f525a37fa623411bd..537090b70709f855541cc635dcae2d3b5aff71f0 100644 (file)
@@ -1594,10 +1594,14 @@ end:
 /*
  * Ask the consumer to rotate a channel.
  * app_pathname only used for UST, it contains the path after /ust/.
+ *
+ * The new_chunk_id is the session->rotate_count that has been incremented
+ * when the rotation started. On the relay, this allows to keep track in which
+ * chunk each stream is currently writing to (for the rotate_pending operation).
  */
 int consumer_rotate_channel(struct consumer_socket *socket, uint64_t key,
                uid_t uid, gid_t gid, struct consumer_output *output,
-               char *app_pathname, uint32_t metadata)
+               char *app_pathname, uint32_t metadata, uint64_t new_chunk_id)
 {
        int ret;
        struct lttcomm_consumer_msg msg;
@@ -1611,6 +1615,7 @@ int consumer_rotate_channel(struct consumer_socket *socket, uint64_t key,
        msg.cmd_type = LTTNG_CONSUMER_ROTATE_CHANNEL;
        msg.u.rotate_channel.key = key;
        msg.u.rotate_channel.metadata = metadata;
+       msg.u.rotate_channel.new_chunk_id = new_chunk_id;
 
        if (output->type == CONSUMER_DST_NET) {
                fprintf(stderr, "BASE: %s\n", output->dst.net.base_dir);
index 66ee67310da744415697f59600130251e2ba9f44..ae303222e1891f4317506d1e7a562a0a779161a1 100644 (file)
@@ -326,7 +326,7 @@ int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key,
 
 int consumer_rotate_channel(struct consumer_socket *socket, uint64_t key,
                uid_t uid, gid_t gid, struct consumer_output *output,
-               char *tmp, uint32_t metadata);
+               char *tmp, uint32_t metadata, uint64_t new_chunk_id);
 int consumer_rotate_rename(struct consumer_socket *socket, uint64_t session_id,
                struct consumer_output *output, char *current_path, char *new_path,
                uid_t uid, gid_t gid);
index ef5733c66eb2b652e66cb6c508b52183fee37429..43fc935536612bf5af9287cc32bac4c45d4010b7 100644 (file)
@@ -1179,7 +1179,8 @@ int kernel_rotate_session(struct ltt_session *session)
 
                        ret = consumer_rotate_channel(socket, chan->fd,
                                        ksess->uid, ksess->gid, ksess->consumer,
-                                       "", 0);
+                                       "", 0,
+                                       session->rotate_count);
                        if (ret < 0) {
                                ret = LTTNG_ERR_KERN_CONSUMER_FAIL;
                                pthread_mutex_unlock(socket->lock);
@@ -1194,7 +1195,8 @@ int kernel_rotate_session(struct ltt_session *session)
                 */
                pthread_mutex_lock(socket->lock);
                ret = consumer_rotate_channel(socket, ksess->metadata->fd,
-                               ksess->uid, ksess->gid, ksess->consumer, "", 1);
+                               ksess->uid, ksess->gid, ksess->consumer, "", 1,
+                               session->rotate_count);
                if (ret < 0) {
                        ret = LTTNG_ERR_KERN_CONSUMER_FAIL;
                        pthread_mutex_unlock(socket->lock);
index 368e5866713a9418eab4dcd4e11be9ae60f71b32..2283f980ae079c53c483f5c9f11e58aa4600b099 100644 (file)
@@ -6359,7 +6359,8 @@ int ust_app_rotate_session(struct ltt_session *session)
                                ret = consumer_rotate_channel(socket,
                                                reg_chan->consumer_key,
                                                usess->uid, usess->gid,
-                                               usess->consumer, pathname, 0);
+                                               usess->consumer, pathname, 0,
+                                               session->rotate_count);
                                if (ret < 0) {
                                        goto error;
                                }
@@ -6370,7 +6371,8 @@ int ust_app_rotate_session(struct ltt_session *session)
                        ret = consumer_rotate_channel(socket,
                                        reg->registry->reg.ust->metadata_key,
                                        usess->uid, usess->gid,
-                                       usess->consumer, pathname, 1);
+                                       usess->consumer, pathname, 1,
+                                       session->rotate_count);
                        if (ret < 0) {
                                goto error;
                        }
@@ -6441,7 +6443,8 @@ int ust_app_rotate_session(struct ltt_session *session)
                                }
                                ret = consumer_rotate_channel(socket, ua_chan->key,
                                                ua_sess->euid, ua_sess->egid,
-                                               ua_sess->consumer, pathname, 0);
+                                               ua_sess->consumer, pathname, 0,
+                                               session->rotate_count);
                                if (ret < 0) {
                                        goto error;
                                }
@@ -6451,7 +6454,8 @@ int ust_app_rotate_session(struct ltt_session *session)
                        (void) push_metadata(registry, usess->consumer);
                        ret = consumer_rotate_channel(socket, registry->metadata_key,
                                        ua_sess->euid, ua_sess->egid,
-                                       ua_sess->consumer, pathname, 1);
+                                       ua_sess->consumer, pathname, 1,
+                                       session->rotate_count);
                        if (ret < 0) {
                                goto error;
                        }
index f187584f3913f21d6f8c20ae4af98722b0394b90..f8b25665f1df4ae83279a9801d8e54ca52b2eded 100644 (file)
@@ -3936,7 +3936,7 @@ end:
  * Returns 0 on success, < 0 on error
  */
 int lttng_consumer_rotate_channel(uint64_t key, char *path,
-               uint64_t relayd_id, uint32_t metadata,
+               uint64_t relayd_id, uint32_t metadata, uint64_t new_chunk_id,
                struct lttng_consumer_local_data *ctx)
 {
        int ret;
@@ -3956,6 +3956,7 @@ int lttng_consumer_rotate_channel(uint64_t key, char *path,
                goto end;
        }
        pthread_mutex_lock(&channel->lock);
+       channel->current_chunk_id = new_chunk_id;
        snprintf(channel->pathname, PATH_MAX, "%s", path);
        ret = utils_mkdir_recursive(channel->pathname, S_IRWXU | S_IRWXG,
                        channel->uid, channel->gid);
@@ -4163,8 +4164,11 @@ int rotate_relay_stream(struct lttng_consumer_local_data *ctx,
                goto end;
        }
 
+       /* FIXME: chan_ro ? */
        ret = relayd_rotate_stream(&relayd->control_sock,
-                       stream->relayd_stream_id, stream->channel_ro_pathname);
+                       stream->relayd_stream_id, stream->channel_ro_pathname,
+                       stream->chan->current_chunk_id,
+                       stream->last_sequence_number);
 
 end:
        return ret;
index 92552504eb796ebc03190a9d8b7d8ae6db7d1da0..63254007af65bf912e38418848275f2c727ab7a3 100644 (file)
@@ -236,6 +236,13 @@ struct lttng_consumer_channel {
         * daemon that this channel has finished its rotation.
         */
        uint64_t nr_stream_rotate_pending;
+
+       /*
+        * The chunk id where we currently write the data. This value is sent
+        * to the relay when we add a stream and when a stream rotates. This
+        * allows to keep track of where each stream on the relay is writing.
+        */
+       uint64_t current_chunk_id;
 };
 
 /*
@@ -818,7 +825,7 @@ void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream);
 int consumer_create_index_file(struct lttng_consumer_stream *stream);
 int lttng_consumer_rotate_channel(uint64_t key, char *path,
                uint64_t relayd_id, uint32_t metadata,
-               struct lttng_consumer_local_data *ctx);
+               uint64_t new_chunk_id, struct lttng_consumer_local_data *ctx);
 int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream,
                unsigned long len);
 int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
index bdca6708721947f3fec0d24781feeb68f853c82e..4fb586c085f8f0fb6d439cdabecb941b9fb276d3 100644 (file)
@@ -1141,6 +1141,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                msg.u.rotate_channel.pathname,
                                msg.u.rotate_channel.relayd_id,
                                msg.u.rotate_channel.metadata,
+                               msg.u.rotate_channel.new_chunk_id,
                                ctx);
                if (ret < 0) {
                        ERR("Rotate channel failed");
index dad845d5c95f08e8f323b496d169f55f710cf306..6e3e91bfebbe861c51f62a7c58e94dc516dea695 100644 (file)
@@ -979,7 +979,8 @@ error:
 }
 
 int relayd_rotate_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
-               const char *new_pathname)
+               const char *new_pathname, uint64_t new_chunk_id,
+               uint64_t seq_num)
 {
        int ret;
        struct lttcomm_relayd_rotate_stream msg;
@@ -992,6 +993,8 @@ int relayd_rotate_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
 
        memset(&msg, 0, sizeof(msg));
        msg.stream_id = htobe64(stream_id);
+       msg.rotate_at_seq_num = htobe64(seq_num);
+       msg.new_chunk_id = htobe64(new_chunk_id);
        if (lttng_strncpy(msg.new_pathname, new_pathname,
                                sizeof(msg.new_pathname))) {
                ret = -1;
index 4c5c2daf5cacc12e4d0b75385f321f2c1ef7fb6d..f94a8e227aa339618fe9268bc1a7730f59d968a8 100644 (file)
@@ -53,7 +53,7 @@ int relayd_send_index(struct lttcomm_relayd_sock *rsock,
 int relayd_reset_metadata(struct lttcomm_relayd_sock *rsock,
                uint64_t stream_id, uint64_t version);
 int relayd_rotate_stream(struct lttcomm_relayd_sock *sock, uint64_t stream_id,
-               const char *new_pathname);
+               const char *new_pathname, uint64_t new_chunk_id, uint64_t seq_num);
 int relayd_rotate_rename(struct lttcomm_relayd_sock *sock,
                const char *current_path, const char *new_path);
 
index bc754ee5caeb58650131396f1603bb533d5976f4..921ff182a87cdf18abb9aa18de2bf0ac235651a4 100644 (file)
@@ -209,6 +209,8 @@ struct lttcomm_relayd_reset_metadata {
 
 struct lttcomm_relayd_rotate_stream {
        uint64_t stream_id;
+       uint64_t rotate_at_seq_num;
+       uint64_t new_chunk_id;
        char new_pathname[LTTNG_PATH_MAX];
 } LTTNG_PACKED;
 
index c9c39d1f1e9341ccbff97f40487b833707cdab3d..5e045be455af7e97fdc234ba807298b5786d2e0e 100644 (file)
@@ -549,6 +549,7 @@ struct lttcomm_consumer_msg {
                        uint32_t metadata; /* This is a metadata channel. */
                        uint64_t relayd_id; /* Relayd id if apply. */
                        uint64_t key;
+                       uint64_t new_chunk_id;
                } LTTNG_PACKED rotate_channel;
                struct {
                        char current_path[PATH_MAX];
index 3a67c7d5b593a4861946b5f73c2a827dcdd4a58a..2a02568b0dd74b328743fa36ccdc7559e3885cd4 100644 (file)
@@ -1939,6 +1939,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                msg.u.rotate_channel.pathname,
                                msg.u.rotate_channel.relayd_id,
                                msg.u.rotate_channel.metadata,
+                               msg.u.rotate_channel.new_chunk_id,
                                ctx);
                if (ret < 0) {
                        ERR("Rotate channel failed");
This page took 0.037242 seconds and 5 git commands to generate.