X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fmain.c;h=579d49ab8d5c1eff1c16ed8425e4f257dedd687a;hp=56d932da9f7a961a2274d63d7ee32b6f8ce877ed;hb=3316b9d604bd4607e255ca3bbed0ee3e980dde4a;hpb=8a7e45909a46d4aab3c3debde8f5ce2006c7d306 diff --git a/src/bin/lttng-sessiond/main.c b/src/bin/lttng-sessiond/main.c index 56d932da9..579d49ab8 100644 --- a/src/bin/lttng-sessiond/main.c +++ b/src/bin/lttng-sessiond/main.c @@ -72,7 +72,6 @@ #include "ust-thread.h" #include "agent-thread.h" #include "save.h" -#include "load-session-thread.h" #include "notification-thread.h" #include "notification-thread-commands.h" #include "rotation-thread.h" @@ -86,6 +85,7 @@ #include "dispatch.h" #include "register.h" #include "manage-apps.h" +#include "manage-kernel.h" static const char *help_msg = #ifdef LTTNG_EMBED_HELP @@ -144,10 +144,6 @@ static const char *config_ignore_options[] = { "help", "version", "config" }; static int apps_cmd_pipe[2] = { -1, -1 }; static int apps_cmd_notify_pipe[2] = { -1, -1 }; -/* Pthread, Mutexes and Semaphores */ -static pthread_t kernel_thread; -static pthread_t load_session_thread; - /* * UST registration command queue. This queue is tied with a futex and uses a N * wakers / 1 waiter implemented and detailed in futex.c/.h @@ -161,9 +157,6 @@ static struct ust_cmd_queue ust_cmd_queue; static const char *module_proc_lttng = "/proc/lttng"; -/* Load session thread information to operate. */ -static struct load_session_thread_data *load_info; - /* * Section name to look for in the daemon configuration file. */ @@ -294,6 +287,7 @@ static void sessiond_cleanup(void) sessiond_close_quit_pipe(); utils_close_pipe(apps_cmd_pipe); utils_close_pipe(apps_cmd_notify_pipe); + utils_close_pipe(kernel_poll_pipe); ret = remove(config.pid_file_path.value); if (ret < 0) { @@ -359,11 +353,6 @@ static void sessiond_cleanup(void) close_consumer_sockets(); - if (load_info) { - load_session_destroy_data(load_info); - free(load_info); - } - /* * We do NOT rmdir rundir because there are other processes * using it, for instance lttng-relayd, which can start in @@ -383,290 +372,6 @@ static void sessiond_cleanup_options(void) run_as_destroy_worker(); } -/* - * Update the kernel poll set of all channel fd available over all tracing - * session. Add the wakeup pipe at the end of the set. - */ -static int update_kernel_poll(struct lttng_poll_event *events) -{ - int ret; - struct ltt_kernel_channel *channel; - struct ltt_session *session; - const struct ltt_session_list *session_list = session_get_list(); - - DBG("Updating kernel poll set"); - - session_lock_list(); - cds_list_for_each_entry(session, &session_list->head, list) { - if (!session_get(session)) { - continue; - } - session_lock(session); - if (session->kernel_session == NULL) { - session_unlock(session); - session_put(session); - continue; - } - - cds_list_for_each_entry(channel, - &session->kernel_session->channel_list.head, list) { - /* Add channel fd to the kernel poll set */ - ret = lttng_poll_add(events, channel->fd, LPOLLIN | LPOLLRDNORM); - if (ret < 0) { - session_unlock(session); - session_put(session); - goto error; - } - DBG("Channel fd %d added to kernel set", channel->fd); - } - session_unlock(session); - } - session_unlock_list(); - - return 0; - -error: - session_unlock_list(); - return -1; -} - -/* - * Find the channel fd from 'fd' over all tracing session. When found, check - * for new channel stream and send those stream fds to the kernel consumer. - * - * Useful for CPU hotplug feature. - */ -static int update_kernel_stream(int fd) -{ - int ret = 0; - struct ltt_session *session; - struct ltt_kernel_session *ksess; - struct ltt_kernel_channel *channel; - const struct ltt_session_list *session_list = session_get_list(); - - DBG("Updating kernel streams for channel fd %d", fd); - - session_lock_list(); - cds_list_for_each_entry(session, &session_list->head, list) { - if (!session_get(session)) { - continue; - } - session_lock(session); - if (session->kernel_session == NULL) { - session_unlock(session); - session_put(session); - continue; - } - ksess = session->kernel_session; - - cds_list_for_each_entry(channel, - &ksess->channel_list.head, list) { - struct lttng_ht_iter iter; - struct consumer_socket *socket; - - if (channel->fd != fd) { - continue; - } - DBG("Channel found, updating kernel streams"); - ret = kernel_open_channel_stream(channel); - if (ret < 0) { - goto error; - } - /* Update the stream global counter */ - ksess->stream_count_global += ret; - - /* - * Have we already sent fds to the consumer? If yes, it - * means that tracing is started so it is safe to send - * our updated stream fds. - */ - if (ksess->consumer_fds_sent != 1 - || ksess->consumer == NULL) { - ret = -1; - goto error; - } - - rcu_read_lock(); - cds_lfht_for_each_entry(ksess->consumer->socks->ht, - &iter.iter, socket, node.node) { - pthread_mutex_lock(socket->lock); - ret = kernel_consumer_send_channel_streams(socket, - channel, ksess, - session->output_traces ? 1 : 0); - pthread_mutex_unlock(socket->lock); - if (ret < 0) { - rcu_read_unlock(); - goto error; - } - } - rcu_read_unlock(); - } - session_unlock(session); - session_put(session); - } - session_unlock_list(); - return ret; - -error: - session_unlock(session); - session_put(session); - session_unlock_list(); - return ret; -} - -/* - * This thread manage event coming from the kernel. - * - * Features supported in this thread: - * -) CPU Hotplug - */ -static void *thread_manage_kernel(void *data) -{ - int ret, i, pollfd, update_poll_flag = 1, err = -1; - uint32_t revents, nb_fd; - char tmp; - struct lttng_poll_event events; - - DBG("[thread] Thread manage kernel started"); - - health_register(health_sessiond, HEALTH_SESSIOND_TYPE_KERNEL); - - /* - * This first step of the while is to clean this structure which could free - * non NULL pointers so initialize it before the loop. - */ - lttng_poll_init(&events); - - if (testpoint(sessiond_thread_manage_kernel)) { - goto error_testpoint; - } - - health_code_update(); - - if (testpoint(sessiond_thread_manage_kernel_before_loop)) { - goto error_testpoint; - } - - while (1) { - health_code_update(); - - if (update_poll_flag == 1) { - /* Clean events object. We are about to populate it again. */ - lttng_poll_clean(&events); - - ret = sessiond_set_thread_pollset(&events, 2); - if (ret < 0) { - goto error_poll_create; - } - - ret = lttng_poll_add(&events, kernel_poll_pipe[0], LPOLLIN); - if (ret < 0) { - goto error; - } - - /* This will add the available kernel channel if any. */ - ret = update_kernel_poll(&events); - if (ret < 0) { - goto error; - } - update_poll_flag = 0; - } - - DBG("Thread kernel polling"); - - /* Poll infinite value of time */ - restart: - health_poll_entry(); - ret = lttng_poll_wait(&events, -1); - DBG("Thread kernel return from poll on %d fds", - LTTNG_POLL_GETNB(&events)); - health_poll_exit(); - if (ret < 0) { - /* - * Restart interrupted system call. - */ - if (errno == EINTR) { - goto restart; - } - goto error; - } else if (ret == 0) { - /* Should not happen since timeout is infinite */ - ERR("Return value of poll is 0 with an infinite timeout.\n" - "This should not have happened! Continuing..."); - continue; - } - - nb_fd = ret; - - for (i = 0; i < nb_fd; i++) { - /* Fetch once the poll data */ - revents = LTTNG_POLL_GETEV(&events, i); - pollfd = LTTNG_POLL_GETFD(&events, i); - - health_code_update(); - - if (!revents) { - /* No activity for this FD (poll implementation). */ - continue; - } - - /* Thread quit pipe has been closed. Killing thread. */ - ret = sessiond_check_thread_quit_pipe(pollfd, revents); - if (ret) { - err = 0; - goto exit; - } - - /* Check for data on kernel pipe */ - if (revents & LPOLLIN) { - if (pollfd == kernel_poll_pipe[0]) { - (void) lttng_read(kernel_poll_pipe[0], - &tmp, 1); - /* - * Ret value is useless here, if this pipe gets any actions an - * update is required anyway. - */ - update_poll_flag = 1; - continue; - } else { - /* - * New CPU detected by the kernel. Adding kernel stream to - * kernel session and updating the kernel consumer - */ - ret = update_kernel_stream(pollfd); - if (ret < 0) { - continue; - } - break; - } - } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - update_poll_flag = 1; - continue; - } else { - ERR("Unexpected poll events %u for sock %d", revents, pollfd); - goto error; - } - } - } - -exit: -error: - lttng_poll_clean(&events); -error_poll_create: -error_testpoint: - utils_close_pipe(kernel_poll_pipe); - kernel_poll_pipe[0] = kernel_poll_pipe[1] = -1; - if (err) { - health_error(); - ERR("Health error occurred in %s", __func__); - WARN("Kernel thread died unexpectedly. " - "Kernel tracing can continue but CPU hotplug is disabled."); - } - health_unregister(health_sessiond); - DBG("Kernel thread dying"); - return NULL; -} - /* * Signal pthread condition of the consumer data that the thread. */ @@ -2020,7 +1725,6 @@ static void destroy_all_sessions_and_wait(void) int main(int argc, char **argv) { int ret = 0, retval = 0; - void *status; const char *env_app_timeout; struct lttng_pipe *ust32_channel_monitor_pipe = NULL, *ust64_channel_monitor_pipe = NULL, @@ -2363,12 +2067,6 @@ int main(int argc, char **argv) /* Initialize TCP timeout values */ lttcomm_inet_init(); - if (load_session_init_data(&load_info) < 0) { - retval = -1; - goto exit_init_data; - } - load_info->path = config.load_session_path.value; - /* Create health-check thread. */ if (!launch_health_management_thread()) { retval = -1; @@ -2455,40 +2153,29 @@ int main(int argc, char **argv) /* Don't start this thread if kernel tracing is not requested nor root */ if (is_root && !config.no_kernel) { /* Create kernel thread to manage kernel event */ - ret = pthread_create(&kernel_thread, default_pthread_attr(), - thread_manage_kernel, (void *) NULL); - if (ret) { - errno = ret; - PERROR("pthread_create kernel"); + if (!launch_kernel_management_thread(kernel_poll_pipe[0])) { retval = -1; - stop_threads(); goto exit_kernel; } } - /* Create session loading thread. */ - ret = pthread_create(&load_session_thread, default_pthread_attr(), - thread_load_session, load_info); + /* Load sessions. */ + ret = config_load_session(config.load_session_path.value, + NULL, 1, 1, NULL); if (ret) { - errno = ret; - PERROR("pthread_create load_session_thread"); + ERR("Session load failed: %s", error_get_str(ret)); retval = -1; - stop_threads(); goto exit_load_session; } + /* Initialization completed. */ + sessiond_signal_parents(); + /* * This is where we start awaiting program completion (e.g. through * signal that asks threads to teardown). */ - ret = pthread_join(load_session_thread, &status); - if (ret) { - errno = ret; - PERROR("pthread_join load_session_thread"); - retval = -1; - } - /* Initiate teardown once activity occurs on the quit pipe. */ sessiond_wait_for_quit_pipe(-1U); @@ -2504,15 +2191,6 @@ int main(int argc, char **argv) destroy_all_sessions_and_wait(); exit_load_session: - - if (is_root && !config.no_kernel) { - ret = pthread_join(kernel_thread, &status); - if (ret) { - errno = ret; - PERROR("pthread_join"); - retval = -1; - } - } exit_kernel: exit_agent_reg: exit_apps_notify: