remove debug
[lttng-tools.git] / src / bin / lttng-sessiond / rotation-thread.c
1 /*
2 * Copyright (C) 2017 - Julien Desfossez <jdesfossez@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18 #define _LGPL_SOURCE
19 #include <lttng/trigger/trigger.h>
20 #include <common/error.h>
21 #include <common/config/session-config.h>
22 #include <common/defaults.h>
23 #include <common/utils.h>
24 #include <common/futex.h>
25 #include <common/align.h>
26 #include <common/time.h>
27 #include <common/hashtable/utils.h>
28 #include <sys/eventfd.h>
29 #include <sys/stat.h>
30 #include <time.h>
31 #include <signal.h>
32 #include <inttypes.h>
33
34 #include <common/kernel-ctl/kernel-ctl.h>
35 #include "rotation-thread.h"
36 #include "lttng-sessiond.h"
37 #include "health-sessiond.h"
38 #include "rotate.h"
39 #include "cmd.h"
40 #include "session.h"
41 #include "sessiond-timer.h"
42
43 #include <urcu.h>
44 #include <urcu/list.h>
45 #include <urcu/rculfhash.h>
46
/*
 * Hash table of channels for which a rotation is pending. It is created by
 * init_thread_state() and searched by (channel key, domain) in
 * lookup_channel_pending().
 * NOTE(review): not declared static, so presumably populated from another
 * compilation unit -- confirm against the callers.
 */
struct cds_lfht *channel_pending_rotate_ht;
48
/*
 * Free a rotation_channel_info.
 *
 * The caller must pass a valid (non-NULL) pointer; this is enforced by an
 * assertion.
 */
