Build a list of triggers applying to a given session on creation
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.c
1 /*
2 * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18 #define _LGPL_SOURCE
19 #include <urcu.h>
20 #include <urcu/rculfhash.h>
21
22 #include <common/defaults.h>
23 #include <common/error.h>
24 #include <common/futex.h>
25 #include <common/unix.h>
26 #include <common/dynamic-buffer.h>
27 #include <common/hashtable/utils.h>
28 #include <common/sessiond-comm/sessiond-comm.h>
29 #include <common/macros.h>
30 #include <lttng/condition/condition.h>
31 #include <lttng/action/action-internal.h>
32 #include <lttng/notification/notification-internal.h>
33 #include <lttng/condition/condition-internal.h>
34 #include <lttng/condition/buffer-usage-internal.h>
35 #include <lttng/condition/session-consumed-size-internal.h>
36 #include <lttng/condition/session-rotation-internal.h>
37 #include <lttng/notification/channel-internal.h>
38
39 #include <time.h>
40 #include <unistd.h>
41 #include <assert.h>
42 #include <inttypes.h>
43 #include <fcntl.h>
44
45 #include "notification-thread.h"
46 #include "notification-thread-events.h"
47 #include "notification-thread-commands.h"
48 #include "lttng-sessiond.h"
49 #include "kernel.h"
50
/* Poll event mask: input readiness plus error and hang-up events. */
#define CLIENT_POLL_MASK_IN \
	(LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
/* Same events as CLIENT_POLL_MASK_IN, with output readiness added. */
#define CLIENT_POLL_MASK_IN_OUT \
	(CLIENT_POLL_MASK_IN | LPOLLOUT)
53
54 struct lttng_trigger_list_element {
55 /* No ownership of the trigger object is assumed. */
56 const struct lttng_trigger *trigger;
57 struct cds_list_head node;
58 };
59
60 struct lttng_channel_trigger_list {
61 struct channel_key channel_key;
62 /* List of struct lttng_trigger_list_element. */
63 struct cds_list_head list;
64 /* Node in the channel_triggers_ht */
65 struct cds_lfht_node channel_triggers_ht_node;
66 };
67
68 /*
69 * List of triggers applying to a given session.
70 *
71 * See:
72 * - lttng_session_trigger_list_create()
73 * - lttng_session_trigger_list_build()
74 * - lttng_session_trigger_list_destroy()
75 * - lttng_session_trigger_list_add()
76 */
77 struct lttng_session_trigger_list {
78 /*
79 * Not owned by this; points to the session_info structure's
80 * session name.
81 */
82 const char *session_name;
83 /* List of struct lttng_trigger_list_element. */
84 struct cds_list_head list;
85 /* Node in the session_triggers_ht */
86 struct cds_lfht_node session_triggers_ht_node;
87 /*
88 * Weak reference to the notification system's session triggers
89 * hashtable.
90 *
91 * The session trigger list structure structure is owned by
92 * the session's session_info.
93 *
94 * The session_info is kept alive the the channel_infos holding a
95 * reference to it (reference counting). When those channels are
96 * destroyed (at runtime or on teardown), the reference they hold
97 * to the session_info are released. On destruction of session_info,
98 * session_info_destroy() will remove the list of triggers applying
99 * to this session from the notification system's state.
100 *
101 * This implies that the session_triggers_ht must be destroyed
102 * after the channels.
103 */
104 struct cds_lfht *session_triggers_ht;
105 /* Used for delayed RCU reclaim. */
106 struct rcu_head rcu_node;
107 };
108
109 struct lttng_trigger_ht_element {
110 struct lttng_trigger *trigger;
111 struct cds_lfht_node node;
112 };
113
114 struct lttng_condition_list_element {
115 struct lttng_condition *condition;
116 struct cds_list_head node;
117 };
118
119 struct notification_client_list_element {
120 struct notification_client *client;
121 struct cds_list_head node;
122 };
123
124 struct notification_client_list {
125 struct lttng_trigger *trigger;
126 struct cds_list_head list;
127 struct cds_lfht_node notification_trigger_ht_node;
128 };
129
130 struct notification_client {
131 int socket;
132 /* Client protocol version. */
133 uint8_t major, minor;
134 uid_t uid;
135 gid_t gid;
136 /*
137 * Indicates if the credentials and versions of the client have been
138 * checked.
139 */
140 bool validated;
141 /*
142 * Conditions to which the client's notification channel is subscribed.
143 * List of struct lttng_condition_list_node. The condition member is
144 * owned by the client.
145 */
146 struct cds_list_head condition_list;
147 struct cds_lfht_node client_socket_ht_node;
148 struct {
149 struct {
150 /*
151 * During the reception of a message, the reception
152 * buffers' "size" is set to contain the current
153 * message's complete payload.
154 */
155 struct lttng_dynamic_buffer buffer;
156 /* Bytes left to receive for the current message. */
157 size_t bytes_to_receive;
158 /* Type of the message being received. */
159 enum lttng_notification_channel_message_type msg_type;
160 /*
161 * Indicates whether or not credentials are expected
162 * from the client.
163 */
164 bool expect_creds;
165 /*
166 * Indicates whether or not credentials were received
167 * from the client.
168 */
169 bool creds_received;
170 /* Only used during credentials reception. */
171 lttng_sock_cred creds;
172 } inbound;
173 struct {
174 /*
175 * Indicates whether or not a notification addressed to
176 * this client was dropped because a command reply was
177 * already buffered.
178 *
179 * A notification is dropped whenever the buffer is not
180 * empty.
181 */
182 bool dropped_notification;
183 /*
184 * Indicates whether or not a command reply is already
185 * buffered. In this case, it means that the client is
186 * not consuming command replies before emitting a new
187 * one. This could be caused by a protocol error or a
188 * misbehaving/malicious client.
189 */
190 bool queued_command_reply;
191 struct lttng_dynamic_buffer buffer;
192 } outbound;
193 } communication;
194 };
195
196 struct channel_state_sample {
197 struct channel_key key;
198 struct cds_lfht_node channel_state_ht_node;
199 uint64_t highest_usage;
200 uint64_t lowest_usage;
201 uint64_t channel_total_consumed;
202 };
203
/* Forward declarations. */
static unsigned long hash_channel_key(struct channel_key *key);

static int evaluate_condition(
		const struct lttng_condition *condition,
		struct lttng_evaluation **evaluation,
		const struct notification_thread_state *state,
		const struct channel_state_sample *previous_sample,
		const struct channel_state_sample *latest_sample,
		uint64_t previous_session_consumed_total,
		uint64_t latest_session_consumed_total,
		struct channel_info *channel_info);

/* The evaluation is taken as const: it is not owned by the callee. */
static int send_evaluation_to_clients(
		const struct lttng_trigger *trigger,
		const struct lttng_evaluation *evaluation,
		struct notification_client_list *client_list,
		struct notification_thread_state *state,
		uid_t channel_uid, gid_t channel_gid);
219
220
/*
 * session_info API.
 *
 * A session_info is reference-counted: session_info_destroy() is the
 * release callback and must not be called directly.
 */
static void session_info_destroy(void *_data);
static void session_info_get(struct session_info *session_info);
static void session_info_put(struct session_info *session_info);
static struct session_info *session_info_create(const char *name,
		uid_t uid, gid_t gid,
		struct lttng_session_trigger_list *trigger_list);
static void session_info_add_channel(struct session_info *session_info,
		struct channel_info *channel_info);
static void session_info_remove_channel(struct session_info *session_info,
		struct channel_info *channel_info);
238
/* lttng_session_trigger_list API. */
static struct lttng_session_trigger_list *lttng_session_trigger_list_create(
		const char *session_name,
		struct cds_lfht *session_triggers_ht);
static struct lttng_session_trigger_list *lttng_session_trigger_list_build(
		const struct notification_thread_state *state,
		const char *session_name);
static void lttng_session_trigger_list_destroy(
		struct lttng_session_trigger_list *list);
static int lttng_session_trigger_list_add(
		struct lttng_session_trigger_list *list,
		const struct lttng_trigger *trigger);
254
255
256 static
257 int match_client(struct cds_lfht_node *node, const void *key)
258 {
259 /* This double-cast is intended to supress pointer-to-cast warning. */
260 int socket = (int) (intptr_t) key;
261 struct notification_client *client;
262
263 client = caa_container_of(node, struct notification_client,
264 client_socket_ht_node);
265
266 return !!(client->socket == socket);
267 }
268
269 static
270 int match_channel_trigger_list(struct cds_lfht_node *node, const void *key)
271 {
272 struct channel_key *channel_key = (struct channel_key *) key;
273 struct lttng_channel_trigger_list *trigger_list;
274
275 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
276 channel_triggers_ht_node);
277
278 return !!((channel_key->key == trigger_list->channel_key.key) &&
279 (channel_key->domain == trigger_list->channel_key.domain));
280 }
281
282 static
283 int match_channel_state_sample(struct cds_lfht_node *node, const void *key)
284 {
285 struct channel_key *channel_key = (struct channel_key *) key;
286 struct channel_state_sample *sample;
287
288 sample = caa_container_of(node, struct channel_state_sample,
289 channel_state_ht_node);
290
291 return !!((channel_key->key == sample->key.key) &&
292 (channel_key->domain == sample->key.domain));
293 }
294
295 static
296 int match_channel_info(struct cds_lfht_node *node, const void *key)
297 {
298 struct channel_key *channel_key = (struct channel_key *) key;
299 struct channel_info *channel_info;
300
301 channel_info = caa_container_of(node, struct channel_info,
302 channels_ht_node);
303
304 return !!((channel_key->key == channel_info->key.key) &&
305 (channel_key->domain == channel_info->key.domain));
306 }
307
308 static
309 int match_condition(struct cds_lfht_node *node, const void *key)
310 {
311 struct lttng_condition *condition_key = (struct lttng_condition *) key;
312 struct lttng_trigger_ht_element *trigger;
313 struct lttng_condition *condition;
314
315 trigger = caa_container_of(node, struct lttng_trigger_ht_element,
316 node);
317 condition = lttng_trigger_get_condition(trigger->trigger);
318 assert(condition);
319
320 return !!lttng_condition_is_equal(condition_key, condition);
321 }
322
323 static
324 int match_client_list(struct cds_lfht_node *node, const void *key)
325 {
326 struct lttng_trigger *trigger_key = (struct lttng_trigger *) key;
327 struct notification_client_list *client_list;
328 struct lttng_condition *condition;
329 struct lttng_condition *condition_key = lttng_trigger_get_condition(
330 trigger_key);
331
332 assert(condition_key);
333
334 client_list = caa_container_of(node, struct notification_client_list,
335 notification_trigger_ht_node);
336 condition = lttng_trigger_get_condition(client_list->trigger);
337
338 return !!lttng_condition_is_equal(condition_key, condition);
339 }
340
341 static
342 int match_client_list_condition(struct cds_lfht_node *node, const void *key)
343 {
344 struct lttng_condition *condition_key = (struct lttng_condition *) key;
345 struct notification_client_list *client_list;
346 struct lttng_condition *condition;
347
348 assert(condition_key);
349
350 client_list = caa_container_of(node, struct notification_client_list,
351 notification_trigger_ht_node);
352 condition = lttng_trigger_get_condition(client_list->trigger);
353
354 return !!lttng_condition_is_equal(condition_key, condition);
355 }
356
357 static
358 unsigned long lttng_condition_buffer_usage_hash(
359 const struct lttng_condition *_condition)
360 {
361 unsigned long hash;
362 unsigned long condition_type;
363 struct lttng_condition_buffer_usage *condition;
364
365 condition = container_of(_condition,
366 struct lttng_condition_buffer_usage, parent);
367
368 condition_type = (unsigned long) condition->parent.type;
369 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
370 if (condition->session_name) {
371 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
372 }
373 if (condition->channel_name) {
374 hash ^= hash_key_str(condition->channel_name, lttng_ht_seed);
375 }
376 if (condition->domain.set) {
377 hash ^= hash_key_ulong(
378 (void *) condition->domain.type,
379 lttng_ht_seed);
380 }
381 if (condition->threshold_ratio.set) {
382 uint64_t val;
383
384 val = condition->threshold_ratio.value * (double) UINT32_MAX;
385 hash ^= hash_key_u64(&val, lttng_ht_seed);
386 } else if (condition->threshold_bytes.set) {
387 uint64_t val;
388
389 val = condition->threshold_bytes.value;
390 hash ^= hash_key_u64(&val, lttng_ht_seed);
391 }
392 return hash;
393 }
394
395 static
396 unsigned long lttng_condition_session_consumed_size_hash(
397 const struct lttng_condition *_condition)
398 {
399 unsigned long hash;
400 unsigned long condition_type =
401 (unsigned long) LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE;
402 struct lttng_condition_session_consumed_size *condition;
403 uint64_t val;
404
405 condition = container_of(_condition,
406 struct lttng_condition_session_consumed_size, parent);
407
408 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
409 if (condition->session_name) {
410 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
411 }
412 val = condition->consumed_threshold_bytes.value;
413 hash ^= hash_key_u64(&val, lttng_ht_seed);
414 return hash;
415 }
416
417
418 /*
419 * The lttng_condition hashing code is kept in this file (rather than
420 * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
421 * don't want to link in liblttng-ctl.
422 */
423 static
424 unsigned long lttng_condition_hash(const struct lttng_condition *condition)
425 {
426 switch (condition->type) {
427 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
428 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
429 return lttng_condition_buffer_usage_hash(condition);
430 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
431 return lttng_condition_session_consumed_size_hash(condition);
432 default:
433 ERR("[notification-thread] Unexpected condition type caught");
434 abort();
435 }
436 }
437
438 static
439 unsigned long hash_channel_key(struct channel_key *key)
440 {
441 unsigned long key_hash = hash_key_u64(&key->key, lttng_ht_seed);
442 unsigned long domain_hash = hash_key_ulong(
443 (void *) (unsigned long) key->domain, lttng_ht_seed);
444
445 return key_hash ^ domain_hash;
446 }
447
448 static
449 void channel_info_destroy(struct channel_info *channel_info)
450 {
451 if (!channel_info) {
452 return;
453 }
454
455 if (channel_info->session_info) {
456 session_info_remove_channel(channel_info->session_info,
457 channel_info);
458 session_info_put(channel_info->session_info);
459 }
460 if (channel_info->name) {
461 free(channel_info->name);
462 }
463 free(channel_info);
464 }
465
466 /* Don't call directly, use the ref-counting mechanism. */
467 static
468 void session_info_destroy(void *_data)
469 {
470 struct session_info *session_info = _data;
471 int ret;
472
473 assert(session_info);
474 if (session_info->channel_infos_ht) {
475 ret = cds_lfht_destroy(session_info->channel_infos_ht, NULL);
476 if (ret) {
477 ERR("[notification-thread] Failed to destroy channel information hash table");
478 }
479 }
480 lttng_session_trigger_list_destroy(session_info->trigger_list);
481 free(session_info->name);
482 free(session_info);
483 }
484
485 static
486 void session_info_get(struct session_info *session_info)
487 {
488 if (!session_info) {
489 return;
490 }
491 lttng_ref_get(&session_info->ref);
492 }
493
494 static
495 void session_info_put(struct session_info *session_info)
496 {
497 if (!session_info) {
498 return;
499 }
500 lttng_ref_put(&session_info->ref);
501 }
502
503 static
504 struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid,
505 struct lttng_session_trigger_list *trigger_list)
506 {
507 struct session_info *session_info;
508
509 assert(name);
510
511 session_info = zmalloc(sizeof(*session_info));
512 if (!session_info) {
513 goto end;
514 }
515 lttng_ref_init(&session_info->ref, session_info_destroy);
516
517 session_info->channel_infos_ht = cds_lfht_new(DEFAULT_HT_SIZE,
518 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
519 if (!session_info->channel_infos_ht) {
520 goto error;
521 }
522
523 cds_lfht_node_init(&session_info->sessions_ht_node);
524 session_info->name = strdup(name);
525 if (!session_info->name) {
526 goto error;
527 }
528 session_info->uid = uid;
529 session_info->gid = gid;
530 session_info->trigger_list = trigger_list;
531 end:
532 return session_info;
533 error:
534 session_info_put(session_info);
535 return NULL;
536 }
537
538 static
539 void session_info_add_channel(struct session_info *session_info,
540 struct channel_info *channel_info)
541 {
542 rcu_read_lock();
543 cds_lfht_add(session_info->channel_infos_ht,
544 hash_channel_key(&channel_info->key),
545 &channel_info->session_info_channels_ht_node);
546 rcu_read_unlock();
547 }
548
549 static
550 void session_info_remove_channel(struct session_info *session_info,
551 struct channel_info *channel_info)
552 {
553 rcu_read_lock();
554 cds_lfht_del(session_info->channel_infos_ht,
555 &channel_info->session_info_channels_ht_node);
556 rcu_read_unlock();
557 }
558
559 static
560 struct channel_info *channel_info_create(const char *channel_name,
561 struct channel_key *channel_key, uint64_t channel_capacity,
562 struct session_info *session_info)
563 {
564 struct channel_info *channel_info = zmalloc(sizeof(*channel_info));
565
566 if (!channel_info) {
567 goto end;
568 }
569
570 cds_lfht_node_init(&channel_info->channels_ht_node);
571 cds_lfht_node_init(&channel_info->session_info_channels_ht_node);
572 memcpy(&channel_info->key, channel_key, sizeof(*channel_key));
573 channel_info->capacity = channel_capacity;
574
575 channel_info->name = strdup(channel_name);
576 if (!channel_info->name) {
577 goto error;
578 }
579
580 /*
581 * Set the references between session and channel infos:
582 * - channel_info holds a strong reference to session_info
583 * - session_info holds a weak reference to channel_info
584 */
585 session_info_get(session_info);
586 session_info_add_channel(session_info, channel_info);
587 channel_info->session_info = session_info;
588 end:
589 return channel_info;
590 error:
591 channel_info_destroy(channel_info);
592 return NULL;
593 }
594
595 /* This function must be called with the RCU read lock held. */
596 static
597 int evaluate_condition_for_client(struct lttng_trigger *trigger,
598 struct lttng_condition *condition,
599 struct notification_client *client,
600 struct notification_thread_state *state)
601 {
602 int ret;
603 struct cds_lfht_iter iter;
604 struct cds_lfht_node *node;
605 struct channel_info *channel_info = NULL;
606 struct channel_key *channel_key = NULL;
607 struct channel_state_sample *last_sample = NULL;
608 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
609 struct lttng_evaluation *evaluation = NULL;
610 struct notification_client_list client_list = { 0 };
611 struct notification_client_list_element client_list_element = { 0 };
612
613 assert(trigger);
614 assert(condition);
615 assert(client);
616 assert(state);
617
618 /* Find the channel associated with the trigger. */
619 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter,
620 channel_trigger_list , channel_triggers_ht_node) {
621 struct lttng_trigger_list_element *element;
622
623 cds_list_for_each_entry(element, &channel_trigger_list->list, node) {
624 const struct lttng_condition *current_condition =
625 lttng_trigger_get_const_condition(
626 element->trigger);
627
628 assert(current_condition);
629 if (!lttng_condition_is_equal(condition,
630 current_condition)) {
631 continue;
632 }
633
634 /* Found the trigger, save the channel key. */
635 channel_key = &channel_trigger_list->channel_key;
636 break;
637 }
638 if (channel_key) {
639 /* The channel key was found stop iteration. */
640 break;
641 }
642 }
643
644 if (!channel_key){
645 /* No channel found; normal exit. */
646 DBG("[notification-thread] No channel associated with newly subscribed-to condition");
647 ret = 0;
648 goto end;
649 }
650
651 /* Fetch channel info for the matching channel. */
652 cds_lfht_lookup(state->channels_ht,
653 hash_channel_key(channel_key),
654 match_channel_info,
655 channel_key,
656 &iter);
657 node = cds_lfht_iter_get_node(&iter);
658 assert(node);
659 channel_info = caa_container_of(node, struct channel_info,
660 channels_ht_node);
661
662 /* Retrieve the channel's last sample, if it exists. */
663 cds_lfht_lookup(state->channel_state_ht,
664 hash_channel_key(channel_key),
665 match_channel_state_sample,
666 channel_key,
667 &iter);
668 node = cds_lfht_iter_get_node(&iter);
669 if (node) {
670 last_sample = caa_container_of(node,
671 struct channel_state_sample,
672 channel_state_ht_node);
673 } else {
674 /* Nothing to evaluate, no sample was ever taken. Normal exit */
675 DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
676 ret = 0;
677 goto end;
678 }
679
680 ret = evaluate_condition(condition, &evaluation, state,
681 NULL, last_sample,
682 0, channel_info->session_info->consumed_data_size,
683 channel_info);
684 if (ret) {
685 WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
686 goto end;
687 }
688
689 if (!evaluation) {
690 /* Evaluation yielded nothing. Normal exit. */
691 DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
692 ret = 0;
693 goto end;
694 }
695
696 /*
697 * Create a temporary client list with the client currently
698 * subscribing.
699 */
700 cds_lfht_node_init(&client_list.notification_trigger_ht_node);
701 CDS_INIT_LIST_HEAD(&client_list.list);
702 client_list.trigger = trigger;
703
704 CDS_INIT_LIST_HEAD(&client_list_element.node);
705 client_list_element.client = client;
706 cds_list_add(&client_list_element.node, &client_list.list);
707
708 /* Send evaluation result to the newly-subscribed client. */
709 DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
710 ret = send_evaluation_to_clients(trigger, evaluation, &client_list,
711 state, channel_info->session_info->uid,
712 channel_info->session_info->gid);
713
714 end:
715 return ret;
716 }
717
718 static
719 int notification_thread_client_subscribe(struct notification_client *client,
720 struct lttng_condition *condition,
721 struct notification_thread_state *state,
722 enum lttng_notification_channel_status *_status)
723 {
724 int ret = 0;
725 struct cds_lfht_iter iter;
726 struct cds_lfht_node *node;
727 struct notification_client_list *client_list;
728 struct lttng_condition_list_element *condition_list_element = NULL;
729 struct notification_client_list_element *client_list_element = NULL;
730 enum lttng_notification_channel_status status =
731 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
732
733 /*
734 * Ensure that the client has not already subscribed to this condition
735 * before.
736 */
737 cds_list_for_each_entry(condition_list_element, &client->condition_list, node) {
738 if (lttng_condition_is_equal(condition_list_element->condition,
739 condition)) {
740 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED;
741 goto end;
742 }
743 }
744
745 condition_list_element = zmalloc(sizeof(*condition_list_element));
746 if (!condition_list_element) {
747 ret = -1;
748 goto error;
749 }
750 client_list_element = zmalloc(sizeof(*client_list_element));
751 if (!client_list_element) {
752 ret = -1;
753 goto error;
754 }
755
756 rcu_read_lock();
757
758 /*
759 * Add the newly-subscribed condition to the client's subscription list.
760 */
761 CDS_INIT_LIST_HEAD(&condition_list_element->node);
762 condition_list_element->condition = condition;
763 cds_list_add(&condition_list_element->node, &client->condition_list);
764
765 cds_lfht_lookup(state->notification_trigger_clients_ht,
766 lttng_condition_hash(condition),
767 match_client_list_condition,
768 condition,
769 &iter);
770 node = cds_lfht_iter_get_node(&iter);
771 if (!node) {
772 /*
773 * No notification-emiting trigger registered with this
774 * condition. We don't evaluate the condition right away
775 * since this trigger is not registered yet.
776 */
777 free(client_list_element);
778 goto end_unlock;
779 }
780
781 client_list = caa_container_of(node, struct notification_client_list,
782 notification_trigger_ht_node);
783 /*
784 * The condition to which the client just subscribed is evaluated
785 * at this point so that conditions that are already TRUE result
786 * in a notification being sent out.
787 */
788 if (evaluate_condition_for_client(client_list->trigger, condition,
789 client, state)) {
790 WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
791 ret = -1;
792 free(client_list_element);
793 goto end_unlock;
794 }
795
796 /*
797 * Add the client to the list of clients interested in a given trigger
798 * if a "notification" trigger with a corresponding condition was
799 * added prior.
800 */
801 client_list_element->client = client;
802 CDS_INIT_LIST_HEAD(&client_list_element->node);
803 cds_list_add(&client_list_element->node, &client_list->list);
804 end_unlock:
805 rcu_read_unlock();
806 end:
807 if (_status) {
808 *_status = status;
809 }
810 return ret;
811 error:
812 free(condition_list_element);
813 free(client_list_element);
814 return ret;
815 }
816
817 static
818 int notification_thread_client_unsubscribe(
819 struct notification_client *client,
820 struct lttng_condition *condition,
821 struct notification_thread_state *state,
822 enum lttng_notification_channel_status *_status)
823 {
824 struct cds_lfht_iter iter;
825 struct cds_lfht_node *node;
826 struct notification_client_list *client_list;
827 struct lttng_condition_list_element *condition_list_element,
828 *condition_tmp;
829 struct notification_client_list_element *client_list_element,
830 *client_tmp;
831 bool condition_found = false;
832 enum lttng_notification_channel_status status =
833 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
834
835 /* Remove the condition from the client's condition list. */
836 cds_list_for_each_entry_safe(condition_list_element, condition_tmp,
837 &client->condition_list, node) {
838 if (!lttng_condition_is_equal(condition_list_element->condition,
839 condition)) {
840 continue;
841 }
842
843 cds_list_del(&condition_list_element->node);
844 /*
845 * The caller may be iterating on the client's conditions to
846 * tear down a client's connection. In this case, the condition
847 * will be destroyed at the end.
848 */
849 if (condition != condition_list_element->condition) {
850 lttng_condition_destroy(
851 condition_list_element->condition);
852 }
853 free(condition_list_element);
854 condition_found = true;
855 break;
856 }
857
858 if (!condition_found) {
859 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION;
860 goto end;
861 }
862
863 /*
864 * Remove the client from the list of clients interested the trigger
865 * matching the condition.
866 */
867 rcu_read_lock();
868 cds_lfht_lookup(state->notification_trigger_clients_ht,
869 lttng_condition_hash(condition),
870 match_client_list_condition,
871 condition,
872 &iter);
873 node = cds_lfht_iter_get_node(&iter);
874 if (!node) {
875 goto end_unlock;
876 }
877
878 client_list = caa_container_of(node, struct notification_client_list,
879 notification_trigger_ht_node);
880 cds_list_for_each_entry_safe(client_list_element, client_tmp,
881 &client_list->list, node) {
882 if (client_list_element->client->socket != client->socket) {
883 continue;
884 }
885 cds_list_del(&client_list_element->node);
886 free(client_list_element);
887 break;
888 }
889 end_unlock:
890 rcu_read_unlock();
891 end:
892 lttng_condition_destroy(condition);
893 if (_status) {
894 *_status = status;
895 }
896 return 0;
897 }
898
899 static
900 void notification_client_destroy(struct notification_client *client,
901 struct notification_thread_state *state)
902 {
903 struct lttng_condition_list_element *condition_list_element, *tmp;
904
905 if (!client) {
906 return;
907 }
908
909 /* Release all conditions to which the client was subscribed. */
910 cds_list_for_each_entry_safe(condition_list_element, tmp,
911 &client->condition_list, node) {
912 (void) notification_thread_client_unsubscribe(client,
913 condition_list_element->condition, state, NULL);
914 }
915
916 if (client->socket >= 0) {
917 (void) lttcomm_close_unix_sock(client->socket);
918 }
919 lttng_dynamic_buffer_reset(&client->communication.inbound.buffer);
920 lttng_dynamic_buffer_reset(&client->communication.outbound.buffer);
921 free(client);
922 }
923
924 /*
925 * Call with rcu_read_lock held (and hold for the lifetime of the returned
926 * client pointer).
927 */
928 static
929 struct notification_client *get_client_from_socket(int socket,
930 struct notification_thread_state *state)
931 {
932 struct cds_lfht_iter iter;
933 struct cds_lfht_node *node;
934 struct notification_client *client = NULL;
935
936 cds_lfht_lookup(state->client_socket_ht,
937 hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed),
938 match_client,
939 (void *) (unsigned long) socket,
940 &iter);
941 node = cds_lfht_iter_get_node(&iter);
942 if (!node) {
943 goto end;
944 }
945
946 client = caa_container_of(node, struct notification_client,
947 client_socket_ht_node);
948 end:
949 return client;
950 }
951
952 static
953 bool buffer_usage_condition_applies_to_channel(
954 struct lttng_condition *condition,
955 struct channel_info *channel_info)
956 {
957 enum lttng_condition_status status;
958 enum lttng_domain_type condition_domain;
959 const char *condition_session_name = NULL;
960 const char *condition_channel_name = NULL;
961
962 status = lttng_condition_buffer_usage_get_domain_type(condition,
963 &condition_domain);
964 assert(status == LTTNG_CONDITION_STATUS_OK);
965 if (channel_info->key.domain != condition_domain) {
966 goto fail;
967 }
968
969 status = lttng_condition_buffer_usage_get_session_name(
970 condition, &condition_session_name);
971 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
972
973 status = lttng_condition_buffer_usage_get_channel_name(
974 condition, &condition_channel_name);
975 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_channel_name);
976
977 if (strcmp(channel_info->session_info->name, condition_session_name)) {
978 goto fail;
979 }
980 if (strcmp(channel_info->name, condition_channel_name)) {
981 goto fail;
982 }
983
984 return true;
985 fail:
986 return false;
987 }
988
989 static
990 bool session_consumed_size_condition_applies_to_channel(
991 struct lttng_condition *condition,
992 struct channel_info *channel_info)
993 {
994 enum lttng_condition_status status;
995 const char *condition_session_name = NULL;
996
997 status = lttng_condition_session_consumed_size_get_session_name(
998 condition, &condition_session_name);
999 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1000
1001 if (strcmp(channel_info->session_info->name, condition_session_name)) {
1002 goto fail;
1003 }
1004
1005 return true;
1006 fail:
1007 return false;
1008 }
1009
1010 static
1011 bool trigger_applies_to_channel(struct lttng_trigger *trigger,
1012 struct channel_info *channel_info)
1013 {
1014 struct lttng_condition *condition;
1015 bool trigger_applies;
1016
1017 condition = lttng_trigger_get_condition(trigger);
1018 if (!condition) {
1019 goto fail;
1020 }
1021
1022 switch (lttng_condition_get_type(condition)) {
1023 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1024 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1025 trigger_applies = buffer_usage_condition_applies_to_channel(
1026 condition, channel_info);
1027 break;
1028 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
1029 trigger_applies = session_consumed_size_condition_applies_to_channel(
1030 condition, channel_info);
1031 break;
1032 default:
1033 goto fail;
1034 }
1035
1036 return trigger_applies;
1037 fail:
1038 return false;
1039 }
1040
1041 static
1042 bool trigger_applies_to_client(struct lttng_trigger *trigger,
1043 struct notification_client *client)
1044 {
1045 bool applies = false;
1046 struct lttng_condition_list_element *condition_list_element;
1047
1048 cds_list_for_each_entry(condition_list_element, &client->condition_list,
1049 node) {
1050 applies = lttng_condition_is_equal(
1051 condition_list_element->condition,
1052 lttng_trigger_get_condition(trigger));
1053 if (applies) {
1054 break;
1055 }
1056 }
1057 return applies;
1058 }
1059
1060 static
1061 int match_session(struct cds_lfht_node *node, const void *key)
1062 {
1063 const char *name = key;
1064 struct session_info *session_info = caa_container_of(
1065 node, struct session_info, sessions_ht_node);
1066
1067 return !strcmp(session_info->name, name);
1068 }
1069
1070 /*
1071 * Allocate an empty lttng_session_trigger_list for the session named
1072 * 'session_name'.
1073 *
1074 * No ownership of 'session_name' is assumed by the session trigger list.
1075 * It is the caller's responsability to ensure the session name is alive
1076 * for as long as this list is.
1077 */
1078 static
1079 struct lttng_session_trigger_list *lttng_session_trigger_list_create(
1080 const char *session_name,
1081 struct cds_lfht *session_triggers_ht)
1082 {
1083 struct lttng_session_trigger_list *list;
1084
1085 list = zmalloc(sizeof(*list));
1086 if (!list) {
1087 goto end;
1088 }
1089 list->session_name = session_name;
1090 CDS_INIT_LIST_HEAD(&list->list);
1091 cds_lfht_node_init(&list->session_triggers_ht_node);
1092 list->session_triggers_ht = session_triggers_ht;
1093
1094 rcu_read_lock();
1095 /* Publish the list through the session_triggers_ht. */
1096 cds_lfht_add(session_triggers_ht,
1097 hash_key_str(session_name, lttng_ht_seed),
1098 &list->session_triggers_ht_node);
1099 rcu_read_unlock();
1100 end:
1101 return list;
1102 }
1103
1104 static
1105 void free_session_trigger_list_rcu(struct rcu_head *node)
1106 {
1107 free(caa_container_of(node, struct lttng_session_trigger_list,
1108 rcu_node));
1109 }
1110
1111 static
1112 void lttng_session_trigger_list_destroy(struct lttng_session_trigger_list *list)
1113 {
1114 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1115
1116 /* Empty the list element by element, and then free the list itself. */
1117 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1118 &list->list, node) {
1119 cds_list_del(&trigger_list_element->node);
1120 free(trigger_list_element);
1121 }
1122 rcu_read_lock();
1123 /* Unpublish the list from the session_triggers_ht. */
1124 cds_lfht_del(list->session_triggers_ht,
1125 &list->session_triggers_ht_node);
1126 rcu_read_unlock();
1127 call_rcu(&list->rcu_node, free_session_trigger_list_rcu);
1128 }
1129
1130 static
1131 int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
1132 const struct lttng_trigger *trigger)
1133 {
1134 int ret = 0;
1135 struct lttng_trigger_list_element *new_element =
1136 zmalloc(sizeof(*new_element));
1137
1138 if (!new_element) {
1139 ret = -1;
1140 goto end;
1141 }
1142 CDS_INIT_LIST_HEAD(&new_element->node);
1143 new_element->trigger = trigger;
1144 cds_list_add(&new_element->node, &list->list);
1145 end:
1146 return ret;
1147 }
1148
1149 static
1150 bool trigger_applies_to_session(const struct lttng_trigger *trigger,
1151 const char *session_name)
1152 {
1153 bool applies = false;
1154 const struct lttng_condition *condition;
1155
1156 condition = lttng_trigger_get_const_condition(trigger);
1157 switch (lttng_condition_get_type(condition)) {
1158 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1159 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
1160 {
1161 enum lttng_condition_status condition_status;
1162 const char *condition_session_name;
1163
1164 condition_status = lttng_condition_session_rotation_get_session_name(
1165 condition, &condition_session_name);
1166 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
1167 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
1168 goto end;
1169 }
1170
1171 assert(condition_session_name);
1172 applies = !strcmp(condition_session_name, session_name);
1173 break;
1174 }
1175 default:
1176 goto end;
1177 }
1178 end:
1179 return applies;
1180 }
1181
1182 /*
1183 * Allocate and initialize an lttng_session_trigger_list which contains
1184 * all triggers that apply to the session named 'session_name'.
1185 *
1186 * No ownership of 'session_name' is assumed by the session trigger list.
1187 * It is the caller's responsability to ensure the session name is alive
1188 * for as long as this list is.
1189 */
1190 static
1191 struct lttng_session_trigger_list *lttng_session_trigger_list_build(
1192 const struct notification_thread_state *state,
1193 const char *session_name)
1194 {
1195 int trigger_count = 0;
1196 struct lttng_session_trigger_list *session_trigger_list = NULL;
1197 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1198 struct cds_lfht_iter iter;
1199
1200 session_trigger_list = lttng_session_trigger_list_create(session_name,
1201 state->session_triggers_ht);
1202
1203 /* Add all triggers applying to the session named 'session_name'. */
1204 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1205 node) {
1206 int ret;
1207
1208 if (!trigger_applies_to_session(trigger_ht_element->trigger,
1209 session_name)) {
1210 continue;
1211 }
1212
1213 ret = lttng_session_trigger_list_add(session_trigger_list,
1214 trigger_ht_element->trigger);
1215 if (ret) {
1216 goto error;
1217 }
1218
1219 trigger_count++;
1220 }
1221
1222 DBG("[notification-thread] Found %i triggers that apply to newly created session",
1223 trigger_count);
1224 return session_trigger_list;
1225 error:
1226 lttng_session_trigger_list_destroy(session_trigger_list);
1227 return NULL;
1228 }
1229
1230 static
1231 struct session_info *find_or_create_session_info(
1232 struct notification_thread_state *state,
1233 const char *name, uid_t uid, gid_t gid)
1234 {
1235 struct session_info *session = NULL;
1236 struct cds_lfht_node *node;
1237 struct cds_lfht_iter iter;
1238 struct lttng_session_trigger_list *trigger_list;
1239
1240 rcu_read_lock();
1241 cds_lfht_lookup(state->sessions_ht,
1242 hash_key_str(name, lttng_ht_seed),
1243 match_session,
1244 name,
1245 &iter);
1246 node = cds_lfht_iter_get_node(&iter);
1247 if (node) {
1248 DBG("[notification-thread] Found session info of session \"%s\" (uid = %i, gid = %i)",
1249 name, uid, gid);
1250 session = caa_container_of(node, struct session_info,
1251 sessions_ht_node);
1252 assert(session->uid == uid);
1253 assert(session->gid == gid);
1254 goto error;
1255 }
1256
1257 trigger_list = lttng_session_trigger_list_build(state, name);
1258 if (!trigger_list) {
1259 goto error;
1260 }
1261
1262 session = session_info_create(name, uid, gid, trigger_list);
1263 if (!session) {
1264 ERR("[notification-thread] Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)",
1265 name, uid, gid);
1266 goto error;
1267 }
1268 trigger_list = NULL;
1269
1270 cds_lfht_add(state->sessions_ht, hash_key_str(name, lttng_ht_seed),
1271 &session->sessions_ht_node);
1272 rcu_read_unlock();
1273 return session;
1274 error:
1275 rcu_read_unlock();
1276 session_info_put(session);
1277 return NULL;
1278 }
1279
1280 static
1281 int handle_notification_thread_command_add_channel(
1282 struct notification_thread_state *state,
1283 const char *session_name, uid_t session_uid, gid_t session_gid,
1284 const char *channel_name, enum lttng_domain_type channel_domain,
1285 uint64_t channel_key_int, uint64_t channel_capacity,
1286 enum lttng_error_code *cmd_result)
1287 {
1288 struct cds_list_head trigger_list;
1289 struct channel_info *new_channel_info = NULL;
1290 struct channel_key channel_key = {
1291 .key = channel_key_int,
1292 .domain = channel_domain,
1293 };
1294 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
1295 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1296 int trigger_count = 0;
1297 struct cds_lfht_iter iter;
1298 struct session_info *session_info = NULL;
1299
1300 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
1301 channel_name, session_name, channel_key_int,
1302 channel_domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
1303
1304 CDS_INIT_LIST_HEAD(&trigger_list);
1305
1306 session_info = find_or_create_session_info(state, session_name,
1307 session_uid, session_gid);
1308 if (!session_info) {
1309 /* Allocation error or an internal error occured. */
1310 goto error;
1311 }
1312
1313 new_channel_info = channel_info_create(channel_name, &channel_key,
1314 channel_capacity, session_info);
1315 if (!new_channel_info) {
1316 goto error;
1317 }
1318
1319 /* Build a list of all triggers applying to the new channel. */
1320 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1321 node) {
1322 struct lttng_trigger_list_element *new_element;
1323
1324 if (!trigger_applies_to_channel(trigger_ht_element->trigger,
1325 new_channel_info)) {
1326 continue;
1327 }
1328
1329 new_element = zmalloc(sizeof(*new_element));
1330 if (!new_element) {
1331 goto error;
1332 }
1333 CDS_INIT_LIST_HEAD(&new_element->node);
1334 new_element->trigger = trigger_ht_element->trigger;
1335 cds_list_add(&new_element->node, &trigger_list);
1336 trigger_count++;
1337 }
1338
1339 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
1340 trigger_count);
1341 channel_trigger_list = zmalloc(sizeof(*channel_trigger_list));
1342 if (!channel_trigger_list) {
1343 goto error;
1344 }
1345 channel_trigger_list->channel_key = new_channel_info->key;
1346 CDS_INIT_LIST_HEAD(&channel_trigger_list->list);
1347 cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
1348 cds_list_splice(&trigger_list, &channel_trigger_list->list);
1349
1350 rcu_read_lock();
1351 /* Add channel to the channel_ht which owns the channel_infos. */
1352 cds_lfht_add(state->channels_ht,
1353 hash_channel_key(&new_channel_info->key),
1354 &new_channel_info->channels_ht_node);
1355 /*
1356 * Add the list of triggers associated with this channel to the
1357 * channel_triggers_ht.
1358 */
1359 cds_lfht_add(state->channel_triggers_ht,
1360 hash_channel_key(&new_channel_info->key),
1361 &channel_trigger_list->channel_triggers_ht_node);
1362 rcu_read_unlock();
1363 *cmd_result = LTTNG_OK;
1364 return 0;
1365 error:
1366 channel_info_destroy(new_channel_info);
1367 session_info_put(session_info);
1368 return 1;
1369 }
1370
1371 static
1372 int handle_notification_thread_command_remove_channel(
1373 struct notification_thread_state *state,
1374 uint64_t channel_key, enum lttng_domain_type domain,
1375 enum lttng_error_code *cmd_result)
1376 {
1377 struct cds_lfht_node *node;
1378 struct cds_lfht_iter iter;
1379 struct lttng_channel_trigger_list *trigger_list;
1380 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1381 struct channel_key key = { .key = channel_key, .domain = domain };
1382 struct channel_info *channel_info;
1383
1384 DBG("[notification-thread] Removing channel key = %" PRIu64 " in %s domain",
1385 channel_key, domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
1386
1387 rcu_read_lock();
1388
1389 cds_lfht_lookup(state->channel_triggers_ht,
1390 hash_channel_key(&key),
1391 match_channel_trigger_list,
1392 &key,
1393 &iter);
1394 node = cds_lfht_iter_get_node(&iter);
1395 /*
1396 * There is a severe internal error if we are being asked to remove a
1397 * channel that doesn't exist.
1398 */
1399 if (!node) {
1400 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
1401 goto end;
1402 }
1403
1404 /* Free the list of triggers associated with this channel. */
1405 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
1406 channel_triggers_ht_node);
1407 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1408 &trigger_list->list, node) {
1409 cds_list_del(&trigger_list_element->node);
1410 free(trigger_list_element);
1411 }
1412 cds_lfht_del(state->channel_triggers_ht, node);
1413 free(trigger_list);
1414
1415 /* Free sampled channel state. */
1416 cds_lfht_lookup(state->channel_state_ht,
1417 hash_channel_key(&key),
1418 match_channel_state_sample,
1419 &key,
1420 &iter);
1421 node = cds_lfht_iter_get_node(&iter);
1422 /*
1423 * This is expected to be NULL if the channel is destroyed before we
1424 * received a sample.
1425 */
1426 if (node) {
1427 struct channel_state_sample *sample = caa_container_of(node,
1428 struct channel_state_sample,
1429 channel_state_ht_node);
1430
1431 cds_lfht_del(state->channel_state_ht, node);
1432 free(sample);
1433 }
1434
1435 /* Remove the channel from the channels_ht and free it. */
1436 cds_lfht_lookup(state->channels_ht,
1437 hash_channel_key(&key),
1438 match_channel_info,
1439 &key,
1440 &iter);
1441 node = cds_lfht_iter_get_node(&iter);
1442 assert(node);
1443 channel_info = caa_container_of(node, struct channel_info,
1444 channels_ht_node);
1445 cds_lfht_del(state->channels_ht, node);
1446 channel_info_destroy(channel_info);
1447 end:
1448 rcu_read_unlock();
1449 *cmd_result = LTTNG_OK;
1450 return 0;
1451 }
1452
1453 static
1454 int condition_is_supported(struct lttng_condition *condition)
1455 {
1456 int ret;
1457
1458 switch (lttng_condition_get_type(condition)) {
1459 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1460 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1461 {
1462 enum lttng_domain_type domain;
1463
1464 ret = lttng_condition_buffer_usage_get_domain_type(condition,
1465 &domain);
1466 if (ret) {
1467 ret = -1;
1468 goto end;
1469 }
1470
1471 if (domain != LTTNG_DOMAIN_KERNEL) {
1472 ret = 1;
1473 goto end;
1474 }
1475
1476 /*
1477 * Older kernel tracers don't expose the API to monitor their
1478 * buffers. Therefore, we reject triggers that require that
1479 * mechanism to be available to be evaluated.
1480 */
1481 ret = kernel_supports_ring_buffer_snapshot_sample_positions(
1482 kernel_tracer_fd);
1483 break;
1484 }
1485 default:
1486 ret = 1;
1487 }
1488 end:
1489 return ret;
1490 }
1491
1492 /*
1493 * FIXME A client's credentials are not checked when registering a trigger, nor
1494 * are they stored alongside with the trigger.
1495 *
1496 * The effects of this are benign since:
1497 * - The client will succeed in registering the trigger, as it is valid,
1498 * - The trigger will, internally, be bound to the channel,
1499 * - The notifications will not be sent since the client's credentials
1500 * are checked against the channel at that moment.
1501 *
1502 * If this function returns a non-zero value, it means something is
1503 * fundamentally broken and the whole subsystem/thread will be torn down.
1504 *
1505 * If a non-fatal error occurs, just set the cmd_result to the appropriate
1506 * error code.
1507 */
1508 static
1509 int handle_notification_thread_command_register_trigger(
1510 struct notification_thread_state *state,
1511 struct lttng_trigger *trigger,
1512 enum lttng_error_code *cmd_result)
1513 {
1514 int ret = 0;
1515 struct lttng_condition *condition;
1516 struct notification_client *client;
1517 struct notification_client_list *client_list = NULL;
1518 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1519 struct notification_client_list_element *client_list_element, *tmp;
1520 struct cds_lfht_node *node;
1521 struct cds_lfht_iter iter;
1522 struct channel_info *channel;
1523 bool free_trigger = true;
1524
1525 rcu_read_lock();
1526
1527 condition = lttng_trigger_get_condition(trigger);
1528 assert(condition);
1529
1530 ret = condition_is_supported(condition);
1531 if (ret < 0) {
1532 goto error;
1533 } else if (ret == 0) {
1534 *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
1535 goto error;
1536 } else {
1537 /* Feature is supported, continue. */
1538 ret = 0;
1539 }
1540
1541 trigger_ht_element = zmalloc(sizeof(*trigger_ht_element));
1542 if (!trigger_ht_element) {
1543 ret = -1;
1544 goto error;
1545 }
1546
1547 /* Add trigger to the trigger_ht. */
1548 cds_lfht_node_init(&trigger_ht_element->node);
1549 trigger_ht_element->trigger = trigger;
1550
1551 node = cds_lfht_add_unique(state->triggers_ht,
1552 lttng_condition_hash(condition),
1553 match_condition,
1554 condition,
1555 &trigger_ht_element->node);
1556 if (node != &trigger_ht_element->node) {
1557 /* Not a fatal error, simply report it to the client. */
1558 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
1559 goto error_free_ht_element;
1560 }
1561
1562 /*
1563 * Ownership of the trigger and of its wrapper was transfered to
1564 * the triggers_ht.
1565 */
1566 trigger_ht_element = NULL;
1567 free_trigger = false;
1568
1569 /*
1570 * The rest only applies to triggers that have a "notify" action.
1571 * It is not skipped as this is the only action type currently
1572 * supported.
1573 */
1574 client_list = zmalloc(sizeof(*client_list));
1575 if (!client_list) {
1576 ret = -1;
1577 goto error_free_ht_element;
1578 }
1579 cds_lfht_node_init(&client_list->notification_trigger_ht_node);
1580 CDS_INIT_LIST_HEAD(&client_list->list);
1581 client_list->trigger = trigger;
1582
1583 /* Build a list of clients to which this new trigger applies. */
1584 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
1585 client_socket_ht_node) {
1586 if (!trigger_applies_to_client(trigger, client)) {
1587 continue;
1588 }
1589
1590 client_list_element = zmalloc(sizeof(*client_list_element));
1591 if (!client_list_element) {
1592 ret = -1;
1593 goto error_free_client_list;
1594 }
1595 CDS_INIT_LIST_HEAD(&client_list_element->node);
1596 client_list_element->client = client;
1597 cds_list_add(&client_list_element->node, &client_list->list);
1598 }
1599
1600 cds_lfht_add(state->notification_trigger_clients_ht,
1601 lttng_condition_hash(condition),
1602 &client_list->notification_trigger_ht_node);
1603
1604 /*
1605 * Add the trigger to list of triggers bound to the channels currently
1606 * known.
1607 */
1608 cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
1609 channels_ht_node) {
1610 struct lttng_trigger_list_element *trigger_list_element;
1611 struct lttng_channel_trigger_list *trigger_list;
1612
1613 if (!trigger_applies_to_channel(trigger, channel)) {
1614 continue;
1615 }
1616
1617 cds_lfht_lookup(state->channel_triggers_ht,
1618 hash_channel_key(&channel->key),
1619 match_channel_trigger_list,
1620 &channel->key,
1621 &iter);
1622 node = cds_lfht_iter_get_node(&iter);
1623 assert(node);
1624 trigger_list = caa_container_of(node,
1625 struct lttng_channel_trigger_list,
1626 channel_triggers_ht_node);
1627
1628 trigger_list_element = zmalloc(sizeof(*trigger_list_element));
1629 if (!trigger_list_element) {
1630 ret = -1;
1631 goto error_free_client_list;
1632 }
1633 CDS_INIT_LIST_HEAD(&trigger_list_element->node);
1634 trigger_list_element->trigger = trigger;
1635 cds_list_add(&trigger_list_element->node, &trigger_list->list);
1636 }
1637
1638 /*
1639 * Since there is nothing preventing clients from subscribing to a
1640 * condition before the corresponding trigger is registered, we have
1641 * to evaluate this new condition right away.
1642 *
1643 * At some point, we were waiting for the next "evaluation" (e.g. on
1644 * reception of a channel sample) to evaluate this new condition, but
1645 * that was broken.
1646 *
1647 * The reason it was broken is that waiting for the next sample
1648 * does not allow us to properly handle transitions for edge-triggered
1649 * conditions.
1650 *
1651 * Consider this example: when we handle a new channel sample, we
1652 * evaluate each conditions twice: once with the previous state, and
1653 * again with the newest state. We then use those two results to
1654 * determine whether a state change happened: a condition was false and
1655 * became true. If a state change happened, we have to notify clients.
1656 *
1657 * Now, if a client subscribes to a given notification and registers
1658 * a trigger *after* that subscription, we have to make sure the
1659 * condition is evaluated at this point while considering only the
1660 * current state. Otherwise, the next evaluation cycle may only see
1661 * that the evaluations remain the same (true for samples n-1 and n) and
1662 * the client will never know that the condition has been met.
1663 */
1664 cds_list_for_each_entry_safe(client_list_element, tmp,
1665 &client_list->list, node) {
1666 ret = evaluate_condition_for_client(trigger, condition,
1667 client_list_element->client, state);
1668 if (ret) {
1669 goto error_free_client_list;
1670 }
1671 }
1672
1673 /*
1674 * Client list ownership transferred to the
1675 * notification_trigger_clients_ht.
1676 */
1677 client_list = NULL;
1678
1679 *cmd_result = LTTNG_OK;
1680 error_free_client_list:
1681 if (client_list) {
1682 cds_list_for_each_entry_safe(client_list_element, tmp,
1683 &client_list->list, node) {
1684 free(client_list_element);
1685 }
1686 free(client_list);
1687 }
1688 error_free_ht_element:
1689 free(trigger_ht_element);
1690 error:
1691 if (free_trigger) {
1692 struct lttng_action *action = lttng_trigger_get_action(trigger);
1693
1694 lttng_condition_destroy(condition);
1695 lttng_action_destroy(action);
1696 lttng_trigger_destroy(trigger);
1697 }
1698 rcu_read_unlock();
1699 return ret;
1700 }
1701
1702 static
1703 int handle_notification_thread_command_unregister_trigger(
1704 struct notification_thread_state *state,
1705 struct lttng_trigger *trigger,
1706 enum lttng_error_code *_cmd_reply)
1707 {
1708 struct cds_lfht_iter iter;
1709 struct cds_lfht_node *node, *triggers_ht_node;
1710 struct lttng_channel_trigger_list *trigger_list;
1711 struct notification_client_list *client_list;
1712 struct notification_client_list_element *client_list_element, *tmp;
1713 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1714 struct lttng_condition *condition = lttng_trigger_get_condition(
1715 trigger);
1716 struct lttng_action *action;
1717 enum lttng_error_code cmd_reply;
1718
1719 rcu_read_lock();
1720
1721 cds_lfht_lookup(state->triggers_ht,
1722 lttng_condition_hash(condition),
1723 match_condition,
1724 condition,
1725 &iter);
1726 triggers_ht_node = cds_lfht_iter_get_node(&iter);
1727 if (!triggers_ht_node) {
1728 cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND;
1729 goto end;
1730 } else {
1731 cmd_reply = LTTNG_OK;
1732 }
1733
1734 /* Remove trigger from channel_triggers_ht. */
1735 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
1736 channel_triggers_ht_node) {
1737 struct lttng_trigger_list_element *trigger_element, *tmp;
1738
1739 cds_list_for_each_entry_safe(trigger_element, tmp,
1740 &trigger_list->list, node) {
1741 const struct lttng_condition *current_condition =
1742 lttng_trigger_get_const_condition(
1743 trigger_element->trigger);
1744
1745 assert(current_condition);
1746 if (!lttng_condition_is_equal(condition,
1747 current_condition)) {
1748 continue;
1749 }
1750
1751 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
1752 cds_list_del(&trigger_element->node);
1753 /* A trigger can only appear once per channel */
1754 break;
1755 }
1756 }
1757
1758 /*
1759 * Remove and release the client list from
1760 * notification_trigger_clients_ht.
1761 */
1762 cds_lfht_lookup(state->notification_trigger_clients_ht,
1763 lttng_condition_hash(condition),
1764 match_client_list,
1765 trigger,
1766 &iter);
1767 node = cds_lfht_iter_get_node(&iter);
1768 assert(node);
1769 client_list = caa_container_of(node, struct notification_client_list,
1770 notification_trigger_ht_node);
1771 cds_list_for_each_entry_safe(client_list_element, tmp,
1772 &client_list->list, node) {
1773 free(client_list_element);
1774 }
1775 cds_lfht_del(state->notification_trigger_clients_ht, node);
1776 free(client_list);
1777
1778 /* Remove trigger from triggers_ht. */
1779 trigger_ht_element = caa_container_of(triggers_ht_node,
1780 struct lttng_trigger_ht_element, node);
1781 cds_lfht_del(state->triggers_ht, triggers_ht_node);
1782
1783 condition = lttng_trigger_get_condition(trigger_ht_element->trigger);
1784 lttng_condition_destroy(condition);
1785 action = lttng_trigger_get_action(trigger_ht_element->trigger);
1786 lttng_action_destroy(action);
1787 lttng_trigger_destroy(trigger_ht_element->trigger);
1788 free(trigger_ht_element);
1789 end:
1790 rcu_read_unlock();
1791 if (_cmd_reply) {
1792 *_cmd_reply = cmd_reply;
1793 }
1794 return 0;
1795 }
1796
1797 /* Returns 0 on success, 1 on exit requested, negative value on error. */
1798 int handle_notification_thread_command(
1799 struct notification_thread_handle *handle,
1800 struct notification_thread_state *state)
1801 {
1802 int ret;
1803 uint64_t counter;
1804 struct notification_thread_command *cmd;
1805
1806 /* Read the event pipe to put it back into a quiescent state. */
1807 ret = read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter,
1808 sizeof(counter));
1809 if (ret == -1) {
1810 goto error;
1811 }
1812
1813 pthread_mutex_lock(&handle->cmd_queue.lock);
1814 cmd = cds_list_first_entry(&handle->cmd_queue.list,
1815 struct notification_thread_command, cmd_list_node);
1816 switch (cmd->type) {
1817 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
1818 DBG("[notification-thread] Received register trigger command");
1819 ret = handle_notification_thread_command_register_trigger(
1820 state, cmd->parameters.trigger,
1821 &cmd->reply_code);
1822 break;
1823 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
1824 DBG("[notification-thread] Received unregister trigger command");
1825 ret = handle_notification_thread_command_unregister_trigger(
1826 state, cmd->parameters.trigger,
1827 &cmd->reply_code);
1828 break;
1829 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
1830 DBG("[notification-thread] Received add channel command");
1831 ret = handle_notification_thread_command_add_channel(
1832 state,
1833 cmd->parameters.add_channel.session.name,
1834 cmd->parameters.add_channel.session.uid,
1835 cmd->parameters.add_channel.session.gid,
1836 cmd->parameters.add_channel.channel.name,
1837 cmd->parameters.add_channel.channel.domain,
1838 cmd->parameters.add_channel.channel.key,
1839 cmd->parameters.add_channel.channel.capacity,
1840 &cmd->reply_code);
1841 break;
1842 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
1843 DBG("[notification-thread] Received remove channel command");
1844 ret = handle_notification_thread_command_remove_channel(
1845 state, cmd->parameters.remove_channel.key,
1846 cmd->parameters.remove_channel.domain,
1847 &cmd->reply_code);
1848 break;
1849 case NOTIFICATION_COMMAND_TYPE_QUIT:
1850 DBG("[notification-thread] Received quit command");
1851 cmd->reply_code = LTTNG_OK;
1852 ret = 1;
1853 goto end;
1854 default:
1855 ERR("[notification-thread] Unknown internal command received");
1856 goto error_unlock;
1857 }
1858
1859 if (ret) {
1860 goto error_unlock;
1861 }
1862 end:
1863 cds_list_del(&cmd->cmd_list_node);
1864 lttng_waiter_wake_up(&cmd->reply_waiter);
1865 pthread_mutex_unlock(&handle->cmd_queue.lock);
1866 return ret;
1867 error_unlock:
1868 /* Wake-up and return a fatal error to the calling thread. */
1869 lttng_waiter_wake_up(&cmd->reply_waiter);
1870 pthread_mutex_unlock(&handle->cmd_queue.lock);
1871 cmd->reply_code = LTTNG_ERR_FATAL;
1872 error:
1873 /* Indicate a fatal error to the caller. */
1874 return -1;
1875 }
1876
1877 static
1878 unsigned long hash_client_socket(int socket)
1879 {
1880 return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed);
1881 }
1882
/*
 * Add O_NONBLOCK to the file status flags of 'socket'.
 *
 * Returns the result of the F_SETFL fcntl() call on success, -1 if either
 * fcntl() call fails (the error is logged).
 */
