Fix: remove bad UST header in consumer-timer.c
[lttng-tools.git] / src / common / consumer-timer.c
1 /*
2 * Copyright (C) 2012 - Julien Desfossez <julien.desfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License, version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 * more details.
13 *
14 * You should have received a copy of the GNU General Public License along with
15 * this program; if not, write to the Free Software Foundation, Inc., 51
16 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
19 #define _GNU_SOURCE
20 #include <assert.h>
21 #include <inttypes.h>
22 #include <signal.h>
23
24 #include <common/common.h>
25 #include <common/kernel-ctl/kernel-ctl.h>
26 #include <common/kernel-consumer/kernel-consumer.h>
27 #include <common/consumer-stream.h>
28
29 #include "consumer-timer.h"
30 #include "ust-consumer/ust-consumer.h"
31
32 static struct timer_signal_data timer_signal = {
33 .tid = 0,
34 .setup_done = 0,
35 .qs_done = 0,
36 .lock = PTHREAD_MUTEX_INITIALIZER,
37 };
38
39 /*
40 * Set custom signal mask to current thread.
41 */
42 static void setmask(sigset_t *mask)
43 {
44 int ret;
45
46 ret = sigemptyset(mask);
47 if (ret) {
48 PERROR("sigemptyset");
49 }
50 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_SWITCH);
51 if (ret) {
52 PERROR("sigaddset switch");
53 }
54 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_TEARDOWN);
55 if (ret) {
56 PERROR("sigaddset teardown");
57 }
58 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_LIVE);
59 if (ret) {
60 PERROR("sigaddset live");
61 }
62 }
63
64 /*
65 * Execute action on a timer switch.
66 *
67 * Beware: metadata_switch_timer() should *never* take a mutex also held
68 * while consumer_timer_switch_stop() is called. It would result in
69 * deadlocks.
70 */
71 static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
72 int sig, siginfo_t *si, void *uc)
73 {
74 int ret;
75 struct lttng_consumer_channel *channel;
76
77 channel = si->si_value.sival_ptr;
78 assert(channel);
79
80 if (channel->switch_timer_error) {
81 return;
82 }
83
84 DBG("Switch timer for channel %" PRIu64, channel->key);
85 switch (ctx->type) {
86 case LTTNG_CONSUMER32_UST:
87 case LTTNG_CONSUMER64_UST:
88 /*
89 * Locks taken by lttng_ustconsumer_request_metadata():
90 * - metadata_socket_lock
91 * - Calling lttng_ustconsumer_recv_metadata():
92 * - channel->metadata_cache->lock
93 * - Calling consumer_metadata_cache_flushed():
94 * - channel->timer_lock
95 * - channel->metadata_cache->lock
96 *
97 * Ensure that neither consumer_data.lock nor
98 * channel->lock are taken within this function, since
99 * they are held while consumer_timer_switch_stop() is
100 * called.
101 */
102 ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1);
103 if (ret < 0) {
104 channel->switch_timer_error = 1;
105 }
106 break;
107 case LTTNG_CONSUMER_KERNEL:
108 case LTTNG_CONSUMER_UNKNOWN:
109 assert(0);
110 break;
111 }
112 }
113
114 static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts)
115 {
116 int ret;
117 struct lttng_packet_index index;
118
119 memset(&index, 0, sizeof(index));
120 index.timestamp_end = htobe64(ts);
121 ret = consumer_stream_write_index(stream, &index);
122 if (ret < 0) {
123 goto error;
124 }
125
126 error:
127 return ret;
128 }
129
130 static int check_kernel_stream(struct lttng_consumer_stream *stream)
131 {
132 uint64_t ts;
133 int ret;
134
135 /*
136 * While holding the stream mutex, try to take a snapshot, if it
137 * succeeds, it means that data is ready to be sent, just let the data
138 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
139 * means that there is no data to read after the flush, so we can
140 * safely send the empty index.
141 */
142 pthread_mutex_lock(&stream->lock);
143 ret = kernctl_get_current_timestamp(stream->wait_fd, &ts);
144 if (ret < 0) {
145 ERR("Failed to get the current timestamp");
146 goto error_unlock;
147 }
148 ret = kernctl_buffer_flush(stream->wait_fd);
149 if (ret < 0) {
150 ERR("Failed to flush kernel stream");
151 goto error_unlock;
152 }
153 ret = kernctl_snapshot(stream->wait_fd);
154 if (ret < 0) {
155 if (errno != EAGAIN) {
156 ERR("Taking kernel snapshot");
157 ret = -1;
158 goto error_unlock;
159 }
160 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
161 ret = send_empty_index(stream, ts);
162 if (ret < 0) {
163 goto error_unlock;
164 }
165 }
166 ret = 0;
167
168 error_unlock:
169 pthread_mutex_unlock(&stream->lock);
170 return ret;
171 }
172
173 static int check_ust_stream(struct lttng_consumer_stream *stream)
174 {
175 uint64_t ts;
176 int ret;
177
178 assert(stream);
179 assert(stream->ustream);
180 /*
181 * While holding the stream mutex, try to take a snapshot, if it
182 * succeeds, it means that data is ready to be sent, just let the data
183 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
184 * means that there is no data to read after the flush, so we can
185 * safely send the empty index.
186 */
187 pthread_mutex_lock(&stream->lock);
188 ret = cds_lfht_is_node_deleted(&stream->node.node);
189 if (ret) {
190 goto error_unlock;
191 }
192
193 ret = lttng_ustconsumer_get_current_timestamp(stream, &ts);
194 if (ret < 0) {
195 ERR("Failed to get the current timestamp");
196 goto error_unlock;
197 }
198 lttng_ustconsumer_flush_buffer(stream, 1);
199 ret = lttng_ustconsumer_take_snapshot(stream);
200 if (ret < 0) {
201 if (ret != -EAGAIN) {
202 ERR("Taking UST snapshot");
203 ret = -1;
204 goto error_unlock;
205 }
206 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
207 ret = send_empty_index(stream, ts);
208 if (ret < 0) {
209 goto error_unlock;
210 }
211 }
212 ret = 0;
213
214 error_unlock:
215 pthread_mutex_unlock(&stream->lock);
216 return ret;
217 }
218
219 /*
220 * Execute action on a live timer
221 */
222 static void live_timer(struct lttng_consumer_local_data *ctx,
223 int sig, siginfo_t *si, void *uc)
224 {
225 int ret;
226 struct lttng_consumer_channel *channel;
227 struct lttng_consumer_stream *stream;
228 struct lttng_ht *ht;
229 struct lttng_ht_iter iter;
230
231 channel = si->si_value.sival_ptr;
232 assert(channel);
233
234 if (channel->switch_timer_error) {
235 goto error;
236 }
237 ht = consumer_data.stream_per_chan_id_ht;
238
239 DBG("Live timer for channel %" PRIu64, channel->key);
240
241 rcu_read_lock();
242 switch (ctx->type) {
243 case LTTNG_CONSUMER32_UST:
244 case LTTNG_CONSUMER64_UST:
245 cds_lfht_for_each_entry_duplicate(ht->ht,
246 ht->hash_fct(&channel->key, lttng_ht_seed),
247 ht->match_fct, &channel->key, &iter.iter,
248 stream, node_channel_id.node) {
249 ret = check_ust_stream(stream);
250 if (ret < 0) {
251 goto error_unlock;
252 }
253 }
254 break;
255 case LTTNG_CONSUMER_KERNEL:
256 cds_lfht_for_each_entry_duplicate(ht->ht,
257 ht->hash_fct(&channel->key, lttng_ht_seed),
258 ht->match_fct, &channel->key, &iter.iter,
259 stream, node_channel_id.node) {
260 ret = check_kernel_stream(stream);
261 if (ret < 0) {
262 goto error_unlock;
263 }
264 }
265 break;
266 case LTTNG_CONSUMER_UNKNOWN:
267 assert(0);
268 break;
269 }
270
271 error_unlock:
272 rcu_read_unlock();
273
274 error:
275 return;
276 }
277
278 static
279 void consumer_timer_signal_thread_qs(unsigned int signr)
280 {
281 sigset_t pending_set;
282 int ret;
283
284 /*
285 * We need to be the only thread interacting with the thread
286 * that manages signals for teardown synchronization.
287 */
288 pthread_mutex_lock(&timer_signal.lock);
289
290 /* Ensure we don't have any signal queued for this channel. */
291 for (;;) {
292 ret = sigemptyset(&pending_set);
293 if (ret == -1) {
294 PERROR("sigemptyset");
295 }
296 ret = sigpending(&pending_set);
297 if (ret == -1) {
298 PERROR("sigpending");
299 }
300 if (!sigismember(&pending_set, LTTNG_CONSUMER_SIG_SWITCH)) {
301 break;
302 }
303 caa_cpu_relax();
304 }
305
306 /*
307 * From this point, no new signal handler will be fired that would try to
308 * access "chan". However, we still need to wait for any currently
309 * executing handler to complete.
310 */
311 cmm_smp_mb();
312 CMM_STORE_SHARED(timer_signal.qs_done, 0);
313 cmm_smp_mb();
314
315 /*
316 * Kill with LTTNG_CONSUMER_SIG_TEARDOWN, so signal management thread wakes
317 * up.
318 */
319 kill(getpid(), LTTNG_CONSUMER_SIG_TEARDOWN);
320
321 while (!CMM_LOAD_SHARED(timer_signal.qs_done)) {
322 caa_cpu_relax();
323 }
324 cmm_smp_mb();
325
326 pthread_mutex_unlock(&timer_signal.lock);
327 }
328
329 /*
330 * Set the timer for periodical metadata flush.
331 */
332 void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
333 unsigned int switch_timer_interval)
334 {
335 int ret;
336 struct sigevent sev;
337 struct itimerspec its;
338
339 assert(channel);
340 assert(channel->key);
341
342 if (switch_timer_interval == 0) {
343 return;
344 }
345
346 sev.sigev_notify = SIGEV_SIGNAL;
347 sev.sigev_signo = LTTNG_CONSUMER_SIG_SWITCH;
348 sev.sigev_value.sival_ptr = channel;
349 ret = timer_create(CLOCKID, &sev, &channel->switch_timer);
350 if (ret == -1) {
351 PERROR("timer_create");
352 }
353 channel->switch_timer_enabled = 1;
354
355 its.it_value.tv_sec = switch_timer_interval / 1000000;
356 its.it_value.tv_nsec = switch_timer_interval % 1000000;
357 its.it_interval.tv_sec = its.it_value.tv_sec;
358 its.it_interval.tv_nsec = its.it_value.tv_nsec;
359
360 ret = timer_settime(channel->switch_timer, 0, &its, NULL);
361 if (ret == -1) {
362 PERROR("timer_settime");
363 }
364 }
365
366 /*
367 * Stop and delete timer.
368 */
369 void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
370 {
371 int ret;
372
373 assert(channel);
374
375 ret = timer_delete(channel->switch_timer);
376 if (ret == -1) {
377 PERROR("timer_delete");
378 }
379
380 consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_SWITCH);
381
382 channel->switch_timer = 0;
383 channel->switch_timer_enabled = 0;
384 }
385
386 /*
387 * Set the timer for the live mode.
388 */
389 void consumer_timer_live_start(struct lttng_consumer_channel *channel,
390 int live_timer_interval)
391 {
392 int ret;
393 struct sigevent sev;
394 struct itimerspec its;
395
396 assert(channel);
397 assert(channel->key);
398
399 if (live_timer_interval == 0) {
400 return;
401 }
402
403 sev.sigev_notify = SIGEV_SIGNAL;
404 sev.sigev_signo = LTTNG_CONSUMER_SIG_LIVE;
405 sev.sigev_value.sival_ptr = channel;
406 ret = timer_create(CLOCKID, &sev, &channel->live_timer);
407 if (ret == -1) {
408 PERROR("timer_create");
409 }
410 channel->live_timer_enabled = 1;
411
412 its.it_value.tv_sec = live_timer_interval / 1000000;
413 its.it_value.tv_nsec = live_timer_interval % 1000000;
414 its.it_interval.tv_sec = its.it_value.tv_sec;
415 its.it_interval.tv_nsec = its.it_value.tv_nsec;
416
417 ret = timer_settime(channel->live_timer, 0, &its, NULL);
418 if (ret == -1) {
419 PERROR("timer_settime");
420 }
421 }
422
423 /*
424 * Stop and delete timer.
425 */
426 void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
427 {
428 int ret;
429
430 assert(channel);
431
432 ret = timer_delete(channel->live_timer);
433 if (ret == -1) {
434 PERROR("timer_delete");
435 }
436
437 consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_LIVE);
438
439 channel->live_timer = 0;
440 channel->live_timer_enabled = 0;
441 }
442
443 /*
444 * Block the RT signals for the entire process. It must be called from the
445 * consumer main before creating the threads
446 */
447 void consumer_signal_init(void)
448 {
449 int ret;
450 sigset_t mask;
451
452 /* Block signal for entire process, so only our thread processes it. */
453 setmask(&mask);
454 ret = pthread_sigmask(SIG_BLOCK, &mask, NULL);
455 if (ret) {
456 errno = ret;
457 PERROR("pthread_sigmask");
458 }
459 }
460
461 /*
462 * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
463 * LTTNG_CONSUMER_SIG_TEARDOWN and LTTNG_CONSUMER_SIG_LIVE.
464 */
465 void *consumer_timer_thread(void *data)
466 {
467 int signr;
468 sigset_t mask;
469 siginfo_t info;
470 struct lttng_consumer_local_data *ctx = data;
471
472 /* Only self thread will receive signal mask. */
473 setmask(&mask);
474 CMM_STORE_SHARED(timer_signal.tid, pthread_self());
475
476 while (1) {
477 signr = sigwaitinfo(&mask, &info);
478 if (signr == -1) {
479 if (errno != EINTR) {
480 PERROR("sigwaitinfo");
481 }
482 continue;
483 } else if (signr == LTTNG_CONSUMER_SIG_SWITCH) {
484 metadata_switch_timer(ctx, info.si_signo, &info, NULL);
485 } else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) {
486 cmm_smp_mb();
487 CMM_STORE_SHARED(timer_signal.qs_done, 1);
488 cmm_smp_mb();
489 DBG("Signal timer metadata thread teardown");
490 } else if (signr == LTTNG_CONSUMER_SIG_LIVE) {
491 live_timer(ctx, info.si_signo, &info, NULL);
492 } else {
493 ERR("Unexpected signal %d\n", info.si_signo);
494 }
495 }
496
497 return NULL;
498 }
This page took 0.041873 seconds and 6 git commands to generate.