sessiond: notification: refactor: split transmission and poll update
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.c
1 /*
2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * SPDX-License-Identifier: GPL-2.0-only
5 *
6 */
7
8 #define _LGPL_SOURCE
9 #include <urcu.h>
10 #include <urcu/rculfhash.h>
11
12 #include <common/defaults.h>
13 #include <common/error.h>
14 #include <common/futex.h>
15 #include <common/unix.h>
16 #include <common/dynamic-buffer.h>
17 #include <common/hashtable/utils.h>
18 #include <common/sessiond-comm/sessiond-comm.h>
19 #include <common/macros.h>
20 #include <lttng/condition/condition.h>
21 #include <lttng/action/action-internal.h>
22 #include <lttng/notification/notification-internal.h>
23 #include <lttng/condition/condition-internal.h>
24 #include <lttng/condition/buffer-usage-internal.h>
25 #include <lttng/condition/session-consumed-size-internal.h>
26 #include <lttng/condition/session-rotation-internal.h>
27 #include <lttng/notification/channel-internal.h>
28
29 #include <time.h>
30 #include <unistd.h>
31 #include <assert.h>
32 #include <inttypes.h>
33 #include <fcntl.h>
34
35 #include "notification-thread.h"
36 #include "notification-thread-events.h"
37 #include "notification-thread-commands.h"
38 #include "lttng-sessiond.h"
39 #include "kernel.h"
40
41 #define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
42 #define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
43
/*
 * Kind of object to which a condition is bound; see
 * get_condition_binding_object().
 */
enum lttng_object_type {
	/* Returned for condition types that are not recognized. */
	LTTNG_OBJECT_TYPE_UNKNOWN,
	LTTNG_OBJECT_TYPE_NONE,
	LTTNG_OBJECT_TYPE_CHANNEL,
	LTTNG_OBJECT_TYPE_SESSION,
};
50
/* Element of a channel or session trigger list. */
struct lttng_trigger_list_element {
	/* No ownership of the trigger object is assumed. */
	const struct lttng_trigger *trigger;
	/* Node in the owning trigger list. */
	struct cds_list_head node;
};
56
/*
 * List of triggers associated with a channel; the channel is identified
 * by its key (see match_channel_trigger_list()).
 */
struct lttng_channel_trigger_list {
	/* Key and domain of the channel this list refers to. */
	struct channel_key channel_key;
	/* List of struct lttng_trigger_list_element. */
	struct cds_list_head list;
	/* Node in the channel_triggers_ht */
	struct cds_lfht_node channel_triggers_ht_node;
	/* call_rcu delayed reclaim. */
	struct rcu_head rcu_node;
};
66
/*
 * List of triggers applying to a given session.
 *
 * See:
 *   - lttng_session_trigger_list_create()
 *   - lttng_session_trigger_list_build()
 *   - lttng_session_trigger_list_destroy()
 *   - lttng_session_trigger_list_add()
 */
struct lttng_session_trigger_list {
	/*
	 * Not owned by this; points to the session_info structure's
	 * session name.
	 */
	const char *session_name;
	/* List of struct lttng_trigger_list_element. */
	struct cds_list_head list;
	/* Node in the session_triggers_ht */
	struct cds_lfht_node session_triggers_ht_node;
	/*
	 * Weak reference to the notification system's session triggers
	 * hashtable.
	 *
	 * The session trigger list structure is owned by
	 * the session's session_info.
	 *
	 * The session_info is kept alive by the channel_infos holding a
	 * reference to it (reference counting). When those channels are
	 * destroyed (at runtime or on teardown), the references they hold
	 * to the session_info are released. On destruction of session_info,
	 * session_info_destroy() will remove the list of triggers applying
	 * to this session from the notification system's state.
	 *
	 * This implies that the session_triggers_ht must be destroyed
	 * after the channels.
	 */
	struct cds_lfht *session_triggers_ht;
	/* Used for delayed RCU reclaim. */
	struct rcu_head rcu_node;
};
107
/* Hash table element wrapping a trigger; matched by match_condition(). */
struct lttng_trigger_ht_element {
	struct lttng_trigger *trigger;
	/* Hash table node; match_condition() recovers the element from it. */
	struct cds_lfht_node node;
	/* call_rcu delayed reclaim. */
	struct rcu_head rcu_node;
};
114
/* Element of a list of conditions. */
struct lttng_condition_list_element {
	struct lttng_condition *condition;
	/* Node in the list holding this element. */
	struct cds_list_head node;
};
119
/* Element of a notification_client_list's client list. */
struct notification_client_list_element {
	struct notification_client *client;
	/* Node in the notification_client_list's list. */
	struct cds_list_head node;
};
124
/*
 * Thread safety of notification_client and notification_client_list.
 *
 * The notification thread (main thread) and the action executor
 * interact through client lists. Hence, when the action executor
 * thread looks-up the list of clients subscribed to a given
 * condition, it will acquire a reference to the list and lock it
 * while attempting to communicate with the various clients.
 *
 * It is not necessary to reference-count clients as they are guaranteed
 * to be 'alive' if they are present in a list and that list is locked. Indeed,
 * removing references to the client from those subscription lists is part of
 * the work performed on destruction of a client.
 *
 * No provision for other access scenarios is taken into account;
 * this is the bare minimum to make these accesses safe and the
 * notification thread's state is _not_ "thread-safe" in any general
 * sense.
 */
struct notification_client_list {
	/* Held while communicating with the clients of this list. */
	pthread_mutex_t lock;
	/* Reference count; see notification_client_list_get()/_put(). */
	struct urcu_ref ref;
	/* Trigger whose condition identifies this list. */
	const struct lttng_trigger *trigger;
	/* List of struct notification_client_list_element. */
	struct cds_list_head list;
	/* Weak reference to container. */
	struct cds_lfht *notification_trigger_clients_ht;
	/* Node in the notification_trigger_clients_ht. */
	struct cds_lfht_node notification_trigger_clients_ht_node;
	/* call_rcu delayed reclaim. */
	struct rcu_head rcu_node;
};
155
/* State kept for a client connected to the notification channel. */
struct notification_client {
	/* Nests within the notification_client_list lock. */
	pthread_mutex_t lock;
	/* Client identifier; compared by match_client_id(). */
	notification_client_id id;
	/* Client socket; compared by match_client_socket(). */
	int socket;
	/* Client protocol version. */
	uint8_t major, minor;
	uid_t uid;
	gid_t gid;
	/*
	 * Indicates if the credentials and versions of the client have been
	 * checked.
	 */
	bool validated;
	/*
	 * Conditions to which the client's notification channel is subscribed.
	 * List of struct lttng_condition_list_element. The condition member is
	 * owned by the client.
	 */
	struct cds_list_head condition_list;
	/* Hash table node used when looking a client up by socket. */
	struct cds_lfht_node client_socket_ht_node;
	/* Hash table node used when looking a client up by id. */
	struct cds_lfht_node client_id_ht_node;
	struct {
		/*
		 * If a client's communication is inactive, it means that a
		 * fatal error has occurred (could be either a protocol error or
		 * the socket API returned a fatal error). No further
		 * communication should be attempted; the client is queued for
		 * clean-up.
		 */
		bool active;
		struct {
			/*
			 * During the reception of a message, the reception
			 * buffers' "size" is set to contain the current
			 * message's complete payload.
			 */
			struct lttng_dynamic_buffer buffer;
			/* Bytes left to receive for the current message. */
			size_t bytes_to_receive;
			/* Type of the message being received. */
			enum lttng_notification_channel_message_type msg_type;
			/*
			 * Indicates whether or not credentials are expected
			 * from the client.
			 */
			bool expect_creds;
			/*
			 * Indicates whether or not credentials were received
			 * from the client.
			 */
			bool creds_received;
			/* Only used during credentials reception. */
			lttng_sock_cred creds;
		} inbound;
		struct {
			/*
			 * Indicates whether or not a notification addressed to
			 * this client was dropped because a command reply was
			 * already buffered.
			 *
			 * A notification is dropped whenever the buffer is not
			 * empty.
			 */
			bool dropped_notification;
			/*
			 * Indicates whether or not a command reply is already
			 * buffered. In this case, it means that the client is
			 * not consuming command replies before emitting a new
			 * one. This could be caused by a protocol error or a
			 * misbehaving/malicious client.
			 */
			bool queued_command_reply;
			/* Outgoing data that has been buffered for the client. */
			struct lttng_dynamic_buffer buffer;
		} outbound;
	} communication;
	/* call_rcu delayed reclaim. */
	struct rcu_head rcu_node;
};
235
/* Sample of a channel's state, stored per channel key. */
struct channel_state_sample {
	/* Channel to which the sample applies; see match_channel_state_sample(). */
	struct channel_key key;
	/* Hash table node used to look the sample up by channel key. */
	struct cds_lfht_node channel_state_ht_node;
	uint64_t highest_usage;
	uint64_t lowest_usage;
	uint64_t channel_total_consumed;
	/* call_rcu delayed reclaim. */
	struct rcu_head rcu_node;
};
245
/* Forward declarations of file-local helpers. */
static unsigned long hash_channel_key(struct channel_key *key);
static int evaluate_buffer_condition(const struct lttng_condition *condition,
		struct lttng_evaluation **evaluation,
		const struct notification_thread_state *state,
		const struct channel_state_sample *previous_sample,
		const struct channel_state_sample *latest_sample,
		uint64_t previous_session_consumed_total,
		uint64_t latest_session_consumed_total,
		struct channel_info *channel_info);
static
int send_evaluation_to_clients(const struct lttng_trigger *trigger,
		const struct lttng_evaluation *evaluation,
		struct notification_client_list *client_list,
		struct notification_thread_state *state,
		uid_t channel_uid, gid_t channel_gid);


/* session_info API */
/* Release callback of the session_info reference count; never call directly. */
static
void session_info_destroy(void *_data);
static
void session_info_get(struct session_info *session_info);
static
void session_info_put(struct session_info *session_info);
static
struct session_info *session_info_create(const char *name,
		uid_t uid, gid_t gid,
		struct lttng_session_trigger_list *trigger_list,
		struct cds_lfht *sessions_ht);
static
void session_info_add_channel(struct session_info *session_info,
		struct channel_info *channel_info);
static
void session_info_remove_channel(struct session_info *session_info,
		struct channel_info *channel_info);

/* lttng_session_trigger_list API */
static
struct lttng_session_trigger_list *lttng_session_trigger_list_create(
		const char *session_name,
		struct cds_lfht *session_triggers_ht);
static
struct lttng_session_trigger_list *lttng_session_trigger_list_build(
		const struct notification_thread_state *state,
		const char *session_name);
static
void lttng_session_trigger_list_destroy(
		struct lttng_session_trigger_list *list);
static
int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
		const struct lttng_trigger *trigger);
