X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Flive.c;h=d682d3b09bd22e0ee3a4fcc534ecc80b47a56d70;hb=82b4cfcce55624cb2560144a5b1d160083bc6ba2;hp=f81872b9d1b4cfffa138dcb99eb40cc8b863db77;hpb=7a5e50b18fbe1093870efaf0dc3bee6e900af13c;p=deliverable%2Flttng-tools.git diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c index f81872b9d..d682d3b09 100644 --- a/src/bin/lttng-relayd/live.c +++ b/src/bin/lttng-relayd/live.c @@ -1149,9 +1149,9 @@ static int try_open_index(struct relay_viewer_stream *vstream, } /* - * First time, we open the index file and at least one index is ready. + * Deal with streams that have not received any index so far (or after clear). */ - if (rstream->index_received_seqcount == 0) { + if (rstream->index_received_seqcount <= rstream->clear_position_index_seqcount) { ret = -ENOENT; goto end; } @@ -1184,6 +1184,11 @@ static int check_index_status(struct relay_viewer_stream *vstream, { int ret; + DBG("check status: recv %" PRIu64 " sent %" PRIu64 " clear index %" PRIu64 " clear data %" PRIu64 " for stream %" PRIu64, + rstream->index_received_seqcount, + vstream->index_sent_seqcount, + rstream->clear_position_index_seqcount, + rstream->clear_position_data_seqcount, vstream->stream->stream_handle); if ((trace->session->connection_closed || rstream->closed) && rstream->index_received_seqcount == vstream->index_sent_seqcount) { @@ -1195,7 +1200,7 @@ static int check_index_status(struct relay_viewer_stream *vstream, goto hup; } else if (rstream->beacon_ts_end != -1ULL && rstream->index_received_seqcount - == vstream->index_sent_seqcount) { + <= vstream->index_sent_seqcount) { /* * We've received a synchronization beacon and the last index * available has been sent, the index for now is inactive. @@ -1204,19 +1209,33 @@ static int check_index_status(struct relay_viewer_stream *vstream, * inform the client of a time interval during which we can * guarantee that there are no events to read (and never will * be). + * + * The sent seqcount can grow higher than receive seqcount on clear. */ index->status = htobe32(LTTNG_VIEWER_INDEX_INACTIVE); index->timestamp_end = htobe64(rstream->beacon_ts_end); index->stream_id = htobe64(rstream->ctf_stream_id); + DBG("INACTIVE: with beacon, r v recv eq for stream %" PRIu64, vstream->stream->stream_handle); goto index_ready; } else if (rstream->index_received_seqcount - == vstream->index_sent_seqcount) { + <= vstream->index_sent_seqcount) { /* - * This checks whether received == sent seqcount. In + * This checks whether received <= sent seqcount. In * this case, we have not received a beacon. Therefore, * we can only ask the client to retry later. + * + * The sent seqcount can grow higher than receive seqcount on clear. */ index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY); + DBG("RETRY: r v recv leq for stream %" PRIu64, vstream->stream->stream_handle); + goto index_ready; + } else if (rstream->index_received_seqcount <= rstream->clear_position_index_seqcount || + rstream->index_received_seqcount <= rstream->clear_position_data_seqcount) { + /* + * Send "retry" reply if a clear operation is in progress. + */ + index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY); + DBG("RETRY: r <= clear pos for stream %" PRIu64, vstream->stream->stream_handle); goto index_ready; } else if (!tracefile_array_seq_in_file(rstream->tfa, vstream->current_tracefile_id, @@ -1256,6 +1275,7 @@ static int check_index_status(struct relay_viewer_stream *vstream, vstream->current_tracefile_id, vstream->index_sent_seqcount)) { index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY); + DBG("RETRY: !tfs seq in file for stream %" PRIu64, vstream->stream->stream_handle); goto index_ready; } assert(tracefile_array_seq_in_file(rstream->tfa, @@ -1329,6 +1349,31 @@ int viewer_get_next_index(struct relay_connection *conn) goto send_reply; } + /* + * In case the stream has been cleared, we need to push the viewer + * stream index sent seqcount forward. Note that this can temporarily + * bring the index_sent position beyond the index received position. + */ + if (rstream->clear_position_index_seqcount >= vstream->index_sent_seqcount) { + vstream->index_sent_seqcount = rstream->clear_position_index_seqcount; + /* + * Close the currently open index and data files to ensure we + * sync up with the receive side. + */ + if (vstream->index_file) { + lttng_index_file_put(vstream->index_file); + vstream->index_file = NULL; + } + if (vstream->stream_fd) { + stream_fd_put(vstream->stream_fd); + vstream->stream_fd = NULL; + } + /* + * In tracefile rotation, we reset the current tracefile to 0. + */ + vstream->current_tracefile_id = 0; + } + /* Try to open an index if one is needed for that stream. */ ret = try_open_index(vstream, rstream); if (ret < 0) { @@ -1336,9 +1381,10 @@ int viewer_get_next_index(struct relay_connection *conn) /* * The index is created only when the first data * packet arrives, it might not be ready at the - * beginning of the session + * beginning of the session. Let check_index_status + * deal with inactivity beacons. */ - viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY); + goto check_status; } else { /* Unhandled error. */ viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR); @@ -1346,6 +1392,7 @@ int viewer_get_next_index(struct relay_connection *conn) goto send_reply; } +check_status: ret = check_index_status(vstream, rstream, ctf_trace, &viewer_index); if (ret < 0) { goto error_put;