f7dd90cbe0aec976925dd0215af89d8b81648b10
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.c
1 /*
2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * SPDX-License-Identifier: GPL-2.0-only
5 *
6 */
7
8 #define _LGPL_SOURCE
9 #include <urcu.h>
10 #include <urcu/rculfhash.h>
11
12 #include <common/defaults.h>
13 #include <common/error.h>
14 #include <common/futex.h>
15 #include <common/unix.h>
16 #include <common/dynamic-buffer.h>
17 #include <common/hashtable/utils.h>
18 #include <common/sessiond-comm/sessiond-comm.h>
19 #include <common/macros.h>
20 #include <lttng/condition/condition.h>
21 #include <lttng/action/action-internal.h>
22 #include <lttng/notification/notification-internal.h>
23 #include <lttng/condition/condition-internal.h>
24 #include <lttng/condition/buffer-usage-internal.h>
25 #include <lttng/condition/session-consumed-size-internal.h>
26 #include <lttng/condition/session-rotation-internal.h>
27 #include <lttng/notification/channel-internal.h>
28
29 #include <time.h>
30 #include <unistd.h>
31 #include <assert.h>
32 #include <inttypes.h>
33 #include <fcntl.h>
34
35 #include "notification-thread.h"
36 #include "notification-thread-events.h"
37 #include "notification-thread-commands.h"
38 #include "lttng-sessiond.h"
39 #include "kernel.h"
40
/*
 * Poll event masks (judging by the CLIENT_ prefix, for notification client
 * sockets; their users are outside this excerpt):
 *   - CLIENT_POLL_MASK_IN: input plus error/hang-up events,
 *   - CLIENT_POLL_MASK_IN_OUT: the above plus LPOLLOUT.
 */
#define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
#define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
43
/*
 * Kind of object a condition is bound to, as reported by
 * get_condition_binding_object().
 */
enum lttng_object_type {
	LTTNG_OBJECT_TYPE_UNKNOWN = 0,
	LTTNG_OBJECT_TYPE_NONE = 1,
	LTTNG_OBJECT_TYPE_CHANNEL = 2,
	LTTNG_OBJECT_TYPE_SESSION = 3,
};
50
51 struct lttng_trigger_list_element {
52 /* No ownership of the trigger object is assumed. */
53 const struct lttng_trigger *trigger;
54 struct cds_list_head node;
55 };
56
57 struct lttng_channel_trigger_list {
58 struct channel_key channel_key;
59 /* List of struct lttng_trigger_list_element. */
60 struct cds_list_head list;
61 /* Node in the channel_triggers_ht */
62 struct cds_lfht_node channel_triggers_ht_node;
63 /* call_rcu delayed reclaim. */
64 struct rcu_head rcu_node;
65 };
66
67 /*
68 * List of triggers applying to a given session.
69 *
70 * See:
71 * - lttng_session_trigger_list_create()
72 * - lttng_session_trigger_list_build()
73 * - lttng_session_trigger_list_destroy()
74 * - lttng_session_trigger_list_add()
75 */
76 struct lttng_session_trigger_list {
77 /*
78 * Not owned by this; points to the session_info structure's
79 * session name.
80 */
81 const char *session_name;
82 /* List of struct lttng_trigger_list_element. */
83 struct cds_list_head list;
84 /* Node in the session_triggers_ht */
85 struct cds_lfht_node session_triggers_ht_node;
86 /*
87 * Weak reference to the notification system's session triggers
88 * hashtable.
89 *
90 * The session trigger list structure structure is owned by
91 * the session's session_info.
92 *
93 * The session_info is kept alive the the channel_infos holding a
94 * reference to it (reference counting). When those channels are
95 * destroyed (at runtime or on teardown), the reference they hold
96 * to the session_info are released. On destruction of session_info,
97 * session_info_destroy() will remove the list of triggers applying
98 * to this session from the notification system's state.
99 *
100 * This implies that the session_triggers_ht must be destroyed
101 * after the channels.
102 */
103 struct cds_lfht *session_triggers_ht;
104 /* Used for delayed RCU reclaim. */
105 struct rcu_head rcu_node;
106 };
107
108 struct lttng_trigger_ht_element {
109 struct lttng_trigger *trigger;
110 struct cds_lfht_node node;
111 /* call_rcu delayed reclaim. */
112 struct rcu_head rcu_node;
113 };
114
115 struct lttng_condition_list_element {
116 struct lttng_condition *condition;
117 struct cds_list_head node;
118 };
119
120 struct notification_client_list_element {
121 struct notification_client *client;
122 struct cds_list_head node;
123 };
124
125 struct notification_client_list {
126 const struct lttng_trigger *trigger;
127 struct cds_list_head list;
128 struct cds_lfht_node notification_trigger_ht_node;
129 /* call_rcu delayed reclaim. */
130 struct rcu_head rcu_node;
131 };
132
133 struct notification_client {
134 int socket;
135 /* Client protocol version. */
136 uint8_t major, minor;
137 uid_t uid;
138 gid_t gid;
139 /*
140 * Indicates if the credentials and versions of the client have been
141 * checked.
142 */
143 bool validated;
144 /*
145 * Conditions to which the client's notification channel is subscribed.
146 * List of struct lttng_condition_list_node. The condition member is
147 * owned by the client.
148 */
149 struct cds_list_head condition_list;
150 struct cds_lfht_node client_socket_ht_node;
151 struct {
152 struct {
153 /*
154 * During the reception of a message, the reception
155 * buffers' "size" is set to contain the current
156 * message's complete payload.
157 */
158 struct lttng_dynamic_buffer buffer;
159 /* Bytes left to receive for the current message. */
160 size_t bytes_to_receive;
161 /* Type of the message being received. */
162 enum lttng_notification_channel_message_type msg_type;
163 /*
164 * Indicates whether or not credentials are expected
165 * from the client.
166 */
167 bool expect_creds;
168 /*
169 * Indicates whether or not credentials were received
170 * from the client.
171 */
172 bool creds_received;
173 /* Only used during credentials reception. */
174 lttng_sock_cred creds;
175 } inbound;
176 struct {
177 /*
178 * Indicates whether or not a notification addressed to
179 * this client was dropped because a command reply was
180 * already buffered.
181 *
182 * A notification is dropped whenever the buffer is not
183 * empty.
184 */
185 bool dropped_notification;
186 /*
187 * Indicates whether or not a command reply is already
188 * buffered. In this case, it means that the client is
189 * not consuming command replies before emitting a new
190 * one. This could be caused by a protocol error or a
191 * misbehaving/malicious client.
192 */
193 bool queued_command_reply;
194 struct lttng_dynamic_buffer buffer;
195 } outbound;
196 } communication;
197 /* call_rcu delayed reclaim. */
198 struct rcu_head rcu_node;
199 };
200
201 struct channel_state_sample {
202 struct channel_key key;
203 struct cds_lfht_node channel_state_ht_node;
204 uint64_t highest_usage;
205 uint64_t lowest_usage;
206 uint64_t channel_total_consumed;
207 /* call_rcu delayed reclaim. */
208 struct rcu_head rcu_node;
209 };
210
/* Forward declarations of helpers defined further down in this file. */
static unsigned long hash_channel_key(struct channel_key *key);
static int evaluate_buffer_condition(
		const struct lttng_condition *condition,
		struct lttng_evaluation **evaluation,
		const struct notification_thread_state *state,
		const struct channel_state_sample *previous_sample,
		const struct channel_state_sample *latest_sample,
		uint64_t previous_session_consumed_total,
		uint64_t latest_session_consumed_total,
		struct channel_info *channel_info);
static int send_evaluation_to_clients(
		const struct lttng_trigger *trigger,
		const struct lttng_evaluation *evaluation,
		struct notification_client_list *client_list,
		struct notification_thread_state *state,
		uid_t channel_uid, gid_t channel_gid);

/* session_info API */
static void session_info_destroy(void *_data);
static void session_info_get(struct session_info *session_info);
static void session_info_put(struct session_info *session_info);
static struct session_info *session_info_create(
		const char *name,
		uid_t uid, gid_t gid,
		struct lttng_session_trigger_list *trigger_list,
		struct cds_lfht *sessions_ht);
static void session_info_add_channel(
		struct session_info *session_info,
		struct channel_info *channel_info);
static void session_info_remove_channel(
		struct session_info *session_info,
		struct channel_info *channel_info);

/* lttng_session_trigger_list API */
static struct lttng_session_trigger_list *lttng_session_trigger_list_create(
		const char *session_name,
		struct cds_lfht *session_triggers_ht);
static struct lttng_session_trigger_list *lttng_session_trigger_list_build(
		const struct notification_thread_state *state,
		const char *session_name);
static void lttng_session_trigger_list_destroy(
		struct lttng_session_trigger_list *list);
static int lttng_session_trigger_list_add(
		struct lttng_session_trigger_list *list,
		const struct lttng_trigger *trigger);
