syscall.h syscall.c \
notification-thread.h notification-thread.c \
notification-thread-commands.h notification-thread-commands.c \
- notification-thread-events.h notification-thread-events.c
+ notification-thread-events.h notification-thread-events.c \
+ sessiond-timer.c
if HAVE_LIBLTTNG_UST_CTL
lttng_sessiond_SOURCES += trace-ust.c ust-registry.c ust-app.c \
#include "syscall.h"
#include "agent.h"
#include "ht-cleanup.h"
+#include "sessiond-timer.h"
#define CONSUMERD_FILE "lttng-consumerd"
int apps_cmd_notify_pipe[2] = { -1, -1 };
+/*
+ * Pipe to wakeup the rotation thread when the
+ * LTTNG_SESSIOND_SIG_ROTATE_PENDING signal is caught.
+ */
+static int rotate_timer_pipe[2] = { -1, -1 };
+
/* Pthread, Mutexes and Semaphores */
static pthread_t apps_thread;
static pthread_t apps_notify_thread;
static pthread_t load_session_thread;
static pthread_t notification_thread;
static pthread_t rotation_thread;
+static pthread_t timer_thread;
/*
* UST registration command queue. This queue is tied with a futex and uses a N
/* Dispatch thread */
CMM_STORE_SHARED(dispatch_thread_exit, 1);
futex_nto1_wake(&ust_cmd_queue.futex);
+
+ /* timer thread */
+ kill(getpid(), LTTNG_SESSIOND_SIG_EXIT);
}
/*
*ust64_channel_monitor_pipe = NULL,
*kernel_channel_monitor_pipe = NULL;
bool notification_thread_running = false;
+ bool timer_thread_running = false;
struct lttng_pipe *ust32_channel_rotate_pipe = NULL,
*ust64_channel_rotate_pipe = NULL,
*kernel_channel_rotate_pipe = NULL;
goto exit_init_data;
}
+ /*
+ * Create the rotate_timer_pipe as non blocking because we have to
+ * write in it from the sighandler.
+ */
+ ret = utils_create_pipe_cloexec_nonblock(rotate_timer_pipe);
+ if (ret < 0) {
+ ERR("Failed to create rotate pending pipe");
+ retval = -1;
+ goto exit_init_data;
+ }
+
/* 64 bits consumerd path setup */
ret = snprintf(ustconsumer64_data.err_unix_sock_path, PATH_MAX,
DEFAULT_USTCONSUMERD64_ERR_SOCK_PATH, rundir);
ust32_channel_rotate_pipe,
ust64_channel_rotate_pipe,
kernel_channel_rotate_pipe,
- thread_quit_pipe[0]);
+ thread_quit_pipe[0],
+ rotate_timer_pipe[0]);
if (!rotation_thread_handle) {
retval = -1;
ERR("Failed to create rotation thread shared data");
goto exit_rotation;
}
+ /* Create timer thread. */
+ ret = pthread_create(&timer_thread, default_pthread_attr(),
+ sessiond_timer_thread, NULL);
+ if (ret) {
+ errno = ret;
+ PERROR("pthread_create timer");
+ retval = -1;
+ stop_threads();
+ goto exit_notification;
+ }
+ timer_thread_running = true;
+
/* Create rotation thread. */
ret = pthread_create(&rotation_thread, default_pthread_attr(),
thread_rotation, rotation_thread_handle);
if (rotation_thread_handle) {
rotation_thread_handle_destroy(rotation_thread_handle);
+ ret = close(rotate_timer_pipe[0]);
+ if (ret < 0) {
+ PERROR("Close rotate pending pipe");
+ retval = -1;
+ }
+ ret = close(rotate_timer_pipe[1]);
+ if (ret < 0) {
+ PERROR("Close rotate pending pipe");
+ retval = -1;
+ }
}
ret = pthread_join(rotation_thread, &status);
retval = -1;
}
+ if (timer_thread_running) {
+ ret = pthread_join(timer_thread, &status);
+ if (ret) {
+ errno = ret;
+ PERROR("pthread_join timer thread");
+ retval = -1;
+ }
+ }
+
rcu_thread_offline();
rcu_unregister_thread();
struct lttng_pipe *ust32_channel_rotate_pipe,
struct lttng_pipe *ust64_channel_rotate_pipe,
struct lttng_pipe *kernel_channel_rotate_pipe,
- int thread_quit_pipe)
+ int thread_quit_pipe, int rotate_timer_pipe)
{
struct rotation_thread_handle *handle;
handle->kernel_consumer = -1;
}
handle->thread_quit_pipe = thread_quit_pipe;
+ handle->rotate_timer_pipe = rotate_timer_pipe;
end:
return handle;
int ret;
/*
- * Create pollset with size 4:
+ * Create pollset with size 5:
* - sessiond quit pipe
* - consumerd (32-bit user space) channel rotate pipe,
* - consumerd (64-bit user space) channel rotate pipe,
* - consumerd (kernel) channel rotate pipe.
+ * - sessiond rotate pending pipe
*/
- ret = lttng_poll_create(poll_set, 4, LTTNG_CLOEXEC);
+ ret = lttng_poll_create(poll_set, 5, LTTNG_CLOEXEC);
if (ret < 0) {
goto end;
}
ERR("[rotation-thread] Failed to add thread_quit_pipe fd to pollset");
goto error;
}
+ ret = lttng_poll_add(poll_set, handle->rotate_timer_pipe,
+ LPOLLIN | LPOLLERR);
+ if (ret < 0) {
+ ERR("[rotation-thread] Failed to add rotate_pending fd to pollset");
+ goto error;
+ }
ret = lttng_poll_add(poll_set, handle->ust32_consumer,
LPOLLIN | LPOLLERR);
if (ret < 0) {
return ret;
}
+static
+int handle_rotate_timer_pipe(int fd, uint32_t revents,
+ struct rotation_thread_handle *handle,
+ struct rotation_thread_state *state)
+{
+ int ret = 0;
+ uint64_t session_id;
+ struct ltt_session *session;
+
+ if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+ ret = lttng_poll_del(&state->events, fd);
+ if (ret) {
+ ERR("[rotation-thread] Failed to remove consumer "
+ "rotate pending pipe from poll set");
+ }
+ goto end;
+ }
+
+ do {
+ ret = read(fd, &session_id, sizeof(session_id));
+ } while (ret == -1 && errno == EINTR);
+ if (ret != sizeof(session_id)) {
+ ERR("[rotation-thread] Failed to read from pipe (fd = %i)",
+ fd);
+ ret = -1;
+ goto end;
+ }
+
+ session = session_find_by_id(session_id);
+ if (!session) {
+ ERR("[rotation-thread] Session %" PRIu64 " not found",
+ session_id);
+ ret = -1;
+ goto end;
+ }
+
+ DBG("[rotation-thread] Check rotate pending on session %" PRIu64,
+ session_id);
+ ret = relay_rotate_pending(session, session->rotate_count - 1);
+ if (ret < 0) {
+ ERR("[rotation-thread] Check relay rotate pending");
+ goto end;
+ }
+ if (ret == 0) {
+ DBG("[rotation-thread] Rotation completed on the relay for "
+ "session %" PRIu64, session_id);
+ /* TODO: delete timer */
+ } else if (ret == 1) {
+ DBG("[rotation-thread] Rotation still pending on the relay for "
+ "session %" PRIu64, session_id);
+ }
+ fprintf(stderr, "RET PENDING: %d\n", ret);
+
+ ret = 0;
+
+end:
+ return ret;
+}
+
void *thread_rotation(void *data)
{
int ret;
if (fd == handle->thread_quit_pipe) {
DBG("[rotation-thread] Quit pipe activity");
goto exit;
+ } else if (fd == handle->rotate_timer_pipe) {
+ ret = handle_rotate_timer_pipe(fd, revents,
+ handle, &state);
+ if (ret) {
+ ERR("[rotation-thread] Rotate pending");
+ goto error;
+ }
} else if (fd == handle->ust32_consumer ||
fd == handle->ust64_consumer ||
fd == handle->kernel_consumer) {
int ust64_consumer;
int kernel_consumer;
int thread_quit_pipe;
+ int rotate_timer_pipe;
};
struct rotation_thread_state {
struct lttng_pipe *ust32_channel_rotate_pipe,
struct lttng_pipe *ust64_channel_rotate_pipe,
struct lttng_pipe *kernel_channel_rotate_pipe,
- int thread_quit_pipe);
+ int thread_quit_pipe, int rotate_timer_pipe);
void rotation_thread_handle_destroy(
struct rotation_thread_handle *handle);
time_t current_chunk_start_ts;
time_t session_last_stop_ts;
time_t last_begin_rotation_ts;
+ /*
+ * Timer to check periodically if a relay has completed the last
+ * rotation.
+ */
+ int rotate_relay_pending_timer_enabled;
+ timer_t rotate_relay_pending_timer;
};
/* Prototypes */
--- /dev/null
+/*
+ * Copyright (C) 2017 - Julien Desfossez <jdesfossez@efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License, version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 51
+ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#define _LGPL_SOURCE
+#include <assert.h>
+#include <inttypes.h>
+#include <signal.h>
+
+#include "sessiond-timer.h"
+#include "health-sessiond.h"
+
+#if 0
+#include <bin/lttng-sessiond/ust-ctl.h>
+#include <bin/lttng-consumerd/health-consumerd.h>
+#include <common/common.h>
+#include <common/compat/endian.h>
+#include <common/kernel-ctl/kernel-ctl.h>
+#include <common/kernel-consumer/kernel-consumer.h>
+#include <common/consumer/consumer-stream.h>
+#include <common/consumer/consumer-timer.h>
+#include <common/consumer/consumer-testpoint.h>
+#include <common/ust-consumer/ust-consumer.h>
+#endif
+
+static struct timer_signal_data timer_signal = {
+ .tid = 0,
+ .setup_done = 0,
+ .qs_done = 0,
+ .lock = PTHREAD_MUTEX_INITIALIZER,
+};
+
+/*
+ * Set custom signal mask to current thread.
+ */
+static void setmask(sigset_t *mask)
+{
+ int ret;
+
+ ret = sigemptyset(mask);
+ if (ret) {
+ PERROR("sigemptyset");
+ }
+ ret = sigaddset(mask, LTTNG_SESSIOND_SIG_TEARDOWN);
+ if (ret) {
+ PERROR("sigaddset teardown");
+ }
+ ret = sigaddset(mask, LTTNG_SESSIOND_SIG_EXIT);
+ if (ret) {
+ PERROR("sigaddset exit");
+ }
+ ret = sigaddset(mask, LTTNG_SESSIOND_SIG_ROTATE_PENDING);
+ if (ret) {
+ PERROR("sigaddset switch");
+ }
+}
+
+static
+void sessiond_timer_signal_thread_qs(unsigned int signr)
+{
+ sigset_t pending_set;
+ int ret;
+
+ /*
+ * We need to be the only thread interacting with the thread
+ * that manages signals for teardown synchronization.
+ */
+ pthread_mutex_lock(&timer_signal.lock);
+
+ /* Ensure we don't have any signal queued for this session. */
+ for (;;) {
+ ret = sigemptyset(&pending_set);
+ if (ret == -1) {
+ PERROR("sigemptyset");
+ }
+ ret = sigpending(&pending_set);
+ if (ret == -1) {
+ PERROR("sigpending");
+ }
+ if (!sigismember(&pending_set, signr)) {
+ break;
+ }
+ caa_cpu_relax();
+ }
+
+ /*
+ * From this point, no new signal handler will be fired that would try to
+ * access "session". However, we still need to wait for any currently
+ * executing handler to complete.
+ */
+ cmm_smp_mb();
+ CMM_STORE_SHARED(timer_signal.qs_done, 0);
+ cmm_smp_mb();
+
+ /*
+ * Kill with LTTNG_SESSIOND_SIG_TEARDOWN, so signal management thread wakes
+ * up.
+ */
+ kill(getpid(), LTTNG_SESSIOND_SIG_TEARDOWN);
+
+ while (!CMM_LOAD_SHARED(timer_signal.qs_done)) {
+ caa_cpu_relax();
+ }
+ cmm_smp_mb();
+
+ pthread_mutex_unlock(&timer_signal.lock);
+}
+
+/*
+ * Start a timer on a session that will fire at a given interval
+ * (timer_interval_us) and fire a given signal (signal).
+ *
+ * Returns a negative value on error, 0 if a timer was created, and
+ * a positive value if no timer was created (not an error).
+ */
+static
+int session_timer_start(timer_t *timer_id, struct ltt_session *session,
+ unsigned int timer_interval_us, int signal)
+{
+ int ret = 0, delete_ret;
+ struct sigevent sev;
+ struct itimerspec its;
+
+ assert(session);
+
+ sev.sigev_notify = SIGEV_SIGNAL;
+ sev.sigev_signo = signal;
+ sev.sigev_value.sival_ptr = session;
+ ret = timer_create(CLOCKID, &sev, timer_id);
+ if (ret == -1) {
+ PERROR("timer_create");
+ goto end;
+ }
+
+ its.it_value.tv_sec = timer_interval_us / 1000000;
+ its.it_value.tv_nsec = (timer_interval_us % 1000000) * 1000;
+ its.it_interval.tv_sec = its.it_value.tv_sec;
+ its.it_interval.tv_nsec = its.it_value.tv_nsec;
+
+ ret = timer_settime(*timer_id, 0, &its, NULL);
+ if (ret == -1) {
+ PERROR("timer_settime");
+ goto error_destroy_timer;
+ }
+ goto end;
+
+error_destroy_timer:
+ delete_ret = timer_delete(*timer_id);
+ if (delete_ret == -1) {
+ PERROR("timer_delete");
+ }
+
+end:
+ return ret;
+}
+
+
+static
+int session_timer_stop(timer_t *timer_id, int signal)
+{
+ int ret = 0;
+
+ ret = timer_delete(*timer_id);
+ if (ret == -1) {
+ PERROR("timer_delete");
+ goto end;
+ }
+
+ sessiond_timer_signal_thread_qs(signal);
+ *timer_id = 0;
+end:
+ return ret;
+}
+
+int rotate_pending_timer_start(struct ltt_session *session, unsigned int
+ interval_us)
+{
+ int ret;
+
+ ret = session_timer_start(&session->rotate_relay_pending_timer,
+ session, interval_us,
+ LTTNG_SESSIOND_SIG_ROTATE_PENDING);
+ session->rotate_relay_pending_timer_enabled = !!(ret == 0);
+
+ return ret;
+}
+
+/*
+ * Block the RT signals for the entire process. It must be called from the
+ * sessiond main before creating the threads
+ */
+int sessiond_timer_signal_init(void)
+{
+ int ret;
+ sigset_t mask;
+
+ /* Block signal for entire process, so only our thread processes it. */
+ setmask(&mask);
+ ret = pthread_sigmask(SIG_BLOCK, &mask, NULL);
+ if (ret) {
+ errno = ret;
+ PERROR("pthread_sigmask");
+ return -1;
+ }
+ return 0;
+}
+
+/*
+ * This thread is the sighandler for the timer signals.
+ */
+void *sessiond_timer_thread(void *data)
+{
+ int signr;
+ sigset_t mask;
+ siginfo_t info;
+
+ rcu_register_thread();
+ rcu_thread_online();
+
+ health_register(health_sessiond, HEALTH_SESSIOND_TYPE_NOTIFICATION);
+
+ health_code_update();
+
+ /* Only self thread will receive signal mask. */
+ setmask(&mask);
+ CMM_STORE_SHARED(timer_signal.tid, pthread_self());
+
+ while (1) {
+ health_code_update();
+
+ health_poll_entry();
+ signr = sigwaitinfo(&mask, &info);
+ health_poll_exit();
+
+ /*
+ * NOTE: cascading conditions are used instead of a switch case
+ * since the use of SIGRTMIN in the definition of the signals'
+ * values prevents the reduction to an integer constant.
+ */
+ if (signr == -1) {
+ if (errno != EINTR) {
+ PERROR("sigwaitinfo");
+ }
+ continue;
+ } else if (signr == LTTNG_SESSIOND_SIG_TEARDOWN) {
+ cmm_smp_mb();
+ CMM_STORE_SHARED(timer_signal.qs_done, 1);
+ cmm_smp_mb();
+ DBG("Signal timer metadata thread teardown");
+ } else if (signr == LTTNG_SESSIOND_SIG_EXIT) {
+ goto end;
+ } else {
+ ERR("Unexpected signal %d\n", info.si_signo);
+ }
+ }
+
+end:
+ health_unregister(health_sessiond);
+ rcu_thread_offline();
+ rcu_unregister_thread();
+ return NULL;
+}
--- /dev/null
+/*
+ * Copyright (C) 2017 - Julien Desfossez <jdesfossez@efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License, version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifndef SESSIOND_TIMER_H
+#define SESSIOND_TIMER_H
+
+#include <pthread.h>
+
+#include "session.h"
+
+#define LTTNG_SESSIOND_SIG_TEARDOWN SIGRTMIN + 10
+#define LTTNG_SESSIOND_SIG_EXIT SIGRTMIN + 11
+#define LTTNG_SESSIOND_SIG_ROTATE_PENDING SIGRTMIN + 12
+
+#define CLOCKID CLOCK_MONOTONIC
+
+/*
+ * Handle timer teardown race wrt memory free of private data by sessiond
+ * signals are handled by a single thread, which permits a synchronization
+ * point between handling of each signal. Internal lock ensures mutual
+ * exclusion.
+ */
+struct timer_signal_data {
+ pthread_t tid; /* thread id managing signals */
+ int setup_done;
+ int qs_done;
+ pthread_mutex_t lock;
+};
+
+void *sessiond_timer_thread(void *data);
+int sessiond_timer_signal_init(void);
+
+int rotate_pending_timer_start(struct ltt_session *session, unsigned int
+ interval_us);
+
+#endif /* SESSIOND_TIMER_H */