SoW-2020-0002: Trace Hit Counters: trigger error reporting integration
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread.c
CommitLineData
ab0ee2ca 1/*
ab5be9fa 2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
ab0ee2ca 3 *
ab5be9fa 4 * SPDX-License-Identifier: GPL-2.0-only
ab0ee2ca 5 *
ab0ee2ca
JG
6 */
7
8#define _LGPL_SOURCE
9#include <lttng/trigger/trigger.h>
10#include <lttng/notification/channel-internal.h>
11#include <lttng/notification/notification-internal.h>
12#include <lttng/condition/condition-internal.h>
13#include <lttng/condition/buffer-usage-internal.h>
14#include <common/error.h>
15#include <common/config/session-config.h>
16#include <common/defaults.h>
17#include <common/utils.h>
ab0ee2ca
JG
18#include <common/align.h>
19#include <common/time.h>
20#include <sys/eventfd.h>
21#include <sys/stat.h>
22#include <time.h>
23#include <signal.h>
24
25#include "notification-thread.h"
26#include "notification-thread-events.h"
27#include "notification-thread-commands.h"
28#include "lttng-sessiond.h"
29#include "health-sessiond.h"
c8a9de5a 30#include "thread.h"
2463b787
JR
31#include "testpoint.h"
32
33#include "kernel.h"
34#include <common/kernel-ctl/kernel-ctl.h>
ab0ee2ca
JG
35
36#include <urcu.h>
37#include <urcu/list.h>
38#include <urcu/rculfhash.h>
39
2463b787
JR
40
/*
 * Flag read by handle_trigger_event_pipe(): while non-zero, the handler logs,
 * sleeps for one second and skips event processing instead of consuming the
 * event source.
 * NOTE(review): nothing in this file writes the flag; presumably it is toggled
 * from test instrumentation -- confirm.
 */
