a2fa8a1ca854933bdf505508002b3d113f609ce4
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.c
1 /*
2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * SPDX-License-Identifier: GPL-2.0-only
5 *
6 */
7
8 #define _LGPL_SOURCE
9 #include <urcu.h>
10 #include <urcu/rculfhash.h>
11
12 #include <common/defaults.h>
13 #include <common/error.h>
14 #include <common/futex.h>
15 #include <common/unix.h>
16 #include <common/dynamic-buffer.h>
17 #include <common/hashtable/utils.h>
18 #include <common/sessiond-comm/sessiond-comm.h>
19 #include <common/macros.h>
20 #include <lttng/condition/condition.h>
21 #include <lttng/action/action-internal.h>
22 #include <lttng/notification/notification-internal.h>
23 #include <lttng/condition/condition-internal.h>
24 #include <lttng/condition/buffer-usage-internal.h>
25 #include <lttng/condition/session-consumed-size-internal.h>
26 #include <lttng/condition/session-rotation-internal.h>
27 #include <lttng/notification/channel-internal.h>
28
29 #include <time.h>
30 #include <unistd.h>
31 #include <assert.h>
32 #include <inttypes.h>
33 #include <fcntl.h>
34
35 #include "notification-thread.h"
36 #include "notification-thread-events.h"
37 #include "notification-thread-commands.h"
38 #include "lttng-sessiond.h"
39 #include "kernel.h"
40
/*
 * Poll event masks: the "IN" mask combines the input event with the
 * error and hang-up events; the "IN_OUT" mask adds the output event on top
 * of it. Presumably applied to notification client sockets (see usage).
 */
#define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
#define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
43
/*
 * Type of object to which a condition is bound, as reported by
 * get_condition_binding_object().
 */
enum lttng_object_type {
	LTTNG_OBJECT_TYPE_UNKNOWN,
	LTTNG_OBJECT_TYPE_NONE,
	LTTNG_OBJECT_TYPE_CHANNEL,
	LTTNG_OBJECT_TYPE_SESSION,
};
50
51 struct lttng_trigger_list_element {
52 /* No ownership of the trigger object is assumed. */
53 const struct lttng_trigger *trigger;
54 struct cds_list_head node;
55 };
56
57 struct lttng_channel_trigger_list {
58 struct channel_key channel_key;
59 /* List of struct lttng_trigger_list_element. */
60 struct cds_list_head list;
61 /* Node in the channel_triggers_ht */
62 struct cds_lfht_node channel_triggers_ht_node;
63 /* call_rcu delayed reclaim. */
64 struct rcu_head rcu_node;
65 };
66
67 /*
68 * List of triggers applying to a given session.
69 *
70 * See:
71 * - lttng_session_trigger_list_create()
72 * - lttng_session_trigger_list_build()
73 * - lttng_session_trigger_list_destroy()
74 * - lttng_session_trigger_list_add()
75 */
76 struct lttng_session_trigger_list {
77 /*
78 * Not owned by this; points to the session_info structure's
79 * session name.
80 */
81 const char *session_name;
82 /* List of struct lttng_trigger_list_element. */
83 struct cds_list_head list;
84 /* Node in the session_triggers_ht */
85 struct cds_lfht_node session_triggers_ht_node;
86 /*
87 * Weak reference to the notification system's session triggers
88 * hashtable.
89 *
90 * The session trigger list structure structure is owned by
91 * the session's session_info.
92 *
93 * The session_info is kept alive the the channel_infos holding a
94 * reference to it (reference counting). When those channels are
95 * destroyed (at runtime or on teardown), the reference they hold
96 * to the session_info are released. On destruction of session_info,
97 * session_info_destroy() will remove the list of triggers applying
98 * to this session from the notification system's state.
99 *
100 * This implies that the session_triggers_ht must be destroyed
101 * after the channels.
102 */
103 struct cds_lfht *session_triggers_ht;
104 /* Used for delayed RCU reclaim. */
105 struct rcu_head rcu_node;
106 };
107
108 struct lttng_trigger_ht_element {
109 struct lttng_trigger *trigger;
110 struct cds_lfht_node node;
111 /* call_rcu delayed reclaim. */
112 struct rcu_head rcu_node;
113 };
114
115 struct lttng_condition_list_element {
116 struct lttng_condition *condition;
117 struct cds_list_head node;
118 };
119
120 struct notification_client_list_element {
121 struct notification_client *client;
122 struct cds_list_head node;
123 };
124
125 struct notification_client_list {
126 const struct lttng_trigger *trigger;
127 struct cds_list_head list;
128 struct cds_lfht_node notification_trigger_ht_node;
129 /* call_rcu delayed reclaim. */
130 struct rcu_head rcu_node;
131 };
132
133 struct notification_client {
134 notification_client_id id;
135 int socket;
136 /* Client protocol version. */
137 uint8_t major, minor;
138 uid_t uid;
139 gid_t gid;
140 /*
141 * Indicates if the credentials and versions of the client have been
142 * checked.
143 */
144 bool validated;
145 /*
146 * Conditions to which the client's notification channel is subscribed.
147 * List of struct lttng_condition_list_node. The condition member is
148 * owned by the client.
149 */
150 struct cds_list_head condition_list;
151 struct cds_lfht_node client_socket_ht_node;
152 struct cds_lfht_node client_id_ht_node;
153 struct {
154 /*
155 * If a client's communication is inactive, it means a fatal
156 * error (either a protocol error or the socket API returned
157 * a fatal error). No further communication should be attempted;
158 * the client is queued for clean-up.
159 */
160 bool active;
161 struct {
162 /*
163 * During the reception of a message, the reception
164 * buffers' "size" is set to contain the current
165 * message's complete payload.
166 */
167 struct lttng_dynamic_buffer buffer;
168 /* Bytes left to receive for the current message. */
169 size_t bytes_to_receive;
170 /* Type of the message being received. */
171 enum lttng_notification_channel_message_type msg_type;
172 /*
173 * Indicates whether or not credentials are expected
174 * from the client.
175 */
176 bool expect_creds;
177 /*
178 * Indicates whether or not credentials were received
179 * from the client.
180 */
181 bool creds_received;
182 /* Only used during credentials reception. */
183 lttng_sock_cred creds;
184 } inbound;
185 struct {
186 /*
187 * Indicates whether or not a notification addressed to
188 * this client was dropped because a command reply was
189 * already buffered.
190 *
191 * A notification is dropped whenever the buffer is not
192 * empty.
193 */
194 bool dropped_notification;
195 /*
196 * Indicates whether or not a command reply is already
197 * buffered. In this case, it means that the client is
198 * not consuming command replies before emitting a new
199 * one. This could be caused by a protocol error or a
200 * misbehaving/malicious client.
201 */
202 bool queued_command_reply;
203 struct lttng_dynamic_buffer buffer;
204 } outbound;
205 } communication;
206 /* call_rcu delayed reclaim. */
207 struct rcu_head rcu_node;
208 };
209
210 struct channel_state_sample {
211 struct channel_key key;
212 struct cds_lfht_node channel_state_ht_node;
213 uint64_t highest_usage;
214 uint64_t lowest_usage;
215 uint64_t channel_total_consumed;
216 /* call_rcu delayed reclaim. */
217 struct rcu_head rcu_node;
218 };
219
/* Helpers defined further down in this file. */
static unsigned long hash_channel_key(struct channel_key *key);
static int evaluate_buffer_condition(const struct lttng_condition *condition,
		struct lttng_evaluation **evaluation,
		const struct notification_thread_state *state,
		const struct channel_state_sample *previous_sample,
		const struct channel_state_sample *latest_sample,
		uint64_t previous_session_consumed_total,
		uint64_t latest_session_consumed_total,
		struct channel_info *channel_info);
static
int send_evaluation_to_clients(const struct lttng_trigger *trigger,
		const struct lttng_evaluation *evaluation,
		struct notification_client_list *client_list,
		struct notification_thread_state *state,
		uid_t channel_uid, gid_t channel_gid);


/* session_info API: life time management and channel tracking. */
static
void session_info_destroy(void *_data);
static
void session_info_get(struct session_info *session_info);
static
void session_info_put(struct session_info *session_info);
static
struct session_info *session_info_create(const char *name,
		uid_t uid, gid_t gid,
		struct lttng_session_trigger_list *trigger_list,
		struct cds_lfht *sessions_ht);
static
void session_info_add_channel(struct session_info *session_info,
		struct channel_info *channel_info);
static
void session_info_remove_channel(struct session_info *session_info,
		struct channel_info *channel_info);

/* lttng_session_trigger_list API: per-session list of triggers. */
static
struct lttng_session_trigger_list *lttng_session_trigger_list_create(
		const char *session_name,
		struct cds_lfht *session_triggers_ht);
static
struct lttng_session_trigger_list *lttng_session_trigger_list_build(
		const struct notification_thread_state *state,
		const char *session_name);
static
void lttng_session_trigger_list_destroy(
		struct lttng_session_trigger_list *list);
static
int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
		const struct lttng_trigger *trigger);
