Backport: relayd: replace lttng_index_file with relay_index_file
[lttng-tools.git] / src / bin / lttng-relayd / live.c
index f14b907cea778fdb546fefa7c23ab9a52a002e63..9bb719ef607bc1d2c0777dda1a8f557bddd0afb9 100644 (file)
@@ -47,7 +47,6 @@
 #include <common/compat/endian.h>
 #include <common/defaults.h>
 #include <common/futex.h>
-#include <common/index/index.h>
 #include <common/sessiond-comm/sessiond-comm.h>
 #include <common/sessiond-comm/inet.h>
 #include <common/sessiond-comm/relayd.h>
@@ -67,6 +66,7 @@
 #include "ctf-trace.h"
 #include "connection.h"
 #include "viewer-session.h"
+#include "index-file.h"
 
 #define SESSION_BUF_DEFAULT_COUNT      16
 
@@ -457,6 +457,44 @@ int close_sock(void *data, int *in_fd)
        return sock->ops->close(sock);
 }
 
+static int accept_sock(void *data, int *out_fd)
+{
+       int ret = 0;
+       /* Socks is an array of in_sock, out_sock. */
+       struct lttcomm_sock **socks = data;
+       struct lttcomm_sock *in_sock = socks[0];
+
+        socks[1] = in_sock->ops->accept(in_sock);
+       if (!socks[1]) {
+               ret = -1;
+               goto end;
+       }
+       *out_fd = socks[1]->fd;
+end:
+       return ret;
+}
+
+static
+struct lttcomm_sock *accept_live_sock(struct lttcomm_sock *listening_sock,
+               const char *name)
+{
+       int out_fd, ret;
+       struct lttcomm_sock *socks[2] = { listening_sock, NULL };
+       struct lttcomm_sock *new_sock = NULL;
+
+        ret = fd_tracker_open_unsuspendable_fd(
+                       the_fd_tracker, &out_fd,
+                       (const char **) &name,
+                       1, accept_sock, &socks);
+       if (ret) {
+               goto end;
+       }
+       new_sock = socks[1];
+       DBG("%s accepted, socket %d", name, new_sock->fd);
+end:
+       return new_sock;
+}
+
 /*
  * Create and init socket from uri.
  */
@@ -606,7 +644,8 @@ restart:
                                struct relay_connection *new_conn;
                                struct lttcomm_sock *newsock;
 
