timer thread in progress
authorJulien Desfossez <jdesfossez@efficios.com>
Fri, 15 Sep 2017 18:26:43 +0000 (14:26 -0400)
committerJulien Desfossez <jdesfossez@efficios.com>
Fri, 15 Sep 2017 18:26:43 +0000 (14:26 -0400)
Signed-off-by: Julien Desfossez <jdesfossez@efficios.com>
src/bin/lttng-sessiond/Makefile.am
src/bin/lttng-sessiond/main.c
src/bin/lttng-sessiond/rotate.c
src/bin/lttng-sessiond/rotation-thread.c
src/bin/lttng-sessiond/rotation-thread.h
src/bin/lttng-sessiond/session.h
src/bin/lttng-sessiond/sessiond-timer.c [new file with mode: 0644]
src/bin/lttng-sessiond/sessiond-timer.h [new file with mode: 0644]

index 592a5d7e071cfb56bdf9db83ffe977d8115b20b1..46e326de841e26ce048b41f60e4a4efadd7453c9 100644 (file)
@@ -34,7 +34,8 @@ lttng_sessiond_SOURCES = utils.c utils.h \
                        syscall.h syscall.c \
                        notification-thread.h notification-thread.c \
                        notification-thread-commands.h notification-thread-commands.c \
-                       notification-thread-events.h notification-thread-events.c
+                       notification-thread-events.h notification-thread-events.c \
+                      sessiond-timer.c
 
 if HAVE_LIBLTTNG_UST_CTL
 lttng_sessiond_SOURCES += trace-ust.c ust-registry.c ust-app.c \
index e725b7fcde6fa9d58e13bb4d9addac7b394de1f7..b2c36fc1d23ea92292816d7982d711ff8f415053 100644 (file)
@@ -77,6 +77,7 @@
 #include "syscall.h"
 #include "agent.h"
 #include "ht-cleanup.h"
+#include "sessiond-timer.h"
 
 #define CONSUMERD_FILE "lttng-consumerd"
 
@@ -217,6 +218,12 @@ static int apps_cmd_pipe[2] = { -1, -1 };
 
 int apps_cmd_notify_pipe[2] = { -1, -1 };
 
+/*
+ * Pipe to wakeup the rotation thread when the
+ * LTTNG_SESSIOND_SIG_ROTATE_PENDING signal is caught.
+ */
+static int rotate_timer_pipe[2] = { -1, -1 };
+
 /* Pthread, Mutexes and Semaphores */
 static pthread_t apps_thread;
 static pthread_t apps_notify_thread;
@@ -230,6 +237,7 @@ static pthread_t agent_reg_thread;
 static pthread_t load_session_thread;
 static pthread_t notification_thread;
 static pthread_t rotation_thread;
+static pthread_t timer_thread;
 
 /*
  * UST registration command queue. This queue is tied with a futex and uses a N
@@ -502,6 +510,9 @@ static void stop_threads(void)
        /* Dispatch thread */
        CMM_STORE_SHARED(dispatch_thread_exit, 1);
        futex_nto1_wake(&ust_cmd_queue.futex);
+
+       /* timer thread */
+       kill(getpid(), LTTNG_SESSIOND_SIG_EXIT);
 }
 
 /*
@@ -5714,6 +5725,7 @@ int main(int argc, char **argv)
                        *ust64_channel_monitor_pipe = NULL,
                        *kernel_channel_monitor_pipe = NULL;
        bool notification_thread_running = false;
+       bool timer_thread_running = false;
        struct lttng_pipe *ust32_channel_rotate_pipe = NULL,
                        *ust64_channel_rotate_pipe = NULL,
                        *kernel_channel_rotate_pipe = NULL;
@@ -6030,6 +6042,17 @@ int main(int argc, char **argv)
                goto exit_init_data;
        }
 
+       /*
+        * Create the rotate_timer_pipe as non blocking because we have to
+        * write in it from the sighandler.
+        */
+       ret = utils_create_pipe_cloexec_nonblock(rotate_timer_pipe);
+       if (ret < 0) {
+               ERR("Failed to create rotate pending pipe");
+               retval = -1;
+               goto exit_init_data;
+       }
+
        /* 64 bits consumerd path setup */
        ret = snprintf(ustconsumer64_data.err_unix_sock_path, PATH_MAX,
                        DEFAULT_USTCONSUMERD64_ERR_SOCK_PATH, rundir);
@@ -6272,7 +6295,8 @@ int main(int argc, char **argv)
                        ust32_channel_rotate_pipe,
                        ust64_channel_rotate_pipe,
                        kernel_channel_rotate_pipe,