271
272
273 static
274 int match_client_socket(struct cds_lfht_node *node, const void *key)
275 {
276 /* This double-cast is intended to supress pointer-to-cast warning. */
277 const int socket = (int) (intptr_t) key;
278 const struct notification_client *client = caa_container_of(node,
279 struct notification_client, client_socket_ht_node);
280
281 return client->socket == socket;
282 }
283
284 static
285 int match_client_id(struct cds_lfht_node *node, const void *key)
286 {
287 /* This double-cast is intended to supress pointer-to-cast warning. */
288 const notification_client_id id = *((notification_client_id *) key);
289 const struct notification_client *client = caa_container_of(
290 node, struct notification_client, client_id_ht_node);
291
292 return client->id == id;
293 }
294
295 static
296 int match_channel_trigger_list(struct cds_lfht_node *node, const void *key)
297 {
298 struct channel_key *channel_key = (struct channel_key *) key;
299 struct lttng_channel_trigger_list *trigger_list;
300
301 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
302 channel_triggers_ht_node);
303
304 return !!((channel_key->key == trigger_list->channel_key.key) &&
305 (channel_key->domain == trigger_list->channel_key.domain));
306 }
307
308 static
309 int match_session_trigger_list(struct cds_lfht_node *node, const void *key)
310 {
311 const char *session_name = (const char *) key;
312 struct lttng_session_trigger_list *trigger_list;
313
314 trigger_list = caa_container_of(node, struct lttng_session_trigger_list,
315 session_triggers_ht_node);
316
317 return !!(strcmp(trigger_list->session_name, session_name) == 0);
318 }
319
320 static
321 int match_channel_state_sample(struct cds_lfht_node *node, const void *key)
322 {
323 struct channel_key *channel_key = (struct channel_key *) key;
324 struct channel_state_sample *sample;
325
326 sample = caa_container_of(node, struct channel_state_sample,
327 channel_state_ht_node);
328
329 return !!((channel_key->key == sample->key.key) &&
330 (channel_key->domain == sample->key.domain));
331 }
332
333 static
334 int match_channel_info(struct cds_lfht_node *node, const void *key)
335 {
336 struct channel_key *channel_key = (struct channel_key *) key;
337 struct channel_info *channel_info;
338
339 channel_info = caa_container_of(node, struct channel_info,
340 channels_ht_node);
341
342 return !!((channel_key->key == channel_info->key.key) &&
343 (channel_key->domain == channel_info->key.domain));
344 }
345
346 static
347 int match_condition(struct cds_lfht_node *node, const void *key)
348 {
349 struct lttng_condition *condition_key = (struct lttng_condition *) key;
350 struct lttng_trigger_ht_element *trigger;
351 struct lttng_condition *condition;
352
353 trigger = caa_container_of(node, struct lttng_trigger_ht_element,
354 node);
355 condition = lttng_trigger_get_condition(trigger->trigger);
356 assert(condition);
357
358 return !!lttng_condition_is_equal(condition_key, condition);
359 }
360
361 static
362 int match_client_list_condition(struct cds_lfht_node *node, const void *key)
363 {
364 struct lttng_condition *condition_key = (struct lttng_condition *) key;
365 struct notification_client_list *client_list;
366 const struct lttng_condition *condition;
367
368 assert(condition_key);
369
370 client_list = caa_container_of(node, struct notification_client_list,
371 notification_trigger_ht_node);
372 condition = lttng_trigger_get_const_condition(client_list->trigger);
373
374 return !!lttng_condition_is_equal(condition_key, condition);
375 }
376
377 static
378 int match_session(struct cds_lfht_node *node, const void *key)
379 {
380 const char *name = key;
381 struct session_info *session_info = caa_container_of(
382 node, struct session_info, sessions_ht_node);
383
384 return !strcmp(session_info->name, name);
385 }
386
387 static
388 unsigned long lttng_condition_buffer_usage_hash(
389 const struct lttng_condition *_condition)
390 {
391 unsigned long hash;
392 unsigned long condition_type;
393 struct lttng_condition_buffer_usage *condition;
394
395 condition = container_of(_condition,
396 struct lttng_condition_buffer_usage, parent);
397
398 condition_type = (unsigned long) condition->parent.type;
399 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
400 if (condition->session_name) {
401 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
402 }
403 if (condition->channel_name) {
404 hash ^= hash_key_str(condition->channel_name, lttng_ht_seed);
405 }
406 if (condition->domain.set) {
407 hash ^= hash_key_ulong(
408 (void *) condition->domain.type,
409 lttng_ht_seed);
410 }
411 if (condition->threshold_ratio.set) {
412 uint64_t val;
413
414 val = condition->threshold_ratio.value * (double) UINT32_MAX;
415 hash ^= hash_key_u64(&val, lttng_ht_seed);
416 } else if (condition->threshold_bytes.set) {
417 uint64_t val;
418
419 val = condition->threshold_bytes.value;
420 hash ^= hash_key_u64(&val, lttng_ht_seed);
421 }
422 return hash;
423 }
424
425 static
426 unsigned long lttng_condition_session_consumed_size_hash(
427 const struct lttng_condition *_condition)
428 {
429 unsigned long hash;
430 unsigned long condition_type =
431 (unsigned long) LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE;
432 struct lttng_condition_session_consumed_size *condition;
433 uint64_t val;
434
435 condition = container_of(_condition,
436 struct lttng_condition_session_consumed_size, parent);
437
438 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
439 if (condition->session_name) {
440 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
441 }
442 val = condition->consumed_threshold_bytes.value;
443 hash ^= hash_key_u64(&val, lttng_ht_seed);
444 return hash;
445 }
446
447 static
448 unsigned long lttng_condition_session_rotation_hash(
449 const struct lttng_condition *_condition)
450 {
451 unsigned long hash, condition_type;
452 struct lttng_condition_session_rotation *condition;
453
454 condition = container_of(_condition,
455 struct lttng_condition_session_rotation, parent);
456 condition_type = (unsigned long) condition->parent.type;
457 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
458 assert(condition->session_name);
459 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
460 return hash;
461 }
462
463 /*
464 * The lttng_condition hashing code is kept in this file (rather than
465 * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
466 * don't want to link in liblttng-ctl.
467 */
468 static
469 unsigned long lttng_condition_hash(const struct lttng_condition *condition)
470 {
471 switch (condition->type) {
472 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
473 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
474 return lttng_condition_buffer_usage_hash(condition);
475 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
476 return lttng_condition_session_consumed_size_hash(condition);
477 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
478 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
479 return lttng_condition_session_rotation_hash(condition);
480 default:
481 ERR("[notification-thread] Unexpected condition type caught");
482 abort();
483 }
484 }
485
486 static
487 unsigned long hash_channel_key(struct channel_key *key)
488 {
489 unsigned long key_hash = hash_key_u64(&key->key, lttng_ht_seed);
490 unsigned long domain_hash = hash_key_ulong(
491 (void *) (unsigned long) key->domain, lttng_ht_seed);
492
493 return key_hash ^ domain_hash;
494 }
495
496 static
497 unsigned long hash_client_socket(int socket)
498 {
499 return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed);
500 }
501
502 static
503 unsigned long hash_client_id(notification_client_id id)
504 {
505 return hash_key_u64(&id, lttng_ht_seed);
506 }
507
508 /*
509 * Get the type of object to which a given condition applies. Bindings let
510 * the notification system evaluate a trigger's condition when a given
511 * object's state is updated.
512 *
513 * For instance, a condition bound to a channel will be evaluated everytime
514 * the channel's state is changed by a channel monitoring sample.
515 */
516 static
517 enum lttng_object_type get_condition_binding_object(
518 const struct lttng_condition *condition)
519 {
520 switch (lttng_condition_get_type(condition)) {
521 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
522 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
523 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
524 return LTTNG_OBJECT_TYPE_CHANNEL;
525 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
526 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
527 return LTTNG_OBJECT_TYPE_SESSION;
528 default:
529 return LTTNG_OBJECT_TYPE_UNKNOWN;
530 }
531 }
532
533 static
534 void free_channel_info_rcu(struct rcu_head *node)
535 {
536 free(caa_container_of(node, struct channel_info, rcu_node));
537 }
538
539 static
540 void channel_info_destroy(struct channel_info *channel_info)
541 {
542 if (!channel_info) {
543 return;
544 }
545
546 if (channel_info->session_info) {
547 session_info_remove_channel(channel_info->session_info,
548 channel_info);
549 session_info_put(channel_info->session_info);
550 }
551 if (channel_info->name) {
552 free(channel_info->name);
553 }
554 call_rcu(&channel_info->rcu_node, free_channel_info_rcu);
555 }
556
557 static
558 void free_session_info_rcu(struct rcu_head *node)
559 {
560 free(caa_container_of(node, struct session_info, rcu_node));
561 }
562
563 /* Don't call directly, use the ref-counting mechanism. */
564 static
565 void session_info_destroy(void *_data)
566 {
567 struct session_info *session_info = _data;
568 int ret;
569
570 assert(session_info);
571 if (session_info->channel_infos_ht) {
572 ret = cds_lfht_destroy(session_info->channel_infos_ht, NULL);
573 if (ret) {
574 ERR("[notification-thread] Failed to destroy channel information hash table");
575 }
576 }
577 lttng_session_trigger_list_destroy(session_info->trigger_list);
578
579 rcu_read_lock();
580 cds_lfht_del(session_info->sessions_ht,
581 &session_info->sessions_ht_node);
582 rcu_read_unlock();
583 free(session_info->name);
584 call_rcu(&session_info->rcu_node, free_session_info_rcu);
585 }
586
587 static
588 void session_info_get(struct session_info *session_info)
589 {
590 if (!session_info) {
591 return;
592 }
593 lttng_ref_get(&session_info->ref);
594 }
595
596 static
597 void session_info_put(struct session_info *session_info)
598 {
599 if (!session_info) {
600 return;
601 }
602 lttng_ref_put(&session_info->ref);
603 }
604
605 static
606 struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid,
607 struct lttng_session_trigger_list *trigger_list,
608 struct cds_lfht *sessions_ht)
609 {
610 struct session_info *session_info;
611
612 assert(name);
613
614 session_info = zmalloc(sizeof(*session_info));
615 if (!session_info) {
616 goto end;
617 }
618 lttng_ref_init(&session_info->ref, session_info_destroy);
619
620 session_info->channel_infos_ht = cds_lfht_new(DEFAULT_HT_SIZE,
621 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
622 if (!session_info->channel_infos_ht) {
623 goto error;
624 }
625
626 cds_lfht_node_init(&session_info->sessions_ht_node);
627 session_info->name = strdup(name);
628 if (!session_info->name) {
629 goto error;
630 }
631 session_info->uid = uid;
632 session_info->gid = gid;
633 session_info->trigger_list = trigger_list;
634 session_info->sessions_ht = sessions_ht;
635 end:
636 return session_info;
637 error:
638 session_info_put(session_info);
639 return NULL;
640 }
641
642 static
643 void session_info_add_channel(struct session_info *session_info,
644 struct channel_info *channel_info)
645 {
646 rcu_read_lock();
647 cds_lfht_add(session_info->channel_infos_ht,
648 hash_channel_key(&channel_info->key),
649 &channel_info->session_info_channels_ht_node);
650 rcu_read_unlock();
651 }
652
653 static
654 void session_info_remove_channel(struct session_info *session_info,
655 struct channel_info *channel_info)
656 {
657 rcu_read_lock();
658 cds_lfht_del(session_info->channel_infos_ht,
659 &channel_info->session_info_channels_ht_node);
660 rcu_read_unlock();
661 }
662
663 static
664 struct channel_info *channel_info_create(const char *channel_name,
665 struct channel_key *channel_key, uint64_t channel_capacity,
666 struct session_info *session_info)
667 {
668 struct channel_info *channel_info = zmalloc(sizeof(*channel_info));
669
670 if (!channel_info) {
671 goto end;
672 }
673
674 cds_lfht_node_init(&channel_info->channels_ht_node);
675 cds_lfht_node_init(&channel_info->session_info_channels_ht_node);
676 memcpy(&channel_info->key, channel_key, sizeof(*channel_key));
677 channel_info->capacity = channel_capacity;
678
679 channel_info->name = strdup(channel_name);
680 if (!channel_info->name) {
681 goto error;
682 }
683
684 /*
685 * Set the references between session and channel infos:
686 * - channel_info holds a strong reference to session_info
687 * - session_info holds a weak reference to channel_info
688 */
689 session_info_get(session_info);
690 session_info_add_channel(session_info, channel_info);
691 channel_info->session_info = session_info;
692 end:
693 return channel_info;
694 error:
695 channel_info_destroy(channel_info);
696 return NULL;
697 }
698
699 /* RCU read lock must be held by the caller. */
700 static
701 struct notification_client_list *get_client_list_from_condition(
702 struct notification_thread_state *state,
703 const struct lttng_condition *condition)
704 {
705 struct cds_lfht_node *node;
706 struct cds_lfht_iter iter;
707
708 cds_lfht_lookup(state->notification_trigger_clients_ht,
709 lttng_condition_hash(condition),
710 match_client_list_condition,
711 condition,
712 &iter);
713 node = cds_lfht_iter_get_node(&iter);
714
715 return node ? caa_container_of(node,
716 struct notification_client_list,
717 notification_trigger_ht_node) : NULL;
718 }
719
720 /* This function must be called with the RCU read lock held. */
721 static
722 int evaluate_channel_condition_for_client(
723 const struct lttng_condition *condition,
724 struct notification_thread_state *state,
725 struct lttng_evaluation **evaluation,
726 uid_t *session_uid, gid_t *session_gid)
727 {
728 int ret;
729 struct cds_lfht_iter iter;
730 struct cds_lfht_node *node;
731 struct channel_info *channel_info = NULL;
732 struct channel_key *channel_key = NULL;
733 struct channel_state_sample *last_sample = NULL;
734 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
735
736 /* Find the channel associated with the condition. */
737 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter,
738 channel_trigger_list, channel_triggers_ht_node) {
739 struct lttng_trigger_list_element *element;
740
741 cds_list_for_each_entry(element, &channel_trigger_list->list, node) {
742 const struct lttng_condition *current_condition =
743 lttng_trigger_get_const_condition(
744 element->trigger);
745
746 assert(current_condition);
747 if (!lttng_condition_is_equal(condition,
748 current_condition)) {
749 continue;
750 }
751
752 /* Found the trigger, save the channel key. */
753 channel_key = &channel_trigger_list->channel_key;
754 break;
755 }
756 if (channel_key) {
757 /* The channel key was found stop iteration. */
758 break;
759 }
760 }
761
762 if (!channel_key){
763 /* No channel found; normal exit. */
764 DBG("[notification-thread] No known channel associated with newly subscribed-to condition");
765 ret = 0;
766 goto end;
767 }
768
769 /* Fetch channel info for the matching channel. */
770 cds_lfht_lookup(state->channels_ht,
771 hash_channel_key(channel_key),
772 match_channel_info,
773 channel_key,
774 &iter);
775 node = cds_lfht_iter_get_node(&iter);
776 assert(node);
777 channel_info = caa_container_of(node, struct channel_info,
778 channels_ht_node);
779
780 /* Retrieve the channel's last sample, if it exists. */
781 cds_lfht_lookup(state->channel_state_ht,
782 hash_channel_key(channel_key),
783 match_channel_state_sample,
784 channel_key,
785 &iter);
786 node = cds_lfht_iter_get_node(&iter);
787 if (node) {
788 last_sample = caa_container_of(node,
789 struct channel_state_sample,
790 channel_state_ht_node);
791 } else {
792 /* Nothing to evaluate, no sample was ever taken. Normal exit */
793 DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
794 ret = 0;
795 goto end;
796 }
797
798 ret = evaluate_buffer_condition(condition, evaluation, state,
799 NULL, last_sample,
800 0, channel_info->session_info->consumed_data_size,
801 channel_info);
802 if (ret) {
803 WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
804 goto end;
805 }
806
807 *session_uid = channel_info->session_info->uid;
808 *session_gid = channel_info->session_info->gid;
809 end:
810 return ret;
811 }
812
813 static
814 const char *get_condition_session_name(const struct lttng_condition *condition)
815 {
816 const char *session_name = NULL;
817 enum lttng_condition_status status;
818
819 switch (lttng_condition_get_type(condition)) {
820 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
821 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
822 status = lttng_condition_buffer_usage_get_session_name(
823 condition, &session_name);
824 break;
825 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
826 status = lttng_condition_session_consumed_size_get_session_name(
827 condition, &session_name);
828 break;
829 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
830 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
831 status = lttng_condition_session_rotation_get_session_name(
832 condition, &session_name);
833 break;
834 default:
835 abort();
836 }
837 if (status != LTTNG_CONDITION_STATUS_OK) {
838 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
839 goto end;
840 }
841 end:
842 return session_name;
843 }
844
845 /* This function must be called with the RCU read lock held. */
846 static
847 int evaluate_session_condition_for_client(
848 const struct lttng_condition *condition,
849 struct notification_thread_state *state,
850 struct lttng_evaluation **evaluation,
851 uid_t *session_uid, gid_t *session_gid)
852 {
853 int ret;
854 struct cds_lfht_iter iter;
855 struct cds_lfht_node *node;
856 const char *session_name;
857 struct session_info *session_info = NULL;
858
859 session_name = get_condition_session_name(condition);
860
861 /* Find the session associated with the trigger. */
862 cds_lfht_lookup(state->sessions_ht,
863 hash_key_str(session_name, lttng_ht_seed),
864 match_session,
865 session_name,
866 &iter);
867 node = cds_lfht_iter_get_node(&iter);
868 if (!node) {
869 DBG("[notification-thread] No known session matching name \"%s\"",
870 session_name);
871 ret = 0;
872 goto end;
873 }
874
875 session_info = caa_container_of(node, struct session_info,
876 sessions_ht_node);
877 session_info_get(session_info);
878
879 /*
880 * Evaluation is performed in-line here since only one type of
881 * session-bound condition is handled for the moment.
882 */
883 switch (lttng_condition_get_type(condition)) {
884 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
885 if (!session_info->rotation.ongoing) {
886 ret = 0;
887 goto end_session_put;
888 }
889
890 *evaluation = lttng_evaluation_session_rotation_ongoing_create(
891 session_info->rotation.id);
892 if (!*evaluation) {
893 /* Fatal error. */
894 ERR("[notification-thread] Failed to create session rotation ongoing evaluation for session \"%s\"",
895 session_info->name);
896 ret = -1;
897 goto end_session_put;
898 }
899 ret = 0;
900 break;
901 default:
902 ret = 0;
903 goto end_session_put;
904 }
905
906 *session_uid = session_info->uid;
907 *session_gid = session_info->gid;
908
909 end_session_put:
910 session_info_put(session_info);
911 end:
912 return ret;
913 }
914
915 /* This function must be called with the RCU read lock held. */
916 static
917 int evaluate_condition_for_client(const struct lttng_trigger *trigger,
918 const struct lttng_condition *condition,
919 struct notification_client *client,
920 struct notification_thread_state *state)
921 {
922 int ret;
923 struct lttng_evaluation *evaluation = NULL;
924 struct notification_client_list client_list = { 0 };
925 struct notification_client_list_element client_list_element = { 0 };
926 uid_t object_uid = 0;
927 gid_t object_gid = 0;
928
929 assert(trigger);
930 assert(condition);
931 assert(client);
932 assert(state);
933
934 switch (get_condition_binding_object(condition)) {
935 case LTTNG_OBJECT_TYPE_SESSION:
936 ret = evaluate_session_condition_for_client(condition, state,
937 &evaluation, &object_uid, &object_gid);
938 break;
939 case LTTNG_OBJECT_TYPE_CHANNEL:
940 ret = evaluate_channel_condition_for_client(condition, state,
941 &evaluation, &object_uid, &object_gid);
942 break;
943 case LTTNG_OBJECT_TYPE_NONE:
944 ret = 0;
945 goto end;
946 case LTTNG_OBJECT_TYPE_UNKNOWN:
947 default:
948 ret = -1;
949 goto end;
950 }
951 if (ret) {
952 /* Fatal error. */
953 goto end;
954 }
955 if (!evaluation) {
956 /* Evaluation yielded nothing. Normal exit. */
957 DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
958 ret = 0;
959 goto end;
960 }
961
962 /*
963 * Create a temporary client list with the client currently
964 * subscribing.
965 */
966 cds_lfht_node_init(&client_list.notification_trigger_ht_node);
967 CDS_INIT_LIST_HEAD(&client_list.list);
968 client_list.trigger = trigger;
969
970 CDS_INIT_LIST_HEAD(&client_list_element.node);
971 client_list_element.client = client;
972 cds_list_add(&client_list_element.node, &client_list.list);
973
974 /* Send evaluation result to the newly-subscribed client. */
975 DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
976 ret = send_evaluation_to_clients(trigger, evaluation, &client_list,
977 state, object_uid, object_gid);
978
979 end:
980 return ret;
981 }
982
983 static
984 int notification_thread_client_subscribe(struct notification_client *client,
985 struct lttng_condition *condition,
986 struct notification_thread_state *state,
987 enum lttng_notification_channel_status *_status)
988 {
989 int ret = 0;
990 struct notification_client_list *client_list;
991 struct lttng_condition_list_element *condition_list_element = NULL;
992 struct notification_client_list_element *client_list_element = NULL;
993 enum lttng_notification_channel_status status =
994 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
995
996 /*
997 * Ensure that the client has not already subscribed to this condition
998 * before.
999 */
1000 cds_list_for_each_entry(condition_list_element, &client->condition_list, node) {
1001 if (lttng_condition_is_equal(condition_list_element->condition,
1002 condition)) {
1003 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED;
1004 goto end;
1005 }
1006 }
1007
1008 condition_list_element = zmalloc(sizeof(*condition_list_element));
1009 if (!condition_list_element) {
1010 ret = -1;
1011 goto error;
1012 }
1013 client_list_element = zmalloc(sizeof(*client_list_element));
1014 if (!client_list_element) {
1015 ret = -1;
1016 goto error;
1017 }
1018
1019 rcu_read_lock();
1020
1021 /*
1022 * Add the newly-subscribed condition to the client's subscription list.
1023 */
1024 CDS_INIT_LIST_HEAD(&condition_list_element->node);
1025 condition_list_element->condition = condition;
1026 cds_list_add(&condition_list_element->node, &client->condition_list);
1027
1028 client_list = get_client_list_from_condition(state, condition);
1029 if (!client_list) {
1030 /*
1031 * No notification-emiting trigger registered with this
1032 * condition. We don't evaluate the condition right away
1033 * since this trigger is not registered yet.
1034 */
1035 free(client_list_element);
1036 goto end_unlock;
1037 }
1038
1039 /*
1040 * The condition to which the client just subscribed is evaluated
1041 * at this point so that conditions that are already TRUE result
1042 * in a notification being sent out.
1043 */
1044 if (evaluate_condition_for_client(client_list->trigger, condition,
1045 client, state)) {
1046 WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
1047 ret = -1;
1048 free(client_list_element);
1049 goto end_unlock;
1050 }
1051
1052 /*
1053 * Add the client to the list of clients interested in a given trigger
1054 * if a "notification" trigger with a corresponding condition was
1055 * added prior.
1056 */
1057 client_list_element->client = client;
1058 CDS_INIT_LIST_HEAD(&client_list_element->node);
1059 cds_list_add(&client_list_element->node, &client_list->list);
1060 end_unlock:
1061 rcu_read_unlock();
1062 end:
1063 if (_status) {
1064 *_status = status;
1065 }
1066 return ret;
1067 error:
1068 free(condition_list_element);
1069 free(client_list_element);
1070 return ret;
1071 }
1072
1073 static
1074 int notification_thread_client_unsubscribe(
1075 struct notification_client *client,
1076 struct lttng_condition *condition,
1077 struct notification_thread_state *state,
1078 enum lttng_notification_channel_status *_status)
1079 {
1080 struct notification_client_list *client_list;
1081 struct lttng_condition_list_element *condition_list_element,
1082 *condition_tmp;
1083 struct notification_client_list_element *client_list_element,
1084 *client_tmp;
1085 bool condition_found = false;
1086 enum lttng_notification_channel_status status =
1087 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1088
1089 /* Remove the condition from the client's condition list. */
1090 cds_list_for_each_entry_safe(condition_list_element, condition_tmp,
1091 &client->condition_list, node) {
1092 if (!lttng_condition_is_equal(condition_list_element->condition,
1093 condition)) {
1094 continue;
1095 }
1096
1097 cds_list_del(&condition_list_element->node);
1098 /*
1099 * The caller may be iterating on the client's conditions to
1100 * tear down a client's connection. In this case, the condition
1101 * will be destroyed at the end.
1102 */
1103 if (condition != condition_list_element->condition) {
1104 lttng_condition_destroy(
1105 condition_list_element->condition);
1106 }
1107 free(condition_list_element);
1108 condition_found = true;
1109 break;
1110 }
1111
1112 if (!condition_found) {
1113 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION;
1114 goto end;
1115 }
1116
1117 /*
1118 * Remove the client from the list of clients interested the trigger
1119 * matching the condition.
1120 */
1121 rcu_read_lock();
1122 client_list = get_client_list_from_condition(state, condition);
1123 if (!client_list) {
1124 goto end_unlock;
1125 }
1126
1127 cds_list_for_each_entry_safe(client_list_element, client_tmp,
1128 &client_list->list, node) {
1129 if (client_list_element->client->socket != client->socket) {
1130 continue;
1131 }
1132 cds_list_del(&client_list_element->node);
1133 free(client_list_element);
1134 break;
1135 }
1136 end_unlock:
1137 rcu_read_unlock();
1138 end:
1139 lttng_condition_destroy(condition);
1140 if (_status) {
1141 *_status = status;
1142 }
1143 return 0;
1144 }
1145
1146 static
1147 void free_notification_client_rcu(struct rcu_head *node)
1148 {
1149 free(caa_container_of(node, struct notification_client, rcu_node));
1150 }
1151
1152 static
1153 void notification_client_destroy(struct notification_client *client,
1154 struct notification_thread_state *state)
1155 {
1156 struct lttng_condition_list_element *condition_list_element, *tmp;
1157
1158 if (!client) {
1159 return;
1160 }
1161
1162 /* Release all conditions to which the client was subscribed. */
1163 cds_list_for_each_entry_safe(condition_list_element, tmp,
1164 &client->condition_list, node) {
1165 (void) notification_thread_client_unsubscribe(client,
1166 condition_list_element->condition, state, NULL);
1167 }
1168
1169 if (client->socket >= 0) {
1170 (void) lttcomm_close_unix_sock(client->socket);
1171 client->socket = -1;
1172 }
1173 lttng_dynamic_buffer_reset(&client->communication.inbound.buffer);
1174 lttng_dynamic_buffer_reset(&client->communication.outbound.buffer);
1175 call_rcu(&client->rcu_node, free_notification_client_rcu);
1176 }
1177
1178 /*
1179 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1180 * client pointer).
1181 */
1182 static
1183 struct notification_client *get_client_from_socket(int socket,
1184 struct notification_thread_state *state)
1185 {
1186 struct cds_lfht_iter iter;
1187 struct cds_lfht_node *node;
1188 struct notification_client *client = NULL;
1189
1190 cds_lfht_lookup(state->client_socket_ht,
1191 hash_client_socket(socket),
1192 match_client_socket,
1193 (void *) (unsigned long) socket,
1194 &iter);
1195 node = cds_lfht_iter_get_node(&iter);
1196 if (!node) {
1197 goto end;
1198 }
1199
1200 client = caa_container_of(node, struct notification_client,
1201 client_socket_ht_node);
1202 end:
1203 return client;
1204 }
1205
1206 /*
1207 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1208 * client pointer).
1209 */
1210 static
1211 struct notification_client *get_client_from_id(notification_client_id id,
1212 struct notification_thread_state *state)
1213 {
1214 struct cds_lfht_iter iter;
1215 struct cds_lfht_node *node;
1216 struct notification_client *client = NULL;
1217
1218 cds_lfht_lookup(state->client_id_ht,
1219 hash_client_id(id),
1220 match_client_id,
1221 &id,
1222 &iter);
1223 node = cds_lfht_iter_get_node(&iter);
1224 if (!node) {
1225 goto end;
1226 }
1227
1228 client = caa_container_of(node, struct notification_client,
1229 client_id_ht_node);
1230 end:
1231 return client;
1232 }
1233
1234 static
1235 bool buffer_usage_condition_applies_to_channel(
1236 const struct lttng_condition *condition,
1237 const struct channel_info *channel_info)
1238 {
1239 enum lttng_condition_status status;
1240 enum lttng_domain_type condition_domain;
1241 const char *condition_session_name = NULL;
1242 const char *condition_channel_name = NULL;
1243
1244 status = lttng_condition_buffer_usage_get_domain_type(condition,
1245 &condition_domain);
1246 assert(status == LTTNG_CONDITION_STATUS_OK);
1247 if (channel_info->key.domain != condition_domain) {
1248 goto fail;
1249 }
1250
1251 status = lttng_condition_buffer_usage_get_session_name(
1252 condition, &condition_session_name);
1253 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1254
1255 status = lttng_condition_buffer_usage_get_channel_name(
1256 condition, &condition_channel_name);
1257 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_channel_name);
1258
1259 if (strcmp(channel_info->session_info->name, condition_session_name)) {
1260 goto fail;
1261 }
1262 if (strcmp(channel_info->name, condition_channel_name)) {
1263 goto fail;
1264 }
1265
1266 return true;
1267 fail:
1268 return false;
1269 }
1270
1271 static
1272 bool session_consumed_size_condition_applies_to_channel(
1273 const struct lttng_condition *condition,
1274 const struct channel_info *channel_info)
1275 {
1276 enum lttng_condition_status status;
1277 const char *condition_session_name = NULL;
1278
1279 status = lttng_condition_session_consumed_size_get_session_name(
1280 condition, &condition_session_name);
1281 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1282
1283 if (strcmp(channel_info->session_info->name, condition_session_name)) {
1284 goto fail;
1285 }
1286
1287 return true;
1288 fail:
1289 return false;
1290 }
1291
1292 static
1293 bool trigger_applies_to_channel(const struct lttng_trigger *trigger,
1294 const struct channel_info *channel_info)
1295 {
1296 const struct lttng_condition *condition;
1297 bool trigger_applies;
1298
1299 condition = lttng_trigger_get_const_condition(trigger);
1300 if (!condition) {
1301 goto fail;
1302 }
1303
1304 switch (lttng_condition_get_type(condition)) {
1305 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1306 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1307 trigger_applies = buffer_usage_condition_applies_to_channel(
1308 condition, channel_info);
1309 break;
1310 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
1311 trigger_applies = session_consumed_size_condition_applies_to_channel(
1312 condition, channel_info);
1313 break;
1314 default:
1315 goto fail;
1316 }
1317
1318 return trigger_applies;
1319 fail:
1320 return false;
1321 }
1322
1323 static
1324 bool trigger_applies_to_client(struct lttng_trigger *trigger,
1325 struct notification_client *client)
1326 {
1327 bool applies = false;
1328 struct lttng_condition_list_element *condition_list_element;
1329
1330 cds_list_for_each_entry(condition_list_element, &client->condition_list,
1331 node) {
1332 applies = lttng_condition_is_equal(
1333 condition_list_element->condition,
1334 lttng_trigger_get_condition(trigger));
1335 if (applies) {
1336 break;
1337 }
1338 }
1339 return applies;
1340 }
1341
1342 /* Must be called with RCU read lock held. */
1343 static
1344 struct lttng_session_trigger_list *get_session_trigger_list(
1345 struct notification_thread_state *state,
1346 const char *session_name)
1347 {
1348 struct lttng_session_trigger_list *list = NULL;
1349 struct cds_lfht_node *node;
1350 struct cds_lfht_iter iter;
1351
1352 cds_lfht_lookup(state->session_triggers_ht,
1353 hash_key_str(session_name, lttng_ht_seed),
1354 match_session_trigger_list,
1355 session_name,
1356 &iter);
1357 node = cds_lfht_iter_get_node(&iter);
1358 if (!node) {
1359 /*
1360 * Not an error, the list of triggers applying to that session
1361 * will be initialized when the session is created.
1362 */
1363 DBG("[notification-thread] No trigger list found for session \"%s\" as it is not yet known to the notification system",
1364 session_name);
1365 goto end;
1366 }
1367
1368 list = caa_container_of(node,
1369 struct lttng_session_trigger_list,
1370 session_triggers_ht_node);
1371 end:
1372 return list;
1373 }
1374
1375 /*
1376 * Allocate an empty lttng_session_trigger_list for the session named
1377 * 'session_name'.
1378 *
1379 * No ownership of 'session_name' is assumed by the session trigger list.
1380 * It is the caller's responsability to ensure the session name is alive
1381 * for as long as this list is.
1382 */
1383 static
1384 struct lttng_session_trigger_list *lttng_session_trigger_list_create(
1385 const char *session_name,
1386 struct cds_lfht *session_triggers_ht)
1387 {
1388 struct lttng_session_trigger_list *list;
1389
1390 list = zmalloc(sizeof(*list));
1391 if (!list) {
1392 goto end;
1393 }
1394 list->session_name = session_name;
1395 CDS_INIT_LIST_HEAD(&list->list);
1396 cds_lfht_node_init(&list->session_triggers_ht_node);
1397 list->session_triggers_ht = session_triggers_ht;
1398
1399 rcu_read_lock();
1400 /* Publish the list through the session_triggers_ht. */
1401 cds_lfht_add(session_triggers_ht,
1402 hash_key_str(session_name, lttng_ht_seed),
1403 &list->session_triggers_ht_node);
1404 rcu_read_unlock();
1405 end:
1406 return list;
1407 }
1408
1409 static
1410 void free_session_trigger_list_rcu(struct rcu_head *node)
1411 {
1412 free(caa_container_of(node, struct lttng_session_trigger_list,
1413 rcu_node));
1414 }
1415
1416 static
1417 void lttng_session_trigger_list_destroy(struct lttng_session_trigger_list *list)
1418 {
1419 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1420
1421 /* Empty the list element by element, and then free the list itself. */
1422 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1423 &list->list, node) {
1424 cds_list_del(&trigger_list_element->node);
1425 free(trigger_list_element);
1426 }
1427 rcu_read_lock();
1428 /* Unpublish the list from the session_triggers_ht. */
1429 cds_lfht_del(list->session_triggers_ht,
1430 &list->session_triggers_ht_node);
1431 rcu_read_unlock();
1432 call_rcu(&list->rcu_node, free_session_trigger_list_rcu);
1433 }
1434
1435 static
1436 int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
1437 const struct lttng_trigger *trigger)
1438 {
1439 int ret = 0;
1440 struct lttng_trigger_list_element *new_element =
1441 zmalloc(sizeof(*new_element));
1442
1443 if (!new_element) {
1444 ret = -1;
1445 goto end;
1446 }
1447 CDS_INIT_LIST_HEAD(&new_element->node);
1448 new_element->trigger = trigger;
1449 cds_list_add(&new_element->node, &list->list);
1450 end:
1451 return ret;
1452 }
1453
1454 static
1455 bool trigger_applies_to_session(const struct lttng_trigger *trigger,
1456 const char *session_name)
1457 {
1458 bool applies = false;
1459 const struct lttng_condition *condition;
1460
1461 condition = lttng_trigger_get_const_condition(trigger);
1462 switch (lttng_condition_get_type(condition)) {
1463 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1464 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
1465 {
1466 enum lttng_condition_status condition_status;
1467 const char *condition_session_name;
1468
1469 condition_status = lttng_condition_session_rotation_get_session_name(
1470 condition, &condition_session_name);
1471 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
1472 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
1473 goto end;
1474 }
1475
1476 assert(condition_session_name);
1477 applies = !strcmp(condition_session_name, session_name);
1478 break;
1479 }
1480 default:
1481 goto end;
1482 }
1483 end:
1484 return applies;
1485 }
1486
1487 /*
1488 * Allocate and initialize an lttng_session_trigger_list which contains
1489 * all triggers that apply to the session named 'session_name'.
1490 *
1491 * No ownership of 'session_name' is assumed by the session trigger list.
1492 * It is the caller's responsability to ensure the session name is alive
1493 * for as long as this list is.
1494 */
1495 static
1496 struct lttng_session_trigger_list *lttng_session_trigger_list_build(
1497 const struct notification_thread_state *state,
1498 const char *session_name)
1499 {
1500 int trigger_count = 0;
1501 struct lttng_session_trigger_list *session_trigger_list = NULL;
1502 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1503 struct cds_lfht_iter iter;
1504
1505 session_trigger_list = lttng_session_trigger_list_create(session_name,
1506 state->session_triggers_ht);
1507
1508 /* Add all triggers applying to the session named 'session_name'. */
1509 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1510 node) {
1511 int ret;
1512
1513 if (!trigger_applies_to_session(trigger_ht_element->trigger,
1514 session_name)) {
1515 continue;
1516 }
1517
1518 ret = lttng_session_trigger_list_add(session_trigger_list,
1519 trigger_ht_element->trigger);
1520 if (ret) {
1521 goto error;
1522 }
1523
1524 trigger_count++;
1525 }
1526
1527 DBG("[notification-thread] Found %i triggers that apply to newly created session",
1528 trigger_count);
1529 return session_trigger_list;
1530 error:
1531 lttng_session_trigger_list_destroy(session_trigger_list);
1532 return NULL;
1533 }
1534
1535 static
1536 struct session_info *find_or_create_session_info(
1537 struct notification_thread_state *state,
1538 const char *name, uid_t uid, gid_t gid)
1539 {
1540 struct session_info *session = NULL;
1541 struct cds_lfht_node *node;
1542 struct cds_lfht_iter iter;
1543 struct lttng_session_trigger_list *trigger_list;
1544
1545 rcu_read_lock();
1546 cds_lfht_lookup(state->sessions_ht,
1547 hash_key_str(name, lttng_ht_seed),
1548 match_session,
1549 name,
1550 &iter);
1551 node = cds_lfht_iter_get_node(&iter);
1552 if (node) {
1553 DBG("[notification-thread] Found session info of session \"%s\" (uid = %i, gid = %i)",
1554 name, uid, gid);
1555 session = caa_container_of(node, struct session_info,
1556 sessions_ht_node);
1557 assert(session->uid == uid);
1558 assert(session->gid == gid);
1559 session_info_get(session);
1560 goto end;
1561 }
1562
1563 trigger_list = lttng_session_trigger_list_build(state, name);
1564 if (!trigger_list) {
1565 goto error;
1566 }
1567
1568 session = session_info_create(name, uid, gid, trigger_list,
1569 state->sessions_ht);
1570 if (!session) {
1571 ERR("[notification-thread] Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)",
1572 name, uid, gid);
1573 lttng_session_trigger_list_destroy(trigger_list);
1574 goto error;
1575 }
1576 trigger_list = NULL;
1577
1578 cds_lfht_add(state->sessions_ht, hash_key_str(name, lttng_ht_seed),
1579 &session->sessions_ht_node);
1580 end:
1581 rcu_read_unlock();
1582 return session;
1583 error:
1584 rcu_read_unlock();
1585 session_info_put(session);
1586 return NULL;
1587 }
1588
1589 static
1590 int handle_notification_thread_command_add_channel(
1591 struct notification_thread_state *state,
1592 const char *session_name, uid_t session_uid, gid_t session_gid,
1593 const char *channel_name, enum lttng_domain_type channel_domain,
1594 uint64_t channel_key_int, uint64_t channel_capacity,
1595 enum lttng_error_code *cmd_result)
1596 {
1597 struct cds_list_head trigger_list;
1598 struct channel_info *new_channel_info = NULL;
1599 struct channel_key channel_key = {
1600 .key = channel_key_int,
1601 .domain = channel_domain,
1602 };
1603 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
1604 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1605 int trigger_count = 0;
1606 struct cds_lfht_iter iter;
1607 struct session_info *session_info = NULL;
1608
1609 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
1610 channel_name, session_name, channel_key_int,
1611 channel_domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
1612
1613 CDS_INIT_LIST_HEAD(&trigger_list);
1614
1615 session_info = find_or_create_session_info(state, session_name,
1616 session_uid, session_gid);
1617 if (!session_info) {
1618 /* Allocation error or an internal error occurred. */
1619 goto error;
1620 }
1621
1622 new_channel_info = channel_info_create(channel_name, &channel_key,
1623 channel_capacity, session_info);
1624 if (!new_channel_info) {
1625 goto error;
1626 }
1627
1628 rcu_read_lock();
1629 /* Build a list of all triggers applying to the new channel. */
1630 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1631 node) {
1632 struct lttng_trigger_list_element *new_element;
1633
1634 if (!trigger_applies_to_channel(trigger_ht_element->trigger,
1635 new_channel_info)) {
1636 continue;
1637 }
1638
1639 new_element = zmalloc(sizeof(*new_element));
1640 if (!new_element) {
1641 rcu_read_unlock();
1642 goto error;
1643 }
1644 CDS_INIT_LIST_HEAD(&new_element->node);
1645 new_element->trigger = trigger_ht_element->trigger;
1646 cds_list_add(&new_element->node, &trigger_list);
1647 trigger_count++;
1648 }
1649 rcu_read_unlock();
1650
1651 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
1652 trigger_count);
1653 channel_trigger_list = zmalloc(sizeof(*channel_trigger_list));
1654 if (!channel_trigger_list) {
1655 goto error;
1656 }
1657 channel_trigger_list->channel_key = new_channel_info->key;
1658 CDS_INIT_LIST_HEAD(&channel_trigger_list->list);
1659 cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
1660 cds_list_splice(&trigger_list, &channel_trigger_list->list);
1661
1662 rcu_read_lock();
1663 /* Add channel to the channel_ht which owns the channel_infos. */
1664 cds_lfht_add(state->channels_ht,
1665 hash_channel_key(&new_channel_info->key),
1666 &new_channel_info->channels_ht_node);
1667 /*
1668 * Add the list of triggers associated with this channel to the
1669 * channel_triggers_ht.
1670 */
1671 cds_lfht_add(state->channel_triggers_ht,
1672 hash_channel_key(&new_channel_info->key),
1673 &channel_trigger_list->channel_triggers_ht_node);
1674 rcu_read_unlock();
1675 session_info_put(session_info);
1676 *cmd_result = LTTNG_OK;
1677 return 0;
1678 error:
1679 channel_info_destroy(new_channel_info);
1680 session_info_put(session_info);
1681 return 1;
1682 }
1683
1684 static
1685 void free_channel_trigger_list_rcu(struct rcu_head *node)
1686 {
1687 free(caa_container_of(node, struct lttng_channel_trigger_list,
1688 rcu_node));
1689 }
1690
1691 static
1692 void free_channel_state_sample_rcu(struct rcu_head *node)
1693 {
1694 free(caa_container_of(node, struct channel_state_sample,
1695 rcu_node));
1696 }
1697
/*
 * Handle a "remove channel" command.
 *
 * Removes, for the channel identified by ('channel_key', 'domain'):
 *   - its trigger list from the channel_triggers_ht,
 *   - its sampled state, if any, from the channel_state_ht,
 *   - its channel_info from the channels_ht.
 *
 * Always sets '*cmd_result' to LTTNG_OK and returns 0, including when the
 * channel has no trigger list (an error is logged in that case and nothing
 * else is removed).
 */
