relayd: Implement custom EfficiOS session clear
[lttng-tools.git] / src / bin / lttng-relayd / live.c
index f81872b9d1b4cfffa138dcb99eb40cc8b863db77..d682d3b09bd22e0ee3a4fcc534ecc80b47a56d70 100644 (file)
@@ -1149,9 +1149,9 @@ static int try_open_index(struct relay_viewer_stream *vstream,
        }
 
        /*
-        * First time, we open the index file and at least one index is ready.
+        * Deal with streams that have not received any index so far (or after clear).
         */
-       if (rstream->index_received_seqcount == 0) {
+       if (rstream->index_received_seqcount <= rstream->clear_position_index_seqcount) {
                ret = -ENOENT;
                goto end;
        }
@@ -1184,6 +1184,11 @@ static int check_index_status(struct relay_viewer_stream *vstream,
 {
        int ret;
 
+       DBG("check status: recv %" PRIu64 " sent %" PRIu64 " clear index %" PRIu64 " clear data %" PRIu64 " for stream %" PRIu64,
+                               rstream->index_received_seqcount,
+                               vstream->index_sent_seqcount,
+                               rstream->clear_position_index_seqcount,
+                               rstream->clear_position_data_seqcount, vstream->stream->stream_handle);
        if ((trace->session->connection_closed || rstream->closed)
                        && rstream->index_received_seqcount
                                == vstream->index_sent_seqcount) {
@@ -1195,7 +1200,7 @@ static int check_index_status(struct relay_viewer_stream *vstream,
                goto hup;
        } else if (rstream->beacon_ts_end != -1ULL &&
                        rstream->index_received_seqcount
-                               == vstream->index_sent_seqcount) {
+                               <= vstream->index_sent_seqcount) {
                /*
                 * We've received a synchronization beacon and the last index
                 * available has been sent, the index for now is inactive.
@@ -1204,19 +1209,33 @@ static int check_index_status(struct relay_viewer_stream *vstream,
                 * inform the client of a time interval during which we can
                 * guarantee that there are no events to read (and never will
                 * be).
+                *
+                * The sent seqcount can grow higher than receive seqcount on clear.
                 */
                index->status = htobe32(LTTNG_VIEWER_INDEX_INACTIVE);
                index->timestamp_end = htobe64(rstream->beacon_ts_end);
                index->stream_id = htobe64(rstream->ctf_stream_id);
+               DBG("INACTIVE: with beacon, r v recv eq for stream %" PRIu64, vstream->stream->stream_handle);
                goto index_ready;
        } else if (rstream->index_received_seqcount
-                       == vstream->index_sent_seqcount) {
+                       <= vstream->index_sent_seqcount) {
                /*
-                * This checks whether received == sent seqcount. In
+                * This checks whether received <= sent seqcount. In
                 * this case, we have not received a beacon. Therefore,
                 * we can only ask the client to retry later.
+                *
+                * The sent seqcount can grow higher than receive seqcount on clear.
                 */
                index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+               DBG("RETRY: r v recv leq for stream %" PRIu64, vstream->stream->stream_handle);
+               goto index_ready;
+       } else if (rstream->index_received_seqcount <= rstream->clear_position_index_seqcount ||
+                       rstream->index_received_seqcount <= rstream->clear_position_data_seqcount) {
+               /*
+                * Send "retry" reply if a clear operation is in progress.
+                */
+               index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+               DBG("RETRY: r <= clear pos for stream %" PRIu64, vstream->stream->stream_handle);
                goto index_ready;
        } else if (!tracefile_array_seq_in_file(rstream->tfa,
                        vstream->current_tracefile_id,
@@ -1256,6 +1275,7 @@ static int check_index_status(struct relay_viewer_stream *vstream,
                                        vstream->current_tracefile_id,
                                        vstream->index_sent_seqcount)) {
                        index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+                       DBG("RETRY: !tfs seq in file for stream %" PRIu64, vstream->stream->stream_handle);
                        goto index_ready;
                }
                assert(tracefile_array_seq_in_file(rstream->tfa,
@@ -1329,6 +1349,31 @@ int viewer_get_next_index(struct relay_connection *conn)
                goto send_reply;
        }
 
+       /*
+        * In case the stream has been cleared, we need to push the viewer
+        * stream index sent seqcount forward. Note that this can temporarily
+        * bring the index_sent position beyond the index received position.
+        */
+       if (rstream->clear_position_index_seqcount >= vstream->index_sent_seqcount) {
+               vstream->index_sent_seqcount = rstream->clear_position_index_seqcount;
+               /*
+                * Close the currently open index and data files to ensure we
+                * sync up with the receive side.
+                */
+               if (vstream->index_file) {
+                       lttng_index_file_put(vstream->index_file);
+                       vstream->index_file = NULL;
+               }
+               if (vstream->stream_fd) {
+                       stream_fd_put(vstream->stream_fd);
+                       vstream->stream_fd = NULL;
+               }
+               /*
+                * In tracefile rotation, we reset the current tracefile to 0.
+                */
+               vstream->current_tracefile_id = 0;
+       }
+
        /* Try to open an index if one is needed for that stream. */
        ret = try_open_index(vstream, rstream);
        if (ret < 0) {
@@ -1336,9 +1381,10 @@ int viewer_get_next_index(struct relay_connection *conn)
                        /*
                         * The index is created only when the first data
                         * packet arrives, it might not be ready at the
-                        * beginning of the session
+                        * beginning of the session. Let check_index_status
+                        * deal with inactivity beacons.
                         */
-                       viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+                       goto check_status;
                } else {
                        /* Unhandled error. */
                        viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
@@ -1346,6 +1392,7 @@ int viewer_get_next_index(struct relay_connection *conn)
                goto send_reply;
        }
 
+check_status:
        ret = check_index_status(vstream, rstream, ctf_trace, &viewer_index);
        if (ret < 0) {
                goto error_put;
This page took 0.026218 seconds and 5 git commands to generate.