Fix: relayd: use packet sequence number for rotation position
[lttng-tools.git] / src / bin / lttng-relayd / stream.c
index 755fb6734072113af804e59f6e1e81089adb6b1f..c0aeb17190d187e12155e8a162ef89204647df2b 100644 (file)
@@ -368,18 +368,23 @@ static int try_rotate_stream_data(struct relay_stream *stream)
        }
 
        if (stream->prev_data_seq == -1ULL ||
-                       stream->prev_data_seq + 1 < stream->ongoing_rotation.value.seq_num) {
+                       stream->ongoing_rotation.value.prev_data_net_seq == -1ULL ||
+                       stream->prev_data_seq <
+                       stream->ongoing_rotation.value.prev_data_net_seq) {
                /*
                 * The next packet that will be written is not part of the next
                 * chunk yet.
                 */
-               DBG("Stream %" PRIu64 " not yet ready for rotation (rotate_at_seq_num = %" PRIu64
+               DBG("Stream %" PRIu64 " data not yet ready for rotation "
+                               "(rotate_at_index_packet_seq_num = %" PRIu64
+                               ", rotate_at_prev_data_net_seq = %" PRIu64
                                ", prev_data_seq = %" PRIu64 ")",
                                stream->stream_handle,
-                               stream->ongoing_rotation.value.seq_num,
+                               stream->ongoing_rotation.value.packet_seq_num,
+                               stream->ongoing_rotation.value.prev_data_net_seq,
                                stream->prev_data_seq);
                goto end;
-       } else if (stream->prev_data_seq > stream->ongoing_rotation.value.seq_num) {
+       } else if (stream->prev_data_seq > stream->ongoing_rotation.value.prev_data_net_seq) {
                /*
                 * prev_data_seq is checked here since indexes and rotation
                 * commands are serialized with respect to each other.
@@ -477,23 +482,38 @@ static int try_rotate_stream_index(struct relay_stream *stream)
                goto end;
        }
 
-       if (stream->prev_index_seq == -1ULL ||
-                       stream->prev_index_seq + 1 < stream->ongoing_rotation.value.seq_num) {
-               DBG("Stream %" PRIu64 " index not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_index_seq = %" PRIu64 ")",
+       if (!stream->received_packet_seq_num.is_set ||
+                       LTTNG_OPTIONAL_GET(stream->received_packet_seq_num) + 1 <
+                       stream->ongoing_rotation.value.packet_seq_num) {
+               DBG("Stream %" PRIu64 " index not yet ready for rotation "
+                               "(rotate_at_packet_seq_num = %" PRIu64
+                               ", received_packet_seq_num = "
+                               "(value = %" PRIu64 ", is_set = %" PRIu8 "))",
                                stream->stream_handle,
-                               stream->ongoing_rotation.value.seq_num,
-                               stream->prev_index_seq);
+                               stream->ongoing_rotation.value.packet_seq_num,
+                               stream->received_packet_seq_num.value,
+                               stream->received_packet_seq_num.is_set);
                goto end;
        } else {
-               /* The next index belongs to the new trace chunk; rotate. */
-               assert(stream->prev_index_seq + 1 ==
-                               stream->ongoing_rotation.value.seq_num);
+               /*
+                * The next index belongs to the new trace chunk; rotate.
+                * In overwrite mode, the packet seq num may jump over the
+                * rotation position.
+                */
+               assert(LTTNG_OPTIONAL_GET(stream->received_packet_seq_num) + 1 >=
+                               stream->ongoing_rotation.value.packet_seq_num);
                DBG("Rotating stream %" PRIu64 " index file",
                                stream->stream_handle);
                ret = create_index_file(stream,
                                stream->ongoing_rotation.value.next_trace_chunk);
                stream->ongoing_rotation.value.index_rotated = true;
 
+               /*
+                * Set the rotation pivot position for the data, now that we have the
+                * net_seq_num matching the packet_seq_num index pivot position.
+                */
+               stream->ongoing_rotation.value.prev_data_net_seq =
+                       stream->prev_index_seq;
                if (stream->ongoing_rotation.value.data_rotated &&
                                stream->ongoing_rotation.value.index_rotated) {
                        /* Rotation completed; reset its state. */
@@ -798,7 +818,10 @@ int stream_set_pending_rotation(struct relay_stream *stream,
 {
        int ret = 0;
        const struct relay_stream_rotation rotation = {
-               .seq_num = rotation_sequence_number,
+               .data_rotated = false,
+               .index_rotated = false,
+               .packet_seq_num = rotation_sequence_number,
+               .prev_data_net_seq = -1ULL,
                .next_trace_chunk = next_trace_chunk,
        };
 
@@ -816,7 +839,8 @@ int stream_set_pending_rotation(struct relay_stream *stream,
        }
        LTTNG_OPTIONAL_SET(&stream->ongoing_rotation, rotation);
 
-       DBG("Setting pending rotation: stream_id = %" PRIu64 ", rotation_seq_num = %" PRIu64,
+       DBG("Setting pending rotation: stream_id = %" PRIu64
+                       ", rotate_at_packet_seq_num = %" PRIu64,
                        stream->stream_handle, rotation_sequence_number);
        if (stream->is_metadata) {
                /*
@@ -825,12 +849,12 @@ int stream_set_pending_rotation(struct relay_stream *stream,
                stream->ongoing_rotation.value.index_rotated = true;
                ret = stream_rotate_data_file(stream);
        } else {
-               ret = try_rotate_stream_data(stream);
+               ret = try_rotate_stream_index(stream);
                if (ret < 0) {
                        goto end;
                }
 
-               ret = try_rotate_stream_index(stream);
+               ret = try_rotate_stream_data(stream);
                if (ret < 0) {
                        goto end;
                }
@@ -1132,6 +1156,8 @@ int stream_update_index(struct relay_stream *stream, uint64_t net_seq_num,
                tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_READ);
                tracefile_array_commit_seq(stream->tfa);
                stream->index_received_seqcount++;
+               LTTNG_OPTIONAL_SET(&stream->received_packet_seq_num,
+                       be64toh(index->index_data.packet_seq_num));
                *flushed = true;
        } else if (ret > 0) {
                index->total_size = total_size;
@@ -1228,11 +1254,17 @@ int stream_add_index(struct relay_stream *stream,
                stream->index_received_seqcount++;
                stream->pos_after_last_complete_data_index += index->total_size;
                stream->prev_index_seq = index_info->net_seq_num;
+               LTTNG_OPTIONAL_SET(&stream->received_packet_seq_num,
+                               index_info->packet_seq_num);
 
                ret = try_rotate_stream_index(stream);
                if (ret < 0) {
                        goto end;
                }
+               ret = try_rotate_stream_data(stream);
+               if (ret < 0) {
+                       goto end;
+               }
        } else if (ret > 0) {
                /* no flush. */
                ret = 0;
This page took 0.025744 seconds and 5 git commands to generate.