2 * Copyright (C) 2012 - Julien Desfossez <julien.desfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License, version 2 only, as
7 * published by the Free Software Foundation.
9 * This program is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
14 * You should have received a copy of the GNU General Public License along with
15 * this program; if not, write to the Free Software Foundation, Inc., 51
16 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
24 #include <bin/lttng-consumerd/health-consumerd.h>
25 #include <common/common.h>
26 #include <common/compat/endian.h>
27 #include <common/kernel-ctl/kernel-ctl.h>
28 #include <common/kernel-consumer/kernel-consumer.h>
29 #include <common/consumer/consumer-stream.h>
30 #include <common/consumer/consumer-timer.h>
31 #include <common/consumer/consumer-testpoint.h>
32 #include <common/ust-consumer/ust-consumer.h>
34 typedef int (*sample_positions_cb
)(struct lttng_consumer_stream
*stream
);
35 typedef int (*get_consumed_cb
)(struct lttng_consumer_stream
*stream
,
36 unsigned long *consumed
);
37 typedef int (*get_produced_cb
)(struct lttng_consumer_stream
*stream
,
38 unsigned long *produced
);
39 typedef int (*flush_index_cb
)(struct lttng_consumer_stream
*stream
);
41 static struct timer_signal_data timer_signal
= {
45 .lock
= PTHREAD_MUTEX_INITIALIZER
,
49 * Set custom signal mask to current thread.
51 static void setmask(sigset_t
*mask
)
55 ret
= sigemptyset(mask
);
57 PERROR("sigemptyset");
59 ret
= sigaddset(mask
, LTTNG_CONSUMER_SIG_SWITCH
);
61 PERROR("sigaddset switch");
63 ret
= sigaddset(mask
, LTTNG_CONSUMER_SIG_TEARDOWN
);
65 PERROR("sigaddset teardown");
67 ret
= sigaddset(mask
, LTTNG_CONSUMER_SIG_LIVE
);
69 PERROR("sigaddset live");
71 ret
= sigaddset(mask
, LTTNG_CONSUMER_SIG_EXIT
);
73 PERROR("sigaddset exit");
78 * Execute action on a timer switch.
80 * Beware: metadata_switch_timer() should *never* take a mutex also held
81 * while consumer_timer_switch_stop() is called. It would result in
84 static void metadata_switch_timer(struct lttng_consumer_local_data
*ctx
,
85 int sig
, siginfo_t
*si
, void *uc
)
88 struct lttng_consumer_channel
*channel
;
90 channel
= si
->si_value
.sival_ptr
;
93 if (channel
->switch_timer_error
) {
97 DBG("Switch timer for channel %" PRIu64
, channel
->key
);
99 case LTTNG_CONSUMER32_UST
:
100 case LTTNG_CONSUMER64_UST
:
102 * Locks taken by lttng_ustconsumer_request_metadata():
103 * - metadata_socket_lock
104 * - Calling lttng_ustconsumer_recv_metadata():
105 * - channel->metadata_cache->lock
106 * - Calling consumer_metadata_cache_flushed():
107 * - channel->timer_lock
108 * - channel->metadata_cache->lock
110 * Ensure that neither consumer_data.lock nor
111 * channel->lock are taken within this function, since
112 * they are held while consumer_timer_switch_stop() is
115 ret
= lttng_ustconsumer_request_metadata(ctx
, channel
, 1, 1);
117 channel
->switch_timer_error
= 1;
120 case LTTNG_CONSUMER_KERNEL
:
121 case LTTNG_CONSUMER_UNKNOWN
:
127 static int send_empty_index(struct lttng_consumer_stream
*stream
, uint64_t ts
,
131 struct ctf_packet_index index
;
133 memset(&index
, 0, sizeof(index
));
134 index
.stream_id
= htobe64(stream_id
);
135 index
.timestamp_end
= htobe64(ts
);
136 ret
= consumer_stream_write_index(stream
, &index
);
145 int consumer_flush_kernel_index(struct lttng_consumer_stream
*stream
)
147 uint64_t ts
, stream_id
;
150 ret
= kernctl_get_current_timestamp(stream
->wait_fd
, &ts
);
152 ERR("Failed to get the current timestamp");
155 ret
= kernctl_buffer_flush(stream
->wait_fd
);
157 ERR("Failed to flush kernel stream");
160 ret
= kernctl_snapshot(stream
->wait_fd
);
162 if (ret
!= -EAGAIN
&& ret
!= -ENODATA
) {
163 PERROR("live timer kernel snapshot");
167 ret
= kernctl_get_stream_id(stream
->wait_fd
, &stream_id
);
169 PERROR("kernctl_get_stream_id");
172 DBG("Stream %" PRIu64
" empty, sending beacon", stream
->key
);
173 ret
= send_empty_index(stream
, ts
, stream_id
);
183 static int check_stream(struct lttng_consumer_stream
*stream
,
184 flush_index_cb flush_index
)
189 * While holding the stream mutex, try to take a snapshot, if it
190 * succeeds, it means that data is ready to be sent, just let the data
191 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
192 * means that there is no data to read after the flush, so we can
193 * safely send the empty index.
195 * Doing a trylock and checking if waiting on metadata if
196 * trylock fails. Bail out of the stream is indeed waiting for
197 * metadata to be pushed. Busy wait on trylock otherwise.
200 ret
= pthread_mutex_trylock(&stream
->lock
);
203 break; /* We have the lock. */
205 pthread_mutex_lock(&stream
->metadata_timer_lock
);
206 if (stream
->waiting_on_metadata
) {
208 stream
->missed_metadata_flush
= true;
209 pthread_mutex_unlock(&stream
->metadata_timer_lock
);
210 goto end
; /* Bail out. */
212 pthread_mutex_unlock(&stream
->metadata_timer_lock
);
217 ERR("Unexpected pthread_mutex_trylock error %d", ret
);
223 ret
= flush_index(stream
);
224 pthread_mutex_unlock(&stream
->lock
);
229 int consumer_flush_ust_index(struct lttng_consumer_stream
*stream
)
231 uint64_t ts
, stream_id
;
234 ret
= cds_lfht_is_node_deleted(&stream
->node
.node
);
239 ret
= lttng_ustconsumer_get_current_timestamp(stream
, &ts
);
241 ERR("Failed to get the current timestamp");
244 lttng_ustconsumer_flush_buffer(stream
, 1);
245 ret
= lttng_ustconsumer_take_snapshot(stream
);
247 if (ret
!= -EAGAIN
) {
248 ERR("Taking UST snapshot");
252 ret
= lttng_ustconsumer_get_stream_id(stream
, &stream_id
);
254 PERROR("ustctl_get_stream_id");
257 DBG("Stream %" PRIu64
" empty, sending beacon", stream
->key
);
258 ret
= send_empty_index(stream
, ts
, stream_id
);
269 * Execute action on a live timer
271 static void live_timer(struct lttng_consumer_local_data
*ctx
,
272 int sig
, siginfo_t
*si
, void *uc
)
275 struct lttng_consumer_channel
*channel
;
276 struct lttng_consumer_stream
*stream
;
277 struct lttng_ht_iter iter
;
278 const struct lttng_ht
*ht
= consumer_data
.stream_per_chan_id_ht
;
279 const flush_index_cb flush_index
=
280 ctx
->type
== LTTNG_CONSUMER_KERNEL
?
281 consumer_flush_kernel_index
:
282 consumer_flush_ust_index
;
284 channel
= si
->si_value
.sival_ptr
;
287 if (channel
->switch_timer_error
) {
291 DBG("Live timer for channel %" PRIu64
, channel
->key
);
292 if (channel
->live_timer_enabled
!= 1) {
293 DBG("Liver timer was stopped before the iteration. Quitting early.");
298 cds_lfht_for_each_entry_duplicate(ht
->ht
,
299 ht
->hash_fct(&channel
->key
, lttng_ht_seed
),
300 ht
->match_fct
, &channel
->key
, &iter
.iter
,
301 stream
, node_channel_id
.node
) {
302 if (channel
->live_timer_enabled
!= 1) {
303 DBG("Liver timer was stopped during the iteration. Quitting early.");
307 ret
= check_stream(stream
, flush_index
);
323 void consumer_timer_signal_thread_qs(unsigned int signr
)
325 sigset_t pending_set
;
329 * We need to be the only thread interacting with the thread
330 * that manages signals for teardown synchronization.
332 pthread_mutex_lock(&timer_signal
.lock
);
334 /* Ensure we don't have any signal queued for this channel. */
336 ret
= sigemptyset(&pending_set
);
338 PERROR("sigemptyset");
340 ret
= sigpending(&pending_set
);
342 PERROR("sigpending");
344 if (!sigismember(&pending_set
, signr
)) {
351 * From this point, no new signal handler will be fired that would try to
352 * access "chan". However, we still need to wait for any currently
353 * executing handler to complete.
356 CMM_STORE_SHARED(timer_signal
.qs_done
, 0);
360 * Kill with LTTNG_CONSUMER_SIG_TEARDOWN, so signal management thread wakes
363 kill(getpid(), LTTNG_CONSUMER_SIG_TEARDOWN
);
365 while (!CMM_LOAD_SHARED(timer_signal
.qs_done
)) {
370 pthread_mutex_unlock(&timer_signal
.lock
);
374 * Set the timer for periodical metadata flush.
376 void consumer_timer_switch_start(struct lttng_consumer_channel
*channel
,
377 unsigned int switch_timer_interval
)
381 struct itimerspec its
;
384 assert(channel
->key
);
386 if (switch_timer_interval
== 0) {
390 sev
.sigev_notify
= SIGEV_SIGNAL
;
391 sev
.sigev_signo
= LTTNG_CONSUMER_SIG_SWITCH
;
392 sev
.sigev_value
.sival_ptr
= channel
;
393 ret
= timer_create(CLOCKID
, &sev
, &channel
->switch_timer
);
395 PERROR("timer_create");
397 channel
->switch_timer_enabled
= 1;
399 its
.it_value
.tv_sec
= switch_timer_interval
/ 1000000;
400 its
.it_value
.tv_nsec
= (switch_timer_interval
% 1000000) * 1000;
401 its
.it_interval
.tv_sec
= its
.it_value
.tv_sec
;
402 its
.it_interval
.tv_nsec
= its
.it_value
.tv_nsec
;
404 ret
= timer_settime(channel
->switch_timer
, 0, &its
, NULL
);
406 PERROR("timer_settime");
411 * Stop and delete timer.
413 void consumer_timer_switch_stop(struct lttng_consumer_channel
*channel
)
419 ret
= timer_delete(channel
->switch_timer
);
421 PERROR("timer_delete");
424 consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_SWITCH
);
426 channel
->switch_timer
= 0;
427 channel
->switch_timer_enabled
= 0;
431 * Set the timer for the live mode.
433 void consumer_timer_live_start(struct lttng_consumer_channel
*channel
,
434 int live_timer_interval
)
438 struct itimerspec its
;
441 assert(channel
->key
);
443 if (live_timer_interval
<= 0) {
447 sev
.sigev_notify
= SIGEV_SIGNAL
;
448 sev
.sigev_signo
= LTTNG_CONSUMER_SIG_LIVE
;
449 sev
.sigev_value
.sival_ptr
= channel
;
450 ret
= timer_create(CLOCKID
, &sev
, &channel
->live_timer
);
452 PERROR("timer_create");
454 channel
->live_timer_enabled
= 1;
456 its
.it_value
.tv_sec
= live_timer_interval
/ 1000000;
457 its
.it_value
.tv_nsec
= (live_timer_interval
% 1000000) * 1000;
458 its
.it_interval
.tv_sec
= its
.it_value
.tv_sec
;
459 its
.it_interval
.tv_nsec
= its
.it_value
.tv_nsec
;
461 ret
= timer_settime(channel
->live_timer
, 0, &its
, NULL
);
463 PERROR("timer_settime");
468 * Stop and delete timer.
470 void consumer_timer_live_stop(struct lttng_consumer_channel
*channel
)
476 if (!channel
->live_timer
) {
480 ret
= timer_delete(channel
->live_timer
);
482 PERROR("timer_delete");
485 channel
->live_timer
= 0;
486 channel
->live_timer_enabled
= 0;
488 consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_LIVE
);
492 * Block the RT signals for the entire process. It must be called from the
493 * consumer main before creating the threads
495 int consumer_signal_init(void)
500 /* Block signal for entire process, so only our thread processes it. */
502 ret
= pthread_sigmask(SIG_BLOCK
, &mask
, NULL
);
505 PERROR("pthread_sigmask");
512 * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
513 * LTTNG_CONSUMER_SIG_TEARDOWN, LTTNG_CONSUMER_SIG_LIVE, and
514 * LTTNG_CONSUMER_SIG_EXIT.
516 void *consumer_timer_thread(void *data
)
521 struct lttng_consumer_local_data
*ctx
= data
;
523 rcu_register_thread();
525 health_register(health_consumerd
, HEALTH_CONSUMERD_TYPE_METADATA_TIMER
);
527 if (testpoint(consumerd_thread_metadata_timer
)) {
528 goto error_testpoint
;
531 health_code_update();
533 /* Only self thread will receive signal mask. */
535 CMM_STORE_SHARED(timer_signal
.tid
, pthread_self());
538 health_code_update();
541 signr
= sigwaitinfo(&mask
, &info
);
544 if (errno
!= EINTR
) {
545 PERROR("sigwaitinfo");
548 } else if (signr
== LTTNG_CONSUMER_SIG_SWITCH
) {
549 metadata_switch_timer(ctx
, info
.si_signo
, &info
, NULL
);
550 } else if (signr
== LTTNG_CONSUMER_SIG_TEARDOWN
) {
552 CMM_STORE_SHARED(timer_signal
.qs_done
, 1);
554 DBG("Signal timer metadata thread teardown");
555 } else if (signr
== LTTNG_CONSUMER_SIG_LIVE
) {
556 live_timer(ctx
, info
.si_signo
, &info
, NULL
);
557 } else if (signr
== LTTNG_CONSUMER_SIG_EXIT
) {
558 assert(consumer_quit
);
561 ERR("Unexpected signal %d\n", info
.si_signo
);
566 /* Only reached in testpoint error */
569 health_unregister(health_consumerd
);
570 rcu_unregister_thread();