SoW-2019-0002: Dynamic Snapshot
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.c
1 /*
2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * SPDX-License-Identifier: GPL-2.0-only
5 *
6 */
7
8 #define _LGPL_SOURCE
9 #include <urcu.h>
10 #include <urcu/rculfhash.h>
11
12 #include <common/defaults.h>
13 #include <common/error.h>
14 #include <common/futex.h>
15 #include <common/unix.h>
16 #include <common/dynamic-buffer.h>
17 #include <common/hashtable/utils.h>
18 #include <common/sessiond-comm/sessiond-comm.h>
19 #include <common/macros.h>
20 #include <lttng/condition/condition.h>
21 #include <lttng/action/action-internal.h>
22 #include <lttng/action/group-internal.h>
23 #include <lttng/notification/notification-internal.h>
24 #include <lttng/condition/condition-internal.h>
25 #include <lttng/condition/buffer-usage-internal.h>
26 #include <lttng/condition/session-consumed-size-internal.h>
27 #include <lttng/condition/session-rotation-internal.h>
28 #include <lttng/condition/event-rule-internal.h>
29 #include <lttng/notification/channel-internal.h>
30 #include <lttng/trigger/trigger-internal.h>
31
32 #include <time.h>
33 #include <unistd.h>
34 #include <assert.h>
35 #include <inttypes.h>
36 #include <fcntl.h>
37
38 #include "notification-thread.h"
39 #include "notification-thread-events.h"
40 #include "notification-thread-commands.h"
41 #include "lttng-sessiond.h"
42 #include "kernel.h"
43
44 #define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
45 #define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
46
47 enum lttng_object_type {
48 LTTNG_OBJECT_TYPE_UNKNOWN,
49 LTTNG_OBJECT_TYPE_NONE,
50 LTTNG_OBJECT_TYPE_CHANNEL,
51 LTTNG_OBJECT_TYPE_SESSION,
52 };
53
54 struct lttng_trigger_list_element {
55 /* No ownership of the trigger object is assumed. */
56 struct lttng_trigger *trigger;
57 struct cds_list_head node;
58 };
59
60 struct lttng_channel_trigger_list {
61 struct channel_key channel_key;
62 /* List of struct lttng_trigger_list_element. */
63 struct cds_list_head list;
64 /* Node in the channel_triggers_ht */
65 struct cds_lfht_node channel_triggers_ht_node;
66 /* call_rcu delayed reclaim. */
67 struct rcu_head rcu_node;
68 };
69
70 /*
71 * List of triggers applying to a given session.
72 *
73 * See:
74 * - lttng_session_trigger_list_create()
75 * - lttng_session_trigger_list_build()
76 * - lttng_session_trigger_list_destroy()
77 * - lttng_session_trigger_list_add()
78 */
79 struct lttng_session_trigger_list {
80 /*
81 * Not owned by this; points to the session_info structure's
82 * session name.
83 */
84 const char *session_name;
85 /* List of struct lttng_trigger_list_element. */
86 struct cds_list_head list;
87 /* Node in the session_triggers_ht */
88 struct cds_lfht_node session_triggers_ht_node;
89 /*
90 * Weak reference to the notification system's session triggers
91 * hashtable.
92 *
93 * The session trigger list structure structure is owned by
94 * the session's session_info.
95 *
96 * The session_info is kept alive the the channel_infos holding a
97 * reference to it (reference counting). When those channels are
98 * destroyed (at runtime or on teardown), the reference they hold
99 * to the session_info are released. On destruction of session_info,
100 * session_info_destroy() will remove the list of triggers applying
101 * to this session from the notification system's state.
102 *
103 * This implies that the session_triggers_ht must be destroyed
104 * after the channels.
105 */
106 struct cds_lfht *session_triggers_ht;
107 /* Used for delayed RCU reclaim. */
108 struct rcu_head rcu_node;
109 };
110
111 struct lttng_trigger_ht_element {
112 struct lttng_trigger *trigger;
113 struct cds_lfht_node node;
114 struct cds_lfht_node node_by_name;
115 /* call_rcu delayed reclaim. */
116 struct rcu_head rcu_node;
117 };
118
119 struct lttng_condition_list_element {
120 struct lttng_condition *condition;
121 struct cds_list_head node;
122 };
123
124 /*
125 * Facilities to carry the different notifications type in the action processing
126 * code path.
127 */
128 struct lttng_trigger_notification {
129 union {
130 struct lttng_ust_trigger_notification *ust;
131 uint64_t *kernel;
132 } u;
133 uint64_t id;
134 enum lttng_domain_type type;
135 };
136
137 struct channel_state_sample {
138 struct channel_key key;
139 struct cds_lfht_node channel_state_ht_node;
140 uint64_t highest_usage;
141 uint64_t lowest_usage;
142 uint64_t channel_total_consumed;
143 /* call_rcu delayed reclaim. */
144 struct rcu_head rcu_node;
145 };
146
147 static unsigned long hash_channel_key(struct channel_key *key);
148 static int evaluate_buffer_condition(const struct lttng_condition *condition,
149 struct lttng_evaluation **evaluation,
150 const struct notification_thread_state *state,
151 const struct channel_state_sample *previous_sample,
152 const struct channel_state_sample *latest_sample,
153 uint64_t previous_session_consumed_total,
154 uint64_t latest_session_consumed_total,
155 struct channel_info *channel_info);
156 static
157 int send_evaluation_to_clients(const struct lttng_trigger *trigger,
158 const struct lttng_evaluation *evaluation,
159 struct notification_client_list *client_list,
160 struct notification_thread_state *state,
161 uid_t channel_uid, gid_t channel_gid);
162
163
164 /* session_info API */
165 static
166 void session_info_destroy(void *_data);
167 static
168 void session_info_get(struct session_info *session_info);
169 static
170 void session_info_put(struct session_info *session_info);
171 static
172 struct session_info *session_info_create(const char *name,
173 uid_t uid, gid_t gid,
174 struct lttng_session_trigger_list *trigger_list,
175 struct cds_lfht *sessions_ht);
176 static
177 void session_info_add_channel(struct session_info *session_info,
178 struct channel_info *channel_info);
179 static
180 void session_info_remove_channel(struct session_info *session_info,
181 struct channel_info *channel_info);
182
183 /* lttng_session_trigger_list API */
184 static
185 struct lttng_session_trigger_list *lttng_session_trigger_list_create(
186 const char *session_name,
187 struct cds_lfht *session_triggers_ht);
188 static
189 struct lttng_session_trigger_list *lttng_session_trigger_list_build(
190 const struct notification_thread_state *state,
191 const char *session_name);
192 static
193 void lttng_session_trigger_list_destroy(
194 struct lttng_session_trigger_list *list);
195 static
196 int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
197 struct lttng_trigger *trigger);
198
199 static
200 int client_handle_transmission_status(
201 struct notification_client *client,
202 enum client_transmission_status transmission_status,
203 struct notification_thread_state *state);
204
205 static
206 int match_client_socket(struct cds_lfht_node *node, const void *key)
207 {
208 /* This double-cast is intended to supress pointer-to-cast warning. */
209 const int socket = (int) (intptr_t) key;
210 const struct notification_client *client = caa_container_of(node,
211 struct notification_client, client_socket_ht_node);
212
213 return client->socket == socket;
214 }
215
216 static
217 int match_client_id(struct cds_lfht_node *node, const void *key)
218 {
219 /* This double-cast is intended to supress pointer-to-cast warning. */
220 const notification_client_id id = *((notification_client_id *) key);
221 const struct notification_client *client = caa_container_of(
222 node, struct notification_client, client_id_ht_node);
223
224 return client->id == id;
225 }
226
227 static
228 int match_channel_trigger_list(struct cds_lfht_node *node, const void *key)
229 {
230 struct channel_key *channel_key = (struct channel_key *) key;
231 struct lttng_channel_trigger_list *trigger_list;
232
233 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
234 channel_triggers_ht_node);
235
236 return !!((channel_key->key == trigger_list->channel_key.key) &&
237 (channel_key->domain == trigger_list->channel_key.domain));
238 }
239
240 static
241 int match_session_trigger_list(struct cds_lfht_node *node, const void *key)
242 {
243 const char *session_name = (const char *) key;
244 struct lttng_session_trigger_list *trigger_list;
245
246 trigger_list = caa_container_of(node, struct lttng_session_trigger_list,
247 session_triggers_ht_node);
248
249 return !!(strcmp(trigger_list->session_name, session_name) == 0);
250 }
251
252 static
253 int match_channel_state_sample(struct cds_lfht_node *node, const void *key)
254 {
255 struct channel_key *channel_key = (struct channel_key *) key;
256 struct channel_state_sample *sample;
257
258 sample = caa_container_of(node, struct channel_state_sample,
259 channel_state_ht_node);
260
261 return !!((channel_key->key == sample->key.key) &&
262 (channel_key->domain == sample->key.domain));
263 }
264
265 static
266 int match_channel_info(struct cds_lfht_node *node, const void *key)
267 {
268 struct channel_key *channel_key = (struct channel_key *) key;
269 struct channel_info *channel_info;
270
271 channel_info = caa_container_of(node, struct channel_info,
272 channels_ht_node);
273
274 return !!((channel_key->key == channel_info->key.key) &&
275 (channel_key->domain == channel_info->key.domain));
276 }
277
278 static
279 int match_trigger(struct cds_lfht_node *node, const void *key)
280 {
281 bool match = false;
282 struct lttng_trigger *trigger_key = (struct lttng_trigger *) key;
283 struct lttng_trigger_ht_element *trigger_ht_element;
284 const struct lttng_credentials *creds_key;
285 const struct lttng_credentials *creds_node;
286
287 trigger_ht_element = caa_container_of(node, struct lttng_trigger_ht_element,
288 node);
289
290 match = lttng_trigger_is_equal(trigger_key, trigger_ht_element->trigger);
291 if (!match) {
292 goto end;
293 }
294
295 /* Validate credential */
296 /* TODO: this could be moved to lttng_trigger_equal depending on how we
297 * handle root behaviour on disable and listing.
298 */
299 creds_key = lttng_trigger_get_credentials(trigger_key);
300 creds_node = lttng_trigger_get_credentials(trigger_ht_element->trigger);
301 match = lttng_credentials_is_equal(creds_key, creds_node);
302 end:
303 return !!match;
304 }
305
306 static
307 int match_trigger_token(struct cds_lfht_node *node, const void *key)
308 {
309 const uint64_t *_key = key;
310 struct notification_trigger_tokens_ht_element *element;
311
312 element = caa_container_of(node, struct notification_trigger_tokens_ht_element,
313 node);
314 return *_key == element->token ;
315 }
316
317 static
318 int match_client_list_condition(struct cds_lfht_node *node, const void *key)
319 {
320 struct lttng_condition *condition_key = (struct lttng_condition *) key;
321 struct notification_client_list *client_list;
322 const struct lttng_condition *condition;
323
324 assert(condition_key);
325
326 client_list = caa_container_of(node, struct notification_client_list,
327 notification_trigger_clients_ht_node);
328 condition = lttng_trigger_get_const_condition(client_list->trigger);
329
330 return !!lttng_condition_is_equal(condition_key, condition);
331 }
332
333 static
334 int match_session(struct cds_lfht_node *node, const void *key)
335 {
336 const char *name = key;
337 struct session_info *session_info = caa_container_of(
338 node, struct session_info, sessions_ht_node);
339
340 return !strcmp(session_info->name, name);
341 }
342
343 /*
344 * Match function for string node.
345 */
346 static int match_str(struct cds_lfht_node *node, const void *key)
347 {
348 struct lttng_trigger_ht_element *trigger_ht_element;
349 const char *name;
350
351 trigger_ht_element = caa_container_of(node, struct lttng_trigger_ht_element,
352 node_by_name);
353
354 /* TODO error checking */
355 lttng_trigger_get_name(trigger_ht_element->trigger, &name);
356
357 return hash_match_key_str(name, (void *) key);
358 }
359
360 static
361 unsigned long lttng_condition_buffer_usage_hash(
362 const struct lttng_condition *_condition)
363 {
364 unsigned long hash;
365 unsigned long condition_type;
366 struct lttng_condition_buffer_usage *condition;
367
368 condition = container_of(_condition,
369 struct lttng_condition_buffer_usage, parent);
370
371 condition_type = (unsigned long) condition->parent.type;
372 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
373 if (condition->session_name) {
374 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
375 }
376 if (condition->channel_name) {
377 hash ^= hash_key_str(condition->channel_name, lttng_ht_seed);
378 }
379 if (condition->domain.set) {
380 hash ^= hash_key_ulong(
381 (void *) condition->domain.type,
382 lttng_ht_seed);
383 }
384 if (condition->threshold_ratio.set) {
385 uint64_t val;
386
387 val = condition->threshold_ratio.value * (double) UINT32_MAX;
388 hash ^= hash_key_u64(&val, lttng_ht_seed);
389 } else if (condition->threshold_bytes.set) {
390 uint64_t val;
391
392 val = condition->threshold_bytes.value;
393 hash ^= hash_key_u64(&val, lttng_ht_seed);
394 }
395 return hash;
396 }
397
398 static
399 unsigned long lttng_condition_session_consumed_size_hash(
400 const struct lttng_condition *_condition)
401 {
402 unsigned long hash;
403 unsigned long condition_type =
404 (unsigned long) LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE;
405 struct lttng_condition_session_consumed_size *condition;
406 uint64_t val;
407
408 condition = container_of(_condition,
409 struct lttng_condition_session_consumed_size, parent);
410
411 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
412 if (condition->session_name) {
413 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
414 }
415 val = condition->consumed_threshold_bytes.value;
416 hash ^= hash_key_u64(&val, lttng_ht_seed);
417 return hash;
418 }
419
420 static
421 unsigned long lttng_condition_session_rotation_hash(
422 const struct lttng_condition *_condition)
423 {
424 unsigned long hash, condition_type;
425 struct lttng_condition_session_rotation *condition;
426
427 condition = container_of(_condition,
428 struct lttng_condition_session_rotation, parent);
429 condition_type = (unsigned long) condition->parent.type;
430 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
431 assert(condition->session_name);
432 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
433 return hash;
434 }
435
436 static
437 unsigned long lttng_condition_event_rule_hash(
438 const struct lttng_condition *_condition)
439 {
440 unsigned long hash, condition_type;
441 struct lttng_condition_event_rule *condition;
442
443 condition = container_of(_condition,
444 struct lttng_condition_event_rule, parent);
445 condition_type = (unsigned long) condition->parent.type;
446 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
447
448 /* TODO: further hasg using the event rule? on pattern maybe?*/
449 return hash;
450 }
451
452 /*
453 * The lttng_condition hashing code is kept in this file (rather than
454 * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
455 * don't want to link in liblttng-ctl.
456 */
457 static
458 unsigned long lttng_condition_hash(const struct lttng_condition *condition)
459 {
460 switch (condition->type) {
461 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
462 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
463 return lttng_condition_buffer_usage_hash(condition);
464 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
465 return lttng_condition_session_consumed_size_hash(condition);
466 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
467 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
468 return lttng_condition_session_rotation_hash(condition);
469 case LTTNG_CONDITION_TYPE_EVENT_RULE_HIT:
470 return lttng_condition_event_rule_hash(condition);
471 default:
472 ERR("[notification-thread] Unexpected condition type caught");
473 abort();
474 }
475 }
476
477 static
478 unsigned long hash_channel_key(struct channel_key *key)
479 {
480 unsigned long key_hash = hash_key_u64(&key->key, lttng_ht_seed);
481 unsigned long domain_hash = hash_key_ulong(
482 (void *) (unsigned long) key->domain, lttng_ht_seed);
483
484 return key_hash ^ domain_hash;
485 }
486
487 static
488 unsigned long hash_client_socket(int socket)
489 {
490 return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed);
491 }
492
493 static
494 unsigned long hash_client_id(notification_client_id id)
495 {
496 return hash_key_u64(&id, lttng_ht_seed);
497 }
498
499 /*
500 * Get the type of object to which a given condition applies. Bindings let
501 * the notification system evaluate a trigger's condition when a given
502 * object's state is updated.
503 *
504 * For instance, a condition bound to a channel will be evaluated everytime
505 * the channel's state is changed by a channel monitoring sample.
506 */
507 static
508 enum lttng_object_type get_condition_binding_object(
509 const struct lttng_condition *condition)
510 {
511 switch (lttng_condition_get_type(condition)) {
512 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
513 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
514 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
515 return LTTNG_OBJECT_TYPE_CHANNEL;
516 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
517 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
518 return LTTNG_OBJECT_TYPE_SESSION;
519 case LTTNG_CONDITION_TYPE_EVENT_RULE_HIT:
520 return LTTNG_OBJECT_TYPE_NONE;
521 default:
522 return LTTNG_OBJECT_TYPE_UNKNOWN;
523 }
524 }
525
526 static
527 void free_channel_info_rcu(struct rcu_head *node)
528 {
529 free(caa_container_of(node, struct channel_info, rcu_node));
530 }
531
532 static
533 void channel_info_destroy(struct channel_info *channel_info)
534 {
535 if (!channel_info) {
536 return;
537 }
538
539 if (channel_info->session_info) {
540 session_info_remove_channel(channel_info->session_info,
541 channel_info);
542 session_info_put(channel_info->session_info);
543 }
544 if (channel_info->name) {
545 free(channel_info->name);
546 }
547 call_rcu(&channel_info->rcu_node, free_channel_info_rcu);
548 }
549
550 static
551 void free_session_info_rcu(struct rcu_head *node)
552 {
553 free(caa_container_of(node, struct session_info, rcu_node));
554 }
555
556 /* Don't call directly, use the ref-counting mechanism. */
557 static
558 void session_info_destroy(void *_data)
559 {
560 struct session_info *session_info = _data;
561 int ret;
562
563 assert(session_info);
564 if (session_info->channel_infos_ht) {
565 ret = cds_lfht_destroy(session_info->channel_infos_ht, NULL);
566 if (ret) {
567 ERR("[notification-thread] Failed to destroy channel information hash table");
568 }
569 }
570 lttng_session_trigger_list_destroy(session_info->trigger_list);
571
572 rcu_read_lock();
573 cds_lfht_del(session_info->sessions_ht,
574 &session_info->sessions_ht_node);
575 rcu_read_unlock();
576 free(session_info->name);
577 call_rcu(&session_info->rcu_node, free_session_info_rcu);
578 }
579
580 static
581 void session_info_get(struct session_info *session_info)
582 {
583 if (!session_info) {
584 return;
585 }
586 lttng_ref_get(&session_info->ref);
587 }
588
589 static
590 void session_info_put(struct session_info *session_info)
591 {
592 if (!session_info) {
593 return;
594 }
595 lttng_ref_put(&session_info->ref);
596 }
597
598 static
599 struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid,
600 struct lttng_session_trigger_list *trigger_list,
601 struct cds_lfht *sessions_ht)
602 {
603 struct session_info *session_info;
604
605 assert(name);
606
607 session_info = zmalloc(sizeof(*session_info));
608 if (!session_info) {
609 goto end;
610 }
611 lttng_ref_init(&session_info->ref, session_info_destroy);
612
613 session_info->channel_infos_ht = cds_lfht_new(DEFAULT_HT_SIZE,
614 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
615 if (!session_info->channel_infos_ht) {
616 goto error;
617 }
618
619 cds_lfht_node_init(&session_info->sessions_ht_node);
620 session_info->name = strdup(name);
621 if (!session_info->name) {
622 goto error;
623 }
624 session_info->uid = uid;
625 session_info->gid = gid;
626 session_info->trigger_list = trigger_list;
627 session_info->sessions_ht = sessions_ht;
628 end:
629 return session_info;
630 error:
631 session_info_put(session_info);
632 return NULL;
633 }
634
635 static
636 void session_info_add_channel(struct session_info *session_info,
637 struct channel_info *channel_info)
638 {
639 rcu_read_lock();
640 cds_lfht_add(session_info->channel_infos_ht,
641 hash_channel_key(&channel_info->key),
642 &channel_info->session_info_channels_ht_node);
643 rcu_read_unlock();
644 }
645
646 static
647 void session_info_remove_channel(struct session_info *session_info,
648 struct channel_info *channel_info)
649 {
650 rcu_read_lock();
651 cds_lfht_del(session_info->channel_infos_ht,
652 &channel_info->session_info_channels_ht_node);
653 rcu_read_unlock();
654 }
655
656 static
657 struct channel_info *channel_info_create(const char *channel_name,
658 struct channel_key *channel_key, uint64_t channel_capacity,
659 struct session_info *session_info)
660 {
661 struct channel_info *channel_info = zmalloc(sizeof(*channel_info));
662
663 if (!channel_info) {
664 goto end;
665 }
666
667 cds_lfht_node_init(&channel_info->channels_ht_node);
668 cds_lfht_node_init(&channel_info->session_info_channels_ht_node);
669 memcpy(&channel_info->key, channel_key, sizeof(*channel_key));
670 channel_info->capacity = channel_capacity;
671
672 channel_info->name = strdup(channel_name);
673 if (!channel_info->name) {
674 goto error;
675 }
676
677 /*
678 * Set the references between session and channel infos:
679 * - channel_info holds a strong reference to session_info
680 * - session_info holds a weak reference to channel_info
681 */
682 session_info_get(session_info);
683 session_info_add_channel(session_info, channel_info);
684 channel_info->session_info = session_info;
685 end:
686 return channel_info;
687 error:
688 channel_info_destroy(channel_info);
689 return NULL;
690 }
691
692 LTTNG_HIDDEN
693 bool notification_client_list_get(struct notification_client_list *list)
694 {
695 return urcu_ref_get_unless_zero(&list->ref);
696 }
697
698 static
699 void free_notification_client_list_rcu(struct rcu_head *node)
700 {
701 free(caa_container_of(node, struct notification_client_list,
702 rcu_node));
703 }
704
705 static
706 void notification_client_list_release(struct urcu_ref *list_ref)
707 {
708 struct notification_client_list *list =
709 container_of(list_ref, typeof(*list), ref);
710 struct notification_client_list_element *client_list_element, *tmp;
711
712 if (list->notification_trigger_clients_ht) {
713 rcu_read_lock();
714 cds_lfht_del(list->notification_trigger_clients_ht,
715 &list->notification_trigger_clients_ht_node);
716 rcu_read_unlock();
717 list->notification_trigger_clients_ht = NULL;
718 }
719 cds_list_for_each_entry_safe(client_list_element, tmp,
720 &list->list, node) {
721 free(client_list_element);
722 }
723 pthread_mutex_destroy(&list->lock);
724 call_rcu(&list->rcu_node, free_notification_client_list_rcu);
725 }
726
727 static
728 struct notification_client_list *notification_client_list_create(
729 const struct lttng_trigger *trigger)
730 {
731 struct notification_client_list *client_list =
732 zmalloc(sizeof(*client_list));
733
734 if (!client_list) {
735 goto error;
736 }
737 pthread_mutex_init(&client_list->lock, NULL);
738 urcu_ref_init(&client_list->ref);
739 cds_lfht_node_init(&client_list->notification_trigger_clients_ht_node);
740 CDS_INIT_LIST_HEAD(&client_list->list);
741 client_list->trigger = trigger;
742 error:
743 return client_list;
744 }
745
746 static
747 void publish_notification_client_list(
748 struct notification_thread_state *state,
749 struct notification_client_list *list)
750 {
751 const struct lttng_condition *condition =
752 lttng_trigger_get_const_condition(list->trigger);
753
754 assert(!list->notification_trigger_clients_ht);
755
756 list->notification_trigger_clients_ht =
757 state->notification_trigger_clients_ht;
758
759 rcu_read_lock();
760 cds_lfht_add(state->notification_trigger_clients_ht,
761 lttng_condition_hash(condition),
762 &list->notification_trigger_clients_ht_node);
763 rcu_read_unlock();
764 }
765
766 LTTNG_HIDDEN
767 void notification_client_list_put(struct notification_client_list *list)
768 {
769 if (!list) {
770 return;
771 }
772 return urcu_ref_put(&list->ref, notification_client_list_release);
773 }
774
775 /* Provides a reference to the returned list. */
776 static
777 struct notification_client_list *get_client_list_from_condition(
778 struct notification_thread_state *state,
779 const struct lttng_condition *condition)
780 {
781 struct cds_lfht_node *node;
782 struct cds_lfht_iter iter;
783 struct notification_client_list *list = NULL;
784
785 rcu_read_lock();
786 cds_lfht_lookup(state->notification_trigger_clients_ht,
787 lttng_condition_hash(condition),
788 match_client_list_condition,
789 condition,
790 &iter);
791 node = cds_lfht_iter_get_node(&iter);
792 if (node) {
793 list = container_of(node, struct notification_client_list,
794 notification_trigger_clients_ht_node);
795 list = notification_client_list_get(list) ? list : NULL;
796 }
797 rcu_read_unlock();
798 return list;
799 }
800
801 static
802 int evaluate_channel_condition_for_client(
803 const struct lttng_condition *condition,
804 struct notification_thread_state *state,
805 struct lttng_evaluation **evaluation,
806 uid_t *session_uid, gid_t *session_gid)
807 {
808 int ret;
809 struct cds_lfht_iter iter;
810 struct cds_lfht_node *node;
811 struct channel_info *channel_info = NULL;
812 struct channel_key *channel_key = NULL;
813 struct channel_state_sample *last_sample = NULL;
814 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
815
816 rcu_read_lock();
817
818 /* Find the channel associated with the condition. */
819 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter,
820 channel_trigger_list, channel_triggers_ht_node) {
821 struct lttng_trigger_list_element *element;
822
823 cds_list_for_each_entry(element, &channel_trigger_list->list, node) {
824 const struct lttng_condition *current_condition =
825 lttng_trigger_get_const_condition(
826 element->trigger);
827
828 assert(current_condition);
829 if (!lttng_condition_is_equal(condition,
830 current_condition)) {
831 continue;
832 }
833
834 /* Found the trigger, save the channel key. */
835 channel_key = &channel_trigger_list->channel_key;
836 break;
837 }
838 if (channel_key) {
839 /* The channel key was found stop iteration. */
840 break;
841 }
842 }
843
844 if (!channel_key){
845 /* No channel found; normal exit. */
846 DBG("[notification-thread] No known channel associated with newly subscribed-to condition");
847 ret = 0;
848 goto end;
849 }
850
851 /* Fetch channel info for the matching channel. */
852 cds_lfht_lookup(state->channels_ht,
853 hash_channel_key(channel_key),
854 match_channel_info,
855 channel_key,
856 &iter);
857 node = cds_lfht_iter_get_node(&iter);
858 assert(node);
859 channel_info = caa_container_of(node, struct channel_info,
860 channels_ht_node);
861
862 /* Retrieve the channel's last sample, if it exists. */
863 cds_lfht_lookup(state->channel_state_ht,
864 hash_channel_key(channel_key),
865 match_channel_state_sample,
866 channel_key,
867 &iter);
868 node = cds_lfht_iter_get_node(&iter);
869 if (node) {
870 last_sample = caa_container_of(node,
871 struct channel_state_sample,
872 channel_state_ht_node);
873 } else {
874 /* Nothing to evaluate, no sample was ever taken. Normal exit */
875 DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
876 ret = 0;
877 goto end;
878 }
879
880 ret = evaluate_buffer_condition(condition, evaluation, state,
881 NULL, last_sample,
882 0, channel_info->session_info->consumed_data_size,
883 channel_info);
884 if (ret) {
885 WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
886 goto end;
887 }
888
889 *session_uid = channel_info->session_info->uid;
890 *session_gid = channel_info->session_info->gid;
891 end:
892 rcu_read_unlock();
893 return ret;
894 }
895
896 static
897 const char *get_condition_session_name(const struct lttng_condition *condition)
898 {
899 const char *session_name = NULL;
900 enum lttng_condition_status status;
901
902 switch (lttng_condition_get_type(condition)) {
903 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
904 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
905 status = lttng_condition_buffer_usage_get_session_name(
906 condition, &session_name);
907 break;
908 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
909 status = lttng_condition_session_consumed_size_get_session_name(
910 condition, &session_name);
911 break;
912 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
913 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
914 status = lttng_condition_session_rotation_get_session_name(
915 condition, &session_name);
916 break;
917 default:
918 abort();
919 }
920 if (status != LTTNG_CONDITION_STATUS_OK) {
921 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
922 goto end;
923 }
924 end:
925 return session_name;
926 }
927
928 static
929 int evaluate_session_condition_for_client(
930 const struct lttng_condition *condition,
931 struct notification_thread_state *state,
932 struct lttng_evaluation **evaluation,
933 uid_t *session_uid, gid_t *session_gid)
934 {
935 int ret;
936 struct cds_lfht_iter iter;
937 struct cds_lfht_node *node;
938 const char *session_name;
939 struct session_info *session_info = NULL;
940
941 rcu_read_lock();
942 session_name = get_condition_session_name(condition);
943
944 /* Find the session associated with the trigger. */
945 cds_lfht_lookup(state->sessions_ht,
946 hash_key_str(session_name, lttng_ht_seed),
947 match_session,
948 session_name,
949 &iter);
950 node = cds_lfht_iter_get_node(&iter);
951 if (!node) {
952 DBG("[notification-thread] No known session matching name \"%s\"",
953 session_name);
954 ret = 0;
955 goto end;
956 }
957
958 session_info = caa_container_of(node, struct session_info,
959 sessions_ht_node);
960 session_info_get(session_info);
961
962 /*
963 * Evaluation is performed in-line here since only one type of
964 * session-bound condition is handled for the moment.
965 */
966 switch (lttng_condition_get_type(condition)) {
967 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
968 if (!session_info->rotation.ongoing) {
969 ret = 0;
970 goto end_session_put;
971 }
972
973 *evaluation = lttng_evaluation_session_rotation_ongoing_create(
974 session_info->rotation.id);
975 if (!*evaluation) {
976 /* Fatal error. */
977 ERR("[notification-thread] Failed to create session rotation ongoing evaluation for session \"%s\"",
978 session_info->name);
979 ret = -1;
980 goto end_session_put;
981 }
982 ret = 0;
983 break;
984 default:
985 ret = 0;
986 goto end_session_put;
987 }
988
989 *session_uid = session_info->uid;
990 *session_gid = session_info->gid;
991
992 end_session_put:
993 session_info_put(session_info);
994 end:
995 rcu_read_unlock();
996 return ret;
997 }
998
999 static
1000 int evaluate_condition_for_client(const struct lttng_trigger *trigger,
1001 const struct lttng_condition *condition,
1002 struct notification_client *client,
1003 struct notification_thread_state *state)
1004 {
1005 int ret;
1006 struct lttng_evaluation *evaluation = NULL;
1007 struct notification_client_list client_list = {
1008 .lock = PTHREAD_MUTEX_INITIALIZER,
1009 };
1010 struct notification_client_list_element client_list_element = { 0 };
1011 uid_t object_uid = 0;
1012 gid_t object_gid = 0;
1013
1014 assert(trigger);
1015 assert(condition);
1016 assert(client);
1017 assert(state);
1018
1019 switch (get_condition_binding_object(condition)) {
1020 case LTTNG_OBJECT_TYPE_SESSION:
1021 ret = evaluate_session_condition_for_client(condition, state,
1022 &evaluation, &object_uid, &object_gid);
1023 break;
1024 case LTTNG_OBJECT_TYPE_CHANNEL:
1025 ret = evaluate_channel_condition_for_client(condition, state,
1026 &evaluation, &object_uid, &object_gid);
1027 break;
1028 case LTTNG_OBJECT_TYPE_NONE:
1029 DBG("[notification-thread] Newly subscribed-to condition not binded to object, nothing to evaluate");
1030 ret = 0;
1031 goto end;
1032 case LTTNG_OBJECT_TYPE_UNKNOWN:
1033 default:
1034 ret = -1;
1035 goto end;
1036 }
1037 if (ret) {
1038 /* Fatal error. */
1039 goto end;
1040 }
1041 if (!evaluation) {
1042 /* Evaluation yielded nothing. Normal exit. */
1043 DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
1044 ret = 0;
1045 goto end;
1046 }
1047
1048 /*
1049 * Create a temporary client list with the client currently
1050 * subscribing.
1051 */
1052 cds_lfht_node_init(&client_list.notification_trigger_clients_ht_node);
1053 CDS_INIT_LIST_HEAD(&client_list.list);
1054 client_list.trigger = trigger;
1055
1056 CDS_INIT_LIST_HEAD(&client_list_element.node);
1057 client_list_element.client = client;
1058 cds_list_add(&client_list_element.node, &client_list.list);
1059
1060 /* Send evaluation result to the newly-subscribed client. */
1061 DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
1062 ret = send_evaluation_to_clients(trigger, evaluation, &client_list,
1063 state, object_uid, object_gid);
1064
1065 end:
1066 return ret;
1067 }
1068
1069 static
1070 int notification_thread_client_subscribe(struct notification_client *client,
1071 struct lttng_condition *condition,
1072 struct notification_thread_state *state,
1073 enum lttng_notification_channel_status *_status)
1074 {
1075 int ret = 0;
1076 struct notification_client_list *client_list = NULL;
1077 struct lttng_condition_list_element *condition_list_element = NULL;
1078 struct notification_client_list_element *client_list_element = NULL;
1079 enum lttng_notification_channel_status status =
1080 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1081
1082 /*
1083 * Ensure that the client has not already subscribed to this condition
1084 * before.
1085 */
1086 cds_list_for_each_entry(condition_list_element, &client->condition_list, node) {
1087 if (lttng_condition_is_equal(condition_list_element->condition,
1088 condition)) {
1089 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED;
1090 goto end;
1091 }
1092 }
1093
1094 condition_list_element = zmalloc(sizeof(*condition_list_element));
1095 if (!condition_list_element) {
1096 ret = -1;
1097 goto error;
1098 }
1099 client_list_element = zmalloc(sizeof(*client_list_element));
1100 if (!client_list_element) {
1101 ret = -1;
1102 goto error;
1103 }
1104
1105 /*
1106 * Add the newly-subscribed condition to the client's subscription list.
1107 */
1108 CDS_INIT_LIST_HEAD(&condition_list_element->node);
1109 condition_list_element->condition = condition;
1110 cds_list_add(&condition_list_element->node, &client->condition_list);
1111
1112 client_list = get_client_list_from_condition(state, condition);
1113 if (!client_list) {
1114 /*
1115 * No notification-emiting trigger registered with this
1116 * condition. We don't evaluate the condition right away
1117 * since this trigger is not registered yet.
1118 */
1119 free(client_list_element);
1120 goto end;
1121 }
1122
1123 /*
1124 * The condition to which the client just subscribed is evaluated
1125 * at this point so that conditions that are already TRUE result
1126 * in a notification being sent out.
1127 *
1128 * The client_list's trigger is used without locking the list itself.
1129 * This is correct since the list doesn't own the trigger and the
1130 * object is immutable.
1131 */
1132 if (evaluate_condition_for_client(client_list->trigger, condition,
1133 client, state)) {
1134 WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
1135 ret = -1;
1136 free(client_list_element);
1137 goto end;
1138 }
1139
1140 /*
1141 * Add the client to the list of clients interested in a given trigger
1142 * if a "notification" trigger with a corresponding condition was
1143 * added prior.
1144 */
1145 client_list_element->client = client;
1146 CDS_INIT_LIST_HEAD(&client_list_element->node);
1147
1148 pthread_mutex_lock(&client_list->lock);
1149 cds_list_add(&client_list_element->node, &client_list->list);
1150 pthread_mutex_unlock(&client_list->lock);
1151 end:
1152 if (_status) {
1153 *_status = status;
1154 }
1155 if (client_list) {
1156 notification_client_list_put(client_list);
1157 }
1158 return ret;
1159 error:
1160 free(condition_list_element);
1161 free(client_list_element);
1162 return ret;
1163 }
1164
1165 static
1166 int notification_thread_client_unsubscribe(
1167 struct notification_client *client,
1168 struct lttng_condition *condition,
1169 struct notification_thread_state *state,
1170 enum lttng_notification_channel_status *_status)
1171 {
1172 struct notification_client_list *client_list;
1173 struct lttng_condition_list_element *condition_list_element,
1174 *condition_tmp;
1175 struct notification_client_list_element *client_list_element,
1176 *client_tmp;
1177 bool condition_found = false;
1178 enum lttng_notification_channel_status status =
1179 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1180
1181 /* Remove the condition from the client's condition list. */
1182 cds_list_for_each_entry_safe(condition_list_element, condition_tmp,
1183 &client->condition_list, node) {
1184 if (!lttng_condition_is_equal(condition_list_element->condition,
1185 condition)) {
1186 continue;
1187 }
1188
1189 cds_list_del(&condition_list_element->node);
1190 /*
1191 * The caller may be iterating on the client's conditions to
1192 * tear down a client's connection. In this case, the condition
1193 * will be destroyed at the end.
1194 */
1195 if (condition != condition_list_element->condition) {
1196 lttng_condition_destroy(
1197 condition_list_element->condition);
1198 }
1199 free(condition_list_element);
1200 condition_found = true;
1201 break;
1202 }
1203
1204 if (!condition_found) {
1205 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION;
1206 goto end;
1207 }
1208
1209 /*
1210 * Remove the client from the list of clients interested the trigger
1211 * matching the condition.
1212 */
1213 client_list = get_client_list_from_condition(state, condition);
1214 if (!client_list) {
1215 goto end;
1216 }
1217
1218 pthread_mutex_lock(&client_list->lock);
1219 cds_list_for_each_entry_safe(client_list_element, client_tmp,
1220 &client_list->list, node) {
1221 if (client_list_element->client->id != client->id) {
1222 continue;
1223 }
1224 cds_list_del(&client_list_element->node);
1225 free(client_list_element);
1226 break;
1227 }
1228 pthread_mutex_unlock(&client_list->lock);
1229 notification_client_list_put(client_list);
1230 client_list = NULL;
1231 end:
1232 lttng_condition_destroy(condition);
1233 if (_status) {
1234 *_status = status;
1235 }
1236 return 0;
1237 }
1238
1239 static
1240 void free_notification_client_rcu(struct rcu_head *node)
1241 {
1242 free(caa_container_of(node, struct notification_client, rcu_node));
1243 }
1244
1245 static
1246 void notification_client_destroy(struct notification_client *client,
1247 struct notification_thread_state *state)
1248 {
1249 if (!client) {
1250 return;
1251 }
1252
1253 /*
1254 * The client object is not reachable by other threads, no need to lock
1255 * the client here.
1256 */
1257 if (client->socket >= 0) {
1258 (void) lttcomm_close_unix_sock(client->socket);
1259 client->socket = -1;
1260 }
1261 client->communication.active = false;
1262 lttng_dynamic_buffer_reset(&client->communication.inbound.buffer);
1263 lttng_dynamic_buffer_reset(&client->communication.outbound.buffer);
1264 pthread_mutex_destroy(&client->lock);
1265 call_rcu(&client->rcu_node, free_notification_client_rcu);
1266 }
1267
1268 /*
1269 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1270 * client pointer).
1271 */
1272 static
1273 struct notification_client *get_client_from_socket(int socket,
1274 struct notification_thread_state *state)
1275 {
1276 struct cds_lfht_iter iter;
1277 struct cds_lfht_node *node;
1278 struct notification_client *client = NULL;
1279
1280 cds_lfht_lookup(state->client_socket_ht,
1281 hash_client_socket(socket),
1282 match_client_socket,
1283 (void *) (unsigned long) socket,
1284 &iter);
1285 node = cds_lfht_iter_get_node(&iter);
1286 if (!node) {
1287 goto end;
1288 }
1289
1290 client = caa_container_of(node, struct notification_client,
1291 client_socket_ht_node);
1292 end:
1293 return client;
1294 }
1295
1296 /*
1297 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1298 * client pointer).
1299 */
1300 static
1301 struct notification_client *get_client_from_id(notification_client_id id,
1302 struct notification_thread_state *state)
1303 {
1304 struct cds_lfht_iter iter;
1305 struct cds_lfht_node *node;
1306 struct notification_client *client = NULL;
1307
1308 cds_lfht_lookup(state->client_id_ht,
1309 hash_client_id(id),
1310 match_client_id,
1311 &id,
1312 &iter);
1313 node = cds_lfht_iter_get_node(&iter);
1314 if (!node) {
1315 goto end;
1316 }
1317
1318 client = caa_container_of(node, struct notification_client,
1319 client_id_ht_node);
1320 end:
1321 return client;
1322 }
1323
1324 static
1325 bool buffer_usage_condition_applies_to_channel(
1326 const struct lttng_condition *condition,
1327 const struct channel_info *channel_info)
1328 {
1329 enum lttng_condition_status status;
1330 enum lttng_domain_type condition_domain;
1331 const char *condition_session_name = NULL;
1332 const char *condition_channel_name = NULL;
1333
1334 status = lttng_condition_buffer_usage_get_domain_type(condition,
1335 &condition_domain);
1336 assert(status == LTTNG_CONDITION_STATUS_OK);
1337 if (channel_info->key.domain != condition_domain) {
1338 goto fail;
1339 }
1340
1341 status = lttng_condition_buffer_usage_get_session_name(
1342 condition, &condition_session_name);
1343 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1344
1345 status = lttng_condition_buffer_usage_get_channel_name(
1346 condition, &condition_channel_name);
1347 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_channel_name);
1348
1349 if (strcmp(channel_info->session_info->name, condition_session_name)) {
1350 goto fail;
1351 }
1352 if (strcmp(channel_info->name, condition_channel_name)) {
1353 goto fail;
1354 }
1355
1356 return true;
1357 fail:
1358 return false;
1359 }
1360
1361 static
1362 bool session_consumed_size_condition_applies_to_channel(
1363 const struct lttng_condition *condition,
1364 const struct channel_info *channel_info)
1365 {
1366 enum lttng_condition_status status;
1367 const char *condition_session_name = NULL;
1368
1369 status = lttng_condition_session_consumed_size_get_session_name(
1370 condition, &condition_session_name);
1371 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1372
1373 if (strcmp(channel_info->session_info->name, condition_session_name)) {
1374 goto fail;
1375 }
1376
1377 return true;
1378 fail:
1379 return false;
1380 }
1381
1382 static
1383 bool trigger_applies_to_channel(const struct lttng_trigger *trigger,
1384 const struct channel_info *channel_info)
1385 {
1386 const struct lttng_condition *condition;
1387 bool trigger_applies;
1388
1389 condition = lttng_trigger_get_const_condition(trigger);
1390 if (!condition) {
1391 goto fail;
1392 }
1393
1394 switch (lttng_condition_get_type(condition)) {
1395 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1396 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1397 trigger_applies = buffer_usage_condition_applies_to_channel(
1398 condition, channel_info);
1399 break;
1400 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
1401 trigger_applies = session_consumed_size_condition_applies_to_channel(
1402 condition, channel_info);
1403 break;
1404 default:
1405 goto fail;
1406 }
1407
1408 return trigger_applies;
1409 fail:
1410 return false;
1411 }
1412
1413 static
1414 bool trigger_applies_to_client(struct lttng_trigger *trigger,
1415 struct notification_client *client)
1416 {
1417 bool applies = false;
1418 struct lttng_condition_list_element *condition_list_element;
1419
1420 cds_list_for_each_entry(condition_list_element, &client->condition_list,
1421 node) {
1422 applies = lttng_condition_is_equal(
1423 condition_list_element->condition,
1424 lttng_trigger_get_condition(trigger));
1425 if (applies) {
1426 break;
1427 }
1428 }
1429 return applies;
1430 }
1431
1432 /* Must be called with RCU read lock held. */
1433 static
1434 struct lttng_session_trigger_list *get_session_trigger_list(
1435 struct notification_thread_state *state,
1436 const char *session_name)
1437 {
1438 struct lttng_session_trigger_list *list = NULL;
1439 struct cds_lfht_node *node;
1440 struct cds_lfht_iter iter;
1441
1442 cds_lfht_lookup(state->session_triggers_ht,
1443 hash_key_str(session_name, lttng_ht_seed),
1444 match_session_trigger_list,
1445 session_name,
1446 &iter);
1447 node = cds_lfht_iter_get_node(&iter);
1448 if (!node) {
1449 /*
1450 * Not an error, the list of triggers applying to that session
1451 * will be initialized when the session is created.
1452 */
1453 DBG("[notification-thread] No trigger list found for session \"%s\" as it is not yet known to the notification system",
1454 session_name);
1455 goto end;
1456 }
1457
1458 list = caa_container_of(node,
1459 struct lttng_session_trigger_list,
1460 session_triggers_ht_node);
1461 end:
1462 return list;
1463 }
1464
1465 /*
1466 * Allocate an empty lttng_session_trigger_list for the session named
1467 * 'session_name'.
1468 *
1469 * No ownership of 'session_name' is assumed by the session trigger list.
1470 * It is the caller's responsability to ensure the session name is alive
1471 * for as long as this list is.
1472 */
1473 static
1474 struct lttng_session_trigger_list *lttng_session_trigger_list_create(
1475 const char *session_name,
1476 struct cds_lfht *session_triggers_ht)
1477 {
1478 struct lttng_session_trigger_list *list;
1479
1480 list = zmalloc(sizeof(*list));
1481 if (!list) {
1482 goto end;
1483 }
1484 list->session_name = session_name;
1485 CDS_INIT_LIST_HEAD(&list->list);
1486 cds_lfht_node_init(&list->session_triggers_ht_node);
1487 list->session_triggers_ht = session_triggers_ht;
1488
1489 rcu_read_lock();
1490 /* Publish the list through the session_triggers_ht. */
1491 cds_lfht_add(session_triggers_ht,
1492 hash_key_str(session_name, lttng_ht_seed),
1493 &list->session_triggers_ht_node);
1494 rcu_read_unlock();
1495 end:
1496 return list;
1497 }
1498
1499 static
1500 void free_session_trigger_list_rcu(struct rcu_head *node)
1501 {
1502 free(caa_container_of(node, struct lttng_session_trigger_list,
1503 rcu_node));
1504 }
1505
1506 static
1507 void lttng_session_trigger_list_destroy(struct lttng_session_trigger_list *list)
1508 {
1509 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1510
1511 /* Empty the list element by element, and then free the list itself. */
1512 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1513 &list->list, node) {
1514 cds_list_del(&trigger_list_element->node);
1515 free(trigger_list_element);
1516 }
1517 rcu_read_lock();
1518 /* Unpublish the list from the session_triggers_ht. */
1519 cds_lfht_del(list->session_triggers_ht,
1520 &list->session_triggers_ht_node);
1521 rcu_read_unlock();
1522 call_rcu(&list->rcu_node, free_session_trigger_list_rcu);
1523 }
1524
1525 static
1526 int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
1527 struct lttng_trigger *trigger)
1528 {
1529 int ret = 0;
1530 struct lttng_trigger_list_element *new_element =
1531 zmalloc(sizeof(*new_element));
1532
1533 if (!new_element) {
1534 ret = -1;
1535 goto end;
1536 }
1537 CDS_INIT_LIST_HEAD(&new_element->node);
1538 new_element->trigger = trigger;
1539 cds_list_add(&new_element->node, &list->list);
1540 end:
1541 return ret;
1542 }
1543
1544 static
1545 bool trigger_applies_to_session(const struct lttng_trigger *trigger,
1546 const char *session_name)
1547 {
1548 bool applies = false;
1549 const struct lttng_condition *condition;
1550
1551 condition = lttng_trigger_get_const_condition(trigger);
1552 switch (lttng_condition_get_type(condition)) {
1553 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1554 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
1555 {
1556 enum lttng_condition_status condition_status;
1557 const char *condition_session_name;
1558
1559 condition_status = lttng_condition_session_rotation_get_session_name(
1560 condition, &condition_session_name);
1561 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
1562 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
1563 goto end;
1564 }
1565
1566 assert(condition_session_name);
1567 applies = !strcmp(condition_session_name, session_name);
1568 break;
1569 }
1570 default:
1571 goto end;
1572 }
1573 end:
1574 return applies;
1575 }
1576
1577 /*
1578 * Allocate and initialize an lttng_session_trigger_list which contains
1579 * all triggers that apply to the session named 'session_name'.
1580 *
1581 * No ownership of 'session_name' is assumed by the session trigger list.
1582 * It is the caller's responsability to ensure the session name is alive
1583 * for as long as this list is.
1584 */
1585 static
1586 struct lttng_session_trigger_list *lttng_session_trigger_list_build(
1587 const struct notification_thread_state *state,
1588 const char *session_name)
1589 {
1590 int trigger_count = 0;
1591 struct lttng_session_trigger_list *session_trigger_list = NULL;
1592 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1593 struct cds_lfht_iter iter;
1594
1595 session_trigger_list = lttng_session_trigger_list_create(session_name,
1596 state->session_triggers_ht);
1597
1598 /* Add all triggers applying to the session named 'session_name'. */
1599 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1600 node) {
1601 int ret;
1602
1603 if (!trigger_applies_to_session(trigger_ht_element->trigger,
1604 session_name)) {
1605 continue;
1606 }
1607
1608 ret = lttng_session_trigger_list_add(session_trigger_list,
1609 trigger_ht_element->trigger);
1610 if (ret) {
1611 goto error;
1612 }
1613
1614 trigger_count++;
1615 }
1616
1617 DBG("[notification-thread] Found %i triggers that apply to newly created session",
1618 trigger_count);
1619 return session_trigger_list;
1620 error:
1621 lttng_session_trigger_list_destroy(session_trigger_list);
1622 return NULL;
1623 }
1624
1625 static
1626 struct session_info *find_or_create_session_info(
1627 struct notification_thread_state *state,
1628 const char *name, uid_t uid, gid_t gid)
1629 {
1630 struct session_info *session = NULL;
1631 struct cds_lfht_node *node;
1632 struct cds_lfht_iter iter;
1633 struct lttng_session_trigger_list *trigger_list;
1634
1635 rcu_read_lock();
1636 cds_lfht_lookup(state->sessions_ht,
1637 hash_key_str(name, lttng_ht_seed),
1638 match_session,
1639 name,
1640 &iter);
1641 node = cds_lfht_iter_get_node(&iter);
1642 if (node) {
1643 DBG("[notification-thread] Found session info of session \"%s\" (uid = %i, gid = %i)",
1644 name, uid, gid);
1645 session = caa_container_of(node, struct session_info,
1646 sessions_ht_node);
1647 assert(session->uid == uid);
1648 assert(session->gid == gid);
1649 session_info_get(session);
1650 goto end;
1651 }
1652
1653 trigger_list = lttng_session_trigger_list_build(state, name);
1654 if (!trigger_list) {
1655 goto error;
1656 }
1657
1658 session = session_info_create(name, uid, gid, trigger_list,
1659 state->sessions_ht);
1660 if (!session) {
1661 ERR("[notification-thread] Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)",
1662 name, uid, gid);
1663 lttng_session_trigger_list_destroy(trigger_list);
1664 goto error;
1665 }
1666 trigger_list = NULL;
1667
1668 cds_lfht_add(state->sessions_ht, hash_key_str(name, lttng_ht_seed),
1669 &session->sessions_ht_node);
1670 end:
1671 rcu_read_unlock();
1672 return session;
1673 error:
1674 rcu_read_unlock();
1675 session_info_put(session);
1676 return NULL;
1677 }
1678
1679 static
1680 int handle_notification_thread_command_add_channel(
1681 struct notification_thread_state *state,
1682 const char *session_name, uid_t session_uid, gid_t session_gid,
1683 const char *channel_name, enum lttng_domain_type channel_domain,
1684 uint64_t channel_key_int, uint64_t channel_capacity,
1685 enum lttng_error_code *cmd_result)
1686 {
1687 struct cds_list_head trigger_list;
1688 struct channel_info *new_channel_info = NULL;
1689 struct channel_key channel_key = {
1690 .key = channel_key_int,
1691 .domain = channel_domain,
1692 };
1693 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
1694 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1695 int trigger_count = 0;
1696 struct cds_lfht_iter iter;
1697 struct session_info *session_info = NULL;
1698
1699 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
1700 channel_name, session_name, channel_key_int,
1701 channel_domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
1702
1703 CDS_INIT_LIST_HEAD(&trigger_list);
1704
1705 session_info = find_or_create_session_info(state, session_name,
1706 session_uid, session_gid);
1707 if (!session_info) {
1708 /* Allocation error or an internal error occurred. */
1709 goto error;
1710 }
1711
1712 new_channel_info = channel_info_create(channel_name, &channel_key,
1713 channel_capacity, session_info);
1714 if (!new_channel_info) {
1715 goto error;
1716 }
1717
1718 rcu_read_lock();
1719 /* Build a list of all triggers applying to the new channel. */
1720 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1721 node) {
1722 struct lttng_trigger_list_element *new_element;
1723
1724 if (!trigger_applies_to_channel(trigger_ht_element->trigger,
1725 new_channel_info)) {
1726 continue;
1727 }
1728
1729 new_element = zmalloc(sizeof(*new_element));
1730 if (!new_element) {
1731 rcu_read_unlock();
1732 goto error;
1733 }
1734 CDS_INIT_LIST_HEAD(&new_element->node);
1735 new_element->trigger = trigger_ht_element->trigger;
1736 cds_list_add(&new_element->node, &trigger_list);
1737 trigger_count++;
1738 }
1739 rcu_read_unlock();
1740
1741 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
1742 trigger_count);
1743 channel_trigger_list = zmalloc(sizeof(*channel_trigger_list));
1744 if (!channel_trigger_list) {
1745 goto error;
1746 }
1747 channel_trigger_list->channel_key = new_channel_info->key;
1748 CDS_INIT_LIST_HEAD(&channel_trigger_list->list);
1749 cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
1750 cds_list_splice(&trigger_list, &channel_trigger_list->list);
1751
1752 rcu_read_lock();
1753 /* Add channel to the channel_ht which owns the channel_infos. */
1754 cds_lfht_add(state->channels_ht,
1755 hash_channel_key(&new_channel_info->key),
1756 &new_channel_info->channels_ht_node);
1757 /*
1758 * Add the list of triggers associated with this channel to the
1759 * channel_triggers_ht.
1760 */
1761 cds_lfht_add(state->channel_triggers_ht,
1762 hash_channel_key(&new_channel_info->key),
1763 &channel_trigger_list->channel_triggers_ht_node);
1764 rcu_read_unlock();
1765 session_info_put(session_info);
1766 *cmd_result = LTTNG_OK;
1767 return 0;
1768 error:
1769 channel_info_destroy(new_channel_info);
1770 session_info_put(session_info);
1771 return 1;
1772 }
1773
1774 static
1775 void free_channel_trigger_list_rcu(struct rcu_head *node)
1776 {
1777 free(caa_container_of(node, struct lttng_channel_trigger_list,
1778 rcu_node));
1779 }
1780
1781 static
1782 void free_channel_state_sample_rcu(struct rcu_head *node)
1783 {
1784 free(caa_container_of(node, struct channel_state_sample,
1785 rcu_node));
1786 }
1787
1788 static
1789 int handle_notification_thread_command_remove_channel(
1790 struct notification_thread_state *state,
1791 uint64_t channel_key, enum lttng_domain_type domain,
1792 enum lttng_error_code *cmd_result)
1793 {
1794 struct cds_lfht_node *node;
1795 struct cds_lfht_iter iter;
1796 struct lttng_channel_trigger_list *trigger_list;
1797 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1798 struct channel_key key = { .key = channel_key, .domain = domain };
1799 struct channel_info *channel_info;
1800
1801 DBG("[notification-thread] Removing channel key = %" PRIu64 " in %s domain",
1802 channel_key, domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
1803
1804 rcu_read_lock();
1805
1806 cds_lfht_lookup(state->channel_triggers_ht,
1807 hash_channel_key(&key),
1808 match_channel_trigger_list,
1809 &key,
1810 &iter);
1811 node = cds_lfht_iter_get_node(&iter);
1812 /*
1813 * There is a severe internal error if we are being asked to remove a
1814 * channel that doesn't exist.
1815 */
1816 if (!node) {
1817 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
1818 goto end;
1819 }
1820
1821 /* Free the list of triggers associated with this channel. */
1822 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
1823 channel_triggers_ht_node);
1824 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1825 &trigger_list->list, node) {
1826 cds_list_del(&trigger_list_element->node);
1827 free(trigger_list_element);
1828 }
1829 cds_lfht_del(state->channel_triggers_ht, node);
1830 call_rcu(&trigger_list->rcu_node, free_channel_trigger_list_rcu);
1831
1832 /* Free sampled channel state. */
1833 cds_lfht_lookup(state->channel_state_ht,
1834 hash_channel_key(&key),
1835 match_channel_state_sample,
1836 &key,
1837 &iter);
1838 node = cds_lfht_iter_get_node(&iter);
1839 /*
1840 * This is expected to be NULL if the channel is destroyed before we
1841 * received a sample.
1842 */
1843 if (node) {
1844 struct channel_state_sample *sample = caa_container_of(node,
1845 struct channel_state_sample,
1846 channel_state_ht_node);
1847
1848 cds_lfht_del(state->channel_state_ht, node);
1849 call_rcu(&sample->rcu_node, free_channel_state_sample_rcu);
1850 }
1851
1852 /* Remove the channel from the channels_ht and free it. */
1853 cds_lfht_lookup(state->channels_ht,
1854 hash_channel_key(&key),
1855 match_channel_info,
1856 &key,
1857 &iter);
1858 node = cds_lfht_iter_get_node(&iter);
1859 assert(node);
1860 channel_info = caa_container_of(node, struct channel_info,
1861 channels_ht_node);
1862 cds_lfht_del(state->channels_ht, node);
1863 channel_info_destroy(channel_info);
1864 end:
1865 rcu_read_unlock();
1866 *cmd_result = LTTNG_OK;
1867 return 0;
1868 }
1869
1870 static
1871 int handle_notification_thread_command_session_rotation(
1872 struct notification_thread_state *state,
1873 enum notification_thread_command_type cmd_type,
1874 const char *session_name, uid_t session_uid, gid_t session_gid,
1875 uint64_t trace_archive_chunk_id,
1876 struct lttng_trace_archive_location *location,
1877 enum lttng_error_code *_cmd_result)
1878 {
1879 int ret = 0;
1880 enum lttng_error_code cmd_result = LTTNG_OK;
1881 struct lttng_session_trigger_list *trigger_list;
1882 struct lttng_trigger_list_element *trigger_list_element;
1883 struct session_info *session_info;
1884
1885 rcu_read_lock();
1886
1887 session_info = find_or_create_session_info(state, session_name,
1888 session_uid, session_gid);
1889 if (!session_info) {
1890 /* Allocation error or an internal error occurred. */
1891 ret = -1;
1892 cmd_result = LTTNG_ERR_NOMEM;
1893 goto end;
1894 }
1895
1896 session_info->rotation.ongoing =
1897 cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING;
1898 session_info->rotation.id = trace_archive_chunk_id;
1899 trigger_list = get_session_trigger_list(state, session_name);
1900 if (!trigger_list) {
1901 DBG("[notification-thread] No triggers applying to session \"%s\" found",
1902 session_name);
1903 goto end;
1904 }
1905
1906 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
1907 node) {
1908 const struct lttng_condition *condition;
1909 const struct lttng_action *action;
1910 const struct lttng_trigger *trigger;
1911 struct notification_client_list *client_list;
1912 struct lttng_evaluation *evaluation = NULL;
1913 enum lttng_condition_type condition_type;
1914 bool client_list_is_empty;
1915
1916 trigger = trigger_list_element->trigger;
1917 condition = lttng_trigger_get_const_condition(trigger);
1918 assert(condition);
1919 condition_type = lttng_condition_get_type(condition);
1920
1921 if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING &&
1922 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1923 continue;
1924 } else if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED &&
1925 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED) {
1926 continue;
1927 }
1928
1929 action = lttng_trigger_get_const_action(trigger);
1930
1931 /* Notify actions are the only type currently supported. */
1932 assert(lttng_action_get_type_const(action) ==
1933 LTTNG_ACTION_TYPE_NOTIFY);
1934
1935 client_list = get_client_list_from_condition(state, condition);
1936 assert(client_list);
1937
1938 pthread_mutex_lock(&client_list->lock);
1939 client_list_is_empty = cds_list_empty(&client_list->list);
1940 pthread_mutex_unlock(&client_list->lock);
1941 if (client_list_is_empty) {
1942 /*
1943 * No clients interested in the evaluation's result,
1944 * skip it.
1945 */
1946 continue;
1947 }
1948
1949 if (cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1950 evaluation = lttng_evaluation_session_rotation_ongoing_create(
1951 trace_archive_chunk_id);
1952 } else {
1953 evaluation = lttng_evaluation_session_rotation_completed_create(
1954 trace_archive_chunk_id, location);
1955 }
1956
1957 if (!evaluation) {
1958 /* Internal error */
1959 ret = -1;
1960 cmd_result = LTTNG_ERR_UNK;
1961 goto put_list;
1962 }
1963
1964 /* Dispatch evaluation result to all clients. */
1965 ret = send_evaluation_to_clients(trigger_list_element->trigger,
1966 evaluation, client_list, state,
1967 session_info->uid,
1968 session_info->gid);
1969 lttng_evaluation_destroy(evaluation);
1970 put_list:
1971 notification_client_list_put(client_list);
1972 if (caa_unlikely(ret)) {
1973 break;
1974 }
1975 }
1976 end:
1977 session_info_put(session_info);
1978 *_cmd_result = cmd_result;
1979 rcu_read_unlock();
1980 return ret;
1981 }
1982
1983 static
1984 int handle_notification_thread_command_add_application(
1985 struct notification_thread_handle *handle,
1986 struct notification_thread_state *state,
1987 int read_side_trigger_event_application_pipe,
1988 enum lttng_error_code *_cmd_result)
1989 {
1990 int ret = 0;
1991 enum lttng_error_code cmd_result = LTTNG_OK;
1992 struct notification_event_trigger_source_element *element = NULL;
1993
1994 element = zmalloc(sizeof(*element));
1995 if (!element) {
1996 cmd_result = LTTNG_ERR_NOMEM;
1997 ret = -1;
1998 goto end;
1999 }
2000
2001 CDS_INIT_LIST_HEAD(&element->node);
2002 element->fd = read_side_trigger_event_application_pipe;
2003
2004 pthread_mutex_lock(&handle->event_trigger_sources.lock);
2005 cds_list_add(&element->node, &handle->event_trigger_sources.list);
2006 pthread_mutex_unlock(&handle->event_trigger_sources.lock);
2007
2008 /* TODO: remove on failure to add to list? */
2009
2010 /* Adding the read side pipe to the event poll */
2011 ret = lttng_poll_add(&state->events,
2012 read_side_trigger_event_application_pipe,
2013 LPOLLIN | LPOLLERR);
2014
2015 DBG3("[notification-thread] Adding application event source from fd: %d", read_side_trigger_event_application_pipe);
2016 if (ret < 0) {
2017 /* TODO: what should be the value of cmd_result??? */
2018 ERR("[notification-thread] Failed to add event source pipe fd to pollset");
2019 goto end;
2020 }
2021
2022 end:
2023 *_cmd_result = cmd_result;
2024 return ret;
2025 }
2026
2027 static
2028 int handle_notification_thread_command_remove_application(
2029 struct notification_thread_handle *handle,
2030 struct notification_thread_state *state,
2031 int read_side_trigger_event_application_pipe,
2032 enum lttng_error_code *_cmd_result)
2033 {
2034 int ret = 0;
2035 enum lttng_error_code cmd_result = LTTNG_OK;
2036
2037 /* TODO: missing a lock propably to revisit */
2038 struct notification_event_trigger_source_element *source_element, *tmp;
2039 cds_list_for_each_entry_safe(source_element, tmp,
2040 &handle->event_trigger_sources.list, node) {
2041 if (source_element->fd != read_side_trigger_event_application_pipe) {
2042 continue;
2043 }
2044
2045 DBG("[notification-thread] Removed event source from event source list");
2046 cds_list_del(&source_element->node);
2047 break;
2048 }
2049
2050 DBG3("[notification-thread] Removing application event source from fd: %d", read_side_trigger_event_application_pipe);
2051 /* Removing the read side pipe to the event poll */
2052 ret = lttng_poll_del(&state->events,
2053 read_side_trigger_event_application_pipe);
2054 if (ret < 0) {
2055 /* TODO: what should be the value of cmd_result??? */
2056 ERR("[notification-thread] Failed to remove event source pipe fd from pollset");
2057 goto end;
2058 }
2059
2060 end:
2061 *_cmd_result = cmd_result;
2062 return ret;
2063 }
2064
2065 static int handle_notification_thread_command_get_tokens(
2066 struct notification_thread_handle *handle,
2067 struct notification_thread_state *state,
2068 struct lttng_triggers **triggers,
2069 enum lttng_error_code *_cmd_result)
2070 {
2071 int ret = 0, i = 0;
2072 enum lttng_error_code cmd_result = LTTNG_OK;
2073 struct cds_lfht_iter iter;
2074 struct notification_trigger_tokens_ht_element *element;
2075 struct lttng_triggers *local_triggers = NULL;
2076
2077 local_triggers = lttng_triggers_create();
2078 if (!local_triggers) {
2079 cmd_result = LTTNG_ERR_NOMEM;
2080 goto end;
2081 }
2082
2083 rcu_read_lock();
2084 cds_lfht_for_each_entry (
2085 state->trigger_tokens_ht, &iter, element, node) {
2086 ret = lttng_triggers_add(local_triggers, element->trigger);
2087 if (ret < 0) {
2088 cmd_result = LTTNG_ERR_FATAL;
2089 ret = -1;
2090 goto end;
2091 }
2092
2093 /* Ownership is shared with the lttng_triggers object */
2094 lttng_trigger_get(element->trigger);
2095
2096 i++;
2097 }
2098
2099 /* Passing ownership up */
2100 *triggers = local_triggers;
2101 local_triggers = NULL;
2102
2103 end:
2104 rcu_read_unlock();
2105 lttng_triggers_destroy(local_triggers);
2106 *_cmd_result = cmd_result;
2107 return ret;
2108 }
2109
2110 static
2111 int handle_notification_thread_command_list_triggers(
2112 struct notification_thread_handle *handle,
2113 struct notification_thread_state *state,
2114 uid_t uid,
2115 gid_t gid,
2116 struct lttng_triggers **triggers,
2117 enum lttng_error_code *_cmd_result)
2118 {
2119 int ret = 0, i = 0;
2120 enum lttng_error_code cmd_result = LTTNG_OK;
2121 struct cds_lfht_iter iter;
2122 struct lttng_trigger_ht_element *trigger_ht_element;
2123 struct lttng_triggers *local_triggers = NULL;
2124 const struct lttng_credentials *creds;
2125
2126 long scb, sca;
2127 unsigned long count;
2128
2129 rcu_read_lock();
2130 cds_lfht_count_nodes(state->triggers_ht, &scb, &count, &sca);
2131
2132 /* TODO check downcasting */
2133 local_triggers = lttng_triggers_create();
2134 if (!local_triggers) {
2135 cmd_result = LTTNG_ERR_NOMEM;
2136 goto end;
2137 }
2138
2139 cds_lfht_for_each_entry (state->triggers_ht, &iter,
2140 trigger_ht_element, node) {
2141 /* Only return the trigger for which the requestion client have
2142 * access. For now the root user can only list its own
2143 * triggers.
2144 * TODO: root user behavior
2145 */
2146 creds = lttng_trigger_get_credentials(trigger_ht_element->trigger);
2147 if ((uid != creds->uid) || (gid != creds->gid)) {
2148 continue;
2149 }
2150
2151 ret = lttng_triggers_add(local_triggers, trigger_ht_element->trigger);
2152 if (ret < 0) {
2153 ret = -1;
2154 goto end;
2155 }
2156 /* Ownership is shared with the lttng_triggers object */
2157 lttng_trigger_get(trigger_ht_element->trigger);
2158
2159 i++;
2160 }
2161
2162 /* Passing ownership up */
2163 *triggers = local_triggers;
2164 local_triggers = NULL;
2165
2166 end:
2167 rcu_read_unlock();
2168 lttng_triggers_destroy(local_triggers);
2169 *_cmd_result = cmd_result;
2170 return ret;
2171 }
2172
2173 static
2174 int condition_is_supported(struct lttng_condition *condition)
2175 {
2176 int ret;
2177
2178 switch (lttng_condition_get_type(condition)) {
2179 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
2180 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
2181 {
2182 enum lttng_domain_type domain;
2183
2184 ret = lttng_condition_buffer_usage_get_domain_type(condition,
2185 &domain);
2186 if (ret) {
2187 ret = -1;
2188 goto end;
2189 }
2190
2191 if (domain != LTTNG_DOMAIN_KERNEL) {
2192 ret = 1;
2193 goto end;
2194 }
2195
2196 /*
2197 * Older kernel tracers don't expose the API to monitor their
2198 * buffers. Therefore, we reject triggers that require that
2199 * mechanism to be available to be evaluated.
2200 */
2201 ret = kernel_supports_ring_buffer_snapshot_sample_positions();
2202 break;
2203 }
2204 case LTTNG_CONDITION_TYPE_EVENT_RULE_HIT:
2205 {
2206 /* TODO:
2207 * Check for kernel support.
2208 * Check for ust support ??
2209 */
2210 ret = 1;
2211 break;
2212 }
2213 default:
2214 ret = 1;
2215 }
2216 end:
2217 return ret;
2218 }
2219
2220 static
2221 int action_is_supported(struct lttng_action *action)
2222 {
2223 int ret;
2224
2225 switch (lttng_action_get_type(action)) {
2226 case LTTNG_ACTION_TYPE_NOTIFY:
2227 case LTTNG_ACTION_TYPE_START_SESSION:
2228 case LTTNG_ACTION_TYPE_STOP_SESSION:
2229 case LTTNG_ACTION_TYPE_ROTATE_SESSION:
2230 case LTTNG_ACTION_TYPE_SNAPSHOT_SESSION:
2231 {
2232 /* TODO validate that this is true for kernel in regards to
2233 * rotation and snapshot. Start stop is not a problem notify
2234 * either.
2235 */
2236 /* For now all type of actions are supported */
2237 ret = 1;
2238 break;
2239 }
2240 case LTTNG_ACTION_TYPE_GROUP:
2241 {
2242 /* TODO: Iterate over all internal actions and validate that
2243 * they are supported
2244 */
2245 ret = 1;
2246 break;
2247
2248 }
2249 default:
2250 ret = 1;
2251 }
2252
2253 return ret;
2254 }
2255
2256 /* Must be called with RCU read lock held. */
2257 static
2258 int bind_trigger_to_matching_session(struct lttng_trigger *trigger,
2259 struct notification_thread_state *state)
2260 {
2261 int ret = 0;
2262 const struct lttng_condition *condition;
2263 const char *session_name;
2264 struct lttng_session_trigger_list *trigger_list;
2265
2266 condition = lttng_trigger_get_const_condition(trigger);
2267 switch (lttng_condition_get_type(condition)) {
2268 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
2269 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
2270 {
2271 enum lttng_condition_status status;
2272
2273 status = lttng_condition_session_rotation_get_session_name(
2274 condition, &session_name);
2275 if (status != LTTNG_CONDITION_STATUS_OK) {
2276 ERR("[notification-thread] Failed to bind trigger to session: unable to get 'session_rotation' condition's session name");
2277 ret = -1;
2278 goto end;
2279 }
2280 break;
2281 }
2282 default:
2283 ret = -1;
2284 goto end;
2285 }
2286
2287 trigger_list = get_session_trigger_list(state, session_name);
2288 if (!trigger_list) {
2289 DBG("[notification-thread] Unable to bind trigger applying to session \"%s\" as it is not yet known to the notification system",
2290 session_name);
2291 goto end;
2292
2293 }
2294
2295 DBG("[notification-thread] Newly registered trigger bound to session \"%s\"",
2296 session_name);
2297 ret = lttng_session_trigger_list_add(trigger_list, trigger);
2298 end:
2299 return ret;
2300 }
2301
2302 /* Must be called with RCU read lock held. */
2303 static
2304 int bind_trigger_to_matching_channels(struct lttng_trigger *trigger,
2305 struct notification_thread_state *state)
2306 {
2307 int ret = 0;
2308 struct cds_lfht_node *node;
2309 struct cds_lfht_iter iter;
2310 struct channel_info *channel;
2311
2312 cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
2313 channels_ht_node) {
2314 struct lttng_trigger_list_element *trigger_list_element;
2315 struct lttng_channel_trigger_list *trigger_list;
2316 struct cds_lfht_iter lookup_iter;
2317
2318 if (!trigger_applies_to_channel(trigger, channel)) {
2319 continue;
2320 }
2321
2322 cds_lfht_lookup(state->channel_triggers_ht,
2323 hash_channel_key(&channel->key),
2324 match_channel_trigger_list,
2325 &channel->key,
2326 &lookup_iter);
2327 node = cds_lfht_iter_get_node(&lookup_iter);
2328 assert(node);
2329 trigger_list = caa_container_of(node,
2330 struct lttng_channel_trigger_list,
2331 channel_triggers_ht_node);
2332
2333 trigger_list_element = zmalloc(sizeof(*trigger_list_element));
2334 if (!trigger_list_element) {
2335 ret = -1;
2336 goto end;
2337 }
2338 CDS_INIT_LIST_HEAD(&trigger_list_element->node);
2339 trigger_list_element->trigger = trigger;
2340 cds_list_add(&trigger_list_element->node, &trigger_list->list);
2341 DBG("[notification-thread] Newly registered trigger bound to channel \"%s\"",
2342 channel->name);
2343 }
2344 end:
2345 return ret;
2346 }
2347
2348 static int action_notify_register_trigger(
2349 struct notification_thread_state *state,
2350 struct lttng_trigger *trigger)
2351 {
2352
2353 int ret = 0;
2354 struct lttng_condition *condition;
2355 struct notification_client *client;
2356 struct notification_client_list *client_list = NULL;
2357 struct cds_lfht_iter iter;
2358 struct notification_client_list_element *client_list_element, *tmp;
2359
2360 condition = lttng_trigger_get_condition(trigger);
2361 assert(condition);
2362
2363 client_list = notification_client_list_create(trigger);
2364 if (!client_list) {
2365 ret = -1;
2366 goto end;
2367 }
2368
2369 /* Build a list of clients to which this new trigger applies. */
2370 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
2371 client_socket_ht_node) {
2372 if (!trigger_applies_to_client(trigger, client)) {
2373 continue;
2374 }
2375
2376 client_list_element = zmalloc(sizeof(*client_list_element));
2377 if (!client_list_element) {
2378 ret = -1;
2379 goto error_put_client_list;
2380 }
2381 CDS_INIT_LIST_HEAD(&client_list_element->node);
2382 client_list_element->client = client;
2383 cds_list_add(&client_list_element->node, &client_list->list);
2384 }
2385
2386 switch (get_condition_binding_object(condition)) {
2387 case LTTNG_OBJECT_TYPE_SESSION:
2388 /* Add the trigger to the list if it matches a known session. */
2389 ret = bind_trigger_to_matching_session(trigger, state);
2390 if (ret) {
2391 goto error_put_client_list;
2392 }
2393 break;
2394 case LTTNG_OBJECT_TYPE_CHANNEL:
2395 /*
2396 * Add the trigger to list of triggers bound to the channels
2397 * currently known.
2398 */
2399 ret = bind_trigger_to_matching_channels(trigger, state);
2400 if (ret) {
2401 goto error_put_client_list;
2402 }
2403 break;
2404 case LTTNG_OBJECT_TYPE_NONE:
2405 break;
2406 default:
2407 ERR("[notification-thread] Unknown object type on which to bind a newly registered trigger was encountered");
2408 ret = -1;
2409 goto error_put_client_list;
2410 }
2411
2412 /*
2413 * Since there is nothing preventing clients from subscribing to a
2414 * condition before the corresponding trigger is registered, we have
2415 * to evaluate this new condition right away.
2416 *
2417 * At some point, we were waiting for the next "evaluation" (e.g. on
2418 * reception of a channel sample) to evaluate this new condition, but
2419 * that was broken.
2420 *
2421 * The reason it was broken is that waiting for the next sample
2422 * does not allow us to properly handle transitions for edge-triggered
2423 * conditions.
2424 *
2425 * Consider this example: when we handle a new channel sample, we
2426 * evaluate each conditions twice: once with the previous state, and
2427 * again with the newest state. We then use those two results to
2428 * determine whether a state change happened: a condition was false and
2429 * became true. If a state change happened, we have to notify clients.
2430 *
2431 * Now, if a client subscribes to a given notification and registers
2432 * a trigger *after* that subscription, we have to make sure the
2433 * condition is evaluated at this point while considering only the
2434 * current state. Otherwise, the next evaluation cycle may only see
2435 * that the evaluations remain the same (true for samples n-1 and n) and
2436 * the client will never know that the condition has been met.
2437 *
2438 * No need to lock the list here as it has not been published yet.
2439 */
2440 cds_list_for_each_entry_safe(client_list_element, tmp,
2441 &client_list->list, node) {
2442 ret = evaluate_condition_for_client(trigger, condition,
2443 client_list_element->client, state);
2444 if (ret) {
2445 goto error_put_client_list;
2446 }
2447 }
2448
2449 /*
2450 * Client list ownership transferred to the
2451 * notification_trigger_clients_ht.
2452 */
2453 publish_notification_client_list(state, client_list);
2454 client_list = NULL;
2455 error_put_client_list:
2456 notification_client_list_put(client_list);
2457 end:
2458 return ret;
2459 }
2460
2461 static
2462 bool trigger_name_taken(struct notification_thread_state *state, const char *name)
2463 {
2464 struct cds_lfht_node *triggers_by_name_ht_node;
2465 struct cds_lfht_iter iter;
2466 /* TODO change hashing for trigger */
2467 cds_lfht_lookup(state->triggers_by_name_ht,
2468 hash_key_str(name, lttng_ht_seed),
2469 match_str,
2470 name,
2471 &iter);
2472 triggers_by_name_ht_node = cds_lfht_iter_get_node(&iter);
2473 if (triggers_by_name_ht_node) {
2474 return true;
2475 } else {
2476 return false;
2477 }
2478
2479 }
2480 static
2481 void generate_trigger_name(struct notification_thread_state *state, struct lttng_trigger *trigger, const char **name)
2482 {
2483 /* Here the offset criteria guarantee an end. This will be a nice
2484 * bikeshedding conversation. I would simply generate uuid and use them
2485 * as trigger name.
2486 */
2487 bool taken = false;
2488 do {
2489 lttng_trigger_generate_name(trigger, state->trigger_id.name_offset);
2490 /* TODO error checking */
2491 lttng_trigger_get_name(trigger, name);
2492 taken = trigger_name_taken(state, *name);
2493 if (taken) {
2494 state->trigger_id.name_offset++;
2495 }
2496 } while (taken || state->trigger_id.name_offset == UINT32_MAX);
2497 }
2498
2499 static bool action_is_notify(const struct lttng_action *action)
2500 {
2501 /* TODO for action groups we need to iterate over all of them */
2502 enum lttng_action_type type = lttng_action_get_type_const(action);
2503 bool ret = false;
2504 enum lttng_action_status status;
2505 const struct lttng_action *tmp;
2506 unsigned int i, count;
2507
2508 switch (type) {
2509 case LTTNG_ACTION_TYPE_NOTIFY:
2510 ret = true;
2511 break;
2512 case LTTNG_ACTION_TYPE_GROUP:
2513 status = lttng_action_group_get_count(action, &count);
2514 if (status != LTTNG_ACTION_STATUS_OK) {
2515 assert(0);
2516 }
2517 for (i = 0; i < count; i++) {
2518 tmp = lttng_action_group_get_at_index_const(action, i);
2519 assert(tmp);
2520 ret = action_is_notify(tmp);
2521 if (ret) {
2522 break;
2523 }
2524 }
2525 break;
2526 default:
2527 ret = false;
2528 break;
2529 }
2530
2531 return ret;
2532 }
2533
2534 /*
2535 * TODO: REVIEW THIS COMMENT.
2536 * FIXME A client's credentials are not checked when registering a trigger, nor
2537 * are they stored alongside with the trigger.
2538 *
2539 * The effects of this are benign since:
2540 * - The client will succeed in registering the trigger, as it is valid,
2541 * - The trigger will, internally, be bound to the channel/session,
2542 * - The notifications will not be sent since the client's credentials
2543 * are checked against the channel at that moment.
2544 *
2545 * If this function returns a non-zero value, it means something is
2546 * fundamentally broken and the whole subsystem/thread will be torn down.
2547 *
2548 * If a non-fatal error occurs, just set the cmd_result to the appropriate
2549 * error code.
2550 */
2551 static
2552 int handle_notification_thread_command_register_trigger(
2553 struct notification_thread_state *state,
2554 struct lttng_trigger *trigger,
2555 enum lttng_error_code *cmd_result)
2556 {
2557 int ret = 0;
2558 int is_supported;
2559 struct lttng_condition *condition;
2560 struct lttng_action *action;
2561 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
2562 struct notification_trigger_tokens_ht_element *trigger_tokens_ht_element = NULL;
2563 struct cds_lfht_node *node;
2564 const char* trigger_name;
2565 bool free_trigger = true;
2566
2567 assert(trigger->creds.set);
2568
2569 rcu_read_lock();
2570
2571 /* Set the trigger's key */
2572 lttng_trigger_set_key(trigger, state->trigger_id.token_generator);
2573
2574 if (lttng_trigger_get_name(trigger, &trigger_name) == LTTNG_TRIGGER_STATUS_UNSET) {
2575 generate_trigger_name(state, trigger, &trigger_name);
2576 } else if (trigger_name_taken(state, trigger_name)) {
2577 /* Not a fatal error */
2578 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2579 ret = 0;
2580 goto error;
2581 }
2582
2583 condition = lttng_trigger_get_condition(trigger);
2584 assert(condition);
2585
2586 action = lttng_trigger_get_action(trigger);
2587 assert(action);
2588
2589 is_supported = condition_is_supported(condition);
2590 if (is_supported < 0) {
2591 goto error;
2592 } else if (is_supported == 0) {
2593 ret = 0;
2594 *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
2595 goto error;
2596 }
2597
2598 is_supported = action_is_supported(action);
2599 if (is_supported < 0) {
2600 goto error;
2601 } else if (is_supported == 0) {
2602 ret = 0;
2603 *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
2604 goto error;
2605 }
2606
2607 trigger_ht_element = zmalloc(sizeof(*trigger_ht_element));
2608 if (!trigger_ht_element) {
2609 ret = -1;
2610 goto error;
2611 }
2612
2613 /* Add trigger to the trigger_ht. */
2614 cds_lfht_node_init(&trigger_ht_element->node);
2615 cds_lfht_node_init(&trigger_ht_element->node_by_name);
2616
2617 /*
2618 * This element own the trigger object from now own, this is why there
2619 * is no lttng_trigger_get here.
2620 * This thread is now the owner of the trigger object.
2621 */
2622 trigger_ht_element->trigger = trigger;
2623
2624 node = cds_lfht_add_unique(state->triggers_ht,
2625 lttng_condition_hash(condition),
2626 match_trigger,
2627 trigger,
2628 &trigger_ht_element->node);
2629 if (node != &trigger_ht_element->node) {
2630 /* Not a fatal error, simply report it to the client. */
2631 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2632 goto error_free_ht_element;
2633 }
2634
2635 node = cds_lfht_add_unique(state->triggers_by_name_ht,
2636 hash_key_str(trigger_name, lttng_ht_seed),
2637 match_str,
2638 trigger_name,
2639 &trigger_ht_element->node_by_name);
2640 if (node != &trigger_ht_element->node_by_name) {
2641 /* This should never happen */
2642 /* Not a fatal error, simply report it to the client. */
2643 /* TODO remove from the trigger_ht */
2644 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2645 goto error_free_ht_element;
2646 }
2647
2648 if (lttng_condition_get_type(condition) == LTTNG_CONDITION_TYPE_EVENT_RULE_HIT) {
2649 trigger_tokens_ht_element = zmalloc(sizeof(*trigger_tokens_ht_element));
2650 if (!trigger_tokens_ht_element) {
2651 ret = -1;
2652 goto error;
2653 }
2654
2655 /* Add trigger token to the trigger_tokens_ht. */
2656 cds_lfht_node_init(&trigger_tokens_ht_element->node);
2657 trigger_tokens_ht_element->token = trigger->key.value;
2658 trigger_tokens_ht_element->trigger = trigger;
2659
2660 node = cds_lfht_add_unique(state->trigger_tokens_ht,
2661 hash_key_u64(&trigger_tokens_ht_element->token, lttng_ht_seed),
2662 match_trigger_token,
2663 &trigger_tokens_ht_element->token,
2664 &trigger_tokens_ht_element->node);
2665 if (node != &trigger_tokens_ht_element->node) {
2666 /* TODO: THIS IS A FATAL ERROR... should never happen */
2667 /* Not a fatal error, simply report it to the client. */
2668 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2669 goto error_free_ht_element;
2670 }
2671 }
2672
2673 /*
2674 * Ownership of the trigger and of its wrapper was transfered to
2675 * the triggers_ht. Same for token ht element if necessary.
2676 */
2677 trigger_tokens_ht_element = NULL;
2678 trigger_ht_element = NULL;
2679 free_trigger = false;
2680
2681 if (action_is_notify(action)) {
2682 ret = action_notify_register_trigger(state, trigger);
2683 if (ret < 0) {
2684 /* TODO should cmd_result be set here? */
2685 ret = -1;
2686 goto error_free_ht_element;
2687 }
2688 }
2689
2690 /* Increment the trigger unique id generator */
2691 state->trigger_id.token_generator++;
2692 *cmd_result = LTTNG_OK;
2693
2694 error_free_ht_element:
2695 free(trigger_ht_element);
2696 free(trigger_tokens_ht_element);
2697 error:
2698 if (free_trigger) {
2699 lttng_trigger_destroy(trigger);
2700 }
2701 rcu_read_unlock();
2702 return ret;
2703 }
2704
2705 static
2706 void free_lttng_trigger_ht_element_rcu(struct rcu_head *node)
2707 {
2708 free(caa_container_of(node, struct lttng_trigger_ht_element,
2709 rcu_node));
2710 }
2711
2712 static
2713 void free_notification_trigger_tokens_ht_element_rcu(struct rcu_head *node)
2714 {
2715 free(caa_container_of(node, struct notification_trigger_tokens_ht_element,
2716 rcu_node));
2717 }
2718
2719 static
2720 int handle_notification_thread_command_unregister_trigger(
2721 struct notification_thread_state *state,
2722 struct lttng_trigger *trigger,
2723 enum lttng_error_code *_cmd_reply)
2724 {
2725 struct cds_lfht_iter iter;
2726 struct cds_lfht_node *triggers_ht_node;
2727 struct lttng_channel_trigger_list *trigger_list;
2728 struct notification_client_list *client_list;
2729 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
2730 struct lttng_condition *condition = lttng_trigger_get_condition(
2731 trigger);
2732 struct lttng_action *action = lttng_trigger_get_action(trigger);
2733 enum lttng_error_code cmd_reply;
2734
2735 rcu_read_lock();
2736
2737 /* TODO change hashing for trigger */
2738 /* TODO Disabling for the root user is not complete, for now the root
2739 * user cannot disable the trigger from another user.
2740 */
2741 cds_lfht_lookup(state->triggers_ht,
2742 lttng_condition_hash(condition),
2743 match_trigger,
2744 trigger,
2745 &iter);
2746 triggers_ht_node = cds_lfht_iter_get_node(&iter);
2747 if (!triggers_ht_node) {
2748 cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND;
2749 goto end;
2750 } else {
2751 cmd_reply = LTTNG_OK;
2752 }
2753
2754 /* Remove trigger from channel_triggers_ht. */
2755 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
2756 channel_triggers_ht_node) {
2757 struct lttng_trigger_list_element *trigger_element, *tmp;
2758
2759 cds_list_for_each_entry_safe(trigger_element, tmp,
2760 &trigger_list->list, node) {
2761 if (!lttng_trigger_is_equal(trigger, trigger_element->trigger)) {
2762 continue;
2763 }
2764
2765 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
2766 cds_list_del(&trigger_element->node);
2767 /* A trigger can only appear once per channel */
2768 break;
2769 }
2770 }
2771
2772 if (lttng_condition_get_type(condition) == LTTNG_CONDITION_TYPE_EVENT_RULE_HIT) {
2773 struct notification_trigger_tokens_ht_element *trigger_tokens_ht_element;
2774 cds_lfht_for_each_entry(state->trigger_tokens_ht, &iter, trigger_tokens_ht_element,
2775 node) {
2776 if (!lttng_trigger_is_equal(trigger, trigger_tokens_ht_element->trigger)) {
2777 continue;
2778 }
2779
2780 /* TODO talk to all app and remove it */
2781 DBG("[notification-thread] Removed trigger from tokens_ht");
2782 cds_lfht_del(state->trigger_tokens_ht,
2783 &trigger_tokens_ht_element->node);
2784 call_rcu(&trigger_tokens_ht_element->rcu_node, free_notification_trigger_tokens_ht_element_rcu);
2785
2786 break;
2787 }
2788 }
2789
2790 if (action_is_notify(action)) {
2791 /*
2792 * Remove and release the client list from
2793 * notification_trigger_clients_ht.
2794 */
2795 client_list = get_client_list_from_condition(state, condition);
2796 assert(client_list);
2797
2798 /* Put new reference and the hashtable's reference. */
2799 notification_client_list_put(client_list);
2800 notification_client_list_put(client_list);
2801 client_list = NULL;
2802 }
2803
2804 /* Remove trigger from triggers_ht. */
2805 trigger_ht_element = caa_container_of(triggers_ht_node,
2806 struct lttng_trigger_ht_element, node);
2807 cds_lfht_del(state->triggers_by_name_ht, &trigger_ht_element->node_by_name);
2808 cds_lfht_del(state->triggers_ht, triggers_ht_node);
2809
2810 /* Release the ownership of the trigger */
2811 lttng_trigger_destroy(trigger_ht_element->trigger);
2812 call_rcu(&trigger_ht_element->rcu_node, free_lttng_trigger_ht_element_rcu);
2813 end:
2814 rcu_read_unlock();
2815 if (_cmd_reply) {
2816 *_cmd_reply = cmd_reply;
2817 }
2818 return 0;
2819 }
2820
2821 /* Returns 0 on success, 1 on exit requested, negative value on error. */
2822 int handle_notification_thread_command(
2823 struct notification_thread_handle *handle,
2824 struct notification_thread_state *state)
2825 {
2826 int ret;
2827 uint64_t counter;
2828 struct notification_thread_command *cmd;
2829
2830 /* Read the event pipe to put it back into a quiescent state. */
2831 ret = lttng_read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter,
2832 sizeof(counter));
2833 if (ret != sizeof(counter)) {
2834 goto error;
2835 }
2836
2837 pthread_mutex_lock(&handle->cmd_queue.lock);
2838 cmd = cds_list_first_entry(&handle->cmd_queue.list,
2839 struct notification_thread_command, cmd_list_node);
2840 switch (cmd->type) {
2841 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
2842 DBG("[notification-thread] Received register trigger command");
2843 ret = handle_notification_thread_command_register_trigger(
2844 state, cmd->parameters.trigger,
2845 &cmd->reply_code);
2846 break;
2847 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
2848 DBG("[notification-thread] Received unregister trigger command");
2849 ret = handle_notification_thread_command_unregister_trigger(
2850 state, cmd->parameters.trigger,
2851 &cmd->reply_code);
2852 break;
2853 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
2854 DBG("[notification-thread] Received add channel command");
2855 ret = handle_notification_thread_command_add_channel(
2856 state,
2857 cmd->parameters.add_channel.session.name,
2858 cmd->parameters.add_channel.session.uid,
2859 cmd->parameters.add_channel.session.gid,
2860 cmd->parameters.add_channel.channel.name,
2861 cmd->parameters.add_channel.channel.domain,
2862 cmd->parameters.add_channel.channel.key,
2863 cmd->parameters.add_channel.channel.capacity,
2864 &cmd->reply_code);
2865 break;
2866 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
2867 DBG("[notification-thread] Received remove channel command");
2868 ret = handle_notification_thread_command_remove_channel(
2869 state, cmd->parameters.remove_channel.key,
2870 cmd->parameters.remove_channel.domain,
2871 &cmd->reply_code);
2872 break;
2873 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING:
2874 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED:
2875 DBG("[notification-thread] Received session rotation %s command",
2876 cmd->type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING ?
2877 "ongoing" : "completed");
2878 ret = handle_notification_thread_command_session_rotation(
2879 state,
2880 cmd->type,
2881 cmd->parameters.session_rotation.session_name,
2882 cmd->parameters.session_rotation.uid,
2883 cmd->parameters.session_rotation.gid,
2884 cmd->parameters.session_rotation.trace_archive_chunk_id,
2885 cmd->parameters.session_rotation.location,
2886 &cmd->reply_code);
2887 break;
2888 case NOTIFICATION_COMMAND_TYPE_ADD_APPLICATION:
2889 ret = handle_notification_thread_command_add_application(
2890 handle,
2891 state,
2892 cmd->parameters.application.read_side_trigger_event_application_pipe,
2893 &cmd->reply_code);
2894 break;
2895 case NOTIFICATION_COMMAND_TYPE_REMOVE_APPLICATION:
2896 ret = handle_notification_thread_command_remove_application(
2897 handle,
2898 state,
2899 cmd->parameters.application.read_side_trigger_event_application_pipe,
2900 &cmd->reply_code);
2901 break;
2902 case NOTIFICATION_COMMAND_TYPE_GET_TOKENS:
2903 {
2904 struct lttng_triggers *triggers = NULL;
2905 ret = handle_notification_thread_command_get_tokens(
2906 handle, state, &triggers, &cmd->reply_code);
2907 cmd->reply.get_tokens.triggers = triggers;
2908 ret = 0;
2909 break;
2910
2911 cmd->reply_code = LTTNG_OK;
2912 ret = 0;
2913 break;
2914 }
2915 case NOTIFICATION_COMMAND_TYPE_LIST_TRIGGERS:
2916 {
2917 struct lttng_triggers *triggers = NULL;
2918 ret = handle_notification_thread_command_list_triggers(
2919 handle,
2920 state,
2921 cmd->parameters.list_triggers.uid,
2922 cmd->parameters.list_triggers.gid,
2923 &triggers,
2924 &cmd->reply_code);
2925 cmd->reply.list_triggers.triggers = triggers;
2926 ret = 0;
2927 break;
2928 }
2929 case NOTIFICATION_COMMAND_TYPE_QUIT:
2930 DBG("[notification-thread] Received quit command");
2931 cmd->reply_code = LTTNG_OK;
2932 ret = 1;
2933 goto end;
2934 case NOTIFICATION_COMMAND_TYPE_CLIENT_COMMUNICATION_UPDATE:
2935 {
2936 const enum client_transmission_status client_status =
2937 cmd->parameters.client_communication_update
2938 .status;
2939 const notification_client_id client_id =
2940 cmd->parameters.client_communication_update.id;
2941 struct notification_client *client;
2942
2943 rcu_read_lock();
2944 client = get_client_from_id(client_id, state);
2945
2946 if (!client) {
2947 /*
2948 * Client error was probably already picked-up by the
2949 * notification thread or it has disconnected
2950 * gracefully while this command was queued.
2951 */
2952 DBG("Failed to find notification client to update communication status, client id = %" PRIu64,
2953 client_id);
2954 ret = 0;
2955 } else {
2956 pthread_mutex_lock(&client->lock);
2957 ret = client_handle_transmission_status(
2958 client, client_status, state);
2959 pthread_mutex_unlock(&client->lock);
2960 }
2961 rcu_read_unlock();
2962 break;
2963 }
2964 default:
2965 ERR("[notification-thread] Unknown internal command received");
2966 goto error_unlock;
2967 }
2968
2969 if (ret) {
2970 goto error_unlock;
2971 }
2972 end:
2973 cds_list_del(&cmd->cmd_list_node);
2974 if (cmd->is_async) {
2975 free(cmd);
2976 cmd = NULL;
2977 } else {
2978 lttng_waiter_wake_up(&cmd->reply_waiter);
2979 }
2980 pthread_mutex_unlock(&handle->cmd_queue.lock);
2981 return ret;
2982 error_unlock:
2983 /* Wake-up and return a fatal error to the calling thread. */
2984 lttng_waiter_wake_up(&cmd->reply_waiter);
2985 pthread_mutex_unlock(&handle->cmd_queue.lock);
2986 cmd->reply_code = LTTNG_ERR_FATAL;
2987 error:
2988 /* Indicate a fatal error to the caller. */
2989 return -1;
2990 }
2991
2992 static
2993 int socket_set_non_blocking(int socket)
2994 {
2995 int ret, flags;
2996
2997 /* Set the pipe as non-blocking. */
2998 ret = fcntl(socket, F_GETFL, 0);
2999 if (ret == -1) {
3000 PERROR("fcntl get socket flags");
3001 goto end;
3002 }
3003 flags = ret;
3004
3005 ret = fcntl(socket, F_SETFL, flags | O_NONBLOCK);
3006 if (ret == -1) {
3007 PERROR("fcntl set O_NONBLOCK socket flag");
3008 goto end;
3009 }
3010 DBG("Client socket (fd = %i) set as non-blocking", socket);
3011 end:
3012 return ret;
3013 }
3014
3015 /* Client lock must be acquired by caller. */
3016 static
3017 int client_reset_inbound_state(struct notification_client *client)
3018 {
3019 int ret;
3020
3021 ASSERT_LOCKED(client->lock);
3022
3023 ret = lttng_dynamic_buffer_set_size(
3024 &client->communication.inbound.buffer, 0);
3025 assert(!ret);
3026
3027 client->communication.inbound.bytes_to_receive =
3028 sizeof(struct lttng_notification_channel_message);
3029 client->communication.inbound.msg_type =
3030 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN;
3031 LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1);
3032 LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1);
3033 ret = lttng_dynamic_buffer_set_size(
3034 &client->communication.inbound.buffer,
3035 client->communication.inbound.bytes_to_receive);
3036 return ret;
3037 }
3038
3039 int handle_notification_thread_client_connect(
3040 struct notification_thread_state *state)
3041 {
3042 int ret;
3043 struct notification_client *client;
3044
3045 DBG("[notification-thread] Handling new notification channel client connection");
3046
3047 client = zmalloc(sizeof(*client));
3048 if (!client) {
3049 /* Fatal error. */
3050 ret = -1;
3051 goto error;
3052 }
3053 pthread_mutex_init(&client->lock, NULL);
3054 client->id = state->next_notification_client_id++;
3055 CDS_INIT_LIST_HEAD(&client->condition_list);
3056 lttng_dynamic_buffer_init(&client->communication.inbound.buffer);
3057 lttng_dynamic_buffer_init(&client->communication.outbound.buffer);
3058 client->communication.inbound.expect_creds = true;
3059
3060 pthread_mutex_lock(&client->lock);
3061 ret = client_reset_inbound_state(client);
3062 pthread_mutex_unlock(&client->lock);
3063 if (ret) {
3064 ERR("[notification-thread] Failed to reset client communication's inbound state");
3065 ret = 0;
3066 goto error;
3067 }
3068
3069 ret = lttcomm_accept_unix_sock(state->notification_channel_socket);
3070 if (ret < 0) {
3071 ERR("[notification-thread] Failed to accept new notification channel client connection");
3072 ret = 0;
3073 goto error;
3074 }
3075
3076 client->socket = ret;
3077
3078 ret = socket_set_non_blocking(client->socket);
3079 if (ret) {
3080 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
3081 goto error;
3082 }
3083
3084 ret = lttcomm_setsockopt_creds_unix_sock(client->socket);
3085 if (ret < 0) {
3086 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
3087 ret = 0;
3088 goto error;
3089 }
3090
3091 ret = lttng_poll_add(&state->events, client->socket,
3092 LPOLLIN | LPOLLERR |
3093 LPOLLHUP | LPOLLRDHUP);
3094 if (ret < 0) {
3095 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
3096 ret = 0;
3097 goto error;
3098 }
3099 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
3100 client->socket);
3101
3102 rcu_read_lock();
3103 cds_lfht_add(state->client_socket_ht,
3104 hash_client_socket(client->socket),
3105 &client->client_socket_ht_node);
3106 cds_lfht_add(state->client_id_ht,
3107 hash_client_id(client->id),
3108 &client->client_id_ht_node);
3109 rcu_read_unlock();
3110
3111 return ret;
3112 error:
3113 notification_client_destroy(client, state);
3114 return ret;
3115 }
3116
3117 /* RCU read-lock must be held by the caller. */
3118 /* Client lock must be held by the caller */
3119 static
3120 int notification_thread_client_disconnect(
3121 struct notification_client *client,
3122 struct notification_thread_state *state)
3123 {
3124 int ret;
3125 struct lttng_condition_list_element *condition_list_element, *tmp;
3126
3127 /* Acquire the client lock to disable its communication atomically. */
3128 client->communication.active = false;
3129 ret = lttng_poll_del(&state->events, client->socket);
3130 if (ret) {
3131 ERR("[notification-thread] Failed to remove client socket %d from poll set",
3132 client->socket);
3133 }
3134
3135 cds_lfht_del(state->client_socket_ht, &client->client_socket_ht_node);
3136 cds_lfht_del(state->client_id_ht, &client->client_id_ht_node);
3137
3138 /* Release all conditions to which the client was subscribed. */
3139 cds_list_for_each_entry_safe(condition_list_element, tmp,
3140 &client->condition_list, node) {
3141 (void) notification_thread_client_unsubscribe(client,
3142 condition_list_element->condition, state, NULL);
3143 }
3144
3145 /*
3146 * Client no longer accessible to other threads (through the
3147 * client lists).
3148 */
3149 notification_client_destroy(client, state);
3150 return ret;
3151 }
3152
3153 int handle_notification_thread_client_disconnect(
3154 int client_socket, struct notification_thread_state *state)
3155 {
3156 int ret = 0;
3157 struct notification_client *client;
3158
3159 rcu_read_lock();
3160 DBG("[notification-thread] Closing client connection (socket fd = %i)",
3161 client_socket);
3162 client = get_client_from_socket(client_socket, state);
3163 if (!client) {
3164 /* Internal state corruption, fatal error. */
3165 ERR("[notification-thread] Unable to find client (socket fd = %i)",
3166 client_socket);
3167 ret = -1;
3168 goto end;
3169 }
3170
3171 pthread_mutex_lock(&client->lock);
3172 ret = notification_thread_client_disconnect(client, state);
3173 pthread_mutex_unlock(&client->lock);
3174 end:
3175 rcu_read_unlock();
3176 return ret;
3177 }
3178
3179 int handle_notification_thread_client_disconnect_all(
3180 struct notification_thread_state *state)
3181 {
3182 struct cds_lfht_iter iter;
3183 struct notification_client *client;
3184 bool error_encoutered = false;
3185
3186 rcu_read_lock();
3187 DBG("[notification-thread] Closing all client connections");
3188 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
3189 client_socket_ht_node) {
3190 int ret;
3191
3192 pthread_mutex_lock(&client->lock);
3193 ret = notification_thread_client_disconnect(
3194 client, state);
3195 pthread_mutex_unlock(&client->lock);
3196 if (ret) {
3197 error_encoutered = true;
3198 }
3199 }
3200 rcu_read_unlock();
3201 return error_encoutered ? 1 : 0;
3202 }
3203
3204 int handle_notification_thread_trigger_unregister_all(
3205 struct notification_thread_state *state)
3206 {
3207 bool error_occurred = false;
3208 struct cds_lfht_iter iter;
3209 struct lttng_trigger_ht_element *trigger_ht_element;
3210
3211 rcu_read_lock();
3212 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
3213 node) {
3214 int ret = handle_notification_thread_command_unregister_trigger(
3215 state, trigger_ht_element->trigger, NULL);
3216 if (ret) {
3217 error_occurred = true;
3218 }
3219 }
3220 rcu_read_unlock();
3221 return error_occurred ? -1 : 0;
3222 }
3223
3224 static
3225 int client_handle_transmission_status(
3226 struct notification_client *client,
3227 enum client_transmission_status transmission_status,
3228 struct notification_thread_state *state)
3229 {
3230 int ret = 0;
3231
3232 ASSERT_LOCKED(client->lock);
3233
3234 switch (transmission_status) {
3235 case CLIENT_TRANSMISSION_STATUS_COMPLETE:
3236 ret = lttng_poll_mod(&state->events, client->socket,
3237 CLIENT_POLL_MASK_IN);
3238 if (ret) {
3239 goto end;
3240 }
3241
3242 client->communication.outbound.queued_command_reply = false;
3243 client->communication.outbound.dropped_notification = false;
3244 break;
3245 case CLIENT_TRANSMISSION_STATUS_QUEUED:
3246 /*
3247 * We want to be notified whenever there is buffer space
3248 * available to send the rest of the payload.
3249 */
3250 ret = lttng_poll_mod(&state->events, client->socket,
3251 CLIENT_POLL_MASK_IN_OUT);
3252 if (ret) {
3253 goto end;
3254 }
3255 break;
3256 case CLIENT_TRANSMISSION_STATUS_FAIL:
3257 ret = notification_thread_client_disconnect(client, state);
3258 if (ret) {
3259 goto end;
3260 }
3261 break;
3262 case CLIENT_TRANSMISSION_STATUS_ERROR:
3263 ret = -1;
3264 goto end;
3265 default:
3266 abort();
3267 }
3268 end:
3269 return ret;
3270 }
3271
3272 /* Client lock must be acquired by caller. */
3273 static
3274 enum client_transmission_status client_flush_outgoing_queue(
3275 struct notification_client *client)
3276 {
3277 ssize_t ret;
3278 size_t to_send_count;
3279 enum client_transmission_status status;
3280
3281 ASSERT_LOCKED(client->lock);
3282
3283 if (!client->communication.active) {
3284 status = CLIENT_TRANSMISSION_STATUS_FAIL;
3285 goto end;
3286 }
3287
3288 assert(client->communication.outbound.buffer.size != 0);
3289 to_send_count = client->communication.outbound.buffer.size;
3290 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
3291 client->socket);
3292
3293 ret = lttcomm_send_unix_sock_non_block(client->socket,
3294 client->communication.outbound.buffer.data,
3295 to_send_count);
3296 if ((ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) ||
3297 (ret > 0 && ret < to_send_count)) {
3298 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
3299 client->socket);
3300 to_send_count -= max(ret, 0);
3301
3302 memcpy(client->communication.outbound.buffer.data,
3303 client->communication.outbound.buffer.data +
3304 client->communication.outbound.buffer.size - to_send_count,
3305 to_send_count);
3306 ret = lttng_dynamic_buffer_set_size(
3307 &client->communication.outbound.buffer,
3308 to_send_count);
3309 if (ret) {
3310 goto error;
3311 }
3312 status = CLIENT_TRANSMISSION_STATUS_QUEUED;
3313 } else if (ret < 0) {
3314 /* Generic error, disconnect the client. */
3315 ERR("[notification-thread] Failed to flush outgoing queue, disconnecting client (socket fd = %i)",
3316 client->socket);
3317 status = CLIENT_TRANSMISSION_STATUS_FAIL;
3318 } else {
3319 /* No error and flushed the queue completely. */
3320 ret = lttng_dynamic_buffer_set_size(
3321 &client->communication.outbound.buffer, 0);
3322 if (ret) {
3323 goto error;
3324 }
3325 status = CLIENT_TRANSMISSION_STATUS_COMPLETE;
3326 }
3327 end:
3328 return status;
3329 error:
3330 return CLIENT_TRANSMISSION_STATUS_ERROR;
3331 }
3332
3333 /* Client lock must be acquired by caller. */
3334 static
3335 int client_send_command_reply(struct notification_client *client,
3336 struct notification_thread_state *state,
3337 enum lttng_notification_channel_status status)
3338 {
3339 int ret;
3340 struct lttng_notification_channel_command_reply reply = {
3341 .status = (int8_t) status,
3342 };
3343 struct lttng_notification_channel_message msg = {
3344 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
3345 .size = sizeof(reply),
3346 };
3347 char buffer[sizeof(msg) + sizeof(reply)];
3348 enum client_transmission_status transmission_status;
3349
3350 ASSERT_LOCKED(client->lock);
3351
3352 if (client->communication.outbound.queued_command_reply) {
3353 /* Protocol error. */
3354 goto error;
3355 }
3356
3357 memcpy(buffer, &msg, sizeof(msg));
3358 memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
3359 DBG("[notification-thread] Send command reply (%i)", (int) status);
3360
3361 /* Enqueue buffer to outgoing queue and flush it. */
3362 ret = lttng_dynamic_buffer_append(
3363 &client->communication.outbound.buffer,
3364 buffer, sizeof(buffer));
3365 if (ret) {
3366 goto error;
3367 }
3368
3369 transmission_status = client_flush_outgoing_queue(client);
3370 ret = client_handle_transmission_status(
3371 client, transmission_status, state);
3372 if (ret) {
3373 goto error;
3374 }
3375
3376 if (client->communication.outbound.buffer.size != 0) {
3377 /* Queue could not be emptied. */
3378 client->communication.outbound.queued_command_reply = true;
3379 }
3380
3381 return 0;
3382 error:
3383 return -1;
3384 }
3385
3386 static
3387 int client_handle_message_unknown(struct notification_client *client,
3388 struct notification_thread_state *state)
3389 {
3390 int ret;
3391
3392 pthread_mutex_lock(&client->lock);
3393
3394 /*
3395 * Receiving message header. The function will be called again
3396 * once the rest of the message as been received and can be
3397 * interpreted.
3398 */
3399 const struct lttng_notification_channel_message *msg;
3400
3401 assert(sizeof(*msg) == client->communication.inbound.buffer.size);
3402 msg = (const struct lttng_notification_channel_message *)
3403 client->communication.inbound.buffer.data;
3404
3405 if (msg->size == 0 ||
3406 msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
3407 ERR("[notification-thread] Invalid notification channel message: length = %u",
3408 msg->size);
3409 ret = -1;
3410 goto end;
3411 }
3412
3413 switch (msg->type) {
3414 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
3415 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
3416 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
3417 break;
3418 default:
3419 ret = -1;
3420 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
3421 goto end;
3422 }
3423
3424 client->communication.inbound.bytes_to_receive = msg->size;
3425 client->communication.inbound.msg_type =
3426 (enum lttng_notification_channel_message_type) msg->type;
3427 ret = lttng_dynamic_buffer_set_size(
3428 &client->communication.inbound.buffer, msg->size);
3429 end:
3430 pthread_mutex_unlock(&client->lock);
3431 return ret;
3432 }
3433
3434 static
3435 int client_handle_message_handshake(struct notification_client *client,
3436 struct notification_thread_state *state)
3437 {
3438 int ret;
3439 struct lttng_notification_channel_command_handshake *handshake_client;
3440 const struct lttng_notification_channel_command_handshake handshake_reply = {
3441 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
3442 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
3443 };
3444 const struct lttng_notification_channel_message msg_header = {
3445 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
3446 .size = sizeof(handshake_reply),
3447 };
3448 enum lttng_notification_channel_status status =
3449 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
3450 char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
3451 enum client_transmission_status transmission_status;
3452
3453 pthread_mutex_lock(&client->lock);
3454
3455 memcpy(send_buffer, &msg_header, sizeof(msg_header));
3456 memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
3457 sizeof(handshake_reply));
3458
3459 handshake_client =
3460 (struct lttng_notification_channel_command_handshake *)
3461 client->communication.inbound.buffer
3462 .data;
3463 client->major = handshake_client->major;
3464 client->minor = handshake_client->minor;
3465 if (!client->communication.inbound.creds_received) {
3466 ERR("[notification-thread] No credentials received from client");
3467 ret = -1;
3468 goto end;
3469 }
3470
3471 client->uid = LTTNG_SOCK_GET_UID_CRED(
3472 &client->communication.inbound.creds);
3473 client->gid = LTTNG_SOCK_GET_GID_CRED(
3474 &client->communication.inbound.creds);
3475 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
3476 client->uid, client->gid, (int) client->major,
3477 (int) client->minor);
3478
3479 if (handshake_client->major !=
3480 LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
3481 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
3482 }
3483
3484 ret = lttng_dynamic_buffer_append(
3485 &client->communication.outbound.buffer, send_buffer,
3486 sizeof(send_buffer));
3487 if (ret) {
3488 ERR("[notification-thread] Failed to send protocol version to notification channel client");
3489 goto end;
3490 }
3491
3492 client->validated = true;
3493 client->communication.active = true;
3494
3495 transmission_status = client_flush_outgoing_queue(client);
3496 ret = client_handle_transmission_status(
3497 client, transmission_status, state);
3498 if (ret) {
3499 goto end;
3500 }
3501
3502 ret = client_send_command_reply(client, state, status);
3503 if (ret) {
3504 ERR("[notification-thread] Failed to send reply to notification channel client");
3505 goto end;
3506 }
3507
3508 /* Set reception state to receive the next message header. */
3509 ret = client_reset_inbound_state(client);
3510 if (ret) {
3511 ERR("[notification-thread] Failed to reset client communication's inbound state");
3512 goto end;
3513 }
3514
3515 end:
3516 pthread_mutex_unlock(&client->lock);
3517 return ret;
3518 }
3519
3520 static
3521 int client_handle_message_subscription(
3522 struct notification_client *client,
3523 enum lttng_notification_channel_message_type msg_type,
3524 struct notification_thread_state *state)
3525 {
3526 int ret;
3527 struct lttng_condition *condition;
3528 enum lttng_notification_channel_status status =
3529 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
3530 const struct lttng_buffer_view condition_view =
3531 lttng_buffer_view_from_dynamic_buffer(
3532 &client->communication.inbound.buffer,
3533 0, -1);
3534 size_t expected_condition_size;
3535
3536 pthread_mutex_lock(&client->lock);
3537 expected_condition_size = client->communication.inbound.buffer.size;
3538 pthread_mutex_unlock(&client->lock);
3539
3540 ret = lttng_condition_create_from_buffer(&condition_view, &condition);
3541 if (ret != expected_condition_size) {
3542 ERR("[notification-thread] Malformed condition received from client");
3543 goto end;
3544 }
3545
3546 if (msg_type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
3547 ret = notification_thread_client_subscribe(
3548 client, condition, state, &status);
3549 } else {
3550 ret = notification_thread_client_unsubscribe(
3551 client, condition, state, &status);
3552 }
3553 if (ret) {
3554 goto end;
3555 }
3556
3557 pthread_mutex_lock(&client->lock);
3558 ret = client_send_command_reply(client, state, status);
3559 if (ret) {
3560 ERR("[notification-thread] Failed to send reply to notification channel client");
3561 goto end_unlock;
3562 }
3563
3564 /* Set reception state to receive the next message header. */
3565 ret = client_reset_inbound_state(client);
3566 if (ret) {
3567 ERR("[notification-thread] Failed to reset client communication's inbound state");
3568 goto end_unlock;
3569 }
3570
3571 end_unlock:
3572 pthread_mutex_unlock(&client->lock);
3573 end:
3574 return ret;
3575 }
3576
3577 static
3578 int client_dispatch_message(struct notification_client *client,
3579 struct notification_thread_state *state)
3580 {
3581 int ret = 0;
3582
3583 if (client->communication.inbound.msg_type !=
3584 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE &&
3585 client->communication.inbound.msg_type !=
3586 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN &&
3587 !client->validated) {
3588 WARN("[notification-thread] client attempted a command before handshake");
3589 ret = -1;
3590 goto end;
3591 }
3592
3593 switch (client->communication.inbound.msg_type) {
3594 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN:
3595 {
3596 ret = client_handle_message_unknown(client, state);
3597 break;
3598 }
3599 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
3600 {
3601 ret = client_handle_message_handshake(client, state);
3602 break;
3603 }
3604 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
3605 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
3606 {
3607 ret = client_handle_message_subscription(client,
3608 client->communication.inbound.msg_type, state);
3609 break;
3610 }
3611 default:
3612 abort();
3613 }
3614 end:
3615 return ret;
3616 }
3617
3618 /* Incoming data from client. */
3619 int handle_notification_thread_client_in(
3620 struct notification_thread_state *state, int socket)
3621 {
3622 int ret = 0;
3623 struct notification_client *client;
3624 ssize_t recv_ret;
3625 size_t offset;
3626 bool message_is_complete = false;
3627
3628 client = get_client_from_socket(socket, state);
3629 if (!client) {
3630 /* Internal error, abort. */
3631 ret = -1;
3632 goto end;
3633 }
3634
3635 pthread_mutex_lock(&client->lock);
3636 offset = client->communication.inbound.buffer.size -
3637 client->communication.inbound.bytes_to_receive;
3638 if (client->communication.inbound.expect_creds) {
3639 recv_ret = lttcomm_recv_creds_unix_sock(socket,
3640 client->communication.inbound.buffer.data + offset,
3641 client->communication.inbound.bytes_to_receive,
3642 &client->communication.inbound.creds);
3643 if (recv_ret > 0) {
3644 client->communication.inbound.expect_creds = false;
3645 client->communication.inbound.creds_received = true;
3646 }
3647 } else {
3648 recv_ret = lttcomm_recv_unix_sock_non_block(socket,
3649 client->communication.inbound.buffer.data + offset,
3650 client->communication.inbound.bytes_to_receive);
3651 }
3652 if (recv_ret >= 0) {
3653 client->communication.inbound.bytes_to_receive -= recv_ret;
3654 message_is_complete = client->communication.inbound
3655 .bytes_to_receive == 0;
3656 }
3657 pthread_mutex_unlock(&client->lock);
3658 if (recv_ret < 0) {
3659 goto error_disconnect_client;
3660 }
3661
3662 if (message_is_complete) {
3663 ret = client_dispatch_message(client, state);
3664 if (ret) {
3665 /*
3666 * Only returns an error if this client must be
3667 * disconnected.
3668 */
3669 goto error_disconnect_client;
3670 }
3671 }
3672 end:
3673 return ret;
3674 error_disconnect_client:
3675 pthread_mutex_lock(&client->lock);
3676 ret = notification_thread_client_disconnect(client, state);
3677 pthread_mutex_unlock(&client->lock);
3678 return ret;
3679 }
3680
3681 /* Client ready to receive outgoing data. */
3682 int handle_notification_thread_client_out(
3683 struct notification_thread_state *state, int socket)
3684 {
3685 int ret;
3686 struct notification_client *client;
3687 enum client_transmission_status transmission_status;
3688
3689 client = get_client_from_socket(socket, state);
3690 if (!client) {
3691 /* Internal error, abort. */
3692 ret = -1;
3693 goto end;
3694 }
3695
3696 pthread_mutex_lock(&client->lock);
3697 transmission_status = client_flush_outgoing_queue(client);
3698 ret = client_handle_transmission_status(
3699 client, transmission_status, state);
3700 pthread_mutex_unlock(&client->lock);
3701 if (ret) {
3702 goto end;
3703 }
3704 end:
3705 return ret;
3706 }
3707
3708 static
3709 bool evaluate_buffer_usage_condition(const struct lttng_condition *condition,
3710 const struct channel_state_sample *sample,
3711 uint64_t buffer_capacity)
3712 {
3713 bool result = false;
3714 uint64_t threshold;
3715 enum lttng_condition_type condition_type;
3716 const struct lttng_condition_buffer_usage *use_condition = container_of(
3717 condition, struct lttng_condition_buffer_usage,
3718 parent);
3719
3720 if (use_condition->threshold_bytes.set) {
3721 threshold = use_condition->threshold_bytes.value;
3722 } else {
3723 /*
3724 * Threshold was expressed as a ratio.
3725 *
3726 * TODO the threshold (in bytes) of conditions expressed
3727 * as a ratio of total buffer size could be cached to
3728 * forego this double-multiplication or it could be performed
3729 * as fixed-point math.
3730 *
3731 * Note that caching should accomodate the case where the
3732 * condition applies to multiple channels (i.e. don't assume
3733 * that all channels matching my_chann* have the same size...)
3734 */
3735 threshold = (uint64_t) (use_condition->threshold_ratio.value *
3736 (double) buffer_capacity);
3737 }
3738
3739 condition_type = lttng_condition_get_type(condition);
3740 if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
3741 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
3742 threshold, sample->highest_usage);
3743
3744 /*
3745 * The low condition should only be triggered once _all_ of the
3746 * streams in a channel have gone below the "low" threshold.
3747 */
3748 if (sample->highest_usage <= threshold) {
3749 result = true;
3750 }
3751 } else {
3752 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
3753 threshold, sample->highest_usage);
3754
3755 /*
3756 * For high buffer usage scenarios, we want to trigger whenever
3757 * _any_ of the streams has reached the "high" threshold.
3758 */
3759 if (sample->highest_usage >= threshold) {
3760 result = true;
3761 }
3762 }
3763
3764 return result;
3765 }
3766
3767 static
3768 bool evaluate_session_consumed_size_condition(
3769 const struct lttng_condition *condition,
3770 uint64_t session_consumed_size)
3771 {
3772 uint64_t threshold;
3773 const struct lttng_condition_session_consumed_size *size_condition =
3774 container_of(condition,
3775 struct lttng_condition_session_consumed_size,
3776 parent);
3777
3778 threshold = size_condition->consumed_threshold_bytes.value;
3779 DBG("[notification-thread] Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
3780 threshold, session_consumed_size);
3781 return session_consumed_size >= threshold;
3782 }
3783
3784 static
3785 int evaluate_buffer_condition(const struct lttng_condition *condition,
3786 struct lttng_evaluation **evaluation,
3787 const struct notification_thread_state *state,
3788 const struct channel_state_sample *previous_sample,
3789 const struct channel_state_sample *latest_sample,
3790 uint64_t previous_session_consumed_total,
3791 uint64_t latest_session_consumed_total,
3792 struct channel_info *channel_info)
3793 {
3794 int ret = 0;
3795 enum lttng_condition_type condition_type;
3796 const bool previous_sample_available = !!previous_sample;
3797 bool previous_sample_result = false;
3798 bool latest_sample_result;
3799
3800 condition_type = lttng_condition_get_type(condition);
3801
3802 switch (condition_type) {
3803 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
3804 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
3805 if (caa_likely(previous_sample_available)) {
3806 previous_sample_result =
3807 evaluate_buffer_usage_condition(condition,
3808 previous_sample, channel_info->capacity);
3809 }
3810 latest_sample_result = evaluate_buffer_usage_condition(
3811 condition, latest_sample,
3812 channel_info->capacity);
3813 break;
3814 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
3815 if (caa_likely(previous_sample_available)) {
3816 previous_sample_result =
3817 evaluate_session_consumed_size_condition(
3818 condition,
3819 previous_session_consumed_total);
3820 }
3821 latest_sample_result =
3822 evaluate_session_consumed_size_condition(
3823 condition,
3824 latest_session_consumed_total);
3825 break;
3826 default:
3827 /* Unknown condition type; internal error. */
3828 abort();
3829 }
3830
3831 if (!latest_sample_result ||
3832 (previous_sample_result == latest_sample_result)) {
3833 /*
3834 * Only trigger on a condition evaluation transition.
3835 *
3836 * NOTE: This edge-triggered logic may not be appropriate for
3837 * future condition types.
3838 */
3839 goto end;
3840 }
3841
3842 if (!evaluation || !latest_sample_result) {
3843 goto end;
3844 }
3845
3846 switch (condition_type) {
3847 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
3848 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
3849 *evaluation = lttng_evaluation_buffer_usage_create(
3850 condition_type,
3851 latest_sample->highest_usage,
3852 channel_info->capacity);
3853 break;
3854 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
3855 *evaluation = lttng_evaluation_session_consumed_size_create(
3856 latest_session_consumed_total);
3857 break;
3858 default:
3859 abort();
3860 }
3861
3862 if (!*evaluation) {
3863 ret = -1;
3864 goto end;
3865 }
3866 end:
3867 return ret;
3868 }
3869
3870 static
3871 int client_notification_overflow(struct notification_client *client)
3872 {
3873 int ret = 0;
3874 const struct lttng_notification_channel_message msg = {
3875 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
3876 };
3877
3878 ASSERT_LOCKED(client->lock);
3879
3880 DBG("Dropping notification addressed to client (socket fd = %i)",
3881 client->socket);
3882 if (client->communication.outbound.dropped_notification) {
3883 /*
3884 * The client already has a "notification dropped" message
3885 * in its outgoing queue. Nothing to do since all
3886 * of those messages are coalesced.
3887 */
3888 goto end;
3889 }
3890
3891 client->communication.outbound.dropped_notification = true;
3892 ret = lttng_dynamic_buffer_append(
3893 &client->communication.outbound.buffer, &msg,
3894 sizeof(msg));
3895 if (ret) {
3896 PERROR("Failed to enqueue \"dropped notification\" message in client's (socket fd = %i) outgoing queue",
3897 client->socket);
3898 }
3899 end:
3900 return ret;
3901 }
3902
3903 static int client_handle_transmission_status_wrapper(
3904 struct notification_client *client,
3905 enum client_transmission_status status,
3906 void *user_data)
3907 {
3908 return client_handle_transmission_status(client, status,
3909 (struct notification_thread_state *) user_data);
3910 }
3911
3912 static
3913 int send_evaluation_to_clients(const struct lttng_trigger *trigger,
3914 const struct lttng_evaluation *evaluation,
3915 struct notification_client_list* client_list,
3916 struct notification_thread_state *state,
3917 uid_t object_uid, gid_t object_gid)
3918 {
3919 return notification_client_list_send_evaluation(client_list,
3920 lttng_trigger_get_const_condition(trigger), evaluation,
3921 lttng_trigger_get_credentials(trigger),
3922 &(struct lttng_credentials){
3923 .uid = object_uid, .gid = object_gid},
3924 client_handle_transmission_status_wrapper, state);
3925 }
3926
3927 LTTNG_HIDDEN
3928 int notification_client_list_send_evaluation(
3929 struct notification_client_list *client_list,
3930 const struct lttng_condition *condition,
3931 const struct lttng_evaluation *evaluation,
3932 const struct lttng_credentials *trigger_creds,
3933 const struct lttng_credentials *source_object_creds,
3934 report_client_transmission_result_cb client_report,
3935 void *user_data)
3936 {
3937 int ret = 0;
3938 struct lttng_dynamic_buffer msg_buffer;
3939 struct notification_client_list_element *client_list_element, *tmp;
3940 const struct lttng_notification notification = {
3941 .condition = (struct lttng_condition *) condition,
3942 .evaluation = (struct lttng_evaluation *) evaluation,
3943 };
3944 struct lttng_notification_channel_message msg_header = {
3945 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION,
3946 };
3947
3948 lttng_dynamic_buffer_init(&msg_buffer);
3949
3950 ret = lttng_dynamic_buffer_append(&msg_buffer, &msg_header,
3951 sizeof(msg_header));
3952 if (ret) {
3953 goto end;
3954 }
3955
3956 ret = lttng_notification_serialize(&notification, &msg_buffer);
3957 if (ret) {
3958 ERR("[notification-thread] Failed to serialize notification");
3959 ret = -1;
3960 goto end;
3961 }
3962
3963 /* Update payload size. */
3964 ((struct lttng_notification_channel_message * ) msg_buffer.data)->size =
3965 (uint32_t) (msg_buffer.size - sizeof(msg_header));
3966
3967 pthread_mutex_lock(&client_list->lock);
3968 cds_list_for_each_entry_safe(client_list_element, tmp,
3969 &client_list->list, node) {
3970 enum client_transmission_status transmission_status;
3971 struct notification_client *client =
3972 client_list_element->client;
3973
3974 ret = 0;
3975 pthread_mutex_lock(&client->lock);
3976 if (source_object_creds) {
3977 if (client->uid != source_object_creds->uid &&
3978 client->gid != source_object_creds->gid &&
3979 client->uid != 0) {
3980 /*
3981 * Client is not allowed to monitor this
3982 * object.
3983 */
3984 DBG("[notification-thread] Skipping client at it does not have the object permission to receive notification for this trigger");
3985 goto unlock_client;
3986 }
3987 }
3988
3989 /* TODO: what is the behavior for root client on non root
3990 * trigger? Since multiple triggers (different user) can have the same condition
3991 * but with different action group that can have each a notify.
3992 * Does the root client receive multiple notification for all
3993 * those triggers with the same condition or only notification
3994 * for triggers the root user configured?
3995 * For now we do the later. All users including the root user
3996 * can only receive notification from trigger it registered.
3997 */
3998 if (client->uid != trigger_creds->uid && client->gid != trigger_creds->gid) {
3999 DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this trigger");
4000 goto unlock_client;
4001 }
4002
4003 DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
4004 client->socket, msg_buffer.size);
4005 if (client->communication.outbound.buffer.size) {
4006 /*
4007 * Outgoing data is already buffered for this client;
4008 * drop the notification and enqueue a "dropped
4009 * notification" message if this is the first dropped
4010 * notification since the socket spilled-over to the
4011 * queue.
4012 */
4013 ret = client_notification_overflow(client);
4014 if (ret) {
4015 goto unlock_client;
4016 }
4017 }
4018
4019 ret = lttng_dynamic_buffer_append_buffer(
4020 &client->communication.outbound.buffer,
4021 &msg_buffer);
4022 if (ret) {
4023 goto unlock_client;
4024 }
4025
4026 transmission_status = client_flush_outgoing_queue(client);
4027 ret = client_report(client, transmission_status, user_data);
4028 if (ret) {
4029 goto unlock_client;
4030 }
4031 unlock_client:
4032 pthread_mutex_unlock(&client->lock);
4033 if (ret) {
4034 goto end_unlock_list;
4035 }
4036 }
4037 ret = 0;
4038
4039 end_unlock_list:
4040 pthread_mutex_unlock(&client_list->lock);
4041 end:
4042 lttng_dynamic_buffer_reset(&msg_buffer);
4043 return ret;
4044 }
4045
4046 int handle_notification_thread_event(struct notification_thread_state *state,
4047 int pipe,
4048 enum lttng_domain_type domain)
4049 {
4050 int ret;
4051 struct lttng_ust_trigger_notification ust_notification;
4052 uint64_t kernel_notification;
4053 struct cds_lfht_node *node;
4054 struct cds_lfht_iter iter;
4055 struct notification_trigger_tokens_ht_element *element;
4056 struct lttng_trigger_notification notification;
4057 void *reception_buffer;
4058 size_t reception_size;
4059 enum action_executor_status executor_status;
4060 struct notification_client_list *client_list = NULL;
4061
4062 notification.type = domain;
4063
4064 switch(domain) {
4065 case LTTNG_DOMAIN_UST:
4066 reception_buffer = (void *) &ust_notification;
4067 reception_size = sizeof(ust_notification);
4068 notification.u.ust = &ust_notification;
4069 break;
4070 case LTTNG_DOMAIN_KERNEL:
4071 reception_buffer = (void *) &kernel_notification;
4072 reception_size = sizeof(kernel_notification);
4073 notification.u.kernel = &kernel_notification;
4074 break;
4075 default:
4076 assert(0);
4077 }
4078
4079 /*
4080 * The monitoring pipe only holds messages smaller than PIPE_BUF,
4081 * ensuring that read/write of sampling messages are atomic.
4082 */
4083 /* TODO: should we read as much as we can ? EWOULDBLOCK? */
4084
4085 ret = lttng_read(pipe, reception_buffer, reception_size);
4086 if (ret != reception_size) {
4087 ERR("[notification-thread] Failed to read from event source pipe (fd = %i)",
4088 pipe);
4089 /* TODO: Should this error out completly.
4090 * This can happen when an app is killed as of today
4091 * ret = -1 cause the whole thread to die and fuck up
4092 * everything.
4093 */
4094 goto end;
4095 }
4096
4097 switch(domain) {
4098 case LTTNG_DOMAIN_UST:
4099 notification.id = ust_notification.id;
4100 break;
4101 case LTTNG_DOMAIN_KERNEL:
4102 notification.id = kernel_notification;
4103 break;
4104 default:
4105 assert(0);
4106 }
4107
4108 /* Find triggers associated with this token. */
4109 rcu_read_lock();
4110 cds_lfht_lookup(state->trigger_tokens_ht,
4111 hash_key_u64(&notification.id, lttng_ht_seed), match_trigger_token,
4112 &notification.id, &iter);
4113 node = cds_lfht_iter_get_node(&iter);
4114 if (caa_unlikely(!node)) {
4115 /* TODO: is this an error? This might happen if the receive side
4116 * is slow to process event from source and that the trigger was
4117 * removed but the app still kicking. This yield another
4118 * question on the trigger lifetime and when we can remove a
4119 * trigger. How to guarantee that all event with the token idea
4120 * have be processed? Do we want to provide this guarantee?
4121 *
4122 * Update: I have encountered this when using a trigger on
4123 * sched_switch and then removing it. The frequency is quite
4124 * high hence we en up exactly in the mentionned scenario.
4125 * AFAIK this might be the best way to handle this.
4126 */
4127 ret = 0;
4128 goto end_unlock;
4129 }
4130 element = caa_container_of(node,
4131 struct notification_trigger_tokens_ht_element,
4132 node);
4133
4134 if (!lttng_trigger_is_ready_to_fire(element->trigger)) {
4135 ret = 0;
4136 goto end_unlock;
4137 }
4138
4139 client_list = get_client_list_from_condition(state,
4140 lttng_trigger_get_const_condition(element->trigger));
4141 executor_status = action_executor_enqueue(
4142 state->executor, element->trigger, client_list);
4143 switch (executor_status) {
4144 case ACTION_EXECUTOR_STATUS_OK:
4145 ret = 0;
4146 break;
4147 case ACTION_EXECUTOR_STATUS_OVERFLOW:
4148 {
4149 struct notification_client_list_element *client_list_element,
4150 *tmp;
4151
4152 /*
4153 * Not a fatal error; this is expected and simply means the
4154 * executor has too much work queued already.
4155 */
4156 ret = 0;
4157
4158 if (!client_list) {
4159 break;
4160 }
4161
4162 /* Warn clients that a notification (or more) was dropped. */
4163 pthread_mutex_lock(&client_list->lock);
4164 cds_list_for_each_entry_safe(client_list_element, tmp,
4165 &client_list->list, node) {
4166 enum client_transmission_status transmission_status;
4167 struct notification_client *client =
4168 client_list_element->client;
4169
4170 pthread_mutex_lock(&client->lock);
4171 ret = client_notification_overflow(client);
4172 if (ret) {
4173 /* Fatal error. */
4174 goto next_client;
4175 }
4176
4177 transmission_status =
4178 client_flush_outgoing_queue(client);
4179 ret = client_handle_transmission_status(
4180 client, transmission_status, state);
4181 if (ret) {
4182 /* Fatal error. */
4183 goto next_client;
4184 }
4185 next_client:
4186 pthread_mutex_unlock(&client->lock);
4187 if (ret) {
4188 break;
4189 }
4190 }
4191 pthread_mutex_lock(&client_list->lock);
4192 break;
4193 }
4194 case ACTION_EXECUTOR_STATUS_ERROR:
4195 /* Fatal error, shut down everything. */
4196 ERR("Fatal error encoutered while enqueuing action");
4197 ret = -1;
4198 goto end_unlock;
4199 default:
4200 /* Unhandled error. */
4201 abort();
4202 }
4203
4204 end_unlock:
4205 notification_client_list_put(client_list);
4206 rcu_read_unlock();
4207 end:
4208 return ret;
4209 }
4210
4211 int handle_notification_thread_channel_sample(
4212 struct notification_thread_state *state, int pipe,
4213 enum lttng_domain_type domain)
4214 {
4215 int ret = 0;
4216 struct lttcomm_consumer_channel_monitor_msg sample_msg;
4217 struct channel_info *channel_info;
4218 struct cds_lfht_node *node;
4219 struct cds_lfht_iter iter;
4220 struct lttng_channel_trigger_list *trigger_list;
4221 struct lttng_trigger_list_element *trigger_list_element;
4222 bool previous_sample_available = false;
4223 struct channel_state_sample previous_sample, latest_sample;
4224 uint64_t previous_session_consumed_total, latest_session_consumed_total;
4225
4226 /*
4227 * The monitoring pipe only holds messages smaller than PIPE_BUF,
4228 * ensuring that read/write of sampling messages are atomic.
4229 */
4230 ret = lttng_read(pipe, &sample_msg, sizeof(sample_msg));
4231 if (ret != sizeof(sample_msg)) {
4232 ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
4233 pipe);
4234 ret = -1;
4235 goto end;
4236 }
4237
4238 ret = 0;
4239 latest_sample.key.key = sample_msg.key;
4240 latest_sample.key.domain = domain;
4241 latest_sample.highest_usage = sample_msg.highest;
4242 latest_sample.lowest_usage = sample_msg.lowest;
4243 latest_sample.channel_total_consumed = sample_msg.total_consumed;
4244
4245 rcu_read_lock();
4246
4247 /* Retrieve the channel's informations */
4248 cds_lfht_lookup(state->channels_ht,
4249 hash_channel_key(&latest_sample.key),
4250 match_channel_info,
4251 &latest_sample.key,
4252 &iter);
4253 node = cds_lfht_iter_get_node(&iter);
4254 if (caa_unlikely(!node)) {
4255 /*
4256 * Not an error since the consumer can push a sample to the pipe
4257 * and the rest of the session daemon could notify us of the
4258 * channel's destruction before we get a chance to process that
4259 * sample.
4260 */
4261 DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
4262 latest_sample.key.key,
4263 domain == LTTNG_DOMAIN_KERNEL ? "kernel" :
4264 "user space");
4265 goto end_unlock;
4266 }
4267 channel_info = caa_container_of(node, struct channel_info,
4268 channels_ht_node);
4269 DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64", total consumed = %" PRIu64")",
4270 channel_info->name,
4271 latest_sample.key.key,
4272 channel_info->session_info->name,
4273 latest_sample.highest_usage,
4274 latest_sample.lowest_usage,
4275 latest_sample.channel_total_consumed);
4276
4277 previous_session_consumed_total =
4278 channel_info->session_info->consumed_data_size;
4279
4280 /* Retrieve the channel's last sample, if it exists, and update it. */
4281 cds_lfht_lookup(state->channel_state_ht,
4282 hash_channel_key(&latest_sample.key),
4283 match_channel_state_sample,
4284 &latest_sample.key,
4285 &iter);
4286 node = cds_lfht_iter_get_node(&iter);
4287 if (caa_likely(node)) {
4288 struct channel_state_sample *stored_sample;
4289
4290 /* Update the sample stored. */
4291 stored_sample = caa_container_of(node,
4292 struct channel_state_sample,
4293 channel_state_ht_node);
4294
4295 memcpy(&previous_sample, stored_sample,
4296 sizeof(previous_sample));
4297 stored_sample->highest_usage = latest_sample.highest_usage;
4298 stored_sample->lowest_usage = latest_sample.lowest_usage;
4299 stored_sample->channel_total_consumed = latest_sample.channel_total_consumed;
4300 previous_sample_available = true;
4301
4302 latest_session_consumed_total =
4303 previous_session_consumed_total +
4304 (latest_sample.channel_total_consumed - previous_sample.channel_total_consumed);
4305 } else {
4306 /*
4307 * This is the channel's first sample, allocate space for and
4308 * store the new sample.
4309 */
4310 struct channel_state_sample *stored_sample;
4311
4312 stored_sample = zmalloc(sizeof(*stored_sample));
4313 if (!stored_sample) {
4314 ret = -1;
4315 goto end_unlock;
4316 }
4317
4318 memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
4319 cds_lfht_node_init(&stored_sample->channel_state_ht_node);
4320 cds_lfht_add(state->channel_state_ht,
4321 hash_channel_key(&stored_sample->key),
4322 &stored_sample->channel_state_ht_node);
4323
4324 latest_session_consumed_total =
4325 previous_session_consumed_total +
4326 latest_sample.channel_total_consumed;
4327 }
4328
4329 channel_info->session_info->consumed_data_size =
4330 latest_session_consumed_total;
4331
4332 /* Find triggers associated with this channel. */
4333 cds_lfht_lookup(state->channel_triggers_ht,
4334 hash_channel_key(&latest_sample.key),
4335 match_channel_trigger_list,
4336 &latest_sample.key,
4337 &iter);
4338 node = cds_lfht_iter_get_node(&iter);
4339 if (caa_likely(!node)) {
4340 goto end_unlock;
4341 }
4342
4343 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
4344 channel_triggers_ht_node);
4345 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
4346 node) {
4347 const struct lttng_condition *condition;
4348 const struct lttng_action *action;
4349 struct lttng_trigger *trigger;
4350 struct notification_client_list *client_list = NULL;
4351 struct lttng_evaluation *evaluation = NULL;
4352 bool client_list_is_empty;
4353
4354 ret = 0;
4355 trigger = trigger_list_element->trigger;
4356 condition = lttng_trigger_get_const_condition(trigger);
4357 assert(condition);
4358 action = lttng_trigger_get_const_action(trigger);
4359
4360 if (!lttng_trigger_is_ready_to_fire(trigger)) {
4361 goto put_list;
4362 }
4363
4364 /* Notify actions are the only type currently supported. */
4365 /* TODO support other type of action */
4366 assert(lttng_action_get_type_const(action) ==
4367 LTTNG_ACTION_TYPE_NOTIFY);
4368
4369 /*
4370 * Check if any client is subscribed to the result of this
4371 * evaluation.
4372 */
4373 client_list = get_client_list_from_condition(state, condition);
4374 assert(client_list);
4375 client_list_is_empty = cds_list_empty(&client_list->list);
4376 if (client_list_is_empty) {
4377 /*
4378 * No clients interested in the evaluation's result,
4379 * skip it.
4380 */
4381 goto put_list;
4382 }
4383
4384 ret = evaluate_buffer_condition(condition, &evaluation, state,
4385 previous_sample_available ? &previous_sample : NULL,
4386 &latest_sample,
4387 previous_session_consumed_total,
4388 latest_session_consumed_total,
4389 channel_info);
4390 if (caa_unlikely(ret)) {
4391 goto put_list;
4392 }
4393
4394 if (caa_likely(!evaluation)) {
4395 goto put_list;
4396 }
4397
4398 /* Dispatch evaluation result to all clients. */
4399 ret = send_evaluation_to_clients(trigger_list_element->trigger,
4400 evaluation, client_list, state,
4401 channel_info->session_info->uid,
4402 channel_info->session_info->gid);
4403 lttng_evaluation_destroy(evaluation);
4404 put_list:
4405 notification_client_list_put(client_list);
4406 if (caa_unlikely(ret)) {
4407 break;
4408 }
4409 }
4410 end_unlock:
4411 rcu_read_unlock();
4412 end:
4413 return ret;
4414 }
This page took 0.200567 seconds and 5 git commands to generate.