Fix: missing rcu read locking in trigger "unregister all" command
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.c
CommitLineData
ab0ee2ca
JG
1/*
2 * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18#define _LGPL_SOURCE
19#include <urcu.h>
20#include <urcu/rculfhash.h>
21
ab0ee2ca
JG
22#include <common/defaults.h>
23#include <common/error.h>
24#include <common/futex.h>
25#include <common/unix.h>
26#include <common/dynamic-buffer.h>
27#include <common/hashtable/utils.h>
28#include <common/sessiond-comm/sessiond-comm.h>
29#include <common/macros.h>
30#include <lttng/condition/condition.h>
31#include <lttng/action/action.h>
32#include <lttng/notification/notification-internal.h>
33#include <lttng/condition/condition-internal.h>
34#include <lttng/condition/buffer-usage-internal.h>
35#include <lttng/notification/channel-internal.h>
fa7c4fcd 36
ab0ee2ca
JG
37#include <time.h>
38#include <unistd.h>
39#include <assert.h>
40#include <inttypes.h>
44f2d7db 41#include <fcntl.h>
ab0ee2ca 42
fa7c4fcd
JG
43#include "notification-thread.h"
44#include "notification-thread-events.h"
45#include "notification-thread-commands.h"
46#include "lttng-sessiond.h"
47#include "kernel.h"
48
ab0ee2ca
JG
/* Input, error and hang-up poll events. */
#define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
/* CLIENT_POLL_MASK_IN extended with the "output possible" event. */
#define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
51
52struct lttng_trigger_list_element {
53 struct lttng_trigger *trigger;
54 struct cds_list_head node;
55};
56
57struct lttng_channel_trigger_list {
58 struct channel_key channel_key;
59 struct cds_list_head list;
60 struct cds_lfht_node channel_triggers_ht_node;
0fe4c3bf
JG
61 /* call_rcu delayed reclaim. */
62 struct rcu_head rcu_node;
ab0ee2ca
JG
63};
64
65struct lttng_trigger_ht_element {
66 struct lttng_trigger *trigger;
67 struct cds_lfht_node node;
0fe4c3bf
JG
68 /* call_rcu delayed reclaim. */
69 struct rcu_head rcu_node;
ab0ee2ca
JG
70};
71
72struct lttng_condition_list_element {
73 struct lttng_condition *condition;
74 struct cds_list_head node;
75};
76
77struct notification_client_list_element {
78 struct notification_client *client;
79 struct cds_list_head node;
80};
81
82struct notification_client_list {
83 struct lttng_trigger *trigger;
84 struct cds_list_head list;
85 struct cds_lfht_node notification_trigger_ht_node;
0fe4c3bf
JG
86 /* call_rcu delayed reclaim. */
87 struct rcu_head rcu_node;
ab0ee2ca
JG
88};
89
90struct notification_client {
91 int socket;
92 /* Client protocol version. */
93 uint8_t major, minor;
94 uid_t uid;
95 gid_t gid;
96 /*
6a00208d 97 * Indicates if the credentials and versions of the client have been
ab0ee2ca
JG
98 * checked.
99 */
100 bool validated;
101 /*
102 * Conditions to which the client's notification channel is subscribed.
103 * List of struct lttng_condition_list_node. The condition member is
104 * owned by the client.
105 */
106 struct cds_list_head condition_list;
107 struct cds_lfht_node client_socket_ht_node;
108 struct {
109 struct {
b5eb6b46
JG
110 /*
111 * During the reception of a message, the reception
112 * buffers' "size" is set to contain the current
113 * message's complete payload.
114 */
ab0ee2ca
JG
115 struct lttng_dynamic_buffer buffer;
116 /* Bytes left to receive for the current message. */
117 size_t bytes_to_receive;
118 /* Type of the message being received. */
119 enum lttng_notification_channel_message_type msg_type;
120 /*
121 * Indicates whether or not credentials are expected
122 * from the client.
123 */
f950d02c 124 bool expect_creds;
ab0ee2ca
JG
125 /*
126 * Indicates whether or not credentials were received
127 * from the client.
128 */
129 bool creds_received;
b5eb6b46 130 /* Only used during credentials reception. */
ab0ee2ca
JG
131 lttng_sock_cred creds;
132 } inbound;
133 struct {
134 /*
135 * Indicates whether or not a notification addressed to
136 * this client was dropped because a command reply was
137 * already buffered.
138 *
139 * A notification is dropped whenever the buffer is not
140 * empty.
141 */
142 bool dropped_notification;
143 /*
144 * Indicates whether or not a command reply is already
145 * buffered. In this case, it means that the client is
146 * not consuming command replies before emitting a new
147 * one. This could be caused by a protocol error or a
148 * misbehaving/malicious client.
149 */
150 bool queued_command_reply;
151 struct lttng_dynamic_buffer buffer;
152 } outbound;
153 } communication;
0fe4c3bf
JG
154 /* call_rcu delayed reclaim. */
155 struct rcu_head rcu_node;
ab0ee2ca
JG
156};
157
158struct channel_state_sample {
159 struct channel_key key;
160 struct cds_lfht_node channel_state_ht_node;
161 uint64_t highest_usage;
162 uint64_t lowest_usage;
0fe4c3bf
JG
163 /* call_rcu delayed reclaim. */
164 struct rcu_head rcu_node;
ab0ee2ca
JG
165};
166
f0225486
JR
/* Forward declarations of helpers used before their definition. */
static
unsigned long hash_channel_key(struct channel_key *key);

static
int evaluate_condition(struct lttng_condition *condition,
		struct lttng_evaluation **evaluation,
		struct notification_thread_state *state,
		struct channel_state_sample *previous_sample,
		struct channel_state_sample *latest_sample,
		uint64_t buffer_capacity);

static
int send_evaluation_to_clients(struct lttng_trigger *trigger,
		struct lttng_evaluation *evaluation,
		struct notification_client_list *client_list,
		struct notification_thread_state *state,
		uid_t channel_uid, gid_t channel_gid);
