CUSTOM: live timer: immediate live timer control on data pending and destroy
[lttng-tools.git] / src / common / consumer / consumer-timer.c
1 /*
2 * Copyright (C) 2012 - Julien Desfossez <julien.desfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License, version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 * more details.
13 *
14 * You should have received a copy of the GNU General Public License along with
15 * this program; if not, write to the Free Software Foundation, Inc., 51
16 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
19 #define _LGPL_SOURCE
20 #include <assert.h>
21 #include <inttypes.h>
22 #include <signal.h>
23
24 #include <bin/lttng-consumerd/health-consumerd.h>
25 #include <common/common.h>
26 #include <common/compat/endian.h>
27 #include <common/kernel-ctl/kernel-ctl.h>
28 #include <common/kernel-consumer/kernel-consumer.h>
29 #include <common/consumer/consumer-stream.h>
30 #include <common/consumer/consumer-timer.h>
31 #include <common/consumer/consumer-testpoint.h>
32 #include <common/ust-consumer/ust-consumer.h>
33
/*
 * Per-stream callback signatures. Each callback receives the stream to
 * operate on and returns an int status; the consumed/produced variants
 * additionally take an output pointer for the position.
 */
typedef int (*sample_positions_cb)(struct lttng_consumer_stream *stream);
typedef int (*get_consumed_cb)(
		struct lttng_consumer_stream *stream, unsigned long *consumed);
typedef int (*get_produced_cb)(
		struct lttng_consumer_stream *stream, unsigned long *produced);
