X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fmain.c;h=550aa1b83f68d65f01e936d1e81fb5308b0b2b4e;hp=b4755b0b8cce0d7eba70d60d4a474345ba2bbd1c;hb=55d097957f5bb8138959ad2202a40d85d49f029e;hpb=bd69add26f7f847b1d23766ec631f8e3fde4ec7c diff --git a/src/bin/lttng-sessiond/main.c b/src/bin/lttng-sessiond/main.c index b4755b0b8..550aa1b83 100644 --- a/src/bin/lttng-sessiond/main.c +++ b/src/bin/lttng-sessiond/main.c @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -37,7 +38,6 @@ #include #include -#include #include #include #include @@ -46,6 +46,7 @@ #include #include "lttng-sessiond.h" +#include "buffer-registry.h" #include "channel.h" #include "cmd.h" #include "consumer.h" @@ -59,20 +60,18 @@ #include "ust-consumer.h" #include "utils.h" #include "fd-limit.h" -#include "filter.h" -#include "health.h" +#include "health-sessiond.h" #include "testpoint.h" +#include "ust-thread.h" #define CONSUMERD_FILE "lttng-consumerd" /* Const values */ -const char default_home_dir[] = DEFAULT_HOME_DIR; const char default_tracing_group[] = DEFAULT_TRACING_GROUP; -const char default_ust_sock_dir[] = DEFAULT_UST_SOCK_DIR; -const char default_global_apps_pipe[] = DEFAULT_GLOBAL_APPS_PIPE; const char *progname; const char *opt_tracing_group; +static const char *opt_pidfile; static int opt_sig_parent; static int opt_verbose_consumer; static int opt_daemon; @@ -149,13 +148,17 @@ static int thread_quit_pipe[2] = { -1, -1 }; */ static int apps_cmd_pipe[2] = { -1, -1 }; +int apps_cmd_notify_pipe[2] = { -1, -1 }; + /* Pthread, Mutexes and Semaphores */ static pthread_t apps_thread; +static pthread_t apps_notify_thread; static pthread_t reg_apps_thread; static pthread_t client_thread; static pthread_t kernel_thread; static pthread_t dispatch_thread; static pthread_t health_thread; +static pthread_t ht_cleanup_thread; /* * UST registration command queue. This queue is tied with a futex and uses a N @@ -222,11 +225,16 @@ enum consumerd_state { static enum consumerd_state ust_consumerd_state; static enum consumerd_state kernel_consumerd_state; -/* Used for the health monitoring of the session daemon. See health.h */ -struct health_state health_thread_cmd; -struct health_state health_thread_app_manage; -struct health_state health_thread_app_reg; -struct health_state health_thread_kernel; +/* + * Socket timeout for receiving and sending in seconds. + */ +static int app_socket_timeout; + +/* Set in main() with the current page size. */ +long page_size; + +/* Application health monitoring */ +struct health_app *health_sessiond; static void setup_consumerd_path(void) @@ -280,15 +288,11 @@ void setup_consumerd_path(void) /* * Create a poll set with O_CLOEXEC and add the thread quit pipe to the set. */ -static int create_thread_poll_set(struct lttng_poll_event *events, - unsigned int size) +int sessiond_set_thread_pollset(struct lttng_poll_event *events, size_t size) { int ret; - if (events == NULL || size == 0) { - ret = -1; - goto error; - } + assert(events); ret = lttng_poll_create(events, size, LTTNG_CLOEXEC); if (ret < 0) { @@ -296,7 +300,7 @@ static int create_thread_poll_set(struct lttng_poll_event *events, } /* Add quit pipe */ - ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN); + ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN | LPOLLERR); if (ret < 0) { goto error; } @@ -312,7 +316,7 @@ error: * * Return 1 if it was triggered else 0; */ -static int check_thread_quit_pipe(int fd, uint32_t events) +int sessiond_check_thread_quit_pipe(int fd, uint32_t events) { if (fd == thread_quit_pipe[0] && (events & LPOLLIN)) { return 1; @@ -386,20 +390,79 @@ static void stop_threads(void) futex_nto1_wake(&ust_cmd_queue.futex); } +/* + * Close every consumer sockets. + */ +static void close_consumer_sockets(void) +{ + int ret; + + if (kconsumer_data.err_sock >= 0) { + ret = close(kconsumer_data.err_sock); + if (ret < 0) { + PERROR("kernel consumer err_sock close"); + } + } + if (ustconsumer32_data.err_sock >= 0) { + ret = close(ustconsumer32_data.err_sock); + if (ret < 0) { + PERROR("UST consumerd32 err_sock close"); + } + } + if (ustconsumer64_data.err_sock >= 0) { + ret = close(ustconsumer64_data.err_sock); + if (ret < 0) { + PERROR("UST consumerd64 err_sock close"); + } + } + if (kconsumer_data.cmd_sock >= 0) { + ret = close(kconsumer_data.cmd_sock); + if (ret < 0) { + PERROR("kernel consumer cmd_sock close"); + } + } + if (ustconsumer32_data.cmd_sock >= 0) { + ret = close(ustconsumer32_data.cmd_sock); + if (ret < 0) { + PERROR("UST consumerd32 cmd_sock close"); + } + } + if (ustconsumer64_data.cmd_sock >= 0) { + ret = close(ustconsumer64_data.cmd_sock); + if (ret < 0) { + PERROR("UST consumerd64 cmd_sock close"); + } + } +} + /* * Cleanup the daemon */ static void cleanup(void) { int ret; - char *cmd; + char *cmd = NULL; struct ltt_session *sess, *stmp; DBG("Cleaning up"); - /* First thing first, stop all threads */ + /* + * Close the thread quit pipe. It has already done its job, + * since we are now called. + */ utils_close_pipe(thread_quit_pipe); + /* + * If opt_pidfile is undefined, the default file will be wiped when + * removing the rundir. + */ + if (opt_pidfile) { + ret = remove(opt_pidfile); + if (ret < 0) { + PERROR("remove pidfile %s", opt_pidfile); + } + } + DBG("Removing %s directory", rundir); ret = asprintf(&cmd, "rm -rf %s", rundir); if (ret < 0) { @@ -429,6 +492,7 @@ static void cleanup(void) DBG("Closing all UST sockets"); ust_app_clean_list(); + buffer_reg_destroy_registries(); if (is_root && !opt_no_kernel) { DBG2("Closing kernel fd"); @@ -442,8 +506,7 @@ static void cleanup(void) modprobe_remove_lttng_all(); } - utils_close_pipe(kernel_poll_pipe); - utils_close_pipe(apps_cmd_pipe); + close_consumer_sockets(); /* */ DBG("%c[%d;%dm*** assert failed :-) *** ==> %c[%dm%c[%d;%dm" @@ -460,7 +523,7 @@ static void cleanup(void) static int send_unix_sock(int sock, void *buf, size_t len) { /* Check valid length */ - if (len <= 0) { + if (len == 0) { return -1; } @@ -615,6 +678,8 @@ static int update_kernel_stream(struct consumer_data *consumer_data, int fd) if (ret < 0) { goto error; } + /* Update the stream global counter */ + ksess->stream_count_global += ret; /* * Have we already sent fds to the consumer? If yes, it means @@ -625,20 +690,20 @@ static int update_kernel_stream(struct consumer_data *consumer_data, int fd) struct lttng_ht_iter iter; struct consumer_socket *socket; - + rcu_read_lock(); cds_lfht_for_each_entry(ksess->consumer->socks->ht, &iter.iter, socket, node.node) { - /* Code flow error */ - assert(socket->fd >= 0); - pthread_mutex_lock(socket->lock); - ret = kernel_consumer_send_channel_stream(socket->fd, - channel, ksess); + ret = kernel_consumer_send_channel_stream(socket, + channel, ksess, + session->output_traces ? 1 : 0); pthread_mutex_unlock(socket->lock); if (ret < 0) { + rcu_read_unlock(); goto error; } } + rcu_read_unlock(); } goto error; } @@ -655,13 +720,18 @@ error: } /* - * For each tracing session, update newly registered apps. + * For each tracing session, update newly registered apps. The session list + * lock MUST be acquired before calling this. */ static void update_ust_app(int app_sock) { struct ltt_session *sess, *stmp; - session_lock_list(); + /* Consumer is in an ERROR state. Stop any application update. */ + if (uatomic_read(&ust_consumerd_state) == CONSUMER_ERROR) { + /* Stop the update process since the consumer is dead. */ + return; + } /* For all tracing session(s) */ cds_list_for_each_entry_safe(sess, stmp, &session_list_ptr->head, list) { @@ -671,8 +741,6 @@ static void update_ust_app(int app_sock) } session_unlock(sess); } - - session_unlock_list(); } /* @@ -688,34 +756,44 @@ static void *thread_manage_kernel(void *data) char tmp; struct lttng_poll_event events; - DBG("Thread manage kernel started"); + DBG("[thread] Thread manage kernel started"); - testpoint(thread_manage_kernel); + health_register(health_sessiond, HEALTH_TYPE_KERNEL); - health_code_update(&health_thread_kernel); - - testpoint(thread_manage_kernel_before_loop); + /* + * This first step of the while is to clean this structure which could free + * non NULL pointers so initialize it before the loop. + */ + lttng_poll_init(&events); - ret = create_thread_poll_set(&events, 2); - if (ret < 0) { - goto error_poll_create; + if (testpoint(thread_manage_kernel)) { + goto error_testpoint; } - ret = lttng_poll_add(&events, kernel_poll_pipe[0], LPOLLIN); - if (ret < 0) { - goto error; + health_code_update(); + + if (testpoint(thread_manage_kernel_before_loop)) { + goto error_testpoint; } while (1) { - health_code_update(&health_thread_kernel); + health_code_update(); if (update_poll_flag == 1) { - /* - * Reset number of fd in the poll set. Always 2 since there is the thread - * quit pipe and the kernel pipe. - */ - events.nb_fd = 2; + /* Clean events object. We are about to populate it again. */ + lttng_poll_clean(&events); + + ret = sessiond_set_thread_pollset(&events, 2); + if (ret < 0) { + goto error_poll_create; + } + ret = lttng_poll_add(&events, kernel_poll_pipe[0], LPOLLIN); + if (ret < 0) { + goto error; + } + + /* This will add the available kernel channel if any. */ ret = update_kernel_poll(&events); if (ret < 0) { goto error; @@ -723,18 +801,13 @@ static void *thread_manage_kernel(void *data) update_poll_flag = 0; } - nb_fd = LTTNG_POLL_GETNB(&events); - - DBG("Thread kernel polling on %d fds", nb_fd); - - /* Zeroed the poll events */ - lttng_poll_reset(&events); + DBG("Thread kernel polling on %d fds", LTTNG_POLL_GETNB(&events)); /* Poll infinite value of time */ restart: - health_poll_update(&health_thread_kernel); + health_poll_entry(); ret = lttng_poll_wait(&events, -1); - health_poll_update(&health_thread_kernel); + health_poll_exit(); if (ret < 0) { /* * Restart interrupted system call. @@ -750,15 +823,17 @@ static void *thread_manage_kernel(void *data) continue; } + nb_fd = ret; + for (i = 0; i < nb_fd; i++) { /* Fetch once the poll data */ revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); - health_code_update(&health_thread_kernel); + health_code_update(); /* Thread quit pipe has been closed. Killing thread. */ - ret = check_thread_quit_pipe(pollfd, revents); + ret = sessiond_check_thread_quit_pipe(pollfd, revents); if (ret) { err = 0; goto exit; @@ -766,7 +841,13 @@ static void *thread_manage_kernel(void *data) /* Check for data on kernel pipe */ if (pollfd == kernel_poll_pipe[0] && (revents & LPOLLIN)) { - ret = read(kernel_poll_pipe[0], &tmp, 1); + do { + ret = read(kernel_poll_pipe[0], &tmp, 1); + } while (ret < 0 && errno == EINTR); + /* + * Ret value is useless here, if this pipe gets any actions an + * update is required anyway. + */ update_poll_flag = 1; continue; } else { @@ -793,11 +874,16 @@ exit: error: lttng_poll_clean(&events); error_poll_create: +error_testpoint: + utils_close_pipe(kernel_poll_pipe); + kernel_poll_pipe[0] = kernel_poll_pipe[1] = -1; if (err) { - health_error(&health_thread_kernel); + health_error(); ERR("Health error occurred in %s", __func__); + WARN("Kernel thread died unexpectedly. " + "Kernel tracing can continue but CPU hotplug is disabled."); } - health_exit(&health_thread_kernel); + health_unregister(health_sessiond); DBG("Kernel thread dying"); return NULL; } @@ -838,39 +924,41 @@ static void *thread_manage_consumer(void *data) DBG("[thread] Manage consumer started"); - health_code_update(&consumer_data->health); + health_register(health_sessiond, HEALTH_TYPE_CONSUMER); - ret = lttcomm_listen_unix_sock(consumer_data->err_sock); - if (ret < 0) { - goto error_listen; - } + health_code_update(); /* - * Pass 2 as size here for the thread quit pipe and kconsumerd_err_sock. - * Nothing more will be added to this poll set. + * Pass 3 as size here for the thread quit pipe, consumerd_err_sock and the + * metadata_sock. Nothing more will be added to this poll set. */ - ret = create_thread_poll_set(&events, 2); + ret = sessiond_set_thread_pollset(&events, 3); if (ret < 0) { goto error_poll; } + /* + * The error socket here is already in a listening state which was done + * just before spawning this thread to avoid a race between the consumer + * daemon exec trying to connect and the listen() call. + */ ret = lttng_poll_add(&events, consumer_data->err_sock, LPOLLIN | LPOLLRDHUP); if (ret < 0) { goto error; } - nb_fd = LTTNG_POLL_GETNB(&events); + health_code_update(); - health_code_update(&consumer_data->health); - - /* Inifinite blocking call, waiting for transmission */ + /* Infinite blocking call, waiting for transmission */ restart: - health_poll_update(&consumer_data->health); + health_poll_entry(); - testpoint(thread_manage_consumer); + if (testpoint(thread_manage_consumer)) { + goto error; + } ret = lttng_poll_wait(&events, -1); - health_poll_update(&consumer_data->health); + health_poll_exit(); if (ret < 0) { /* * Restart interrupted system call. @@ -881,15 +969,17 @@ restart: goto error; } + nb_fd = ret; + for (i = 0; i < nb_fd; i++) { /* Fetch once the poll data */ revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); - health_code_update(&consumer_data->health); + health_code_update(); /* Thread quit pipe has been closed. Killing thread. */ - ret = check_thread_quit_pipe(pollfd, revents); + ret = sessiond_check_thread_quit_pipe(pollfd, revents); if (ret) { err = 0; goto exit; @@ -909,7 +999,13 @@ restart: goto error; } - health_code_update(&consumer_data->health); + /* + * Set the CLOEXEC flag. Return code is useless because either way, the + * show must go on. + */ + (void) utils_set_fd_cloexec(sock); + + health_code_update(); DBG2("Receiving code from consumer err_sock"); @@ -920,93 +1016,139 @@ restart: goto error; } - health_code_update(&consumer_data->health); + health_code_update(); if (code == LTTCOMM_CONSUMERD_COMMAND_SOCK_READY) { + /* Connect both socket, command and metadata. */ consumer_data->cmd_sock = lttcomm_connect_unix_sock(consumer_data->cmd_unix_sock_path); - if (consumer_data->cmd_sock < 0) { + consumer_data->metadata_fd = + lttcomm_connect_unix_sock(consumer_data->cmd_unix_sock_path); + if (consumer_data->cmd_sock < 0 + || consumer_data->metadata_fd < 0) { + PERROR("consumer connect cmd socket"); /* On error, signal condition and quit. */ signal_consumer_condition(consumer_data, -1); - PERROR("consumer connect"); goto error; } + consumer_data->metadata_sock.fd_ptr = &consumer_data->metadata_fd; + /* Create metadata socket lock. */ + consumer_data->metadata_sock.lock = zmalloc(sizeof(pthread_mutex_t)); + if (consumer_data->metadata_sock.lock == NULL) { + PERROR("zmalloc pthread mutex"); + ret = -1; + goto error; + } + pthread_mutex_init(consumer_data->metadata_sock.lock, NULL); + signal_consumer_condition(consumer_data, 1); - DBG("Consumer command socket ready"); + DBG("Consumer command socket ready (fd: %d", consumer_data->cmd_sock); + DBG("Consumer metadata socket ready (fd: %d)", + consumer_data->metadata_fd); } else { ERR("consumer error when waiting for SOCK_READY : %s", lttcomm_get_readable_code(-code)); goto error; } - /* Remove the kconsumerd error sock since we've established a connexion */ + /* Remove the consumerd error sock since we've established a connexion */ ret = lttng_poll_del(&events, consumer_data->err_sock); if (ret < 0) { goto error; } + /* Add new accepted error socket. */ ret = lttng_poll_add(&events, sock, LPOLLIN | LPOLLRDHUP); if (ret < 0) { goto error; } - health_code_update(&consumer_data->health); + /* Add metadata socket that is successfully connected. */ + ret = lttng_poll_add(&events, consumer_data->metadata_fd, + LPOLLIN | LPOLLRDHUP); + if (ret < 0) { + goto error; + } - /* Update number of fd */ - nb_fd = LTTNG_POLL_GETNB(&events); + health_code_update(); - /* Inifinite blocking call, waiting for transmission */ + /* Infinite blocking call, waiting for transmission */ restart_poll: - health_poll_update(&consumer_data->health); - ret = lttng_poll_wait(&events, -1); - health_poll_update(&consumer_data->health); - if (ret < 0) { - /* - * Restart interrupted system call. - */ - if (errno == EINTR) { - goto restart_poll; + while (1) { + health_poll_entry(); + ret = lttng_poll_wait(&events, -1); + health_poll_exit(); + if (ret < 0) { + /* + * Restart interrupted system call. + */ + if (errno == EINTR) { + goto restart_poll; + } + goto error; } - goto error; - } - for (i = 0; i < nb_fd; i++) { - /* Fetch once the poll data */ - revents = LTTNG_POLL_GETEV(&events, i); - pollfd = LTTNG_POLL_GETFD(&events, i); + nb_fd = ret; - health_code_update(&consumer_data->health); + for (i = 0; i < nb_fd; i++) { + /* Fetch once the poll data */ + revents = LTTNG_POLL_GETEV(&events, i); + pollfd = LTTNG_POLL_GETFD(&events, i); - /* Thread quit pipe has been closed. Killing thread. */ - ret = check_thread_quit_pipe(pollfd, revents); - if (ret) { - err = 0; - goto exit; - } + health_code_update(); - /* Event on the kconsumerd socket */ - if (pollfd == sock) { - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - ERR("consumer err socket second poll error"); - goto error; + /* Thread quit pipe has been closed. Killing thread. */ + ret = sessiond_check_thread_quit_pipe(pollfd, revents); + if (ret) { + err = 0; + goto exit; } - } - } - health_code_update(&consumer_data->health); + if (pollfd == sock) { + /* Event on the consumerd socket */ + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("consumer err socket second poll error"); + goto error; + } + health_code_update(); + /* Wait for any kconsumerd error */ + ret = lttcomm_recv_unix_sock(sock, &code, + sizeof(enum lttcomm_return_code)); + if (ret <= 0) { + ERR("consumer closed the command socket"); + goto error; + } - /* Wait for any kconsumerd error */ - ret = lttcomm_recv_unix_sock(sock, &code, - sizeof(enum lttcomm_return_code)); - if (ret <= 0) { - ERR("consumer closed the command socket"); - goto error; - } + ERR("consumer return code : %s", + lttcomm_get_readable_code(-code)); - ERR("consumer return code : %s", lttcomm_get_readable_code(-code)); + goto exit; + } else if (pollfd == consumer_data->metadata_fd) { + /* UST metadata requests */ + ret = ust_consumer_metadata_request( + &consumer_data->metadata_sock); + if (ret < 0) { + ERR("Handling metadata request"); + goto error; + } + break; + } else { + ERR("Unknown pollfd"); + goto error; + } + } + health_code_update(); + } exit: error: + /* + * We lock here because we are about to close the sockets and some other + * thread might be using them so get exclusive access which will abort all + * other consumer command by other threads. + */ + pthread_mutex_lock(&consumer_data->lock); + /* Immediately set the consumerd state to stopped */ if (consumer_data->type == LTTNG_CONSUMER_KERNEL) { uatomic_set(&kernel_consumerd_state, CONSUMER_ERROR); @@ -1023,13 +1165,22 @@ error: if (ret) { PERROR("close"); } + consumer_data->err_sock = -1; } if (consumer_data->cmd_sock >= 0) { ret = close(consumer_data->cmd_sock); if (ret) { PERROR("close"); } + consumer_data->cmd_sock = -1; } + if (*consumer_data->metadata_sock.fd_ptr >= 0) { + ret = close(*consumer_data->metadata_sock.fd_ptr); + if (ret) { + PERROR("close"); + } + } + if (sock >= 0) { ret = close(sock); if (ret) { @@ -1040,15 +1191,19 @@ error: unlink(consumer_data->err_unix_sock_path); unlink(consumer_data->cmd_unix_sock_path); consumer_data->pid = 0; + pthread_mutex_unlock(&consumer_data->lock); + + /* Cleanup metadata socket mutex. */ + pthread_mutex_destroy(consumer_data->metadata_sock.lock); + free(consumer_data->metadata_sock.lock); lttng_poll_clean(&events); error_poll: -error_listen: if (err) { - health_error(&consumer_data->health); + health_error(); ERR("Health error occurred in %s", __func__); } - health_exit(&consumer_data->health); + health_unregister(health_sessiond); DBG("consumer thread cleanup completed"); return NULL; @@ -1061,19 +1216,22 @@ static void *thread_manage_apps(void *data) { int i, ret, pollfd, err = -1; uint32_t revents, nb_fd; - struct ust_command ust_cmd; struct lttng_poll_event events; DBG("[thread] Manage application started"); - testpoint(thread_manage_apps); - rcu_register_thread(); rcu_thread_online(); - health_code_update(&health_thread_app_manage); + health_register(health_sessiond, HEALTH_TYPE_APP_MANAGE); - ret = create_thread_poll_set(&events, 2); + if (testpoint(thread_manage_apps)) { + goto error_testpoint; + } + + health_code_update(); + + ret = sessiond_set_thread_pollset(&events, 2); if (ret < 0) { goto error_poll_create; } @@ -1083,23 +1241,20 @@ static void *thread_manage_apps(void *data) goto error; } - testpoint(thread_manage_apps_before_loop); + if (testpoint(thread_manage_apps_before_loop)) { + goto error; + } - health_code_update(&health_thread_app_manage); + health_code_update(); while (1) { - /* Zeroed the events structure */ - lttng_poll_reset(&events); - - nb_fd = LTTNG_POLL_GETNB(&events); - - DBG("Apps thread polling on %d fds", nb_fd); + DBG("Apps thread polling on %d fds", LTTNG_POLL_GETNB(&events)); /* Inifinite blocking call, waiting for transmission */ restart: - health_poll_update(&health_thread_app_manage); + health_poll_entry(); ret = lttng_poll_wait(&events, -1); - health_poll_update(&health_thread_app_manage); + health_poll_exit(); if (ret < 0) { /* * Restart interrupted system call. @@ -1110,15 +1265,17 @@ static void *thread_manage_apps(void *data) goto error; } + nb_fd = ret; + for (i = 0; i < nb_fd; i++) { /* Fetch once the poll data */ revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); - health_code_update(&health_thread_app_manage); + health_code_update(); /* Thread quit pipe has been closed. Killing thread. */ - ret = check_thread_quit_pipe(pollfd, revents); + ret = sessiond_check_thread_quit_pipe(pollfd, revents); if (ret) { err = 0; goto exit; @@ -1130,64 +1287,45 @@ static void *thread_manage_apps(void *data) ERR("Apps command pipe error"); goto error; } else if (revents & LPOLLIN) { + int sock; + /* Empty pipe */ - ret = read(apps_cmd_pipe[0], &ust_cmd, sizeof(ust_cmd)); - if (ret < 0 || ret < sizeof(ust_cmd)) { + do { + ret = read(apps_cmd_pipe[0], &sock, sizeof(sock)); + } while (ret < 0 && errno == EINTR); + if (ret < 0 || ret < sizeof(sock)) { PERROR("read apps cmd pipe"); goto error; } - health_code_update(&health_thread_app_manage); + health_code_update(); - /* Register applicaton to the session daemon */ - ret = ust_app_register(&ust_cmd.reg_msg, - ust_cmd.sock); - if (ret == -ENOMEM) { + /* + * We only monitor the error events of the socket. This + * thread does not handle any incoming data from UST + * (POLLIN). + */ + ret = lttng_poll_add(&events, sock, + LPOLLERR | LPOLLHUP | LPOLLRDHUP); + if (ret < 0) { goto error; - } else if (ret < 0) { - break; } - health_code_update(&health_thread_app_manage); - /* - * Validate UST version compatibility. + * Set socket timeout for both receiving and ending. + * app_socket_timeout is in seconds, whereas + * lttcomm_setsockopt_rcv_timeout and + * lttcomm_setsockopt_snd_timeout expect msec as + * parameter. */ - ret = ust_app_validate_version(ust_cmd.sock); - if (ret >= 0) { - /* - * Add channel(s) and event(s) to newly registered apps - * from lttng global UST domain. - */ - update_ust_app(ust_cmd.sock); - } - - health_code_update(&health_thread_app_manage); - - ret = ust_app_register_done(ust_cmd.sock); - if (ret < 0) { - /* - * If the registration is not possible, we simply - * unregister the apps and continue - */ - ust_app_unregister(ust_cmd.sock); - } else { - /* - * We just need here to monitor the close of the UST - * socket and poll set monitor those by default. - * Listen on POLLIN (even if we never expect any - * data) to ensure that hangup wakes us. - */ - ret = lttng_poll_add(&events, ust_cmd.sock, LPOLLIN); - if (ret < 0) { - goto error; - } + (void) lttcomm_setsockopt_rcv_timeout(sock, + app_socket_timeout * 1000); + (void) lttcomm_setsockopt_snd_timeout(sock, + app_socket_timeout * 1000); - DBG("Apps with sock %d added to poll set", - ust_cmd.sock); - } + DBG("Apps with sock %d added to poll set", sock); - health_code_update(&health_thread_app_manage); + health_code_update(); break; } @@ -1209,7 +1347,7 @@ static void *thread_manage_apps(void *data) } } - health_code_update(&health_thread_app_manage); + health_code_update(); } } @@ -1217,34 +1355,192 @@ exit: error: lttng_poll_clean(&events); error_poll_create: +error_testpoint: + utils_close_pipe(apps_cmd_pipe); + apps_cmd_pipe[0] = apps_cmd_pipe[1] = -1; + + /* + * We don't clean the UST app hash table here since already registered + * applications can still be controlled so let them be until the session + * daemon dies or the applications stop. + */ + if (err) { - health_error(&health_thread_app_manage); + health_error(); ERR("Health error occurred in %s", __func__); } - health_exit(&health_thread_app_manage); + health_unregister(health_sessiond); DBG("Application communication apps thread cleanup complete"); rcu_thread_offline(); rcu_unregister_thread(); return NULL; } +/* + * Send a socket to a thread This is called from the dispatch UST registration + * thread once all sockets are set for the application. + * + * The sock value can be invalid, we don't really care, the thread will handle + * it and make the necessary cleanup if so. + * + * On success, return 0 else a negative value being the errno message of the + * write(). + */ +static int send_socket_to_thread(int fd, int sock) +{ + int ret; + + /* + * It's possible that the FD is set as invalid with -1 concurrently just + * before calling this function being a shutdown state of the thread. + */ + if (fd < 0) { + ret = -EBADF; + goto error; + } + + do { + ret = write(fd, &sock, sizeof(sock)); + } while (ret < 0 && errno == EINTR); + if (ret < 0 || ret != sizeof(sock)) { + PERROR("write apps pipe %d", fd); + if (ret < 0) { + ret = -errno; + } + goto error; + } + + /* All good. Don't send back the write positive ret value. */ + ret = 0; +error: + return ret; +} + +/* + * Sanitize the wait queue of the dispatch registration thread meaning removing + * invalid nodes from it. This is to avoid memory leaks for the case the UST + * notify socket is never received. + */ +static void sanitize_wait_queue(struct ust_reg_wait_queue *wait_queue) +{ + int ret, nb_fd = 0, i; + unsigned int fd_added = 0; + struct lttng_poll_event events; + struct ust_reg_wait_node *wait_node = NULL, *tmp_wait_node; + + assert(wait_queue); + + lttng_poll_init(&events); + + /* Just skip everything for an empty queue. */ + if (!wait_queue->count) { + goto end; + } + + ret = lttng_poll_create(&events, wait_queue->count, LTTNG_CLOEXEC); + if (ret < 0) { + goto error_create; + } + + cds_list_for_each_entry_safe(wait_node, tmp_wait_node, + &wait_queue->head, head) { + assert(wait_node->app); + ret = lttng_poll_add(&events, wait_node->app->sock, + LPOLLHUP | LPOLLERR); + if (ret < 0) { + goto error; + } + + fd_added = 1; + } + + if (!fd_added) { + goto end; + } + + /* + * Poll but don't block so we can quickly identify the faulty events and + * clean them afterwards from the wait queue. + */ + ret = lttng_poll_wait(&events, 0); + if (ret < 0) { + goto error; + } + nb_fd = ret; + + for (i = 0; i < nb_fd; i++) { + /* Get faulty FD. */ + uint32_t revents = LTTNG_POLL_GETEV(&events, i); + int pollfd = LTTNG_POLL_GETFD(&events, i); + + cds_list_for_each_entry_safe(wait_node, tmp_wait_node, + &wait_queue->head, head) { + if (pollfd == wait_node->app->sock && + (revents & (LPOLLHUP | LPOLLERR))) { + cds_list_del(&wait_node->head); + wait_queue->count--; + ust_app_destroy(wait_node->app); + free(wait_node); + break; + } + } + } + + if (nb_fd > 0) { + DBG("Wait queue sanitized, %d node were cleaned up", nb_fd); + } + +end: + lttng_poll_clean(&events); + return; + +error: + lttng_poll_clean(&events); +error_create: + ERR("Unable to sanitize wait queue"); + return; +} + /* * Dispatch request from the registration threads to the application * communication thread. */ static void *thread_dispatch_ust_registration(void *data) { - int ret; + int ret, err = -1; struct cds_wfq_node *node; struct ust_command *ust_cmd = NULL; + struct ust_reg_wait_node *wait_node = NULL, *tmp_wait_node; + struct ust_reg_wait_queue wait_queue = { + .count = 0, + }; + + health_register(health_sessiond, HEALTH_TYPE_APP_REG_DISPATCH); + + health_code_update(); + + CDS_INIT_LIST_HEAD(&wait_queue.head); DBG("[thread] Dispatch UST command started"); while (!CMM_LOAD_SHARED(dispatch_thread_exit)) { + health_code_update(); + /* Atomically prepare the queue futex */ futex_nto1_prepare(&ust_cmd_queue.futex); do { + struct ust_app *app = NULL; + ust_cmd = NULL; + + /* + * Make sure we don't have node(s) that have hung up before receiving + * the notify socket. This is to clean the list in order to avoid + * memory leaks from notify socket that are never seen. + */ + sanitize_wait_queue(&wait_queue); + + health_code_update(); /* Dequeue command for registration */ node = cds_wfq_dequeue_blocking(&ust_cmd_queue.queue); if (node == NULL) { @@ -1261,34 +1557,176 @@ static void *thread_dispatch_ust_registration(void *data) ust_cmd->reg_msg.uid, ust_cmd->reg_msg.gid, ust_cmd->sock, ust_cmd->reg_msg.name, ust_cmd->reg_msg.major, ust_cmd->reg_msg.minor); - /* - * Inform apps thread of the new application registration. This - * call is blocking so we can be assured that the data will be read - * at some point in time or wait to the end of the world :) - */ - ret = write(apps_cmd_pipe[1], ust_cmd, - sizeof(struct ust_command)); - if (ret < 0) { - PERROR("write apps cmd pipe"); - if (errno == EBADF) { + + if (ust_cmd->reg_msg.type == USTCTL_SOCKET_CMD) { + wait_node = zmalloc(sizeof(*wait_node)); + if (!wait_node) { + PERROR("zmalloc wait_node dispatch"); + ret = close(ust_cmd->sock); + if (ret < 0) { + PERROR("close ust sock dispatch %d", ust_cmd->sock); + } + lttng_fd_put(1, LTTNG_FD_APPS); + free(ust_cmd); + goto error; + } + CDS_INIT_LIST_HEAD(&wait_node->head); + + /* Create application object if socket is CMD. */ + wait_node->app = ust_app_create(&ust_cmd->reg_msg, + ust_cmd->sock); + if (!wait_node->app) { + ret = close(ust_cmd->sock); + if (ret < 0) { + PERROR("close ust sock dispatch %d", ust_cmd->sock); + } + lttng_fd_put(1, LTTNG_FD_APPS); + free(wait_node); + free(ust_cmd); + continue; + } + /* + * Add application to the wait queue so we can set the notify + * socket before putting this object in the global ht. + */ + cds_list_add(&wait_node->head, &wait_queue.head); + wait_queue.count++; + + free(ust_cmd); + /* + * We have to continue here since we don't have the notify + * socket and the application MUST be added to the hash table + * only at that moment. + */ + continue; + } else { + /* + * Look for the application in the local wait queue and set the + * notify socket if found. + */ + cds_list_for_each_entry_safe(wait_node, tmp_wait_node, + &wait_queue.head, head) { + health_code_update(); + if (wait_node->app->pid == ust_cmd->reg_msg.pid) { + wait_node->app->notify_sock = ust_cmd->sock; + cds_list_del(&wait_node->head); + wait_queue.count--; + app = wait_node->app; + free(wait_node); + DBG3("UST app notify socket %d is set", ust_cmd->sock); + break; + } + } + + /* + * With no application at this stage the received socket is + * basically useless so close it before we free the cmd data + * structure for good. + */ + if (!app) { + ret = close(ust_cmd->sock); + if (ret < 0) { + PERROR("close ust sock dispatch %d", ust_cmd->sock); + } + lttng_fd_put(1, LTTNG_FD_APPS); + } + free(ust_cmd); + } + + if (app) { + /* + * @session_lock_list + * + * Lock the global session list so from the register up to the + * registration done message, no thread can see the application + * and change its state. + */ + session_lock_list(); + rcu_read_lock(); + + /* + * Add application to the global hash table. This needs to be + * done before the update to the UST registry can locate the + * application. + */ + ust_app_add(app); + + /* Set app version. This call will print an error if needed. */ + (void) ust_app_version(app); + + /* Send notify socket through the notify pipe. */ + ret = send_socket_to_thread(apps_cmd_notify_pipe[1], + app->notify_sock); + if (ret < 0) { + rcu_read_unlock(); + session_unlock_list(); /* - * We can't inform the application thread to process - * registration. We will exit or else application - * registration will not occur and tracing will never - * start. + * No notify thread, stop the UST tracing. However, this is + * not an internal error of the this thread thus setting + * the health error code to a normal exit. */ + err = 0; goto error; } + + /* + * Update newly registered application with the tracing + * registry info already enabled information. + */ + update_ust_app(app->sock); + + /* + * Don't care about return value. Let the manage apps threads + * handle app unregistration upon socket close. + */ + (void) ust_app_register_done(app->sock); + + /* + * Even if the application socket has been closed, send the app + * to the thread and unregistration will take place at that + * place. + */ + ret = send_socket_to_thread(apps_cmd_pipe[1], app->sock); + if (ret < 0) { + rcu_read_unlock(); + session_unlock_list(); + /* + * No apps. thread, stop the UST tracing. However, this is + * not an internal error of the this thread thus setting + * the health error code to a normal exit. + */ + err = 0; + goto error; + } + + rcu_read_unlock(); + session_unlock_list(); } - free(ust_cmd); } while (node != NULL); + health_poll_entry(); /* Futex wait on queue. Blocking call on futex() */ futex_nto1_wait(&ust_cmd_queue.futex); + health_poll_exit(); } + /* Normal exit, no error */ + err = 0; error: + /* Clean up wait queue. */ + cds_list_for_each_entry_safe(wait_node, tmp_wait_node, + &wait_queue.head, head) { + cds_list_del(&wait_node->head); + wait_queue.count--; + free(wait_node); + } + DBG("Dispatch thread dying"); + if (err) { + health_error(); + ERR("Health error occurred in %s", __func__); + } + health_unregister(health_sessiond); return NULL; } @@ -1308,7 +1746,11 @@ static void *thread_registration_apps(void *data) DBG("[thread] Manage application registration started"); - testpoint(thread_registration_apps); + health_register(health_sessiond, HEALTH_TYPE_APP_REG); + + if (testpoint(thread_registration_apps)) { + goto error_testpoint; + } ret = lttcomm_listen_unix_sock(apps_sock); if (ret < 0) { @@ -1319,7 +1761,7 @@ static void *thread_registration_apps(void *data) * Pass 2 as size here for the thread quit pipe and apps socket. Nothing * more will be added to this poll set. */ - ret = create_thread_poll_set(&events, 2); + ret = sessiond_set_thread_pollset(&events, 2); if (ret < 0) { goto error_create_poll; } @@ -1341,13 +1783,11 @@ static void *thread_registration_apps(void *data) while (1) { DBG("Accepting application registration"); - nb_fd = LTTNG_POLL_GETNB(&events); - /* Inifinite blocking call, waiting for transmission */ restart: - health_poll_update(&health_thread_app_reg); + health_poll_entry(); ret = lttng_poll_wait(&events, -1); - health_poll_update(&health_thread_app_reg); + health_poll_exit(); if (ret < 0) { /* * Restart interrupted system call. @@ -1358,15 +1798,17 @@ static void *thread_registration_apps(void *data) goto error; } + nb_fd = ret; + for (i = 0; i < nb_fd; i++) { - health_code_update(&health_thread_app_reg); + health_code_update(); /* Fetch once the poll data */ revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); /* Thread quit pipe has been closed. Killing thread. */ - ret = check_thread_quit_pipe(pollfd, revents); + ret = sessiond_check_thread_quit_pipe(pollfd, revents); if (ret) { err = 0; goto exit; @@ -1383,6 +1825,12 @@ static void *thread_registration_apps(void *data) goto error; } + /* + * Set the CLOEXEC flag. Return code is useless because + * either way, the show must go on. + */ + (void) utils_set_fd_cloexec(sock); + /* Create UST registration command for enqueuing */ ust_cmd = zmalloc(sizeof(struct ust_command)); if (ust_cmd == NULL) { @@ -1405,16 +1853,12 @@ static void *thread_registration_apps(void *data) sock = -1; continue; } - health_code_update(&health_thread_app_reg); - ret = lttcomm_recv_unix_sock(sock, &ust_cmd->reg_msg, - sizeof(struct ust_register_msg)); - if (ret < 0 || ret < sizeof(struct ust_register_msg)) { - if (ret < 0) { - PERROR("lttcomm_recv_unix_sock register apps"); - } else { - ERR("Wrong size received on apps register"); - } + + health_code_update(); + ret = ust_app_recv_registration(sock, &ust_cmd->reg_msg); + if (ret < 0) { free(ust_cmd); + /* Close socket of the application. */ ret = close(sock); if (ret) { PERROR("close"); @@ -1423,7 +1867,7 @@ static void *thread_registration_apps(void *data) sock = -1; continue; } - health_code_update(&health_thread_app_reg); + health_code_update(); ust_cmd->sock = sock; sock = -1; @@ -1454,10 +1898,9 @@ static void *thread_registration_apps(void *data) exit: error: if (err) { - health_error(&health_thread_app_reg); + health_error(); ERR("Health error occurred in %s", __func__); } - health_exit(&health_thread_app_reg); /* Notify that the registration thread is gone */ notify_ust_apps(0); @@ -1481,7 +1924,9 @@ error_poll_add: lttng_poll_clean(&events); error_listen: error_create_poll: +error_testpoint: DBG("UST Registration thread cleanup complete"); + health_unregister(health_sessiond); return NULL; } @@ -1613,10 +2058,10 @@ error: static int join_consumer_thread(struct consumer_data *consumer_data) { void *status; - int ret; /* Consumer pid must be a real one. */ if (consumer_data->pid > 0) { + int ret; ret = kill(consumer_data->pid, SIGTERM); if (ret) { ERR("Error killing consumer daemon"); @@ -1712,6 +2157,7 @@ static pid_t spawn_consumerd(struct consumer_data *consumer_data) ret = putenv(tmpnew); if (ret) { ret = -errno; + free(tmpnew); goto error; } } @@ -1756,6 +2202,7 @@ static pid_t spawn_consumerd(struct consumer_data *consumer_data) ret = putenv(tmpnew); if (ret) { ret = -errno; + free(tmpnew); goto error; } } @@ -1797,6 +2244,16 @@ static int start_consumerd(struct consumer_data *consumer_data) { int ret; + /* + * Set the listen() state on the socket since there is a possible race + * between the exec() of the consumer daemon and this call if place in the + * consumer thread. See bug #366 for more details. + */ + ret = lttcomm_listen_unix_sock(consumer_data->err_sock); + if (ret < 0) { + goto error; + } + pthread_mutex_lock(&consumer_data->pid_mutex); if (consumer_data->pid != 0) { pthread_mutex_unlock(&consumer_data->pid_mutex); @@ -1826,6 +2283,15 @@ end: return 0; error: + /* Cleanup already created sockets on error. */ + if (consumer_data->err_sock >= 0) { + int err; + + err = close(consumer_data->err_sock); + if (err < 0) { + PERROR("close consumer data error socket"); + } + } return ret; } @@ -1837,9 +2303,7 @@ static int check_consumer_health(void) { int ret; - ret = health_check_state(&kconsumer_data.health) && - health_check_state(&ustconsumer32_data.health) && - health_check_state(&ustconsumer64_data.health); + ret = health_check_state(health_sessiond, HEALTH_TYPE_CONSUMER); DBG3("Health consumer check %d", ret); @@ -1914,6 +2378,8 @@ error: * Copy consumer output from the tracing session to the domain session. The * function also applies the right modification on a per domain basis for the * trace files destination directory. + * + * Should *NOT* be called with RCU read-side lock held. */ static int copy_session_consumer(int domain, struct ltt_session *session) { @@ -1971,6 +2437,8 @@ error: /* * Create an UST session and add it to the session ust list. + * + * Should *NOT* be called with RCU read-side lock held. */ static int create_ust_session(struct ltt_session *session, struct lttng_domain *domain) @@ -1993,7 +2461,7 @@ static int create_ust_session(struct ltt_session *session, DBG("Creating UST session"); - lus = trace_ust_create_session(session->path, session->id, domain); + lus = trace_ust_create_session(session->id); if (lus == NULL) { ret = LTTNG_ERR_UST_SESS_FAIL; goto error; @@ -2001,6 +2469,9 @@ static int create_ust_session(struct ltt_session *session, lus->uid = session->uid; lus->gid = session->gid; + lus->output_traces = session->output_traces; + lus->snapshot_mode = session->snapshot_mode; + lus->live_timer_interval = session->live_timer; session->ust_session = lus; /* Copy session output to the newly created UST session */ @@ -2057,6 +2528,8 @@ static int create_kernel_session(struct ltt_session *session) session->kernel_session->uid = session->uid; session->kernel_session->gid = session->gid; + session->kernel_session->output_traces = session->output_traces; + session->kernel_session->snapshot_mode = session->snapshot_mode; return LTTNG_OK; @@ -2096,6 +2569,8 @@ static unsigned int lttng_sessions_count(uid_t uid, gid_t gid) * Return any error encountered or 0 for success. * * "sock" is only used for special-case var. len data. + * + * Should *NOT* be called with RCU read-side lock held. */ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, int *sock_error) @@ -2110,12 +2585,18 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, switch (cmd_ctx->lsm->cmd_type) { case LTTNG_CREATE_SESSION: + case LTTNG_CREATE_SESSION_SNAPSHOT: + case LTTNG_CREATE_SESSION_LIVE: case LTTNG_DESTROY_SESSION: case LTTNG_LIST_SESSIONS: case LTTNG_LIST_DOMAINS: case LTTNG_START_TRACE: case LTTNG_STOP_TRACE: - case LTTNG_DATA_AVAILABLE: + case LTTNG_DATA_PENDING: + case LTTNG_SNAPSHOT_ADD_OUTPUT: + case LTTNG_SNAPSHOT_DEL_OUTPUT: + case LTTNG_SNAPSHOT_LIST_OUTPUT: + case LTTNG_SNAPSHOT_RECORD: need_domain = 0; break; default: @@ -2168,6 +2649,8 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, /* Commands that DO NOT need a session. */ switch (cmd_ctx->lsm->cmd_type) { case LTTNG_CREATE_SESSION: + case LTTNG_CREATE_SESSION_SNAPSHOT: + case LTTNG_CREATE_SESSION_LIVE: case LTTNG_CALIBRATE: case LTTNG_LIST_SESSIONS: case LTTNG_LIST_TRACEPOINTS: @@ -2184,12 +2667,7 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, session_lock_list(); cmd_ctx->session = session_find_by_name(cmd_ctx->lsm->session.name); if (cmd_ctx->session == NULL) { - if (cmd_ctx->lsm->session.name != NULL) { - ret = LTTNG_ERR_SESS_NOT_FOUND; - } else { - /* If no session name specified */ - ret = LTTNG_ERR_SELECT_SESS; - } + ret = LTTNG_ERR_SESS_NOT_FOUND; goto error; } else { /* Acquire lock for the session */ @@ -2240,8 +2718,7 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, /* Start the kernel consumer daemon */ pthread_mutex_lock(&kconsumer_data.pid_mutex); if (kconsumer_data.pid == 0 && - cmd_ctx->lsm->cmd_type != LTTNG_REGISTER_CONSUMER && - cmd_ctx->session->start_consumer) { + cmd_ctx->lsm->cmd_type != LTTNG_REGISTER_CONSUMER) { pthread_mutex_unlock(&kconsumer_data.pid_mutex); ret = start_consumerd(&kconsumer_data); if (ret < 0) { @@ -2267,6 +2744,10 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, break; case LTTNG_DOMAIN_UST: { + if (!ust_app_supported()) { + ret = LTTNG_ERR_NO_UST; + goto error; + } /* Consumer is in an ERROR state. Report back to client */ if (uatomic_read(&ust_consumerd_state) == CONSUMER_ERROR) { ret = LTTNG_ERR_NO_USTCONSUMERD; @@ -2288,8 +2769,7 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, pthread_mutex_lock(&ustconsumer64_data.pid_mutex); if (consumerd64_bin[0] != '\0' && ustconsumer64_data.pid == 0 && - cmd_ctx->lsm->cmd_type != LTTNG_REGISTER_CONSUMER && - cmd_ctx->session->start_consumer) { + cmd_ctx->lsm->cmd_type != LTTNG_REGISTER_CONSUMER) { pthread_mutex_unlock(&ustconsumer64_data.pid_mutex); ret = start_consumerd(&ustconsumer64_data); if (ret < 0) { @@ -2317,8 +2797,7 @@ static int process_client_msg(struct command_ctx *cmd_ctx, int sock, /* 32-bit */ if (consumerd32_bin[0] != '\0' && ustconsumer32_data.pid == 0 && - cmd_ctx->lsm->cmd_type != LTTNG_REGISTER_CONSUMER && - cmd_ctx->session->start_consumer) { + cmd_ctx->lsm->cmd_type != LTTNG_REGISTER_CONSUMER) { pthread_mutex_unlock(&ustconsumer32_data.pid_mutex); ret = start_consumerd(&ustconsumer32_data); if (ret < 0) { @@ -2382,14 +2861,28 @@ skip_domain: } } + /* + * Send relayd information to consumer as soon as we have a domain and a + * session defined. + */ + if (cmd_ctx->session && need_domain) { + /* + * Setup relayd if not done yet. If the relayd information was already + * sent to the consumer, this call will gracefully return. + */ + ret = cmd_setup_relayd(cmd_ctx->session); + if (ret != LTTNG_OK) { + goto error; + } + } + /* Process by command type */ switch (cmd_ctx->lsm->cmd_type) { case LTTNG_ADD_CONTEXT: { ret = cmd_add_context(cmd_ctx->session, cmd_ctx->lsm->domain.type, cmd_ctx->lsm->u.context.channel_name, - cmd_ctx->lsm->u.context.event_name, - &cmd_ctx->lsm->u.context.ctx); + &cmd_ctx->lsm->u.context.ctx, kernel_poll_pipe[1]); break; } case LTTNG_DISABLE_CHANNEL: @@ -2404,64 +2897,35 @@ skip_domain: cmd_ctx->lsm->u.disable.channel_name, cmd_ctx->lsm->u.disable.name); break; - } - case LTTNG_DISABLE_ALL_EVENT: - { - DBG("Disabling all events"); - - ret = cmd_disable_event_all(cmd_ctx->session, cmd_ctx->lsm->domain.type, - cmd_ctx->lsm->u.disable.channel_name); - break; - } - case LTTNG_DISABLE_CONSUMER: - { - ret = cmd_disable_consumer(cmd_ctx->lsm->domain.type, cmd_ctx->session); - break; - } - case LTTNG_ENABLE_CHANNEL: - { - ret = cmd_enable_channel(cmd_ctx->session, cmd_ctx->lsm->domain.type, - &cmd_ctx->lsm->u.channel.chan, kernel_poll_pipe[1]); - break; - } - case LTTNG_ENABLE_CONSUMER: - { - /* - * XXX: 0 means that this URI should be applied on the session. Should - * be a DOMAIN enuam. - */ - ret = cmd_enable_consumer(cmd_ctx->lsm->domain.type, cmd_ctx->session); - if (ret != LTTNG_OK) { - goto error; - } - - if (cmd_ctx->lsm->domain.type == 0) { - /* Add the URI for the UST session if a consumer is present. */ - if (cmd_ctx->session->ust_session && - cmd_ctx->session->ust_session->consumer) { - ret = cmd_enable_consumer(LTTNG_DOMAIN_UST, cmd_ctx->session); - } else if (cmd_ctx->session->kernel_session && - cmd_ctx->session->kernel_session->consumer) { - ret = cmd_enable_consumer(LTTNG_DOMAIN_KERNEL, - cmd_ctx->session); - } - } + } + case LTTNG_DISABLE_ALL_EVENT: + { + DBG("Disabling all events"); + + ret = cmd_disable_event_all(cmd_ctx->session, cmd_ctx->lsm->domain.type, + cmd_ctx->lsm->u.disable.channel_name); + break; + } + case LTTNG_ENABLE_CHANNEL: + { + ret = cmd_enable_channel(cmd_ctx->session, &cmd_ctx->lsm->domain, + &cmd_ctx->lsm->u.channel.chan, kernel_poll_pipe[1]); break; } case LTTNG_ENABLE_EVENT: { - ret = cmd_enable_event(cmd_ctx->session, cmd_ctx->lsm->domain.type, + ret = cmd_enable_event(cmd_ctx->session, &cmd_ctx->lsm->domain, cmd_ctx->lsm->u.enable.channel_name, - &cmd_ctx->lsm->u.enable.event, kernel_poll_pipe[1]); + &cmd_ctx->lsm->u.enable.event, NULL, kernel_poll_pipe[1]); break; } case LTTNG_ENABLE_ALL_EVENT: { DBG("Enabling all events"); - ret = cmd_enable_event_all(cmd_ctx->session, cmd_ctx->lsm->domain.type, + ret = cmd_enable_event_all(cmd_ctx->session, &cmd_ctx->lsm->domain, cmd_ctx->lsm->u.enable.channel_name, - cmd_ctx->lsm->u.enable.event.type, kernel_poll_pipe[1]); + cmd_ctx->lsm->u.enable.event.type, NULL, kernel_poll_pipe[1]); break; } case LTTNG_LIST_TRACEPOINTS: @@ -2631,7 +3095,7 @@ skip_domain: } ret = cmd_create_session_uri(cmd_ctx->lsm->session.name, uris, nb_uri, - &cmd_ctx->creds); + &cmd_ctx->creds, 0); free(uris); @@ -2659,6 +3123,7 @@ skip_domain: ret = setup_lttng_msg(cmd_ctx, nb_dom * sizeof(struct lttng_domain)); if (ret < 0) { + free(domains); goto setup_error; } @@ -2686,6 +3151,7 @@ skip_domain: ret = setup_lttng_msg(cmd_ctx, nb_chan * sizeof(struct lttng_channel)); if (ret < 0) { + free(channels); goto setup_error; } @@ -2713,6 +3179,7 @@ skip_domain: ret = setup_lttng_msg(cmd_ctx, nb_event * sizeof(struct lttng_event)); if (ret < 0) { + free(events); goto setup_error; } @@ -2773,15 +3240,19 @@ skip_domain: cmd_ctx->lsm->u.reg.path, cdata); break; } - case LTTNG_SET_FILTER: + case LTTNG_ENABLE_EVENT_WITH_FILTER: { struct lttng_filter_bytecode *bytecode; - if (cmd_ctx->lsm->u.filter.bytecode_len > LTTNG_FILTER_MAX_LEN) { + if (cmd_ctx->lsm->u.enable.bytecode_len > LTTNG_FILTER_MAX_LEN) { + ret = LTTNG_ERR_FILTER_INVAL; + goto error; + } + if (cmd_ctx->lsm->u.enable.bytecode_len == 0) { ret = LTTNG_ERR_FILTER_INVAL; goto error; } - bytecode = zmalloc(cmd_ctx->lsm->u.filter.bytecode_len); + bytecode = zmalloc(cmd_ctx->lsm->u.enable.bytecode_len); if (!bytecode) { ret = LTTNG_ERR_FILTER_NOMEM; goto error; @@ -2789,7 +3260,7 @@ skip_domain: /* Receive var. len. data */ DBG("Receiving var len data from client ..."); ret = lttcomm_recv_unix_sock(sock, bytecode, - cmd_ctx->lsm->u.filter.bytecode_len); + cmd_ctx->lsm->u.enable.bytecode_len); if (ret <= 0) { DBG("Nothing recv() from client var len data... continuing"); *sock_error = 1; @@ -2798,21 +3269,159 @@ skip_domain: } if (bytecode->len + sizeof(*bytecode) - != cmd_ctx->lsm->u.filter.bytecode_len) { + != cmd_ctx->lsm->u.enable.bytecode_len) { free(bytecode); ret = LTTNG_ERR_FILTER_INVAL; goto error; } - ret = cmd_set_filter(cmd_ctx->session, cmd_ctx->lsm->domain.type, - cmd_ctx->lsm->u.filter.channel_name, - cmd_ctx->lsm->u.filter.event_name, - bytecode); + ret = cmd_enable_event(cmd_ctx->session, &cmd_ctx->lsm->domain, + cmd_ctx->lsm->u.enable.channel_name, + &cmd_ctx->lsm->u.enable.event, bytecode, kernel_poll_pipe[1]); + break; + } + case LTTNG_DATA_PENDING: + { + ret = cmd_data_pending(cmd_ctx->session); + break; + } + case LTTNG_SNAPSHOT_ADD_OUTPUT: + { + struct lttcomm_lttng_output_id reply; + + ret = cmd_snapshot_add_output(cmd_ctx->session, + &cmd_ctx->lsm->u.snapshot_output.output, &reply.id); + if (ret != LTTNG_OK) { + goto error; + } + + ret = setup_lttng_msg(cmd_ctx, sizeof(reply)); + if (ret < 0) { + goto setup_error; + } + + /* Copy output list into message payload */ + memcpy(cmd_ctx->llm->payload, &reply, sizeof(reply)); + ret = LTTNG_OK; + break; + } + case LTTNG_SNAPSHOT_DEL_OUTPUT: + { + ret = cmd_snapshot_del_output(cmd_ctx->session, + &cmd_ctx->lsm->u.snapshot_output.output); + break; + } + case LTTNG_SNAPSHOT_LIST_OUTPUT: + { + ssize_t nb_output; + struct lttng_snapshot_output *outputs = NULL; + + nb_output = cmd_snapshot_list_outputs(cmd_ctx->session, &outputs); + if (nb_output < 0) { + ret = -nb_output; + goto error; + } + + ret = setup_lttng_msg(cmd_ctx, + nb_output * sizeof(struct lttng_snapshot_output)); + if (ret < 0) { + free(outputs); + goto setup_error; + } + + if (outputs) { + /* Copy output list into message payload */ + memcpy(cmd_ctx->llm->payload, outputs, + nb_output * sizeof(struct lttng_snapshot_output)); + free(outputs); + } + + ret = LTTNG_OK; + break; + } + case LTTNG_SNAPSHOT_RECORD: + { + ret = cmd_snapshot_record(cmd_ctx->session, + &cmd_ctx->lsm->u.snapshot_record.output, + cmd_ctx->lsm->u.snapshot_record.wait); + break; + } + case LTTNG_CREATE_SESSION_SNAPSHOT: + { + size_t nb_uri, len; + struct lttng_uri *uris = NULL; + + nb_uri = cmd_ctx->lsm->u.uri.size; + len = nb_uri * sizeof(struct lttng_uri); + + if (nb_uri > 0) { + uris = zmalloc(len); + if (uris == NULL) { + ret = LTTNG_ERR_FATAL; + goto error; + } + + /* Receive variable len data */ + DBG("Waiting for %zu URIs from client ...", nb_uri); + ret = lttcomm_recv_unix_sock(sock, uris, len); + if (ret <= 0) { + DBG("No URIs received from client... continuing"); + *sock_error = 1; + ret = LTTNG_ERR_SESSION_FAIL; + free(uris); + goto error; + } + + if (nb_uri == 1 && uris[0].dtype != LTTNG_DST_PATH) { + DBG("Creating session with ONE network URI is a bad call"); + ret = LTTNG_ERR_SESSION_FAIL; + free(uris); + goto error; + } + } + + ret = cmd_create_session_snapshot(cmd_ctx->lsm->session.name, uris, + nb_uri, &cmd_ctx->creds); + free(uris); break; } - case LTTNG_DATA_AVAILABLE: + case LTTNG_CREATE_SESSION_LIVE: { - ret = cmd_data_available(cmd_ctx->session); + size_t nb_uri, len; + struct lttng_uri *uris = NULL; + + nb_uri = cmd_ctx->lsm->u.uri.size; + len = nb_uri * sizeof(struct lttng_uri); + + if (nb_uri > 0) { + uris = zmalloc(len); + if (uris == NULL) { + ret = LTTNG_ERR_FATAL; + goto error; + } + + /* Receive variable len data */ + DBG("Waiting for %zu URIs from client ...", nb_uri); + ret = lttcomm_recv_unix_sock(sock, uris, len); + if (ret <= 0) { + DBG("No URIs received from client... continuing"); + *sock_error = 1; + ret = LTTNG_ERR_SESSION_FAIL; + free(uris); + goto error; + } + + if (nb_uri == 1 && uris[0].dtype != LTTNG_DST_PATH) { + DBG("Creating session with ONE network URI is a bad call"); + ret = LTTNG_ERR_SESSION_FAIL; + free(uris); + goto error; + } + } + + ret = cmd_create_session_uri(cmd_ctx->lsm->session.name, uris, + nb_uri, &cmd_ctx->creds, cmd_ctx->lsm->u.session_live.timer_interval); + free(uris); break; } default: @@ -2855,6 +3464,9 @@ static void *thread_manage_health(void *data) rcu_register_thread(); + /* We might hit an error path before this is created. */ + lttng_poll_init(&events); + /* Create unix socket */ sock = lttcomm_create_unix_sock(health_unix_sock_path); if (sock < 0) { @@ -2863,6 +3475,12 @@ static void *thread_manage_health(void *data) goto error; } + /* + * Set the CLOEXEC flag. Return code is useless because either way, the + * show must go on. + */ + (void) utils_set_fd_cloexec(sock); + ret = lttcomm_listen_unix_sock(sock); if (ret < 0) { goto error; @@ -2872,7 +3490,7 @@ static void *thread_manage_health(void *data) * Pass 2 as size here for the thread quit pipe and client_sock. Nothing * more will be added to this poll set. */ - ret = create_thread_poll_set(&events, 2); + ret = sessiond_set_thread_pollset(&events, 2); if (ret < 0) { goto error; } @@ -2886,8 +3504,6 @@ static void *thread_manage_health(void *data) while (1) { DBG("Health check ready"); - nb_fd = LTTNG_POLL_GETNB(&events); - /* Inifinite blocking call, waiting for transmission */ restart: ret = lttng_poll_wait(&events, -1); @@ -2901,13 +3517,15 @@ restart: goto error; } + nb_fd = ret; + for (i = 0; i < nb_fd; i++) { /* Fetch once the poll data */ revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); /* Thread quit pipe has been closed. Killing thread. */ - ret = check_thread_quit_pipe(pollfd, revents); + ret = sessiond_check_thread_quit_pipe(pollfd, revents); if (ret) { err = 0; goto exit; @@ -2927,6 +3545,12 @@ restart: goto error; } + /* + * Set the CLOEXEC flag. Return code is useless because either way, the + * show must go on. + */ + (void) utils_set_fd_cloexec(new_sock); + DBG("Receiving data from client for health..."); ret = lttcomm_recv_unix_sock(new_sock, (void *)&msg, sizeof(msg)); if (ret <= 0) { @@ -2943,27 +3567,39 @@ restart: switch (msg.component) { case LTTNG_HEALTH_CMD: - reply.ret_code = health_check_state(&health_thread_cmd); + reply.ret_code = health_check_state(health_sessiond, HEALTH_TYPE_CMD); break; case LTTNG_HEALTH_APP_MANAGE: - reply.ret_code = health_check_state(&health_thread_app_manage); + reply.ret_code = health_check_state(health_sessiond, HEALTH_TYPE_APP_MANAGE); break; case LTTNG_HEALTH_APP_REG: - reply.ret_code = health_check_state(&health_thread_app_reg); + reply.ret_code = health_check_state(health_sessiond, HEALTH_TYPE_APP_REG); break; case LTTNG_HEALTH_KERNEL: - reply.ret_code = health_check_state(&health_thread_kernel); + reply.ret_code = health_check_state(health_sessiond, HEALTH_TYPE_KERNEL); break; case LTTNG_HEALTH_CONSUMER: reply.ret_code = check_consumer_health(); break; + case LTTNG_HEALTH_HT_CLEANUP: + reply.ret_code = health_check_state(health_sessiond, HEALTH_TYPE_HT_CLEANUP); + break; + case LTTNG_HEALTH_APP_MANAGE_NOTIFY: + reply.ret_code = health_check_state(health_sessiond, HEALTH_TYPE_APP_MANAGE_NOTIFY); + break; + case LTTNG_HEALTH_APP_REG_DISPATCH: + reply.ret_code = health_check_state(health_sessiond, HEALTH_TYPE_APP_REG_DISPATCH); + break; case LTTNG_HEALTH_ALL: reply.ret_code = - health_check_state(&health_thread_app_manage) && - health_check_state(&health_thread_app_reg) && - health_check_state(&health_thread_cmd) && - health_check_state(&health_thread_kernel) && - check_consumer_health(); + health_check_state(health_sessiond, HEALTH_TYPE_APP_MANAGE) && + health_check_state(health_sessiond, HEALTH_TYPE_APP_REG) && + health_check_state(health_sessiond, HEALTH_TYPE_CMD) && + health_check_state(health_sessiond, HEALTH_TYPE_KERNEL) && + check_consumer_health() && + health_check_state(health_sessiond, HEALTH_TYPE_HT_CLEANUP) && + health_check_state(health_sessiond, HEALTH_TYPE_APP_MANAGE_NOTIFY) && + health_check_state(health_sessiond, HEALTH_TYPE_APP_REG_DISPATCH); break; default: reply.ret_code = LTTNG_ERR_UND; @@ -3008,12 +3644,6 @@ error: PERROR("close"); } } - if (new_sock >= 0) { - ret = close(new_sock); - if (ret) { - PERROR("close"); - } - } lttng_poll_clean(&events); @@ -3035,24 +3665,28 @@ static void *thread_manage_clients(void *data) DBG("[thread] Manage client started"); - testpoint(thread_manage_clients); - rcu_register_thread(); - health_code_update(&health_thread_cmd); + health_register(health_sessiond, HEALTH_TYPE_CMD); + + if (testpoint(thread_manage_clients)) { + goto error_testpoint; + } + + health_code_update(); ret = lttcomm_listen_unix_sock(client_sock); if (ret < 0) { - goto error; + goto error_listen; } /* * Pass 2 as size here for the thread quit pipe and client_sock. Nothing * more will be added to this poll set. */ - ret = create_thread_poll_set(&events, 2); + ret = sessiond_set_thread_pollset(&events, 2); if (ret < 0) { - goto error; + goto error_create_poll; } /* Add the application registration socket */ @@ -3068,20 +3702,20 @@ static void *thread_manage_clients(void *data) kill(ppid, SIGUSR1); } - testpoint(thread_manage_clients_before_loop); + if (testpoint(thread_manage_clients_before_loop)) { + goto error; + } - health_code_update(&health_thread_cmd); + health_code_update(); while (1) { DBG("Accepting client command ..."); - nb_fd = LTTNG_POLL_GETNB(&events); - /* Inifinite blocking call, waiting for transmission */ restart: - health_poll_update(&health_thread_cmd); + health_poll_entry(); ret = lttng_poll_wait(&events, -1); - health_poll_update(&health_thread_cmd); + health_poll_exit(); if (ret < 0) { /* * Restart interrupted system call. @@ -3092,15 +3726,17 @@ static void *thread_manage_clients(void *data) goto error; } + nb_fd = ret; + for (i = 0; i < nb_fd; i++) { /* Fetch once the poll data */ revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); - health_code_update(&health_thread_cmd); + health_code_update(); /* Thread quit pipe has been closed. Killing thread. */ - ret = check_thread_quit_pipe(pollfd, revents); + ret = sessiond_check_thread_quit_pipe(pollfd, revents); if (ret) { err = 0; goto exit; @@ -3117,13 +3753,19 @@ static void *thread_manage_clients(void *data) DBG("Wait for client response"); - health_code_update(&health_thread_cmd); + health_code_update(); sock = lttcomm_accept_unix_sock(client_sock); if (sock < 0) { goto error; } + /* + * Set the CLOEXEC flag. Return code is useless because either way, the + * show must go on. + */ + (void) utils_set_fd_cloexec(sock); + /* Set socket option for credentials retrieval */ ret = lttcomm_setsockopt_creds_unix_sock(sock); if (ret < 0) { @@ -3147,7 +3789,7 @@ static void *thread_manage_clients(void *data) cmd_ctx->llm = NULL; cmd_ctx->session = NULL; - health_code_update(&health_thread_cmd); + health_code_update(); /* * Data is received from the lttng client. The struct @@ -3168,7 +3810,7 @@ static void *thread_manage_clients(void *data) continue; } - health_code_update(&health_thread_cmd); + health_code_update(); // TODO: Validate cmd_ctx including sanity check for // security purpose. @@ -3183,13 +3825,11 @@ static void *thread_manage_clients(void *data) ret = process_client_msg(cmd_ctx, sock, &sock_error); rcu_thread_offline(); if (ret < 0) { - if (sock_error) { - ret = close(sock); - if (ret) { - PERROR("close"); - } - sock = -1; + ret = close(sock); + if (ret) { + PERROR("close"); } + sock = -1; /* * TODO: Inform client somehow of the fatal error. At * this point, ret < 0 means that a zmalloc failed @@ -3201,7 +3841,7 @@ static void *thread_manage_clients(void *data) continue; } - health_code_update(&health_thread_cmd); + health_code_update(); DBG("Sending response (size: %d, retcode: %s)", cmd_ctx->lttng_msg_size, @@ -3220,18 +3860,24 @@ static void *thread_manage_clients(void *data) clean_command_ctx(&cmd_ctx); - health_code_update(&health_thread_cmd); + health_code_update(); } exit: error: - if (err) { - health_error(&health_thread_cmd); - ERR("Health error occurred in %s", __func__); + if (sock >= 0) { + ret = close(sock); + if (ret) { + PERROR("close"); + } } - health_exit(&health_thread_cmd); - DBG("Client thread dying"); + lttng_poll_clean(&events); + clean_command_ctx(&cmd_ctx); + +error_listen: +error_create_poll: +error_testpoint: unlink(client_unix_sock_path); if (client_sock >= 0) { ret = close(client_sock); @@ -3239,15 +3885,15 @@ error: PERROR("close"); } } - if (sock >= 0) { - ret = close(sock); - if (ret) { - PERROR("close"); - } + + if (err) { + health_error(); + ERR("Health error occurred in %s", __func__); } - lttng_poll_clean(&events); - clean_command_ctx(&cmd_ctx); + health_unregister(health_sessiond); + + DBG("Client thread dying"); rcu_unregister_thread(); return NULL; @@ -3279,6 +3925,7 @@ static void usage(void) fprintf(stderr, " -S, --sig-parent Send SIGCHLD to parent pid to notify readiness.\n"); fprintf(stderr, " -q, --quiet No output at all.\n"); fprintf(stderr, " -v, --verbose Verbose mode. Activate DBG() macro.\n"); + fprintf(stderr, " -p, --pidfile FILE Write a pid to FILE name overriding the default value.\n"); fprintf(stderr, " --verbose-consumer Verbose mode for consumer. Activate DBG() macro.\n"); fprintf(stderr, " --no-kernel Disable kernel tracer\n"); } @@ -3312,12 +3959,13 @@ static int parse_args(int argc, char **argv) { "verbose", 0, 0, 'v' }, { "verbose-consumer", 0, 0, 'Z' }, { "no-kernel", 0, 0, 'N' }, + { "pidfile", 1, 0, 'p' }, { NULL, 0, 0, 0 } }; while (1) { int option_index = 0; - c = getopt_long(argc, argv, "dhqvVSN" "a:c:g:s:C:E:D:F:Z:u:t", + c = getopt_long(argc, argv, "dhqvVSN" "a:c:g:s:C:E:D:F:Z:u:t:p:", long_options, &option_index); if (c == -1) { break; @@ -3394,6 +4042,9 @@ static int parse_args(int argc, char **argv) case 'T': consumerd64_libdir = optarg; break; + case 'p': + opt_pidfile = optarg; + break; default: /* Unknown option or other error. * Error is printed by getopt, just return */ @@ -3424,6 +4075,14 @@ static int init_daemon_socket(void) goto end; } + /* Set the cloexec flag */ + ret = utils_set_fd_cloexec(client_sock); + if (ret < 0) { + ERR("Unable to set CLOEXEC flag to the client Unix socket (fd: %d). " + "Continuing but note that the consumer daemon will have a " + "reference to this socket on exec()", client_sock); + } + /* File permission MUST be 660 */ ret = chmod(client_unix_sock_path, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP); if (ret < 0) { @@ -3440,6 +4099,14 @@ static int init_daemon_socket(void) goto end; } + /* Set the cloexec flag */ + ret = utils_set_fd_cloexec(apps_sock); + if (ret < 0) { + ERR("Unable to set CLOEXEC flag to the app Unix socket (fd: %d). " + "Continuing but note that the consumer daemon will have a " + "reference to this socket on exec()", apps_sock); + } + /* File permission MUST be 666 */ ret = chmod(apps_unix_sock_path, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH); @@ -3449,6 +4116,9 @@ static int init_daemon_socket(void) goto end; } + DBG3("Session daemon client socket %d and application socket %d created", + client_sock, apps_sock); + end: umask(old_umask); return ret; @@ -3482,12 +4152,12 @@ static int set_permissions(char *rundir) ret = allowed_group(); if (ret < 0) { WARN("No tracing group detected"); - ret = 0; - goto end; + /* Setting gid to 0 if no tracing group is found */ + gid = 0; + } else { + gid = ret; } - gid = ret; - /* Set lttng run dir */ ret = chown(rundir, 0, gid); if (ret < 0) { @@ -3495,7 +4165,7 @@ static int set_permissions(char *rundir) PERROR("chown"); } - /* Ensure tracing group can search the run dir */ + /* Ensure all applications and tracing group can search the run dir */ ret = chmod(rundir, S_IRWXU | S_IXGRP | S_IXOTH); if (ret < 0) { ERR("Unable to set permissions on %s", rundir); @@ -3532,7 +4202,6 @@ static int set_permissions(char *rundir) DBG("All permissions are set"); -end: return ret; } @@ -3606,6 +4275,16 @@ static int set_consumer_sockets(struct consumer_data *consumer_data, goto error; } + /* + * Set the CLOEXEC flag. Return code is useless because either way, the + * show must go on. + */ + ret = utils_set_fd_cloexec(consumer_data->err_sock); + if (ret < 0) { + PERROR("utils_set_fd_cloexec"); + /* continue anyway */ + } + /* File permission MUST be 660 */ ret = chmod(consumer_data->err_unix_sock_path, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP); @@ -3701,6 +4380,38 @@ static void set_ulimit(void) } } +/* + * Write pidfile using the rundir and opt_pidfile. + */ +static void write_pidfile(void) +{ + int ret; + char pidfile_path[PATH_MAX]; + + assert(rundir); + + if (opt_pidfile) { + strncpy(pidfile_path, opt_pidfile, sizeof(pidfile_path)); + } else { + /* Build pidfile path from rundir and opt_pidfile. */ + ret = snprintf(pidfile_path, sizeof(pidfile_path), "%s/" + DEFAULT_LTTNG_SESSIOND_PIDFILE, rundir); + if (ret < 0) { + PERROR("snprintf pidfile path"); + goto error; + } + } + + /* + * Create pid file in rundir. Return value is of no importance. The + * execution will continue even though we are not able to write the file. + */ + (void) utils_create_pid_file(getpid(), pidfile_path); + +error: + return; +} + /* * main */ @@ -3708,7 +4419,7 @@ int main(int argc, char **argv) { int ret = 0; void *status; - const char *home_path; + const char *home_path, *env_app_timeout; init_kernel_workarounds(); @@ -3716,9 +4427,16 @@ int main(int argc, char **argv) setup_consumerd_path(); + page_size = sysconf(_SC_PAGESIZE); + if (page_size < 0) { + PERROR("sysconf _SC_PAGESIZE"); + page_size = LONG_MAX; + WARN("Fallback page size to %ld", page_size); + } + /* Parse arguments */ progname = argv[0]; - if ((ret = parse_args(argc, argv) < 0)) { + if ((ret = parse_args(argc, argv)) < 0) { goto error; } @@ -3795,7 +4513,7 @@ int main(int argc, char **argv) DBG2("Kernel consumer cmd path: %s", kconsumer_data.cmd_unix_sock_path); } else { - home_path = get_home_dir(); + home_path = utils_get_home_dir(); if (home_path == NULL) { /* TODO: Add --socket PATH option */ ERR("Can't get HOME directory for sockets creation."); @@ -3832,7 +4550,7 @@ int main(int argc, char **argv) /* Set global SHM for ust */ if (strlen(wait_shm_path) == 0) { snprintf(wait_shm_path, PATH_MAX, - DEFAULT_HOME_APPS_WAIT_SHM_PATH, geteuid()); + DEFAULT_HOME_APPS_WAIT_SHM_PATH, getuid()); } /* Set health check Unix path */ @@ -3848,6 +4566,7 @@ int main(int argc, char **argv) DBG("Client socket path %s", client_unix_sock_path); DBG("Application socket path %s", apps_unix_sock_path); + DBG("Application wait path %s", wait_shm_path); DBG("LTTng run directory path: %s", rundir); /* 32 bits consumerd path setup */ @@ -3945,7 +4664,14 @@ int main(int argc, char **argv) } /* Setup the kernel pipe for waking up the kernel thread */ - if ((ret = utils_create_pipe_cloexec(kernel_poll_pipe)) < 0) { + if (is_root && !opt_no_kernel) { + if ((ret = utils_create_pipe_cloexec(kernel_poll_pipe)) < 0) { + goto exit; + } + } + + /* Setup the thread ht_cleanup communication pipe. */ + if (utils_create_pipe_cloexec(ht_cleanup_pipe) < 0) { goto exit; } @@ -3954,6 +4680,15 @@ int main(int argc, char **argv) goto exit; } + /* Setup the thread apps notify communication pipe. */ + if (utils_create_pipe_cloexec(apps_cmd_notify_pipe) < 0) { + goto exit; + } + + /* Initialize global buffer per UID and PID registry. */ + buffer_reg_init_uid_registry(); + buffer_reg_init_pid_registry(); + /* Init UST command queue. */ cds_wfq_init(&ust_cmd_queue.queue); @@ -3968,23 +4703,38 @@ int main(int argc, char **argv) cmd_init(); - /* Init all health thread counters. */ - health_init(&health_thread_cmd); - health_init(&health_thread_kernel); - health_init(&health_thread_app_manage); - health_init(&health_thread_app_reg); + /* Check for the application socket timeout env variable. */ + env_app_timeout = getenv(DEFAULT_APP_SOCKET_TIMEOUT_ENV); + if (env_app_timeout) { + app_socket_timeout = atoi(env_app_timeout); + } else { + app_socket_timeout = DEFAULT_APP_SOCKET_RW_TIMEOUT; + } + + write_pidfile(); + + /* Initialize communication library */ + lttcomm_init(); + /* This is to get the TCP timeout value. */ + lttcomm_inet_init(); /* - * Init health counters of the consumer thread. We do a quick hack here to - * the state of the consumer health is fine even if the thread is not - * started. This is simply to ease our life and has no cost what so ever. + * Initialize the health check subsystem. This call should set the + * appropriate time values. */ - health_init(&kconsumer_data.health); - health_poll_update(&kconsumer_data.health); - health_init(&ustconsumer32_data.health); - health_poll_update(&ustconsumer32_data.health); - health_init(&ustconsumer64_data.health); - health_poll_update(&ustconsumer64_data.health); + health_sessiond = health_app_create(HEALTH_NUM_TYPE); + if (!health_sessiond) { + PERROR("health_app_create error"); + goto exit_health_sessiond_cleanup; + } + + /* Create thread to manage the client socket */ + ret = pthread_create(&ht_cleanup_thread, NULL, + thread_ht_cleanup, (void *) NULL); + if (ret != 0) { + PERROR("pthread_create ht_cleanup"); + goto exit_ht_cleanup; + } /* Create thread to manage the client socket */ ret = pthread_create(&health_thread, NULL, @@ -4026,27 +4776,46 @@ int main(int argc, char **argv) goto exit_apps; } - /* Create kernel thread to manage kernel event */ - ret = pthread_create(&kernel_thread, NULL, - thread_manage_kernel, (void *) NULL); + /* Create thread to manage application notify socket */ + ret = pthread_create(&apps_notify_thread, NULL, + ust_thread_manage_notify, (void *) NULL); if (ret != 0) { - PERROR("pthread_create kernel"); - goto exit_kernel; + PERROR("pthread_create apps"); + goto exit_apps_notify; + } + + /* Don't start this thread if kernel tracing is not requested nor root */ + if (is_root && !opt_no_kernel) { + /* Create kernel thread to manage kernel event */ + ret = pthread_create(&kernel_thread, NULL, + thread_manage_kernel, (void *) NULL); + if (ret != 0) { + PERROR("pthread_create kernel"); + goto exit_kernel; + } + + ret = pthread_join(kernel_thread, &status); + if (ret != 0) { + PERROR("pthread_join"); + goto error; /* join error, exit without cleanup */ + } } - ret = pthread_join(kernel_thread, &status); +exit_kernel: + ret = pthread_join(apps_notify_thread, &status); if (ret != 0) { - PERROR("pthread_join"); + PERROR("pthread_join apps notify"); goto error; /* join error, exit without cleanup */ } -exit_kernel: +exit_apps_notify: ret = pthread_join(apps_thread, &status); if (ret != 0) { - PERROR("pthread_join"); + PERROR("pthread_join apps"); goto error; /* join error, exit without cleanup */ } + exit_apps: ret = pthread_join(reg_apps_thread, &status); if (ret != 0) { @@ -4094,6 +4863,14 @@ exit_client: } exit_health: + ret = pthread_join(ht_cleanup_thread, &status); + if (ret != 0) { + PERROR("pthread_join ht cleanup thread"); + goto error; /* join error, exit without cleanup */ + } +exit_ht_cleanup: + health_app_destroy(health_sessiond); +exit_health_sessiond_cleanup: exit: /* * cleanup() is called when no other thread is running.