Fix: relayd metadata size
[lttng-tools.git] / src / bin / lttng-sessiond / main.c
index a15721d76ae529b0fa45bfcfebec8b481931bdee..f6d560c3325fe376a74c084b29b9d195396b1b62 100644 (file)
@@ -44,6 +44,7 @@
 #include <common/kernel-consumer/kernel-consumer.h>
 #include <common/futex.h>
 #include <common/relayd/relayd.h>
+#include <common/utils.h>
 
 #include "lttng-sessiond.h"
 #include "channel.h"
@@ -58,6 +59,8 @@
 #include "ust-consumer.h"
 #include "utils.h"
 #include "fd-limit.h"
+#include "filter.h"
+#include "health.h"
 
 #define CONSUMERD_FILE "lttng-consumerd"
 
@@ -100,6 +103,7 @@ static struct consumer_data ustconsumer32_data = {
        .cmd_sock = -1,
 };
 
+/* Shared between threads */
 static int dispatch_thread_exit;
 
 /* Global application Unix socket path */
@@ -108,6 +112,8 @@ static char apps_unix_sock_path[PATH_MAX];
 static char client_unix_sock_path[PATH_MAX];
 /* global wait shm path for UST */
 static char wait_shm_path[PATH_MAX];
+/* Global health check unix path */
+static char health_unix_sock_path[PATH_MAX];
 
 /* Sockets and FDs */
 static int client_sock = -1;
@@ -133,6 +139,7 @@ static pthread_t reg_apps_thread;
 static pthread_t client_thread;
 static pthread_t kernel_thread;
 static pthread_t dispatch_thread;
+static pthread_t health_thread;
 
 /*
  * UST registration command queue. This queue is tied with a futex and uses a N
@@ -207,6 +214,12 @@ static enum consumerd_state kernel_consumerd_state;
  */
 static unsigned int relayd_net_seq_idx;
 
+/* Used for the health monitoring of the session daemon. See health.h */
+struct health_state health_thread_cmd;
+struct health_state health_thread_app_manage;
+struct health_state health_thread_app_reg;
+struct health_state health_thread_kernel;
+
 static
 void setup_consumerd_path(void)
 {
@@ -409,7 +422,7 @@ static void stop_threads(void)
        }
 
        /* Dispatch thread */
-       dispatch_thread_exit = 1;
+       CMM_STORE_SHARED(dispatch_thread_exit, 1);
        futex_nto1_wake(&ust_cmd_queue.futex);
 }
 