-                               newsock = live_control_sock->ops->accept(live_control_sock);
+                               newsock = accept_live_sock(live_control_sock,
+                                                       "Live socket to client");
                                if (!newsock) {
                                        PERROR("accepting control sock");
                                        goto error;
@@ -1192,7 +1231,7 @@ static int try_open_index(struct relay_viewer_stream *vstream,
                ret = -ENOENT;
                goto end;
        }
-       vstream->index_file = lttng_index_file_open(vstream->path_name,
+       vstream->index_file = relay_index_file_open(vstream->path_name,
                        vstream->channel_name,
                        vstream->stream->tracefile_count,
                        vstream->current_tracefile_id);
@@ -1416,12 +1455,8 @@ int viewer_get_next_index(struct relay_connection *conn)
                if (ret < 0) {
                        goto error_put;
                }
-               ret = open(fullpath, O_RDONLY);
-               if (ret < 0) {
-                       PERROR("Relay opening trace file");
-                       goto error_put;
-               }
-               vstream->stream_fd = stream_fd_create(ret);
+
+               vstream->stream_fd = stream_fd_open(fullpath);
                if (!vstream->stream_fd) {
                        if (close(ret)) {
                                PERROR("close");
@@ -1438,10 +1473,9 @@ int viewer_get_next_index(struct relay_connection *conn)
                viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
        }
 
-       ret = lttng_index_file_read(vstream->index_file, &packet_index);
+       ret = relay_index_file_read(vstream->index_file, &packet_index);
        if (ret) {
-               ERR("Relay error reading index file %d",
-                               vstream->index_file->fd);
+               ERR("Relay error reading index file");
                viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
                goto send_reply;
        } else {
@@ -1522,7 +1556,7 @@ error_put:
 static
 int viewer_get_packet(struct relay_connection *conn)
 {
-       int ret;
+       int ret, stream_fd;
        off_t lseek_ret;
        char *reply = NULL;
        struct lttng_viewer_get_packet get_packet_info;
@@ -1565,26 +1599,32 @@ int viewer_get_packet(struct relay_connection *conn)
        }
 
        pthread_mutex_lock(&vstream->stream->lock);
-       lseek_ret = lseek(vstream->stream_fd->fd, be64toh(get_packet_info.offset),
-                       SEEK_SET);
+       stream_fd = stream_fd_get_fd(vstream->stream_fd);
+       if (stream_fd < 0) {
+               ERR("Failed to get viewer stream file descriptor");
+               goto error_put_fd;
+       }
+       lseek_ret = lseek(stream_fd, be64toh(get_packet_info.offset), SEEK_SET);
        if (lseek_ret < 0) {
-               PERROR("lseek fd %d to offset %" PRIu64, vstream->stream_fd->fd,
+               PERROR("lseek fd %d to offset %" PRIu64, stream_fd,
                        (uint64_t) be64toh(get_packet_info.offset));
-               goto error;
+               goto error_put_fd;
        }
-       read_len = lttng_read(vstream->stream_fd->fd,
+       read_len = lttng_read(stream_fd,
                        reply + sizeof(reply_header),
                        packet_data_len);
        if (read_len < packet_data_len) {
                PERROR("Relay reading trace file, fd: %d, offset: %" PRIu64,
-                               vstream->stream_fd->fd,
+                               stream_fd,
                                (uint64_t) be64toh(get_packet_info.offset));
-               goto error;
+               goto error_put_fd;
        }
        reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_OK);
        reply_header.len = htobe32(packet_data_len);
        goto send_reply;
 
+error_put_fd:
+       stream_fd_put_fd(vstream->stream_fd);
 error:
        reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
 
@@ -1631,7 +1671,7 @@ end:
 static
 int viewer_get_metadata(struct relay_connection *conn)
 {
-       int ret = 0;
+       int ret = 0, stream_fd = -1;
        ssize_t read_len;
        uint64_t len = 0;
        char *data = NULL;
@@ -1691,16 +1731,9 @@ int viewer_get_metadata(struct relay_connection *conn)
                if (ret < 0) {
                        goto error;
                }
-               ret = open(fullpath, O_RDONLY);
-               if (ret < 0) {
-                       PERROR("Relay opening metadata file");
-                       goto error;
-               }
-               vstream->stream_fd = stream_fd_create(ret);
+               vstream->stream_fd = stream_fd_open(fullpath);
                if (!vstream->stream_fd) {
-                       if (close(ret)) {
-                               PERROR("close");
-                       }
+                       PERROR("Failed to open viewer stream file at %s", fullpath);
                        goto error;
                }
        }
@@ -1712,10 +1745,14 @@ int viewer_get_metadata(struct relay_connection *conn)
                goto error;
        }
 
-       read_len = lttng_read(vstream->stream_fd->fd, data, len);
+       stream_fd = stream_fd_get_fd(vstream->stream_fd);
+       if (stream_fd < 0) {
+               goto error_put_fd;
+       }
+       read_len = lttng_read(stream_fd, data, len);
        if (read_len < len) {
                PERROR("Relay reading metadata file");
-               goto error;
+               goto error_put_fd;
        }
        vstream->metadata_sent += read_len;
        if (vstream->metadata_sent == vstream->stream->metadata_received
@@ -1727,7 +1764,8 @@ int viewer_get_metadata(struct relay_connection *conn)
        reply.status = htobe32(LTTNG_VIEWER_METADATA_OK);
 
        goto send_reply;
-
+error_put_fd:
+       (void) stream_fd_put_fd(vstream->stream_fd);
 error:
        reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR);
 
@@ -1953,7 +1991,8 @@ void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd)
 
        (void) lttng_poll_del(events, pollfd);
 
-       ret = close(pollfd);
+       ret = fd_tracker_close_unsuspendable_fd(the_fd_tracker, &pollfd, 1,
+                       fd_tracker_util_close_fd, NULL);
        if (ret < 0) {
                ERR("Closing pollfd %d", pollfd);
        }
This page took 0.026669 seconds and 5 git commands to generate.