SoW-2020-0002: Trace Hit Counters: trigger error reporting integration
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.c
1 /*
2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * SPDX-License-Identifier: GPL-2.0-only
5 *
6 */
7
8 #include "lttng/action/action.h"
9 #include "lttng/trigger/trigger-internal.h"
10 #define _LGPL_SOURCE
11 #include <urcu.h>
12 #include <urcu/rculfhash.h>
13
14 #include <common/defaults.h>
15 #include <common/error.h>
16 #include <common/futex.h>
17 #include <common/unix.h>
18 #include <common/dynamic-buffer.h>
19 #include <common/hashtable/utils.h>
20 #include <common/sessiond-comm/sessiond-comm.h>
21 #include <common/macros.h>
22 #include <lttng/condition/condition.h>
23 #include <lttng/action/action-internal.h>
24 #include <lttng/action/group-internal.h>
25 #include <lttng/domain-internal.h>
26 #include <lttng/notification/notification-internal.h>
27 #include <lttng/condition/condition-internal.h>
28 #include <lttng/condition/buffer-usage-internal.h>
29 #include <lttng/condition/session-consumed-size-internal.h>
30 #include <lttng/condition/session-rotation-internal.h>
31 #include <lttng/condition/event-rule-internal.h>
32 #include <lttng/domain-internal.h>
33 #include <lttng/notification/channel-internal.h>
34 #include <lttng/trigger/trigger-internal.h>
35
36 #include <time.h>
37 #include <unistd.h>
38 #include <assert.h>
39 #include <inttypes.h>
40 #include <fcntl.h>
41
42 #include "condition-internal.h"
43 #include "notification-thread.h"
44 #include "notification-thread-events.h"
45 #include "notification-thread-commands.h"
46 #include "lttng-sessiond.h"
47 #include "kernel.h"
48 #include "trigger-error-accounting.h"
49
50 #define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
51 #define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
52
53 enum lttng_object_type {
54 LTTNG_OBJECT_TYPE_UNKNOWN,
55 LTTNG_OBJECT_TYPE_NONE,
56 LTTNG_OBJECT_TYPE_CHANNEL,
57 LTTNG_OBJECT_TYPE_SESSION,
58 };
59
60 struct lttng_trigger_list_element {
61 /* No ownership of the trigger object is assumed. */
62 struct lttng_trigger *trigger;
63 struct cds_list_head node;
64 };
65
66 struct lttng_channel_trigger_list {
67 struct channel_key channel_key;
68 /* List of struct lttng_trigger_list_element. */
69 struct cds_list_head list;
70 /* Node in the channel_triggers_ht */
71 struct cds_lfht_node channel_triggers_ht_node;
72 /* call_rcu delayed reclaim. */
73 struct rcu_head rcu_node;
74 };
75
76 /*
77 * List of triggers applying to a given session.
78 *
79 * See:
80 * - lttng_session_trigger_list_create()
81 * - lttng_session_trigger_list_build()
82 * - lttng_session_trigger_list_destroy()
83 * - lttng_session_trigger_list_add()
84 */
85 struct lttng_session_trigger_list {
86 /*
87 * Not owned by this; points to the session_info structure's
88 * session name.
89 */
90 const char *session_name;
91 /* List of struct lttng_trigger_list_element. */
92 struct cds_list_head list;
93 /* Node in the session_triggers_ht */
94 struct cds_lfht_node session_triggers_ht_node;
95 /*
96 * Weak reference to the notification system's session triggers
97 * hashtable.
98 *
99 * The session trigger list structure structure is owned by
100 * the session's session_info.
101 *
102 * The session_info is kept alive the the channel_infos holding a
103 * reference to it (reference counting). When those channels are
104 * destroyed (at runtime or on teardown), the reference they hold
105 * to the session_info are released. On destruction of session_info,
106 * session_info_destroy() will remove the list of triggers applying
107 * to this session from the notification system's state.
108 *
109 * This implies that the session_triggers_ht must be destroyed
110 * after the channels.
111 */
112 struct cds_lfht *session_triggers_ht;
113 /* Used for delayed RCU reclaim. */
114 struct rcu_head rcu_node;
115 };
116
117 struct lttng_trigger_ht_element {
118 struct lttng_trigger *trigger;
119 struct cds_lfht_node node;
120 struct cds_lfht_node node_by_name_uid;
121 /* call_rcu delayed reclaim. */
122 struct rcu_head rcu_node;
123 };
124
125 struct lttng_condition_list_element {
126 struct lttng_condition *condition;
127 struct cds_list_head node;
128 };
129
130 struct channel_state_sample {
131 struct channel_key key;
132 struct cds_lfht_node channel_state_ht_node;
133 uint64_t highest_usage;
134 uint64_t lowest_usage;
135 uint64_t channel_total_consumed;
136 /* call_rcu delayed reclaim. */
137 struct rcu_head rcu_node;
138 };
139
140 static unsigned long hash_channel_key(struct channel_key *key);
141 static int evaluate_buffer_condition(const struct lttng_condition *condition,
142 struct lttng_evaluation **evaluation,
143 const struct notification_thread_state *state,
144 const struct channel_state_sample *previous_sample,
145 const struct channel_state_sample *latest_sample,
146 uint64_t previous_session_consumed_total,
147 uint64_t latest_session_consumed_total,
148 struct channel_info *channel_info);
149 static
150 int send_evaluation_to_clients(const struct lttng_trigger *trigger,
151 const struct lttng_evaluation *evaluation,
152 struct notification_client_list *client_list,
153 struct notification_thread_state *state,
154 uid_t channel_uid, gid_t channel_gid);
155
156
157 /* session_info API */
158 static
159 void session_info_destroy(void *_data);
160 static
161 void session_info_get(struct session_info *session_info);
162 static
163 void session_info_put(struct session_info *session_info);
164 static
165 struct session_info *session_info_create(const char *name,
166 uid_t uid, gid_t gid,
167 struct lttng_session_trigger_list *trigger_list,
168 struct cds_lfht *sessions_ht);
169 static
170 void session_info_add_channel(struct session_info *session_info,
171 struct channel_info *channel_info);
172 static
173 void session_info_remove_channel(struct session_info *session_info,
174 struct channel_info *channel_info);
175
176 /* lttng_session_trigger_list API */
177 static
178 struct lttng_session_trigger_list *lttng_session_trigger_list_create(
179 const char *session_name,
180 struct cds_lfht *session_triggers_ht);
181 static
182 struct lttng_session_trigger_list *lttng_session_trigger_list_build(
183 const struct notification_thread_state *state,
184 const char *session_name);
185 static
186 void lttng_session_trigger_list_destroy(
187 struct lttng_session_trigger_list *list);
188 static
189 int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
190 struct lttng_trigger *trigger);
191
192 static
193 int client_handle_transmission_status(
194 struct notification_client *client,
195 enum client_transmission_status transmission_status,
196 struct notification_thread_state *state);
197
198 static
199 void free_lttng_trigger_ht_element_rcu(struct rcu_head *node);
200
201 static
202 int match_client_socket(struct cds_lfht_node *node, const void *key)
203 {
204 /* This double-cast is intended to supress pointer-to-cast warning. */
205 const int socket = (int) (intptr_t) key;
206 const struct notification_client *client = caa_container_of(node,
207 struct notification_client, client_socket_ht_node);
208
209 return client->socket == socket;
210 }
211
212 static
213 int match_client_id(struct cds_lfht_node *node, const void *key)
214 {
215 /* This double-cast is intended to supress pointer-to-cast warning. */
216 const notification_client_id id = *((notification_client_id *) key);
217 const struct notification_client *client = caa_container_of(
218 node, struct notification_client, client_id_ht_node);
219
220 return client->id == id;
221 }
222
223 static
224 int match_channel_trigger_list(struct cds_lfht_node *node, const void *key)
225 {
226 struct channel_key *channel_key = (struct channel_key *) key;
227 struct lttng_channel_trigger_list *trigger_list;
228
229 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
230 channel_triggers_ht_node);
231
232 return !!((channel_key->key == trigger_list->channel_key.key) &&
233 (channel_key->domain == trigger_list->channel_key.domain));
234 }
235
236 static
237 int match_session_trigger_list(struct cds_lfht_node *node, const void *key)
238 {
239 const char *session_name = (const char *) key;
240 struct lttng_session_trigger_list *trigger_list;
241
242 trigger_list = caa_container_of(node, struct lttng_session_trigger_list,
243 session_triggers_ht_node);
244
245 return !!(strcmp(trigger_list->session_name, session_name) == 0);
246 }
247
248 static
249 int match_channel_state_sample(struct cds_lfht_node *node, const void *key)
250 {
251 struct channel_key *channel_key = (struct channel_key *) key;
252 struct channel_state_sample *sample;
253
254 sample = caa_container_of(node, struct channel_state_sample,
255 channel_state_ht_node);
256
257 return !!((channel_key->key == sample->key.key) &&
258 (channel_key->domain == sample->key.domain));
259 }
260
261 static
262 int match_channel_info(struct cds_lfht_node *node, const void *key)
263 {
264 struct channel_key *channel_key = (struct channel_key *) key;
265 struct channel_info *channel_info;
266
267 channel_info = caa_container_of(node, struct channel_info,
268 channels_ht_node);
269
270 return !!((channel_key->key == channel_info->key.key) &&
271 (channel_key->domain == channel_info->key.domain));
272 }
273
274 static
275 int match_trigger(struct cds_lfht_node *node, const void *key)
276 {
277 struct lttng_trigger *trigger_key = (struct lttng_trigger *) key;
278 struct lttng_trigger_ht_element *trigger_ht_element;
279
280 trigger_ht_element = caa_container_of(node, struct lttng_trigger_ht_element,
281 node);
282
283 return !!lttng_trigger_is_equal(trigger_key, trigger_ht_element->trigger);
284 }
285
286 static
287 int match_trigger_token(struct cds_lfht_node *node, const void *key)
288 {
289 const uint64_t *_key = key;
290 struct notification_trigger_tokens_ht_element *element;
291
292 element = caa_container_of(node, struct notification_trigger_tokens_ht_element,
293 node);
294 return *_key == element->token ;
295 }
296
297 static
298 int match_client_list_condition(struct cds_lfht_node *node, const void *key)
299 {
300 struct lttng_condition *condition_key = (struct lttng_condition *) key;
301 struct notification_client_list *client_list;
302 const struct lttng_condition *condition;
303
304 assert(condition_key);
305
306 client_list = caa_container_of(node, struct notification_client_list,
307 notification_trigger_clients_ht_node);
308 condition = lttng_trigger_get_const_condition(client_list->trigger);
309
310 return !!lttng_condition_is_equal(condition_key, condition);
311 }
312
313 static
314 int match_session(struct cds_lfht_node *node, const void *key)
315 {
316 const char *name = key;
317 struct session_info *session_info = caa_container_of(
318 node, struct session_info, sessions_ht_node);
319
320 return !strcmp(session_info->name, name);
321 }
322
323 /*
324 * Match trigger based on name and credentials only.
325 * Name duplication is NOT allowed for the same uid.
326 */
327 static int match_name_uid(struct cds_lfht_node *node, const void *key)
328 {
329 bool match = false;
330 struct lttng_trigger_ht_element *trigger_ht_element;
331 const char *name;
332 const char *key_name;
333 enum lttng_trigger_status status;
334 const struct lttng_credentials *key_creds;
335 const struct lttng_credentials *node_creds;
336 struct lttng_trigger *trigger_key = (struct lttng_trigger *) key;
337
338 trigger_ht_element = caa_container_of(node, struct lttng_trigger_ht_element,
339 node_by_name_uid);
340
341 status = lttng_trigger_get_name(trigger_ht_element->trigger, &name);
342 assert(status == LTTNG_TRIGGER_STATUS_OK);
343
344 status = lttng_trigger_get_name(trigger_key, &key_name);
345 assert(status == LTTNG_TRIGGER_STATUS_OK);
346
347 if (strcmp(name, key_name) != 0) {
348 goto end;
349 }
350
351 /* Check the uid */
352 key_creds = lttng_trigger_get_credentials(trigger_key);
353 node_creds = lttng_trigger_get_credentials(trigger_ht_element->trigger);
354
355 match = lttng_credentials_is_equal_uid(key_creds, node_creds);
356
357 end:
358 return match;
359 }
360
361 static
362 unsigned long hash_channel_key(struct channel_key *key)
363 {
364 unsigned long key_hash = hash_key_u64(&key->key, lttng_ht_seed);
365 unsigned long domain_hash = hash_key_ulong(
366 (void *) (unsigned long) key->domain, lttng_ht_seed);
367
368 return key_hash ^ domain_hash;
369 }
370
371 static
372 unsigned long hash_client_socket(int socket)
373 {
374 return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed);
375 }
376
377 static
378 unsigned long hash_client_id(notification_client_id id)
379 {
380 return hash_key_u64(&id, lttng_ht_seed);
381 }
382
383 /*
384 * Get the type of object to which a given condition applies. Bindings let
385 * the notification system evaluate a trigger's condition when a given
386 * object's state is updated.
387 *
388 * For instance, a condition bound to a channel will be evaluated everytime
389 * the channel's state is changed by a channel monitoring sample.
390 */
391 static
392 enum lttng_object_type get_condition_binding_object(
393 const struct lttng_condition *condition)
394 {
395 switch (lttng_condition_get_type(condition)) {
396 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
397 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
398 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
399 return LTTNG_OBJECT_TYPE_CHANNEL;
400 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
401 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
402 return LTTNG_OBJECT_TYPE_SESSION;
403 case LTTNG_CONDITION_TYPE_EVENT_RULE_HIT:
404 return LTTNG_OBJECT_TYPE_NONE;
405 default:
406 return LTTNG_OBJECT_TYPE_UNKNOWN;
407 }
408 }
409
410 static
411 void free_channel_info_rcu(struct rcu_head *node)
412 {
413 free(caa_container_of(node, struct channel_info, rcu_node));
414 }
415
416 static
417 void channel_info_destroy(struct channel_info *channel_info)
418 {
419 if (!channel_info) {
420 return;
421 }
422
423 if (channel_info->session_info) {
424 session_info_remove_channel(channel_info->session_info,
425 channel_info);
426 session_info_put(channel_info->session_info);
427 }
428 if (channel_info->name) {
429 free(channel_info->name);
430 }
431 call_rcu(&channel_info->rcu_node, free_channel_info_rcu);
432 }
433
434 static
435 void free_session_info_rcu(struct rcu_head *node)
436 {
437 free(caa_container_of(node, struct session_info, rcu_node));
438 }
439
440 /* Don't call directly, use the ref-counting mechanism. */
441 static
442 void session_info_destroy(void *_data)
443 {
444 struct session_info *session_info = _data;
445 int ret;
446
447 assert(session_info);
448 if (session_info->channel_infos_ht) {
449 ret = cds_lfht_destroy(session_info->channel_infos_ht, NULL);
450 if (ret) {
451 ERR("[notification-thread] Failed to destroy channel information hash table");
452 }
453 }
454 lttng_session_trigger_list_destroy(session_info->trigger_list);
455
456 rcu_read_lock();
457 cds_lfht_del(session_info->sessions_ht,
458 &session_info->sessions_ht_node);
459 rcu_read_unlock();
460 free(session_info->name);
461 call_rcu(&session_info->rcu_node, free_session_info_rcu);
462 }
463
464 static
465 void session_info_get(struct session_info *session_info)
466 {
467 if (!session_info) {
468 return;
469 }
470 lttng_ref_get(&session_info->ref);
471 }
472
473 static
474 void session_info_put(struct session_info *session_info)
475 {
476 if (!session_info) {
477 return;
478 }
479 lttng_ref_put(&session_info->ref);
480 }
481
482 static
483 struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid,
484 struct lttng_session_trigger_list *trigger_list,
485 struct cds_lfht *sessions_ht)
486 {
487 struct session_info *session_info;
488
489 assert(name);
490
491 session_info = zmalloc(sizeof(*session_info));
492 if (!session_info) {
493 goto end;
494 }
495 lttng_ref_init(&session_info->ref, session_info_destroy);
496
497 session_info->channel_infos_ht = cds_lfht_new(DEFAULT_HT_SIZE,
498 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
499 if (!session_info->channel_infos_ht) {
500 goto error;
501 }
502
503 cds_lfht_node_init(&session_info->sessions_ht_node);
504 session_info->name = strdup(name);
505 if (!session_info->name) {
506 goto error;
507 }
508 session_info->uid = uid;
509 session_info->gid = gid;
510 session_info->trigger_list = trigger_list;
511 session_info->sessions_ht = sessions_ht;
512 end:
513 return session_info;
514 error:
515 session_info_put(session_info);
516 return NULL;
517 }
518
519 static
520 void session_info_add_channel(struct session_info *session_info,
521 struct channel_info *channel_info)
522 {
523 rcu_read_lock();
524 cds_lfht_add(session_info->channel_infos_ht,
525 hash_channel_key(&channel_info->key),
526 &channel_info->session_info_channels_ht_node);
527 rcu_read_unlock();
528 }
529
530 static
531 void session_info_remove_channel(struct session_info *session_info,
532 struct channel_info *channel_info)
533 {
534 rcu_read_lock();
535 cds_lfht_del(session_info->channel_infos_ht,
536 &channel_info->session_info_channels_ht_node);
537 rcu_read_unlock();
538 }
539
540 static
541 struct channel_info *channel_info_create(const char *channel_name,
542 struct channel_key *channel_key, uint64_t channel_capacity,
543 struct session_info *session_info)
544 {
545 struct channel_info *channel_info = zmalloc(sizeof(*channel_info));
546
547 if (!channel_info) {
548 goto end;
549 }
550
551 cds_lfht_node_init(&channel_info->channels_ht_node);
552 cds_lfht_node_init(&channel_info->session_info_channels_ht_node);
553 memcpy(&channel_info->key, channel_key, sizeof(*channel_key));
554 channel_info->capacity = channel_capacity;
555
556 channel_info->name = strdup(channel_name);
557 if (!channel_info->name) {
558 goto error;
559 }
560
561 /*
562 * Set the references between session and channel infos:
563 * - channel_info holds a strong reference to session_info
564 * - session_info holds a weak reference to channel_info
565 */
566 session_info_get(session_info);
567 session_info_add_channel(session_info, channel_info);
568 channel_info->session_info = session_info;
569 end:
570 return channel_info;
571 error:
572 channel_info_destroy(channel_info);
573 return NULL;
574 }
575
576 LTTNG_HIDDEN
577 bool notification_client_list_get(struct notification_client_list *list)
578 {
579 return urcu_ref_get_unless_zero(&list->ref);
580 }
581
582 static
583 void free_notification_client_list_rcu(struct rcu_head *node)
584 {
585 free(caa_container_of(node, struct notification_client_list,
586 rcu_node));
587 }
588
589 static
590 void notification_client_list_release(struct urcu_ref *list_ref)
591 {
592 struct notification_client_list *list =
593 container_of(list_ref, typeof(*list), ref);
594 struct notification_client_list_element *client_list_element, *tmp;
595
596 if (list->notification_trigger_clients_ht) {
597 rcu_read_lock();
598 cds_lfht_del(list->notification_trigger_clients_ht,
599 &list->notification_trigger_clients_ht_node);
600 rcu_read_unlock();
601 list->notification_trigger_clients_ht = NULL;
602 }
603 cds_list_for_each_entry_safe(client_list_element, tmp,
604 &list->list, node) {
605 free(client_list_element);
606 }
607 pthread_mutex_destroy(&list->lock);
608 call_rcu(&list->rcu_node, free_notification_client_list_rcu);
609 }
610
611 static
612 struct notification_client_list *notification_client_list_create(
613 const struct lttng_trigger *trigger)
614 {
615 struct notification_client_list *client_list =
616 zmalloc(sizeof(*client_list));
617
618 if (!client_list) {
619 goto error;
620 }
621 pthread_mutex_init(&client_list->lock, NULL);
622 urcu_ref_init(&client_list->ref);
623 cds_lfht_node_init(&client_list->notification_trigger_clients_ht_node);
624 CDS_INIT_LIST_HEAD(&client_list->list);
625 client_list->trigger = trigger;
626 error:
627 return client_list;
628 }
629
630 static
631 void publish_notification_client_list(
632 struct notification_thread_state *state,
633 struct notification_client_list *list)
634 {
635 const struct lttng_condition *condition =
636 lttng_trigger_get_const_condition(list->trigger);
637
638 assert(!list->notification_trigger_clients_ht);
639 notification_client_list_get(list);
640
641 list->notification_trigger_clients_ht =
642 state->notification_trigger_clients_ht;
643
644 rcu_read_lock();
645 cds_lfht_add(state->notification_trigger_clients_ht,
646 lttng_condition_hash(condition),
647 &list->notification_trigger_clients_ht_node);
648 rcu_read_unlock();
649 }
650
651 LTTNG_HIDDEN
652 void notification_client_list_put(struct notification_client_list *list)
653 {
654 if (!list) {
655 return;
656 }
657 return urcu_ref_put(&list->ref, notification_client_list_release);
658 }
659
660 /* Provides a reference to the returned list. */
661 static
662 struct notification_client_list *get_client_list_from_condition(
663 struct notification_thread_state *state,
664 const struct lttng_condition *condition)
665 {
666 struct cds_lfht_node *node;
667 struct cds_lfht_iter iter;
668 struct notification_client_list *list = NULL;
669
670 rcu_read_lock();
671 cds_lfht_lookup(state->notification_trigger_clients_ht,
672 lttng_condition_hash(condition),
673 match_client_list_condition,
674 condition,
675 &iter);
676 node = cds_lfht_iter_get_node(&iter);
677 if (node) {
678 list = container_of(node, struct notification_client_list,
679 notification_trigger_clients_ht_node);
680 list = notification_client_list_get(list) ? list : NULL;
681 }
682
683 rcu_read_unlock();
684 return list;
685 }
686
687 static
688 int evaluate_channel_condition_for_client(
689 const struct lttng_condition *condition,
690 struct notification_thread_state *state,
691 struct lttng_evaluation **evaluation,
692 uid_t *session_uid, gid_t *session_gid)
693 {
694 int ret;
695 struct cds_lfht_iter iter;
696 struct cds_lfht_node *node;
697 struct channel_info *channel_info = NULL;
698 struct channel_key *channel_key = NULL;
699 struct channel_state_sample *last_sample = NULL;
700 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
701
702 rcu_read_lock();
703
704 /* Find the channel associated with the condition. */
705 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter,
706 channel_trigger_list, channel_triggers_ht_node) {
707 struct lttng_trigger_list_element *element;
708
709 cds_list_for_each_entry(element, &channel_trigger_list->list, node) {
710 const struct lttng_condition *current_condition =
711 lttng_trigger_get_const_condition(
712 element->trigger);
713
714 assert(current_condition);
715 if (!lttng_condition_is_equal(condition,
716 current_condition)) {
717 continue;
718 }
719
720 /* Found the trigger, save the channel key. */
721 channel_key = &channel_trigger_list->channel_key;
722 break;
723 }
724 if (channel_key) {
725 /* The channel key was found stop iteration. */
726 break;
727 }
728 }
729
730 if (!channel_key){
731 /* No channel found; normal exit. */
732 DBG("[notification-thread] No known channel associated with newly subscribed-to condition");
733 ret = 0;
734 goto end;
735 }
736
737 /* Fetch channel info for the matching channel. */
738 cds_lfht_lookup(state->channels_ht,
739 hash_channel_key(channel_key),
740 match_channel_info,
741 channel_key,
742 &iter);
743 node = cds_lfht_iter_get_node(&iter);
744 assert(node);
745 channel_info = caa_container_of(node, struct channel_info,
746 channels_ht_node);
747
748 /* Retrieve the channel's last sample, if it exists. */
749 cds_lfht_lookup(state->channel_state_ht,
750 hash_channel_key(channel_key),
751 match_channel_state_sample,
752 channel_key,
753 &iter);
754 node = cds_lfht_iter_get_node(&iter);
755 if (node) {
756 last_sample = caa_container_of(node,
757 struct channel_state_sample,
758 channel_state_ht_node);
759 } else {
760 /* Nothing to evaluate, no sample was ever taken. Normal exit */
761 DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
762 ret = 0;
763 goto end;
764 }
765
766 ret = evaluate_buffer_condition(condition, evaluation, state,
767 NULL, last_sample,
768 0, channel_info->session_info->consumed_data_size,
769 channel_info);
770 if (ret) {
771 WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
772 goto end;
773 }
774
775 *session_uid = channel_info->session_info->uid;
776 *session_gid = channel_info->session_info->gid;
777 end:
778 rcu_read_unlock();
779 return ret;
780 }
781
782 static
783 const char *get_condition_session_name(const struct lttng_condition *condition)
784 {
785 const char *session_name = NULL;
786 enum lttng_condition_status status;
787
788 switch (lttng_condition_get_type(condition)) {
789 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
790 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
791 status = lttng_condition_buffer_usage_get_session_name(
792 condition, &session_name);
793 break;
794 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
795 status = lttng_condition_session_consumed_size_get_session_name(
796 condition, &session_name);
797 break;
798 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
799 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
800 status = lttng_condition_session_rotation_get_session_name(
801 condition, &session_name);
802 break;
803 default:
804 abort();
805 }
806 if (status != LTTNG_CONDITION_STATUS_OK) {
807 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
808 goto end;
809 }
810 end:
811 return session_name;
812 }
813
814 static
815 int evaluate_session_condition_for_client(
816 const struct lttng_condition *condition,
817 struct notification_thread_state *state,
818 struct lttng_evaluation **evaluation,
819 uid_t *session_uid, gid_t *session_gid)
820 {
821 int ret;
822 struct cds_lfht_iter iter;
823 struct cds_lfht_node *node;
824 const char *session_name;
825 struct session_info *session_info = NULL;
826
827 rcu_read_lock();
828 session_name = get_condition_session_name(condition);
829
830 /* Find the session associated with the trigger. */
831 cds_lfht_lookup(state->sessions_ht,
832 hash_key_str(session_name, lttng_ht_seed),
833 match_session,
834 session_name,
835 &iter);
836 node = cds_lfht_iter_get_node(&iter);
837 if (!node) {
838 DBG("[notification-thread] No known session matching name \"%s\"",
839 session_name);
840 ret = 0;
841 goto end;
842 }
843
844 session_info = caa_container_of(node, struct session_info,
845 sessions_ht_node);
846 session_info_get(session_info);
847
848 /*
849 * Evaluation is performed in-line here since only one type of
850 * session-bound condition is handled for the moment.
851 */
852 switch (lttng_condition_get_type(condition)) {
853 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
854 if (!session_info->rotation.ongoing) {
855 ret = 0;
856 goto end_session_put;
857 }
858
859 *evaluation = lttng_evaluation_session_rotation_ongoing_create(
860 session_info->rotation.id);
861 if (!*evaluation) {
862 /* Fatal error. */
863 ERR("[notification-thread] Failed to create session rotation ongoing evaluation for session \"%s\"",
864 session_info->name);
865 ret = -1;
866 goto end_session_put;
867 }
868 ret = 0;
869 break;
870 default:
871 ret = 0;
872 goto end_session_put;
873 }
874
875 *session_uid = session_info->uid;
876 *session_gid = session_info->gid;
877
878 end_session_put:
879 session_info_put(session_info);
880 end:
881 rcu_read_unlock();
882 return ret;
883 }
884
885 static
886 int evaluate_condition_for_client(const struct lttng_trigger *trigger,
887 const struct lttng_condition *condition,
888 struct notification_client *client,
889 struct notification_thread_state *state)
890 {
891 int ret;
892 struct lttng_evaluation *evaluation = NULL;
893 struct notification_client_list client_list = {
894 .lock = PTHREAD_MUTEX_INITIALIZER,
895 };
896 struct notification_client_list_element client_list_element = { 0 };
897 uid_t object_uid = 0;
898 gid_t object_gid = 0;
899
900 assert(trigger);
901 assert(condition);
902 assert(client);
903 assert(state);
904
905 switch (get_condition_binding_object(condition)) {
906 case LTTNG_OBJECT_TYPE_SESSION:
907 ret = evaluate_session_condition_for_client(condition, state,
908 &evaluation, &object_uid, &object_gid);
909 break;
910 case LTTNG_OBJECT_TYPE_CHANNEL:
911 ret = evaluate_channel_condition_for_client(condition, state,
912 &evaluation, &object_uid, &object_gid);
913 break;
914 case LTTNG_OBJECT_TYPE_NONE:
915 DBG("[notification-thread] Newly subscribed-to condition not binded to object, nothing to evaluate");
916 ret = 0;
917 goto end;
918 case LTTNG_OBJECT_TYPE_UNKNOWN:
919 default:
920 ret = -1;
921 goto end;
922 }
923 if (ret) {
924 /* Fatal error. */
925 goto end;
926 }
927 if (!evaluation) {
928 /* Evaluation yielded nothing. Normal exit. */
929 DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
930 ret = 0;
931 goto end;
932 }
933
934 /*
935 * Create a temporary client list with the client currently
936 * subscribing.
937 */
938 cds_lfht_node_init(&client_list.notification_trigger_clients_ht_node);
939 CDS_INIT_LIST_HEAD(&client_list.list);
940 client_list.trigger = trigger;
941
942 CDS_INIT_LIST_HEAD(&client_list_element.node);
943 client_list_element.client = client;
944 cds_list_add(&client_list_element.node, &client_list.list);
945
946 /* Send evaluation result to the newly-subscribed client. */
947 DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
948 ret = send_evaluation_to_clients(trigger, evaluation, &client_list,
949 state, object_uid, object_gid);
950
951 end:
952 return ret;
953 }
954
955 static
956 int notification_thread_client_subscribe(struct notification_client *client,
957 struct lttng_condition *condition,
958 struct notification_thread_state *state,
959 enum lttng_notification_channel_status *_status)
960 {
961 int ret = 0;
962 struct notification_client_list *client_list = NULL;
963 struct lttng_condition_list_element *condition_list_element = NULL;
964 struct notification_client_list_element *client_list_element = NULL;
965 enum lttng_notification_channel_status status =
966 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
967
968 /*
969 * Ensure that the client has not already subscribed to this condition
970 * before.
971 */
972 cds_list_for_each_entry(condition_list_element, &client->condition_list, node) {
973 if (lttng_condition_is_equal(condition_list_element->condition,
974 condition)) {
975 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED;
976 goto end;
977 }
978 }
979
980 condition_list_element = zmalloc(sizeof(*condition_list_element));
981 if (!condition_list_element) {
982 ret = -1;
983 goto error;
984 }
985 client_list_element = zmalloc(sizeof(*client_list_element));
986 if (!client_list_element) {
987 ret = -1;
988 goto error;
989 }
990
991 /*
992 * Add the newly-subscribed condition to the client's subscription list.
993 */
994 CDS_INIT_LIST_HEAD(&condition_list_element->node);
995 condition_list_element->condition = condition;
996 cds_list_add(&condition_list_element->node, &client->condition_list);
997
998 client_list = get_client_list_from_condition(state, condition);
999 if (!client_list) {
1000 /*
1001 * No notification-emiting trigger registered with this
1002 * condition. We don't evaluate the condition right away
1003 * since this trigger is not registered yet.
1004 */
1005 free(client_list_element);
1006 goto end;
1007 }
1008
1009 /*
1010 * The condition to which the client just subscribed is evaluated
1011 * at this point so that conditions that are already TRUE result
1012 * in a notification being sent out.
1013 *
1014 * The client_list's trigger is used without locking the list itself.
1015 * This is correct since the list doesn't own the trigger and the
1016 * object is immutable.
1017 */
1018 if (evaluate_condition_for_client(client_list->trigger, condition,
1019 client, state)) {
1020 WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
1021 ret = -1;
1022 free(client_list_element);
1023 goto end;
1024 }
1025
1026 /*
1027 * Add the client to the list of clients interested in a given trigger
1028 * if a "notification" trigger with a corresponding condition was
1029 * added prior.
1030 */
1031 client_list_element->client = client;
1032 CDS_INIT_LIST_HEAD(&client_list_element->node);
1033
1034 pthread_mutex_lock(&client_list->lock);
1035 cds_list_add(&client_list_element->node, &client_list->list);
1036 pthread_mutex_unlock(&client_list->lock);
1037 end:
1038 if (_status) {
1039 *_status = status;
1040 }
1041 if (client_list) {
1042 notification_client_list_put(client_list);
1043 }
1044 return ret;
1045 error:
1046 free(condition_list_element);
1047 free(client_list_element);
1048 return ret;
1049 }
1050
1051 static
1052 int notification_thread_client_unsubscribe(
1053 struct notification_client *client,
1054 struct lttng_condition *condition,
1055 struct notification_thread_state *state,
1056 enum lttng_notification_channel_status *_status)
1057 {
1058 struct notification_client_list *client_list;
1059 struct lttng_condition_list_element *condition_list_element,
1060 *condition_tmp;
1061 struct notification_client_list_element *client_list_element,
1062 *client_tmp;
1063 bool condition_found = false;
1064 enum lttng_notification_channel_status status =
1065 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1066
1067 /* Remove the condition from the client's condition list. */
1068 cds_list_for_each_entry_safe(condition_list_element, condition_tmp,
1069 &client->condition_list, node) {
1070 if (!lttng_condition_is_equal(condition_list_element->condition,
1071 condition)) {
1072 continue;
1073 }
1074
1075 cds_list_del(&condition_list_element->node);
1076 /*
1077 * The caller may be iterating on the client's conditions to
1078 * tear down a client's connection. In this case, the condition
1079 * will be destroyed at the end.
1080 */
1081 if (condition != condition_list_element->condition) {
1082 lttng_condition_destroy(
1083 condition_list_element->condition);
1084 }
1085 free(condition_list_element);
1086 condition_found = true;
1087 break;
1088 }
1089
1090 if (!condition_found) {
1091 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION;
1092 goto end;
1093 }
1094
1095 /*
1096 * Remove the client from the list of clients interested the trigger
1097 * matching the condition.
1098 */
1099 client_list = get_client_list_from_condition(state, condition);
1100 if (!client_list) {
1101 goto end;
1102 }
1103
1104 pthread_mutex_lock(&client_list->lock);
1105 cds_list_for_each_entry_safe(client_list_element, client_tmp,
1106 &client_list->list, node) {
1107 if (client_list_element->client->id != client->id) {
1108 continue;
1109 }
1110 cds_list_del(&client_list_element->node);
1111 free(client_list_element);
1112 break;
1113 }
1114 pthread_mutex_unlock(&client_list->lock);
1115 notification_client_list_put(client_list);
1116 client_list = NULL;
1117 end:
1118 lttng_condition_destroy(condition);
1119 if (_status) {
1120 *_status = status;
1121 }
1122 return 0;
1123 }
1124
1125 static
1126 void free_notification_client_rcu(struct rcu_head *node)
1127 {
1128 free(caa_container_of(node, struct notification_client, rcu_node));
1129 }
1130
1131 static
1132 void notification_client_destroy(struct notification_client *client,
1133 struct notification_thread_state *state)
1134 {
1135 if (!client) {
1136 return;
1137 }
1138
1139 /*
1140 * The client object is not reachable by other threads, no need to lock
1141 * the client here.
1142 */
1143 if (client->socket >= 0) {
1144 (void) lttcomm_close_unix_sock(client->socket);
1145 client->socket = -1;
1146 }
1147 client->communication.active = false;
1148 lttng_payload_reset(&client->communication.inbound.payload);
1149 lttng_payload_reset(&client->communication.outbound.payload);
1150 pthread_mutex_destroy(&client->lock);
1151 call_rcu(&client->rcu_node, free_notification_client_rcu);
1152 }
1153
1154 /*
1155 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1156 * client pointer).
1157 */
1158 static
1159 struct notification_client *get_client_from_socket(int socket,
1160 struct notification_thread_state *state)
1161 {
1162 struct cds_lfht_iter iter;
1163 struct cds_lfht_node *node;
1164 struct notification_client *client = NULL;
1165
1166 cds_lfht_lookup(state->client_socket_ht,
1167 hash_client_socket(socket),
1168 match_client_socket,
1169 (void *) (unsigned long) socket,
1170 &iter);
1171 node = cds_lfht_iter_get_node(&iter);
1172 if (!node) {
1173 goto end;
1174 }
1175
1176 client = caa_container_of(node, struct notification_client,
1177 client_socket_ht_node);
1178 end:
1179 return client;
1180 }
1181
1182 /*
1183 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1184 * client pointer).
1185 */
1186 static
1187 struct notification_client *get_client_from_id(notification_client_id id,
1188 struct notification_thread_state *state)
1189 {
1190 struct cds_lfht_iter iter;
1191 struct cds_lfht_node *node;
1192 struct notification_client *client = NULL;
1193
1194 cds_lfht_lookup(state->client_id_ht,
1195 hash_client_id(id),
1196 match_client_id,
1197 &id,
1198 &iter);
1199 node = cds_lfht_iter_get_node(&iter);
1200 if (!node) {
1201 goto end;
1202 }
1203
1204 client = caa_container_of(node, struct notification_client,
1205 client_id_ht_node);
1206 end:
1207 return client;
1208 }
1209
1210 static
1211 bool buffer_usage_condition_applies_to_channel(
1212 const struct lttng_condition *condition,
1213 const struct channel_info *channel_info)
1214 {
1215 enum lttng_condition_status status;
1216 enum lttng_domain_type condition_domain;
1217 const char *condition_session_name = NULL;
1218 const char *condition_channel_name = NULL;
1219
1220 status = lttng_condition_buffer_usage_get_domain_type(condition,
1221 &condition_domain);
1222 assert(status == LTTNG_CONDITION_STATUS_OK);
1223 if (channel_info->key.domain != condition_domain) {
1224 goto fail;
1225 }
1226
1227 status = lttng_condition_buffer_usage_get_session_name(
1228 condition, &condition_session_name);
1229 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1230
1231 status = lttng_condition_buffer_usage_get_channel_name(
1232 condition, &condition_channel_name);
1233 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_channel_name);
1234
1235 if (strcmp(channel_info->session_info->name, condition_session_name)) {
1236 goto fail;
1237 }
1238 if (strcmp(channel_info->name, condition_channel_name)) {
1239 goto fail;
1240 }
1241
1242 return true;
1243 fail:
1244 return false;
1245 }
1246
1247 static
1248 bool session_consumed_size_condition_applies_to_channel(
1249 const struct lttng_condition *condition,
1250 const struct channel_info *channel_info)
1251 {
1252 enum lttng_condition_status status;
1253 const char *condition_session_name = NULL;
1254
1255 status = lttng_condition_session_consumed_size_get_session_name(
1256 condition, &condition_session_name);
1257 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1258
1259 if (strcmp(channel_info->session_info->name, condition_session_name)) {
1260 goto fail;
1261 }
1262
1263 return true;
1264 fail:
1265 return false;
1266 }
1267
1268 static
1269 bool trigger_applies_to_channel(const struct lttng_trigger *trigger,
1270 const struct channel_info *channel_info)
1271 {
1272 const struct lttng_condition *condition;
1273 bool trigger_applies;
1274
1275 condition = lttng_trigger_get_const_condition(trigger);
1276 if (!condition) {
1277 goto fail;
1278 }
1279
1280 switch (lttng_condition_get_type(condition)) {
1281 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1282 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1283 trigger_applies = buffer_usage_condition_applies_to_channel(
1284 condition, channel_info);
1285 break;
1286 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
1287 trigger_applies = session_consumed_size_condition_applies_to_channel(
1288 condition, channel_info);
1289 break;
1290 default:
1291 goto fail;
1292 }
1293
1294 return trigger_applies;
1295 fail:
1296 return false;
1297 }
1298
1299 static
1300 bool trigger_applies_to_client(struct lttng_trigger *trigger,
1301 struct notification_client *client)
1302 {
1303 bool applies = false;
1304 struct lttng_condition_list_element *condition_list_element;
1305
1306 cds_list_for_each_entry(condition_list_element, &client->condition_list,
1307 node) {
1308 applies = lttng_condition_is_equal(
1309 condition_list_element->condition,
1310 lttng_trigger_get_condition(trigger));
1311 if (applies) {
1312 break;
1313 }
1314 }
1315 return applies;
1316 }
1317
1318 /* Must be called with RCU read lock held. */
1319 static
1320 struct lttng_session_trigger_list *get_session_trigger_list(
1321 struct notification_thread_state *state,
1322 const char *session_name)
1323 {
1324 struct lttng_session_trigger_list *list = NULL;
1325 struct cds_lfht_node *node;
1326 struct cds_lfht_iter iter;
1327
1328 cds_lfht_lookup(state->session_triggers_ht,
1329 hash_key_str(session_name, lttng_ht_seed),
1330 match_session_trigger_list,
1331 session_name,
1332 &iter);
1333 node = cds_lfht_iter_get_node(&iter);
1334 if (!node) {
1335 /*
1336 * Not an error, the list of triggers applying to that session
1337 * will be initialized when the session is created.
1338 */
1339 DBG("[notification-thread] No trigger list found for session \"%s\" as it is not yet known to the notification system",
1340 session_name);
1341 goto end;
1342 }
1343
1344 list = caa_container_of(node,
1345 struct lttng_session_trigger_list,
1346 session_triggers_ht_node);
1347 end:
1348 return list;
1349 }
1350
1351 /*
1352 * Allocate an empty lttng_session_trigger_list for the session named
1353 * 'session_name'.
1354 *
1355 * No ownership of 'session_name' is assumed by the session trigger list.
1356 * It is the caller's responsability to ensure the session name is alive
1357 * for as long as this list is.
1358 */
1359 static
1360 struct lttng_session_trigger_list *lttng_session_trigger_list_create(
1361 const char *session_name,
1362 struct cds_lfht *session_triggers_ht)
1363 {
1364 struct lttng_session_trigger_list *list;
1365
1366 list = zmalloc(sizeof(*list));
1367 if (!list) {
1368 goto end;
1369 }
1370 list->session_name = session_name;
1371 CDS_INIT_LIST_HEAD(&list->list);
1372 cds_lfht_node_init(&list->session_triggers_ht_node);
1373 list->session_triggers_ht = session_triggers_ht;
1374
1375 rcu_read_lock();
1376 /* Publish the list through the session_triggers_ht. */
1377 cds_lfht_add(session_triggers_ht,
1378 hash_key_str(session_name, lttng_ht_seed),
1379 &list->session_triggers_ht_node);
1380 rcu_read_unlock();
1381 end:
1382 return list;
1383 }
1384
1385 static
1386 void free_session_trigger_list_rcu(struct rcu_head *node)
1387 {
1388 free(caa_container_of(node, struct lttng_session_trigger_list,
1389 rcu_node));
1390 }
1391
1392 static
1393 void lttng_session_trigger_list_destroy(struct lttng_session_trigger_list *list)
1394 {
1395 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1396
1397 /* Empty the list element by element, and then free the list itself. */
1398 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1399 &list->list, node) {
1400 cds_list_del(&trigger_list_element->node);
1401 free(trigger_list_element);
1402 }
1403 rcu_read_lock();
1404 /* Unpublish the list from the session_triggers_ht. */
1405 cds_lfht_del(list->session_triggers_ht,
1406 &list->session_triggers_ht_node);
1407 rcu_read_unlock();
1408 call_rcu(&list->rcu_node, free_session_trigger_list_rcu);
1409 }
1410
1411 static
1412 int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
1413 struct lttng_trigger *trigger)
1414 {
1415 int ret = 0;
1416 struct lttng_trigger_list_element *new_element =
1417 zmalloc(sizeof(*new_element));
1418
1419 if (!new_element) {
1420 ret = -1;
1421 goto end;
1422 }
1423 CDS_INIT_LIST_HEAD(&new_element->node);
1424 new_element->trigger = trigger;
1425 cds_list_add(&new_element->node, &list->list);
1426 end:
1427 return ret;
1428 }
1429
1430 static
1431 bool trigger_applies_to_session(const struct lttng_trigger *trigger,
1432 const char *session_name)
1433 {
1434 bool applies = false;
1435 const struct lttng_condition *condition;
1436
1437 condition = lttng_trigger_get_const_condition(trigger);
1438 switch (lttng_condition_get_type(condition)) {
1439 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1440 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
1441 {
1442 enum lttng_condition_status condition_status;
1443 const char *condition_session_name;
1444
1445 condition_status = lttng_condition_session_rotation_get_session_name(
1446 condition, &condition_session_name);
1447 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
1448 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
1449 goto end;
1450 }
1451
1452 assert(condition_session_name);
1453 applies = !strcmp(condition_session_name, session_name);
1454 break;
1455 }
1456 default:
1457 goto end;
1458 }
1459 end:
1460 return applies;
1461 }
1462
1463 /*
1464 * Allocate and initialize an lttng_session_trigger_list which contains
1465 * all triggers that apply to the session named 'session_name'.
1466 *
1467 * No ownership of 'session_name' is assumed by the session trigger list.
1468 * It is the caller's responsability to ensure the session name is alive
1469 * for as long as this list is.
1470 */
1471 static
1472 struct lttng_session_trigger_list *lttng_session_trigger_list_build(
1473 const struct notification_thread_state *state,
1474 const char *session_name)
1475 {
1476 int trigger_count = 0;
1477 struct lttng_session_trigger_list *session_trigger_list = NULL;
1478 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1479 struct cds_lfht_iter iter;
1480
1481 session_trigger_list = lttng_session_trigger_list_create(session_name,
1482 state->session_triggers_ht);
1483
1484 /* Add all triggers applying to the session named 'session_name'. */
1485 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1486 node) {
1487 int ret;
1488
1489 if (!trigger_applies_to_session(trigger_ht_element->trigger,
1490 session_name)) {
1491 continue;
1492 }
1493
1494 ret = lttng_session_trigger_list_add(session_trigger_list,
1495 trigger_ht_element->trigger);
1496 if (ret) {
1497 goto error;
1498 }
1499
1500 trigger_count++;
1501 }
1502
1503 DBG("[notification-thread] Found %i triggers that apply to newly created session",
1504 trigger_count);
1505 return session_trigger_list;
1506 error:
1507 lttng_session_trigger_list_destroy(session_trigger_list);
1508 return NULL;
1509 }
1510
1511 static
1512 struct session_info *find_or_create_session_info(
1513 struct notification_thread_state *state,
1514 const char *name, uid_t uid, gid_t gid)
1515 {
1516 struct session_info *session = NULL;
1517 struct cds_lfht_node *node;
1518 struct cds_lfht_iter iter;
1519 struct lttng_session_trigger_list *trigger_list;
1520
1521 rcu_read_lock();
1522 cds_lfht_lookup(state->sessions_ht,
1523 hash_key_str(name, lttng_ht_seed),
1524 match_session,
1525 name,
1526 &iter);
1527 node = cds_lfht_iter_get_node(&iter);
1528 if (node) {
1529 DBG("[notification-thread] Found session info of session \"%s\" (uid = %i, gid = %i)",
1530 name, uid, gid);
1531 session = caa_container_of(node, struct session_info,
1532 sessions_ht_node);
1533 assert(session->uid == uid);
1534 assert(session->gid == gid);
1535 session_info_get(session);
1536 goto end;
1537 }
1538
1539 trigger_list = lttng_session_trigger_list_build(state, name);
1540 if (!trigger_list) {
1541 goto error;
1542 }
1543
1544 session = session_info_create(name, uid, gid, trigger_list,
1545 state->sessions_ht);
1546 if (!session) {
1547 ERR("[notification-thread] Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)",
1548 name, uid, gid);
1549 lttng_session_trigger_list_destroy(trigger_list);
1550 goto error;
1551 }
1552 trigger_list = NULL;
1553
1554 cds_lfht_add(state->sessions_ht, hash_key_str(name, lttng_ht_seed),
1555 &session->sessions_ht_node);
1556 end:
1557 rcu_read_unlock();
1558 return session;
1559 error:
1560 rcu_read_unlock();
1561 session_info_put(session);
1562 return NULL;
1563 }
1564
1565 static
1566 int handle_notification_thread_command_add_channel(
1567 struct notification_thread_state *state,
1568 const char *session_name, uid_t session_uid, gid_t session_gid,
1569 const char *channel_name, enum lttng_domain_type channel_domain,
1570 uint64_t channel_key_int, uint64_t channel_capacity,
1571 enum lttng_error_code *cmd_result)
1572 {
1573 struct cds_list_head trigger_list;
1574 struct channel_info *new_channel_info = NULL;
1575 struct channel_key channel_key = {
1576 .key = channel_key_int,
1577 .domain = channel_domain,
1578 };
1579 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
1580 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1581 int trigger_count = 0;
1582 struct cds_lfht_iter iter;
1583 struct session_info *session_info = NULL;
1584
1585 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
1586 channel_name, session_name, channel_key_int,
1587 lttng_domain_type_str(channel_domain));
1588
1589 CDS_INIT_LIST_HEAD(&trigger_list);
1590
1591 session_info = find_or_create_session_info(state, session_name,
1592 session_uid, session_gid);
1593 if (!session_info) {
1594 /* Allocation error or an internal error occurred. */
1595 goto error;
1596 }
1597
1598 new_channel_info = channel_info_create(channel_name, &channel_key,
1599 channel_capacity, session_info);
1600 if (!new_channel_info) {
1601 goto error;
1602 }
1603
1604 rcu_read_lock();
1605 /* Build a list of all triggers applying to the new channel. */
1606 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1607 node) {
1608 struct lttng_trigger_list_element *new_element;
1609
1610 if (!trigger_applies_to_channel(trigger_ht_element->trigger,
1611 new_channel_info)) {
1612 continue;
1613 }
1614
1615 new_element = zmalloc(sizeof(*new_element));
1616 if (!new_element) {
1617 rcu_read_unlock();
1618 goto error;
1619 }
1620 CDS_INIT_LIST_HEAD(&new_element->node);
1621 new_element->trigger = trigger_ht_element->trigger;
1622 cds_list_add(&new_element->node, &trigger_list);
1623 trigger_count++;
1624 }
1625 rcu_read_unlock();
1626
1627 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
1628 trigger_count);
1629 channel_trigger_list = zmalloc(sizeof(*channel_trigger_list));
1630 if (!channel_trigger_list) {
1631 goto error;
1632 }
1633 channel_trigger_list->channel_key = new_channel_info->key;
1634 CDS_INIT_LIST_HEAD(&channel_trigger_list->list);
1635 cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
1636 cds_list_splice(&trigger_list, &channel_trigger_list->list);
1637
1638 rcu_read_lock();
1639 /* Add channel to the channel_ht which owns the channel_infos. */
1640 cds_lfht_add(state->channels_ht,
1641 hash_channel_key(&new_channel_info->key),
1642 &new_channel_info->channels_ht_node);
1643 /*
1644 * Add the list of triggers associated with this channel to the
1645 * channel_triggers_ht.
1646 */
1647 cds_lfht_add(state->channel_triggers_ht,
1648 hash_channel_key(&new_channel_info->key),
1649 &channel_trigger_list->channel_triggers_ht_node);
1650 rcu_read_unlock();
1651 session_info_put(session_info);
1652 *cmd_result = LTTNG_OK;
1653 return 0;
1654 error:
1655 channel_info_destroy(new_channel_info);
1656 session_info_put(session_info);
1657 return 1;
1658 }
1659
1660 static
1661 void free_channel_trigger_list_rcu(struct rcu_head *node)
1662 {
1663 free(caa_container_of(node, struct lttng_channel_trigger_list,
1664 rcu_node));
1665 }
1666
1667 static
1668 void free_channel_state_sample_rcu(struct rcu_head *node)
1669 {
1670 free(caa_container_of(node, struct channel_state_sample,
1671 rcu_node));
1672 }
1673
1674 static
1675 int handle_notification_thread_command_remove_channel(
1676 struct notification_thread_state *state,
1677 uint64_t channel_key, enum lttng_domain_type domain,
1678 enum lttng_error_code *cmd_result)
1679 {
1680 struct cds_lfht_node *node;
1681 struct cds_lfht_iter iter;
1682 struct lttng_channel_trigger_list *trigger_list;
1683 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1684 struct channel_key key = { .key = channel_key, .domain = domain };
1685 struct channel_info *channel_info;
1686
1687 DBG("[notification-thread] Removing channel key = %" PRIu64 " in %s domain",
1688 channel_key, lttng_domain_type_str(domain));
1689
1690 rcu_read_lock();
1691
1692 cds_lfht_lookup(state->channel_triggers_ht,
1693 hash_channel_key(&key),
1694 match_channel_trigger_list,
1695 &key,
1696 &iter);
1697 node = cds_lfht_iter_get_node(&iter);
1698 /*
1699 * There is a severe internal error if we are being asked to remove a
1700 * channel that doesn't exist.
1701 */
1702 if (!node) {
1703 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
1704 goto end;
1705 }
1706
1707 /* Free the list of triggers associated with this channel. */
1708 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
1709 channel_triggers_ht_node);
1710 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1711 &trigger_list->list, node) {
1712 cds_list_del(&trigger_list_element->node);
1713 free(trigger_list_element);
1714 }
1715 cds_lfht_del(state->channel_triggers_ht, node);
1716 call_rcu(&trigger_list->rcu_node, free_channel_trigger_list_rcu);
1717
1718 /* Free sampled channel state. */
1719 cds_lfht_lookup(state->channel_state_ht,
1720 hash_channel_key(&key),
1721 match_channel_state_sample,
1722 &key,
1723 &iter);
1724 node = cds_lfht_iter_get_node(&iter);
1725 /*
1726 * This is expected to be NULL if the channel is destroyed before we
1727 * received a sample.
1728 */
1729 if (node) {
1730 struct channel_state_sample *sample = caa_container_of(node,
1731 struct channel_state_sample,
1732 channel_state_ht_node);
1733
1734 cds_lfht_del(state->channel_state_ht, node);
1735 call_rcu(&sample->rcu_node, free_channel_state_sample_rcu);
1736 }
1737
1738 /* Remove the channel from the channels_ht and free it. */
1739 cds_lfht_lookup(state->channels_ht,
1740 hash_channel_key(&key),
1741 match_channel_info,
1742 &key,
1743 &iter);
1744 node = cds_lfht_iter_get_node(&iter);
1745 assert(node);
1746 channel_info = caa_container_of(node, struct channel_info,
1747 channels_ht_node);
1748 cds_lfht_del(state->channels_ht, node);
1749 channel_info_destroy(channel_info);
1750 end:
1751 rcu_read_unlock();
1752 *cmd_result = LTTNG_OK;
1753 return 0;
1754 }
1755
1756 static
1757 int handle_notification_thread_command_session_rotation(
1758 struct notification_thread_state *state,
1759 enum notification_thread_command_type cmd_type,
1760 const char *session_name, uid_t session_uid, gid_t session_gid,
1761 uint64_t trace_archive_chunk_id,
1762 struct lttng_trace_archive_location *location,
1763 enum lttng_error_code *_cmd_result)
1764 {
1765 int ret = 0;
1766 enum lttng_error_code cmd_result = LTTNG_OK;
1767 struct lttng_session_trigger_list *trigger_list;
1768 struct lttng_trigger_list_element *trigger_list_element;
1769 struct session_info *session_info;
1770 const struct lttng_credentials session_creds = {
1771 .uid = LTTNG_OPTIONAL_INIT_VALUE(session_uid),
1772 .gid = LTTNG_OPTIONAL_INIT_VALUE(session_gid),
1773 };
1774
1775 rcu_read_lock();
1776
1777 session_info = find_or_create_session_info(state, session_name,
1778 session_uid, session_gid);
1779 if (!session_info) {
1780 /* Allocation error or an internal error occurred. */
1781 ret = -1;
1782 cmd_result = LTTNG_ERR_NOMEM;
1783 goto end;
1784 }
1785
1786 session_info->rotation.ongoing =
1787 cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING;
1788 session_info->rotation.id = trace_archive_chunk_id;
1789 trigger_list = get_session_trigger_list(state, session_name);
1790 if (!trigger_list) {
1791 DBG("[notification-thread] No triggers applying to session \"%s\" found",
1792 session_name);
1793 goto end;
1794 }
1795
1796 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
1797 node) {
1798 const struct lttng_condition *condition;
1799 struct lttng_trigger *trigger;
1800 struct notification_client_list *client_list;
1801 struct lttng_evaluation *evaluation = NULL;
1802 enum lttng_condition_type condition_type;
1803 enum action_executor_status executor_status;
1804
1805 trigger = trigger_list_element->trigger;
1806 condition = lttng_trigger_get_const_condition(trigger);
1807 assert(condition);
1808 condition_type = lttng_condition_get_type(condition);
1809
1810 if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING &&
1811 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1812 continue;
1813 } else if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED &&
1814 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED) {
1815 continue;
1816 }
1817
1818 client_list = get_client_list_from_condition(state, condition);
1819 if (cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1820 evaluation = lttng_evaluation_session_rotation_ongoing_create(
1821 trace_archive_chunk_id);
1822 } else {
1823 evaluation = lttng_evaluation_session_rotation_completed_create(
1824 trace_archive_chunk_id, location);
1825 }
1826
1827 if (!evaluation) {
1828 /* Internal error */
1829 ret = -1;
1830 cmd_result = LTTNG_ERR_UNK;
1831 goto put_list;
1832 }
1833
1834 /*
1835 * Ownership of `evaluation` transferred to the action executor
1836 * no matter the result.
1837 */
1838 executor_status = action_executor_enqueue(state->executor,
1839 trigger, evaluation, &session_creds,
1840 client_list);
1841 evaluation = NULL;
1842 switch (executor_status) {
1843 case ACTION_EXECUTOR_STATUS_OK:
1844 break;
1845 case ACTION_EXECUTOR_STATUS_ERROR:
1846 case ACTION_EXECUTOR_STATUS_INVALID:
1847 /*
1848 * TODO Add trigger identification (name/id) when
1849 * it is added to the API.
1850 */
1851 ERR("Fatal error occurred while enqueuing action associated with session rotation trigger");
1852 ret = -1;
1853 goto put_list;
1854 case ACTION_EXECUTOR_STATUS_OVERFLOW:
1855 /*
1856 * TODO Add trigger identification (name/id) when
1857 * it is added to the API.
1858 *
1859 * Not a fatal error.
1860 */
1861 WARN("No space left when enqueuing action associated with session rotation trigger");
1862 ret = 0;
1863 goto put_list;
1864 default:
1865 abort();
1866 }
1867
1868 put_list:
1869 notification_client_list_put(client_list);
1870 if (caa_unlikely(ret)) {
1871 break;
1872 }
1873 }
1874 end:
1875 session_info_put(session_info);
1876 *_cmd_result = cmd_result;
1877 rcu_read_unlock();
1878 return ret;
1879 }
1880
1881 static
1882 int handle_notification_thread_command_add_application(
1883 struct notification_thread_handle *handle,
1884 struct notification_thread_state *state,
1885 int read_side_trigger_event_application_pipe,
1886 enum lttng_domain_type domain_type,
1887 enum lttng_error_code *_cmd_result)
1888 {
1889 int ret = 0;
1890 enum lttng_error_code cmd_result = LTTNG_OK;
1891 struct notification_event_trigger_source_element *element = NULL;
1892
1893 element = zmalloc(sizeof(*element));
1894 if (!element) {
1895 cmd_result = LTTNG_ERR_NOMEM;
1896 ret = -1;
1897 goto end;
1898 }
1899
1900 CDS_INIT_LIST_HEAD(&element->node);
1901 element->fd = read_side_trigger_event_application_pipe;
1902 element->domain = domain_type;
1903
1904 pthread_mutex_lock(&handle->event_trigger_sources.lock);
1905 cds_list_add(&element->node, &handle->event_trigger_sources.list);
1906 pthread_mutex_unlock(&handle->event_trigger_sources.lock);
1907
1908 /* TODO: remove on failure to add to list? */
1909
1910 /* Adding the read side pipe to the event poll */
1911 ret = lttng_poll_add(&state->events,
1912 read_side_trigger_event_application_pipe,
1913 LPOLLIN | LPOLLERR);
1914
1915 DBG3("[notification-thread] Adding application event source from fd: %d", read_side_trigger_event_application_pipe);
1916 if (ret < 0) {
1917 /* TODO: what should be the value of cmd_result??? */
1918 ERR("[notification-thread] Failed to add event source pipe fd to pollset");
1919 goto end;
1920 }
1921
1922 end:
1923 *_cmd_result = cmd_result;
1924 return ret;
1925 }
1926
1927 static
1928 int handle_notification_thread_command_remove_application(
1929 struct notification_thread_handle *handle,
1930 struct notification_thread_state *state,
1931 int read_side_trigger_event_application_pipe,
1932 enum lttng_error_code *_cmd_result)
1933 {
1934 int ret = 0;
1935 enum lttng_error_code cmd_result = LTTNG_OK;
1936 /* Used for logging */
1937 enum lttng_domain_type domain = LTTNG_DOMAIN_NONE;
1938
1939 /* TODO: missing a lock propably to revisit */
1940 struct notification_event_trigger_source_element *source_element = NULL, *tmp;
1941
1942 cds_list_for_each_entry_safe(source_element, tmp,
1943 &handle->event_trigger_sources.list, node) {
1944 if (source_element->fd != read_side_trigger_event_application_pipe) {
1945 continue;
1946 }
1947
1948 cds_list_del(&source_element->node);
1949 break;
1950 }
1951
1952 /* It should always be found */
1953 assert(source_element);
1954
1955 DBG3("[notification-thread] Removing application event source from fd: %d of domain: %s",
1956 read_side_trigger_event_application_pipe,
1957 lttng_domain_type_str(domain));
1958 free(source_element);
1959
1960 /* Removing the read side pipe to the event poll */
1961 ret = lttng_poll_del(&state->events,
1962 read_side_trigger_event_application_pipe);
1963 if (ret < 0) {
1964 /* TODO: what should be the value of cmd_result??? */
1965 ERR("[notification-thread] Failed to remove event source pipe fd from pollset");
1966 goto end;
1967 }
1968
1969 end:
1970 *_cmd_result = cmd_result;
1971 return ret;
1972 }
1973
1974 static int handle_notification_thread_command_get_tokens(
1975 struct notification_thread_handle *handle,
1976 struct notification_thread_state *state,
1977 struct lttng_triggers **triggers,
1978 enum lttng_error_code *_cmd_result)
1979 {
1980 int ret = 0, i = 0;
1981 enum lttng_error_code cmd_result = LTTNG_OK;
1982 struct cds_lfht_iter iter;
1983 struct notification_trigger_tokens_ht_element *element;
1984 struct lttng_triggers *local_triggers = NULL;
1985
1986 local_triggers = lttng_triggers_create();
1987 if (!local_triggers) {
1988 cmd_result = LTTNG_ERR_NOMEM;
1989 goto end;
1990 }
1991
1992 rcu_read_lock();
1993 cds_lfht_for_each_entry (
1994 state->trigger_tokens_ht, &iter, element, node) {
1995 ret = lttng_triggers_add(local_triggers, element->trigger);
1996 if (ret < 0) {
1997 cmd_result = LTTNG_ERR_FATAL;
1998 ret = -1;
1999 goto end;
2000 }
2001
2002 i++;
2003 }
2004
2005 /* Passing ownership up */
2006 *triggers = local_triggers;
2007 local_triggers = NULL;
2008
2009 end:
2010 rcu_read_unlock();
2011 lttng_triggers_destroy(local_triggers);
2012 *_cmd_result = cmd_result;
2013 return ret;
2014 }
2015
2016 static
2017 int trigger_update_error_count(struct lttng_trigger *trigger)
2018 {
2019 int ret = 0;
2020 uint64_t error_count = 0;
2021 enum trigger_error_accounting_status status;
2022
2023 status = trigger_error_accounting_get_count(trigger, &error_count);
2024 if (status != TRIGGER_ERROR_ACCOUNTING_STATUS_OK) {
2025 ERR("Error getting trigger error count");
2026 }
2027
2028 lttng_trigger_set_error_count(trigger, error_count);
2029 return ret;
2030 }
2031
2032 static int handle_notification_thread_command_list_triggers(
2033 struct notification_thread_handle *handle,
2034 struct notification_thread_state *state,
2035 uid_t uid,
2036 struct lttng_triggers **triggers,
2037 enum lttng_error_code *_cmd_result)
2038 {
2039 int ret = 0, i = 0;
2040 enum lttng_error_code cmd_result = LTTNG_OK;
2041 struct cds_lfht_iter iter;
2042 struct lttng_trigger_ht_element *trigger_ht_element;
2043 struct lttng_triggers *local_triggers = NULL;
2044 const struct lttng_credentials *creds;
2045
2046 long scb, sca;
2047 unsigned long count;
2048
2049 rcu_read_lock();
2050 cds_lfht_count_nodes(state->triggers_ht, &scb, &count, &sca);
2051
2052 local_triggers = lttng_triggers_create();
2053 if (!local_triggers) {
2054 cmd_result = LTTNG_ERR_NOMEM;
2055 goto end;
2056 }
2057
2058 cds_lfht_for_each_entry (state->triggers_ht, &iter,
2059 trigger_ht_element, node) {
2060 /*
2061 * Only return the trigger for which the requestion client have
2062 * access.
2063 * Root user have visibility over all triggers.
2064 */
2065 creds = lttng_trigger_get_credentials(trigger_ht_element->trigger);
2066 if (uid != lttng_credentials_get_uid(creds) && uid != 0) {
2067 continue;
2068 }
2069
2070 ret = trigger_update_error_count(trigger_ht_element->trigger);
2071 assert(!ret);
2072
2073 ret = lttng_triggers_add(local_triggers, trigger_ht_element->trigger);
2074 if (ret < 0) {
2075 ret = -1;
2076 goto end;
2077 }
2078
2079 i++;
2080 }
2081
2082 /* Passing ownership up */
2083 *triggers = local_triggers;
2084 local_triggers = NULL;
2085
2086 end:
2087 rcu_read_unlock();
2088 lttng_triggers_destroy(local_triggers);
2089 *_cmd_result = cmd_result;
2090 return ret;
2091 }
2092
2093 static
2094 int condition_is_supported(struct lttng_condition *condition)
2095 {
2096 int ret;
2097
2098 switch (lttng_condition_get_type(condition)) {
2099 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
2100 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
2101 {
2102 enum lttng_domain_type domain;
2103
2104 ret = lttng_condition_buffer_usage_get_domain_type(condition,
2105 &domain);
2106 if (ret) {
2107 ret = -1;
2108 goto end;
2109 }
2110
2111 if (domain != LTTNG_DOMAIN_KERNEL) {
2112 ret = 1;
2113 goto end;
2114 }
2115
2116 /*
2117 * Older kernel tracers don't expose the API to monitor their
2118 * buffers. Therefore, we reject triggers that require that
2119 * mechanism to be available to be evaluated.
2120 */
2121 ret = kernel_supports_ring_buffer_snapshot_sample_positions();
2122 break;
2123 }
2124 case LTTNG_CONDITION_TYPE_EVENT_RULE_HIT:
2125 {
2126 /* TODO:
2127 * Check for kernel support.
2128 * Check for ust support ??
2129 */
2130 ret = 1;
2131 break;
2132 }
2133 default:
2134 ret = 1;
2135 }
2136 end:
2137 return ret;
2138 }
2139
2140 static
2141 int action_is_supported(struct lttng_action *action)
2142 {
2143 int ret;
2144
2145 switch (lttng_action_get_type(action)) {
2146 case LTTNG_ACTION_TYPE_NOTIFY:
2147 case LTTNG_ACTION_TYPE_START_SESSION:
2148 case LTTNG_ACTION_TYPE_STOP_SESSION:
2149 case LTTNG_ACTION_TYPE_ROTATE_SESSION:
2150 case LTTNG_ACTION_TYPE_SNAPSHOT_SESSION:
2151 {
2152 /* TODO validate that this is true for kernel in regards to
2153 * rotation and snapshot. Start stop is not a problem notify
2154 * either.
2155 */
2156 /* For now all type of actions are supported */
2157 ret = 1;
2158 break;
2159 }
2160 case LTTNG_ACTION_TYPE_GROUP:
2161 {
2162 /* TODO: Iterate over all internal actions and validate that
2163 * they are supported
2164 */
2165 ret = 1;
2166 break;
2167
2168 }
2169 default:
2170 ret = 1;
2171 }
2172
2173 return ret;
2174 }
2175
2176 /* Must be called with RCU read lock held. */
2177 static
2178 int bind_trigger_to_matching_session(struct lttng_trigger *trigger,
2179 struct notification_thread_state *state)
2180 {
2181 int ret = 0;
2182 const struct lttng_condition *condition;
2183 const char *session_name;
2184 struct lttng_session_trigger_list *trigger_list;
2185
2186 condition = lttng_trigger_get_const_condition(trigger);
2187 switch (lttng_condition_get_type(condition)) {
2188 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
2189 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
2190 {
2191 enum lttng_condition_status status;
2192
2193 status = lttng_condition_session_rotation_get_session_name(
2194 condition, &session_name);
2195 if (status != LTTNG_CONDITION_STATUS_OK) {
2196 ERR("[notification-thread] Failed to bind trigger to session: unable to get 'session_rotation' condition's session name");
2197 ret = -1;
2198 goto end;
2199 }
2200 break;
2201 }
2202 default:
2203 ret = -1;
2204 goto end;
2205 }
2206
2207 trigger_list = get_session_trigger_list(state, session_name);
2208 if (!trigger_list) {
2209 DBG("[notification-thread] Unable to bind trigger applying to session \"%s\" as it is not yet known to the notification system",
2210 session_name);
2211 goto end;
2212
2213 }
2214
2215 DBG("[notification-thread] Newly registered trigger bound to session \"%s\"",
2216 session_name);
2217 ret = lttng_session_trigger_list_add(trigger_list, trigger);
2218 end:
2219 return ret;
2220 }
2221
2222 /* Must be called with RCU read lock held. */
2223 static
2224 int bind_trigger_to_matching_channels(struct lttng_trigger *trigger,
2225 struct notification_thread_state *state)
2226 {
2227 int ret = 0;
2228 struct cds_lfht_node *node;
2229 struct cds_lfht_iter iter;
2230 struct channel_info *channel;
2231
2232 cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
2233 channels_ht_node) {
2234 struct lttng_trigger_list_element *trigger_list_element;
2235 struct lttng_channel_trigger_list *trigger_list;
2236 struct cds_lfht_iter lookup_iter;
2237
2238 if (!trigger_applies_to_channel(trigger, channel)) {
2239 continue;
2240 }
2241
2242 cds_lfht_lookup(state->channel_triggers_ht,
2243 hash_channel_key(&channel->key),
2244 match_channel_trigger_list,
2245 &channel->key,
2246 &lookup_iter);
2247 node = cds_lfht_iter_get_node(&lookup_iter);
2248 assert(node);
2249 trigger_list = caa_container_of(node,
2250 struct lttng_channel_trigger_list,
2251 channel_triggers_ht_node);
2252
2253 trigger_list_element = zmalloc(sizeof(*trigger_list_element));
2254 if (!trigger_list_element) {
2255 ret = -1;
2256 goto end;
2257 }
2258 CDS_INIT_LIST_HEAD(&trigger_list_element->node);
2259 trigger_list_element->trigger = trigger;
2260 cds_list_add(&trigger_list_element->node, &trigger_list->list);
2261 DBG("[notification-thread] Newly registered trigger bound to channel \"%s\"",
2262 channel->name);
2263 }
2264 end:
2265 return ret;
2266 }
2267
2268 static
2269 bool is_trigger_action_notify(const struct lttng_trigger *trigger)
2270 {
2271 bool is_notify = false;
2272 unsigned int i, count;
2273 enum lttng_action_status action_status;
2274 const struct lttng_action *action =
2275 lttng_trigger_get_const_action(trigger);
2276 enum lttng_action_type action_type;
2277
2278 assert(action);
2279 action_type = lttng_action_get_type(action);
2280 if (action_type == LTTNG_ACTION_TYPE_NOTIFY) {
2281 is_notify = true;
2282 goto end;
2283 } else if (action_type != LTTNG_ACTION_TYPE_GROUP) {
2284 goto end;
2285 }
2286
2287 action_status = lttng_action_group_get_count(action, &count);
2288 assert(action_status == LTTNG_ACTION_STATUS_OK);
2289
2290 for (i = 0; i < count; i++) {
2291 const struct lttng_action *inner_action =
2292 lttng_action_group_get_at_index(
2293 action, i);
2294
2295 action_type = lttng_action_get_type(inner_action);
2296 if (action_type == LTTNG_ACTION_TYPE_NOTIFY) {
2297 is_notify = true;
2298 goto end;
2299 }
2300 }
2301
2302 end:
2303 return is_notify;
2304 }
2305
2306 static bool trigger_name_taken(struct notification_thread_state *state,
2307 const struct lttng_trigger *trigger)
2308 {
2309 struct cds_lfht_node *triggers_by_name_uid_ht_node;
2310 struct cds_lfht_iter iter;
2311
2312 /*
2313 * No duplicata is allowed in the triggers_by_name_uid_ht.
2314 * The match is done against the trigger name and uid.
2315 */
2316 cds_lfht_lookup(state->triggers_by_name_uid_ht,
2317 hash_key_str(trigger->name, lttng_ht_seed),
2318 match_name_uid,
2319 trigger,
2320 &iter);
2321 triggers_by_name_uid_ht_node = cds_lfht_iter_get_node(&iter);
2322 if (triggers_by_name_uid_ht_node) {
2323 return true;
2324 } else {
2325 return false;
2326 }
2327 }
2328
2329 static
2330 void generate_trigger_name(struct notification_thread_state *state, struct lttng_trigger *trigger, const char **name)
2331 {
2332 /*
2333 * Here the offset criteria guarantee an end. This will be a nice
2334 * bikeshedding conversation. I would simply generate uuid and use them
2335 * as trigger name.
2336 */
2337 bool taken = false;
2338 enum lttng_trigger_status status;
2339 do {
2340 lttng_trigger_generate_name(trigger, state->trigger_id.name_offset);
2341
2342 status = lttng_trigger_get_name(trigger, name);
2343 assert(status == LTTNG_TRIGGER_STATUS_OK);
2344
2345 taken = trigger_name_taken(state, trigger);
2346 if (taken) {
2347 state->trigger_id.name_offset++;
2348 }
2349 } while (taken || state->trigger_id.name_offset == UINT32_MAX);
2350 }
2351
2352 /*
2353 * FIXME A client's credentials are not checked when registering a trigger.
2354 *
2355 * The effects of this are benign since:
2356 * - The client will succeed in registering the trigger, as it is valid,
2357 * - The trigger will, internally, be bound to the channel/session,
2358 * - The notifications will not be sent since the client's credentials
2359 * are checked against the channel at that moment.
2360 *
2361 * If this function returns a non-zero value, it means something is
2362 * fundamentally broken and the whole subsystem/thread will be torn down.
2363 *
2364 * If a non-fatal error occurs, just set the cmd_result to the appropriate
2365 * error code.
2366 */
2367 static
2368 int handle_notification_thread_command_register_trigger(
2369 struct notification_thread_state *state,
2370 struct lttng_trigger *trigger,
2371 enum lttng_error_code *cmd_result)
2372 {
2373 int ret = 0;
2374 int is_supported;
2375 struct lttng_condition *condition;
2376 struct lttng_action *action;
2377 struct notification_client *client;
2378 struct notification_client_list *client_list = NULL;
2379 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
2380 struct notification_client_list_element *client_list_element;
2381 struct notification_trigger_tokens_ht_element *trigger_tokens_ht_element = NULL;
2382 struct cds_lfht_node *node;
2383 struct cds_lfht_iter iter;
2384 const char* trigger_name;
2385 bool free_trigger = true;
2386 struct lttng_evaluation *evaluation = NULL;
2387 struct lttng_credentials object_creds;
2388 uid_t object_uid;
2389 gid_t object_gid;
2390 enum action_executor_status executor_status;
2391
2392 rcu_read_lock();
2393
2394 /* Set the trigger's tracer token */
2395 lttng_trigger_set_tracer_token(trigger, state->trigger_id.token_generator);
2396
2397 if (lttng_trigger_get_name(trigger, &trigger_name) ==
2398 LTTNG_TRIGGER_STATUS_UNSET) {
2399 generate_trigger_name(state, trigger, &trigger_name);
2400 } else if (trigger_name_taken(state, trigger)) {
2401 /* Not a fatal error */
2402 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2403 ret = 0;
2404 goto error;
2405 }
2406
2407 condition = lttng_trigger_get_condition(trigger);
2408 assert(condition);
2409
2410 action = lttng_trigger_get_action(trigger);
2411 assert(action);
2412
2413 is_supported = condition_is_supported(condition);
2414 if (is_supported < 0) {
2415 goto error;
2416 } else if (is_supported == 0) {
2417 *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
2418 goto error;
2419 }
2420
2421 is_supported = action_is_supported(action);
2422 if (is_supported < 0) {
2423 goto error;
2424 } else if (is_supported == 0) {
2425 ret = 0;
2426 *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
2427 goto error;
2428 }
2429
2430 trigger_ht_element = zmalloc(sizeof(*trigger_ht_element));
2431 if (!trigger_ht_element) {
2432 ret = -1;
2433 goto error;
2434 }
2435
2436 /* Add trigger to the trigger_ht. */
2437 cds_lfht_node_init(&trigger_ht_element->node);
2438 cds_lfht_node_init(&trigger_ht_element->node_by_name_uid);
2439 trigger_ht_element->trigger = trigger;
2440
2441 node = cds_lfht_add_unique(state->triggers_ht,
2442 lttng_condition_hash(condition),
2443 match_trigger,
2444 trigger,
2445 &trigger_ht_element->node);
2446 if (node != &trigger_ht_element->node) {
2447 /* Not a fatal error, simply report it to the client. */
2448 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2449 goto error_free_ht_element;
2450 }
2451
2452 node = cds_lfht_add_unique(state->triggers_by_name_uid_ht,
2453 hash_key_str(trigger_name, lttng_ht_seed), match_name_uid,
2454 trigger, &trigger_ht_element->node_by_name_uid);
2455 if (node != &trigger_ht_element->node_by_name_uid) {
2456 /* Not a fatal error, simply report it to the client. */
2457 cds_lfht_del(state->triggers_ht, &trigger_ht_element->node);
2458 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2459 goto error_free_ht_element;
2460 }
2461
2462 if (lttng_condition_get_type(condition) == LTTNG_CONDITION_TYPE_EVENT_RULE_HIT) {
2463 uint64_t error_counter_index = 0;
2464 enum trigger_error_accounting_status error_accounting_status;
2465
2466 trigger_tokens_ht_element = zmalloc(sizeof(*trigger_tokens_ht_element));
2467 if (!trigger_tokens_ht_element) {
2468 ret = -1;
2469 cds_lfht_del(state->triggers_ht, &trigger_ht_element->node);
2470 cds_lfht_del(state->triggers_by_name_uid_ht, &trigger_ht_element->node_by_name_uid);
2471 goto error;
2472 }
2473
2474 error_accounting_status = trigger_error_accounting_register_trigger(
2475 trigger, &error_counter_index);
2476 if (error_accounting_status != TRIGGER_ERROR_ACCOUNTING_STATUS_OK) {
2477 ERR("Error registering trigger for error accounting");
2478 cds_lfht_del(state->triggers_ht, &trigger_ht_element->node);
2479 cds_lfht_del(state->triggers_by_name_uid_ht, &trigger_ht_element->node_by_name_uid);
2480 *cmd_result = LTTNG_ERR_TRIGGER_GROUP_ERROR_COUNTER_FULL;
2481 goto error_free_ht_element;
2482 }
2483
2484 lttng_trigger_set_error_counter_index(trigger, error_counter_index);
2485
2486 /* Add trigger token to the trigger_tokens_ht. */
2487 cds_lfht_node_init(&trigger_tokens_ht_element->node);
2488 trigger_tokens_ht_element->token = LTTNG_OPTIONAL_GET(trigger->tracer_token);
2489 trigger_tokens_ht_element->trigger = trigger;
2490
2491 node = cds_lfht_add_unique(state->trigger_tokens_ht,
2492 hash_key_u64(&trigger_tokens_ht_element->token, lttng_ht_seed),
2493 match_trigger_token,
2494 &trigger_tokens_ht_element->token,
2495 &trigger_tokens_ht_element->node);
2496 if (node != &trigger_tokens_ht_element->node) {
2497 /* TODO: THIS IS A FATAL ERROR... should never happen */
2498 /* Not a fatal error, simply report it to the client. */
2499 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2500 cds_lfht_del(state->triggers_ht, &trigger_ht_element->node);
2501 cds_lfht_del(state->triggers_by_name_uid_ht, &trigger_ht_element->node_by_name_uid);
2502 trigger_error_accounting_unregister_trigger(trigger);
2503 goto error_free_ht_element;
2504 }
2505 }
2506
2507 /*
2508 * Ownership of the trigger and of its wrapper was transfered to
2509 * the triggers_ht. Same for token ht element if necessary.
2510 */
2511 trigger_tokens_ht_element = NULL;
2512 trigger_ht_element = NULL;
2513 free_trigger = false;
2514
2515 /*
2516 * The rest only applies to triggers that have a "notify" action.
2517 * It is not skipped as this is the only action type currently
2518 * supported.
2519 */
2520 if (is_trigger_action_notify(trigger)) {
2521 client_list = notification_client_list_create(trigger);
2522 if (!client_list) {
2523 ret = -1;
2524 goto error_free_ht_element;
2525 }
2526
2527 /* Build a list of clients to which this new trigger applies. */
2528 cds_lfht_for_each_entry (state->client_socket_ht, &iter, client,
2529 client_socket_ht_node) {
2530 if (!trigger_applies_to_client(trigger, client)) {
2531 continue;
2532 }
2533
2534 client_list_element =
2535 zmalloc(sizeof(*client_list_element));
2536 if (!client_list_element) {
2537 ret = -1;
2538 goto error_put_client_list;
2539 }
2540
2541 CDS_INIT_LIST_HEAD(&client_list_element->node);
2542 client_list_element->client = client;
2543 cds_list_add(&client_list_element->node,
2544 &client_list->list);
2545 }
2546
2547 /*
2548 * Client list ownership transferred to the
2549 * notification_trigger_clients_ht.
2550 */
2551 publish_notification_client_list(state, client_list);
2552 }
2553
2554 switch (get_condition_binding_object(condition)) {
2555 case LTTNG_OBJECT_TYPE_SESSION:
2556 /* Add the trigger to the list if it matches a known session. */
2557 ret = bind_trigger_to_matching_session(trigger, state);
2558 if (ret) {
2559 goto error_put_client_list;
2560 }
2561 break;
2562 case LTTNG_OBJECT_TYPE_CHANNEL:
2563 /*
2564 * Add the trigger to list of triggers bound to the channels
2565 * currently known.
2566 */
2567 ret = bind_trigger_to_matching_channels(trigger, state);
2568 if (ret) {
2569 goto error_put_client_list;
2570 }
2571 break;
2572 case LTTNG_OBJECT_TYPE_NONE:
2573 break;
2574 default:
2575 ERR("Unknown object type on which to bind a newly registered trigger was encountered");
2576 ret = -1;
2577 goto error_put_client_list;
2578 }
2579
2580 /*
2581 * The new trigger's condition must be evaluated against the current
2582 * state.
2583 *
2584 * In the case of `notify` action, nothing preventing clients from
2585 * subscribing to a condition before the corresponding trigger is
2586 * registered, we have to evaluate this new condition right away.
2587 *
2588 * At some point, we were waiting for the next "evaluation" (e.g. on
2589 * reception of a channel sample) to evaluate this new condition, but
2590 * that was broken.
2591 *
2592 * The reason it was broken is that waiting for the next sample
2593 * does not allow us to properly handle transitions for edge-triggered
2594 * conditions.
2595 *
2596 * Consider this example: when we handle a new channel sample, we
2597 * evaluate each conditions twice: once with the previous state, and
2598 * again with the newest state. We then use those two results to
2599 * determine whether a state change happened: a condition was false and
2600 * became true. If a state change happened, we have to notify clients.
2601 *
2602 * Now, if a client subscribes to a given notification and registers
2603 * a trigger *after* that subscription, we have to make sure the
2604 * condition is evaluated at this point while considering only the
2605 * current state. Otherwise, the next evaluation cycle may only see
2606 * that the evaluations remain the same (true for samples n-1 and n) and
2607 * the client will never know that the condition has been met.
2608 */
2609 switch (get_condition_binding_object(condition)) {
2610 case LTTNG_OBJECT_TYPE_SESSION:
2611 ret = evaluate_session_condition_for_client(condition, state,
2612 &evaluation, &object_uid,
2613 &object_gid);
2614 break;
2615 case LTTNG_OBJECT_TYPE_CHANNEL:
2616 ret = evaluate_channel_condition_for_client(condition, state,
2617 &evaluation, &object_uid,
2618 &object_gid);
2619 break;
2620 case LTTNG_OBJECT_TYPE_NONE:
2621 ret = 0;
2622 break;
2623 case LTTNG_OBJECT_TYPE_UNKNOWN:
2624 default:
2625 ret = -1;
2626 break;
2627 }
2628
2629 if (ret) {
2630 /* Fatal error. */
2631 goto error_put_client_list;
2632 }
2633
2634 LTTNG_OPTIONAL_SET(&object_creds.uid, object_uid);
2635 LTTNG_OPTIONAL_SET(&object_creds.gid, object_gid);
2636
2637 DBG("Newly registered trigger's condition evaluated to %s",
2638 evaluation ? "true" : "false");
2639 if (!evaluation) {
2640 /* Evaluation yielded nothing. Normal exit. */
2641 ret = 0;
2642 goto end;
2643 }
2644
2645 /*
2646 * Ownership of `evaluation` transferred to the action executor
2647 * no matter the result.
2648 */
2649 executor_status = action_executor_enqueue(state->executor, trigger,
2650 evaluation, &object_creds, client_list);
2651 evaluation = NULL;
2652 switch (executor_status) {
2653 case ACTION_EXECUTOR_STATUS_OK:
2654 break;
2655 case ACTION_EXECUTOR_STATUS_ERROR:
2656 case ACTION_EXECUTOR_STATUS_INVALID:
2657 /*
2658 * TODO Add trigger identification (name/id) when
2659 * it is added to the API.
2660 */
2661 ERR("Fatal error occurred while enqueuing action associated to newly registered trigger");
2662 ret = -1;
2663 goto error_put_client_list;
2664 case ACTION_EXECUTOR_STATUS_OVERFLOW:
2665 /*
2666 * TODO Add trigger identification (name/id) when
2667 * it is added to the API.
2668 *
2669 * Not a fatal error.
2670 */
2671 WARN("No space left when enqueuing action associated to newly registered trigger");
2672 ret = 0;
2673 goto end;
2674 default:
2675 abort();
2676 }
2677
2678 end:
2679 /* Increment the trigger unique id generator */
2680 state->trigger_id.token_generator++;
2681 *cmd_result = LTTNG_OK;
2682
2683 error_put_client_list:
2684 notification_client_list_put(client_list);
2685
2686 error_free_ht_element:
2687 if (trigger_ht_element) {
2688 /* Delayed removal due to RCU constraint on delete. */
2689 call_rcu(&trigger_ht_element->rcu_node, free_lttng_trigger_ht_element_rcu);
2690 }
2691
2692 free(trigger_tokens_ht_element);
2693 error:
2694 if (free_trigger) {
2695 lttng_trigger_destroy(trigger);
2696 }
2697 rcu_read_unlock();
2698 return ret;
2699 }
2700
2701 static
2702 void free_lttng_trigger_ht_element_rcu(struct rcu_head *node)
2703 {
2704 free(caa_container_of(node, struct lttng_trigger_ht_element,
2705 rcu_node));
2706 }
2707
2708 static
2709 void free_notification_trigger_tokens_ht_element_rcu(struct rcu_head *node)
2710 {
2711 free(caa_container_of(node, struct notification_trigger_tokens_ht_element,
2712 rcu_node));
2713 }
2714
2715 static
2716 int handle_notification_thread_command_unregister_trigger(
2717 struct notification_thread_state *state,
2718 struct lttng_trigger *trigger,
2719 enum lttng_error_code *_cmd_reply)
2720 {
2721 struct cds_lfht_iter iter;
2722 struct cds_lfht_node *triggers_ht_node;
2723 struct lttng_channel_trigger_list *trigger_list;
2724 struct notification_client_list *client_list;
2725 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
2726 struct lttng_condition *condition = lttng_trigger_get_condition(
2727 trigger);
2728 enum lttng_error_code cmd_reply;
2729
2730 rcu_read_lock();
2731
2732 cds_lfht_lookup(state->triggers_ht,
2733 lttng_condition_hash(condition),
2734 match_trigger,
2735 trigger,
2736 &iter);
2737 triggers_ht_node = cds_lfht_iter_get_node(&iter);
2738 if (!triggers_ht_node) {
2739 cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND;
2740 goto end;
2741 } else {
2742 cmd_reply = LTTNG_OK;
2743 }
2744
2745 /* Remove trigger from channel_triggers_ht. */
2746 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
2747 channel_triggers_ht_node) {
2748 struct lttng_trigger_list_element *trigger_element, *tmp;
2749
2750 cds_list_for_each_entry_safe(trigger_element, tmp,
2751 &trigger_list->list, node) {
2752 if (!lttng_trigger_is_equal(trigger, trigger_element->trigger)) {
2753 continue;
2754 }
2755
2756 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
2757 cds_list_del(&trigger_element->node);
2758 /* A trigger can only appear once per channel */
2759 break;
2760 }
2761 }
2762
2763 if (lttng_condition_get_type(condition) == LTTNG_CONDITION_TYPE_EVENT_RULE_HIT) {
2764 struct notification_trigger_tokens_ht_element *trigger_tokens_ht_element;
2765 cds_lfht_for_each_entry(state->trigger_tokens_ht, &iter, trigger_tokens_ht_element,
2766 node) {
2767 if (!lttng_trigger_is_equal(trigger, trigger_tokens_ht_element->trigger)) {
2768 continue;
2769 }
2770
2771 trigger_error_accounting_unregister_trigger(trigger_tokens_ht_element->trigger);
2772
2773 /* TODO talk to all app and remove it */
2774 DBG("[notification-thread] Removed trigger from tokens_ht");
2775 cds_lfht_del(state->trigger_tokens_ht,
2776 &trigger_tokens_ht_element->node);
2777 call_rcu(&trigger_tokens_ht_element->rcu_node, free_notification_trigger_tokens_ht_element_rcu);
2778
2779 break;
2780 }
2781 }
2782
2783 if (is_trigger_action_notify(trigger)) {
2784 /*
2785 * Remove and release the client list from
2786 * notification_trigger_clients_ht.
2787 */
2788 client_list = get_client_list_from_condition(state, condition);
2789 assert(client_list);
2790
2791 /* Put new reference and the hashtable's reference. */
2792 notification_client_list_put(client_list);
2793 notification_client_list_put(client_list);
2794 client_list = NULL;
2795 }
2796
2797 /* Remove trigger from triggers_ht. */
2798 trigger_ht_element = caa_container_of(triggers_ht_node,
2799 struct lttng_trigger_ht_element, node);
2800 cds_lfht_del(state->triggers_by_name_uid_ht, &trigger_ht_element->node_by_name_uid);
2801 cds_lfht_del(state->triggers_ht, triggers_ht_node);
2802
2803 /* Release the ownership of the trigger. */
2804 lttng_trigger_destroy(trigger_ht_element->trigger);
2805 call_rcu(&trigger_ht_element->rcu_node, free_lttng_trigger_ht_element_rcu);
2806 end:
2807 rcu_read_unlock();
2808 if (_cmd_reply) {
2809 *_cmd_reply = cmd_reply;
2810 }
2811 return 0;
2812 }
2813
2814 /* Returns 0 on success, 1 on exit requested, negative value on error. */
2815 int handle_notification_thread_command(
2816 struct notification_thread_handle *handle,
2817 struct notification_thread_state *state)
2818 {
2819 int ret;
2820 uint64_t counter;
2821 struct notification_thread_command *cmd;
2822
2823 /* Read the event pipe to put it back into a quiescent state. */
2824 ret = lttng_read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter,
2825 sizeof(counter));
2826 if (ret != sizeof(counter)) {
2827 goto error;
2828 }
2829
2830 pthread_mutex_lock(&handle->cmd_queue.lock);
2831 cmd = cds_list_first_entry(&handle->cmd_queue.list,
2832 struct notification_thread_command, cmd_list_node);
2833 cds_list_del(&cmd->cmd_list_node);
2834 pthread_mutex_unlock(&handle->cmd_queue.lock);
2835 switch (cmd->type) {
2836 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
2837 DBG("[notification-thread] Received register trigger command");
2838 ret = handle_notification_thread_command_register_trigger(
2839 state, cmd->parameters.trigger,
2840 &cmd->reply_code);
2841 break;
2842 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
2843 DBG("[notification-thread] Received unregister trigger command");
2844 ret = handle_notification_thread_command_unregister_trigger(
2845 state, cmd->parameters.trigger,
2846 &cmd->reply_code);
2847 break;
2848 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
2849 DBG("[notification-thread] Received add channel command");
2850 ret = handle_notification_thread_command_add_channel(
2851 state,
2852 cmd->parameters.add_channel.session.name,
2853 cmd->parameters.add_channel.session.uid,
2854 cmd->parameters.add_channel.session.gid,
2855 cmd->parameters.add_channel.channel.name,
2856 cmd->parameters.add_channel.channel.domain,
2857 cmd->parameters.add_channel.channel.key,
2858 cmd->parameters.add_channel.channel.capacity,
2859 &cmd->reply_code);
2860 break;
2861 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
2862 DBG("[notification-thread] Received remove channel command");
2863 ret = handle_notification_thread_command_remove_channel(
2864 state, cmd->parameters.remove_channel.key,
2865 cmd->parameters.remove_channel.domain,
2866 &cmd->reply_code);
2867 break;
2868 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING:
2869 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED:
2870 DBG("[notification-thread] Received session rotation %s command",
2871 cmd->type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING ?
2872 "ongoing" : "completed");
2873 ret = handle_notification_thread_command_session_rotation(
2874 state,
2875 cmd->type,
2876 cmd->parameters.session_rotation.session_name,
2877 cmd->parameters.session_rotation.uid,
2878 cmd->parameters.session_rotation.gid,
2879 cmd->parameters.session_rotation.trace_archive_chunk_id,
2880 cmd->parameters.session_rotation.location,
2881 &cmd->reply_code);
2882 break;
2883 case NOTIFICATION_COMMAND_TYPE_ADD_APPLICATION:
2884 ret = handle_notification_thread_command_add_application(
2885 handle,
2886 state,
2887 cmd->parameters.application.read_side_trigger_event_application_pipe,
2888 cmd->parameters.application.domain,
2889 &cmd->reply_code);
2890 break;
2891 case NOTIFICATION_COMMAND_TYPE_REMOVE_APPLICATION:
2892 ret = handle_notification_thread_command_remove_application(
2893 handle,
2894 state,
2895 cmd->parameters.application.read_side_trigger_event_application_pipe,
2896 &cmd->reply_code);
2897 break;
2898 case NOTIFICATION_COMMAND_TYPE_GET_TOKENS:
2899 {
2900 struct lttng_triggers *triggers = NULL;
2901 ret = handle_notification_thread_command_get_tokens(
2902 handle, state, &triggers, &cmd->reply_code);
2903 cmd->reply.get_tokens.triggers = triggers;
2904 ret = 0;
2905 break;
2906 }
2907 case NOTIFICATION_COMMAND_TYPE_LIST_TRIGGERS:
2908 {
2909 struct lttng_triggers *triggers = NULL;
2910 ret = handle_notification_thread_command_list_triggers(
2911 handle,
2912 state,
2913 cmd->parameters.list_triggers.uid,
2914 &triggers,
2915 &cmd->reply_code);
2916 cmd->reply.list_triggers.triggers = triggers;
2917 ret = 0;
2918 break;
2919 }
2920 case NOTIFICATION_COMMAND_TYPE_QUIT:
2921 DBG("[notification-thread] Received quit command");
2922 cmd->reply_code = LTTNG_OK;
2923 ret = 1;
2924 goto end;
2925 case NOTIFICATION_COMMAND_TYPE_CLIENT_COMMUNICATION_UPDATE:
2926 {
2927 const enum client_transmission_status client_status =
2928 cmd->parameters.client_communication_update
2929 .status;
2930 const notification_client_id client_id =
2931 cmd->parameters.client_communication_update.id;
2932 struct notification_client *client;
2933
2934 rcu_read_lock();
2935 client = get_client_from_id(client_id, state);
2936
2937 if (!client) {
2938 /*
2939 * Client error was probably already picked-up by the
2940 * notification thread or it has disconnected
2941 * gracefully while this command was queued.
2942 */
2943 DBG("Failed to find notification client to update communication status, client id = %" PRIu64,
2944 client_id);
2945 ret = 0;
2946 } else {
2947 ret = client_handle_transmission_status(
2948 client, client_status, state);
2949 }
2950 rcu_read_unlock();
2951 break;
2952 }
2953 default:
2954 ERR("[notification-thread] Unknown internal command received");
2955 goto error_unlock;
2956 }
2957
2958 if (ret) {
2959 goto error_unlock;
2960 }
2961 end:
2962 if (cmd->is_async) {
2963 free(cmd);
2964 cmd = NULL;
2965 } else {
2966 lttng_waiter_wake_up(&cmd->reply_waiter);
2967 }
2968 return ret;
2969 error_unlock:
2970 /* Wake-up and return a fatal error to the calling thread. */
2971 lttng_waiter_wake_up(&cmd->reply_waiter);
2972 cmd->reply_code = LTTNG_ERR_FATAL;
2973 error:
2974 /* Indicate a fatal error to the caller. */
2975 return -1;
2976 }
2977
2978 static
2979 int socket_set_non_blocking(int socket)
2980 {
2981 int ret, flags;
2982
2983 /* Set the pipe as non-blocking. */
2984 ret = fcntl(socket, F_GETFL, 0);
2985 if (ret == -1) {
2986 PERROR("fcntl get socket flags");
2987 goto end;
2988 }
2989 flags = ret;
2990
2991 ret = fcntl(socket, F_SETFL, flags | O_NONBLOCK);
2992 if (ret == -1) {
2993 PERROR("fcntl set O_NONBLOCK socket flag");
2994 goto end;
2995 }
2996 DBG("Client socket (fd = %i) set as non-blocking", socket);
2997 end:
2998 return ret;
2999 }
3000
3001 static
3002 int client_reset_inbound_state(struct notification_client *client)
3003 {
3004 int ret;
3005
3006
3007 lttng_payload_clear(&client->communication.inbound.payload);
3008
3009 client->communication.inbound.bytes_to_receive =
3010 sizeof(struct lttng_notification_channel_message);
3011 client->communication.inbound.msg_type =
3012 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN;
3013 LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1);
3014 LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1);
3015 ret = lttng_dynamic_buffer_set_size(
3016 &client->communication.inbound.payload.buffer,
3017 client->communication.inbound.bytes_to_receive);
3018
3019 return ret;
3020 }
3021
3022 int handle_notification_thread_client_connect(
3023 struct notification_thread_state *state)
3024 {
3025 int ret;
3026 struct notification_client *client;
3027
3028 DBG("[notification-thread] Handling new notification channel client connection");
3029
3030 client = zmalloc(sizeof(*client));
3031 if (!client) {
3032 /* Fatal error. */
3033 ret = -1;
3034 goto error;
3035 }
3036
3037 pthread_mutex_init(&client->lock, NULL);
3038 client->id = state->next_notification_client_id++;
3039 CDS_INIT_LIST_HEAD(&client->condition_list);
3040 lttng_payload_init(&client->communication.inbound.payload);
3041 lttng_payload_init(&client->communication.outbound.payload);
3042 client->communication.inbound.expect_creds = true;
3043
3044 ret = client_reset_inbound_state(client);
3045 if (ret) {
3046 ERR("[notification-thread] Failed to reset client communication's inbound state");
3047 ret = 0;
3048 goto error;
3049 }
3050
3051 ret = lttcomm_accept_unix_sock(state->notification_channel_socket);
3052 if (ret < 0) {
3053 ERR("[notification-thread] Failed to accept new notification channel client connection");
3054 ret = 0;
3055 goto error;
3056 }
3057
3058 client->socket = ret;
3059
3060 ret = socket_set_non_blocking(client->socket);
3061 if (ret) {
3062 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
3063 goto error;
3064 }
3065
3066 ret = lttcomm_setsockopt_creds_unix_sock(client->socket);
3067 if (ret < 0) {
3068 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
3069 ret = 0;
3070 goto error;
3071 }
3072
3073 ret = lttng_poll_add(&state->events, client->socket,
3074 LPOLLIN | LPOLLERR |
3075 LPOLLHUP | LPOLLRDHUP);
3076 if (ret < 0) {
3077 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
3078 ret = 0;
3079 goto error;
3080 }
3081 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
3082 client->socket);
3083
3084 rcu_read_lock();
3085 cds_lfht_add(state->client_socket_ht,
3086 hash_client_socket(client->socket),
3087 &client->client_socket_ht_node);
3088 cds_lfht_add(state->client_id_ht,
3089 hash_client_id(client->id),
3090 &client->client_id_ht_node);
3091 rcu_read_unlock();
3092
3093 return ret;
3094
3095 error:
3096 notification_client_destroy(client, state);
3097 return ret;
3098 }
3099
3100 /*
3101 * RCU read-lock must be held by the caller.
3102 * Client lock must _not_ be held by the caller.
3103 */
3104 static
3105 int notification_thread_client_disconnect(
3106 struct notification_client *client,
3107 struct notification_thread_state *state)
3108 {
3109 int ret;
3110 struct lttng_condition_list_element *condition_list_element, *tmp;
3111
3112 /* Acquire the client lock to disable its communication atomically. */
3113 pthread_mutex_lock(&client->lock);
3114 client->communication.active = false;
3115 cds_lfht_del(state->client_socket_ht, &client->client_socket_ht_node);
3116 cds_lfht_del(state->client_id_ht, &client->client_id_ht_node);
3117 pthread_mutex_unlock(&client->lock);
3118
3119 ret = lttng_poll_del(&state->events, client->socket);
3120 if (ret) {
3121 ERR("[notification-thread] Failed to remove client socket %d from poll set",
3122 client->socket);
3123 }
3124
3125 /* Release all conditions to which the client was subscribed. */
3126 cds_list_for_each_entry_safe(condition_list_element, tmp,
3127 &client->condition_list, node) {
3128 (void) notification_thread_client_unsubscribe(client,
3129 condition_list_element->condition, state, NULL);
3130 }
3131
3132 /*
3133 * Client no longer accessible to other threads (through the
3134 * client lists).
3135 */
3136 notification_client_destroy(client, state);
3137 return ret;
3138 }
3139
3140 int handle_notification_thread_client_disconnect(
3141 int client_socket, struct notification_thread_state *state)
3142 {
3143 int ret = 0;
3144 struct notification_client *client;
3145
3146 rcu_read_lock();
3147 DBG("[notification-thread] Closing client connection (socket fd = %i)",
3148 client_socket);
3149 client = get_client_from_socket(client_socket, state);
3150 if (!client) {
3151 /* Internal state corruption, fatal error. */
3152 ERR("[notification-thread] Unable to find client (socket fd = %i)",
3153 client_socket);
3154 ret = -1;
3155 goto end;
3156 }
3157
3158 ret = notification_thread_client_disconnect(client, state);
3159 end:
3160 rcu_read_unlock();
3161 return ret;
3162 }
3163
3164 int handle_notification_thread_client_disconnect_all(
3165 struct notification_thread_state *state)
3166 {
3167 struct cds_lfht_iter iter;
3168 struct notification_client *client;
3169 bool error_encoutered = false;
3170
3171 rcu_read_lock();
3172 DBG("[notification-thread] Closing all client connections");
3173 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
3174 client_socket_ht_node) {
3175 int ret;
3176
3177 ret = notification_thread_client_disconnect(
3178 client, state);
3179 if (ret) {
3180 error_encoutered = true;
3181 }
3182 }
3183 rcu_read_unlock();
3184 return error_encoutered ? 1 : 0;
3185 }
3186
3187 int handle_notification_thread_trigger_unregister_all(
3188 struct notification_thread_state *state)
3189 {
3190 bool error_occurred = false;
3191 struct cds_lfht_iter iter;
3192 struct lttng_trigger_ht_element *trigger_ht_element;
3193
3194 rcu_read_lock();
3195 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
3196 node) {
3197 int ret = handle_notification_thread_command_unregister_trigger(
3198 state, trigger_ht_element->trigger, NULL);
3199 if (ret) {
3200 error_occurred = true;
3201 }
3202 }
3203 rcu_read_unlock();
3204 return error_occurred ? -1 : 0;
3205 }
3206
3207 static
3208 int client_handle_transmission_status(
3209 struct notification_client *client,
3210 enum client_transmission_status transmission_status,
3211 struct notification_thread_state *state)
3212 {
3213 int ret = 0;
3214
3215 switch (transmission_status) {
3216 case CLIENT_TRANSMISSION_STATUS_COMPLETE:
3217 ret = lttng_poll_mod(&state->events, client->socket,
3218 CLIENT_POLL_MASK_IN);
3219 if (ret) {
3220 goto end;
3221 }
3222
3223 break;
3224 case CLIENT_TRANSMISSION_STATUS_QUEUED:
3225 /*
3226 * We want to be notified whenever there is buffer space
3227 * available to send the rest of the payload.
3228 */
3229 ret = lttng_poll_mod(&state->events, client->socket,
3230 CLIENT_POLL_MASK_IN_OUT);
3231 if (ret) {
3232 goto end;
3233 }
3234 break;
3235 case CLIENT_TRANSMISSION_STATUS_FAIL:
3236 ret = notification_thread_client_disconnect(client, state);
3237 if (ret) {
3238 goto end;
3239 }
3240 break;
3241 case CLIENT_TRANSMISSION_STATUS_ERROR:
3242 ret = -1;
3243 goto end;
3244 default:
3245 abort();
3246 }
3247 end:
3248 return ret;
3249 }
3250
3251 /* Client lock must be acquired by caller. */
3252 static
3253 enum client_transmission_status client_flush_outgoing_queue(
3254 struct notification_client *client)
3255 {
3256 ssize_t ret;
3257 size_t to_send_count;
3258 enum client_transmission_status status;
3259 struct lttng_payload_view pv = lttng_payload_view_from_payload(
3260 &client->communication.outbound.payload, 0, -1);
3261 const int fds_to_send_count =
3262 lttng_payload_view_get_fd_handle_count(&pv);
3263
3264 ASSERT_LOCKED(client->lock);
3265
3266 if (!client->communication.active) {
3267 status = CLIENT_TRANSMISSION_STATUS_FAIL;
3268 goto end;
3269 }
3270
3271 if (pv.buffer.size == 0) {
3272 /*
3273 * If both data and fds are equal to zero, we are in an invalid
3274 * state.
3275 */
3276 assert(fds_to_send_count != 0);
3277 goto send_fds;
3278 }
3279
3280 /* Send data. */
3281 to_send_count = pv.buffer.size;
3282 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
3283 client->socket);
3284
3285 ret = lttcomm_send_unix_sock_non_block(client->socket,
3286 pv.buffer.data,
3287 to_send_count);
3288 if ((ret >= 0 && ret < to_send_count)) {
3289 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
3290 client->socket);
3291 to_send_count -= max(ret, 0);
3292
3293 memmove(client->communication.outbound.payload.buffer.data,
3294 pv.buffer.data +
3295 pv.buffer.size - to_send_count,
3296 to_send_count);
3297 ret = lttng_dynamic_buffer_set_size(
3298 &client->communication.outbound.payload.buffer,
3299 to_send_count);
3300 if (ret) {
3301 goto error;
3302 }
3303
3304 status = CLIENT_TRANSMISSION_STATUS_QUEUED;
3305 goto end;
3306 } else if (ret < 0) {
3307 /* Generic error, disable the client's communication. */
3308 ERR("[notification-thread] Failed to flush outgoing queue, disconnecting client (socket fd = %i)",
3309 client->socket);
3310 client->communication.active = false;
3311 status = CLIENT_TRANSMISSION_STATUS_FAIL;
3312 goto end;
3313 } else {
3314 /*
3315 * No error and flushed the queue completely.
3316 *
3317 * The payload buffer size is used later to
3318 * check if there is notifications queued. So albeit that the
3319 * direct caller knows that the transmission is complete, we
3320 * need to set the buffer size to zero.
3321 */
3322 ret = lttng_dynamic_buffer_set_size(
3323 &client->communication.outbound.payload.buffer, 0);
3324 if (ret) {
3325 goto error;
3326 }
3327 }
3328
3329 send_fds:
3330 /* No fds to send, transmission is complete. */
3331 if (fds_to_send_count == 0) {
3332 status = CLIENT_TRANSMISSION_STATUS_COMPLETE;
3333 goto end;
3334 }
3335
3336 ret = lttcomm_send_payload_view_fds_unix_sock_non_block(
3337 client->socket, &pv);
3338 if (ret < 0) {
3339 /* Generic error, disable the client's communication. */
3340 ERR("[notification-thread] Failed to flush outgoing fds queue, disconnecting client (socket fd = %i)",
3341 client->socket);
3342 client->communication.active = false;
3343 status = CLIENT_TRANSMISSION_STATUS_FAIL;
3344 goto end;
3345 } else if (ret == 0) {
3346 /* Nothing could be sent. */
3347 status = CLIENT_TRANSMISSION_STATUS_QUEUED;
3348 } else {
3349 /* Fd passing is an all or nothing kind of thing. */
3350 status = CLIENT_TRANSMISSION_STATUS_COMPLETE;
3351 /*
3352 * The payload _fd_array count is used later to
3353 * check if there is notifications queued. So although the
3354 * direct caller knows that the transmission is complete, we
3355 * need to clear the _fd_array for the queuing check.
3356 */
3357 lttng_dynamic_pointer_array_clear(
3358 &client->communication.outbound.payload
3359 ._fd_handles);
3360 }
3361
3362 end:
3363 if (status == CLIENT_TRANSMISSION_STATUS_COMPLETE) {
3364 client->communication.outbound.queued_command_reply = false;
3365 client->communication.outbound.dropped_notification = false;
3366 lttng_payload_clear(&client->communication.outbound.payload);
3367 }
3368
3369 return status;
3370 error:
3371 return CLIENT_TRANSMISSION_STATUS_ERROR;
3372 }
3373
3374 static
3375 bool client_has_outbound_data_left(
3376 const struct notification_client *client)
3377 {
3378 const struct lttng_payload_view pv = lttng_payload_view_from_payload(
3379 &client->communication.outbound.payload, 0, -1);
3380 const bool has_data = pv.buffer.size != 0;
3381 const bool has_fds = lttng_payload_view_get_fd_handle_count(&pv);
3382
3383 return has_data || has_fds;
3384 }
3385
3386 /* Client lock must _not_ be held by the caller. */
3387 static
3388 int client_send_command_reply(struct notification_client *client,
3389 struct notification_thread_state *state,
3390 enum lttng_notification_channel_status status)
3391 {
3392 int ret;
3393 struct lttng_notification_channel_command_reply reply = {
3394 .status = (int8_t) status,
3395 };
3396 struct lttng_notification_channel_message msg = {
3397 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
3398 .size = sizeof(reply),
3399 };
3400 char buffer[sizeof(msg) + sizeof(reply)];
3401 enum client_transmission_status transmission_status;
3402
3403 memcpy(buffer, &msg, sizeof(msg));
3404 memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
3405 DBG("[notification-thread] Send command reply (%i)", (int) status);
3406
3407 pthread_mutex_lock(&client->lock);
3408 if (client->communication.outbound.queued_command_reply) {
3409 /* Protocol error. */
3410 goto error_unlock;
3411 }
3412
3413 /* Enqueue buffer to outgoing queue and flush it. */
3414 ret = lttng_dynamic_buffer_append(
3415 &client->communication.outbound.payload.buffer,
3416 buffer, sizeof(buffer));
3417 if (ret) {
3418 goto error_unlock;
3419 }
3420
3421 transmission_status = client_flush_outgoing_queue(client);
3422
3423 if (client_has_outbound_data_left(client)) {
3424 /* Queue could not be emptied. */
3425 client->communication.outbound.queued_command_reply = true;
3426 }
3427
3428 pthread_mutex_unlock(&client->lock);
3429 ret = client_handle_transmission_status(
3430 client, transmission_status, state);
3431 if (ret) {
3432 goto error;
3433 }
3434
3435 return 0;
3436 error_unlock:
3437 pthread_mutex_unlock(&client->lock);
3438 error:
3439 return -1;
3440 }
3441
3442 static
3443 int client_handle_message_unknown(struct notification_client *client,
3444 struct notification_thread_state *state)
3445 {
3446 int ret;
3447 /*
3448 * Receiving message header. The function will be called again
3449 * once the rest of the message as been received and can be
3450 * interpreted.
3451 */
3452 const struct lttng_notification_channel_message *msg;
3453
3454 assert(sizeof(*msg) == client->communication.inbound.payload.buffer.size);
3455 msg = (const struct lttng_notification_channel_message *)
3456 client->communication.inbound.payload.buffer.data;
3457
3458 if (msg->size == 0 ||
3459 msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
3460 ERR("[notification-thread] Invalid notification channel message: length = %u",
3461 msg->size);
3462 ret = -1;
3463 goto end;
3464 }
3465
3466 switch (msg->type) {
3467 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
3468 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
3469 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
3470 break;
3471 default:
3472 ret = -1;
3473 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
3474 goto end;
3475 }
3476
3477 client->communication.inbound.bytes_to_receive = msg->size;
3478 client->communication.inbound.fds_to_receive = msg->fds;
3479 client->communication.inbound.msg_type =
3480 (enum lttng_notification_channel_message_type) msg->type;
3481 ret = lttng_dynamic_buffer_set_size(
3482 &client->communication.inbound.payload.buffer, msg->size);
3483
3484 /* msg is not valid anymore due to lttng_dynamic_buffer_set_size. */
3485 msg = NULL;
3486 end:
3487 return ret;
3488 }
3489
3490 static
3491 int client_handle_message_handshake(struct notification_client *client,
3492 struct notification_thread_state *state)
3493 {
3494 int ret;
3495 struct lttng_notification_channel_command_handshake *handshake_client;
3496 const struct lttng_notification_channel_command_handshake handshake_reply = {
3497 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
3498 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
3499 };
3500 const struct lttng_notification_channel_message msg_header = {
3501 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
3502 .size = sizeof(handshake_reply),
3503 };
3504 enum lttng_notification_channel_status status =
3505 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
3506 char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
3507
3508 memcpy(send_buffer, &msg_header, sizeof(msg_header));
3509 memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
3510 sizeof(handshake_reply));
3511
3512 handshake_client =
3513 (struct lttng_notification_channel_command_handshake *)
3514 client->communication.inbound.payload.buffer
3515 .data;
3516 client->major = handshake_client->major;
3517 client->minor = handshake_client->minor;
3518 if (!client->communication.inbound.creds_received) {
3519 ERR("[notification-thread] No credentials received from client");
3520 ret = -1;
3521 goto end;
3522 }
3523
3524 client->uid = LTTNG_SOCK_GET_UID_CRED(
3525 &client->communication.inbound.creds);
3526 client->gid = LTTNG_SOCK_GET_GID_CRED(
3527 &client->communication.inbound.creds);
3528 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
3529 client->uid, client->gid, (int) client->major,
3530 (int) client->minor);
3531
3532 if (handshake_client->major !=
3533 LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
3534 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
3535 }
3536
3537 pthread_mutex_lock(&client->lock);
3538 /* Outgoing queue will be flushed when the command reply is sent. */
3539 ret = lttng_dynamic_buffer_append(
3540 &client->communication.outbound.payload.buffer, send_buffer,
3541 sizeof(send_buffer));
3542 if (ret) {
3543 ERR("[notification-thread] Failed to send protocol version to notification channel client");
3544 goto end_unlock;
3545 }
3546
3547 client->validated = true;
3548 client->communication.active = true;
3549 pthread_mutex_unlock(&client->lock);
3550
3551 /* Set reception state to receive the next message header. */
3552 ret = client_reset_inbound_state(client);
3553 if (ret) {
3554 ERR("[notification-thread] Failed to reset client communication's inbound state");
3555 goto end;
3556 }
3557
3558 /* Flushes the outgoing queue. */
3559 ret = client_send_command_reply(client, state, status);
3560 if (ret) {
3561 ERR("[notification-thread] Failed to send reply to notification channel client");
3562 goto end;
3563 }
3564
3565 goto end;
3566 end_unlock:
3567 pthread_mutex_unlock(&client->lock);
3568 end:
3569 return ret;
3570 }
3571
3572 static
3573 int client_handle_message_subscription(
3574 struct notification_client *client,
3575 enum lttng_notification_channel_message_type msg_type,
3576 struct notification_thread_state *state)
3577 {
3578 int ret;
3579 struct lttng_condition *condition;
3580 enum lttng_notification_channel_status status =
3581 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
3582 struct lttng_payload_view condition_view =
3583 lttng_payload_view_from_payload(
3584 &client->communication.inbound.payload,
3585 0, -1);
3586 size_t expected_condition_size;
3587
3588 /*
3589 * No need to lock client to sample the inbound state as the only
3590 * other thread accessing clients (action executor) only uses the
3591 * outbound state.
3592 */
3593 expected_condition_size = client->communication.inbound.payload.buffer.size;
3594 ret = lttng_condition_create_from_payload(&condition_view, &condition);
3595 if (ret != expected_condition_size) {
3596 ERR("[notification-thread] Malformed condition received from client");
3597 goto end;
3598 }
3599
3600 if (msg_type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
3601 ret = notification_thread_client_subscribe(
3602 client, condition, state, &status);
3603 } else {
3604 ret = notification_thread_client_unsubscribe(
3605 client, condition, state, &status);
3606 }
3607
3608 if (ret) {
3609 goto end;
3610 }
3611
3612 /* Set reception state to receive the next message header. */
3613 ret = client_reset_inbound_state(client);
3614 if (ret) {
3615 ERR("[notification-thread] Failed to reset client communication's inbound state");
3616 goto end;
3617 }
3618
3619 ret = client_send_command_reply(client, state, status);
3620 if (ret) {
3621 ERR("[notification-thread] Failed to send reply to notification channel client");
3622 goto end;
3623 }
3624
3625 end:
3626 return ret;
3627 }
3628
3629 static
3630 int client_dispatch_message(struct notification_client *client,
3631 struct notification_thread_state *state)
3632 {
3633 int ret = 0;
3634
3635 if (client->communication.inbound.msg_type !=
3636 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE &&
3637 client->communication.inbound.msg_type !=
3638 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN &&
3639 !client->validated) {
3640 WARN("[notification-thread] client attempted a command before handshake");
3641 ret = -1;
3642 goto end;
3643 }
3644
3645 switch (client->communication.inbound.msg_type) {
3646 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN:
3647 {
3648 ret = client_handle_message_unknown(client, state);
3649 break;
3650 }
3651 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
3652 {
3653 ret = client_handle_message_handshake(client, state);
3654 break;
3655 }
3656 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
3657 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
3658 {
3659 ret = client_handle_message_subscription(client,
3660 client->communication.inbound.msg_type, state);
3661 break;
3662 }
3663 default:
3664 abort();
3665 }
3666 end:
3667 return ret;
3668 }
3669
3670 /* Incoming data from client. */
3671 int handle_notification_thread_client_in(
3672 struct notification_thread_state *state, int socket)
3673 {
3674 int ret = 0;
3675 struct notification_client *client;
3676 ssize_t recv_ret;
3677 size_t offset;
3678
3679 rcu_read_lock();
3680 client = get_client_from_socket(socket, state);
3681 if (!client) {
3682 /* Internal error, abort. */
3683 ret = -1;
3684 goto end;
3685 }
3686
3687 offset = client->communication.inbound.payload.buffer.size -
3688 client->communication.inbound.bytes_to_receive;
3689 if (client->communication.inbound.expect_creds) {
3690 recv_ret = lttcomm_recv_creds_unix_sock(socket,
3691 client->communication.inbound.payload.buffer.data + offset,
3692 client->communication.inbound.bytes_to_receive,
3693 &client->communication.inbound.creds);
3694 if (recv_ret > 0) {
3695 client->communication.inbound.expect_creds = false;
3696 client->communication.inbound.creds_received = true;
3697 }
3698 } else {
3699 recv_ret = lttcomm_recv_unix_sock_non_block(socket,
3700 client->communication.inbound.payload.buffer.data + offset,
3701 client->communication.inbound.bytes_to_receive);
3702 }
3703 if (recv_ret >= 0) {
3704 client->communication.inbound.bytes_to_receive -= recv_ret;
3705 } else {
3706 goto error_disconnect_client;
3707 }
3708
3709 if (client->communication.inbound.bytes_to_receive != 0) {
3710 /* Message incomplete wait for more data. */
3711 ret = 0;
3712 goto end;
3713 }
3714
3715 assert(client->communication.inbound.bytes_to_receive == 0);
3716
3717 /* Receive fds. */
3718 if (client->communication.inbound.fds_to_receive != 0) {
3719 ret = lttcomm_recv_payload_fds_unix_sock_non_block(
3720 client->socket,
3721 client->communication.inbound.fds_to_receive,
3722 &client->communication.inbound.payload);
3723 if (ret > 0) {
3724 /*
3725 * Fds received. non blocking fds passing is all
3726 * or nothing.
3727 */
3728 ssize_t expected_size;
3729
3730 expected_size = sizeof(int) *
3731 client->communication.inbound
3732 .fds_to_receive;
3733 assert(ret == expected_size);
3734 client->communication.inbound.fds_to_receive = 0;
3735 } else if (ret == 0) {
3736 /* Received nothing. */
3737 ret = 0;
3738 goto end;
3739 } else {
3740 goto error_disconnect_client;
3741 }
3742 }
3743
3744 /* At this point the message is complete.*/
3745 assert(client->communication.inbound.bytes_to_receive == 0 &&
3746 client->communication.inbound.fds_to_receive == 0);
3747 ret = client_dispatch_message(client, state);
3748 if (ret) {
3749 /*
3750 * Only returns an error if this client must be
3751 * disconnected.
3752 */
3753 goto error_disconnect_client;
3754 }
3755
3756 end:
3757 rcu_read_unlock();
3758 return ret;
3759
3760 error_disconnect_client:
3761 ret = notification_thread_client_disconnect(client, state);
3762 goto end;
3763 }
3764
3765 /* Client ready to receive outgoing data. */
3766 int handle_notification_thread_client_out(
3767 struct notification_thread_state *state, int socket)
3768 {
3769 int ret;
3770 struct notification_client *client;
3771 enum client_transmission_status transmission_status;
3772
3773 rcu_read_lock();
3774 client = get_client_from_socket(socket, state);
3775 if (!client) {
3776 /* Internal error, abort. */
3777 ret = -1;
3778 goto end;
3779 }
3780
3781 pthread_mutex_lock(&client->lock);
3782 transmission_status = client_flush_outgoing_queue(client);
3783 pthread_mutex_unlock(&client->lock);
3784
3785 ret = client_handle_transmission_status(
3786 client, transmission_status, state);
3787 if (ret) {
3788 goto end;
3789 }
3790 end:
3791 rcu_read_unlock();
3792 return ret;
3793 }
3794
3795 static
3796 bool evaluate_buffer_usage_condition(const struct lttng_condition *condition,
3797 const struct channel_state_sample *sample,
3798 uint64_t buffer_capacity)
3799 {
3800 bool result = false;
3801 uint64_t threshold;
3802 enum lttng_condition_type condition_type;
3803 const struct lttng_condition_buffer_usage *use_condition = container_of(
3804 condition, struct lttng_condition_buffer_usage,
3805 parent);
3806
3807 if (use_condition->threshold_bytes.set) {
3808 threshold = use_condition->threshold_bytes.value;
3809 } else {
3810 /*
3811 * Threshold was expressed as a ratio.
3812 *
3813 * TODO the threshold (in bytes) of conditions expressed
3814 * as a ratio of total buffer size could be cached to
3815 * forego this double-multiplication or it could be performed
3816 * as fixed-point math.
3817 *
3818 * Note that caching should accommodates the case where the
3819 * condition applies to multiple channels (i.e. don't assume
3820 * that all channels matching my_chann* have the same size...)
3821 */
3822 threshold = (uint64_t) (use_condition->threshold_ratio.value *
3823 (double) buffer_capacity);
3824 }
3825
3826 condition_type = lttng_condition_get_type(condition);
3827 if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
3828 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
3829 threshold, sample->highest_usage);
3830
3831 /*
3832 * The low condition should only be triggered once _all_ of the
3833 * streams in a channel have gone below the "low" threshold.
3834 */
3835 if (sample->highest_usage <= threshold) {
3836 result = true;
3837 }
3838 } else {
3839 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
3840 threshold, sample->highest_usage);
3841
3842 /*
3843 * For high buffer usage scenarios, we want to trigger whenever
3844 * _any_ of the streams has reached the "high" threshold.
3845 */
3846 if (sample->highest_usage >= threshold) {
3847 result = true;
3848 }
3849 }
3850
3851 return result;
3852 }
3853
3854 static
3855 bool evaluate_session_consumed_size_condition(
3856 const struct lttng_condition *condition,
3857 uint64_t session_consumed_size)
3858 {
3859 uint64_t threshold;
3860 const struct lttng_condition_session_consumed_size *size_condition =
3861 container_of(condition,
3862 struct lttng_condition_session_consumed_size,
3863 parent);
3864
3865 threshold = size_condition->consumed_threshold_bytes.value;
3866 DBG("[notification-thread] Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
3867 threshold, session_consumed_size);
3868 return session_consumed_size >= threshold;
3869 }
3870
3871 static
3872 int evaluate_buffer_condition(const struct lttng_condition *condition,
3873 struct lttng_evaluation **evaluation,
3874 const struct notification_thread_state *state,
3875 const struct channel_state_sample *previous_sample,
3876 const struct channel_state_sample *latest_sample,
3877 uint64_t previous_session_consumed_total,
3878 uint64_t latest_session_consumed_total,
3879 struct channel_info *channel_info)
3880 {
3881 int ret = 0;
3882 enum lttng_condition_type condition_type;
3883 const bool previous_sample_available = !!previous_sample;
3884 bool previous_sample_result = false;
3885 bool latest_sample_result;
3886
3887 condition_type = lttng_condition_get_type(condition);
3888
3889 switch (condition_type) {
3890 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
3891 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
3892 if (caa_likely(previous_sample_available)) {
3893 previous_sample_result =
3894 evaluate_buffer_usage_condition(condition,
3895 previous_sample, channel_info->capacity);
3896 }
3897 latest_sample_result = evaluate_buffer_usage_condition(
3898 condition, latest_sample,
3899 channel_info->capacity);
3900 break;
3901 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
3902 if (caa_likely(previous_sample_available)) {
3903 previous_sample_result =
3904 evaluate_session_consumed_size_condition(
3905 condition,
3906 previous_session_consumed_total);
3907 }
3908 latest_sample_result =
3909 evaluate_session_consumed_size_condition(
3910 condition,
3911 latest_session_consumed_total);
3912 break;
3913 default:
3914 /* Unknown condition type; internal error. */
3915 abort();
3916 }
3917
3918 if (!latest_sample_result ||
3919 (previous_sample_result == latest_sample_result)) {
3920 /*
3921 * Only trigger on a condition evaluation transition.
3922 *
3923 * NOTE: This edge-triggered logic may not be appropriate for
3924 * future condition types.
3925 */
3926 goto end;
3927 }
3928
3929 if (!evaluation || !latest_sample_result) {
3930 goto end;
3931 }
3932
3933 switch (condition_type) {
3934 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
3935 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
3936 *evaluation = lttng_evaluation_buffer_usage_create(
3937 condition_type,
3938 latest_sample->highest_usage,
3939 channel_info->capacity);
3940 break;
3941 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
3942 *evaluation = lttng_evaluation_session_consumed_size_create(
3943 latest_session_consumed_total);
3944 break;
3945 default:
3946 abort();
3947 }
3948
3949 if (!*evaluation) {
3950 ret = -1;
3951 goto end;
3952 }
3953 end:
3954 return ret;
3955 }
3956
3957 static
3958 int client_notification_overflow(struct notification_client *client)
3959 {
3960 int ret = 0;
3961 const struct lttng_notification_channel_message msg = {
3962 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
3963 };
3964
3965 ASSERT_LOCKED(client->lock);
3966
3967 DBG("Dropping notification addressed to client (socket fd = %i)",
3968 client->socket);
3969 if (client->communication.outbound.dropped_notification) {
3970 /*
3971 * The client already has a "notification dropped" message
3972 * in its outgoing queue. Nothing to do since all
3973 * of those messages are coalesced.
3974 */
3975 goto end;
3976 }
3977
3978 client->communication.outbound.dropped_notification = true;
3979 ret = lttng_dynamic_buffer_append(
3980 &client->communication.outbound.payload.buffer, &msg,
3981 sizeof(msg));
3982 if (ret) {
3983 PERROR("Failed to enqueue \"dropped notification\" message in client's (socket fd = %i) outgoing queue",
3984 client->socket);
3985 }
3986 end:
3987 return ret;
3988 }
3989
3990 static int client_handle_transmission_status_wrapper(
3991 struct notification_client *client,
3992 enum client_transmission_status status,
3993 void *user_data)
3994 {
3995 return client_handle_transmission_status(client, status,
3996 (struct notification_thread_state *) user_data);
3997 }
3998
3999 static
4000 int send_evaluation_to_clients(const struct lttng_trigger *trigger,
4001 const struct lttng_evaluation *evaluation,
4002 struct notification_client_list* client_list,
4003 struct notification_thread_state *state,
4004 uid_t object_uid, gid_t object_gid)
4005 {
4006 const struct lttng_credentials creds = {
4007 .uid = LTTNG_OPTIONAL_INIT_VALUE(object_uid),
4008 .gid = LTTNG_OPTIONAL_INIT_VALUE(object_gid),
4009 };
4010
4011 return notification_client_list_send_evaluation(client_list,
4012 lttng_trigger_get_const_condition(trigger), evaluation,
4013 lttng_trigger_get_credentials(trigger),
4014 &creds,
4015 client_handle_transmission_status_wrapper, state);
4016 }
4017
4018 /*
4019 * Permission checks relative to notification channel clients are performed
4020 * here. Notice how object, client, and trigger credentials are involved in
4021 * this check.
4022 *
4023 * The `object` credentials are the credentials associated with the "subject"
4024 * of a condition. For instance, a `rotation completed` condition applies
4025 * to a session. When that condition is met, it will produce an evaluation
4026 * against a session. Hence, in this case, the `object` credentials are the
4027 * credentials of the "subject" session.
4028 *
4029 * The `trigger` credentials are the credentials of the user that registered the
4030 * trigger.
4031 *
4032 * The `client` credentials are the credentials of the user that created a given
4033 * notification channel.
4034 *
4035 * In terms of visibility, it is expected that non-privilieged users can only
4036 * register triggers against "their" objects (their own sessions and
4037 * applications they are allowed to interact with). They can then open a
4038 * notification channel and subscribe to notifications associated with those
4039 * triggers.
4040 *
4041 * As for privilieged users, they can register triggers against the objects of
4042 * other users. They can then subscribe to the notifications associated to their
4043 * triggers. Privilieged users _can't_ subscribe to the notifications of
4044 * triggers owned by other users; they must create their own triggers.
4045 *
4046 * This is more a concern of usability than security. It would be difficult for
4047 * a root user reliably subscribe to a specific set of conditions without
4048 * interference from external users (those could, for instance, unregister
4049 * their triggers).
4050 */
4051 LTTNG_HIDDEN
4052 int notification_client_list_send_evaluation(
4053 struct notification_client_list *client_list,
4054 const struct lttng_condition *condition,
4055 const struct lttng_evaluation *evaluation,
4056 const struct lttng_credentials *trigger_creds,
4057 const struct lttng_credentials *source_object_creds,
4058 report_client_transmission_result_cb client_report,
4059 void *user_data)
4060 {
4061 int ret = 0;
4062 struct lttng_payload msg_payload;
4063 struct notification_client_list_element *client_list_element, *tmp;
4064 const struct lttng_notification notification = {
4065 .condition = (struct lttng_condition *) condition,
4066 .evaluation = (struct lttng_evaluation *) evaluation,
4067 };
4068 struct lttng_notification_channel_message msg_header = {
4069 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION,
4070 };
4071
4072 lttng_payload_init(&msg_payload);
4073
4074 ret = lttng_dynamic_buffer_append(&msg_payload.buffer, &msg_header,
4075 sizeof(msg_header));
4076 if (ret) {
4077 goto end;
4078 }
4079
4080 ret = lttng_notification_serialize(&notification, &msg_payload);
4081 if (ret) {
4082 ERR("[notification-thread] Failed to serialize notification");
4083 ret = -1;
4084 goto end;
4085 }
4086
4087 /* Update payload size. */
4088 ((struct lttng_notification_channel_message *) msg_payload.buffer.data)
4089 ->size = (uint32_t)(
4090 msg_payload.buffer.size - sizeof(msg_header));
4091
4092 /* Update the payload number of fds. */
4093 {
4094 const struct lttng_payload_view pv = lttng_payload_view_from_payload(
4095 &msg_payload, 0, -1);
4096
4097 ((struct lttng_notification_channel_message *)
4098 msg_payload.buffer.data)->fds = (uint32_t)
4099 lttng_payload_view_get_fd_handle_count(&pv);
4100 }
4101
4102 pthread_mutex_lock(&client_list->lock);
4103 cds_list_for_each_entry_safe(client_list_element, tmp,
4104 &client_list->list, node) {
4105 enum client_transmission_status transmission_status;
4106 struct notification_client *client =
4107 client_list_element->client;
4108
4109 ret = 0;
4110 pthread_mutex_lock(&client->lock);
4111 if (!client->communication.active) {
4112 /*
4113 * Skip inactive client (protocol error or
4114 * disconnecting).
4115 */
4116 DBG("Skipping client at it is marked as inactive");
4117 goto skip_client;
4118 }
4119
4120 if (source_object_creds) {
4121 if (client->uid != lttng_credentials_get_uid(source_object_creds) &&
4122 client->gid != lttng_credentials_get_gid(source_object_creds) &&
4123 client->uid != 0) {
4124 /*
4125 * Client is not allowed to monitor this
4126 * object.
4127 */
4128 DBG("[notification-thread] Skipping client at it does not have the object permission to receive notification for this trigger");
4129 goto skip_client;
4130 }
4131 }
4132
4133 if (client->uid != lttng_credentials_get_uid(trigger_creds) && client->gid != lttng_credentials_get_gid(trigger_creds)) {
4134 DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this trigger");
4135 goto skip_client;
4136 }
4137
4138 DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
4139 client->socket, msg_payload.buffer.size);
4140
4141 if (client_has_outbound_data_left(client)) {
4142 /*
4143 * Outgoing data is already buffered for this client;
4144 * drop the notification and enqueue a "dropped
4145 * notification" message if this is the first dropped
4146 * notification since the socket spilled-over to the
4147 * queue.
4148 */
4149 ret = client_notification_overflow(client);
4150 if (ret) {
4151 /* Fatal error. */
4152 goto skip_client;
4153 }
4154 }
4155
4156 ret = lttng_payload_copy(&msg_payload, &client->communication.outbound.payload);
4157 if (ret) {
4158 /* Fatal error. */
4159 goto skip_client;
4160 }
4161
4162 transmission_status = client_flush_outgoing_queue(client);
4163 pthread_mutex_unlock(&client->lock);
4164 ret = client_report(client, transmission_status, user_data);
4165 if (ret) {
4166 /* Fatal error. */
4167 goto end_unlock_list;
4168 }
4169
4170 continue;
4171
4172 skip_client:
4173 pthread_mutex_unlock(&client->lock);
4174 if (ret) {
4175 /* Fatal error. */
4176 goto end_unlock_list;
4177 }
4178 }
4179 ret = 0;
4180
4181 end_unlock_list:
4182 pthread_mutex_unlock(&client_list->lock);
4183 end:
4184 lttng_payload_reset(&msg_payload);
4185 return ret;
4186 }
4187
4188 static struct lttng_trigger_notification *receive_notification(int pipe,
4189 enum lttng_domain_type domain)
4190 {
4191 int ret;
4192 uint64_t id;
4193 struct lttng_trigger_notification *notification = NULL;
4194 char *capture_buffer = NULL;
4195 size_t capture_buffer_size;
4196 void *reception_buffer;
4197 size_t reception_size;
4198
4199 struct lttng_ust_trigger_notification ust_notification;
4200 struct lttng_kernel_trigger_notification kernel_notification;
4201
4202 /* Init lttng_trigger_notification */
4203
4204 switch(domain) {
4205 case LTTNG_DOMAIN_UST:
4206 reception_buffer = (void *) &ust_notification;
4207 reception_size = sizeof(ust_notification);
4208 break;
4209 case LTTNG_DOMAIN_KERNEL:
4210 reception_buffer = (void *) &kernel_notification;
4211 reception_size = sizeof(kernel_notification);
4212 break;
4213 default:
4214 assert(0);
4215 }
4216
4217 /*
4218 * The monitoring pipe only holds messages smaller than PIPE_BUF,
4219 * ensuring that read/write of sampling messages are atomic.
4220 */
4221 /* TODO: should we read as much as we can ? EWOULDBLOCK? */
4222
4223 ret = lttng_read(pipe, reception_buffer, reception_size);
4224 if (ret != reception_size) {
4225 PERROR("Failed to read from event source pipe (fd = %i, size to read=%zu, ret=%d)",
4226 pipe, reception_size, ret);
4227 /* TODO: Should this error out completly.
4228 * This can happen when an app is killed as of today
4229 * ret = -1 cause the whole thread to die and fuck up
4230 * everything.
4231 */
4232 goto end;
4233 }
4234
4235 switch(domain) {
4236 case LTTNG_DOMAIN_UST:
4237 id = ust_notification.id;
4238 capture_buffer_size =
4239 ust_notification.capture_buf_size;
4240 break;
4241 case LTTNG_DOMAIN_KERNEL:
4242 id = kernel_notification.id;
4243 capture_buffer_size =
4244 kernel_notification.capture_buf_size;
4245 break;
4246 default:
4247 assert(0);
4248 }
4249
4250 if (capture_buffer_size == 0) {
4251 capture_buffer = NULL;
4252 goto skip_capture;
4253 }
4254
4255 capture_buffer = zmalloc(capture_buffer_size);
4256 if (!capture_buffer) {
4257 ERR("[notification-thread] Failed to allocate capture buffer");
4258 goto end;
4259 }
4260
4261 /*
4262 * Fetch additional payload (capture).
4263 */
4264 ret = lttng_read(pipe, capture_buffer, capture_buffer_size);
4265 if (ret != capture_buffer_size) {
4266 ERR("[notification-thread] Failed to read from event source pipe (fd = %i)",
4267 pipe);
4268 /* TODO: Should this error out completly.
4269 * This can happen when an app is killed as of today
4270 * ret = -1 cause the whole thread to die and fuck up
4271 * everything.
4272 */
4273 goto end;
4274 }
4275
4276 skip_capture:
4277 notification = lttng_trigger_notification_create(
4278 id, domain, capture_buffer, capture_buffer_size);
4279 if (notification == NULL) {
4280 goto end;
4281 }
4282
4283 /* Ownership transfered to the lttng_trigger_notification object */
4284 capture_buffer = NULL;
4285
4286 end:
4287 free(capture_buffer);
4288 return notification;
4289 }
4290
4291 int handle_notification_thread_event(struct notification_thread_state *state,
4292 int pipe,
4293 enum lttng_domain_type domain)
4294 {
4295 int ret;
4296 enum lttng_trigger_status trigger_status;
4297 struct cds_lfht_node *node;
4298 struct cds_lfht_iter iter;
4299 struct notification_trigger_tokens_ht_element *element;
4300 struct lttng_evaluation *evaluation = NULL;
4301 struct lttng_trigger_notification *notification = NULL;
4302 enum action_executor_status executor_status;
4303 struct notification_client_list *client_list = NULL;
4304 const char *trigger_name;
4305 unsigned int capture_count = 0;
4306
4307 notification = receive_notification(pipe, domain);
4308 if (notification == NULL) {
4309 ERR("[notification-thread] Error receiving notification from tracer (fd = %i, domain = %s)",
4310 pipe, lttng_domain_type_str(domain));
4311 ret = -1;
4312 goto end;
4313 }
4314
4315 /* Find triggers associated with this token. */
4316 rcu_read_lock();
4317 cds_lfht_lookup(state->trigger_tokens_ht,
4318 hash_key_u64(&notification->id, lttng_ht_seed),
4319 match_trigger_token, &notification->id, &iter);
4320 node = cds_lfht_iter_get_node(&iter);
4321 if (caa_unlikely(!node)) {
4322 /* TODO: is this an error? This might happen if the receive side
4323 * is slow to process event from source and that the trigger was
4324 * removed but the app still kicking. This yield another
4325 * question on the trigger lifetime and when we can remove a
4326 * trigger. How to guarantee that all event with the token idea
4327 * have be processed? Do we want to provide this guarantee?
4328 *
4329 * Update: I have encountered this when using a trigger on
4330 * sched_switch and then removing it. The frequency is quite
4331 * high hence we en up exactly in the mentionned scenario.
4332 * AFAIK this might be the best way to handle this.
4333 */
4334 ret = 0;
4335 goto end_unlock;
4336 }
4337 element = caa_container_of(node,
4338 struct notification_trigger_tokens_ht_element,
4339 node);
4340
4341 if (!lttng_trigger_should_fire(element->trigger)) {
4342 ret = 0;
4343 goto end_unlock;
4344 }
4345
4346 lttng_trigger_fire(element->trigger);
4347
4348 trigger_status = lttng_trigger_get_name(element->trigger, &trigger_name);
4349 assert(trigger_status == LTTNG_TRIGGER_STATUS_OK);
4350
4351 if (LTTNG_CONDITION_STATUS_OK !=
4352 lttng_condition_event_rule_get_capture_descriptor_count(
4353 lttng_trigger_get_const_condition(
4354 element->trigger),
4355 &capture_count)) {
4356 ERR("Get capture count");
4357 ret = -1;
4358 goto end;
4359 }
4360
4361 if (!notification->capture_buffer && capture_count != 0) {
4362 ERR("Expected capture but capture buffer is null");
4363 ret = -1;
4364 goto end;
4365 }
4366
4367 evaluation = lttng_evaluation_event_rule_create(
4368 container_of(lttng_trigger_get_const_condition(
4369 element->trigger),
4370 struct lttng_condition_event_rule,
4371 parent),
4372 trigger_name,
4373 notification->capture_buffer,
4374 notification->capture_buf_size, false);
4375
4376 if (evaluation == NULL) {
4377 ERR("[notification-thread] Failed to create event rule hit evaluation");
4378 ret = -1;
4379 goto end_unlock;
4380 }
4381 client_list = get_client_list_from_condition(state,
4382 lttng_trigger_get_const_condition(element->trigger));
4383 executor_status = action_executor_enqueue(state->executor,
4384 element->trigger, evaluation, NULL, client_list);
4385 switch (executor_status) {
4386 case ACTION_EXECUTOR_STATUS_OK:
4387 ret = 0;
4388 break;
4389 case ACTION_EXECUTOR_STATUS_OVERFLOW:
4390 {
4391 struct notification_client_list_element *client_list_element,
4392 *tmp;
4393
4394 /*
4395 * Not a fatal error; this is expected and simply means the
4396 * executor has too much work queued already.
4397 */
4398 ret = 0;
4399
4400 if (!client_list) {
4401 break;
4402 }
4403
4404 /* Warn clients that a notification (or more) was dropped. */
4405 pthread_mutex_lock(&client_list->lock);
4406 cds_list_for_each_entry_safe(client_list_element, tmp,
4407 &client_list->list, node) {
4408 enum client_transmission_status transmission_status;
4409 struct notification_client *client =
4410 client_list_element->client;
4411
4412 pthread_mutex_lock(&client->lock);
4413 ret = client_notification_overflow(client);
4414 if (ret) {
4415 /* Fatal error. */
4416 goto next_client;
4417 }
4418
4419 transmission_status =
4420 client_flush_outgoing_queue(client);
4421 ret = client_handle_transmission_status(
4422 client, transmission_status, state);
4423 if (ret) {
4424 /* Fatal error. */
4425 goto next_client;
4426 }
4427 next_client:
4428 pthread_mutex_unlock(&client->lock);
4429 if (ret) {
4430 break;
4431 }
4432 }
4433 pthread_mutex_unlock(&client_list->lock);
4434 break;
4435 }
4436 case ACTION_EXECUTOR_STATUS_INVALID:
4437 case ACTION_EXECUTOR_STATUS_ERROR:
4438 /* Fatal error, shut down everything. */
4439 ERR("Fatal error encoutered while enqueuing action");
4440 ret = -1;
4441 goto end_unlock;
4442 default:
4443 /* Unhandled error. */
4444 abort();
4445 }
4446
4447 end_unlock:
4448 lttng_trigger_notification_destroy(notification);
4449 notification_client_list_put(client_list);
4450 rcu_read_unlock();
4451 end:
4452 return ret;
4453 }
4454
4455 int handle_notification_thread_channel_sample(
4456 struct notification_thread_state *state, int pipe,
4457 enum lttng_domain_type domain)
4458 {
4459 int ret = 0;
4460 struct lttcomm_consumer_channel_monitor_msg sample_msg;
4461 struct channel_info *channel_info;
4462 struct cds_lfht_node *node;
4463 struct cds_lfht_iter iter;
4464 struct lttng_channel_trigger_list *trigger_list;
4465 struct lttng_trigger_list_element *trigger_list_element;
4466 bool previous_sample_available = false;
4467 struct channel_state_sample previous_sample, latest_sample;
4468 uint64_t previous_session_consumed_total, latest_session_consumed_total;
4469 struct lttng_credentials channel_creds;
4470
4471 /*
4472 * The monitoring pipe only holds messages smaller than PIPE_BUF,
4473 * ensuring that read/write of sampling messages are atomic.
4474 */
4475 ret = lttng_read(pipe, &sample_msg, sizeof(sample_msg));
4476 if (ret != sizeof(sample_msg)) {
4477 ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
4478 pipe);
4479 ret = -1;
4480 goto end;
4481 }
4482
4483 ret = 0;
4484 latest_sample.key.key = sample_msg.key;
4485 latest_sample.key.domain = domain;
4486 latest_sample.highest_usage = sample_msg.highest;
4487 latest_sample.lowest_usage = sample_msg.lowest;
4488 latest_sample.channel_total_consumed = sample_msg.total_consumed;
4489
4490 rcu_read_lock();
4491
4492 /* Retrieve the channel's informations */
4493 cds_lfht_lookup(state->channels_ht,
4494 hash_channel_key(&latest_sample.key),
4495 match_channel_info,
4496 &latest_sample.key,
4497 &iter);
4498 node = cds_lfht_iter_get_node(&iter);
4499 if (caa_unlikely(!node)) {
4500 /*
4501 * Not an error since the consumer can push a sample to the pipe
4502 * and the rest of the session daemon could notify us of the
4503 * channel's destruction before we get a chance to process that
4504 * sample.
4505 */
4506 DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
4507 latest_sample.key.key,
4508 lttng_domain_type_str(domain));
4509 goto end_unlock;
4510 }
4511 channel_info = caa_container_of(node, struct channel_info,
4512 channels_ht_node);
4513 DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64", total consumed = %" PRIu64")",
4514 channel_info->name,
4515 latest_sample.key.key,
4516 channel_info->session_info->name,
4517 latest_sample.highest_usage,
4518 latest_sample.lowest_usage,
4519 latest_sample.channel_total_consumed);
4520
4521 previous_session_consumed_total =
4522 channel_info->session_info->consumed_data_size;
4523
4524 /* Retrieve the channel's last sample, if it exists, and update it. */
4525 cds_lfht_lookup(state->channel_state_ht,
4526 hash_channel_key(&latest_sample.key),
4527 match_channel_state_sample,
4528 &latest_sample.key,
4529 &iter);
4530 node = cds_lfht_iter_get_node(&iter);
4531 if (caa_likely(node)) {
4532 struct channel_state_sample *stored_sample;
4533
4534 /* Update the sample stored. */
4535 stored_sample = caa_container_of(node,
4536 struct channel_state_sample,
4537 channel_state_ht_node);
4538
4539 memcpy(&previous_sample, stored_sample,
4540 sizeof(previous_sample));
4541 stored_sample->highest_usage = latest_sample.highest_usage;
4542 stored_sample->lowest_usage = latest_sample.lowest_usage;
4543 stored_sample->channel_total_consumed = latest_sample.channel_total_consumed;
4544 previous_sample_available = true;
4545
4546 latest_session_consumed_total =
4547 previous_session_consumed_total +
4548 (latest_sample.channel_total_consumed - previous_sample.channel_total_consumed);
4549 } else {
4550 /*
4551 * This is the channel's first sample, allocate space for and
4552 * store the new sample.
4553 */
4554 struct channel_state_sample *stored_sample;
4555
4556 stored_sample = zmalloc(sizeof(*stored_sample));
4557 if (!stored_sample) {
4558 ret = -1;
4559 goto end_unlock;
4560 }
4561
4562 memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
4563 cds_lfht_node_init(&stored_sample->channel_state_ht_node);
4564 cds_lfht_add(state->channel_state_ht,
4565 hash_channel_key(&stored_sample->key),
4566 &stored_sample->channel_state_ht_node);
4567
4568 latest_session_consumed_total =
4569 previous_session_consumed_total +
4570 latest_sample.channel_total_consumed;
4571 }
4572
4573 channel_info->session_info->consumed_data_size =
4574 latest_session_consumed_total;
4575
4576 /* Find triggers associated with this channel. */
4577 cds_lfht_lookup(state->channel_triggers_ht,
4578 hash_channel_key(&latest_sample.key),
4579 match_channel_trigger_list,
4580 &latest_sample.key,
4581 &iter);
4582 node = cds_lfht_iter_get_node(&iter);
4583 if (caa_likely(!node)) {
4584 goto end_unlock;
4585 }
4586
4587 channel_creds = (typeof(channel_creds)) {
4588 .uid = LTTNG_OPTIONAL_INIT_VALUE(channel_info->session_info->uid),
4589 .gid = LTTNG_OPTIONAL_INIT_VALUE(channel_info->session_info->gid),
4590 };
4591
4592 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
4593 channel_triggers_ht_node);
4594 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
4595 node) {
4596 const struct lttng_condition *condition;
4597 const struct lttng_action *action;
4598 struct lttng_trigger *trigger;
4599 struct notification_client_list *client_list = NULL;
4600 struct lttng_evaluation *evaluation = NULL;
4601 enum action_executor_status executor_status;
4602
4603 ret = 0;
4604 trigger = trigger_list_element->trigger;
4605 condition = lttng_trigger_get_const_condition(trigger);
4606 assert(condition);
4607 action = lttng_trigger_get_const_action(trigger);
4608
4609 if (!lttng_trigger_should_fire(trigger)) {
4610 goto put_list;
4611 }
4612
4613 lttng_trigger_fire(trigger);
4614
4615 /* Notify actions are the only type currently supported. */
4616 /* TODO support other type of action */
4617 assert(lttng_action_get_type(action) ==
4618 LTTNG_ACTION_TYPE_NOTIFY);
4619
4620 /*
4621 * Check if any client is subscribed to the result of this
4622 * evaluation.
4623 */
4624 client_list = get_client_list_from_condition(state, condition);
4625
4626 ret = evaluate_buffer_condition(condition, &evaluation, state,
4627 previous_sample_available ? &previous_sample : NULL,
4628 &latest_sample,
4629 previous_session_consumed_total,
4630 latest_session_consumed_total,
4631 channel_info);
4632 if (caa_unlikely(ret)) {
4633 goto put_list;
4634 }
4635
4636 if (caa_likely(!evaluation)) {
4637 goto put_list;
4638 }
4639
4640 /*
4641 * Ownership of `evaluation` transferred to the action executor
4642 * no matter the result.
4643 */
4644 executor_status = action_executor_enqueue(state->executor,
4645 trigger, evaluation, &channel_creds,
4646 client_list);
4647 evaluation = NULL;
4648 switch (executor_status) {
4649 case ACTION_EXECUTOR_STATUS_OK:
4650 break;
4651 case ACTION_EXECUTOR_STATUS_ERROR:
4652 case ACTION_EXECUTOR_STATUS_INVALID:
4653 /*
4654 * TODO Add trigger identification (name/id) when
4655 * it is added to the API.
4656 */
4657 ERR("Fatal error occurred while enqueuing action associated with buffer-condition trigger");
4658 ret = -1;
4659 goto put_list;
4660 case ACTION_EXECUTOR_STATUS_OVERFLOW:
4661 /*
4662 * TODO Add trigger identification (name/id) when
4663 * it is added to the API.
4664 *
4665 * Not a fatal error.
4666 */
4667 WARN("No space left when enqueuing action associated with buffer-condition trigger");
4668 ret = 0;
4669 goto put_list;
4670 default:
4671 abort();
4672 }
4673
4674 put_list:
4675 notification_client_list_put(client_list);
4676 if (caa_unlikely(ret)) {
4677 break;
4678 }
4679 }
4680 end_unlock:
4681 rcu_read_unlock();
4682 end:
4683 return ret;
4684 }
This page took 0.172207 seconds and 5 git commands to generate.