180
ab0ee2ca
JG
181static
182int match_client(struct cds_lfht_node *node, const void *key)
183{
184 /* This double-cast is intended to supress pointer-to-cast warning. */
185 int socket = (int) (intptr_t) key;
186 struct notification_client *client;
187
188 client = caa_container_of(node, struct notification_client,
189 client_socket_ht_node);
190
191 return !!(client->socket == socket);
192}
193
194static
195int match_channel_trigger_list(struct cds_lfht_node *node, const void *key)
196{
197 struct channel_key *channel_key = (struct channel_key *) key;
198 struct lttng_channel_trigger_list *trigger_list;
199
200 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
201 channel_triggers_ht_node);
202
203 return !!((channel_key->key == trigger_list->channel_key.key) &&
204 (channel_key->domain == trigger_list->channel_key.domain));
205}
206
207static
208int match_channel_state_sample(struct cds_lfht_node *node, const void *key)
209{
210 struct channel_key *channel_key = (struct channel_key *) key;
211 struct channel_state_sample *sample;
212
213 sample = caa_container_of(node, struct channel_state_sample,
214 channel_state_ht_node);
215
216 return !!((channel_key->key == sample->key.key) &&
217 (channel_key->domain == sample->key.domain));
218}
219
220static
221int match_channel_info(struct cds_lfht_node *node, const void *key)
222{
223 struct channel_key *channel_key = (struct channel_key *) key;
224 struct channel_info *channel_info;
225
226 channel_info = caa_container_of(node, struct channel_info,
227 channels_ht_node);
228
229 return !!((channel_key->key == channel_info->key.key) &&
230 (channel_key->domain == channel_info->key.domain));
231}
232
233static
234int match_condition(struct cds_lfht_node *node, const void *key)
235{
236 struct lttng_condition *condition_key = (struct lttng_condition *) key;
237 struct lttng_trigger_ht_element *trigger;
238 struct lttng_condition *condition;
239
240 trigger = caa_container_of(node, struct lttng_trigger_ht_element,
241 node);
242 condition = lttng_trigger_get_condition(trigger->trigger);
243 assert(condition);
244
245 return !!lttng_condition_is_equal(condition_key, condition);
246}
247
248static
249int match_client_list(struct cds_lfht_node *node, const void *key)
250{
251 struct lttng_trigger *trigger_key = (struct lttng_trigger *) key;
252 struct notification_client_list *client_list;
253 struct lttng_condition *condition;
254 struct lttng_condition *condition_key = lttng_trigger_get_condition(
255 trigger_key);
256
257 assert(condition_key);
258
259 client_list = caa_container_of(node, struct notification_client_list,
260 notification_trigger_ht_node);
261 condition = lttng_trigger_get_condition(client_list->trigger);
262
263 return !!lttng_condition_is_equal(condition_key, condition);
264}
265
266static
267int match_client_list_condition(struct cds_lfht_node *node, const void *key)
268{
269 struct lttng_condition *condition_key = (struct lttng_condition *) key;
270 struct notification_client_list *client_list;
271 struct lttng_condition *condition;
272
273 assert(condition_key);
274
275 client_list = caa_container_of(node, struct notification_client_list,
276 notification_trigger_ht_node);
277 condition = lttng_trigger_get_condition(client_list->trigger);
278
279 return !!lttng_condition_is_equal(condition_key, condition);
280}
281
282static
283unsigned long lttng_condition_buffer_usage_hash(
284 struct lttng_condition *_condition)
285{
286 unsigned long hash = 0;
287 struct lttng_condition_buffer_usage *condition;
288
289 condition = container_of(_condition,
290 struct lttng_condition_buffer_usage, parent);
291
292 if (condition->session_name) {
293 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
294 }
295 if (condition->channel_name) {
8f56701f 296 hash ^= hash_key_str(condition->channel_name, lttng_ht_seed);
ab0ee2ca
JG
297 }
298 if (condition->domain.set) {
299 hash ^= hash_key_ulong(
300 (void *) condition->domain.type,
301 lttng_ht_seed);
302 }
303 if (condition->threshold_ratio.set) {
304 uint64_t val;
305
306 val = condition->threshold_ratio.value * (double) UINT32_MAX;
307 hash ^= hash_key_u64(&val, lttng_ht_seed);
b6d67aac 308 } else if (condition->threshold_bytes.set) {
ab0ee2ca
JG
309 uint64_t val;
310
311 val = condition->threshold_bytes.value;
312 hash ^= hash_key_u64(&val, lttng_ht_seed);
313 }
314 return hash;
315}
316
317/*
318 * The lttng_condition hashing code is kept in this file (rather than
319 * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
320 * don't want to link in liblttng-ctl.
321 */
322static
323unsigned long lttng_condition_hash(struct lttng_condition *condition)
324{
325 switch (condition->type) {
326 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
327 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
328 return lttng_condition_buffer_usage_hash(condition);
329 default:
330 ERR("[notification-thread] Unexpected condition type caught");
331 abort();
332 }
333}
334
f0225486
JR
335static
336unsigned long hash_channel_key(struct channel_key *key)
337{
338 unsigned long key_hash = hash_key_u64(&key->key, lttng_ht_seed);
339 unsigned long domain_hash = hash_key_ulong(
340 (void *) (unsigned long) key->domain, lttng_ht_seed);
341
342 return key_hash ^ domain_hash;
343}
344
0fe4c3bf
JG
345static
346void free_channel_info_rcu(struct rcu_head *node)
347{
348 free(caa_container_of(node, struct channel_info, rcu_node));
349}
350
ab0ee2ca
JG
351static
352void channel_info_destroy(struct channel_info *channel_info)
353{
354 if (!channel_info) {
355 return;
356 }
357
358 if (channel_info->session_name) {
359 free(channel_info->session_name);
360 }
361 if (channel_info->channel_name) {
362 free(channel_info->channel_name);
363 }
0fe4c3bf 364 call_rcu(&channel_info->rcu_node, free_channel_info_rcu);
ab0ee2ca
JG
365}
366
367static
368struct channel_info *channel_info_copy(struct channel_info *channel_info)
369{
370 struct channel_info *copy = zmalloc(sizeof(*channel_info));
371
372 assert(channel_info);
373 assert(channel_info->session_name);
374 assert(channel_info->channel_name);
375
376 if (!copy) {
377 goto end;
378 }
379
380 memcpy(copy, channel_info, sizeof(*channel_info));
381 copy->session_name = NULL;
382 copy->channel_name = NULL;
383
384 copy->session_name = strdup(channel_info->session_name);
385 if (!copy->session_name) {
386 goto error;
387 }
388 copy->channel_name = strdup(channel_info->channel_name);
389 if (!copy->channel_name) {
390 goto error;
391 }
392 cds_lfht_node_init(&channel_info->channels_ht_node);
393end:
394 return copy;
395error:
396 channel_info_destroy(copy);
397 return NULL;
398}
399
f0225486
JR
400/* This function must be called with the RCU read lock held. */
401static
402int evaluate_condition_for_client(struct lttng_trigger *trigger,
403 struct lttng_condition *condition,
404 struct notification_client *client,
405 struct notification_thread_state *state)
406{
407 int ret;
408 struct cds_lfht_iter iter;
409 struct cds_lfht_node *node;
410 struct channel_info *channel_info = NULL;
411 struct channel_key *channel_key = NULL;
412 struct channel_state_sample *last_sample = NULL;
413 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
414 struct lttng_evaluation *evaluation = NULL;
415 struct notification_client_list client_list = { 0 };
416 struct notification_client_list_element client_list_element = { 0 };
417
418 assert(trigger);
419 assert(condition);
420 assert(client);
421 assert(state);
422
423 /* Find the channel associated with the trigger. */
424 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter,
425 channel_trigger_list , channel_triggers_ht_node) {
426 struct lttng_trigger_list_element *element;
427
428 cds_list_for_each_entry(element, &channel_trigger_list->list, node) {
429 struct lttng_condition *current_condition =
430 lttng_trigger_get_condition(
431 element->trigger);
432
433 assert(current_condition);
434 if (!lttng_condition_is_equal(condition,
435 current_condition)) {
436 continue;
437 }
438
439 /* Found the trigger, save the channel key. */
440 channel_key = &channel_trigger_list->channel_key;
441 break;
442 }
443 if (channel_key) {
444 /* The channel key was found stop iteration. */
445 break;
446 }
447 }
448
449 if (!channel_key){
450 /* No channel found; normal exit. */
451 DBG("[notification-thread] No channel associated with newly subscribed-to condition");
452 ret = 0;
453 goto end;
454 }
455
456 /* Fetch channel info for the matching channel. */
457 cds_lfht_lookup(state->channels_ht,
458 hash_channel_key(channel_key),
459 match_channel_info,
460 channel_key,
461 &iter);
462 node = cds_lfht_iter_get_node(&iter);
463 assert(node);
464 channel_info = caa_container_of(node, struct channel_info,
465 channels_ht_node);
466
467 /* Retrieve the channel's last sample, if it exists. */
468 cds_lfht_lookup(state->channel_state_ht,
469 hash_channel_key(channel_key),
470 match_channel_state_sample,
471 channel_key,
472 &iter);
473 node = cds_lfht_iter_get_node(&iter);
474 if (node) {
475 last_sample = caa_container_of(node,
476 struct channel_state_sample,
477 channel_state_ht_node);
478 } else {
479 /* Nothing to evaluate, no sample was ever taken. Normal exit */
480 DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
481 ret = 0;
482 goto end;
483 }
484
485 ret = evaluate_condition(condition, &evaluation, state, NULL,
486 last_sample, channel_info->capacity);
487 if (ret) {
e7d6cdaa 488 WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
f0225486
JR
489 goto end;
490 }
491
492 if (!evaluation) {
493 /* Evaluation yielded nothing. Normal exit. */
494 DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
495 ret = 0;
496 goto end;
497 }
498
499 /*
500 * Create a temporary client list with the client currently
501 * subscribing.
502 */
503 cds_lfht_node_init(&client_list.notification_trigger_ht_node);
504 CDS_INIT_LIST_HEAD(&client_list.list);
505 client_list.trigger = trigger;
506
507 CDS_INIT_LIST_HEAD(&client_list_element.node);
508 client_list_element.client = client;
509 cds_list_add(&client_list_element.node, &client_list.list);
510
511 /* Send evaluation result to the newly-subscribed client. */
512 DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
513 ret = send_evaluation_to_clients(trigger, evaluation, &client_list,
514 state, channel_info->uid, channel_info->gid);
515
516end:
517 return ret;
518}
519
ab0ee2ca
JG
520static
521int notification_thread_client_subscribe(struct notification_client *client,
522 struct lttng_condition *condition,
523 struct notification_thread_state *state,
524 enum lttng_notification_channel_status *_status)
525{
526 int ret = 0;
527 struct cds_lfht_iter iter;
528 struct cds_lfht_node *node;
529 struct notification_client_list *client_list;
530 struct lttng_condition_list_element *condition_list_element = NULL;
531 struct notification_client_list_element *client_list_element = NULL;
532 enum lttng_notification_channel_status status =
533 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
534
535 /*
536 * Ensure that the client has not already subscribed to this condition
537 * before.
538 */
539 cds_list_for_each_entry(condition_list_element, &client->condition_list, node) {
540 if (lttng_condition_is_equal(condition_list_element->condition,
541 condition)) {
542 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED;
543 goto end;
544 }
545 }
546
547 condition_list_element = zmalloc(sizeof(*condition_list_element));
548 if (!condition_list_element) {
549 ret = -1;
550 goto error;
551 }
552 client_list_element = zmalloc(sizeof(*client_list_element));
553 if (!client_list_element) {
554 ret = -1;
555 goto error;
556 }
557
558 rcu_read_lock();
559
560 /*
561 * Add the newly-subscribed condition to the client's subscription list.
562 */
563 CDS_INIT_LIST_HEAD(&condition_list_element->node);
564 condition_list_element->condition = condition;
565 cds_list_add(&condition_list_element->node, &client->condition_list);
566
ab0ee2ca
JG
567 cds_lfht_lookup(state->notification_trigger_clients_ht,
568 lttng_condition_hash(condition),
569 match_client_list_condition,
570 condition,
571 &iter);
572 node = cds_lfht_iter_get_node(&iter);
573 if (!node) {
c3dce007 574 free(client_list_element);
ab0ee2ca
JG
575 goto end_unlock;
576 }
577
578 client_list = caa_container_of(node, struct notification_client_list,
579 notification_trigger_ht_node);
f0225486
JR
580 if (evaluate_condition_for_client(client_list->trigger, condition,
581 client, state)) {
582 WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
583 ret = -1;
d39ea393 584 free(client_list_element);
f0225486
JR
585 goto end_unlock;
586 }
587
588 /*
589 * Add the client to the list of clients interested in a given trigger
590 * if a "notification" trigger with a corresponding condition was
591 * added prior.
592 */
ab0ee2ca
JG
593 client_list_element->client = client;
594 CDS_INIT_LIST_HEAD(&client_list_element->node);
595 cds_list_add(&client_list_element->node, &client_list->list);
596end_unlock:
597 rcu_read_unlock();
598end:
599 if (_status) {
600 *_status = status;
601 }
602 return ret;
603error:
604 free(condition_list_element);
605 free(client_list_element);
606 return ret;
607}
608
609static
610int notification_thread_client_unsubscribe(
611 struct notification_client *client,
612 struct lttng_condition *condition,
613 struct notification_thread_state *state,
614 enum lttng_notification_channel_status *_status)
615{
616 struct cds_lfht_iter iter;
617 struct cds_lfht_node *node;
618 struct notification_client_list *client_list;
619 struct lttng_condition_list_element *condition_list_element,
620 *condition_tmp;
621 struct notification_client_list_element *client_list_element,
622 *client_tmp;
623 bool condition_found = false;
624 enum lttng_notification_channel_status status =
625 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
626
627 /* Remove the condition from the client's condition list. */
628 cds_list_for_each_entry_safe(condition_list_element, condition_tmp,
629 &client->condition_list, node) {
630 if (!lttng_condition_is_equal(condition_list_element->condition,
631 condition)) {
632 continue;
633 }
634
635 cds_list_del(&condition_list_element->node);
636 /*
637 * The caller may be iterating on the client's conditions to
638 * tear down a client's connection. In this case, the condition
639 * will be destroyed at the end.
640 */
641 if (condition != condition_list_element->condition) {
642 lttng_condition_destroy(
643 condition_list_element->condition);
644 }
645 free(condition_list_element);
646 condition_found = true;
647 break;
648 }
649
650 if (!condition_found) {
651 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION;
652 goto end;
653 }
654
655 /*
656 * Remove the client from the list of clients interested the trigger
657 * matching the condition.
658 */
659 rcu_read_lock();
660 cds_lfht_lookup(state->notification_trigger_clients_ht,
661 lttng_condition_hash(condition),
662 match_client_list_condition,
663 condition,
664 &iter);
665 node = cds_lfht_iter_get_node(&iter);
666 if (!node) {
667 goto end_unlock;
668 }
669
670 client_list = caa_container_of(node, struct notification_client_list,
671 notification_trigger_ht_node);
672 cds_list_for_each_entry_safe(client_list_element, client_tmp,
673 &client_list->list, node) {
674 if (client_list_element->client->socket != client->socket) {
675 continue;
676 }
677 cds_list_del(&client_list_element->node);
678 free(client_list_element);
679 break;
680 }
681end_unlock:
682 rcu_read_unlock();
683end:
684 lttng_condition_destroy(condition);
685 if (_status) {
686 *_status = status;
687 }
688 return 0;
689}
690
0fe4c3bf
JG
691static
692void free_notification_client_rcu(struct rcu_head *node)
693{
694 free(caa_container_of(node, struct notification_client, rcu_node));
695}
696
ab0ee2ca
JG
697static
698void notification_client_destroy(struct notification_client *client,
699 struct notification_thread_state *state)
700{
701 struct lttng_condition_list_element *condition_list_element, *tmp;
702
703 if (!client) {
704 return;
705 }
706
707 /* Release all conditions to which the client was subscribed. */
708 cds_list_for_each_entry_safe(condition_list_element, tmp,
709 &client->condition_list, node) {
710 (void) notification_thread_client_unsubscribe(client,
711 condition_list_element->condition, state, NULL);
712 }
713
714 if (client->socket >= 0) {
715 (void) lttcomm_close_unix_sock(client->socket);
716 }
717 lttng_dynamic_buffer_reset(&client->communication.inbound.buffer);
718 lttng_dynamic_buffer_reset(&client->communication.outbound.buffer);
0fe4c3bf 719 call_rcu(&client->rcu_node, free_notification_client_rcu);
ab0ee2ca
JG
720}
721
722/*
723 * Call with rcu_read_lock held (and hold for the lifetime of the returned
724 * client pointer).
725 */
726static
727struct notification_client *get_client_from_socket(int socket,
728 struct notification_thread_state *state)
729{
730 struct cds_lfht_iter iter;
731 struct cds_lfht_node *node;
732 struct notification_client *client = NULL;
733
734 cds_lfht_lookup(state->client_socket_ht,
735 hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed),
736 match_client,
737 (void *) (unsigned long) socket,
738 &iter);
739 node = cds_lfht_iter_get_node(&iter);
740 if (!node) {
741 goto end;
742 }
743
744 client = caa_container_of(node, struct notification_client,
745 client_socket_ht_node);
746end:
747 return client;
748}
749
750static
751bool trigger_applies_to_channel(struct lttng_trigger *trigger,
752 struct channel_info *info)
753{
754 enum lttng_condition_status status;
755 struct lttng_condition *condition;
756 const char *trigger_session_name = NULL;
757 const char *trigger_channel_name = NULL;
758 enum lttng_domain_type trigger_domain;
759
760 condition = lttng_trigger_get_condition(trigger);
761 if (!condition) {
762 goto fail;
763 }
764
765 switch (lttng_condition_get_type(condition)) {
766 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
767 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
768 break;
769 default:
770 goto fail;
771 }
772
773 status = lttng_condition_buffer_usage_get_domain_type(condition,
774 &trigger_domain);
775 assert(status == LTTNG_CONDITION_STATUS_OK);
776 if (info->key.domain != trigger_domain) {
777 goto fail;
778 }
779
780 status = lttng_condition_buffer_usage_get_session_name(
781 condition, &trigger_session_name);
782 assert((status == LTTNG_CONDITION_STATUS_OK) && trigger_session_name);
783
784 status = lttng_condition_buffer_usage_get_channel_name(
785 condition, &trigger_channel_name);
786 assert((status == LTTNG_CONDITION_STATUS_OK) && trigger_channel_name);
787
788 if (strcmp(info->session_name, trigger_session_name)) {
789 goto fail;
790 }
791 if (strcmp(info->channel_name, trigger_channel_name)) {
792 goto fail;
793 }
794
795 return true;
796fail:
797 return false;
798}
799
800static
801bool trigger_applies_to_client(struct lttng_trigger *trigger,
802 struct notification_client *client)
803{
804 bool applies = false;
805 struct lttng_condition_list_element *condition_list_element;
806
807 cds_list_for_each_entry(condition_list_element, &client->condition_list,
808 node) {
809 applies = lttng_condition_is_equal(
810 condition_list_element->condition,
811 lttng_trigger_get_condition(trigger));
812 if (applies) {
813 break;
814 }
815 }
816 return applies;
817}
818
ab0ee2ca
JG
819static
820int handle_notification_thread_command_add_channel(
821 struct notification_thread_state *state,
822 struct channel_info *channel_info,
823 enum lttng_error_code *cmd_result)
824{
825 struct cds_list_head trigger_list;
826 struct channel_info *new_channel_info;
827 struct channel_key *channel_key;
828 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
829 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
830 int trigger_count = 0;
831 struct cds_lfht_iter iter;
832
833 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
834 channel_info->channel_name, channel_info->session_name,
835 channel_info->key.key, channel_info->key.domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
836
837 CDS_INIT_LIST_HEAD(&trigger_list);
838
839 new_channel_info = channel_info_copy(channel_info);
840 if (!new_channel_info) {
841 goto error;
842 }
843
844 channel_key = &new_channel_info->key;
845
0fe4c3bf 846 rcu_read_lock();
ab0ee2ca
JG
847 /* Build a list of all triggers applying to the new channel. */
848 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
849 node) {
850 struct lttng_trigger_list_element *new_element;
851
852 if (!trigger_applies_to_channel(trigger_ht_element->trigger,
853 channel_info)) {
854 continue;
855 }
856
857 new_element = zmalloc(sizeof(*new_element));
858 if (!new_element) {
0fe4c3bf 859 goto error_unlock;
ab0ee2ca
JG
860 }
861 CDS_INIT_LIST_HEAD(&new_element->node);
862 new_element->trigger = trigger_ht_element->trigger;
863 cds_list_add(&new_element->node, &trigger_list);
864 trigger_count++;
865 }
866
867 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
868 trigger_count);
869 channel_trigger_list = zmalloc(sizeof(*channel_trigger_list));
870 if (!channel_trigger_list) {
0fe4c3bf 871 goto error_unlock;
ab0ee2ca
JG
872 }
873 channel_trigger_list->channel_key = *channel_key;
874 CDS_INIT_LIST_HEAD(&channel_trigger_list->list);
875 cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
876 cds_list_splice(&trigger_list, &channel_trigger_list->list);
877
ab0ee2ca
JG
878 /* Add channel to the channel_ht which owns the channel_infos. */
879 cds_lfht_add(state->channels_ht,
880 hash_channel_key(channel_key),
881 &new_channel_info->channels_ht_node);
882 /*
883 * Add the list of triggers associated with this channel to the
884 * channel_triggers_ht.
885 */
886 cds_lfht_add(state->channel_triggers_ht,
887 hash_channel_key(channel_key),
888 &channel_trigger_list->channel_triggers_ht_node);
889 rcu_read_unlock();
890 *cmd_result = LTTNG_OK;
891 return 0;
0fe4c3bf
JG
892error_unlock:
893 rcu_read_unlock();
ab0ee2ca
JG
894error:
895 /* Empty trigger list */
896 channel_info_destroy(new_channel_info);
897 return 1;
898}
899
0fe4c3bf
JG
900static
901void free_channel_trigger_list_rcu(struct rcu_head *node)
902{
903 free(caa_container_of(node, struct lttng_channel_trigger_list,
904 rcu_node));
905}
906
907static
908void free_channel_state_sample_rcu(struct rcu_head *node)
909{
910 free(caa_container_of(node, struct channel_state_sample,
911 rcu_node));
912}
913
ab0ee2ca
JG
914static
915int handle_notification_thread_command_remove_channel(
916 struct notification_thread_state *state,
917 uint64_t channel_key, enum lttng_domain_type domain,
918 enum lttng_error_code *cmd_result)
919{
920 struct cds_lfht_node *node;
921 struct cds_lfht_iter iter;
922 struct lttng_channel_trigger_list *trigger_list;
923 struct lttng_trigger_list_element *trigger_list_element, *tmp;
924 struct channel_key key = { .key = channel_key, .domain = domain };
925 struct channel_info *channel_info;
926
927 DBG("[notification-thread] Removing channel key = %" PRIu64 " in %s domain",
928 channel_key, domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
929
930 rcu_read_lock();
931
932 cds_lfht_lookup(state->channel_triggers_ht,
933 hash_channel_key(&key),
934 match_channel_trigger_list,
935 &key,
936 &iter);
937 node = cds_lfht_iter_get_node(&iter);
938 /*
939 * There is a severe internal error if we are being asked to remove a
940 * channel that doesn't exist.
941 */
942 if (!node) {
943 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
944 goto end;
945 }
946
947 /* Free the list of triggers associated with this channel. */
948 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
949 channel_triggers_ht_node);
950 cds_list_for_each_entry_safe(trigger_list_element, tmp,
951 &trigger_list->list, node) {
952 cds_list_del(&trigger_list_element->node);
953 free(trigger_list_element);
954 }
955 cds_lfht_del(state->channel_triggers_ht, node);
0fe4c3bf 956 call_rcu(&trigger_list->rcu_node, free_channel_trigger_list_rcu);
ab0ee2ca
JG
957
958 /* Free sampled channel state. */
959 cds_lfht_lookup(state->channel_state_ht,
960 hash_channel_key(&key),
961 match_channel_state_sample,
962 &key,
963 &iter);
964 node = cds_lfht_iter_get_node(&iter);
965 /*
966 * This is expected to be NULL if the channel is destroyed before we
967 * received a sample.
968 */
969 if (node) {
970 struct channel_state_sample *sample = caa_container_of(node,
971 struct channel_state_sample,
972 channel_state_ht_node);
973
974 cds_lfht_del(state->channel_state_ht, node);
0fe4c3bf 975 call_rcu(&sample->rcu_node, free_channel_state_sample_rcu);
ab0ee2ca
JG
976 }
977
978 /* Remove the channel from the channels_ht and free it. */
979 cds_lfht_lookup(state->channels_ht,
980 hash_channel_key(&key),
981 match_channel_info,
982 &key,
983 &iter);
984 node = cds_lfht_iter_get_node(&iter);
985 assert(node);
986 channel_info = caa_container_of(node, struct channel_info,
987 channels_ht_node);
988 cds_lfht_del(state->channels_ht, node);
989 channel_info_destroy(channel_info);
990end:
991 rcu_read_unlock();
992 *cmd_result = LTTNG_OK;
993 return 0;
994}
995
fa7c4fcd
JG
996static
997int condition_is_supported(struct lttng_condition *condition)
998{
999 int ret;
1000
1001 switch (lttng_condition_get_type(condition)) {
1002 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1003 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1004 {
1005 enum lttng_domain_type domain;
1006
1007 ret = lttng_condition_buffer_usage_get_domain_type(condition,
1008 &domain);
1009 if (ret) {
1010 ret = -1;
1011 goto end;
1012 }
1013
1014 if (domain != LTTNG_DOMAIN_KERNEL) {
1015 ret = 1;
1016 goto end;
1017 }
1018
1019 /*
1020 * Older kernel tracers don't expose the API to monitor their
1021 * buffers. Therefore, we reject triggers that require that
1022 * mechanism to be available to be evaluated.
1023 */
1024 ret = kernel_supports_ring_buffer_snapshot_sample_positions(
1025 kernel_tracer_fd);
1026 break;
1027 }
1028 default:
1029 ret = 1;
1030 }
1031end:
1032 return ret;
1033}
1034
ab0ee2ca
JG
1035/*
1036 * FIXME A client's credentials are not checked when registering a trigger, nor
1037 * are they stored alongside with the trigger.
1038 *
fa7c4fcd 1039 * The effects of this are benign since:
ab0ee2ca
JG
1040 * - The client will succeed in registering the trigger, as it is valid,
1041 * - The trigger will, internally, be bound to the channel,
1042 * - The notifications will not be sent since the client's credentials
1043 * are checked against the channel at that moment.
fa7c4fcd
JG
1044 *
1045 * If this function returns a non-zero value, it means something is
1046 * fundamentally and the whole subsystem/thread will be torn down.
1047 *
1048 * If a non-fatal error occurs, just set the cmd_result to the appropriate
1049 * error code.
ab0ee2ca
JG
1050 */
1051static
1052int handle_notification_thread_command_register_trigger(
1053 struct notification_thread_state *state,
1054 struct lttng_trigger *trigger,
1055 enum lttng_error_code *cmd_result)
1056{
1057 int ret = 0;
1058 struct lttng_condition *condition;
1059 struct notification_client *client;
1060 struct notification_client_list *client_list = NULL;
1061 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1062 struct notification_client_list_element *client_list_element, *tmp;
1063 struct cds_lfht_node *node;
1064 struct cds_lfht_iter iter;
1065 struct channel_info *channel;
1066 bool free_trigger = true;
1067
1068 rcu_read_lock();
1069
1070 condition = lttng_trigger_get_condition(trigger);
fa7c4fcd
JG
1071 assert(condition);
1072
1073 ret = condition_is_supported(condition);
1074 if (ret < 0) {
1075 goto error;
1076 } else if (ret == 0) {
1077 *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
1078 goto error;
1079 } else {
1080 /* Feature is supported, continue. */
1081 ret = 0;
1082 }
1083
ab0ee2ca
JG
1084 trigger_ht_element = zmalloc(sizeof(*trigger_ht_element));
1085 if (!trigger_ht_element) {
1086 ret = -1;
1087 goto error;
1088 }
1089
1090 /* Add trigger to the trigger_ht. */
1091 cds_lfht_node_init(&trigger_ht_element->node);
1092 trigger_ht_element->trigger = trigger;
1093
1094 node = cds_lfht_add_unique(state->triggers_ht,
1095 lttng_condition_hash(condition),
1096 match_condition,
1097 condition,
1098 &trigger_ht_element->node);
1099 if (node != &trigger_ht_element->node) {
1100 /* Not a fatal error, simply report it to the client. */
1101 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
1102 goto error_free_ht_element;
1103 }
1104
1105 /*
1106 * Ownership of the trigger and of its wrapper was transfered to
1107 * the triggers_ht.
1108 */
1109 trigger_ht_element = NULL;
1110 free_trigger = false;
1111
1112 /*
1113 * The rest only applies to triggers that have a "notify" action.
1114 * It is not skipped as this is the only action type currently
1115 * supported.
1116 */
1117 client_list = zmalloc(sizeof(*client_list));
1118 if (!client_list) {
1119 ret = -1;
1120 goto error_free_ht_element;
1121 }
1122 cds_lfht_node_init(&client_list->notification_trigger_ht_node);
1123 CDS_INIT_LIST_HEAD(&client_list->list);
1124 client_list->trigger = trigger;
1125
1126 /* Build a list of clients to which this new trigger applies. */
1127 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
1128 client_socket_ht_node) {
1129 if (!trigger_applies_to_client(trigger, client)) {
1130 continue;
1131 }
1132
1133 client_list_element = zmalloc(sizeof(*client_list_element));
1134 if (!client_list_element) {
1135 ret = -1;
1136 goto error_free_client_list;
1137 }
1138 CDS_INIT_LIST_HEAD(&client_list_element->node);
1139 client_list_element->client = client;
1140 cds_list_add(&client_list_element->node, &client_list->list);
1141 }
1142
1143 cds_lfht_add(state->notification_trigger_clients_ht,
1144 lttng_condition_hash(condition),
1145 &client_list->notification_trigger_ht_node);
1146 /*
1147 * Client list ownership transferred to the
1148 * notification_trigger_clients_ht.
1149 */
1150 client_list = NULL;
1151
1152 /*
1153 * Add the trigger to list of triggers bound to the channels currently
1154 * known.
1155 */
1156 cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
1157 channels_ht_node) {
1158 struct lttng_trigger_list_element *trigger_list_element;
1159 struct lttng_channel_trigger_list *trigger_list;
88c6cf5e 1160 struct cds_lfht_iter lookup_iter;
ab0ee2ca
JG
1161
1162 if (!trigger_applies_to_channel(trigger, channel)) {
1163 continue;
1164 }
1165
1166 cds_lfht_lookup(state->channel_triggers_ht,
1167 hash_channel_key(&channel->key),
1168 match_channel_trigger_list,
1169 &channel->key,
88c6cf5e
MD
1170 &lookup_iter);
1171 node = cds_lfht_iter_get_node(&lookup_iter);
ab0ee2ca 1172 assert(node);
ab0ee2ca
JG
1173 trigger_list = caa_container_of(node,
1174 struct lttng_channel_trigger_list,
1175 channel_triggers_ht_node);
1176
1177 trigger_list_element = zmalloc(sizeof(*trigger_list_element));
1178 if (!trigger_list_element) {
1179 ret = -1;
1180 goto error_free_client_list;
1181 }
1182 CDS_INIT_LIST_HEAD(&trigger_list_element->node);
1183 trigger_list_element->trigger = trigger;
1184 cds_list_add(&trigger_list_element->node, &trigger_list->list);
f0225486 1185
ab0ee2ca
JG
1186 /* A trigger can only apply to one channel. */
1187 break;
1188 }
1189
1190 *cmd_result = LTTNG_OK;
1191error_free_client_list:
1192 if (client_list) {
1193 cds_list_for_each_entry_safe(client_list_element, tmp,
1194 &client_list->list, node) {
1195 free(client_list_element);
1196 }
1197 free(client_list);
1198 }
1199error_free_ht_element:
1200 free(trigger_ht_element);
1201error:
1202 if (free_trigger) {
1203 struct lttng_action *action = lttng_trigger_get_action(trigger);
1204
1205 lttng_condition_destroy(condition);
1206 lttng_action_destroy(action);
1207 lttng_trigger_destroy(trigger);
1208 }
1209 rcu_read_unlock();
1210 return ret;
1211}
1212
0fe4c3bf
JG
1213static
1214void free_notification_client_list_rcu(struct rcu_head *node)
1215{
1216 free(caa_container_of(node, struct notification_client_list,
1217 rcu_node));
1218}
1219
1220static
1221void free_lttng_trigger_ht_element_rcu(struct rcu_head *node)
1222{
1223 free(caa_container_of(node, struct lttng_trigger_ht_element,
1224 rcu_node));
1225}
1226
7c2340da 1227static
ab0ee2ca
JG
1228int handle_notification_thread_command_unregister_trigger(
1229 struct notification_thread_state *state,
1230 struct lttng_trigger *trigger,
1231 enum lttng_error_code *_cmd_reply)
1232{
1233 struct cds_lfht_iter iter;
1234 struct cds_lfht_node *node, *triggers_ht_node;
1235 struct lttng_channel_trigger_list *trigger_list;
1236 struct notification_client_list *client_list;
1237 struct notification_client_list_element *client_list_element, *tmp;
1238 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1239 struct lttng_condition *condition = lttng_trigger_get_condition(
1240 trigger);
1241 struct lttng_action *action;
1242 enum lttng_error_code cmd_reply;
1243
1244 rcu_read_lock();
1245
1246 cds_lfht_lookup(state->triggers_ht,
1247 lttng_condition_hash(condition),
1248 match_condition,
1249 condition,
1250 &iter);
1251 triggers_ht_node = cds_lfht_iter_get_node(&iter);
1252 if (!triggers_ht_node) {
1253 cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND;
1254 goto end;
1255 } else {
1256 cmd_reply = LTTNG_OK;
1257 }
1258
1259 /* Remove trigger from channel_triggers_ht. */
1260 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
1261 channel_triggers_ht_node) {
1262 struct lttng_trigger_list_element *trigger_element, *tmp;
1263
1264 cds_list_for_each_entry_safe(trigger_element, tmp,
1265 &trigger_list->list, node) {
1266 struct lttng_condition *current_condition =
1267 lttng_trigger_get_condition(
1268 trigger_element->trigger);
1269
1270 assert(current_condition);
1271 if (!lttng_condition_is_equal(condition,
1272 current_condition)) {
1273 continue;
1274 }
1275
1276 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
1277 cds_list_del(&trigger_element->node);
f0225486
JR
1278 /* A trigger can only appear once per channel */
1279 break;
ab0ee2ca
JG
1280 }
1281 }
1282
1283 /*
1284 * Remove and release the client list from
1285 * notification_trigger_clients_ht.
1286 */
1287 cds_lfht_lookup(state->notification_trigger_clients_ht,
1288 lttng_condition_hash(condition),
1289 match_client_list,
1290 trigger,
1291 &iter);
1292 node = cds_lfht_iter_get_node(&iter);
1293 assert(node);
1294 client_list = caa_container_of(node, struct notification_client_list,
1295 notification_trigger_ht_node);
1296 cds_list_for_each_entry_safe(client_list_element, tmp,
1297 &client_list->list, node) {
1298 free(client_list_element);
1299 }
1300 cds_lfht_del(state->notification_trigger_clients_ht, node);
0fe4c3bf 1301 call_rcu(&client_list->rcu_node, free_notification_client_list_rcu);
ab0ee2ca
JG
1302
1303 /* Remove trigger from triggers_ht. */
1304 trigger_ht_element = caa_container_of(triggers_ht_node,
1305 struct lttng_trigger_ht_element, node);
1306 cds_lfht_del(state->triggers_ht, triggers_ht_node);
1307
1308 condition = lttng_trigger_get_condition(trigger_ht_element->trigger);
1309 lttng_condition_destroy(condition);
1310 action = lttng_trigger_get_action(trigger_ht_element->trigger);
1311 lttng_action_destroy(action);
1312 lttng_trigger_destroy(trigger_ht_element->trigger);
0fe4c3bf 1313 call_rcu(&trigger_ht_element->rcu_node, free_lttng_trigger_ht_element_rcu);
ab0ee2ca
JG
1314end:
1315 rcu_read_unlock();
1316 if (_cmd_reply) {
1317 *_cmd_reply = cmd_reply;
1318 }
1319 return 0;
1320}
1321
1322/* Returns 0 on success, 1 on exit requested, negative value on error. */
1323int handle_notification_thread_command(
1324 struct notification_thread_handle *handle,
1325 struct notification_thread_state *state)
1326{
1327 int ret;
1328 uint64_t counter;
1329 struct notification_thread_command *cmd;
1330
1331 /* Read event_fd to put it back into a quiescent state. */
800c5e10 1332 ret = read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter, sizeof(counter));
ab0ee2ca
JG
1333 if (ret == -1) {
1334 goto error;
1335 }
1336
1337 pthread_mutex_lock(&handle->cmd_queue.lock);
1338 cmd = cds_list_first_entry(&handle->cmd_queue.list,
1339 struct notification_thread_command, cmd_list_node);
1340 switch (cmd->type) {
1341 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
1342 DBG("[notification-thread] Received register trigger command");
1343 ret = handle_notification_thread_command_register_trigger(
1344 state, cmd->parameters.trigger,
1345 &cmd->reply_code);
1346 break;
1347 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
1348 DBG("[notification-thread] Received unregister trigger command");
1349 ret = handle_notification_thread_command_unregister_trigger(
1350 state, cmd->parameters.trigger,
1351 &cmd->reply_code);
1352 break;
1353 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
1354 DBG("[notification-thread] Received add channel command");
1355 ret = handle_notification_thread_command_add_channel(
1356 state, &cmd->parameters.add_channel,
1357 &cmd->reply_code);
1358 break;
1359 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
1360 DBG("[notification-thread] Received remove channel command");
1361 ret = handle_notification_thread_command_remove_channel(
1362 state, cmd->parameters.remove_channel.key,
1363 cmd->parameters.remove_channel.domain,
1364 &cmd->reply_code);
1365 break;
1366 case NOTIFICATION_COMMAND_TYPE_QUIT:
1367 DBG("[notification-thread] Received quit command");
1368 cmd->reply_code = LTTNG_OK;
1369 ret = 1;
1370 goto end;
1371 default:
1372 ERR("[notification-thread] Unknown internal command received");
1373 goto error_unlock;
1374 }
1375
1376 if (ret) {
1377 goto error_unlock;
1378 }
1379end:
1380 cds_list_del(&cmd->cmd_list_node);
8f5e7d40 1381 lttng_waiter_wake_up(&cmd->reply_waiter);
ab0ee2ca
JG
1382 pthread_mutex_unlock(&handle->cmd_queue.lock);
1383 return ret;
1384error_unlock:
1385 /* Wake-up and return a fatal error to the calling thread. */
8f5e7d40 1386 lttng_waiter_wake_up(&cmd->reply_waiter);
ab0ee2ca
JG
1387 pthread_mutex_unlock(&handle->cmd_queue.lock);
1388 cmd->reply_code = LTTNG_ERR_FATAL;
1389error:
1390 /* Indicate a fatal error to the caller. */
1391 return -1;
1392}
1393
1394static
1395unsigned long hash_client_socket(int socket)
1396{
1397 return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed);
1398}
1399
/*
 * Add O_NONBLOCK to the file status flags of `socket`.
 *
 * Returns -1 if either fcntl() call fails, otherwise the value returned by
 * the F_SETFL call.
 */
