Fix: relayd: file rotation and live read
[lttng-tools.git] / src / bin / lttng-relayd / live.c
index bafdf35ce90034a90e8ee0e9eaa1a8996103de9e..bd6e12e5af0e560ad697236e44d401b5940e5b01 100644 (file)
@@ -1130,7 +1130,7 @@ static int try_open_index(struct relay_viewer_stream *vstream,
        /*
         * First time, we open the index file and at least one index is ready.
         */
-       if (rstream->total_index_received == 0) {
+       if (rstream->index_received_seqcount == 0) {
                ret = -ENOENT;
                goto end;
        }
@@ -1172,14 +1172,14 @@ static int check_index_status(struct relay_viewer_stream *vstream,
        int ret;
 
        if (trace->session->connection_closed
-                       && rstream->total_index_received
-                               == vstream->last_sent_index) {
+                       && rstream->index_received_seqcount
+                               == vstream->index_sent_seqcount) {
                /* Last index sent and session connection is closed. */
                index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
                goto hup;
        } else if (rstream->beacon_ts_end != -1ULL &&
-                       rstream->total_index_received
-                               == vstream->last_sent_index) {
+                       rstream->index_received_seqcount
+                               == vstream->index_sent_seqcount) {
                /*
                 * We've received a synchronization beacon and the last index
                 * available has been sent, the index for now is inactive.
@@ -1193,21 +1193,24 @@ static int check_index_status(struct relay_viewer_stream *vstream,
                index->timestamp_end = htobe64(rstream->beacon_ts_end);
                index->stream_id = htobe64(rstream->ctf_stream_id);
                goto index_ready;
-       } else if (rstream->total_index_received <= vstream->last_sent_index) {
+       } else if (rstream->index_received_seqcount
+                       == vstream->index_sent_seqcount) {
                /*
-                * This actually checks the case where recv == last_sent.
-                * In this case, we have not received a beacon. Therefore, we
-                * can only ask the client to retry later.
+                * This checks whether received == sent seqcount. In
+                * this case, we have not received a beacon. Therefore,
+                * we can only ask the client to retry later.
                 */
                index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
                goto index_ready;
-       } else if (!viewer_stream_is_tracefile_seq_readable(vstream,
-                       vstream->current_tracefile_seq)) {
+       } else if (!tracefile_array_seq_in_file(rstream->tfa,
+                       vstream->current_tracefile_id,
+                       vstream->index_sent_seqcount)) {
                /*
-                * The producer has overwritten our current file. We
-                * need to rotate.
+                * The next index we want to send cannot be read either
+                * because we need to perform a rotation, or due to
+                * the producer having overwritten its trace file.
                 */
-               DBG("Viewer stream %" PRIu64 " rotation due to overwrite",
+               DBG("Viewer stream %" PRIu64 " rotation",
                                vstream->stream->stream_handle);
                ret = viewer_stream_rotate(vstream);
                if (ret < 0) {
@@ -1217,50 +1220,34 @@ static int check_index_status(struct relay_viewer_stream *vstream,
                        index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
                        goto hup;
                }
-               assert(viewer_stream_is_tracefile_seq_readable(vstream,
-                       vstream->current_tracefile_seq));
-               /* ret == 0 means successful so we continue. */
-               ret = 0;
-       } else {
-               ssize_t read_ret;
-               char tmp[1];
-
                /*
-                * Use EOF on current index file to find out when we
-                * need to rotate.
+                * If we have been pushed due to overwrite, it
+                * necessarily means there is data that can be read in
+                * the stream. If we rotated because we reached the end
+                * of a tracefile, it means the following tracefile
+                * needs to contain at least one index, else we would
+                * have already returned LTTNG_VIEWER_INDEX_RETRY to the
+                * viewer. The updated index_sent_seqcount needs to
+                * point to a readable index entry now.
+                *
+                * In the case where we "rotate" on a single file, we
+                * can end up in a case where the requested index is
+                * still unavailable.
                 */
-               read_ret = lttng_read(vstream->index_fd->fd, tmp, 1);
-               if (read_ret == 1) {
-                       off_t seek_ret;
-
-                       /* There is still data to read. Rewind position. */
-                       seek_ret = lseek(vstream->index_fd->fd, -1, SEEK_CUR);
-                       if (seek_ret < 0) {
-                               ret = -1;
-                               goto end;
-                       }
-                       ret = 0;
-               } else if (read_ret == 0) {
-                       /* EOF. We need to rotate. */
-                       DBG("Viewer stream %" PRIu64 " rotation due to EOF",
-                                       vstream->stream->stream_handle);
-                       ret = viewer_stream_rotate(vstream);
-                       if (ret < 0) {
-                               goto end;
-                       } else if (ret == 1) {
-                               /* EOF across entire stream. */
-                               index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
-                               goto hup;
-                       }
-                       assert(viewer_stream_is_tracefile_seq_readable(vstream,
-                               vstream->current_tracefile_seq));
-                       /* ret == 0 means successful so we continue. */
-                       ret = 0;
-               } else {
-                       /* Error reading index. */
-                       ret = -1;
+               if (rstream->tracefile_count == 1 &&
+                               !tracefile_array_seq_in_file(
+                                       rstream->tfa,
+                                       vstream->current_tracefile_id,
+                                       vstream->index_sent_seqcount)) {
+                       index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+                       goto index_ready;
                }
+               assert(tracefile_array_seq_in_file(rstream->tfa,
+                               vstream->current_tracefile_id,
+                               vstream->index_sent_seqcount));
        }
+       /* ret == 0 means successful so we continue. */
+       ret = 0;
 end:
        return ret;
 
@@ -1303,6 +1290,8 @@ int viewer_get_next_index(struct relay_connection *conn)
 
        vstream = viewer_stream_get_by_id(be64toh(request_index.stream_id));
        if (!vstream) {
+               DBG("Client requested index of unknown stream id %" PRIu64,
+                               be64toh(request_index.stream_id));
                viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
                goto send_reply;
        }
@@ -1409,7 +1398,7 @@ int viewer_get_next_index(struct relay_connection *conn)
                goto send_reply;
        } else {
                viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_OK);
-               vstream->last_sent_index++;
+               vstream->index_sent_seqcount++;
        }
 
        /*
@@ -1456,7 +1445,7 @@ send_reply:
 
        if (vstream) {
                DBG("Index %" PRIu64 " for stream %" PRIu64 " sent",
-                               vstream->last_sent_index,
+                               vstream->index_sent_seqcount,
                                vstream->stream->stream_handle);
        }
 end:
@@ -1509,6 +1498,8 @@ int viewer_get_packet(struct relay_connection *conn)
 
        vstream = viewer_stream_get_by_id(be64toh(get_packet_info.stream_id));
        if (!vstream) {
+               DBG("Client requested packet of unknown stream id %" PRIu64,
+                               be64toh(get_packet_info.stream_id));
                reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
                goto send_reply_nolock;
        }
@@ -1620,6 +1611,8 @@ int viewer_get_metadata(struct relay_connection *conn)
                 * Reply back to the client with an error if we cannot
                 * find it.
                 */
+               DBG("Client requested metadata of unknown stream id %" PRIu64,
+                               be64toh(request.stream_id));
                reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR);
                goto send_reply;
        }
This page took 0.02869 seconds and 5 git commands to generate.