2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
4 * SPDX-License-Identifier: GPL-2.0-only
9 #include <lttng/trigger/trigger.h>
10 #include <lttng/notification/channel-internal.h>
11 #include <lttng/notification/notification-internal.h>
12 #include <lttng/condition/condition-internal.h>
13 #include <lttng/condition/buffer-usage-internal.h>
14 #include <common/error.h>
15 #include <common/config/session-config.h>
16 #include <common/defaults.h>
17 #include <common/utils.h>
18 #include <common/align.h>
19 #include <common/time.h>
20 #include <sys/eventfd.h>
25 #include "notification-thread.h"
26 #include "notification-thread-events.h"
27 #include "notification-thread-commands.h"
28 #include "lttng-sessiond.h"
29 #include "health-sessiond.h"
31 #include "testpoint.h"
34 #include <common/kernel-ctl/kernel-ctl.h>
37 #include <urcu/list.h>
38 #include <urcu/rculfhash.h>
/*
 * File-scope integer flag. In this listing it is only read, by
 * handle_trigger_event_pipe(), which logs
 * "Trigger consumption paused, sleeping..." when it is non-zero.
 * NOTE(review): where the flag is written is not visible here.
 */
41 int trigger_consumption_paused
;
43 * Destroy the thread data previously created by the init function.
/*
 * Tear down a notification thread handle.
 *
 * Steps visible in this listing:
 *   - assert that the command queue list is empty;
 *   - destroy the command queue mutex and the `ready` semaphore;
 *   - destroy the command queue event pipe when one is set;
 *   - close() each channel monitoring pipe fd (ust32, ust64, kernel)
 *     whose value is >= 0.
 *
 * NOTE(review): the embedded line numbers skip (e.g. 46 -> 54), so some
 * source lines are missing from this listing. Anything done on those lines
 * (NULL check on handle, checks on `ret`, releasing the handle itself) is
 * not documented here -- confirm against the full file.
 */