static
int socket_set_non_blocking(int socket)
{
	int current_flags, status;

	/* Fetch the current flags so that they are preserved. */
	current_flags = fcntl(socket, F_GETFL, 0);
	if (current_flags == -1) {
		PERROR("fcntl get socket flags");
		return -1;
	}

	status = fcntl(socket, F_SETFL, current_flags | O_NONBLOCK);
	if (status == -1) {
		PERROR("fcntl set O_NONBLOCK socket flag");
		return -1;
	}

	DBG("Client socket (fd = %i) set as non-blocking", socket);
	return status;
}
1905
1906 static
1907 int client_reset_inbound_state(struct notification_client *client)
1908 {
1909 int ret;
1910
1911 ret = lttng_dynamic_buffer_set_size(
1912 &client->communication.inbound.buffer, 0);
1913 assert(!ret);
1914
1915 client->communication.inbound.bytes_to_receive =
1916 sizeof(struct lttng_notification_channel_message);
1917 client->communication.inbound.msg_type =
1918 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN;
1919 LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1);
1920 LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1);
1921 ret = lttng_dynamic_buffer_set_size(
1922 &client->communication.inbound.buffer,
1923 client->communication.inbound.bytes_to_receive);
1924 return ret;
1925 }
1926
1927 int handle_notification_thread_client_connect(
1928 struct notification_thread_state *state)
1929 {
1930 int ret;
1931 struct notification_client *client;
1932
1933 DBG("[notification-thread] Handling new notification channel client connection");
1934
1935 client = zmalloc(sizeof(*client));
1936 if (!client) {
1937 /* Fatal error. */
1938 ret = -1;
1939 goto error;
1940 }
1941 CDS_INIT_LIST_HEAD(&client->condition_list);
1942 lttng_dynamic_buffer_init(&client->communication.inbound.buffer);
1943 lttng_dynamic_buffer_init(&client->communication.outbound.buffer);
1944 client->communication.inbound.expect_creds = true;
1945 ret = client_reset_inbound_state(client);
1946 if (ret) {
1947 ERR("[notification-thread] Failed to reset client communication's inbound state");
1948 ret = 0;
1949 goto error;
1950 }
1951
1952 ret = lttcomm_accept_unix_sock(state->notification_channel_socket);
1953 if (ret < 0) {
1954 ERR("[notification-thread] Failed to accept new notification channel client connection");
1955 ret = 0;
1956 goto error;
1957 }
1958
1959 client->socket = ret;
1960
1961 ret = socket_set_non_blocking(client->socket);
1962 if (ret) {
1963 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
1964 goto error;
1965 }
1966
1967 ret = lttcomm_setsockopt_creds_unix_sock(client->socket);
1968 if (ret < 0) {
1969 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
1970 ret = 0;
1971 goto error;
1972 }
1973
1974 ret = lttng_poll_add(&state->events, client->socket,
1975 LPOLLIN | LPOLLERR |
1976 LPOLLHUP | LPOLLRDHUP);
1977 if (ret < 0) {
1978 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
1979 ret = 0;
1980 goto error;
1981 }
1982 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
1983 client->socket);
1984
1985 rcu_read_lock();
1986 cds_lfht_add(state->client_socket_ht,
1987 hash_client_socket(client->socket),
1988 &client->client_socket_ht_node);
1989 rcu_read_unlock();
1990
1991 return ret;
1992 error:
1993 notification_client_destroy(client, state);
1994 return ret;
1995 }
1996
1997 int handle_notification_thread_client_disconnect(
1998 int client_socket,
1999 struct notification_thread_state *state)
2000 {
2001 int ret = 0;
2002 struct notification_client *client;
2003
2004 rcu_read_lock();
2005 DBG("[notification-thread] Closing client connection (socket fd = %i)",
2006 client_socket);
2007 client = get_client_from_socket(client_socket, state);
2008 if (!client) {
2009 /* Internal state corruption, fatal error. */
2010 ERR("[notification-thread] Unable to find client (socket fd = %i)",
2011 client_socket);
2012 ret = -1;
2013 goto end;
2014 }
2015
2016 ret = lttng_poll_del(&state->events, client_socket);
2017 if (ret) {
2018 ERR("[notification-thread] Failed to remove client socket from poll set");
2019 }
2020 cds_lfht_del(state->client_socket_ht,
2021 &client->client_socket_ht_node);
2022 notification_client_destroy(client, state);
2023 end:
2024 rcu_read_unlock();
2025 return ret;
2026 }
2027
2028 int handle_notification_thread_client_disconnect_all(
2029 struct notification_thread_state *state)
2030 {
2031 struct cds_lfht_iter iter;
2032 struct notification_client *client;
2033 bool error_encoutered = false;
2034
2035 rcu_read_lock();
2036 DBG("[notification-thread] Closing all client connections");
2037 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
2038 client_socket_ht_node) {
2039 int ret;
2040
2041 ret = handle_notification_thread_client_disconnect(
2042 client->socket, state);
2043 if (ret) {
2044 error_encoutered = true;
2045 }
2046 }
2047 rcu_read_unlock();
2048 return error_encoutered ? 1 : 0;
2049 }
2050
2051 int handle_notification_thread_trigger_unregister_all(
2052 struct notification_thread_state *state)
2053 {
2054 bool error_occurred = false;
2055 struct cds_lfht_iter iter;
2056 struct lttng_trigger_ht_element *trigger_ht_element;
2057
2058 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
2059 node) {
2060 int ret = handle_notification_thread_command_unregister_trigger(
2061 state, trigger_ht_element->trigger, NULL);
2062 if (ret) {
2063 error_occurred = true;
2064 }
2065 }
2066 return error_occurred ? -1 : 0;
2067 }
2068
2069 static
2070 int client_flush_outgoing_queue(struct notification_client *client,
2071 struct notification_thread_state *state)
2072 {
2073 ssize_t ret;
2074 size_t to_send_count;
2075
2076 assert(client->communication.outbound.buffer.size != 0);
2077 to_send_count = client->communication.outbound.buffer.size;
2078 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
2079 client->socket);
2080
2081 ret = lttcomm_send_unix_sock_non_block(client->socket,
2082 client->communication.outbound.buffer.data,
2083 to_send_count);
2084 if ((ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) ||
2085 (ret > 0 && ret < to_send_count)) {
2086 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
2087 client->socket);
2088 to_send_count -= max(ret, 0);
2089
2090 memcpy(client->communication.outbound.buffer.data,
2091 client->communication.outbound.buffer.data +
2092 client->communication.outbound.buffer.size - to_send_count,
2093 to_send_count);
2094 ret = lttng_dynamic_buffer_set_size(
2095 &client->communication.outbound.buffer,
2096 to_send_count);
2097 if (ret) {
2098 goto error;
2099 }
2100
2101 /*
2102 * We want to be notified whenever there is buffer space
2103 * available to send the rest of the payload.
2104 */
2105 ret = lttng_poll_mod(&state->events, client->socket,
2106 CLIENT_POLL_MASK_IN_OUT);
2107 if (ret) {
2108 goto error;
2109 }
2110 } else if (ret < 0) {
2111 /* Generic error, disconnect the client. */
2112 ERR("[notification-thread] Failed to send flush outgoing queue, disconnecting client (socket fd = %i)",
2113 client->socket);
2114 ret = handle_notification_thread_client_disconnect(
2115 client->socket, state);
2116 if (ret) {
2117 goto error;
2118 }
2119 } else {
2120 /* No error and flushed the queue completely. */
2121 ret = lttng_dynamic_buffer_set_size(
2122 &client->communication.outbound.buffer, 0);
2123 if (ret) {
2124 goto error;
2125 }
2126 ret = lttng_poll_mod(&state->events, client->socket,
2127 CLIENT_POLL_MASK_IN);
2128 if (ret) {
2129 goto error;
2130 }
2131
2132 client->communication.outbound.queued_command_reply = false;
2133 client->communication.outbound.dropped_notification = false;
2134 }
2135
2136 return 0;
2137 error:
2138 return -1;
2139 }
2140
2141 static
2142 int client_send_command_reply(struct notification_client *client,
2143 struct notification_thread_state *state,
2144 enum lttng_notification_channel_status status)
2145 {
2146 int ret;
2147 struct lttng_notification_channel_command_reply reply = {
2148 .status = (int8_t) status,
2149 };
2150 struct lttng_notification_channel_message msg = {
2151 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
2152 .size = sizeof(reply),
2153 };
2154 char buffer[sizeof(msg) + sizeof(reply)];
2155
2156 if (client->communication.outbound.queued_command_reply) {
2157 /* Protocol error. */
2158 goto error;
2159 }
2160
2161 memcpy(buffer, &msg, sizeof(msg));
2162 memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
2163 DBG("[notification-thread] Send command reply (%i)", (int) status);
2164
2165 /* Enqueue buffer to outgoing queue and flush it. */
2166 ret = lttng_dynamic_buffer_append(
2167 &client->communication.outbound.buffer,
2168 buffer, sizeof(buffer));
2169 if (ret) {
2170 goto error;
2171 }
2172
2173 ret = client_flush_outgoing_queue(client, state);
2174 if (ret) {
2175 goto error;
2176 }
2177
2178 if (client->communication.outbound.buffer.size != 0) {
2179 /* Queue could not be emptied. */
2180 client->communication.outbound.queued_command_reply = true;
2181 }
2182
2183 return 0;
2184 error:
2185 return -1;
2186 }
2187
2188 static
2189 int client_dispatch_message(struct notification_client *client,
2190 struct notification_thread_state *state)
2191 {
2192 int ret = 0;
2193
2194 if (client->communication.inbound.msg_type !=
2195 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE &&
2196 client->communication.inbound.msg_type !=
2197 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN &&
2198 !client->validated) {
2199 WARN("[notification-thread] client attempted a command before handshake");
2200 ret = -1;
2201 goto end;
2202 }
2203
2204 switch (client->communication.inbound.msg_type) {
2205 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN:
2206 {
2207 /*
2208 * Receiving message header. The function will be called again
2209 * once the rest of the message as been received and can be
2210 * interpreted.
2211 */
2212 const struct lttng_notification_channel_message *msg;
2213
2214 assert(sizeof(*msg) ==
2215 client->communication.inbound.buffer.size);
2216 msg = (const struct lttng_notification_channel_message *)
2217 client->communication.inbound.buffer.data;
2218
2219 if (msg->size == 0 || msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
2220 ERR("[notification-thread] Invalid notification channel message: length = %u", msg->size);
2221 ret = -1;
2222 goto end;
2223 }
2224
2225 switch (msg->type) {
2226 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
2227 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
2228 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
2229 break;
2230 default:
2231 ret = -1;
2232 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
2233 goto end;
2234 }
2235
2236 client->communication.inbound.bytes_to_receive = msg->size;
2237 client->communication.inbound.msg_type =
2238 (enum lttng_notification_channel_message_type) msg->type;
2239 ret = lttng_dynamic_buffer_set_size(
2240 &client->communication.inbound.buffer, msg->size);
2241 if (ret) {
2242 goto end;
2243 }
2244 break;
2245 }
2246 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
2247 {
2248 struct lttng_notification_channel_command_handshake *handshake_client;
2249 struct lttng_notification_channel_command_handshake handshake_reply = {
2250 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
2251 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
2252 };
2253 struct lttng_notification_channel_message msg_header = {
2254 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
2255 .size = sizeof(handshake_reply),
2256 };
2257 enum lttng_notification_channel_status status =
2258 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
2259 char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
2260
2261 memcpy(send_buffer, &msg_header, sizeof(msg_header));
2262 memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
2263 sizeof(handshake_reply));
2264
2265 handshake_client =
2266 (struct lttng_notification_channel_command_handshake *)
2267 client->communication.inbound.buffer.data;
2268 client->major = handshake_client->major;
2269 client->minor = handshake_client->minor;
2270 if (!client->communication.inbound.creds_received) {
2271 ERR("[notification-thread] No credentials received from client");
2272 ret = -1;
2273 goto end;
2274 }
2275
2276 client->uid = LTTNG_SOCK_GET_UID_CRED(
2277 &client->communication.inbound.creds);
2278 client->gid = LTTNG_SOCK_GET_GID_CRED(
2279 &client->communication.inbound.creds);
2280 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
2281 client->uid, client->gid, (int) client->major,
2282 (int) client->minor);
2283
2284 if (handshake_client->major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
2285 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
2286 }
2287
2288 ret = lttng_dynamic_buffer_append(&client->communication.outbound.buffer,
2289 send_buffer, sizeof(send_buffer));
2290 if (ret) {
2291 ERR("[notification-thread] Failed to send protocol version to notification channel client");
2292 goto end;
2293 }
2294
2295 ret = client_flush_outgoing_queue(client, state);
2296 if (ret) {
2297 goto end;
2298 }
2299
2300 ret = client_send_command_reply(client, state, status);
2301 if (ret) {
2302 ERR("[notification-thread] Failed to send reply to notification channel client");
2303 goto end;
2304 }
2305
2306 /* Set reception state to receive the next message header. */
2307 ret = client_reset_inbound_state(client);
2308 if (ret) {
2309 ERR("[notification-thread] Failed to reset client communication's inbound state");
2310 goto end;
2311 }
2312 client->validated = true;
2313 break;
2314 }
2315 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
2316 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
2317 {
2318 struct lttng_condition *condition;
2319 enum lttng_notification_channel_status status =
2320 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
2321 const struct lttng_buffer_view condition_view =
2322 lttng_buffer_view_from_dynamic_buffer(
2323 &client->communication.inbound.buffer,
2324 0, -1);
2325 size_t expected_condition_size =
2326 client->communication.inbound.buffer.size;
2327
2328 ret = lttng_condition_create_from_buffer(&condition_view,
2329 &condition);
2330 if (ret != expected_condition_size) {
2331 ERR("[notification-thread] Malformed condition received from client");
2332 goto end;
2333 }
2334
2335 if (client->communication.inbound.msg_type ==
2336 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
2337 ret = notification_thread_client_subscribe(client,
2338 condition, state, &status);
2339 } else {
2340 ret = notification_thread_client_unsubscribe(client,
2341 condition, state, &status);
2342 }
2343 if (ret) {
2344 goto end;
2345 }
2346
2347 ret = client_send_command_reply(client, state, status);
2348 if (ret) {
2349 ERR("[notification-thread] Failed to send reply to notification channel client");
2350 goto end;
2351 }
2352
2353 /* Set reception state to receive the next message header. */
2354 ret = client_reset_inbound_state(client);
2355 if (ret) {
2356 ERR("[notification-thread] Failed to reset client communication's inbound state");
2357 goto end;
2358 }
2359 break;
2360 }
2361 default:
2362 abort();
2363 }
2364 end:
2365 return ret;
2366 }
2367
2368 /* Incoming data from client. */
2369 int handle_notification_thread_client_in(
2370 struct notification_thread_state *state, int socket)
2371 {
2372 int ret = 0;
2373 struct notification_client *client;
2374 ssize_t recv_ret;
2375 size_t offset;
2376
2377 client = get_client_from_socket(socket, state);
2378 if (!client) {
2379 /* Internal error, abort. */
2380 ret = -1;
2381 goto end;
2382 }
2383
2384 offset = client->communication.inbound.buffer.size -
2385 client->communication.inbound.bytes_to_receive;
2386 if (client->communication.inbound.expect_creds) {
2387 recv_ret = lttcomm_recv_creds_unix_sock(socket,
2388 client->communication.inbound.buffer.data + offset,
2389 client->communication.inbound.bytes_to_receive,
2390 &client->communication.inbound.creds);
2391 if (recv_ret > 0) {
2392 client->communication.inbound.expect_creds = false;
2393 client->communication.inbound.creds_received = true;
2394 }
2395 } else {
2396 recv_ret = lttcomm_recv_unix_sock_non_block(socket,
2397 client->communication.inbound.buffer.data + offset,
2398 client->communication.inbound.bytes_to_receive);
2399 }
2400 if (recv_ret < 0) {
2401 goto error_disconnect_client;
2402 }
2403
2404 client->communication.inbound.bytes_to_receive -= recv_ret;
2405 if (client->communication.inbound.bytes_to_receive == 0) {
2406 ret = client_dispatch_message(client, state);
2407 if (ret) {
2408 /*
2409 * Only returns an error if this client must be
2410 * disconnected.
2411 */
2412 goto error_disconnect_client;
2413 }
2414 } else {
2415 goto end;
2416 }
2417 end:
2418 return ret;
2419 error_disconnect_client:
2420 ret = handle_notification_thread_client_disconnect(socket, state);
2421 return ret;
2422 }
2423
/*
 * Client ready to receive outgoing data.
 *
 * Flushes the outgoing queue of the client associated with `socket`.
 *
 * Returns 0 on success, -1 if no client is known for `socket` or if the
 * queue could not be flushed.
 */