262
263
264 static
265 int match_client(struct cds_lfht_node *node, const void *key)
266 {
267 /* This double-cast is intended to supress pointer-to-cast warning. */
268 int socket = (int) (intptr_t) key;
269 struct notification_client *client;
270
271 client = caa_container_of(node, struct notification_client,
272 client_socket_ht_node);
273
274 return !!(client->socket == socket);
275 }
276
277 static
278 int match_channel_trigger_list(struct cds_lfht_node *node, const void *key)
279 {
280 struct channel_key *channel_key = (struct channel_key *) key;
281 struct lttng_channel_trigger_list *trigger_list;
282
283 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
284 channel_triggers_ht_node);
285
286 return !!((channel_key->key == trigger_list->channel_key.key) &&
287 (channel_key->domain == trigger_list->channel_key.domain));
288 }
289
290 static
291 int match_session_trigger_list(struct cds_lfht_node *node, const void *key)
292 {
293 const char *session_name = (const char *) key;
294 struct lttng_session_trigger_list *trigger_list;
295
296 trigger_list = caa_container_of(node, struct lttng_session_trigger_list,
297 session_triggers_ht_node);
298
299 return !!(strcmp(trigger_list->session_name, session_name) == 0);
300 }
301
302 static
303 int match_channel_state_sample(struct cds_lfht_node *node, const void *key)
304 {
305 struct channel_key *channel_key = (struct channel_key *) key;
306 struct channel_state_sample *sample;
307
308 sample = caa_container_of(node, struct channel_state_sample,
309 channel_state_ht_node);
310
311 return !!((channel_key->key == sample->key.key) &&
312 (channel_key->domain == sample->key.domain));
313 }
314
315 static
316 int match_channel_info(struct cds_lfht_node *node, const void *key)
317 {
318 struct channel_key *channel_key = (struct channel_key *) key;
319 struct channel_info *channel_info;
320
321 channel_info = caa_container_of(node, struct channel_info,
322 channels_ht_node);
323
324 return !!((channel_key->key == channel_info->key.key) &&
325 (channel_key->domain == channel_info->key.domain));
326 }
327
328 static
329 int match_condition(struct cds_lfht_node *node, const void *key)
330 {
331 struct lttng_condition *condition_key = (struct lttng_condition *) key;
332 struct lttng_trigger_ht_element *trigger;
333 struct lttng_condition *condition;
334
335 trigger = caa_container_of(node, struct lttng_trigger_ht_element,
336 node);
337 condition = lttng_trigger_get_condition(trigger->trigger);
338 assert(condition);
339
340 return !!lttng_condition_is_equal(condition_key, condition);
341 }
342
343 static
344 int match_client_list_condition(struct cds_lfht_node *node, const void *key)
345 {
346 struct lttng_condition *condition_key = (struct lttng_condition *) key;
347 struct notification_client_list *client_list;
348 const struct lttng_condition *condition;
349
350 assert(condition_key);
351
352 client_list = caa_container_of(node, struct notification_client_list,
353 notification_trigger_ht_node);
354 condition = lttng_trigger_get_const_condition(client_list->trigger);
355
356 return !!lttng_condition_is_equal(condition_key, condition);
357 }
358
359 static
360 int match_session(struct cds_lfht_node *node, const void *key)
361 {
362 const char *name = key;
363 struct session_info *session_info = caa_container_of(
364 node, struct session_info, sessions_ht_node);
365
366 return !strcmp(session_info->name, name);
367 }
368
369 static
370 unsigned long lttng_condition_buffer_usage_hash(
371 const struct lttng_condition *_condition)
372 {
373 unsigned long hash;
374 unsigned long condition_type;
375 struct lttng_condition_buffer_usage *condition;
376
377 condition = container_of(_condition,
378 struct lttng_condition_buffer_usage, parent);
379
380 condition_type = (unsigned long) condition->parent.type;
381 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
382 if (condition->session_name) {
383 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
384 }
385 if (condition->channel_name) {
386 hash ^= hash_key_str(condition->channel_name, lttng_ht_seed);
387 }
388 if (condition->domain.set) {
389 hash ^= hash_key_ulong(
390 (void *) condition->domain.type,
391 lttng_ht_seed);
392 }
393 if (condition->threshold_ratio.set) {
394 uint64_t val;
395
396 val = condition->threshold_ratio.value * (double) UINT32_MAX;
397 hash ^= hash_key_u64(&val, lttng_ht_seed);
398 } else if (condition->threshold_bytes.set) {
399 uint64_t val;
400
401 val = condition->threshold_bytes.value;
402 hash ^= hash_key_u64(&val, lttng_ht_seed);
403 }
404 return hash;
405 }
406
407 static
408 unsigned long lttng_condition_session_consumed_size_hash(
409 const struct lttng_condition *_condition)
410 {
411 unsigned long hash;
412 unsigned long condition_type =
413 (unsigned long) LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE;
414 struct lttng_condition_session_consumed_size *condition;
415 uint64_t val;
416
417 condition = container_of(_condition,
418 struct lttng_condition_session_consumed_size, parent);
419
420 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
421 if (condition->session_name) {
422 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
423 }
424 val = condition->consumed_threshold_bytes.value;
425 hash ^= hash_key_u64(&val, lttng_ht_seed);
426 return hash;
427 }
428
429 static
430 unsigned long lttng_condition_session_rotation_hash(
431 const struct lttng_condition *_condition)
432 {
433 unsigned long hash, condition_type;
434 struct lttng_condition_session_rotation *condition;
435
436 condition = container_of(_condition,
437 struct lttng_condition_session_rotation, parent);
438 condition_type = (unsigned long) condition->parent.type;
439 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
440 assert(condition->session_name);
441 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
442 return hash;
443 }
444
445 /*
446 * The lttng_condition hashing code is kept in this file (rather than
447 * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
448 * don't want to link in liblttng-ctl.
449 */
450 static
451 unsigned long lttng_condition_hash(const struct lttng_condition *condition)
452 {
453 switch (condition->type) {
454 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
455 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
456 return lttng_condition_buffer_usage_hash(condition);
457 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
458 return lttng_condition_session_consumed_size_hash(condition);
459 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
460 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
461 return lttng_condition_session_rotation_hash(condition);
462 default:
463 ERR("[notification-thread] Unexpected condition type caught");
464 abort();
465 }
466 }
467
468 static
469 unsigned long hash_channel_key(struct channel_key *key)
470 {
471 unsigned long key_hash = hash_key_u64(&key->key, lttng_ht_seed);
472 unsigned long domain_hash = hash_key_ulong(
473 (void *) (unsigned long) key->domain, lttng_ht_seed);
474
475 return key_hash ^ domain_hash;
476 }
477
478 /*
479 * Get the type of object to which a given condition applies. Bindings let
480 * the notification system evaluate a trigger's condition when a given
481 * object's state is updated.
482 *
483 * For instance, a condition bound to a channel will be evaluated everytime
484 * the channel's state is changed by a channel monitoring sample.
485 */
486 static
487 enum lttng_object_type get_condition_binding_object(
488 const struct lttng_condition *condition)
489 {
490 switch (lttng_condition_get_type(condition)) {
491 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
492 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
493 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
494 return LTTNG_OBJECT_TYPE_CHANNEL;
495 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
496 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
497 return LTTNG_OBJECT_TYPE_SESSION;
498 default:
499 return LTTNG_OBJECT_TYPE_UNKNOWN;
500 }
501 }
502
503 static
504 void free_channel_info_rcu(struct rcu_head *node)
505 {
506 free(caa_container_of(node, struct channel_info, rcu_node));
507 }
508
509 static
510 void channel_info_destroy(struct channel_info *channel_info)
511 {
512 if (!channel_info) {
513 return;
514 }
515
516 if (channel_info->session_info) {
517 session_info_remove_channel(channel_info->session_info,
518 channel_info);
519 session_info_put(channel_info->session_info);
520 }
521 if (channel_info->name) {
522 free(channel_info->name);
523 }
524 call_rcu(&channel_info->rcu_node, free_channel_info_rcu);
525 }
526
527 static
528 void free_session_info_rcu(struct rcu_head *node)
529 {
530 free(caa_container_of(node, struct session_info, rcu_node));
531 }
532
533 /* Don't call directly, use the ref-counting mechanism. */
534 static
535 void session_info_destroy(void *_data)
536 {
537 struct session_info *session_info = _data;
538 int ret;
539
540 assert(session_info);
541 if (session_info->channel_infos_ht) {
542 ret = cds_lfht_destroy(session_info->channel_infos_ht, NULL);
543 if (ret) {
544 ERR("[notification-thread] Failed to destroy channel information hash table");
545 }
546 }
547 lttng_session_trigger_list_destroy(session_info->trigger_list);
548
549 rcu_read_lock();
550 cds_lfht_del(session_info->sessions_ht,
551 &session_info->sessions_ht_node);
552 rcu_read_unlock();
553 free(session_info->name);
554 call_rcu(&session_info->rcu_node, free_session_info_rcu);
555 }
556
557 static
558 void session_info_get(struct session_info *session_info)
559 {
560 if (!session_info) {
561 return;
562 }
563 lttng_ref_get(&session_info->ref);
564 }
565
566 static
567 void session_info_put(struct session_info *session_info)
568 {
569 if (!session_info) {
570 return;
571 }
572 lttng_ref_put(&session_info->ref);
573 }
574
575 static
576 struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid,
577 struct lttng_session_trigger_list *trigger_list,
578 struct cds_lfht *sessions_ht)
579 {
580 struct session_info *session_info;
581
582 assert(name);
583
584 session_info = zmalloc(sizeof(*session_info));
585 if (!session_info) {
586 goto end;
587 }
588 lttng_ref_init(&session_info->ref, session_info_destroy);
589
590 session_info->channel_infos_ht = cds_lfht_new(DEFAULT_HT_SIZE,
591 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
592 if (!session_info->channel_infos_ht) {
593 goto error;
594 }
595
596 cds_lfht_node_init(&session_info->sessions_ht_node);
597 session_info->name = strdup(name);
598 if (!session_info->name) {
599 goto error;
600 }
601 session_info->uid = uid;
602 session_info->gid = gid;
603 session_info->trigger_list = trigger_list;
604 session_info->sessions_ht = sessions_ht;
605 end:
606 return session_info;
607 error:
608 session_info_put(session_info);
609 return NULL;
610 }
611
612 static
613 void session_info_add_channel(struct session_info *session_info,
614 struct channel_info *channel_info)
615 {
616 rcu_read_lock();
617 cds_lfht_add(session_info->channel_infos_ht,
618 hash_channel_key(&channel_info->key),
619 &channel_info->session_info_channels_ht_node);
620 rcu_read_unlock();
621 }
622
623 static
624 void session_info_remove_channel(struct session_info *session_info,
625 struct channel_info *channel_info)
626 {
627 rcu_read_lock();
628 cds_lfht_del(session_info->channel_infos_ht,
629 &channel_info->session_info_channels_ht_node);
630 rcu_read_unlock();
631 }
632
633 static
634 struct channel_info *channel_info_create(const char *channel_name,
635 struct channel_key *channel_key, uint64_t channel_capacity,
636 struct session_info *session_info)
637 {
638 struct channel_info *channel_info = zmalloc(sizeof(*channel_info));
639
640 if (!channel_info) {
641 goto end;
642 }
643
644 cds_lfht_node_init(&channel_info->channels_ht_node);
645 cds_lfht_node_init(&channel_info->session_info_channels_ht_node);
646 memcpy(&channel_info->key, channel_key, sizeof(*channel_key));
647 channel_info->capacity = channel_capacity;
648
649 channel_info->name = strdup(channel_name);
650 if (!channel_info->name) {
651 goto error;
652 }
653
654 /*
655 * Set the references between session and channel infos:
656 * - channel_info holds a strong reference to session_info
657 * - session_info holds a weak reference to channel_info
658 */
659 session_info_get(session_info);
660 session_info_add_channel(session_info, channel_info);
661 channel_info->session_info = session_info;
662 end:
663 return channel_info;
664 error:
665 channel_info_destroy(channel_info);
666 return NULL;
667 }
668
669 /* RCU read lock must be held by the caller. */
670 static
671 struct notification_client_list *get_client_list_from_condition(
672 struct notification_thread_state *state,
673 const struct lttng_condition *condition)
674 {
675 struct cds_lfht_node *node;
676 struct cds_lfht_iter iter;
677
678 cds_lfht_lookup(state->notification_trigger_clients_ht,
679 lttng_condition_hash(condition),
680 match_client_list_condition,
681 condition,
682 &iter);
683 node = cds_lfht_iter_get_node(&iter);
684
685 return node ? caa_container_of(node,
686 struct notification_client_list,
687 notification_trigger_ht_node) : NULL;
688 }
689
690 /* This function must be called with the RCU read lock held. */
691 static
692 int evaluate_channel_condition_for_client(
693 const struct lttng_condition *condition,
694 struct notification_thread_state *state,
695 struct lttng_evaluation **evaluation,
696 uid_t *session_uid, gid_t *session_gid)
697 {
698 int ret;
699 struct cds_lfht_iter iter;
700 struct cds_lfht_node *node;
701 struct channel_info *channel_info = NULL;
702 struct channel_key *channel_key = NULL;
703 struct channel_state_sample *last_sample = NULL;
704 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
705
706 /* Find the channel associated with the condition. */
707 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter,
708 channel_trigger_list, channel_triggers_ht_node) {
709 struct lttng_trigger_list_element *element;
710
711 cds_list_for_each_entry(element, &channel_trigger_list->list, node) {
712 const struct lttng_condition *current_condition =
713 lttng_trigger_get_const_condition(
714 element->trigger);
715
716 assert(current_condition);
717 if (!lttng_condition_is_equal(condition,
718 current_condition)) {
719 continue;
720 }
721
722 /* Found the trigger, save the channel key. */
723 channel_key = &channel_trigger_list->channel_key;
724 break;
725 }
726 if (channel_key) {
727 /* The channel key was found stop iteration. */
728 break;
729 }
730 }
731
732 if (!channel_key){
733 /* No channel found; normal exit. */
734 DBG("[notification-thread] No known channel associated with newly subscribed-to condition");
735 ret = 0;
736 goto end;
737 }
738
739 /* Fetch channel info for the matching channel. */
740 cds_lfht_lookup(state->channels_ht,
741 hash_channel_key(channel_key),
742 match_channel_info,
743 channel_key,
744 &iter);
745 node = cds_lfht_iter_get_node(&iter);
746 assert(node);
747 channel_info = caa_container_of(node, struct channel_info,
748 channels_ht_node);
749
750 /* Retrieve the channel's last sample, if it exists. */
751 cds_lfht_lookup(state->channel_state_ht,
752 hash_channel_key(channel_key),
753 match_channel_state_sample,
754 channel_key,
755 &iter);
756 node = cds_lfht_iter_get_node(&iter);
757 if (node) {
758 last_sample = caa_container_of(node,
759 struct channel_state_sample,
760 channel_state_ht_node);
761 } else {
762 /* Nothing to evaluate, no sample was ever taken. Normal exit */
763 DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
764 ret = 0;
765 goto end;
766 }
767
768 ret = evaluate_buffer_condition(condition, evaluation, state,
769 NULL, last_sample,
770 0, channel_info->session_info->consumed_data_size,
771 channel_info);
772 if (ret) {
773 WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
774 goto end;
775 }
776
777 *session_uid = channel_info->session_info->uid;
778 *session_gid = channel_info->session_info->gid;
779 end:
780 return ret;
781 }
782
783 static
784 const char *get_condition_session_name(const struct lttng_condition *condition)
785 {
786 const char *session_name = NULL;
787 enum lttng_condition_status status;
788
789 switch (lttng_condition_get_type(condition)) {
790 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
791 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
792 status = lttng_condition_buffer_usage_get_session_name(
793 condition, &session_name);
794 break;
795 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
796 status = lttng_condition_session_consumed_size_get_session_name(
797 condition, &session_name);
798 break;
799 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
800 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
801 status = lttng_condition_session_rotation_get_session_name(
802 condition, &session_name);
803 break;
804 default:
805 abort();
806 }
807 if (status != LTTNG_CONDITION_STATUS_OK) {
808 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
809 goto end;
810 }
811 end:
812 return session_name;
813 }
814
815 /* This function must be called with the RCU read lock held. */
816 static
817 int evaluate_session_condition_for_client(
818 const struct lttng_condition *condition,
819 struct notification_thread_state *state,
820 struct lttng_evaluation **evaluation,
821 uid_t *session_uid, gid_t *session_gid)
822 {
823 int ret;
824 struct cds_lfht_iter iter;
825 struct cds_lfht_node *node;
826 const char *session_name;
827 struct session_info *session_info = NULL;
828
829 session_name = get_condition_session_name(condition);
830
831 /* Find the session associated with the trigger. */
832 cds_lfht_lookup(state->sessions_ht,
833 hash_key_str(session_name, lttng_ht_seed),
834 match_session,
835 session_name,
836 &iter);
837 node = cds_lfht_iter_get_node(&iter);
838 if (!node) {
839 DBG("[notification-thread] No known session matching name \"%s\"",
840 session_name);
841 ret = 0;
842 goto end;
843 }
844
845 session_info = caa_container_of(node, struct session_info,
846 sessions_ht_node);
847 session_info_get(session_info);
848
849 /*
850 * Evaluation is performed in-line here since only one type of
851 * session-bound condition is handled for the moment.
852 */
853 switch (lttng_condition_get_type(condition)) {
854 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
855 if (!session_info->rotation.ongoing) {
856 ret = 0;
857 goto end_session_put;
858 }
859
860 *evaluation = lttng_evaluation_session_rotation_ongoing_create(
861 session_info->rotation.id);
862 if (!*evaluation) {
863 /* Fatal error. */
864 ERR("[notification-thread] Failed to create session rotation ongoing evaluation for session \"%s\"",
865 session_info->name);
866 ret = -1;
867 goto end_session_put;
868 }
869 ret = 0;
870 break;
871 default:
872 ret = 0;
873 goto end_session_put;
874 }
875
876 *session_uid = session_info->uid;
877 *session_gid = session_info->gid;
878
879 end_session_put:
880 session_info_put(session_info);
881 end:
882 return ret;
883 }
884
885 /* This function must be called with the RCU read lock held. */
886 static
887 int evaluate_condition_for_client(const struct lttng_trigger *trigger,
888 const struct lttng_condition *condition,
889 struct notification_client *client,
890 struct notification_thread_state *state)
891 {
892 int ret;
893 struct lttng_evaluation *evaluation = NULL;
894 struct notification_client_list client_list = { 0 };
895 struct notification_client_list_element client_list_element = { 0 };
896 uid_t object_uid = 0;
897 gid_t object_gid = 0;
898
899 assert(trigger);
900 assert(condition);
901 assert(client);
902 assert(state);
903
904 switch (get_condition_binding_object(condition)) {
905 case LTTNG_OBJECT_TYPE_SESSION:
906 ret = evaluate_session_condition_for_client(condition, state,
907 &evaluation, &object_uid, &object_gid);
908 break;
909 case LTTNG_OBJECT_TYPE_CHANNEL:
910 ret = evaluate_channel_condition_for_client(condition, state,
911 &evaluation, &object_uid, &object_gid);
912 break;
913 case LTTNG_OBJECT_TYPE_NONE:
914 ret = 0;
915 goto end;
916 case LTTNG_OBJECT_TYPE_UNKNOWN:
917 default:
918 ret = -1;
919 goto end;
920 }
921 if (ret) {
922 /* Fatal error. */
923 goto end;
924 }
925 if (!evaluation) {
926 /* Evaluation yielded nothing. Normal exit. */
927 DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
928 ret = 0;
929 goto end;
930 }
931
932 /*
933 * Create a temporary client list with the client currently
934 * subscribing.
935 */
936 cds_lfht_node_init(&client_list.notification_trigger_ht_node);
937 CDS_INIT_LIST_HEAD(&client_list.list);
938 client_list.trigger = trigger;
939
940 CDS_INIT_LIST_HEAD(&client_list_element.node);
941 client_list_element.client = client;
942 cds_list_add(&client_list_element.node, &client_list.list);
943
944 /* Send evaluation result to the newly-subscribed client. */
945 DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
946 ret = send_evaluation_to_clients(trigger, evaluation, &client_list,
947 state, object_uid, object_gid);
948
949 end:
950 return ret;
951 }
952
953 static
954 int notification_thread_client_subscribe(struct notification_client *client,
955 struct lttng_condition *condition,
956 struct notification_thread_state *state,
957 enum lttng_notification_channel_status *_status)
958 {
959 int ret = 0;
960 struct notification_client_list *client_list;
961 struct lttng_condition_list_element *condition_list_element = NULL;
962 struct notification_client_list_element *client_list_element = NULL;
963 enum lttng_notification_channel_status status =
964 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
965
966 /*
967 * Ensure that the client has not already subscribed to this condition
968 * before.
969 */
970 cds_list_for_each_entry(condition_list_element, &client->condition_list, node) {
971 if (lttng_condition_is_equal(condition_list_element->condition,
972 condition)) {
973 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED;
974 goto end;
975 }
976 }
977
978 condition_list_element = zmalloc(sizeof(*condition_list_element));
979 if (!condition_list_element) {
980 ret = -1;
981 goto error;
982 }
983 client_list_element = zmalloc(sizeof(*client_list_element));
984 if (!client_list_element) {
985 ret = -1;
986 goto error;
987 }
988
989 rcu_read_lock();
990
991 /*
992 * Add the newly-subscribed condition to the client's subscription list.
993 */
994 CDS_INIT_LIST_HEAD(&condition_list_element->node);
995 condition_list_element->condition = condition;
996 cds_list_add(&condition_list_element->node, &client->condition_list);
997
998 client_list = get_client_list_from_condition(state, condition);
999 if (!client_list) {
1000 /*
1001 * No notification-emiting trigger registered with this
1002 * condition. We don't evaluate the condition right away
1003 * since this trigger is not registered yet.
1004 */
1005 free(client_list_element);
1006 goto end_unlock;
1007 }
1008
1009 /*
1010 * The condition to which the client just subscribed is evaluated
1011 * at this point so that conditions that are already TRUE result
1012 * in a notification being sent out.
1013 */
1014 if (evaluate_condition_for_client(client_list->trigger, condition,
1015 client, state)) {
1016 WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
1017 ret = -1;
1018 free(client_list_element);
1019 goto end_unlock;
1020 }
1021
1022 /*
1023 * Add the client to the list of clients interested in a given trigger
1024 * if a "notification" trigger with a corresponding condition was
1025 * added prior.
1026 */
1027 client_list_element->client = client;
1028 CDS_INIT_LIST_HEAD(&client_list_element->node);
1029 cds_list_add(&client_list_element->node, &client_list->list);
1030 end_unlock:
1031 rcu_read_unlock();
1032 end:
1033 if (_status) {
1034 *_status = status;
1035 }
1036 return ret;
1037 error:
1038 free(condition_list_element);
1039 free(client_list_element);
1040 return ret;
1041 }
1042
1043 static
1044 int notification_thread_client_unsubscribe(
1045 struct notification_client *client,
1046 struct lttng_condition *condition,
1047 struct notification_thread_state *state,
1048 enum lttng_notification_channel_status *_status)
1049 {
1050 struct notification_client_list *client_list;
1051 struct lttng_condition_list_element *condition_list_element,
1052 *condition_tmp;
1053 struct notification_client_list_element *client_list_element,
1054 *client_tmp;
1055 bool condition_found = false;
1056 enum lttng_notification_channel_status status =
1057 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1058
1059 /* Remove the condition from the client's condition list. */
1060 cds_list_for_each_entry_safe(condition_list_element, condition_tmp,
1061 &client->condition_list, node) {
1062 if (!lttng_condition_is_equal(condition_list_element->condition,
1063 condition)) {
1064 continue;
1065 }
1066
1067 cds_list_del(&condition_list_element->node);
1068 /*
1069 * The caller may be iterating on the client's conditions to
1070 * tear down a client's connection. In this case, the condition
1071 * will be destroyed at the end.
1072 */
1073 if (condition != condition_list_element->condition) {
1074 lttng_condition_destroy(
1075 condition_list_element->condition);
1076 }
1077 free(condition_list_element);
1078 condition_found = true;
1079 break;
1080 }
1081
1082 if (!condition_found) {
1083 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION;
1084 goto end;
1085 }
1086
1087 /*
1088 * Remove the client from the list of clients interested the trigger
1089 * matching the condition.
1090 */
1091 rcu_read_lock();
1092 client_list = get_client_list_from_condition(state, condition);
1093 if (!client_list) {
1094 goto end_unlock;
1095 }
1096
1097 cds_list_for_each_entry_safe(client_list_element, client_tmp,
1098 &client_list->list, node) {
1099 if (client_list_element->client->socket != client->socket) {
1100 continue;
1101 }
1102 cds_list_del(&client_list_element->node);
1103 free(client_list_element);
1104 break;
1105 }
1106 end_unlock:
1107 rcu_read_unlock();
1108 end:
1109 lttng_condition_destroy(condition);
1110 if (_status) {
1111 *_status = status;
1112 }
1113 return 0;
1114 }
1115
1116 static
1117 void free_notification_client_rcu(struct rcu_head *node)
1118 {
1119 free(caa_container_of(node, struct notification_client, rcu_node));
1120 }
1121
1122 static
1123 void notification_client_destroy(struct notification_client *client,
1124 struct notification_thread_state *state)
1125 {
1126 struct lttng_condition_list_element *condition_list_element, *tmp;
1127
1128 if (!client) {
1129 return;
1130 }
1131
1132 /* Release all conditions to which the client was subscribed. */
1133 cds_list_for_each_entry_safe(condition_list_element, tmp,
1134 &client->condition_list, node) {
1135 (void) notification_thread_client_unsubscribe(client,
1136 condition_list_element->condition, state, NULL);
1137 }
1138
1139 if (client->socket >= 0) {
1140 (void) lttcomm_close_unix_sock(client->socket);
1141 }
1142 lttng_dynamic_buffer_reset(&client->communication.inbound.buffer);
1143 lttng_dynamic_buffer_reset(&client->communication.outbound.buffer);
1144 call_rcu(&client->rcu_node, free_notification_client_rcu);
1145 }
1146
1147 /*
1148 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1149 * client pointer).
1150 */
1151 static
1152 struct notification_client *get_client_from_socket(int socket,
1153 struct notification_thread_state *state)
1154 {
1155 struct cds_lfht_iter iter;
1156 struct cds_lfht_node *node;
1157 struct notification_client *client = NULL;
1158
1159 cds_lfht_lookup(state->client_socket_ht,
1160 hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed),
1161 match_client,
1162 (void *) (unsigned long) socket,
1163 &iter);
1164 node = cds_lfht_iter_get_node(&iter);
1165 if (!node) {
1166 goto end;
1167 }
1168
1169 client = caa_container_of(node, struct notification_client,
1170 client_socket_ht_node);
1171 end:
1172 return client;
1173 }
1174
1175 static
1176 bool buffer_usage_condition_applies_to_channel(
1177 const struct lttng_condition *condition,
1178 const struct channel_info *channel_info)
1179 {
1180 enum lttng_condition_status status;
1181 enum lttng_domain_type condition_domain;
1182 const char *condition_session_name = NULL;
1183 const char *condition_channel_name = NULL;
1184
1185 status = lttng_condition_buffer_usage_get_domain_type(condition,
1186 &condition_domain);
1187 assert(status == LTTNG_CONDITION_STATUS_OK);
1188 if (channel_info->key.domain != condition_domain) {
1189 goto fail;
1190 }
1191
1192 status = lttng_condition_buffer_usage_get_session_name(
1193 condition, &condition_session_name);
1194 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1195
1196 status = lttng_condition_buffer_usage_get_channel_name(
1197 condition, &condition_channel_name);
1198 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_channel_name);
1199
1200 if (strcmp(channel_info->session_info->name, condition_session_name)) {
1201 goto fail;
1202 }
1203 if (strcmp(channel_info->name, condition_channel_name)) {
1204 goto fail;
1205 }
1206
1207 return true;
1208 fail:
1209 return false;
1210 }
1211
1212 static
1213 bool session_consumed_size_condition_applies_to_channel(
1214 const struct lttng_condition *condition,
1215 const struct channel_info *channel_info)
1216 {
1217 enum lttng_condition_status status;
1218 const char *condition_session_name = NULL;
1219
1220 status = lttng_condition_session_consumed_size_get_session_name(
1221 condition, &condition_session_name);
1222 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1223
1224 if (strcmp(channel_info->session_info->name, condition_session_name)) {
1225 goto fail;
1226 }
1227
1228 return true;
1229 fail:
1230 return false;
1231 }
1232
1233 static
1234 bool trigger_applies_to_channel(const struct lttng_trigger *trigger,
1235 const struct channel_info *channel_info)
1236 {
1237 const struct lttng_condition *condition;
1238 bool trigger_applies;
1239
1240 condition = lttng_trigger_get_const_condition(trigger);
1241 if (!condition) {
1242 goto fail;
1243 }
1244
1245 switch (lttng_condition_get_type(condition)) {
1246 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1247 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1248 trigger_applies = buffer_usage_condition_applies_to_channel(
1249 condition, channel_info);
1250 break;
1251 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
1252 trigger_applies = session_consumed_size_condition_applies_to_channel(
1253 condition, channel_info);
1254 break;
1255 default:
1256 goto fail;
1257 }
1258
1259 return trigger_applies;
1260 fail:
1261 return false;
1262 }
1263
1264 static
1265 bool trigger_applies_to_client(struct lttng_trigger *trigger,
1266 struct notification_client *client)
1267 {
1268 bool applies = false;
1269 struct lttng_condition_list_element *condition_list_element;
1270
1271 cds_list_for_each_entry(condition_list_element, &client->condition_list,
1272 node) {
1273 applies = lttng_condition_is_equal(
1274 condition_list_element->condition,
1275 lttng_trigger_get_condition(trigger));
1276 if (applies) {
1277 break;
1278 }
1279 }
1280 return applies;
1281 }
1282
1283 /* Must be called with RCU read lock held. */
1284 static
1285 struct lttng_session_trigger_list *get_session_trigger_list(
1286 struct notification_thread_state *state,
1287 const char *session_name)
1288 {
1289 struct lttng_session_trigger_list *list = NULL;
1290 struct cds_lfht_node *node;
1291 struct cds_lfht_iter iter;
1292
1293 cds_lfht_lookup(state->session_triggers_ht,
1294 hash_key_str(session_name, lttng_ht_seed),
1295 match_session_trigger_list,
1296 session_name,
1297 &iter);
1298 node = cds_lfht_iter_get_node(&iter);
1299 if (!node) {
1300 /*
1301 * Not an error, the list of triggers applying to that session
1302 * will be initialized when the session is created.
1303 */
1304 DBG("[notification-thread] No trigger list found for session \"%s\" as it is not yet known to the notification system",
1305 session_name);
1306 goto end;
1307 }
1308
1309 list = caa_container_of(node,
1310 struct lttng_session_trigger_list,
1311 session_triggers_ht_node);
1312 end:
1313 return list;
1314 }
1315
1316 /*
1317 * Allocate an empty lttng_session_trigger_list for the session named
1318 * 'session_name'.
1319 *
1320 * No ownership of 'session_name' is assumed by the session trigger list.
1321 * It is the caller's responsability to ensure the session name is alive
1322 * for as long as this list is.
1323 */
1324 static
1325 struct lttng_session_trigger_list *lttng_session_trigger_list_create(
1326 const char *session_name,
1327 struct cds_lfht *session_triggers_ht)
1328 {
1329 struct lttng_session_trigger_list *list;
1330
1331 list = zmalloc(sizeof(*list));
1332 if (!list) {
1333 goto end;
1334 }
1335 list->session_name = session_name;
1336 CDS_INIT_LIST_HEAD(&list->list);
1337 cds_lfht_node_init(&list->session_triggers_ht_node);
1338 list->session_triggers_ht = session_triggers_ht;
1339
1340 rcu_read_lock();
1341 /* Publish the list through the session_triggers_ht. */
1342 cds_lfht_add(session_triggers_ht,
1343 hash_key_str(session_name, lttng_ht_seed),
1344 &list->session_triggers_ht_node);
1345 rcu_read_unlock();
1346 end:
1347 return list;
1348 }
1349
1350 static
1351 void free_session_trigger_list_rcu(struct rcu_head *node)
1352 {
1353 free(caa_container_of(node, struct lttng_session_trigger_list,
1354 rcu_node));
1355 }
1356
1357 static
1358 void lttng_session_trigger_list_destroy(struct lttng_session_trigger_list *list)
1359 {
1360 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1361
1362 /* Empty the list element by element, and then free the list itself. */
1363 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1364 &list->list, node) {
1365 cds_list_del(&trigger_list_element->node);
1366 free(trigger_list_element);
1367 }
1368 rcu_read_lock();
1369 /* Unpublish the list from the session_triggers_ht. */
1370 cds_lfht_del(list->session_triggers_ht,
1371 &list->session_triggers_ht_node);
1372 rcu_read_unlock();
1373 call_rcu(&list->rcu_node, free_session_trigger_list_rcu);
1374 }
1375
1376 static
1377 int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
1378 const struct lttng_trigger *trigger)
1379 {
1380 int ret = 0;
1381 struct lttng_trigger_list_element *new_element =
1382 zmalloc(sizeof(*new_element));
1383
1384 if (!new_element) {
1385 ret = -1;
1386 goto end;
1387 }
1388 CDS_INIT_LIST_HEAD(&new_element->node);
1389 new_element->trigger = trigger;
1390 cds_list_add(&new_element->node, &list->list);
1391 end:
1392 return ret;
1393 }
1394
1395 static
1396 bool trigger_applies_to_session(const struct lttng_trigger *trigger,
1397 const char *session_name)
1398 {
1399 bool applies = false;
1400 const struct lttng_condition *condition;
1401
1402 condition = lttng_trigger_get_const_condition(trigger);
1403 switch (lttng_condition_get_type(condition)) {
1404 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1405 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
1406 {
1407 enum lttng_condition_status condition_status;
1408 const char *condition_session_name;
1409
1410 condition_status = lttng_condition_session_rotation_get_session_name(
1411 condition, &condition_session_name);
1412 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
1413 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
1414 goto end;
1415 }
1416
1417 assert(condition_session_name);
1418 applies = !strcmp(condition_session_name, session_name);
1419 break;
1420 }
1421 default:
1422 goto end;
1423 }
1424 end:
1425 return applies;
1426 }
1427
1428 /*
1429 * Allocate and initialize an lttng_session_trigger_list which contains
1430 * all triggers that apply to the session named 'session_name'.
1431 *
1432 * No ownership of 'session_name' is assumed by the session trigger list.
1433 * It is the caller's responsability to ensure the session name is alive
1434 * for as long as this list is.
1435 */
1436 static
1437 struct lttng_session_trigger_list *lttng_session_trigger_list_build(
1438 const struct notification_thread_state *state,
1439 const char *session_name)
1440 {
1441 int trigger_count = 0;
1442 struct lttng_session_trigger_list *session_trigger_list = NULL;
1443 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1444 struct cds_lfht_iter iter;
1445
1446 session_trigger_list = lttng_session_trigger_list_create(session_name,
1447 state->session_triggers_ht);
1448
1449 /* Add all triggers applying to the session named 'session_name'. */
1450 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1451 node) {
1452 int ret;
1453
1454 if (!trigger_applies_to_session(trigger_ht_element->trigger,
1455 session_name)) {
1456 continue;
1457 }
1458
1459 ret = lttng_session_trigger_list_add(session_trigger_list,
1460 trigger_ht_element->trigger);
1461 if (ret) {
1462 goto error;
1463 }
1464
1465 trigger_count++;
1466 }
1467
1468 DBG("[notification-thread] Found %i triggers that apply to newly created session",
1469 trigger_count);
1470 return session_trigger_list;
1471 error:
1472 lttng_session_trigger_list_destroy(session_trigger_list);
1473 return NULL;
1474 }
1475
1476 static
1477 struct session_info *find_or_create_session_info(
1478 struct notification_thread_state *state,
1479 const char *name, uid_t uid, gid_t gid)
1480 {
1481 struct session_info *session = NULL;
1482 struct cds_lfht_node *node;
1483 struct cds_lfht_iter iter;
1484 struct lttng_session_trigger_list *trigger_list;
1485
1486 rcu_read_lock();
1487 cds_lfht_lookup(state->sessions_ht,
1488 hash_key_str(name, lttng_ht_seed),
1489 match_session,
1490 name,
1491 &iter);
1492 node = cds_lfht_iter_get_node(&iter);
1493 if (node) {
1494 DBG("[notification-thread] Found session info of session \"%s\" (uid = %i, gid = %i)",
1495 name, uid, gid);
1496 session = caa_container_of(node, struct session_info,
1497 sessions_ht_node);
1498 assert(session->uid == uid);
1499 assert(session->gid == gid);
1500 session_info_get(session);
1501 goto end;
1502 }
1503
1504 trigger_list = lttng_session_trigger_list_build(state, name);
1505 if (!trigger_list) {
1506 goto error;
1507 }
1508
1509 session = session_info_create(name, uid, gid, trigger_list,
1510 state->sessions_ht);
1511 if (!session) {
1512 ERR("[notification-thread] Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)",
1513 name, uid, gid);
1514 lttng_session_trigger_list_destroy(trigger_list);
1515 goto error;
1516 }
1517 trigger_list = NULL;
1518
1519 cds_lfht_add(state->sessions_ht, hash_key_str(name, lttng_ht_seed),
1520 &session->sessions_ht_node);
1521 end:
1522 rcu_read_unlock();
1523 return session;
1524 error:
1525 rcu_read_unlock();
1526 session_info_put(session);
1527 return NULL;
1528 }
1529
1530 static
1531 int handle_notification_thread_command_add_channel(
1532 struct notification_thread_state *state,
1533 const char *session_name, uid_t session_uid, gid_t session_gid,
1534 const char *channel_name, enum lttng_domain_type channel_domain,
1535 uint64_t channel_key_int, uint64_t channel_capacity,
1536 enum lttng_error_code *cmd_result)
1537 {
1538 struct cds_list_head trigger_list;
1539 struct channel_info *new_channel_info = NULL;
1540 struct channel_key channel_key = {
1541 .key = channel_key_int,
1542 .domain = channel_domain,
1543 };
1544 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
1545 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1546 int trigger_count = 0;
1547 struct cds_lfht_iter iter;
1548 struct session_info *session_info = NULL;
1549
1550 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
1551 channel_name, session_name, channel_key_int,
1552 channel_domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
1553
1554 CDS_INIT_LIST_HEAD(&trigger_list);
1555
1556 session_info = find_or_create_session_info(state, session_name,
1557 session_uid, session_gid);
1558 if (!session_info) {
1559 /* Allocation error or an internal error occurred. */
1560 goto error;
1561 }
1562
1563 new_channel_info = channel_info_create(channel_name, &channel_key,
1564 channel_capacity, session_info);
1565 if (!new_channel_info) {
1566 goto error;
1567 }
1568
1569 rcu_read_lock();
1570 /* Build a list of all triggers applying to the new channel. */
1571 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1572 node) {
1573 struct lttng_trigger_list_element *new_element;
1574
1575 if (!trigger_applies_to_channel(trigger_ht_element->trigger,
1576 new_channel_info)) {
1577 continue;
1578 }
1579
1580 new_element = zmalloc(sizeof(*new_element));
1581 if (!new_element) {
1582 rcu_read_unlock();
1583 goto error;
1584 }
1585 CDS_INIT_LIST_HEAD(&new_element->node);
1586 new_element->trigger = trigger_ht_element->trigger;
1587 cds_list_add(&new_element->node, &trigger_list);
1588 trigger_count++;
1589 }
1590 rcu_read_unlock();
1591
1592 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
1593 trigger_count);
1594 channel_trigger_list = zmalloc(sizeof(*channel_trigger_list));
1595 if (!channel_trigger_list) {
1596 goto error;
1597 }
1598 channel_trigger_list->channel_key = new_channel_info->key;
1599 CDS_INIT_LIST_HEAD(&channel_trigger_list->list);
1600 cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
1601 cds_list_splice(&trigger_list, &channel_trigger_list->list);
1602
1603 rcu_read_lock();
1604 /* Add channel to the channel_ht which owns the channel_infos. */
1605 cds_lfht_add(state->channels_ht,
1606 hash_channel_key(&new_channel_info->key),
1607 &new_channel_info->channels_ht_node);
1608 /*
1609 * Add the list of triggers associated with this channel to the
1610 * channel_triggers_ht.
1611 */
1612 cds_lfht_add(state->channel_triggers_ht,
1613 hash_channel_key(&new_channel_info->key),
1614 &channel_trigger_list->channel_triggers_ht_node);
1615 rcu_read_unlock();
1616 session_info_put(session_info);
1617 *cmd_result = LTTNG_OK;
1618 return 0;
1619 error:
1620 channel_info_destroy(new_channel_info);
1621 session_info_put(session_info);
1622 return 1;
1623 }
1624
1625 static
1626 void free_channel_trigger_list_rcu(struct rcu_head *node)
1627 {
1628 free(caa_container_of(node, struct lttng_channel_trigger_list,
1629 rcu_node));
1630 }
1631
1632 static
1633 void free_channel_state_sample_rcu(struct rcu_head *node)
1634 {
1635 free(caa_container_of(node, struct channel_state_sample,
1636 rcu_node));
1637 }
1638
1639 static
1640 int handle_notification_thread_command_remove_channel(
1641 struct notification_thread_state *state,
1642 uint64_t channel_key, enum lttng_domain_type domain,
1643 enum lttng_error_code *cmd_result)
1644 {
1645 struct cds_lfht_node *node;
1646 struct cds_lfht_iter iter;
1647 struct lttng_channel_trigger_list *trigger_list;
1648 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1649 struct channel_key key = { .key = channel_key, .domain = domain };
1650 struct channel_info *channel_info;
1651
1652 DBG("[notification-thread] Removing channel key = %" PRIu64 " in %s domain",
1653 channel_key, domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
1654
1655 rcu_read_lock();
1656
1657 cds_lfht_lookup(state->channel_triggers_ht,
1658 hash_channel_key(&key),
1659 match_channel_trigger_list,
1660 &key,
1661 &iter);
1662 node = cds_lfht_iter_get_node(&iter);
1663 /*
1664 * There is a severe internal error if we are being asked to remove a
1665 * channel that doesn't exist.
1666 */
1667 if (!node) {
1668 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
1669 goto end;
1670 }
1671
1672 /* Free the list of triggers associated with this channel. */
1673 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
1674 channel_triggers_ht_node);
1675 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1676 &trigger_list->list, node) {
1677 cds_list_del(&trigger_list_element->node);
1678 free(trigger_list_element);
1679 }
1680 cds_lfht_del(state->channel_triggers_ht, node);
1681 call_rcu(&trigger_list->rcu_node, free_channel_trigger_list_rcu);
1682
1683 /* Free sampled channel state. */
1684 cds_lfht_lookup(state->channel_state_ht,
1685 hash_channel_key(&key),
1686 match_channel_state_sample,
1687 &key,
1688 &iter);
1689 node = cds_lfht_iter_get_node(&iter);
1690 /*
1691 * This is expected to be NULL if the channel is destroyed before we
1692 * received a sample.
1693 */
1694 if (node) {
1695 struct channel_state_sample *sample = caa_container_of(node,
1696 struct channel_state_sample,
1697 channel_state_ht_node);
1698
1699 cds_lfht_del(state->channel_state_ht, node);
1700 call_rcu(&sample->rcu_node, free_channel_state_sample_rcu);
1701 }
1702
1703 /* Remove the channel from the channels_ht and free it. */
1704 cds_lfht_lookup(state->channels_ht,
1705 hash_channel_key(&key),
1706 match_channel_info,
1707 &key,
1708 &iter);
1709 node = cds_lfht_iter_get_node(&iter);
1710 assert(node);
1711 channel_info = caa_container_of(node, struct channel_info,
1712 channels_ht_node);
1713 cds_lfht_del(state->channels_ht, node);
1714 channel_info_destroy(channel_info);
1715 end:
1716 rcu_read_unlock();
1717 *cmd_result = LTTNG_OK;
1718 return 0;
1719 }
1720
1721 static
1722 int handle_notification_thread_command_session_rotation(
1723 struct notification_thread_state *state,
1724 enum notification_thread_command_type cmd_type,
1725 const char *session_name, uid_t session_uid, gid_t session_gid,
1726 uint64_t trace_archive_chunk_id,
1727 struct lttng_trace_archive_location *location,
1728 enum lttng_error_code *_cmd_result)
1729 {
1730 int ret = 0;
1731 enum lttng_error_code cmd_result = LTTNG_OK;
1732 struct lttng_session_trigger_list *trigger_list;
1733 struct lttng_trigger_list_element *trigger_list_element;
1734 struct session_info *session_info;
1735
1736 rcu_read_lock();
1737
1738 session_info = find_or_create_session_info(state, session_name,
1739 session_uid, session_gid);
1740 if (!session_info) {
1741 /* Allocation error or an internal error occurred. */
1742 ret = -1;
1743 cmd_result = LTTNG_ERR_NOMEM;
1744 goto end;
1745 }
1746
1747 session_info->rotation.ongoing =
1748 cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING;
1749 session_info->rotation.id = trace_archive_chunk_id;
1750 trigger_list = get_session_trigger_list(state, session_name);
1751 if (!trigger_list) {
1752 DBG("[notification-thread] No triggers applying to session \"%s\" found",
1753 session_name);
1754 goto end;
1755 }
1756
1757 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
1758 node) {
1759 const struct lttng_condition *condition;
1760 const struct lttng_action *action;
1761 const struct lttng_trigger *trigger;
1762 struct notification_client_list *client_list;
1763 struct lttng_evaluation *evaluation = NULL;
1764 enum lttng_condition_type condition_type;
1765
1766 trigger = trigger_list_element->trigger;
1767 condition = lttng_trigger_get_const_condition(trigger);
1768 assert(condition);
1769 condition_type = lttng_condition_get_type(condition);
1770
1771 if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING &&
1772 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1773 continue;
1774 } else if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED &&
1775 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED) {
1776 continue;
1777 }
1778
1779 action = lttng_trigger_get_const_action(trigger);
1780
1781 /* Notify actions are the only type currently supported. */
1782 assert(lttng_action_get_type_const(action) ==
1783 LTTNG_ACTION_TYPE_NOTIFY);
1784
1785 client_list = get_client_list_from_condition(state, condition);
1786 assert(client_list);
1787
1788 if (cds_list_empty(&client_list->list)) {
1789 /*
1790 * No clients interested in the evaluation's result,
1791 * skip it.
1792 */
1793 continue;
1794 }
1795
1796 if (cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1797 evaluation = lttng_evaluation_session_rotation_ongoing_create(
1798 trace_archive_chunk_id);
1799 } else {
1800 evaluation = lttng_evaluation_session_rotation_completed_create(
1801 trace_archive_chunk_id, location);
1802 }
1803
1804 if (!evaluation) {
1805 /* Internal error */
1806 ret = -1;
1807 cmd_result = LTTNG_ERR_UNK;
1808 goto end;
1809 }
1810
1811 /* Dispatch evaluation result to all clients. */
1812 ret = send_evaluation_to_clients(trigger_list_element->trigger,
1813 evaluation, client_list, state,
1814 session_info->uid,
1815 session_info->gid);
1816 lttng_evaluation_destroy(evaluation);
1817 if (caa_unlikely(ret)) {
1818 goto end;
1819 }
1820 }
1821 end:
1822 session_info_put(session_info);
1823 *_cmd_result = cmd_result;
1824 rcu_read_unlock();
1825 return ret;
1826 }
1827
1828 static
1829 int condition_is_supported(struct lttng_condition *condition)
1830 {
1831 int ret;
1832
1833 switch (lttng_condition_get_type(condition)) {
1834 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1835 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1836 {
1837 enum lttng_domain_type domain;
1838
1839 ret = lttng_condition_buffer_usage_get_domain_type(condition,
1840 &domain);
1841 if (ret) {
1842 ret = -1;
1843 goto end;
1844 }
1845
1846 if (domain != LTTNG_DOMAIN_KERNEL) {
1847 ret = 1;
1848 goto end;
1849 }
1850
1851 /*
1852 * Older kernel tracers don't expose the API to monitor their
1853 * buffers. Therefore, we reject triggers that require that
1854 * mechanism to be available to be evaluated.
1855 */
1856 ret = kernel_supports_ring_buffer_snapshot_sample_positions();
1857 break;
1858 }
1859 default:
1860 ret = 1;
1861 }
1862 end:
1863 return ret;
1864 }
1865
1866 /* Must be called with RCU read lock held. */
1867 static
1868 int bind_trigger_to_matching_session(const struct lttng_trigger *trigger,
1869 struct notification_thread_state *state)
1870 {
1871 int ret = 0;
1872 const struct lttng_condition *condition;
1873 const char *session_name;
1874 struct lttng_session_trigger_list *trigger_list;
1875
1876 condition = lttng_trigger_get_const_condition(trigger);
1877 switch (lttng_condition_get_type(condition)) {
1878 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1879 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
1880 {
1881 enum lttng_condition_status status;
1882
1883 status = lttng_condition_session_rotation_get_session_name(
1884 condition, &session_name);
1885 if (status != LTTNG_CONDITION_STATUS_OK) {
1886 ERR("[notification-thread] Failed to bind trigger to session: unable to get 'session_rotation' condition's session name");
1887 ret = -1;
1888 goto end;
1889 }
1890 break;
1891 }
1892 default:
1893 ret = -1;
1894 goto end;
1895 }
1896
1897 trigger_list = get_session_trigger_list(state, session_name);
1898 if (!trigger_list) {
1899 DBG("[notification-thread] Unable to bind trigger applying to session \"%s\" as it is not yet known to the notification system",
1900 session_name);
1901 goto end;
1902
1903 }
1904
1905 DBG("[notification-thread] Newly registered trigger bound to session \"%s\"",
1906 session_name);
1907 ret = lttng_session_trigger_list_add(trigger_list, trigger);
1908 end:
1909 return ret;
1910 }
1911
1912 /* Must be called with RCU read lock held. */
1913 static
1914 int bind_trigger_to_matching_channels(const struct lttng_trigger *trigger,
1915 struct notification_thread_state *state)
1916 {
1917 int ret = 0;
1918 struct cds_lfht_node *node;
1919 struct cds_lfht_iter iter;
1920 struct channel_info *channel;
1921
1922 cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
1923 channels_ht_node) {
1924 struct lttng_trigger_list_element *trigger_list_element;
1925 struct lttng_channel_trigger_list *trigger_list;
1926 struct cds_lfht_iter lookup_iter;
1927
1928 if (!trigger_applies_to_channel(trigger, channel)) {
1929 continue;
1930 }
1931
1932 cds_lfht_lookup(state->channel_triggers_ht,
1933 hash_channel_key(&channel->key),
1934 match_channel_trigger_list,
1935 &channel->key,
1936 &lookup_iter);
1937 node = cds_lfht_iter_get_node(&lookup_iter);
1938 assert(node);
1939 trigger_list = caa_container_of(node,
1940 struct lttng_channel_trigger_list,
1941 channel_triggers_ht_node);
1942
1943 trigger_list_element = zmalloc(sizeof(*trigger_list_element));
1944 if (!trigger_list_element) {
1945 ret = -1;
1946 goto end;
1947 }
1948 CDS_INIT_LIST_HEAD(&trigger_list_element->node);
1949 trigger_list_element->trigger = trigger;
1950 cds_list_add(&trigger_list_element->node, &trigger_list->list);
1951 DBG("[notification-thread] Newly registered trigger bound to channel \"%s\"",
1952 channel->name);
1953 }
1954 end:
1955 return ret;
1956 }
1957
1958 /*
1959 * FIXME A client's credentials are not checked when registering a trigger, nor
1960 * are they stored alongside with the trigger.
1961 *
1962 * The effects of this are benign since:
1963 * - The client will succeed in registering the trigger, as it is valid,
1964 * - The trigger will, internally, be bound to the channel/session,
1965 * - The notifications will not be sent since the client's credentials
1966 * are checked against the channel at that moment.
1967 *
1968 * If this function returns a non-zero value, it means something is
1969 * fundamentally broken and the whole subsystem/thread will be torn down.
1970 *
1971 * If a non-fatal error occurs, just set the cmd_result to the appropriate
1972 * error code.
1973 */
1974 static
1975 int handle_notification_thread_command_register_trigger(
1976 struct notification_thread_state *state,
1977 struct lttng_trigger *trigger,
1978 enum lttng_error_code *cmd_result)
1979 {
1980 int ret = 0;
1981 struct lttng_condition *condition;
1982 struct notification_client *client;
1983 struct notification_client_list *client_list = NULL;
1984 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1985 struct notification_client_list_element *client_list_element, *tmp;
1986 struct cds_lfht_node *node;
1987 struct cds_lfht_iter iter;
1988 bool free_trigger = true;
1989
1990 rcu_read_lock();
1991
1992 condition = lttng_trigger_get_condition(trigger);
1993 assert(condition);
1994
1995 ret = condition_is_supported(condition);
1996 if (ret < 0) {
1997 goto error;
1998 } else if (ret == 0) {
1999 *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
2000 goto error;
2001 } else {
2002 /* Feature is supported, continue. */
2003 ret = 0;
2004 }
2005
2006 trigger_ht_element = zmalloc(sizeof(*trigger_ht_element));
2007 if (!trigger_ht_element) {
2008 ret = -1;
2009 goto error;
2010 }
2011
2012 /* Add trigger to the trigger_ht. */
2013 cds_lfht_node_init(&trigger_ht_element->node);
2014 trigger_ht_element->trigger = trigger;
2015
2016 node = cds_lfht_add_unique(state->triggers_ht,
2017 lttng_condition_hash(condition),
2018 match_condition,
2019 condition,
2020 &trigger_ht_element->node);
2021 if (node != &trigger_ht_element->node) {
2022 /* Not a fatal error, simply report it to the client. */
2023 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2024 goto error_free_ht_element;
2025 }
2026
2027 /*
2028 * Ownership of the trigger and of its wrapper was transfered to
2029 * the triggers_ht.
2030 */
2031 trigger_ht_element = NULL;
2032 free_trigger = false;
2033
2034 /*
2035 * The rest only applies to triggers that have a "notify" action.
2036 * It is not skipped as this is the only action type currently
2037 * supported.
2038 */
2039 client_list = zmalloc(sizeof(*client_list));
2040 if (!client_list) {
2041 ret = -1;
2042 goto error_free_ht_element;
2043 }
2044 cds_lfht_node_init(&client_list->notification_trigger_ht_node);
2045 CDS_INIT_LIST_HEAD(&client_list->list);
2046 client_list->trigger = trigger;
2047
2048 /* Build a list of clients to which this new trigger applies. */
2049 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
2050 client_socket_ht_node) {
2051 if (!trigger_applies_to_client(trigger, client)) {
2052 continue;
2053 }
2054
2055 client_list_element = zmalloc(sizeof(*client_list_element));
2056 if (!client_list_element) {
2057 ret = -1;
2058 goto error_free_client_list;
2059 }
2060 CDS_INIT_LIST_HEAD(&client_list_element->node);
2061 client_list_element->client = client;
2062 cds_list_add(&client_list_element->node, &client_list->list);
2063 }
2064
2065 cds_lfht_add(state->notification_trigger_clients_ht,
2066 lttng_condition_hash(condition),
2067 &client_list->notification_trigger_ht_node);
2068
2069 switch (get_condition_binding_object(condition)) {
2070 case LTTNG_OBJECT_TYPE_SESSION:
2071 /* Add the trigger to the list if it matches a known session. */
2072 ret = bind_trigger_to_matching_session(trigger, state);
2073 if (ret) {
2074 goto error_free_client_list;
2075 }
2076 break;
2077 case LTTNG_OBJECT_TYPE_CHANNEL:
2078 /*
2079 * Add the trigger to list of triggers bound to the channels
2080 * currently known.
2081 */
2082 ret = bind_trigger_to_matching_channels(trigger, state);
2083 if (ret) {
2084 goto error_free_client_list;
2085 }
2086 break;
2087 case LTTNG_OBJECT_TYPE_NONE:
2088 break;
2089 default:
2090 ERR("[notification-thread] Unknown object type on which to bind a newly registered trigger was encountered");
2091 ret = -1;
2092 goto error_free_client_list;
2093 }
2094
2095 /*
2096 * Since there is nothing preventing clients from subscribing to a
2097 * condition before the corresponding trigger is registered, we have
2098 * to evaluate this new condition right away.
2099 *
2100 * At some point, we were waiting for the next "evaluation" (e.g. on
2101 * reception of a channel sample) to evaluate this new condition, but
2102 * that was broken.
2103 *
2104 * The reason it was broken is that waiting for the next sample
2105 * does not allow us to properly handle transitions for edge-triggered
2106 * conditions.
2107 *
2108 * Consider this example: when we handle a new channel sample, we
2109 * evaluate each conditions twice: once with the previous state, and
2110 * again with the newest state. We then use those two results to
2111 * determine whether a state change happened: a condition was false and
2112 * became true. If a state change happened, we have to notify clients.
2113 *
2114 * Now, if a client subscribes to a given notification and registers
2115 * a trigger *after* that subscription, we have to make sure the
2116 * condition is evaluated at this point while considering only the
2117 * current state. Otherwise, the next evaluation cycle may only see
2118 * that the evaluations remain the same (true for samples n-1 and n) and
2119 * the client will never know that the condition has been met.
2120 */
2121 cds_list_for_each_entry_safe(client_list_element, tmp,
2122 &client_list->list, node) {
2123 ret = evaluate_condition_for_client(trigger, condition,
2124 client_list_element->client, state);
2125 if (ret) {
2126 goto error_free_client_list;
2127 }
2128 }
2129
2130 /*
2131 * Client list ownership transferred to the
2132 * notification_trigger_clients_ht.
2133 */
2134 client_list = NULL;
2135
2136 *cmd_result = LTTNG_OK;
2137 error_free_client_list:
2138 if (client_list) {
2139 cds_list_for_each_entry_safe(client_list_element, tmp,
2140 &client_list->list, node) {
2141 free(client_list_element);
2142 }
2143 free(client_list);
2144 }
2145 error_free_ht_element:
2146 free(trigger_ht_element);
2147 error:
2148 if (free_trigger) {
2149 struct lttng_action *action = lttng_trigger_get_action(trigger);
2150
2151 lttng_condition_destroy(condition);
2152 lttng_action_destroy(action);
2153 lttng_trigger_destroy(trigger);
2154 }
2155 rcu_read_unlock();
2156 return ret;
2157 }
2158
2159 static
2160 void free_notification_client_list_rcu(struct rcu_head *node)
2161 {
2162 free(caa_container_of(node, struct notification_client_list,
2163 rcu_node));
2164 }
2165
2166 static
2167 void free_lttng_trigger_ht_element_rcu(struct rcu_head *node)
2168 {
2169 free(caa_container_of(node, struct lttng_trigger_ht_element,
2170 rcu_node));
2171 }
2172
2173 static
2174 int handle_notification_thread_command_unregister_trigger(
2175 struct notification_thread_state *state,
2176 struct lttng_trigger *trigger,
2177 enum lttng_error_code *_cmd_reply)
2178 {
2179 struct cds_lfht_iter iter;
2180 struct cds_lfht_node *triggers_ht_node;
2181 struct lttng_channel_trigger_list *trigger_list;
2182 struct notification_client_list *client_list;
2183 struct notification_client_list_element *client_list_element, *tmp;
2184 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
2185 struct lttng_condition *condition = lttng_trigger_get_condition(
2186 trigger);
2187 struct lttng_action *action;
2188 enum lttng_error_code cmd_reply;
2189
2190 rcu_read_lock();
2191
2192 cds_lfht_lookup(state->triggers_ht,
2193 lttng_condition_hash(condition),
2194 match_condition,
2195 condition,
2196 &iter);
2197 triggers_ht_node = cds_lfht_iter_get_node(&iter);
2198 if (!triggers_ht_node) {
2199 cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND;
2200 goto end;
2201 } else {
2202 cmd_reply = LTTNG_OK;
2203 }
2204
2205 /* Remove trigger from channel_triggers_ht. */
2206 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
2207 channel_triggers_ht_node) {
2208 struct lttng_trigger_list_element *trigger_element, *tmp;
2209
2210 cds_list_for_each_entry_safe(trigger_element, tmp,
2211 &trigger_list->list, node) {
2212 const struct lttng_condition *current_condition =
2213 lttng_trigger_get_const_condition(
2214 trigger_element->trigger);
2215
2216 assert(current_condition);
2217 if (!lttng_condition_is_equal(condition,
2218 current_condition)) {
2219 continue;
2220 }
2221
2222 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
2223 cds_list_del(&trigger_element->node);
2224 /* A trigger can only appear once per channel */
2225 break;
2226 }
2227 }
2228
2229 /*
2230 * Remove and release the client list from
2231 * notification_trigger_clients_ht.
2232 */
2233 client_list = get_client_list_from_condition(state, condition);
2234 assert(client_list);
2235
2236 cds_list_for_each_entry_safe(client_list_element, tmp,
2237 &client_list->list, node) {
2238 free(client_list_element);
2239 }
2240 cds_lfht_del(state->notification_trigger_clients_ht,
2241 &client_list->notification_trigger_ht_node);
2242 call_rcu(&client_list->rcu_node, free_notification_client_list_rcu);
2243
2244 /* Remove trigger from triggers_ht. */
2245 trigger_ht_element = caa_container_of(triggers_ht_node,
2246 struct lttng_trigger_ht_element, node);
2247 cds_lfht_del(state->triggers_ht, triggers_ht_node);
2248
2249 condition = lttng_trigger_get_condition(trigger_ht_element->trigger);
2250 lttng_condition_destroy(condition);
2251 action = lttng_trigger_get_action(trigger_ht_element->trigger);
2252 lttng_action_destroy(action);
2253 lttng_trigger_destroy(trigger_ht_element->trigger);
2254 call_rcu(&trigger_ht_element->rcu_node, free_lttng_trigger_ht_element_rcu);
2255 end:
2256 rcu_read_unlock();
2257 if (_cmd_reply) {
2258 *_cmd_reply = cmd_reply;
2259 }
2260 return 0;
2261 }
2262
2263 /* Returns 0 on success, 1 on exit requested, negative value on error. */
2264 int handle_notification_thread_command(
2265 struct notification_thread_handle *handle,
2266 struct notification_thread_state *state)
2267 {
2268 int ret;
2269 uint64_t counter;
2270 struct notification_thread_command *cmd;
2271
2272 /* Read the event pipe to put it back into a quiescent state. */
2273 ret = lttng_read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter,
2274 sizeof(counter));
2275 if (ret != sizeof(counter)) {
2276 goto error;
2277 }
2278
2279 pthread_mutex_lock(&handle->cmd_queue.lock);
2280 cmd = cds_list_first_entry(&handle->cmd_queue.list,
2281 struct notification_thread_command, cmd_list_node);
2282 switch (cmd->type) {
2283 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
2284 DBG("[notification-thread] Received register trigger command");
2285 ret = handle_notification_thread_command_register_trigger(
2286 state, cmd->parameters.trigger,
2287 &cmd->reply_code);
2288 break;
2289 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
2290 DBG("[notification-thread] Received unregister trigger command");
2291 ret = handle_notification_thread_command_unregister_trigger(
2292 state, cmd->parameters.trigger,
2293 &cmd->reply_code);
2294 break;
2295 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
2296 DBG("[notification-thread] Received add channel command");
2297 ret = handle_notification_thread_command_add_channel(
2298 state,
2299 cmd->parameters.add_channel.session.name,
2300 cmd->parameters.add_channel.session.uid,
2301 cmd->parameters.add_channel.session.gid,
2302 cmd->parameters.add_channel.channel.name,
2303 cmd->parameters.add_channel.channel.domain,
2304 cmd->parameters.add_channel.channel.key,
2305 cmd->parameters.add_channel.channel.capacity,
2306 &cmd->reply_code);
2307 break;
2308 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
2309 DBG("[notification-thread] Received remove channel command");
2310 ret = handle_notification_thread_command_remove_channel(
2311 state, cmd->parameters.remove_channel.key,
2312 cmd->parameters.remove_channel.domain,
2313 &cmd->reply_code);
2314 break;
2315 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING:
2316 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED:
2317 DBG("[notification-thread] Received session rotation %s command",
2318 cmd->type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING ?
2319 "ongoing" : "completed");
2320 ret = handle_notification_thread_command_session_rotation(
2321 state,
2322 cmd->type,
2323 cmd->parameters.session_rotation.session_name,
2324 cmd->parameters.session_rotation.uid,
2325 cmd->parameters.session_rotation.gid,
2326 cmd->parameters.session_rotation.trace_archive_chunk_id,
2327 cmd->parameters.session_rotation.location,
2328 &cmd->reply_code);
2329 break;
2330 case NOTIFICATION_COMMAND_TYPE_QUIT:
2331 DBG("[notification-thread] Received quit command");
2332 cmd->reply_code = LTTNG_OK;
2333 ret = 1;
2334 goto end;
2335 default:
2336 ERR("[notification-thread] Unknown internal command received");
2337 goto error_unlock;
2338 }
2339
2340 if (ret) {
2341 goto error_unlock;
2342 }
2343 end:
2344 cds_list_del(&cmd->cmd_list_node);
2345 lttng_waiter_wake_up(&cmd->reply_waiter);
2346 pthread_mutex_unlock(&handle->cmd_queue.lock);
2347 return ret;
2348 error_unlock:
2349 /* Wake-up and return a fatal error to the calling thread. */
2350 lttng_waiter_wake_up(&cmd->reply_waiter);
2351 pthread_mutex_unlock(&handle->cmd_queue.lock);
2352 cmd->reply_code = LTTNG_ERR_FATAL;
2353 error:
2354 /* Indicate a fatal error to the caller. */
2355 return -1;
2356 }
2357
2358 static
2359 unsigned long hash_client_socket(int socket)
2360 {
2361 return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed);
2362 }
2363
/*
 * Add O_NONBLOCK to the file status flags of `socket`.
 *
 * Returns the result of the last fcntl() call performed: -1 if either the
 * flags could not be read or could not be updated.
 */
