Backport: relayd: track the live listener thread's epoll fd
[lttng-tools.git] / src / bin / lttng-relayd / live.c
index f9d57281eea812fe0052dd052921539eb6fe5ee5..72fae0b60d9abbd25b354b456eabfc3445bcc0e2 100644 (file)
@@ -53,6 +53,7 @@
 #include <common/sessiond-comm/relayd.h>
 #include <common/uri.h>
 #include <common/utils.h>
+#include <common/fd-tracker/utils.h>
 
 #include "cmd.h"
 #include "live.h"
@@ -392,7 +393,8 @@ int relayd_live_stop(void)
  * Create a poll set with O_CLOEXEC and add the thread quit pipe to the set.
  */
 static
-int create_thread_poll_set(struct lttng_poll_event *events, int size)
+int create_named_thread_poll_set(struct lttng_poll_event *events,
+               int size, const char *name)
 {
        int ret;
 
@@ -401,10 +403,8 @@ int create_thread_poll_set(struct lttng_poll_event *events, int size)
                goto error;
        }
 
-       ret = lttng_poll_create(events, size, LTTNG_CLOEXEC);
-       if (ret < 0) {
-               goto error;
-       }
+       ret = fd_tracker_util_poll_create(the_fd_tracker,
+                       name, events, 1, LTTNG_CLOEXEC);
 
        /* Add quit pipe */
        ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN | LPOLLERR);
@@ -418,6 +418,15 @@ error:
        return ret;
 }
 
+/*
+ * Create a poll set with O_CLOEXEC and add the thread quit pipe to the set.
+ */
+static
+int create_thread_poll_set(struct lttng_poll_event *events, int size)
+{
+       return create_named_thread_poll_set(events, size, "Unknown epoll");
+}
+
 /*
  * Check if the thread quit pipe was triggered.
  *
@@ -497,7 +506,8 @@ void *thread_listener(void *data)
        }
 
        /* Pass 2 as size here for the thread quit pipe and control sockets. */
-       ret = create_thread_poll_set(&events, 2);
+       ret = create_named_thread_poll_set(&events, 2,
+                       "Live listener thread epoll");
        if (ret < 0) {
                goto error_create_poll;
        }
@@ -611,7 +621,7 @@ exit:
 error:
 error_poll_add:
 error_testpoint:
-       lttng_poll_clean(&events);
+       (void) fd_tracker_util_poll_clean(the_fd_tracker, &events);
 error_create_poll:
        if (live_control_sock->fd >= 0) {
                ret = live_control_sock->ops->close(live_control_sock);
@@ -1038,12 +1048,12 @@ int viewer_attach_session(struct relay_connection *conn)
        session = session_get_by_id(be64toh(request.session_id));
        if (!session) {
                DBG("Relay session %" PRIu64 " not found",
-                               be64toh(request.session_id));
+                               (uint64_t) be64toh(request.session_id));
                response.status = htobe32(LTTNG_VIEWER_ATTACH_UNK);
                goto send_reply;
        }
        DBG("Attach session ID %" PRIu64 " received",
-               be64toh(request.session_id));
+               (uint64_t) be64toh(request.session_id));
 
        if (session->live_timer == 0) {
                DBG("Not live session");
@@ -1297,7 +1307,7 @@ int viewer_get_next_index(struct relay_connection *conn)
        vstream = viewer_stream_get_by_id(be64toh(request_index.stream_id));
        if (!vstream) {
                DBG("Client requested index of unknown stream id %" PRIu64,
-                               be64toh(request_index.stream_id));
+                               (uint64_t) be64toh(request_index.stream_id));
                viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
                goto send_reply;
        }
@@ -1411,7 +1421,7 @@ int viewer_get_next_index(struct relay_connection *conn)
         */
        DBG("Sending viewer index for stream %" PRIu64 " offset %" PRIu64,
                rstream->stream_handle,
-               be64toh(packet_index.offset));
+               (uint64_t) be64toh(packet_index.offset));
        viewer_index.offset = packet_index.offset;
        viewer_index.packet_size = packet_index.packet_size;
        viewer_index.content_size = packet_index.content_size;
@@ -1479,13 +1489,15 @@ error_put:
 static
 int viewer_get_packet(struct relay_connection *conn)
 {
-       int ret, send_data = 0;
-       char *data = NULL;
-       uint32_t len = 0;
-       ssize_t read_len;
+       int ret;
+       off_t lseek_ret;
+       char *reply = NULL;
        struct lttng_viewer_get_packet get_packet_info;
-       struct lttng_viewer_trace_packet reply;
+       struct lttng_viewer_trace_packet reply_header;
        struct relay_viewer_stream *vstream = NULL;
+       uint32_t reply_size = sizeof(reply_header);
+       uint32_t packet_data_len = 0;
+       ssize_t read_len;
 
        DBG2("Relay get data packet");
 
@@ -1499,76 +1511,78 @@ int viewer_get_packet(struct relay_connection *conn)
        health_code_update();
 
        /* From this point on, the error label can be reached. */
-       memset(&reply, 0, sizeof(reply));
+       memset(&reply_header, 0, sizeof(reply_header));
 
        vstream = viewer_stream_get_by_id(be64toh(get_packet_info.stream_id));
        if (!vstream) {
                DBG("Client requested packet of unknown stream id %" PRIu64,
-                               be64toh(get_packet_info.stream_id));
-               reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
+                               (uint64_t) be64toh(get_packet_info.stream_id));
+               reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
                goto send_reply_nolock;
+       } else {
+               packet_data_len = be32toh(get_packet_info.len);
+               reply_size += packet_data_len;
        }
 
