Fix: do not repurpose iterator while it is being used
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.c
1 /*
2 * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18 #define _LGPL_SOURCE
19 #include <urcu.h>
20 #include <urcu/rculfhash.h>
21
22 #include <common/defaults.h>
23 #include <common/error.h>
24 #include <common/futex.h>
25 #include <common/unix.h>
26 #include <common/dynamic-buffer.h>
27 #include <common/hashtable/utils.h>
28 #include <common/sessiond-comm/sessiond-comm.h>
29 #include <common/macros.h>
30 #include <lttng/condition/condition.h>
31 #include <lttng/action/action.h>
32 #include <lttng/notification/notification-internal.h>
33 #include <lttng/condition/condition-internal.h>
34 #include <lttng/condition/buffer-usage-internal.h>
35 #include <lttng/notification/channel-internal.h>
36
37 #include <time.h>
38 #include <unistd.h>
39 #include <assert.h>
40 #include <inttypes.h>
41 #include <fcntl.h>
42
43 #include "notification-thread.h"
44 #include "notification-thread-events.h"
45 #include "notification-thread-commands.h"
46 #include "lttng-sessiond.h"
47 #include "kernel.h"
48
49 #define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
50 #define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
51
52 struct lttng_trigger_list_element {
53 struct lttng_trigger *trigger;
54 struct cds_list_head node;
55 };
56
57 struct lttng_channel_trigger_list {
58 struct channel_key channel_key;
59 struct cds_list_head list;
60 struct cds_lfht_node channel_triggers_ht_node;
61 };
62
63 struct lttng_trigger_ht_element {
64 struct lttng_trigger *trigger;
65 struct cds_lfht_node node;
66 };
67
68 struct lttng_condition_list_element {
69 struct lttng_condition *condition;
70 struct cds_list_head node;
71 };
72
73 struct notification_client_list_element {
74 struct notification_client *client;
75 struct cds_list_head node;
76 };
77
78 struct notification_client_list {
79 struct lttng_trigger *trigger;
80 struct cds_list_head list;
81 struct cds_lfht_node notification_trigger_ht_node;
82 };
83
84 struct notification_client {
85 int socket;
86 /* Client protocol version. */
87 uint8_t major, minor;
88 uid_t uid;
89 gid_t gid;
90 /*
91 * Indicates if the credentials and versions of the client have been
92 * checked.
93 */
94 bool validated;
95 /*
96 * Conditions to which the client's notification channel is subscribed.
97 * List of struct lttng_condition_list_node. The condition member is
98 * owned by the client.
99 */
100 struct cds_list_head condition_list;
101 struct cds_lfht_node client_socket_ht_node;
102 struct {
103 struct {
104 /*
105 * During the reception of a message, the reception
106 * buffers' "size" is set to contain the current
107 * message's complete payload.
108 */
109 struct lttng_dynamic_buffer buffer;
110 /* Bytes left to receive for the current message. */
111 size_t bytes_to_receive;
112 /* Type of the message being received. */
113 enum lttng_notification_channel_message_type msg_type;
114 /*
115 * Indicates whether or not credentials are expected
116 * from the client.
117 */
118 bool expect_creds;
119 /*
120 * Indicates whether or not credentials were received
121 * from the client.
122 */
123 bool creds_received;
124 /* Only used during credentials reception. */
125 lttng_sock_cred creds;
126 } inbound;
127 struct {
128 /*
129 * Indicates whether or not a notification addressed to
130 * this client was dropped because a command reply was
131 * already buffered.
132 *
133 * A notification is dropped whenever the buffer is not
134 * empty.
135 */
136 bool dropped_notification;
137 /*
138 * Indicates whether or not a command reply is already
139 * buffered. In this case, it means that the client is
140 * not consuming command replies before emitting a new
141 * one. This could be caused by a protocol error or a
142 * misbehaving/malicious client.
143 */
144 bool queued_command_reply;
145 struct lttng_dynamic_buffer buffer;
146 } outbound;
147 } communication;
148 };
149
150 struct channel_state_sample {
151 struct channel_key key;
152 struct cds_lfht_node channel_state_ht_node;
153 uint64_t highest_usage;
154 uint64_t lowest_usage;
155 };
156
157 static unsigned long hash_channel_key(struct channel_key *key);
158 static int evaluate_condition(struct lttng_condition *condition,
159 struct lttng_evaluation **evaluation,
160 struct notification_thread_state *state,
161 struct channel_state_sample *previous_sample,
162 struct channel_state_sample *latest_sample,
163 uint64_t buffer_capacity);
164 static
165 int send_evaluation_to_clients(struct lttng_trigger *trigger,
166 struct lttng_evaluation *evaluation,
167 struct notification_client_list *client_list,
168 struct notification_thread_state *state,
169 uid_t channel_uid, gid_t channel_gid);
170
171 static
172 int match_client(struct cds_lfht_node *node, const void *key)
173 {
174 /* This double-cast is intended to supress pointer-to-cast warning. */
175 int socket = (int) (intptr_t) key;
176 struct notification_client *client;
177
178 client = caa_container_of(node, struct notification_client,
179 client_socket_ht_node);
180
181 return !!(client->socket == socket);
182 }
183
184 static
185 int match_channel_trigger_list(struct cds_lfht_node *node, const void *key)
186 {
187 struct channel_key *channel_key = (struct channel_key *) key;
188 struct lttng_channel_trigger_list *trigger_list;
189
190 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
191 channel_triggers_ht_node);
192
193 return !!((channel_key->key == trigger_list->channel_key.key) &&
194 (channel_key->domain == trigger_list->channel_key.domain));
195 }
196
197 static
198 int match_channel_state_sample(struct cds_lfht_node *node, const void *key)
199 {
200 struct channel_key *channel_key = (struct channel_key *) key;
201 struct channel_state_sample *sample;
202
203 sample = caa_container_of(node, struct channel_state_sample,
204 channel_state_ht_node);
205
206 return !!((channel_key->key == sample->key.key) &&
207 (channel_key->domain == sample->key.domain));
208 }
209
210 static
211 int match_channel_info(struct cds_lfht_node *node, const void *key)
212 {
213 struct channel_key *channel_key = (struct channel_key *) key;
214 struct channel_info *channel_info;
215
216 channel_info = caa_container_of(node, struct channel_info,
217 channels_ht_node);
218
219 return !!((channel_key->key == channel_info->key.key) &&
220 (channel_key->domain == channel_info->key.domain));
221 }
222
223 static
224 int match_condition(struct cds_lfht_node *node, const void *key)
225 {
226 struct lttng_condition *condition_key = (struct lttng_condition *) key;
227 struct lttng_trigger_ht_element *trigger;
228 struct lttng_condition *condition;
229
230 trigger = caa_container_of(node, struct lttng_trigger_ht_element,
231 node);
232 condition = lttng_trigger_get_condition(trigger->trigger);
233 assert(condition);
234
235 return !!lttng_condition_is_equal(condition_key, condition);
236 }
237
238 static
239 int match_client_list(struct cds_lfht_node *node, const void *key)
240 {
241 struct lttng_trigger *trigger_key = (struct lttng_trigger *) key;
242 struct notification_client_list *client_list;
243 struct lttng_condition *condition;
244 struct lttng_condition *condition_key = lttng_trigger_get_condition(
245 trigger_key);
246
247 assert(condition_key);
248
249 client_list = caa_container_of(node, struct notification_client_list,
250 notification_trigger_ht_node);
251 condition = lttng_trigger_get_condition(client_list->trigger);
252
253 return !!lttng_condition_is_equal(condition_key, condition);
254 }
255
256 static
257 int match_client_list_condition(struct cds_lfht_node *node, const void *key)
258 {
259 struct lttng_condition *condition_key = (struct lttng_condition *) key;
260 struct notification_client_list *client_list;
261 struct lttng_condition *condition;
262
263 assert(condition_key);
264
265 client_list = caa_container_of(node, struct notification_client_list,
266 notification_trigger_ht_node);
267 condition = lttng_trigger_get_condition(client_list->trigger);
268
269 return !!lttng_condition_is_equal(condition_key, condition);
270 }
271
272 static
273 unsigned long lttng_condition_buffer_usage_hash(
274 struct lttng_condition *_condition)
275 {
276 unsigned long hash = 0;
277 struct lttng_condition_buffer_usage *condition;
278
279 condition = container_of(_condition,
280 struct lttng_condition_buffer_usage, parent);
281
282 if (condition->session_name) {
283 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
284 }
285 if (condition->channel_name) {
286 hash ^= hash_key_str(condition->channel_name, lttng_ht_seed);
287 }
288 if (condition->domain.set) {
289 hash ^= hash_key_ulong(
290 (void *) condition->domain.type,
291 lttng_ht_seed);
292 }
293 if (condition->threshold_ratio.set) {
294 uint64_t val;
295
296 val = condition->threshold_ratio.value * (double) UINT32_MAX;
297 hash ^= hash_key_u64(&val, lttng_ht_seed);
298 } else if (condition->threshold_bytes.set) {
299 uint64_t val;
300
301 val = condition->threshold_bytes.value;
302 hash ^= hash_key_u64(&val, lttng_ht_seed);
303 }
304 return hash;
305 }
306
307 /*
308 * The lttng_condition hashing code is kept in this file (rather than
309 * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
310 * don't want to link in liblttng-ctl.
311 */
312 static
313 unsigned long lttng_condition_hash(struct lttng_condition *condition)
314 {
315 switch (condition->type) {
316 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
317 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
318 return lttng_condition_buffer_usage_hash(condition);
319 default:
320 ERR("[notification-thread] Unexpected condition type caught");
321 abort();
322 }
323 }
324
325 static
326 unsigned long hash_channel_key(struct channel_key *key)
327 {
328 unsigned long key_hash = hash_key_u64(&key->key, lttng_ht_seed);
329 unsigned long domain_hash = hash_key_ulong(
330 (void *) (unsigned long) key->domain, lttng_ht_seed);
331
332 return key_hash ^ domain_hash;
333 }
334
335 static
336 void channel_info_destroy(struct channel_info *channel_info)
337 {
338 if (!channel_info) {
339 return;
340 }
341
342 if (channel_info->session_name) {
343 free(channel_info->session_name);
344 }
345 if (channel_info->channel_name) {
346 free(channel_info->channel_name);
347 }
348 free(channel_info);
349 }
350
351 static
352 struct channel_info *channel_info_copy(struct channel_info *channel_info)
353 {
354 struct channel_info *copy = zmalloc(sizeof(*channel_info));
355
356 assert(channel_info);
357 assert(channel_info->session_name);
358 assert(channel_info->channel_name);
359
360 if (!copy) {
361 goto end;
362 }
363
364 memcpy(copy, channel_info, sizeof(*channel_info));
365 copy->session_name = NULL;
366 copy->channel_name = NULL;
367
368 copy->session_name = strdup(channel_info->session_name);
369 if (!copy->session_name) {
370 goto error;
371 }
372 copy->channel_name = strdup(channel_info->channel_name);
373 if (!copy->channel_name) {
374 goto error;
375 }
376 cds_lfht_node_init(&channel_info->channels_ht_node);
377 end:
378 return copy;
379 error:
380 channel_info_destroy(copy);
381 return NULL;
382 }
383
384 /* This function must be called with the RCU read lock held. */
385 static
386 int evaluate_condition_for_client(struct lttng_trigger *trigger,
387 struct lttng_condition *condition,
388 struct notification_client *client,
389 struct notification_thread_state *state)
390 {
391 int ret;
392 struct cds_lfht_iter iter;
393 struct cds_lfht_node *node;
394 struct channel_info *channel_info = NULL;
395 struct channel_key *channel_key = NULL;
396 struct channel_state_sample *last_sample = NULL;
397 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
398 struct lttng_evaluation *evaluation = NULL;
399 struct notification_client_list client_list = { 0 };
400 struct notification_client_list_element client_list_element = { 0 };
401
402 assert(trigger);
403 assert(condition);
404 assert(client);
405 assert(state);
406
407 /* Find the channel associated with the trigger. */
408 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter,
409 channel_trigger_list , channel_triggers_ht_node) {
410 struct lttng_trigger_list_element *element;
411
412 cds_list_for_each_entry(element, &channel_trigger_list->list, node) {
413 struct lttng_condition *current_condition =
414 lttng_trigger_get_condition(
415 element->trigger);
416
417 assert(current_condition);
418 if (!lttng_condition_is_equal(condition,
419 current_condition)) {
420 continue;
421 }
422
423 /* Found the trigger, save the channel key. */
424 channel_key = &channel_trigger_list->channel_key;
425 break;
426 }
427 if (channel_key) {
428 /* The channel key was found stop iteration. */
429 break;
430 }
431 }
432
433 if (!channel_key){
434 /* No channel found; normal exit. */
435 DBG("[notification-thread] No channel associated with newly subscribed-to condition");
436 ret = 0;
437 goto end;
438 }
439
440 /* Fetch channel info for the matching channel. */
441 cds_lfht_lookup(state->channels_ht,
442 hash_channel_key(channel_key),
443 match_channel_info,
444 channel_key,
445 &iter);
446 node = cds_lfht_iter_get_node(&iter);
447 assert(node);
448 channel_info = caa_container_of(node, struct channel_info,
449 channels_ht_node);
450
451 /* Retrieve the channel's last sample, if it exists. */
452 cds_lfht_lookup(state->channel_state_ht,
453 hash_channel_key(channel_key),
454 match_channel_state_sample,
455 channel_key,
456 &iter);
457 node = cds_lfht_iter_get_node(&iter);
458 if (node) {
459 last_sample = caa_container_of(node,
460 struct channel_state_sample,
461 channel_state_ht_node);
462 } else {
463 /* Nothing to evaluate, no sample was ever taken. Normal exit */
464 DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
465 ret = 0;
466 goto end;
467 }
468
469 ret = evaluate_condition(condition, &evaluation, state, NULL,
470 last_sample, channel_info->capacity);
471 if (ret) {
472 WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
473 goto end;
474 }
475
476 if (!evaluation) {
477 /* Evaluation yielded nothing. Normal exit. */
478 DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
479 ret = 0;
480 goto end;
481 }
482
483 /*
484 * Create a temporary client list with the client currently
485 * subscribing.
486 */
487 cds_lfht_node_init(&client_list.notification_trigger_ht_node);
488 CDS_INIT_LIST_HEAD(&client_list.list);
489 client_list.trigger = trigger;
490
491 CDS_INIT_LIST_HEAD(&client_list_element.node);
492 client_list_element.client = client;
493 cds_list_add(&client_list_element.node, &client_list.list);
494
495 /* Send evaluation result to the newly-subscribed client. */
496 DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
497 ret = send_evaluation_to_clients(trigger, evaluation, &client_list,
498 state, channel_info->uid, channel_info->gid);
499
500 end:
501 return ret;
502 }
503
504 static
505 int notification_thread_client_subscribe(struct notification_client *client,
506 struct lttng_condition *condition,
507 struct notification_thread_state *state,
508 enum lttng_notification_channel_status *_status)
509 {
510 int ret = 0;
511 struct cds_lfht_iter iter;
512 struct cds_lfht_node *node;
513 struct notification_client_list *client_list;
514 struct lttng_condition_list_element *condition_list_element = NULL;
515 struct notification_client_list_element *client_list_element = NULL;
516 enum lttng_notification_channel_status status =
517 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
518
519 /*
520 * Ensure that the client has not already subscribed to this condition
521 * before.
522 */
523 cds_list_for_each_entry(condition_list_element, &client->condition_list, node) {
524 if (lttng_condition_is_equal(condition_list_element->condition,
525 condition)) {
526 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED;
527 goto end;
528 }
529 }
530
531 condition_list_element = zmalloc(sizeof(*condition_list_element));
532 if (!condition_list_element) {
533 ret = -1;
534 goto error;
535 }
536 client_list_element = zmalloc(sizeof(*client_list_element));
537 if (!client_list_element) {
538 ret = -1;
539 goto error;
540 }
541
542 rcu_read_lock();
543
544 /*
545 * Add the newly-subscribed condition to the client's subscription list.
546 */
547 CDS_INIT_LIST_HEAD(&condition_list_element->node);
548 condition_list_element->condition = condition;
549 cds_list_add(&condition_list_element->node, &client->condition_list);
550
551 cds_lfht_lookup(state->notification_trigger_clients_ht,
552 lttng_condition_hash(condition),
553 match_client_list_condition,
554 condition,
555 &iter);
556 node = cds_lfht_iter_get_node(&iter);
557 if (!node) {
558 free(client_list_element);
559 goto end_unlock;
560 }
561
562 client_list = caa_container_of(node, struct notification_client_list,
563 notification_trigger_ht_node);
564 if (evaluate_condition_for_client(client_list->trigger, condition,
565 client, state)) {
566 WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
567 ret = -1;
568 free(client_list_element);
569 goto end_unlock;
570 }
571
572 /*
573 * Add the client to the list of clients interested in a given trigger
574 * if a "notification" trigger with a corresponding condition was
575 * added prior.
576 */
577 client_list_element->client = client;
578 CDS_INIT_LIST_HEAD(&client_list_element->node);
579 cds_list_add(&client_list_element->node, &client_list->list);
580 end_unlock:
581 rcu_read_unlock();
582 end:
583 if (_status) {
584 *_status = status;
585 }
586 return ret;
587 error:
588 free(condition_list_element);
589 free(client_list_element);
590 return ret;
591 }
592
593 static
594 int notification_thread_client_unsubscribe(
595 struct notification_client *client,
596 struct lttng_condition *condition,
597 struct notification_thread_state *state,
598 enum lttng_notification_channel_status *_status)
599 {
600 struct cds_lfht_iter iter;
601 struct cds_lfht_node *node;
602 struct notification_client_list *client_list;
603 struct lttng_condition_list_element *condition_list_element,
604 *condition_tmp;
605 struct notification_client_list_element *client_list_element,
606 *client_tmp;
607 bool condition_found = false;
608 enum lttng_notification_channel_status status =
609 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
610
611 /* Remove the condition from the client's condition list. */
612 cds_list_for_each_entry_safe(condition_list_element, condition_tmp,
613 &client->condition_list, node) {
614 if (!lttng_condition_is_equal(condition_list_element->condition,
615 condition)) {
616 continue;
617 }
618
619 cds_list_del(&condition_list_element->node);
620 /*
621 * The caller may be iterating on the client's conditions to
622 * tear down a client's connection. In this case, the condition
623 * will be destroyed at the end.
624 */
625 if (condition != condition_list_element->condition) {
626 lttng_condition_destroy(
627 condition_list_element->condition);
628 }
629 free(condition_list_element);
630 condition_found = true;
631 break;
632 }
633
634 if (!condition_found) {
635 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION;
636 goto end;
637 }
638
639 /*
640 * Remove the client from the list of clients interested the trigger
641 * matching the condition.
642 */
643 rcu_read_lock();
644 cds_lfht_lookup(state->notification_trigger_clients_ht,
645 lttng_condition_hash(condition),
646 match_client_list_condition,
647 condition,
648 &iter);
649 node = cds_lfht_iter_get_node(&iter);
650 if (!node) {
651 goto end_unlock;
652 }
653
654 client_list = caa_container_of(node, struct notification_client_list,
655 notification_trigger_ht_node);
656 cds_list_for_each_entry_safe(client_list_element, client_tmp,
657 &client_list->list, node) {
658 if (client_list_element->client->socket != client->socket) {
659 continue;
660 }
661 cds_list_del(&client_list_element->node);
662 free(client_list_element);
663 break;
664 }
665 end_unlock:
666 rcu_read_unlock();
667 end:
668 lttng_condition_destroy(condition);
669 if (_status) {
670 *_status = status;
671 }
672 return 0;
673 }
674
675 static
676 void notification_client_destroy(struct notification_client *client,
677 struct notification_thread_state *state)
678 {
679 struct lttng_condition_list_element *condition_list_element, *tmp;
680
681 if (!client) {
682 return;
683 }
684
685 /* Release all conditions to which the client was subscribed. */
686 cds_list_for_each_entry_safe(condition_list_element, tmp,
687 &client->condition_list, node) {
688 (void) notification_thread_client_unsubscribe(client,
689 condition_list_element->condition, state, NULL);
690 }
691
692 if (client->socket >= 0) {
693 (void) lttcomm_close_unix_sock(client->socket);
694 }
695 lttng_dynamic_buffer_reset(&client->communication.inbound.buffer);
696 lttng_dynamic_buffer_reset(&client->communication.outbound.buffer);
697 free(client);
698 }
699
700 /*
701 * Call with rcu_read_lock held (and hold for the lifetime of the returned
702 * client pointer).
703 */
704 static
705 struct notification_client *get_client_from_socket(int socket,
706 struct notification_thread_state *state)
707 {
708 struct cds_lfht_iter iter;
709 struct cds_lfht_node *node;
710 struct notification_client *client = NULL;
711
712 cds_lfht_lookup(state->client_socket_ht,
713 hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed),
714 match_client,
715 (void *) (unsigned long) socket,
716 &iter);
717 node = cds_lfht_iter_get_node(&iter);
718 if (!node) {
719 goto end;
720 }
721
722 client = caa_container_of(node, struct notification_client,
723 client_socket_ht_node);
724 end:
725 return client;
726 }
727
728 static
729 bool trigger_applies_to_channel(struct lttng_trigger *trigger,
730 struct channel_info *info)
731 {
732 enum lttng_condition_status status;
733 struct lttng_condition *condition;
734 const char *trigger_session_name = NULL;
735 const char *trigger_channel_name = NULL;
736 enum lttng_domain_type trigger_domain;
737
738 condition = lttng_trigger_get_condition(trigger);
739 if (!condition) {
740 goto fail;
741 }
742
743 switch (lttng_condition_get_type(condition)) {
744 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
745 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
746 break;
747 default:
748 goto fail;
749 }
750
751 status = lttng_condition_buffer_usage_get_domain_type(condition,
752 &trigger_domain);
753 assert(status == LTTNG_CONDITION_STATUS_OK);
754 if (info->key.domain != trigger_domain) {
755 goto fail;
756 }
757
758 status = lttng_condition_buffer_usage_get_session_name(
759 condition, &trigger_session_name);
760 assert((status == LTTNG_CONDITION_STATUS_OK) && trigger_session_name);
761
762 status = lttng_condition_buffer_usage_get_channel_name(
763 condition, &trigger_channel_name);
764 assert((status == LTTNG_CONDITION_STATUS_OK) && trigger_channel_name);
765
766 if (strcmp(info->session_name, trigger_session_name)) {
767 goto fail;
768 }
769 if (strcmp(info->channel_name, trigger_channel_name)) {
770 goto fail;
771 }
772
773 return true;
774 fail:
775 return false;
776 }
777
778 static
779 bool trigger_applies_to_client(struct lttng_trigger *trigger,
780 struct notification_client *client)
781 {
782 bool applies = false;
783 struct lttng_condition_list_element *condition_list_element;
784
785 cds_list_for_each_entry(condition_list_element, &client->condition_list,
786 node) {
787 applies = lttng_condition_is_equal(
788 condition_list_element->condition,
789 lttng_trigger_get_condition(trigger));
790 if (applies) {
791 break;
792 }
793 }
794 return applies;
795 }
796
797 static
798 int handle_notification_thread_command_add_channel(
799 struct notification_thread_state *state,
800 struct channel_info *channel_info,
801 enum lttng_error_code *cmd_result)
802 {
803 struct cds_list_head trigger_list;
804 struct channel_info *new_channel_info;
805 struct channel_key *channel_key;
806 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
807 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
808 int trigger_count = 0;
809 struct cds_lfht_iter iter;
810
811 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
812 channel_info->channel_name, channel_info->session_name,
813 channel_info->key.key, channel_info->key.domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
814
815 CDS_INIT_LIST_HEAD(&trigger_list);
816
817 new_channel_info = channel_info_copy(channel_info);
818 if (!new_channel_info) {
819 goto error;
820 }
821
822 channel_key = &new_channel_info->key;
823
824 /* Build a list of all triggers applying to the new channel. */
825 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
826 node) {
827 struct lttng_trigger_list_element *new_element;
828
829 if (!trigger_applies_to_channel(trigger_ht_element->trigger,
830 channel_info)) {
831 continue;
832 }
833
834 new_element = zmalloc(sizeof(*new_element));
835 if (!new_element) {
836 goto error;
837 }
838 CDS_INIT_LIST_HEAD(&new_element->node);
839 new_element->trigger = trigger_ht_element->trigger;
840 cds_list_add(&new_element->node, &trigger_list);
841 trigger_count++;
842 }
843
844 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
845 trigger_count);
846 channel_trigger_list = zmalloc(sizeof(*channel_trigger_list));
847 if (!channel_trigger_list) {
848 goto error;
849 }
850 channel_trigger_list->channel_key = *channel_key;
851 CDS_INIT_LIST_HEAD(&channel_trigger_list->list);
852 cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
853 cds_list_splice(&trigger_list, &channel_trigger_list->list);
854
855 rcu_read_lock();
856 /* Add channel to the channel_ht which owns the channel_infos. */
857 cds_lfht_add(state->channels_ht,
858 hash_channel_key(channel_key),
859 &new_channel_info->channels_ht_node);
860 /*
861 * Add the list of triggers associated with this channel to the
862 * channel_triggers_ht.
863 */
864 cds_lfht_add(state->channel_triggers_ht,
865 hash_channel_key(channel_key),
866 &channel_trigger_list->channel_triggers_ht_node);
867 rcu_read_unlock();
868 *cmd_result = LTTNG_OK;
869 return 0;
870 error:
871 /* Empty trigger list */
872 channel_info_destroy(new_channel_info);
873 return 1;
874 }
875
876 static
877 int handle_notification_thread_command_remove_channel(
878 struct notification_thread_state *state,
879 uint64_t channel_key, enum lttng_domain_type domain,
880 enum lttng_error_code *cmd_result)
881 {
882 struct cds_lfht_node *node;
883 struct cds_lfht_iter iter;
884 struct lttng_channel_trigger_list *trigger_list;
885 struct lttng_trigger_list_element *trigger_list_element, *tmp;
886 struct channel_key key = { .key = channel_key, .domain = domain };
887 struct channel_info *channel_info;
888
889 DBG("[notification-thread] Removing channel key = %" PRIu64 " in %s domain",
890 channel_key, domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
891
892 rcu_read_lock();
893
894 cds_lfht_lookup(state->channel_triggers_ht,
895 hash_channel_key(&key),
896 match_channel_trigger_list,
897 &key,
898 &iter);
899 node = cds_lfht_iter_get_node(&iter);
900 /*
901 * There is a severe internal error if we are being asked to remove a
902 * channel that doesn't exist.
903 */
904 if (!node) {
905 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
906 goto end;
907 }
908
909 /* Free the list of triggers associated with this channel. */
910 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
911 channel_triggers_ht_node);
912 cds_list_for_each_entry_safe(trigger_list_element, tmp,
913 &trigger_list->list, node) {
914 cds_list_del(&trigger_list_element->node);
915 free(trigger_list_element);
916 }
917 cds_lfht_del(state->channel_triggers_ht, node);
918 free(trigger_list);
919
920 /* Free sampled channel state. */
921 cds_lfht_lookup(state->channel_state_ht,
922 hash_channel_key(&key),
923 match_channel_state_sample,
924 &key,
925 &iter);
926 node = cds_lfht_iter_get_node(&iter);
927 /*
928 * This is expected to be NULL if the channel is destroyed before we
929 * received a sample.
930 */
931 if (node) {
932 struct channel_state_sample *sample = caa_container_of(node,
933 struct channel_state_sample,
934 channel_state_ht_node);
935
936 cds_lfht_del(state->channel_state_ht, node);
937 free(sample);
938 }
939
940 /* Remove the channel from the channels_ht and free it. */
941 cds_lfht_lookup(state->channels_ht,
942 hash_channel_key(&key),
943 match_channel_info,
944 &key,
945 &iter);
946 node = cds_lfht_iter_get_node(&iter);
947 assert(node);
948 channel_info = caa_container_of(node, struct channel_info,
949 channels_ht_node);
950 cds_lfht_del(state->channels_ht, node);
951 channel_info_destroy(channel_info);
952 end:
953 rcu_read_unlock();
954 *cmd_result = LTTNG_OK;
955 return 0;
956 }
957
958 static
959 int condition_is_supported(struct lttng_condition *condition)
960 {
961 int ret;
962
963 switch (lttng_condition_get_type(condition)) {
964 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
965 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
966 {
967 enum lttng_domain_type domain;
968
969 ret = lttng_condition_buffer_usage_get_domain_type(condition,
970 &domain);
971 if (ret) {
972 ret = -1;
973 goto end;
974 }
975
976 if (domain != LTTNG_DOMAIN_KERNEL) {
977 ret = 1;
978 goto end;
979 }
980
981 /*
982 * Older kernel tracers don't expose the API to monitor their
983 * buffers. Therefore, we reject triggers that require that
984 * mechanism to be available to be evaluated.
985 */
986 ret = kernel_supports_ring_buffer_snapshot_sample_positions(
987 kernel_tracer_fd);
988 break;
989 }
990 default:
991 ret = 1;
992 }
993 end:
994 return ret;
995 }
996
997 /*
998 * FIXME A client's credentials are not checked when registering a trigger, nor
999 * are they stored alongside with the trigger.
1000 *
1001 * The effects of this are benign since:
1002 * - The client will succeed in registering the trigger, as it is valid,
1003 * - The trigger will, internally, be bound to the channel,
1004 * - The notifications will not be sent since the client's credentials
1005 * are checked against the channel at that moment.
1006 *
1007 * If this function returns a non-zero value, it means something is
1008 * fundamentally and the whole subsystem/thread will be torn down.
1009 *
1010 * If a non-fatal error occurs, just set the cmd_result to the appropriate
1011 * error code.
1012 */
1013 static
1014 int handle_notification_thread_command_register_trigger(
1015 struct notification_thread_state *state,
1016 struct lttng_trigger *trigger,
1017 enum lttng_error_code *cmd_result)
1018 {
1019 int ret = 0;
1020 struct lttng_condition *condition;
1021 struct notification_client *client;
1022 struct notification_client_list *client_list = NULL;
1023 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1024 struct notification_client_list_element *client_list_element, *tmp;
1025 struct cds_lfht_node *node;
1026 struct cds_lfht_iter iter;
1027 struct channel_info *channel;
1028 bool free_trigger = true;
1029
1030 rcu_read_lock();
1031
1032 condition = lttng_trigger_get_condition(trigger);
1033 assert(condition);
1034
1035 ret = condition_is_supported(condition);
1036 if (ret < 0) {
1037 goto error;
1038 } else if (ret == 0) {
1039 *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
1040 goto error;
1041 } else {
1042 /* Feature is supported, continue. */
1043 ret = 0;
1044 }
1045
1046 trigger_ht_element = zmalloc(sizeof(*trigger_ht_element));
1047 if (!trigger_ht_element) {
1048 ret = -1;
1049 goto error;
1050 }
1051
1052 /* Add trigger to the trigger_ht. */
1053 cds_lfht_node_init(&trigger_ht_element->node);
1054 trigger_ht_element->trigger = trigger;
1055
1056 node = cds_lfht_add_unique(state->triggers_ht,
1057 lttng_condition_hash(condition),
1058 match_condition,
1059 condition,
1060 &trigger_ht_element->node);
1061 if (node != &trigger_ht_element->node) {
1062 /* Not a fatal error, simply report it to the client. */
1063 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
1064 goto error_free_ht_element;
1065 }
1066
1067 /*
1068 * Ownership of the trigger and of its wrapper was transfered to
1069 * the triggers_ht.
1070 */
1071 trigger_ht_element = NULL;
1072 free_trigger = false;
1073
1074 /*
1075 * The rest only applies to triggers that have a "notify" action.
1076 * It is not skipped as this is the only action type currently
1077 * supported.
1078 */
1079 client_list = zmalloc(sizeof(*client_list));
1080 if (!client_list) {
1081 ret = -1;
1082 goto error_free_ht_element;
1083 }
1084 cds_lfht_node_init(&client_list->notification_trigger_ht_node);
1085 CDS_INIT_LIST_HEAD(&client_list->list);
1086 client_list->trigger = trigger;
1087
1088 /* Build a list of clients to which this new trigger applies. */
1089 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
1090 client_socket_ht_node) {
1091 if (!trigger_applies_to_client(trigger, client)) {
1092 continue;
1093 }
1094
1095 client_list_element = zmalloc(sizeof(*client_list_element));
1096 if (!client_list_element) {
1097 ret = -1;
1098 goto error_free_client_list;
1099 }
1100 CDS_INIT_LIST_HEAD(&client_list_element->node);
1101 client_list_element->client = client;
1102 cds_list_add(&client_list_element->node, &client_list->list);
1103 }
1104
1105 cds_lfht_add(state->notification_trigger_clients_ht,
1106 lttng_condition_hash(condition),
1107 &client_list->notification_trigger_ht_node);
1108 /*
1109 * Client list ownership transferred to the
1110 * notification_trigger_clients_ht.
1111 */
1112 client_list = NULL;
1113
1114 /*
1115 * Add the trigger to list of triggers bound to the channels currently
1116 * known.
1117 */
1118 cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
1119 channels_ht_node) {
1120 struct lttng_trigger_list_element *trigger_list_element;
1121 struct lttng_channel_trigger_list *trigger_list;
1122 struct cds_lfht_iter lookup_iter;
1123
1124 if (!trigger_applies_to_channel(trigger, channel)) {
1125 continue;
1126 }
1127
1128 cds_lfht_lookup(state->channel_triggers_ht,
1129 hash_channel_key(&channel->key),
1130 match_channel_trigger_list,
1131 &channel->key,
1132 &lookup_iter);
1133 node = cds_lfht_iter_get_node(&lookup_iter);
1134 assert(node);
1135 trigger_list = caa_container_of(node,
1136 struct lttng_channel_trigger_list,
1137 channel_triggers_ht_node);
1138
1139 trigger_list_element = zmalloc(sizeof(*trigger_list_element));
1140 if (!trigger_list_element) {
1141 ret = -1;
1142 goto error_free_client_list;
1143 }
1144 CDS_INIT_LIST_HEAD(&trigger_list_element->node);
1145 trigger_list_element->trigger = trigger;
1146 cds_list_add(&trigger_list_element->node, &trigger_list->list);
1147
1148 /* A trigger can only apply to one channel. */
1149 break;
1150 }
1151
1152 *cmd_result = LTTNG_OK;
1153 error_free_client_list:
1154 if (client_list) {
1155 cds_list_for_each_entry_safe(client_list_element, tmp,
1156 &client_list->list, node) {
1157 free(client_list_element);
1158 }
1159 free(client_list);
1160 }
1161 error_free_ht_element:
1162 free(trigger_ht_element);
1163 error:
1164 if (free_trigger) {
1165 struct lttng_action *action = lttng_trigger_get_action(trigger);
1166
1167 lttng_condition_destroy(condition);
1168 lttng_action_destroy(action);
1169 lttng_trigger_destroy(trigger);
1170 }
1171 rcu_read_unlock();
1172 return ret;
1173 }
1174
1175 static
1176 int handle_notification_thread_command_unregister_trigger(
1177 struct notification_thread_state *state,
1178 struct lttng_trigger *trigger,
1179 enum lttng_error_code *_cmd_reply)
1180 {
1181 struct cds_lfht_iter iter;
1182 struct cds_lfht_node *node, *triggers_ht_node;
1183 struct lttng_channel_trigger_list *trigger_list;
1184 struct notification_client_list *client_list;
1185 struct notification_client_list_element *client_list_element, *tmp;
1186 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1187 struct lttng_condition *condition = lttng_trigger_get_condition(
1188 trigger);
1189 struct lttng_action *action;
1190 enum lttng_error_code cmd_reply;
1191
1192 rcu_read_lock();
1193
1194 cds_lfht_lookup(state->triggers_ht,
1195 lttng_condition_hash(condition),
1196 match_condition,
1197 condition,
1198 &iter);
1199 triggers_ht_node = cds_lfht_iter_get_node(&iter);
1200 if (!triggers_ht_node) {
1201 cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND;
1202 goto end;
1203 } else {
1204 cmd_reply = LTTNG_OK;
1205 }
1206
1207 /* Remove trigger from channel_triggers_ht. */
1208 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
1209 channel_triggers_ht_node) {
1210 struct lttng_trigger_list_element *trigger_element, *tmp;
1211
1212 cds_list_for_each_entry_safe(trigger_element, tmp,
1213 &trigger_list->list, node) {
1214 struct lttng_condition *current_condition =
1215 lttng_trigger_get_condition(
1216 trigger_element->trigger);
1217
1218 assert(current_condition);
1219 if (!lttng_condition_is_equal(condition,
1220 current_condition)) {
1221 continue;
1222 }
1223
1224 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
1225 cds_list_del(&trigger_element->node);
1226 /* A trigger can only appear once per channel */
1227 break;
1228 }
1229 }
1230
1231 /*
1232 * Remove and release the client list from
1233 * notification_trigger_clients_ht.
1234 */
1235 cds_lfht_lookup(state->notification_trigger_clients_ht,
1236 lttng_condition_hash(condition),
1237 match_client_list,
1238 trigger,
1239 &iter);
1240 node = cds_lfht_iter_get_node(&iter);
1241 assert(node);
1242 client_list = caa_container_of(node, struct notification_client_list,
1243 notification_trigger_ht_node);
1244 cds_list_for_each_entry_safe(client_list_element, tmp,
1245 &client_list->list, node) {
1246 free(client_list_element);
1247 }
1248 cds_lfht_del(state->notification_trigger_clients_ht, node);
1249 free(client_list);
1250
1251 /* Remove trigger from triggers_ht. */
1252 trigger_ht_element = caa_container_of(triggers_ht_node,
1253 struct lttng_trigger_ht_element, node);
1254 cds_lfht_del(state->triggers_ht, triggers_ht_node);
1255
1256 condition = lttng_trigger_get_condition(trigger_ht_element->trigger);
1257 lttng_condition_destroy(condition);
1258 action = lttng_trigger_get_action(trigger_ht_element->trigger);
1259 lttng_action_destroy(action);
1260 lttng_trigger_destroy(trigger_ht_element->trigger);
1261 free(trigger_ht_element);
1262 end:
1263 rcu_read_unlock();
1264 if (_cmd_reply) {
1265 *_cmd_reply = cmd_reply;
1266 }
1267 return 0;
1268 }
1269
1270 /* Returns 0 on success, 1 on exit requested, negative value on error. */
1271 int handle_notification_thread_command(
1272 struct notification_thread_handle *handle,
1273 struct notification_thread_state *state)
1274 {
1275 int ret;
1276 uint64_t counter;
1277 struct notification_thread_command *cmd;
1278
1279 /* Read event_fd to put it back into a quiescent state. */
1280 ret = read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter, sizeof(counter));
1281 if (ret == -1) {
1282 goto error;
1283 }
1284
1285 pthread_mutex_lock(&handle->cmd_queue.lock);
1286 cmd = cds_list_first_entry(&handle->cmd_queue.list,
1287 struct notification_thread_command, cmd_list_node);
1288 switch (cmd->type) {
1289 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
1290 DBG("[notification-thread] Received register trigger command");
1291 ret = handle_notification_thread_command_register_trigger(
1292 state, cmd->parameters.trigger,
1293 &cmd->reply_code);
1294 break;
1295 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
1296 DBG("[notification-thread] Received unregister trigger command");
1297 ret = handle_notification_thread_command_unregister_trigger(
1298 state, cmd->parameters.trigger,
1299 &cmd->reply_code);
1300 break;
1301 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
1302 DBG("[notification-thread] Received add channel command");
1303 ret = handle_notification_thread_command_add_channel(
1304 state, &cmd->parameters.add_channel,
1305 &cmd->reply_code);
1306 break;
1307 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
1308 DBG("[notification-thread] Received remove channel command");
1309 ret = handle_notification_thread_command_remove_channel(
1310 state, cmd->parameters.remove_channel.key,
1311 cmd->parameters.remove_channel.domain,
1312 &cmd->reply_code);
1313 break;
1314 case NOTIFICATION_COMMAND_TYPE_QUIT:
1315 DBG("[notification-thread] Received quit command");
1316 cmd->reply_code = LTTNG_OK;
1317 ret = 1;
1318 goto end;
1319 default:
1320 ERR("[notification-thread] Unknown internal command received");
1321 goto error_unlock;
1322 }
1323
1324 if (ret) {
1325 goto error_unlock;
1326 }
1327 end:
1328 cds_list_del(&cmd->cmd_list_node);
1329 lttng_waiter_wake_up(&cmd->reply_waiter);
1330 pthread_mutex_unlock(&handle->cmd_queue.lock);
1331 return ret;
1332 error_unlock:
1333 /* Wake-up and return a fatal error to the calling thread. */
1334 lttng_waiter_wake_up(&cmd->reply_waiter);
1335 pthread_mutex_unlock(&handle->cmd_queue.lock);
1336 cmd->reply_code = LTTNG_ERR_FATAL;
1337 error:
1338 /* Indicate a fatal error to the caller. */
1339 return -1;
1340 }
1341
1342 static
1343 unsigned long hash_client_socket(int socket)
1344 {
1345 return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed);
1346 }
1347
1348 static
1349 int socket_set_non_blocking(int socket)
1350 {
1351 int ret, flags;
1352
1353 /* Set the pipe as non-blocking. */
1354 ret = fcntl(socket, F_GETFL, 0);
1355 if (ret == -1) {
1356 PERROR("fcntl get socket flags");
1357 goto end;
1358 }
1359 flags = ret;
1360
1361 ret = fcntl(socket, F_SETFL, flags | O_NONBLOCK);
1362 if (ret == -1) {
1363 PERROR("fcntl set O_NONBLOCK socket flag");
1364 goto end;
1365 }
1366 DBG("Client socket (fd = %i) set as non-blocking", socket);
1367 end:
1368 return ret;
1369 }
1370
1371 static
1372 int client_reset_inbound_state(struct notification_client *client)
1373 {
1374 int ret;
1375
1376 ret = lttng_dynamic_buffer_set_size(
1377 &client->communication.inbound.buffer, 0);
1378 assert(!ret);
1379
1380 client->communication.inbound.bytes_to_receive =
1381 sizeof(struct lttng_notification_channel_message);
1382 client->communication.inbound.msg_type =
1383 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN;
1384 LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1);
1385 LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1);
1386 ret = lttng_dynamic_buffer_set_size(
1387 &client->communication.inbound.buffer,
1388 client->communication.inbound.bytes_to_receive);
1389 return ret;
1390 }
1391
1392 int handle_notification_thread_client_connect(
1393 struct notification_thread_state *state)
1394 {
1395 int ret;
1396 struct notification_client *client;
1397
1398 DBG("[notification-thread] Handling new notification channel client connection");
1399
1400 client = zmalloc(sizeof(*client));
1401 if (!client) {
1402 /* Fatal error. */
1403 ret = -1;
1404 goto error;
1405 }
1406 CDS_INIT_LIST_HEAD(&client->condition_list);
1407 lttng_dynamic_buffer_init(&client->communication.inbound.buffer);
1408 lttng_dynamic_buffer_init(&client->communication.outbound.buffer);
1409 client->communication.inbound.expect_creds = true;
1410 ret = client_reset_inbound_state(client);
1411 if (ret) {
1412 ERR("[notification-thread] Failed to reset client communication's inbound state");
1413 ret = 0;
1414 goto error;
1415 }
1416
1417 ret = lttcomm_accept_unix_sock(state->notification_channel_socket);
1418 if (ret < 0) {
1419 ERR("[notification-thread] Failed to accept new notification channel client connection");
1420 ret = 0;
1421 goto error;
1422 }
1423
1424 client->socket = ret;
1425
1426 ret = socket_set_non_blocking(client->socket);
1427 if (ret) {
1428 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
1429 goto error;
1430 }
1431
1432 ret = lttcomm_setsockopt_creds_unix_sock(client->socket);
1433 if (ret < 0) {
1434 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
1435 ret = 0;
1436 goto error;
1437 }
1438
1439 ret = lttng_poll_add(&state->events, client->socket,
1440 LPOLLIN | LPOLLERR |
1441 LPOLLHUP | LPOLLRDHUP);
1442 if (ret < 0) {
1443 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
1444 ret = 0;
1445 goto error;
1446 }
1447 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
1448 client->socket);
1449
1450 rcu_read_lock();
1451 cds_lfht_add(state->client_socket_ht,
1452 hash_client_socket(client->socket),
1453 &client->client_socket_ht_node);
1454 rcu_read_unlock();
1455
1456 return ret;
1457 error:
1458 notification_client_destroy(client, state);
1459 return ret;
1460 }
1461
1462 int handle_notification_thread_client_disconnect(
1463 int client_socket,
1464 struct notification_thread_state *state)
1465 {
1466 int ret = 0;
1467 struct notification_client *client;
1468
1469 rcu_read_lock();
1470 DBG("[notification-thread] Closing client connection (socket fd = %i)",
1471 client_socket);
1472 client = get_client_from_socket(client_socket, state);
1473 if (!client) {
1474 /* Internal state corruption, fatal error. */
1475 ERR("[notification-thread] Unable to find client (socket fd = %i)",
1476 client_socket);
1477 ret = -1;
1478 goto end;
1479 }
1480
1481 ret = lttng_poll_del(&state->events, client_socket);
1482 if (ret) {
1483 ERR("[notification-thread] Failed to remove client socket from poll set");
1484 }
1485 cds_lfht_del(state->client_socket_ht,
1486 &client->client_socket_ht_node);
1487 notification_client_destroy(client, state);
1488 end:
1489 rcu_read_unlock();
1490 return ret;
1491 }
1492
1493 int handle_notification_thread_client_disconnect_all(
1494 struct notification_thread_state *state)
1495 {
1496 struct cds_lfht_iter iter;
1497 struct notification_client *client;
1498 bool error_encoutered = false;
1499
1500 rcu_read_lock();
1501 DBG("[notification-thread] Closing all client connections");
1502 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
1503 client_socket_ht_node) {
1504 int ret;
1505
1506 ret = handle_notification_thread_client_disconnect(
1507 client->socket, state);
1508 if (ret) {
1509 error_encoutered = true;
1510 }
1511 }
1512 rcu_read_unlock();
1513 return error_encoutered ? 1 : 0;
1514 }
1515
1516 int handle_notification_thread_trigger_unregister_all(
1517 struct notification_thread_state *state)
1518 {
1519 bool error_occurred = false;
1520 struct cds_lfht_iter iter;
1521 struct lttng_trigger_ht_element *trigger_ht_element;
1522
1523 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1524 node) {
1525 int ret = handle_notification_thread_command_unregister_trigger(
1526 state, trigger_ht_element->trigger, NULL);
1527 if (ret) {
1528 error_occurred = true;
1529 }
1530 }
1531 return error_occurred ? -1 : 0;
1532 }
1533
1534 static
1535 int client_flush_outgoing_queue(struct notification_client *client,
1536 struct notification_thread_state *state)
1537 {
1538 ssize_t ret;
1539 size_t to_send_count;
1540
1541 assert(client->communication.outbound.buffer.size != 0);
1542 to_send_count = client->communication.outbound.buffer.size;
1543 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
1544 client->socket);
1545
1546 ret = lttcomm_send_unix_sock_non_block(client->socket,
1547 client->communication.outbound.buffer.data,
1548 to_send_count);
1549 if ((ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) ||
1550 (ret > 0 && ret < to_send_count)) {
1551 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
1552 client->socket);
1553 to_send_count -= max(ret, 0);
1554
1555 memcpy(client->communication.outbound.buffer.data,
1556 client->communication.outbound.buffer.data +
1557 client->communication.outbound.buffer.size - to_send_count,
1558 to_send_count);
1559 ret = lttng_dynamic_buffer_set_size(
1560 &client->communication.outbound.buffer,
1561 to_send_count);
1562 if (ret) {
1563 goto error;
1564 }
1565
1566 /*
1567 * We want to be notified whenever there is buffer space
1568 * available to send the rest of the payload.
1569 */
1570 ret = lttng_poll_mod(&state->events, client->socket,
1571 CLIENT_POLL_MASK_IN_OUT);
1572 if (ret) {
1573 goto error;
1574 }
1575 } else if (ret < 0) {
1576 /* Generic error, disconnect the client. */
1577 ERR("[notification-thread] Failed to send flush outgoing queue, disconnecting client (socket fd = %i)",
1578 client->socket);
1579 ret = handle_notification_thread_client_disconnect(
1580 client->socket, state);
1581 if (ret) {
1582 goto error;
1583 }
1584 } else {
1585 /* No error and flushed the queue completely. */
1586 ret = lttng_dynamic_buffer_set_size(
1587 &client->communication.outbound.buffer, 0);
1588 if (ret) {
1589 goto error;
1590 }
1591 ret = lttng_poll_mod(&state->events, client->socket,
1592 CLIENT_POLL_MASK_IN);
1593 if (ret) {
1594 goto error;
1595 }
1596
1597 client->communication.outbound.queued_command_reply = false;
1598 client->communication.outbound.dropped_notification = false;
1599 }
1600
1601 return 0;
1602 error:
1603 return -1;
1604 }
1605
1606 static
1607 int client_send_command_reply(struct notification_client *client,
1608 struct notification_thread_state *state,
1609 enum lttng_notification_channel_status status)
1610 {
1611 int ret;
1612 struct lttng_notification_channel_command_reply reply = {
1613 .status = (int8_t) status,
1614 };
1615 struct lttng_notification_channel_message msg = {
1616 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
1617 .size = sizeof(reply),
1618 };
1619 char buffer[sizeof(msg) + sizeof(reply)];
1620
1621 if (client->communication.outbound.queued_command_reply) {
1622 /* Protocol error. */
1623 goto error;
1624 }
1625
1626 memcpy(buffer, &msg, sizeof(msg));
1627 memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
1628 DBG("[notification-thread] Send command reply (%i)", (int) status);
1629
1630 /* Enqueue buffer to outgoing queue and flush it. */
1631 ret = lttng_dynamic_buffer_append(
1632 &client->communication.outbound.buffer,
1633 buffer, sizeof(buffer));
1634 if (ret) {
1635 goto error;
1636 }
1637
1638 ret = client_flush_outgoing_queue(client, state);
1639 if (ret) {
1640 goto error;
1641 }
1642
1643 if (client->communication.outbound.buffer.size != 0) {
1644 /* Queue could not be emptied. */
1645 client->communication.outbound.queued_command_reply = true;
1646 }
1647
1648 return 0;
1649 error:
1650 return -1;
1651 }
1652
1653 static
1654 int client_dispatch_message(struct notification_client *client,
1655 struct notification_thread_state *state)
1656 {
1657 int ret = 0;
1658
1659 if (client->communication.inbound.msg_type !=
1660 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE &&
1661 client->communication.inbound.msg_type !=
1662 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN &&
1663 !client->validated) {
1664 WARN("[notification-thread] client attempted a command before handshake");
1665 ret = -1;
1666 goto end;
1667 }
1668
1669 switch (client->communication.inbound.msg_type) {
1670 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN:
1671 {
1672 /*
1673 * Receiving message header. The function will be called again
1674 * once the rest of the message as been received and can be
1675 * interpreted.
1676 */
1677 const struct lttng_notification_channel_message *msg;
1678
1679 assert(sizeof(*msg) ==
1680 client->communication.inbound.buffer.size);
1681 msg = (const struct lttng_notification_channel_message *)
1682 client->communication.inbound.buffer.data;
1683
1684 if (msg->size == 0 || msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
1685 ERR("[notification-thread] Invalid notification channel message: length = %u", msg->size);
1686 ret = -1;
1687 goto end;
1688 }
1689
1690 switch (msg->type) {
1691 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
1692 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
1693 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
1694 break;
1695 default:
1696 ret = -1;
1697 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
1698 goto end;
1699 }
1700
1701 client->communication.inbound.bytes_to_receive = msg->size;
1702 client->communication.inbound.msg_type =
1703 (enum lttng_notification_channel_message_type) msg->type;
1704 ret = lttng_dynamic_buffer_set_size(
1705 &client->communication.inbound.buffer, msg->size);
1706 if (ret) {
1707 goto end;
1708 }
1709 break;
1710 }
1711 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
1712 {
1713 struct lttng_notification_channel_command_handshake *handshake_client;
1714 struct lttng_notification_channel_command_handshake handshake_reply = {
1715 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
1716 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
1717 };
1718 struct lttng_notification_channel_message msg_header = {
1719 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
1720 .size = sizeof(handshake_reply),
1721 };
1722 enum lttng_notification_channel_status status =
1723 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1724 char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
1725
1726 memcpy(send_buffer, &msg_header, sizeof(msg_header));
1727 memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
1728 sizeof(handshake_reply));
1729
1730 handshake_client =
1731 (struct lttng_notification_channel_command_handshake *)
1732 client->communication.inbound.buffer.data;
1733 client->major = handshake_client->major;
1734 client->minor = handshake_client->minor;
1735 if (!client->communication.inbound.creds_received) {
1736 ERR("[notification-thread] No credentials received from client");
1737 ret = -1;
1738 goto end;
1739 }
1740
1741 client->uid = LTTNG_SOCK_GET_UID_CRED(
1742 &client->communication.inbound.creds);
1743 client->gid = LTTNG_SOCK_GET_GID_CRED(
1744 &client->communication.inbound.creds);
1745 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
1746 client->uid, client->gid, (int) client->major,
1747 (int) client->minor);
1748
1749 if (handshake_client->major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
1750 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
1751 }
1752
1753 ret = lttng_dynamic_buffer_append(&client->communication.outbound.buffer,
1754 send_buffer, sizeof(send_buffer));
1755 if (ret) {
1756 ERR("[notification-thread] Failed to send protocol version to notification channel client");
1757 goto end;
1758 }
1759
1760 ret = client_flush_outgoing_queue(client, state);
1761 if (ret) {
1762 goto end;
1763 }
1764
1765 ret = client_send_command_reply(client, state, status);
1766 if (ret) {
1767 ERR("[notification-thread] Failed to send reply to notification channel client");
1768 goto end;
1769 }
1770
1771 /* Set reception state to receive the next message header. */
1772 ret = client_reset_inbound_state(client);
1773 if (ret) {
1774 ERR("[notification-thread] Failed to reset client communication's inbound state");
1775 goto end;
1776 }
1777 client->validated = true;
1778 break;
1779 }
1780 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
1781 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
1782 {
1783 struct lttng_condition *condition;
1784 enum lttng_notification_channel_status status =
1785 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1786 const struct lttng_buffer_view condition_view =
1787 lttng_buffer_view_from_dynamic_buffer(
1788 &client->communication.inbound.buffer,
1789 0, -1);
1790 size_t expected_condition_size =
1791 client->communication.inbound.buffer.size;
1792
1793 ret = lttng_condition_create_from_buffer(&condition_view,
1794 &condition);
1795 if (ret != expected_condition_size) {
1796 ERR("[notification-thread] Malformed condition received from client");
1797 goto end;
1798 }
1799
1800 if (client->communication.inbound.msg_type ==
1801 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
1802 /*
1803 * FIXME The current state should be evaluated on
1804 * subscription.
1805 */
1806 ret = notification_thread_client_subscribe(client,
1807 condition, state, &status);
1808 } else {
1809 ret = notification_thread_client_unsubscribe(client,
1810 condition, state, &status);
1811 }
1812 if (ret) {
1813 goto end;
1814 }
1815
1816 ret = client_send_command_reply(client, state, status);
1817 if (ret) {
1818 ERR("[notification-thread] Failed to send reply to notification channel client");
1819 goto end;
1820 }
1821
1822 /* Set reception state to receive the next message header. */
1823 ret = client_reset_inbound_state(client);
1824 if (ret) {
1825 ERR("[notification-thread] Failed to reset client communication's inbound state");
1826 goto end;
1827 }
1828 break;
1829 }
1830 default:
1831 abort();
1832 }
1833 end:
1834 return ret;
1835 }
1836
1837 /* Incoming data from client. */
1838 int handle_notification_thread_client_in(
1839 struct notification_thread_state *state, int socket)
1840 {
1841 int ret = 0;
1842 struct notification_client *client;
1843 ssize_t recv_ret;
1844 size_t offset;
1845
1846 client = get_client_from_socket(socket, state);
1847 if (!client) {
1848 /* Internal error, abort. */
1849 ret = -1;
1850 goto end;
1851 }
1852
1853 offset = client->communication.inbound.buffer.size -
1854 client->communication.inbound.bytes_to_receive;
1855 if (client->communication.inbound.expect_creds) {
1856 recv_ret = lttcomm_recv_creds_unix_sock(socket,
1857 client->communication.inbound.buffer.data + offset,
1858 client->communication.inbound.bytes_to_receive,
1859 &client->communication.inbound.creds);
1860 if (recv_ret > 0) {
1861 client->communication.inbound.expect_creds = false;
1862 client->communication.inbound.creds_received = true;
1863 }
1864 } else {
1865 recv_ret = lttcomm_recv_unix_sock_non_block(socket,
1866 client->communication.inbound.buffer.data + offset,
1867 client->communication.inbound.bytes_to_receive);
1868 }
1869 if (recv_ret < 0) {
1870 goto error_disconnect_client;
1871 }
1872
1873 client->communication.inbound.bytes_to_receive -= recv_ret;
1874 if (client->communication.inbound.bytes_to_receive == 0) {
1875 ret = client_dispatch_message(client, state);
1876 if (ret) {
1877 /*
1878 * Only returns an error if this client must be
1879 * disconnected.
1880 */
1881 goto error_disconnect_client;
1882 }
1883 } else {
1884 goto end;
1885 }
1886 end:
1887 return ret;
1888 error_disconnect_client:
1889 ret = handle_notification_thread_client_disconnect(socket, state);
1890 return ret;
1891 }
1892
1893 /* Client ready to receive outgoing data. */
1894 int handle_notification_thread_client_out(
1895 struct notification_thread_state *state, int socket)
1896 {
1897 int ret;
1898 struct notification_client *client;
1899
1900 client = get_client_from_socket(socket, state);
1901 if (!client) {
1902 /* Internal error, abort. */
1903 ret = -1;
1904 goto end;
1905 }
1906
1907 ret = client_flush_outgoing_queue(client, state);
1908 if (ret) {
1909 goto end;
1910 }
1911 end:
1912 return ret;
1913 }
1914
1915 static
1916 bool evaluate_buffer_usage_condition(struct lttng_condition *condition,
1917 struct channel_state_sample *sample, uint64_t buffer_capacity)
1918 {
1919 bool result = false;
1920 uint64_t threshold;
1921 enum lttng_condition_type condition_type;
1922 struct lttng_condition_buffer_usage *use_condition = container_of(
1923 condition, struct lttng_condition_buffer_usage,
1924 parent);
1925
1926 if (!sample) {
1927 goto end;
1928 }
1929
1930 if (use_condition->threshold_bytes.set) {
1931 threshold = use_condition->threshold_bytes.value;
1932 } else {
1933 /*
1934 * Threshold was expressed as a ratio.
1935 *
1936 * TODO the threshold (in bytes) of conditions expressed
1937 * as a ratio of total buffer size could be cached to
1938 * forego this double-multiplication or it could be performed
1939 * as fixed-point math.
1940 *
1941 * Note that caching should accomodate the case where the
1942 * condition applies to multiple channels (i.e. don't assume
1943 * that all channels matching my_chann* have the same size...)
1944 */
1945 threshold = (uint64_t) (use_condition->threshold_ratio.value *
1946 (double) buffer_capacity);
1947 }
1948
1949 condition_type = lttng_condition_get_type(condition);
1950 if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
1951 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
1952 threshold, sample->highest_usage);
1953
1954 /*
1955 * The low condition should only be triggered once _all_ of the
1956 * streams in a channel have gone below the "low" threshold.
1957 */
1958 if (sample->highest_usage <= threshold) {
1959 result = true;
1960 }
1961 } else {
1962 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
1963 threshold, sample->highest_usage);
1964
1965 /*
1966 * For high buffer usage scenarios, we want to trigger whenever
1967 * _any_ of the streams has reached the "high" threshold.
1968 */
1969 if (sample->highest_usage >= threshold) {
1970 result = true;
1971 }
1972 }
1973 end:
1974 return result;
1975 }
1976
1977 static
1978 int evaluate_condition(struct lttng_condition *condition,
1979 struct lttng_evaluation **evaluation,
1980 struct notification_thread_state *state,
1981 struct channel_state_sample *previous_sample,
1982 struct channel_state_sample *latest_sample,
1983 uint64_t buffer_capacity)
1984 {
1985 int ret = 0;
1986 enum lttng_condition_type condition_type;
1987 bool previous_sample_result;
1988 bool latest_sample_result;
1989
1990 condition_type = lttng_condition_get_type(condition);
1991 /* No other condition type supported for the moment. */
1992 assert(condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW ||
1993 condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH);
1994
1995 previous_sample_result = evaluate_buffer_usage_condition(condition,
1996 previous_sample, buffer_capacity);
1997 latest_sample_result = evaluate_buffer_usage_condition(condition,
1998 latest_sample, buffer_capacity);
1999
2000 if (!latest_sample_result ||
2001 (previous_sample_result == latest_sample_result)) {
2002 /*
2003 * Only trigger on a condition evaluation transition.
2004 *
2005 * NOTE: This edge-triggered logic may not be appropriate for
2006 * future condition types.
2007 */
2008 goto end;
2009 }
2010
2011 if (evaluation && latest_sample_result) {
2012 *evaluation = lttng_evaluation_buffer_usage_create(
2013 condition_type,
2014 latest_sample->highest_usage,
2015 buffer_capacity);
2016 if (!*evaluation) {
2017 ret = -1;
2018 goto end;
2019 }
2020 }
2021 end:
2022 return ret;
2023 }
2024
2025 static
2026 int client_enqueue_dropped_notification(struct notification_client *client,
2027 struct notification_thread_state *state)
2028 {
2029 int ret;
2030 struct lttng_notification_channel_message msg = {
2031 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
2032 .size = 0,
2033 };
2034
2035 ret = lttng_dynamic_buffer_append(
2036 &client->communication.outbound.buffer, &msg,
2037 sizeof(msg));
2038 return ret;
2039 }
2040
2041 static
2042 int send_evaluation_to_clients(struct lttng_trigger *trigger,
2043 struct lttng_evaluation *evaluation,
2044 struct notification_client_list* client_list,
2045 struct notification_thread_state *state,
2046 uid_t channel_uid, gid_t channel_gid)
2047 {
2048 int ret = 0;
2049 struct lttng_dynamic_buffer msg_buffer;
2050 struct notification_client_list_element *client_list_element, *tmp;
2051 struct lttng_notification *notification;
2052 struct lttng_condition *condition;
2053 ssize_t expected_notification_size, notification_size;
2054 struct lttng_notification_channel_message msg;
2055
2056 lttng_dynamic_buffer_init(&msg_buffer);
2057
2058 condition = lttng_trigger_get_condition(trigger);
2059 assert(condition);
2060
2061 notification = lttng_notification_create(condition, evaluation);
2062 if (!notification) {
2063 ret = -1;
2064 goto end;
2065 }
2066
2067 expected_notification_size = lttng_notification_serialize(notification,
2068 NULL);
2069 if (expected_notification_size < 0) {
2070 ERR("[notification-thread] Failed to get size of serialized notification");
2071 ret = -1;
2072 goto end;
2073 }
2074
2075 msg.type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION;
2076 msg.size = (uint32_t) expected_notification_size;
2077 ret = lttng_dynamic_buffer_append(&msg_buffer, &msg, sizeof(msg));
2078 if (ret) {
2079 goto end;
2080 }
2081
2082 ret = lttng_dynamic_buffer_set_size(&msg_buffer,
2083 msg_buffer.size + expected_notification_size);
2084 if (ret) {
2085 goto end;
2086 }
2087
2088 notification_size = lttng_notification_serialize(notification,
2089 msg_buffer.data + sizeof(msg));
2090 if (notification_size != expected_notification_size) {
2091 ERR("[notification-thread] Failed to serialize notification");
2092 ret = -1;
2093 goto end;
2094 }
2095
2096 cds_list_for_each_entry_safe(client_list_element, tmp,
2097 &client_list->list, node) {
2098 struct notification_client *client =
2099 client_list_element->client;
2100
2101 if (client->uid != channel_uid && client->gid != channel_gid &&
2102 client->uid != 0) {
2103 /* Client is not allowed to monitor this channel. */
2104 DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this channel");
2105 continue;
2106 }
2107
2108 DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
2109 client->socket, msg_buffer.size);
2110 if (client->communication.outbound.buffer.size) {
2111 /*
2112 * Outgoing data is already buffered for this client;
2113 * drop the notification and enqueue a "dropped
2114 * notification" message if this is the first dropped
2115 * notification since the socket spilled-over to the
2116 * queue.
2117 */
2118 DBG("[notification-thread] Dropping notification addressed to client (socket fd = %i)",
2119 client->socket);
2120 if (!client->communication.outbound.dropped_notification) {
2121 client->communication.outbound.dropped_notification = true;
2122 ret = client_enqueue_dropped_notification(
2123 client, state);
2124 if (ret) {
2125 goto end;
2126 }
2127 }
2128 continue;
2129 }
2130
2131 ret = lttng_dynamic_buffer_append_buffer(
2132 &client->communication.outbound.buffer,
2133 &msg_buffer);
2134 if (ret) {
2135 goto end;
2136 }
2137
2138 ret = client_flush_outgoing_queue(client, state);
2139 if (ret) {
2140 goto end;
2141 }
2142 }
2143 ret = 0;
2144 end:
2145 lttng_notification_destroy(notification);
2146 lttng_dynamic_buffer_reset(&msg_buffer);
2147 return ret;
2148 }
2149
2150 int handle_notification_thread_channel_sample(
2151 struct notification_thread_state *state, int pipe,
2152 enum lttng_domain_type domain)
2153 {
2154 int ret = 0;
2155 struct lttcomm_consumer_channel_monitor_msg sample_msg;
2156 struct channel_state_sample previous_sample, latest_sample;
2157 struct channel_info *channel_info;
2158 struct cds_lfht_node *node;
2159 struct cds_lfht_iter iter;
2160 struct lttng_channel_trigger_list *trigger_list;
2161 struct lttng_trigger_list_element *trigger_list_element;
2162 bool previous_sample_available = false;
2163
2164 /*
2165 * The monitoring pipe only holds messages smaller than PIPE_BUF,
2166 * ensuring that read/write of sampling messages are atomic.
2167 */
2168 ret = lttng_read(pipe, &sample_msg, sizeof(sample_msg));
2169 if (ret != sizeof(sample_msg)) {
2170 ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
2171 pipe);
2172 ret = -1;
2173 goto end;
2174 }
2175
2176 ret = 0;
2177 latest_sample.key.key = sample_msg.key;
2178 latest_sample.key.domain = domain;
2179 latest_sample.highest_usage = sample_msg.highest;
2180 latest_sample.lowest_usage = sample_msg.lowest;
2181
2182 rcu_read_lock();
2183
2184 /* Retrieve the channel's informations */
2185 cds_lfht_lookup(state->channels_ht,
2186 hash_channel_key(&latest_sample.key),
2187 match_channel_info,
2188 &latest_sample.key,
2189 &iter);
2190 node = cds_lfht_iter_get_node(&iter);
2191 if (!node) {
2192 /*
2193 * Not an error since the consumer can push a sample to the pipe
2194 * and the rest of the session daemon could notify us of the
2195 * channel's destruction before we get a chance to process that
2196 * sample.
2197 */
2198 DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
2199 latest_sample.key.key,
2200 domain == LTTNG_DOMAIN_KERNEL ? "kernel" :
2201 "user space");
2202 goto end_unlock;
2203 }
2204 channel_info = caa_container_of(node, struct channel_info,
2205 channels_ht_node);
2206 DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64")",
2207 channel_info->channel_name,
2208 latest_sample.key.key,
2209 channel_info->session_name,
2210 latest_sample.highest_usage,
2211 latest_sample.lowest_usage);
2212
2213 /* Retrieve the channel's last sample, if it exists, and update it. */
2214 cds_lfht_lookup(state->channel_state_ht,
2215 hash_channel_key(&latest_sample.key),
2216 match_channel_state_sample,
2217 &latest_sample.key,
2218 &iter);
2219 node = cds_lfht_iter_get_node(&iter);
2220 if (node) {
2221 struct channel_state_sample *stored_sample;
2222
2223 /* Update the sample stored. */
2224 stored_sample = caa_container_of(node,
2225 struct channel_state_sample,
2226 channel_state_ht_node);
2227 memcpy(&previous_sample, stored_sample,
2228 sizeof(previous_sample));
2229 stored_sample->highest_usage = latest_sample.highest_usage;
2230 stored_sample->lowest_usage = latest_sample.lowest_usage;
2231 previous_sample_available = true;
2232 } else {
2233 /*
2234 * This is the channel's first sample, allocate space for and
2235 * store the new sample.
2236 */
2237 struct channel_state_sample *stored_sample;
2238
2239 stored_sample = zmalloc(sizeof(*stored_sample));
2240 if (!stored_sample) {
2241 ret = -1;
2242 goto end_unlock;
2243 }
2244
2245 memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
2246 cds_lfht_node_init(&stored_sample->channel_state_ht_node);
2247 cds_lfht_add(state->channel_state_ht,
2248 hash_channel_key(&stored_sample->key),
2249 &stored_sample->channel_state_ht_node);
2250 }
2251
2252 /* Find triggers associated with this channel. */
2253 cds_lfht_lookup(state->channel_triggers_ht,
2254 hash_channel_key(&latest_sample.key),
2255 match_channel_trigger_list,
2256 &latest_sample.key,
2257 &iter);
2258 node = cds_lfht_iter_get_node(&iter);
2259 if (!node) {
2260 goto end_unlock;
2261 }
2262
2263 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
2264 channel_triggers_ht_node);
2265 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
2266 node) {
2267 struct lttng_condition *condition;
2268 struct lttng_action *action;
2269 struct lttng_trigger *trigger;
2270 struct notification_client_list *client_list;
2271 struct lttng_evaluation *evaluation = NULL;
2272
2273 trigger = trigger_list_element->trigger;
2274 condition = lttng_trigger_get_condition(trigger);
2275 assert(condition);
2276 action = lttng_trigger_get_action(trigger);
2277
2278 /* Notify actions are the only type currently supported. */
2279 assert(lttng_action_get_type(action) ==
2280 LTTNG_ACTION_TYPE_NOTIFY);
2281
2282 /*
2283 * Check if any client is subscribed to the result of this
2284 * evaluation.
2285 */
2286 cds_lfht_lookup(state->notification_trigger_clients_ht,
2287 lttng_condition_hash(condition),
2288 match_client_list,
2289 trigger,
2290 &iter);
2291 node = cds_lfht_iter_get_node(&iter);
2292 assert(node);
2293
2294 client_list = caa_container_of(node,
2295 struct notification_client_list,
2296 notification_trigger_ht_node);
2297 if (cds_list_empty(&client_list->list)) {
2298 /*
2299 * No clients interested in the evaluation's result,
2300 * skip it.
2301 */
2302 continue;
2303 }
2304
2305 ret = evaluate_condition(condition, &evaluation, state,
2306 previous_sample_available ? &previous_sample : NULL,
2307 &latest_sample, channel_info->capacity);
2308 if (ret) {
2309 goto end_unlock;
2310 }
2311
2312 if (!evaluation) {
2313 continue;
2314 }
2315
2316 /* Dispatch evaluation result to all clients. */
2317 ret = send_evaluation_to_clients(trigger_list_element->trigger,
2318 evaluation, client_list, state,
2319 channel_info->uid, channel_info->gid);
2320 if (ret) {
2321 goto end_unlock;
2322 }
2323 }
2324 end_unlock:
2325 rcu_read_unlock();
2326 end:
2327 return ret;
2328 }
This page took 0.144516 seconds and 5 git commands to generate.