Test: clear: local, streaming, live, tracefile rotation
[lttng-tools.git] / src / bin / lttng-relayd / live.c
index 78ea95cc247652185e5ab0fc218c259458e629c6..c06b38cb7b2977929fa391d9b709f46b79ac83d9 100644 (file)
@@ -339,7 +339,10 @@ int make_viewer_streams(struct relay_session *session,
                                 * Ensure a self-reference is preserved even
                                 * after we have put our local reference.
                                 */
-                               viewer_stream_get(vstream);
+                               if (!viewer_stream_get(vstream)) {
+                                       ERR("Unable to get self-reference on viewer stream, logic error.");
+                                       abort();
+                               }
                        } else {
                                if (!vstream->sent_flag && nb_unsent) {
                                        /* Update number of unsent stream counter. */
@@ -449,10 +452,11 @@ struct lttcomm_sock *init_socket(struct lttng_uri *uri)
        if (ret < 0) {
                goto error;
        }
-       DBG("Listening on sock %d for live", sock->fd);
+       DBG("Listening on sock %d for lttng-live", sock->fd);
 
        ret = sock->ops->bind(sock);
        if (ret < 0) {
+               PERROR("Failed to bind lttng-live socket");
                goto error;
        }
 
@@ -651,12 +655,16 @@ void *thread_dispatcher(void *data)
 
        health_code_update();
 
-       while (!CMM_LOAD_SHARED(live_dispatch_thread_exit)) {
+       for (;;) {
                health_code_update();
 
                /* Atomically prepare the queue futex */
                futex_nto1_prepare(&viewer_conn_queue.futex);
 
+               if (CMM_LOAD_SHARED(live_dispatch_thread_exit)) {
+                       break;
+               }
+
                do {
                        health_code_update();
 
@@ -827,6 +835,11 @@ int viewer_list_sessions(struct relay_connection *conn)
 
                health_code_update();
 
+               if (session->connection_closed) {
+                       /* Skip closed session */
+                       continue;
+               }
+
                if (count >= buf_count) {
                        struct lttng_viewer_session *newbuf;
                        uint32_t new_buf_count = buf_count << 1;
@@ -1031,12 +1044,12 @@ int viewer_attach_session(struct relay_connection *conn)
        session = session_get_by_id(be64toh(request.session_id));
        if (!session) {
                DBG("Relay session %" PRIu64 " not found",
-                               be64toh(request.session_id));
+                               (uint64_t) be64toh(request.session_id));
                response.status = htobe32(LTTNG_VIEWER_ATTACH_UNK);
                goto send_reply;
        }
        DBG("Attach session ID %" PRIu64 " received",
-               be64toh(request.session_id));
+               (uint64_t) be64toh(request.session_id));
 
        if (session->live_timer == 0) {
                DBG("Not live session");
@@ -1081,10 +1094,19 @@ int viewer_attach_session(struct relay_connection *conn)
        if (closed) {
                send_streams = 0;
                response.streams_count = 0;
-               response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_HUP);
+               response.status = htobe32(LTTNG_VIEWER_ATTACH_UNK);
                goto send_reply;
        }
 
+       if (testpoint(relayd_viewer_session_attach)) {
+               ERR("Viewer session attach testpoint failed");
+               send_streams = 0;
+               response.streams_count = 0;
+               response.status = htobe32(LTTNG_VIEWER_ATTACH_UNK);
+               goto send_reply;
+       }
+
+
 send_reply:
        health_code_update();
        ret = send_response(conn->sock, &response, sizeof(response));
@@ -1119,8 +1141,8 @@ error:
 /*
  * Open the index file if needed for the given vstream.
  *
- * If an index file is successfully opened, the vstream index_fd set with
- * it.
+ * If an index file is successfully opened, the vstream will set it as its
+ * current index file.
  *
  * Return 0 on success, a negative value on error (-ENOENT if not ready yet).
  *
@@ -1131,31 +1153,23 @@ static int try_open_index(struct relay_viewer_stream *vstream,
 {
        int ret = 0;
 
-       if (vstream->index_fd) {
+       if (vstream->index_file) {
                goto end;
        }
 
        /*
-        * First time, we open the index file and at least one index is ready.
+        * Deal with streams that have not received any index so far (or after clear).
         */
-       if (rstream->index_received_seqcount == 0) {
+       if (rstream->index_received_seqcount <= rstream->clear_position_index_seqcount) {
                ret = -ENOENT;
                goto end;
        }
-       ret = index_open(vstream->path_name, vstream->channel_name,
+       vstream->index_file = lttng_index_file_open(vstream->path_name,
+                       vstream->channel_name,
                        vstream->stream->tracefile_count,
                        vstream->current_tracefile_id);
-       if (ret >= 0) {
-               vstream->index_fd = stream_fd_create(ret);
-               if (!vstream->index_fd) {
-                       if (close(ret)) {
-                               PERROR("close");
-                       }
-                       ret = -1;
-               } else {
-                       ret = 0;
-               }
-               goto end;
+       if (!vstream->index_file) {
+               ret = -1;
        }
 
 end:
@@ -1179,15 +1193,23 @@ static int check_index_status(struct relay_viewer_stream *vstream,
 {
        int ret;
 
-       if (trace->session->connection_closed
+       DBG("check status: recv %" PRIu64 " sent %" PRIu64 " clear index %" PRIu64 " clear data %" PRIu64 " for stream %" PRIu64,
+                               rstream->index_received_seqcount,
+                               vstream->index_sent_seqcount,
+                               rstream->clear_position_index_seqcount,
+                               rstream->clear_position_data_seqcount, vstream->stream->stream_handle);
+       if ((trace->session->connection_closed || rstream->closed)
                        && rstream->index_received_seqcount
                                == vstream->index_sent_seqcount) {
-               /* Last index sent and session connection is closed. */
+               /*
+                * Last index sent and session connection or relay
+                * stream are closed.
+                */
                index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
                goto hup;
        } else if (rstream->beacon_ts_end != -1ULL &&
                        rstream->index_received_seqcount
-                               == vstream->index_sent_seqcount) {
+                               <= vstream->index_sent_seqcount) {
                /*
                 * We've received a synchronization beacon and the last index
                 * available has been sent, the index for now is inactive.
@@ -1196,19 +1218,33 @@ static int check_index_status(struct relay_viewer_stream *vstream,
                 * inform the client of a time interval during which we can
                 * guarantee that there are no events to read (and never will
                 * be).
+                *
+                * The sent seqcount can grow higher than receive seqcount on clear.
                 */
                index->status = htobe32(LTTNG_VIEWER_INDEX_INACTIVE);
                index->timestamp_end = htobe64(rstream->beacon_ts_end);
                index->stream_id = htobe64(rstream->ctf_stream_id);
+               DBG("INACTIVE: with beacon, r v recv eq for stream %" PRIu64, vstream->stream->stream_handle);
                goto index_ready;
        } else if (rstream->index_received_seqcount
-                       == vstream->index_sent_seqcount) {
+                       <= vstream->index_sent_seqcount) {
                /*
-                * This checks whether received == sent seqcount. In
+                * This checks whether received <= sent seqcount. In
                 * this case, we have not received a beacon. Therefore,
                 * we can only ask the client to retry later.
+                *
+                * The sent seqcount can grow higher than receive seqcount on clear.
                 */
                index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+               DBG("RETRY: r v recv leq for stream %" PRIu64, vstream->stream->stream_handle);
+               goto index_ready;
+       } else if (rstream->index_received_seqcount <= rstream->clear_position_index_seqcount ||
+                       rstream->index_received_seqcount <= rstream->clear_position_data_seqcount) {
+               /*
+                * Send "retry" reply if a clear operation is in progress.
+                */
+               index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+               DBG("RETRY: r <= clear pos for stream %" PRIu64, vstream->stream->stream_handle);
                goto index_ready;
        } else if (!tracefile_array_seq_in_file(rstream->tfa,
                        vstream->current_tracefile_id,
@@ -1248,6 +1284,7 @@ static int check_index_status(struct relay_viewer_stream *vstream,
                                        vstream->current_tracefile_id,
                                        vstream->index_sent_seqcount)) {
                        index->status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+                       DBG("RETRY: !tfs seq in file for stream %" PRIu64, vstream->stream->stream_handle);
                        goto index_ready;
                }
                assert(tracefile_array_seq_in_file(rstream->tfa,
@@ -1274,7 +1311,6 @@ static
 int viewer_get_next_index(struct relay_connection *conn)
 {
        int ret;
-       ssize_t read_ret;
        struct lttng_viewer_get_next_index request_index;
        struct lttng_viewer_index viewer_index;
        struct ctf_packet_index packet_index;
@@ -1299,7 +1335,7 @@ int viewer_get_next_index(struct relay_connection *conn)
        vstream = viewer_stream_get_by_id(be64toh(request_index.stream_id));
        if (!vstream) {
                DBG("Client requested index of unknown stream id %" PRIu64,
-                               be64toh(request_index.stream_id));
+                               (uint64_t) be64toh(request_index.stream_id));
                viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
                goto send_reply;
        }
@@ -1322,6 +1358,31 @@ int viewer_get_next_index(struct relay_connection *conn)
                goto send_reply;
        }
 
+       /*
+        * In case the stream has been cleared, we need to push the viewer
+        * stream index sent seqcount forward. Note that this can temporarily
+        * bring the index_sent position beyond the index received position.
+        */
+       if (rstream->clear_position_index_seqcount >= vstream->index_sent_seqcount) {
+               vstream->index_sent_seqcount = rstream->clear_position_index_seqcount;
+               /*
+                * Close the currently open index and data files to ensure we
+                * sync up with the receive side.
+                */
+               if (vstream->index_file) {
+                       lttng_index_file_put(vstream->index_file);
+                       vstream->index_file = NULL;
+               }
+               if (vstream->stream_fd) {
+                       stream_fd_put(vstream->stream_fd);
+                       vstream->stream_fd = NULL;
+               }
+               /*
+                * In tracefile rotation, we reset the current tracefile to 0.
+                */
+               vstream->current_tracefile_id = 0;
+       }
+
        /* Try to open an index if one is needed for that stream. */
        ret = try_open_index(vstream, rstream);
        if (ret < 0) {
@@ -1329,9 +1390,10 @@ int viewer_get_next_index(struct relay_connection *conn)
                        /*
                         * The index is created only when the first data
                         * packet arrives, it might not be ready at the
-                        * beginning of the session
+                        * beginning of the session. Let check_index_status
+                        * deal with inactivity beacons.
                         */
-                       viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_RETRY);
+                       goto check_status;
                } else {
                        /* Unhandled error. */
                        viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
@@ -1339,6 +1401,7 @@ int viewer_get_next_index(struct relay_connection *conn)
                goto send_reply;
        }
 
+check_status:
        ret = check_index_status(vstream, rstream, ctf_trace, &viewer_index);
        if (ret < 0) {
                goto error_put;
@@ -1397,11 +1460,10 @@ int viewer_get_next_index(struct relay_connection *conn)
                viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
        }
 
-       read_ret = lttng_read(vstream->index_fd->fd, &packet_index,
-                       sizeof(packet_index));
-       if (read_ret < sizeof(packet_index)) {
-               ERR("Relay reading index file %d returned %zd",
-                       vstream->index_fd->fd, read_ret);
+       ret = lttng_index_file_read(vstream->index_file, &packet_index);
+       if (ret) {
+               ERR("Relay error reading index file %d",
+                               vstream->index_file->fd);
                viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
                goto send_reply;
        } else {
@@ -1414,7 +1476,7 @@ int viewer_get_next_index(struct relay_connection *conn)
         */
        DBG("Sending viewer index for stream %" PRIu64 " offset %" PRIu64,
                rstream->stream_handle,
-               be64toh(packet_index.offset));
+               (uint64_t) be64toh(packet_index.offset));
        viewer_index.offset = packet_index.offset;
        viewer_index.packet_size = packet_index.packet_size;
        viewer_index.content_size = packet_index.content_size;
@@ -1482,13 +1544,15 @@ error_put:
 static
 int viewer_get_packet(struct relay_connection *conn)
 {
-       int ret, send_data = 0;
-       char *data = NULL;
-       uint32_t len = 0;
-       ssize_t read_len;
+       int ret;
+       off_t lseek_ret;
+       char *reply = NULL;
        struct lttng_viewer_get_packet get_packet_info;
-       struct lttng_viewer_trace_packet reply;
+       struct lttng_viewer_trace_packet reply_header;
        struct relay_viewer_stream *vstream = NULL;
+       uint32_t reply_size = sizeof(reply_header);
+       uint32_t packet_data_len = 0;
+       ssize_t read_len;
 
        DBG2("Relay get data packet");
 
@@ -1502,76 +1566,78 @@ int viewer_get_packet(struct relay_connection *conn)
        health_code_update();
 
        /* From this point on, the error label can be reached. */
-       memset(&reply, 0, sizeof(reply));
+       memset(&reply_header, 0, sizeof(reply_header));
 
        vstream = viewer_stream_get_by_id(be64toh(get_packet_info.stream_id));
        if (!vstream) {
                DBG("Client requested packet of unknown stream id %" PRIu64,
-                               be64toh(get_packet_info.stream_id));
-               reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
+                               (uint64_t) be64toh(get_packet_info.stream_id));
+               reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
                goto send_reply_nolock;
+       } else {
+               packet_data_len = be32toh(get_packet_info.len);
+               reply_size += packet_data_len;
        }
 
-       pthread_mutex_lock(&vstream->stream->lock);
-
-       len = be32toh(get_packet_info.len);
-       data = zmalloc(len);
-       if (!data) {
-               PERROR("relay data zmalloc");
+       reply = zmalloc(reply_size);
+       if (!reply) {
+               PERROR("packet reply zmalloc");
+               reply_size = sizeof(reply_header);
                goto error;
        }
 
-       ret = lseek(vstream->stream_fd->fd, be64toh(get_packet_info.offset),
+       pthread_mutex_lock(&vstream->stream->lock);
+       lseek_ret = lseek(vstream->stream_fd->fd, be64toh(get_packet_info.offset),
                        SEEK_SET);
-       if (ret < 0) {
+       if (lseek_ret < 0) {
                PERROR("lseek fd %d to offset %" PRIu64, vstream->stream_fd->fd,
-                       be64toh(get_packet_info.offset));
+                       (uint64_t) be64toh(get_packet_info.offset));
                goto error;
        }
-       read_len = lttng_read(vstream->stream_fd->fd, data, len);
-       if (read_len < len) {
+       read_len = lttng_read(vstream->stream_fd->fd,
+                       reply + sizeof(reply_header),
+                       packet_data_len);
+       if (read_len < packet_data_len) {
                PERROR("Relay reading trace file, fd: %d, offset: %" PRIu64,
                                vstream->stream_fd->fd,
-                               be64toh(get_packet_info.offset));
+                               (uint64_t) be64toh(get_packet_info.offset));
                goto error;
        }
-       reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_OK);
-       reply.len = htobe32(len);
-       send_data = 1;
+       reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_OK);
+       reply_header.len = htobe32(packet_data_len);
        goto send_reply;
 
 error:
-       reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
+       reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
 
 send_reply:
        if (vstream) {
                pthread_mutex_unlock(&vstream->stream->lock);
        }
 send_reply_nolock:
-       reply.flags = htobe32(reply.flags);
 
        health_code_update();
 
-       ret = send_response(conn->sock, &reply, sizeof(reply));
-       if (ret < 0) {
-               goto end_free;
+       if (reply) {
+               memcpy(reply, &reply_header, sizeof(reply_header));
+               ret = send_response(conn->sock, reply, reply_size);
+       } else {
+               /* No reply to send. */
+               ret = send_response(conn->sock, &reply_header,
+                               reply_size);
        }
-       health_code_update();
 
-       if (send_data) {
-               health_code_update();
-               ret = send_response(conn->sock, data, len);
-               if (ret < 0) {
-                       goto end_free;
-               }
-               health_code_update();
+       health_code_update();
+       if (ret < 0) {
+               PERROR("sendmsg of packet data failed");
+               goto end_free;
        }
 
-       DBG("Sent %u bytes for stream %" PRIu64, len,
-                       be64toh(get_packet_info.stream_id));
+       DBG("Sent %u bytes for stream %" PRIu64, reply_size,
+                       (uint64_t) be64toh(get_packet_info.stream_id));
 
 end_free:
-       free(data);
+       free(reply);
 end:
        if (vstream) {
                viewer_stream_put(vstream);
@@ -1620,7 +1686,7 @@ int viewer_get_metadata(struct relay_connection *conn)
                 * find it.
                 */
                DBG("Client requested metadata of unknown stream id %" PRIu64,
-                               be64toh(request.stream_id));
+                               (uint64_t) be64toh(request.stream_id));
                reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR);
                goto send_reply;
        }
@@ -1706,7 +1772,7 @@ send_reply:
        }
 
        DBG("Sent %" PRIu64 " bytes of metadata for stream %" PRIu64, len,
-                       be64toh(request.stream_id));
+                       (uint64_t) be64toh(request.stream_id));
 
        DBG("Metadata sent");
 
@@ -1795,7 +1861,7 @@ int viewer_detach_session(struct relay_connection *conn)
        session = session_get_by_id(be64toh(request.session_id));
        if (!session) {
                DBG("Relay session %" PRIu64 " not found",
-                               be64toh(request.session_id));
+                               (uint64_t) be64toh(request.session_id));
                response.status = htobe32(LTTNG_VIEWER_DETACH_SESSION_UNK);
                goto send_reply;
        }
@@ -2183,7 +2249,7 @@ int relayd_live_create(struct lttng_uri *uri)
        }
 
        /* Setup the dispatcher thread */
-       ret = pthread_create(&live_dispatcher_thread, NULL,
+       ret = pthread_create(&live_dispatcher_thread, default_pthread_attr(),
                        thread_dispatcher, (void *) NULL);
        if (ret) {
                errno = ret;
@@ -2193,7 +2259,7 @@ int relayd_live_create(struct lttng_uri *uri)
        }
 
        /* Setup the worker thread */
-       ret = pthread_create(&live_worker_thread, NULL,
+       ret = pthread_create(&live_worker_thread, default_pthread_attr(),
                        thread_worker, NULL);
        if (ret) {
                errno = ret;
@@ -2203,7 +2269,7 @@ int relayd_live_create(struct lttng_uri *uri)
        }
 
        /* Setup the listener thread */
-       ret = pthread_create(&live_listener_thread, NULL,
+       ret = pthread_create(&live_listener_thread, default_pthread_attr(),
                        thread_listener, (void *) NULL);
        if (ret) {
                errno = ret;
This page took 0.036715 seconds and 5 git commands to generate.