Deliverables 3 and 4
[deliverable/lttng-tools.git] / src / common / consumer / consumer-timer.c
CommitLineData
331744e3
JD
1/*
2 * Copyright (C) 2012 - Julien Desfossez <julien.desfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License, version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 * more details.
13 *
14 * You should have received a copy of the GNU General Public License along with
15 * this program; if not, write to the Free Software Foundation, Inc., 51
16 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
6c1c0768 19#define _LGPL_SOURCE
331744e3
JD
20#include <assert.h>
21#include <inttypes.h>
22#include <signal.h>
23
daa81f58 24#include <lttng/ust-ctl.h>
51a9e1c7 25#include <bin/lttng-consumerd/health-consumerd.h>
331744e3 26#include <common/common.h>
f263b7fd 27#include <common/compat/endian.h>
d3e2ba59
JD
28#include <common/kernel-ctl/kernel-ctl.h>
29#include <common/kernel-consumer/kernel-consumer.h>
c8fea79c
JR
30#include <common/consumer/consumer-stream.h>
31#include <common/consumer/consumer-timer.h>
32#include <common/consumer/consumer-testpoint.h>
33#include <common/ust-consumer/ust-consumer.h>
331744e3 34
2b8f8754
MD
35static struct timer_signal_data timer_signal = {
36 .tid = 0,
37 .setup_done = 0,
38 .qs_done = 0,
39 .lock = PTHREAD_MUTEX_INITIALIZER,
40};
331744e3
JD
41
42/*
43 * Set custom signal mask to current thread.
44 */
45static void setmask(sigset_t *mask)
46{
47 int ret;
48
49 ret = sigemptyset(mask);
50 if (ret) {
51 PERROR("sigemptyset");
52 }
53 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_SWITCH);
54 if (ret) {
d3e2ba59 55 PERROR("sigaddset switch");
331744e3
JD
56 }
57 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_TEARDOWN);
58 if (ret) {
d3e2ba59
JD
59 PERROR("sigaddset teardown");
60 }
61 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_LIVE);
62 if (ret) {
63 PERROR("sigaddset live");
331744e3 64 }
daa81f58
JG
65 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_MONITOR);
66 if (ret) {
67 PERROR("sigaddset monitor");
68 }
331744e3
JD
69}
70
daa81f58
JG
71static int channel_monitor_pipe = -1;
72
331744e3
JD
73/*
74 * Execute action on a timer switch.
d98a47c7
MD
75 *
76 * Beware: metadata_switch_timer() should *never* take a mutex also held
77 * while consumer_timer_switch_stop() is called. It would result in
78 * deadlocks.
331744e3
JD
79 */
80static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
daa81f58 81 int sig, siginfo_t *si)
331744e3
JD
82{
83 int ret;
84 struct lttng_consumer_channel *channel;
85
86 channel = si->si_value.sival_ptr;
87 assert(channel);
88
4419b4fb
MD
89 if (channel->switch_timer_error) {
90 return;
91 }
92
331744e3
JD
93 DBG("Switch timer for channel %" PRIu64, channel->key);
94 switch (ctx->type) {
95 case LTTNG_CONSUMER32_UST:
96 case LTTNG_CONSUMER64_UST:
4fa3dc0e
MD
97 /*
98 * Locks taken by lttng_ustconsumer_request_metadata():
99 * - metadata_socket_lock
100 * - Calling lttng_ustconsumer_recv_metadata():
f82d9449 101 * - channel->metadata_cache->lock
4fa3dc0e 102 * - Calling consumer_metadata_cache_flushed():
5e41ebe1
MD
103 * - channel->timer_lock
104 * - channel->metadata_cache->lock
4fa3dc0e 105 *
5e41ebe1
MD
106 * Ensure that neither consumer_data.lock nor
107 * channel->lock are taken within this function, since
108 * they are held while consumer_timer_switch_stop() is
109 * called.
4fa3dc0e 110 */
94d49140 111 ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1);
331744e3 112 if (ret < 0) {
4419b4fb 113 channel->switch_timer_error = 1;
331744e3
JD
114 }
115 break;
116 case LTTNG_CONSUMER_KERNEL:
117 case LTTNG_CONSUMER_UNKNOWN:
118 assert(0);
119 break;
120 }
121}
122
528f2ffa
JD
123static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts,
124 uint64_t stream_id)
d3e2ba59
JD
125{
126 int ret;
50adc264 127 struct ctf_packet_index index;
d3e2ba59
JD
128
129 memset(&index, 0, sizeof(index));
528f2ffa 130 index.stream_id = htobe64(stream_id);
d3e2ba59
JD
131 index.timestamp_end = htobe64(ts);
132 ret = consumer_stream_write_index(stream, &index);
133 if (ret < 0) {
134 goto error;
135 }
136
137error:
138 return ret;
139}
140
c585821b 141int consumer_flush_kernel_index(struct lttng_consumer_stream *stream)
d3e2ba59 142{
528f2ffa 143 uint64_t ts, stream_id;
d3e2ba59
JD
144 int ret;
145
d3e2ba59
JD
146 ret = kernctl_get_current_timestamp(stream->wait_fd, &ts);
147 if (ret < 0) {
148 ERR("Failed to get the current timestamp");
c585821b 149 goto end;
d3e2ba59
JD
150 }
151 ret = kernctl_buffer_flush(stream->wait_fd);
152 if (ret < 0) {
153 ERR("Failed to flush kernel stream");
c585821b 154 goto end;
d3e2ba59
JD
155 }
156 ret = kernctl_snapshot(stream->wait_fd);
157 if (ret < 0) {
32af2c95 158 if (ret != -EAGAIN && ret != -ENODATA) {
08b1dcd3 159 PERROR("live timer kernel snapshot");
d3e2ba59 160 ret = -1;
c585821b 161 goto end;
d3e2ba59 162 }
528f2ffa
JD
163 ret = kernctl_get_stream_id(stream->wait_fd, &stream_id);
164 if (ret < 0) {
165 PERROR("kernctl_get_stream_id");
c585821b 166 goto end;
528f2ffa 167 }
d3e2ba59 168 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
528f2ffa 169 ret = send_empty_index(stream, ts, stream_id);
d3e2ba59 170 if (ret < 0) {
c585821b 171 goto end;
d3e2ba59
JD
172 }
173 }
174 ret = 0;
c585821b 175end:
d3e2ba59
JD
176 return ret;
177}
178
c585821b 179static int check_kernel_stream(struct lttng_consumer_stream *stream)
d3e2ba59 180{
d3e2ba59
JD
181 int ret;
182
d3e2ba59
JD
183 /*
184 * While holding the stream mutex, try to take a snapshot, if it
185 * succeeds, it means that data is ready to be sent, just let the data
186 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
187 * means that there is no data to read after the flush, so we can
188 * safely send the empty index.
c585821b
MD
189 *
190 * Doing a trylock and checking if waiting on metadata if
191 * trylock fails. Bail out of the stream is indeed waiting for
192 * metadata to be pushed. Busy wait on trylock otherwise.
d3e2ba59 193 */
c585821b
MD
194 for (;;) {
195 ret = pthread_mutex_trylock(&stream->lock);
196 switch (ret) {
197 case 0:
198 break; /* We have the lock. */
199 case EBUSY:
200 pthread_mutex_lock(&stream->metadata_timer_lock);
201 if (stream->waiting_on_metadata) {
202 ret = 0;
203 stream->missed_metadata_flush = true;
204 pthread_mutex_unlock(&stream->metadata_timer_lock);
205 goto end; /* Bail out. */
206 }
207 pthread_mutex_unlock(&stream->metadata_timer_lock);
208 /* Try again. */
209 caa_cpu_relax();
210 continue;
211 default:
212 ERR("Unexpected pthread_mutex_trylock error %d", ret);
213 ret = -1;
214 goto end;
215 }
216 break;
217 }
218 ret = consumer_flush_kernel_index(stream);
219 pthread_mutex_unlock(&stream->lock);
220end:
221 return ret;
222}
223
224int consumer_flush_ust_index(struct lttng_consumer_stream *stream)
225{
226 uint64_t ts, stream_id;
227 int ret;
228
94d49140
JD
229 ret = cds_lfht_is_node_deleted(&stream->node.node);
230 if (ret) {
c585821b 231 goto end;
94d49140
JD
232 }
233
84a182ce 234 ret = lttng_ustconsumer_get_current_timestamp(stream, &ts);
d3e2ba59
JD
235 if (ret < 0) {
236 ERR("Failed to get the current timestamp");
c585821b 237 goto end;
d3e2ba59 238 }
84a182ce
DG
239 lttng_ustconsumer_flush_buffer(stream, 1);
240 ret = lttng_ustconsumer_take_snapshot(stream);
d3e2ba59 241 if (ret < 0) {
94d49140 242 if (ret != -EAGAIN) {
d3e2ba59
JD
243 ERR("Taking UST snapshot");
244 ret = -1;
c585821b 245 goto end;
d3e2ba59 246 }
70190e1c 247 ret = lttng_ustconsumer_get_stream_id(stream, &stream_id);
528f2ffa
JD
248 if (ret < 0) {
249 PERROR("ustctl_get_stream_id");
c585821b 250 goto end;
528f2ffa 251 }
d3e2ba59 252 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
528f2ffa 253 ret = send_empty_index(stream, ts, stream_id);
d3e2ba59 254 if (ret < 0) {
c585821b 255 goto end;
d3e2ba59
JD
256 }
257 }
258 ret = 0;
c585821b
MD
259end:
260 return ret;
261}
d3e2ba59 262
c585821b
MD
263static int check_ust_stream(struct lttng_consumer_stream *stream)
264{
265 int ret;
266
267 assert(stream);
268 assert(stream->ustream);
269 /*
270 * While holding the stream mutex, try to take a snapshot, if it
271 * succeeds, it means that data is ready to be sent, just let the data
272 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
273 * means that there is no data to read after the flush, so we can
274 * safely send the empty index.
275 *
276 * Doing a trylock and checking if waiting on metadata if
277 * trylock fails. Bail out of the stream is indeed waiting for
278 * metadata to be pushed. Busy wait on trylock otherwise.
279 */
280 for (;;) {
281 ret = pthread_mutex_trylock(&stream->lock);
282 switch (ret) {
283 case 0:
284 break; /* We have the lock. */
285 case EBUSY:
286 pthread_mutex_lock(&stream->metadata_timer_lock);
287 if (stream->waiting_on_metadata) {
288 ret = 0;
289 stream->missed_metadata_flush = true;
290 pthread_mutex_unlock(&stream->metadata_timer_lock);
291 goto end; /* Bail out. */
292 }
293 pthread_mutex_unlock(&stream->metadata_timer_lock);
294 /* Try again. */
295 caa_cpu_relax();
296 continue;
297 default:
298 ERR("Unexpected pthread_mutex_trylock error %d", ret);
299 ret = -1;
300 goto end;
301 }
302 break;
303 }
304 ret = consumer_flush_ust_index(stream);
d3e2ba59 305 pthread_mutex_unlock(&stream->lock);
c585821b 306end:
d3e2ba59
JD
307 return ret;
308}
309
310/*
311 * Execute action on a live timer
312 */
313static void live_timer(struct lttng_consumer_local_data *ctx,
daa81f58 314 int sig, siginfo_t *si)
d3e2ba59
JD
315{
316 int ret;
317 struct lttng_consumer_channel *channel;
318 struct lttng_consumer_stream *stream;
319 struct lttng_ht *ht;
320 struct lttng_ht_iter iter;
321
322 channel = si->si_value.sival_ptr;
323 assert(channel);
324
325 if (channel->switch_timer_error) {
326 goto error;
327 }
328 ht = consumer_data.stream_per_chan_id_ht;
329
330 DBG("Live timer for channel %" PRIu64, channel->key);
331
332 rcu_read_lock();
333 switch (ctx->type) {
334 case LTTNG_CONSUMER32_UST:
335 case LTTNG_CONSUMER64_UST:
336 cds_lfht_for_each_entry_duplicate(ht->ht,
337 ht->hash_fct(&channel->key, lttng_ht_seed),
338 ht->match_fct, &channel->key, &iter.iter,
339 stream, node_channel_id.node) {
340 ret = check_ust_stream(stream);
341 if (ret < 0) {
342 goto error_unlock;
343 }
344 }
345 break;
346 case LTTNG_CONSUMER_KERNEL:
347 cds_lfht_for_each_entry_duplicate(ht->ht,
348 ht->hash_fct(&channel->key, lttng_ht_seed),
349 ht->match_fct, &channel->key, &iter.iter,
350 stream, node_channel_id.node) {
351 ret = check_kernel_stream(stream);
352 if (ret < 0) {
353 goto error_unlock;
354 }
355 }
356 break;
357 case LTTNG_CONSUMER_UNKNOWN:
358 assert(0);
359 break;
360 }
361
362error_unlock:
363 rcu_read_unlock();
364
365error:
366 return;
367}
368
2b8f8754
MD
369static
370void consumer_timer_signal_thread_qs(unsigned int signr)
371{
372 sigset_t pending_set;
373 int ret;
374
375 /*
376 * We need to be the only thread interacting with the thread
377 * that manages signals for teardown synchronization.
378 */
379 pthread_mutex_lock(&timer_signal.lock);
380
381 /* Ensure we don't have any signal queued for this channel. */
382 for (;;) {
383 ret = sigemptyset(&pending_set);
384 if (ret == -1) {
385 PERROR("sigemptyset");
386 }
387 ret = sigpending(&pending_set);
388 if (ret == -1) {
389 PERROR("sigpending");
390 }
391 if (!sigismember(&pending_set, LTTNG_CONSUMER_SIG_SWITCH)) {
392 break;
393 }
394 caa_cpu_relax();
395 }
396
397 /*
398 * From this point, no new signal handler will be fired that would try to
399 * access "chan". However, we still need to wait for any currently
400 * executing handler to complete.
401 */
402 cmm_smp_mb();
403 CMM_STORE_SHARED(timer_signal.qs_done, 0);
404 cmm_smp_mb();
405
406 /*
407 * Kill with LTTNG_CONSUMER_SIG_TEARDOWN, so signal management thread wakes
408 * up.
409 */
410 kill(getpid(), LTTNG_CONSUMER_SIG_TEARDOWN);
411
412 while (!CMM_LOAD_SHARED(timer_signal.qs_done)) {
413 caa_cpu_relax();
414 }
415 cmm_smp_mb();
416
417 pthread_mutex_unlock(&timer_signal.lock);
418}
419
331744e3 420/*
daa81f58
JG
421 * Start a timer channel timer which will fire at a given interval
422 * (timer_interval_us)and fire a given signal (signal).
423 *
424 * Returns a negative value on error, 0 if a timer was created, and
425 * a positive value if no timer was created (not an error).
331744e3 426 */
daa81f58
JG
427static
428int consumer_channel_timer_start(timer_t *timer_id,
429 struct lttng_consumer_channel *channel,
430 unsigned int timer_interval_us, int signal)
331744e3 431{
daa81f58 432 int ret = 0, delete_ret;
331744e3
JD
433 struct sigevent sev;
434 struct itimerspec its;
435
436 assert(channel);
437 assert(channel->key);
438
daa81f58
JG
439 if (timer_interval_us == 0) {
440 /* No creation needed; not an error. */
441 ret = 1;
442 goto end;
331744e3
JD
443 }
444
445 sev.sigev_notify = SIGEV_SIGNAL;
daa81f58 446 sev.sigev_signo = signal;
331744e3 447 sev.sigev_value.sival_ptr = channel;
daa81f58 448 ret = timer_create(CLOCKID, &sev, timer_id);
331744e3
JD
449 if (ret == -1) {
450 PERROR("timer_create");
daa81f58 451 goto end;
331744e3 452 }
331744e3 453
daa81f58
JG
454 its.it_value.tv_sec = timer_interval_us / 1000000;
455 its.it_value.tv_nsec = (timer_interval_us % 1000000) * 1000;
331744e3
JD
456 its.it_interval.tv_sec = its.it_value.tv_sec;
457 its.it_interval.tv_nsec = its.it_value.tv_nsec;
458
daa81f58 459 ret = timer_settime(*timer_id, 0, &its, NULL);
331744e3
JD
460 if (ret == -1) {
461 PERROR("timer_settime");
daa81f58
JG
462 goto error_destroy_timer;
463 }
464end:
465 return ret;
466error_destroy_timer:
467 delete_ret = timer_delete(*timer_id);
468 if (delete_ret == -1) {
469 PERROR("timer_delete");
470 }
471 goto end;
472}
473
474static
475int consumer_channel_timer_stop(timer_t *timer_id, int signal)
476{
477 int ret = 0;
478
479 ret = timer_delete(*timer_id);
480 if (ret == -1) {
481 PERROR("timer_delete");
482 goto end;
331744e3 483 }
daa81f58
JG
484
485 consumer_timer_signal_thread_qs(signal);
486 *timer_id = 0;
487end:
488 return ret;
489}
490
491/*
492 * Set the channel's switch timer.
493 */
494void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
495 unsigned int switch_timer_interval_us)
496{
497 int ret;
498
499 assert(channel);
500 assert(channel->key);
501
502 ret = consumer_channel_timer_start(&channel->switch_timer, channel,
503 switch_timer_interval_us, LTTNG_CONSUMER_SIG_SWITCH);
504
505 channel->switch_timer_enabled = !!(ret == 0);
331744e3
JD
506}
507
508/*
daa81f58 509 * Stop and delete the channel's switch timer.
331744e3
JD
510 */
511void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
512{
513 int ret;
331744e3
JD
514
515 assert(channel);
516
daa81f58
JG
517 ret = consumer_channel_timer_stop(&channel->switch_timer,
518 LTTNG_CONSUMER_SIG_SWITCH);
331744e3 519 if (ret == -1) {
daa81f58 520 ERR("Failed to stop switch timer");
331744e3
JD
521 }
522
2b8f8754 523 channel->switch_timer_enabled = 0;
331744e3
JD
524}
525
d3e2ba59 526/*
daa81f58 527 * Set the channel's live timer.
d3e2ba59
JD
528 */
529void consumer_timer_live_start(struct lttng_consumer_channel *channel,
daa81f58 530 unsigned int live_timer_interval_us)
d3e2ba59
JD
531{
532 int ret;
d3e2ba59
JD
533
534 assert(channel);
535 assert(channel->key);
536
daa81f58
JG
537 ret = consumer_channel_timer_start(&channel->live_timer, channel,
538 live_timer_interval_us, LTTNG_CONSUMER_SIG_LIVE);
d3e2ba59 539
daa81f58
JG
540 channel->live_timer_enabled = !!(ret == 0);
541}
d3e2ba59 542
daa81f58
JG
543/*
544 * Stop and delete the channel's live timer.
545 */
546void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
547{
548 int ret;
d3e2ba59 549
daa81f58
JG
550 assert(channel);
551
552 ret = consumer_channel_timer_stop(&channel->live_timer,
553 LTTNG_CONSUMER_SIG_LIVE);
d3e2ba59 554 if (ret == -1) {
daa81f58 555 ERR("Failed to stop live timer");
d3e2ba59 556 }
daa81f58
JG
557
558 channel->live_timer_enabled = 0;
d3e2ba59
JD
559}
560
561/*
daa81f58
JG
562 * Set the channel's monitoring timer.
563 *
564 * Returns a negative value on error, 0 if a timer was created, and
565 * a positive value if no timer was created (not an error).
d3e2ba59 566 */
daa81f58
JG
567int consumer_timer_monitor_start(struct lttng_consumer_channel *channel,
568 unsigned int monitor_timer_interval_us)
d3e2ba59
JD
569{
570 int ret;
571
572 assert(channel);
daa81f58
JG
573 assert(channel->key);
574 assert(!channel->monitor_timer_enabled);
d3e2ba59 575
daa81f58
JG
576 ret = consumer_channel_timer_start(&channel->monitor_timer, channel,
577 monitor_timer_interval_us, LTTNG_CONSUMER_SIG_MONITOR);
578 channel->monitor_timer_enabled = !!(ret == 0);
579 return ret;
580}
581
582/*
583 * Stop and delete the channel's monitoring timer.
584 */
585int consumer_timer_monitor_stop(struct lttng_consumer_channel *channel)
586{
587 int ret;
588
589 assert(channel);
590 assert(channel->monitor_timer_enabled);
591
592 ret = consumer_channel_timer_stop(&channel->monitor_timer,
593 LTTNG_CONSUMER_SIG_MONITOR);
d3e2ba59 594 if (ret == -1) {
daa81f58
JG
595 ERR("Failed to stop live timer");
596 goto end;
d3e2ba59
JD
597 }
598
daa81f58
JG
599 channel->monitor_timer_enabled = 0;
600end:
601 return ret;
d3e2ba59
JD
602}
603
331744e3
JD
604/*
605 * Block the RT signals for the entire process. It must be called from the
606 * consumer main before creating the threads
607 */
73664f81 608int consumer_signal_init(void)
331744e3
JD
609{
610 int ret;
611 sigset_t mask;
612
613 /* Block signal for entire process, so only our thread processes it. */
614 setmask(&mask);
615 ret = pthread_sigmask(SIG_BLOCK, &mask, NULL);
616 if (ret) {
617 errno = ret;
618 PERROR("pthread_sigmask");
73664f81 619 return -1;
331744e3 620 }
73664f81 621 return 0;
331744e3
JD
622}
623
daa81f58
JG
624static
625int sample_ust_positions(struct lttng_consumer_channel *channel,
626 uint64_t *_highest_use, uint64_t *_lowest_use)
627{
628 int ret;
629 struct lttng_ht_iter iter;
630 struct lttng_consumer_stream *stream;
631 bool empty_channel = true;
632 uint64_t high = 0, low = UINT64_MAX;
633 struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
634
635 rcu_read_lock();
636
637 cds_lfht_for_each_entry_duplicate(ht->ht,
638 ht->hash_fct(&channel->key, lttng_ht_seed),
639 ht->match_fct, &channel->key,
640 &iter.iter, stream, node_channel_id.node) {
641 unsigned long produced, consumed, usage;
642
643 empty_channel = false;
644
645 pthread_mutex_lock(&stream->lock);
646 if (cds_lfht_is_node_deleted(&stream->node.node)) {
647 goto next;
648 }
649
650 ret = ustctl_snapshot_sample_positions(stream->ustream);
651 if (ret) {
652 ERR("Failed to take buffer position snapshot in monitor timer (ret = %d)", ret);
653 pthread_mutex_unlock(&stream->lock);
654 goto end;
655 }
656 ret = ustctl_snapshot_get_consumed(stream->ustream,
657 &consumed);
658 if (ret) {
659 ERR("Failed to get buffer consumed position in monitor timer");
660 pthread_mutex_unlock(&stream->lock);
661 goto end;
662 }
663 ret = ustctl_snapshot_get_produced(stream->ustream,
664 &produced);
665 if (ret) {
666 ERR("Failed to get buffer produced position in monitor timer");
667 pthread_mutex_unlock(&stream->lock);
668 goto end;
669 }
670
671 usage = produced - consumed;
672 high = (usage > high) ? usage : high;
673 low = (usage < low) ? usage : low;
674 next:
675 pthread_mutex_unlock(&stream->lock);
676 }
677
678 *_highest_use = high;
679 *_lowest_use = low;
680end:
681 rcu_read_unlock();
682 if (empty_channel) {
683 ret = -1;
684 }
685 return ret;
686}
687
688/*
689 * Execute action on a monitor timer.
690 */
691static
692void monitor_timer(struct lttng_consumer_local_data *ctx,
693 struct lttng_consumer_channel *channel)
694{
695 int ret;
696 int channel_monitor_pipe =
697 consumer_timer_thread_get_channel_monitor_pipe();
698 struct lttcomm_consumer_channel_monitor_msg msg = {
699 .key = channel->key,
700 };
701
702 assert(channel);
703 pthread_mutex_lock(&consumer_data.lock);
704
705 if (channel_monitor_pipe < 0) {
706 goto end;
707 }
708
709 switch (consumer_data.type) {
710 case LTTNG_CONSUMER_KERNEL:
711 /* TODO */
712 ret = -1;
713 break;
714 case LTTNG_CONSUMER32_UST:
715 case LTTNG_CONSUMER64_UST:
716 {
717 ret = sample_ust_positions(channel, &msg.highest, &msg.lowest);
718 break;
719 }
720 default:
721 abort();
722 }
723
724 if (ret) {
725 goto end;
726 }
727
728 /*
729 * Writes performed here are assumed to be atomic which is only
730 * guaranteed for sizes < than PIPE_BUF.
731 */
732 assert(sizeof(msg) <= PIPE_BUF);
733
734 do {
735 ret = write(channel_monitor_pipe, &msg, sizeof(msg));
736 } while (ret == -1 && errno == EINTR);
737 if (ret == -1) {
738 if (errno == EAGAIN) {
739 /* Not an error, the sample is merely dropped. */
740 DBG("Channel monitor pipe is full; dropping sample for channel key = %"PRIu64,
741 channel->key);
742 } else {
743 PERROR("write to the channel monitor pipe");
744 }
745 } else {
746 DBG("Sent channel monitoring sample for channel key %" PRIu64
747 ", (highest = %" PRIu64 ", lowest = %"PRIu64")",
748 channel->key, msg.highest, msg.lowest);
749 }
750end:
751 pthread_mutex_unlock(&consumer_data.lock);
752}
753
754int consumer_timer_thread_get_channel_monitor_pipe(void)
755{
756 return uatomic_read(&channel_monitor_pipe);
757}
758
759int consumer_timer_thread_set_channel_monitor_pipe(int fd)
760{
761 int ret;
762
763 ret = uatomic_cmpxchg(&channel_monitor_pipe, -1, fd);
764 if (ret != -1) {
765 ret = -1;
766 goto end;
767 }
768 ret = 0;
769end:
770 return ret;
771}
772
331744e3 773/*
d3e2ba59 774 * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
daa81f58
JG
775 * LTTNG_CONSUMER_SIG_TEARDOWN, LTTNG_CONSUMER_SIG_LIVE, and
776 * LTTNG_CONSUMER_SIG_MONITOR.
331744e3 777 */
d3e2ba59 778void *consumer_timer_thread(void *data)
331744e3
JD
779{
780 int signr;
781 sigset_t mask;
782 siginfo_t info;
783 struct lttng_consumer_local_data *ctx = data;
784
8a9acb74
MD
785 rcu_register_thread();
786
1fc79fb4
MD
787 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA_TIMER);
788
2d57de81
MD
789 if (testpoint(consumerd_thread_metadata_timer)) {
790 goto error_testpoint;
791 }
792
9ce5646a
MD
793 health_code_update();
794
331744e3
JD
795 /* Only self thread will receive signal mask. */
796 setmask(&mask);
797 CMM_STORE_SHARED(timer_signal.tid, pthread_self());
798
799 while (1) {
9ce5646a
MD
800 health_code_update();
801
802 health_poll_entry();
331744e3 803 signr = sigwaitinfo(&mask, &info);
9ce5646a 804 health_poll_exit();
daa81f58
JG
805
806 /*
807 * NOTE: cascading conditions are used instead of a switch case
808 * since the use of SIGRTMIN in the definition of the signals'
809 * values prevents the reduction to an integer constant.
810 */
331744e3
JD
811 if (signr == -1) {
812 if (errno != EINTR) {
813 PERROR("sigwaitinfo");
814 }
815 continue;
816 } else if (signr == LTTNG_CONSUMER_SIG_SWITCH) {
daa81f58 817 metadata_switch_timer(ctx, info.si_signo, &info);
331744e3
JD
818 } else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) {
819 cmm_smp_mb();
820 CMM_STORE_SHARED(timer_signal.qs_done, 1);
821 cmm_smp_mb();
822 DBG("Signal timer metadata thread teardown");
d3e2ba59 823 } else if (signr == LTTNG_CONSUMER_SIG_LIVE) {
daa81f58
JG
824 live_timer(ctx, info.si_signo, &info);
825 } else if (signr == LTTNG_CONSUMER_SIG_MONITOR) {
826 struct lttng_consumer_channel *channel;
827
828 channel = info.si_value.sival_ptr;
829 monitor_timer(ctx, channel);
331744e3
JD
830 } else {
831 ERR("Unexpected signal %d\n", info.si_signo);
832 }
833 }
834
2d57de81
MD
835error_testpoint:
836 /* Only reached in testpoint error */
837 health_error();
1fc79fb4
MD
838 health_unregister(health_consumerd);
839
8a9acb74
MD
840 rcu_unregister_thread();
841
1fc79fb4 842 /* Never return */
331744e3
JD
843 return NULL;
844}
This page took 0.119443 seconds and 5 git commands to generate.