2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
4 * SPDX-License-Identifier: GPL-2.0-only
10 #include <urcu/rculfhash.h>
12 #include <common/defaults.h>
13 #include <common/error.h>
14 #include <common/futex.h>
15 #include <common/unix.h>
16 #include <common/dynamic-buffer.h>
17 #include <common/hashtable/utils.h>
18 #include <common/sessiond-comm/sessiond-comm.h>
19 #include <common/macros.h>
20 #include <lttng/condition/condition.h>
21 #include <lttng/action/action-internal.h>
22 #include <lttng/action/group-internal.h>
23 #include <lttng/notification/notification-internal.h>
24 #include <lttng/condition/condition-internal.h>
25 #include <lttng/condition/buffer-usage-internal.h>
26 #include <lttng/condition/session-consumed-size-internal.h>
27 #include <lttng/condition/session-rotation-internal.h>
28 #include <lttng/condition/event-rule-internal.h>
29 #include <lttng/notification/channel-internal.h>
30 #include <lttng/trigger/trigger-internal.h>
38 #include "notification-thread.h"
39 #include "notification-thread-events.h"
40 #include "notification-thread-commands.h"
41 #include "lttng-sessiond.h"
44 #define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
45 #define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
/*
 * Kind of object a condition is bound to, as reported by
 * get_condition_binding_object().
 */