@@ -704,13 +717,15 @@ static void update_ust_app(int app_sock)
  */
 static void *thread_manage_kernel(void *data)
 {
-       int ret, i, pollfd, update_poll_flag = 1;
+       int ret, i, pollfd, update_poll_flag = 1, err = -1;
        uint32_t revents, nb_fd;
        char tmp;
        struct lttng_poll_event events;
 
        DBG("Thread manage kernel started");
 
+       health_code_update(&health_thread_kernel);
+
        ret = create_thread_poll_set(&events, 2);
        if (ret < 0) {
                goto error_poll_create;
@@ -722,6 +737,8 @@ static void *thread_manage_kernel(void *data)
        }
 
        while (1) {
+               health_code_update(&health_thread_kernel);
+
                if (update_poll_flag == 1) {
                        /*
                         * Reset number of fd in the poll set. Always 2 since there is the thread
@@ -745,7 +762,9 @@ static void *thread_manage_kernel(void *data)
 
                /* Poll infinite value of time */
        restart:
+               health_poll_update(&health_thread_kernel);
                ret = lttng_poll_wait(&events, -1);
+               health_poll_update(&health_thread_kernel);
                if (ret < 0) {
                        /*
                         * Restart interrupted system call.
@@ -766,10 +785,13 @@ static void *thread_manage_kernel(void *data)
                        revents = LTTNG_POLL_GETEV(&events, i);
                        pollfd = LTTNG_POLL_GETFD(&events, i);
 
+                       health_code_update(&health_thread_kernel);
+
                        /* Thread quit pipe has been closed. Killing thread. */
                        ret = check_thread_quit_pipe(pollfd, revents);
                        if (ret) {
-                               goto error;
+                               err = 0;
+                               goto exit;
                        }
 
                        /* Check for data on kernel pipe */
@@ -797,9 +819,15 @@ static void *thread_manage_kernel(void *data)
                }
        }
 
+exit:
 error:
        lttng_poll_clean(&events);
 error_poll_create:
+       if (err) {
+               health_error(&health_thread_kernel);
+               ERR("Health error occurred in %s", __func__);
+       }
+       health_exit(&health_thread_kernel);
        DBG("Kernel thread dying");
        return NULL;
 }
@@ -809,7 +837,7 @@ error_poll_create:
  */
 static void *thread_manage_consumer(void *data)
 {
-       int sock = -1, i, ret, pollfd;
+       int sock = -1, i, ret, pollfd, err = -1;
        uint32_t revents, nb_fd;
        enum lttcomm_return_code code;
        struct lttng_poll_event events;
@@ -817,6 +845,8 @@ static void *thread_manage_consumer(void *data)
 
        DBG("[thread] Manage consumer started");
 
+       health_code_update(&consumer_data->health);
+
        ret = lttcomm_listen_unix_sock(consumer_data->err_sock);
        if (ret < 0) {
                goto error_listen;
@@ -838,9 +868,13 @@ static void *thread_manage_consumer(void *data)
 
        nb_fd = LTTNG_POLL_GETNB(&events);
 
+       health_code_update(&consumer_data->health);
+
        /* Inifinite blocking call, waiting for transmission */
 restart:
+       health_poll_update(&consumer_data->health);
        ret = lttng_poll_wait(&events, -1);
+       health_poll_update(&consumer_data->health);
        if (ret < 0) {
                /*
                 * Restart interrupted system call.
@@ -856,10 +890,13 @@ restart:
                revents = LTTNG_POLL_GETEV(&events, i);
                pollfd = LTTNG_POLL_GETFD(&events, i);
 
+               health_code_update(&consumer_data->health);
+
                /* Thread quit pipe has been closed. Killing thread. */
                ret = check_thread_quit_pipe(pollfd, revents);
                if (ret) {
-                       goto error;
+                       err = 0;
+                       goto exit;
                }
 
                /* Event on the registration socket */
@@ -876,6 +913,8 @@ restart:
                goto error;
        }
 
+       health_code_update(&consumer_data->health);
+
        DBG2("Receiving code from consumer err_sock");
 
        /* Getting status code from kconsumerd */
@@ -885,6 +924,8 @@ restart:
                goto error;
        }
 
+       health_code_update(&consumer_data->health);
+
        if (code == CONSUMERD_COMMAND_SOCK_READY) {
                consumer_data->cmd_sock =
                        lttcomm_connect_unix_sock(consumer_data->cmd_unix_sock_path);
@@ -913,12 +954,16 @@ restart:
                goto error;
        }
 
+       health_code_update(&consumer_data->health);
+
        /* Update number of fd */
        nb_fd = LTTNG_POLL_GETNB(&events);
 
        /* Inifinite blocking call, waiting for transmission */
 restart_poll:
+       health_poll_update(&consumer_data->health);
        ret = lttng_poll_wait(&events, -1);
+       health_poll_update(&consumer_data->health);
        if (ret < 0) {
                /*
                 * Restart interrupted system call.
@@ -934,10 +979,13 @@ restart_poll:
                revents = LTTNG_POLL_GETEV(&events, i);
                pollfd = LTTNG_POLL_GETFD(&events, i);
 
+               health_code_update(&consumer_data->health);
+
                /* Thread quit pipe has been closed. Killing thread. */
                ret = check_thread_quit_pipe(pollfd, revents);
                if (ret) {
-                       goto error;
+                       err = 0;
+                       goto exit;
                }
 
                /* Event on the kconsumerd socket */
@@ -949,6 +997,8 @@ restart_poll:
                }
        }
 
+       health_code_update(&consumer_data->health);
+
        /* Wait for any kconsumerd error */
        ret = lttcomm_recv_unix_sock(sock, &code,
                        sizeof(enum lttcomm_return_code));
@@ -959,6 +1009,7 @@ restart_poll:
 
        ERR("consumer return code : %s", lttcomm_get_readable_code(-code));
 
+exit:
 error:
        /* Immediately set the consumerd state to stopped */
        if (consumer_data->type == LTTNG_CONSUMER_KERNEL) {
@@ -997,6 +1048,11 @@ error:
        lttng_poll_clean(&events);
 error_poll:
 error_listen:
+       if (err) {
+               health_error(&consumer_data->health);
+               ERR("Health error occurred in %s", __func__);
+       }
+       health_exit(&consumer_data->health);
        DBG("consumer thread cleanup completed");
 
        return NULL;
@@ -1007,7 +1063,7 @@ error_listen:
  */
 static void *thread_manage_apps(void *data)
 {
-       int i, ret, pollfd;
+       int i, ret, pollfd, err = -1;
        uint32_t revents, nb_fd;
        struct ust_command ust_cmd;
        struct lttng_poll_event events;
@@ -1017,6 +1073,8 @@ static void *thread_manage_apps(void *data)
        rcu_register_thread();
        rcu_thread_online();
 
+       health_code_update(&health_thread_app_manage);
+
        ret = create_thread_poll_set(&events, 2);
        if (ret < 0) {
                goto error_poll_create;
@@ -1027,6 +1085,8 @@ static void *thread_manage_apps(void *data)
                goto error;
        }
 
+       health_code_update(&health_thread_app_manage);
+
        while (1) {
                /* Zeroed the events structure */
                lttng_poll_reset(&events);
@@ -1037,7 +1097,9 @@ static void *thread_manage_apps(void *data)
 
                /* Inifinite blocking call, waiting for transmission */
        restart:
+               health_poll_update(&health_thread_app_manage);
                ret = lttng_poll_wait(&events, -1);
+               health_poll_update(&health_thread_app_manage);
                if (ret < 0) {
                        /*
                         * Restart interrupted system call.
@@ -1053,10 +1115,13 @@ static void *thread_manage_apps(void *data)
                        revents = LTTNG_POLL_GETEV(&events, i);
                        pollfd = LTTNG_POLL_GETFD(&events, i);
 
+                       health_code_update(&health_thread_app_manage);
+
                        /* Thread quit pipe has been closed. Killing thread. */
                        ret = check_thread_quit_pipe(pollfd, revents);
                        if (ret) {
-                               goto error;
+                               err = 0;
+                               goto exit;
                        }
 
                        /* Inspect the apps cmd pipe */
@@ -1072,6 +1137,8 @@ static void *thread_manage_apps(void *data)
                                                goto error;
                                        }
 
+                                       health_code_update(&health_thread_app_manage);
+
                                        /* Register applicaton to the session daemon */
                                        ret = ust_app_register(&ust_cmd.reg_msg,
                                                        ust_cmd.sock);
@@ -1081,6 +1148,8 @@ static void *thread_manage_apps(void *data)
                                                break;
                                        }
 
+                                       health_code_update(&health_thread_app_manage);
+
                                        /*
                                         * Validate UST version compatibility.
                                         */
@@ -1093,6 +1162,8 @@ static void *thread_manage_apps(void *data)
                                                update_ust_app(ust_cmd.sock);
                                        }
 
+                                       health_code_update(&health_thread_app_manage);
+
                                        ret = ust_app_register_done(ust_cmd.sock);
                                        if (ret < 0) {
                                                /*
@@ -1116,6 +1187,8 @@ static void *thread_manage_apps(void *data)
                                                                ust_cmd.sock);
                                        }
 
+                                       health_code_update(&health_thread_app_manage);
+
                                        break;
                                }
                        } else {
@@ -1135,12 +1208,20 @@ static void *thread_manage_apps(void *data)
                                        break;
                                }
                        }
+
+                       health_code_update(&health_thread_app_manage);
                }
        }
 
+exit:
 error:
        lttng_poll_clean(&events);
 error_poll_create:
+       if (err) {
+               health_error(&health_thread_app_manage);
+               ERR("Health error occurred in %s", __func__);
+       }
+       health_exit(&health_thread_app_manage);
        DBG("Application communication apps thread cleanup complete");
        rcu_thread_offline();
        rcu_unregister_thread();
@@ -1159,7 +1240,7 @@ static void *thread_dispatch_ust_registration(void *data)
 
        DBG("[thread] Dispatch UST command started");
 
-       while (!dispatch_thread_exit) {
+       while (!CMM_LOAD_SHARED(dispatch_thread_exit)) {
                /* Atomically prepare the queue futex */
                futex_nto1_prepare(&ust_cmd_queue.futex);
 
@@ -1216,7 +1297,7 @@ error:
  */
 static void *thread_registration_apps(void *data)
 {
-       int sock = -1, i, ret, pollfd;
+       int sock = -1, i, ret, pollfd, err = -1;
        uint32_t revents, nb_fd;
        struct lttng_poll_event events;
        /*
@@ -1262,7 +1343,9 @@ static void *thread_registration_apps(void *data)
 
                /* Inifinite blocking call, waiting for transmission */
        restart:
+               health_poll_update(&health_thread_app_reg);
                ret = lttng_poll_wait(&events, -1);
+               health_poll_update(&health_thread_app_reg);
                if (ret < 0) {
                        /*
                         * Restart interrupted system call.
@@ -1274,6 +1357,8 @@ static void *thread_registration_apps(void *data)
                }
 
                for (i = 0; i < nb_fd; i++) {
+                       health_code_update(&health_thread_app_reg);
+
                        /* Fetch once the poll data */
                        revents = LTTNG_POLL_GETEV(&events, i);
                        pollfd = LTTNG_POLL_GETFD(&events, i);
@@ -1281,7 +1366,8 @@ static void *thread_registration_apps(void *data)
                        /* Thread quit pipe has been closed. Killing thread. */
                        ret = check_thread_quit_pipe(pollfd, revents);
                        if (ret) {
-                               goto error;
+                               err = 0;
+                               goto exit;
                        }
 
                        /* Event on the registration socket */
@@ -1317,6 +1403,7 @@ static void *thread_registration_apps(void *data)
                                                sock = -1;
                                                continue;
                                        }
+                                       health_code_update(&health_thread_app_reg);
                                        ret = lttcomm_recv_unix_sock(sock, &ust_cmd->reg_msg,
                                                        sizeof(struct ust_register_msg));
                                        if (ret < 0 || ret < sizeof(struct ust_register_msg)) {
@@ -1334,6 +1421,7 @@ static void *thread_registration_apps(void *data)
                                                sock = -1;
                                                continue;
                                        }
+                                       health_code_update(&health_thread_app_reg);
 
                                        ust_cmd->sock = sock;
                                        sock = -1;
@@ -1361,7 +1449,14 @@ static void *thread_registration_apps(void *data)
                }
        }
 
+exit:
 error:
+       if (err) {
+               health_error(&health_thread_app_reg);
+               ERR("Health error occurred in %s", __func__);
+       }
+       health_exit(&health_thread_app_reg);
+
        /* Notify that the registration thread is gone */
        notify_ust_apps(0);
 
@@ -1680,6 +1775,23 @@ error:
        return ret;
 }
 
+/*
+ * Compute health status of each consumer. If one of them is zero (bad
+ * state), we return 0.
+ */
+static int check_consumer_health(void)
+{
+       int ret;
+
+       ret = health_check_state(&kconsumer_data.health) &&
+               health_check_state(&ustconsumer32_data.health) &&
+               health_check_state(&ustconsumer64_data.health);
+
+       DBG3("Health consumer check %d", ret);
+
+       return ret;
+}
+
 /*
  * Check version of the lttng-modules.
  */
@@ -1880,25 +1992,12 @@ static int send_socket_relayd_consumer(int domain, struct ltt_session *session,
                session->net_handle = 1;
        }
 
-       switch (domain) {
-       case LTTNG_DOMAIN_KERNEL:
-               /* Send relayd socket to consumer. */
-               ret = kernel_consumer_send_relayd_socket(consumer_fd, sock,
-                               consumer, relayd_uri->stype);
-               if (ret < 0) {
-                       ret = LTTCOMM_ENABLE_CONSUMER_FAIL;
-                       goto close_sock;
-               }
-               break;
-       case LTTNG_DOMAIN_UST:
-               /* Send relayd socket to consumer. */
-               ret = ust_consumer_send_relayd_socket(consumer_fd, sock,
-                               consumer, relayd_uri->stype);
-               if (ret < 0) {
-                       ret = LTTCOMM_ENABLE_CONSUMER_FAIL;
-                       goto close_sock;
-               }
-               break;
+       /* Send relayd socket to consumer. */
+       ret = consumer_send_relayd_socket(consumer_fd, sock,
+                       consumer, relayd_uri->stype);
+       if (ret < 0) {
+               ret = LTTCOMM_ENABLE_CONSUMER_FAIL;
+               goto close_sock;
        }
 
        ret = LTTCOMM_OK;
@@ -2349,6 +2448,9 @@ static int list_lttng_ust_global_events(char *channel_name,
                        tmp[i].loglevel_type = LTTNG_EVENT_LOGLEVEL_SINGLE;
                        break;
                }
+               if (uevent->filter) {
+                       tmp[i].filter = 1;
+               }
                i++;
        }
 
@@ -2731,6 +2833,46 @@ error:
        return ret;
 }
 
