relayd: implement support for clear feature
[lttng-tools.git] / src / bin / lttng-relayd / stream.c
index 1b4e38ac8fc2b0c9d209b39ac3a559fe9e284f98..4238d3b53ff217e05613e5ed7aab0aef72f11921 100644 (file)
@@ -72,6 +72,11 @@ end:
 static void stream_complete_rotation(struct relay_stream *stream)
 {
        DBG("Rotation completed for stream %" PRIu64, stream->stream_handle);
+       if (stream->ongoing_rotation.value.next_trace_chunk) {
+               tracefile_array_reset(stream->tfa);
+               tracefile_array_commit_seq(stream->tfa,
+                               stream->index_received_seqcount);
+       }
        lttng_trace_chunk_put(stream->trace_chunk);
        stream->trace_chunk = stream->ongoing_rotation.value.next_trace_chunk;
        stream->ongoing_rotation = (typeof(stream->ongoing_rotation)) {};
@@ -90,7 +95,6 @@ static int stream_create_data_output_file_from_trace_chunk(
        const mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
 
        ASSERT_LOCKED(stream->lock);
-       assert(stream->trace_chunk);
 
        ret = utils_stream_file_path(stream->path_name, stream->channel_name,
                        stream->tracefile_size, stream->tracefile_current_index,
@@ -124,7 +128,7 @@ static int stream_create_data_output_file_from_trace_chunk(
        }
 
        status = lttng_trace_chunk_open_file(
-                       trace_chunk, stream_path, flags, mode, &fd);
+                       trace_chunk, stream_path, flags, mode, &fd, false);
        if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
                ERR("Failed to open stream file \"%s\"", stream->channel_name);
                ret = -1;
@@ -147,8 +151,8 @@ static int stream_rotate_data_file(struct relay_stream *stream)
 {
        int ret = 0;
 
-       DBG("Rotating stream %" PRIu64 " data file",
-                       stream->stream_handle);
+       DBG("Rotating stream %" PRIu64 " data file with size %" PRIu64,
+                       stream->stream_handle, stream->tracefile_size_current);
 
        if (stream->stream_fd) {
                stream_fd_put(stream->stream_fd);
@@ -180,6 +184,8 @@ static int stream_rotate_data_file(struct relay_stream *stream)
                        goto end;
                }
        }
+       DBG("%s: reset tracefile_size_current for stream %" PRIu64 " was %" PRIu64,
+                       __func__, stream->stream_handle, stream->tracefile_size_current);
        stream->tracefile_size_current = 0;
        stream->pos_after_last_complete_data_index = 0;
        stream->ongoing_rotation.value.data_rotated = true;
@@ -210,7 +216,7 @@ static int rotate_truncate_stream(struct relay_stream *stream)
        struct stream_fd *previous_stream_fd = NULL;
        struct lttng_trace_chunk *previous_chunk = NULL;
 
-       if (!LTTNG_OPTIONAL_GET(&stream->ongoing_rotation)->next_trace_chunk) {
+       if (!LTTNG_OPTIONAL_GET(stream->ongoing_rotation).next_trace_chunk) {
                ERR("Protocol error encoutered in %s(): stream rotation "
                        "sequence number is before the current sequence number "
                        "and the next trace chunk is unset. Honoring this "
@@ -368,19 +374,33 @@ static int try_rotate_stream_data(struct relay_stream *stream)
                goto end;
        }
 
+       DBG("%s: Stream %" PRIu64
+                               " (rotate_at_index_packet_seq_num = %" PRIu64
+                               ", rotate_at_prev_data_net_seq = %" PRIu64
+                               ", prev_data_seq = %" PRIu64 ")",
+                               __func__, stream->stream_handle,
+                               stream->ongoing_rotation.value.packet_seq_num,
+                               stream->ongoing_rotation.value.prev_data_net_seq,
+                               stream->prev_data_seq);
+
        if (stream->prev_data_seq == -1ULL ||
-                       stream->prev_data_seq + 1 < stream->ongoing_rotation.value.seq_num) {
+                       stream->ongoing_rotation.value.prev_data_net_seq == -1ULL ||
+                       stream->prev_data_seq <
+                       stream->ongoing_rotation.value.prev_data_net_seq) {
                /*
                 * The next packet that will be written is not part of the next
                 * chunk yet.
                 */
-               DBG("Stream %" PRIu64 " not yet ready for rotation (rotate_at_seq_num = %" PRIu64
+               DBG("Stream %" PRIu64 " data not yet ready for rotation "
+                               "(rotate_at_index_packet_seq_num = %" PRIu64
+                               ", rotate_at_prev_data_net_seq = %" PRIu64
                                ", prev_data_seq = %" PRIu64 ")",
                                stream->stream_handle,
-                               stream->ongoing_rotation.value.seq_num,
+                               stream->ongoing_rotation.value.packet_seq_num,
+                               stream->ongoing_rotation.value.prev_data_net_seq,
                                stream->prev_data_seq);
                goto end;
-       } else if (stream->prev_data_seq > stream->ongoing_rotation.value.seq_num) {
+       } else if (stream->prev_data_seq > stream->ongoing_rotation.value.prev_data_net_seq) {
                /*
                 * prev_data_seq is checked here since indexes and rotation
                 * commands are serialized with respect to each other.
@@ -441,13 +461,14 @@ static int create_index_file(struct relay_stream *stream,
                ret = -1;
                goto end;
        }
-       stream->index_file = lttng_index_file_create_from_trace_chunk(
+       status = lttng_index_file_create_from_trace_chunk(
                        chunk, stream->path_name,
                        stream->channel_name, stream->tracefile_size,
                        stream->tracefile_current_index,
                        lttng_to_index_major(major, minor),
-                       lttng_to_index_minor(major, minor), true);
-       if (!stream->index_file) {
+                       lttng_to_index_minor(major, minor), true,
+                       &stream->index_file);
+       if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
                ret = -1;
                goto end;
        }
@@ -478,23 +499,49 @@ static int try_rotate_stream_index(struct relay_stream *stream)
                goto end;
        }
 
-       if (stream->prev_index_seq == -1ULL ||
-                       stream->prev_index_seq + 1 < stream->ongoing_rotation.value.seq_num) {
-               DBG("Stream %" PRIu64 " index not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_index_seq = %" PRIu64 ")",
+       DBG("%s: Stream %" PRIu64
+                       " (rotate_at_packet_seq_num = %" PRIu64
+                       ", received_packet_seq_num = "
+                       "(value = %" PRIu64 ", is_set = %" PRIu8 "))",
+                       __func__, stream->stream_handle,
+                       stream->ongoing_rotation.value.packet_seq_num,
+                       stream->received_packet_seq_num.value,
+                       stream->received_packet_seq_num.is_set);
+
+       if (!stream->received_packet_seq_num.is_set ||
+                       LTTNG_OPTIONAL_GET(stream->received_packet_seq_num) + 1 <
+                       stream->ongoing_rotation.value.packet_seq_num) {
+               DBG("Stream %" PRIu64 " index not yet ready for rotation "
+                               "(rotate_at_packet_seq_num = %" PRIu64
+                               ", received_packet_seq_num = "
+                               "(value = %" PRIu64 ", is_set = %" PRIu8 "))",
                                stream->stream_handle,
-                               stream->ongoing_rotation.value.seq_num,
-                               stream->prev_index_seq);
+                               stream->ongoing_rotation.value.packet_seq_num,
+                               stream->received_packet_seq_num.value,
+                               stream->received_packet_seq_num.is_set);
                goto end;
        } else {
-               /* The next index belongs to the new trace chunk; rotate. */
-               assert(stream->prev_index_seq + 1 ==
-                               stream->ongoing_rotation.value.seq_num);
+               /*
+                * The next index belongs to the new trace chunk; rotate.
+                * In overwrite mode, the packet seq num may jump over the
+                * rotation position.
+                */
+               assert(LTTNG_OPTIONAL_GET(stream->received_packet_seq_num) + 1 >=
+                               stream->ongoing_rotation.value.packet_seq_num);
                DBG("Rotating stream %" PRIu64 " index file",
                                stream->stream_handle);
-               ret = create_index_file(stream,
-                               stream->ongoing_rotation.value.next_trace_chunk);
+               if (stream->index_file) {
+                       lttng_index_file_put(stream->index_file);
+                       stream->index_file = NULL;
+               }
                stream->ongoing_rotation.value.index_rotated = true;
 
+               /*
+                * Set the rotation pivot position for the data, now that we have the
+                * net_seq_num matching the packet_seq_num index pivot position.
+                */
+               stream->ongoing_rotation.value.prev_data_net_seq =
+                       stream->prev_index_seq;
                if (stream->ongoing_rotation.value.data_rotated &&
                                stream->ongoing_rotation.value.index_rotated) {
                        /* Rotation completed; reset its state. */
@@ -644,7 +691,9 @@ end:
                stream_put(stream);
                stream = NULL;
        }
-       lttng_trace_chunk_put(current_trace_chunk);
+       if (acquired_reference) {
+               lttng_trace_chunk_put(current_trace_chunk);
+       }
        return stream;
 
 error_no_alloc:
@@ -797,7 +846,10 @@ int stream_set_pending_rotation(struct relay_stream *stream,
 {
        int ret = 0;
        const struct relay_stream_rotation rotation = {
-               .seq_num = rotation_sequence_number,
+               .data_rotated = false,
+               .index_rotated = false,
+               .packet_seq_num = rotation_sequence_number,
+               .prev_data_net_seq = -1ULL,
                .next_trace_chunk = next_trace_chunk,
        };
 
@@ -815,21 +867,28 @@ int stream_set_pending_rotation(struct relay_stream *stream,
        }
        LTTNG_OPTIONAL_SET(&stream->ongoing_rotation, rotation);
 
-       DBG("Setting pending rotation: stream_id = %" PRIu64 ", rotation_seq_num = %" PRIu64,
+       DBG("Setting pending rotation: stream_id = %" PRIu64
+                       ", rotate_at_packet_seq_num = %" PRIu64,
                        stream->stream_handle, rotation_sequence_number);
        if (stream->is_metadata) {
                /*
                 * A metadata stream has no index; consider it already rotated.
                 */
                stream->ongoing_rotation.value.index_rotated = true;
+               if (next_trace_chunk) {
+                       /*
+                        * The metadata will be received again in the new chunk.
+                        */
+                       stream->metadata_received = 0;
+               }
                ret = stream_rotate_data_file(stream);
        } else {
-               ret = try_rotate_stream_data(stream);
+               ret = try_rotate_stream_index(stream);
                if (ret < 0) {
                        goto end;
                }
 
-               ret = try_rotate_stream_index(stream);
+               ret = try_rotate_stream_data(stream);
                if (ret < 0) {
                        goto end;
                }
@@ -921,6 +980,27 @@ void try_stream_close(struct relay_stream *stream)
        stream->closed = true;
        /* Relay indexes are only used by the "consumer/sessiond" end. */
        relay_index_close_all(stream);
+
+       /*
+        * If we are closed by an application exiting (per-pid buffers),
+        * we need to put our reference on the stream trace chunk right
+        * away, because otherwise still holding the reference on the
+        * trace chunk could allow a viewer stream (which holds a reference
+        * to the stream) to postpone destroy waiting for the chunk to cease
+        * to exist endlessly until the viewer is detached.
+        */
+
+       /* Put stream fd before put chunk. */
+       if (stream->stream_fd) {
+               stream_fd_put(stream->stream_fd);
+               stream->stream_fd = NULL;
+       }
+       if (stream->index_file) {
+               lttng_index_file_put(stream->index_file);
+               stream->index_file = NULL;
+       }
+       lttng_trace_chunk_put(stream->trace_chunk);
+       stream->trace_chunk = NULL;
        pthread_mutex_unlock(&stream->lock);
        DBG("Succeeded in closing stream %" PRIu64, stream->stream_handle);
        stream_put(stream);
@@ -932,6 +1012,14 @@ int stream_init_packet(struct relay_stream *stream, size_t packet_size,
        int ret = 0;
 
        ASSERT_LOCKED(stream->lock);
+
+       if (!stream->stream_fd || !stream->trace_chunk) {
+               ERR("Protocol error: received a packet for a stream that doesn't have a current trace chunk: stream_id = %" PRIu64 ", channel_name = %s",
+                               stream->stream_handle, stream->channel_name);
+               ret = -1;
+               goto end;
+       }
+
        if (caa_likely(stream->tracefile_size == 0)) {
                /* No size limit set; nothing to check. */
                goto end;
@@ -956,7 +1044,7 @@ int stream_init_packet(struct relay_stream *stream, size_t packet_size,
                                stream->stream_handle,
                                stream->tracefile_size_current, packet_size,
                                stream->tracefile_current_index, new_file_index);
-               tracefile_array_file_rotate(stream->tfa);
+               tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_WRITE);
                stream->tracefile_current_index = new_file_index;
 
                if (stream->stream_fd) {
@@ -975,6 +1063,8 @@ int stream_init_packet(struct relay_stream *stream, size_t packet_size,
                 * Reset current size because we just performed a stream
                 * rotation.
                 */
+               DBG("%s: reset tracefile_size_current for stream %" PRIu64 " was %" PRIu64,
+                       __func__, stream->stream_handle, stream->tracefile_size_current);
                stream->tracefile_size_current = 0;
                *file_rotated = true;
        } else {
@@ -997,6 +1087,12 @@ int stream_write(struct relay_stream *stream,
        memset(padding_buffer, 0,
                        min(sizeof(padding_buffer), padding_to_write));
 
+       if (!stream->stream_fd || !stream->trace_chunk) {
+               ERR("Protocol error: received a packet for a stream that doesn't have a current trace chunk: stream_id = %" PRIu64 ", channel_name = %s",
+                               stream->stream_handle, stream->channel_name);
+               ret = -1;
+               goto end;
+       }
        if (packet) {
                write_ret = lttng_write(stream->stream_fd->fd,
                                packet->data, packet->size);
@@ -1026,8 +1122,14 @@ int stream_write(struct relay_stream *stream,
        }
 
        if (stream->is_metadata) {
-               stream->metadata_received += packet ? packet->size : 0;
-               stream->metadata_received += padding_len;
+               size_t recv_len;
+
+               recv_len = packet ? packet->size : 0;
+               recv_len += padding_len;
+               stream->metadata_received += recv_len;
+               if (recv_len) {
+                       stream->no_new_metadata_notified = false;
+               }
        }
 
        DBG("Wrote to %sstream %" PRIu64 ": data_length = %zu, padding_length = %zu",
@@ -1052,6 +1154,7 @@ int stream_update_index(struct relay_stream *stream, uint64_t net_seq_num,
        uint64_t data_offset;
        struct relay_index *index;
 
+       assert(stream->trace_chunk);
        ASSERT_LOCKED(stream->lock);
        /* Get data offset because we are about to update the index. */
        data_offset = htobe64(stream->tracefile_size_current);
@@ -1092,8 +1195,11 @@ int stream_update_index(struct relay_stream *stream, uint64_t net_seq_num,
 
        ret = relay_index_try_flush(index);
        if (ret == 0) {
-               tracefile_array_commit_seq(stream->tfa);
+               tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_READ);
+               tracefile_array_commit_seq(stream->tfa, stream->index_received_seqcount);
                stream->index_received_seqcount++;
+               LTTNG_OPTIONAL_SET(&stream->received_packet_seq_num,
+                       be64toh(index->index_data.packet_seq_num));
                *flushed = true;
        } else if (ret > 0) {
                index->total_size = total_size;
@@ -1146,6 +1252,8 @@ int stream_add_index(struct relay_stream *stream,
 
        ASSERT_LOCKED(stream->lock);
 
+       DBG("stream_add_index for stream %" PRIu64, stream->stream_handle);
+
        /* Live beacon handling */
        if (index_info->packet_size == 0) {
                DBG("Received live beacon for stream %" PRIu64,
@@ -1185,15 +1293,22 @@ int stream_add_index(struct relay_stream *stream,
        }
        ret = relay_index_try_flush(index);
        if (ret == 0) {
-               tracefile_array_commit_seq(stream->tfa);
+               tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_READ);
+               tracefile_array_commit_seq(stream->tfa, stream->index_received_seqcount);
                stream->index_received_seqcount++;
                stream->pos_after_last_complete_data_index += index->total_size;
                stream->prev_index_seq = index_info->net_seq_num;
+               LTTNG_OPTIONAL_SET(&stream->received_packet_seq_num,
+                               index_info->packet_seq_num);
 
                ret = try_rotate_stream_index(stream);
                if (ret < 0) {
                        goto end;
                }
+               ret = try_rotate_stream_data(stream);
+               if (ret < 0) {
+                       goto end;
+               }
        } else if (ret > 0) {
                /* no flush. */
                ret = 0;
@@ -1241,6 +1356,8 @@ int stream_reset_file(struct relay_stream *stream)
                stream->stream_fd = NULL;
        }
 
+       DBG("%s: reset tracefile_size_current for stream %" PRIu64 " was %" PRIu64,
+                       __func__, stream->stream_handle, stream->tracefile_size_current);
        stream->tracefile_size_current = 0;
        stream->prev_data_seq = 0;
        stream->prev_index_seq = 0;
This page took 0.030202 seconds and 5 git commands to generate.