Fix: client_list_element leak on failure to evaluate a condition
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.c
CommitLineData
ab0ee2ca
JG
1/*
2 * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18#define _LGPL_SOURCE
19#include <urcu.h>
20#include <urcu/rculfhash.h>
21
ab0ee2ca
JG
22#include <common/defaults.h>
23#include <common/error.h>
24#include <common/futex.h>
25#include <common/unix.h>
26#include <common/dynamic-buffer.h>
27#include <common/hashtable/utils.h>
28#include <common/sessiond-comm/sessiond-comm.h>
29#include <common/macros.h>
30#include <lttng/condition/condition.h>
31#include <lttng/action/action.h>
32#include <lttng/notification/notification-internal.h>
33#include <lttng/condition/condition-internal.h>
34#include <lttng/condition/buffer-usage-internal.h>
35#include <lttng/notification/channel-internal.h>
fa7c4fcd 36
ab0ee2ca
JG
37#include <time.h>
38#include <unistd.h>
39#include <assert.h>
40#include <inttypes.h>
44f2d7db 41#include <fcntl.h>
ab0ee2ca 42
fa7c4fcd
JG
43#include "notification-thread.h"
44#include "notification-thread-events.h"
45#include "notification-thread-commands.h"
46#include "lttng-sessiond.h"
47#include "kernel.h"
48
ab0ee2ca
JG
49#define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
50#define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
51
52struct lttng_trigger_list_element {
53 struct lttng_trigger *trigger;
54 struct cds_list_head node;
55};
56
57struct lttng_channel_trigger_list {
58 struct channel_key channel_key;
59 struct cds_list_head list;
60 struct cds_lfht_node channel_triggers_ht_node;
61};
62
63struct lttng_trigger_ht_element {
64 struct lttng_trigger *trigger;
65 struct cds_lfht_node node;
66};
67
68struct lttng_condition_list_element {
69 struct lttng_condition *condition;
70 struct cds_list_head node;
71};
72
73struct notification_client_list_element {
74 struct notification_client *client;
75 struct cds_list_head node;
76};
77
78struct notification_client_list {
79 struct lttng_trigger *trigger;
80 struct cds_list_head list;
81 struct cds_lfht_node notification_trigger_ht_node;
82};
83
84struct notification_client {
85 int socket;
86 /* Client protocol version. */
87 uint8_t major, minor;
88 uid_t uid;
89 gid_t gid;
90 /*
6a00208d 91 * Indicates if the credentials and versions of the client have been
ab0ee2ca
JG
92 * checked.
93 */
94 bool validated;
95 /*
96 * Conditions to which the client's notification channel is subscribed.
97 * List of struct lttng_condition_list_node. The condition member is
98 * owned by the client.
99 */
100 struct cds_list_head condition_list;
101 struct cds_lfht_node client_socket_ht_node;
102 struct {
103 struct {
b5eb6b46
JG
104 /*
105 * During the reception of a message, the reception
106 * buffers' "size" is set to contain the current
107 * message's complete payload.
108 */
ab0ee2ca
JG
109 struct lttng_dynamic_buffer buffer;
110 /* Bytes left to receive for the current message. */
111 size_t bytes_to_receive;
112 /* Type of the message being received. */
113 enum lttng_notification_channel_message_type msg_type;
114 /*
115 * Indicates whether or not credentials are expected
116 * from the client.
117 */
f950d02c 118 bool expect_creds;
ab0ee2ca
JG
119 /*
120 * Indicates whether or not credentials were received
121 * from the client.
122 */
123 bool creds_received;
b5eb6b46 124 /* Only used during credentials reception. */
ab0ee2ca
JG
125 lttng_sock_cred creds;
126 } inbound;
127 struct {
128 /*
129 * Indicates whether or not a notification addressed to
130 * this client was dropped because a command reply was
131 * already buffered.
132 *
133 * A notification is dropped whenever the buffer is not
134 * empty.
135 */
136 bool dropped_notification;
137 /*
138 * Indicates whether or not a command reply is already
139 * buffered. In this case, it means that the client is
140 * not consuming command replies before emitting a new
141 * one. This could be caused by a protocol error or a
142 * misbehaving/malicious client.
143 */
144 bool queued_command_reply;
145 struct lttng_dynamic_buffer buffer;
146 } outbound;
147 } communication;
148};
149
150struct channel_state_sample {
151 struct channel_key key;
152 struct cds_lfht_node channel_state_ht_node;
153 uint64_t highest_usage;
154 uint64_t lowest_usage;
155};
156
f0225486
JR
157static unsigned long hash_channel_key(struct channel_key *key);
158static int evaluate_condition(struct lttng_condition *condition,
159 struct lttng_evaluation **evaluation,
160 struct notification_thread_state *state,
161 struct channel_state_sample *previous_sample,
162 struct channel_state_sample *latest_sample,
163 uint64_t buffer_capacity);
164static
165int send_evaluation_to_clients(struct lttng_trigger *trigger,
166 struct lttng_evaluation *evaluation,
167 struct notification_client_list *client_list,
168 struct notification_thread_state *state,
169 uid_t channel_uid, gid_t channel_gid);
170
ab0ee2ca
JG
171static
172int match_client(struct cds_lfht_node *node, const void *key)
173{
174 /* This double-cast is intended to supress pointer-to-cast warning. */
175 int socket = (int) (intptr_t) key;
176 struct notification_client *client;
177
178 client = caa_container_of(node, struct notification_client,
179 client_socket_ht_node);
180
181 return !!(client->socket == socket);
182}
183
184static
185int match_channel_trigger_list(struct cds_lfht_node *node, const void *key)
186{
187 struct channel_key *channel_key = (struct channel_key *) key;
188 struct lttng_channel_trigger_list *trigger_list;
189
190 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
191 channel_triggers_ht_node);
192
193 return !!((channel_key->key == trigger_list->channel_key.key) &&
194 (channel_key->domain == trigger_list->channel_key.domain));
195}
196
197static
198int match_channel_state_sample(struct cds_lfht_node *node, const void *key)
199{
200 struct channel_key *channel_key = (struct channel_key *) key;
201 struct channel_state_sample *sample;
202
203 sample = caa_container_of(node, struct channel_state_sample,
204 channel_state_ht_node);
205
206 return !!((channel_key->key == sample->key.key) &&
207 (channel_key->domain == sample->key.domain));
208}
209
210static
211int match_channel_info(struct cds_lfht_node *node, const void *key)
212{
213 struct channel_key *channel_key = (struct channel_key *) key;
214 struct channel_info *channel_info;
215
216 channel_info = caa_container_of(node, struct channel_info,
217 channels_ht_node);
218
219 return !!((channel_key->key == channel_info->key.key) &&
220 (channel_key->domain == channel_info->key.domain));
221}
222
223static
224int match_condition(struct cds_lfht_node *node, const void *key)
225{
226 struct lttng_condition *condition_key = (struct lttng_condition *) key;
227 struct lttng_trigger_ht_element *trigger;
228 struct lttng_condition *condition;
229
230 trigger = caa_container_of(node, struct lttng_trigger_ht_element,
231 node);
232 condition = lttng_trigger_get_condition(trigger->trigger);
233 assert(condition);
234
235 return !!lttng_condition_is_equal(condition_key, condition);
236}
237
238static
239int match_client_list(struct cds_lfht_node *node, const void *key)
240{
241 struct lttng_trigger *trigger_key = (struct lttng_trigger *) key;
242 struct notification_client_list *client_list;
243 struct lttng_condition *condition;
244 struct lttng_condition *condition_key = lttng_trigger_get_condition(
245 trigger_key);
246
247 assert(condition_key);
248
249 client_list = caa_container_of(node, struct notification_client_list,
250 notification_trigger_ht_node);
251 condition = lttng_trigger_get_condition(client_list->trigger);
252
253 return !!lttng_condition_is_equal(condition_key, condition);
254}
255
256static
257int match_client_list_condition(struct cds_lfht_node *node, const void *key)
258{
259 struct lttng_condition *condition_key = (struct lttng_condition *) key;
260 struct notification_client_list *client_list;
261 struct lttng_condition *condition;
262
263 assert(condition_key);
264
265 client_list = caa_container_of(node, struct notification_client_list,
266 notification_trigger_ht_node);
267 condition = lttng_trigger_get_condition(client_list->trigger);
268
269 return !!lttng_condition_is_equal(condition_key, condition);
270}
271
272static
273unsigned long lttng_condition_buffer_usage_hash(
274 struct lttng_condition *_condition)
275{
276 unsigned long hash = 0;
277 struct lttng_condition_buffer_usage *condition;
278
279 condition = container_of(_condition,
280 struct lttng_condition_buffer_usage, parent);
281
282 if (condition->session_name) {
283 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
284 }
285 if (condition->channel_name) {
8f56701f 286 hash ^= hash_key_str(condition->channel_name, lttng_ht_seed);
ab0ee2ca
JG
287 }
288 if (condition->domain.set) {
289 hash ^= hash_key_ulong(
290 (void *) condition->domain.type,
291 lttng_ht_seed);
292 }
293 if (condition->threshold_ratio.set) {
294 uint64_t val;
295
296 val = condition->threshold_ratio.value * (double) UINT32_MAX;
297 hash ^= hash_key_u64(&val, lttng_ht_seed);
b6d67aac 298 } else if (condition->threshold_bytes.set) {
ab0ee2ca
JG
299 uint64_t val;
300
301 val = condition->threshold_bytes.value;
302 hash ^= hash_key_u64(&val, lttng_ht_seed);
303 }
304 return hash;
305}
306
307/*
308 * The lttng_condition hashing code is kept in this file (rather than
309 * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
310 * don't want to link in liblttng-ctl.
311 */
312static
313unsigned long lttng_condition_hash(struct lttng_condition *condition)
314{
315 switch (condition->type) {
316 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
317 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
318 return lttng_condition_buffer_usage_hash(condition);
319 default:
320 ERR("[notification-thread] Unexpected condition type caught");
321 abort();
322 }
323}
324
f0225486
JR
325static
326unsigned long hash_channel_key(struct channel_key *key)
327{
328 unsigned long key_hash = hash_key_u64(&key->key, lttng_ht_seed);
329 unsigned long domain_hash = hash_key_ulong(
330 (void *) (unsigned long) key->domain, lttng_ht_seed);
331
332 return key_hash ^ domain_hash;
333}
334
ab0ee2ca
JG
335static
336void channel_info_destroy(struct channel_info *channel_info)
337{
338 if (!channel_info) {
339 return;
340 }
341
342 if (channel_info->session_name) {
343 free(channel_info->session_name);
344 }
345 if (channel_info->channel_name) {
346 free(channel_info->channel_name);
347 }
348 free(channel_info);
349}
350
351static
352struct channel_info *channel_info_copy(struct channel_info *channel_info)
353{
354 struct channel_info *copy = zmalloc(sizeof(*channel_info));
355
356 assert(channel_info);
357 assert(channel_info->session_name);
358 assert(channel_info->channel_name);
359
360 if (!copy) {
361 goto end;
362 }
363
364 memcpy(copy, channel_info, sizeof(*channel_info));
365 copy->session_name = NULL;
366 copy->channel_name = NULL;
367
368 copy->session_name = strdup(channel_info->session_name);
369 if (!copy->session_name) {
370 goto error;
371 }
372 copy->channel_name = strdup(channel_info->channel_name);
373 if (!copy->channel_name) {
374 goto error;
375 }
376 cds_lfht_node_init(&channel_info->channels_ht_node);
377end:
378 return copy;
379error:
380 channel_info_destroy(copy);
381 return NULL;
382}
383
f0225486
JR
384/* This function must be called with the RCU read lock held. */
385static
386int evaluate_condition_for_client(struct lttng_trigger *trigger,
387 struct lttng_condition *condition,
388 struct notification_client *client,
389 struct notification_thread_state *state)
390{
391 int ret;
392 struct cds_lfht_iter iter;
393 struct cds_lfht_node *node;
394 struct channel_info *channel_info = NULL;
395 struct channel_key *channel_key = NULL;
396 struct channel_state_sample *last_sample = NULL;
397 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
398 struct lttng_evaluation *evaluation = NULL;
399 struct notification_client_list client_list = { 0 };
400 struct notification_client_list_element client_list_element = { 0 };
401
402 assert(trigger);
403 assert(condition);
404 assert(client);
405 assert(state);
406
407 /* Find the channel associated with the trigger. */
408 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter,
409 channel_trigger_list , channel_triggers_ht_node) {
410 struct lttng_trigger_list_element *element;
411
412 cds_list_for_each_entry(element, &channel_trigger_list->list, node) {
413 struct lttng_condition *current_condition =
414 lttng_trigger_get_condition(
415 element->trigger);
416
417 assert(current_condition);
418 if (!lttng_condition_is_equal(condition,
419 current_condition)) {
420 continue;
421 }
422
423 /* Found the trigger, save the channel key. */
424 channel_key = &channel_trigger_list->channel_key;
425 break;
426 }
427 if (channel_key) {
428 /* The channel key was found stop iteration. */
429 break;
430 }
431 }
432
433 if (!channel_key){
434 /* No channel found; normal exit. */
435 DBG("[notification-thread] No channel associated with newly subscribed-to condition");
436 ret = 0;
437 goto end;
438 }
439
440 /* Fetch channel info for the matching channel. */
441 cds_lfht_lookup(state->channels_ht,
442 hash_channel_key(channel_key),
443 match_channel_info,
444 channel_key,
445 &iter);
446 node = cds_lfht_iter_get_node(&iter);
447 assert(node);
448 channel_info = caa_container_of(node, struct channel_info,
449 channels_ht_node);
450
451 /* Retrieve the channel's last sample, if it exists. */
452 cds_lfht_lookup(state->channel_state_ht,
453 hash_channel_key(channel_key),
454 match_channel_state_sample,
455 channel_key,
456 &iter);
457 node = cds_lfht_iter_get_node(&iter);
458 if (node) {
459 last_sample = caa_container_of(node,
460 struct channel_state_sample,
461 channel_state_ht_node);
462 } else {
463 /* Nothing to evaluate, no sample was ever taken. Normal exit */
464 DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
465 ret = 0;
466 goto end;
467 }
468
469 ret = evaluate_condition(condition, &evaluation, state, NULL,
470 last_sample, channel_info->capacity);
471 if (ret) {
e7d6cdaa 472 WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
f0225486
JR
473 goto end;
474 }
475
476 if (!evaluation) {
477 /* Evaluation yielded nothing. Normal exit. */
478 DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
479 ret = 0;
480 goto end;
481 }
482
483 /*
484 * Create a temporary client list with the client currently
485 * subscribing.
486 */
487 cds_lfht_node_init(&client_list.notification_trigger_ht_node);
488 CDS_INIT_LIST_HEAD(&client_list.list);
489 client_list.trigger = trigger;
490
491 CDS_INIT_LIST_HEAD(&client_list_element.node);
492 client_list_element.client = client;
493 cds_list_add(&client_list_element.node, &client_list.list);
494
495 /* Send evaluation result to the newly-subscribed client. */
496 DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
497 ret = send_evaluation_to_clients(trigger, evaluation, &client_list,
498 state, channel_info->uid, channel_info->gid);
499
500end:
501 return ret;
502}
503
ab0ee2ca
JG
504static
505int notification_thread_client_subscribe(struct notification_client *client,
506 struct lttng_condition *condition,
507 struct notification_thread_state *state,
508 enum lttng_notification_channel_status *_status)
509{
510 int ret = 0;
511 struct cds_lfht_iter iter;
512 struct cds_lfht_node *node;
513 struct notification_client_list *client_list;
514 struct lttng_condition_list_element *condition_list_element = NULL;
515 struct notification_client_list_element *client_list_element = NULL;
516 enum lttng_notification_channel_status status =
517 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
518
519 /*
520 * Ensure that the client has not already subscribed to this condition
521 * before.
522 */
523 cds_list_for_each_entry(condition_list_element, &client->condition_list, node) {
524 if (lttng_condition_is_equal(condition_list_element->condition,
525 condition)) {
526 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED;
527 goto end;
528 }
529 }
530
531 condition_list_element = zmalloc(sizeof(*condition_list_element));
532 if (!condition_list_element) {
533 ret = -1;
534 goto error;
535 }
536 client_list_element = zmalloc(sizeof(*client_list_element));
537 if (!client_list_element) {
538 ret = -1;
539 goto error;
540 }
541
542 rcu_read_lock();
543
544 /*
545 * Add the newly-subscribed condition to the client's subscription list.
546 */
547 CDS_INIT_LIST_HEAD(&condition_list_element->node);
548 condition_list_element->condition = condition;
549 cds_list_add(&condition_list_element->node, &client->condition_list);
550
ab0ee2ca
JG
551 cds_lfht_lookup(state->notification_trigger_clients_ht,
552 lttng_condition_hash(condition),
553 match_client_list_condition,
554 condition,
555 &iter);
556 node = cds_lfht_iter_get_node(&iter);
557 if (!node) {
c3dce007 558 free(client_list_element);
ab0ee2ca
JG
559 goto end_unlock;
560 }
561
562 client_list = caa_container_of(node, struct notification_client_list,
563 notification_trigger_ht_node);
f0225486
JR
564 if (evaluate_condition_for_client(client_list->trigger, condition,
565 client, state)) {
566 WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
567 ret = -1;
d39ea393 568 free(client_list_element);
f0225486
JR
569 goto end_unlock;
570 }
571
572 /*
573 * Add the client to the list of clients interested in a given trigger
574 * if a "notification" trigger with a corresponding condition was
575 * added prior.
576 */
ab0ee2ca
JG
577 client_list_element->client = client;
578 CDS_INIT_LIST_HEAD(&client_list_element->node);
579 cds_list_add(&client_list_element->node, &client_list->list);
580end_unlock:
581 rcu_read_unlock();
582end:
583 if (_status) {
584 *_status = status;
585 }
586 return ret;
587error:
588 free(condition_list_element);
589 free(client_list_element);
590 return ret;
591}
592
593static
594int notification_thread_client_unsubscribe(
595 struct notification_client *client,
596 struct lttng_condition *condition,
597 struct notification_thread_state *state,
598 enum lttng_notification_channel_status *_status)
599{
600 struct cds_lfht_iter iter;
601 struct cds_lfht_node *node;
602 struct notification_client_list *client_list;
603 struct lttng_condition_list_element *condition_list_element,
604 *condition_tmp;
605 struct notification_client_list_element *client_list_element,
606 *client_tmp;
607 bool condition_found = false;
608 enum lttng_notification_channel_status status =
609 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
610
611 /* Remove the condition from the client's condition list. */
612 cds_list_for_each_entry_safe(condition_list_element, condition_tmp,
613 &client->condition_list, node) {
614 if (!lttng_condition_is_equal(condition_list_element->condition,
615 condition)) {
616 continue;
617 }
618
619 cds_list_del(&condition_list_element->node);
620 /*
621 * The caller may be iterating on the client's conditions to
622 * tear down a client's connection. In this case, the condition
623 * will be destroyed at the end.
624 */
625 if (condition != condition_list_element->condition) {
626 lttng_condition_destroy(
627 condition_list_element->condition);
628 }
629 free(condition_list_element);
630 condition_found = true;
631 break;
632 }
633
634 if (!condition_found) {
635 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION;
636 goto end;
637 }
638
639 /*
640 * Remove the client from the list of clients interested the trigger
641 * matching the condition.
642 */
643 rcu_read_lock();
644 cds_lfht_lookup(state->notification_trigger_clients_ht,
645 lttng_condition_hash(condition),
646 match_client_list_condition,
647 condition,
648 &iter);
649 node = cds_lfht_iter_get_node(&iter);
650 if (!node) {
651 goto end_unlock;
652 }
653
654 client_list = caa_container_of(node, struct notification_client_list,
655 notification_trigger_ht_node);
656 cds_list_for_each_entry_safe(client_list_element, client_tmp,
657 &client_list->list, node) {
658 if (client_list_element->client->socket != client->socket) {
659 continue;
660 }
661 cds_list_del(&client_list_element->node);
662 free(client_list_element);
663 break;
664 }
665end_unlock:
666 rcu_read_unlock();
667end:
668 lttng_condition_destroy(condition);
669 if (_status) {
670 *_status = status;
671 }
672 return 0;
673}
674
675static
676void notification_client_destroy(struct notification_client *client,
677 struct notification_thread_state *state)
678{
679 struct lttng_condition_list_element *condition_list_element, *tmp;
680
681 if (!client) {
682 return;
683 }
684
685 /* Release all conditions to which the client was subscribed. */
686 cds_list_for_each_entry_safe(condition_list_element, tmp,
687 &client->condition_list, node) {
688 (void) notification_thread_client_unsubscribe(client,
689 condition_list_element->condition, state, NULL);
690 }
691
692 if (client->socket >= 0) {
693 (void) lttcomm_close_unix_sock(client->socket);
694 }
695 lttng_dynamic_buffer_reset(&client->communication.inbound.buffer);
696 lttng_dynamic_buffer_reset(&client->communication.outbound.buffer);
697 free(client);
698}
699
700/*
701 * Call with rcu_read_lock held (and hold for the lifetime of the returned
702 * client pointer).
703 */
704static
705struct notification_client *get_client_from_socket(int socket,
706 struct notification_thread_state *state)
707{
708 struct cds_lfht_iter iter;
709 struct cds_lfht_node *node;
710 struct notification_client *client = NULL;
711
712 cds_lfht_lookup(state->client_socket_ht,
713 hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed),
714 match_client,
715 (void *) (unsigned long) socket,
716 &iter);
717 node = cds_lfht_iter_get_node(&iter);
718 if (!node) {
719 goto end;
720 }
721
722 client = caa_container_of(node, struct notification_client,
723 client_socket_ht_node);
724end:
725 return client;
726}
727
728static
729bool trigger_applies_to_channel(struct lttng_trigger *trigger,
730 struct channel_info *info)
731{
732 enum lttng_condition_status status;
733 struct lttng_condition *condition;
734 const char *trigger_session_name = NULL;
735 const char *trigger_channel_name = NULL;
736 enum lttng_domain_type trigger_domain;
737
738 condition = lttng_trigger_get_condition(trigger);
739 if (!condition) {
740 goto fail;
741 }
742
743 switch (lttng_condition_get_type(condition)) {
744 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
745 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
746 break;
747 default:
748 goto fail;
749 }
750
751 status = lttng_condition_buffer_usage_get_domain_type(condition,
752 &trigger_domain);
753 assert(status == LTTNG_CONDITION_STATUS_OK);
754 if (info->key.domain != trigger_domain) {
755 goto fail;
756 }
757
758 status = lttng_condition_buffer_usage_get_session_name(
759 condition, &trigger_session_name);
760 assert((status == LTTNG_CONDITION_STATUS_OK) && trigger_session_name);
761
762 status = lttng_condition_buffer_usage_get_channel_name(
763 condition, &trigger_channel_name);
764 assert((status == LTTNG_CONDITION_STATUS_OK) && trigger_channel_name);
765
766 if (strcmp(info->session_name, trigger_session_name)) {
767 goto fail;
768 }
769 if (strcmp(info->channel_name, trigger_channel_name)) {
770 goto fail;
771 }
772
773 return true;
774fail:
775 return false;
776}
777
778static
779bool trigger_applies_to_client(struct lttng_trigger *trigger,
780 struct notification_client *client)
781{
782 bool applies = false;
783 struct lttng_condition_list_element *condition_list_element;
784
785 cds_list_for_each_entry(condition_list_element, &client->condition_list,
786 node) {
787 applies = lttng_condition_is_equal(
788 condition_list_element->condition,
789 lttng_trigger_get_condition(trigger));
790 if (applies) {
791 break;
792 }
793 }
794 return applies;
795}
796
ab0ee2ca
JG
797static
798int handle_notification_thread_command_add_channel(
799 struct notification_thread_state *state,
800 struct channel_info *channel_info,
801 enum lttng_error_code *cmd_result)
802{
803 struct cds_list_head trigger_list;
804 struct channel_info *new_channel_info;
805 struct channel_key *channel_key;
806 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
807 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
808 int trigger_count = 0;
809 struct cds_lfht_iter iter;
810
811 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
812 channel_info->channel_name, channel_info->session_name,
813 channel_info->key.key, channel_info->key.domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
814
815 CDS_INIT_LIST_HEAD(&trigger_list);
816
817 new_channel_info = channel_info_copy(channel_info);
818 if (!new_channel_info) {
819 goto error;
820 }
821
822 channel_key = &new_channel_info->key;
823
824 /* Build a list of all triggers applying to the new channel. */
825 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
826 node) {
827 struct lttng_trigger_list_element *new_element;
828
829 if (!trigger_applies_to_channel(trigger_ht_element->trigger,
830 channel_info)) {
831 continue;
832 }
833
834 new_element = zmalloc(sizeof(*new_element));
835 if (!new_element) {
836 goto error;
837 }
838 CDS_INIT_LIST_HEAD(&new_element->node);
839 new_element->trigger = trigger_ht_element->trigger;
840 cds_list_add(&new_element->node, &trigger_list);
841 trigger_count++;
842 }
843
844 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
845 trigger_count);
846 channel_trigger_list = zmalloc(sizeof(*channel_trigger_list));
847 if (!channel_trigger_list) {
848 goto error;
849 }
850 channel_trigger_list->channel_key = *channel_key;
851 CDS_INIT_LIST_HEAD(&channel_trigger_list->list);
852 cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
853 cds_list_splice(&trigger_list, &channel_trigger_list->list);
854
855 rcu_read_lock();
856 /* Add channel to the channel_ht which owns the channel_infos. */
857 cds_lfht_add(state->channels_ht,
858 hash_channel_key(channel_key),
859 &new_channel_info->channels_ht_node);
860 /*
861 * Add the list of triggers associated with this channel to the
862 * channel_triggers_ht.
863 */
864 cds_lfht_add(state->channel_triggers_ht,
865 hash_channel_key(channel_key),
866 &channel_trigger_list->channel_triggers_ht_node);
867 rcu_read_unlock();
868 *cmd_result = LTTNG_OK;
869 return 0;
870error:
871 /* Empty trigger list */
872 channel_info_destroy(new_channel_info);
873 return 1;
874}
875
876static
877int handle_notification_thread_command_remove_channel(
878 struct notification_thread_state *state,
879 uint64_t channel_key, enum lttng_domain_type domain,
880 enum lttng_error_code *cmd_result)
881{
882 struct cds_lfht_node *node;
883 struct cds_lfht_iter iter;
884 struct lttng_channel_trigger_list *trigger_list;
885 struct lttng_trigger_list_element *trigger_list_element, *tmp;
886 struct channel_key key = { .key = channel_key, .domain = domain };
887 struct channel_info *channel_info;
888
889 DBG("[notification-thread] Removing channel key = %" PRIu64 " in %s domain",
890 channel_key, domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
891
892 rcu_read_lock();
893
894 cds_lfht_lookup(state->channel_triggers_ht,
895 hash_channel_key(&key),
896 match_channel_trigger_list,
897 &key,
898 &iter);
899 node = cds_lfht_iter_get_node(&iter);
900 /*
901 * There is a severe internal error if we are being asked to remove a
902 * channel that doesn't exist.
903 */
904 if (!node) {
905 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
906 goto end;
907 }
908
909 /* Free the list of triggers associated with this channel. */
910 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
911 channel_triggers_ht_node);
912 cds_list_for_each_entry_safe(trigger_list_element, tmp,
913 &trigger_list->list, node) {
914 cds_list_del(&trigger_list_element->node);
915 free(trigger_list_element);
916 }
917 cds_lfht_del(state->channel_triggers_ht, node);
918 free(trigger_list);
919
920 /* Free sampled channel state. */
921 cds_lfht_lookup(state->channel_state_ht,
922 hash_channel_key(&key),
923 match_channel_state_sample,
924 &key,
925 &iter);
926 node = cds_lfht_iter_get_node(&iter);
927 /*
928 * This is expected to be NULL if the channel is destroyed before we
929 * received a sample.
930 */
931 if (node) {
932 struct channel_state_sample *sample = caa_container_of(node,
933 struct channel_state_sample,
934 channel_state_ht_node);
935
936 cds_lfht_del(state->channel_state_ht, node);
937 free(sample);
938 }
939
940 /* Remove the channel from the channels_ht and free it. */
941 cds_lfht_lookup(state->channels_ht,
942 hash_channel_key(&key),
943 match_channel_info,
944 &key,
945 &iter);
946 node = cds_lfht_iter_get_node(&iter);
947 assert(node);
948 channel_info = caa_container_of(node, struct channel_info,
949 channels_ht_node);
950 cds_lfht_del(state->channels_ht, node);
951 channel_info_destroy(channel_info);
952end:
953 rcu_read_unlock();
954 *cmd_result = LTTNG_OK;
955 return 0;
956}
957
fa7c4fcd
JG
958static
959int condition_is_supported(struct lttng_condition *condition)
960{
961 int ret;
962
963 switch (lttng_condition_get_type(condition)) {
964 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
965 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
966 {
967 enum lttng_domain_type domain;
968
969 ret = lttng_condition_buffer_usage_get_domain_type(condition,
970 &domain);
971 if (ret) {
972 ret = -1;
973 goto end;
974 }
975
976 if (domain != LTTNG_DOMAIN_KERNEL) {
977 ret = 1;
978 goto end;
979 }
980
981 /*
982 * Older kernel tracers don't expose the API to monitor their
983 * buffers. Therefore, we reject triggers that require that
984 * mechanism to be available to be evaluated.
985 */
986 ret = kernel_supports_ring_buffer_snapshot_sample_positions(
987 kernel_tracer_fd);
988 break;
989 }
990 default:
991 ret = 1;
992 }
993end:
994 return ret;
995}
996
ab0ee2ca
JG
997/*
998 * FIXME A client's credentials are not checked when registering a trigger, nor
999 * are they stored alongside with the trigger.
1000 *
fa7c4fcd 1001 * The effects of this are benign since:
ab0ee2ca
JG
1002 * - The client will succeed in registering the trigger, as it is valid,
1003 * - The trigger will, internally, be bound to the channel,
1004 * - The notifications will not be sent since the client's credentials
1005 * are checked against the channel at that moment.
fa7c4fcd
JG
1006 *
1007 * If this function returns a non-zero value, it means something is
1008 * fundamentally and the whole subsystem/thread will be torn down.
1009 *
1010 * If a non-fatal error occurs, just set the cmd_result to the appropriate
1011 * error code.
ab0ee2ca
JG
1012 */
1013static
1014int handle_notification_thread_command_register_trigger(
1015 struct notification_thread_state *state,
1016 struct lttng_trigger *trigger,
1017 enum lttng_error_code *cmd_result)
1018{
1019 int ret = 0;
1020 struct lttng_condition *condition;
1021 struct notification_client *client;
1022 struct notification_client_list *client_list = NULL;
1023 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1024 struct notification_client_list_element *client_list_element, *tmp;
1025 struct cds_lfht_node *node;
1026 struct cds_lfht_iter iter;
1027 struct channel_info *channel;
1028 bool free_trigger = true;
1029
1030 rcu_read_lock();
1031
1032 condition = lttng_trigger_get_condition(trigger);
fa7c4fcd
JG
1033 assert(condition);
1034
1035 ret = condition_is_supported(condition);
1036 if (ret < 0) {
1037 goto error;
1038 } else if (ret == 0) {
1039 *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
1040 goto error;
1041 } else {
1042 /* Feature is supported, continue. */
1043 ret = 0;
1044 }
1045
ab0ee2ca
JG
1046 trigger_ht_element = zmalloc(sizeof(*trigger_ht_element));
1047 if (!trigger_ht_element) {
1048 ret = -1;
1049 goto error;
1050 }
1051
1052 /* Add trigger to the trigger_ht. */
1053 cds_lfht_node_init(&trigger_ht_element->node);
1054 trigger_ht_element->trigger = trigger;
1055
1056 node = cds_lfht_add_unique(state->triggers_ht,
1057 lttng_condition_hash(condition),
1058 match_condition,
1059 condition,
1060 &trigger_ht_element->node);
1061 if (node != &trigger_ht_element->node) {
1062 /* Not a fatal error, simply report it to the client. */
1063 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
1064 goto error_free_ht_element;
1065 }
1066
1067 /*
1068 * Ownership of the trigger and of its wrapper was transfered to
1069 * the triggers_ht.
1070 */
1071 trigger_ht_element = NULL;
1072 free_trigger = false;
1073
1074 /*
1075 * The rest only applies to triggers that have a "notify" action.
1076 * It is not skipped as this is the only action type currently
1077 * supported.
1078 */
1079 client_list = zmalloc(sizeof(*client_list));
1080 if (!client_list) {
1081 ret = -1;
1082 goto error_free_ht_element;
1083 }
1084 cds_lfht_node_init(&client_list->notification_trigger_ht_node);
1085 CDS_INIT_LIST_HEAD(&client_list->list);
1086 client_list->trigger = trigger;
1087
1088 /* Build a list of clients to which this new trigger applies. */
1089 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
1090 client_socket_ht_node) {
1091 if (!trigger_applies_to_client(trigger, client)) {
1092 continue;
1093 }
1094
1095 client_list_element = zmalloc(sizeof(*client_list_element));
1096 if (!client_list_element) {
1097 ret = -1;
1098 goto error_free_client_list;
1099 }
1100 CDS_INIT_LIST_HEAD(&client_list_element->node);
1101 client_list_element->client = client;
1102 cds_list_add(&client_list_element->node, &client_list->list);
1103 }
1104
1105 cds_lfht_add(state->notification_trigger_clients_ht,
1106 lttng_condition_hash(condition),
1107 &client_list->notification_trigger_ht_node);
1108 /*
1109 * Client list ownership transferred to the
1110 * notification_trigger_clients_ht.
1111 */
1112 client_list = NULL;
1113
1114 /*
1115 * Add the trigger to list of triggers bound to the channels currently
1116 * known.
1117 */
1118 cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
1119 channels_ht_node) {
1120 struct lttng_trigger_list_element *trigger_list_element;
1121 struct lttng_channel_trigger_list *trigger_list;
1122
1123 if (!trigger_applies_to_channel(trigger, channel)) {
1124 continue;
1125 }
1126
1127 cds_lfht_lookup(state->channel_triggers_ht,
1128 hash_channel_key(&channel->key),
1129 match_channel_trigger_list,
1130 &channel->key,
1131 &iter);
1132 node = cds_lfht_iter_get_node(&iter);
1133 assert(node);
ab0ee2ca
JG
1134 trigger_list = caa_container_of(node,
1135 struct lttng_channel_trigger_list,
1136 channel_triggers_ht_node);
1137
1138 trigger_list_element = zmalloc(sizeof(*trigger_list_element));
1139 if (!trigger_list_element) {
1140 ret = -1;
1141 goto error_free_client_list;
1142 }
1143 CDS_INIT_LIST_HEAD(&trigger_list_element->node);
1144 trigger_list_element->trigger = trigger;
1145 cds_list_add(&trigger_list_element->node, &trigger_list->list);
f0225486 1146
ab0ee2ca
JG
1147 /* A trigger can only apply to one channel. */
1148 break;
1149 }
1150
1151 *cmd_result = LTTNG_OK;
1152error_free_client_list:
1153 if (client_list) {
1154 cds_list_for_each_entry_safe(client_list_element, tmp,
1155 &client_list->list, node) {
1156 free(client_list_element);
1157 }
1158 free(client_list);
1159 }
1160error_free_ht_element:
1161 free(trigger_ht_element);
1162error:
1163 if (free_trigger) {
1164 struct lttng_action *action = lttng_trigger_get_action(trigger);
1165
1166 lttng_condition_destroy(condition);
1167 lttng_action_destroy(action);
1168 lttng_trigger_destroy(trigger);
1169 }
1170 rcu_read_unlock();
1171 return ret;
1172}
1173
7c2340da 1174static
ab0ee2ca
JG
1175int handle_notification_thread_command_unregister_trigger(
1176 struct notification_thread_state *state,
1177 struct lttng_trigger *trigger,
1178 enum lttng_error_code *_cmd_reply)
1179{
1180 struct cds_lfht_iter iter;
1181 struct cds_lfht_node *node, *triggers_ht_node;
1182 struct lttng_channel_trigger_list *trigger_list;
1183 struct notification_client_list *client_list;
1184 struct notification_client_list_element *client_list_element, *tmp;
1185 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1186 struct lttng_condition *condition = lttng_trigger_get_condition(
1187 trigger);
1188 struct lttng_action *action;
1189 enum lttng_error_code cmd_reply;
1190
1191 rcu_read_lock();
1192
1193 cds_lfht_lookup(state->triggers_ht,
1194 lttng_condition_hash(condition),
1195 match_condition,
1196 condition,
1197 &iter);
1198 triggers_ht_node = cds_lfht_iter_get_node(&iter);
1199 if (!triggers_ht_node) {
1200 cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND;
1201 goto end;
1202 } else {
1203 cmd_reply = LTTNG_OK;
1204 }
1205
1206 /* Remove trigger from channel_triggers_ht. */
1207 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
1208 channel_triggers_ht_node) {
1209 struct lttng_trigger_list_element *trigger_element, *tmp;
1210
1211 cds_list_for_each_entry_safe(trigger_element, tmp,
1212 &trigger_list->list, node) {
1213 struct lttng_condition *current_condition =
1214 lttng_trigger_get_condition(
1215 trigger_element->trigger);
1216
1217 assert(current_condition);
1218 if (!lttng_condition_is_equal(condition,
1219 current_condition)) {
1220 continue;
1221 }
1222
1223 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
1224 cds_list_del(&trigger_element->node);
f0225486
JR
1225 /* A trigger can only appear once per channel */
1226 break;
ab0ee2ca
JG
1227 }
1228 }
1229
1230 /*
1231 * Remove and release the client list from
1232 * notification_trigger_clients_ht.
1233 */
1234 cds_lfht_lookup(state->notification_trigger_clients_ht,
1235 lttng_condition_hash(condition),
1236 match_client_list,
1237 trigger,
1238 &iter);
1239 node = cds_lfht_iter_get_node(&iter);
1240 assert(node);
1241 client_list = caa_container_of(node, struct notification_client_list,
1242 notification_trigger_ht_node);
1243 cds_list_for_each_entry_safe(client_list_element, tmp,
1244 &client_list->list, node) {
1245 free(client_list_element);
1246 }
1247 cds_lfht_del(state->notification_trigger_clients_ht, node);
1248 free(client_list);
1249
1250 /* Remove trigger from triggers_ht. */
1251 trigger_ht_element = caa_container_of(triggers_ht_node,
1252 struct lttng_trigger_ht_element, node);
1253 cds_lfht_del(state->triggers_ht, triggers_ht_node);
1254
1255 condition = lttng_trigger_get_condition(trigger_ht_element->trigger);
1256 lttng_condition_destroy(condition);
1257 action = lttng_trigger_get_action(trigger_ht_element->trigger);
1258 lttng_action_destroy(action);
1259 lttng_trigger_destroy(trigger_ht_element->trigger);
1260 free(trigger_ht_element);
1261end:
1262 rcu_read_unlock();
1263 if (_cmd_reply) {
1264 *_cmd_reply = cmd_reply;
1265 }
1266 return 0;
1267}
1268
1269/* Returns 0 on success, 1 on exit requested, negative value on error. */
1270int handle_notification_thread_command(
1271 struct notification_thread_handle *handle,
1272 struct notification_thread_state *state)
1273{
1274 int ret;
1275 uint64_t counter;
1276 struct notification_thread_command *cmd;
1277
1278 /* Read event_fd to put it back into a quiescent state. */
800c5e10 1279 ret = read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter, sizeof(counter));
ab0ee2ca
JG
1280 if (ret == -1) {
1281 goto error;
1282 }
1283
1284 pthread_mutex_lock(&handle->cmd_queue.lock);
1285 cmd = cds_list_first_entry(&handle->cmd_queue.list,
1286 struct notification_thread_command, cmd_list_node);
1287 switch (cmd->type) {
1288 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
1289 DBG("[notification-thread] Received register trigger command");
1290 ret = handle_notification_thread_command_register_trigger(
1291 state, cmd->parameters.trigger,
1292 &cmd->reply_code);
1293 break;
1294 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
1295 DBG("[notification-thread] Received unregister trigger command");
1296 ret = handle_notification_thread_command_unregister_trigger(
1297 state, cmd->parameters.trigger,
1298 &cmd->reply_code);
1299 break;
1300 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
1301 DBG("[notification-thread] Received add channel command");
1302 ret = handle_notification_thread_command_add_channel(
1303 state, &cmd->parameters.add_channel,
1304 &cmd->reply_code);
1305 break;
1306 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
1307 DBG("[notification-thread] Received remove channel command");
1308 ret = handle_notification_thread_command_remove_channel(
1309 state, cmd->parameters.remove_channel.key,
1310 cmd->parameters.remove_channel.domain,
1311 &cmd->reply_code);
1312 break;
1313 case NOTIFICATION_COMMAND_TYPE_QUIT:
1314 DBG("[notification-thread] Received quit command");
1315 cmd->reply_code = LTTNG_OK;
1316 ret = 1;
1317 goto end;
1318 default:
1319 ERR("[notification-thread] Unknown internal command received");
1320 goto error_unlock;
1321 }
1322
1323 if (ret) {
1324 goto error_unlock;
1325 }
1326end:
1327 cds_list_del(&cmd->cmd_list_node);
8f5e7d40 1328 lttng_waiter_wake_up(&cmd->reply_waiter);
ab0ee2ca
JG
1329 pthread_mutex_unlock(&handle->cmd_queue.lock);
1330 return ret;
1331error_unlock:
1332 /* Wake-up and return a fatal error to the calling thread. */
8f5e7d40 1333 lttng_waiter_wake_up(&cmd->reply_waiter);
ab0ee2ca
JG
1334 pthread_mutex_unlock(&handle->cmd_queue.lock);
1335 cmd->reply_code = LTTNG_ERR_FATAL;
1336error:
1337 /* Indicate a fatal error to the caller. */
1338 return -1;
1339}
1340
1341static
1342unsigned long hash_client_socket(int socket)
1343{
1344 return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed);
1345}
1346
1347static
1348int socket_set_non_blocking(int socket)
1349{
1350 int ret, flags;
1351
1352 /* Set the pipe as non-blocking. */
1353 ret = fcntl(socket, F_GETFL, 0);
1354 if (ret == -1) {
1355 PERROR("fcntl get socket flags");
1356 goto end;
1357 }
1358 flags = ret;
1359
1360 ret = fcntl(socket, F_SETFL, flags | O_NONBLOCK);
1361 if (ret == -1) {
1362 PERROR("fcntl set O_NONBLOCK socket flag");
1363 goto end;
1364 }
1365 DBG("Client socket (fd = %i) set as non-blocking", socket);
1366end:
1367 return ret;
1368}
1369
1370static
b5eb6b46 1371int client_reset_inbound_state(struct notification_client *client)
ab0ee2ca
JG
1372{
1373 int ret;
1374
1375 ret = lttng_dynamic_buffer_set_size(
1376 &client->communication.inbound.buffer, 0);
1377 assert(!ret);
1378
1379 client->communication.inbound.bytes_to_receive =
1380 sizeof(struct lttng_notification_channel_message);
1381 client->communication.inbound.msg_type =
1382 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN;
ab0ee2ca
JG
1383 LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1);
1384 LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1);
b5eb6b46
JG
1385 ret = lttng_dynamic_buffer_set_size(
1386 &client->communication.inbound.buffer,
1387 client->communication.inbound.bytes_to_receive);
1388 return ret;
ab0ee2ca
JG
1389}
1390
1391int handle_notification_thread_client_connect(
1392 struct notification_thread_state *state)
1393{
1394 int ret;
1395 struct notification_client *client;
1396
1397 DBG("[notification-thread] Handling new notification channel client connection");
1398
1399 client = zmalloc(sizeof(*client));
1400 if (!client) {
1401 /* Fatal error. */
1402 ret = -1;
1403 goto error;
1404 }
1405 CDS_INIT_LIST_HEAD(&client->condition_list);
1406 lttng_dynamic_buffer_init(&client->communication.inbound.buffer);
1407 lttng_dynamic_buffer_init(&client->communication.outbound.buffer);
f950d02c 1408 client->communication.inbound.expect_creds = true;
b5eb6b46
JG
1409 ret = client_reset_inbound_state(client);
1410 if (ret) {
1411 ERR("[notification-thread] Failed to reset client communication's inbound state");
1412 ret = 0;
1413 goto error;
1414 }
ab0ee2ca
JG
1415
1416 ret = lttcomm_accept_unix_sock(state->notification_channel_socket);
1417 if (ret < 0) {
1418 ERR("[notification-thread] Failed to accept new notification channel client connection");
1419 ret = 0;
1420 goto error;
1421 }
1422
1423 client->socket = ret;
1424
1425 ret = socket_set_non_blocking(client->socket);
1426 if (ret) {
1427 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
1428 goto error;
1429 }
1430
1431 ret = lttcomm_setsockopt_creds_unix_sock(client->socket);
1432 if (ret < 0) {
1433 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
1434 ret = 0;
1435 goto error;
1436 }
1437
1438 ret = lttng_poll_add(&state->events, client->socket,
1439 LPOLLIN | LPOLLERR |
1440 LPOLLHUP | LPOLLRDHUP);
1441 if (ret < 0) {
1442 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
1443 ret = 0;
1444 goto error;
1445 }
1446 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
1447 client->socket);
1448
ab0ee2ca
JG
1449 rcu_read_lock();
1450 cds_lfht_add(state->client_socket_ht,
1451 hash_client_socket(client->socket),
1452 &client->client_socket_ht_node);
1453 rcu_read_unlock();
1454
1455 return ret;
1456error:
1457 notification_client_destroy(client, state);
1458 return ret;
1459}
1460
1461int handle_notification_thread_client_disconnect(
1462 int client_socket,
1463 struct notification_thread_state *state)
1464{
1465 int ret = 0;
1466 struct notification_client *client;
1467
1468 rcu_read_lock();
1469 DBG("[notification-thread] Closing client connection (socket fd = %i)",
1470 client_socket);
1471 client = get_client_from_socket(client_socket, state);
1472 if (!client) {
1473 /* Internal state corruption, fatal error. */
1474 ERR("[notification-thread] Unable to find client (socket fd = %i)",
1475 client_socket);
1476 ret = -1;
1477 goto end;
1478 }
1479
1480 ret = lttng_poll_del(&state->events, client_socket);
1481 if (ret) {
1482 ERR("[notification-thread] Failed to remove client socket from poll set");
1483 }
1484 cds_lfht_del(state->client_socket_ht,
1485 &client->client_socket_ht_node);
1486 notification_client_destroy(client, state);
1487end:
1488 rcu_read_unlock();
1489 return ret;
1490}
1491
1492int handle_notification_thread_client_disconnect_all(
1493 struct notification_thread_state *state)
1494{
1495 struct cds_lfht_iter iter;
1496 struct notification_client *client;
1497 bool error_encoutered = false;
1498
1499 rcu_read_lock();
1500 DBG("[notification-thread] Closing all client connections");
1501 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
1502 client_socket_ht_node) {
1503 int ret;
1504
1505 ret = handle_notification_thread_client_disconnect(
1506 client->socket, state);
1507 if (ret) {
1508 error_encoutered = true;
1509 }
1510 }
1511 rcu_read_unlock();
1512 return error_encoutered ? 1 : 0;
1513}
1514
1515int handle_notification_thread_trigger_unregister_all(
1516 struct notification_thread_state *state)
1517{
8a9bd316 1518 bool error_occurred = false;
ab0ee2ca
JG
1519 struct cds_lfht_iter iter;
1520 struct lttng_trigger_ht_element *trigger_ht_element;
1521
1522 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1523 node) {
1524 int ret = handle_notification_thread_command_unregister_trigger(
1525 state, trigger_ht_element->trigger, NULL);
1526 if (ret) {
8a9bd316 1527 error_occurred = true;
ab0ee2ca
JG
1528 }
1529 }
8a9bd316 1530 return error_occurred ? -1 : 0;
ab0ee2ca
JG
1531}
1532
1533static
1534int client_flush_outgoing_queue(struct notification_client *client,
1535 struct notification_thread_state *state)
1536{
1537 ssize_t ret;
1538 size_t to_send_count;
1539
1540 assert(client->communication.outbound.buffer.size != 0);
1541 to_send_count = client->communication.outbound.buffer.size;
1542 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
1543 client->socket);
1544
1545 ret = lttcomm_send_unix_sock_non_block(client->socket,
1546 client->communication.outbound.buffer.data,
1547 to_send_count);
1548 if ((ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) ||
1549 (ret > 0 && ret < to_send_count)) {
1550 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
1551 client->socket);
1552 to_send_count -= max(ret, 0);
1553
1554 memcpy(client->communication.outbound.buffer.data,
1555 client->communication.outbound.buffer.data +
1556 client->communication.outbound.buffer.size - to_send_count,
1557 to_send_count);
1558 ret = lttng_dynamic_buffer_set_size(
1559 &client->communication.outbound.buffer,
1560 to_send_count);
1561 if (ret) {
1562 goto error;
1563 }
1564
1565 /*
1566 * We want to be notified whenever there is buffer space
1567 * available to send the rest of the payload.
1568 */
1569 ret = lttng_poll_mod(&state->events, client->socket,
1570 CLIENT_POLL_MASK_IN_OUT);
1571 if (ret) {
1572 goto error;
1573 }
1574 } else if (ret < 0) {
1575 /* Generic error, disconnect the client. */
1576 ERR("[notification-thread] Failed to send flush outgoing queue, disconnecting client (socket fd = %i)",
1577 client->socket);
1578 ret = handle_notification_thread_client_disconnect(
1579 client->socket, state);
1580 if (ret) {
1581 goto error;
1582 }
1583 } else {
1584 /* No error and flushed the queue completely. */
1585 ret = lttng_dynamic_buffer_set_size(
1586 &client->communication.outbound.buffer, 0);
1587 if (ret) {
1588 goto error;
1589 }
1590 ret = lttng_poll_mod(&state->events, client->socket,
1591 CLIENT_POLL_MASK_IN);
1592 if (ret) {
1593 goto error;
1594 }
1595
1596 client->communication.outbound.queued_command_reply = false;
1597 client->communication.outbound.dropped_notification = false;
1598 }
1599
1600 return 0;
1601error:
1602 return -1;
1603}
1604
1605static
1606int client_send_command_reply(struct notification_client *client,
1607 struct notification_thread_state *state,
1608 enum lttng_notification_channel_status status)
1609{
1610 int ret;
1611 struct lttng_notification_channel_command_reply reply = {
1612 .status = (int8_t) status,
1613 };
1614 struct lttng_notification_channel_message msg = {
1615 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
1616 .size = sizeof(reply),
1617 };
1618 char buffer[sizeof(msg) + sizeof(reply)];
1619
1620 if (client->communication.outbound.queued_command_reply) {
1621 /* Protocol error. */
1622 goto error;
1623 }
1624
1625 memcpy(buffer, &msg, sizeof(msg));
1626 memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
1627 DBG("[notification-thread] Send command reply (%i)", (int) status);
1628
1629 /* Enqueue buffer to outgoing queue and flush it. */
1630 ret = lttng_dynamic_buffer_append(
1631 &client->communication.outbound.buffer,
1632 buffer, sizeof(buffer));
1633 if (ret) {
1634 goto error;
1635 }
1636
1637 ret = client_flush_outgoing_queue(client, state);
1638 if (ret) {
1639 goto error;
1640 }
1641
1642 if (client->communication.outbound.buffer.size != 0) {
1643 /* Queue could not be emptied. */
1644 client->communication.outbound.queued_command_reply = true;
1645 }
1646
1647 return 0;
1648error:
1649 return -1;
1650}
1651
1652static
1653int client_dispatch_message(struct notification_client *client,
1654 struct notification_thread_state *state)
1655{
1656 int ret = 0;
1657
1658 if (client->communication.inbound.msg_type !=
1659 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE &&
1660 client->communication.inbound.msg_type !=
1661 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN &&
1662 !client->validated) {
1663 WARN("[notification-thread] client attempted a command before handshake");
1664 ret = -1;
1665 goto end;
1666 }
1667
1668 switch (client->communication.inbound.msg_type) {
1669 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN:
1670 {
1671 /*
1672 * Receiving message header. The function will be called again
1673 * once the rest of the message as been received and can be
1674 * interpreted.
1675 */
1676 const struct lttng_notification_channel_message *msg;
1677
1678 assert(sizeof(*msg) ==
1679 client->communication.inbound.buffer.size);
1680 msg = (const struct lttng_notification_channel_message *)
1681 client->communication.inbound.buffer.data;
1682
1683 if (msg->size == 0 || msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
1684 ERR("[notification-thread] Invalid notification channel message: length = %u", msg->size);
1685 ret = -1;
1686 goto end;
1687 }
1688
1689 switch (msg->type) {
1690 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
1691 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
1692 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
1693 break;
1694 default:
1695 ret = -1;
1696 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
1697 goto end;
1698 }
1699
1700 client->communication.inbound.bytes_to_receive = msg->size;
1701 client->communication.inbound.msg_type =
1702 (enum lttng_notification_channel_message_type) msg->type;
ab0ee2ca 1703 ret = lttng_dynamic_buffer_set_size(
b5eb6b46 1704 &client->communication.inbound.buffer, msg->size);
ab0ee2ca
JG
1705 if (ret) {
1706 goto end;
1707 }
1708 break;
1709 }
1710 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
1711 {
1712 struct lttng_notification_channel_command_handshake *handshake_client;
1713 struct lttng_notification_channel_command_handshake handshake_reply = {
1714 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
1715 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
1716 };
1717 struct lttng_notification_channel_message msg_header = {
1718 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
1719 .size = sizeof(handshake_reply),
1720 };
1721 enum lttng_notification_channel_status status =
1722 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1723 char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
1724
1725 memcpy(send_buffer, &msg_header, sizeof(msg_header));
1726 memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
1727 sizeof(handshake_reply));
1728
1729 handshake_client =
1730 (struct lttng_notification_channel_command_handshake *)
1731 client->communication.inbound.buffer.data;
b8165ded 1732 client->major = handshake_client->major;
ab0ee2ca
JG
1733 client->minor = handshake_client->minor;
1734 if (!client->communication.inbound.creds_received) {
1735 ERR("[notification-thread] No credentials received from client");
1736 ret = -1;
1737 goto end;
1738 }
1739
1740 client->uid = LTTNG_SOCK_GET_UID_CRED(
1741 &client->communication.inbound.creds);
1742 client->gid = LTTNG_SOCK_GET_GID_CRED(
1743 &client->communication.inbound.creds);
1744 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
1745 client->uid, client->gid, (int) client->major,
1746 (int) client->minor);
1747
1748 if (handshake_client->major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
1749 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
1750 }
1751
1752 ret = lttng_dynamic_buffer_append(&client->communication.outbound.buffer,
1753 send_buffer, sizeof(send_buffer));
1754 if (ret) {
1755 ERR("[notification-thread] Failed to send protocol version to notification channel client");
1756 goto end;
1757 }
1758
1759 ret = client_flush_outgoing_queue(client, state);
1760 if (ret) {
1761 goto end;
1762 }
1763
1764 ret = client_send_command_reply(client, state, status);
1765 if (ret) {
1766 ERR("[notification-thread] Failed to send reply to notification channel client");
1767 goto end;
1768 }
1769
1770 /* Set reception state to receive the next message header. */
b5eb6b46
JG
1771 ret = client_reset_inbound_state(client);
1772 if (ret) {
1773 ERR("[notification-thread] Failed to reset client communication's inbound state");
1774 goto end;
1775 }
ab0ee2ca
JG
1776 client->validated = true;
1777 break;
1778 }
1779 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
1780 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
1781 {
1782 struct lttng_condition *condition;
1783 enum lttng_notification_channel_status status =
1784 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1785 const struct lttng_buffer_view condition_view =
1786 lttng_buffer_view_from_dynamic_buffer(
1787 &client->communication.inbound.buffer,
1788 0, -1);
1789 size_t expected_condition_size =
1790 client->communication.inbound.buffer.size;
1791
1792 ret = lttng_condition_create_from_buffer(&condition_view,
1793 &condition);
1794 if (ret != expected_condition_size) {
1795 ERR("[notification-thread] Malformed condition received from client");
1796 goto end;
1797 }
1798
1799 if (client->communication.inbound.msg_type ==
1800 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
1801 /*
1802 * FIXME The current state should be evaluated on
1803 * subscription.
1804 */
1805 ret = notification_thread_client_subscribe(client,
1806 condition, state, &status);
1807 } else {
1808 ret = notification_thread_client_unsubscribe(client,
1809 condition, state, &status);
1810 }
1811 if (ret) {
1812 goto end;
1813 }
1814
1815 ret = client_send_command_reply(client, state, status);
1816 if (ret) {
1817 ERR("[notification-thread] Failed to send reply to notification channel client");
1818 goto end;
1819 }
1820
1821 /* Set reception state to receive the next message header. */
b5eb6b46
JG
1822 ret = client_reset_inbound_state(client);
1823 if (ret) {
1824 ERR("[notification-thread] Failed to reset client communication's inbound state");
1825 goto end;
1826 }
ab0ee2ca
JG
1827 break;
1828 }
1829 default:
1830 abort();
1831 }
1832end:
1833 return ret;
1834}
1835
1836/* Incoming data from client. */
1837int handle_notification_thread_client_in(
1838 struct notification_thread_state *state, int socket)
1839{
b5eb6b46 1840 int ret = 0;
ab0ee2ca
JG
1841 struct notification_client *client;
1842 ssize_t recv_ret;
1843 size_t offset;
1844
1845 client = get_client_from_socket(socket, state);
1846 if (!client) {
1847 /* Internal error, abort. */
1848 ret = -1;
1849 goto end;
1850 }
1851
b5eb6b46
JG
1852 offset = client->communication.inbound.buffer.size -
1853 client->communication.inbound.bytes_to_receive;
f950d02c 1854 if (client->communication.inbound.expect_creds) {
ab0ee2ca
JG
1855 recv_ret = lttcomm_recv_creds_unix_sock(socket,
1856 client->communication.inbound.buffer.data + offset,
1857 client->communication.inbound.bytes_to_receive,
1858 &client->communication.inbound.creds);
1859 if (recv_ret > 0) {
f950d02c 1860 client->communication.inbound.expect_creds = false;
ab0ee2ca
JG
1861 client->communication.inbound.creds_received = true;
1862 }
1863 } else {
1864 recv_ret = lttcomm_recv_unix_sock_non_block(socket,
1865 client->communication.inbound.buffer.data + offset,
1866 client->communication.inbound.bytes_to_receive);
1867 }
1868 if (recv_ret < 0) {
1869 goto error_disconnect_client;
1870 }
1871
1872 client->communication.inbound.bytes_to_receive -= recv_ret;
ab0ee2ca
JG
1873 if (client->communication.inbound.bytes_to_receive == 0) {
1874 ret = client_dispatch_message(client, state);
1875 if (ret) {
1876 /*
1877 * Only returns an error if this client must be
1878 * disconnected.
1879 */
1880 goto error_disconnect_client;
1881 }
1882 } else {
1883 goto end;
1884 }
1885end:
1886 return ret;
1887error_disconnect_client:
1888 ret = handle_notification_thread_client_disconnect(socket, state);
1889 return ret;
1890}
1891
1892/* Client ready to receive outgoing data. */
1893int handle_notification_thread_client_out(
1894 struct notification_thread_state *state, int socket)
1895{
1896 int ret;
1897 struct notification_client *client;
1898
1899 client = get_client_from_socket(socket, state);
1900 if (!client) {
1901 /* Internal error, abort. */
1902 ret = -1;
1903 goto end;
1904 }
1905
1906 ret = client_flush_outgoing_queue(client, state);
1907 if (ret) {
1908 goto end;
1909 }
1910end:
1911 return ret;
1912}
1913
1914static
1915bool evaluate_buffer_usage_condition(struct lttng_condition *condition,
1916 struct channel_state_sample *sample, uint64_t buffer_capacity)
1917{
1918 bool result = false;
1919 uint64_t threshold;
1920 enum lttng_condition_type condition_type;
1921 struct lttng_condition_buffer_usage *use_condition = container_of(
1922 condition, struct lttng_condition_buffer_usage,
1923 parent);
1924
1925 if (!sample) {
1926 goto end;
1927 }
1928
1929 if (use_condition->threshold_bytes.set) {
1930 threshold = use_condition->threshold_bytes.value;
1931 } else {
1932 /*
1933 * Threshold was expressed as a ratio.
1934 *
1935 * TODO the threshold (in bytes) of conditions expressed
1936 * as a ratio of total buffer size could be cached to
1937 * forego this double-multiplication or it could be performed
1938 * as fixed-point math.
1939 *
1940 * Note that caching should accomodate the case where the
1941 * condition applies to multiple channels (i.e. don't assume
1942 * that all channels matching my_chann* have the same size...)
1943 */
1944 threshold = (uint64_t) (use_condition->threshold_ratio.value *
1945 (double) buffer_capacity);
1946 }
1947
1948 condition_type = lttng_condition_get_type(condition);
1949 if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
1950 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
1951 threshold, sample->highest_usage);
1952
1953 /*
1954 * The low condition should only be triggered once _all_ of the
1955 * streams in a channel have gone below the "low" threshold.
1956 */
1957 if (sample->highest_usage <= threshold) {
1958 result = true;
1959 }
1960 } else {
1961 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
1962 threshold, sample->highest_usage);
1963
1964 /*
1965 * For high buffer usage scenarios, we want to trigger whenever
1966 * _any_ of the streams has reached the "high" threshold.
1967 */
1968 if (sample->highest_usage >= threshold) {
1969 result = true;
1970 }
1971 }
1972end:
1973 return result;
1974}
1975
1976static
1977int evaluate_condition(struct lttng_condition *condition,
1978 struct lttng_evaluation **evaluation,
1979 struct notification_thread_state *state,
1980 struct channel_state_sample *previous_sample,
1981 struct channel_state_sample *latest_sample,
1982 uint64_t buffer_capacity)
1983{
1984 int ret = 0;
1985 enum lttng_condition_type condition_type;
1986 bool previous_sample_result;
1987 bool latest_sample_result;
1988
1989 condition_type = lttng_condition_get_type(condition);
1990 /* No other condition type supported for the moment. */
1991 assert(condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW ||
1992 condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH);
1993
1994 previous_sample_result = evaluate_buffer_usage_condition(condition,
1995 previous_sample, buffer_capacity);
1996 latest_sample_result = evaluate_buffer_usage_condition(condition,
1997 latest_sample, buffer_capacity);
1998
1999 if (!latest_sample_result ||
2000 (previous_sample_result == latest_sample_result)) {
2001 /*
2002 * Only trigger on a condition evaluation transition.
2003 *
2004 * NOTE: This edge-triggered logic may not be appropriate for
2005 * future condition types.
2006 */
2007 goto end;
2008 }
2009
2010 if (evaluation && latest_sample_result) {
2011 *evaluation = lttng_evaluation_buffer_usage_create(
2012 condition_type,
2013 latest_sample->highest_usage,
2014 buffer_capacity);
2015 if (!*evaluation) {
2016 ret = -1;
2017 goto end;
2018 }
2019 }
2020end:
2021 return ret;
2022}
2023
2024static
2025int client_enqueue_dropped_notification(struct notification_client *client,
2026 struct notification_thread_state *state)
2027{
2028 int ret;
2029 struct lttng_notification_channel_message msg = {
2030 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
2031 .size = 0,
2032 };
2033
2034 ret = lttng_dynamic_buffer_append(
2035 &client->communication.outbound.buffer, &msg,
2036 sizeof(msg));
2037 return ret;
2038}
2039
2040static
2041int send_evaluation_to_clients(struct lttng_trigger *trigger,
2042 struct lttng_evaluation *evaluation,
2043 struct notification_client_list* client_list,
2044 struct notification_thread_state *state,
2045 uid_t channel_uid, gid_t channel_gid)
2046{
2047 int ret = 0;
2048 struct lttng_dynamic_buffer msg_buffer;
2049 struct notification_client_list_element *client_list_element, *tmp;
2050 struct lttng_notification *notification;
2051 struct lttng_condition *condition;
2052 ssize_t expected_notification_size, notification_size;
2053 struct lttng_notification_channel_message msg;
2054
2055 lttng_dynamic_buffer_init(&msg_buffer);
2056
2057 condition = lttng_trigger_get_condition(trigger);
2058 assert(condition);
2059
2060 notification = lttng_notification_create(condition, evaluation);
2061 if (!notification) {
2062 ret = -1;
2063 goto end;
2064 }
2065
2066 expected_notification_size = lttng_notification_serialize(notification,
2067 NULL);
2068 if (expected_notification_size < 0) {
2069 ERR("[notification-thread] Failed to get size of serialized notification");
2070 ret = -1;
2071 goto end;
2072 }
2073
2074 msg.type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION;
2075 msg.size = (uint32_t) expected_notification_size;
2076 ret = lttng_dynamic_buffer_append(&msg_buffer, &msg, sizeof(msg));
2077 if (ret) {
2078 goto end;
2079 }
2080
2081 ret = lttng_dynamic_buffer_set_size(&msg_buffer,
2082 msg_buffer.size + expected_notification_size);
2083 if (ret) {
2084 goto end;
2085 }
2086
2087 notification_size = lttng_notification_serialize(notification,
2088 msg_buffer.data + sizeof(msg));
2089 if (notification_size != expected_notification_size) {
2090 ERR("[notification-thread] Failed to serialize notification");
2091 ret = -1;
2092 goto end;
2093 }
2094
2095 cds_list_for_each_entry_safe(client_list_element, tmp,
2096 &client_list->list, node) {
2097 struct notification_client *client =
2098 client_list_element->client;
2099
2100 if (client->uid != channel_uid && client->gid != channel_gid &&
2101 client->uid != 0) {
2102 /* Client is not allowed to monitor this channel. */
2103 DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this channel");
2104 continue;
2105 }
2106
2107 DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
2108 client->socket, msg_buffer.size);
2109 if (client->communication.outbound.buffer.size) {
2110 /*
2111 * Outgoing data is already buffered for this client;
2112 * drop the notification and enqueue a "dropped
2113 * notification" message if this is the first dropped
2114 * notification since the socket spilled-over to the
2115 * queue.
2116 */
2117 DBG("[notification-thread] Dropping notification addressed to client (socket fd = %i)",
2118 client->socket);
2119 if (!client->communication.outbound.dropped_notification) {
2120 client->communication.outbound.dropped_notification = true;
2121 ret = client_enqueue_dropped_notification(
2122 client, state);
2123 if (ret) {
2124 goto end;
2125 }
2126 }
2127 continue;
2128 }
2129
2130 ret = lttng_dynamic_buffer_append_buffer(
2131 &client->communication.outbound.buffer,
2132 &msg_buffer);
2133 if (ret) {
2134 goto end;
2135 }
2136
2137 ret = client_flush_outgoing_queue(client, state);
2138 if (ret) {
2139 goto end;
2140 }
2141 }
2142 ret = 0;
2143end:
2144 lttng_notification_destroy(notification);
2145 lttng_dynamic_buffer_reset(&msg_buffer);
2146 return ret;
2147}
2148
2149int handle_notification_thread_channel_sample(
2150 struct notification_thread_state *state, int pipe,
2151 enum lttng_domain_type domain)
2152{
2153 int ret = 0;
2154 struct lttcomm_consumer_channel_monitor_msg sample_msg;
2155 struct channel_state_sample previous_sample, latest_sample;
2156 struct channel_info *channel_info;
2157 struct cds_lfht_node *node;
2158 struct cds_lfht_iter iter;
2159 struct lttng_channel_trigger_list *trigger_list;
2160 struct lttng_trigger_list_element *trigger_list_element;
2161 bool previous_sample_available = false;
2162
2163 /*
2164 * The monitoring pipe only holds messages smaller than PIPE_BUF,
2165 * ensuring that read/write of sampling messages are atomic.
2166 */
8af8a558 2167 ret = lttng_read(pipe, &sample_msg, sizeof(sample_msg));
ab0ee2ca
JG
2168 if (ret != sizeof(sample_msg)) {
2169 ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
2170 pipe);
2171 ret = -1;
2172 goto end;
2173 }
2174
2175 ret = 0;
2176 latest_sample.key.key = sample_msg.key;
2177 latest_sample.key.domain = domain;
2178 latest_sample.highest_usage = sample_msg.highest;
2179 latest_sample.lowest_usage = sample_msg.lowest;
2180
2181 rcu_read_lock();
2182
2183 /* Retrieve the channel's informations */
2184 cds_lfht_lookup(state->channels_ht,
2185 hash_channel_key(&latest_sample.key),
2186 match_channel_info,
2187 &latest_sample.key,
2188 &iter);
2189 node = cds_lfht_iter_get_node(&iter);
2190 if (!node) {
2191 /*
2192 * Not an error since the consumer can push a sample to the pipe
2193 * and the rest of the session daemon could notify us of the
2194 * channel's destruction before we get a chance to process that
2195 * sample.
2196 */
2197 DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
2198 latest_sample.key.key,
2199 domain == LTTNG_DOMAIN_KERNEL ? "kernel" :
2200 "user space");
2201 goto end_unlock;
2202 }
2203 channel_info = caa_container_of(node, struct channel_info,
2204 channels_ht_node);
2205 DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64")",
2206 channel_info->channel_name,
2207 latest_sample.key.key,
2208 channel_info->session_name,
2209 latest_sample.highest_usage,
2210 latest_sample.lowest_usage);
2211
2212 /* Retrieve the channel's last sample, if it exists, and update it. */
2213 cds_lfht_lookup(state->channel_state_ht,
2214 hash_channel_key(&latest_sample.key),
2215 match_channel_state_sample,
2216 &latest_sample.key,
2217 &iter);
2218 node = cds_lfht_iter_get_node(&iter);
2219 if (node) {
2220 struct channel_state_sample *stored_sample;
2221
2222 /* Update the sample stored. */
2223 stored_sample = caa_container_of(node,
2224 struct channel_state_sample,
2225 channel_state_ht_node);
2226 memcpy(&previous_sample, stored_sample,
2227 sizeof(previous_sample));
2228 stored_sample->highest_usage = latest_sample.highest_usage;
2229 stored_sample->lowest_usage = latest_sample.lowest_usage;
2230 previous_sample_available = true;
2231 } else {
2232 /*
2233 * This is the channel's first sample, allocate space for and
2234 * store the new sample.
2235 */
2236 struct channel_state_sample *stored_sample;
2237
2238 stored_sample = zmalloc(sizeof(*stored_sample));
2239 if (!stored_sample) {
2240 ret = -1;
2241 goto end_unlock;
2242 }
2243
2244 memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
2245 cds_lfht_node_init(&stored_sample->channel_state_ht_node);
2246 cds_lfht_add(state->channel_state_ht,
2247 hash_channel_key(&stored_sample->key),
2248 &stored_sample->channel_state_ht_node);
2249 }
2250
2251 /* Find triggers associated with this channel. */
2252 cds_lfht_lookup(state->channel_triggers_ht,
2253 hash_channel_key(&latest_sample.key),
2254 match_channel_trigger_list,
2255 &latest_sample.key,
2256 &iter);
2257 node = cds_lfht_iter_get_node(&iter);
2258 if (!node) {
2259 goto end_unlock;
2260 }
2261
2262 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
2263 channel_triggers_ht_node);
2264 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
2265 node) {
2266 struct lttng_condition *condition;
2267 struct lttng_action *action;
2268 struct lttng_trigger *trigger;
2269 struct notification_client_list *client_list;
2270 struct lttng_evaluation *evaluation = NULL;
2271
2272 trigger = trigger_list_element->trigger;
2273 condition = lttng_trigger_get_condition(trigger);
2274 assert(condition);
2275 action = lttng_trigger_get_action(trigger);
2276
2277 /* Notify actions are the only type currently supported. */
2278 assert(lttng_action_get_type(action) ==
2279 LTTNG_ACTION_TYPE_NOTIFY);
2280
2281 /*
2282 * Check if any client is subscribed to the result of this
2283 * evaluation.
2284 */
2285 cds_lfht_lookup(state->notification_trigger_clients_ht,
2286 lttng_condition_hash(condition),
2287 match_client_list,
2288 trigger,
2289 &iter);
2290 node = cds_lfht_iter_get_node(&iter);
2291 assert(node);
2292
2293 client_list = caa_container_of(node,
2294 struct notification_client_list,
2295 notification_trigger_ht_node);
2296 if (cds_list_empty(&client_list->list)) {
2297 /*
2298 * No clients interested in the evaluation's result,
2299 * skip it.
2300 */
2301 continue;
2302 }
2303
2304 ret = evaluate_condition(condition, &evaluation, state,
2305 previous_sample_available ? &previous_sample : NULL,
2306 &latest_sample, channel_info->capacity);
2307 if (ret) {
2308 goto end_unlock;
2309 }
2310
2311 if (!evaluation) {
2312 continue;
2313 }
2314
2315 /* Dispatch evaluation result to all clients. */
2316 ret = send_evaluation_to_clients(trigger_list_element->trigger,
2317 evaluation, client_list, state,
2318 channel_info->uid, channel_info->gid);
2319 if (ret) {
2320 goto end_unlock;
2321 }
2322 }
2323end_unlock:
2324 rcu_read_unlock();
2325end:
2326 return ret;
2327}
This page took 0.117151 seconds and 5 git commands to generate.