relayd: implement support for clear feature
[lttng-tools.git] / src / bin / lttng-relayd / stream.c
index 427d30a00602c7448223e2c58b27f4eab9b9bf58..4238d3b53ff217e05613e5ed7aab0aef72f11921 100644 (file)
@@ -72,6 +72,11 @@ end:
 static void stream_complete_rotation(struct relay_stream *stream)
 {
        DBG("Rotation completed for stream %" PRIu64, stream->stream_handle);
+       if (stream->ongoing_rotation.value.next_trace_chunk) {
+               tracefile_array_reset(stream->tfa);
+               tracefile_array_commit_seq(stream->tfa,
+                               stream->index_received_seqcount);
+       }
        lttng_trace_chunk_put(stream->trace_chunk);
        stream->trace_chunk = stream->ongoing_rotation.value.next_trace_chunk;
        stream->ongoing_rotation = (typeof(stream->ongoing_rotation)) {};
@@ -123,7 +128,7 @@ static int stream_create_data_output_file_from_trace_chunk(
        }
 
        status = lttng_trace_chunk_open_file(
-                       trace_chunk, stream_path, flags, mode, &fd);
+                       trace_chunk, stream_path, flags, mode, &fd, false);
        if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
                ERR("Failed to open stream file \"%s\"", stream->channel_name);
                ret = -1;
@@ -146,8 +151,8 @@ static int stream_rotate_data_file(struct relay_stream *stream)
 {
        int ret = 0;
 
-       DBG("Rotating stream %" PRIu64 " data file",
-                       stream->stream_handle);
+       DBG("Rotating stream %" PRIu64 " data file with size %" PRIu64,
+                       stream->stream_handle, stream->tracefile_size_current);
 
        if (stream->stream_fd) {
                stream_fd_put(stream->stream_fd);
@@ -179,6 +184,8 @@ static int stream_rotate_data_file(struct relay_stream *stream)
                        goto end;
                }
        }
+       DBG("%s: reset tracefile_size_current for stream %" PRIu64 " was %" PRIu64,
+                       __func__, stream->stream_handle, stream->tracefile_size_current);
        stream->tracefile_size_current = 0;
        stream->pos_after_last_complete_data_index = 0;
        stream->ongoing_rotation.value.data_rotated = true;
@@ -367,6 +374,15 @@ static int try_rotate_stream_data(struct relay_stream *stream)
                goto end;
        }
 
+       DBG("%s: Stream %" PRIu64
+                               " (rotate_at_index_packet_seq_num = %" PRIu64
+                               ", rotate_at_prev_data_net_seq = %" PRIu64
+                               ", prev_data_seq = %" PRIu64 ")",
+                               __func__, stream->stream_handle,
+                               stream->ongoing_rotation.value.packet_seq_num,
+                               stream->ongoing_rotation.value.prev_data_net_seq,
+                               stream->prev_data_seq);
+
        if (stream->prev_data_seq == -1ULL ||
                        stream->ongoing_rotation.value.prev_data_net_seq == -1ULL ||
                        stream->prev_data_seq <
@@ -445,13 +461,14 @@ static int create_index_file(struct relay_stream *stream,
                ret = -1;
                goto end;
        }
-       stream->index_file = lttng_index_file_create_from_trace_chunk(
+       status = lttng_index_file_create_from_trace_chunk(
                        chunk, stream->path_name,
                        stream->channel_name, stream->tracefile_size,
                        stream->tracefile_current_index,
                        lttng_to_index_major(major, minor),
-                       lttng_to_index_minor(major, minor), true);
-       if (!stream->index_file) {
+                       lttng_to_index_minor(major, minor), true,
+                       &stream->index_file);
+       if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
                ret = -1;
                goto end;
        }
@@ -482,6 +499,15 @@ static int try_rotate_stream_index(struct relay_stream *stream)
                goto end;
        }
 