static
int socket_set_non_blocking(int socket)
{
	int ret;
	int current_flags;

	/* Fetch the current flags so that only O_NONBLOCK is added. */
	current_flags = fcntl(socket, F_GETFL, 0);
	if (current_flags == -1) {
		PERROR("fcntl get socket flags");
		return -1;
	}

	ret = fcntl(socket, F_SETFL, current_flags | O_NONBLOCK);
	if (ret == -1) {
		PERROR("fcntl set O_NONBLOCK socket flag");
		return ret;
	}

	DBG("Client socket (fd = %i) set as non-blocking", socket);
	return ret;
}
1422
1423static
b5eb6b46 1424int client_reset_inbound_state(struct notification_client *client)
ab0ee2ca
JG
1425{
1426 int ret;
1427
1428 ret = lttng_dynamic_buffer_set_size(
1429 &client->communication.inbound.buffer, 0);
1430 assert(!ret);
1431
1432 client->communication.inbound.bytes_to_receive =
1433 sizeof(struct lttng_notification_channel_message);
1434 client->communication.inbound.msg_type =
1435 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN;
ab0ee2ca
JG
1436 LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1);
1437 LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1);
b5eb6b46
JG
1438 ret = lttng_dynamic_buffer_set_size(
1439 &client->communication.inbound.buffer,
1440 client->communication.inbound.bytes_to_receive);
1441 return ret;
ab0ee2ca
JG
1442}
1443
1444int handle_notification_thread_client_connect(
1445 struct notification_thread_state *state)
1446{
1447 int ret;
1448 struct notification_client *client;
1449
1450 DBG("[notification-thread] Handling new notification channel client connection");
1451
1452 client = zmalloc(sizeof(*client));
1453 if (!client) {
1454 /* Fatal error. */
1455 ret = -1;
1456 goto error;
1457 }
1458 CDS_INIT_LIST_HEAD(&client->condition_list);
1459 lttng_dynamic_buffer_init(&client->communication.inbound.buffer);
1460 lttng_dynamic_buffer_init(&client->communication.outbound.buffer);
f950d02c 1461 client->communication.inbound.expect_creds = true;
b5eb6b46
JG
1462 ret = client_reset_inbound_state(client);
1463 if (ret) {
1464 ERR("[notification-thread] Failed to reset client communication's inbound state");
1465 ret = 0;
1466 goto error;
1467 }
ab0ee2ca
JG
1468
1469 ret = lttcomm_accept_unix_sock(state->notification_channel_socket);
1470 if (ret < 0) {
1471 ERR("[notification-thread] Failed to accept new notification channel client connection");
1472 ret = 0;
1473 goto error;
1474 }
1475
1476 client->socket = ret;
1477
1478 ret = socket_set_non_blocking(client->socket);
1479 if (ret) {
1480 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
1481 goto error;
1482 }
1483
1484 ret = lttcomm_setsockopt_creds_unix_sock(client->socket);
1485 if (ret < 0) {
1486 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
1487 ret = 0;
1488 goto error;
1489 }
1490
1491 ret = lttng_poll_add(&state->events, client->socket,
1492 LPOLLIN | LPOLLERR |
1493 LPOLLHUP | LPOLLRDHUP);
1494 if (ret < 0) {
1495 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
1496 ret = 0;
1497 goto error;
1498 }
1499 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
1500 client->socket);
1501
ab0ee2ca
JG
1502 rcu_read_lock();
1503 cds_lfht_add(state->client_socket_ht,
1504 hash_client_socket(client->socket),
1505 &client->client_socket_ht_node);
1506 rcu_read_unlock();
1507
1508 return ret;
1509error:
1510 notification_client_destroy(client, state);
1511 return ret;
1512}
1513
1514int handle_notification_thread_client_disconnect(
1515 int client_socket,
1516 struct notification_thread_state *state)
1517{
1518 int ret = 0;
1519 struct notification_client *client;
1520
1521 rcu_read_lock();
1522 DBG("[notification-thread] Closing client connection (socket fd = %i)",
1523 client_socket);
1524 client = get_client_from_socket(client_socket, state);
1525 if (!client) {
1526 /* Internal state corruption, fatal error. */
1527 ERR("[notification-thread] Unable to find client (socket fd = %i)",
1528 client_socket);
1529 ret = -1;
1530 goto end;
1531 }
1532
1533 ret = lttng_poll_del(&state->events, client_socket);
1534 if (ret) {
1535 ERR("[notification-thread] Failed to remove client socket from poll set");
1536 }
1537 cds_lfht_del(state->client_socket_ht,
1538 &client->client_socket_ht_node);
1539 notification_client_destroy(client, state);
1540end:
1541 rcu_read_unlock();
1542 return ret;
1543}
1544
1545int handle_notification_thread_client_disconnect_all(
1546 struct notification_thread_state *state)
1547{
1548 struct cds_lfht_iter iter;
1549 struct notification_client *client;
1550 bool error_encoutered = false;
1551
1552 rcu_read_lock();
1553 DBG("[notification-thread] Closing all client connections");
1554 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
1555 client_socket_ht_node) {
1556 int ret;
1557
1558 ret = handle_notification_thread_client_disconnect(
1559 client->socket, state);
1560 if (ret) {
1561 error_encoutered = true;
1562 }
1563 }
1564 rcu_read_unlock();
1565 return error_encoutered ? 1 : 0;
1566}
1567
1568int handle_notification_thread_trigger_unregister_all(
1569 struct notification_thread_state *state)
1570{
8a9bd316 1571 bool error_occurred = false;
ab0ee2ca
JG
1572 struct cds_lfht_iter iter;
1573 struct lttng_trigger_ht_element *trigger_ht_element;
1574
6bfc8da0 1575 rcu_read_lock();
ab0ee2ca
JG
1576 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1577 node) {
1578 int ret = handle_notification_thread_command_unregister_trigger(
1579 state, trigger_ht_element->trigger, NULL);
1580 if (ret) {
8a9bd316 1581 error_occurred = true;
ab0ee2ca
JG
1582 }
1583 }
6bfc8da0 1584 rcu_read_unlock();
8a9bd316 1585 return error_occurred ? -1 : 0;
ab0ee2ca
JG
1586}
1587
1588static
1589int client_flush_outgoing_queue(struct notification_client *client,
1590 struct notification_thread_state *state)
1591{
1592 ssize_t ret;
1593 size_t to_send_count;
1594
1595 assert(client->communication.outbound.buffer.size != 0);
1596 to_send_count = client->communication.outbound.buffer.size;
1597 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
1598 client->socket);
1599
1600 ret = lttcomm_send_unix_sock_non_block(client->socket,
1601 client->communication.outbound.buffer.data,
1602 to_send_count);
1603 if ((ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) ||
1604 (ret > 0 && ret < to_send_count)) {
1605 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
1606 client->socket);
1607 to_send_count -= max(ret, 0);
1608
1609 memcpy(client->communication.outbound.buffer.data,
1610 client->communication.outbound.buffer.data +
1611 client->communication.outbound.buffer.size - to_send_count,
1612 to_send_count);
1613 ret = lttng_dynamic_buffer_set_size(
1614 &client->communication.outbound.buffer,
1615 to_send_count);
1616 if (ret) {
1617 goto error;
1618 }
1619
1620 /*
1621 * We want to be notified whenever there is buffer space
1622 * available to send the rest of the payload.
1623 */
1624 ret = lttng_poll_mod(&state->events, client->socket,
1625 CLIENT_POLL_MASK_IN_OUT);
1626 if (ret) {
1627 goto error;
1628 }
1629 } else if (ret < 0) {
1630 /* Generic error, disconnect the client. */
1631 ERR("[notification-thread] Failed to send flush outgoing queue, disconnecting client (socket fd = %i)",
1632 client->socket);
1633 ret = handle_notification_thread_client_disconnect(
1634 client->socket, state);
1635 if (ret) {
1636 goto error;
1637 }
1638 } else {
1639 /* No error and flushed the queue completely. */
1640 ret = lttng_dynamic_buffer_set_size(
1641 &client->communication.outbound.buffer, 0);
1642 if (ret) {
1643 goto error;
1644 }
1645 ret = lttng_poll_mod(&state->events, client->socket,
1646 CLIENT_POLL_MASK_IN);
1647 if (ret) {
1648 goto error;
1649 }
1650
1651 client->communication.outbound.queued_command_reply = false;
1652 client->communication.outbound.dropped_notification = false;
1653 }
1654
1655 return 0;
1656error:
1657 return -1;
1658}
1659
1660static
1661int client_send_command_reply(struct notification_client *client,
1662 struct notification_thread_state *state,
1663 enum lttng_notification_channel_status status)
1664{
1665 int ret;
1666 struct lttng_notification_channel_command_reply reply = {
1667 .status = (int8_t) status,
1668 };
1669 struct lttng_notification_channel_message msg = {
1670 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
1671 .size = sizeof(reply),
1672 };
1673 char buffer[sizeof(msg) + sizeof(reply)];
1674
1675 if (client->communication.outbound.queued_command_reply) {
1676 /* Protocol error. */
1677 goto error;
1678 }
1679
1680 memcpy(buffer, &msg, sizeof(msg));
1681 memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
1682 DBG("[notification-thread] Send command reply (%i)", (int) status);
1683
1684 /* Enqueue buffer to outgoing queue and flush it. */
1685 ret = lttng_dynamic_buffer_append(
1686 &client->communication.outbound.buffer,
1687 buffer, sizeof(buffer));
1688 if (ret) {
1689 goto error;
1690 }
1691
1692 ret = client_flush_outgoing_queue(client, state);
1693 if (ret) {
1694 goto error;
1695 }
1696
1697 if (client->communication.outbound.buffer.size != 0) {
1698 /* Queue could not be emptied. */
1699 client->communication.outbound.queued_command_reply = true;
1700 }
1701
1702 return 0;
1703error:
1704 return -1;
1705}
1706
1707static
1708int client_dispatch_message(struct notification_client *client,
1709 struct notification_thread_state *state)
1710{
1711 int ret = 0;
1712
1713 if (client->communication.inbound.msg_type !=
1714 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE &&
1715 client->communication.inbound.msg_type !=
1716 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN &&
1717 !client->validated) {
1718 WARN("[notification-thread] client attempted a command before handshake");
1719 ret = -1;
1720 goto end;
1721 }
1722
1723 switch (client->communication.inbound.msg_type) {
1724 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN:
1725 {
1726 /*
1727 * Receiving message header. The function will be called again
1728 * once the rest of the message as been received and can be
1729 * interpreted.
1730 */
1731 const struct lttng_notification_channel_message *msg;
1732
1733 assert(sizeof(*msg) ==
1734 client->communication.inbound.buffer.size);
1735 msg = (const struct lttng_notification_channel_message *)
1736 client->communication.inbound.buffer.data;
1737
1738 if (msg->size == 0 || msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
1739 ERR("[notification-thread] Invalid notification channel message: length = %u", msg->size);
1740 ret = -1;
1741 goto end;
1742 }
1743
1744 switch (msg->type) {
1745 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
1746 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
1747 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
1748 break;
1749 default:
1750 ret = -1;
1751 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
1752 goto end;
1753 }
1754
1755 client->communication.inbound.bytes_to_receive = msg->size;
1756 client->communication.inbound.msg_type =
1757 (enum lttng_notification_channel_message_type) msg->type;
ab0ee2ca 1758 ret = lttng_dynamic_buffer_set_size(
b5eb6b46 1759 &client->communication.inbound.buffer, msg->size);
ab0ee2ca
JG
1760 if (ret) {
1761 goto end;
1762 }
1763 break;
1764 }
1765 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
1766 {
1767 struct lttng_notification_channel_command_handshake *handshake_client;
1768 struct lttng_notification_channel_command_handshake handshake_reply = {
1769 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
1770 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
1771 };
1772 struct lttng_notification_channel_message msg_header = {
1773 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
1774 .size = sizeof(handshake_reply),
1775 };
1776 enum lttng_notification_channel_status status =
1777 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1778 char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
1779
1780 memcpy(send_buffer, &msg_header, sizeof(msg_header));
1781 memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
1782 sizeof(handshake_reply));
1783
1784 handshake_client =
1785 (struct lttng_notification_channel_command_handshake *)
1786 client->communication.inbound.buffer.data;
b8165ded 1787 client->major = handshake_client->major;
ab0ee2ca
JG
1788 client->minor = handshake_client->minor;
1789 if (!client->communication.inbound.creds_received) {
1790 ERR("[notification-thread] No credentials received from client");
1791 ret = -1;
1792 goto end;
1793 }
1794
1795 client->uid = LTTNG_SOCK_GET_UID_CRED(
1796 &client->communication.inbound.creds);
1797 client->gid = LTTNG_SOCK_GET_GID_CRED(
1798 &client->communication.inbound.creds);
1799 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
1800 client->uid, client->gid, (int) client->major,
1801 (int) client->minor);
1802
1803 if (handshake_client->major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
1804 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
1805 }
1806
1807 ret = lttng_dynamic_buffer_append(&client->communication.outbound.buffer,
1808 send_buffer, sizeof(send_buffer));
1809 if (ret) {
1810 ERR("[notification-thread] Failed to send protocol version to notification channel client");
1811 goto end;
1812 }
1813
1814 ret = client_flush_outgoing_queue(client, state);
1815 if (ret) {
1816 goto end;
1817 }
1818
1819 ret = client_send_command_reply(client, state, status);
1820 if (ret) {
1821 ERR("[notification-thread] Failed to send reply to notification channel client");
1822 goto end;
1823 }
1824
1825 /* Set reception state to receive the next message header. */
b5eb6b46
JG
1826 ret = client_reset_inbound_state(client);
1827 if (ret) {
1828 ERR("[notification-thread] Failed to reset client communication's inbound state");
1829 goto end;
1830 }
ab0ee2ca
JG
1831 client->validated = true;
1832 break;
1833 }
1834 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
1835 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
1836 {
1837 struct lttng_condition *condition;
1838 enum lttng_notification_channel_status status =
1839 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1840 const struct lttng_buffer_view condition_view =
1841 lttng_buffer_view_from_dynamic_buffer(
1842 &client->communication.inbound.buffer,
1843 0, -1);
1844 size_t expected_condition_size =
1845 client->communication.inbound.buffer.size;
1846
1847 ret = lttng_condition_create_from_buffer(&condition_view,
1848 &condition);
1849 if (ret != expected_condition_size) {
1850 ERR("[notification-thread] Malformed condition received from client");
1851 goto end;
1852 }
1853
1854 if (client->communication.inbound.msg_type ==
1855 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
1856 /*
1857 * FIXME The current state should be evaluated on
1858 * subscription.
1859 */
1860 ret = notification_thread_client_subscribe(client,
1861 condition, state, &status);
1862 } else {
1863 ret = notification_thread_client_unsubscribe(client,
1864 condition, state, &status);
1865 }
1866 if (ret) {
1867 goto end;
1868 }
1869
1870 ret = client_send_command_reply(client, state, status);
1871 if (ret) {
1872 ERR("[notification-thread] Failed to send reply to notification channel client");
1873 goto end;
1874 }
1875
1876 /* Set reception state to receive the next message header. */
b5eb6b46
JG
1877 ret = client_reset_inbound_state(client);
1878 if (ret) {
1879 ERR("[notification-thread] Failed to reset client communication's inbound state");
1880 goto end;
1881 }
ab0ee2ca
JG
1882 break;
1883 }
1884 default:
1885 abort();
1886 }
1887end:
1888 return ret;
1889}
1890
1891/* Incoming data from client. */
1892int handle_notification_thread_client_in(
1893 struct notification_thread_state *state, int socket)
1894{
b5eb6b46 1895 int ret = 0;
ab0ee2ca
JG
1896 struct notification_client *client;
1897 ssize_t recv_ret;
1898 size_t offset;
1899
1900 client = get_client_from_socket(socket, state);
1901 if (!client) {
1902 /* Internal error, abort. */
1903 ret = -1;
1904 goto end;
1905 }
1906
b5eb6b46
JG
1907 offset = client->communication.inbound.buffer.size -
1908 client->communication.inbound.bytes_to_receive;
f950d02c 1909 if (client->communication.inbound.expect_creds) {
ab0ee2ca
JG
1910 recv_ret = lttcomm_recv_creds_unix_sock(socket,
1911 client->communication.inbound.buffer.data + offset,
1912 client->communication.inbound.bytes_to_receive,
1913 &client->communication.inbound.creds);
1914 if (recv_ret > 0) {
f950d02c 1915 client->communication.inbound.expect_creds = false;
ab0ee2ca
JG
1916 client->communication.inbound.creds_received = true;
1917 }
1918 } else {
1919 recv_ret = lttcomm_recv_unix_sock_non_block(socket,
1920 client->communication.inbound.buffer.data + offset,
1921 client->communication.inbound.bytes_to_receive);
1922 }
1923 if (recv_ret < 0) {
1924 goto error_disconnect_client;
1925 }
1926
1927 client->communication.inbound.bytes_to_receive -= recv_ret;
ab0ee2ca
JG
1928 if (client->communication.inbound.bytes_to_receive == 0) {
1929 ret = client_dispatch_message(client, state);
1930 if (ret) {
1931 /*
1932 * Only returns an error if this client must be
1933 * disconnected.
1934 */
1935 goto error_disconnect_client;
1936 }
1937 } else {
1938 goto end;
1939 }
1940end:
1941 return ret;
1942error_disconnect_client:
1943 ret = handle_notification_thread_client_disconnect(socket, state);
1944 return ret;
1945}
1946
/*
 * Client ready to receive outgoing data.
 *
 * Looks up the client associated with `socket` and flushes its outgoing
 * queue. Returns -1 if the client is unknown, otherwise the result of the
 * flush.
 */