297
298
299 static
300 int match_client_socket(struct cds_lfht_node *node, const void *key)
301 {
302 /* This double-cast is intended to supress pointer-to-cast warning. */
303 const int socket = (int) (intptr_t) key;
304 const struct notification_client *client = caa_container_of(node,
305 struct notification_client, client_socket_ht_node);
306
307 return client->socket == socket;
308 }
309
310 static
311 int match_client_id(struct cds_lfht_node *node, const void *key)
312 {
313 /* This double-cast is intended to supress pointer-to-cast warning. */
314 const notification_client_id id = *((notification_client_id *) key);
315 const struct notification_client *client = caa_container_of(
316 node, struct notification_client, client_id_ht_node);
317
318 return client->id == id;
319 }
320
321 static
322 int match_channel_trigger_list(struct cds_lfht_node *node, const void *key)
323 {
324 struct channel_key *channel_key = (struct channel_key *) key;
325 struct lttng_channel_trigger_list *trigger_list;
326
327 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
328 channel_triggers_ht_node);
329
330 return !!((channel_key->key == trigger_list->channel_key.key) &&
331 (channel_key->domain == trigger_list->channel_key.domain));
332 }
333
334 static
335 int match_session_trigger_list(struct cds_lfht_node *node, const void *key)
336 {
337 const char *session_name = (const char *) key;
338 struct lttng_session_trigger_list *trigger_list;
339
340 trigger_list = caa_container_of(node, struct lttng_session_trigger_list,
341 session_triggers_ht_node);
342
343 return !!(strcmp(trigger_list->session_name, session_name) == 0);
344 }
345
346 static
347 int match_channel_state_sample(struct cds_lfht_node *node, const void *key)
348 {
349 struct channel_key *channel_key = (struct channel_key *) key;
350 struct channel_state_sample *sample;
351
352 sample = caa_container_of(node, struct channel_state_sample,
353 channel_state_ht_node);
354
355 return !!((channel_key->key == sample->key.key) &&
356 (channel_key->domain == sample->key.domain));
357 }
358
359 static
360 int match_channel_info(struct cds_lfht_node *node, const void *key)
361 {
362 struct channel_key *channel_key = (struct channel_key *) key;
363 struct channel_info *channel_info;
364
365 channel_info = caa_container_of(node, struct channel_info,
366 channels_ht_node);
367
368 return !!((channel_key->key == channel_info->key.key) &&
369 (channel_key->domain == channel_info->key.domain));
370 }
371
372 static
373 int match_condition(struct cds_lfht_node *node, const void *key)
374 {
375 struct lttng_condition *condition_key = (struct lttng_condition *) key;
376 struct lttng_trigger_ht_element *trigger;
377 struct lttng_condition *condition;
378
379 trigger = caa_container_of(node, struct lttng_trigger_ht_element,
380 node);
381 condition = lttng_trigger_get_condition(trigger->trigger);
382 assert(condition);
383
384 return !!lttng_condition_is_equal(condition_key, condition);
385 }
386
387 static
388 int match_client_list_condition(struct cds_lfht_node *node, const void *key)
389 {
390 struct lttng_condition *condition_key = (struct lttng_condition *) key;
391 struct notification_client_list *client_list;
392 const struct lttng_condition *condition;
393
394 assert(condition_key);
395
396 client_list = caa_container_of(node, struct notification_client_list,
397 notification_trigger_clients_ht_node);
398 condition = lttng_trigger_get_const_condition(client_list->trigger);
399
400 return !!lttng_condition_is_equal(condition_key, condition);
401 }
402
403 static
404 int match_session(struct cds_lfht_node *node, const void *key)
405 {
406 const char *name = key;
407 struct session_info *session_info = caa_container_of(
408 node, struct session_info, sessions_ht_node);
409
410 return !strcmp(session_info->name, name);
411 }
412
413 static
414 unsigned long lttng_condition_buffer_usage_hash(
415 const struct lttng_condition *_condition)
416 {
417 unsigned long hash;
418 unsigned long condition_type;
419 struct lttng_condition_buffer_usage *condition;
420
421 condition = container_of(_condition,
422 struct lttng_condition_buffer_usage, parent);
423
424 condition_type = (unsigned long) condition->parent.type;
425 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
426 if (condition->session_name) {
427 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
428 }
429 if (condition->channel_name) {
430 hash ^= hash_key_str(condition->channel_name, lttng_ht_seed);
431 }
432 if (condition->domain.set) {
433 hash ^= hash_key_ulong(
434 (void *) condition->domain.type,
435 lttng_ht_seed);
436 }
437 if (condition->threshold_ratio.set) {
438 uint64_t val;
439
440 val = condition->threshold_ratio.value * (double) UINT32_MAX;
441 hash ^= hash_key_u64(&val, lttng_ht_seed);
442 } else if (condition->threshold_bytes.set) {
443 uint64_t val;
444
445 val = condition->threshold_bytes.value;
446 hash ^= hash_key_u64(&val, lttng_ht_seed);
447 }
448 return hash;
449 }
450
451 static
452 unsigned long lttng_condition_session_consumed_size_hash(
453 const struct lttng_condition *_condition)
454 {
455 unsigned long hash;
456 unsigned long condition_type =
457 (unsigned long) LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE;
458 struct lttng_condition_session_consumed_size *condition;
459 uint64_t val;
460
461 condition = container_of(_condition,
462 struct lttng_condition_session_consumed_size, parent);
463
464 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
465 if (condition->session_name) {
466 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
467 }
468 val = condition->consumed_threshold_bytes.value;
469 hash ^= hash_key_u64(&val, lttng_ht_seed);
470 return hash;
471 }
472
473 static
474 unsigned long lttng_condition_session_rotation_hash(
475 const struct lttng_condition *_condition)
476 {
477 unsigned long hash, condition_type;
478 struct lttng_condition_session_rotation *condition;
479
480 condition = container_of(_condition,
481 struct lttng_condition_session_rotation, parent);
482 condition_type = (unsigned long) condition->parent.type;
483 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
484 assert(condition->session_name);
485 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
486 return hash;
487 }
488
489 /*
490 * The lttng_condition hashing code is kept in this file (rather than
491 * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
492 * don't want to link in liblttng-ctl.
493 */
494 static
495 unsigned long lttng_condition_hash(const struct lttng_condition *condition)
496 {
497 switch (condition->type) {
498 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
499 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
500 return lttng_condition_buffer_usage_hash(condition);
501 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
502 return lttng_condition_session_consumed_size_hash(condition);
503 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
504 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
505 return lttng_condition_session_rotation_hash(condition);
506 default:
507 ERR("[notification-thread] Unexpected condition type caught");
508 abort();
509 }
510 }
511
512 static
513 unsigned long hash_channel_key(struct channel_key *key)
514 {
515 unsigned long key_hash = hash_key_u64(&key->key, lttng_ht_seed);
516 unsigned long domain_hash = hash_key_ulong(
517 (void *) (unsigned long) key->domain, lttng_ht_seed);
518
519 return key_hash ^ domain_hash;
520 }
521
522 static
523 unsigned long hash_client_socket(int socket)
524 {
525 return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed);
526 }
527
528 static
529 unsigned long hash_client_id(notification_client_id id)
530 {
531 return hash_key_u64(&id, lttng_ht_seed);
532 }
533
534 /*
535 * Get the type of object to which a given condition applies. Bindings let
536 * the notification system evaluate a trigger's condition when a given
537 * object's state is updated.
538 *
539 * For instance, a condition bound to a channel will be evaluated everytime
540 * the channel's state is changed by a channel monitoring sample.
541 */
542 static
543 enum lttng_object_type get_condition_binding_object(
544 const struct lttng_condition *condition)
545 {
546 switch (lttng_condition_get_type(condition)) {
547 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
548 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
549 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
550 return LTTNG_OBJECT_TYPE_CHANNEL;
551 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
552 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
553 return LTTNG_OBJECT_TYPE_SESSION;
554 default:
555 return LTTNG_OBJECT_TYPE_UNKNOWN;
556 }
557 }
558
559 static
560 void free_channel_info_rcu(struct rcu_head *node)
561 {
562 free(caa_container_of(node, struct channel_info, rcu_node));
563 }
564
565 static
566 void channel_info_destroy(struct channel_info *channel_info)
567 {
568 if (!channel_info) {
569 return;
570 }
571
572 if (channel_info->session_info) {
573 session_info_remove_channel(channel_info->session_info,
574 channel_info);
575 session_info_put(channel_info->session_info);
576 }
577 if (channel_info->name) {
578 free(channel_info->name);
579 }
580 call_rcu(&channel_info->rcu_node, free_channel_info_rcu);
581 }
582
583 static
584 void free_session_info_rcu(struct rcu_head *node)
585 {
586 free(caa_container_of(node, struct session_info, rcu_node));
587 }
588
589 /* Don't call directly, use the ref-counting mechanism. */
590 static
591 void session_info_destroy(void *_data)
592 {
593 struct session_info *session_info = _data;
594 int ret;
595
596 assert(session_info);
597 if (session_info->channel_infos_ht) {
598 ret = cds_lfht_destroy(session_info->channel_infos_ht, NULL);
599 if (ret) {
600 ERR("[notification-thread] Failed to destroy channel information hash table");
601 }
602 }
603 lttng_session_trigger_list_destroy(session_info->trigger_list);
604
605 rcu_read_lock();
606 cds_lfht_del(session_info->sessions_ht,
607 &session_info->sessions_ht_node);
608 rcu_read_unlock();
609 free(session_info->name);
610 call_rcu(&session_info->rcu_node, free_session_info_rcu);
611 }
612
613 static
614 void session_info_get(struct session_info *session_info)
615 {
616 if (!session_info) {
617 return;
618 }
619 lttng_ref_get(&session_info->ref);
620 }
621
622 static
623 void session_info_put(struct session_info *session_info)
624 {
625 if (!session_info) {
626 return;
627 }
628 lttng_ref_put(&session_info->ref);
629 }
630
631 static
632 struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid,
633 struct lttng_session_trigger_list *trigger_list,
634 struct cds_lfht *sessions_ht)
635 {
636 struct session_info *session_info;
637
638 assert(name);
639
640 session_info = zmalloc(sizeof(*session_info));
641 if (!session_info) {
642 goto end;
643 }
644 lttng_ref_init(&session_info->ref, session_info_destroy);
645
646 session_info->channel_infos_ht = cds_lfht_new(DEFAULT_HT_SIZE,
647 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
648 if (!session_info->channel_infos_ht) {
649 goto error;
650 }
651
652 cds_lfht_node_init(&session_info->sessions_ht_node);
653 session_info->name = strdup(name);
654 if (!session_info->name) {
655 goto error;
656 }
657 session_info->uid = uid;
658 session_info->gid = gid;
659 session_info->trigger_list = trigger_list;
660 session_info->sessions_ht = sessions_ht;
661 end:
662 return session_info;
663 error:
664 session_info_put(session_info);
665 return NULL;
666 }
667
668 static
669 void session_info_add_channel(struct session_info *session_info,
670 struct channel_info *channel_info)
671 {
672 rcu_read_lock();
673 cds_lfht_add(session_info->channel_infos_ht,
674 hash_channel_key(&channel_info->key),
675 &channel_info->session_info_channels_ht_node);
676 rcu_read_unlock();
677 }
678
679 static
680 void session_info_remove_channel(struct session_info *session_info,
681 struct channel_info *channel_info)
682 {
683 rcu_read_lock();
684 cds_lfht_del(session_info->channel_infos_ht,
685 &channel_info->session_info_channels_ht_node);
686 rcu_read_unlock();
687 }
688
689 static
690 struct channel_info *channel_info_create(const char *channel_name,
691 struct channel_key *channel_key, uint64_t channel_capacity,
692 struct session_info *session_info)
693 {
694 struct channel_info *channel_info = zmalloc(sizeof(*channel_info));
695
696 if (!channel_info) {
697 goto end;
698 }
699
700 cds_lfht_node_init(&channel_info->channels_ht_node);
701 cds_lfht_node_init(&channel_info->session_info_channels_ht_node);
702 memcpy(&channel_info->key, channel_key, sizeof(*channel_key));
703 channel_info->capacity = channel_capacity;
704
705 channel_info->name = strdup(channel_name);
706 if (!channel_info->name) {
707 goto error;
708 }
709
710 /*
711 * Set the references between session and channel infos:
712 * - channel_info holds a strong reference to session_info
713 * - session_info holds a weak reference to channel_info
714 */
715 session_info_get(session_info);
716 session_info_add_channel(session_info, channel_info);
717 channel_info->session_info = session_info;
718 end:
719 return channel_info;
720 error:
721 channel_info_destroy(channel_info);
722 return NULL;
723 }
724
725 static
726 bool notification_client_list_get(struct notification_client_list *list)
727 {
728 return urcu_ref_get_unless_zero(&list->ref);
729 }
730
731 static
732 void free_notification_client_list_rcu(struct rcu_head *node)
733 {
734 free(caa_container_of(node, struct notification_client_list,
735 rcu_node));
736 }
737
738 static
739 void notification_client_list_release(struct urcu_ref *list_ref)
740 {
741 struct notification_client_list *list =
742 container_of(list_ref, typeof(*list), ref);
743 struct notification_client_list_element *client_list_element, *tmp;
744
745 if (list->notification_trigger_clients_ht) {
746 rcu_read_lock();
747 cds_lfht_del(list->notification_trigger_clients_ht,
748 &list->notification_trigger_clients_ht_node);
749 rcu_read_unlock();
750 list->notification_trigger_clients_ht = NULL;
751 }
752 cds_list_for_each_entry_safe(client_list_element, tmp,
753 &list->list, node) {
754 free(client_list_element);
755 }
756 pthread_mutex_destroy(&list->lock);
757 call_rcu(&list->rcu_node, free_notification_client_list_rcu);
758 }
759
760 static
761 struct notification_client_list *notification_client_list_create(
762 const struct lttng_trigger *trigger)
763 {
764 struct notification_client_list *client_list =
765 zmalloc(sizeof(*client_list));
766
767 if (!client_list) {
768 goto error;
769 }
770 pthread_mutex_init(&client_list->lock, NULL);
771 urcu_ref_init(&client_list->ref);
772 cds_lfht_node_init(&client_list->notification_trigger_clients_ht_node);
773 CDS_INIT_LIST_HEAD(&client_list->list);
774 client_list->trigger = trigger;
775 error:
776 return client_list;
777 }
778
779 static
780 void publish_notification_client_list(
781 struct notification_thread_state *state,
782 struct notification_client_list *list)
783 {
784 const struct lttng_condition *condition =
785 lttng_trigger_get_const_condition(list->trigger);
786
787 assert(!list->notification_trigger_clients_ht);
788
789 list->notification_trigger_clients_ht =
790 state->notification_trigger_clients_ht;
791
792 rcu_read_lock();
793 cds_lfht_add(state->notification_trigger_clients_ht,
794 lttng_condition_hash(condition),
795 &list->notification_trigger_clients_ht_node);
796 rcu_read_unlock();
797 }
798
799 static
800 void notification_client_list_put(struct notification_client_list *list)
801 {
802 if (!list) {
803 return;
804 }
805 return urcu_ref_put(&list->ref, notification_client_list_release);
806 }
807
808 /* Provides a reference to the returned list. */
809 static
810 struct notification_client_list *get_client_list_from_condition(
811 struct notification_thread_state *state,
812 const struct lttng_condition *condition)
813 {
814 struct cds_lfht_node *node;
815 struct cds_lfht_iter iter;
816 struct notification_client_list *list = NULL;
817
818 rcu_read_lock();
819 cds_lfht_lookup(state->notification_trigger_clients_ht,
820 lttng_condition_hash(condition),
821 match_client_list_condition,
822 condition,
823 &iter);
824 node = cds_lfht_iter_get_node(&iter);
825 if (node) {
826 list = container_of(node, struct notification_client_list,
827 notification_trigger_clients_ht_node);
828 list = notification_client_list_get(list) ? list : NULL;
829 }
830
831 rcu_read_unlock();
832 return list;
833 }
834
835 static
836 int evaluate_channel_condition_for_client(
837 const struct lttng_condition *condition,
838 struct notification_thread_state *state,
839 struct lttng_evaluation **evaluation,
840 uid_t *session_uid, gid_t *session_gid)
841 {
842 int ret;
843 struct cds_lfht_iter iter;
844 struct cds_lfht_node *node;
845 struct channel_info *channel_info = NULL;
846 struct channel_key *channel_key = NULL;
847 struct channel_state_sample *last_sample = NULL;
848 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
849
850 rcu_read_lock();
851
852 /* Find the channel associated with the condition. */
853 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter,
854 channel_trigger_list, channel_triggers_ht_node) {
855 struct lttng_trigger_list_element *element;
856
857 cds_list_for_each_entry(element, &channel_trigger_list->list, node) {
858 const struct lttng_condition *current_condition =
859 lttng_trigger_get_const_condition(
860 element->trigger);
861
862 assert(current_condition);
863 if (!lttng_condition_is_equal(condition,
864 current_condition)) {
865 continue;
866 }
867
868 /* Found the trigger, save the channel key. */
869 channel_key = &channel_trigger_list->channel_key;
870 break;
871 }
872 if (channel_key) {
873 /* The channel key was found stop iteration. */
874 break;
875 }
876 }
877
878 if (!channel_key){
879 /* No channel found; normal exit. */
880 DBG("[notification-thread] No known channel associated with newly subscribed-to condition");
881 ret = 0;
882 goto end;
883 }
884
885 /* Fetch channel info for the matching channel. */
886 cds_lfht_lookup(state->channels_ht,
887 hash_channel_key(channel_key),
888 match_channel_info,
889 channel_key,
890 &iter);
891 node = cds_lfht_iter_get_node(&iter);
892 assert(node);
893 channel_info = caa_container_of(node, struct channel_info,
894 channels_ht_node);
895
896 /* Retrieve the channel's last sample, if it exists. */
897 cds_lfht_lookup(state->channel_state_ht,
898 hash_channel_key(channel_key),
899 match_channel_state_sample,
900 channel_key,
901 &iter);
902 node = cds_lfht_iter_get_node(&iter);
903 if (node) {
904 last_sample = caa_container_of(node,
905 struct channel_state_sample,
906 channel_state_ht_node);
907 } else {
908 /* Nothing to evaluate, no sample was ever taken. Normal exit */
909 DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
910 ret = 0;
911 goto end;
912 }
913
914 ret = evaluate_buffer_condition(condition, evaluation, state,
915 NULL, last_sample,
916 0, channel_info->session_info->consumed_data_size,
917 channel_info);
918 if (ret) {
919 WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
920 goto end;
921 }
922
923 *session_uid = channel_info->session_info->uid;
924 *session_gid = channel_info->session_info->gid;
925 end:
926 rcu_read_unlock();
927 return ret;
928 }
929
930 static
931 const char *get_condition_session_name(const struct lttng_condition *condition)
932 {
933 const char *session_name = NULL;
934 enum lttng_condition_status status;
935
936 switch (lttng_condition_get_type(condition)) {
937 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
938 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
939 status = lttng_condition_buffer_usage_get_session_name(
940 condition, &session_name);
941 break;
942 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
943 status = lttng_condition_session_consumed_size_get_session_name(
944 condition, &session_name);
945 break;
946 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
947 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
948 status = lttng_condition_session_rotation_get_session_name(
949 condition, &session_name);
950 break;
951 default:
952 abort();
953 }
954 if (status != LTTNG_CONDITION_STATUS_OK) {
955 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
956 goto end;
957 }
958 end:
959 return session_name;
960 }
961
962 static
963 int evaluate_session_condition_for_client(
964 const struct lttng_condition *condition,
965 struct notification_thread_state *state,
966 struct lttng_evaluation **evaluation,
967 uid_t *session_uid, gid_t *session_gid)
968 {
969 int ret;
970 struct cds_lfht_iter iter;
971 struct cds_lfht_node *node;
972 const char *session_name;
973 struct session_info *session_info = NULL;
974
975 rcu_read_lock();
976 session_name = get_condition_session_name(condition);
977
978 /* Find the session associated with the trigger. */
979 cds_lfht_lookup(state->sessions_ht,
980 hash_key_str(session_name, lttng_ht_seed),
981 match_session,
982 session_name,
983 &iter);
984 node = cds_lfht_iter_get_node(&iter);
985 if (!node) {
986 DBG("[notification-thread] No known session matching name \"%s\"",
987 session_name);
988 ret = 0;
989 goto end;
990 }
991
992 session_info = caa_container_of(node, struct session_info,
993 sessions_ht_node);
994 session_info_get(session_info);
995
996 /*
997 * Evaluation is performed in-line here since only one type of
998 * session-bound condition is handled for the moment.
999 */
1000 switch (lttng_condition_get_type(condition)) {
1001 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1002 if (!session_info->rotation.ongoing) {
1003 ret = 0;
1004 goto end_session_put;
1005 }
1006
1007 *evaluation = lttng_evaluation_session_rotation_ongoing_create(
1008 session_info->rotation.id);
1009 if (!*evaluation) {
1010 /* Fatal error. */
1011 ERR("[notification-thread] Failed to create session rotation ongoing evaluation for session \"%s\"",
1012 session_info->name);
1013 ret = -1;
1014 goto end_session_put;
1015 }
1016 ret = 0;
1017 break;
1018 default:
1019 ret = 0;
1020 goto end_session_put;
1021 }
1022
1023 *session_uid = session_info->uid;
1024 *session_gid = session_info->gid;
1025
1026 end_session_put:
1027 session_info_put(session_info);
1028 end:
1029 rcu_read_unlock();
1030 return ret;
1031 }
1032
1033 static
1034 int evaluate_condition_for_client(const struct lttng_trigger *trigger,
1035 const struct lttng_condition *condition,
1036 struct notification_client *client,
1037 struct notification_thread_state *state)
1038 {
1039 int ret;
1040 struct lttng_evaluation *evaluation = NULL;
1041 struct notification_client_list client_list = {
1042 .lock = PTHREAD_MUTEX_INITIALIZER,
1043 };
1044 struct notification_client_list_element client_list_element = { 0 };
1045 uid_t object_uid = 0;
1046 gid_t object_gid = 0;
1047
1048 assert(trigger);
1049 assert(condition);
1050 assert(client);
1051 assert(state);
1052
1053 switch (get_condition_binding_object(condition)) {
1054 case LTTNG_OBJECT_TYPE_SESSION:
1055 ret = evaluate_session_condition_for_client(condition, state,
1056 &evaluation, &object_uid, &object_gid);
1057 break;
1058 case LTTNG_OBJECT_TYPE_CHANNEL:
1059 ret = evaluate_channel_condition_for_client(condition, state,
1060 &evaluation, &object_uid, &object_gid);
1061 break;
1062 case LTTNG_OBJECT_TYPE_NONE:
1063 ret = 0;
1064 goto end;
1065 case LTTNG_OBJECT_TYPE_UNKNOWN:
1066 default:
1067 ret = -1;
1068 goto end;
1069 }
1070 if (ret) {
1071 /* Fatal error. */
1072 goto end;
1073 }
1074 if (!evaluation) {
1075 /* Evaluation yielded nothing. Normal exit. */
1076 DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
1077 ret = 0;
1078 goto end;
1079 }
1080
1081 /*
1082 * Create a temporary client list with the client currently
1083 * subscribing.
1084 */
1085 cds_lfht_node_init(&client_list.notification_trigger_clients_ht_node);
1086 CDS_INIT_LIST_HEAD(&client_list.list);
1087 client_list.trigger = trigger;
1088
1089 CDS_INIT_LIST_HEAD(&client_list_element.node);
1090 client_list_element.client = client;
1091 cds_list_add(&client_list_element.node, &client_list.list);
1092
1093 /* Send evaluation result to the newly-subscribed client. */
1094 DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
1095 ret = send_evaluation_to_clients(trigger, evaluation, &client_list,
1096 state, object_uid, object_gid);
1097
1098 end:
1099 return ret;
1100 }
1101
1102 static
1103 int notification_thread_client_subscribe(struct notification_client *client,
1104 struct lttng_condition *condition,
1105 struct notification_thread_state *state,
1106 enum lttng_notification_channel_status *_status)
1107 {
1108 int ret = 0;
1109 struct notification_client_list *client_list = NULL;
1110 struct lttng_condition_list_element *condition_list_element = NULL;
1111 struct notification_client_list_element *client_list_element = NULL;
1112 enum lttng_notification_channel_status status =
1113 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1114
1115 /*
1116 * Ensure that the client has not already subscribed to this condition
1117 * before.
1118 */
1119 cds_list_for_each_entry(condition_list_element, &client->condition_list, node) {
1120 if (lttng_condition_is_equal(condition_list_element->condition,
1121 condition)) {
1122 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED;
1123 goto end;
1124 }
1125 }
1126
1127 condition_list_element = zmalloc(sizeof(*condition_list_element));
1128 if (!condition_list_element) {
1129 ret = -1;
1130 goto error;
1131 }
1132 client_list_element = zmalloc(sizeof(*client_list_element));
1133 if (!client_list_element) {
1134 ret = -1;
1135 goto error;
1136 }
1137
1138 /*
1139 * Add the newly-subscribed condition to the client's subscription list.
1140 */
1141 CDS_INIT_LIST_HEAD(&condition_list_element->node);
1142 condition_list_element->condition = condition;
1143 cds_list_add(&condition_list_element->node, &client->condition_list);
1144
1145 client_list = get_client_list_from_condition(state, condition);
1146 if (!client_list) {
1147 /*
1148 * No notification-emiting trigger registered with this
1149 * condition. We don't evaluate the condition right away
1150 * since this trigger is not registered yet.
1151 */
1152 free(client_list_element);
1153 goto end;
1154 }
1155
1156 /*
1157 * The condition to which the client just subscribed is evaluated
1158 * at this point so that conditions that are already TRUE result
1159 * in a notification being sent out.
1160 *
1161 * The client_list's trigger is used without locking the list itself.
1162 * This is correct since the list doesn't own the trigger and the
1163 * object is immutable.
1164 */
1165 if (evaluate_condition_for_client(client_list->trigger, condition,
1166 client, state)) {
1167 WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
1168 ret = -1;
1169 free(client_list_element);
1170 goto end;
1171 }
1172
1173 /*
1174 * Add the client to the list of clients interested in a given trigger
1175 * if a "notification" trigger with a corresponding condition was
1176 * added prior.
1177 */
1178 client_list_element->client = client;
1179 CDS_INIT_LIST_HEAD(&client_list_element->node);
1180
1181 pthread_mutex_lock(&client_list->lock);
1182 cds_list_add(&client_list_element->node, &client_list->list);
1183 pthread_mutex_unlock(&client_list->lock);
1184 end:
1185 if (_status) {
1186 *_status = status;
1187 }
1188 if (client_list) {
1189 notification_client_list_put(client_list);
1190 }
1191 return ret;
1192 error:
1193 free(condition_list_element);
1194 free(client_list_element);
1195 return ret;
1196 }
1197
1198 static
1199 int notification_thread_client_unsubscribe(
1200 struct notification_client *client,
1201 struct lttng_condition *condition,
1202 struct notification_thread_state *state,
1203 enum lttng_notification_channel_status *_status)
1204 {
1205 struct notification_client_list *client_list;
1206 struct lttng_condition_list_element *condition_list_element,
1207 *condition_tmp;
1208 struct notification_client_list_element *client_list_element,
1209 *client_tmp;
1210 bool condition_found = false;
1211 enum lttng_notification_channel_status status =
1212 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1213
1214 /* Remove the condition from the client's condition list. */
1215 cds_list_for_each_entry_safe(condition_list_element, condition_tmp,
1216 &client->condition_list, node) {
1217 if (!lttng_condition_is_equal(condition_list_element->condition,
1218 condition)) {
1219 continue;
1220 }
1221
1222 cds_list_del(&condition_list_element->node);
1223 /*
1224 * The caller may be iterating on the client's conditions to
1225 * tear down a client's connection. In this case, the condition
1226 * will be destroyed at the end.
1227 */
1228 if (condition != condition_list_element->condition) {
1229 lttng_condition_destroy(
1230 condition_list_element->condition);
1231 }
1232 free(condition_list_element);
1233 condition_found = true;
1234 break;
1235 }
1236
1237 if (!condition_found) {
1238 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION;
1239 goto end;
1240 }
1241
1242 /*
1243 * Remove the client from the list of clients interested the trigger
1244 * matching the condition.
1245 */
1246 client_list = get_client_list_from_condition(state, condition);
1247 if (!client_list) {
1248 goto end;
1249 }
1250
1251 pthread_mutex_lock(&client_list->lock);
1252 cds_list_for_each_entry_safe(client_list_element, client_tmp,
1253 &client_list->list, node) {
1254 if (client_list_element->client->id != client->id) {
1255 continue;
1256 }
1257 cds_list_del(&client_list_element->node);
1258 free(client_list_element);
1259 break;
1260 }
1261 pthread_mutex_unlock(&client_list->lock);
1262 notification_client_list_put(client_list);
1263 client_list = NULL;
1264 end:
1265 lttng_condition_destroy(condition);
1266 if (_status) {
1267 *_status = status;
1268 }
1269 return 0;
1270 }
1271
1272 static
1273 void free_notification_client_rcu(struct rcu_head *node)
1274 {
1275 free(caa_container_of(node, struct notification_client, rcu_node));
1276 }
1277
1278 static
1279 void notification_client_destroy(struct notification_client *client,
1280 struct notification_thread_state *state)
1281 {
1282 if (!client) {
1283 return;
1284 }
1285
1286 /*
1287 * The client object is not reachable by other threads, no need to lock
1288 * the client here.
1289 */
1290 if (client->socket >= 0) {
1291 (void) lttcomm_close_unix_sock(client->socket);
1292 client->socket = -1;
1293 }
1294 client->communication.active = false;
1295 lttng_dynamic_buffer_reset(&client->communication.inbound.buffer);
1296 lttng_dynamic_buffer_reset(&client->communication.outbound.buffer);
1297 pthread_mutex_destroy(&client->lock);
1298 call_rcu(&client->rcu_node, free_notification_client_rcu);
1299 }
1300
1301 /*
1302 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1303 * client pointer).
1304 */
1305 static
1306 struct notification_client *get_client_from_socket(int socket,
1307 struct notification_thread_state *state)
1308 {
1309 struct cds_lfht_iter iter;
1310 struct cds_lfht_node *node;
1311 struct notification_client *client = NULL;
1312
1313 cds_lfht_lookup(state->client_socket_ht,
1314 hash_client_socket(socket),
1315 match_client_socket,
1316 (void *) (unsigned long) socket,
1317 &iter);
1318 node = cds_lfht_iter_get_node(&iter);
1319 if (!node) {
1320 goto end;
1321 }
1322
1323 client = caa_container_of(node, struct notification_client,
1324 client_socket_ht_node);
1325 end:
1326 return client;
1327 }
1328
1329 static
1330 bool buffer_usage_condition_applies_to_channel(
1331 const struct lttng_condition *condition,
1332 const struct channel_info *channel_info)
1333 {
1334 enum lttng_condition_status status;
1335 enum lttng_domain_type condition_domain;
1336 const char *condition_session_name = NULL;
1337 const char *condition_channel_name = NULL;
1338
1339 status = lttng_condition_buffer_usage_get_domain_type(condition,
1340 &condition_domain);
1341 assert(status == LTTNG_CONDITION_STATUS_OK);
1342 if (channel_info->key.domain != condition_domain) {
1343 goto fail;
1344 }
1345
1346 status = lttng_condition_buffer_usage_get_session_name(
1347 condition, &condition_session_name);
1348 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1349
1350 status = lttng_condition_buffer_usage_get_channel_name(
1351 condition, &condition_channel_name);
1352 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_channel_name);
1353
1354 if (strcmp(channel_info->session_info->name, condition_session_name)) {
1355 goto fail;
1356 }
1357 if (strcmp(channel_info->name, condition_channel_name)) {
1358 goto fail;
1359 }
1360
1361 return true;
1362 fail:
1363 return false;
1364 }
1365
1366 static
1367 bool session_consumed_size_condition_applies_to_channel(
1368 const struct lttng_condition *condition,
1369 const struct channel_info *channel_info)
1370 {
1371 enum lttng_condition_status status;
1372 const char *condition_session_name = NULL;
1373
1374 status = lttng_condition_session_consumed_size_get_session_name(
1375 condition, &condition_session_name);
1376 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1377
1378 if (strcmp(channel_info->session_info->name, condition_session_name)) {
1379 goto fail;
1380 }
1381
1382 return true;
1383 fail:
1384 return false;
1385 }
1386
1387 static
1388 bool trigger_applies_to_channel(const struct lttng_trigger *trigger,
1389 const struct channel_info *channel_info)
1390 {
1391 const struct lttng_condition *condition;
1392 bool trigger_applies;
1393
1394 condition = lttng_trigger_get_const_condition(trigger);
1395 if (!condition) {
1396 goto fail;
1397 }
1398
1399 switch (lttng_condition_get_type(condition)) {
1400 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1401 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1402 trigger_applies = buffer_usage_condition_applies_to_channel(
1403 condition, channel_info);
1404 break;
1405 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
1406 trigger_applies = session_consumed_size_condition_applies_to_channel(
1407 condition, channel_info);
1408 break;
1409 default:
1410 goto fail;
1411 }
1412
1413 return trigger_applies;
1414 fail:
1415 return false;
1416 }
1417
1418 static
1419 bool trigger_applies_to_client(struct lttng_trigger *trigger,
1420 struct notification_client *client)
1421 {
1422 bool applies = false;
1423 struct lttng_condition_list_element *condition_list_element;
1424
1425 cds_list_for_each_entry(condition_list_element, &client->condition_list,
1426 node) {
1427 applies = lttng_condition_is_equal(
1428 condition_list_element->condition,
1429 lttng_trigger_get_condition(trigger));
1430 if (applies) {
1431 break;
1432 }
1433 }
1434 return applies;
1435 }
1436
1437 /* Must be called with RCU read lock held. */
1438 static
1439 struct lttng_session_trigger_list *get_session_trigger_list(
1440 struct notification_thread_state *state,
1441 const char *session_name)
1442 {
1443 struct lttng_session_trigger_list *list = NULL;
1444 struct cds_lfht_node *node;
1445 struct cds_lfht_iter iter;
1446
1447 cds_lfht_lookup(state->session_triggers_ht,
1448 hash_key_str(session_name, lttng_ht_seed),
1449 match_session_trigger_list,
1450 session_name,
1451 &iter);
1452 node = cds_lfht_iter_get_node(&iter);
1453 if (!node) {
1454 /*
1455 * Not an error, the list of triggers applying to that session
1456 * will be initialized when the session is created.
1457 */
1458 DBG("[notification-thread] No trigger list found for session \"%s\" as it is not yet known to the notification system",
1459 session_name);
1460 goto end;
1461 }
1462
1463 list = caa_container_of(node,
1464 struct lttng_session_trigger_list,
1465 session_triggers_ht_node);
1466 end:
1467 return list;
1468 }
1469
1470 /*
1471 * Allocate an empty lttng_session_trigger_list for the session named
1472 * 'session_name'.
1473 *
1474 * No ownership of 'session_name' is assumed by the session trigger list.
1475 * It is the caller's responsability to ensure the session name is alive
1476 * for as long as this list is.
1477 */
1478 static
1479 struct lttng_session_trigger_list *lttng_session_trigger_list_create(
1480 const char *session_name,
1481 struct cds_lfht *session_triggers_ht)
1482 {
1483 struct lttng_session_trigger_list *list;
1484
1485 list = zmalloc(sizeof(*list));
1486 if (!list) {
1487 goto end;
1488 }
1489 list->session_name = session_name;
1490 CDS_INIT_LIST_HEAD(&list->list);
1491 cds_lfht_node_init(&list->session_triggers_ht_node);
1492 list->session_triggers_ht = session_triggers_ht;
1493
1494 rcu_read_lock();
1495 /* Publish the list through the session_triggers_ht. */
1496 cds_lfht_add(session_triggers_ht,
1497 hash_key_str(session_name, lttng_ht_seed),
1498 &list->session_triggers_ht_node);
1499 rcu_read_unlock();
1500 end:
1501 return list;
1502 }
1503
1504 static
1505 void free_session_trigger_list_rcu(struct rcu_head *node)
1506 {
1507 free(caa_container_of(node, struct lttng_session_trigger_list,
1508 rcu_node));
1509 }
1510
1511 static
1512 void lttng_session_trigger_list_destroy(struct lttng_session_trigger_list *list)
1513 {
1514 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1515
1516 /* Empty the list element by element, and then free the list itself. */
1517 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1518 &list->list, node) {
1519 cds_list_del(&trigger_list_element->node);
1520 free(trigger_list_element);
1521 }
1522 rcu_read_lock();
1523 /* Unpublish the list from the session_triggers_ht. */
1524 cds_lfht_del(list->session_triggers_ht,
1525 &list->session_triggers_ht_node);
1526 rcu_read_unlock();
1527 call_rcu(&list->rcu_node, free_session_trigger_list_rcu);
1528 }
1529
1530 static
1531 int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
1532 const struct lttng_trigger *trigger)
1533 {
1534 int ret = 0;
1535 struct lttng_trigger_list_element *new_element =
1536 zmalloc(sizeof(*new_element));
1537
1538 if (!new_element) {
1539 ret = -1;
1540 goto end;
1541 }
1542 CDS_INIT_LIST_HEAD(&new_element->node);
1543 new_element->trigger = trigger;
1544 cds_list_add(&new_element->node, &list->list);
1545 end:
1546 return ret;
1547 }
1548
1549 static
1550 bool trigger_applies_to_session(const struct lttng_trigger *trigger,
1551 const char *session_name)
1552 {
1553 bool applies = false;
1554 const struct lttng_condition *condition;
1555
1556 condition = lttng_trigger_get_const_condition(trigger);
1557 switch (lttng_condition_get_type(condition)) {
1558 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1559 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
1560 {
1561 enum lttng_condition_status condition_status;
1562 const char *condition_session_name;
1563
1564 condition_status = lttng_condition_session_rotation_get_session_name(
1565 condition, &condition_session_name);
1566 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
1567 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
1568 goto end;
1569 }
1570
1571 assert(condition_session_name);
1572 applies = !strcmp(condition_session_name, session_name);
1573 break;
1574 }
1575 default:
1576 goto end;
1577 }
1578 end:
1579 return applies;
1580 }
1581
1582 /*
1583 * Allocate and initialize an lttng_session_trigger_list which contains
1584 * all triggers that apply to the session named 'session_name'.
1585 *
1586 * No ownership of 'session_name' is assumed by the session trigger list.
1587 * It is the caller's responsability to ensure the session name is alive
1588 * for as long as this list is.
1589 */
1590 static
1591 struct lttng_session_trigger_list *lttng_session_trigger_list_build(
1592 const struct notification_thread_state *state,
1593 const char *session_name)
1594 {
1595 int trigger_count = 0;
1596 struct lttng_session_trigger_list *session_trigger_list = NULL;
1597 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1598 struct cds_lfht_iter iter;
1599
1600 session_trigger_list = lttng_session_trigger_list_create(session_name,
1601 state->session_triggers_ht);
1602
1603 /* Add all triggers applying to the session named 'session_name'. */
1604 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1605 node) {
1606 int ret;
1607
1608 if (!trigger_applies_to_session(trigger_ht_element->trigger,
1609 session_name)) {
1610 continue;
1611 }
1612
1613 ret = lttng_session_trigger_list_add(session_trigger_list,
1614 trigger_ht_element->trigger);
1615 if (ret) {
1616 goto error;
1617 }
1618
1619 trigger_count++;
1620 }
1621
1622 DBG("[notification-thread] Found %i triggers that apply to newly created session",
1623 trigger_count);
1624 return session_trigger_list;
1625 error:
1626 lttng_session_trigger_list_destroy(session_trigger_list);
1627 return NULL;
1628 }
1629
1630 static
1631 struct session_info *find_or_create_session_info(
1632 struct notification_thread_state *state,
1633 const char *name, uid_t uid, gid_t gid)
1634 {
1635 struct session_info *session = NULL;
1636 struct cds_lfht_node *node;
1637 struct cds_lfht_iter iter;
1638 struct lttng_session_trigger_list *trigger_list;
1639
1640 rcu_read_lock();
1641 cds_lfht_lookup(state->sessions_ht,
1642 hash_key_str(name, lttng_ht_seed),
1643 match_session,
1644 name,
1645 &iter);
1646 node = cds_lfht_iter_get_node(&iter);
1647 if (node) {
1648 DBG("[notification-thread] Found session info of session \"%s\" (uid = %i, gid = %i)",
1649 name, uid, gid);
1650 session = caa_container_of(node, struct session_info,
1651 sessions_ht_node);
1652 assert(session->uid == uid);
1653 assert(session->gid == gid);
1654 session_info_get(session);
1655 goto end;
1656 }
1657
1658 trigger_list = lttng_session_trigger_list_build(state, name);
1659 if (!trigger_list) {
1660 goto error;
1661 }
1662
1663 session = session_info_create(name, uid, gid, trigger_list,
1664 state->sessions_ht);
1665 if (!session) {
1666 ERR("[notification-thread] Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)",
1667 name, uid, gid);
1668 lttng_session_trigger_list_destroy(trigger_list);
1669 goto error;
1670 }
1671 trigger_list = NULL;
1672
1673 cds_lfht_add(state->sessions_ht, hash_key_str(name, lttng_ht_seed),
1674 &session->sessions_ht_node);
1675 end:
1676 rcu_read_unlock();
1677 return session;
1678 error:
1679 rcu_read_unlock();
1680 session_info_put(session);
1681 return NULL;
1682 }
1683
1684 static
1685 int handle_notification_thread_command_add_channel(
1686 struct notification_thread_state *state,
1687 const char *session_name, uid_t session_uid, gid_t session_gid,
1688 const char *channel_name, enum lttng_domain_type channel_domain,
1689 uint64_t channel_key_int, uint64_t channel_capacity,
1690 enum lttng_error_code *cmd_result)
1691 {
1692 struct cds_list_head trigger_list;
1693 struct channel_info *new_channel_info = NULL;
1694 struct channel_key channel_key = {
1695 .key = channel_key_int,
1696 .domain = channel_domain,
1697 };
1698 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
1699 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1700 int trigger_count = 0;
1701 struct cds_lfht_iter iter;
1702 struct session_info *session_info = NULL;
1703
1704 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
1705 channel_name, session_name, channel_key_int,
1706 channel_domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
1707
1708 CDS_INIT_LIST_HEAD(&trigger_list);
1709
1710 session_info = find_or_create_session_info(state, session_name,
1711 session_uid, session_gid);
1712 if (!session_info) {
1713 /* Allocation error or an internal error occurred. */
1714 goto error;
1715 }
1716
1717 new_channel_info = channel_info_create(channel_name, &channel_key,
1718 channel_capacity, session_info);
1719 if (!new_channel_info) {
1720 goto error;
1721 }
1722
1723 rcu_read_lock();
1724 /* Build a list of all triggers applying to the new channel. */
1725 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1726 node) {
1727 struct lttng_trigger_list_element *new_element;
1728
1729 if (!trigger_applies_to_channel(trigger_ht_element->trigger,
1730 new_channel_info)) {
1731 continue;
1732 }
1733
1734 new_element = zmalloc(sizeof(*new_element));
1735 if (!new_element) {
1736 rcu_read_unlock();
1737 goto error;
1738 }
1739 CDS_INIT_LIST_HEAD(&new_element->node);
1740 new_element->trigger = trigger_ht_element->trigger;
1741 cds_list_add(&new_element->node, &trigger_list);
1742 trigger_count++;
1743 }
1744 rcu_read_unlock();
1745
1746 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
1747 trigger_count);
1748 channel_trigger_list = zmalloc(sizeof(*channel_trigger_list));
1749 if (!channel_trigger_list) {
1750 goto error;
1751 }
1752 channel_trigger_list->channel_key = new_channel_info->key;
1753 CDS_INIT_LIST_HEAD(&channel_trigger_list->list);
1754 cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
1755 cds_list_splice(&trigger_list, &channel_trigger_list->list);
1756
1757 rcu_read_lock();
1758 /* Add channel to the channel_ht which owns the channel_infos. */
1759 cds_lfht_add(state->channels_ht,
1760 hash_channel_key(&new_channel_info->key),
1761 &new_channel_info->channels_ht_node);
1762 /*
1763 * Add the list of triggers associated with this channel to the
1764 * channel_triggers_ht.
1765 */
1766 cds_lfht_add(state->channel_triggers_ht,
1767 hash_channel_key(&new_channel_info->key),
1768 &channel_trigger_list->channel_triggers_ht_node);
1769 rcu_read_unlock();
1770 session_info_put(session_info);
1771 *cmd_result = LTTNG_OK;
1772 return 0;
1773 error:
1774 channel_info_destroy(new_channel_info);
1775 session_info_put(session_info);
1776 return 1;
1777 }
1778
1779 static
1780 void free_channel_trigger_list_rcu(struct rcu_head *node)
1781 {
1782 free(caa_container_of(node, struct lttng_channel_trigger_list,
1783 rcu_node));
1784 }
1785
1786 static
1787 void free_channel_state_sample_rcu(struct rcu_head *node)
1788 {
1789 free(caa_container_of(node, struct channel_state_sample,
1790 rcu_node));
1791 }
1792
1793 static
1794 int handle_notification_thread_command_remove_channel(
1795 struct notification_thread_state *state,
1796 uint64_t channel_key, enum lttng_domain_type domain,
1797 enum lttng_error_code *cmd_result)
1798 {
1799 struct cds_lfht_node *node;
1800 struct cds_lfht_iter iter;
1801 struct lttng_channel_trigger_list *trigger_list;
1802 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1803 struct channel_key key = { .key = channel_key, .domain = domain };
1804 struct channel_info *channel_info;
1805
1806 DBG("[notification-thread] Removing channel key = %" PRIu64 " in %s domain",
1807 channel_key, domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
1808
1809 rcu_read_lock();
1810
1811 cds_lfht_lookup(state->channel_triggers_ht,
1812 hash_channel_key(&key),
1813 match_channel_trigger_list,
1814 &key,
1815 &iter);
1816 node = cds_lfht_iter_get_node(&iter);
1817 /*
1818 * There is a severe internal error if we are being asked to remove a
1819 * channel that doesn't exist.
1820 */
1821 if (!node) {
1822 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
1823 goto end;
1824 }
1825
1826 /* Free the list of triggers associated with this channel. */
1827 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
1828 channel_triggers_ht_node);
1829 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1830 &trigger_list->list, node) {
1831 cds_list_del(&trigger_list_element->node);
1832 free(trigger_list_element);
1833 }
1834 cds_lfht_del(state->channel_triggers_ht, node);
1835 call_rcu(&trigger_list->rcu_node, free_channel_trigger_list_rcu);
1836
1837 /* Free sampled channel state. */
1838 cds_lfht_lookup(state->channel_state_ht,
1839 hash_channel_key(&key),
1840 match_channel_state_sample,
1841 &key,
1842 &iter);
1843 node = cds_lfht_iter_get_node(&iter);
1844 /*
1845 * This is expected to be NULL if the channel is destroyed before we
1846 * received a sample.
1847 */
1848 if (node) {
1849 struct channel_state_sample *sample = caa_container_of(node,
1850 struct channel_state_sample,
1851 channel_state_ht_node);
1852
1853 cds_lfht_del(state->channel_state_ht, node);
1854 call_rcu(&sample->rcu_node, free_channel_state_sample_rcu);
1855 }
1856
1857 /* Remove the channel from the channels_ht and free it. */
1858 cds_lfht_lookup(state->channels_ht,
1859 hash_channel_key(&key),
1860 match_channel_info,
1861 &key,
1862 &iter);
1863 node = cds_lfht_iter_get_node(&iter);
1864 assert(node);
1865 channel_info = caa_container_of(node, struct channel_info,
1866 channels_ht_node);
1867 cds_lfht_del(state->channels_ht, node);
1868 channel_info_destroy(channel_info);
1869 end:
1870 rcu_read_unlock();
1871 *cmd_result = LTTNG_OK;
1872 return 0;
1873 }
1874
1875 static
1876 int handle_notification_thread_command_session_rotation(
1877 struct notification_thread_state *state,
1878 enum notification_thread_command_type cmd_type,
1879 const char *session_name, uid_t session_uid, gid_t session_gid,
1880 uint64_t trace_archive_chunk_id,
1881 struct lttng_trace_archive_location *location,
1882 enum lttng_error_code *_cmd_result)
1883 {
1884 int ret = 0;
1885 enum lttng_error_code cmd_result = LTTNG_OK;
1886 struct lttng_session_trigger_list *trigger_list;
1887 struct lttng_trigger_list_element *trigger_list_element;
1888 struct session_info *session_info;
1889
1890 rcu_read_lock();
1891
1892 session_info = find_or_create_session_info(state, session_name,
1893 session_uid, session_gid);
1894 if (!session_info) {
1895 /* Allocation error or an internal error occurred. */
1896 ret = -1;
1897 cmd_result = LTTNG_ERR_NOMEM;
1898 goto end;
1899 }
1900
1901 session_info->rotation.ongoing =
1902 cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING;
1903 session_info->rotation.id = trace_archive_chunk_id;
1904 trigger_list = get_session_trigger_list(state, session_name);
1905 if (!trigger_list) {
1906 DBG("[notification-thread] No triggers applying to session \"%s\" found",
1907 session_name);
1908 goto end;
1909 }
1910
1911 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
1912 node) {
1913 const struct lttng_condition *condition;
1914 const struct lttng_action *action;
1915 const struct lttng_trigger *trigger;
1916 struct notification_client_list *client_list;
1917 struct lttng_evaluation *evaluation = NULL;
1918 enum lttng_condition_type condition_type;
1919 bool client_list_is_empty;
1920
1921 trigger = trigger_list_element->trigger;
1922 condition = lttng_trigger_get_const_condition(trigger);
1923 assert(condition);
1924 condition_type = lttng_condition_get_type(condition);
1925
1926 if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING &&
1927 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1928 continue;
1929 } else if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED &&
1930 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED) {
1931 continue;
1932 }
1933
1934 action = lttng_trigger_get_const_action(trigger);
1935
1936 /* Notify actions are the only type currently supported. */
1937 assert(lttng_action_get_type_const(action) ==
1938 LTTNG_ACTION_TYPE_NOTIFY);
1939
1940 client_list = get_client_list_from_condition(state, condition);
1941 assert(client_list);
1942
1943 pthread_mutex_lock(&client_list->lock);
1944 client_list_is_empty = cds_list_empty(&client_list->list);
1945 pthread_mutex_unlock(&client_list->lock);
1946 if (client_list_is_empty) {
1947 /*
1948 * No clients interested in the evaluation's result,
1949 * skip it.
1950 */
1951 continue;
1952 }
1953
1954 if (cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1955 evaluation = lttng_evaluation_session_rotation_ongoing_create(
1956 trace_archive_chunk_id);
1957 } else {
1958 evaluation = lttng_evaluation_session_rotation_completed_create(
1959 trace_archive_chunk_id, location);
1960 }
1961
1962 if (!evaluation) {
1963 /* Internal error */
1964 ret = -1;
1965 cmd_result = LTTNG_ERR_UNK;
1966 goto put_list;
1967 }
1968
1969 /* Dispatch evaluation result to all clients. */
1970 ret = send_evaluation_to_clients(trigger_list_element->trigger,
1971 evaluation, client_list, state,
1972 session_info->uid,
1973 session_info->gid);
1974 lttng_evaluation_destroy(evaluation);
1975 put_list:
1976 notification_client_list_put(client_list);
1977 if (caa_unlikely(ret)) {
1978 break;
1979 }
1980 }
1981 end:
1982 session_info_put(session_info);
1983 *_cmd_result = cmd_result;
1984 rcu_read_unlock();
1985 return ret;
1986 }
1987
1988 static
1989 int condition_is_supported(struct lttng_condition *condition)
1990 {
1991 int ret;
1992
1993 switch (lttng_condition_get_type(condition)) {
1994 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1995 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1996 {
1997 enum lttng_domain_type domain;
1998
1999 ret = lttng_condition_buffer_usage_get_domain_type(condition,
2000 &domain);
2001 if (ret) {
2002 ret = -1;
2003 goto end;
2004 }
2005
2006 if (domain != LTTNG_DOMAIN_KERNEL) {
2007 ret = 1;
2008 goto end;
2009 }
2010
2011 /*
2012 * Older kernel tracers don't expose the API to monitor their
2013 * buffers. Therefore, we reject triggers that require that
2014 * mechanism to be available to be evaluated.
2015 */
2016 ret = kernel_supports_ring_buffer_snapshot_sample_positions();
2017 break;
2018 }
2019 default:
2020 ret = 1;
2021 }
2022 end:
2023 return ret;
2024 }
2025
2026 /* Must be called with RCU read lock held. */
2027 static
2028 int bind_trigger_to_matching_session(const struct lttng_trigger *trigger,
2029 struct notification_thread_state *state)
2030 {
2031 int ret = 0;
2032 const struct lttng_condition *condition;
2033 const char *session_name;
2034 struct lttng_session_trigger_list *trigger_list;
2035
2036 condition = lttng_trigger_get_const_condition(trigger);
2037 switch (lttng_condition_get_type(condition)) {
2038 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
2039 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
2040 {
2041 enum lttng_condition_status status;
2042
2043 status = lttng_condition_session_rotation_get_session_name(
2044 condition, &session_name);
2045 if (status != LTTNG_CONDITION_STATUS_OK) {
2046 ERR("[notification-thread] Failed to bind trigger to session: unable to get 'session_rotation' condition's session name");
2047 ret = -1;
2048 goto end;
2049 }
2050 break;
2051 }
2052 default:
2053 ret = -1;
2054 goto end;
2055 }
2056
2057 trigger_list = get_session_trigger_list(state, session_name);
2058 if (!trigger_list) {
2059 DBG("[notification-thread] Unable to bind trigger applying to session \"%s\" as it is not yet known to the notification system",
2060 session_name);
2061 goto end;
2062
2063 }
2064
2065 DBG("[notification-thread] Newly registered trigger bound to session \"%s\"",
2066 session_name);
2067 ret = lttng_session_trigger_list_add(trigger_list, trigger);
2068 end:
2069 return ret;
2070 }
2071
2072 /* Must be called with RCU read lock held. */
2073 static
2074 int bind_trigger_to_matching_channels(const struct lttng_trigger *trigger,
2075 struct notification_thread_state *state)
2076 {
2077 int ret = 0;
2078 struct cds_lfht_node *node;
2079 struct cds_lfht_iter iter;
2080 struct channel_info *channel;
2081
2082 cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
2083 channels_ht_node) {
2084 struct lttng_trigger_list_element *trigger_list_element;
2085 struct lttng_channel_trigger_list *trigger_list;
2086 struct cds_lfht_iter lookup_iter;
2087
2088 if (!trigger_applies_to_channel(trigger, channel)) {
2089 continue;
2090 }
2091
2092 cds_lfht_lookup(state->channel_triggers_ht,
2093 hash_channel_key(&channel->key),
2094 match_channel_trigger_list,
2095 &channel->key,
2096 &lookup_iter);
2097 node = cds_lfht_iter_get_node(&lookup_iter);
2098 assert(node);
2099 trigger_list = caa_container_of(node,
2100 struct lttng_channel_trigger_list,
2101 channel_triggers_ht_node);
2102
2103 trigger_list_element = zmalloc(sizeof(*trigger_list_element));
2104 if (!trigger_list_element) {
2105 ret = -1;
2106 goto end;
2107 }
2108 CDS_INIT_LIST_HEAD(&trigger_list_element->node);
2109 trigger_list_element->trigger = trigger;
2110 cds_list_add(&trigger_list_element->node, &trigger_list->list);
2111 DBG("[notification-thread] Newly registered trigger bound to channel \"%s\"",
2112 channel->name);
2113 }
2114 end:
2115 return ret;
2116 }
2117
2118 /*
2119 * FIXME A client's credentials are not checked when registering a trigger, nor
2120 * are they stored alongside with the trigger.
2121 *
2122 * The effects of this are benign since:
2123 * - The client will succeed in registering the trigger, as it is valid,
2124 * - The trigger will, internally, be bound to the channel/session,
2125 * - The notifications will not be sent since the client's credentials
2126 * are checked against the channel at that moment.
2127 *
2128 * If this function returns a non-zero value, it means something is
2129 * fundamentally broken and the whole subsystem/thread will be torn down.
2130 *
2131 * If a non-fatal error occurs, just set the cmd_result to the appropriate
2132 * error code.
2133 */
2134 static
2135 int handle_notification_thread_command_register_trigger(
2136 struct notification_thread_state *state,
2137 struct lttng_trigger *trigger,
2138 enum lttng_error_code *cmd_result)
2139 {
2140 int ret = 0;
2141 struct lttng_condition *condition;
2142 struct notification_client *client;
2143 struct notification_client_list *client_list = NULL;
2144 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
2145 struct notification_client_list_element *client_list_element, *tmp;
2146 struct cds_lfht_node *node;
2147 struct cds_lfht_iter iter;
2148 bool free_trigger = true;
2149
2150 rcu_read_lock();
2151
2152 condition = lttng_trigger_get_condition(trigger);
2153 assert(condition);
2154
2155 ret = condition_is_supported(condition);
2156 if (ret < 0) {
2157 goto error;
2158 } else if (ret == 0) {
2159 *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
2160 goto error;
2161 } else {
2162 /* Feature is supported, continue. */
2163 ret = 0;
2164 }
2165
2166 trigger_ht_element = zmalloc(sizeof(*trigger_ht_element));
2167 if (!trigger_ht_element) {
2168 ret = -1;
2169 goto error;
2170 }
2171
2172 /* Add trigger to the trigger_ht. */
2173 cds_lfht_node_init(&trigger_ht_element->node);
2174 trigger_ht_element->trigger = trigger;
2175
2176 node = cds_lfht_add_unique(state->triggers_ht,
2177 lttng_condition_hash(condition),
2178 match_condition,
2179 condition,
2180 &trigger_ht_element->node);
2181 if (node != &trigger_ht_element->node) {
2182 /* Not a fatal error, simply report it to the client. */
2183 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2184 goto error_free_ht_element;
2185 }
2186
2187 /*
2188 * Ownership of the trigger and of its wrapper was transfered to
2189 * the triggers_ht.
2190 */
2191 trigger_ht_element = NULL;
2192 free_trigger = false;
2193
2194 /*
2195 * The rest only applies to triggers that have a "notify" action.
2196 * It is not skipped as this is the only action type currently
2197 * supported.
2198 */
2199 client_list = notification_client_list_create(trigger);
2200 if (!client_list) {
2201 ret = -1;
2202 goto error_free_ht_element;
2203 }
2204
2205 /* Build a list of clients to which this new trigger applies. */
2206 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
2207 client_socket_ht_node) {
2208 if (!trigger_applies_to_client(trigger, client)) {
2209 continue;
2210 }
2211
2212 client_list_element = zmalloc(sizeof(*client_list_element));
2213 if (!client_list_element) {
2214 ret = -1;
2215 goto error_put_client_list;
2216 }
2217 CDS_INIT_LIST_HEAD(&client_list_element->node);
2218 client_list_element->client = client;
2219 cds_list_add(&client_list_element->node, &client_list->list);
2220 }
2221
2222 switch (get_condition_binding_object(condition)) {
2223 case LTTNG_OBJECT_TYPE_SESSION:
2224 /* Add the trigger to the list if it matches a known session. */
2225 ret = bind_trigger_to_matching_session(trigger, state);
2226 if (ret) {
2227 goto error_put_client_list;
2228 }
2229 break;
2230 case LTTNG_OBJECT_TYPE_CHANNEL:
2231 /*
2232 * Add the trigger to list of triggers bound to the channels
2233 * currently known.
2234 */
2235 ret = bind_trigger_to_matching_channels(trigger, state);
2236 if (ret) {
2237 goto error_put_client_list;
2238 }
2239 break;
2240 case LTTNG_OBJECT_TYPE_NONE:
2241 break;
2242 default:
2243 ERR("[notification-thread] Unknown object type on which to bind a newly registered trigger was encountered");
2244 ret = -1;
2245 goto error_put_client_list;
2246 }
2247
2248 /*
2249 * Since there is nothing preventing clients from subscribing to a
2250 * condition before the corresponding trigger is registered, we have
2251 * to evaluate this new condition right away.
2252 *
2253 * At some point, we were waiting for the next "evaluation" (e.g. on
2254 * reception of a channel sample) to evaluate this new condition, but
2255 * that was broken.
2256 *
2257 * The reason it was broken is that waiting for the next sample
2258 * does not allow us to properly handle transitions for edge-triggered
2259 * conditions.
2260 *
2261 * Consider this example: when we handle a new channel sample, we
2262 * evaluate each conditions twice: once with the previous state, and
2263 * again with the newest state. We then use those two results to
2264 * determine whether a state change happened: a condition was false and
2265 * became true. If a state change happened, we have to notify clients.
2266 *
2267 * Now, if a client subscribes to a given notification and registers
2268 * a trigger *after* that subscription, we have to make sure the
2269 * condition is evaluated at this point while considering only the
2270 * current state. Otherwise, the next evaluation cycle may only see
2271 * that the evaluations remain the same (true for samples n-1 and n) and
2272 * the client will never know that the condition has been met.
2273 *
2274 * No need to lock the list here as it has not been published yet.
2275 */
2276 cds_list_for_each_entry_safe(client_list_element, tmp,
2277 &client_list->list, node) {
2278 ret = evaluate_condition_for_client(trigger, condition,
2279 client_list_element->client, state);
2280 if (ret) {
2281 goto error_put_client_list;
2282 }
2283 }
2284
2285 /*
2286 * Client list ownership transferred to the
2287 * notification_trigger_clients_ht.
2288 */
2289 publish_notification_client_list(state, client_list);
2290 client_list = NULL;
2291
2292 *cmd_result = LTTNG_OK;
2293
2294 error_put_client_list:
2295 notification_client_list_put(client_list);
2296
2297 error_free_ht_element:
2298 free(trigger_ht_element);
2299 error:
2300 if (free_trigger) {
2301 lttng_trigger_destroy(trigger);
2302 }
2303 rcu_read_unlock();
2304 return ret;
2305 }
2306
2307 static
2308 void free_lttng_trigger_ht_element_rcu(struct rcu_head *node)
2309 {
2310 free(caa_container_of(node, struct lttng_trigger_ht_element,
2311 rcu_node));
2312 }
2313
2314 static
2315 int handle_notification_thread_command_unregister_trigger(
2316 struct notification_thread_state *state,
2317 struct lttng_trigger *trigger,
2318 enum lttng_error_code *_cmd_reply)
2319 {
2320 struct cds_lfht_iter iter;
2321 struct cds_lfht_node *triggers_ht_node;
2322 struct lttng_channel_trigger_list *trigger_list;
2323 struct notification_client_list *client_list;
2324 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
2325 struct lttng_condition *condition = lttng_trigger_get_condition(
2326 trigger);
2327 enum lttng_error_code cmd_reply;
2328
2329 rcu_read_lock();
2330
2331 cds_lfht_lookup(state->triggers_ht,
2332 lttng_condition_hash(condition),
2333 match_condition,
2334 condition,
2335 &iter);
2336 triggers_ht_node = cds_lfht_iter_get_node(&iter);
2337 if (!triggers_ht_node) {
2338 cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND;
2339 goto end;
2340 } else {
2341 cmd_reply = LTTNG_OK;
2342 }
2343
2344 /* Remove trigger from channel_triggers_ht. */
2345 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
2346 channel_triggers_ht_node) {
2347 struct lttng_trigger_list_element *trigger_element, *tmp;
2348
2349 cds_list_for_each_entry_safe(trigger_element, tmp,
2350 &trigger_list->list, node) {
2351 const struct lttng_condition *current_condition =
2352 lttng_trigger_get_const_condition(
2353 trigger_element->trigger);
2354
2355 assert(current_condition);
2356 if (!lttng_condition_is_equal(condition,
2357 current_condition)) {
2358 continue;
2359 }
2360
2361 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
2362 cds_list_del(&trigger_element->node);
2363 /* A trigger can only appear once per channel */
2364 break;
2365 }
2366 }
2367
2368 /*
2369 * Remove and release the client list from
2370 * notification_trigger_clients_ht.
2371 */
2372 client_list = get_client_list_from_condition(state, condition);
2373 assert(client_list);
2374
2375 /* Put new reference and the hashtable's reference. */
2376 notification_client_list_put(client_list);
2377 notification_client_list_put(client_list);
2378 client_list = NULL;
2379
2380 /* Remove trigger from triggers_ht. */
2381 trigger_ht_element = caa_container_of(triggers_ht_node,
2382 struct lttng_trigger_ht_element, node);
2383 cds_lfht_del(state->triggers_ht, triggers_ht_node);
2384
2385 /* Release the ownership of the trigger. */
2386 lttng_trigger_destroy(trigger_ht_element->trigger);
2387 call_rcu(&trigger_ht_element->rcu_node, free_lttng_trigger_ht_element_rcu);
2388 end:
2389 rcu_read_unlock();
2390 if (_cmd_reply) {
2391 *_cmd_reply = cmd_reply;
2392 }
2393 return 0;
2394 }
2395
2396 /* Returns 0 on success, 1 on exit requested, negative value on error. */
2397 int handle_notification_thread_command(
2398 struct notification_thread_handle *handle,
2399 struct notification_thread_state *state)
2400 {
2401 int ret;
2402 uint64_t counter;
2403 struct notification_thread_command *cmd;
2404
2405 /* Read the event pipe to put it back into a quiescent state. */
2406 ret = lttng_read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter,
2407 sizeof(counter));
2408 if (ret != sizeof(counter)) {
2409 goto error;
2410 }
2411
2412 pthread_mutex_lock(&handle->cmd_queue.lock);
2413 cmd = cds_list_first_entry(&handle->cmd_queue.list,
2414 struct notification_thread_command, cmd_list_node);
2415 switch (cmd->type) {
2416 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
2417 DBG("[notification-thread] Received register trigger command");
2418 ret = handle_notification_thread_command_register_trigger(
2419 state, cmd->parameters.trigger,
2420 &cmd->reply_code);
2421 break;
2422 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
2423 DBG("[notification-thread] Received unregister trigger command");
2424 ret = handle_notification_thread_command_unregister_trigger(
2425 state, cmd->parameters.trigger,
2426 &cmd->reply_code);
2427 break;
2428 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
2429 DBG("[notification-thread] Received add channel command");
2430 ret = handle_notification_thread_command_add_channel(
2431 state,
2432 cmd->parameters.add_channel.session.name,
2433 cmd->parameters.add_channel.session.uid,
2434 cmd->parameters.add_channel.session.gid,
2435 cmd->parameters.add_channel.channel.name,
2436 cmd->parameters.add_channel.channel.domain,
2437 cmd->parameters.add_channel.channel.key,
2438 cmd->parameters.add_channel.channel.capacity,
2439 &cmd->reply_code);
2440 break;
2441 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
2442 DBG("[notification-thread] Received remove channel command");
2443 ret = handle_notification_thread_command_remove_channel(
2444 state, cmd->parameters.remove_channel.key,
2445 cmd->parameters.remove_channel.domain,
2446 &cmd->reply_code);
2447 break;
2448 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING:
2449 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED:
2450 DBG("[notification-thread] Received session rotation %s command",
2451 cmd->type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING ?
2452 "ongoing" : "completed");
2453 ret = handle_notification_thread_command_session_rotation(
2454 state,
2455 cmd->type,
2456 cmd->parameters.session_rotation.session_name,
2457 cmd->parameters.session_rotation.uid,
2458 cmd->parameters.session_rotation.gid,
2459 cmd->parameters.session_rotation.trace_archive_chunk_id,
2460 cmd->parameters.session_rotation.location,
2461 &cmd->reply_code);
2462 break;
2463 case NOTIFICATION_COMMAND_TYPE_QUIT:
2464 DBG("[notification-thread] Received quit command");
2465 cmd->reply_code = LTTNG_OK;
2466 ret = 1;
2467 goto end;
2468 default:
2469 ERR("[notification-thread] Unknown internal command received");
2470 goto error_unlock;
2471 }
2472
2473 if (ret) {
2474 goto error_unlock;
2475 }
2476 end:
2477 cds_list_del(&cmd->cmd_list_node);
2478 lttng_waiter_wake_up(&cmd->reply_waiter);
2479 pthread_mutex_unlock(&handle->cmd_queue.lock);
2480 return ret;
2481 error_unlock:
2482 /* Wake-up and return a fatal error to the calling thread. */
2483 lttng_waiter_wake_up(&cmd->reply_waiter);
2484 pthread_mutex_unlock(&handle->cmd_queue.lock);
2485 cmd->reply_code = LTTNG_ERR_FATAL;
2486 error:
2487 /* Indicate a fatal error to the caller. */
2488 return -1;
2489 }
2490
/*
 * Add O_NONBLOCK to the file status flags of a socket.
 *
 * Returns the result of the F_SETFL fcntl() call on success, -1 if either
 * fcntl() call fails.
 */
