CUSTOM: liver timer: immediate liver timer control on data pending and destroy
[lttng-tools.git] / src / common / consumer / consumer-timer.c
CommitLineData
331744e3
JD
1/*
2 * Copyright (C) 2012 - Julien Desfossez <julien.desfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License, version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 * more details.
13 *
14 * You should have received a copy of the GNU General Public License along with
15 * this program; if not, write to the Free Software Foundation, Inc., 51
16 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
6c1c0768 19#define _LGPL_SOURCE
331744e3
JD
20#include <assert.h>
21#include <inttypes.h>
22#include <signal.h>
23
51a9e1c7 24#include <bin/lttng-consumerd/health-consumerd.h>
331744e3 25#include <common/common.h>
f263b7fd 26#include <common/compat/endian.h>
d3e2ba59
JD
27#include <common/kernel-ctl/kernel-ctl.h>
28#include <common/kernel-consumer/kernel-consumer.h>
c8fea79c
JR
29#include <common/consumer/consumer-stream.h>
30#include <common/consumer/consumer-timer.h>
31#include <common/consumer/consumer-testpoint.h>
32#include <common/ust-consumer/ust-consumer.h>
331744e3 33
9304a209
JG
/*
 * Callback signatures for per-stream operations. Each callback receives the
 * consumer stream to operate on and returns an int status code.
 *
 * flush_index_cb is the signature shared by consumer_flush_kernel_index()
 * and consumer_flush_ust_index(), which the live timer selects between
 * according to the consumer type.
 */
typedef int (*sample_positions_cb)(struct lttng_consumer_stream *stream);
typedef int (*get_consumed_cb)(struct lttng_consumer_stream *stream,
		unsigned long *consumed);
typedef int (*get_produced_cb)(struct lttng_consumer_stream *stream,
		unsigned long *produced);
