Mi start command: support and validation
[lttng-tools.git] / src / common / consumer-timer.c
CommitLineData
331744e3
JD
1/*
2 * Copyright (C) 2012 - Julien Desfossez <julien.desfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License, version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 * more details.
13 *
14 * You should have received a copy of the GNU General Public License along with
15 * this program; if not, write to the Free Software Foundation, Inc., 51
16 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
19#define _GNU_SOURCE
20#include <assert.h>
21#include <inttypes.h>
22#include <signal.h>
23
51a9e1c7 24#include <bin/lttng-consumerd/health-consumerd.h>
331744e3 25#include <common/common.h>
f263b7fd 26#include <common/compat/endian.h>
d3e2ba59
JD
27#include <common/kernel-ctl/kernel-ctl.h>
28#include <common/kernel-consumer/kernel-consumer.h>
29#include <common/consumer-stream.h>
331744e3
JD
30
31#include "consumer-timer.h"
2d57de81 32#include "consumer-testpoint.h"
331744e3
JD
33#include "ust-consumer/ust-consumer.h"
34
2b8f8754
MD
35static struct timer_signal_data timer_signal = {
36 .tid = 0,
37 .setup_done = 0,
38 .qs_done = 0,
39 .lock = PTHREAD_MUTEX_INITIALIZER,
40};
331744e3
JD
41
42/*
43 * Set custom signal mask to current thread.
44 */
45static void setmask(sigset_t *mask)
46{
47 int ret;
48
49 ret = sigemptyset(mask);
50 if (ret) {
51 PERROR("sigemptyset");
52 }
53 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_SWITCH);
54 if (ret) {
d3e2ba59 55 PERROR("sigaddset switch");
331744e3
JD
56 }
57 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_TEARDOWN);
58 if (ret) {
d3e2ba59
JD
59 PERROR("sigaddset teardown");
60 }
61 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_LIVE);
62 if (ret) {
63 PERROR("sigaddset live");
331744e3
JD
64 }
65}
66
67/*
68 * Execute action on a timer switch.
d98a47c7
MD
69 *
70 * Beware: metadata_switch_timer() should *never* take a mutex also held
71 * while consumer_timer_switch_stop() is called. It would result in
72 * deadlocks.
331744e3
JD
73 */
74static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
75 int sig, siginfo_t *si, void *uc)
76{
77 int ret;
78 struct lttng_consumer_channel *channel;
79
80 channel = si->si_value.sival_ptr;
81 assert(channel);
82
4419b4fb
MD
83 if (channel->switch_timer_error) {
84 return;
85 }
86
331744e3
JD
87 DBG("Switch timer for channel %" PRIu64, channel->key);
88 switch (ctx->type) {
89 case LTTNG_CONSUMER32_UST:
90 case LTTNG_CONSUMER64_UST:
4fa3dc0e
MD
91 /*
92 * Locks taken by lttng_ustconsumer_request_metadata():
93 * - metadata_socket_lock
94 * - Calling lttng_ustconsumer_recv_metadata():
f82d9449 95 * - channel->metadata_cache->lock
4fa3dc0e 96 * - Calling consumer_metadata_cache_flushed():
5e41ebe1
MD
97 * - channel->timer_lock
98 * - channel->metadata_cache->lock
4fa3dc0e 99 *
5e41ebe1
MD
100 * Ensure that neither consumer_data.lock nor
101 * channel->lock are taken within this function, since
102 * they are held while consumer_timer_switch_stop() is
103 * called.
4fa3dc0e 104 */
94d49140 105 ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1);
331744e3 106 if (ret < 0) {
4419b4fb 107 channel->switch_timer_error = 1;
331744e3
JD
108 }
109 break;
110 case LTTNG_CONSUMER_KERNEL:
111 case LTTNG_CONSUMER_UNKNOWN:
112 assert(0);
113 break;
114 }
115}
116
528f2ffa
JD
117static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts,
118 uint64_t stream_id)
d3e2ba59
JD
119{
120 int ret;
50adc264 121 struct ctf_packet_index index;
d3e2ba59
JD
122
123 memset(&index, 0, sizeof(index));
528f2ffa 124 index.stream_id = htobe64(stream_id);
d3e2ba59
JD
125 index.timestamp_end = htobe64(ts);
126 ret = consumer_stream_write_index(stream, &index);
127 if (ret < 0) {
128 goto error;
129 }
130
131error:
132 return ret;
133}
134
135static int check_kernel_stream(struct lttng_consumer_stream *stream)
136{
528f2ffa 137 uint64_t ts, stream_id;
d3e2ba59
JD
138 int ret;
139
140 /*
141 * While holding the stream mutex, try to take a snapshot, if it
142 * succeeds, it means that data is ready to be sent, just let the data
143 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
144 * means that there is no data to read after the flush, so we can
145 * safely send the empty index.
146 */
147 pthread_mutex_lock(&stream->lock);
148 ret = kernctl_get_current_timestamp(stream->wait_fd, &ts);
149 if (ret < 0) {
150 ERR("Failed to get the current timestamp");
151 goto error_unlock;
152 }
153 ret = kernctl_buffer_flush(stream->wait_fd);
154 if (ret < 0) {
155 ERR("Failed to flush kernel stream");
156 goto error_unlock;
157 }
158 ret = kernctl_snapshot(stream->wait_fd);
159 if (ret < 0) {
08b1dcd3
DG
160 if (errno != EAGAIN && errno != ENODATA) {
161 PERROR("live timer kernel snapshot");
d3e2ba59
JD
162 ret = -1;
163 goto error_unlock;
164 }
528f2ffa
JD
165 ret = kernctl_get_stream_id(stream->wait_fd, &stream_id);
166 if (ret < 0) {
167 PERROR("kernctl_get_stream_id");
168 goto error_unlock;
169 }
d3e2ba59 170 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
528f2ffa 171 ret = send_empty_index(stream, ts, stream_id);
d3e2ba59
JD
172 if (ret < 0) {
173 goto error_unlock;
174 }
175 }
176 ret = 0;
177
178error_unlock:
179 pthread_mutex_unlock(&stream->lock);
180 return ret;
181}
182
183static int check_ust_stream(struct lttng_consumer_stream *stream)
184{
528f2ffa 185 uint64_t ts, stream_id;
d3e2ba59
JD
186 int ret;
187
188 assert(stream);
189 assert(stream->ustream);
190 /*
191 * While holding the stream mutex, try to take a snapshot, if it
192 * succeeds, it means that data is ready to be sent, just let the data
193 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
194 * means that there is no data to read after the flush, so we can
195 * safely send the empty index.
196 */
197 pthread_mutex_lock(&stream->lock);
94d49140
JD
198 ret = cds_lfht_is_node_deleted(&stream->node.node);
199 if (ret) {
200 goto error_unlock;
201 }
202
84a182ce 203 ret = lttng_ustconsumer_get_current_timestamp(stream, &ts);
d3e2ba59
JD
204 if (ret < 0) {
205 ERR("Failed to get the current timestamp");
206 goto error_unlock;
207 }
84a182ce
DG
208 lttng_ustconsumer_flush_buffer(stream, 1);
209 ret = lttng_ustconsumer_take_snapshot(stream);
d3e2ba59 210 if (ret < 0) {
94d49140 211 if (ret != -EAGAIN) {
d3e2ba59
JD
212 ERR("Taking UST snapshot");
213 ret = -1;
214 goto error_unlock;
215 }
70190e1c 216 ret = lttng_ustconsumer_get_stream_id(stream, &stream_id);
528f2ffa
JD
217 if (ret < 0) {
218 PERROR("ustctl_get_stream_id");
219 goto error_unlock;
220 }
d3e2ba59 221 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
528f2ffa 222 ret = send_empty_index(stream, ts, stream_id);
d3e2ba59
JD
223 if (ret < 0) {
224 goto error_unlock;
225 }
226 }
227 ret = 0;
228
229error_unlock:
230 pthread_mutex_unlock(&stream->lock);
231 return ret;
232}
233
234/*
235 * Execute action on a live timer
236 */
237static void live_timer(struct lttng_consumer_local_data *ctx,
238 int sig, siginfo_t *si, void *uc)
239{
240 int ret;
241 struct lttng_consumer_channel *channel;
242 struct lttng_consumer_stream *stream;
243 struct lttng_ht *ht;
244 struct lttng_ht_iter iter;
245
246 channel = si->si_value.sival_ptr;
247 assert(channel);
248
249 if (channel->switch_timer_error) {
250 goto error;
251 }
252 ht = consumer_data.stream_per_chan_id_ht;
253
254 DBG("Live timer for channel %" PRIu64, channel->key);
255
256 rcu_read_lock();
257 switch (ctx->type) {
258 case LTTNG_CONSUMER32_UST:
259 case LTTNG_CONSUMER64_UST:
260 cds_lfht_for_each_entry_duplicate(ht->ht,
261 ht->hash_fct(&channel->key, lttng_ht_seed),
262 ht->match_fct, &channel->key, &iter.iter,
263 stream, node_channel_id.node) {
264 ret = check_ust_stream(stream);
265 if (ret < 0) {
266 goto error_unlock;
267 }
268 }
269 break;
270 case LTTNG_CONSUMER_KERNEL:
271 cds_lfht_for_each_entry_duplicate(ht->ht,
272 ht->hash_fct(&channel->key, lttng_ht_seed),
273 ht->match_fct, &channel->key, &iter.iter,
274 stream, node_channel_id.node) {
275 ret = check_kernel_stream(stream);
276 if (ret < 0) {
277 goto error_unlock;
278 }
279 }
280 break;
281 case LTTNG_CONSUMER_UNKNOWN:
282 assert(0);
283 break;
284 }
285
286error_unlock:
287 rcu_read_unlock();
288
289error:
290 return;
291}
292
2b8f8754
MD
293static
294void consumer_timer_signal_thread_qs(unsigned int signr)
295{
296 sigset_t pending_set;
297 int ret;
298
299 /*
300 * We need to be the only thread interacting with the thread
301 * that manages signals for teardown synchronization.
302 */
303 pthread_mutex_lock(&timer_signal.lock);
304
305 /* Ensure we don't have any signal queued for this channel. */
306 for (;;) {
307 ret = sigemptyset(&pending_set);
308 if (ret == -1) {
309 PERROR("sigemptyset");
310 }
311 ret = sigpending(&pending_set);
312 if (ret == -1) {
313 PERROR("sigpending");
314 }
315 if (!sigismember(&pending_set, LTTNG_CONSUMER_SIG_SWITCH)) {
316 break;
317 }
318 caa_cpu_relax();
319 }
320
321 /*
322 * From this point, no new signal handler will be fired that would try to
323 * access "chan". However, we still need to wait for any currently
324 * executing handler to complete.
325 */
326 cmm_smp_mb();
327 CMM_STORE_SHARED(timer_signal.qs_done, 0);
328 cmm_smp_mb();
329
330 /*
331 * Kill with LTTNG_CONSUMER_SIG_TEARDOWN, so signal management thread wakes
332 * up.
333 */
334 kill(getpid(), LTTNG_CONSUMER_SIG_TEARDOWN);
335
336 while (!CMM_LOAD_SHARED(timer_signal.qs_done)) {
337 caa_cpu_relax();
338 }
339 cmm_smp_mb();
340
341 pthread_mutex_unlock(&timer_signal.lock);
342}
343
331744e3
JD
344/*
345 * Set the timer for periodical metadata flush.
346 */
347void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
348 unsigned int switch_timer_interval)
349{
350 int ret;
351 struct sigevent sev;
352 struct itimerspec its;
353
354 assert(channel);
355 assert(channel->key);
356
357 if (switch_timer_interval == 0) {
358 return;
359 }
360
361 sev.sigev_notify = SIGEV_SIGNAL;
362 sev.sigev_signo = LTTNG_CONSUMER_SIG_SWITCH;
363 sev.sigev_value.sival_ptr = channel;
364 ret = timer_create(CLOCKID, &sev, &channel->switch_timer);
365 if (ret == -1) {
366 PERROR("timer_create");
367 }
368 channel->switch_timer_enabled = 1;
369
370 its.it_value.tv_sec = switch_timer_interval / 1000000;
371 its.it_value.tv_nsec = switch_timer_interval % 1000000;
372 its.it_interval.tv_sec = its.it_value.tv_sec;
373 its.it_interval.tv_nsec = its.it_value.tv_nsec;
374
375 ret = timer_settime(channel->switch_timer, 0, &its, NULL);
376 if (ret == -1) {
377 PERROR("timer_settime");
378 }
379}
380
381/*
382 * Stop and delete timer.
383 */
384void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
385{
386 int ret;
331744e3
JD
387
388 assert(channel);
389
390 ret = timer_delete(channel->switch_timer);
391 if (ret == -1) {
392 PERROR("timer_delete");
393 }
394
2b8f8754 395 consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_SWITCH);
331744e3 396
2b8f8754
MD
397 channel->switch_timer = 0;
398 channel->switch_timer_enabled = 0;
331744e3
JD
399}
400
d3e2ba59
JD
401/*
402 * Set the timer for the live mode.
403 */
404void consumer_timer_live_start(struct lttng_consumer_channel *channel,
405 int live_timer_interval)
406{
407 int ret;
408 struct sigevent sev;
409 struct itimerspec its;
410
411 assert(channel);
412 assert(channel->key);
413
fac41e72 414 if (live_timer_interval <= 0) {
d3e2ba59
JD
415 return;
416 }
417
418 sev.sigev_notify = SIGEV_SIGNAL;
419 sev.sigev_signo = LTTNG_CONSUMER_SIG_LIVE;
420 sev.sigev_value.sival_ptr = channel;
421 ret = timer_create(CLOCKID, &sev, &channel->live_timer);
422 if (ret == -1) {
423 PERROR("timer_create");
424 }
425 channel->live_timer_enabled = 1;
426
427 its.it_value.tv_sec = live_timer_interval / 1000000;
428 its.it_value.tv_nsec = live_timer_interval % 1000000;
429 its.it_interval.tv_sec = its.it_value.tv_sec;
430 its.it_interval.tv_nsec = its.it_value.tv_nsec;
431
432 ret = timer_settime(channel->live_timer, 0, &its, NULL);
433 if (ret == -1) {
434 PERROR("timer_settime");
435 }
436}
437
438/*
439 * Stop and delete timer.
440 */
441void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
442{
443 int ret;
444
445 assert(channel);
446
447 ret = timer_delete(channel->live_timer);
448 if (ret == -1) {
449 PERROR("timer_delete");
450 }
451
452 consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_LIVE);
453
454 channel->live_timer = 0;
455 channel->live_timer_enabled = 0;
456}
457
331744e3
JD
458/*
459 * Block the RT signals for the entire process. It must be called from the
460 * consumer main before creating the threads
461 */
462void consumer_signal_init(void)
463{
464 int ret;
465 sigset_t mask;
466
467 /* Block signal for entire process, so only our thread processes it. */
468 setmask(&mask);
469 ret = pthread_sigmask(SIG_BLOCK, &mask, NULL);
470 if (ret) {
471 errno = ret;
472 PERROR("pthread_sigmask");
473 }
474}
475
476/*
d3e2ba59
JD
477 * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
478 * LTTNG_CONSUMER_SIG_TEARDOWN and LTTNG_CONSUMER_SIG_LIVE.
331744e3 479 */
d3e2ba59 480void *consumer_timer_thread(void *data)
331744e3
JD
481{
482 int signr;
483 sigset_t mask;
484 siginfo_t info;
485 struct lttng_consumer_local_data *ctx = data;
486
1fc79fb4
MD
487 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA_TIMER);
488
2d57de81
MD
489 if (testpoint(consumerd_thread_metadata_timer)) {
490 goto error_testpoint;
491 }
492
9ce5646a
MD
493 health_code_update();
494
331744e3
JD
495 /* Only self thread will receive signal mask. */
496 setmask(&mask);
497 CMM_STORE_SHARED(timer_signal.tid, pthread_self());
498
499 while (1) {
9ce5646a
MD
500 health_code_update();
501
502 health_poll_entry();
331744e3 503 signr = sigwaitinfo(&mask, &info);
9ce5646a 504 health_poll_exit();
331744e3
JD
505 if (signr == -1) {
506 if (errno != EINTR) {
507 PERROR("sigwaitinfo");
508 }
509 continue;
510 } else if (signr == LTTNG_CONSUMER_SIG_SWITCH) {
511 metadata_switch_timer(ctx, info.si_signo, &info, NULL);
512 } else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) {
513 cmm_smp_mb();
514 CMM_STORE_SHARED(timer_signal.qs_done, 1);
515 cmm_smp_mb();
516 DBG("Signal timer metadata thread teardown");
d3e2ba59
JD
517 } else if (signr == LTTNG_CONSUMER_SIG_LIVE) {
518 live_timer(ctx, info.si_signo, &info, NULL);
331744e3
JD
519 } else {
520 ERR("Unexpected signal %d\n", info.si_signo);
521 }
522 }
523
2d57de81
MD
524error_testpoint:
525 /* Only reached in testpoint error */
526 health_error();
1fc79fb4
MD
527 health_unregister(health_consumerd);
528
529 /* Never return */
331744e3
JD
530 return NULL;
531}
This page took 0.055857 seconds and 5 git commands to generate.