+/*
+ * Command LTTNG_SET_FILTER processed by the client thread.
+ */
+static int cmd_set_filter(struct ltt_session *session, int domain,
+               char *channel_name, char *event_name,
+               struct lttng_filter_bytecode *bytecode)
+{
+       int ret;
+
+       switch (domain) {
+       case LTTNG_DOMAIN_KERNEL:
+               ret = LTTCOMM_FATAL;
+               break;
+       case LTTNG_DOMAIN_UST:
+       {
+               struct ltt_ust_session *usess = session->ust_session;
+
+               ret = filter_ust_set(usess, domain, bytecode, event_name, channel_name);
+               if (ret != LTTCOMM_OK) {
+                       goto error;
+               }
+               break;
+       }
+#if 0
+       case LTTNG_DOMAIN_UST_EXEC_NAME:
+       case LTTNG_DOMAIN_UST_PID:
+       case LTTNG_DOMAIN_UST_PID_FOLLOW_CHILDREN:
+#endif
+       default:
+               ret = LTTCOMM_UND;
+               goto error;
+       }
+
+       ret = LTTCOMM_OK;
+
+error:
+       return ret;
+
+}
+
 /*
  * Command LTTNG_ENABLE_EVENT processed by the client thread.
  */
@@ -3928,8 +4070,11 @@ error:
  * is set and ready for transmission before returning.
  *
  * Return any error encountered or 0 for success.
