2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
4 * SPDX-License-Identifier: GPL-2.0-only
10 #include <urcu/rculfhash.h>
12 #include <common/defaults.h>
13 #include <common/error.h>
14 #include <common/futex.h>
15 #include <common/unix.h>
16 #include <common/dynamic-buffer.h>
17 #include <common/hashtable/utils.h>
18 #include <common/sessiond-comm/sessiond-comm.h>
19 #include <common/macros.h>
20 #include <lttng/condition/condition.h>
21 #include <lttng/action/action-internal.h>
22 #include <lttng/action/group-internal.h>
23 #include <lttng/notification/notification-internal.h>
24 #include <lttng/condition/condition-internal.h>
25 #include <lttng/condition/buffer-usage-internal.h>
26 #include <lttng/condition/session-consumed-size-internal.h>
27 #include <lttng/condition/session-rotation-internal.h>
28 #include <lttng/condition/event-rule-internal.h>
29 #include <lttng/notification/channel-internal.h>
30 #include <lttng/trigger/trigger-internal.h>
38 #include "notification-thread.h"
39 #include "notification-thread-events.h"
40 #include "notification-thread-commands.h"
41 #include "lttng-sessiond.h"
44 #define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
45 #define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
/*
 * Kind of object a condition is bound to, as reported by
 * get_condition_binding_object().
 */
