UST periodical metadata flush
[lttng-tools.git] / src / bin / lttng-sessiond / main.c
index d88bafeb6e9255febe166687c8962c42ff6e69ab..15bb7255a882dab71abe546e9d8e95bc631f92ea 100644 (file)
@@ -25,6 +25,7 @@
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
+#include <inttypes.h>
 #include <sys/mman.h>
 #include <sys/mount.h>
 #include <sys/resource.h>
@@ -89,6 +90,7 @@ static struct consumer_data kconsumer_data = {
        .cmd_unix_sock_path = DEFAULT_KCONSUMERD_CMD_SOCK_PATH,
        .err_sock = -1,
        .cmd_sock = -1,
+       .metadata_sock.fd = -1,
        .pid_mutex = PTHREAD_MUTEX_INITIALIZER,
        .lock = PTHREAD_MUTEX_INITIALIZER,
        .cond = PTHREAD_COND_INITIALIZER,
@@ -100,6 +102,7 @@ static struct consumer_data ustconsumer64_data = {
        .cmd_unix_sock_path = DEFAULT_USTCONSUMERD64_CMD_SOCK_PATH,
        .err_sock = -1,
        .cmd_sock = -1,
+       .metadata_sock.fd = -1,
        .pid_mutex = PTHREAD_MUTEX_INITIALIZER,
        .lock = PTHREAD_MUTEX_INITIALIZER,
        .cond = PTHREAD_COND_INITIALIZER,
@@ -111,6 +114,7 @@ static struct consumer_data ustconsumer32_data = {
        .cmd_unix_sock_path = DEFAULT_USTCONSUMERD32_CMD_SOCK_PATH,
        .err_sock = -1,
        .cmd_sock = -1,
+       .metadata_sock.fd = -1,
        .pid_mutex = PTHREAD_MUTEX_INITIALIZER,
        .lock = PTHREAD_MUTEX_INITIALIZER,
        .cond = PTHREAD_COND_INITIALIZER,
@@ -865,10 +869,10 @@ static void *thread_manage_consumer(void *data)
        health_code_update();
 
        /*
-        * Pass 2 as size here for the thread quit pipe and kconsumerd_err_sock.
-        * Nothing more will be added to this poll set.
+        * Pass 3 as size here for the thread quit pipe, consumerd_err_sock and the
+        * metadata_sock. Nothing more will be added to this poll set.
         */
-       ret = sessiond_set_thread_pollset(&events, 2);
+       ret = sessiond_set_thread_pollset(&events, 3);
        if (ret < 0) {
                goto error_poll;
        }
@@ -885,7 +889,7 @@ static void *thread_manage_consumer(void *data)
 
        health_code_update();
 
-       /* Inifinite blocking call, waiting for transmission */
+       /* Infinite blocking call, waiting for transmission */
 restart:
        health_poll_entry();
 
@@ -955,87 +959,126 @@ restart:
        health_code_update();
 
        if (code == LTTCOMM_CONSUMERD_COMMAND_SOCK_READY) {
+               /* Connect both socket, command and metadata. */
                consumer_data->cmd_sock =
                        lttcomm_connect_unix_sock(consumer_data->cmd_unix_sock_path);
-               if (consumer_data->cmd_sock < 0) {
+               consumer_data->metadata_sock.fd =
+                       lttcomm_connect_unix_sock(consumer_data->cmd_unix_sock_path);
+               if (consumer_data->cmd_sock < 0 ||
+                               consumer_data->metadata_sock.fd < 0) {
+                       PERROR("consumer connect cmd socket");
                        /* On error, signal condition and quit. */
                        signal_consumer_condition(consumer_data, -1);
-                       PERROR("consumer connect");
                        goto error;
                }
+               /* Create metadata socket lock. */
+               consumer_data->metadata_sock.lock = zmalloc(sizeof(pthread_mutex_t));
+               if (consumer_data->metadata_sock.lock == NULL) {
+                       PERROR("zmalloc pthread mutex");
+                       ret = -1;
+                       goto error;
+               }
+               pthread_mutex_init(consumer_data->metadata_sock.lock, NULL);
+
                signal_consumer_condition(consumer_data, 1);
-               DBG("Consumer command socket ready");
+               DBG("Consumer command socket ready (fd: %d", consumer_data->cmd_sock);
+               DBG("Consumer metadata socket ready (fd: %d)",
+                               consumer_data->metadata_sock.fd);
        } else {
                ERR("consumer error when waiting for SOCK_READY : %s",
                                lttcomm_get_readable_code(-code));
                goto error;
        }
 
-       /* Remove the kconsumerd error sock since we've established a connexion */
+       /* Remove the consumerd error sock since we've established a connexion */
        ret = lttng_poll_del(&events, consumer_data->err_sock);
        if (ret < 0) {
                goto error;
        }
 
+       /* Add new accepted error socket. */
        ret = lttng_poll_add(&events, sock, LPOLLIN | LPOLLRDHUP);
        if (ret < 0) {
                goto error;
        }
 
+       /* Add metadata socket that is successfully connected. */
+       ret = lttng_poll_add(&events, consumer_data->metadata_sock.fd,
+                       LPOLLIN | LPOLLRDHUP);
+       if (ret < 0) {
+               goto error;
+       }
+
        health_code_update();
 
-       /* Inifinite blocking call, waiting for transmission */
+       /* Infinite blocking call, waiting for transmission */
 restart_poll:
-       health_poll_entry();
-       ret = lttng_poll_wait(&events, -1);
-       health_poll_exit();
-       if (ret < 0) {
-               /*
-                * Restart interrupted system call.
-                */
-               if (errno == EINTR) {
-                       goto restart_poll;
+       while (1) {
+               health_poll_entry();
+               ret = lttng_poll_wait(&events, -1);
+               health_poll_exit();
+               if (ret < 0) {
+                       /*
+                        * Restart interrupted system call.
+                        */
+                       if (errno == EINTR) {
+                               goto restart_poll;
+                       }
+                       goto error;
                }
-               goto error;
-       }
 
-       nb_fd = ret;
+               nb_fd = ret;
 
-       for (i = 0; i < nb_fd; i++) {
-               /* Fetch once the poll data */
-               revents = LTTNG_POLL_GETEV(&events, i);
-               pollfd = LTTNG_POLL_GETFD(&events, i);
+               for (i = 0; i < nb_fd; i++) {
+                       /* Fetch once the poll data */
+                       revents = LTTNG_POLL_GETEV(&events, i);
+                       pollfd = LTTNG_POLL_GETFD(&events, i);
 
-               health_code_update();
+                       health_code_update();
 
-               /* Thread quit pipe has been closed. Killing thread. */
-               ret = sessiond_check_thread_quit_pipe(pollfd, revents);
-               if (ret) {
-                       err = 0;
-                       goto exit;
-               }
+                       /* Thread quit pipe has been closed. Killing thread. */
+                       ret = sessiond_check_thread_quit_pipe(pollfd, revents);
+                       if (ret) {
+                               err = 0;
+                               goto exit;
+                       }
 
-               /* Event on the kconsumerd socket */
-               if (pollfd == sock) {
-                       if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
-                               ERR("consumer err socket second poll error");
+                       if (pollfd == sock) {
+                               /* Event on the consumerd socket */
+                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                                       ERR("consumer err socket second poll error");
+                                       goto error;
+                               }
+                               health_code_update();
+                               /* Wait for any kconsumerd error */
+                               ret = lttcomm_recv_unix_sock(sock, &code,
+                                               sizeof(enum lttcomm_return_code));
+                               if (ret <= 0) {
+                                       ERR("consumer closed the command socket");
+                                       goto error;
+                               }
+
+                               ERR("consumer return code : %s",
+                                               lttcomm_get_readable_code(-code));
+
+                               goto exit;
+                       } else if (pollfd == consumer_data->metadata_sock.fd) {
+                               /* UST metadata requests */
+                               ret = ust_consumer_metadata_request(
+                                               &consumer_data->metadata_sock);
+                               if (ret < 0) {
+                                       ERR("Handling metadata request");
+                                       goto error;
+                               }
+                               break;
+                       } else {
+                               ERR("Unknown pollfd");
                                goto error;
                        }
                }
+               health_code_update();
        }
 
-       health_code_update();
-
-       /* Wait for any kconsumerd error */
-       ret = lttcomm_recv_unix_sock(sock, &code,
-                       sizeof(enum lttcomm_return_code));
-       if (ret <= 0) {
-               ERR("consumer closed the command socket");
-               goto error;
-       }
-
-       ERR("consumer return code : %s", lttcomm_get_readable_code(-code));
-
 exit:
 error:
        /* Immediately set the consumerd state to stopped */
@@ -1061,6 +1104,16 @@ error:
                        PERROR("close");
                }
        }
+       if (consumer_data->metadata_sock.fd >= 0) {
+               ret = close(consumer_data->metadata_sock.fd);
+               if (ret) {
+                       PERROR("close");
+               }
+       }
+       /* Cleanup metadata socket mutex. */
+       pthread_mutex_destroy(consumer_data->metadata_sock.lock);
+       free(consumer_data->metadata_sock.lock);
+
        if (sock >= 0) {
                ret = close(sock);
                if (ret) {
@@ -2011,7 +2064,7 @@ end:
        return 0;
 
 error:
-       /* Cleanup already created socket on error. */
+       /* Cleanup already created sockets on error. */
        if (consumer_data->err_sock >= 0) {
                int err;
 
This page took 0.02761 seconds and 5 git commands to generate.