static
int handle_notification_thread_command_remove_channel(
		struct notification_thread_state *state,
		uint64_t channel_key, enum lttng_domain_type domain,
		enum lttng_error_code *cmd_result)
{
	struct cds_lfht_node *node;
	struct cds_lfht_iter iter;
	struct lttng_channel_trigger_list *trigger_list;
	struct lttng_trigger_list_element *trigger_list_element, *tmp;
	struct channel_key key = { .key = channel_key, .domain = domain };
	struct channel_info *channel_info;

	DBG("[notification-thread] Removing channel key = %" PRIu64 " in %s domain",
			channel_key, domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");

	/* Held across all three lookups and deletions below. */
	rcu_read_lock();

	cds_lfht_lookup(state->channel_triggers_ht,
			hash_channel_key(&key),
			match_channel_trigger_list,
			&key,
			&iter);
	node = cds_lfht_iter_get_node(&iter);
	/*
	 * There is a severe internal error if we are being asked to remove a
	 * channel that doesn't exist.
	 */
	if (!node) {
		ERR("[notification-thread] Channel being removed is unknown to the notification thread");
		goto end;
	}

	/* Free the list of triggers associated with this channel. */
	trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
			channel_triggers_ht_node);
	/* Only the list elements are freed, not the triggers they reference. */
	cds_list_for_each_entry_safe(trigger_list_element, tmp,
			&trigger_list->list, node) {
		cds_list_del(&trigger_list_element->node);
		free(trigger_list_element);
	}
	/* Unpublish the list first; the list itself is freed via call_rcu(). */
	cds_lfht_del(state->channel_triggers_ht, node);
	call_rcu(&trigger_list->rcu_node, free_channel_trigger_list_rcu);

