Add git commit id to the version if it's not a tag
[lttng-tools.git] / src / common / consumer-timer.c
CommitLineData
331744e3
JD
1/*
2 * Copyright (C) 2012 - Julien Desfossez <julien.desfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License, version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 * more details.
13 *
14 * You should have received a copy of the GNU General Public License along with
15 * this program; if not, write to the Free Software Foundation, Inc., 51
16 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
19#define _GNU_SOURCE
20#include <assert.h>
21#include <inttypes.h>
22#include <signal.h>
23
51a9e1c7 24#include <bin/lttng-consumerd/health-consumerd.h>
331744e3 25#include <common/common.h>
f263b7fd 26#include <common/compat/endian.h>
d3e2ba59
JD
27#include <common/kernel-ctl/kernel-ctl.h>
28#include <common/kernel-consumer/kernel-consumer.h>
29#include <common/consumer-stream.h>
331744e3
JD
30
31#include "consumer-timer.h"
2d57de81 32#include "consumer-testpoint.h"
331744e3
JD
33#include "ust-consumer/ust-consumer.h"
34
2b8f8754
MD
35static struct timer_signal_data timer_signal = {
36 .tid = 0,
37 .setup_done = 0,
38 .qs_done = 0,
39 .lock = PTHREAD_MUTEX_INITIALIZER,
40};
331744e3
JD
41
42/*
43 * Set custom signal mask to current thread.
44 */
45static void setmask(sigset_t *mask)
46{
47 int ret;
48
49 ret = sigemptyset(mask);
50 if (ret) {
51 PERROR("sigemptyset");
52 }
53 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_SWITCH);
54 if (ret) {
d3e2ba59 55 PERROR("sigaddset switch");
331744e3
JD
56 }
57 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_TEARDOWN);
58 if (ret) {
d3e2ba59
JD
59 PERROR("sigaddset teardown");
60 }
61 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_LIVE);
62 if (ret) {
63 PERROR("sigaddset live");
331744e3
JD
64 }
65}
66
67/*
68 * Execute action on a timer switch.
d98a47c7
MD
69 *
70 * Beware: metadata_switch_timer() should *never* take a mutex also held
71 * while consumer_timer_switch_stop() is called. It would result in
72 * deadlocks.
331744e3
JD
73 */
74static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
75 int sig, siginfo_t *si, void *uc)
76{
77 int ret;
78 struct lttng_consumer_channel *channel;
79
80 channel = si->si_value.sival_ptr;
81 assert(channel);
82
4419b4fb
MD
83 if (channel->switch_timer_error) {
84 return;
85 }
86
331744e3
JD
87 DBG("Switch timer for channel %" PRIu64, channel->key);
88 switch (ctx->type) {
89 case LTTNG_CONSUMER32_UST:
90 case LTTNG_CONSUMER64_UST:
4fa3dc0e
MD
91 /*
92 * Locks taken by lttng_ustconsumer_request_metadata():
93 * - metadata_socket_lock
94 * - Calling lttng_ustconsumer_recv_metadata():
f82d9449 95 * - channel->metadata_cache->lock
4fa3dc0e 96 * - Calling consumer_metadata_cache_flushed():
5e41ebe1
MD
97 * - channel->timer_lock
98 * - channel->metadata_cache->lock
4fa3dc0e 99 *
5e41ebe1
MD
100 * Ensure that neither consumer_data.lock nor
101 * channel->lock are taken within this function, since
102 * they are held while consumer_timer_switch_stop() is
103 * called.
4fa3dc0e 104 */
94d49140 105 ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1);
331744e3 106 if (ret < 0) {
4419b4fb 107 channel->switch_timer_error = 1;
331744e3
JD
108 }
109 break;
110 case LTTNG_CONSUMER_KERNEL:
111 case LTTNG_CONSUMER_UNKNOWN:
112 assert(0);
113 break;
114 }
115}
116
d3e2ba59
JD
117static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts)
118{
119 int ret;
50adc264 120 struct ctf_packet_index index;
d3e2ba59
JD
121
122 memset(&index, 0, sizeof(index));
123 index.timestamp_end = htobe64(ts);
124 ret = consumer_stream_write_index(stream, &index);
125 if (ret < 0) {
126 goto error;
127 }
128
129error:
130 return ret;
131}
132
133static int check_kernel_stream(struct lttng_consumer_stream *stream)
134{
135 uint64_t ts;
136 int ret;
137
138 /*
139 * While holding the stream mutex, try to take a snapshot, if it
140 * succeeds, it means that data is ready to be sent, just let the data
141 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
142 * means that there is no data to read after the flush, so we can
143 * safely send the empty index.
144 */
145 pthread_mutex_lock(&stream->lock);
146 ret = kernctl_get_current_timestamp(stream->wait_fd, &ts);
147 if (ret < 0) {
148 ERR("Failed to get the current timestamp");
149 goto error_unlock;
150 }
151 ret = kernctl_buffer_flush(stream->wait_fd);
152 if (ret < 0) {
153 ERR("Failed to flush kernel stream");
154 goto error_unlock;
155 }
156 ret = kernctl_snapshot(stream->wait_fd);
157 if (ret < 0) {
08b1dcd3
DG
158 if (errno != EAGAIN && errno != ENODATA) {
159 PERROR("live timer kernel snapshot");
d3e2ba59
JD
160 ret = -1;
161 goto error_unlock;
162 }
163 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
164 ret = send_empty_index(stream, ts);
165 if (ret < 0) {
166 goto error_unlock;
167 }
168 }
169 ret = 0;
170
171error_unlock:
172 pthread_mutex_unlock(&stream->lock);
173 return ret;
174}
175
176static int check_ust_stream(struct lttng_consumer_stream *stream)
177{
178 uint64_t ts;
179 int ret;
180
181 assert(stream);
182 assert(stream->ustream);
183 /*
184 * While holding the stream mutex, try to take a snapshot, if it
185 * succeeds, it means that data is ready to be sent, just let the data
186 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
187 * means that there is no data to read after the flush, so we can
188 * safely send the empty index.
189 */
190 pthread_mutex_lock(&stream->lock);
94d49140
JD
191 ret = cds_lfht_is_node_deleted(&stream->node.node);
192 if (ret) {
193 goto error_unlock;
194 }
195
84a182ce 196 ret = lttng_ustconsumer_get_current_timestamp(stream, &ts);
d3e2ba59
JD
197 if (ret < 0) {
198 ERR("Failed to get the current timestamp");
199 goto error_unlock;
200 }
84a182ce
DG
201 lttng_ustconsumer_flush_buffer(stream, 1);
202 ret = lttng_ustconsumer_take_snapshot(stream);
d3e2ba59 203 if (ret < 0) {
94d49140 204 if (ret != -EAGAIN) {
d3e2ba59
JD
205 ERR("Taking UST snapshot");
206 ret = -1;
207 goto error_unlock;
208 }
209 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
210 ret = send_empty_index(stream, ts);
211 if (ret < 0) {
212 goto error_unlock;
213 }
214 }
215 ret = 0;
216
217error_unlock:
218 pthread_mutex_unlock(&stream->lock);
219 return ret;
220}
221
222/*
223 * Execute action on a live timer
224 */
225static void live_timer(struct lttng_consumer_local_data *ctx,
226 int sig, siginfo_t *si, void *uc)
227{
228 int ret;
229 struct lttng_consumer_channel *channel;
230 struct lttng_consumer_stream *stream;
231 struct lttng_ht *ht;
232 struct lttng_ht_iter iter;
233
234 channel = si->si_value.sival_ptr;
235 assert(channel);
236
237 if (channel->switch_timer_error) {
238 goto error;
239 }
240 ht = consumer_data.stream_per_chan_id_ht;
241
242 DBG("Live timer for channel %" PRIu64, channel->key);
243
244 rcu_read_lock();
245 switch (ctx->type) {
246 case LTTNG_CONSUMER32_UST:
247 case LTTNG_CONSUMER64_UST:
248 cds_lfht_for_each_entry_duplicate(ht->ht,
249 ht->hash_fct(&channel->key, lttng_ht_seed),
250 ht->match_fct, &channel->key, &iter.iter,
251 stream, node_channel_id.node) {
252 ret = check_ust_stream(stream);
253 if (ret < 0) {
254 goto error_unlock;
255 }
256 }
257 break;
258 case LTTNG_CONSUMER_KERNEL:
259 cds_lfht_for_each_entry_duplicate(ht->ht,
260 ht->hash_fct(&channel->key, lttng_ht_seed),
261 ht->match_fct, &channel->key, &iter.iter,
262 stream, node_channel_id.node) {
263 ret = check_kernel_stream(stream);
264 if (ret < 0) {
265 goto error_unlock;
266 }
267 }
268 break;
269 case LTTNG_CONSUMER_UNKNOWN:
270 assert(0);
271 break;
272 }
273
274error_unlock:
275 rcu_read_unlock();
276
277error:
278 return;
279}
280
2b8f8754
MD
281static
282void consumer_timer_signal_thread_qs(unsigned int signr)
283{
284 sigset_t pending_set;
285 int ret;
286
287 /*
288 * We need to be the only thread interacting with the thread
289 * that manages signals for teardown synchronization.
290 */
291 pthread_mutex_lock(&timer_signal.lock);
292
293 /* Ensure we don't have any signal queued for this channel. */
294 for (;;) {
295 ret = sigemptyset(&pending_set);
296 if (ret == -1) {
297 PERROR("sigemptyset");
298 }
299 ret = sigpending(&pending_set);
300 if (ret == -1) {
301 PERROR("sigpending");
302 }
303 if (!sigismember(&pending_set, LTTNG_CONSUMER_SIG_SWITCH)) {
304 break;
305 }
306 caa_cpu_relax();
307 }
308
309 /*
310 * From this point, no new signal handler will be fired that would try to
311 * access "chan". However, we still need to wait for any currently
312 * executing handler to complete.
313 */
314 cmm_smp_mb();
315 CMM_STORE_SHARED(timer_signal.qs_done, 0);
316 cmm_smp_mb();
317
318 /*
319 * Kill with LTTNG_CONSUMER_SIG_TEARDOWN, so signal management thread wakes
320 * up.
321 */
322 kill(getpid(), LTTNG_CONSUMER_SIG_TEARDOWN);
323
324 while (!CMM_LOAD_SHARED(timer_signal.qs_done)) {
325 caa_cpu_relax();
326 }
327 cmm_smp_mb();
328
329 pthread_mutex_unlock(&timer_signal.lock);
330}
331
331744e3
JD
332/*
333 * Set the timer for periodical metadata flush.
334 */
335void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
336 unsigned int switch_timer_interval)
337{
338 int ret;
339 struct sigevent sev;
340 struct itimerspec its;
341
342 assert(channel);
343 assert(channel->key);
344
345 if (switch_timer_interval == 0) {
346 return;
347 }
348
349 sev.sigev_notify = SIGEV_SIGNAL;
350 sev.sigev_signo = LTTNG_CONSUMER_SIG_SWITCH;
351 sev.sigev_value.sival_ptr = channel;
352 ret = timer_create(CLOCKID, &sev, &channel->switch_timer);
353 if (ret == -1) {
354 PERROR("timer_create");
355 }
356 channel->switch_timer_enabled = 1;
357
358 its.it_value.tv_sec = switch_timer_interval / 1000000;
359 its.it_value.tv_nsec = switch_timer_interval % 1000000;
360 its.it_interval.tv_sec = its.it_value.tv_sec;
361 its.it_interval.tv_nsec = its.it_value.tv_nsec;
362
363 ret = timer_settime(channel->switch_timer, 0, &its, NULL);
364 if (ret == -1) {
365 PERROR("timer_settime");
366 }
367}
368
369/*
370 * Stop and delete timer.
371 */
372void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
373{
374 int ret;
331744e3
JD
375
376 assert(channel);
377
378 ret = timer_delete(channel->switch_timer);
379 if (ret == -1) {
380 PERROR("timer_delete");
381 }
382
2b8f8754 383 consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_SWITCH);
331744e3 384
2b8f8754
MD
385 channel->switch_timer = 0;
386 channel->switch_timer_enabled = 0;
331744e3
JD
387}
388
d3e2ba59
JD
389/*
390 * Set the timer for the live mode.
391 */
392void consumer_timer_live_start(struct lttng_consumer_channel *channel,
393 int live_timer_interval)
394{
395 int ret;
396 struct sigevent sev;
397 struct itimerspec its;
398
399 assert(channel);
400 assert(channel->key);
401
fac41e72 402 if (live_timer_interval <= 0) {
d3e2ba59
JD
403 return;
404 }
405
406 sev.sigev_notify = SIGEV_SIGNAL;
407 sev.sigev_signo = LTTNG_CONSUMER_SIG_LIVE;
408 sev.sigev_value.sival_ptr = channel;
409 ret = timer_create(CLOCKID, &sev, &channel->live_timer);
410 if (ret == -1) {
411 PERROR("timer_create");
412 }
413 channel->live_timer_enabled = 1;
414
415 its.it_value.tv_sec = live_timer_interval / 1000000;
416 its.it_value.tv_nsec = live_timer_interval % 1000000;
417 its.it_interval.tv_sec = its.it_value.tv_sec;
418 its.it_interval.tv_nsec = its.it_value.tv_nsec;
419
420 ret = timer_settime(channel->live_timer, 0, &its, NULL);
421 if (ret == -1) {
422 PERROR("timer_settime");
423 }
424}
425
426/*
427 * Stop and delete timer.
428 */
429void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
430{
431 int ret;
432
433 assert(channel);
434
435 ret = timer_delete(channel->live_timer);
436 if (ret == -1) {
437 PERROR("timer_delete");
438 }
439
440 consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_LIVE);
441
442 channel->live_timer = 0;
443 channel->live_timer_enabled = 0;
444}
445
331744e3
JD
446/*
447 * Block the RT signals for the entire process. It must be called from the
448 * consumer main before creating the threads
449 */
450void consumer_signal_init(void)
451{
452 int ret;
453 sigset_t mask;
454
455 /* Block signal for entire process, so only our thread processes it. */
456 setmask(&mask);
457 ret = pthread_sigmask(SIG_BLOCK, &mask, NULL);
458 if (ret) {
459 errno = ret;
460 PERROR("pthread_sigmask");
461 }
462}
463
464/*
d3e2ba59
JD
465 * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
466 * LTTNG_CONSUMER_SIG_TEARDOWN and LTTNG_CONSUMER_SIG_LIVE.
331744e3 467 */
d3e2ba59 468void *consumer_timer_thread(void *data)
331744e3
JD
469{
470 int signr;
471 sigset_t mask;
472 siginfo_t info;
473 struct lttng_consumer_local_data *ctx = data;
474
1fc79fb4
MD
475 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA_TIMER);
476
2d57de81
MD
477 if (testpoint(consumerd_thread_metadata_timer)) {
478 goto error_testpoint;
479 }
480
9ce5646a
MD
481 health_code_update();
482
331744e3
JD
483 /* Only self thread will receive signal mask. */
484 setmask(&mask);
485 CMM_STORE_SHARED(timer_signal.tid, pthread_self());
486
487 while (1) {
9ce5646a
MD
488 health_code_update();
489
490 health_poll_entry();
331744e3 491 signr = sigwaitinfo(&mask, &info);
9ce5646a 492 health_poll_exit();
331744e3
JD
493 if (signr == -1) {
494 if (errno != EINTR) {
495 PERROR("sigwaitinfo");
496 }
497 continue;
498 } else if (signr == LTTNG_CONSUMER_SIG_SWITCH) {
499 metadata_switch_timer(ctx, info.si_signo, &info, NULL);
500 } else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) {
501 cmm_smp_mb();
502 CMM_STORE_SHARED(timer_signal.qs_done, 1);
503 cmm_smp_mb();
504 DBG("Signal timer metadata thread teardown");
d3e2ba59
JD
505 } else if (signr == LTTNG_CONSUMER_SIG_LIVE) {
506 live_timer(ctx, info.si_signo, &info, NULL);
331744e3
JD
507 } else {
508 ERR("Unexpected signal %d\n", info.si_signo);
509 }
510 }
511
2d57de81
MD
512error_testpoint:
513 /* Only reached in testpoint error */
514 health_error();
1fc79fb4
MD
515 health_unregister(health_consumerd);
516
517 /* Never return */
331744e3
JD
518 return NULL;
519}
This page took 0.05502 seconds and 5 git commands to generate.