Initialize relay_stream chunk_id to its session's current trace archive id
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Thu, 10 May 2018 21:14:39 +0000 (17:14 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Tue, 5 Jun 2018 16:29:53 +0000 (12:29 -0400)
Initializing the relayd's streams with a stream_chunk_id allows the
relayd to differentiate between a stream created before the first
rotation (at chunk id == 0) vs. a stream that has been created
after the last (or pending) rotation.

Before this fix, the relayd can fail to identify that a rotation
has been completed.

This is caused by the fact that a stream's chunk id is initialized
to 0 and updated by the RELAYD_ROTATE_STREAM command to the
id of the chunk that is currently being rotated.

The 'stream->current_chunk_id.value < chunk_id' check performed
by the RELAYD_ROTATE_PENDING will cause rotations to never
complete for streams that are created between the launch of a
rotation and the check for its completion.

For example, when the relayd is checking whether the rotation id
'3' is completed, it may see streams with the default value of
their chunk id set to '0' and determine that a rotation is still
pending.

Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
src/bin/lttng-relayd/main.c
src/bin/lttng-relayd/stream.c
src/bin/lttng-relayd/stream.h

index a04303e4f5814a47a1b7bce3eec973179f6d3866..44fb2e27cb99a1bc34feeeaaa7d7d618bcbb72f3 100644 (file)
@@ -1198,6 +1198,7 @@ static int relay_add_stream(const struct lttcomm_relayd_hdr *recv_hdr,
        uint64_t stream_handle = -1ULL;
        char *path_name = NULL, *channel_name = NULL;
        uint64_t tracefile_size = 0, tracefile_count = 0;
+       struct relay_stream_chunk_id stream_chunk_id = { 0 };
 
        if (!session || !conn->version_check_done) {
                ERR("Trying to add a stream before version check");
@@ -1237,7 +1238,8 @@ static int relay_add_stream(const struct lttcomm_relayd_hdr *recv_hdr,
 
        /* We pass ownership of path_name and channel_name. */
        stream = stream_create(trace, stream_handle, path_name,
-                       channel_name, tracefile_size, tracefile_count);
+               channel_name, tracefile_size, tracefile_count,
+               &stream_chunk_id);
        path_name = NULL;
        channel_name = NULL;
 
@@ -2483,7 +2485,8 @@ static int relay_rotate_session_stream(const struct lttcomm_relayd_hdr *recv_hdr
                goto end_stream_unlock;
        }
 
-       stream->chunk_id = stream_info.new_chunk_id;
+       assert(stream->current_chunk_id.is_set);
+       stream->current_chunk_id.value = stream_info.new_chunk_id;
 
        if (stream->is_metadata) {
                /*
@@ -2839,7 +2842,7 @@ int relay_rotate_pending(const struct lttcomm_relayd_hdr *recv_hdr,
                        rotate_pending = true;
                        DBG("Stream %" PRIu64 " is still rotating",
                                        stream->stream_handle);
-               } else if (stream->chunk_id < chunk_id) {
+               } else if (stream->current_chunk_id.value < chunk_id) {
                        /*
                         * Stream closed on the consumer but still active on the
                         * relay.
index 709b3b1f5e7af9d135a24b0b3d67a957f6f3fcbe..326ec4bf2ff2b970d6d46465c1abe9b949c65456 100644 (file)
@@ -68,7 +68,8 @@ end:
 struct relay_stream *stream_create(struct ctf_trace *trace,
        uint64_t stream_handle, char *path_name,
        char *channel_name, uint64_t tracefile_size,
-       uint64_t tracefile_count)
+       uint64_t tracefile_count,
+       const struct relay_stream_chunk_id *chunk_id)
 {
        int ret;
        struct relay_stream *stream = NULL;
@@ -94,6 +95,7 @@ struct relay_stream *stream_create(struct ctf_trace *trace,
        urcu_ref_init(&stream->ref);
        ctf_trace_get(trace);
        stream->trace = trace;
+       stream->current_chunk_id = *chunk_id;
 
        stream->indexes_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
        if (!stream->indexes_ht) {
index 47ae2e8f9927fdcd672a595bf3c20902fd375846..fb3e1ed68b936e1fd7411273c4b7a9b338f76e0b 100644 (file)
 #include "stream-fd.h"
 #include "tracefile-array.h"
 
+struct relay_stream_chunk_id {
+       bool is_set;
+       uint64_t value;
+};
+
 /*
  * Represents a stream in the relay
  */
@@ -150,14 +155,17 @@ struct relay_stream {
         * atomically with rotate_at_seq_num.
         *
         * Always access with stream lock held.
+        *
+        * This attribute is not set if the stream is created by a pre-2.11
+        * consumer.
         */
-       uint64_t chunk_id;
+       struct relay_stream_chunk_id current_chunk_id;
 };
 
 struct relay_stream *stream_create(struct ctf_trace *trace,
        uint64_t stream_handle, char *path_name,
        char *channel_name, uint64_t tracefile_size,
-       uint64_t tracefile_count);
+       uint64_t tracefile_count, const struct relay_stream_chunk_id *chunk_id);
 
 struct relay_stream *stream_get_by_id(uint64_t stream_id);
 bool stream_get(struct relay_stream *stream);
This page took 0.032348 seconds and 5 git commands to generate.