static
int socket_set_non_blocking(int socket)
{
	int current_flags, status;

	/* Read the current flags so that only O_NONBLOCK gets added. */
	current_flags = fcntl(socket, F_GETFL, 0);
	if (current_flags == -1) {
		PERROR("fcntl get socket flags");
		return -1;
	}

	status = fcntl(socket, F_SETFL, current_flags | O_NONBLOCK);
	if (status == -1) {
		PERROR("fcntl set O_NONBLOCK socket flag");
		return status;
	}

	DBG("Client socket (fd = %i) set as non-blocking", socket);
	return status;
}
2386
2387 static
2388 int client_reset_inbound_state(struct notification_client *client)
2389 {
2390 int ret;
2391
2392 ret = lttng_dynamic_buffer_set_size(
2393 &client->communication.inbound.buffer, 0);
2394 assert(!ret);
2395
2396 client->communication.inbound.bytes_to_receive =
2397 sizeof(struct lttng_notification_channel_message);
2398 client->communication.inbound.msg_type =
2399 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN;
2400 LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1);
2401 LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1);
2402 ret = lttng_dynamic_buffer_set_size(
2403 &client->communication.inbound.buffer,
2404 client->communication.inbound.bytes_to_receive);
2405 return ret;
2406 }
2407
2408 int handle_notification_thread_client_connect(
2409 struct notification_thread_state *state)
2410 {
2411 int ret;
2412 struct notification_client *client;
2413
2414 DBG("[notification-thread] Handling new notification channel client connection");
2415
2416 client = zmalloc(sizeof(*client));
2417 if (!client) {
2418 /* Fatal error. */
2419 ret = -1;
2420 goto error;
2421 }
2422 CDS_INIT_LIST_HEAD(&client->condition_list);
2423 lttng_dynamic_buffer_init(&client->communication.inbound.buffer);
2424 lttng_dynamic_buffer_init(&client->communication.outbound.buffer);
2425 client->communication.inbound.expect_creds = true;
2426 ret = client_reset_inbound_state(client);
2427 if (ret) {
2428 ERR("[notification-thread] Failed to reset client communication's inbound state");
2429 ret = 0;
2430 goto error;
2431 }
2432
2433 ret = lttcomm_accept_unix_sock(state->notification_channel_socket);
2434 if (ret < 0) {
2435 ERR("[notification-thread] Failed to accept new notification channel client connection");
2436 ret = 0;
2437 goto error;
2438 }
2439
2440 client->socket = ret;
2441
2442 ret = socket_set_non_blocking(client->socket);
2443 if (ret) {
2444 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
2445 goto error;
2446 }
2447
2448 ret = lttcomm_setsockopt_creds_unix_sock(client->socket);
2449 if (ret < 0) {
2450 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
2451 ret = 0;
2452 goto error;
2453 }
2454
2455 ret = lttng_poll_add(&state->events, client->socket,
2456 LPOLLIN | LPOLLERR |
2457 LPOLLHUP | LPOLLRDHUP);
2458 if (ret < 0) {
2459 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
2460 ret = 0;
2461 goto error;
2462 }
2463 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
2464 client->socket);
2465
2466 rcu_read_lock();
2467 cds_lfht_add(state->client_socket_ht,
2468 hash_client_socket(client->socket),
2469 &client->client_socket_ht_node);
2470 rcu_read_unlock();
2471
2472 return ret;
2473 error:
2474 notification_client_destroy(client, state);
2475 return ret;
2476 }
2477
2478 int handle_notification_thread_client_disconnect(
2479 int client_socket,
2480 struct notification_thread_state *state)
2481 {
2482 int ret = 0;
2483 struct notification_client *client;
2484
2485 rcu_read_lock();
2486 DBG("[notification-thread] Closing client connection (socket fd = %i)",
2487 client_socket);
2488 client = get_client_from_socket(client_socket, state);
2489 if (!client) {
2490 /* Internal state corruption, fatal error. */
2491 ERR("[notification-thread] Unable to find client (socket fd = %i)",
2492 client_socket);
2493 ret = -1;
2494 goto end;
2495 }
2496
2497 ret = lttng_poll_del(&state->events, client_socket);
2498 if (ret) {
2499 ERR("[notification-thread] Failed to remove client socket from poll set");
2500 }
2501 cds_lfht_del(state->client_socket_ht,
2502 &client->client_socket_ht_node);
2503 notification_client_destroy(client, state);
2504 end:
2505 rcu_read_unlock();
2506 return ret;
2507 }
2508
2509 int handle_notification_thread_client_disconnect_all(
2510 struct notification_thread_state *state)
2511 {
2512 struct cds_lfht_iter iter;
2513 struct notification_client *client;
2514 bool error_encoutered = false;
2515
2516 rcu_read_lock();
2517 DBG("[notification-thread] Closing all client connections");
2518 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
2519 client_socket_ht_node) {
2520 int ret;
2521
2522 ret = handle_notification_thread_client_disconnect(
2523 client->socket, state);
2524 if (ret) {
2525 error_encoutered = true;
2526 }
2527 }
2528 rcu_read_unlock();
2529 return error_encoutered ? 1 : 0;
2530 }
2531
2532 int handle_notification_thread_trigger_unregister_all(
2533 struct notification_thread_state *state)
2534 {
2535 bool error_occurred = false;
2536 struct cds_lfht_iter iter;
2537 struct lttng_trigger_ht_element *trigger_ht_element;
2538
2539 rcu_read_lock();
2540 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
2541 node) {
2542 int ret = handle_notification_thread_command_unregister_trigger(
2543 state, trigger_ht_element->trigger, NULL);
2544 if (ret) {
2545 error_occurred = true;
2546 }
2547 }
2548 rcu_read_unlock();
2549 return error_occurred ? -1 : 0;
2550 }
2551
2552 static
2553 int client_flush_outgoing_queue(struct notification_client *client,
2554 struct notification_thread_state *state)
2555 {
2556 ssize_t ret;
2557 size_t to_send_count;
2558
2559 assert(client->communication.outbound.buffer.size != 0);
2560 to_send_count = client->communication.outbound.buffer.size;
2561 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
2562 client->socket);
2563
2564 ret = lttcomm_send_unix_sock_non_block(client->socket,
2565 client->communication.outbound.buffer.data,
2566 to_send_count);
2567 if ((ret >= 0 && ret < to_send_count)) {
2568 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
2569 client->socket);
2570 to_send_count -= max(ret, 0);
2571
2572 memcpy(client->communication.outbound.buffer.data,
2573 client->communication.outbound.buffer.data +
2574 client->communication.outbound.buffer.size - to_send_count,
2575 to_send_count);
2576 ret = lttng_dynamic_buffer_set_size(
2577 &client->communication.outbound.buffer,
2578 to_send_count);
2579 if (ret) {
2580 goto error;
2581 }
2582
2583 /*
2584 * We want to be notified whenever there is buffer space
2585 * available to send the rest of the payload.
2586 */
2587 ret = lttng_poll_mod(&state->events, client->socket,
2588 CLIENT_POLL_MASK_IN_OUT);
2589 if (ret) {
2590 goto error;
2591 }
2592 } else if (ret < 0) {
2593 /* Generic error, disconnect the client. */
2594 ERR("[notification-thread] Failed to send flush outgoing queue, disconnecting client (socket fd = %i)",
2595 client->socket);
2596 ret = handle_notification_thread_client_disconnect(
2597 client->socket, state);
2598 if (ret) {
2599 goto error;
2600 }
2601 } else {
2602 /* No error and flushed the queue completely. */
2603 ret = lttng_dynamic_buffer_set_size(
2604 &client->communication.outbound.buffer, 0);
2605 if (ret) {
2606 goto error;
2607 }
2608 ret = lttng_poll_mod(&state->events, client->socket,
2609 CLIENT_POLL_MASK_IN);
2610 if (ret) {
2611 goto error;
2612 }
2613
2614 client->communication.outbound.queued_command_reply = false;
2615 client->communication.outbound.dropped_notification = false;
2616 }
2617
2618 return 0;
2619 error:
2620 return -1;
2621 }
2622
2623 static
2624 int client_send_command_reply(struct notification_client *client,
2625 struct notification_thread_state *state,
2626 enum lttng_notification_channel_status status)
2627 {
2628 int ret;
2629 struct lttng_notification_channel_command_reply reply = {
2630 .status = (int8_t) status,
2631 };
2632 struct lttng_notification_channel_message msg = {
2633 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
2634 .size = sizeof(reply),
2635 };
2636 char buffer[sizeof(msg) + sizeof(reply)];
2637
2638 if (client->communication.outbound.queued_command_reply) {
2639 /* Protocol error. */
2640 goto error;
2641 }
2642
2643 memcpy(buffer, &msg, sizeof(msg));
2644 memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
2645 DBG("[notification-thread] Send command reply (%i)", (int) status);
2646
2647 /* Enqueue buffer to outgoing queue and flush it. */
2648 ret = lttng_dynamic_buffer_append(
2649 &client->communication.outbound.buffer,
2650 buffer, sizeof(buffer));
2651 if (ret) {
2652 goto error;
2653 }
2654
2655 ret = client_flush_outgoing_queue(client, state);
2656 if (ret) {
2657 goto error;
2658 }
2659
2660 if (client->communication.outbound.buffer.size != 0) {
2661 /* Queue could not be emptied. */
2662 client->communication.outbound.queued_command_reply = true;
2663 }
2664
2665 return 0;
2666 error:
2667 return -1;
2668 }
2669
2670 static
2671 int client_dispatch_message(struct notification_client *client,
2672 struct notification_thread_state *state)
2673 {
2674 int ret = 0;
2675
2676 if (client->communication.inbound.msg_type !=
2677 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE &&
2678 client->communication.inbound.msg_type !=
2679 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN &&
2680 !client->validated) {
2681 WARN("[notification-thread] client attempted a command before handshake");
2682 ret = -1;
2683 goto end;
2684 }
2685
2686 switch (client->communication.inbound.msg_type) {
2687 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN:
2688 {
2689 /*
2690 * Receiving message header. The function will be called again
2691 * once the rest of the message as been received and can be
2692 * interpreted.
2693 */
2694 const struct lttng_notification_channel_message *msg;
2695
2696 assert(sizeof(*msg) ==
2697 client->communication.inbound.buffer.size);
2698 msg = (const struct lttng_notification_channel_message *)
2699 client->communication.inbound.buffer.data;
2700
2701 if (msg->size == 0 || msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
2702 ERR("[notification-thread] Invalid notification channel message: length = %u", msg->size);
2703 ret = -1;
2704 goto end;
2705 }
2706
2707 switch (msg->type) {
2708 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
2709 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
2710 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
2711 break;
2712 default:
2713 ret = -1;
2714 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
2715 goto end;
2716 }
2717
2718 client->communication.inbound.bytes_to_receive = msg->size;
2719 client->communication.inbound.msg_type =
2720 (enum lttng_notification_channel_message_type) msg->type;
2721 ret = lttng_dynamic_buffer_set_size(
2722 &client->communication.inbound.buffer, msg->size);
2723 if (ret) {
2724 goto end;
2725 }
2726 break;
2727 }
2728 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
2729 {
2730 struct lttng_notification_channel_command_handshake *handshake_client;
2731 struct lttng_notification_channel_command_handshake handshake_reply = {
2732 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
2733 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
2734 };
2735 struct lttng_notification_channel_message msg_header = {
2736 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
2737 .size = sizeof(handshake_reply),
2738 };
2739 enum lttng_notification_channel_status status =
2740 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
2741 char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
2742
2743 memcpy(send_buffer, &msg_header, sizeof(msg_header));
2744 memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
2745 sizeof(handshake_reply));
2746
2747 handshake_client =
2748 (struct lttng_notification_channel_command_handshake *)
2749 client->communication.inbound.buffer.data;
2750 client->major = handshake_client->major;
2751 client->minor = handshake_client->minor;
2752 if (!client->communication.inbound.creds_received) {
2753 ERR("[notification-thread] No credentials received from client");
2754 ret = -1;
2755 goto end;
2756 }
2757
2758 client->uid = LTTNG_SOCK_GET_UID_CRED(
2759 &client->communication.inbound.creds);
2760 client->gid = LTTNG_SOCK_GET_GID_CRED(
2761 &client->communication.inbound.creds);
2762 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
2763 client->uid, client->gid, (int) client->major,
2764 (int) client->minor);
2765
2766 if (handshake_client->major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
2767 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
2768 }
2769
2770 ret = lttng_dynamic_buffer_append(&client->communication.outbound.buffer,
2771 send_buffer, sizeof(send_buffer));
2772 if (ret) {
2773 ERR("[notification-thread] Failed to send protocol version to notification channel client");
2774 goto end;
2775 }
2776
2777 ret = client_flush_outgoing_queue(client, state);
2778 if (ret) {
2779 goto end;
2780 }
2781
2782 ret = client_send_command_reply(client, state, status);
2783 if (ret) {
2784 ERR("[notification-thread] Failed to send reply to notification channel client");
2785 goto end;
2786 }
2787
2788 /* Set reception state to receive the next message header. */
2789 ret = client_reset_inbound_state(client);
2790 if (ret) {
2791 ERR("[notification-thread] Failed to reset client communication's inbound state");
2792 goto end;
2793 }
2794 client->validated = true;
2795 break;
2796 }
2797 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
2798 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
2799 {
2800 struct lttng_condition *condition;
2801 enum lttng_notification_channel_status status =
2802 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
2803 struct lttng_payload_view condition_view =
2804 lttng_payload_view_from_dynamic_buffer(
2805 &client->communication.inbound.buffer,
2806 0, -1);
2807 size_t expected_condition_size =
2808 client->communication.inbound.buffer.size;
2809
2810 ret = lttng_condition_create_from_payload(&condition_view,
2811 &condition);
2812 if (ret != expected_condition_size) {
2813 ERR("[notification-thread] Malformed condition received from client");
2814 goto end;
2815 }
2816
2817 if (client->communication.inbound.msg_type ==
2818 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
2819 ret = notification_thread_client_subscribe(client,
2820 condition, state, &status);
2821 } else {
2822 ret = notification_thread_client_unsubscribe(client,
2823 condition, state, &status);
2824 }
2825 if (ret) {
2826 goto end;
2827 }
2828
2829 ret = client_send_command_reply(client, state, status);
2830 if (ret) {
2831 ERR("[notification-thread] Failed to send reply to notification channel client");
2832 goto end;
2833 }
2834
2835 /* Set reception state to receive the next message header. */
2836 ret = client_reset_inbound_state(client);
2837 if (ret) {
2838 ERR("[notification-thread] Failed to reset client communication's inbound state");
2839 goto end;
2840 }
2841 break;
2842 }
2843 default:
2844 abort();
2845 }
2846 end:
2847 return ret;
2848 }
2849
2850 /* Incoming data from client. */
2851 int handle_notification_thread_client_in(
2852 struct notification_thread_state *state, int socket)
2853 {
2854 int ret = 0;
2855 struct notification_client *client;
2856 ssize_t recv_ret;
2857 size_t offset;
2858
2859 client = get_client_from_socket(socket, state);
2860 if (!client) {
2861 /* Internal error, abort. */
2862 ret = -1;
2863 goto end;
2864 }
2865
2866 offset = client->communication.inbound.buffer.size -
2867 client->communication.inbound.bytes_to_receive;
2868 if (client->communication.inbound.expect_creds) {
2869 recv_ret = lttcomm_recv_creds_unix_sock(socket,
2870 client->communication.inbound.buffer.data + offset,
2871 client->communication.inbound.bytes_to_receive,
2872 &client->communication.inbound.creds);
2873 if (recv_ret > 0) {
2874 client->communication.inbound.expect_creds = false;
2875 client->communication.inbound.creds_received = true;
2876 }
2877 } else {
2878 recv_ret = lttcomm_recv_unix_sock_non_block(socket,
2879 client->communication.inbound.buffer.data + offset,
2880 client->communication.inbound.bytes_to_receive);
2881 }
2882 if (recv_ret < 0) {
2883 goto error_disconnect_client;
2884 }
2885
2886 client->communication.inbound.bytes_to_receive -= recv_ret;
2887 if (client->communication.inbound.bytes_to_receive == 0) {
2888 ret = client_dispatch_message(client, state);
2889 if (ret) {
2890 /*
2891 * Only returns an error if this client must be
2892 * disconnected.
2893 */
2894 goto error_disconnect_client;
2895 }
2896 } else {
2897 goto end;
2898 }
2899 end:
2900 return ret;
2901 error_disconnect_client:
2902 ret = handle_notification_thread_client_disconnect(socket, state);
2903 return ret;
2904 }
2905
/*
 * Client ready to receive outgoing data.
 *
 * Flushes the outbound queue of the client associated with `socket`.
 * Returns -1 if no such client is known, otherwise the result of the flush.
 */
