Fix: notification thread: RCU-safe reclaim of hash table nodes
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.c
1 /*
2 * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18 #define _LGPL_SOURCE
19 #include <urcu.h>
20 #include <urcu/rculfhash.h>
21
22 #include <common/defaults.h>
23 #include <common/error.h>
24 #include <common/futex.h>
25 #include <common/unix.h>
26 #include <common/dynamic-buffer.h>
27 #include <common/hashtable/utils.h>
28 #include <common/sessiond-comm/sessiond-comm.h>
29 #include <common/macros.h>
30 #include <lttng/condition/condition.h>
31 #include <lttng/action/action.h>
32 #include <lttng/notification/notification-internal.h>
33 #include <lttng/condition/condition-internal.h>
34 #include <lttng/condition/buffer-usage-internal.h>
35 #include <lttng/notification/channel-internal.h>
36
37 #include <time.h>
38 #include <unistd.h>
39 #include <assert.h>
40 #include <inttypes.h>
41 #include <fcntl.h>
42
43 #include "notification-thread.h"
44 #include "notification-thread-events.h"
45 #include "notification-thread-commands.h"
46 #include "lttng-sessiond.h"
47 #include "kernel.h"
48
49 #define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
50 #define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
51
52 struct lttng_trigger_list_element {
53 struct lttng_trigger *trigger;
54 struct cds_list_head node;
55 };
56
57 struct lttng_channel_trigger_list {
58 struct channel_key channel_key;
59 struct cds_list_head list;
60 struct cds_lfht_node channel_triggers_ht_node;
61 /* call_rcu delayed reclaim. */
62 struct rcu_head rcu_node;
63 };
64
65 struct lttng_trigger_ht_element {
66 struct lttng_trigger *trigger;
67 struct cds_lfht_node node;
68 /* call_rcu delayed reclaim. */
69 struct rcu_head rcu_node;
70 };
71
72 struct lttng_condition_list_element {
73 struct lttng_condition *condition;
74 struct cds_list_head node;
75 };
76
77 struct notification_client_list_element {
78 struct notification_client *client;
79 struct cds_list_head node;
80 };
81
82 struct notification_client_list {
83 struct lttng_trigger *trigger;
84 struct cds_list_head list;
85 struct cds_lfht_node notification_trigger_ht_node;
86 /* call_rcu delayed reclaim. */
87 struct rcu_head rcu_node;
88 };
89
90 struct notification_client {
91 int socket;
92 /* Client protocol version. */
93 uint8_t major, minor;
94 uid_t uid;
95 gid_t gid;
96 /*
97 * Indicates if the credentials and versions of the client have been
98 * checked.
99 */
100 bool validated;
101 /*
102 * Conditions to which the client's notification channel is subscribed.
103 * List of struct lttng_condition_list_node. The condition member is
104 * owned by the client.
105 */
106 struct cds_list_head condition_list;
107 struct cds_lfht_node client_socket_ht_node;
108 struct {
109 struct {
110 /*
111 * During the reception of a message, the reception
112 * buffers' "size" is set to contain the current
113 * message's complete payload.
114 */
115 struct lttng_dynamic_buffer buffer;
116 /* Bytes left to receive for the current message. */
117 size_t bytes_to_receive;
118 /* Type of the message being received. */
119 enum lttng_notification_channel_message_type msg_type;
120 /*
121 * Indicates whether or not credentials are expected
122 * from the client.
123 */
124 bool expect_creds;
125 /*
126 * Indicates whether or not credentials were received
127 * from the client.
128 */
129 bool creds_received;
130 /* Only used during credentials reception. */
131 lttng_sock_cred creds;
132 } inbound;
133 struct {
134 /*
135 * Indicates whether or not a notification addressed to
136 * this client was dropped because a command reply was
137 * already buffered.
138 *
139 * A notification is dropped whenever the buffer is not
140 * empty.
141 */
142 bool dropped_notification;
143 /*
144 * Indicates whether or not a command reply is already
145 * buffered. In this case, it means that the client is
146 * not consuming command replies before emitting a new
147 * one. This could be caused by a protocol error or a
148 * misbehaving/malicious client.
149 */
150 bool queued_command_reply;
151 struct lttng_dynamic_buffer buffer;
152 } outbound;
153 } communication;
154 /* call_rcu delayed reclaim. */
155 struct rcu_head rcu_node;
156 };
157
158 struct channel_state_sample {
159 struct channel_key key;
160 struct cds_lfht_node channel_state_ht_node;
161 uint64_t highest_usage;
162 uint64_t lowest_usage;
163 /* call_rcu delayed reclaim. */
164 struct rcu_head rcu_node;
165 };
166
167 static unsigned long hash_channel_key(struct channel_key *key);
168 static int evaluate_condition(struct lttng_condition *condition,
169 struct lttng_evaluation **evaluation,
170 struct notification_thread_state *state,
171 struct channel_state_sample *previous_sample,
172 struct channel_state_sample *latest_sample,
173 uint64_t buffer_capacity);
174 static
175 int send_evaluation_to_clients(struct lttng_trigger *trigger,
176 struct lttng_evaluation *evaluation,
177 struct notification_client_list *client_list,
178 struct notification_thread_state *state,
179 uid_t channel_uid, gid_t channel_gid);
180
181 static
182 int match_client(struct cds_lfht_node *node, const void *key)
183 {
184 /* This double-cast is intended to supress pointer-to-cast warning. */
185 int socket = (int) (intptr_t) key;
186 struct notification_client *client;
187
188 client = caa_container_of(node, struct notification_client,
189 client_socket_ht_node);
190
191 return !!(client->socket == socket);
192 }
193
194 static
195 int match_channel_trigger_list(struct cds_lfht_node *node, const void *key)
196 {
197 struct channel_key *channel_key = (struct channel_key *) key;
198 struct lttng_channel_trigger_list *trigger_list;
199
200 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
201 channel_triggers_ht_node);
202
203 return !!((channel_key->key == trigger_list->channel_key.key) &&
204 (channel_key->domain == trigger_list->channel_key.domain));
205 }
206
207 static
208 int match_channel_state_sample(struct cds_lfht_node *node, const void *key)
209 {
210 struct channel_key *channel_key = (struct channel_key *) key;
211 struct channel_state_sample *sample;
212
213 sample = caa_container_of(node, struct channel_state_sample,
214 channel_state_ht_node);
215
216 return !!((channel_key->key == sample->key.key) &&
217 (channel_key->domain == sample->key.domain));
218 }
219
220 static
221 int match_channel_info(struct cds_lfht_node *node, const void *key)
222 {
223 struct channel_key *channel_key = (struct channel_key *) key;
224 struct channel_info *channel_info;
225
226 channel_info = caa_container_of(node, struct channel_info,
227 channels_ht_node);
228
229 return !!((channel_key->key == channel_info->key.key) &&
230 (channel_key->domain == channel_info->key.domain));
231 }
232
233 static
234 int match_condition(struct cds_lfht_node *node, const void *key)
235 {
236 struct lttng_condition *condition_key = (struct lttng_condition *) key;
237 struct lttng_trigger_ht_element *trigger;
238 struct lttng_condition *condition;
239
240 trigger = caa_container_of(node, struct lttng_trigger_ht_element,
241 node);
242 condition = lttng_trigger_get_condition(trigger->trigger);
243 assert(condition);
244
245 return !!lttng_condition_is_equal(condition_key, condition);
246 }
247
248 static
249 int match_client_list(struct cds_lfht_node *node, const void *key)
250 {
251 struct lttng_trigger *trigger_key = (struct lttng_trigger *) key;
252 struct notification_client_list *client_list;
253 struct lttng_condition *condition;
254 struct lttng_condition *condition_key = lttng_trigger_get_condition(
255 trigger_key);
256
257 assert(condition_key);
258
259 client_list = caa_container_of(node, struct notification_client_list,
260 notification_trigger_ht_node);
261 condition = lttng_trigger_get_condition(client_list->trigger);
262
263 return !!lttng_condition_is_equal(condition_key, condition);
264 }
265
266 static
267 int match_client_list_condition(struct cds_lfht_node *node, const void *key)
268 {
269 struct lttng_condition *condition_key = (struct lttng_condition *) key;
270 struct notification_client_list *client_list;
271 struct lttng_condition *condition;
272
273 assert(condition_key);
274
275 client_list = caa_container_of(node, struct notification_client_list,
276 notification_trigger_ht_node);
277 condition = lttng_trigger_get_condition(client_list->trigger);
278
279 return !!lttng_condition_is_equal(condition_key, condition);
280 }
281
282 static
283 unsigned long lttng_condition_buffer_usage_hash(
284 struct lttng_condition *_condition)
285 {
286 unsigned long hash = 0;
287 struct lttng_condition_buffer_usage *condition;
288
289 condition = container_of(_condition,
290 struct lttng_condition_buffer_usage, parent);
291
292 if (condition->session_name) {
293 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
294 }
295 if (condition->channel_name) {
296 hash ^= hash_key_str(condition->channel_name, lttng_ht_seed);
297 }
298 if (condition->domain.set) {
299 hash ^= hash_key_ulong(
300 (void *) condition->domain.type,
301 lttng_ht_seed);
302 }
303 if (condition->threshold_ratio.set) {
304 uint64_t val;
305
306 val = condition->threshold_ratio.value * (double) UINT32_MAX;
307 hash ^= hash_key_u64(&val, lttng_ht_seed);
308 } else if (condition->threshold_bytes.set) {
309 uint64_t val;
310
311 val = condition->threshold_bytes.value;
312 hash ^= hash_key_u64(&val, lttng_ht_seed);
313 }
314 return hash;
315 }
316
317 /*
318 * The lttng_condition hashing code is kept in this file (rather than
319 * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
320 * don't want to link in liblttng-ctl.
321 */
322 static
323 unsigned long lttng_condition_hash(struct lttng_condition *condition)
324 {
325 switch (condition->type) {
326 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
327 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
328 return lttng_condition_buffer_usage_hash(condition);
329 default:
330 ERR("[notification-thread] Unexpected condition type caught");
331 abort();
332 }
333 }
334
335 static
336 unsigned long hash_channel_key(struct channel_key *key)
337 {
338 unsigned long key_hash = hash_key_u64(&key->key, lttng_ht_seed);
339 unsigned long domain_hash = hash_key_ulong(
340 (void *) (unsigned long) key->domain, lttng_ht_seed);
341
342 return key_hash ^ domain_hash;
343 }
344
345 static
346 void free_channel_info_rcu(struct rcu_head *node)
347 {
348 free(caa_container_of(node, struct channel_info, rcu_node));
349 }
350
351 static
352 void channel_info_destroy(struct channel_info *channel_info)
353 {
354 if (!channel_info) {
355 return;
356 }
357
358 if (channel_info->session_name) {
359 free(channel_info->session_name);
360 }
361 if (channel_info->channel_name) {
362 free(channel_info->channel_name);
363 }
364 call_rcu(&channel_info->rcu_node, free_channel_info_rcu);
365 }
366
367 static
368 struct channel_info *channel_info_copy(struct channel_info *channel_info)
369 {
370 struct channel_info *copy = zmalloc(sizeof(*channel_info));
371
372 assert(channel_info);
373 assert(channel_info->session_name);
374 assert(channel_info->channel_name);
375
376 if (!copy) {
377 goto end;
378 }
379
380 memcpy(copy, channel_info, sizeof(*channel_info));
381 copy->session_name = NULL;
382 copy->channel_name = NULL;
383
384 copy->session_name = strdup(channel_info->session_name);
385 if (!copy->session_name) {
386 goto error;
387 }
388 copy->channel_name = strdup(channel_info->channel_name);
389 if (!copy->channel_name) {
390 goto error;
391 }
392 cds_lfht_node_init(&channel_info->channels_ht_node);
393 end:
394 return copy;
395 error:
396 channel_info_destroy(copy);
397 return NULL;
398 }
399
400 /* This function must be called with the RCU read lock held. */
401 static
402 int evaluate_condition_for_client(struct lttng_trigger *trigger,
403 struct lttng_condition *condition,
404 struct notification_client *client,
405 struct notification_thread_state *state)
406 {
407 int ret;
408 struct cds_lfht_iter iter;
409 struct cds_lfht_node *node;
410 struct channel_info *channel_info = NULL;
411 struct channel_key *channel_key = NULL;
412 struct channel_state_sample *last_sample = NULL;
413 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
414 struct lttng_evaluation *evaluation = NULL;
415 struct notification_client_list client_list = { 0 };
416 struct notification_client_list_element client_list_element = { 0 };
417
418 assert(trigger);
419 assert(condition);
420 assert(client);
421 assert(state);
422
423 /* Find the channel associated with the trigger. */
424 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter,
425 channel_trigger_list , channel_triggers_ht_node) {
426 struct lttng_trigger_list_element *element;
427
428 cds_list_for_each_entry(element, &channel_trigger_list->list, node) {
429 struct lttng_condition *current_condition =
430 lttng_trigger_get_condition(
431 element->trigger);
432
433 assert(current_condition);
434 if (!lttng_condition_is_equal(condition,
435 current_condition)) {
436 continue;
437 }
438
439 /* Found the trigger, save the channel key. */
440 channel_key = &channel_trigger_list->channel_key;
441 break;
442 }
443 if (channel_key) {
444 /* The channel key was found stop iteration. */
445 break;
446 }
447 }
448
449 if (!channel_key){
450 /* No channel found; normal exit. */
451 DBG("[notification-thread] No channel associated with newly subscribed-to condition");
452 ret = 0;
453 goto end;
454 }
455
456 /* Fetch channel info for the matching channel. */
457 cds_lfht_lookup(state->channels_ht,
458 hash_channel_key(channel_key),
459 match_channel_info,
460 channel_key,
461 &iter);
462 node = cds_lfht_iter_get_node(&iter);
463 assert(node);
464 channel_info = caa_container_of(node, struct channel_info,
465 channels_ht_node);
466
467 /* Retrieve the channel's last sample, if it exists. */
468 cds_lfht_lookup(state->channel_state_ht,
469 hash_channel_key(channel_key),
470 match_channel_state_sample,
471 channel_key,
472 &iter);
473 node = cds_lfht_iter_get_node(&iter);
474 if (node) {
475 last_sample = caa_container_of(node,
476 struct channel_state_sample,
477 channel_state_ht_node);
478 } else {
479 /* Nothing to evaluate, no sample was ever taken. Normal exit */
480 DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
481 ret = 0;
482 goto end;
483 }
484
485 ret = evaluate_condition(condition, &evaluation, state, NULL,
486 last_sample, channel_info->capacity);
487 if (ret) {
488 WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
489 goto end;
490 }
491
492 if (!evaluation) {
493 /* Evaluation yielded nothing. Normal exit. */
494 DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
495 ret = 0;
496 goto end;
497 }
498
499 /*
500 * Create a temporary client list with the client currently
501 * subscribing.
502 */
503 cds_lfht_node_init(&client_list.notification_trigger_ht_node);
504 CDS_INIT_LIST_HEAD(&client_list.list);
505 client_list.trigger = trigger;
506
507 CDS_INIT_LIST_HEAD(&client_list_element.node);
508 client_list_element.client = client;
509 cds_list_add(&client_list_element.node, &client_list.list);
510
511 /* Send evaluation result to the newly-subscribed client. */
512 DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
513 ret = send_evaluation_to_clients(trigger, evaluation, &client_list,
514 state, channel_info->uid, channel_info->gid);
515
516 end:
517 return ret;
518 }
519
520 static
521 int notification_thread_client_subscribe(struct notification_client *client,
522 struct lttng_condition *condition,
523 struct notification_thread_state *state,
524 enum lttng_notification_channel_status *_status)
525 {
526 int ret = 0;
527 struct cds_lfht_iter iter;
528 struct cds_lfht_node *node;
529 struct notification_client_list *client_list;
530 struct lttng_condition_list_element *condition_list_element = NULL;
531 struct notification_client_list_element *client_list_element = NULL;
532 enum lttng_notification_channel_status status =
533 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
534
535 /*
536 * Ensure that the client has not already subscribed to this condition
537 * before.
538 */
539 cds_list_for_each_entry(condition_list_element, &client->condition_list, node) {
540 if (lttng_condition_is_equal(condition_list_element->condition,
541 condition)) {
542 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED;
543 goto end;
544 }
545 }
546
547 condition_list_element = zmalloc(sizeof(*condition_list_element));
548 if (!condition_list_element) {
549 ret = -1;
550 goto error;
551 }
552 client_list_element = zmalloc(sizeof(*client_list_element));
553 if (!client_list_element) {
554 ret = -1;
555 goto error;
556 }
557
558 rcu_read_lock();
559
560 /*
561 * Add the newly-subscribed condition to the client's subscription list.
562 */
563 CDS_INIT_LIST_HEAD(&condition_list_element->node);
564 condition_list_element->condition = condition;
565 cds_list_add(&condition_list_element->node, &client->condition_list);
566
567 cds_lfht_lookup(state->notification_trigger_clients_ht,
568 lttng_condition_hash(condition),
569 match_client_list_condition,
570 condition,
571 &iter);
572 node = cds_lfht_iter_get_node(&iter);
573 if (!node) {
574 free(client_list_element);
575 goto end_unlock;
576 }
577
578 client_list = caa_container_of(node, struct notification_client_list,
579 notification_trigger_ht_node);
580 if (evaluate_condition_for_client(client_list->trigger, condition,
581 client, state)) {
582 WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
583 ret = -1;
584 free(client_list_element);
585 goto end_unlock;
586 }
587
588 /*
589 * Add the client to the list of clients interested in a given trigger
590 * if a "notification" trigger with a corresponding condition was
591 * added prior.
592 */
593 client_list_element->client = client;
594 CDS_INIT_LIST_HEAD(&client_list_element->node);
595 cds_list_add(&client_list_element->node, &client_list->list);
596 end_unlock:
597 rcu_read_unlock();
598 end:
599 if (_status) {
600 *_status = status;
601 }
602 return ret;
603 error:
604 free(condition_list_element);
605 free(client_list_element);
606 return ret;
607 }
608
609 static
610 int notification_thread_client_unsubscribe(
611 struct notification_client *client,
612 struct lttng_condition *condition,
613 struct notification_thread_state *state,
614 enum lttng_notification_channel_status *_status)
615 {
616 struct cds_lfht_iter iter;
617 struct cds_lfht_node *node;
618 struct notification_client_list *client_list;
619 struct lttng_condition_list_element *condition_list_element,
620 *condition_tmp;
621 struct notification_client_list_element *client_list_element,
622 *client_tmp;
623 bool condition_found = false;
624 enum lttng_notification_channel_status status =
625 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
626
627 /* Remove the condition from the client's condition list. */
628 cds_list_for_each_entry_safe(condition_list_element, condition_tmp,
629 &client->condition_list, node) {
630 if (!lttng_condition_is_equal(condition_list_element->condition,
631 condition)) {
632 continue;
633 }
634
635 cds_list_del(&condition_list_element->node);
636 /*
637 * The caller may be iterating on the client's conditions to
638 * tear down a client's connection. In this case, the condition
639 * will be destroyed at the end.
640 */
641 if (condition != condition_list_element->condition) {
642 lttng_condition_destroy(
643 condition_list_element->condition);
644 }
645 free(condition_list_element);
646 condition_found = true;
647 break;
648 }
649
650 if (!condition_found) {
651 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION;
652 goto end;
653 }
654
655 /*
656 * Remove the client from the list of clients interested the trigger
657 * matching the condition.
658 */
659 rcu_read_lock();
660 cds_lfht_lookup(state->notification_trigger_clients_ht,
661 lttng_condition_hash(condition),
662 match_client_list_condition,
663 condition,
664 &iter);
665 node = cds_lfht_iter_get_node(&iter);
666 if (!node) {
667 goto end_unlock;
668 }
669
670 client_list = caa_container_of(node, struct notification_client_list,
671 notification_trigger_ht_node);
672 cds_list_for_each_entry_safe(client_list_element, client_tmp,
673 &client_list->list, node) {
674 if (client_list_element->client->socket != client->socket) {
675 continue;
676 }
677 cds_list_del(&client_list_element->node);
678 free(client_list_element);
679 break;
680 }
681 end_unlock:
682 rcu_read_unlock();
683 end:
684 lttng_condition_destroy(condition);
685 if (_status) {
686 *_status = status;
687 }
688 return 0;
689 }
690
691 static
692 void free_notification_client_rcu(struct rcu_head *node)
693 {
694 free(caa_container_of(node, struct notification_client, rcu_node));
695 }
696
697 static
698 void notification_client_destroy(struct notification_client *client,
699 struct notification_thread_state *state)
700 {
701 struct lttng_condition_list_element *condition_list_element, *tmp;
702
703 if (!client) {
704 return;
705 }
706
707 /* Release all conditions to which the client was subscribed. */
708 cds_list_for_each_entry_safe(condition_list_element, tmp,
709 &client->condition_list, node) {
710 (void) notification_thread_client_unsubscribe(client,
711 condition_list_element->condition, state, NULL);
712 }
713
714 if (client->socket >= 0) {
715 (void) lttcomm_close_unix_sock(client->socket);
716 }
717 lttng_dynamic_buffer_reset(&client->communication.inbound.buffer);
718 lttng_dynamic_buffer_reset(&client->communication.outbound.buffer);
719 call_rcu(&client->rcu_node, free_notification_client_rcu);
720 }
721
722 /*
723 * Call with rcu_read_lock held (and hold for the lifetime of the returned
724 * client pointer).
725 */
726 static
727 struct notification_client *get_client_from_socket(int socket,
728 struct notification_thread_state *state)
729 {
730 struct cds_lfht_iter iter;
731 struct cds_lfht_node *node;
732 struct notification_client *client = NULL;
733
734 cds_lfht_lookup(state->client_socket_ht,
735 hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed),
736 match_client,
737 (void *) (unsigned long) socket,
738 &iter);
739 node = cds_lfht_iter_get_node(&iter);
740 if (!node) {
741 goto end;
742 }
743
744 client = caa_container_of(node, struct notification_client,
745 client_socket_ht_node);
746 end:
747 return client;
748 }
749
750 static
751 bool trigger_applies_to_channel(struct lttng_trigger *trigger,
752 struct channel_info *info)
753 {
754 enum lttng_condition_status status;
755 struct lttng_condition *condition;
756 const char *trigger_session_name = NULL;
757 const char *trigger_channel_name = NULL;
758 enum lttng_domain_type trigger_domain;
759
760 condition = lttng_trigger_get_condition(trigger);
761 if (!condition) {
762 goto fail;
763 }
764
765 switch (lttng_condition_get_type(condition)) {
766 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
767 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
768 break;
769 default:
770 goto fail;
771 }
772
773 status = lttng_condition_buffer_usage_get_domain_type(condition,
774 &trigger_domain);
775 assert(status == LTTNG_CONDITION_STATUS_OK);
776 if (info->key.domain != trigger_domain) {
777 goto fail;
778 }
779
780 status = lttng_condition_buffer_usage_get_session_name(
781 condition, &trigger_session_name);
782 assert((status == LTTNG_CONDITION_STATUS_OK) && trigger_session_name);
783
784 status = lttng_condition_buffer_usage_get_channel_name(
785 condition, &trigger_channel_name);
786 assert((status == LTTNG_CONDITION_STATUS_OK) && trigger_channel_name);
787
788 if (strcmp(info->session_name, trigger_session_name)) {
789 goto fail;
790 }
791 if (strcmp(info->channel_name, trigger_channel_name)) {
792 goto fail;
793 }
794
795 return true;
796 fail:
797 return false;
798 }
799
800 static
801 bool trigger_applies_to_client(struct lttng_trigger *trigger,
802 struct notification_client *client)
803 {
804 bool applies = false;
805 struct lttng_condition_list_element *condition_list_element;
806
807 cds_list_for_each_entry(condition_list_element, &client->condition_list,
808 node) {
809 applies = lttng_condition_is_equal(
810 condition_list_element->condition,
811 lttng_trigger_get_condition(trigger));
812 if (applies) {
813 break;
814 }
815 }
816 return applies;
817 }
818
819 static
820 int handle_notification_thread_command_add_channel(
821 struct notification_thread_state *state,
822 struct channel_info *channel_info,
823 enum lttng_error_code *cmd_result)
824 {
825 struct cds_list_head trigger_list;
826 struct channel_info *new_channel_info;
827 struct channel_key *channel_key;
828 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
829 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
830 int trigger_count = 0;
831 struct cds_lfht_iter iter;
832
833 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
834 channel_info->channel_name, channel_info->session_name,
835 channel_info->key.key, channel_info->key.domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
836
837 CDS_INIT_LIST_HEAD(&trigger_list);
838
839 new_channel_info = channel_info_copy(channel_info);
840 if (!new_channel_info) {
841 goto error;
842 }
843
844 channel_key = &new_channel_info->key;
845
846 rcu_read_lock();
847 /* Build a list of all triggers applying to the new channel. */
848 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
849 node) {
850 struct lttng_trigger_list_element *new_element;
851
852 if (!trigger_applies_to_channel(trigger_ht_element->trigger,
853 channel_info)) {
854 continue;
855 }
856
857 new_element = zmalloc(sizeof(*new_element));
858 if (!new_element) {
859 goto error_unlock;
860 }
861 CDS_INIT_LIST_HEAD(&new_element->node);
862 new_element->trigger = trigger_ht_element->trigger;
863 cds_list_add(&new_element->node, &trigger_list);
864 trigger_count++;
865 }
866
867 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
868 trigger_count);
869 channel_trigger_list = zmalloc(sizeof(*channel_trigger_list));
870 if (!channel_trigger_list) {
871 goto error_unlock;
872 }
873 channel_trigger_list->channel_key = *channel_key;
874 CDS_INIT_LIST_HEAD(&channel_trigger_list->list);
875 cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
876 cds_list_splice(&trigger_list, &channel_trigger_list->list);
877
878 /* Add channel to the channel_ht which owns the channel_infos. */
879 cds_lfht_add(state->channels_ht,
880 hash_channel_key(channel_key),
881 &new_channel_info->channels_ht_node);
882 /*
883 * Add the list of triggers associated with this channel to the
884 * channel_triggers_ht.
885 */
886 cds_lfht_add(state->channel_triggers_ht,
887 hash_channel_key(channel_key),
888 &channel_trigger_list->channel_triggers_ht_node);
889 rcu_read_unlock();
890 *cmd_result = LTTNG_OK;
891 return 0;
892 error_unlock:
893 rcu_read_unlock();
894 error:
895 /* Empty trigger list */
896 channel_info_destroy(new_channel_info);
897 return 1;
898 }
899
900 static
901 void free_channel_trigger_list_rcu(struct rcu_head *node)
902 {
903 free(caa_container_of(node, struct lttng_channel_trigger_list,
904 rcu_node));
905 }
906
907 static
908 void free_channel_state_sample_rcu(struct rcu_head *node)
909 {
910 free(caa_container_of(node, struct channel_state_sample,
911 rcu_node));
912 }
913
914 static
915 int handle_notification_thread_command_remove_channel(
916 struct notification_thread_state *state,
917 uint64_t channel_key, enum lttng_domain_type domain,
918 enum lttng_error_code *cmd_result)
919 {
920 struct cds_lfht_node *node;
921 struct cds_lfht_iter iter;
922 struct lttng_channel_trigger_list *trigger_list;
923 struct lttng_trigger_list_element *trigger_list_element, *tmp;
924 struct channel_key key = { .key = channel_key, .domain = domain };
925 struct channel_info *channel_info;
926
927 DBG("[notification-thread] Removing channel key = %" PRIu64 " in %s domain",
928 channel_key, domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
929
930 rcu_read_lock();
931
932 cds_lfht_lookup(state->channel_triggers_ht,
933 hash_channel_key(&key),
934 match_channel_trigger_list,
935 &key,
936 &iter);
937 node = cds_lfht_iter_get_node(&iter);
938 /*
939 * There is a severe internal error if we are being asked to remove a
940 * channel that doesn't exist.
941 */
942 if (!node) {
943 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
944 goto end;
945 }
946
947 /* Free the list of triggers associated with this channel. */
948 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
949 channel_triggers_ht_node);
950 cds_list_for_each_entry_safe(trigger_list_element, tmp,
951 &trigger_list->list, node) {
952 cds_list_del(&trigger_list_element->node);
953 free(trigger_list_element);
954 }
955 cds_lfht_del(state->channel_triggers_ht, node);
956 call_rcu(&trigger_list->rcu_node, free_channel_trigger_list_rcu);
957
958 /* Free sampled channel state. */
959 cds_lfht_lookup(state->channel_state_ht,
960 hash_channel_key(&key),
961 match_channel_state_sample,
962 &key,
963 &iter);
964 node = cds_lfht_iter_get_node(&iter);
965 /*
966 * This is expected to be NULL if the channel is destroyed before we
967 * received a sample.
968 */
969 if (node) {
970 struct channel_state_sample *sample = caa_container_of(node,
971 struct channel_state_sample,
972 channel_state_ht_node);
973
974 cds_lfht_del(state->channel_state_ht, node);
975 call_rcu(&sample->rcu_node, free_channel_state_sample_rcu);
976 }
977
978 /* Remove the channel from the channels_ht and free it. */
979 cds_lfht_lookup(state->channels_ht,
980 hash_channel_key(&key),
981 match_channel_info,
982 &key,
983 &iter);
984 node = cds_lfht_iter_get_node(&iter);
985 assert(node);
986 channel_info = caa_container_of(node, struct channel_info,
987 channels_ht_node);
988 cds_lfht_del(state->channels_ht, node);
989 channel_info_destroy(channel_info);
990 end:
991 rcu_read_unlock();
992 *cmd_result = LTTNG_OK;
993 return 0;
994 }
995
996 static
997 int condition_is_supported(struct lttng_condition *condition)
998 {
999 int ret;
1000
1001 switch (lttng_condition_get_type(condition)) {
1002 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1003 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1004 {
1005 enum lttng_domain_type domain;
1006
1007 ret = lttng_condition_buffer_usage_get_domain_type(condition,
1008 &domain);
1009 if (ret) {
1010 ret = -1;
1011 goto end;
1012 }
1013
1014 if (domain != LTTNG_DOMAIN_KERNEL) {
1015 ret = 1;
1016 goto end;
1017 }
1018
1019 /*
1020 * Older kernel tracers don't expose the API to monitor their
1021 * buffers. Therefore, we reject triggers that require that
1022 * mechanism to be available to be evaluated.
1023 */
1024 ret = kernel_supports_ring_buffer_snapshot_sample_positions(
1025 kernel_tracer_fd);
1026 break;
1027 }
1028 default:
1029 ret = 1;
1030 }
1031 end:
1032 return ret;
1033 }
1034
1035 /*
1036 * FIXME A client's credentials are not checked when registering a trigger, nor
1037 * are they stored alongside with the trigger.
1038 *
1039 * The effects of this are benign since:
1040 * - The client will succeed in registering the trigger, as it is valid,
1041 * - The trigger will, internally, be bound to the channel,
1042 * - The notifications will not be sent since the client's credentials
1043 * are checked against the channel at that moment.
1044 *
1045 * If this function returns a non-zero value, it means something is
1046 * fundamentally and the whole subsystem/thread will be torn down.
1047 *
1048 * If a non-fatal error occurs, just set the cmd_result to the appropriate
1049 * error code.
1050 */
1051 static
1052 int handle_notification_thread_command_register_trigger(
1053 struct notification_thread_state *state,
1054 struct lttng_trigger *trigger,
1055 enum lttng_error_code *cmd_result)
1056 {
1057 int ret = 0;
1058 struct lttng_condition *condition;
1059 struct notification_client *client;
1060 struct notification_client_list *client_list = NULL;
1061 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1062 struct notification_client_list_element *client_list_element, *tmp;
1063 struct cds_lfht_node *node;
1064 struct cds_lfht_iter iter;
1065 struct channel_info *channel;
1066 bool free_trigger = true;
1067
1068 rcu_read_lock();
1069
1070 condition = lttng_trigger_get_condition(trigger);
1071 assert(condition);
1072
1073 ret = condition_is_supported(condition);
1074 if (ret < 0) {
1075 goto error;
1076 } else if (ret == 0) {
1077 *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
1078 goto error;
1079 } else {
1080 /* Feature is supported, continue. */
1081 ret = 0;
1082 }
1083
1084 trigger_ht_element = zmalloc(sizeof(*trigger_ht_element));
1085 if (!trigger_ht_element) {
1086 ret = -1;
1087 goto error;
1088 }
1089
1090 /* Add trigger to the trigger_ht. */
1091 cds_lfht_node_init(&trigger_ht_element->node);
1092 trigger_ht_element->trigger = trigger;
1093
1094 node = cds_lfht_add_unique(state->triggers_ht,
1095 lttng_condition_hash(condition),
1096 match_condition,
1097 condition,
1098 &trigger_ht_element->node);
1099 if (node != &trigger_ht_element->node) {
1100 /* Not a fatal error, simply report it to the client. */
1101 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
1102 goto error_free_ht_element;
1103 }
1104
1105 /*
1106 * Ownership of the trigger and of its wrapper was transfered to
1107 * the triggers_ht.
1108 */
1109 trigger_ht_element = NULL;
1110 free_trigger = false;
1111
1112 /*
1113 * The rest only applies to triggers that have a "notify" action.
1114 * It is not skipped as this is the only action type currently
1115 * supported.
1116 */
1117 client_list = zmalloc(sizeof(*client_list));
1118 if (!client_list) {
1119 ret = -1;
1120 goto error_free_ht_element;
1121 }
1122 cds_lfht_node_init(&client_list->notification_trigger_ht_node);
1123 CDS_INIT_LIST_HEAD(&client_list->list);
1124 client_list->trigger = trigger;
1125
1126 /* Build a list of clients to which this new trigger applies. */
1127 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
1128 client_socket_ht_node) {
1129 if (!trigger_applies_to_client(trigger, client)) {
1130 continue;
1131 }
1132
1133 client_list_element = zmalloc(sizeof(*client_list_element));
1134 if (!client_list_element) {
1135 ret = -1;
1136 goto error_free_client_list;
1137 }
1138 CDS_INIT_LIST_HEAD(&client_list_element->node);
1139 client_list_element->client = client;
1140 cds_list_add(&client_list_element->node, &client_list->list);
1141 }
1142
1143 cds_lfht_add(state->notification_trigger_clients_ht,
1144 lttng_condition_hash(condition),
1145 &client_list->notification_trigger_ht_node);
1146 /*
1147 * Client list ownership transferred to the
1148 * notification_trigger_clients_ht.
1149 */
1150 client_list = NULL;
1151
1152 /*
1153 * Add the trigger to list of triggers bound to the channels currently
1154 * known.
1155 */
1156 cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
1157 channels_ht_node) {
1158 struct lttng_trigger_list_element *trigger_list_element;
1159 struct lttng_channel_trigger_list *trigger_list;
1160 struct cds_lfht_iter lookup_iter;
1161
1162 if (!trigger_applies_to_channel(trigger, channel)) {
1163 continue;
1164 }
1165
1166 cds_lfht_lookup(state->channel_triggers_ht,
1167 hash_channel_key(&channel->key),
1168 match_channel_trigger_list,
1169 &channel->key,
1170 &lookup_iter);
1171 node = cds_lfht_iter_get_node(&lookup_iter);
1172 assert(node);
1173 trigger_list = caa_container_of(node,
1174 struct lttng_channel_trigger_list,
1175 channel_triggers_ht_node);
1176
1177 trigger_list_element = zmalloc(sizeof(*trigger_list_element));
1178 if (!trigger_list_element) {
1179 ret = -1;
1180 goto error_free_client_list;
1181 }
1182 CDS_INIT_LIST_HEAD(&trigger_list_element->node);
1183 trigger_list_element->trigger = trigger;
1184 cds_list_add(&trigger_list_element->node, &trigger_list->list);
1185
1186 /* A trigger can only apply to one channel. */
1187 break;
1188 }
1189
1190 *cmd_result = LTTNG_OK;
1191 error_free_client_list:
1192 if (client_list) {
1193 cds_list_for_each_entry_safe(client_list_element, tmp,
1194 &client_list->list, node) {
1195 free(client_list_element);
1196 }
1197 free(client_list);
1198 }
1199 error_free_ht_element:
1200 free(trigger_ht_element);
1201 error:
1202 if (free_trigger) {
1203 struct lttng_action *action = lttng_trigger_get_action(trigger);
1204
1205 lttng_condition_destroy(condition);
1206 lttng_action_destroy(action);
1207 lttng_trigger_destroy(trigger);
1208 }
1209 rcu_read_unlock();
1210 return ret;
1211 }
1212
1213 static
1214 void free_notification_client_list_rcu(struct rcu_head *node)
1215 {
1216 free(caa_container_of(node, struct notification_client_list,
1217 rcu_node));
1218 }
1219
1220 static
1221 void free_lttng_trigger_ht_element_rcu(struct rcu_head *node)
1222 {
1223 free(caa_container_of(node, struct lttng_trigger_ht_element,
1224 rcu_node));
1225 }
1226
1227 static
1228 int handle_notification_thread_command_unregister_trigger(
1229 struct notification_thread_state *state,
1230 struct lttng_trigger *trigger,
1231 enum lttng_error_code *_cmd_reply)
1232 {
1233 struct cds_lfht_iter iter;
1234 struct cds_lfht_node *node, *triggers_ht_node;
1235 struct lttng_channel_trigger_list *trigger_list;
1236 struct notification_client_list *client_list;
1237 struct notification_client_list_element *client_list_element, *tmp;
1238 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1239 struct lttng_condition *condition = lttng_trigger_get_condition(
1240 trigger);
1241 struct lttng_action *action;
1242 enum lttng_error_code cmd_reply;
1243
1244 rcu_read_lock();
1245
1246 cds_lfht_lookup(state->triggers_ht,
1247 lttng_condition_hash(condition),
1248 match_condition,
1249 condition,
1250 &iter);
1251 triggers_ht_node = cds_lfht_iter_get_node(&iter);
1252 if (!triggers_ht_node) {
1253 cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND;
1254 goto end;
1255 } else {
1256 cmd_reply = LTTNG_OK;
1257 }
1258
1259 /* Remove trigger from channel_triggers_ht. */
1260 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
1261 channel_triggers_ht_node) {
1262 struct lttng_trigger_list_element *trigger_element, *tmp;
1263
1264 cds_list_for_each_entry_safe(trigger_element, tmp,
1265 &trigger_list->list, node) {
1266 struct lttng_condition *current_condition =
1267 lttng_trigger_get_condition(
1268 trigger_element->trigger);
1269
1270 assert(current_condition);
1271 if (!lttng_condition_is_equal(condition,
1272 current_condition)) {
1273 continue;
1274 }
1275
1276 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
1277 cds_list_del(&trigger_element->node);
1278 /* A trigger can only appear once per channel */
1279 break;
1280 }
1281 }
1282
1283 /*
1284 * Remove and release the client list from
1285 * notification_trigger_clients_ht.
1286 */
1287 cds_lfht_lookup(state->notification_trigger_clients_ht,
1288 lttng_condition_hash(condition),
1289 match_client_list,
1290 trigger,
1291 &iter);
1292 node = cds_lfht_iter_get_node(&iter);
1293 assert(node);
1294 client_list = caa_container_of(node, struct notification_client_list,
1295 notification_trigger_ht_node);
1296 cds_list_for_each_entry_safe(client_list_element, tmp,
1297 &client_list->list, node) {
1298 free(client_list_element);
1299 }
1300 cds_lfht_del(state->notification_trigger_clients_ht, node);
1301 call_rcu(&client_list->rcu_node, free_notification_client_list_rcu);
1302
1303 /* Remove trigger from triggers_ht. */
1304 trigger_ht_element = caa_container_of(triggers_ht_node,
1305 struct lttng_trigger_ht_element, node);
1306 cds_lfht_del(state->triggers_ht, triggers_ht_node);
1307
1308 condition = lttng_trigger_get_condition(trigger_ht_element->trigger);
1309 lttng_condition_destroy(condition);
1310 action = lttng_trigger_get_action(trigger_ht_element->trigger);
1311 lttng_action_destroy(action);
1312 lttng_trigger_destroy(trigger_ht_element->trigger);
1313 call_rcu(&trigger_ht_element->rcu_node, free_lttng_trigger_ht_element_rcu);
1314 end:
1315 rcu_read_unlock();
1316 if (_cmd_reply) {
1317 *_cmd_reply = cmd_reply;
1318 }
1319 return 0;
1320 }
1321
1322 /* Returns 0 on success, 1 on exit requested, negative value on error. */
1323 int handle_notification_thread_command(
1324 struct notification_thread_handle *handle,
1325 struct notification_thread_state *state)
1326 {
1327 int ret;
1328 uint64_t counter;
1329 struct notification_thread_command *cmd;
1330
1331 /* Read event_fd to put it back into a quiescent state. */
1332 ret = read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter, sizeof(counter));
1333 if (ret == -1) {
1334 goto error;
1335 }
1336
1337 pthread_mutex_lock(&handle->cmd_queue.lock);
1338 cmd = cds_list_first_entry(&handle->cmd_queue.list,
1339 struct notification_thread_command, cmd_list_node);
1340 switch (cmd->type) {
1341 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
1342 DBG("[notification-thread] Received register trigger command");
1343 ret = handle_notification_thread_command_register_trigger(
1344 state, cmd->parameters.trigger,
1345 &cmd->reply_code);
1346 break;
1347 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
1348 DBG("[notification-thread] Received unregister trigger command");
1349 ret = handle_notification_thread_command_unregister_trigger(
1350 state, cmd->parameters.trigger,
1351 &cmd->reply_code);
1352 break;
1353 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
1354 DBG("[notification-thread] Received add channel command");
1355 ret = handle_notification_thread_command_add_channel(
1356 state, &cmd->parameters.add_channel,
1357 &cmd->reply_code);
1358 break;
1359 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
1360 DBG("[notification-thread] Received remove channel command");
1361 ret = handle_notification_thread_command_remove_channel(
1362 state, cmd->parameters.remove_channel.key,
1363 cmd->parameters.remove_channel.domain,
1364 &cmd->reply_code);
1365 break;
1366 case NOTIFICATION_COMMAND_TYPE_QUIT:
1367 DBG("[notification-thread] Received quit command");
1368 cmd->reply_code = LTTNG_OK;
1369 ret = 1;
1370 goto end;
1371 default:
1372 ERR("[notification-thread] Unknown internal command received");
1373 goto error_unlock;
1374 }
1375
1376 if (ret) {
1377 goto error_unlock;
1378 }
1379 end:
1380 cds_list_del(&cmd->cmd_list_node);
1381 lttng_waiter_wake_up(&cmd->reply_waiter);
1382 pthread_mutex_unlock(&handle->cmd_queue.lock);
1383 return ret;
1384 error_unlock:
1385 /* Wake-up and return a fatal error to the calling thread. */
1386 lttng_waiter_wake_up(&cmd->reply_waiter);
1387 pthread_mutex_unlock(&handle->cmd_queue.lock);
1388 cmd->reply_code = LTTNG_ERR_FATAL;
1389 error:
1390 /* Indicate a fatal error to the caller. */
1391 return -1;
1392 }
1393
1394 static
1395 unsigned long hash_client_socket(int socket)
1396 {
1397 return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed);
1398 }
1399
1400 static
1401 int socket_set_non_blocking(int socket)
1402 {
1403 int ret, flags;
1404
1405 /* Set the pipe as non-blocking. */
1406 ret = fcntl(socket, F_GETFL, 0);
1407 if (ret == -1) {
1408 PERROR("fcntl get socket flags");
1409 goto end;
1410 }
1411 flags = ret;
1412
1413 ret = fcntl(socket, F_SETFL, flags | O_NONBLOCK);
1414 if (ret == -1) {
1415 PERROR("fcntl set O_NONBLOCK socket flag");
1416 goto end;
1417 }
1418 DBG("Client socket (fd = %i) set as non-blocking", socket);
1419 end:
1420 return ret;
1421 }
1422
1423 static
1424 int client_reset_inbound_state(struct notification_client *client)
1425 {
1426 int ret;
1427
1428 ret = lttng_dynamic_buffer_set_size(
1429 &client->communication.inbound.buffer, 0);
1430 assert(!ret);
1431
1432 client->communication.inbound.bytes_to_receive =
1433 sizeof(struct lttng_notification_channel_message);
1434 client->communication.inbound.msg_type =
1435 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN;
1436 LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1);
1437 LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1);
1438 ret = lttng_dynamic_buffer_set_size(
1439 &client->communication.inbound.buffer,
1440 client->communication.inbound.bytes_to_receive);
1441 return ret;
1442 }
1443
1444 int handle_notification_thread_client_connect(
1445 struct notification_thread_state *state)
1446 {
1447 int ret;
1448 struct notification_client *client;
1449
1450 DBG("[notification-thread] Handling new notification channel client connection");
1451
1452 client = zmalloc(sizeof(*client));
1453 if (!client) {
1454 /* Fatal error. */
1455 ret = -1;
1456 goto error;
1457 }
1458 CDS_INIT_LIST_HEAD(&client->condition_list);
1459 lttng_dynamic_buffer_init(&client->communication.inbound.buffer);
1460 lttng_dynamic_buffer_init(&client->communication.outbound.buffer);
1461 client->communication.inbound.expect_creds = true;
1462 ret = client_reset_inbound_state(client);
1463 if (ret) {
1464 ERR("[notification-thread] Failed to reset client communication's inbound state");
1465 ret = 0;
1466 goto error;
1467 }
1468
1469 ret = lttcomm_accept_unix_sock(state->notification_channel_socket);
1470 if (ret < 0) {
1471 ERR("[notification-thread] Failed to accept new notification channel client connection");
1472 ret = 0;
1473 goto error;
1474 }
1475
1476 client->socket = ret;
1477
1478 ret = socket_set_non_blocking(client->socket);
1479 if (ret) {
1480 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
1481 goto error;
1482 }
1483
1484 ret = lttcomm_setsockopt_creds_unix_sock(client->socket);
1485 if (ret < 0) {
1486 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
1487 ret = 0;
1488 goto error;
1489 }
1490
1491 ret = lttng_poll_add(&state->events, client->socket,
1492 LPOLLIN | LPOLLERR |
1493 LPOLLHUP | LPOLLRDHUP);
1494 if (ret < 0) {
1495 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
1496 ret = 0;
1497 goto error;
1498 }
1499 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
1500 client->socket);
1501
1502 rcu_read_lock();
1503 cds_lfht_add(state->client_socket_ht,
1504 hash_client_socket(client->socket),
1505 &client->client_socket_ht_node);
1506 rcu_read_unlock();
1507
1508 return ret;
1509 error:
1510 notification_client_destroy(client, state);
1511 return ret;
1512 }
1513
1514 int handle_notification_thread_client_disconnect(
1515 int client_socket,
1516 struct notification_thread_state *state)
1517 {
1518 int ret = 0;
1519 struct notification_client *client;
1520
1521 rcu_read_lock();
1522 DBG("[notification-thread] Closing client connection (socket fd = %i)",
1523 client_socket);
1524 client = get_client_from_socket(client_socket, state);
1525 if (!client) {
1526 /* Internal state corruption, fatal error. */
1527 ERR("[notification-thread] Unable to find client (socket fd = %i)",
1528 client_socket);
1529 ret = -1;
1530 goto end;
1531 }
1532
1533 ret = lttng_poll_del(&state->events, client_socket);
1534 if (ret) {
1535 ERR("[notification-thread] Failed to remove client socket from poll set");
1536 }
1537 cds_lfht_del(state->client_socket_ht,
1538 &client->client_socket_ht_node);
1539 notification_client_destroy(client, state);
1540 end:
1541 rcu_read_unlock();
1542 return ret;
1543 }
1544
1545 int handle_notification_thread_client_disconnect_all(
1546 struct notification_thread_state *state)
1547 {
1548 struct cds_lfht_iter iter;
1549 struct notification_client *client;
1550 bool error_encoutered = false;
1551
1552 rcu_read_lock();
1553 DBG("[notification-thread] Closing all client connections");
1554 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
1555 client_socket_ht_node) {
1556 int ret;
1557
1558 ret = handle_notification_thread_client_disconnect(
1559 client->socket, state);
1560 if (ret) {
1561 error_encoutered = true;
1562 }
1563 }
1564 rcu_read_unlock();
1565 return error_encoutered ? 1 : 0;
1566 }
1567
1568 int handle_notification_thread_trigger_unregister_all(
1569 struct notification_thread_state *state)
1570 {
1571 bool error_occurred = false;
1572 struct cds_lfht_iter iter;
1573 struct lttng_trigger_ht_element *trigger_ht_element;
1574
1575 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1576 node) {
1577 int ret = handle_notification_thread_command_unregister_trigger(
1578 state, trigger_ht_element->trigger, NULL);
1579 if (ret) {
1580 error_occurred = true;
1581 }
1582 }
1583 return error_occurred ? -1 : 0;
1584 }
1585
1586 static
1587 int client_flush_outgoing_queue(struct notification_client *client,
1588 struct notification_thread_state *state)
1589 {
1590 ssize_t ret;
1591 size_t to_send_count;
1592
1593 assert(client->communication.outbound.buffer.size != 0);
1594 to_send_count = client->communication.outbound.buffer.size;
1595 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
1596 client->socket);
1597
1598 ret = lttcomm_send_unix_sock_non_block(client->socket,
1599 client->communication.outbound.buffer.data,
1600 to_send_count);
1601 if ((ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) ||
1602 (ret > 0 && ret < to_send_count)) {
1603 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
1604 client->socket);
1605 to_send_count -= max(ret, 0);
1606
1607 memcpy(client->communication.outbound.buffer.data,
1608 client->communication.outbound.buffer.data +
1609 client->communication.outbound.buffer.size - to_send_count,
1610 to_send_count);
1611 ret = lttng_dynamic_buffer_set_size(
1612 &client->communication.outbound.buffer,
1613 to_send_count);
1614 if (ret) {
1615 goto error;
1616 }
1617
1618 /*
1619 * We want to be notified whenever there is buffer space
1620 * available to send the rest of the payload.
1621 */
1622 ret = lttng_poll_mod(&state->events, client->socket,
1623 CLIENT_POLL_MASK_IN_OUT);
1624 if (ret) {
1625 goto error;
1626 }
1627 } else if (ret < 0) {
1628 /* Generic error, disconnect the client. */
1629 ERR("[notification-thread] Failed to send flush outgoing queue, disconnecting client (socket fd = %i)",
1630 client->socket);
1631 ret = handle_notification_thread_client_disconnect(
1632 client->socket, state);
1633 if (ret) {
1634 goto error;
1635 }
1636 } else {
1637 /* No error and flushed the queue completely. */
1638 ret = lttng_dynamic_buffer_set_size(
1639 &client->communication.outbound.buffer, 0);
1640 if (ret) {
1641 goto error;
1642 }
1643 ret = lttng_poll_mod(&state->events, client->socket,
1644 CLIENT_POLL_MASK_IN);
1645 if (ret) {
1646 goto error;
1647 }
1648
1649 client->communication.outbound.queued_command_reply = false;
1650 client->communication.outbound.dropped_notification = false;
1651 }
1652
1653 return 0;
1654 error:
1655 return -1;
1656 }
1657
1658 static
1659 int client_send_command_reply(struct notification_client *client,
1660 struct notification_thread_state *state,
1661 enum lttng_notification_channel_status status)
1662 {
1663 int ret;
1664 struct lttng_notification_channel_command_reply reply = {
1665 .status = (int8_t) status,
1666 };
1667 struct lttng_notification_channel_message msg = {
1668 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
1669 .size = sizeof(reply),
1670 };
1671 char buffer[sizeof(msg) + sizeof(reply)];
1672
1673 if (client->communication.outbound.queued_command_reply) {
1674 /* Protocol error. */
1675 goto error;
1676 }
1677
1678 memcpy(buffer, &msg, sizeof(msg));
1679 memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
1680 DBG("[notification-thread] Send command reply (%i)", (int) status);
1681
1682 /* Enqueue buffer to outgoing queue and flush it. */
1683 ret = lttng_dynamic_buffer_append(
1684 &client->communication.outbound.buffer,
1685 buffer, sizeof(buffer));
1686 if (ret) {
1687 goto error;
1688 }
1689
1690 ret = client_flush_outgoing_queue(client, state);
1691 if (ret) {
1692 goto error;
1693 }
1694
1695 if (client->communication.outbound.buffer.size != 0) {
1696 /* Queue could not be emptied. */
1697 client->communication.outbound.queued_command_reply = true;
1698 }
1699
1700 return 0;
1701 error:
1702 return -1;
1703 }
1704
1705 static
1706 int client_dispatch_message(struct notification_client *client,
1707 struct notification_thread_state *state)
1708 {
1709 int ret = 0;
1710
1711 if (client->communication.inbound.msg_type !=
1712 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE &&
1713 client->communication.inbound.msg_type !=
1714 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN &&
1715 !client->validated) {
1716 WARN("[notification-thread] client attempted a command before handshake");
1717 ret = -1;
1718 goto end;
1719 }
1720
1721 switch (client->communication.inbound.msg_type) {
1722 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN:
1723 {
1724 /*
1725 * Receiving message header. The function will be called again
1726 * once the rest of the message as been received and can be
1727 * interpreted.
1728 */
1729 const struct lttng_notification_channel_message *msg;
1730
1731 assert(sizeof(*msg) ==
1732 client->communication.inbound.buffer.size);
1733 msg = (const struct lttng_notification_channel_message *)
1734 client->communication.inbound.buffer.data;
1735
1736 if (msg->size == 0 || msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
1737 ERR("[notification-thread] Invalid notification channel message: length = %u", msg->size);
1738 ret = -1;
1739 goto end;
1740 }
1741
1742 switch (msg->type) {
1743 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
1744 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
1745 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
1746 break;
1747 default:
1748 ret = -1;
1749 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
1750 goto end;
1751 }
1752
1753 client->communication.inbound.bytes_to_receive = msg->size;
1754 client->communication.inbound.msg_type =
1755 (enum lttng_notification_channel_message_type) msg->type;
1756 ret = lttng_dynamic_buffer_set_size(
1757 &client->communication.inbound.buffer, msg->size);
1758 if (ret) {
1759 goto end;
1760 }
1761 break;
1762 }
1763 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
1764 {
1765 struct lttng_notification_channel_command_handshake *handshake_client;
1766 struct lttng_notification_channel_command_handshake handshake_reply = {
1767 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
1768 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
1769 };
1770 struct lttng_notification_channel_message msg_header = {
1771 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
1772 .size = sizeof(handshake_reply),
1773 };
1774 enum lttng_notification_channel_status status =
1775 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1776 char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
1777
1778 memcpy(send_buffer, &msg_header, sizeof(msg_header));
1779 memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
1780 sizeof(handshake_reply));
1781
1782 handshake_client =
1783 (struct lttng_notification_channel_command_handshake *)
1784 client->communication.inbound.buffer.data;
1785 client->major = handshake_client->major;
1786 client->minor = handshake_client->minor;
1787 if (!client->communication.inbound.creds_received) {
1788 ERR("[notification-thread] No credentials received from client");
1789 ret = -1;
1790 goto end;
1791 }
1792
1793 client->uid = LTTNG_SOCK_GET_UID_CRED(
1794 &client->communication.inbound.creds);
1795 client->gid = LTTNG_SOCK_GET_GID_CRED(
1796 &client->communication.inbound.creds);
1797 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
1798 client->uid, client->gid, (int) client->major,
1799 (int) client->minor);
1800
1801 if (handshake_client->major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
1802 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
1803 }
1804
1805 ret = lttng_dynamic_buffer_append(&client->communication.outbound.buffer,
1806 send_buffer, sizeof(send_buffer));
1807 if (ret) {
1808 ERR("[notification-thread] Failed to send protocol version to notification channel client");
1809 goto end;
1810 }
1811
1812 ret = client_flush_outgoing_queue(client, state);
1813 if (ret) {
1814 goto end;
1815 }
1816
1817 ret = client_send_command_reply(client, state, status);
1818 if (ret) {
1819 ERR("[notification-thread] Failed to send reply to notification channel client");
1820 goto end;
1821 }
1822
1823 /* Set reception state to receive the next message header. */
1824 ret = client_reset_inbound_state(client);
1825 if (ret) {
1826 ERR("[notification-thread] Failed to reset client communication's inbound state");
1827 goto end;
1828 }
1829 client->validated = true;
1830 break;
1831 }
1832 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
1833 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
1834 {
1835 struct lttng_condition *condition;
1836 enum lttng_notification_channel_status status =
1837 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1838 const struct lttng_buffer_view condition_view =
1839 lttng_buffer_view_from_dynamic_buffer(
1840 &client->communication.inbound.buffer,
1841 0, -1);
1842 size_t expected_condition_size =
1843 client->communication.inbound.buffer.size;
1844
1845 ret = lttng_condition_create_from_buffer(&condition_view,
1846 &condition);
1847 if (ret != expected_condition_size) {
1848 ERR("[notification-thread] Malformed condition received from client");
1849 goto end;
1850 }
1851
1852 if (client->communication.inbound.msg_type ==
1853 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
1854 /*
1855 * FIXME The current state should be evaluated on
1856 * subscription.
1857 */
1858 ret = notification_thread_client_subscribe(client,
1859 condition, state, &status);
1860 } else {
1861 ret = notification_thread_client_unsubscribe(client,
1862 condition, state, &status);
1863 }
1864 if (ret) {
1865 goto end;
1866 }
1867
1868 ret = client_send_command_reply(client, state, status);
1869 if (ret) {
1870 ERR("[notification-thread] Failed to send reply to notification channel client");
1871 goto end;
1872 }
1873
1874 /* Set reception state to receive the next message header. */
1875 ret = client_reset_inbound_state(client);
1876 if (ret) {
1877 ERR("[notification-thread] Failed to reset client communication's inbound state");
1878 goto end;
1879 }
1880 break;
1881 }
1882 default:
1883 abort();
1884 }
1885 end:
1886 return ret;
1887 }
1888
1889 /* Incoming data from client. */
1890 int handle_notification_thread_client_in(
1891 struct notification_thread_state *state, int socket)
1892 {
1893 int ret = 0;
1894 struct notification_client *client;
1895 ssize_t recv_ret;
1896 size_t offset;
1897
1898 client = get_client_from_socket(socket, state);
1899 if (!client) {
1900 /* Internal error, abort. */
1901 ret = -1;
1902 goto end;
1903 }
1904
1905 offset = client->communication.inbound.buffer.size -
1906 client->communication.inbound.bytes_to_receive;
1907 if (client->communication.inbound.expect_creds) {
1908 recv_ret = lttcomm_recv_creds_unix_sock(socket,
1909 client->communication.inbound.buffer.data + offset,
1910 client->communication.inbound.bytes_to_receive,
1911 &client->communication.inbound.creds);
1912 if (recv_ret > 0) {
1913 client->communication.inbound.expect_creds = false;
1914 client->communication.inbound.creds_received = true;
1915 }
1916 } else {
1917 recv_ret = lttcomm_recv_unix_sock_non_block(socket,
1918 client->communication.inbound.buffer.data + offset,
1919 client->communication.inbound.bytes_to_receive);
1920 }
1921 if (recv_ret < 0) {
1922 goto error_disconnect_client;
1923 }
1924
1925 client->communication.inbound.bytes_to_receive -= recv_ret;
1926 if (client->communication.inbound.bytes_to_receive == 0) {
1927 ret = client_dispatch_message(client, state);
1928 if (ret) {
1929 /*
1930 * Only returns an error if this client must be
1931 * disconnected.
1932 */
1933 goto error_disconnect_client;
1934 }
1935 } else {
1936 goto end;
1937 }
1938 end:
1939 return ret;
1940 error_disconnect_client:
1941 ret = handle_notification_thread_client_disconnect(socket, state);
1942 return ret;
1943 }
1944
1945 /* Client ready to receive outgoing data. */
1946 int handle_notification_thread_client_out(
1947 struct notification_thread_state *state, int socket)
1948 {
1949 int ret;
1950 struct notification_client *client;
1951
1952 client = get_client_from_socket(socket, state);
1953 if (!client) {
1954 /* Internal error, abort. */
1955 ret = -1;
1956 goto end;
1957 }
1958
1959 ret = client_flush_outgoing_queue(client, state);
1960 if (ret) {
1961 goto end;
1962 }
1963 end:
1964 return ret;
1965 }
1966
1967 static
1968 bool evaluate_buffer_usage_condition(struct lttng_condition *condition,
1969 struct channel_state_sample *sample, uint64_t buffer_capacity)
1970 {
1971 bool result = false;
1972 uint64_t threshold;
1973 enum lttng_condition_type condition_type;
1974 struct lttng_condition_buffer_usage *use_condition = container_of(
1975 condition, struct lttng_condition_buffer_usage,
1976 parent);
1977
1978 if (!sample) {
1979 goto end;
1980 }
1981
1982 if (use_condition->threshold_bytes.set) {
1983 threshold = use_condition->threshold_bytes.value;
1984 } else {
1985 /*
1986 * Threshold was expressed as a ratio.
1987 *
1988 * TODO the threshold (in bytes) of conditions expressed
1989 * as a ratio of total buffer size could be cached to
1990 * forego this double-multiplication or it could be performed
1991 * as fixed-point math.
1992 *
1993 * Note that caching should accomodate the case where the
1994 * condition applies to multiple channels (i.e. don't assume
1995 * that all channels matching my_chann* have the same size...)
1996 */
1997 threshold = (uint64_t) (use_condition->threshold_ratio.value *
1998 (double) buffer_capacity);
1999 }
2000
2001 condition_type = lttng_condition_get_type(condition);
2002 if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
2003 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
2004 threshold, sample->highest_usage);
2005
2006 /*
2007 * The low condition should only be triggered once _all_ of the
2008 * streams in a channel have gone below the "low" threshold.
2009 */
2010 if (sample->highest_usage <= threshold) {
2011 result = true;
2012 }
2013 } else {
2014 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
2015 threshold, sample->highest_usage);
2016
2017 /*
2018 * For high buffer usage scenarios, we want to trigger whenever
2019 * _any_ of the streams has reached the "high" threshold.
2020 */
2021 if (sample->highest_usage >= threshold) {
2022 result = true;
2023 }
2024 }
2025 end:
2026 return result;
2027 }
2028
2029 static
2030 int evaluate_condition(struct lttng_condition *condition,
2031 struct lttng_evaluation **evaluation,
2032 struct notification_thread_state *state,
2033 struct channel_state_sample *previous_sample,
2034 struct channel_state_sample *latest_sample,
2035 uint64_t buffer_capacity)
2036 {
2037 int ret = 0;
2038 enum lttng_condition_type condition_type;
2039 bool previous_sample_result;
2040 bool latest_sample_result;
2041
2042 condition_type = lttng_condition_get_type(condition);
2043 /* No other condition type supported for the moment. */
2044 assert(condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW ||
2045 condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH);
2046
2047 previous_sample_result = evaluate_buffer_usage_condition(condition,
2048 previous_sample, buffer_capacity);
2049 latest_sample_result = evaluate_buffer_usage_condition(condition,
2050 latest_sample, buffer_capacity);
2051
2052 if (!latest_sample_result ||
2053 (previous_sample_result == latest_sample_result)) {
2054 /*
2055 * Only trigger on a condition evaluation transition.
2056 *
2057 * NOTE: This edge-triggered logic may not be appropriate for
2058 * future condition types.
2059 */
2060 goto end;
2061 }
2062
2063 if (evaluation && latest_sample_result) {
2064 *evaluation = lttng_evaluation_buffer_usage_create(
2065 condition_type,
2066 latest_sample->highest_usage,
2067 buffer_capacity);
2068 if (!*evaluation) {
2069 ret = -1;
2070 goto end;
2071 }
2072 }
2073 end:
2074 return ret;
2075 }
2076
2077 static
2078 int client_enqueue_dropped_notification(struct notification_client *client,
2079 struct notification_thread_state *state)
2080 {
2081 int ret;
2082 struct lttng_notification_channel_message msg = {
2083 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
2084 .size = 0,
2085 };
2086
2087 ret = lttng_dynamic_buffer_append(
2088 &client->communication.outbound.buffer, &msg,
2089 sizeof(msg));
2090 return ret;
2091 }
2092
2093 static
2094 int send_evaluation_to_clients(struct lttng_trigger *trigger,
2095 struct lttng_evaluation *evaluation,
2096 struct notification_client_list* client_list,
2097 struct notification_thread_state *state,
2098 uid_t channel_uid, gid_t channel_gid)
2099 {
2100 int ret = 0;
2101 struct lttng_dynamic_buffer msg_buffer;
2102 struct notification_client_list_element *client_list_element, *tmp;
2103 struct lttng_notification *notification;
2104 struct lttng_condition *condition;
2105 ssize_t expected_notification_size, notification_size;
2106 struct lttng_notification_channel_message msg;
2107
2108 lttng_dynamic_buffer_init(&msg_buffer);
2109
2110 condition = lttng_trigger_get_condition(trigger);
2111 assert(condition);
2112
2113 notification = lttng_notification_create(condition, evaluation);
2114 if (!notification) {
2115 ret = -1;
2116 goto end;
2117 }
2118
2119 expected_notification_size = lttng_notification_serialize(notification,
2120 NULL);
2121 if (expected_notification_size < 0) {
2122 ERR("[notification-thread] Failed to get size of serialized notification");
2123 ret = -1;
2124 goto end;
2125 }
2126
2127 msg.type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION;
2128 msg.size = (uint32_t) expected_notification_size;
2129 ret = lttng_dynamic_buffer_append(&msg_buffer, &msg, sizeof(msg));
2130 if (ret) {
2131 goto end;
2132 }
2133
2134 ret = lttng_dynamic_buffer_set_size(&msg_buffer,
2135 msg_buffer.size + expected_notification_size);
2136 if (ret) {
2137 goto end;
2138 }
2139
2140 notification_size = lttng_notification_serialize(notification,
2141 msg_buffer.data + sizeof(msg));
2142 if (notification_size != expected_notification_size) {
2143 ERR("[notification-thread] Failed to serialize notification");
2144 ret = -1;
2145 goto end;
2146 }
2147
2148 cds_list_for_each_entry_safe(client_list_element, tmp,
2149 &client_list->list, node) {
2150 struct notification_client *client =
2151 client_list_element->client;
2152
2153 if (client->uid != channel_uid && client->gid != channel_gid &&
2154 client->uid != 0) {
2155 /* Client is not allowed to monitor this channel. */
2156 DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this channel");
2157 continue;
2158 }
2159
2160 DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
2161 client->socket, msg_buffer.size);
2162 if (client->communication.outbound.buffer.size) {
2163 /*
2164 * Outgoing data is already buffered for this client;
2165 * drop the notification and enqueue a "dropped
2166 * notification" message if this is the first dropped
2167 * notification since the socket spilled-over to the
2168 * queue.
2169 */
2170 DBG("[notification-thread] Dropping notification addressed to client (socket fd = %i)",
2171 client->socket);
2172 if (!client->communication.outbound.dropped_notification) {
2173 client->communication.outbound.dropped_notification = true;
2174 ret = client_enqueue_dropped_notification(
2175 client, state);
2176 if (ret) {
2177 goto end;
2178 }
2179 }
2180 continue;
2181 }
2182
2183 ret = lttng_dynamic_buffer_append_buffer(
2184 &client->communication.outbound.buffer,
2185 &msg_buffer);
2186 if (ret) {
2187 goto end;
2188 }
2189
2190 ret = client_flush_outgoing_queue(client, state);
2191 if (ret) {
2192 goto end;
2193 }
2194 }
2195 ret = 0;
2196 end:
2197 lttng_notification_destroy(notification);
2198 lttng_dynamic_buffer_reset(&msg_buffer);
2199 return ret;
2200 }
2201
2202 int handle_notification_thread_channel_sample(
2203 struct notification_thread_state *state, int pipe,
2204 enum lttng_domain_type domain)
2205 {
2206 int ret = 0;
2207 struct lttcomm_consumer_channel_monitor_msg sample_msg;
2208 struct channel_state_sample previous_sample, latest_sample;
2209 struct channel_info *channel_info;
2210 struct cds_lfht_node *node;
2211 struct cds_lfht_iter iter;
2212 struct lttng_channel_trigger_list *trigger_list;
2213 struct lttng_trigger_list_element *trigger_list_element;
2214 bool previous_sample_available = false;
2215
2216 /*
2217 * The monitoring pipe only holds messages smaller than PIPE_BUF,
2218 * ensuring that read/write of sampling messages are atomic.
2219 */
2220 ret = lttng_read(pipe, &sample_msg, sizeof(sample_msg));
2221 if (ret != sizeof(sample_msg)) {
2222 ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
2223 pipe);
2224 ret = -1;
2225 goto end;
2226 }
2227
2228 ret = 0;
2229 latest_sample.key.key = sample_msg.key;
2230 latest_sample.key.domain = domain;
2231 latest_sample.highest_usage = sample_msg.highest;
2232 latest_sample.lowest_usage = sample_msg.lowest;
2233
2234 rcu_read_lock();
2235
2236 /* Retrieve the channel's informations */
2237 cds_lfht_lookup(state->channels_ht,
2238 hash_channel_key(&latest_sample.key),
2239 match_channel_info,
2240 &latest_sample.key,
2241 &iter);
2242 node = cds_lfht_iter_get_node(&iter);
2243 if (!node) {
2244 /*
2245 * Not an error since the consumer can push a sample to the pipe
2246 * and the rest of the session daemon could notify us of the
2247 * channel's destruction before we get a chance to process that
2248 * sample.
2249 */
2250 DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
2251 latest_sample.key.key,
2252 domain == LTTNG_DOMAIN_KERNEL ? "kernel" :
2253 "user space");
2254 goto end_unlock;
2255 }
2256 channel_info = caa_container_of(node, struct channel_info,
2257 channels_ht_node);
2258 DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64")",
2259 channel_info->channel_name,
2260 latest_sample.key.key,
2261 channel_info->session_name,
2262 latest_sample.highest_usage,
2263 latest_sample.lowest_usage);
2264
2265 /* Retrieve the channel's last sample, if it exists, and update it. */
2266 cds_lfht_lookup(state->channel_state_ht,
2267 hash_channel_key(&latest_sample.key),
2268 match_channel_state_sample,
2269 &latest_sample.key,
2270 &iter);
2271 node = cds_lfht_iter_get_node(&iter);
2272 if (node) {
2273 struct channel_state_sample *stored_sample;
2274
2275 /* Update the sample stored. */
2276 stored_sample = caa_container_of(node,
2277 struct channel_state_sample,
2278 channel_state_ht_node);
2279 memcpy(&previous_sample, stored_sample,
2280 sizeof(previous_sample));
2281 stored_sample->highest_usage = latest_sample.highest_usage;
2282 stored_sample->lowest_usage = latest_sample.lowest_usage;
2283 previous_sample_available = true;
2284 } else {
2285 /*
2286 * This is the channel's first sample, allocate space for and
2287 * store the new sample.
2288 */
2289 struct channel_state_sample *stored_sample;
2290
2291 stored_sample = zmalloc(sizeof(*stored_sample));
2292 if (!stored_sample) {
2293 ret = -1;
2294 goto end_unlock;
2295 }
2296
2297 memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
2298 cds_lfht_node_init(&stored_sample->channel_state_ht_node);
2299 cds_lfht_add(state->channel_state_ht,
2300 hash_channel_key(&stored_sample->key),
2301 &stored_sample->channel_state_ht_node);
2302 }
2303
2304 /* Find triggers associated with this channel. */
2305 cds_lfht_lookup(state->channel_triggers_ht,
2306 hash_channel_key(&latest_sample.key),
2307 match_channel_trigger_list,
2308 &latest_sample.key,
2309 &iter);
2310 node = cds_lfht_iter_get_node(&iter);
2311 if (!node) {
2312 goto end_unlock;
2313 }
2314
2315 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
2316 channel_triggers_ht_node);
2317 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
2318 node) {
2319 struct lttng_condition *condition;
2320 struct lttng_action *action;
2321 struct lttng_trigger *trigger;
2322 struct notification_client_list *client_list;
2323 struct lttng_evaluation *evaluation = NULL;
2324
2325 trigger = trigger_list_element->trigger;
2326 condition = lttng_trigger_get_condition(trigger);
2327 assert(condition);
2328 action = lttng_trigger_get_action(trigger);
2329
2330 /* Notify actions are the only type currently supported. */
2331 assert(lttng_action_get_type(action) ==
2332 LTTNG_ACTION_TYPE_NOTIFY);
2333
2334 /*
2335 * Check if any client is subscribed to the result of this
2336 * evaluation.
2337 */
2338 cds_lfht_lookup(state->notification_trigger_clients_ht,
2339 lttng_condition_hash(condition),
2340 match_client_list,
2341 trigger,
2342 &iter);
2343 node = cds_lfht_iter_get_node(&iter);
2344 assert(node);
2345
2346 client_list = caa_container_of(node,
2347 struct notification_client_list,
2348 notification_trigger_ht_node);
2349 if (cds_list_empty(&client_list->list)) {
2350 /*
2351 * No clients interested in the evaluation's result,
2352 * skip it.
2353 */
2354 continue;
2355 }
2356
2357 ret = evaluate_condition(condition, &evaluation, state,
2358 previous_sample_available ? &previous_sample : NULL,
2359 &latest_sample, channel_info->capacity);
2360 if (ret) {
2361 goto end_unlock;
2362 }
2363
2364 if (!evaluation) {
2365 continue;
2366 }
2367
2368 /* Dispatch evaluation result to all clients. */
2369 ret = send_evaluation_to_clients(trigger_list_element->trigger,
2370 evaluation, client_list, state,
2371 channel_info->uid, channel_info->gid);
2372 if (ret) {
2373 goto end_unlock;
2374 }
2375 }
2376 end_unlock:
2377 rcu_read_unlock();
2378 end:
2379 return ret;
2380 }
This page took 0.117598 seconds and 5 git commands to generate.