2 * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 #include <lttng/trigger/trigger.h>
20 #include <lttng/notification/channel-internal.h>
21 #include <lttng/notification/notification-internal.h>
22 #include <lttng/condition/condition-internal.h>
23 #include <lttng/condition/buffer-usage-internal.h>
24 #include <common/error.h>
25 #include <common/config/session-config.h>
26 #include <common/defaults.h>
27 #include <common/utils.h>
28 #include <common/futex.h>
29 #include <common/align.h>
30 #include <common/time.h>
31 #include <sys/eventfd.h>
36 #include "notification-thread.h"
37 #include "notification-thread-events.h"
38 #include "notification-thread-commands.h"
39 #include "lttng-sessiond.h"
40 #include "health-sessiond.h"
43 #include <urcu/list.h>
44 #include <urcu/rculfhash.h>
47 * This thread maintains an internal state associating clients and triggers.
49 * In order to speed-up and simplify queries, hash tables providing the
50 * following associations are maintained:
52 * - client_socket_ht: associate a client's socket (fd) to its "struct client"
53 * This hash table owns the "struct client" which must thus be
54 * disposed-of on removal from the hash table.
56 * - channel_triggers_ht:
57 * associates a channel key to a list of
58 * struct lttng_trigger_list_nodes. The triggers in this list are
59 * those that have conditions that apply to this channel.
60 * This hash table owns the list, but not the triggers themselves.
63 * associates a pair (channel key, channel domain) to its last
64 * sampled state received from the consumer daemon
65 * (struct channel_state).
66 * This previous sample is kept to implement edge-triggered
67 * conditions as we need to detect the state transitions.
68 * This hash table owns the channel state.
70 * - notification_trigger_clients_ht:
71 * associates notification-emitting triggers to clients
72 * (struct notification_client_ht_node) subscribed to those
74 * The condition's hash and match functions are used directly since
75 * all triggers in this hash table have the "notify" action.
76 * This hash table holds no ownership.
79 * associates a channel_key to a struct channel_info. The hash table
80 * holds the ownership of the struct channel_info.
83 * associated a condition to a struct lttng_trigger_ht_element.
84 * The hash table holds the ownership of the
85 * lttng_trigger_ht_elements along with the triggers themselves.
87 * The thread reacts to the following internal events:
88 * 1) creation of a tracing channel,
89 * 2) destruction of a tracing channel,
90 * 3) registration of a trigger,
91 * 4) unregistration of a trigger,
92 * 5) reception of a channel monitor sample from the consumer daemon.
94 * Events specific to notification-emitting triggers:
95 * 6) connection of a notification client,
96 * 7) disconnection of a notification client,
97 * 8) subscription of a client to a conditions' notifications,
98 * 9) unsubscription of a client from a conditions' notifications,
101 * 1) Creation of a tracing channel
102 * - notification_trigger_clients_ht is traversed to identify
103 * triggers which apply to this new channel,
104 * - triggers identified are added to the channel_triggers_ht.
105 * - add channel to channels_ht
107 * 2) Destruction of a tracing channel
108 * - remove entry from channel_triggers_ht, releasing the list wrapper and
110 * - remove entry from the channel_state_ht.
111 * - remove channel from channels_ht
113 * 3) Registration of a trigger
114 * - if the trigger's action is of type "notify",
115 * - traverse the list of conditions of every client to build a list of
116 * clients which have to be notified when this trigger's condition is met,
117 * - add list of clients (even if it is empty) to the
118 * notification_trigger_clients_ht,
119 * - add trigger to channel_triggers_ht (if applicable),
120 * - add trigger to triggers_ht
122 * 4) Unregistration of a trigger
123 * - if the trigger's action is of type "notify",
124 * - remove the trigger from the notification_trigger_clients_ht,
125 * - remove trigger from channel_triggers_ht (if applicable),
126 * - remove trigger from triggers_ht
128 * 5) Reception of a channel monitor sample from the consumer daemon
129 * - evaluate the conditions associated with the triggers found in
130 * the channel_triggers_ht,
131 * - if a condition evaluates to "true" and the condition is of type
132 * "notify", query the notification_trigger_clients_ht and send
133 * a notification to the clients.
135 * 6) Connection of a client
136 * - add client socket to the client_socket_ht.
138 * 7) Disconnection of a client
139 * - remove client socket from the client_socket_ht,
140 * - traverse all conditions to which the client is subscribed and remove
141 * the client from the notification_trigger_clients_ht.
143 * 8) Subscription of a client to a condition's notifications
144 * - Add the condition to the client's list of subscribed conditions,
145 * - Look-up notification_trigger_clients_ht and add the client to
148 * 9) Unsubscription of a client to a condition's notifications
149 * - Remove the condition from the client's list of subscribed conditions,
150 * - Look-up notification_trigger_clients_ht and remove the client
151 * from the list of clients.
155 * Destroy the thread data previously created by the init function.
157 void notification_thread_handle_destroy(
158 struct notification_thread_handle
*handle
)
161 struct notification_thread_command
*cmd
, *tmp
;
167 if (handle
->cmd_queue
.event_fd
< 0) {
170 ret
= close(handle
->cmd_queue
.event_fd
);
172 PERROR("close notification command queue event_fd");
175 pthread_mutex_lock(&handle
->cmd_queue
.lock
);
176 /* Purge queue of in-flight commands and mark them as cancelled. */
177 cds_list_for_each_entry_safe(cmd
, tmp
, &handle
->cmd_queue
.list
,
179 cds_list_del(&cmd
->cmd_list_node
);
180 cmd
->reply_code
= LTTNG_ERR_COMMAND_CANCELLED
;
181 futex_nto1_wake(&cmd
->reply_futex
);
183 pthread_mutex_unlock(&handle
->cmd_queue
.lock
);
184 pthread_mutex_destroy(&handle
->cmd_queue
.lock
);
186 if (handle
->channel_monitoring_pipes
.ust32_consumer
>= 0) {
187 ret
= close(handle
->channel_monitoring_pipes
.ust32_consumer
);
189 PERROR("close 32-bit consumer channel monitoring pipe");
192 if (handle
->channel_monitoring_pipes
.ust64_consumer
>= 0) {
193 ret
= close(handle
->channel_monitoring_pipes
.ust64_consumer
);
195 PERROR("close 64-bit consumer channel monitoring pipe");
198 if (handle
->channel_monitoring_pipes
.kernel_consumer
>= 0) {
199 ret
= close(handle
->channel_monitoring_pipes
.kernel_consumer
);
201 PERROR("close kernel consumer channel monitoring pipe");
208 struct notification_thread_handle
*notification_thread_handle_create(
209 struct lttng_pipe
*ust32_channel_monitor_pipe
,
210 struct lttng_pipe
*ust64_channel_monitor_pipe
,
211 struct lttng_pipe
*kernel_channel_monitor_pipe
)
214 struct notification_thread_handle
*handle
;
216 handle
= zmalloc(sizeof(*handle
));
221 /* FIXME Replace eventfd by a pipe to support older kernels. */
222 handle
->cmd_queue
.event_fd
= eventfd(0, EFD_CLOEXEC
);
223 if (handle
->cmd_queue
.event_fd
< 0) {
224 PERROR("eventfd notification command queue");
227 CDS_INIT_LIST_HEAD(&handle
->cmd_queue
.list
);
228 ret
= pthread_mutex_init(&handle
->cmd_queue
.lock
, NULL
);
233 if (ust32_channel_monitor_pipe
) {
234 handle
->channel_monitoring_pipes
.ust32_consumer
=
235 lttng_pipe_release_readfd(
236 ust32_channel_monitor_pipe
);
237 if (handle
->channel_monitoring_pipes
.ust32_consumer
< 0) {
241 handle
->channel_monitoring_pipes
.ust32_consumer
= -1;
243 if (ust64_channel_monitor_pipe
) {
244 handle
->channel_monitoring_pipes
.ust64_consumer
=
245 lttng_pipe_release_readfd(
246 ust64_channel_monitor_pipe
);
247 if (handle
->channel_monitoring_pipes
.ust64_consumer
< 0) {
251 handle
->channel_monitoring_pipes
.ust64_consumer
= -1;
253 if (kernel_channel_monitor_pipe
) {
254 handle
->channel_monitoring_pipes
.kernel_consumer
=
255 lttng_pipe_release_readfd(
256 kernel_channel_monitor_pipe
);
257 if (handle
->channel_monitoring_pipes
.kernel_consumer
< 0) {
261 handle
->channel_monitoring_pipes
.kernel_consumer
= -1;
266 notification_thread_handle_destroy(handle
);
271 char *get_notification_channel_sock_path(void)
274 bool is_root
= !getuid();
277 sock_path
= zmalloc(LTTNG_PATH_MAX
);
283 ret
= snprintf(sock_path
, LTTNG_PATH_MAX
,
284 DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK
);
289 char *home_path
= utils_get_home_dir();
292 ERR("Can't get HOME directory for socket creation");
296 ret
= snprintf(sock_path
, LTTNG_PATH_MAX
,
297 DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK
,
311 void notification_channel_socket_destroy(int fd
)
314 char *sock_path
= get_notification_channel_sock_path();
316 DBG("[notification-thread] Destroying notification channel socket");
319 ret
= unlink(sock_path
);
322 PERROR("unlink notification channel socket");
328 PERROR("close notification channel socket");
333 int notification_channel_socket_create(void)
336 char *sock_path
= get_notification_channel_sock_path();
338 DBG("[notification-thread] Creating notification channel UNIX socket at %s",
341 ret
= lttcomm_create_unix_sock(sock_path
);
343 ERR("[notification-thread] Failed to create notification socket");
348 ret
= chmod(sock_path
, S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IWGRP
);
350 ERR("Set file permissions failed: %s", sock_path
);
351 PERROR("chmod notification channel socket");
356 ret
= chown(sock_path
, 0,
357 utils_get_group_id(tracing_group_name
));
359 ERR("Failed to set the notification channel socket's group");
365 DBG("[notification-thread] Notification channel UNIX socket created (fd = %i)",
370 if (fd
>= 0 && close(fd
) < 0) {
371 PERROR("close notification channel socket");
378 int init_poll_set(struct lttng_poll_event
*poll_set
,
379 struct notification_thread_handle
*handle
,
380 int notification_channel_socket
)
385 * Create pollset with size 5:
386 * - notification channel socket (listen for new connections),
387 * - command queue event fd (internal sessiond commands),
388 * - consumerd (32-bit user space) channel monitor pipe,
389 * - consumerd (64-bit user space) channel monitor pipe,
390 * - consumerd (kernel) channel monitor pipe.
392 ret
= lttng_poll_create(poll_set
, 5, LTTNG_CLOEXEC
);
397 ret
= lttng_poll_add(poll_set
, notification_channel_socket
,
398 LPOLLIN
| LPOLLERR
| LPOLLHUP
| LPOLLRDHUP
);
400 ERR("[notification-thread] Failed to add notification channel socket to pollset");
403 ret
= lttng_poll_add(poll_set
, handle
->cmd_queue
.event_fd
,
406 ERR("[notification-thread] Failed to add notification command queue event fd to pollset");
409 ret
= lttng_poll_add(poll_set
,
410 handle
->channel_monitoring_pipes
.ust32_consumer
,
413 ERR("[notification-thread] Failed to add ust-32 channel monitoring pipe fd to pollset");
416 ret
= lttng_poll_add(poll_set
,
417 handle
->channel_monitoring_pipes
.ust64_consumer
,
420 ERR("[notification-thread] Failed to add ust-64 channel monitoring pipe fd to pollset");
423 if (handle
->channel_monitoring_pipes
.kernel_consumer
< 0) {
426 ret
= lttng_poll_add(poll_set
,
427 handle
->channel_monitoring_pipes
.kernel_consumer
,
430 ERR("[notification-thread] Failed to add kernel channel monitoring pipe fd to pollset");
436 lttng_poll_clean(poll_set
);
441 void fini_thread_state(struct notification_thread_state
*state
)
445 if (state
->client_socket_ht
) {
446 ret
= handle_notification_thread_client_disconnect_all(state
);
448 ret
= cds_lfht_destroy(state
->client_socket_ht
, NULL
);
451 if (state
->triggers_ht
) {
452 ret
= handle_notification_thread_trigger_unregister_all(state
);
454 ret
= cds_lfht_destroy(state
->triggers_ht
, NULL
);
457 if (state
->channel_triggers_ht
) {
458 ret
= cds_lfht_destroy(state
->channel_triggers_ht
, NULL
);
461 if (state
->channel_state_ht
) {
462 ret
= cds_lfht_destroy(state
->channel_state_ht
, NULL
);
465 if (state
->notification_trigger_clients_ht
) {
466 ret
= cds_lfht_destroy(state
->notification_trigger_clients_ht
,
470 if (state
->channels_ht
) {
471 ret
= cds_lfht_destroy(state
->channels_ht
,
476 if (state
->notification_channel_socket
>= 0) {
477 notification_channel_socket_destroy(
478 state
->notification_channel_socket
);
480 lttng_poll_clean(&state
->events
);
484 int init_thread_state(struct notification_thread_handle
*handle
,
485 struct notification_thread_state
*state
)
489 memset(state
, 0, sizeof(*state
));
490 state
->notification_channel_socket
= -1;
491 lttng_poll_init(&state
->events
);
493 ret
= notification_channel_socket_create();
497 state
->notification_channel_socket
= ret
;
499 ret
= init_poll_set(&state
->events
, handle
,
500 state
->notification_channel_socket
);
505 DBG("[notification-thread] Listening on notification channel socket");
506 ret
= lttcomm_listen_unix_sock(state
->notification_channel_socket
);
508 ERR("[notification-thread] Listen failed on notification channel socket");
512 state
->client_socket_ht
= cds_lfht_new(DEFAULT_HT_SIZE
, 1, 0,
513 CDS_LFHT_AUTO_RESIZE
| CDS_LFHT_ACCOUNTING
, NULL
);
514 if (!state
->client_socket_ht
) {
518 state
->channel_triggers_ht
= cds_lfht_new(DEFAULT_HT_SIZE
, 1, 0,
519 CDS_LFHT_AUTO_RESIZE
| CDS_LFHT_ACCOUNTING
, NULL
);
520 if (!state
->channel_triggers_ht
) {
524 state
->channel_state_ht
= cds_lfht_new(DEFAULT_HT_SIZE
, 1, 0,
525 CDS_LFHT_AUTO_RESIZE
| CDS_LFHT_ACCOUNTING
, NULL
);
526 if (!state
->channel_state_ht
) {
530 state
->notification_trigger_clients_ht
= cds_lfht_new(DEFAULT_HT_SIZE
,
531 1, 0, CDS_LFHT_AUTO_RESIZE
| CDS_LFHT_ACCOUNTING
, NULL
);
532 if (!state
->notification_trigger_clients_ht
) {
536 state
->channels_ht
= cds_lfht_new(DEFAULT_HT_SIZE
,
537 1, 0, CDS_LFHT_AUTO_RESIZE
| CDS_LFHT_ACCOUNTING
, NULL
);
538 if (!state
->channels_ht
) {
542 state
->triggers_ht
= cds_lfht_new(DEFAULT_HT_SIZE
,
543 1, 0, CDS_LFHT_AUTO_RESIZE
| CDS_LFHT_ACCOUNTING
, NULL
);
544 if (!state
->triggers_ht
) {
550 fini_thread_state(state
);
555 int handle_channel_monitoring_pipe(int fd
, uint32_t revents
,
556 struct notification_thread_handle
*handle
,
557 struct notification_thread_state
*state
)
560 enum lttng_domain_type domain
;
562 if (fd
== handle
->channel_monitoring_pipes
.ust32_consumer
||
563 fd
== handle
->channel_monitoring_pipes
.ust64_consumer
) {
564 domain
= LTTNG_DOMAIN_UST
;
565 } else if (fd
== handle
->channel_monitoring_pipes
.kernel_consumer
) {
566 domain
= LTTNG_DOMAIN_KERNEL
;
571 if (revents
& (LPOLLERR
| LPOLLHUP
| LPOLLRDHUP
)) {
572 ret
= lttng_poll_del(&state
->events
, fd
);
574 ERR("[notification-thread] Failed to remove consumer monitoring pipe from poll set");
579 ret
= handle_notification_thread_channel_sample(
582 ERR("[notification-thread] Consumer sample handling error occured");
591 * This thread services notification channel clients and commands received
592 * from various lttng-sessiond components over a command queue.
594 void *thread_notification(void *data
)
597 struct notification_thread_handle
*handle
= data
;
598 struct notification_thread_state state
;
600 DBG("[notification-thread] Started notification thread");
603 ERR("[notification-thread] Invalid thread context provided");
607 rcu_register_thread();
610 health_register(health_sessiond
, HEALTH_SESSIOND_TYPE_NOTIFICATION
);
611 health_code_update();
613 ret
= init_thread_state(handle
, &state
);
618 /* Ready to handle client connections. */
619 sessiond_notify_ready();
625 DBG("[notification-thread] Entering poll wait");
626 ret
= lttng_poll_wait(&state
.events
, -1);
627 DBG("[notification-thread] Poll wait returned (%i)", ret
);
631 * Restart interrupted system call.
633 if (errno
== EINTR
) {
636 ERR("[notification-thread] Error encountered during lttng_poll_wait (%i)", ret
);
641 for (i
= 0; i
< fd_count
; i
++) {
642 int fd
= LTTNG_POLL_GETFD(&state
.events
, i
);
643 uint32_t revents
= LTTNG_POLL_GETEV(&state
.events
, i
);
645 DBG("[notification-thread] Handling fd (%i) activity (%u)", fd
, revents
);
647 if (fd
== state
.notification_channel_socket
) {
648 if (revents
& LPOLLIN
) {
649 ret
= handle_notification_thread_client_connect(
655 (LPOLLERR
| LPOLLHUP
| LPOLLRDHUP
)) {
656 ERR("[notification-thread] Notification socket poll error");
659 ERR("[notification-thread] Unexpected poll events %u for notification socket %i", revents
, fd
);
662 } else if (fd
== handle
->cmd_queue
.event_fd
) {
663 ret
= handle_notification_thread_command(handle
,
666 DBG("[notification-thread] Error encountered while servicing command queue");
668 } else if (ret
> 0) {
671 } else if (fd
== handle
->channel_monitoring_pipes
.ust32_consumer
||
672 fd
== handle
->channel_monitoring_pipes
.ust64_consumer
||
673 fd
== handle
->channel_monitoring_pipes
.kernel_consumer
) {
674 ret
= handle_channel_monitoring_pipe(fd
,
675 revents
, handle
, &state
);
680 /* Activity on a client's socket. */
681 if (revents
& (LPOLLERR
| LPOLLHUP
| LPOLLRDHUP
)) {
683 * It doesn't matter if a command was
684 * pending on the client socket at this
685 * point since it now has no way to
686 * receive the notifications to which
687 * it was subscribing or unsubscribing.
689 ret
= handle_notification_thread_client_disconnect(
695 if (revents
& LPOLLIN
) {
696 ret
= handle_notification_thread_client_in(
703 if (revents
& LPOLLOUT
) {
704 ret
= handle_notification_thread_client_out(
716 fini_thread_state(&state
);
717 health_unregister(health_sessiond
);
718 rcu_thread_offline();
719 rcu_unregister_thread();