int handle_notification_thread_client_out(
		struct notification_thread_state *state, int socket)
{
	struct notification_client *client =
			get_client_from_socket(socket, state);

	if (!client) {
		/* Internal error, abort. */
		return -1;
	}

	return client_flush_outgoing_queue(client, state);
}
2927
2928 static
2929 bool evaluate_buffer_usage_condition(const struct lttng_condition *condition,
2930 const struct channel_state_sample *sample,
2931 uint64_t buffer_capacity)
2932 {
2933 bool result = false;
2934 uint64_t threshold;
2935 enum lttng_condition_type condition_type;
2936 const struct lttng_condition_buffer_usage *use_condition = container_of(
2937 condition, struct lttng_condition_buffer_usage,
2938 parent);
2939
2940 if (use_condition->threshold_bytes.set) {
2941 threshold = use_condition->threshold_bytes.value;
2942 } else {
2943 /*
2944 * Threshold was expressed as a ratio.
2945 *
2946 * TODO the threshold (in bytes) of conditions expressed
2947 * as a ratio of total buffer size could be cached to
2948 * forego this double-multiplication or it could be performed
2949 * as fixed-point math.
2950 *
2951 * Note that caching should accommodates the case where the
2952 * condition applies to multiple channels (i.e. don't assume
2953 * that all channels matching my_chann* have the same size...)
2954 */
2955 threshold = (uint64_t) (use_condition->threshold_ratio.value *
2956 (double) buffer_capacity);
2957 }
2958
2959 condition_type = lttng_condition_get_type(condition);
2960 if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
2961 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
2962 threshold, sample->highest_usage);
2963
2964 /*
2965 * The low condition should only be triggered once _all_ of the
2966 * streams in a channel have gone below the "low" threshold.
2967 */
2968 if (sample->highest_usage <= threshold) {
2969 result = true;
2970 }
2971 } else {
2972 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
2973 threshold, sample->highest_usage);
2974
2975 /*
2976 * For high buffer usage scenarios, we want to trigger whenever
2977 * _any_ of the streams has reached the "high" threshold.
2978 */
2979 if (sample->highest_usage >= threshold) {
2980 result = true;
2981 }
2982 }
2983
2984 return result;
2985 }
2986
2987 static
2988 bool evaluate_session_consumed_size_condition(
2989 const struct lttng_condition *condition,
2990 uint64_t session_consumed_size)
2991 {
2992 uint64_t threshold;
2993 const struct lttng_condition_session_consumed_size *size_condition =
2994 container_of(condition,
2995 struct lttng_condition_session_consumed_size,
2996 parent);
2997
2998 threshold = size_condition->consumed_threshold_bytes.value;
2999 DBG("[notification-thread] Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
3000 threshold, session_consumed_size);
3001 return session_consumed_size >= threshold;
3002 }
3003
3004 static
3005 int evaluate_buffer_condition(const struct lttng_condition *condition,
3006 struct lttng_evaluation **evaluation,
3007 const struct notification_thread_state *state,
3008 const struct channel_state_sample *previous_sample,
3009 const struct channel_state_sample *latest_sample,
3010 uint64_t previous_session_consumed_total,
3011 uint64_t latest_session_consumed_total,
3012 struct channel_info *channel_info)
3013 {
3014 int ret = 0;
3015 enum lttng_condition_type condition_type;
3016 const bool previous_sample_available = !!previous_sample;
3017 bool previous_sample_result = false;
3018 bool latest_sample_result;
3019
3020 condition_type = lttng_condition_get_type(condition);
3021
3022 switch (condition_type) {
3023 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
3024 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
3025 if (caa_likely(previous_sample_available)) {
3026 previous_sample_result =
3027 evaluate_buffer_usage_condition(condition,
3028 previous_sample, channel_info->capacity);
3029 }
3030 latest_sample_result = evaluate_buffer_usage_condition(
3031 condition, latest_sample,
3032 channel_info->capacity);
3033 break;
3034 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
3035 if (caa_likely(previous_sample_available)) {
3036 previous_sample_result =
3037 evaluate_session_consumed_size_condition(
3038 condition,
3039 previous_session_consumed_total);
3040 }
3041 latest_sample_result =
3042 evaluate_session_consumed_size_condition(
3043 condition,
3044 latest_session_consumed_total);
3045 break;
3046 default:
3047 /* Unknown condition type; internal error. */
3048 abort();
3049 }
3050
3051 if (!latest_sample_result ||
3052 (previous_sample_result == latest_sample_result)) {
3053 /*
3054 * Only trigger on a condition evaluation transition.
3055 *
3056 * NOTE: This edge-triggered logic may not be appropriate for
3057 * future condition types.
3058 */
3059 goto end;
3060 }
3061
3062 if (!evaluation || !latest_sample_result) {
3063 goto end;
3064 }
3065
3066 switch (condition_type) {
3067 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
3068 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
3069 *evaluation = lttng_evaluation_buffer_usage_create(
3070 condition_type,
3071 latest_sample->highest_usage,
3072 channel_info->capacity);
3073 break;
3074 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
3075 *evaluation = lttng_evaluation_session_consumed_size_create(
3076 latest_session_consumed_total);
3077 break;
3078 default:
3079 abort();
3080 }
3081
3082 if (!*evaluation) {
3083 ret = -1;
3084 goto end;
3085 }
3086 end:
3087 return ret;
3088 }
3089
3090 static
3091 int client_enqueue_dropped_notification(struct notification_client *client)
3092 {
3093 int ret;
3094 struct lttng_notification_channel_message msg = {
3095 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
3096 .size = 0,
3097 };
3098
3099 ret = lttng_dynamic_buffer_append(
3100 &client->communication.outbound.buffer, &msg,
3101 sizeof(msg));
3102 return ret;
3103 }
3104
3105 static
3106 int send_evaluation_to_clients(const struct lttng_trigger *trigger,
3107 const struct lttng_evaluation *evaluation,
3108 struct notification_client_list* client_list,
3109 struct notification_thread_state *state,
3110 uid_t channel_uid, gid_t channel_gid)
3111 {
3112 int ret = 0;
3113 struct lttng_payload msg_payload;
3114 struct notification_client_list_element *client_list_element, *tmp;
3115 const struct lttng_notification notification = {
3116 .condition = (struct lttng_condition *) lttng_trigger_get_const_condition(trigger),
3117 .evaluation = (struct lttng_evaluation *) evaluation,
3118 };
3119 struct lttng_notification_channel_message msg_header = {
3120 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION,
3121 };
3122
3123 lttng_payload_init(&msg_payload);
3124
3125 ret = lttng_dynamic_buffer_append(&msg_payload.buffer, &msg_header,
3126 sizeof(msg_header));
3127 if (ret) {
3128 goto end;
3129 }
3130
3131 ret = lttng_notification_serialize(&notification, &msg_payload);
3132 if (ret) {
3133 ERR("[notification-thread] Failed to serialize notification");
3134 ret = -1;
3135 goto end;
3136 }
3137
3138 /* Update payload size. */
3139 ((struct lttng_notification_channel_message * ) msg_payload.buffer.data)->size =
3140 (uint32_t) (msg_payload.buffer.size - sizeof(msg_header));
3141
3142 cds_list_for_each_entry_safe(client_list_element, tmp,
3143 &client_list->list, node) {
3144 struct notification_client *client =
3145 client_list_element->client;
3146
3147 if (client->uid != channel_uid && client->gid != channel_gid &&
3148 client->uid != 0) {
3149 /* Client is not allowed to monitor this channel. */
3150 DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this channel");
3151 continue;
3152 }
3153
3154 DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
3155 client->socket, msg_payload.buffer.size);
3156 if (client->communication.outbound.buffer.size) {
3157 /*
3158 * Outgoing data is already buffered for this client;
3159 * drop the notification and enqueue a "dropped
3160 * notification" message if this is the first dropped
3161 * notification since the socket spilled-over to the
3162 * queue.
3163 */
3164 DBG("[notification-thread] Dropping notification addressed to client (socket fd = %i)",
3165 client->socket);
3166 if (!client->communication.outbound.dropped_notification) {
3167 client->communication.outbound.dropped_notification = true;
3168 ret = client_enqueue_dropped_notification(
3169 client);
3170 if (ret) {
3171 goto end;
3172 }
3173 }
3174 continue;
3175 }
3176
3177 ret = lttng_dynamic_buffer_append_buffer(
3178 &client->communication.outbound.buffer,
3179 &msg_payload.buffer);
3180 if (ret) {
3181 goto end;
3182 }
3183
3184 ret = client_flush_outgoing_queue(client, state);
3185 if (ret) {
3186 goto end;
3187 }
3188 }
3189 ret = 0;
3190 end:
3191 lttng_payload_reset(&msg_payload);
3192 return ret;
3193 }
3194
3195 int handle_notification_thread_channel_sample(
3196 struct notification_thread_state *state, int pipe,
3197 enum lttng_domain_type domain)
3198 {
3199 int ret = 0;
3200 struct lttcomm_consumer_channel_monitor_msg sample_msg;
3201 struct channel_info *channel_info;
3202 struct cds_lfht_node *node;
3203 struct cds_lfht_iter iter;
3204 struct lttng_channel_trigger_list *trigger_list;
3205 struct lttng_trigger_list_element *trigger_list_element;
3206 bool previous_sample_available = false;
3207 struct channel_state_sample previous_sample, latest_sample;
3208 uint64_t previous_session_consumed_total, latest_session_consumed_total;
3209
3210 /*
3211 * The monitoring pipe only holds messages smaller than PIPE_BUF,
3212 * ensuring that read/write of sampling messages are atomic.
3213 */
3214 ret = lttng_read(pipe, &sample_msg, sizeof(sample_msg));
3215 if (ret != sizeof(sample_msg)) {
3216 ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
3217 pipe);
3218 ret = -1;
3219 goto end;
3220 }
3221
3222 ret = 0;
3223 latest_sample.key.key = sample_msg.key;
3224 latest_sample.key.domain = domain;
3225 latest_sample.highest_usage = sample_msg.highest;
3226 latest_sample.lowest_usage = sample_msg.lowest;
3227 latest_sample.channel_total_consumed = sample_msg.total_consumed;
3228
3229 rcu_read_lock();
3230
3231 /* Retrieve the channel's informations */
3232 cds_lfht_lookup(state->channels_ht,
3233 hash_channel_key(&latest_sample.key),
3234 match_channel_info,
3235 &latest_sample.key,
3236 &iter);
3237 node = cds_lfht_iter_get_node(&iter);
3238 if (caa_unlikely(!node)) {
3239 /*
3240 * Not an error since the consumer can push a sample to the pipe
3241 * and the rest of the session daemon could notify us of the
3242 * channel's destruction before we get a chance to process that
3243 * sample.
3244 */
3245 DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
3246 latest_sample.key.key,
3247 domain == LTTNG_DOMAIN_KERNEL ? "kernel" :
3248 "user space");
3249 goto end_unlock;
3250 }
3251 channel_info = caa_container_of(node, struct channel_info,
3252 channels_ht_node);
3253 DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64", total consumed = %" PRIu64")",
3254 channel_info->name,
3255 latest_sample.key.key,
3256 channel_info->session_info->name,
3257 latest_sample.highest_usage,
3258 latest_sample.lowest_usage,
3259 latest_sample.channel_total_consumed);
3260
3261 previous_session_consumed_total =
3262 channel_info->session_info->consumed_data_size;
3263
3264 /* Retrieve the channel's last sample, if it exists, and update it. */
3265 cds_lfht_lookup(state->channel_state_ht,
3266 hash_channel_key(&latest_sample.key),
3267 match_channel_state_sample,
3268 &latest_sample.key,
3269 &iter);
3270 node = cds_lfht_iter_get_node(&iter);
3271 if (caa_likely(node)) {
3272 struct channel_state_sample *stored_sample;
3273
3274 /* Update the sample stored. */
3275 stored_sample = caa_container_of(node,
3276 struct channel_state_sample,
3277 channel_state_ht_node);
3278
3279 memcpy(&previous_sample, stored_sample,
3280 sizeof(previous_sample));
3281 stored_sample->highest_usage = latest_sample.highest_usage;
3282 stored_sample->lowest_usage = latest_sample.lowest_usage;
3283 stored_sample->channel_total_consumed = latest_sample.channel_total_consumed;
3284 previous_sample_available = true;
3285
3286 latest_session_consumed_total =
3287 previous_session_consumed_total +
3288 (latest_sample.channel_total_consumed - previous_sample.channel_total_consumed);
3289 } else {
3290 /*
3291 * This is the channel's first sample, allocate space for and
3292 * store the new sample.
3293 */
3294 struct channel_state_sample *stored_sample;
3295
3296 stored_sample = zmalloc(sizeof(*stored_sample));
3297 if (!stored_sample) {
3298 ret = -1;
3299 goto end_unlock;
3300 }
3301
3302 memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
3303 cds_lfht_node_init(&stored_sample->channel_state_ht_node);
3304 cds_lfht_add(state->channel_state_ht,
3305 hash_channel_key(&stored_sample->key),
3306 &stored_sample->channel_state_ht_node);
3307
3308 latest_session_consumed_total =
3309 previous_session_consumed_total +
3310 latest_sample.channel_total_consumed;
3311 }
3312
3313 channel_info->session_info->consumed_data_size =
3314 latest_session_consumed_total;
3315
3316 /* Find triggers associated with this channel. */
3317 cds_lfht_lookup(state->channel_triggers_ht,
3318 hash_channel_key(&latest_sample.key),
3319 match_channel_trigger_list,
3320 &latest_sample.key,
3321 &iter);
3322 node = cds_lfht_iter_get_node(&iter);
3323 if (caa_likely(!node)) {
3324 goto end_unlock;
3325 }
3326
3327 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
3328 channel_triggers_ht_node);
3329 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
3330 node) {
3331 const struct lttng_condition *condition;
3332 const struct lttng_action *action;
3333 const struct lttng_trigger *trigger;
3334 struct notification_client_list *client_list;
3335 struct lttng_evaluation *evaluation = NULL;
3336
3337 trigger = trigger_list_element->trigger;
3338 condition = lttng_trigger_get_const_condition(trigger);
3339 assert(condition);
3340 action = lttng_trigger_get_const_action(trigger);
3341
3342 /* Notify actions are the only type currently supported. */
3343 assert(lttng_action_get_type_const(action) ==
3344 LTTNG_ACTION_TYPE_NOTIFY);
3345
3346 /*
3347 * Check if any client is subscribed to the result of this
3348 * evaluation.
3349 */
3350 client_list = get_client_list_from_condition(state, condition);
3351 assert(client_list);
3352 if (cds_list_empty(&client_list->list)) {
3353 /*
3354 * No clients interested in the evaluation's result,
3355 * skip it.
3356 */
3357 continue;
3358 }
3359
3360 ret = evaluate_buffer_condition(condition, &evaluation, state,
3361 previous_sample_available ? &previous_sample : NULL,
3362 &latest_sample,
3363 previous_session_consumed_total,
3364 latest_session_consumed_total,
3365 channel_info);
3366 if (caa_unlikely(ret)) {
3367 goto end_unlock;
3368 }
3369
3370 if (caa_likely(!evaluation)) {
3371 continue;
3372 }
3373
3374 /* Dispatch evaluation result to all clients. */
3375 ret = send_evaluation_to_clients(trigger_list_element->trigger,
3376 evaluation, client_list, state,
3377 channel_info->session_info->uid,
3378 channel_info->session_info->gid);
3379 lttng_evaluation_destroy(evaluation);
3380 if (caa_unlikely(ret)) {
3381 goto end_unlock;
3382 }
3383 }
3384 end_unlock:
3385 rcu_read_unlock();
3386 end:
3387 return ret;
3388 }
This page took 0.150941 seconds and 4 git commands to generate.