enum lttng_object_type {
	/* The condition type is not recognized. */
	LTTNG_OBJECT_TYPE_UNKNOWN = 0,
	/* The condition is not bound to any object. */
	LTTNG_OBJECT_TYPE_NONE = 1,
	/* The condition is bound to a channel. */
	LTTNG_OBJECT_TYPE_CHANNEL = 2,
	/* The condition is bound to a session. */
	LTTNG_OBJECT_TYPE_SESSION = 3,
};
54 struct lttng_trigger_list_element
{
55 /* No ownership of the trigger object is assumed. */
56 struct lttng_trigger
*trigger
;
57 struct cds_list_head node
;
60 struct lttng_channel_trigger_list
{
61 struct channel_key channel_key
;
62 /* List of struct lttng_trigger_list_element. */
63 struct cds_list_head list
;
64 /* Node in the channel_triggers_ht */
65 struct cds_lfht_node channel_triggers_ht_node
;
66 /* call_rcu delayed reclaim. */
67 struct rcu_head rcu_node
;
71 * List of triggers applying to a given session.
74 * - lttng_session_trigger_list_create()
75 * - lttng_session_trigger_list_build()
76 * - lttng_session_trigger_list_destroy()
77 * - lttng_session_trigger_list_add()
79 struct lttng_session_trigger_list
{
81 * Not owned by this; points to the session_info structure's
84 const char *session_name
;
85 /* List of struct lttng_trigger_list_element. */
86 struct cds_list_head list
;
87 /* Node in the session_triggers_ht */
88 struct cds_lfht_node session_triggers_ht_node
;
90 * Weak reference to the notification system's session triggers
93 * The session trigger list structure structure is owned by
94 * the session's session_info.
96 * The session_info is kept alive the the channel_infos holding a
97 * reference to it (reference counting). When those channels are
98 * destroyed (at runtime or on teardown), the reference they hold
99 * to the session_info are released. On destruction of session_info,
100 * session_info_destroy() will remove the list of triggers applying
101 * to this session from the notification system's state.
103 * This implies that the session_triggers_ht must be destroyed
104 * after the channels.
106 struct cds_lfht
*session_triggers_ht
;
107 /* Used for delayed RCU reclaim. */
108 struct rcu_head rcu_node
;
111 struct lttng_trigger_ht_element
{
112 struct lttng_trigger
*trigger
;
113 struct cds_lfht_node node
;
114 struct cds_lfht_node node_by_name
;
115 /* call_rcu delayed reclaim. */
116 struct rcu_head rcu_node
;
119 struct lttng_condition_list_element
{
120 struct lttng_condition
*condition
;
121 struct cds_list_head node
;
125 * Facilities to carry the different notification types in the action processing
128 struct lttng_trigger_notification
{
130 struct lttng_ust_trigger_notification
*ust
;
134 enum lttng_domain_type type
;
137 struct channel_state_sample
{
138 struct channel_key key
;
139 struct cds_lfht_node channel_state_ht_node
;
140 uint64_t highest_usage
;
141 uint64_t lowest_usage
;
142 uint64_t channel_total_consumed
;
143 /* call_rcu delayed reclaim. */
144 struct rcu_head rcu_node
;
147 static unsigned long hash_channel_key(struct channel_key
*key
);
148 static int evaluate_buffer_condition(const struct lttng_condition
*condition
,
149 struct lttng_evaluation
**evaluation
,
150 const struct notification_thread_state
*state
,
151 const struct channel_state_sample
*previous_sample
,
152 const struct channel_state_sample
*latest_sample
,
153 uint64_t previous_session_consumed_total
,
154 uint64_t latest_session_consumed_total
,
155 struct channel_info
*channel_info
);
157 int send_evaluation_to_clients(const struct lttng_trigger
*trigger
,
158 const struct lttng_evaluation
*evaluation
,
159 struct notification_client_list
*client_list
,
160 struct notification_thread_state
*state
,
161 uid_t channel_uid
, gid_t channel_gid
);
164 /* session_info API */
166 void session_info_destroy(void *_data
);
168 void session_info_get(struct session_info
*session_info
);
170 void session_info_put(struct session_info
*session_info
);
172 struct session_info
*session_info_create(const char *name
,
173 uid_t uid
, gid_t gid
,
174 struct lttng_session_trigger_list
*trigger_list
,
175 struct cds_lfht
*sessions_ht
);
177 void session_info_add_channel(struct session_info
*session_info
,
178 struct channel_info
*channel_info
);
180 void session_info_remove_channel(struct session_info
*session_info
,
181 struct channel_info
*channel_info
);
183 /* lttng_session_trigger_list API */
185 struct lttng_session_trigger_list
*lttng_session_trigger_list_create(
186 const char *session_name
,
187 struct cds_lfht
*session_triggers_ht
);
189 struct lttng_session_trigger_list
*lttng_session_trigger_list_build(
190 const struct notification_thread_state
*state
,
191 const char *session_name
);
193 void lttng_session_trigger_list_destroy(
194 struct lttng_session_trigger_list
*list
);
196 int lttng_session_trigger_list_add(struct lttng_session_trigger_list
*list
,
197 struct lttng_trigger
*trigger
);
200 int client_handle_transmission_status(
201 struct notification_client
*client
,
202 enum client_transmission_status transmission_status
,
203 struct notification_thread_state
*state
);
206 int match_client_socket(struct cds_lfht_node
*node
, const void *key
)
208 /* This double-cast is intended to supress pointer-to-cast warning. */
209 const int socket
= (int) (intptr_t) key
;
210 const struct notification_client
*client
= caa_container_of(node
,
211 struct notification_client
, client_socket_ht_node
);
213 return client
->socket
== socket
;
217 int match_client_id(struct cds_lfht_node
*node
, const void *key
)
219 /* This double-cast is intended to supress pointer-to-cast warning. */
220 const notification_client_id id
= *((notification_client_id
*) key
);
221 const struct notification_client
*client
= caa_container_of(
222 node
, struct notification_client
, client_id_ht_node
);
224 return client
->id
== id
;
228 int match_channel_trigger_list(struct cds_lfht_node
*node
, const void *key
)
230 struct channel_key
*channel_key
= (struct channel_key
*) key
;
231 struct lttng_channel_trigger_list
*trigger_list
;
233 trigger_list
= caa_container_of(node
, struct lttng_channel_trigger_list
,
234 channel_triggers_ht_node
);
236 return !!((channel_key
->key
== trigger_list
->channel_key
.key
) &&
237 (channel_key
->domain
== trigger_list
->channel_key
.domain
));
241 int match_session_trigger_list(struct cds_lfht_node
*node
, const void *key
)
243 const char *session_name
= (const char *) key
;
244 struct lttng_session_trigger_list
*trigger_list
;
246 trigger_list
= caa_container_of(node
, struct lttng_session_trigger_list
,
247 session_triggers_ht_node
);
249 return !!(strcmp(trigger_list
->session_name
, session_name
) == 0);
253 int match_channel_state_sample(struct cds_lfht_node
*node
, const void *key
)
255 struct channel_key
*channel_key
= (struct channel_key
*) key
;
256 struct channel_state_sample
*sample
;
258 sample
= caa_container_of(node
, struct channel_state_sample
,
259 channel_state_ht_node
);
261 return !!((channel_key
->key
== sample
->key
.key
) &&
262 (channel_key
->domain
== sample
->key
.domain
));
266 int match_channel_info(struct cds_lfht_node
*node
, const void *key
)
268 struct channel_key
*channel_key
= (struct channel_key
*) key
;
269 struct channel_info
*channel_info
;
271 channel_info
= caa_container_of(node
, struct channel_info
,
274 return !!((channel_key
->key
== channel_info
->key
.key
) &&
275 (channel_key
->domain
== channel_info
->key
.domain
));
279 int match_trigger(struct cds_lfht_node
*node
, const void *key
)
282 struct lttng_trigger
*trigger_key
= (struct lttng_trigger
*) key
;
283 struct lttng_trigger_ht_element
*trigger_ht_element
;
284 const struct lttng_credentials
*creds_key
;
285 const struct lttng_credentials
*creds_node
;
287 trigger_ht_element
= caa_container_of(node
, struct lttng_trigger_ht_element
,
290 match
= lttng_trigger_is_equal(trigger_key
, trigger_ht_element
->trigger
);
295 /* Validate credential */
296 /* TODO: this could be moved to lttng_trigger_equal depending on how we
297 * handle root behaviour on disable and listing.
299 creds_key
= lttng_trigger_get_credentials(trigger_key
);
300 creds_node
= lttng_trigger_get_credentials(trigger_ht_element
->trigger
);
301 match
= lttng_credentials_is_equal(creds_key
, creds_node
);
307 int match_trigger_token(struct cds_lfht_node
*node
, const void *key
)
309 const uint64_t *_key
= key
;
310 struct notification_trigger_tokens_ht_element
*element
;
312 element
= caa_container_of(node
, struct notification_trigger_tokens_ht_element
,
314 return *_key
== element
->token
;
318 int match_client_list_condition(struct cds_lfht_node
*node
, const void *key
)
320 struct lttng_condition
*condition_key
= (struct lttng_condition
*) key
;
321 struct notification_client_list
*client_list
;
322 const struct lttng_condition
*condition
;
324 assert(condition_key
);
326 client_list
= caa_container_of(node
, struct notification_client_list
,
327 notification_trigger_clients_ht_node
);
328 condition
= lttng_trigger_get_const_condition(client_list
->trigger
);
330 return !!lttng_condition_is_equal(condition_key
, condition
);
334 int match_session(struct cds_lfht_node
*node
, const void *key
)
336 const char *name
= key
;
337 struct session_info
*session_info
= caa_container_of(
338 node
, struct session_info
, sessions_ht_node
);
340 return !strcmp(session_info
->name
, name
);
344 * Match function for string node.
346 static int match_str(struct cds_lfht_node
*node
, const void *key
)
348 struct lttng_trigger_ht_element
*trigger_ht_element
;
351 trigger_ht_element
= caa_container_of(node
, struct lttng_trigger_ht_element
,
354 /* TODO error checking */
355 lttng_trigger_get_name(trigger_ht_element
->trigger
, &name
);
357 return hash_match_key_str(name
, (void *) key
);
361 unsigned long lttng_condition_buffer_usage_hash(
362 const struct lttng_condition
*_condition
)
365 unsigned long condition_type
;
366 struct lttng_condition_buffer_usage
*condition
;
368 condition
= container_of(_condition
,
369 struct lttng_condition_buffer_usage
, parent
);
371 condition_type
= (unsigned long) condition
->parent
.type
;
372 hash
= hash_key_ulong((void *) condition_type
, lttng_ht_seed
);
373 if (condition
->session_name
) {
374 hash
^= hash_key_str(condition
->session_name
, lttng_ht_seed
);
376 if (condition
->channel_name
) {
377 hash
^= hash_key_str(condition
->channel_name
, lttng_ht_seed
);
379 if (condition
->domain
.set
) {
380 hash
^= hash_key_ulong(
381 (void *) condition
->domain
.type
,
384 if (condition
->threshold_ratio
.set
) {
387 val
= condition
->threshold_ratio
.value
* (double) UINT32_MAX
;
388 hash
^= hash_key_u64(&val
, lttng_ht_seed
);
389 } else if (condition
->threshold_bytes
.set
) {
392 val
= condition
->threshold_bytes
.value
;
393 hash
^= hash_key_u64(&val
, lttng_ht_seed
);
399 unsigned long lttng_condition_session_consumed_size_hash(
400 const struct lttng_condition
*_condition
)
403 unsigned long condition_type
=
404 (unsigned long) LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
;
405 struct lttng_condition_session_consumed_size
*condition
;
408 condition
= container_of(_condition
,
409 struct lttng_condition_session_consumed_size
, parent
);
411 hash
= hash_key_ulong((void *) condition_type
, lttng_ht_seed
);
412 if (condition
->session_name
) {
413 hash
^= hash_key_str(condition
->session_name
, lttng_ht_seed
);
415 val
= condition
->consumed_threshold_bytes
.value
;
416 hash
^= hash_key_u64(&val
, lttng_ht_seed
);
421 unsigned long lttng_condition_session_rotation_hash(
422 const struct lttng_condition
*_condition
)
424 unsigned long hash
, condition_type
;
425 struct lttng_condition_session_rotation
*condition
;
427 condition
= container_of(_condition
,
428 struct lttng_condition_session_rotation
, parent
);
429 condition_type
= (unsigned long) condition
->parent
.type
;
430 hash
= hash_key_ulong((void *) condition_type
, lttng_ht_seed
);
431 assert(condition
->session_name
);
432 hash
^= hash_key_str(condition
->session_name
, lttng_ht_seed
);
437 unsigned long lttng_condition_event_rule_hash(
438 const struct lttng_condition
*_condition
)
440 unsigned long hash
, condition_type
;
441 struct lttng_condition_event_rule
*condition
;
443 condition
= container_of(_condition
,
444 struct lttng_condition_event_rule
, parent
);
445 condition_type
= (unsigned long) condition
->parent
.type
;
446 hash
= hash_key_ulong((void *) condition_type
, lttng_ht_seed
);
448 /* TODO: further hasg using the event rule? on pattern maybe?*/
453 * The lttng_condition hashing code is kept in this file (rather than
454 * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
455 * don't want to link in liblttng-ctl.
458 unsigned long lttng_condition_hash(const struct lttng_condition
*condition
)
460 switch (condition
->type
) {
461 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
462 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
463 return lttng_condition_buffer_usage_hash(condition
);
464 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
:
465 return lttng_condition_session_consumed_size_hash(condition
);
466 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
:
467 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED
:
468 return lttng_condition_session_rotation_hash(condition
);
469 case LTTNG_CONDITION_TYPE_EVENT_RULE_HIT
:
470 return lttng_condition_event_rule_hash(condition
);
472 ERR("[notification-thread] Unexpected condition type caught");
478 unsigned long hash_channel_key(struct channel_key
*key
)
480 unsigned long key_hash
= hash_key_u64(&key
->key
, lttng_ht_seed
);
481 unsigned long domain_hash
= hash_key_ulong(
482 (void *) (unsigned long) key
->domain
, lttng_ht_seed
);
484 return key_hash
^ domain_hash
;
488 unsigned long hash_client_socket(int socket
)
490 return hash_key_ulong((void *) (unsigned long) socket
, lttng_ht_seed
);
494 unsigned long hash_client_id(notification_client_id id
)
496 return hash_key_u64(&id
, lttng_ht_seed
);
500 * Get the type of object to which a given condition applies. Bindings let
501 * the notification system evaluate a trigger's condition when a given
502 * object's state is updated.
504 * For instance, a condition bound to a channel will be evaluated every time
505 * the channel's state is changed by a channel monitoring sample.
508 enum lttng_object_type
get_condition_binding_object(
509 const struct lttng_condition
*condition
)
511 switch (lttng_condition_get_type(condition
)) {
512 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
513 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
514 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
:
515 return LTTNG_OBJECT_TYPE_CHANNEL
;
516 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
:
517 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED
:
518 return LTTNG_OBJECT_TYPE_SESSION
;
519 case LTTNG_CONDITION_TYPE_EVENT_RULE_HIT
:
520 return LTTNG_OBJECT_TYPE_NONE
;
522 return LTTNG_OBJECT_TYPE_UNKNOWN
;
527 void free_channel_info_rcu(struct rcu_head
*node
)
529 free(caa_container_of(node
, struct channel_info
, rcu_node
));
533 void channel_info_destroy(struct channel_info
*channel_info
)
539 if (channel_info
->session_info
) {
540 session_info_remove_channel(channel_info
->session_info
,
542 session_info_put(channel_info
->session_info
);
544 if (channel_info
->name
) {
545 free(channel_info
->name
);
547 call_rcu(&channel_info
->rcu_node
, free_channel_info_rcu
);
551 void free_session_info_rcu(struct rcu_head
*node
)
553 free(caa_container_of(node
, struct session_info
, rcu_node
));
556 /* Don't call directly, use the ref-counting mechanism. */
558 void session_info_destroy(void *_data
)
560 struct session_info
*session_info
= _data
;
563 assert(session_info
);
564 if (session_info
->channel_infos_ht
) {
565 ret
= cds_lfht_destroy(session_info
->channel_infos_ht
, NULL
);
567 ERR("[notification-thread] Failed to destroy channel information hash table");
570 lttng_session_trigger_list_destroy(session_info
->trigger_list
);
573 cds_lfht_del(session_info
->sessions_ht
,
574 &session_info
->sessions_ht_node
);
576 free(session_info
->name
);
577 call_rcu(&session_info
->rcu_node
, free_session_info_rcu
);
581 void session_info_get(struct session_info
*session_info
)
586 lttng_ref_get(&session_info
->ref
);
590 void session_info_put(struct session_info
*session_info
)
595 lttng_ref_put(&session_info
->ref
);
599 struct session_info
*session_info_create(const char *name
, uid_t uid
, gid_t gid
,
600 struct lttng_session_trigger_list
*trigger_list
,
601 struct cds_lfht
*sessions_ht
)
603 struct session_info
*session_info
;
607 session_info
= zmalloc(sizeof(*session_info
));
611 lttng_ref_init(&session_info
->ref
, session_info_destroy
);
613 session_info
->channel_infos_ht
= cds_lfht_new(DEFAULT_HT_SIZE
,
614 1, 0, CDS_LFHT_AUTO_RESIZE
| CDS_LFHT_ACCOUNTING
, NULL
);
615 if (!session_info
->channel_infos_ht
) {
619 cds_lfht_node_init(&session_info
->sessions_ht_node
);
620 session_info
->name
= strdup(name
);
621 if (!session_info
->name
) {
624 session_info
->uid
= uid
;
625 session_info
->gid
= gid
;
626 session_info
->trigger_list
= trigger_list
;
627 session_info
->sessions_ht
= sessions_ht
;
631 session_info_put(session_info
);
636 void session_info_add_channel(struct session_info
*session_info
,
637 struct channel_info
*channel_info
)
640 cds_lfht_add(session_info
->channel_infos_ht
,
641 hash_channel_key(&channel_info
->key
),
642 &channel_info
->session_info_channels_ht_node
);
647 void session_info_remove_channel(struct session_info
*session_info
,
648 struct channel_info
*channel_info
)
651 cds_lfht_del(session_info
->channel_infos_ht
,
652 &channel_info
->session_info_channels_ht_node
);
657 struct channel_info
*channel_info_create(const char *channel_name
,
658 struct channel_key
*channel_key
, uint64_t channel_capacity
,
659 struct session_info
*session_info
)
661 struct channel_info
*channel_info
= zmalloc(sizeof(*channel_info
));
667 cds_lfht_node_init(&channel_info
->channels_ht_node
);
668 cds_lfht_node_init(&channel_info
->session_info_channels_ht_node
);
669 memcpy(&channel_info
->key
, channel_key
, sizeof(*channel_key
));
670 channel_info
->capacity
= channel_capacity
;
672 channel_info
->name
= strdup(channel_name
);
673 if (!channel_info
->name
) {
678 * Set the references between session and channel infos:
679 * - channel_info holds a strong reference to session_info
680 * - session_info holds a weak reference to channel_info
682 session_info_get(session_info
);
683 session_info_add_channel(session_info
, channel_info
);
684 channel_info
->session_info
= session_info
;
688 channel_info_destroy(channel_info
);
693 bool notification_client_list_get(struct notification_client_list
*list
)
695 return urcu_ref_get_unless_zero(&list
->ref
);
699 void free_notification_client_list_rcu(struct rcu_head
*node
)
701 free(caa_container_of(node
, struct notification_client_list
,
706 void notification_client_list_release(struct urcu_ref
*list_ref
)
708 struct notification_client_list
*list
=
709 container_of(list_ref
, typeof(*list
), ref
);
710 struct notification_client_list_element
*client_list_element
, *tmp
;
712 if (list
->notification_trigger_clients_ht
) {
714 cds_lfht_del(list
->notification_trigger_clients_ht
,
715 &list
->notification_trigger_clients_ht_node
);
717 list
->notification_trigger_clients_ht
= NULL
;
719 cds_list_for_each_entry_safe(client_list_element
, tmp
,
721 free(client_list_element
);
723 pthread_mutex_destroy(&list
->lock
);
724 call_rcu(&list
->rcu_node
, free_notification_client_list_rcu
);
728 struct notification_client_list
*notification_client_list_create(
729 const struct lttng_trigger
*trigger
)
731 struct notification_client_list
*client_list
=
732 zmalloc(sizeof(*client_list
));
737 pthread_mutex_init(&client_list
->lock
, NULL
);
738 urcu_ref_init(&client_list
->ref
);
739 cds_lfht_node_init(&client_list
->notification_trigger_clients_ht_node
);
740 CDS_INIT_LIST_HEAD(&client_list
->list
);
741 client_list
->trigger
= trigger
;
747 void publish_notification_client_list(
748 struct notification_thread_state
*state
,
749 struct notification_client_list
*list
)
751 const struct lttng_condition
*condition
=
752 lttng_trigger_get_const_condition(list
->trigger
);
754 assert(!list
->notification_trigger_clients_ht
);
756 list
->notification_trigger_clients_ht
=
757 state
->notification_trigger_clients_ht
;
760 cds_lfht_add(state
->notification_trigger_clients_ht
,
761 lttng_condition_hash(condition
),
762 &list
->notification_trigger_clients_ht_node
);
767 void notification_client_list_put(struct notification_client_list
*list
)
772 return urcu_ref_put(&list
->ref
, notification_client_list_release
);
775 /* Provides a reference to the returned list. */
777 struct notification_client_list
*get_client_list_from_condition(
778 struct notification_thread_state
*state
,
779 const struct lttng_condition
*condition
)
781 struct cds_lfht_node
*node
;
782 struct cds_lfht_iter iter
;
783 struct notification_client_list
*list
= NULL
;
786 cds_lfht_lookup(state
->notification_trigger_clients_ht
,
787 lttng_condition_hash(condition
),
788 match_client_list_condition
,
791 node
= cds_lfht_iter_get_node(&iter
);
793 list
= container_of(node
, struct notification_client_list
,
794 notification_trigger_clients_ht_node
);
795 list
= notification_client_list_get(list
) ? list
: NULL
;
802 int evaluate_channel_condition_for_client(
803 const struct lttng_condition
*condition
,
804 struct notification_thread_state
*state
,
805 struct lttng_evaluation
**evaluation
,
806 uid_t
*session_uid
, gid_t
*session_gid
)
809 struct cds_lfht_iter iter
;
810 struct cds_lfht_node
*node
;
811 struct channel_info
*channel_info
= NULL
;
812 struct channel_key
*channel_key
= NULL
;
813 struct channel_state_sample
*last_sample
= NULL
;
814 struct lttng_channel_trigger_list
*channel_trigger_list
= NULL
;
818 /* Find the channel associated with the condition. */
819 cds_lfht_for_each_entry(state
->channel_triggers_ht
, &iter
,
820 channel_trigger_list
, channel_triggers_ht_node
) {
821 struct lttng_trigger_list_element
*element
;
823 cds_list_for_each_entry(element
, &channel_trigger_list
->list
, node
) {
824 const struct lttng_condition
*current_condition
=
825 lttng_trigger_get_const_condition(
828 assert(current_condition
);
829 if (!lttng_condition_is_equal(condition
,
830 current_condition
)) {
834 /* Found the trigger, save the channel key. */
835 channel_key
= &channel_trigger_list
->channel_key
;
839 /* The channel key was found stop iteration. */
845 /* No channel found; normal exit. */
846 DBG("[notification-thread] No known channel associated with newly subscribed-to condition");
851 /* Fetch channel info for the matching channel. */
852 cds_lfht_lookup(state
->channels_ht
,
853 hash_channel_key(channel_key
),
857 node
= cds_lfht_iter_get_node(&iter
);
859 channel_info
= caa_container_of(node
, struct channel_info
,
862 /* Retrieve the channel's last sample, if it exists. */
863 cds_lfht_lookup(state
->channel_state_ht
,
864 hash_channel_key(channel_key
),
865 match_channel_state_sample
,
868 node
= cds_lfht_iter_get_node(&iter
);
870 last_sample
= caa_container_of(node
,
871 struct channel_state_sample
,
872 channel_state_ht_node
);
874 /* Nothing to evaluate, no sample was ever taken. Normal exit */
875 DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
880 ret
= evaluate_buffer_condition(condition
, evaluation
, state
,
882 0, channel_info
->session_info
->consumed_data_size
,
885 WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
889 *session_uid
= channel_info
->session_info
->uid
;
890 *session_gid
= channel_info
->session_info
->gid
;
897 const char *get_condition_session_name(const struct lttng_condition
*condition
)
899 const char *session_name
= NULL
;
900 enum lttng_condition_status status
;
902 switch (lttng_condition_get_type(condition
)) {
903 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
904 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
905 status
= lttng_condition_buffer_usage_get_session_name(
906 condition
, &session_name
);
908 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
:
909 status
= lttng_condition_session_consumed_size_get_session_name(
910 condition
, &session_name
);
912 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
:
913 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED
:
914 status
= lttng_condition_session_rotation_get_session_name(
915 condition
, &session_name
);
920 if (status
!= LTTNG_CONDITION_STATUS_OK
) {
921 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
929 int evaluate_session_condition_for_client(
930 const struct lttng_condition
*condition
,
931 struct notification_thread_state
*state
,
932 struct lttng_evaluation
**evaluation
,
933 uid_t
*session_uid
, gid_t
*session_gid
)
936 struct cds_lfht_iter iter
;
937 struct cds_lfht_node
*node
;
938 const char *session_name
;
939 struct session_info
*session_info
= NULL
;
942 session_name
= get_condition_session_name(condition
);
944 /* Find the session associated with the trigger. */
945 cds_lfht_lookup(state
->sessions_ht
,
946 hash_key_str(session_name
, lttng_ht_seed
),
950 node
= cds_lfht_iter_get_node(&iter
);
952 DBG("[notification-thread] No known session matching name \"%s\"",
958 session_info
= caa_container_of(node
, struct session_info
,
960 session_info_get(session_info
);
963 * Evaluation is performed in-line here since only one type of
964 * session-bound condition is handled for the moment.
966 switch (lttng_condition_get_type(condition
)) {
967 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
:
968 if (!session_info
->rotation
.ongoing
) {
970 goto end_session_put
;
973 *evaluation
= lttng_evaluation_session_rotation_ongoing_create(
974 session_info
->rotation
.id
);
977 ERR("[notification-thread] Failed to create session rotation ongoing evaluation for session \"%s\"",
980 goto end_session_put
;
986 goto end_session_put
;
989 *session_uid
= session_info
->uid
;
990 *session_gid
= session_info
->gid
;
993 session_info_put(session_info
);
1000 int evaluate_condition_for_client(const struct lttng_trigger
*trigger
,
1001 const struct lttng_condition
*condition
,
1002 struct notification_client
*client
,
1003 struct notification_thread_state
*state
)
1006 struct lttng_evaluation
*evaluation
= NULL
;
1007 struct notification_client_list client_list
= {
1008 .lock
= PTHREAD_MUTEX_INITIALIZER
,
1010 struct notification_client_list_element client_list_element
= { 0 };
1011 uid_t object_uid
= 0;
1012 gid_t object_gid
= 0;
1019 switch (get_condition_binding_object(condition
)) {
1020 case LTTNG_OBJECT_TYPE_SESSION
:
1021 ret
= evaluate_session_condition_for_client(condition
, state
,
1022 &evaluation
, &object_uid
, &object_gid
);
1024 case LTTNG_OBJECT_TYPE_CHANNEL
:
1025 ret
= evaluate_channel_condition_for_client(condition
, state
,
1026 &evaluation
, &object_uid
, &object_gid
);
1028 case LTTNG_OBJECT_TYPE_NONE
:
1029 DBG("[notification-thread] Newly subscribed-to condition not binded to object, nothing to evaluate");
1032 case LTTNG_OBJECT_TYPE_UNKNOWN
:
1042 /* Evaluation yielded nothing. Normal exit. */
1043 DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
1049 * Create a temporary client list with the client currently
1052 cds_lfht_node_init(&client_list
.notification_trigger_clients_ht_node
);
1053 CDS_INIT_LIST_HEAD(&client_list
.list
);
1054 client_list
.trigger
= trigger
;
1056 CDS_INIT_LIST_HEAD(&client_list_element
.node
);
1057 client_list_element
.client
= client
;
1058 cds_list_add(&client_list_element
.node
, &client_list
.list
);
1060 /* Send evaluation result to the newly-subscribed client. */
1061 DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
1062 ret
= send_evaluation_to_clients(trigger
, evaluation
, &client_list
,
1063 state
, object_uid
, object_gid
);
1070 int notification_thread_client_subscribe(struct notification_client
*client
,
1071 struct lttng_condition
*condition
,
1072 struct notification_thread_state
*state
,
1073 enum lttng_notification_channel_status
*_status
)
1076 struct notification_client_list
*client_list
= NULL
;
1077 struct lttng_condition_list_element
*condition_list_element
= NULL
;
1078 struct notification_client_list_element
*client_list_element
= NULL
;
1079 enum lttng_notification_channel_status status
=
1080 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
1083 * Ensure that the client has not already subscribed to this condition
1086 cds_list_for_each_entry(condition_list_element
, &client
->condition_list
, node
) {
1087 if (lttng_condition_is_equal(condition_list_element
->condition
,
1089 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED
;
1094 condition_list_element
= zmalloc(sizeof(*condition_list_element
));
1095 if (!condition_list_element
) {
1099 client_list_element
= zmalloc(sizeof(*client_list_element
));
1100 if (!client_list_element
) {
1106 * Add the newly-subscribed condition to the client's subscription list.
1108 CDS_INIT_LIST_HEAD(&condition_list_element
->node
);
1109 condition_list_element
->condition
= condition
;
1110 cds_list_add(&condition_list_element
->node
, &client
->condition_list
);
1112 client_list
= get_client_list_from_condition(state
, condition
);
1115 * No notification-emitting trigger registered with this
1116 * condition. We don't evaluate the condition right away
1117 * since this trigger is not registered yet.
1119 free(client_list_element
);
1124 * The condition to which the client just subscribed is evaluated
1125 * at this point so that conditions that are already TRUE result
1126 * in a notification being sent out.
1128 * The client_list's trigger is used without locking the list itself.
1129 * This is correct since the list doesn't own the trigger and the
1130 * object is immutable.
1132 if (evaluate_condition_for_client(client_list
->trigger
, condition
,
1134 WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
1136 free(client_list_element
);
1141 * Add the client to the list of clients interested in a given trigger
1142 * if a "notification" trigger with a corresponding condition was
1145 client_list_element
->client
= client
;
1146 CDS_INIT_LIST_HEAD(&client_list_element
->node
);
1148 pthread_mutex_lock(&client_list
->lock
);
1149 cds_list_add(&client_list_element
->node
, &client_list
->list
);
1150 pthread_mutex_unlock(&client_list
->lock
);
1156 notification_client_list_put(client_list
);
1160 free(condition_list_element
);
1161 free(client_list_element
);
/*
 * Unsubscribe `client` from `condition`.
 *
 * Two structures are updated:
 *   - client->condition_list: the element whose condition compares equal
 *     (lttng_condition_is_equal) to `condition` is unlinked and freed. The
 *     condition stored in that element is destroyed unless it is the very
 *     object passed by the caller, which is handled by the final
 *     lttng_condition_destroy(condition) call.
 *   - the client list returned by get_client_list_from_condition(): with
 *     client_list->lock held, the element whose client id equals client->id
 *     is unlinked and freed; the list is then released with
 *     notification_client_list_put().
 *
 * When no matching condition is found in the condition list of the client,
 * the local status becomes
 * LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION.
 *
 * NOTE(review): the store to `_status`, the handling of a NULL client list
 * and the return statement are not visible in this view; confirm before
 * relying on them.
 */
1166 int notification_thread_client_unsubscribe(
1167 struct notification_client
*client
,
1168 struct lttng_condition
*condition
,
1169 struct notification_thread_state
*state
,
1170 enum lttng_notification_channel_status
*_status
)
1172 struct notification_client_list
*client_list
;
1173 struct lttng_condition_list_element
*condition_list_element
,
1175 struct notification_client_list_element
*client_list_element
,
1177 bool condition_found
= false;
1178 enum lttng_notification_channel_status status
=
1179 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
1181 /* Remove the condition from the client's condition list. */
1182 cds_list_for_each_entry_safe(condition_list_element
, condition_tmp
,
1183 &client
->condition_list
, node
) {
1184 if (!lttng_condition_is_equal(condition_list_element
->condition
,
1189 cds_list_del(&condition_list_element
->node
);
1191 * The caller may be iterating on the client's conditions to
1192 * tear down a client's connection. In this case, the condition
1193 * will be destroyed at the end.
1195 if (condition
!= condition_list_element
->condition
) {
1196 lttng_condition_destroy(
1197 condition_list_element
->condition
);
1199 free(condition_list_element
);
1200 condition_found
= true;
1204 if (!condition_found
) {
1205 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION
;
1210 * Remove the client from the list of clients interested the trigger
1211 * matching the condition.
1213 client_list
= get_client_list_from_condition(state
, condition
);
1218 pthread_mutex_lock(&client_list
->lock
);
1219 cds_list_for_each_entry_safe(client_list_element
, client_tmp
,
1220 &client_list
->list
, node
) {
1221 if (client_list_element
->client
->id
!= client
->id
) {
1224 cds_list_del(&client_list_element
->node
);
1225 free(client_list_element
);
1228 pthread_mutex_unlock(&client_list
->lock
);
1229 notification_client_list_put(client_list
);
1232 lttng_condition_destroy(condition
);
/*
 * Free the struct notification_client that embeds `node` as its rcu_node
 * member. The enclosing object is recovered with caa_container_of().
 */
1240 void free_notification_client_rcu(struct rcu_head
*node
)
1242 free(caa_container_of(node
, struct notification_client
, rcu_node
));
/*
 * Tear down `client` and schedule its memory for reclamation.
 *
 * Visible steps, in order:
 *   - close the socket with lttcomm_close_unix_sock() when it is >= 0
 *     (the result is deliberately ignored) and mark it as -1;
 *   - flag the communication state as inactive;
 *   - reset the inbound and outbound dynamic buffers;
 *   - destroy client->lock;
 *   - defer the free of the object through call_rcu() with
 *     free_notification_client_rcu as the callback.
 *
 * NOTE(review): no use of `state` is visible in this view.
 */
1246 void notification_client_destroy(struct notification_client
*client
,
1247 struct notification_thread_state
*state
)
1254 * The client object is not reachable by other threads, no need to lock
1257 if (client
->socket
>= 0) {
1258 (void) lttcomm_close_unix_sock(client
->socket
);
1259 client
->socket
= -1;
1261 client
->communication
.active
= false;
1262 lttng_dynamic_buffer_reset(&client
->communication
.inbound
.buffer
);
1263 lttng_dynamic_buffer_reset(&client
->communication
.outbound
.buffer
);
1264 pthread_mutex_destroy(&client
->lock
);
1265 call_rcu(&client
->rcu_node
, free_notification_client_rcu
);
1269 * Call with rcu_read_lock held (and hold for the lifetime of the returned
/*
 * Look up a client by socket descriptor in state->client_socket_ht.
 *
 * The lookup hashes the descriptor with hash_client_socket(), matches with
 * match_client_socket and passes the descriptor itself (cast to a pointer)
 * as the key. A found node is converted to its enclosing
 * struct notification_client through the client_socket_ht_node member.
 *
 * `client` starts as NULL; presumably that is what is returned when the
 * lookup yields no node (the check and the return are not visible here).
 */
1273 struct notification_client
*get_client_from_socket(int socket
,
1274 struct notification_thread_state
*state
)
1276 struct cds_lfht_iter iter
;
1277 struct cds_lfht_node
*node
;
1278 struct notification_client
*client
= NULL
;
1280 cds_lfht_lookup(state
->client_socket_ht
,
1281 hash_client_socket(socket
),
1282 match_client_socket
,
1283 (void *) (unsigned long) socket
,
1285 node
= cds_lfht_iter_get_node(&iter
);
1290 client
= caa_container_of(node
, struct notification_client
,
1291 client_socket_ht_node
);
1297 * Call with rcu_read_lock held (and hold for the lifetime of the returned
/*
 * Look up a client by notification_client_id in state->client_id_ht and
 * convert the resulting hash table node to its enclosing
 * struct notification_client.
 *
 * `client` starts as NULL; presumably that is what is returned when no
 * node is found (the lookup arguments, the check and the return are not
 * visible in this view).
 */
1301 struct notification_client
*get_client_from_id(notification_client_id id
,
1302 struct notification_thread_state
*state
)
1304 struct cds_lfht_iter iter
;
1305 struct cds_lfht_node
*node
;
1306 struct notification_client
*client
= NULL
;
1308 cds_lfht_lookup(state
->client_id_ht
,
1313 node
= cds_lfht_iter_get_node(&iter
);
1318 client
= caa_container_of(node
, struct notification_client
,
/*
 * Tell whether a buffer usage `condition` targets the channel described by
 * `channel_info`.
 *
 * Three properties are compared, in this order:
 *   1. the domain type of the condition against channel_info->key.domain;
 *   2. the session name of the condition against
 *      channel_info->session_info->name (strcmp);
 *   3. the channel name of the condition against channel_info->name
 *      (strcmp).
 *
 * Each getter is expected to succeed: the status, and for the names the
 * returned pointer, are checked with assert().
 *
 * NOTE(review): the statements executed on a mismatch and the return are
 * not visible here; presumably the first mismatch yields false.
 */
1325 bool buffer_usage_condition_applies_to_channel(
1326 const struct lttng_condition
*condition
,
1327 const struct channel_info
*channel_info
)
1329 enum lttng_condition_status status
;
1330 enum lttng_domain_type condition_domain
;
1331 const char *condition_session_name
= NULL
;
1332 const char *condition_channel_name
= NULL
;
1334 status
= lttng_condition_buffer_usage_get_domain_type(condition
,
1336 assert(status
== LTTNG_CONDITION_STATUS_OK
);
1337 if (channel_info
->key
.domain
!= condition_domain
) {
1341 status
= lttng_condition_buffer_usage_get_session_name(
1342 condition
, &condition_session_name
);
1343 assert((status
== LTTNG_CONDITION_STATUS_OK
) && condition_session_name
);
1345 status
= lttng_condition_buffer_usage_get_channel_name(
1346 condition
, &condition_channel_name
);
1347 assert((status
== LTTNG_CONDITION_STATUS_OK
) && condition_channel_name
);
1349 if (strcmp(channel_info
->session_info
->name
, condition_session_name
)) {
1352 if (strcmp(channel_info
->name
, condition_channel_name
)) {
/*
 * Tell whether a session consumed size `condition` targets the session
 * that owns the channel described by `channel_info`.
 *
 * The session name of the condition is fetched (success and a non-NULL
 * name are checked with assert()) and compared with
 * channel_info->session_info->name using strcmp().
 *
 * NOTE(review): the result assignment and the return are not visible
 * here; presumably a name mismatch yields false.
 */
1362 bool session_consumed_size_condition_applies_to_channel(
1363 const struct lttng_condition
*condition
,
1364 const struct channel_info
*channel_info
)
1366 enum lttng_condition_status status
;
1367 const char *condition_session_name
= NULL
;
1369 status
= lttng_condition_session_consumed_size_get_session_name(
1370 condition
, &condition_session_name
);
1371 assert((status
== LTTNG_CONDITION_STATUS_OK
) && condition_session_name
);
1373 if (strcmp(channel_info
->session_info
->name
, condition_session_name
)) {
/*
 * Tell whether `trigger` applies to the channel described by
 * `channel_info`, by dispatching on the type of the trigger condition:
 *   - BUFFER_USAGE_LOW / BUFFER_USAGE_HIGH:
 *       buffer_usage_condition_applies_to_channel();
 *   - SESSION_CONSUMED_SIZE:
 *       session_consumed_size_condition_applies_to_channel().
 *
 * Returns the value stored in trigger_applies.
 *
 * NOTE(review): the handling of other condition types is not visible in
 * this view.
 */
1383 bool trigger_applies_to_channel(const struct lttng_trigger
*trigger
,
1384 const struct channel_info
*channel_info
)
1386 const struct lttng_condition
*condition
;
1387 bool trigger_applies
;
1389 condition
= lttng_trigger_get_const_condition(trigger
);
1394 switch (lttng_condition_get_type(condition
)) {
1395 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
1396 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
1397 trigger_applies
= buffer_usage_condition_applies_to_channel(
1398 condition
, channel_info
);
1400 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
:
1401 trigger_applies
= session_consumed_size_condition_applies_to_channel(
1402 condition
, channel_info
);
1408 return trigger_applies
;
/*
 * Tell whether `client` is subscribed to the condition of `trigger`.
 *
 * Walks client->condition_list and compares each stored condition with
 * the trigger condition using lttng_condition_is_equal(). `applies`
 * starts as false.
 *
 * NOTE(review): the loop exit on a match and the return are not visible
 * in this view.
 */
1414 bool trigger_applies_to_client(struct lttng_trigger
*trigger
,
1415 struct notification_client
*client
)
1417 bool applies
= false;
1418 struct lttng_condition_list_element
*condition_list_element
;
1420 cds_list_for_each_entry(condition_list_element
, &client
->condition_list
,
1422 applies
= lttng_condition_is_equal(
1423 condition_list_element
->condition
,
1424 lttng_trigger_get_condition(trigger
));
1432 /* Must be called with RCU read lock held. */
/*
 * Find the trigger list bound to the session named `session_name`.
 *
 * The lookup is done in state->session_triggers_ht with the hash
 * hash_key_str(session_name, lttng_ht_seed) and the
 * match_session_trigger_list match function. A found node is converted to
 * its enclosing struct lttng_session_trigger_list through the
 * session_triggers_ht_node member.
 *
 * A missing list is only reported with a DBG() message; `list` starts as
 * NULL, presumably the value returned in that case (the return is not
 * visible here).
 */
1434 struct lttng_session_trigger_list
*get_session_trigger_list(
1435 struct notification_thread_state
*state
,
1436 const char *session_name
)
1438 struct lttng_session_trigger_list
*list
= NULL
;
1439 struct cds_lfht_node
*node
;
1440 struct cds_lfht_iter iter
;
1442 cds_lfht_lookup(state
->session_triggers_ht
,
1443 hash_key_str(session_name
, lttng_ht_seed
),
1444 match_session_trigger_list
,
1447 node
= cds_lfht_iter_get_node(&iter
);
1450 * Not an error, the list of triggers applying to that session
1451 * will be initialized when the session is created.
1453 DBG("[notification-thread] No trigger list found for session \"%s\" as it is not yet known to the notification system",
1458 list
= caa_container_of(node
,
1459 struct lttng_session_trigger_list
,
1460 session_triggers_ht_node
);
1466 * Allocate an empty lttng_session_trigger_list for the session named
1469 * No ownership of 'session_name' is assumed by the session trigger list.
1470 * It is the caller's responsibility to ensure the session name is alive
1471 * for as long as this list is.
/*
 * Allocate (zmalloc) an empty session trigger list and publish it.
 *
 * The new list stores the `session_name` pointer as-is (no copy is made),
 * initializes its list head and hash table node, remembers
 * `session_triggers_ht`, and is then inserted in that hash table under
 * the hash hash_key_str(session_name, lttng_ht_seed).
 *
 * NOTE(review): the allocation failure check and the return are not
 * visible in this view.
 */
1474 struct lttng_session_trigger_list
*lttng_session_trigger_list_create(
1475 const char *session_name
,
1476 struct cds_lfht
*session_triggers_ht
)
1478 struct lttng_session_trigger_list
*list
;
1480 list
= zmalloc(sizeof(*list
));
1484 list
->session_name
= session_name
;
1485 CDS_INIT_LIST_HEAD(&list
->list
);
1486 cds_lfht_node_init(&list
->session_triggers_ht_node
);
1487 list
->session_triggers_ht
= session_triggers_ht
;
1490 /* Publish the list through the session_triggers_ht. */
1491 cds_lfht_add(session_triggers_ht
,
1492 hash_key_str(session_name
, lttng_ht_seed
),
1493 &list
->session_triggers_ht_node
);
/*
 * Free the struct lttng_session_trigger_list that embeds `node`. The
 * enclosing object is recovered with caa_container_of().
 */
1500 void free_session_trigger_list_rcu(struct rcu_head
*node
)
1502 free(caa_container_of(node
, struct lttng_session_trigger_list
,
/*
 * Destroy a session trigger list.
 *
 * Every lttng_trigger_list_element is unlinked and freed (the triggers
 * they point to are left untouched), the list is removed from
 * list->session_triggers_ht, and the list object itself is handed to
 * call_rcu() with free_session_trigger_list_rcu as the callback.
 */
1507 void lttng_session_trigger_list_destroy(struct lttng_session_trigger_list
*list
)
1509 struct lttng_trigger_list_element
*trigger_list_element
, *tmp
;
1511 /* Empty the list element by element, and then free the list itself. */
1512 cds_list_for_each_entry_safe(trigger_list_element
, tmp
,
1513 &list
->list
, node
) {
1514 cds_list_del(&trigger_list_element
->node
);
1515 free(trigger_list_element
);
1518 /* Unpublish the list from the session_triggers_ht. */
1519 cds_lfht_del(list
->session_triggers_ht
,
1520 &list
->session_triggers_ht_node
);
1522 call_rcu(&list
->rcu_node
, free_session_trigger_list_rcu
);
/*
 * Append `trigger` to a session trigger list.
 *
 * A new lttng_trigger_list_element is allocated with zmalloc(), made to
 * point to `trigger` (the pointer is stored as-is) and linked into
 * list->list with cds_list_add().
 *
 * NOTE(review): the allocation failure handling and the returned value
 * are not visible in this view.
 */
1526 int lttng_session_trigger_list_add(struct lttng_session_trigger_list
*list
,
1527 struct lttng_trigger
*trigger
)
1530 struct lttng_trigger_list_element
*new_element
=
1531 zmalloc(sizeof(*new_element
));
1537 CDS_INIT_LIST_HEAD(&new_element
->node
);
1538 new_element
->trigger
= trigger
;
1539 cds_list_add(&new_element
->node
, &list
->list
);
/*
 * Tell whether `trigger` applies to the session named `session_name`.
 *
 * For SESSION_ROTATION_ONGOING and SESSION_ROTATION_COMPLETED conditions,
 * the session name of the condition is fetched and compared with
 * `session_name`; `applies` is true when strcmp() reports equality. A
 * failure to fetch the name is logged with ERR().
 *
 * `applies` starts as false.
 *
 * NOTE(review): the handling of other condition types and the return are
 * not visible in this view.
 */
1545 bool trigger_applies_to_session(const struct lttng_trigger
*trigger
,
1546 const char *session_name
)
1548 bool applies
= false;
1549 const struct lttng_condition
*condition
;
1551 condition
= lttng_trigger_get_const_condition(trigger
);
1552 switch (lttng_condition_get_type(condition
)) {
1553 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
:
1554 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED
:
1556 enum lttng_condition_status condition_status
;
1557 const char *condition_session_name
;
1559 condition_status
= lttng_condition_session_rotation_get_session_name(
1560 condition
, &condition_session_name
);
1561 if (condition_status
!= LTTNG_CONDITION_STATUS_OK
) {
1562 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
1566 assert(condition_session_name
);
1567 applies
= !strcmp(condition_session_name
, session_name
);
1578 * Allocate and initialize an lttng_session_trigger_list which contains
1579 * all triggers that apply to the session named 'session_name'.
1581 * No ownership of 'session_name' is assumed by the session trigger list.
1582 * It is the caller's responsibility to ensure the session name is alive
1583 * for as long as this list is.
/*
 * Create the trigger list of the session named `session_name` and fill it
 * with the already-registered triggers that apply to that session.
 *
 * The list is created with lttng_session_trigger_list_create() (which
 * also publishes it in state->session_triggers_ht). state->triggers_ht is
 * then walked; triggers accepted by trigger_applies_to_session() are
 * appended with lttng_session_trigger_list_add().
 *
 * Returns the new list. A later visible line destroys the list with
 * lttng_session_trigger_list_destroy(); presumably that is the error
 * path (the labels and checks are not visible in this view).
 */
1586 struct lttng_session_trigger_list
*lttng_session_trigger_list_build(
1587 const struct notification_thread_state
*state
,
1588 const char *session_name
)
1590 int trigger_count
= 0;
1591 struct lttng_session_trigger_list
*session_trigger_list
= NULL
;
1592 struct lttng_trigger_ht_element
*trigger_ht_element
= NULL
;
1593 struct cds_lfht_iter iter
;
1595 session_trigger_list
= lttng_session_trigger_list_create(session_name
,
1596 state
->session_triggers_ht
);
1598 /* Add all triggers applying to the session named 'session_name'. */
1599 cds_lfht_for_each_entry(state
->triggers_ht
, &iter
, trigger_ht_element
,
1603 if (!trigger_applies_to_session(trigger_ht_element
->trigger
,
1608 ret
= lttng_session_trigger_list_add(session_trigger_list
,
1609 trigger_ht_element
->trigger
);
1617 DBG("[notification-thread] Found %i triggers that apply to newly created session",
1619 return session_trigger_list
;
1621 lttng_session_trigger_list_destroy(session_trigger_list
);
/*
 * Return the session_info of the session named `name`, creating it when
 * it is not known yet.
 *
 * Lookup path: state->sessions_ht is searched with the hash
 * hash_key_str(name, lttng_ht_seed). When a node is found, the stored
 * uid and gid are asserted to match `uid` and `gid` and a reference is
 * taken with session_info_get().
 *
 * Creation path: the session trigger list is built with
 * lttng_session_trigger_list_build(), handed to session_info_create(),
 * and the new session_info is inserted in state->sessions_ht. The local
 * trigger_list pointer is reset to NULL after the creation call.
 *
 * Later visible lines destroy the trigger list and call
 * session_info_put(); presumably those belong to error handling (the
 * checks, labels and return are not visible in this view).
 */
1626 struct session_info
*find_or_create_session_info(
1627 struct notification_thread_state
*state
,
1628 const char *name
, uid_t uid
, gid_t gid
)
1630 struct session_info
*session
= NULL
;
1631 struct cds_lfht_node
*node
;
1632 struct cds_lfht_iter iter
;
1633 struct lttng_session_trigger_list
*trigger_list
;
1636 cds_lfht_lookup(state
->sessions_ht
,
1637 hash_key_str(name
, lttng_ht_seed
),
1641 node
= cds_lfht_iter_get_node(&iter
);
1643 DBG("[notification-thread] Found session info of session \"%s\" (uid = %i, gid = %i)",
1645 session
= caa_container_of(node
, struct session_info
,
1647 assert(session
->uid
== uid
);
1648 assert(session
->gid
== gid
);
1649 session_info_get(session
);
1653 trigger_list
= lttng_session_trigger_list_build(state
, name
);
1654 if (!trigger_list
) {
1658 session
= session_info_create(name
, uid
, gid
, trigger_list
,
1659 state
->sessions_ht
);
1661 ERR("[notification-thread] Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)",
1663 lttng_session_trigger_list_destroy(trigger_list
);
1666 trigger_list
= NULL
;
1668 cds_lfht_add(state
->sessions_ht
, hash_key_str(name
, lttng_ht_seed
),
1669 &session
->sessions_ht_node
);
1675 session_info_put(session
);
/*
 * Handle the "add channel" command of the notification thread.
 *
 * Visible steps, in order:
 *   1. find or create the session_info of `session_name`;
 *   2. create a channel_info for the new channel, keyed by
 *      { channel_key_int, channel_domain };
 *   3. walk state->triggers_ht and collect, in a local list, one
 *      lttng_trigger_list_element per trigger accepted by
 *      trigger_applies_to_channel();
 *   4. allocate a lttng_channel_trigger_list, copy the channel key into
 *      it and splice the local list into it;
 *   5. insert the channel_info in state->channels_ht and the trigger list
 *      in state->channel_triggers_ht, both hashed with
 *      hash_channel_key();
 *   6. drop the session_info reference taken in step 1 and set
 *      *cmd_result to LTTNG_OK.
 *
 * The trailing channel_info_destroy() / session_info_put() calls are
 * presumably the error path (the labels, checks and return are not
 * visible in this view).
 */
1680 int handle_notification_thread_command_add_channel(
1681 struct notification_thread_state
*state
,
1682 const char *session_name
, uid_t session_uid
, gid_t session_gid
,
1683 const char *channel_name
, enum lttng_domain_type channel_domain
,
1684 uint64_t channel_key_int
, uint64_t channel_capacity
,
1685 enum lttng_error_code
*cmd_result
)
1687 struct cds_list_head trigger_list
;
1688 struct channel_info
*new_channel_info
= NULL
;
1689 struct channel_key channel_key
= {
1690 .key
= channel_key_int
,
1691 .domain
= channel_domain
,
1693 struct lttng_channel_trigger_list
*channel_trigger_list
= NULL
;
1694 struct lttng_trigger_ht_element
*trigger_ht_element
= NULL
;
1695 int trigger_count
= 0;
1696 struct cds_lfht_iter iter
;
1697 struct session_info
*session_info
= NULL
;
1699 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64
" in %s domain",
1700 channel_name
, session_name
, channel_key_int
,
1701 channel_domain
== LTTNG_DOMAIN_KERNEL
? "kernel" : "user space");
1703 CDS_INIT_LIST_HEAD(&trigger_list
);
1705 session_info
= find_or_create_session_info(state
, session_name
,
1706 session_uid
, session_gid
);
1707 if (!session_info
) {
1708 /* Allocation error or an internal error occurred. */
1712 new_channel_info
= channel_info_create(channel_name
, &channel_key
,
1713 channel_capacity
, session_info
);
1714 if (!new_channel_info
) {
1719 /* Build a list of all triggers applying to the new channel. */
1720 cds_lfht_for_each_entry(state
->triggers_ht
, &iter
, trigger_ht_element
,
1722 struct lttng_trigger_list_element
*new_element
;
1724 if (!trigger_applies_to_channel(trigger_ht_element
->trigger
,
1725 new_channel_info
)) {
1729 new_element
= zmalloc(sizeof(*new_element
));
1734 CDS_INIT_LIST_HEAD(&new_element
->node
);
1735 new_element
->trigger
= trigger_ht_element
->trigger
;
1736 cds_list_add(&new_element
->node
, &trigger_list
);
1741 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
1743 channel_trigger_list
= zmalloc(sizeof(*channel_trigger_list
));
1744 if (!channel_trigger_list
) {
1747 channel_trigger_list
->channel_key
= new_channel_info
->key
;
1748 CDS_INIT_LIST_HEAD(&channel_trigger_list
->list
);
1749 cds_lfht_node_init(&channel_trigger_list
->channel_triggers_ht_node
);
1750 cds_list_splice(&trigger_list
, &channel_trigger_list
->list
);
1753 /* Add channel to the channel_ht which owns the channel_infos. */
1754 cds_lfht_add(state
->channels_ht
,
1755 hash_channel_key(&new_channel_info
->key
),
1756 &new_channel_info
->channels_ht_node
);
1758 * Add the list of triggers associated with this channel to the
1759 * channel_triggers_ht.
1761 cds_lfht_add(state
->channel_triggers_ht
,
1762 hash_channel_key(&new_channel_info
->key
),
1763 &channel_trigger_list
->channel_triggers_ht_node
);
1765 session_info_put(session_info
);
1766 *cmd_result
= LTTNG_OK
;
1769 channel_info_destroy(new_channel_info
);
1770 session_info_put(session_info
);
/*
 * Free the struct lttng_channel_trigger_list that embeds `node`. The
 * enclosing object is recovered with caa_container_of().
 */
1775 void free_channel_trigger_list_rcu(struct rcu_head
*node
)
1777 free(caa_container_of(node
, struct lttng_channel_trigger_list
,
/*
 * Free the struct channel_state_sample that embeds `node`. The enclosing
 * object is recovered with caa_container_of().
 */
1782 void free_channel_state_sample_rcu(struct rcu_head
*node
)
1784 free(caa_container_of(node
, struct channel_state_sample
,
/*
 * Handle the "remove channel" command of the notification thread.
 *
 * The channel identified by { channel_key, domain } is removed from three
 * hash tables, in this order:
 *   1. state->channel_triggers_ht: every lttng_trigger_list_element of
 *      the channel trigger list is unlinked and freed, the list is
 *      removed from the table and reclaimed through call_rcu(). An
 *      unknown channel is reported with ERR().
 *   2. state->channel_state_ht: the sampled channel state, when one
 *      exists, is removed and reclaimed through call_rcu().
 *   3. state->channels_ht: the channel_info is removed and destroyed with
 *      channel_info_destroy().
 *
 * *cmd_result is set to LTTNG_OK.
 *
 * NOTE(review): the NULL checks on the looked-up nodes and the return are
 * not visible in this view.
 */
1789 int handle_notification_thread_command_remove_channel(
1790 struct notification_thread_state
*state
,
1791 uint64_t channel_key
, enum lttng_domain_type domain
,
1792 enum lttng_error_code
*cmd_result
)
1794 struct cds_lfht_node
*node
;
1795 struct cds_lfht_iter iter
;
1796 struct lttng_channel_trigger_list
*trigger_list
;
1797 struct lttng_trigger_list_element
*trigger_list_element
, *tmp
;
1798 struct channel_key key
= { .key
= channel_key
, .domain
= domain
};
1799 struct channel_info
*channel_info
;
1801 DBG("[notification-thread] Removing channel key = %" PRIu64
" in %s domain",
1802 channel_key
, domain
== LTTNG_DOMAIN_KERNEL
? "kernel" : "user space");
1806 cds_lfht_lookup(state
->channel_triggers_ht
,
1807 hash_channel_key(&key
),
1808 match_channel_trigger_list
,
1811 node
= cds_lfht_iter_get_node(&iter
);
1813 * There is a severe internal error if we are being asked to remove a
1814 * channel that doesn't exist.
1817 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
1821 /* Free the list of triggers associated with this channel. */
1822 trigger_list
= caa_container_of(node
, struct lttng_channel_trigger_list
,
1823 channel_triggers_ht_node
);
1824 cds_list_for_each_entry_safe(trigger_list_element
, tmp
,
1825 &trigger_list
->list
, node
) {
1826 cds_list_del(&trigger_list_element
->node
);
1827 free(trigger_list_element
);
1829 cds_lfht_del(state
->channel_triggers_ht
, node
);
1830 call_rcu(&trigger_list
->rcu_node
, free_channel_trigger_list_rcu
);
1832 /* Free sampled channel state. */
1833 cds_lfht_lookup(state
->channel_state_ht
,
1834 hash_channel_key(&key
),
1835 match_channel_state_sample
,
1838 node
= cds_lfht_iter_get_node(&iter
);
1840 * This is expected to be NULL if the channel is destroyed before we
1841 * received a sample.
1844 struct channel_state_sample
*sample
= caa_container_of(node
,
1845 struct channel_state_sample
,
1846 channel_state_ht_node
);
1848 cds_lfht_del(state
->channel_state_ht
, node
);
1849 call_rcu(&sample
->rcu_node
, free_channel_state_sample_rcu
);
1852 /* Remove the channel from the channels_ht and free it. */
1853 cds_lfht_lookup(state
->channels_ht
,
1854 hash_channel_key(&key
),
1858 node
= cds_lfht_iter_get_node(&iter
);
1860 channel_info
= caa_container_of(node
, struct channel_info
,
1862 cds_lfht_del(state
->channels_ht
, node
);
1863 channel_info_destroy(channel_info
);
1866 *cmd_result
= LTTNG_OK
;
/*
 * Handle the "session rotation ongoing" and "session rotation completed"
 * commands of the notification thread.
 *
 * Visible steps, in order:
 *   1. find or create the session_info of `session_name`; a NULL result
 *      sets the command result to LTTNG_ERR_NOMEM;
 *   2. record the rotation state: rotation.ongoing is true only for the
 *      SESSION_ROTATION_ONGOING command, rotation.id receives
 *      `trace_archive_chunk_id`;
 *   3. fetch the session trigger list; its absence is only logged;
 *   4. for each trigger of that list:
 *        - compare the condition type with the command type (an ONGOING
 *          condition with a non-ONGOING command, or a COMPLETED
 *          condition with a non-COMPLETED command; presumably such
 *          triggers are skipped);
 *        - assert that the action is of type LTTNG_ACTION_TYPE_NOTIFY;
 *        - get the client list of the condition and check, with
 *          client_list->lock held, whether it is empty;
 *        - create the matching evaluation object (ongoing or completed,
 *          the latter also receiving `location`);
 *        - dispatch it with send_evaluation_to_clients(), destroy the
 *          evaluation and release the client list;
 *   5. drop the session_info reference and store the command result in
 *      *_cmd_result.
 *
 * LTTNG_ERR_UNK is used for an internal error; presumably a failed
 * evaluation creation (the check is not visible in this view).
 */
1871 int handle_notification_thread_command_session_rotation(
1872 struct notification_thread_state
*state
,
1873 enum notification_thread_command_type cmd_type
,
1874 const char *session_name
, uid_t session_uid
, gid_t session_gid
,
1875 uint64_t trace_archive_chunk_id
,
1876 struct lttng_trace_archive_location
*location
,
1877 enum lttng_error_code
*_cmd_result
)
1880 enum lttng_error_code cmd_result
= LTTNG_OK
;
1881 struct lttng_session_trigger_list
*trigger_list
;
1882 struct lttng_trigger_list_element
*trigger_list_element
;
1883 struct session_info
*session_info
;
1887 session_info
= find_or_create_session_info(state
, session_name
,
1888 session_uid
, session_gid
);
1889 if (!session_info
) {
1890 /* Allocation error or an internal error occurred. */
1892 cmd_result
= LTTNG_ERR_NOMEM
;
1896 session_info
->rotation
.ongoing
=
1897 cmd_type
== NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING
;
1898 session_info
->rotation
.id
= trace_archive_chunk_id
;
1899 trigger_list
= get_session_trigger_list(state
, session_name
);
1900 if (!trigger_list
) {
1901 DBG("[notification-thread] No triggers applying to session \"%s\" found",
1906 cds_list_for_each_entry(trigger_list_element
, &trigger_list
->list
,
1908 const struct lttng_condition
*condition
;
1909 const struct lttng_action
*action
;
1910 const struct lttng_trigger
*trigger
;
1911 struct notification_client_list
*client_list
;
1912 struct lttng_evaluation
*evaluation
= NULL
;
1913 enum lttng_condition_type condition_type
;
1914 bool client_list_is_empty
;
1916 trigger
= trigger_list_element
->trigger
;
1917 condition
= lttng_trigger_get_const_condition(trigger
);
1919 condition_type
= lttng_condition_get_type(condition
);
1921 if (condition_type
== LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
&&
1922 cmd_type
!= NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING
) {
1924 } else if (condition_type
== LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED
&&
1925 cmd_type
!= NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED
) {
1929 action
= lttng_trigger_get_const_action(trigger
);
1931 /* Notify actions are the only type currently supported. */
1932 assert(lttng_action_get_type_const(action
) ==
1933 LTTNG_ACTION_TYPE_NOTIFY
);
1935 client_list
= get_client_list_from_condition(state
, condition
);
1936 assert(client_list
);
1938 pthread_mutex_lock(&client_list
->lock
);
1939 client_list_is_empty
= cds_list_empty(&client_list
->list
);
1940 pthread_mutex_unlock(&client_list
->lock
);
1941 if (client_list_is_empty
) {
1943 * No clients interested in the evaluation's result,
1949 if (cmd_type
== NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING
) {
1950 evaluation
= lttng_evaluation_session_rotation_ongoing_create(
1951 trace_archive_chunk_id
);
1953 evaluation
= lttng_evaluation_session_rotation_completed_create(
1954 trace_archive_chunk_id
, location
);
1958 /* Internal error */
1960 cmd_result
= LTTNG_ERR_UNK
;
1964 /* Dispatch evaluation result to all clients. */
1965 ret
= send_evaluation_to_clients(trigger_list_element
->trigger
,
1966 evaluation
, client_list
, state
,
1969 lttng_evaluation_destroy(evaluation
);
1971 notification_client_list_put(client_list
);
1972 if (caa_unlikely(ret
)) {
1977 session_info_put(session_info
);
1978 *_cmd_result
= cmd_result
;
/*
 * Handle the "add application" command: register the read side of an
 * application event trigger pipe as an event source.
 *
 * Visible steps, in order:
 *   1. allocate a notification_event_trigger_source_element (zmalloc);
 *      LTTNG_ERR_NOMEM is the result recorded for an allocation failure;
 *   2. store the pipe descriptor in the element and link it into
 *      handle->event_trigger_sources.list with
 *      handle->event_trigger_sources.lock held;
 *   3. add the descriptor to the state->events poll set with the
 *      LPOLLIN | LPOLLERR mask; a failure is logged with ERR();
 *   4. store the command result in *_cmd_result.
 *
 * As the existing TODO notes point out, the element is not removed from
 * the list when the poll set update fails and cmd_result is not updated
 * on that path in the visible code.
 */
1984 int handle_notification_thread_command_add_application(
1985 struct notification_thread_handle
*handle
,
1986 struct notification_thread_state
*state
,
1987 int read_side_trigger_event_application_pipe
,
1988 enum lttng_error_code
*_cmd_result
)
1991 enum lttng_error_code cmd_result
= LTTNG_OK
;
1992 struct notification_event_trigger_source_element
*element
= NULL
;
1994 element
= zmalloc(sizeof(*element
));
1996 cmd_result
= LTTNG_ERR_NOMEM
;
2001 CDS_INIT_LIST_HEAD(&element
->node
);
2002 element
->fd
= read_side_trigger_event_application_pipe
;
2004 pthread_mutex_lock(&handle
->event_trigger_sources
.lock
);
2005 cds_list_add(&element
->node
, &handle
->event_trigger_sources
.list
);
2006 pthread_mutex_unlock(&handle
->event_trigger_sources
.lock
);
2008 /* TODO: remove on failure to add to list? */
2010 /* Adding the read side pipe to the event poll */
2011 ret
= lttng_poll_add(&state
->events
,
2012 read_side_trigger_event_application_pipe
,
2013 LPOLLIN
| LPOLLERR
);
2015 DBG3("[notification-thread] Adding application event source from fd: %d", read_side_trigger_event_application_pipe
);
2017 /* TODO: what should be the value of cmd_result??? */
2018 ERR("[notification-thread] Failed to add event source pipe fd to pollset");
2023 *_cmd_result
= cmd_result
;
/*
 * Handle the "remove application" command: unregister the read side of an
 * application event trigger pipe.
 *
 * Visible steps, in order:
 *   1. walk handle->event_trigger_sources.list and unlink the element
 *      whose fd equals the pipe descriptor. No lock/unlock of
 *      handle->event_trigger_sources.lock is visible around this walk,
 *      which is what the existing TODO refers to;
 *   2. remove the descriptor from the state->events poll set; a failure
 *      is logged with ERR();
 *   3. store the command result in *_cmd_result.
 *
 * NOTE(review): no free() of the unlinked element is visible in this
 * view; confirm the element is released.
 */
2028 int handle_notification_thread_command_remove_application(
2029 struct notification_thread_handle
*handle
,
2030 struct notification_thread_state
*state
,
2031 int read_side_trigger_event_application_pipe
,
2032 enum lttng_error_code
*_cmd_result
)
2035 enum lttng_error_code cmd_result
= LTTNG_OK
;
2037 /* TODO: missing a lock probably to revisit */
2038 struct notification_event_trigger_source_element
*source_element
, *tmp
;
2039 cds_list_for_each_entry_safe(source_element
, tmp
,
2040 &handle
->event_trigger_sources
.list
, node
) {
2041 if (source_element
->fd
!= read_side_trigger_event_application_pipe
) {
2045 DBG("[notification-thread] Removed event source from event source list");
2046 cds_list_del(&source_element
->node
);
2050 DBG3("[notification-thread] Removing application event source from fd: %d", read_side_trigger_event_application_pipe
);
2051 /* Removing the read side pipe to the event poll */
2052 ret
= lttng_poll_del(&state
->events
,
2053 read_side_trigger_event_application_pipe
);
2055 /* TODO: what should be the value of cmd_result??? */
2056 ERR("[notification-thread] Failed to remove event source pipe fd from pollset");
2061 *_cmd_result
= cmd_result
;
/*
 * Handle the "get tokens" command: return, through `triggers`, a set
 * holding every trigger found in state->trigger_tokens_ht.
 *
 * A lttng_triggers set is created (LTTNG_ERR_NOMEM is recorded when the
 * creation fails). Each element of the hash table has its trigger added
 * to the set with lttng_triggers_add() and a reference is taken with
 * lttng_trigger_get(), since the trigger is then shared with the set.
 * LTTNG_ERR_FATAL is the result recorded in the loop, presumably when
 * the add fails (the check is not visible here).
 *
 * On the success path the set is handed to the caller (*triggers) and
 * the local pointer is reset to NULL, which makes the subsequent
 * lttng_triggers_destroy(local_triggers) call act only on sets that were
 * not handed over. The command result is stored in *_cmd_result.
 *
 * NOTE(review): no use of `handle` is visible in this view.
 */
2065 static int handle_notification_thread_command_get_tokens(
2066 struct notification_thread_handle
*handle
,
2067 struct notification_thread_state
*state
,
2068 struct lttng_triggers
**triggers
,
2069 enum lttng_error_code
*_cmd_result
)
2072 enum lttng_error_code cmd_result
= LTTNG_OK
;
2073 struct cds_lfht_iter iter
;
2074 struct notification_trigger_tokens_ht_element
*element
;
2075 struct lttng_triggers
*local_triggers
= NULL
;
2077 local_triggers
= lttng_triggers_create();
2078 if (!local_triggers
) {
2079 cmd_result
= LTTNG_ERR_NOMEM
;
2084 cds_lfht_for_each_entry (
2085 state
->trigger_tokens_ht
, &iter
, element
, node
) {
2086 ret
= lttng_triggers_add(local_triggers
, element
->trigger
);
2088 cmd_result
= LTTNG_ERR_FATAL
;
2093 /* Ownership is shared with the lttng_triggers object */
2094 lttng_trigger_get(element
->trigger
);
2099 /* Passing ownership up */
2100 *triggers
= local_triggers
;
2101 local_triggers
= NULL
;
2105 lttng_triggers_destroy(local_triggers
);
2106 *_cmd_result
= cmd_result
;
/*
 * Handle the "list triggers" command: return, through `triggers`, a set
 * holding the registered triggers visible to the requesting user.
 *
 * state->triggers_ht is walked; a trigger is compared against the
 * requesting `uid` and `gid` using the credentials returned by
 * lttng_trigger_get_credentials(). Triggers that pass the filter are
 * added to a freshly created lttng_triggers set and a reference is taken
 * on each with lttng_trigger_get().
 *
 * On the success path the set is handed to the caller (*triggers) and
 * the local pointer is reset to NULL before the
 * lttng_triggers_destroy(local_triggers) call. The command result is
 * stored in *_cmd_result; LTTNG_ERR_NOMEM is recorded when the set
 * cannot be created.
 *
 * NOTE(review): the `uid` and `gid` parameters, the action taken when
 * the credentials differ and the use of `count` are not visible in this
 * view.
 */
2111 int handle_notification_thread_command_list_triggers(
2112 struct notification_thread_handle
*handle
,
2113 struct notification_thread_state
*state
,
2116 struct lttng_triggers
**triggers
,
2117 enum lttng_error_code
*_cmd_result
)
2120 enum lttng_error_code cmd_result
= LTTNG_OK
;
2121 struct cds_lfht_iter iter
;
2122 struct lttng_trigger_ht_element
*trigger_ht_element
;
2123 struct lttng_triggers
*local_triggers
= NULL
;
2124 const struct lttng_credentials
*creds
;
2127 unsigned long count
;
2130 cds_lfht_count_nodes(state
->triggers_ht
, &scb
, &count
, &sca
);
2132 /* TODO check downcasting */
2133 local_triggers
= lttng_triggers_create();
2134 if (!local_triggers
) {
2135 cmd_result
= LTTNG_ERR_NOMEM
;
2139 cds_lfht_for_each_entry (state
->triggers_ht
, &iter
,
2140 trigger_ht_element
, node
) {
2141 /* Only return the triggers to which the requesting client has
2142 * access. For now the root user can only list its own
2144 * TODO: root user behavior
2146 creds
= lttng_trigger_get_credentials(trigger_ht_element
->trigger
);
2147 if ((uid
!= creds
->uid
) || (gid
!= creds
->gid
)) {
2151 ret
= lttng_triggers_add(local_triggers
, trigger_ht_element
->trigger
);
2156 /* Ownership is shared with the lttng_triggers object */
2157 lttng_trigger_get(trigger_ht_element
->trigger
);
2162 /* Passing ownership up */
2163 *triggers
= local_triggers
;
2164 local_triggers
= NULL
;
2168 lttng_triggers_destroy(local_triggers
);
2169 *_cmd_result
= cmd_result
;
/*
 * Tell whether `condition` can be evaluated by this session daemon.
 *
 * For BUFFER_USAGE_LOW / BUFFER_USAGE_HIGH conditions, the domain of the
 * condition is fetched; the kernel domain additionally depends on
 * kernel_supports_ring_buffer_snapshot_sample_positions(), since older
 * kernel tracers do not expose the buffer monitoring API.
 *
 * An EVENT_RULE_HIT case exists; its body is not visible in this view.
 *
 * NOTE(review): the values returned for each case are not visible here.
 */
2174 int condition_is_supported(struct lttng_condition
*condition
)
2178 switch (lttng_condition_get_type(condition
)) {
2179 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
2180 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
2182 enum lttng_domain_type domain
;
2184 ret
= lttng_condition_buffer_usage_get_domain_type(condition
,
2191 if (domain
!= LTTNG_DOMAIN_KERNEL
) {
2197 * Older kernel tracers don't expose the API to monitor their
2198 * buffers. Therefore, we reject triggers that require that
2199 * mechanism to be available to be evaluated.
2201 ret
= kernel_supports_ring_buffer_snapshot_sample_positions();
2204 case LTTNG_CONDITION_TYPE_EVENT_RULE_HIT
:
2207 * Check for kernel support.
2208 * Check for ust support ??
/*
 * Tell whether `action` is supported, by dispatching on its type.
 *
 * NOTIFY, START_SESSION, STOP_SESSION, ROTATE_SESSION and
 * SNAPSHOT_SESSION share a single case body; GROUP has its own case.
 *
 * NOTE(review): the values returned for each case are not visible in
 * this view. The existing TODO notes state that all action types are
 * accepted for now and that the actions contained in a group are not
 * validated individually.
 */
2221 int action_is_supported(struct lttng_action
*action
)
2225 switch (lttng_action_get_type(action
)) {
2226 case LTTNG_ACTION_TYPE_NOTIFY
:
2227 case LTTNG_ACTION_TYPE_START_SESSION
:
2228 case LTTNG_ACTION_TYPE_STOP_SESSION
:
2229 case LTTNG_ACTION_TYPE_ROTATE_SESSION
:
2230 case LTTNG_ACTION_TYPE_SNAPSHOT_SESSION
:
2232 /* TODO validate that this is true for kernel in regards to
2233 * rotation and snapshot. Start stop is not a problem notify
2236 /* For now all type of actions are supported */
2240 case LTTNG_ACTION_TYPE_GROUP
:
2242 /* TODO: Iterate over all internal actions and validate that
2243 * they are supported
2256 /* Must be called with RCU read lock held. */
/*
 * Bind a newly registered `trigger` to the trigger list of the session
 * its condition refers to.
 *
 * For SESSION_ROTATION_ONGOING / SESSION_ROTATION_COMPLETED conditions,
 * the session name is read from the condition (a failure is logged with
 * ERR()). The trigger list of that session is then fetched with
 * get_session_trigger_list(); when the session is not known yet this is
 * only logged with DBG(). Otherwise the trigger is appended with
 * lttng_session_trigger_list_add().
 *
 * NOTE(review): the handling of other condition types and the returned
 * value are not visible in this view.
 */
2258 int bind_trigger_to_matching_session(struct lttng_trigger
*trigger
,
2259 struct notification_thread_state
*state
)
2262 const struct lttng_condition
*condition
;
2263 const char *session_name
;
2264 struct lttng_session_trigger_list
*trigger_list
;
2266 condition
= lttng_trigger_get_const_condition(trigger
);
2267 switch (lttng_condition_get_type(condition
)) {
2268 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
:
2269 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED
:
2271 enum lttng_condition_status status
;
2273 status
= lttng_condition_session_rotation_get_session_name(
2274 condition
, &session_name
);
2275 if (status
!= LTTNG_CONDITION_STATUS_OK
) {
2276 ERR("[notification-thread] Failed to bind trigger to session: unable to get 'session_rotation' condition's session name");
2287 trigger_list
= get_session_trigger_list(state
, session_name
);
2288 if (!trigger_list
) {
2289 DBG("[notification-thread] Unable to bind trigger applying to session \"%s\" as it is not yet known to the notification system",
2295 DBG("[notification-thread] Newly registered trigger bound to session \"%s\"",
2297 ret
= lttng_session_trigger_list_add(trigger_list
, trigger
);
2302 /* Must be called with RCU read lock held. */
/*
 * Bind a newly registered `trigger` to every known channel it applies to.
 *
 * state->channels_ht is walked. For each channel accepted by
 * trigger_applies_to_channel(), the trigger list of the channel is
 * looked up in state->channel_triggers_ht (hash_channel_key() hash,
 * match_channel_trigger_list match function), and a new
 * lttng_trigger_list_element pointing to `trigger` is allocated and
 * linked into that list.
 *
 * NOTE(review): the handling of an allocation failure, of a missing
 * channel trigger list and the returned value are not visible in this
 * view.
 */
2304 int bind_trigger_to_matching_channels(struct lttng_trigger
*trigger
,
2305 struct notification_thread_state
*state
)
2308 struct cds_lfht_node
*node
;
2309 struct cds_lfht_iter iter
;
2310 struct channel_info
*channel
;
2312 cds_lfht_for_each_entry(state
->channels_ht
, &iter
, channel
,
2314 struct lttng_trigger_list_element
*trigger_list_element
;
2315 struct lttng_channel_trigger_list
*trigger_list
;
2316 struct cds_lfht_iter lookup_iter
;
2318 if (!trigger_applies_to_channel(trigger
, channel
)) {
2322 cds_lfht_lookup(state
->channel_triggers_ht
,
2323 hash_channel_key(&channel
->key
),
2324 match_channel_trigger_list
,
2327 node
= cds_lfht_iter_get_node(&lookup_iter
);
2329 trigger_list
= caa_container_of(node
,
2330 struct lttng_channel_trigger_list
,
2331 channel_triggers_ht_node
);
2333 trigger_list_element
= zmalloc(sizeof(*trigger_list_element
));
2334 if (!trigger_list_element
) {
2338 CDS_INIT_LIST_HEAD(&trigger_list_element
->node
);
2339 trigger_list_element
->trigger
= trigger
;
2340 cds_list_add(&trigger_list_element
->node
, &trigger_list
->list
);
2341 DBG("[notification-thread] Newly registered trigger bound to channel \"%s\"",
/*
 * Perform the registration work specific to a trigger carrying a notify
 * action.
 *
 * Visible steps, in order:
 *   1. create a notification client list for `trigger`;
 *   2. walk state->client_socket_ht and add to that list one element per
 *      client accepted by trigger_applies_to_client();
 *   3. depending on get_condition_binding_object(condition), bind the
 *      trigger to its session (bind_trigger_to_matching_session) or to
 *      its channels (bind_trigger_to_matching_channels); an unknown
 *      object type is logged with ERR();
 *   4. evaluate the condition right away for every client of the list
 *      with evaluate_condition_for_client(), so that clients which
 *      subscribed before the trigger was registered are notified of a
 *      condition that already holds;
 *   5. publish the client list with publish_notification_client_list().
 *
 * Error paths jump to error_put_client_list, where the reference on the
 * client list is dropped with notification_client_list_put().
 *
 * NOTE(review): the error checks that lead to the goto statements and
 * the returned value are not visible in this view.
 */
2348 static int action_notify_register_trigger(
2349 struct notification_thread_state
*state
,
2350 struct lttng_trigger
*trigger
)
2354 struct lttng_condition
*condition
;
2355 struct notification_client
*client
;
2356 struct notification_client_list
*client_list
= NULL
;
2357 struct cds_lfht_iter iter
;
2358 struct notification_client_list_element
*client_list_element
, *tmp
;
2360 condition
= lttng_trigger_get_condition(trigger
);
2363 client_list
= notification_client_list_create(trigger
);
2369 /* Build a list of clients to which this new trigger applies. */
2370 cds_lfht_for_each_entry(state
->client_socket_ht
, &iter
, client
,
2371 client_socket_ht_node
) {
2372 if (!trigger_applies_to_client(trigger
, client
)) {
2376 client_list_element
= zmalloc(sizeof(*client_list_element
));
2377 if (!client_list_element
) {
2379 goto error_put_client_list
;
2381 CDS_INIT_LIST_HEAD(&client_list_element
->node
);
2382 client_list_element
->client
= client
;
2383 cds_list_add(&client_list_element
->node
, &client_list
->list
);
2386 switch (get_condition_binding_object(condition
)) {
2387 case LTTNG_OBJECT_TYPE_SESSION
:
2388 /* Add the trigger to the list if it matches a known session. */
2389 ret
= bind_trigger_to_matching_session(trigger
, state
);
2391 goto error_put_client_list
;
2394 case LTTNG_OBJECT_TYPE_CHANNEL
:
2396 * Add the trigger to list of triggers bound to the channels
2399 ret
= bind_trigger_to_matching_channels(trigger
, state
);
2401 goto error_put_client_list
;
2404 case LTTNG_OBJECT_TYPE_NONE
:
2407 ERR("[notification-thread] Unknown object type on which to bind a newly registered trigger was encountered");
2409 goto error_put_client_list
;
2413 * Since there is nothing preventing clients from subscribing to a
2414 * condition before the corresponding trigger is registered, we have
2415 * to evaluate this new condition right away.
2417 * At some point, we were waiting for the next "evaluation" (e.g. on
2418 * reception of a channel sample) to evaluate this new condition, but
2421 * The reason it was broken is that waiting for the next sample
2422 * does not allow us to properly handle transitions for edge-triggered
2425 * Consider this example: when we handle a new channel sample, we
2426 * evaluate each conditions twice: once with the previous state, and
2427 * again with the newest state. We then use those two results to
2428 * determine whether a state change happened: a condition was false and
2429 * became true. If a state change happened, we have to notify clients.
2431 * Now, if a client subscribes to a given notification and registers
2432 * a trigger *after* that subscription, we have to make sure the
2433 * condition is evaluated at this point while considering only the
2434 * current state. Otherwise, the next evaluation cycle may only see
2435 * that the evaluations remain the same (true for samples n-1 and n) and
2436 * the client will never know that the condition has been met.
2438 * No need to lock the list here as it has not been published yet.
2440 cds_list_for_each_entry_safe(client_list_element
, tmp
,
2441 &client_list
->list
, node
) {
2442 ret
= evaluate_condition_for_client(trigger
, condition
,
2443 client_list_element
->client
, state
);
2445 goto error_put_client_list
;
2450 * Client list ownership transferred to the
2451 * notification_trigger_clients_ht.
2453 publish_notification_client_list(state
, client_list
);
2455 error_put_client_list
:
2456 notification_client_list_put(client_list
);
/*
 * Look up `name` in state->triggers_by_name_ht (hashed with hash_key_str()
 * and lttng_ht_seed) and test whether the lookup produced a node.
 *
 * NOTE(review): the return statements are not visible in this view;
 * presumably true is returned when a node is found - confirm.
 */
2462 bool trigger_name_taken(struct notification_thread_state
*state
, const char *name
)
2464 struct cds_lfht_node
*triggers_by_name_ht_node
;
2465 struct cds_lfht_iter iter
;
2466 /* TODO change hashing for trigger */
2467 cds_lfht_lookup(state
->triggers_by_name_ht
,
2468 hash_key_str(name
, lttng_ht_seed
),
2472 triggers_by_name_ht_node
= cds_lfht_iter_get_node(&iter
);
2473 if (triggers_by_name_ht_node
) {
/*
 * Give `trigger` a generated name derived from state->trigger_id.name_offset,
 * read the name back into *name and check it against the names already
 * registered with trigger_name_taken(). The sequence is repeated by the
 * enclosing do/while loop; name_offset is incremented inside the loop.
 *
 * NOTE(review): the loop condition is
 * `taken || name_offset == UINT32_MAX`, so the loop also repeats when the
 * generated name is free but the offset equals UINT32_MAX. Confirm whether
 * the offset test was meant to end the loop instead of prolonging it.
 */
2481 void generate_trigger_name(struct notification_thread_state
*state
, struct lttng_trigger
*trigger
, const char **name
)
2483 /* Here the offset criteria guarantee an end. This will be a nice
2484 * bikeshedding conversation. I would simply generate uuid and use them
2489 lttng_trigger_generate_name(trigger
, state
->trigger_id
.name_offset
);
2490 /* TODO error checking */
2491 lttng_trigger_get_name(trigger
, name
);
2492 taken
= trigger_name_taken(state
, *name
);
2494 state
->trigger_id
.name_offset
++;
2496 } while (taken
|| state
->trigger_id
.name_offset
== UINT32_MAX
);
/*
 * Inspect the type of `action`. Two types are handled explicitly:
 *   - LTTNG_ACTION_TYPE_NOTIFY,
 *   - LTTNG_ACTION_TYPE_GROUP: the number of grouped actions is fetched and
 *     this function is called recursively on each of them.
 *
 * NOTE(review): the statements producing the returned value are not visible
 * in this view - presumably true is returned for a notify action or for a
 * group containing one; confirm.
 */
2499 static bool action_is_notify(const struct lttng_action
*action
)
2501 /* TODO for action groups we need to iterate over all of them */
2502 enum lttng_action_type type
= lttng_action_get_type_const(action
);
2504 enum lttng_action_status status
;
2505 const struct lttng_action
*tmp
;
2506 unsigned int i
, count
;
2509 case LTTNG_ACTION_TYPE_NOTIFY
:
2512 case LTTNG_ACTION_TYPE_GROUP
:
2513 status
= lttng_action_group_get_count(action
, &count
);
2514 if (status
!= LTTNG_ACTION_STATUS_OK
) {
/* Recurse into every action of the group. */
2517 for (i
= 0; i
< count
; i
++) {
2518 tmp
= lttng_action_group_get_at_index_const(action
, i
);
2520 ret
= action_is_notify(tmp
);
2535 * TODO: REVIEW THIS COMMENT.
2536 * FIXME A client's credentials are not checked when registering a trigger, nor
2537 * are they stored alongside with the trigger.
2539 * The effects of this are benign since:
2540 * - The client will succeed in registering the trigger, as it is valid,
2541 * - The trigger will, internally, be bound to the channel/session,
2542 * - The notifications will not be sent since the client's credentials
2543 * are checked against the channel at that moment.
2545 * If this function returns a non-zero value, it means something is
2546 * fundamentally broken and the whole subsystem/thread will be torn down.
2548 * If a non-fatal error occurs, just set the cmd_result to the appropriate
/*
 * Register `trigger` with the notification thread.
 *
 * Visible sequence:
 *   - the trigger key is set from state->trigger_id.token_generator,
 *   - a name is generated when none is set; a user-provided name that is
 *     already taken is reported as LTTNG_ERR_TRIGGER_EXISTS,
 *   - an unsupported condition or action (is_supported == 0) is reported as
 *     LTTNG_ERR_NOT_SUPPORTED,
 *   - a lttng_trigger_ht_element wrapping the trigger is added to
 *     state->triggers_ht (keyed by the condition hash) and to
 *     state->triggers_by_name_ht (keyed by the name hash); a duplicate in
 *     either table is reported as LTTNG_ERR_TRIGGER_EXISTS,
 *   - for LTTNG_CONDITION_TYPE_EVENT_RULE_HIT conditions, a token element is
 *     added to state->trigger_tokens_ht,
 *   - when action_is_notify() holds, action_notify_register_trigger() runs,
 *   - the token generator is incremented and *cmd_result is set to LTTNG_OK.
 *
 * NOTE(review): when insertion into triggers_by_name_ht or trigger_tokens_ht
 * fails, control jumps to error_free_ht_element, which free()s
 * trigger_ht_element although its `node` was already added to triggers_ht
 * (and `node_by_name` to triggers_by_name_ht in the second case). The element
 * is not removed from those tables first, as the existing TODO acknowledges.
 */
2552 int handle_notification_thread_command_register_trigger(
2553 struct notification_thread_state
*state
,
2554 struct lttng_trigger
*trigger
,
2555 enum lttng_error_code
*cmd_result
)
2559 struct lttng_condition
*condition
;
2560 struct lttng_action
*action
;
2561 struct lttng_trigger_ht_element
*trigger_ht_element
= NULL
;
2562 struct notification_trigger_tokens_ht_element
*trigger_tokens_ht_element
= NULL
;
2563 struct cds_lfht_node
*node
;
2564 const char* trigger_name
;
2565 bool free_trigger
= true;
2567 assert(trigger
->creds
.set
);
2571 /* Set the trigger's key */
2572 lttng_trigger_set_key(trigger
, state
->trigger_id
.token_generator
);
2574 if (lttng_trigger_get_name(trigger
, &trigger_name
) == LTTNG_TRIGGER_STATUS_UNSET
) {
2575 generate_trigger_name(state
, trigger
, &trigger_name
);
2576 } else if (trigger_name_taken(state
, trigger_name
)) {
2577 /* Not a fatal error */
2578 *cmd_result
= LTTNG_ERR_TRIGGER_EXISTS
;
2583 condition
= lttng_trigger_get_condition(trigger
);
2586 action
= lttng_trigger_get_action(trigger
);
2589 is_supported
= condition_is_supported(condition
);
2590 if (is_supported
< 0) {
2592 } else if (is_supported
== 0) {
2594 *cmd_result
= LTTNG_ERR_NOT_SUPPORTED
;
2598 is_supported
= action_is_supported(action
);
2599 if (is_supported
< 0) {
2601 } else if (is_supported
== 0) {
2603 *cmd_result
= LTTNG_ERR_NOT_SUPPORTED
;
2607 trigger_ht_element
= zmalloc(sizeof(*trigger_ht_element
));
2608 if (!trigger_ht_element
) {
2613 /* Add trigger to the trigger_ht. */
2614 cds_lfht_node_init(&trigger_ht_element
->node
);
2615 cds_lfht_node_init(&trigger_ht_element
->node_by_name
);
2618 * This element own the trigger object from now own, this is why there
2619 * is no lttng_trigger_get here.
2620 * This thread is now the owner of the trigger object.
2622 trigger_ht_element
->trigger
= trigger
;
2624 node
= cds_lfht_add_unique(state
->triggers_ht
,
2625 lttng_condition_hash(condition
),
2628 &trigger_ht_element
->node
);
2629 if (node
!= &trigger_ht_element
->node
) {
2630 /* Not a fatal error, simply report it to the client. */
2631 *cmd_result
= LTTNG_ERR_TRIGGER_EXISTS
;
2632 goto error_free_ht_element
;
2635 node
= cds_lfht_add_unique(state
->triggers_by_name_ht
,
2636 hash_key_str(trigger_name
, lttng_ht_seed
),
2639 &trigger_ht_element
->node_by_name
);
2640 if (node
!= &trigger_ht_element
->node_by_name
) {
2641 /* This should never happen */
2642 /* Not a fatal error, simply report it to the client. */
2643 /* TODO remove from the trigger_ht */
2644 *cmd_result
= LTTNG_ERR_TRIGGER_EXISTS
;
2645 goto error_free_ht_element
;
2648 if (lttng_condition_get_type(condition
) == LTTNG_CONDITION_TYPE_EVENT_RULE_HIT
) {
2649 trigger_tokens_ht_element
= zmalloc(sizeof(*trigger_tokens_ht_element
));
2650 if (!trigger_tokens_ht_element
) {
2655 /* Add trigger token to the trigger_tokens_ht. */
2656 cds_lfht_node_init(&trigger_tokens_ht_element
->node
);
2657 trigger_tokens_ht_element
->token
= trigger
->key
.value
;
2658 trigger_tokens_ht_element
->trigger
= trigger
;
2660 node
= cds_lfht_add_unique(state
->trigger_tokens_ht
,
2661 hash_key_u64(&trigger_tokens_ht_element
->token
, lttng_ht_seed
),
2662 match_trigger_token
,
2663 &trigger_tokens_ht_element
->token
,
2664 &trigger_tokens_ht_element
->node
);
2665 if (node
!= &trigger_tokens_ht_element
->node
) {
2666 /* TODO: THIS IS A FATAL ERROR... should never happen */
2667 /* Not a fatal error, simply report it to the client. */
2668 *cmd_result
= LTTNG_ERR_TRIGGER_EXISTS
;
2669 goto error_free_ht_element
;
2674 * Ownership of the trigger and of its wrapper was transfered to
2675 * the triggers_ht. Same for token ht element if necessary.
2677 trigger_tokens_ht_element
= NULL
;
2678 trigger_ht_element
= NULL
;
2679 free_trigger
= false;
2681 if (action_is_notify(action
)) {
2682 ret
= action_notify_register_trigger(state
, trigger
);
2684 /* TODO should cmd_result be set here? */
2686 goto error_free_ht_element
;
2690 /* Increment the trigger unique id generator */
2691 state
->trigger_id
.token_generator
++;
2692 *cmd_result
= LTTNG_OK
;
2694 error_free_ht_element
:
2695 free(trigger_ht_element
);
2696 free(trigger_tokens_ht_element
);
2699 lttng_trigger_destroy(trigger
);
2706 void free_lttng_trigger_ht_element_rcu(struct rcu_head
*node
)
2708 free(caa_container_of(node
, struct lttng_trigger_ht_element
,
2713 void free_notification_trigger_tokens_ht_element_rcu(struct rcu_head
*node
)
2715 free(caa_container_of(node
, struct notification_trigger_tokens_ht_element
,
/*
 * Unregister `trigger` from the notification thread.
 *
 * Visible sequence:
 *   - the trigger is looked up in state->triggers_ht using the hash of its
 *     condition; a missing node yields LTTNG_ERR_TRIGGER_NOT_FOUND,
 *     otherwise the reply is LTTNG_OK,
 *   - every list of state->channel_triggers_ht is walked and the element
 *     whose trigger compares equal (lttng_trigger_is_equal()) is unlinked,
 *   - for LTTNG_CONDITION_TYPE_EVENT_RULE_HIT conditions, the matching
 *     element of state->trigger_tokens_ht is deleted and reclaimed through
 *     call_rcu(),
 *   - when action_is_notify() holds, the client list associated with the
 *     condition is fetched and put twice (the reference just acquired and
 *     the one held by the hash table),
 *   - the element is removed from triggers_by_name_ht and triggers_ht, the
 *     trigger it holds is destroyed and the element is reclaimed through
 *     call_rcu(),
 *   - the reply code is stored through _cmd_reply.
 */
2720 int handle_notification_thread_command_unregister_trigger(
2721 struct notification_thread_state
*state
,
2722 struct lttng_trigger
*trigger
,
2723 enum lttng_error_code
*_cmd_reply
)
2725 struct cds_lfht_iter iter
;
2726 struct cds_lfht_node
*triggers_ht_node
;
2727 struct lttng_channel_trigger_list
*trigger_list
;
2728 struct notification_client_list
*client_list
;
2729 struct lttng_trigger_ht_element
*trigger_ht_element
= NULL
;
2730 struct lttng_condition
*condition
= lttng_trigger_get_condition(
2732 struct lttng_action
*action
= lttng_trigger_get_action(trigger
);
2733 enum lttng_error_code cmd_reply
;
2737 /* TODO change hashing for trigger */
2738 /* TODO Disabling for the root user is not complete, for now the root
2739 * user cannot disable the trigger from another user.
2741 cds_lfht_lookup(state
->triggers_ht
,
2742 lttng_condition_hash(condition
),
2746 triggers_ht_node
= cds_lfht_iter_get_node(&iter
);
2747 if (!triggers_ht_node
) {
2748 cmd_reply
= LTTNG_ERR_TRIGGER_NOT_FOUND
;
2751 cmd_reply
= LTTNG_OK
;
2754 /* Remove trigger from channel_triggers_ht. */
2755 cds_lfht_for_each_entry(state
->channel_triggers_ht
, &iter
, trigger_list
,
2756 channel_triggers_ht_node
) {
2757 struct lttng_trigger_list_element
*trigger_element
, *tmp
;
2759 cds_list_for_each_entry_safe(trigger_element
, tmp
,
2760 &trigger_list
->list
, node
) {
2761 if (!lttng_trigger_is_equal(trigger
, trigger_element
->trigger
)) {
2765 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
2766 cds_list_del(&trigger_element
->node
);
2767 /* A trigger can only appear once per channel */
2772 if (lttng_condition_get_type(condition
) == LTTNG_CONDITION_TYPE_EVENT_RULE_HIT
) {
2773 struct notification_trigger_tokens_ht_element
*trigger_tokens_ht_element
;
2774 cds_lfht_for_each_entry(state
->trigger_tokens_ht
, &iter
, trigger_tokens_ht_element
,
2776 if (!lttng_trigger_is_equal(trigger
, trigger_tokens_ht_element
->trigger
)) {
2780 /* TODO talk to all app and remove it */
2781 DBG("[notification-thread] Removed trigger from tokens_ht");
2782 cds_lfht_del(state
->trigger_tokens_ht
,
2783 &trigger_tokens_ht_element
->node
);
2784 call_rcu(&trigger_tokens_ht_element
->rcu_node
, free_notification_trigger_tokens_ht_element_rcu
);
2790 if (action_is_notify(action
)) {
2792 * Remove and release the client list from
2793 * notification_trigger_clients_ht.
2795 client_list
= get_client_list_from_condition(state
, condition
);
2796 assert(client_list
);
2798 /* Put new reference and the hashtable's reference. */
2799 notification_client_list_put(client_list
);
2800 notification_client_list_put(client_list
);
2804 /* Remove trigger from triggers_ht. */
2805 trigger_ht_element
= caa_container_of(triggers_ht_node
,
2806 struct lttng_trigger_ht_element
, node
);
2807 cds_lfht_del(state
->triggers_by_name_ht
, &trigger_ht_element
->node_by_name
);
2808 cds_lfht_del(state
->triggers_ht
, triggers_ht_node
);
2810 /* Release the ownership of the trigger */
2811 lttng_trigger_destroy(trigger_ht_element
->trigger
);
2812 call_rcu(&trigger_ht_element
->rcu_node
, free_lttng_trigger_ht_element_rcu
);
2816 *_cmd_reply
= cmd_reply
;
2821 /* Returns 0 on success, 1 on exit requested, negative value on error. */
/*
 * Process the first command of the notification thread's command queue.
 *
 * The command event pipe is read first, then handle->cmd_queue.lock is
 * taken, the first queued command is fetched and dispatched on cmd->type to
 * the matching handle_notification_thread_command_*() function (or handled
 * inline for the quit and client communication update commands). Once
 * handled, the command is unlinked from the queue; the waiter of the command
 * is woken up and the queue lock is released.
 */
2822 int handle_notification_thread_command(
2823 struct notification_thread_handle
*handle
,
2824 struct notification_thread_state
*state
)
2828 struct notification_thread_command
*cmd
;
2830 /* Read the event pipe to put it back into a quiescent state. */
2831 ret
= lttng_read(lttng_pipe_get_readfd(handle
->cmd_queue
.event_pipe
), &counter
,
2833 if (ret
!= sizeof(counter
)) {
2837 pthread_mutex_lock(&handle
->cmd_queue
.lock
);
2838 cmd
= cds_list_first_entry(&handle
->cmd_queue
.list
,
2839 struct notification_thread_command
, cmd_list_node
);
2840 switch (cmd
->type
) {
2841 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER
:
2842 DBG("[notification-thread] Received register trigger command");
2843 ret
= handle_notification_thread_command_register_trigger(
2844 state
, cmd
->parameters
.trigger
,
2847 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER
:
2848 DBG("[notification-thread] Received unregister trigger command");
2849 ret
= handle_notification_thread_command_unregister_trigger(
2850 state
, cmd
->parameters
.trigger
,
2853 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL
:
2854 DBG("[notification-thread] Received add channel command");
2855 ret
= handle_notification_thread_command_add_channel(
2857 cmd
->parameters
.add_channel
.session
.name
,
2858 cmd
->parameters
.add_channel
.session
.uid
,
2859 cmd
->parameters
.add_channel
.session
.gid
,
2860 cmd
->parameters
.add_channel
.channel
.name
,
2861 cmd
->parameters
.add_channel
.channel
.domain
,
2862 cmd
->parameters
.add_channel
.channel
.key
,
2863 cmd
->parameters
.add_channel
.channel
.capacity
,
2866 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL
:
2867 DBG("[notification-thread] Received remove channel command");
2868 ret
= handle_notification_thread_command_remove_channel(
2869 state
, cmd
->parameters
.remove_channel
.key
,
2870 cmd
->parameters
.remove_channel
.domain
,
2873 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING
:
2874 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED
:
2875 DBG("[notification-thread] Received session rotation %s command",
2876 cmd
->type
== NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING
?
2877 "ongoing" : "completed");
2878 ret
= handle_notification_thread_command_session_rotation(
2881 cmd
->parameters
.session_rotation
.session_name
,
2882 cmd
->parameters
.session_rotation
.uid
,
2883 cmd
->parameters
.session_rotation
.gid
,
2884 cmd
->parameters
.session_rotation
.trace_archive_chunk_id
,
2885 cmd
->parameters
.session_rotation
.location
,
2888 case NOTIFICATION_COMMAND_TYPE_ADD_APPLICATION
:
2889 ret
= handle_notification_thread_command_add_application(
2892 cmd
->parameters
.application
.read_side_trigger_event_application_pipe
,
2895 case NOTIFICATION_COMMAND_TYPE_REMOVE_APPLICATION
:
2896 ret
= handle_notification_thread_command_remove_application(
2899 cmd
->parameters
.application
.read_side_trigger_event_application_pipe
,
2902 case NOTIFICATION_COMMAND_TYPE_GET_TOKENS
:
2904 struct lttng_triggers
*triggers
= NULL
;
2905 ret
= handle_notification_thread_command_get_tokens(
2906 handle
, state
, &triggers
, &cmd
->reply_code
);
2907 cmd
->reply
.get_tokens
.triggers
= triggers
;
2911 cmd
->reply_code
= LTTNG_OK
;
2915 case NOTIFICATION_COMMAND_TYPE_LIST_TRIGGERS
:
2917 struct lttng_triggers
*triggers
= NULL
;
2918 ret
= handle_notification_thread_command_list_triggers(
2921 cmd
->parameters
.list_triggers
.uid
,
2922 cmd
->parameters
.list_triggers
.gid
,
2925 cmd
->reply
.list_triggers
.triggers
= triggers
;
2929 case NOTIFICATION_COMMAND_TYPE_QUIT
:
2930 DBG("[notification-thread] Received quit command");
2931 cmd
->reply_code
= LTTNG_OK
;
2934 case NOTIFICATION_COMMAND_TYPE_CLIENT_COMMUNICATION_UPDATE
:
2936 const enum client_transmission_status client_status
=
2937 cmd
->parameters
.client_communication_update
2939 const notification_client_id client_id
=
2940 cmd
->parameters
.client_communication_update
.id
;
2941 struct notification_client
*client
;
2944 client
= get_client_from_id(client_id
, state
);
2948 * Client error was probably already picked-up by the
2949 * notification thread or it has disconnected
2950 * gracefully while this command was queued.
2952 DBG("Failed to find notification client to update communication status, client id = %" PRIu64
,
2956 pthread_mutex_lock(&client
->lock
);
2957 ret
= client_handle_transmission_status(
2958 client
, client_status
, state
);
2959 pthread_mutex_unlock(&client
->lock
);
2965 ERR("[notification-thread] Unknown internal command received");
2973 cds_list_del(&cmd
->cmd_list_node
);
2974 if (cmd
->is_async
) {
2978 lttng_waiter_wake_up(&cmd
->reply_waiter
);
2980 pthread_mutex_unlock(&handle
->cmd_queue
.lock
);
/*
 * NOTE(review): on this error path cmd->reply_code is assigned after the
 * waiter has been woken up and the queue lock released, so the woken thread
 * may read the reply code before it is set and `cmd` may no longer be valid
 * when it is written. Confirm; setting reply_code before the wake-up would
 * avoid both.
 */
2983 /* Wake-up and return a fatal error to the calling thread. */
2984 lttng_waiter_wake_up(&cmd
->reply_waiter
);
2985 pthread_mutex_unlock(&handle
->cmd_queue
.lock
);
2986 cmd
->reply_code
= LTTNG_ERR_FATAL
;
2988 /* Indicate a fatal error to the caller. */
/*
 * Put `socket` in non-blocking mode: the descriptor flags are queried with
 * fcntl(F_GETFL), then `flags | O_NONBLOCK` is installed with
 * fcntl(F_SETFL). Each fcntl() failure is reported with PERROR().
 *
 * NOTE(review): the assignment of `flags` is not visible in this view;
 * presumably it holds the value returned by F_GETFL.
 */
2993 int socket_set_non_blocking(int socket
)
2997 /* Set the pipe as non-blocking. */
2998 ret
= fcntl(socket
, F_GETFL
, 0);
3000 PERROR("fcntl get socket flags");
3005 ret
= fcntl(socket
, F_SETFL
, flags
| O_NONBLOCK
);
3007 PERROR("fcntl set O_NONBLOCK socket flag");
3010 DBG("Client socket (fd = %i) set as non-blocking", socket
);
3015 /* Client lock must be acquired by caller. */
/*
 * Put the client's inbound communication state back to "waiting for a
 * message header":
 *   - the inbound buffer is truncated to 0 bytes,
 *   - bytes_to_receive is set to the size of a
 *     struct lttng_notification_channel_message,
 *   - msg_type is set to LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN,
 *   - the uid and gid of the inbound credentials are set to -1,
 *   - the inbound buffer is resized to bytes_to_receive.
 *
 * The client lock must be held (checked with ASSERT_LOCKED()).
 */
3017 int client_reset_inbound_state(struct notification_client
*client
)
3021 ASSERT_LOCKED(client
->lock
);
3023 ret
= lttng_dynamic_buffer_set_size(
3024 &client
->communication
.inbound
.buffer
, 0);
3027 client
->communication
.inbound
.bytes_to_receive
=
3028 sizeof(struct lttng_notification_channel_message
);
3029 client
->communication
.inbound
.msg_type
=
3030 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN
;
3031 LTTNG_SOCK_SET_UID_CRED(&client
->communication
.inbound
.creds
, -1);
3032 LTTNG_SOCK_SET_GID_CRED(&client
->communication
.inbound
.creds
, -1);
3033 ret
= lttng_dynamic_buffer_set_size(
3034 &client
->communication
.inbound
.buffer
,
3035 client
->communication
.inbound
.bytes_to_receive
);
/*
 * Accept a new connection on the notification channel socket and create the
 * notification_client tracking it.
 *
 * Visible sequence: the client is allocated and initialized (lock, id taken
 * from state->next_notification_client_id, condition list, inbound and
 * outbound buffers, expect_creds), its inbound state is reset under its
 * lock, the connection is accepted, the new socket is made non-blocking and
 * configured with lttcomm_setsockopt_creds_unix_sock(), added to the poll
 * set of the thread, and the client is finally inserted in
 * state->client_socket_ht and state->client_id_ht.
 *
 * notification_client_destroy() is called at the end of the function,
 * presumably only on the error path (the label is not visible in this view).
 */
3039 int handle_notification_thread_client_connect(
3040 struct notification_thread_state
*state
)
3043 struct notification_client
*client
;
3045 DBG("[notification-thread] Handling new notification channel client connection");
3047 client
= zmalloc(sizeof(*client
));
3053 pthread_mutex_init(&client
->lock
, NULL
);
3054 client
->id
= state
->next_notification_client_id
++;
3055 CDS_INIT_LIST_HEAD(&client
->condition_list
);
3056 lttng_dynamic_buffer_init(&client
->communication
.inbound
.buffer
);
3057 lttng_dynamic_buffer_init(&client
->communication
.outbound
.buffer
);
3058 client
->communication
.inbound
.expect_creds
= true;
3060 pthread_mutex_lock(&client
->lock
);
3061 ret
= client_reset_inbound_state(client
);
3062 pthread_mutex_unlock(&client
->lock
);
3064 ERR("[notification-thread] Failed to reset client communication's inbound state");
3069 ret
= lttcomm_accept_unix_sock(state
->notification_channel_socket
);
3071 ERR("[notification-thread] Failed to accept new notification channel client connection");
3076 client
->socket
= ret
;
3078 ret
= socket_set_non_blocking(client
->socket
);
3080 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
3084 ret
= lttcomm_setsockopt_creds_unix_sock(client
->socket
);
3086 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
3091 ret
= lttng_poll_add(&state
->events
, client
->socket
,
3092 LPOLLIN
| LPOLLERR
|
3093 LPOLLHUP
| LPOLLRDHUP
);
3095 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
3099 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
/* The client becomes reachable through both its socket and its id. */
3103 cds_lfht_add(state
->client_socket_ht
,
3104 hash_client_socket(client
->socket
),
3105 &client
->client_socket_ht_node
);
3106 cds_lfht_add(state
->client_id_ht
,
3107 hash_client_id(client
->id
),
3108 &client
->client_id_ht_node
);
3113 notification_client_destroy(client
, state
);
3117 /* RCU read-lock must be held by the caller. */
3118 /* Client lock must be held by the caller */
/*
 * Tear down a client: its communication is flagged inactive, its socket is
 * removed from the poll set, the client is removed from
 * state->client_socket_ht and state->client_id_ht, every condition of its
 * condition_list is unsubscribed and the client is destroyed.
 *
 * NOTE(review): the inline comment below mentions acquiring the client lock,
 * but no lock is taken in the visible code; the callers visible in this file
 * lock client->lock around the call.
 */
3120 int notification_thread_client_disconnect(
3121 struct notification_client
*client
,
3122 struct notification_thread_state
*state
)
3125 struct lttng_condition_list_element
*condition_list_element
, *tmp
;
3127 /* Acquire the client lock to disable its communication atomically. */
3128 client
->communication
.active
= false;
3129 ret
= lttng_poll_del(&state
->events
, client
->socket
);
3131 ERR("[notification-thread] Failed to remove client socket %d from poll set",
3135 cds_lfht_del(state
->client_socket_ht
, &client
->client_socket_ht_node
);
3136 cds_lfht_del(state
->client_id_ht
, &client
->client_id_ht_node
);
3138 /* Release all conditions to which the client was subscribed. */
3139 cds_list_for_each_entry_safe(condition_list_element
, tmp
,
3140 &client
->condition_list
, node
) {
3141 (void) notification_thread_client_unsubscribe(client
,
3142 condition_list_element
->condition
, state
, NULL
);
3146 * Client no longer accessible to other threads (through the
3149 notification_client_destroy(client
, state
);
/*
 * Disconnect the client owning `client_socket`: the client is looked up with
 * get_client_from_socket() and notification_thread_client_disconnect() is
 * invoked on it with the client lock held. A failed lookup is logged as an
 * internal state corruption.
 *
 * NOTE(review): client->lock is unlocked after
 * notification_thread_client_disconnect(), which ends with
 * notification_client_destroy(). Confirm that the destroy defers the release
 * of the client (and of its lock) past this unlock.
 */
3153 int handle_notification_thread_client_disconnect(
3154 int client_socket
, struct notification_thread_state
*state
)
3157 struct notification_client
*client
;
3160 DBG("[notification-thread] Closing client connection (socket fd = %i)",
3162 client
= get_client_from_socket(client_socket
, state
);
3164 /* Internal state corruption, fatal error. */
3165 ERR("[notification-thread] Unable to find client (socket fd = %i)",
3171 pthread_mutex_lock(&client
->lock
);
3172 ret
= notification_thread_client_disconnect(client
, state
);
3173 pthread_mutex_unlock(&client
->lock
);
/*
 * Disconnect every client found in state->client_socket_ht, taking each
 * client's lock around notification_thread_client_disconnect().
 *
 * Returns 1 when error_encoutered was set during the iteration, 0 otherwise.
 */
3179 int handle_notification_thread_client_disconnect_all(
3180 struct notification_thread_state
*state
)
3182 struct cds_lfht_iter iter
;
3183 struct notification_client
*client
;
3184 bool error_encoutered
= false;
3187 DBG("[notification-thread] Closing all client connections");
3188 cds_lfht_for_each_entry(state
->client_socket_ht
, &iter
, client
,
3189 client_socket_ht_node
) {
3192 pthread_mutex_lock(&client
->lock
);
3193 ret
= notification_thread_client_disconnect(
3195 pthread_mutex_unlock(&client
->lock
);
3197 error_encoutered
= true;
3201 return error_encoutered
? 1 : 0;
/*
 * Unregister every trigger found in state->triggers_ht by invoking
 * handle_notification_thread_command_unregister_trigger() on each of them,
 * with a NULL reply pointer since no command reply is wanted.
 *
 * Returns -1 when error_occurred was set during the iteration, 0 otherwise.
 */
3204 int handle_notification_thread_trigger_unregister_all(
3205 struct notification_thread_state
*state
)
3207 bool error_occurred
= false;
3208 struct cds_lfht_iter iter
;
3209 struct lttng_trigger_ht_element
*trigger_ht_element
;
3212 cds_lfht_for_each_entry(state
->triggers_ht
, &iter
, trigger_ht_element
,
3214 int ret
= handle_notification_thread_command_unregister_trigger(
3215 state
, trigger_ht_element
->trigger
, NULL
);
3217 error_occurred
= true;
3221 return error_occurred
? -1 : 0;
/*
 * React to the outcome of an attempt to send data to a client. The client
 * lock must be held (checked with ASSERT_LOCKED()).
 *
 *   - CLIENT_TRANSMISSION_STATUS_COMPLETE: the socket's poll mask is set to
 *     CLIENT_POLL_MASK_IN and the queued_command_reply and
 *     dropped_notification flags are cleared,
 *   - CLIENT_TRANSMISSION_STATUS_QUEUED: the poll mask is set to
 *     CLIENT_POLL_MASK_IN_OUT so that the thread is woken up when the socket
 *     becomes writable,
 *   - CLIENT_TRANSMISSION_STATUS_FAIL: the client is disconnected,
 *   - CLIENT_TRANSMISSION_STATUS_ERROR: handling not visible in this view.
 */
3225 int client_handle_transmission_status(
3226 struct notification_client
*client
,
3227 enum client_transmission_status transmission_status
,
3228 struct notification_thread_state
*state
)
3232 ASSERT_LOCKED(client
->lock
);
3234 switch (transmission_status
) {
3235 case CLIENT_TRANSMISSION_STATUS_COMPLETE
:
3236 ret
= lttng_poll_mod(&state
->events
, client
->socket
,
3237 CLIENT_POLL_MASK_IN
);
3242 client
->communication
.outbound
.queued_command_reply
= false;
3243 client
->communication
.outbound
.dropped_notification
= false;
3245 case CLIENT_TRANSMISSION_STATUS_QUEUED
:
3247 * We want to be notified whenever there is buffer space
3248 * available to send the rest of the payload.
3250 ret
= lttng_poll_mod(&state
->events
, client
->socket
,
3251 CLIENT_POLL_MASK_IN_OUT
);
3256 case CLIENT_TRANSMISSION_STATUS_FAIL
:
3257 ret
= notification_thread_client_disconnect(client
, state
);
3262 case CLIENT_TRANSMISSION_STATUS_ERROR
:
3272 /* Client lock must be acquired by caller. */
3274 enum client_transmission_status
client_flush_outgoing_queue(
3275 struct notification_client
*client
)
3278 size_t to_send_count
;
3279 enum client_transmission_status status
;
3281 ASSERT_LOCKED(client
->lock
);
3283 if (!client
->communication
.active
) {
3284 status
= CLIENT_TRANSMISSION_STATUS_FAIL
;
3288 assert(client
->communication
.outbound
.buffer
.size
!= 0);
3289 to_send_count
= client
->communication
.outbound
.buffer
.size
;
3290 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
3293 ret
= lttcomm_send_unix_sock_non_block(client
->socket
,
3294 client
->communication
.outbound
.buffer
.data
,
3296 if ((ret
< 0 && (errno
== EAGAIN
|| errno
== EWOULDBLOCK
)) ||
3297 (ret
> 0 && ret
< to_send_count
)) {
3298 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
3300 to_send_count
-= max(ret
, 0);
3302 memcpy(client
->communication
.outbound
.buffer
.data
,
3303 client
->communication
.outbound
.buffer
.data
+
3304 client
->communication
.outbound
.buffer
.size
- to_send_count
,
3306 ret
= lttng_dynamic_buffer_set_size(
3307 &client
->communication
.outbound
.buffer
,
3312 status
= CLIENT_TRANSMISSION_STATUS_QUEUED
;
3313 } else if (ret
< 0) {
3314 /* Generic error, disconnect the client. */
3315 ERR("[notification-thread] Failed to flush outgoing queue, disconnecting client (socket fd = %i)",
3317 status
= CLIENT_TRANSMISSION_STATUS_FAIL
;
3319 /* No error and flushed the queue completely. */
3320 ret
= lttng_dynamic_buffer_set_size(
3321 &client
->communication
.outbound
.buffer
, 0);
3325 status
= CLIENT_TRANSMISSION_STATUS_COMPLETE
;
3330 return CLIENT_TRANSMISSION_STATUS_ERROR
;
3333 /* Client lock must be acquired by caller. */
/*
 * Queue a command reply carrying `status` for the client and try to send it.
 * The client lock must be held (checked with ASSERT_LOCKED()).
 *
 * A message header of type
 * LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY followed by the
 * reply is appended to the client's outbound buffer, the buffer is flushed
 * with client_flush_outgoing_queue() and the resulting status is handed to
 * client_handle_transmission_status(). When the outbound buffer is not empty
 * afterwards, queued_command_reply is set. A reply that is already queued
 * when the function is entered is treated as a protocol error.
 */
3335 int client_send_command_reply(struct notification_client
*client
,
3336 struct notification_thread_state
*state
,
3337 enum lttng_notification_channel_status status
)
3340 struct lttng_notification_channel_command_reply reply
= {
3341 .status
= (int8_t) status
,
3343 struct lttng_notification_channel_message msg
= {
3344 .type
= (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY
,
3345 .size
= sizeof(reply
),
3347 char buffer
[sizeof(msg
) + sizeof(reply
)];
3348 enum client_transmission_status transmission_status
;
3350 ASSERT_LOCKED(client
->lock
);
3352 if (client
->communication
.outbound
.queued_command_reply
) {
3353 /* Protocol error. */
/* Serialize the header immediately followed by the reply. */
3357 memcpy(buffer
, &msg
, sizeof(msg
));
3358 memcpy(buffer
+ sizeof(msg
), &reply
, sizeof(reply
));
3359 DBG("[notification-thread] Send command reply (%i)", (int) status
);
3361 /* Enqueue buffer to outgoing queue and flush it. */
3362 ret
= lttng_dynamic_buffer_append(
3363 &client
->communication
.outbound
.buffer
,
3364 buffer
, sizeof(buffer
));
3369 transmission_status
= client_flush_outgoing_queue(client
);
3370 ret
= client_handle_transmission_status(
3371 client
, transmission_status
, state
);
3376 if (client
->communication
.outbound
.buffer
.size
!= 0) {
3377 /* Queue could not be emptied. */
3378 client
->communication
.outbound
.queued_command_reply
= true;
/*
 * Handle a freshly received message header, with the client lock held for
 * the duration of the processing.
 *
 * The inbound buffer, which must hold exactly one
 * struct lttng_notification_channel_message, is interpreted as a header. A
 * payload size of 0 or larger than
 * DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE is reported as
 * invalid, as is a type other than SUBSCRIBE, UNSUBSCRIBE or HANDSHAKE.
 * Otherwise bytes_to_receive and msg_type are updated from the header and
 * the inbound buffer is resized to the announced payload size.
 */
3387 int client_handle_message_unknown(struct notification_client
*client
,
3388 struct notification_thread_state
*state
)
3392 pthread_mutex_lock(&client
->lock
);
3395 * Receiving message header. The function will be called again
3396 * once the rest of the message as been received and can be
3399 const struct lttng_notification_channel_message
*msg
;
3401 assert(sizeof(*msg
) == client
->communication
.inbound
.buffer
.size
);
3402 msg
= (const struct lttng_notification_channel_message
*)
3403 client
->communication
.inbound
.buffer
.data
;
3405 if (msg
->size
== 0 ||
3406 msg
->size
> DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE
) {
3407 ERR("[notification-thread] Invalid notification channel message: length = %u",
3413 switch (msg
->type
) {
3414 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE
:
3415 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE
:
3416 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
:
3420 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
3424 client
->communication
.inbound
.bytes_to_receive
= msg
->size
;
3425 client
->communication
.inbound
.msg_type
=
3426 (enum lttng_notification_channel_message_type
) msg
->type
;
3427 ret
= lttng_dynamic_buffer_set_size(
3428 &client
->communication
.inbound
.buffer
, msg
->size
);
3430 pthread_mutex_unlock(&client
->lock
);
/*
 * Handle the payload of a handshake message, with the client lock held for
 * the duration of the processing.
 *
 * Visible sequence:
 *   - a reply made of a HANDSHAKE message header followed by the protocol
 *     version of this daemon is serialized into send_buffer,
 *   - the major and minor versions announced by the client are recorded,
 *   - missing credentials are reported as an error; otherwise the uid and gid
 *     of the client are taken from the received credentials,
 *   - a major version different from
 *     LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR turns the status into
 *     LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION,
 *   - the reply is appended to the outbound buffer, the client is flagged as
 *     validated with active communication, and the outbound buffer is
 *     flushed,
 *   - a command reply carrying the status is sent,
 *   - the inbound state is reset to receive the next message header.
 */
3435 int client_handle_message_handshake(struct notification_client
*client
,
3436 struct notification_thread_state
*state
)
3439 struct lttng_notification_channel_command_handshake
*handshake_client
;
3440 const struct lttng_notification_channel_command_handshake handshake_reply
= {
3441 .major
= LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR
,
3442 .minor
= LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR
,
3444 const struct lttng_notification_channel_message msg_header
= {
3445 .type
= LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
,
3446 .size
= sizeof(handshake_reply
),
3448 enum lttng_notification_channel_status status
=
3449 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
3450 char send_buffer
[sizeof(msg_header
) + sizeof(handshake_reply
)];
3451 enum client_transmission_status transmission_status
;
3453 pthread_mutex_lock(&client
->lock
);
3455 memcpy(send_buffer
, &msg_header
, sizeof(msg_header
));
3456 memcpy(send_buffer
+ sizeof(msg_header
), &handshake_reply
,
3457 sizeof(handshake_reply
));
3460 (struct lttng_notification_channel_command_handshake
*)
3461 client
->communication
.inbound
.buffer
3463 client
->major
= handshake_client
->major
;
3464 client
->minor
= handshake_client
->minor
;
3465 if (!client
->communication
.inbound
.creds_received
) {
3466 ERR("[notification-thread] No credentials received from client");
3471 client
->uid
= LTTNG_SOCK_GET_UID_CRED(
3472 &client
->communication
.inbound
.creds
);
3473 client
->gid
= LTTNG_SOCK_GET_GID_CRED(
3474 &client
->communication
.inbound
.creds
);
3475 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
3476 client
->uid
, client
->gid
, (int) client
->major
,
3477 (int) client
->minor
);
3479 if (handshake_client
->major
!=
3480 LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR
) {
3481 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION
;
3484 ret
= lttng_dynamic_buffer_append(
3485 &client
->communication
.outbound
.buffer
, send_buffer
,
3486 sizeof(send_buffer
));
3488 ERR("[notification-thread] Failed to send protocol version to notification channel client");
3492 client
->validated
= true;
3493 client
->communication
.active
= true;
3495 transmission_status
= client_flush_outgoing_queue(client
);
3496 ret
= client_handle_transmission_status(
3497 client
, transmission_status
, state
);
3502 ret
= client_send_command_reply(client
, state
, status
);
3504 ERR("[notification-thread] Failed to send reply to notification channel client");
3508 /* Set reception state to receive the next message header. */
3509 ret
= client_reset_inbound_state(client
);
3511 ERR("[notification-thread] Failed to reset client communication's inbound state");
3516 pthread_mutex_unlock(&client
->lock
);
/*
 * Handle the payload of a SUBSCRIBE or UNSUBSCRIBE message.
 *
 * A condition is created from the client's inbound buffer; a size mismatch
 * between the consumed bytes and the buffer size is reported as a malformed
 * condition. Depending on `msg_type`, the client is then subscribed to or
 * unsubscribed from the condition. Finally, with the client lock held, a
 * command reply carrying the resulting status is sent and the inbound state
 * is reset to receive the next message header.
 *
 * NOTE(review): condition_view is built from the inbound buffer in its
 * initializer, before client->lock is taken for the first time - confirm
 * that the buffer cannot change concurrently at that point.
 */
3521 int client_handle_message_subscription(
3522 struct notification_client
*client
,
3523 enum lttng_notification_channel_message_type msg_type
,
3524 struct notification_thread_state
*state
)
3527 struct lttng_condition
*condition
;
3528 enum lttng_notification_channel_status status
=
3529 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
3530 const struct lttng_buffer_view condition_view
=
3531 lttng_buffer_view_from_dynamic_buffer(
3532 &client
->communication
.inbound
.buffer
,
3534 size_t expected_condition_size
;
3536 pthread_mutex_lock(&client
->lock
);
3537 expected_condition_size
= client
->communication
.inbound
.buffer
.size
;
3538 pthread_mutex_unlock(&client
->lock
);
3540 ret
= lttng_condition_create_from_buffer(&condition_view
, &condition
);
3541 if (ret
!= expected_condition_size
) {
3542 ERR("[notification-thread] Malformed condition received from client");
3546 if (msg_type
== LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE
) {
3547 ret
= notification_thread_client_subscribe(
3548 client
, condition
, state
, &status
);
3550 ret
= notification_thread_client_unsubscribe(
3551 client
, condition
, state
, &status
);
3557 pthread_mutex_lock(&client
->lock
);
3558 ret
= client_send_command_reply(client
, state
, status
);
3560 ERR("[notification-thread] Failed to send reply to notification channel client");
3564 /* Set reception state to receive the next message header. */
3565 ret
= client_reset_inbound_state(client
);
3567 ERR("[notification-thread] Failed to reset client communication's inbound state");
3572 pthread_mutex_unlock(&client
->lock
);
/*
 * Dispatch a client's received message to the handler matching its type
 * (client->communication.inbound.msg_type).
 *
 * A warning is logged when a message that is neither a HANDSHAKE nor of
 * UNKNOWN type is received from a client that has not been validated yet.
 *
 * UNKNOWN messages go to client_handle_message_unknown(), HANDSHAKE messages
 * to client_handle_message_handshake(), and both SUBSCRIBE and UNSUBSCRIBE
 * messages to client_handle_message_subscription().
 */
3578 int client_dispatch_message(struct notification_client
*client
,
3579 struct notification_thread_state
*state
)
/* Any command other than the handshake requires a validated client. */
3583 if (client
->communication
.inbound
.msg_type
!=
3584 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
&&
3585 client
->communication
.inbound
.msg_type
!=
3586 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN
&&
3587 !client
->validated
) {
3588 WARN("[notification-thread] client attempted a command before handshake");
3593 switch (client
->communication
.inbound
.msg_type
) {
3594 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN
:
3596 ret
= client_handle_message_unknown(client
, state
);
3599 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
:
3601 ret
= client_handle_message_handshake(client
, state
);
3604 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE
:
3605 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE
:
3607 ret
= client_handle_message_subscription(client
,
3608 client
->communication
.inbound
.msg_type
, state
);
/*
 * Receive pending data from the client associated with `socket`.
 *
 * The client is looked up from the socket. With client->lock held, data is
 * received into the inbound buffer at
 * offset = inbound.buffer.size - inbound.bytes_to_receive.
 * When the client's credentials are still expected, the reception goes
 * through lttcomm_recv_creds_unix_sock(), which also fills inbound.creds;
 * otherwise lttcomm_recv_unix_sock_non_block() is used.
 *
 * A non-negative reception count is subtracted from bytes_to_receive; the
 * message is considered complete once bytes_to_receive reaches 0, at which
 * point it is handed to client_dispatch_message().
 *
 * The error_disconnect_client path disconnects the client while holding its
 * lock.
 */
3618 /* Incoming data from client. */
3619 int handle_notification_thread_client_in(
3620 struct notification_thread_state
*state
, int socket
)
3623 struct notification_client
*client
;
3626 bool message_is_complete
= false;
3628 client
= get_client_from_socket(socket
, state
);
3630 /* Internal error, abort. */
3635 pthread_mutex_lock(&client
->lock
);
/* Resume writing where the previous (partial) reception stopped. */
3636 offset
= client
->communication
.inbound
.buffer
.size
-
3637 client
->communication
.inbound
.bytes_to_receive
;
3638 if (client
->communication
.inbound
.expect_creds
) {
3639 recv_ret
= lttcomm_recv_creds_unix_sock(socket
,
3640 client
->communication
.inbound
.buffer
.data
+ offset
,
3641 client
->communication
.inbound
.bytes_to_receive
,
3642 &client
->communication
.inbound
.creds
);
3644 client
->communication
.inbound
.expect_creds
= false;
3645 client
->communication
.inbound
.creds_received
= true;
3648 recv_ret
= lttcomm_recv_unix_sock_non_block(socket
,
3649 client
->communication
.inbound
.buffer
.data
+ offset
,
3650 client
->communication
.inbound
.bytes_to_receive
);
3652 if (recv_ret
>= 0) {
3653 client
->communication
.inbound
.bytes_to_receive
-= recv_ret
;
3654 message_is_complete
= client
->communication
.inbound
3655 .bytes_to_receive
== 0;
3657 pthread_mutex_unlock(&client
->lock
);
3659 goto error_disconnect_client
;
3662 if (message_is_complete
) {
3663 ret
= client_dispatch_message(client
, state
);
3666 * Only returns an error if this client must be
3669 goto error_disconnect_client
;
3674 error_disconnect_client
:
3675 pthread_mutex_lock(&client
->lock
);
3676 ret
= notification_thread_client_disconnect(client
, state
);
3677 pthread_mutex_unlock(&client
->lock
);
/*
 * Called when the client associated with `socket` can accept more outgoing
 * data.
 *
 * The client is looked up from the socket; then, with client->lock held, its
 * outgoing queue is flushed and the resulting transmission status is passed
 * to client_handle_transmission_status().
 */
3681 /* Client ready to receive outgoing data. */
3682 int handle_notification_thread_client_out(
3683 struct notification_thread_state
*state
, int socket
)
3686 struct notification_client
*client
;
3687 enum client_transmission_status transmission_status
;
3689 client
= get_client_from_socket(socket
, state
);
3691 /* Internal error, abort. */
3696 pthread_mutex_lock(&client
->lock
);
3697 transmission_status
= client_flush_outgoing_queue(client
);
3698 ret
= client_handle_transmission_status(
3699 client
, transmission_status
, state
);
3700 pthread_mutex_unlock(&client
->lock
);
/*
 * Evaluate a buffer usage condition (low or high) against a channel sample.
 *
 * The threshold, in bytes, is taken directly from the condition when
 * threshold_bytes is set; otherwise it is computed by multiplying the
 * condition's threshold ratio by `buffer_capacity`.
 *
 * For a LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW condition, the sample's highest
 * usage is tested against `<= threshold`; for any other condition type, it
 * is tested against `>= threshold`. `result` is initialized to false.
 */
3709 bool evaluate_buffer_usage_condition(const struct lttng_condition
*condition
,
3710 const struct channel_state_sample
*sample
,
3711 uint64_t buffer_capacity
)
3713 bool result
= false;
3715 enum lttng_condition_type condition_type
;
3716 const struct lttng_condition_buffer_usage
*use_condition
= container_of(
3717 condition
, struct lttng_condition_buffer_usage
,
3720 if (use_condition
->threshold_bytes
.set
) {
3721 threshold
= use_condition
->threshold_bytes
.value
;
3724 * Threshold was expressed as a ratio.
3726 * TODO the threshold (in bytes) of conditions expressed
3727 * as a ratio of total buffer size could be cached to
3728 * forego this double-multiplication or it could be performed
3729 * as fixed-point math.
3731 * Note that caching should accomodate the case where the
3732 * condition applies to multiple channels (i.e. don't assume
3733 * that all channels matching my_chann* have the same size...)
3735 threshold
= (uint64_t) (use_condition
->threshold_ratio
.value
*
3736 (double) buffer_capacity
);
3739 condition_type
= lttng_condition_get_type(condition
);
3740 if (condition_type
== LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
) {
3741 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64
", highest usage = %" PRIu64
,
3742 threshold
, sample
->highest_usage
);
3745 * The low condition should only be triggered once _all_ of the
3746 * streams in a channel have gone below the "low" threshold.
3748 if (sample
->highest_usage
<= threshold
) {
3752 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64
", highest usage = %" PRIu64
,
3753 threshold
, sample
->highest_usage
);
3756 * For high buffer usage scenarios, we want to trigger whenever
3757 * _any_ of the streams has reached the "high" threshold.
3759 if (sample
->highest_usage
>= threshold
) {
/*
 * Evaluate a session consumed size condition.
 *
 * Returns true when `session_consumed_size` is greater than or equal to the
 * condition's consumed_threshold_bytes value.
 */
3768 bool evaluate_session_consumed_size_condition(
3769 const struct lttng_condition
*condition
,
3770 uint64_t session_consumed_size
)
3773 const struct lttng_condition_session_consumed_size
*size_condition
=
3774 container_of(condition
,
3775 struct lttng_condition_session_consumed_size
,
3778 threshold
= size_condition
->consumed_threshold_bytes
.value
;
3779 DBG("[notification-thread] Session consumed size condition being evaluated: threshold = %" PRIu64
", current size = %" PRIu64
,
3780 threshold
, session_consumed_size
);
3781 return session_consumed_size
>= threshold
;
/*
 * Evaluate a buffer usage or session consumed size condition against the
 * previous (when available) and latest samples.
 *
 * Buffer usage conditions are evaluated with
 * evaluate_buffer_usage_condition() using the channel's capacity; session
 * consumed size conditions are evaluated with
 * evaluate_session_consumed_size_condition() using the session's consumed
 * totals. `previous_sample_result` stays false when no previous sample is
 * available.
 *
 * The logic is edge-triggered: the case where the latest result is false, or
 * where it equals the previous result, is tested for before an evaluation
 * object is created. When reached, the creation step stores in `*evaluation`
 * a buffer usage evaluation (latest highest usage and channel capacity) or a
 * session consumed size evaluation (latest session consumed total),
 * according to the condition type.
 */
3785 int evaluate_buffer_condition(const struct lttng_condition
*condition
,
3786 struct lttng_evaluation
**evaluation
,
3787 const struct notification_thread_state
*state
,
3788 const struct channel_state_sample
*previous_sample
,
3789 const struct channel_state_sample
*latest_sample
,
3790 uint64_t previous_session_consumed_total
,
3791 uint64_t latest_session_consumed_total
,
3792 struct channel_info
*channel_info
)
3795 enum lttng_condition_type condition_type
;
3796 const bool previous_sample_available
= !!previous_sample
;
3797 bool previous_sample_result
= false;
3798 bool latest_sample_result
;
3800 condition_type
= lttng_condition_get_type(condition
);
3802 switch (condition_type
) {
3803 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
3804 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
3805 if (caa_likely(previous_sample_available
)) {
3806 previous_sample_result
=
3807 evaluate_buffer_usage_condition(condition
,
3808 previous_sample
, channel_info
->capacity
);
3810 latest_sample_result
= evaluate_buffer_usage_condition(
3811 condition
, latest_sample
,
3812 channel_info
->capacity
);
3814 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
:
3815 if (caa_likely(previous_sample_available
)) {
3816 previous_sample_result
=
3817 evaluate_session_consumed_size_condition(
3819 previous_session_consumed_total
);
3821 latest_sample_result
=
3822 evaluate_session_consumed_size_condition(
3824 latest_session_consumed_total
);
3827 /* Unknown condition type; internal error. */
3831 if (!latest_sample_result
||
3832 (previous_sample_result
== latest_sample_result
)) {
3834 * Only trigger on a condition evaluation transition.
3836 * NOTE: This edge-triggered logic may not be appropriate for
3837 * future condition types.
3842 if (!evaluation
|| !latest_sample_result
) {
3846 switch (condition_type
) {
3847 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
3848 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
3849 *evaluation
= lttng_evaluation_buffer_usage_create(
3851 latest_sample
->highest_usage
,
3852 channel_info
->capacity
);
3854 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
:
3855 *evaluation
= lttng_evaluation_session_consumed_size_create(
3856 latest_session_consumed_total
);
/*
 * Record that a notification addressed to `client` was dropped.
 *
 * The caller must hold client->lock (checked by ASSERT_LOCKED).
 *
 * If the client's outbound state is already flagged as having a dropped
 * notification, that case is tested for first since those messages are
 * coalesced. Otherwise, the flag is set and a
 * LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED message is
 * appended to the client's outbound buffer.
 */
3871 int client_notification_overflow(struct notification_client
*client
)
3874 const struct lttng_notification_channel_message msg
= {
3875 .type
= (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED
,
3878 ASSERT_LOCKED(client
->lock
);
3880 DBG("Dropping notification addressed to client (socket fd = %i)",
3882 if (client
->communication
.outbound
.dropped_notification
) {
3884 * The client already has a "notification dropped" message
3885 * in its outgoing queue. Nothing to do since all
3886 * of those messages are coalesced.
3891 client
->communication
.outbound
.dropped_notification
= true;
3892 ret
= lttng_dynamic_buffer_append(
3893 &client
->communication
.outbound
.buffer
, &msg
,
3896 PERROR("Failed to enqueue \"dropped notification\" message in client's (socket fd = %i) outgoing queue",
/*
 * Adapter around client_handle_transmission_status(): forwards `client` and
 * `status`, and passes `user_data` cast to a
 * struct notification_thread_state pointer as the state argument.
 */
3903 static int client_handle_transmission_status_wrapper(
3904 struct notification_client
*client
,
3905 enum client_transmission_status status
,
3908 return client_handle_transmission_status(client
, status
,
3909 (struct notification_thread_state
*) user_data
);
/*
 * Send the evaluation of a trigger's condition to the clients of
 * `client_list`.
 *
 * Thin wrapper over notification_client_list_send_evaluation(): the
 * condition and credentials are taken from `trigger`, the source object's
 * credentials are built from `object_uid` and `object_gid`, and
 * client_handle_transmission_status_wrapper() is used as the transmission
 * report callback with `state` as its user data.
 */
3913 int send_evaluation_to_clients(const struct lttng_trigger
*trigger
,
3914 const struct lttng_evaluation
*evaluation
,
3915 struct notification_client_list
* client_list
,
3916 struct notification_thread_state
*state
,
3917 uid_t object_uid
, gid_t object_gid
)
3919 return notification_client_list_send_evaluation(client_list
,
3920 lttng_trigger_get_const_condition(trigger
), evaluation
,
3921 lttng_trigger_get_credentials(trigger
),
3922 &(struct lttng_credentials
){
3923 .uid
= object_uid
, .gid
= object_gid
},
3924 client_handle_transmission_status_wrapper
, state
);
/*
 * Serialize a notification (condition + evaluation) once and send it to the
 * clients of `client_list`.
 *
 * The message is built in a local dynamic buffer: a message header of type
 * LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION followed by the
 * serialized notification; the header's size field is then updated with the
 * payload size. The buffer is reset before returning.
 *
 * With client_list->lock held, each client is visited with its own lock
 * held:
 *   - when `source_object_creds` is provided, the client's uid/gid are
 *     checked against it;
 *   - the client's uid/gid are checked against `trigger_creds`;
 *   - if the client already has buffered outgoing data, the notification is
 *     dropped for that client through client_notification_overflow();
 *   - otherwise the message is appended to the client's outbound buffer.
 * The client's outgoing queue is then flushed and the transmission status is
 * reported through `client_report` along with `user_data`.
 */
3928 int notification_client_list_send_evaluation(
3929 struct notification_client_list
*client_list
,
3930 const struct lttng_condition
*condition
,
3931 const struct lttng_evaluation
*evaluation
,
3932 const struct lttng_credentials
*trigger_creds
,
3933 const struct lttng_credentials
*source_object_creds
,
3934 report_client_transmission_result_cb client_report
,
3938 struct lttng_dynamic_buffer msg_buffer
;
3939 struct notification_client_list_element
*client_list_element
, *tmp
;
3940 const struct lttng_notification notification
= {
3941 .condition
= (struct lttng_condition
*) condition
,
3942 .evaluation
= (struct lttng_evaluation
*) evaluation
,
3944 struct lttng_notification_channel_message msg_header
= {
3945 .type
= (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION
,
3948 lttng_dynamic_buffer_init(&msg_buffer
);
3950 ret
= lttng_dynamic_buffer_append(&msg_buffer
, &msg_header
,
3951 sizeof(msg_header
));
3956 ret
= lttng_notification_serialize(&notification
, &msg_buffer
);
3958 ERR("[notification-thread] Failed to serialize notification");
3963 /* Update payload size. */
3964 ((struct lttng_notification_channel_message
* ) msg_buffer
.data
)->size
=
3965 (uint32_t) (msg_buffer
.size
- sizeof(msg_header
));
3967 pthread_mutex_lock(&client_list
->lock
);
3968 cds_list_for_each_entry_safe(client_list_element
, tmp
,
3969 &client_list
->list
, node
) {
3970 enum client_transmission_status transmission_status
;
3971 struct notification_client
*client
=
3972 client_list_element
->client
;
3975 pthread_mutex_lock(&client
->lock
);
3976 if (source_object_creds
) {
3977 if (client
->uid
!= source_object_creds
->uid
&&
3978 client
->gid
!= source_object_creds
->gid
&&
3981 * Client is not allowed to monitor this
3984 DBG("[notification-thread] Skipping client at it does not have the object permission to receive notification for this trigger");
3989 /* TODO: what is the behavior for root client on non root
3990 * trigger? Since multiple triggers (different user) can have the same condition
3991 * but with different action group that can have each a notify.
3992 * Does the root client receive multiple notification for all
3993 * those triggers with the same condition or only notification
3994 * for triggers the root user configured?
3995 * For now we do the latter. All users including the root user
3996 * can only receive notification from trigger it registered.
3998 if (client
->uid
!= trigger_creds
->uid
&& client
->gid
!= trigger_creds
->gid
) {
3999 DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this trigger");
4003 DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
4004 client
->socket
, msg_buffer
.size
);
4005 if (client
->communication
.outbound
.buffer
.size
) {
4007 * Outgoing data is already buffered for this client;
4008 * drop the notification and enqueue a "dropped
4009 * notification" message if this is the first dropped
4010 * notification since the socket spilled-over to the
4013 ret
= client_notification_overflow(client
);
4019 ret
= lttng_dynamic_buffer_append_buffer(
4020 &client
->communication
.outbound
.buffer
,
4026 transmission_status
= client_flush_outgoing_queue(client
);
4027 ret
= client_report(client
, transmission_status
, user_data
);
4032 pthread_mutex_unlock(&client
->lock
);
4034 goto end_unlock_list
;
4040 pthread_mutex_unlock(&client_list
->lock
);
4042 lttng_dynamic_buffer_reset(&msg_buffer
);
/*
 * Handle an event notification read from an event source pipe.
 *
 * Depending on `domain`, the message read from the pipe is either a
 * struct lttng_ust_trigger_notification (UST) or a 64-bit value (kernel);
 * the notification id (token) is extracted from it accordingly.
 *
 * The token is looked up in state->trigger_tokens_ht to find the associated
 * trigger. The trigger's readiness to fire is checked; then the client list
 * matching the trigger's condition is retrieved and the trigger is enqueued
 * to the action executor along with that list.
 *
 * When the executor reports an overflow, every client of the list is warned
 * that a notification was dropped: with client_list->lock held, and each
 * client's lock held in turn, a "dropped notification" message is enqueued,
 * the client's outgoing queue is flushed and the transmission status is
 * handled. The list lock is released once the iteration is over.
 *
 * The reference on the client list is released with
 * notification_client_list_put().
 */
4046 int handle_notification_thread_event(struct notification_thread_state
*state
,
4048 enum lttng_domain_type domain
)
4051 struct lttng_ust_trigger_notification ust_notification
;
4052 uint64_t kernel_notification
;
4053 struct cds_lfht_node
*node
;
4054 struct cds_lfht_iter iter
;
4055 struct notification_trigger_tokens_ht_element
*element
;
4056 struct lttng_trigger_notification notification
;
4057 void *reception_buffer
;
4058 size_t reception_size
;
4059 enum action_executor_status executor_status
;
4060 struct notification_client_list
*client_list
= NULL
;
4062 notification
.type
= domain
;
4065 case LTTNG_DOMAIN_UST
:
4066 reception_buffer
= (void *) &ust_notification
;
4067 reception_size
= sizeof(ust_notification
);
4068 notification
.u
.ust
= &ust_notification
;
4070 case LTTNG_DOMAIN_KERNEL
:
4071 reception_buffer
= (void *) &kernel_notification
;
4072 reception_size
= sizeof(kernel_notification
);
4073 notification
.u
.kernel
= &kernel_notification
;
4080 * The monitoring pipe only holds messages smaller than PIPE_BUF,
4081 * ensuring that read/write of sampling messages are atomic.
4083 /* TODO: should we read as much as we can ? EWOULDBLOCK? */
4085 ret
= lttng_read(pipe
, reception_buffer
, reception_size
);
4086 if (ret
!= reception_size
) {
4087 ERR("[notification-thread] Failed to read from event source pipe (fd = %i)",
4089 /* TODO: Should this error out completely.
4090 * This can happen when an app is killed as of today
4091 * ret = -1 causes the whole thread to die and break
4098 case LTTNG_DOMAIN_UST
:
4099 notification
.id
= ust_notification
.id
;
4101 case LTTNG_DOMAIN_KERNEL
:
4102 notification
.id
= kernel_notification
;
4108 /* Find triggers associated with this token. */
4110 cds_lfht_lookup(state
->trigger_tokens_ht
,
4111 hash_key_u64(&notification
.id
, lttng_ht_seed
), match_trigger_token
,
4112 &notification
.id
, &iter
);
4113 node
= cds_lfht_iter_get_node(&iter
);
4114 if (caa_unlikely(!node
)) {
4115 /* TODO: is this an error? This might happen if the receive side
4116 * is slow to process event from source and that the trigger was
4117 * removed but the app still kicking. This yield another
4118 * question on the trigger lifetime and when we can remove a
4119 * trigger. How to guarantee that all event with the token idea
4120 * have be processed? Do we want to provide this guarantee?
4122 * Update: I have encountered this when using a trigger on
4123 * sched_switch and then removing it. The frequency is quite
4124 * high hence we end up exactly in the mentioned scenario.
4125 * AFAIK this might be the best way to handle this.
4130 element
= caa_container_of(node
,
4131 struct notification_trigger_tokens_ht_element
,
4134 if (!lttng_trigger_is_ready_to_fire(element
->trigger
)) {
4139 client_list
= get_client_list_from_condition(state
,
4140 lttng_trigger_get_const_condition(element
->trigger
));
4141 executor_status
= action_executor_enqueue(
4142 state
->executor
, element
->trigger
, client_list
);
4143 switch (executor_status
) {
4144 case ACTION_EXECUTOR_STATUS_OK
:
4147 case ACTION_EXECUTOR_STATUS_OVERFLOW
:
4149 struct notification_client_list_element
*client_list_element
,
4153 * Not a fatal error; this is expected and simply means the
4154 * executor has too much work queued already.
4162 /* Warn clients that a notification (or more) was dropped. */
4163 pthread_mutex_lock(&client_list
->lock
);
4164 cds_list_for_each_entry_safe(client_list_element
, tmp
,
4165 &client_list
->list
, node
) {
4166 enum client_transmission_status transmission_status
;
4167 struct notification_client
*client
=
4168 client_list_element
->client
;
4170 pthread_mutex_lock(&client
->lock
);
4171 ret
= client_notification_overflow(client
);
4177 transmission_status
=
4178 client_flush_outgoing_queue(client
);
4179 ret
= client_handle_transmission_status(
4180 client
, transmission_status
, state
);
4186 pthread_mutex_unlock(&client
->lock
);
/* Release the list lock taken before iterating over the clients. */
4191 pthread_mutex_unlock(&client_list
->lock
);
4194 case ACTION_EXECUTOR_STATUS_ERROR
:
4195 /* Fatal error, shut down everything. */
4196 ERR("Fatal error encoutered while enqueuing action");
4200 /* Unhandled error. */
4205 notification_client_list_put(client_list
);
4211 int handle_notification_thread_channel_sample(
4212 struct notification_thread_state
*state
, int pipe
,
4213 enum lttng_domain_type domain
)
4216 struct lttcomm_consumer_channel_monitor_msg sample_msg
;
4217 struct channel_info
*channel_info
;
4218 struct cds_lfht_node
*node
;
4219 struct cds_lfht_iter iter
;
4220 struct lttng_channel_trigger_list
*trigger_list
;
4221 struct lttng_trigger_list_element
*trigger_list_element
;
4222 bool previous_sample_available
= false;
4223 struct channel_state_sample previous_sample
, latest_sample
;
4224 uint64_t previous_session_consumed_total
, latest_session_consumed_total
;
4227 * The monitoring pipe only holds messages smaller than PIPE_BUF,
4228 * ensuring that read/write of sampling messages are atomic.
4230 ret
= lttng_read(pipe
, &sample_msg
, sizeof(sample_msg
));
4231 if (ret
!= sizeof(sample_msg
)) {
4232 ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
4239 latest_sample
.key
.key
= sample_msg
.key
;
4240 latest_sample
.key
.domain
= domain
;
4241 latest_sample
.highest_usage
= sample_msg
.highest
;
4242 latest_sample
.lowest_usage
= sample_msg
.lowest
;
4243 latest_sample
.channel_total_consumed
= sample_msg
.total_consumed
;
4247 /* Retrieve the channel's informations */
4248 cds_lfht_lookup(state
->channels_ht
,
4249 hash_channel_key(&latest_sample
.key
),
4253 node
= cds_lfht_iter_get_node(&iter
);
4254 if (caa_unlikely(!node
)) {
4256 * Not an error since the consumer can push a sample to the pipe
4257 * and the rest of the session daemon could notify us of the
4258 * channel's destruction before we get a chance to process that
4261 DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64
" in %s domain",
4262 latest_sample
.key
.key
,
4263 domain
== LTTNG_DOMAIN_KERNEL
? "kernel" :
4267 channel_info
= caa_container_of(node
, struct channel_info
,
4269 DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64
") in session %s (highest usage = %" PRIu64
", lowest usage = %" PRIu64
", total consumed = %" PRIu64
")",
4271 latest_sample
.key
.key
,
4272 channel_info
->session_info
->name
,
4273 latest_sample
.highest_usage
,
4274 latest_sample
.lowest_usage
,
4275 latest_sample
.channel_total_consumed
);
4277 previous_session_consumed_total
=
4278 channel_info
->session_info
->consumed_data_size
;
4280 /* Retrieve the channel's last sample, if it exists, and update it. */
4281 cds_lfht_lookup(state
->channel_state_ht
,
4282 hash_channel_key(&latest_sample
.key
),
4283 match_channel_state_sample
,
4286 node
= cds_lfht_iter_get_node(&iter
);
4287 if (caa_likely(node
)) {
4288 struct channel_state_sample
*stored_sample
;
4290 /* Update the sample stored. */
4291 stored_sample
= caa_container_of(node
,
4292 struct channel_state_sample
,
4293 channel_state_ht_node
);
4295 memcpy(&previous_sample
, stored_sample
,
4296 sizeof(previous_sample
));
4297 stored_sample
->highest_usage
= latest_sample
.highest_usage
;
4298 stored_sample
->lowest_usage
= latest_sample
.lowest_usage
;
4299 stored_sample
->channel_total_consumed
= latest_sample
.channel_total_consumed
;
4300 previous_sample_available
= true;
4302 latest_session_consumed_total
=
4303 previous_session_consumed_total
+
4304 (latest_sample
.channel_total_consumed
- previous_sample
.channel_total_consumed
);
4307 * This is the channel's first sample, allocate space for and
4308 * store the new sample.
4310 struct channel_state_sample
*stored_sample
;
4312 stored_sample
= zmalloc(sizeof(*stored_sample
));
4313 if (!stored_sample
) {
4318 memcpy(stored_sample
, &latest_sample
, sizeof(*stored_sample
));
4319 cds_lfht_node_init(&stored_sample
->channel_state_ht_node
);
4320 cds_lfht_add(state
->channel_state_ht
,
4321 hash_channel_key(&stored_sample
->key
),
4322 &stored_sample
->channel_state_ht_node
);
4324 latest_session_consumed_total
=
4325 previous_session_consumed_total
+
4326 latest_sample
.channel_total_consumed
;
4329 channel_info
->session_info
->consumed_data_size
=
4330 latest_session_consumed_total
;
4332 /* Find triggers associated with this channel. */
4333 cds_lfht_lookup(state
->channel_triggers_ht
,
4334 hash_channel_key(&latest_sample
.key
),
4335 match_channel_trigger_list
,
4338 node
= cds_lfht_iter_get_node(&iter
);
4339 if (caa_likely(!node
)) {
4343 trigger_list
= caa_container_of(node
, struct lttng_channel_trigger_list
,
4344 channel_triggers_ht_node
);
4345 cds_list_for_each_entry(trigger_list_element
, &trigger_list
->list
,
4347 const struct lttng_condition
*condition
;
4348 const struct lttng_action
*action
;
4349 struct lttng_trigger
*trigger
;
4350 struct notification_client_list
*client_list
= NULL
;
4351 struct lttng_evaluation
*evaluation
= NULL
;
4352 bool client_list_is_empty
;
4355 trigger
= trigger_list_element
->trigger
;
4356 condition
= lttng_trigger_get_const_condition(trigger
);
4358 action
= lttng_trigger_get_const_action(trigger
);
4360 if (!lttng_trigger_is_ready_to_fire(trigger
)) {
4364 /* Notify actions are the only type currently supported. */
4365 /* TODO support other type of action */
4366 assert(lttng_action_get_type_const(action
) ==
4367 LTTNG_ACTION_TYPE_NOTIFY
);
4370 * Check if any client is subscribed to the result of this
4373 client_list
= get_client_list_from_condition(state
, condition
);
4374 assert(client_list
);
4375 client_list_is_empty
= cds_list_empty(&client_list
->list
);
4376 if (client_list_is_empty
) {
4378 * No clients interested in the evaluation's result,
4384 ret
= evaluate_buffer_condition(condition
, &evaluation
, state
,
4385 previous_sample_available
? &previous_sample
: NULL
,
4387 previous_session_consumed_total
,
4388 latest_session_consumed_total
,
4390 if (caa_unlikely(ret
)) {
4394 if (caa_likely(!evaluation
)) {
4398 /* Dispatch evaluation result to all clients. */
4399 ret
= send_evaluation_to_clients(trigger_list_element
->trigger
,
4400 evaluation
, client_list
, state
,
4401 channel_info
->session_info
->uid
,
4402 channel_info
->session_info
->gid
);
4403 lttng_evaluation_destroy(evaluation
);
4405 notification_client_list_put(client_list
);
4406 if (caa_unlikely(ret
)) {