relayd: live: implement support for clear feature
[lttng-tools.git] / src / bin / lttng-relayd / live.c
index 9a3d78ef0fc3a0632476d131202b325d77e17e82..2a73556dffa0c22c33df136ee53eb39bcc5af362 100644 (file)
@@ -291,6 +291,13 @@ static int make_viewer_streams(struct relay_session *session,
        assert(session);
        ASSERT_LOCKED(session->lock);
 
+       if (!viewer_trace_chunk) {
+               ERR("Internal error: viewer session associated with session \"%s\" has a NULL trace chunk",
+                               session->session_name);
+               ret = -1;
+               goto error;
+       }
+
        if (session->connection_closed) {
                *closed = true;
        }
@@ -302,6 +309,7 @@ static int make_viewer_streams(struct relay_session *session,
        rcu_read_lock();
        cds_lfht_for_each_entry(session->ctf_traces_ht->ht, &iter.iter, ctf_trace,
                        node.node) {
+               bool trace_has_metadata_stream = false;
                struct relay_stream *stream;
 
                health_code_update();
@@ -310,6 +318,30 @@ static int make_viewer_streams(struct relay_session *session,
                        continue;
                }
 
+               /*
+                * Iterate over all the streams of the trace to see if we have a
+                * metadata stream.
+                */
+               cds_list_for_each_entry_rcu(
+                               stream, &ctf_trace->stream_list, stream_node)
+               {
+                       if (stream->is_metadata) {
+                               trace_has_metadata_stream = true;
+                               break;
+                       }
+               }
+
+               /*
+                * If there is no metadata stream in this trace at the moment
+                * and we never sent one to the viewer, skip the trace. We
+                * accept that the viewer will not see this trace at all.
+                */
+               if (!trace_has_metadata_stream &&
+                               !ctf_trace->metadata_stream_sent_to_viewer) {
+                       ctf_trace_put(ctf_trace);
+                       continue;
+               }
+
                cds_list_for_each_entry_rcu(stream, &ctf_trace->stream_list, stream_node) {
                        struct relay_viewer_stream *vstream;
 
@@ -324,6 +356,15 @@ static int make_viewer_streams(struct relay_session *session,
                        }
                        vstream = viewer_stream_get_by_id(stream->stream_handle);
                        if (!vstream) {
+                               /*
+                                * Save that we sent the metadata stream to the
+                                * viewer. So that we know what trace the viewer
+                                * is aware of.
+                                */
+                               if (stream->is_metadata) {
+                                       ctf_trace->metadata_stream_sent_to_viewer =
+                                                       true;
+                               }
                                vstream = viewer_stream_create(stream,
                                                viewer_trace_chunk, seek_t);
                                if (!vstream) {
@@ -378,6 +419,7 @@ static int make_viewer_streams(struct relay_session *session,
 
 error_unlock:
        rcu_read_unlock();
+error:
        return ret;
 }
 
@@ -959,25 +1001,6 @@ int viewer_get_new_streams(struct relay_connection *conn)
        }
 
        pthread_mutex_lock(&session->lock);
-       if (!session->current_trace_chunk) {
-               /*
-                * Means the session is being destroyed. React the same way
-                * as if it could not be found at all.
-                */
-               DBG("Relay session %" PRIu64 " has no current trace chunk, replying LTTNG_VIEWER_NEW_STREAMS_ERR",
-                               session_id);
-               response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_ERR);
-               goto send_reply_unlock;
-       }
-
-       if (!conn->viewer_session->current_trace_chunk &&
-                       session->current_trace_chunk) {
-               ret = viewer_session_set_trace_chunk(conn->viewer_session,
-                               session->current_trace_chunk);
-               if (ret) {
-                       goto error_unlock_session;
-               }
-       }
        ret = make_viewer_streams(session,
                        conn->viewer_session->current_trace_chunk,
                        LTTNG_VIEWER_SEEK_LAST, &nb_total, &nb_unsent,
@@ -1057,6 +1080,7 @@ int viewer_attach_session(struct relay_connection *conn)
        struct lttng_viewer_attach_session_request request;
        struct lttng_viewer_attach_session_response response;
        struct relay_session *session = NULL;
+       enum lttng_viewer_attach_return_code viewer_attach_status;
        bool closed = false;
        uint64_t session_id;
 
@@ -1106,10 +1130,10 @@ int viewer_attach_session(struct relay_connection *conn)
        }
 
        send_streams = 1;
-       ret = viewer_session_attach(conn->viewer_session, session);
-       if (ret) {
-               DBG("Already a viewer attached");
-               response.status = htobe32(LTTNG_VIEWER_ATTACH_ALREADY);
+       viewer_attach_status = viewer_session_attach(conn->viewer_session,
+                       session);
+       if (viewer_attach_status != LTTNG_VIEWER_ATTACH_OK) {
+               response.status = htobe32(viewer_attach_status);
                goto send_reply;
        }
 
@@ -1126,14 +1150,6 @@ int viewer_attach_session(struct relay_connection *conn)
                goto send_reply;
        }
 
-       if (!conn->viewer_session->current_trace_chunk &&
-                       session->current_trace_chunk) {
-               ret = viewer_session_set_trace_chunk(conn->viewer_session,
-                               session->current_trace_chunk);
-               if (ret) {
-                       goto end_put_session;
-               }
-       }
        ret = make_viewer_streams(session,
                        conn->viewer_session->current_trace_chunk, seek_type,
                        &nb_streams, NULL, NULL, &closed);
@@ -1206,6 +1222,7 @@ static int try_open_index(struct relay_viewer_stream *vstream,
        int ret = 0;
        const uint32_t connection_major = rstream->trace->session->major;
        const uint32_t connection_minor = rstream->trace->session->minor;
+       enum lttng_trace_chunk_status chunk_status;
 
        if (vstream->index_file) {
                goto end;
@@ -1218,14 +1235,19 @@ static int try_open_index(struct relay_viewer_stream *vstream,
                ret = -ENOENT;
                goto end;
        }
-       vstream->index_file = lttng_index_file_create_from_trace_chunk_read_only(
+       chunk_status = lttng_index_file_create_from_trace_chunk_read_only(
                        vstream->stream_file.trace_chunk, rstream->path_name,
                        rstream->channel_name, rstream->tracefile_size,
                        vstream->current_tracefile_id,
                        lttng_to_index_major(connection_major, connection_minor),
-                       lttng_to_index_minor(connection_major, connection_minor));
-       if (!vstream->index_file) {
-               ret = -1;
+                       lttng_to_index_minor(connection_major, connection_minor),
+                       true, &vstream->index_file);
+       if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+               if (chunk_status == LTTNG_TRACE_CHUNK_STATUS_NO_FILE) {
+                       ret = -ENOENT;
+               } else {
+                       ret = -1;
+               }
        }
 
 end:
@@ -1249,6 +1271,12 @@ static int check_index_status(struct relay_viewer_stream *vstream,
 {
        int ret;
 
+       DBG("Check index status: index_received_seqcount %" PRIu64 " "
+                               "index_sent_seqcount %" PRIu64 " "
+                               "for stream %" PRIu64,
+                               rstream->index_received_seqcount,
+                               vstream->index_sent_seqcount,
+                               vstream->stream->stream_handle);
        if ((trace->session->connection_closed || rstream->closed)
                        && rstream->index_received_seqcount
                                == vstream->index_sent_seqcount) {
@@ -1259,8 +1287,10 @@ static int check_index_status(struct relay_viewer_stream *vstream,
                index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
                goto hup;
        } else if (rstream->beacon_ts_end != -1ULL &&
+                       (rstream->index_received_seqcount == 0 ||
+                       (vstream->index_sent_seqcount != 0 &&
                        rstream->index_received_seqcount
-                               == vstream->index_sent_seqcount) {
+                               <= vstream->index_sent_seqcount))) {
                /*
                 * We've received a synchronization beacon and the last index
                 * available has been sent, the index for now is inactive.
@@ -1269,19 +1299,37 @@ static int check_index_status(struct relay_viewer_stream *vstream,
                 * inform the client of a time interval during which we can
                 * guarantee that there are no events to read (and never will
                 * be).
+                *
+                * The sent seqcount can grow higher than receive seqcount on
+                * clear because the rotation performed by clear will push
+                * the index_sent_seqcount ahead (see
+                * viewer_stream_sync_tracefile_array_tail) and skip over
+                * packet sequence numbers.
                 */
                index->status = htobe32(LTTNG_VIEWER_INDEX_INACTIVE);
                index->timestamp_end = htobe64(rstream->beacon_ts_end);
                index->stream_id = htobe64(rstream->ctf_stream_id);
+               DBG("Check index status: inactive with beacon, for stream %" PRIu64,
+                               vstream->stream->stream_handle);
                goto index_ready;
-       } else if (rstream->index_received_seqcount
-                       == vstream->index_sent_seqcount) {
+       } else if (rstream->index_received_seqcount == 0 ||
+                       (vstream->index_sent_seqcount != 0 &&
+                       rstream->index_received_seqcount
+                               <= vstream->index_sent_seqcount)) {
                /*
-                * This checks whether received == sent seqcount. In
+                * This checks whether received <= sent seqcount. In
                 * this case, we have not received a beacon. Therefore,
                 * we can only ask the client to retry later.
+                *
+                * The sent seqcount can grow higher than receive seqcount on
+                * clear because the rotation performed by clear will push
+                * the index_sent_seqcount ahead (see
+                * viewer_stream_sync_tracefile_array_tail) and skip over
+                * packet sequence numbers.
                 */
                index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+               DBG("Check index status: retry for stream %" PRIu64,
+                               vstream->stream->stream_handle);
                goto index_ready;
        } else if (!tracefile_array_seq_in_file(rstream->tfa,
                        vstream->current_tracefile_id,
@@ -1294,9 +1342,7 @@ static int check_index_status(struct relay_viewer_stream *vstream,
                DBG("Viewer stream %" PRIu64 " rotation",
                                vstream->stream->stream_handle);
                ret = viewer_stream_rotate(vstream);
-               if (ret < 0) {
-                       goto end;
-               } else if (ret == 1) {
+               if (ret == 1) {
                        /* EOF across entire stream. */
                        index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
                        goto hup;
@@ -1321,6 +1367,11 @@ static int check_index_status(struct relay_viewer_stream *vstream,
                                        vstream->current_tracefile_id,
                                        vstream->index_sent_seqcount)) {
                        index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+                       DBG("Check index status: retry: "
+                               "tracefile array sequence number %" PRIu64
+                               " not in file for stream %" PRIu64,
+                               vstream->index_sent_seqcount,
+                               vstream->stream->stream_handle);
                        goto index_ready;
                }
                assert(tracefile_array_seq_in_file(rstream->tfa,
@@ -1329,7 +1380,6 @@ static int check_index_status(struct relay_viewer_stream *vstream,
        }
        /* ret == 0 means successful so we continue. */
        ret = 0;
-end:
        return ret;
 
 hup:
@@ -1394,21 +1444,70 @@ int viewer_get_next_index(struct relay_connection *conn)
                goto send_reply;
        }
 
-       /* Try to open an index if one is needed for that stream. */
-       ret = try_open_index(vstream, rstream);
-       if (ret < 0) {
-               if (ret == -ENOENT) {
-                       /*
-                        * The index is created only when the first data
-                        * packet arrives, it might not be ready at the
-                        * beginning of the session
-                        */
-                       viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
-               } else {
-                       /* Unhandled error. */
+       if (rstream->ongoing_rotation.is_set) {
+               /* Rotation is ongoing, try again later. */
+               viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+               goto send_reply;
+       }
+
+       if (rstream->trace->session->ongoing_rotation) {
+               /* Rotation is ongoing, try again later. */
+               viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+               goto send_reply;
+       }
+
+       if (rstream->trace_chunk) {
+               uint64_t rchunk_id, vchunk_id;
+
+               /*
+                * If the relay stream is not yet closed, ensure the viewer
+                * chunk matches the relay chunk after clear.
+                */
+               if (lttng_trace_chunk_get_id(rstream->trace_chunk,
+                               &rchunk_id) != LTTNG_TRACE_CHUNK_STATUS_OK) {
                        viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
+                       goto send_reply;
+               }
+               if (lttng_trace_chunk_get_id(
+                               conn->viewer_session->current_trace_chunk,
+                               &vchunk_id) != LTTNG_TRACE_CHUNK_STATUS_OK) {
+                       viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
+                       goto send_reply;
+               }
+
+               if (rchunk_id != vchunk_id) {
+                       DBG("Relay and viewer chunk ids differ: "
+                               "rchunk_id %" PRIu64 " vchunk_id %" PRIu64,
+                               rchunk_id, vchunk_id);
+
+                       lttng_trace_chunk_put(
+                               conn->viewer_session->current_trace_chunk);
+                       conn->viewer_session->current_trace_chunk = NULL;
+                       ret = viewer_session_set_trace_chunk_copy(
+                                       conn->viewer_session,
+                                       rstream->trace_chunk);
+                       if (ret) {
+                               viewer_index.status =
+                                       htobe32(LTTNG_VIEWER_INDEX_ERR);
+                               goto send_reply;
+                       }
                }
-               goto send_reply;
+       }
+       if (conn->viewer_session->current_trace_chunk !=
+                       vstream->stream_file.trace_chunk) {
+               bool acquired_reference;
+
+               DBG("Viewer session and viewer stream chunk differ: "
+                               "vsession chunk %p vstream chunk %p",
+                               conn->viewer_session->current_trace_chunk,
+                               vstream->stream_file.trace_chunk);
+               lttng_trace_chunk_put(vstream->stream_file.trace_chunk);
+               acquired_reference = lttng_trace_chunk_get(conn->viewer_session->current_trace_chunk);
+               assert(acquired_reference);
+               vstream->stream_file.trace_chunk =
+                       conn->viewer_session->current_trace_chunk;
+               viewer_stream_sync_tracefile_array_tail(vstream);
+               viewer_stream_close_files(vstream);
        }
 
        ret = check_index_status(vstream, rstream, ctf_trace, &viewer_index);
@@ -1424,6 +1523,22 @@ int viewer_get_next_index(struct relay_connection *conn)
        /* At this point, ret is 0 thus we will be able to read the index. */
        assert(!ret);
 
+       /* Try to open an index if one is needed for that stream. */
+       ret = try_open_index(vstream, rstream);
+       if (ret == -ENOENT) {
+              if (rstream->closed) {
+                       viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
+                       goto send_reply;
+              } else {
+                       viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+                       goto send_reply;
+              }
+       }
+       if (ret < 0) {
+               viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
+               goto send_reply;
+       }
+
        /*
         * vstream->stream_fd may be NULL if it has been closed by
         * tracefile rotation, or if we are at the beginning of the
@@ -1444,10 +1559,20 @@ int viewer_get_next_index(struct relay_connection *conn)
                        goto error_put;
                }
 
+               /*
+                * It is possible the the file we are trying to open is
+                * missing if the stream has been closed (application exits with
+                * per-pid buffers) and a clear command has been performed.
+                */
                status = lttng_trace_chunk_open_file(
                                vstream->stream_file.trace_chunk,
-                               file_path, O_RDONLY, 0, &fd);
+                               file_path, O_RDONLY, 0, &fd, true);
                if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+                       if (status == LTTNG_TRACE_CHUNK_STATUS_NO_FILE &&
+                                       rstream->closed) {
+                               viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_HUP);
+                               goto send_reply;
+                       }
                        PERROR("Failed to open trace file for viewer stream");
                        goto error_put;
                }
@@ -1704,14 +1829,32 @@ int viewer_get_metadata(struct relay_connection *conn)
                goto error;
        }
 
-       assert(vstream->metadata_sent <= vstream->stream->metadata_received);
-
-       len = vstream->stream->metadata_received - vstream->metadata_sent;
-       if (len == 0) {
+       if (vstream->metadata_sent >= vstream->stream->metadata_received) {
+               /*
+                * The live viewers expect to receive a NO_NEW_METADATA
+                * status before a stream disappears, otherwise they abort the
+                * entire live connection when receiving an error status.
+                *
+                * Clear feature resets the metadata_sent to 0 until the
+                * same metadata is received again.
+                */
                reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA);
+               /*
+                * The live viewer considers a closed 0 byte metadata stream as
+                * an error.
+                */
+               if (vstream->metadata_sent > 0) {
+                       vstream->stream->no_new_metadata_notified = true;
+                       if (vstream->stream->closed) {
+                               /* Release ownership for the viewer metadata stream. */
+                               viewer_stream_put(vstream);
+                       }
+               }
                goto send_reply;
        }
 
+       len = vstream->stream->metadata_received - vstream->metadata_sent;
+
        /* first time, we open the metadata file */
        if (!vstream->stream_file.fd) {
                int fd;
@@ -1727,10 +1870,23 @@ int viewer_get_metadata(struct relay_connection *conn)
                        goto error;
                }
 
+               /*
+                * It is possible the the metadata file we are trying to open is
+                * missing if the stream has been closed (application exits with
+                * per-pid buffers) and a clear command has been performed.
+                */
                status = lttng_trace_chunk_open_file(
                                vstream->stream_file.trace_chunk,
-                               file_path, O_RDONLY, 0, &fd);
+                               file_path, O_RDONLY, 0, &fd, true);
                if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+                       if (status == LTTNG_TRACE_CHUNK_STATUS_NO_FILE) {
+                               reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA);
+                               len = 0;
+                               if (vstream->stream->closed) {
+                                       viewer_stream_put(vstream);
+                               }
+                               goto send_reply;
+                       }
                        PERROR("Failed to open metadata file for viewer stream");
                        goto error;
                }
@@ -1756,12 +1912,6 @@ int viewer_get_metadata(struct relay_connection *conn)
                goto error;
        }
        vstream->metadata_sent += read_len;
-       if (vstream->metadata_sent == vstream->stream->metadata_received
-                       && vstream->stream->closed) {
-               /* Release ownership for the viewer metadata stream. */
-               viewer_stream_put(vstream);
-       }
-
        reply.status = htobe32(LTTNG_VIEWER_METADATA_OK);
 
        goto send_reply;
This page took 0.029657 seconds and 5 git commands to generate.