	/* Free sampled channel state. */
	cds_lfht_lookup(state->channel_state_ht,
			hash_channel_key(&key),
			match_channel_state_sample,
			&key,
			&iter);
	node = cds_lfht_iter_get_node(&iter);
	/*
	 * This is expected to be NULL if the channel is destroyed before we
	 * received a sample.
	 */
	if (node) {
		struct channel_state_sample *sample = caa_container_of(node,
				struct channel_state_sample,
				channel_state_ht_node);

		cds_lfht_del(state->channel_state_ht, node);
		call_rcu(&sample->rcu_node, free_channel_state_sample_rcu);
	}

	/* Remove the channel from the channels_ht and free it. */
	cds_lfht_lookup(state->channels_ht,
			hash_channel_key(&key),
			match_channel_info,
			&key,
			&iter);
	node = cds_lfht_iter_get_node(&iter);
	/* A channel with a trigger list is expected to have a channel_info. */
	assert(node);
	channel_info = caa_container_of(node, struct channel_info,
			channels_ht_node);
	cds_lfht_del(state->channels_ht, node);
	channel_info_destroy(channel_info);
end:
	rcu_read_unlock();
	/* Reported as OK even when the channel was unknown. */
	*cmd_result = LTTNG_OK;
	return 0;
}
1779
1780 static
1781 int handle_notification_thread_command_session_rotation(
1782 struct notification_thread_state *state,
1783 enum notification_thread_command_type cmd_type,
1784 const char *session_name, uid_t session_uid, gid_t session_gid,
1785 uint64_t trace_archive_chunk_id,
1786 struct lttng_trace_archive_location *location,
1787 enum lttng_error_code *_cmd_result)
1788 {
1789 int ret = 0;
1790 enum lttng_error_code cmd_result = LTTNG_OK;
1791 struct lttng_session_trigger_list *trigger_list;
1792 struct lttng_trigger_list_element *trigger_list_element;
1793 struct session_info *session_info;
1794
1795 rcu_read_lock();
1796
1797 session_info = find_or_create_session_info(state, session_name,
1798 session_uid, session_gid);
1799 if (!session_info) {
1800 /* Allocation error or an internal error occurred. */
1801 ret = -1;
1802 cmd_result = LTTNG_ERR_NOMEM;
1803 goto end;
1804 }
1805
1806 session_info->rotation.ongoing =
1807 cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING;
1808 session_info->rotation.id = trace_archive_chunk_id;
1809 trigger_list = get_session_trigger_list(state, session_name);
1810 if (!trigger_list) {
1811 DBG("[notification-thread] No triggers applying to session \"%s\" found",
1812 session_name);
1813 goto end;
1814 }
1815
1816 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
1817 node) {
1818 const struct lttng_condition *condition;
1819 const struct lttng_action *action;
1820 const struct lttng_trigger *trigger;
1821 struct notification_client_list *client_list;
1822 struct lttng_evaluation *evaluation = NULL;
1823 enum lttng_condition_type condition_type;
1824
1825 trigger = trigger_list_element->trigger;
1826 condition = lttng_trigger_get_const_condition(trigger);
1827 assert(condition);
1828 condition_type = lttng_condition_get_type(condition);
1829
1830 if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING &&
1831 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1832 continue;
1833 } else if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED &&
1834 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED) {
1835 continue;
1836 }
1837
1838 action = lttng_trigger_get_const_action(trigger);
1839
1840 /* Notify actions are the only type currently supported. */
1841 assert(lttng_action_get_type_const(action) ==
1842 LTTNG_ACTION_TYPE_NOTIFY);
1843
1844 client_list = get_client_list_from_condition(state, condition);
1845 assert(client_list);
1846
1847 if (cds_list_empty(&client_list->list)) {
1848 /*
1849 * No clients interested in the evaluation's result,
1850 * skip it.
1851 */
1852 continue;
1853 }
1854
1855 if (cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1856 evaluation = lttng_evaluation_session_rotation_ongoing_create(
1857 trace_archive_chunk_id);
1858 } else {
1859 evaluation = lttng_evaluation_session_rotation_completed_create(
1860 trace_archive_chunk_id, location);
1861 }
1862
1863 if (!evaluation) {
1864 /* Internal error */
1865 ret = -1;
1866 cmd_result = LTTNG_ERR_UNK;
1867 goto end;
1868 }
1869
1870 /* Dispatch evaluation result to all clients. */
1871 ret = send_evaluation_to_clients(trigger_list_element->trigger,
1872 evaluation, client_list, state,
1873 session_info->uid,
1874 session_info->gid);
1875 lttng_evaluation_destroy(evaluation);
1876 if (caa_unlikely(ret)) {
1877 goto end;
1878 }
1879 }
1880 end:
1881 session_info_put(session_info);
1882 *_cmd_result = cmd_result;
1883 rcu_read_unlock();
1884 return ret;
1885 }
1886
1887 static
1888 int condition_is_supported(struct lttng_condition *condition)
1889 {
1890 int ret;
1891
1892 switch (lttng_condition_get_type(condition)) {
1893 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1894 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1895 {
1896 enum lttng_domain_type domain;
1897
1898 ret = lttng_condition_buffer_usage_get_domain_type(condition,
1899 &domain);
1900 if (ret) {
1901 ret = -1;
1902 goto end;
1903 }
1904
1905 if (domain != LTTNG_DOMAIN_KERNEL) {
1906 ret = 1;
1907 goto end;
1908 }
1909
1910 /*
1911 * Older kernel tracers don't expose the API to monitor their
1912 * buffers. Therefore, we reject triggers that require that
1913 * mechanism to be available to be evaluated.
1914 */
1915 ret = kernel_supports_ring_buffer_snapshot_sample_positions();
1916 break;
1917 }
1918 default:
1919 ret = 1;
1920 }
1921 end:
1922 return ret;
1923 }
1924
1925 /* Must be called with RCU read lock held. */
1926 static
1927 int bind_trigger_to_matching_session(const struct lttng_trigger *trigger,
1928 struct notification_thread_state *state)
1929 {
1930 int ret = 0;
1931 const struct lttng_condition *condition;
1932 const char *session_name;
1933 struct lttng_session_trigger_list *trigger_list;
1934
1935 condition = lttng_trigger_get_const_condition(trigger);
1936 switch (lttng_condition_get_type(condition)) {
1937 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1938 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
1939 {
1940 enum lttng_condition_status status;
1941
1942 status = lttng_condition_session_rotation_get_session_name(
1943 condition, &session_name);
1944 if (status != LTTNG_CONDITION_STATUS_OK) {
1945 ERR("[notification-thread] Failed to bind trigger to session: unable to get 'session_rotation' condition's session name");
1946 ret = -1;
1947 goto end;
1948 }
1949 break;
1950 }
1951 default:
1952 ret = -1;
1953 goto end;
1954 }
1955
1956 trigger_list = get_session_trigger_list(state, session_name);
1957 if (!trigger_list) {
1958 DBG("[notification-thread] Unable to bind trigger applying to session \"%s\" as it is not yet known to the notification system",
1959 session_name);
1960 goto end;
1961
1962 }
1963
1964 DBG("[notification-thread] Newly registered trigger bound to session \"%s\"",
1965 session_name);
1966 ret = lttng_session_trigger_list_add(trigger_list, trigger);
1967 end:
1968 return ret;
1969 }
1970
1971 /* Must be called with RCU read lock held. */
1972 static
1973 int bind_trigger_to_matching_channels(const struct lttng_trigger *trigger,
1974 struct notification_thread_state *state)
1975 {
1976 int ret = 0;
1977 struct cds_lfht_node *node;
1978 struct cds_lfht_iter iter;
1979 struct channel_info *channel;
1980
1981 cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
1982 channels_ht_node) {
1983 struct lttng_trigger_list_element *trigger_list_element;
1984 struct lttng_channel_trigger_list *trigger_list;
1985 struct cds_lfht_iter lookup_iter;
1986
1987 if (!trigger_applies_to_channel(trigger, channel)) {
1988 continue;
1989 }
1990
1991 cds_lfht_lookup(state->channel_triggers_ht,
1992 hash_channel_key(&channel->key),
1993 match_channel_trigger_list,
1994 &channel->key,
1995 &lookup_iter);
1996 node = cds_lfht_iter_get_node(&lookup_iter);
1997 assert(node);
1998 trigger_list = caa_container_of(node,
1999 struct lttng_channel_trigger_list,
2000 channel_triggers_ht_node);
2001
2002 trigger_list_element = zmalloc(sizeof(*trigger_list_element));
2003 if (!trigger_list_element) {
2004 ret = -1;
2005 goto end;
2006 }
2007 CDS_INIT_LIST_HEAD(&trigger_list_element->node);
2008 trigger_list_element->trigger = trigger;
2009 cds_list_add(&trigger_list_element->node, &trigger_list->list);
2010 DBG("[notification-thread] Newly registered trigger bound to channel \"%s\"",
2011 channel->name);
2012 }
2013 end:
2014 return ret;
2015 }
2016
2017 /*
2018 * FIXME A client's credentials are not checked when registering a trigger, nor
2019 * are they stored alongside with the trigger.
2020 *
2021 * The effects of this are benign since:
2022 * - The client will succeed in registering the trigger, as it is valid,
2023 * - The trigger will, internally, be bound to the channel/session,
2024 * - The notifications will not be sent since the client's credentials
2025 * are checked against the channel at that moment.
2026 *
2027 * If this function returns a non-zero value, it means something is
2028 * fundamentally broken and the whole subsystem/thread will be torn down.
2029 *
2030 * If a non-fatal error occurs, just set the cmd_result to the appropriate
2031 * error code.
2032 */
2033 static
2034 int handle_notification_thread_command_register_trigger(
2035 struct notification_thread_state *state,
2036 struct lttng_trigger *trigger,
2037 enum lttng_error_code *cmd_result)
2038 {
2039 int ret = 0;
2040 struct lttng_condition *condition;
2041 struct notification_client *client;
2042 struct notification_client_list *client_list = NULL;
2043 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
2044 struct notification_client_list_element *client_list_element, *tmp;
2045 struct cds_lfht_node *node;
2046 struct cds_lfht_iter iter;
2047 bool free_trigger = true;
2048
2049 rcu_read_lock();
2050
2051 condition = lttng_trigger_get_condition(trigger);
2052 assert(condition);
2053
2054 ret = condition_is_supported(condition);
2055 if (ret < 0) {
2056 goto error;
2057 } else if (ret == 0) {
2058 *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
2059 goto error;
2060 } else {
2061 /* Feature is supported, continue. */
2062 ret = 0;
2063 }
2064
2065 trigger_ht_element = zmalloc(sizeof(*trigger_ht_element));
2066 if (!trigger_ht_element) {
2067 ret = -1;
2068 goto error;
2069 }
2070
2071 /* Add trigger to the trigger_ht. */
2072 cds_lfht_node_init(&trigger_ht_element->node);
2073 trigger_ht_element->trigger = trigger;
2074
2075 node = cds_lfht_add_unique(state->triggers_ht,
2076 lttng_condition_hash(condition),
2077 match_condition,
2078 condition,
2079 &trigger_ht_element->node);
2080 if (node != &trigger_ht_element->node) {
2081 /* Not a fatal error, simply report it to the client. */
2082 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2083 goto error_free_ht_element;
2084 }
2085
2086 /*
2087 * Ownership of the trigger and of its wrapper was transfered to
2088 * the triggers_ht.
2089 */
2090 trigger_ht_element = NULL;
2091 free_trigger = false;
2092
2093 /*
2094 * The rest only applies to triggers that have a "notify" action.
2095 * It is not skipped as this is the only action type currently
2096 * supported.
2097 */
2098 client_list = zmalloc(sizeof(*client_list));
2099 if (!client_list) {
2100 ret = -1;
2101 goto error_free_ht_element;
2102 }
2103 cds_lfht_node_init(&client_list->notification_trigger_ht_node);
2104 CDS_INIT_LIST_HEAD(&client_list->list);
2105 client_list->trigger = trigger;
2106
2107 /* Build a list of clients to which this new trigger applies. */
2108 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
2109 client_socket_ht_node) {
2110 if (!trigger_applies_to_client(trigger, client)) {
2111 continue;
2112 }
2113
2114 client_list_element = zmalloc(sizeof(*client_list_element));
2115 if (!client_list_element) {
2116 ret = -1;
2117 goto error_free_client_list;
2118 }
2119 CDS_INIT_LIST_HEAD(&client_list_element->node);
2120 client_list_element->client = client;
2121 cds_list_add(&client_list_element->node, &client_list->list);
2122 }
2123
2124 cds_lfht_add(state->notification_trigger_clients_ht,
2125 lttng_condition_hash(condition),
2126 &client_list->notification_trigger_ht_node);
2127
2128 switch (get_condition_binding_object(condition)) {
2129 case LTTNG_OBJECT_TYPE_SESSION:
2130 /* Add the trigger to the list if it matches a known session. */
2131 ret = bind_trigger_to_matching_session(trigger, state);
2132 if (ret) {
2133 goto error_free_client_list;
2134 }
2135 break;
2136 case LTTNG_OBJECT_TYPE_CHANNEL:
2137 /*
2138 * Add the trigger to list of triggers bound to the channels
2139 * currently known.
2140 */
2141 ret = bind_trigger_to_matching_channels(trigger, state);
2142 if (ret) {
2143 goto error_free_client_list;
2144 }
2145 break;
2146 case LTTNG_OBJECT_TYPE_NONE:
2147 break;
2148 default:
2149 ERR("[notification-thread] Unknown object type on which to bind a newly registered trigger was encountered");
2150 ret = -1;
2151 goto error_free_client_list;
2152 }
2153
2154 /*
2155 * Since there is nothing preventing clients from subscribing to a
2156 * condition before the corresponding trigger is registered, we have
2157 * to evaluate this new condition right away.
2158 *
2159 * At some point, we were waiting for the next "evaluation" (e.g. on
2160 * reception of a channel sample) to evaluate this new condition, but
2161 * that was broken.
2162 *
2163 * The reason it was broken is that waiting for the next sample
2164 * does not allow us to properly handle transitions for edge-triggered
2165 * conditions.
2166 *
2167 * Consider this example: when we handle a new channel sample, we
2168 * evaluate each conditions twice: once with the previous state, and
2169 * again with the newest state. We then use those two results to
2170 * determine whether a state change happened: a condition was false and
2171 * became true. If a state change happened, we have to notify clients.
2172 *
2173 * Now, if a client subscribes to a given notification and registers
2174 * a trigger *after* that subscription, we have to make sure the
2175 * condition is evaluated at this point while considering only the
2176 * current state. Otherwise, the next evaluation cycle may only see
2177 * that the evaluations remain the same (true for samples n-1 and n) and
2178 * the client will never know that the condition has been met.
2179 */
2180 cds_list_for_each_entry_safe(client_list_element, tmp,
2181 &client_list->list, node) {
2182 ret = evaluate_condition_for_client(trigger, condition,
2183 client_list_element->client, state);
2184 if (ret) {
2185 goto error_free_client_list;
2186 }
2187 }
2188
2189 /*
2190 * Client list ownership transferred to the
2191 * notification_trigger_clients_ht.
2192 */
2193 client_list = NULL;
2194
2195 *cmd_result = LTTNG_OK;
2196 error_free_client_list:
2197 if (client_list) {
2198 cds_list_for_each_entry_safe(client_list_element, tmp,
2199 &client_list->list, node) {
2200 free(client_list_element);
2201 }
2202 free(client_list);
2203 }
2204 error_free_ht_element:
2205 free(trigger_ht_element);
2206 error:
2207 if (free_trigger) {
2208 lttng_trigger_destroy(trigger);
2209 }
2210 rcu_read_unlock();
2211 return ret;
2212 }
2213
2214 static
2215 void free_notification_client_list_rcu(struct rcu_head *node)
2216 {
2217 free(caa_container_of(node, struct notification_client_list,
2218 rcu_node));
2219 }
2220
2221 static
2222 void free_lttng_trigger_ht_element_rcu(struct rcu_head *node)
2223 {
2224 free(caa_container_of(node, struct lttng_trigger_ht_element,
2225 rcu_node));
2226 }
2227
2228 static
2229 int handle_notification_thread_command_unregister_trigger(
2230 struct notification_thread_state *state,
2231 struct lttng_trigger *trigger,
2232 enum lttng_error_code *_cmd_reply)
2233 {
2234 struct cds_lfht_iter iter;
2235 struct cds_lfht_node *triggers_ht_node;
2236 struct lttng_channel_trigger_list *trigger_list;
2237 struct notification_client_list *client_list;
2238 struct notification_client_list_element *client_list_element, *tmp;
2239 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
2240 struct lttng_condition *condition = lttng_trigger_get_condition(
2241 trigger);
2242 enum lttng_error_code cmd_reply;
2243
2244 rcu_read_lock();
2245
2246 cds_lfht_lookup(state->triggers_ht,
2247 lttng_condition_hash(condition),
2248 match_condition,
2249 condition,
2250 &iter);
2251 triggers_ht_node = cds_lfht_iter_get_node(&iter);
2252 if (!triggers_ht_node) {
2253 cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND;
2254 goto end;
2255 } else {
2256 cmd_reply = LTTNG_OK;
2257 }
2258
2259 /* Remove trigger from channel_triggers_ht. */
2260 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
2261 channel_triggers_ht_node) {
2262 struct lttng_trigger_list_element *trigger_element, *tmp;
2263
2264 cds_list_for_each_entry_safe(trigger_element, tmp,
2265 &trigger_list->list, node) {
2266 const struct lttng_condition *current_condition =
2267 lttng_trigger_get_const_condition(
2268 trigger_element->trigger);
2269
2270 assert(current_condition);
2271 if (!lttng_condition_is_equal(condition,
2272 current_condition)) {
2273 continue;
2274 }
2275
2276 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
2277 cds_list_del(&trigger_element->node);
2278 /* A trigger can only appear once per channel */
2279 break;
2280 }
2281 }
2282
2283 /*
2284 * Remove and release the client list from
2285 * notification_trigger_clients_ht.
2286 */
2287 client_list = get_client_list_from_condition(state, condition);
2288 assert(client_list);
2289
2290 cds_list_for_each_entry_safe(client_list_element, tmp,
2291 &client_list->list, node) {
2292 free(client_list_element);
2293 }
2294 cds_lfht_del(state->notification_trigger_clients_ht,
2295 &client_list->notification_trigger_ht_node);
2296 call_rcu(&client_list->rcu_node, free_notification_client_list_rcu);
2297
2298 /* Remove trigger from triggers_ht. */
2299 trigger_ht_element = caa_container_of(triggers_ht_node,
2300 struct lttng_trigger_ht_element, node);
2301 cds_lfht_del(state->triggers_ht, triggers_ht_node);
2302
2303 /* Release the ownership of the trigger. */
2304 lttng_trigger_destroy(trigger_ht_element->trigger);
2305 call_rcu(&trigger_ht_element->rcu_node, free_lttng_trigger_ht_element_rcu);
2306 end:
2307 rcu_read_unlock();
2308 if (_cmd_reply) {
2309 *_cmd_reply = cmd_reply;
2310 }
2311 return 0;
2312 }
2313
2314 /* Returns 0 on success, 1 on exit requested, negative value on error. */
2315 int handle_notification_thread_command(
2316 struct notification_thread_handle *handle,
2317 struct notification_thread_state *state)
2318 {
2319 int ret;
2320 uint64_t counter;
2321 struct notification_thread_command *cmd;
2322
2323 /* Read the event pipe to put it back into a quiescent state. */
2324 ret = lttng_read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter,
2325 sizeof(counter));
2326 if (ret != sizeof(counter)) {
2327 goto error;
2328 }
2329
2330 pthread_mutex_lock(&handle->cmd_queue.lock);
2331 cmd = cds_list_first_entry(&handle->cmd_queue.list,
2332 struct notification_thread_command, cmd_list_node);
2333 switch (cmd->type) {
2334 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
2335 DBG("[notification-thread] Received register trigger command");
2336 ret = handle_notification_thread_command_register_trigger(
2337 state, cmd->parameters.trigger,
2338 &cmd->reply_code);
2339 break;
2340 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
2341 DBG("[notification-thread] Received unregister trigger command");
2342 ret = handle_notification_thread_command_unregister_trigger(
2343 state, cmd->parameters.trigger,
2344 &cmd->reply_code);
2345 break;
2346 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
2347 DBG("[notification-thread] Received add channel command");
2348 ret = handle_notification_thread_command_add_channel(
2349 state,
2350 cmd->parameters.add_channel.session.name,
2351 cmd->parameters.add_channel.session.uid,
2352 cmd->parameters.add_channel.session.gid,
2353 cmd->parameters.add_channel.channel.name,
2354 cmd->parameters.add_channel.channel.domain,
2355 cmd->parameters.add_channel.channel.key,
2356 cmd->parameters.add_channel.channel.capacity,
2357 &cmd->reply_code);
2358 break;
2359 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
2360 DBG("[notification-thread] Received remove channel command");
2361 ret = handle_notification_thread_command_remove_channel(
2362 state, cmd->parameters.remove_channel.key,
2363 cmd->parameters.remove_channel.domain,
2364 &cmd->reply_code);
2365 break;
2366 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING:
2367 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED:
2368 DBG("[notification-thread] Received session rotation %s command",
2369 cmd->type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING ?
2370 "ongoing" : "completed");
2371 ret = handle_notification_thread_command_session_rotation(
2372 state,
2373 cmd->type,
2374 cmd->parameters.session_rotation.session_name,
2375 cmd->parameters.session_rotation.uid,
2376 cmd->parameters.session_rotation.gid,
2377 cmd->parameters.session_rotation.trace_archive_chunk_id,
2378 cmd->parameters.session_rotation.location,
2379 &cmd->reply_code);
2380 break;
2381 case NOTIFICATION_COMMAND_TYPE_QUIT:
2382 DBG("[notification-thread] Received quit command");
2383 cmd->reply_code = LTTNG_OK;
2384 ret = 1;
2385 goto end;
2386 default:
2387 ERR("[notification-thread] Unknown internal command received");
2388 goto error_unlock;
2389 }
2390
2391 if (ret) {
2392 goto error_unlock;
2393 }
2394 end:
2395 cds_list_del(&cmd->cmd_list_node);
2396 lttng_waiter_wake_up(&cmd->reply_waiter);
2397 pthread_mutex_unlock(&handle->cmd_queue.lock);
2398 return ret;
2399 error_unlock:
2400 /* Wake-up and return a fatal error to the calling thread. */
2401 lttng_waiter_wake_up(&cmd->reply_waiter);
2402 pthread_mutex_unlock(&handle->cmd_queue.lock);
2403 cmd->reply_code = LTTNG_ERR_FATAL;
2404 error:
2405 /* Indicate a fatal error to the caller. */
2406 return -1;
2407 }
2408
/*
 * Add O_NONBLOCK to the file status flags of `socket`, preserving the flags
 * that are already set.
 *
 * Returns the result of the last fcntl() call performed: -1 if either the
 * flag query or the flag update failed.
 */
