3f8cb4804ddd0800061f634afb3d6e92022e8103
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.c
1 /*
2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * SPDX-License-Identifier: GPL-2.0-only
5 *
6 */
7
8 #define _LGPL_SOURCE
9 #include <urcu.h>
10 #include <urcu/rculfhash.h>
11
12 #include <common/defaults.h>
13 #include <common/error.h>
14 #include <common/futex.h>
15 #include <common/unix.h>
16 #include <common/dynamic-buffer.h>
17 #include <common/hashtable/utils.h>
18 #include <common/sessiond-comm/sessiond-comm.h>
19 #include <common/macros.h>
20 #include <lttng/condition/condition.h>
21 #include <lttng/action/action-internal.h>
22 #include <lttng/notification/notification-internal.h>
23 #include <lttng/condition/condition-internal.h>
24 #include <lttng/condition/buffer-usage-internal.h>
25 #include <lttng/condition/session-consumed-size-internal.h>
26 #include <lttng/condition/session-rotation-internal.h>
27 #include <lttng/notification/channel-internal.h>
28
29 #include <time.h>
30 #include <unistd.h>
31 #include <assert.h>
32 #include <inttypes.h>
33 #include <fcntl.h>
34
35 #include "notification-thread.h"
36 #include "notification-thread-events.h"
37 #include "notification-thread-commands.h"
38 #include "lttng-sessiond.h"
39 #include "kernel.h"
40
/* Event mask combining LPOLLIN, LPOLLERR, LPOLLHUP and LPOLLRDHUP. */
#define CLIENT_POLL_MASK_IN \
	(LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
/* CLIENT_POLL_MASK_IN with LPOLLOUT added. */
#define CLIENT_POLL_MASK_IN_OUT \
	(CLIENT_POLL_MASK_IN | LPOLLOUT)
43
/*
 * Kind of object a condition is bound to; see
 * get_condition_binding_object().
 */
enum lttng_object_type {
	LTTNG_OBJECT_TYPE_UNKNOWN = 0,
	LTTNG_OBJECT_TYPE_NONE = 1,
	LTTNG_OBJECT_TYPE_CHANNEL = 2,
	LTTNG_OBJECT_TYPE_SESSION = 3,
};
50
51 struct lttng_trigger_list_element {
52 /* No ownership of the trigger object is assumed. */
53 const struct lttng_trigger *trigger;
54 struct cds_list_head node;
55 };
56
57 struct lttng_channel_trigger_list {
58 struct channel_key channel_key;
59 /* List of struct lttng_trigger_list_element. */
60 struct cds_list_head list;
61 /* Node in the channel_triggers_ht */
62 struct cds_lfht_node channel_triggers_ht_node;
63 /* call_rcu delayed reclaim. */
64 struct rcu_head rcu_node;
65 };
66
67 /*
68 * List of triggers applying to a given session.
69 *
70 * See:
71 * - lttng_session_trigger_list_create()
72 * - lttng_session_trigger_list_build()
73 * - lttng_session_trigger_list_destroy()
74 * - lttng_session_trigger_list_add()
75 */
76 struct lttng_session_trigger_list {
77 /*
78 * Not owned by this; points to the session_info structure's
79 * session name.
80 */
81 const char *session_name;
82 /* List of struct lttng_trigger_list_element. */
83 struct cds_list_head list;
84 /* Node in the session_triggers_ht */
85 struct cds_lfht_node session_triggers_ht_node;
86 /*
87 * Weak reference to the notification system's session triggers
88 * hashtable.
89 *
90 * The session trigger list structure structure is owned by
91 * the session's session_info.
92 *
93 * The session_info is kept alive the the channel_infos holding a
94 * reference to it (reference counting). When those channels are
95 * destroyed (at runtime or on teardown), the reference they hold
96 * to the session_info are released. On destruction of session_info,
97 * session_info_destroy() will remove the list of triggers applying
98 * to this session from the notification system's state.
99 *
100 * This implies that the session_triggers_ht must be destroyed
101 * after the channels.
102 */
103 struct cds_lfht *session_triggers_ht;
104 /* Used for delayed RCU reclaim. */
105 struct rcu_head rcu_node;
106 };
107
108 struct lttng_trigger_ht_element {
109 struct lttng_trigger *trigger;
110 struct cds_lfht_node node;
111 /* call_rcu delayed reclaim. */
112 struct rcu_head rcu_node;
113 };
114
115 struct lttng_condition_list_element {
116 struct lttng_condition *condition;
117 struct cds_list_head node;
118 };
119
120 struct notification_client_list_element {
121 struct notification_client *client;
122 struct cds_list_head node;
123 };
124
125 /*
126 * Thread safety of notification_client and notification_client_list.
127 *
128 * The notification thread (main thread) and the action executor
129 * interact through client lists. Hence, when the action executor
130 * thread looks-up the list of clients subscribed to a given
131 * condition, it will acquire a reference to the list and lock it
132 * while attempting to communicate with the various clients.
133 *
134 * It is not necessary to reference-count clients as they are guaranteed
135 * to be 'alive' if they are present in a list and that list is locked. Indeed,
136 * removing references to the client from those subscription lists is part of
137 * the work performed on destruction of a client.
138 *
139 * No provision for other access scenarios are taken into account;
140 * this is the bare minimum to make these accesses safe and the
141 * notification thread's state is _not_ "thread-safe" in any general
142 * sense.
143 */
144 struct notification_client_list {
145 pthread_mutex_t lock;
146 struct urcu_ref ref;
147 const struct lttng_trigger *trigger;
148 struct cds_list_head list;
149 /* Weak reference to container. */
150 struct cds_lfht *notification_trigger_clients_ht;
151 struct cds_lfht_node notification_trigger_clients_ht_node;
152 /* call_rcu delayed reclaim. */
153 struct rcu_head rcu_node;
154 };
155
156 struct notification_client {
157 /* Nests within the notification_client_list lock. */
158 pthread_mutex_t lock;
159 notification_client_id id;
160 int socket;
161 /* Client protocol version. */
162 uint8_t major, minor;
163 uid_t uid;
164 gid_t gid;
165 /*
166 * Indicates if the credentials and versions of the client have been
167 * checked.
168 */
169 bool validated;
170 /*
171 * Conditions to which the client's notification channel is subscribed.
172 * List of struct lttng_condition_list_node. The condition member is
173 * owned by the client.
174 */
175 struct cds_list_head condition_list;
176 struct cds_lfht_node client_socket_ht_node;
177 struct cds_lfht_node client_id_ht_node;
178 struct {
179 /*
180 * If a client's communication is inactive, it means that a
181 * fatal error has occurred (could be either a protocol error or
182 * the socket API returned a fatal error). No further
183 * communication should be attempted; the client is queued for
184 * clean-up.
185 */
186 bool active;
187 struct {
188 /*
189 * During the reception of a message, the reception
190 * buffers' "size" is set to contain the current
191 * message's complete payload.
192 */
193 struct lttng_dynamic_buffer buffer;
194 /* Bytes left to receive for the current message. */
195 size_t bytes_to_receive;
196 /* Type of the message being received. */
197 enum lttng_notification_channel_message_type msg_type;
198 /*
199 * Indicates whether or not credentials are expected
200 * from the client.
201 */
202 bool expect_creds;
203 /*
204 * Indicates whether or not credentials were received
205 * from the client.
206 */
207 bool creds_received;
208 /* Only used during credentials reception. */
209 lttng_sock_cred creds;
210 } inbound;
211 struct {
212 /*
213 * Indicates whether or not a notification addressed to
214 * this client was dropped because a command reply was
215 * already buffered.
216 *
217 * A notification is dropped whenever the buffer is not
218 * empty.
219 */
220 bool dropped_notification;
221 /*
222 * Indicates whether or not a command reply is already
223 * buffered. In this case, it means that the client is
224 * not consuming command replies before emitting a new
225 * one. This could be caused by a protocol error or a
226 * misbehaving/malicious client.
227 */
228 bool queued_command_reply;
229 struct lttng_dynamic_buffer buffer;
230 } outbound;
231 } communication;
232 /* call_rcu delayed reclaim. */
233 struct rcu_head rcu_node;
234 };
235
236 struct channel_state_sample {
237 struct channel_key key;
238 struct cds_lfht_node channel_state_ht_node;
239 uint64_t highest_usage;
240 uint64_t lowest_usage;
241 uint64_t channel_total_consumed;
242 /* call_rcu delayed reclaim. */
243 struct rcu_head rcu_node;
244 };
245
/* Forward declarations of helpers used before their definition. */
static
unsigned long hash_channel_key(struct channel_key *key);

static
int evaluate_buffer_condition(
		const struct lttng_condition *condition,
		struct lttng_evaluation **evaluation,
		const struct notification_thread_state *state,
		const struct channel_state_sample *previous_sample,
		const struct channel_state_sample *latest_sample,
		uint64_t previous_session_consumed_total,
		uint64_t latest_session_consumed_total,
		struct channel_info *channel_info);

static
int send_evaluation_to_clients(
		const struct lttng_trigger *trigger,
		const struct lttng_evaluation *evaluation,
		struct notification_client_list *client_list,
		struct notification_thread_state *state,
		uid_t channel_uid, gid_t channel_gid);
261
262
/* session_info API */
static void session_info_destroy(void *_data);
static void session_info_get(struct session_info *session_info);
static void session_info_put(struct session_info *session_info);
static struct session_info *session_info_create(
		const char *name,
		uid_t uid, gid_t gid,
		struct lttng_session_trigger_list *trigger_list,
		struct cds_lfht *sessions_ht);
static void session_info_add_channel(
		struct session_info *session_info,
		struct channel_info *channel_info);
static void session_info_remove_channel(
		struct session_info *session_info,
		struct channel_info *channel_info);
281
/* lttng_session_trigger_list API */
static struct lttng_session_trigger_list *lttng_session_trigger_list_create(
		const char *session_name,
		struct cds_lfht *session_triggers_ht);
static struct lttng_session_trigger_list *lttng_session_trigger_list_build(
		const struct notification_thread_state *state,
		const char *session_name);
static void lttng_session_trigger_list_destroy(
		struct lttng_session_trigger_list *list);
static int lttng_session_trigger_list_add(
		struct lttng_session_trigger_list *list,
		const struct lttng_trigger *trigger);
297
298
299 static
300 int match_client_socket(struct cds_lfht_node *node, const void *key)
301 {
302 /* This double-cast is intended to supress pointer-to-cast warning. */
303 const int socket = (int) (intptr_t) key;
304 const struct notification_client *client = caa_container_of(node,
305 struct notification_client, client_socket_ht_node);
306
307 return client->socket == socket;
308 }
309
310 static
311 int match_client_id(struct cds_lfht_node *node, const void *key)
312 {
313 /* This double-cast is intended to supress pointer-to-cast warning. */
314 const notification_client_id id = *((notification_client_id *) key);
315 const struct notification_client *client = caa_container_of(
316 node, struct notification_client, client_id_ht_node);
317
318 return client->id == id;
319 }
320
321 static
322 int match_channel_trigger_list(struct cds_lfht_node *node, const void *key)
323 {
324 struct channel_key *channel_key = (struct channel_key *) key;
325 struct lttng_channel_trigger_list *trigger_list;
326
327 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
328 channel_triggers_ht_node);
329
330 return !!((channel_key->key == trigger_list->channel_key.key) &&
331 (channel_key->domain == trigger_list->channel_key.domain));
332 }
333
334 static
335 int match_session_trigger_list(struct cds_lfht_node *node, const void *key)
336 {
337 const char *session_name = (const char *) key;
338 struct lttng_session_trigger_list *trigger_list;
339
340 trigger_list = caa_container_of(node, struct lttng_session_trigger_list,
341 session_triggers_ht_node);
342
343 return !!(strcmp(trigger_list->session_name, session_name) == 0);
344 }
345
346 static
347 int match_channel_state_sample(struct cds_lfht_node *node, const void *key)
348 {
349 struct channel_key *channel_key = (struct channel_key *) key;
350 struct channel_state_sample *sample;
351
352 sample = caa_container_of(node, struct channel_state_sample,
353 channel_state_ht_node);
354
355 return !!((channel_key->key == sample->key.key) &&
356 (channel_key->domain == sample->key.domain));
357 }
358
359 static
360 int match_channel_info(struct cds_lfht_node *node, const void *key)
361 {
362 struct channel_key *channel_key = (struct channel_key *) key;
363 struct channel_info *channel_info;
364
365 channel_info = caa_container_of(node, struct channel_info,
366 channels_ht_node);
367
368 return !!((channel_key->key == channel_info->key.key) &&
369 (channel_key->domain == channel_info->key.domain));
370 }
371
372 static
373 int match_condition(struct cds_lfht_node *node, const void *key)
374 {
375 struct lttng_condition *condition_key = (struct lttng_condition *) key;
376 struct lttng_trigger_ht_element *trigger;
377 struct lttng_condition *condition;
378
379 trigger = caa_container_of(node, struct lttng_trigger_ht_element,
380 node);
381 condition = lttng_trigger_get_condition(trigger->trigger);
382 assert(condition);
383
384 return !!lttng_condition_is_equal(condition_key, condition);
385 }
386
387 static
388 int match_client_list_condition(struct cds_lfht_node *node, const void *key)
389 {
390 struct lttng_condition *condition_key = (struct lttng_condition *) key;
391 struct notification_client_list *client_list;
392 const struct lttng_condition *condition;
393
394 assert(condition_key);
395
396 client_list = caa_container_of(node, struct notification_client_list,
397 notification_trigger_clients_ht_node);
398 condition = lttng_trigger_get_const_condition(client_list->trigger);
399
400 return !!lttng_condition_is_equal(condition_key, condition);
401 }
402
403 static
404 int match_session(struct cds_lfht_node *node, const void *key)
405 {
406 const char *name = key;
407 struct session_info *session_info = caa_container_of(
408 node, struct session_info, sessions_ht_node);
409
410 return !strcmp(session_info->name, name);
411 }
412
413 static
414 unsigned long lttng_condition_buffer_usage_hash(
415 const struct lttng_condition *_condition)
416 {
417 unsigned long hash;
418 unsigned long condition_type;
419 struct lttng_condition_buffer_usage *condition;
420
421 condition = container_of(_condition,
422 struct lttng_condition_buffer_usage, parent);
423
424 condition_type = (unsigned long) condition->parent.type;
425 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
426 if (condition->session_name) {
427 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
428 }
429 if (condition->channel_name) {
430 hash ^= hash_key_str(condition->channel_name, lttng_ht_seed);
431 }
432 if (condition->domain.set) {
433 hash ^= hash_key_ulong(
434 (void *) condition->domain.type,
435 lttng_ht_seed);
436 }
437 if (condition->threshold_ratio.set) {
438 uint64_t val;
439
440 val = condition->threshold_ratio.value * (double) UINT32_MAX;
441 hash ^= hash_key_u64(&val, lttng_ht_seed);
442 } else if (condition->threshold_bytes.set) {
443 uint64_t val;
444
445 val = condition->threshold_bytes.value;
446 hash ^= hash_key_u64(&val, lttng_ht_seed);
447 }
448 return hash;
449 }
450
451 static
452 unsigned long lttng_condition_session_consumed_size_hash(
453 const struct lttng_condition *_condition)
454 {
455 unsigned long hash;
456 unsigned long condition_type =
457 (unsigned long) LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE;
458 struct lttng_condition_session_consumed_size *condition;
459 uint64_t val;
460
461 condition = container_of(_condition,
462 struct lttng_condition_session_consumed_size, parent);
463
464 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
465 if (condition->session_name) {
466 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
467 }
468 val = condition->consumed_threshold_bytes.value;
469 hash ^= hash_key_u64(&val, lttng_ht_seed);
470 return hash;
471 }
472
473 static
474 unsigned long lttng_condition_session_rotation_hash(
475 const struct lttng_condition *_condition)
476 {
477 unsigned long hash, condition_type;
478 struct lttng_condition_session_rotation *condition;
479
480 condition = container_of(_condition,
481 struct lttng_condition_session_rotation, parent);
482 condition_type = (unsigned long) condition->parent.type;
483 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
484 assert(condition->session_name);
485 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
486 return hash;
487 }
488
489 /*
490 * The lttng_condition hashing code is kept in this file (rather than
491 * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
492 * don't want to link in liblttng-ctl.
493 */
494 static
495 unsigned long lttng_condition_hash(const struct lttng_condition *condition)
496 {
497 switch (condition->type) {
498 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
499 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
500 return lttng_condition_buffer_usage_hash(condition);
501 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
502 return lttng_condition_session_consumed_size_hash(condition);
503 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
504 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
505 return lttng_condition_session_rotation_hash(condition);
506 default:
507 ERR("[notification-thread] Unexpected condition type caught");
508 abort();
509 }
510 }
511
512 static
513 unsigned long hash_channel_key(struct channel_key *key)
514 {
515 unsigned long key_hash = hash_key_u64(&key->key, lttng_ht_seed);
516 unsigned long domain_hash = hash_key_ulong(
517 (void *) (unsigned long) key->domain, lttng_ht_seed);
518
519 return key_hash ^ domain_hash;
520 }
521
522 static
523 unsigned long hash_client_socket(int socket)
524 {
525 return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed);
526 }
527
528 static
529 unsigned long hash_client_id(notification_client_id id)
530 {
531 return hash_key_u64(&id, lttng_ht_seed);
532 }
533
534 /*
535 * Get the type of object to which a given condition applies. Bindings let
536 * the notification system evaluate a trigger's condition when a given
537 * object's state is updated.
538 *
539 * For instance, a condition bound to a channel will be evaluated everytime
540 * the channel's state is changed by a channel monitoring sample.
541 */
542 static
543 enum lttng_object_type get_condition_binding_object(
544 const struct lttng_condition *condition)
545 {
546 switch (lttng_condition_get_type(condition)) {
547 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
548 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
549 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
550 return LTTNG_OBJECT_TYPE_CHANNEL;
551 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
552 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
553 return LTTNG_OBJECT_TYPE_SESSION;
554 default:
555 return LTTNG_OBJECT_TYPE_UNKNOWN;
556 }
557 }
558
559 static
560 void free_channel_info_rcu(struct rcu_head *node)
561 {
562 free(caa_container_of(node, struct channel_info, rcu_node));
563 }
564
565 static
566 void channel_info_destroy(struct channel_info *channel_info)
567 {
568 if (!channel_info) {
569 return;
570 }
571
572 if (channel_info->session_info) {
573 session_info_remove_channel(channel_info->session_info,
574 channel_info);
575 session_info_put(channel_info->session_info);
576 }
577 if (channel_info->name) {
578 free(channel_info->name);
579 }
580 call_rcu(&channel_info->rcu_node, free_channel_info_rcu);
581 }
582
583 static
584 void free_session_info_rcu(struct rcu_head *node)
585 {
586 free(caa_container_of(node, struct session_info, rcu_node));
587 }
588
589 /* Don't call directly, use the ref-counting mechanism. */
590 static
591 void session_info_destroy(void *_data)
592 {
593 struct session_info *session_info = _data;
594 int ret;
595
596 assert(session_info);
597 if (session_info->channel_infos_ht) {
598 ret = cds_lfht_destroy(session_info->channel_infos_ht, NULL);
599 if (ret) {
600 ERR("[notification-thread] Failed to destroy channel information hash table");
601 }
602 }
603 lttng_session_trigger_list_destroy(session_info->trigger_list);
604
605 rcu_read_lock();
606 cds_lfht_del(session_info->sessions_ht,
607 &session_info->sessions_ht_node);
608 rcu_read_unlock();
609 free(session_info->name);
610 call_rcu(&session_info->rcu_node, free_session_info_rcu);
611 }
612
613 static
614 void session_info_get(struct session_info *session_info)
615 {
616 if (!session_info) {
617 return;
618 }
619 lttng_ref_get(&session_info->ref);
620 }
621
622 static
623 void session_info_put(struct session_info *session_info)
624 {
625 if (!session_info) {
626 return;
627 }
628 lttng_ref_put(&session_info->ref);
629 }
630
631 static
632 struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid,
633 struct lttng_session_trigger_list *trigger_list,
634 struct cds_lfht *sessions_ht)
635 {
636 struct session_info *session_info;
637
638 assert(name);
639
640 session_info = zmalloc(sizeof(*session_info));
641 if (!session_info) {
642 goto end;
643 }
644 lttng_ref_init(&session_info->ref, session_info_destroy);
645
646 session_info->channel_infos_ht = cds_lfht_new(DEFAULT_HT_SIZE,
647 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
648 if (!session_info->channel_infos_ht) {
649 goto error;
650 }
651
652 cds_lfht_node_init(&session_info->sessions_ht_node);
653 session_info->name = strdup(name);
654 if (!session_info->name) {
655 goto error;
656 }
657 session_info->uid = uid;
658 session_info->gid = gid;
659 session_info->trigger_list = trigger_list;
660 session_info->sessions_ht = sessions_ht;
661 end:
662 return session_info;
663 error:
664 session_info_put(session_info);
665 return NULL;
666 }
667
668 static
669 void session_info_add_channel(struct session_info *session_info,
670 struct channel_info *channel_info)
671 {
672 rcu_read_lock();
673 cds_lfht_add(session_info->channel_infos_ht,
674 hash_channel_key(&channel_info->key),
675 &channel_info->session_info_channels_ht_node);
676 rcu_read_unlock();
677 }
678
679 static
680 void session_info_remove_channel(struct session_info *session_info,
681 struct channel_info *channel_info)
682 {
683 rcu_read_lock();
684 cds_lfht_del(session_info->channel_infos_ht,
685 &channel_info->session_info_channels_ht_node);
686 rcu_read_unlock();
687 }
688
689 static
690 struct channel_info *channel_info_create(const char *channel_name,
691 struct channel_key *channel_key, uint64_t channel_capacity,
692 struct session_info *session_info)
693 {
694 struct channel_info *channel_info = zmalloc(sizeof(*channel_info));
695
696 if (!channel_info) {
697 goto end;
698 }
699
700 cds_lfht_node_init(&channel_info->channels_ht_node);
701 cds_lfht_node_init(&channel_info->session_info_channels_ht_node);
702 memcpy(&channel_info->key, channel_key, sizeof(*channel_key));
703 channel_info->capacity = channel_capacity;
704
705 channel_info->name = strdup(channel_name);
706 if (!channel_info->name) {
707 goto error;
708 }
709
710 /*
711 * Set the references between session and channel infos:
712 * - channel_info holds a strong reference to session_info
713 * - session_info holds a weak reference to channel_info
714 */
715 session_info_get(session_info);
716 session_info_add_channel(session_info, channel_info);
717 channel_info->session_info = session_info;
718 end:
719 return channel_info;
720 error:
721 channel_info_destroy(channel_info);
722 return NULL;
723 }
724
725 static
726 bool notification_client_list_get(struct notification_client_list *list)
727 {
728 return urcu_ref_get_unless_zero(&list->ref);
729 }
730
731 static
732 void free_notification_client_list_rcu(struct rcu_head *node)
733 {
734 free(caa_container_of(node, struct notification_client_list,
735 rcu_node));
736 }
737
738 static
739 void notification_client_list_release(struct urcu_ref *list_ref)
740 {
741 struct notification_client_list *list =
742 container_of(list_ref, typeof(*list), ref);
743 struct notification_client_list_element *client_list_element, *tmp;
744
745 if (list->notification_trigger_clients_ht) {
746 rcu_read_lock();
747 cds_lfht_del(list->notification_trigger_clients_ht,
748 &list->notification_trigger_clients_ht_node);
749 rcu_read_unlock();
750 list->notification_trigger_clients_ht = NULL;
751 }
752 cds_list_for_each_entry_safe(client_list_element, tmp,
753 &list->list, node) {
754 free(client_list_element);
755 }
756 pthread_mutex_destroy(&list->lock);
757 call_rcu(&list->rcu_node, free_notification_client_list_rcu);
758 }
759
760 static
761 struct notification_client_list *notification_client_list_create(
762 const struct lttng_trigger *trigger)
763 {
764 struct notification_client_list *client_list =
765 zmalloc(sizeof(*client_list));
766
767 if (!client_list) {
768 goto error;
769 }
770 pthread_mutex_init(&client_list->lock, NULL);
771 urcu_ref_init(&client_list->ref);
772 cds_lfht_node_init(&client_list->notification_trigger_clients_ht_node);
773 CDS_INIT_LIST_HEAD(&client_list->list);
774 client_list->trigger = trigger;
775 error:
776 return client_list;
777 }
778
779 static
780 void publish_notification_client_list(
781 struct notification_thread_state *state,
782 struct notification_client_list *list)
783 {
784 const struct lttng_condition *condition =
785 lttng_trigger_get_const_condition(list->trigger);
786
787 assert(!list->notification_trigger_clients_ht);
788
789 list->notification_trigger_clients_ht =
790 state->notification_trigger_clients_ht;
791
792 rcu_read_lock();
793 cds_lfht_add(state->notification_trigger_clients_ht,
794 lttng_condition_hash(condition),
795 &list->notification_trigger_clients_ht_node);
796 rcu_read_unlock();
797 }
798
799 static
800 void notification_client_list_put(struct notification_client_list *list)
801 {
802 if (!list) {
803 return;
804 }
805 return urcu_ref_put(&list->ref, notification_client_list_release);
806 }
807
808 /* Provides a reference to the returned list. */
809 static
810 struct notification_client_list *get_client_list_from_condition(
811 struct notification_thread_state *state,
812 const struct lttng_condition *condition)
813 {
814 struct cds_lfht_node *node;
815 struct cds_lfht_iter iter;
816 struct notification_client_list *list = NULL;
817
818 rcu_read_lock();
819 cds_lfht_lookup(state->notification_trigger_clients_ht,
820 lttng_condition_hash(condition),
821 match_client_list_condition,
822 condition,
823 &iter);
824 node = cds_lfht_iter_get_node(&iter);
825 if (node) {
826 list = container_of(node, struct notification_client_list,
827 notification_trigger_clients_ht_node);
828 list = notification_client_list_get(list) ? list : NULL;
829 }
830
831 rcu_read_unlock();
832 return list;
833 }
834
835 static
836 int evaluate_channel_condition_for_client(
837 const struct lttng_condition *condition,
838 struct notification_thread_state *state,
839 struct lttng_evaluation **evaluation,
840 uid_t *session_uid, gid_t *session_gid)
841 {
842 int ret;
843 struct cds_lfht_iter iter;
844 struct cds_lfht_node *node;
845 struct channel_info *channel_info = NULL;
846 struct channel_key *channel_key = NULL;
847 struct channel_state_sample *last_sample = NULL;
848 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
849
850 rcu_read_lock();
851
852 /* Find the channel associated with the condition. */
853 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter,
854 channel_trigger_list, channel_triggers_ht_node) {
855 struct lttng_trigger_list_element *element;
856
857 cds_list_for_each_entry(element, &channel_trigger_list->list, node) {
858 const struct lttng_condition *current_condition =
859 lttng_trigger_get_const_condition(
860 element->trigger);
861
862 assert(current_condition);
863 if (!lttng_condition_is_equal(condition,
864 current_condition)) {
865 continue;
866 }
867
868 /* Found the trigger, save the channel key. */
869 channel_key = &channel_trigger_list->channel_key;
870 break;
871 }
872 if (channel_key) {
873 /* The channel key was found stop iteration. */
874 break;
875 }
876 }
877
878 if (!channel_key){
879 /* No channel found; normal exit. */
880 DBG("[notification-thread] No known channel associated with newly subscribed-to condition");
881 ret = 0;
882 goto end;
883 }
884
885 /* Fetch channel info for the matching channel. */
886 cds_lfht_lookup(state->channels_ht,
887 hash_channel_key(channel_key),
888 match_channel_info,
889 channel_key,
890 &iter);
891 node = cds_lfht_iter_get_node(&iter);
892 assert(node);
893 channel_info = caa_container_of(node, struct channel_info,
894 channels_ht_node);
895
896 /* Retrieve the channel's last sample, if it exists. */
897 cds_lfht_lookup(state->channel_state_ht,
898 hash_channel_key(channel_key),
899 match_channel_state_sample,
900 channel_key,
901 &iter);
902 node = cds_lfht_iter_get_node(&iter);
903 if (node) {
904 last_sample = caa_container_of(node,
905 struct channel_state_sample,
906 channel_state_ht_node);
907 } else {
908 /* Nothing to evaluate, no sample was ever taken. Normal exit */
909 DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
910 ret = 0;
911 goto end;
912 }
913
914 ret = evaluate_buffer_condition(condition, evaluation, state,
915 NULL, last_sample,
916 0, channel_info->session_info->consumed_data_size,
917 channel_info);
918 if (ret) {
919 WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
920 goto end;
921 }
922
923 *session_uid = channel_info->session_info->uid;
924 *session_gid = channel_info->session_info->gid;
925 end:
926 rcu_read_unlock();
927 return ret;
928 }
929
930 static
931 const char *get_condition_session_name(const struct lttng_condition *condition)
932 {
933 const char *session_name = NULL;
934 enum lttng_condition_status status;
935
936 switch (lttng_condition_get_type(condition)) {
937 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
938 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
939 status = lttng_condition_buffer_usage_get_session_name(
940 condition, &session_name);
941 break;
942 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
943 status = lttng_condition_session_consumed_size_get_session_name(
944 condition, &session_name);
945 break;
946 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
947 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
948 status = lttng_condition_session_rotation_get_session_name(
949 condition, &session_name);
950 break;
951 default:
952 abort();
953 }
954 if (status != LTTNG_CONDITION_STATUS_OK) {
955 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
956 goto end;
957 }
958 end:
959 return session_name;
960 }
961
962 static
963 int evaluate_session_condition_for_client(
964 const struct lttng_condition *condition,
965 struct notification_thread_state *state,
966 struct lttng_evaluation **evaluation,
967 uid_t *session_uid, gid_t *session_gid)
968 {
969 int ret;
970 struct cds_lfht_iter iter;
971 struct cds_lfht_node *node;
972 const char *session_name;
973 struct session_info *session_info = NULL;
974
975 rcu_read_lock();
976 session_name = get_condition_session_name(condition);
977
978 /* Find the session associated with the trigger. */
979 cds_lfht_lookup(state->sessions_ht,
980 hash_key_str(session_name, lttng_ht_seed),
981 match_session,
982 session_name,
983 &iter);
984 node = cds_lfht_iter_get_node(&iter);
985 if (!node) {
986 DBG("[notification-thread] No known session matching name \"%s\"",
987 session_name);
988 ret = 0;
989 goto end;
990 }
991
992 session_info = caa_container_of(node, struct session_info,
993 sessions_ht_node);
994 session_info_get(session_info);
995
996 /*
997 * Evaluation is performed in-line here since only one type of
998 * session-bound condition is handled for the moment.
999 */
1000 switch (lttng_condition_get_type(condition)) {
1001 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1002 if (!session_info->rotation.ongoing) {
1003 ret = 0;
1004 goto end_session_put;
1005 }
1006
1007 *evaluation = lttng_evaluation_session_rotation_ongoing_create(
1008 session_info->rotation.id);
1009 if (!*evaluation) {
1010 /* Fatal error. */
1011 ERR("[notification-thread] Failed to create session rotation ongoing evaluation for session \"%s\"",
1012 session_info->name);
1013 ret = -1;
1014 goto end_session_put;
1015 }
1016 ret = 0;
1017 break;
1018 default:
1019 ret = 0;
1020 goto end_session_put;
1021 }
1022
1023 *session_uid = session_info->uid;
1024 *session_gid = session_info->gid;
1025
1026 end_session_put:
1027 session_info_put(session_info);
1028 end:
1029 rcu_read_unlock();
1030 return ret;
1031 }
1032
1033 static
1034 int evaluate_condition_for_client(const struct lttng_trigger *trigger,
1035 const struct lttng_condition *condition,
1036 struct notification_client *client,
1037 struct notification_thread_state *state)
1038 {
1039 int ret;
1040 struct lttng_evaluation *evaluation = NULL;
1041 struct notification_client_list client_list = {
1042 .lock = PTHREAD_MUTEX_INITIALIZER,
1043 };
1044 struct notification_client_list_element client_list_element = { 0 };
1045 uid_t object_uid = 0;
1046 gid_t object_gid = 0;
1047
1048 assert(trigger);
1049 assert(condition);
1050 assert(client);
1051 assert(state);
1052
1053 switch (get_condition_binding_object(condition)) {
1054 case LTTNG_OBJECT_TYPE_SESSION:
1055 ret = evaluate_session_condition_for_client(condition, state,
1056 &evaluation, &object_uid, &object_gid);
1057 break;
1058 case LTTNG_OBJECT_TYPE_CHANNEL:
1059 ret = evaluate_channel_condition_for_client(condition, state,
1060 &evaluation, &object_uid, &object_gid);
1061 break;
1062 case LTTNG_OBJECT_TYPE_NONE:
1063 ret = 0;
1064 goto end;
1065 case LTTNG_OBJECT_TYPE_UNKNOWN:
1066 default:
1067 ret = -1;
1068 goto end;
1069 }
1070 if (ret) {
1071 /* Fatal error. */
1072 goto end;
1073 }
1074 if (!evaluation) {
1075 /* Evaluation yielded nothing. Normal exit. */
1076 DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
1077 ret = 0;
1078 goto end;
1079 }
1080
1081 /*
1082 * Create a temporary client list with the client currently
1083 * subscribing.
1084 */
1085 cds_lfht_node_init(&client_list.notification_trigger_clients_ht_node);
1086 CDS_INIT_LIST_HEAD(&client_list.list);
1087 client_list.trigger = trigger;
1088
1089 CDS_INIT_LIST_HEAD(&client_list_element.node);
1090 client_list_element.client = client;
1091 cds_list_add(&client_list_element.node, &client_list.list);
1092
1093 /* Send evaluation result to the newly-subscribed client. */
1094 DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
1095 ret = send_evaluation_to_clients(trigger, evaluation, &client_list,
1096 state, object_uid, object_gid);
1097
1098 end:
1099 return ret;
1100 }
1101
1102 static
1103 int notification_thread_client_subscribe(struct notification_client *client,
1104 struct lttng_condition *condition,
1105 struct notification_thread_state *state,
1106 enum lttng_notification_channel_status *_status)
1107 {
1108 int ret = 0;
1109 struct notification_client_list *client_list = NULL;
1110 struct lttng_condition_list_element *condition_list_element = NULL;
1111 struct notification_client_list_element *client_list_element = NULL;
1112 enum lttng_notification_channel_status status =
1113 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1114
1115 /*
1116 * Ensure that the client has not already subscribed to this condition
1117 * before.
1118 */
1119 cds_list_for_each_entry(condition_list_element, &client->condition_list, node) {
1120 if (lttng_condition_is_equal(condition_list_element->condition,
1121 condition)) {
1122 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED;
1123 goto end;
1124 }
1125 }
1126
1127 condition_list_element = zmalloc(sizeof(*condition_list_element));
1128 if (!condition_list_element) {
1129 ret = -1;
1130 goto error;
1131 }
1132 client_list_element = zmalloc(sizeof(*client_list_element));
1133 if (!client_list_element) {
1134 ret = -1;
1135 goto error;
1136 }
1137
1138 /*
1139 * Add the newly-subscribed condition to the client's subscription list.
1140 */
1141 CDS_INIT_LIST_HEAD(&condition_list_element->node);
1142 condition_list_element->condition = condition;
1143 cds_list_add(&condition_list_element->node, &client->condition_list);
1144
1145 client_list = get_client_list_from_condition(state, condition);
1146 if (!client_list) {
1147 /*
1148 * No notification-emiting trigger registered with this
1149 * condition. We don't evaluate the condition right away
1150 * since this trigger is not registered yet.
1151 */
1152 free(client_list_element);
1153 goto end;
1154 }
1155
1156 /*
1157 * The condition to which the client just subscribed is evaluated
1158 * at this point so that conditions that are already TRUE result
1159 * in a notification being sent out.
1160 *
1161 * The client_list's trigger is used without locking the list itself.
1162 * This is correct since the list doesn't own the trigger and the
1163 * object is immutable.
1164 */
1165 if (evaluate_condition_for_client(client_list->trigger, condition,
1166 client, state)) {
1167 WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
1168 ret = -1;
1169 free(client_list_element);
1170 goto end;
1171 }
1172
1173 /*
1174 * Add the client to the list of clients interested in a given trigger
1175 * if a "notification" trigger with a corresponding condition was
1176 * added prior.
1177 */
1178 client_list_element->client = client;
1179 CDS_INIT_LIST_HEAD(&client_list_element->node);
1180
1181 pthread_mutex_lock(&client_list->lock);
1182 cds_list_add(&client_list_element->node, &client_list->list);
1183 pthread_mutex_unlock(&client_list->lock);
1184 end:
1185 if (_status) {
1186 *_status = status;
1187 }
1188 if (client_list) {
1189 notification_client_list_put(client_list);
1190 }
1191 return ret;
1192 error:
1193 free(condition_list_element);
1194 free(client_list_element);
1195 return ret;
1196 }
1197
1198 static
1199 int notification_thread_client_unsubscribe(
1200 struct notification_client *client,
1201 struct lttng_condition *condition,
1202 struct notification_thread_state *state,
1203 enum lttng_notification_channel_status *_status)
1204 {
1205 struct notification_client_list *client_list;
1206 struct lttng_condition_list_element *condition_list_element,
1207 *condition_tmp;
1208 struct notification_client_list_element *client_list_element,
1209 *client_tmp;
1210 bool condition_found = false;
1211 enum lttng_notification_channel_status status =
1212 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1213
1214 /* Remove the condition from the client's condition list. */
1215 cds_list_for_each_entry_safe(condition_list_element, condition_tmp,
1216 &client->condition_list, node) {
1217 if (!lttng_condition_is_equal(condition_list_element->condition,
1218 condition)) {
1219 continue;
1220 }
1221
1222 cds_list_del(&condition_list_element->node);
1223 /*
1224 * The caller may be iterating on the client's conditions to
1225 * tear down a client's connection. In this case, the condition
1226 * will be destroyed at the end.
1227 */
1228 if (condition != condition_list_element->condition) {
1229 lttng_condition_destroy(
1230 condition_list_element->condition);
1231 }
1232 free(condition_list_element);
1233 condition_found = true;
1234 break;
1235 }
1236
1237 if (!condition_found) {
1238 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION;
1239 goto end;
1240 }
1241
1242 /*
1243 * Remove the client from the list of clients interested the trigger
1244 * matching the condition.
1245 */
1246 client_list = get_client_list_from_condition(state, condition);
1247 if (!client_list) {
1248 goto end;
1249 }
1250
1251 pthread_mutex_lock(&client_list->lock);
1252 cds_list_for_each_entry_safe(client_list_element, client_tmp,
1253 &client_list->list, node) {
1254 if (client_list_element->client->id != client->id) {
1255 continue;
1256 }
1257 cds_list_del(&client_list_element->node);
1258 free(client_list_element);
1259 break;
1260 }
1261 pthread_mutex_unlock(&client_list->lock);
1262 notification_client_list_put(client_list);
1263 client_list = NULL;
1264 end:
1265 lttng_condition_destroy(condition);
1266 if (_status) {
1267 *_status = status;
1268 }
1269 return 0;
1270 }
1271
1272 static
1273 void free_notification_client_rcu(struct rcu_head *node)
1274 {
1275 free(caa_container_of(node, struct notification_client, rcu_node));
1276 }
1277
1278 static
1279 void notification_client_destroy(struct notification_client *client,
1280 struct notification_thread_state *state)
1281 {
1282 if (!client) {
1283 return;
1284 }
1285
1286 /*
1287 * The client object is not reachable by other threads, no need to lock
1288 * the client here.
1289 */
1290 if (client->socket >= 0) {
1291 (void) lttcomm_close_unix_sock(client->socket);
1292 client->socket = -1;
1293 }
1294 client->communication.active = false;
1295 lttng_dynamic_buffer_reset(&client->communication.inbound.buffer);
1296 lttng_dynamic_buffer_reset(&client->communication.outbound.buffer);
1297 pthread_mutex_destroy(&client->lock);
1298 call_rcu(&client->rcu_node, free_notification_client_rcu);
1299 }
1300
1301 /*
1302 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1303 * client pointer).
1304 */
1305 static
1306 struct notification_client *get_client_from_socket(int socket,
1307 struct notification_thread_state *state)
1308 {
1309 struct cds_lfht_iter iter;
1310 struct cds_lfht_node *node;
1311 struct notification_client *client = NULL;
1312
1313 cds_lfht_lookup(state->client_socket_ht,
1314 hash_client_socket(socket),
1315 match_client_socket,
1316 (void *) (unsigned long) socket,
1317 &iter);
1318 node = cds_lfht_iter_get_node(&iter);
1319 if (!node) {
1320 goto end;
1321 }
1322
1323 client = caa_container_of(node, struct notification_client,
1324 client_socket_ht_node);
1325 end:
1326 return client;
1327 }
1328
1329 static
1330 bool buffer_usage_condition_applies_to_channel(
1331 const struct lttng_condition *condition,
1332 const struct channel_info *channel_info)
1333 {
1334 enum lttng_condition_status status;
1335 enum lttng_domain_type condition_domain;
1336 const char *condition_session_name = NULL;
1337 const char *condition_channel_name = NULL;
1338
1339 status = lttng_condition_buffer_usage_get_domain_type(condition,
1340 &condition_domain);
1341 assert(status == LTTNG_CONDITION_STATUS_OK);
1342 if (channel_info->key.domain != condition_domain) {
1343 goto fail;
1344 }
1345
1346 status = lttng_condition_buffer_usage_get_session_name(
1347 condition, &condition_session_name);
1348 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1349
1350 status = lttng_condition_buffer_usage_get_channel_name(
1351 condition, &condition_channel_name);
1352 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_channel_name);
1353
1354 if (strcmp(channel_info->session_info->name, condition_session_name)) {
1355 goto fail;
1356 }
1357 if (strcmp(channel_info->name, condition_channel_name)) {
1358 goto fail;
1359 }
1360
1361 return true;
1362 fail:
1363 return false;
1364 }
1365
1366 static
1367 bool session_consumed_size_condition_applies_to_channel(
1368 const struct lttng_condition *condition,
1369 const struct channel_info *channel_info)
1370 {
1371 enum lttng_condition_status status;
1372 const char *condition_session_name = NULL;
1373
1374 status = lttng_condition_session_consumed_size_get_session_name(
1375 condition, &condition_session_name);
1376 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1377
1378 if (strcmp(channel_info->session_info->name, condition_session_name)) {
1379 goto fail;
1380 }
1381
1382 return true;
1383 fail:
1384 return false;
1385 }
1386
1387 static
1388 bool trigger_applies_to_channel(const struct lttng_trigger *trigger,
1389 const struct channel_info *channel_info)
1390 {
1391 const struct lttng_condition *condition;
1392 bool trigger_applies;
1393
1394 condition = lttng_trigger_get_const_condition(trigger);
1395 if (!condition) {
1396 goto fail;
1397 }
1398
1399 switch (lttng_condition_get_type(condition)) {
1400 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1401 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1402 trigger_applies = buffer_usage_condition_applies_to_channel(
1403 condition, channel_info);
1404 break;
1405 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
1406 trigger_applies = session_consumed_size_condition_applies_to_channel(
1407 condition, channel_info);
1408 break;
1409 default:
1410 goto fail;
1411 }
1412
1413 return trigger_applies;
1414 fail:
1415 return false;
1416 }
1417
1418 static
1419 bool trigger_applies_to_client(struct lttng_trigger *trigger,
1420 struct notification_client *client)
1421 {
1422 bool applies = false;
1423 struct lttng_condition_list_element *condition_list_element;
1424
1425 cds_list_for_each_entry(condition_list_element, &client->condition_list,
1426 node) {
1427 applies = lttng_condition_is_equal(
1428 condition_list_element->condition,
1429 lttng_trigger_get_condition(trigger));
1430 if (applies) {
1431 break;
1432 }
1433 }
1434 return applies;
1435 }
1436
1437 /* Must be called with RCU read lock held. */
1438 static
1439 struct lttng_session_trigger_list *get_session_trigger_list(
1440 struct notification_thread_state *state,
1441 const char *session_name)
1442 {
1443 struct lttng_session_trigger_list *list = NULL;
1444 struct cds_lfht_node *node;
1445 struct cds_lfht_iter iter;
1446
1447 cds_lfht_lookup(state->session_triggers_ht,
1448 hash_key_str(session_name, lttng_ht_seed),
1449 match_session_trigger_list,
1450 session_name,
1451 &iter);
1452 node = cds_lfht_iter_get_node(&iter);
1453 if (!node) {
1454 /*
1455 * Not an error, the list of triggers applying to that session
1456 * will be initialized when the session is created.
1457 */
1458 DBG("[notification-thread] No trigger list found for session \"%s\" as it is not yet known to the notification system",
1459 session_name);
1460 goto end;
1461 }
1462
1463 list = caa_container_of(node,
1464 struct lttng_session_trigger_list,
1465 session_triggers_ht_node);
1466 end:
1467 return list;
1468 }
1469
1470 /*
1471 * Allocate an empty lttng_session_trigger_list for the session named
1472 * 'session_name'.
1473 *
1474 * No ownership of 'session_name' is assumed by the session trigger list.
1475 * It is the caller's responsability to ensure the session name is alive
1476 * for as long as this list is.
1477 */
1478 static
1479 struct lttng_session_trigger_list *lttng_session_trigger_list_create(
1480 const char *session_name,
1481 struct cds_lfht *session_triggers_ht)
1482 {
1483 struct lttng_session_trigger_list *list;
1484
1485 list = zmalloc(sizeof(*list));
1486 if (!list) {
1487 goto end;
1488 }
1489 list->session_name = session_name;
1490 CDS_INIT_LIST_HEAD(&list->list);
1491 cds_lfht_node_init(&list->session_triggers_ht_node);
1492 list->session_triggers_ht = session_triggers_ht;
1493
1494 rcu_read_lock();
1495 /* Publish the list through the session_triggers_ht. */
1496 cds_lfht_add(session_triggers_ht,
1497 hash_key_str(session_name, lttng_ht_seed),
1498 &list->session_triggers_ht_node);
1499 rcu_read_unlock();
1500 end:
1501 return list;
1502 }
1503
1504 static
1505 void free_session_trigger_list_rcu(struct rcu_head *node)
1506 {
1507 free(caa_container_of(node, struct lttng_session_trigger_list,
1508 rcu_node));
1509 }
1510
1511 static
1512 void lttng_session_trigger_list_destroy(struct lttng_session_trigger_list *list)
1513 {
1514 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1515
1516 /* Empty the list element by element, and then free the list itself. */
1517 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1518 &list->list, node) {
1519 cds_list_del(&trigger_list_element->node);
1520 free(trigger_list_element);
1521 }
1522 rcu_read_lock();
1523 /* Unpublish the list from the session_triggers_ht. */
1524 cds_lfht_del(list->session_triggers_ht,
1525 &list->session_triggers_ht_node);
1526 rcu_read_unlock();
1527 call_rcu(&list->rcu_node, free_session_trigger_list_rcu);
1528 }
1529
1530 static
1531 int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
1532 const struct lttng_trigger *trigger)
1533 {
1534 int ret = 0;
1535 struct lttng_trigger_list_element *new_element =
1536 zmalloc(sizeof(*new_element));
1537
1538 if (!new_element) {
1539 ret = -1;
1540 goto end;
1541 }
1542 CDS_INIT_LIST_HEAD(&new_element->node);
1543 new_element->trigger = trigger;
1544 cds_list_add(&new_element->node, &list->list);
1545 end:
1546 return ret;
1547 }
1548
1549 static
1550 bool trigger_applies_to_session(const struct lttng_trigger *trigger,
1551 const char *session_name)
1552 {
1553 bool applies = false;
1554 const struct lttng_condition *condition;
1555
1556 condition = lttng_trigger_get_const_condition(trigger);
1557 switch (lttng_condition_get_type(condition)) {
1558 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1559 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
1560 {
1561 enum lttng_condition_status condition_status;
1562 const char *condition_session_name;
1563
1564 condition_status = lttng_condition_session_rotation_get_session_name(
1565 condition, &condition_session_name);
1566 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
1567 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
1568 goto end;
1569 }
1570
1571 assert(condition_session_name);
1572 applies = !strcmp(condition_session_name, session_name);
1573 break;
1574 }
1575 default:
1576 goto end;
1577 }
1578 end:
1579 return applies;
1580 }
1581
1582 /*
1583 * Allocate and initialize an lttng_session_trigger_list which contains
1584 * all triggers that apply to the session named 'session_name'.
1585 *
1586 * No ownership of 'session_name' is assumed by the session trigger list.
1587 * It is the caller's responsability to ensure the session name is alive
1588 * for as long as this list is.
1589 */
1590 static
1591 struct lttng_session_trigger_list *lttng_session_trigger_list_build(
1592 const struct notification_thread_state *state,
1593 const char *session_name)
1594 {
1595 int trigger_count = 0;
1596 struct lttng_session_trigger_list *session_trigger_list = NULL;
1597 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1598 struct cds_lfht_iter iter;
1599
1600 session_trigger_list = lttng_session_trigger_list_create(session_name,
1601 state->session_triggers_ht);
1602
1603 /* Add all triggers applying to the session named 'session_name'. */
1604 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1605 node) {
1606 int ret;
1607
1608 if (!trigger_applies_to_session(trigger_ht_element->trigger,
1609 session_name)) {
1610 continue;
1611 }
1612
1613 ret = lttng_session_trigger_list_add(session_trigger_list,
1614 trigger_ht_element->trigger);
1615 if (ret) {
1616 goto error;
1617 }
1618
1619 trigger_count++;
1620 }
1621
1622 DBG("[notification-thread] Found %i triggers that apply to newly created session",
1623 trigger_count);
1624 return session_trigger_list;
1625 error:
1626 lttng_session_trigger_list_destroy(session_trigger_list);
1627 return NULL;
1628 }
1629
1630 static
1631 struct session_info *find_or_create_session_info(
1632 struct notification_thread_state *state,
1633 const char *name, uid_t uid, gid_t gid)
1634 {
1635 struct session_info *session = NULL;
1636 struct cds_lfht_node *node;
1637 struct cds_lfht_iter iter;
1638 struct lttng_session_trigger_list *trigger_list;
1639
1640 rcu_read_lock();
1641 cds_lfht_lookup(state->sessions_ht,
1642 hash_key_str(name, lttng_ht_seed),
1643 match_session,
1644 name,
1645 &iter);
1646 node = cds_lfht_iter_get_node(&iter);
1647 if (node) {
1648 DBG("[notification-thread] Found session info of session \"%s\" (uid = %i, gid = %i)",
1649 name, uid, gid);
1650 session = caa_container_of(node, struct session_info,
1651 sessions_ht_node);
1652 assert(session->uid == uid);
1653 assert(session->gid == gid);
1654 session_info_get(session);
1655 goto end;
1656 }
1657
1658 trigger_list = lttng_session_trigger_list_build(state, name);
1659 if (!trigger_list) {
1660 goto error;
1661 }
1662
1663 session = session_info_create(name, uid, gid, trigger_list,
1664 state->sessions_ht);
1665 if (!session) {
1666 ERR("[notification-thread] Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)",
1667 name, uid, gid);
1668 lttng_session_trigger_list_destroy(trigger_list);
1669 goto error;
1670 }
1671 trigger_list = NULL;
1672
1673 cds_lfht_add(state->sessions_ht, hash_key_str(name, lttng_ht_seed),
1674 &session->sessions_ht_node);
1675 end:
1676 rcu_read_unlock();
1677 return session;
1678 error:
1679 rcu_read_unlock();
1680 session_info_put(session);
1681 return NULL;
1682 }
1683
1684 static
1685 int handle_notification_thread_command_add_channel(
1686 struct notification_thread_state *state,
1687 const char *session_name, uid_t session_uid, gid_t session_gid,
1688 const char *channel_name, enum lttng_domain_type channel_domain,
1689 uint64_t channel_key_int, uint64_t channel_capacity,
1690 enum lttng_error_code *cmd_result)
1691 {
1692 struct cds_list_head trigger_list;
1693 struct channel_info *new_channel_info = NULL;
1694 struct channel_key channel_key = {
1695 .key = channel_key_int,
1696 .domain = channel_domain,
1697 };
1698 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
1699 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1700 int trigger_count = 0;
1701 struct cds_lfht_iter iter;
1702 struct session_info *session_info = NULL;
1703
1704 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
1705 channel_name, session_name, channel_key_int,
1706 channel_domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
1707
1708 CDS_INIT_LIST_HEAD(&trigger_list);
1709
1710 session_info = find_or_create_session_info(state, session_name,
1711 session_uid, session_gid);
1712 if (!session_info) {
1713 /* Allocation error or an internal error occurred. */
1714 goto error;
1715 }
1716
1717 new_channel_info = channel_info_create(channel_name, &channel_key,
1718 channel_capacity, session_info);
1719 if (!new_channel_info) {
1720 goto error;
1721 }
1722
1723 rcu_read_lock();
1724 /* Build a list of all triggers applying to the new channel. */
1725 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1726 node) {
1727 struct lttng_trigger_list_element *new_element;
1728
1729 if (!trigger_applies_to_channel(trigger_ht_element->trigger,
1730 new_channel_info)) {
1731 continue;
1732 }
1733
1734 new_element = zmalloc(sizeof(*new_element));
1735 if (!new_element) {
1736 rcu_read_unlock();
1737 goto error;
1738 }
1739 CDS_INIT_LIST_HEAD(&new_element->node);
1740 new_element->trigger = trigger_ht_element->trigger;
1741 cds_list_add(&new_element->node, &trigger_list);
1742 trigger_count++;
1743 }
1744 rcu_read_unlock();
1745
1746 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
1747 trigger_count);
1748 channel_trigger_list = zmalloc(sizeof(*channel_trigger_list));
1749 if (!channel_trigger_list) {
1750 goto error;
1751 }
1752 channel_trigger_list->channel_key = new_channel_info->key;
1753 CDS_INIT_LIST_HEAD(&channel_trigger_list->list);
1754 cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
1755 cds_list_splice(&trigger_list, &channel_trigger_list->list);
1756
1757 rcu_read_lock();
1758 /* Add channel to the channel_ht which owns the channel_infos. */
1759 cds_lfht_add(state->channels_ht,
1760 hash_channel_key(&new_channel_info->key),
1761 &new_channel_info->channels_ht_node);
1762 /*
1763 * Add the list of triggers associated with this channel to the
1764 * channel_triggers_ht.
1765 */
1766 cds_lfht_add(state->channel_triggers_ht,
1767 hash_channel_key(&new_channel_info->key),
1768 &channel_trigger_list->channel_triggers_ht_node);
1769 rcu_read_unlock();
1770 session_info_put(session_info);
1771 *cmd_result = LTTNG_OK;
1772 return 0;
1773 error:
1774 channel_info_destroy(new_channel_info);
1775 session_info_put(session_info);
1776 return 1;
1777 }
1778
1779 static
1780 void free_channel_trigger_list_rcu(struct rcu_head *node)
1781 {
1782 free(caa_container_of(node, struct lttng_channel_trigger_list,
1783 rcu_node));
1784 }
1785
1786 static
1787 void free_channel_state_sample_rcu(struct rcu_head *node)
1788 {
1789 free(caa_container_of(node, struct channel_state_sample,
1790 rcu_node));
1791 }
1792
1793 static
1794 int handle_notification_thread_command_remove_channel(
1795 struct notification_thread_state *state,
1796 uint64_t channel_key, enum lttng_domain_type domain,
1797 enum lttng_error_code *cmd_result)
1798 {
1799 struct cds_lfht_node *node;
1800 struct cds_lfht_iter iter;
1801 struct lttng_channel_trigger_list *trigger_list;
1802 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1803 struct channel_key key = { .key = channel_key, .domain = domain };
1804 struct channel_info *channel_info;
1805
1806 DBG("[notification-thread] Removing channel key = %" PRIu64 " in %s domain",
1807 channel_key, domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
1808
1809 rcu_read_lock();
1810
1811 cds_lfht_lookup(state->channel_triggers_ht,
1812 hash_channel_key(&key),
1813 match_channel_trigger_list,
1814 &key,
1815 &iter);
1816 node = cds_lfht_iter_get_node(&iter);
1817 /*
1818 * There is a severe internal error if we are being asked to remove a
1819 * channel that doesn't exist.
1820 */
1821 if (!node) {
1822 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
1823 goto end;
1824 }
1825
1826 /* Free the list of triggers associated with this channel. */
1827 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
1828 channel_triggers_ht_node);
1829 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1830 &trigger_list->list, node) {
1831 cds_list_del(&trigger_list_element->node);
1832 free(trigger_list_element);
1833 }
1834 cds_lfht_del(state->channel_triggers_ht, node);
1835 call_rcu(&trigger_list->rcu_node, free_channel_trigger_list_rcu);
1836
1837 /* Free sampled channel state. */
1838 cds_lfht_lookup(state->channel_state_ht,
1839 hash_channel_key(&key),
1840 match_channel_state_sample,
1841 &key,
1842 &iter);
1843 node = cds_lfht_iter_get_node(&iter);
1844 /*
1845 * This is expected to be NULL if the channel is destroyed before we
1846 * received a sample.
1847 */
1848 if (node) {
1849 struct channel_state_sample *sample = caa_container_of(node,
1850 struct channel_state_sample,
1851 channel_state_ht_node);
1852
1853 cds_lfht_del(state->channel_state_ht, node);
1854 call_rcu(&sample->rcu_node, free_channel_state_sample_rcu);
1855 }
1856
1857 /* Remove the channel from the channels_ht and free it. */
1858 cds_lfht_lookup(state->channels_ht,
1859 hash_channel_key(&key),
1860 match_channel_info,
1861 &key,
1862 &iter);
1863 node = cds_lfht_iter_get_node(&iter);
1864 assert(node);
1865 channel_info = caa_container_of(node, struct channel_info,
1866 channels_ht_node);
1867 cds_lfht_del(state->channels_ht, node);
1868 channel_info_destroy(channel_info);
1869 end:
1870 rcu_read_unlock();
1871 *cmd_result = LTTNG_OK;
1872 return 0;
1873 }
1874
1875 static
1876 int handle_notification_thread_command_session_rotation(
1877 struct notification_thread_state *state,
1878 enum notification_thread_command_type cmd_type,
1879 const char *session_name, uid_t session_uid, gid_t session_gid,
1880 uint64_t trace_archive_chunk_id,
1881 struct lttng_trace_archive_location *location,
1882 enum lttng_error_code *_cmd_result)
1883 {
1884 int ret = 0;
1885 enum lttng_error_code cmd_result = LTTNG_OK;
1886 struct lttng_session_trigger_list *trigger_list;
1887 struct lttng_trigger_list_element *trigger_list_element;
1888 struct session_info *session_info;
1889
1890 rcu_read_lock();
1891
1892 session_info = find_or_create_session_info(state, session_name,
1893 session_uid, session_gid);
1894 if (!session_info) {
1895 /* Allocation error or an internal error occurred. */
1896 ret = -1;
1897 cmd_result = LTTNG_ERR_NOMEM;
1898 goto end;
1899 }
1900
1901 session_info->rotation.ongoing =
1902 cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING;
1903 session_info->rotation.id = trace_archive_chunk_id;
1904 trigger_list = get_session_trigger_list(state, session_name);
1905 if (!trigger_list) {
1906 DBG("[notification-thread] No triggers applying to session \"%s\" found",
1907 session_name);
1908 goto end;
1909 }
1910
1911 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
1912 node) {
1913 const struct lttng_condition *condition;
1914 const struct lttng_action *action;
1915 const struct lttng_trigger *trigger;
1916 struct notification_client_list *client_list;
1917 struct lttng_evaluation *evaluation = NULL;
1918 enum lttng_condition_type condition_type;
1919 bool client_list_is_empty;
1920
1921 trigger = trigger_list_element->trigger;
1922 condition = lttng_trigger_get_const_condition(trigger);
1923 assert(condition);
1924 condition_type = lttng_condition_get_type(condition);
1925
1926 if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING &&
1927 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1928 continue;
1929 } else if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED &&
1930 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED) {
1931 continue;
1932 }
1933
1934 action = lttng_trigger_get_const_action(trigger);
1935
1936 /* Notify actions are the only type currently supported. */
1937 assert(lttng_action_get_type_const(action) ==
1938 LTTNG_ACTION_TYPE_NOTIFY);
1939
1940 client_list = get_client_list_from_condition(state, condition);
1941 assert(client_list);
1942
1943 pthread_mutex_lock(&client_list->lock);
1944 client_list_is_empty = cds_list_empty(&client_list->list);
1945 pthread_mutex_unlock(&client_list->lock);
1946 if (client_list_is_empty) {
1947 /*
1948 * No clients interested in the evaluation's result,
1949 * skip it.
1950 */
1951 continue;
1952 }
1953
1954 if (cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1955 evaluation = lttng_evaluation_session_rotation_ongoing_create(
1956 trace_archive_chunk_id);
1957 } else {
1958 evaluation = lttng_evaluation_session_rotation_completed_create(
1959 trace_archive_chunk_id, location);
1960 }
1961
1962 if (!evaluation) {
1963 /* Internal error */
1964 ret = -1;
1965 cmd_result = LTTNG_ERR_UNK;
1966 goto put_list;
1967 }
1968
1969 /* Dispatch evaluation result to all clients. */
1970 ret = send_evaluation_to_clients(trigger_list_element->trigger,
1971 evaluation, client_list, state,
1972 session_info->uid,
1973 session_info->gid);
1974 lttng_evaluation_destroy(evaluation);
1975 put_list:
1976 notification_client_list_put(client_list);
1977 if (caa_unlikely(ret)) {
1978 break;
1979 }
1980 }
1981 end:
1982 session_info_put(session_info);
1983 *_cmd_result = cmd_result;
1984 rcu_read_unlock();
1985 return ret;
1986 }
1987
1988 static
1989 int condition_is_supported(struct lttng_condition *condition)
1990 {
1991 int ret;
1992
1993 switch (lttng_condition_get_type(condition)) {
1994 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1995 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1996 {
1997 enum lttng_domain_type domain;
1998
1999 ret = lttng_condition_buffer_usage_get_domain_type(condition,
2000 &domain);
2001 if (ret) {
2002 ret = -1;
2003 goto end;
2004 }
2005
2006 if (domain != LTTNG_DOMAIN_KERNEL) {
2007 ret = 1;
2008 goto end;
2009 }
2010
2011 /*
2012 * Older kernel tracers don't expose the API to monitor their
2013 * buffers. Therefore, we reject triggers that require that
2014 * mechanism to be available to be evaluated.
2015 */
2016 ret = kernel_supports_ring_buffer_snapshot_sample_positions();
2017 break;
2018 }
2019 default:
2020 ret = 1;
2021 }
2022 end:
2023 return ret;
2024 }
2025
2026 /* Must be called with RCU read lock held. */
2027 static
2028 int bind_trigger_to_matching_session(const struct lttng_trigger *trigger,
2029 struct notification_thread_state *state)
2030 {
2031 int ret = 0;
2032 const struct lttng_condition *condition;
2033 const char *session_name;
2034 struct lttng_session_trigger_list *trigger_list;
2035
2036 condition = lttng_trigger_get_const_condition(trigger);
2037 switch (lttng_condition_get_type(condition)) {
2038 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
2039 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
2040 {
2041 enum lttng_condition_status status;
2042
2043 status = lttng_condition_session_rotation_get_session_name(
2044 condition, &session_name);
2045 if (status != LTTNG_CONDITION_STATUS_OK) {
2046 ERR("[notification-thread] Failed to bind trigger to session: unable to get 'session_rotation' condition's session name");
2047 ret = -1;
2048 goto end;
2049 }
2050 break;
2051 }
2052 default:
2053 ret = -1;
2054 goto end;
2055 }
2056
2057 trigger_list = get_session_trigger_list(state, session_name);
2058 if (!trigger_list) {
2059 DBG("[notification-thread] Unable to bind trigger applying to session \"%s\" as it is not yet known to the notification system",
2060 session_name);
2061 goto end;
2062
2063 }
2064
2065 DBG("[notification-thread] Newly registered trigger bound to session \"%s\"",
2066 session_name);
2067 ret = lttng_session_trigger_list_add(trigger_list, trigger);
2068 end:
2069 return ret;
2070 }
2071
2072 /* Must be called with RCU read lock held. */
2073 static
2074 int bind_trigger_to_matching_channels(const struct lttng_trigger *trigger,
2075 struct notification_thread_state *state)
2076 {
2077 int ret = 0;
2078 struct cds_lfht_node *node;
2079 struct cds_lfht_iter iter;
2080 struct channel_info *channel;
2081
2082 cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
2083 channels_ht_node) {
2084 struct lttng_trigger_list_element *trigger_list_element;
2085 struct lttng_channel_trigger_list *trigger_list;
2086 struct cds_lfht_iter lookup_iter;
2087
2088 if (!trigger_applies_to_channel(trigger, channel)) {
2089 continue;
2090 }
2091
2092 cds_lfht_lookup(state->channel_triggers_ht,
2093 hash_channel_key(&channel->key),
2094 match_channel_trigger_list,
2095 &channel->key,
2096 &lookup_iter);
2097 node = cds_lfht_iter_get_node(&lookup_iter);
2098 assert(node);
2099 trigger_list = caa_container_of(node,
2100 struct lttng_channel_trigger_list,
2101 channel_triggers_ht_node);
2102
2103 trigger_list_element = zmalloc(sizeof(*trigger_list_element));
2104 if (!trigger_list_element) {
2105 ret = -1;
2106 goto end;
2107 }
2108 CDS_INIT_LIST_HEAD(&trigger_list_element->node);
2109 trigger_list_element->trigger = trigger;
2110 cds_list_add(&trigger_list_element->node, &trigger_list->list);
2111 DBG("[notification-thread] Newly registered trigger bound to channel \"%s\"",
2112 channel->name);
2113 }
2114 end:
2115 return ret;
2116 }
2117
2118 /*
2119 * FIXME A client's credentials are not checked when registering a trigger, nor
2120 * are they stored alongside with the trigger.
2121 *
2122 * The effects of this are benign since:
2123 * - The client will succeed in registering the trigger, as it is valid,
2124 * - The trigger will, internally, be bound to the channel/session,
2125 * - The notifications will not be sent since the client's credentials
2126 * are checked against the channel at that moment.
2127 *
2128 * If this function returns a non-zero value, it means something is
2129 * fundamentally broken and the whole subsystem/thread will be torn down.
2130 *
2131 * If a non-fatal error occurs, just set the cmd_result to the appropriate
2132 * error code.
2133 */
2134 static
2135 int handle_notification_thread_command_register_trigger(
2136 struct notification_thread_state *state,
2137 struct lttng_trigger *trigger,
2138 enum lttng_error_code *cmd_result)
2139 {
2140 int ret = 0;
2141 struct lttng_condition *condition;
2142 struct notification_client *client;
2143 struct notification_client_list *client_list = NULL;
2144 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
2145 struct notification_client_list_element *client_list_element, *tmp;
2146 struct cds_lfht_node *node;
2147 struct cds_lfht_iter iter;
2148 bool free_trigger = true;
2149
2150 rcu_read_lock();
2151
2152 condition = lttng_trigger_get_condition(trigger);
2153 assert(condition);
2154
2155 ret = condition_is_supported(condition);
2156 if (ret < 0) {
2157 goto error;
2158 } else if (ret == 0) {
2159 *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
2160 goto error;
2161 } else {
2162 /* Feature is supported, continue. */
2163 ret = 0;
2164 }
2165
2166 trigger_ht_element = zmalloc(sizeof(*trigger_ht_element));
2167 if (!trigger_ht_element) {
2168 ret = -1;
2169 goto error;
2170 }
2171
2172 /* Add trigger to the trigger_ht. */
2173 cds_lfht_node_init(&trigger_ht_element->node);
2174 trigger_ht_element->trigger = trigger;
2175
2176 node = cds_lfht_add_unique(state->triggers_ht,
2177 lttng_condition_hash(condition),
2178 match_condition,
2179 condition,
2180 &trigger_ht_element->node);
2181 if (node != &trigger_ht_element->node) {
2182 /* Not a fatal error, simply report it to the client. */
2183 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2184 goto error_free_ht_element;
2185 }
2186
2187 /*
2188 * Ownership of the trigger and of its wrapper was transfered to
2189 * the triggers_ht.
2190 */
2191 trigger_ht_element = NULL;
2192 free_trigger = false;
2193
2194 /*
2195 * The rest only applies to triggers that have a "notify" action.
2196 * It is not skipped as this is the only action type currently
2197 * supported.
2198 */
2199 client_list = notification_client_list_create(trigger);
2200 if (!client_list) {
2201 ret = -1;
2202 goto error_free_ht_element;
2203 }
2204
2205 /* Build a list of clients to which this new trigger applies. */
2206 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
2207 client_socket_ht_node) {
2208 if (!trigger_applies_to_client(trigger, client)) {
2209 continue;
2210 }
2211
2212 client_list_element = zmalloc(sizeof(*client_list_element));
2213 if (!client_list_element) {
2214 ret = -1;
2215 goto error_put_client_list;
2216 }
2217 CDS_INIT_LIST_HEAD(&client_list_element->node);
2218 client_list_element->client = client;
2219 cds_list_add(&client_list_element->node, &client_list->list);
2220 }
2221
2222 switch (get_condition_binding_object(condition)) {
2223 case LTTNG_OBJECT_TYPE_SESSION:
2224 /* Add the trigger to the list if it matches a known session. */
2225 ret = bind_trigger_to_matching_session(trigger, state);
2226 if (ret) {
2227 goto error_put_client_list;
2228 }
2229 break;
2230 case LTTNG_OBJECT_TYPE_CHANNEL:
2231 /*
2232 * Add the trigger to list of triggers bound to the channels
2233 * currently known.
2234 */
2235 ret = bind_trigger_to_matching_channels(trigger, state);
2236 if (ret) {
2237 goto error_put_client_list;
2238 }
2239 break;
2240 case LTTNG_OBJECT_TYPE_NONE:
2241 break;
2242 default:
2243 ERR("[notification-thread] Unknown object type on which to bind a newly registered trigger was encountered");
2244 ret = -1;
2245 goto error_put_client_list;
2246 }
2247
2248 /*
2249 * Since there is nothing preventing clients from subscribing to a
2250 * condition before the corresponding trigger is registered, we have
2251 * to evaluate this new condition right away.
2252 *
2253 * At some point, we were waiting for the next "evaluation" (e.g. on
2254 * reception of a channel sample) to evaluate this new condition, but
2255 * that was broken.
2256 *
2257 * The reason it was broken is that waiting for the next sample
2258 * does not allow us to properly handle transitions for edge-triggered
2259 * conditions.
2260 *
2261 * Consider this example: when we handle a new channel sample, we
2262 * evaluate each conditions twice: once with the previous state, and
2263 * again with the newest state. We then use those two results to
2264 * determine whether a state change happened: a condition was false and
2265 * became true. If a state change happened, we have to notify clients.
2266 *
2267 * Now, if a client subscribes to a given notification and registers
2268 * a trigger *after* that subscription, we have to make sure the
2269 * condition is evaluated at this point while considering only the
2270 * current state. Otherwise, the next evaluation cycle may only see
2271 * that the evaluations remain the same (true for samples n-1 and n) and
2272 * the client will never know that the condition has been met.
2273 *
2274 * No need to lock the list here as it has not been published yet.
2275 */
2276 cds_list_for_each_entry_safe(client_list_element, tmp,
2277 &client_list->list, node) {
2278 ret = evaluate_condition_for_client(trigger, condition,
2279 client_list_element->client, state);
2280 if (ret) {
2281 goto error_put_client_list;
2282 }
2283 }
2284
2285 /*
2286 * Client list ownership transferred to the
2287 * notification_trigger_clients_ht.
2288 */
2289 publish_notification_client_list(state, client_list);
2290 client_list = NULL;
2291
2292 *cmd_result = LTTNG_OK;
2293
2294 error_put_client_list:
2295 notification_client_list_put(client_list);
2296
2297 error_free_ht_element:
2298 free(trigger_ht_element);
2299 error:
2300 if (free_trigger) {
2301 lttng_trigger_destroy(trigger);
2302 }
2303 rcu_read_unlock();
2304 return ret;
2305 }
2306
2307 static
2308 void free_lttng_trigger_ht_element_rcu(struct rcu_head *node)
2309 {
2310 free(caa_container_of(node, struct lttng_trigger_ht_element,
2311 rcu_node));
2312 }
2313
2314 static
2315 int handle_notification_thread_command_unregister_trigger(
2316 struct notification_thread_state *state,
2317 struct lttng_trigger *trigger,
2318 enum lttng_error_code *_cmd_reply)
2319 {
2320 struct cds_lfht_iter iter;
2321 struct cds_lfht_node *triggers_ht_node;
2322 struct lttng_channel_trigger_list *trigger_list;
2323 struct notification_client_list *client_list;
2324 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
2325 struct lttng_condition *condition = lttng_trigger_get_condition(
2326 trigger);
2327 enum lttng_error_code cmd_reply;
2328
2329 rcu_read_lock();
2330
2331 cds_lfht_lookup(state->triggers_ht,
2332 lttng_condition_hash(condition),
2333 match_condition,
2334 condition,
2335 &iter);
2336 triggers_ht_node = cds_lfht_iter_get_node(&iter);
2337 if (!triggers_ht_node) {
2338 cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND;
2339 goto end;
2340 } else {
2341 cmd_reply = LTTNG_OK;
2342 }
2343
2344 /* Remove trigger from channel_triggers_ht. */
2345 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
2346 channel_triggers_ht_node) {
2347 struct lttng_trigger_list_element *trigger_element, *tmp;
2348
2349 cds_list_for_each_entry_safe(trigger_element, tmp,
2350 &trigger_list->list, node) {
2351 const struct lttng_condition *current_condition =
2352 lttng_trigger_get_const_condition(
2353 trigger_element->trigger);
2354
2355 assert(current_condition);
2356 if (!lttng_condition_is_equal(condition,
2357 current_condition)) {
2358 continue;
2359 }
2360
2361 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
2362 cds_list_del(&trigger_element->node);
2363 /* A trigger can only appear once per channel */
2364 break;
2365 }
2366 }
2367
2368 /*
2369 * Remove and release the client list from
2370 * notification_trigger_clients_ht.
2371 */
2372 client_list = get_client_list_from_condition(state, condition);
2373 assert(client_list);
2374
2375 /* Put new reference and the hashtable's reference. */
2376 notification_client_list_put(client_list);
2377 notification_client_list_put(client_list);
2378 client_list = NULL;
2379
2380 /* Remove trigger from triggers_ht. */
2381 trigger_ht_element = caa_container_of(triggers_ht_node,
2382 struct lttng_trigger_ht_element, node);
2383 cds_lfht_del(state->triggers_ht, triggers_ht_node);
2384
2385 /* Release the ownership of the trigger. */
2386 lttng_trigger_destroy(trigger_ht_element->trigger);
2387 call_rcu(&trigger_ht_element->rcu_node, free_lttng_trigger_ht_element_rcu);
2388 end:
2389 rcu_read_unlock();
2390 if (_cmd_reply) {
2391 *_cmd_reply = cmd_reply;
2392 }
2393 return 0;
2394 }
2395
2396 /* Returns 0 on success, 1 on exit requested, negative value on error. */
2397 int handle_notification_thread_command(
2398 struct notification_thread_handle *handle,
2399 struct notification_thread_state *state)
2400 {
2401 int ret;
2402 uint64_t counter;
2403 struct notification_thread_command *cmd;
2404
2405 /* Read the event pipe to put it back into a quiescent state. */
2406 ret = lttng_read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter,
2407 sizeof(counter));
2408 if (ret != sizeof(counter)) {
2409 goto error;
2410 }
2411
2412 pthread_mutex_lock(&handle->cmd_queue.lock);
2413 cmd = cds_list_first_entry(&handle->cmd_queue.list,
2414 struct notification_thread_command, cmd_list_node);
2415 switch (cmd->type) {
2416 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
2417 DBG("[notification-thread] Received register trigger command");
2418 ret = handle_notification_thread_command_register_trigger(
2419 state, cmd->parameters.trigger,
2420 &cmd->reply_code);
2421 break;
2422 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
2423 DBG("[notification-thread] Received unregister trigger command");
2424 ret = handle_notification_thread_command_unregister_trigger(
2425 state, cmd->parameters.trigger,
2426 &cmd->reply_code);
2427 break;
2428 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
2429 DBG("[notification-thread] Received add channel command");
2430 ret = handle_notification_thread_command_add_channel(
2431 state,
2432 cmd->parameters.add_channel.session.name,
2433 cmd->parameters.add_channel.session.uid,
2434 cmd->parameters.add_channel.session.gid,
2435 cmd->parameters.add_channel.channel.name,
2436 cmd->parameters.add_channel.channel.domain,
2437 cmd->parameters.add_channel.channel.key,
2438 cmd->parameters.add_channel.channel.capacity,
2439 &cmd->reply_code);
2440 break;
2441 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
2442 DBG("[notification-thread] Received remove channel command");
2443 ret = handle_notification_thread_command_remove_channel(
2444 state, cmd->parameters.remove_channel.key,
2445 cmd->parameters.remove_channel.domain,
2446 &cmd->reply_code);
2447 break;
2448 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING:
2449 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED:
2450 DBG("[notification-thread] Received session rotation %s command",
2451 cmd->type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING ?
2452 "ongoing" : "completed");
2453 ret = handle_notification_thread_command_session_rotation(
2454 state,
2455 cmd->type,
2456 cmd->parameters.session_rotation.session_name,
2457 cmd->parameters.session_rotation.uid,
2458 cmd->parameters.session_rotation.gid,
2459 cmd->parameters.session_rotation.trace_archive_chunk_id,
2460 cmd->parameters.session_rotation.location,
2461 &cmd->reply_code);
2462 break;
2463 case NOTIFICATION_COMMAND_TYPE_QUIT:
2464 DBG("[notification-thread] Received quit command");
2465 cmd->reply_code = LTTNG_OK;
2466 ret = 1;
2467 goto end;
2468 default:
2469 ERR("[notification-thread] Unknown internal command received");
2470 goto error_unlock;
2471 }
2472
2473 if (ret) {
2474 goto error_unlock;
2475 }
2476 end:
2477 cds_list_del(&cmd->cmd_list_node);
2478 if (cmd->is_async) {
2479 free(cmd);
2480 cmd = NULL;
2481 } else {
2482 lttng_waiter_wake_up(&cmd->reply_waiter);
2483 }
2484 pthread_mutex_unlock(&handle->cmd_queue.lock);
2485 return ret;
2486 error_unlock:
2487 /* Wake-up and return a fatal error to the calling thread. */
2488 lttng_waiter_wake_up(&cmd->reply_waiter);
2489 pthread_mutex_unlock(&handle->cmd_queue.lock);
2490 cmd->reply_code = LTTNG_ERR_FATAL;
2491 error:
2492 /* Indicate a fatal error to the caller. */
2493 return -1;
2494 }
2495
/*
 * Add O_NONBLOCK to the file status flags of `socket`.
 *
 * Returns the result of the last fcntl() call performed: -1 if either the
 * flags could not be read or could not be updated.
 */