45 void notification_thread_handle_destroy(
46 struct notification_thread_handle
*handle
)
/* The command queue must have been drained before the handle is destroyed. */
54 assert(cds_list_empty(&handle
->cmd_queue
.list
));
55 pthread_mutex_destroy(&handle
->cmd_queue
.lock
);
56 sem_destroy(&handle
->ready
);
58 if (handle
->cmd_queue
.event_pipe
) {
59 lttng_pipe_destroy(handle
->cmd_queue
.event_pipe
);
/* A negative fd value marks a monitoring pipe that is not held by the handle. */
61 if (handle
->channel_monitoring_pipes
.ust32_consumer
>= 0) {
62 ret
= close(handle
->channel_monitoring_pipes
.ust32_consumer
);
/* Presumably reported only when close() failed; the guard is not visible. */
64 PERROR("close 32-bit consumer channel monitoring pipe");
67 if (handle
->channel_monitoring_pipes
.ust64_consumer
>= 0) {
68 ret
= close(handle
->channel_monitoring_pipes
.ust64_consumer
);
70 PERROR("close 64-bit consumer channel monitoring pipe");
73 if (handle
->channel_monitoring_pipes
.kernel_consumer
>= 0) {
74 ret
= close(handle
->channel_monitoring_pipes
.kernel_consumer
);
76 PERROR("close kernel consumer channel monitoring pipe");
85 * TODO: refactor this if needed. Lifetime of the kernel notification event source.
86 * The kernel_notification_monitor_fd ownership remains with the main thread.
87 * This is because we need to close this fd before removing the modules.
/*
 * Allocate and initialise a notification thread handle.
 *
 * Steps visible in this listing:
 *   - zmalloc() the handle and sem_init() its `ready` semaphore to 0;
 *   - open the command queue event pipe with FD_CLOEXEC and store it in
 *     handle->cmd_queue.event_pipe;
 *   - initialise the command queue list head and mutex;
 *   - for each consumer channel monitor pipe argument that is non-NULL,
 *     take its read fd with lttng_pipe_release_readfd(); the matching
 *     field is also assigned -1 (presumably in an else branch that is not
 *     visible here);
 *   - initialise the event_trigger_sources list head and mutex.
 *
 * kernel_notification_monitor_fd is not referenced on any visible line.
 *
 * NOTE(review): the embedded line numbers skip, so return statements,
 * error checks and labels are missing from this listing. The trailing
 * lttng_pipe_destroy()/notification_thread_handle_destroy() calls look
 * like an error path -- confirm against the full file.
 */
89 struct notification_thread_handle
*notification_thread_handle_create(
90 struct lttng_pipe
*ust32_channel_monitor_pipe
,
91 struct lttng_pipe
*ust64_channel_monitor_pipe
,
92 struct lttng_pipe
*kernel_channel_monitor_pipe
,
93 int kernel_notification_monitor_fd
)
96 struct notification_thread_handle
*handle
;
97 struct lttng_pipe
*event_pipe
= NULL
;
99 handle
= zmalloc(sizeof(*handle
));
/* Semaphore starts at 0; it is posted by mark_thread_as_ready(). */
104 sem_init(&handle
->ready
, 0, 0);
106 event_pipe
= lttng_pipe_open(FD_CLOEXEC
);
/* Presumably logged only when lttng_pipe_open() failed; guard not visible. */
108 ERR("event_pipe creation");
112 handle
->cmd_queue
.event_pipe
= event_pipe
;
115 CDS_INIT_LIST_HEAD(&handle
->cmd_queue
.list
);
116 ret
= pthread_mutex_init(&handle
->cmd_queue
.lock
, NULL
);
/* 32-bit user space consumer monitoring pipe. */
121 if (ust32_channel_monitor_pipe
) {
122 handle
->channel_monitoring_pipes
.ust32_consumer
=
123 lttng_pipe_release_readfd(
124 ust32_channel_monitor_pipe
);
125 if (handle
->channel_monitoring_pipes
.ust32_consumer
< 0) {
129 handle
->channel_monitoring_pipes
.ust32_consumer
= -1;
/* 64-bit user space consumer monitoring pipe. */
131 if (ust64_channel_monitor_pipe
) {
132 handle
->channel_monitoring_pipes
.ust64_consumer
=
133 lttng_pipe_release_readfd(
134 ust64_channel_monitor_pipe
);
135 if (handle
->channel_monitoring_pipes
.ust64_consumer
< 0) {
139 handle
->channel_monitoring_pipes
.ust64_consumer
= -1;
/* Kernel consumer monitoring pipe. */
141 if (kernel_channel_monitor_pipe
) {
142 handle
->channel_monitoring_pipes
.kernel_consumer
=
143 lttng_pipe_release_readfd(
144 kernel_channel_monitor_pipe
);
145 if (handle
->channel_monitoring_pipes
.kernel_consumer
< 0) {
149 handle
->channel_monitoring_pipes
.kernel_consumer
= -1;
152 CDS_INIT_LIST_HEAD(&handle
->event_trigger_sources
.list
);
153 ret
= pthread_mutex_init(&handle
->event_trigger_sources
.lock
, NULL
);
/* NOTE(review): looks like the error path; its label is not visible. */
160 lttng_pipe_destroy(event_pipe
);
161 notification_thread_handle_destroy(handle
);
/*
 * Build the path of the notification channel UNIX socket in a buffer of
 * LTTNG_PATH_MAX bytes obtained from zmalloc().
 *
 * Two snprintf() calls are visible: one formats
 * DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK, the other formats
 * DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK and follows a
 * utils_get_home_dir() call. is_root (= !getuid()) presumably selects
 * between them; the branch itself is not visible -- TODO confirm.
 *
 * NOTE(review): the return statement, the allocation/snprintf() result
 * checks and the end of the last snprintf() call are missing from this
 * listing. Presumably the zmalloc()ed buffer is returned and owned by the
 * caller -- confirm against the full file.
 */
166 char *get_notification_channel_sock_path(void)
/* getuid() == 0 means the daemon runs as root. */
169 bool is_root
= !getuid();
172 sock_path
= zmalloc(LTTNG_PATH_MAX
);
178 ret
= snprintf(sock_path
, LTTNG_PATH_MAX
,
179 DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK
);
184 const char *home_path
= utils_get_home_dir();
187 ERR("Can't get HOME directory for socket creation");
191 ret
= snprintf(sock_path
, LTTNG_PATH_MAX
,
192 DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK
,
/*
 * Remove the notification channel socket: unlink() the path returned by
 * get_notification_channel_sock_path().
 *
 * NOTE(review): the "close notification channel socket" PERROR suggests
 * that fd is also close()d, but that call is on a line missing from this
 * listing -- TODO confirm. Whether sock_path is released is likewise not
 * visible.
 */
206 void notification_channel_socket_destroy(int fd
)
209 char *sock_path
= get_notification_channel_sock_path();
211 DBG("[notification-thread] Destroying notification channel socket");
214 ret
= unlink(sock_path
);
217 PERROR("unlink notification channel socket");
223 PERROR("close notification channel socket");
/*
 * Create the notification channel UNIX socket.
 *
 * Steps visible in this listing:
 *   - compute the socket path with get_notification_channel_sock_path();
 *   - create the socket with lttcomm_create_unix_sock();
 *   - chmod() the path to read/write for user and group
 *     (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
 *   - call utils_get_group_id() on config.tracing_group_name.value, then
 *     chown() the path to uid 0 and `gid`.
 *
 * NOTE(review): declarations, result checks, the condition around the
 * group lookup/chown() and the return statement are missing from this
 * listing. The final `fd >= 0 && close(fd) < 0` test presumably belongs
 * to the error path -- confirm against the full file.
 */
228 int notification_channel_socket_create(void)
231 char *sock_path
= get_notification_channel_sock_path();
233 DBG("[notification-thread] Creating notification channel UNIX socket at %s",
236 ret
= lttcomm_create_unix_sock(sock_path
);
238 ERR("[notification-thread] Failed to create notification socket");
/* Restrict the socket to its owner and group. */
243 ret
= chmod(sock_path
, S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IWGRP
);
245 ERR("Set file permissions failed: %s", sock_path
);
246 PERROR("chmod notification channel socket");
253 ret
= utils_get_group_id(config
.tracing_group_name
.value
, true,
256 /* Default to root group. */
260 ret
= chown(sock_path
, 0, gid
);
262 ERR("Failed to set the notification channel socket's group");
268 DBG("[notification-thread] Notification channel UNIX socket created (fd = %i)",
273 if (fd
>= 0 && close(fd
) < 0) {
274 PERROR("close notification channel socket");
/*
 * Create the poll set used by the notification thread and register the
 * file descriptors it always monitors.
 *
 * poll_set: poll set to create (lttng_poll_create() with size 5 and
 *           LTTNG_CLOEXEC).
 * handle: supplies the command queue event pipe and the three consumer
 *         channel monitoring pipe fds.
 * notification_channel_socket: listening socket, added with
 *         LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP.
 *
 * The kernel consumer pipe is only added when its fd is >= 0; otherwise
 * control jumps to skip_kernel_consumer.
 *
 * NOTE(review): result checks, the event masks of all but the first
 * lttng_poll_add() call and the return statement are missing from this
 * listing. The trailing lttng_poll_clean() presumably belongs to the error
 * path -- confirm against the full file.
 */
281 int init_poll_set(struct lttng_poll_event
*poll_set
,
282 struct notification_thread_handle
*handle
,
283 int notification_channel_socket
)
288 * Create pollset with size 5:
289 * - notification channel socket (listen for new connections),
290 * - command queue event fd (internal sessiond commands),
291 * - consumerd (32-bit user space) channel monitor pipe,
292 * - consumerd (64-bit user space) channel monitor pipe,
293 * - consumerd (kernel) channel monitor pipe.
295 ret
= lttng_poll_create(poll_set
, 5, LTTNG_CLOEXEC
);
300 ret
= lttng_poll_add(poll_set
, notification_channel_socket
,
301 LPOLLIN
| LPOLLERR
| LPOLLHUP
| LPOLLRDHUP
);
303 ERR("[notification-thread] Failed to add notification channel socket to pollset");
/* Read side of the command queue event pipe. */
306 ret
= lttng_poll_add(poll_set
, lttng_pipe_get_readfd(handle
->cmd_queue
.event_pipe
),
309 ERR("[notification-thread] Failed to add notification command queue event fd to pollset");
312 ret
= lttng_poll_add(poll_set
,
313 handle
->channel_monitoring_pipes
.ust32_consumer
,
316 ERR("[notification-thread] Failed to add ust-32 channel monitoring pipe fd to pollset");
319 ret
= lttng_poll_add(poll_set
,
320 handle
->channel_monitoring_pipes
.ust64_consumer
,
323 ERR("[notification-thread] Failed to add ust-64 channel monitoring pipe fd to pollset");
/* A negative fd means no kernel consumer pipe was provided: skip it. */
326 if (handle
->channel_monitoring_pipes
.kernel_consumer
< 0) {
327 goto skip_kernel_consumer
;
329 ret
= lttng_poll_add(poll_set
,
330 handle
->channel_monitoring_pipes
.kernel_consumer
,
333 ERR("[notification-thread] Failed to add kernel channel monitoring pipe fd to pollset");
337 skip_kernel_consumer
:
341 lttng_poll_clean(poll_set
);
/*
 * Release everything held by a notification thread state.
 *
 * Every member is tested before use, so only members that are set are
 * torn down. Visible order:
 *   - client_socket_ht: disconnect all clients, then destroy the table;
 *   - client_id_ht;
 *   - triggers_ht: unregister all triggers, then destroy the table;
 *   - channel_triggers_ht, channel_state_ht,
 *     notification_trigger_clients_ht, channels_ht, sessions_ht,
 *     triggers_by_name_uid_ht, trigger_tokens_ht;
 *   - session_triggers_ht (kept after the channel tables, see the
 *     original note before it);
 *   - the notification channel socket when its fd is >= 0;
 *   - the action executor;
 *   - the poll set.
 *
 * NOTE(review): the handling of each `ret` value and the closing braces
 * are on lines missing from this listing.
 */
346 void fini_thread_state(struct notification_thread_state
*state
)
350 if (state
->client_socket_ht
) {
/* Clients are disconnected before their table is destroyed. */
351 ret
= handle_notification_thread_client_disconnect_all(state
);
353 ret
= cds_lfht_destroy(state
->client_socket_ht
, NULL
);
356 if (state
->client_id_ht
) {
357 ret
= cds_lfht_destroy(state
->client_id_ht
, NULL
);
360 if (state
->triggers_ht
) {
/* Triggers are unregistered before their table is destroyed. */
361 ret
= handle_notification_thread_trigger_unregister_all(state
);
363 ret
= cds_lfht_destroy(state
->triggers_ht
, NULL
);
366 if (state
->channel_triggers_ht
) {
367 ret
= cds_lfht_destroy(state
->channel_triggers_ht
, NULL
);
370 if (state
->channel_state_ht
) {
371 ret
= cds_lfht_destroy(state
->channel_state_ht
, NULL
);
374 if (state
->notification_trigger_clients_ht
) {
375 ret
= cds_lfht_destroy(state
->notification_trigger_clients_ht
,
379 if (state
->channels_ht
) {
380 ret
= cds_lfht_destroy(state
->channels_ht
, NULL
);
383 if (state
->sessions_ht
) {
384 ret
= cds_lfht_destroy(state
->sessions_ht
, NULL
);
387 if (state
->triggers_by_name_uid_ht
) {
388 ret
= cds_lfht_destroy(state
->triggers_by_name_uid_ht
, NULL
);
391 if (state
->trigger_tokens_ht
) {
392 ret
= cds_lfht_destroy(state
->trigger_tokens_ht
, NULL
);
396 * Must be destroyed after all channels have been destroyed.
397 * See comment in struct lttng_session_trigger_list.
399 if (state
->session_triggers_ht
) {
400 ret
= cds_lfht_destroy(state
->session_triggers_ht
, NULL
);
/* -1 is the "no socket" value set by init_thread_state(). */
403 if (state
->notification_channel_socket
>= 0) {
404 notification_channel_socket_destroy(
405 state
->notification_channel_socket
);
407 if (state
->executor
) {
408 action_executor_destroy(state
->executor
);
410 lttng_poll_clean(&state
->events
);
414 void mark_thread_as_ready(struct notification_thread_handle
*handle
)
416 DBG("Marking notification thread as ready");
417 sem_post(&handle
->ready
);
421 void wait_until_thread_is_ready(struct notification_thread_handle
*handle
)
423 DBG("Waiting for notification thread to be ready");
424 sem_wait(&handle
->ready
);
425 DBG("Notification thread is ready");
/*
 * Initialise the state owned by the notification thread.
 *
 * Steps visible in this listing:
 *   - zero the state, set notification_channel_socket to -1 and the
 *     trigger token generator to 1, and initialise the poll set;
 *   - create the notification channel socket and store its fd;
 *   - build the poll set with init_poll_set();
 *   - listen on the notification channel socket;
 *   - create eleven hash tables, all with
 *     cds_lfht_new(DEFAULT_HT_SIZE, 1, 0,
 *                  CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
 *   - create the action executor;
 *   - mark the thread as ready.
 *
 * NOTE(review): every error check body, label and return statement is on
 * a line missing from this listing. The trailing fini_thread_state() call
 * presumably belongs to the error path -- confirm against the full file.
 */
429 int init_thread_state(struct notification_thread_handle
*handle
,
430 struct notification_thread_state
*state
)
434 memset(state
, 0, sizeof(*state
));
/* -1 lets fini_thread_state() tell that no socket was created. */
435 state
->notification_channel_socket
= -1;
436 state
->trigger_id
.token_generator
= 1;
437 lttng_poll_init(&state
->events
);
439 ret
= notification_channel_socket_create();
443 state
->notification_channel_socket
= ret
;
445 ret
= init_poll_set(&state
->events
, handle
,
446 state
->notification_channel_socket
);
451 DBG("[notification-thread] Listening on notification channel socket");
452 ret
= lttcomm_listen_unix_sock(state
->notification_channel_socket
);
454 ERR("[notification-thread] Listen failed on notification channel socket");
/* Hash tables; each allocation is followed by a NULL test. */
458 state
->client_socket_ht
= cds_lfht_new(DEFAULT_HT_SIZE
, 1, 0,
459 CDS_LFHT_AUTO_RESIZE
| CDS_LFHT_ACCOUNTING
, NULL
);
460 if (!state
->client_socket_ht
) {
464 state
->client_id_ht
= cds_lfht_new(DEFAULT_HT_SIZE
, 1, 0,
465 CDS_LFHT_AUTO_RESIZE
| CDS_LFHT_ACCOUNTING
, NULL
);
466 if (!state
->client_id_ht
) {
470 state
->channel_triggers_ht
= cds_lfht_new(DEFAULT_HT_SIZE
, 1, 0,
471 CDS_LFHT_AUTO_RESIZE
| CDS_LFHT_ACCOUNTING
, NULL
);
472 if (!state
->channel_triggers_ht
) {
476 state
->session_triggers_ht
= cds_lfht_new(DEFAULT_HT_SIZE
, 1, 0,
477 CDS_LFHT_AUTO_RESIZE
| CDS_LFHT_ACCOUNTING
, NULL
);
478 if (!state
->session_triggers_ht
) {
482 state
->channel_state_ht
= cds_lfht_new(DEFAULT_HT_SIZE
, 1, 0,
483 CDS_LFHT_AUTO_RESIZE
| CDS_LFHT_ACCOUNTING
, NULL
);
484 if (!state
->channel_state_ht
) {
488 state
->notification_trigger_clients_ht
= cds_lfht_new(DEFAULT_HT_SIZE
,
489 1, 0, CDS_LFHT_AUTO_RESIZE
| CDS_LFHT_ACCOUNTING
, NULL
);
490 if (!state
->notification_trigger_clients_ht
) {
494 state
->channels_ht
= cds_lfht_new(DEFAULT_HT_SIZE
,
495 1, 0, CDS_LFHT_AUTO_RESIZE
| CDS_LFHT_ACCOUNTING
, NULL
);
496 if (!state
->channels_ht
) {
499 state
->sessions_ht
= cds_lfht_new(DEFAULT_HT_SIZE
,
500 1, 0, CDS_LFHT_AUTO_RESIZE
| CDS_LFHT_ACCOUNTING
, NULL
);
501 if (!state
->sessions_ht
) {
504 state
->triggers_ht
= cds_lfht_new(DEFAULT_HT_SIZE
,
505 1, 0, CDS_LFHT_AUTO_RESIZE
| CDS_LFHT_ACCOUNTING
, NULL
);
506 if (!state
->triggers_ht
) {
509 state
->triggers_by_name_uid_ht
= cds_lfht_new(DEFAULT_HT_SIZE
,
510 1, 0, CDS_LFHT_AUTO_RESIZE
| CDS_LFHT_ACCOUNTING
, NULL
);
511 if (!state
->triggers_by_name_uid_ht
) {
515 state
->trigger_tokens_ht
= cds_lfht_new(DEFAULT_HT_SIZE
,
516 1, 0, CDS_LFHT_AUTO_RESIZE
| CDS_LFHT_ACCOUNTING
, NULL
);
517 if (!state
->trigger_tokens_ht
) {
520 state
->executor
= action_executor_create(handle
);
521 if (!state
->executor
) {
/* Unblocks a caller waiting in wait_until_thread_is_ready(). */
524 mark_thread_as_ready(handle
);
528 fini_thread_state(state
);
/*
 * Handle activity on one of the consumer channel monitoring pipes.
 *
 * fd: pipe that reported activity.
 * revents: poll events reported for fd.
 * handle: used to map fd to a tracing domain (either UST pipe ->
 *         LTTNG_DOMAIN_UST, kernel pipe -> LTTNG_DOMAIN_KERNEL).
 * state: thread state; fd is removed from state->events when revents
 *        contains LPOLLERR, LPOLLHUP or LPOLLRDHUP.
 *
 * Otherwise handle_notification_thread_channel_sample() is called.
 *
 * NOTE(review): the branch taken when fd matches no pipe, the arguments of
 * the sample call, the checks on `ret` and the return statement are on
 * lines missing from this listing.
 */
533 int handle_channel_monitoring_pipe(int fd
, uint32_t revents
,
534 struct notification_thread_handle
*handle
,
535 struct notification_thread_state
*state
)
538 enum lttng_domain_type domain
;
/* Both user space consumers (32-bit and 64-bit) map to the UST domain. */
540 if (fd
== handle
->channel_monitoring_pipes
.ust32_consumer
||
541 fd
== handle
->channel_monitoring_pipes
.ust64_consumer
) {
542 domain
= LTTNG_DOMAIN_UST
;
543 } else if (fd
== handle
->channel_monitoring_pipes
.kernel_consumer
) {
544 domain
= LTTNG_DOMAIN_KERNEL
;
/* Error or hang-up on the pipe: stop polling it. */
549 if (revents
& (LPOLLERR
| LPOLLHUP
| LPOLLRDHUP
)) {
550 ret
= lttng_poll_del(&state
->events
, fd
);
552 ERR("[notification-thread] Failed to remove consumer monitoring pipe from poll set");
557 ret
= handle_notification_thread_channel_sample(
560 ERR("[notification-thread] Consumer sample handling error occurred");
/*
 * Handle activity on an event trigger source fd.
 *
 * fd: event source that reported activity.
 * domain: tracing domain of that source, forwarded to
 *         handle_notification_thread_event().
 * state: thread state; fd is removed from state->events when `revents`
 *        contains LPOLLERR, LPOLLHUP or LPOLLRDHUP.
 *
 * Before the event is handled, the sessiond_handle_trigger_event_pipe
 * testpoint is evaluated and trigger_consumption_paused is checked.
 *
 * NOTE(review): the parameter line declaring `revents` (listing line 570),
 * the bodies of the testpoint and pause branches, the checks on `ret` and
 * the return statement are missing from this listing.
 */
568 static int handle_trigger_event_pipe(int fd
,
569 enum lttng_domain_type domain
,
571 struct notification_thread_state
*state
)
/* Error or hang-up on the source: stop polling it. */
575 if (revents
& (LPOLLERR
| LPOLLHUP
| LPOLLRDHUP
)) {
576 ret
= lttng_poll_del(&state
->events
, fd
);
578 ERR("[notification-thread] Failed to remove event monitoring pipe from poll set");
583 if (testpoint(sessiond_handle_trigger_event_pipe
)) {
/* NOTE(review): what happens while paused is on lines not visible here. */
588 if (caa_unlikely(trigger_consumption_paused
)) {
589 DBG("Trigger consumption paused, sleeping...");
594 ret
= handle_notification_thread_event(state
, fd
, domain
);
596 ERR("[notification-thread] Event sample handling error occurred for fd: %d", fd
);
606 * Return the event source domain type via parameter.
/*
 * Look for fd among the elements of handle->event_trigger_sources.list.
 *
 * Elements whose fd differs from the requested one are passed over; for a
 * matching element, its domain is written to *domain.
 *
 * NOTE(review): listing lines 611-613, 617-618 and 620 onwards are
 * missing, so any locking of event_trigger_sources.lock, the loop exit and
 * the returned value are not visible. Presumably true is returned when a
 * matching element was found -- confirm against the full file.
 */
608 static bool fd_is_event_source(struct notification_thread_handle
*handle
, int fd
, enum lttng_domain_type
*domain
)
610 struct notification_event_trigger_source_element
*source_element
, *tmp
;
614 cds_list_for_each_entry_safe(source_element
, tmp
,
615 &handle
->event_trigger_sources
.list
, node
) {
/* Not the fd being looked up. */
616 if (source_element
->fd
!= fd
) {
619 *domain
= source_element
->domain
;
626 * This thread services notification channel clients and commands received
627 * from various lttng-sessiond components over a command queue.
/*
 * Entry point of the notification thread.
 *
 * data: the struct notification_thread_handle of this thread.
 *
 * Visible flow:
 *   - register with the health subsystem
 *     (HEALTH_SESSIOND_TYPE_NOTIFICATION) and with RCU;
 *   - initialise the thread state with init_thread_state();
 *   - evaluate the sessiond_thread_notification testpoint;
 *   - wait on the poll set with no timeout, restarting when the wait is
 *     interrupted (errno == EINTR);
 *   - dispatch each ready fd:
 *       notification channel socket -> new client connection on LPOLLIN,
 *           error logged on LPOLLERR/LPOLLHUP/LPOLLRDHUP or on any other
 *           event;
 *       command queue event pipe -> handle_notification_thread_command();
 *       consumer channel monitoring pipes ->
 *           handle_channel_monitoring_pipe();
 *       event trigger sources -> handle_trigger_event_pipe();
 *       anything else is a client socket -> disconnect on
 *           LPOLLERR/LPOLLHUP/LPOLLRDHUP, otherwise client_in on LPOLLIN
 *           and client_out on LPOLLOUT;
 *   - on exit: fini_thread_state(), take the thread offline and
 *     unregister it from RCU, unregister from the health subsystem.
 *
 * NOTE(review): loop constructs, the checks on `ret`, exit labels, call
 * arguments and the return value are on lines missing from this listing.
 */
630 void *thread_notification(void *data
)
633 struct notification_thread_handle
*handle
= data
;
634 struct notification_thread_state state
;
635 enum lttng_domain_type domain
;
637 DBG("[notification-thread] Started notification thread");
639 health_register(health_sessiond
, HEALTH_SESSIOND_TYPE_NOTIFICATION
);
640 rcu_register_thread();
/* Presumably logged when handle is NULL; the guard is not visible. */
644 ERR("[notification-thread] Invalid thread context provided");
648 health_code_update();
650 ret
= init_thread_state(handle
, &state
);
655 if (testpoint(sessiond_thread_notification
)) {
663 DBG("[notification-thread] Entering poll wait");
/* -1: block until at least one fd is ready. */
664 ret
= lttng_poll_wait(&state
.events
, -1);
665 DBG("[notification-thread] Poll wait returned (%i)", ret
);
669 * Restart interrupted system call.
671 if (errno
== EINTR
) {
674 ERR("[notification-thread] Error encountered during lttng_poll_wait (%i)", ret
);
679 for (i
= 0; i
< fd_count
; i
++) {
680 int fd
= LTTNG_POLL_GETFD(&state
.events
, i
);
681 uint32_t revents
= LTTNG_POLL_GETEV(&state
.events
, i
);
683 DBG("[notification-thread] Handling fd (%i) activity (%u)", fd
, revents
);
/* Listening socket: a client is connecting. */
685 if (fd
== state
.notification_channel_socket
) {
686 if (revents
& LPOLLIN
) {
687 ret
= handle_notification_thread_client_connect(
693 (LPOLLERR
| LPOLLHUP
| LPOLLRDHUP
)) {
694 ERR("[notification-thread] Notification socket poll error");
697 ERR("[notification-thread] Unexpected poll events %u for notification socket %i", revents
, fd
);
/* Internal sessiond command queued for this thread. */
700 } else if (fd
== lttng_pipe_get_readfd(handle
->cmd_queue
.event_pipe
)) {
701 ret
= handle_notification_thread_command(handle
,
704 DBG("[notification-thread] Error encountered while servicing command queue");
706 } else if (ret
> 0) {
/* Channel sample coming from one of the consumer daemons. */
709 } else if (fd
== handle
->channel_monitoring_pipes
.ust32_consumer
||
710 fd
== handle
->channel_monitoring_pipes
.ust64_consumer
||
711 fd
== handle
->channel_monitoring_pipes
.kernel_consumer
) {
712 ret
= handle_channel_monitoring_pipe(fd
,
713 revents
, handle
, &state
);
/* fd_is_event_source() fills `domain` for the call below. */
717 } else if (fd_is_event_source(handle
, fd
, &domain
)) {
718 ret
= handle_trigger_event_pipe(fd
, domain
, revents
, &state
);
723 /* Activity on a client's socket. */
724 if (revents
& (LPOLLERR
| LPOLLHUP
| LPOLLRDHUP
)) {
726 * It doesn't matter if a command was
727 * pending on the client socket at this
728 * point since it now has no way to
729 * receive the notifications to which
730 * it was subscribing or unsubscribing.
732 ret
= handle_notification_thread_client_disconnect(
738 if (revents
& LPOLLIN
) {
739 ret
= handle_notification_thread_client_in(
746 if (revents
& LPOLLOUT
) {
747 ret
= handle_notification_thread_client_out(
/* Teardown. */
759 fini_thread_state(&state
);
761 rcu_thread_offline();
762 rcu_unregister_thread();
763 health_unregister(health_sessiond
);
/*
 * Shutdown callback passed to lttng_thread_create() by
 * launch_notification_thread(): asks the notification thread to quit by
 * issuing notification_thread_command_quit() on its handle.
 *
 * thread_data: the struct notification_thread_handle of the thread.
 *
 * NOTE(review): the return statement is on a line missing from this
 * listing, so the returned value is not documented here.
 */
768 bool shutdown_notification_thread(void *thread_data
)
770 struct notification_thread_handle
*handle
= thread_data
;
772 notification_thread_command_quit(handle
);
776 struct lttng_thread
*launch_notification_thread(
777 struct notification_thread_handle
*handle
)
779 struct lttng_thread
*thread
;
781 thread
= lttng_thread_create("Notification",
783 shutdown_notification_thread
,
791 * Wait for the thread to be marked as "ready" before returning
792 * as other subsystems depend on the notification subsystem
793 * (e.g. rotation thread).
795 wait_until_thread_is_ready(handle
);