int trigger_consumption_paused;
ab0ee2ca
JG
42/*
43 * Destroy the thread data previously created by the init function.
44 */
45void notification_thread_handle_destroy(
46 struct notification_thread_handle *handle)
47{
48 int ret;
ab0ee2ca
JG
49
50 if (!handle) {
51 goto end;
52 }
53
8ada111f 54 assert(cds_list_empty(&handle->cmd_queue.list));
ab0ee2ca 55 pthread_mutex_destroy(&handle->cmd_queue.lock);
c8a9de5a 56 sem_destroy(&handle->ready);
ab0ee2ca 57
814b4934
JR
58 if (handle->cmd_queue.event_pipe) {
59 lttng_pipe_destroy(handle->cmd_queue.event_pipe);
60 }
ab0ee2ca
JG
61 if (handle->channel_monitoring_pipes.ust32_consumer >= 0) {
62 ret = close(handle->channel_monitoring_pipes.ust32_consumer);
63 if (ret) {
64 PERROR("close 32-bit consumer channel monitoring pipe");
65 }
66 }
67 if (handle->channel_monitoring_pipes.ust64_consumer >= 0) {
68 ret = close(handle->channel_monitoring_pipes.ust64_consumer);
69 if (ret) {
70 PERROR("close 64-bit consumer channel monitoring pipe");
71 }
72 }
73 if (handle->channel_monitoring_pipes.kernel_consumer >= 0) {
74 ret = close(handle->channel_monitoring_pipes.kernel_consumer);
75 if (ret) {
76 PERROR("close kernel consumer channel monitoring pipe");
77 }
78 }
2463b787 79
ab0ee2ca
JG
80end:
81 free(handle);
82}
83
2463b787
JR
84/*
85 * TODO: refactor this if needed. Lifetime of the kernel notification event source.
86 * The kernel_notification_monitor_fd ownwership remain to the main thread.
87 * This is because we need to close this fd before removing the modules.
88 */
ab0ee2ca
JG
89struct notification_thread_handle *notification_thread_handle_create(
90 struct lttng_pipe *ust32_channel_monitor_pipe,
91 struct lttng_pipe *ust64_channel_monitor_pipe,
2463b787
JR
92 struct lttng_pipe *kernel_channel_monitor_pipe,
93 int kernel_notification_monitor_fd)
ab0ee2ca
JG
94{
95 int ret;
96 struct notification_thread_handle *handle;
814b4934 97 struct lttng_pipe *event_pipe = NULL;
ab0ee2ca
JG
98
99 handle = zmalloc(sizeof(*handle));
100 if (!handle) {
101 goto end;
102 }
103
c8a9de5a
JG
104 sem_init(&handle->ready, 0, 0);
105
18d08850 106 event_pipe = lttng_pipe_open(FD_CLOEXEC);
814b4934
JR
107 if (!event_pipe) {
108 ERR("event_pipe creation");
ab0ee2ca
JG
109 goto error;
110 }
814b4934
JR
111
112 handle->cmd_queue.event_pipe = event_pipe;
113 event_pipe = NULL;
114
ab0ee2ca
JG
115 CDS_INIT_LIST_HEAD(&handle->cmd_queue.list);
116 ret = pthread_mutex_init(&handle->cmd_queue.lock, NULL);
117 if (ret) {
118 goto error;
119 }
120
121 if (ust32_channel_monitor_pipe) {
122 handle->channel_monitoring_pipes.ust32_consumer =
123 lttng_pipe_release_readfd(
124 ust32_channel_monitor_pipe);
125 if (handle->channel_monitoring_pipes.ust32_consumer < 0) {
126 goto error;
127 }
128 } else {
129 handle->channel_monitoring_pipes.ust32_consumer = -1;
130 }
131 if (ust64_channel_monitor_pipe) {
132 handle->channel_monitoring_pipes.ust64_consumer =
133 lttng_pipe_release_readfd(
134 ust64_channel_monitor_pipe);
135 if (handle->channel_monitoring_pipes.ust64_consumer < 0) {
136 goto error;
137 }
138 } else {
139 handle->channel_monitoring_pipes.ust64_consumer = -1;
140 }
141 if (kernel_channel_monitor_pipe) {
142 handle->channel_monitoring_pipes.kernel_consumer =
143 lttng_pipe_release_readfd(
144 kernel_channel_monitor_pipe);
145 if (handle->channel_monitoring_pipes.kernel_consumer < 0) {
146 goto error;
147 }
148 } else {
149 handle->channel_monitoring_pipes.kernel_consumer = -1;
150 }
2463b787
JR
151
152 CDS_INIT_LIST_HEAD(&handle->event_trigger_sources.list);
153 ret = pthread_mutex_init(&handle->event_trigger_sources.lock, NULL);
154 if (ret) {
155 goto error;
156 }
ab0ee2ca
JG
157end:
158 return handle;
159error:
814b4934 160 lttng_pipe_destroy(event_pipe);
ab0ee2ca
JG
161 notification_thread_handle_destroy(handle);
162 return NULL;
163}
164
165static
166char *get_notification_channel_sock_path(void)
167{
168 int ret;
169 bool is_root = !getuid();
170 char *sock_path;
171
172 sock_path = zmalloc(LTTNG_PATH_MAX);
173 if (!sock_path) {
174 goto error;
175 }
176
177 if (is_root) {
178 ret = snprintf(sock_path, LTTNG_PATH_MAX,
179 DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK);
180 if (ret < 0) {
181 goto error;
182 }
183 } else {
4f00620d 184 const char *home_path = utils_get_home_dir();
ab0ee2ca
JG
185
186 if (!home_path) {
187 ERR("Can't get HOME directory for socket creation");
188 goto error;
189 }
190
191 ret = snprintf(sock_path, LTTNG_PATH_MAX,
192 DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK,
193 home_path);
194 if (ret < 0) {
195 goto error;
196 }
197 }
198
199 return sock_path;
200error:
201 free(sock_path);
202 return NULL;
203}
204
/*
 * Remove the notification channel socket from the file system and close its
 * file descriptor.
 *
 * Both steps are best-effort: a failure is logged and does not prevent the
 * other step from running. If the socket path cannot be computed, only the
 * close is performed.
 */