+       DBG("%s: Stream %" PRIu64
+                       " (rotate_at_packet_seq_num = %" PRIu64
+                       ", received_packet_seq_num = "
+                       "(value = %" PRIu64 ", is_set = %" PRIu8 "))",
+                       __func__, stream->stream_handle,
+                       stream->ongoing_rotation.value.packet_seq_num,
+                       stream->received_packet_seq_num.value,
+                       stream->received_packet_seq_num.is_set);
+
        if (!stream->received_packet_seq_num.is_set ||
                        LTTNG_OPTIONAL_GET(stream->received_packet_seq_num) + 1 <
                        stream->ongoing_rotation.value.packet_seq_num) {
@@ -504,8 +530,10 @@ static int try_rotate_stream_index(struct relay_stream *stream)
                                stream->ongoing_rotation.value.packet_seq_num);
                DBG("Rotating stream %" PRIu64 " index file",
                                stream->stream_handle);
-               ret = create_index_file(stream,
-                               stream->ongoing_rotation.value.next_trace_chunk);
+               if (stream->index_file) {
+                       lttng_index_file_put(stream->index_file);
+                       stream->index_file = NULL;
+               }
                stream->ongoing_rotation.value.index_rotated = true;
 
                /*
@@ -847,6 +875,12 @@ int stream_set_pending_rotation(struct relay_stream *stream,
                 * A metadata stream has no index; consider it already rotated.
                 */
                stream->ongoing_rotation.value.index_rotated = true;
+               if (next_trace_chunk) {
+                       /*
+                        * The metadata will be received again in the new chunk.
+                        */
+                       stream->metadata_received = 0;
+               }
                ret = stream_rotate_data_file(stream);
        } else {
                ret = try_rotate_stream_index(stream);
@@ -1029,6 +1063,8 @@ int stream_init_packet(struct relay_stream *stream, size_t packet_size,
                 * Reset current size because we just performed a stream
                 * rotation.
                 */
+               DBG("%s: reset tracefile_size_current for stream %" PRIu64 " was %" PRIu64,
+                       __func__, stream->stream_handle, stream->tracefile_size_current);
                stream->tracefile_size_current = 0;
                *file_rotated = true;
        } else {
@@ -1160,7 +1196,7 @@ int stream_update_index(struct relay_stream *stream, uint64_t net_seq_num,
        ret = relay_index_try_flush(index);
        if (ret == 0) {
                tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_READ);
-               tracefile_array_commit_seq(stream->tfa);
+               tracefile_array_commit_seq(stream->tfa, stream->index_received_seqcount);
                stream->index_received_seqcount++;
                LTTNG_OPTIONAL_SET(&stream->received_packet_seq_num,
                        be64toh(index->index_data.packet_seq_num));
@@ -1216,6 +1252,8 @@ int stream_add_index(struct relay_stream *stream,
 
        ASSERT_LOCKED(stream->lock);
 
+       DBG("stream_add_index for stream %" PRIu64, stream->stream_handle);
+
        /* Live beacon handling */
        if (index_info->packet_size == 0) {
                DBG("Received live beacon for stream %" PRIu64,
@@ -1256,7 +1294,7 @@ int stream_add_index(struct relay_stream *stream,
        ret = relay_index_try_flush(index);
        if (ret == 0) {
                tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_READ);
-               tracefile_array_commit_seq(stream->tfa);
+               tracefile_array_commit_seq(stream->tfa, stream->index_received_seqcount);
                stream->index_received_seqcount++;
                stream->pos_after_last_complete_data_index += index->total_size;
                stream->prev_index_seq = index_info->net_seq_num;
@@ -1318,6 +1356,8 @@ int stream_reset_file(struct relay_stream *stream)
                stream->stream_fd = NULL;
        }
 
+       DBG("%s: reset tracefile_size_current for stream %" PRIu64 " was %" PRIu64,
+                       __func__, stream->stream_handle, stream->tracefile_size_current);
        stream->tracefile_size_current = 0;
        stream->prev_data_seq = 0;
        stream->prev_index_seq = 0;
This page took 0.028815 seconds and 5 git commands to generate.