static
int socket_set_non_blocking(int socket)
{
	int ret;
	const int current_flags = fcntl(socket, F_GETFL, 0);

	if (current_flags == -1) {
		PERROR("fcntl get socket flags");
		return current_flags;
	}

	ret = fcntl(socket, F_SETFL, current_flags | O_NONBLOCK);
	if (ret == -1) {
		PERROR("fcntl set O_NONBLOCK socket flag");
		return ret;
	}

	DBG("Client socket (fd = %i) set as non-blocking", socket);
	return ret;
}
2431
2432 static
2433 int client_reset_inbound_state(struct notification_client *client)
2434 {
2435 int ret;
2436
2437 ret = lttng_dynamic_buffer_set_size(
2438 &client->communication.inbound.buffer, 0);
2439 assert(!ret);
2440
2441 client->communication.inbound.bytes_to_receive =
2442 sizeof(struct lttng_notification_channel_message);
2443 client->communication.inbound.msg_type =
2444 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN;
2445 LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1);
2446 LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1);
2447 ret = lttng_dynamic_buffer_set_size(
2448 &client->communication.inbound.buffer,
2449 client->communication.inbound.bytes_to_receive);
2450 return ret;
2451 }
2452
2453 int handle_notification_thread_client_connect(
2454 struct notification_thread_state *state)
2455 {
2456 int ret;
2457 struct notification_client *client;
2458
2459 DBG("[notification-thread] Handling new notification channel client connection");
2460
2461 client = zmalloc(sizeof(*client));
2462 if (!client) {
2463 /* Fatal error. */
2464 ret = -1;
2465 goto error;
2466 }
2467 client->id = state->next_notification_client_id++;
2468 CDS_INIT_LIST_HEAD(&client->condition_list);
2469 lttng_dynamic_buffer_init(&client->communication.inbound.buffer);
2470 lttng_dynamic_buffer_init(&client->communication.outbound.buffer);
2471 client->communication.inbound.expect_creds = true;
2472 ret = client_reset_inbound_state(client);
2473 if (ret) {
2474 ERR("[notification-thread] Failed to reset client communication's inbound state");
2475 ret = 0;
2476 goto error;
2477 }
2478
2479 ret = lttcomm_accept_unix_sock(state->notification_channel_socket);
2480 if (ret < 0) {
2481 ERR("[notification-thread] Failed to accept new notification channel client connection");
2482 ret = 0;
2483 goto error;
2484 }
2485
2486 client->socket = ret;
2487
2488 ret = socket_set_non_blocking(client->socket);
2489 if (ret) {
2490 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
2491 goto error;
2492 }
2493
2494 ret = lttcomm_setsockopt_creds_unix_sock(client->socket);
2495 if (ret < 0) {
2496 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
2497 ret = 0;
2498 goto error;
2499 }
2500
2501 ret = lttng_poll_add(&state->events, client->socket,
2502 LPOLLIN | LPOLLERR |
2503 LPOLLHUP | LPOLLRDHUP);
2504 if (ret < 0) {
2505 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
2506 ret = 0;
2507 goto error;
2508 }
2509 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
2510 client->socket);
2511
2512 rcu_read_lock();
2513 cds_lfht_add(state->client_socket_ht,
2514 hash_client_socket(client->socket),
2515 &client->client_socket_ht_node);
2516 cds_lfht_add(state->client_id_ht,
2517 hash_client_id(client->id),
2518 &client->client_id_ht_node);
2519 rcu_read_unlock();
2520
2521 return ret;
2522 error:
2523 notification_client_destroy(client, state);
2524 return ret;
2525 }
2526
2527 int handle_notification_thread_client_disconnect(
2528 int client_socket,
2529 struct notification_thread_state *state)
2530 {
2531 int ret = 0;
2532 struct notification_client *client;
2533
2534 rcu_read_lock();
2535 DBG("[notification-thread] Closing client connection (socket fd = %i)",
2536 client_socket);
2537 client = get_client_from_socket(client_socket, state);
2538 if (!client) {
2539 /* Internal state corruption, fatal error. */
2540 ERR("[notification-thread] Unable to find client (socket fd = %i)",
2541 client_socket);
2542 ret = -1;
2543 goto end;
2544 }
2545
2546 ret = lttng_poll_del(&state->events, client_socket);
2547 if (ret) {
2548 ERR("[notification-thread] Failed to remove client socket from poll set");
2549 }
2550 cds_lfht_del(state->client_socket_ht,
2551 &client->client_socket_ht_node);
2552 cds_lfht_del(state->client_id_ht,
2553 &client->client_id_ht_node);
2554 notification_client_destroy(client, state);
2555 end:
2556 rcu_read_unlock();
2557 return ret;
2558 }
2559
2560 int handle_notification_thread_client_disconnect_all(
2561 struct notification_thread_state *state)
2562 {
2563 struct cds_lfht_iter iter;
2564 struct notification_client *client;
2565 bool error_encoutered = false;
2566
2567 rcu_read_lock();
2568 DBG("[notification-thread] Closing all client connections");
2569 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
2570 client_socket_ht_node) {
2571 int ret;
2572
2573 ret = handle_notification_thread_client_disconnect(
2574 client->socket, state);
2575 if (ret) {
2576 error_encoutered = true;
2577 }
2578 }
2579 rcu_read_unlock();
2580 return error_encoutered ? 1 : 0;
2581 }
2582
2583 int handle_notification_thread_trigger_unregister_all(
2584 struct notification_thread_state *state)
2585 {
2586 bool error_occurred = false;
2587 struct cds_lfht_iter iter;
2588 struct lttng_trigger_ht_element *trigger_ht_element;
2589
2590 rcu_read_lock();
2591 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
2592 node) {
2593 int ret = handle_notification_thread_command_unregister_trigger(
2594 state, trigger_ht_element->trigger, NULL);
2595 if (ret) {
2596 error_occurred = true;
2597 }
2598 }
2599 rcu_read_unlock();
2600 return error_occurred ? -1 : 0;
2601 }
2602
2603 static
2604 int client_flush_outgoing_queue(struct notification_client *client,
2605 struct notification_thread_state *state)
2606 {
2607 ssize_t ret;
2608 size_t to_send_count;
2609
2610 assert(client->communication.outbound.buffer.size != 0);
2611 to_send_count = client->communication.outbound.buffer.size;
2612 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
2613 client->socket);
2614
2615 ret = lttcomm_send_unix_sock_non_block(client->socket,
2616 client->communication.outbound.buffer.data,
2617 to_send_count);
2618 if ((ret >= 0 && ret < to_send_count)) {
2619 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
2620 client->socket);
2621 to_send_count -= max(ret, 0);
2622
2623 memcpy(client->communication.outbound.buffer.data,
2624 client->communication.outbound.buffer.data +
2625 client->communication.outbound.buffer.size - to_send_count,
2626 to_send_count);
2627 ret = lttng_dynamic_buffer_set_size(
2628 &client->communication.outbound.buffer,
2629 to_send_count);
2630 if (ret) {
2631 goto error;
2632 }
2633
2634 /*
2635 * We want to be notified whenever there is buffer space
2636 * available to send the rest of the payload.
2637 */
2638 ret = lttng_poll_mod(&state->events, client->socket,
2639 CLIENT_POLL_MASK_IN_OUT);
2640 if (ret) {
2641 goto error;
2642 }
2643 } else if (ret < 0) {
2644 /* Generic error, disconnect the client. */
2645 ERR("[notification-thread] Failed to send flush outgoing queue, disconnecting client (socket fd = %i)",
2646 client->socket);
2647 ret = handle_notification_thread_client_disconnect(
2648 client->socket, state);
2649 if (ret) {
2650 goto error;
2651 }
2652 } else {
2653 /* No error and flushed the queue completely. */
2654 ret = lttng_dynamic_buffer_set_size(
2655 &client->communication.outbound.buffer, 0);
2656 if (ret) {
2657 goto error;
2658 }
2659 ret = lttng_poll_mod(&state->events, client->socket,
2660 CLIENT_POLL_MASK_IN);
2661 if (ret) {
2662 goto error;
2663 }
2664
2665 client->communication.outbound.queued_command_reply = false;
2666 client->communication.outbound.dropped_notification = false;
2667 }
2668
2669 return 0;
2670 error:
2671 return -1;
2672 }
2673
2674 static
2675 int client_send_command_reply(struct notification_client *client,
2676 struct notification_thread_state *state,
2677 enum lttng_notification_channel_status status)
2678 {
2679 int ret;
2680 struct lttng_notification_channel_command_reply reply = {
2681 .status = (int8_t) status,
2682 };
2683 struct lttng_notification_channel_message msg = {
2684 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
2685 .size = sizeof(reply),
2686 };
2687 char buffer[sizeof(msg) + sizeof(reply)];
2688
2689 if (client->communication.outbound.queued_command_reply) {
2690 /* Protocol error. */
2691 goto error;
2692 }
2693
2694 memcpy(buffer, &msg, sizeof(msg));
2695 memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
2696 DBG("[notification-thread] Send command reply (%i)", (int) status);
2697
2698 /* Enqueue buffer to outgoing queue and flush it. */
2699 ret = lttng_dynamic_buffer_append(
2700 &client->communication.outbound.buffer,
2701 buffer, sizeof(buffer));
2702 if (ret) {
2703 goto error;
2704 }
2705
2706 ret = client_flush_outgoing_queue(client, state);
2707 if (ret) {
2708 goto error;
2709 }
2710
2711 if (client->communication.outbound.buffer.size != 0) {
2712 /* Queue could not be emptied. */
2713 client->communication.outbound.queued_command_reply = true;
2714 }
2715
2716 return 0;
2717 error:
2718 return -1;
2719 }
2720
2721 static
2722 int client_dispatch_message(struct notification_client *client,
2723 struct notification_thread_state *state)
2724 {
2725 int ret = 0;
2726
2727 if (client->communication.inbound.msg_type !=
2728 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE &&
2729 client->communication.inbound.msg_type !=
2730 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN &&
2731 !client->validated) {
2732 WARN("[notification-thread] client attempted a command before handshake");
2733 ret = -1;
2734 goto end;
2735 }
2736
2737 switch (client->communication.inbound.msg_type) {
2738 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN:
2739 {
2740 /*
2741 * Receiving message header. The function will be called again
2742 * once the rest of the message as been received and can be
2743 * interpreted.
2744 */
2745 const struct lttng_notification_channel_message *msg;
2746
2747 assert(sizeof(*msg) ==
2748 client->communication.inbound.buffer.size);
2749 msg = (const struct lttng_notification_channel_message *)
2750 client->communication.inbound.buffer.data;
2751
2752 if (msg->size == 0 || msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
2753 ERR("[notification-thread] Invalid notification channel message: length = %u", msg->size);
2754 ret = -1;
2755 goto end;
2756 }
2757
2758 switch (msg->type) {
2759 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
2760 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
2761 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
2762 break;
2763 default:
2764 ret = -1;
2765 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
2766 goto end;
2767 }
2768
2769 client->communication.inbound.bytes_to_receive = msg->size;
2770 client->communication.inbound.msg_type =
2771 (enum lttng_notification_channel_message_type) msg->type;
2772 ret = lttng_dynamic_buffer_set_size(
2773 &client->communication.inbound.buffer, msg->size);
2774 if (ret) {
2775 goto end;
2776 }
2777 break;
2778 }
2779 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
2780 {
2781 struct lttng_notification_channel_command_handshake *handshake_client;
2782 struct lttng_notification_channel_command_handshake handshake_reply = {
2783 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
2784 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
2785 };
2786 struct lttng_notification_channel_message msg_header = {
2787 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
2788 .size = sizeof(handshake_reply),
2789 };
2790 enum lttng_notification_channel_status status =
2791 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
2792 char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
2793
2794 memcpy(send_buffer, &msg_header, sizeof(msg_header));
2795 memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
2796 sizeof(handshake_reply));
2797
2798 handshake_client =
2799 (struct lttng_notification_channel_command_handshake *)
2800 client->communication.inbound.buffer.data;
2801 client->major = handshake_client->major;
2802 client->minor = handshake_client->minor;
2803 if (!client->communication.inbound.creds_received) {
2804 ERR("[notification-thread] No credentials received from client");
2805 ret = -1;
2806 goto end;
2807 }
2808
2809 client->uid = LTTNG_SOCK_GET_UID_CRED(
2810 &client->communication.inbound.creds);
2811 client->gid = LTTNG_SOCK_GET_GID_CRED(
2812 &client->communication.inbound.creds);
2813 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
2814 client->uid, client->gid, (int) client->major,
2815 (int) client->minor);
2816
2817 if (handshake_client->major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
2818 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
2819 }
2820
2821 ret = lttng_dynamic_buffer_append(&client->communication.outbound.buffer,
2822 send_buffer, sizeof(send_buffer));
2823 if (ret) {
2824 ERR("[notification-thread] Failed to send protocol version to notification channel client");
2825 goto end;
2826 }
2827
2828 ret = client_flush_outgoing_queue(client, state);
2829 if (ret) {
2830 goto end;
2831 }
2832
2833 ret = client_send_command_reply(client, state, status);
2834 if (ret) {
2835 ERR("[notification-thread] Failed to send reply to notification channel client");
2836 goto end;
2837 }
2838
2839 /* Set reception state to receive the next message header. */
2840 ret = client_reset_inbound_state(client);
2841 if (ret) {
2842 ERR("[notification-thread] Failed to reset client communication's inbound state");
2843 goto end;
2844 }
2845 client->validated = true;
2846 client->communication.active = true;
2847 break;
2848 }
2849 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
2850 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
2851 {
2852 struct lttng_condition *condition;
2853 enum lttng_notification_channel_status status =
2854 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
2855 struct lttng_payload_view condition_view =
2856 lttng_payload_view_from_dynamic_buffer(
2857 &client->communication.inbound.buffer,
2858 0, -1);
2859 size_t expected_condition_size =
2860 client->communication.inbound.buffer.size;
2861
2862 ret = lttng_condition_create_from_payload(&condition_view,
2863 &condition);
2864 if (ret != expected_condition_size) {
2865 ERR("[notification-thread] Malformed condition received from client");
2866 goto end;
2867 }
2868
2869 if (client->communication.inbound.msg_type ==
2870 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
2871 ret = notification_thread_client_subscribe(client,
2872 condition, state, &status);
2873 } else {
2874 ret = notification_thread_client_unsubscribe(client,
2875 condition, state, &status);
2876 }
2877 if (ret) {
2878 goto end;
2879 }
2880
2881 ret = client_send_command_reply(client, state, status);
2882 if (ret) {
2883 ERR("[notification-thread] Failed to send reply to notification channel client");
2884 goto end;
2885 }
2886
2887 /* Set reception state to receive the next message header. */
2888 ret = client_reset_inbound_state(client);
2889 if (ret) {
2890 ERR("[notification-thread] Failed to reset client communication's inbound state");
2891 goto end;
2892 }
2893 break;
2894 }
2895 default:
2896 abort();
2897 }
2898 end:
2899 return ret;
2900 }
2901
2902 /* Incoming data from client. */
2903 int handle_notification_thread_client_in(
2904 struct notification_thread_state *state, int socket)
2905 {
2906 int ret = 0;
2907 struct notification_client *client;
2908 ssize_t recv_ret;
2909 size_t offset;
2910
2911 client = get_client_from_socket(socket, state);
2912 if (!client) {
2913 /* Internal error, abort. */
2914 ret = -1;
2915 goto end;
2916 }
2917
2918 offset = client->communication.inbound.buffer.size -
2919 client->communication.inbound.bytes_to_receive;
2920 if (client->communication.inbound.expect_creds) {
2921 recv_ret = lttcomm_recv_creds_unix_sock(socket,
2922 client->communication.inbound.buffer.data + offset,
2923 client->communication.inbound.bytes_to_receive,
2924 &client->communication.inbound.creds);
2925 if (recv_ret > 0) {
2926 client->communication.inbound.expect_creds = false;
2927 client->communication.inbound.creds_received = true;
2928 }
2929 } else {
2930 recv_ret = lttcomm_recv_unix_sock_non_block(socket,
2931 client->communication.inbound.buffer.data + offset,
2932 client->communication.inbound.bytes_to_receive);
2933 }
2934 if (recv_ret < 0) {
2935 goto error_disconnect_client;
2936 }
2937
2938 client->communication.inbound.bytes_to_receive -= recv_ret;
2939 if (client->communication.inbound.bytes_to_receive == 0) {
2940 ret = client_dispatch_message(client, state);
2941 if (ret) {
2942 /*
2943 * Only returns an error if this client must be
2944 * disconnected.
2945 */
2946 goto error_disconnect_client;
2947 }
2948 } else {
2949 goto end;
2950 }
2951 end:
2952 return ret;
2953 error_disconnect_client:
2954 ret = handle_notification_thread_client_disconnect(socket, state);
2955 return ret;
2956 }
2957
/*
 * Client ready to receive outgoing data.
 *
 * Looks up the client associated with `socket` and flushes its outgoing
 * queue. Returns -1 if no such client is known, otherwise the result of the
 * flush.
 */
