2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
4 * SPDX-License-Identifier: GPL-2.0-only
10 #include <urcu/rculfhash.h>
12 #include <common/defaults.h>
13 #include <common/error.h>
14 #include <common/futex.h>
15 #include <common/unix.h>
16 #include <common/dynamic-buffer.h>
17 #include <common/hashtable/utils.h>
18 #include <common/sessiond-comm/sessiond-comm.h>
19 #include <common/macros.h>
20 #include <lttng/condition/condition.h>
21 #include <lttng/action/action-internal.h>
22 #include <lttng/notification/notification-internal.h>
23 #include <lttng/condition/condition-internal.h>
24 #include <lttng/condition/buffer-usage-internal.h>
25 #include <lttng/condition/session-consumed-size-internal.h>
26 #include <lttng/condition/session-rotation-internal.h>
27 #include <lttng/notification/channel-internal.h>
35 #include "notification-thread.h"
36 #include "notification-thread-events.h"
37 #include "notification-thread-commands.h"
38 #include "lttng-sessiond.h"
/*
 * Poll event masks for client sockets: input events (including error and
 * hang-up conditions), and the same set with output readiness added.
 */
#define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
#define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
/*
 * Type of object to which a condition is bound, as reported by
 * get_condition_binding_object().
 */
enum lttng_object_type {
	LTTNG_OBJECT_TYPE_UNKNOWN,
	LTTNG_OBJECT_TYPE_NONE,
	LTTNG_OBJECT_TYPE_CHANNEL,
	LTTNG_OBJECT_TYPE_SESSION,
};
51 struct lttng_trigger_list_element
{
52 /* No ownership of the trigger object is assumed. */
53 const struct lttng_trigger
*trigger
;
54 struct cds_list_head node
;
57 struct lttng_channel_trigger_list
{
58 struct channel_key channel_key
;
59 /* List of struct lttng_trigger_list_element. */
60 struct cds_list_head list
;
61 /* Node in the channel_triggers_ht */
62 struct cds_lfht_node channel_triggers_ht_node
;
63 /* call_rcu delayed reclaim. */
64 struct rcu_head rcu_node
;
68 * List of triggers applying to a given session.
71 * - lttng_session_trigger_list_create()
72 * - lttng_session_trigger_list_build()
73 * - lttng_session_trigger_list_destroy()
74 * - lttng_session_trigger_list_add()
76 struct lttng_session_trigger_list
{
78 * Not owned by this; points to the session_info structure's
81 const char *session_name
;
82 /* List of struct lttng_trigger_list_element. */
83 struct cds_list_head list
;
84 /* Node in the session_triggers_ht */
85 struct cds_lfht_node session_triggers_ht_node
;
87 * Weak reference to the notification system's session triggers
90 * The session trigger list structure structure is owned by
91 * the session's session_info.
93 * The session_info is kept alive the the channel_infos holding a
94 * reference to it (reference counting). When those channels are
95 * destroyed (at runtime or on teardown), the reference they hold
96 * to the session_info are released. On destruction of session_info,
97 * session_info_destroy() will remove the list of triggers applying
98 * to this session from the notification system's state.
100 * This implies that the session_triggers_ht must be destroyed
101 * after the channels.
103 struct cds_lfht
*session_triggers_ht
;
104 /* Used for delayed RCU reclaim. */
105 struct rcu_head rcu_node
;
108 struct lttng_trigger_ht_element
{
109 struct lttng_trigger
*trigger
;
110 struct cds_lfht_node node
;
111 /* call_rcu delayed reclaim. */
112 struct rcu_head rcu_node
;
115 struct lttng_condition_list_element
{
116 struct lttng_condition
*condition
;
117 struct cds_list_head node
;
120 struct notification_client_list_element
{
121 struct notification_client
*client
;
122 struct cds_list_head node
;
126 * Thread safety of notification_client and notification_client_list.
128 * The notification thread (main thread) and the action executor
129 * interact through client lists. Hence, when the action executor
130 * thread looks-up the list of clients subscribed to a given
131 * condition, it will acquire a reference to the list and lock it
132 * while attempting to communicate with the various clients.
134 * It is not necessary to reference-count clients as they are guaranteed
135 * to be 'alive' if they are present in a list and that list is locked. Indeed,
136 * removing references to the client from those subscription lists is part of
137 * the work performed on destruction of a client.
139 * No provision for other access scenarios are taken into account;
140 * this is the bare minimum to make these accesses safe and the
141 * notification thread's state is _not_ "thread-safe" in any general
144 struct notification_client_list
{
145 pthread_mutex_t lock
;
147 const struct lttng_trigger
*trigger
;
148 struct cds_list_head list
;
149 /* Weak reference to container. */
150 struct cds_lfht
*notification_trigger_clients_ht
;
151 struct cds_lfht_node notification_trigger_clients_ht_node
;
152 /* call_rcu delayed reclaim. */
153 struct rcu_head rcu_node
;
156 struct notification_client
{
157 /* Nests within the notification_client_list lock. */
158 pthread_mutex_t lock
;
159 notification_client_id id
;
161 /* Client protocol version. */
162 uint8_t major
, minor
;
166 * Indicates if the credentials and versions of the client have been
171 * Conditions to which the client's notification channel is subscribed.
172 * List of struct lttng_condition_list_node. The condition member is
173 * owned by the client.
175 struct cds_list_head condition_list
;
176 struct cds_lfht_node client_socket_ht_node
;
177 struct cds_lfht_node client_id_ht_node
;
180 * If a client's communication is inactive, it means that a
181 * fatal error has occurred (could be either a protocol error or
182 * the socket API returned a fatal error). No further
183 * communication should be attempted; the client is queued for
189 * During the reception of a message, the reception
190 * buffers' "size" is set to contain the current
191 * message's complete payload.
193 struct lttng_dynamic_buffer buffer
;
194 /* Bytes left to receive for the current message. */
195 size_t bytes_to_receive
;
196 /* Type of the message being received. */
197 enum lttng_notification_channel_message_type msg_type
;
199 * Indicates whether or not credentials are expected
204 * Indicates whether or not credentials were received
208 /* Only used during credentials reception. */
209 lttng_sock_cred creds
;
213 * Indicates whether or not a notification addressed to
214 * this client was dropped because a command reply was
217 * A notification is dropped whenever the buffer is not
220 bool dropped_notification
;
222 * Indicates whether or not a command reply is already
223 * buffered. In this case, it means that the client is
224 * not consuming command replies before emitting a new
225 * one. This could be caused by a protocol error or a
226 * misbehaving/malicious client.
228 bool queued_command_reply
;
229 struct lttng_dynamic_buffer buffer
;
232 /* call_rcu delayed reclaim. */
233 struct rcu_head rcu_node
;
236 struct channel_state_sample
{
237 struct channel_key key
;
238 struct cds_lfht_node channel_state_ht_node
;
239 uint64_t highest_usage
;
240 uint64_t lowest_usage
;
241 uint64_t channel_total_consumed
;
242 /* call_rcu delayed reclaim. */
243 struct rcu_head rcu_node
;
/* Forward declarations of file-local helpers defined further down. */
static unsigned long hash_channel_key(struct channel_key *key);
static int evaluate_buffer_condition(const struct lttng_condition *condition,
		struct lttng_evaluation **evaluation,
		const struct notification_thread_state *state,
		const struct channel_state_sample *previous_sample,
		const struct channel_state_sample *latest_sample,
		uint64_t previous_session_consumed_total,
		uint64_t latest_session_consumed_total,
		struct channel_info *channel_info);
static
int send_evaluation_to_clients(const struct lttng_trigger *trigger,
		const struct lttng_evaluation *evaluation,
		struct notification_client_list *client_list,
		struct notification_thread_state *state,
		uid_t channel_uid, gid_t channel_gid);


/* session_info API */
static
void session_info_destroy(void *_data);
static
void session_info_get(struct session_info *session_info);
static
void session_info_put(struct session_info *session_info);
static
struct session_info *session_info_create(const char *name,
		uid_t uid, gid_t gid,
		struct lttng_session_trigger_list *trigger_list,
		struct cds_lfht *sessions_ht);
static
void session_info_add_channel(struct session_info *session_info,
		struct channel_info *channel_info);
static
void session_info_remove_channel(struct session_info *session_info,
		struct channel_info *channel_info);

/* lttng_session_trigger_list API */
static
struct lttng_session_trigger_list *lttng_session_trigger_list_create(
		const char *session_name,
		struct cds_lfht *session_triggers_ht);
static
struct lttng_session_trigger_list *lttng_session_trigger_list_build(
		const struct notification_thread_state *state,
		const char *session_name);
static
void lttng_session_trigger_list_destroy(
		struct lttng_session_trigger_list *list);
static
int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
		const struct lttng_trigger *trigger);
300 int match_client_socket(struct cds_lfht_node
*node
, const void *key
)
302 /* This double-cast is intended to supress pointer-to-cast warning. */
303 const int socket
= (int) (intptr_t) key
;
304 const struct notification_client
*client
= caa_container_of(node
,
305 struct notification_client
, client_socket_ht_node
);
307 return client
->socket
== socket
;
311 int match_client_id(struct cds_lfht_node
*node
, const void *key
)
313 /* This double-cast is intended to supress pointer-to-cast warning. */
314 const notification_client_id id
= *((notification_client_id
*) key
);
315 const struct notification_client
*client
= caa_container_of(
316 node
, struct notification_client
, client_id_ht_node
);
318 return client
->id
== id
;
322 int match_channel_trigger_list(struct cds_lfht_node
*node
, const void *key
)
324 struct channel_key
*channel_key
= (struct channel_key
*) key
;
325 struct lttng_channel_trigger_list
*trigger_list
;
327 trigger_list
= caa_container_of(node
, struct lttng_channel_trigger_list
,
328 channel_triggers_ht_node
);
330 return !!((channel_key
->key
== trigger_list
->channel_key
.key
) &&
331 (channel_key
->domain
== trigger_list
->channel_key
.domain
));
335 int match_session_trigger_list(struct cds_lfht_node
*node
, const void *key
)
337 const char *session_name
= (const char *) key
;
338 struct lttng_session_trigger_list
*trigger_list
;
340 trigger_list
= caa_container_of(node
, struct lttng_session_trigger_list
,
341 session_triggers_ht_node
);
343 return !!(strcmp(trigger_list
->session_name
, session_name
) == 0);
347 int match_channel_state_sample(struct cds_lfht_node
*node
, const void *key
)
349 struct channel_key
*channel_key
= (struct channel_key
*) key
;
350 struct channel_state_sample
*sample
;
352 sample
= caa_container_of(node
, struct channel_state_sample
,
353 channel_state_ht_node
);
355 return !!((channel_key
->key
== sample
->key
.key
) &&
356 (channel_key
->domain
== sample
->key
.domain
));
360 int match_channel_info(struct cds_lfht_node
*node
, const void *key
)
362 struct channel_key
*channel_key
= (struct channel_key
*) key
;
363 struct channel_info
*channel_info
;
365 channel_info
= caa_container_of(node
, struct channel_info
,
368 return !!((channel_key
->key
== channel_info
->key
.key
) &&
369 (channel_key
->domain
== channel_info
->key
.domain
));
373 int match_condition(struct cds_lfht_node
*node
, const void *key
)
375 struct lttng_condition
*condition_key
= (struct lttng_condition
*) key
;
376 struct lttng_trigger_ht_element
*trigger
;
377 struct lttng_condition
*condition
;
379 trigger
= caa_container_of(node
, struct lttng_trigger_ht_element
,
381 condition
= lttng_trigger_get_condition(trigger
->trigger
);
384 return !!lttng_condition_is_equal(condition_key
, condition
);
388 int match_client_list_condition(struct cds_lfht_node
*node
, const void *key
)
390 struct lttng_condition
*condition_key
= (struct lttng_condition
*) key
;
391 struct notification_client_list
*client_list
;
392 const struct lttng_condition
*condition
;
394 assert(condition_key
);
396 client_list
= caa_container_of(node
, struct notification_client_list
,
397 notification_trigger_clients_ht_node
);
398 condition
= lttng_trigger_get_const_condition(client_list
->trigger
);
400 return !!lttng_condition_is_equal(condition_key
, condition
);
404 int match_session(struct cds_lfht_node
*node
, const void *key
)
406 const char *name
= key
;
407 struct session_info
*session_info
= caa_container_of(
408 node
, struct session_info
, sessions_ht_node
);
410 return !strcmp(session_info
->name
, name
);
414 unsigned long lttng_condition_buffer_usage_hash(
415 const struct lttng_condition
*_condition
)
418 unsigned long condition_type
;
419 struct lttng_condition_buffer_usage
*condition
;
421 condition
= container_of(_condition
,
422 struct lttng_condition_buffer_usage
, parent
);
424 condition_type
= (unsigned long) condition
->parent
.type
;
425 hash
= hash_key_ulong((void *) condition_type
, lttng_ht_seed
);
426 if (condition
->session_name
) {
427 hash
^= hash_key_str(condition
->session_name
, lttng_ht_seed
);
429 if (condition
->channel_name
) {
430 hash
^= hash_key_str(condition
->channel_name
, lttng_ht_seed
);
432 if (condition
->domain
.set
) {
433 hash
^= hash_key_ulong(
434 (void *) condition
->domain
.type
,
437 if (condition
->threshold_ratio
.set
) {
440 val
= condition
->threshold_ratio
.value
* (double) UINT32_MAX
;
441 hash
^= hash_key_u64(&val
, lttng_ht_seed
);
442 } else if (condition
->threshold_bytes
.set
) {
445 val
= condition
->threshold_bytes
.value
;
446 hash
^= hash_key_u64(&val
, lttng_ht_seed
);
452 unsigned long lttng_condition_session_consumed_size_hash(
453 const struct lttng_condition
*_condition
)
456 unsigned long condition_type
=
457 (unsigned long) LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
;
458 struct lttng_condition_session_consumed_size
*condition
;
461 condition
= container_of(_condition
,
462 struct lttng_condition_session_consumed_size
, parent
);
464 hash
= hash_key_ulong((void *) condition_type
, lttng_ht_seed
);
465 if (condition
->session_name
) {
466 hash
^= hash_key_str(condition
->session_name
, lttng_ht_seed
);
468 val
= condition
->consumed_threshold_bytes
.value
;
469 hash
^= hash_key_u64(&val
, lttng_ht_seed
);
474 unsigned long lttng_condition_session_rotation_hash(
475 const struct lttng_condition
*_condition
)
477 unsigned long hash
, condition_type
;
478 struct lttng_condition_session_rotation
*condition
;
480 condition
= container_of(_condition
,
481 struct lttng_condition_session_rotation
, parent
);
482 condition_type
= (unsigned long) condition
->parent
.type
;
483 hash
= hash_key_ulong((void *) condition_type
, lttng_ht_seed
);
484 assert(condition
->session_name
);
485 hash
^= hash_key_str(condition
->session_name
, lttng_ht_seed
);
490 * The lttng_condition hashing code is kept in this file (rather than
491 * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
492 * don't want to link in liblttng-ctl.
495 unsigned long lttng_condition_hash(const struct lttng_condition
*condition
)
497 switch (condition
->type
) {
498 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
499 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
500 return lttng_condition_buffer_usage_hash(condition
);
501 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
:
502 return lttng_condition_session_consumed_size_hash(condition
);
503 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
:
504 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED
:
505 return lttng_condition_session_rotation_hash(condition
);
507 ERR("[notification-thread] Unexpected condition type caught");
513 unsigned long hash_channel_key(struct channel_key
*key
)
515 unsigned long key_hash
= hash_key_u64(&key
->key
, lttng_ht_seed
);
516 unsigned long domain_hash
= hash_key_ulong(
517 (void *) (unsigned long) key
->domain
, lttng_ht_seed
);
519 return key_hash
^ domain_hash
;
523 unsigned long hash_client_socket(int socket
)
525 return hash_key_ulong((void *) (unsigned long) socket
, lttng_ht_seed
);
529 unsigned long hash_client_id(notification_client_id id
)
531 return hash_key_u64(&id
, lttng_ht_seed
);
535 * Get the type of object to which a given condition applies. Bindings let
536 * the notification system evaluate a trigger's condition when a given
537 * object's state is updated.
539 * For instance, a condition bound to a channel will be evaluated everytime
540 * the channel's state is changed by a channel monitoring sample.
543 enum lttng_object_type
get_condition_binding_object(
544 const struct lttng_condition
*condition
)
546 switch (lttng_condition_get_type(condition
)) {
547 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
548 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
549 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
:
550 return LTTNG_OBJECT_TYPE_CHANNEL
;
551 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
:
552 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED
:
553 return LTTNG_OBJECT_TYPE_SESSION
;
555 return LTTNG_OBJECT_TYPE_UNKNOWN
;
560 void free_channel_info_rcu(struct rcu_head
*node
)
562 free(caa_container_of(node
, struct channel_info
, rcu_node
));
566 void channel_info_destroy(struct channel_info
*channel_info
)
572 if (channel_info
->session_info
) {
573 session_info_remove_channel(channel_info
->session_info
,
575 session_info_put(channel_info
->session_info
);
577 if (channel_info
->name
) {
578 free(channel_info
->name
);
580 call_rcu(&channel_info
->rcu_node
, free_channel_info_rcu
);
584 void free_session_info_rcu(struct rcu_head
*node
)
586 free(caa_container_of(node
, struct session_info
, rcu_node
));
589 /* Don't call directly, use the ref-counting mechanism. */
591 void session_info_destroy(void *_data
)
593 struct session_info
*session_info
= _data
;
596 assert(session_info
);
597 if (session_info
->channel_infos_ht
) {
598 ret
= cds_lfht_destroy(session_info
->channel_infos_ht
, NULL
);
600 ERR("[notification-thread] Failed to destroy channel information hash table");
603 lttng_session_trigger_list_destroy(session_info
->trigger_list
);
606 cds_lfht_del(session_info
->sessions_ht
,
607 &session_info
->sessions_ht_node
);
609 free(session_info
->name
);
610 call_rcu(&session_info
->rcu_node
, free_session_info_rcu
);
614 void session_info_get(struct session_info
*session_info
)
619 lttng_ref_get(&session_info
->ref
);
623 void session_info_put(struct session_info
*session_info
)
628 lttng_ref_put(&session_info
->ref
);
632 struct session_info
*session_info_create(const char *name
, uid_t uid
, gid_t gid
,
633 struct lttng_session_trigger_list
*trigger_list
,
634 struct cds_lfht
*sessions_ht
)
636 struct session_info
*session_info
;
640 session_info
= zmalloc(sizeof(*session_info
));
644 lttng_ref_init(&session_info
->ref
, session_info_destroy
);
646 session_info
->channel_infos_ht
= cds_lfht_new(DEFAULT_HT_SIZE
,
647 1, 0, CDS_LFHT_AUTO_RESIZE
| CDS_LFHT_ACCOUNTING
, NULL
);
648 if (!session_info
->channel_infos_ht
) {
652 cds_lfht_node_init(&session_info
->sessions_ht_node
);
653 session_info
->name
= strdup(name
);
654 if (!session_info
->name
) {
657 session_info
->uid
= uid
;
658 session_info
->gid
= gid
;
659 session_info
->trigger_list
= trigger_list
;
660 session_info
->sessions_ht
= sessions_ht
;
664 session_info_put(session_info
);
669 void session_info_add_channel(struct session_info
*session_info
,
670 struct channel_info
*channel_info
)
673 cds_lfht_add(session_info
->channel_infos_ht
,
674 hash_channel_key(&channel_info
->key
),
675 &channel_info
->session_info_channels_ht_node
);
680 void session_info_remove_channel(struct session_info
*session_info
,
681 struct channel_info
*channel_info
)
684 cds_lfht_del(session_info
->channel_infos_ht
,
685 &channel_info
->session_info_channels_ht_node
);
690 struct channel_info
*channel_info_create(const char *channel_name
,
691 struct channel_key
*channel_key
, uint64_t channel_capacity
,
692 struct session_info
*session_info
)
694 struct channel_info
*channel_info
= zmalloc(sizeof(*channel_info
));
700 cds_lfht_node_init(&channel_info
->channels_ht_node
);
701 cds_lfht_node_init(&channel_info
->session_info_channels_ht_node
);
702 memcpy(&channel_info
->key
, channel_key
, sizeof(*channel_key
));
703 channel_info
->capacity
= channel_capacity
;
705 channel_info
->name
= strdup(channel_name
);
706 if (!channel_info
->name
) {
711 * Set the references between session and channel infos:
712 * - channel_info holds a strong reference to session_info
713 * - session_info holds a weak reference to channel_info
715 session_info_get(session_info
);
716 session_info_add_channel(session_info
, channel_info
);
717 channel_info
->session_info
= session_info
;
721 channel_info_destroy(channel_info
);
726 bool notification_client_list_get(struct notification_client_list
*list
)
728 return urcu_ref_get_unless_zero(&list
->ref
);
732 void free_notification_client_list_rcu(struct rcu_head
*node
)
734 free(caa_container_of(node
, struct notification_client_list
,
739 void notification_client_list_release(struct urcu_ref
*list_ref
)
741 struct notification_client_list
*list
=
742 container_of(list_ref
, typeof(*list
), ref
);
743 struct notification_client_list_element
*client_list_element
, *tmp
;
745 if (list
->notification_trigger_clients_ht
) {
747 cds_lfht_del(list
->notification_trigger_clients_ht
,
748 &list
->notification_trigger_clients_ht_node
);
750 list
->notification_trigger_clients_ht
= NULL
;
752 cds_list_for_each_entry_safe(client_list_element
, tmp
,
754 free(client_list_element
);
756 pthread_mutex_destroy(&list
->lock
);
757 call_rcu(&list
->rcu_node
, free_notification_client_list_rcu
);
761 struct notification_client_list
*notification_client_list_create(
762 const struct lttng_trigger
*trigger
)
764 struct notification_client_list
*client_list
=
765 zmalloc(sizeof(*client_list
));
770 pthread_mutex_init(&client_list
->lock
, NULL
);
771 urcu_ref_init(&client_list
->ref
);
772 cds_lfht_node_init(&client_list
->notification_trigger_clients_ht_node
);
773 CDS_INIT_LIST_HEAD(&client_list
->list
);
774 client_list
->trigger
= trigger
;
780 void publish_notification_client_list(
781 struct notification_thread_state
*state
,
782 struct notification_client_list
*list
)
784 const struct lttng_condition
*condition
=
785 lttng_trigger_get_const_condition(list
->trigger
);
787 assert(!list
->notification_trigger_clients_ht
);
789 list
->notification_trigger_clients_ht
=
790 state
->notification_trigger_clients_ht
;
793 cds_lfht_add(state
->notification_trigger_clients_ht
,
794 lttng_condition_hash(condition
),
795 &list
->notification_trigger_clients_ht_node
);
800 void notification_client_list_put(struct notification_client_list
*list
)
805 return urcu_ref_put(&list
->ref
, notification_client_list_release
);
808 /* Provides a reference to the returned list. */
810 struct notification_client_list
*get_client_list_from_condition(
811 struct notification_thread_state
*state
,
812 const struct lttng_condition
*condition
)
814 struct cds_lfht_node
*node
;
815 struct cds_lfht_iter iter
;
816 struct notification_client_list
*list
= NULL
;
819 cds_lfht_lookup(state
->notification_trigger_clients_ht
,
820 lttng_condition_hash(condition
),
821 match_client_list_condition
,
824 node
= cds_lfht_iter_get_node(&iter
);
826 list
= container_of(node
, struct notification_client_list
,
827 notification_trigger_clients_ht_node
);
828 list
= notification_client_list_get(list
) ? list
: NULL
;
836 int evaluate_channel_condition_for_client(
837 const struct lttng_condition
*condition
,
838 struct notification_thread_state
*state
,
839 struct lttng_evaluation
**evaluation
,
840 uid_t
*session_uid
, gid_t
*session_gid
)
843 struct cds_lfht_iter iter
;
844 struct cds_lfht_node
*node
;
845 struct channel_info
*channel_info
= NULL
;
846 struct channel_key
*channel_key
= NULL
;
847 struct channel_state_sample
*last_sample
= NULL
;
848 struct lttng_channel_trigger_list
*channel_trigger_list
= NULL
;
852 /* Find the channel associated with the condition. */
853 cds_lfht_for_each_entry(state
->channel_triggers_ht
, &iter
,
854 channel_trigger_list
, channel_triggers_ht_node
) {
855 struct lttng_trigger_list_element
*element
;
857 cds_list_for_each_entry(element
, &channel_trigger_list
->list
, node
) {
858 const struct lttng_condition
*current_condition
=
859 lttng_trigger_get_const_condition(
862 assert(current_condition
);
863 if (!lttng_condition_is_equal(condition
,
864 current_condition
)) {
868 /* Found the trigger, save the channel key. */
869 channel_key
= &channel_trigger_list
->channel_key
;
873 /* The channel key was found stop iteration. */
879 /* No channel found; normal exit. */
880 DBG("[notification-thread] No known channel associated with newly subscribed-to condition");
885 /* Fetch channel info for the matching channel. */
886 cds_lfht_lookup(state
->channels_ht
,
887 hash_channel_key(channel_key
),
891 node
= cds_lfht_iter_get_node(&iter
);
893 channel_info
= caa_container_of(node
, struct channel_info
,
896 /* Retrieve the channel's last sample, if it exists. */
897 cds_lfht_lookup(state
->channel_state_ht
,
898 hash_channel_key(channel_key
),
899 match_channel_state_sample
,
902 node
= cds_lfht_iter_get_node(&iter
);
904 last_sample
= caa_container_of(node
,
905 struct channel_state_sample
,
906 channel_state_ht_node
);
908 /* Nothing to evaluate, no sample was ever taken. Normal exit */
909 DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
914 ret
= evaluate_buffer_condition(condition
, evaluation
, state
,
916 0, channel_info
->session_info
->consumed_data_size
,
919 WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
923 *session_uid
= channel_info
->session_info
->uid
;
924 *session_gid
= channel_info
->session_info
->gid
;
931 const char *get_condition_session_name(const struct lttng_condition
*condition
)
933 const char *session_name
= NULL
;
934 enum lttng_condition_status status
;
936 switch (lttng_condition_get_type(condition
)) {
937 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
938 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
939 status
= lttng_condition_buffer_usage_get_session_name(
940 condition
, &session_name
);
942 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
:
943 status
= lttng_condition_session_consumed_size_get_session_name(
944 condition
, &session_name
);
946 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
:
947 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED
:
948 status
= lttng_condition_session_rotation_get_session_name(
949 condition
, &session_name
);
954 if (status
!= LTTNG_CONDITION_STATUS_OK
) {
955 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
963 int evaluate_session_condition_for_client(
964 const struct lttng_condition
*condition
,
965 struct notification_thread_state
*state
,
966 struct lttng_evaluation
**evaluation
,
967 uid_t
*session_uid
, gid_t
*session_gid
)
970 struct cds_lfht_iter iter
;
971 struct cds_lfht_node
*node
;
972 const char *session_name
;
973 struct session_info
*session_info
= NULL
;
976 session_name
= get_condition_session_name(condition
);
978 /* Find the session associated with the trigger. */
979 cds_lfht_lookup(state
->sessions_ht
,
980 hash_key_str(session_name
, lttng_ht_seed
),
984 node
= cds_lfht_iter_get_node(&iter
);
986 DBG("[notification-thread] No known session matching name \"%s\"",
992 session_info
= caa_container_of(node
, struct session_info
,
994 session_info_get(session_info
);
997 * Evaluation is performed in-line here since only one type of
998 * session-bound condition is handled for the moment.
1000 switch (lttng_condition_get_type(condition
)) {
1001 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
:
1002 if (!session_info
->rotation
.ongoing
) {
1004 goto end_session_put
;
1007 *evaluation
= lttng_evaluation_session_rotation_ongoing_create(
1008 session_info
->rotation
.id
);
1011 ERR("[notification-thread] Failed to create session rotation ongoing evaluation for session \"%s\"",
1012 session_info
->name
);
1014 goto end_session_put
;
1020 goto end_session_put
;
1023 *session_uid
= session_info
->uid
;
1024 *session_gid
= session_info
->gid
;
1027 session_info_put(session_info
);
1034 int evaluate_condition_for_client(const struct lttng_trigger
*trigger
,
1035 const struct lttng_condition
*condition
,
1036 struct notification_client
*client
,
1037 struct notification_thread_state
*state
)
1040 struct lttng_evaluation
*evaluation
= NULL
;
1041 struct notification_client_list client_list
= {
1042 .lock
= PTHREAD_MUTEX_INITIALIZER
,
1044 struct notification_client_list_element client_list_element
= { 0 };
1045 uid_t object_uid
= 0;
1046 gid_t object_gid
= 0;
1053 switch (get_condition_binding_object(condition
)) {
1054 case LTTNG_OBJECT_TYPE_SESSION
:
1055 ret
= evaluate_session_condition_for_client(condition
, state
,
1056 &evaluation
, &object_uid
, &object_gid
);
1058 case LTTNG_OBJECT_TYPE_CHANNEL
:
1059 ret
= evaluate_channel_condition_for_client(condition
, state
,
1060 &evaluation
, &object_uid
, &object_gid
);
1062 case LTTNG_OBJECT_TYPE_NONE
:
1065 case LTTNG_OBJECT_TYPE_UNKNOWN
:
1075 /* Evaluation yielded nothing. Normal exit. */
1076 DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
1082 * Create a temporary client list with the client currently
1085 cds_lfht_node_init(&client_list
.notification_trigger_clients_ht_node
);
1086 CDS_INIT_LIST_HEAD(&client_list
.list
);
1087 client_list
.trigger
= trigger
;
1089 CDS_INIT_LIST_HEAD(&client_list_element
.node
);
1090 client_list_element
.client
= client
;
1091 cds_list_add(&client_list_element
.node
, &client_list
.list
);
1093 /* Send evaluation result to the newly-subscribed client. */
1094 DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
1095 ret
= send_evaluation_to_clients(trigger
, evaluation
, &client_list
,
1096 state
, object_uid
, object_gid
);
1103 int notification_thread_client_subscribe(struct notification_client
*client
,
1104 struct lttng_condition
*condition
,
1105 struct notification_thread_state
*state
,
1106 enum lttng_notification_channel_status
*_status
)
1109 struct notification_client_list
*client_list
= NULL
;
1110 struct lttng_condition_list_element
*condition_list_element
= NULL
;
1111 struct notification_client_list_element
*client_list_element
= NULL
;
1112 enum lttng_notification_channel_status status
=
1113 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
1116 * Ensure that the client has not already subscribed to this condition
1119 cds_list_for_each_entry(condition_list_element
, &client
->condition_list
, node
) {
1120 if (lttng_condition_is_equal(condition_list_element
->condition
,
1122 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED
;
1127 condition_list_element
= zmalloc(sizeof(*condition_list_element
));
1128 if (!condition_list_element
) {
1132 client_list_element
= zmalloc(sizeof(*client_list_element
));
1133 if (!client_list_element
) {
1139 * Add the newly-subscribed condition to the client's subscription list.
1141 CDS_INIT_LIST_HEAD(&condition_list_element
->node
);
1142 condition_list_element
->condition
= condition
;
1143 cds_list_add(&condition_list_element
->node
, &client
->condition_list
);
1145 client_list
= get_client_list_from_condition(state
, condition
);
1148 * No notification-emiting trigger registered with this
1149 * condition. We don't evaluate the condition right away
1150 * since this trigger is not registered yet.
1152 free(client_list_element
);
1157 * The condition to which the client just subscribed is evaluated
1158 * at this point so that conditions that are already TRUE result
1159 * in a notification being sent out.
1161 * The client_list's trigger is used without locking the list itself.
1162 * This is correct since the list doesn't own the trigger and the
1163 * object is immutable.
1165 if (evaluate_condition_for_client(client_list
->trigger
, condition
,
1167 WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
1169 free(client_list_element
);
1174 * Add the client to the list of clients interested in a given trigger
1175 * if a "notification" trigger with a corresponding condition was
1178 client_list_element
->client
= client
;
1179 CDS_INIT_LIST_HEAD(&client_list_element
->node
);
1181 pthread_mutex_lock(&client_list
->lock
);
1182 cds_list_add(&client_list_element
->node
, &client_list
->list
);
1183 pthread_mutex_unlock(&client_list
->lock
);
1189 notification_client_list_put(client_list
);
1193 free(condition_list_element
);
1194 free(client_list_element
);
1199 int notification_thread_client_unsubscribe(
1200 struct notification_client
*client
,
1201 struct lttng_condition
*condition
,
1202 struct notification_thread_state
*state
,
1203 enum lttng_notification_channel_status
*_status
)
1205 struct notification_client_list
*client_list
;
1206 struct lttng_condition_list_element
*condition_list_element
,
1208 struct notification_client_list_element
*client_list_element
,
1210 bool condition_found
= false;
1211 enum lttng_notification_channel_status status
=
1212 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
1214 /* Remove the condition from the client's condition list. */
1215 cds_list_for_each_entry_safe(condition_list_element
, condition_tmp
,
1216 &client
->condition_list
, node
) {
1217 if (!lttng_condition_is_equal(condition_list_element
->condition
,
1222 cds_list_del(&condition_list_element
->node
);
1224 * The caller may be iterating on the client's conditions to
1225 * tear down a client's connection. In this case, the condition
1226 * will be destroyed at the end.
1228 if (condition
!= condition_list_element
->condition
) {
1229 lttng_condition_destroy(
1230 condition_list_element
->condition
);
1232 free(condition_list_element
);
1233 condition_found
= true;
1237 if (!condition_found
) {
1238 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION
;
1243 * Remove the client from the list of clients interested the trigger
1244 * matching the condition.
1246 client_list
= get_client_list_from_condition(state
, condition
);
1251 pthread_mutex_lock(&client_list
->lock
);
1252 cds_list_for_each_entry_safe(client_list_element
, client_tmp
,
1253 &client_list
->list
, node
) {
1254 if (client_list_element
->client
->id
!= client
->id
) {
1257 cds_list_del(&client_list_element
->node
);
1258 free(client_list_element
);
1261 pthread_mutex_unlock(&client_list
->lock
);
1262 notification_client_list_put(client_list
);
1265 lttng_condition_destroy(condition
);
1273 void free_notification_client_rcu(struct rcu_head
*node
)
1275 free(caa_container_of(node
, struct notification_client
, rcu_node
));
1279 void notification_client_destroy(struct notification_client
*client
,
1280 struct notification_thread_state
*state
)
1287 * The client object is not reachable by other threads, no need to lock
1290 if (client
->socket
>= 0) {
1291 (void) lttcomm_close_unix_sock(client
->socket
);
1292 client
->socket
= -1;
1294 client
->communication
.active
= false;
1295 lttng_dynamic_buffer_reset(&client
->communication
.inbound
.buffer
);
1296 lttng_dynamic_buffer_reset(&client
->communication
.outbound
.buffer
);
1297 pthread_mutex_destroy(&client
->lock
);
1298 call_rcu(&client
->rcu_node
, free_notification_client_rcu
);
1302 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1306 struct notification_client
*get_client_from_socket(int socket
,
1307 struct notification_thread_state
*state
)
1309 struct cds_lfht_iter iter
;
1310 struct cds_lfht_node
*node
;
1311 struct notification_client
*client
= NULL
;
1313 cds_lfht_lookup(state
->client_socket_ht
,
1314 hash_client_socket(socket
),
1315 match_client_socket
,
1316 (void *) (unsigned long) socket
,
1318 node
= cds_lfht_iter_get_node(&iter
);
1323 client
= caa_container_of(node
, struct notification_client
,
1324 client_socket_ht_node
);
1330 bool buffer_usage_condition_applies_to_channel(
1331 const struct lttng_condition
*condition
,
1332 const struct channel_info
*channel_info
)
1334 enum lttng_condition_status status
;
1335 enum lttng_domain_type condition_domain
;
1336 const char *condition_session_name
= NULL
;
1337 const char *condition_channel_name
= NULL
;
1339 status
= lttng_condition_buffer_usage_get_domain_type(condition
,
1341 assert(status
== LTTNG_CONDITION_STATUS_OK
);
1342 if (channel_info
->key
.domain
!= condition_domain
) {
1346 status
= lttng_condition_buffer_usage_get_session_name(
1347 condition
, &condition_session_name
);
1348 assert((status
== LTTNG_CONDITION_STATUS_OK
) && condition_session_name
);
1350 status
= lttng_condition_buffer_usage_get_channel_name(
1351 condition
, &condition_channel_name
);
1352 assert((status
== LTTNG_CONDITION_STATUS_OK
) && condition_channel_name
);
1354 if (strcmp(channel_info
->session_info
->name
, condition_session_name
)) {
1357 if (strcmp(channel_info
->name
, condition_channel_name
)) {
1367 bool session_consumed_size_condition_applies_to_channel(
1368 const struct lttng_condition
*condition
,
1369 const struct channel_info
*channel_info
)
1371 enum lttng_condition_status status
;
1372 const char *condition_session_name
= NULL
;
1374 status
= lttng_condition_session_consumed_size_get_session_name(
1375 condition
, &condition_session_name
);
1376 assert((status
== LTTNG_CONDITION_STATUS_OK
) && condition_session_name
);
1378 if (strcmp(channel_info
->session_info
->name
, condition_session_name
)) {
1388 bool trigger_applies_to_channel(const struct lttng_trigger
*trigger
,
1389 const struct channel_info
*channel_info
)
1391 const struct lttng_condition
*condition
;
1392 bool trigger_applies
;
1394 condition
= lttng_trigger_get_const_condition(trigger
);
1399 switch (lttng_condition_get_type(condition
)) {
1400 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
1401 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
1402 trigger_applies
= buffer_usage_condition_applies_to_channel(
1403 condition
, channel_info
);
1405 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
:
1406 trigger_applies
= session_consumed_size_condition_applies_to_channel(
1407 condition
, channel_info
);
1413 return trigger_applies
;
1419 bool trigger_applies_to_client(struct lttng_trigger
*trigger
,
1420 struct notification_client
*client
)
1422 bool applies
= false;
1423 struct lttng_condition_list_element
*condition_list_element
;
1425 cds_list_for_each_entry(condition_list_element
, &client
->condition_list
,
1427 applies
= lttng_condition_is_equal(
1428 condition_list_element
->condition
,
1429 lttng_trigger_get_condition(trigger
));
1437 /* Must be called with RCU read lock held. */
1439 struct lttng_session_trigger_list
*get_session_trigger_list(
1440 struct notification_thread_state
*state
,
1441 const char *session_name
)
1443 struct lttng_session_trigger_list
*list
= NULL
;
1444 struct cds_lfht_node
*node
;
1445 struct cds_lfht_iter iter
;
1447 cds_lfht_lookup(state
->session_triggers_ht
,
1448 hash_key_str(session_name
, lttng_ht_seed
),
1449 match_session_trigger_list
,
1452 node
= cds_lfht_iter_get_node(&iter
);
1455 * Not an error, the list of triggers applying to that session
1456 * will be initialized when the session is created.
1458 DBG("[notification-thread] No trigger list found for session \"%s\" as it is not yet known to the notification system",
1463 list
= caa_container_of(node
,
1464 struct lttng_session_trigger_list
,
1465 session_triggers_ht_node
);
1471 * Allocate an empty lttng_session_trigger_list for the session named
1474 * No ownership of 'session_name' is assumed by the session trigger list.
1475 * It is the caller's responsability to ensure the session name is alive
1476 * for as long as this list is.
1479 struct lttng_session_trigger_list
*lttng_session_trigger_list_create(
1480 const char *session_name
,
1481 struct cds_lfht
*session_triggers_ht
)
1483 struct lttng_session_trigger_list
*list
;
1485 list
= zmalloc(sizeof(*list
));
1489 list
->session_name
= session_name
;
1490 CDS_INIT_LIST_HEAD(&list
->list
);
1491 cds_lfht_node_init(&list
->session_triggers_ht_node
);
1492 list
->session_triggers_ht
= session_triggers_ht
;
1495 /* Publish the list through the session_triggers_ht. */
1496 cds_lfht_add(session_triggers_ht
,
1497 hash_key_str(session_name
, lttng_ht_seed
),
1498 &list
->session_triggers_ht_node
);
1505 void free_session_trigger_list_rcu(struct rcu_head
*node
)
1507 free(caa_container_of(node
, struct lttng_session_trigger_list
,
1512 void lttng_session_trigger_list_destroy(struct lttng_session_trigger_list
*list
)
1514 struct lttng_trigger_list_element
*trigger_list_element
, *tmp
;
1516 /* Empty the list element by element, and then free the list itself. */
1517 cds_list_for_each_entry_safe(trigger_list_element
, tmp
,
1518 &list
->list
, node
) {
1519 cds_list_del(&trigger_list_element
->node
);
1520 free(trigger_list_element
);
1523 /* Unpublish the list from the session_triggers_ht. */
1524 cds_lfht_del(list
->session_triggers_ht
,
1525 &list
->session_triggers_ht_node
);
1527 call_rcu(&list
->rcu_node
, free_session_trigger_list_rcu
);
1531 int lttng_session_trigger_list_add(struct lttng_session_trigger_list
*list
,
1532 const struct lttng_trigger
*trigger
)
1535 struct lttng_trigger_list_element
*new_element
=
1536 zmalloc(sizeof(*new_element
));
1542 CDS_INIT_LIST_HEAD(&new_element
->node
);
1543 new_element
->trigger
= trigger
;
1544 cds_list_add(&new_element
->node
, &list
->list
);
1550 bool trigger_applies_to_session(const struct lttng_trigger
*trigger
,
1551 const char *session_name
)
1553 bool applies
= false;
1554 const struct lttng_condition
*condition
;
1556 condition
= lttng_trigger_get_const_condition(trigger
);
1557 switch (lttng_condition_get_type(condition
)) {
1558 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
:
1559 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED
:
1561 enum lttng_condition_status condition_status
;
1562 const char *condition_session_name
;
1564 condition_status
= lttng_condition_session_rotation_get_session_name(
1565 condition
, &condition_session_name
);
1566 if (condition_status
!= LTTNG_CONDITION_STATUS_OK
) {
1567 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
1571 assert(condition_session_name
);
1572 applies
= !strcmp(condition_session_name
, session_name
);
1583 * Allocate and initialize an lttng_session_trigger_list which contains
1584 * all triggers that apply to the session named 'session_name'.
1586 * No ownership of 'session_name' is assumed by the session trigger list.
1587 * It is the caller's responsability to ensure the session name is alive
1588 * for as long as this list is.
1591 struct lttng_session_trigger_list
*lttng_session_trigger_list_build(
1592 const struct notification_thread_state
*state
,
1593 const char *session_name
)
1595 int trigger_count
= 0;
1596 struct lttng_session_trigger_list
*session_trigger_list
= NULL
;
1597 struct lttng_trigger_ht_element
*trigger_ht_element
= NULL
;
1598 struct cds_lfht_iter iter
;
1600 session_trigger_list
= lttng_session_trigger_list_create(session_name
,
1601 state
->session_triggers_ht
);
1603 /* Add all triggers applying to the session named 'session_name'. */
1604 cds_lfht_for_each_entry(state
->triggers_ht
, &iter
, trigger_ht_element
,
1608 if (!trigger_applies_to_session(trigger_ht_element
->trigger
,
1613 ret
= lttng_session_trigger_list_add(session_trigger_list
,
1614 trigger_ht_element
->trigger
);
1622 DBG("[notification-thread] Found %i triggers that apply to newly created session",
1624 return session_trigger_list
;
1626 lttng_session_trigger_list_destroy(session_trigger_list
);
1631 struct session_info
*find_or_create_session_info(
1632 struct notification_thread_state
*state
,
1633 const char *name
, uid_t uid
, gid_t gid
)
1635 struct session_info
*session
= NULL
;
1636 struct cds_lfht_node
*node
;
1637 struct cds_lfht_iter iter
;
1638 struct lttng_session_trigger_list
*trigger_list
;
1641 cds_lfht_lookup(state
->sessions_ht
,
1642 hash_key_str(name
, lttng_ht_seed
),
1646 node
= cds_lfht_iter_get_node(&iter
);
1648 DBG("[notification-thread] Found session info of session \"%s\" (uid = %i, gid = %i)",
1650 session
= caa_container_of(node
, struct session_info
,
1652 assert(session
->uid
== uid
);
1653 assert(session
->gid
== gid
);
1654 session_info_get(session
);
1658 trigger_list
= lttng_session_trigger_list_build(state
, name
);
1659 if (!trigger_list
) {
1663 session
= session_info_create(name
, uid
, gid
, trigger_list
,
1664 state
->sessions_ht
);
1666 ERR("[notification-thread] Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)",
1668 lttng_session_trigger_list_destroy(trigger_list
);
1671 trigger_list
= NULL
;
1673 cds_lfht_add(state
->sessions_ht
, hash_key_str(name
, lttng_ht_seed
),
1674 &session
->sessions_ht_node
);
1680 session_info_put(session
);
1685 int handle_notification_thread_command_add_channel(
1686 struct notification_thread_state
*state
,
1687 const char *session_name
, uid_t session_uid
, gid_t session_gid
,
1688 const char *channel_name
, enum lttng_domain_type channel_domain
,
1689 uint64_t channel_key_int
, uint64_t channel_capacity
,
1690 enum lttng_error_code
*cmd_result
)
1692 struct cds_list_head trigger_list
;
1693 struct channel_info
*new_channel_info
= NULL
;
1694 struct channel_key channel_key
= {
1695 .key
= channel_key_int
,
1696 .domain
= channel_domain
,
1698 struct lttng_channel_trigger_list
*channel_trigger_list
= NULL
;
1699 struct lttng_trigger_ht_element
*trigger_ht_element
= NULL
;
1700 int trigger_count
= 0;
1701 struct cds_lfht_iter iter
;
1702 struct session_info
*session_info
= NULL
;
1704 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64
" in %s domain",
1705 channel_name
, session_name
, channel_key_int
,
1706 channel_domain
== LTTNG_DOMAIN_KERNEL
? "kernel" : "user space");
1708 CDS_INIT_LIST_HEAD(&trigger_list
);
1710 session_info
= find_or_create_session_info(state
, session_name
,
1711 session_uid
, session_gid
);
1712 if (!session_info
) {
1713 /* Allocation error or an internal error occurred. */
1717 new_channel_info
= channel_info_create(channel_name
, &channel_key
,
1718 channel_capacity
, session_info
);
1719 if (!new_channel_info
) {
1724 /* Build a list of all triggers applying to the new channel. */
1725 cds_lfht_for_each_entry(state
->triggers_ht
, &iter
, trigger_ht_element
,
1727 struct lttng_trigger_list_element
*new_element
;
1729 if (!trigger_applies_to_channel(trigger_ht_element
->trigger
,
1730 new_channel_info
)) {
1734 new_element
= zmalloc(sizeof(*new_element
));
1739 CDS_INIT_LIST_HEAD(&new_element
->node
);
1740 new_element
->trigger
= trigger_ht_element
->trigger
;
1741 cds_list_add(&new_element
->node
, &trigger_list
);
1746 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
1748 channel_trigger_list
= zmalloc(sizeof(*channel_trigger_list
));
1749 if (!channel_trigger_list
) {
1752 channel_trigger_list
->channel_key
= new_channel_info
->key
;
1753 CDS_INIT_LIST_HEAD(&channel_trigger_list
->list
);
1754 cds_lfht_node_init(&channel_trigger_list
->channel_triggers_ht_node
);
1755 cds_list_splice(&trigger_list
, &channel_trigger_list
->list
);
1758 /* Add channel to the channel_ht which owns the channel_infos. */
1759 cds_lfht_add(state
->channels_ht
,
1760 hash_channel_key(&new_channel_info
->key
),
1761 &new_channel_info
->channels_ht_node
);
1763 * Add the list of triggers associated with this channel to the
1764 * channel_triggers_ht.
1766 cds_lfht_add(state
->channel_triggers_ht
,
1767 hash_channel_key(&new_channel_info
->key
),
1768 &channel_trigger_list
->channel_triggers_ht_node
);
1770 session_info_put(session_info
);
1771 *cmd_result
= LTTNG_OK
;
1774 channel_info_destroy(new_channel_info
);
1775 session_info_put(session_info
);
1780 void free_channel_trigger_list_rcu(struct rcu_head
*node
)
1782 free(caa_container_of(node
, struct lttng_channel_trigger_list
,
1787 void free_channel_state_sample_rcu(struct rcu_head
*node
)
1789 free(caa_container_of(node
, struct channel_state_sample
,
1794 int handle_notification_thread_command_remove_channel(
1795 struct notification_thread_state
*state
,
1796 uint64_t channel_key
, enum lttng_domain_type domain
,
1797 enum lttng_error_code
*cmd_result
)
1799 struct cds_lfht_node
*node
;
1800 struct cds_lfht_iter iter
;
1801 struct lttng_channel_trigger_list
*trigger_list
;
1802 struct lttng_trigger_list_element
*trigger_list_element
, *tmp
;
1803 struct channel_key key
= { .key
= channel_key
, .domain
= domain
};
1804 struct channel_info
*channel_info
;
1806 DBG("[notification-thread] Removing channel key = %" PRIu64
" in %s domain",
1807 channel_key
, domain
== LTTNG_DOMAIN_KERNEL
? "kernel" : "user space");
1811 cds_lfht_lookup(state
->channel_triggers_ht
,
1812 hash_channel_key(&key
),
1813 match_channel_trigger_list
,
1816 node
= cds_lfht_iter_get_node(&iter
);
1818 * There is a severe internal error if we are being asked to remove a
1819 * channel that doesn't exist.
1822 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
1826 /* Free the list of triggers associated with this channel. */
1827 trigger_list
= caa_container_of(node
, struct lttng_channel_trigger_list
,
1828 channel_triggers_ht_node
);
1829 cds_list_for_each_entry_safe(trigger_list_element
, tmp
,
1830 &trigger_list
->list
, node
) {
1831 cds_list_del(&trigger_list_element
->node
);
1832 free(trigger_list_element
);
1834 cds_lfht_del(state
->channel_triggers_ht
, node
);
1835 call_rcu(&trigger_list
->rcu_node
, free_channel_trigger_list_rcu
);
1837 /* Free sampled channel state. */
1838 cds_lfht_lookup(state
->channel_state_ht
,
1839 hash_channel_key(&key
),
1840 match_channel_state_sample
,
1843 node
= cds_lfht_iter_get_node(&iter
);
1845 * This is expected to be NULL if the channel is destroyed before we
1846 * received a sample.
1849 struct channel_state_sample
*sample
= caa_container_of(node
,
1850 struct channel_state_sample
,
1851 channel_state_ht_node
);
1853 cds_lfht_del(state
->channel_state_ht
, node
);
1854 call_rcu(&sample
->rcu_node
, free_channel_state_sample_rcu
);
1857 /* Remove the channel from the channels_ht and free it. */
1858 cds_lfht_lookup(state
->channels_ht
,
1859 hash_channel_key(&key
),
1863 node
= cds_lfht_iter_get_node(&iter
);
1865 channel_info
= caa_container_of(node
, struct channel_info
,
1867 cds_lfht_del(state
->channels_ht
, node
);
1868 channel_info_destroy(channel_info
);
1871 *cmd_result
= LTTNG_OK
;
1876 int handle_notification_thread_command_session_rotation(
1877 struct notification_thread_state
*state
,
1878 enum notification_thread_command_type cmd_type
,
1879 const char *session_name
, uid_t session_uid
, gid_t session_gid
,
1880 uint64_t trace_archive_chunk_id
,
1881 struct lttng_trace_archive_location
*location
,
1882 enum lttng_error_code
*_cmd_result
)
1885 enum lttng_error_code cmd_result
= LTTNG_OK
;
1886 struct lttng_session_trigger_list
*trigger_list
;
1887 struct lttng_trigger_list_element
*trigger_list_element
;
1888 struct session_info
*session_info
;
1892 session_info
= find_or_create_session_info(state
, session_name
,
1893 session_uid
, session_gid
);
1894 if (!session_info
) {
1895 /* Allocation error or an internal error occurred. */
1897 cmd_result
= LTTNG_ERR_NOMEM
;
1901 session_info
->rotation
.ongoing
=
1902 cmd_type
== NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING
;
1903 session_info
->rotation
.id
= trace_archive_chunk_id
;
1904 trigger_list
= get_session_trigger_list(state
, session_name
);
1905 if (!trigger_list
) {
1906 DBG("[notification-thread] No triggers applying to session \"%s\" found",
1911 cds_list_for_each_entry(trigger_list_element
, &trigger_list
->list
,
1913 const struct lttng_condition
*condition
;
1914 const struct lttng_action
*action
;
1915 const struct lttng_trigger
*trigger
;
1916 struct notification_client_list
*client_list
;
1917 struct lttng_evaluation
*evaluation
= NULL
;
1918 enum lttng_condition_type condition_type
;
1919 bool client_list_is_empty
;
1921 trigger
= trigger_list_element
->trigger
;
1922 condition
= lttng_trigger_get_const_condition(trigger
);
1924 condition_type
= lttng_condition_get_type(condition
);
1926 if (condition_type
== LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
&&
1927 cmd_type
!= NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING
) {
1929 } else if (condition_type
== LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED
&&
1930 cmd_type
!= NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED
) {
1934 action
= lttng_trigger_get_const_action(trigger
);
1936 /* Notify actions are the only type currently supported. */
1937 assert(lttng_action_get_type_const(action
) ==
1938 LTTNG_ACTION_TYPE_NOTIFY
);
1940 client_list
= get_client_list_from_condition(state
, condition
);
1941 assert(client_list
);
1943 pthread_mutex_lock(&client_list
->lock
);
1944 client_list_is_empty
= cds_list_empty(&client_list
->list
);
1945 pthread_mutex_unlock(&client_list
->lock
);
1946 if (client_list_is_empty
) {
1948 * No clients interested in the evaluation's result,
1954 if (cmd_type
== NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING
) {
1955 evaluation
= lttng_evaluation_session_rotation_ongoing_create(
1956 trace_archive_chunk_id
);
1958 evaluation
= lttng_evaluation_session_rotation_completed_create(
1959 trace_archive_chunk_id
, location
);
1963 /* Internal error */
1965 cmd_result
= LTTNG_ERR_UNK
;
1969 /* Dispatch evaluation result to all clients. */
1970 ret
= send_evaluation_to_clients(trigger_list_element
->trigger
,
1971 evaluation
, client_list
, state
,
1974 lttng_evaluation_destroy(evaluation
);
1976 notification_client_list_put(client_list
);
1977 if (caa_unlikely(ret
)) {
1982 session_info_put(session_info
);
1983 *_cmd_result
= cmd_result
;
1989 int condition_is_supported(struct lttng_condition
*condition
)
1993 switch (lttng_condition_get_type(condition
)) {
1994 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
1995 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
1997 enum lttng_domain_type domain
;
1999 ret
= lttng_condition_buffer_usage_get_domain_type(condition
,
2006 if (domain
!= LTTNG_DOMAIN_KERNEL
) {
2012 * Older kernel tracers don't expose the API to monitor their
2013 * buffers. Therefore, we reject triggers that require that
2014 * mechanism to be available to be evaluated.
2016 ret
= kernel_supports_ring_buffer_snapshot_sample_positions();
2026 /* Must be called with RCU read lock held. */
2028 int bind_trigger_to_matching_session(const struct lttng_trigger
*trigger
,
2029 struct notification_thread_state
*state
)
2032 const struct lttng_condition
*condition
;
2033 const char *session_name
;
2034 struct lttng_session_trigger_list
*trigger_list
;
2036 condition
= lttng_trigger_get_const_condition(trigger
);
2037 switch (lttng_condition_get_type(condition
)) {
2038 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING
:
2039 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED
:
2041 enum lttng_condition_status status
;
2043 status
= lttng_condition_session_rotation_get_session_name(
2044 condition
, &session_name
);
2045 if (status
!= LTTNG_CONDITION_STATUS_OK
) {
2046 ERR("[notification-thread] Failed to bind trigger to session: unable to get 'session_rotation' condition's session name");
2057 trigger_list
= get_session_trigger_list(state
, session_name
);
2058 if (!trigger_list
) {
2059 DBG("[notification-thread] Unable to bind trigger applying to session \"%s\" as it is not yet known to the notification system",
2065 DBG("[notification-thread] Newly registered trigger bound to session \"%s\"",
2067 ret
= lttng_session_trigger_list_add(trigger_list
, trigger
);
2072 /* Must be called with RCU read lock held. */
2074 int bind_trigger_to_matching_channels(const struct lttng_trigger
*trigger
,
2075 struct notification_thread_state
*state
)
2078 struct cds_lfht_node
*node
;
2079 struct cds_lfht_iter iter
;
2080 struct channel_info
*channel
;
2082 cds_lfht_for_each_entry(state
->channels_ht
, &iter
, channel
,
2084 struct lttng_trigger_list_element
*trigger_list_element
;
2085 struct lttng_channel_trigger_list
*trigger_list
;
2086 struct cds_lfht_iter lookup_iter
;
2088 if (!trigger_applies_to_channel(trigger
, channel
)) {
2092 cds_lfht_lookup(state
->channel_triggers_ht
,
2093 hash_channel_key(&channel
->key
),
2094 match_channel_trigger_list
,
2097 node
= cds_lfht_iter_get_node(&lookup_iter
);
2099 trigger_list
= caa_container_of(node
,
2100 struct lttng_channel_trigger_list
,
2101 channel_triggers_ht_node
);
2103 trigger_list_element
= zmalloc(sizeof(*trigger_list_element
));
2104 if (!trigger_list_element
) {
2108 CDS_INIT_LIST_HEAD(&trigger_list_element
->node
);
2109 trigger_list_element
->trigger
= trigger
;
2110 cds_list_add(&trigger_list_element
->node
, &trigger_list
->list
);
2111 DBG("[notification-thread] Newly registered trigger bound to channel \"%s\"",
2119 * FIXME A client's credentials are not checked when registering a trigger, nor
2120 * are they stored alongside with the trigger.
2122 * The effects of this are benign since:
2123 * - The client will succeed in registering the trigger, as it is valid,
2124 * - The trigger will, internally, be bound to the channel/session,
2125 * - The notifications will not be sent since the client's credentials
2126 * are checked against the channel at that moment.
2128 * If this function returns a non-zero value, it means something is
2129 * fundamentally broken and the whole subsystem/thread will be torn down.
2131 * If a non-fatal error occurs, just set the cmd_result to the appropriate
2135 int handle_notification_thread_command_register_trigger(
2136 struct notification_thread_state
*state
,
2137 struct lttng_trigger
*trigger
,
2138 enum lttng_error_code
*cmd_result
)
2141 struct lttng_condition
*condition
;
2142 struct notification_client
*client
;
2143 struct notification_client_list
*client_list
= NULL
;
2144 struct lttng_trigger_ht_element
*trigger_ht_element
= NULL
;
2145 struct notification_client_list_element
*client_list_element
, *tmp
;
2146 struct cds_lfht_node
*node
;
2147 struct cds_lfht_iter iter
;
2148 bool free_trigger
= true;
2152 condition
= lttng_trigger_get_condition(trigger
);
2155 ret
= condition_is_supported(condition
);
2158 } else if (ret
== 0) {
2159 *cmd_result
= LTTNG_ERR_NOT_SUPPORTED
;
2162 /* Feature is supported, continue. */
2166 trigger_ht_element
= zmalloc(sizeof(*trigger_ht_element
));
2167 if (!trigger_ht_element
) {
2172 /* Add trigger to the trigger_ht. */
2173 cds_lfht_node_init(&trigger_ht_element
->node
);
2174 trigger_ht_element
->trigger
= trigger
;
2176 node
= cds_lfht_add_unique(state
->triggers_ht
,
2177 lttng_condition_hash(condition
),
2180 &trigger_ht_element
->node
);
2181 if (node
!= &trigger_ht_element
->node
) {
2182 /* Not a fatal error, simply report it to the client. */
2183 *cmd_result
= LTTNG_ERR_TRIGGER_EXISTS
;
2184 goto error_free_ht_element
;
2188 * Ownership of the trigger and of its wrapper was transfered to
2191 trigger_ht_element
= NULL
;
2192 free_trigger
= false;
2195 * The rest only applies to triggers that have a "notify" action.
2196 * It is not skipped as this is the only action type currently
2199 client_list
= notification_client_list_create(trigger
);
2202 goto error_free_ht_element
;
2205 /* Build a list of clients to which this new trigger applies. */
2206 cds_lfht_for_each_entry(state
->client_socket_ht
, &iter
, client
,
2207 client_socket_ht_node
) {
2208 if (!trigger_applies_to_client(trigger
, client
)) {
2212 client_list_element
= zmalloc(sizeof(*client_list_element
));
2213 if (!client_list_element
) {
2215 goto error_put_client_list
;
2217 CDS_INIT_LIST_HEAD(&client_list_element
->node
);
2218 client_list_element
->client
= client
;
2219 cds_list_add(&client_list_element
->node
, &client_list
->list
);
2222 switch (get_condition_binding_object(condition
)) {
2223 case LTTNG_OBJECT_TYPE_SESSION
:
2224 /* Add the trigger to the list if it matches a known session. */
2225 ret
= bind_trigger_to_matching_session(trigger
, state
);
2227 goto error_put_client_list
;
2230 case LTTNG_OBJECT_TYPE_CHANNEL
:
2232 * Add the trigger to list of triggers bound to the channels
2235 ret
= bind_trigger_to_matching_channels(trigger
, state
);
2237 goto error_put_client_list
;
2240 case LTTNG_OBJECT_TYPE_NONE
:
2243 ERR("[notification-thread] Unknown object type on which to bind a newly registered trigger was encountered");
2245 goto error_put_client_list
;
2249 * Since there is nothing preventing clients from subscribing to a
2250 * condition before the corresponding trigger is registered, we have
2251 * to evaluate this new condition right away.
2253 * At some point, we were waiting for the next "evaluation" (e.g. on
2254 * reception of a channel sample) to evaluate this new condition, but
2257 * The reason it was broken is that waiting for the next sample
2258 * does not allow us to properly handle transitions for edge-triggered
2261 * Consider this example: when we handle a new channel sample, we
2262 * evaluate each conditions twice: once with the previous state, and
2263 * again with the newest state. We then use those two results to
2264 * determine whether a state change happened: a condition was false and
2265 * became true. If a state change happened, we have to notify clients.
2267 * Now, if a client subscribes to a given notification and registers
2268 * a trigger *after* that subscription, we have to make sure the
2269 * condition is evaluated at this point while considering only the
2270 * current state. Otherwise, the next evaluation cycle may only see
2271 * that the evaluations remain the same (true for samples n-1 and n) and
2272 * the client will never know that the condition has been met.
2274 * No need to lock the list here as it has not been published yet.
2276 cds_list_for_each_entry_safe(client_list_element
, tmp
,
2277 &client_list
->list
, node
) {
2278 ret
= evaluate_condition_for_client(trigger
, condition
,
2279 client_list_element
->client
, state
);
2281 goto error_put_client_list
;
2286 * Client list ownership transferred to the
2287 * notification_trigger_clients_ht.
2289 publish_notification_client_list(state
, client_list
);
2292 *cmd_result
= LTTNG_OK
;
2294 error_put_client_list
:
2295 notification_client_list_put(client_list
);
2297 error_free_ht_element
:
2298 free(trigger_ht_element
);
2301 lttng_trigger_destroy(trigger
);
2308 void free_lttng_trigger_ht_element_rcu(struct rcu_head
*node
)
2310 free(caa_container_of(node
, struct lttng_trigger_ht_element
,
2315 int handle_notification_thread_command_unregister_trigger(
2316 struct notification_thread_state
*state
,
2317 struct lttng_trigger
*trigger
,
2318 enum lttng_error_code
*_cmd_reply
)
2320 struct cds_lfht_iter iter
;
2321 struct cds_lfht_node
*triggers_ht_node
;
2322 struct lttng_channel_trigger_list
*trigger_list
;
2323 struct notification_client_list
*client_list
;
2324 struct lttng_trigger_ht_element
*trigger_ht_element
= NULL
;
2325 struct lttng_condition
*condition
= lttng_trigger_get_condition(
2327 enum lttng_error_code cmd_reply
;
2331 cds_lfht_lookup(state
->triggers_ht
,
2332 lttng_condition_hash(condition
),
2336 triggers_ht_node
= cds_lfht_iter_get_node(&iter
);
2337 if (!triggers_ht_node
) {
2338 cmd_reply
= LTTNG_ERR_TRIGGER_NOT_FOUND
;
2341 cmd_reply
= LTTNG_OK
;
2344 /* Remove trigger from channel_triggers_ht. */
2345 cds_lfht_for_each_entry(state
->channel_triggers_ht
, &iter
, trigger_list
,
2346 channel_triggers_ht_node
) {
2347 struct lttng_trigger_list_element
*trigger_element
, *tmp
;
2349 cds_list_for_each_entry_safe(trigger_element
, tmp
,
2350 &trigger_list
->list
, node
) {
2351 const struct lttng_condition
*current_condition
=
2352 lttng_trigger_get_const_condition(
2353 trigger_element
->trigger
);
2355 assert(current_condition
);
2356 if (!lttng_condition_is_equal(condition
,
2357 current_condition
)) {
2361 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
2362 cds_list_del(&trigger_element
->node
);
2363 /* A trigger can only appear once per channel */
2369 * Remove and release the client list from
2370 * notification_trigger_clients_ht.
2372 client_list
= get_client_list_from_condition(state
, condition
);
2373 assert(client_list
);
2375 /* Put new reference and the hashtable's reference. */
2376 notification_client_list_put(client_list
);
2377 notification_client_list_put(client_list
);
2380 /* Remove trigger from triggers_ht. */
2381 trigger_ht_element
= caa_container_of(triggers_ht_node
,
2382 struct lttng_trigger_ht_element
, node
);
2383 cds_lfht_del(state
->triggers_ht
, triggers_ht_node
);
2385 /* Release the ownership of the trigger. */
2386 lttng_trigger_destroy(trigger_ht_element
->trigger
);
2387 call_rcu(&trigger_ht_element
->rcu_node
, free_lttng_trigger_ht_element_rcu
);
2391 *_cmd_reply
= cmd_reply
;
2396 /* Returns 0 on success, 1 on exit requested, negative value on error. */
2397 int handle_notification_thread_command(
2398 struct notification_thread_handle
*handle
,
2399 struct notification_thread_state
*state
)
2403 struct notification_thread_command
*cmd
;
2405 /* Read the event pipe to put it back into a quiescent state. */
2406 ret
= lttng_read(lttng_pipe_get_readfd(handle
->cmd_queue
.event_pipe
), &counter
,
2408 if (ret
!= sizeof(counter
)) {
2412 pthread_mutex_lock(&handle
->cmd_queue
.lock
);
2413 cmd
= cds_list_first_entry(&handle
->cmd_queue
.list
,
2414 struct notification_thread_command
, cmd_list_node
);
2415 switch (cmd
->type
) {
2416 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER
:
2417 DBG("[notification-thread] Received register trigger command");
2418 ret
= handle_notification_thread_command_register_trigger(
2419 state
, cmd
->parameters
.trigger
,
2422 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER
:
2423 DBG("[notification-thread] Received unregister trigger command");
2424 ret
= handle_notification_thread_command_unregister_trigger(
2425 state
, cmd
->parameters
.trigger
,
2428 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL
:
2429 DBG("[notification-thread] Received add channel command");
2430 ret
= handle_notification_thread_command_add_channel(
2432 cmd
->parameters
.add_channel
.session
.name
,
2433 cmd
->parameters
.add_channel
.session
.uid
,
2434 cmd
->parameters
.add_channel
.session
.gid
,
2435 cmd
->parameters
.add_channel
.channel
.name
,
2436 cmd
->parameters
.add_channel
.channel
.domain
,
2437 cmd
->parameters
.add_channel
.channel
.key
,
2438 cmd
->parameters
.add_channel
.channel
.capacity
,
2441 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL
:
2442 DBG("[notification-thread] Received remove channel command");
2443 ret
= handle_notification_thread_command_remove_channel(
2444 state
, cmd
->parameters
.remove_channel
.key
,
2445 cmd
->parameters
.remove_channel
.domain
,
2448 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING
:
2449 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED
:
2450 DBG("[notification-thread] Received session rotation %s command",
2451 cmd
->type
== NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING
?
2452 "ongoing" : "completed");
2453 ret
= handle_notification_thread_command_session_rotation(
2456 cmd
->parameters
.session_rotation
.session_name
,
2457 cmd
->parameters
.session_rotation
.uid
,
2458 cmd
->parameters
.session_rotation
.gid
,
2459 cmd
->parameters
.session_rotation
.trace_archive_chunk_id
,
2460 cmd
->parameters
.session_rotation
.location
,
2463 case NOTIFICATION_COMMAND_TYPE_QUIT
:
2464 DBG("[notification-thread] Received quit command");
2465 cmd
->reply_code
= LTTNG_OK
;
2469 ERR("[notification-thread] Unknown internal command received");
2477 cds_list_del(&cmd
->cmd_list_node
);
2478 lttng_waiter_wake_up(&cmd
->reply_waiter
);
2479 pthread_mutex_unlock(&handle
->cmd_queue
.lock
);
2482 /* Wake-up and return a fatal error to the calling thread. */
2483 lttng_waiter_wake_up(&cmd
->reply_waiter
);
2484 pthread_mutex_unlock(&handle
->cmd_queue
.lock
);
2485 cmd
->reply_code
= LTTNG_ERR_FATAL
;
2487 /* Indicate a fatal error to the caller. */
/*
 * Add O_NONBLOCK to the file status flags of `socket`, preserving the flags
 * that are already set.
 *
 * Returns -1 if either fcntl() call fails, otherwise the value returned by
 * the F_SETFL call.
 */
int socket_set_non_blocking(int socket)
{
	int current_flags;
	int set_status;

	/* Fetch the current flags so that only O_NONBLOCK is added to them. */
	current_flags = fcntl(socket, F_GETFL, 0);
	if (current_flags == -1) {
		PERROR("fcntl get socket flags");
		return -1;
	}

	set_status = fcntl(socket, F_SETFL, current_flags | O_NONBLOCK);
	if (set_status == -1) {
		PERROR("fcntl set O_NONBLOCK socket flag");
		return -1;
	}

	DBG("Client socket (fd = %i) set as non-blocking", socket);
	return set_status;
}
2514 /* Client lock must be acquired by caller. */
2516 int client_reset_inbound_state(struct notification_client
*client
)
2520 ASSERT_LOCKED(client
->lock
);
2522 ret
= lttng_dynamic_buffer_set_size(
2523 &client
->communication
.inbound
.buffer
, 0);
2526 client
->communication
.inbound
.bytes_to_receive
=
2527 sizeof(struct lttng_notification_channel_message
);
2528 client
->communication
.inbound
.msg_type
=
2529 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN
;
2530 LTTNG_SOCK_SET_UID_CRED(&client
->communication
.inbound
.creds
, -1);
2531 LTTNG_SOCK_SET_GID_CRED(&client
->communication
.inbound
.creds
, -1);
2532 ret
= lttng_dynamic_buffer_set_size(
2533 &client
->communication
.inbound
.buffer
,
2534 client
->communication
.inbound
.bytes_to_receive
);
2538 int handle_notification_thread_client_connect(
2539 struct notification_thread_state
*state
)
2542 struct notification_client
*client
;
2544 DBG("[notification-thread] Handling new notification channel client connection");
2546 client
= zmalloc(sizeof(*client
));
2552 pthread_mutex_init(&client
->lock
, NULL
);
2553 client
->id
= state
->next_notification_client_id
++;
2554 CDS_INIT_LIST_HEAD(&client
->condition_list
);
2555 lttng_dynamic_buffer_init(&client
->communication
.inbound
.buffer
);
2556 lttng_dynamic_buffer_init(&client
->communication
.outbound
.buffer
);
2557 client
->communication
.inbound
.expect_creds
= true;
2559 pthread_mutex_lock(&client
->lock
);
2560 ret
= client_reset_inbound_state(client
);
2561 pthread_mutex_unlock(&client
->lock
);
2563 ERR("[notification-thread] Failed to reset client communication's inbound state");
2568 ret
= lttcomm_accept_unix_sock(state
->notification_channel_socket
);
2570 ERR("[notification-thread] Failed to accept new notification channel client connection");
2575 client
->socket
= ret
;
2577 ret
= socket_set_non_blocking(client
->socket
);
2579 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
2583 ret
= lttcomm_setsockopt_creds_unix_sock(client
->socket
);
2585 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
2590 ret
= lttng_poll_add(&state
->events
, client
->socket
,
2591 LPOLLIN
| LPOLLERR
|
2592 LPOLLHUP
| LPOLLRDHUP
);
2594 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
2598 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
2602 cds_lfht_add(state
->client_socket_ht
,
2603 hash_client_socket(client
->socket
),
2604 &client
->client_socket_ht_node
);
2605 cds_lfht_add(state
->client_id_ht
,
2606 hash_client_id(client
->id
),
2607 &client
->client_id_ht_node
);
2612 notification_client_destroy(client
, state
);
2616 /* RCU read-lock must be held by the caller. */
2617 /* Client lock must be held by the caller */
2619 int notification_thread_client_disconnect(
2620 struct notification_client
*client
,
2621 struct notification_thread_state
*state
)
2624 struct lttng_condition_list_element
*condition_list_element
, *tmp
;
2626 /* Acquire the client lock to disable its communication atomically. */
2627 client
->communication
.active
= false;
2628 ret
= lttng_poll_del(&state
->events
, client
->socket
);
2630 ERR("[notification-thread] Failed to remove client socket %d from poll set",
2634 cds_lfht_del(state
->client_socket_ht
, &client
->client_socket_ht_node
);
2635 cds_lfht_del(state
->client_id_ht
, &client
->client_id_ht_node
);
2637 /* Release all conditions to which the client was subscribed. */
2638 cds_list_for_each_entry_safe(condition_list_element
, tmp
,
2639 &client
->condition_list
, node
) {
2640 (void) notification_thread_client_unsubscribe(client
,
2641 condition_list_element
->condition
, state
, NULL
);
2645 * Client no longer accessible to other threads (through the
2648 notification_client_destroy(client
, state
);
2652 int handle_notification_thread_client_disconnect(
2653 int client_socket
, struct notification_thread_state
*state
)
2656 struct notification_client
*client
;
2659 DBG("[notification-thread] Closing client connection (socket fd = %i)",
2661 client
= get_client_from_socket(client_socket
, state
);
2663 /* Internal state corruption, fatal error. */
2664 ERR("[notification-thread] Unable to find client (socket fd = %i)",
2670 pthread_mutex_lock(&client
->lock
);
2671 ret
= notification_thread_client_disconnect(client
, state
);
2672 pthread_mutex_unlock(&client
->lock
);
2678 int handle_notification_thread_client_disconnect_all(
2679 struct notification_thread_state
*state
)
2681 struct cds_lfht_iter iter
;
2682 struct notification_client
*client
;
2683 bool error_encoutered
= false;
2686 DBG("[notification-thread] Closing all client connections");
2687 cds_lfht_for_each_entry(state
->client_socket_ht
, &iter
, client
,
2688 client_socket_ht_node
) {
2691 pthread_mutex_lock(&client
->lock
);
2692 ret
= notification_thread_client_disconnect(
2694 pthread_mutex_unlock(&client
->lock
);
2696 error_encoutered
= true;
2700 return error_encoutered
? 1 : 0;
2703 int handle_notification_thread_trigger_unregister_all(
2704 struct notification_thread_state
*state
)
2706 bool error_occurred
= false;
2707 struct cds_lfht_iter iter
;
2708 struct lttng_trigger_ht_element
*trigger_ht_element
;
2711 cds_lfht_for_each_entry(state
->triggers_ht
, &iter
, trigger_ht_element
,
2713 int ret
= handle_notification_thread_command_unregister_trigger(
2714 state
, trigger_ht_element
->trigger
, NULL
);
2716 error_occurred
= true;
2720 return error_occurred
? -1 : 0;
2724 int client_handle_transmission_status(
2725 struct notification_client
*client
,
2726 enum client_transmission_status transmission_status
,
2727 struct notification_thread_state
*state
)
2731 switch (transmission_status
) {
2732 case CLIENT_TRANSMISSION_STATUS_COMPLETE
:
2733 ret
= lttng_poll_mod(&state
->events
, client
->socket
,
2734 CLIENT_POLL_MASK_IN
);
2739 client
->communication
.outbound
.queued_command_reply
= false;
2740 client
->communication
.outbound
.dropped_notification
= false;
2742 case CLIENT_TRANSMISSION_STATUS_QUEUED
:
2744 * We want to be notified whenever there is buffer space
2745 * available to send the rest of the payload.
2747 ret
= lttng_poll_mod(&state
->events
, client
->socket
,
2748 CLIENT_POLL_MASK_IN_OUT
);
2753 case CLIENT_TRANSMISSION_STATUS_FAIL
:
2754 ret
= notification_thread_client_disconnect(client
, state
);
2759 case CLIENT_TRANSMISSION_STATUS_ERROR
:
2769 /* Client lock must be acquired by caller. */
2771 enum client_transmission_status
client_flush_outgoing_queue(
2772 struct notification_client
*client
,
2773 struct notification_thread_state
*state
)
2776 size_t to_send_count
;
2777 enum client_transmission_status status
;
2779 ASSERT_LOCKED(client
->lock
);
2781 assert(client
->communication
.outbound
.buffer
.size
!= 0);
2782 to_send_count
= client
->communication
.outbound
.buffer
.size
;
2783 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
2786 ret
= lttcomm_send_unix_sock_non_block(client
->socket
,
2787 client
->communication
.outbound
.buffer
.data
,
2789 if ((ret
>= 0 && ret
< to_send_count
)) {
2790 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
2792 to_send_count
-= max(ret
, 0);
2794 memcpy(client
->communication
.outbound
.buffer
.data
,
2795 client
->communication
.outbound
.buffer
.data
+
2796 client
->communication
.outbound
.buffer
.size
- to_send_count
,
2798 ret
= lttng_dynamic_buffer_set_size(
2799 &client
->communication
.outbound
.buffer
,
2802 status
= CLIENT_TRANSMISSION_STATUS_ERROR
;
2805 status
= CLIENT_TRANSMISSION_STATUS_QUEUED
;
2806 } else if (ret
< 0) {
2807 /* Generic error, disconnect the client. */
2808 ERR("[notification-thread] Failed to flush outgoing queue, disconnecting client (socket fd = %i)",
2810 status
= CLIENT_TRANSMISSION_STATUS_FAIL
;
2812 /* No error and flushed the queue completely. */
2813 ret
= lttng_dynamic_buffer_set_size(
2814 &client
->communication
.outbound
.buffer
, 0);
2816 status
= CLIENT_TRANSMISSION_STATUS_ERROR
;
2819 status
= CLIENT_TRANSMISSION_STATUS_COMPLETE
;
2822 ret
= client_handle_transmission_status(client
, status
, state
);
2832 /* Client lock must be acquired by caller. */
2834 int client_send_command_reply(struct notification_client
*client
,
2835 struct notification_thread_state
*state
,
2836 enum lttng_notification_channel_status status
)
2839 struct lttng_notification_channel_command_reply reply
= {
2840 .status
= (int8_t) status
,
2842 struct lttng_notification_channel_message msg
= {
2843 .type
= (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY
,
2844 .size
= sizeof(reply
),
2846 char buffer
[sizeof(msg
) + sizeof(reply
)];
2848 ASSERT_LOCKED(client
->lock
);
2850 if (client
->communication
.outbound
.queued_command_reply
) {
2851 /* Protocol error. */
2855 memcpy(buffer
, &msg
, sizeof(msg
));
2856 memcpy(buffer
+ sizeof(msg
), &reply
, sizeof(reply
));
2857 DBG("[notification-thread] Send command reply (%i)", (int) status
);
2859 /* Enqueue buffer to outgoing queue and flush it. */
2860 ret
= lttng_dynamic_buffer_append(
2861 &client
->communication
.outbound
.buffer
,
2862 buffer
, sizeof(buffer
));
2867 ret
= client_flush_outgoing_queue(client
, state
);
2872 if (client
->communication
.outbound
.buffer
.size
!= 0) {
2873 /* Queue could not be emptied. */
2874 client
->communication
.outbound
.queued_command_reply
= true;
2883 int client_handle_message_unknown(struct notification_client
*client
,
2884 struct notification_thread_state
*state
)
2888 pthread_mutex_lock(&client
->lock
);
2891 * Receiving message header. The function will be called again
2892 * once the rest of the message as been received and can be
2895 const struct lttng_notification_channel_message
*msg
;
2897 assert(sizeof(*msg
) == client
->communication
.inbound
.buffer
.size
);
2898 msg
= (const struct lttng_notification_channel_message
*)
2899 client
->communication
.inbound
.buffer
.data
;
2901 if (msg
->size
== 0 ||
2902 msg
->size
> DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE
) {
2903 ERR("[notification-thread] Invalid notification channel message: length = %u",
2909 switch (msg
->type
) {
2910 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE
:
2911 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE
:
2912 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
:
2916 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
2920 client
->communication
.inbound
.bytes_to_receive
= msg
->size
;
2921 client
->communication
.inbound
.msg_type
=
2922 (enum lttng_notification_channel_message_type
) msg
->type
;
2923 ret
= lttng_dynamic_buffer_set_size(
2924 &client
->communication
.inbound
.buffer
, msg
->size
);
2926 pthread_mutex_unlock(&client
->lock
);
2931 int client_handle_message_handshake(struct notification_client
*client
,
2932 struct notification_thread_state
*state
)
2935 struct lttng_notification_channel_command_handshake
*handshake_client
;
2936 const struct lttng_notification_channel_command_handshake handshake_reply
= {
2937 .major
= LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR
,
2938 .minor
= LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR
,
2940 const struct lttng_notification_channel_message msg_header
= {
2941 .type
= LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
,
2942 .size
= sizeof(handshake_reply
),
2944 enum lttng_notification_channel_status status
=
2945 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
2946 char send_buffer
[sizeof(msg_header
) + sizeof(handshake_reply
)];
2948 pthread_mutex_lock(&client
->lock
);
2950 memcpy(send_buffer
, &msg_header
, sizeof(msg_header
));
2951 memcpy(send_buffer
+ sizeof(msg_header
), &handshake_reply
,
2952 sizeof(handshake_reply
));
2955 (struct lttng_notification_channel_command_handshake
*)
2956 client
->communication
.inbound
.buffer
2958 client
->major
= handshake_client
->major
;
2959 client
->minor
= handshake_client
->minor
;
2960 if (!client
->communication
.inbound
.creds_received
) {
2961 ERR("[notification-thread] No credentials received from client");
2966 client
->uid
= LTTNG_SOCK_GET_UID_CRED(
2967 &client
->communication
.inbound
.creds
);
2968 client
->gid
= LTTNG_SOCK_GET_GID_CRED(
2969 &client
->communication
.inbound
.creds
);
2970 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
2971 client
->uid
, client
->gid
, (int) client
->major
,
2972 (int) client
->minor
);
2974 if (handshake_client
->major
!=
2975 LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR
) {
2976 status
= LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION
;
2979 ret
= lttng_dynamic_buffer_append(
2980 &client
->communication
.outbound
.buffer
, send_buffer
,
2981 sizeof(send_buffer
));
2983 ERR("[notification-thread] Failed to send protocol version to notification channel client");
2987 client
->validated
= true;
2988 client
->communication
.active
= true;
2990 ret
= client_flush_outgoing_queue(client
, state
);
2995 ret
= client_send_command_reply(client
, state
, status
);
2997 ERR("[notification-thread] Failed to send reply to notification channel client");
3001 /* Set reception state to receive the next message header. */
3002 ret
= client_reset_inbound_state(client
);
3004 ERR("[notification-thread] Failed to reset client communication's inbound state");
3009 pthread_mutex_unlock(&client
->lock
);
3014 int client_handle_message_subscription(
3015 struct notification_client
*client
,
3016 enum lttng_notification_channel_message_type msg_type
,
3017 struct notification_thread_state
*state
)
3020 struct lttng_condition
*condition
;
3021 enum lttng_notification_channel_status status
=
3022 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK
;
3023 struct lttng_payload_view condition_view
=
3024 lttng_payload_view_from_dynamic_buffer(
3025 &client
->communication
.inbound
.buffer
,
3027 size_t expected_condition_size
;
3029 pthread_mutex_lock(&client
->lock
);
3030 expected_condition_size
= client
->communication
.inbound
.buffer
.size
;
3031 pthread_mutex_unlock(&client
->lock
);
3033 ret
= lttng_condition_create_from_payload(&condition_view
, &condition
);
3034 if (ret
!= expected_condition_size
) {
3035 ERR("[notification-thread] Malformed condition received from client");
3039 if (msg_type
== LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE
) {
3040 ret
= notification_thread_client_subscribe(
3041 client
, condition
, state
, &status
);
3043 ret
= notification_thread_client_unsubscribe(
3044 client
, condition
, state
, &status
);
3050 pthread_mutex_lock(&client
->lock
);
3051 ret
= client_send_command_reply(client
, state
, status
);
3053 ERR("[notification-thread] Failed to send reply to notification channel client");
3057 /* Set reception state to receive the next message header. */
3058 ret
= client_reset_inbound_state(client
);
3060 ERR("[notification-thread] Failed to reset client communication's inbound state");
3065 pthread_mutex_unlock(&client
->lock
);
3071 int client_dispatch_message(struct notification_client
*client
,
3072 struct notification_thread_state
*state
)
3076 if (client
->communication
.inbound
.msg_type
!=
3077 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
&&
3078 client
->communication
.inbound
.msg_type
!=
3079 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN
&&
3080 !client
->validated
) {
3081 WARN("[notification-thread] client attempted a command before handshake");
3086 switch (client
->communication
.inbound
.msg_type
) {
3087 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN
:
3089 ret
= client_handle_message_unknown(client
, state
);
3092 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE
:
3094 ret
= client_handle_message_handshake(client
, state
);
3097 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE
:
3098 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE
:
3100 ret
= client_handle_message_subscription(client
,
3101 client
->communication
.inbound
.msg_type
, state
);
3111 /* Incoming data from client. */
3112 int handle_notification_thread_client_in(
3113 struct notification_thread_state
*state
, int socket
)
3116 struct notification_client
*client
;
3119 bool message_is_complete
= false;
3121 client
= get_client_from_socket(socket
, state
);
3123 /* Internal error, abort. */
3128 pthread_mutex_lock(&client
->lock
);
3129 offset
= client
->communication
.inbound
.buffer
.size
-
3130 client
->communication
.inbound
.bytes_to_receive
;
3131 if (client
->communication
.inbound
.expect_creds
) {
3132 recv_ret
= lttcomm_recv_creds_unix_sock(socket
,
3133 client
->communication
.inbound
.buffer
.data
+ offset
,
3134 client
->communication
.inbound
.bytes_to_receive
,
3135 &client
->communication
.inbound
.creds
);
3137 client
->communication
.inbound
.expect_creds
= false;
3138 client
->communication
.inbound
.creds_received
= true;
3141 recv_ret
= lttcomm_recv_unix_sock_non_block(socket
,
3142 client
->communication
.inbound
.buffer
.data
+ offset
,
3143 client
->communication
.inbound
.bytes_to_receive
);
3145 if (recv_ret
>= 0) {
3146 client
->communication
.inbound
.bytes_to_receive
-= recv_ret
;
3147 message_is_complete
= client
->communication
.inbound
3148 .bytes_to_receive
== 0;
3150 pthread_mutex_unlock(&client
->lock
);
3152 goto error_disconnect_client
;
3155 if (message_is_complete
) {
3156 ret
= client_dispatch_message(client
, state
);
3159 * Only returns an error if this client must be
3162 goto error_disconnect_client
;
3167 error_disconnect_client
:
3168 pthread_mutex_lock(&client
->lock
);
3169 ret
= notification_thread_client_disconnect(client
, state
);
3170 pthread_mutex_unlock(&client
->lock
);
3174 /* Client ready to receive outgoing data. */
3175 int handle_notification_thread_client_out(
3176 struct notification_thread_state
*state
, int socket
)
3179 struct notification_client
*client
;
3181 client
= get_client_from_socket(socket
, state
);
3183 /* Internal error, abort. */
3188 pthread_mutex_lock(&client
->lock
);
3189 ret
= client_flush_outgoing_queue(client
, state
);
3190 pthread_mutex_unlock(&client
->lock
);
3199 bool evaluate_buffer_usage_condition(const struct lttng_condition
*condition
,
3200 const struct channel_state_sample
*sample
,
3201 uint64_t buffer_capacity
)
3203 bool result
= false;
3205 enum lttng_condition_type condition_type
;
3206 const struct lttng_condition_buffer_usage
*use_condition
= container_of(
3207 condition
, struct lttng_condition_buffer_usage
,
3210 if (use_condition
->threshold_bytes
.set
) {
3211 threshold
= use_condition
->threshold_bytes
.value
;
3214 * Threshold was expressed as a ratio.
3216 * TODO the threshold (in bytes) of conditions expressed
3217 * as a ratio of total buffer size could be cached to
3218 * forego this double-multiplication or it could be performed
3219 * as fixed-point math.
3221 * Note that caching should accommodates the case where the
3222 * condition applies to multiple channels (i.e. don't assume
3223 * that all channels matching my_chann* have the same size...)
3225 threshold
= (uint64_t) (use_condition
->threshold_ratio
.value
*
3226 (double) buffer_capacity
);
3229 condition_type
= lttng_condition_get_type(condition
);
3230 if (condition_type
== LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
) {
3231 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64
", highest usage = %" PRIu64
,
3232 threshold
, sample
->highest_usage
);
3235 * The low condition should only be triggered once _all_ of the
3236 * streams in a channel have gone below the "low" threshold.
3238 if (sample
->highest_usage
<= threshold
) {
3242 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64
", highest usage = %" PRIu64
,
3243 threshold
, sample
->highest_usage
);
3246 * For high buffer usage scenarios, we want to trigger whenever
3247 * _any_ of the streams has reached the "high" threshold.
3249 if (sample
->highest_usage
>= threshold
) {
3258 bool evaluate_session_consumed_size_condition(
3259 const struct lttng_condition
*condition
,
3260 uint64_t session_consumed_size
)
3263 const struct lttng_condition_session_consumed_size
*size_condition
=
3264 container_of(condition
,
3265 struct lttng_condition_session_consumed_size
,
3268 threshold
= size_condition
->consumed_threshold_bytes
.value
;
3269 DBG("[notification-thread] Session consumed size condition being evaluated: threshold = %" PRIu64
", current size = %" PRIu64
,
3270 threshold
, session_consumed_size
);
3271 return session_consumed_size
>= threshold
;
3275 int evaluate_buffer_condition(const struct lttng_condition
*condition
,
3276 struct lttng_evaluation
**evaluation
,
3277 const struct notification_thread_state
*state
,
3278 const struct channel_state_sample
*previous_sample
,
3279 const struct channel_state_sample
*latest_sample
,
3280 uint64_t previous_session_consumed_total
,
3281 uint64_t latest_session_consumed_total
,
3282 struct channel_info
*channel_info
)
3285 enum lttng_condition_type condition_type
;
3286 const bool previous_sample_available
= !!previous_sample
;
3287 bool previous_sample_result
= false;
3288 bool latest_sample_result
;
3290 condition_type
= lttng_condition_get_type(condition
);
3292 switch (condition_type
) {
3293 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
3294 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
3295 if (caa_likely(previous_sample_available
)) {
3296 previous_sample_result
=
3297 evaluate_buffer_usage_condition(condition
,
3298 previous_sample
, channel_info
->capacity
);
3300 latest_sample_result
= evaluate_buffer_usage_condition(
3301 condition
, latest_sample
,
3302 channel_info
->capacity
);
3304 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
:
3305 if (caa_likely(previous_sample_available
)) {
3306 previous_sample_result
=
3307 evaluate_session_consumed_size_condition(
3309 previous_session_consumed_total
);
3311 latest_sample_result
=
3312 evaluate_session_consumed_size_condition(
3314 latest_session_consumed_total
);
3317 /* Unknown condition type; internal error. */
3321 if (!latest_sample_result
||
3322 (previous_sample_result
== latest_sample_result
)) {
3324 * Only trigger on a condition evaluation transition.
3326 * NOTE: This edge-triggered logic may not be appropriate for
3327 * future condition types.
3332 if (!evaluation
|| !latest_sample_result
) {
3336 switch (condition_type
) {
3337 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW
:
3338 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH
:
3339 *evaluation
= lttng_evaluation_buffer_usage_create(
3341 latest_sample
->highest_usage
,
3342 channel_info
->capacity
);
3344 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE
:
3345 *evaluation
= lttng_evaluation_session_consumed_size_create(
3346 latest_session_consumed_total
);
3361 int client_enqueue_dropped_notification(struct notification_client
*client
)
3364 struct lttng_notification_channel_message msg
= {
3365 .type
= (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED
,
3369 ASSERT_LOCKED(client
->lock
);
3371 ret
= lttng_dynamic_buffer_append(
3372 &client
->communication
.outbound
.buffer
, &msg
,
3378 * Permission checks relative to notification channel clients are performed
3379 * here. Notice how object, client, and trigger credentials are involved in
3382 * The `object` credentials are the credentials associated with the "subject"
3383 * of a condition. For instance, a `rotation completed` condition applies
3384 * to a session. When that condition is met, it will produce an evaluation
3385 * against a session. Hence, in this case, the `object` credentials are the
3386 * credentials of the "subject" session.
3388 * The `trigger` credentials are the credentials of the user that registered the
3391 * The `client` credentials are the credentials of the user that created a given
3392 * notification channel.
3394 * In terms of visibility, it is expected that non-privilieged users can only
3395 * register triggers against "their" objects (their own sessions and
3396 * applications they are allowed to interact with). They can then open a
3397 * notification channel and subscribe to notifications associated with those
3400 * As for privilieged users, they can register triggers against the objects of
3401 * other users. They can then subscribe to the notifications associated to their
3402 * triggers. Privilieged users _can't_ subscribe to the notifications of
3403 * triggers owned by other users; they must create their own triggers.
3405 * This is more a concern of usability than security. It would be difficult for
3406 * a root user reliably subscribe to a specific set of conditions without
3407 * interference from external users (those could, for instance, unregister
3411 int send_evaluation_to_clients(const struct lttng_trigger
*trigger
,
3412 const struct lttng_evaluation
*evaluation
,
3413 struct notification_client_list
* client_list
,
3414 struct notification_thread_state
*state
,
3415 uid_t object_uid
, gid_t object_gid
)
3418 struct lttng_payload msg_payload
;
3419 struct notification_client_list_element
*client_list_element
, *tmp
;
3420 const struct lttng_notification notification
= {
3421 .condition
= (struct lttng_condition
*) lttng_trigger_get_const_condition(trigger
),
3422 .evaluation
= (struct lttng_evaluation
*) evaluation
,
3424 struct lttng_notification_channel_message msg_header
= {
3425 .type
= (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION
,
3427 const struct lttng_credentials
*trigger_creds
= lttng_trigger_get_credentials(trigger
);
3429 lttng_payload_init(&msg_payload
);
3431 ret
= lttng_dynamic_buffer_append(&msg_payload
.buffer
, &msg_header
,
3432 sizeof(msg_header
));
3437 ret
= lttng_notification_serialize(¬ification
, &msg_payload
);
3439 ERR("[notification-thread] Failed to serialize notification");
3444 /* Update payload size. */
3445 ((struct lttng_notification_channel_message
*) msg_payload
.buffer
.data
)
3446 ->size
= (uint32_t)(
3447 msg_payload
.buffer
.size
- sizeof(msg_header
));
3449 pthread_mutex_lock(&client_list
->lock
);
3450 cds_list_for_each_entry_safe(client_list_element
, tmp
,
3451 &client_list
->list
, node
) {
3452 struct notification_client
*client
=
3453 client_list_element
->client
;
3456 pthread_mutex_lock(&client
->lock
);
3457 if (client
->uid
!= object_uid
&& client
->gid
!= object_gid
&&
3459 /* Client is not allowed to monitor this channel. */
3460 DBG("[notification-thread] Skipping client at it does not have the object permission to receive notification for this trigger");
3464 if (client
->uid
!= trigger_creds
->uid
&& client
->gid
!= trigger_creds
->gid
) {
3465 DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this trigger");
3469 DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
3470 client
->socket
, msg_payload
.buffer
.size
);
3471 if (client
->communication
.outbound
.buffer
.size
) {
3473 * Outgoing data is already buffered for this client;
3474 * drop the notification and enqueue a "dropped
3475 * notification" message if this is the first dropped
3476 * notification since the socket spilled-over to the
3479 DBG("[notification-thread] Dropping notification addressed to client (socket fd = %i)",
3481 if (!client
->communication
.outbound
.dropped_notification
) {
3482 client
->communication
.outbound
.dropped_notification
= true;
3483 ret
= client_enqueue_dropped_notification(
3492 ret
= lttng_dynamic_buffer_append_buffer(
3493 &client
->communication
.outbound
.buffer
,
3494 &msg_payload
.buffer
);
3499 ret
= client_flush_outgoing_queue(client
, state
);
3504 pthread_mutex_unlock(&client
->lock
);
3506 goto end_unlock_list
;
3512 pthread_mutex_unlock(&client_list
->lock
);
3514 lttng_payload_reset(&msg_payload
);
3518 int handle_notification_thread_channel_sample(
3519 struct notification_thread_state
*state
, int pipe
,
3520 enum lttng_domain_type domain
)
3523 struct lttcomm_consumer_channel_monitor_msg sample_msg
;
3524 struct channel_info
*channel_info
;
3525 struct cds_lfht_node
*node
;
3526 struct cds_lfht_iter iter
;
3527 struct lttng_channel_trigger_list
*trigger_list
;
3528 struct lttng_trigger_list_element
*trigger_list_element
;
3529 bool previous_sample_available
= false;
3530 struct channel_state_sample previous_sample
, latest_sample
;
3531 uint64_t previous_session_consumed_total
, latest_session_consumed_total
;
3534 * The monitoring pipe only holds messages smaller than PIPE_BUF,
3535 * ensuring that read/write of sampling messages are atomic.
3537 ret
= lttng_read(pipe
, &sample_msg
, sizeof(sample_msg
));
3538 if (ret
!= sizeof(sample_msg
)) {
3539 ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
3546 latest_sample
.key
.key
= sample_msg
.key
;
3547 latest_sample
.key
.domain
= domain
;
3548 latest_sample
.highest_usage
= sample_msg
.highest
;
3549 latest_sample
.lowest_usage
= sample_msg
.lowest
;
3550 latest_sample
.channel_total_consumed
= sample_msg
.total_consumed
;
3554 /* Retrieve the channel's information. */
3555 cds_lfht_lookup(state
->channels_ht
,
3556 hash_channel_key(&latest_sample
.key
),
3560 node
= cds_lfht_iter_get_node(&iter
);
3561 if (caa_unlikely(!node
)) {
3563 * Not an error since the consumer can push a sample to the pipe
3564 * and the rest of the session daemon could notify us of the
3565 * channel's destruction before we get a chance to process that
3568 DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64
" in %s domain",
3569 latest_sample
.key
.key
,
3570 domain
== LTTNG_DOMAIN_KERNEL
? "kernel" :
3574 channel_info
= caa_container_of(node
, struct channel_info
,
3576 DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64
") in session %s (highest usage = %" PRIu64
", lowest usage = %" PRIu64
", total consumed = %" PRIu64
")",
3578 latest_sample
.key
.key
,
3579 channel_info
->session_info
->name
,
3580 latest_sample
.highest_usage
,
3581 latest_sample
.lowest_usage
,
3582 latest_sample
.channel_total_consumed
);
3584 previous_session_consumed_total
=
3585 channel_info
->session_info
->consumed_data_size
;
3587 /* Retrieve the channel's last sample, if it exists, and update it. */
3588 cds_lfht_lookup(state
->channel_state_ht
,
3589 hash_channel_key(&latest_sample
.key
),
3590 match_channel_state_sample
,
3593 node
= cds_lfht_iter_get_node(&iter
);
3594 if (caa_likely(node
)) {
3595 struct channel_state_sample
*stored_sample
;
3597 /* Update the sample stored. */
3598 stored_sample
= caa_container_of(node
,
3599 struct channel_state_sample
,
3600 channel_state_ht_node
);
3602 memcpy(&previous_sample
, stored_sample
,
3603 sizeof(previous_sample
));
3604 stored_sample
->highest_usage
= latest_sample
.highest_usage
;
3605 stored_sample
->lowest_usage
= latest_sample
.lowest_usage
;
3606 stored_sample
->channel_total_consumed
= latest_sample
.channel_total_consumed
;
3607 previous_sample_available
= true;
3609 latest_session_consumed_total
=
3610 previous_session_consumed_total
+
3611 (latest_sample
.channel_total_consumed
- previous_sample
.channel_total_consumed
);
3614 * This is the channel's first sample, allocate space for and
3615 * store the new sample.
3617 struct channel_state_sample
*stored_sample
;
3619 stored_sample
= zmalloc(sizeof(*stored_sample
));
3620 if (!stored_sample
) {
3625 memcpy(stored_sample
, &latest_sample
, sizeof(*stored_sample
));
3626 cds_lfht_node_init(&stored_sample
->channel_state_ht_node
);
3627 cds_lfht_add(state
->channel_state_ht
,
3628 hash_channel_key(&stored_sample
->key
),
3629 &stored_sample
->channel_state_ht_node
);
3631 latest_session_consumed_total
=
3632 previous_session_consumed_total
+
3633 latest_sample
.channel_total_consumed
;
3636 channel_info
->session_info
->consumed_data_size
=
3637 latest_session_consumed_total
;
3639 /* Find triggers associated with this channel. */
3640 cds_lfht_lookup(state
->channel_triggers_ht
,
3641 hash_channel_key(&latest_sample
.key
),
3642 match_channel_trigger_list
,
3645 node
= cds_lfht_iter_get_node(&iter
);
3646 if (caa_likely(!node
)) {
3650 trigger_list
= caa_container_of(node
, struct lttng_channel_trigger_list
,
3651 channel_triggers_ht_node
);
3652 cds_list_for_each_entry(trigger_list_element
, &trigger_list
->list
,
3654 const struct lttng_condition
*condition
;
3655 const struct lttng_action
*action
;
3656 const struct lttng_trigger
*trigger
;
3657 struct notification_client_list
*client_list
= NULL
;
3658 struct lttng_evaluation
*evaluation
= NULL
;
3659 bool client_list_is_empty
;
3662 trigger
= trigger_list_element
->trigger
;
3663 condition
= lttng_trigger_get_const_condition(trigger
);
3665 action
= lttng_trigger_get_const_action(trigger
);
3667 /* Notify actions are the only type currently supported. */
3668 assert(lttng_action_get_type_const(action
) ==
3669 LTTNG_ACTION_TYPE_NOTIFY
);
3672 * Check if any client is subscribed to the result of this
3675 client_list
= get_client_list_from_condition(state
, condition
);
3676 assert(client_list
);
3677 client_list_is_empty
= cds_list_empty(&client_list
->list
);
3678 if (client_list_is_empty
) {
3680 * No clients interested in the evaluation's result,
3686 ret
= evaluate_buffer_condition(condition
, &evaluation
, state
,
3687 previous_sample_available
? &previous_sample
: NULL
,
3689 previous_session_consumed_total
,
3690 latest_session_consumed_total
,
3692 if (caa_unlikely(ret
)) {
3696 if (caa_likely(!evaluation
)) {
3700 /* Dispatch evaluation result to all clients. */
3701 ret
= send_evaluation_to_clients(trigger_list_element
->trigger
,
3702 evaluation
, client_list
, state
,
3703 channel_info
->session_info
->uid
,
3704 channel_info
->session_info
->gid
);
3705 lttng_evaluation_destroy(evaluation
);
3707 notification_client_list_put(client_list
);
3708 if (caa_unlikely(ret
)) {