-                       thread_quit_pipe[0]);
+                       thread_quit_pipe[0],
+                       rotate_timer_pipe[0]);
        if (!rotation_thread_handle) {
                retval = -1;
                ERR("Failed to create rotation thread shared data");
@@ -6280,6 +6304,18 @@ int main(int argc, char **argv)
                goto exit_rotation;
        }
 
+       /* Create timer thread. */
+       ret = pthread_create(&timer_thread, default_pthread_attr(),
+                       sessiond_timer_thread, NULL);
+       if (ret) {
+               errno = ret;
+               PERROR("pthread_create timer");
+               retval = -1;
+               stop_threads();
+               goto exit_notification;
+       }
+       timer_thread_running = true;
+
        /* Create rotation thread. */
        ret = pthread_create(&rotation_thread, default_pthread_attr(),
                        thread_rotation, rotation_thread_handle);
@@ -6509,6 +6545,16 @@ exit_init_data:
 
        if (rotation_thread_handle) {
                rotation_thread_handle_destroy(rotation_thread_handle);
+               ret = close(rotate_timer_pipe[0]);
+               if (ret < 0) {
+                       PERROR("Close rotate pending pipe");
+                       retval = -1;
+               }
+               ret = close(rotate_timer_pipe[1]);
+               if (ret < 0) {
+                       PERROR("Close rotate pending pipe");
+                       retval = -1;
+               }
        }
 
        ret = pthread_join(rotation_thread, &status);
@@ -6518,6 +6564,15 @@ exit_init_data:
                retval = -1;
        }
 
+       if (timer_thread_running) {
+               ret = pthread_join(timer_thread, &status);
+               if (ret) {
+                       errno = ret;
+                       PERROR("pthread_join timer thread");
+                       retval = -1;
+               }
+       }
+
        rcu_thread_offline();
        rcu_unregister_thread();
 
index 98406f4298fee1b55b91362e6714460d2c7f9495..9ce6f7e07229543c0fabc3b9126452571d4feb61 100644 (file)
@@ -340,4 +340,3 @@ int relay_rotate_pending(struct ltt_session *session, uint64_t chunk_id)
 end:
        return ret;
 }
