Fix: notification: deadlock on cmd_queue.lock and client->lock
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.c
1 /*
2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * SPDX-License-Identifier: GPL-2.0-only
5 *
6 */
7
8 #define _LGPL_SOURCE
9 #include <urcu.h>
10 #include <urcu/rculfhash.h>
11
12 #include <common/defaults.h>
13 #include <common/error.h>
14 #include <common/futex.h>
15 #include <common/unix.h>
16 #include <common/dynamic-buffer.h>
17 #include <common/hashtable/utils.h>
18 #include <common/sessiond-comm/sessiond-comm.h>
19 #include <common/macros.h>
20 #include <lttng/condition/condition.h>
21 #include <lttng/action/action-internal.h>
22 #include <lttng/notification/notification-internal.h>
23 #include <lttng/condition/condition-internal.h>
24 #include <lttng/condition/buffer-usage-internal.h>
25 #include <lttng/condition/session-consumed-size-internal.h>
26 #include <lttng/condition/session-rotation-internal.h>
27 #include <lttng/notification/channel-internal.h>
28
29 #include <time.h>
30 #include <unistd.h>
31 #include <assert.h>
32 #include <inttypes.h>
33 #include <fcntl.h>
34
35 #include "notification-thread.h"
36 #include "notification-thread-events.h"
37 #include "notification-thread-commands.h"
38 #include "lttng-sessiond.h"
39 #include "kernel.h"
40
/* Poll events covering input readiness, errors and hang-ups. */
#define CLIENT_POLL_MASK_IN \
	(LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
/* CLIENT_POLL_MASK_IN plus output readiness. */
#define CLIENT_POLL_MASK_IN_OUT \
	(CLIENT_POLL_MASK_IN | LPOLLOUT)
43
/* Kind of object a condition is bound to; see get_condition_binding_object(). */
enum lttng_object_type {
	/* The condition's type is not recognized. */
	LTTNG_OBJECT_TYPE_UNKNOWN,
	/* The condition is not bound to any object. */
	LTTNG_OBJECT_TYPE_NONE,
	/* Buffer usage and session consumed size conditions. */
	LTTNG_OBJECT_TYPE_CHANNEL,
	/* Session rotation (ongoing/completed) conditions. */
	LTTNG_OBJECT_TYPE_SESSION,
};
50
51 struct lttng_trigger_list_element {
52 /* No ownership of the trigger object is assumed. */
53 const struct lttng_trigger *trigger;
54 struct cds_list_head node;
55 };
56
57 struct lttng_channel_trigger_list {
58 struct channel_key channel_key;
59 /* List of struct lttng_trigger_list_element. */
60 struct cds_list_head list;
61 /* Node in the channel_triggers_ht */
62 struct cds_lfht_node channel_triggers_ht_node;
63 /* call_rcu delayed reclaim. */
64 struct rcu_head rcu_node;
65 };
66
67 /*
68 * List of triggers applying to a given session.
69 *
70 * See:
71 * - lttng_session_trigger_list_create()
72 * - lttng_session_trigger_list_build()
73 * - lttng_session_trigger_list_destroy()
74 * - lttng_session_trigger_list_add()
75 */
76 struct lttng_session_trigger_list {
77 /*
78 * Not owned by this; points to the session_info structure's
79 * session name.
80 */
81 const char *session_name;
82 /* List of struct lttng_trigger_list_element. */
83 struct cds_list_head list;
84 /* Node in the session_triggers_ht */
85 struct cds_lfht_node session_triggers_ht_node;
86 /*
87 * Weak reference to the notification system's session triggers
88 * hashtable.
89 *
90 * The session trigger list structure structure is owned by
91 * the session's session_info.
92 *
93 * The session_info is kept alive the the channel_infos holding a
94 * reference to it (reference counting). When those channels are
95 * destroyed (at runtime or on teardown), the reference they hold
96 * to the session_info are released. On destruction of session_info,
97 * session_info_destroy() will remove the list of triggers applying
98 * to this session from the notification system's state.
99 *
100 * This implies that the session_triggers_ht must be destroyed
101 * after the channels.
102 */
103 struct cds_lfht *session_triggers_ht;
104 /* Used for delayed RCU reclaim. */
105 struct rcu_head rcu_node;
106 };
107
108 struct lttng_trigger_ht_element {
109 struct lttng_trigger *trigger;
110 struct cds_lfht_node node;
111 /* call_rcu delayed reclaim. */
112 struct rcu_head rcu_node;
113 };
114
115 struct lttng_condition_list_element {
116 struct lttng_condition *condition;
117 struct cds_list_head node;
118 };
119
120 struct notification_client_list_element {
121 struct notification_client *client;
122 struct cds_list_head node;
123 };
124
125 /*
126 * Thread safety of notification_client and notification_client_list.
127 *
128 * The notification thread (main thread) and the action executor
129 * interact through client lists. Hence, when the action executor
130 * thread looks-up the list of clients subscribed to a given
131 * condition, it will acquire a reference to the list and lock it
132 * while attempting to communicate with the various clients.
133 *
134 * It is not necessary to reference-count clients as they are guaranteed
135 * to be 'alive' if they are present in a list and that list is locked. Indeed,
136 * removing references to the client from those subscription lists is part of
137 * the work performed on destruction of a client.
138 *
139 * No provision for other access scenarios are taken into account;
140 * this is the bare minimum to make these accesses safe and the
141 * notification thread's state is _not_ "thread-safe" in any general
142 * sense.
143 */
144 struct notification_client_list {
145 pthread_mutex_t lock;
146 struct urcu_ref ref;
147 const struct lttng_trigger *trigger;
148 struct cds_list_head list;
149 /* Weak reference to container. */
150 struct cds_lfht *notification_trigger_clients_ht;
151 struct cds_lfht_node notification_trigger_clients_ht_node;
152 /* call_rcu delayed reclaim. */
153 struct rcu_head rcu_node;
154 };
155
156 struct notification_client {
157 /* Nests within the notification_client_list lock. */
158 pthread_mutex_t lock;
159 notification_client_id id;
160 int socket;
161 /* Client protocol version. */
162 uint8_t major, minor;
163 uid_t uid;
164 gid_t gid;
165 /*
166 * Indicates if the credentials and versions of the client have been
167 * checked.
168 */
169 bool validated;
170 /*
171 * Conditions to which the client's notification channel is subscribed.
172 * List of struct lttng_condition_list_node. The condition member is
173 * owned by the client.
174 */
175 struct cds_list_head condition_list;
176 struct cds_lfht_node client_socket_ht_node;
177 struct cds_lfht_node client_id_ht_node;
178 struct {
179 /*
180 * If a client's communication is inactive, it means that a
181 * fatal error has occurred (could be either a protocol error or
182 * the socket API returned a fatal error). No further
183 * communication should be attempted; the client is queued for
184 * clean-up.
185 */
186 bool active;
187 struct {
188 /*
189 * During the reception of a message, the reception
190 * buffers' "size" is set to contain the current
191 * message's complete payload.
192 */
193 struct lttng_dynamic_buffer buffer;
194 /* Bytes left to receive for the current message. */
195 size_t bytes_to_receive;
196 /* Type of the message being received. */
197 enum lttng_notification_channel_message_type msg_type;
198 /*
199 * Indicates whether or not credentials are expected
200 * from the client.
201 */
202 bool expect_creds;
203 /*
204 * Indicates whether or not credentials were received
205 * from the client.
206 */
207 bool creds_received;
208 /* Only used during credentials reception. */
209 lttng_sock_cred creds;
210 } inbound;
211 struct {
212 /*
213 * Indicates whether or not a notification addressed to
214 * this client was dropped because a command reply was
215 * already buffered.
216 *
217 * A notification is dropped whenever the buffer is not
218 * empty.
219 */
220 bool dropped_notification;
221 /*
222 * Indicates whether or not a command reply is already
223 * buffered. In this case, it means that the client is
224 * not consuming command replies before emitting a new
225 * one. This could be caused by a protocol error or a
226 * misbehaving/malicious client.
227 */
228 bool queued_command_reply;
229 struct lttng_dynamic_buffer buffer;
230 } outbound;
231 } communication;
232 /* call_rcu delayed reclaim. */
233 struct rcu_head rcu_node;
234 };
235
236 struct channel_state_sample {
237 struct channel_key key;
238 struct cds_lfht_node channel_state_ht_node;
239 uint64_t highest_usage;
240 uint64_t lowest_usage;
241 uint64_t channel_total_consumed;
242 /* call_rcu delayed reclaim. */
243 struct rcu_head rcu_node;
244 };
245
/* Forward declarations of helpers used before their definition. */
static unsigned long hash_channel_key(struct channel_key *key);
static int evaluate_buffer_condition(
		const struct lttng_condition *condition,
		struct lttng_evaluation **evaluation,
		const struct notification_thread_state *state,
		const struct channel_state_sample *previous_sample,
		const struct channel_state_sample *latest_sample,
		uint64_t previous_session_consumed_total,
		uint64_t latest_session_consumed_total,
		struct channel_info *channel_info);
static int send_evaluation_to_clients(
		const struct lttng_trigger *trigger,
		const struct lttng_evaluation *evaluation,
		struct notification_client_list *client_list,
		struct notification_thread_state *state,
		uid_t channel_uid, gid_t channel_gid);


/* session_info API */
static void session_info_destroy(void *_data);
static void session_info_get(struct session_info *session_info);
static void session_info_put(struct session_info *session_info);
static struct session_info *session_info_create(
		const char *name,
		uid_t uid, gid_t gid,
		struct lttng_session_trigger_list *trigger_list,
		struct cds_lfht *sessions_ht);
static void session_info_add_channel(
		struct session_info *session_info,
		struct channel_info *channel_info);
static void session_info_remove_channel(
		struct session_info *session_info,
		struct channel_info *channel_info);

/* lttng_session_trigger_list API */
static struct lttng_session_trigger_list *lttng_session_trigger_list_create(
		const char *session_name,
		struct cds_lfht *session_triggers_ht);
static struct lttng_session_trigger_list *lttng_session_trigger_list_build(
		const struct notification_thread_state *state,
		const char *session_name);
static void lttng_session_trigger_list_destroy(
		struct lttng_session_trigger_list *list);
static int lttng_session_trigger_list_add(
		struct lttng_session_trigger_list *list,
		const struct lttng_trigger *trigger);
297
298
299 static
300 int match_client_socket(struct cds_lfht_node *node, const void *key)
301 {
302 /* This double-cast is intended to supress pointer-to-cast warning. */
303 const int socket = (int) (intptr_t) key;
304 const struct notification_client *client = caa_container_of(node,
305 struct notification_client, client_socket_ht_node);
306
307 return client->socket == socket;
308 }
309
310 static
311 int match_client_id(struct cds_lfht_node *node, const void *key)
312 {
313 /* This double-cast is intended to supress pointer-to-cast warning. */
314 const notification_client_id id = *((notification_client_id *) key);
315 const struct notification_client *client = caa_container_of(
316 node, struct notification_client, client_id_ht_node);
317
318 return client->id == id;
319 }
320
321 static
322 int match_channel_trigger_list(struct cds_lfht_node *node, const void *key)
323 {
324 struct channel_key *channel_key = (struct channel_key *) key;
325 struct lttng_channel_trigger_list *trigger_list;
326
327 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
328 channel_triggers_ht_node);
329
330 return !!((channel_key->key == trigger_list->channel_key.key) &&
331 (channel_key->domain == trigger_list->channel_key.domain));
332 }
333
334 static
335 int match_session_trigger_list(struct cds_lfht_node *node, const void *key)
336 {
337 const char *session_name = (const char *) key;
338 struct lttng_session_trigger_list *trigger_list;
339
340 trigger_list = caa_container_of(node, struct lttng_session_trigger_list,
341 session_triggers_ht_node);
342
343 return !!(strcmp(trigger_list->session_name, session_name) == 0);
344 }
345
346 static
347 int match_channel_state_sample(struct cds_lfht_node *node, const void *key)
348 {
349 struct channel_key *channel_key = (struct channel_key *) key;
350 struct channel_state_sample *sample;
351
352 sample = caa_container_of(node, struct channel_state_sample,
353 channel_state_ht_node);
354
355 return !!((channel_key->key == sample->key.key) &&
356 (channel_key->domain == sample->key.domain));
357 }
358
359 static
360 int match_channel_info(struct cds_lfht_node *node, const void *key)
361 {
362 struct channel_key *channel_key = (struct channel_key *) key;
363 struct channel_info *channel_info;
364
365 channel_info = caa_container_of(node, struct channel_info,
366 channels_ht_node);
367
368 return !!((channel_key->key == channel_info->key.key) &&
369 (channel_key->domain == channel_info->key.domain));
370 }
371
372 static
373 int match_condition(struct cds_lfht_node *node, const void *key)
374 {
375 struct lttng_condition *condition_key = (struct lttng_condition *) key;
376 struct lttng_trigger_ht_element *trigger;
377 struct lttng_condition *condition;
378
379 trigger = caa_container_of(node, struct lttng_trigger_ht_element,
380 node);
381 condition = lttng_trigger_get_condition(trigger->trigger);
382 assert(condition);
383
384 return !!lttng_condition_is_equal(condition_key, condition);
385 }
386
387 static
388 int match_client_list_condition(struct cds_lfht_node *node, const void *key)
389 {
390 struct lttng_condition *condition_key = (struct lttng_condition *) key;
391 struct notification_client_list *client_list;
392 const struct lttng_condition *condition;
393
394 assert(condition_key);
395
396 client_list = caa_container_of(node, struct notification_client_list,
397 notification_trigger_clients_ht_node);
398 condition = lttng_trigger_get_const_condition(client_list->trigger);
399
400 return !!lttng_condition_is_equal(condition_key, condition);
401 }
402
403 static
404 int match_session(struct cds_lfht_node *node, const void *key)
405 {
406 const char *name = key;
407 struct session_info *session_info = caa_container_of(
408 node, struct session_info, sessions_ht_node);
409
410 return !strcmp(session_info->name, name);
411 }
412
413 static
414 unsigned long lttng_condition_buffer_usage_hash(
415 const struct lttng_condition *_condition)
416 {
417 unsigned long hash;
418 unsigned long condition_type;
419 struct lttng_condition_buffer_usage *condition;
420
421 condition = container_of(_condition,
422 struct lttng_condition_buffer_usage, parent);
423
424 condition_type = (unsigned long) condition->parent.type;
425 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
426 if (condition->session_name) {
427 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
428 }
429 if (condition->channel_name) {
430 hash ^= hash_key_str(condition->channel_name, lttng_ht_seed);
431 }
432 if (condition->domain.set) {
433 hash ^= hash_key_ulong(
434 (void *) condition->domain.type,
435 lttng_ht_seed);
436 }
437 if (condition->threshold_ratio.set) {
438 uint64_t val;
439
440 val = condition->threshold_ratio.value * (double) UINT32_MAX;
441 hash ^= hash_key_u64(&val, lttng_ht_seed);
442 } else if (condition->threshold_bytes.set) {
443 uint64_t val;
444
445 val = condition->threshold_bytes.value;
446 hash ^= hash_key_u64(&val, lttng_ht_seed);
447 }
448 return hash;
449 }
450
451 static
452 unsigned long lttng_condition_session_consumed_size_hash(
453 const struct lttng_condition *_condition)
454 {
455 unsigned long hash;
456 unsigned long condition_type =
457 (unsigned long) LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE;
458 struct lttng_condition_session_consumed_size *condition;
459 uint64_t val;
460
461 condition = container_of(_condition,
462 struct lttng_condition_session_consumed_size, parent);
463
464 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
465 if (condition->session_name) {
466 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
467 }
468 val = condition->consumed_threshold_bytes.value;
469 hash ^= hash_key_u64(&val, lttng_ht_seed);
470 return hash;
471 }
472
473 static
474 unsigned long lttng_condition_session_rotation_hash(
475 const struct lttng_condition *_condition)
476 {
477 unsigned long hash, condition_type;
478 struct lttng_condition_session_rotation *condition;
479
480 condition = container_of(_condition,
481 struct lttng_condition_session_rotation, parent);
482 condition_type = (unsigned long) condition->parent.type;
483 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
484 assert(condition->session_name);
485 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
486 return hash;
487 }
488
489 /*
490 * The lttng_condition hashing code is kept in this file (rather than
491 * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
492 * don't want to link in liblttng-ctl.
493 */
494 static
495 unsigned long lttng_condition_hash(const struct lttng_condition *condition)
496 {
497 switch (condition->type) {
498 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
499 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
500 return lttng_condition_buffer_usage_hash(condition);
501 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
502 return lttng_condition_session_consumed_size_hash(condition);
503 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
504 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
505 return lttng_condition_session_rotation_hash(condition);
506 default:
507 ERR("[notification-thread] Unexpected condition type caught");
508 abort();
509 }
510 }
511
512 static
513 unsigned long hash_channel_key(struct channel_key *key)
514 {
515 unsigned long key_hash = hash_key_u64(&key->key, lttng_ht_seed);
516 unsigned long domain_hash = hash_key_ulong(
517 (void *) (unsigned long) key->domain, lttng_ht_seed);
518
519 return key_hash ^ domain_hash;
520 }
521
522 static
523 unsigned long hash_client_socket(int socket)
524 {
525 return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed);
526 }
527
528 static
529 unsigned long hash_client_id(notification_client_id id)
530 {
531 return hash_key_u64(&id, lttng_ht_seed);
532 }
533
534 /*
535 * Get the type of object to which a given condition applies. Bindings let
536 * the notification system evaluate a trigger's condition when a given
537 * object's state is updated.
538 *
539 * For instance, a condition bound to a channel will be evaluated everytime
540 * the channel's state is changed by a channel monitoring sample.
541 */
542 static
543 enum lttng_object_type get_condition_binding_object(
544 const struct lttng_condition *condition)
545 {
546 switch (lttng_condition_get_type(condition)) {
547 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
548 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
549 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
550 return LTTNG_OBJECT_TYPE_CHANNEL;
551 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
552 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
553 return LTTNG_OBJECT_TYPE_SESSION;
554 default:
555 return LTTNG_OBJECT_TYPE_UNKNOWN;
556 }
557 }
558
559 static
560 void free_channel_info_rcu(struct rcu_head *node)
561 {
562 free(caa_container_of(node, struct channel_info, rcu_node));
563 }
564
565 static
566 void channel_info_destroy(struct channel_info *channel_info)
567 {
568 if (!channel_info) {
569 return;
570 }
571
572 if (channel_info->session_info) {
573 session_info_remove_channel(channel_info->session_info,
574 channel_info);
575 session_info_put(channel_info->session_info);
576 }
577 if (channel_info->name) {
578 free(channel_info->name);
579 }
580 call_rcu(&channel_info->rcu_node, free_channel_info_rcu);
581 }
582
583 static
584 void free_session_info_rcu(struct rcu_head *node)
585 {
586 free(caa_container_of(node, struct session_info, rcu_node));
587 }
588
589 /* Don't call directly, use the ref-counting mechanism. */
590 static
591 void session_info_destroy(void *_data)
592 {
593 struct session_info *session_info = _data;
594 int ret;
595
596 assert(session_info);
597 if (session_info->channel_infos_ht) {
598 ret = cds_lfht_destroy(session_info->channel_infos_ht, NULL);
599 if (ret) {
600 ERR("[notification-thread] Failed to destroy channel information hash table");
601 }
602 }
603 lttng_session_trigger_list_destroy(session_info->trigger_list);
604
605 rcu_read_lock();
606 cds_lfht_del(session_info->sessions_ht,
607 &session_info->sessions_ht_node);
608 rcu_read_unlock();
609 free(session_info->name);
610 call_rcu(&session_info->rcu_node, free_session_info_rcu);
611 }
612
613 static
614 void session_info_get(struct session_info *session_info)
615 {
616 if (!session_info) {
617 return;
618 }
619 lttng_ref_get(&session_info->ref);
620 }
621
622 static
623 void session_info_put(struct session_info *session_info)
624 {
625 if (!session_info) {
626 return;
627 }
628 lttng_ref_put(&session_info->ref);
629 }
630
631 static
632 struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid,
633 struct lttng_session_trigger_list *trigger_list,
634 struct cds_lfht *sessions_ht)
635 {
636 struct session_info *session_info;
637
638 assert(name);
639
640 session_info = zmalloc(sizeof(*session_info));
641 if (!session_info) {
642 goto end;
643 }
644 lttng_ref_init(&session_info->ref, session_info_destroy);
645
646 session_info->channel_infos_ht = cds_lfht_new(DEFAULT_HT_SIZE,
647 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
648 if (!session_info->channel_infos_ht) {
649 goto error;
650 }
651
652 cds_lfht_node_init(&session_info->sessions_ht_node);
653 session_info->name = strdup(name);
654 if (!session_info->name) {
655 goto error;
656 }
657 session_info->uid = uid;
658 session_info->gid = gid;
659 session_info->trigger_list = trigger_list;
660 session_info->sessions_ht = sessions_ht;
661 end:
662 return session_info;
663 error:
664 session_info_put(session_info);
665 return NULL;
666 }
667
668 static
669 void session_info_add_channel(struct session_info *session_info,
670 struct channel_info *channel_info)
671 {
672 rcu_read_lock();
673 cds_lfht_add(session_info->channel_infos_ht,
674 hash_channel_key(&channel_info->key),
675 &channel_info->session_info_channels_ht_node);
676 rcu_read_unlock();
677 }
678
679 static
680 void session_info_remove_channel(struct session_info *session_info,
681 struct channel_info *channel_info)
682 {
683 rcu_read_lock();
684 cds_lfht_del(session_info->channel_infos_ht,
685 &channel_info->session_info_channels_ht_node);
686 rcu_read_unlock();
687 }
688
689 static
690 struct channel_info *channel_info_create(const char *channel_name,
691 struct channel_key *channel_key, uint64_t channel_capacity,
692 struct session_info *session_info)
693 {
694 struct channel_info *channel_info = zmalloc(sizeof(*channel_info));
695
696 if (!channel_info) {
697 goto end;
698 }
699
700 cds_lfht_node_init(&channel_info->channels_ht_node);
701 cds_lfht_node_init(&channel_info->session_info_channels_ht_node);
702 memcpy(&channel_info->key, channel_key, sizeof(*channel_key));
703 channel_info->capacity = channel_capacity;
704
705 channel_info->name = strdup(channel_name);
706 if (!channel_info->name) {
707 goto error;
708 }
709
710 /*
711 * Set the references between session and channel infos:
712 * - channel_info holds a strong reference to session_info
713 * - session_info holds a weak reference to channel_info
714 */
715 session_info_get(session_info);
716 session_info_add_channel(session_info, channel_info);
717 channel_info->session_info = session_info;
718 end:
719 return channel_info;
720 error:
721 channel_info_destroy(channel_info);
722 return NULL;
723 }
724
725 static
726 bool notification_client_list_get(struct notification_client_list *list)
727 {
728 return urcu_ref_get_unless_zero(&list->ref);
729 }
730
731 static
732 void free_notification_client_list_rcu(struct rcu_head *node)
733 {
734 free(caa_container_of(node, struct notification_client_list,
735 rcu_node));
736 }
737
738 static
739 void notification_client_list_release(struct urcu_ref *list_ref)
740 {
741 struct notification_client_list *list =
742 container_of(list_ref, typeof(*list), ref);
743 struct notification_client_list_element *client_list_element, *tmp;
744
745 if (list->notification_trigger_clients_ht) {
746 rcu_read_lock();
747 cds_lfht_del(list->notification_trigger_clients_ht,
748 &list->notification_trigger_clients_ht_node);
749 rcu_read_unlock();
750 list->notification_trigger_clients_ht = NULL;
751 }
752 cds_list_for_each_entry_safe(client_list_element, tmp,
753 &list->list, node) {
754 free(client_list_element);
755 }
756 pthread_mutex_destroy(&list->lock);
757 call_rcu(&list->rcu_node, free_notification_client_list_rcu);
758 }
759
760 static
761 struct notification_client_list *notification_client_list_create(
762 const struct lttng_trigger *trigger)
763 {
764 struct notification_client_list *client_list =
765 zmalloc(sizeof(*client_list));
766
767 if (!client_list) {
768 goto error;
769 }
770 pthread_mutex_init(&client_list->lock, NULL);
771 urcu_ref_init(&client_list->ref);
772 cds_lfht_node_init(&client_list->notification_trigger_clients_ht_node);
773 CDS_INIT_LIST_HEAD(&client_list->list);
774 client_list->trigger = trigger;
775 error:
776 return client_list;
777 }
778
779 static
780 void publish_notification_client_list(
781 struct notification_thread_state *state,
782 struct notification_client_list *list)
783 {
784 const struct lttng_condition *condition =
785 lttng_trigger_get_const_condition(list->trigger);
786
787 assert(!list->notification_trigger_clients_ht);
788
789 list->notification_trigger_clients_ht =
790 state->notification_trigger_clients_ht;
791
792 rcu_read_lock();
793 cds_lfht_add(state->notification_trigger_clients_ht,
794 lttng_condition_hash(condition),
795 &list->notification_trigger_clients_ht_node);
796 rcu_read_unlock();
797 }
798
799 static
800 void notification_client_list_put(struct notification_client_list *list)
801 {
802 if (!list) {
803 return;
804 }
805 return urcu_ref_put(&list->ref, notification_client_list_release);
806 }
807
808 /* Provides a reference to the returned list. */
809 static
810 struct notification_client_list *get_client_list_from_condition(
811 struct notification_thread_state *state,
812 const struct lttng_condition *condition)
813 {
814 struct cds_lfht_node *node;
815 struct cds_lfht_iter iter;
816 struct notification_client_list *list = NULL;
817
818 rcu_read_lock();
819 cds_lfht_lookup(state->notification_trigger_clients_ht,
820 lttng_condition_hash(condition),
821 match_client_list_condition,
822 condition,
823 &iter);
824 node = cds_lfht_iter_get_node(&iter);
825 if (node) {
826 list = container_of(node, struct notification_client_list,
827 notification_trigger_clients_ht_node);
828 list = notification_client_list_get(list) ? list : NULL;
829 }
830
831 rcu_read_unlock();
832 return list;
833 }
834
835 static
836 int evaluate_channel_condition_for_client(
837 const struct lttng_condition *condition,
838 struct notification_thread_state *state,
839 struct lttng_evaluation **evaluation,
840 uid_t *session_uid, gid_t *session_gid)
841 {
842 int ret;
843 struct cds_lfht_iter iter;
844 struct cds_lfht_node *node;
845 struct channel_info *channel_info = NULL;
846 struct channel_key *channel_key = NULL;
847 struct channel_state_sample *last_sample = NULL;
848 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
849
850 rcu_read_lock();
851
852 /* Find the channel associated with the condition. */
853 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter,
854 channel_trigger_list, channel_triggers_ht_node) {
855 struct lttng_trigger_list_element *element;
856
857 cds_list_for_each_entry(element, &channel_trigger_list->list, node) {
858 const struct lttng_condition *current_condition =
859 lttng_trigger_get_const_condition(
860 element->trigger);
861
862 assert(current_condition);
863 if (!lttng_condition_is_equal(condition,
864 current_condition)) {
865 continue;
866 }
867
868 /* Found the trigger, save the channel key. */
869 channel_key = &channel_trigger_list->channel_key;
870 break;
871 }
872 if (channel_key) {
873 /* The channel key was found stop iteration. */
874 break;
875 }
876 }
877
878 if (!channel_key){
879 /* No channel found; normal exit. */
880 DBG("[notification-thread] No known channel associated with newly subscribed-to condition");
881 ret = 0;
882 goto end;
883 }
884
885 /* Fetch channel info for the matching channel. */
886 cds_lfht_lookup(state->channels_ht,
887 hash_channel_key(channel_key),
888 match_channel_info,
889 channel_key,
890 &iter);
891 node = cds_lfht_iter_get_node(&iter);
892 assert(node);
893 channel_info = caa_container_of(node, struct channel_info,
894 channels_ht_node);
895
896 /* Retrieve the channel's last sample, if it exists. */
897 cds_lfht_lookup(state->channel_state_ht,
898 hash_channel_key(channel_key),
899 match_channel_state_sample,
900 channel_key,
901 &iter);
902 node = cds_lfht_iter_get_node(&iter);
903 if (node) {
904 last_sample = caa_container_of(node,
905 struct channel_state_sample,
906 channel_state_ht_node);
907 } else {
908 /* Nothing to evaluate, no sample was ever taken. Normal exit */
909 DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
910 ret = 0;
911 goto end;
912 }
913
914 ret = evaluate_buffer_condition(condition, evaluation, state,
915 NULL, last_sample,
916 0, channel_info->session_info->consumed_data_size,
917 channel_info);
918 if (ret) {
919 WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
920 goto end;
921 }
922
923 *session_uid = channel_info->session_info->uid;
924 *session_gid = channel_info->session_info->gid;
925 end:
926 rcu_read_unlock();
927 return ret;
928 }
929
930 static
931 const char *get_condition_session_name(const struct lttng_condition *condition)
932 {
933 const char *session_name = NULL;
934 enum lttng_condition_status status;
935
936 switch (lttng_condition_get_type(condition)) {
937 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
938 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
939 status = lttng_condition_buffer_usage_get_session_name(
940 condition, &session_name);
941 break;
942 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
943 status = lttng_condition_session_consumed_size_get_session_name(
944 condition, &session_name);
945 break;
946 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
947 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
948 status = lttng_condition_session_rotation_get_session_name(
949 condition, &session_name);
950 break;
951 default:
952 abort();
953 }
954 if (status != LTTNG_CONDITION_STATUS_OK) {
955 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
956 goto end;
957 }
958 end:
959 return session_name;
960 }
961
962 static
963 int evaluate_session_condition_for_client(
964 const struct lttng_condition *condition,
965 struct notification_thread_state *state,
966 struct lttng_evaluation **evaluation,
967 uid_t *session_uid, gid_t *session_gid)
968 {
969 int ret;
970 struct cds_lfht_iter iter;
971 struct cds_lfht_node *node;
972 const char *session_name;
973 struct session_info *session_info = NULL;
974
975 rcu_read_lock();
976 session_name = get_condition_session_name(condition);
977
978 /* Find the session associated with the trigger. */
979 cds_lfht_lookup(state->sessions_ht,
980 hash_key_str(session_name, lttng_ht_seed),
981 match_session,
982 session_name,
983 &iter);
984 node = cds_lfht_iter_get_node(&iter);
985 if (!node) {
986 DBG("[notification-thread] No known session matching name \"%s\"",
987 session_name);
988 ret = 0;
989 goto end;
990 }
991
992 session_info = caa_container_of(node, struct session_info,
993 sessions_ht_node);
994 session_info_get(session_info);
995
996 /*
997 * Evaluation is performed in-line here since only one type of
998 * session-bound condition is handled for the moment.
999 */
1000 switch (lttng_condition_get_type(condition)) {
1001 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1002 if (!session_info->rotation.ongoing) {
1003 ret = 0;
1004 goto end_session_put;
1005 }
1006
1007 *evaluation = lttng_evaluation_session_rotation_ongoing_create(
1008 session_info->rotation.id);
1009 if (!*evaluation) {
1010 /* Fatal error. */
1011 ERR("[notification-thread] Failed to create session rotation ongoing evaluation for session \"%s\"",
1012 session_info->name);
1013 ret = -1;
1014 goto end_session_put;
1015 }
1016 ret = 0;
1017 break;
1018 default:
1019 ret = 0;
1020 goto end_session_put;
1021 }
1022
1023 *session_uid = session_info->uid;
1024 *session_gid = session_info->gid;
1025
1026 end_session_put:
1027 session_info_put(session_info);
1028 end:
1029 rcu_read_unlock();
1030 return ret;
1031 }
1032
1033 static
1034 int evaluate_condition_for_client(const struct lttng_trigger *trigger,
1035 const struct lttng_condition *condition,
1036 struct notification_client *client,
1037 struct notification_thread_state *state)
1038 {
1039 int ret;
1040 struct lttng_evaluation *evaluation = NULL;
1041 struct notification_client_list client_list = {
1042 .lock = PTHREAD_MUTEX_INITIALIZER,
1043 };
1044 struct notification_client_list_element client_list_element = { 0 };
1045 uid_t object_uid = 0;
1046 gid_t object_gid = 0;
1047
1048 assert(trigger);
1049 assert(condition);
1050 assert(client);
1051 assert(state);
1052
1053 switch (get_condition_binding_object(condition)) {
1054 case LTTNG_OBJECT_TYPE_SESSION:
1055 ret = evaluate_session_condition_for_client(condition, state,
1056 &evaluation, &object_uid, &object_gid);
1057 break;
1058 case LTTNG_OBJECT_TYPE_CHANNEL:
1059 ret = evaluate_channel_condition_for_client(condition, state,
1060 &evaluation, &object_uid, &object_gid);
1061 break;
1062 case LTTNG_OBJECT_TYPE_NONE:
1063 ret = 0;
1064 goto end;
1065 case LTTNG_OBJECT_TYPE_UNKNOWN:
1066 default:
1067 ret = -1;
1068 goto end;
1069 }
1070 if (ret) {
1071 /* Fatal error. */
1072 goto end;
1073 }
1074 if (!evaluation) {
1075 /* Evaluation yielded nothing. Normal exit. */
1076 DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
1077 ret = 0;
1078 goto end;
1079 }
1080
1081 /*
1082 * Create a temporary client list with the client currently
1083 * subscribing.
1084 */
1085 cds_lfht_node_init(&client_list.notification_trigger_clients_ht_node);
1086 CDS_INIT_LIST_HEAD(&client_list.list);
1087 client_list.trigger = trigger;
1088
1089 CDS_INIT_LIST_HEAD(&client_list_element.node);
1090 client_list_element.client = client;
1091 cds_list_add(&client_list_element.node, &client_list.list);
1092
1093 /* Send evaluation result to the newly-subscribed client. */
1094 DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
1095 ret = send_evaluation_to_clients(trigger, evaluation, &client_list,
1096 state, object_uid, object_gid);
1097
1098 end:
1099 return ret;
1100 }
1101
1102 static
1103 int notification_thread_client_subscribe(struct notification_client *client,
1104 struct lttng_condition *condition,
1105 struct notification_thread_state *state,
1106 enum lttng_notification_channel_status *_status)
1107 {
1108 int ret = 0;
1109 struct notification_client_list *client_list = NULL;
1110 struct lttng_condition_list_element *condition_list_element = NULL;
1111 struct notification_client_list_element *client_list_element = NULL;
1112 enum lttng_notification_channel_status status =
1113 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1114
1115 /*
1116 * Ensure that the client has not already subscribed to this condition
1117 * before.
1118 */
1119 cds_list_for_each_entry(condition_list_element, &client->condition_list, node) {
1120 if (lttng_condition_is_equal(condition_list_element->condition,
1121 condition)) {
1122 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED;
1123 goto end;
1124 }
1125 }
1126
1127 condition_list_element = zmalloc(sizeof(*condition_list_element));
1128 if (!condition_list_element) {
1129 ret = -1;
1130 goto error;
1131 }
1132 client_list_element = zmalloc(sizeof(*client_list_element));
1133 if (!client_list_element) {
1134 ret = -1;
1135 goto error;
1136 }
1137
1138 /*
1139 * Add the newly-subscribed condition to the client's subscription list.
1140 */
1141 CDS_INIT_LIST_HEAD(&condition_list_element->node);
1142 condition_list_element->condition = condition;
1143 cds_list_add(&condition_list_element->node, &client->condition_list);
1144
1145 client_list = get_client_list_from_condition(state, condition);
1146 if (!client_list) {
1147 /*
1148 * No notification-emiting trigger registered with this
1149 * condition. We don't evaluate the condition right away
1150 * since this trigger is not registered yet.
1151 */
1152 free(client_list_element);
1153 goto end;
1154 }
1155
1156 /*
1157 * The condition to which the client just subscribed is evaluated
1158 * at this point so that conditions that are already TRUE result
1159 * in a notification being sent out.
1160 *
1161 * The client_list's trigger is used without locking the list itself.
1162 * This is correct since the list doesn't own the trigger and the
1163 * object is immutable.
1164 */
1165 if (evaluate_condition_for_client(client_list->trigger, condition,
1166 client, state)) {
1167 WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
1168 ret = -1;
1169 free(client_list_element);
1170 goto end;
1171 }
1172
1173 /*
1174 * Add the client to the list of clients interested in a given trigger
1175 * if a "notification" trigger with a corresponding condition was
1176 * added prior.
1177 */
1178 client_list_element->client = client;
1179 CDS_INIT_LIST_HEAD(&client_list_element->node);
1180
1181 pthread_mutex_lock(&client_list->lock);
1182 cds_list_add(&client_list_element->node, &client_list->list);
1183 pthread_mutex_unlock(&client_list->lock);
1184 end:
1185 if (_status) {
1186 *_status = status;
1187 }
1188 if (client_list) {
1189 notification_client_list_put(client_list);
1190 }
1191 return ret;
1192 error:
1193 free(condition_list_element);
1194 free(client_list_element);
1195 return ret;
1196 }
1197
1198 static
1199 int notification_thread_client_unsubscribe(
1200 struct notification_client *client,
1201 struct lttng_condition *condition,
1202 struct notification_thread_state *state,
1203 enum lttng_notification_channel_status *_status)
1204 {
1205 struct notification_client_list *client_list;
1206 struct lttng_condition_list_element *condition_list_element,
1207 *condition_tmp;
1208 struct notification_client_list_element *client_list_element,
1209 *client_tmp;
1210 bool condition_found = false;
1211 enum lttng_notification_channel_status status =
1212 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1213
1214 /* Remove the condition from the client's condition list. */
1215 cds_list_for_each_entry_safe(condition_list_element, condition_tmp,
1216 &client->condition_list, node) {
1217 if (!lttng_condition_is_equal(condition_list_element->condition,
1218 condition)) {
1219 continue;
1220 }
1221
1222 cds_list_del(&condition_list_element->node);
1223 /*
1224 * The caller may be iterating on the client's conditions to
1225 * tear down a client's connection. In this case, the condition
1226 * will be destroyed at the end.
1227 */
1228 if (condition != condition_list_element->condition) {
1229 lttng_condition_destroy(
1230 condition_list_element->condition);
1231 }
1232 free(condition_list_element);
1233 condition_found = true;
1234 break;
1235 }
1236
1237 if (!condition_found) {
1238 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION;
1239 goto end;
1240 }
1241
1242 /*
1243 * Remove the client from the list of clients interested the trigger
1244 * matching the condition.
1245 */
1246 client_list = get_client_list_from_condition(state, condition);
1247 if (!client_list) {
1248 goto end;
1249 }
1250
1251 pthread_mutex_lock(&client_list->lock);
1252 cds_list_for_each_entry_safe(client_list_element, client_tmp,
1253 &client_list->list, node) {
1254 if (client_list_element->client->id != client->id) {
1255 continue;
1256 }
1257 cds_list_del(&client_list_element->node);
1258 free(client_list_element);
1259 break;
1260 }
1261 pthread_mutex_unlock(&client_list->lock);
1262 notification_client_list_put(client_list);
1263 client_list = NULL;
1264 end:
1265 lttng_condition_destroy(condition);
1266 if (_status) {
1267 *_status = status;
1268 }
1269 return 0;
1270 }
1271
1272 static
1273 void free_notification_client_rcu(struct rcu_head *node)
1274 {
1275 free(caa_container_of(node, struct notification_client, rcu_node));
1276 }
1277
1278 static
1279 void notification_client_destroy(struct notification_client *client,
1280 struct notification_thread_state *state)
1281 {
1282 if (!client) {
1283 return;
1284 }
1285
1286 /*
1287 * The client object is not reachable by other threads, no need to lock
1288 * the client here.
1289 */
1290 if (client->socket >= 0) {
1291 (void) lttcomm_close_unix_sock(client->socket);
1292 client->socket = -1;
1293 }
1294 client->communication.active = false;
1295 lttng_dynamic_buffer_reset(&client->communication.inbound.buffer);
1296 lttng_dynamic_buffer_reset(&client->communication.outbound.buffer);
1297 pthread_mutex_destroy(&client->lock);
1298 call_rcu(&client->rcu_node, free_notification_client_rcu);
1299 }
1300
1301 /*
1302 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1303 * client pointer).
1304 */
1305 static
1306 struct notification_client *get_client_from_socket(int socket,
1307 struct notification_thread_state *state)
1308 {
1309 struct cds_lfht_iter iter;
1310 struct cds_lfht_node *node;
1311 struct notification_client *client = NULL;
1312
1313 cds_lfht_lookup(state->client_socket_ht,
1314 hash_client_socket(socket),
1315 match_client_socket,
1316 (void *) (unsigned long) socket,
1317 &iter);
1318 node = cds_lfht_iter_get_node(&iter);
1319 if (!node) {
1320 goto end;
1321 }
1322
1323 client = caa_container_of(node, struct notification_client,
1324 client_socket_ht_node);
1325 end:
1326 return client;
1327 }
1328
1329 static
1330 bool buffer_usage_condition_applies_to_channel(
1331 const struct lttng_condition *condition,
1332 const struct channel_info *channel_info)
1333 {
1334 enum lttng_condition_status status;
1335 enum lttng_domain_type condition_domain;
1336 const char *condition_session_name = NULL;
1337 const char *condition_channel_name = NULL;
1338
1339 status = lttng_condition_buffer_usage_get_domain_type(condition,
1340 &condition_domain);
1341 assert(status == LTTNG_CONDITION_STATUS_OK);
1342 if (channel_info->key.domain != condition_domain) {
1343 goto fail;
1344 }
1345
1346 status = lttng_condition_buffer_usage_get_session_name(
1347 condition, &condition_session_name);
1348 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1349
1350 status = lttng_condition_buffer_usage_get_channel_name(
1351 condition, &condition_channel_name);
1352 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_channel_name);
1353
1354 if (strcmp(channel_info->session_info->name, condition_session_name)) {
1355 goto fail;
1356 }
1357 if (strcmp(channel_info->name, condition_channel_name)) {
1358 goto fail;
1359 }
1360
1361 return true;
1362 fail:
1363 return false;
1364 }
1365
1366 static
1367 bool session_consumed_size_condition_applies_to_channel(
1368 const struct lttng_condition *condition,
1369 const struct channel_info *channel_info)
1370 {
1371 enum lttng_condition_status status;
1372 const char *condition_session_name = NULL;
1373
1374 status = lttng_condition_session_consumed_size_get_session_name(
1375 condition, &condition_session_name);
1376 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1377
1378 if (strcmp(channel_info->session_info->name, condition_session_name)) {
1379 goto fail;
1380 }
1381
1382 return true;
1383 fail:
1384 return false;
1385 }
1386
1387 static
1388 bool trigger_applies_to_channel(const struct lttng_trigger *trigger,
1389 const struct channel_info *channel_info)
1390 {
1391 const struct lttng_condition *condition;
1392 bool trigger_applies;
1393
1394 condition = lttng_trigger_get_const_condition(trigger);
1395 if (!condition) {
1396 goto fail;
1397 }
1398
1399 switch (lttng_condition_get_type(condition)) {
1400 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1401 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1402 trigger_applies = buffer_usage_condition_applies_to_channel(
1403 condition, channel_info);
1404 break;
1405 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
1406 trigger_applies = session_consumed_size_condition_applies_to_channel(
1407 condition, channel_info);
1408 break;
1409 default:
1410 goto fail;
1411 }
1412
1413 return trigger_applies;
1414 fail:
1415 return false;
1416 }
1417
1418 static
1419 bool trigger_applies_to_client(struct lttng_trigger *trigger,
1420 struct notification_client *client)
1421 {
1422 bool applies = false;
1423 struct lttng_condition_list_element *condition_list_element;
1424
1425 cds_list_for_each_entry(condition_list_element, &client->condition_list,
1426 node) {
1427 applies = lttng_condition_is_equal(
1428 condition_list_element->condition,
1429 lttng_trigger_get_condition(trigger));
1430 if (applies) {
1431 break;
1432 }
1433 }
1434 return applies;
1435 }
1436
1437 /* Must be called with RCU read lock held. */
1438 static
1439 struct lttng_session_trigger_list *get_session_trigger_list(
1440 struct notification_thread_state *state,
1441 const char *session_name)
1442 {
1443 struct lttng_session_trigger_list *list = NULL;
1444 struct cds_lfht_node *node;
1445 struct cds_lfht_iter iter;
1446
1447 cds_lfht_lookup(state->session_triggers_ht,
1448 hash_key_str(session_name, lttng_ht_seed),
1449 match_session_trigger_list,
1450 session_name,
1451 &iter);
1452 node = cds_lfht_iter_get_node(&iter);
1453 if (!node) {
1454 /*
1455 * Not an error, the list of triggers applying to that session
1456 * will be initialized when the session is created.
1457 */
1458 DBG("[notification-thread] No trigger list found for session \"%s\" as it is not yet known to the notification system",
1459 session_name);
1460 goto end;
1461 }
1462
1463 list = caa_container_of(node,
1464 struct lttng_session_trigger_list,
1465 session_triggers_ht_node);
1466 end:
1467 return list;
1468 }
1469
1470 /*
1471 * Allocate an empty lttng_session_trigger_list for the session named
1472 * 'session_name'.
1473 *
1474 * No ownership of 'session_name' is assumed by the session trigger list.
1475 * It is the caller's responsability to ensure the session name is alive
1476 * for as long as this list is.
1477 */
1478 static
1479 struct lttng_session_trigger_list *lttng_session_trigger_list_create(
1480 const char *session_name,
1481 struct cds_lfht *session_triggers_ht)
1482 {
1483 struct lttng_session_trigger_list *list;
1484
1485 list = zmalloc(sizeof(*list));
1486 if (!list) {
1487 goto end;
1488 }
1489 list->session_name = session_name;
1490 CDS_INIT_LIST_HEAD(&list->list);
1491 cds_lfht_node_init(&list->session_triggers_ht_node);
1492 list->session_triggers_ht = session_triggers_ht;
1493
1494 rcu_read_lock();
1495 /* Publish the list through the session_triggers_ht. */
1496 cds_lfht_add(session_triggers_ht,
1497 hash_key_str(session_name, lttng_ht_seed),
1498 &list->session_triggers_ht_node);
1499 rcu_read_unlock();
1500 end:
1501 return list;
1502 }
1503
1504 static
1505 void free_session_trigger_list_rcu(struct rcu_head *node)
1506 {
1507 free(caa_container_of(node, struct lttng_session_trigger_list,
1508 rcu_node));
1509 }
1510
1511 static
1512 void lttng_session_trigger_list_destroy(struct lttng_session_trigger_list *list)
1513 {
1514 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1515
1516 /* Empty the list element by element, and then free the list itself. */
1517 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1518 &list->list, node) {
1519 cds_list_del(&trigger_list_element->node);
1520 free(trigger_list_element);
1521 }
1522 rcu_read_lock();
1523 /* Unpublish the list from the session_triggers_ht. */
1524 cds_lfht_del(list->session_triggers_ht,
1525 &list->session_triggers_ht_node);
1526 rcu_read_unlock();
1527 call_rcu(&list->rcu_node, free_session_trigger_list_rcu);
1528 }
1529
1530 static
1531 int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
1532 const struct lttng_trigger *trigger)
1533 {
1534 int ret = 0;
1535 struct lttng_trigger_list_element *new_element =
1536 zmalloc(sizeof(*new_element));
1537
1538 if (!new_element) {
1539 ret = -1;
1540 goto end;
1541 }
1542 CDS_INIT_LIST_HEAD(&new_element->node);
1543 new_element->trigger = trigger;
1544 cds_list_add(&new_element->node, &list->list);
1545 end:
1546 return ret;
1547 }
1548
1549 static
1550 bool trigger_applies_to_session(const struct lttng_trigger *trigger,
1551 const char *session_name)
1552 {
1553 bool applies = false;
1554 const struct lttng_condition *condition;
1555
1556 condition = lttng_trigger_get_const_condition(trigger);
1557 switch (lttng_condition_get_type(condition)) {
1558 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1559 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
1560 {
1561 enum lttng_condition_status condition_status;
1562 const char *condition_session_name;
1563
1564 condition_status = lttng_condition_session_rotation_get_session_name(
1565 condition, &condition_session_name);
1566 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
1567 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
1568 goto end;
1569 }
1570
1571 assert(condition_session_name);
1572 applies = !strcmp(condition_session_name, session_name);
1573 break;
1574 }
1575 default:
1576 goto end;
1577 }
1578 end:
1579 return applies;
1580 }
1581
1582 /*
1583 * Allocate and initialize an lttng_session_trigger_list which contains
1584 * all triggers that apply to the session named 'session_name'.
1585 *
1586 * No ownership of 'session_name' is assumed by the session trigger list.
1587 * It is the caller's responsability to ensure the session name is alive
1588 * for as long as this list is.
1589 */
1590 static
1591 struct lttng_session_trigger_list *lttng_session_trigger_list_build(
1592 const struct notification_thread_state *state,
1593 const char *session_name)
1594 {
1595 int trigger_count = 0;
1596 struct lttng_session_trigger_list *session_trigger_list = NULL;
1597 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1598 struct cds_lfht_iter iter;
1599
1600 session_trigger_list = lttng_session_trigger_list_create(session_name,
1601 state->session_triggers_ht);
1602
1603 /* Add all triggers applying to the session named 'session_name'. */
1604 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1605 node) {
1606 int ret;
1607
1608 if (!trigger_applies_to_session(trigger_ht_element->trigger,
1609 session_name)) {
1610 continue;
1611 }
1612
1613 ret = lttng_session_trigger_list_add(session_trigger_list,
1614 trigger_ht_element->trigger);
1615 if (ret) {
1616 goto error;
1617 }
1618
1619 trigger_count++;
1620 }
1621
1622 DBG("[notification-thread] Found %i triggers that apply to newly created session",
1623 trigger_count);
1624 return session_trigger_list;
1625 error:
1626 lttng_session_trigger_list_destroy(session_trigger_list);
1627 return NULL;
1628 }
1629
1630 static
1631 struct session_info *find_or_create_session_info(
1632 struct notification_thread_state *state,
1633 const char *name, uid_t uid, gid_t gid)
1634 {
1635 struct session_info *session = NULL;
1636 struct cds_lfht_node *node;
1637 struct cds_lfht_iter iter;
1638 struct lttng_session_trigger_list *trigger_list;
1639
1640 rcu_read_lock();
1641 cds_lfht_lookup(state->sessions_ht,
1642 hash_key_str(name, lttng_ht_seed),
1643 match_session,
1644 name,
1645 &iter);
1646 node = cds_lfht_iter_get_node(&iter);
1647 if (node) {
1648 DBG("[notification-thread] Found session info of session \"%s\" (uid = %i, gid = %i)",
1649 name, uid, gid);
1650 session = caa_container_of(node, struct session_info,
1651 sessions_ht_node);
1652 assert(session->uid == uid);
1653 assert(session->gid == gid);
1654 session_info_get(session);
1655 goto end;
1656 }
1657
1658 trigger_list = lttng_session_trigger_list_build(state, name);
1659 if (!trigger_list) {
1660 goto error;
1661 }
1662
1663 session = session_info_create(name, uid, gid, trigger_list,
1664 state->sessions_ht);
1665 if (!session) {
1666 ERR("[notification-thread] Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)",
1667 name, uid, gid);
1668 lttng_session_trigger_list_destroy(trigger_list);
1669 goto error;
1670 }
1671 trigger_list = NULL;
1672
1673 cds_lfht_add(state->sessions_ht, hash_key_str(name, lttng_ht_seed),
1674 &session->sessions_ht_node);
1675 end:
1676 rcu_read_unlock();
1677 return session;
1678 error:
1679 rcu_read_unlock();
1680 session_info_put(session);
1681 return NULL;
1682 }
1683
1684 static
1685 int handle_notification_thread_command_add_channel(
1686 struct notification_thread_state *state,
1687 const char *session_name, uid_t session_uid, gid_t session_gid,
1688 const char *channel_name, enum lttng_domain_type channel_domain,
1689 uint64_t channel_key_int, uint64_t channel_capacity,
1690 enum lttng_error_code *cmd_result)
1691 {
1692 struct cds_list_head trigger_list;
1693 struct channel_info *new_channel_info = NULL;
1694 struct channel_key channel_key = {
1695 .key = channel_key_int,
1696 .domain = channel_domain,
1697 };
1698 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
1699 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1700 int trigger_count = 0;
1701 struct cds_lfht_iter iter;
1702 struct session_info *session_info = NULL;
1703
1704 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
1705 channel_name, session_name, channel_key_int,
1706 channel_domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
1707
1708 CDS_INIT_LIST_HEAD(&trigger_list);
1709
1710 session_info = find_or_create_session_info(state, session_name,
1711 session_uid, session_gid);
1712 if (!session_info) {
1713 /* Allocation error or an internal error occurred. */
1714 goto error;
1715 }
1716
1717 new_channel_info = channel_info_create(channel_name, &channel_key,
1718 channel_capacity, session_info);
1719 if (!new_channel_info) {
1720 goto error;
1721 }
1722
1723 rcu_read_lock();
1724 /* Build a list of all triggers applying to the new channel. */
1725 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1726 node) {
1727 struct lttng_trigger_list_element *new_element;
1728
1729 if (!trigger_applies_to_channel(trigger_ht_element->trigger,
1730 new_channel_info)) {
1731 continue;
1732 }
1733
1734 new_element = zmalloc(sizeof(*new_element));
1735 if (!new_element) {
1736 rcu_read_unlock();
1737 goto error;
1738 }
1739 CDS_INIT_LIST_HEAD(&new_element->node);
1740 new_element->trigger = trigger_ht_element->trigger;
1741 cds_list_add(&new_element->node, &trigger_list);
1742 trigger_count++;
1743 }
1744 rcu_read_unlock();
1745
1746 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
1747 trigger_count);
1748 channel_trigger_list = zmalloc(sizeof(*channel_trigger_list));
1749 if (!channel_trigger_list) {
1750 goto error;
1751 }
1752 channel_trigger_list->channel_key = new_channel_info->key;
1753 CDS_INIT_LIST_HEAD(&channel_trigger_list->list);
1754 cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
1755 cds_list_splice(&trigger_list, &channel_trigger_list->list);
1756
1757 rcu_read_lock();
1758 /* Add channel to the channel_ht which owns the channel_infos. */
1759 cds_lfht_add(state->channels_ht,
1760 hash_channel_key(&new_channel_info->key),
1761 &new_channel_info->channels_ht_node);
1762 /*
1763 * Add the list of triggers associated with this channel to the
1764 * channel_triggers_ht.
1765 */
1766 cds_lfht_add(state->channel_triggers_ht,
1767 hash_channel_key(&new_channel_info->key),
1768 &channel_trigger_list->channel_triggers_ht_node);
1769 rcu_read_unlock();
1770 session_info_put(session_info);
1771 *cmd_result = LTTNG_OK;
1772 return 0;
1773 error:
1774 channel_info_destroy(new_channel_info);
1775 session_info_put(session_info);
1776 return 1;
1777 }
1778
1779 static
1780 void free_channel_trigger_list_rcu(struct rcu_head *node)
1781 {
1782 free(caa_container_of(node, struct lttng_channel_trigger_list,
1783 rcu_node));
1784 }
1785
1786 static
1787 void free_channel_state_sample_rcu(struct rcu_head *node)
1788 {
1789 free(caa_container_of(node, struct channel_state_sample,
1790 rcu_node));
1791 }
1792
1793 static
1794 int handle_notification_thread_command_remove_channel(
1795 struct notification_thread_state *state,
1796 uint64_t channel_key, enum lttng_domain_type domain,
1797 enum lttng_error_code *cmd_result)
1798 {
1799 struct cds_lfht_node *node;
1800 struct cds_lfht_iter iter;
1801 struct lttng_channel_trigger_list *trigger_list;
1802 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1803 struct channel_key key = { .key = channel_key, .domain = domain };
1804 struct channel_info *channel_info;
1805
1806 DBG("[notification-thread] Removing channel key = %" PRIu64 " in %s domain",
1807 channel_key, domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
1808
1809 rcu_read_lock();
1810
1811 cds_lfht_lookup(state->channel_triggers_ht,
1812 hash_channel_key(&key),
1813 match_channel_trigger_list,
1814 &key,
1815 &iter);
1816 node = cds_lfht_iter_get_node(&iter);
1817 /*
1818 * There is a severe internal error if we are being asked to remove a
1819 * channel that doesn't exist.
1820 */
1821 if (!node) {
1822 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
1823 goto end;
1824 }
1825
1826 /* Free the list of triggers associated with this channel. */
1827 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
1828 channel_triggers_ht_node);
1829 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1830 &trigger_list->list, node) {
1831 cds_list_del(&trigger_list_element->node);
1832 free(trigger_list_element);
1833 }
1834 cds_lfht_del(state->channel_triggers_ht, node);
1835 call_rcu(&trigger_list->rcu_node, free_channel_trigger_list_rcu);
1836
1837 /* Free sampled channel state. */
1838 cds_lfht_lookup(state->channel_state_ht,
1839 hash_channel_key(&key),
1840 match_channel_state_sample,
1841 &key,
1842 &iter);
1843 node = cds_lfht_iter_get_node(&iter);
1844 /*
1845 * This is expected to be NULL if the channel is destroyed before we
1846 * received a sample.
1847 */
1848 if (node) {
1849 struct channel_state_sample *sample = caa_container_of(node,
1850 struct channel_state_sample,
1851 channel_state_ht_node);
1852
1853 cds_lfht_del(state->channel_state_ht, node);
1854 call_rcu(&sample->rcu_node, free_channel_state_sample_rcu);
1855 }
1856
1857 /* Remove the channel from the channels_ht and free it. */
1858 cds_lfht_lookup(state->channels_ht,
1859 hash_channel_key(&key),
1860 match_channel_info,
1861 &key,
1862 &iter);
1863 node = cds_lfht_iter_get_node(&iter);
1864 assert(node);
1865 channel_info = caa_container_of(node, struct channel_info,
1866 channels_ht_node);
1867 cds_lfht_del(state->channels_ht, node);
1868 channel_info_destroy(channel_info);
1869 end:
1870 rcu_read_unlock();
1871 *cmd_result = LTTNG_OK;
1872 return 0;
1873 }
1874
1875 static
1876 int handle_notification_thread_command_session_rotation(
1877 struct notification_thread_state *state,
1878 enum notification_thread_command_type cmd_type,
1879 const char *session_name, uid_t session_uid, gid_t session_gid,
1880 uint64_t trace_archive_chunk_id,
1881 struct lttng_trace_archive_location *location,
1882 enum lttng_error_code *_cmd_result)
1883 {
1884 int ret = 0;
1885 enum lttng_error_code cmd_result = LTTNG_OK;
1886 struct lttng_session_trigger_list *trigger_list;
1887 struct lttng_trigger_list_element *trigger_list_element;
1888 struct session_info *session_info;
1889
1890 rcu_read_lock();
1891
1892 session_info = find_or_create_session_info(state, session_name,
1893 session_uid, session_gid);
1894 if (!session_info) {
1895 /* Allocation error or an internal error occurred. */
1896 ret = -1;
1897 cmd_result = LTTNG_ERR_NOMEM;
1898 goto end;
1899 }
1900
1901 session_info->rotation.ongoing =
1902 cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING;
1903 session_info->rotation.id = trace_archive_chunk_id;
1904 trigger_list = get_session_trigger_list(state, session_name);
1905 if (!trigger_list) {
1906 DBG("[notification-thread] No triggers applying to session \"%s\" found",
1907 session_name);
1908 goto end;
1909 }
1910
1911 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
1912 node) {
1913 const struct lttng_condition *condition;
1914 const struct lttng_action *action;
1915 const struct lttng_trigger *trigger;
1916 struct notification_client_list *client_list;
1917 struct lttng_evaluation *evaluation = NULL;
1918 enum lttng_condition_type condition_type;
1919 bool client_list_is_empty;
1920
1921 trigger = trigger_list_element->trigger;
1922 condition = lttng_trigger_get_const_condition(trigger);
1923 assert(condition);
1924 condition_type = lttng_condition_get_type(condition);
1925
1926 if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING &&
1927 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1928 continue;
1929 } else if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED &&
1930 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED) {
1931 continue;
1932 }
1933
1934 action = lttng_trigger_get_const_action(trigger);
1935
1936 /* Notify actions are the only type currently supported. */
1937 assert(lttng_action_get_type_const(action) ==
1938 LTTNG_ACTION_TYPE_NOTIFY);
1939
1940 client_list = get_client_list_from_condition(state, condition);
1941 assert(client_list);
1942
1943 pthread_mutex_lock(&client_list->lock);
1944 client_list_is_empty = cds_list_empty(&client_list->list);
1945 pthread_mutex_unlock(&client_list->lock);
1946 if (client_list_is_empty) {
1947 /*
1948 * No clients interested in the evaluation's result,
1949 * skip it.
1950 */
1951 continue;
1952 }
1953
1954 if (cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1955 evaluation = lttng_evaluation_session_rotation_ongoing_create(
1956 trace_archive_chunk_id);
1957 } else {
1958 evaluation = lttng_evaluation_session_rotation_completed_create(
1959 trace_archive_chunk_id, location);
1960 }
1961
1962 if (!evaluation) {
1963 /* Internal error */
1964 ret = -1;
1965 cmd_result = LTTNG_ERR_UNK;
1966 goto put_list;
1967 }
1968
1969 /* Dispatch evaluation result to all clients. */
1970 ret = send_evaluation_to_clients(trigger_list_element->trigger,
1971 evaluation, client_list, state,
1972 session_info->uid,
1973 session_info->gid);
1974 lttng_evaluation_destroy(evaluation);
1975 put_list:
1976 notification_client_list_put(client_list);
1977 if (caa_unlikely(ret)) {
1978 break;
1979 }
1980 }
1981 end:
1982 session_info_put(session_info);
1983 *_cmd_result = cmd_result;
1984 rcu_read_unlock();
1985 return ret;
1986 }
1987
1988 static
1989 int condition_is_supported(struct lttng_condition *condition)
1990 {
1991 int ret;
1992
1993 switch (lttng_condition_get_type(condition)) {
1994 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1995 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1996 {
1997 enum lttng_domain_type domain;
1998
1999 ret = lttng_condition_buffer_usage_get_domain_type(condition,
2000 &domain);
2001 if (ret) {
2002 ret = -1;
2003 goto end;
2004 }
2005
2006 if (domain != LTTNG_DOMAIN_KERNEL) {
2007 ret = 1;
2008 goto end;
2009 }
2010
2011 /*
2012 * Older kernel tracers don't expose the API to monitor their
2013 * buffers. Therefore, we reject triggers that require that
2014 * mechanism to be available to be evaluated.
2015 */
2016 ret = kernel_supports_ring_buffer_snapshot_sample_positions();
2017 break;
2018 }
2019 default:
2020 ret = 1;
2021 }
2022 end:
2023 return ret;
2024 }
2025
2026 /* Must be called with RCU read lock held. */
2027 static
2028 int bind_trigger_to_matching_session(const struct lttng_trigger *trigger,
2029 struct notification_thread_state *state)
2030 {
2031 int ret = 0;
2032 const struct lttng_condition *condition;
2033 const char *session_name;
2034 struct lttng_session_trigger_list *trigger_list;
2035
2036 condition = lttng_trigger_get_const_condition(trigger);
2037 switch (lttng_condition_get_type(condition)) {
2038 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
2039 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
2040 {
2041 enum lttng_condition_status status;
2042
2043 status = lttng_condition_session_rotation_get_session_name(
2044 condition, &session_name);
2045 if (status != LTTNG_CONDITION_STATUS_OK) {
2046 ERR("[notification-thread] Failed to bind trigger to session: unable to get 'session_rotation' condition's session name");
2047 ret = -1;
2048 goto end;
2049 }
2050 break;
2051 }
2052 default:
2053 ret = -1;
2054 goto end;
2055 }
2056
2057 trigger_list = get_session_trigger_list(state, session_name);
2058 if (!trigger_list) {
2059 DBG("[notification-thread] Unable to bind trigger applying to session \"%s\" as it is not yet known to the notification system",
2060 session_name);
2061 goto end;
2062
2063 }
2064
2065 DBG("[notification-thread] Newly registered trigger bound to session \"%s\"",
2066 session_name);
2067 ret = lttng_session_trigger_list_add(trigger_list, trigger);
2068 end:
2069 return ret;
2070 }
2071
2072 /* Must be called with RCU read lock held. */
2073 static
2074 int bind_trigger_to_matching_channels(const struct lttng_trigger *trigger,
2075 struct notification_thread_state *state)
2076 {
2077 int ret = 0;
2078 struct cds_lfht_node *node;
2079 struct cds_lfht_iter iter;
2080 struct channel_info *channel;
2081
2082 cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
2083 channels_ht_node) {
2084 struct lttng_trigger_list_element *trigger_list_element;
2085 struct lttng_channel_trigger_list *trigger_list;
2086 struct cds_lfht_iter lookup_iter;
2087
2088 if (!trigger_applies_to_channel(trigger, channel)) {
2089 continue;
2090 }
2091
2092 cds_lfht_lookup(state->channel_triggers_ht,
2093 hash_channel_key(&channel->key),
2094 match_channel_trigger_list,
2095 &channel->key,
2096 &lookup_iter);
2097 node = cds_lfht_iter_get_node(&lookup_iter);
2098 assert(node);
2099 trigger_list = caa_container_of(node,
2100 struct lttng_channel_trigger_list,
2101 channel_triggers_ht_node);
2102
2103 trigger_list_element = zmalloc(sizeof(*trigger_list_element));
2104 if (!trigger_list_element) {
2105 ret = -1;
2106 goto end;
2107 }
2108 CDS_INIT_LIST_HEAD(&trigger_list_element->node);
2109 trigger_list_element->trigger = trigger;
2110 cds_list_add(&trigger_list_element->node, &trigger_list->list);
2111 DBG("[notification-thread] Newly registered trigger bound to channel \"%s\"",
2112 channel->name);
2113 }
2114 end:
2115 return ret;
2116 }
2117
2118 /*
2119 * FIXME A client's credentials are not checked when registering a trigger, nor
2120 * are they stored alongside with the trigger.
2121 *
2122 * The effects of this are benign since:
2123 * - The client will succeed in registering the trigger, as it is valid,
2124 * - The trigger will, internally, be bound to the channel/session,
2125 * - The notifications will not be sent since the client's credentials
2126 * are checked against the channel at that moment.
2127 *
2128 * If this function returns a non-zero value, it means something is
2129 * fundamentally broken and the whole subsystem/thread will be torn down.
2130 *
2131 * If a non-fatal error occurs, just set the cmd_result to the appropriate
2132 * error code.
2133 */
2134 static
2135 int handle_notification_thread_command_register_trigger(
2136 struct notification_thread_state *state,
2137 struct lttng_trigger *trigger,
2138 enum lttng_error_code *cmd_result)
2139 {
2140 int ret = 0;
2141 struct lttng_condition *condition;
2142 struct notification_client *client;
2143 struct notification_client_list *client_list = NULL;
2144 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
2145 struct notification_client_list_element *client_list_element, *tmp;
2146 struct cds_lfht_node *node;
2147 struct cds_lfht_iter iter;
2148 bool free_trigger = true;
2149
2150 rcu_read_lock();
2151
2152 condition = lttng_trigger_get_condition(trigger);
2153 assert(condition);
2154
2155 ret = condition_is_supported(condition);
2156 if (ret < 0) {
2157 goto error;
2158 } else if (ret == 0) {
2159 *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
2160 goto error;
2161 } else {
2162 /* Feature is supported, continue. */
2163 ret = 0;
2164 }
2165
2166 trigger_ht_element = zmalloc(sizeof(*trigger_ht_element));
2167 if (!trigger_ht_element) {
2168 ret = -1;
2169 goto error;
2170 }
2171
2172 /* Add trigger to the trigger_ht. */
2173 cds_lfht_node_init(&trigger_ht_element->node);
2174 trigger_ht_element->trigger = trigger;
2175
2176 node = cds_lfht_add_unique(state->triggers_ht,
2177 lttng_condition_hash(condition),
2178 match_condition,
2179 condition,
2180 &trigger_ht_element->node);
2181 if (node != &trigger_ht_element->node) {
2182 /* Not a fatal error, simply report it to the client. */
2183 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2184 goto error_free_ht_element;
2185 }
2186
2187 /*
2188 * Ownership of the trigger and of its wrapper was transfered to
2189 * the triggers_ht.
2190 */
2191 trigger_ht_element = NULL;
2192 free_trigger = false;
2193
2194 /*
2195 * The rest only applies to triggers that have a "notify" action.
2196 * It is not skipped as this is the only action type currently
2197 * supported.
2198 */
2199 client_list = notification_client_list_create(trigger);
2200 if (!client_list) {
2201 ret = -1;
2202 goto error_free_ht_element;
2203 }
2204
2205 /* Build a list of clients to which this new trigger applies. */
2206 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
2207 client_socket_ht_node) {
2208 if (!trigger_applies_to_client(trigger, client)) {
2209 continue;
2210 }
2211
2212 client_list_element = zmalloc(sizeof(*client_list_element));
2213 if (!client_list_element) {
2214 ret = -1;
2215 goto error_put_client_list;
2216 }
2217 CDS_INIT_LIST_HEAD(&client_list_element->node);
2218 client_list_element->client = client;
2219 cds_list_add(&client_list_element->node, &client_list->list);
2220 }
2221
2222 switch (get_condition_binding_object(condition)) {
2223 case LTTNG_OBJECT_TYPE_SESSION:
2224 /* Add the trigger to the list if it matches a known session. */
2225 ret = bind_trigger_to_matching_session(trigger, state);
2226 if (ret) {
2227 goto error_put_client_list;
2228 }
2229 break;
2230 case LTTNG_OBJECT_TYPE_CHANNEL:
2231 /*
2232 * Add the trigger to list of triggers bound to the channels
2233 * currently known.
2234 */
2235 ret = bind_trigger_to_matching_channels(trigger, state);
2236 if (ret) {
2237 goto error_put_client_list;
2238 }
2239 break;
2240 case LTTNG_OBJECT_TYPE_NONE:
2241 break;
2242 default:
2243 ERR("[notification-thread] Unknown object type on which to bind a newly registered trigger was encountered");
2244 ret = -1;
2245 goto error_put_client_list;
2246 }
2247
2248 /*
2249 * Since there is nothing preventing clients from subscribing to a
2250 * condition before the corresponding trigger is registered, we have
2251 * to evaluate this new condition right away.
2252 *
2253 * At some point, we were waiting for the next "evaluation" (e.g. on
2254 * reception of a channel sample) to evaluate this new condition, but
2255 * that was broken.
2256 *
2257 * The reason it was broken is that waiting for the next sample
2258 * does not allow us to properly handle transitions for edge-triggered
2259 * conditions.
2260 *
2261 * Consider this example: when we handle a new channel sample, we
2262 * evaluate each conditions twice: once with the previous state, and
2263 * again with the newest state. We then use those two results to
2264 * determine whether a state change happened: a condition was false and
2265 * became true. If a state change happened, we have to notify clients.
2266 *
2267 * Now, if a client subscribes to a given notification and registers
2268 * a trigger *after* that subscription, we have to make sure the
2269 * condition is evaluated at this point while considering only the
2270 * current state. Otherwise, the next evaluation cycle may only see
2271 * that the evaluations remain the same (true for samples n-1 and n) and
2272 * the client will never know that the condition has been met.
2273 *
2274 * No need to lock the list here as it has not been published yet.
2275 */
2276 cds_list_for_each_entry_safe(client_list_element, tmp,
2277 &client_list->list, node) {
2278 ret = evaluate_condition_for_client(trigger, condition,
2279 client_list_element->client, state);
2280 if (ret) {
2281 goto error_put_client_list;
2282 }
2283 }
2284
2285 /*
2286 * Client list ownership transferred to the
2287 * notification_trigger_clients_ht.
2288 */
2289 publish_notification_client_list(state, client_list);
2290 client_list = NULL;
2291
2292 *cmd_result = LTTNG_OK;
2293
2294 error_put_client_list:
2295 notification_client_list_put(client_list);
2296
2297 error_free_ht_element:
2298 free(trigger_ht_element);
2299 error:
2300 if (free_trigger) {
2301 lttng_trigger_destroy(trigger);
2302 }
2303 rcu_read_unlock();
2304 return ret;
2305 }
2306
2307 static
2308 void free_lttng_trigger_ht_element_rcu(struct rcu_head *node)
2309 {
2310 free(caa_container_of(node, struct lttng_trigger_ht_element,
2311 rcu_node));
2312 }
2313
2314 static
2315 int handle_notification_thread_command_unregister_trigger(
2316 struct notification_thread_state *state,
2317 struct lttng_trigger *trigger,
2318 enum lttng_error_code *_cmd_reply)
2319 {
2320 struct cds_lfht_iter iter;
2321 struct cds_lfht_node *triggers_ht_node;
2322 struct lttng_channel_trigger_list *trigger_list;
2323 struct notification_client_list *client_list;
2324 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
2325 struct lttng_condition *condition = lttng_trigger_get_condition(
2326 trigger);
2327 enum lttng_error_code cmd_reply;
2328
2329 rcu_read_lock();
2330
2331 cds_lfht_lookup(state->triggers_ht,
2332 lttng_condition_hash(condition),
2333 match_condition,
2334 condition,
2335 &iter);
2336 triggers_ht_node = cds_lfht_iter_get_node(&iter);
2337 if (!triggers_ht_node) {
2338 cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND;
2339 goto end;
2340 } else {
2341 cmd_reply = LTTNG_OK;
2342 }
2343
2344 /* Remove trigger from channel_triggers_ht. */
2345 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
2346 channel_triggers_ht_node) {
2347 struct lttng_trigger_list_element *trigger_element, *tmp;
2348
2349 cds_list_for_each_entry_safe(trigger_element, tmp,
2350 &trigger_list->list, node) {
2351 const struct lttng_condition *current_condition =
2352 lttng_trigger_get_const_condition(
2353 trigger_element->trigger);
2354
2355 assert(current_condition);
2356 if (!lttng_condition_is_equal(condition,
2357 current_condition)) {
2358 continue;
2359 }
2360
2361 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
2362 cds_list_del(&trigger_element->node);
2363 /* A trigger can only appear once per channel */
2364 break;
2365 }
2366 }
2367
2368 /*
2369 * Remove and release the client list from
2370 * notification_trigger_clients_ht.
2371 */
2372 client_list = get_client_list_from_condition(state, condition);
2373 assert(client_list);
2374
2375 /* Put new reference and the hashtable's reference. */
2376 notification_client_list_put(client_list);
2377 notification_client_list_put(client_list);
2378 client_list = NULL;
2379
2380 /* Remove trigger from triggers_ht. */
2381 trigger_ht_element = caa_container_of(triggers_ht_node,
2382 struct lttng_trigger_ht_element, node);
2383 cds_lfht_del(state->triggers_ht, triggers_ht_node);
2384
2385 /* Release the ownership of the trigger. */
2386 lttng_trigger_destroy(trigger_ht_element->trigger);
2387 call_rcu(&trigger_ht_element->rcu_node, free_lttng_trigger_ht_element_rcu);
2388 end:
2389 rcu_read_unlock();
2390 if (_cmd_reply) {
2391 *_cmd_reply = cmd_reply;
2392 }
2393 return 0;
2394 }
2395
2396 /* Returns 0 on success, 1 on exit requested, negative value on error. */
2397 int handle_notification_thread_command(
2398 struct notification_thread_handle *handle,
2399 struct notification_thread_state *state)
2400 {
2401 int ret;
2402 uint64_t counter;
2403 struct notification_thread_command *cmd;
2404
2405 /* Read the event pipe to put it back into a quiescent state. */
2406 ret = lttng_read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter,
2407 sizeof(counter));
2408 if (ret != sizeof(counter)) {
2409 goto error;
2410 }
2411
2412 pthread_mutex_lock(&handle->cmd_queue.lock);
2413 cmd = cds_list_first_entry(&handle->cmd_queue.list,
2414 struct notification_thread_command, cmd_list_node);
2415 cds_list_del(&cmd->cmd_list_node);
2416 pthread_mutex_unlock(&handle->cmd_queue.lock);
2417 switch (cmd->type) {
2418 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
2419 DBG("[notification-thread] Received register trigger command");
2420 ret = handle_notification_thread_command_register_trigger(
2421 state, cmd->parameters.trigger,
2422 &cmd->reply_code);
2423 break;
2424 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
2425 DBG("[notification-thread] Received unregister trigger command");
2426 ret = handle_notification_thread_command_unregister_trigger(
2427 state, cmd->parameters.trigger,
2428 &cmd->reply_code);
2429 break;
2430 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
2431 DBG("[notification-thread] Received add channel command");
2432 ret = handle_notification_thread_command_add_channel(
2433 state,
2434 cmd->parameters.add_channel.session.name,
2435 cmd->parameters.add_channel.session.uid,
2436 cmd->parameters.add_channel.session.gid,
2437 cmd->parameters.add_channel.channel.name,
2438 cmd->parameters.add_channel.channel.domain,
2439 cmd->parameters.add_channel.channel.key,
2440 cmd->parameters.add_channel.channel.capacity,
2441 &cmd->reply_code);
2442 break;
2443 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
2444 DBG("[notification-thread] Received remove channel command");
2445 ret = handle_notification_thread_command_remove_channel(
2446 state, cmd->parameters.remove_channel.key,
2447 cmd->parameters.remove_channel.domain,
2448 &cmd->reply_code);
2449 break;
2450 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING:
2451 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED:
2452 DBG("[notification-thread] Received session rotation %s command",
2453 cmd->type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING ?
2454 "ongoing" : "completed");
2455 ret = handle_notification_thread_command_session_rotation(
2456 state,
2457 cmd->type,
2458 cmd->parameters.session_rotation.session_name,
2459 cmd->parameters.session_rotation.uid,
2460 cmd->parameters.session_rotation.gid,
2461 cmd->parameters.session_rotation.trace_archive_chunk_id,
2462 cmd->parameters.session_rotation.location,
2463 &cmd->reply_code);
2464 break;
2465 case NOTIFICATION_COMMAND_TYPE_QUIT:
2466 DBG("[notification-thread] Received quit command");
2467 cmd->reply_code = LTTNG_OK;
2468 ret = 1;
2469 goto end;
2470 default:
2471 ERR("[notification-thread] Unknown internal command received");
2472 goto error_unlock;
2473 }
2474
2475 if (ret) {
2476 goto error_unlock;
2477 }
2478 end:
2479 if (cmd->is_async) {
2480 free(cmd);
2481 cmd = NULL;
2482 } else {
2483 lttng_waiter_wake_up(&cmd->reply_waiter);
2484 }
2485 return ret;
2486 error_unlock:
2487 /* Wake-up and return a fatal error to the calling thread. */
2488 lttng_waiter_wake_up(&cmd->reply_waiter);
2489 cmd->reply_code = LTTNG_ERR_FATAL;
2490 error:
2491 /* Indicate a fatal error to the caller. */
2492 return -1;
2493 }
2494
/*
 * Add O_NONBLOCK to the file status flags of `socket`.
 *
 * Returns the result of the last fcntl() call performed: -1 on failure (the
 * error is logged), the F_SETFL result otherwise.
 */
