From: Jérémie Galarneau Date: Sat, 1 Dec 2018 00:25:06 +0000 (-0500) Subject: Launch the application management thread with lttng_thread X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=commitdiff_plain;h=7649924e1b3a80687c2fb79f8778e08bedb671ce Launch the application management thread with lttng_thread Signed-off-by: Jérémie Galarneau --- diff --git a/src/bin/lttng-sessiond/Makefile.am b/src/bin/lttng-sessiond/Makefile.am index 27491c870..41538bce7 100644 --- a/src/bin/lttng-sessiond/Makefile.am +++ b/src/bin/lttng-sessiond/Makefile.am @@ -48,7 +48,8 @@ lttng_sessiond_SOURCES = utils.c utils.h \ health.c \ client.c client.h \ dispatch.c dispatch.h \ - register.c register.h + register.c register.h \ + manage-apps.c manage-apps.h if HAVE_LIBLTTNG_UST_CTL lttng_sessiond_SOURCES += trace-ust.c ust-registry.c ust-app.c \ diff --git a/src/bin/lttng-sessiond/main.c b/src/bin/lttng-sessiond/main.c index b211370d5..f1c2cd37c 100644 --- a/src/bin/lttng-sessiond/main.c +++ b/src/bin/lttng-sessiond/main.c @@ -85,6 +85,7 @@ #include "client.h" #include "dispatch.h" #include "register.h" +#include "manage-apps.h" static const char *help_msg = #ifdef LTTNG_EMBED_HELP @@ -143,7 +144,6 @@ static const char *config_ignore_options[] = { "help", "version", "config" }; static int apps_cmd_pipe[2] = { -1, -1 }; /* Pthread, Mutexes and Semaphores */ -static pthread_t apps_thread; static pthread_t apps_notify_thread; static pthread_t kernel_thread; static pthread_t agent_reg_thread; @@ -299,6 +299,8 @@ static void sessiond_cleanup(void) PERROR("remove pidfile %s", config.pid_file_path.value); } + utils_close_pipe(apps_cmd_pipe); + DBG("Removing sessiond and consumerd content of directory %s", config.rundir.value); @@ -1050,177 +1052,6 @@ error_poll: return NULL; } -/* - * This thread receives application command sockets (FDs) on the - * apps_cmd_pipe and waits (polls) on them until they are closed - * or an error occurs. - * - * At that point, it flushes the data (tracing and metadata) associated - * with this application and tears down ust app sessions and other - * associated data structures through ust_app_unregister(). - * - * Note that this thread never sends commands to the applications - * through the command sockets; it merely listens for hang-ups - * and errors on those sockets and cleans-up as they occur. - */ -static void *thread_manage_apps(void *data) -{ - int i, ret, pollfd, err = -1; - ssize_t size_ret; - uint32_t revents, nb_fd; - struct lttng_poll_event events; - - DBG("[thread] Manage application started"); - - rcu_register_thread(); - rcu_thread_online(); - - health_register(health_sessiond, HEALTH_SESSIOND_TYPE_APP_MANAGE); - - if (testpoint(sessiond_thread_manage_apps)) { - goto error_testpoint; - } - - health_code_update(); - - ret = sessiond_set_thread_pollset(&events, 2); - if (ret < 0) { - goto error_poll_create; - } - - ret = lttng_poll_add(&events, apps_cmd_pipe[0], LPOLLIN | LPOLLRDHUP); - if (ret < 0) { - goto error; - } - - if (testpoint(sessiond_thread_manage_apps_before_loop)) { - goto error; - } - - health_code_update(); - - while (1) { - DBG("Apps thread polling"); - - /* Inifinite blocking call, waiting for transmission */ - restart: - health_poll_entry(); - ret = lttng_poll_wait(&events, -1); - DBG("Apps thread return from poll on %d fds", - LTTNG_POLL_GETNB(&events)); - health_poll_exit(); - if (ret < 0) { - /* - * Restart interrupted system call. - */ - if (errno == EINTR) { - goto restart; - } - goto error; - } - - nb_fd = ret; - - for (i = 0; i < nb_fd; i++) { - /* Fetch once the poll data */ - revents = LTTNG_POLL_GETEV(&events, i); - pollfd = LTTNG_POLL_GETFD(&events, i); - - health_code_update(); - - if (!revents) { - /* No activity for this FD (poll implementation). */ - continue; - } - - /* Thread quit pipe has been closed. Killing thread. */ - ret = sessiond_check_thread_quit_pipe(pollfd, revents); - if (ret) { - err = 0; - goto exit; - } - - /* Inspect the apps cmd pipe */ - if (pollfd == apps_cmd_pipe[0]) { - if (revents & LPOLLIN) { - int sock; - - /* Empty pipe */ - size_ret = lttng_read(apps_cmd_pipe[0], &sock, sizeof(sock)); - if (size_ret < sizeof(sock)) { - PERROR("read apps cmd pipe"); - goto error; - } - - health_code_update(); - - /* - * Since this is a command socket (write then read), - * we only monitor the error events of the socket. - */ - ret = lttng_poll_add(&events, sock, - LPOLLERR | LPOLLHUP | LPOLLRDHUP); - if (ret < 0) { - goto error; - } - - DBG("Apps with sock %d added to poll set", sock); - } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - ERR("Apps command pipe error"); - goto error; - } else { - ERR("Unknown poll events %u for sock %d", revents, pollfd); - goto error; - } - } else { - /* - * At this point, we know that a registered application made - * the event at poll_wait. - */ - if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - /* Removing from the poll set */ - ret = lttng_poll_del(&events, pollfd); - if (ret < 0) { - goto error; - } - - /* Socket closed on remote end. */ - ust_app_unregister(pollfd); - } else { - ERR("Unexpected poll events %u for sock %d", revents, pollfd); - goto error; - } - } - - health_code_update(); - } - } - -exit: -error: - lttng_poll_clean(&events); -error_poll_create: -error_testpoint: - utils_close_pipe(apps_cmd_pipe); - apps_cmd_pipe[0] = apps_cmd_pipe[1] = -1; - - /* - * We don't clean the UST app hash table here since already registered - * applications can still be controlled so let them be until the session - * daemon dies or the applications stop. - */ - - if (err) { - health_error(); - ERR("Health error occurred in %s", __func__); - } - health_unregister(health_sessiond); - DBG("Application communication apps thread cleanup complete"); - rcu_thread_offline(); - rcu_unregister_thread(); - return NULL; -} - /* * Setup necessary data for kernel tracer action. */ @@ -2605,13 +2436,8 @@ int main(int argc, char **argv) } /* Create thread to manage application socket */ - ret = pthread_create(&apps_thread, default_pthread_attr(), - thread_manage_apps, (void *) NULL); - if (ret) { - errno = ret; - PERROR("pthread_create apps"); + if (!launch_application_management_thread(apps_cmd_pipe[0])) { retval = -1; - stop_threads(); goto exit_apps; } @@ -2715,13 +2541,6 @@ exit_agent_reg: retval = -1; } exit_apps_notify: - - ret = pthread_join(apps_thread, &status); - if (ret) { - errno = ret; - PERROR("pthread_join apps"); - retval = -1; - } exit_apps: exit_reg_apps: exit_dispatch: diff --git a/src/bin/lttng-sessiond/manage-apps.c b/src/bin/lttng-sessiond/manage-apps.c new file mode 100644 index 000000000..6ef0351fe --- /dev/null +++ b/src/bin/lttng-sessiond/manage-apps.c @@ -0,0 +1,255 @@ +/* + * Copyright (C) 2011 - David Goulet + * Mathieu Desnoyers + * 2013 - Jérémie Galarneau + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License, version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "manage-apps.h" +#include "testpoint.h" +#include "health-sessiond.h" +#include "utils.h" +#include "thread.h" + +struct thread_notifiers { + struct lttng_pipe *quit_pipe; + int apps_cmd_pipe_read_fd; +}; + +static void cleanup_application_management_thread(void *data) +{ + struct thread_notifiers *notifiers = data; + + lttng_pipe_destroy(notifiers->quit_pipe); + free(notifiers); +} + +/* + * This thread receives application command sockets (FDs) on the + * apps_cmd_pipe and waits (polls) on them until they are closed + * or an error occurs. + * + * At that point, it flushes the data (tracing and metadata) associated + * with this application and tears down ust app sessions and other + * associated data structures through ust_app_unregister(). + * + * Note that this thread never sends commands to the applications + * through the command sockets; it merely listens for hang-ups + * and errors on those sockets and cleans-up as they occur. + */ +static void *thread_application_management(void *data) +{ + int i, ret, pollfd, err = -1; + ssize_t size_ret; + uint32_t revents, nb_fd; + struct lttng_poll_event events; + struct thread_notifiers *notifiers = data; + const int quit_pipe_read_fd = lttng_pipe_get_readfd( + notifiers->quit_pipe); + + DBG("[thread] Manage application started"); + + rcu_register_thread(); + rcu_thread_online(); + + health_register(health_sessiond, HEALTH_SESSIOND_TYPE_APP_MANAGE); + + if (testpoint(sessiond_thread_manage_apps)) { + goto error_testpoint; + } + + health_code_update(); + + ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC); + if (ret < 0) { + goto error_poll_create; + } + + ret = lttng_poll_add(&events, notifiers->apps_cmd_pipe_read_fd, + LPOLLIN | LPOLLRDHUP); + if (ret < 0) { + goto error; + } + + ret = lttng_poll_add(&events, quit_pipe_read_fd, LPOLLIN | LPOLLERR); + if (ret < 0) { + goto error; + } + + if (testpoint(sessiond_thread_manage_apps_before_loop)) { + goto error; + } + + health_code_update(); + + while (1) { + DBG("Apps thread polling"); + + /* Inifinite blocking call, waiting for transmission */ + restart: + health_poll_entry(); + ret = lttng_poll_wait(&events, -1); + DBG("Apps thread return from poll on %d fds", + LTTNG_POLL_GETNB(&events)); + health_poll_exit(); + if (ret < 0) { + /* + * Restart interrupted system call. + */ + if (errno == EINTR) { + goto restart; + } + goto error; + } + + nb_fd = ret; + + for (i = 0; i < nb_fd; i++) { + /* Fetch once the poll data */ + revents = LTTNG_POLL_GETEV(&events, i); + pollfd = LTTNG_POLL_GETFD(&events, i); + + health_code_update(); + + if (!revents) { + /* No activity for this FD (poll implementation). */ + continue; + } + + if (pollfd == quit_pipe_read_fd) { + err = 0; + goto exit; + } else if (pollfd == notifiers->apps_cmd_pipe_read_fd) { + /* Inspect the apps cmd pipe */ + if (revents & LPOLLIN) { + int sock; + + /* Empty pipe */ + size_ret = lttng_read( + notifiers->apps_cmd_pipe_read_fd, + &sock, sizeof(sock)); + if (size_ret < sizeof(sock)) { + PERROR("read apps cmd pipe"); + goto error; + } + + health_code_update(); + + /* + * Since this is a command socket (write then read), + * we only monitor the error events of the socket. + */ + ret = lttng_poll_add(&events, sock, + LPOLLERR | LPOLLHUP | LPOLLRDHUP); + if (ret < 0) { + goto error; + } + + DBG("Apps with sock %d added to poll set", sock); + } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ERR("Apps command pipe error"); + goto error; + } else { + ERR("Unknown poll events %u for sock %d", revents, pollfd); + goto error; + } + } else { + /* + * At this point, we know that a registered application made + * the event at poll_wait. + */ + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + /* Removing from the poll set */ + ret = lttng_poll_del(&events, pollfd); + if (ret < 0) { + goto error; + } + + /* Socket closed on remote end. */ + ust_app_unregister(pollfd); + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + goto error; + } + } + + health_code_update(); + } + } + +exit: +error: + lttng_poll_clean(&events); +error_poll_create: +error_testpoint: + + /* + * We don't clean the UST app hash table here since already registered + * applications can still be controlled so let them be until the session + * daemon dies or the applications stop. + */ + + if (err) { + health_error(); + ERR("Health error occurred in %s", __func__); + } + health_unregister(health_sessiond); + DBG("Application communication apps thread cleanup complete"); + rcu_thread_offline(); + rcu_unregister_thread(); + return NULL; +} + +static bool shutdown_application_management_thread(void *data) +{ + struct thread_notifiers *notifiers = data; + const int write_fd = lttng_pipe_get_writefd(notifiers->quit_pipe); + + return notify_thread_pipe(write_fd) == 1; +} + +bool launch_application_management_thread(int apps_cmd_pipe_read_fd) +{ + struct lttng_pipe *quit_pipe; + struct thread_notifiers *notifiers = NULL; + struct lttng_thread *thread; + + quit_pipe = lttng_pipe_open(FD_CLOEXEC); + if (!quit_pipe) { + goto error; + } + + notifiers = zmalloc(sizeof(*notifiers)); + if (!notifiers) { + goto error; + } + notifiers->quit_pipe = quit_pipe; + notifiers->apps_cmd_pipe_read_fd = apps_cmd_pipe_read_fd; + + thread = lttng_thread_create("UST application management", + thread_application_management, + shutdown_application_management_thread, + cleanup_application_management_thread, + notifiers); + if (!thread) { + goto error; + } + + lttng_thread_put(thread); + return true; +error: + cleanup_application_management_thread(notifiers); + return false; +} diff --git a/src/bin/lttng-sessiond/manage-apps.h b/src/bin/lttng-sessiond/manage-apps.h new file mode 100644 index 000000000..7a1477835 --- /dev/null +++ b/src/bin/lttng-sessiond/manage-apps.h @@ -0,0 +1,28 @@ +/* + * Copyright (C) 2011 - David Goulet + * Mathieu Desnoyers + * 2013 - Jérémie Galarneau + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License, version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef SESSIOND_APPLICATION_MANAGEMENT_THREAD_H +#define SESSIOND_APPLICATION_MANAGEMENT_THREAD_H + +#include +#include "lttng-sessiond.h" + +bool launch_application_management_thread(int apps_cmd_pipe_read_fd); + +#endif /* SESSIOND_APPLICATION_MANAGEMENT_THREAD_H */