}
/*
- * First time, we open the index file and at least one index is ready.
+ * Deal with streams that have not received any index so far (or after clear).
*/
- if (rstream->index_received_seqcount == 0) {
+ if (rstream->index_received_seqcount <= rstream->clear_position_index_seqcount) {
ret = -ENOENT;
goto end;
}
{
int ret;
+ DBG("check status: recv %" PRIu64 " sent %" PRIu64 " clear index %" PRIu64 " clear data %" PRIu64 " for stream %" PRIu64,
+ rstream->index_received_seqcount,
+ vstream->index_sent_seqcount,
+ rstream->clear_position_index_seqcount,
+ rstream->clear_position_data_seqcount, vstream->stream->stream_handle);
if ((trace->session->connection_closed || rstream->closed)
&& rstream->index_received_seqcount
== vstream->index_sent_seqcount) {
goto hup;
} else if (rstream->beacon_ts_end != -1ULL &&
rstream->index_received_seqcount
- == vstream->index_sent_seqcount) {
+ <= vstream->index_sent_seqcount) {
/*
* We've received a synchronization beacon and the last index
* available has been sent, the index for now is inactive.
* inform the client of a time interval during which we can
* guarantee that there are no events to read (and never will
* be).
+ *
+ * The sent seqcount can grow higher than receive seqcount on clear.
*/
index->status = htobe32(LTTNG_VIEWER_INDEX_INACTIVE);
index->timestamp_end = htobe64(rstream->beacon_ts_end);
index->stream_id = htobe64(rstream->ctf_stream_id);
+ DBG("INACTIVE: with beacon, r v recv eq for stream %" PRIu64, vstream->stream->stream_handle);
goto index_ready;
} else if (rstream->index_received_seqcount
- == vstream->index_sent_seqcount) {
+ <= vstream->index_sent_seqcount) {
/*
- * This checks whether received == sent seqcount. In
+ * This checks whether received <= sent seqcount. In
* this case, we have not received a beacon. Therefore,
* we can only ask the client to retry later.
+ *
+ * The sent seqcount can grow higher than receive seqcount on clear.
*/
index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+ DBG("RETRY: r v recv leq for stream %" PRIu64, vstream->stream->stream_handle);
+ goto index_ready;
+ } else if (rstream->index_received_seqcount <= rstream->clear_position_index_seqcount ||
+ rstream->index_received_seqcount <= rstream->clear_position_data_seqcount) {
+ /*
+ * Send "retry" reply if a clear operation is in progress.
+ */
+ index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+ DBG("RETRY: r <= clear pos for stream %" PRIu64, vstream->stream->stream_handle);
goto index_ready;
} else if (!tracefile_array_seq_in_file(rstream->tfa,
vstream->current_tracefile_id,
vstream->current_tracefile_id,
vstream->index_sent_seqcount)) {
index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+ DBG("RETRY: !tfs seq in file for stream %" PRIu64, vstream->stream->stream_handle);
goto index_ready;
}
assert(tracefile_array_seq_in_file(rstream->tfa,
goto send_reply;
}
+ /*
+ * In case the stream has been cleared, we need to push the viewer
+ * stream index sent seqcount forward. Note that this can temporarily
+ * bring the index_sent position beyond the index received position.
+ */
+ if (rstream->clear_position_index_seqcount >= vstream->index_sent_seqcount) {
+ vstream->index_sent_seqcount = rstream->clear_position_index_seqcount;
+ /*
+ * Close the currently open index and data files to ensure we
+ * sync up with the receive side.
+ */
+ if (vstream->index_file) {
+ lttng_index_file_put(vstream->index_file);
+ vstream->index_file = NULL;
+ }
+ if (vstream->stream_fd) {
+ stream_fd_put(vstream->stream_fd);
+ vstream->stream_fd = NULL;
+ }
+ /*
+ * In tracefile rotation, we reset the current tracefile to 0.
+ */
+ vstream->current_tracefile_id = 0;
+ }
+
/* Try to open an index if one is needed for that stream. */
ret = try_open_index(vstream, rstream);
if (ret < 0) {
/*
* The index is created only when the first data
* packet arrives, it might not be ready at the
- * beginning of the session
+ * beginning of the session. Let check_index_status
+ * deal with inactivity beacons.
*/
- viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+ goto check_status;
} else {
/* Unhandled error. */
viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
goto send_reply;
}
+check_status:
ret = check_index_status(vstream, rstream, ctf_trace, &viewer_index);
if (ret < 0) {
goto error_put;