static
int socket_set_non_blocking(int socket)
{
	int flags, ret;

	/* Fetch the current flags so that only O_NONBLOCK is added. */
	flags = fcntl(socket, F_GETFL, 0);
	if (flags == -1) {
		PERROR("fcntl get socket flags");
		return -1;
	}

	ret = fcntl(socket, F_SETFL, flags | O_NONBLOCK);
	if (ret == -1) {
		PERROR("fcntl set O_NONBLOCK socket flag");
		return ret;
	}

	DBG("Client socket (fd = %i) set as non-blocking", socket);
	return ret;
}
2513
2514 /* Client lock must be acquired by caller. */
2515 static
2516 int client_reset_inbound_state(struct notification_client *client)
2517 {
2518 int ret;
2519
2520 ASSERT_LOCKED(client->lock);
2521
2522 ret = lttng_dynamic_buffer_set_size(
2523 &client->communication.inbound.buffer, 0);
2524 assert(!ret);
2525
2526 client->communication.inbound.bytes_to_receive =
2527 sizeof(struct lttng_notification_channel_message);
2528 client->communication.inbound.msg_type =
2529 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN;
2530 LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1);
2531 LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1);
2532 ret = lttng_dynamic_buffer_set_size(
2533 &client->communication.inbound.buffer,
2534 client->communication.inbound.bytes_to_receive);
2535 return ret;
2536 }
2537
2538 int handle_notification_thread_client_connect(
2539 struct notification_thread_state *state)
2540 {
2541 int ret;
2542 struct notification_client *client;
2543
2544 DBG("[notification-thread] Handling new notification channel client connection");
2545
2546 client = zmalloc(sizeof(*client));
2547 if (!client) {
2548 /* Fatal error. */
2549 ret = -1;
2550 goto error;
2551 }
2552 pthread_mutex_init(&client->lock, NULL);
2553 client->id = state->next_notification_client_id++;
2554 CDS_INIT_LIST_HEAD(&client->condition_list);
2555 lttng_dynamic_buffer_init(&client->communication.inbound.buffer);
2556 lttng_dynamic_buffer_init(&client->communication.outbound.buffer);
2557 client->communication.inbound.expect_creds = true;
2558
2559 pthread_mutex_lock(&client->lock);
2560 ret = client_reset_inbound_state(client);
2561 pthread_mutex_unlock(&client->lock);
2562 if (ret) {
2563 ERR("[notification-thread] Failed to reset client communication's inbound state");
2564 ret = 0;
2565 goto error;
2566 }
2567
2568 ret = lttcomm_accept_unix_sock(state->notification_channel_socket);
2569 if (ret < 0) {
2570 ERR("[notification-thread] Failed to accept new notification channel client connection");
2571 ret = 0;
2572 goto error;
2573 }
2574
2575 client->socket = ret;
2576
2577 ret = socket_set_non_blocking(client->socket);
2578 if (ret) {
2579 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
2580 goto error;
2581 }
2582
2583 ret = lttcomm_setsockopt_creds_unix_sock(client->socket);
2584 if (ret < 0) {
2585 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
2586 ret = 0;
2587 goto error;
2588 }
2589
2590 ret = lttng_poll_add(&state->events, client->socket,
2591 LPOLLIN | LPOLLERR |
2592 LPOLLHUP | LPOLLRDHUP);
2593 if (ret < 0) {
2594 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
2595 ret = 0;
2596 goto error;
2597 }
2598 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
2599 client->socket);
2600
2601 rcu_read_lock();
2602 cds_lfht_add(state->client_socket_ht,
2603 hash_client_socket(client->socket),
2604 &client->client_socket_ht_node);
2605 cds_lfht_add(state->client_id_ht,
2606 hash_client_id(client->id),
2607 &client->client_id_ht_node);
2608 rcu_read_unlock();
2609
2610 return ret;
2611 error:
2612 notification_client_destroy(client, state);
2613 return ret;
2614 }
2615
2616 /* RCU read-lock must be held by the caller. */
2617 /* Client lock must be held by the caller */
2618 static
2619 int notification_thread_client_disconnect(
2620 struct notification_client *client,
2621 struct notification_thread_state *state)
2622 {
2623 int ret;
2624 struct lttng_condition_list_element *condition_list_element, *tmp;
2625
2626 /* Acquire the client lock to disable its communication atomically. */
2627 client->communication.active = false;
2628 ret = lttng_poll_del(&state->events, client->socket);
2629 if (ret) {
2630 ERR("[notification-thread] Failed to remove client socket %d from poll set",
2631 client->socket);
2632 }
2633
2634 cds_lfht_del(state->client_socket_ht, &client->client_socket_ht_node);
2635 cds_lfht_del(state->client_id_ht, &client->client_id_ht_node);
2636
2637 /* Release all conditions to which the client was subscribed. */
2638 cds_list_for_each_entry_safe(condition_list_element, tmp,
2639 &client->condition_list, node) {
2640 (void) notification_thread_client_unsubscribe(client,
2641 condition_list_element->condition, state, NULL);
2642 }
2643
2644 /*
2645 * Client no longer accessible to other threads (through the
2646 * client lists).
2647 */
2648 notification_client_destroy(client, state);
2649 return ret;
2650 }
2651
2652 int handle_notification_thread_client_disconnect(
2653 int client_socket, struct notification_thread_state *state)
2654 {
2655 int ret = 0;
2656 struct notification_client *client;
2657
2658 rcu_read_lock();
2659 DBG("[notification-thread] Closing client connection (socket fd = %i)",
2660 client_socket);
2661 client = get_client_from_socket(client_socket, state);
2662 if (!client) {
2663 /* Internal state corruption, fatal error. */
2664 ERR("[notification-thread] Unable to find client (socket fd = %i)",
2665 client_socket);
2666 ret = -1;
2667 goto end;
2668 }
2669
2670 pthread_mutex_lock(&client->lock);
2671 ret = notification_thread_client_disconnect(client, state);
2672 pthread_mutex_unlock(&client->lock);
2673 end:
2674 rcu_read_unlock();
2675 return ret;
2676 }
2677
2678 int handle_notification_thread_client_disconnect_all(
2679 struct notification_thread_state *state)
2680 {
2681 struct cds_lfht_iter iter;
2682 struct notification_client *client;
2683 bool error_encoutered = false;
2684
2685 rcu_read_lock();
2686 DBG("[notification-thread] Closing all client connections");
2687 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
2688 client_socket_ht_node) {
2689 int ret;
2690
2691 pthread_mutex_lock(&client->lock);
2692 ret = notification_thread_client_disconnect(
2693 client, state);
2694 pthread_mutex_unlock(&client->lock);
2695 if (ret) {
2696 error_encoutered = true;
2697 }
2698 }
2699 rcu_read_unlock();
2700 return error_encoutered ? 1 : 0;
2701 }
2702
2703 int handle_notification_thread_trigger_unregister_all(
2704 struct notification_thread_state *state)
2705 {
2706 bool error_occurred = false;
2707 struct cds_lfht_iter iter;
2708 struct lttng_trigger_ht_element *trigger_ht_element;
2709
2710 rcu_read_lock();
2711 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
2712 node) {
2713 int ret = handle_notification_thread_command_unregister_trigger(
2714 state, trigger_ht_element->trigger, NULL);
2715 if (ret) {
2716 error_occurred = true;
2717 }
2718 }
2719 rcu_read_unlock();
2720 return error_occurred ? -1 : 0;
2721 }
2722
2723 static
2724 int client_handle_transmission_status(
2725 struct notification_client *client,
2726 enum client_transmission_status transmission_status,
2727 struct notification_thread_state *state)
2728 {
2729 int ret = 0;
2730
2731 switch (transmission_status) {
2732 case CLIENT_TRANSMISSION_STATUS_COMPLETE:
2733 ret = lttng_poll_mod(&state->events, client->socket,
2734 CLIENT_POLL_MASK_IN);
2735 if (ret) {
2736 goto end;
2737 }
2738
2739 client->communication.outbound.queued_command_reply = false;
2740 client->communication.outbound.dropped_notification = false;
2741 break;
2742 case CLIENT_TRANSMISSION_STATUS_QUEUED:
2743 /*
2744 * We want to be notified whenever there is buffer space
2745 * available to send the rest of the payload.
2746 */
2747 ret = lttng_poll_mod(&state->events, client->socket,
2748 CLIENT_POLL_MASK_IN_OUT);
2749 if (ret) {
2750 goto end;
2751 }
2752 break;
2753 case CLIENT_TRANSMISSION_STATUS_FAIL:
2754 ret = notification_thread_client_disconnect(client, state);
2755 if (ret) {
2756 goto end;
2757 }
2758 break;
2759 case CLIENT_TRANSMISSION_STATUS_ERROR:
2760 ret = -1;
2761 goto end;
2762 default:
2763 abort();
2764 }
2765 end:
2766 return ret;
2767 }
2768
2769 /* Client lock must be acquired by caller. */
2770 static
2771 enum client_transmission_status client_flush_outgoing_queue(
2772 struct notification_client *client,
2773 struct notification_thread_state *state)
2774 {
2775 ssize_t ret;
2776 size_t to_send_count;
2777 enum client_transmission_status status;
2778
2779 ASSERT_LOCKED(client->lock);
2780
2781 assert(client->communication.outbound.buffer.size != 0);
2782 to_send_count = client->communication.outbound.buffer.size;
2783 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
2784 client->socket);
2785
2786 ret = lttcomm_send_unix_sock_non_block(client->socket,
2787 client->communication.outbound.buffer.data,
2788 to_send_count);
2789 if ((ret >= 0 && ret < to_send_count)) {
2790 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
2791 client->socket);
2792 to_send_count -= max(ret, 0);
2793
2794 memcpy(client->communication.outbound.buffer.data,
2795 client->communication.outbound.buffer.data +
2796 client->communication.outbound.buffer.size - to_send_count,
2797 to_send_count);
2798 ret = lttng_dynamic_buffer_set_size(
2799 &client->communication.outbound.buffer,
2800 to_send_count);
2801 if (ret) {
2802 status = CLIENT_TRANSMISSION_STATUS_ERROR;
2803 goto error;
2804 }
2805 status = CLIENT_TRANSMISSION_STATUS_QUEUED;
2806 } else if (ret < 0) {
2807 /* Generic error, disconnect the client. */
2808 ERR("[notification-thread] Failed to flush outgoing queue, disconnecting client (socket fd = %i)",
2809 client->socket);
2810 status = CLIENT_TRANSMISSION_STATUS_FAIL;
2811 } else {
2812 /* No error and flushed the queue completely. */
2813 ret = lttng_dynamic_buffer_set_size(
2814 &client->communication.outbound.buffer, 0);
2815 if (ret) {
2816 status = CLIENT_TRANSMISSION_STATUS_ERROR;
2817 goto error;
2818 }
2819 status = CLIENT_TRANSMISSION_STATUS_COMPLETE;
2820 }
2821
2822 ret = client_handle_transmission_status(client, status, state);
2823 if (ret) {
2824 goto error;
2825 }
2826
2827 return 0;
2828 error:
2829 return -1;
2830 }
2831
2832 /* Client lock must be acquired by caller. */
2833 static
2834 int client_send_command_reply(struct notification_client *client,
2835 struct notification_thread_state *state,
2836 enum lttng_notification_channel_status status)
2837 {
2838 int ret;
2839 struct lttng_notification_channel_command_reply reply = {
2840 .status = (int8_t) status,
2841 };
2842 struct lttng_notification_channel_message msg = {
2843 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
2844 .size = sizeof(reply),
2845 };
2846 char buffer[sizeof(msg) + sizeof(reply)];
2847
2848 ASSERT_LOCKED(client->lock);
2849
2850 if (client->communication.outbound.queued_command_reply) {
2851 /* Protocol error. */
2852 goto error;
2853 }
2854
2855 memcpy(buffer, &msg, sizeof(msg));
2856 memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
2857 DBG("[notification-thread] Send command reply (%i)", (int) status);
2858
2859 /* Enqueue buffer to outgoing queue and flush it. */
2860 ret = lttng_dynamic_buffer_append(
2861 &client->communication.outbound.buffer,
2862 buffer, sizeof(buffer));
2863 if (ret) {
2864 goto error;
2865 }
2866
2867 ret = client_flush_outgoing_queue(client, state);
2868 if (ret) {
2869 goto error;
2870 }
2871
2872 if (client->communication.outbound.buffer.size != 0) {
2873 /* Queue could not be emptied. */
2874 client->communication.outbound.queued_command_reply = true;
2875 }
2876
2877 return 0;
2878 error:
2879 return -1;
2880 }
2881
2882 static
2883 int client_handle_message_unknown(struct notification_client *client,
2884 struct notification_thread_state *state)
2885 {
2886 int ret;
2887
2888 pthread_mutex_lock(&client->lock);
2889
2890 /*
2891 * Receiving message header. The function will be called again
2892 * once the rest of the message as been received and can be
2893 * interpreted.
2894 */
2895 const struct lttng_notification_channel_message *msg;
2896
2897 assert(sizeof(*msg) == client->communication.inbound.buffer.size);
2898 msg = (const struct lttng_notification_channel_message *)
2899 client->communication.inbound.buffer.data;
2900
2901 if (msg->size == 0 ||
2902 msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
2903 ERR("[notification-thread] Invalid notification channel message: length = %u",
2904 msg->size);
2905 ret = -1;
2906 goto end;
2907 }
2908
2909 switch (msg->type) {
2910 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
2911 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
2912 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
2913 break;
2914 default:
2915 ret = -1;
2916 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
2917 goto end;
2918 }
2919
2920 client->communication.inbound.bytes_to_receive = msg->size;
2921 client->communication.inbound.msg_type =
2922 (enum lttng_notification_channel_message_type) msg->type;
2923 ret = lttng_dynamic_buffer_set_size(
2924 &client->communication.inbound.buffer, msg->size);
2925 end:
2926 pthread_mutex_unlock(&client->lock);
2927 return ret;
2928 }
2929
2930 static
2931 int client_handle_message_handshake(struct notification_client *client,
2932 struct notification_thread_state *state)
2933 {
2934 int ret;
2935 struct lttng_notification_channel_command_handshake *handshake_client;
2936 const struct lttng_notification_channel_command_handshake handshake_reply = {
2937 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
2938 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
2939 };
2940 const struct lttng_notification_channel_message msg_header = {
2941 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
2942 .size = sizeof(handshake_reply),
2943 };
2944 enum lttng_notification_channel_status status =
2945 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
2946 char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
2947
2948 pthread_mutex_lock(&client->lock);
2949
2950 memcpy(send_buffer, &msg_header, sizeof(msg_header));
2951 memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
2952 sizeof(handshake_reply));
2953
2954 handshake_client =
2955 (struct lttng_notification_channel_command_handshake *)
2956 client->communication.inbound.buffer
2957 .data;
2958 client->major = handshake_client->major;
2959 client->minor = handshake_client->minor;
2960 if (!client->communication.inbound.creds_received) {
2961 ERR("[notification-thread] No credentials received from client");
2962 ret = -1;
2963 goto end;
2964 }
2965
2966 client->uid = LTTNG_SOCK_GET_UID_CRED(
2967 &client->communication.inbound.creds);
2968 client->gid = LTTNG_SOCK_GET_GID_CRED(
2969 &client->communication.inbound.creds);
2970 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
2971 client->uid, client->gid, (int) client->major,
2972 (int) client->minor);
2973
2974 if (handshake_client->major !=
2975 LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
2976 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
2977 }
2978
2979 ret = lttng_dynamic_buffer_append(
2980 &client->communication.outbound.buffer, send_buffer,
2981 sizeof(send_buffer));
2982 if (ret) {
2983 ERR("[notification-thread] Failed to send protocol version to notification channel client");
2984 goto end;
2985 }
2986
2987 client->validated = true;
2988 client->communication.active = true;
2989
2990 ret = client_flush_outgoing_queue(client, state);
2991 if (ret) {
2992 goto end;
2993 }
2994
2995 ret = client_send_command_reply(client, state, status);
2996 if (ret) {
2997 ERR("[notification-thread] Failed to send reply to notification channel client");
2998 goto end;
2999 }
3000
3001 /* Set reception state to receive the next message header. */
3002 ret = client_reset_inbound_state(client);
3003 if (ret) {
3004 ERR("[notification-thread] Failed to reset client communication's inbound state");
3005 goto end;
3006 }
3007
3008 end:
3009 pthread_mutex_unlock(&client->lock);
3010 return ret;
3011 }
3012
3013 static
3014 int client_handle_message_subscription(
3015 struct notification_client *client,
3016 enum lttng_notification_channel_message_type msg_type,
3017 struct notification_thread_state *state)
3018 {
3019 int ret;
3020 struct lttng_condition *condition;
3021 enum lttng_notification_channel_status status =
3022 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
3023 struct lttng_payload_view condition_view =
3024 lttng_payload_view_from_dynamic_buffer(
3025 &client->communication.inbound.buffer,
3026 0, -1);
3027 size_t expected_condition_size;
3028
3029 pthread_mutex_lock(&client->lock);
3030 expected_condition_size = client->communication.inbound.buffer.size;
3031 pthread_mutex_unlock(&client->lock);
3032
3033 ret = lttng_condition_create_from_payload(&condition_view, &condition);
3034 if (ret != expected_condition_size) {
3035 ERR("[notification-thread] Malformed condition received from client");
3036 goto end;
3037 }
3038
3039 if (msg_type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
3040 ret = notification_thread_client_subscribe(
3041 client, condition, state, &status);
3042 } else {
3043 ret = notification_thread_client_unsubscribe(
3044 client, condition, state, &status);
3045 }
3046 if (ret) {
3047 goto end;
3048 }
3049
3050 pthread_mutex_lock(&client->lock);
3051 ret = client_send_command_reply(client, state, status);
3052 if (ret) {
3053 ERR("[notification-thread] Failed to send reply to notification channel client");
3054 goto end_unlock;
3055 }
3056
3057 /* Set reception state to receive the next message header. */
3058 ret = client_reset_inbound_state(client);
3059 if (ret) {
3060 ERR("[notification-thread] Failed to reset client communication's inbound state");
3061 goto end_unlock;
3062 }
3063
3064 end_unlock:
3065 pthread_mutex_unlock(&client->lock);
3066 end:
3067 return ret;
3068 }
3069
3070 static
3071 int client_dispatch_message(struct notification_client *client,
3072 struct notification_thread_state *state)
3073 {
3074 int ret = 0;
3075
3076 if (client->communication.inbound.msg_type !=
3077 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE &&
3078 client->communication.inbound.msg_type !=
3079 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN &&
3080 !client->validated) {
3081 WARN("[notification-thread] client attempted a command before handshake");
3082 ret = -1;
3083 goto end;
3084 }
3085
3086 switch (client->communication.inbound.msg_type) {
3087 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN:
3088 {
3089 ret = client_handle_message_unknown(client, state);
3090 break;
3091 }
3092 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
3093 {
3094 ret = client_handle_message_handshake(client, state);
3095 break;
3096 }
3097 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
3098 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
3099 {
3100 ret = client_handle_message_subscription(client,
3101 client->communication.inbound.msg_type, state);
3102 break;
3103 }
3104 default:
3105 abort();
3106 }
3107 end:
3108 return ret;
3109 }
3110
3111 /* Incoming data from client. */
3112 int handle_notification_thread_client_in(
3113 struct notification_thread_state *state, int socket)
3114 {
3115 int ret = 0;
3116 struct notification_client *client;
3117 ssize_t recv_ret;
3118 size_t offset;
3119 bool message_is_complete = false;
3120
3121 client = get_client_from_socket(socket, state);
3122 if (!client) {
3123 /* Internal error, abort. */
3124 ret = -1;
3125 goto end;
3126 }
3127
3128 pthread_mutex_lock(&client->lock);
3129 offset = client->communication.inbound.buffer.size -
3130 client->communication.inbound.bytes_to_receive;
3131 if (client->communication.inbound.expect_creds) {
3132 recv_ret = lttcomm_recv_creds_unix_sock(socket,
3133 client->communication.inbound.buffer.data + offset,
3134 client->communication.inbound.bytes_to_receive,
3135 &client->communication.inbound.creds);
3136 if (recv_ret > 0) {
3137 client->communication.inbound.expect_creds = false;
3138 client->communication.inbound.creds_received = true;
3139 }
3140 } else {
3141 recv_ret = lttcomm_recv_unix_sock_non_block(socket,
3142 client->communication.inbound.buffer.data + offset,
3143 client->communication.inbound.bytes_to_receive);
3144 }
3145 if (recv_ret >= 0) {
3146 client->communication.inbound.bytes_to_receive -= recv_ret;
3147 message_is_complete = client->communication.inbound
3148 .bytes_to_receive == 0;
3149 }
3150 pthread_mutex_unlock(&client->lock);
3151 if (recv_ret < 0) {
3152 goto error_disconnect_client;
3153 }
3154
3155 if (message_is_complete) {
3156 ret = client_dispatch_message(client, state);
3157 if (ret) {
3158 /*
3159 * Only returns an error if this client must be
3160 * disconnected.
3161 */
3162 goto error_disconnect_client;
3163 }
3164 }
3165 end:
3166 return ret;
3167 error_disconnect_client:
3168 pthread_mutex_lock(&client->lock);
3169 ret = notification_thread_client_disconnect(client, state);
3170 pthread_mutex_unlock(&client->lock);
3171 return ret;
3172 }
3173
3174 /* Client ready to receive outgoing data. */
3175 int handle_notification_thread_client_out(
3176 struct notification_thread_state *state, int socket)
3177 {
3178 int ret;
3179 struct notification_client *client;
3180
3181 client = get_client_from_socket(socket, state);
3182 if (!client) {
3183 /* Internal error, abort. */
3184 ret = -1;
3185 goto end;
3186 }
3187
3188 pthread_mutex_lock(&client->lock);
3189 ret = client_flush_outgoing_queue(client, state);
3190 pthread_mutex_unlock(&client->lock);
3191 if (ret) {
3192 goto end;
3193 }
3194 end:
3195 return ret;
3196 }
3197
3198 static
3199 bool evaluate_buffer_usage_condition(const struct lttng_condition *condition,
3200 const struct channel_state_sample *sample,
3201 uint64_t buffer_capacity)
3202 {
3203 bool result = false;
3204 uint64_t threshold;
3205 enum lttng_condition_type condition_type;
3206 const struct lttng_condition_buffer_usage *use_condition = container_of(
3207 condition, struct lttng_condition_buffer_usage,
3208 parent);
3209
3210 if (use_condition->threshold_bytes.set) {
3211 threshold = use_condition->threshold_bytes.value;
3212 } else {
3213 /*
3214 * Threshold was expressed as a ratio.
3215 *
3216 * TODO the threshold (in bytes) of conditions expressed
3217 * as a ratio of total buffer size could be cached to
3218 * forego this double-multiplication or it could be performed
3219 * as fixed-point math.
3220 *
3221 * Note that caching should accommodates the case where the
3222 * condition applies to multiple channels (i.e. don't assume
3223 * that all channels matching my_chann* have the same size...)
3224 */
3225 threshold = (uint64_t) (use_condition->threshold_ratio.value *
3226 (double) buffer_capacity);
3227 }
3228
3229 condition_type = lttng_condition_get_type(condition);
3230 if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
3231 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
3232 threshold, sample->highest_usage);
3233
3234 /*
3235 * The low condition should only be triggered once _all_ of the
3236 * streams in a channel have gone below the "low" threshold.
3237 */
3238 if (sample->highest_usage <= threshold) {
3239 result = true;
3240 }
3241 } else {
3242 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
3243 threshold, sample->highest_usage);
3244
3245 /*
3246 * For high buffer usage scenarios, we want to trigger whenever
3247 * _any_ of the streams has reached the "high" threshold.
3248 */
3249 if (sample->highest_usage >= threshold) {
3250 result = true;
3251 }
3252 }
3253
3254 return result;
3255 }
3256
3257 static
3258 bool evaluate_session_consumed_size_condition(
3259 const struct lttng_condition *condition,
3260 uint64_t session_consumed_size)
3261 {
3262 uint64_t threshold;
3263 const struct lttng_condition_session_consumed_size *size_condition =
3264 container_of(condition,
3265 struct lttng_condition_session_consumed_size,
3266 parent);
3267
3268 threshold = size_condition->consumed_threshold_bytes.value;
3269 DBG("[notification-thread] Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
3270 threshold, session_consumed_size);
3271 return session_consumed_size >= threshold;
3272 }
3273
3274 static
3275 int evaluate_buffer_condition(const struct lttng_condition *condition,
3276 struct lttng_evaluation **evaluation,
3277 const struct notification_thread_state *state,
3278 const struct channel_state_sample *previous_sample,
3279 const struct channel_state_sample *latest_sample,
3280 uint64_t previous_session_consumed_total,
3281 uint64_t latest_session_consumed_total,
3282 struct channel_info *channel_info)
3283 {
3284 int ret = 0;
3285 enum lttng_condition_type condition_type;
3286 const bool previous_sample_available = !!previous_sample;
3287 bool previous_sample_result = false;
3288 bool latest_sample_result;
3289
3290 condition_type = lttng_condition_get_type(condition);
3291
3292 switch (condition_type) {
3293 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
3294 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
3295 if (caa_likely(previous_sample_available)) {
3296 previous_sample_result =
3297 evaluate_buffer_usage_condition(condition,
3298 previous_sample, channel_info->capacity);
3299 }
3300 latest_sample_result = evaluate_buffer_usage_condition(
3301 condition, latest_sample,
3302 channel_info->capacity);
3303 break;
3304 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
3305 if (caa_likely(previous_sample_available)) {
3306 previous_sample_result =
3307 evaluate_session_consumed_size_condition(
3308 condition,
3309 previous_session_consumed_total);
3310 }
3311 latest_sample_result =
3312 evaluate_session_consumed_size_condition(
3313 condition,
3314 latest_session_consumed_total);
3315 break;
3316 default:
3317 /* Unknown condition type; internal error. */
3318 abort();
3319 }
3320
3321 if (!latest_sample_result ||
3322 (previous_sample_result == latest_sample_result)) {
3323 /*
3324 * Only trigger on a condition evaluation transition.
3325 *
3326 * NOTE: This edge-triggered logic may not be appropriate for
3327 * future condition types.
3328 */
3329 goto end;
3330 }
3331
3332 if (!evaluation || !latest_sample_result) {
3333 goto end;
3334 }
3335
3336 switch (condition_type) {
3337 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
3338 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
3339 *evaluation = lttng_evaluation_buffer_usage_create(
3340 condition_type,
3341 latest_sample->highest_usage,
3342 channel_info->capacity);
3343 break;
3344 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
3345 *evaluation = lttng_evaluation_session_consumed_size_create(
3346 latest_session_consumed_total);
3347 break;
3348 default:
3349 abort();
3350 }
3351
3352 if (!*evaluation) {
3353 ret = -1;
3354 goto end;
3355 }
3356 end:
3357 return ret;
3358 }
3359
3360 static
3361 int client_enqueue_dropped_notification(struct notification_client *client)
3362 {
3363 int ret;
3364 struct lttng_notification_channel_message msg = {
3365 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
3366 .size = 0,
3367 };
3368
3369 ASSERT_LOCKED(client->lock);
3370
3371 ret = lttng_dynamic_buffer_append(
3372 &client->communication.outbound.buffer, &msg,
3373 sizeof(msg));
3374 return ret;
3375 }
3376
3377 /*
3378 * Permission checks relative to notification channel clients are performed
3379 * here. Notice how object, client, and trigger credentials are involved in
3380 * this check.
3381 *
3382 * The `object` credentials are the credentials associated with the "subject"
3383 * of a condition. For instance, a `rotation completed` condition applies
3384 * to a session. When that condition is met, it will produce an evaluation
3385 * against a session. Hence, in this case, the `object` credentials are the
3386 * credentials of the "subject" session.
3387 *
3388 * The `trigger` credentials are the credentials of the user that registered the
3389 * trigger.
3390 *
3391 * The `client` credentials are the credentials of the user that created a given
3392 * notification channel.
3393 *
3394 * In terms of visibility, it is expected that non-privilieged users can only
3395 * register triggers against "their" objects (their own sessions and
3396 * applications they are allowed to interact with). They can then open a
3397 * notification channel and subscribe to notifications associated with those
3398 * triggers.
3399 *
3400 * As for privilieged users, they can register triggers against the objects of
3401 * other users. They can then subscribe to the notifications associated to their
3402 * triggers. Privilieged users _can't_ subscribe to the notifications of
3403 * triggers owned by other users; they must create their own triggers.
3404 *
3405 * This is more a concern of usability than security. It would be difficult for
3406 * a root user reliably subscribe to a specific set of conditions without
3407 * interference from external users (those could, for instance, unregister
3408 * their triggers).
3409 */
3410 static
3411 int send_evaluation_to_clients(const struct lttng_trigger *trigger,
3412 const struct lttng_evaluation *evaluation,
3413 struct notification_client_list* client_list,
3414 struct notification_thread_state *state,
3415 uid_t object_uid, gid_t object_gid)
3416 {
3417 int ret = 0;
3418 struct lttng_payload msg_payload;
3419 struct notification_client_list_element *client_list_element, *tmp;
3420 const struct lttng_notification notification = {
3421 .condition = (struct lttng_condition *) lttng_trigger_get_const_condition(trigger),
3422 .evaluation = (struct lttng_evaluation *) evaluation,
3423 };
3424 struct lttng_notification_channel_message msg_header = {
3425 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION,
3426 };
3427 const struct lttng_credentials *trigger_creds = lttng_trigger_get_credentials(trigger);
3428
3429 lttng_payload_init(&msg_payload);
3430
3431 ret = lttng_dynamic_buffer_append(&msg_payload.buffer, &msg_header,
3432 sizeof(msg_header));
3433 if (ret) {
3434 goto end;
3435 }
3436
3437 ret = lttng_notification_serialize(&notification, &msg_payload);
3438 if (ret) {
3439 ERR("[notification-thread] Failed to serialize notification");
3440 ret = -1;
3441 goto end;
3442 }
3443
3444 /* Update payload size. */
3445 ((struct lttng_notification_channel_message *) msg_payload.buffer.data)
3446 ->size = (uint32_t)(
3447 msg_payload.buffer.size - sizeof(msg_header));
3448
3449 pthread_mutex_lock(&client_list->lock);
3450 cds_list_for_each_entry_safe(client_list_element, tmp,
3451 &client_list->list, node) {
3452 struct notification_client *client =
3453 client_list_element->client;
3454
3455 ret = 0;
3456 pthread_mutex_lock(&client->lock);
3457 if (client->uid != object_uid && client->gid != object_gid &&
3458 client->uid != 0) {
3459 /* Client is not allowed to monitor this channel. */
3460 DBG("[notification-thread] Skipping client at it does not have the object permission to receive notification for this trigger");
3461 goto unlock_client;
3462 }
3463
3464 if (client->uid != trigger_creds->uid && client->gid != trigger_creds->gid) {
3465 DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this trigger");
3466 goto unlock_client;
3467 }
3468
3469 DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
3470 client->socket, msg_payload.buffer.size);
3471 if (client->communication.outbound.buffer.size) {
3472 /*
3473 * Outgoing data is already buffered for this client;
3474 * drop the notification and enqueue a "dropped
3475 * notification" message if this is the first dropped
3476 * notification since the socket spilled-over to the
3477 * queue.
3478 */
3479 DBG("[notification-thread] Dropping notification addressed to client (socket fd = %i)",
3480 client->socket);
3481 if (!client->communication.outbound.dropped_notification) {
3482 client->communication.outbound.dropped_notification = true;
3483 ret = client_enqueue_dropped_notification(
3484 client);
3485 if (ret) {
3486 goto unlock_client;
3487 }
3488 }
3489 goto unlock_client;
3490 }
3491
3492 ret = lttng_dynamic_buffer_append_buffer(
3493 &client->communication.outbound.buffer,
3494 &msg_payload.buffer);
3495 if (ret) {
3496 goto unlock_client;
3497 }
3498
3499 ret = client_flush_outgoing_queue(client, state);
3500 if (ret) {
3501 goto unlock_client;
3502 }
3503 unlock_client:
3504 pthread_mutex_unlock(&client->lock);
3505 if (ret) {
3506 goto end_unlock_list;
3507 }
3508 }
3509 ret = 0;
3510
3511 end_unlock_list:
3512 pthread_mutex_unlock(&client_list->lock);
3513 end:
3514 lttng_payload_reset(&msg_payload);
3515 return ret;
3516 }
3517
3518 int handle_notification_thread_channel_sample(
3519 struct notification_thread_state *state, int pipe,
3520 enum lttng_domain_type domain)
3521 {
3522 int ret = 0;
3523 struct lttcomm_consumer_channel_monitor_msg sample_msg;
3524 struct channel_info *channel_info;
3525 struct cds_lfht_node *node;
3526 struct cds_lfht_iter iter;
3527 struct lttng_channel_trigger_list *trigger_list;
3528 struct lttng_trigger_list_element *trigger_list_element;
3529 bool previous_sample_available = false;
3530 struct channel_state_sample previous_sample, latest_sample;
3531 uint64_t previous_session_consumed_total, latest_session_consumed_total;
3532
3533 /*
3534 * The monitoring pipe only holds messages smaller than PIPE_BUF,
3535 * ensuring that read/write of sampling messages are atomic.
3536 */
3537 ret = lttng_read(pipe, &sample_msg, sizeof(sample_msg));
3538 if (ret != sizeof(sample_msg)) {
3539 ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
3540 pipe);
3541 ret = -1;
3542 goto end;
3543 }
3544
3545 ret = 0;
3546 latest_sample.key.key = sample_msg.key;
3547 latest_sample.key.domain = domain;
3548 latest_sample.highest_usage = sample_msg.highest;
3549 latest_sample.lowest_usage = sample_msg.lowest;
3550 latest_sample.channel_total_consumed = sample_msg.total_consumed;
3551
3552 rcu_read_lock();
3553
3554 /* Retrieve the channel's informations */
3555 cds_lfht_lookup(state->channels_ht,
3556 hash_channel_key(&latest_sample.key),
3557 match_channel_info,
3558 &latest_sample.key,
3559 &iter);
3560 node = cds_lfht_iter_get_node(&iter);
3561 if (caa_unlikely(!node)) {
3562 /*
3563 * Not an error since the consumer can push a sample to the pipe
3564 * and the rest of the session daemon could notify us of the
3565 * channel's destruction before we get a chance to process that
3566 * sample.
3567 */
3568 DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
3569 latest_sample.key.key,
3570 domain == LTTNG_DOMAIN_KERNEL ? "kernel" :
3571 "user space");
3572 goto end_unlock;
3573 }
3574 channel_info = caa_container_of(node, struct channel_info,
3575 channels_ht_node);
3576 DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64", total consumed = %" PRIu64")",
3577 channel_info->name,
3578 latest_sample.key.key,
3579 channel_info->session_info->name,
3580 latest_sample.highest_usage,
3581 latest_sample.lowest_usage,
3582 latest_sample.channel_total_consumed);
3583
3584 previous_session_consumed_total =
3585 channel_info->session_info->consumed_data_size;
3586
3587 /* Retrieve the channel's last sample, if it exists, and update it. */
3588 cds_lfht_lookup(state->channel_state_ht,
3589 hash_channel_key(&latest_sample.key),
3590 match_channel_state_sample,
3591 &latest_sample.key,
3592 &iter);
3593 node = cds_lfht_iter_get_node(&iter);
3594 if (caa_likely(node)) {
3595 struct channel_state_sample *stored_sample;
3596
3597 /* Update the sample stored. */
3598 stored_sample = caa_container_of(node,
3599 struct channel_state_sample,
3600 channel_state_ht_node);
3601
3602 memcpy(&previous_sample, stored_sample,
3603 sizeof(previous_sample));
3604 stored_sample->highest_usage = latest_sample.highest_usage;
3605 stored_sample->lowest_usage = latest_sample.lowest_usage;
3606 stored_sample->channel_total_consumed = latest_sample.channel_total_consumed;
3607 previous_sample_available = true;
3608
3609 latest_session_consumed_total =
3610 previous_session_consumed_total +
3611 (latest_sample.channel_total_consumed - previous_sample.channel_total_consumed);
3612 } else {
3613 /*
3614 * This is the channel's first sample, allocate space for and
3615 * store the new sample.
3616 */
3617 struct channel_state_sample *stored_sample;
3618
3619 stored_sample = zmalloc(sizeof(*stored_sample));
3620 if (!stored_sample) {
3621 ret = -1;
3622 goto end_unlock;
3623 }
3624
3625 memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
3626 cds_lfht_node_init(&stored_sample->channel_state_ht_node);
3627 cds_lfht_add(state->channel_state_ht,
3628 hash_channel_key(&stored_sample->key),
3629 &stored_sample->channel_state_ht_node);
3630
3631 latest_session_consumed_total =
3632 previous_session_consumed_total +
3633 latest_sample.channel_total_consumed;
3634 }
3635
3636 channel_info->session_info->consumed_data_size =
3637 latest_session_consumed_total;
3638
3639 /* Find triggers associated with this channel. */
3640 cds_lfht_lookup(state->channel_triggers_ht,
3641 hash_channel_key(&latest_sample.key),
3642 match_channel_trigger_list,
3643 &latest_sample.key,
3644 &iter);
3645 node = cds_lfht_iter_get_node(&iter);
3646 if (caa_likely(!node)) {
3647 goto end_unlock;
3648 }
3649
3650 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
3651 channel_triggers_ht_node);
3652 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
3653 node) {
3654 const struct lttng_condition *condition;
3655 const struct lttng_action *action;
3656 const struct lttng_trigger *trigger;
3657 struct notification_client_list *client_list = NULL;
3658 struct lttng_evaluation *evaluation = NULL;
3659 bool client_list_is_empty;
3660
3661 ret = 0;
3662 trigger = trigger_list_element->trigger;
3663 condition = lttng_trigger_get_const_condition(trigger);
3664 assert(condition);
3665 action = lttng_trigger_get_const_action(trigger);
3666
3667 /* Notify actions are the only type currently supported. */
3668 assert(lttng_action_get_type_const(action) ==
3669 LTTNG_ACTION_TYPE_NOTIFY);
3670
3671 /*
3672 * Check if any client is subscribed to the result of this
3673 * evaluation.
3674 */
3675 client_list = get_client_list_from_condition(state, condition);
3676 assert(client_list);
3677 client_list_is_empty = cds_list_empty(&client_list->list);
3678 if (client_list_is_empty) {
3679 /*
3680 * No clients interested in the evaluation's result,
3681 * skip it.
3682 */
3683 goto put_list;
3684 }
3685
3686 ret = evaluate_buffer_condition(condition, &evaluation, state,
3687 previous_sample_available ? &previous_sample : NULL,
3688 &latest_sample,
3689 previous_session_consumed_total,
3690 latest_session_consumed_total,
3691 channel_info);
3692 if (caa_unlikely(ret)) {
3693 goto put_list;
3694 }
3695
3696 if (caa_likely(!evaluation)) {
3697 goto put_list;
3698 }
3699
3700 /* Dispatch evaluation result to all clients. */
3701 ret = send_evaluation_to_clients(trigger_list_element->trigger,
3702 evaluation, client_list, state,
3703 channel_info->session_info->uid,
3704 channel_info->session_info->gid);
3705 lttng_evaluation_destroy(evaluation);
3706 put_list:
3707 notification_client_list_put(client_list);
3708 if (caa_unlikely(ret)) {
3709 break;
3710 }
3711 }
3712 end_unlock:
3713 rcu_read_unlock();
3714 end:
3715 return ret;
3716 }
This page took 0.144487 seconds and 6 git commands to generate.