rcu_read_unlock();
return vstream;
}
+
+int ctf_trace_clear(struct ctf_trace *trace)
+{
+ struct relay_stream *stream;
+ int ret = 0;
+
+ rcu_read_lock();
+ cds_list_for_each_entry_rcu(stream, &trace->stream_list,
+ stream_node) {
+ if (!stream_get(stream)) {
+ continue;
+ }
+ ret = stream_clear(stream);
+ stream_put(stream);
+ if (ret) {
+ goto unlock;
+ }
+ }
+unlock:
+ rcu_read_unlock();
+ return ret;
+}
struct relay_viewer_stream *ctf_trace_get_viewer_metadata_stream(struct ctf_trace *trace);
+int ctf_trace_clear(struct ctf_trace *trace);
+
#endif /* _CTF_TRACE_H */
}
/*
- * First time, we open the index file and at least one index is ready.
+ * Deal with streams that have not received any index so far (or after clear).
*/
- if (rstream->index_received_seqcount == 0) {
+ if (rstream->index_received_seqcount <= rstream->clear_position_index_seqcount) {
ret = -ENOENT;
goto end;
}
{
int ret;
+ DBG("check status: recv %" PRIu64 " sent %" PRIu64 " clear index %" PRIu64 " clear data %" PRIu64 " for stream %" PRIu64,
+ rstream->index_received_seqcount,
+ vstream->index_sent_seqcount,
+ rstream->clear_position_index_seqcount,
+ rstream->clear_position_data_seqcount, vstream->stream->stream_handle);
if ((trace->session->connection_closed || rstream->closed)
&& rstream->index_received_seqcount
== vstream->index_sent_seqcount) {
goto hup;
} else if (rstream->beacon_ts_end != -1ULL &&
rstream->index_received_seqcount
- == vstream->index_sent_seqcount) {
+ <= vstream->index_sent_seqcount) {
/*
* We've received a synchronization beacon and the last index
* available has been sent, the index for now is inactive.
* inform the client of a time interval during which we can
* guarantee that there are no events to read (and never will
* be).
+ *
+ * The sent seqcount can grow higher than receive seqcount on clear.
*/
index->status = htobe32(LTTNG_VIEWER_INDEX_INACTIVE);
index->timestamp_end = htobe64(rstream->beacon_ts_end);
index->stream_id = htobe64(rstream->ctf_stream_id);
+ DBG("INACTIVE: with beacon, r v recv eq for stream %" PRIu64, vstream->stream->stream_handle);
goto index_ready;
} else if (rstream->index_received_seqcount
- == vstream->index_sent_seqcount) {
+ <= vstream->index_sent_seqcount) {
/*
- * This checks whether received == sent seqcount. In
+ * This checks whether received <= sent seqcount. In
* this case, we have not received a beacon. Therefore,
* we can only ask the client to retry later.
+ *
+ * The sent seqcount can grow higher than receive seqcount on clear.
*/
index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+ DBG("RETRY: r v recv leq for stream %" PRIu64, vstream->stream->stream_handle);
+ goto index_ready;
+ } else if (rstream->index_received_seqcount <= rstream->clear_position_index_seqcount ||
+ rstream->index_received_seqcount <= rstream->clear_position_data_seqcount) {
+ /*
+ * Send "retry" reply if a clear operation is in progress.
+ */
+ index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+ DBG("RETRY: r <= clear pos for stream %" PRIu64, vstream->stream->stream_handle);
goto index_ready;
} else if (!tracefile_array_seq_in_file(rstream->tfa,
vstream->current_tracefile_id,
vstream->current_tracefile_id,
vstream->index_sent_seqcount)) {
index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+ DBG("RETRY: !tfs seq in file for stream %" PRIu64, vstream->stream->stream_handle);
goto index_ready;
}
assert(tracefile_array_seq_in_file(rstream->tfa,
goto send_reply;
}
+ /*
+ * In case the stream has been cleared, we need to push the viewer
+ * stream index sent seqcount forward. Note that this can temporarily
+ * bring the index_sent position beyond the index received position.
+ */
+ if (rstream->clear_position_index_seqcount >= vstream->index_sent_seqcount) {
+ vstream->index_sent_seqcount = rstream->clear_position_index_seqcount;
+ /*
+ * Close the currently open index and data files to ensure we
+ * sync up with the receive side.
+ */
+ if (vstream->index_file) {
+ lttng_index_file_put(vstream->index_file);
+ vstream->index_file = NULL;
+ }
+ if (vstream->stream_fd) {
+ stream_fd_put(vstream->stream_fd);
+ vstream->stream_fd = NULL;
+ }
+ /*
+ * In tracefile rotation, we reset the current tracefile to 0.
+ */
+ vstream->current_tracefile_id = 0;
+ }
+
/* Try to open an index if one is needed for that stream. */
ret = try_open_index(vstream, rstream);
if (ret < 0) {
/*
* The index is created only when the first data
* packet arrives, it might not be ready at the
- * beginning of the session
+ * beginning of the session. Let check_index_status
+ * deal with inactivity beacons.
*/
- viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+ goto check_status;
} else {
/* Unhandled error. */
viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
goto send_reply;
}
+check_status:
ret = check_index_status(vstream, rstream, ctf_trace, &viewer_index);
if (ret < 0) {
goto error_put;
return ret;
}
+/*
+ * relay_clear_session: clear all data files belonging to a session.
+ */
+static
+int relay_clear_session(const struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_connection *conn)
+{
+ int ret;
+ ssize_t send_ret;
+ struct relay_session *session = conn->session;
+ struct lttcomm_relayd_generic_reply reply;
+
+ DBG("Clear session received");
+
+ if (!session || !conn->version_check_done) {
+ ERR("Trying to clear session before version check");
+ ret = -1;
+ goto end_no_session;
+ }
+
+ if (!opt_allow_clear) {
+ ERR("Trying to clear session, but clear is disallowed.");
+ ret = -1;
+ goto end_no_session;
+ }
+ ret = session_clear(session);
+
+ memset(&reply, 0, sizeof(reply));
+ if (ret < 0) {
+ reply.ret_code = htobe32(LTTNG_ERR_UNK);
+ } else {
+ reply.ret_code = htobe32(LTTNG_OK);
+ }
+ send_ret = conn->sock->ops->sendmsg(conn->sock, &reply,
+ sizeof(struct lttcomm_relayd_generic_reply), 0);
+ if (send_ret < (ssize_t) sizeof(reply)) {
+ ERR("Failed to send \"clear session\" command reply (ret = %zd)",
+ send_ret);
+ ret = -1;
+ }
+
+end_no_session:
+ return ret;
+}
+
/*
* relay_unknown_command: send -1 if received unknown command
*/
ret = 0;
goto end_stream_put;
} else {
+ DBG("Received index for stream %" PRIu64,
+ stream->stream_handle);
stream->beacon_ts_end = -1ULL;
}
}
ret = relay_index_try_flush(index);
if (ret == 0) {
- tracefile_array_commit_seq(stream->tfa);
+ tracefile_array_commit_seq(stream->tfa, stream->index_received_seqcount);
stream->index_received_seqcount++;
+ /* Clear index and data file(s) if reaching the clear position. */
+ ret = try_stream_clear_index_data(stream);
+ if (ret) {
+ goto end_stream_put;
+ }
} else if (ret > 0) {
/* no flush. */
ret = 0;
ERR("relay_index_try_flush error %d", ret);
ret = -1;
}
+ stream->prev_index_seq = net_seq_num;
end_stream_put:
pthread_mutex_unlock(&stream->lock);
case RELAYD_RESET_METADATA:
ret = relay_reset_metadata(recv_hdr, conn);
break;
+ case RELAYD_CLEAR_SESSION_CUSTOM_EFFICIOS:
+ ret = relay_clear_session(recv_hdr, conn);
+ break;
case RELAYD_UPDATE_SYNC_INFO:
default:
ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd));
ret = relay_index_try_flush(index);
if (ret == 0) {
- tracefile_array_commit_seq(stream->tfa);
+ tracefile_array_commit_seq(stream->tfa, stream->index_received_seqcount);
stream->index_received_seqcount++;
+ /* Clear index and data file(s) if reaching the clear position. */
+ ret = try_stream_clear_index_data(stream);
+ if (ret) {
+ goto end;
+ }
} else if (ret > 0) {
/* No flush. */
ret = 0;
net_seq_num = be64toh(data_hdr.net_seq_num);
- DBG3("Receiving data of size %u for stream id %" PRIu64 " seqnum %" PRIu64,
+ DBG("Receiving data of size %u for stream id %" PRIu64 " seqnum %" PRIu64,
data_size, stream_id, net_seq_num);
pthread_mutex_lock(&stream->lock);
* rotation.
*/
stream->tracefile_size_current = 0;
+ stream->tracefile_count_current = new_id;
rotate_index = 1;
}
}
rcu_read_unlock();
}
+
+int session_clear(struct relay_session *session)
+{
+ struct ctf_trace *trace;
+ struct lttng_ht_iter iter;
+ int ret = 0;
+
+ rcu_read_lock();
+ cds_lfht_for_each_entry(session->ctf_traces_ht->ht,
+ &iter.iter, trace, node.node) {
+ ret = ctf_trace_clear(trace);
+ if (ret) {
+ goto rcu_unlock;
+ }
+ }
+rcu_unlock:
+ rcu_read_unlock();
+ return ret;
+}
void print_sessions(void);
+int session_clear(struct relay_session *session);
+
#endif /* _SESSION_H */
stream->stream_handle = stream_handle;
stream->prev_seq = -1ULL;
+ stream->prev_index_seq = -1ULL;
stream->last_net_seq_num = -1ULL;
stream->ctf_stream_id = -1ULL;
stream->tracefile_size = tracefile_size;
}
rcu_read_unlock();
}
+
+static int relay_unlink_stream_files_rotation(struct relay_stream *stream)
+{
+ uint64_t tracefile_size = stream->tracefile_size;
+ uint64_t tracefile_count = stream->tracefile_count;
+ uint64_t count;
+ int ret;
+
+ /*
+ * If the channel is configured to have an open-ended number of tracefiles,
+ * use the current tracefile count number as upper-bound.
+ */
+ if (!tracefile_count) {
+ tracefile_count = stream->tracefile_count_current + 1;
+ }
+
+ /*
+ * Try to unlink each file and each index for this stream. They may not exist,
+ * in which case ENOENT is fine.
+ */
+ for (count = 0; count < tracefile_count; count++) {
+ ret = utils_unlink_stream_file(stream->path_name, stream->channel_name,
+ tracefile_size, count, -1, -1, NULL);
+ if (ret < 0 && errno != ENOENT) {
+ return -1;
+ }
+ }
+ return 0;
+}
+
+static int relay_unlink_index_files_rotation(struct relay_stream *stream)
+{
+ uint64_t tracefile_size = stream->tracefile_size;
+ uint64_t tracefile_count = stream->tracefile_count;
+ uint64_t count;
+ int ret;
+
+ /*
+ * If the channel is configured to have an open-ended number of tracefiles,
+ * use the current tracefile count number as upper-bound.
+ */
+ if (!tracefile_count) {
+ tracefile_count = stream->tracefile_count_current + 1;
+ }
+
+ /*
+ * Try to unlink each file and each index for this stream. They may not exist,
+ * in which case ENOENT is fine.
+ */
+ for (count = 0; count < tracefile_count; count++) {
+ if (stream->index_file) {
+ ret = lttng_index_file_unlink(stream->path_name, stream->channel_name,
+ -1, -1, tracefile_size, count);
+ if (ret < 0 && errno != ENOENT) {
+ return -1;
+ }
+ }
+ }
+ return 0;
+}
+
+static int relay_unlink_stream_files(struct relay_stream *stream)
+{
+ int ret;
+
+ ret = utils_unlink_stream_file(stream->path_name, stream->channel_name,
+ stream->tracefile_size, 0, -1, -1, NULL);
+ if (ret < 0 && errno != ENOENT) {
+ return -1;
+ }
+ return 0;
+}
+
+static int relay_unlink_index_files(struct relay_stream *stream)
+{
+ int ret;
+
+ ret = lttng_index_file_unlink(stream->path_name, stream->channel_name,
+ -1, -1, stream->tracefile_size, 0);
+ if (ret < 0 && errno != ENOENT) {
+ return -1;
+ }
+ return 0;
+}
+
+int try_stream_clear_index_data(struct relay_stream *stream)
+{
+ int ret = 0;
+
+ DBG("try stream clear for handle %" PRIu64 " recv %" PRIu64 " clear_pos_idx %" PRIu64 " clear_pos_data %" PRIu64,
+ stream->stream_handle, stream->index_received_seqcount, stream->clear_position_index_seqcount,
+ stream->clear_position_data_seqcount);
+ if (!stream->index_received_seqcount) {
+ return 0;
+ }
+ if (stream->index_received_seqcount <= stream->clear_position_index_seqcount) {
+ /*
+ * Put ref on current index file. The new index file will be created upon
+ * reception of next index data beyond the clear position.
+ */
+ if (stream->index_file) {
+ lttng_index_file_put(stream->index_file);
+ if (stream->tracefile_size > 0) {
+ ret = relay_unlink_index_files_rotation(stream);
+ } else {
+ ret = relay_unlink_index_files(stream);
+ }
+ if (ret) {
+ return ret;
+ }
+ stream->index_file = NULL;
+ }
+ tracefile_array_reset(stream->tfa);
+ }
+ if (stream->index_received_seqcount == stream->clear_position_data_seqcount) {
+ ret = close(stream->stream_fd->fd);
+ if (ret < 0) {
+ PERROR("Closing tracefile");
+ return -1;
+ }
+ stream->stream_fd->fd = -1;
+ stream->tracefile_size_current = 0;
+
+ if (stream->tracefile_size > 0) {
+ ret = relay_unlink_stream_files_rotation(stream);
+ } else {
+ ret = relay_unlink_stream_files(stream);
+ }
+
+ /* Create new files. */
+ ret = utils_create_stream_file(stream->path_name, stream->channel_name,
+ stream->tracefile_size, 0, -1, -1, NULL);
+ if (ret < 0) {
+ ERR("Create output file");
+ return -1;
+ }
+ stream->stream_fd->fd = ret;
+ }
+ return 0;
+}
+
+int stream_clear(struct relay_stream *stream)
+{
+ int ret = 0;
+
+ pthread_mutex_lock(&stream->lock);
+
+ if (stream->is_metadata) {
+ /* Do not clear metadata streams. */
+ goto end;
+ }
+
+ /*
+ * Clear index and data for all packets up to and including the
+ * clear position index seqcount.
+ *
+ * Clearing the index is straightforward: we can remove the entire
+ * on-disk index for this stream because the control port is an ordered
+ * protocol. We may also have in-flight indexes within the indexes_ht
+ * (pending data reception). We need to mark those so they get
+ * discarded (as well as their associated data content) upon reception
+ * of matching data.
+ *
+ * Clearing the data: because data is written directly into the output files,
+ * we need to carefully handle cases where index or data positions are ahead
+ * of the other.
+ *
+ * In tracefile rotation mode, we need to move the seq_tail to the head
+ * position.
+ */
+
+ /*
+ * If the data received is beyond indexes received, unlink data immediately and
+ * discard indexes when they arrive (up to the clear position).
+ *
+ * If indexes received is beyond data, we will reach the sync point when the
+ * indexes are received, so it will be safe to unlink the data and index files
+ * at that point.
+ *
+ * Clear index and data file(s) immediately if reaching the clear
+ * position (no in-flight indexes).
+ */
+ DBG("stream clear for handle %" PRIu64 " prev_seq %" PRIu64 " prev_index_seq %" PRIu64 " indexes in flight %d",
+ stream->stream_handle, stream->prev_seq, stream->prev_index_seq,
+ stream->indexes_in_flight);
+ if (stream->prev_seq > stream->prev_index_seq) {
+ stream->clear_position_data_seqcount = stream->index_received_seqcount;
+ stream->clear_position_index_seqcount = stream->index_received_seqcount +
+ stream->indexes_in_flight;
+ } else if (stream->prev_seq < stream->prev_index_seq) {
+ stream->clear_position_data_seqcount = stream->index_received_seqcount +
+ stream->indexes_in_flight;
+ stream->clear_position_index_seqcount = stream->index_received_seqcount +
+ stream->indexes_in_flight;
+ } else {
+ assert(stream->indexes_in_flight == 0);
+ stream->clear_position_data_seqcount = stream->index_received_seqcount;
+ stream->clear_position_index_seqcount = stream->index_received_seqcount;
+
+ }
+ ret = try_stream_clear_index_data(stream);
+ if (ret) {
+ goto end;
+ }
+
+end:
+ pthread_mutex_unlock(&stream->lock);
+ return ret;
+}
*/
pthread_mutex_t lock;
uint64_t prev_seq; /* previous data sequence number encountered. */
+ uint64_t prev_index_seq; /* previous index sequence number encountered. */
uint64_t last_net_seq_num; /* seq num to encounter before closing. */
/* FD on which to write the stream data. */
uint64_t tracefile_size;
uint64_t tracefile_size_current;
uint64_t tracefile_count;
+ uint64_t tracefile_count_current;
/*
* Counts the number of received indexes. The "tag" associated
*/
uint64_t index_received_seqcount;
+
+ /*
+ * Index sequence number for the position at which clear needs to be
+ * performed.
+ */
+ uint64_t clear_position_index_seqcount;
+ uint64_t clear_position_data_seqcount;
+
/*
* Tracefile array is an index of the stream trace files,
* indexed by position. It allows keeping track of the oldest
void try_stream_close(struct relay_stream *stream);
void stream_publish(struct relay_stream *stream);
void print_relay_streams(void);
+int stream_clear(struct relay_stream *stream);
+int try_stream_clear_index_data(struct relay_stream *stream);
#endif /* _STREAM_H */
free(tfa);
}
+void tracefile_array_reset(struct tracefile_array *tfa)
+{
+ size_t count, i;
+
+ count = tfa->count;
+ for (i = 0; i < count; i++) {
+ tfa->tf[i].seq_head = -1ULL;
+ tfa->tf[i].seq_tail = -1ULL;
+ }
+ tfa->seq_head = -1ULL;
+ tfa->seq_tail = -1ULL;
+ tfa->file_head = 0;
+ tfa->file_tail = 0;
+}
+
void tracefile_array_file_rotate(struct tracefile_array *tfa)
{
uint64_t *headp, *tailp;
*tailp = -1ULL;
}
-void tracefile_array_commit_seq(struct tracefile_array *tfa)
+void tracefile_array_commit_seq(struct tracefile_array *tfa,
+ uint64_t new_seq_head)
{
uint64_t *headp, *tailp;
/* Increment overall head. */
- tfa->seq_head++;
- /* If we are committing our first index overall, set tail to 0. */
+ tfa->seq_head = new_seq_head;
+ /* If we are committing our first index overall, set tail to head. */
if (tfa->seq_tail == -1ULL) {
- tfa->seq_tail = 0;
+ tfa->seq_tail = new_seq_head;
}
if (!tfa->count) {
/* Not in tracefile rotation mode. */
*/
return true;
}
- assert(file_index < tfa->count);
if (seq == -1ULL) {
return false;
}
void tracefile_array_destroy(struct tracefile_array *tfa);
void tracefile_array_file_rotate(struct tracefile_array *tfa);
-void tracefile_array_commit_seq(struct tracefile_array *tfa);
+void tracefile_array_commit_seq(struct tracefile_array *tfa,
+ uint64_t new_seq_head);
+void tracefile_array_reset(struct tracefile_array *tfa);
uint64_t tracefile_array_get_file_index_head(struct tracefile_array *tfa);
/* May return -1ULL in the case where we have not received any indexes yet. */
* If we never received an index for the current stream, delay
* the opening of the index, otherwise open it right now.
*/
- if (stream->index_received_seqcount == 0) {
+ if (stream->index_file == NULL) {
vstream->index_file = NULL;
} else {
vstream->index_file = lttng_index_file_open(vstream->path_name,
RELAYD_STREAMS_SENT = 16,
/* Ask the relay to reset the metadata trace file (2.8+) */
RELAYD_RESET_METADATA = 17,
+
+ /* Feature branch specific commands start at 10000. */
+
+ /* Ask the relay to clear files belonging to a session (feature branch). */
+ RELAYD_CLEAR_SESSION_CUSTOM_EFFICIOS= 10000,
};
/*