relayd: use same pipe for live and main
[lttng-tools.git] / src / bin / lttng-relayd / live.c
index c312e78e976e89bb6949e0aee54cf48e2bdca99c..bfa7b85259ad7d3d2a75fdba5dfb416feccfe13c 100644 (file)
 
 static struct lttng_uri *live_uri;
 
-/*
- * Quit pipe for all threads. This permits a single cancellation point
- * for all threads when receiving an event on the pipe.
- */
-static int live_thread_quit_pipe[2] = { -1, -1 };
-
 /*
  * This pipe is used to inform the worker thread that a command is queued and
  * ready to be processed.
@@ -126,7 +120,7 @@ void stop_threads(void)
 
        /* Stopping all threads */
        DBG("Terminating all live threads");
-       ret = notify_thread_pipe(live_thread_quit_pipe[1]);
+       ret = notify_thread_pipe(thread_quit_pipe[1]);
        if (ret < 0) {
                ERR("write error on thread quit pipe");
        }
@@ -155,7 +149,7 @@ int create_thread_poll_set(struct lttng_poll_event *events, int size)
        }
 
        /* Add quit pipe */
-       ret = lttng_poll_add(events, live_thread_quit_pipe[0], LPOLLIN);
+       ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN | LPOLLERR);
        if (ret < 0) {
                goto error;
        }
@@ -174,7 +168,7 @@ error:
 static
 int check_thread_quit_pipe(int fd, uint32_t events)
 {
-       if (fd == live_thread_quit_pipe[0] && (events & LPOLLIN)) {
+       if (fd == thread_quit_pipe[0] && (events & LPOLLIN)) {
                return 1;
        }
 
@@ -245,9 +239,7 @@ void *thread_listener(void *data)
                goto error_sock_control;
        }
 
-       /*
-        * Pass 3 as size here for the thread quit pipe, control and data socket.
-        */
+       /* Pass 2 as size here for the thread quit pipe and control sockets. */
        ret = create_thread_poll_set(&events, 2);
        if (ret < 0) {
                goto error_create_poll;
@@ -259,6 +251,8 @@ void *thread_listener(void *data)
                goto error_poll_add;
        }
 
+       lttng_relay_notify_ready();
+
        while (1) {
                health_code_update();
 
@@ -854,7 +848,8 @@ static
 int viewer_attach_session(struct relay_command *cmd,
                struct lttng_ht *sessions_ht)
 {
-       int ret, send_streams = 0, nb_streams = 0;
+       int ret, send_streams = 0;
+       uint32_t nb_streams = 0, nb_streams_ready = 0;
        struct lttng_viewer_attach_session_request request;
        struct lttng_viewer_attach_session_response response;
        struct lttng_viewer_stream send_stream;
@@ -961,14 +956,16 @@ int viewer_attach_session(struct relay_command *cmd,
                        if (stream->session != cmd->session) {
                                continue;
                        }
+                       nb_streams++;
 
                        /*
-                        * Don't send streams with no ctf_trace, they are not ready to be
-                        * read.
+                        * Don't send streams with no ctf_trace, they are not
+                        * ready to be read.
                         */
-                       if (!stream->ctf_trace) {
+                       if (!stream->ctf_trace || !stream->viewer_ready) {
                                continue;
                        }
+                       nb_streams_ready++;
 
                        vstream = live_find_viewer_stream_by_id(stream->stream_handle);
                        if (!vstream) {
@@ -977,7 +974,11 @@ int viewer_attach_session(struct relay_command *cmd,
                                        goto end_unlock;
                                }
                        }
-                       nb_streams++;
+               }
+
+               /* We must have the same amount of existing stream and ready stream. */
+               if (nb_streams != nb_streams_ready) {
+                       nb_streams = 0;
                }
                response.streams_count = htobe32(nb_streams);
        }
@@ -1113,7 +1114,8 @@ void destroy_viewer_stream(struct relay_viewer_stream *vstream)
         * we need to remove it because we won't detect a EOF for this
         * stream.
         */
-       if (ret_ref == 1 && vstream->ctf_trace->metadata_stream) {
+       if (ret_ref == 1 && vstream->ctf_trace->viewer_metadata_stream) {
+               delete_viewer_stream(vstream->ctf_trace->viewer_metadata_stream);
                destroy_viewer_stream(vstream->ctf_trace->viewer_metadata_stream);
                vstream->ctf_trace->metadata_stream = NULL;
                DBG("Freeing ctf_trace %" PRIu64, vstream->ctf_trace->id);
@@ -2036,7 +2038,7 @@ error:
  * main
  */
 int live_start_threads(struct lttng_uri *uri,
-               struct relay_local_data *relay_ctx, int quit_pipe[2])
+               struct relay_local_data *relay_ctx)
 {
        int ret = 0;
        void *status;
@@ -2045,9 +2047,6 @@ int live_start_threads(struct lttng_uri *uri,
        assert(uri);
        live_uri = uri;
 
-       live_thread_quit_pipe[0] = quit_pipe[0];
-       live_thread_quit_pipe[1] = quit_pipe[1];
-
        /* Check if daemon is UID = 0 */
        is_root = !getuid();
 
This page took 0.029818 seconds and 5 git commands to generate.