int handle_notification_thread_client_out(
		struct notification_thread_state *state, int socket)
{
	struct notification_client *client =
			get_client_from_socket(socket, state);

	if (!client) {
		/* Internal error, abort. */
		return -1;
	}

	return client_flush_outgoing_queue(client, state);
}
1968
1969static
1970bool evaluate_buffer_usage_condition(struct lttng_condition *condition,
1971 struct channel_state_sample *sample, uint64_t buffer_capacity)
1972{
1973 bool result = false;
1974 uint64_t threshold;
1975 enum lttng_condition_type condition_type;
1976 struct lttng_condition_buffer_usage *use_condition = container_of(
1977 condition, struct lttng_condition_buffer_usage,
1978 parent);
1979
1980 if (!sample) {
1981 goto end;
1982 }
1983
1984 if (use_condition->threshold_bytes.set) {
1985 threshold = use_condition->threshold_bytes.value;
1986 } else {
1987 /*
1988 * Threshold was expressed as a ratio.
1989 *
1990 * TODO the threshold (in bytes) of conditions expressed
1991 * as a ratio of total buffer size could be cached to
1992 * forego this double-multiplication or it could be performed
1993 * as fixed-point math.
1994 *
1995 * Note that caching should accomodate the case where the
1996 * condition applies to multiple channels (i.e. don't assume
1997 * that all channels matching my_chann* have the same size...)
1998 */
1999 threshold = (uint64_t) (use_condition->threshold_ratio.value *
2000 (double) buffer_capacity);
2001 }
2002
2003 condition_type = lttng_condition_get_type(condition);
2004 if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
2005 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
2006 threshold, sample->highest_usage);
2007
2008 /*
2009 * The low condition should only be triggered once _all_ of the
2010 * streams in a channel have gone below the "low" threshold.
2011 */
2012 if (sample->highest_usage <= threshold) {
2013 result = true;
2014 }
2015 } else {
2016 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
2017 threshold, sample->highest_usage);
2018
2019 /*
2020 * For high buffer usage scenarios, we want to trigger whenever
2021 * _any_ of the streams has reached the "high" threshold.
2022 */
2023 if (sample->highest_usage >= threshold) {
2024 result = true;
2025 }
2026 }
2027end:
2028 return result;
2029}
2030
2031static
2032int evaluate_condition(struct lttng_condition *condition,
2033 struct lttng_evaluation **evaluation,
2034 struct notification_thread_state *state,
2035 struct channel_state_sample *previous_sample,
2036 struct channel_state_sample *latest_sample,
2037 uint64_t buffer_capacity)
2038{
2039 int ret = 0;
2040 enum lttng_condition_type condition_type;
2041 bool previous_sample_result;
2042 bool latest_sample_result;
2043
2044 condition_type = lttng_condition_get_type(condition);
2045 /* No other condition type supported for the moment. */
2046 assert(condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW ||
2047 condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH);
2048
2049 previous_sample_result = evaluate_buffer_usage_condition(condition,
2050 previous_sample, buffer_capacity);
2051 latest_sample_result = evaluate_buffer_usage_condition(condition,
2052 latest_sample, buffer_capacity);
2053
2054 if (!latest_sample_result ||
2055 (previous_sample_result == latest_sample_result)) {
2056 /*
2057 * Only trigger on a condition evaluation transition.
2058 *
2059 * NOTE: This edge-triggered logic may not be appropriate for
2060 * future condition types.
2061 */
2062 goto end;
2063 }
2064
2065 if (evaluation && latest_sample_result) {
2066 *evaluation = lttng_evaluation_buffer_usage_create(
2067 condition_type,
2068 latest_sample->highest_usage,
2069 buffer_capacity);
2070 if (!*evaluation) {
2071 ret = -1;
2072 goto end;
2073 }
2074 }
2075end:
2076 return ret;
2077}
2078
2079static
2080int client_enqueue_dropped_notification(struct notification_client *client,
2081 struct notification_thread_state *state)
2082{
2083 int ret;
2084 struct lttng_notification_channel_message msg = {
2085 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
2086 .size = 0,
2087 };
2088
2089 ret = lttng_dynamic_buffer_append(
2090 &client->communication.outbound.buffer, &msg,
2091 sizeof(msg));
2092 return ret;
2093}
2094
2095static
2096int send_evaluation_to_clients(struct lttng_trigger *trigger,
2097 struct lttng_evaluation *evaluation,
2098 struct notification_client_list* client_list,
2099 struct notification_thread_state *state,
2100 uid_t channel_uid, gid_t channel_gid)
2101{
2102 int ret = 0;
2103 struct lttng_dynamic_buffer msg_buffer;
2104 struct notification_client_list_element *client_list_element, *tmp;
2105 struct lttng_notification *notification;
2106 struct lttng_condition *condition;
2107 ssize_t expected_notification_size, notification_size;
2108 struct lttng_notification_channel_message msg;
2109
2110 lttng_dynamic_buffer_init(&msg_buffer);
2111
2112 condition = lttng_trigger_get_condition(trigger);
2113 assert(condition);
2114
2115 notification = lttng_notification_create(condition, evaluation);
2116 if (!notification) {
2117 ret = -1;
2118 goto end;
2119 }
2120
2121 expected_notification_size = lttng_notification_serialize(notification,
2122 NULL);
2123 if (expected_notification_size < 0) {
2124 ERR("[notification-thread] Failed to get size of serialized notification");
2125 ret = -1;
2126 goto end;
2127 }
2128
2129 msg.type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION;
2130 msg.size = (uint32_t) expected_notification_size;
2131 ret = lttng_dynamic_buffer_append(&msg_buffer, &msg, sizeof(msg));
2132 if (ret) {
2133 goto end;
2134 }
2135
2136 ret = lttng_dynamic_buffer_set_size(&msg_buffer,
2137 msg_buffer.size + expected_notification_size);
2138 if (ret) {
2139 goto end;
2140 }
2141
2142 notification_size = lttng_notification_serialize(notification,
2143 msg_buffer.data + sizeof(msg));
2144 if (notification_size != expected_notification_size) {
2145 ERR("[notification-thread] Failed to serialize notification");
2146 ret = -1;
2147 goto end;
2148 }
2149
2150 cds_list_for_each_entry_safe(client_list_element, tmp,
2151 &client_list->list, node) {
2152 struct notification_client *client =
2153 client_list_element->client;
2154
2155 if (client->uid != channel_uid && client->gid != channel_gid &&
2156 client->uid != 0) {
2157 /* Client is not allowed to monitor this channel. */
2158 DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this channel");
2159 continue;
2160 }
2161
2162 DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
2163 client->socket, msg_buffer.size);
2164 if (client->communication.outbound.buffer.size) {
2165 /*
2166 * Outgoing data is already buffered for this client;
2167 * drop the notification and enqueue a "dropped
2168 * notification" message if this is the first dropped
2169 * notification since the socket spilled-over to the
2170 * queue.
2171 */
2172 DBG("[notification-thread] Dropping notification addressed to client (socket fd = %i)",
2173 client->socket);
2174 if (!client->communication.outbound.dropped_notification) {
2175 client->communication.outbound.dropped_notification = true;
2176 ret = client_enqueue_dropped_notification(
2177 client, state);
2178 if (ret) {
2179 goto end;
2180 }
2181 }
2182 continue;
2183 }
2184
2185 ret = lttng_dynamic_buffer_append_buffer(
2186 &client->communication.outbound.buffer,
2187 &msg_buffer);
2188 if (ret) {
2189 goto end;
2190 }
2191
2192 ret = client_flush_outgoing_queue(client, state);
2193 if (ret) {
2194 goto end;
2195 }
2196 }
2197 ret = 0;
2198end:
2199 lttng_notification_destroy(notification);
2200 lttng_dynamic_buffer_reset(&msg_buffer);
2201 return ret;
2202}
2203
2204int handle_notification_thread_channel_sample(
2205 struct notification_thread_state *state, int pipe,
2206 enum lttng_domain_type domain)
2207{
2208 int ret = 0;
2209 struct lttcomm_consumer_channel_monitor_msg sample_msg;
2210 struct channel_state_sample previous_sample, latest_sample;
2211 struct channel_info *channel_info;
2212 struct cds_lfht_node *node;
2213 struct cds_lfht_iter iter;
2214 struct lttng_channel_trigger_list *trigger_list;
2215 struct lttng_trigger_list_element *trigger_list_element;
2216 bool previous_sample_available = false;
2217
2218 /*
2219 * The monitoring pipe only holds messages smaller than PIPE_BUF,
2220 * ensuring that read/write of sampling messages are atomic.
2221 */
8af8a558 2222 ret = lttng_read(pipe, &sample_msg, sizeof(sample_msg));
ab0ee2ca
JG
2223 if (ret != sizeof(sample_msg)) {
2224 ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
2225 pipe);
2226 ret = -1;
2227 goto end;
2228 }
2229
2230 ret = 0;
2231 latest_sample.key.key = sample_msg.key;
2232 latest_sample.key.domain = domain;
2233 latest_sample.highest_usage = sample_msg.highest;
2234 latest_sample.lowest_usage = sample_msg.lowest;
2235
2236 rcu_read_lock();
2237
2238 /* Retrieve the channel's informations */
2239 cds_lfht_lookup(state->channels_ht,
2240 hash_channel_key(&latest_sample.key),
2241 match_channel_info,
2242 &latest_sample.key,
2243 &iter);
2244 node = cds_lfht_iter_get_node(&iter);
2245 if (!node) {
2246 /*
2247 * Not an error since the consumer can push a sample to the pipe
2248 * and the rest of the session daemon could notify us of the
2249 * channel's destruction before we get a chance to process that
2250 * sample.
2251 */
2252 DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
2253 latest_sample.key.key,
2254 domain == LTTNG_DOMAIN_KERNEL ? "kernel" :
2255 "user space");
2256 goto end_unlock;
2257 }
2258 channel_info = caa_container_of(node, struct channel_info,
2259 channels_ht_node);
2260 DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64")",
2261 channel_info->channel_name,
2262 latest_sample.key.key,
2263 channel_info->session_name,
2264 latest_sample.highest_usage,
2265 latest_sample.lowest_usage);
2266
2267 /* Retrieve the channel's last sample, if it exists, and update it. */
2268 cds_lfht_lookup(state->channel_state_ht,
2269 hash_channel_key(&latest_sample.key),
2270 match_channel_state_sample,
2271 &latest_sample.key,
2272 &iter);
2273 node = cds_lfht_iter_get_node(&iter);
2274 if (node) {
2275 struct channel_state_sample *stored_sample;
2276
2277 /* Update the sample stored. */
2278 stored_sample = caa_container_of(node,
2279 struct channel_state_sample,
2280 channel_state_ht_node);
2281 memcpy(&previous_sample, stored_sample,
2282 sizeof(previous_sample));
2283 stored_sample->highest_usage = latest_sample.highest_usage;
2284 stored_sample->lowest_usage = latest_sample.lowest_usage;
2285 previous_sample_available = true;
2286 } else {
2287 /*
2288 * This is the channel's first sample, allocate space for and
2289 * store the new sample.
2290 */
2291 struct channel_state_sample *stored_sample;
2292
2293 stored_sample = zmalloc(sizeof(*stored_sample));
2294 if (!stored_sample) {
2295 ret = -1;
2296 goto end_unlock;
2297 }
2298
2299 memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
2300 cds_lfht_node_init(&stored_sample->channel_state_ht_node);
2301 cds_lfht_add(state->channel_state_ht,
2302 hash_channel_key(&stored_sample->key),
2303 &stored_sample->channel_state_ht_node);
2304 }
2305
2306 /* Find triggers associated with this channel. */
2307 cds_lfht_lookup(state->channel_triggers_ht,
2308 hash_channel_key(&latest_sample.key),
2309 match_channel_trigger_list,
2310 &latest_sample.key,
2311 &iter);
2312 node = cds_lfht_iter_get_node(&iter);
2313 if (!node) {
2314 goto end_unlock;
2315 }
2316
2317 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
2318 channel_triggers_ht_node);
2319 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
2320 node) {
2321 struct lttng_condition *condition;
2322 struct lttng_action *action;
2323 struct lttng_trigger *trigger;
2324 struct notification_client_list *client_list;
2325 struct lttng_evaluation *evaluation = NULL;
2326
2327 trigger = trigger_list_element->trigger;
2328 condition = lttng_trigger_get_condition(trigger);
2329 assert(condition);
2330 action = lttng_trigger_get_action(trigger);
2331
2332 /* Notify actions are the only type currently supported. */
2333 assert(lttng_action_get_type(action) ==
2334 LTTNG_ACTION_TYPE_NOTIFY);
2335
2336 /*
2337 * Check if any client is subscribed to the result of this
2338 * evaluation.
2339 */
2340 cds_lfht_lookup(state->notification_trigger_clients_ht,
2341 lttng_condition_hash(condition),
2342 match_client_list,
2343 trigger,
2344 &iter);
2345 node = cds_lfht_iter_get_node(&iter);
2346 assert(node);
2347
2348 client_list = caa_container_of(node,
2349 struct notification_client_list,
2350 notification_trigger_ht_node);
2351 if (cds_list_empty(&client_list->list)) {
2352 /*
2353 * No clients interested in the evaluation's result,
2354 * skip it.
2355 */
2356 continue;
2357 }
2358
2359 ret = evaluate_condition(condition, &evaluation, state,
2360 previous_sample_available ? &previous_sample : NULL,
2361 &latest_sample, channel_info->capacity);
2362 if (ret) {
2363 goto end_unlock;
2364 }
2365
2366 if (!evaluation) {
2367 continue;
2368 }
2369
2370 /* Dispatch evaluation result to all clients. */
2371 ret = send_evaluation_to_clients(trigger_list_element->trigger,
2372 evaluation, client_list, state,
2373 channel_info->uid, channel_info->gid);
2374 if (ret) {
2375 goto end_unlock;
2376 }
2377 }
2378end_unlock:
2379 rcu_read_unlock();
2380end:
2381 return ret;
2382}
This page took 0.121477 seconds and 5 git commands to generate.