+ *
+ * "sock" is only used for special-case var. len data.
  */
-static int process_client_msg(struct command_ctx *cmd_ctx)
+static int process_client_msg(struct command_ctx *cmd_ctx, int sock,
+               int *sock_error)
 {
        int ret = LTTCOMM_OK;
        int need_tracing_session = 1;
@@ -3937,6 +4082,8 @@ static int process_client_msg(struct command_ctx *cmd_ctx)
 
        DBG("Processing client command %d", cmd_ctx->lsm->cmd_type);
 
+       *sock_error = 0;
+
        switch (cmd_ctx->lsm->cmd_type) {
        case LTTNG_CREATE_SESSION:
        case LTTNG_CREATE_SESSION_URI:
@@ -4447,6 +4594,43 @@ skip_domain:
                                cmd_ctx->lsm->u.reg.path);
                break;
        }
+       case LTTNG_SET_FILTER:
+       {
+               struct lttng_filter_bytecode *bytecode;
+
+               if (cmd_ctx->lsm->u.filter.bytecode_len > 65336) {
+                       ret = LTTCOMM_FILTER_INVAL;
+                       goto error;
+               }
+               bytecode = zmalloc(cmd_ctx->lsm->u.filter.bytecode_len);
+               if (!bytecode) {
+                       ret = LTTCOMM_FILTER_NOMEM;
+                       goto error;
+               }
+               /* Receive var. len. data */
+               DBG("Receiving var len data from client ...");
+               ret = lttcomm_recv_unix_sock(sock, bytecode,
+                               cmd_ctx->lsm->u.filter.bytecode_len);
+               if (ret <= 0) {
+                       DBG("Nothing recv() from client var len data... continuing");
+                       *sock_error = 1;
+                       ret = LTTCOMM_FILTER_INVAL;
+                       goto error;
+               }
+
+               if (bytecode->len + sizeof(*bytecode)
+                               != cmd_ctx->lsm->u.filter.bytecode_len) {
+                       free(bytecode);
+                       ret = LTTCOMM_FILTER_INVAL;
+                       goto error;
+               }
+
+               ret = cmd_set_filter(cmd_ctx->session, cmd_ctx->lsm->domain.type,
+                               cmd_ctx->lsm->u.filter.channel_name,
+                               cmd_ctx->lsm->u.filter.event_name,
+                               bytecode);
+               break;
+       }
        default:
                ret = LTTCOMM_UND;
                break;