static
void channel_rotation_info_destroy(struct rotation_channel_info *channel_info)
{
	assert(channel_info);

	free(channel_info);
}
55
56 static
57 int match_channel_info(struct cds_lfht_node *node, const void *key)
58 {
59 struct rotation_channel_key *channel_key = (struct rotation_channel_key *) key;
60 struct rotation_channel_info *channel_info;
61
62 channel_info = caa_container_of(node, struct rotation_channel_info,
63 rotate_channels_ht_node);
64
65 return !!((channel_key->key == channel_info->channel_key.key) &&
66 (channel_key->domain == channel_info->channel_key.domain));
67 }
68
69 static
70 struct rotation_channel_info *lookup_channel_pending(uint64_t key,
71 enum lttng_domain_type domain)
72 {
73 struct cds_lfht_iter iter;
74 struct cds_lfht_node *node;
75 struct rotation_channel_info *channel_info = NULL;
76 struct rotation_channel_key channel_key = { .key = key,
77 .domain = domain };
78
79 cds_lfht_lookup(channel_pending_rotate_ht,
80 hash_channel_key(&channel_key),
81 match_channel_info,
82 &channel_key, &iter);
83 node = cds_lfht_iter_get_node(&iter);
84 if (!node) {
85 goto end;
86 }
87
88 channel_info = caa_container_of(node, struct rotation_channel_info,
89 rotate_channels_ht_node);
90 cds_lfht_del(channel_pending_rotate_ht, node);
91 end:
92 return channel_info;
93 }
94
95 /*
96 * Destroy the thread data previously created by the init function.
97 */
98 void rotation_thread_handle_destroy(
99 struct rotation_thread_handle *handle)
100 {
101 int ret;
102
103 if (!handle) {
104 goto end;
105 }
106
107 if (handle->ust32_consumer >= 0) {
108 ret = close(handle->ust32_consumer);
109 if (ret) {
110 PERROR("close 32-bit consumer channel rotation pipe");
111 }
112 }
113 if (handle->ust64_consumer >= 0) {
114 ret = close(handle->ust64_consumer);
115 if (ret) {
116 PERROR("close 64-bit consumer channel rotation pipe");
117 }
118 }
119 if (handle->kernel_consumer >= 0) {
120 ret = close(handle->kernel_consumer);
121 if (ret) {
122 PERROR("close kernel consumer channel rotation pipe");
123 }
124 }
125
126 end:
127 free(handle);
128 }
129
130 struct rotation_thread_handle *rotation_thread_handle_create(
131 struct lttng_pipe *ust32_channel_rotate_pipe,
132 struct lttng_pipe *ust64_channel_rotate_pipe,
133 struct lttng_pipe *kernel_channel_rotate_pipe,
134 int thread_quit_pipe, int rotate_timer_pipe)
135 {
136 struct rotation_thread_handle *handle;
137
138 handle = zmalloc(sizeof(*handle));
139 if (!handle) {
140 goto end;
141 }
142
143 if (ust32_channel_rotate_pipe) {
144 handle->ust32_consumer =
145 lttng_pipe_release_readfd(
146 ust32_channel_rotate_pipe);
147 if (handle->ust32_consumer < 0) {
148 goto error;
149 }
150 } else {
151 handle->ust32_consumer = -1;
152 }
153 if (ust64_channel_rotate_pipe) {
154 handle->ust64_consumer =
155 lttng_pipe_release_readfd(
156 ust64_channel_rotate_pipe);
157 if (handle->ust64_consumer < 0) {
158 goto error;
159 }
160 } else {
161 handle->ust64_consumer = -1;
162 }
163 if (kernel_channel_rotate_pipe) {
164 handle->kernel_consumer =
165 lttng_pipe_release_readfd(
166 kernel_channel_rotate_pipe);
167 if (handle->kernel_consumer < 0) {
168 goto error;
169 }
170 } else {
171 handle->kernel_consumer = -1;
172 }
173 handle->thread_quit_pipe = thread_quit_pipe;
174 handle->rotate_timer_pipe = rotate_timer_pipe;
175
176 end:
177 return handle;
178 error:
179 rotation_thread_handle_destroy(handle);
180 return NULL;
181 }
182
183 static
184 int init_poll_set(struct lttng_poll_event *poll_set,
185 struct rotation_thread_handle *handle)
186 {
187 int ret;
188
189 /*
190 * Create pollset with size 5:
191 * - sessiond quit pipe
192 * - consumerd (32-bit user space) channel rotate pipe,
193 * - consumerd (64-bit user space) channel rotate pipe,
194 * - consumerd (kernel) channel rotate pipe.
195 * - sessiond rotate pending pipe
196 */
197 ret = lttng_poll_create(poll_set, 5, LTTNG_CLOEXEC);
198 if (ret < 0) {
199 goto end;
200 }
201
202 ret = lttng_poll_add(poll_set, handle->thread_quit_pipe,
203 LPOLLIN | LPOLLERR);
204 if (ret < 0) {
205 ERR("[rotation-thread] Failed to add thread_quit_pipe fd to pollset");
206 goto error;
207 }
208 ret = lttng_poll_add(poll_set, handle->rotate_timer_pipe,
209 LPOLLIN | LPOLLERR);
210 if (ret < 0) {
211 ERR("[rotation-thread] Failed to add rotate_pending fd to pollset");
212 goto error;
213 }
214 ret = lttng_poll_add(poll_set, handle->ust32_consumer,
215 LPOLLIN | LPOLLERR);
216 if (ret < 0) {
217 ERR("[rotation-thread] Failed to add ust-32 channel rotation pipe fd to pollset");
218 goto error;
219 }
220 ret = lttng_poll_add(poll_set, handle->ust64_consumer,
221 LPOLLIN | LPOLLERR);
222 if (ret < 0) {
223 ERR("[rotation-thread] Failed to add ust-64 channel rotation pipe fd to pollset");
224 goto error;
225 }
226 if (handle->kernel_consumer < 0) {
227 goto end;
228 }
229 ret = lttng_poll_add(poll_set, handle->kernel_consumer,
230 LPOLLIN | LPOLLERR);
231 if (ret < 0) {
232 ERR("[rotation-thread] Failed to add kernel channel rotation pipe fd to pollset");
233 goto error;
234 }
235
236 end:
237 return ret;
238 error:
239 lttng_poll_clean(poll_set);
240 return ret;
241 }
242
243 static
244 void fini_thread_state(struct rotation_thread_state *state)
245 {
246 lttng_poll_clean(&state->events);
247 }
248
249 static
250 int init_thread_state(struct rotation_thread_handle *handle,
251 struct rotation_thread_state *state)
252 {
253 int ret;
254
255 memset(state, 0, sizeof(*state));
256 lttng_poll_init(&state->events);
257
258 ret = init_poll_set(&state->events, handle);
259 if (ret) {
260 goto end;
261 }
262
263 channel_pending_rotate_ht = cds_lfht_new(DEFAULT_HT_SIZE,
264 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
265 if (!channel_pending_rotate_ht) {
266 ret = -1;
267 }
268
269 end:
270 return 0;
271 }
272
273 static
274 int handle_channel_rotation_pipe(int fd, uint32_t revents,
275 struct rotation_thread_handle *handle,
276 struct rotation_thread_state *state)
277 {
278 int ret = 0;
279 enum lttng_domain_type domain;
280 struct rotation_channel_info *channel_info;
281 struct ltt_session *session = NULL;
282 uint64_t key;
283
284 if (fd == handle->ust32_consumer ||
285 fd == handle->ust64_consumer) {
286 domain = LTTNG_DOMAIN_UST;
287 } else if (fd == handle->kernel_consumer) {
288 domain = LTTNG_DOMAIN_KERNEL;
289 } else {
290 abort();
291 }
292
293 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
294 ret = lttng_poll_del(&state->events, fd);
295 if (ret) {
296 ERR("[rotation-thread] Failed to remove consumer "
297 "rotation pipe from poll set");
298 }
299 goto end;
300 }
301
302 do {
303 ret = read(fd, &key, sizeof(key));
304 } while (ret == -1 && errno == EINTR);
305 if (ret != sizeof(key)) {
306 ERR("[rotation-thread] Failed to read from pipe (fd = %i)",
307 fd);
308 ret = -1;
309 goto end;
310 }
311
312 DBG("[rotation-thread] Received notification for chan %" PRIu64
313 ", domain %d\n", key, domain);
314
315 channel_info = lookup_channel_pending(key, domain);
316 if (!channel_info) {
317 ERR("[rotation-thread] Failed to find channel_info (key = %"
318 PRIu64 ")", key);
319 ret = -1;
320 goto end;
321 }
322 rcu_read_lock();
323 session_lock_list();
324 session = session_find_by_id(channel_info->session_id);
325 if (!session) {
326 ERR("[rotation-thread] Session %" PRIu64 " not found",
327 channel_info->session_id);
328 ret = -1;
329 goto end_unlock;
330 }
331
332 if (--session->nr_chan_rotate_pending == 0) {
333 time_t now = time(NULL);
334
335 if (now == (time_t) -1) {
336 session->rotate_status = LTTNG_ROTATE_ERROR;
337 ret = LTTNG_ERR_ROTATE_NOT_AVAILABLE;
338 goto end;
339 }
340 session_lock(session);
341
342 ret = rename_complete_chunk(session, now);
343 if (ret < 0) {
344 ERR("Failed to rename completed rotation chunk");
345 session_unlock(session);
346 goto end;
347 }
348 session->rotate_pending = false;
349 if (session->rotate_pending_relay) {
350 ret = sessiond_timer_rotate_pending_start(
351 session,
352 DEFAULT_ROTATE_PENDING_RELAY_TIMER);
353 if (ret) {
354 ERR("Enabling rotate pending timer");
355 ret = -1;
356 session_unlock(session);
357 goto end;
358 }
359 }
360 session_unlock(session);
361 }
362
363 channel_rotation_info_destroy(channel_info);
364
365 ret = 0;
366
367 end_unlock:
368 session_unlock_list();
369 rcu_read_unlock();
370 end:
371 return ret;
372 }
373
374 static
375 int rotate_pending_relay_timer(struct ltt_session *session)
376 {
377 int ret;
378
379 DBG("[rotation-thread] Check rotate pending on session %" PRIu64,
380 session->id);
381 ret = relay_rotate_pending(session, session->rotate_count - 1);
382 if (ret < 0) {
383 ERR("[rotation-thread] Check relay rotate pending");
384 goto end;
385 }
386 if (ret == 0) {
387 DBG("[rotation-thread] Rotation completed on the relay for "
388 "session %" PRIu64, session->id);
389 /*
390 * Stop the timer and clear the queue, the timers are currently
391 * ignored because of the rotate_pending_relay_check_in_progress
392 * flag.
393 */
394 sessiond_timer_rotate_pending_stop(session);
395 /*
396 * Now we can clear the pending flag in the session. New
397 * rotations can start now.
398 */
399 session->rotate_pending_relay = false;
400 } else if (ret == 1) {
401 DBG("[rotation-thread] Rotation still pending on the relay for "
402 "session %" PRIu64, session->id);
403 }
404 /*
405 * Allow the timer thread to send other notifications when needed.
406 */
407 session->rotate_pending_relay_check_in_progress = false;
408
409 ret = 0;
410
411 end:
412 return ret;
413 }
414
415 static
416 int rotate_timer(struct ltt_session *session)
417 {
418 int ret;
419
420 DBG("[rotation-thread] Rotate timer on session %" PRIu64, session->id);
421
422 /*
423 * If the session is stopped, we need to cancel this timer.
424 */
425 session_lock(session);
426 if (!session->active && session->rotate_timer_enabled) {
427 sessiond_rotate_timer_stop(session);
428 }
429
430 ret = cmd_rotate_session(session, NULL);
431 session_unlock(session);
432 if (ret == -LTTNG_ERR_ROTATE_PENDING) {
433 ret = 0;
434 goto end;
435 } else if (ret != LTTNG_OK) {
436 ret = -1;
437 goto end;
438 }
439
440
441 ret = 0;
442
443 end:
444 return ret;
445 }
446
447 static
448 int handle_rotate_timer_pipe(int fd, uint32_t revents,
449 struct rotation_thread_handle *handle,
450 struct rotation_thread_state *state)
451 {
452 int ret = 0;
453 struct ltt_session *session;
454 struct sessiond_rotation_timer timer_data;
455
456 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
457 ret = lttng_poll_del(&state->events, fd);
458 if (ret) {
459 ERR("[rotation-thread] Failed to remove consumer "
460 "rotate pending pipe from poll set");
461 }
462 goto end;
463 }
464
465 memset(&timer_data, 0, sizeof(struct sessiond_rotation_timer));
466
467 do {
468 ret = read(fd, &timer_data, sizeof(timer_data));
469 } while (ret == -1 && errno == EINTR);
470 if (ret != sizeof(timer_data)) {
471 ERR("[rotation-thread] Failed to read from pipe (fd = %i)",
472 fd);
473 ret = -1;
474 goto end;
475 }
476
477 rcu_read_lock();
478 session_lock_list();
479 session = session_find_by_id(timer_data.session_id);
480 if (!session) {
481 ERR("[rotation-thread] Session %" PRIu64 " not found",
482 timer_data.session_id);
483 /*
484 * This is a non-fatal error, and we cannot report it to the
485 * user (timer), so just print the error and continue the
486 * processing.
487 */
488 ret = 0;
489 goto end_unlock;
490 }
491
492 if (timer_data.signal == LTTNG_SESSIOND_SIG_ROTATE_PENDING) {
493 ret = rotate_pending_relay_timer(session);
494 } else if (timer_data.signal == LTTNG_SESSIOND_SIG_ROTATE_TIMER) {
495 ret = rotate_timer(session);
496 } else {
497 ERR("Unknown signal in rotate timer");
498 ret = -1;
499 }
500
501 end_unlock:
502 session_unlock_list();
503 rcu_read_unlock();
504 end:
505 return ret;
506 }
507
508 void *thread_rotation(void *data)
509 {
510 int ret;
511 struct rotation_thread_handle *handle = data;
512 struct rotation_thread_state state;
513
514 DBG("[rotation-thread] Started rotation thread");
515
516 if (!handle) {
517 ERR("[rotation-thread] Invalid thread context provided");
518 goto end;
519 }
520
521 rcu_register_thread();
522 rcu_thread_online();
523
524 health_register(health_sessiond, HEALTH_SESSIOND_TYPE_ROTATION);
525 health_code_update();
526
527 ret = init_thread_state(handle, &state);
528 if (ret) {
529 goto end;
530 }
531
532 /* Ready to handle client connections. */
533 sessiond_notify_ready();
534
535 while (true) {
536 int fd_count, i;
537
538 health_poll_entry();
539 DBG("[rotation-thread] Entering poll wait");
540 ret = lttng_poll_wait(&state.events, -1);
541 DBG("[rotation-thread] Poll wait returned (%i)", ret);
542 health_poll_exit();
543 if (ret < 0) {
544 /*
545 * Restart interrupted system call.
546 */
547 if (errno == EINTR) {
548 continue;
549 }
550 ERR("[rotation-thread] Error encountered during lttng_poll_wait (%i)", ret);
551 goto error;
552 }
553
554 fd_count = ret;
555 for (i = 0; i < fd_count; i++) {
556 int fd = LTTNG_POLL_GETFD(&state.events, i);
557 uint32_t revents = LTTNG_POLL_GETEV(&state.events, i);
558
559 DBG("[rotation-thread] Handling fd (%i) activity (%u)",
560 fd, revents);
561
562 if (fd == handle->thread_quit_pipe) {
563 DBG("[rotation-thread] Quit pipe activity");
564 goto exit;
565 } else if (fd == handle->rotate_timer_pipe) {
566 ret = handle_rotate_timer_pipe(fd, revents,
567 handle, &state);
568 if (ret) {
569 ERR("[rotation-thread] Rotate timer");
570 goto error;
571 }
572 } else if (fd == handle->ust32_consumer ||
573 fd == handle->ust64_consumer ||
574 fd == handle->kernel_consumer) {
575 ret = handle_channel_rotation_pipe(fd,
576 revents, handle, &state);
577 if (ret) {
578 ERR("[rotation-thread] Exit main loop");
579 goto error;
580 }
581 }
582 }
583 }
584 exit:
585 error:
586 fini_thread_state(&state);
587 health_unregister(health_sessiond);
588 rcu_thread_offline();
589 rcu_unregister_thread();
590 end:
591 return NULL;
592 }
This page took 0.043757 seconds and 5 git commands to generate.