Fix: notification thread: RCU-safe reclaim of hash table nodes
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.c
1 /*
2 * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18 #define _LGPL_SOURCE
19 #include <urcu.h>
20 #include <urcu/rculfhash.h>
21
22 #include <common/defaults.h>
23 #include <common/error.h>
24 #include <common/futex.h>
25 #include <common/unix.h>
26 #include <common/dynamic-buffer.h>
27 #include <common/hashtable/utils.h>
28 #include <common/sessiond-comm/sessiond-comm.h>
29 #include <common/macros.h>
30 #include <lttng/condition/condition.h>
31 #include <lttng/action/action-internal.h>
32 #include <lttng/notification/notification-internal.h>
33 #include <lttng/condition/condition-internal.h>
34 #include <lttng/condition/buffer-usage-internal.h>
35 #include <lttng/condition/session-consumed-size-internal.h>
36 #include <lttng/condition/session-rotation-internal.h>
37 #include <lttng/notification/channel-internal.h>
38
39 #include <time.h>
40 #include <unistd.h>
41 #include <assert.h>
42 #include <inttypes.h>
43 #include <fcntl.h>
44
45 #include "notification-thread.h"
46 #include "notification-thread-events.h"
47 #include "notification-thread-commands.h"
48 #include "lttng-sessiond.h"
49 #include "kernel.h"
50
51 #define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
52 #define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
53
54 enum lttng_object_type {
55 LTTNG_OBJECT_TYPE_UNKNOWN,
56 LTTNG_OBJECT_TYPE_NONE,
57 LTTNG_OBJECT_TYPE_CHANNEL,
58 LTTNG_OBJECT_TYPE_SESSION,
59 };
60
61 struct lttng_trigger_list_element {
62 /* No ownership of the trigger object is assumed. */
63 const struct lttng_trigger *trigger;
64 struct cds_list_head node;
65 };
66
67 struct lttng_channel_trigger_list {
68 struct channel_key channel_key;
69 /* List of struct lttng_trigger_list_element. */
70 struct cds_list_head list;
71 /* Node in the channel_triggers_ht */
72 struct cds_lfht_node channel_triggers_ht_node;
73 /* call_rcu delayed reclaim. */
74 struct rcu_head rcu_node;
75 };
76
77 /*
78 * List of triggers applying to a given session.
79 *
80 * See:
81 * - lttng_session_trigger_list_create()
82 * - lttng_session_trigger_list_build()
83 * - lttng_session_trigger_list_destroy()
84 * - lttng_session_trigger_list_add()
85 */
86 struct lttng_session_trigger_list {
87 /*
88 * Not owned by this; points to the session_info structure's
89 * session name.
90 */
91 const char *session_name;
92 /* List of struct lttng_trigger_list_element. */
93 struct cds_list_head list;
94 /* Node in the session_triggers_ht */
95 struct cds_lfht_node session_triggers_ht_node;
96 /*
97 * Weak reference to the notification system's session triggers
98 * hashtable.
99 *
100 * The session trigger list structure structure is owned by
101 * the session's session_info.
102 *
103 * The session_info is kept alive the the channel_infos holding a
104 * reference to it (reference counting). When those channels are
105 * destroyed (at runtime or on teardown), the reference they hold
106 * to the session_info are released. On destruction of session_info,
107 * session_info_destroy() will remove the list of triggers applying
108 * to this session from the notification system's state.
109 *
110 * This implies that the session_triggers_ht must be destroyed
111 * after the channels.
112 */
113 struct cds_lfht *session_triggers_ht;
114 /* Used for delayed RCU reclaim. */
115 struct rcu_head rcu_node;
116 };
117
118 struct lttng_trigger_ht_element {
119 struct lttng_trigger *trigger;
120 struct cds_lfht_node node;
121 /* call_rcu delayed reclaim. */
122 struct rcu_head rcu_node;
123 };
124
125 struct lttng_condition_list_element {
126 struct lttng_condition *condition;
127 struct cds_list_head node;
128 };
129
130 struct notification_client_list_element {
131 struct notification_client *client;
132 struct cds_list_head node;
133 };
134
135 struct notification_client_list {
136 const struct lttng_trigger *trigger;
137 struct cds_list_head list;
138 struct cds_lfht_node notification_trigger_ht_node;
139 /* call_rcu delayed reclaim. */
140 struct rcu_head rcu_node;
141 };
142
143 struct notification_client {
144 int socket;
145 /* Client protocol version. */
146 uint8_t major, minor;
147 uid_t uid;
148 gid_t gid;
149 /*
150 * Indicates if the credentials and versions of the client have been
151 * checked.
152 */
153 bool validated;
154 /*
155 * Conditions to which the client's notification channel is subscribed.
156 * List of struct lttng_condition_list_node. The condition member is
157 * owned by the client.
158 */
159 struct cds_list_head condition_list;
160 struct cds_lfht_node client_socket_ht_node;
161 struct {
162 struct {
163 /*
164 * During the reception of a message, the reception
165 * buffers' "size" is set to contain the current
166 * message's complete payload.
167 */
168 struct lttng_dynamic_buffer buffer;
169 /* Bytes left to receive for the current message. */
170 size_t bytes_to_receive;
171 /* Type of the message being received. */
172 enum lttng_notification_channel_message_type msg_type;
173 /*
174 * Indicates whether or not credentials are expected
175 * from the client.
176 */
177 bool expect_creds;
178 /*
179 * Indicates whether or not credentials were received
180 * from the client.
181 */
182 bool creds_received;
183 /* Only used during credentials reception. */
184 lttng_sock_cred creds;
185 } inbound;
186 struct {
187 /*
188 * Indicates whether or not a notification addressed to
189 * this client was dropped because a command reply was
190 * already buffered.
191 *
192 * A notification is dropped whenever the buffer is not
193 * empty.
194 */
195 bool dropped_notification;
196 /*
197 * Indicates whether or not a command reply is already
198 * buffered. In this case, it means that the client is
199 * not consuming command replies before emitting a new
200 * one. This could be caused by a protocol error or a
201 * misbehaving/malicious client.
202 */
203 bool queued_command_reply;
204 struct lttng_dynamic_buffer buffer;
205 } outbound;
206 } communication;
207 /* call_rcu delayed reclaim. */
208 struct rcu_head rcu_node;
209 };
210
211 struct channel_state_sample {
212 struct channel_key key;
213 struct cds_lfht_node channel_state_ht_node;
214 uint64_t highest_usage;
215 uint64_t lowest_usage;
216 uint64_t channel_total_consumed;
217 /* call_rcu delayed reclaim. */
218 struct rcu_head rcu_node;
219 };
220
221 static unsigned long hash_channel_key(struct channel_key *key);
222 static int evaluate_buffer_condition(const struct lttng_condition *condition,
223 struct lttng_evaluation **evaluation,
224 const struct notification_thread_state *state,
225 const struct channel_state_sample *previous_sample,
226 const struct channel_state_sample *latest_sample,
227 uint64_t previous_session_consumed_total,
228 uint64_t latest_session_consumed_total,
229 struct channel_info *channel_info);
230 static
231 int send_evaluation_to_clients(const struct lttng_trigger *trigger,
232 const struct lttng_evaluation *evaluation,
233 struct notification_client_list *client_list,
234 struct notification_thread_state *state,
235 uid_t channel_uid, gid_t channel_gid);
236
237
238 /* session_info API */
239 static
240 void session_info_destroy(void *_data);
241 static
242 void session_info_get(struct session_info *session_info);
243 static
244 void session_info_put(struct session_info *session_info);
245 static
246 struct session_info *session_info_create(const char *name,
247 uid_t uid, gid_t gid,
248 struct lttng_session_trigger_list *trigger_list,
249 struct cds_lfht *sessions_ht);
250 static
251 void session_info_add_channel(struct session_info *session_info,
252 struct channel_info *channel_info);
253 static
254 void session_info_remove_channel(struct session_info *session_info,
255 struct channel_info *channel_info);
256
257 /* lttng_session_trigger_list API */
258 static
259 struct lttng_session_trigger_list *lttng_session_trigger_list_create(
260 const char *session_name,
261 struct cds_lfht *session_triggers_ht);
262 static
263 struct lttng_session_trigger_list *lttng_session_trigger_list_build(
264 const struct notification_thread_state *state,
265 const char *session_name);
266 static
267 void lttng_session_trigger_list_destroy(
268 struct lttng_session_trigger_list *list);
269 static
270 int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
271 const struct lttng_trigger *trigger);
272
273
274 static
275 int match_client(struct cds_lfht_node *node, const void *key)
276 {
277 /* This double-cast is intended to supress pointer-to-cast warning. */
278 int socket = (int) (intptr_t) key;
279 struct notification_client *client;
280
281 client = caa_container_of(node, struct notification_client,
282 client_socket_ht_node);
283
284 return !!(client->socket == socket);
285 }
286
287 static
288 int match_channel_trigger_list(struct cds_lfht_node *node, const void *key)
289 {
290 struct channel_key *channel_key = (struct channel_key *) key;
291 struct lttng_channel_trigger_list *trigger_list;
292
293 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
294 channel_triggers_ht_node);
295
296 return !!((channel_key->key == trigger_list->channel_key.key) &&
297 (channel_key->domain == trigger_list->channel_key.domain));
298 }
299
300 static
301 int match_session_trigger_list(struct cds_lfht_node *node, const void *key)
302 {
303 const char *session_name = (const char *) key;
304 struct lttng_session_trigger_list *trigger_list;
305
306 trigger_list = caa_container_of(node, struct lttng_session_trigger_list,
307 session_triggers_ht_node);
308
309 return !!(strcmp(trigger_list->session_name, session_name) == 0);
310 }
311
312 static
313 int match_channel_state_sample(struct cds_lfht_node *node, const void *key)
314 {
315 struct channel_key *channel_key = (struct channel_key *) key;
316 struct channel_state_sample *sample;
317
318 sample = caa_container_of(node, struct channel_state_sample,
319 channel_state_ht_node);
320
321 return !!((channel_key->key == sample->key.key) &&
322 (channel_key->domain == sample->key.domain));
323 }
324
325 static
326 int match_channel_info(struct cds_lfht_node *node, const void *key)
327 {
328 struct channel_key *channel_key = (struct channel_key *) key;
329 struct channel_info *channel_info;
330
331 channel_info = caa_container_of(node, struct channel_info,
332 channels_ht_node);
333
334 return !!((channel_key->key == channel_info->key.key) &&
335 (channel_key->domain == channel_info->key.domain));
336 }
337
338 static
339 int match_condition(struct cds_lfht_node *node, const void *key)
340 {
341 struct lttng_condition *condition_key = (struct lttng_condition *) key;
342 struct lttng_trigger_ht_element *trigger;
343 struct lttng_condition *condition;
344
345 trigger = caa_container_of(node, struct lttng_trigger_ht_element,
346 node);
347 condition = lttng_trigger_get_condition(trigger->trigger);
348 assert(condition);
349
350 return !!lttng_condition_is_equal(condition_key, condition);
351 }
352
353 static
354 int match_client_list_condition(struct cds_lfht_node *node, const void *key)
355 {
356 struct lttng_condition *condition_key = (struct lttng_condition *) key;
357 struct notification_client_list *client_list;
358 const struct lttng_condition *condition;
359
360 assert(condition_key);
361
362 client_list = caa_container_of(node, struct notification_client_list,
363 notification_trigger_ht_node);
364 condition = lttng_trigger_get_const_condition(client_list->trigger);
365
366 return !!lttng_condition_is_equal(condition_key, condition);
367 }
368
369 static
370 int match_session(struct cds_lfht_node *node, const void *key)
371 {
372 const char *name = key;
373 struct session_info *session_info = caa_container_of(
374 node, struct session_info, sessions_ht_node);
375
376 return !strcmp(session_info->name, name);
377 }
378
379 static
380 unsigned long lttng_condition_buffer_usage_hash(
381 const struct lttng_condition *_condition)
382 {
383 unsigned long hash;
384 unsigned long condition_type;
385 struct lttng_condition_buffer_usage *condition;
386
387 condition = container_of(_condition,
388 struct lttng_condition_buffer_usage, parent);
389
390 condition_type = (unsigned long) condition->parent.type;
391 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
392 if (condition->session_name) {
393 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
394 }
395 if (condition->channel_name) {
396 hash ^= hash_key_str(condition->channel_name, lttng_ht_seed);
397 }
398 if (condition->domain.set) {
399 hash ^= hash_key_ulong(
400 (void *) condition->domain.type,
401 lttng_ht_seed);
402 }
403 if (condition->threshold_ratio.set) {
404 uint64_t val;
405
406 val = condition->threshold_ratio.value * (double) UINT32_MAX;
407 hash ^= hash_key_u64(&val, lttng_ht_seed);
408 } else if (condition->threshold_bytes.set) {
409 uint64_t val;
410
411 val = condition->threshold_bytes.value;
412 hash ^= hash_key_u64(&val, lttng_ht_seed);
413 }
414 return hash;
415 }
416
417 static
418 unsigned long lttng_condition_session_consumed_size_hash(
419 const struct lttng_condition *_condition)
420 {
421 unsigned long hash;
422 unsigned long condition_type =
423 (unsigned long) LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE;
424 struct lttng_condition_session_consumed_size *condition;
425 uint64_t val;
426
427 condition = container_of(_condition,
428 struct lttng_condition_session_consumed_size, parent);
429
430 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
431 if (condition->session_name) {
432 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
433 }
434 val = condition->consumed_threshold_bytes.value;
435 hash ^= hash_key_u64(&val, lttng_ht_seed);
436 return hash;
437 }
438
439 static
440 unsigned long lttng_condition_session_rotation_hash(
441 const struct lttng_condition *_condition)
442 {
443 unsigned long hash, condition_type;
444 struct lttng_condition_session_rotation *condition;
445
446 condition = container_of(_condition,
447 struct lttng_condition_session_rotation, parent);
448 condition_type = (unsigned long) condition->parent.type;
449 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
450 assert(condition->session_name);
451 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
452 return hash;
453 }
454
455 /*
456 * The lttng_condition hashing code is kept in this file (rather than
457 * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
458 * don't want to link in liblttng-ctl.
459 */
460 static
461 unsigned long lttng_condition_hash(const struct lttng_condition *condition)
462 {
463 switch (condition->type) {
464 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
465 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
466 return lttng_condition_buffer_usage_hash(condition);
467 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
468 return lttng_condition_session_consumed_size_hash(condition);
469 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
470 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
471 return lttng_condition_session_rotation_hash(condition);
472 default:
473 ERR("[notification-thread] Unexpected condition type caught");
474 abort();
475 }
476 }
477
478 static
479 unsigned long hash_channel_key(struct channel_key *key)
480 {
481 unsigned long key_hash = hash_key_u64(&key->key, lttng_ht_seed);
482 unsigned long domain_hash = hash_key_ulong(
483 (void *) (unsigned long) key->domain, lttng_ht_seed);
484
485 return key_hash ^ domain_hash;
486 }
487
488 /*
489 * Get the type of object to which a given condition applies. Bindings let
490 * the notification system evaluate a trigger's condition when a given
491 * object's state is updated.
492 *
493 * For instance, a condition bound to a channel will be evaluated everytime
494 * the channel's state is changed by a channel monitoring sample.
495 */
496 static
497 enum lttng_object_type get_condition_binding_object(
498 const struct lttng_condition *condition)
499 {
500 switch (lttng_condition_get_type(condition)) {
501 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
502 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
503 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
504 return LTTNG_OBJECT_TYPE_CHANNEL;
505 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
506 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
507 return LTTNG_OBJECT_TYPE_SESSION;
508 default:
509 return LTTNG_OBJECT_TYPE_UNKNOWN;
510 }
511 }
512
513 static
514 void free_channel_info_rcu(struct rcu_head *node)
515 {
516 free(caa_container_of(node, struct channel_info, rcu_node));
517 }
518
519 static
520 void channel_info_destroy(struct channel_info *channel_info)
521 {
522 if (!channel_info) {
523 return;
524 }
525
526 if (channel_info->session_info) {
527 session_info_remove_channel(channel_info->session_info,
528 channel_info);
529 session_info_put(channel_info->session_info);
530 }
531 if (channel_info->name) {
532 free(channel_info->name);
533 }
534 call_rcu(&channel_info->rcu_node, free_channel_info_rcu);
535 }
536
537 static
538 void free_session_info_rcu(struct rcu_head *node)
539 {
540 free(caa_container_of(node, struct session_info, rcu_node));
541 }
542
543 /* Don't call directly, use the ref-counting mechanism. */
544 static
545 void session_info_destroy(void *_data)
546 {
547 struct session_info *session_info = _data;
548 int ret;
549
550 assert(session_info);
551 if (session_info->channel_infos_ht) {
552 ret = cds_lfht_destroy(session_info->channel_infos_ht, NULL);
553 if (ret) {
554 ERR("[notification-thread] Failed to destroy channel information hash table");
555 }
556 }
557 lttng_session_trigger_list_destroy(session_info->trigger_list);
558
559 rcu_read_lock();
560 cds_lfht_del(session_info->sessions_ht,
561 &session_info->sessions_ht_node);
562 rcu_read_unlock();
563 free(session_info->name);
564 call_rcu(&session_info->rcu_node, free_session_info_rcu);
565 }
566
567 static
568 void session_info_get(struct session_info *session_info)
569 {
570 if (!session_info) {
571 return;
572 }
573 lttng_ref_get(&session_info->ref);
574 }
575
576 static
577 void session_info_put(struct session_info *session_info)
578 {
579 if (!session_info) {
580 return;
581 }
582 lttng_ref_put(&session_info->ref);
583 }
584
585 static
586 struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid,
587 struct lttng_session_trigger_list *trigger_list,
588 struct cds_lfht *sessions_ht)
589 {
590 struct session_info *session_info;
591
592 assert(name);
593
594 session_info = zmalloc(sizeof(*session_info));
595 if (!session_info) {
596 goto end;
597 }
598 lttng_ref_init(&session_info->ref, session_info_destroy);
599
600 session_info->channel_infos_ht = cds_lfht_new(DEFAULT_HT_SIZE,
601 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
602 if (!session_info->channel_infos_ht) {
603 goto error;
604 }
605
606 cds_lfht_node_init(&session_info->sessions_ht_node);
607 session_info->name = strdup(name);
608 if (!session_info->name) {
609 goto error;
610 }
611 session_info->uid = uid;
612 session_info->gid = gid;
613 session_info->trigger_list = trigger_list;
614 session_info->sessions_ht = sessions_ht;
615 end:
616 return session_info;
617 error:
618 session_info_put(session_info);
619 return NULL;
620 }
621
622 static
623 void session_info_add_channel(struct session_info *session_info,
624 struct channel_info *channel_info)
625 {
626 rcu_read_lock();
627 cds_lfht_add(session_info->channel_infos_ht,
628 hash_channel_key(&channel_info->key),
629 &channel_info->session_info_channels_ht_node);
630 rcu_read_unlock();
631 }
632
633 static
634 void session_info_remove_channel(struct session_info *session_info,
635 struct channel_info *channel_info)
636 {
637 rcu_read_lock();
638 cds_lfht_del(session_info->channel_infos_ht,
639 &channel_info->session_info_channels_ht_node);
640 rcu_read_unlock();
641 }
642
643 static
644 struct channel_info *channel_info_create(const char *channel_name,
645 struct channel_key *channel_key, uint64_t channel_capacity,
646 struct session_info *session_info)
647 {
648 struct channel_info *channel_info = zmalloc(sizeof(*channel_info));
649
650 if (!channel_info) {
651 goto end;
652 }
653
654 cds_lfht_node_init(&channel_info->channels_ht_node);
655 cds_lfht_node_init(&channel_info->session_info_channels_ht_node);
656 memcpy(&channel_info->key, channel_key, sizeof(*channel_key));
657 channel_info->capacity = channel_capacity;
658
659 channel_info->name = strdup(channel_name);
660 if (!channel_info->name) {
661 goto error;
662 }
663
664 /*
665 * Set the references between session and channel infos:
666 * - channel_info holds a strong reference to session_info
667 * - session_info holds a weak reference to channel_info
668 */
669 session_info_get(session_info);
670 session_info_add_channel(session_info, channel_info);
671 channel_info->session_info = session_info;
672 end:
673 return channel_info;
674 error:
675 channel_info_destroy(channel_info);
676 return NULL;
677 }
678
679 /* RCU read lock must be held by the caller. */
680 static
681 struct notification_client_list *get_client_list_from_condition(
682 struct notification_thread_state *state,
683 const struct lttng_condition *condition)
684 {
685 struct cds_lfht_node *node;
686 struct cds_lfht_iter iter;
687
688 cds_lfht_lookup(state->notification_trigger_clients_ht,
689 lttng_condition_hash(condition),
690 match_client_list_condition,
691 condition,
692 &iter);
693 node = cds_lfht_iter_get_node(&iter);
694
695 return node ? caa_container_of(node,
696 struct notification_client_list,
697 notification_trigger_ht_node) : NULL;
698 }
699
700 /* This function must be called with the RCU read lock held. */
701 static
702 int evaluate_channel_condition_for_client(
703 const struct lttng_condition *condition,
704 struct notification_thread_state *state,
705 struct lttng_evaluation **evaluation,
706 uid_t *session_uid, gid_t *session_gid)
707 {
708 int ret;
709 struct cds_lfht_iter iter;
710 struct cds_lfht_node *node;
711 struct channel_info *channel_info = NULL;
712 struct channel_key *channel_key = NULL;
713 struct channel_state_sample *last_sample = NULL;
714 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
715
716 /* Find the channel associated with the condition. */
717 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter,
718 channel_trigger_list, channel_triggers_ht_node) {
719 struct lttng_trigger_list_element *element;
720
721 cds_list_for_each_entry(element, &channel_trigger_list->list, node) {
722 const struct lttng_condition *current_condition =
723 lttng_trigger_get_const_condition(
724 element->trigger);
725
726 assert(current_condition);
727 if (!lttng_condition_is_equal(condition,
728 current_condition)) {
729 continue;
730 }
731
732 /* Found the trigger, save the channel key. */
733 channel_key = &channel_trigger_list->channel_key;
734 break;
735 }
736 if (channel_key) {
737 /* The channel key was found stop iteration. */
738 break;
739 }
740 }
741
742 if (!channel_key){
743 /* No channel found; normal exit. */
744 DBG("[notification-thread] No known channel associated with newly subscribed-to condition");
745 ret = 0;
746 goto end;
747 }
748
749 /* Fetch channel info for the matching channel. */
750 cds_lfht_lookup(state->channels_ht,
751 hash_channel_key(channel_key),
752 match_channel_info,
753 channel_key,
754 &iter);
755 node = cds_lfht_iter_get_node(&iter);
756 assert(node);
757 channel_info = caa_container_of(node, struct channel_info,
758 channels_ht_node);
759
760 /* Retrieve the channel's last sample, if it exists. */
761 cds_lfht_lookup(state->channel_state_ht,
762 hash_channel_key(channel_key),
763 match_channel_state_sample,
764 channel_key,
765 &iter);
766 node = cds_lfht_iter_get_node(&iter);
767 if (node) {
768 last_sample = caa_container_of(node,
769 struct channel_state_sample,
770 channel_state_ht_node);
771 } else {
772 /* Nothing to evaluate, no sample was ever taken. Normal exit */
773 DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
774 ret = 0;
775 goto end;
776 }
777
778 ret = evaluate_buffer_condition(condition, evaluation, state,
779 NULL, last_sample,
780 0, channel_info->session_info->consumed_data_size,
781 channel_info);
782 if (ret) {
783 WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
784 goto end;
785 }
786
787 *session_uid = channel_info->session_info->uid;
788 *session_gid = channel_info->session_info->gid;
789 end:
790 return ret;
791 }
792
793 static
794 const char *get_condition_session_name(const struct lttng_condition *condition)
795 {
796 const char *session_name = NULL;
797 enum lttng_condition_status status;
798
799 switch (lttng_condition_get_type(condition)) {
800 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
801 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
802 status = lttng_condition_buffer_usage_get_session_name(
803 condition, &session_name);
804 break;
805 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
806 status = lttng_condition_session_consumed_size_get_session_name(
807 condition, &session_name);
808 break;
809 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
810 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
811 status = lttng_condition_session_rotation_get_session_name(
812 condition, &session_name);
813 break;
814 default:
815 abort();
816 }
817 if (status != LTTNG_CONDITION_STATUS_OK) {
818 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
819 goto end;
820 }
821 end:
822 return session_name;
823 }
824
825 /* This function must be called with the RCU read lock held. */
826 static
827 int evaluate_session_condition_for_client(
828 const struct lttng_condition *condition,
829 struct notification_thread_state *state,
830 struct lttng_evaluation **evaluation,
831 uid_t *session_uid, gid_t *session_gid)
832 {
833 int ret;
834 struct cds_lfht_iter iter;
835 struct cds_lfht_node *node;
836 const char *session_name;
837 struct session_info *session_info = NULL;
838
839 session_name = get_condition_session_name(condition);
840
841 /* Find the session associated with the trigger. */
842 cds_lfht_lookup(state->sessions_ht,
843 hash_key_str(session_name, lttng_ht_seed),
844 match_session,
845 session_name,
846 &iter);
847 node = cds_lfht_iter_get_node(&iter);
848 if (!node) {
849 DBG("[notification-thread] No known session matching name \"%s\"",
850 session_name);
851 ret = 0;
852 goto end;
853 }
854
855 session_info = caa_container_of(node, struct session_info,
856 sessions_ht_node);
857 session_info_get(session_info);
858
859 /*
860 * Evaluation is performed in-line here since only one type of
861 * session-bound condition is handled for the moment.
862 */
863 switch (lttng_condition_get_type(condition)) {
864 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
865 if (!session_info->rotation.ongoing) {
866 ret = 0;
867 goto end_session_put;
868 }
869
870 *evaluation = lttng_evaluation_session_rotation_ongoing_create(
871 session_info->rotation.id);
872 if (!*evaluation) {
873 /* Fatal error. */
874 ERR("[notification-thread] Failed to create session rotation ongoing evaluation for session \"%s\"",
875 session_info->name);
876 ret = -1;
877 goto end_session_put;
878 }
879 ret = 0;
880 break;
881 default:
882 ret = 0;
883 goto end_session_put;
884 }
885
886 *session_uid = session_info->uid;
887 *session_gid = session_info->gid;
888
889 end_session_put:
890 session_info_put(session_info);
891 end:
892 return ret;
893 }
894
895 /* This function must be called with the RCU read lock held. */
896 static
897 int evaluate_condition_for_client(const struct lttng_trigger *trigger,
898 const struct lttng_condition *condition,
899 struct notification_client *client,
900 struct notification_thread_state *state)
901 {
902 int ret;
903 struct lttng_evaluation *evaluation = NULL;
904 struct notification_client_list client_list = { 0 };
905 struct notification_client_list_element client_list_element = { 0 };
906 uid_t object_uid = 0;
907 gid_t object_gid = 0;
908
909 assert(trigger);
910 assert(condition);
911 assert(client);
912 assert(state);
913
914 switch (get_condition_binding_object(condition)) {
915 case LTTNG_OBJECT_TYPE_SESSION:
916 ret = evaluate_session_condition_for_client(condition, state,
917 &evaluation, &object_uid, &object_gid);
918 break;
919 case LTTNG_OBJECT_TYPE_CHANNEL:
920 ret = evaluate_channel_condition_for_client(condition, state,
921 &evaluation, &object_uid, &object_gid);
922 break;
923 case LTTNG_OBJECT_TYPE_NONE:
924 ret = 0;
925 goto end;
926 case LTTNG_OBJECT_TYPE_UNKNOWN:
927 default:
928 ret = -1;
929 goto end;
930 }
931
932 if (!evaluation) {
933 /* Evaluation yielded nothing. Normal exit. */
934 DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
935 ret = 0;
936 goto end;
937 }
938
939 /*
940 * Create a temporary client list with the client currently
941 * subscribing.
942 */
943 cds_lfht_node_init(&client_list.notification_trigger_ht_node);
944 CDS_INIT_LIST_HEAD(&client_list.list);
945 client_list.trigger = trigger;
946
947 CDS_INIT_LIST_HEAD(&client_list_element.node);
948 client_list_element.client = client;
949 cds_list_add(&client_list_element.node, &client_list.list);
950
951 /* Send evaluation result to the newly-subscribed client. */
952 DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
953 ret = send_evaluation_to_clients(trigger, evaluation, &client_list,
954 state, object_uid, object_gid);
955
956 end:
957 return ret;
958 }
959
960 static
961 int notification_thread_client_subscribe(struct notification_client *client,
962 struct lttng_condition *condition,
963 struct notification_thread_state *state,
964 enum lttng_notification_channel_status *_status)
965 {
966 int ret = 0;
967 struct notification_client_list *client_list;
968 struct lttng_condition_list_element *condition_list_element = NULL;
969 struct notification_client_list_element *client_list_element = NULL;
970 enum lttng_notification_channel_status status =
971 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
972
973 /*
974 * Ensure that the client has not already subscribed to this condition
975 * before.
976 */
977 cds_list_for_each_entry(condition_list_element, &client->condition_list, node) {
978 if (lttng_condition_is_equal(condition_list_element->condition,
979 condition)) {
980 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED;
981 goto end;
982 }
983 }
984
985 condition_list_element = zmalloc(sizeof(*condition_list_element));
986 if (!condition_list_element) {
987 ret = -1;
988 goto error;
989 }
990 client_list_element = zmalloc(sizeof(*client_list_element));
991 if (!client_list_element) {
992 ret = -1;
993 goto error;
994 }
995
996 rcu_read_lock();
997
998 /*
999 * Add the newly-subscribed condition to the client's subscription list.
1000 */
1001 CDS_INIT_LIST_HEAD(&condition_list_element->node);
1002 condition_list_element->condition = condition;
1003 cds_list_add(&condition_list_element->node, &client->condition_list);
1004
1005 client_list = get_client_list_from_condition(state, condition);
1006 if (!client_list) {
1007 /*
1008 * No notification-emiting trigger registered with this
1009 * condition. We don't evaluate the condition right away
1010 * since this trigger is not registered yet.
1011 */
1012 free(client_list_element);
1013 goto end_unlock;
1014 }
1015
1016 /*
1017 * The condition to which the client just subscribed is evaluated
1018 * at this point so that conditions that are already TRUE result
1019 * in a notification being sent out.
1020 */
1021 if (evaluate_condition_for_client(client_list->trigger, condition,
1022 client, state)) {
1023 WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
1024 ret = -1;
1025 free(client_list_element);
1026 goto end_unlock;
1027 }
1028
1029 /*
1030 * Add the client to the list of clients interested in a given trigger
1031 * if a "notification" trigger with a corresponding condition was
1032 * added prior.
1033 */
1034 client_list_element->client = client;
1035 CDS_INIT_LIST_HEAD(&client_list_element->node);
1036 cds_list_add(&client_list_element->node, &client_list->list);
1037 end_unlock:
1038 rcu_read_unlock();
1039 end:
1040 if (_status) {
1041 *_status = status;
1042 }
1043 return ret;
1044 error:
1045 free(condition_list_element);
1046 free(client_list_element);
1047 return ret;
1048 }
1049
1050 static
1051 int notification_thread_client_unsubscribe(
1052 struct notification_client *client,
1053 struct lttng_condition *condition,
1054 struct notification_thread_state *state,
1055 enum lttng_notification_channel_status *_status)
1056 {
1057 struct notification_client_list *client_list;
1058 struct lttng_condition_list_element *condition_list_element,
1059 *condition_tmp;
1060 struct notification_client_list_element *client_list_element,
1061 *client_tmp;
1062 bool condition_found = false;
1063 enum lttng_notification_channel_status status =
1064 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1065
1066 /* Remove the condition from the client's condition list. */
1067 cds_list_for_each_entry_safe(condition_list_element, condition_tmp,
1068 &client->condition_list, node) {
1069 if (!lttng_condition_is_equal(condition_list_element->condition,
1070 condition)) {
1071 continue;
1072 }
1073
1074 cds_list_del(&condition_list_element->node);
1075 /*
1076 * The caller may be iterating on the client's conditions to
1077 * tear down a client's connection. In this case, the condition
1078 * will be destroyed at the end.
1079 */
1080 if (condition != condition_list_element->condition) {
1081 lttng_condition_destroy(
1082 condition_list_element->condition);
1083 }
1084 free(condition_list_element);
1085 condition_found = true;
1086 break;
1087 }
1088
1089 if (!condition_found) {
1090 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION;
1091 goto end;
1092 }
1093
1094 /*
1095 * Remove the client from the list of clients interested the trigger
1096 * matching the condition.
1097 */
1098 rcu_read_lock();
1099 client_list = get_client_list_from_condition(state, condition);
1100 if (!client_list) {
1101 goto end_unlock;
1102 }
1103
1104 cds_list_for_each_entry_safe(client_list_element, client_tmp,
1105 &client_list->list, node) {
1106 if (client_list_element->client->socket != client->socket) {
1107 continue;
1108 }
1109 cds_list_del(&client_list_element->node);
1110 free(client_list_element);
1111 break;
1112 }
1113 end_unlock:
1114 rcu_read_unlock();
1115 end:
1116 lttng_condition_destroy(condition);
1117 if (_status) {
1118 *_status = status;
1119 }
1120 return 0;
1121 }
1122
1123 static
1124 void free_notification_client_rcu(struct rcu_head *node)
1125 {
1126 free(caa_container_of(node, struct notification_client, rcu_node));
1127 }
1128
1129 static
1130 void notification_client_destroy(struct notification_client *client,
1131 struct notification_thread_state *state)
1132 {
1133 struct lttng_condition_list_element *condition_list_element, *tmp;
1134
1135 if (!client) {
1136 return;
1137 }
1138
1139 /* Release all conditions to which the client was subscribed. */
1140 cds_list_for_each_entry_safe(condition_list_element, tmp,
1141 &client->condition_list, node) {
1142 (void) notification_thread_client_unsubscribe(client,
1143 condition_list_element->condition, state, NULL);
1144 }
1145
1146 if (client->socket >= 0) {
1147 (void) lttcomm_close_unix_sock(client->socket);
1148 }
1149 lttng_dynamic_buffer_reset(&client->communication.inbound.buffer);
1150 lttng_dynamic_buffer_reset(&client->communication.outbound.buffer);
1151 call_rcu(&client->rcu_node, free_notification_client_rcu);
1152 }
1153
1154 /*
1155 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1156 * client pointer).
1157 */
1158 static
1159 struct notification_client *get_client_from_socket(int socket,
1160 struct notification_thread_state *state)
1161 {
1162 struct cds_lfht_iter iter;
1163 struct cds_lfht_node *node;
1164 struct notification_client *client = NULL;
1165
1166 cds_lfht_lookup(state->client_socket_ht,
1167 hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed),
1168 match_client,
1169 (void *) (unsigned long) socket,
1170 &iter);
1171 node = cds_lfht_iter_get_node(&iter);
1172 if (!node) {
1173 goto end;
1174 }
1175
1176 client = caa_container_of(node, struct notification_client,
1177 client_socket_ht_node);
1178 end:
1179 return client;
1180 }
1181
1182 static
1183 bool buffer_usage_condition_applies_to_channel(
1184 const struct lttng_condition *condition,
1185 const struct channel_info *channel_info)
1186 {
1187 enum lttng_condition_status status;
1188 enum lttng_domain_type condition_domain;
1189 const char *condition_session_name = NULL;
1190 const char *condition_channel_name = NULL;
1191
1192 status = lttng_condition_buffer_usage_get_domain_type(condition,
1193 &condition_domain);
1194 assert(status == LTTNG_CONDITION_STATUS_OK);
1195 if (channel_info->key.domain != condition_domain) {
1196 goto fail;
1197 }
1198
1199 status = lttng_condition_buffer_usage_get_session_name(
1200 condition, &condition_session_name);
1201 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1202
1203 status = lttng_condition_buffer_usage_get_channel_name(
1204 condition, &condition_channel_name);
1205 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_channel_name);
1206
1207 if (strcmp(channel_info->session_info->name, condition_session_name)) {
1208 goto fail;
1209 }
1210 if (strcmp(channel_info->name, condition_channel_name)) {
1211 goto fail;
1212 }
1213
1214 return true;
1215 fail:
1216 return false;
1217 }
1218
1219 static
1220 bool session_consumed_size_condition_applies_to_channel(
1221 const struct lttng_condition *condition,
1222 const struct channel_info *channel_info)
1223 {
1224 enum lttng_condition_status status;
1225 const char *condition_session_name = NULL;
1226
1227 status = lttng_condition_session_consumed_size_get_session_name(
1228 condition, &condition_session_name);
1229 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1230
1231 if (strcmp(channel_info->session_info->name, condition_session_name)) {
1232 goto fail;
1233 }
1234
1235 return true;
1236 fail:
1237 return false;
1238 }
1239
1240 static
1241 bool trigger_applies_to_channel(const struct lttng_trigger *trigger,
1242 const struct channel_info *channel_info)
1243 {
1244 const struct lttng_condition *condition;
1245 bool trigger_applies;
1246
1247 condition = lttng_trigger_get_const_condition(trigger);
1248 if (!condition) {
1249 goto fail;
1250 }
1251
1252 switch (lttng_condition_get_type(condition)) {
1253 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1254 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1255 trigger_applies = buffer_usage_condition_applies_to_channel(
1256 condition, channel_info);
1257 break;
1258 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
1259 trigger_applies = session_consumed_size_condition_applies_to_channel(
1260 condition, channel_info);
1261 break;
1262 default:
1263 goto fail;
1264 }
1265
1266 return trigger_applies;
1267 fail:
1268 return false;
1269 }
1270
1271 static
1272 bool trigger_applies_to_client(struct lttng_trigger *trigger,
1273 struct notification_client *client)
1274 {
1275 bool applies = false;
1276 struct lttng_condition_list_element *condition_list_element;
1277
1278 cds_list_for_each_entry(condition_list_element, &client->condition_list,
1279 node) {
1280 applies = lttng_condition_is_equal(
1281 condition_list_element->condition,
1282 lttng_trigger_get_condition(trigger));
1283 if (applies) {
1284 break;
1285 }
1286 }
1287 return applies;
1288 }
1289
1290 /* Must be called with RCU read lock held. */
1291 static
1292 struct lttng_session_trigger_list *get_session_trigger_list(
1293 struct notification_thread_state *state,
1294 const char *session_name)
1295 {
1296 struct lttng_session_trigger_list *list = NULL;
1297 struct cds_lfht_node *node;
1298 struct cds_lfht_iter iter;
1299
1300 cds_lfht_lookup(state->session_triggers_ht,
1301 hash_key_str(session_name, lttng_ht_seed),
1302 match_session_trigger_list,
1303 session_name,
1304 &iter);
1305 node = cds_lfht_iter_get_node(&iter);
1306 if (!node) {
1307 /*
1308 * Not an error, the list of triggers applying to that session
1309 * will be initialized when the session is created.
1310 */
1311 DBG("[notification-thread] No trigger list found for session \"%s\" as it is not yet known to the notification system",
1312 session_name);
1313 goto end;
1314 }
1315
1316 list = caa_container_of(node,
1317 struct lttng_session_trigger_list,
1318 session_triggers_ht_node);
1319 end:
1320 return list;
1321 }
1322
1323 /*
1324 * Allocate an empty lttng_session_trigger_list for the session named
1325 * 'session_name'.
1326 *
1327 * No ownership of 'session_name' is assumed by the session trigger list.
1328 * It is the caller's responsability to ensure the session name is alive
1329 * for as long as this list is.
1330 */
1331 static
1332 struct lttng_session_trigger_list *lttng_session_trigger_list_create(
1333 const char *session_name,
1334 struct cds_lfht *session_triggers_ht)
1335 {
1336 struct lttng_session_trigger_list *list;
1337
1338 list = zmalloc(sizeof(*list));
1339 if (!list) {
1340 goto end;
1341 }
1342 list->session_name = session_name;
1343 CDS_INIT_LIST_HEAD(&list->list);
1344 cds_lfht_node_init(&list->session_triggers_ht_node);
1345 list->session_triggers_ht = session_triggers_ht;
1346
1347 rcu_read_lock();
1348 /* Publish the list through the session_triggers_ht. */
1349 cds_lfht_add(session_triggers_ht,
1350 hash_key_str(session_name, lttng_ht_seed),
1351 &list->session_triggers_ht_node);
1352 rcu_read_unlock();
1353 end:
1354 return list;
1355 }
1356
1357 static
1358 void free_session_trigger_list_rcu(struct rcu_head *node)
1359 {
1360 free(caa_container_of(node, struct lttng_session_trigger_list,
1361 rcu_node));
1362 }
1363
1364 static
1365 void lttng_session_trigger_list_destroy(struct lttng_session_trigger_list *list)
1366 {
1367 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1368
1369 /* Empty the list element by element, and then free the list itself. */
1370 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1371 &list->list, node) {
1372 cds_list_del(&trigger_list_element->node);
1373 free(trigger_list_element);
1374 }
1375 rcu_read_lock();
1376 /* Unpublish the list from the session_triggers_ht. */
1377 cds_lfht_del(list->session_triggers_ht,
1378 &list->session_triggers_ht_node);
1379 rcu_read_unlock();
1380 call_rcu(&list->rcu_node, free_session_trigger_list_rcu);
1381 }
1382
1383 static
1384 int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
1385 const struct lttng_trigger *trigger)
1386 {
1387 int ret = 0;
1388 struct lttng_trigger_list_element *new_element =
1389 zmalloc(sizeof(*new_element));
1390
1391 if (!new_element) {
1392 ret = -1;
1393 goto end;
1394 }
1395 CDS_INIT_LIST_HEAD(&new_element->node);
1396 new_element->trigger = trigger;
1397 cds_list_add(&new_element->node, &list->list);
1398 end:
1399 return ret;
1400 }
1401
1402 static
1403 bool trigger_applies_to_session(const struct lttng_trigger *trigger,
1404 const char *session_name)
1405 {
1406 bool applies = false;
1407 const struct lttng_condition *condition;
1408
1409 condition = lttng_trigger_get_const_condition(trigger);
1410 switch (lttng_condition_get_type(condition)) {
1411 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1412 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
1413 {
1414 enum lttng_condition_status condition_status;
1415 const char *condition_session_name;
1416
1417 condition_status = lttng_condition_session_rotation_get_session_name(
1418 condition, &condition_session_name);
1419 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
1420 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
1421 goto end;
1422 }
1423
1424 assert(condition_session_name);
1425 applies = !strcmp(condition_session_name, session_name);
1426 break;
1427 }
1428 default:
1429 goto end;
1430 }
1431 end:
1432 return applies;
1433 }
1434
1435 /*
1436 * Allocate and initialize an lttng_session_trigger_list which contains
1437 * all triggers that apply to the session named 'session_name'.
1438 *
1439 * No ownership of 'session_name' is assumed by the session trigger list.
1440 * It is the caller's responsability to ensure the session name is alive
1441 * for as long as this list is.
1442 */
1443 static
1444 struct lttng_session_trigger_list *lttng_session_trigger_list_build(
1445 const struct notification_thread_state *state,
1446 const char *session_name)
1447 {
1448 int trigger_count = 0;
1449 struct lttng_session_trigger_list *session_trigger_list = NULL;
1450 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1451 struct cds_lfht_iter iter;
1452
1453 session_trigger_list = lttng_session_trigger_list_create(session_name,
1454 state->session_triggers_ht);
1455
1456 /* Add all triggers applying to the session named 'session_name'. */
1457 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1458 node) {
1459 int ret;
1460
1461 if (!trigger_applies_to_session(trigger_ht_element->trigger,
1462 session_name)) {
1463 continue;
1464 }
1465
1466 ret = lttng_session_trigger_list_add(session_trigger_list,
1467 trigger_ht_element->trigger);
1468 if (ret) {
1469 goto error;
1470 }
1471
1472 trigger_count++;
1473 }
1474
1475 DBG("[notification-thread] Found %i triggers that apply to newly created session",
1476 trigger_count);
1477 return session_trigger_list;
1478 error:
1479 lttng_session_trigger_list_destroy(session_trigger_list);
1480 return NULL;
1481 }
1482
1483 static
1484 struct session_info *find_or_create_session_info(
1485 struct notification_thread_state *state,
1486 const char *name, uid_t uid, gid_t gid)
1487 {
1488 struct session_info *session = NULL;
1489 struct cds_lfht_node *node;
1490 struct cds_lfht_iter iter;
1491 struct lttng_session_trigger_list *trigger_list;
1492
1493 rcu_read_lock();
1494 cds_lfht_lookup(state->sessions_ht,
1495 hash_key_str(name, lttng_ht_seed),
1496 match_session,
1497 name,
1498 &iter);
1499 node = cds_lfht_iter_get_node(&iter);
1500 if (node) {
1501 DBG("[notification-thread] Found session info of session \"%s\" (uid = %i, gid = %i)",
1502 name, uid, gid);
1503 session = caa_container_of(node, struct session_info,
1504 sessions_ht_node);
1505 assert(session->uid == uid);
1506 assert(session->gid == gid);
1507 session_info_get(session);
1508 goto end;
1509 }
1510
1511 trigger_list = lttng_session_trigger_list_build(state, name);
1512 if (!trigger_list) {
1513 goto error;
1514 }
1515
1516 session = session_info_create(name, uid, gid, trigger_list,
1517 state->sessions_ht);
1518 if (!session) {
1519 ERR("[notification-thread] Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)",
1520 name, uid, gid);
1521 goto error;
1522 }
1523 trigger_list = NULL;
1524
1525 cds_lfht_add(state->sessions_ht, hash_key_str(name, lttng_ht_seed),
1526 &session->sessions_ht_node);
1527 end:
1528 rcu_read_unlock();
1529 return session;
1530 error:
1531 rcu_read_unlock();
1532 session_info_put(session);
1533 return NULL;
1534 }
1535
1536 static
1537 int handle_notification_thread_command_add_channel(
1538 struct notification_thread_state *state,
1539 const char *session_name, uid_t session_uid, gid_t session_gid,
1540 const char *channel_name, enum lttng_domain_type channel_domain,
1541 uint64_t channel_key_int, uint64_t channel_capacity,
1542 enum lttng_error_code *cmd_result)
1543 {
1544 struct cds_list_head trigger_list;
1545 struct channel_info *new_channel_info = NULL;
1546 struct channel_key channel_key = {
1547 .key = channel_key_int,
1548 .domain = channel_domain,
1549 };
1550 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
1551 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1552 int trigger_count = 0;
1553 struct cds_lfht_iter iter;
1554 struct session_info *session_info = NULL;
1555
1556 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
1557 channel_name, session_name, channel_key_int,
1558 channel_domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
1559
1560 CDS_INIT_LIST_HEAD(&trigger_list);
1561
1562 session_info = find_or_create_session_info(state, session_name,
1563 session_uid, session_gid);
1564 if (!session_info) {
1565 /* Allocation error or an internal error occurred. */
1566 goto error;
1567 }
1568
1569 new_channel_info = channel_info_create(channel_name, &channel_key,
1570 channel_capacity, session_info);
1571 if (!new_channel_info) {
1572 goto error;
1573 }
1574
1575 rcu_read_lock();
1576 /* Build a list of all triggers applying to the new channel. */
1577 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1578 node) {
1579 struct lttng_trigger_list_element *new_element;
1580
1581 if (!trigger_applies_to_channel(trigger_ht_element->trigger,
1582 new_channel_info)) {
1583 continue;
1584 }
1585
1586 new_element = zmalloc(sizeof(*new_element));
1587 if (!new_element) {
1588 rcu_read_unlock();
1589 goto error;
1590 }
1591 CDS_INIT_LIST_HEAD(&new_element->node);
1592 new_element->trigger = trigger_ht_element->trigger;
1593 cds_list_add(&new_element->node, &trigger_list);
1594 trigger_count++;
1595 }
1596 rcu_read_unlock();
1597
1598 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
1599 trigger_count);
1600 channel_trigger_list = zmalloc(sizeof(*channel_trigger_list));
1601 if (!channel_trigger_list) {
1602 goto error;
1603 }
1604 channel_trigger_list->channel_key = new_channel_info->key;
1605 CDS_INIT_LIST_HEAD(&channel_trigger_list->list);
1606 cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
1607 cds_list_splice(&trigger_list, &channel_trigger_list->list);
1608
1609 rcu_read_lock();
1610 /* Add channel to the channel_ht which owns the channel_infos. */
1611 cds_lfht_add(state->channels_ht,
1612 hash_channel_key(&new_channel_info->key),
1613 &new_channel_info->channels_ht_node);
1614 /*
1615 * Add the list of triggers associated with this channel to the
1616 * channel_triggers_ht.
1617 */
1618 cds_lfht_add(state->channel_triggers_ht,
1619 hash_channel_key(&new_channel_info->key),
1620 &channel_trigger_list->channel_triggers_ht_node);
1621 rcu_read_unlock();
1622 session_info_put(session_info);
1623 *cmd_result = LTTNG_OK;
1624 return 0;
1625 error:
1626 channel_info_destroy(new_channel_info);
1627 session_info_put(session_info);
1628 return 1;
1629 }
1630
1631 static
1632 void free_channel_trigger_list_rcu(struct rcu_head *node)
1633 {
1634 free(caa_container_of(node, struct lttng_channel_trigger_list,
1635 rcu_node));
1636 }
1637
1638 static
1639 void free_channel_state_sample_rcu(struct rcu_head *node)
1640 {
1641 free(caa_container_of(node, struct channel_state_sample,
1642 rcu_node));
1643 }
1644
1645 static
1646 int handle_notification_thread_command_remove_channel(
1647 struct notification_thread_state *state,
1648 uint64_t channel_key, enum lttng_domain_type domain,
1649 enum lttng_error_code *cmd_result)
1650 {
1651 struct cds_lfht_node *node;
1652 struct cds_lfht_iter iter;
1653 struct lttng_channel_trigger_list *trigger_list;
1654 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1655 struct channel_key key = { .key = channel_key, .domain = domain };
1656 struct channel_info *channel_info;
1657
1658 DBG("[notification-thread] Removing channel key = %" PRIu64 " in %s domain",
1659 channel_key, domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
1660
1661 rcu_read_lock();
1662
1663 cds_lfht_lookup(state->channel_triggers_ht,
1664 hash_channel_key(&key),
1665 match_channel_trigger_list,
1666 &key,
1667 &iter);
1668 node = cds_lfht_iter_get_node(&iter);
1669 /*
1670 * There is a severe internal error if we are being asked to remove a
1671 * channel that doesn't exist.
1672 */
1673 if (!node) {
1674 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
1675 goto end;
1676 }
1677
1678 /* Free the list of triggers associated with this channel. */
1679 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
1680 channel_triggers_ht_node);
1681 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1682 &trigger_list->list, node) {
1683 cds_list_del(&trigger_list_element->node);
1684 free(trigger_list_element);
1685 }
1686 cds_lfht_del(state->channel_triggers_ht, node);
1687 call_rcu(&trigger_list->rcu_node, free_channel_trigger_list_rcu);
1688
1689 /* Free sampled channel state. */
1690 cds_lfht_lookup(state->channel_state_ht,
1691 hash_channel_key(&key),
1692 match_channel_state_sample,
1693 &key,
1694 &iter);
1695 node = cds_lfht_iter_get_node(&iter);
1696 /*
1697 * This is expected to be NULL if the channel is destroyed before we
1698 * received a sample.
1699 */
1700 if (node) {
1701 struct channel_state_sample *sample = caa_container_of(node,
1702 struct channel_state_sample,
1703 channel_state_ht_node);
1704
1705 cds_lfht_del(state->channel_state_ht, node);
1706 call_rcu(&sample->rcu_node, free_channel_state_sample_rcu);
1707 }
1708
1709 /* Remove the channel from the channels_ht and free it. */
1710 cds_lfht_lookup(state->channels_ht,
1711 hash_channel_key(&key),
1712 match_channel_info,
1713 &key,
1714 &iter);
1715 node = cds_lfht_iter_get_node(&iter);
1716 assert(node);
1717 channel_info = caa_container_of(node, struct channel_info,
1718 channels_ht_node);
1719 cds_lfht_del(state->channels_ht, node);
1720 channel_info_destroy(channel_info);
1721 end:
1722 rcu_read_unlock();
1723 *cmd_result = LTTNG_OK;
1724 return 0;
1725 }
1726
1727 static
1728 int handle_notification_thread_command_session_rotation(
1729 struct notification_thread_state *state,
1730 enum notification_thread_command_type cmd_type,
1731 const char *session_name, uid_t session_uid, gid_t session_gid,
1732 uint64_t trace_archive_chunk_id,
1733 struct lttng_trace_archive_location *location,
1734 enum lttng_error_code *_cmd_result)
1735 {
1736 int ret = 0;
1737 enum lttng_error_code cmd_result = LTTNG_OK;
1738 struct lttng_session_trigger_list *trigger_list;
1739 struct lttng_trigger_list_element *trigger_list_element;
1740 struct session_info *session_info;
1741
1742 rcu_read_lock();
1743
1744 session_info = find_or_create_session_info(state, session_name,
1745 session_uid, session_gid);
1746 if (!session_info) {
1747 /* Allocation error or an internal error occurred. */
1748 ret = -1;
1749 cmd_result = LTTNG_ERR_NOMEM;
1750 goto end;
1751 }
1752
1753 session_info->rotation.ongoing =
1754 cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING;
1755 session_info->rotation.id = trace_archive_chunk_id;
1756 trigger_list = get_session_trigger_list(state, session_name);
1757 if (!trigger_list) {
1758 DBG("[notification-thread] No triggers applying to session \"%s\" found",
1759 session_name);
1760 goto end;
1761 }
1762
1763 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
1764 node) {
1765 const struct lttng_condition *condition;
1766 const struct lttng_action *action;
1767 const struct lttng_trigger *trigger;
1768 struct notification_client_list *client_list;
1769 struct lttng_evaluation *evaluation = NULL;
1770 enum lttng_condition_type condition_type;
1771
1772 trigger = trigger_list_element->trigger;
1773 condition = lttng_trigger_get_const_condition(trigger);
1774 assert(condition);
1775 condition_type = lttng_condition_get_type(condition);
1776
1777 if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING &&
1778 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1779 continue;
1780 } else if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED &&
1781 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED) {
1782 continue;
1783 }
1784
1785 action = lttng_trigger_get_const_action(trigger);
1786
1787 /* Notify actions are the only type currently supported. */
1788 assert(lttng_action_get_type_const(action) ==
1789 LTTNG_ACTION_TYPE_NOTIFY);
1790
1791 client_list = get_client_list_from_condition(state, condition);
1792 assert(client_list);
1793
1794 if (cds_list_empty(&client_list->list)) {
1795 /*
1796 * No clients interested in the evaluation's result,
1797 * skip it.
1798 */
1799 continue;
1800 }
1801
1802 if (cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1803 evaluation = lttng_evaluation_session_rotation_ongoing_create(
1804 trace_archive_chunk_id);
1805 } else {
1806 evaluation = lttng_evaluation_session_rotation_completed_create(
1807 trace_archive_chunk_id, location);
1808 }
1809
1810 if (!evaluation) {
1811 /* Internal error */
1812 ret = -1;
1813 cmd_result = LTTNG_ERR_UNK;
1814 goto end;
1815 }
1816
1817 /* Dispatch evaluation result to all clients. */
1818 ret = send_evaluation_to_clients(trigger_list_element->trigger,
1819 evaluation, client_list, state,
1820 session_info->uid,
1821 session_info->gid);
1822 lttng_evaluation_destroy(evaluation);
1823 if (caa_unlikely(ret)) {
1824 goto end;
1825 }
1826 }
1827 end:
1828 session_info_put(session_info);
1829 *_cmd_result = cmd_result;
1830 rcu_read_unlock();
1831 return ret;
1832 }
1833
1834 static
1835 int condition_is_supported(struct lttng_condition *condition)
1836 {
1837 int ret;
1838
1839 switch (lttng_condition_get_type(condition)) {
1840 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1841 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1842 {
1843 enum lttng_domain_type domain;
1844
1845 ret = lttng_condition_buffer_usage_get_domain_type(condition,
1846 &domain);
1847 if (ret) {
1848 ret = -1;
1849 goto end;
1850 }
1851
1852 if (domain != LTTNG_DOMAIN_KERNEL) {
1853 ret = 1;
1854 goto end;
1855 }
1856
1857 /*
1858 * Older kernel tracers don't expose the API to monitor their
1859 * buffers. Therefore, we reject triggers that require that
1860 * mechanism to be available to be evaluated.
1861 */
1862 ret = kernel_supports_ring_buffer_snapshot_sample_positions(
1863 kernel_tracer_fd);
1864 break;
1865 }
1866 default:
1867 ret = 1;
1868 }
1869 end:
1870 return ret;
1871 }
1872
1873 /* Must be called with RCU read lock held. */
1874 static
1875 int bind_trigger_to_matching_session(const struct lttng_trigger *trigger,
1876 struct notification_thread_state *state)
1877 {
1878 int ret = 0;
1879 const struct lttng_condition *condition;
1880 const char *session_name;
1881 struct lttng_session_trigger_list *trigger_list;
1882
1883 condition = lttng_trigger_get_const_condition(trigger);
1884 switch (lttng_condition_get_type(condition)) {
1885 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1886 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
1887 {
1888 enum lttng_condition_status status;
1889
1890 status = lttng_condition_session_rotation_get_session_name(
1891 condition, &session_name);
1892 if (status != LTTNG_CONDITION_STATUS_OK) {
1893 ERR("[notification-thread] Failed to bind trigger to session: unable to get 'session_rotation' condition's session name");
1894 ret = -1;
1895 goto end;
1896 }
1897 break;
1898 }
1899 default:
1900 ret = -1;
1901 goto end;
1902 }
1903
1904 trigger_list = get_session_trigger_list(state, session_name);
1905 if (!trigger_list) {
1906 DBG("[notification-thread] Unable to bind trigger applying to session \"%s\" as it is not yet known to the notification system",
1907 session_name);
1908 goto end;
1909
1910 }
1911
1912 DBG("[notification-thread] Newly registered trigger bound to session \"%s\"",
1913 session_name);
1914 ret = lttng_session_trigger_list_add(trigger_list, trigger);
1915 end:
1916 return ret;
1917 }
1918
1919 /* Must be called with RCU read lock held. */
1920 static
1921 int bind_trigger_to_matching_channels(const struct lttng_trigger *trigger,
1922 struct notification_thread_state *state)
1923 {
1924 int ret = 0;
1925 struct cds_lfht_node *node;
1926 struct cds_lfht_iter iter;
1927 struct channel_info *channel;
1928
1929 cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
1930 channels_ht_node) {
1931 struct lttng_trigger_list_element *trigger_list_element;
1932 struct lttng_channel_trigger_list *trigger_list;
1933
1934 if (!trigger_applies_to_channel(trigger, channel)) {
1935 continue;
1936 }
1937
1938 cds_lfht_lookup(state->channel_triggers_ht,
1939 hash_channel_key(&channel->key),
1940 match_channel_trigger_list,
1941 &channel->key,
1942 &iter);
1943 node = cds_lfht_iter_get_node(&iter);
1944 assert(node);
1945 trigger_list = caa_container_of(node,
1946 struct lttng_channel_trigger_list,
1947 channel_triggers_ht_node);
1948
1949 trigger_list_element = zmalloc(sizeof(*trigger_list_element));
1950 if (!trigger_list_element) {
1951 ret = -1;
1952 goto end;
1953 }
1954 CDS_INIT_LIST_HEAD(&trigger_list_element->node);
1955 trigger_list_element->trigger = trigger;
1956 cds_list_add(&trigger_list_element->node, &trigger_list->list);
1957 DBG("[notification-thread] Newly registered trigger bound to channel \"%s\"",
1958 channel->name);
1959 }
1960 end:
1961 return ret;
1962 }
1963
1964 /*
1965 * FIXME A client's credentials are not checked when registering a trigger, nor
1966 * are they stored alongside with the trigger.
1967 *
1968 * The effects of this are benign since:
1969 * - The client will succeed in registering the trigger, as it is valid,
1970 * - The trigger will, internally, be bound to the channel/session,
1971 * - The notifications will not be sent since the client's credentials
1972 * are checked against the channel at that moment.
1973 *
1974 * If this function returns a non-zero value, it means something is
1975 * fundamentally broken and the whole subsystem/thread will be torn down.
1976 *
1977 * If a non-fatal error occurs, just set the cmd_result to the appropriate
1978 * error code.
1979 */
1980 static
1981 int handle_notification_thread_command_register_trigger(
1982 struct notification_thread_state *state,
1983 struct lttng_trigger *trigger,
1984 enum lttng_error_code *cmd_result)
1985 {
1986 int ret = 0;
1987 struct lttng_condition *condition;
1988 struct notification_client *client;
1989 struct notification_client_list *client_list = NULL;
1990 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1991 struct notification_client_list_element *client_list_element, *tmp;
1992 struct cds_lfht_node *node;
1993 struct cds_lfht_iter iter;
1994 bool free_trigger = true;
1995
1996 rcu_read_lock();
1997
1998 condition = lttng_trigger_get_condition(trigger);
1999 assert(condition);
2000
2001 ret = condition_is_supported(condition);
2002 if (ret < 0) {
2003 goto error;
2004 } else if (ret == 0) {
2005 *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
2006 goto error;
2007 } else {
2008 /* Feature is supported, continue. */
2009 ret = 0;
2010 }
2011
2012 trigger_ht_element = zmalloc(sizeof(*trigger_ht_element));
2013 if (!trigger_ht_element) {
2014 ret = -1;
2015 goto error;
2016 }
2017
2018 /* Add trigger to the trigger_ht. */
2019 cds_lfht_node_init(&trigger_ht_element->node);
2020 trigger_ht_element->trigger = trigger;
2021
2022 node = cds_lfht_add_unique(state->triggers_ht,
2023 lttng_condition_hash(condition),
2024 match_condition,
2025 condition,
2026 &trigger_ht_element->node);
2027 if (node != &trigger_ht_element->node) {
2028 /* Not a fatal error, simply report it to the client. */
2029 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2030 goto error_free_ht_element;
2031 }
2032
2033 /*
2034 * Ownership of the trigger and of its wrapper was transfered to
2035 * the triggers_ht.
2036 */
2037 trigger_ht_element = NULL;
2038 free_trigger = false;
2039
2040 /*
2041 * The rest only applies to triggers that have a "notify" action.
2042 * It is not skipped as this is the only action type currently
2043 * supported.
2044 */
2045 client_list = zmalloc(sizeof(*client_list));
2046 if (!client_list) {
2047 ret = -1;
2048 goto error_free_ht_element;
2049 }
2050 cds_lfht_node_init(&client_list->notification_trigger_ht_node);
2051 CDS_INIT_LIST_HEAD(&client_list->list);
2052 client_list->trigger = trigger;
2053
2054 /* Build a list of clients to which this new trigger applies. */
2055 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
2056 client_socket_ht_node) {
2057 if (!trigger_applies_to_client(trigger, client)) {
2058 continue;
2059 }
2060
2061 client_list_element = zmalloc(sizeof(*client_list_element));
2062 if (!client_list_element) {
2063 ret = -1;
2064 goto error_free_client_list;
2065 }
2066 CDS_INIT_LIST_HEAD(&client_list_element->node);
2067 client_list_element->client = client;
2068 cds_list_add(&client_list_element->node, &client_list->list);
2069 }
2070
2071 cds_lfht_add(state->notification_trigger_clients_ht,
2072 lttng_condition_hash(condition),
2073 &client_list->notification_trigger_ht_node);
2074
2075 switch (get_condition_binding_object(condition)) {
2076 case LTTNG_OBJECT_TYPE_SESSION:
2077 /* Add the trigger to the list if it matches a known session. */
2078 ret = bind_trigger_to_matching_session(trigger, state);
2079 if (ret) {
2080 goto error_free_client_list;
2081 }
2082 break;
2083 case LTTNG_OBJECT_TYPE_CHANNEL:
2084 /*
2085 * Add the trigger to list of triggers bound to the channels
2086 * currently known.
2087 */
2088 ret = bind_trigger_to_matching_channels(trigger, state);
2089 if (ret) {
2090 goto error_free_client_list;
2091 }
2092 break;
2093 case LTTNG_OBJECT_TYPE_NONE:
2094 break;
2095 default:
2096 ERR("[notification-thread] Unknown object type on which to bind a newly registered trigger was encountered");
2097 ret = -1;
2098 goto error_free_client_list;
2099 }
2100
2101 /*
2102 * Since there is nothing preventing clients from subscribing to a
2103 * condition before the corresponding trigger is registered, we have
2104 * to evaluate this new condition right away.
2105 *
2106 * At some point, we were waiting for the next "evaluation" (e.g. on
2107 * reception of a channel sample) to evaluate this new condition, but
2108 * that was broken.
2109 *
2110 * The reason it was broken is that waiting for the next sample
2111 * does not allow us to properly handle transitions for edge-triggered
2112 * conditions.
2113 *
2114 * Consider this example: when we handle a new channel sample, we
2115 * evaluate each conditions twice: once with the previous state, and
2116 * again with the newest state. We then use those two results to
2117 * determine whether a state change happened: a condition was false and
2118 * became true. If a state change happened, we have to notify clients.
2119 *
2120 * Now, if a client subscribes to a given notification and registers
2121 * a trigger *after* that subscription, we have to make sure the
2122 * condition is evaluated at this point while considering only the
2123 * current state. Otherwise, the next evaluation cycle may only see
2124 * that the evaluations remain the same (true for samples n-1 and n) and
2125 * the client will never know that the condition has been met.
2126 */
2127 cds_list_for_each_entry_safe(client_list_element, tmp,
2128 &client_list->list, node) {
2129 ret = evaluate_condition_for_client(trigger, condition,
2130 client_list_element->client, state);
2131 if (ret) {
2132 goto error_free_client_list;
2133 }
2134 }
2135
2136 /*
2137 * Client list ownership transferred to the
2138 * notification_trigger_clients_ht.
2139 */
2140 client_list = NULL;
2141
2142 *cmd_result = LTTNG_OK;
2143 error_free_client_list:
2144 if (client_list) {
2145 cds_list_for_each_entry_safe(client_list_element, tmp,
2146 &client_list->list, node) {
2147 free(client_list_element);
2148 }
2149 free(client_list);
2150 }
2151 error_free_ht_element:
2152 free(trigger_ht_element);
2153 error:
2154 if (free_trigger) {
2155 struct lttng_action *action = lttng_trigger_get_action(trigger);
2156
2157 lttng_condition_destroy(condition);
2158 lttng_action_destroy(action);
2159 lttng_trigger_destroy(trigger);
2160 }
2161 rcu_read_unlock();
2162 return ret;
2163 }
2164
2165 static
2166 void free_notification_client_list_rcu(struct rcu_head *node)
2167 {
2168 free(caa_container_of(node, struct notification_client_list,
2169 rcu_node));
2170 }
2171
2172 static
2173 void free_lttng_trigger_ht_element_rcu(struct rcu_head *node)
2174 {
2175 free(caa_container_of(node, struct lttng_trigger_ht_element,
2176 rcu_node));
2177 }
2178
2179 static
2180 int handle_notification_thread_command_unregister_trigger(
2181 struct notification_thread_state *state,
2182 struct lttng_trigger *trigger,
2183 enum lttng_error_code *_cmd_reply)
2184 {
2185 struct cds_lfht_iter iter;
2186 struct cds_lfht_node *triggers_ht_node;
2187 struct lttng_channel_trigger_list *trigger_list;
2188 struct notification_client_list *client_list;
2189 struct notification_client_list_element *client_list_element, *tmp;
2190 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
2191 struct lttng_condition *condition = lttng_trigger_get_condition(
2192 trigger);
2193 struct lttng_action *action;
2194 enum lttng_error_code cmd_reply;
2195
2196 rcu_read_lock();
2197
2198 cds_lfht_lookup(state->triggers_ht,
2199 lttng_condition_hash(condition),
2200 match_condition,
2201 condition,
2202 &iter);
2203 triggers_ht_node = cds_lfht_iter_get_node(&iter);
2204 if (!triggers_ht_node) {
2205 cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND;
2206 goto end;
2207 } else {
2208 cmd_reply = LTTNG_OK;
2209 }
2210
2211 /* Remove trigger from channel_triggers_ht. */
2212 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
2213 channel_triggers_ht_node) {
2214 struct lttng_trigger_list_element *trigger_element, *tmp;
2215
2216 cds_list_for_each_entry_safe(trigger_element, tmp,
2217 &trigger_list->list, node) {
2218 const struct lttng_condition *current_condition =
2219 lttng_trigger_get_const_condition(
2220 trigger_element->trigger);
2221
2222 assert(current_condition);
2223 if (!lttng_condition_is_equal(condition,
2224 current_condition)) {
2225 continue;
2226 }
2227
2228 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
2229 cds_list_del(&trigger_element->node);
2230 /* A trigger can only appear once per channel */
2231 break;
2232 }
2233 }
2234
2235 /*
2236 * Remove and release the client list from
2237 * notification_trigger_clients_ht.
2238 */
2239 client_list = get_client_list_from_condition(state, condition);
2240 assert(client_list);
2241
2242 cds_list_for_each_entry_safe(client_list_element, tmp,
2243 &client_list->list, node) {
2244 free(client_list_element);
2245 }
2246 cds_lfht_del(state->notification_trigger_clients_ht,
2247 &client_list->notification_trigger_ht_node);
2248 call_rcu(&client_list->rcu_node, free_notification_client_list_rcu);
2249
2250 /* Remove trigger from triggers_ht. */
2251 trigger_ht_element = caa_container_of(triggers_ht_node,
2252 struct lttng_trigger_ht_element, node);
2253 cds_lfht_del(state->triggers_ht, triggers_ht_node);
2254
2255 condition = lttng_trigger_get_condition(trigger_ht_element->trigger);
2256 lttng_condition_destroy(condition);
2257 action = lttng_trigger_get_action(trigger_ht_element->trigger);
2258 lttng_action_destroy(action);
2259 lttng_trigger_destroy(trigger_ht_element->trigger);
2260 call_rcu(&trigger_ht_element->rcu_node, free_lttng_trigger_ht_element_rcu);
2261 end:
2262 rcu_read_unlock();
2263 if (_cmd_reply) {
2264 *_cmd_reply = cmd_reply;
2265 }
2266 return 0;
2267 }
2268
2269 /* Returns 0 on success, 1 on exit requested, negative value on error. */
2270 int handle_notification_thread_command(
2271 struct notification_thread_handle *handle,
2272 struct notification_thread_state *state)
2273 {
2274 int ret;
2275 uint64_t counter;
2276 struct notification_thread_command *cmd;
2277
2278 /* Read the event pipe to put it back into a quiescent state. */
2279 ret = read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter,
2280 sizeof(counter));
2281 if (ret == -1) {
2282 goto error;
2283 }
2284
2285 pthread_mutex_lock(&handle->cmd_queue.lock);
2286 cmd = cds_list_first_entry(&handle->cmd_queue.list,
2287 struct notification_thread_command, cmd_list_node);
2288 switch (cmd->type) {
2289 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
2290 DBG("[notification-thread] Received register trigger command");
2291 ret = handle_notification_thread_command_register_trigger(
2292 state, cmd->parameters.trigger,
2293 &cmd->reply_code);
2294 break;
2295 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
2296 DBG("[notification-thread] Received unregister trigger command");
2297 ret = handle_notification_thread_command_unregister_trigger(
2298 state, cmd->parameters.trigger,
2299 &cmd->reply_code);
2300 break;
2301 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
2302 DBG("[notification-thread] Received add channel command");
2303 ret = handle_notification_thread_command_add_channel(
2304 state,
2305 cmd->parameters.add_channel.session.name,
2306 cmd->parameters.add_channel.session.uid,
2307 cmd->parameters.add_channel.session.gid,
2308 cmd->parameters.add_channel.channel.name,
2309 cmd->parameters.add_channel.channel.domain,
2310 cmd->parameters.add_channel.channel.key,
2311 cmd->parameters.add_channel.channel.capacity,
2312 &cmd->reply_code);
2313 break;
2314 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
2315 DBG("[notification-thread] Received remove channel command");
2316 ret = handle_notification_thread_command_remove_channel(
2317 state, cmd->parameters.remove_channel.key,
2318 cmd->parameters.remove_channel.domain,
2319 &cmd->reply_code);
2320 break;
2321 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING:
2322 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED:
2323 DBG("[notification-thread] Received session rotation %s command",
2324 cmd->type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING ?
2325 "ongoing" : "completed");
2326 ret = handle_notification_thread_command_session_rotation(
2327 state,
2328 cmd->type,
2329 cmd->parameters.session_rotation.session_name,
2330 cmd->parameters.session_rotation.uid,
2331 cmd->parameters.session_rotation.gid,
2332 cmd->parameters.session_rotation.trace_archive_chunk_id,
2333 cmd->parameters.session_rotation.location,
2334 &cmd->reply_code);
2335 break;
2336 case NOTIFICATION_COMMAND_TYPE_QUIT:
2337 DBG("[notification-thread] Received quit command");
2338 cmd->reply_code = LTTNG_OK;
2339 ret = 1;
2340 goto end;
2341 default:
2342 ERR("[notification-thread] Unknown internal command received");
2343 goto error_unlock;
2344 }
2345
2346 if (ret) {
2347 goto error_unlock;
2348 }
2349 end:
2350 cds_list_del(&cmd->cmd_list_node);
2351 lttng_waiter_wake_up(&cmd->reply_waiter);
2352 pthread_mutex_unlock(&handle->cmd_queue.lock);
2353 return ret;
2354 error_unlock:
2355 /* Wake-up and return a fatal error to the calling thread. */
2356 lttng_waiter_wake_up(&cmd->reply_waiter);
2357 pthread_mutex_unlock(&handle->cmd_queue.lock);
2358 cmd->reply_code = LTTNG_ERR_FATAL;
2359 error:
2360 /* Indicate a fatal error to the caller. */
2361 return -1;
2362 }
2363
2364 static
2365 unsigned long hash_client_socket(int socket)
2366 {
2367 return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed);
2368 }
2369
2370 static
2371 int socket_set_non_blocking(int socket)
2372 {
2373 int ret, flags;
2374
2375 /* Set the pipe as non-blocking. */
2376 ret = fcntl(socket, F_GETFL, 0);
2377 if (ret == -1) {
2378 PERROR("fcntl get socket flags");
2379 goto end;
2380 }
2381 flags = ret;
2382
2383 ret = fcntl(socket, F_SETFL, flags | O_NONBLOCK);
2384 if (ret == -1) {
2385 PERROR("fcntl set O_NONBLOCK socket flag");
2386 goto end;
2387 }
2388 DBG("Client socket (fd = %i) set as non-blocking", socket);
2389 end:
2390 return ret;
2391 }
2392
2393 static
2394 int client_reset_inbound_state(struct notification_client *client)
2395 {
2396 int ret;
2397
2398 ret = lttng_dynamic_buffer_set_size(
2399 &client->communication.inbound.buffer, 0);
2400 assert(!ret);
2401
2402 client->communication.inbound.bytes_to_receive =
2403 sizeof(struct lttng_notification_channel_message);
2404 client->communication.inbound.msg_type =
2405 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN;
2406 LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1);
2407 LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1);
2408 ret = lttng_dynamic_buffer_set_size(
2409 &client->communication.inbound.buffer,
2410 client->communication.inbound.bytes_to_receive);
2411 return ret;
2412 }
2413
2414 int handle_notification_thread_client_connect(
2415 struct notification_thread_state *state)
2416 {
2417 int ret;
2418 struct notification_client *client;
2419
2420 DBG("[notification-thread] Handling new notification channel client connection");
2421
2422 client = zmalloc(sizeof(*client));
2423 if (!client) {
2424 /* Fatal error. */
2425 ret = -1;
2426 goto error;
2427 }
2428 CDS_INIT_LIST_HEAD(&client->condition_list);
2429 lttng_dynamic_buffer_init(&client->communication.inbound.buffer);
2430 lttng_dynamic_buffer_init(&client->communication.outbound.buffer);
2431 client->communication.inbound.expect_creds = true;
2432 ret = client_reset_inbound_state(client);
2433 if (ret) {
2434 ERR("[notification-thread] Failed to reset client communication's inbound state");
2435 ret = 0;
2436 goto error;
2437 }
2438
2439 ret = lttcomm_accept_unix_sock(state->notification_channel_socket);
2440 if (ret < 0) {
2441 ERR("[notification-thread] Failed to accept new notification channel client connection");
2442 ret = 0;
2443 goto error;
2444 }
2445
2446 client->socket = ret;
2447
2448 ret = socket_set_non_blocking(client->socket);
2449 if (ret) {
2450 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
2451 goto error;
2452 }
2453
2454 ret = lttcomm_setsockopt_creds_unix_sock(client->socket);
2455 if (ret < 0) {
2456 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
2457 ret = 0;
2458 goto error;
2459 }
2460
2461 ret = lttng_poll_add(&state->events, client->socket,
2462 LPOLLIN | LPOLLERR |
2463 LPOLLHUP | LPOLLRDHUP);
2464 if (ret < 0) {
2465 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
2466 ret = 0;
2467 goto error;
2468 }
2469 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
2470 client->socket);
2471
2472 rcu_read_lock();
2473 cds_lfht_add(state->client_socket_ht,
2474 hash_client_socket(client->socket),
2475 &client->client_socket_ht_node);
2476 rcu_read_unlock();
2477
2478 return ret;
2479 error:
2480 notification_client_destroy(client, state);
2481 return ret;
2482 }
2483
2484 int handle_notification_thread_client_disconnect(
2485 int client_socket,
2486 struct notification_thread_state *state)
2487 {
2488 int ret = 0;
2489 struct notification_client *client;
2490
2491 rcu_read_lock();
2492 DBG("[notification-thread] Closing client connection (socket fd = %i)",
2493 client_socket);
2494 client = get_client_from_socket(client_socket, state);
2495 if (!client) {
2496 /* Internal state corruption, fatal error. */
2497 ERR("[notification-thread] Unable to find client (socket fd = %i)",
2498 client_socket);
2499 ret = -1;
2500 goto end;
2501 }
2502
2503 ret = lttng_poll_del(&state->events, client_socket);
2504 if (ret) {
2505 ERR("[notification-thread] Failed to remove client socket from poll set");
2506 }
2507 cds_lfht_del(state->client_socket_ht,
2508 &client->client_socket_ht_node);
2509 notification_client_destroy(client, state);
2510 end:
2511 rcu_read_unlock();
2512 return ret;
2513 }
2514
2515 int handle_notification_thread_client_disconnect_all(
2516 struct notification_thread_state *state)
2517 {
2518 struct cds_lfht_iter iter;
2519 struct notification_client *client;
2520 bool error_encoutered = false;
2521
2522 rcu_read_lock();
2523 DBG("[notification-thread] Closing all client connections");
2524 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
2525 client_socket_ht_node) {
2526 int ret;
2527
2528 ret = handle_notification_thread_client_disconnect(
2529 client->socket, state);
2530 if (ret) {
2531 error_encoutered = true;
2532 }
2533 }
2534 rcu_read_unlock();
2535 return error_encoutered ? 1 : 0;
2536 }
2537
2538 int handle_notification_thread_trigger_unregister_all(
2539 struct notification_thread_state *state)
2540 {
2541 bool error_occurred = false;
2542 struct cds_lfht_iter iter;
2543 struct lttng_trigger_ht_element *trigger_ht_element;
2544
2545 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
2546 node) {
2547 int ret = handle_notification_thread_command_unregister_trigger(
2548 state, trigger_ht_element->trigger, NULL);
2549 if (ret) {
2550 error_occurred = true;
2551 }
2552 }
2553 return error_occurred ? -1 : 0;
2554 }
2555
2556 static
2557 int client_flush_outgoing_queue(struct notification_client *client,
2558 struct notification_thread_state *state)
2559 {
2560 ssize_t ret;
2561 size_t to_send_count;
2562
2563 assert(client->communication.outbound.buffer.size != 0);
2564 to_send_count = client->communication.outbound.buffer.size;
2565 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
2566 client->socket);
2567
2568 ret = lttcomm_send_unix_sock_non_block(client->socket,
2569 client->communication.outbound.buffer.data,
2570 to_send_count);
2571 if ((ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) ||
2572 (ret > 0 && ret < to_send_count)) {
2573 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
2574 client->socket);
2575 to_send_count -= max(ret, 0);
2576
2577 memcpy(client->communication.outbound.buffer.data,
2578 client->communication.outbound.buffer.data +
2579 client->communication.outbound.buffer.size - to_send_count,
2580 to_send_count);
2581 ret = lttng_dynamic_buffer_set_size(
2582 &client->communication.outbound.buffer,
2583 to_send_count);
2584 if (ret) {
2585 goto error;
2586 }
2587
2588 /*
2589 * We want to be notified whenever there is buffer space
2590 * available to send the rest of the payload.
2591 */
2592 ret = lttng_poll_mod(&state->events, client->socket,
2593 CLIENT_POLL_MASK_IN_OUT);
2594 if (ret) {
2595 goto error;
2596 }
2597 } else if (ret < 0) {
2598 /* Generic error, disconnect the client. */
2599 ERR("[notification-thread] Failed to send flush outgoing queue, disconnecting client (socket fd = %i)",
2600 client->socket);
2601 ret = handle_notification_thread_client_disconnect(
2602 client->socket, state);
2603 if (ret) {
2604 goto error;
2605 }
2606 } else {
2607 /* No error and flushed the queue completely. */
2608 ret = lttng_dynamic_buffer_set_size(
2609 &client->communication.outbound.buffer, 0);
2610 if (ret) {
2611 goto error;
2612 }
2613 ret = lttng_poll_mod(&state->events, client->socket,
2614 CLIENT_POLL_MASK_IN);
2615 if (ret) {
2616 goto error;
2617 }
2618
2619 client->communication.outbound.queued_command_reply = false;
2620 client->communication.outbound.dropped_notification = false;
2621 }
2622
2623 return 0;
2624 error:
2625 return -1;
2626 }
2627
2628 static
2629 int client_send_command_reply(struct notification_client *client,
2630 struct notification_thread_state *state,
2631 enum lttng_notification_channel_status status)
2632 {
2633 int ret;
2634 struct lttng_notification_channel_command_reply reply = {
2635 .status = (int8_t) status,
2636 };
2637 struct lttng_notification_channel_message msg = {
2638 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
2639 .size = sizeof(reply),
2640 };
2641 char buffer[sizeof(msg) + sizeof(reply)];
2642
2643 if (client->communication.outbound.queued_command_reply) {
2644 /* Protocol error. */
2645 goto error;
2646 }
2647
2648 memcpy(buffer, &msg, sizeof(msg));
2649 memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
2650 DBG("[notification-thread] Send command reply (%i)", (int) status);
2651
2652 /* Enqueue buffer to outgoing queue and flush it. */
2653 ret = lttng_dynamic_buffer_append(
2654 &client->communication.outbound.buffer,
2655 buffer, sizeof(buffer));
2656 if (ret) {
2657 goto error;
2658 }
2659
2660 ret = client_flush_outgoing_queue(client, state);
2661 if (ret) {
2662 goto error;
2663 }
2664
2665 if (client->communication.outbound.buffer.size != 0) {
2666 /* Queue could not be emptied. */
2667 client->communication.outbound.queued_command_reply = true;
2668 }
2669
2670 return 0;
2671 error:
2672 return -1;
2673 }
2674
2675 static
2676 int client_dispatch_message(struct notification_client *client,
2677 struct notification_thread_state *state)
2678 {
2679 int ret = 0;
2680
2681 if (client->communication.inbound.msg_type !=
2682 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE &&
2683 client->communication.inbound.msg_type !=
2684 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN &&
2685 !client->validated) {
2686 WARN("[notification-thread] client attempted a command before handshake");
2687 ret = -1;
2688 goto end;
2689 }
2690
2691 switch (client->communication.inbound.msg_type) {
2692 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN:
2693 {
2694 /*
2695 * Receiving message header. The function will be called again
2696 * once the rest of the message as been received and can be
2697 * interpreted.
2698 */
2699 const struct lttng_notification_channel_message *msg;
2700
2701 assert(sizeof(*msg) ==
2702 client->communication.inbound.buffer.size);
2703 msg = (const struct lttng_notification_channel_message *)
2704 client->communication.inbound.buffer.data;
2705
2706 if (msg->size == 0 || msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
2707 ERR("[notification-thread] Invalid notification channel message: length = %u", msg->size);
2708 ret = -1;
2709 goto end;
2710 }
2711
2712 switch (msg->type) {
2713 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
2714 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
2715 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
2716 break;
2717 default:
2718 ret = -1;
2719 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
2720 goto end;
2721 }
2722
2723 client->communication.inbound.bytes_to_receive = msg->size;
2724 client->communication.inbound.msg_type =
2725 (enum lttng_notification_channel_message_type) msg->type;
2726 ret = lttng_dynamic_buffer_set_size(
2727 &client->communication.inbound.buffer, msg->size);
2728 if (ret) {
2729 goto end;
2730 }
2731 break;
2732 }
2733 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
2734 {
2735 struct lttng_notification_channel_command_handshake *handshake_client;
2736 struct lttng_notification_channel_command_handshake handshake_reply = {
2737 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
2738 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
2739 };
2740 struct lttng_notification_channel_message msg_header = {
2741 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
2742 .size = sizeof(handshake_reply),
2743 };
2744 enum lttng_notification_channel_status status =
2745 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
2746 char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
2747
2748 memcpy(send_buffer, &msg_header, sizeof(msg_header));
2749 memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
2750 sizeof(handshake_reply));
2751
2752 handshake_client =
2753 (struct lttng_notification_channel_command_handshake *)
2754 client->communication.inbound.buffer.data;
2755 client->major = handshake_client->major;
2756 client->minor = handshake_client->minor;
2757 if (!client->communication.inbound.creds_received) {
2758 ERR("[notification-thread] No credentials received from client");
2759 ret = -1;
2760 goto end;
2761 }
2762
2763 client->uid = LTTNG_SOCK_GET_UID_CRED(
2764 &client->communication.inbound.creds);
2765 client->gid = LTTNG_SOCK_GET_GID_CRED(
2766 &client->communication.inbound.creds);
2767 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
2768 client->uid, client->gid, (int) client->major,
2769 (int) client->minor);
2770
2771 if (handshake_client->major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
2772 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
2773 }
2774
2775 ret = lttng_dynamic_buffer_append(&client->communication.outbound.buffer,
2776 send_buffer, sizeof(send_buffer));
2777 if (ret) {
2778 ERR("[notification-thread] Failed to send protocol version to notification channel client");
2779 goto end;
2780 }
2781
2782 ret = client_flush_outgoing_queue(client, state);
2783 if (ret) {
2784 goto end;
2785 }
2786
2787 ret = client_send_command_reply(client, state, status);
2788 if (ret) {
2789 ERR("[notification-thread] Failed to send reply to notification channel client");
2790 goto end;
2791 }
2792
2793 /* Set reception state to receive the next message header. */
2794 ret = client_reset_inbound_state(client);
2795 if (ret) {
2796 ERR("[notification-thread] Failed to reset client communication's inbound state");
2797 goto end;
2798 }
2799 client->validated = true;
2800 break;
2801 }
2802 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
2803 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
2804 {
2805 struct lttng_condition *condition;
2806 enum lttng_notification_channel_status status =
2807 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
2808 const struct lttng_buffer_view condition_view =
2809 lttng_buffer_view_from_dynamic_buffer(
2810 &client->communication.inbound.buffer,
2811 0, -1);
2812 size_t expected_condition_size =
2813 client->communication.inbound.buffer.size;
2814
2815 ret = lttng_condition_create_from_buffer(&condition_view,
2816 &condition);
2817 if (ret != expected_condition_size) {
2818 ERR("[notification-thread] Malformed condition received from client");
2819 goto end;
2820 }
2821
2822 if (client->communication.inbound.msg_type ==
2823 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
2824 ret = notification_thread_client_subscribe(client,
2825 condition, state, &status);
2826 } else {
2827 ret = notification_thread_client_unsubscribe(client,
2828 condition, state, &status);
2829 }
2830 if (ret) {
2831 goto end;
2832 }
2833
2834 ret = client_send_command_reply(client, state, status);
2835 if (ret) {
2836 ERR("[notification-thread] Failed to send reply to notification channel client");
2837 goto end;
2838 }
2839
2840 /* Set reception state to receive the next message header. */
2841 ret = client_reset_inbound_state(client);
2842 if (ret) {
2843 ERR("[notification-thread] Failed to reset client communication's inbound state");
2844 goto end;
2845 }
2846 break;
2847 }
2848 default:
2849 abort();
2850 }
2851 end:
2852 return ret;
2853 }
2854
2855 /* Incoming data from client. */
2856 int handle_notification_thread_client_in(
2857 struct notification_thread_state *state, int socket)
2858 {
2859 int ret = 0;
2860 struct notification_client *client;
2861 ssize_t recv_ret;
2862 size_t offset;
2863
2864 client = get_client_from_socket(socket, state);
2865 if (!client) {
2866 /* Internal error, abort. */
2867 ret = -1;
2868 goto end;
2869 }
2870
2871 offset = client->communication.inbound.buffer.size -
2872 client->communication.inbound.bytes_to_receive;
2873 if (client->communication.inbound.expect_creds) {
2874 recv_ret = lttcomm_recv_creds_unix_sock(socket,
2875 client->communication.inbound.buffer.data + offset,
2876 client->communication.inbound.bytes_to_receive,
2877 &client->communication.inbound.creds);
2878 if (recv_ret > 0) {
2879 client->communication.inbound.expect_creds = false;
2880 client->communication.inbound.creds_received = true;
2881 }
2882 } else {
2883 recv_ret = lttcomm_recv_unix_sock_non_block(socket,
2884 client->communication.inbound.buffer.data + offset,
2885 client->communication.inbound.bytes_to_receive);
2886 }
2887 if (recv_ret < 0) {
2888 goto error_disconnect_client;
2889 }
2890
2891 client->communication.inbound.bytes_to_receive -= recv_ret;
2892 if (client->communication.inbound.bytes_to_receive == 0) {
2893 ret = client_dispatch_message(client, state);
2894 if (ret) {
2895 /*
2896 * Only returns an error if this client must be
2897 * disconnected.
2898 */
2899 goto error_disconnect_client;
2900 }
2901 } else {
2902 goto end;
2903 }
2904 end:
2905 return ret;
2906 error_disconnect_client:
2907 ret = handle_notification_thread_client_disconnect(socket, state);
2908 return ret;
2909 }
2910
2911 /* Client ready to receive outgoing data. */
2912 int handle_notification_thread_client_out(
2913 struct notification_thread_state *state, int socket)
2914 {
2915 int ret;
2916 struct notification_client *client;
2917
2918 client = get_client_from_socket(socket, state);
2919 if (!client) {
2920 /* Internal error, abort. */
2921 ret = -1;
2922 goto end;
2923 }
2924
2925 ret = client_flush_outgoing_queue(client, state);
2926 if (ret) {
2927 goto end;
2928 }
2929 end:
2930 return ret;
2931 }
2932
2933 static
2934 bool evaluate_buffer_usage_condition(const struct lttng_condition *condition,
2935 const struct channel_state_sample *sample,
2936 uint64_t buffer_capacity)
2937 {
2938 bool result = false;
2939 uint64_t threshold;
2940 enum lttng_condition_type condition_type;
2941 const struct lttng_condition_buffer_usage *use_condition = container_of(
2942 condition, struct lttng_condition_buffer_usage,
2943 parent);
2944
2945 if (use_condition->threshold_bytes.set) {
2946 threshold = use_condition->threshold_bytes.value;
2947 } else {
2948 /*
2949 * Threshold was expressed as a ratio.
2950 *
2951 * TODO the threshold (in bytes) of conditions expressed
2952 * as a ratio of total buffer size could be cached to
2953 * forego this double-multiplication or it could be performed
2954 * as fixed-point math.
2955 *
2956 * Note that caching should accomodate the case where the
2957 * condition applies to multiple channels (i.e. don't assume
2958 * that all channels matching my_chann* have the same size...)
2959 */
2960 threshold = (uint64_t) (use_condition->threshold_ratio.value *
2961 (double) buffer_capacity);
2962 }
2963
2964 condition_type = lttng_condition_get_type(condition);
2965 if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
2966 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
2967 threshold, sample->highest_usage);
2968
2969 /*
2970 * The low condition should only be triggered once _all_ of the
2971 * streams in a channel have gone below the "low" threshold.
2972 */
2973 if (sample->highest_usage <= threshold) {
2974 result = true;
2975 }
2976 } else {
2977 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
2978 threshold, sample->highest_usage);
2979
2980 /*
2981 * For high buffer usage scenarios, we want to trigger whenever
2982 * _any_ of the streams has reached the "high" threshold.
2983 */
2984 if (sample->highest_usage >= threshold) {
2985 result = true;
2986 }
2987 }
2988
2989 return result;
2990 }
2991
2992 static
2993 bool evaluate_session_consumed_size_condition(
2994 const struct lttng_condition *condition,
2995 uint64_t session_consumed_size)
2996 {
2997 uint64_t threshold;
2998 const struct lttng_condition_session_consumed_size *size_condition =
2999 container_of(condition,
3000 struct lttng_condition_session_consumed_size,
3001 parent);
3002
3003 threshold = size_condition->consumed_threshold_bytes.value;
3004 DBG("[notification-thread] Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
3005 threshold, session_consumed_size);
3006 return session_consumed_size >= threshold;
3007 }
3008
3009 static
3010 int evaluate_buffer_condition(const struct lttng_condition *condition,
3011 struct lttng_evaluation **evaluation,
3012 const struct notification_thread_state *state,
3013 const struct channel_state_sample *previous_sample,
3014 const struct channel_state_sample *latest_sample,
3015 uint64_t previous_session_consumed_total,
3016 uint64_t latest_session_consumed_total,
3017 struct channel_info *channel_info)
3018 {
3019 int ret = 0;
3020 enum lttng_condition_type condition_type;
3021 const bool previous_sample_available = !!previous_sample;
3022 bool previous_sample_result = false;
3023 bool latest_sample