Rename LTTng index in CTF index
[lttng-tools.git] / src / common / consumer-timer.c
1 /*
2 * Copyright (C) 2012 - Julien Desfossez <julien.desfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License, version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 * more details.
13 *
14 * You should have received a copy of the GNU General Public License along with
15 * this program; if not, write to the Free Software Foundation, Inc., 51
16 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
19 #define _GNU_SOURCE
20 #include <assert.h>
21 #include <inttypes.h>
22 #include <signal.h>
23
24 #include <bin/lttng-consumerd/health-consumerd.h>
25 #include <common/common.h>
26 #include <common/kernel-ctl/kernel-ctl.h>
27 #include <common/kernel-consumer/kernel-consumer.h>
28 #include <common/consumer-stream.h>
29
30 #include "consumer-timer.h"
31 #include "ust-consumer/ust-consumer.h"
32
33 static struct timer_signal_data timer_signal = {
34 .tid = 0,
35 .setup_done = 0,
36 .qs_done = 0,
37 .lock = PTHREAD_MUTEX_INITIALIZER,
38 };
39
40 /*
41 * Set custom signal mask to current thread.
42 */
43 static void setmask(sigset_t *mask)
44 {
45 int ret;
46
47 ret = sigemptyset(mask);
48 if (ret) {
49 PERROR("sigemptyset");
50 }
51 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_SWITCH);
52 if (ret) {
53 PERROR("sigaddset switch");
54 }
55 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_TEARDOWN);
56 if (ret) {
57 PERROR("sigaddset teardown");
58 }
59 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_LIVE);
60 if (ret) {
61 PERROR("sigaddset live");
62 }
63 }
64
65 /*
66 * Execute action on a timer switch.
67 *
68 * Beware: metadata_switch_timer() should *never* take a mutex also held
69 * while consumer_timer_switch_stop() is called. It would result in
70 * deadlocks.
71 */
72 static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
73 int sig, siginfo_t *si, void *uc)
74 {
75 int ret;
76 struct lttng_consumer_channel *channel;
77
78 channel = si->si_value.sival_ptr;
79 assert(channel);
80
81 if (channel->switch_timer_error) {
82 return;
83 }
84
85 DBG("Switch timer for channel %" PRIu64, channel->key);
86 switch (ctx->type) {
87 case LTTNG_CONSUMER32_UST:
88 case LTTNG_CONSUMER64_UST:
89 /*
90 * Locks taken by lttng_ustconsumer_request_metadata():
91 * - metadata_socket_lock
92 * - Calling lttng_ustconsumer_recv_metadata():
93 * - channel->metadata_cache->lock
94 * - Calling consumer_metadata_cache_flushed():
95 * - channel->timer_lock
96 * - channel->metadata_cache->lock
97 *
98 * Ensure that neither consumer_data.lock nor
99 * channel->lock are taken within this function, since
100 * they are held while consumer_timer_switch_stop() is
101 * called.
102 */
103 ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1);
104 if (ret < 0) {
105 channel->switch_timer_error = 1;
106 }
107 break;
108 case LTTNG_CONSUMER_KERNEL:
109 case LTTNG_CONSUMER_UNKNOWN:
110 assert(0);
111 break;
112 }
113 }
114
115 static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts)
116 {
117 int ret;
118 struct ctf_packet_index index;
119
120 memset(&index, 0, sizeof(index));
121 index.timestamp_end = htobe64(ts);
122 ret = consumer_stream_write_index(stream, &index);
123 if (ret < 0) {
124 goto error;
125 }
126
127 error:
128 return ret;
129 }
130
131 static int check_kernel_stream(struct lttng_consumer_stream *stream)
132 {
133 uint64_t ts;
134 int ret;
135
136 /*
137 * While holding the stream mutex, try to take a snapshot, if it
138 * succeeds, it means that data is ready to be sent, just let the data
139 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
140 * means that there is no data to read after the flush, so we can
141 * safely send the empty index.
142 */
143 pthread_mutex_lock(&stream->lock);
144 ret = kernctl_get_current_timestamp(stream->wait_fd, &ts);
145 if (ret < 0) {
146 ERR("Failed to get the current timestamp");
147 goto error_unlock;
148 }
149 ret = kernctl_buffer_flush(stream->wait_fd);
150 if (ret < 0) {
151 ERR("Failed to flush kernel stream");
152 goto error_unlock;
153 }
154 ret = kernctl_snapshot(stream->wait_fd);
155 if (ret < 0) {
156 if (errno != EAGAIN) {
157 ERR("Taking kernel snapshot");
158 ret = -1;
159 goto error_unlock;
160 }
161 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
162 ret = send_empty_index(stream, ts);
163 if (ret < 0) {
164 goto error_unlock;
165 }
166 }
167 ret = 0;
168
169 error_unlock:
170 pthread_mutex_unlock(&stream->lock);
171 return ret;
172 }
173
174 static int check_ust_stream(struct lttng_consumer_stream *stream)
175 {
176 uint64_t ts;
177 int ret;
178
179 assert(stream);
180 assert(stream->ustream);
181 /*
182 * While holding the stream mutex, try to take a snapshot, if it
183 * succeeds, it means that data is ready to be sent, just let the data
184 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
185 * means that there is no data to read after the flush, so we can
186 * safely send the empty index.
187 */
188 pthread_mutex_lock(&stream->lock);
189 ret = cds_lfht_is_node_deleted(&stream->node.node);
190 if (ret) {
191 goto error_unlock;
192 }
193
194 ret = lttng_ustconsumer_get_current_timestamp(stream, &ts);
195 if (ret < 0) {
196 ERR("Failed to get the current timestamp");
197 goto error_unlock;
198 }
199 lttng_ustconsumer_flush_buffer(stream, 1);
200 ret = lttng_ustconsumer_take_snapshot(stream);
201 if (ret < 0) {
202 if (ret != -EAGAIN) {
203 ERR("Taking UST snapshot");
204 ret = -1;
205 goto error_unlock;
206 }
207 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
208 ret = send_empty_index(stream, ts);
209 if (ret < 0) {
210 goto error_unlock;
211 }
212 }
213 ret = 0;
214
215 error_unlock:
216 pthread_mutex_unlock(&stream->lock);
217 return ret;
218 }
219
220 /*
221 * Execute action on a live timer
222 */
223 static void live_timer(struct lttng_consumer_local_data *ctx,
224 int sig, siginfo_t *si, void *uc)
225 {
226 int ret;
227 struct lttng_consumer_channel *channel;
228 struct lttng_consumer_stream *stream;
229 struct lttng_ht *ht;
230 struct lttng_ht_iter iter;
231
232 channel = si->si_value.sival_ptr;
233 assert(channel);
234
235 if (channel->switch_timer_error) {
236 goto error;
237 }
238 ht = consumer_data.stream_per_chan_id_ht;
239
240 DBG("Live timer for channel %" PRIu64, channel->key);
241
242 rcu_read_lock();
243 switch (ctx->type) {
244 case LTTNG_CONSUMER32_UST:
245 case LTTNG_CONSUMER64_UST:
246 cds_lfht_for_each_entry_duplicate(ht->ht,
247 ht->hash_fct(&channel->key, lttng_ht_seed),
248 ht->match_fct, &channel->key, &iter.iter,
249 stream, node_channel_id.node) {
250 ret = check_ust_stream(stream);
251 if (ret < 0) {
252 goto error_unlock;
253 }
254 }
255 break;
256 case LTTNG_CONSUMER_KERNEL:
257 cds_lfht_for_each_entry_duplicate(ht->ht,
258 ht->hash_fct(&channel->key, lttng_ht_seed),
259 ht->match_fct, &channel->key, &iter.iter,
260 stream, node_channel_id.node) {
261 ret = check_kernel_stream(stream);
262 if (ret < 0) {
263 goto error_unlock;
264 }
265 }
266 break;
267 case LTTNG_CONSUMER_UNKNOWN:
268 assert(0);
269 break;
270 }
271
272 error_unlock:
273 rcu_read_unlock();
274
275 error:
276 return;
277 }
278
279 static
280 void consumer_timer_signal_thread_qs(unsigned int signr)
281 {
282 sigset_t pending_set;
283 int ret;
284
285 /*
286 * We need to be the only thread interacting with the thread
287 * that manages signals for teardown synchronization.
288 */
289 pthread_mutex_lock(&timer_signal.lock);
290
291 /* Ensure we don't have any signal queued for this channel. */
292 for (;;) {
293 ret = sigemptyset(&pending_set);
294 if (ret == -1) {
295 PERROR("sigemptyset");
296 }
297 ret = sigpending(&pending_set);
298 if (ret == -1) {
299 PERROR("sigpending");
300 }
301 if (!sigismember(&pending_set, LTTNG_CONSUMER_SIG_SWITCH)) {
302 break;
303 }
304 caa_cpu_relax();
305 }
306
307 /*
308 * From this point, no new signal handler will be fired that would try to
309 * access "chan". However, we still need to wait for any currently
310 * executing handler to complete.
311 */
312 cmm_smp_mb();
313 CMM_STORE_SHARED(timer_signal.qs_done, 0);
314 cmm_smp_mb();
315
316 /*
317 * Kill with LTTNG_CONSUMER_SIG_TEARDOWN, so signal management thread wakes
318 * up.
319 */
320 kill(getpid(), LTTNG_CONSUMER_SIG_TEARDOWN);
321
322 while (!CMM_LOAD_SHARED(timer_signal.qs_done)) {
323 caa_cpu_relax();
324 }
325 cmm_smp_mb();
326
327 pthread_mutex_unlock(&timer_signal.lock);
328 }
329
330 /*
331 * Set the timer for periodical metadata flush.
332 */
333 void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
334 unsigned int switch_timer_interval)
335 {
336 int ret;
337 struct sigevent sev;
338 struct itimerspec its;
339
340 assert(channel);
341 assert(channel->key);
342
343 if (switch_timer_interval == 0) {
344 return;
345 }
346
347 sev.sigev_notify = SIGEV_SIGNAL;
348 sev.sigev_signo = LTTNG_CONSUMER_SIG_SWITCH;
349 sev.sigev_value.sival_ptr = channel;
350 ret = timer_create(CLOCKID, &sev, &channel->switch_timer);
351 if (ret == -1) {
352 PERROR("timer_create");
353 }
354 channel->switch_timer_enabled = 1;
355
356 its.it_value.tv_sec = switch_timer_interval / 1000000;
357 its.it_value.tv_nsec = switch_timer_interval % 1000000;
358 its.it_interval.tv_sec = its.it_value.tv_sec;
359 its.it_interval.tv_nsec = its.it_value.tv_nsec;
360
361 ret = timer_settime(channel->switch_timer, 0, &its, NULL);
362 if (ret == -1) {
363 PERROR("timer_settime");
364 }
365 }
366
367 /*
368 * Stop and delete timer.
369 */
370 void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
371 {
372 int ret;
373
374 assert(channel);
375
376 ret = timer_delete(channel->switch_timer);
377 if (ret == -1) {
378 PERROR("timer_delete");
379 }
380
381 consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_SWITCH);
382
383 channel->switch_timer = 0;
384 channel->switch_timer_enabled = 0;
385 }
386
387 /*
388 * Set the timer for the live mode.
389 */
390 void consumer_timer_live_start(struct lttng_consumer_channel *channel,
391 int live_timer_interval)
392 {
393 int ret;
394 struct sigevent sev;
395 struct itimerspec its;
396
397 assert(channel);
398 assert(channel->key);
399
400 if (live_timer_interval <= 0) {
401 return;
402 }
403
404 sev.sigev_notify = SIGEV_SIGNAL;
405 sev.sigev_signo = LTTNG_CONSUMER_SIG_LIVE;
406 sev.sigev_value.sival_ptr = channel;
407 ret = timer_create(CLOCKID, &sev, &channel->live_timer);
408 if (ret == -1) {
409 PERROR("timer_create");
410 }
411 channel->live_timer_enabled = 1;
412
413 its.it_value.tv_sec = live_timer_interval / 1000000;
414 its.it_value.tv_nsec = live_timer_interval % 1000000;
415 its.it_interval.tv_sec = its.it_value.tv_sec;
416 its.it_interval.tv_nsec = its.it_value.tv_nsec;
417
418 ret = timer_settime(channel->live_timer, 0, &its, NULL);
419 if (ret == -1) {
420 PERROR("timer_settime");
421 }
422 }
423
424 /*
425 * Stop and delete timer.
426 */
427 void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
428 {
429 int ret;
430
431 assert(channel);
432
433 ret = timer_delete(channel->live_timer);
434 if (ret == -1) {
435 PERROR("timer_delete");
436 }
437
438 consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_LIVE);
439
440 channel->live_timer = 0;
441 channel->live_timer_enabled = 0;
442 }
443
/*
 * Block the consumer timer signals in the calling thread.
 *
 * Must be called from the consumer main before any thread is created, so that
 * every thread starts with these signals blocked and only the dedicated timer
 * thread, which waits for them explicitly, processes them.
 */
