Backport: relayd: use the fd-tracker to track stream_fd fds
[deliverable/lttng-tools.git] / src / bin / lttng-relayd / live.c
index 26f6e100149656325b1905d36960ec354d91a891..bf2895542c5671890d2e2717248ee44fb43f569c 100644 (file)
@@ -457,6 +457,44 @@ int close_sock(void *data, int *in_fd)
        return sock->ops->close(sock);
 }
 
+static int accept_sock(void *data, int *out_fd)
+{
+       int ret = 0;
+       /* Socks is an array of in_sock, out_sock. */
+       struct lttcomm_sock **socks = data;
+       struct lttcomm_sock *in_sock = socks[0];
+
+        socks[1] = in_sock->ops->accept(in_sock);
+       if (!socks[1]) {
+               ret = -1;
+               goto end;
+       }
+       *out_fd = socks[1]->fd;
+end:
+       return ret;
+}
+
+static
+struct lttcomm_sock *accept_live_sock(struct lttcomm_sock *listening_sock,
+               const char *name)
+{
+       int out_fd, ret;
+       struct lttcomm_sock *socks[2] = { listening_sock, NULL };
+       struct lttcomm_sock *new_sock = NULL;
+
+        ret = fd_tracker_open_unsuspendable_fd(
+                       the_fd_tracker, &out_fd,
+                       (const char **) &name,
+                       1, accept_sock, &socks);
+       if (ret) {
+               goto end;
+       }
+       new_sock = socks[1];
+       DBG("%s accepted, socket %d", name, new_sock->fd);
+end:
+       return new_sock;
+}
+
 /*
  * Create and init socket from uri.
  */
@@ -606,7 +644,8 @@ restart:
                                struct relay_connection *new_conn;
                                struct lttcomm_sock *newsock;
 
-                               newsock = live_control_sock->ops->accept(live_control_sock);
+                               newsock = accept_live_sock(live_control_sock,
+                                                       "Live socket to client");
                                if (!newsock) {
                                        PERROR("accepting control sock");
                                        goto error;
@@ -1416,12 +1455,8 @@ int viewer_get_next_index(struct relay_connection *conn)
                if (ret < 0) {
                        goto error_put;
                }
-               ret = open(fullpath, O_RDONLY);
-               if (ret < 0) {
-                       PERROR("Relay opening trace file");
-                       goto error_put;
-               }
-               vstream->stream_fd = stream_fd_create(ret);
+
+               vstream->stream_fd = stream_fd_open(fullpath);
                if (!vstream->stream_fd) {
                        if (close(ret)) {
                                PERROR("close");
@@ -1522,7 +1557,7 @@ error_put:
 static
 int viewer_get_packet(struct relay_connection *conn)
 {
-       int ret;
+       int ret, stream_fd;
        off_t lseek_ret;
        char *reply = NULL;
        struct lttng_viewer_get_packet get_packet_info;
@@ -1565,26 +1600,32 @@ int viewer_get_packet(struct relay_connection *conn)
        }
 
        pthread_mutex_lock(&vstream->stream->lock);
-       lseek_ret = lseek(vstream->stream_fd->fd, be64toh(get_packet_info.offset),
-                       SEEK_SET);
+       stream_fd = stream_fd_get_fd(vstream->stream_fd);
+       if (stream_fd < 0) {
+               ERR("Failed to get viewer stream file descriptor");
+               goto error_put_fd;
+       }
+       lseek_ret = lseek(stream_fd, be64toh(get_packet_info.offset), SEEK_SET);
        if (lseek_ret < 0) {
-               PERROR("lseek fd %d to offset %" PRIu64, vstream->stream_fd->fd,
-                       be64toh(get_packet_info.offset));
-               goto error;
+               PERROR("lseek fd %d to offset %" PRIu64, stream_fd,
+                       (uint64_t) be64toh(get_packet_info.offset));
+               goto error_put_fd;
        }
-       read_len = lttng_read(vstream->stream_fd->fd,
+       read_len = lttng_read(stream_fd,
                        reply + sizeof(reply_header),
                        packet_data_len);
        if (read_len < packet_data_len) {
                PERROR("Relay reading trace file, fd: %d, offset: %" PRIu64,
-                               vstream->stream_fd->fd,
-                               be64toh(get_packet_info.offset));
-               goto error;
+                               stream_fd,
+                               (uint64_t) be64toh(get_packet_info.offset));
+               goto error_put_fd;
        }
        reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_OK);
        reply_header.len = htobe32(packet_data_len);
        goto send_reply;
 
+error_put_fd:
+       stream_fd_put_fd(vstream->stream_fd);
 error:
        reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
 
@@ -1631,7 +1672,7 @@ end:
 static
 int viewer_get_metadata(struct relay_connection *conn)
 {
-       int ret = 0;
+       int ret = 0, stream_fd = -1;
        ssize_t read_len;
        uint64_t len = 0;
        char *data = NULL;
@@ -1691,16 +1732,9 @@ int viewer_get_metadata(struct relay_connection *conn)
                if (ret < 0) {
                        goto error;
                }
-               ret = open(fullpath, O_RDONLY);
-               if (ret < 0) {
-                       PERROR("Relay opening metadata file");
-                       goto error;
-               }
-               vstream->stream_fd = stream_fd_create(ret);
+               vstream->stream_fd = stream_fd_open(fullpath);
                if (!vstream->stream_fd) {
-                       if (close(ret)) {
-                               PERROR("close");
-                       }
+                       PERROR("Failed to open viewer stream file at %s", fullpath);
                        goto error;
                }
        }
@@ -1712,10 +1746,14 @@ int viewer_get_metadata(struct relay_connection *conn)
                goto error;
        }
 
-       read_len = lttng_read(vstream->stream_fd->fd, data, len);
+       stream_fd = stream_fd_get_fd(vstream->stream_fd);
+       if (stream_fd < 0) {
+               goto error_put_fd;
+       }
+       read_len = lttng_read(stream_fd, data, len);
        if (read_len < len) {
                PERROR("Relay reading metadata file");
-               goto error;
+               goto error_put_fd;
        }
        vstream->metadata_sent += read_len;
        if (vstream->metadata_sent == vstream->stream->metadata_received
@@ -1727,7 +1765,8 @@ int viewer_get_metadata(struct relay_connection *conn)
        reply.status = htobe32(LTTNG_VIEWER_METADATA_OK);
 
        goto send_reply;
-
+error_put_fd:
+       (void) stream_fd_put_fd(vstream->stream_fd);
 error:
        reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR);
 
@@ -1953,7 +1992,8 @@ void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd)
 
        (void) lttng_poll_del(events, pollfd);
 
-       ret = close(pollfd);
+       ret = fd_tracker_close_unsuspendable_fd(the_fd_tracker, &pollfd, 1,
+                       fd_tracker_util_close_fd, NULL);
        if (ret < 0) {
                ERR("Closing pollfd %d", pollfd);
        }
This page took 0.028336 seconds and 5 git commands to generate.