Relay rotate pending command
[lttng-tools.git] / src / bin / lttng-sessiond / sessiond-timer.c
1 /*
2 * Copyright (C) 2017 - Julien Desfossez <jdesfossez@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18 #define _LGPL_SOURCE
19 #include <assert.h>
20 #include <inttypes.h>
21 #include <signal.h>
22
23 #include "sessiond-timer.h"
24 #include "health-sessiond.h"
25 #include "rotation-thread.h"
26
27 static
28 struct timer_signal_data timer_signal = {
29 .tid = 0,
30 .qs_done = 0,
31 .lock = PTHREAD_MUTEX_INITIALIZER,
32 };
33
34 /*
35 * Set custom signal mask to current thread.
36 */
37 static
38 void setmask(sigset_t *mask)
39 {
40 int ret;
41
42 ret = sigemptyset(mask);
43 if (ret) {
44 PERROR("sigemptyset");
45 }
46 ret = sigaddset(mask, LTTNG_SESSIOND_SIG_TEARDOWN);
47 if (ret) {
48 PERROR("sigaddset teardown");
49 }
50 ret = sigaddset(mask, LTTNG_SESSIOND_SIG_EXIT);
51 if (ret) {
52 PERROR("sigaddset exit");
53 }
54 ret = sigaddset(mask, LTTNG_SESSIOND_SIG_ROTATE_PENDING);
55 if (ret) {
56 PERROR("sigaddset switch");
57 }
58 }
59
60 /*
61 * This is the same function as consumer_timer_signal_thread_qs, when it
62 * returns, it means that no timer signr is currently pending or being handled
63 * by the timer thread. This cannot be called from the timer thread.
64 */
65 static
66 void sessiond_timer_signal_thread_qs(unsigned int signr)
67 {
68 sigset_t pending_set;
69 int ret;
70
71 /*
72 * We need to be the only thread interacting with the thread
73 * that manages signals for teardown synchronization.
74 */
75 pthread_mutex_lock(&timer_signal.lock);
76
77 /* Ensure we don't have any signal queued for this session. */
78 for (;;) {
79 ret = sigemptyset(&pending_set);
80 if (ret == -1) {
81 PERROR("sigemptyset");
82 }
83 ret = sigpending(&pending_set);
84 if (ret == -1) {
85 PERROR("sigpending");
86 }
87 if (!sigismember(&pending_set, signr)) {
88 break;
89 }
90 caa_cpu_relax();
91 }
92
93 /*
94 * From this point, no new signal handler will be fired that would try to
95 * access "session". However, we still need to wait for any currently
96 * executing handler to complete.
97 */
98 cmm_smp_mb();
99 CMM_STORE_SHARED(timer_signal.qs_done, 0);
100 cmm_smp_mb();
101
102 /*
103 * Kill with LTTNG_SESSIOND_SIG_TEARDOWN, so signal management thread
104 * wakes up.
105 */
106 kill(getpid(), LTTNG_SESSIOND_SIG_TEARDOWN);
107
108 while (!CMM_LOAD_SHARED(timer_signal.qs_done)) {
109 caa_cpu_relax();
110 }
111 cmm_smp_mb();
112
113 pthread_mutex_unlock(&timer_signal.lock);
114 }
115
116 /*
117 * Start a timer on a session that will fire at a given interval
118 * (timer_interval_us) and fire a given signal (signal).
119 *
120 * Returns a negative value on error, 0 if a timer was created, and
121 * a positive value if no timer was created (not an error).
122 */
123 static
124 int session_timer_start(timer_t *timer_id, struct ltt_session *session,
125 unsigned int timer_interval_us, int signal, bool one_shot)
126 {
127 int ret = 0, delete_ret;
128 struct sigevent sev;
129 struct itimerspec its;
130
131 assert(session);
132
133 sev.sigev_notify = SIGEV_SIGNAL;
134 sev.sigev_signo = signal;
135 sev.sigev_value.sival_ptr = session;
136 ret = timer_create(CLOCKID, &sev, timer_id);
137 if (ret == -1) {
138 PERROR("timer_create");
139 goto end;
140 }
141
142 its.it_value.tv_sec = timer_interval_us / 1000000;
143 its.it_value.tv_nsec = (timer_interval_us % 1000000) * 1000;
144 if (one_shot) {
145 its.it_interval.tv_sec = 0;
146 its.it_interval.tv_nsec = 0;
147 } else {
148 its.it_interval.tv_sec = its.it_value.tv_sec;
149 its.it_interval.tv_nsec = its.it_value.tv_nsec;
150 }
151
152 ret = timer_settime(*timer_id, 0, &its, NULL);
153 if (ret == -1) {
154 PERROR("timer_settime");
155 goto error_destroy_timer;
156 }
157 goto end;
158
159 error_destroy_timer:
160 delete_ret = timer_delete(*timer_id);
161 if (delete_ret == -1) {
162 PERROR("timer_delete");
163 }
164
165 end:
166 return ret;
167 }
168
169 static
170 int session_timer_stop(timer_t *timer_id, int signal)
171 {
172 int ret = 0;
173
174 ret = timer_delete(*timer_id);
175 if (ret == -1) {
176 PERROR("timer_delete");
177 goto end;
178 }
179
180 sessiond_timer_signal_thread_qs(signal);
181 *timer_id = 0;
182 end:
183 return ret;
184 }
185
186 int sessiond_timer_rotate_pending_start(struct ltt_session *session,
187 unsigned int interval_us)
188 {
189 int ret;
190
191 DBG("Enabling rotate pending timer on session %" PRIu64, session->id);
192 /*
193 * We arm this timer in a one-shot mode so we don't have to disable it
194 * explicitly (which could deadlock if the timer thread is blocked writing
195 * in the rotation_timer_pipe).
196 * Instead, we re-arm it if needed after the rotation_pending check as
197 * returned. Also, this timer is usually only needed once, so there is no
198 * need to go through the whole signal teardown scheme everytime.
199 */
200 ret = session_timer_start(&session->rotate_relay_pending_timer,
201 session, interval_us,
202 LTTNG_SESSIOND_SIG_ROTATE_PENDING,
203 /* one-shot */ true);
204 if (ret == 0) {
205 session->rotate_relay_pending_timer_enabled = true;
206 }
207
208 return ret;
209 }
210
211 /*
212 * Stop and delete the channel's live timer.
213 * Called with session and session_list locks held.
214 */
215 void sessiond_timer_rotate_pending_stop(struct ltt_session *session)
216 {
217 int ret;
218
219 assert(session);
220
221 DBG("Disabling timer rotate pending on session %" PRIu64, session->id);
222 ret = session_timer_stop(&session->rotate_relay_pending_timer,
223 LTTNG_SESSIOND_SIG_ROTATE_PENDING);
224 if (ret == -1) {
225 ERR("Failed to stop rotate_pending timer");
226 }
227
228 session->rotate_relay_pending_timer_enabled = false;
229 }
230
231 /*
232 * Block the RT signals for the entire process. It must be called from the
233 * sessiond main before creating the threads
234 */
235 int sessiond_timer_signal_init(void)
236 {
237 int ret;
238 sigset_t mask;
239
240 /* Block signal for entire process, so only our thread processes it. */
241 setmask(&mask);
242 ret = pthread_sigmask(SIG_BLOCK, &mask, NULL);
243 if (ret) {
244 errno = ret;
245 PERROR("pthread_sigmask");
246 return -1;
247 }
248 return 0;
249 }
250
251 /*
252 * Called with the rotation_timer_queue lock held.
253 * Return true if the same timer job already exists in the queue, false if not.
254 */
255 static
256 bool check_duplicate_timer_job(struct timer_thread_parameters *ctx,
257 struct ltt_session *session, unsigned int signal)
258 {
259 bool ret = false;
260 struct sessiond_rotation_timer *node;
261
262 rcu_read_lock();
263 cds_list_for_each_entry(node, &ctx->rotation_timer_queue->list, head) {
264 if (node->session_id == session->id && node->signal == signal) {
265 ret = true;
266 goto end;
267 }
268 }
269
270 end:
271 rcu_read_unlock();
272 return ret;
273 }
274
275 /*
276 * Add the session ID and signal value to the rotation_timer_queue if it is
277 * not already there and wakeup the rotation thread. The rotation thread
278 * empties the whole queue everytime it is woken up. The event_pipe is
279 * non-blocking, if it would block, we just return because we know the
280 * rotation thread will be awaken anyway.
281 */
282 static
283 int enqueue_timer_rotate_job(struct timer_thread_parameters *ctx,
284 struct ltt_session *session, unsigned int signal)
285 {
286 int ret;
287 bool has_duplicate_timer_job;
288 char *c = "!";
289
290 pthread_mutex_lock(&ctx->rotation_timer_queue->lock);
291 has_duplicate_timer_job = check_duplicate_timer_job(ctx, session,
292 signal);
293
294 if (!has_duplicate_timer_job) {
295 struct sessiond_rotation_timer *timer_data = NULL;
296
297 timer_data = zmalloc(sizeof(struct sessiond_rotation_timer));
298 if (!timer_data) {
299 PERROR("Allocation of timer data");
300 goto error;
301 }
302 timer_data->session_id = session->id;
303 timer_data->signal = signal;
304 cds_list_add_tail(&timer_data->head,
305 &ctx->rotation_timer_queue->list);
306 } else {
307 /*
308 * This timer job is already pending, we don't need to add
309 * it.
310 */
311 pthread_mutex_unlock(&ctx->rotation_timer_queue->lock);
312 ret = 0;
313 goto end;
314 }
315 pthread_mutex_unlock(&ctx->rotation_timer_queue->lock);
316
317 ret = lttng_write(
318 lttng_pipe_get_writefd(ctx->rotation_timer_queue->event_pipe),
319 c, 1);
320 if (ret < 0) {
321 /*
322 * We do not want to block in the timer handler, the job has been
323 * enqueued in the list, the wakeup pipe is probably full, the job
324 * will be processed when the rotation_thread catches up.
325 */
326 if (errno == EAGAIN || errno == EWOULDBLOCK) {
327 ret = 0;
328 goto end;
329 }
330 PERROR("Timer wakeup rotation thread");
331 goto error;
332 }
333
334 ret = 0;
335 goto end;
336
337 error:
338 ret = -1;
339 end:
340 return ret;
341 }
342
343 /*
344 * Ask the rotation thread to check if the last rotation started in this
345 * session is still pending on the relay.
346 */
347 static
348 void relay_rotation_pending_timer(struct timer_thread_parameters *ctx,
349 int sig, siginfo_t *si)
350 {
351 int ret;
352 struct ltt_session *session = si->si_value.sival_ptr;
353 assert(session);
354
355 ret = enqueue_timer_rotate_job(ctx, session, LTTNG_SESSIOND_SIG_ROTATE_PENDING);
356 if (ret) {
357 PERROR("wakeup rotate pipe");
358 }
359 }
360
361 /*
362 * This thread is the sighandler for the timer signals.
363 */
364 void *sessiond_timer_thread(void *data)
365 {
366 int signr;
367 sigset_t mask;
368 siginfo_t info;
369 struct timer_thread_parameters *ctx = data;
370
371 rcu_register_thread();
372 rcu_thread_online();
373
374 health_register(health_sessiond, HEALTH_SESSIOND_TYPE_TIMER);
375
376 health_code_update();
377
378 /* Only self thread will receive signal mask. */
379 setmask(&mask);
380 CMM_STORE_SHARED(timer_signal.tid, pthread_self());
381
382 while (1) {
383 health_code_update();
384
385 health_poll_entry();
386 signr = sigwaitinfo(&mask, &info);
387 health_poll_exit();
388
389 /*
390 * NOTE: cascading conditions are used instead of a switch case
391 * since the use of SIGRTMIN in the definition of the signals'
392 * values prevents the reduction to an integer constant.
393 */
394 if (signr == -1) {
395 if (errno != EINTR) {
396 PERROR("sigwaitinfo");
397 }
398 continue;
399 } else if (signr == LTTNG_SESSIOND_SIG_TEARDOWN) {
400 cmm_smp_mb();
401 CMM_STORE_SHARED(timer_signal.qs_done, 1);
402 cmm_smp_mb();
403 DBG("Signal timer metadata thread teardown");
404 } else if (signr == LTTNG_SESSIOND_SIG_EXIT) {
405 goto end;
406 } else if (signr == LTTNG_SESSIOND_SIG_ROTATE_PENDING) {
407 relay_rotation_pending_timer(ctx, info.si_signo, &info);
408 } else {
409 ERR("Unexpected signal %d\n", info.si_signo);
410 }
411 }
412
413 end:
414 DBG("[timer-thread] Exit");
415 health_unregister(health_sessiond);
416 rcu_thread_offline();
417 rcu_unregister_thread();
418 return NULL;
419 }
This page took 0.039821 seconds and 6 git commands to generate.