/* Signature shared by the kernel and UST index flush functions. */
typedef int (*flush_index_cb)(struct lttng_consumer_stream *stream);
40
41 static struct timer_signal_data timer_signal = {
42 .tid = 0,
43 .setup_done = 0,
44 .qs_done = 0,
45 .lock = PTHREAD_MUTEX_INITIALIZER,
46 };
47
48 /*
49 * Set custom signal mask to current thread.
50 */
51 static void setmask(sigset_t *mask)
52 {
53 int ret;
54
55 ret = sigemptyset(mask);
56 if (ret) {
57 PERROR("sigemptyset");
58 }
59 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_SWITCH);
60 if (ret) {
61 PERROR("sigaddset switch");
62 }
63 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_TEARDOWN);
64 if (ret) {
65 PERROR("sigaddset teardown");
66 }
67 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_LIVE);
68 if (ret) {
69 PERROR("sigaddset live");
70 }
71 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_EXIT);
72 if (ret) {
73 PERROR("sigaddset exit");
74 }
75 }
76
77 /*
78 * Execute action on a timer switch.
79 *
80 * Beware: metadata_switch_timer() should *never* take a mutex also held
81 * while consumer_timer_switch_stop() is called. It would result in
82 * deadlocks.
83 */
84 static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
85 int sig, siginfo_t *si, void *uc)
86 {
87 int ret;
88 struct lttng_consumer_channel *channel;
89
90 channel = si->si_value.sival_ptr;
91 assert(channel);
92
93 if (channel->switch_timer_error) {
94 return;
95 }
96
97 DBG("Switch timer for channel %" PRIu64, channel->key);
98 switch (ctx->type) {
99 case LTTNG_CONSUMER32_UST:
100 case LTTNG_CONSUMER64_UST:
101 /*
102 * Locks taken by lttng_ustconsumer_request_metadata():
103 * - metadata_socket_lock
104 * - Calling lttng_ustconsumer_recv_metadata():
105 * - channel->metadata_cache->lock
106 * - Calling consumer_metadata_cache_flushed():
107 * - channel->timer_lock
108 * - channel->metadata_cache->lock
109 *
110 * Ensure that neither consumer_data.lock nor
111 * channel->lock are taken within this function, since
112 * they are held while consumer_timer_switch_stop() is
113 * called.
114 */
115 ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1);
116 if (ret < 0) {
117 channel->switch_timer_error = 1;
118 }
119 break;
120 case LTTNG_CONSUMER_KERNEL:
121 case LTTNG_CONSUMER_UNKNOWN:
122 assert(0);
123 break;
124 }
125 }
126
127 static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts,
128 uint64_t stream_id)
129 {
130 int ret;
131 struct ctf_packet_index index;
132
133 memset(&index, 0, sizeof(index));
134 index.stream_id = htobe64(stream_id);
135 index.timestamp_end = htobe64(ts);
136 ret = consumer_stream_write_index(stream, &index);
137 if (ret < 0) {
138 goto error;
139 }
140
141 error:
142 return ret;
143 }
144
145 int consumer_flush_kernel_index(struct lttng_consumer_stream *stream)
146 {
147 uint64_t ts, stream_id;
148 int ret;
149
150 ret = kernctl_get_current_timestamp(stream->wait_fd, &ts);
151 if (ret < 0) {
152 ERR("Failed to get the current timestamp");
153 goto end;
154 }
155 ret = kernctl_buffer_flush(stream->wait_fd);
156 if (ret < 0) {
157 ERR("Failed to flush kernel stream");
158 goto end;
159 }
160 ret = kernctl_snapshot(stream->wait_fd);
161 if (ret < 0) {
162 if (ret != -EAGAIN && ret != -ENODATA) {
163 PERROR("live timer kernel snapshot");
164 ret = -1;
165 goto end;
166 }
167 ret = kernctl_get_stream_id(stream->wait_fd, &stream_id);
168 if (ret < 0) {
169 PERROR("kernctl_get_stream_id");
170 goto end;
171 }
172 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
173 ret = send_empty_index(stream, ts, stream_id);
174 if (ret < 0) {
175 goto end;
176 }
177 }
178 ret = 0;
179 end:
180 return ret;
181 }
182
183 static int check_stream(struct lttng_consumer_stream *stream,
184 flush_index_cb flush_index)
185 {
186 int ret;
187
188 /*
189 * While holding the stream mutex, try to take a snapshot, if it
190 * succeeds, it means that data is ready to be sent, just let the data
191 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
192 * means that there is no data to read after the flush, so we can
193 * safely send the empty index.
194 *
195 * Doing a trylock and checking if waiting on metadata if
196 * trylock fails. Bail out of the stream is indeed waiting for
197 * metadata to be pushed. Busy wait on trylock otherwise.
198 */
199 for (;;) {
200 ret = pthread_mutex_trylock(&stream->lock);
201 switch (ret) {
202 case 0:
203 break; /* We have the lock. */
204 case EBUSY:
205 pthread_mutex_lock(&stream->metadata_timer_lock);
206 if (stream->waiting_on_metadata) {
207 ret = 0;
208 stream->missed_metadata_flush = true;
209 pthread_mutex_unlock(&stream->metadata_timer_lock);
210 goto end; /* Bail out. */
211 }
212 pthread_mutex_unlock(&stream->metadata_timer_lock);
213 /* Try again. */
214 caa_cpu_relax();
215 continue;
216 default:
217 ERR("Unexpected pthread_mutex_trylock error %d", ret);
218 ret = -1;
219 goto end;
220 }
221 break;
222 }
223 ret = flush_index(stream);
224 pthread_mutex_unlock(&stream->lock);
225 end:
226 return ret;
227 }
228
229 int consumer_flush_ust_index(struct lttng_consumer_stream *stream)
230 {
231 uint64_t ts, stream_id;
232 int ret;
233
234 ret = cds_lfht_is_node_deleted(&stream->node.node);
235 if (ret) {
236 goto end;
237 }
238
239 ret = lttng_ustconsumer_get_current_timestamp(stream, &ts);
240 if (ret < 0) {
241 ERR("Failed to get the current timestamp");
242 goto end;
243 }
244 lttng_ustconsumer_flush_buffer(stream, 1);
245 ret = lttng_ustconsumer_take_snapshot(stream);
246 if (ret < 0) {
247 if (ret != -EAGAIN) {
248 ERR("Taking UST snapshot");
249 ret = -1;
250 goto end;
251 }
252 ret = lttng_ustconsumer_get_stream_id(stream, &stream_id);
253 if (ret < 0) {
254 PERROR("ustctl_get_stream_id");
255 goto end;
256 }
257 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
258 ret = send_empty_index(stream, ts, stream_id);
259 if (ret < 0) {
260 goto end;
261 }
262 }
263 ret = 0;
264 end:
265 return ret;
266 }
267
268 /*
269 * Execute action on a live timer
270 */
271 static void live_timer(struct lttng_consumer_local_data *ctx,
272 int sig, siginfo_t *si, void *uc)
273 {
274 int ret;
275 struct lttng_consumer_channel *channel;
276 struct lttng_consumer_stream *stream;
277 struct lttng_ht_iter iter;
278 const struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
279 const flush_index_cb flush_index =
280 ctx->type == LTTNG_CONSUMER_KERNEL ?
281 consumer_flush_kernel_index :
282 consumer_flush_ust_index;
283
284 channel = si->si_value.sival_ptr;
285 assert(channel);
286
287 if (channel->switch_timer_error) {
288 goto error;
289 }
290
291 DBG("Live timer for channel %" PRIu64, channel->key);
292 if (channel->live_timer_enabled != 1) {
293 DBG("Liver timer was stopped before the iteration. Quitting early.");
294 goto skip;
295 }
296
297 rcu_read_lock();
298 cds_lfht_for_each_entry_duplicate(ht->ht,
299 ht->hash_fct(&channel->key, lttng_ht_seed),
300 ht->match_fct, &channel->key, &iter.iter,
301 stream, node_channel_id.node) {
302 if (channel->live_timer_enabled != 1) {
303 DBG("Liver timer was stopped during the iteration. Quitting early.");
304 goto skip_unlock;
305 }
306
307 ret = check_stream(stream, flush_index);
308 if (ret < 0) {
309 goto error_unlock;
310 }
311 }
312
313 skip_unlock:
314 error_unlock:
315 rcu_read_unlock();
316
317 skip:
318 error:
319 return;
320 }
321
322 static
323 void consumer_timer_signal_thread_qs(unsigned int signr)
324 {
325 sigset_t pending_set;
326 int ret;
327
328 /*
329 * We need to be the only thread interacting with the thread
330 * that manages signals for teardown synchronization.
331 */
332 pthread_mutex_lock(&timer_signal.lock);
333
334 /* Ensure we don't have any signal queued for this channel. */
335 for (;;) {
336 ret = sigemptyset(&pending_set);
337 if (ret == -1) {
338 PERROR("sigemptyset");
339 }
340 ret = sigpending(&pending_set);
341 if (ret == -1) {
342 PERROR("sigpending");
343 }
344 if (!sigismember(&pending_set, signr)) {
345 break;
346 }
347 caa_cpu_relax();
348 }
349
350 /*
351 * From this point, no new signal handler will be fired that would try to
352 * access "chan". However, we still need to wait for any currently
353 * executing handler to complete.
354 */
355 cmm_smp_mb();
356 CMM_STORE_SHARED(timer_signal.qs_done, 0);
357 cmm_smp_mb();
358
359 /*
360 * Kill with LTTNG_CONSUMER_SIG_TEARDOWN, so signal management thread wakes
361 * up.
362 */
363 kill(getpid(), LTTNG_CONSUMER_SIG_TEARDOWN);
364
365 while (!CMM_LOAD_SHARED(timer_signal.qs_done)) {
366 caa_cpu_relax();
367 }
368 cmm_smp_mb();
369
370 pthread_mutex_unlock(&timer_signal.lock);
371 }
372
373 /*
374 * Set the timer for periodical metadata flush.
375 */
376 void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
377 unsigned int switch_timer_interval)
378 {
379 int ret;
380 struct sigevent sev;
381 struct itimerspec its;
382
383 assert(channel);
384 assert(channel->key);
385
386 if (switch_timer_interval == 0) {
387 return;
388 }
389
390 sev.sigev_notify = SIGEV_SIGNAL;
391 sev.sigev_signo = LTTNG_CONSUMER_SIG_SWITCH;
392 sev.sigev_value.sival_ptr = channel;
393 ret = timer_create(CLOCKID, &sev, &channel->switch_timer);
394 if (ret == -1) {
395 PERROR("timer_create");
396 }
397 channel->switch_timer_enabled = 1;
398
399 its.it_value.tv_sec = switch_timer_interval / 1000000;
400 its.it_value.tv_nsec = (switch_timer_interval % 1000000) * 1000;
401 its.it_interval.tv_sec = its.it_value.tv_sec;
402 its.it_interval.tv_nsec = its.it_value.tv_nsec;
403
404 ret = timer_settime(channel->switch_timer, 0, &its, NULL);
405 if (ret == -1) {
406 PERROR("timer_settime");
407 }
408 }
409
410 /*
411 * Stop and delete timer.
412 */
413 void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
414 {
415 int ret;
416
417 assert(channel);
418
419 ret = timer_delete(channel->switch_timer);
420 if (ret == -1) {
421 PERROR("timer_delete");
422 }
423
424 consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_SWITCH);
425
426 channel->switch_timer = 0;
427 channel->switch_timer_enabled = 0;
428 }
429
430 /*
431 * Set the timer for the live mode.
432 */
433 void consumer_timer_live_start(struct lttng_consumer_channel *channel,
434 int live_timer_interval)
435 {
436 int ret;
437 struct sigevent sev;
438 struct itimerspec its;
439
440 assert(channel);
441 assert(channel->key);
442
443 if (live_timer_interval <= 0) {
444 return;
445 }
446
447 sev.sigev_notify = SIGEV_SIGNAL;
448 sev.sigev_signo = LTTNG_CONSUMER_SIG_LIVE;
449 sev.sigev_value.sival_ptr = channel;
450 ret = timer_create(CLOCKID, &sev, &channel->live_timer);
451 if (ret == -1) {
452 PERROR("timer_create");
453 }
454 channel->live_timer_enabled = 1;
455
456 its.it_value.tv_sec = live_timer_interval / 1000000;
457 its.it_value.tv_nsec = (live_timer_interval % 1000000) * 1000;
458 its.it_interval.tv_sec = its.it_value.tv_sec;
459 its.it_interval.tv_nsec = its.it_value.tv_nsec;
460
461 ret = timer_settime(channel->live_timer, 0, &its, NULL);
462 if (ret == -1) {
463 PERROR("timer_settime");
464 }
465 }
466
467 /*
468 * Stop and delete timer.
469 */
470 void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
471 {
472 int ret;
473
474 assert(channel);
475
476 if (!channel->live_timer) {
477 return;
478 }
479
480 ret = timer_delete(channel->live_timer);
481 if (ret == -1) {
482 PERROR("timer_delete");
483 }
484
485 channel->live_timer = 0;
486 channel->live_timer_enabled = 0;
487
488 consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_LIVE);
489 }
490
491 /*
492 * Block the RT signals for the entire process. It must be called from the
493 * consumer main before creating the threads
494 */
495 int consumer_signal_init(void)
496 {
497 int ret;
498 sigset_t mask;
499
500 /* Block signal for entire process, so only our thread processes it. */
501 setmask(&mask);
502 ret = pthread_sigmask(SIG_BLOCK, &mask, NULL);
503 if (ret) {
504 errno = ret;
505 PERROR("pthread_sigmask");
506 return -1;
507 }
508 return 0;
509 }
510
511 /*
512 * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
513 * LTTNG_CONSUMER_SIG_TEARDOWN, LTTNG_CONSUMER_SIG_LIVE, and
514 * LTTNG_CONSUMER_SIG_EXIT.
515 */
516 void *consumer_timer_thread(void *data)
517 {
518 int signr;
519 sigset_t mask;
520 siginfo_t info;
521 struct lttng_consumer_local_data *ctx = data;
522
523 rcu_register_thread();
524
525 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA_TIMER);
526
527 if (testpoint(consumerd_thread_metadata_timer)) {
528 goto error_testpoint;
529 }
530
531 health_code_update();
532
533 /* Only self thread will receive signal mask. */
534 setmask(&mask);
535 CMM_STORE_SHARED(timer_signal.tid, pthread_self());
536
537 while (1) {
538 health_code_update();
539
540 health_poll_entry();
541 signr = sigwaitinfo(&mask, &info);
542 health_poll_exit();
543 if (signr == -1) {
544 if (errno != EINTR) {
545 PERROR("sigwaitinfo");
546 }
547 continue;
548 } else if (signr == LTTNG_CONSUMER_SIG_SWITCH) {
549 metadata_switch_timer(ctx, info.si_signo, &info, NULL);
550 } else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) {
551 cmm_smp_mb();
552 CMM_STORE_SHARED(timer_signal.qs_done, 1);
553 cmm_smp_mb();
554 DBG("Signal timer metadata thread teardown");
555 } else if (signr == LTTNG_CONSUMER_SIG_LIVE) {
556 live_timer(ctx, info.si_signo, &info, NULL);
557 } else if (signr == LTTNG_CONSUMER_SIG_EXIT) {
558 assert(consumer_quit);
559 goto end;
560 } else {
561 ERR("Unexpected signal %d\n", info.si_signo);
562 }
563 }
564
565 error_testpoint:
566 /* Only reached in testpoint error */
567 health_error();
568 end:
569 health_unregister(health_consumerd);
570 rcu_unregister_thread();
571 return NULL;
572 }
This page took 0.04203 seconds and 5 git commands to generate.