X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fmain.c;h=f1c2cd37cfedf22281f100fc80537259bd3fc32b;hp=1641ebeb876dfe2bc820ce6b0ea5d608148656c1;hb=7649924e1b3a80687c2fb79f8778e08bedb671ce;hpb=917a718d4ec336ca98820f3cf56a2db57fc9b1dd diff --git a/src/bin/lttng-sessiond/main.c b/src/bin/lttng-sessiond/main.c index 1641ebeb8..f1c2cd37c 100644 --- a/src/bin/lttng-sessiond/main.c +++ b/src/bin/lttng-sessiond/main.c @@ -83,6 +83,9 @@ #include "timer.h" #include "thread.h" #include "client.h" +#include "dispatch.h" +#include "register.h" +#include "manage-apps.h" static const char *help_msg = #ifdef LTTNG_EMBED_HELP @@ -134,11 +137,6 @@ static const struct option long_options[] = { /* Command line options to ignore from configuration file */ static const char *config_ignore_options[] = { "help", "version", "config" }; -/* Shared between threads */ -static int dispatch_thread_exit; - -static int apps_sock = -1; - /* * This pipe is used to inform the thread managing application communication * that a command is queued and ready to be processed. @@ -146,11 +144,8 @@ static int apps_sock = -1; static int apps_cmd_pipe[2] = { -1, -1 }; /* Pthread, Mutexes and Semaphores */ -static pthread_t apps_thread; static pthread_t apps_notify_thread; -static pthread_t reg_apps_thread; static pthread_t kernel_thread; -static pthread_t dispatch_thread; static pthread_t agent_reg_thread; static pthread_t load_session_thread; @@ -191,10 +186,6 @@ static void stop_threads(void) if (ret < 0) { ERR("write error on thread quit pipe"); } - - /* Dispatch thread */ - CMM_STORE_SHARED(dispatch_thread_exit, 1); - futex_nto1_wake(&ust_cmd_queue.futex); } /* @@ -308,6 +299,8 @@ static void sessiond_cleanup(void) PERROR("remove pidfile %s", config.pid_file_path.value); } + utils_close_pipe(apps_cmd_pipe); + DBG("Removing sessiond and consumerd content of directory %s", config.rundir.value); @@ -391,31 +384,6 @@ static void sessiond_cleanup_options(void) run_as_destroy_worker(); } -/* - * Notify UST applications using the shm mmap futex. - */ -static int notify_ust_apps(int active) -{ - char *wait_shm_mmap; - - DBG("Notifying applications of session daemon state: %d", active); - - /* See shm.c for this call implying mmap, shm and futex calls */ - wait_shm_mmap = shm_ust_get_mmap(config.wait_shm_path.value, is_root); - if (wait_shm_mmap == NULL) { - goto error; - } - - /* Wake waiting process */ - futex_wait_update((int32_t *) wait_shm_mmap, active); - - /* Apps notified successfully */ - return 0; - -error: - return -1; -} - /* * Update the kernel poll set of all channel fd available over all tracing * session. Add the wakeup pipe at the end of the set. @@ -547,55 +515,6 @@ error: return ret; } -/* - * For each tracing session, update newly registered apps. The session list - * lock MUST be acquired before calling this. - */ -static void update_ust_app(int app_sock) -{ - struct ltt_session *sess, *stmp; - const struct ltt_session_list *session_list = session_get_list(); - - /* Consumer is in an ERROR state. Stop any application update. */ - if (uatomic_read(&ust_consumerd_state) == CONSUMER_ERROR) { - /* Stop the update process since the consumer is dead. */ - return; - } - - /* For all tracing session(s) */ - cds_list_for_each_entry_safe(sess, stmp, &session_list->head, list) { - struct ust_app *app; - - if (!session_get(sess)) { - continue; - } - session_lock(sess); - if (!sess->ust_session) { - goto unlock_session; - } - - rcu_read_lock(); - assert(app_sock >= 0); - app = ust_app_find_by_sock(app_sock); - if (app == NULL) { - /* - * Application can be unregistered before so - * this is possible hence simply stopping the - * update. - */ - DBG3("UST app update failed to find app sock %d", - app_sock); - goto unlock_rcu; - } - ust_app_global_update(sess->ust_session, app); - unlock_rcu: - rcu_read_unlock(); - unlock_session: - session_unlock(sess); - session_put(sess); - } -} - /* * This thread manage event coming from the kernel. * @@ -1133,797 +1052,6 @@ error_poll: return NULL; } -/* - * This thread receives application command sockets (FDs) on the - * apps_cmd_pipe and waits (polls) on them until they are closed - * or an error occurs. - * - * At that point, it flushes the data (tracing and metadata) associated - * with this application and tears down ust app sessions and other - * associated data structures through ust_app_unregister(). - * - * Note that this thread never sends commands to the applications - * through the command sockets; it merely listens for hang-ups - * and errors on those sockets and cleans-up as they occur. - */ -static void *thread_manage_apps(void *data) -{ - int i, ret, pollfd, err = -1; - ssize_t size_ret; - uint32_t revents, nb_fd; - struct lttng_poll_event events; - - DBG("[thread] Manage application started"); - - rcu_register_thread(); - rcu_thread_online(); - - health_register(health_sessiond, HEALTH_SESSIOND_TYPE_APP_MANAGE); - - if (testpoint(sessiond_thread_manage_apps)) { - goto error_testpoint; - } - - health_code_update(); - - ret = sessiond_set_thread_pollset(&events, 2); - if (ret < 0) { - goto error_poll_create; - } - - ret = lttng_poll_add(&events, apps_cmd_pipe[0], LPOLLIN | LPOLLRDHUP); - if (ret < 0) { - goto error; - } - - if (testpoint(sessiond_thread_manage_apps_before_loop)) { - goto error; - } - - health_code_update(); - - while (1) { - DBG("Apps thread polling"); - - /* Inifinite blocking call, waiting for transmission */ - restart: - health_poll_entry(); - ret = lttng_poll_wait(&events, -1); - DBG("Apps thread return from poll on %d fds", - LTTNG_POLL_GETNB(&events)); - health_poll_exit(); - if (ret < 0) { - /* - * Restart interrupted system call. - */ - if (errno == EINTR) { - goto restart; - } - goto error; - } - - nb_fd = ret; - - for (i = 0; i < nb_fd; i++) { - /* Fetch once the poll data */ - revents = LTTNG_POLL_GETEV(&events, i); - pollfd = LTTNG_POLL_GETFD(&events, i); - - health_code_update(); - - if (!revents) { - /* No activity for this FD (poll implementation). */ - continue; - } - - /* Thread quit pipe has been closed. Killing thread. */ - ret = sessiond_check_thread_quit_pipe(pollfd, revents); - if (ret) { - err = 0; - goto exit; - } - - /* Inspect the apps cmd pipe */ - if (pollfd == apps_cmd_pipe[0]) { - if (revents & LPOLLIN) { - int sock; - - /* Empty pipe */ - size_ret = lttng_read(apps_cmd_pipe[0], &sock, sizeof(sock)); - if (size_ret < sizeof(sock)) { - PERROR("read apps cmd pipe"); - goto error; - } - - health_code_update(); - - /* - * Since this is a command socket (write then read), - * we only monitor the error events of the socket. - */ - ret = lttng_poll_add(&events, sock, - LPOLLERR | LPOLLHUP | LPOLLRDHUP); - if (ret < 0) { - goto error; - } - - DBG("Apps with sock %d added to poll set", sock); - } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - ERR("Apps command pipe error"); - goto error; - } else { - ERR("Unknown poll events %u for sock %d", revents, pollfd); - goto error; - } - } else { - /* - * At this point, we know that a registered application made - * the event at poll_wait. - */ - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - /* Removing from the poll set */ - ret = lttng_poll_del(&events, pollfd); - if (ret < 0) { - goto error; - } - - /* Socket closed on remote end. */ - ust_app_unregister(pollfd); - } else { - ERR("Unexpected poll events %u for sock %d", revents, pollfd); - goto error; - } - } - - health_code_update(); - } - } - -exit: -error: - lttng_poll_clean(&events); -error_poll_create: -error_testpoint: - utils_close_pipe(apps_cmd_pipe); - apps_cmd_pipe[0] = apps_cmd_pipe[1] = -1; - - /* - * We don't clean the UST app hash table here since already registered - * applications can still be controlled so let them be until the session - * daemon dies or the applications stop. - */ - - if (err) { - health_error(); - ERR("Health error occurred in %s", __func__); - } - health_unregister(health_sessiond); - DBG("Application communication apps thread cleanup complete"); - rcu_thread_offline(); - rcu_unregister_thread(); - return NULL; -} - -/* - * Send a socket to a thread This is called from the dispatch UST registration - * thread once all sockets are set for the application. - * - * The sock value can be invalid, we don't really care, the thread will handle - * it and make the necessary cleanup if so. - * - * On success, return 0 else a negative value being the errno message of the - * write(). - */ -static int send_socket_to_thread(int fd, int sock) -{ - ssize_t ret; - - /* - * It's possible that the FD is set as invalid with -1 concurrently just - * before calling this function being a shutdown state of the thread. - */ - if (fd < 0) { - ret = -EBADF; - goto error; - } - - ret = lttng_write(fd, &sock, sizeof(sock)); - if (ret < sizeof(sock)) { - PERROR("write apps pipe %d", fd); - if (ret < 0) { - ret = -errno; - } - goto error; - } - - /* All good. Don't send back the write positive ret value. */ - ret = 0; -error: - return (int) ret; -} - -/* - * Sanitize the wait queue of the dispatch registration thread meaning removing - * invalid nodes from it. This is to avoid memory leaks for the case the UST - * notify socket is never received. - */ -static void sanitize_wait_queue(struct ust_reg_wait_queue *wait_queue) -{ - int ret, nb_fd = 0, i; - unsigned int fd_added = 0; - struct lttng_poll_event events; - struct ust_reg_wait_node *wait_node = NULL, *tmp_wait_node; - - assert(wait_queue); - - lttng_poll_init(&events); - - /* Just skip everything for an empty queue. */ - if (!wait_queue->count) { - goto end; - } - - ret = lttng_poll_create(&events, wait_queue->count, LTTNG_CLOEXEC); - if (ret < 0) { - goto error_create; - } - - cds_list_for_each_entry_safe(wait_node, tmp_wait_node, - &wait_queue->head, head) { - assert(wait_node->app); - ret = lttng_poll_add(&events, wait_node->app->sock, - LPOLLHUP | LPOLLERR); - if (ret < 0) { - goto error; - } - - fd_added = 1; - } - - if (!fd_added) { - goto end; - } - - /* - * Poll but don't block so we can quickly identify the faulty events and - * clean them afterwards from the wait queue. - */ - ret = lttng_poll_wait(&events, 0); - if (ret < 0) { - goto error; - } - nb_fd = ret; - - for (i = 0; i < nb_fd; i++) { - /* Get faulty FD. */ - uint32_t revents = LTTNG_POLL_GETEV(&events, i); - int pollfd = LTTNG_POLL_GETFD(&events, i); - - if (!revents) { - /* No activity for this FD (poll implementation). */ - continue; - } - - cds_list_for_each_entry_safe(wait_node, tmp_wait_node, - &wait_queue->head, head) { - if (pollfd == wait_node->app->sock && - (revents & (LPOLLHUP | LPOLLERR))) { - cds_list_del(&wait_node->head); - wait_queue->count--; - ust_app_destroy(wait_node->app); - free(wait_node); - /* - * Silence warning of use-after-free in - * cds_list_for_each_entry_safe which uses - * __typeof__(*wait_node). - */ - wait_node = NULL; - break; - } else { - ERR("Unexpected poll events %u for sock %d", revents, pollfd); - goto error; - } - } - } - - if (nb_fd > 0) { - DBG("Wait queue sanitized, %d node were cleaned up", nb_fd); - } - -end: - lttng_poll_clean(&events); - return; - -error: - lttng_poll_clean(&events); -error_create: - ERR("Unable to sanitize wait queue"); - return; -} - -/* - * Dispatch request from the registration threads to the application - * communication thread. - */ -static void *thread_dispatch_ust_registration(void *data) -{ - int ret, err = -1; - struct cds_wfcq_node *node; - struct ust_command *ust_cmd = NULL; - struct ust_reg_wait_node *wait_node = NULL, *tmp_wait_node; - struct ust_reg_wait_queue wait_queue = { - .count = 0, - }; - - rcu_register_thread(); - - health_register(health_sessiond, HEALTH_SESSIOND_TYPE_APP_REG_DISPATCH); - - if (testpoint(sessiond_thread_app_reg_dispatch)) { - goto error_testpoint; - } - - health_code_update(); - - CDS_INIT_LIST_HEAD(&wait_queue.head); - - DBG("[thread] Dispatch UST command started"); - - for (;;) { - health_code_update(); - - /* Atomically prepare the queue futex */ - futex_nto1_prepare(&ust_cmd_queue.futex); - - if (CMM_LOAD_SHARED(dispatch_thread_exit)) { - break; - } - - do { - struct ust_app *app = NULL; - ust_cmd = NULL; - - /* - * Make sure we don't have node(s) that have hung up before receiving - * the notify socket. This is to clean the list in order to avoid - * memory leaks from notify socket that are never seen. - */ - sanitize_wait_queue(&wait_queue); - - health_code_update(); - /* Dequeue command for registration */ - node = cds_wfcq_dequeue_blocking(&ust_cmd_queue.head, &ust_cmd_queue.tail); - if (node == NULL) { - DBG("Woken up but nothing in the UST command queue"); - /* Continue thread execution */ - break; - } - - ust_cmd = caa_container_of(node, struct ust_command, node); - - DBG("Dispatching UST registration pid:%d ppid:%d uid:%d" - " gid:%d sock:%d name:%s (version %d.%d)", - ust_cmd->reg_msg.pid, ust_cmd->reg_msg.ppid, - ust_cmd->reg_msg.uid, ust_cmd->reg_msg.gid, - ust_cmd->sock, ust_cmd->reg_msg.name, - ust_cmd->reg_msg.major, ust_cmd->reg_msg.minor); - - if (ust_cmd->reg_msg.type == USTCTL_SOCKET_CMD) { - wait_node = zmalloc(sizeof(*wait_node)); - if (!wait_node) { - PERROR("zmalloc wait_node dispatch"); - ret = close(ust_cmd->sock); - if (ret < 0) { - PERROR("close ust sock dispatch %d", ust_cmd->sock); - } - lttng_fd_put(LTTNG_FD_APPS, 1); - free(ust_cmd); - goto error; - } - CDS_INIT_LIST_HEAD(&wait_node->head); - - /* Create application object if socket is CMD. */ - wait_node->app = ust_app_create(&ust_cmd->reg_msg, - ust_cmd->sock); - if (!wait_node->app) { - ret = close(ust_cmd->sock); - if (ret < 0) { - PERROR("close ust sock dispatch %d", ust_cmd->sock); - } - lttng_fd_put(LTTNG_FD_APPS, 1); - free(wait_node); - free(ust_cmd); - continue; - } - /* - * Add application to the wait queue so we can set the notify - * socket before putting this object in the global ht. - */ - cds_list_add(&wait_node->head, &wait_queue.head); - wait_queue.count++; - - free(ust_cmd); - /* - * We have to continue here since we don't have the notify - * socket and the application MUST be added to the hash table - * only at that moment. - */ - continue; - } else { - /* - * Look for the application in the local wait queue and set the - * notify socket if found. - */ - cds_list_for_each_entry_safe(wait_node, tmp_wait_node, - &wait_queue.head, head) { - health_code_update(); - if (wait_node->app->pid == ust_cmd->reg_msg.pid) { - wait_node->app->notify_sock = ust_cmd->sock; - cds_list_del(&wait_node->head); - wait_queue.count--; - app = wait_node->app; - free(wait_node); - DBG3("UST app notify socket %d is set", ust_cmd->sock); - break; - } - } - - /* - * With no application at this stage the received socket is - * basically useless so close it before we free the cmd data - * structure for good. - */ - if (!app) { - ret = close(ust_cmd->sock); - if (ret < 0) { - PERROR("close ust sock dispatch %d", ust_cmd->sock); - } - lttng_fd_put(LTTNG_FD_APPS, 1); - } - free(ust_cmd); - } - - if (app) { - /* - * @session_lock_list - * - * Lock the global session list so from the register up to the - * registration done message, no thread can see the application - * and change its state. - */ - session_lock_list(); - rcu_read_lock(); - - /* - * Add application to the global hash table. This needs to be - * done before the update to the UST registry can locate the - * application. - */ - ust_app_add(app); - - /* Set app version. This call will print an error if needed. */ - (void) ust_app_version(app); - - /* Send notify socket through the notify pipe. */ - ret = send_socket_to_thread(apps_cmd_notify_pipe[1], - app->notify_sock); - if (ret < 0) { - rcu_read_unlock(); - session_unlock_list(); - /* - * No notify thread, stop the UST tracing. However, this is - * not an internal error of the this thread thus setting - * the health error code to a normal exit. - */ - err = 0; - goto error; - } - - /* - * Update newly registered application with the tracing - * registry info already enabled information. - */ - update_ust_app(app->sock); - - /* - * Don't care about return value. Let the manage apps threads - * handle app unregistration upon socket close. - */ - (void) ust_app_register_done(app); - - /* - * Even if the application socket has been closed, send the app - * to the thread and unregistration will take place at that - * place. - */ - ret = send_socket_to_thread(apps_cmd_pipe[1], app->sock); - if (ret < 0) { - rcu_read_unlock(); - session_unlock_list(); - /* - * No apps. thread, stop the UST tracing. However, this is - * not an internal error of the this thread thus setting - * the health error code to a normal exit. - */ - err = 0; - goto error; - } - - rcu_read_unlock(); - session_unlock_list(); - } - } while (node != NULL); - - health_poll_entry(); - /* Futex wait on queue. Blocking call on futex() */ - futex_nto1_wait(&ust_cmd_queue.futex); - health_poll_exit(); - } - /* Normal exit, no error */ - err = 0; - -error: - /* Clean up wait queue. */ - cds_list_for_each_entry_safe(wait_node, tmp_wait_node, - &wait_queue.head, head) { - cds_list_del(&wait_node->head); - wait_queue.count--; - free(wait_node); - } - - /* Empty command queue. */ - for (;;) { - /* Dequeue command for registration */ - node = cds_wfcq_dequeue_blocking(&ust_cmd_queue.head, &ust_cmd_queue.tail); - if (node == NULL) { - break; - } - ust_cmd = caa_container_of(node, struct ust_command, node); - ret = close(ust_cmd->sock); - if (ret < 0) { - PERROR("close ust sock exit dispatch %d", ust_cmd->sock); - } - lttng_fd_put(LTTNG_FD_APPS, 1); - free(ust_cmd); - } - -error_testpoint: - DBG("Dispatch thread dying"); - if (err) { - health_error(); - ERR("Health error occurred in %s", __func__); - } - health_unregister(health_sessiond); - rcu_unregister_thread(); - return NULL; -} - -/* - * This thread manage application registration. - */ -static void *thread_registration_apps(void *data) -{ - int sock = -1, i, ret, pollfd, err = -1; - uint32_t revents, nb_fd; - struct lttng_poll_event events; - /* - * Get allocated in this thread, enqueued to a global queue, dequeued and - * freed in the manage apps thread. - */ - struct ust_command *ust_cmd = NULL; - - DBG("[thread] Manage application registration started"); - - health_register(health_sessiond, HEALTH_SESSIOND_TYPE_APP_REG); - - if (testpoint(sessiond_thread_registration_apps)) { - goto error_testpoint; - } - - ret = lttcomm_listen_unix_sock(apps_sock); - if (ret < 0) { - goto error_listen; - } - - /* - * Pass 2 as size here for the thread quit pipe and apps socket. Nothing - * more will be added to this poll set. - */ - ret = sessiond_set_thread_pollset(&events, 2); - if (ret < 0) { - goto error_create_poll; - } - - /* Add the application registration socket */ - ret = lttng_poll_add(&events, apps_sock, LPOLLIN | LPOLLRDHUP); - if (ret < 0) { - goto error_poll_add; - } - - /* Notify all applications to register */ - ret = notify_ust_apps(1); - if (ret < 0) { - ERR("Failed to notify applications or create the wait shared memory.\n" - "Execution continues but there might be problem for already\n" - "running applications that wishes to register."); - } - - while (1) { - DBG("Accepting application registration"); - - /* Inifinite blocking call, waiting for transmission */ - restart: - health_poll_entry(); - ret = lttng_poll_wait(&events, -1); - health_poll_exit(); - if (ret < 0) { - /* - * Restart interrupted system call. - */ - if (errno == EINTR) { - goto restart; - } - goto error; - } - - nb_fd = ret; - - for (i = 0; i < nb_fd; i++) { - health_code_update(); - - /* Fetch once the poll data */ - revents = LTTNG_POLL_GETEV(&events, i); - pollfd = LTTNG_POLL_GETFD(&events, i); - - if (!revents) { - /* No activity for this FD (poll implementation). */ - continue; - } - - /* Thread quit pipe has been closed. Killing thread. */ - ret = sessiond_check_thread_quit_pipe(pollfd, revents); - if (ret) { - err = 0; - goto exit; - } - - /* Event on the registration socket */ - if (pollfd == apps_sock) { - if (revents & LPOLLIN) { - sock = lttcomm_accept_unix_sock(apps_sock); - if (sock < 0) { - goto error; - } - - /* - * Set socket timeout for both receiving and ending. - * app_socket_timeout is in seconds, whereas - * lttcomm_setsockopt_rcv_timeout and - * lttcomm_setsockopt_snd_timeout expect msec as - * parameter. - */ - if (config.app_socket_timeout >= 0) { - (void) lttcomm_setsockopt_rcv_timeout(sock, - config.app_socket_timeout * 1000); - (void) lttcomm_setsockopt_snd_timeout(sock, - config.app_socket_timeout * 1000); - } - - /* - * Set the CLOEXEC flag. Return code is useless because - * either way, the show must go on. - */ - (void) utils_set_fd_cloexec(sock); - - /* Create UST registration command for enqueuing */ - ust_cmd = zmalloc(sizeof(struct ust_command)); - if (ust_cmd == NULL) { - PERROR("ust command zmalloc"); - ret = close(sock); - if (ret) { - PERROR("close"); - } - goto error; - } - - /* - * Using message-based transmissions to ensure we don't - * have to deal with partially received messages. - */ - ret = lttng_fd_get(LTTNG_FD_APPS, 1); - if (ret < 0) { - ERR("Exhausted file descriptors allowed for applications."); - free(ust_cmd); - ret = close(sock); - if (ret) { - PERROR("close"); - } - sock = -1; - continue; - } - - health_code_update(); - ret = ust_app_recv_registration(sock, &ust_cmd->reg_msg); - if (ret < 0) { - free(ust_cmd); - /* Close socket of the application. */ - ret = close(sock); - if (ret) { - PERROR("close"); - } - lttng_fd_put(LTTNG_FD_APPS, 1); - sock = -1; - continue; - } - health_code_update(); - - ust_cmd->sock = sock; - sock = -1; - - DBG("UST registration received with pid:%d ppid:%d uid:%d" - " gid:%d sock:%d name:%s (version %d.%d)", - ust_cmd->reg_msg.pid, ust_cmd->reg_msg.ppid, - ust_cmd->reg_msg.uid, ust_cmd->reg_msg.gid, - ust_cmd->sock, ust_cmd->reg_msg.name, - ust_cmd->reg_msg.major, ust_cmd->reg_msg.minor); - - /* - * Lock free enqueue the registration request. The red pill - * has been taken! This apps will be part of the *system*. - */ - cds_wfcq_enqueue(&ust_cmd_queue.head, &ust_cmd_queue.tail, &ust_cmd->node); - - /* - * Wake the registration queue futex. Implicit memory - * barrier with the exchange in cds_wfcq_enqueue. - */ - futex_nto1_wake(&ust_cmd_queue.futex); - } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - ERR("Register apps socket poll error"); - goto error; - } else { - ERR("Unexpected poll events %u for sock %d", revents, pollfd); - goto error; - } - } - } - } - -exit: -error: - /* Notify that the registration thread is gone */ - notify_ust_apps(0); - - if (apps_sock >= 0) { - ret = close(apps_sock); - if (ret) { - PERROR("close"); - } - } - if (sock >= 0) { - ret = close(sock); - if (ret) { - PERROR("close"); - } - lttng_fd_put(LTTNG_FD_APPS, 1); - } - unlink(config.apps_unix_sock_path.value); - -error_poll_add: - lttng_poll_clean(&events); -error_listen: -error_create_poll: -error_testpoint: - DBG("UST Registration thread cleanup complete"); - if (err) { - health_error(); - ERR("Health error occurred in %s", __func__); - } - health_unregister(health_sessiond); - - return NULL; -} - /* * Setup necessary data for kernel tracer action. */ @@ -2511,49 +1639,6 @@ end: return ret; } -/* - * Creates the application socket. - */ -static int init_daemon_socket(void) -{ - int ret = 0; - mode_t old_umask; - - old_umask = umask(0); - - /* Create the application unix socket */ - apps_sock = lttcomm_create_unix_sock(config.apps_unix_sock_path.value); - if (apps_sock < 0) { - ERR("Create unix sock failed: %s", config.apps_unix_sock_path.value); - ret = -1; - goto end; - } - - /* Set the cloexec flag */ - ret = utils_set_fd_cloexec(apps_sock); - if (ret < 0) { - ERR("Unable to set CLOEXEC flag to the app Unix socket (fd: %d). " - "Continuing but note that the consumer daemon will have a " - "reference to this socket on exec()", apps_sock); - } - - /* File permission MUST be 666 */ - ret = chmod(config.apps_unix_sock_path.value, - S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH); - if (ret < 0) { - ERR("Set file permissions failed: %s", config.apps_unix_sock_path.value); - PERROR("chmod"); - goto end; - } - - DBG3("Session daemon application socket %d created", - apps_sock); - -end: - umask(old_umask); - return ret; -} - /* * Create lockfile using the rundir and return its fd. */ @@ -3219,12 +2304,6 @@ int main(int argc, char **argv) goto exit_init_data; } - /* Setup the needed unix socket */ - if (init_daemon_socket()) { - retval = -1; - goto exit_init_data; - } - /* Set credentials to socket */ if (is_root && set_permissions(config.rundir.value)) { retval = -1; @@ -3344,36 +2423,21 @@ int main(int argc, char **argv) goto exit_client; } - /* Create thread to dispatch registration */ - ret = pthread_create(&dispatch_thread, default_pthread_attr(), - thread_dispatch_ust_registration, (void *) NULL); - if (ret) { - errno = ret; - PERROR("pthread_create dispatch"); + if (!launch_ust_dispatch_thread(&ust_cmd_queue, apps_cmd_pipe[1], + apps_cmd_notify_pipe[1])) { retval = -1; - stop_threads(); goto exit_dispatch; } /* Create thread to manage application registration. */ - ret = pthread_create(®_apps_thread, default_pthread_attr(), - thread_registration_apps, (void *) NULL); - if (ret) { - errno = ret; - PERROR("pthread_create registration"); + if (!launch_application_registration_thread(&ust_cmd_queue)) { retval = -1; - stop_threads(); goto exit_reg_apps; } /* Create thread to manage application socket */ - ret = pthread_create(&apps_thread, default_pthread_attr(), - thread_manage_apps, (void *) NULL); - if (ret) { - errno = ret; - PERROR("pthread_create apps"); + if (!launch_application_management_thread(apps_cmd_pipe[0])) { retval = -1; - stop_threads(); goto exit_apps; } @@ -3477,33 +2541,8 @@ exit_agent_reg: retval = -1; } exit_apps_notify: - - ret = pthread_join(apps_thread, &status); - if (ret) { - errno = ret; - PERROR("pthread_join apps"); - retval = -1; - } exit_apps: - - ret = pthread_join(reg_apps_thread, &status); - if (ret) { - errno = ret; - PERROR("pthread_join"); - retval = -1; - } exit_reg_apps: - - /* - * Join dispatch thread after joining reg_apps_thread to ensure - * we don't leak applications in the queue. - */ - ret = pthread_join(dispatch_thread, &status); - if (ret) { - errno = ret; - PERROR("pthread_join"); - retval = -1; - } exit_dispatch: exit_client: exit_rotation: