SoW-2019-0002: Dynamic Snapshot
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.c
1 /*
2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * SPDX-License-Identifier: GPL-2.0-only
5 *
6 */
7
8 #define _LGPL_SOURCE
9 #include <urcu.h>
10 #include <urcu/rculfhash.h>
11
12 #include <common/defaults.h>
13 #include <common/error.h>
14 #include <common/futex.h>
15 #include <common/unix.h>
16 #include <common/dynamic-buffer.h>
17 #include <common/hashtable/utils.h>
18 #include <common/sessiond-comm/sessiond-comm.h>
19 #include <common/macros.h>
20 #include <lttng/condition/condition.h>
21 #include <lttng/action/action-internal.h>
22 #include <lttng/action/group-internal.h>
23 #include <lttng/notification/notification-internal.h>
24 #include <lttng/condition/condition-internal.h>
25 #include <lttng/condition/buffer-usage-internal.h>
26 #include <lttng/condition/session-consumed-size-internal.h>
27 #include <lttng/condition/session-rotation-internal.h>
28 #include <lttng/condition/event-rule-internal.h>
29 #include <lttng/notification/channel-internal.h>
30 #include <lttng/trigger/trigger-internal.h>
31
32 #include <time.h>
33 #include <unistd.h>
34 #include <assert.h>
35 #include <inttypes.h>
36 #include <fcntl.h>
37
38 #include "notification-thread.h"
39 #include "notification-thread-events.h"
40 #include "notification-thread-commands.h"
41 #include "lttng-sessiond.h"
42 #include "kernel.h"
43
44 #define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
45 #define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
46
/*
 * Kind of object a condition is bound to, as reported by
 * get_condition_binding_object().
 */
enum lttng_object_type {
	/* Condition type not recognized by get_condition_binding_object(). */
	LTTNG_OBJECT_TYPE_UNKNOWN,
	/* Condition is not bound to any object (event-rule hit conditions). */
	LTTNG_OBJECT_TYPE_NONE,
	/* Buffer usage and session consumed size conditions. */
	LTTNG_OBJECT_TYPE_CHANNEL,
	/* Session rotation (ongoing/completed) conditions. */
	LTTNG_OBJECT_TYPE_SESSION,
};
53
/*
 * Element of a list of triggers (see the `list` members of
 * struct lttng_channel_trigger_list and struct lttng_session_trigger_list).
 */
struct lttng_trigger_list_element {
	/* No ownership of the trigger object is assumed. */
	struct lttng_trigger *trigger;
	/* Linkage in the containing trigger list. */
	struct cds_list_head node;
};
59
/*
 * List of triggers associated with a given channel; entries are matched on
 * the channel key's `key` and `domain` (see match_channel_trigger_list()).
 */
struct lttng_channel_trigger_list {
	/* Identifies the channel this list belongs to. */
	struct channel_key channel_key;
	/* List of struct lttng_trigger_list_element. */
	struct cds_list_head list;
	/* Node in the channel_triggers_ht */
	struct cds_lfht_node channel_triggers_ht_node;
	/* call_rcu delayed reclaim. */
	struct rcu_head rcu_node;
};
69
/*
 * List of triggers applying to a given session.
 *
 * See:
 *   - lttng_session_trigger_list_create()
 *   - lttng_session_trigger_list_build()
 *   - lttng_session_trigger_list_destroy()
 *   - lttng_session_trigger_list_add()
 */
struct lttng_session_trigger_list {
	/*
	 * Not owned by this; points to the session_info structure's
	 * session name.
	 */
	const char *session_name;
	/* List of struct lttng_trigger_list_element. */
	struct cds_list_head list;
	/* Node in the session_triggers_ht */
	struct cds_lfht_node session_triggers_ht_node;
	/*
	 * Weak reference to the notification system's session triggers
	 * hashtable.
	 *
	 * The session trigger list structure is owned by the session's
	 * session_info.
	 *
	 * The session_info is kept alive by the channel_infos holding a
	 * reference to it (reference counting). When those channels are
	 * destroyed (at runtime or on teardown), the references they hold
	 * to the session_info are released. On destruction of session_info,
	 * session_info_destroy() will remove the list of triggers applying
	 * to this session from the notification system's state.
	 *
	 * This implies that the session_triggers_ht must be destroyed
	 * after the channels.
	 */
	struct cds_lfht *session_triggers_ht;
	/* Used for delayed RCU reclaim. */
	struct rcu_head rcu_node;
};
110
/*
 * Hash table wrapper around a trigger. The same element can be linked
 * through two distinct nodes: one matched on trigger equality and
 * credentials (match_trigger()), one matched on the trigger's name
 * (match_str()).
 */
struct lttng_trigger_ht_element {
	/* The wrapped trigger. */
	struct lttng_trigger *trigger;
	/* Node used by match_trigger(). */
	struct cds_lfht_node node;
	/* Node used by match_str() (lookup by trigger name). */
	struct cds_lfht_node node_by_name;
	/* call_rcu delayed reclaim. */
	struct rcu_head rcu_node;
};
118
/*
 * Element of a list of conditions (e.g. a client's `condition_list`, which
 * is walked when a client subscribes to a condition).
 */
struct lttng_condition_list_element {
	/* NOTE(review): ownership of the condition is not visible here; confirm. */
	struct lttng_condition *condition;
	/* Linkage in the containing condition list. */
	struct cds_list_head node;
};
123
/*
 * Facilities to carry the different notification types in the action
 * processing code path.
 */
struct lttng_trigger_notification {
	/*
	 * Domain-specific payload; presumably `type` selects which member
	 * is valid -- TODO confirm against the users of this structure.
	 */
	union {
		struct lttng_ust_trigger_notification *ust;
		uint64_t *kernel;
	} u;
	/* NOTE(review): looks like the trigger token/identifier; confirm. */
	uint64_t id;
	/* Tracing domain the notification originates from. */
	enum lttng_domain_type type;
};
136
/*
 * Last known state sample of a channel; entries are matched on the channel
 * key's `key` and `domain` (see match_channel_state_sample()).
 */
struct channel_state_sample {
	/* Identifies the sampled channel. */
	struct channel_key key;
	/* Node in the channel_state_ht. */
	struct cds_lfht_node channel_state_ht_node;
	/* NOTE(review): presumably buffer usage expressed in bytes; confirm. */
	uint64_t highest_usage;
	uint64_t lowest_usage;
	/* NOTE(review): presumably the channel's cumulative consumed size; confirm. */
	uint64_t channel_total_consumed;
	/* call_rcu delayed reclaim. */
	struct rcu_head rcu_node;
};
146
/* Forward declarations of helpers used before their definition. */
static unsigned long hash_channel_key(struct channel_key *key);
static int evaluate_buffer_condition(const struct lttng_condition *condition,
		struct lttng_evaluation **evaluation,
		const struct notification_thread_state *state,
		const struct channel_state_sample *previous_sample,
		const struct channel_state_sample *latest_sample,
		uint64_t previous_session_consumed_total,
		uint64_t latest_session_consumed_total,
		struct channel_info *channel_info);
static
int send_evaluation_to_clients(const struct lttng_trigger *trigger,
		const struct lttng_evaluation *evaluation,
		struct notification_client_list *client_list,
		struct notification_thread_state *state,
		uid_t channel_uid, gid_t channel_gid);


/* session_info API */
/* Release callback of the session_info reference count; do not call directly. */
static
void session_info_destroy(void *_data);
/* Acquire a reference on a session_info (NULL is accepted and ignored). */
static
void session_info_get(struct session_info *session_info);
/* Release a reference on a session_info (NULL is accepted and ignored). */
static
void session_info_put(struct session_info *session_info);
static
struct session_info *session_info_create(const char *name,
		uid_t uid, gid_t gid,
		struct lttng_session_trigger_list *trigger_list,
		struct cds_lfht *sessions_ht);
static
void session_info_add_channel(struct session_info *session_info,
		struct channel_info *channel_info);
static
void session_info_remove_channel(struct session_info *session_info,
		struct channel_info *channel_info);

/* lttng_session_trigger_list API */
static
struct lttng_session_trigger_list *lttng_session_trigger_list_create(
		const char *session_name,
		struct cds_lfht *session_triggers_ht);
static
struct lttng_session_trigger_list *lttng_session_trigger_list_build(
		const struct notification_thread_state *state,
		const char *session_name);
static
void lttng_session_trigger_list_destroy(
		struct lttng_session_trigger_list *list);
static
int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
		struct lttng_trigger *trigger);
198
199
200 static
201 int match_client_socket(struct cds_lfht_node *node, const void *key)
202 {
203 /* This double-cast is intended to supress pointer-to-cast warning. */
204 const int socket = (int) (intptr_t) key;
205 const struct notification_client *client = caa_container_of(node,
206 struct notification_client, client_socket_ht_node);
207
208 return client->socket == socket;
209 }
210
211 static
212 int match_client_id(struct cds_lfht_node *node, const void *key)
213 {
214 /* This double-cast is intended to supress pointer-to-cast warning. */
215 const notification_client_id id = *((notification_client_id *) key);
216 const struct notification_client *client = caa_container_of(
217 node, struct notification_client, client_id_ht_node);
218
219 return client->id == id;
220 }
221
222 static
223 int match_channel_trigger_list(struct cds_lfht_node *node, const void *key)
224 {
225 struct channel_key *channel_key = (struct channel_key *) key;
226 struct lttng_channel_trigger_list *trigger_list;
227
228 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
229 channel_triggers_ht_node);
230
231 return !!((channel_key->key == trigger_list->channel_key.key) &&
232 (channel_key->domain == trigger_list->channel_key.domain));
233 }
234
235 static
236 int match_session_trigger_list(struct cds_lfht_node *node, const void *key)
237 {
238 const char *session_name = (const char *) key;
239 struct lttng_session_trigger_list *trigger_list;
240
241 trigger_list = caa_container_of(node, struct lttng_session_trigger_list,
242 session_triggers_ht_node);
243
244 return !!(strcmp(trigger_list->session_name, session_name) == 0);
245 }
246
247 static
248 int match_channel_state_sample(struct cds_lfht_node *node, const void *key)
249 {
250 struct channel_key *channel_key = (struct channel_key *) key;
251 struct channel_state_sample *sample;
252
253 sample = caa_container_of(node, struct channel_state_sample,
254 channel_state_ht_node);
255
256 return !!((channel_key->key == sample->key.key) &&
257 (channel_key->domain == sample->key.domain));
258 }
259
260 static
261 int match_channel_info(struct cds_lfht_node *node, const void *key)
262 {
263 struct channel_key *channel_key = (struct channel_key *) key;
264 struct channel_info *channel_info;
265
266 channel_info = caa_container_of(node, struct channel_info,
267 channels_ht_node);
268
269 return !!((channel_key->key == channel_info->key.key) &&
270 (channel_key->domain == channel_info->key.domain));
271 }
272
273 static
274 int match_trigger(struct cds_lfht_node *node, const void *key)
275 {
276 bool match = false;
277 struct lttng_trigger *trigger_key = (struct lttng_trigger *) key;
278 struct lttng_trigger_ht_element *trigger_ht_element;
279 const struct lttng_credentials *creds_key;
280 const struct lttng_credentials *creds_node;
281
282 trigger_ht_element = caa_container_of(node, struct lttng_trigger_ht_element,
283 node);
284
285 match = lttng_trigger_is_equal(trigger_key, trigger_ht_element->trigger);
286 if (!match) {
287 goto end;
288 }
289
290 /* Validate credential */
291 /* TODO: this could be moved to lttng_trigger_equal depending on how we
292 * handle root behaviour on disable and listing.
293 */
294 creds_key = lttng_trigger_get_credentials(trigger_key);
295 creds_node = lttng_trigger_get_credentials(trigger_ht_element->trigger);
296 match = lttng_credentials_is_equal(creds_key, creds_node);
297 end:
298 return !!match;
299 }
300
301 static
302 int match_trigger_token(struct cds_lfht_node *node, const void *key)
303 {
304 const uint64_t *_key = key;
305 struct notification_trigger_tokens_ht_element *element;
306
307 element = caa_container_of(node, struct notification_trigger_tokens_ht_element,
308 node);
309 return *_key == element->token ;
310 }
311
312 static
313 int match_client_list_condition(struct cds_lfht_node *node, const void *key)
314 {
315 struct lttng_condition *condition_key = (struct lttng_condition *) key;
316 struct notification_client_list *client_list;
317 const struct lttng_condition *condition;
318
319 assert(condition_key);
320
321 client_list = caa_container_of(node, struct notification_client_list,
322 notification_trigger_clients_ht_node);
323 condition = lttng_trigger_get_const_condition(client_list->trigger);
324
325 return !!lttng_condition_is_equal(condition_key, condition);
326 }
327
328 static
329 int match_session(struct cds_lfht_node *node, const void *key)
330 {
331 const char *name = key;
332 struct session_info *session_info = caa_container_of(
333 node, struct session_info, sessions_ht_node);
334
335 return !strcmp(session_info->name, name);
336 }
337
338 /*
339 * Match function for string node.
340 */
341 static int match_str(struct cds_lfht_node *node, const void *key)
342 {
343 struct lttng_trigger_ht_element *trigger_ht_element;
344 const char *name;
345
346 trigger_ht_element = caa_container_of(node, struct lttng_trigger_ht_element,
347 node_by_name);
348
349 /* TODO error checking */
350 lttng_trigger_get_name(trigger_ht_element->trigger, &name);
351
352 return hash_match_key_str(name, (void *) key);
353 }
354
355 static
356 unsigned long lttng_condition_buffer_usage_hash(
357 const struct lttng_condition *_condition)
358 {
359 unsigned long hash;
360 unsigned long condition_type;
361 struct lttng_condition_buffer_usage *condition;
362
363 condition = container_of(_condition,
364 struct lttng_condition_buffer_usage, parent);
365
366 condition_type = (unsigned long) condition->parent.type;
367 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
368 if (condition->session_name) {
369 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
370 }
371 if (condition->channel_name) {
372 hash ^= hash_key_str(condition->channel_name, lttng_ht_seed);
373 }
374 if (condition->domain.set) {
375 hash ^= hash_key_ulong(
376 (void *) condition->domain.type,
377 lttng_ht_seed);
378 }
379 if (condition->threshold_ratio.set) {
380 uint64_t val;
381
382 val = condition->threshold_ratio.value * (double) UINT32_MAX;
383 hash ^= hash_key_u64(&val, lttng_ht_seed);
384 } else if (condition->threshold_bytes.set) {
385 uint64_t val;
386
387 val = condition->threshold_bytes.value;
388 hash ^= hash_key_u64(&val, lttng_ht_seed);
389 }
390 return hash;
391 }
392
393 static
394 unsigned long lttng_condition_session_consumed_size_hash(
395 const struct lttng_condition *_condition)
396 {
397 unsigned long hash;
398 unsigned long condition_type =
399 (unsigned long) LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE;
400 struct lttng_condition_session_consumed_size *condition;
401 uint64_t val;
402
403 condition = container_of(_condition,
404 struct lttng_condition_session_consumed_size, parent);
405
406 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
407 if (condition->session_name) {
408 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
409 }
410 val = condition->consumed_threshold_bytes.value;
411 hash ^= hash_key_u64(&val, lttng_ht_seed);
412 return hash;
413 }
414
415 static
416 unsigned long lttng_condition_session_rotation_hash(
417 const struct lttng_condition *_condition)
418 {
419 unsigned long hash, condition_type;
420 struct lttng_condition_session_rotation *condition;
421
422 condition = container_of(_condition,
423 struct lttng_condition_session_rotation, parent);
424 condition_type = (unsigned long) condition->parent.type;
425 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
426 assert(condition->session_name);
427 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
428 return hash;
429 }
430
431 static
432 unsigned long lttng_condition_event_rule_hash(
433 const struct lttng_condition *_condition)
434 {
435 unsigned long hash, condition_type;
436 struct lttng_condition_event_rule *condition;
437
438 condition = container_of(_condition,
439 struct lttng_condition_event_rule, parent);
440 condition_type = (unsigned long) condition->parent.type;
441 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
442
443 /* TODO: further hasg using the event rule? on pattern maybe?*/
444 return hash;
445 }
446
447 /*
448 * The lttng_condition hashing code is kept in this file (rather than
449 * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
450 * don't want to link in liblttng-ctl.
451 */
452 static
453 unsigned long lttng_condition_hash(const struct lttng_condition *condition)
454 {
455 switch (condition->type) {
456 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
457 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
458 return lttng_condition_buffer_usage_hash(condition);
459 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
460 return lttng_condition_session_consumed_size_hash(condition);
461 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
462 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
463 return lttng_condition_session_rotation_hash(condition);
464 case LTTNG_CONDITION_TYPE_EVENT_RULE_HIT:
465 return lttng_condition_event_rule_hash(condition);
466 default:
467 ERR("[notification-thread] Unexpected condition type caught");
468 abort();
469 }
470 }
471
472 static
473 unsigned long hash_channel_key(struct channel_key *key)
474 {
475 unsigned long key_hash = hash_key_u64(&key->key, lttng_ht_seed);
476 unsigned long domain_hash = hash_key_ulong(
477 (void *) (unsigned long) key->domain, lttng_ht_seed);
478
479 return key_hash ^ domain_hash;
480 }
481
482 static
483 unsigned long hash_client_socket(int socket)
484 {
485 return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed);
486 }
487
488 static
489 unsigned long hash_client_id(notification_client_id id)
490 {
491 return hash_key_u64(&id, lttng_ht_seed);
492 }
493
494 /*
495 * Get the type of object to which a given condition applies. Bindings let
496 * the notification system evaluate a trigger's condition when a given
497 * object's state is updated.
498 *
499 * For instance, a condition bound to a channel will be evaluated everytime
500 * the channel's state is changed by a channel monitoring sample.
501 */
502 static
503 enum lttng_object_type get_condition_binding_object(
504 const struct lttng_condition *condition)
505 {
506 switch (lttng_condition_get_type(condition)) {
507 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
508 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
509 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
510 return LTTNG_OBJECT_TYPE_CHANNEL;
511 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
512 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
513 return LTTNG_OBJECT_TYPE_SESSION;
514 case LTTNG_CONDITION_TYPE_EVENT_RULE_HIT:
515 return LTTNG_OBJECT_TYPE_NONE;
516 default:
517 return LTTNG_OBJECT_TYPE_UNKNOWN;
518 }
519 }
520
521 static
522 void free_channel_info_rcu(struct rcu_head *node)
523 {
524 free(caa_container_of(node, struct channel_info, rcu_node));
525 }
526
527 static
528 void channel_info_destroy(struct channel_info *channel_info)
529 {
530 if (!channel_info) {
531 return;
532 }
533
534 if (channel_info->session_info) {
535 session_info_remove_channel(channel_info->session_info,
536 channel_info);
537 session_info_put(channel_info->session_info);
538 }
539 if (channel_info->name) {
540 free(channel_info->name);
541 }
542 call_rcu(&channel_info->rcu_node, free_channel_info_rcu);
543 }
544
545 static
546 void free_session_info_rcu(struct rcu_head *node)
547 {
548 free(caa_container_of(node, struct session_info, rcu_node));
549 }
550
551 /* Don't call directly, use the ref-counting mechanism. */
552 static
553 void session_info_destroy(void *_data)
554 {
555 struct session_info *session_info = _data;
556 int ret;
557
558 assert(session_info);
559 if (session_info->channel_infos_ht) {
560 ret = cds_lfht_destroy(session_info->channel_infos_ht, NULL);
561 if (ret) {
562 ERR("[notification-thread] Failed to destroy channel information hash table");
563 }
564 }
565 lttng_session_trigger_list_destroy(session_info->trigger_list);
566
567 rcu_read_lock();
568 cds_lfht_del(session_info->sessions_ht,
569 &session_info->sessions_ht_node);
570 rcu_read_unlock();
571 free(session_info->name);
572 call_rcu(&session_info->rcu_node, free_session_info_rcu);
573 }
574
575 static
576 void session_info_get(struct session_info *session_info)
577 {
578 if (!session_info) {
579 return;
580 }
581 lttng_ref_get(&session_info->ref);
582 }
583
584 static
585 void session_info_put(struct session_info *session_info)
586 {
587 if (!session_info) {
588 return;
589 }
590 lttng_ref_put(&session_info->ref);
591 }
592
593 static
594 struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid,
595 struct lttng_session_trigger_list *trigger_list,
596 struct cds_lfht *sessions_ht)
597 {
598 struct session_info *session_info;
599
600 assert(name);
601
602 session_info = zmalloc(sizeof(*session_info));
603 if (!session_info) {
604 goto end;
605 }
606 lttng_ref_init(&session_info->ref, session_info_destroy);
607
608 session_info->channel_infos_ht = cds_lfht_new(DEFAULT_HT_SIZE,
609 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
610 if (!session_info->channel_infos_ht) {
611 goto error;
612 }
613
614 cds_lfht_node_init(&session_info->sessions_ht_node);
615 session_info->name = strdup(name);
616 if (!session_info->name) {
617 goto error;
618 }
619 session_info->uid = uid;
620 session_info->gid = gid;
621 session_info->trigger_list = trigger_list;
622 session_info->sessions_ht = sessions_ht;
623 end:
624 return session_info;
625 error:
626 session_info_put(session_info);
627 return NULL;
628 }
629
630 static
631 void session_info_add_channel(struct session_info *session_info,
632 struct channel_info *channel_info)
633 {
634 rcu_read_lock();
635 cds_lfht_add(session_info->channel_infos_ht,
636 hash_channel_key(&channel_info->key),
637 &channel_info->session_info_channels_ht_node);
638 rcu_read_unlock();
639 }
640
641 static
642 void session_info_remove_channel(struct session_info *session_info,
643 struct channel_info *channel_info)
644 {
645 rcu_read_lock();
646 cds_lfht_del(session_info->channel_infos_ht,
647 &channel_info->session_info_channels_ht_node);
648 rcu_read_unlock();
649 }
650
651 static
652 struct channel_info *channel_info_create(const char *channel_name,
653 struct channel_key *channel_key, uint64_t channel_capacity,
654 struct session_info *session_info)
655 {
656 struct channel_info *channel_info = zmalloc(sizeof(*channel_info));
657
658 if (!channel_info) {
659 goto end;
660 }
661
662 cds_lfht_node_init(&channel_info->channels_ht_node);
663 cds_lfht_node_init(&channel_info->session_info_channels_ht_node);
664 memcpy(&channel_info->key, channel_key, sizeof(*channel_key));
665 channel_info->capacity = channel_capacity;
666
667 channel_info->name = strdup(channel_name);
668 if (!channel_info->name) {
669 goto error;
670 }
671
672 /*
673 * Set the references between session and channel infos:
674 * - channel_info holds a strong reference to session_info
675 * - session_info holds a weak reference to channel_info
676 */
677 session_info_get(session_info);
678 session_info_add_channel(session_info, channel_info);
679 channel_info->session_info = session_info;
680 end:
681 return channel_info;
682 error:
683 channel_info_destroy(channel_info);
684 return NULL;
685 }
686
687 LTTNG_HIDDEN
688 bool notification_client_list_get(struct notification_client_list *list)
689 {
690 return urcu_ref_get_unless_zero(&list->ref);
691 }
692
693 static
694 void free_notification_client_list_rcu(struct rcu_head *node)
695 {
696 free(caa_container_of(node, struct notification_client_list,
697 rcu_node));
698 }
699
700 static
701 void notification_client_list_release(struct urcu_ref *list_ref)
702 {
703 struct notification_client_list *list =
704 container_of(list_ref, typeof(*list), ref);
705 struct notification_client_list_element *client_list_element, *tmp;
706
707 if (list->notification_trigger_clients_ht) {
708 rcu_read_lock();
709 cds_lfht_del(list->notification_trigger_clients_ht,
710 &list->notification_trigger_clients_ht_node);
711 rcu_read_unlock();
712 list->notification_trigger_clients_ht = NULL;
713 }
714 cds_list_for_each_entry_safe(client_list_element, tmp,
715 &list->list, node) {
716 free(client_list_element);
717 }
718 pthread_mutex_destroy(&list->lock);
719 call_rcu(&list->rcu_node, free_notification_client_list_rcu);
720 }
721
722 static
723 struct notification_client_list *notification_client_list_create(
724 const struct lttng_trigger *trigger)
725 {
726 struct notification_client_list *client_list =
727 zmalloc(sizeof(*client_list));
728
729 if (!client_list) {
730 goto error;
731 }
732 pthread_mutex_init(&client_list->lock, NULL);
733 urcu_ref_init(&client_list->ref);
734 cds_lfht_node_init(&client_list->notification_trigger_clients_ht_node);
735 CDS_INIT_LIST_HEAD(&client_list->list);
736 client_list->trigger = trigger;
737 error:
738 return client_list;
739 }
740
741 static
742 void publish_notification_client_list(
743 struct notification_thread_state *state,
744 struct notification_client_list *list)
745 {
746 const struct lttng_condition *condition =
747 lttng_trigger_get_const_condition(list->trigger);
748
749 assert(!list->notification_trigger_clients_ht);
750
751 list->notification_trigger_clients_ht =
752 state->notification_trigger_clients_ht;
753
754 rcu_read_lock();
755 cds_lfht_add(state->notification_trigger_clients_ht,
756 lttng_condition_hash(condition),
757 &list->notification_trigger_clients_ht_node);
758 rcu_read_unlock();
759 }
760
761 LTTNG_HIDDEN
762 void notification_client_list_put(struct notification_client_list *list)
763 {
764 if (!list) {
765 return;
766 }
767 return urcu_ref_put(&list->ref, notification_client_list_release);
768 }
769
770 /* Provides a reference to the returned list. */
771 static
772 struct notification_client_list *get_client_list_from_condition(
773 struct notification_thread_state *state,
774 const struct lttng_condition *condition)
775 {
776 struct cds_lfht_node *node;
777 struct cds_lfht_iter iter;
778 struct notification_client_list *list = NULL;
779
780 rcu_read_lock();
781 cds_lfht_lookup(state->notification_trigger_clients_ht,
782 lttng_condition_hash(condition),
783 match_client_list_condition,
784 condition,
785 &iter);
786 node = cds_lfht_iter_get_node(&iter);
787 if (node) {
788 list = container_of(node, struct notification_client_list,
789 notification_trigger_clients_ht_node);
790 list = notification_client_list_get(list) ? list : NULL;
791 }
792 rcu_read_unlock();
793 return list;
794 }
795
796 static
797 int evaluate_channel_condition_for_client(
798 const struct lttng_condition *condition,
799 struct notification_thread_state *state,
800 struct lttng_evaluation **evaluation,
801 uid_t *session_uid, gid_t *session_gid)
802 {
803 int ret;
804 struct cds_lfht_iter iter;
805 struct cds_lfht_node *node;
806 struct channel_info *channel_info = NULL;
807 struct channel_key *channel_key = NULL;
808 struct channel_state_sample *last_sample = NULL;
809 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
810
811 rcu_read_lock();
812
813 /* Find the channel associated with the condition. */
814 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter,
815 channel_trigger_list, channel_triggers_ht_node) {
816 struct lttng_trigger_list_element *element;
817
818 cds_list_for_each_entry(element, &channel_trigger_list->list, node) {
819 const struct lttng_condition *current_condition =
820 lttng_trigger_get_const_condition(
821 element->trigger);
822
823 assert(current_condition);
824 if (!lttng_condition_is_equal(condition,
825 current_condition)) {
826 continue;
827 }
828
829 /* Found the trigger, save the channel key. */
830 channel_key = &channel_trigger_list->channel_key;
831 break;
832 }
833 if (channel_key) {
834 /* The channel key was found stop iteration. */
835 break;
836 }
837 }
838
839 if (!channel_key){
840 /* No channel found; normal exit. */
841 DBG("[notification-thread] No known channel associated with newly subscribed-to condition");
842 ret = 0;
843 goto end;
844 }
845
846 /* Fetch channel info for the matching channel. */
847 cds_lfht_lookup(state->channels_ht,
848 hash_channel_key(channel_key),
849 match_channel_info,
850 channel_key,
851 &iter);
852 node = cds_lfht_iter_get_node(&iter);
853 assert(node);
854 channel_info = caa_container_of(node, struct channel_info,
855 channels_ht_node);
856
857 /* Retrieve the channel's last sample, if it exists. */
858 cds_lfht_lookup(state->channel_state_ht,
859 hash_channel_key(channel_key),
860 match_channel_state_sample,
861 channel_key,
862 &iter);
863 node = cds_lfht_iter_get_node(&iter);
864 if (node) {
865 last_sample = caa_container_of(node,
866 struct channel_state_sample,
867 channel_state_ht_node);
868 } else {
869 /* Nothing to evaluate, no sample was ever taken. Normal exit */
870 DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
871 ret = 0;
872 goto end;
873 }
874
875 ret = evaluate_buffer_condition(condition, evaluation, state,
876 NULL, last_sample,
877 0, channel_info->session_info->consumed_data_size,
878 channel_info);
879 if (ret) {
880 WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
881 goto end;
882 }
883
884 *session_uid = channel_info->session_info->uid;
885 *session_gid = channel_info->session_info->gid;
886 end:
887 rcu_read_unlock();
888 return ret;
889 }
890
891 static
892 const char *get_condition_session_name(const struct lttng_condition *condition)
893 {
894 const char *session_name = NULL;
895 enum lttng_condition_status status;
896
897 switch (lttng_condition_get_type(condition)) {
898 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
899 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
900 status = lttng_condition_buffer_usage_get_session_name(
901 condition, &session_name);
902 break;
903 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
904 status = lttng_condition_session_consumed_size_get_session_name(
905 condition, &session_name);
906 break;
907 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
908 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
909 status = lttng_condition_session_rotation_get_session_name(
910 condition, &session_name);
911 break;
912 default:
913 abort();
914 }
915 if (status != LTTNG_CONDITION_STATUS_OK) {
916 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
917 goto end;
918 }
919 end:
920 return session_name;
921 }
922
923 static
924 int evaluate_session_condition_for_client(
925 const struct lttng_condition *condition,
926 struct notification_thread_state *state,
927 struct lttng_evaluation **evaluation,
928 uid_t *session_uid, gid_t *session_gid)
929 {
930 int ret;
931 struct cds_lfht_iter iter;
932 struct cds_lfht_node *node;
933 const char *session_name;
934 struct session_info *session_info = NULL;
935
936 rcu_read_lock();
937 session_name = get_condition_session_name(condition);
938
939 /* Find the session associated with the trigger. */
940 cds_lfht_lookup(state->sessions_ht,
941 hash_key_str(session_name, lttng_ht_seed),
942 match_session,
943 session_name,
944 &iter);
945 node = cds_lfht_iter_get_node(&iter);
946 if (!node) {
947 DBG("[notification-thread] No known session matching name \"%s\"",
948 session_name);
949 ret = 0;
950 goto end;
951 }
952
953 session_info = caa_container_of(node, struct session_info,
954 sessions_ht_node);
955 session_info_get(session_info);
956
957 /*
958 * Evaluation is performed in-line here since only one type of
959 * session-bound condition is handled for the moment.
960 */
961 switch (lttng_condition_get_type(condition)) {
962 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
963 if (!session_info->rotation.ongoing) {
964 ret = 0;
965 goto end_session_put;
966 }
967
968 *evaluation = lttng_evaluation_session_rotation_ongoing_create(
969 session_info->rotation.id);
970 if (!*evaluation) {
971 /* Fatal error. */
972 ERR("[notification-thread] Failed to create session rotation ongoing evaluation for session \"%s\"",
973 session_info->name);
974 ret = -1;
975 goto end_session_put;
976 }
977 ret = 0;
978 break;
979 default:
980 ret = 0;
981 goto end_session_put;
982 }
983
984 *session_uid = session_info->uid;
985 *session_gid = session_info->gid;
986
987 end_session_put:
988 session_info_put(session_info);
989 end:
990 rcu_read_unlock();
991 return ret;
992 }
993
994 static
995 int evaluate_condition_for_client(const struct lttng_trigger *trigger,
996 const struct lttng_condition *condition,
997 struct notification_client *client,
998 struct notification_thread_state *state)
999 {
1000 int ret;
1001 struct lttng_evaluation *evaluation = NULL;
1002 struct notification_client_list client_list = {
1003 .lock = PTHREAD_MUTEX_INITIALIZER,
1004 };
1005 struct notification_client_list_element client_list_element = { 0 };
1006 uid_t object_uid = 0;
1007 gid_t object_gid = 0;
1008
1009 assert(trigger);
1010 assert(condition);
1011 assert(client);
1012 assert(state);
1013
1014 switch (get_condition_binding_object(condition)) {
1015 case LTTNG_OBJECT_TYPE_SESSION:
1016 ret = evaluate_session_condition_for_client(condition, state,
1017 &evaluation, &object_uid, &object_gid);
1018 break;
1019 case LTTNG_OBJECT_TYPE_CHANNEL:
1020 ret = evaluate_channel_condition_for_client(condition, state,
1021 &evaluation, &object_uid, &object_gid);
1022 break;
1023 case LTTNG_OBJECT_TYPE_NONE:
1024 DBG("[notification-thread] Newly subscribed-to condition not binded to object, nothing to evaluate");
1025 ret = 0;
1026 goto end;
1027 case LTTNG_OBJECT_TYPE_UNKNOWN:
1028 default:
1029 ret = -1;
1030 goto end;
1031 }
1032 if (ret) {
1033 /* Fatal error. */
1034 goto end;
1035 }
1036 if (!evaluation) {
1037 /* Evaluation yielded nothing. Normal exit. */
1038 DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
1039 ret = 0;
1040 goto end;
1041 }
1042
1043 /*
1044 * Create a temporary client list with the client currently
1045 * subscribing.
1046 */
1047 cds_lfht_node_init(&client_list.notification_trigger_clients_ht_node);
1048 CDS_INIT_LIST_HEAD(&client_list.list);
1049 client_list.trigger = trigger;
1050
1051 CDS_INIT_LIST_HEAD(&client_list_element.node);
1052 client_list_element.client = client;
1053 cds_list_add(&client_list_element.node, &client_list.list);
1054
1055 /* Send evaluation result to the newly-subscribed client. */
1056 DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
1057 ret = send_evaluation_to_clients(trigger, evaluation, &client_list,
1058 state, object_uid, object_gid);
1059
1060 end:
1061 return ret;
1062 }
1063
1064 static
1065 int notification_thread_client_subscribe(struct notification_client *client,
1066 struct lttng_condition *condition,
1067 struct notification_thread_state *state,
1068 enum lttng_notification_channel_status *_status)
1069 {
1070 int ret = 0;
1071 struct notification_client_list *client_list = NULL;
1072 struct lttng_condition_list_element *condition_list_element = NULL;
1073 struct notification_client_list_element *client_list_element = NULL;
1074 enum lttng_notification_channel_status status =
1075 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1076
1077 /*
1078 * Ensure that the client has not already subscribed to this condition
1079 * before.
1080 */
1081 cds_list_for_each_entry(condition_list_element, &client->condition_list, node) {
1082 if (lttng_condition_is_equal(condition_list_element->condition,
1083 condition)) {
1084 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED;
1085 goto end;
1086 }
1087 }
1088
1089 condition_list_element = zmalloc(sizeof(*condition_list_element));
1090 if (!condition_list_element) {
1091 ret = -1;
1092 goto error;
1093 }
1094 client_list_element = zmalloc(sizeof(*client_list_element));
1095 if (!client_list_element) {
1096 ret = -1;
1097 goto error;
1098 }
1099
1100 /*
1101 * Add the newly-subscribed condition to the client's subscription list.
1102 */
1103 CDS_INIT_LIST_HEAD(&condition_list_element->node);
1104 condition_list_element->condition = condition;
1105 cds_list_add(&condition_list_element->node, &client->condition_list);
1106
1107 client_list = get_client_list_from_condition(state, condition);
1108 if (!client_list) {
1109 /*
1110 * No notification-emiting trigger registered with this
1111 * condition. We don't evaluate the condition right away
1112 * since this trigger is not registered yet.
1113 */
1114 free(client_list_element);
1115 goto end;
1116 }
1117
1118 /*
1119 * The condition to which the client just subscribed is evaluated
1120 * at this point so that conditions that are already TRUE result
1121 * in a notification being sent out.
1122 *
1123 * The client_list's trigger is used without locking the list itself.
1124 * This is correct since the list doesn't own the trigger and the
1125 * object is immutable.
1126 */
1127 if (evaluate_condition_for_client(client_list->trigger, condition,
1128 client, state)) {
1129 WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
1130 ret = -1;
1131 free(client_list_element);
1132 goto end;
1133 }
1134
1135 /*
1136 * Add the client to the list of clients interested in a given trigger
1137 * if a "notification" trigger with a corresponding condition was
1138 * added prior.
1139 */
1140 client_list_element->client = client;
1141 CDS_INIT_LIST_HEAD(&client_list_element->node);
1142
1143 pthread_mutex_lock(&client_list->lock);
1144 cds_list_add(&client_list_element->node, &client_list->list);
1145 pthread_mutex_unlock(&client_list->lock);
1146 end:
1147 if (_status) {
1148 *_status = status;
1149 }
1150 if (client_list) {
1151 notification_client_list_put(client_list);
1152 }
1153 return ret;
1154 error:
1155 free(condition_list_element);
1156 free(client_list_element);
1157 return ret;
1158 }
1159
1160 static
1161 int notification_thread_client_unsubscribe(
1162 struct notification_client *client,
1163 struct lttng_condition *condition,
1164 struct notification_thread_state *state,
1165 enum lttng_notification_channel_status *_status)
1166 {
1167 struct notification_client_list *client_list;
1168 struct lttng_condition_list_element *condition_list_element,
1169 *condition_tmp;
1170 struct notification_client_list_element *client_list_element,
1171 *client_tmp;
1172 bool condition_found = false;
1173 enum lttng_notification_channel_status status =
1174 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1175
1176 /* Remove the condition from the client's condition list. */
1177 cds_list_for_each_entry_safe(condition_list_element, condition_tmp,
1178 &client->condition_list, node) {
1179 if (!lttng_condition_is_equal(condition_list_element->condition,
1180 condition)) {
1181 continue;
1182 }
1183
1184 cds_list_del(&condition_list_element->node);
1185 /*
1186 * The caller may be iterating on the client's conditions to
1187 * tear down a client's connection. In this case, the condition
1188 * will be destroyed at the end.
1189 */
1190 if (condition != condition_list_element->condition) {
1191 lttng_condition_destroy(
1192 condition_list_element->condition);
1193 }
1194 free(condition_list_element);
1195 condition_found = true;
1196 break;
1197 }
1198
1199 if (!condition_found) {
1200 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION;
1201 goto end;
1202 }
1203
1204 /*
1205 * Remove the client from the list of clients interested the trigger
1206 * matching the condition.
1207 */
1208 client_list = get_client_list_from_condition(state, condition);
1209 if (!client_list) {
1210 goto end;
1211 }
1212
1213 pthread_mutex_lock(&client_list->lock);
1214 cds_list_for_each_entry_safe(client_list_element, client_tmp,
1215 &client_list->list, node) {
1216 if (client_list_element->client->id != client->id) {
1217 continue;
1218 }
1219 cds_list_del(&client_list_element->node);
1220 free(client_list_element);
1221 break;
1222 }
1223 pthread_mutex_unlock(&client_list->lock);
1224 notification_client_list_put(client_list);
1225 client_list = NULL;
1226 end:
1227 lttng_condition_destroy(condition);
1228 if (_status) {
1229 *_status = status;
1230 }
1231 return 0;
1232 }
1233
1234 static
1235 void free_notification_client_rcu(struct rcu_head *node)
1236 {
1237 free(caa_container_of(node, struct notification_client, rcu_node));
1238 }
1239
1240 static
1241 void notification_client_destroy(struct notification_client *client,
1242 struct notification_thread_state *state)
1243 {
1244 if (!client) {
1245 return;
1246 }
1247
1248 /*
1249 * The client object is not reachable by other threads, no need to lock
1250 * the client here.
1251 */
1252 if (client->socket >= 0) {
1253 (void) lttcomm_close_unix_sock(client->socket);
1254 client->socket = -1;
1255 }
1256 client->communication.active = false;
1257 lttng_dynamic_buffer_reset(&client->communication.inbound.buffer);
1258 lttng_dynamic_buffer_reset(&client->communication.outbound.buffer);
1259 pthread_mutex_destroy(&client->lock);
1260 call_rcu(&client->rcu_node, free_notification_client_rcu);
1261 }
1262
1263 /*
1264 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1265 * client pointer).
1266 */
1267 static
1268 struct notification_client *get_client_from_socket(int socket,
1269 struct notification_thread_state *state)
1270 {
1271 struct cds_lfht_iter iter;
1272 struct cds_lfht_node *node;
1273 struct notification_client *client = NULL;
1274
1275 cds_lfht_lookup(state->client_socket_ht,
1276 hash_client_socket(socket),
1277 match_client_socket,
1278 (void *) (unsigned long) socket,
1279 &iter);
1280 node = cds_lfht_iter_get_node(&iter);
1281 if (!node) {
1282 goto end;
1283 }
1284
1285 client = caa_container_of(node, struct notification_client,
1286 client_socket_ht_node);
1287 end:
1288 return client;
1289 }
1290
1291 /*
1292 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1293 * client pointer).
1294 */
1295 static
1296 struct notification_client *get_client_from_id(notification_client_id id,
1297 struct notification_thread_state *state)
1298 {
1299 struct cds_lfht_iter iter;
1300 struct cds_lfht_node *node;
1301 struct notification_client *client = NULL;
1302
1303 cds_lfht_lookup(state->client_id_ht,
1304 hash_client_id(id),
1305 match_client_id,
1306 &id,
1307 &iter);
1308 node = cds_lfht_iter_get_node(&iter);
1309 if (!node) {
1310 goto end;
1311 }
1312
1313 client = caa_container_of(node, struct notification_client,
1314 client_id_ht_node);
1315 end:
1316 return client;
1317 }
1318
1319 static
1320 bool buffer_usage_condition_applies_to_channel(
1321 const struct lttng_condition *condition,
1322 const struct channel_info *channel_info)
1323 {
1324 enum lttng_condition_status status;
1325 enum lttng_domain_type condition_domain;
1326 const char *condition_session_name = NULL;
1327 const char *condition_channel_name = NULL;
1328
1329 status = lttng_condition_buffer_usage_get_domain_type(condition,
1330 &condition_domain);
1331 assert(status == LTTNG_CONDITION_STATUS_OK);
1332 if (channel_info->key.domain != condition_domain) {
1333 goto fail;
1334 }
1335
1336 status = lttng_condition_buffer_usage_get_session_name(
1337 condition, &condition_session_name);
1338 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1339
1340 status = lttng_condition_buffer_usage_get_channel_name(
1341 condition, &condition_channel_name);
1342 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_channel_name);
1343
1344 if (strcmp(channel_info->session_info->name, condition_session_name)) {
1345 goto fail;
1346 }
1347 if (strcmp(channel_info->name, condition_channel_name)) {
1348 goto fail;
1349 }
1350
1351 return true;
1352 fail:
1353 return false;
1354 }
1355
1356 static
1357 bool session_consumed_size_condition_applies_to_channel(
1358 const struct lttng_condition *condition,
1359 const struct channel_info *channel_info)
1360 {
1361 enum lttng_condition_status status;
1362 const char *condition_session_name = NULL;
1363
1364 status = lttng_condition_session_consumed_size_get_session_name(
1365 condition, &condition_session_name);
1366 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1367
1368 if (strcmp(channel_info->session_info->name, condition_session_name)) {
1369 goto fail;
1370 }
1371
1372 return true;
1373 fail:
1374 return false;
1375 }
1376
1377 static
1378 bool trigger_applies_to_channel(const struct lttng_trigger *trigger,
1379 const struct channel_info *channel_info)
1380 {
1381 const struct lttng_condition *condition;
1382 bool trigger_applies;
1383
1384 condition = lttng_trigger_get_const_condition(trigger);
1385 if (!condition) {
1386 goto fail;
1387 }
1388
1389 switch (lttng_condition_get_type(condition)) {
1390 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1391 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1392 trigger_applies = buffer_usage_condition_applies_to_channel(
1393 condition, channel_info);
1394 break;
1395 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
1396 trigger_applies = session_consumed_size_condition_applies_to_channel(
1397 condition, channel_info);
1398 break;
1399 default:
1400 goto fail;
1401 }
1402
1403 return trigger_applies;
1404 fail:
1405 return false;
1406 }
1407
1408 static
1409 bool trigger_applies_to_client(struct lttng_trigger *trigger,
1410 struct notification_client *client)
1411 {
1412 bool applies = false;
1413 struct lttng_condition_list_element *condition_list_element;
1414
1415 cds_list_for_each_entry(condition_list_element, &client->condition_list,
1416 node) {
1417 applies = lttng_condition_is_equal(
1418 condition_list_element->condition,
1419 lttng_trigger_get_condition(trigger));
1420 if (applies) {
1421 break;
1422 }
1423 }
1424 return applies;
1425 }
1426
1427 /* Must be called with RCU read lock held. */
1428 static
1429 struct lttng_session_trigger_list *get_session_trigger_list(
1430 struct notification_thread_state *state,
1431 const char *session_name)
1432 {
1433 struct lttng_session_trigger_list *list = NULL;
1434 struct cds_lfht_node *node;
1435 struct cds_lfht_iter iter;
1436
1437 cds_lfht_lookup(state->session_triggers_ht,
1438 hash_key_str(session_name, lttng_ht_seed),
1439 match_session_trigger_list,
1440 session_name,
1441 &iter);
1442 node = cds_lfht_iter_get_node(&iter);
1443 if (!node) {
1444 /*
1445 * Not an error, the list of triggers applying to that session
1446 * will be initialized when the session is created.
1447 */
1448 DBG("[notification-thread] No trigger list found for session \"%s\" as it is not yet known to the notification system",
1449 session_name);
1450 goto end;
1451 }
1452
1453 list = caa_container_of(node,
1454 struct lttng_session_trigger_list,
1455 session_triggers_ht_node);
1456 end:
1457 return list;
1458 }
1459
1460 /*
1461 * Allocate an empty lttng_session_trigger_list for the session named
1462 * 'session_name'.
1463 *
1464 * No ownership of 'session_name' is assumed by the session trigger list.
1465 * It is the caller's responsability to ensure the session name is alive
1466 * for as long as this list is.
1467 */
1468 static
1469 struct lttng_session_trigger_list *lttng_session_trigger_list_create(
1470 const char *session_name,
1471 struct cds_lfht *session_triggers_ht)
1472 {
1473 struct lttng_session_trigger_list *list;
1474
1475 list = zmalloc(sizeof(*list));
1476 if (!list) {
1477 goto end;
1478 }
1479 list->session_name = session_name;
1480 CDS_INIT_LIST_HEAD(&list->list);
1481 cds_lfht_node_init(&list->session_triggers_ht_node);
1482 list->session_triggers_ht = session_triggers_ht;
1483
1484 rcu_read_lock();
1485 /* Publish the list through the session_triggers_ht. */
1486 cds_lfht_add(session_triggers_ht,
1487 hash_key_str(session_name, lttng_ht_seed),
1488 &list->session_triggers_ht_node);
1489 rcu_read_unlock();
1490 end:
1491 return list;
1492 }
1493
1494 static
1495 void free_session_trigger_list_rcu(struct rcu_head *node)
1496 {
1497 free(caa_container_of(node, struct lttng_session_trigger_list,
1498 rcu_node));
1499 }
1500
1501 static
1502 void lttng_session_trigger_list_destroy(struct lttng_session_trigger_list *list)
1503 {
1504 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1505
1506 /* Empty the list element by element, and then free the list itself. */
1507 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1508 &list->list, node) {
1509 cds_list_del(&trigger_list_element->node);
1510 free(trigger_list_element);
1511 }
1512 rcu_read_lock();
1513 /* Unpublish the list from the session_triggers_ht. */
1514 cds_lfht_del(list->session_triggers_ht,
1515 &list->session_triggers_ht_node);
1516 rcu_read_unlock();
1517 call_rcu(&list->rcu_node, free_session_trigger_list_rcu);
1518 }
1519
1520 static
1521 int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
1522 struct lttng_trigger *trigger)
1523 {
1524 int ret = 0;
1525 struct lttng_trigger_list_element *new_element =
1526 zmalloc(sizeof(*new_element));
1527
1528 if (!new_element) {
1529 ret = -1;
1530 goto end;
1531 }
1532 CDS_INIT_LIST_HEAD(&new_element->node);
1533 new_element->trigger = trigger;
1534 cds_list_add(&new_element->node, &list->list);
1535 end:
1536 return ret;
1537 }
1538
1539 static
1540 bool trigger_applies_to_session(const struct lttng_trigger *trigger,
1541 const char *session_name)
1542 {
1543 bool applies = false;
1544 const struct lttng_condition *condition;
1545
1546 condition = lttng_trigger_get_const_condition(trigger);
1547 switch (lttng_condition_get_type(condition)) {
1548 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1549 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
1550 {
1551 enum lttng_condition_status condition_status;
1552 const char *condition_session_name;
1553
1554 condition_status = lttng_condition_session_rotation_get_session_name(
1555 condition, &condition_session_name);
1556 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
1557 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
1558 goto end;
1559 }
1560
1561 assert(condition_session_name);
1562 applies = !strcmp(condition_session_name, session_name);
1563 break;
1564 }
1565 default:
1566 goto end;
1567 }
1568 end:
1569 return applies;
1570 }
1571
1572 /*
1573 * Allocate and initialize an lttng_session_trigger_list which contains
1574 * all triggers that apply to the session named 'session_name'.
1575 *
1576 * No ownership of 'session_name' is assumed by the session trigger list.
1577 * It is the caller's responsability to ensure the session name is alive
1578 * for as long as this list is.
1579 */
1580 static
1581 struct lttng_session_trigger_list *lttng_session_trigger_list_build(
1582 const struct notification_thread_state *state,
1583 const char *session_name)
1584 {
1585 int trigger_count = 0;
1586 struct lttng_session_trigger_list *session_trigger_list = NULL;
1587 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1588 struct cds_lfht_iter iter;
1589
1590 session_trigger_list = lttng_session_trigger_list_create(session_name,
1591 state->session_triggers_ht);
1592
1593 /* Add all triggers applying to the session named 'session_name'. */
1594 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1595 node) {
1596 int ret;
1597
1598 if (!trigger_applies_to_session(trigger_ht_element->trigger,
1599 session_name)) {
1600 continue;
1601 }
1602
1603 ret = lttng_session_trigger_list_add(session_trigger_list,
1604 trigger_ht_element->trigger);
1605 if (ret) {
1606 goto error;
1607 }
1608
1609 trigger_count++;
1610 }
1611
1612 DBG("[notification-thread] Found %i triggers that apply to newly created session",
1613 trigger_count);
1614 return session_trigger_list;
1615 error:
1616 lttng_session_trigger_list_destroy(session_trigger_list);
1617 return NULL;
1618 }
1619
1620 static
1621 struct session_info *find_or_create_session_info(
1622 struct notification_thread_state *state,
1623 const char *name, uid_t uid, gid_t gid)
1624 {
1625 struct session_info *session = NULL;
1626 struct cds_lfht_node *node;
1627 struct cds_lfht_iter iter;
1628 struct lttng_session_trigger_list *trigger_list;
1629
1630 rcu_read_lock();
1631 cds_lfht_lookup(state->sessions_ht,
1632 hash_key_str(name, lttng_ht_seed),
1633 match_session,
1634 name,
1635 &iter);
1636 node = cds_lfht_iter_get_node(&iter);
1637 if (node) {
1638 DBG("[notification-thread] Found session info of session \"%s\" (uid = %i, gid = %i)",
1639 name, uid, gid);
1640 session = caa_container_of(node, struct session_info,
1641 sessions_ht_node);
1642 assert(session->uid == uid);
1643 assert(session->gid == gid);
1644 session_info_get(session);
1645 goto end;
1646 }
1647
1648 trigger_list = lttng_session_trigger_list_build(state, name);
1649 if (!trigger_list) {
1650 goto error;
1651 }
1652
1653 session = session_info_create(name, uid, gid, trigger_list,
1654 state->sessions_ht);
1655 if (!session) {
1656 ERR("[notification-thread] Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)",
1657 name, uid, gid);
1658 lttng_session_trigger_list_destroy(trigger_list);
1659 goto error;
1660 }
1661 trigger_list = NULL;
1662
1663 cds_lfht_add(state->sessions_ht, hash_key_str(name, lttng_ht_seed),
1664 &session->sessions_ht_node);
1665 end:
1666 rcu_read_unlock();
1667 return session;
1668 error:
1669 rcu_read_unlock();
1670 session_info_put(session);
1671 return NULL;
1672 }
1673
1674 static
1675 int handle_notification_thread_command_add_channel(
1676 struct notification_thread_state *state,
1677 const char *session_name, uid_t session_uid, gid_t session_gid,
1678 const char *channel_name, enum lttng_domain_type channel_domain,
1679 uint64_t channel_key_int, uint64_t channel_capacity,
1680 enum lttng_error_code *cmd_result)
1681 {
1682 struct cds_list_head trigger_list;
1683 struct channel_info *new_channel_info = NULL;
1684 struct channel_key channel_key = {
1685 .key = channel_key_int,
1686 .domain = channel_domain,
1687 };
1688 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
1689 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1690 int trigger_count = 0;
1691 struct cds_lfht_iter iter;
1692 struct session_info *session_info = NULL;
1693
1694 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
1695 channel_name, session_name, channel_key_int,
1696 channel_domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
1697
1698 CDS_INIT_LIST_HEAD(&trigger_list);
1699
1700 session_info = find_or_create_session_info(state, session_name,
1701 session_uid, session_gid);
1702 if (!session_info) {
1703 /* Allocation error or an internal error occurred. */
1704 goto error;
1705 }
1706
1707 new_channel_info = channel_info_create(channel_name, &channel_key,
1708 channel_capacity, session_info);
1709 if (!new_channel_info) {
1710 goto error;
1711 }
1712
1713 rcu_read_lock();
1714 /* Build a list of all triggers applying to the new channel. */
1715 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1716 node) {
1717 struct lttng_trigger_list_element *new_element;
1718
1719 if (!trigger_applies_to_channel(trigger_ht_element->trigger,
1720 new_channel_info)) {
1721 continue;
1722 }
1723
1724 new_element = zmalloc(sizeof(*new_element));
1725 if (!new_element) {
1726 rcu_read_unlock();
1727 goto error;
1728 }
1729 CDS_INIT_LIST_HEAD(&new_element->node);
1730 new_element->trigger = trigger_ht_element->trigger;
1731 cds_list_add(&new_element->node, &trigger_list);
1732 trigger_count++;
1733 }
1734 rcu_read_unlock();
1735
1736 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
1737 trigger_count);
1738 channel_trigger_list = zmalloc(sizeof(*channel_trigger_list));
1739 if (!channel_trigger_list) {
1740 goto error;
1741 }
1742 channel_trigger_list->channel_key = new_channel_info->key;
1743 CDS_INIT_LIST_HEAD(&channel_trigger_list->list);
1744 cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
1745 cds_list_splice(&trigger_list, &channel_trigger_list->list);
1746
1747 rcu_read_lock();
1748 /* Add channel to the channel_ht which owns the channel_infos. */
1749 cds_lfht_add(state->channels_ht,
1750 hash_channel_key(&new_channel_info->key),
1751 &new_channel_info->channels_ht_node);
1752 /*
1753 * Add the list of triggers associated with this channel to the
1754 * channel_triggers_ht.
1755 */
1756 cds_lfht_add(state->channel_triggers_ht,
1757 hash_channel_key(&new_channel_info->key),
1758 &channel_trigger_list->channel_triggers_ht_node);
1759 rcu_read_unlock();
1760 session_info_put(session_info);
1761 *cmd_result = LTTNG_OK;
1762 return 0;
1763 error:
1764 channel_info_destroy(new_channel_info);
1765 session_info_put(session_info);
1766 return 1;
1767 }
1768
1769 static
1770 void free_channel_trigger_list_rcu(struct rcu_head *node)
1771 {
1772 free(caa_container_of(node, struct lttng_channel_trigger_list,
1773 rcu_node));
1774 }
1775
1776 static
1777 void free_channel_state_sample_rcu(struct rcu_head *node)
1778 {
1779 free(caa_container_of(node, struct channel_state_sample,
1780 rcu_node));
1781 }
1782
1783 static
1784 int handle_notification_thread_command_remove_channel(
1785 struct notification_thread_state *state,
1786 uint64_t channel_key, enum lttng_domain_type domain,
1787 enum lttng_error_code *cmd_result)
1788 {
1789 struct cds_lfht_node *node;
1790 struct cds_lfht_iter iter;
1791 struct lttng_channel_trigger_list *trigger_list;
1792 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1793 struct channel_key key = { .key = channel_key, .domain = domain };
1794 struct channel_info *channel_info;
1795
1796 DBG("[notification-thread] Removing channel key = %" PRIu64 " in %s domain",
1797 channel_key, domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
1798
1799 rcu_read_lock();
1800
1801 cds_lfht_lookup(state->channel_triggers_ht,
1802 hash_channel_key(&key),
1803 match_channel_trigger_list,
1804 &key,
1805 &iter);
1806 node = cds_lfht_iter_get_node(&iter);
1807 /*
1808 * There is a severe internal error if we are being asked to remove a
1809 * channel that doesn't exist.
1810 */
1811 if (!node) {
1812 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
1813 goto end;
1814 }
1815
1816 /* Free the list of triggers associated with this channel. */
1817 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
1818 channel_triggers_ht_node);
1819 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1820 &trigger_list->list, node) {
1821 cds_list_del(&trigger_list_element->node);
1822 free(trigger_list_element);
1823 }
1824 cds_lfht_del(state->channel_triggers_ht, node);
1825 call_rcu(&trigger_list->rcu_node, free_channel_trigger_list_rcu);
1826
1827 /* Free sampled channel state. */
1828 cds_lfht_lookup(state->channel_state_ht,
1829 hash_channel_key(&key),
1830 match_channel_state_sample,
1831 &key,
1832 &iter);
1833 node = cds_lfht_iter_get_node(&iter);
1834 /*
1835 * This is expected to be NULL if the channel is destroyed before we
1836 * received a sample.
1837 */
1838 if (node) {
1839 struct channel_state_sample *sample = caa_container_of(node,
1840 struct channel_state_sample,
1841 channel_state_ht_node);
1842
1843 cds_lfht_del(state->channel_state_ht, node);
1844 call_rcu(&sample->rcu_node, free_channel_state_sample_rcu);
1845 }
1846
1847 /* Remove the channel from the channels_ht and free it. */
1848 cds_lfht_lookup(state->channels_ht,
1849 hash_channel_key(&key),
1850 match_channel_info,
1851 &key,
1852 &iter);
1853 node = cds_lfht_iter_get_node(&iter);
1854 assert(node);
1855 channel_info = caa_container_of(node, struct channel_info,
1856 channels_ht_node);
1857 cds_lfht_del(state->channels_ht, node);
1858 channel_info_destroy(channel_info);
1859 end:
1860 rcu_read_unlock();
1861 *cmd_result = LTTNG_OK;
1862 return 0;
1863 }
1864
1865 static
1866 int handle_notification_thread_command_session_rotation(
1867 struct notification_thread_state *state,
1868 enum notification_thread_command_type cmd_type,
1869 const char *session_name, uid_t session_uid, gid_t session_gid,
1870 uint64_t trace_archive_chunk_id,
1871 struct lttng_trace_archive_location *location,
1872 enum lttng_error_code *_cmd_result)
1873 {
1874 int ret = 0;
1875 enum lttng_error_code cmd_result = LTTNG_OK;
1876 struct lttng_session_trigger_list *trigger_list;
1877 struct lttng_trigger_list_element *trigger_list_element;
1878 struct session_info *session_info;
1879
1880 rcu_read_lock();
1881
1882 session_info = find_or_create_session_info(state, session_name,
1883 session_uid, session_gid);
1884 if (!session_info) {
1885 /* Allocation error or an internal error occurred. */
1886 ret = -1;
1887 cmd_result = LTTNG_ERR_NOMEM;
1888 goto end;
1889 }
1890
1891 session_info->rotation.ongoing =
1892 cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING;
1893 session_info->rotation.id = trace_archive_chunk_id;
1894 trigger_list = get_session_trigger_list(state, session_name);
1895 if (!trigger_list) {
1896 DBG("[notification-thread] No triggers applying to session \"%s\" found",
1897 session_name);
1898 goto end;
1899 }
1900
1901 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
1902 node) {
1903 const struct lttng_condition *condition;
1904 const struct lttng_action *action;
1905 const struct lttng_trigger *trigger;
1906 struct notification_client_list *client_list;
1907 struct lttng_evaluation *evaluation = NULL;
1908 enum lttng_condition_type condition_type;
1909 bool client_list_is_empty;
1910
1911 trigger = trigger_list_element->trigger;
1912 condition = lttng_trigger_get_const_condition(trigger);
1913 assert(condition);
1914 condition_type = lttng_condition_get_type(condition);
1915
1916 if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING &&
1917 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1918 continue;
1919 } else if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED &&
1920 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED) {
1921 continue;
1922 }
1923
1924 action = lttng_trigger_get_const_action(trigger);
1925
1926 /* Notify actions are the only type currently supported. */
1927 assert(lttng_action_get_type_const(action) ==
1928 LTTNG_ACTION_TYPE_NOTIFY);
1929
1930 client_list = get_client_list_from_condition(state, condition);
1931 assert(client_list);
1932
1933 pthread_mutex_lock(&client_list->lock);
1934 client_list_is_empty = cds_list_empty(&client_list->list);
1935 pthread_mutex_unlock(&client_list->lock);
1936 if (client_list_is_empty) {
1937 /*
1938 * No clients interested in the evaluation's result,
1939 * skip it.
1940 */
1941 continue;
1942 }
1943
1944 if (cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1945 evaluation = lttng_evaluation_session_rotation_ongoing_create(
1946 trace_archive_chunk_id);
1947 } else {
1948 evaluation = lttng_evaluation_session_rotation_completed_create(
1949 trace_archive_chunk_id, location);
1950 }
1951
1952 if (!evaluation) {
1953 /* Internal error */
1954 ret = -1;
1955 cmd_result = LTTNG_ERR_UNK;
1956 goto put_list;
1957 }
1958
1959 /* Dispatch evaluation result to all clients. */
1960 ret = send_evaluation_to_clients(trigger_list_element->trigger,
1961 evaluation, client_list, state,
1962 session_info->uid,
1963 session_info->gid);
1964 lttng_evaluation_destroy(evaluation);
1965 put_list:
1966 notification_client_list_put(client_list);
1967 if (caa_unlikely(ret)) {
1968 break;
1969 }
1970 }
1971 end:
1972 session_info_put(session_info);
1973 *_cmd_result = cmd_result;
1974 rcu_read_unlock();
1975 return ret;
1976 }
1977
1978 static
1979 int handle_notification_thread_command_add_application(
1980 struct notification_thread_handle *handle,
1981 struct notification_thread_state *state,
1982 int read_side_trigger_event_application_pipe,
1983 enum lttng_error_code *_cmd_result)
1984 {
1985 int ret = 0;
1986 enum lttng_error_code cmd_result = LTTNG_OK;
1987 struct notification_event_trigger_source_element *element = NULL;
1988
1989 element = zmalloc(sizeof(*element));
1990 if (!element) {
1991 cmd_result = LTTNG_ERR_NOMEM;
1992 ret = -1;
1993 goto end;
1994 }
1995
1996 CDS_INIT_LIST_HEAD(&element->node);
1997 element->fd = read_side_trigger_event_application_pipe;
1998
1999 pthread_mutex_lock(&handle->event_trigger_sources.lock);
2000 cds_list_add(&element->node, &handle->event_trigger_sources.list);
2001 pthread_mutex_unlock(&handle->event_trigger_sources.lock);
2002
2003 /* TODO: remove on failure to add to list? */
2004
2005 /* Adding the read side pipe to the event poll */
2006 ret = lttng_poll_add(&state->events,
2007 read_side_trigger_event_application_pipe,
2008 LPOLLIN | LPOLLERR);
2009
2010 DBG3("[notification-thread] Adding application event source from fd: %d", read_side_trigger_event_application_pipe);
2011 if (ret < 0) {
2012 /* TODO: what should be the value of cmd_result??? */
2013 ERR("[notification-thread] Failed to add event source pipe fd to pollset");
2014 goto end;
2015 }
2016
2017 end:
2018 *_cmd_result = cmd_result;
2019 return ret;
2020 }
2021
2022 static
2023 int handle_notification_thread_command_remove_application(
2024 struct notification_thread_handle *handle,
2025 struct notification_thread_state *state,
2026 int read_side_trigger_event_application_pipe,
2027 enum lttng_error_code *_cmd_result)
2028 {
2029 int ret = 0;
2030 enum lttng_error_code cmd_result = LTTNG_OK;
2031
2032 /* TODO: missing a lock propably to revisit */
2033 struct notification_event_trigger_source_element *source_element, *tmp;
2034 cds_list_for_each_entry_safe(source_element, tmp,
2035 &handle->event_trigger_sources.list, node) {
2036 if (source_element->fd != read_side_trigger_event_application_pipe) {
2037 continue;
2038 }
2039
2040 DBG("[notification-thread] Removed event source from event source list");
2041 cds_list_del(&source_element->node);
2042 break;
2043 }
2044
2045 DBG3("[notification-thread] Removing application event source from fd: %d", read_side_trigger_event_application_pipe);
2046 /* Removing the read side pipe to the event poll */
2047 ret = lttng_poll_del(&state->events,
2048 read_side_trigger_event_application_pipe);
2049 if (ret < 0) {
2050 /* TODO: what should be the value of cmd_result??? */
2051 ERR("[notification-thread] Failed to remove event source pipe fd from pollset");
2052 goto end;
2053 }
2054
2055 end:
2056 *_cmd_result = cmd_result;
2057 return ret;
2058 }
2059
2060 static int handle_notification_thread_command_get_tokens(
2061 struct notification_thread_handle *handle,
2062 struct notification_thread_state *state,
2063 struct lttng_triggers **triggers,
2064 enum lttng_error_code *_cmd_result)
2065 {
2066 int ret = 0, i = 0;
2067 enum lttng_error_code cmd_result = LTTNG_OK;
2068 struct cds_lfht_iter iter;
2069 struct notification_trigger_tokens_ht_element *element;
2070 struct lttng_triggers *local_triggers = NULL;
2071
2072 local_triggers = lttng_triggers_create();
2073 if (!local_triggers) {
2074 cmd_result = LTTNG_ERR_NOMEM;
2075 goto end;
2076 }
2077
2078 rcu_read_lock();
2079 cds_lfht_for_each_entry (
2080 state->trigger_tokens_ht, &iter, element, node) {
2081 ret = lttng_triggers_add(local_triggers, element->trigger);
2082 if (ret < 0) {
2083 cmd_result = LTTNG_ERR_FATAL;
2084 ret = -1;
2085 goto end;
2086 }
2087
2088 /* Ownership is shared with the lttng_triggers object */
2089 lttng_trigger_get(element->trigger);
2090
2091 i++;
2092 }
2093
2094 /* Passing ownership up */
2095 *triggers = local_triggers;
2096 local_triggers = NULL;
2097
2098 end:
2099 rcu_read_unlock();
2100 lttng_triggers_destroy(local_triggers);
2101 *_cmd_result = cmd_result;
2102 return ret;
2103 }
2104
2105 static
2106 int handle_notification_thread_command_list_triggers(
2107 struct notification_thread_handle *handle,
2108 struct notification_thread_state *state,
2109 uid_t uid,
2110 gid_t gid,
2111 struct lttng_triggers **triggers,
2112 enum lttng_error_code *_cmd_result)
2113 {
2114 int ret = 0, i = 0;
2115 enum lttng_error_code cmd_result = LTTNG_OK;
2116 struct cds_lfht_iter iter;
2117 struct lttng_trigger_ht_element *trigger_ht_element;
2118 struct lttng_triggers *local_triggers = NULL;
2119 const struct lttng_credentials *creds;
2120
2121 long scb, sca;
2122 unsigned long count;
2123
2124 rcu_read_lock();
2125 cds_lfht_count_nodes(state->triggers_ht, &scb, &count, &sca);
2126
2127 /* TODO check downcasting */
2128 local_triggers = lttng_triggers_create();
2129 if (!local_triggers) {
2130 cmd_result = LTTNG_ERR_NOMEM;
2131 goto end;
2132 }
2133
2134 cds_lfht_for_each_entry (state->triggers_ht, &iter,
2135 trigger_ht_element, node) {
2136 /* Only return the trigger for which the requestion client have
2137 * access. For now the root user can only list its own
2138 * triggers.
2139 * TODO: root user behavior
2140 */
2141 creds = lttng_trigger_get_credentials(trigger_ht_element->trigger);
2142 if ((uid != creds->uid) || (gid != creds->gid)) {
2143 continue;
2144 }
2145
2146 ret = lttng_triggers_add(local_triggers, trigger_ht_element->trigger);
2147 if (ret < 0) {
2148 ret = -1;
2149 goto end;
2150 }
2151 /* Ownership is shared with the lttng_triggers object */
2152 lttng_trigger_get(trigger_ht_element->trigger);
2153
2154 i++;
2155 }
2156
2157 /* Passing ownership up */
2158 *triggers = local_triggers;
2159 local_triggers = NULL;
2160
2161 end:
2162 rcu_read_unlock();
2163 lttng_triggers_destroy(local_triggers);
2164 *_cmd_result = cmd_result;
2165 return ret;
2166 }
2167
2168 static
2169 int condition_is_supported(struct lttng_condition *condition)
2170 {
2171 int ret;
2172
2173 switch (lttng_condition_get_type(condition)) {
2174 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
2175 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
2176 {
2177 enum lttng_domain_type domain;
2178
2179 ret = lttng_condition_buffer_usage_get_domain_type(condition,
2180 &domain);
2181 if (ret) {
2182 ret = -1;
2183 goto end;
2184 }
2185
2186 if (domain != LTTNG_DOMAIN_KERNEL) {
2187 ret = 1;
2188 goto end;
2189 }
2190
2191 /*
2192 * Older kernel tracers don't expose the API to monitor their
2193 * buffers. Therefore, we reject triggers that require that
2194 * mechanism to be available to be evaluated.
2195 */
2196 ret = kernel_supports_ring_buffer_snapshot_sample_positions();
2197 break;
2198 }
2199 case LTTNG_CONDITION_TYPE_EVENT_RULE_HIT:
2200 {
2201 /* TODO:
2202 * Check for kernel support.
2203 * Check for ust support ??
2204 */
2205 ret = 1;
2206 break;
2207 }
2208 default:
2209 ret = 1;
2210 }
2211 end:
2212 return ret;
2213 }
2214
2215 static
2216 int action_is_supported(struct lttng_action *action)
2217 {
2218 int ret;
2219
2220 switch (lttng_action_get_type(action)) {
2221 case LTTNG_ACTION_TYPE_NOTIFY:
2222 case LTTNG_ACTION_TYPE_START_SESSION:
2223 case LTTNG_ACTION_TYPE_STOP_SESSION:
2224 case LTTNG_ACTION_TYPE_ROTATE_SESSION:
2225 case LTTNG_ACTION_TYPE_SNAPSHOT_SESSION:
2226 {
2227 /* TODO validate that this is true for kernel in regards to
2228 * rotation and snapshot. Start stop is not a problem notify
2229 * either.
2230 */
2231 /* For now all type of actions are supported */
2232 ret = 1;
2233 break;
2234 }
2235 case LTTNG_ACTION_TYPE_GROUP:
2236 {
2237 /* TODO: Iterate over all internal actions and validate that
2238 * they are supported
2239 */
2240 ret = 1;
2241 break;
2242
2243 }
2244 default:
2245 ret = 1;
2246 }
2247
2248 return ret;
2249 }
2250
2251 /* Must be called with RCU read lock held. */
2252 static
2253 int bind_trigger_to_matching_session(struct lttng_trigger *trigger,
2254 struct notification_thread_state *state)
2255 {
2256 int ret = 0;
2257 const struct lttng_condition *condition;
2258 const char *session_name;
2259 struct lttng_session_trigger_list *trigger_list;
2260
2261 condition = lttng_trigger_get_const_condition(trigger);
2262 switch (lttng_condition_get_type(condition)) {
2263 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
2264 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
2265 {
2266 enum lttng_condition_status status;
2267
2268 status = lttng_condition_session_rotation_get_session_name(
2269 condition, &session_name);
2270 if (status != LTTNG_CONDITION_STATUS_OK) {
2271 ERR("[notification-thread] Failed to bind trigger to session: unable to get 'session_rotation' condition's session name");
2272 ret = -1;
2273 goto end;
2274 }
2275 break;
2276 }
2277 default:
2278 ret = -1;
2279 goto end;
2280 }
2281
2282 trigger_list = get_session_trigger_list(state, session_name);
2283 if (!trigger_list) {
2284 DBG("[notification-thread] Unable to bind trigger applying to session \"%s\" as it is not yet known to the notification system",
2285 session_name);
2286 goto end;
2287
2288 }
2289
2290 DBG("[notification-thread] Newly registered trigger bound to session \"%s\"",
2291 session_name);
2292 ret = lttng_session_trigger_list_add(trigger_list, trigger);
2293 end:
2294 return ret;
2295 }
2296
2297 /* Must be called with RCU read lock held. */
2298 static
2299 int bind_trigger_to_matching_channels(struct lttng_trigger *trigger,
2300 struct notification_thread_state *state)
2301 {
2302 int ret = 0;
2303 struct cds_lfht_node *node;
2304 struct cds_lfht_iter iter;
2305 struct channel_info *channel;
2306
2307 cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
2308 channels_ht_node) {
2309 struct lttng_trigger_list_element *trigger_list_element;
2310 struct lttng_channel_trigger_list *trigger_list;
2311 struct cds_lfht_iter lookup_iter;
2312
2313 if (!trigger_applies_to_channel(trigger, channel)) {
2314 continue;
2315 }
2316
2317 cds_lfht_lookup(state->channel_triggers_ht,
2318 hash_channel_key(&channel->key),
2319 match_channel_trigger_list,
2320 &channel->key,
2321 &lookup_iter);
2322 node = cds_lfht_iter_get_node(&lookup_iter);
2323 assert(node);
2324 trigger_list = caa_container_of(node,
2325 struct lttng_channel_trigger_list,
2326 channel_triggers_ht_node);
2327
2328 trigger_list_element = zmalloc(sizeof(*trigger_list_element));
2329 if (!trigger_list_element) {
2330 ret = -1;
2331 goto end;
2332 }
2333 CDS_INIT_LIST_HEAD(&trigger_list_element->node);
2334 trigger_list_element->trigger = trigger;
2335 cds_list_add(&trigger_list_element->node, &trigger_list->list);
2336 DBG("[notification-thread] Newly registered trigger bound to channel \"%s\"",
2337 channel->name);
2338 }
2339 end:
2340 return ret;
2341 }
2342
2343 static int action_notify_register_trigger(
2344 struct notification_thread_state *state,
2345 struct lttng_trigger *trigger)
2346 {
2347
2348 int ret = 0;
2349 struct lttng_condition *condition;
2350 struct notification_client *client;
2351 struct notification_client_list *client_list = NULL;
2352 struct cds_lfht_iter iter;
2353 struct notification_client_list_element *client_list_element, *tmp;
2354
2355 condition = lttng_trigger_get_condition(trigger);
2356 assert(condition);
2357
2358 client_list = notification_client_list_create(trigger);
2359 if (!client_list) {
2360 ret = -1;
2361 goto end;
2362 }
2363
2364 /* Build a list of clients to which this new trigger applies. */
2365 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
2366 client_socket_ht_node) {
2367 if (!trigger_applies_to_client(trigger, client)) {
2368 continue;
2369 }
2370
2371 client_list_element = zmalloc(sizeof(*client_list_element));
2372 if (!client_list_element) {
2373 ret = -1;
2374 goto error_put_client_list;
2375 }
2376 CDS_INIT_LIST_HEAD(&client_list_element->node);
2377 client_list_element->client = client;
2378 cds_list_add(&client_list_element->node, &client_list->list);
2379 }
2380
2381 switch (get_condition_binding_object(condition)) {
2382 case LTTNG_OBJECT_TYPE_SESSION:
2383 /* Add the trigger to the list if it matches a known session. */
2384 ret = bind_trigger_to_matching_session(trigger, state);
2385 if (ret) {
2386 goto error_put_client_list;
2387 }
2388 break;
2389 case LTTNG_OBJECT_TYPE_CHANNEL:
2390 /*
2391 * Add the trigger to list of triggers bound to the channels
2392 * currently known.
2393 */
2394 ret = bind_trigger_to_matching_channels(trigger, state);
2395 if (ret) {
2396 goto error_put_client_list;
2397 }
2398 break;
2399 case LTTNG_OBJECT_TYPE_NONE:
2400 break;
2401 default:
2402 ERR("[notification-thread] Unknown object type on which to bind a newly registered trigger was encountered");
2403 ret = -1;
2404 goto error_put_client_list;
2405 }
2406
2407 /*
2408 * Since there is nothing preventing clients from subscribing to a
2409 * condition before the corresponding trigger is registered, we have
2410 * to evaluate this new condition right away.
2411 *
2412 * At some point, we were waiting for the next "evaluation" (e.g. on
2413 * reception of a channel sample) to evaluate this new condition, but
2414 * that was broken.
2415 *
2416 * The reason it was broken is that waiting for the next sample
2417 * does not allow us to properly handle transitions for edge-triggered
2418 * conditions.
2419 *
2420 * Consider this example: when we handle a new channel sample, we
2421 * evaluate each conditions twice: once with the previous state, and
2422 * again with the newest state. We then use those two results to
2423 * determine whether a state change happened: a condition was false and
2424 * became true. If a state change happened, we have to notify clients.
2425 *
2426 * Now, if a client subscribes to a given notification and registers
2427 * a trigger *after* that subscription, we have to make sure the
2428 * condition is evaluated at this point while considering only the
2429 * current state. Otherwise, the next evaluation cycle may only see
2430 * that the evaluations remain the same (true for samples n-1 and n) and
2431 * the client will never know that the condition has been met.
2432 *
2433 * No need to lock the list here as it has not been published yet.
2434 */
2435 cds_list_for_each_entry_safe(client_list_element, tmp,
2436 &client_list->list, node) {
2437 ret = evaluate_condition_for_client(trigger, condition,
2438 client_list_element->client, state);
2439 if (ret) {
2440 goto error_put_client_list;
2441 }
2442 }
2443
2444 /*
2445 * Client list ownership transferred to the
2446 * notification_trigger_clients_ht.
2447 */
2448 publish_notification_client_list(state, client_list);
2449 client_list = NULL;
2450 error_put_client_list:
2451 notification_client_list_put(client_list);
2452 end:
2453 return ret;
2454 }
2455
2456 static
2457 bool trigger_name_taken(struct notification_thread_state *state, const char *name)
2458 {
2459 struct cds_lfht_node *triggers_by_name_ht_node;
2460 struct cds_lfht_iter iter;
2461 /* TODO change hashing for trigger */
2462 cds_lfht_lookup(state->triggers_by_name_ht,
2463 hash_key_str(name, lttng_ht_seed),
2464 match_str,
2465 name,
2466 &iter);
2467 triggers_by_name_ht_node = cds_lfht_iter_get_node(&iter);
2468 if (triggers_by_name_ht_node) {
2469 return true;
2470 } else {
2471 return false;
2472 }
2473
2474 }
2475 static
2476 void generate_trigger_name(struct notification_thread_state *state, struct lttng_trigger *trigger, const char **name)
2477 {
2478 /* Here the offset criteria guarantee an end. This will be a nice
2479 * bikeshedding conversation. I would simply generate uuid and use them
2480 * as trigger name.
2481 */
2482 bool taken = false;
2483 do {
2484 lttng_trigger_generate_name(trigger, state->trigger_id.name_offset);
2485 /* TODO error checking */
2486 lttng_trigger_get_name(trigger, name);
2487 taken = trigger_name_taken(state, *name);
2488 if (taken) {
2489 state->trigger_id.name_offset++;
2490 }
2491 } while (taken || state->trigger_id.name_offset == UINT32_MAX);
2492 }
2493
2494 static bool action_is_notify(const struct lttng_action *action)
2495 {
2496 /* TODO for action groups we need to iterate over all of them */
2497 enum lttng_action_type type = lttng_action_get_type_const(action);
2498 bool ret = false;
2499 enum lttng_action_status status;
2500 const struct lttng_action *tmp;
2501 unsigned int i, count;
2502
2503 switch (type) {
2504 case LTTNG_ACTION_TYPE_NOTIFY:
2505 ret = true;
2506 break;
2507 case LTTNG_ACTION_TYPE_GROUP:
2508 status = lttng_action_group_get_count(action, &count);
2509 if (status != LTTNG_ACTION_STATUS_OK) {
2510 assert(0);
2511 }
2512 for (i = 0; i < count; i++) {
2513 tmp = lttng_action_group_get_at_index_const(action, i);
2514 assert(tmp);
2515 ret = action_is_notify(tmp);
2516 if (ret) {
2517 break;
2518 }
2519 }
2520 break;
2521 default:
2522 ret = false;
2523 break;
2524 }
2525
2526 return ret;
2527 }
2528
2529 /*
2530 * TODO: REVIEW THIS COMMENT.
2531 * FIXME A client's credentials are not checked when registering a trigger, nor
2532 * are they stored alongside with the trigger.
2533 *
2534 * The effects of this are benign since:
2535 * - The client will succeed in registering the trigger, as it is valid,
2536 * - The trigger will, internally, be bound to the channel/session,
2537 * - The notifications will not be sent since the client's credentials
2538 * are checked against the channel at that moment.
2539 *
2540 * If this function returns a non-zero value, it means something is
2541 * fundamentally broken and the whole subsystem/thread will be torn down.
2542 *
2543 * If a non-fatal error occurs, just set the cmd_result to the appropriate
2544 * error code.
2545 */
2546 static
2547 int handle_notification_thread_command_register_trigger(
2548 struct notification_thread_state *state,
2549 struct lttng_trigger *trigger,
2550 enum lttng_error_code *cmd_result)
2551 {
2552 int ret = 0;
2553 int is_supported;
2554 struct lttng_condition *condition;
2555 struct lttng_action *action;
2556 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
2557 struct notification_trigger_tokens_ht_element *trigger_tokens_ht_element = NULL;
2558 struct cds_lfht_node *node;
2559 const char* trigger_name;
2560 bool free_trigger = true;
2561
2562 assert(trigger->creds.set);
2563
2564 rcu_read_lock();
2565
2566 /* Set the trigger's key */
2567 lttng_trigger_set_key(trigger, state->trigger_id.token_generator);
2568
2569 if (lttng_trigger_get_name(trigger, &trigger_name) == LTTNG_TRIGGER_STATUS_UNSET) {
2570 generate_trigger_name(state, trigger, &trigger_name);
2571 } else if (trigger_name_taken(state, trigger_name)) {
2572 /* Not a fatal error */
2573 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2574 ret = 0;
2575 goto error;
2576 }
2577
2578 condition = lttng_trigger_get_condition(trigger);
2579 assert(condition);
2580
2581 action = lttng_trigger_get_action(trigger);
2582 assert(action);
2583
2584 is_supported = condition_is_supported(condition);
2585 if (is_supported < 0) {
2586 goto error;
2587 } else if (is_supported == 0) {
2588 ret = 0;
2589 *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
2590 goto error;
2591 }
2592
2593 is_supported = action_is_supported(action);
2594 if (is_supported < 0) {
2595 goto error;
2596 } else if (is_supported == 0) {
2597 ret = 0;
2598 *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
2599 goto error;
2600 }
2601
2602 trigger_ht_element = zmalloc(sizeof(*trigger_ht_element));
2603 if (!trigger_ht_element) {
2604 ret = -1;
2605 goto error;
2606 }
2607
2608 /* Add trigger to the trigger_ht. */
2609 cds_lfht_node_init(&trigger_ht_element->node);
2610 cds_lfht_node_init(&trigger_ht_element->node_by_name);
2611
2612 /*
2613 * This element own the trigger object from now own, this is why there
2614 * is no lttng_trigger_get here.
2615 * This thread is now the owner of the trigger object.
2616 */
2617 trigger_ht_element->trigger = trigger;
2618
2619 node = cds_lfht_add_unique(state->triggers_ht,
2620 lttng_condition_hash(condition),
2621 match_trigger,
2622 trigger,
2623 &trigger_ht_element->node);
2624 if (node != &trigger_ht_element->node) {
2625 /* Not a fatal error, simply report it to the client. */
2626 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2627 goto error_free_ht_element;
2628 }
2629
2630 node = cds_lfht_add_unique(state->triggers_by_name_ht,
2631 hash_key_str(trigger_name, lttng_ht_seed),
2632 match_str,
2633 trigger_name,
2634 &trigger_ht_element->node_by_name);
2635 if (node != &trigger_ht_element->node_by_name) {
2636 /* This should never happen */
2637 /* Not a fatal error, simply report it to the client. */
2638 /* TODO remove from the trigger_ht */
2639 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2640 goto error_free_ht_element;
2641 }
2642
2643 if (lttng_condition_get_type(condition) == LTTNG_CONDITION_TYPE_EVENT_RULE_HIT) {
2644 trigger_tokens_ht_element = zmalloc(sizeof(*trigger_tokens_ht_element));
2645 if (!trigger_tokens_ht_element) {
2646 ret = -1;
2647 goto error;
2648 }
2649
2650 /* Add trigger token to the trigger_tokens_ht. */
2651 cds_lfht_node_init(&trigger_tokens_ht_element->node);
2652 trigger_tokens_ht_element->token = trigger->key.value;
2653 trigger_tokens_ht_element->trigger = trigger;
2654
2655 node = cds_lfht_add_unique(state->trigger_tokens_ht,
2656 hash_key_u64(&trigger_tokens_ht_element->token, lttng_ht_seed),
2657 match_trigger_token,
2658 &trigger_tokens_ht_element->token,
2659 &trigger_tokens_ht_element->node);
2660 if (node != &trigger_tokens_ht_element->node) {
2661 /* TODO: THIS IS A FATAL ERROR... should never happen */
2662 /* Not a fatal error, simply report it to the client. */
2663 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2664 goto error_free_ht_element;
2665 }
2666 }
2667
2668 /*
2669 * Ownership of the trigger and of its wrapper was transfered to
2670 * the triggers_ht. Same for token ht element if necessary.
2671 */
2672 trigger_tokens_ht_element = NULL;
2673 trigger_ht_element = NULL;
2674 free_trigger = false;
2675
2676 if (action_is_notify(action)) {
2677 ret = action_notify_register_trigger(state, trigger);
2678 if (ret < 0) {
2679 /* TODO should cmd_result be set here? */
2680 ret = -1;
2681 goto error_free_ht_element;
2682 }
2683 }
2684
2685 /* Increment the trigger unique id generator */
2686 state->trigger_id.token_generator++;
2687 *cmd_result = LTTNG_OK;
2688
2689 error_free_ht_element:
2690 free(trigger_ht_element);
2691 free(trigger_tokens_ht_element);
2692 error:
2693 if (free_trigger) {
2694 lttng_trigger_destroy(trigger);
2695 }
2696 rcu_read_unlock();
2697 return ret;
2698 }
2699
2700 static
2701 void free_lttng_trigger_ht_element_rcu(struct rcu_head *node)
2702 {
2703 free(caa_container_of(node, struct lttng_trigger_ht_element,
2704 rcu_node));
2705 }
2706
2707 static
2708 void free_notification_trigger_tokens_ht_element_rcu(struct rcu_head *node)
2709 {
2710 free(caa_container_of(node, struct notification_trigger_tokens_ht_element,
2711 rcu_node));
2712 }
2713
2714 static
2715 int handle_notification_thread_command_unregister_trigger(
2716 struct notification_thread_state *state,
2717 struct lttng_trigger *trigger,
2718 enum lttng_error_code *_cmd_reply)
2719 {
2720 struct cds_lfht_iter iter;
2721 struct cds_lfht_node *triggers_ht_node;
2722 struct lttng_channel_trigger_list *trigger_list;
2723 struct notification_client_list *client_list;
2724 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
2725 struct lttng_condition *condition = lttng_trigger_get_condition(
2726 trigger);
2727 struct lttng_action *action = lttng_trigger_get_action(trigger);
2728 enum lttng_error_code cmd_reply;
2729
2730 rcu_read_lock();
2731
2732 /* TODO change hashing for trigger */
2733 /* TODO Disabling for the root user is not complete, for now the root
2734 * user cannot disable the trigger from another user.
2735 */
2736 cds_lfht_lookup(state->triggers_ht,
2737 lttng_condition_hash(condition),
2738 match_trigger,
2739 trigger,
2740 &iter);
2741 triggers_ht_node = cds_lfht_iter_get_node(&iter);
2742 if (!triggers_ht_node) {
2743 cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND;
2744 goto end;
2745 } else {
2746 cmd_reply = LTTNG_OK;
2747 }
2748
2749 /* Remove trigger from channel_triggers_ht. */
2750 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
2751 channel_triggers_ht_node) {
2752 struct lttng_trigger_list_element *trigger_element, *tmp;
2753
2754 cds_list_for_each_entry_safe(trigger_element, tmp,
2755 &trigger_list->list, node) {
2756 if (!lttng_trigger_is_equal(trigger, trigger_element->trigger)) {
2757 continue;
2758 }
2759
2760 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
2761 cds_list_del(&trigger_element->node);
2762 /* A trigger can only appear once per channel */
2763 break;
2764 }
2765 }
2766
2767 if (lttng_condition_get_type(condition) == LTTNG_CONDITION_TYPE_EVENT_RULE_HIT) {
2768 struct notification_trigger_tokens_ht_element *trigger_tokens_ht_element;
2769 cds_lfht_for_each_entry(state->trigger_tokens_ht, &iter, trigger_tokens_ht_element,
2770 node) {
2771 if (!lttng_trigger_is_equal(trigger, trigger_tokens_ht_element->trigger)) {
2772 continue;
2773 }
2774
2775 /* TODO talk to all app and remove it */
2776 DBG("[notification-thread] Removed trigger from tokens_ht");
2777 cds_lfht_del(state->trigger_tokens_ht,
2778 &trigger_tokens_ht_element->node);
2779 call_rcu(&trigger_tokens_ht_element->rcu_node, free_notification_trigger_tokens_ht_element_rcu);
2780
2781 break;
2782 }
2783 }
2784
2785 if (action_is_notify(action)) {
2786 /*
2787 * Remove and release the client list from
2788 * notification_trigger_clients_ht.
2789 */
2790 client_list = get_client_list_from_condition(state, condition);
2791 assert(client_list);
2792
2793 /* Put new reference and the hashtable's reference. */
2794 notification_client_list_put(client_list);
2795 notification_client_list_put(client_list);
2796 client_list = NULL;
2797 }
2798
2799 /* Remove trigger from triggers_ht. */
2800 trigger_ht_element = caa_container_of(triggers_ht_node,
2801 struct lttng_trigger_ht_element, node);
2802 cds_lfht_del(state->triggers_by_name_ht, &trigger_ht_element->node_by_name);
2803 cds_lfht_del(state->triggers_ht, triggers_ht_node);
2804
2805 /* Release the ownership of the trigger */
2806 lttng_trigger_destroy(trigger_ht_element->trigger);
2807 call_rcu(&trigger_ht_element->rcu_node, free_lttng_trigger_ht_element_rcu);
2808 end:
2809 rcu_read_unlock();
2810 if (_cmd_reply) {
2811 *_cmd_reply = cmd_reply;
2812 }
2813 return 0;
2814 }
2815
2816 /* Returns 0 on success, 1 on exit requested, negative value on error. */
2817 int handle_notification_thread_command(
2818 struct notification_thread_handle *handle,
2819 struct notification_thread_state *state)
2820 {
2821 int ret;
2822 uint64_t counter;
2823 struct notification_thread_command *cmd;
2824
2825 /* Read the event pipe to put it back into a quiescent state. */
2826 ret = lttng_read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter,
2827 sizeof(counter));
2828 if (ret != sizeof(counter)) {
2829 goto error;
2830 }
2831
2832 pthread_mutex_lock(&handle->cmd_queue.lock);
2833 cmd = cds_list_first_entry(&handle->cmd_queue.list,
2834 struct notification_thread_command, cmd_list_node);
2835 switch (cmd->type) {
2836 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
2837 DBG("[notification-thread] Received register trigger command");
2838 ret = handle_notification_thread_command_register_trigger(
2839 state, cmd->parameters.trigger,
2840 &cmd->reply_code);
2841 break;
2842 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
2843 DBG("[notification-thread] Received unregister trigger command");
2844 ret = handle_notification_thread_command_unregister_trigger(
2845 state, cmd->parameters.trigger,
2846 &cmd->reply_code);
2847 break;
2848 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
2849 DBG("[notification-thread] Received add channel command");
2850 ret = handle_notification_thread_command_add_channel(
2851 state,
2852 cmd->parameters.add_channel.session.name,
2853 cmd->parameters.add_channel.session.uid,
2854 cmd->parameters.add_channel.session.gid,
2855 cmd->parameters.add_channel.channel.name,
2856 cmd->parameters.add_channel.channel.domain,
2857 cmd->parameters.add_channel.channel.key,
2858 cmd->parameters.add_channel.channel.capacity,
2859 &cmd->reply_code);
2860 break;
2861 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
2862 DBG("[notification-thread] Received remove channel command");
2863 ret = handle_notification_thread_command_remove_channel(
2864 state, cmd->parameters.remove_channel.key,
2865 cmd->parameters.remove_channel.domain,
2866 &cmd->reply_code);
2867 break;
2868 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING:
2869 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED:
2870 DBG("[notification-thread] Received session rotation %s command",
2871 cmd->type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING ?
2872 "ongoing" : "completed");
2873 ret = handle_notification_thread_command_session_rotation(
2874 state,
2875 cmd->type,
2876 cmd->parameters.session_rotation.session_name,
2877 cmd->parameters.session_rotation.uid,
2878 cmd->parameters.session_rotation.gid,
2879 cmd->parameters.session_rotation.trace_archive_chunk_id,
2880 cmd->parameters.session_rotation.location,
2881 &cmd->reply_code);
2882 break;
2883 case NOTIFICATION_COMMAND_TYPE_ADD_APPLICATION:
2884 ret = handle_notification_thread_command_add_application(
2885 handle,
2886 state,
2887 cmd->parameters.application.read_side_trigger_event_application_pipe,
2888 &cmd->reply_code);
2889 break;
2890 case NOTIFICATION_COMMAND_TYPE_REMOVE_APPLICATION:
2891 ret = handle_notification_thread_command_remove_application(
2892 handle,
2893 state,
2894 cmd->parameters.application.read_side_trigger_event_application_pipe,
2895 &cmd->reply_code);
2896 break;
2897 case NOTIFICATION_COMMAND_TYPE_GET_TOKENS:
2898 {
2899 struct lttng_triggers *triggers = NULL;
2900 ret = handle_notification_thread_command_get_tokens(
2901 handle, state, &triggers, &cmd->reply_code);
2902 cmd->reply.get_tokens.triggers = triggers;
2903 ret = 0;
2904 break;
2905
2906 cmd->reply_code = LTTNG_OK;
2907 ret = 0;
2908 break;
2909 }
2910 case NOTIFICATION_COMMAND_TYPE_LIST_TRIGGERS:
2911 {
2912 struct lttng_triggers *triggers = NULL;
2913 ret = handle_notification_thread_command_list_triggers(
2914 handle,
2915 state,
2916 cmd->parameters.list_triggers.uid,
2917 cmd->parameters.list_triggers.gid,
2918 &triggers,
2919 &cmd->reply_code);
2920 cmd->reply.list_triggers.triggers = triggers;
2921 ret = 0;
2922 break;
2923 }
2924 case NOTIFICATION_COMMAND_TYPE_QUIT:
2925 DBG("[notification-thread] Received quit command");
2926 cmd->reply_code = LTTNG_OK;
2927 ret = 1;
2928 goto end;
2929 default:
2930 ERR("[notification-thread] Unknown internal command received");
2931 goto error_unlock;
2932 }
2933
2934 if (ret) {
2935 goto error_unlock;
2936 }
2937 end:
2938 cds_list_del(&cmd->cmd_list_node);
2939 if (cmd->is_async) {
2940 free(cmd);
2941 cmd = NULL;
2942 } else {
2943 lttng_waiter_wake_up(&cmd->reply_waiter);
2944 }
2945 pthread_mutex_unlock(&handle->cmd_queue.lock);
2946 return ret;
2947 error_unlock:
2948 /* Wake-up and return a fatal error to the calling thread. */
2949 lttng_waiter_wake_up(&cmd->reply_waiter);
2950 pthread_mutex_unlock(&handle->cmd_queue.lock);
2951 cmd->reply_code = LTTNG_ERR_FATAL;
2952 error:
2953 /* Indicate a fatal error to the caller. */
2954 return -1;
2955 }
2956
/*
 * Add O_NONBLOCK to the file status flags of `socket`.
 *
 * Returns -1 if either fcntl() call fails, otherwise the value returned by
 * the F_SETFL call.
 */
static
int socket_set_non_blocking(int socket)
{
	int status;
	const int current_flags = fcntl(socket, F_GETFL, 0);

	if (current_flags == -1) {
		PERROR("fcntl get socket flags");
		return -1;
	}

	/* Preserve the flags already set on the descriptor. */
	status = fcntl(socket, F_SETFL, current_flags | O_NONBLOCK);
	if (status == -1) {
		PERROR("fcntl set O_NONBLOCK socket flag");
		return status;
	}

	DBG("Client socket (fd = %i) set as non-blocking", socket);
	return status;
}
2979
2980 /* Client lock must be acquired by caller. */
2981 static
2982 int client_reset_inbound_state(struct notification_client *client)
2983 {
2984 int ret;
2985
2986 ASSERT_LOCKED(client->lock);
2987
2988 ret = lttng_dynamic_buffer_set_size(
2989 &client->communication.inbound.buffer, 0);
2990 assert(!ret);
2991
2992 client->communication.inbound.bytes_to_receive =
2993 sizeof(struct lttng_notification_channel_message);
2994 client->communication.inbound.msg_type =
2995 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN;
2996 LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1);
2997 LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1);
2998 ret = lttng_dynamic_buffer_set_size(
2999 &client->communication.inbound.buffer,
3000 client->communication.inbound.bytes_to_receive);
3001 return ret;
3002 }
3003
3004 int handle_notification_thread_client_connect(
3005 struct notification_thread_state *state)
3006 {
3007 int ret;
3008 struct notification_client *client;
3009
3010 DBG("[notification-thread] Handling new notification channel client connection");
3011
3012 client = zmalloc(sizeof(*client));
3013 if (!client) {
3014 /* Fatal error. */
3015 ret = -1;
3016 goto error;
3017 }
3018 pthread_mutex_init(&client->lock, NULL);
3019 client->id = state->next_notification_client_id++;
3020 CDS_INIT_LIST_HEAD(&client->condition_list);
3021 lttng_dynamic_buffer_init(&client->communication.inbound.buffer);
3022 lttng_dynamic_buffer_init(&client->communication.outbound.buffer);
3023 client->communication.inbound.expect_creds = true;
3024
3025 pthread_mutex_lock(&client->lock);
3026 ret = client_reset_inbound_state(client);
3027 pthread_mutex_unlock(&client->lock);
3028 if (ret) {
3029 ERR("[notification-thread] Failed to reset client communication's inbound state");
3030 ret = 0;
3031 goto error;
3032 }
3033
3034 ret = lttcomm_accept_unix_sock(state->notification_channel_socket);
3035 if (ret < 0) {
3036 ERR("[notification-thread] Failed to accept new notification channel client connection");
3037 ret = 0;
3038 goto error;
3039 }
3040
3041 client->socket = ret;
3042
3043 ret = socket_set_non_blocking(client->socket);
3044 if (ret) {
3045 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
3046 goto error;
3047 }
3048
3049 ret = lttcomm_setsockopt_creds_unix_sock(client->socket);
3050 if (ret < 0) {
3051 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
3052 ret = 0;
3053 goto error;
3054 }
3055
3056 ret = lttng_poll_add(&state->events, client->socket,
3057 LPOLLIN | LPOLLERR |
3058 LPOLLHUP | LPOLLRDHUP);
3059 if (ret < 0) {
3060 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
3061 ret = 0;
3062 goto error;
3063 }
3064 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
3065 client->socket);
3066
3067 rcu_read_lock();
3068 cds_lfht_add(state->client_socket_ht,
3069 hash_client_socket(client->socket),
3070 &client->client_socket_ht_node);
3071 cds_lfht_add(state->client_id_ht,
3072 hash_client_id(client->id),
3073 &client->client_id_ht_node);
3074 rcu_read_unlock();
3075
3076 return ret;
3077 error:
3078 notification_client_destroy(client, state);
3079 return ret;
3080 }
3081
3082 /* RCU read-lock must be held by the caller. */
3083 static
3084 int notification_thread_client_disconnect(
3085 struct notification_client *client,
3086 struct notification_thread_state *state)
3087 {
3088 int ret;
3089 struct lttng_condition_list_element *condition_list_element, *tmp;
3090
3091 /* Acquire the client lock to disable its communication atomically. */
3092 pthread_mutex_lock(&client->lock);
3093 client->communication.active = false;
3094 ret = lttng_poll_del(&state->events, client->socket);
3095 if (ret) {
3096 ERR("[notification-thread] Failed to remove client socket %d from poll set",
3097 client->socket);
3098 }
3099 pthread_mutex_unlock(&client->lock);
3100
3101 cds_lfht_del(state->client_socket_ht, &client->client_socket_ht_node);
3102 cds_lfht_del(state->client_id_ht, &client->client_id_ht_node);
3103
3104 /* Release all conditions to which the client was subscribed. */
3105 cds_list_for_each_entry_safe(condition_list_element, tmp,
3106 &client->condition_list, node) {
3107 (void) notification_thread_client_unsubscribe(client,
3108 condition_list_element->condition, state, NULL);
3109 }
3110
3111 /*
3112 * Client no longer accessible to other threads (through the
3113 * client lists).
3114 */
3115 notification_client_destroy(client, state);
3116 return ret;
3117 }
3118
/*
 * Disconnect the client associated with `client_socket`.
 *
 * The lookup and the disconnection are performed under the RCU read-lock.
 *
 * Returns -1 if no client is registered for this socket, otherwise the
 * result of notification_thread_client_disconnect().
 */
int handle_notification_thread_client_disconnect(
		int client_socket, struct notification_thread_state *state)
{
	int status;
	struct notification_client *target;

	rcu_read_lock();
	DBG("[notification-thread] Closing client connection (socket fd = %i)",
			client_socket);
	target = get_client_from_socket(client_socket, state);
	if (target) {
		status = notification_thread_client_disconnect(target, state);
	} else {
		/* Internal state corruption, fatal error. */
		ERR("[notification-thread] Unable to find client (socket fd = %i)",
				client_socket);
		status = -1;
	}
	rcu_read_unlock();

	return status;
}
3142
3143 int handle_notification_thread_client_disconnect_all(
3144 struct notification_thread_state *state)
3145 {
3146 struct cds_lfht_iter iter;
3147 struct notification_client *client;
3148 bool error_encoutered = false;
3149
3150 rcu_read_lock();
3151 DBG("[notification-thread] Closing all client connections");
3152 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
3153 client_socket_ht_node) {
3154 int ret;
3155
3156 ret = notification_thread_client_disconnect(
3157 client, state);
3158 if (ret) {
3159 error_encoutered = true;
3160 }
3161 }
3162 rcu_read_unlock();
3163 return error_encoutered ? 1 : 0;
3164 }
3165
3166 int handle_notification_thread_trigger_unregister_all(
3167 struct notification_thread_state *state)
3168 {
3169 bool error_occurred = false;
3170 struct cds_lfht_iter iter;
3171 struct lttng_trigger_ht_element *trigger_ht_element;
3172
3173 rcu_read_lock();
3174 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
3175 node) {
3176 int ret = handle_notification_thread_command_unregister_trigger(
3177 state, trigger_ht_element->trigger, NULL);
3178 if (ret) {
3179 error_occurred = true;
3180 }
3181 }
3182 rcu_read_unlock();
3183 return error_occurred ? -1 : 0;
3184 }
3185
3186 static
3187 int client_handle_transmission_status(
3188 struct notification_client *client,
3189 enum client_transmission_status transmission_status,
3190 struct notification_thread_state *state)
3191 {
3192 int ret = 0;
3193
3194 ASSERT_LOCKED(client->lock);
3195
3196 switch (transmission_status) {
3197 case CLIENT_TRANSMISSION_STATUS_COMPLETE:
3198 ret = lttng_poll_mod(&state->events, client->socket,
3199 CLIENT_POLL_MASK_IN);
3200 if (ret) {
3201 goto end;
3202 }
3203
3204 client->communication.outbound.queued_command_reply = false;
3205 client->communication.outbound.dropped_notification = false;
3206 break;
3207 case CLIENT_TRANSMISSION_STATUS_QUEUED:
3208 /*
3209 * We want to be notified whenever there is buffer space
3210 * available to send the rest of the payload.
3211 */
3212 ret = lttng_poll_mod(&state->events, client->socket,
3213 CLIENT_POLL_MASK_IN_OUT);
3214 if (ret) {
3215 goto end;
3216 }
3217 break;
3218 case CLIENT_TRANSMISSION_STATUS_FAIL:
3219 ret = notification_thread_client_disconnect(client, state);
3220 if (ret) {
3221 goto end;
3222 }
3223 break;
3224 case CLIENT_TRANSMISSION_STATUS_ERROR:
3225 ret = -1;
3226 goto end;
3227 default:
3228 abort();
3229 }
3230 end:
3231 return ret;
3232 }
3233
3234 /* Client lock must be acquired by caller. */
3235 static
3236 enum client_transmission_status client_flush_outgoing_queue(
3237 struct notification_client *client)
3238 {
3239 ssize_t ret;
3240 size_t to_send_count;
3241 enum client_transmission_status status;
3242
3243 ASSERT_LOCKED(client->lock);
3244
3245 assert(client->communication.outbound.buffer.size != 0);
3246 to_send_count = client->communication.outbound.buffer.size;
3247 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
3248 client->socket);
3249
3250 ret = lttcomm_send_unix_sock_non_block(client->socket,
3251 client->communication.outbound.buffer.data,
3252 to_send_count);
3253 if ((ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) ||
3254 (ret > 0 && ret < to_send_count)) {
3255 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
3256 client->socket);
3257 to_send_count -= max(ret, 0);
3258
3259 memcpy(client->communication.outbound.buffer.data,
3260 client->communication.outbound.buffer.data +
3261 client->communication.outbound.buffer.size - to_send_count,
3262 to_send_count);
3263 ret = lttng_dynamic_buffer_set_size(
3264 &client->communication.outbound.buffer,
3265 to_send_count);
3266 if (ret) {
3267 goto error;
3268 }
3269 status = CLIENT_TRANSMISSION_STATUS_QUEUED;
3270 } else if (ret < 0) {
3271 /* Generic error, disconnect the client. */
3272 ERR("[notification-thread] Failed to flush outgoing queue, disconnecting client (socket fd = %i)",
3273 client->socket);
3274 status = CLIENT_TRANSMISSION_STATUS_FAIL;
3275 } else {
3276 /* No error and flushed the queue completely. */
3277 ret = lttng_dynamic_buffer_set_size(
3278 &client->communication.outbound.buffer, 0);
3279 if (ret) {
3280 goto error;
3281 }
3282 status = CLIENT_TRANSMISSION_STATUS_COMPLETE;
3283 }
3284
3285 return status;
3286 error:
3287 return CLIENT_TRANSMISSION_STATUS_ERROR;
3288 }
3289
3290 /* Client lock must be acquired by caller. */
3291 static
3292 int client_send_command_reply(struct notification_client *client,
3293 struct notification_thread_state *state,
3294 enum lttng_notification_channel_status status)
3295 {
3296 int ret;
3297 struct lttng_notification_channel_command_reply reply = {
3298 .status = (int8_t) status,
3299 };
3300 struct lttng_notification_channel_message msg = {
3301 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
3302 .size = sizeof(reply),
3303 };
3304 char buffer[sizeof(msg) + sizeof(reply)];
3305 enum client_transmission_status transmission_status;
3306
3307 ASSERT_LOCKED(client->lock);
3308
3309 if (client->communication.outbound.queued_command_reply) {
3310 /* Protocol error. */
3311 goto error;
3312 }
3313
3314 memcpy(buffer, &msg, sizeof(msg));
3315 memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
3316 DBG("[notification-thread] Send command reply (%i)", (int) status);
3317
3318 /* Enqueue buffer to outgoing queue and flush it. */
3319 ret = lttng_dynamic_buffer_append(
3320 &client->communication.outbound.buffer,
3321 buffer, sizeof(buffer));
3322 if (ret) {
3323 goto error;
3324 }
3325
3326 transmission_status = client_flush_outgoing_queue(client);
3327 ret = client_handle_transmission_status(
3328 client, transmission_status, state);
3329 if (ret) {
3330 goto error;
3331 }
3332
3333 if (client->communication.outbound.buffer.size != 0) {
3334 /* Queue could not be emptied. */
3335 client->communication.outbound.queued_command_reply = true;
3336 }
3337
3338 return 0;
3339 error:
3340 return -1;
3341 }
3342
3343 static
3344 int client_handle_message_unknown(struct notification_client *client,
3345 struct notification_thread_state *state)
3346 {
3347 int ret;
3348
3349 pthread_mutex_lock(&client->lock);
3350
3351 /*
3352 * Receiving message header. The function will be called again
3353 * once the rest of the message as been received and can be
3354 * interpreted.
3355 */
3356 const struct lttng_notification_channel_message *msg;
3357
3358 assert(sizeof(*msg) == client->communication.inbound.buffer.size);
3359 msg = (const struct lttng_notification_channel_message *)
3360 client->communication.inbound.buffer.data;
3361
3362 if (msg->size == 0 ||
3363 msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
3364 ERR("[notification-thread] Invalid notification channel message: length = %u",
3365 msg->size);
3366 ret = -1;
3367 goto end;
3368 }
3369
3370 switch (msg->type) {
3371 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
3372 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
3373 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
3374 break;
3375 default:
3376 ret = -1;
3377 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
3378 goto end;
3379 }
3380
3381 client->communication.inbound.bytes_to_receive = msg->size;
3382 client->communication.inbound.msg_type =
3383 (enum lttng_notification_channel_message_type) msg->type;
3384 ret = lttng_dynamic_buffer_set_size(
3385 &client->communication.inbound.buffer, msg->size);
3386 end:
3387 pthread_mutex_unlock(&client->lock);
3388 return ret;
3389 }
3390
3391 static
3392 int client_handle_message_handshake(struct notification_client *client,
3393 struct notification_thread_state *state)
3394 {
3395 int ret;
3396 struct lttng_notification_channel_command_handshake *handshake_client;
3397 const struct lttng_notification_channel_command_handshake handshake_reply = {
3398 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
3399 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
3400 };
3401 const struct lttng_notification_channel_message msg_header = {
3402 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
3403 .size = sizeof(handshake_reply),
3404 };
3405 enum lttng_notification_channel_status status =
3406 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
3407 char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
3408 enum client_transmission_status transmission_status;
3409
3410 pthread_mutex_lock(&client->lock);
3411
3412 memcpy(send_buffer, &msg_header, sizeof(msg_header));
3413 memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
3414 sizeof(handshake_reply));
3415
3416 handshake_client =
3417 (struct lttng_notification_channel_command_handshake *)
3418 client->communication.inbound.buffer
3419 .data;
3420 client->major = handshake_client->major;
3421 client->minor = handshake_client->minor;
3422 if (!client->communication.inbound.creds_received) {
3423 ERR("[notification-thread] No credentials received from client");
3424 ret = -1;
3425 goto end;
3426 }
3427
3428 client->uid = LTTNG_SOCK_GET_UID_CRED(
3429 &client->communication.inbound.creds);
3430 client->gid = LTTNG_SOCK_GET_GID_CRED(
3431 &client->communication.inbound.creds);
3432 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
3433 client->uid, client->gid, (int) client->major,
3434 (int) client->minor);
3435
3436 if (handshake_client->major !=
3437 LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
3438 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
3439 }
3440
3441 ret = lttng_dynamic_buffer_append(
3442 &client->communication.outbound.buffer, send_buffer,
3443 sizeof(send_buffer));
3444 if (ret) {
3445 ERR("[notification-thread] Failed to send protocol version to notification channel client");
3446 goto end;
3447 }
3448
3449 transmission_status = client_flush_outgoing_queue(client);
3450 ret = client_handle_transmission_status(
3451 client, transmission_status, state);
3452 if (ret) {
3453 goto end;
3454 }
3455
3456 ret = client_send_command_reply(client, state, status);
3457 if (ret) {
3458 ERR("[notification-thread] Failed to send reply to notification channel client");
3459 goto end;
3460 }
3461
3462 /* Set reception state to receive the next message header. */
3463 ret = client_reset_inbound_state(client);
3464 if (ret) {
3465 ERR("[notification-thread] Failed to reset client communication's inbound state");
3466 goto end;
3467 }
3468 client->validated = true;
3469 client->communication.active = true;
3470
3471 end:
3472 pthread_mutex_unlock(&client->lock);
3473 return ret;
3474 }
3475
3476 static
3477 int client_handle_message_subscription(
3478 struct notification_client *client,
3479 enum lttng_notification_channel_message_type msg_type,
3480 struct notification_thread_state *state)
3481 {
3482 int ret;
3483 struct lttng_condition *condition;
3484 enum lttng_notification_channel_status status =
3485 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
3486 const struct lttng_buffer_view condition_view =
3487 lttng_buffer_view_from_dynamic_buffer(
3488 &client->communication.inbound.buffer,
3489 0, -1);
3490 size_t expected_condition_size;
3491
3492 pthread_mutex_lock(&client->lock);
3493 expected_condition_size = client->communication.inbound.buffer.size;
3494 pthread_mutex_unlock(&client->lock);
3495
3496 ret = lttng_condition_create_from_buffer(&condition_view, &condition);
3497 if (ret != expected_condition_size) {
3498 ERR("[notification-thread] Malformed condition received from client");
3499 goto end;
3500 }
3501
3502 if (msg_type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
3503 ret = notification_thread_client_subscribe(
3504 client, condition, state, &status);
3505 } else {
3506 ret = notification_thread_client_unsubscribe(
3507 client, condition, state, &status);
3508 }
3509 if (ret) {
3510 goto end;
3511 }
3512
3513 pthread_mutex_lock(&client->lock);
3514 ret = client_send_command_reply(client, state, status);
3515 if (ret) {
3516 ERR("[notification-thread] Failed to send reply to notification channel client");
3517 goto end_unlock;
3518 }
3519
3520 /* Set reception state to receive the next message header. */
3521 ret = client_reset_inbound_state(client);
3522 if (ret) {
3523 ERR("[notification-thread] Failed to reset client communication's inbound state");
3524 goto end_unlock;
3525 }
3526
3527 end_unlock:
3528 pthread_mutex_unlock(&client->lock);
3529 end:
3530 return ret;
3531 }
3532
3533 static
3534 int client_dispatch_message(struct notification_client *client,
3535 struct notification_thread_state *state)
3536 {
3537 int ret = 0;
3538
3539 if (client->communication.inbound.msg_type !=
3540 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE &&
3541 client->communication.inbound.msg_type !=
3542 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN &&
3543 !client->validated) {
3544 WARN("[notification-thread] client attempted a command before handshake");
3545 ret = -1;
3546 goto end;
3547 }
3548
3549 switch (client->communication.inbound.msg_type) {
3550 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN:
3551 {
3552 ret = client_handle_message_unknown(client, state);
3553 break;
3554 }
3555 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
3556 {
3557 ret = client_handle_message_handshake(client, state);
3558 break;
3559 }
3560 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
3561 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
3562 {
3563 ret = client_handle_message_subscription(client,
3564 client->communication.inbound.msg_type, state);
3565 break;
3566 }
3567 default:
3568 abort();
3569 }
3570 end:
3571 return ret;
3572 }
3573
3574 /* Incoming data from client. */
3575 int handle_notification_thread_client_in(
3576 struct notification_thread_state *state, int socket)
3577 {
3578 int ret = 0;
3579 struct notification_client *client;
3580 ssize_t recv_ret;
3581 size_t offset;
3582 bool message_is_complete = false;
3583
3584 client = get_client_from_socket(socket, state);
3585 if (!client) {
3586 /* Internal error, abort. */
3587 ret = -1;
3588 goto end;
3589 }
3590
3591 pthread_mutex_lock(&client->lock);
3592 offset = client->communication.inbound.buffer.size -
3593 client->communication.inbound.bytes_to_receive;
3594 if (client->communication.inbound.expect_creds) {
3595 recv_ret = lttcomm_recv_creds_unix_sock(socket,
3596 client->communication.inbound.buffer.data + offset,
3597 client->communication.inbound.bytes_to_receive,
3598 &client->communication.inbound.creds);
3599 if (recv_ret > 0) {
3600 client->communication.inbound.expect_creds = false;
3601 client->communication.inbound.creds_received = true;
3602 }
3603 } else {
3604 recv_ret = lttcomm_recv_unix_sock_non_block(socket,
3605 client->communication.inbound.buffer.data + offset,
3606 client->communication.inbound.bytes_to_receive);
3607 }
3608 if (recv_ret >= 0) {
3609 client->communication.inbound.bytes_to_receive -= recv_ret;
3610 message_is_complete = client->communication.inbound
3611 .bytes_to_receive == 0;
3612 }
3613 pthread_mutex_unlock(&client->lock);
3614 if (recv_ret < 0) {
3615 goto error_disconnect_client;
3616 }
3617
3618 if (message_is_complete) {
3619 ret = client_dispatch_message(client, state);
3620 if (ret) {
3621 /*
3622 * Only returns an error if this client must be
3623 * disconnected.
3624 */
3625 goto error_disconnect_client;
3626 }
3627 }
3628 end:
3629 return ret;
3630 error_disconnect_client:
3631 ret = notification_thread_client_disconnect(client, state);
3632 return ret;
3633 }
3634
3635 /* Client ready to receive outgoing data. */
3636 int handle_notification_thread_client_out(
3637 struct notification_thread_state *state, int socket)
3638 {
3639 int ret;
3640 struct notification_client *client;
3641 enum client_transmission_status transmission_status;
3642
3643 client = get_client_from_socket(socket, state);
3644 if (!client) {
3645 /* Internal error, abort. */
3646 ret = -1;
3647 goto end;
3648 }
3649
3650 pthread_mutex_lock(&client->lock);
3651 transmission_status = client_flush_outgoing_queue(client);
3652 ret = client_handle_transmission_status(
3653 client, transmission_status, state);
3654 pthread_mutex_unlock(&client->lock);
3655 if (ret) {
3656 goto end;
3657 }
3658 end:
3659 return ret;
3660 }
3661
3662 static
3663 bool evaluate_buffer_usage_condition(const struct lttng_condition *condition,
3664 const struct channel_state_sample *sample,
3665 uint64_t buffer_capacity)
3666 {
3667 bool result = false;
3668 uint64_t threshold;
3669 enum lttng_condition_type condition_type;
3670 const struct lttng_condition_buffer_usage *use_condition = container_of(
3671 condition, struct lttng_condition_buffer_usage,
3672 parent);
3673
3674 if (use_condition->threshold_bytes.set) {
3675 threshold = use_condition->threshold_bytes.value;
3676 } else {
3677 /*
3678 * Threshold was expressed as a ratio.
3679 *
3680 * TODO the threshold (in bytes) of conditions expressed
3681 * as a ratio of total buffer size could be cached to
3682 * forego this double-multiplication or it could be performed
3683 * as fixed-point math.
3684 *
3685 * Note that caching should accomodate the case where the
3686 * condition applies to multiple channels (i.e. don't assume
3687 * that all channels matching my_chann* have the same size...)
3688 */
3689 threshold = (uint64_t) (use_condition->threshold_ratio.value *
3690 (double) buffer_capacity);
3691 }
3692
3693 condition_type = lttng_condition_get_type(condition);
3694 if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
3695 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
3696 threshold, sample->highest_usage);
3697
3698 /*
3699 * The low condition should only be triggered once _all_ of the
3700 * streams in a channel have gone below the "low" threshold.
3701 */
3702 if (sample->highest_usage <= threshold) {
3703 result = true;
3704 }
3705 } else {
3706 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
3707 threshold, sample->highest_usage);
3708
3709 /*
3710 * For high buffer usage scenarios, we want to trigger whenever
3711 * _any_ of the streams has reached the "high" threshold.
3712 */
3713 if (sample->highest_usage >= threshold) {
3714 result = true;
3715 }
3716 }
3717
3718 return result;
3719 }
3720
3721 static
3722 bool evaluate_session_consumed_size_condition(
3723 const struct lttng_condition *condition,
3724 uint64_t session_consumed_size)
3725 {
3726 uint64_t threshold;
3727 const struct lttng_condition_session_consumed_size *size_condition =
3728 container_of(condition,
3729 struct lttng_condition_session_consumed_size,
3730 parent);
3731
3732 threshold = size_condition->consumed_threshold_bytes.value;
3733 DBG("[notification-thread] Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
3734 threshold, session_consumed_size);
3735 return session_consumed_size >= threshold;
3736 }
3737
3738 static
3739 int evaluate_buffer_condition(const struct lttng_condition *condition,
3740 struct lttng_evaluation **evaluation,
3741 const struct notification_thread_state *state,
3742 const struct channel_state_sample *previous_sample,
3743 const struct channel_state_sample *latest_sample,
3744 uint64_t previous_session_consumed_total,
3745 uint64_t latest_session_consumed_total,
3746 struct channel_info *channel_info)
3747 {
3748 int ret = 0;
3749 enum lttng_condition_type condition_type;
3750 const bool previous_sample_available = !!previous_sample;
3751 bool previous_sample_result = false;
3752 bool latest_sample_result;
3753
3754 condition_type = lttng_condition_get_type(condition);
3755
3756 switch (condition_type) {
3757 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
3758 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
3759 if (caa_likely(previous_sample_available)) {
3760 previous_sample_result =
3761 evaluate_buffer_usage_condition(condition,
3762 previous_sample, channel_info->capacity);
3763 }
3764 latest_sample_result = evaluate_buffer_usage_condition(
3765 condition, latest_sample,
3766 channel_info->capacity);
3767 break;
3768 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
3769 if (caa_likely(previous_sample_available)) {
3770 previous_sample_result =
3771 evaluate_session_consumed_size_condition(
3772 condition,
3773 previous_session_consumed_total);
3774 }
3775 latest_sample_result =
3776 evaluate_session_consumed_size_condition(
3777 condition,
3778 latest_session_consumed_total);
3779 break;
3780 default:
3781 /* Unknown condition type; internal error. */
3782 abort();
3783 }
3784
3785 if (!latest_sample_result ||
3786 (previous_sample_result == latest_sample_result)) {
3787 /*
3788 * Only trigger on a condition evaluation transition.
3789 *
3790 * NOTE: This edge-triggered logic may not be appropriate for
3791 * future condition types.
3792 */
3793 goto end;
3794 }
3795
3796 if (!evaluation || !latest_sample_result) {
3797 goto end;
3798 }
3799
3800 switch (condition_type) {
3801 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
3802 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
3803 *evaluation = lttng_evaluation_buffer_usage_create(
3804 condition_type,
3805 latest_sample->highest_usage,
3806 channel_info->capacity);
3807 break;
3808 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
3809 *evaluation = lttng_evaluation_session_consumed_size_create(
3810 latest_session_consumed_total);
3811 break;
3812 default:
3813 abort();
3814 }
3815
3816 if (!*evaluation) {
3817 ret = -1;
3818 goto end;
3819 }
3820 end:
3821 return ret;
3822 }
3823
3824 static
3825 int client_notification_overflow(struct notification_client *client)
3826 {
3827 int ret = 0;
3828 const struct lttng_notification_channel_message msg = {
3829 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
3830 };
3831
3832 ASSERT_LOCKED(client->lock);
3833
3834 DBG("Dropping notification addressed to client (socket fd = %i)",
3835 client->socket);
3836 if (client->communication.outbound.dropped_notification) {
3837 /*
3838 * The client already has a "notification dropped" message
3839 * in its outgoing queue. Nothing to do since all
3840 * of those messages are coalesced.
3841 */
3842 goto end;
3843 }
3844
3845 client->communication.outbound.dropped_notification = true;
3846 ret = lttng_dynamic_buffer_append(
3847 &client->communication.outbound.buffer, &msg,
3848 sizeof(msg));
3849 if (ret) {
3850 PERROR("Failed to enqueue \"dropped notification\" message in client's (socket fd = %i) outgoing queue",
3851 client->socket);
3852 }
3853 end:
3854 return ret;
3855 }
3856
3857 static int client_handle_transmission_status_wrapper(
3858 struct notification_client *client,
3859 enum client_transmission_status status,
3860 void *user_data)
3861 {
3862 return client_handle_transmission_status(client, status,
3863 (struct notification_thread_state *) user_data);
3864 }
3865
3866 static
3867 int send_evaluation_to_clients(const struct lttng_trigger *trigger,
3868 const struct lttng_evaluation *evaluation,
3869 struct notification_client_list* client_list,
3870 struct notification_thread_state *state,
3871 uid_t object_uid, gid_t object_gid)
3872 {
3873 return notification_client_list_send_evaluation(client_list,
3874 lttng_trigger_get_const_condition(trigger), evaluation,
3875 lttng_trigger_get_credentials(trigger),
3876 &(struct lttng_credentials){
3877 .uid = object_uid, .gid = object_gid},
3878 client_handle_transmission_status_wrapper, state);
3879 }
3880
3881 LTTNG_HIDDEN
3882 int notification_client_list_send_evaluation(
3883 struct notification_client_list *client_list,
3884 const struct lttng_condition *condition,
3885 const struct lttng_evaluation *evaluation,
3886 const struct lttng_credentials *trigger_creds,
3887 const struct lttng_credentials *source_object_creds,
3888 report_client_transmission_result_cb client_report,
3889 void *user_data)
3890 {
3891 int ret = 0;
3892 struct lttng_dynamic_buffer msg_buffer;
3893 struct notification_client_list_element *client_list_element, *tmp;
3894 const struct lttng_notification notification = {
3895 .condition = (struct lttng_condition *) condition,
3896 .evaluation = (struct lttng_evaluation *) evaluation,
3897 };
3898 struct lttng_notification_channel_message msg_header = {
3899 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION,
3900 };
3901
3902 lttng_dynamic_buffer_init(&msg_buffer);
3903
3904 ret = lttng_dynamic_buffer_append(&msg_buffer, &msg_header,
3905 sizeof(msg_header));
3906 if (ret) {
3907 goto end;
3908 }
3909
3910 ret = lttng_notification_serialize(&notification, &msg_buffer);
3911 if (ret) {
3912 ERR("[notification-thread] Failed to serialize notification");
3913 ret = -1;
3914 goto end;
3915 }
3916
3917 /* Update payload size. */
3918 ((struct lttng_notification_channel_message * ) msg_buffer.data)->size =
3919 (uint32_t) (msg_buffer.size - sizeof(msg_header));
3920
3921 pthread_mutex_lock(&client_list->lock);
3922 cds_list_for_each_entry_safe(client_list_element, tmp,
3923 &client_list->list, node) {
3924 enum client_transmission_status transmission_status;
3925 struct notification_client *client =
3926 client_list_element->client;
3927
3928 ret = 0;
3929 pthread_mutex_lock(&client->lock);
3930 if (source_object_creds) {
3931 if (client->uid != source_object_creds->uid &&
3932 client->gid != source_object_creds->gid &&
3933 client->uid != 0) {
3934 /*
3935 * Client is not allowed to monitor this
3936 * object.
3937 */
3938 DBG("[notification-thread] Skipping client at it does not have the object permission to receive notification for this trigger");
3939 goto unlock_client;
3940 }
3941 }
3942
3943 /* TODO: what is the behavior for root client on non root
3944 * trigger? Since multiple triggers (different user) can have the same condition
3945 * but with different action group that can have each a notify.
3946 * Does the root client receive multiple notification for all
3947 * those triggers with the same condition or only notification
3948 * for triggers the root user configured?
3949 * For now we do the later. All users including the root user
3950 * can only receive notification from trigger it registered.
3951 */
3952 if (client->uid != trigger_creds->uid && client->gid != trigger_creds->gid) {
3953 DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this trigger");
3954 goto unlock_client;
3955 }
3956
3957 DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
3958 client->socket, msg_buffer.size);
3959 if (client->communication.outbound.buffer.size) {
3960 /*
3961 * Outgoing data is already buffered for this client;
3962 * drop the notification and enqueue a "dropped
3963 * notification" message if this is the first dropped
3964 * notification since the socket spilled-over to the
3965 * queue.
3966 */
3967 ret = client_notification_overflow(client);
3968 if (ret) {
3969 goto unlock_client;
3970 }
3971 }
3972
3973 ret = lttng_dynamic_buffer_append_buffer(
3974 &client->communication.outbound.buffer,
3975 &msg_buffer);
3976 if (ret) {
3977 goto unlock_client;
3978 }
3979
3980 transmission_status = client_flush_outgoing_queue(client);
3981 ret = client_report(client, transmission_status, user_data);
3982 if (ret) {
3983 goto unlock_client;
3984 }
3985 unlock_client:
3986 pthread_mutex_unlock(&client->lock);
3987 if (ret) {
3988 goto end_unlock_list;
3989 }
3990 }
3991 ret = 0;
3992
3993 end_unlock_list:
3994 pthread_mutex_unlock(&client_list->lock);
3995 end:
3996 lttng_dynamic_buffer_reset(&msg_buffer);
3997 return ret;
3998 }
3999
4000 int perform_event_action_notify(struct notification_thread_state *state,
4001 const struct lttng_trigger *trigger,
4002 const struct lttng_trigger_notification *notification,
4003 const struct lttng_action *action)
4004 {
4005 int ret;
4006 struct notification_client_list *client_list;
4007 struct lttng_evaluation *evaluation = NULL;
4008 const struct lttng_credentials *creds = lttng_trigger_get_credentials(trigger);
4009
4010 /*
4011 * Check if any client is subscribed to the result of this
4012 * evaluation.
4013 */
4014 client_list = get_client_list_from_condition(state, trigger->condition);
4015 assert(client_list);
4016 if (cds_list_empty(&client_list->list)) {
4017 ret = 0;
4018 goto end;
4019 }
4020
4021 evaluation = lttng_evaluation_event_rule_create(trigger->name);
4022 if (!evaluation) {
4023 ERR("Failed to create event rule hit evaluation");
4024 ret = -1;
4025 goto end;
4026 }
4027
4028 /* Dispatch evaluation result to all clients.
4029 * Note that here the passed credentials are the one from the trigger,
4030 * this is because there is no internal object binded to the trigger per
4031 * see and the credential validation is done at the registration level
4032 * for the event rule based trigger. For a channel the credential
4033 * validation can only be done on notify since the trigger can be
4034 * registered before the channel/session creation.
4035 */
4036 ret = send_evaluation_to_clients(trigger,
4037 evaluation, client_list, state,
4038 creds->uid,
4039 creds->gid);
4040 lttng_evaluation_destroy(evaluation);
4041 if (caa_unlikely(ret)) {
4042 goto end;
4043 }
4044 end:
4045 return ret;
4046
4047 }
4048
4049 /* This can be called recursively, pass NULL for action on the first iteration */
4050 int perform_event_action(struct notification_thread_state *state,
4051 const struct lttng_trigger *trigger,
4052 const struct lttng_trigger_notification *notification,
4053 const struct lttng_action *action)
4054 {
4055 int ret = 0;
4056 enum lttng_action_type action_type;
4057
4058 assert(trigger);
4059
4060 if (!action) {
4061 action = lttng_trigger_get_const_action(trigger);
4062 }
4063
4064 action_type = lttng_action_get_type_const(action);
4065 DBG("Handling action %s for trigger id %s (%" PRIu64 ")",
4066 lttng_action_type_string(action_type), trigger->name,
4067 trigger->key.value);
4068
4069 switch (action_type) {
4070 case LTTNG_ACTION_TYPE_GROUP:
4071 {
4072 /* Recurse into the group */
4073 const struct lttng_action *tmp = NULL;
4074 unsigned int count = 0;
4075 (void) lttng_action_group_get_count(action, &count);
4076 for (int i = 0; i < count; i++) {
4077 tmp = lttng_action_group_get_at_index_const(action, i);
4078 assert(tmp);
4079 ret = perform_event_action(state, trigger, notification, tmp);
4080 if (ret < 0) {
4081 goto end;
4082 }
4083 }
4084 break;
4085 }
4086 case LTTNG_ACTION_TYPE_NOTIFY:
4087 {
4088 ret = perform_event_action_notify(state, trigger, notification, action);
4089 break;
4090 }
4091 default:
4092 break;
4093 }
4094 end:
4095 return ret;
4096 }
4097
4098 int handle_notification_thread_event(struct notification_thread_state *state,
4099 int pipe,
4100 enum lttng_domain_type domain)
4101 {
4102 int ret;
4103 struct lttng_ust_trigger_notification ust_notification;
4104 uint64_t kernel_notification;
4105 struct cds_lfht_node *node;
4106 struct cds_lfht_iter iter;
4107 struct notification_trigger_tokens_ht_element *element;
4108 struct lttng_trigger_notification notification;
4109 void *reception_buffer;
4110 size_t reception_size;
4111 enum action_executor_status executor_status;
4112 struct notification_client_list *client_list = NULL;
4113
4114 notification.type = domain;
4115
4116 switch(domain) {
4117 case LTTNG_DOMAIN_UST:
4118 reception_buffer = (void *) &ust_notification;
4119 reception_size = sizeof(ust_notification);
4120 notification.u.ust = &ust_notification;
4121 break;
4122 case LTTNG_DOMAIN_KERNEL:
4123 reception_buffer = (void *) &kernel_notification;
4124 reception_size = sizeof(kernel_notification);
4125 notification.u.kernel = &kernel_notification;
4126 break;
4127 default:
4128 assert(0);
4129 }
4130
4131 /*
4132 * The monitoring pipe only holds messages smaller than PIPE_BUF,
4133 * ensuring that read/write of sampling messages are atomic.
4134 */
4135 /* TODO: should we read as much as we can ? EWOULDBLOCK? */
4136
4137 ret = lttng_read(pipe, reception_buffer, reception_size);
4138 if (ret != reception_size) {
4139 ERR("[notification-thread] Failed to read from event source pipe (fd = %i)",
4140 pipe);
4141 /* TODO: Should this error out completly.
4142 * This can happen when an app is killed as of today
4143 * ret = -1 cause the whole thread to die and fuck up
4144 * everything.
4145 */
4146 goto end;
4147 }
4148
4149 switch(domain) {
4150 case LTTNG_DOMAIN_UST:
4151 notification.id = ust_notification.id;
4152 break;
4153 case LTTNG_DOMAIN_KERNEL:
4154 notification.id = kernel_notification;
4155 break;
4156 default:
4157 assert(0);
4158 }
4159
4160 /* Find triggers associated with this token. */
4161 rcu_read_lock();
4162 cds_lfht_lookup(state->trigger_tokens_ht,
4163 hash_key_u64(&notification.id, lttng_ht_seed), match_trigger_token,
4164 &notification.id, &iter);
4165 node = cds_lfht_iter_get_node(&iter);
4166 if (caa_unlikely(!node)) {
4167 /* TODO: is this an error? This might happen if the receive side
4168 * is slow to process event from source and that the trigger was
4169 * removed but the app still kicking. This yield another
4170 * question on the trigger lifetime and when we can remove a
4171 * trigger. How to guarantee that all event with the token idea
4172 * have be processed? Do we want to provide this guarantee?
4173 *
4174 * Update: I have encountered this when using a trigger on
4175 * sched_switch and then removing it. The frequency is quite
4176 * high hence we en up exactly in the mentionned scenario.
4177 * AFAIK this might be the best way to handle this.
4178 */
4179 ret = 0;
4180 goto end_unlock;
4181 }
4182 element = caa_container_of(node,
4183 struct notification_trigger_tokens_ht_element,
4184 node);
4185
4186 if (!lttng_trigger_is_ready_to_fire(element->trigger)) {
4187 ret = 0;
4188 goto end_unlock;
4189 }
4190
4191 client_list = get_client_list_from_condition(state,
4192 lttng_trigger_get_const_condition(element->trigger));
4193 executor_status = action_executor_enqueue(
4194 state->executor, element->trigger, client_list);
4195 switch (executor_status) {
4196 case ACTION_EXECUTOR_STATUS_OK:
4197 ret = 0;
4198 break;
4199 case ACTION_EXECUTOR_STATUS_OVERFLOW:
4200 {
4201 struct notification_client_list_element *client_list_element,
4202 *tmp;
4203
4204 /*
4205 * Not a fatal error; this is expected and simply means the
4206 * executor has too much work queued already.
4207 */
4208 ret = 0;
4209
4210 if (!client_list) {
4211 break;
4212 }
4213
4214 /* Warn clients that a notification (or more) was dropped. */
4215 pthread_mutex_lock(&client_list->lock);
4216 cds_list_for_each_entry_safe(client_list_element, tmp,
4217 &client_list->list, node) {
4218 enum client_transmission_status transmission_status;
4219 struct notification_client *client =
4220 client_list_element->client;
4221
4222 pthread_mutex_lock(&client->lock);
4223 ret = client_notification_overflow(client);
4224 if (ret) {
4225 /* Fatal error. */
4226 goto next_client;
4227 }
4228
4229 transmission_status =
4230 client_flush_outgoing_queue(client);
4231 ret = client_handle_transmission_status(
4232 client, transmission_status, state);
4233 if (ret) {
4234 /* Fatal error. */
4235 goto next_client;
4236 }
4237 next_client:
4238 pthread_mutex_unlock(&client->lock);
4239 if (ret) {
4240 break;
4241 }
4242 }
4243 pthread_mutex_lock(&client_list->lock);
4244 break;
4245 }
4246 case ACTION_EXECUTOR_STATUS_ERROR:
4247 /* Fatal error, shut down everything. */
4248 ERR("Fatal error encoutered while enqueuing action");
4249 ret = -1;
4250 goto end_unlock;
4251 default:
4252 /* Unhandled error. */
4253 abort();
4254 }
4255
4256 end_unlock:
4257 notification_client_list_put(client_list);
4258 rcu_read_unlock();
4259 end:
4260 return ret;
4261 }
4262
4263 int handle_notification_thread_channel_sample(
4264 struct notification_thread_state *state, int pipe,
4265 enum lttng_domain_type domain)
4266 {
4267 int ret = 0;
4268 struct lttcomm_consumer_channel_monitor_msg sample_msg;
4269 struct channel_info *channel_info;
4270 struct cds_lfht_node *node;
4271 struct cds_lfht_iter iter;
4272 struct lttng_channel_trigger_list *trigger_list;
4273 struct lttng_trigger_list_element *trigger_list_element;
4274 bool previous_sample_available = false;
4275 struct channel_state_sample previous_sample, latest_sample;
4276 uint64_t previous_session_consumed_total, latest_session_consumed_total;
4277
4278 /*
4279 * The monitoring pipe only holds messages smaller than PIPE_BUF,
4280 * ensuring that read/write of sampling messages are atomic.
4281 */
4282 ret = lttng_read(pipe, &sample_msg, sizeof(sample_msg));
4283 if (ret != sizeof(sample_msg)) {
4284 ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
4285 pipe);
4286 ret = -1;
4287 goto end;
4288 }
4289
4290 ret = 0;
4291 latest_sample.key.key = sample_msg.key;
4292 latest_sample.key.domain = domain;
4293 latest_sample.highest_usage = sample_msg.highest;
4294 latest_sample.lowest_usage = sample_msg.lowest;
4295 latest_sample.channel_total_consumed = sample_msg.total_consumed;
4296
4297 rcu_read_lock();
4298
4299 /* Retrieve the channel's informations */
4300 cds_lfht_lookup(state->channels_ht,
4301 hash_channel_key(&latest_sample.key),
4302 match_channel_info,
4303 &latest_sample.key,
4304 &iter);
4305 node = cds_lfht_iter_get_node(&iter);
4306 if (caa_unlikely(!node)) {
4307 /*
4308 * Not an error since the consumer can push a sample to the pipe
4309 * and the rest of the session daemon could notify us of the
4310 * channel's destruction before we get a chance to process that
4311 * sample.
4312 */
4313 DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
4314 latest_sample.key.key,
4315 domain == LTTNG_DOMAIN_KERNEL ? "kernel" :
4316 "user space");
4317 goto end_unlock;
4318 }
4319 channel_info = caa_container_of(node, struct channel_info,
4320 channels_ht_node);
4321 DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64", total consumed = %" PRIu64")",
4322 channel_info->name,
4323 latest_sample.key.key,
4324 channel_info->session_info->name,
4325 latest_sample.highest_usage,
4326 latest_sample.lowest_usage,
4327 latest_sample.channel_total_consumed);
4328
4329 previous_session_consumed_total =
4330 channel_info->session_info->consumed_data_size;
4331
4332 /* Retrieve the channel's last sample, if it exists, and update it. */
4333 cds_lfht_lookup(state->channel_state_ht,
4334 hash_channel_key(&latest_sample.key),
4335 match_channel_state_sample,
4336 &latest_sample.key,
4337 &iter);
4338 node = cds_lfht_iter_get_node(&iter);
4339 if (caa_likely(node)) {
4340 struct channel_state_sample *stored_sample;
4341
4342 /* Update the sample stored. */
4343 stored_sample = caa_container_of(node,
4344 struct channel_state_sample,
4345 channel_state_ht_node);
4346
4347 memcpy(&previous_sample, stored_sample,
4348 sizeof(previous_sample));
4349 stored_sample->highest_usage = latest_sample.highest_usage;
4350 stored_sample->lowest_usage = latest_sample.lowest_usage;
4351 stored_sample->channel_total_consumed = latest_sample.channel_total_consumed;
4352 previous_sample_available = true;
4353
4354 latest_session_consumed_total =
4355 previous_session_consumed_total +
4356 (latest_sample.channel_total_consumed - previous_sample.channel_total_consumed);
4357 } else {
4358 /*
4359 * This is the channel's first sample, allocate space for and
4360 * store the new sample.
4361 */
4362 struct channel_state_sample *stored_sample;
4363
4364 stored_sample = zmalloc(sizeof(*stored_sample));
4365 if (!stored_sample) {
4366 ret = -1;
4367 goto end_unlock;
4368 }
4369
4370 memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
4371 cds_lfht_node_init(&stored_sample->channel_state_ht_node);
4372 cds_lfht_add(state->channel_state_ht,
4373 hash_channel_key(&stored_sample->key),
4374 &stored_sample->channel_state_ht_node);
4375
4376 latest_session_consumed_total =
4377 previous_session_consumed_total +
4378 latest_sample.channel_total_consumed;
4379 }
4380
4381 channel_info->session_info->consumed_data_size =
4382 latest_session_consumed_total;
4383
4384 /* Find triggers associated with this channel. */
4385 cds_lfht_lookup(state->channel_triggers_ht,
4386 hash_channel_key(&latest_sample.key),
4387 match_channel_trigger_list,
4388 &latest_sample.key,
4389 &iter);
4390 node = cds_lfht_iter_get_node(&iter);
4391 if (caa_likely(!node)) {
4392 goto end_unlock;
4393 }
4394
4395 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
4396 channel_triggers_ht_node);
4397 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
4398 node) {
4399 const struct lttng_condition *condition;
4400 const struct lttng_action *action;
4401 struct lttng_trigger *trigger;
4402 struct notification_client_list *client_list = NULL;
4403 struct lttng_evaluation *evaluation = NULL;
4404 bool client_list_is_empty;
4405
4406 ret = 0;
4407 trigger = trigger_list_element->trigger;
4408 condition = lttng_trigger_get_const_condition(trigger);
4409 assert(condition);
4410 action = lttng_trigger_get_const_action(trigger);
4411
4412 if (!lttng_trigger_is_ready_to_fire(trigger)) {
4413 goto put_list;
4414 }
4415
4416 /* Notify actions are the only type currently supported. */
4417 /* TODO support other type of action */
4418 assert(lttng_action_get_type_const(action) ==
4419 LTTNG_ACTION_TYPE_NOTIFY);
4420
4421 /*
4422 * Check if any client is subscribed to the result of this
4423 * evaluation.
4424 */
4425 client_list = get_client_list_from_condition(state, condition);
4426 assert(client_list);
4427 client_list_is_empty = cds_list_empty(&client_list->list);
4428 if (client_list_is_empty) {
4429 /*
4430 * No clients interested in the evaluation's result,
4431 * skip it.
4432 */
4433 goto put_list;
4434 }
4435
4436 ret = evaluate_buffer_condition(condition, &evaluation, state,
4437 previous_sample_available ? &previous_sample : NULL,
4438 &latest_sample,
4439 previous_session_consumed_total,
4440 latest_session_consumed_total,
4441 channel_info);
4442 if (caa_unlikely(ret)) {
4443 goto put_list;
4444 }
4445
4446 if (caa_likely(!evaluation)) {
4447 goto put_list;
4448 }
4449
4450 /* Dispatch evaluation result to all clients. */
4451 ret = send_evaluation_to_clients(trigger_list_element->trigger,
4452 evaluation, client_list, state,
4453 channel_info->session_info->uid,
4454 channel_info->session_info->gid);
4455 lttng_evaluation_destroy(evaluation);
4456 put_list:
4457 notification_client_list_put(client_list);
4458 if (caa_unlikely(ret)) {
4459 break;
4460 }
4461 }
4462 end_unlock:
4463 rcu_read_unlock();
4464 end:
4465 return ret;
4466 }
This page took 0.190932 seconds and 5 git commands to generate.