Add likely/unlikely annotations on channel sample handling path
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.c
1 /*
2 * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18 #define _LGPL_SOURCE
19 #include <urcu.h>
20 #include <urcu/rculfhash.h>
21
22 #include <common/defaults.h>
23 #include <common/error.h>
24 #include <common/futex.h>
25 #include <common/unix.h>
26 #include <common/dynamic-buffer.h>
27 #include <common/hashtable/utils.h>
28 #include <common/sessiond-comm/sessiond-comm.h>
29 #include <common/macros.h>
30 #include <lttng/condition/condition.h>
31 #include <lttng/action/action.h>
32 #include <lttng/notification/notification-internal.h>
33 #include <lttng/condition/condition-internal.h>
34 #include <lttng/condition/buffer-usage-internal.h>
35 #include <lttng/notification/channel-internal.h>
36
37 #include <time.h>
38 #include <unistd.h>
39 #include <assert.h>
40 #include <inttypes.h>
41 #include <fcntl.h>
42
43 #include "notification-thread.h"
44 #include "notification-thread-events.h"
45 #include "notification-thread-commands.h"
46 #include "lttng-sessiond.h"
47 #include "kernel.h"
48
/* Input-side poll mask: readable data plus the error and hang-up events. */
#define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
/* CLIENT_POLL_MASK_IN with LPOLLOUT added. */
#define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
51
/*
 * Element of a list of triggers, such as the one headed by
 * lttng_channel_trigger_list::list.
 */
struct lttng_trigger_list_element {
	struct lttng_trigger *trigger;
	/* Links the element into its list. */
	struct cds_list_head node;
};
56
/*
 * List of the triggers that apply to one channel. Entries are stored in the
 * channel_triggers_ht hash table and matched by channel key and domain.
 */
struct lttng_channel_trigger_list {
	/* Identifies the channel this list belongs to. */
	struct channel_key channel_key;
	/* Head of a list of struct lttng_trigger_list_element. */
	struct cds_list_head list;
	struct cds_lfht_node channel_triggers_ht_node;
};
62
/*
 * Hash table entry wrapping a trigger (see the triggers_ht traversal in
 * handle_notification_thread_command_add_channel()).
 */
struct lttng_trigger_ht_element {
	struct lttng_trigger *trigger;
	struct cds_lfht_node node;
};
67
/*
 * Element of a client's list of subscribed conditions
 * (notification_client::condition_list).
 */
struct lttng_condition_list_element {
	struct lttng_condition *condition;
	/* Links the element into the client's condition_list. */
	struct cds_list_head node;
};
72
/* Element of a notification_client_list: one client interested in a trigger. */
struct notification_client_list_element {
	struct notification_client *client;
	/* Links the element into notification_client_list::list. */
	struct cds_list_head node;
};
77
/*
 * List of the clients subscribed to a trigger's condition. Entries are stored
 * in the notification_trigger_clients_ht hash table and matched by the
 * trigger's condition.
 */
struct notification_client_list {
	struct lttng_trigger *trigger;
	/* Head of a list of struct notification_client_list_element. */
	struct cds_list_head list;
	struct cds_lfht_node notification_trigger_ht_node;
};
83
/* State kept for each client connected to the notification channel. */
struct notification_client {
	/* Client socket; this is the value compared by match_client(). */
	int socket;
	/* Client protocol version. */
	uint8_t major, minor;
	uid_t uid;
	gid_t gid;
	/*
	 * Indicates if the credentials and versions of the client have been
	 * checked.
	 */
	bool validated;
	/*
	 * Conditions to which the client's notification channel is subscribed.
	 * List of struct lttng_condition_list_element. The condition member is
	 * owned by the client.
	 */
	struct cds_list_head condition_list;
	/* Links the client into the client_socket_ht hash table. */
	struct cds_lfht_node client_socket_ht_node;
	struct {
		struct {
			/*
			 * During the reception of a message, the reception
			 * buffers' "size" is set to contain the current
			 * message's complete payload.
			 */
			struct lttng_dynamic_buffer buffer;
			/* Bytes left to receive for the current message. */
			size_t bytes_to_receive;
			/* Type of the message being received. */
			enum lttng_notification_channel_message_type msg_type;
			/*
			 * Indicates whether or not credentials are expected
			 * from the client.
			 */
			bool expect_creds;
			/*
			 * Indicates whether or not credentials were received
			 * from the client.
			 */
			bool creds_received;
			/* Only used during credentials reception. */
			lttng_sock_cred creds;
		} inbound;
		struct {
			/*
			 * Indicates whether or not a notification addressed to
			 * this client was dropped because a command reply was
			 * already buffered.
			 *
			 * A notification is dropped whenever the buffer is not
			 * empty.
			 */
			bool dropped_notification;
			/*
			 * Indicates whether or not a command reply is already
			 * buffered. In this case, it means that the client is
			 * not consuming command replies before emitting a new
			 * one. This could be caused by a protocol error or a
			 * misbehaving/malicious client.
			 */
			bool queued_command_reply;
			/* Data waiting to be sent to the client. */
			struct lttng_dynamic_buffer buffer;
		} outbound;
	} communication;
};
149
/*
 * Usage sample of a channel. Entries are stored in the channel_state_ht hash
 * table and matched by channel key and domain.
 */
struct channel_state_sample {
	struct channel_key key;
	struct cds_lfht_node channel_state_ht_node;
	/* NOTE(review): unit of the usage values is not visible here; confirm. */
	uint64_t highest_usage;
	uint64_t lowest_usage;
};
156
/* Forward declarations of file-local (static) helpers. */
static unsigned long hash_channel_key(struct channel_key *key);
static int evaluate_condition(struct lttng_condition *condition,
		struct lttng_evaluation **evaluation,
		struct notification_thread_state *state,
		struct channel_state_sample *previous_sample,
		struct channel_state_sample *latest_sample,
		uint64_t buffer_capacity);
static
int send_evaluation_to_clients(struct lttng_trigger *trigger,
		struct lttng_evaluation *evaluation,
		struct notification_client_list *client_list,
		struct notification_thread_state *state,
		uid_t channel_uid, gid_t channel_gid);


/* session_info life-cycle and channel bookkeeping helpers. */
static
void session_info_destroy(void *_data);
static
void session_info_get(struct session_info *session_info);
static
void session_info_put(struct session_info *session_info);
static
struct session_info *session_info_create(const char *name,
		uid_t uid, gid_t gid);
static
void session_info_add_channel(struct session_info *session_info,
		struct channel_info *channel_info);
static
void session_info_remove_channel(struct session_info *session_info,
		struct channel_info *channel_info);