-
index 24c9715a167e50c11cc0af54f8df51282ca9d7ce..118326dae99c80d745845733c602d6d601a59e33 100644 (file)
@@ -129,7 +129,7 @@ struct rotation_thread_handle *rotation_thread_handle_create(
                struct lttng_pipe *ust32_channel_rotate_pipe,
                struct lttng_pipe *ust64_channel_rotate_pipe,
                struct lttng_pipe *kernel_channel_rotate_pipe,
-               int thread_quit_pipe)
+               int thread_quit_pipe, int rotate_timer_pipe)
 {
        struct rotation_thread_handle *handle;
 
@@ -169,6 +169,7 @@ struct rotation_thread_handle *rotation_thread_handle_create(
                handle->kernel_consumer = -1;
        }
        handle->thread_quit_pipe = thread_quit_pipe;
+       handle->rotate_timer_pipe = rotate_timer_pipe;
 
 end:
        return handle;
@@ -184,13 +185,14 @@ int init_poll_set(struct lttng_poll_event *poll_set,
        int ret;
 
        /*
-        * Create pollset with size 4:
+        * Create pollset with size 5:
         *      - sessiond quit pipe
         *      - consumerd (32-bit user space) channel rotate pipe,
         *      - consumerd (64-bit user space) channel rotate pipe,
         *      - consumerd (kernel) channel rotate pipe.
+        *      - sessiond rotate pending pipe
         */
-       ret = lttng_poll_create(poll_set, 4, LTTNG_CLOEXEC);
+       ret = lttng_poll_create(poll_set, 5, LTTNG_CLOEXEC);
        if (ret < 0) {
                goto end;
        }
@@ -201,6 +203,12 @@ int init_poll_set(struct lttng_poll_event *poll_set,
                ERR("[rotation-thread] Failed to add thread_quit_pipe fd to pollset");
                goto error;
        }
+       ret = lttng_poll_add(poll_set, handle->rotate_timer_pipe,
+                       LPOLLIN | LPOLLERR);
+       if (ret < 0) {
+               ERR("[rotation-thread] Failed to add rotate_pending fd to pollset");
+               goto error;
+       }
        ret = lttng_poll_add(poll_set, handle->ust32_consumer,
                        LPOLLIN | LPOLLERR);
        if (ret < 0) {
@@ -334,6 +342,65 @@ end:
        return ret;
 }
 
+static
+int handle_rotate_timer_pipe(int fd, uint32_t revents,
+               struct rotation_thread_handle *handle,
+               struct rotation_thread_state *state)
+{
+       int ret = 0;
+       uint64_t session_id;
+       struct ltt_session *session;
+
+       if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+               ret = lttng_poll_del(&state->events, fd);
+               if (ret) {
+                       ERR("[rotation-thread] Failed to remove consumer "
+                                       "rotate pending pipe from poll set");
+               }
+               goto end;
+       }
+
+       do {
+               ret = read(fd, &session_id, sizeof(session_id));
+       } while (ret == -1 && errno == EINTR);
+       if (ret != sizeof(session_id)) {
+               ERR("[rotation-thread] Failed to read from pipe (fd = %i)",
+                               fd);
+               ret = -1;
+               goto end;
+       }
+
+       session = session_find_by_id(session_id);
+       if (!session) {
+               ERR("[rotation-thread] Session %" PRIu64 " not found",
+                               session_id);
+               ret = -1;
+               goto end;
+       }
+
+       DBG("[rotation-thread] Check rotate pending on session %" PRIu64,
+                       session_id);
+       ret = relay_rotate_pending(session, session->rotate_count - 1);
+       if (ret < 0) {
+               ERR("[rotation-thread] Check relay rotate pending");
+               goto end;
+       }
+       if (ret == 0) {
+               DBG("[rotation-thread] Rotation completed on the relay for "
+                               "session %" PRIu64, session_id);
+               /* TODO: delete timer */
+       } else if (ret == 1) {
+               DBG("[rotation-thread] Rotation still pending on the relay for "
+                               "session %" PRIu64, session_id);
+       }
+       fprintf(stderr, "RET PENDING: %d\n", ret);
+
+       ret = 0;
+
+end:
+       return ret;
+}
+
 void *thread_rotation(void *data)
 {
        int ret;
@@ -391,6 +458,13 @@ void *thread_rotation(void *data)
                        if (fd == handle->thread_quit_pipe) {
                                DBG("[rotation-thread] Quit pipe activity");
                                goto exit;
+                       } else if (fd == handle->rotate_timer_pipe) {
+                               ret = handle_rotate_timer_pipe(fd, revents,
+                                               handle, &state);
+                               if (ret) {
+                                       ERR("[rotation-thread] Rotate pending");
+                                       goto error;
+                               }
                        } else if (fd == handle->ust32_consumer ||
                                        fd == handle->ust64_consumer ||
                                        fd == handle->kernel_consumer) {
index 580ddbbb4f07054301773fe1e59d3de1f5070f22..762ef0c0de1705b5412f6cfc7e36bfba37344044 100644 (file)
@@ -51,6 +51,7 @@ struct rotation_thread_handle {
        int ust64_consumer;
        int kernel_consumer;
        int thread_quit_pipe;
+       int rotate_timer_pipe;
 };
 
 struct rotation_thread_state {
@@ -62,7 +63,7 @@ struct rotation_thread_handle *rotation_thread_handle_create(
                struct lttng_pipe *ust32_channel_rotate_pipe,
                struct lttng_pipe *ust64_channel_rotate_pipe,
                struct lttng_pipe *kernel_channel_rotate_pipe,
-               int thread_quit_pipe);
+               int thread_quit_pipe, int rotate_timer_pipe);
 
 void rotation_thread_handle_destroy(
                struct rotation_thread_handle *handle);
index 25f0927f7c0879e42a2b8e14d8c196fd715e5854..5edd962688df79ce3188d48a8a6ee4b8444e02e6 100644 (file)
@@ -161,6 +161,12 @@ struct ltt_session {
        time_t current_chunk_start_ts;
        time_t session_last_stop_ts;
        time_t last_begin_rotation_ts;
+       /*
+        * Timer to check periodically if a relay has completed the last
+        * rotation.
+        */
+       int rotate_relay_pending_timer_enabled;
+       timer_t rotate_relay_pending_timer;
 };
 
 /* Prototypes */
diff --git a/src/bin/lttng-sessiond/sessiond-timer.c b/src/bin/lttng-sessiond/sessiond-timer.c
new file mode 100644 (file)
index 0000000..bf27a9e
--- /dev/null
@@ -0,0 +1,275 @@
+/*
+ * Copyright (C) 2017 - Julien Desfossez <jdesfossez@efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License, version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * this program; if not, write to the Free Software Foundation, Inc., 51
+ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#define _LGPL_SOURCE
+#include <assert.h>
+#include <inttypes.h>
+#include <signal.h>
+
+#include "sessiond-timer.h"
+#include "health-sessiond.h"
+
+#if 0
+#include <bin/lttng-sessiond/ust-ctl.h>
+#include <bin/lttng-consumerd/health-consumerd.h>
+#include <common/common.h>
+#include <common/compat/endian.h>
+#include <common/kernel-ctl/kernel-ctl.h>
+#include <common/kernel-consumer/kernel-consumer.h>
+#include <common/consumer/consumer-stream.h>
+#include <common/consumer/consumer-timer.h>
+#include <common/consumer/consumer-testpoint.h>
+#include <common/ust-consumer/ust-consumer.h>
+#endif
+
+static struct timer_signal_data timer_signal = {
+       .tid = 0,
+       .setup_done = 0,
+       .qs_done = 0,
+       .lock = PTHREAD_MUTEX_INITIALIZER,
+};
+
+/*
+ * Set custom signal mask to current thread.
+ */
+static void setmask(sigset_t *mask)
+{
+       int ret;
+
+       ret = sigemptyset(mask);
+       if (ret) {
+               PERROR("sigemptyset");
+       }
+       ret = sigaddset(mask, LTTNG_SESSIOND_SIG_TEARDOWN);
+       if (ret) {
+               PERROR("sigaddset teardown");
+       }
+       ret = sigaddset(mask, LTTNG_SESSIOND_SIG_EXIT);
+       if (ret) {
+               PERROR("sigaddset exit");
+       }
+       ret = sigaddset(mask, LTTNG_SESSIOND_SIG_ROTATE_PENDING);
+       if (ret) {
+               PERROR("sigaddset switch");
+       }
+}
+
+static
+void sessiond_timer_signal_thread_qs(unsigned int signr)
+{
+       sigset_t pending_set;
+       int ret;
+
+       /*
+        * We need to be the only thread interacting with the thread
+        * that manages signals for teardown synchronization.
+        */
+       pthread_mutex_lock(&timer_signal.lock);
+
+       /* Ensure we don't have any signal queued for this session. */
+       for (;;) {
+               ret = sigemptyset(&pending_set);
+               if (ret == -1) {
+                       PERROR("sigemptyset");
+               }
+               ret = sigpending(&pending_set);
+               if (ret == -1) {
+                       PERROR("sigpending");
+               }
+               if (!sigismember(&pending_set, signr)) {
+                       break;
+               }
+               caa_cpu_relax();
+       }
+
+       /*
+        * From this point, no new signal handler will be fired that would try to
+        * access "session". However, we still need to wait for any currently
+        * executing handler to complete.
+        */
+       cmm_smp_mb();
+       CMM_STORE_SHARED(timer_signal.qs_done, 0);
+       cmm_smp_mb();
+
+       /*
+        * Kill with LTTNG_SESSIOND_SIG_TEARDOWN, so signal management thread wakes
+        * up.
+        */
+       kill(getpid(), LTTNG_SESSIOND_SIG_TEARDOWN);
+
+       while (!CMM_LOAD_SHARED(timer_signal.qs_done)) {
+               caa_cpu_relax();
+       }
+       cmm_smp_mb();
+
+       pthread_mutex_unlock(&timer_signal.lock);
+}
+
+/*
+ * Start a timer on a session that will fire at a given interval
+ * (timer_interval_us) and fire a given signal (signal).
+ *
+ * Returns a negative value on error, 0 if a timer was created, and
+ * a positive value if no timer was created (not an error).
+ */
+static
+int session_timer_start(timer_t *timer_id, struct ltt_session *session,
+               unsigned int timer_interval_us, int signal)
+{
+       int ret = 0, delete_ret;
+       struct sigevent sev;
+       struct itimerspec its;
+
+       assert(session);
+
+       sev.sigev_notify = SIGEV_SIGNAL;
+       sev.sigev_signo = signal;
+       sev.sigev_value.sival_ptr = session;
+       ret = timer_create(CLOCKID, &sev, timer_id);
+       if (ret == -1) {
+               PERROR("timer_create");
+               goto end;
+       }
+
+       its.it_value.tv_sec = timer_interval_us / 1000000;
+       its.it_value.tv_nsec = (timer_interval_us % 1000000) * 1000;
+       its.it_interval.tv_sec = its.it_value.tv_sec;
+       its.it_interval.tv_nsec = its.it_value.tv_nsec;
+
+       ret = timer_settime(*timer_id, 0, &its, NULL);
+       if (ret == -1) {
+               PERROR("timer_settime");
+               goto error_destroy_timer;
+       }
+       goto end;
+
+error_destroy_timer:
+       delete_ret = timer_delete(*timer_id);
+       if (delete_ret == -1) {
+               PERROR("timer_delete");
+       }
+
+end:
+       return ret;
+}
+
+
+static
+int session_timer_stop(timer_t *timer_id, int signal)
+{
+       int ret = 0;
+
+       ret = timer_delete(*timer_id);
+       if (ret == -1) {
+               PERROR("timer_delete");
+               goto end;
+       }
+
+       sessiond_timer_signal_thread_qs(signal);
+       *timer_id = 0;
+end:
+       return ret;
+}
+
+int rotate_pending_timer_start(struct ltt_session *session, unsigned int
+               interval_us)
+{
+       int ret;
+
+       ret = session_timer_start(&session->rotate_relay_pending_timer,
+                       session, interval_us,
+                       LTTNG_SESSIOND_SIG_ROTATE_PENDING);
+       session->rotate_relay_pending_timer_enabled = !!(ret == 0);
+
+       return ret;
+}
+
+/*
+ * Block the RT signals for the entire process. It must be called from the
+ * sessiond main before creating the threads
+ */
+int sessiond_timer_signal_init(void)
+{
+       int ret;
+       sigset_t mask;
+
+       /* Block signal for entire process, so only our thread processes it. */
+       setmask(&mask);
+       ret = pthread_sigmask(SIG_BLOCK, &mask, NULL);
+       if (ret) {
+               errno = ret;
+               PERROR("pthread_sigmask");
+               return -1;
+       }
+       return 0;
+}
+
+/*
+ * This thread is the sighandler for the timer signals.
+ */
+void *sessiond_timer_thread(void *data)
+{
+       int signr;
+       sigset_t mask;
+       siginfo_t info;
+
+       rcu_register_thread();
+       rcu_thread_online();
+
+       health_register(health_sessiond, HEALTH_SESSIOND_TYPE_NOTIFICATION);
+
+       health_code_update();
+
+       /* Only self thread will receive signal mask. */
+       setmask(&mask);
+       CMM_STORE_SHARED(timer_signal.tid, pthread_self());
+
+       while (1) {
+               health_code_update();
+
+               health_poll_entry();
+               signr = sigwaitinfo(&mask, &info);
+               health_poll_exit();
+
+               /*
+                * NOTE: cascading conditions are used instead of a switch case
+                * since the use of SIGRTMIN in the definition of the signals'
+                * values prevents the reduction to an integer constant.
+                */
+               if (signr == -1) {
+                       if (errno != EINTR) {
+                               PERROR("sigwaitinfo");
+                       }
+                       continue;
+               } else if (signr == LTTNG_SESSIOND_SIG_TEARDOWN) {
+                       cmm_smp_mb();
+                       CMM_STORE_SHARED(timer_signal.qs_done, 1);
+                       cmm_smp_mb();
+                       DBG("Signal timer metadata thread teardown");
+               } else if (signr == LTTNG_SESSIOND_SIG_EXIT) {
+                       goto end;
+               } else {
+                       ERR("Unexpected signal %d\n", info.si_signo);
+               }
+       }
+
+end:
+       health_unregister(health_sessiond);
+       rcu_thread_offline();
+       rcu_unregister_thread();
+       return NULL;
+}
diff --git a/src/bin/lttng-sessiond/sessiond-timer.h b/src/bin/lttng-sessiond/sessiond-timer.h
new file mode 100644 (file)
index 0000000..681e154
--- /dev/null
@@ -0,0 +1,50 @@
+/*
+ * Copyright (C) 2017 - Julien Desfossez <jdesfossez@efficios.com>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License, version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifndef SESSIOND_TIMER_H
+#define SESSIOND_TIMER_H
+
+#include <pthread.h>
+
+#include "session.h"
+
+#define LTTNG_SESSIOND_SIG_TEARDOWN            SIGRTMIN + 10
+#define LTTNG_SESSIOND_SIG_EXIT                        SIGRTMIN + 11
+#define LTTNG_SESSIOND_SIG_ROTATE_PENDING      SIGRTMIN + 12
+
+#define CLOCKID CLOCK_MONOTONIC
+
+/*
+ * Handle timer teardown race wrt memory free of private data by sessiond
+ * signals are handled by a single thread, which permits a synchronization
+ * point between handling of each signal. Internal lock ensures mutual
+ * exclusion.
+ */
+struct timer_signal_data {
+       pthread_t tid;  /* thread id managing signals */
+       int setup_done;
+       int qs_done;
+       pthread_mutex_t lock;
+};
+
+void *sessiond_timer_thread(void *data);
+int sessiond_timer_signal_init(void);
+
+int rotate_pending_timer_start(struct ltt_session *session, unsigned int
+               interval_us);
+
+#endif /* SESSIOND_TIMER_H */
This page took 0.036845 seconds and 5 git commands to generate.