Backport: relayd: replace lttng_index_file with relay_index_file
[lttng-tools.git] / src / bin / lttng-relayd / live.c
index 4f14459d46a61545cd5c05ab75fd012a3de4cca2..9bb719ef607bc1d2c0777dda1a8f557bddd0afb9 100644 (file)
@@ -47,7 +47,6 @@
 #include <common/compat/endian.h>
 #include <common/defaults.h>
 #include <common/futex.h>
-#include <common/index/index.h>
 #include <common/sessiond-comm/sessiond-comm.h>
 #include <common/sessiond-comm/inet.h>
 #include <common/sessiond-comm/relayd.h>
@@ -67,6 +66,7 @@
 #include "ctf-trace.h"
 #include "connection.h"
 #include "viewer-session.h"
+#include "index-file.h"
 
 #define SESSION_BUF_DEFAULT_COUNT      16
 
@@ -433,14 +433,78 @@ int check_thread_quit_pipe(int fd, uint32_t events)
        return 0;
 }
 
+static
+int create_sock(void *data, int *out_fd)
+{
+       int ret;
+       struct lttcomm_sock *sock = data;
+
+       ret = lttcomm_create_sock(sock);
+       if (ret < 0) {
+               goto end;
+       }
+
+       *out_fd = sock->fd;
+end:
+       return ret;
+}
+
+static
+int close_sock(void *data, int *in_fd)
+{
+       struct lttcomm_sock *sock = data;
+
+       return sock->ops->close(sock);
+}
+
+static int accept_sock(void *data, int *out_fd)
+{
+       int ret = 0;
+       /* Socks is an array of in_sock, out_sock. */
+       struct lttcomm_sock **socks = data;
+       struct lttcomm_sock *in_sock = socks[0];
+
+        socks[1] = in_sock->ops->accept(in_sock);
+       if (!socks[1]) {
+               ret = -1;
+               goto end;
+       }
+       *out_fd = socks[1]->fd;
+end:
+       return ret;
+}
+
+static
+struct lttcomm_sock *accept_live_sock(struct lttcomm_sock *listening_sock,
+               const char *name)
+{
+       int out_fd, ret;
+       struct lttcomm_sock *socks[2] = { listening_sock, NULL };
+       struct lttcomm_sock *new_sock = NULL;
+
+        ret = fd_tracker_open_unsuspendable_fd(
+                       the_fd_tracker, &out_fd,
+                       (const char **) &name,
+                       1, accept_sock, &socks);
+       if (ret) {
+               goto end;
+       }
+       new_sock = socks[1];
+       DBG("%s accepted, socket %d", name, new_sock->fd);
+end:
+       return new_sock;
+}
+
 /*
  * Create and init socket from uri.
  */
 static