187
188 static
189 int match_client(struct cds_lfht_node *node, const void *key)
190 {
191 /* This double-cast is intended to supress pointer-to-cast warning. */
192 int socket = (int) (intptr_t) key;
193 struct notification_client *client;
194
195 client = caa_container_of(node, struct notification_client,
196 client_socket_ht_node);
197
198 return !!(client->socket == socket);
199 }
200
201 static
202 int match_channel_trigger_list(struct cds_lfht_node *node, const void *key)
203 {
204 struct channel_key *channel_key = (struct channel_key *) key;
205 struct lttng_channel_trigger_list *trigger_list;
206
207 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
208 channel_triggers_ht_node);
209
210 return !!((channel_key->key == trigger_list->channel_key.key) &&
211 (channel_key->domain == trigger_list->channel_key.domain));
212 }
213
214 static
215 int match_channel_state_sample(struct cds_lfht_node *node, const void *key)
216 {
217 struct channel_key *channel_key = (struct channel_key *) key;
218 struct channel_state_sample *sample;
219
220 sample = caa_container_of(node, struct channel_state_sample,
221 channel_state_ht_node);
222
223 return !!((channel_key->key == sample->key.key) &&
224 (channel_key->domain == sample->key.domain));
225 }
226
227 static
228 int match_channel_info(struct cds_lfht_node *node, const void *key)
229 {
230 struct channel_key *channel_key = (struct channel_key *) key;
231 struct channel_info *channel_info;
232
233 channel_info = caa_container_of(node, struct channel_info,
234 channels_ht_node);
235
236 return !!((channel_key->key == channel_info->key.key) &&
237 (channel_key->domain == channel_info->key.domain));
238 }
239
240 static
241 int match_condition(struct cds_lfht_node *node, const void *key)
242 {
243 struct lttng_condition *condition_key = (struct lttng_condition *) key;
244 struct lttng_trigger_ht_element *trigger;
245 struct lttng_condition *condition;
246
247 trigger = caa_container_of(node, struct lttng_trigger_ht_element,
248 node);
249 condition = lttng_trigger_get_condition(trigger->trigger);
250 assert(condition);
251
252 return !!lttng_condition_is_equal(condition_key, condition);
253 }
254
255 static
256 int match_client_list(struct cds_lfht_node *node, const void *key)
257 {
258 struct lttng_trigger *trigger_key = (struct lttng_trigger *) key;
259 struct notification_client_list *client_list;
260 struct lttng_condition *condition;
261 struct lttng_condition *condition_key = lttng_trigger_get_condition(
262 trigger_key);
263
264 assert(condition_key);
265
266 client_list = caa_container_of(node, struct notification_client_list,
267 notification_trigger_ht_node);
268 condition = lttng_trigger_get_condition(client_list->trigger);
269
270 return !!lttng_condition_is_equal(condition_key, condition);
271 }
272
273 static
274 int match_client_list_condition(struct cds_lfht_node *node, const void *key)
275 {
276 struct lttng_condition *condition_key = (struct lttng_condition *) key;
277 struct notification_client_list *client_list;
278 struct lttng_condition *condition;
279
280 assert(condition_key);
281
282 client_list = caa_container_of(node, struct notification_client_list,
283 notification_trigger_ht_node);
284 condition = lttng_trigger_get_condition(client_list->trigger);
285
286 return !!lttng_condition_is_equal(condition_key, condition);
287 }
288
289 static
290 unsigned long lttng_condition_buffer_usage_hash(
291 struct lttng_condition *_condition)
292 {
293 unsigned long hash = 0;
294 struct lttng_condition_buffer_usage *condition;
295
296 condition = container_of(_condition,
297 struct lttng_condition_buffer_usage, parent);
298
299 if (condition->session_name) {
300 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
301 }
302 if (condition->channel_name) {
303 hash ^= hash_key_str(condition->channel_name, lttng_ht_seed);
304 }
305 if (condition->domain.set) {
306 hash ^= hash_key_ulong(
307 (void *) condition->domain.type,
308 lttng_ht_seed);
309 }
310 if (condition->threshold_ratio.set) {
311 uint64_t val;
312
313 val = condition->threshold_ratio.value * (double) UINT32_MAX;
314 hash ^= hash_key_u64(&val, lttng_ht_seed);
315 } else if (condition->threshold_bytes.set) {
316 uint64_t val;
317
318 val = condition->threshold_bytes.value;
319 hash ^= hash_key_u64(&val, lttng_ht_seed);
320 }
321 return hash;
322 }
323
324 /*
325 * The lttng_condition hashing code is kept in this file (rather than
326 * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
327 * don't want to link in liblttng-ctl.
328 */
329 static
330 unsigned long lttng_condition_hash(struct lttng_condition *condition)
331 {
332 switch (condition->type) {
333 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
334 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
335 return lttng_condition_buffer_usage_hash(condition);
336 default:
337 ERR("[notification-thread] Unexpected condition type caught");
338 abort();
339 }
340 }
341
342 static
343 unsigned long hash_channel_key(struct channel_key *key)
344 {
345 unsigned long key_hash = hash_key_u64(&key->key, lttng_ht_seed);
346 unsigned long domain_hash = hash_key_ulong(
347 (void *) (unsigned long) key->domain, lttng_ht_seed);
348
349 return key_hash ^ domain_hash;
350 }
351
352 static
353 void channel_info_destroy(struct channel_info *channel_info)
354 {
355 if (!channel_info) {
356 return;
357 }
358
359 if (channel_info->session_info) {
360 session_info_remove_channel(channel_info->session_info,
361 channel_info);
362 session_info_put(channel_info->session_info);
363 }
364 if (channel_info->name) {
365 free(channel_info->name);
366 }
367 free(channel_info);
368 }
369
370 /* Don't call directly, use the ref-counting mechanism. */
371 static
372 void session_info_destroy(void *_data)
373 {
374 struct session_info *session_info = _data;
375
376 assert(session_info);
377 if (session_info->channel_infos_ht) {
378 cds_lfht_destroy(session_info->channel_infos_ht, NULL);
379 }
380 free(session_info->name);
381 free(session_info);
382 }
383
384 static
385 void session_info_get(struct session_info *session_info)
386 {
387 if (!session_info) {
388 return;
389 }
390 lttng_ref_get(&session_info->ref);
391 }
392
393 static
394 void session_info_put(struct session_info *session_info)
395 {
396 if (!session_info) {
397 return;
398 }
399 lttng_ref_put(&session_info->ref);
400 }
401
402 static
403 struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid)
404 {
405 struct session_info *session_info;
406
407 assert(name);
408
409 session_info = zmalloc(sizeof(*session_info));
410 if (!session_info) {
411 goto end;
412 }
413 lttng_ref_init(&session_info->ref, session_info_destroy);
414
415 session_info->channel_infos_ht = cds_lfht_new(DEFAULT_HT_SIZE,
416 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
417 if (!session_info->channel_infos_ht) {
418 goto error;
419 }
420
421 cds_lfht_node_init(&session_info->sessions_ht_node);
422 session_info->name = strdup(name);
423 if (!session_info->name) {
424 goto error;
425 }
426 session_info->uid = uid;
427 session_info->gid = gid;
428 end:
429 return session_info;
430 error:
431 session_info_put(session_info);
432 return NULL;
433 }
434
435 static
436 void session_info_add_channel(struct session_info *session_info,
437 struct channel_info *channel_info)
438 {
439 rcu_read_lock();
440 cds_lfht_add(session_info->channel_infos_ht,
441 hash_channel_key(&channel_info->key),
442 &channel_info->session_info_channels_ht_node);
443 rcu_read_unlock();
444 }
445
446 static
447 void session_info_remove_channel(struct session_info *session_info,
448 struct channel_info *channel_info)
449 {
450 rcu_read_lock();
451 cds_lfht_del(session_info->channel_infos_ht,
452 &channel_info->session_info_channels_ht_node);
453 rcu_read_unlock();
454 }
455
456 static
457 struct channel_info *channel_info_create(const char *channel_name,
458 struct channel_key *channel_key, uint64_t channel_capacity,
459 struct session_info *session_info)
460 {
461 struct channel_info *channel_info = zmalloc(sizeof(*channel_info));
462
463 if (!channel_info) {
464 goto end;
465 }
466
467 cds_lfht_node_init(&channel_info->channels_ht_node);
468 cds_lfht_node_init(&channel_info->session_info_channels_ht_node);
469 memcpy(&channel_info->key, channel_key, sizeof(*channel_key));
470 channel_info->capacity = channel_capacity;
471
472 channel_info->name = strdup(channel_name);
473 if (!channel_info->name) {
474 goto error;
475 }
476
477 /*
478 * Set the references between session and channel infos:
479 * - channel_info holds a strong reference to session_info
480 * - session_info holds a weak reference to channel_info
481 */
482 session_info_get(session_info);
483 session_info_add_channel(session_info, channel_info);
484 channel_info->session_info = session_info;
485 end:
486 return channel_info;
487 error:
488 channel_info_destroy(channel_info);
489 return NULL;
490 }
491
492 /* This function must be called with the RCU read lock held. */
493 static
494 int evaluate_condition_for_client(struct lttng_trigger *trigger,
495 struct lttng_condition *condition,
496 struct notification_client *client,
497 struct notification_thread_state *state)
498 {
499 int ret;
500 struct cds_lfht_iter iter;
501 struct cds_lfht_node *node;
502 struct channel_info *channel_info = NULL;
503 struct channel_key *channel_key = NULL;
504 struct channel_state_sample *last_sample = NULL;
505 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
506 struct lttng_evaluation *evaluation = NULL;
507 struct notification_client_list client_list = { 0 };
508 struct notification_client_list_element client_list_element = { 0 };
509
510 assert(trigger);
511 assert(condition);
512 assert(client);
513 assert(state);
514
515 /* Find the channel associated with the trigger. */
516 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter,
517 channel_trigger_list , channel_triggers_ht_node) {
518 struct lttng_trigger_list_element *element;
519
520 cds_list_for_each_entry(element, &channel_trigger_list->list, node) {
521 struct lttng_condition *current_condition =
522 lttng_trigger_get_condition(
523 element->trigger);
524
525 assert(current_condition);
526 if (!lttng_condition_is_equal(condition,
527 current_condition)) {
528 continue;
529 }
530
531 /* Found the trigger, save the channel key. */
532 channel_key = &channel_trigger_list->channel_key;
533 break;
534 }
535 if (channel_key) {
536 /* The channel key was found stop iteration. */
537 break;
538 }
539 }
540
541 if (!channel_key){
542 /* No channel found; normal exit. */
543 DBG("[notification-thread] No channel associated with newly subscribed-to condition");
544 ret = 0;
545 goto end;
546 }
547
548 /* Fetch channel info for the matching channel. */
549 cds_lfht_lookup(state->channels_ht,
550 hash_channel_key(channel_key),
551 match_channel_info,
552 channel_key,
553 &iter);
554 node = cds_lfht_iter_get_node(&iter);
555 assert(node);
556 channel_info = caa_container_of(node, struct channel_info,
557 channels_ht_node);
558
559 /* Retrieve the channel's last sample, if it exists. */
560 cds_lfht_lookup(state->channel_state_ht,
561 hash_channel_key(channel_key),
562 match_channel_state_sample,
563 channel_key,
564 &iter);
565 node = cds_lfht_iter_get_node(&iter);
566 if (node) {
567 last_sample = caa_container_of(node,
568 struct channel_state_sample,
569 channel_state_ht_node);
570 } else {
571 /* Nothing to evaluate, no sample was ever taken. Normal exit */
572 DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
573 ret = 0;
574 goto end;
575 }
576
577 ret = evaluate_condition(condition, &evaluation, state, NULL,
578 last_sample, channel_info->capacity);
579 if (ret) {
580 WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
581 goto end;
582 }
583
584 if (!evaluation) {
585 /* Evaluation yielded nothing. Normal exit. */
586 DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
587 ret = 0;
588 goto end;
589 }
590
591 /*
592 * Create a temporary client list with the client currently
593 * subscribing.
594 */
595 cds_lfht_node_init(&client_list.notification_trigger_ht_node);
596 CDS_INIT_LIST_HEAD(&client_list.list);
597 client_list.trigger = trigger;
598
599 CDS_INIT_LIST_HEAD(&client_list_element.node);
600 client_list_element.client = client;
601 cds_list_add(&client_list_element.node, &client_list.list);
602
603 /* Send evaluation result to the newly-subscribed client. */
604 DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
605 ret = send_evaluation_to_clients(trigger, evaluation, &client_list,
606 state, channel_info->session_info->uid,
607 channel_info->session_info->gid);
608
609 end:
610 return ret;
611 }
612
613 static
614 int notification_thread_client_subscribe(struct notification_client *client,
615 struct lttng_condition *condition,
616 struct notification_thread_state *state,
617 enum lttng_notification_channel_status *_status)
618 {
619 int ret = 0;
620 struct cds_lfht_iter iter;
621 struct cds_lfht_node *node;
622 struct notification_client_list *client_list;
623 struct lttng_condition_list_element *condition_list_element = NULL;
624 struct notification_client_list_element *client_list_element = NULL;
625 enum lttng_notification_channel_status status =
626 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
627
628 /*
629 * Ensure that the client has not already subscribed to this condition
630 * before.
631 */
632 cds_list_for_each_entry(condition_list_element, &client->condition_list, node) {
633 if (lttng_condition_is_equal(condition_list_element->condition,
634 condition)) {
635 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED;
636 goto end;
637 }
638 }
639
640 condition_list_element = zmalloc(sizeof(*condition_list_element));
641 if (!condition_list_element) {
642 ret = -1;
643 goto error;
644 }
645 client_list_element = zmalloc(sizeof(*client_list_element));
646 if (!client_list_element) {
647 ret = -1;
648 goto error;
649 }
650
651 rcu_read_lock();
652
653 /*
654 * Add the newly-subscribed condition to the client's subscription list.
655 */
656 CDS_INIT_LIST_HEAD(&condition_list_element->node);
657 condition_list_element->condition = condition;
658 cds_list_add(&condition_list_element->node, &client->condition_list);
659
660 cds_lfht_lookup(state->notification_trigger_clients_ht,
661 lttng_condition_hash(condition),
662 match_client_list_condition,
663 condition,
664 &iter);
665 node = cds_lfht_iter_get_node(&iter);
666 if (!node) {
667 /*
668 * No notification-emiting trigger registered with this
669 * condition. We don't evaluate the condition right away
670 * since this trigger is not registered yet.
671 */
672 free(client_list_element);
673 goto end_unlock;
674 }
675
676 client_list = caa_container_of(node, struct notification_client_list,
677 notification_trigger_ht_node);
678 /*
679 * The condition to which the client just subscribed is evaluated
680 * at this point so that conditions that are already TRUE result
681 * in a notification being sent out.
682 */
683 if (evaluate_condition_for_client(client_list->trigger, condition,
684 client, state)) {
685 WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
686 ret = -1;
687 goto end_unlock;
688 }
689
690 /*
691 * Add the client to the list of clients interested in a given trigger
692 * if a "notification" trigger with a corresponding condition was
693 * added prior.
694 */
695 client_list_element->client = client;
696 CDS_INIT_LIST_HEAD(&client_list_element->node);
697 cds_list_add(&client_list_element->node, &client_list->list);
698 end_unlock:
699 rcu_read_unlock();
700 end:
701 if (_status) {
702 *_status = status;
703 }
704 return ret;
705 error:
706 free(condition_list_element);
707 free(client_list_element);
708 return ret;
709 }
710
711 static
712 int notification_thread_client_unsubscribe(
713 struct notification_client *client,
714 struct lttng_condition *condition,
715 struct notification_thread_state *state,
716 enum lttng_notification_channel_status *_status)
717 {
718 struct cds_lfht_iter iter;
719 struct cds_lfht_node *node;
720 struct notification_client_list *client_list;
721 struct lttng_condition_list_element *condition_list_element,
722 *condition_tmp;
723 struct notification_client_list_element *client_list_element,
724 *client_tmp;
725 bool condition_found = false;
726 enum lttng_notification_channel_status status =
727 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
728
729 /* Remove the condition from the client's condition list. */
730 cds_list_for_each_entry_safe(condition_list_element, condition_tmp,
731 &client->condition_list, node) {
732 if (!lttng_condition_is_equal(condition_list_element->condition,
733 condition)) {
734 continue;
735 }
736
737 cds_list_del(&condition_list_element->node);
738 /*
739 * The caller may be iterating on the client's conditions to
740 * tear down a client's connection. In this case, the condition
741 * will be destroyed at the end.
742 */
743 if (condition != condition_list_element->condition) {
744 lttng_condition_destroy(
745 condition_list_element->condition);
746 }
747 free(condition_list_element);
748 condition_found = true;
749 break;
750 }
751
752 if (!condition_found) {
753 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION;
754 goto end;
755 }
756
757 /*
758 * Remove the client from the list of clients interested the trigger
759 * matching the condition.
760 */
761 rcu_read_lock();
762 cds_lfht_lookup(state->notification_trigger_clients_ht,
763 lttng_condition_hash(condition),
764 match_client_list_condition,
765 condition,
766 &iter);
767 node = cds_lfht_iter_get_node(&iter);
768 if (!node) {
769 goto end_unlock;
770 }
771
772 client_list = caa_container_of(node, struct notification_client_list,
773 notification_trigger_ht_node);
774 cds_list_for_each_entry_safe(client_list_element, client_tmp,
775 &client_list->list, node) {
776 if (client_list_element->client->socket != client->socket) {
777 continue;
778 }
779 cds_list_del(&client_list_element->node);
780 free(client_list_element);
781 break;
782 }
783 end_unlock:
784 rcu_read_unlock();
785 end:
786 lttng_condition_destroy(condition);
787 if (_status) {
788 *_status = status;
789 }
790 return 0;
791 }
792
793 static
794 void notification_client_destroy(struct notification_client *client,
795 struct notification_thread_state *state)
796 {
797 struct lttng_condition_list_element *condition_list_element, *tmp;
798
799 if (!client) {
800 return;
801 }
802
803 /* Release all conditions to which the client was subscribed. */
804 cds_list_for_each_entry_safe(condition_list_element, tmp,
805 &client->condition_list, node) {
806 (void) notification_thread_client_unsubscribe(client,
807 condition_list_element->condition, state, NULL);
808 }
809
810 if (client->socket >= 0) {
811 (void) lttcomm_close_unix_sock(client->socket);
812 }
813 lttng_dynamic_buffer_reset(&client->communication.inbound.buffer);
814 lttng_dynamic_buffer_reset(&client->communication.outbound.buffer);
815 free(client);
816 }
817
818 /*
819 * Call with rcu_read_lock held (and hold for the lifetime of the returned
820 * client pointer).
821 */
822 static
823 struct notification_client *get_client_from_socket(int socket,
824 struct notification_thread_state *state)
825 {
826 struct cds_lfht_iter iter;
827 struct cds_lfht_node *node;
828 struct notification_client *client = NULL;
829
830 cds_lfht_lookup(state->client_socket_ht,
831 hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed),
832 match_client,
833 (void *) (unsigned long) socket,
834 &iter);
835 node = cds_lfht_iter_get_node(&iter);
836 if (!node) {
837 goto end;
838 }
839
840 client = caa_container_of(node, struct notification_client,
841 client_socket_ht_node);
842 end:
843 return client;
844 }
845
846 static
847 bool trigger_applies_to_channel(struct lttng_trigger *trigger,
848 struct channel_info *channel_info)
849 {
850 enum lttng_condition_status status;
851 struct lttng_condition *condition;
852 const char *trigger_session_name = NULL;
853 const char *trigger_channel_name = NULL;
854 enum lttng_domain_type trigger_domain;
855
856 condition = lttng_trigger_get_condition(trigger);
857 if (!condition) {
858 goto fail;
859 }
860
861 switch (lttng_condition_get_type(condition)) {
862 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
863 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
864 break;
865 default:
866 goto fail;
867 }
868
869 status = lttng_condition_buffer_usage_get_domain_type(condition,
870 &trigger_domain);
871 assert(status == LTTNG_CONDITION_STATUS_OK);
872 if (channel_info->key.domain != trigger_domain) {
873 goto fail;
874 }
875
876 status = lttng_condition_buffer_usage_get_session_name(
877 condition, &trigger_session_name);
878 assert((status == LTTNG_CONDITION_STATUS_OK) && trigger_session_name);
879
880 status = lttng_condition_buffer_usage_get_channel_name(
881 condition, &trigger_channel_name);
882 assert((status == LTTNG_CONDITION_STATUS_OK) && trigger_channel_name);
883
884 if (strcmp(channel_info->session_info->name, trigger_session_name)) {
885 goto fail;
886 }
887 if (strcmp(channel_info->name, trigger_channel_name)) {
888 goto fail;
889 }
890
891 return true;
892 fail:
893 return false;
894 }
895
896 static
897 bool trigger_applies_to_client(struct lttng_trigger *trigger,
898 struct notification_client *client)
899 {
900 bool applies = false;
901 struct lttng_condition_list_element *condition_list_element;
902
903 cds_list_for_each_entry(condition_list_element, &client->condition_list,
904 node) {
905 applies = lttng_condition_is_equal(
906 condition_list_element->condition,
907 lttng_trigger_get_condition(trigger));
908 if (applies) {
909 break;
910 }
911 }
912 return applies;
913 }
914
915 static
916 int match_session(struct cds_lfht_node *node, const void *key)
917 {
918 const char *name = key;
919 struct session_info *session_info = caa_container_of(
920 node, struct session_info, sessions_ht_node);
921
922 return !strcmp(session_info->name, name);
923 }
924
925 static
926 struct session_info *find_or_create_session_info(
927 struct notification_thread_state *state,
928 const char *name, uid_t uid, gid_t gid)
929 {
930 struct session_info *session = NULL;
931 struct cds_lfht_node *node;
932 struct cds_lfht_iter iter;
933
934 rcu_read_lock();
935 cds_lfht_lookup(state->sessions_ht,
936 hash_key_str(name, lttng_ht_seed),
937 match_session,
938 name,
939 &iter);
940 node = cds_lfht_iter_get_node(&iter);
941 if (node) {
942 DBG("[notification-thread] Found session info of session \"%s\" (uid = %i, gid = %i)",
943 name, uid, gid);
944 session = caa_container_of(node, struct session_info,
945 sessions_ht_node);
946 assert(session->uid == uid);
947 assert(session->gid == gid);
948 goto end;
949 }
950
951 session = session_info_create(name, uid, gid);
952 if (!session) {
953 ERR("[notification-thread] Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)",
954 name, uid, gid);
955 goto end;
956 }
957 end:
958 rcu_read_unlock();
959 return session;
960 }
961
962 static
963 int handle_notification_thread_command_add_channel(
964 struct notification_thread_state *state,
965 const char *session_name, uid_t session_uid, gid_t session_gid,
966 const char *channel_name, enum lttng_domain_type channel_domain,
967 uint64_t channel_key_int, uint64_t channel_capacity,
968 enum lttng_error_code *cmd_result)
969 {
970 struct cds_list_head trigger_list;
971 struct channel_info *new_channel_info = NULL;
972 struct channel_key channel_key = {
973 .key = channel_key_int,
974 .domain = channel_domain,
975 };
976 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
977 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
978 int trigger_count = 0;
979 struct cds_lfht_iter iter;
980 struct session_info *session_info = NULL;
981
982 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
983 channel_name, session_name, channel_key_int,
984 channel_domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
985
986 CDS_INIT_LIST_HEAD(&trigger_list);
987
988 session_info = find_or_create_session_info(state, session_name,
989 session_uid, session_gid);
990 if (!session_info) {
991 /* Allocation error or an internal error occured. */
992 goto error;
993 }
994
995 new_channel_info = channel_info_create(channel_name, &channel_key,
996 channel_capacity, session_info);
997 if (!new_channel_info) {
998 goto error;
999 }
1000
1001 /* Build a list of all triggers applying to the new channel. */
1002 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1003 node) {
1004 struct lttng_trigger_list_element *new_element;
1005
1006 if (!trigger_applies_to_channel(trigger_ht_element->trigger,
1007 new_channel_info)) {
1008 continue;
1009 }
1010
1011 new_element = zmalloc(sizeof(*new_element));
1012 if (!new_element) {
1013 goto error;
1014 }
1015 CDS_INIT_LIST_HEAD(&new_element->node);
1016 new_element->trigger = trigger_ht_element->trigger;
1017 cds_list_add(&new_element->node, &trigger_list);
1018 trigger_count++;
1019 }
1020
1021 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
1022 trigger_count);
1023 channel_trigger_list = zmalloc(sizeof(*channel_trigger_list));
1024 if (!channel_trigger_list) {
1025 goto error;
1026 }
1027 channel_trigger_list->channel_key = new_channel_info->key;
1028 CDS_INIT_LIST_HEAD(&channel_trigger_list->list);
1029 cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
1030 cds_list_splice(&trigger_list, &channel_trigger_list->list);
1031
1032 rcu_read_lock();
1033 /* Add channel to the channel_ht which owns the channel_infos. */
1034 cds_lfht_add(state->channels_ht,
1035 hash_channel_key(&new_channel_info->key),
1036 &new_channel_info->channels_ht_node);
1037 /*
1038 * Add the list of triggers associated with this channel to the
1039 * channel_triggers_ht.
1040 */
1041 cds_lfht_add(state->channel_triggers_ht,
1042 hash_channel_key(&new_channel_info->key),
1043 &channel_trigger_list->channel_triggers_ht_node);
1044 rcu_read_unlock();
1045 *cmd_result = LTTNG_OK;
1046 return 0;
1047 error:
1048 channel_info_destroy(new_channel_info);
1049 session_info_put(session_info);
1050 return 1;
1051 }
1052
1053 static
1054 int handle_notification_thread_command_remove_channel(
1055 struct notification_thread_state *state,
1056 uint64_t channel_key, enum lttng_domain_type domain,
1057 enum lttng_error_code *cmd_result)
1058 {
1059 struct cds_lfht_node *node;
1060 struct cds_lfht_iter iter;
1061 struct lttng_channel_trigger_list *trigger_list;
1062 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1063 struct channel_key key = { .key = channel_key, .domain = domain };
1064 struct channel_info *channel_info;
1065
1066 DBG("[notification-thread] Removing channel key = %" PRIu64 " in %s domain",
1067 channel_key, domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
1068
1069 rcu_read_lock();
1070
1071 cds_lfht_lookup(state->channel_triggers_ht,
1072 hash_channel_key(&key),
1073 match_channel_trigger_list,
1074 &key,
1075 &iter);
1076 node = cds_lfht_iter_get_node(&iter);
1077 /*
1078 * There is a severe internal error if we are being asked to remove a
1079 * channel that doesn't exist.
1080 */
1081 if (!node) {
1082 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
1083 goto end;
1084 }
1085
1086 /* Free the list of triggers associated with this channel. */
1087 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
1088 channel_triggers_ht_node);
1089 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1090 &trigger_list->list, node) {
1091 cds_list_del(&trigger_list_element->node);
1092 free(trigger_list_element);
1093 }
1094 cds_lfht_del(state->channel_triggers_ht, node);
1095 free(trigger_list);
1096
1097 /* Free sampled channel state. */
1098 cds_lfht_lookup(state->channel_state_ht,
1099 hash_channel_key(&key),
1100 match_channel_state_sample,
1101 &key,
1102 &iter);
1103 node = cds_lfht_iter_get_node(&iter);
1104 /*
1105 * This is expected to be NULL if the channel is destroyed before we
1106 * received a sample.
1107 */
1108 if (node) {
1109 struct channel_state_sample *sample = caa_container_of(node,
1110 struct channel_state_sample,
1111 channel_state_ht_node);
1112
1113 cds_lfht_del(state->channel_state_ht, node);
1114 free(sample);
1115 }
1116
1117 /* Remove the channel from the channels_ht and free it. */
1118 cds_lfht_lookup(state->channels_ht,
1119 hash_channel_key(&key),
1120 match_channel_info,
1121 &key,
1122 &iter);
1123 node = cds_lfht_iter_get_node(&iter);
1124 assert(node);
1125 channel_info = caa_container_of(node, struct channel_info,
1126 channels_ht_node);
1127 cds_lfht_del(state->channels_ht, node);
1128 channel_info_destroy(channel_info);
1129 end:
1130 rcu_read_unlock();
1131 *cmd_result = LTTNG_OK;
1132 return 0;
1133 }
1134
1135 static
1136 int condition_is_supported(struct lttng_condition *condition)
1137 {
1138 int ret;
1139
1140 switch (lttng_condition_get_type(condition)) {
1141 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1142 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1143 {
1144 enum lttng_domain_type domain;
1145
1146 ret = lttng_condition_buffer_usage_get_domain_type(condition,
1147 &domain);
1148 if (ret) {
1149 ret = -1;
1150 goto end;
1151 }
1152
1153 if (domain != LTTNG_DOMAIN_KERNEL) {
1154 ret = 1;
1155 goto end;
1156 }
1157
1158 /*
1159 * Older kernel tracers don't expose the API to monitor their
1160 * buffers. Therefore, we reject triggers that require that
1161 * mechanism to be available to be evaluated.
1162 */
1163 ret = kernel_supports_ring_buffer_snapshot_sample_positions(
1164 kernel_tracer_fd);
1165 break;
1166 }
1167 default:
1168 ret = 1;
1169 }
1170 end:
1171 return ret;
1172 }
1173
1174 /*
1175 * FIXME A client's credentials are not checked when registering a trigger, nor
1176 * are they stored alongside with the trigger.
1177 *
1178 * The effects of this are benign since:
1179 * - The client will succeed in registering the trigger, as it is valid,
1180 * - The trigger will, internally, be bound to the channel,
1181 * - The notifications will not be sent since the client's credentials
1182 * are checked against the channel at that moment.
1183 *
1184 * If this function returns a non-zero value, it means something is
1185 * fundamentally broken and the whole subsystem/thread will be torn down.
1186 *
1187 * If a non-fatal error occurs, just set the cmd_result to the appropriate
1188 * error code.
1189 */
1190 static
1191 int handle_notification_thread_command_register_trigger(
1192 struct notification_thread_state *state,
1193 struct lttng_trigger *trigger,
1194 enum lttng_error_code *cmd_result)
1195 {
1196 int ret = 0;
1197 struct lttng_condition *condition;
1198 struct notification_client *client;
1199 struct notification_client_list *client_list = NULL;
1200 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1201 struct notification_client_list_element *client_list_element, *tmp;
1202 struct cds_lfht_node *node;
1203 struct cds_lfht_iter iter;
1204 struct channel_info *channel;
1205 bool free_trigger = true;
1206
1207 rcu_read_lock();
1208
1209 condition = lttng_trigger_get_condition(trigger);
1210 assert(condition);
1211
1212 ret = condition_is_supported(condition);
1213 if (ret < 0) {
1214 goto error;
1215 } else if (ret == 0) {
1216 *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
1217 goto error;
1218 } else {
1219 /* Feature is supported, continue. */
1220 ret = 0;
1221 }
1222
1223 trigger_ht_element = zmalloc(sizeof(*trigger_ht_element));
1224 if (!trigger_ht_element) {
1225 ret = -1;
1226 goto error;
1227 }
1228
1229 /* Add trigger to the trigger_ht. */
1230 cds_lfht_node_init(&trigger_ht_element->node);
1231 trigger_ht_element->trigger = trigger;
1232
1233 node = cds_lfht_add_unique(state->triggers_ht,
1234 lttng_condition_hash(condition),
1235 match_condition,
1236 condition,
1237 &trigger_ht_element->node);
1238 if (node != &trigger_ht_element->node) {
1239 /* Not a fatal error, simply report it to the client. */
1240 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
1241 goto error_free_ht_element;
1242 }
1243
1244 /*
1245 * Ownership of the trigger and of its wrapper was transfered to
1246 * the triggers_ht.
1247 */
1248 trigger_ht_element = NULL;
1249 free_trigger = false;
1250
1251 /*
1252 * The rest only applies to triggers that have a "notify" action.
1253 * It is not skipped as this is the only action type currently
1254 * supported.
1255 */
1256 client_list = zmalloc(sizeof(*client_list));
1257 if (!client_list) {
1258 ret = -1;
1259 goto error_free_ht_element;
1260 }
1261 cds_lfht_node_init(&client_list->notification_trigger_ht_node);
1262 CDS_INIT_LIST_HEAD(&client_list->list);
1263 client_list->trigger = trigger;
1264
1265 /* Build a list of clients to which this new trigger applies. */
1266 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
1267 client_socket_ht_node) {
1268 if (!trigger_applies_to_client(trigger, client)) {
1269 continue;
1270 }
1271
1272 client_list_element = zmalloc(sizeof(*client_list_element));
1273 if (!client_list_element) {
1274 ret = -1;
1275 goto error_free_client_list;
1276 }
1277 CDS_INIT_LIST_HEAD(&client_list_element->node);
1278 client_list_element->client = client;
1279 cds_list_add(&client_list_element->node, &client_list->list);
1280 }
1281
1282 cds_lfht_add(state->notification_trigger_clients_ht,
1283 lttng_condition_hash(condition),
1284 &client_list->notification_trigger_ht_node);
1285
1286 /*
1287 * Add the trigger to list of triggers bound to the channels currently
1288 * known.
1289 */
1290 cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
1291 channels_ht_node) {
1292 struct lttng_trigger_list_element *trigger_list_element;
1293 struct lttng_channel_trigger_list *trigger_list;
1294
1295 if (!trigger_applies_to_channel(trigger, channel)) {
1296 continue;
1297 }
1298
1299 cds_lfht_lookup(state->channel_triggers_ht,
1300 hash_channel_key(&channel->key),
1301 match_channel_trigger_list,
1302 &channel->key,
1303 &iter);
1304 node = cds_lfht_iter_get_node(&iter);
1305 assert(node);
1306 trigger_list = caa_container_of(node,
1307 struct lttng_channel_trigger_list,
1308 channel_triggers_ht_node);
1309
1310 trigger_list_element = zmalloc(sizeof(*trigger_list_element));
1311 if (!trigger_list_element) {
1312 ret = -1;
1313 goto error_free_client_list;
1314 }
1315 CDS_INIT_LIST_HEAD(&trigger_list_element->node);
1316 trigger_list_element->trigger = trigger;
1317 cds_list_add(&trigger_list_element->node, &trigger_list->list);
1318
1319 /* A trigger can only apply to one channel. */
1320 break;
1321 }
1322
1323 /*
1324 * Since there is nothing preventing clients from subscribing to a
1325 * condition before the corresponding trigger is registered, we have
1326 * to evaluate this new condition right away.
1327 *
1328 * At some point, we were waiting for the next "evaluation" (e.g. on
1329 * reception of a channel sample) to evaluate this new condition, but
1330 * that was broken.
1331 *
1332 * The reason it was broken is that waiting for the next sample
1333 * does not allow us to properly handle transitions for edge-triggered
1334 * conditions.
1335 *
1336 * Consider this example: when we handle a new channel sample, we
1337 * evaluate each conditions twice: once with the previous state, and
1338 * again with the newest state. We then use those two results to
1339 * determine whether a state change happened: a condition was false and
1340 * became true. If a state change happened, we have to notify clients.
1341 *
1342 * Now, if a client subscribes to a given notification and registers
1343 * a trigger *after* that subscription, we have to make sure the
1344 * condition is evaluated at this point while considering only the
1345 * current state. Otherwise, the next evaluation cycle may only see
1346 * that the evaluations remain the same (true for samples n-1 and n) and
1347 * the client will never know that the condition has been met.
1348 */
1349 cds_list_for_each_entry_safe(client_list_element, tmp,
1350 &client_list->list, node) {
1351 ret = evaluate_condition_for_client(trigger, condition,
1352 client_list_element->client, state);
1353 if (ret) {
1354 goto error_free_client_list;
1355 }
1356 }
1357
1358 /*
1359 * Client list ownership transferred to the
1360 * notification_trigger_clients_ht.
1361 */
1362 client_list = NULL;
1363
1364 *cmd_result = LTTNG_OK;
1365 error_free_client_list:
1366 if (client_list) {
1367 cds_list_for_each_entry_safe(client_list_element, tmp,
1368 &client_list->list, node) {
1369 free(client_list_element);
1370 }
1371 free(client_list);
1372 }
1373 error_free_ht_element:
1374 free(trigger_ht_element);
1375 error:
1376 if (free_trigger) {
1377 struct lttng_action *action = lttng_trigger_get_action(trigger);
1378
1379 lttng_condition_destroy(condition);
1380 lttng_action_destroy(action);
1381 lttng_trigger_destroy(trigger);
1382 }
1383 rcu_read_unlock();
1384 return ret;
1385 }
1386
1387 static
1388 int handle_notification_thread_command_unregister_trigger(
1389 struct notification_thread_state *state,
1390 struct lttng_trigger *trigger,
1391 enum lttng_error_code *_cmd_reply)
1392 {
1393 struct cds_lfht_iter iter;
1394 struct cds_lfht_node *node, *triggers_ht_node;
1395 struct lttng_channel_trigger_list *trigger_list;
1396 struct notification_client_list *client_list;
1397 struct notification_client_list_element *client_list_element, *tmp;
1398 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1399 struct lttng_condition *condition = lttng_trigger_get_condition(
1400 trigger);
1401 struct lttng_action *action;
1402 enum lttng_error_code cmd_reply;
1403
1404 rcu_read_lock();
1405
1406 cds_lfht_lookup(state->triggers_ht,
1407 lttng_condition_hash(condition),
1408 match_condition,
1409 condition,
1410 &iter);
1411 triggers_ht_node = cds_lfht_iter_get_node(&iter);
1412 if (!triggers_ht_node) {
1413 cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND;
1414 goto end;
1415 } else {
1416 cmd_reply = LTTNG_OK;
1417 }
1418
1419 /* Remove trigger from channel_triggers_ht. */
1420 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
1421 channel_triggers_ht_node) {
1422 struct lttng_trigger_list_element *trigger_element, *tmp;
1423
1424 cds_list_for_each_entry_safe(trigger_element, tmp,
1425 &trigger_list->list, node) {
1426 struct lttng_condition *current_condition =
1427 lttng_trigger_get_condition(
1428 trigger_element->trigger);
1429
1430 assert(current_condition);
1431 if (!lttng_condition_is_equal(condition,
1432 current_condition)) {
1433 continue;
1434 }
1435
1436 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
1437 cds_list_del(&trigger_element->node);
1438 /* A trigger can only appear once per channel */
1439 break;
1440 }
1441 }
1442
1443 /*
1444 * Remove and release the client list from
1445 * notification_trigger_clients_ht.
1446 */
1447 cds_lfht_lookup(state->notification_trigger_clients_ht,
1448 lttng_condition_hash(condition),
1449 match_client_list,
1450 trigger,
1451 &iter);
1452 node = cds_lfht_iter_get_node(&iter);
1453 assert(node);
1454 client_list = caa_container_of(node, struct notification_client_list,
1455 notification_trigger_ht_node);
1456 cds_list_for_each_entry_safe(client_list_element, tmp,
1457 &client_list->list, node) {
1458 free(client_list_element);
1459 }
1460 cds_lfht_del(state->notification_trigger_clients_ht, node);
1461 free(client_list);
1462
1463 /* Remove trigger from triggers_ht. */
1464 trigger_ht_element = caa_container_of(triggers_ht_node,
1465 struct lttng_trigger_ht_element, node);
1466 cds_lfht_del(state->triggers_ht, triggers_ht_node);
1467
1468 condition = lttng_trigger_get_condition(trigger_ht_element->trigger);
1469 lttng_condition_destroy(condition);
1470 action = lttng_trigger_get_action(trigger_ht_element->trigger);
1471 lttng_action_destroy(action);
1472 lttng_trigger_destroy(trigger_ht_element->trigger);
1473 free(trigger_ht_element);
1474 end:
1475 rcu_read_unlock();
1476 if (_cmd_reply) {
1477 *_cmd_reply = cmd_reply;
1478 }
1479 return 0;
1480 }
1481
1482 /* Returns 0 on success, 1 on exit requested, negative value on error. */
1483 int handle_notification_thread_command(
1484 struct notification_thread_handle *handle,
1485 struct notification_thread_state *state)
1486 {
1487 int ret;
1488 uint64_t counter;
1489 struct notification_thread_command *cmd;
1490
1491 /* Read the event pipe to put it back into a quiescent state. */
1492 ret = read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter,
1493 sizeof(counter));
1494 if (ret == -1) {
1495 goto error;
1496 }
1497
1498 pthread_mutex_lock(&handle->cmd_queue.lock);
1499 cmd = cds_list_first_entry(&handle->cmd_queue.list,
1500 struct notification_thread_command, cmd_list_node);
1501 switch (cmd->type) {
1502 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
1503 DBG("[notification-thread] Received register trigger command");
1504 ret = handle_notification_thread_command_register_trigger(
1505 state, cmd->parameters.trigger,
1506 &cmd->reply_code);
1507 break;
1508 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
1509 DBG("[notification-thread] Received unregister trigger command");
1510 ret = handle_notification_thread_command_unregister_trigger(
1511 state, cmd->parameters.trigger,
1512 &cmd->reply_code);
1513 break;
1514 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
1515 DBG("[notification-thread] Received add channel command");
1516 ret = handle_notification_thread_command_add_channel(
1517 state,
1518 cmd->parameters.add_channel.session.name,
1519 cmd->parameters.add_channel.session.uid,
1520 cmd->parameters.add_channel.session.gid,
1521 cmd->parameters.add_channel.channel.name,
1522 cmd->parameters.add_channel.channel.domain,
1523 cmd->parameters.add_channel.channel.key,
1524 cmd->parameters.add_channel.channel.capacity,
1525 &cmd->reply_code);
1526 break;
1527 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
1528 DBG("[notification-thread] Received remove channel command");
1529 ret = handle_notification_thread_command_remove_channel(
1530 state, cmd->parameters.remove_channel.key,
1531 cmd->parameters.remove_channel.domain,
1532 &cmd->reply_code);
1533 break;
1534 case NOTIFICATION_COMMAND_TYPE_QUIT:
1535 DBG("[notification-thread] Received quit command");
1536 cmd->reply_code = LTTNG_OK;
1537 ret = 1;
1538 goto end;
1539 default:
1540 ERR("[notification-thread] Unknown internal command received");
1541 goto error_unlock;
1542 }
1543
1544 if (ret) {
1545 goto error_unlock;
1546 }
1547 end:
1548 cds_list_del(&cmd->cmd_list_node);
1549 lttng_waiter_wake_up(&cmd->reply_waiter);
1550 pthread_mutex_unlock(&handle->cmd_queue.lock);
1551 return ret;
1552 error_unlock:
1553 /* Wake-up and return a fatal error to the calling thread. */
1554 lttng_waiter_wake_up(&cmd->reply_waiter);
1555 pthread_mutex_unlock(&handle->cmd_queue.lock);
1556 cmd->reply_code = LTTNG_ERR_FATAL;
1557 error:
1558 /* Indicate a fatal error to the caller. */
1559 return -1;
1560 }
1561
1562 static
1563 unsigned long hash_client_socket(int socket)
1564 {
1565 return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed);
1566 }
1567
/*
 * Add O_NONBLOCK to the file status flags of `socket`.
 *
 * Returns the result of the last fcntl() call performed: -1 if either the
 * flags could not be read or could not be updated.
 */
