2 * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 #include <lttng/trigger/trigger.h>
20 #include <lttng/notification/channel-internal.h>
21 #include <lttng/notification/notification-internal.h>
22 #include <lttng/condition/condition-internal.h>
23 #include <lttng/condition/buffer-usage-internal.h>
24 #include <common/error.h>
25 #include <common/config/session-config.h>
26 #include <common/defaults.h>
27 #include <common/utils.h>
28 #include <common/align.h>
29 #include <common/time.h>
30 #include <sys/eventfd.h>
35 #include "notification-thread.h"
36 #include "lttng-sessiond.h"
37 #include "health-sessiond.h"
39 #include <urcu/list.h>
41 #define CLIENT_RECEPTION_BUFFER_SIZE (4 * PAGE_SIZE)
42 #define SIMULATION_TIMER_INTERVAL_NS 2 * NSEC_PER_SEC
43 #define SIMULATION_TIMER_SIGNAL SIGRTMIN + 10
45 static int simulation_timer_event_fd
= -1;
46 static timer_t simulation_timer
;
50 struct cds_list_head list_node
;
52 * Conditions to which the client is registered.
54 struct cds_list_head condition_list
;
57 static struct cds_list_head client_list
;
58 static struct cds_list_head trigger_list
;
59 static char *client_reception_buffer
;
62 * The simulation timer will alternate between "buffers" between full and
63 * empty values, firing all low/high usage triggers in alternance.
65 static pthread_mutex_t simulation_lock
= PTHREAD_MUTEX_INITIALIZER
;
66 static uint64_t simulation_buffer_use_bytes
;
67 static double simulation_buffer_use_ratio
= 0.0;
68 static uint64_t simulation_buffer_capacity
= UINT32_MAX
;
71 * Destroy the thread data previously created by the init function.
73 void notification_destroy_data(struct notification_thread_data
*data
)
81 if (data
->cmd_queue
.event_fd
< 0) {
84 ret
= close(data
->cmd_queue
.event_fd
);
86 PERROR("close notification command queue event_fd");
89 pthread_mutex_destroy(&data
->cmd_queue
.lock
);
90 /* TODO: purge queue and mark commands as cancelled. */
96 * Initialize the thread's data. This MUST be called before the notification
99 struct notification_thread_data
*notification_init_data(void)
102 struct notification_thread_data
*data
;
104 data
= zmalloc(sizeof(*data
));
109 data
->cmd_queue
.event_fd
= eventfd(0, EFD_CLOEXEC
);
110 if (data
->cmd_queue
.event_fd
< 0) {
111 PERROR("eventfd notification command queue");
114 CDS_INIT_LIST_HEAD(&data
->cmd_queue
.list
);
115 ret
= pthread_mutex_init(&data
->cmd_queue
.lock
, NULL
);
122 notification_destroy_data(data
);
127 void simulation_timer_thread(union sigval val
)
130 uint64_t counter
= 1;
132 pthread_mutex_lock(&simulation_lock
);
133 if (simulation_buffer_use_bytes
== 0) {
134 simulation_buffer_use_bytes
= UINT32_MAX
;
135 simulation_buffer_use_ratio
= 1.0;
137 simulation_buffer_use_bytes
= 0;
138 simulation_buffer_use_ratio
= 0.0;
140 pthread_mutex_unlock(&simulation_lock
);
141 ret
= write(simulation_timer_event_fd
, &counter
, sizeof(counter
));
143 PERROR("writer simulation timer event fd");
148 int simulation_timer_start(void)
152 struct itimerspec its
;
154 ret
= eventfd(0, EFD_CLOEXEC
);
156 PERROR("eventfd simulation timer event fd");
159 simulation_timer_event_fd
= ret
;
161 sev
.sigev_notify
= SIGEV_THREAD
;
162 sev
.sigev_value
.sival_ptr
= NULL
;
163 sev
.sigev_notify_function
= simulation_timer_thread
;
164 sev
.sigev_notify_attributes
= NULL
;
167 * Valgrind indicates a leak when timer_create() is used
168 * in the "SIGEV_THREAD" mode. This bug has been known to upstream glibc
169 * since 2009, but no fix has been implemented so far.
171 ret
= timer_create(CLOCK_MONOTONIC
, &sev
, &simulation_timer
);
173 PERROR("timer_create simulation timer");
177 its
.it_value
.tv_sec
= SIMULATION_TIMER_INTERVAL_NS
/ NSEC_PER_SEC
;
178 its
.it_value
.tv_nsec
= (SIMULATION_TIMER_INTERVAL_NS
% NSEC_PER_SEC
);
179 its
.it_interval
.tv_sec
= its
.it_value
.tv_sec
;
180 its
.it_interval
.tv_nsec
= its
.it_value
.tv_nsec
;
182 ret
= timer_settime(simulation_timer
, 0, &its
, NULL
);
184 PERROR("timer_settime simulation timer");
194 void simulation_timer_stop(void)
198 ret
= timer_delete(simulation_timer
);
200 PERROR("timer_delete simulation timer");
205 char *get_notification_channel_sock_path(void)
208 bool is_root
= !getuid();
211 sock_path
= zmalloc(LTTNG_PATH_MAX
);
217 ret
= snprintf(sock_path
, LTTNG_PATH_MAX
,
218 DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK
);
223 char *home_path
= utils_get_home_dir();
226 ERR("Can't get HOME directory for socket creation");
230 ret
= snprintf(sock_path
, LTTNG_PATH_MAX
,
231 DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK
,
245 void notification_channel_socket_destroy(int fd
)
248 char *sock_path
= get_notification_channel_sock_path();
250 DBG("[notification-thread] Destroying notification channel socket");
253 ret
= unlink(sock_path
);
256 PERROR("unlink notification channel socket");
262 PERROR("close notification channel socket");
267 int notification_channel_socket_create(void)
270 char *sock_path
= get_notification_channel_sock_path();
272 DBG("[notification-thread] Creating notification channel UNIX socket at %s",
275 ret
= lttcomm_create_unix_sock(sock_path
);
277 ERR("[notification-thread] Failed to create notification socket");
282 ret
= chmod(sock_path
, S_IRUSR
| S_IWUSR
| S_IRGRP
| S_IWGRP
);
284 ERR("Set file permissions failed: %s", sock_path
);
285 PERROR("chmod notification channel socket");
289 DBG("[notification-thread] Notification channel UNIX socket created (fd = %i)",
294 if (fd
>= 0 && close(fd
) < 0) {
295 PERROR("close notification channel socket");
302 int handle_new_connection(int socket
)
305 struct client
*client
;
307 DBG("[notification-thread] Handling new notification channel client connection");
309 client
= zmalloc(sizeof(*client
));
314 ret
= lttcomm_accept_unix_sock(socket
);
316 ERR("[notification-thread] Failed to accept new notification channel client connection");
320 client
->socket
= ret
;
321 CDS_INIT_LIST_HEAD(&client
->condition_list
);
323 /* FIXME handle creds. */
324 ret
= lttcomm_setsockopt_creds_unix_sock(socket
);
326 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
330 cds_list_add(&client
->list_node
, &client_list
);
331 return client
->socket
;
338 int send_command_reply(int socket
,
339 enum lttng_notification_channel_status status
)
342 struct lttng_notification_channel_command_reply reply
= {
343 .status
= (int8_t) status
,
346 DBG("[notification-thread] Send command reply (%i)", (int) status
);
348 ret
= lttcomm_send_unix_sock(socket
, &reply
, sizeof(reply
));
350 ERR("[notification-thread] Failed to send command reply");
359 struct client
*get_client_from_fd(int fd
)
361 struct client
*client
;
363 cds_list_for_each_entry(client
, &client_list
, list_node
) {
364 if (client
->socket
== fd
) {
372 int handle_notification_channel_client(int socket
)
376 struct client
*client
= get_client_from_fd(socket
);
377 struct lttng_condition
*condition
;
378 struct lttng_notification_channel_command command
;
379 enum lttng_notification_channel_status status
=
380 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
381 struct lttng_trigger
*trigger
;
385 /* Receive command header. */
388 ret
= lttcomm_recv_unix_sock(socket
, ((char *) &command
) + received
,
389 sizeof(command
) - received
);
391 ERR("[notification-thread] Failed to receive channel command from client (received %zu bytes)", received
);
395 } while (received
< sizeof(command
));
398 if (command
.size
>= CLIENT_RECEPTION_BUFFER_SIZE
) {
399 ERR("[notification-thread] Notification channel client attempted to send condition larger (%u bytes) than client reception buffer (%u bytes)",
401 (unsigned int) CLIENT_RECEPTION_BUFFER_SIZE
);
407 ret
= lttcomm_recv_unix_sock(socket
,
408 client_reception_buffer
+ received
,
409 command
.size
- received
);
411 ERR("[notification-thread] Failed to receive condition from client");
415 } while (received
< sizeof(command
));
417 ret
= lttng_condition_create_from_buffer(client_reception_buffer
,
419 if (ret
< 0 || ret
< command
.size
) {
420 ERR("[notification-thread] Malformed condition received from client");
424 DBG("[notification-thread] Successfully received condition from notification channel client");
427 * A client may only listen for a condition that is currently associated
428 * with a trigger known to the system.
430 DBG("[notification-thread] Comparing registered condition to known trigger conditions");
431 cds_list_for_each_entry(trigger
, &trigger_list
, list_node
) {
432 struct lttng_condition
*trigger_condition
=
433 lttng_trigger_get_condition(trigger
);
435 if (!trigger_condition
) {
436 ERR("[notification-thread] lttng_trigger_get_condition returned NULL");
437 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ERROR
;
441 if (lttng_condition_is_equal(trigger_condition
, condition
)) {
442 /* Matching condition found. */
443 DBG("[notification-thread] Found a matching condition, accepting client subscription request");
444 cds_list_add(&condition
->list_node
,
445 &client
->condition_list
);
450 /* No match found, refuse the subscription. */
451 DBG("[notification-thread] No matching condition found, refusing client subscription request");
452 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN
;
454 if (send_command_reply(socket
, status
)) {
463 void client_destroy(struct client
*client
)
465 struct lttng_condition
*condition
, *tmp
;
467 cds_list_for_each_entry_safe(condition
, tmp
, &client
->condition_list
,
469 cds_list_del(&condition
->list_node
);
470 lttng_condition_destroy(condition
);
473 (void) lttcomm_close_unix_sock(client
->socket
);
478 void clean_up_notification_channel_client(int socket
)
480 struct client
*client
;
482 DBG("[notification-thread] Searching for client data for clean-up");
483 cds_list_for_each_entry(client
, &client_list
, list_node
) {
484 if (client
->socket
== socket
) {
485 DBG("[notification-thread] Client data found for clean-up");
486 cds_list_del(&client
->list_node
);
487 client_destroy(client
);
491 ERR("[notification-thread] Failed to clean-up client data");
495 void activate_triggers(struct cds_list_head
*new_triggers_list
)
497 struct lttng_trigger
*trigger
, *tmp
;
499 DBG("[notification-thread] Moving triggers from new list to activated trigger set");
500 cds_list_for_each_entry_safe(trigger
, tmp
, new_triggers_list
, list_node
) {
501 cds_list_del(&trigger
->list_node
);
502 cds_list_add(&trigger
->list_node
, &trigger_list
);
507 void clean_up_triggers(void)
509 struct lttng_trigger
*trigger
, *tmp
;
511 DBG("[notification-thread] Cleaning up triggers");
512 cds_list_for_each_entry_safe(trigger
, tmp
, &trigger_list
, list_node
) {
513 DBG("[notification-thread] Destroying trigger");
514 cds_list_del(&trigger
->list_node
);
515 lttng_trigger_destroy(trigger
);
520 struct lttng_evaluation
*evaluate_buffer_usage_condition(
521 struct lttng_condition
*_condition
)
524 struct lttng_evaluation
*evaluation
= NULL
;
525 struct lttng_condition_buffer_usage
*condition
= container_of(
526 _condition
, struct lttng_condition_buffer_usage
,
529 if (condition
->threshold_bytes
.set
) {
530 threshold
= condition
->threshold_bytes
.value
;
532 /* Threshold was expressed as a ratio. */
533 threshold
= (uint64_t) (condition
->threshold_ratio
.value
*
534 (double) simulation_buffer_capacity
);
537 if (condition
->parent
.type
==
538 LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
) {
539 if (simulation_buffer_use_bytes
<= threshold
) {
540 evaluation
= lttng_evaluation_buffer_usage_create(
541 condition
->parent
.type
,
542 simulation_buffer_use_bytes
,
543 simulation_buffer_capacity
);
546 if (simulation_buffer_use_bytes
>= threshold
) {
547 evaluation
= lttng_evaluation_buffer_usage_create(
548 condition
->parent
.type
,
549 simulation_buffer_use_bytes
,
550 simulation_buffer_capacity
);
557 void notify_client(struct client
*client
, struct lttng_condition
*condition
,
558 struct lttng_evaluation
*evaluation
)
560 ssize_t notification_size
, ret
;
561 char *notification_buffer
;
562 struct lttng_notification
*notification
;
564 notification
= lttng_notification_create(condition
, evaluation
);
566 ERR("[notification-thread] Failed to create client notification");
570 notification_size
= lttng_notification_serialize(notification
, NULL
);
571 if (notification_size
< 0) {
572 ERR("[notification-thread] Failed to get size of serialized notification");
576 notification_buffer
= zmalloc(notification_size
);
577 if (!notification_buffer
) {
578 ERR("[notification-thread] Failed to allocate notification serialization buffer");
581 ret
= lttng_notification_serialize(notification
, notification_buffer
);
582 if (ret
!= notification_size
) {
583 ERR("[notification-thread] Failed to serialize notification");
587 ret
= lttcomm_send_unix_sock(client
->socket
, notification_buffer
,
590 ERR("[notification-thread] Failed to send notification to client");
596 void evaluate_client_conditions(void)
598 struct client
*client
;
600 DBG("[notification-thread] Evaluating client conditions");
601 cds_list_for_each_entry(client
, &client_list
, list_node
) {
602 struct lttng_condition
*condition
;
603 cds_list_for_each_entry(condition
, &client
->condition_list
,
605 struct lttng_evaluation
*evaluation
= NULL
;
606 switch (lttng_condition_get_type(condition
)) {
607 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
608 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
609 evaluation
= evaluate_buffer_usage_condition(
613 ERR("[notification-thread] Unknown condition type encountered in evaluation");
618 DBG("[notification-thread] Condition evaluated to true");
619 notify_client(client
, condition
, evaluation
);
620 lttng_evaluation_destroy(evaluation
);
624 DBG("[notification-thread] Client conditions evaluated");
628 * This thread services notification channel clients and received notifications
629 * from various lttng-sessiond components over a command queue.
631 void *thread_notification(void *data
)
634 struct lttng_poll_event events
;
635 int notification_channel_socket
;
636 struct notification_thread_data
*ctx
= data
;
638 DBG("[notification-thread] Started notification thread");
640 CDS_INIT_LIST_HEAD(&client_list
);
641 CDS_INIT_LIST_HEAD(&trigger_list
);
643 simulation_timer_start();
646 ERR("[notification-thread] Invalid thread context provided");
650 rcu_register_thread();
653 health_register(health_sessiond
, HEALTH_SESSIOND_TYPE_NOTIFICATION
);
654 health_code_update();
656 client_reception_buffer
= zmalloc(CLIENT_RECEPTION_BUFFER_SIZE
);
657 if (!client_reception_buffer
) {
658 ERR("[notification-thread] Failed to allocate client reception buffer");
662 ret
= notification_channel_socket_create();
666 notification_channel_socket
= ret
;
669 * Create pollset with size 2, quit pipe and notification channel
670 * socket, and the command queue event fd.
672 ret
= sessiond_set_thread_pollset(&events
, 3);
674 goto error_poll_create
;
677 /* Add notification channel socket to poll set. */
678 ret
= lttng_poll_add(&events
, notification_channel_socket
,
679 LPOLLIN
| LPOLLERR
| LPOLLHUP
| LPOLLRDHUP
);
681 ERR("[notification-thread] Failed to add notification channel socket to pollset");
685 ret
= lttng_poll_add(&events
, ctx
->cmd_queue
.event_fd
,
688 ERR("[notification-thread] Failed to add notification command queue event fd to pollset");
692 ret
= lttng_poll_add(&events
, simulation_timer_event_fd
,
695 ERR("[notification-thread] Failed to add timer event fd to pollset");
699 DBG("[notification-thread] Listening on notification channel socket");
700 ret
= lttcomm_listen_unix_sock(notification_channel_socket
);
702 ERR("[notification-thread] Listen failed on notification channel socket");
710 DBG("[notification-thread] Entering poll wait");
711 ret
= lttng_poll_wait(&events
, -1);
712 DBG("[notification-thread] Poll wait returned (%i)", ret
);
716 * Restart interrupted system call.
718 if (errno
== EINTR
) {
721 ERR("[notification-thread] Error encountered during lttng_poll_wait (%i)", ret
);
726 for (i
= 0; i
< fd_count
; i
++) {
727 int fd
= LTTNG_POLL_GETFD(&events
, i
);
728 uint32_t revents
= LTTNG_POLL_GETEV(&events
, i
);
730 DBG("[notification-thread] Handling fd (%i) activity (%u)", fd
, revents
);
732 /* Thread quit pipe has been closed. Killing thread. */
733 if (sessiond_check_thread_quit_pipe(fd
, revents
)) {
734 DBG("[notification-thread] Quit pipe signaled, exiting.");
738 if (fd
== notification_channel_socket
) {
739 if (revents
& LPOLLIN
) {
742 ret
= handle_new_connection(
743 notification_channel_socket
);
749 ret
= lttng_poll_add(&events
, new_socket
,
751 LPOLLHUP
| LPOLLRDHUP
);
753 ERR("[notification-thread] Failed to add notification channel client socket to pollset");
756 DBG("[notification-thread] Added new notification channel client socket to poll set");
758 (LPOLLERR
| LPOLLHUP
| LPOLLRDHUP
)) {
759 ERR("[notification-thread] Notification socket poll error");
762 ERR("[notification-thread] Unexpected poll events %u for notification socket %i", revents
, fd
);
765 } else if (fd
== ctx
->cmd_queue
.event_fd
) {
767 * Handling of internaly-generated events to
768 * evaluate against the set of active
773 DBG("[notification-thread] Event received on command queue event fd");
774 ret
= read(fd
, &counter
, sizeof(counter
));
776 ERR("read on command queue event fd");
779 pthread_mutex_lock(&ctx
->cmd_queue
.lock
);
780 activate_triggers(&ctx
->cmd_queue
.list
);
781 pthread_mutex_unlock(&ctx
->cmd_queue
.lock
);
782 } else if (fd
== simulation_timer_event_fd
) {
784 * Place-holder timer to simulate activity in
789 DBG("[notification-thread] Simulation timer fired");
790 ret
= read(fd
, &counter
, sizeof(counter
));
792 ERR("read on simulation timer event fd");
795 pthread_mutex_lock(&simulation_lock
);
796 evaluate_client_conditions();
797 pthread_mutex_unlock(&simulation_lock
);
799 if (revents
& (LPOLLERR
| LPOLLHUP
| LPOLLRDHUP
)) {
801 * It doesn't matter if a command was
802 * pending on the client socket at this
803 * point since it now has now way to
804 * receive the notifications to which
805 * it was subscribing or unsubscribing.
807 DBG("[notification-thread] Closing client connection (fd = %i)", fd
);
808 clean_up_notification_channel_client(fd
);
809 } else if (revents
& LPOLLIN
) {
810 ret
= handle_notification_channel_client(fd
);
812 DBG("[notification-thread] Closing client connection following error");
813 clean_up_notification_channel_client(fd
);
816 DBG("[notification-thread] Unexpected poll events %u for notification socket %i", revents
, fd
);
824 lttng_poll_clean(&events
);
826 notification_channel_socket_destroy(notification_channel_socket
);
827 health_unregister(health_sessiond
);
828 rcu_thread_offline();
829 rcu_unregister_thread();
830 free(client_reception_buffer
);
833 simulation_timer_stop();