typedef int (*flush_index_cb)(struct lttng_consumer_stream *stream);
40
2b8f8754
MD
41static struct timer_signal_data timer_signal = {
42 .tid = 0,
43 .setup_done = 0,
44 .qs_done = 0,
45 .lock = PTHREAD_MUTEX_INITIALIZER,
46};
331744e3
JD
47
48/*
49 * Set custom signal mask to current thread.
50 */
51static void setmask(sigset_t *mask)
52{
53 int ret;
54
55 ret = sigemptyset(mask);
56 if (ret) {
57 PERROR("sigemptyset");
58 }
59 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_SWITCH);
60 if (ret) {
d3e2ba59 61 PERROR("sigaddset switch");
331744e3
JD
62 }
63 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_TEARDOWN);
64 if (ret) {
d3e2ba59
JD
65 PERROR("sigaddset teardown");
66 }
67 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_LIVE);
68 if (ret) {
69 PERROR("sigaddset live");
331744e3 70 }
5383626c
MD
71 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_EXIT);
72 if (ret) {
73 PERROR("sigaddset exit");
74 }
331744e3
JD
75}
76
77/*
78 * Execute action on a timer switch.
d98a47c7
MD
79 *
80 * Beware: metadata_switch_timer() should *never* take a mutex also held
81 * while consumer_timer_switch_stop() is called. It would result in
82 * deadlocks.
331744e3
JD
83 */
84static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
85 int sig, siginfo_t *si, void *uc)
86{
87 int ret;
88 struct lttng_consumer_channel *channel;
89
90 channel = si->si_value.sival_ptr;
91 assert(channel);
92
4419b4fb
MD
93 if (channel->switch_timer_error) {
94 return;
95 }
96
331744e3
JD
97 DBG("Switch timer for channel %" PRIu64, channel->key);
98 switch (ctx->type) {
99 case LTTNG_CONSUMER32_UST:
100 case LTTNG_CONSUMER64_UST:
4fa3dc0e
MD
101 /*
102 * Locks taken by lttng_ustconsumer_request_metadata():
103 * - metadata_socket_lock
104 * - Calling lttng_ustconsumer_recv_metadata():
f82d9449 105 * - channel->metadata_cache->lock
4fa3dc0e 106 * - Calling consumer_metadata_cache_flushed():
5e41ebe1
MD
107 * - channel->timer_lock
108 * - channel->metadata_cache->lock
4fa3dc0e 109 *
5e41ebe1
MD
110 * Ensure that neither consumer_data.lock nor
111 * channel->lock are taken within this function, since
112 * they are held while consumer_timer_switch_stop() is
113 * called.
4fa3dc0e 114 */
94d49140 115 ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1);
331744e3 116 if (ret < 0) {
4419b4fb 117 channel->switch_timer_error = 1;
331744e3
JD
118 }
119 break;
120 case LTTNG_CONSUMER_KERNEL:
121 case LTTNG_CONSUMER_UNKNOWN:
122 assert(0);
123 break;
124 }
125}
126
528f2ffa
JD
127static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts,
128 uint64_t stream_id)
d3e2ba59
JD
129{
130 int ret;
50adc264 131 struct ctf_packet_index index;
d3e2ba59
JD
132
133 memset(&index, 0, sizeof(index));
528f2ffa 134 index.stream_id = htobe64(stream_id);
d3e2ba59
JD
135 index.timestamp_end = htobe64(ts);
136 ret = consumer_stream_write_index(stream, &index);
137 if (ret < 0) {
138 goto error;
139 }
140
141error:
142 return ret;
143}
144
c585821b 145int consumer_flush_kernel_index(struct lttng_consumer_stream *stream)
d3e2ba59 146{
528f2ffa 147 uint64_t ts, stream_id;
d3e2ba59
JD
148 int ret;
149
d3e2ba59
JD
150 ret = kernctl_get_current_timestamp(stream->wait_fd, &ts);
151 if (ret < 0) {
152 ERR("Failed to get the current timestamp");
c585821b 153 goto end;
d3e2ba59
JD
154 }
155 ret = kernctl_buffer_flush(stream->wait_fd);
156 if (ret < 0) {
157 ERR("Failed to flush kernel stream");
c585821b 158 goto end;
d3e2ba59
JD
159 }
160 ret = kernctl_snapshot(stream->wait_fd);
161 if (ret < 0) {
32af2c95 162 if (ret != -EAGAIN && ret != -ENODATA) {
08b1dcd3 163 PERROR("live timer kernel snapshot");
d3e2ba59 164 ret = -1;
c585821b 165 goto end;
d3e2ba59 166 }
528f2ffa
JD
167 ret = kernctl_get_stream_id(stream->wait_fd, &stream_id);
168 if (ret < 0) {
169 PERROR("kernctl_get_stream_id");
c585821b 170 goto end;
528f2ffa 171 }
d3e2ba59 172 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
528f2ffa 173 ret = send_empty_index(stream, ts, stream_id);
d3e2ba59 174 if (ret < 0) {
c585821b 175 goto end;
d3e2ba59
JD
176 }
177 }
178 ret = 0;
c585821b 179end:
d3e2ba59
JD
180 return ret;
181}
182
9304a209
JG
183static int check_stream(struct lttng_consumer_stream *stream,
184 flush_index_cb flush_index)
d3e2ba59 185{
d3e2ba59
JD
186 int ret;
187
d3e2ba59
JD
188 /*
189 * While holding the stream mutex, try to take a snapshot, if it
190 * succeeds, it means that data is ready to be sent, just let the data
191 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
192 * means that there is no data to read after the flush, so we can
193 * safely send the empty index.
c585821b
MD
194 *
195 * Doing a trylock and checking if waiting on metadata if
196 * trylock fails. Bail out of the stream is indeed waiting for
197 * metadata to be pushed. Busy wait on trylock otherwise.
d3e2ba59 198 */
c585821b
MD
199 for (;;) {
200 ret = pthread_mutex_trylock(&stream->lock);
201 switch (ret) {
202 case 0:
203 break; /* We have the lock. */
204 case EBUSY:
205 pthread_mutex_lock(&stream->metadata_timer_lock);
206 if (stream->waiting_on_metadata) {
207 ret = 0;
208 stream->missed_metadata_flush = true;
209 pthread_mutex_unlock(&stream->metadata_timer_lock);
210 goto end; /* Bail out. */
211 }
212 pthread_mutex_unlock(&stream->metadata_timer_lock);
213 /* Try again. */
214 caa_cpu_relax();
215 continue;
216 default:
217 ERR("Unexpected pthread_mutex_trylock error %d", ret);
218 ret = -1;
219 goto end;
220 }
221 break;
222 }
9304a209 223 ret = flush_index(stream);
c585821b
MD
224 pthread_mutex_unlock(&stream->lock);
225end:
226 return ret;
227}
228
229int consumer_flush_ust_index(struct lttng_consumer_stream *stream)
230{
231 uint64_t ts, stream_id;
232 int ret;
233
94d49140
JD
234 ret = cds_lfht_is_node_deleted(&stream->node.node);
235 if (ret) {
c585821b 236 goto end;
94d49140
JD
237 }
238
84a182ce 239 ret = lttng_ustconsumer_get_current_timestamp(stream, &ts);
d3e2ba59
JD
240 if (ret < 0) {
241 ERR("Failed to get the current timestamp");
c585821b 242 goto end;
d3e2ba59 243 }
84a182ce
DG
244 lttng_ustconsumer_flush_buffer(stream, 1);
245 ret = lttng_ustconsumer_take_snapshot(stream);
d3e2ba59 246 if (ret < 0) {
94d49140 247 if (ret != -EAGAIN) {
d3e2ba59
JD
248 ERR("Taking UST snapshot");
249 ret = -1;
c585821b 250 goto end;
d3e2ba59 251 }
70190e1c 252 ret = lttng_ustconsumer_get_stream_id(stream, &stream_id);
528f2ffa
JD
253 if (ret < 0) {
254 PERROR("ustctl_get_stream_id");
c585821b 255 goto end;
528f2ffa 256 }
d3e2ba59 257 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
528f2ffa 258 ret = send_empty_index(stream, ts, stream_id);
d3e2ba59 259 if (ret < 0) {
c585821b 260 goto end;
d3e2ba59
JD
261 }
262 }
263 ret = 0;
c585821b
MD
264end:
265 return ret;
266}
d3e2ba59 267
d3e2ba59
JD
268/*
269 * Execute action on a live timer
270 */
271static void live_timer(struct lttng_consumer_local_data *ctx,
272 int sig, siginfo_t *si, void *uc)
273{
274 int ret;
275 struct lttng_consumer_channel *channel;
276 struct lttng_consumer_stream *stream;
d3e2ba59 277 struct lttng_ht_iter iter;
9304a209
JG
278 const struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
279 const flush_index_cb flush_index =
280 ctx->type == LTTNG_CONSUMER_KERNEL ?
281 consumer_flush_kernel_index :
282 consumer_flush_ust_index;
d3e2ba59
JD
283
284 channel = si->si_value.sival_ptr;
285 assert(channel);
286
287 if (channel->switch_timer_error) {
288 goto error;
289 }
d3e2ba59
JD
290
291 DBG("Live timer for channel %" PRIu64, channel->key);
d60962b2
JR
292 if (channel->live_timer_enabled != 1) {
293 DBG("Liver timer was stopped before the iteration. Quitting early.");
294 goto skip;
295 }
d3e2ba59
JD
296
297 rcu_read_lock();
9304a209
JG
298 cds_lfht_for_each_entry_duplicate(ht->ht,
299 ht->hash_fct(&channel->key, lttng_ht_seed),
300 ht->match_fct, &channel->key, &iter.iter,
301 stream, node_channel_id.node) {
d60962b2
JR
302 if (channel->live_timer_enabled != 1) {
303 DBG("Liver timer was stopped during the iteration. Quitting early.");
304 goto skip_unlock;
305 }
306
9304a209
JG
307 ret = check_stream(stream, flush_index);
308 if (ret < 0) {
309 goto error_unlock;
d3e2ba59 310 }
d3e2ba59
JD
311 }
312
d60962b2 313skip_unlock:
d3e2ba59
JD
314error_unlock:
315 rcu_read_unlock();
316
d60962b2 317skip:
d3e2ba59
JD
318error:
319 return;
320}
321
2b8f8754
MD
322static
323void consumer_timer_signal_thread_qs(unsigned int signr)
324{
325 sigset_t pending_set;
326 int ret;
327
328 /*
329 * We need to be the only thread interacting with the thread
330 * that manages signals for teardown synchronization.
331 */
332 pthread_mutex_lock(&timer_signal.lock);
333
334 /* Ensure we don't have any signal queued for this channel. */
335 for (;;) {
336 ret = sigemptyset(&pending_set);
337 if (ret == -1) {
338 PERROR("sigemptyset");
339 }
340 ret = sigpending(&pending_set);
341 if (ret == -1) {
342 PERROR("sigpending");
343 }
6f7cc0be 344 if (!sigismember(&pending_set, signr)) {
2b8f8754
MD
345 break;
346 }
347 caa_cpu_relax();
348 }
349
350 /*
351 * From this point, no new signal handler will be fired that would try to
352 * access "chan". However, we still need to wait for any currently
353 * executing handler to complete.
354 */
355 cmm_smp_mb();
356 CMM_STORE_SHARED(timer_signal.qs_done, 0);
357 cmm_smp_mb();
358
359 /*
360 * Kill with LTTNG_CONSUMER_SIG_TEARDOWN, so signal management thread wakes
361 * up.
362 */
363 kill(getpid(), LTTNG_CONSUMER_SIG_TEARDOWN);
364
365 while (!CMM_LOAD_SHARED(timer_signal.qs_done)) {
366 caa_cpu_relax();
367 }
368 cmm_smp_mb();
369
370 pthread_mutex_unlock(&timer_signal.lock);
371}
372
331744e3
JD
373/*
374 * Set the timer for periodical metadata flush.
375 */
376void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
377 unsigned int switch_timer_interval)
378{
379 int ret;
380 struct sigevent sev;
381 struct itimerspec its;
382
383 assert(channel);
384 assert(channel->key);
385
386 if (switch_timer_interval == 0) {
387 return;
388 }
389
390 sev.sigev_notify = SIGEV_SIGNAL;
391 sev.sigev_signo = LTTNG_CONSUMER_SIG_SWITCH;
392 sev.sigev_value.sival_ptr = channel;
393 ret = timer_create(CLOCKID, &sev, &channel->switch_timer);
394 if (ret == -1) {
395 PERROR("timer_create");
396 }
397 channel->switch_timer_enabled = 1;
398
399 its.it_value.tv_sec = switch_timer_interval / 1000000;
69f60d21 400 its.it_value.tv_nsec = (switch_timer_interval % 1000000) * 1000;
331744e3
JD
401 its.it_interval.tv_sec = its.it_value.tv_sec;
402 its.it_interval.tv_nsec = its.it_value.tv_nsec;
403
404 ret = timer_settime(channel->switch_timer, 0, &its, NULL);
405 if (ret == -1) {
406 PERROR("timer_settime");
407 }
408}
409
410/*
411 * Stop and delete timer.
412 */
413void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
414{
415 int ret;
331744e3
JD
416
417 assert(channel);
418
419 ret = timer_delete(channel->switch_timer);
420 if (ret == -1) {
421 PERROR("timer_delete");
422 }
423
2b8f8754 424 consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_SWITCH);
331744e3 425
2b8f8754
MD
426 channel->switch_timer = 0;
427 channel->switch_timer_enabled = 0;
331744e3
JD
428}
429
d3e2ba59
JD
430/*
431 * Set the timer for the live mode.
432 */
433void consumer_timer_live_start(struct lttng_consumer_channel *channel,
434 int live_timer_interval)
435{
436 int ret;
437 struct sigevent sev;
438 struct itimerspec its;
439
440 assert(channel);
441 assert(channel->key);
442
fac41e72 443 if (live_timer_interval <= 0) {
d3e2ba59
JD
444 return;
445 }
446
447 sev.sigev_notify = SIGEV_SIGNAL;
448 sev.sigev_signo = LTTNG_CONSUMER_SIG_LIVE;
449 sev.sigev_value.sival_ptr = channel;
450 ret = timer_create(CLOCKID, &sev, &channel->live_timer);
451 if (ret == -1) {
452 PERROR("timer_create");
453 }
454 channel->live_timer_enabled = 1;
455
456 its.it_value.tv_sec = live_timer_interval / 1000000;
69f60d21 457 its.it_value.tv_nsec = (live_timer_interval % 1000000) * 1000;
d3e2ba59
JD
458 its.it_interval.tv_sec = its.it_value.tv_sec;
459 its.it_interval.tv_nsec = its.it_value.tv_nsec;
460
461 ret = timer_settime(channel->live_timer, 0, &its, NULL);
462 if (ret == -1) {
463 PERROR("timer_settime");
464 }
465}
466
467/*
468 * Stop and delete timer.
469 */
470void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
471{
472 int ret;
473
474 assert(channel);
475
d60962b2
JR
476 if (!channel->live_timer) {
477 return;
478 }
479
d3e2ba59
JD
480 ret = timer_delete(channel->live_timer);
481 if (ret == -1) {
482 PERROR("timer_delete");
483 }
484
d3e2ba59
JD
485 channel->live_timer = 0;
486 channel->live_timer_enabled = 0;
d60962b2
JR
487
488 consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_LIVE);
d3e2ba59
JD
489}
490
331744e3
JD
491/*
492 * Block the RT signals for the entire process. It must be called from the
493 * consumer main before creating the threads
494 */
73664f81 495int consumer_signal_init(void)
331744e3
JD
496{
497 int ret;
498 sigset_t mask;
499
500 /* Block signal for entire process, so only our thread processes it. */
501 setmask(&mask);
502 ret = pthread_sigmask(SIG_BLOCK, &mask, NULL);
503 if (ret) {
504 errno = ret;
505 PERROR("pthread_sigmask");
73664f81 506 return -1;
331744e3 507 }
73664f81 508 return 0;
331744e3
JD
509}
510
511/*
d3e2ba59 512 * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
5383626c
MD
513 * LTTNG_CONSUMER_SIG_TEARDOWN, LTTNG_CONSUMER_SIG_LIVE, and
514 * LTTNG_CONSUMER_SIG_EXIT.
331744e3 515 */
d3e2ba59 516void *consumer_timer_thread(void *data)
331744e3
JD
517{
518 int signr;
519 sigset_t mask;
520 siginfo_t info;
521 struct lttng_consumer_local_data *ctx = data;
522
8a9acb74
MD
523 rcu_register_thread();
524
1fc79fb4
MD
525 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA_TIMER);
526
2d57de81
MD
527 if (testpoint(consumerd_thread_metadata_timer)) {
528 goto error_testpoint;
529 }
530
9ce5646a
MD
531 health_code_update();
532
331744e3
JD
533 /* Only self thread will receive signal mask. */
534 setmask(&mask);
535 CMM_STORE_SHARED(timer_signal.tid, pthread_self());
536
537 while (1) {
9ce5646a
MD
538 health_code_update();
539
540 health_poll_entry();
331744e3 541 signr = sigwaitinfo(&mask, &info);
9ce5646a 542 health_poll_exit();
331744e3
JD
543 if (signr == -1) {
544 if (errno != EINTR) {
545 PERROR("sigwaitinfo");
546 }
547 continue;
548 } else if (signr == LTTNG_CONSUMER_SIG_SWITCH) {
549 metadata_switch_timer(ctx, info.si_signo, &info, NULL);
550 } else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) {
551 cmm_smp_mb();
552 CMM_STORE_SHARED(timer_signal.qs_done, 1);
553 cmm_smp_mb();
554 DBG("Signal timer metadata thread teardown");
d3e2ba59
JD
555 } else if (signr == LTTNG_CONSUMER_SIG_LIVE) {
556 live_timer(ctx, info.si_signo, &info, NULL);
5383626c
MD
557 } else if (signr == LTTNG_CONSUMER_SIG_EXIT) {
558 assert(consumer_quit);
559 goto end;
331744e3
JD
560 } else {
561 ERR("Unexpected signal %d\n", info.si_signo);
562 }
563 }
564
2d57de81
MD
565error_testpoint:
566 /* Only reached in testpoint error */
567 health_error();
5383626c 568end:
1fc79fb4 569 health_unregister(health_consumerd);
8a9acb74 570 rcu_unregister_thread();
331744e3
JD
571 return NULL;
572}
This page took 0.080486 seconds and 5 git commands to generate.