@@ -4472,13 +4656,195 @@ init_setup_error:
        return ret;
 }
 
+/*
+ * Thread managing health check socket.
+ */
+static void *thread_manage_health(void *data)
+{
+       int sock = -1, new_sock, ret, i, pollfd, err = -1;
+       uint32_t revents, nb_fd;
+       struct lttng_poll_event events;
+       struct lttcomm_health_msg msg;
+       struct lttcomm_health_data reply;
+
+       DBG("[thread] Manage health check started");
+
+       rcu_register_thread();
+
+       /* Create unix socket */
+       sock = lttcomm_create_unix_sock(health_unix_sock_path);
+       if (sock < 0) {
+               ERR("Unable to create health check Unix socket");
+               ret = -1;
+               goto error;
+       }
+
+       ret = lttcomm_listen_unix_sock(sock);
+       if (ret < 0) {
+               goto error;
+       }
+
+       /*
+        * Pass 2 as size here for the thread quit pipe and client_sock. Nothing
+        * more will be added to this poll set.
+        */
+       ret = create_thread_poll_set(&events, 2);
+       if (ret < 0) {
+               goto error;
+       }
+
+       /* Add the application registration socket */
+       ret = lttng_poll_add(&events, sock, LPOLLIN | LPOLLPRI);
+       if (ret < 0) {
+               goto error;
+       }
+
+       while (1) {
+               DBG("Health check ready");
+
+               nb_fd = LTTNG_POLL_GETNB(&events);
+
+               /* Inifinite blocking call, waiting for transmission */
+restart:
+               ret = lttng_poll_wait(&events, -1);
+               if (ret < 0) {
+                       /*
+                        * Restart interrupted system call.
+                        */
+                       if (errno == EINTR) {
+                               goto restart;
+                       }
+                       goto error;
+               }
+
+               for (i = 0; i < nb_fd; i++) {
+                       /* Fetch once the poll data */
+                       revents = LTTNG_POLL_GETEV(&events, i);
+                       pollfd = LTTNG_POLL_GETFD(&events, i);
+
+                       /* Thread quit pipe has been closed. Killing thread. */
+                       ret = check_thread_quit_pipe(pollfd, revents);
+                       if (ret) {
+                               err = 0;
+                               goto exit;
+                       }
+
+                       /* Event on the registration socket */
+                       if (pollfd == sock) {
+                               if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+                                       ERR("Health socket poll error");
+                                       goto error;
+                               }
+                       }
+               }
+
+               new_sock = lttcomm_accept_unix_sock(sock);
+               if (new_sock < 0) {
+                       goto error;
+               }
+
+               DBG("Receiving data from client for health...");
+               ret = lttcomm_recv_unix_sock(new_sock, (void *)&msg, sizeof(msg));
+               if (ret <= 0) {
+                       DBG("Nothing recv() from client... continuing");
+                       ret = close(new_sock);
+                       if (ret) {
+                               PERROR("close");
+                       }
+                       new_sock = -1;
+                       continue;
+               }
+
+               rcu_thread_online();
+
+               switch (msg.component) {
+               case LTTNG_HEALTH_CMD:
+                       reply.ret_code = health_check_state(&health_thread_cmd);
+                       break;
+               case LTTNG_HEALTH_APP_MANAGE:
+                       reply.ret_code = health_check_state(&health_thread_app_manage);
+                       break;
+               case LTTNG_HEALTH_APP_REG:
+                       reply.ret_code = health_check_state(&health_thread_app_reg);
+                       break;
+               case LTTNG_HEALTH_KERNEL:
+                       reply.ret_code = health_check_state(&health_thread_kernel);
+                       break;
+               case LTTNG_HEALTH_CONSUMER:
+                       reply.ret_code = check_consumer_health();
+                       break;
+               case LTTNG_HEALTH_ALL:
+                       reply.ret_code =
+                               health_check_state(&health_thread_app_manage) &&
+                               health_check_state(&health_thread_app_reg) &&
+                               health_check_state(&health_thread_cmd) &&
+                               health_check_state(&health_thread_kernel) &&
+                               check_consumer_health();
+                       break;
+               default:
+                       reply.ret_code = LTTCOMM_UND;
+                       break;
+               }
+
+               /*
+                * Flip ret value since 0 is a success and 1 indicates a bad health for
+                * the client where in the sessiond it is the opposite. Again, this is
+                * just to make things easier for us poor developer which enjoy a lot
+                * lazyness.
+                */
+               if (reply.ret_code == 0 || reply.ret_code == 1) {
+                       reply.ret_code = !reply.ret_code;
+               }
+
+               DBG2("Health check return value %d", reply.ret_code);
+
+               ret = send_unix_sock(new_sock, (void *) &reply, sizeof(reply));
+               if (ret < 0) {
+                       ERR("Failed to send health data back to client");
+               }
+
+               /* End of transmission */
+               ret = close(new_sock);
+               if (ret) {
+                       PERROR("close");
+               }
+               new_sock = -1;
+       }
+
+exit:
+error:
+       if (err) {
+               ERR("Health error occurred in %s", __func__);
+       }
+       DBG("Health check thread dying");
+       unlink(health_unix_sock_path);
+       if (sock >= 0) {
+               ret = close(sock);
+               if (ret) {
+                       PERROR("close");
+               }
+       }
+       if (new_sock >= 0) {
+               ret = close(new_sock);
+               if (ret) {
+                       PERROR("close");
+               }
+       }
+
+       lttng_poll_clean(&events);
+
+       rcu_unregister_thread();
+       return NULL;
+}
+
 /*
  * This thread manage all clients request using the unix client socket for
  * communication.
  */
 static void *thread_manage_clients(void *data)
 {
-       int sock = -1, ret, i, pollfd;
+       int sock = -1, ret, i, pollfd, err = -1;
+       int sock_error;
        uint32_t revents, nb_fd;
        struct command_ctx *cmd_ctx = NULL;
        struct lttng_poll_event events;
@@ -4487,6 +4853,8 @@ static void *thread_manage_clients(void *data)
 
        rcu_register_thread();
 
+       health_code_update(&health_thread_cmd);
+
        ret = lttcomm_listen_unix_sock(client_sock);
        if (ret < 0) {
                goto error;
@@ -4514,6 +4882,8 @@ static void *thread_manage_clients(void *data)
                kill(ppid, SIGUSR1);
        }
 
+       health_code_update(&health_thread_cmd);
+
        while (1) {
                DBG("Accepting client command ...");
 
@@ -4521,7 +4891,9 @@ static void *thread_manage_clients(void *data)
 
                /* Inifinite blocking call, waiting for transmission */
        restart:
+               health_poll_update(&health_thread_cmd);
                ret = lttng_poll_wait(&events, -1);
+               health_poll_update(&health_thread_cmd);
                if (ret < 0) {
                        /*
                         * Restart interrupted system call.
@@ -4537,10 +4909,13 @@ static void *thread_manage_clients(void *data)
                        revents = LTTNG_POLL_GETEV(&events, i);
                        pollfd = LTTNG_POLL_GETFD(&events, i);
 
+                       health_code_update(&health_thread_cmd);
+
                        /* Thread quit pipe has been closed. Killing thread. */
                        ret = check_thread_quit_pipe(pollfd, revents);
                        if (ret) {
-                               goto error;
+                               err = 0;
+                               goto exit;
                        }
 
                        /* Event on the registration socket */
@@ -4554,6 +4929,8 @@ static void *thread_manage_clients(void *data)
 
                DBG("Wait for client response");
 
+               health_code_update(&health_thread_cmd);
+
                sock = lttcomm_accept_unix_sock(client_sock);
                if (sock < 0) {
                        goto error;
@@ -4582,6 +4959,8 @@ static void *thread_manage_clients(void *data)
                cmd_ctx->llm = NULL;
                cmd_ctx->session = NULL;
 
+               health_code_update(&health_thread_cmd);
+
                /*
                 * Data is received from the lttng client. The struct
                 * lttcomm_session_msg (lsm) contains the command and data request of
@@ -4601,6 +4980,8 @@ static void *thread_manage_clients(void *data)
                        continue;
                }
 
+               health_code_update(&health_thread_cmd);
+
                // TODO: Validate cmd_ctx including sanity check for
                // security purpose.
 
@@ -4611,18 +4992,29 @@ static void *thread_manage_clients(void *data)
                 * informations for the client. The command context struct contains
                 * everything this function may needs.
                 */
-               ret = process_client_msg(cmd_ctx);
+               ret = process_client_msg(cmd_ctx, sock, &sock_error);
                rcu_thread_offline();
                if (ret < 0) {
+                       if (sock_error) {
+                               ret = close(sock);
+                               if (ret) {
+                                       PERROR("close");
+                               }
+                               sock = -1;
+                       }
                        /*
                         * TODO: Inform client somehow of the fatal error. At
                         * this point, ret < 0 means that a zmalloc failed
-                        * (ENOMEM). Error detected but still accept command.
+                        * (ENOMEM). Error detected but still accept
+                        * command, unless a socket error has been
+                        * detected.
                         */
                        clean_command_ctx(&cmd_ctx);
                        continue;
                }
 
+               health_code_update(&health_thread_cmd);
+
                DBG("Sending response (size: %d, retcode: %s)",
                                cmd_ctx->lttng_msg_size,
                                lttng_strerror(-cmd_ctx->llm->ret_code));
@@ -4639,9 +5031,18 @@ static void *thread_manage_clients(void *data)
                sock = -1;
 
                clean_command_ctx(&cmd_ctx);
+
+               health_code_update(&health_thread_cmd);
        }
 
+exit:
 error:
+       if (err) {
+               health_error(&health_thread_cmd);
+               ERR("Health error occurred in %s", __func__);
+       }
+       health_exit(&health_thread_cmd);
+
        DBG("Client thread dying");
        unlink(client_unix_sock_path);
        if (client_sock >= 0) {
@@ -5190,6 +5591,11 @@ int main(int argc, char **argv)
                                        DEFAULT_GLOBAL_APPS_WAIT_SHM_PATH);
                }
 
+               if (strlen(health_unix_sock_path) == 0) {
+                       snprintf(health_unix_sock_path, sizeof(health_unix_sock_path),
+                                       DEFAULT_GLOBAL_HEALTH_UNIX_SOCK);
+               }
+
                /* Setup kernel consumerd path */
                snprintf(kconsumer_data.err_unix_sock_path, PATH_MAX,
                                DEFAULT_KCONSUMERD_ERR_SOCK_PATH, rundir);
@@ -5240,6 +5646,12 @@ int main(int argc, char **argv)
                        snprintf(wait_shm_path, PATH_MAX,
                                        DEFAULT_HOME_APPS_WAIT_SHM_PATH, geteuid());
                }
+
+               /* Set health check Unix path */
+               if (strlen(health_unix_sock_path) == 0) {
+                       snprintf(health_unix_sock_path, sizeof(health_unix_sock_path),
+                                       DEFAULT_HOME_HEALTH_UNIX_SOCK, home_path);
+               }
        }
 
        /* Set consumer initial state */