static
int socket_set_non_blocking(int socket)
{
	int status;
	const int current_flags = fcntl(socket, F_GETFL, 0);

	if (current_flags == -1) {
		PERROR("fcntl get socket flags");
		return -1;
	}

	/* Preserve the flags already set on the descriptor. */
	status = fcntl(socket, F_SETFL, current_flags | O_NONBLOCK);
	if (status == -1) {
		PERROR("fcntl set O_NONBLOCK socket flag");
	} else {
		DBG("Client socket (fd = %i) set as non-blocking", socket);
	}
	return status;
}
1590
1591 static
1592 int client_reset_inbound_state(struct notification_client *client)
1593 {
1594 int ret;
1595
1596 ret = lttng_dynamic_buffer_set_size(
1597 &client->communication.inbound.buffer, 0);
1598 assert(!ret);
1599
1600 client->communication.inbound.bytes_to_receive =
1601 sizeof(struct lttng_notification_channel_message);
1602 client->communication.inbound.msg_type =
1603 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN;
1604 LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1);
1605 LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1);
1606 ret = lttng_dynamic_buffer_set_size(
1607 &client->communication.inbound.buffer,
1608 client->communication.inbound.bytes_to_receive);
1609 return ret;
1610 }
1611
1612 int handle_notification_thread_client_connect(
1613 struct notification_thread_state *state)
1614 {
1615 int ret;
1616 struct notification_client *client;
1617
1618 DBG("[notification-thread] Handling new notification channel client connection");
1619
1620 client = zmalloc(sizeof(*client));
1621 if (!client) {
1622 /* Fatal error. */
1623 ret = -1;
1624 goto error;
1625 }
1626 CDS_INIT_LIST_HEAD(&client->condition_list);
1627 lttng_dynamic_buffer_init(&client->communication.inbound.buffer);
1628 lttng_dynamic_buffer_init(&client->communication.outbound.buffer);
1629 client->communication.inbound.expect_creds = true;
1630 ret = client_reset_inbound_state(client);
1631 if (ret) {
1632 ERR("[notification-thread] Failed to reset client communication's inbound state");
1633 ret = 0;
1634 goto error;
1635 }
1636
1637 ret = lttcomm_accept_unix_sock(state->notification_channel_socket);
1638 if (ret < 0) {
1639 ERR("[notification-thread] Failed to accept new notification channel client connection");
1640 ret = 0;
1641 goto error;
1642 }
1643
1644 client->socket = ret;
1645
1646 ret = socket_set_non_blocking(client->socket);
1647 if (ret) {
1648 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
1649 goto error;
1650 }
1651
1652 ret = lttcomm_setsockopt_creds_unix_sock(client->socket);
1653 if (ret < 0) {
1654 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
1655 ret = 0;
1656 goto error;
1657 }
1658
1659 ret = lttng_poll_add(&state->events, client->socket,
1660 LPOLLIN | LPOLLERR |
1661 LPOLLHUP | LPOLLRDHUP);
1662 if (ret < 0) {
1663 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
1664 ret = 0;
1665 goto error;
1666 }
1667 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
1668 client->socket);
1669
1670 rcu_read_lock();
1671 cds_lfht_add(state->client_socket_ht,
1672 hash_client_socket(client->socket),
1673 &client->client_socket_ht_node);
1674 rcu_read_unlock();
1675
1676 return ret;
1677 error:
1678 notification_client_destroy(client, state);
1679 return ret;
1680 }
1681
1682 int handle_notification_thread_client_disconnect(
1683 int client_socket,
1684 struct notification_thread_state *state)
1685 {
1686 int ret = 0;
1687 struct notification_client *client;
1688
1689 rcu_read_lock();
1690 DBG("[notification-thread] Closing client connection (socket fd = %i)",
1691 client_socket);
1692 client = get_client_from_socket(client_socket, state);
1693 if (!client) {
1694 /* Internal state corruption, fatal error. */
1695 ERR("[notification-thread] Unable to find client (socket fd = %i)",
1696 client_socket);
1697 ret = -1;
1698 goto end;
1699 }
1700
1701 ret = lttng_poll_del(&state->events, client_socket);
1702 if (ret) {
1703 ERR("[notification-thread] Failed to remove client socket from poll set");
1704 }
1705 cds_lfht_del(state->client_socket_ht,
1706 &client->client_socket_ht_node);
1707 notification_client_destroy(client, state);
1708 end:
1709 rcu_read_unlock();
1710 return ret;
1711 }
1712
1713 int handle_notification_thread_client_disconnect_all(
1714 struct notification_thread_state *state)
1715 {
1716 struct cds_lfht_iter iter;
1717 struct notification_client *client;
1718 bool error_encoutered = false;
1719
1720 rcu_read_lock();
1721 DBG("[notification-thread] Closing all client connections");
1722 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
1723 client_socket_ht_node) {
1724 int ret;
1725
1726 ret = handle_notification_thread_client_disconnect(
1727 client->socket, state);
1728 if (ret) {
1729 error_encoutered = true;
1730 }
1731 }
1732 rcu_read_unlock();
1733 return error_encoutered ? 1 : 0;
1734 }
1735
1736 int handle_notification_thread_trigger_unregister_all(
1737 struct notification_thread_state *state)
1738 {
1739 bool error_occurred = false;
1740 struct cds_lfht_iter iter;
1741 struct lttng_trigger_ht_element *trigger_ht_element;
1742
1743 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1744 node) {
1745 int ret = handle_notification_thread_command_unregister_trigger(
1746 state, trigger_ht_element->trigger, NULL);
1747 if (ret) {
1748 error_occurred = true;
1749 }
1750 }
1751 return error_occurred ? -1 : 0;
1752 }
1753
1754 static
1755 int client_flush_outgoing_queue(struct notification_client *client,
1756 struct notification_thread_state *state)
1757 {
1758 ssize_t ret;
1759 size_t to_send_count;
1760
1761 assert(client->communication.outbound.buffer.size != 0);
1762 to_send_count = client->communication.outbound.buffer.size;
1763 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
1764 client->socket);
1765
1766 ret = lttcomm_send_unix_sock_non_block(client->socket,
1767 client->communication.outbound.buffer.data,
1768 to_send_count);
1769 if ((ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) ||
1770 (ret > 0 && ret < to_send_count)) {
1771 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
1772 client->socket);
1773 to_send_count -= max(ret, 0);
1774
1775 memcpy(client->communication.outbound.buffer.data,
1776 client->communication.outbound.buffer.data +
1777 client->communication.outbound.buffer.size - to_send_count,
1778 to_send_count);
1779 ret = lttng_dynamic_buffer_set_size(
1780 &client->communication.outbound.buffer,
1781 to_send_count);
1782 if (ret) {
1783 goto error;
1784 }
1785
1786 /*
1787 * We want to be notified whenever there is buffer space
1788 * available to send the rest of the payload.
1789 */
1790 ret = lttng_poll_mod(&state->events, client->socket,
1791 CLIENT_POLL_MASK_IN_OUT);
1792 if (ret) {
1793 goto error;
1794 }
1795 } else if (ret < 0) {
1796 /* Generic error, disconnect the client. */
1797 ERR("[notification-thread] Failed to send flush outgoing queue, disconnecting client (socket fd = %i)",
1798 client->socket);
1799 ret = handle_notification_thread_client_disconnect(
1800 client->socket, state);
1801 if (ret) {
1802 goto error;
1803 }
1804 } else {
1805 /* No error and flushed the queue completely. */
1806 ret = lttng_dynamic_buffer_set_size(
1807 &client->communication.outbound.buffer, 0);
1808 if (ret) {
1809 goto error;
1810 }
1811 ret = lttng_poll_mod(&state->events, client->socket,
1812 CLIENT_POLL_MASK_IN);
1813 if (ret) {
1814 goto error;
1815 }
1816
1817 client->communication.outbound.queued_command_reply = false;
1818 client->communication.outbound.dropped_notification = false;
1819 }
1820
1821 return 0;
1822 error:
1823 return -1;
1824 }
1825
1826 static
1827 int client_send_command_reply(struct notification_client *client,
1828 struct notification_thread_state *state,
1829 enum lttng_notification_channel_status status)
1830 {
1831 int ret;
1832 struct lttng_notification_channel_command_reply reply = {
1833 .status = (int8_t) status,
1834 };
1835 struct lttng_notification_channel_message msg = {
1836 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
1837 .size = sizeof(reply),
1838 };
1839 char buffer[sizeof(msg) + sizeof(reply)];
1840
1841 if (client->communication.outbound.queued_command_reply) {
1842 /* Protocol error. */
1843 goto error;
1844 }
1845
1846 memcpy(buffer, &msg, sizeof(msg));
1847 memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
1848 DBG("[notification-thread] Send command reply (%i)", (int) status);
1849
1850 /* Enqueue buffer to outgoing queue and flush it. */
1851 ret = lttng_dynamic_buffer_append(
1852 &client->communication.outbound.buffer,
1853 buffer, sizeof(buffer));
1854 if (ret) {
1855 goto error;
1856 }
1857
1858 ret = client_flush_outgoing_queue(client, state);
1859 if (ret) {
1860 goto error;
1861 }
1862
1863 if (client->communication.outbound.buffer.size != 0) {
1864 /* Queue could not be emptied. */
1865 client->communication.outbound.queued_command_reply = true;
1866 }
1867
1868 return 0;
1869 error:
1870 return -1;
1871 }
1872
1873 static
1874 int client_dispatch_message(struct notification_client *client,
1875 struct notification_thread_state *state)
1876 {
1877 int ret = 0;
1878
1879 if (client->communication.inbound.msg_type !=
1880 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE &&
1881 client->communication.inbound.msg_type !=
1882 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN &&
1883 !client->validated) {
1884 WARN("[notification-thread] client attempted a command before handshake");
1885 ret = -1;
1886 goto end;
1887 }
1888
1889 switch (client->communication.inbound.msg_type) {
1890 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN:
1891 {
1892 /*
1893 * Receiving message header. The function will be called again
1894 * once the rest of the message as been received and can be
1895 * interpreted.
1896 */
1897 const struct lttng_notification_channel_message *msg;
1898
1899 assert(sizeof(*msg) ==
1900 client->communication.inbound.buffer.size);
1901 msg = (const struct lttng_notification_channel_message *)
1902 client->communication.inbound.buffer.data;
1903
1904 if (msg->size == 0 || msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
1905 ERR("[notification-thread] Invalid notification channel message: length = %u", msg->size);
1906 ret = -1;
1907 goto end;
1908 }
1909
1910 switch (msg->type) {
1911 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
1912 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
1913 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
1914 break;
1915 default:
1916 ret = -1;
1917 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
1918 goto end;
1919 }
1920
1921 client->communication.inbound.bytes_to_receive = msg->size;
1922 client->communication.inbound.msg_type =
1923 (enum lttng_notification_channel_message_type) msg->type;
1924 ret = lttng_dynamic_buffer_set_size(
1925 &client->communication.inbound.buffer, msg->size);
1926 if (ret) {
1927 goto end;
1928 }
1929 break;
1930 }
1931 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
1932 {
1933 struct lttng_notification_channel_command_handshake *handshake_client;
1934 struct lttng_notification_channel_command_handshake handshake_reply = {
1935 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
1936 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
1937 };
1938 struct lttng_notification_channel_message msg_header = {
1939 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
1940 .size = sizeof(handshake_reply),
1941 };
1942 enum lttng_notification_channel_status status =
1943 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1944 char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
1945
1946 memcpy(send_buffer, &msg_header, sizeof(msg_header));
1947 memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
1948 sizeof(handshake_reply));
1949
1950 handshake_client =
1951 (struct lttng_notification_channel_command_handshake *)
1952 client->communication.inbound.buffer.data;
1953 client->major = handshake_client->major;
1954 client->minor = handshake_client->minor;
1955 if (!client->communication.inbound.creds_received) {
1956 ERR("[notification-thread] No credentials received from client");
1957 ret = -1;
1958 goto end;
1959 }
1960
1961 client->uid = LTTNG_SOCK_GET_UID_CRED(
1962 &client->communication.inbound.creds);
1963 client->gid = LTTNG_SOCK_GET_GID_CRED(
1964 &client->communication.inbound.creds);
1965 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
1966 client->uid, client->gid, (int) client->major,
1967 (int) client->minor);
1968
1969 if (handshake_client->major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
1970 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
1971 }
1972
1973 ret = lttng_dynamic_buffer_append(&client->communication.outbound.buffer,
1974 send_buffer, sizeof(send_buffer));
1975 if (ret) {
1976 ERR("[notification-thread] Failed to send protocol version to notification channel client");
1977 goto end;
1978 }
1979
1980 ret = client_flush_outgoing_queue(client, state);
1981 if (ret) {
1982 goto end;
1983 }
1984
1985 ret = client_send_command_reply(client, state, status);
1986 if (ret) {
1987 ERR("[notification-thread] Failed to send reply to notification channel client");
1988 goto end;
1989 }
1990
1991 /* Set reception state to receive the next message header. */
1992 ret = client_reset_inbound_state(client);
1993 if (ret) {
1994 ERR("[notification-thread] Failed to reset client communication's inbound state");
1995 goto end;
1996 }
1997 client->validated = true;
1998 break;
1999 }
2000 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
2001 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
2002 {
2003 struct lttng_condition *condition;
2004 enum lttng_notification_channel_status status =
2005 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
2006 const struct lttng_buffer_view condition_view =
2007 lttng_buffer_view_from_dynamic_buffer(
2008 &client->communication.inbound.buffer,
2009 0, -1);
2010 size_t expected_condition_size =
2011 client->communication.inbound.buffer.size;
2012
2013 ret = lttng_condition_create_from_buffer(&condition_view,
2014 &condition);
2015 if (ret != expected_condition_size) {
2016 ERR("[notification-thread] Malformed condition received from client");
2017 goto end;
2018 }
2019
2020 if (client->communication.inbound.msg_type ==
2021 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
2022 ret = notification_thread_client_subscribe(client,
2023 condition, state, &status);
2024 } else {
2025 ret = notification_thread_client_unsubscribe(client,
2026 condition, state, &status);
2027 }
2028 if (ret) {
2029 goto end;
2030 }
2031
2032 ret = client_send_command_reply(client, state, status);
2033 if (ret) {
2034 ERR("[notification-thread] Failed to send reply to notification channel client");
2035 goto end;
2036 }
2037
2038 /* Set reception state to receive the next message header. */
2039 ret = client_reset_inbound_state(client);
2040 if (ret) {
2041 ERR("[notification-thread] Failed to reset client communication's inbound state");
2042 goto end;
2043 }
2044 break;
2045 }
2046 default:
2047 abort();
2048 }
2049 end:
2050 return ret;
2051 }
2052
2053 /* Incoming data from client. */
2054 int handle_notification_thread_client_in(
2055 struct notification_thread_state *state, int socket)
2056 {
2057 int ret = 0;
2058 struct notification_client *client;
2059 ssize_t recv_ret;
2060 size_t offset;
2061
2062 client = get_client_from_socket(socket, state);
2063 if (!client) {
2064 /* Internal error, abort. */
2065 ret = -1;
2066 goto end;
2067 }
2068
2069 offset = client->communication.inbound.buffer.size -
2070 client->communication.inbound.bytes_to_receive;
2071 if (client->communication.inbound.expect_creds) {
2072 recv_ret = lttcomm_recv_creds_unix_sock(socket,
2073 client->communication.inbound.buffer.data + offset,
2074 client->communication.inbound.bytes_to_receive,
2075 &client->communication.inbound.creds);
2076 if (recv_ret > 0) {
2077 client->communication.inbound.expect_creds = false;
2078 client->communication.inbound.creds_received = true;
2079 }
2080 } else {
2081 recv_ret = lttcomm_recv_unix_sock_non_block(socket,
2082 client->communication.inbound.buffer.data + offset,
2083 client->communication.inbound.bytes_to_receive);
2084 }
2085 if (recv_ret < 0) {
2086 goto error_disconnect_client;
2087 }
2088
2089 client->communication.inbound.bytes_to_receive -= recv_ret;
2090 if (client->communication.inbound.bytes_to_receive == 0) {
2091 ret = client_dispatch_message(client, state);
2092 if (ret) {
2093 /*
2094 * Only returns an error if this client must be
2095 * disconnected.
2096 */
2097 goto error_disconnect_client;
2098 }
2099 } else {
2100 goto end;
2101 }
2102 end:
2103 return ret;
2104 error_disconnect_client:
2105 ret = handle_notification_thread_client_disconnect(socket, state);
2106 return ret;
2107 }
2108
/*
 * Client ready to receive outgoing data.
 *
 * Looks up the client bound to `socket` and flushes its outgoing queue.
 * Returns -1 if no client is associated with `socket`, otherwise the result
 * of the flush.
 */
