Fix: relayd: tracefile rotation: viewer opening missing index file
[lttng-tools.git] / src / bin / lttng-relayd / stream.c
index f98b465572ec880d4730224da7766fe4ce7fc040..4d3d37a2bc91d745b8e6ee83935aa623f27b3db9 100644 (file)
@@ -210,6 +210,16 @@ static int rotate_truncate_stream(struct relay_stream *stream)
        struct stream_fd *previous_stream_fd = NULL;
        struct lttng_trace_chunk *previous_chunk = NULL;
 
+       if (!LTTNG_OPTIONAL_GET(&stream->ongoing_rotation)->next_trace_chunk) {
+               ERR("Protocol error encoutered in %s(): stream rotation "
+                       "sequence number is before the current sequence number "
+                       "and the next trace chunk is unset. Honoring this "
+                       "rotation command would result in data loss",
+                               __FUNCTION__);
+               ret = -1;
+               goto end;
+       }
+
        ASSERT_LOCKED(stream->lock);
        /*
         * Acquire a reference to the current trace chunk to ensure
@@ -245,6 +255,7 @@ static int rotate_truncate_stream(struct relay_stream *stream)
                goto end;
        }
 
+       assert(stream->stream_fd);
        /*
         * Seek the current tracefile to the position at which the rotation
         * should have occurred.
@@ -633,7 +644,9 @@ end:
                stream_put(stream);
                stream = NULL;
        }
-       lttng_trace_chunk_put(current_trace_chunk);
+       if (acquired_reference) {
+               lttng_trace_chunk_put(current_trace_chunk);
+       }
        return stream;
 
 error_no_alloc:
@@ -945,7 +958,7 @@ int stream_init_packet(struct relay_stream *stream, size_t packet_size,
                                stream->stream_handle,
                                stream->tracefile_size_current, packet_size,
                                stream->tracefile_current_index, new_file_index);
-               tracefile_array_file_rotate(stream->tfa);
+               tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_WRITE);
                stream->tracefile_current_index = new_file_index;
 
                if (stream->stream_fd) {
@@ -1041,6 +1054,7 @@ int stream_update_index(struct relay_stream *stream, uint64_t net_seq_num,
        uint64_t data_offset;
        struct relay_index *index;
 
+       assert(stream->trace_chunk);
        ASSERT_LOCKED(stream->lock);
        /* Get data offset because we are about to update the index. */
        data_offset = htobe64(stream->tracefile_size_current);
@@ -1081,6 +1095,7 @@ int stream_update_index(struct relay_stream *stream, uint64_t net_seq_num,
 
        ret = relay_index_try_flush(index);
        if (ret == 0) {
+               tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_READ);
                tracefile_array_commit_seq(stream->tfa);
                stream->index_received_seqcount++;
                *flushed = true;
@@ -1122,9 +1137,7 @@ int stream_complete_packet(struct relay_stream *stream, size_t packet_total_size
 
        stream->prev_data_seq = sequence_number;
        ret = try_rotate_stream_data(stream);
-       if (ret < 0) {
-               goto end;
-       }
+
 end:
        return ret;
 }
@@ -1176,6 +1189,7 @@ int stream_add_index(struct relay_stream *stream,
        }
        ret = relay_index_try_flush(index);
        if (ret == 0) {
+               tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_READ);
                tracefile_array_commit_seq(stream->tfa);
                stream->index_received_seqcount++;
                stream->pos_after_last_complete_data_index += index->total_size;
This page took 0.025785 seconds and 5 git commands to generate.