stream->stream_handle = stream_handle;
stream->prev_seq = -1ULL;
+ stream->prev_index_seq = -1ULL;
stream->last_net_seq_num = -1ULL;
stream->ctf_stream_id = -1ULL;
stream->tracefile_size = tracefile_size;
}
rcu_read_unlock();
}
+
+static int relay_unlink_stream_files_rotation(struct relay_stream *stream)
+{
+ uint64_t tracefile_size = stream->tracefile_size;
+ uint64_t tracefile_count = stream->tracefile_count;
+ uint64_t count;
+ int ret;
+
+ /*
+ * If the channel is configured to have an open-ended number of tracefiles,
+ * use the current tracefile count number as upper-bound.
+ */
+ if (!tracefile_count) {
+ tracefile_count = stream->tracefile_count_current + 1;
+ }
+
+ /*
+ * Try to unlink each file and each index for this stream. They may not exist,
+ * in which case ENOENT is fine.
+ */
+ for (count = 0; count < tracefile_count; count++) {
+ ret = utils_unlink_stream_file(stream->path_name, stream->channel_name,
+ tracefile_size, count, -1, -1, NULL);
+ if (ret < 0 && errno != ENOENT) {
+ return -1;
+ }
+ }
+ return 0;
+}
+
+static int relay_unlink_index_files_rotation(struct relay_stream *stream)
+{
+ uint64_t tracefile_size = stream->tracefile_size;
+ uint64_t tracefile_count = stream->tracefile_count;
+ uint64_t count;
+ int ret;
+
+ /*
+ * If the channel is configured to have an open-ended number of tracefiles,
+ * use the current tracefile count number as upper-bound.
+ */
+ if (!tracefile_count) {
+ tracefile_count = stream->tracefile_count_current + 1;
+ }
+
+ /*
+ * Try to unlink each file and each index for this stream. They may not exist,
+ * in which case ENOENT is fine.
+ */
+ for (count = 0; count < tracefile_count; count++) {
+ if (stream->index_file) {
+ ret = lttng_index_file_unlink(stream->path_name, stream->channel_name,
+ -1, -1, tracefile_size, count);
+ if (ret < 0 && errno != ENOENT) {
+ return -1;
+ }
+ }
+ }
+ return 0;
+}
+
+static int relay_unlink_stream_files(struct relay_stream *stream)
+{
+ int ret;
+
+ ret = utils_unlink_stream_file(stream->path_name, stream->channel_name,
+ stream->tracefile_size, 0, -1, -1, NULL);
+ if (ret < 0 && errno != ENOENT) {
+ return -1;
+ }
+ return 0;
+}
+
+static int relay_unlink_index_files(struct relay_stream *stream)
+{
+ int ret;
+
+ ret = lttng_index_file_unlink(stream->path_name, stream->channel_name,
+ -1, -1, stream->tracefile_size, 0);
+ if (ret < 0 && errno != ENOENT) {
+ return -1;
+ }
+ return 0;
+}
+
+int try_stream_clear_index_data(struct relay_stream *stream)
+{
+ int ret = 0;
+
+ DBG("try stream clear for handle %" PRIu64 " recv %" PRIu64 " clear_pos_idx %" PRIu64 " clear_pos_data %" PRIu64,
+ stream->stream_handle, stream->index_received_seqcount, stream->clear_position_index_seqcount,
+ stream->clear_position_data_seqcount);
+ if (!stream->index_received_seqcount) {
+ return 0;
+ }
+ if (stream->index_received_seqcount <= stream->clear_position_index_seqcount) {
+ /*
+ * Put ref on current index file. The new index file will be created upon
+ * reception of next index data beyond the clear position.
+ */
+ if (stream->index_file) {
+ lttng_index_file_put(stream->index_file);
+ if (stream->tracefile_size > 0) {
+ ret = relay_unlink_index_files_rotation(stream);
+ } else {
+ ret = relay_unlink_index_files(stream);
+ }
+ if (ret) {
+ return ret;
+ }
+ stream->index_file = NULL;
+ }
+ tracefile_array_reset(stream->tfa);
+ }
+ if (stream->index_received_seqcount == stream->clear_position_data_seqcount) {
+ ret = close(stream->stream_fd->fd);
+ if (ret < 0) {
+ PERROR("Closing tracefile");
+ return -1;
+ }
+ stream->stream_fd->fd = -1;
+ stream->tracefile_size_current = 0;
+
+ if (stream->tracefile_size > 0) {
+ ret = relay_unlink_stream_files_rotation(stream);
+ } else {
+ ret = relay_unlink_stream_files(stream);
+ }
+
+ /* Create new files. */
+ ret = utils_create_stream_file(stream->path_name, stream->channel_name,
+ stream->tracefile_size, 0, -1, -1, NULL);
+ if (ret < 0) {
+ ERR("Create output file");
+ return -1;
+ }
+ stream->stream_fd->fd = ret;
+ }
+ return 0;
+}
+
+int stream_clear(struct relay_stream *stream)
+{
+ int ret = 0;
+
+ pthread_mutex_lock(&stream->lock);
+
+ if (stream->is_metadata) {
+ /* Do not clear metadata streams. */
+ goto end;
+ }
+
+ /*
+ * Clear index and data for all packets up to and including the
+ * clear position index seqcount.
+ *
+ * Clearing the index is straightforward: we can remove the entire
+ * on-disk index for this stream because the control port is an ordered
+ * protocol. We may also have in-flight indexes within the indexes_ht
+ * (pending data reception). We need to mark those so they get
+ * discarded (as well as their associated data content) upon reception
+ * of matching data.
+ *
+ * Clearing the data: because data is written directly into the output files,
+ * we need to carefully handle cases where index or data positions are ahead
+ * of the other.
+ *
+ * In tracefile rotation mode, we need to move the seq_tail to the head
+ * position.
+ */
+
+ /*
+ * If the data received is beyond indexes received, unlink data immediately and
+ * discard indexes when they arrive (up to the clear position).
+ *
+ * If indexes received is beyond data, we will reach the sync point when the
+ * indexes are received, so it will be safe to unlink the data and index files
+ * at that point.
+ *
+ * Clear index and data file(s) immediately if reaching the clear
+ * position (no in-flight indexes).
+ */
+ DBG("stream clear for handle %" PRIu64 " prev_seq %" PRIu64 " prev_index_seq %" PRIu64 " indexes in flight %d",
+ stream->stream_handle, stream->prev_seq, stream->prev_index_seq,
+ stream->indexes_in_flight);
+ if (stream->prev_seq > stream->prev_index_seq) {
+ stream->clear_position_data_seqcount = stream->index_received_seqcount;
+ stream->clear_position_index_seqcount = stream->index_received_seqcount +
+ stream->indexes_in_flight;
+ } else if (stream->prev_seq < stream->prev_index_seq) {
+ stream->clear_position_data_seqcount = stream->index_received_seqcount +
+ stream->indexes_in_flight;
+ stream->clear_position_index_seqcount = stream->index_received_seqcount +
+ stream->indexes_in_flight;
+ } else {
+ assert(stream->indexes_in_flight == 0);
+ stream->clear_position_data_seqcount = stream->index_received_seqcount;
+ stream->clear_position_index_seqcount = stream->index_received_seqcount;
+
+ }
+ ret = try_stream_clear_index_data(stream);
+ if (ret) {
+ goto end;
+ }
+
+end:
+ pthread_mutex_unlock(&stream->lock);
+ return ret;
+}