int handle_notification_thread_client_out(
		struct notification_thread_state *state, int socket)
{
	struct notification_client *client =
			get_client_from_socket(socket, state);

	if (!client) {
		/* Internal error, abort. */
		return -1;
	}

	return client_flush_outgoing_queue(client, state);
}
2130
2131 static
2132 bool evaluate_buffer_usage_condition(struct lttng_condition *condition,
2133 struct channel_state_sample *sample, uint64_t buffer_capacity)
2134 {
2135 bool result = false;
2136 uint64_t threshold;
2137 enum lttng_condition_type condition_type;
2138 struct lttng_condition_buffer_usage *use_condition = container_of(
2139 condition, struct lttng_condition_buffer_usage,
2140 parent);
2141
2142 if (!sample) {
2143 goto end;
2144 }
2145
2146 if (use_condition->threshold_bytes.set) {
2147 threshold = use_condition->threshold_bytes.value;
2148 } else {
2149 /*
2150 * Threshold was expressed as a ratio.
2151 *
2152 * TODO the threshold (in bytes) of conditions expressed
2153 * as a ratio of total buffer size could be cached to
2154 * forego this double-multiplication or it could be performed
2155 * as fixed-point math.
2156 *
2157 * Note that caching should accomodate the case where the
2158 * condition applies to multiple channels (i.e. don't assume
2159 * that all channels matching my_chann* have the same size...)
2160 */
2161 threshold = (uint64_t) (use_condition->threshold_ratio.value *
2162 (double) buffer_capacity);
2163 }
2164
2165 condition_type = lttng_condition_get_type(condition);
2166 if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
2167 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
2168 threshold, sample->highest_usage);
2169
2170 /*
2171 * The low condition should only be triggered once _all_ of the
2172 * streams in a channel have gone below the "low" threshold.
2173 */
2174 if (sample->highest_usage <= threshold) {
2175 result = true;
2176 }
2177 } else {
2178 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
2179 threshold, sample->highest_usage);
2180
2181 /*
2182 * For high buffer usage scenarios, we want to trigger whenever
2183 * _any_ of the streams has reached the "high" threshold.
2184 */
2185 if (sample->highest_usage >= threshold) {
2186 result = true;
2187 }
2188 }
2189 end:
2190 return result;
2191 }
2192
2193 static
2194 int evaluate_condition(struct lttng_condition *condition,
2195 struct lttng_evaluation **evaluation,
2196 struct notification_thread_state *state,
2197 struct channel_state_sample *previous_sample,
2198 struct channel_state_sample *latest_sample,
2199 uint64_t buffer_capacity)
2200 {
2201 int ret = 0;
2202 enum lttng_condition_type condition_type;
2203 bool previous_sample_result;
2204 bool latest_sample_result;
2205
2206 condition_type = lttng_condition_get_type(condition);
2207 /* No other condition type supported for the moment. */
2208 assert(condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW ||
2209 condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH);
2210
2211 previous_sample_result = evaluate_buffer_usage_condition(condition,
2212 previous_sample, buffer_capacity);
2213 latest_sample_result = evaluate_buffer_usage_condition(condition,
2214 latest_sample, buffer_capacity);
2215
2216 if (!latest_sample_result ||
2217 (previous_sample_result == latest_sample_result)) {
2218 /*
2219 * Only trigger on a condition evaluation transition.
2220 *
2221 * NOTE: This edge-triggered logic may not be appropriate for
2222 * future condition types.
2223 */
2224 goto end;
2225 }
2226
2227 if (evaluation && latest_sample_result) {
2228 *evaluation = lttng_evaluation_buffer_usage_create(
2229 condition_type,
2230 latest_sample->highest_usage,
2231 buffer_capacity);
2232 if (!*evaluation) {
2233 ret = -1;
2234 goto end;
2235 }
2236 }
2237 end:
2238 return ret;
2239 }
2240
2241 static
2242 int client_enqueue_dropped_notification(struct notification_client *client,
2243 struct notification_thread_state *state)
2244 {
2245 int ret;
2246 struct lttng_notification_channel_message msg = {
2247 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
2248 .size = 0,
2249 };
2250
2251 ret = lttng_dynamic_buffer_append(
2252 &client->communication.outbound.buffer, &msg,
2253 sizeof(msg));
2254 return ret;
2255 }
2256
2257 static
2258 int send_evaluation_to_clients(struct lttng_trigger *trigger,
2259 struct lttng_evaluation *evaluation,
2260 struct notification_client_list* client_list,
2261 struct notification_thread_state *state,
2262 uid_t channel_uid, gid_t channel_gid)
2263 {
2264 int ret = 0;
2265 struct lttng_dynamic_buffer msg_buffer;
2266 struct notification_client_list_element *client_list_element, *tmp;
2267 struct lttng_notification *notification;
2268 struct lttng_condition *condition;
2269 ssize_t expected_notification_size, notification_size;
2270 struct lttng_notification_channel_message msg;
2271
2272 lttng_dynamic_buffer_init(&msg_buffer);
2273
2274 condition = lttng_trigger_get_condition(trigger);
2275 assert(condition);
2276
2277 notification = lttng_notification_create(condition, evaluation);
2278 if (!notification) {
2279 ret = -1;
2280 goto end;
2281 }
2282
2283 expected_notification_size = lttng_notification_serialize(notification,
2284 NULL);
2285 if (expected_notification_size < 0) {
2286 ERR("[notification-thread] Failed to get size of serialized notification");
2287 ret = -1;
2288 goto end;
2289 }
2290
2291 msg.type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION;
2292 msg.size = (uint32_t) expected_notification_size;
2293 ret = lttng_dynamic_buffer_append(&msg_buffer, &msg, sizeof(msg));
2294 if (ret) {
2295 goto end;
2296 }
2297
2298 ret = lttng_dynamic_buffer_set_size(&msg_buffer,
2299 msg_buffer.size + expected_notification_size);
2300 if (ret) {
2301 goto end;
2302 }
2303
2304 notification_size = lttng_notification_serialize(notification,
2305 msg_buffer.data + sizeof(msg));
2306 if (notification_size != expected_notification_size) {
2307 ERR("[notification-thread] Failed to serialize notification");
2308 ret = -1;
2309 goto end;
2310 }
2311
2312 cds_list_for_each_entry_safe(client_list_element, tmp,
2313 &client_list->list, node) {
2314 struct notification_client *client =
2315 client_list_element->client;
2316
2317 if (client->uid != channel_uid && client->gid != channel_gid &&
2318 client->uid != 0) {
2319 /* Client is not allowed to monitor this channel. */
2320 DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this channel");
2321 continue;
2322 }
2323
2324 DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
2325 client->socket, msg_buffer.size);
2326 if (client->communication.outbound.buffer.size) {
2327 /*
2328 * Outgoing data is already buffered for this client;
2329 * drop the notification and enqueue a "dropped
2330 * notification" message if this is the first dropped
2331 * notification since the socket spilled-over to the
2332 * queue.
2333 */
2334 DBG("[notification-thread] Dropping notification addressed to client (socket fd = %i)",
2335 client->socket);
2336 if (!client->communication.outbound.dropped_notification) {
2337 client->communication.outbound.dropped_notification = true;
2338 ret = client_enqueue_dropped_notification(
2339 client, state);
2340 if (ret) {
2341 goto end;
2342 }
2343 }
2344 continue;
2345 }
2346
2347 ret = lttng_dynamic_buffer_append_buffer(
2348 &client->communication.outbound.buffer,
2349 &msg_buffer);
2350 if (ret) {
2351 goto end;
2352 }
2353
2354 ret = client_flush_outgoing_queue(client, state);
2355 if (ret) {
2356 goto end;
2357 }
2358 }
2359 ret = 0;
2360 end:
2361 lttng_notification_destroy(notification);
2362 lttng_dynamic_buffer_reset(&msg_buffer);
2363 return ret;
2364 }
2365
2366 int handle_notification_thread_channel_sample(
2367 struct notification_thread_state *state, int pipe,
2368 enum lttng_domain_type domain)
2369 {
2370 int ret = 0;
2371 struct lttcomm_consumer_channel_monitor_msg sample_msg;
2372 struct channel_state_sample previous_sample, latest_sample;
2373 struct channel_info *channel_info;
2374 struct cds_lfht_node *node;
2375 struct cds_lfht_iter iter;
2376 struct lttng_channel_trigger_list *trigger_list;
2377 struct lttng_trigger_list_element *trigger_list_element;
2378 bool previous_sample_available = false;
2379
2380 /*
2381 * The monitoring pipe only holds messages smaller than PIPE_BUF,
2382 * ensuring that read/write of sampling messages are atomic.
2383 */
2384 ret = lttng_read(pipe, &sample_msg, sizeof(sample_msg));
2385 if (ret != sizeof(sample_msg)) {
2386 ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
2387 pipe);
2388 ret = -1;
2389 goto end;
2390 }
2391
2392 ret = 0;
2393 latest_sample.key.key = sample_msg.key;
2394 latest_sample.key.domain = domain;
2395 latest_sample.highest_usage = sample_msg.highest;
2396 latest_sample.lowest_usage = sample_msg.lowest;
2397
2398 rcu_read_lock();
2399
2400 /* Retrieve the channel's informations */
2401 cds_lfht_lookup(state->channels_ht,
2402 hash_channel_key(&latest_sample.key),
2403 match_channel_info,
2404 &latest_sample.key,
2405 &iter);
2406 node = cds_lfht_iter_get_node(&iter);
2407 if (caa_unlikely(!node)) {
2408 /*
2409 * Not an error since the consumer can push a sample to the pipe
2410 * and the rest of the session daemon could notify us of the
2411 * channel's destruction before we get a chance to process that
2412 * sample.
2413 */
2414 DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
2415 latest_sample.key.key,
2416 domain == LTTNG_DOMAIN_KERNEL ? "kernel" :
2417 "user space");
2418 goto end_unlock;
2419 }
2420 channel_info = caa_container_of(node, struct channel_info,
2421 channels_ht_node);
2422 DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64")",
2423 channel_info->name,
2424 latest_sample.key.key,
2425 channel_info->session_info->name,
2426 latest_sample.highest_usage,
2427 latest_sample.lowest_usage);
2428
2429 /* Retrieve the channel's last sample, if it exists, and update it. */
2430 cds_lfht_lookup(state->channel_state_ht,
2431 hash_channel_key(&latest_sample.key),
2432 match_channel_state_sample,
2433 &latest_sample.key,
2434 &iter);
2435 node = cds_lfht_iter_get_node(&iter);
2436 if (caa_likely(node)) {
2437 struct channel_state_sample *stored_sample;
2438
2439 /* Update the sample stored. */
2440 stored_sample = caa_container_of(node,
2441 struct channel_state_sample,
2442 channel_state_ht_node);
2443 memcpy(&previous_sample, stored_sample,
2444 sizeof(previous_sample));
2445 stored_sample->highest_usage = latest_sample.highest_usage;
2446 stored_sample->lowest_usage = latest_sample.lowest_usage;
2447 previous_sample_available = true;
2448 } else {
2449 /*
2450 * This is the channel's first sample, allocate space for and
2451 * store the new sample.
2452 */
2453 struct channel_state_sample *stored_sample;
2454
2455 stored_sample = zmalloc(sizeof(*stored_sample));
2456 if (!stored_sample) {
2457 ret = -1;
2458 goto end_unlock;
2459 }
2460
2461 memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
2462 cds_lfht_node_init(&stored_sample->channel_state_ht_node);
2463 cds_lfht_add(state->channel_state_ht,
2464 hash_channel_key(&stored_sample->key),
2465 &stored_sample->channel_state_ht_node);
2466 }
2467
2468 /* Find triggers associated with this channel. */
2469 cds_lfht_lookup(state->channel_triggers_ht,
2470 hash_channel_key(&latest_sample.key),
2471 match_channel_trigger_list,
2472 &latest_sample.key,
2473 &iter);
2474 node = cds_lfht_iter_get_node(&iter);
2475 if (caa_likely(!node)) {
2476 goto end_unlock;
2477 }
2478
2479 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
2480 channel_triggers_ht_node);
2481 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
2482 node) {
2483 struct lttng_condition *condition;
2484 struct lttng_action *action;
2485 struct lttng_trigger *trigger;
2486 struct notification_client_list *client_list;
2487 struct lttng_evaluation *evaluation = NULL;
2488
2489 trigger = trigger_list_element->trigger;
2490 condition = lttng_trigger_get_condition(trigger);
2491 assert(condition);
2492 action = lttng_trigger_get_action(trigger);
2493
2494 /* Notify actions are the only type currently supported. */
2495 assert(lttng_action_get_type(action) ==
2496 LTTNG_ACTION_TYPE_NOTIFY);
2497
2498 /*
2499 * Check if any client is subscribed to the result of this
2500 * evaluation.
2501 */
2502 cds_lfht_lookup(state->notification_trigger_clients_ht,
2503 lttng_condition_hash(condition),
2504 match_client_list,
2505 trigger,
2506 &iter);
2507 node = cds_lfht_iter_get_node(&iter);
2508 assert(node);
2509
2510 client_list = caa_container_of(node,
2511 struct notification_client_list,
2512 notification_trigger_ht_node);
2513 if (cds_list_empty(&client_list->list)) {
2514 /*
2515 * No clients interested in the evaluation's result,
2516 * skip it.
2517 */
2518 continue;
2519 }
2520
2521 ret = evaluate_condition(condition, &evaluation, state,
2522 previous_sample_available ? &previous_sample : NULL,
2523 &latest_sample, channel_info->capacity);
2524 if (ret) {
2525 goto end_unlock;
2526 }
2527
2528 if (!evaluation) {
2529 continue;
2530 }
2531
2532 /* Dispatch evaluation result to all clients. */
2533 ret = send_evaluation_to_clients(trigger_list_element->trigger,
2534 evaluation, client_list, state,
2535 channel_info->session_info->uid,
2536 channel_info->session_info->gid);
2537 lttng_evaluation_destroy(evaluation);
2538 if (caa_unlikely(ret)) {
2539 goto end_unlock;
2540 }
2541 }
2542 end_unlock:
2543 rcu_read_unlock();
2544 end:
2545 return ret;
2546 }
This page took 0.121947 seconds and 6 git commands to generate.