int handle_notification_thread_client_out(
		struct notification_thread_state *state, int socket)
{
	struct notification_client *client =
			get_client_from_socket(socket, state);

	if (!client) {
		/* Internal error, abort. */
		return -1;
	}

	return client_flush_outgoing_queue(client, state);
}
2979
2980 static
2981 bool evaluate_buffer_usage_condition(const struct lttng_condition *condition,
2982 const struct channel_state_sample *sample,
2983 uint64_t buffer_capacity)
2984 {
2985 bool result = false;
2986 uint64_t threshold;
2987 enum lttng_condition_type condition_type;
2988 const struct lttng_condition_buffer_usage *use_condition = container_of(
2989 condition, struct lttng_condition_buffer_usage,
2990 parent);
2991
2992 if (use_condition->threshold_bytes.set) {
2993 threshold = use_condition->threshold_bytes.value;
2994 } else {
2995 /*
2996 * Threshold was expressed as a ratio.
2997 *
2998 * TODO the threshold (in bytes) of conditions expressed
2999 * as a ratio of total buffer size could be cached to
3000 * forego this double-multiplication or it could be performed
3001 * as fixed-point math.
3002 *
3003 * Note that caching should accommodates the case where the
3004 * condition applies to multiple channels (i.e. don't assume
3005 * that all channels matching my_chann* have the same size...)
3006 */
3007 threshold = (uint64_t) (use_condition->threshold_ratio.value *
3008 (double) buffer_capacity);
3009 }
3010
3011 condition_type = lttng_condition_get_type(condition);
3012 if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
3013 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
3014 threshold, sample->highest_usage);
3015
3016 /*
3017 * The low condition should only be triggered once _all_ of the
3018 * streams in a channel have gone below the "low" threshold.
3019 */
3020 if (sample->highest_usage <= threshold) {
3021 result = true;
3022 }
3023 } else {
3024 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
3025 threshold, sample->highest_usage);
3026
3027 /*
3028 * For high buffer usage scenarios, we want to trigger whenever
3029 * _any_ of the streams has reached the "high" threshold.
3030 */
3031 if (sample->highest_usage >= threshold) {
3032 result = true;
3033 }
3034 }
3035
3036 return result;
3037 }
3038
3039 static
3040 bool evaluate_session_consumed_size_condition(
3041 const struct lttng_condition *condition,
3042 uint64_t session_consumed_size)
3043 {
3044 uint64_t threshold;
3045 const struct lttng_condition_session_consumed_size *size_condition =
3046 container_of(condition,
3047 struct lttng_condition_session_consumed_size,
3048 parent);
3049
3050 threshold = size_condition->consumed_threshold_bytes.value;
3051 DBG("[notification-thread] Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
3052 threshold, session_consumed_size);
3053 return session_consumed_size >= threshold;
3054 }
3055
3056 static
3057 int evaluate_buffer_condition(const struct lttng_condition *condition,
3058 struct lttng_evaluation **evaluation,
3059 const struct notification_thread_state *state,
3060 const struct channel_state_sample *previous_sample,
3061 const struct channel_state_sample *latest_sample,
3062 uint64_t previous_session_consumed_total,
3063 uint64_t latest_session_consumed_total,
3064 struct channel_info *channel_info)
3065 {
3066 int ret = 0;
3067 enum lttng_condition_type condition_type;
3068 const bool previous_sample_available = !!previous_sample;
3069 bool previous_sample_result = false;
3070 bool latest_sample_result;
3071
3072 condition_type = lttng_condition_get_type(condition);
3073
3074 switch (condition_type) {
3075 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
3076 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
3077 if (caa_likely(previous_sample_available)) {
3078 previous_sample_result =
3079 evaluate_buffer_usage_condition(condition,
3080 previous_sample, channel_info->capacity);
3081 }
3082 latest_sample_result = evaluate_buffer_usage_condition(
3083 condition, latest_sample,
3084 channel_info->capacity);
3085 break;
3086 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
3087 if (caa_likely(previous_sample_available)) {
3088 previous_sample_result =
3089 evaluate_session_consumed_size_condition(
3090 condition,
3091 previous_session_consumed_total);
3092 }
3093 latest_sample_result =
3094 evaluate_session_consumed_size_condition(
3095 condition,
3096 latest_session_consumed_total);
3097 break;
3098 default:
3099 /* Unknown condition type; internal error. */
3100 abort();
3101 }
3102
3103 if (!latest_sample_result ||
3104 (previous_sample_result == latest_sample_result)) {
3105 /*
3106 * Only trigger on a condition evaluation transition.
3107 *
3108 * NOTE: This edge-triggered logic may not be appropriate for
3109 * future condition types.
3110 */
3111 goto end;
3112 }
3113
3114 if (!evaluation || !latest_sample_result) {
3115 goto end;
3116 }
3117
3118 switch (condition_type) {
3119 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
3120 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
3121 *evaluation = lttng_evaluation_buffer_usage_create(
3122 condition_type,
3123 latest_sample->highest_usage,
3124 channel_info->capacity);
3125 break;
3126 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
3127 *evaluation = lttng_evaluation_session_consumed_size_create(
3128 latest_session_consumed_total);
3129 break;
3130 default:
3131 abort();
3132 }
3133
3134 if (!*evaluation) {
3135 ret = -1;
3136 goto end;
3137 }
3138 end:
3139 return ret;
3140 }
3141
3142 static
3143 int client_enqueue_dropped_notification(struct notification_client *client)
3144 {
3145 int ret;
3146 struct lttng_notification_channel_message msg = {
3147 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
3148 .size = 0,
3149 };
3150
3151 ret = lttng_dynamic_buffer_append(
3152 &client->communication.outbound.buffer, &msg,
3153 sizeof(msg));
3154 return ret;
3155 }
3156
3157 /*
3158 * Permission checks relative to notification channel clients are performed
3159 * here. Notice how object, client, and trigger credentials are involved in
3160 * this check.
3161 *
3162 * The `object` credentials are the credentials associated with the "subject"
3163 * of a condition. For instance, a `rotation completed` condition applies
3164 * to a session. When that condition is met, it will produce an evaluation
3165 * against a session. Hence, in this case, the `object` credentials are the
3166 * credentials of the "subject" session.
3167 *
3168 * The `trigger` credentials are the credentials of the user that registered the
3169 * trigger.
3170 *
3171 * The `client` credentials are the credentials of the user that created a given
3172 * notification channel.
3173 *
3174 * In terms of visibility, it is expected that non-privilieged users can only
3175 * register triggers against "their" objects (their own sessions and
3176 * applications they are allowed to interact with). They can then open a
3177 * notification channel and subscribe to notifications associated with those
3178 * triggers.
3179 *
3180 * As for privilieged users, they can register triggers against the objects of
3181 * other users. They can then subscribe to the notifications associated to their
3182 * triggers. Privilieged users _can't_ subscribe to the notifications of
3183 * triggers owned by other users; they must create their own triggers.
3184 *
3185 * This is more a concern of usability than security. It would be difficult for
3186 * a root user reliably subscribe to a specific set of conditions without
3187 * interference from external users (those could, for instance, unregister
3188 * their triggers).
3189 */
3190 static
3191 int send_evaluation_to_clients(const struct lttng_trigger *trigger,
3192 const struct lttng_evaluation *evaluation,
3193 struct notification_client_list* client_list,
3194 struct notification_thread_state *state,
3195 uid_t object_uid, gid_t object_gid)
3196 {
3197 int ret = 0;
3198 struct lttng_payload msg_payload;
3199 struct notification_client_list_element *client_list_element, *tmp;
3200 const struct lttng_notification notification = {
3201 .condition = (struct lttng_condition *) lttng_trigger_get_const_condition(trigger),
3202 .evaluation = (struct lttng_evaluation *) evaluation,
3203 };
3204 struct lttng_notification_channel_message msg_header = {
3205 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION,
3206 };
3207 const struct lttng_credentials *trigger_creds = lttng_trigger_get_credentials(trigger);
3208
3209 lttng_payload_init(&msg_payload);
3210
3211 ret = lttng_dynamic_buffer_append(&msg_payload.buffer, &msg_header,
3212 sizeof(msg_header));
3213 if (ret) {
3214 goto end;
3215 }
3216
3217 ret = lttng_notification_serialize(&notification, &msg_payload);
3218 if (ret) {
3219 ERR("[notification-thread] Failed to serialize notification");
3220 ret = -1;
3221 goto end;
3222 }
3223
3224 /* Update payload size. */
3225 ((struct lttng_notification_channel_message * ) msg_payload.buffer.data)->size =
3226 (uint32_t) (msg_payload.buffer.size - sizeof(msg_header));
3227
3228 cds_list_for_each_entry_safe(client_list_element, tmp,
3229 &client_list->list, node) {
3230 struct notification_client *client =
3231 client_list_element->client;
3232
3233 if (client->uid != object_uid && client->gid != object_gid &&
3234 client->uid != 0) {
3235 /* Client is not allowed to monitor this channel. */
3236 DBG("[notification-thread] Skipping client at it does not have the object permission to receive notification for this trigger");
3237 continue;
3238 }
3239
3240 if (client->uid != trigger_creds->uid && client->gid != trigger_creds->gid) {
3241 DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this trigger");
3242 continue;
3243 }
3244
3245 DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
3246 client->socket, msg_payload.buffer.size);
3247 if (client->communication.outbound.buffer.size) {
3248 /*
3249 * Outgoing data is already buffered for this client;
3250 * drop the notification and enqueue a "dropped
3251 * notification" message if this is the first dropped
3252 * notification since the socket spilled-over to the
3253 * queue.
3254 */
3255 DBG("[notification-thread] Dropping notification addressed to client (socket fd = %i)",
3256 client->socket);
3257 if (!client->communication.outbound.dropped_notification) {
3258 client->communication.outbound.dropped_notification = true;
3259 ret = client_enqueue_dropped_notification(
3260 client);
3261 if (ret) {
3262 goto end;
3263 }
3264 }
3265 continue;
3266 }
3267
3268 ret = lttng_dynamic_buffer_append_buffer(
3269 &client->communication.outbound.buffer,
3270 &msg_payload.buffer);
3271 if (ret) {
3272 goto end;
3273 }
3274
3275 ret = client_flush_outgoing_queue(client, state);
3276 if (ret) {
3277 goto end;
3278 }
3279 }
3280 ret = 0;
3281 end:
3282 lttng_payload_reset(&msg_payload);
3283 return ret;
3284 }
3285
3286 int handle_notification_thread_channel_sample(
3287 struct notification_thread_state *state, int pipe,
3288 enum lttng_domain_type domain)
3289 {
3290 int ret = 0;
3291 struct lttcomm_consumer_channel_monitor_msg sample_msg;
3292 struct channel_info *channel_info;
3293 struct cds_lfht_node *node;
3294 struct cds_lfht_iter iter;
3295 struct lttng_channel_trigger_list *trigger_list;
3296 struct lttng_trigger_list_element *trigger_list_element;
3297 bool previous_sample_available = false;
3298 struct channel_state_sample previous_sample, latest_sample;
3299 uint64_t previous_session_consumed_total, latest_session_consumed_total;
3300
3301 /*
3302 * The monitoring pipe only holds messages smaller than PIPE_BUF,
3303 * ensuring that read/write of sampling messages are atomic.
3304 */
3305 ret = lttng_read(pipe, &sample_msg, sizeof(sample_msg));
3306 if (ret != sizeof(sample_msg)) {
3307 ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
3308 pipe);
3309 ret = -1;
3310 goto end;
3311 }
3312
3313 ret = 0;
3314 latest_sample.key.key = sample_msg.key;
3315 latest_sample.key.domain = domain;
3316 latest_sample.highest_usage = sample_msg.highest;
3317 latest_sample.lowest_usage = sample_msg.lowest;
3318 latest_sample.channel_total_consumed = sample_msg.total_consumed;
3319
3320 rcu_read_lock();
3321
3322 /* Retrieve the channel's informations */
3323 cds_lfht_lookup(state->channels_ht,
3324 hash_channel_key(&latest_sample.key),
3325 match_channel_info,
3326 &latest_sample.key,
3327 &iter);
3328 node = cds_lfht_iter_get_node(&iter);
3329 if (caa_unlikely(!node)) {
3330 /*
3331 * Not an error since the consumer can push a sample to the pipe
3332 * and the rest of the session daemon could notify us of the
3333 * channel's destruction before we get a chance to process that
3334 * sample.
3335 */
3336 DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
3337 latest_sample.key.key,
3338 domain == LTTNG_DOMAIN_KERNEL ? "kernel" :
3339 "user space");
3340 goto end_unlock;
3341 }
3342 channel_info = caa_container_of(node, struct channel_info,
3343 channels_ht_node);
3344 DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64", total consumed = %" PRIu64")",
3345 channel_info->name,
3346 latest_sample.key.key,
3347 channel_info->session_info->name,
3348 latest_sample.highest_usage,
3349 latest_sample.lowest_usage,
3350 latest_sample.channel_total_consumed);
3351
3352 previous_session_consumed_total =
3353 channel_info->session_info->consumed_data_size;
3354
3355 /* Retrieve the channel's last sample, if it exists, and update it. */
3356 cds_lfht_lookup(state->channel_state_ht,
3357 hash_channel_key(&latest_sample.key),
3358 match_channel_state_sample,
3359 &latest_sample.key,
3360 &iter);
3361 node = cds_lfht_iter_get_node(&iter);
3362 if (caa_likely(node)) {
3363 struct channel_state_sample *stored_sample;
3364
3365 /* Update the sample stored. */
3366 stored_sample = caa_container_of(node,
3367 struct channel_state_sample,
3368 channel_state_ht_node);
3369
3370 memcpy(&previous_sample, stored_sample,
3371 sizeof(previous_sample));
3372 stored_sample->highest_usage = latest_sample.highest_usage;
3373 stored_sample->lowest_usage = latest_sample.lowest_usage;
3374 stored_sample->channel_total_consumed = latest_sample.channel_total_consumed;
3375 previous_sample_available = true;
3376
3377 latest_session_consumed_total =
3378 previous_session_consumed_total +
3379 (latest_sample.channel_total_consumed - previous_sample.channel_total_consumed);
3380 } else {
3381 /*
3382 * This is the channel's first sample, allocate space for and
3383 * store the new sample.
3384 */
3385 struct channel_state_sample *stored_sample;
3386
3387 stored_sample = zmalloc(sizeof(*stored_sample));
3388 if (!stored_sample) {
3389 ret = -1;
3390 goto end_unlock;
3391 }
3392
3393 memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
3394 cds_lfht_node_init(&stored_sample->channel_state_ht_node);
3395 cds_lfht_add(state->channel_state_ht,
3396 hash_channel_key(&stored_sample->key),
3397 &stored_sample->channel_state_ht_node);
3398
3399 latest_session_consumed_total =
3400 previous_session_consumed_total +
3401 latest_sample.channel_total_consumed;
3402 }
3403
3404 channel_info->session_info->consumed_data_size =
3405 latest_session_consumed_total;
3406
3407 /* Find triggers associated with this channel. */
3408 cds_lfht_lookup(state->channel_triggers_ht,
3409 hash_channel_key(&latest_sample.key),
3410 match_channel_trigger_list,
3411 &latest_sample.key,
3412 &iter);
3413 node = cds_lfht_iter_get_node(&iter);
3414 if (caa_likely(!node)) {
3415 goto end_unlock;
3416 }
3417
3418 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
3419 channel_triggers_ht_node);
3420 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
3421 node) {
3422 const struct lttng_condition *condition;
3423 const struct lttng_action *action;
3424 const struct lttng_trigger *trigger;
3425 struct notification_client_list *client_list;
3426 struct lttng_evaluation *evaluation = NULL;
3427
3428 trigger = trigger_list_element->trigger;
3429 condition = lttng_trigger_get_const_condition(trigger);
3430 assert(condition);
3431 action = lttng_trigger_get_const_action(trigger);
3432
3433 /* Notify actions are the only type currently supported. */
3434 assert(lttng_action_get_type_const(action) ==
3435 LTTNG_ACTION_TYPE_NOTIFY);
3436
3437 /*
3438 * Check if any client is subscribed to the result of this
3439 * evaluation.
3440 */
3441 client_list = get_client_list_from_condition(state, condition);
3442 assert(client_list);
3443 if (cds_list_empty(&client_list->list)) {
3444 /*
3445 * No clients interested in the evaluation's result,
3446 * skip it.
3447 */
3448 continue;
3449 }
3450
3451 ret = evaluate_buffer_condition(condition, &evaluation, state,
3452 previous_sample_available ? &previous_sample : NULL,
3453 &latest_sample,
3454 previous_session_consumed_total,
3455 latest_session_consumed_total,
3456 channel_info);
3457 if (caa_unlikely(ret)) {
3458 goto end_unlock;
3459 }
3460
3461 if (caa_likely(!evaluation)) {
3462 continue;
3463 }
3464
3465 /* Dispatch evaluation result to all clients. */
3466 ret = send_evaluation_to_clients(trigger_list_element->trigger,
3467 evaluation, client_list, state