static
int socket_set_non_blocking(int socket)
{
	int ret;
	int current_flags;

	/* Fetch the current flags so that only O_NONBLOCK is added. */
	current_flags = fcntl(socket, F_GETFL, 0);
	if (current_flags == -1) {
		PERROR("fcntl get socket flags");
		return -1;
	}

	ret = fcntl(socket, F_SETFL, current_flags | O_NONBLOCK);
	if (ret == -1) {
		PERROR("fcntl set O_NONBLOCK socket flag");
		return ret;
	}

	DBG("Client socket (fd = %i) set as non-blocking", socket);
	return ret;
}
2517
2518 /* Client lock must be acquired by caller. */
2519 static
2520 int client_reset_inbound_state(struct notification_client *client)
2521 {
2522 int ret;
2523
2524 ASSERT_LOCKED(client->lock);
2525
2526 ret = lttng_dynamic_buffer_set_size(
2527 &client->communication.inbound.buffer, 0);
2528 assert(!ret);
2529
2530 client->communication.inbound.bytes_to_receive =
2531 sizeof(struct lttng_notification_channel_message);
2532 client->communication.inbound.msg_type =
2533 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN;
2534 LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1);
2535 LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1);
2536 ret = lttng_dynamic_buffer_set_size(
2537 &client->communication.inbound.buffer,
2538 client->communication.inbound.bytes_to_receive);
2539 return ret;
2540 }
2541
2542 int handle_notification_thread_client_connect(
2543 struct notification_thread_state *state)
2544 {
2545 int ret;
2546 struct notification_client *client;
2547
2548 DBG("[notification-thread] Handling new notification channel client connection");
2549
2550 client = zmalloc(sizeof(*client));
2551 if (!client) {
2552 /* Fatal error. */
2553 ret = -1;
2554 goto error;
2555 }
2556 pthread_mutex_init(&client->lock, NULL);
2557 client->id = state->next_notification_client_id++;
2558 CDS_INIT_LIST_HEAD(&client->condition_list);
2559 lttng_dynamic_buffer_init(&client->communication.inbound.buffer);
2560 lttng_dynamic_buffer_init(&client->communication.outbound.buffer);
2561 client->communication.inbound.expect_creds = true;
2562
2563 pthread_mutex_lock(&client->lock);
2564 ret = client_reset_inbound_state(client);
2565 pthread_mutex_unlock(&client->lock);
2566 if (ret) {
2567 ERR("[notification-thread] Failed to reset client communication's inbound state");
2568 ret = 0;
2569 goto error;
2570 }
2571
2572 ret = lttcomm_accept_unix_sock(state->notification_channel_socket);
2573 if (ret < 0) {
2574 ERR("[notification-thread] Failed to accept new notification channel client connection");
2575 ret = 0;
2576 goto error;
2577 }
2578
2579 client->socket = ret;
2580
2581 ret = socket_set_non_blocking(client->socket);
2582 if (ret) {
2583 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
2584 goto error;
2585 }
2586
2587 ret = lttcomm_setsockopt_creds_unix_sock(client->socket);
2588 if (ret < 0) {
2589 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
2590 ret = 0;
2591 goto error;
2592 }
2593
2594 ret = lttng_poll_add(&state->events, client->socket,
2595 LPOLLIN | LPOLLERR |
2596 LPOLLHUP | LPOLLRDHUP);
2597 if (ret < 0) {
2598 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
2599 ret = 0;
2600 goto error;
2601 }
2602 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
2603 client->socket);
2604
2605 rcu_read_lock();
2606 cds_lfht_add(state->client_socket_ht,
2607 hash_client_socket(client->socket),
2608 &client->client_socket_ht_node);
2609 cds_lfht_add(state->client_id_ht,
2610 hash_client_id(client->id),
2611 &client->client_id_ht_node);
2612 rcu_read_unlock();
2613
2614 return ret;
2615 error:
2616 notification_client_destroy(client, state);
2617 return ret;
2618 }
2619
2620 /* RCU read-lock must be held by the caller. */
2621 /* Client lock must be held by the caller */
2622 static
2623 int notification_thread_client_disconnect(
2624 struct notification_client *client,
2625 struct notification_thread_state *state)
2626 {
2627 int ret;
2628 struct lttng_condition_list_element *condition_list_element, *tmp;
2629
2630 /* Acquire the client lock to disable its communication atomically. */
2631 client->communication.active = false;
2632 ret = lttng_poll_del(&state->events, client->socket);
2633 if (ret) {
2634 ERR("[notification-thread] Failed to remove client socket %d from poll set",
2635 client->socket);
2636 }
2637
2638 cds_lfht_del(state->client_socket_ht, &client->client_socket_ht_node);
2639 cds_lfht_del(state->client_id_ht, &client->client_id_ht_node);
2640
2641 /* Release all conditions to which the client was subscribed. */
2642 cds_list_for_each_entry_safe(condition_list_element, tmp,
2643 &client->condition_list, node) {
2644 (void) notification_thread_client_unsubscribe(client,
2645 condition_list_element->condition, state, NULL);
2646 }
2647
2648 /*
2649 * Client no longer accessible to other threads (through the
2650 * client lists).
2651 */
2652 notification_client_destroy(client, state);
2653 return ret;
2654 }
2655
2656 int handle_notification_thread_client_disconnect(
2657 int client_socket, struct notification_thread_state *state)
2658 {
2659 int ret = 0;
2660 struct notification_client *client;
2661
2662 rcu_read_lock();
2663 DBG("[notification-thread] Closing client connection (socket fd = %i)",
2664 client_socket);
2665 client = get_client_from_socket(client_socket, state);
2666 if (!client) {
2667 /* Internal state corruption, fatal error. */
2668 ERR("[notification-thread] Unable to find client (socket fd = %i)",
2669 client_socket);
2670 ret = -1;
2671 goto end;
2672 }
2673
2674 pthread_mutex_lock(&client->lock);
2675 ret = notification_thread_client_disconnect(client, state);
2676 pthread_mutex_unlock(&client->lock);
2677 end:
2678 rcu_read_unlock();
2679 return ret;
2680 }
2681
2682 int handle_notification_thread_client_disconnect_all(
2683 struct notification_thread_state *state)
2684 {
2685 struct cds_lfht_iter iter;
2686 struct notification_client *client;
2687 bool error_encoutered = false;
2688
2689 rcu_read_lock();
2690 DBG("[notification-thread] Closing all client connections");
2691 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
2692 client_socket_ht_node) {
2693 int ret;
2694
2695 pthread_mutex_lock(&client->lock);
2696 ret = notification_thread_client_disconnect(
2697 client, state);
2698 pthread_mutex_unlock(&client->lock);
2699 if (ret) {
2700 error_encoutered = true;
2701 }
2702 }
2703 rcu_read_unlock();
2704 return error_encoutered ? 1 : 0;
2705 }
2706
2707 int handle_notification_thread_trigger_unregister_all(
2708 struct notification_thread_state *state)
2709 {
2710 bool error_occurred = false;
2711 struct cds_lfht_iter iter;
2712 struct lttng_trigger_ht_element *trigger_ht_element;
2713
2714 rcu_read_lock();
2715 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
2716 node) {
2717 int ret = handle_notification_thread_command_unregister_trigger(
2718 state, trigger_ht_element->trigger, NULL);
2719 if (ret) {
2720 error_occurred = true;
2721 }
2722 }
2723 rcu_read_unlock();
2724 return error_occurred ? -1 : 0;
2725 }
2726
2727 static
2728 int client_handle_transmission_status(
2729 struct notification_client *client,
2730 enum client_transmission_status transmission_status,
2731 struct notification_thread_state *state)
2732 {
2733 int ret = 0;
2734
2735 switch (transmission_status) {
2736 case CLIENT_TRANSMISSION_STATUS_COMPLETE:
2737 ret = lttng_poll_mod(&state->events, client->socket,
2738 CLIENT_POLL_MASK_IN);
2739 if (ret) {
2740 goto end;
2741 }
2742
2743 client->communication.outbound.queued_command_reply = false;
2744 client->communication.outbound.dropped_notification = false;
2745 break;
2746 case CLIENT_TRANSMISSION_STATUS_QUEUED:
2747 /*
2748 * We want to be notified whenever there is buffer space
2749 * available to send the rest of the payload.
2750 */
2751 ret = lttng_poll_mod(&state->events, client->socket,
2752 CLIENT_POLL_MASK_IN_OUT);
2753 if (ret) {
2754 goto end;
2755 }
2756 break;
2757 case CLIENT_TRANSMISSION_STATUS_FAIL:
2758 ret = notification_thread_client_disconnect(client, state);
2759 if (ret) {
2760 goto end;
2761 }
2762 break;
2763 case CLIENT_TRANSMISSION_STATUS_ERROR:
2764 ret = -1;
2765 goto end;
2766 default:
2767 abort();
2768 }
2769 end:
2770 return ret;
2771 }
2772
2773 /* Client lock must be acquired by caller. */
2774 static
2775 enum client_transmission_status client_flush_outgoing_queue(
2776 struct notification_client *client,
2777 struct notification_thread_state *state)
2778 {
2779 ssize_t ret;
2780 size_t to_send_count;
2781 enum client_transmission_status status;
2782
2783 ASSERT_LOCKED(client->lock);
2784
2785 assert(client->communication.outbound.buffer.size != 0);
2786 to_send_count = client->communication.outbound.buffer.size;
2787 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
2788 client->socket);
2789
2790 ret = lttcomm_send_unix_sock_non_block(client->socket,
2791 client->communication.outbound.buffer.data,
2792 to_send_count);
2793 if ((ret >= 0 && ret < to_send_count)) {
2794 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
2795 client->socket);
2796 to_send_count -= max(ret, 0);
2797
2798 memcpy(client->communication.outbound.buffer.data,
2799 client->communication.outbound.buffer.data +
2800 client->communication.outbound.buffer.size - to_send_count,
2801 to_send_count);
2802 ret = lttng_dynamic_buffer_set_size(
2803 &client->communication.outbound.buffer,
2804 to_send_count);
2805 if (ret) {
2806 status = CLIENT_TRANSMISSION_STATUS_ERROR;
2807 goto error;
2808 }
2809 status = CLIENT_TRANSMISSION_STATUS_QUEUED;
2810 } else if (ret < 0) {
2811 /* Generic error, disconnect the client. */
2812 ERR("[notification-thread] Failed to flush outgoing queue, disconnecting client (socket fd = %i)",
2813 client->socket);
2814 status = CLIENT_TRANSMISSION_STATUS_FAIL;
2815 } else {
2816 /* No error and flushed the queue completely. */
2817 ret = lttng_dynamic_buffer_set_size(
2818 &client->communication.outbound.buffer, 0);
2819 if (ret) {
2820 status = CLIENT_TRANSMISSION_STATUS_ERROR;
2821 goto error;
2822 }
2823 status = CLIENT_TRANSMISSION_STATUS_COMPLETE;
2824 }
2825
2826 ret = client_handle_transmission_status(client, status, state);
2827 if (ret) {
2828 goto error;
2829 }
2830
2831 return 0;
2832 error:
2833 return -1;
2834 }
2835
2836 /* Client lock must be acquired by caller. */
2837 static
2838 int client_send_command_reply(struct notification_client *client,
2839 struct notification_thread_state *state,
2840 enum lttng_notification_channel_status status)
2841 {
2842 int ret;
2843 struct lttng_notification_channel_command_reply reply = {
2844 .status = (int8_t) status,
2845 };
2846 struct lttng_notification_channel_message msg = {
2847 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
2848 .size = sizeof(reply),
2849 };
2850 char buffer[sizeof(msg) + sizeof(reply)];
2851
2852 ASSERT_LOCKED(client->lock);
2853
2854 if (client->communication.outbound.queued_command_reply) {
2855 /* Protocol error. */
2856 goto error;
2857 }
2858
2859 memcpy(buffer, &msg, sizeof(msg));
2860 memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
2861 DBG("[notification-thread] Send command reply (%i)", (int) status);
2862
2863 /* Enqueue buffer to outgoing queue and flush it. */
2864 ret = lttng_dynamic_buffer_append(
2865 &client->communication.outbound.buffer,
2866 buffer, sizeof(buffer));
2867 if (ret) {
2868 goto error;
2869 }
2870
2871 ret = client_flush_outgoing_queue(client, state);
2872 if (ret) {
2873 goto error;
2874 }
2875
2876 if (client->communication.outbound.buffer.size != 0) {
2877 /* Queue could not be emptied. */
2878 client->communication.outbound.queued_command_reply = true;
2879 }
2880
2881 return 0;
2882 error:
2883 return -1;
2884 }
2885
2886 static
2887 int client_handle_message_unknown(struct notification_client *client,
2888 struct notification_thread_state *state)
2889 {
2890 int ret;
2891
2892 pthread_mutex_lock(&client->lock);
2893
2894 /*
2895 * Receiving message header. The function will be called again
2896 * once the rest of the message as been received and can be
2897 * interpreted.
2898 */
2899 const struct lttng_notification_channel_message *msg;
2900
2901 assert(sizeof(*msg) == client->communication.inbound.buffer.size);
2902 msg = (const struct lttng_notification_channel_message *)
2903 client->communication.inbound.buffer.data;
2904
2905 if (msg->size == 0 ||
2906 msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
2907 ERR("[notification-thread] Invalid notification channel message: length = %u",
2908 msg->size);
2909 ret = -1;
2910 goto end;
2911 }
2912
2913 switch (msg->type) {
2914 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
2915 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
2916 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
2917 break;
2918 default:
2919 ret = -1;
2920 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
2921 goto end;
2922 }
2923
2924 client->communication.inbound.bytes_to_receive = msg->size;
2925 client->communication.inbound.msg_type =
2926 (enum lttng_notification_channel_message_type) msg->type;
2927 ret = lttng_dynamic_buffer_set_size(
2928 &client->communication.inbound.buffer, msg->size);
2929 end:
2930 pthread_mutex_unlock(&client->lock);
2931 return ret;
2932 }
2933
2934 static
2935 int client_handle_message_handshake(struct notification_client *client,
2936 struct notification_thread_state *state)
2937 {
2938 int ret;
2939 struct lttng_notification_channel_command_handshake *handshake_client;
2940 const struct lttng_notification_channel_command_handshake handshake_reply = {
2941 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
2942 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
2943 };
2944 const struct lttng_notification_channel_message msg_header = {
2945 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
2946 .size = sizeof(handshake_reply),
2947 };
2948 enum lttng_notification_channel_status status =
2949 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
2950 char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
2951
2952 pthread_mutex_lock(&client->lock);
2953
2954 memcpy(send_buffer, &msg_header, sizeof(msg_header));
2955 memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
2956 sizeof(handshake_reply));
2957
2958 handshake_client =
2959 (struct lttng_notification_channel_command_handshake *)
2960 client->communication.inbound.buffer
2961 .data;
2962 client->major = handshake_client->major;
2963 client->minor = handshake_client->minor;
2964 if (!client->communication.inbound.creds_received) {
2965 ERR("[notification-thread] No credentials received from client");
2966 ret = -1;
2967 goto end;
2968 }
2969
2970 client->uid = LTTNG_SOCK_GET_UID_CRED(
2971 &client->communication.inbound.creds);
2972 client->gid = LTTNG_SOCK_GET_GID_CRED(
2973 &client->communication.inbound.creds);
2974 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
2975 client->uid, client->gid, (int) client->major,
2976 (int) client->minor);
2977
2978 if (handshake_client->major !=
2979 LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
2980 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
2981 }
2982
2983 ret = lttng_dynamic_buffer_append(
2984 &client->communication.outbound.buffer, send_buffer,
2985 sizeof(send_buffer));
2986 if (ret) {
2987 ERR("[notification-thread] Failed to send protocol version to notification channel client");
2988 goto end;
2989 }
2990
2991 client->validated = true;
2992 client->communication.active = true;
2993
2994 ret = client_flush_outgoing_queue(client, state);
2995 if (ret) {
2996 goto end;
2997 }
2998
2999 ret = client_send_command_reply(client, state, status);
3000 if (ret) {
3001 ERR("[notification-thread] Failed to send reply to notification channel client");
3002 goto end;
3003 }
3004
3005 /* Set reception state to receive the next message header. */
3006 ret = client_reset_inbound_state(client);
3007 if (ret) {
3008 ERR("[notification-thread] Failed to reset client communication's inbound state");
3009 goto end;
3010 }
3011
3012 end:
3013 pthread_mutex_unlock(&client->lock);
3014 return ret;
3015 }
3016
3017 static
3018 int client_handle_message_subscription(
3019 struct notification_client *client,
3020 enum lttng_notification_channel_message_type msg_type,
3021 struct notification_thread_state *state)
3022 {
3023 int ret;
3024 struct lttng_condition *condition;
3025 enum lttng_notification_channel_status status =
3026 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
3027 struct lttng_payload_view condition_view =
3028 lttng_payload_view_from_dynamic_buffer(
3029 &client->communication.inbound.buffer,
3030 0, -1);
3031 size_t expected_condition_size;
3032
3033 pthread_mutex_lock(&client->lock);
3034 expected_condition_size = client->communication.inbound.buffer.size;
3035 pthread_mutex_unlock(&client->lock);
3036
3037 ret = lttng_condition_create_from_payload(&condition_view, &condition);
3038 if (ret != expected_condition_size) {
3039 ERR("[notification-thread] Malformed condition received from client");
3040 goto end;
3041 }
3042
3043 if (msg_type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
3044 ret = notification_thread_client_subscribe(
3045 client, condition, state, &status);
3046 } else {
3047 ret = notification_thread_client_unsubscribe(
3048 client, condition, state, &status);
3049 }
3050 if (ret) {
3051 goto end;
3052 }
3053
3054 pthread_mutex_lock(&client->lock);
3055 ret = client_send_command_reply(client, state, status);
3056 if (ret) {
3057 ERR("[notification-thread] Failed to send reply to notification channel client");
3058 goto end_unlock;
3059 }
3060
3061 /* Set reception state to receive the next message header. */
3062 ret = client_reset_inbound_state(client);
3063 if (ret) {
3064 ERR("[notification-thread] Failed to reset client communication's inbound state");
3065 goto end_unlock;
3066 }
3067
3068 end_unlock:
3069 pthread_mutex_unlock(&client->lock);
3070 end:
3071 return ret;
3072 }
3073
3074 static
3075 int client_dispatch_message(struct notification_client *client,
3076 struct notification_thread_state *state)
3077 {
3078 int ret = 0;
3079
3080 if (client->communication.inbound.msg_type !=
3081 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE &&
3082 client->communication.inbound.msg_type !=
3083 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN &&
3084 !client->validated) {
3085 WARN("[notification-thread] client attempted a command before handshake");
3086 ret = -1;
3087 goto end;
3088 }
3089
3090 switch (client->communication.inbound.msg_type) {
3091 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN:
3092 {
3093 ret = client_handle_message_unknown(client, state);
3094 break;
3095 }
3096 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
3097 {
3098 ret = client_handle_message_handshake(client, state);
3099 break;
3100 }
3101 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
3102 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
3103 {
3104 ret = client_handle_message_subscription(client,
3105 client->communication.inbound.msg_type, state);
3106 break;
3107 }
3108 default:
3109 abort();
3110 }
3111 end:
3112 return ret;
3113 }
3114
3115 /* Incoming data from client. */
3116 int handle_notification_thread_client_in(
3117 struct notification_thread_state *state, int socket)
3118 {
3119 int ret = 0;
3120 struct notification_client *client;
3121 ssize_t recv_ret;
3122 size_t offset;
3123 bool message_is_complete = false;
3124
3125 client = get_client_from_socket(socket, state);
3126 if (!client) {
3127 /* Internal error, abort. */
3128 ret = -1;
3129 goto end;
3130 }
3131
3132 pthread_mutex_lock(&client->lock);
3133 offset = client->communication.inbound.buffer.size -
3134 client->communication.inbound.bytes_to_receive;
3135 if (client->communication.inbound.expect_creds) {
3136 recv_ret = lttcomm_recv_creds_unix_sock(socket,
3137 client->communication.inbound.buffer.data + offset,
3138 client->communication.inbound.bytes_to_receive,
3139 &client->communication.inbound.creds);
3140 if (recv_ret > 0) {
3141 client->communication.inbound.expect_creds = false;
3142 client->communication.inbound.creds_received = true;
3143 }
3144 } else {
3145 recv_ret = lttcomm_recv_unix_sock_non_block(socket,
3146 client->communication.inbound.buffer.data + offset,
3147 client->communication.inbound.bytes_to_receive);
3148 }
3149 if (recv_ret >= 0) {
3150 client->communication.inbound.bytes_to_receive -= recv_ret;
3151 message_is_complete = client->communication.inbound
3152 .bytes_to_receive == 0;
3153 }
3154 pthread_mutex_unlock(&client->lock);
3155 if (recv_ret < 0) {
3156 goto error_disconnect_client;
3157 }
3158
3159 if (message_is_complete) {
3160 ret = client_dispatch_message(client, state);
3161 if (ret) {
3162 /*
3163 * Only returns an error if this client must be
3164 * disconnected.
3165 */
3166 goto error_disconnect_client;
3167 }
3168 }
3169 end:
3170 return ret;
3171 error_disconnect_client:
3172 pthread_mutex_lock(&client->lock);
3173 ret = notification_thread_client_disconnect(client, state);
3174 pthread_mutex_unlock(&client->lock);
3175 return ret;
3176 }
3177
3178 /* Client ready to receive outgoing data. */
3179 int handle_notification_thread_client_out(
3180 struct notification_thread_state *state, int socket)
3181 {
3182 int ret;
3183 struct notification_client *client;
3184
3185 client = get_client_from_socket(socket, state);
3186 if (!client) {
3187 /* Internal error, abort. */
3188 ret = -1;
3189 goto end;
3190 }
3191
3192 pthread_mutex_lock(&client->lock);
3193 ret = client_flush_outgoing_queue(client, state);
3194 pthread_mutex_unlock(&client->lock);
3195 if (ret) {
3196 goto end;
3197 }
3198 end:
3199 return ret;
3200 }
3201
3202 static
3203 bool evaluate_buffer_usage_condition(const struct lttng_condition *condition,
3204 const struct channel_state_sample *sample,
3205 uint64_t buffer_capacity)
3206 {
3207 bool result = false;
3208 uint64_t threshold;
3209 enum lttng_condition_type condition_type;
3210 const struct lttng_condition_buffer_usage *use_condition = container_of(
3211 condition, struct lttng_condition_buffer_usage,
3212 parent);
3213
3214 if (use_condition->threshold_bytes.set) {
3215 threshold = use_condition->threshold_bytes.value;
3216 } else {
3217 /*
3218 * Threshold was expressed as a ratio.
3219 *
3220 * TODO the threshold (in bytes) of conditions expressed
3221 * as a ratio of total buffer size could be cached to
3222 * forego this double-multiplication or it could be performed
3223 * as fixed-point math.
3224 *
3225 * Note that caching should accommodates the case where the
3226 * condition applies to multiple channels (i.e. don't assume
3227 * that all channels matching my_chann* have the same size...)
3228 */
3229 threshold = (uint64_t) (use_condition->threshold_ratio.value *
3230 (double) buffer_capacity);
3231 }
3232
3233 condition_type = lttng_condition_get_type(condition);
3234 if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
3235 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
3236 threshold, sample->highest_usage);
3237
3238 /*
3239 * The low condition should only be triggered once _all_ of the
3240 * streams in a channel have gone below the "low" threshold.
3241 */
3242 if (sample->highest_usage <= threshold) {
3243 result = true;
3244 }
3245 } else {
3246 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
3247 threshold, sample->highest_usage);
3248
3249 /*
3250 * For high buffer usage scenarios, we want to trigger whenever
3251 * _any_ of the streams has reached the "high" threshold.
3252 */
3253 if (sample->highest_usage >= threshold) {
3254 result = true;
3255 }
3256 }
3257
3258 return result;
3259 }
3260
3261 static
3262 bool evaluate_session_consumed_size_condition(
3263 const struct lttng_condition *condition,
3264 uint64_t session_consumed_size)
3265 {
3266 uint64_t threshold;
3267 const struct lttng_condition_session_consumed_size *size_condition =
3268 container_of(condition,
3269 struct lttng_condition_session_consumed_size,
3270 parent);
3271
3272 threshold = size_condition->consumed_threshold_bytes.value;
3273 DBG("[notification-thread] Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
3274 threshold, session_consumed_size);
3275 return session_consumed_size >= threshold;
3276 }
3277
3278 static
3279 int evaluate_buffer_condition(const struct lttng_condition *condition,
3280 struct lttng_evaluation **evaluation,
3281 const struct notification_thread_state *state,
3282 const struct channel_state_sample *previous_sample,
3283 const struct channel_state_sample *latest_sample,
3284 uint64_t previous_session_consumed_total,
3285 uint64_t latest_session_consumed_total,
3286 struct channel_info *channel_info)
3287 {
3288 int ret = 0;
3289 enum lttng_condition_type condition_type;
3290 const bool previous_sample_available = !!previous_sample;
3291 bool previous_sample_result = false;
3292 bool latest_sample_result;
3293
3294 condition_type = lttng_condition_get_type(condition);
3295
3296 switch (condition_type) {
3297 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
3298 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
3299 if (caa_likely(previous_sample_available)) {
3300 previous_sample_result =
3301 evaluate_buffer_usage_condition(condition,
3302 previous_sample, channel_info->capacity);
3303 }
3304 latest_sample_result = evaluate_buffer_usage_condition(
3305 condition, latest_sample,
3306 channel_info->capacity);
3307 break;
3308 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
3309 if (caa_likely(previous_sample_available)) {
3310 previous_sample_result =
3311 evaluate_session_consumed_size_condition(
3312 condition,
3313 previous_session_consumed_total);
3314 }
3315 latest_sample_result =
3316 evaluate_session_consumed_size_condition(
3317 condition,
3318 latest_session_consumed_total);
3319 break;
3320 default:
3321 /* Unknown condition type; internal error. */
3322 abort();
3323 }
3324
3325 if (!latest_sample_result ||
3326 (previous_sample_result == latest_sample_result)) {
3327 /*
3328 * Only trigger on a condition evaluation transition.
3329 *
3330 * NOTE: This edge-triggered logic may not be appropriate for
3331 * future condition types.
3332 */
3333 goto end;
3334 }
3335
3336 if (!evaluation || !latest_sample_result) {
3337 goto end;
3338 }
3339
3340 switch (condition_type) {
3341 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
3342 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
3343 *evaluation = lttng_evaluation_buffer_usage_create(
3344 condition_type,
3345 latest_sample->highest_usage,
3346 channel_info->capacity);
3347 break;
3348 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
3349 *evaluation = lttng_evaluation_session_consumed_size_create(
3350 latest_session_consumed_total);
3351 break;
3352 default:
3353 abort();
3354 }
3355
3356 if (!*evaluation) {
3357 ret = -1;
3358 goto end;
3359 }
3360 end:
3361 return ret;
3362 }
3363
3364 static
3365 int client_enqueue_dropped_notification(struct notification_client *client)
3366 {
3367 int ret;
3368 struct lttng_notification_channel_message msg = {
3369 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
3370 .size = 0,
3371 };
3372
3373 ASSERT_LOCKED(client->lock);
3374
3375 ret = lttng_dynamic_buffer_append(
3376 &client->communication.outbound.buffer, &msg,
3377 sizeof(msg));
3378 return ret;
3379 }
3380
3381 /*
3382 * Permission checks relative to notification channel clients are performed
3383 * here. Notice how object, client, and trigger credentials are involved in
3384 * this check.
3385 *
3386 * The `object` credentials are the credentials associated with the "subject"
3387 * of a condition. For instance, a `rotation completed` condition applies
3388 * to a session. When that condition is met, it will produce an evaluation
3389 * against a session. Hence, in this case, the `object` credentials are the
3390 * credentials of the "subject" session.
3391 *
3392 * The `trigger` credentials are the credentials of the user that registered the
3393 * trigger.
3394 *
3395 * The `client` credentials are the credentials of the user that created a given
3396 * notification channel.
3397 *
3398 * In terms of visibility, it is expected that non-privilieged users can only
3399 * register triggers against "their" objects (their own sessions and
3400 * applications they are allowed to interact with). They can then open a
3401 * notification channel and subscribe to notifications associated with those
3402 * triggers.
3403 *
3404 * As for privilieged users, they can register triggers against the objects of
3405 * other users. They can then subscribe to the notifications associated to their
3406 * triggers. Privilieged users _can't_ subscribe to the notifications of
3407 * triggers owned by other users; they must create their own triggers.
3408 *
3409 * This is more a concern of usability than security. It would be difficult for
3410 * a root user reliably subscribe to a specific set of conditions without
3411 * interference from external users (those could, for instance, unregister
3412 * their triggers).
3413 */
3414 static
3415 int send_evaluation_to_clients(const struct lttng_trigger *trigger,
3416 const struct lttng_evaluation *evaluation,
3417 struct notification_client_list* client_list,
3418 struct notification_thread_state *state,
3419 uid_t object_uid, gid_t object_gid)
3420 {
3421 int ret = 0;
3422 struct lttng_payload msg_payload;
3423 struct notification_client_list_element *client_list_element, *tmp;
3424 const struct lttng_notification notification = {
3425 .condition = (struct lttng_condition *) lttng_trigger_get_const_condition(trigger),
3426 .evaluation = (struct lttng_evaluation *) evaluation,
3427 };
3428 struct lttng_notification_channel_message msg_header = {
3429 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION,
3430 };
3431 const struct lttng_credentials *trigger_creds = lttng_trigger_get_credentials(trigger);
3432
3433 lttng_payload_init(&msg_payload);
3434
3435 ret = lttng_dynamic_buffer_append(&msg_payload.buffer, &msg_header,
3436 sizeof(msg_header));
3437 if (ret) {
3438 goto end;
3439 }
3440
3441 ret = lttng_notification_serialize(&notification, &msg_payload);
3442 if (ret) {
3443 ERR("[notification-thread] Failed to serialize notification");
3444 ret = -1;
3445 goto end;
3446 }
3447
3448 /* Update payload size. */
3449 ((struct lttng_notification_channel_message *) msg_payload.buffer.data)
3450 ->size = (uint32_t)(
3451 msg_payload.buffer.size - sizeof(msg_header));
3452
3453 pthread_mutex_lock(&client_list->lock);
3454 cds_list_for_each_entry_safe(client_list_element, tmp,
3455 &client_list->list, node) {
3456 struct notification_client *client =
3457 client_list_element->client;
3458
3459 ret = 0;
3460 pthread_mutex_lock(&client->lock);
3461 if (client->uid != object_uid && client->gid != object_gid &&
3462 client->uid != 0) {
3463 /* Client is not allowed to monitor this channel. */
3464 DBG("[notification-thread] Skipping client at it does not have the object permission to receive notification for this trigger");
3465 goto unlock_client;
3466 }
3467
3468 if (client->uid != trigger_creds->uid && client->gid != trigger_creds->gid) {
3469 DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this trigger");
3470 goto unlock_client;
3471 }
3472
3473 DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
3474 client->socket, msg_payload.buffer.size);
3475 if (client->communication.outbound.buffer.size) {
3476 /*
3477 * Outgoing data is already buffered for this client;
3478 * drop the notification and enqueue a "dropped
3479 * notification" message if this is the first dropped
3480 * notification since the socket spilled-over to the
3481 * queue.
3482 */
3483 DBG("[notification-thread] Dropping notification addressed to client (socket fd = %i)",
3484 client->socket);
3485 if (!client->communication.outbound.dropped_notification) {
3486 client->communication.outbound.dropped_notification = true;
3487 ret = client_enqueue_dropped_notification(
3488 client);
3489 if (ret) {
3490 goto unlock_client;
3491 }
3492 }
3493 goto unlock_client;
3494 }
3495
3496 ret = lttng_dynamic_buffer_append_buffer(
3497 &client->communication.outbound.buffer,
3498 &msg_payload.buffer);
3499 if (ret) {
3500 goto unlock_client;
3501 }
3502
3503 ret = client_flush_outgoing_queue(client, state);
3504 if (ret) {
3505 goto unlock_client;
3506 }
3507 unlock_client:
3508 pthread_mutex_unlock(&client->lock);
3509 if (ret) {
3510 goto end_unlock_list;
3511 }
3512 }
3513 ret = 0;
3514
3515 end_unlock_list:
3516 pthread_mutex_unlock(&client_list->lock);
3517 end:
3518 lttng_payload_reset(&msg_payload);
3519 return ret;
3520 }
3521
3522 int handle_notification_thread_channel_sample(
3523 struct notification_thread_state *state, int pipe,
3524 enum lttng_domain_type domain)
3525 {
3526 int ret = 0;
3527 struct lttcomm_consumer_channel_monitor_msg sample_msg;
3528 struct channel_info *channel_info;
3529 struct cds_lfht_node *node;
3530 struct cds_lfht_iter iter;
3531 struct lttng_channel_trigger_list *trigger_list;
3532 struct lttng_trigger_list_element *trigger_list_element;
3533 bool previous_sample_available = false;
3534 struct channel_state_sample previous_sample, latest_sample;
3535 uint64_t previous_session_consumed_total, latest_session_consumed_total;
3536
3537 /*
3538 * The monitoring pipe only holds messages smaller than PIPE_BUF,
3539 * ensuring that read/write of sampling messages are atomic.
3540 */
3541 ret = lttng_read(pipe, &sample_msg, sizeof(sample_msg));
3542 if (ret != sizeof(sample_msg)) {
3543 ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
3544 pipe);
3545 ret = -1;
3546 goto end;
3547 }
3548
3549 ret = 0;
3550 latest_sample.key.key = sample_msg.key;
3551 latest_sample.key.domain = domain;
3552 latest_sample.highest_usage = sample_msg.highest;
3553 latest_sample.lowest_usage = sample_msg.lowest;
3554 latest_sample.channel_total_consumed = sample_msg.total_consumed;
3555
3556 rcu_read_lock();
3557
3558 /* Retrieve the channel's informations */
3559 cds_lfht_lookup(state->channels_ht,
3560 hash_channel_key(&latest_sample.key),
3561 match_channel_info,
3562 &latest_sample.key,
3563 &iter);
3564 node = cds_lfht_iter_get_node(&iter);
3565 if (caa_unlikely(!node)) {
3566 /*
3567 * Not an error since the consumer can push a sample to the pipe
3568 * and the rest of the session daemon could notify us of the
3569 * channel's destruction before we get a chance to process that
3570 * sample.
3571 */
3572 DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
3573 latest_sample.key.key,
3574 domain == LTTNG_DOMAIN_KERNEL ? "kernel" :
3575 "user space");
3576 goto end_unlock;
3577 }
3578 channel_info = caa_container_of(node, struct channel_info,
3579 channels_ht_node);
3580 DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64", total consumed = %" PRIu64")",
3581 channel_info->name,
3582 latest_sample.key.key,
3583 channel_info->session_info->name,
3584 latest_sample.highest_usage,
3585 latest_sample.lowest_usage,
3586 latest_sample.channel_total_consumed);
3587
3588 previous_session_consumed_total =
3589 channel_info->session_info->consumed_data_size;
3590
3591 /* Retrieve the channel's last sample, if it exists, and update it. */
3592 cds_lfht_lookup(state->channel_state_ht,
3593 hash_channel_key(&latest_sample.key),
3594 match_channel_state_sample,
3595 &latest_sample.key,
3596 &iter);
3597 node = cds_lfht_iter_get_node(&iter);
3598 if (caa_likely(node)) {
3599 struct channel_state_sample *stored_sample;
3600
3601 /* Update the sample stored. */
3602 stored_sample = caa_container_of(node,
3603 struct channel_state_sample,
3604 channel_state_ht_node);
3605
3606 memcpy(&previous_sample, stored_sample,
3607 sizeof(previous_sample));
3608 stored_sample->highest_usage = latest_sample.highest_usage;
3609 stored_sample->lowest_usage = latest_sample.lowest_usage;
3610 stored_sample->channel_total_consumed = latest_sample.channel_total_consumed;
3611 previous_sample_available = true;
3612
3613 latest_session_consumed_total =
3614 previous_session_consumed_total +
3615 (latest_sample.channel_total_consumed - previous_sample.channel_total_consumed);
3616 } else {
3617 /*
3618 * This is the channel's first sample, allocate space for and
3619 * store the new sample.
3620 */
3621 struct channel_state_sample *stored_sample;
3622
3623 stored_sample = zmalloc(sizeof(*stored_sample));
3624 if (!stored_sample) {
3625 ret = -1;
3626 goto end_unlock;
3627 }
3628
3629 memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
3630 cds_lfht_node_init(&stored_sample->channel_state_ht_node);
3631 cds_lfht_add(state->channel_state_ht,
3632 hash_channel_key(&stored_sample->key),
3633 &stored_sample->channel_state_ht_node);
3634
3635 latest_session_consumed_total =
3636 previous_session_consumed_total +
3637 latest_sample.channel_total_consumed;
3638 }
3639
3640 channel_info->session_info->consumed_data_size =
3641 latest_session_consumed_total;
3642
3643 /* Find triggers associated with this channel. */
3644 cds_lfht_lookup(state->channel_triggers_ht,
3645 hash_channel_key(&latest_sample.key),
3646 match_channel_trigger_list,
3647 &latest_sample.key,
3648 &iter);
3649 node = cds_lfht_iter_get_node(&iter);
3650 if (caa_likely(!node)) {
3651 goto end_unlock;
3652 }
3653
3654 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
3655 channel_triggers_ht_node);
3656 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
3657 node) {
3658 const struct lttng_condition *condition;
3659 const struct lttng_action *action;
3660 const struct lttng_trigger *trigger;
3661 struct notification_client_list *client_list = NULL;
3662 struct lttng_evaluation *evaluation = NULL;
3663 bool client_list_is_empty;
3664
3665 ret = 0;
3666 trigger = trigger_list_element->trigger;
3667 condition = lttng_trigger_get_const_condition(trigger);
3668 assert(condition);
3669 action = lttng_trigger_get_const_action(trigger);
3670
3671 /* Notify actions are the only type currently supported. */
3672 assert(lttng_action_get_type_const(action) ==
3673 LTTNG_ACTION_TYPE_NOTIFY);
3674
3675 /*
3676 * Check if any client is subscribed to the result of this
3677 * evaluation.
3678 */
3679 client_list = get_client_list_from_condition(state, condition);
3680 assert(client_list);
3681 client_list_is_empty = cds_list_empty(&client_list->list);
3682 if (client_list_is_empty) {
3683 /*
3684 * No clients interested in the evaluation's result,
3685 * skip it.
3686 */
3687 goto put_list;
3688 }
3689
3690 ret = evaluate_buffer_condition(condition, &evaluation, state,
3691 previous_sample_available ? &previous_sample : NULL,
3692 &latest_sample,
3693 previous_session_consumed_total,
3694 latest_session_consumed_total,
3695 channel_info);
3696 if (caa_unlikely(ret)) {
3697 goto put_list;
3698 }
3699
3700 if (caa_likely(!evaluation)) {
3701 goto put_list;
3702 }
3703
3704 /* Dispatch evaluation result to all clients. */
3705 ret = send_evaluation_to_clients(trigger_list_element->trigger,
3706 evaluation, client_list, state,
3707 channel_info->session_info->uid,
3708 channel_info->session_info->gid);
3709 lttng_evaluation_destroy(evaluation);
3710 put_list:
3711 notification_client_list_put(client_list);
3712 if (caa_unlikely(ret)) {
3713 break;
3714 }
3715 }
3716 end_unlock:
3717 rcu_read_unlock();
3718 end:
3719 return ret;
3720 }
This page took 0.213704 seconds and 6 git commands to generate.