int handle_notification_thread_client_out(
		struct notification_thread_state *state, int socket)
{
	struct notification_client *client =
			get_client_from_socket(socket, state);

	if (!client) {
		/* Internal error, abort. */
		return -1;
	}

	return client_flush_outgoing_queue(client, state);
}
2445
2446 static
2447 bool evaluate_buffer_usage_condition(const struct lttng_condition *condition,
2448 const struct channel_state_sample *sample,
2449 uint64_t buffer_capacity)
2450 {
2451 bool result = false;
2452 uint64_t threshold;
2453 enum lttng_condition_type condition_type;
2454 const struct lttng_condition_buffer_usage *use_condition = container_of(
2455 condition, struct lttng_condition_buffer_usage,
2456 parent);
2457
2458 if (use_condition->threshold_bytes.set) {
2459 threshold = use_condition->threshold_bytes.value;
2460 } else {
2461 /*
2462 * Threshold was expressed as a ratio.
2463 *
2464 * TODO the threshold (in bytes) of conditions expressed
2465 * as a ratio of total buffer size could be cached to
2466 * forego this double-multiplication or it could be performed
2467 * as fixed-point math.
2468 *
2469 * Note that caching should accomodate the case where the
2470 * condition applies to multiple channels (i.e. don't assume
2471 * that all channels matching my_chann* have the same size...)
2472 */
2473 threshold = (uint64_t) (use_condition->threshold_ratio.value *
2474 (double) buffer_capacity);
2475 }
2476
2477 condition_type = lttng_condition_get_type(condition);
2478 if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
2479 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
2480 threshold, sample->highest_usage);
2481
2482 /*
2483 * The low condition should only be triggered once _all_ of the
2484 * streams in a channel have gone below the "low" threshold.
2485 */
2486 if (sample->highest_usage <= threshold) {
2487 result = true;
2488 }
2489 } else {
2490 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
2491 threshold, sample->highest_usage);
2492
2493 /*
2494 * For high buffer usage scenarios, we want to trigger whenever
2495 * _any_ of the streams has reached the "high" threshold.
2496 */
2497 if (sample->highest_usage >= threshold) {
2498 result = true;
2499 }
2500 }
2501
2502 return result;
2503 }
2504
2505 static
2506 bool evaluate_session_consumed_size_condition(
2507 const struct lttng_condition *condition,
2508 uint64_t session_consumed_size)
2509 {
2510 uint64_t threshold;
2511 const struct lttng_condition_session_consumed_size *size_condition =
2512 container_of(condition,
2513 struct lttng_condition_session_consumed_size,
2514 parent);
2515
2516 threshold = size_condition->consumed_threshold_bytes.value;
2517 DBG("[notification-thread] Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
2518 threshold, session_consumed_size);
2519 return session_consumed_size >= threshold;
2520 }
2521
2522 static
2523 int evaluate_condition(const struct lttng_condition *condition,
2524 struct lttng_evaluation **evaluation,
2525 const struct notification_thread_state *state,
2526 const struct channel_state_sample *previous_sample,
2527 const struct channel_state_sample *latest_sample,
2528 uint64_t previous_session_consumed_total,
2529 uint64_t latest_session_consumed_total,
2530 struct channel_info *channel_info)
2531 {
2532 int ret = 0;
2533 enum lttng_condition_type condition_type;
2534 const bool previous_sample_available = !!previous_sample;
2535 bool previous_sample_result = false;
2536 bool latest_sample_result;
2537
2538 condition_type = lttng_condition_get_type(condition);
2539
2540 switch (condition_type) {
2541 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
2542 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
2543 if (caa_likely(previous_sample_available)) {
2544 previous_sample_result =
2545 evaluate_buffer_usage_condition(condition,
2546 previous_sample, channel_info->capacity);
2547 }
2548 latest_sample_result = evaluate_buffer_usage_condition(
2549 condition, latest_sample,
2550 channel_info->capacity);
2551 break;
2552 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
2553 if (caa_likely(previous_sample_available)) {
2554 previous_sample_result =
2555 evaluate_session_consumed_size_condition(
2556 condition,
2557 previous_session_consumed_total);
2558 }
2559 latest_sample_result =
2560 evaluate_session_consumed_size_condition(
2561 condition,
2562 latest_session_consumed_total);
2563 break;
2564 default:
2565 /* Unknown condition type; internal error. */
2566 abort();
2567 }
2568
2569 if (!latest_sample_result ||
2570 (previous_sample_result == latest_sample_result)) {
2571 /*
2572 * Only trigger on a condition evaluation transition.
2573 *
2574 * NOTE: This edge-triggered logic may not be appropriate for
2575 * future condition types.
2576 */
2577 goto end;
2578 }
2579
2580 if (!evaluation || !latest_sample_result) {
2581 goto end;
2582 }
2583
2584 switch (condition_type) {
2585 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
2586 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
2587 *evaluation = lttng_evaluation_buffer_usage_create(
2588 condition_type,
2589 latest_sample->highest_usage,
2590 channel_info->capacity);
2591 break;
2592 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
2593 *evaluation = lttng_evaluation_session_consumed_size_create(
2594 latest_session_consumed_total);
2595 break;
2596 default:
2597 abort();
2598 }
2599
2600 if (!*evaluation) {
2601 ret = -1;
2602 goto end;
2603 }
2604 end:
2605 return ret;
2606 }
2607
2608 static
2609 int client_enqueue_dropped_notification(struct notification_client *client,
2610 struct notification_thread_state *state)
2611 {
2612 int ret;
2613 struct lttng_notification_channel_message msg = {
2614 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
2615 .size = 0,
2616 };
2617
2618 ret = lttng_dynamic_buffer_append(
2619 &client->communication.outbound.buffer, &msg,
2620 sizeof(msg));
2621 return ret;
2622 }
2623
2624 static
2625 int send_evaluation_to_clients(const struct lttng_trigger *trigger,
2626 const struct lttng_evaluation *evaluation,
2627 struct notification_client_list* client_list,
2628 struct notification_thread_state *state,
2629 uid_t channel_uid, gid_t channel_gid)
2630 {
2631 int ret = 0;
2632 struct lttng_dynamic_buffer msg_buffer;
2633 struct notification_client_list_element *client_list_element, *tmp;
2634 const struct lttng_notification notification = {
2635 .condition = (struct lttng_condition *) lttng_trigger_get_const_condition(trigger),
2636 .evaluation = (struct lttng_evaluation *) evaluation,
2637 };
2638 struct lttng_notification_channel_message msg_header = {
2639 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION,
2640 };
2641
2642 lttng_dynamic_buffer_init(&msg_buffer);
2643
2644 ret = lttng_dynamic_buffer_append(&msg_buffer, &msg_header,
2645 sizeof(msg_header));
2646 if (ret) {
2647 goto end;
2648 }
2649
2650 ret = lttng_notification_serialize(&notification, &msg_buffer);
2651 if (ret) {
2652 ERR("[notification-thread] Failed to serialize notification");
2653 ret = -1;
2654 goto end;
2655 }
2656
2657 /* Update payload size. */
2658 ((struct lttng_notification_channel_message * ) msg_buffer.data)->size =
2659 (uint32_t) (msg_buffer.size - sizeof(msg_header));
2660
2661 cds_list_for_each_entry_safe(client_list_element, tmp,
2662 &client_list->list, node) {
2663 struct notification_client *client =
2664 client_list_element->client;
2665
2666 if (client->uid != channel_uid && client->gid != channel_gid &&
2667 client->uid != 0) {
2668 /* Client is not allowed to monitor this channel. */
2669 DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this channel");
2670 continue;
2671 }
2672
2673 DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
2674 client->socket, msg_buffer.size);
2675 if (client->communication.outbound.buffer.size) {
2676 /*
2677 * Outgoing data is already buffered for this client;
2678 * drop the notification and enqueue a "dropped
2679 * notification" message if this is the first dropped
2680 * notification since the socket spilled-over to the
2681 * queue.
2682 */
2683 DBG("[notification-thread] Dropping notification addressed to client (socket fd = %i)",
2684 client->socket);
2685 if (!client->communication.outbound.dropped_notification) {
2686 client->communication.outbound.dropped_notification = true;
2687 ret = client_enqueue_dropped_notification(
2688 client, state);
2689 if (ret) {
2690 goto end;
2691 }
2692 }
2693 continue;
2694 }
2695
2696 ret = lttng_dynamic_buffer_append_buffer(
2697 &client->communication.outbound.buffer,
2698 &msg_buffer);
2699 if (ret) {
2700 goto end;
2701 }
2702
2703 ret = client_flush_outgoing_queue(client, state);
2704 if (ret) {
2705 goto end;
2706 }
2707 }
2708 ret = 0;
2709 end:
2710 lttng_dynamic_buffer_reset(&msg_buffer);
2711 return ret;
2712 }
2713
2714 int handle_notification_thread_channel_sample(
2715 struct notification_thread_state *state, int pipe,
2716 enum lttng_domain_type domain)
2717 {
2718 int ret = 0;
2719 struct lttcomm_consumer_channel_monitor_msg sample_msg;
2720 struct channel_info *channel_info;
2721 struct cds_lfht_node *node;
2722 struct cds_lfht_iter iter;
2723 struct lttng_channel_trigger_list *trigger_list;
2724 struct lttng_trigger_list_element *trigger_list_element;
2725 bool previous_sample_available = false;
2726 struct channel_state_sample previous_sample, latest_sample;
2727 uint64_t previous_session_consumed_total, latest_session_consumed_total;
2728
2729 /*
2730 * The monitoring pipe only holds messages smaller than PIPE_BUF,
2731 * ensuring that read/write of sampling messages are atomic.
2732 */
2733 ret = lttng_read(pipe, &sample_msg, sizeof(sample_msg));
2734 if (ret != sizeof(sample_msg)) {
2735 ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
2736 pipe);
2737 ret = -1;
2738 goto end;
2739 }
2740
2741 ret = 0;
2742 latest_sample.key.key = sample_msg.key;
2743 latest_sample.key.domain = domain;
2744 latest_sample.highest_usage = sample_msg.highest;
2745 latest_sample.lowest_usage = sample_msg.lowest;
2746 latest_sample.channel_total_consumed = sample_msg.total_consumed;
2747
2748 rcu_read_lock();
2749
2750 /* Retrieve the channel's informations */
2751 cds_lfht_lookup(state->channels_ht,
2752 hash_channel_key(&latest_sample.key),
2753 match_channel_info,
2754 &latest_sample.key,
2755 &iter);
2756 node = cds_lfht_iter_get_node(&iter);
2757 if (caa_unlikely(!node)) {
2758 /*
2759 * Not an error since the consumer can push a sample to the pipe
2760 * and the rest of the session daemon could notify us of the
2761 * channel's destruction before we get a chance to process that
2762 * sample.
2763 */
2764 DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
2765 latest_sample.key.key,
2766 domain == LTTNG_DOMAIN_KERNEL ? "kernel" :
2767 "user space");
2768 goto end_unlock;
2769 }
2770 channel_info = caa_container_of(node, struct channel_info,
2771 channels_ht_node);
2772 DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64", total consumed = %" PRIu64")",
2773 channel_info->name,
2774 latest_sample.key.key,
2775 channel_info->session_info->name,
2776 latest_sample.highest_usage,
2777 latest_sample.lowest_usage,
2778 latest_sample.channel_total_consumed);
2779
2780 previous_session_consumed_total =
2781 channel_info->session_info->consumed_data_size;
2782
2783 /* Retrieve the channel's last sample, if it exists, and update it. */
2784 cds_lfht_lookup(state->channel_state_ht,
2785 hash_channel_key(&latest_sample.key),
2786 match_channel_state_sample,
2787 &latest_sample.key,
2788 &iter);
2789 node = cds_lfht_iter_get_node(&iter);
2790 if (caa_likely(node)) {
2791 struct channel_state_sample *stored_sample;
2792
2793 /* Update the sample stored. */
2794 stored_sample = caa_container_of(node,
2795 struct channel_state_sample,
2796 channel_state_ht_node);
2797
2798 memcpy(&previous_sample, stored_sample,
2799 sizeof(previous_sample));
2800 stored_sample->highest_usage = latest_sample.highest_usage;
2801 stored_sample->lowest_usage = latest_sample.lowest_usage;
2802 stored_sample->channel_total_consumed = latest_sample.channel_total_consumed;
2803 previous_sample_available = true;
2804
2805 latest_session_consumed_total =
2806 previous_session_consumed_total +
2807 (latest_sample.channel_total_consumed - previous_sample.channel_total_consumed);
2808 } else {
2809 /*
2810 * This is the channel's first sample, allocate space for and
2811 * store the new sample.
2812 */
2813 struct channel_state_sample *stored_sample;
2814
2815 stored_sample = zmalloc(sizeof(*stored_sample));
2816 if (!stored_sample) {
2817 ret = -1;
2818 goto end_unlock;
2819 }
2820
2821 memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
2822 cds_lfht_node_init(&stored_sample->channel_state_ht_node);
2823 cds_lfht_add(state->channel_state_ht,
2824 hash_channel_key(&stored_sample->key),
2825 &stored_sample->channel_state_ht_node);
2826
2827 latest_session_consumed_total =
2828 previous_session_consumed_total +
2829 latest_sample.channel_total_consumed;
2830 }
2831
2832 channel_info->session_info->consumed_data_size =
2833 latest_session_consumed_total;
2834
2835 /* Find triggers associated with this channel. */
2836 cds_lfht_lookup(state->channel_triggers_ht,
2837 hash_channel_key(&latest_sample.key),
2838 match_channel_trigger_list,
2839 &latest_sample.key,
2840 &iter);
2841 node = cds_lfht_iter_get_node(&iter);
2842 if (caa_likely(!node)) {
2843 goto end_unlock;
2844 }
2845
2846 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
2847 channel_triggers_ht_node);
2848 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
2849 node) {
2850 const struct lttng_condition *condition;
2851 const struct lttng_action *action;
2852 const struct lttng_trigger *trigger;
2853 struct notification_client_list *client_list;
2854 struct lttng_evaluation *evaluation = NULL;
2855
2856 trigger = trigger_list_element->trigger;
2857 condition = lttng_trigger_get_const_condition(trigger);
2858 assert(condition);
2859 action = lttng_trigger_get_const_action(trigger);
2860
2861 /* Notify actions are the only type currently supported. */
2862 assert(lttng_action_get_type_const(action) ==
2863 LTTNG_ACTION_TYPE_NOTIFY);
2864
2865 /*
2866 * Check if any client is subscribed to the result of this
2867 * evaluation.
2868 */
2869 cds_lfht_lookup(state->notification_trigger_clients_ht,
2870 lttng_condition_hash(condition),
2871 match_client_list,
2872 trigger,
2873 &iter);
2874 node = cds_lfht_iter_get_node(&iter);
2875 assert(node);
2876
2877 client_list = caa_container_of(node,
2878 struct notification_client_list,
2879 notification_trigger_ht_node);
2880 if (cds_list_empty(&client_list->list)) {
2881 /*
2882 * No clients interested in the evaluation's result,
2883 * skip it.
2884 */
2885 continue;
2886 }
2887
2888 ret = evaluate_condition(condition, &evaluation, state,
2889 previous_sample_available ? &previous_sample : NULL,
2890 &latest_sample,
2891 previous_session_consumed_total,
2892 latest_session_consumed_total,
2893 channel_info);
2894 if (caa_unlikely(ret)) {
2895 goto end_unlock;
2896 }
2897
2898 if (caa_likely(!evaluation)) {
2899 continue;
2900 }
2901
2902 /* Dispatch evaluation result to all clients. */
2903 ret = send_evaluation_to_clients(trigger_list_element->trigger,
2904 evaluation, client_list, state,
2905 channel_info->session_info->uid,
2906 channel_info->session_info->gid);
2907 lttng_evaluation_destroy(evaluation);
2908 if (caa_unlikely(ret)) {
2909 goto end_unlock;
2910 }
2911 }
2912 end_unlock:
2913 rcu_read_unlock();
2914 end:
2915 return ret;
2916 }
This page took 0.197753 seconds and 5 git commands to generate.