static
void notification_channel_socket_destroy(int fd)
{
	int ret;
	char *sock_path = get_notification_channel_sock_path();

	DBG("[notification-thread] Destroying notification channel socket");

	if (sock_path) {
		ret = unlink(sock_path);
		if (ret < 0) {
			/*
			 * Report before calling free(), which is allowed to
			 * modify errno and would make the message misleading.
			 */
			PERROR("unlink notification channel socket");
		}
		free(sock_path);
	}

	ret = close(fd);
	if (ret) {
		PERROR("close notification channel socket");
	}
}
226
227static
228int notification_channel_socket_create(void)
229{
230 int fd = -1, ret;
231 char *sock_path = get_notification_channel_sock_path();
232
233 DBG("[notification-thread] Creating notification channel UNIX socket at %s",
234 sock_path);
235
236 ret = lttcomm_create_unix_sock(sock_path);
237 if (ret < 0) {
238 ERR("[notification-thread] Failed to create notification socket");
239 goto error;
240 }
241 fd = ret;
242
243 ret = chmod(sock_path, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
244 if (ret < 0) {
245 ERR("Set file permissions failed: %s", sock_path);
246 PERROR("chmod notification channel socket");
247 goto error;
248 }
249
250 if (getuid() == 0) {
28ab59d0
JR
251 gid_t gid;
252
253 ret = utils_get_group_id(config.tracing_group_name.value, true,
254 &gid);
255 if (ret) {
256 /* Default to root group. */
257 gid = 0;
258 }
259
260 ret = chown(sock_path, 0, gid);
ab0ee2ca
JG
261 if (ret) {
262 ERR("Failed to set the notification channel socket's group");
263 ret = -1;
264 goto error;
265 }
266 }
267
268 DBG("[notification-thread] Notification channel UNIX socket created (fd = %i)",
269 fd);
270 free(sock_path);
271 return fd;
272error:
273 if (fd >= 0 && close(fd) < 0) {
274 PERROR("close notification channel socket");
275 }
276 free(sock_path);
277 return ret;
278}
279
280static
281int init_poll_set(struct lttng_poll_event *poll_set,
282 struct notification_thread_handle *handle,
283 int notification_channel_socket)
284{
285 int ret;
286
287 /*
288 * Create pollset with size 5:
289 * - notification channel socket (listen for new connections),
290 * - command queue event fd (internal sessiond commands),
291 * - consumerd (32-bit user space) channel monitor pipe,
292 * - consumerd (64-bit user space) channel monitor pipe,
293 * - consumerd (kernel) channel monitor pipe.
294 */
295 ret = lttng_poll_create(poll_set, 5, LTTNG_CLOEXEC);
296 if (ret < 0) {
297 goto end;
298 }
299
300 ret = lttng_poll_add(poll_set, notification_channel_socket,
301 LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP);
302 if (ret < 0) {
303 ERR("[notification-thread] Failed to add notification channel socket to pollset");
304 goto error;
305 }
814b4934 306 ret = lttng_poll_add(poll_set, lttng_pipe_get_readfd(handle->cmd_queue.event_pipe),
ab0ee2ca
JG
307 LPOLLIN | LPOLLERR);
308 if (ret < 0) {
309 ERR("[notification-thread] Failed to add notification command queue event fd to pollset");
310 goto error;
311 }
312 ret = lttng_poll_add(poll_set,
313 handle->channel_monitoring_pipes.ust32_consumer,
314 LPOLLIN | LPOLLERR);
315 if (ret < 0) {
316 ERR("[notification-thread] Failed to add ust-32 channel monitoring pipe fd to pollset");
317 goto error;
318 }
319 ret = lttng_poll_add(poll_set,
320 handle->channel_monitoring_pipes.ust64_consumer,
321 LPOLLIN | LPOLLERR);
322 if (ret < 0) {
323 ERR("[notification-thread] Failed to add ust-64 channel monitoring pipe fd to pollset");
324 goto error;
325 }
326 if (handle->channel_monitoring_pipes.kernel_consumer < 0) {
2463b787 327 goto skip_kernel_consumer;
ab0ee2ca
JG
328 }
329 ret = lttng_poll_add(poll_set,
330 handle->channel_monitoring_pipes.kernel_consumer,
331 LPOLLIN | LPOLLERR);
332 if (ret < 0) {
333 ERR("[notification-thread] Failed to add kernel channel monitoring pipe fd to pollset");
334 goto error;
335 }
2463b787
JR
336
337skip_kernel_consumer:
ab0ee2ca
JG
338end:
339 return ret;
340error:
341 lttng_poll_clean(poll_set);
342 return ret;
343}
344
345static
346void fini_thread_state(struct notification_thread_state *state)
347{
348 int ret;
349
350 if (state->client_socket_ht) {
351 ret = handle_notification_thread_client_disconnect_all(state);
352 assert(!ret);
353 ret = cds_lfht_destroy(state->client_socket_ht, NULL);
354 assert(!ret);
355 }
ac1889bf
JG
356 if (state->client_id_ht) {
357 ret = cds_lfht_destroy(state->client_id_ht, NULL);
358 assert(!ret);
359 }
ab0ee2ca
JG
360 if (state->triggers_ht) {
361 ret = handle_notification_thread_trigger_unregister_all(state);
362 assert(!ret);
363 ret = cds_lfht_destroy(state->triggers_ht, NULL);
364 assert(!ret);
365 }
366 if (state->channel_triggers_ht) {
367 ret = cds_lfht_destroy(state->channel_triggers_ht, NULL);
368 assert(!ret);
369 }
370 if (state->channel_state_ht) {
371 ret = cds_lfht_destroy(state->channel_state_ht, NULL);
372 assert(!ret);
373 }
374 if (state->notification_trigger_clients_ht) {
375 ret = cds_lfht_destroy(state->notification_trigger_clients_ht,
376 NULL);
377 assert(!ret);
378 }
379 if (state->channels_ht) {
8abe313a
JG
380 ret = cds_lfht_destroy(state->channels_ht, NULL);
381 assert(!ret);
382 }
383 if (state->sessions_ht) {
384 ret = cds_lfht_destroy(state->sessions_ht, NULL);
ab0ee2ca
JG
385 assert(!ret);
386 }
2463b787
JR
387 if (state->triggers_by_name_uid_ht) {
388 ret = cds_lfht_destroy(state->triggers_by_name_uid_ht, NULL);
389 assert(!ret);
390 }
391 if (state->trigger_tokens_ht) {
392 ret = cds_lfht_destroy(state->trigger_tokens_ht, NULL);
393 assert(!ret);
394 }
ea9a44f0
JG
395 /*
396 * Must be destroyed after all channels have been destroyed.
397 * See comment in struct lttng_session_trigger_list.
398 */
399 if (state->session_triggers_ht) {
400 ret = cds_lfht_destroy(state->session_triggers_ht, NULL);
401 assert(!ret);
402 }
ab0ee2ca
JG
403 if (state->notification_channel_socket >= 0) {
404 notification_channel_socket_destroy(
405 state->notification_channel_socket);
406 }
f2b3ef9f
JG
407 if (state->executor) {
408 action_executor_destroy(state->executor);
409 }
ab0ee2ca
JG
410 lttng_poll_clean(&state->events);
411}
412
c8a9de5a
JG
413static
414void mark_thread_as_ready(struct notification_thread_handle *handle)
415{
416 DBG("Marking notification thread as ready");
417 sem_post(&handle->ready);
418}
419
420static
421void wait_until_thread_is_ready(struct notification_thread_handle *handle)
422{
423 DBG("Waiting for notification thread to be ready");
424 sem_wait(&handle->ready);
425 DBG("Notification thread is ready");
426}
427
ab0ee2ca
JG
428static
429int init_thread_state(struct notification_thread_handle *handle,
430 struct notification_thread_state *state)
431{
432 int ret;
433
434 memset(state, 0, sizeof(*state));
435 state->notification_channel_socket = -1;
2463b787 436 state->trigger_id.token_generator = 1;
ab0ee2ca
JG
437 lttng_poll_init(&state->events);
438
439 ret = notification_channel_socket_create();
440 if (ret < 0) {
441 goto end;
442 }
443 state->notification_channel_socket = ret;
444
445 ret = init_poll_set(&state->events, handle,
446 state->notification_channel_socket);
447 if (ret) {
448 goto end;
449 }
450
451 DBG("[notification-thread] Listening on notification channel socket");
452 ret = lttcomm_listen_unix_sock(state->notification_channel_socket);
453 if (ret < 0) {
454 ERR("[notification-thread] Listen failed on notification channel socket");
455 goto error;
456 }
457
458 state->client_socket_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0,
459 CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
460 if (!state->client_socket_ht) {
461 goto error;
462 }
463
ac1889bf
JG
464 state->client_id_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0,
465 CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
466 if (!state->client_id_ht) {
467 goto error;
468 }
469
ab0ee2ca
JG
470 state->channel_triggers_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0,
471 CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
472 if (!state->channel_triggers_ht) {
473 goto error;
474 }
475
ea9a44f0
JG
476 state->session_triggers_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0,
477 CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
478 if (!state->session_triggers_ht) {
479 goto error;
480 }
481
ab0ee2ca
JG
482 state->channel_state_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0,
483 CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
484 if (!state->channel_state_ht) {
485 goto error;
486 }
487
488 state->notification_trigger_clients_ht = cds_lfht_new(DEFAULT_HT_SIZE,
489 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
490 if (!state->notification_trigger_clients_ht) {
491 goto error;
492 }
493
494 state->channels_ht = cds_lfht_new(DEFAULT_HT_SIZE,
495 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
496 if (!state->channels_ht) {
497 goto error;
498 }
8abe313a
JG
499 state->sessions_ht = cds_lfht_new(DEFAULT_HT_SIZE,
500 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
501 if (!state->sessions_ht) {
502 goto error;
503 }
ab0ee2ca
JG
504 state->triggers_ht = cds_lfht_new(DEFAULT_HT_SIZE,
505 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
506 if (!state->triggers_ht) {
507 goto error;
f2b3ef9f 508 }
2463b787
JR
509 state->triggers_by_name_uid_ht = cds_lfht_new(DEFAULT_HT_SIZE,
510 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
511 if (!state->triggers_by_name_uid_ht) {
512 goto error;
513 }
f2b3ef9f 514
2463b787
JR
515 state->trigger_tokens_ht = cds_lfht_new(DEFAULT_HT_SIZE,
516 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
517 if (!state->trigger_tokens_ht) {
518 goto error;
519 }
f2b3ef9f
JG
520 state->executor = action_executor_create(handle);
521 if (!state->executor) {
522 goto error;
ab0ee2ca 523 }
c8a9de5a 524 mark_thread_as_ready(handle);
ab0ee2ca
JG
525end:
526 return 0;
527error:
528 fini_thread_state(state);
529 return -1;
530}
531
532static
533int handle_channel_monitoring_pipe(int fd, uint32_t revents,
534 struct notification_thread_handle *handle,
535 struct notification_thread_state *state)
536{
537 int ret = 0;
538 enum lttng_domain_type domain;
539
540 if (fd == handle->channel_monitoring_pipes.ust32_consumer ||
541 fd == handle->channel_monitoring_pipes.ust64_consumer) {
542 domain = LTTNG_DOMAIN_UST;
543 } else if (fd == handle->channel_monitoring_pipes.kernel_consumer) {
544 domain = LTTNG_DOMAIN_KERNEL;
545 } else {
546 abort();
547 }
548
549 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
550 ret = lttng_poll_del(&state->events, fd);
551 if (ret) {
552 ERR("[notification-thread] Failed to remove consumer monitoring pipe from poll set");
553 }
554 goto end;
555 }
556
557 ret = handle_notification_thread_channel_sample(
558 state, fd, domain);
559 if (ret) {
4149ace8 560 ERR("[notification-thread] Consumer sample handling error occurred");
ab0ee2ca
JG
561 ret = -1;
562 goto end;
563 }
564end:
565 return ret;
566}
567
2463b787
JR
568static int handle_trigger_event_pipe(int fd,
569 enum lttng_domain_type domain,
570 uint32_t revents,
571 struct notification_thread_state *state)
572{
573 int ret = 0;
574
575 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
576 ret = lttng_poll_del(&state->events, fd);
577 if (ret) {
578 ERR("[notification-thread] Failed to remove event monitoring pipe from poll set");
579 }
580 goto end;
581 }
582
583 if (testpoint(sessiond_handle_trigger_event_pipe)) {
584 ret = 0;
585 goto end;
586 }
587
588 if (caa_unlikely(trigger_consumption_paused)) {
589 DBG("Trigger consumption paused, sleeping...");
590 sleep(1);
591 goto end;
592 }
593
594 ret = handle_notification_thread_event(state, fd, domain);
595 if (ret) {
596 ERR("[notification-thread] Event sample handling error occurred for fd: %d", fd);
597 ret = -1;
598 goto end;
599 }
600
601end:
602 return ret;
603}
604
605/*
606 * Return the event source domain type via parameter.
607 */
608static bool fd_is_event_source(struct notification_thread_handle *handle, int fd, enum lttng_domain_type *domain)
609{
610 struct notification_event_trigger_source_element *source_element, *tmp;
611
612 assert(domain);
613
614 cds_list_for_each_entry_safe(source_element, tmp,
615 &handle->event_trigger_sources.list, node) {
616 if (source_element->fd != fd) {
617 continue;
618 }
619 *domain = source_element->domain;
620 return true;
621 }
622 return false;
623}
624
ab0ee2ca
JG
625/*
626 * This thread services notification channel clients and commands received
627 * from various lttng-sessiond components over a command queue.
628 */
c8a9de5a 629static
ab0ee2ca
JG
630void *thread_notification(void *data)
631{
632 int ret;
633 struct notification_thread_handle *handle = data;
634 struct notification_thread_state state;
2463b787 635 enum lttng_domain_type domain;
ab0ee2ca
JG
636
637 DBG("[notification-thread] Started notification thread");
638
f620cc28
JG
639 health_register(health_sessiond, HEALTH_SESSIOND_TYPE_NOTIFICATION);
640 rcu_register_thread();
641 rcu_thread_online();
642
ab0ee2ca
JG
643 if (!handle) {
644 ERR("[notification-thread] Invalid thread context provided");
645 goto end;
646 }
647
ab0ee2ca
JG
648 health_code_update();
649
650 ret = init_thread_state(handle, &state);
651 if (ret) {
652 goto end;
653 }
654
2463b787
JR
655 if (testpoint(sessiond_thread_notification)) {
656 goto end;
657 }
658
ab0ee2ca
JG
659 while (true) {
660 int fd_count, i;
661
662 health_poll_entry();
663 DBG("[notification-thread] Entering poll wait");
664 ret = lttng_poll_wait(&state.events, -1);
665 DBG("[notification-thread] Poll wait returned (%i)", ret);
666 health_poll_exit();
667 if (ret < 0) {
668 /*
669 * Restart interrupted system call.
670 */
671 if (errno == EINTR) {
672 continue;
673 }
674 ERR("[notification-thread] Error encountered during lttng_poll_wait (%i)", ret);
675 goto error;
676 }
677
678 fd_count = ret;
679 for (i = 0; i < fd_count; i++) {
680 int fd = LTTNG_POLL_GETFD(&state.events, i);
681 uint32_t revents = LTTNG_POLL_GETEV(&state.events, i);
682
683 DBG("[notification-thread] Handling fd (%i) activity (%u)", fd, revents);
684
685 if (fd == state.notification_channel_socket) {
686 if (revents & LPOLLIN) {
687 ret = handle_notification_thread_client_connect(
688 &state);
689 if (ret < 0) {
690 goto error;
691 }
692 } else if (revents &
693 (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
694 ERR("[notification-thread] Notification socket poll error");
695 goto error;
696 } else {
697 ERR("[notification-thread] Unexpected poll events %u for notification socket %i", revents, fd);
698 goto error;
699 }
814b4934 700 } else if (fd == lttng_pipe_get_readfd(handle->cmd_queue.event_pipe)) {
ab0ee2ca
JG
701 ret = handle_notification_thread_command(handle,
702 &state);
703 if (ret < 0) {
704 DBG("[notification-thread] Error encountered while servicing command queue");
705 goto error;
706 } else if (ret > 0) {
707 goto exit;
708 }
709 } else if (fd == handle->channel_monitoring_pipes.ust32_consumer ||
710 fd == handle->channel_monitoring_pipes.ust64_consumer ||
711 fd == handle->channel_monitoring_pipes.kernel_consumer) {
712 ret = handle_channel_monitoring_pipe(fd,
713 revents, handle, &state);
714 if (ret) {
715 goto error;
716 }
2463b787
JR
717 } else if (fd_is_event_source(handle, fd, &domain)) {
718 ret = handle_trigger_event_pipe(fd, domain, revents, &state);
719 if (ret) {
720 goto error;
721 }
ab0ee2ca
JG
722 } else {
723 /* Activity on a client's socket. */
724 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
725 /*
726 * It doesn't matter if a command was
727 * pending on the client socket at this
728 * point since it now has no way to
729 * receive the notifications to which
730 * it was subscribing or unsubscribing.
731 */
732 ret = handle_notification_thread_client_disconnect(
733 fd, &state);
734 if (ret) {
735 goto error;
736 }
737 } else {
738 if (revents & LPOLLIN) {
739 ret = handle_notification_thread_client_in(
740 &state, fd);
741 if (ret) {
742 goto error;
743 }
744 }
745
746 if (revents & LPOLLOUT) {
747 ret = handle_notification_thread_client_out(
748 &state, fd);
749 if (ret) {
750 goto error;
751 }
752 }
753 }
754 }
755 }
756 }
757exit:
758error:
759 fini_thread_state(&state);
f620cc28 760end:
ab0ee2ca
JG
761 rcu_thread_offline();
762 rcu_unregister_thread();
f620cc28 763 health_unregister(health_sessiond);
ab0ee2ca
JG
764 return NULL;
765}
c8a9de5a
JG
766
/*
 * Shutdown callback registered with the thread launcher: queue a "quit"
 * command for the notification thread whose handle is passed as thread_data.
 *
 * Always returns true.
 */
static
bool shutdown_notification_thread(void *thread_data)
{
	notification_thread_command_quit(
			(struct notification_thread_handle *) thread_data);
	return true;
}
775
4a91420c
JG
776struct lttng_thread *launch_notification_thread(
777 struct notification_thread_handle *handle)
c8a9de5a
JG
778{
779 struct lttng_thread *thread;
780
781 thread = lttng_thread_create("Notification",
782 thread_notification,
783 shutdown_notification_thread,
784 NULL,
785 handle);
786 if (!thread) {
787 goto error;
788 }
789
790 /*
791 * Wait for the thread to be marked as "ready" before returning
792 * as other subsystems depend on the notification subsystem
793 * (e.g. rotation thread).
794 */
795 wait_until_thread_is_ready(handle);
4a91420c 796 return thread;
c8a9de5a 797error:
4a91420c 798 return NULL;
c8a9de5a 799}
This page took 0.077274 seconds and 5 git commands to generate.