Fix: There are more tests than the plan
[lttng-tools.git] / src / common / consumer-timer.c
CommitLineData
331744e3
JD
1/*
2 * Copyright (C) 2012 - Julien Desfossez <julien.desfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License, version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 * more details.
13 *
14 * You should have received a copy of the GNU General Public License along with
15 * this program; if not, write to the Free Software Foundation, Inc., 51
16 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
6c1c0768 19#define _LGPL_SOURCE
331744e3
JD
20#include <assert.h>
21#include <inttypes.h>
22#include <signal.h>
23
51a9e1c7 24#include <bin/lttng-consumerd/health-consumerd.h>
331744e3 25#include <common/common.h>
f263b7fd 26#include <common/compat/endian.h>
d3e2ba59
JD
27#include <common/kernel-ctl/kernel-ctl.h>
28#include <common/kernel-consumer/kernel-consumer.h>
29#include <common/consumer-stream.h>
331744e3
JD
30
31#include "consumer-timer.h"
2d57de81 32#include "consumer-testpoint.h"
331744e3
JD
33#include "ust-consumer/ust-consumer.h"
34
2b8f8754
MD
35static struct timer_signal_data timer_signal = {
36 .tid = 0,
37 .setup_done = 0,
38 .qs_done = 0,
39 .lock = PTHREAD_MUTEX_INITIALIZER,
40};
331744e3
JD
41
42/*
43 * Set custom signal mask to current thread.
44 */
45static void setmask(sigset_t *mask)
46{
47 int ret;
48
49 ret = sigemptyset(mask);
50 if (ret) {
51 PERROR("sigemptyset");
52 }
53 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_SWITCH);
54 if (ret) {
d3e2ba59 55 PERROR("sigaddset switch");
331744e3
JD
56 }
57 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_TEARDOWN);
58 if (ret) {
d3e2ba59
JD
59 PERROR("sigaddset teardown");
60 }
61 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_LIVE);
62 if (ret) {
63 PERROR("sigaddset live");
331744e3
JD
64 }
65}
66
67/*
68 * Execute action on a timer switch.
d98a47c7
MD
69 *
70 * Beware: metadata_switch_timer() should *never* take a mutex also held
71 * while consumer_timer_switch_stop() is called. It would result in
72 * deadlocks.
331744e3
JD
73 */
74static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
75 int sig, siginfo_t *si, void *uc)
76{
77 int ret;
78 struct lttng_consumer_channel *channel;
79
80 channel = si->si_value.sival_ptr;
81 assert(channel);
82
4419b4fb
MD
83 if (channel->switch_timer_error) {
84 return;
85 }
86
331744e3
JD
87 DBG("Switch timer for channel %" PRIu64, channel->key);
88 switch (ctx->type) {
89 case LTTNG_CONSUMER32_UST:
90 case LTTNG_CONSUMER64_UST:
4fa3dc0e
MD
91 /*
92 * Locks taken by lttng_ustconsumer_request_metadata():
93 * - metadata_socket_lock
94 * - Calling lttng_ustconsumer_recv_metadata():
f82d9449 95 * - channel->metadata_cache->lock
4fa3dc0e 96 * - Calling consumer_metadata_cache_flushed():
5e41ebe1
MD
97 * - channel->timer_lock
98 * - channel->metadata_cache->lock
4fa3dc0e 99 *
5e41ebe1
MD
100 * Ensure that neither consumer_data.lock nor
101 * channel->lock are taken within this function, since
102 * they are held while consumer_timer_switch_stop() is
103 * called.
4fa3dc0e 104 */
94d49140 105 ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1);
331744e3 106 if (ret < 0) {
4419b4fb 107 channel->switch_timer_error = 1;
331744e3
JD
108 }
109 break;
110 case LTTNG_CONSUMER_KERNEL:
111 case LTTNG_CONSUMER_UNKNOWN:
112 assert(0);
113 break;
114 }
115}
116
528f2ffa
JD
117static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts,
118 uint64_t stream_id)
d3e2ba59
JD
119{
120 int ret;
50adc264 121 struct ctf_packet_index index;
d3e2ba59
JD
122
123 memset(&index, 0, sizeof(index));
528f2ffa 124 index.stream_id = htobe64(stream_id);
d3e2ba59
JD
125 index.timestamp_end = htobe64(ts);
126 ret = consumer_stream_write_index(stream, &index);
127 if (ret < 0) {
128 goto error;
129 }
130
131error:
132 return ret;
133}
134
c585821b 135int consumer_flush_kernel_index(struct lttng_consumer_stream *stream)
d3e2ba59 136{
528f2ffa 137 uint64_t ts, stream_id;
d3e2ba59
JD
138 int ret;
139
d3e2ba59
JD
140 ret = kernctl_get_current_timestamp(stream->wait_fd, &ts);
141 if (ret < 0) {
142 ERR("Failed to get the current timestamp");
c585821b 143 goto end;
d3e2ba59
JD
144 }
145 ret = kernctl_buffer_flush(stream->wait_fd);
146 if (ret < 0) {
147 ERR("Failed to flush kernel stream");
c585821b 148 goto end;
d3e2ba59
JD
149 }
150 ret = kernctl_snapshot(stream->wait_fd);
151 if (ret < 0) {
08b1dcd3
DG
152 if (errno != EAGAIN && errno != ENODATA) {
153 PERROR("live timer kernel snapshot");
d3e2ba59 154 ret = -1;
c585821b 155 goto end;
d3e2ba59 156 }
528f2ffa
JD
157 ret = kernctl_get_stream_id(stream->wait_fd, &stream_id);
158 if (ret < 0) {
159 PERROR("kernctl_get_stream_id");
c585821b 160 goto end;
528f2ffa 161 }
d3e2ba59 162 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
528f2ffa 163 ret = send_empty_index(stream, ts, stream_id);
d3e2ba59 164 if (ret < 0) {
c585821b 165 goto end;
d3e2ba59
JD
166 }
167 }
168 ret = 0;
c585821b 169end:
d3e2ba59
JD
170 return ret;
171}
172
c585821b 173static int check_kernel_stream(struct lttng_consumer_stream *stream)
d3e2ba59 174{
d3e2ba59
JD
175 int ret;
176
d3e2ba59
JD
177 /*
178 * While holding the stream mutex, try to take a snapshot, if it
179 * succeeds, it means that data is ready to be sent, just let the data
180 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
181 * means that there is no data to read after the flush, so we can
182 * safely send the empty index.
c585821b
MD
183 *
184 * Doing a trylock and checking if waiting on metadata if
185 * trylock fails. Bail out of the stream is indeed waiting for
186 * metadata to be pushed. Busy wait on trylock otherwise.
d3e2ba59 187 */
c585821b
MD
188 for (;;) {
189 ret = pthread_mutex_trylock(&stream->lock);
190 switch (ret) {
191 case 0:
192 break; /* We have the lock. */
193 case EBUSY:
194 pthread_mutex_lock(&stream->metadata_timer_lock);
195 if (stream->waiting_on_metadata) {
196 ret = 0;
197 stream->missed_metadata_flush = true;
198 pthread_mutex_unlock(&stream->metadata_timer_lock);
199 goto end; /* Bail out. */
200 }
201 pthread_mutex_unlock(&stream->metadata_timer_lock);
202 /* Try again. */
203 caa_cpu_relax();
204 continue;
205 default:
206 ERR("Unexpected pthread_mutex_trylock error %d", ret);
207 ret = -1;
208 goto end;
209 }
210 break;
211 }
212 ret = consumer_flush_kernel_index(stream);
213 pthread_mutex_unlock(&stream->lock);
214end:
215 return ret;
216}
217
218int consumer_flush_ust_index(struct lttng_consumer_stream *stream)
219{
220 uint64_t ts, stream_id;
221 int ret;
222
94d49140
JD
223 ret = cds_lfht_is_node_deleted(&stream->node.node);
224 if (ret) {
c585821b 225 goto end;
94d49140
JD
226 }
227
84a182ce 228 ret = lttng_ustconsumer_get_current_timestamp(stream, &ts);
d3e2ba59
JD
229 if (ret < 0) {
230 ERR("Failed to get the current timestamp");
c585821b 231 goto end;
d3e2ba59 232 }
84a182ce
DG
233 lttng_ustconsumer_flush_buffer(stream, 1);
234 ret = lttng_ustconsumer_take_snapshot(stream);
d3e2ba59 235 if (ret < 0) {
94d49140 236 if (ret != -EAGAIN) {
d3e2ba59
JD
237 ERR("Taking UST snapshot");
238 ret = -1;
c585821b 239 goto end;
d3e2ba59 240 }
70190e1c 241 ret = lttng_ustconsumer_get_stream_id(stream, &stream_id);
528f2ffa
JD
242 if (ret < 0) {
243 PERROR("ustctl_get_stream_id");
c585821b 244 goto end;
528f2ffa 245 }
d3e2ba59 246 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
528f2ffa 247 ret = send_empty_index(stream, ts, stream_id);
d3e2ba59 248 if (ret < 0) {
c585821b 249 goto end;
d3e2ba59
JD
250 }
251 }
252 ret = 0;
c585821b
MD
253end:
254 return ret;
255}
d3e2ba59 256
c585821b
MD
257static int check_ust_stream(struct lttng_consumer_stream *stream)
258{
259 int ret;
260
261 assert(stream);
262 assert(stream->ustream);
263 /*
264 * While holding the stream mutex, try to take a snapshot, if it
265 * succeeds, it means that data is ready to be sent, just let the data
266 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
267 * means that there is no data to read after the flush, so we can
268 * safely send the empty index.
269 *
270 * Doing a trylock and checking if waiting on metadata if
271 * trylock fails. Bail out of the stream is indeed waiting for
272 * metadata to be pushed. Busy wait on trylock otherwise.
273 */
274 for (;;) {
275 ret = pthread_mutex_trylock(&stream->lock);
276 switch (ret) {
277 case 0:
278 break; /* We have the lock. */
279 case EBUSY:
280 pthread_mutex_lock(&stream->metadata_timer_lock);
281 if (stream->waiting_on_metadata) {
282 ret = 0;
283 stream->missed_metadata_flush = true;
284 pthread_mutex_unlock(&stream->metadata_timer_lock);
285 goto end; /* Bail out. */
286 }
287 pthread_mutex_unlock(&stream->metadata_timer_lock);
288 /* Try again. */
289 caa_cpu_relax();
290 continue;
291 default:
292 ERR("Unexpected pthread_mutex_trylock error %d", ret);
293 ret = -1;
294 goto end;
295 }
296 break;
297 }
298 ret = consumer_flush_ust_index(stream);
d3e2ba59 299 pthread_mutex_unlock(&stream->lock);
c585821b 300end:
d3e2ba59
JD
301 return ret;
302}
303
304/*
305 * Execute action on a live timer
306 */
307static void live_timer(struct lttng_consumer_local_data *ctx,
308 int sig, siginfo_t *si, void *uc)
309{
310 int ret;
311 struct lttng_consumer_channel *channel;
312 struct lttng_consumer_stream *stream;
313 struct lttng_ht *ht;
314 struct lttng_ht_iter iter;
315
316 channel = si->si_value.sival_ptr;
317 assert(channel);
318
319 if (channel->switch_timer_error) {
320 goto error;
321 }
322 ht = consumer_data.stream_per_chan_id_ht;
323
324 DBG("Live timer for channel %" PRIu64, channel->key);
325
326 rcu_read_lock();
327 switch (ctx->type) {
328 case LTTNG_CONSUMER32_UST:
329 case LTTNG_CONSUMER64_UST:
330 cds_lfht_for_each_entry_duplicate(ht->ht,
331 ht->hash_fct(&channel->key, lttng_ht_seed),
332 ht->match_fct, &channel->key, &iter.iter,
333 stream, node_channel_id.node) {
334 ret = check_ust_stream(stream);
335 if (ret < 0) {
336 goto error_unlock;
337 }
338 }
339 break;
340 case LTTNG_CONSUMER_KERNEL:
341 cds_lfht_for_each_entry_duplicate(ht->ht,
342 ht->hash_fct(&channel->key, lttng_ht_seed),
343 ht->match_fct, &channel->key, &iter.iter,
344 stream, node_channel_id.node) {
345 ret = check_kernel_stream(stream);
346 if (ret < 0) {
347 goto error_unlock;
348 }
349 }
350 break;
351 case LTTNG_CONSUMER_UNKNOWN:
352 assert(0);
353 break;
354 }
355
356error_unlock:
357 rcu_read_unlock();
358
359error:
360 return;
361}
362
2b8f8754
MD
363static
364void consumer_timer_signal_thread_qs(unsigned int signr)
365{
366 sigset_t pending_set;
367 int ret;
368
369 /*
370 * We need to be the only thread interacting with the thread
371 * that manages signals for teardown synchronization.
372 */
373 pthread_mutex_lock(&timer_signal.lock);
374
375 /* Ensure we don't have any signal queued for this channel. */
376 for (;;) {
377 ret = sigemptyset(&pending_set);
378 if (ret == -1) {
379 PERROR("sigemptyset");
380 }
381 ret = sigpending(&pending_set);
382 if (ret == -1) {
383 PERROR("sigpending");
384 }
385 if (!sigismember(&pending_set, LTTNG_CONSUMER_SIG_SWITCH)) {
386 break;
387 }
388 caa_cpu_relax();
389 }
390
391 /*
392 * From this point, no new signal handler will be fired that would try to
393 * access "chan". However, we still need to wait for any currently
394 * executing handler to complete.
395 */
396 cmm_smp_mb();
397 CMM_STORE_SHARED(timer_signal.qs_done, 0);
398 cmm_smp_mb();
399
400 /*
401 * Kill with LTTNG_CONSUMER_SIG_TEARDOWN, so signal management thread wakes
402 * up.
403 */
404 kill(getpid(), LTTNG_CONSUMER_SIG_TEARDOWN);
405
406 while (!CMM_LOAD_SHARED(timer_signal.qs_done)) {
407 caa_cpu_relax();
408 }
409 cmm_smp_mb();
410
411 pthread_mutex_unlock(&timer_signal.lock);
412}
413
331744e3
JD
414/*
415 * Set the timer for periodical metadata flush.
416 */
417void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
418 unsigned int switch_timer_interval)
419{
420 int ret;
421 struct sigevent sev;
422 struct itimerspec its;
423
424 assert(channel);
425 assert(channel->key);
426
427 if (switch_timer_interval == 0) {
428 return;
429 }
430
431 sev.sigev_notify = SIGEV_SIGNAL;
432 sev.sigev_signo = LTTNG_CONSUMER_SIG_SWITCH;
433 sev.sigev_value.sival_ptr = channel;
434 ret = timer_create(CLOCKID, &sev, &channel->switch_timer);
435 if (ret == -1) {
436 PERROR("timer_create");
437 }
438 channel->switch_timer_enabled = 1;
439
440 its.it_value.tv_sec = switch_timer_interval / 1000000;
441 its.it_value.tv_nsec = switch_timer_interval % 1000000;
442 its.it_interval.tv_sec = its.it_value.tv_sec;
443 its.it_interval.tv_nsec = its.it_value.tv_nsec;
444
445 ret = timer_settime(channel->switch_timer, 0, &its, NULL);
446 if (ret == -1) {
447 PERROR("timer_settime");
448 }
449}
450
451/*
452 * Stop and delete timer.
453 */
454void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
455{
456 int ret;
331744e3
JD
457
458 assert(channel);
459
460 ret = timer_delete(channel->switch_timer);
461 if (ret == -1) {
462 PERROR("timer_delete");
463 }
464
2b8f8754 465 consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_SWITCH);
331744e3 466
2b8f8754
MD
467 channel->switch_timer = 0;
468 channel->switch_timer_enabled = 0;
331744e3
JD
469}
470
d3e2ba59
JD
471/*
472 * Set the timer for the live mode.
473 */
474void consumer_timer_live_start(struct lttng_consumer_channel *channel,
475 int live_timer_interval)
476{
477 int ret;
478 struct sigevent sev;
479 struct itimerspec its;
480
481 assert(channel);
482 assert(channel->key);
483
fac41e72 484 if (live_timer_interval <= 0) {
d3e2ba59
JD
485 return;
486 }
487
488 sev.sigev_notify = SIGEV_SIGNAL;
489 sev.sigev_signo = LTTNG_CONSUMER_SIG_LIVE;
490 sev.sigev_value.sival_ptr = channel;
491 ret = timer_create(CLOCKID, &sev, &channel->live_timer);
492 if (ret == -1) {
493 PERROR("timer_create");
494 }
495 channel->live_timer_enabled = 1;
496
497 its.it_value.tv_sec = live_timer_interval / 1000000;
498 its.it_value.tv_nsec = live_timer_interval % 1000000;
499 its.it_interval.tv_sec = its.it_value.tv_sec;
500 its.it_interval.tv_nsec = its.it_value.tv_nsec;
501
502 ret = timer_settime(channel->live_timer, 0, &its, NULL);
503 if (ret == -1) {
504 PERROR("timer_settime");
505 }
506}
507
508/*
509 * Stop and delete timer.
510 */
511void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
512{
513 int ret;
514
515 assert(channel);
516
517 ret = timer_delete(channel->live_timer);
518 if (ret == -1) {
519 PERROR("timer_delete");
520 }
521
522 consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_LIVE);
523
524 channel->live_timer = 0;
525 channel->live_timer_enabled = 0;
526}
527
331744e3
JD
528/*
529 * Block the RT signals for the entire process. It must be called from the
530 * consumer main before creating the threads
531 */
73664f81 532int consumer_signal_init(void)
331744e3
JD
533{
534 int ret;
535 sigset_t mask;
536
537 /* Block signal for entire process, so only our thread processes it. */
538 setmask(&mask);
539 ret = pthread_sigmask(SIG_BLOCK, &mask, NULL);
540 if (ret) {
541 errno = ret;
542 PERROR("pthread_sigmask");
73664f81 543 return -1;
331744e3 544 }
73664f81 545 return 0;
331744e3
JD
546}
547
548/*
d3e2ba59
JD
549 * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
550 * LTTNG_CONSUMER_SIG_TEARDOWN and LTTNG_CONSUMER_SIG_LIVE.
331744e3 551 */
d3e2ba59 552void *consumer_timer_thread(void *data)
331744e3
JD
553{
554 int signr;
555 sigset_t mask;
556 siginfo_t info;
557 struct lttng_consumer_local_data *ctx = data;
558
8a9acb74
MD
559 rcu_register_thread();
560
1fc79fb4
MD
561 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA_TIMER);
562
2d57de81
MD
563 if (testpoint(consumerd_thread_metadata_timer)) {
564 goto error_testpoint;
565 }
566
9ce5646a
MD
567 health_code_update();
568
331744e3
JD
569 /* Only self thread will receive signal mask. */
570 setmask(&mask);
571 CMM_STORE_SHARED(timer_signal.tid, pthread_self());
572
573 while (1) {
9ce5646a
MD
574 health_code_update();
575
576 health_poll_entry();
331744e3 577 signr = sigwaitinfo(&mask, &info);
9ce5646a 578 health_poll_exit();
331744e3
JD
579 if (signr == -1) {
580 if (errno != EINTR) {
581 PERROR("sigwaitinfo");
582 }
583 continue;
584 } else if (signr == LTTNG_CONSUMER_SIG_SWITCH) {
585 metadata_switch_timer(ctx, info.si_signo, &info, NULL);
586 } else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) {
587 cmm_smp_mb();
588 CMM_STORE_SHARED(timer_signal.qs_done, 1);
589 cmm_smp_mb();
590 DBG("Signal timer metadata thread teardown");
d3e2ba59
JD
591 } else if (signr == LTTNG_CONSUMER_SIG_LIVE) {
592 live_timer(ctx, info.si_signo, &info, NULL);
331744e3
JD
593 } else {
594 ERR("Unexpected signal %d\n", info.si_signo);
595 }
596 }
597
2d57de81
MD
598error_testpoint:
599 /* Only reached in testpoint error */
600 health_error();
1fc79fb4
MD
601 health_unregister(health_consumerd);
602
8a9acb74
MD
603 rcu_unregister_thread();
604
1fc79fb4 605 /* Never return */
331744e3
JD
606 return NULL;
607}
This page took 0.064106 seconds and 5 git commands to generate.