@@ -5372,6 +5784,32 @@ int main(int argc, char **argv)
         */
        uatomic_set(&relayd_net_seq_idx, 1);
 
+       /* Init all health thread counters. */
+       health_init(&health_thread_cmd);
+       health_init(&health_thread_kernel);
+       health_init(&health_thread_app_manage);
+       health_init(&health_thread_app_reg);
+
+       /*
+        * Init health counters of the consumer thread. We do a quick hack here to
+        * the state of the consumer health is fine even if the thread is not
+        * started.  This is simply to ease our life and has no cost what so ever.
+        */
+       health_init(&kconsumer_data.health);
+       health_poll_update(&kconsumer_data.health);
+       health_init(&ustconsumer32_data.health);
+       health_poll_update(&ustconsumer32_data.health);
+       health_init(&ustconsumer64_data.health);
+       health_poll_update(&ustconsumer64_data.health);
+
+       /* Create thread to manage the client socket */
+       ret = pthread_create(&health_thread, NULL,
+                       thread_manage_health, (void *) NULL);
+       if (ret != 0) {
+               PERROR("pthread_create health");
+               goto exit_health;
+       }
+
        /* Create thread to manage the client socket */
        ret = pthread_create(&client_thread, NULL,
                        thread_manage_clients, (void *) NULL);
@@ -5453,6 +5891,7 @@ exit_dispatch:
        }
 
 exit_client:
+exit_health:
 exit:
        /*
         * cleanup() is called when no other thread is running.
This page took 0.037577 seconds and 5 git commands to generate.