From: Jérémie Galarneau Date: Sun, 2 Dec 2018 22:06:45 +0000 (-0500) Subject: Launch the kernel management thread using lttng_thread X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=commitdiff_plain;h=5b0936814a6004448eeec430b2dfea534f35a2fc Launch the kernel management thread using lttng_thread Signed-off-by: Jérémie Galarneau --- diff --git a/src/bin/lttng-sessiond/Makefile.am b/src/bin/lttng-sessiond/Makefile.am index 41538bce7..cb8da2d6c 100644 --- a/src/bin/lttng-sessiond/Makefile.am +++ b/src/bin/lttng-sessiond/Makefile.am @@ -49,7 +49,8 @@ lttng_sessiond_SOURCES = utils.c utils.h \ client.c client.h \ dispatch.c dispatch.h \ register.c register.h \ - manage-apps.c manage-apps.h + manage-apps.c manage-apps.h \ + manage-kernel.c manage-kernel.h if HAVE_LIBLTTNG_UST_CTL lttng_sessiond_SOURCES += trace-ust.c ust-registry.c ust-app.c \ diff --git a/src/bin/lttng-sessiond/agent-thread.c b/src/bin/lttng-sessiond/agent-thread.c index e648eda1e..9c98d30ab 100644 --- a/src/bin/lttng-sessiond/agent-thread.c +++ b/src/bin/lttng-sessiond/agent-thread.c @@ -454,7 +454,6 @@ error_tcp_socket: error_poll_create: uatomic_set(&agent_tracing_enabled, 0); DBG("[agent-thread] Cleaning up and stopping."); - lttng_pipe_destroy(quit_pipe); rcu_thread_offline(); rcu_unregister_thread(); return NULL; @@ -468,7 +467,14 @@ static bool shutdown_agent_management_thread(void *data) return notify_thread_pipe(write_fd) == 1; } -bool launch_agent_registration_thread(void) +static void cleanup_agent_management_thread(void *data) +{ + struct lttng_pipe *quit_pipe = data; + + lttng_pipe_destroy(quit_pipe); +} + +bool launch_agent_management_thread(void) { struct lttng_pipe *quit_pipe; struct lttng_thread *thread; @@ -480,6 +486,7 @@ bool launch_agent_registration_thread(void) thread = lttng_thread_create("Agent management", thread_agent_management, shutdown_agent_management_thread, + cleanup_agent_management_thread, quit_pipe); if (!thread) { goto error; @@ -488,6 +495,6 @@ bool launch_agent_registration_thread(void) lttng_thread_put(thread); return true; error: - lttng_pipe_destroy(quit_pipe); + cleanup_agent_management_thread(quit_pipe); return false; } diff --git a/src/bin/lttng-sessiond/main.c b/src/bin/lttng-sessiond/main.c index 56d932da9..7038e6707 100644 --- a/src/bin/lttng-sessiond/main.c +++ b/src/bin/lttng-sessiond/main.c @@ -86,6 +86,7 @@ #include "dispatch.h" #include "register.h" #include "manage-apps.h" +#include "manage-kernel.h" static const char *help_msg = #ifdef LTTNG_EMBED_HELP @@ -145,7 +146,6 @@ static int apps_cmd_pipe[2] = { -1, -1 }; static int apps_cmd_notify_pipe[2] = { -1, -1 }; /* Pthread, Mutexes and Semaphores */ -static pthread_t kernel_thread; static pthread_t load_session_thread; /* @@ -294,6 +294,7 @@ static void sessiond_cleanup(void) sessiond_close_quit_pipe(); utils_close_pipe(apps_cmd_pipe); utils_close_pipe(apps_cmd_notify_pipe); + utils_close_pipe(kernel_poll_pipe); ret = remove(config.pid_file_path.value); if (ret < 0) { @@ -383,290 +384,6 @@ static void sessiond_cleanup_options(void) run_as_destroy_worker(); } -/* - * Update the kernel poll set of all channel fd available over all tracing - * session. Add the wakeup pipe at the end of the set. - */ -static int update_kernel_poll(struct lttng_poll_event *events) -{ - int ret; - struct ltt_kernel_channel *channel; - struct ltt_session *session; - const struct ltt_session_list *session_list = session_get_list(); - - DBG("Updating kernel poll set"); - - session_lock_list(); - cds_list_for_each_entry(session, &session_list->head, list) { - if (!session_get(session)) { - continue; - } - session_lock(session); - if (session->kernel_session == NULL) { - session_unlock(session); - session_put(session); - continue; - } - - cds_list_for_each_entry(channel, - &session->kernel_session->channel_list.head, list) { - /* Add channel fd to the kernel poll set */ - ret = lttng_poll_add(events, channel->fd, LPOLLIN | LPOLLRDNORM); - if (ret < 0) { - session_unlock(session); - session_put(session); - goto error; - } - DBG("Channel fd %d added to kernel set", channel->fd); - } - session_unlock(session); - } - session_unlock_list(); - - return 0; - -error: - session_unlock_list(); - return -1; -} - -/* - * Find the channel fd from 'fd' over all tracing session. When found, check - * for new channel stream and send those stream fds to the kernel consumer. - * - * Useful for CPU hotplug feature. - */ -static int update_kernel_stream(int fd) -{ - int ret = 0; - struct ltt_session *session; - struct ltt_kernel_session *ksess; - struct ltt_kernel_channel *channel; - const struct ltt_session_list *session_list = session_get_list(); - - DBG("Updating kernel streams for channel fd %d", fd); - - session_lock_list(); - cds_list_for_each_entry(session, &session_list->head, list) { - if (!session_get(session)) { - continue; - } - session_lock(session); - if (session->kernel_session == NULL) { - session_unlock(session); - session_put(session); - continue; - } - ksess = session->kernel_session; - - cds_list_for_each_entry(channel, - &ksess->channel_list.head, list) { - struct lttng_ht_iter iter; - struct consumer_socket *socket; - - if (channel->fd != fd) { - continue; - } - DBG("Channel found, updating kernel streams"); - ret = kernel_open_channel_stream(channel); - if (ret < 0) { - goto error; - } - /* Update the stream global counter */ - ksess->stream_count_global += ret; - - /* - * Have we already sent fds to the consumer? If yes, it - * means that tracing is started so it is safe to send - * our updated stream fds. - */ - if (ksess->consumer_fds_sent != 1 - || ksess->consumer == NULL) { - ret = -1; - goto error; - } - - rcu_read_lock(); - cds_lfht_for_each_entry(ksess->consumer->socks->ht, - &iter.iter, socket, node.node) { - pthread_mutex_lock(socket->lock); - ret = kernel_consumer_send_channel_streams(socket, - channel, ksess, - session->output_traces ? 1 : 0); - pthread_mutex_unlock(socket->lock); - if (ret < 0) { - rcu_read_unlock(); - goto error; - } - } - rcu_read_unlock(); - } - session_unlock(session); - session_put(session); - } - session_unlock_list(); - return ret; - -error: - session_unlock(session); - session_put(session); - session_unlock_list(); - return ret; -} - -/* - * This thread manage event coming from the kernel. - * - * Features supported in this thread: - * -) CPU Hotplug - */ -static void *thread_manage_kernel(void *data) -{ - int ret, i, pollfd, update_poll_flag = 1, err = -1; - uint32_t revents, nb_fd; - char tmp; - struct lttng_poll_event events; - - DBG("[thread] Thread manage kernel started"); - - health_register(health_sessiond, HEALTH_SESSIOND_TYPE_KERNEL); - - /* - * This first step of the while is to clean this structure which could free - * non NULL pointers so initialize it before the loop. - */ - lttng_poll_init(&events); - - if (testpoint(sessiond_thread_manage_kernel)) { - goto error_testpoint; - } - - health_code_update(); - - if (testpoint(sessiond_thread_manage_kernel_before_loop)) { - goto error_testpoint; - } - - while (1) { - health_code_update(); - - if (update_poll_flag == 1) { - /* Clean events object. We are about to populate it again. */ - lttng_poll_clean(&events); - - ret = sessiond_set_thread_pollset(&events, 2); - if (ret < 0) { - goto error_poll_create; - } - - ret = lttng_poll_add(&events, kernel_poll_pipe[0], LPOLLIN); - if (ret < 0) { - goto error; - } - - /* This will add the available kernel channel if any. */ - ret = update_kernel_poll(&events); - if (ret < 0) { - goto error; - } - update_poll_flag = 0; - } - - DBG("Thread kernel polling"); - - /* Poll infinite value of time */ - restart: - health_poll_entry(); - ret = lttng_poll_wait(&events, -1); - DBG("Thread kernel return from poll on %d fds", - LTTNG_POLL_GETNB(&events)); - health_poll_exit(); - if (ret < 0) { - /* - * Restart interrupted system call. - */ - if (errno == EINTR) { - goto restart; - } - goto error; - } else if (ret == 0) { - /* Should not happen since timeout is infinite */ - ERR("Return value of poll is 0 with an infinite timeout.\n" - "This should not have happened! Continuing..."); - continue; - } - - nb_fd = ret; - - for (i = 0; i < nb_fd; i++) { - /* Fetch once the poll data */ - revents = LTTNG_POLL_GETEV(&events, i); - pollfd = LTTNG_POLL_GETFD(&events, i); - - health_code_update(); - - if (!revents) { - /* No activity for this FD (poll implementation). */ - continue; - } - - /* Thread quit pipe has been closed. Killing thread. */ - ret = sessiond_check_thread_quit_pipe(pollfd, revents); - if (ret) { - err = 0; - goto exit; - } - - /* Check for data on kernel pipe */ - if (revents & LPOLLIN) { - if (pollfd == kernel_poll_pipe[0]) { - (void) lttng_read(kernel_poll_pipe[0], - &tmp, 1); - /* - * Ret value is useless here, if this pipe gets any actions an - * update is required anyway. - */ - update_poll_flag = 1; - continue; - } else { - /* - * New CPU detected by the kernel. Adding kernel stream to - * kernel session and updating the kernel consumer - */ - ret = update_kernel_stream(pollfd); - if (ret < 0) { - continue; - } - break; - } - } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - update_poll_flag = 1; - continue; - } else { - ERR("Unexpected poll events %u for sock %d", revents, pollfd); - goto error; - } - } - } - -exit: -error: - lttng_poll_clean(&events); -error_poll_create: -error_testpoint: - utils_close_pipe(kernel_poll_pipe); - kernel_poll_pipe[0] = kernel_poll_pipe[1] = -1; - if (err) { - health_error(); - ERR("Health error occurred in %s", __func__); - WARN("Kernel thread died unexpectedly. " - "Kernel tracing can continue but CPU hotplug is disabled."); - } - health_unregister(health_sessiond); - DBG("Kernel thread dying"); - return NULL; -} - /* * Signal pthread condition of the consumer data that the thread. */ @@ -2455,13 +2172,8 @@ int main(int argc, char **argv) /* Don't start this thread if kernel tracing is not requested nor root */ if (is_root && !config.no_kernel) { /* Create kernel thread to manage kernel event */ - ret = pthread_create(&kernel_thread, default_pthread_attr(), - thread_manage_kernel, (void *) NULL); - if (ret) { - errno = ret; - PERROR("pthread_create kernel"); + if (!launch_kernel_management_thread(kernel_poll_pipe[0])) { retval = -1; - stop_threads(); goto exit_kernel; } } @@ -2504,15 +2216,6 @@ int main(int argc, char **argv) destroy_all_sessions_and_wait(); exit_load_session: - - if (is_root && !config.no_kernel) { - ret = pthread_join(kernel_thread, &status); - if (ret) { - errno = ret; - PERROR("pthread_join"); - retval = -1; - } - } exit_kernel: exit_agent_reg: exit_apps_notify: diff --git a/src/bin/lttng-sessiond/manage-kernel.c b/src/bin/lttng-sessiond/manage-kernel.c new file mode 100644 index 000000000..35547743c --- /dev/null +++ b/src/bin/lttng-sessiond/manage-kernel.c @@ -0,0 +1,374 @@ +/* + * Copyright (C) 2011 - David Goulet + * Mathieu Desnoyers + * 2013 - Jérémie Galarneau + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License, version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include +#include + +#include "manage-kernel.h" +#include "testpoint.h" +#include "health-sessiond.h" +#include "utils.h" +#include "thread.h" +#include "kernel.h" +#include "kernel-consumer.h" + +struct thread_notifiers { + struct lttng_pipe *quit_pipe; + int kernel_poll_pipe_read_fd; +}; + +/* + * Update the kernel poll set of all channel fd available over all tracing + * session. Add the wakeup pipe at the end of the set. + */ +static int update_kernel_poll(struct lttng_poll_event *events) +{ + int ret; + struct ltt_kernel_channel *channel; + struct ltt_session *session; + const struct ltt_session_list *session_list = session_get_list(); + + DBG("Updating kernel poll set"); + + session_lock_list(); + cds_list_for_each_entry(session, &session_list->head, list) { + if (!session_get(session)) { + continue; + } + session_lock(session); + if (session->kernel_session == NULL) { + session_unlock(session); + session_put(session); + continue; + } + + cds_list_for_each_entry(channel, + &session->kernel_session->channel_list.head, list) { + /* Add channel fd to the kernel poll set */ + ret = lttng_poll_add(events, channel->fd, LPOLLIN | LPOLLRDNORM); + if (ret < 0) { + session_unlock(session); + session_put(session); + goto error; + } + DBG("Channel fd %d added to kernel set", channel->fd); + } + session_unlock(session); + } + session_unlock_list(); + + return 0; + +error: + session_unlock_list(); + return -1; +} + +/* + * Find the channel fd from 'fd' over all tracing session. When found, check + * for new channel stream and send those stream fds to the kernel consumer. + * + * Useful for CPU hotplug feature. + */ +static int update_kernel_stream(int fd) +{ + int ret = 0; + struct ltt_session *session; + struct ltt_kernel_session *ksess; + struct ltt_kernel_channel *channel; + const struct ltt_session_list *session_list = session_get_list(); + + DBG("Updating kernel streams for channel fd %d", fd); + + session_lock_list(); + cds_list_for_each_entry(session, &session_list->head, list) { + if (!session_get(session)) { + continue; + } + session_lock(session); + if (session->kernel_session == NULL) { + session_unlock(session); + session_put(session); + continue; + } + ksess = session->kernel_session; + + cds_list_for_each_entry(channel, + &ksess->channel_list.head, list) { + struct lttng_ht_iter iter; + struct consumer_socket *socket; + + if (channel->fd != fd) { + continue; + } + DBG("Channel found, updating kernel streams"); + ret = kernel_open_channel_stream(channel); + if (ret < 0) { + goto error; + } + /* Update the stream global counter */ + ksess->stream_count_global += ret; + + /* + * Have we already sent fds to the consumer? If yes, it + * means that tracing is started so it is safe to send + * our updated stream fds. + */ + if (ksess->consumer_fds_sent != 1 + || ksess->consumer == NULL) { + ret = -1; + goto error; + } + + rcu_read_lock(); + cds_lfht_for_each_entry(ksess->consumer->socks->ht, + &iter.iter, socket, node.node) { + pthread_mutex_lock(socket->lock); + ret = kernel_consumer_send_channel_streams(socket, + channel, ksess, + session->output_traces ? 1 : 0); + pthread_mutex_unlock(socket->lock); + if (ret < 0) { + rcu_read_unlock(); + goto error; + } + } + rcu_read_unlock(); + } + session_unlock(session); + session_put(session); + } + session_unlock_list(); + return ret; + +error: + session_unlock(session); + session_put(session); + session_unlock_list(); + return ret; +} + +/* + * This thread manage event coming from the kernel. + * + * Features supported in this thread: + * -) CPU Hotplug + */ +static void *thread_kernel_management(void *data) +{ + int ret, i, pollfd, update_poll_flag = 1, err = -1; + uint32_t revents, nb_fd; + char tmp; + struct lttng_poll_event events; + struct thread_notifiers *notifiers = data; + const int quit_pipe_read_fd = lttng_pipe_get_readfd(notifiers->quit_pipe); + + DBG("[thread] Thread manage kernel started"); + + health_register(health_sessiond, HEALTH_SESSIOND_TYPE_KERNEL); + + /* + * This first step of the while is to clean this structure which could free + * non NULL pointers so initialize it before the loop. + */ + lttng_poll_init(&events); + + if (testpoint(sessiond_thread_manage_kernel)) { + goto error_testpoint; + } + + health_code_update(); + + if (testpoint(sessiond_thread_manage_kernel_before_loop)) { + goto error_testpoint; + } + + while (1) { + health_code_update(); + + if (update_poll_flag == 1) { + /* Clean events object. We are about to populate it again. */ + lttng_poll_clean(&events); + + ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC); + if (ret < 0) { + goto error_poll_create; + } + + ret = lttng_poll_add(&events, + notifiers->kernel_poll_pipe_read_fd, + LPOLLIN); + if (ret < 0) { + goto error; + } + + ret = lttng_poll_add(&events, + quit_pipe_read_fd, + LPOLLIN); + if (ret < 0) { + goto error; + } + + /* This will add the available kernel channel if any. */ + ret = update_kernel_poll(&events); + if (ret < 0) { + goto error; + } + update_poll_flag = 0; + } + + DBG("Thread kernel polling"); + + /* Poll infinite value of time */ + restart: + health_poll_entry(); + ret = lttng_poll_wait(&events, -1); + DBG("Thread kernel return from poll on %d fds", + LTTNG_POLL_GETNB(&events)); + health_poll_exit(); + if (ret < 0) { + /* + * Restart interrupted system call. + */ + if (errno == EINTR) { + goto restart; + } + goto error; + } else if (ret == 0) { + /* Should not happen since timeout is infinite */ + ERR("Return value of poll is 0 with an infinite timeout.\n" + "This should not have happened! Continuing..."); + continue; + } + + nb_fd = ret; + + for (i = 0; i < nb_fd; i++) { + /* Fetch once the poll data */ + revents = LTTNG_POLL_GETEV(&events, i); + pollfd = LTTNG_POLL_GETFD(&events, i); + + health_code_update(); + + if (!revents) { + /* No activity for this FD (poll implementation). */ + continue; + } + + if (pollfd == quit_pipe_read_fd) { + err = 0; + goto exit; + } + + /* Check for data on kernel pipe */ + if (revents & LPOLLIN) { + if (pollfd == notifiers->kernel_poll_pipe_read_fd) { + (void) lttng_read(notifiers->kernel_poll_pipe_read_fd, + &tmp, 1); + /* + * Ret value is useless here, if this pipe gets any actions an + * update is required anyway. + */ + update_poll_flag = 1; + continue; + } else { + /* + * New CPU detected by the kernel. Adding kernel stream to + * kernel session and updating the kernel consumer + */ + ret = update_kernel_stream(pollfd); + if (ret < 0) { + continue; + } + break; + } + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + update_poll_flag = 1; + continue; + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + goto error; + } + } + } + +exit: +error: + lttng_poll_clean(&events); +error_poll_create: +error_testpoint: + if (err) { + health_error(); + ERR("Health error occurred in %s", __func__); + WARN("Kernel thread died unexpectedly. " + "Kernel tracing can continue but CPU hotplug is disabled."); + } + health_unregister(health_sessiond); + DBG("Kernel thread dying"); + return NULL; +} + +static bool shutdown_kernel_management_thread(void *data) +{ + struct thread_notifiers *notifiers = data; + const int write_fd = lttng_pipe_get_writefd(notifiers->quit_pipe); + + return notify_thread_pipe(write_fd) == 1; +} + +static void cleanup_kernel_management_thread(void *data) +{ + struct thread_notifiers *notifiers = data; + + lttng_pipe_destroy(notifiers->quit_pipe); + free(notifiers); +} + +bool launch_kernel_management_thread(int kernel_poll_pipe_read_fd) +{ + struct lttng_pipe *quit_pipe; + struct thread_notifiers *notifiers = NULL; + struct lttng_thread *thread; + + quit_pipe = lttng_pipe_open(FD_CLOEXEC); + if (!quit_pipe) { + goto error; + } + + notifiers = zmalloc(sizeof(*notifiers)); + if (!notifiers) { + goto error; + } + notifiers->quit_pipe = quit_pipe; + notifiers->kernel_poll_pipe_read_fd = kernel_poll_pipe_read_fd; + + thread = lttng_thread_create("Kernel management", + thread_kernel_management, + shutdown_kernel_management_thread, + cleanup_kernel_management_thread, + notifiers); + if (!thread) { + goto error; + } + lttng_thread_put(thread); + return true; +error: + cleanup_kernel_management_thread(notifiers); + return false; +} diff --git a/src/bin/lttng-sessiond/manage-kernel.h b/src/bin/lttng-sessiond/manage-kernel.h new file mode 100644 index 000000000..fba89265b --- /dev/null +++ b/src/bin/lttng-sessiond/manage-kernel.h @@ -0,0 +1,28 @@ +/* + * Copyright (C) 2011 - David Goulet + * Mathieu Desnoyers + * 2013 - Jérémie Galarneau + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License, version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef SESSIOND_KERNEL_MANAGEMENT_THREAD_H +#define SESSIOND_KERNEL_MANAGEMENT_THREAD_H + +#include +#include "lttng-sessiond.h" + +bool launch_kernel_management_thread(int kernel_poll_pipe_read_fd); + +#endif /* SESSIOND_KERNEL_MANAGEMENT_THREAD_H */