relayd: Implement custom EfficiOS session clear
[lttng-tools.git] / src / bin / lttng-relayd / stream.c
index f989bfd1361be36bd20d4edfa018f36b11de062b..1218c2f4534f6b9537fbdc4f55b3bac4e046437c 100644 (file)
@@ -91,6 +91,7 @@ struct relay_stream *stream_create(struct ctf_trace *trace,
 
        stream->stream_handle = stream_handle;
        stream->prev_seq = -1ULL;
+       stream->prev_index_seq = -1ULL;
        stream->last_net_seq_num = -1ULL;
        stream->ctf_stream_id = -1ULL;
        stream->tracefile_size = tracefile_size;
@@ -477,3 +478,212 @@ void print_relay_streams(void)
        }
        rcu_read_unlock();
 }
+
+static int relay_unlink_stream_files_rotation(struct relay_stream *stream)
+{
+       uint64_t tracefile_size = stream->tracefile_size;
+       uint64_t tracefile_count = stream->tracefile_count;
+       uint64_t count;
+       int ret;
+
+       /*
+        * If the channel is configured to have an open-ended number of tracefiles,
+        * use the current tracefile count number as upper-bound.
+        */
+       if (!tracefile_count) {
+               tracefile_count = stream->tracefile_count_current + 1;
+       }
+
+       /*
+        * Try to unlink each file and each index for this stream. They may not exist,
+        * in which case ENOENT is fine.
+        */
+       for (count = 0; count < tracefile_count; count++) {
+               ret = utils_unlink_stream_file(stream->path_name, stream->channel_name,
+                               tracefile_size, count, -1, -1, NULL);
+               if (ret < 0 && errno != ENOENT) {
+                       return -1;
+               }
+       }
+       return 0;
+}
+
+static int relay_unlink_index_files_rotation(struct relay_stream *stream)
+{
+       uint64_t tracefile_size = stream->tracefile_size;
+       uint64_t tracefile_count = stream->tracefile_count;
+       uint64_t count;
+       int ret;
+
+       /*
+        * If the channel is configured to have an open-ended number of tracefiles,
+        * use the current tracefile count number as upper-bound.
+        */
+       if (!tracefile_count) {
+               tracefile_count = stream->tracefile_count_current + 1;
+       }
+
+       /*
+        * Try to unlink each file and each index for this stream. They may not exist,
+        * in which case ENOENT is fine.
+        */
+       for (count = 0; count < tracefile_count; count++) {
+               if (stream->index_file) {
+                       ret = lttng_index_file_unlink(stream->path_name, stream->channel_name,
+                                       -1, -1, tracefile_size, count);
+                       if (ret < 0 && errno != ENOENT) {
+                               return -1;
+                       }
+               }
+       }
+       return 0;
+}
+
+static int relay_unlink_stream_files(struct relay_stream *stream)
+{
+       int ret;
+
+       ret = utils_unlink_stream_file(stream->path_name, stream->channel_name,
+                       stream->tracefile_size, 0, -1, -1, NULL);
+       if (ret < 0 && errno != ENOENT) {
+               return -1;
+       }
+       return 0;
+}
+
+static int relay_unlink_index_files(struct relay_stream *stream)
+{
+       int ret;
+
+       ret = lttng_index_file_unlink(stream->path_name, stream->channel_name,
+                       -1, -1, stream->tracefile_size, 0);
+       if (ret < 0 && errno != ENOENT) {
+               return -1;
+       }
+       return 0;
+}
+
+int try_stream_clear_index_data(struct relay_stream *stream)
+{
+       int ret = 0;
+
+       DBG("try stream clear for handle %" PRIu64 " recv %" PRIu64 " clear_pos_idx %" PRIu64 " clear_pos_data %" PRIu64,
+                       stream->stream_handle, stream->index_received_seqcount, stream->clear_position_index_seqcount,
+                       stream->clear_position_data_seqcount);
+       if (!stream->index_received_seqcount) {
+               return 0;
+       }
+       if (stream->index_received_seqcount <= stream->clear_position_index_seqcount) {
+               /*
+                * Put ref on current index file. The new index file will be created upon
+                * reception of next index data beyond the clear position.
+                */
+               if (stream->index_file) {
+                       lttng_index_file_put(stream->index_file);
+                       if (stream->tracefile_size > 0) {
+                               ret = relay_unlink_index_files_rotation(stream);
+                       } else {
+                               ret = relay_unlink_index_files(stream);
+                       }
+                       if (ret) {
+                               return ret;
+                       }
+                       stream->index_file = NULL;
+               }
+               tracefile_array_reset(stream->tfa);
+       }
+       if (stream->index_received_seqcount == stream->clear_position_data_seqcount) {
+               ret = close(stream->stream_fd->fd);
+               if (ret < 0) {
+                       PERROR("Closing tracefile");
+                       return -1;
+               }
+               stream->stream_fd->fd = -1;
+               stream->tracefile_size_current = 0;
+
+               if (stream->tracefile_size > 0) {
+                       ret = relay_unlink_stream_files_rotation(stream);
+               } else {
+                       ret = relay_unlink_stream_files(stream);
+               }
+
+               /* Create new files. */
+               ret = utils_create_stream_file(stream->path_name, stream->channel_name,
+                               stream->tracefile_size, 0, -1, -1, NULL);
+               if (ret < 0) {
+                       ERR("Create output file");
+                       return -1;
+               }
+               stream->stream_fd->fd = ret;
+       }
+       return 0;
+}
+
+int stream_clear(struct relay_stream *stream)
+{
+       int ret = 0;
+
+       pthread_mutex_lock(&stream->lock);
+
+       if (stream->is_metadata) {
+               /* Do not clear metadata streams. */
+               goto end;
+       }
+
+       /*
+        * Clear index and data for all packets up to and including the
+        * clear position index seqcount.
+        *
+        * Clearing the index is straightforward: we can remove the entire
+        * on-disk index for this stream because the control port is an ordered
+        * protocol. We may also have in-flight indexes within the indexes_ht
+        * (pending data reception). We need to mark those so they get
+        * discarded (as well as their associated data content) upon reception
+        * of matching data.
+        *
+        * Clearing the data: because data is written directly into the output files,
+        * we need to carefully handle cases where index or data positions are ahead
+        * of the other.
+        *
+        * In tracefile rotation mode, we need to move the seq_tail to the head
+        * position.
+        */
+
+       /*
+        * If the data received is beyond indexes received, unlink data immediately and
+        * discard indexes when they arrive (up to the clear position).
+        *
+        * If indexes received is beyond data, we will reach the sync point when the
+        * indexes are received, so it will be safe to unlink the data and index files
+        * at that point.
+        *
+        * Clear index and data file(s) immediately if reaching the clear
+        * position (no in-flight indexes).
+        */
+       DBG("stream clear for handle %" PRIu64 " prev_seq %" PRIu64 " prev_index_seq %" PRIu64 " indexes in flight %d",
+                       stream->stream_handle, stream->prev_seq, stream->prev_index_seq,
+                       stream->indexes_in_flight);
+       if (stream->prev_seq > stream->prev_index_seq) {
+               stream->clear_position_data_seqcount = stream->index_received_seqcount;
+               stream->clear_position_index_seqcount = stream->index_received_seqcount +
+                                                       stream->indexes_in_flight;
+       } else if (stream->prev_seq < stream->prev_index_seq) {
+               stream->clear_position_data_seqcount = stream->index_received_seqcount +
+                                                       stream->indexes_in_flight;
+               stream->clear_position_index_seqcount = stream->index_received_seqcount +
+                                                       stream->indexes_in_flight;
+       } else {
+               assert(stream->indexes_in_flight == 0);
+               stream->clear_position_data_seqcount = stream->index_received_seqcount;
+               stream->clear_position_index_seqcount = stream->index_received_seqcount;
+
+       }
+       ret = try_stream_clear_index_data(stream);
+       if (ret) {
+               goto end;
+       }
+
+end:
+       pthread_mutex_unlock(&stream->lock);
+       return ret;
+}
This page took 0.028309 seconds and 5 git commands to generate.