static
int socket_set_non_blocking(int socket)
{
	int ret;
	int current_flags;

	/* Fetch the current flags so that only O_NONBLOCK is added. */
	current_flags = fcntl(socket, F_GETFL, 0);
	if (current_flags == -1) {
		PERROR("fcntl get socket flags");
		return current_flags;
	}

	ret = fcntl(socket, F_SETFL, current_flags | O_NONBLOCK);
	if (ret == -1) {
		PERROR("fcntl set O_NONBLOCK socket flag");
		return ret;
	}

	DBG("Client socket (fd = %i) set as non-blocking", socket);
	return ret;
}
2518
2519 /* Client lock must be acquired by caller. */
2520 static
2521 int client_reset_inbound_state(struct notification_client *client)
2522 {
2523 int ret;
2524
2525 ASSERT_LOCKED(client->lock);
2526
2527 ret = lttng_dynamic_buffer_set_size(
2528 &client->communication.inbound.buffer, 0);
2529 assert(!ret);
2530
2531 client->communication.inbound.bytes_to_receive =
2532 sizeof(struct lttng_notification_channel_message);
2533 client->communication.inbound.msg_type =
2534 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN;
2535 LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1);
2536 LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1);
2537 ret = lttng_dynamic_buffer_set_size(
2538 &client->communication.inbound.buffer,
2539 client->communication.inbound.bytes_to_receive);
2540 return ret;
2541 }
2542
2543 int handle_notification_thread_client_connect(
2544 struct notification_thread_state *state)
2545 {
2546 int ret;
2547 struct notification_client *client;
2548
2549 DBG("[notification-thread] Handling new notification channel client connection");
2550
2551 client = zmalloc(sizeof(*client));
2552 if (!client) {
2553 /* Fatal error. */
2554 ret = -1;
2555 goto error;
2556 }
2557 pthread_mutex_init(&client->lock, NULL);
2558 client->id = state->next_notification_client_id++;
2559 CDS_INIT_LIST_HEAD(&client->condition_list);
2560 lttng_dynamic_buffer_init(&client->communication.inbound.buffer);
2561 lttng_dynamic_buffer_init(&client->communication.outbound.buffer);
2562 client->communication.inbound.expect_creds = true;
2563
2564 pthread_mutex_lock(&client->lock);
2565 ret = client_reset_inbound_state(client);
2566 pthread_mutex_unlock(&client->lock);
2567 if (ret) {
2568 ERR("[notification-thread] Failed to reset client communication's inbound state");
2569 ret = 0;
2570 goto error;
2571 }
2572
2573 ret = lttcomm_accept_unix_sock(state->notification_channel_socket);
2574 if (ret < 0) {
2575 ERR("[notification-thread] Failed to accept new notification channel client connection");
2576 ret = 0;
2577 goto error;
2578 }
2579
2580 client->socket = ret;
2581
2582 ret = socket_set_non_blocking(client->socket);
2583 if (ret) {
2584 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
2585 goto error;
2586 }
2587
2588 ret = lttcomm_setsockopt_creds_unix_sock(client->socket);
2589 if (ret < 0) {
2590 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
2591 ret = 0;
2592 goto error;
2593 }
2594
2595 ret = lttng_poll_add(&state->events, client->socket,
2596 LPOLLIN | LPOLLERR |
2597 LPOLLHUP | LPOLLRDHUP);
2598 if (ret < 0) {
2599 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
2600 ret = 0;
2601 goto error;
2602 }
2603 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
2604 client->socket);
2605
2606 rcu_read_lock();
2607 cds_lfht_add(state->client_socket_ht,
2608 hash_client_socket(client->socket),
2609 &client->client_socket_ht_node);
2610 cds_lfht_add(state->client_id_ht,
2611 hash_client_id(client->id),
2612 &client->client_id_ht_node);
2613 rcu_read_unlock();
2614
2615 return ret;
2616 error:
2617 notification_client_destroy(client, state);
2618 return ret;
2619 }
2620
2621 /* RCU read-lock must be held by the caller. */
2622 /* Client lock must be held by the caller */
2623 static
2624 int notification_thread_client_disconnect(
2625 struct notification_client *client,
2626 struct notification_thread_state *state)
2627 {
2628 int ret;
2629 struct lttng_condition_list_element *condition_list_element, *tmp;
2630
2631 /* Acquire the client lock to disable its communication atomically. */
2632 client->communication.active = false;
2633 ret = lttng_poll_del(&state->events, client->socket);
2634 if (ret) {
2635 ERR("[notification-thread] Failed to remove client socket %d from poll set",
2636 client->socket);
2637 }
2638
2639 cds_lfht_del(state->client_socket_ht, &client->client_socket_ht_node);
2640 cds_lfht_del(state->client_id_ht, &client->client_id_ht_node);
2641
2642 /* Release all conditions to which the client was subscribed. */
2643 cds_list_for_each_entry_safe(condition_list_element, tmp,
2644 &client->condition_list, node) {
2645 (void) notification_thread_client_unsubscribe(client,
2646 condition_list_element->condition, state, NULL);
2647 }
2648
2649 /*
2650 * Client no longer accessible to other threads (through the
2651 * client lists).
2652 */
2653 notification_client_destroy(client, state);
2654 return ret;
2655 }
2656
2657 int handle_notification_thread_client_disconnect(
2658 int client_socket, struct notification_thread_state *state)
2659 {
2660 int ret = 0;
2661 struct notification_client *client;
2662
2663 rcu_read_lock();
2664 DBG("[notification-thread] Closing client connection (socket fd = %i)",
2665 client_socket);
2666 client = get_client_from_socket(client_socket, state);
2667 if (!client) {
2668 /* Internal state corruption, fatal error. */
2669 ERR("[notification-thread] Unable to find client (socket fd = %i)",
2670 client_socket);
2671 ret = -1;
2672 goto end;
2673 }
2674
2675 pthread_mutex_lock(&client->lock);
2676 ret = notification_thread_client_disconnect(client, state);
2677 pthread_mutex_unlock(&client->lock);
2678 end:
2679 rcu_read_unlock();
2680 return ret;
2681 }
2682
2683 int handle_notification_thread_client_disconnect_all(
2684 struct notification_thread_state *state)
2685 {
2686 struct cds_lfht_iter iter;
2687 struct notification_client *client;
2688 bool error_encoutered = false;
2689
2690 rcu_read_lock();
2691 DBG("[notification-thread] Closing all client connections");
2692 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
2693 client_socket_ht_node) {
2694 int ret;
2695
2696 pthread_mutex_lock(&client->lock);
2697 ret = notification_thread_client_disconnect(
2698 client, state);
2699 pthread_mutex_unlock(&client->lock);
2700 if (ret) {
2701 error_encoutered = true;
2702 }
2703 }
2704 rcu_read_unlock();
2705 return error_encoutered ? 1 : 0;
2706 }
2707
2708 int handle_notification_thread_trigger_unregister_all(
2709 struct notification_thread_state *state)
2710 {
2711 bool error_occurred = false;
2712 struct cds_lfht_iter iter;
2713 struct lttng_trigger_ht_element *trigger_ht_element;
2714
2715 rcu_read_lock();
2716 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
2717 node) {
2718 int ret = handle_notification_thread_command_unregister_trigger(
2719 state, trigger_ht_element->trigger, NULL);
2720 if (ret) {
2721 error_occurred = true;
2722 }
2723 }
2724 rcu_read_unlock();
2725 return error_occurred ? -1 : 0;
2726 }
2727
2728 static
2729 int client_handle_transmission_status(
2730 struct notification_client *client,
2731 enum client_transmission_status transmission_status,
2732 struct notification_thread_state *state)
2733 {
2734 int ret = 0;
2735
2736 switch (transmission_status) {
2737 case CLIENT_TRANSMISSION_STATUS_COMPLETE:
2738 ret = lttng_poll_mod(&state->events, client->socket,
2739 CLIENT_POLL_MASK_IN);
2740 if (ret) {
2741 goto end;
2742 }
2743
2744 client->communication.outbound.queued_command_reply = false;
2745 client->communication.outbound.dropped_notification = false;
2746 break;
2747 case CLIENT_TRANSMISSION_STATUS_QUEUED:
2748 /*
2749 * We want to be notified whenever there is buffer space
2750 * available to send the rest of the payload.
2751 */
2752 ret = lttng_poll_mod(&state->events, client->socket,
2753 CLIENT_POLL_MASK_IN_OUT);
2754 if (ret) {
2755 goto end;
2756 }
2757 break;
2758 case CLIENT_TRANSMISSION_STATUS_FAIL:
2759 ret = notification_thread_client_disconnect(client, state);
2760 if (ret) {
2761 goto end;
2762 }
2763 break;
2764 case CLIENT_TRANSMISSION_STATUS_ERROR:
2765 ret = -1;
2766 goto end;
2767 default:
2768 abort();
2769 }
2770 end:
2771 return ret;
2772 }
2773
2774 /* Client lock must be acquired by caller. */
2775 static
2776 enum client_transmission_status client_flush_outgoing_queue(
2777 struct notification_client *client,
2778 struct notification_thread_state *state)
2779 {
2780 ssize_t ret;
2781 size_t to_send_count;
2782 enum client_transmission_status status;
2783
2784 ASSERT_LOCKED(client->lock);
2785
2786 assert(client->communication.outbound.buffer.size != 0);
2787 to_send_count = client->communication.outbound.buffer.size;
2788 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
2789 client->socket);
2790
2791 ret = lttcomm_send_unix_sock_non_block(client->socket,
2792 client->communication.outbound.buffer.data,
2793 to_send_count);
2794 if ((ret >= 0 && ret < to_send_count)) {
2795 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
2796 client->socket);
2797 to_send_count -= max(ret, 0);
2798
2799 memcpy(client->communication.outbound.buffer.data,
2800 client->communication.outbound.buffer.data +
2801 client->communication.outbound.buffer.size - to_send_count,
2802 to_send_count);
2803 ret = lttng_dynamic_buffer_set_size(
2804 &client->communication.outbound.buffer,
2805 to_send_count);
2806 if (ret) {
2807 status = CLIENT_TRANSMISSION_STATUS_ERROR;
2808 goto error;
2809 }
2810 status = CLIENT_TRANSMISSION_STATUS_QUEUED;
2811 } else if (ret < 0) {
2812 /* Generic error, disconnect the client. */
2813 ERR("[notification-thread] Failed to flush outgoing queue, disconnecting client (socket fd = %i)",
2814 client->socket);
2815 status = CLIENT_TRANSMISSION_STATUS_FAIL;
2816 } else {
2817 /* No error and flushed the queue completely. */
2818 ret = lttng_dynamic_buffer_set_size(
2819 &client->communication.outbound.buffer, 0);
2820 if (ret) {
2821 status = CLIENT_TRANSMISSION_STATUS_ERROR;
2822 goto error;
2823 }
2824 status = CLIENT_TRANSMISSION_STATUS_COMPLETE;
2825 }
2826
2827 ret = client_handle_transmission_status(client, status, state);
2828 if (ret) {
2829 goto error;
2830 }
2831
2832 return 0;
2833 error:
2834 return -1;
2835 }
2836
2837 /* Client lock must be acquired by caller. */
2838 static
2839 int client_send_command_reply(struct notification_client *client,
2840 struct notification_thread_state *state,
2841 enum lttng_notification_channel_status status)
2842 {
2843 int ret;
2844 struct lttng_notification_channel_command_reply reply = {
2845 .status = (int8_t) status,
2846 };
2847 struct lttng_notification_channel_message msg = {
2848 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
2849 .size = sizeof(reply),
2850 };
2851 char buffer[sizeof(msg) + sizeof(reply)];
2852
2853 ASSERT_LOCKED(client->lock);
2854
2855 if (client->communication.outbound.queued_command_reply) {
2856 /* Protocol error. */
2857 goto error;
2858 }
2859
2860 memcpy(buffer, &msg, sizeof(msg));
2861 memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
2862 DBG("[notification-thread] Send command reply (%i)", (int) status);
2863
2864 /* Enqueue buffer to outgoing queue and flush it. */
2865 ret = lttng_dynamic_buffer_append(
2866 &client->communication.outbound.buffer,
2867 buffer, sizeof(buffer));
2868 if (ret) {
2869 goto error;
2870 }
2871
2872 ret = client_flush_outgoing_queue(client, state);
2873 if (ret) {
2874 goto error;
2875 }
2876
2877 if (client->communication.outbound.buffer.size != 0) {
2878 /* Queue could not be emptied. */
2879 client->communication.outbound.queued_command_reply = true;
2880 }
2881
2882 return 0;
2883 error:
2884 return -1;
2885 }
2886
2887 static
2888 int client_handle_message_unknown(struct notification_client *client,
2889 struct notification_thread_state *state)
2890 {
2891 int ret;
2892
2893 pthread_mutex_lock(&client->lock);
2894
2895 /*
2896 * Receiving message header. The function will be called again
2897 * once the rest of the message as been received and can be
2898 * interpreted.
2899 */
2900 const struct lttng_notification_channel_message *msg;
2901
2902 assert(sizeof(*msg) == client->communication.inbound.buffer.size);
2903 msg = (const struct lttng_notification_channel_message *)
2904 client->communication.inbound.buffer.data;
2905
2906 if (msg->size == 0 ||
2907 msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
2908 ERR("[notification-thread] Invalid notification channel message: length = %u",
2909 msg->size);
2910 ret = -1;
2911 goto end;
2912 }
2913
2914 switch (msg->type) {
2915 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
2916 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
2917 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
2918 break;
2919 default:
2920 ret = -1;
2921 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
2922 goto end;
2923 }
2924
2925 client->communication.inbound.bytes_to_receive = msg->size;
2926 client->communication.inbound.msg_type =
2927 (enum lttng_notification_channel_message_type) msg->type;
2928 ret = lttng_dynamic_buffer_set_size(
2929 &client->communication.inbound.buffer, msg->size);
2930 end:
2931 pthread_mutex_unlock(&client->lock);
2932 return ret;
2933 }
2934
2935 static
2936 int client_handle_message_handshake(struct notification_client *client,
2937 struct notification_thread_state *state)
2938 {
2939 int ret;
2940 struct lttng_notification_channel_command_handshake *handshake_client;
2941 const struct lttng_notification_channel_command_handshake handshake_reply = {
2942 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
2943 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
2944 };
2945 const struct lttng_notification_channel_message msg_header = {
2946 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
2947 .size = sizeof(handshake_reply),
2948 };
2949 enum lttng_notification_channel_status status =
2950 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
2951 char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
2952
2953 pthread_mutex_lock(&client->lock);
2954
2955 memcpy(send_buffer, &msg_header, sizeof(msg_header));
2956 memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
2957 sizeof(handshake_reply));
2958
2959 handshake_client =
2960 (struct lttng_notification_channel_command_handshake *)
2961 client->communication.inbound.buffer
2962 .data;
2963 client->major = handshake_client->major;
2964 client->minor = handshake_client->minor;
2965 if (!client->communication.inbound.creds_received) {
2966 ERR("[notification-thread] No credentials received from client");
2967 ret = -1;
2968 goto end;
2969 }
2970
2971 client->uid = LTTNG_SOCK_GET_UID_CRED(
2972 &client->communication.inbound.creds);
2973 client->gid = LTTNG_SOCK_GET_GID_CRED(
2974 &client->communication.inbound.creds);
2975 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
2976 client->uid, client->gid, (int) client->major,
2977 (int) client->minor);
2978
2979 if (handshake_client->major !=
2980 LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
2981 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
2982 }
2983
2984 ret = lttng_dynamic_buffer_append(
2985 &client->communication.outbound.buffer, send_buffer,
2986 sizeof(send_buffer));
2987 if (ret) {
2988 ERR("[notification-thread] Failed to send protocol version to notification channel client");
2989 goto end;
2990 }
2991
2992 client->validated = true;
2993 client->communication.active = true;
2994
2995 ret = client_flush_outgoing_queue(client, state);
2996 if (ret) {
2997 goto end;
2998 }
2999
3000 ret = client_send_command_reply(client, state, status);
3001 if (ret) {
3002 ERR("[notification-thread] Failed to send reply to notification channel client");
3003 goto end;
3004 }
3005
3006 /* Set reception state to receive the next message header. */
3007 ret = client_reset_inbound_state(client);
3008 if (ret) {
3009 ERR("[notification-thread] Failed to reset client communication's inbound state");
3010 goto end;
3011 }
3012
3013 end:
3014 pthread_mutex_unlock(&client->lock);
3015 return ret;
3016 }
3017
3018 static
3019 int client_handle_message_subscription(
3020 struct notification_client *client,
3021 enum lttng_notification_channel_message_type msg_type,
3022 struct notification_thread_state *state)
3023 {
3024 int ret;
3025 struct lttng_condition *condition;
3026 enum lttng_notification_channel_status status =
3027 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
3028 struct lttng_payload_view condition_view =
3029 lttng_payload_view_from_dynamic_buffer(
3030 &client->communication.inbound.buffer,
3031 0, -1);
3032 size_t expected_condition_size;
3033
3034 pthread_mutex_lock(&client->lock);
3035 expected_condition_size = client->communication.inbound.buffer.size;
3036 pthread_mutex_unlock(&client->lock);
3037
3038 ret = lttng_condition_create_from_payload(&condition_view, &condition);
3039 if (ret != expected_condition_size) {
3040 ERR("[notification-thread] Malformed condition received from client");
3041 goto end;
3042 }
3043
3044 if (msg_type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
3045 ret = notification_thread_client_subscribe(
3046 client, condition, state, &status);
3047 } else {
3048 ret = notification_thread_client_unsubscribe(
3049 client, condition, state, &status);
3050 }
3051 if (ret) {
3052 goto end;
3053 }
3054
3055 pthread_mutex_lock(&client->lock);
3056 ret = client_send_command_reply(client, state, status);
3057 if (ret) {
3058 ERR("[notification-thread] Failed to send reply to notification channel client");
3059 goto end_unlock;
3060 }
3061
3062 /* Set reception state to receive the next message header. */
3063 ret = client_reset_inbound_state(client);
3064 if (ret) {
3065 ERR("[notification-thread] Failed to reset client communication's inbound state");
3066 goto end_unlock;
3067 }
3068
3069 end_unlock:
3070 pthread_mutex_unlock(&client->lock);
3071 end:
3072 return ret;
3073 }
3074
3075 static
3076 int client_dispatch_message(struct notification_client *client,
3077 struct notification_thread_state *state)
3078 {
3079 int ret = 0;
3080
3081 if (client->communication.inbound.msg_type !=
3082 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE &&
3083 client->communication.inbound.msg_type !=
3084 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN &&
3085 !client->validated) {
3086 WARN("[notification-thread] client attempted a command before handshake");
3087 ret = -1;
3088 goto end;
3089 }
3090
3091 switch (client->communication.inbound.msg_type) {
3092 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN:
3093 {
3094 ret = client_handle_message_unknown(client, state);
3095 break;
3096 }
3097 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
3098 {
3099 ret = client_handle_message_handshake(client, state);
3100 break;
3101 }
3102 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
3103 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
3104 {
3105 ret = client_handle_message_subscription(client,
3106 client->communication.inbound.msg_type, state);
3107 break;
3108 }
3109 default:
3110 abort();
3111 }
3112 end:
3113 return ret;
3114 }
3115
3116 /* Incoming data from client. */
3117 int handle_notification_thread_client_in(
3118 struct notification_thread_state *state, int socket)
3119 {
3120 int ret = 0;
3121 struct notification_client *client;
3122 ssize_t recv_ret;
3123 size_t offset;
3124 bool message_is_complete = false;
3125
3126 client = get_client_from_socket(socket, state);
3127 if (!client) {
3128 /* Internal error, abort. */
3129 ret = -1;
3130 goto end;
3131 }
3132
3133 pthread_mutex_lock(&client->lock);
3134 offset = client->communication.inbound.buffer.size -
3135 client->communication.inbound.bytes_to_receive;
3136 if (client->communication.inbound.expect_creds) {
3137 recv_ret = lttcomm_recv_creds_unix_sock(socket,
3138 client->communication.inbound.buffer.data + offset,
3139 client->communication.inbound.bytes_to_receive,
3140 &client->communication.inbound.creds);
3141 if (recv_ret > 0) {
3142 client->communication.inbound.expect_creds = false;
3143 client->communication.inbound.creds_received = true;
3144 }
3145 } else {
3146 recv_ret = lttcomm_recv_unix_sock_non_block(socket,
3147 client->communication.inbound.buffer.data + offset,
3148 client->communication.inbound.bytes_to_receive);
3149 }
3150 if (recv_ret >= 0) {
3151 client->communication.inbound.bytes_to_receive -= recv_ret;
3152 message_is_complete = client->communication.inbound
3153 .bytes_to_receive == 0;
3154 }
3155 pthread_mutex_unlock(&client->lock);
3156 if (recv_ret < 0) {
3157 goto error_disconnect_client;
3158 }
3159
3160 if (message_is_complete) {
3161 ret = client_dispatch_message(client, state);
3162 if (ret) {
3163 /*
3164 * Only returns an error if this client must be
3165 * disconnected.
3166 */
3167 goto error_disconnect_client;
3168 }
3169 }
3170 end:
3171 return ret;
3172 error_disconnect_client:
3173 pthread_mutex_lock(&client->lock);
3174 ret = notification_thread_client_disconnect(client, state);
3175 pthread_mutex_unlock(&client->lock);
3176 return ret;
3177 }
3178
3179 /* Client ready to receive outgoing data. */
3180 int handle_notification_thread_client_out(
3181 struct notification_thread_state *state, int socket)
3182 {
3183 int ret;
3184 struct notification_client *client;
3185
3186 client = get_client_from_socket(socket, state);
3187 if (!client) {
3188 /* Internal error, abort. */
3189 ret = -1;
3190 goto end;
3191 }
3192
3193 pthread_mutex_lock(&client->lock);
3194 ret = client_flush_outgoing_queue(client, state);
3195 pthread_mutex_unlock(&client->lock);
3196 if (ret) {
3197 goto end;
3198 }
3199 end:
3200 return ret;
3201 }
3202
3203 static
3204 bool evaluate_buffer_usage_condition(const struct lttng_condition *condition,
3205 const struct channel_state_sample *sample,
3206 uint64_t buffer_capacity)
3207 {
3208 bool result = false;
3209 uint64_t threshold;
3210 enum lttng_condition_type condition_type;
3211 const struct lttng_condition_buffer_usage *use_condition = container_of(
3212 condition, struct lttng_condition_buffer_usage,
3213 parent);
3214
3215 if (use_condition->threshold_bytes.set) {
3216 threshold = use_condition->threshold_bytes.value;
3217 } else {
3218 /*
3219 * Threshold was expressed as a ratio.
3220 *
3221 * TODO the threshold (in bytes) of conditions expressed
3222 * as a ratio of total buffer size could be cached to
3223 * forego this double-multiplication or it could be performed
3224 * as fixed-point math.
3225 *
3226 * Note that caching should accommodates the case where the
3227 * condition applies to multiple channels (i.e. don't assume
3228 * that all channels matching my_chann* have the same size...)
3229 */
3230 threshold = (uint64_t) (use_condition->threshold_ratio.value *
3231 (double) buffer_capacity);
3232 }
3233
3234 condition_type = lttng_condition_get_type(condition);
3235 if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
3236 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
3237 threshold, sample->highest_usage);
3238
3239 /*
3240 * The low condition should only be triggered once _all_ of the
3241 * streams in a channel have gone below the "low" threshold.
3242 */
3243 if (sample->highest_usage <= threshold) {
3244 result = true;
3245 }
3246 } else {
3247 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
3248 threshold, sample->highest_usage);
3249
3250 /*
3251 * For high buffer usage scenarios, we want to trigger whenever
3252 * _any_ of the streams has reached the "high" threshold.
3253 */
3254 if (sample->highest_usage >= threshold) {
3255 result = true;
3256 }
3257 }
3258
3259 return result;
3260 }
3261
3262 static
3263 bool evaluate_session_consumed_size_condition(
3264 const struct lttng_condition *condition,
3265 uint64_t session_consumed_size)
3266 {
3267 uint64_t threshold;
3268 const struct lttng_condition_session_consumed_size *size_condition =
3269 container_of(condition,
3270 struct lttng_condition_session_consumed_size,
3271 parent);
3272
3273 threshold = size_condition->consumed_threshold_bytes.value;
3274 DBG("[notification-thread] Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
3275 threshold, session_consumed_size);
3276 return session_consumed_size >= threshold;
3277 }
3278
3279 static
3280 int evaluate_buffer_condition(const struct lttng_condition *condition,
3281 struct lttng_evaluation **evaluation,
3282 const struct notification_thread_state *state,
3283 const struct channel_state_sample *previous_sample,
3284 const struct channel_state_sample *latest_sample,
3285 uint64_t previous_session_consumed_total,
3286 uint64_t latest_session_consumed_total,
3287 struct channel_info *channel_info)
3288 {
3289 int ret = 0;
3290 enum lttng_condition_type condition_type;
3291 const bool previous_sample_available = !!previous_sample;
3292 bool previous_sample_result = false;
3293 bool latest_sample_result;
3294
3295 condition_type = lttng_condition_get_type(condition);
3296
3297 switch (condition_type) {
3298 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
3299 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
3300 if (caa_likely(previous_sample_available)) {
3301 previous_sample_result =
3302 evaluate_buffer_usage_condition(condition,
3303 previous_sample, channel_info->capacity);
3304 }
3305 latest_sample_result = evaluate_buffer_usage_condition(
3306 condition, latest_sample,
3307 channel_info->capacity);
3308 break;
3309 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
3310 if (caa_likely(previous_sample_available)) {
3311 previous_sample_result =
3312 evaluate_session_consumed_size_condition(
3313 condition,
3314 previous_session_consumed_total);
3315 }
3316 latest_sample_result =
3317 evaluate_session_consumed_size_condition(
3318 condition,
3319 latest_session_consumed_total);
3320 break;
3321 default:
3322 /* Unknown condition type; internal error. */
3323 abort();
3324 }
3325
3326 if (!latest_sample_result ||
3327 (previous_sample_result == latest_sample_result)) {
3328 /*
3329 * Only trigger on a condition evaluation transition.
3330 *
3331 * NOTE: This edge-triggered logic may not be appropriate for
3332 * future condition types.
3333 */
3334 goto end;
3335 }
3336
3337 if (!evaluation || !latest_sample_result) {
3338 goto end;
3339 }
3340
3341 switch (condition_type) {
3342 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
3343 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
3344 *evaluation = lttng_evaluation_buffer_usage_create(
3345 condition_type,
3346 latest_sample->highest_usage,
3347 channel_info->capacity);
3348 break;
3349 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
3350 *evaluation = lttng_evaluation_session_consumed_size_create(
3351 latest_session_consumed_total);
3352 break;
3353 default:
3354 abort();
3355 }
3356
3357 if (!*evaluation) {
3358 ret = -1;
3359 goto end;
3360 }
3361 end:
3362 return ret;
3363 }
3364
3365 static
3366 int client_enqueue_dropped_notification(struct notification_client *client)
3367 {
3368 int ret;
3369 struct lttng_notification_channel_message msg = {
3370 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
3371 .size = 0,
3372 };
3373
3374 ASSERT_LOCKED(client->lock);
3375
3376 ret = lttng_dynamic_buffer_append(
3377 &client->communication.outbound.buffer, &msg,
3378 sizeof(msg));
3379 return ret;
3380 }
3381
3382 /*
3383 * Permission checks relative to notification channel clients are performed
3384 * here. Notice how object, client, and trigger credentials are involved in
3385 * this check.
3386 *
3387 * The `object` credentials are the credentials associated with the "subject"
3388 * of a condition. For instance, a `rotation completed` condition applies
3389 * to a session. When that condition is met, it will produce an evaluation
3390 * against a session. Hence, in this case, the `object` credentials are the
3391 * credentials of the "subject" session.
3392 *
3393 * The `trigger` credentials are the credentials of the user that registered the
3394 * trigger.
3395 *
3396 * The `client` credentials are the credentials of the user that created a given
3397 * notification channel.
3398 *
3399 * In terms of visibility, it is expected that non-privilieged users can only
3400 * register triggers against "their" objects (their own sessions and
3401 * applications they are allowed to interact with). They can then open a
3402 * notification channel and subscribe to notifications associated with those
3403 * triggers.
3404 *
3405 * As for privilieged users, they can register triggers against the objects of
3406 * other users. They can then subscribe to the notifications associated to their
3407 * triggers. Privilieged users _can't_ subscribe to the notifications of
3408 * triggers owned by other users; they must create their own triggers.
3409 *
3410 * This is more a concern of usability than security. It would be difficult for
3411 * a root user reliably subscribe to a specific set of conditions without
3412 * interference from external users (those could, for instance, unregister
3413 * their triggers).
3414 */
3415 static
3416 int send_evaluation_to_clients(const struct lttng_trigger *trigger,
3417 const struct lttng_evaluation *evaluation,
3418 struct notification_client_list* client_list,
3419 struct notification_thread_state *state,
3420 uid_t object_uid, gid_t object_gid)
3421 {
3422 int ret = 0;
3423 struct lttng_payload msg_payload;
3424 struct notification_client_list_element *client_list_element, *tmp;
3425 const struct lttng_notification notification = {
3426 .condition = (struct lttng_condition *) lttng_trigger_get_const_condition(trigger),
3427 .evaluation = (struct lttng_evaluation *) evaluation,
3428 };
3429 struct lttng_notification_channel_message msg_header = {
3430 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION,
3431 };
3432 const struct lttng_credentials *trigger_creds = lttng_trigger_get_credentials(trigger);
3433
3434 lttng_payload_init(&msg_payload);
3435
3436 ret = lttng_dynamic_buffer_append(&msg_payload.buffer, &msg_header,
3437 sizeof(msg_header));
3438 if (ret) {
3439 goto end;
3440 }
3441
3442 ret = lttng_notification_serialize(&notification, &msg_payload);
3443 if (ret) {
3444 ERR("[notification-thread] Failed to serialize notification");
3445 ret = -1;
3446 goto end;
3447 }
3448
3449 /* Update payload size. */
3450 ((struct lttng_notification_channel_message *) msg_payload.buffer.data)
3451 ->size = (uint32_t)(
3452 msg_payload.buffer.size - sizeof(msg_header));
3453
3454 pthread_mutex_lock(&client_list->lock);
3455 cds_list_for_each_entry_safe(client_list_element, tmp,
3456 &client_list->list, node) {
3457 struct notification_client *client =
3458 client_list_element->client;
3459
3460 ret = 0;
3461 pthread_mutex_lock(&client->lock);
3462 if (client->uid != object_uid && client->gid != object_gid &&
3463 client->uid != 0) {
3464 /* Client is not allowed to monitor this channel. */
3465 DBG("[notification-thread] Skipping client at it does not have the object permission to receive notification for this trigger");
3466 goto unlock_client;
3467 }
3468
3469 if (client->uid != trigger_creds->uid && client->gid != trigger_creds->gid) {
3470 DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this trigger");
3471 goto unlock_client;
3472 }
3473
3474 DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
3475 client->socket, msg_payload.buffer.size);
3476 if (client->communication.outbound.buffer.size) {
3477 /*
3478 * Outgoing data is already buffered for this client;
3479 * drop the notification and enqueue a "dropped
3480 * notification" message if this is the first dropped
3481 * notification since the socket spilled-over to the
3482 * queue.
3483 */
3484 DBG("[notification-thread] Dropping notification addressed to client (socket fd = %i)",
3485 client->socket);
3486 if (!client->communication.outbound.dropped_notification) {
3487 client->communication.outbound.dropped_notification = true;
3488 ret = client_enqueue_dropped_notification(
3489 client);
3490 if (ret) {
3491 goto unlock_client;
3492 }
3493 }
3494 goto unlock_client;
3495 }
3496
3497 ret = lttng_dynamic_buffer_append_buffer(
3498 &client->communication.outbound.buffer,
3499 &msg_payload.buffer);
3500 if (ret) {
3501 goto unlock_client;
3502 }
3503
3504 ret = client_flush_outgoing_queue(client, state);
3505 if (ret) {
3506 goto unlock_client;
3507 }
3508 unlock_client:
3509 pthread_mutex_unlock(&client->lock);
3510 if (ret) {
3511 goto end_unlock_list;
3512 }
3513 }
3514 ret = 0;
3515
3516 end_unlock_list:
3517 pthread_mutex_unlock(&client_list->lock);
3518 end:
3519 lttng_payload_reset(&msg_payload);
3520 return ret;
3521 }
3522
3523 int handle_notification_thread_channel_sample(
3524 struct notification_thread_state *state, int pipe,
3525 enum lttng_domain_type domain)
3526 {
3527 int ret = 0;
3528 struct lttcomm_consumer_channel_monitor_msg sample_msg;
3529 struct channel_info *channel_info;
3530 struct cds_lfht_node *node;
3531 struct cds_lfht_iter iter;
3532 struct lttng_channel_trigger_list *trigger_list;
3533 struct lttng_trigger_list_element *trigger_list_element;
3534 bool previous_sample_available = false;
3535 struct channel_state_sample previous_sample, latest_sample;
3536 uint64_t previous_session_consumed_total, latest_session_consumed_total;
3537
3538 /*
3539 * The monitoring pipe only holds messages smaller than PIPE_BUF,
3540 * ensuring that read/write of sampling messages are atomic.
3541 */
3542 ret = lttng_read(pipe, &sample_msg, sizeof(sample_msg));
3543 if (ret != sizeof(sample_msg)) {
3544 ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
3545 pipe);
3546 ret = -1;
3547 goto end;
3548 }
3549
3550 ret = 0;
3551 latest_sample.key.key = sample_msg.key;
3552 latest_sample.key.domain = domain;
3553 latest_sample.highest_usage = sample_msg.highest;
3554 latest_sample.lowest_usage = sample_msg.lowest;
3555 latest_sample.channel_total_consumed = sample_msg.total_consumed;
3556
3557 rcu_read_lock();
3558
3559 /* Retrieve the channel's informations */
3560 cds_lfht_lookup(state->channels_ht,
3561 hash_channel_key(&latest_sample.key),
3562 match_channel_info,
3563 &latest_sample.key,
3564 &iter);
3565 node = cds_lfht_iter_get_node(&iter);
3566 if (caa_unlikely(!node)) {
3567 /*
3568 * Not an error since the consumer can push a sample to the pipe
3569 * and the rest of the session daemon could notify us of the
3570 * channel's destruction before we get a chance to process that
3571 * sample.
3572 */
3573 DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
3574 latest_sample.key.key,
3575 domain == LTTNG_DOMAIN_KERNEL ? "kernel" :
3576 "user space");
3577 goto end_unlock;
3578 }
3579 channel_info = caa_container_of(node, struct channel_info,
3580 channels_ht_node);
3581 DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64", total consumed = %" PRIu64")",
3582 channel_info->name,
3583 latest_sample.key.key,
3584 channel_info->session_info->name,
3585 latest_sample.highest_usage,
3586 latest_sample.lowest_usage,
3587 latest_sample.channel_total_consumed);
3588
3589 previous_session_consumed_total =
3590 channel_info->session_info->consumed_data_size;
3591
3592 /* Retrieve the channel's last sample, if it exists, and update it. */
3593 cds_lfht_lookup(state->channel_state_ht,
3594 hash_channel_key(&latest_sample.key),
3595 match_channel_state_sample,
3596 &latest_sample.key,
3597 &iter);
3598 node = cds_lfht_iter_get_node(&iter);
3599 if (caa_likely(node)) {
3600 struct channel_state_sample *stored_sample;
3601
3602 /* Update the sample stored. */
3603 stored_sample = caa_container_of(node,
3604 struct channel_state_sample,
3605 channel_state_ht_node);
3606
3607 memcpy(&previous_sample, stored_sample,
3608 sizeof(previous_sample));
3609 stored_sample->highest_usage = latest_sample.highest_usage;
3610 stored_sample->lowest_usage = latest_sample.lowest_usage;
3611 stored_sample->channel_total_consumed = latest_sample.channel_total_consumed;
3612 previous_sample_available = true;
3613
3614 latest_session_consumed_total =
3615 previous_session_consumed_total +
3616 (latest_sample.channel_total_consumed - previous_sample.channel_total_consumed);
3617 } else {
3618 /*
3619 * This is the channel's first sample, allocate space for and
3620 * store the new sample.
3621 */
3622 struct channel_state_sample *stored_sample;
3623
3624 stored_sample = zmalloc(sizeof(*stored_sample));
3625 if (!stored_sample) {
3626 ret = -1;
3627 goto end_unlock;
3628 }
3629
3630 memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
3631 cds_lfht_node_init(&stored_sample->channel_state_ht_node);
3632 cds_lfht_add(state->channel_state_ht,
3633 hash_channel_key(&stored_sample->key),
3634 &stored_sample->channel_state_ht_node);
3635
3636 latest_session_consumed_total =
3637 previous_session_consumed_total +
3638 latest_sample.channel_total_consumed;
3639 }
3640
3641 channel_info->session_info->consumed_data_size =
3642 latest_session_consumed_total;
3643
3644 /* Find triggers associated with this channel. */
3645 cds_lfht_lookup(state->channel_triggers_ht,
3646 hash_channel_key(&latest_sample.key),
3647 match_channel_trigger_list,
3648 &latest_sample.key,
3649 &iter);
3650 node = cds_lfht_iter_get_node(&iter);
3651 if (caa_likely(!node)) {
3652 goto end_unlock;
3653 }
3654
3655 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
3656 channel_triggers_ht_node);
3657 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
3658 node) {
3659 const struct lttng_condition *condition;
3660 const struct lttng_action *action;
3661 const struct lttng_trigger *trigger;
3662 struct notification_client_list *client_list = NULL;
3663 struct lttng_evaluation *evaluation = NULL;
3664 bool client_list_is_empty;
3665
3666 ret = 0;
3667 trigger = trigger_list_element->trigger;
3668 condition = lttng_trigger_get_const_condition(trigger);
3669 assert(condition);
3670 action = lttng_trigger_get_const_action(trigger);
3671
3672 /* Notify actions are the only type currently supported. */
3673 assert(lttng_action_get_type_const(action) ==
3674 LTTNG_ACTION_TYPE_NOTIFY);
3675
3676 /*
3677 * Check if any client is subscribed to the result of this
3678 * evaluation.
3679 */
3680 client_list = get_client_list_from_condition(state, condition);
3681 assert(client_list);
3682 client_list_is_empty = cds_list_empty(&client_list->list);
3683 if (client_list_is_empty) {
3684 /*
3685 * No clients interested in the evaluation's result,
3686 * skip it.
3687 */
3688 goto put_list;
3689 }
3690
3691 ret = evaluate_buffer_condition(condition, &evaluation, state,
3692 previous_sample_available ? &previous_sample : NULL,
3693 &latest_sample,
3694 previous_session_consumed_total,
3695 latest_session_consumed_total,
3696 channel_info);
3697 if (caa_unlikely(ret)) {
3698 goto put_list;
3699 }
3700
3701 if (caa_likely(!evaluation)) {
3702 goto put_list;
3703 }
3704
3705 /* Dispatch evaluation result to all clients. */
3706 ret = send_evaluation_to_clients(trigger_list_element->trigger,
3707 evaluation, client_list, state,
3708 channel_info->session_info->uid,
3709 channel_info->session_info->gid);
3710 lttng_evaluation_destroy(evaluation);
3711 put_list:
3712 notification_client_list_put(client_list);
3713 if (caa_unlikely(ret)) {
3714 break;
3715 }
3716 }
3717 end_unlock:
3718 rcu_read_unlock();
3719 end:
3720 return ret;
3721 }
This page took 0.159514 seconds and 4 git commands to generate.