From 55c2a7f9cf105923d1ce86f2deb550bbf7e8062f Mon Sep 17 00:00:00 2001 From: Julien Desfossez Date: Fri, 15 Sep 2017 14:26:43 -0400 Subject: [PATCH] timer thread in progress Signed-off-by: Julien Desfossez --- src/bin/lttng-sessiond/Makefile.am | 3 +- src/bin/lttng-sessiond/main.c | 57 ++++- src/bin/lttng-sessiond/rotate.c | 1 - src/bin/lttng-sessiond/rotation-thread.c | 80 ++++++- src/bin/lttng-sessiond/rotation-thread.h | 3 +- src/bin/lttng-sessiond/session.h | 6 + src/bin/lttng-sessiond/sessiond-timer.c | 275 +++++++++++++++++++++++ src/bin/lttng-sessiond/sessiond-timer.h | 50 +++++ 8 files changed, 468 insertions(+), 7 deletions(-) create mode 100644 src/bin/lttng-sessiond/sessiond-timer.c create mode 100644 src/bin/lttng-sessiond/sessiond-timer.h diff --git a/src/bin/lttng-sessiond/Makefile.am b/src/bin/lttng-sessiond/Makefile.am index 592a5d7e0..46e326de8 100644 --- a/src/bin/lttng-sessiond/Makefile.am +++ b/src/bin/lttng-sessiond/Makefile.am @@ -34,7 +34,8 @@ lttng_sessiond_SOURCES = utils.c utils.h \ syscall.h syscall.c \ notification-thread.h notification-thread.c \ notification-thread-commands.h notification-thread-commands.c \ - notification-thread-events.h notification-thread-events.c + notification-thread-events.h notification-thread-events.c \ + sessiond-timer.c if HAVE_LIBLTTNG_UST_CTL lttng_sessiond_SOURCES += trace-ust.c ust-registry.c ust-app.c \ diff --git a/src/bin/lttng-sessiond/main.c b/src/bin/lttng-sessiond/main.c index e725b7fcd..b2c36fc1d 100644 --- a/src/bin/lttng-sessiond/main.c +++ b/src/bin/lttng-sessiond/main.c @@ -77,6 +77,7 @@ #include "syscall.h" #include "agent.h" #include "ht-cleanup.h" +#include "sessiond-timer.h" #define CONSUMERD_FILE "lttng-consumerd" @@ -217,6 +218,12 @@ static int apps_cmd_pipe[2] = { -1, -1 }; int apps_cmd_notify_pipe[2] = { -1, -1 }; +/* + * Pipe to wakeup the rotation thread when the + * LTTNG_SESSIOND_SIG_ROTATE_PENDING signal is caught. + */ +static int rotate_timer_pipe[2] = { -1, -1 }; + /* Pthread, Mutexes and Semaphores */ static pthread_t apps_thread; static pthread_t apps_notify_thread; @@ -230,6 +237,7 @@ static pthread_t agent_reg_thread; static pthread_t load_session_thread; static pthread_t notification_thread; static pthread_t rotation_thread; +static pthread_t timer_thread; /* * UST registration command queue. This queue is tied with a futex and uses a N @@ -502,6 +510,9 @@ static void stop_threads(void) /* Dispatch thread */ CMM_STORE_SHARED(dispatch_thread_exit, 1); futex_nto1_wake(&ust_cmd_queue.futex); + + /* timer thread */ + kill(getpid(), LTTNG_SESSIOND_SIG_EXIT); } /* @@ -5714,6 +5725,7 @@ int main(int argc, char **argv) *ust64_channel_monitor_pipe = NULL, *kernel_channel_monitor_pipe = NULL; bool notification_thread_running = false; + bool timer_thread_running = false; struct lttng_pipe *ust32_channel_rotate_pipe = NULL, *ust64_channel_rotate_pipe = NULL, *kernel_channel_rotate_pipe = NULL; @@ -6030,6 +6042,17 @@ int main(int argc, char **argv) goto exit_init_data; } + /* + * Create the rotate_timer_pipe as non blocking because we have to + * write in it from the sighandler. + */ + ret = utils_create_pipe_cloexec_nonblock(rotate_timer_pipe); + if (ret < 0) { + ERR("Failed to create rotate pending pipe"); + retval = -1; + goto exit_init_data; + } + /* 64 bits consumerd path setup */ ret = snprintf(ustconsumer64_data.err_unix_sock_path, PATH_MAX, DEFAULT_USTCONSUMERD64_ERR_SOCK_PATH, rundir); @@ -6272,7 +6295,8 @@ int main(int argc, char **argv) ust32_channel_rotate_pipe, ust64_channel_rotate_pipe, kernel_channel_rotate_pipe, - thread_quit_pipe[0]); + thread_quit_pipe[0], + rotate_timer_pipe[0]); if (!rotation_thread_handle) { retval = -1; ERR("Failed to create rotation thread shared data"); @@ -6280,6 +6304,18 @@ int main(int argc, char **argv) goto exit_rotation; } + /* Create timer thread. */ + ret = pthread_create(&timer_thread, default_pthread_attr(), + sessiond_timer_thread, NULL); + if (ret) { + errno = ret; + PERROR("pthread_create timer"); + retval = -1; + stop_threads(); + goto exit_notification; + } + timer_thread_running = true; + /* Create rotation thread. */ ret = pthread_create(&rotation_thread, default_pthread_attr(), thread_rotation, rotation_thread_handle); @@ -6509,6 +6545,16 @@ exit_init_data: if (rotation_thread_handle) { rotation_thread_handle_destroy(rotation_thread_handle); + ret = close(rotate_timer_pipe[0]); + if (ret < 0) { + PERROR("Close rotate pending pipe"); + retval = -1; + } + ret = close(rotate_timer_pipe[1]); + if (ret < 0) { + PERROR("Close rotate pending pipe"); + retval = -1; + } } ret = pthread_join(rotation_thread, &status); @@ -6518,6 +6564,15 @@ exit_init_data: retval = -1; } + if (timer_thread_running) { + ret = pthread_join(timer_thread, &status); + if (ret) { + errno = ret; + PERROR("pthread_join timer thread"); + retval = -1; + } + } + rcu_thread_offline(); rcu_unregister_thread(); diff --git a/src/bin/lttng-sessiond/rotate.c b/src/bin/lttng-sessiond/rotate.c index 98406f429..9ce6f7e07 100644 --- a/src/bin/lttng-sessiond/rotate.c +++ b/src/bin/lttng-sessiond/rotate.c @@ -340,4 +340,3 @@ int relay_rotate_pending(struct ltt_session *session, uint64_t chunk_id) end: return ret; } - diff --git a/src/bin/lttng-sessiond/rotation-thread.c b/src/bin/lttng-sessiond/rotation-thread.c index 24c9715a1..118326dae 100644 --- a/src/bin/lttng-sessiond/rotation-thread.c +++ b/src/bin/lttng-sessiond/rotation-thread.c @@ -129,7 +129,7 @@ struct rotation_thread_handle *rotation_thread_handle_create( struct lttng_pipe *ust32_channel_rotate_pipe, struct lttng_pipe *ust64_channel_rotate_pipe, struct lttng_pipe *kernel_channel_rotate_pipe, - int thread_quit_pipe) + int thread_quit_pipe, int rotate_timer_pipe) { struct rotation_thread_handle *handle; @@ -169,6 +169,7 @@ struct rotation_thread_handle *rotation_thread_handle_create( handle->kernel_consumer = -1; } handle->thread_quit_pipe = thread_quit_pipe; + handle->rotate_timer_pipe = rotate_timer_pipe; end: return handle; @@ -184,13 +185,14 @@ int init_poll_set(struct lttng_poll_event *poll_set, int ret; /* - * Create pollset with size 4: + * Create pollset with size 5: * - sessiond quit pipe * - consumerd (32-bit user space) channel rotate pipe, * - consumerd (64-bit user space) channel rotate pipe, * - consumerd (kernel) channel rotate pipe. + * - sessiond rotate pending pipe */ - ret = lttng_poll_create(poll_set, 4, LTTNG_CLOEXEC); + ret = lttng_poll_create(poll_set, 5, LTTNG_CLOEXEC); if (ret < 0) { goto end; } @@ -201,6 +203,12 @@ int init_poll_set(struct lttng_poll_event *poll_set, ERR("[rotation-thread] Failed to add thread_quit_pipe fd to pollset"); goto error; } + ret = lttng_poll_add(poll_set, handle->rotate_timer_pipe, + LPOLLIN | LPOLLERR); + if (ret < 0) { + ERR("[rotation-thread] Failed to add rotate_pending fd to pollset"); + goto error; + } ret = lttng_poll_add(poll_set, handle->ust32_consumer, LPOLLIN | LPOLLERR); if (ret < 0) { @@ -334,6 +342,65 @@ end: return ret; } +static +int handle_rotate_timer_pipe(int fd, uint32_t revents, + struct rotation_thread_handle *handle, + struct rotation_thread_state *state) +{ + int ret = 0; + uint64_t session_id; + struct ltt_session *session; + + if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { + ret = lttng_poll_del(&state->events, fd); + if (ret) { + ERR("[rotation-thread] Failed to remove consumer " + "rotate pending pipe from poll set"); + } + goto end; + } + + do { + ret = read(fd, &session_id, sizeof(session_id)); + } while (ret == -1 && errno == EINTR); + if (ret != sizeof(session_id)) { + ERR("[rotation-thread] Failed to read from pipe (fd = %i)", + fd); + ret = -1; + goto end; + } + + session = session_find_by_id(session_id); + if (!session) { + ERR("[rotation-thread] Session %" PRIu64 " not found", + session_id); + ret = -1; + goto end; + } + + DBG("[rotation-thread] Check rotate pending on session %" PRIu64, + session_id); + ret = relay_rotate_pending(session, session->rotate_count - 1); + if (ret < 0) { + ERR("[rotation-thread] Check relay rotate pending"); + goto end; + } + if (ret == 0) { + DBG("[rotation-thread] Rotation completed on the relay for " + "session %" PRIu64, session_id); + /* TODO: delete timer */ + } else if (ret == 1) { + DBG("[rotation-thread] Rotation still pending on the relay for " + "session %" PRIu64, session_id); + } + fprintf(stderr, "RET PENDING: %d\n", ret); + + ret = 0; + +end: + return ret; +} + void *thread_rotation(void *data) { int ret; @@ -391,6 +458,13 @@ void *thread_rotation(void *data) if (fd == handle->thread_quit_pipe) { DBG("[rotation-thread] Quit pipe activity"); goto exit; + } else if (fd == handle->rotate_timer_pipe) { + ret = handle_rotate_timer_pipe(fd, revents, + handle, &state); + if (ret) { + ERR("[rotation-thread] Rotate pending"); + goto error; + } } else if (fd == handle->ust32_consumer || fd == handle->ust64_consumer || fd == handle->kernel_consumer) { diff --git a/src/bin/lttng-sessiond/rotation-thread.h b/src/bin/lttng-sessiond/rotation-thread.h index 580ddbbb4..762ef0c0d 100644 --- a/src/bin/lttng-sessiond/rotation-thread.h +++ b/src/bin/lttng-sessiond/rotation-thread.h @@ -51,6 +51,7 @@ struct rotation_thread_handle { int ust64_consumer; int kernel_consumer; int thread_quit_pipe; + int rotate_timer_pipe; }; struct rotation_thread_state { @@ -62,7 +63,7 @@ struct rotation_thread_handle *rotation_thread_handle_create( struct lttng_pipe *ust32_channel_rotate_pipe, struct lttng_pipe *ust64_channel_rotate_pipe, struct lttng_pipe *kernel_channel_rotate_pipe, - int thread_quit_pipe); + int thread_quit_pipe, int rotate_timer_pipe); void rotation_thread_handle_destroy( struct rotation_thread_handle *handle); diff --git a/src/bin/lttng-sessiond/session.h b/src/bin/lttng-sessiond/session.h index 25f0927f7..5edd96268 100644 --- a/src/bin/lttng-sessiond/session.h +++ b/src/bin/lttng-sessiond/session.h @@ -161,6 +161,12 @@ struct ltt_session { time_t current_chunk_start_ts; time_t session_last_stop_ts; time_t last_begin_rotation_ts; + /* + * Timer to check periodically if a relay has completed the last + * rotation. + */ + int rotate_relay_pending_timer_enabled; + timer_t rotate_relay_pending_timer; }; /* Prototypes */ diff --git a/src/bin/lttng-sessiond/sessiond-timer.c b/src/bin/lttng-sessiond/sessiond-timer.c new file mode 100644 index 000000000..bf27a9ee6 --- /dev/null +++ b/src/bin/lttng-sessiond/sessiond-timer.c @@ -0,0 +1,275 @@ +/* + * Copyright (C) 2017 - Julien Desfossez + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License, version 2 only, as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#define _LGPL_SOURCE +#include +#include +#include + +#include "sessiond-timer.h" +#include "health-sessiond.h" + +#if 0 +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#endif + +static struct timer_signal_data timer_signal = { + .tid = 0, + .setup_done = 0, + .qs_done = 0, + .lock = PTHREAD_MUTEX_INITIALIZER, +}; + +/* + * Set custom signal mask to current thread. + */ +static void setmask(sigset_t *mask) +{ + int ret; + + ret = sigemptyset(mask); + if (ret) { + PERROR("sigemptyset"); + } + ret = sigaddset(mask, LTTNG_SESSIOND_SIG_TEARDOWN); + if (ret) { + PERROR("sigaddset teardown"); + } + ret = sigaddset(mask, LTTNG_SESSIOND_SIG_EXIT); + if (ret) { + PERROR("sigaddset exit"); + } + ret = sigaddset(mask, LTTNG_SESSIOND_SIG_ROTATE_PENDING); + if (ret) { + PERROR("sigaddset switch"); + } +} + +static +void sessiond_timer_signal_thread_qs(unsigned int signr) +{ + sigset_t pending_set; + int ret; + + /* + * We need to be the only thread interacting with the thread + * that manages signals for teardown synchronization. + */ + pthread_mutex_lock(&timer_signal.lock); + + /* Ensure we don't have any signal queued for this session. */ + for (;;) { + ret = sigemptyset(&pending_set); + if (ret == -1) { + PERROR("sigemptyset"); + } + ret = sigpending(&pending_set); + if (ret == -1) { + PERROR("sigpending"); + } + if (!sigismember(&pending_set, signr)) { + break; + } + caa_cpu_relax(); + } + + /* + * From this point, no new signal handler will be fired that would try to + * access "session". However, we still need to wait for any currently + * executing handler to complete. + */ + cmm_smp_mb(); + CMM_STORE_SHARED(timer_signal.qs_done, 0); + cmm_smp_mb(); + + /* + * Kill with LTTNG_SESSIOND_SIG_TEARDOWN, so signal management thread wakes + * up. + */ + kill(getpid(), LTTNG_SESSIOND_SIG_TEARDOWN); + + while (!CMM_LOAD_SHARED(timer_signal.qs_done)) { + caa_cpu_relax(); + } + cmm_smp_mb(); + + pthread_mutex_unlock(&timer_signal.lock); +} + +/* + * Start a timer on a session that will fire at a given interval + * (timer_interval_us) and fire a given signal (signal). + * + * Returns a negative value on error, 0 if a timer was created, and + * a positive value if no timer was created (not an error). + */ +static +int session_timer_start(timer_t *timer_id, struct ltt_session *session, + unsigned int timer_interval_us, int signal) +{ + int ret = 0, delete_ret; + struct sigevent sev; + struct itimerspec its; + + assert(session); + + sev.sigev_notify = SIGEV_SIGNAL; + sev.sigev_signo = signal; + sev.sigev_value.sival_ptr = session; + ret = timer_create(CLOCKID, &sev, timer_id); + if (ret == -1) { + PERROR("timer_create"); + goto end; + } + + its.it_value.tv_sec = timer_interval_us / 1000000; + its.it_value.tv_nsec = (timer_interval_us % 1000000) * 1000; + its.it_interval.tv_sec = its.it_value.tv_sec; + its.it_interval.tv_nsec = its.it_value.tv_nsec; + + ret = timer_settime(*timer_id, 0, &its, NULL); + if (ret == -1) { + PERROR("timer_settime"); + goto error_destroy_timer; + } + goto end; + +error_destroy_timer: + delete_ret = timer_delete(*timer_id); + if (delete_ret == -1) { + PERROR("timer_delete"); + } + +end: + return ret; +} + + +static +int session_timer_stop(timer_t *timer_id, int signal) +{ + int ret = 0; + + ret = timer_delete(*timer_id); + if (ret == -1) { + PERROR("timer_delete"); + goto end; + } + + sessiond_timer_signal_thread_qs(signal); + *timer_id = 0; +end: + return ret; +} + +int rotate_pending_timer_start(struct ltt_session *session, unsigned int + interval_us) +{ + int ret; + + ret = session_timer_start(&session->rotate_relay_pending_timer, + session, interval_us, + LTTNG_SESSIOND_SIG_ROTATE_PENDING); + session->rotate_relay_pending_timer_enabled = !!(ret == 0); + + return ret; +} + +/* + * Block the RT signals for the entire process. It must be called from the + * sessiond main before creating the threads + */ +int sessiond_timer_signal_init(void) +{ + int ret; + sigset_t mask; + + /* Block signal for entire process, so only our thread processes it. */ + setmask(&mask); + ret = pthread_sigmask(SIG_BLOCK, &mask, NULL); + if (ret) { + errno = ret; + PERROR("pthread_sigmask"); + return -1; + } + return 0; +} + +/* + * This thread is the sighandler for the timer signals. + */ +void *sessiond_timer_thread(void *data) +{ + int signr; + sigset_t mask; + siginfo_t info; + + rcu_register_thread(); + rcu_thread_online(); + + health_register(health_sessiond, HEALTH_SESSIOND_TYPE_NOTIFICATION); + + health_code_update(); + + /* Only self thread will receive signal mask. */ + setmask(&mask); + CMM_STORE_SHARED(timer_signal.tid, pthread_self()); + + while (1) { + health_code_update(); + + health_poll_entry(); + signr = sigwaitinfo(&mask, &info); + health_poll_exit(); + + /* + * NOTE: cascading conditions are used instead of a switch case + * since the use of SIGRTMIN in the definition of the signals' + * values prevents the reduction to an integer constant. + */ + if (signr == -1) { + if (errno != EINTR) { + PERROR("sigwaitinfo"); + } + continue; + } else if (signr == LTTNG_SESSIOND_SIG_TEARDOWN) { + cmm_smp_mb(); + CMM_STORE_SHARED(timer_signal.qs_done, 1); + cmm_smp_mb(); + DBG("Signal timer metadata thread teardown"); + } else if (signr == LTTNG_SESSIOND_SIG_EXIT) { + goto end; + } else { + ERR("Unexpected signal %d\n", info.si_signo); + } + } + +end: + health_unregister(health_sessiond); + rcu_thread_offline(); + rcu_unregister_thread(); + return NULL; +} diff --git a/src/bin/lttng-sessiond/sessiond-timer.h b/src/bin/lttng-sessiond/sessiond-timer.h new file mode 100644 index 000000000..681e154f3 --- /dev/null +++ b/src/bin/lttng-sessiond/sessiond-timer.h @@ -0,0 +1,50 @@ +/* + * Copyright (C) 2017 - Julien Desfossez + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License, version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#ifndef SESSIOND_TIMER_H +#define SESSIOND_TIMER_H + +#include + +#include "session.h" + +#define LTTNG_SESSIOND_SIG_TEARDOWN SIGRTMIN + 10 +#define LTTNG_SESSIOND_SIG_EXIT SIGRTMIN + 11 +#define LTTNG_SESSIOND_SIG_ROTATE_PENDING SIGRTMIN + 12 + +#define CLOCKID CLOCK_MONOTONIC + +/* + * Handle timer teardown race wrt memory free of private data by sessiond + * signals are handled by a single thread, which permits a synchronization + * point between handling of each signal. Internal lock ensures mutual + * exclusion. + */ +struct timer_signal_data { + pthread_t tid; /* thread id managing signals */ + int setup_done; + int qs_done; + pthread_mutex_t lock; +}; + +void *sessiond_timer_thread(void *data); +int sessiond_timer_signal_init(void); + +int rotate_pending_timer_start(struct ltt_session *session, unsigned int + interval_us); + +#endif /* SESSIOND_TIMER_H */ -- 2.34.1