X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fnotification-thread.c;h=5a7c0075aa1f24ae72592a1ce7adac61fa9b68e1;hp=c47b36533e2bdef05ad3ba4c609b9382e10856ed;hb=2463b7879c00298daa79744cdaae82ac061a4ed8;hpb=814b4934e2604a419bcb8eec57c0450dbb47e2c3 diff --git a/src/bin/lttng-sessiond/notification-thread.c b/src/bin/lttng-sessiond/notification-thread.c index c47b36533..5a7c0075a 100644 --- a/src/bin/lttng-sessiond/notification-thread.c +++ b/src/bin/lttng-sessiond/notification-thread.c @@ -1,18 +1,8 @@ /* - * Copyright (C) 2017 - Jérémie Galarneau + * Copyright (C) 2017 Jérémie Galarneau * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License, version 2 only, as - * published by the Free Software Foundation. + * SPDX-License-Identifier: GPL-2.0-only * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for - * more details. - * - * You should have received a copy of the GNU General Public License along with - * this program; if not, write to the Free Software Foundation, Inc., 51 - * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #define _LGPL_SOURCE @@ -37,11 +27,18 @@ #include "notification-thread-commands.h" #include "lttng-sessiond.h" #include "health-sessiond.h" +#include "thread.h" +#include "testpoint.h" + +#include "kernel.h" +#include #include #include #include + +int trigger_consumption_paused; /* * Destroy the thread data previously created by the init function. */ @@ -56,6 +53,7 @@ void notification_thread_handle_destroy( assert(cds_list_empty(&handle->cmd_queue.list)); pthread_mutex_destroy(&handle->cmd_queue.lock); + sem_destroy(&handle->ready); if (handle->cmd_queue.event_pipe) { lttng_pipe_destroy(handle->cmd_queue.event_pipe); @@ -78,14 +76,21 @@ void notification_thread_handle_destroy( PERROR("close kernel consumer channel monitoring pipe"); } } + end: free(handle); } +/* + * TODO: refactor this if needed. Lifetime of the kernel notification event source. + * The kernel_notification_monitor_fd ownwership remain to the main thread. + * This is because we need to close this fd before removing the modules. + */ struct notification_thread_handle *notification_thread_handle_create( struct lttng_pipe *ust32_channel_monitor_pipe, struct lttng_pipe *ust64_channel_monitor_pipe, - struct lttng_pipe *kernel_channel_monitor_pipe) + struct lttng_pipe *kernel_channel_monitor_pipe, + int kernel_notification_monitor_fd) { int ret; struct notification_thread_handle *handle; @@ -96,7 +101,9 @@ struct notification_thread_handle *notification_thread_handle_create( goto end; } - event_pipe = lttng_pipe_open(O_CLOEXEC); + sem_init(&handle->ready, 0, 0); + + event_pipe = lttng_pipe_open(FD_CLOEXEC); if (!event_pipe) { ERR("event_pipe creation"); goto error; @@ -141,6 +148,12 @@ struct notification_thread_handle *notification_thread_handle_create( } else { handle->channel_monitoring_pipes.kernel_consumer = -1; } + + CDS_INIT_LIST_HEAD(&handle->event_trigger_sources.list); + ret = pthread_mutex_init(&handle->event_trigger_sources.lock, NULL); + if (ret) { + goto error; + } end: return handle; error: @@ -168,7 +181,7 @@ char *get_notification_channel_sock_path(void) goto error; } } else { - char *home_path = utils_get_home_dir(); + const char *home_path = utils_get_home_dir(); if (!home_path) { ERR("Can't get HOME directory for socket creation"); @@ -235,8 +248,16 @@ int notification_channel_socket_create(void) } if (getuid() == 0) { - ret = chown(sock_path, 0, - utils_get_group_id(tracing_group_name)); + gid_t gid; + + ret = utils_get_group_id(config.tracing_group_name.value, true, + &gid); + if (ret) { + /* Default to root group. */ + gid = 0; + } + + ret = chown(sock_path, 0, gid); if (ret) { ERR("Failed to set the notification channel socket's group"); ret = -1; @@ -303,7 +324,7 @@ int init_poll_set(struct lttng_poll_event *poll_set, goto error; } if (handle->channel_monitoring_pipes.kernel_consumer < 0) { - goto end; + goto skip_kernel_consumer; } ret = lttng_poll_add(poll_set, handle->channel_monitoring_pipes.kernel_consumer, @@ -312,6 +333,8 @@ int init_poll_set(struct lttng_poll_event *poll_set, ERR("[notification-thread] Failed to add kernel channel monitoring pipe fd to pollset"); goto error; } + +skip_kernel_consumer: end: return ret; error: @@ -330,6 +353,10 @@ void fini_thread_state(struct notification_thread_state *state) ret = cds_lfht_destroy(state->client_socket_ht, NULL); assert(!ret); } + if (state->client_id_ht) { + ret = cds_lfht_destroy(state->client_id_ht, NULL); + assert(!ret); + } if (state->triggers_ht) { ret = handle_notification_thread_trigger_unregister_all(state); assert(!ret); @@ -350,18 +377,54 @@ void fini_thread_state(struct notification_thread_state *state) assert(!ret); } if (state->channels_ht) { - ret = cds_lfht_destroy(state->channels_ht, - NULL); + ret = cds_lfht_destroy(state->channels_ht, NULL); + assert(!ret); + } + if (state->sessions_ht) { + ret = cds_lfht_destroy(state->sessions_ht, NULL); + assert(!ret); + } + if (state->triggers_by_name_uid_ht) { + ret = cds_lfht_destroy(state->triggers_by_name_uid_ht, NULL); + assert(!ret); + } + if (state->trigger_tokens_ht) { + ret = cds_lfht_destroy(state->trigger_tokens_ht, NULL); + assert(!ret); + } + /* + * Must be destroyed after all channels have been destroyed. + * See comment in struct lttng_session_trigger_list. + */ + if (state->session_triggers_ht) { + ret = cds_lfht_destroy(state->session_triggers_ht, NULL); assert(!ret); } - if (state->notification_channel_socket >= 0) { notification_channel_socket_destroy( state->notification_channel_socket); } + if (state->executor) { + action_executor_destroy(state->executor); + } lttng_poll_clean(&state->events); } +static +void mark_thread_as_ready(struct notification_thread_handle *handle) +{ + DBG("Marking notification thread as ready"); + sem_post(&handle->ready); +} + +static +void wait_until_thread_is_ready(struct notification_thread_handle *handle) +{ + DBG("Waiting for notification thread to be ready"); + sem_wait(&handle->ready); + DBG("Notification thread is ready"); +} + static int init_thread_state(struct notification_thread_handle *handle, struct notification_thread_state *state) @@ -370,6 +433,7 @@ int init_thread_state(struct notification_thread_handle *handle, memset(state, 0, sizeof(*state)); state->notification_channel_socket = -1; + state->trigger_id.token_generator = 1; lttng_poll_init(&state->events); ret = notification_channel_socket_create(); @@ -397,12 +461,24 @@ int init_thread_state(struct notification_thread_handle *handle, goto error; } + state->client_id_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0, + CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL); + if (!state->client_id_ht) { + goto error; + } + state->channel_triggers_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL); if (!state->channel_triggers_ht) { goto error; } + state->session_triggers_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0, + CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL); + if (!state->session_triggers_ht) { + goto error; + } + state->channel_state_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL); if (!state->channel_state_ht) { @@ -420,12 +496,32 @@ int init_thread_state(struct notification_thread_handle *handle, if (!state->channels_ht) { goto error; } - + state->sessions_ht = cds_lfht_new(DEFAULT_HT_SIZE, + 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL); + if (!state->sessions_ht) { + goto error; + } state->triggers_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL); if (!state->triggers_ht) { goto error; } + state->triggers_by_name_uid_ht = cds_lfht_new(DEFAULT_HT_SIZE, + 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL); + if (!state->triggers_by_name_uid_ht) { + goto error; + } + + state->trigger_tokens_ht = cds_lfht_new(DEFAULT_HT_SIZE, + 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL); + if (!state->trigger_tokens_ht) { + goto error; + } + state->executor = action_executor_create(handle); + if (!state->executor) { + goto error; + } + mark_thread_as_ready(handle); end: return 0; error: @@ -469,27 +565,86 @@ end: return ret; } +static int handle_trigger_event_pipe(int fd, + enum lttng_domain_type domain, + uint32_t revents, + struct notification_thread_state *state) +{ + int ret = 0; + + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ret = lttng_poll_del(&state->events, fd); + if (ret) { + ERR("[notification-thread] Failed to remove event monitoring pipe from poll set"); + } + goto end; + } + + if (testpoint(sessiond_handle_trigger_event_pipe)) { + ret = 0; + goto end; + } + + if (caa_unlikely(trigger_consumption_paused)) { + DBG("Trigger consumption paused, sleeping..."); + sleep(1); + goto end; + } + + ret = handle_notification_thread_event(state, fd, domain); + if (ret) { + ERR("[notification-thread] Event sample handling error occurred for fd: %d", fd); + ret = -1; + goto end; + } + +end: + return ret; +} + +/* + * Return the event source domain type via parameter. + */ +static bool fd_is_event_source(struct notification_thread_handle *handle, int fd, enum lttng_domain_type *domain) +{ + struct notification_event_trigger_source_element *source_element, *tmp; + + assert(domain); + + cds_list_for_each_entry_safe(source_element, tmp, + &handle->event_trigger_sources.list, node) { + if (source_element->fd != fd) { + continue; + } + *domain = source_element->domain; + return true; + } + return false; +} + /* * This thread services notification channel clients and commands received * from various lttng-sessiond components over a command queue. */ +static void *thread_notification(void *data) { int ret; struct notification_thread_handle *handle = data; struct notification_thread_state state; + enum lttng_domain_type domain; DBG("[notification-thread] Started notification thread"); + health_register(health_sessiond, HEALTH_SESSIOND_TYPE_NOTIFICATION); + rcu_register_thread(); + rcu_thread_online(); + if (!handle) { ERR("[notification-thread] Invalid thread context provided"); goto end; } - rcu_register_thread(); - rcu_thread_online(); - - health_register(health_sessiond, HEALTH_SESSIOND_TYPE_NOTIFICATION); health_code_update(); ret = init_thread_state(handle, &state); @@ -497,8 +652,9 @@ void *thread_notification(void *data) goto end; } - /* Ready to handle client connections. */ - sessiond_notify_ready(); + if (testpoint(sessiond_thread_notification)) { + goto end; + } while (true) { int fd_count, i; @@ -524,9 +680,6 @@ void *thread_notification(void *data) int fd = LTTNG_POLL_GETFD(&state.events, i); uint32_t revents = LTTNG_POLL_GETEV(&state.events, i); - if (!revents) { - continue; - } DBG("[notification-thread] Handling fd (%i) activity (%u)", fd, revents); if (fd == state.notification_channel_socket) { @@ -561,6 +714,11 @@ void *thread_notification(void *data) if (ret) { goto error; } + } else if (fd_is_event_source(handle, fd, &domain)) { + ret = handle_trigger_event_pipe(fd, domain, revents, &state); + if (ret) { + goto error; + } } else { /* Activity on a client's socket. */ if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { @@ -599,9 +757,43 @@ void *thread_notification(void *data) exit: error: fini_thread_state(&state); - health_unregister(health_sessiond); +end: rcu_thread_offline(); rcu_unregister_thread(); -end: + health_unregister(health_sessiond); + return NULL; +} + +static +bool shutdown_notification_thread(void *thread_data) +{ + struct notification_thread_handle *handle = thread_data; + + notification_thread_command_quit(handle); + return true; +} + +struct lttng_thread *launch_notification_thread( + struct notification_thread_handle *handle) +{ + struct lttng_thread *thread; + + thread = lttng_thread_create("Notification", + thread_notification, + shutdown_notification_thread, + NULL, + handle); + if (!thread) { + goto error; + } + + /* + * Wait for the thread to be marked as "ready" before returning + * as other subsystems depend on the notification subsystem + * (e.g. rotation thread). + */ + wait_until_thread_is_ready(handle); + return thread; +error: return NULL; }