-       pthread_mutex_lock(&vstream->stream->lock);
-
-       len = be32toh(get_packet_info.len);
-       data = zmalloc(len);
-       if (!data) {
-               PERROR("relay data zmalloc");
+       reply = zmalloc(reply_size);
+       if (!reply) {
+               PERROR("packet reply zmalloc");
+               reply_size = sizeof(reply_header);
                goto error;
        }
 
-       ret = lseek(vstream->stream_fd->fd, be64toh(get_packet_info.offset),
+       pthread_mutex_lock(&vstream->stream->lock);
+       lseek_ret = lseek(vstream->stream_fd->fd, be64toh(get_packet_info.offset),
                        SEEK_SET);
-       if (ret < 0) {
+       if (lseek_ret < 0) {
                PERROR("lseek fd %d to offset %" PRIu64, vstream->stream_fd->fd,
-                       be64toh(get_packet_info.offset));
+                       (uint64_t) be64toh(get_packet_info.offset));
                goto error;
        }
-       read_len = lttng_read(vstream->stream_fd->fd, data, len);
-       if (read_len < len) {
+       read_len = lttng_read(vstream->stream_fd->fd,
+                       reply + sizeof(reply_header),
+                       packet_data_len);
+       if (read_len < packet_data_len) {
                PERROR("Relay reading trace file, fd: %d, offset: %" PRIu64,
                                vstream->stream_fd->fd,
-                               be64toh(get_packet_info.offset));
+                               (uint64_t) be64toh(get_packet_info.offset));
                goto error;
        }
-       reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_OK);
-       reply.len = htobe32(len);
-       send_data = 1;
+       reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_OK);
+       reply_header.len = htobe32(packet_data_len);
        goto send_reply;
 
 error:
-       reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
+       reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
 
 send_reply:
        if (vstream) {
                pthread_mutex_unlock(&vstream->stream->lock);
        }
 send_reply_nolock:
-       reply.flags = htobe32(reply.flags);
 
        health_code_update();
 
-       ret = send_response(conn->sock, &reply, sizeof(reply));
-       if (ret < 0) {
-               goto end_free;
+       if (reply) {
+               memcpy(reply, &reply_header, sizeof(reply_header));
+               ret = send_response(conn->sock, reply, reply_size);
+       } else {
+               /* No reply to send. */
+               ret = send_response(conn->sock, &reply_header,
+                               reply_size);
        }
-       health_code_update();
 
-       if (send_data) {
-               health_code_update();
-               ret = send_response(conn->sock, data, len);
-               if (ret < 0) {
-                       goto end_free;
-               }
-               health_code_update();
+       health_code_update();
+       if (ret < 0) {
+               PERROR("sendmsg of packet data failed");
+               goto end_free;
        }
 
-       DBG("Sent %u bytes for stream %" PRIu64, len,
-                       be64toh(get_packet_info.stream_id));
+       DBG("Sent %u bytes for stream %" PRIu64, reply_size,
+                       (uint64_t) be64toh(get_packet_info.stream_id));
 
 end_free:
-       free(data);
+       free(reply);
 end:
        if (vstream) {
                viewer_stream_put(vstream);
@@ -1617,7 +1631,7 @@ int viewer_get_metadata(struct relay_connection *conn)
                 * find it.
                 */
                DBG("Client requested metadata of unknown stream id %" PRIu64,
-                               be64toh(request.stream_id));
+                               (uint64_t) be64toh(request.stream_id));
                reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR);
                goto send_reply;
        }
@@ -1703,7 +1717,7 @@ send_reply:
        }
 
        DBG("Sent %" PRIu64 " bytes of metadata for stream %" PRIu64, len,
-                       be64toh(request.stream_id));
+                       (uint64_t) be64toh(request.stream_id));
 
        DBG("Metadata sent");
 
@@ -1792,7 +1806,7 @@ int viewer_detach_session(struct relay_connection *conn)
        session = session_get_by_id(be64toh(request.session_id));
        if (!session) {
                DBG("Relay session %" PRIu64 " not found",
-                               be64toh(request.session_id));
+                               (uint64_t) be64toh(request.session_id));
                response.status = htobe32(LTTNG_VIEWER_DETACH_SESSION_UNK);
                goto send_reply;
        }
@@ -2080,7 +2094,7 @@ error_poll_create:
        lttng_ht_destroy(viewer_connections_ht);
 viewer_connections_ht_error:
        /* Close relay conn pipes */
-       utils_close_pipe(live_conn_pipe);
+       (void) fd_tracker_util_pipe_close(the_fd_tracker, live_conn_pipe);
        if (err) {
                DBG("Viewer worker thread exited with error");
        }
@@ -2104,7 +2118,8 @@ error_testpoint:
  */
 static int create_conn_pipe(void)
 {
-       return utils_create_pipe_cloexec(live_conn_pipe);
+       return fd_tracker_util_pipe_open_cloexec(the_fd_tracker,
+                       "Live connection pipe", live_conn_pipe);
 }
 
 int relayd_live_join(void)
This page took 0.029217 seconds and 5 git commands to generate.