enum lttng_object_type {
	/* The condition's type is not recognized. */
	LTTNG_OBJECT_TYPE_UNKNOWN = 0,
	/* The condition is not bound to any object. */
	LTTNG_OBJECT_TYPE_NONE = 1,
	/* The condition is bound to a channel. */
	LTTNG_OBJECT_TYPE_CHANNEL = 2,
	/* The condition is bound to a session. */
	LTTNG_OBJECT_TYPE_SESSION = 3,
};
54 struct lttng_trigger_list_element
{
55 /* No ownership of the trigger object is assumed. */
56 struct lttng_trigger
*trigger
;
57 struct cds_list_head node
;
60 struct lttng_channel_trigger_list
{
61 struct channel_key channel_key
;
62 /* List of struct lttng_trigger_list_element. */
63 struct cds_list_head list
;
64 /* Node in the channel_triggers_ht */
65 struct cds_lfht_node channel_triggers_ht_node
;
66 /* call_rcu delayed reclaim. */
67 struct rcu_head rcu_node
;
71 * List of triggers applying to a given session.
74 * - lttng_session_trigger_list_create()
75 * - lttng_session_trigger_list_build()
76 * - lttng_session_trigger_list_destroy()
77 * - lttng_session_trigger_list_add()
79 struct lttng_session_trigger_list
{
81 * Not owned by this; points to the session_info structure's
84 const char *session_name
;
85 /* List of struct lttng_trigger_list_element. */
86 struct cds_list_head list
;
87 /* Node in the session_triggers_ht */
88 struct cds_lfht_node session_triggers_ht_node
;
90 * Weak reference to the notification system's session triggers
93 * The session trigger list structure structure is owned by
94 * the session's session_info.
96 * The session_info is kept alive the the channel_infos holding a
97 * reference to it (reference counting). When those channels are
98 * destroyed (at runtime or on teardown), the reference they hold
99 * to the session_info are released. On destruction of session_info,
100 * session_info_destroy() will remove the list of triggers applying
101 * to this session from the notification system's state.
103 * This implies that the session_triggers_ht must be destroyed
104 * after the channels.
106 struct cds_lfht
*session_triggers_ht
;
107 /* Used for delayed RCU reclaim. */
108 struct rcu_head rcu_node
;
111 struct lttng_trigger_ht_element
{
112 struct lttng_trigger
*trigger
;
113 struct cds_lfht_node node
;
114 struct cds_lfht_node node_by_name
;
115 /* call_rcu delayed reclaim. */
116 struct rcu_head rcu_node
;
119 struct lttng_condition_list_element
{
120 struct lttng_condition
*condition
;
121 struct cds_list_head node
;
125 * Facilities to carry the different notifications type in the action processing
128 struct lttng_trigger_notification
{
130 struct lttng_ust_trigger_notification
*ust
;
134 enum lttng_domain_type type
;
137 struct channel_state_sample
{
138 struct channel_key key
;
139 struct cds_lfht_node channel_state_ht_node
;
140 uint64_t highest_usage
;
141 uint64_t lowest_usage
;
142 uint64_t channel_total_consumed
;
143 /* call_rcu delayed reclaim. */
144 struct rcu_head rcu_node
;
147 static unsigned long hash_channel_key(struct channel_key
*key
);
148 static int evaluate_buffer_condition(const struct lttng_condition
*condition
,
149 struct lttng_evaluation
**evaluation
,
150 const struct notification_thread_state
*state
,
151 const struct channel_state_sample
*previous_sample
,
152 const struct channel_state_sample
*latest_sample
,
153 uint64_t previous_session_consumed_total
,
154 uint64_t latest_session_consumed_total
,
155 struct channel_info
*channel_info
);
157 int send_evaluation_to_clients(const struct lttng_trigger
*trigger
,
158 const struct lttng_evaluation
*evaluation
,
159 struct notification_client_list
*client_list
,
160 struct notification_thread_state
*state
,
161 uid_t channel_uid
, gid_t channel_gid
);
164 /* session_info API */
166 void session_info_destroy(void *_data
);
168 void session_info_get(struct session_info
*session_info
);
170 void session_info_put(struct session_info
*session_info
);
172 struct session_info
*session_info_create(const char *name
,
173 uid_t uid
, gid_t gid
,
174 struct lttng_session_trigger_list
*trigger_list
,
175 struct cds_lfht
*sessions_ht
);
177 void session_info_add_channel(struct session_info
*session_info
,
178 struct channel_info
*channel_info
);
180 void session_info_remove_channel(struct session_info
*session_info
,
181 struct channel_info
*channel_info
);
183 /* lttng_session_trigger_list API */
185 struct lttng_session_trigger_list
*lttng_session_trigger_list_create(
186 const char *session_name
,
187 struct cds_lfht
*session_triggers_ht
);
189 struct lttng_session_trigger_list
*lttng_session_trigger_list_build(
190 const struct notification_thread_state
*state
,
191 const char *session_name
);
193 void lttng_session_trigger_list_destroy(
194 struct lttng_session_trigger_list
*list
);
196 int lttng_session_trigger_list_add(struct lttng_session_trigger_list
*list
,
197 struct lttng_trigger
*trigger
);
201 int match_client_socket(struct cds_lfht_node
*node
, const void *key
)
203 /* This double-cast is intended to supress pointer-to-cast warning. */
204 const int socket
= (int) (intptr_t) key
;
205 const struct notification_client
*client
= caa_container_of(node
,
206 struct notification_client
, client_socket_ht_node
);
208 return client
->socket
== socket
;
212 int match_client_id(struct cds_lfht_node
*node
, const void *key
)
214 /* This double-cast is intended to supress pointer-to-cast warning. */
215 const notification_client_id id
= *((notification_client_id
*) key
);
216 const struct notification_client
*client
= caa_container_of(
217 node
, struct notification_client
, client_id_ht_node
);
219 return client
->id
== id
;
223 int match_channel_trigger_list(struct cds_lfht_node
*node
, const void *key
)
225 struct channel_key
*channel_key
= (struct channel_key
*) key
;
226 struct lttng_channel_trigger_list
*trigger_list
;
228 trigger_list
= caa_container_of(node
, struct lttng_channel_trigger_list
,
229 channel_triggers_ht_node
);
231 return !!((channel_key
->key
== trigger_list
->channel_key
.key
) &&
232 (channel_key
->domain
== trigger_list
->channel_key
.domain
));
236 int match_session_trigger_list(struct cds_lfht_node
*node
, const void *key
)
238 const char *session_name
= (const char *) key
;
239 struct lttng_session_trigger_list
*trigger_list
;
241 trigger_list
= caa_container_of(node
, struct lttng_session_trigger_list
,
242 session_triggers_ht_node
);
244 return !!(strcmp(trigger_list
->session_name
, session_name
) == 0);
248 int match_channel_state_sample(struct cds_lfht_node
*node
, const void *key
)
250 struct channel_key
*channel_key
= (struct channel_key
*) key
;
251 struct channel_state_sample
*sample
;
253 sample
= caa_container_of(node
, struct channel_state_sample
,
254 channel_state_ht_node
);
256 return !!((channel_key
->key
== sample
->key
.key
) &&
257 (channel_key
->domain
== sample
->key
.domain
));
261 int match_channel_info(struct cds_lfht_node
*node
, const void *key
)
263 struct channel_key
*channel_key
= (struct channel_key
*) key
;
264 struct channel_info
*channel_info
;
266 channel_info
= caa_container_of(node
, struct channel_info
,
269 return !!((channel_key
->key
== channel_info
->key
.key
) &&
270 (channel_key
->domain
== channel_info
->key
.domain
));
274 int match_trigger(struct cds_lfht_node
*node
, const void *key
)
277 struct lttng_trigger
*trigger_key
= (struct lttng_trigger
*) key
;
278 struct lttng_trigger_ht_element
*trigger_ht_element
;
279 const struct lttng_credentials
*creds_key
;
280 const struct lttng_credentials
*creds_node
;
282 trigger_ht_element
= caa_container_of(node
, struct lttng_trigger_ht_element
,
285 match
= lttng_trigger_is_equal(trigger_key
, trigger_ht_element
->trigger
);
290 /* Validate credential */
291 /* TODO: this could be moved to lttng_trigger_equal depending on how we
292 * handle root behaviour on disable and listing.
294 creds_key
= lttng_trigger_get_credentials(trigger_key
);
295 creds_node
= lttng_trigger_get_credentials(trigger_ht_element
->trigger
);
296 match
= lttng_credentials_is_equal(creds_key
, creds_node
);
302 int match_trigger_token(struct cds_lfht_node
*node
, const void *key
)
304 const uint64_t *_key
= key
;
305 struct notification_trigger_tokens_ht_element
*element
;
307 element
= caa_container_of(node
, struct notification_trigger_tokens_ht_element
,
309 return *_key
== element
->token
;
313 int match_client_list_condition(struct cds_lfht_node
*node
, const void *key
)
315 struct lttng_condition
*condition_key
= (struct lttng_condition
*) key
;
316 struct notification_client_list
*client_list
;
317 const struct lttng_condition
*condition
;
319 assert(condition_key
);
321 client_list
= caa_container_of(node
, struct notification_client_list
,
322 notification_trigger_clients_ht_node
);
323 condition
= lttng_trigger_get_const_condition(client_list
->trigger
);
325 return !!lttng_condition_is_equal(condition_key
, condition
);
329 int match_session(struct cds_lfht_node
*node
, const void *key
)
331 const char *name
= key
;
332 struct session_info
*session_info
= caa_container_of(
333 node
, struct session_info
, sessions_ht_node
);
335 return !strcmp(session_info
->name
, name
);
339 * Match function for string node.
341 static int match_str(struct cds_lfht_node
*node
, const void *key
)
343 struct lttng_trigger_ht_element
*trigger_ht_element
;
346 trigger_ht_element
= caa_container_of(node
, struct lttng_trigger_ht_element
,
349 /* TODO error checking */
350 lttng_trigger_get_name(trigger_ht_element
->trigger
, &name
);
352 return hash_match_key_str(name
, (void *) key
);
356 unsigned long lttng_condition_buffer_usage_hash(
357 const struct lttng_condition
*_condition
)
360 unsigned long condition_type
;
361 struct lttng_condition_buffer_usage
*condition
;
363 condition
= container_of(_condition
,
364 struct lttng_condition_buffer_usage
, parent
);
366 condition_type
= (unsigned long) condition
->parent
.type
;
367 hash
= hash_key_ulong((void *) condition_type
, lttng_ht_seed
);
368 if (condition
->session_name
) {
369 hash
^= hash_key_str(condition
->session_name
, lttng_ht_seed
);
371 if (condition
->channel_name
) {
372 hash
^= hash_key_str(condition
->channel_name
, lttng_ht_seed
);
374 if (condition
->domain
.set
) {
375 hash
^= hash_key_ulong(
376 (void *) condition
->domain
.type
,
379 if (condition
->threshold_ratio
.set
) {
382 val
= condition
->threshold_ratio
.value
* (double) UINT32_MAX
;
383 hash
^= hash_key_u64(&val
, lttng_ht_seed
);
384 } else if (condition
->threshold_bytes
.set
) {
387 val
= condition
->threshold_bytes
.value
;
388 hash
^= hash_key_u64(&val
, lttng_ht_seed
);
394 unsigned long lttng_condition_session_consumed_size_hash(
395 const struct lttng_condition
*_condition
)
398 unsigned long condition_type
=
399 (unsigned long) LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
;
400 struct lttng_condition_session_consumed_size
*condition
;
403 condition
= container_of(_condition
,
404 struct lttng_condition_session_consumed_size
, parent
);
406 hash
= hash_key_ulong((void *) condition_type
, lttng_ht_seed
);
407 if (condition
->session_name
) {
408 hash
^= hash_key_str(condition
->session_name
, lttng_ht_seed
);
410 val
= condition
->consumed_threshold_bytes
.value
;
411 hash
^= hash_key_u64(&val
, lttng_ht_seed
);
416 unsigned long lttng_condition_session_rotation_hash(
417 const struct lttng_condition
*_condition
)
419 unsigned long hash
, condition_type
;
420 struct lttng_condition_session_rotation
*condition
;
422 condition
= container_of(_condition
,
423 struct lttng_condition_session_rotation
, parent
);
424 condition_type
= (unsigned long) condition
->parent
.type
;
425 hash
= hash_key_ulong((void *) condition_type
, lttng_ht_seed
);
426 assert(condition
->session_name
);
427 hash
^= hash_key_str(condition
->session_name
, lttng_ht_seed
);
432 unsigned long lttng_condition_event_rule_hash(
433 const struct lttng_condition
*_condition
)
435 unsigned long hash
, condition_type
;
436 struct lttng_condition_event_rule
*condition
;
438 condition
= container_of(_condition
,
439 struct lttng_condition_event_rule
, parent
);
440 condition_type
= (unsigned long) condition
->parent
.type
;
441 hash
= hash_key_ulong((void *) condition_type
, lttng_ht_seed
);
443 /* TODO: further hasg using the event rule? on pattern maybe?*/
448 * The lttng_condition hashing code is kept in this file (rather than
449 * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
450 * don't want to link in liblttng-ctl.
453 unsigned long lttng_condition_hash(const struct lttng_condition
*condition
)
455 switch (condition
->type
) {
456 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
457 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
458 return lttng_condition_buffer_usage_hash(condition
);
459 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
:
460 return lttng_condition_session_consumed_size_hash(condition
);
461 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
:
462 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED
:
463 return lttng_condition_session_rotation_hash(condition
);
464 case LTTNG_CONDITION_TYPE_EVENT_RULE_HIT
:
465 return lttng_condition_event_rule_hash(condition
);
467 ERR("[notification-thread] Unexpected condition type caught");
473 unsigned long hash_channel_key(struct channel_key
*key
)
475 unsigned long key_hash
= hash_key_u64(&key
->key
, lttng_ht_seed
);
476 unsigned long domain_hash
= hash_key_ulong(
477 (void *) (unsigned long) key
->domain
, lttng_ht_seed
);
479 return key_hash
^ domain_hash
;
483 unsigned long hash_client_socket(int socket
)
485 return hash_key_ulong((void *) (unsigned long) socket
, lttng_ht_seed
);
489 unsigned long hash_client_id(notification_client_id id
)
491 return hash_key_u64(&id
, lttng_ht_seed
);
495 * Get the type of object to which a given condition applies. Bindings let
496 * the notification system evaluate a trigger's condition when a given
497 * object's state is updated.
499 * For instance, a condition bound to a channel will be evaluated everytime
500 * the channel's state is changed by a channel monitoring sample.
503 enum lttng_object_type
get_condition_binding_object(
504 const struct lttng_condition
*condition
)
506 switch (lttng_condition_get_type(condition
)) {
507 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
508 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
509 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
:
510 return LTTNG_OBJECT_TYPE_CHANNEL
;
511 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
:
512 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED
:
513 return LTTNG_OBJECT_TYPE_SESSION
;
514 case LTTNG_CONDITION_TYPE_EVENT_RULE_HIT
:
515 return LTTNG_OBJECT_TYPE_NONE
;
517 return LTTNG_OBJECT_TYPE_UNKNOWN
;
522 void free_channel_info_rcu(struct rcu_head
*node
)
524 free(caa_container_of(node
, struct channel_info
, rcu_node
));
528 void channel_info_destroy(struct channel_info
*channel_info
)
534 if (channel_info
->session_info
) {
535 session_info_remove_channel(channel_info
->session_info
,
537 session_info_put(channel_info
->session_info
);
539 if (channel_info
->name
) {
540 free(channel_info
->name
);
542 call_rcu(&channel_info
->rcu_node
, free_channel_info_rcu
);
546 void free_session_info_rcu(struct rcu_head
*node
)
548 free(caa_container_of(node
, struct session_info
, rcu_node
));
551 /* Don't call directly, use the ref-counting mechanism. */
553 void session_info_destroy(void *_data
)
555 struct session_info
*session_info
= _data
;
558 assert(session_info
);
559 if (session_info
->channel_infos_ht
) {
560 ret
= cds_lfht_destroy(session_info
->channel_infos_ht
, NULL
);
562 ERR("[notification-thread] Failed to destroy channel information hash table");
565 lttng_session_trigger_list_destroy(session_info
->trigger_list
);
568 cds_lfht_del(session_info
->sessions_ht
,
569 &session_info
->sessions_ht_node
);
571 free(session_info
->name
);
572 call_rcu(&session_info
->rcu_node
, free_session_info_rcu
);
576 void session_info_get(struct session_info
*session_info
)
581 lttng_ref_get(&session_info
->ref
);
585 void session_info_put(struct session_info
*session_info
)
590 lttng_ref_put(&session_info
->ref
);
594 struct session_info
*session_info_create(const char *name
, uid_t uid
, gid_t gid
,
595 struct lttng_session_trigger_list
*trigger_list
,
596 struct cds_lfht
*sessions_ht
)
598 struct session_info
*session_info
;
602 session_info
= zmalloc(sizeof(*session_info
));
606 lttng_ref_init(&session_info
->ref
, session_info_destroy
);
608 session_info
->channel_infos_ht
= cds_lfht_new(DEFAULT_HT_SIZE
,
609 1, 0, CDS_LFHT_AUTO_RESIZE
| CDS_LFHT_ACCOUNTING
, NULL
);
610 if (!session_info
->channel_infos_ht
) {
614 cds_lfht_node_init(&session_info
->sessions_ht_node
);
615 session_info
->name
= strdup(name
);
616 if (!session_info
->name
) {
619 session_info
->uid
= uid
;
620 session_info
->gid
= gid
;
621 session_info
->trigger_list
= trigger_list
;
622 session_info
->sessions_ht
= sessions_ht
;
626 session_info_put(session_info
);
631 void session_info_add_channel(struct session_info
*session_info
,
632 struct channel_info
*channel_info
)
635 cds_lfht_add(session_info
->channel_infos_ht
,
636 hash_channel_key(&channel_info
->key
),
637 &channel_info
->session_info_channels_ht_node
);
642 void session_info_remove_channel(struct session_info
*session_info
,
643 struct channel_info
*channel_info
)
646 cds_lfht_del(session_info
->channel_infos_ht
,
647 &channel_info
->session_info_channels_ht_node
);
652 struct channel_info
*channel_info_create(const char *channel_name
,
653 struct channel_key
*channel_key
, uint64_t channel_capacity
,
654 struct session_info
*session_info
)
656 struct channel_info
*channel_info
= zmalloc(sizeof(*channel_info
));
662 cds_lfht_node_init(&channel_info
->channels_ht_node
);
663 cds_lfht_node_init(&channel_info
->session_info_channels_ht_node
);
664 memcpy(&channel_info
->key
, channel_key
, sizeof(*channel_key
));
665 channel_info
->capacity
= channel_capacity
;
667 channel_info
->name
= strdup(channel_name
);
668 if (!channel_info
->name
) {
673 * Set the references between session and channel infos:
674 * - channel_info holds a strong reference to session_info
675 * - session_info holds a weak reference to channel_info
677 session_info_get(session_info
);
678 session_info_add_channel(session_info
, channel_info
);
679 channel_info
->session_info
= session_info
;
683 channel_info_destroy(channel_info
);
688 bool notification_client_list_get(struct notification_client_list
*list
)
690 return urcu_ref_get_unless_zero(&list
->ref
);
694 void free_notification_client_list_rcu(struct rcu_head
*node
)
696 free(caa_container_of(node
, struct notification_client_list
,
701 void notification_client_list_release(struct urcu_ref
*list_ref
)
703 struct notification_client_list
*list
=
704 container_of(list_ref
, typeof(*list
), ref
);
705 struct notification_client_list_element
*client_list_element
, *tmp
;
707 if (list
->notification_trigger_clients_ht
) {
709 cds_lfht_del(list
->notification_trigger_clients_ht
,
710 &list
->notification_trigger_clients_ht_node
);
712 list
->notification_trigger_clients_ht
= NULL
;
714 cds_list_for_each_entry_safe(client_list_element
, tmp
,
716 free(client_list_element
);
718 pthread_mutex_destroy(&list
->lock
);
719 call_rcu(&list
->rcu_node
, free_notification_client_list_rcu
);
723 struct notification_client_list
*notification_client_list_create(
724 const struct lttng_trigger
*trigger
)
726 struct notification_client_list
*client_list
=
727 zmalloc(sizeof(*client_list
));
732 pthread_mutex_init(&client_list
->lock
, NULL
);
733 urcu_ref_init(&client_list
->ref
);
734 cds_lfht_node_init(&client_list
->notification_trigger_clients_ht_node
);
735 CDS_INIT_LIST_HEAD(&client_list
->list
);
736 client_list
->trigger
= trigger
;
742 void publish_notification_client_list(
743 struct notification_thread_state
*state
,
744 struct notification_client_list
*list
)
746 const struct lttng_condition
*condition
=
747 lttng_trigger_get_const_condition(list
->trigger
);
749 assert(!list
->notification_trigger_clients_ht
);
751 list
->notification_trigger_clients_ht
=
752 state
->notification_trigger_clients_ht
;
755 cds_lfht_add(state
->notification_trigger_clients_ht
,
756 lttng_condition_hash(condition
),
757 &list
->notification_trigger_clients_ht_node
);
762 void notification_client_list_put(struct notification_client_list
*list
)
767 return urcu_ref_put(&list
->ref
, notification_client_list_release
);
770 /* Provides a reference to the returned list. */
772 struct notification_client_list
*get_client_list_from_condition(
773 struct notification_thread_state
*state
,
774 const struct lttng_condition
*condition
)
776 struct cds_lfht_node
*node
;
777 struct cds_lfht_iter iter
;
778 struct notification_client_list
*list
= NULL
;
781 cds_lfht_lookup(state
->notification_trigger_clients_ht
,
782 lttng_condition_hash(condition
),
783 match_client_list_condition
,
786 node
= cds_lfht_iter_get_node(&iter
);
788 list
= container_of(node
, struct notification_client_list
,
789 notification_trigger_clients_ht_node
);
790 list
= notification_client_list_get(list
) ? list
: NULL
;
797 int evaluate_channel_condition_for_client(
798 const struct lttng_condition
*condition
,
799 struct notification_thread_state
*state
,
800 struct lttng_evaluation
**evaluation
,
801 uid_t
*session_uid
, gid_t
*session_gid
)
804 struct cds_lfht_iter iter
;
805 struct cds_lfht_node
*node
;
806 struct channel_info
*channel_info
= NULL
;
807 struct channel_key
*channel_key
= NULL
;
808 struct channel_state_sample
*last_sample
= NULL
;
809 struct lttng_channel_trigger_list
*channel_trigger_list
= NULL
;
813 /* Find the channel associated with the condition. */
814 cds_lfht_for_each_entry(state
->channel_triggers_ht
, &iter
,
815 channel_trigger_list
, channel_triggers_ht_node
) {
816 struct lttng_trigger_list_element
*element
;
818 cds_list_for_each_entry(element
, &channel_trigger_list
->list
, node
) {
819 const struct lttng_condition
*current_condition
=
820 lttng_trigger_get_const_condition(
823 assert(current_condition
);
824 if (!lttng_condition_is_equal(condition
,
825 current_condition
)) {
829 /* Found the trigger, save the channel key. */
830 channel_key
= &channel_trigger_list
->channel_key
;
834 /* The channel key was found stop iteration. */
840 /* No channel found; normal exit. */
841 DBG("[notification-thread] No known channel associated with newly subscribed-to condition");
846 /* Fetch channel info for the matching channel. */
847 cds_lfht_lookup(state
->channels_ht
,
848 hash_channel_key(channel_key
),
852 node
= cds_lfht_iter_get_node(&iter
);
854 channel_info
= caa_container_of(node
, struct channel_info
,
857 /* Retrieve the channel's last sample, if it exists. */
858 cds_lfht_lookup(state
->channel_state_ht
,
859 hash_channel_key(channel_key
),
860 match_channel_state_sample
,
863 node
= cds_lfht_iter_get_node(&iter
);
865 last_sample
= caa_container_of(node
,
866 struct channel_state_sample
,
867 channel_state_ht_node
);
869 /* Nothing to evaluate, no sample was ever taken. Normal exit */
870 DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
875 ret
= evaluate_buffer_condition(condition
, evaluation
, state
,
877 0, channel_info
->session_info
->consumed_data_size
,
880 WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
884 *session_uid
= channel_info
->session_info
->uid
;
885 *session_gid
= channel_info
->session_info
->gid
;
892 const char *get_condition_session_name(const struct lttng_condition
*condition
)
894 const char *session_name
= NULL
;
895 enum lttng_condition_status status
;
897 switch (lttng_condition_get_type(condition
)) {
898 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
899 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
900 status
= lttng_condition_buffer_usage_get_session_name(
901 condition
, &session_name
);
903 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
:
904 status
= lttng_condition_session_consumed_size_get_session_name(
905 condition
, &session_name
);
907 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
:
908 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED
:
909 status
= lttng_condition_session_rotation_get_session_name(
910 condition
, &session_name
);
915 if (status
!= LTTNG_CONDITION_STATUS_OK
) {
916 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
924 int evaluate_session_condition_for_client(
925 const struct lttng_condition
*condition
,
926 struct notification_thread_state
*state
,
927 struct lttng_evaluation
**evaluation
,
928 uid_t
*session_uid
, gid_t
*session_gid
)
931 struct cds_lfht_iter iter
;
932 struct cds_lfht_node
*node
;
933 const char *session_name
;
934 struct session_info
*session_info
= NULL
;
937 session_name
= get_condition_session_name(condition
);
939 /* Find the session associated with the trigger. */
940 cds_lfht_lookup(state
->sessions_ht
,
941 hash_key_str(session_name
, lttng_ht_seed
),
945 node
= cds_lfht_iter_get_node(&iter
);
947 DBG("[notification-thread] No known session matching name \"%s\"",
953 session_info
= caa_container_of(node
, struct session_info
,
955 session_info_get(session_info
);
958 * Evaluation is performed in-line here since only one type of
959 * session-bound condition is handled for the moment.
961 switch (lttng_condition_get_type(condition
)) {
962 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
:
963 if (!session_info
->rotation
.ongoing
) {
965 goto end_session_put
;
968 *evaluation
= lttng_evaluation_session_rotation_ongoing_create(
969 session_info
->rotation
.id
);
972 ERR("[notification-thread] Failed to create session rotation ongoing evaluation for session \"%s\"",
975 goto end_session_put
;
981 goto end_session_put
;
984 *session_uid
= session_info
->uid
;
985 *session_gid
= session_info
->gid
;
988 session_info_put(session_info
);
995 int evaluate_condition_for_client(const struct lttng_trigger
*trigger
,
996 const struct lttng_condition
*condition
,
997 struct notification_client
*client
,
998 struct notification_thread_state
*state
)
1001 struct lttng_evaluation
*evaluation
= NULL
;
1002 struct notification_client_list client_list
= {
1003 .lock
= PTHREAD_MUTEX_INITIALIZER
,
1005 struct notification_client_list_element client_list_element
= { 0 };
1006 uid_t object_uid
= 0;
1007 gid_t object_gid
= 0;
1014 switch (get_condition_binding_object(condition
)) {
1015 case LTTNG_OBJECT_TYPE_SESSION
:
1016 ret
= evaluate_session_condition_for_client(condition
, state
,
1017 &evaluation
, &object_uid
, &object_gid
);
1019 case LTTNG_OBJECT_TYPE_CHANNEL
:
1020 ret
= evaluate_channel_condition_for_client(condition
, state
,
1021 &evaluation
, &object_uid
, &object_gid
);
1023 case LTTNG_OBJECT_TYPE_NONE
:
1024 DBG("[notification-thread] Newly subscribed-to condition not binded to object, nothing to evaluate");
1027 case LTTNG_OBJECT_TYPE_UNKNOWN
:
1037 /* Evaluation yielded nothing. Normal exit. */
1038 DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
1044 * Create a temporary client list with the client currently
1047 cds_lfht_node_init(&client_list
.notification_trigger_clients_ht_node
);
1048 CDS_INIT_LIST_HEAD(&client_list
.list
);
1049 client_list
.trigger
= trigger
;
1051 CDS_INIT_LIST_HEAD(&client_list_element
.node
);
1052 client_list_element
.client
= client
;
1053 cds_list_add(&client_list_element
.node
, &client_list
.list
);
1055 /* Send evaluation result to the newly-subscribed client. */
1056 DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
1057 ret
= send_evaluation_to_clients(trigger
, evaluation
, &client_list
,
1058 state
, object_uid
, object_gid
);
1065 int notification_thread_client_subscribe(struct notification_client
*client
,
1066 struct lttng_condition
*condition
,
1067 struct notification_thread_state
*state
,
1068 enum lttng_notification_channel_status
*_status
)
1071 struct notification_client_list
*client_list
= NULL
;
1072 struct lttng_condition_list_element
*condition_list_element
= NULL
;
1073 struct notification_client_list_element
*client_list_element
= NULL
;
1074 enum lttng_notification_channel_status status
=
1075 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
1078 * Ensure that the client has not already subscribed to this condition
1081 cds_list_for_each_entry(condition_list_element
, &client
->condition_list
, node
) {
1082 if (lttng_condition_is_equal(condition_list_element
->condition
,
1084 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED
;
1089 condition_list_element
= zmalloc(sizeof(*condition_list_element
));
1090 if (!condition_list_element
) {
1094 client_list_element
= zmalloc(sizeof(*client_list_element
));
1095 if (!client_list_element
) {
1101 * Add the newly-subscribed condition to the client's subscription list.
1103 CDS_INIT_LIST_HEAD(&condition_list_element
->node
);
1104 condition_list_element
->condition
= condition
;
1105 cds_list_add(&condition_list_element
->node
, &client
->condition_list
);
1107 client_list
= get_client_list_from_condition(state
, condition
);
1110 * No notification-emitting trigger registered with this
1111 * condition. We don't evaluate the condition right away
1112 * since this trigger is not registered yet.
1114 free(client_list_element
);
1119 * The condition to which the client just subscribed is evaluated
1120 * at this point so that conditions that are already TRUE result
1121 * in a notification being sent out.
1123 * The client_list's trigger is used without locking the list itself.
1124 * This is correct since the list doesn't own the trigger and the
1125 * object is immutable.
1127 if (evaluate_condition_for_client(client_list
->trigger
, condition
,
1129 WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
1131 free(client_list_element
);
1136 * Add the client to the list of clients interested in a given trigger
1137 * if a "notification" trigger with a corresponding condition was
1140 client_list_element
->client
= client
;
1141 CDS_INIT_LIST_HEAD(&client_list_element
->node
);
1143 pthread_mutex_lock(&client_list
->lock
);
1144 cds_list_add(&client_list_element
->node
, &client_list
->list
);
1145 pthread_mutex_unlock(&client_list
->lock
);
1151 notification_client_list_put(client_list
);
1155 free(condition_list_element
);
1156 free(client_list_element
);
1161 int notification_thread_client_unsubscribe(
1162 struct notification_client
*client
,
1163 struct lttng_condition
*condition
,
1164 struct notification_thread_state
*state
,
1165 enum lttng_notification_channel_status
*_status
)
1167 struct notification_client_list
*client_list
;
1168 struct lttng_condition_list_element
*condition_list_element
,
1170 struct notification_client_list_element
*client_list_element
,
1172 bool condition_found
= false;
1173 enum lttng_notification_channel_status status
=
1174 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
1176 /* Remove the condition from the client's condition list. */
1177 cds_list_for_each_entry_safe(condition_list_element
, condition_tmp
,
1178 &client
->condition_list
, node
) {
1179 if (!lttng_condition_is_equal(condition_list_element
->condition
,
1184 cds_list_del(&condition_list_element
->node
);
1186 * The caller may be iterating on the client's conditions to
1187 * tear down a client's connection. In this case, the condition
1188 * will be destroyed at the end.
1190 if (condition
!= condition_list_element
->condition
) {
1191 lttng_condition_destroy(
1192 condition_list_element
->condition
);
1194 free(condition_list_element
);
1195 condition_found
= true;
1199 if (!condition_found
) {
1200 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION
;
1205 * Remove the client from the list of clients interested the trigger
1206 * matching the condition.
1208 client_list
= get_client_list_from_condition(state
, condition
);
1213 pthread_mutex_lock(&client_list
->lock
);
1214 cds_list_for_each_entry_safe(client_list_element
, client_tmp
,
1215 &client_list
->list
, node
) {
1216 if (client_list_element
->client
->id
!= client
->id
) {
1219 cds_list_del(&client_list_element
->node
);
1220 free(client_list_element
);
1223 pthread_mutex_unlock(&client_list
->lock
);
1224 notification_client_list_put(client_list
);
1227 lttng_condition_destroy(condition
);
1235 void free_notification_client_rcu(struct rcu_head
*node
)
1237 free(caa_container_of(node
, struct notification_client
, rcu_node
));
1241 void notification_client_destroy(struct notification_client
*client
,
1242 struct notification_thread_state
*state
)
1249 * The client object is not reachable by other threads, no need to lock
1252 if (client
->socket
>= 0) {
1253 (void) lttcomm_close_unix_sock(client
->socket
);
1254 client
->socket
= -1;
1256 client
->communication
.active
= false;
1257 lttng_dynamic_buffer_reset(&client
->communication
.inbound
.buffer
);
1258 lttng_dynamic_buffer_reset(&client
->communication
.outbound
.buffer
);
1259 pthread_mutex_destroy(&client
->lock
);
1260 call_rcu(&client
->rcu_node
, free_notification_client_rcu
);
1264 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1268 struct notification_client
*get_client_from_socket(int socket
,
1269 struct notification_thread_state
*state
)
1271 struct cds_lfht_iter iter
;
1272 struct cds_lfht_node
*node
;
1273 struct notification_client
*client
= NULL
;
1275 cds_lfht_lookup(state
->client_socket_ht
,
1276 hash_client_socket(socket
),
1277 match_client_socket
,
1278 (void *) (unsigned long) socket
,
1280 node
= cds_lfht_iter_get_node(&iter
);
1285 client
= caa_container_of(node
, struct notification_client
,
1286 client_socket_ht_node
);
1292 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1296 struct notification_client
*get_client_from_id(notification_client_id id
,
1297 struct notification_thread_state
*state
)
1299 struct cds_lfht_iter iter
;
1300 struct cds_lfht_node
*node
;
1301 struct notification_client
*client
= NULL
;
1303 cds_lfht_lookup(state
->client_id_ht
,
1308 node
= cds_lfht_iter_get_node(&iter
);
1313 client
= caa_container_of(node
, struct notification_client
,
1320 bool buffer_usage_condition_applies_to_channel(
1321 const struct lttng_condition
*condition
,
1322 const struct channel_info
*channel_info
)
1324 enum lttng_condition_status status
;
1325 enum lttng_domain_type condition_domain
;
1326 const char *condition_session_name
= NULL
;
1327 const char *condition_channel_name
= NULL
;
1329 status
= lttng_condition_buffer_usage_get_domain_type(condition
,
1331 assert(status
== LTTNG_CONDITION_STATUS_OK
);
1332 if (channel_info
->key
.domain
!= condition_domain
) {
1336 status
= lttng_condition_buffer_usage_get_session_name(
1337 condition
, &condition_session_name
);
1338 assert((status
== LTTNG_CONDITION_STATUS_OK
) && condition_session_name
);
1340 status
= lttng_condition_buffer_usage_get_channel_name(
1341 condition
, &condition_channel_name
);
1342 assert((status
== LTTNG_CONDITION_STATUS_OK
) && condition_channel_name
);
1344 if (strcmp(channel_info
->session_info
->name
, condition_session_name
)) {
1347 if (strcmp(channel_info
->name
, condition_channel_name
)) {
1357 bool session_consumed_size_condition_applies_to_channel(
1358 const struct lttng_condition
*condition
,
1359 const struct channel_info
*channel_info
)
1361 enum lttng_condition_status status
;
1362 const char *condition_session_name
= NULL
;
1364 status
= lttng_condition_session_consumed_size_get_session_name(
1365 condition
, &condition_session_name
);
1366 assert((status
== LTTNG_CONDITION_STATUS_OK
) && condition_session_name
);
1368 if (strcmp(channel_info
->session_info
->name
, condition_session_name
)) {
1378 bool trigger_applies_to_channel(const struct lttng_trigger
*trigger
,
1379 const struct channel_info
*channel_info
)
1381 const struct lttng_condition
*condition
;
1382 bool trigger_applies
;
1384 condition
= lttng_trigger_get_const_condition(trigger
);
1389 switch (lttng_condition_get_type(condition
)) {
1390 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
1391 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
1392 trigger_applies
= buffer_usage_condition_applies_to_channel(
1393 condition
, channel_info
);
1395 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
:
1396 trigger_applies
= session_consumed_size_condition_applies_to_channel(
1397 condition
, channel_info
);
1403 return trigger_applies
;
1409 bool trigger_applies_to_client(struct lttng_trigger
*trigger
,
1410 struct notification_client
*client
)
1412 bool applies
= false;
1413 struct lttng_condition_list_element
*condition_list_element
;
1415 cds_list_for_each_entry(condition_list_element
, &client
->condition_list
,
1417 applies
= lttng_condition_is_equal(
1418 condition_list_element
->condition
,
1419 lttng_trigger_get_condition(trigger
));
1427 /* Must be called with RCU read lock held. */
1429 struct lttng_session_trigger_list
*get_session_trigger_list(
1430 struct notification_thread_state
*state
,
1431 const char *session_name
)
1433 struct lttng_session_trigger_list
*list
= NULL
;
1434 struct cds_lfht_node
*node
;
1435 struct cds_lfht_iter iter
;
1437 cds_lfht_lookup(state
->session_triggers_ht
,
1438 hash_key_str(session_name
, lttng_ht_seed
),
1439 match_session_trigger_list
,
1442 node
= cds_lfht_iter_get_node(&iter
);
1445 * Not an error, the list of triggers applying to that session
1446 * will be initialized when the session is created.
1448 DBG("[notification-thread] No trigger list found for session \"%s\" as it is not yet known to the notification system",
1453 list
= caa_container_of(node
,
1454 struct lttng_session_trigger_list
,
1455 session_triggers_ht_node
);
1461 * Allocate an empty lttng_session_trigger_list for the session named
1464 * No ownership of 'session_name' is assumed by the session trigger list.
1465 * It is the caller's responsability to ensure the session name is alive
1466 * for as long as this list is.
1469 struct lttng_session_trigger_list
*lttng_session_trigger_list_create(
1470 const char *session_name
,
1471 struct cds_lfht
*session_triggers_ht
)
1473 struct lttng_session_trigger_list
*list
;
1475 list
= zmalloc(sizeof(*list
));
1479 list
->session_name
= session_name
;
1480 CDS_INIT_LIST_HEAD(&list
->list
);
1481 cds_lfht_node_init(&list
->session_triggers_ht_node
);
1482 list
->session_triggers_ht
= session_triggers_ht
;
1485 /* Publish the list through the session_triggers_ht. */
1486 cds_lfht_add(session_triggers_ht
,
1487 hash_key_str(session_name
, lttng_ht_seed
),
1488 &list
->session_triggers_ht_node
);
1495 void free_session_trigger_list_rcu(struct rcu_head
*node
)
1497 free(caa_container_of(node
, struct lttng_session_trigger_list
,
1502 void lttng_session_trigger_list_destroy(struct lttng_session_trigger_list
*list
)
1504 struct lttng_trigger_list_element
*trigger_list_element
, *tmp
;
1506 /* Empty the list element by element, and then free the list itself. */
1507 cds_list_for_each_entry_safe(trigger_list_element
, tmp
,
1508 &list
->list
, node
) {
1509 cds_list_del(&trigger_list_element
->node
);
1510 free(trigger_list_element
);
1513 /* Unpublish the list from the session_triggers_ht. */
1514 cds_lfht_del(list
->session_triggers_ht
,
1515 &list
->session_triggers_ht_node
);
1517 call_rcu(&list
->rcu_node
, free_session_trigger_list_rcu
);
1521 int lttng_session_trigger_list_add(struct lttng_session_trigger_list
*list
,
1522 struct lttng_trigger
*trigger
)
1525 struct lttng_trigger_list_element
*new_element
=
1526 zmalloc(sizeof(*new_element
));
1532 CDS_INIT_LIST_HEAD(&new_element
->node
);
1533 new_element
->trigger
= trigger
;
1534 cds_list_add(&new_element
->node
, &list
->list
);
1540 bool trigger_applies_to_session(const struct lttng_trigger
*trigger
,
1541 const char *session_name
)
1543 bool applies
= false;
1544 const struct lttng_condition
*condition
;
1546 condition
= lttng_trigger_get_const_condition(trigger
);
1547 switch (lttng_condition_get_type(condition
)) {
1548 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
:
1549 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED
:
1551 enum lttng_condition_status condition_status
;
1552 const char *condition_session_name
;
1554 condition_status
= lttng_condition_session_rotation_get_session_name(
1555 condition
, &condition_session_name
);
1556 if (condition_status
!= LTTNG_CONDITION_STATUS_OK
) {
1557 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
1561 assert(condition_session_name
);
1562 applies
= !strcmp(condition_session_name
, session_name
);
1573 * Allocate and initialize an lttng_session_trigger_list which contains
1574 * all triggers that apply to the session named 'session_name'.
1576 * No ownership of 'session_name' is assumed by the session trigger list.
1577 * It is the caller's responsability to ensure the session name is alive
1578 * for as long as this list is.
1581 struct lttng_session_trigger_list
*lttng_session_trigger_list_build(
1582 const struct notification_thread_state
*state
,
1583 const char *session_name
)
1585 int trigger_count
= 0;
1586 struct lttng_session_trigger_list
*session_trigger_list
= NULL
;
1587 struct lttng_trigger_ht_element
*trigger_ht_element
= NULL
;
1588 struct cds_lfht_iter iter
;
1590 session_trigger_list
= lttng_session_trigger_list_create(session_name
,
1591 state
->session_triggers_ht
);
1593 /* Add all triggers applying to the session named 'session_name'. */
1594 cds_lfht_for_each_entry(state
->triggers_ht
, &iter
, trigger_ht_element
,
1598 if (!trigger_applies_to_session(trigger_ht_element
->trigger
,
1603 ret
= lttng_session_trigger_list_add(session_trigger_list
,
1604 trigger_ht_element
->trigger
);
1612 DBG("[notification-thread] Found %i triggers that apply to newly created session",
1614 return session_trigger_list
;
1616 lttng_session_trigger_list_destroy(session_trigger_list
);
1621 struct session_info
*find_or_create_session_info(
1622 struct notification_thread_state
*state
,
1623 const char *name
, uid_t uid
, gid_t gid
)
1625 struct session_info
*session
= NULL
;
1626 struct cds_lfht_node
*node
;
1627 struct cds_lfht_iter iter
;
1628 struct lttng_session_trigger_list
*trigger_list
;
1631 cds_lfht_lookup(state
->sessions_ht
,
1632 hash_key_str(name
, lttng_ht_seed
),
1636 node
= cds_lfht_iter_get_node(&iter
);
1638 DBG("[notification-thread] Found session info of session \"%s\" (uid = %i, gid = %i)",
1640 session
= caa_container_of(node
, struct session_info
,
1642 assert(session
->uid
== uid
);
1643 assert(session
->gid
== gid
);
1644 session_info_get(session
);
1648 trigger_list
= lttng_session_trigger_list_build(state
, name
);
1649 if (!trigger_list
) {
1653 session
= session_info_create(name
, uid
, gid
, trigger_list
,
1654 state
->sessions_ht
);
1656 ERR("[notification-thread] Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)",
1658 lttng_session_trigger_list_destroy(trigger_list
);
1661 trigger_list
= NULL
;
1663 cds_lfht_add(state
->sessions_ht
, hash_key_str(name
, lttng_ht_seed
),
1664 &session
->sessions_ht_node
);
1670 session_info_put(session
);
1675 int handle_notification_thread_command_add_channel(
1676 struct notification_thread_state
*state
,
1677 const char *session_name
, uid_t session_uid
, gid_t session_gid
,
1678 const char *channel_name
, enum lttng_domain_type channel_domain
,
1679 uint64_t channel_key_int
, uint64_t channel_capacity
,
1680 enum lttng_error_code
*cmd_result
)
1682 struct cds_list_head trigger_list
;
1683 struct channel_info
*new_channel_info
= NULL
;
1684 struct channel_key channel_key
= {
1685 .key
= channel_key_int
,
1686 .domain
= channel_domain
,
1688 struct lttng_channel_trigger_list
*channel_trigger_list
= NULL
;
1689 struct lttng_trigger_ht_element
*trigger_ht_element
= NULL
;
1690 int trigger_count
= 0;
1691 struct cds_lfht_iter iter
;
1692 struct session_info
*session_info
= NULL
;
1694 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64
" in %s domain",
1695 channel_name
, session_name
, channel_key_int
,
1696 channel_domain
== LTTNG_DOMAIN_KERNEL
? "kernel" : "user space");
1698 CDS_INIT_LIST_HEAD(&trigger_list
);
1700 session_info
= find_or_create_session_info(state
, session_name
,
1701 session_uid
, session_gid
);
1702 if (!session_info
) {
1703 /* Allocation error or an internal error occurred. */
1707 new_channel_info
= channel_info_create(channel_name
, &channel_key
,
1708 channel_capacity
, session_info
);
1709 if (!new_channel_info
) {
1714 /* Build a list of all triggers applying to the new channel. */
1715 cds_lfht_for_each_entry(state
->triggers_ht
, &iter
, trigger_ht_element
,
1717 struct lttng_trigger_list_element
*new_element
;
1719 if (!trigger_applies_to_channel(trigger_ht_element
->trigger
,
1720 new_channel_info
)) {
1724 new_element
= zmalloc(sizeof(*new_element
));
1729 CDS_INIT_LIST_HEAD(&new_element
->node
);
1730 new_element
->trigger
= trigger_ht_element
->trigger
;
1731 cds_list_add(&new_element
->node
, &trigger_list
);
1736 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
1738 channel_trigger_list
= zmalloc(sizeof(*channel_trigger_list
));
1739 if (!channel_trigger_list
) {
1742 channel_trigger_list
->channel_key
= new_channel_info
->key
;
1743 CDS_INIT_LIST_HEAD(&channel_trigger_list
->list
);
1744 cds_lfht_node_init(&channel_trigger_list
->channel_triggers_ht_node
);
1745 cds_list_splice(&trigger_list
, &channel_trigger_list
->list
);
1748 /* Add channel to the channel_ht which owns the channel_infos. */
1749 cds_lfht_add(state
->channels_ht
,
1750 hash_channel_key(&new_channel_info
->key
),
1751 &new_channel_info
->channels_ht_node
);
1753 * Add the list of triggers associated with this channel to the
1754 * channel_triggers_ht.
1756 cds_lfht_add(state
->channel_triggers_ht
,
1757 hash_channel_key(&new_channel_info
->key
),
1758 &channel_trigger_list
->channel_triggers_ht_node
);
1760 session_info_put(session_info
);
1761 *cmd_result
= LTTNG_OK
;
1764 channel_info_destroy(new_channel_info
);
1765 session_info_put(session_info
);
1770 void free_channel_trigger_list_rcu(struct rcu_head
*node
)
1772 free(caa_container_of(node
, struct lttng_channel_trigger_list
,
1777 void free_channel_state_sample_rcu(struct rcu_head
*node
)
1779 free(caa_container_of(node
, struct channel_state_sample
,
1784 int handle_notification_thread_command_remove_channel(
1785 struct notification_thread_state
*state
,
1786 uint64_t channel_key
, enum lttng_domain_type domain
,
1787 enum lttng_error_code
*cmd_result
)
1789 struct cds_lfht_node
*node
;
1790 struct cds_lfht_iter iter
;
1791 struct lttng_channel_trigger_list
*trigger_list
;
1792 struct lttng_trigger_list_element
*trigger_list_element
, *tmp
;
1793 struct channel_key key
= { .key
= channel_key
, .domain
= domain
};
1794 struct channel_info
*channel_info
;
1796 DBG("[notification-thread] Removing channel key = %" PRIu64
" in %s domain",
1797 channel_key
, domain
== LTTNG_DOMAIN_KERNEL
? "kernel" : "user space");
1801 cds_lfht_lookup(state
->channel_triggers_ht
,
1802 hash_channel_key(&key
),
1803 match_channel_trigger_list
,
1806 node
= cds_lfht_iter_get_node(&iter
);
1808 * There is a severe internal error if we are being asked to remove a
1809 * channel that doesn't exist.
1812 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
1816 /* Free the list of triggers associated with this channel. */
1817 trigger_list
= caa_container_of(node
, struct lttng_channel_trigger_list
,
1818 channel_triggers_ht_node
);
1819 cds_list_for_each_entry_safe(trigger_list_element
, tmp
,
1820 &trigger_list
->list
, node
) {
1821 cds_list_del(&trigger_list_element
->node
);
1822 free(trigger_list_element
);
1824 cds_lfht_del(state
->channel_triggers_ht
, node
);
1825 call_rcu(&trigger_list
->rcu_node
, free_channel_trigger_list_rcu
);
1827 /* Free sampled channel state. */
1828 cds_lfht_lookup(state
->channel_state_ht
,
1829 hash_channel_key(&key
),
1830 match_channel_state_sample
,
1833 node
= cds_lfht_iter_get_node(&iter
);
1835 * This is expected to be NULL if the channel is destroyed before we
1836 * received a sample.
1839 struct channel_state_sample
*sample
= caa_container_of(node
,
1840 struct channel_state_sample
,
1841 channel_state_ht_node
);
1843 cds_lfht_del(state
->channel_state_ht
, node
);
1844 call_rcu(&sample
->rcu_node
, free_channel_state_sample_rcu
);
1847 /* Remove the channel from the channels_ht and free it. */
1848 cds_lfht_lookup(state
->channels_ht
,
1849 hash_channel_key(&key
),
1853 node
= cds_lfht_iter_get_node(&iter
);
1855 channel_info
= caa_container_of(node
, struct channel_info
,
1857 cds_lfht_del(state
->channels_ht
, node
);
1858 channel_info_destroy(channel_info
);
1861 *cmd_result
= LTTNG_OK
;
1866 int handle_notification_thread_command_session_rotation(
1867 struct notification_thread_state
*state
,
1868 enum notification_thread_command_type cmd_type
,
1869 const char *session_name
, uid_t session_uid
, gid_t session_gid
,
1870 uint64_t trace_archive_chunk_id
,
1871 struct lttng_trace_archive_location
*location
,
1872 enum lttng_error_code
*_cmd_result
)
1875 enum lttng_error_code cmd_result
= LTTNG_OK
;
1876 struct lttng_session_trigger_list
*trigger_list
;
1877 struct lttng_trigger_list_element
*trigger_list_element
;
1878 struct session_info
*session_info
;
1882 session_info
= find_or_create_session_info(state
, session_name
,
1883 session_uid
, session_gid
);
1884 if (!session_info
) {
1885 /* Allocation error or an internal error occurred. */
1887 cmd_result
= LTTNG_ERR_NOMEM
;
1891 session_info
->rotation
.ongoing
=
1892 cmd_type
== NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING
;
1893 session_info
->rotation
.id
= trace_archive_chunk_id
;
1894 trigger_list
= get_session_trigger_list(state
, session_name
);
1895 if (!trigger_list
) {
1896 DBG("[notification-thread] No triggers applying to session \"%s\" found",
1901 cds_list_for_each_entry(trigger_list_element
, &trigger_list
->list
,
1903 const struct lttng_condition
*condition
;
1904 const struct lttng_action
*action
;
1905 const struct lttng_trigger
*trigger
;
1906 struct notification_client_list
*client_list
;
1907 struct lttng_evaluation
*evaluation
= NULL
;
1908 enum lttng_condition_type condition_type
;
1909 bool client_list_is_empty
;
1911 trigger
= trigger_list_element
->trigger
;
1912 condition
= lttng_trigger_get_const_condition(trigger
);
1914 condition_type
= lttng_condition_get_type(condition
);
1916 if (condition_type
== LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
&&
1917 cmd_type
!= NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING
) {
1919 } else if (condition_type
== LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED
&&
1920 cmd_type
!= NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED
) {
1924 action
= lttng_trigger_get_const_action(trigger
);
1926 /* Notify actions are the only type currently supported. */
1927 assert(lttng_action_get_type_const(action
) ==
1928 LTTNG_ACTION_TYPE_NOTIFY
);
1930 client_list
= get_client_list_from_condition(state
, condition
);
1931 assert(client_list
);
1933 pthread_mutex_lock(&client_list
->lock
);
1934 client_list_is_empty
= cds_list_empty(&client_list
->list
);
1935 pthread_mutex_unlock(&client_list
->lock
);
1936 if (client_list_is_empty
) {
1938 * No clients interested in the evaluation's result,
1944 if (cmd_type
== NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING
) {
1945 evaluation
= lttng_evaluation_session_rotation_ongoing_create(
1946 trace_archive_chunk_id
);
1948 evaluation
= lttng_evaluation_session_rotation_completed_create(
1949 trace_archive_chunk_id
, location
);
1953 /* Internal error */
1955 cmd_result
= LTTNG_ERR_UNK
;
1959 /* Dispatch evaluation result to all clients. */
1960 ret
= send_evaluation_to_clients(trigger_list_element
->trigger
,
1961 evaluation
, client_list
, state
,
1964 lttng_evaluation_destroy(evaluation
);
1966 notification_client_list_put(client_list
);
1967 if (caa_unlikely(ret
)) {
1972 session_info_put(session_info
);
1973 *_cmd_result
= cmd_result
;
1979 int handle_notification_thread_command_add_application(
1980 struct notification_thread_handle
*handle
,
1981 struct notification_thread_state
*state
,
1982 int read_side_trigger_event_application_pipe
,
1983 enum lttng_error_code
*_cmd_result
)
1986 enum lttng_error_code cmd_result
= LTTNG_OK
;
1987 struct notification_event_trigger_source_element
*element
= NULL
;
1989 element
= zmalloc(sizeof(*element
));
1991 cmd_result
= LTTNG_ERR_NOMEM
;
1996 CDS_INIT_LIST_HEAD(&element
->node
);
1997 element
->fd
= read_side_trigger_event_application_pipe
;
1999 pthread_mutex_lock(&handle
->event_trigger_sources
.lock
);
2000 cds_list_add(&element
->node
, &handle
->event_trigger_sources
.list
);
2001 pthread_mutex_unlock(&handle
->event_trigger_sources
.lock
);
2003 /* TODO: remove on failure to add to list? */
2005 /* Adding the read side pipe to the event poll */
2006 ret
= lttng_poll_add(&state
->events
,
2007 read_side_trigger_event_application_pipe
,
2008 LPOLLIN
| LPOLLERR
);
2010 DBG3("[notification-thread] Adding application event source from fd: %d", read_side_trigger_event_application_pipe
);
2012 /* TODO: what should be the value of cmd_result??? */
2013 ERR("[notification-thread] Failed to add event source pipe fd to pollset");
2018 *_cmd_result
= cmd_result
;
2023 int handle_notification_thread_command_remove_application(
2024 struct notification_thread_handle
*handle
,
2025 struct notification_thread_state
*state
,
2026 int read_side_trigger_event_application_pipe
,
2027 enum lttng_error_code
*_cmd_result
)
2030 enum lttng_error_code cmd_result
= LTTNG_OK
;
2032 /* TODO: missing a lock propably to revisit */
2033 struct notification_event_trigger_source_element
*source_element
, *tmp
;
2034 cds_list_for_each_entry_safe(source_element
, tmp
,
2035 &handle
->event_trigger_sources
.list
, node
) {
2036 if (source_element
->fd
!= read_side_trigger_event_application_pipe
) {
2040 DBG("[notification-thread] Removed event source from event source list");
2041 cds_list_del(&source_element
->node
);
2045 DBG3("[notification-thread] Removing application event source from fd: %d", read_side_trigger_event_application_pipe
);
2046 /* Removing the read side pipe to the event poll */
2047 ret
= lttng_poll_del(&state
->events
,
2048 read_side_trigger_event_application_pipe
);
2050 /* TODO: what should be the value of cmd_result??? */
2051 ERR("[notification-thread] Failed to remove event source pipe fd from pollset");
2056 *_cmd_result
= cmd_result
;
/*
 * Handle the "get tokens" command.
 *
 * A new lttng_triggers collection is created and every element found in
 * state->trigger_tokens_ht has its trigger added to it; a reference is
 * taken on each trigger added since ownership is shared with the
 * collection. On success the collection is handed to the caller through
 * '*triggers'. The command's outcome is written to '*_cmd_result'
 * (LTTNG_ERR_NOMEM when the collection cannot be created, LTTNG_ERR_FATAL
 * when a trigger cannot be added).
 *
 * NOTE(review): lttng_triggers_destroy() is reached with a NULL pointer on
 * success (local_triggers is reset after the hand-off) - presumably it is
 * NULL-safe; confirm.
 */
2060 static int handle_notification_thread_command_get_tokens(
2061 struct notification_thread_handle
*handle
,
2062 struct notification_thread_state
*state
,
2063 struct lttng_triggers
**triggers
,
2064 enum lttng_error_code
*_cmd_result
)
2067 enum lttng_error_code cmd_result
= LTTNG_OK
;
2068 struct cds_lfht_iter iter
;
2069 struct notification_trigger_tokens_ht_element
*element
;
2070 struct lttng_triggers
*local_triggers
= NULL
;
2072 local_triggers
= lttng_triggers_create();
2073 if (!local_triggers
) {
2074 cmd_result
= LTTNG_ERR_NOMEM
;
2079 cds_lfht_for_each_entry (
2080 state
->trigger_tokens_ht
, &iter
, element
, node
) {
2081 ret
= lttng_triggers_add(local_triggers
, element
->trigger
);
2083 cmd_result
= LTTNG_ERR_FATAL
;
2088 /* Ownership is shared with the lttng_triggers object */
2089 lttng_trigger_get(element
->trigger
);
2094 /* Passing ownership up */
2095 *triggers
= local_triggers
;
2096 local_triggers
= NULL
;
2100 lttng_triggers_destroy(local_triggers
);
2101 *_cmd_result
= cmd_result
;
/*
 * Handle the "list triggers" command.
 *
 * A new lttng_triggers collection is created and filled with the triggers
 * of state->triggers_ht whose credentials match both 'uid' and 'gid'
 * exactly; triggers owned by anyone else are skipped. A reference is taken
 * on each trigger added since ownership is shared with the collection. On
 * success the collection is handed to the caller through '*triggers'. The
 * command's outcome is written to '*_cmd_result' (LTTNG_ERR_NOMEM when the
 * collection cannot be created).
 *
 * NOTE(review): the node count gathered by cds_lfht_count_nodes() is not
 * used by any statement visible here - confirm whether it is still needed.
 */
2106 int handle_notification_thread_command_list_triggers(
2107 struct notification_thread_handle
*handle
,
2108 struct notification_thread_state
*state
,
2111 struct lttng_triggers
**triggers
,
2112 enum lttng_error_code
*_cmd_result
)
2115 enum lttng_error_code cmd_result
= LTTNG_OK
;
2116 struct cds_lfht_iter iter
;
2117 struct lttng_trigger_ht_element
*trigger_ht_element
;
2118 struct lttng_triggers
*local_triggers
= NULL
;
2119 const struct lttng_credentials
*creds
;
2122 unsigned long count
;
2125 cds_lfht_count_nodes(state
->triggers_ht
, &scb
, &count
, &sca
);
2127 /* TODO check downcasting */
2128 local_triggers
= lttng_triggers_create();
2129 if (!local_triggers
) {
2130 cmd_result
= LTTNG_ERR_NOMEM
;
2134 cds_lfht_for_each_entry (state
->triggers_ht
, &iter
,
2135 trigger_ht_element
, node
) {
2136 /* Only return the trigger for which the requestion client have
2137 * access. For now the root user can only list its own
2139 * TODO: root user behavior
2141 creds
= lttng_trigger_get_credentials(trigger_ht_element
->trigger
);
2142 if ((uid
!= creds
->uid
) || (gid
!= creds
->gid
)) {
2146 ret
= lttng_triggers_add(local_triggers
, trigger_ht_element
->trigger
);
2151 /* Ownership is shared with the lttng_triggers object */
2152 lttng_trigger_get(trigger_ht_element
->trigger
);
2157 /* Passing ownership up */
2158 *triggers
= local_triggers
;
2159 local_triggers
= NULL
;
2163 lttng_triggers_destroy(local_triggers
);
2164 *_cmd_result
= cmd_result
;
/*
 * Report whether 'condition' can be evaluated by this session daemon.
 *
 * Buffer-usage conditions are examined by domain: for the kernel domain
 * the result is whatever
 * kernel_supports_ring_buffer_snapshot_sample_positions() reports.
 * Event-rule-hit conditions have their own case.
 *
 * NOTE(review): the values returned for non-kernel domains, for
 * event-rule-hit conditions and for other condition types are not visible
 * here - confirm before relying on them.
 */
2169 int condition_is_supported(struct lttng_condition
*condition
)
2173 switch (lttng_condition_get_type(condition
)) {
2174 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
2175 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
2177 enum lttng_domain_type domain
;
2179 ret
= lttng_condition_buffer_usage_get_domain_type(condition
,
2186 if (domain
!= LTTNG_DOMAIN_KERNEL
) {
2192 * Older kernel tracers don't expose the API to monitor their
2193 * buffers. Therefore, we reject triggers that require that
2194 * mechanism to be available to be evaluated.
2196 ret
= kernel_supports_ring_buffer_snapshot_sample_positions();
2199 case LTTNG_CONDITION_TYPE_EVENT_RULE_HIT
:
2202 * Check for kernel support.
2203 * Check for ust support ??
/*
 * Report whether 'action' can be executed by this session daemon.
 *
 * The action type is dispatched on: notify, start-session, stop-session,
 * rotate-session and snapshot-session share one case, action groups have
 * their own. Per the in-code notes, every action type is currently treated
 * as supported and the members of a group are not inspected yet.
 *
 * NOTE(review): the returned values are not visible here - confirm.
 */
2216 int action_is_supported(struct lttng_action
*action
)
2220 switch (lttng_action_get_type(action
)) {
2221 case LTTNG_ACTION_TYPE_NOTIFY
:
2222 case LTTNG_ACTION_TYPE_START_SESSION
:
2223 case LTTNG_ACTION_TYPE_STOP_SESSION
:
2224 case LTTNG_ACTION_TYPE_ROTATE_SESSION
:
2225 case LTTNG_ACTION_TYPE_SNAPSHOT_SESSION
:
2227 /* TODO validate that this is true for kernel in regards to
2228 * rotation and snapshot. Start stop is not a problem notify
2231 /* For now all type of actions are supported */
2235 case LTTNG_ACTION_TYPE_GROUP
:
2237 /* TODO: Iterate over all internal actions and validate that
2238 * they are supported
2251 /* Must be called with RCU read lock held. */
2253 int bind_trigger_to_matching_session(struct lttng_trigger
*trigger
,
2254 struct notification_thread_state
*state
)
2257 const struct lttng_condition
*condition
;
2258 const char *session_name
;
2259 struct lttng_session_trigger_list
*trigger_list
;
2261 condition
= lttng_trigger_get_const_condition(trigger
);
2262 switch (lttng_condition_get_type(condition
)) {
2263 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
:
2264 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED
:
2266 enum lttng_condition_status status
;
2268 status
= lttng_condition_session_rotation_get_session_name(
2269 condition
, &session_name
);
2270 if (status
!= LTTNG_CONDITION_STATUS_OK
) {
2271 ERR("[notification-thread] Failed to bind trigger to session: unable to get 'session_rotation' condition's session name");
2282 trigger_list
= get_session_trigger_list(state
, session_name
);
2283 if (!trigger_list
) {
2284 DBG("[notification-thread] Unable to bind trigger applying to session \"%s\" as it is not yet known to the notification system",
2290 DBG("[notification-thread] Newly registered trigger bound to session \"%s\"",
2292 ret
= lttng_session_trigger_list_add(trigger_list
, trigger
);
2297 /* Must be called with RCU read lock held. */
2299 int bind_trigger_to_matching_channels(struct lttng_trigger
*trigger
,
2300 struct notification_thread_state
*state
)
2303 struct cds_lfht_node
*node
;
2304 struct cds_lfht_iter iter
;
2305 struct channel_info
*channel
;
2307 cds_lfht_for_each_entry(state
->channels_ht
, &iter
, channel
,
2309 struct lttng_trigger_list_element
*trigger_list_element
;
2310 struct lttng_channel_trigger_list
*trigger_list
;
2311 struct cds_lfht_iter lookup_iter
;
2313 if (!trigger_applies_to_channel(trigger
, channel
)) {
2317 cds_lfht_lookup(state
->channel_triggers_ht
,
2318 hash_channel_key(&channel
->key
),
2319 match_channel_trigger_list
,
2322 node
= cds_lfht_iter_get_node(&lookup_iter
);
2324 trigger_list
= caa_container_of(node
,
2325 struct lttng_channel_trigger_list
,
2326 channel_triggers_ht_node
);
2328 trigger_list_element
= zmalloc(sizeof(*trigger_list_element
));
2329 if (!trigger_list_element
) {
2333 CDS_INIT_LIST_HEAD(&trigger_list_element
->node
);
2334 trigger_list_element
->trigger
= trigger
;
2335 cds_list_add(&trigger_list_element
->node
, &trigger_list
->list
);
2336 DBG("[notification-thread] Newly registered trigger bound to channel \"%s\"",
/*
 * Set up the notification machinery for a newly registered trigger.
 *
 * Steps, in order:
 *   1. create the trigger's client list;
 *   2. add to it every connected client (state->client_socket_ht) that
 *      subscribed to the trigger's condition;
 *   3. bind the trigger to the matching session or channels, depending on
 *      what get_condition_binding_object() reports for the condition;
 *   4. evaluate the condition right away for each client of the list, so
 *      that clients which subscribed before the trigger was registered do
 *      not miss a condition that already holds;
 *   5. publish the client list.
 *
 * Failures branch to the error_put_client_list label, where the reference
 * on the client list is dropped.
 */
2343 static int action_notify_register_trigger(
2344 struct notification_thread_state
*state
,
2345 struct lttng_trigger
*trigger
)
2349 struct lttng_condition
*condition
;
2350 struct notification_client
*client
;
2351 struct notification_client_list
*client_list
= NULL
;
2352 struct cds_lfht_iter iter
;
2353 struct notification_client_list_element
*client_list_element
, *tmp
;
2355 condition
= lttng_trigger_get_condition(trigger
);
2358 client_list
= notification_client_list_create(trigger
);
2364 /* Build a list of clients to which this new trigger applies. */
2365 cds_lfht_for_each_entry(state
->client_socket_ht
, &iter
, client
,
2366 client_socket_ht_node
) {
2367 if (!trigger_applies_to_client(trigger
, client
)) {
2371 client_list_element
= zmalloc(sizeof(*client_list_element
));
2372 if (!client_list_element
) {
2374 goto error_put_client_list
;
2376 CDS_INIT_LIST_HEAD(&client_list_element
->node
);
2377 client_list_element
->client
= client
;
2378 cds_list_add(&client_list_element
->node
, &client_list
->list
);
2381 switch (get_condition_binding_object(condition
)) {
2382 case LTTNG_OBJECT_TYPE_SESSION
:
2383 /* Add the trigger to the list if it matches a known session. */
2384 ret
= bind_trigger_to_matching_session(trigger
, state
);
2386 goto error_put_client_list
;
2389 case LTTNG_OBJECT_TYPE_CHANNEL
:
2391 * Add the trigger to list of triggers bound to the channels
2394 ret
= bind_trigger_to_matching_channels(trigger
, state
);
2396 goto error_put_client_list
;
2399 case LTTNG_OBJECT_TYPE_NONE
:
2402 ERR("[notification-thread] Unknown object type on which to bind a newly registered trigger was encountered");
2404 goto error_put_client_list
;
2408 * Since there is nothing preventing clients from subscribing to a
2409 * condition before the corresponding trigger is registered, we have
2410 * to evaluate this new condition right away.
2412 * At some point, we were waiting for the next "evaluation" (e.g. on
2413 * reception of a channel sample) to evaluate this new condition, but
2416 * The reason it was broken is that waiting for the next sample
2417 * does not allow us to properly handle transitions for edge-triggered
2420 * Consider this example: when we handle a new channel sample, we
2421 * evaluate each conditions twice: once with the previous state, and
2422 * again with the newest state. We then use those two results to
2423 * determine whether a state change happened: a condition was false and
2424 * became true. If a state change happened, we have to notify clients.
2426 * Now, if a client subscribes to a given notification and registers
2427 * a trigger *after* that subscription, we have to make sure the
2428 * condition is evaluated at this point while considering only the
2429 * current state. Otherwise, the next evaluation cycle may only see
2430 * that the evaluations remain the same (true for samples n-1 and n) and
2431 * the client will never know that the condition has been met.
2433 * No need to lock the list here as it has not been published yet.
2435 cds_list_for_each_entry_safe(client_list_element
, tmp
,
2436 &client_list
->list
, node
) {
2437 ret
= evaluate_condition_for_client(trigger
, condition
,
2438 client_list_element
->client
, state
);
2440 goto error_put_client_list
;
2445 * Client list ownership transferred to the
2446 * notification_trigger_clients_ht.
2448 publish_notification_client_list(state
, client_list
);
2450 error_put_client_list
:
2451 notification_client_list_put(client_list
);
2457 bool trigger_name_taken(struct notification_thread_state
*state
, const char *name
)
2459 struct cds_lfht_node
*triggers_by_name_ht_node
;
2460 struct cds_lfht_iter iter
;
2461 /* TODO change hashing for trigger */
2462 cds_lfht_lookup(state
->triggers_by_name_ht
,
2463 hash_key_str(name
, lttng_ht_seed
),
2467 triggers_by_name_ht_node
= cds_lfht_iter_get_node(&iter
);
2468 if (triggers_by_name_ht_node
) {
2476 void generate_trigger_name(struct notification_thread_state
*state
, struct lttng_trigger
*trigger
, const char **name
)
2478 /* Here the offset criteria guarantee an end. This will be a nice
2479 * bikeshedding conversation. I would simply generate uuid and use them
2484 lttng_trigger_generate_name(trigger
, state
->trigger_id
.name_offset
);
2485 /* TODO error checking */
2486 lttng_trigger_get_name(trigger
, name
);
2487 taken
= trigger_name_taken(state
, *name
);
2489 state
->trigger_id
.name_offset
++;
2491 } while (taken
|| state
->trigger_id
.name_offset
== UINT32_MAX
);
2494 static bool action_is_notify(const struct lttng_action
*action
)
2496 /* TODO for action groups we need to iterate over all of them */
2497 enum lttng_action_type type
= lttng_action_get_type_const(action
);
2499 enum lttng_action_status status
;
2500 const struct lttng_action
*tmp
;
2501 unsigned int i
, count
;
2504 case LTTNG_ACTION_TYPE_NOTIFY
:
2507 case LTTNG_ACTION_TYPE_GROUP
:
2508 status
= lttng_action_group_get_count(action
, &count
);
2509 if (status
!= LTTNG_ACTION_STATUS_OK
) {
2512 for (i
= 0; i
< count
; i
++) {
2513 tmp
= lttng_action_group_get_at_index_const(action
, i
);
2515 ret
= action_is_notify(tmp
);
2530 * TODO: REVIEW THIS COMMENT.
2531 * FIXME A client's credentials are not checked when registering a trigger, nor
2532 * are they stored alongside with the trigger.
2534 * The effects of this are benign since:
2535 * - The client will succeed in registering the trigger, as it is valid,
2536 * - The trigger will, internally, be bound to the channel/session,
2537 * - The notifications will not be sent since the client's credentials
2538 * are checked against the channel at that moment.
2540 * If this function returns a non-zero value, it means something is
2541 * fundamentally broken and the whole subsystem/thread will be torn down.
2543 * If a non-fatal error occurs, just set the cmd_result to the appropriate
2547 int handle_notification_thread_command_register_trigger(
2548 struct notification_thread_state
*state
,
2549 struct lttng_trigger
*trigger
,
2550 enum lttng_error_code
*cmd_result
)
2554 struct lttng_condition
*condition
;
2555 struct lttng_action
*action
;
2556 struct lttng_trigger_ht_element
*trigger_ht_element
= NULL
;
2557 struct notification_trigger_tokens_ht_element
*trigger_tokens_ht_element
= NULL
;
2558 struct cds_lfht_node
*node
;
2559 const char* trigger_name
;
2560 bool free_trigger
= true;
2562 assert(trigger
->creds
.set
);
2566 /* Set the trigger's key */
2567 lttng_trigger_set_key(trigger
, state
->trigger_id
.token_generator
);
2569 if (lttng_trigger_get_name(trigger
, &trigger_name
) == LTTNG_TRIGGER_STATUS_UNSET
) {
2570 generate_trigger_name(state
, trigger
, &trigger_name
);
2571 } else if (trigger_name_taken(state
, trigger_name
)) {
2572 /* Not a fatal error */
2573 *cmd_result
= LTTNG_ERR_TRIGGER_EXISTS
;
2578 condition
= lttng_trigger_get_condition(trigger
);
2581 action
= lttng_trigger_get_action(trigger
);
2584 is_supported
= condition_is_supported(condition
);
2585 if (is_supported
< 0) {
2587 } else if (is_supported
== 0) {
2589 *cmd_result
= LTTNG_ERR_NOT_SUPPORTED
;
2593 is_supported
= action_is_supported(action
);
2594 if (is_supported
< 0) {
2596 } else if (is_supported
== 0) {
2598 *cmd_result
= LTTNG_ERR_NOT_SUPPORTED
;
2602 trigger_ht_element
= zmalloc(sizeof(*trigger_ht_element
));
2603 if (!trigger_ht_element
) {
2608 /* Add trigger to the trigger_ht. */
2609 cds_lfht_node_init(&trigger_ht_element
->node
);
2610 cds_lfht_node_init(&trigger_ht_element
->node_by_name
);
2613 * This element own the trigger object from now own, this is why there
2614 * is no lttng_trigger_get here.
2615 * This thread is now the owner of the trigger object.
2617 trigger_ht_element
->trigger
= trigger
;
2619 node
= cds_lfht_add_unique(state
->triggers_ht
,
2620 lttng_condition_hash(condition
),
2623 &trigger_ht_element
->node
);
2624 if (node
!= &trigger_ht_element
->node
) {
2625 /* Not a fatal error, simply report it to the client. */
2626 *cmd_result
= LTTNG_ERR_TRIGGER_EXISTS
;
2627 goto error_free_ht_element
;
2630 node
= cds_lfht_add_unique(state
->triggers_by_name_ht
,
2631 hash_key_str(trigger_name
, lttng_ht_seed
),
2634 &trigger_ht_element
->node_by_name
);
2635 if (node
!= &trigger_ht_element
->node_by_name
) {
2636 /* This should never happen */
2637 /* Not a fatal error, simply report it to the client. */
2638 /* TODO remove from the trigger_ht */
2639 *cmd_result
= LTTNG_ERR_TRIGGER_EXISTS
;
2640 goto error_free_ht_element
;
2643 if (lttng_condition_get_type(condition
) == LTTNG_CONDITION_TYPE_EVENT_RULE_HIT
) {
2644 trigger_tokens_ht_element
= zmalloc(sizeof(*trigger_tokens_ht_element
));
2645 if (!trigger_tokens_ht_element
) {
2650 /* Add trigger token to the trigger_tokens_ht. */
2651 cds_lfht_node_init(&trigger_tokens_ht_element
->node
);
2652 trigger_tokens_ht_element
->token
= trigger
->key
.value
;
2653 trigger_tokens_ht_element
->trigger
= trigger
;
2655 node
= cds_lfht_add_unique(state
->trigger_tokens_ht
,
2656 hash_key_u64(&trigger_tokens_ht_element
->token
, lttng_ht_seed
),
2657 match_trigger_token
,
2658 &trigger_tokens_ht_element
->token
,
2659 &trigger_tokens_ht_element
->node
);
2660 if (node
!= &trigger_tokens_ht_element
->node
) {
2661 /* TODO: THIS IS A FATAL ERROR... should never happen */
2662 /* Not a fatal error, simply report it to the client. */
2663 *cmd_result
= LTTNG_ERR_TRIGGER_EXISTS
;
2664 goto error_free_ht_element
;
2669 * Ownership of the trigger and of its wrapper was transfered to
2670 * the triggers_ht. Same for token ht element if necessary.
2672 trigger_tokens_ht_element
= NULL
;
2673 trigger_ht_element
= NULL
;
2674 free_trigger
= false;
2676 if (action_is_notify(action
)) {
2677 ret
= action_notify_register_trigger(state
, trigger
);
2679 /* TODO should cmd_result be set here? */
2681 goto error_free_ht_element
;
2685 /* Increment the trigger unique id generator */
2686 state
->trigger_id
.token_generator
++;
2687 *cmd_result
= LTTNG_OK
;
2689 error_free_ht_element
:
2690 free(trigger_ht_element
);
2691 free(trigger_tokens_ht_element
);
2694 lttng_trigger_destroy(trigger
);
2701 void free_lttng_trigger_ht_element_rcu(struct rcu_head
*node
)
2703 free(caa_container_of(node
, struct lttng_trigger_ht_element
,
2708 void free_notification_trigger_tokens_ht_element_rcu(struct rcu_head
*node
)
2710 free(caa_container_of(node
, struct notification_trigger_tokens_ht_element
,
2715 int handle_notification_thread_command_unregister_trigger(
2716 struct notification_thread_state
*state
,
2717 struct lttng_trigger
*trigger
,
2718 enum lttng_error_code
*_cmd_reply
)
2720 struct cds_lfht_iter iter
;
2721 struct cds_lfht_node
*triggers_ht_node
;
2722 struct lttng_channel_trigger_list
*trigger_list
;
2723 struct notification_client_list
*client_list
;
2724 struct lttng_trigger_ht_element
*trigger_ht_element
= NULL
;
2725 struct lttng_condition
*condition
= lttng_trigger_get_condition(
2727 struct lttng_action
*action
= lttng_trigger_get_action(trigger
);
2728 enum lttng_error_code cmd_reply
;
2732 /* TODO change hashing for trigger */
2733 /* TODO Disabling for the root user is not complete, for now the root
2734 * user cannot disable the trigger from another user.
2736 cds_lfht_lookup(state
->triggers_ht
,
2737 lttng_condition_hash(condition
),
2741 triggers_ht_node
= cds_lfht_iter_get_node(&iter
);
2742 if (!triggers_ht_node
) {
2743 cmd_reply
= LTTNG_ERR_TRIGGER_NOT_FOUND
;
2746 cmd_reply
= LTTNG_OK
;
2749 /* Remove trigger from channel_triggers_ht. */
2750 cds_lfht_for_each_entry(state
->channel_triggers_ht
, &iter
, trigger_list
,
2751 channel_triggers_ht_node
) {
2752 struct lttng_trigger_list_element
*trigger_element
, *tmp
;
2754 cds_list_for_each_entry_safe(trigger_element
, tmp
,
2755 &trigger_list
->list
, node
) {
2756 if (!lttng_trigger_is_equal(trigger
, trigger_element
->trigger
)) {
2760 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
2761 cds_list_del(&trigger_element
->node
);
2762 /* A trigger can only appear once per channel */
2767 if (lttng_condition_get_type(condition
) == LTTNG_CONDITION_TYPE_EVENT_RULE_HIT
) {
2768 struct notification_trigger_tokens_ht_element
*trigger_tokens_ht_element
;
2769 cds_lfht_for_each_entry(state
->trigger_tokens_ht
, &iter
, trigger_tokens_ht_element
,
2771 if (!lttng_trigger_is_equal(trigger
, trigger_tokens_ht_element
->trigger
)) {
2775 /* TODO talk to all app and remove it */
2776 DBG("[notification-thread] Removed trigger from tokens_ht");
2777 cds_lfht_del(state
->trigger_tokens_ht
,
2778 &trigger_tokens_ht_element
->node
);
2779 call_rcu(&trigger_tokens_ht_element
->rcu_node
, free_notification_trigger_tokens_ht_element_rcu
);
2785 if (action_is_notify(action
)) {
2787 * Remove and release the client list from
2788 * notification_trigger_clients_ht.
2790 client_list
= get_client_list_from_condition(state
, condition
);
2791 assert(client_list
);
2793 /* Put new reference and the hashtable's reference. */
2794 notification_client_list_put(client_list
);
2795 notification_client_list_put(client_list
);
2799 /* Remove trigger from triggers_ht. */
2800 trigger_ht_element
= caa_container_of(triggers_ht_node
,
2801 struct lttng_trigger_ht_element
, node
);
2802 cds_lfht_del(state
->triggers_by_name_ht
, &trigger_ht_element
->node_by_name
);
2803 cds_lfht_del(state
->triggers_ht
, triggers_ht_node
);
2805 /* Release the ownership of the trigger */
2806 lttng_trigger_destroy(trigger_ht_element
->trigger
);
2807 call_rcu(&trigger_ht_element
->rcu_node
, free_lttng_trigger_ht_element_rcu
);
2811 *_cmd_reply
= cmd_reply
;
2816 /* Returns 0 on success, 1 on exit requested, negative value on error. */
2817 int handle_notification_thread_command(
2818 struct notification_thread_handle
*handle
,
2819 struct notification_thread_state
*state
)
2823 struct notification_thread_command
*cmd
;
2825 /* Read the event pipe to put it back into a quiescent state. */
2826 ret
= lttng_read(lttng_pipe_get_readfd(handle
->cmd_queue
.event_pipe
), &counter
,
2828 if (ret
!= sizeof(counter
)) {
2832 pthread_mutex_lock(&handle
->cmd_queue
.lock
);
2833 cmd
= cds_list_first_entry(&handle
->cmd_queue
.list
,
2834 struct notification_thread_command
, cmd_list_node
);
2835 switch (cmd
->type
) {
2836 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER
:
2837 DBG("[notification-thread] Received register trigger command");
2838 ret
= handle_notification_thread_command_register_trigger(
2839 state
, cmd
->parameters
.trigger
,
2842 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER
:
2843 DBG("[notification-thread] Received unregister trigger command");
2844 ret
= handle_notification_thread_command_unregister_trigger(
2845 state
, cmd
->parameters
.trigger
,
2848 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL
:
2849 DBG("[notification-thread] Received add channel command");
2850 ret
= handle_notification_thread_command_add_channel(
2852 cmd
->parameters
.add_channel
.session
.name
,
2853 cmd
->parameters
.add_channel
.session
.uid
,
2854 cmd
->parameters
.add_channel
.session
.gid
,
2855 cmd
->parameters
.add_channel
.channel
.name
,
2856 cmd
->parameters
.add_channel
.channel
.domain
,
2857 cmd
->parameters
.add_channel
.channel
.key
,
2858 cmd
->parameters
.add_channel
.channel
.capacity
,
2861 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL
:
2862 DBG("[notification-thread] Received remove channel command");
2863 ret
= handle_notification_thread_command_remove_channel(
2864 state
, cmd
->parameters
.remove_channel
.key
,
2865 cmd
->parameters
.remove_channel
.domain
,
2868 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING
:
2869 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED
:
2870 DBG("[notification-thread] Received session rotation %s command",
2871 cmd
->type
== NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING
?
2872 "ongoing" : "completed");
2873 ret
= handle_notification_thread_command_session_rotation(
2876 cmd
->parameters
.session_rotation
.session_name
,
2877 cmd
->parameters
.session_rotation
.uid
,
2878 cmd
->parameters
.session_rotation
.gid
,
2879 cmd
->parameters
.session_rotation
.trace_archive_chunk_id
,
2880 cmd
->parameters
.session_rotation
.location
,
2883 case NOTIFICATION_COMMAND_TYPE_ADD_APPLICATION
:
2884 ret
= handle_notification_thread_command_add_application(
2887 cmd
->parameters
.application
.read_side_trigger_event_application_pipe
,
2890 case NOTIFICATION_COMMAND_TYPE_REMOVE_APPLICATION
:
2891 ret
= handle_notification_thread_command_remove_application(
2894 cmd
->parameters
.application
.read_side_trigger_event_application_pipe
,
2897 case NOTIFICATION_COMMAND_TYPE_GET_TOKENS
:
2899 struct lttng_triggers
*triggers
= NULL
;
2900 ret
= handle_notification_thread_command_get_tokens(
2901 handle
, state
, &triggers
, &cmd
->reply_code
);
2902 cmd
->reply
.get_tokens
.triggers
= triggers
;
2906 cmd
->reply_code
= LTTNG_OK
;
2910 case NOTIFICATION_COMMAND_TYPE_LIST_TRIGGERS
:
2912 struct lttng_triggers
*triggers
= NULL
;
2913 ret
= handle_notification_thread_command_list_triggers(
2916 cmd
->parameters
.list_triggers
.uid
,
2917 cmd
->parameters
.list_triggers
.gid
,
2920 cmd
->reply
.list_triggers
.triggers
= triggers
;
2924 case NOTIFICATION_COMMAND_TYPE_QUIT
:
2925 DBG("[notification-thread] Received quit command");
2926 cmd
->reply_code
= LTTNG_OK
;
2930 ERR("[notification-thread] Unknown internal command received");
2938 cds_list_del(&cmd
->cmd_list_node
);
2939 if (cmd
->is_async
) {
2943 lttng_waiter_wake_up(&cmd
->reply_waiter
);
2945 pthread_mutex_unlock(&handle
->cmd_queue
.lock
);
2948 /* Wake-up and return a fatal error to the calling thread. */
2949 lttng_waiter_wake_up(&cmd
->reply_waiter
);
2950 pthread_mutex_unlock(&handle
->cmd_queue
.lock
);
2951 cmd
->reply_code
= LTTNG_ERR_FATAL
;
2953 /* Indicate a fatal error to the caller. */
/*
 * Add O_NONBLOCK to the file status flags of `socket`.
 *
 * Returns the result of the failing fcntl() call (-1) on error, or the result
 * of the F_SETFL call on success.
 */
static
int socket_set_non_blocking(int socket)
{
	int current_flags, status;

	/* Set the pipe as non-blocking. */
	current_flags = fcntl(socket, F_GETFL, 0);
	if (current_flags == -1) {
		PERROR("fcntl get socket flags");
		return current_flags;
	}

	status = fcntl(socket, F_SETFL, current_flags | O_NONBLOCK);
	if (status == -1) {
		PERROR("fcntl set O_NONBLOCK socket flag");
		return status;
	}

	DBG("Client socket (fd = %i) set as non-blocking", socket);
	return status;
}
2980 /* Client lock must be acquired by caller. */
2982 int client_reset_inbound_state(struct notification_client
*client
)
2986 ASSERT_LOCKED(client
->lock
);
2988 ret
= lttng_dynamic_buffer_set_size(
2989 &client
->communication
.inbound
.buffer
, 0);
2992 client
->communication
.inbound
.bytes_to_receive
=
2993 sizeof(struct lttng_notification_channel_message
);
2994 client
->communication
.inbound
.msg_type
=
2995 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN
;
2996 LTTNG_SOCK_SET_UID_CRED(&client
->communication
.inbound
.creds
, -1);
2997 LTTNG_SOCK_SET_GID_CRED(&client
->communication
.inbound
.creds
, -1);
2998 ret
= lttng_dynamic_buffer_set_size(
2999 &client
->communication
.inbound
.buffer
,
3000 client
->communication
.inbound
.bytes_to_receive
);
3004 int handle_notification_thread_client_connect(
3005 struct notification_thread_state
*state
)
3008 struct notification_client
*client
;
3010 DBG("[notification-thread] Handling new notification channel client connection");
3012 client
= zmalloc(sizeof(*client
));
3018 pthread_mutex_init(&client
->lock
, NULL
);
3019 client
->id
= state
->next_notification_client_id
++;
3020 CDS_INIT_LIST_HEAD(&client
->condition_list
);
3021 lttng_dynamic_buffer_init(&client
->communication
.inbound
.buffer
);
3022 lttng_dynamic_buffer_init(&client
->communication
.outbound
.buffer
);
3023 client
->communication
.inbound
.expect_creds
= true;
3025 pthread_mutex_lock(&client
->lock
);
3026 ret
= client_reset_inbound_state(client
);
3027 pthread_mutex_unlock(&client
->lock
);
3029 ERR("[notification-thread] Failed to reset client communication's inbound state");
3034 ret
= lttcomm_accept_unix_sock(state
->notification_channel_socket
);
3036 ERR("[notification-thread] Failed to accept new notification channel client connection");
3041 client
->socket
= ret
;
3043 ret
= socket_set_non_blocking(client
->socket
);
3045 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
3049 ret
= lttcomm_setsockopt_creds_unix_sock(client
->socket
);
3051 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
3056 ret
= lttng_poll_add(&state
->events
, client
->socket
,
3057 LPOLLIN
| LPOLLERR
|
3058 LPOLLHUP
| LPOLLRDHUP
);
3060 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
3064 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
3068 cds_lfht_add(state
->client_socket_ht
,
3069 hash_client_socket(client
->socket
),
3070 &client
->client_socket_ht_node
);
3071 cds_lfht_add(state
->client_id_ht
,
3072 hash_client_id(client
->id
),
3073 &client
->client_id_ht_node
);
3078 notification_client_destroy(client
, state
);
3082 /* RCU read-lock must be held by the caller. */
3084 int notification_thread_client_disconnect(
3085 struct notification_client
*client
,
3086 struct notification_thread_state
*state
)
3089 struct lttng_condition_list_element
*condition_list_element
, *tmp
;
3091 /* Acquire the client lock to disable its communication atomically. */
3092 pthread_mutex_lock(&client
->lock
);
3093 client
->communication
.active
= false;
3094 ret
= lttng_poll_del(&state
->events
, client
->socket
);
3096 ERR("[notification-thread] Failed to remove client socket %d from poll set",
3099 pthread_mutex_unlock(&client
->lock
);
3101 cds_lfht_del(state
->client_socket_ht
, &client
->client_socket_ht_node
);
3102 cds_lfht_del(state
->client_id_ht
, &client
->client_id_ht_node
);
3104 /* Release all conditions to which the client was subscribed. */
3105 cds_list_for_each_entry_safe(condition_list_element
, tmp
,
3106 &client
->condition_list
, node
) {
3107 (void) notification_thread_client_unsubscribe(client
,
3108 condition_list_element
->condition
, state
, NULL
);
3112 * Client no longer accessible to other threads (through the
3115 notification_client_destroy(client
, state
);
/*
 * Disconnect the client that owns `client_socket`.
 *
 * Returns -1 when no client is known for that socket (internal state
 * corruption), otherwise the result of the disconnection.
 */
int handle_notification_thread_client_disconnect(
		int client_socket, struct notification_thread_state *state)
{
	int ret;
	struct notification_client *client;

	rcu_read_lock();
	DBG("[notification-thread] Closing client connection (socket fd = %i)",
			client_socket);
	client = get_client_from_socket(client_socket, state);
	if (client) {
		ret = notification_thread_client_disconnect(client, state);
	} else {
		/* Internal state corruption, fatal error. */
		ERR("[notification-thread] Unable to find client (socket fd = %i)",
				client_socket);
		ret = -1;
	}
	rcu_read_unlock();

	return ret;
}
3143 int handle_notification_thread_client_disconnect_all(
3144 struct notification_thread_state
*state
)
3146 struct cds_lfht_iter iter
;
3147 struct notification_client
*client
;
3148 bool error_encoutered
= false;
3151 DBG("[notification-thread] Closing all client connections");
3152 cds_lfht_for_each_entry(state
->client_socket_ht
, &iter
, client
,
3153 client_socket_ht_node
) {
3156 ret
= notification_thread_client_disconnect(
3159 error_encoutered
= true;
3163 return error_encoutered
? 1 : 0;
3166 int handle_notification_thread_trigger_unregister_all(
3167 struct notification_thread_state
*state
)
3169 bool error_occurred
= false;
3170 struct cds_lfht_iter iter
;
3171 struct lttng_trigger_ht_element
*trigger_ht_element
;
3174 cds_lfht_for_each_entry(state
->triggers_ht
, &iter
, trigger_ht_element
,
3176 int ret
= handle_notification_thread_command_unregister_trigger(
3177 state
, trigger_ht_element
->trigger
, NULL
);
3179 error_occurred
= true;
3183 return error_occurred
? -1 : 0;
3187 int client_handle_transmission_status(
3188 struct notification_client
*client
,
3189 enum client_transmission_status transmission_status
,
3190 struct notification_thread_state
*state
)
3194 ASSERT_LOCKED(client
->lock
);
3196 switch (transmission_status
) {
3197 case CLIENT_TRANSMISSION_STATUS_COMPLETE
:
3198 ret
= lttng_poll_mod(&state
->events
, client
->socket
,
3199 CLIENT_POLL_MASK_IN
);
3204 client
->communication
.outbound
.queued_command_reply
= false;
3205 client
->communication
.outbound
.dropped_notification
= false;
3207 case CLIENT_TRANSMISSION_STATUS_QUEUED
:
3209 * We want to be notified whenever there is buffer space
3210 * available to send the rest of the payload.
3212 ret
= lttng_poll_mod(&state
->events
, client
->socket
,
3213 CLIENT_POLL_MASK_IN_OUT
);
3218 case CLIENT_TRANSMISSION_STATUS_FAIL
:
3219 ret
= notification_thread_client_disconnect(client
, state
);
3224 case CLIENT_TRANSMISSION_STATUS_ERROR
:
3234 /* Client lock must be acquired by caller. */
3236 enum client_transmission_status
client_flush_outgoing_queue(
3237 struct notification_client
*client
)
3240 size_t to_send_count
;
3241 enum client_transmission_status status
;
3243 ASSERT_LOCKED(client
->lock
);
3245 assert(client
->communication
.outbound
.buffer
.size
!= 0);
3246 to_send_count
= client
->communication
.outbound
.buffer
.size
;
3247 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
3250 ret
= lttcomm_send_unix_sock_non_block(client
->socket
,
3251 client
->communication
.outbound
.buffer
.data
,
3253 if ((ret
< 0 && (errno
== EAGAIN
|| errno
== EWOULDBLOCK
)) ||
3254 (ret
> 0 && ret
< to_send_count
)) {
3255 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
3257 to_send_count
-= max(ret
, 0);
3259 memcpy(client
->communication
.outbound
.buffer
.data
,
3260 client
->communication
.outbound
.buffer
.data
+
3261 client
->communication
.outbound
.buffer
.size
- to_send_count
,
3263 ret
= lttng_dynamic_buffer_set_size(
3264 &client
->communication
.outbound
.buffer
,
3269 status
= CLIENT_TRANSMISSION_STATUS_QUEUED
;
3270 } else if (ret
< 0) {
3271 /* Generic error, disconnect the client. */
3272 ERR("[notification-thread] Failed to flush outgoing queue, disconnecting client (socket fd = %i)",
3274 status
= CLIENT_TRANSMISSION_STATUS_FAIL
;
3276 /* No error and flushed the queue completely. */
3277 ret
= lttng_dynamic_buffer_set_size(
3278 &client
->communication
.outbound
.buffer
, 0);
3282 status
= CLIENT_TRANSMISSION_STATUS_COMPLETE
;
3287 return CLIENT_TRANSMISSION_STATUS_ERROR
;
3290 /* Client lock must be acquired by caller. */
3292 int client_send_command_reply(struct notification_client
*client
,
3293 struct notification_thread_state
*state
,
3294 enum lttng_notification_channel_status status
)
3297 struct lttng_notification_channel_command_reply reply
= {
3298 .status
= (int8_t) status
,
3300 struct lttng_notification_channel_message msg
= {
3301 .type
= (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY
,
3302 .size
= sizeof(reply
),
3304 char buffer
[sizeof(msg
) + sizeof(reply
)];
3305 enum client_transmission_status transmission_status
;
3307 ASSERT_LOCKED(client
->lock
);
3309 if (client
->communication
.outbound
.queued_command_reply
) {
3310 /* Protocol error. */
3314 memcpy(buffer
, &msg
, sizeof(msg
));
3315 memcpy(buffer
+ sizeof(msg
), &reply
, sizeof(reply
));
3316 DBG("[notification-thread] Send command reply (%i)", (int) status
);
3318 /* Enqueue buffer to outgoing queue and flush it. */
3319 ret
= lttng_dynamic_buffer_append(
3320 &client
->communication
.outbound
.buffer
,
3321 buffer
, sizeof(buffer
));
3326 transmission_status
= client_flush_outgoing_queue(client
);
3327 ret
= client_handle_transmission_status(
3328 client
, transmission_status
, state
);
3333 if (client
->communication
.outbound
.buffer
.size
!= 0) {
3334 /* Queue could not be emptied. */
3335 client
->communication
.outbound
.queued_command_reply
= true;
3344 int client_handle_message_unknown(struct notification_client
*client
,
3345 struct notification_thread_state
*state
)
3349 pthread_mutex_lock(&client
->lock
);
3352 * Receiving message header. The function will be called again
3353 * once the rest of the message as been received and can be
3356 const struct lttng_notification_channel_message
*msg
;
3358 assert(sizeof(*msg
) == client
->communication
.inbound
.buffer
.size
);
3359 msg
= (const struct lttng_notification_channel_message
*)
3360 client
->communication
.inbound
.buffer
.data
;
3362 if (msg
->size
== 0 ||
3363 msg
->size
> DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE
) {
3364 ERR("[notification-thread] Invalid notification channel message: length = %u",
3370 switch (msg
->type
) {
3371 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE
:
3372 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE
:
3373 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
:
3377 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
3381 client
->communication
.inbound
.bytes_to_receive
= msg
->size
;
3382 client
->communication
.inbound
.msg_type
=
3383 (enum lttng_notification_channel_message_type
) msg
->type
;
3384 ret
= lttng_dynamic_buffer_set_size(
3385 &client
->communication
.inbound
.buffer
, msg
->size
);
3387 pthread_mutex_unlock(&client
->lock
);
3392 int client_handle_message_handshake(struct notification_client
*client
,
3393 struct notification_thread_state
*state
)
3396 struct lttng_notification_channel_command_handshake
*handshake_client
;
3397 const struct lttng_notification_channel_command_handshake handshake_reply
= {
3398 .major
= LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR
,
3399 .minor
= LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR
,
3401 const struct lttng_notification_channel_message msg_header
= {
3402 .type
= LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
,
3403 .size
= sizeof(handshake_reply
),
3405 enum lttng_notification_channel_status status
=
3406 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
3407 char send_buffer
[sizeof(msg_header
) + sizeof(handshake_reply
)];
3408 enum client_transmission_status transmission_status
;
3410 pthread_mutex_lock(&client
->lock
);
3412 memcpy(send_buffer
, &msg_header
, sizeof(msg_header
));
3413 memcpy(send_buffer
+ sizeof(msg_header
), &handshake_reply
,
3414 sizeof(handshake_reply
));
3417 (struct lttng_notification_channel_command_handshake
*)
3418 client
->communication
.inbound
.buffer
3420 client
->major
= handshake_client
->major
;
3421 client
->minor
= handshake_client
->minor
;
3422 if (!client
->communication
.inbound
.creds_received
) {
3423 ERR("[notification-thread] No credentials received from client");
3428 client
->uid
= LTTNG_SOCK_GET_UID_CRED(
3429 &client
->communication
.inbound
.creds
);
3430 client
->gid
= LTTNG_SOCK_GET_GID_CRED(
3431 &client
->communication
.inbound
.creds
);
3432 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
3433 client
->uid
, client
->gid
, (int) client
->major
,
3434 (int) client
->minor
);
3436 if (handshake_client
->major
!=
3437 LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR
) {
3438 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION
;
3441 ret
= lttng_dynamic_buffer_append(
3442 &client
->communication
.outbound
.buffer
, send_buffer
,
3443 sizeof(send_buffer
));
3445 ERR("[notification-thread] Failed to send protocol version to notification channel client");
3449 transmission_status
= client_flush_outgoing_queue(client
);
3450 ret
= client_handle_transmission_status(
3451 client
, transmission_status
, state
);
3456 ret
= client_send_command_reply(client
, state
, status
);
3458 ERR("[notification-thread] Failed to send reply to notification channel client");
3462 /* Set reception state to receive the next message header. */
3463 ret
= client_reset_inbound_state(client
);
3465 ERR("[notification-thread] Failed to reset client communication's inbound state");
3468 client
->validated
= true;
3469 client
->communication
.active
= true;
3472 pthread_mutex_unlock(&client
->lock
);
3477 int client_handle_message_subscription(
3478 struct notification_client
*client
,
3479 enum lttng_notification_channel_message_type msg_type
,
3480 struct notification_thread_state
*state
)
3483 struct lttng_condition
*condition
;
3484 enum lttng_notification_channel_status status
=
3485 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
3486 const struct lttng_buffer_view condition_view
=
3487 lttng_buffer_view_from_dynamic_buffer(
3488 &client
->communication
.inbound
.buffer
,
3490 size_t expected_condition_size
;
3492 pthread_mutex_lock(&client
->lock
);
3493 expected_condition_size
= client
->communication
.inbound
.buffer
.size
;
3494 pthread_mutex_unlock(&client
->lock
);
3496 ret
= lttng_condition_create_from_buffer(&condition_view
, &condition
);
3497 if (ret
!= expected_condition_size
) {
3498 ERR("[notification-thread] Malformed condition received from client");
3502 if (msg_type
== LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE
) {
3503 ret
= notification_thread_client_subscribe(
3504 client
, condition
, state
, &status
);
3506 ret
= notification_thread_client_unsubscribe(
3507 client
, condition
, state
, &status
);
3513 pthread_mutex_lock(&client
->lock
);
3514 ret
= client_send_command_reply(client
, state
, status
);
3516 ERR("[notification-thread] Failed to send reply to notification channel client");
3520 /* Set reception state to receive the next message header. */
3521 ret
= client_reset_inbound_state(client
);
3523 ERR("[notification-thread] Failed to reset client communication's inbound state");
3528 pthread_mutex_unlock(&client
->lock
);
3534 int client_dispatch_message(struct notification_client
*client
,
3535 struct notification_thread_state
*state
)
3539 if (client
->communication
.inbound
.msg_type
!=
3540 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
&&
3541 client
->communication
.inbound
.msg_type
!=
3542 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN
&&
3543 !client
->validated
) {
3544 WARN("[notification-thread] client attempted a command before handshake");
3549 switch (client
->communication
.inbound
.msg_type
) {
3550 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN
:
3552 ret
= client_handle_message_unknown(client
, state
);
3555 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
:
3557 ret
= client_handle_message_handshake(client
, state
);
3560 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE
:
3561 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE
:
3563 ret
= client_handle_message_subscription(client
,
3564 client
->communication
.inbound
.msg_type
, state
);
3574 /* Incoming data from client. */
3575 int handle_notification_thread_client_in(
3576 struct notification_thread_state
*state
, int socket
)
3579 struct notification_client
*client
;
3582 bool message_is_complete
= false;
3584 client
= get_client_from_socket(socket
, state
);
3586 /* Internal error, abort. */
3591 pthread_mutex_lock(&client
->lock
);
3592 offset
= client
->communication
.inbound
.buffer
.size
-
3593 client
->communication
.inbound
.bytes_to_receive
;
3594 if (client
->communication
.inbound
.expect_creds
) {
3595 recv_ret
= lttcomm_recv_creds_unix_sock(socket
,
3596 client
->communication
.inbound
.buffer
.data
+ offset
,
3597 client
->communication
.inbound
.bytes_to_receive
,
3598 &client
->communication
.inbound
.creds
);
3600 client
->communication
.inbound
.expect_creds
= false;
3601 client
->communication
.inbound
.creds_received
= true;
3604 recv_ret
= lttcomm_recv_unix_sock_non_block(socket
,
3605 client
->communication
.inbound
.buffer
.data
+ offset
,
3606 client
->communication
.inbound
.bytes_to_receive
);
3608 if (recv_ret
>= 0) {
3609 client
->communication
.inbound
.bytes_to_receive
-= recv_ret
;
3610 message_is_complete
= client
->communication
.inbound
3611 .bytes_to_receive
== 0;
3613 pthread_mutex_unlock(&client
->lock
);
3615 goto error_disconnect_client
;
3618 if (message_is_complete
) {
3619 ret
= client_dispatch_message(client
, state
);
3622 * Only returns an error if this client must be
3625 goto error_disconnect_client
;
3630 error_disconnect_client
:
3631 ret
= notification_thread_client_disconnect(client
, state
);
3635 /* Client ready to receive outgoing data. */
3636 int handle_notification_thread_client_out(
3637 struct notification_thread_state
*state
, int socket
)
3640 struct notification_client
*client
;
3641 enum client_transmission_status transmission_status
;
3643 client
= get_client_from_socket(socket
, state
);
3645 /* Internal error, abort. */
3650 pthread_mutex_lock(&client
->lock
);
3651 transmission_status
= client_flush_outgoing_queue(client
);
3652 ret
= client_handle_transmission_status(
3653 client
, transmission_status
, state
);
3654 pthread_mutex_unlock(&client
->lock
);
3663 bool evaluate_buffer_usage_condition(const struct lttng_condition
*condition
,
3664 const struct channel_state_sample
*sample
,
3665 uint64_t buffer_capacity
)
3667 bool result
= false;
3669 enum lttng_condition_type condition_type
;
3670 const struct lttng_condition_buffer_usage
*use_condition
= container_of(
3671 condition
, struct lttng_condition_buffer_usage
,
3674 if (use_condition
->threshold_bytes
.set
) {
3675 threshold
= use_condition
->threshold_bytes
.value
;
3678 * Threshold was expressed as a ratio.
3680 * TODO the threshold (in bytes) of conditions expressed
3681 * as a ratio of total buffer size could be cached to
3682 * forego this double-multiplication or it could be performed
3683 * as fixed-point math.
3685 * Note that caching should accomodate the case where the
3686 * condition applies to multiple channels (i.e. don't assume
3687 * that all channels matching my_chann* have the same size...)
3689 threshold
= (uint64_t) (use_condition
->threshold_ratio
.value
*
3690 (double) buffer_capacity
);
3693 condition_type
= lttng_condition_get_type(condition
);
3694 if (condition_type
== LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
) {
3695 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64
", highest usage = %" PRIu64
,
3696 threshold
, sample
->highest_usage
);
3699 * The low condition should only be triggered once _all_ of the
3700 * streams in a channel have gone below the "low" threshold.
3702 if (sample
->highest_usage
<= threshold
) {
3706 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64
", highest usage = %" PRIu64
,
3707 threshold
, sample
->highest_usage
);
3710 * For high buffer usage scenarios, we want to trigger whenever
3711 * _any_ of the streams has reached the "high" threshold.
3713 if (sample
->highest_usage
>= threshold
) {
3722 bool evaluate_session_consumed_size_condition(
3723 const struct lttng_condition
*condition
,
3724 uint64_t session_consumed_size
)
3727 const struct lttng_condition_session_consumed_size
*size_condition
=
3728 container_of(condition
,
3729 struct lttng_condition_session_consumed_size
,
3732 threshold
= size_condition
->consumed_threshold_bytes
.value
;
3733 DBG("[notification-thread] Session consumed size condition being evaluated: threshold = %" PRIu64
", current size = %" PRIu64
,
3734 threshold
, session_consumed_size
);
3735 return session_consumed_size
>= threshold
;
3739 int evaluate_buffer_condition(const struct lttng_condition
*condition
,
3740 struct lttng_evaluation
**evaluation
,
3741 const struct notification_thread_state
*state
,
3742 const struct channel_state_sample
*previous_sample
,
3743 const struct channel_state_sample
*latest_sample
,
3744 uint64_t previous_session_consumed_total
,
3745 uint64_t latest_session_consumed_total
,
3746 struct channel_info
*channel_info
)
3749 enum lttng_condition_type condition_type
;
3750 const bool previous_sample_available
= !!previous_sample
;
3751 bool previous_sample_result
= false;
3752 bool latest_sample_result
;
3754 condition_type
= lttng_condition_get_type(condition
);
3756 switch (condition_type
) {
3757 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
3758 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
3759 if (caa_likely(previous_sample_available
)) {
3760 previous_sample_result
=
3761 evaluate_buffer_usage_condition(condition
,
3762 previous_sample
, channel_info
->capacity
);
3764 latest_sample_result
= evaluate_buffer_usage_condition(
3765 condition
, latest_sample
,
3766 channel_info
->capacity
);
3768 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
:
3769 if (caa_likely(previous_sample_available
)) {
3770 previous_sample_result
=
3771 evaluate_session_consumed_size_condition(
3773 previous_session_consumed_total
);
3775 latest_sample_result
=
3776 evaluate_session_consumed_size_condition(
3778 latest_session_consumed_total
);
3781 /* Unknown condition type; internal error. */
3785 if (!latest_sample_result
||
3786 (previous_sample_result
== latest_sample_result
)) {
3788 * Only trigger on a condition evaluation transition.
3790 * NOTE: This edge-triggered logic may not be appropriate for
3791 * future condition types.
3796 if (!evaluation
|| !latest_sample_result
) {
3800 switch (condition_type
) {
3801 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
3802 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
3803 *evaluation
= lttng_evaluation_buffer_usage_create(
3805 latest_sample
->highest_usage
,
3806 channel_info
->capacity
);
3808 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
:
3809 *evaluation
= lttng_evaluation_session_consumed_size_create(
3810 latest_session_consumed_total
);
3825 int client_notification_overflow(struct notification_client
*client
)
3828 const struct lttng_notification_channel_message msg
= {
3829 .type
= (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED
,
3832 ASSERT_LOCKED(client
->lock
);
3834 DBG("Dropping notification addressed to client (socket fd = %i)",
3836 if (client
->communication
.outbound
.dropped_notification
) {
3838 * The client already has a "notification dropped" message
3839 * in its outgoing queue. Nothing to do since all
3840 * of those messages are coalesced.
3845 client
->communication
.outbound
.dropped_notification
= true;
3846 ret
= lttng_dynamic_buffer_append(
3847 &client
->communication
.outbound
.buffer
, &msg
,
3850 PERROR("Failed to enqueue \"dropped notification\" message in client's (socket fd = %i) outgoing queue",
3857 static int client_handle_transmission_status_wrapper(
3858 struct notification_client
*client
,
3859 enum client_transmission_status status
,
3862 return client_handle_transmission_status(client
, status
,
3863 (struct notification_thread_state
*) user_data
);
3867 int send_evaluation_to_clients(const struct lttng_trigger
*trigger
,
3868 const struct lttng_evaluation
*evaluation
,
3869 struct notification_client_list
* client_list
,
3870 struct notification_thread_state
*state
,
3871 uid_t object_uid
, gid_t object_gid
)
3873 return notification_client_list_send_evaluation(client_list
,
3874 lttng_trigger_get_const_condition(trigger
), evaluation
,
3875 lttng_trigger_get_credentials(trigger
),
3876 &(struct lttng_credentials
){
3877 .uid
= object_uid
, .gid
= object_gid
},
3878 client_handle_transmission_status_wrapper
, state
);
3882 int notification_client_list_send_evaluation(
3883 struct notification_client_list
*client_list
,
3884 const struct lttng_condition
*condition
,
3885 const struct lttng_evaluation
*evaluation
,
3886 const struct lttng_credentials
*trigger_creds
,
3887 const struct lttng_credentials
*source_object_creds
,
3888 report_client_transmission_result_cb client_report
,
3892 struct lttng_dynamic_buffer msg_buffer
;
3893 struct notification_client_list_element
*client_list_element
, *tmp
;
3894 const struct lttng_notification notification
= {
3895 .condition
= (struct lttng_condition
*) condition
,
3896 .evaluation
= (struct lttng_evaluation
*) evaluation
,
3898 struct lttng_notification_channel_message msg_header
= {
3899 .type
= (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION
,
3902 lttng_dynamic_buffer_init(&msg_buffer
);
3904 ret
= lttng_dynamic_buffer_append(&msg_buffer
, &msg_header
,
3905 sizeof(msg_header
));
3910 ret
= lttng_notification_serialize(¬ification
, &msg_buffer
);
3912 ERR("[notification-thread] Failed to serialize notification");
3917 /* Update payload size. */
3918 ((struct lttng_notification_channel_message
* ) msg_buffer
.data
)->size
=
3919 (uint32_t) (msg_buffer
.size
- sizeof(msg_header
));
3921 pthread_mutex_lock(&client_list
->lock
);
3922 cds_list_for_each_entry_safe(client_list_element
, tmp
,
3923 &client_list
->list
, node
) {
3924 enum client_transmission_status transmission_status
;
3925 struct notification_client
*client
=
3926 client_list_element
->client
;
3929 pthread_mutex_lock(&client
->lock
);
3930 if (source_object_creds
) {
3931 if (client
->uid
!= source_object_creds
->uid
&&
3932 client
->gid
!= source_object_creds
->gid
&&
3935 * Client is not allowed to monitor this
3938 DBG("[notification-thread] Skipping client at it does not have the object permission to receive notification for this trigger");
3943 /* TODO: what is the behavior for root client on non root
3944 * trigger? Since multiple triggers (different user) can have the same condition
3945 * but with different action group that can have each a notify.
3946 * Does the root client receive multiple notification for all
3947 * those triggers with the same condition or only notification
3948 * for triggers the root user configured?
3949 * For now we do the later. All users including the root user
3950 * can only receive notification from trigger it registered.
3952 if (client
->uid
!= trigger_creds
->uid
&& client
->gid
!= trigger_creds
->gid
) {
3953 DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this trigger");
3957 DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
3958 client
->socket
, msg_buffer
.size
);
3959 if (client
->communication
.outbound
.buffer
.size
) {
3961 * Outgoing data is already buffered for this client;
3962 * drop the notification and enqueue a "dropped
3963 * notification" message if this is the first dropped
3964 * notification since the socket spilled-over to the
3967 ret
= client_notification_overflow(client
);
3973 ret
= lttng_dynamic_buffer_append_buffer(
3974 &client
->communication
.outbound
.buffer
,
3980 transmission_status
= client_flush_outgoing_queue(client
);
3981 ret
= client_report(client
, transmission_status
, user_data
);
3986 pthread_mutex_unlock(&client
->lock
);
3988 goto end_unlock_list
;
3994 pthread_mutex_unlock(&client_list
->lock
);
3996 lttng_dynamic_buffer_reset(&msg_buffer
);
4000 int perform_event_action_notify(struct notification_thread_state
*state
,
4001 const struct lttng_trigger
*trigger
,
4002 const struct lttng_trigger_notification
*notification
,
4003 const struct lttng_action
*action
)
4006 struct notification_client_list
*client_list
;
4007 struct lttng_evaluation
*evaluation
= NULL
;
4008 const struct lttng_credentials
*creds
= lttng_trigger_get_credentials(trigger
);
4011 * Check if any client is subscribed to the result of this
4014 client_list
= get_client_list_from_condition(state
, trigger
->condition
);
4015 assert(client_list
);
4016 if (cds_list_empty(&client_list
->list
)) {
4021 evaluation
= lttng_evaluation_event_rule_create(trigger
->name
);
4023 ERR("Failed to create event rule hit evaluation");
4028 /* Dispatch evaluation result to all clients.
4029 * Note that here the passed credentials are the one from the trigger,
4030 * this is because there is no internal object binded to the trigger per
4031 * see and the credential validation is done at the registration level
4032 * for the event rule based trigger. For a channel the credential
4033 * validation can only be done on notify since the trigger can be
4034 * registered before the channel/session creation.
4036 ret
= send_evaluation_to_clients(trigger
,
4037 evaluation
, client_list
, state
,
4040 lttng_evaluation_destroy(evaluation
);
4041 if (caa_unlikely(ret
)) {
/*
 * Perform the action(s) of `trigger`.
 *
 * The type of `action` selects what is done:
 *   - a group action is walked and this function is invoked again for each
 *     of its children;
 *   - a notify action is handed to perform_event_action_notify().
 *
 * `state` and `notification` are only forwarded to those calls.
 */
4049 /* This can be called recursively; pass NULL for action on the first iteration. */
4050 int perform_event_action(struct notification_thread_state
*state
,
4051 const struct lttng_trigger
*trigger
,
4052 const struct lttng_trigger_notification
*notification
,
4053 const struct lttng_action
*action
)
4056 enum lttng_action_type action_type
;
/*
 * Use the trigger's own (top-level) action; presumably only done when the
 * caller passed a NULL `action` (see the comment above), TODO confirm.
 */
4061 action
= lttng_trigger_get_const_action(trigger
);
4064 action_type
= lttng_action_get_type_const(action
);
4065 DBG("Handling action %s for trigger id %s (%" PRIu64
")",
4066 lttng_action_type_string(action_type
), trigger
->name
,
4067 trigger
->key
.value
);
4069 switch (action_type
) {
4070 case LTTNG_ACTION_TYPE_GROUP
:
4072 /* Recurse into the group. */
4073 const struct lttng_action
*tmp
= NULL
;
4074 unsigned int count
= 0;
/*
 * The status of the count query is deliberately ignored: `count` was
 * initialized to 0 just above.
 */
4075 (void) lttng_action_group_get_count(action
, &count
);
/*
 * NOTE(review): `i` is a signed int compared against the unsigned `count`;
 * an unsigned index would avoid the mixed-sign comparison.
 */
4076 for (int i
= 0; i
< count
; i
++) {
4077 tmp
= lttng_action_group_get_at_index_const(action
, i
);
/* Handle the child action with the same trigger and notification. */
4079 ret
= perform_event_action(state
, trigger
, notification
, tmp
);
4086 case LTTNG_ACTION_TYPE_NOTIFY
:
4088 ret
= perform_event_action_notify(state
, trigger
, notification
, action
);
4098 int handle_notification_thread_event(struct notification_thread_state
*state
,
4100 enum lttng_domain_type domain
)
4103 struct lttng_ust_trigger_notification ust_notification
;
4104 uint64_t kernel_notification
;
4105 struct cds_lfht_node
*node
;
4106 struct cds_lfht_iter iter
;
4107 struct notification_trigger_tokens_ht_element
*element
;
4108 struct lttng_trigger_notification notification
;
4109 void *reception_buffer
;
4110 size_t reception_size
;
4111 enum action_executor_status executor_status
;
4112 struct notification_client_list
*client_list
= NULL
;
4114 notification
.type
= domain
;
4117 case LTTNG_DOMAIN_UST
:
4118 reception_buffer
= (void *) &ust_notification
;
4119 reception_size
= sizeof(ust_notification
);
4120 notification
.u
.ust
= &ust_notification
;
4122 case LTTNG_DOMAIN_KERNEL
:
4123 reception_buffer
= (void *) &kernel_notification
;
4124 reception_size
= sizeof(kernel_notification
);
4125 notification
.u
.kernel
= &kernel_notification
;
4132 * The monitoring pipe only holds messages smaller than PIPE_BUF,
4133 * ensuring that read/write of sampling messages are atomic.
4135 /* TODO: should we read as much as we can ? EWOULDBLOCK? */
4137 ret
= lttng_read(pipe
, reception_buffer
, reception_size
);
4138 if (ret
!= reception_size
) {
4139 ERR("[notification-thread] Failed to read from event source pipe (fd = %i)",
4141 /* TODO: Should this error out completly.
4142 * This can happen when an app is killed as of today
4143 * ret = -1 cause the whole thread to die and fuck up
4150 case LTTNG_DOMAIN_UST
:
4151 notification
.id
= ust_notification
.id
;
4153 case LTTNG_DOMAIN_KERNEL
:
4154 notification
.id
= kernel_notification
;
4160 /* Find triggers associated with this token. */
4162 cds_lfht_lookup(state
->trigger_tokens_ht
,
4163 hash_key_u64(¬ification
.id
, lttng_ht_seed
), match_trigger_token
,
4164 ¬ification
.id
, &iter
);
4165 node
= cds_lfht_iter_get_node(&iter
);
4166 if (caa_unlikely(!node
)) {
4167 /* TODO: is this an error? This might happen if the receive side
4168 * is slow to process event from source and that the trigger was
4169 * removed but the app still kicking. This yield another
4170 * question on the trigger lifetime and when we can remove a
4171 * trigger. How to guarantee that all event with the token idea
4172 * have be processed? Do we want to provide this guarantee?
4174 * Update: I have encountered this when using a trigger on
4175 * sched_switch and then removing it. The frequency is quite
4176 * high hence we en up exactly in the mentionned scenario.
4177 * AFAIK this might be the best way to handle this.
4182 element
= caa_container_of(node
,
4183 struct notification_trigger_tokens_ht_element
,
4186 if (!lttng_trigger_is_ready_to_fire(element
->trigger
)) {
4191 client_list
= get_client_list_from_condition(state
,
4192 lttng_trigger_get_const_condition(element
->trigger
));
4193 executor_status
= action_executor_enqueue(
4194 state
->executor
, element
->trigger
, client_list
);
4195 switch (executor_status
) {
4196 case ACTION_EXECUTOR_STATUS_OK
:
4199 case ACTION_EXECUTOR_STATUS_OVERFLOW
:
4201 struct notification_client_list_element
*client_list_element
,
4205 * Not a fatal error; this is expected and simply means the
4206 * executor has too much work queued already.
4214 /* Warn clients that a notification (or more) was dropped. */
4215 pthread_mutex_lock(&client_list
->lock
);
4216 cds_list_for_each_entry_safe(client_list_element
, tmp
,
4217 &client_list
->list
, node
) {
4218 enum client_transmission_status transmission_status
;
4219 struct notification_client
*client
=
4220 client_list_element
->client
;
4222 pthread_mutex_lock(&client
->lock
);
4223 ret
= client_notification_overflow(client
);
4229 transmission_status
=
4230 client_flush_outgoing_queue(client
);
4231 ret
= client_handle_transmission_status(
4232 client
, transmission_status
, state
);
4238 pthread_mutex_unlock(&client
->lock
);
4243 pthread_mutex_lock(&client_list
->lock
);
4246 case ACTION_EXECUTOR_STATUS_ERROR
:
4247 /* Fatal error, shut down everything. */
4248 ERR("Fatal error encoutered while enqueuing action");
4252 /* Unhandled error. */
4257 notification_client_list_put(client_list
);
4263 int handle_notification_thread_channel_sample(
4264 struct notification_thread_state
*state
, int pipe
,
4265 enum lttng_domain_type domain
)
4268 struct lttcomm_consumer_channel_monitor_msg sample_msg
;
4269 struct channel_info
*channel_info
;
4270 struct cds_lfht_node
*node
;
4271 struct cds_lfht_iter iter
;
4272 struct lttng_channel_trigger_list
*trigger_list
;
4273 struct lttng_trigger_list_element
*trigger_list_element
;
4274 bool previous_sample_available
= false;
4275 struct channel_state_sample previous_sample
, latest_sample
;
4276 uint64_t previous_session_consumed_total
, latest_session_consumed_total
;
4279 * The monitoring pipe only holds messages smaller than PIPE_BUF,
4280 * ensuring that read/write of sampling messages are atomic.
4282 ret
= lttng_read(pipe
, &sample_msg
, sizeof(sample_msg
));
4283 if (ret
!= sizeof(sample_msg
)) {
4284 ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
4291 latest_sample
.key
.key
= sample_msg
.key
;
4292 latest_sample
.key
.domain
= domain
;
4293 latest_sample
.highest_usage
= sample_msg
.highest
;
4294 latest_sample
.lowest_usage
= sample_msg
.lowest
;
4295 latest_sample
.channel_total_consumed
= sample_msg
.total_consumed
;
4299 /* Retrieve the channel's information. */
4300 cds_lfht_lookup(state
->channels_ht
,
4301 hash_channel_key(&latest_sample
.key
),
4305 node
= cds_lfht_iter_get_node(&iter
);
4306 if (caa_unlikely(!node
)) {
4308 * Not an error since the consumer can push a sample to the pipe
4309 * and the rest of the session daemon could notify us of the
4310 * channel's destruction before we get a chance to process that
4313 DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64
" in %s domain",
4314 latest_sample
.key
.key
,
4315 domain
== LTTNG_DOMAIN_KERNEL
? "kernel" :
4319 channel_info
= caa_container_of(node
, struct channel_info
,
4321 DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64
") in session %s (highest usage = %" PRIu64
", lowest usage = %" PRIu64
", total consumed = %" PRIu64
")",
4323 latest_sample
.key
.key
,
4324 channel_info
->session_info
->name
,
4325 latest_sample
.highest_usage
,
4326 latest_sample
.lowest_usage
,
4327 latest_sample
.channel_total_consumed
);
4329 previous_session_consumed_total
=
4330 channel_info
->session_info
->consumed_data_size
;
4332 /* Retrieve the channel's last sample, if it exists, and update it. */
4333 cds_lfht_lookup(state
->channel_state_ht
,
4334 hash_channel_key(&latest_sample
.key
),
4335 match_channel_state_sample
,
4338 node
= cds_lfht_iter_get_node(&iter
);
4339 if (caa_likely(node
)) {
4340 struct channel_state_sample
*stored_sample
;
4342 /* Update the sample stored. */
4343 stored_sample
= caa_container_of(node
,
4344 struct channel_state_sample
,
4345 channel_state_ht_node
);
4347 memcpy(&previous_sample
, stored_sample
,
4348 sizeof(previous_sample
));
4349 stored_sample
->highest_usage
= latest_sample
.highest_usage
;
4350 stored_sample
->lowest_usage
= latest_sample
.lowest_usage
;
4351 stored_sample
->channel_total_consumed
= latest_sample
.channel_total_consumed
;
4352 previous_sample_available
= true;
4354 latest_session_consumed_total
=
4355 previous_session_consumed_total
+
4356 (latest_sample
.channel_total_consumed
- previous_sample
.channel_total_consumed
);
4359 * This is the channel's first sample, allocate space for and
4360 * store the new sample.
4362 struct channel_state_sample
*stored_sample
;
4364 stored_sample
= zmalloc(sizeof(*stored_sample
));
4365 if (!stored_sample
) {
4370 memcpy(stored_sample
, &latest_sample
, sizeof(*stored_sample
));
4371 cds_lfht_node_init(&stored_sample
->channel_state_ht_node
);
4372 cds_lfht_add(state
->channel_state_ht
,
4373 hash_channel_key(&stored_sample
->key
),
4374 &stored_sample
->channel_state_ht_node
);
4376 latest_session_consumed_total
=
4377 previous_session_consumed_total
+
4378 latest_sample
.channel_total_consumed
;
4381 channel_info
->session_info
->consumed_data_size
=
4382 latest_session_consumed_total
;
4384 /* Find triggers associated with this channel. */
4385 cds_lfht_lookup(state
->channel_triggers_ht
,
4386 hash_channel_key(&latest_sample
.key
),
4387 match_channel_trigger_list
,
4390 node
= cds_lfht_iter_get_node(&iter
);
4391 if (caa_likely(!node
)) {
4395 trigger_list
= caa_container_of(node
, struct lttng_channel_trigger_list
,
4396 channel_triggers_ht_node
);
4397 cds_list_for_each_entry(trigger_list_element
, &trigger_list
->list
,
4399 const struct lttng_condition
*condition
;
4400 const struct lttng_action
*action
;
4401 struct lttng_trigger
*trigger
;
4402 struct notification_client_list
*client_list
= NULL
;
4403 struct lttng_evaluation
*evaluation
= NULL
;
4404 bool client_list_is_empty
;
4407 trigger
= trigger_list_element
->trigger
;
4408 condition
= lttng_trigger_get_const_condition(trigger
);
4410 action
= lttng_trigger_get_const_action(trigger
);
4412 if (!lttng_trigger_is_ready_to_fire(trigger
)) {
4416 /* Notify actions are the only type currently supported. */
4417 /* TODO support other type of action */
4418 assert(lttng_action_get_type_const(action
) ==
4419 LTTNG_ACTION_TYPE_NOTIFY
);
4422 * Check if any client is subscribed to the result of this
4425 client_list
= get_client_list_from_condition(state
, condition
);
4426 assert(client_list
);
4427 client_list_is_empty
= cds_list_empty(&client_list
->list
);
4428 if (client_list_is_empty
) {
4430 * No clients interested in the evaluation's result,
4436 ret
= evaluate_buffer_condition(condition
, &evaluation
, state
,
4437 previous_sample_available
? &previous_sample
: NULL
,
4439 previous_session_consumed_total
,
4440 latest_session_consumed_total
,
4442 if (caa_unlikely(ret
)) {
4446 if (caa_likely(!evaluation
)) {
4450 /* Dispatch evaluation result to all clients. */
4451 ret
= send_evaluation_to_clients(trigger_list_element
->trigger
,
4452 evaluation
, client_list
, state
,
4453 channel_info
->session_info
->uid
,
4454 channel_info
->session_info
->gid
);
4455 lttng_evaluation_destroy(evaluation
);
4457 notification_client_list_put(client_list
);
4458 if (caa_unlikely(ret
)) {