X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fmain.c;h=92bf85bc6a615ba5aa3face13efe04a534189bb2;hb=3f8e211fbe73cbcf69d52af5e839b14d1a951ed7;hp=e2ca01eb54fe6541323f971095c1d329f28cf747;hpb=53a80697a772bc2e260e3dff006f910be6709f04;p=lttng-tools.git diff --git a/src/bin/lttng-sessiond/main.c b/src/bin/lttng-sessiond/main.c index e2ca01eb5..92bf85bc6 100644 --- a/src/bin/lttng-sessiond/main.c +++ b/src/bin/lttng-sessiond/main.c @@ -44,6 +44,7 @@ #include #include #include +#include #include "lttng-sessiond.h" #include "channel.h" @@ -59,6 +60,7 @@ #include "utils.h" #include "fd-limit.h" #include "filter.h" +#include "health.h" #define CONSUMERD_FILE "lttng-consumerd" @@ -85,6 +87,8 @@ static struct consumer_data kconsumer_data = { .cmd_unix_sock_path = DEFAULT_KCONSUMERD_CMD_SOCK_PATH, .err_sock = -1, .cmd_sock = -1, + .pid_mutex = PTHREAD_MUTEX_INITIALIZER, + .lock = PTHREAD_MUTEX_INITIALIZER, }; static struct consumer_data ustconsumer64_data = { .type = LTTNG_CONSUMER64_UST, @@ -92,6 +96,8 @@ static struct consumer_data ustconsumer64_data = { .cmd_unix_sock_path = DEFAULT_USTCONSUMERD64_CMD_SOCK_PATH, .err_sock = -1, .cmd_sock = -1, + .pid_mutex = PTHREAD_MUTEX_INITIALIZER, + .lock = PTHREAD_MUTEX_INITIALIZER, }; static struct consumer_data ustconsumer32_data = { .type = LTTNG_CONSUMER32_UST, @@ -99,8 +105,11 @@ static struct consumer_data ustconsumer32_data = { .cmd_unix_sock_path = DEFAULT_USTCONSUMERD32_CMD_SOCK_PATH, .err_sock = -1, .cmd_sock = -1, + .pid_mutex = PTHREAD_MUTEX_INITIALIZER, + .lock = PTHREAD_MUTEX_INITIALIZER, }; +/* Shared between threads */ static int dispatch_thread_exit; /* Global application Unix socket path */ @@ -109,6 +118,8 @@ static char apps_unix_sock_path[PATH_MAX]; static char client_unix_sock_path[PATH_MAX]; /* global wait shm path for UST */ static char wait_shm_path[PATH_MAX]; +/* Global health check unix path */ +static char health_unix_sock_path[PATH_MAX]; /* Sockets and FDs */ static int client_sock = -1; @@ -134,6 +145,7 @@ static pthread_t reg_apps_thread; static pthread_t client_thread; static pthread_t kernel_thread; static pthread_t dispatch_thread; +static pthread_t health_thread; /* * UST registration command queue. This queue is tied with a futex and uses a N @@ -208,6 +220,12 @@ static enum consumerd_state kernel_consumerd_state; */ static unsigned int relayd_net_seq_idx; +/* Used for the health monitoring of the session daemon. See health.h */ +struct health_state health_thread_cmd; +struct health_state health_thread_app_manage; +struct health_state health_thread_app_reg; +struct health_state health_thread_kernel; + static void setup_consumerd_path(void) { @@ -353,23 +371,56 @@ error: */ static void teardown_kernel_session(struct ltt_session *session) { + int ret; + struct lttng_ht_iter iter; + struct ltt_kernel_session *ksess; + struct consumer_socket *socket; + if (!session->kernel_session) { DBG3("No kernel session when tearing down session"); return; } + ksess = session->kernel_session; + DBG("Tearing down kernel session"); + /* + * Destroy relayd associated with the session consumer. This action is + * valid since in order to destroy a session we must acquire the session + * lock. This means that there CAN NOT be stream(s) being sent to a + * consumer since this action also requires the session lock at any time. + * + * At this point, we are sure that not streams data will be lost after this + * command is issued. + */ + if (ksess->consumer && ksess->consumer->type == CONSUMER_DST_NET) { + cds_lfht_for_each_entry(ksess->consumer->socks->ht, &iter.iter, socket, + node.node) { + ret = consumer_send_destroy_relayd(socket, ksess->consumer); + if (ret < 0) { + ERR("Unable to send destroy relayd command to consumer"); + /* Continue since we MUST delete everything at this point. */ + } + } + } + /* * If a custom kernel consumer was registered, close the socket before * tearing down the complete kernel session structure */ - if (kconsumer_data.cmd_sock >= 0 && - session->kernel_session->consumer_fd != kconsumer_data.cmd_sock) { - lttcomm_close_unix_sock(session->kernel_session->consumer_fd); + cds_lfht_for_each_entry(ksess->consumer->socks->ht, &iter.iter, socket, + node.node) { + if (socket->fd != kconsumer_data.cmd_sock) { + rcu_read_lock(); + consumer_del_socket(socket, ksess->consumer); + lttcomm_close_unix_sock(socket->fd); + consumer_destroy_socket(socket); + rcu_read_unlock(); + } } - trace_kernel_destroy_session(session->kernel_session); + trace_kernel_destroy_session(ksess); } /* @@ -379,20 +430,44 @@ static void teardown_kernel_session(struct ltt_session *session) static void teardown_ust_session(struct ltt_session *session) { int ret; + struct lttng_ht_iter iter; + struct ltt_ust_session *usess; + struct consumer_socket *socket; if (!session->ust_session) { DBG3("No UST session when tearing down session"); return; } + usess = session->ust_session; DBG("Tearing down UST session(s)"); - ret = ust_app_destroy_trace_all(session->ust_session); + /* + * Destroy relayd associated with the session consumer. This action is + * valid since in order to destroy a session we must acquire the session + * lock. This means that there CAN NOT be stream(s) being sent to a + * consumer since this action also requires the session lock at any time. + * + * At this point, we are sure that no data will be lost after this command + * is issued. + */ + if (usess->consumer && usess->consumer->type == CONSUMER_DST_NET) { + cds_lfht_for_each_entry(usess->consumer->socks->ht, &iter.iter, socket, + node.node) { + ret = consumer_send_destroy_relayd(socket, usess->consumer); + if (ret < 0) { + ERR("Unable to send destroy relayd command to consumer"); + /* Continue since we MUST delete everything at this point. */ + } + } + } + + ret = ust_app_destroy_trace_all(usess); if (ret) { ERR("Error in ust_app_destroy_trace_all"); } - trace_ust_destroy_session(session->ust_session); + trace_ust_destroy_session(usess); } /* @@ -410,7 +485,7 @@ static void stop_threads(void) } /* Dispatch thread */ - dispatch_thread_exit = 1; + CMM_STORE_SHARED(dispatch_thread_exit, 1); futex_nto1_wake(&ust_cmd_queue.futex); } @@ -456,8 +531,6 @@ static void cleanup(void) DBG("Closing all UST sockets"); ust_app_clean_list(); - pthread_mutex_destroy(&kconsumer_data.pid_mutex); - if (is_root && !opt_no_kernel) { DBG2("Closing kernel fd"); if (kernel_tracer_fd >= 0) { @@ -622,6 +695,7 @@ static int update_kernel_stream(struct consumer_data *consumer_data, int fd) { int ret = 0; struct ltt_session *session; + struct ltt_kernel_session *ksess; struct ltt_kernel_channel *channel; DBG("Updating kernel streams for channel fd %d", fd); @@ -633,14 +707,9 @@ static int update_kernel_stream(struct consumer_data *consumer_data, int fd) session_unlock(session); continue; } + ksess = session->kernel_session; - /* This is not suppose to be -1 but this is an extra security check */ - if (session->kernel_session->consumer_fd < 0) { - session->kernel_session->consumer_fd = consumer_data->cmd_sock; - } - - cds_list_for_each_entry(channel, - &session->kernel_session->channel_list.head, list) { + cds_list_for_each_entry(channel, &ksess->channel_list.head, list) { if (channel->fd == fd) { DBG("Channel found, updating kernel streams"); ret = kernel_open_channel_stream(channel); @@ -653,13 +722,23 @@ static int update_kernel_stream(struct consumer_data *consumer_data, int fd) * that tracing is started so it is safe to send our updated * stream fds. */ - if (session->kernel_session->consumer_fds_sent == 1 && - session->kernel_session->consumer != NULL) { - ret = kernel_consumer_send_channel_stream( - session->kernel_session->consumer_fd, channel, - session->kernel_session); - if (ret < 0) { - goto error; + if (ksess->consumer_fds_sent == 1 && ksess->consumer != NULL) { + struct lttng_ht_iter iter; + struct consumer_socket *socket; + + + cds_lfht_for_each_entry(ksess->consumer->socks->ht, + &iter.iter, socket, node.node) { + /* Code flow error */ + assert(socket->fd >= 0); + + pthread_mutex_lock(socket->lock); + ret = kernel_consumer_send_channel_stream(socket->fd, + channel, ksess); + pthread_mutex_unlock(socket->lock); + if (ret < 0) { + goto error; + } } } goto error; @@ -705,13 +784,15 @@ static void update_ust_app(int app_sock) */ static void *thread_manage_kernel(void *data) { - int ret, i, pollfd, update_poll_flag = 1; + int ret, i, pollfd, update_poll_flag = 1, err = -1; uint32_t revents, nb_fd; char tmp; struct lttng_poll_event events; DBG("Thread manage kernel started"); + health_code_update(&health_thread_kernel); + ret = create_thread_poll_set(&events, 2); if (ret < 0) { goto error_poll_create; @@ -723,6 +804,8 @@ static void *thread_manage_kernel(void *data) } while (1) { + health_code_update(&health_thread_kernel); + if (update_poll_flag == 1) { /* * Reset number of fd in the poll set. Always 2 since there is the thread @@ -746,7 +829,9 @@ static void *thread_manage_kernel(void *data) /* Poll infinite value of time */ restart: + health_poll_update(&health_thread_kernel); ret = lttng_poll_wait(&events, -1); + health_poll_update(&health_thread_kernel); if (ret < 0) { /* * Restart interrupted system call. @@ -767,10 +852,13 @@ static void *thread_manage_kernel(void *data) revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); + health_code_update(&health_thread_kernel); + /* Thread quit pipe has been closed. Killing thread. */ ret = check_thread_quit_pipe(pollfd, revents); if (ret) { - goto error; + err = 0; + goto exit; } /* Check for data on kernel pipe */ @@ -798,9 +886,15 @@ static void *thread_manage_kernel(void *data) } } +exit: error: lttng_poll_clean(&events); error_poll_create: + if (err) { + health_error(&health_thread_kernel); + ERR("Health error occurred in %s", __func__); + } + health_exit(&health_thread_kernel); DBG("Kernel thread dying"); return NULL; } @@ -810,7 +904,7 @@ error_poll_create: */ static void *thread_manage_consumer(void *data) { - int sock = -1, i, ret, pollfd; + int sock = -1, i, ret, pollfd, err = -1; uint32_t revents, nb_fd; enum lttcomm_return_code code; struct lttng_poll_event events; @@ -818,6 +912,8 @@ static void *thread_manage_consumer(void *data) DBG("[thread] Manage consumer started"); + health_code_update(&consumer_data->health); + ret = lttcomm_listen_unix_sock(consumer_data->err_sock); if (ret < 0) { goto error_listen; @@ -839,9 +935,13 @@ static void *thread_manage_consumer(void *data) nb_fd = LTTNG_POLL_GETNB(&events); + health_code_update(&consumer_data->health); + /* Inifinite blocking call, waiting for transmission */ restart: + health_poll_update(&consumer_data->health); ret = lttng_poll_wait(&events, -1); + health_poll_update(&consumer_data->health); if (ret < 0) { /* * Restart interrupted system call. @@ -857,10 +957,13 @@ restart: revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); + health_code_update(&consumer_data->health); + /* Thread quit pipe has been closed. Killing thread. */ ret = check_thread_quit_pipe(pollfd, revents); if (ret) { - goto error; + err = 0; + goto exit; } /* Event on the registration socket */ @@ -877,6 +980,8 @@ restart: goto error; } + health_code_update(&consumer_data->health); + DBG2("Receiving code from consumer err_sock"); /* Getting status code from kconsumerd */ @@ -886,6 +991,8 @@ restart: goto error; } + health_code_update(&consumer_data->health); + if (code == CONSUMERD_COMMAND_SOCK_READY) { consumer_data->cmd_sock = lttcomm_connect_unix_sock(consumer_data->cmd_unix_sock_path); @@ -914,12 +1021,16 @@ restart: goto error; } + health_code_update(&consumer_data->health); + /* Update number of fd */ nb_fd = LTTNG_POLL_GETNB(&events); /* Inifinite blocking call, waiting for transmission */ restart_poll: + health_poll_update(&consumer_data->health); ret = lttng_poll_wait(&events, -1); + health_poll_update(&consumer_data->health); if (ret < 0) { /* * Restart interrupted system call. @@ -935,10 +1046,13 @@ restart_poll: revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); + health_code_update(&consumer_data->health); + /* Thread quit pipe has been closed. Killing thread. */ ret = check_thread_quit_pipe(pollfd, revents); if (ret) { - goto error; + err = 0; + goto exit; } /* Event on the kconsumerd socket */ @@ -950,6 +1064,8 @@ restart_poll: } } + health_code_update(&consumer_data->health); + /* Wait for any kconsumerd error */ ret = lttcomm_recv_unix_sock(sock, &code, sizeof(enum lttcomm_return_code)); @@ -960,6 +1076,7 @@ restart_poll: ERR("consumer return code : %s", lttcomm_get_readable_code(-code)); +exit: error: /* Immediately set the consumerd state to stopped */ if (consumer_data->type == LTTNG_CONSUMER_KERNEL) { @@ -998,6 +1115,11 @@ error: lttng_poll_clean(&events); error_poll: error_listen: + if (err) { + health_error(&consumer_data->health); + ERR("Health error occurred in %s", __func__); + } + health_exit(&consumer_data->health); DBG("consumer thread cleanup completed"); return NULL; @@ -1008,7 +1130,7 @@ error_listen: */ static void *thread_manage_apps(void *data) { - int i, ret, pollfd; + int i, ret, pollfd, err = -1; uint32_t revents, nb_fd; struct ust_command ust_cmd; struct lttng_poll_event events; @@ -1018,6 +1140,8 @@ static void *thread_manage_apps(void *data) rcu_register_thread(); rcu_thread_online(); + health_code_update(&health_thread_app_manage); + ret = create_thread_poll_set(&events, 2); if (ret < 0) { goto error_poll_create; @@ -1028,6 +1152,8 @@ static void *thread_manage_apps(void *data) goto error; } + health_code_update(&health_thread_app_manage); + while (1) { /* Zeroed the events structure */ lttng_poll_reset(&events); @@ -1038,7 +1164,9 @@ static void *thread_manage_apps(void *data) /* Inifinite blocking call, waiting for transmission */ restart: + health_poll_update(&health_thread_app_manage); ret = lttng_poll_wait(&events, -1); + health_poll_update(&health_thread_app_manage); if (ret < 0) { /* * Restart interrupted system call. @@ -1054,10 +1182,13 @@ static void *thread_manage_apps(void *data) revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); + health_code_update(&health_thread_app_manage); + /* Thread quit pipe has been closed. Killing thread. */ ret = check_thread_quit_pipe(pollfd, revents); if (ret) { - goto error; + err = 0; + goto exit; } /* Inspect the apps cmd pipe */ @@ -1073,6 +1204,8 @@ static void *thread_manage_apps(void *data) goto error; } + health_code_update(&health_thread_app_manage); + /* Register applicaton to the session daemon */ ret = ust_app_register(&ust_cmd.reg_msg, ust_cmd.sock); @@ -1082,6 +1215,8 @@ static void *thread_manage_apps(void *data) break; } + health_code_update(&health_thread_app_manage); + /* * Validate UST version compatibility. */ @@ -1094,6 +1229,8 @@ static void *thread_manage_apps(void *data) update_ust_app(ust_cmd.sock); } + health_code_update(&health_thread_app_manage); + ret = ust_app_register_done(ust_cmd.sock); if (ret < 0) { /* @@ -1117,6 +1254,8 @@ static void *thread_manage_apps(void *data) ust_cmd.sock); } + health_code_update(&health_thread_app_manage); + break; } } else { @@ -1136,12 +1275,20 @@ static void *thread_manage_apps(void *data) break; } } + + health_code_update(&health_thread_app_manage); } } +exit: error: lttng_poll_clean(&events); error_poll_create: + if (err) { + health_error(&health_thread_app_manage); + ERR("Health error occurred in %s", __func__); + } + health_exit(&health_thread_app_manage); DBG("Application communication apps thread cleanup complete"); rcu_thread_offline(); rcu_unregister_thread(); @@ -1160,7 +1307,7 @@ static void *thread_dispatch_ust_registration(void *data) DBG("[thread] Dispatch UST command started"); - while (!dispatch_thread_exit) { + while (!CMM_LOAD_SHARED(dispatch_thread_exit)) { /* Atomically prepare the queue futex */ futex_nto1_prepare(&ust_cmd_queue.futex); @@ -1217,7 +1364,7 @@ error: */ static void *thread_registration_apps(void *data) { - int sock = -1, i, ret, pollfd; + int sock = -1, i, ret, pollfd, err = -1; uint32_t revents, nb_fd; struct lttng_poll_event events; /* @@ -1263,7 +1410,9 @@ static void *thread_registration_apps(void *data) /* Inifinite blocking call, waiting for transmission */ restart: + health_poll_update(&health_thread_app_reg); ret = lttng_poll_wait(&events, -1); + health_poll_update(&health_thread_app_reg); if (ret < 0) { /* * Restart interrupted system call. @@ -1275,6 +1424,8 @@ static void *thread_registration_apps(void *data) } for (i = 0; i < nb_fd; i++) { + health_code_update(&health_thread_app_reg); + /* Fetch once the poll data */ revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); @@ -1282,7 +1433,8 @@ static void *thread_registration_apps(void *data) /* Thread quit pipe has been closed. Killing thread. */ ret = check_thread_quit_pipe(pollfd, revents); if (ret) { - goto error; + err = 0; + goto exit; } /* Event on the registration socket */ @@ -1318,6 +1470,7 @@ static void *thread_registration_apps(void *data) sock = -1; continue; } + health_code_update(&health_thread_app_reg); ret = lttcomm_recv_unix_sock(sock, &ust_cmd->reg_msg, sizeof(struct ust_register_msg)); if (ret < 0 || ret < sizeof(struct ust_register_msg)) { @@ -1335,6 +1488,7 @@ static void *thread_registration_apps(void *data) sock = -1; continue; } + health_code_update(&health_thread_app_reg); ust_cmd->sock = sock; sock = -1; @@ -1362,7 +1516,14 @@ static void *thread_registration_apps(void *data) } } +exit: error: + if (err) { + health_error(&health_thread_app_reg); + ERR("Health error occurred in %s", __func__); + } + health_exit(&health_thread_app_reg); + /* Notify that the registration thread is gone */ notify_ust_apps(0); @@ -1468,7 +1629,8 @@ static int join_consumer_thread(struct consumer_data *consumer_data) void *status; int ret; - if (consumer_data->pid != 0) { + /* Consumer pid must be a real one. */ + if (consumer_data->pid > 0) { ret = kill(consumer_data->pid, SIGTERM); if (ret) { ERR("Error killing consumer daemon"); @@ -1681,6 +1843,23 @@ error: return ret; } +/* + * Compute health status of each consumer. If one of them is zero (bad + * state), we return 0. + */ +static int check_consumer_health(void) +{ + int ret; + + ret = health_check_state(&kconsumer_data.health) && + health_check_state(&ustconsumer32_data.health) && + health_check_state(&ustconsumer64_data.health); + + DBG3("Health consumer check %d", ret); + + return ret; +} + /* * Check version of the lttng-modules. */ @@ -1758,21 +1937,24 @@ error: static int init_kernel_tracing(struct ltt_kernel_session *session) { int ret = 0; + struct lttng_ht_iter iter; + struct consumer_socket *socket; - if (session->consumer_fds_sent == 0 && session->consumer != NULL) { - /* - * Assign default kernel consumer socket if no consumer assigned to the - * kernel session. At this point, it's NOT supposed to be -1 but this is - * an extra security check. - */ - if (session->consumer_fd < 0) { - session->consumer_fd = kconsumer_data.cmd_sock; - } + assert(session); - ret = kernel_consumer_send_session(session->consumer_fd, session); - if (ret < 0) { - ret = LTTCOMM_KERN_CONSUMER_FAIL; - goto error; + if (session->consumer_fds_sent == 0 && session->consumer != NULL) { + cds_lfht_for_each_entry(session->consumer->socks->ht, &iter.iter, + socket, node.node) { + /* Code flow error */ + assert(socket->fd >= 0); + + pthread_mutex_lock(socket->lock); + ret = kernel_consumer_send_session(socket->fd, session); + pthread_mutex_unlock(socket->lock); + if (ret < 0) { + ret = LTTCOMM_KERN_CONSUMER_FAIL; + goto error; + } } } @@ -1881,25 +2063,12 @@ static int send_socket_relayd_consumer(int domain, struct ltt_session *session, session->net_handle = 1; } - switch (domain) { - case LTTNG_DOMAIN_KERNEL: - /* Send relayd socket to consumer. */ - ret = kernel_consumer_send_relayd_socket(consumer_fd, sock, - consumer, relayd_uri->stype); - if (ret < 0) { - ret = LTTCOMM_ENABLE_CONSUMER_FAIL; - goto close_sock; - } - break; - case LTTNG_DOMAIN_UST: - /* Send relayd socket to consumer. */ - ret = ust_consumer_send_relayd_socket(consumer_fd, sock, - consumer, relayd_uri->stype); - if (ret < 0) { - ret = LTTCOMM_ENABLE_CONSUMER_FAIL; - goto close_sock; - } - break; + /* Send relayd socket to consumer. */ + ret = consumer_send_relayd_socket(consumer_fd, sock, + consumer, relayd_uri->stype); + if (ret < 0) { + ret = LTTCOMM_ENABLE_CONSUMER_FAIL; + goto close_sock; } ret = LTTCOMM_OK; @@ -1928,6 +2097,15 @@ static int send_sockets_relayd_consumer(int domain, { int ret; + assert(session); + assert(consumer); + + /* Don't resend the sockets to the consumer. */ + if (consumer->dst.net.relayd_socks_sent) { + ret = LTTCOMM_OK; + goto error; + } + /* Sending control relayd socket. */ ret = send_socket_relayd_consumer(domain, session, &consumer->dst.net.control, consumer, fd); @@ -1942,6 +2120,9 @@ static int send_sockets_relayd_consumer(int domain, goto error; } + /* Flag that all relayd sockets were sent to the consumer. */ + consumer->dst.net.relayd_socks_sent = 1; + error: return ret; } @@ -1956,6 +2137,8 @@ static int setup_relayd(struct ltt_session *session) int ret = LTTCOMM_OK; struct ltt_ust_session *usess; struct ltt_kernel_session *ksess; + struct consumer_socket *socket; + struct lttng_ht_iter iter; assert(session); @@ -1964,34 +2147,39 @@ static int setup_relayd(struct ltt_session *session) DBG2("Setting relayd for session %s", session->name); - if (usess && usess->consumer->sock == -1 && - usess->consumer->type == CONSUMER_DST_NET && - usess->consumer->enabled) { - /* Setup relayd for 64 bits consumer */ - if (ust_consumerd64_fd >= 0) { + if (usess && usess->consumer && usess->consumer->type == CONSUMER_DST_NET + && usess->consumer->enabled) { + /* For each consumer socket, send relayd sockets */ + cds_lfht_for_each_entry(usess->consumer->socks->ht, &iter.iter, + socket, node.node) { + /* Code flow error */ + assert(socket->fd >= 0); + + pthread_mutex_lock(socket->lock); send_sockets_relayd_consumer(LTTNG_DOMAIN_UST, session, - usess->consumer, ust_consumerd64_fd); + usess->consumer, socket->fd); + pthread_mutex_unlock(socket->lock); if (ret != LTTCOMM_OK) { goto error; } } + } - /* Setup relayd for 32 bits consumer */ - if (ust_consumerd32_fd >= 0) { - send_sockets_relayd_consumer(LTTNG_DOMAIN_UST, session, - usess->consumer, ust_consumerd32_fd); + if (ksess && ksess->consumer && ksess->consumer->type == CONSUMER_DST_NET + && ksess->consumer->enabled) { + cds_lfht_for_each_entry(ksess->consumer->socks->ht, &iter.iter, + socket, node.node) { + /* Code flow error */ + assert(socket->fd >= 0); + + pthread_mutex_lock(socket->lock); + send_sockets_relayd_consumer(LTTNG_DOMAIN_KERNEL, session, + ksess->consumer, socket->fd); + pthread_mutex_unlock(socket->lock); if (ret != LTTCOMM_OK) { goto error; } } - } else if (ksess && ksess->consumer->sock == -1 && - ksess->consumer->type == CONSUMER_DST_NET && - ksess->consumer->enabled) { - send_sockets_relayd_consumer(LTTNG_DOMAIN_KERNEL, session, - ksess->consumer, ksess->consumer_fd); - if (ret != LTTCOMM_OK) { - goto error; - } } error: @@ -2009,6 +2197,9 @@ static int copy_session_consumer(int domain, struct ltt_session *session) const char *dir_name; struct consumer_output *consumer; + assert(session); + assert(session->consumer); + switch (domain) { case LTTNG_DOMAIN_KERNEL: DBG3("Copying tracing session consumer output in kernel session"); @@ -2035,12 +2226,6 @@ static int copy_session_consumer(int domain, struct ltt_session *session) strncat(consumer->subdir, dir_name, sizeof(consumer->subdir)); DBG3("Copy session consumer subdir %s", consumer->subdir); - /* Add default trace directory name */ - if (consumer->type == CONSUMER_DST_LOCAL) { - strncat(consumer->dst.trace_path, dir_name, - sizeof(consumer->dst.trace_path)); - } - ret = LTTCOMM_OK; error: @@ -2057,6 +2242,7 @@ static int create_ust_session(struct ltt_session *session, struct ltt_ust_session *lus = NULL; assert(session); + assert(domain); assert(session->consumer); switch (domain->type) { @@ -2076,18 +2262,6 @@ static int create_ust_session(struct ltt_session *session, goto error; } - if (session->consumer->type == CONSUMER_DST_LOCAL) { - ret = run_as_mkdir_recursive(lus->pathname, S_IRWXU | S_IRWXG, - session->uid, session->gid); - if (ret < 0) { - if (ret != -EEXIST) { - ERR("Trace directory creation error"); - ret = LTTCOMM_UST_SESS_FAIL; - goto error; - } - } - } - lus->uid = session->uid; lus->gid = session->gid; session->ust_session = lus; @@ -2121,10 +2295,8 @@ static int create_kernel_session(struct ltt_session *session) goto error; } - /* Set kernel consumer socket fd */ - if (kconsumer_data.cmd_sock >= 0) { - session->kernel_session->consumer_fd = kconsumer_data.cmd_sock; - } + /* Code flow safety */ + assert(session->kernel_session); /* Copy session output to the newly created Kernel session */ ret = copy_session_consumer(LTTNG_DOMAIN_KERNEL, session); @@ -2133,7 +2305,8 @@ static int create_kernel_session(struct ltt_session *session) } /* Create directory(ies) on local filesystem. */ - if (session->consumer->type == CONSUMER_DST_LOCAL) { + if (session->kernel_session->consumer->type == CONSUMER_DST_LOCAL && + strlen(session->kernel_session->consumer->dst.trace_path) > 0) { ret = run_as_mkdir_recursive( session->kernel_session->consumer->dst.trace_path, S_IRWXU | S_IRWXG, session->uid, session->gid); @@ -2350,6 +2523,9 @@ static int list_lttng_ust_global_events(char *channel_name, tmp[i].loglevel_type = LTTNG_EVENT_LOGLEVEL_SINGLE; break; } + if (uevent->filter) { + tmp[i].filter = 1; + } i++; } @@ -2432,6 +2608,77 @@ error: return ret; } + +/* + * Add URI so the consumer output object. Set the correct path depending on the + * domain adding the default trace directory. + */ +static int add_uri_to_consumer(struct consumer_output *consumer, + struct lttng_uri *uri, int domain) +{ + int ret; + const char *default_trace_dir; + + assert(uri); + + if (consumer == NULL) { + DBG("No consumer detected. Don't add URI. Stopping."); + ret = LTTCOMM_NO_CONSUMER; + goto error; + } + + switch (domain) { + case LTTNG_DOMAIN_KERNEL: + default_trace_dir = DEFAULT_KERNEL_TRACE_DIR; + break; + case LTTNG_DOMAIN_UST: + default_trace_dir = DEFAULT_UST_TRACE_DIR; + break; + default: + /* + * This case is possible is we try to add the URI to the global tracing + * session consumer object which in this case there is no subdir. + */ + default_trace_dir = ""; + } + + switch (uri->dtype) { + case LTTNG_DST_IPV4: + case LTTNG_DST_IPV6: + DBG2("Setting network URI to consumer"); + + /* Set URI into consumer output object */ + ret = consumer_set_network_uri(consumer, uri); + if (ret < 0) { + ret = LTTCOMM_FATAL; + goto error; + } + + /* On a new subdir, reappend the default trace dir. */ + if (strlen(uri->subdir) != 0) { + strncat(consumer->subdir, default_trace_dir, + sizeof(consumer->subdir)); + } + + break; + case LTTNG_DST_PATH: + DBG2("Setting trace directory path from URI to %s", uri->dst.path); + memset(consumer->dst.trace_path, 0, + sizeof(consumer->dst.trace_path)); + strncpy(consumer->dst.trace_path, uri->dst.path, + sizeof(consumer->dst.trace_path)); + /* Append default trace dir */ + strncat(consumer->dst.trace_path, default_trace_dir, + sizeof(consumer->dst.trace_path)); + /* Flag consumer as local. */ + consumer->type = CONSUMER_DST_LOCAL; + break; + } + +error: + return ret; +} + /* * Command LTTNG_DISABLE_CHANNEL processed by the client thread. */ @@ -3117,8 +3364,7 @@ static int cmd_start_trace(struct ltt_session *session) if (ksession != NULL) { /* Open kernel metadata */ if (ksession->metadata == NULL) { - ret = kernel_open_metadata(ksession, - ksession->consumer->dst.trace_path); + ret = kernel_open_metadata(ksession); if (ret < 0) { ret = LTTCOMM_KERN_META_FAIL; goto error; @@ -3250,38 +3496,102 @@ error: } /* - * Command LTTNG_CREATE_SESSION_URI processed by the client thread. + * Command LTTNG_CREATE_SESSION processed by the client thread. */ -static int cmd_create_session_uri(char *name, struct lttng_uri *ctrl_uri, - struct lttng_uri *data_uri, unsigned int enable_consumer, - lttng_sock_cred *creds) +static int cmd_create_session_uri(char *name, struct lttng_uri *uris, + size_t nb_uri, lttng_sock_cred *creds) { - int ret; - char *path = NULL; + int ret, have_default_name = 0; + char *path = NULL, datetime[16]; struct ltt_session *session; - struct consumer_output *consumer; + struct consumer_output *consumer = NULL; + struct lttng_uri *ctrl_uri, *data_uri = NULL; + time_t rawtime; + struct tm *timeinfo; + + assert(name); + + /* Flag if we have a default session. */ + if (strncmp(name, DEFAULT_SESSION_NAME, + strlen(DEFAULT_SESSION_NAME)) == 0) { + have_default_name = 1; + } else { + /* Get date and time for session path */ + time(&rawtime); + timeinfo = localtime(&rawtime); + strftime(datetime, sizeof(datetime), "%Y%m%d-%H%M%S", timeinfo); + } - /* Verify if the session already exist */ + /* + * Verify if the session already exist + * + * XXX: There is no need for the session lock list here since the caller + * (process_client_msg) is holding it. We might want to change that so a + * single command does not lock the entire session list. + */ session = session_find_by_name(name); if (session != NULL) { ret = LTTCOMM_EXIST_SESS; - goto error; + goto consumer_error; } - /* TODO: validate URIs */ - - /* Create default consumer output */ + /* Create default consumer output for the session not yet created. */ consumer = consumer_create_output(CONSUMER_DST_LOCAL); if (consumer == NULL) { ret = LTTCOMM_FATAL; + goto consumer_error; + } + + /* Add session name and data to the consumer subdir */ + if (have_default_name) { + ret = snprintf(consumer->subdir, sizeof(consumer->subdir), "/%s", + name); + } else { + ret = snprintf(consumer->subdir, sizeof(consumer->subdir), "/%s-%s", + name, datetime); + } + if (ret < 0) { + PERROR("snprintf consumer subdir"); goto error; } - strncpy(consumer->subdir, ctrl_uri->subdir, sizeof(consumer->subdir)); - DBG2("Consumer subdir set to %s", consumer->subdir); + DBG2("Consumer subdir set to '%s'", consumer->subdir); + + /* + * This means that the lttng_create_session call was called with the _path_ + * argument set to NULL. + */ + if (uris == NULL) { + /* + * At this point, we'll skip the consumer URI setup and create a + * session with a NULL path which will flag the session to NOT spawn a + * consumer. + */ + DBG("Create session %s with NO uri, skipping consumer setup", name); + goto skip_consumer; + } + + /* TODO: validate URIs */ + + ctrl_uri = &uris[0]; + if (nb_uri > 1) { + data_uri = &uris[1]; + } + + /* Set subdirectory from the ctrl_uri received. */ + if (strlen(ctrl_uri->subdir) > 0) { + strncpy(consumer->subdir, ctrl_uri->subdir, sizeof(consumer->subdir)); + DBG2("Consumer subdir copy from ctrl_uri '%s'", consumer->subdir); + } switch (ctrl_uri->dtype) { case LTTNG_DST_IPV4: case LTTNG_DST_IPV6: + /* + * We MUST have a data_uri set at this point or else there is a code + * flow error. The caller should check that. + */ + assert(data_uri); + /* Set control URI into consumer output object */ ret = consumer_set_network_uri(consumer, ctrl_uri); if (ret < 0) { @@ -3307,76 +3617,78 @@ static int cmd_create_session_uri(char *name, struct lttng_uri *ctrl_uri, break; } - /* Set if the consumer is enabled or not */ - consumer->enabled = enable_consumer; + consumer->enabled = 1; +skip_consumer: + /* Create tracing session in the registry */ ret = session_create(name, path, LTTNG_SOCK_GET_UID_CRED(creds), LTTNG_SOCK_GET_GID_CRED(creds)); if (ret != LTTCOMM_OK) { - goto consumer_error; + goto error; } - /* Get the newly created session pointer back */ + /* + * Get the newly created session pointer back + * + * XXX: There is no need for the session lock list here since the caller + * (process_client_msg) is holding it. We might want to change that so a + * single command does not lock the entire session list. + */ session = session_find_by_name(name); assert(session); /* Assign consumer to session */ session->consumer = consumer; + /* Set correct path to session */ + if (have_default_name) { + /* We have the default session so the date-time is already appended */ + ret = snprintf(session->path, sizeof(session->path), "%s/%s", + path, name); + } else { + ret = snprintf(session->path, sizeof(session->path), "%s/%s-%s", + path, name, datetime); + } + if (ret < 0) { + PERROR("snprintf session path"); + goto session_error; + } + return LTTCOMM_OK; -consumer_error: - consumer_destroy_output(consumer); +session_error: + session_destroy(session); error: + rcu_read_lock(); + consumer_destroy_output(consumer); + rcu_read_unlock(); +consumer_error: return ret; } /* - * Command LTTNG_CREATE_SESSION processed by the client thread. + * Command LTTNG_DESTROY_SESSION processed by the client thread. */ -static int cmd_create_session(char *name, char *path, lttng_sock_cred *creds) +static int cmd_destroy_session(struct ltt_session *session, char *name) { int ret; - struct lttng_uri uri; - /* Zeroed temporary URI */ - memset(&uri, 0, sizeof(uri)); + /* Safety net */ + assert(session); - uri.dtype = LTTNG_DST_PATH; - uri.utype = LTTNG_URI_DST; - strncpy(uri.dst.path, path, sizeof(uri.dst.path)); + /* Clean kernel session teardown */ + teardown_kernel_session(session); + /* UST session teardown */ + teardown_ust_session(session); - /* TODO: Strip date-time from path and put it in uri's subdir */ - - ret = cmd_create_session_uri(name, &uri, NULL, 1, creds); - if (ret != LTTCOMM_OK) { - goto error; - } - -error: - return ret; -} - -/* - * Command LTTNG_DESTROY_SESSION processed by the client thread. - */ -static int cmd_destroy_session(struct ltt_session *session, char *name) -{ - int ret; - - /* Clean kernel session teardown */ - teardown_kernel_session(session); - /* UST session teardown */ - teardown_ust_session(session); - - /* - * Must notify the kernel thread here to update it's poll setin order - * to remove the channel(s)' fd just destroyed. - */ - ret = notify_thread_pipe(kernel_poll_pipe[1]); - if (ret < 0) { - PERROR("write kernel poll pipe"); - } + /* + * Must notify the kernel thread here to update it's poll setin order + * to remove the channel(s)' fd just destroyed. + */ + ret = notify_thread_pipe(kernel_poll_pipe[1]); + if (ret < 0) { + PERROR("write kernel poll pipe"); + } ret = session_destroy(session); @@ -3433,6 +3745,7 @@ static int cmd_register_consumer(struct ltt_session *session, int domain, char *sock_path) { int ret, sock; + struct consumer_socket *socket; switch (domain) { case LTTNG_DOMAIN_KERNEL: @@ -3448,7 +3761,29 @@ static int cmd_register_consumer(struct ltt_session *session, int domain, goto error; } - session->kernel_session->consumer_fd = sock; + socket = consumer_allocate_socket(sock); + if (socket == NULL) { + ret = LTTCOMM_FATAL; + close(sock); + goto error; + } + + socket->lock = zmalloc(sizeof(pthread_mutex_t)); + if (socket->lock == NULL) { + PERROR("zmalloc pthread mutex"); + ret = LTTCOMM_FATAL; + goto error; + } + pthread_mutex_init(socket->lock, NULL); + + rcu_read_lock(); + consumer_add_socket(socket, session->kernel_session->consumer); + rcu_read_unlock(); + + pthread_mutex_lock(&kconsumer_data.pid_mutex); + kconsumer_data.pid = -1; + pthread_mutex_unlock(&kconsumer_data.pid_mutex); + break; default: /* TODO: Userspace tracing */ @@ -3589,12 +3924,16 @@ error: * Command LTTNG_SET_CONSUMER_URI processed by the client thread. */ static int cmd_set_consumer_uri(int domain, struct ltt_session *session, - struct lttng_uri *uri) + size_t nb_uri, struct lttng_uri *uris) { - int ret; + int ret, i; struct ltt_kernel_session *ksess = session->kernel_session; struct ltt_ust_session *usess = session->ust_session; - struct consumer_output *consumer; + struct consumer_output *consumer = NULL; + + assert(session); + assert(uris); + assert(nb_uri > 0); /* Can't enable consumer after session started. */ if (session->enabled) { @@ -3602,7 +3941,25 @@ static int cmd_set_consumer_uri(int domain, struct ltt_session *session, goto error; } + if (!session->start_consumer) { + ret = LTTCOMM_NO_CONSUMER; + goto error; + } + + /* + * This case switch makes sure the domain session has a temporary consumer + * so the URL can be set. + */ switch (domain) { + case 0: + /* Code flow error. A session MUST always have a consumer object */ + assert(session->consumer); + /* + * The URL will be added to the tracing session consumer instead of a + * specific domain consumer. + */ + consumer = session->consumer; + break; case LTTNG_DOMAIN_KERNEL: /* Code flow error if we don't have a kernel session here. */ assert(ksess); @@ -3615,47 +3972,9 @@ static int cmd_set_consumer_uri(int domain, struct ltt_session *session, ret = LTTCOMM_FATAL; goto error; } - /* Reassign new pointer */ ksess->tmp_consumer = consumer; } - switch (uri->dtype) { - case LTTNG_DST_IPV4: - case LTTNG_DST_IPV6: - DBG2("Setting network URI for kernel session %s", session->name); - - /* Set URI into consumer output object */ - ret = consumer_set_network_uri(consumer, uri); - if (ret < 0) { - ret = LTTCOMM_FATAL; - goto error; - } - - /* On a new subdir, reappend the default trace dir. */ - if (strlen(uri->subdir) != 0) { - strncat(consumer->subdir, DEFAULT_KERNEL_TRACE_DIR, - sizeof(consumer->subdir)); - } - - ret = send_socket_relayd_consumer(domain, session, uri, consumer, - ksess->consumer_fd); - if (ret != LTTCOMM_OK) { - goto error; - } - break; - case LTTNG_DST_PATH: - DBG2("Setting trace directory path from URI to %s", uri->dst.path); - memset(consumer->dst.trace_path, 0, - sizeof(consumer->dst.trace_path)); - strncpy(consumer->dst.trace_path, uri->dst.path, - sizeof(consumer->dst.trace_path)); - /* Append default kernel trace dir */ - strncat(consumer->dst.trace_path, DEFAULT_KERNEL_TRACE_DIR, - sizeof(consumer->dst.trace_path)); - break; - } - - /* All good! */ break; case LTTNG_DOMAIN_UST: /* Code flow error if we don't have a kernel session here. */ @@ -3669,59 +3988,45 @@ static int cmd_set_consumer_uri(int domain, struct ltt_session *session, ret = LTTCOMM_FATAL; goto error; } - /* Reassign new pointer */ usess->tmp_consumer = consumer; } - switch (uri->dtype) { - case LTTNG_DST_IPV4: - case LTTNG_DST_IPV6: - { - DBG2("Setting network URI for UST session %s", session->name); + break; + } - /* Set URI into consumer object */ - ret = consumer_set_network_uri(consumer, uri); - if (ret < 0) { - ret = LTTCOMM_FATAL; - goto error; - } + for (i = 0; i < nb_uri; i++) { + struct consumer_socket *socket; + struct lttng_ht_iter iter; - /* On a new subdir, reappend the default trace dir. */ - if (strlen(uri->subdir) != 0) { - strncat(consumer->subdir, DEFAULT_UST_TRACE_DIR, - sizeof(consumer->subdir)); - } + ret = add_uri_to_consumer(consumer, &uris[i], domain); + if (ret < 0) { + goto error; + } - if (ust_consumerd64_fd >= 0) { - ret = send_socket_relayd_consumer(domain, session, uri, - consumer, ust_consumerd64_fd); - if (ret != LTTCOMM_OK) { - goto error; - } - } + /* + * Don't send relayd socket if URI is NOT remote or if the relayd + * sockets for the session are already sent. + */ + if (uris[i].dtype == LTTNG_DST_PATH || + consumer->dst.net.relayd_socks_sent) { + continue; + } - if (ust_consumerd32_fd >= 0) { - ret = send_socket_relayd_consumer(domain, session, uri, - consumer, ust_consumerd32_fd); - if (ret != LTTCOMM_OK) { - goto error; - } - } + /* Try to send relayd URI to the consumer if exist. */ + cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, + socket, node.node) { - break; - } - case LTTNG_DST_PATH: - DBG2("Setting trace directory path from URI to %s", uri->dst.path); - memset(consumer->dst.trace_path, 0, - sizeof(consumer->dst.trace_path)); - strncpy(consumer->dst.trace_path, uri->dst.path, - sizeof(consumer->dst.trace_path)); - /* Append default UST trace dir */ - strncat(consumer->dst.trace_path, DEFAULT_UST_TRACE_DIR, - sizeof(consumer->dst.trace_path)); - break; + /* A socket in the HT should never have a negative fd */ + assert(socket->fd >= 0); + + pthread_mutex_lock(socket->lock); + ret = send_socket_relayd_consumer(domain, session, &uris[i], + consumer, socket->fd); + pthread_mutex_unlock(socket->lock); + if (ret != LTTCOMM_OK) { + goto error; + } } - break; } /* All good! */ @@ -3741,13 +4046,24 @@ static int cmd_disable_consumer(int domain, struct ltt_session *session) struct ltt_ust_session *usess = session->ust_session; struct consumer_output *consumer; + assert(session); + if (session->enabled) { /* Can't disable consumer on an already started session */ ret = LTTCOMM_TRACE_ALREADY_STARTED; goto error; } + if (!session->start_consumer) { + ret = LTTCOMM_NO_CONSUMER; + goto error; + } + switch (domain) { + case 0: + DBG("Disable tracing session %s consumer", session->name); + consumer = session->consumer; + break; case LTTNG_DOMAIN_KERNEL: /* Code flow error if we don't have a kernel session here. */ assert(ksess); @@ -3769,11 +4085,13 @@ static int cmd_disable_consumer(int domain, struct ltt_session *session) goto error; } - assert(consumer); - consumer->enabled = 0; - - /* Success at this point */ - ret = LTTCOMM_OK; + if (consumer) { + consumer->enabled = 0; + /* Success at this point */ + ret = LTTCOMM_OK; + } else { + ret = LTTCOMM_NO_CONSUMER; + } error: return ret; @@ -3787,7 +4105,9 @@ static int cmd_enable_consumer(int domain, struct ltt_session *session) int ret; struct ltt_kernel_session *ksess = session->kernel_session; struct ltt_ust_session *usess = session->ust_session; - struct consumer_output *tmp_out; + struct consumer_output *consumer = NULL; + + assert(session); /* Can't enable consumer after session started. */ if (session->enabled) { @@ -3795,7 +4115,16 @@ static int cmd_enable_consumer(int domain, struct ltt_session *session) goto error; } + if (!session->start_consumer) { + ret = LTTCOMM_NO_CONSUMER; + goto error; + } + switch (domain) { + case 0: + assert(session->consumer); + consumer = session->consumer; + break; case LTTNG_DOMAIN_KERNEL: /* Code flow error if we don't have a kernel session here. */ assert(ksess); @@ -3810,20 +4139,21 @@ static int cmd_enable_consumer(int domain, struct ltt_session *session) goto error; } - tmp_out = ksess->tmp_consumer; - if (tmp_out == NULL) { + consumer = ksess->tmp_consumer; + if (consumer == NULL) { + ret = LTTCOMM_OK; /* No temp. consumer output exists. Using the current one. */ DBG3("No temporary consumer. Using default"); - ret = LTTCOMM_OK; + consumer = ksess->consumer; goto error; } - switch (tmp_out->type) { + switch (consumer->type) { case CONSUMER_DST_LOCAL: DBG2("Consumer output is local. Creating directory(ies)"); /* Create directory(ies) */ - ret = run_as_mkdir_recursive(tmp_out->dst.trace_path, + ret = run_as_mkdir_recursive(consumer->dst.trace_path, S_IRWXU | S_IRWXG, session->uid, session->gid); if (ret < 0) { if (ret != -EEXIST) { @@ -3836,13 +4166,13 @@ static int cmd_enable_consumer(int domain, struct ltt_session *session) case CONSUMER_DST_NET: DBG2("Consumer output is network. Validating URIs"); /* Validate if we have both control and data path set. */ - if (!tmp_out->dst.net.control_isset) { - ret = LTTCOMM_URI_CTRL_MISS; + if (!consumer->dst.net.control_isset) { + ret = LTTCOMM_URL_CTRL_MISS; goto error; } - if (!tmp_out->dst.net.data_isset) { - ret = LTTCOMM_URI_DATA_MISS; + if (!consumer->dst.net.data_isset) { + ret = LTTCOMM_URL_DATA_MISS; goto error; } @@ -3867,8 +4197,10 @@ static int cmd_enable_consumer(int domain, struct ltt_session *session) * session without this lock hence freeing the consumer output object * is valid. */ + rcu_read_lock(); consumer_destroy_output(ksess->consumer); - ksess->consumer = tmp_out; + rcu_read_unlock(); + ksess->consumer = consumer; ksess->tmp_consumer = NULL; break; @@ -3886,20 +4218,21 @@ static int cmd_enable_consumer(int domain, struct ltt_session *session) goto error; } - tmp_out = usess->tmp_consumer; - if (tmp_out == NULL) { + consumer = usess->tmp_consumer; + if (consumer == NULL) { + ret = LTTCOMM_OK; /* No temp. consumer output exists. Using the current one. */ DBG3("No temporary consumer. Using default"); - ret = LTTCOMM_OK; + consumer = usess->consumer; goto error; } - switch (tmp_out->type) { + switch (consumer->type) { case CONSUMER_DST_LOCAL: DBG2("Consumer output is local. Creating directory(ies)"); /* Create directory(ies) */ - ret = run_as_mkdir_recursive(tmp_out->dst.trace_path, + ret = run_as_mkdir_recursive(consumer->dst.trace_path, S_IRWXU | S_IRWXG, session->uid, session->gid); if (ret < 0) { if (ret != -EEXIST) { @@ -3912,13 +4245,13 @@ static int cmd_enable_consumer(int domain, struct ltt_session *session) case CONSUMER_DST_NET: DBG2("Consumer output is network. Validating URIs"); /* Validate if we have both control and data path set. */ - if (!tmp_out->dst.net.control_isset) { - ret = LTTCOMM_URI_CTRL_MISS; + if (!consumer->dst.net.control_isset) { + ret = LTTCOMM_URL_CTRL_MISS; goto error; } - if (!tmp_out->dst.net.data_isset) { - ret = LTTCOMM_URI_DATA_MISS; + if (!consumer->dst.net.data_isset) { + ret = LTTCOMM_URL_DATA_MISS; goto error; } @@ -3929,7 +4262,7 @@ static int cmd_enable_consumer(int domain, struct ltt_session *session) goto error; } - if (tmp_out->net_seq_index == -1) { + if (consumer->net_seq_index == -1) { ret = LTTCOMM_ENABLE_CONSUMER_FAIL; DBG2("Network index is not set on the consumer"); goto error; @@ -3949,15 +4282,24 @@ static int cmd_enable_consumer(int domain, struct ltt_session *session) * session without this lock hence freeing the consumer output object * is valid. */ + rcu_read_lock(); consumer_destroy_output(usess->consumer); - usess->consumer = tmp_out; + rcu_read_unlock(); + usess->consumer = consumer; usess->tmp_consumer = NULL; break; } - /* Success at this point */ - ret = LTTCOMM_OK; + /* Enable it */ + if (consumer) { + consumer->enabled = 1; + /* Success at this point */ + ret = LTTCOMM_OK; + } else { + /* Should not really happend... */ + ret = LTTCOMM_NO_CONSUMER; + } error: return ret; @@ -3985,7 +4327,6 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, switch (cmd_ctx->lsm->cmd_type) { case LTTNG_CREATE_SESSION: - case LTTNG_CREATE_SESSION_URI: case LTTNG_DESTROY_SESSION: case LTTNG_LIST_SESSIONS: case LTTNG_LIST_DOMAINS: @@ -4032,7 +4373,6 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, /* Commands that DO NOT need a session. */ switch (cmd_ctx->lsm->cmd_type) { case LTTNG_CREATE_SESSION: - case LTTNG_CREATE_SESSION_URI: case LTTNG_CALIBRATE: case LTTNG_LIST_SESSIONS: case LTTNG_LIST_TRACEPOINTS: @@ -4066,6 +4406,7 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, if (!need_domain) { goto skip_domain; } + /* * Check domain type for specific "pre-action". */ @@ -4104,7 +4445,8 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, /* Start the kernel consumer daemon */ pthread_mutex_lock(&kconsumer_data.pid_mutex); if (kconsumer_data.pid == 0 && - cmd_ctx->lsm->cmd_type != LTTNG_REGISTER_CONSUMER) { + cmd_ctx->lsm->cmd_type != LTTNG_REGISTER_CONSUMER && + cmd_ctx->session->start_consumer) { pthread_mutex_unlock(&kconsumer_data.pid_mutex); ret = start_consumerd(&kconsumer_data); if (ret < 0) { @@ -4112,13 +4454,19 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, goto error; } uatomic_set(&kernel_consumerd_state, CONSUMER_STARTED); - - /* Set consumer fd of the session */ - cmd_ctx->session->kernel_session->consumer_fd = - kconsumer_data.cmd_sock; } else { pthread_mutex_unlock(&kconsumer_data.pid_mutex); } + + /* + * The consumer was just spawned so we need to add the socket to + * the consumer output of the session if exist. + */ + ret = consumer_create_socket(&kconsumer_data, + cmd_ctx->session->kernel_session->consumer); + if (ret < 0) { + goto error; + } } break; @@ -4131,6 +4479,7 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, } if (need_tracing_session) { + /* Create UST session if none exist. */ if (cmd_ctx->session->ust_session == NULL) { ret = create_ust_session(cmd_ctx->session, &cmd_ctx->lsm->domain); @@ -4144,37 +4493,60 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, pthread_mutex_lock(&ustconsumer64_data.pid_mutex); if (consumerd64_bin[0] != '\0' && ustconsumer64_data.pid == 0 && - cmd_ctx->lsm->cmd_type != LTTNG_REGISTER_CONSUMER) { + cmd_ctx->lsm->cmd_type != LTTNG_REGISTER_CONSUMER && + cmd_ctx->session->start_consumer) { pthread_mutex_unlock(&ustconsumer64_data.pid_mutex); ret = start_consumerd(&ustconsumer64_data); if (ret < 0) { ret = LTTCOMM_UST_CONSUMER64_FAIL; - ust_consumerd64_fd = -EINVAL; + uatomic_set(&ust_consumerd64_fd, -EINVAL); goto error; } - ust_consumerd64_fd = ustconsumer64_data.cmd_sock; + uatomic_set(&ust_consumerd64_fd, ustconsumer64_data.cmd_sock); uatomic_set(&ust_consumerd_state, CONSUMER_STARTED); } else { pthread_mutex_unlock(&ustconsumer64_data.pid_mutex); } + + /* + * Setup socket for consumer 64 bit. No need for atomic access + * since it was set above and can ONLY be set in this thread. + */ + ret = consumer_create_socket(&ustconsumer64_data, + cmd_ctx->session->ust_session->consumer); + if (ret < 0) { + goto error; + } + /* 32-bit */ if (consumerd32_bin[0] != '\0' && ustconsumer32_data.pid == 0 && - cmd_ctx->lsm->cmd_type != LTTNG_REGISTER_CONSUMER) { + cmd_ctx->lsm->cmd_type != LTTNG_REGISTER_CONSUMER && + cmd_ctx->session->start_consumer) { pthread_mutex_unlock(&ustconsumer32_data.pid_mutex); ret = start_consumerd(&ustconsumer32_data); if (ret < 0) { ret = LTTCOMM_UST_CONSUMER32_FAIL; - ust_consumerd32_fd = -EINVAL; + uatomic_set(&ust_consumerd32_fd, -EINVAL); goto error; } - ust_consumerd32_fd = ustconsumer32_data.cmd_sock; + uatomic_set(&ust_consumerd32_fd, ustconsumer32_data.cmd_sock); uatomic_set(&ust_consumerd_state, CONSUMER_STARTED); } else { pthread_mutex_unlock(&ustconsumer32_data.pid_mutex); } + + /* + * Setup socket for consumer 64 bit. No need for atomic access + * since it was set above and can ONLY be set in this thread. + */ + ret = consumer_create_socket(&ustconsumer32_data, + cmd_ctx->session->ust_session->consumer); + if (ret < 0) { + goto error; + } } break; } @@ -4259,7 +4631,26 @@ skip_domain: } case LTTNG_ENABLE_CONSUMER: { + /* + * XXX: 0 means that this URI should be applied on the session. Should + * be a DOMAIN enuam. + */ ret = cmd_enable_consumer(cmd_ctx->lsm->domain.type, cmd_ctx->session); + if (ret != LTTCOMM_OK) { + goto error; + } + + if (cmd_ctx->lsm->domain.type == 0) { + /* Add the URI for the UST session if a consumer is present. */ + if (cmd_ctx->session->ust_session && + cmd_ctx->session->ust_session->consumer) { + ret = cmd_enable_consumer(LTTNG_DOMAIN_UST, cmd_ctx->session); + } else if (cmd_ctx->session->kernel_session && + cmd_ctx->session->kernel_session->consumer) { + ret = cmd_enable_consumer(LTTNG_DOMAIN_KERNEL, + cmd_ctx->session); + } + } break; } case LTTNG_ENABLE_EVENT: @@ -4313,7 +4704,8 @@ skip_domain: struct lttng_event_field *fields; ssize_t nb_fields; - nb_fields = cmd_list_tracepoint_fields(cmd_ctx->lsm->domain.type, &fields); + nb_fields = cmd_list_tracepoint_fields(cmd_ctx->lsm->domain.type, + &fields); if (nb_fields < 0) { ret = -nb_fields; goto error; @@ -4323,7 +4715,8 @@ skip_domain: * Setup lttng message with payload size set to the event list size in * bytes and then copy list into the llm payload. */ - ret = setup_lttng_msg(cmd_ctx, sizeof(struct lttng_event_field) * nb_fields); + ret = setup_lttng_msg(cmd_ctx, + sizeof(struct lttng_event_field) * nb_fields); if (ret < 0) { free(fields); goto setup_error; @@ -4340,8 +4733,56 @@ skip_domain: } case LTTNG_SET_CONSUMER_URI: { + size_t nb_uri, len; + struct lttng_uri *uris; + + nb_uri = cmd_ctx->lsm->u.uri.size; + len = nb_uri * sizeof(struct lttng_uri); + + if (nb_uri == 0) { + ret = LTTCOMM_INVALID; + goto error; + } + + uris = zmalloc(len); + if (uris == NULL) { + ret = LTTCOMM_FATAL; + goto error; + } + + /* Receive variable len data */ + DBG("Receiving %lu URI(s) from client ...", nb_uri); + ret = lttcomm_recv_unix_sock(sock, uris, len); + if (ret <= 0) { + DBG("No URIs received from client... continuing"); + *sock_error = 1; + ret = LTTCOMM_SESSION_FAIL; + goto error; + } + ret = cmd_set_consumer_uri(cmd_ctx->lsm->domain.type, cmd_ctx->session, - &cmd_ctx->lsm->u.uri); + nb_uri, uris); + if (ret != LTTCOMM_OK) { + goto error; + } + + /* + * XXX: 0 means that this URI should be applied on the session. Should + * be a DOMAIN enuam. + */ + if (cmd_ctx->lsm->domain.type == 0) { + /* Add the URI for the UST session if a consumer is present. */ + if (cmd_ctx->session->ust_session && + cmd_ctx->session->ust_session->consumer) { + ret = cmd_set_consumer_uri(LTTNG_DOMAIN_UST, cmd_ctx->session, + nb_uri, uris); + } else if (cmd_ctx->session->kernel_session && + cmd_ctx->session->kernel_session->consumer) { + ret = cmd_set_consumer_uri(LTTNG_DOMAIN_KERNEL, + cmd_ctx->session, nb_uri, uris); + } + } + break; } case LTTNG_START_TRACE: @@ -4356,26 +4797,47 @@ skip_domain: } case LTTNG_CREATE_SESSION: { - ret = cmd_create_session(cmd_ctx->lsm->session.name, - cmd_ctx->lsm->session.path, &cmd_ctx->creds); - break; - } - case LTTNG_CREATE_SESSION_URI: - { - ret = cmd_create_session_uri(cmd_ctx->lsm->session.name, - &cmd_ctx->lsm->u.create_uri.ctrl_uri, - &cmd_ctx->lsm->u.create_uri.data_uri, - cmd_ctx->lsm->u.create_uri.enable_consumer, &cmd_ctx->creds); + size_t nb_uri, len; + struct lttng_uri *uris = NULL; + + nb_uri = cmd_ctx->lsm->u.uri.size; + len = nb_uri * sizeof(struct lttng_uri); + + if (nb_uri > 0) { + uris = zmalloc(len); + if (uris == NULL) { + ret = LTTCOMM_FATAL; + goto error; + } + + /* Receive variable len data */ + DBG("Waiting for %lu URIs from client ...", nb_uri); + ret = lttcomm_recv_unix_sock(sock, uris, len); + if (ret <= 0) { + DBG("No URIs received from client... continuing"); + *sock_error = 1; + ret = LTTCOMM_SESSION_FAIL; + goto error; + } + + if (nb_uri == 1 && uris[0].dtype != LTTNG_DST_PATH) { + DBG("Creating session with ONE network URI is a bad call"); + ret = LTTCOMM_SESSION_FAIL; + goto error; + } + } + + ret = cmd_create_session_uri(cmd_ctx->lsm->session.name, uris, nb_uri, + &cmd_ctx->creds); + break; } case LTTNG_DESTROY_SESSION: { ret = cmd_destroy_session(cmd_ctx->session, cmd_ctx->lsm->session.name); - /* - * Set session to NULL so we do not unlock it after - * free. - */ + + /* Set session to NULL so we do not unlock it after free. */ cmd_ctx->session = NULL; break; } @@ -4555,13 +5017,194 @@ init_setup_error: return ret; } +/* + * Thread managing health check socket. + */ +static void *thread_manage_health(void *data) +{ + int sock = -1, new_sock = -1, ret, i, pollfd, err = -1; + uint32_t revents, nb_fd; + struct lttng_poll_event events; + struct lttcomm_health_msg msg; + struct lttcomm_health_data reply; + + DBG("[thread] Manage health check started"); + + rcu_register_thread(); + + /* Create unix socket */ + sock = lttcomm_create_unix_sock(health_unix_sock_path); + if (sock < 0) { + ERR("Unable to create health check Unix socket"); + ret = -1; + goto error; + } + + ret = lttcomm_listen_unix_sock(sock); + if (ret < 0) { + goto error; + } + + /* + * Pass 2 as size here for the thread quit pipe and client_sock. Nothing + * more will be added to this poll set. + */ + ret = create_thread_poll_set(&events, 2); + if (ret < 0) { + goto error; + } + + /* Add the application registration socket */ + ret = lttng_poll_add(&events, sock, LPOLLIN | LPOLLPRI); + if (ret < 0) { + goto error; + } + + while (1) { + DBG("Health check ready"); + + nb_fd = LTTNG_POLL_GETNB(&events); + + /* Inifinite blocking call, waiting for transmission */ +restart: + ret = lttng_poll_wait(&events, -1); + if (ret < 0) { + /* + * Restart interrupted system call. + */ + if (errno == EINTR) { + goto restart; + } + goto error; + } + + for (i = 0; i < nb_fd; i++) { + /* Fetch once the poll data */ + revents = LTTNG_POLL_GETEV(&events, i); + pollfd = LTTNG_POLL_GETFD(&events, i); + + /* Thread quit pipe has been closed. Killing thread. */ + ret = check_thread_quit_pipe(pollfd, revents); + if (ret) { + err = 0; + goto exit; + } + + /* Event on the registration socket */ + if (pollfd == sock) { + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("Health socket poll error"); + goto error; + } + } + } + + new_sock = lttcomm_accept_unix_sock(sock); + if (new_sock < 0) { + goto error; + } + + DBG("Receiving data from client for health..."); + ret = lttcomm_recv_unix_sock(new_sock, (void *)&msg, sizeof(msg)); + if (ret <= 0) { + DBG("Nothing recv() from client... continuing"); + ret = close(new_sock); + if (ret) { + PERROR("close"); + } + new_sock = -1; + continue; + } + + rcu_thread_online(); + + switch (msg.component) { + case LTTNG_HEALTH_CMD: + reply.ret_code = health_check_state(&health_thread_cmd); + break; + case LTTNG_HEALTH_APP_MANAGE: + reply.ret_code = health_check_state(&health_thread_app_manage); + break; + case LTTNG_HEALTH_APP_REG: + reply.ret_code = health_check_state(&health_thread_app_reg); + break; + case LTTNG_HEALTH_KERNEL: + reply.ret_code = health_check_state(&health_thread_kernel); + break; + case LTTNG_HEALTH_CONSUMER: + reply.ret_code = check_consumer_health(); + break; + case LTTNG_HEALTH_ALL: + reply.ret_code = + health_check_state(&health_thread_app_manage) && + health_check_state(&health_thread_app_reg) && + health_check_state(&health_thread_cmd) && + health_check_state(&health_thread_kernel) && + check_consumer_health(); + break; + default: + reply.ret_code = LTTCOMM_UND; + break; + } + + /* + * Flip ret value since 0 is a success and 1 indicates a bad health for + * the client where in the sessiond it is the opposite. Again, this is + * just to make things easier for us poor developer which enjoy a lot + * lazyness. + */ + if (reply.ret_code == 0 || reply.ret_code == 1) { + reply.ret_code = !reply.ret_code; + } + + DBG2("Health check return value %d", reply.ret_code); + + ret = send_unix_sock(new_sock, (void *) &reply, sizeof(reply)); + if (ret < 0) { + ERR("Failed to send health data back to client"); + } + + /* End of transmission */ + ret = close(new_sock); + if (ret) { + PERROR("close"); + } + new_sock = -1; + } + +exit: +error: + if (err) { + ERR("Health error occurred in %s", __func__); + } + DBG("Health check thread dying"); + unlink(health_unix_sock_path); + if (sock >= 0) { + ret = close(sock); + if (ret) { + PERROR("close"); + } + } + if (new_sock >= 0) { + ret = close(new_sock); + if (ret) { + PERROR("close"); + } + } + + lttng_poll_clean(&events); + + rcu_unregister_thread(); + return NULL; +} + /* * This thread manage all clients request using the unix client socket for * communication. */ static void *thread_manage_clients(void *data) { - int sock = -1, ret, i, pollfd; + int sock = -1, ret, i, pollfd, err = -1; int sock_error; uint32_t revents, nb_fd; struct command_ctx *cmd_ctx = NULL; @@ -4571,6 +5214,8 @@ static void *thread_manage_clients(void *data) rcu_register_thread(); + health_code_update(&health_thread_cmd); + ret = lttcomm_listen_unix_sock(client_sock); if (ret < 0) { goto error; @@ -4598,6 +5243,8 @@ static void *thread_manage_clients(void *data) kill(ppid, SIGUSR1); } + health_code_update(&health_thread_cmd); + while (1) { DBG("Accepting client command ..."); @@ -4605,7 +5252,9 @@ static void *thread_manage_clients(void *data) /* Inifinite blocking call, waiting for transmission */ restart: + health_poll_update(&health_thread_cmd); ret = lttng_poll_wait(&events, -1); + health_poll_update(&health_thread_cmd); if (ret < 0) { /* * Restart interrupted system call. @@ -4621,10 +5270,13 @@ static void *thread_manage_clients(void *data) revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); + health_code_update(&health_thread_cmd); + /* Thread quit pipe has been closed. Killing thread. */ ret = check_thread_quit_pipe(pollfd, revents); if (ret) { - goto error; + err = 0; + goto exit; } /* Event on the registration socket */ @@ -4638,6 +5290,8 @@ static void *thread_manage_clients(void *data) DBG("Wait for client response"); + health_code_update(&health_thread_cmd); + sock = lttcomm_accept_unix_sock(client_sock); if (sock < 0) { goto error; @@ -4666,6 +5320,8 @@ static void *thread_manage_clients(void *data) cmd_ctx->llm = NULL; cmd_ctx->session = NULL; + health_code_update(&health_thread_cmd); + /* * Data is received from the lttng client. The struct * lttcomm_session_msg (lsm) contains the command and data request of @@ -4685,6 +5341,8 @@ static void *thread_manage_clients(void *data) continue; } + health_code_update(&health_thread_cmd); + // TODO: Validate cmd_ctx including sanity check for // security purpose. @@ -4716,6 +5374,8 @@ static void *thread_manage_clients(void *data) continue; } + health_code_update(&health_thread_cmd); + DBG("Sending response (size: %d, retcode: %s)", cmd_ctx->lttng_msg_size, lttng_strerror(-cmd_ctx->llm->ret_code)); @@ -4732,9 +5392,18 @@ static void *thread_manage_clients(void *data) sock = -1; clean_command_ctx(&cmd_ctx); + + health_code_update(&health_thread_cmd); } +exit: error: + if (err) { + health_error(&health_thread_cmd); + ERR("Health error occurred in %s", __func__); + } + health_exit(&health_thread_cmd); + DBG("Client thread dying"); unlink(client_unix_sock_path); if (client_sock >= 0) { @@ -5283,6 +5952,11 @@ int main(int argc, char **argv) DEFAULT_GLOBAL_APPS_WAIT_SHM_PATH); } + if (strlen(health_unix_sock_path) == 0) { + snprintf(health_unix_sock_path, sizeof(health_unix_sock_path), + DEFAULT_GLOBAL_HEALTH_UNIX_SOCK); + } + /* Setup kernel consumerd path */ snprintf(kconsumer_data.err_unix_sock_path, PATH_MAX, DEFAULT_KCONSUMERD_ERR_SOCK_PATH, rundir); @@ -5333,6 +6007,12 @@ int main(int argc, char **argv) snprintf(wait_shm_path, PATH_MAX, DEFAULT_HOME_APPS_WAIT_SHM_PATH, geteuid()); } + + /* Set health check Unix path */ + if (strlen(health_unix_sock_path) == 0) { + snprintf(health_unix_sock_path, sizeof(health_unix_sock_path), + DEFAULT_HOME_HEALTH_UNIX_SOCK, home_path); + } } /* Set consumer initial state */ @@ -5465,6 +6145,32 @@ int main(int argc, char **argv) */ uatomic_set(&relayd_net_seq_idx, 1); + /* Init all health thread counters. */ + health_init(&health_thread_cmd); + health_init(&health_thread_kernel); + health_init(&health_thread_app_manage); + health_init(&health_thread_app_reg); + + /* + * Init health counters of the consumer thread. We do a quick hack here to + * the state of the consumer health is fine even if the thread is not + * started. This is simply to ease our life and has no cost what so ever. + */ + health_init(&kconsumer_data.health); + health_poll_update(&kconsumer_data.health); + health_init(&ustconsumer32_data.health); + health_poll_update(&ustconsumer32_data.health); + health_init(&ustconsumer64_data.health); + health_poll_update(&ustconsumer64_data.health); + + /* Create thread to manage the client socket */ + ret = pthread_create(&health_thread, NULL, + thread_manage_health, (void *) NULL); + if (ret != 0) { + PERROR("pthread_create health"); + goto exit_health; + } + /* Create thread to manage the client socket */ ret = pthread_create(&client_thread, NULL, thread_manage_clients, (void *) NULL); @@ -5546,6 +6252,7 @@ exit_dispatch: } exit_client: +exit_health: exit: /* * cleanup() is called when no other thread is running.