void consumer_signal_init(void)
{
	sigset_t blocked;
	int err;

	setmask(&blocked);

	/* pthread_sigmask() reports failure through its return value. */
	err = pthread_sigmask(SIG_BLOCK, &blocked, NULL);
	if (err != 0) {
		errno = err;
		PERROR("pthread_sigmask");
	}
}
461
462 /*
463 * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
464 * LTTNG_CONSUMER_SIG_TEARDOWN and LTTNG_CONSUMER_SIG_LIVE.
465 */
466 void *consumer_timer_thread(void *data)
467 {
468 int signr;
469 sigset_t mask;
470 siginfo_t info;
471 struct lttng_consumer_local_data *ctx = data;
472
473 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA_TIMER);
474
475 health_code_update();
476
477 /* Only self thread will receive signal mask. */
478 setmask(&mask);
479 CMM_STORE_SHARED(timer_signal.tid, pthread_self());
480
481 while (1) {
482 health_code_update();
483
484 health_poll_entry();
485 signr = sigwaitinfo(&mask, &info);
486 health_poll_exit();
487 if (signr == -1) {
488 if (errno != EINTR) {
489 PERROR("sigwaitinfo");
490 }
491 continue;
492 } else if (signr == LTTNG_CONSUMER_SIG_SWITCH) {
493 metadata_switch_timer(ctx, info.si_signo, &info, NULL);
494 } else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) {
495 cmm_smp_mb();
496 CMM_STORE_SHARED(timer_signal.qs_done, 1);
497 cmm_smp_mb();
498 DBG("Signal timer metadata thread teardown");
499 } else if (signr == LTTNG_CONSUMER_SIG_LIVE) {
500 live_timer(ctx, info.si_signo, &info, NULL);
501 } else {
502 ERR("Unexpected signal %d\n", info.si_signo);
503 }
504 }
505
506 /* Currently never reached */
507 health_unregister(health_consumerd);
508
509 /* Never return */
510 return NULL;
511 }
This page took 0.039898 seconds and 5 git commands to generate.