relayd: keep track of prev_index_seq in relayd_stream
[lttng-tools.git] / src / bin / lttng-relayd / main.c
index fcc7fc00ae55c844b935ef065028e3526c384257..92f9a5600ac28b7280dcea9f1041b7725b13b748 100644 (file)
@@ -1198,6 +1198,7 @@ static int relay_add_stream(const struct lttcomm_relayd_hdr *recv_hdr,
        uint64_t stream_handle = -1ULL;
        char *path_name = NULL, *channel_name = NULL;
        uint64_t tracefile_size = 0, tracefile_count = 0;
+       struct relay_stream_chunk_id stream_chunk_id = { 0 };
 
        if (!session || !conn->version_check_done) {
                ERR("Trying to add a stream before version check");
@@ -1216,7 +1217,9 @@ static int relay_add_stream(const struct lttcomm_relayd_hdr *recv_hdr,
        } else {
                /* From 2.11 to ... */
                ret = cmd_recv_stream_2_11(payload, &path_name,
-                       &channel_name, &tracefile_size, &tracefile_count);
+                       &channel_name, &tracefile_size, &tracefile_count,
+                       &stream_chunk_id.value);
+               stream_chunk_id.is_set = true;
        }
 
        if (ret < 0) {
@@ -1235,7 +1238,8 @@ static int relay_add_stream(const struct lttcomm_relayd_hdr *recv_hdr,
 
        /* We pass ownership of path_name and channel_name. */
        stream = stream_create(trace, stream_handle, path_name,
-                       channel_name, tracefile_size, tracefile_count);
+               channel_name, tracefile_size, tracefile_count,
+               &stream_chunk_id);
        path_name = NULL;
        channel_name = NULL;
 
@@ -2310,12 +2314,18 @@ static int relay_recv_index(const struct lttcomm_relayd_hdr *recv_hdr,
                tracefile_array_commit_seq(stream->tfa);
                stream->index_received_seqcount++;
                stream->pos_after_last_complete_data_index += index->total_size;
+               stream->prev_index_seq = index_info.net_seq_num;
        } else if (ret > 0) {
                /* no flush. */
                ret = 0;
        } else {
+               /*
+                * ret < 0
+                *
+                * relay_index_try_flush is responsible for the self-reference
+                * put of the index object on error.
+                */
                ERR("relay_index_try_flush error %d", ret);
-               relay_index_put(index);
                ret = -1;
        }
 
@@ -2481,7 +2491,8 @@ static int relay_rotate_session_stream(const struct lttcomm_relayd_hdr *recv_hdr
                goto end_stream_unlock;
        }
 
-       stream->chunk_id = stream_info.new_chunk_id;
+       assert(stream->current_chunk_id.is_set);
+       stream->current_chunk_id.value = stream_info.new_chunk_id;
 
        if (stream->is_metadata) {
                /*
@@ -2815,7 +2826,8 @@ int relay_rotate_pending(const struct lttcomm_relayd_hdr *recv_hdr,
 
        chunk_id = be64toh(msg.chunk_id);
 
-       DBG("Evaluating rotate pending for chunk id %" PRIu64, chunk_id);
+       DBG("Evaluating rotate pending for session \"%s\" and  chunk id %" PRIu64,
+                       session->session_name, chunk_id);
 
        /*
         * Iterate over all the streams in the session and check if they are
@@ -2837,7 +2849,7 @@ int relay_rotate_pending(const struct lttcomm_relayd_hdr *recv_hdr,
                        rotate_pending = true;
                        DBG("Stream %" PRIu64 " is still rotating",
                                        stream->stream_handle);
-               } else if (stream->chunk_id < chunk_id) {
+               } else if (stream->current_chunk_id.value < chunk_id) {
                        /*
                         * Stream closed on the consumer but still active on the
                         * relay.
@@ -3211,9 +3223,13 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
                /* No flush. */
                ret = 0;
        } else {
-               /* Put self-ref for this index due to error. */
-               relay_index_put(index);
-               index = NULL;
+               /*
+                * ret < 0
+                *
+                * relay_index_try_flush is responsible for the self-reference
+                * put of the index object on error.
+                */
+               ERR("relay_index_try_flush error %d", ret);
                ret = -1;
        }
 end:
@@ -3467,6 +3483,7 @@ static enum relay_connection_status relay_process_data_receive_payload(
        if (index_flushed) {
                stream->pos_after_last_complete_data_index =
                                stream->tracefile_size_current;
+               stream->prev_index_seq = state->header.net_seq_num;
        }
 
        stream->prev_seq = state->header.net_seq_num;
This page took 0.025468 seconds and 5 git commands to generate.