-struct lttcomm_sock *init_socket(struct lttng_uri *uri)
+struct lttcomm_sock *init_socket(struct lttng_uri *uri, const char *name)
 {
-       int ret;
+       int ret, sock_fd;
        struct lttcomm_sock *sock = NULL;
+       char uri_str[PATH_MAX];
+       char *formated_name = NULL;
 
        sock = lttcomm_alloc_sock_from_uri(uri);
        if (sock == NULL) {
@@ -448,11 +512,25 @@ struct lttcomm_sock *init_socket(struct lttng_uri *uri)
                goto error;
        }
 
-       ret = lttcomm_create_sock(sock);
-       if (ret < 0) {
-               goto error;
+       /*
+        * Don't fail to create the socket if the name can't be built as it is
+        * only used for debugging purposes.
+        */
+       ret = uri_to_str_url(uri, uri_str, sizeof(uri_str));
+       uri_str[sizeof(uri_str) - 1] = '\0';
+       if (ret >= 0) {
+               ret = asprintf(&formated_name, "%s socket @ %s", name,
+                               uri_str);
+               if (ret < 0) {
+                       formated_name = NULL;
+               }
        }
-       DBG("Listening on sock %d for live", sock->fd);
+
+       ret = fd_tracker_open_unsuspendable_fd(the_fd_tracker, &sock_fd,
+                       (const char **) (formated_name ? &formated_name : NULL),
+                       1, create_sock, sock);
+       free(formated_name);
+       DBG("Listening on %s socket %d", name, sock->fd);
 
        ret = sock->ops->bind(sock);
        if (ret < 0) {
@@ -491,7 +569,7 @@ void *thread_listener(void *data)
 
        health_code_update();
 
-       live_control_sock = init_socket(live_uri);
+       live_control_sock = init_socket(live_uri, "Live listener");
        if (!live_control_sock) {
                goto error_sock_control;
        }
@@ -566,7 +644,8 @@ restart:
                                struct relay_connection *new_conn;
                                struct lttcomm_sock *newsock;
 
-                               newsock = live_control_sock->ops->accept(live_control_sock);
+                               newsock = accept_live_sock(live_control_sock,
+                                                       "Live socket to client");
                                if (!newsock) {
                                        PERROR("accepting control sock");
                                        goto error;
@@ -615,7 +694,9 @@ error_testpoint:
        (void) fd_tracker_util_poll_clean(the_fd_tracker, &events);
 error_create_poll:
        if (live_control_sock->fd >= 0) {
-               ret = live_control_sock->ops->close(live_control_sock);
+               ret = fd_tracker_close_unsuspendable_fd(the_fd_tracker,
+                               &live_control_sock->fd, 1, close_sock,
+                               live_control_sock);
                if (ret) {
                        PERROR("close");
                }
@@ -1150,7 +1231,7 @@ static int try_open_index(struct relay_viewer_stream *vstream,
                ret = -ENOENT;
                goto end;
        }
-       vstream->index_file = lttng_index_file_open(vstream->path_name,
+       vstream->index_file = relay_index_file_open(vstream->path_name,
                        vstream->channel_name,
                        vstream->stream->tracefile_count,
                        vstream->current_tracefile_id);
@@ -1374,12 +1455,8 @@ int viewer_get_next_index(struct relay_connection *conn)
                if (ret < 0) {
                        goto error_put;
                }
-               ret = open(fullpath, O_RDONLY);
-               if (ret < 0) {
-                       PERROR("Relay opening trace file");
-                       goto error_put;
-               }
-               vstream->stream_fd = stream_fd_create(ret);
+
+               vstream->stream_fd = stream_fd_open(fullpath);
                if (!vstream->stream_fd) {
                        if (close(ret)) {
                                PERROR("close");
@@ -1396,10 +1473,9 @@ int viewer_get_next_index(struct relay_connection *conn)
                viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
        }
 
-       ret = lttng_index_file_read(vstream->index_file, &packet_index);
+       ret = relay_index_file_read(vstream->index_file, &packet_index);
        if (ret) {
-               ERR("Relay error reading index file %d",
-                               vstream->index_file->fd);
+               ERR("Relay error reading index file");
                viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
                goto send_reply;
        } else {
@@ -1480,7 +1556,7 @@ error_put:
 static
 int viewer_get_packet(struct relay_connection *conn)
 {
-       int ret;
+       int ret, stream_fd;
        off_t lseek_ret;
        char *reply = NULL;
        struct lttng_viewer_get_packet get_packet_info;
@@ -1523,26 +1599,32 @@ int viewer_get_packet(struct relay_connection *conn)
        }
 
        pthread_mutex_lock(&vstream->stream->lock);
-       lseek_ret = lseek(vstream->stream_fd->fd, be64toh(get_packet_info.offset),
-                       SEEK_SET);
+       stream_fd = stream_fd_get_fd(vstream->stream_fd);
+       if (stream_fd < 0) {
+               ERR("Failed to get viewer stream file descriptor");
+               goto error_put_fd;
+       }
+       lseek_ret = lseek(stream_fd, be64toh(get_packet_info.offset), SEEK_SET);
        if (lseek_ret < 0) {
-               PERROR("lseek fd %d to offset %" PRIu64, vstream->stream_fd->fd,
+               PERROR("lseek fd %d to offset %" PRIu64, stream_fd,
                        (uint64_t) be64toh(get_packet_info.offset));
-               goto error;
+               goto error_put_fd;
        }
-       read_len = lttng_read(vstream->stream_fd->fd,
+       read_len = lttng_read(stream_fd,
                        reply + sizeof(reply_header),
                        packet_data_len);
        if (read_len < packet_data_len) {
                PERROR("Relay reading trace file, fd: %d, offset: %" PRIu64,
-                               vstream->stream_fd->fd,
+                               stream_fd,
                                (uint64_t) be64toh(get_packet_info.offset));
-               goto error;
+               goto error_put_fd;
        }
        reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_OK);
        reply_header.len = htobe32(packet_data_len);
        goto send_reply;
 
+error_put_fd:
+       stream_fd_put_fd(vstream->stream_fd);
 error:
        reply_header.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR);
 
@@ -1589,7 +1671,7 @@ end:
 static
 int viewer_get_metadata(struct relay_connection *conn)
 {
-       int ret = 0;
+       int ret = 0, stream_fd = -1;
        ssize_t read_len;
        uint64_t len = 0;
        char *data = NULL;
@@ -1649,16 +1731,9 @@ int viewer_get_metadata(struct relay_connection *conn)
                if (ret < 0) {
                        goto error;
                }
-               ret = open(fullpath, O_RDONLY);
-               if (ret < 0) {
-                       PERROR("Relay opening metadata file");
-                       goto error;
-               }
-               vstream->stream_fd = stream_fd_create(ret);
+               vstream->stream_fd = stream_fd_open(fullpath);
                if (!vstream->stream_fd) {
-                       if (close(ret)) {
-                               PERROR("close");
-                       }
+                       PERROR("Failed to open viewer stream file at %s", fullpath);
                        goto error;
                }
        }
@@ -1670,10 +1745,14 @@ int viewer_get_metadata(struct relay_connection *conn)
                goto error;
        }
 
-       read_len = lttng_read(vstream->stream_fd->fd, data, len);
+       stream_fd = stream_fd_get_fd(vstream->stream_fd);
+       if (stream_fd < 0) {
+               goto error_put_fd;
+       }
+       read_len = lttng_read(stream_fd, data, len);
        if (read_len < len) {
                PERROR("Relay reading metadata file");
-               goto error;
+               goto error_put_fd;
        }
        vstream->metadata_sent += read_len;
        if (vstream->metadata_sent == vstream->stream->metadata_received
@@ -1685,7 +1764,8 @@ int viewer_get_metadata(struct relay_connection *conn)
        reply.status = htobe32(LTTNG_VIEWER_METADATA_OK);
 
        goto send_reply;
-
+error_put_fd:
+       (void) stream_fd_put_fd(vstream->stream_fd);
 error:
        reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR);
 
@@ -1911,7 +1991,8 @@ void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd)
 
        (void) lttng_poll_del(events, pollfd);
 
-       ret = close(pollfd);
+       ret = fd_tracker_close_unsuspendable_fd(the_fd_tracker, &pollfd, 1,
+                       fd_tracker_util_close_fd, NULL);
        if (ret < 0) {
                ERR("Closing pollfd %d", pollfd);
        }
This page took 0.030619 seconds and 5 git commands to generate.