Typo: occured -> occurred
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.c
CommitLineData
ab0ee2ca
JG
1/*
2 * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18#define _LGPL_SOURCE
19#include <urcu.h>
20#include <urcu/rculfhash.h>
21
ab0ee2ca
JG
22#include <common/defaults.h>
23#include <common/error.h>
24#include <common/futex.h>
25#include <common/unix.h>
26#include <common/dynamic-buffer.h>
27#include <common/hashtable/utils.h>
28#include <common/sessiond-comm/sessiond-comm.h>
29#include <common/macros.h>
30#include <lttng/condition/condition.h>
31#include <lttng/action/action.h>
32#include <lttng/notification/notification-internal.h>
33#include <lttng/condition/condition-internal.h>
34#include <lttng/condition/buffer-usage-internal.h>
35#include <lttng/notification/channel-internal.h>
1da26331 36
ab0ee2ca
JG
37#include <time.h>
38#include <unistd.h>
39#include <assert.h>
40#include <inttypes.h>
d53ea4e4 41#include <fcntl.h>
ab0ee2ca 42
1da26331
JG
43#include "notification-thread.h"
44#include "notification-thread-events.h"
45#include "notification-thread-commands.h"
46#include "lttng-sessiond.h"
47#include "kernel.h"
48
ab0ee2ca
JG
/*
 * Poll event masks (the names suggest they are meant for client sockets):
 *   - CLIENT_POLL_MASK_IN: input readiness plus error/hang-up events,
 *   - CLIENT_POLL_MASK_IN_OUT: the above plus output readiness.
 */
#define CLIENT_POLL_MASK_IN \
	(LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
#define CLIENT_POLL_MASK_IN_OUT \
	(CLIENT_POLL_MASK_IN | LPOLLOUT)
51
52struct lttng_trigger_list_element {
53 struct lttng_trigger *trigger;
54 struct cds_list_head node;
55};
56
57struct lttng_channel_trigger_list {
58 struct channel_key channel_key;
59 struct cds_list_head list;
60 struct cds_lfht_node channel_triggers_ht_node;
61};
62
63struct lttng_trigger_ht_element {
64 struct lttng_trigger *trigger;
65 struct cds_lfht_node node;
66};
67
68struct lttng_condition_list_element {
69 struct lttng_condition *condition;
70 struct cds_list_head node;
71};
72
73struct notification_client_list_element {
74 struct notification_client *client;
75 struct cds_list_head node;
76};
77
78struct notification_client_list {
79 struct lttng_trigger *trigger;
80 struct cds_list_head list;
81 struct cds_lfht_node notification_trigger_ht_node;
82};
83
84struct notification_client {
85 int socket;
86 /* Client protocol version. */
87 uint8_t major, minor;
88 uid_t uid;
89 gid_t gid;
90 /*
91 * Indicates if the credentials and versions of the client has been
92 * checked.
93 */
94 bool validated;
95 /*
96 * Conditions to which the client's notification channel is subscribed.
97 * List of struct lttng_condition_list_node. The condition member is
98 * owned by the client.
99 */
100 struct cds_list_head condition_list;
101 struct cds_lfht_node client_socket_ht_node;
102 struct {
103 struct {
14fa22f8
JG
104 /*
105 * During the reception of a message, the reception
106 * buffers' "size" is set to contain the current
107 * message's complete payload.
108 */
ab0ee2ca
JG
109 struct lttng_dynamic_buffer buffer;
110 /* Bytes left to receive for the current message. */
111 size_t bytes_to_receive;
112 /* Type of the message being received. */
113 enum lttng_notification_channel_message_type msg_type;
114 /*
115 * Indicates whether or not credentials are expected
116 * from the client.
117 */
01ea340e 118 bool expect_creds;
ab0ee2ca
JG
119 /*
120 * Indicates whether or not credentials were received
121 * from the client.
122 */
123 bool creds_received;
14fa22f8 124 /* Only used during credentials reception. */
ab0ee2ca
JG
125 lttng_sock_cred creds;
126 } inbound;
127 struct {
128 /*
129 * Indicates whether or not a notification addressed to
130 * this client was dropped because a command reply was
131 * already buffered.
132 *
133 * A notification is dropped whenever the buffer is not
134 * empty.
135 */
136 bool dropped_notification;
137 /*
138 * Indicates whether or not a command reply is already
139 * buffered. In this case, it means that the client is
140 * not consuming command replies before emitting a new
141 * one. This could be caused by a protocol error or a
142 * misbehaving/malicious client.
143 */
144 bool queued_command_reply;
145 struct lttng_dynamic_buffer buffer;
146 } outbound;
147 } communication;
148};
149
150struct channel_state_sample {
151 struct channel_key key;
152 struct cds_lfht_node channel_state_ht_node;
153 uint64_t highest_usage;
154 uint64_t lowest_usage;
155};
156
157static
158int match_client(struct cds_lfht_node *node, const void *key)
159{
160 /* This double-cast is intended to supress pointer-to-cast warning. */
161 int socket = (int) (intptr_t) key;
162 struct notification_client *client;
163
164 client = caa_container_of(node, struct notification_client,
165 client_socket_ht_node);
166
167 return !!(client->socket == socket);
168}
169
170static
171int match_channel_trigger_list(struct cds_lfht_node *node, const void *key)
172{
173 struct channel_key *channel_key = (struct channel_key *) key;
174 struct lttng_channel_trigger_list *trigger_list;
175
176 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
177 channel_triggers_ht_node);
178
179 return !!((channel_key->key == trigger_list->channel_key.key) &&
180 (channel_key->domain == trigger_list->channel_key.domain));
181}
182
183static
184int match_channel_state_sample(struct cds_lfht_node *node, const void *key)
185{
186 struct channel_key *channel_key = (struct channel_key *) key;
187 struct channel_state_sample *sample;
188
189 sample = caa_container_of(node, struct channel_state_sample,
190 channel_state_ht_node);
191
192 return !!((channel_key->key == sample->key.key) &&
193 (channel_key->domain == sample->key.domain));
194}
195
196static
197int match_channel_info(struct cds_lfht_node *node, const void *key)
198{
199 struct channel_key *channel_key = (struct channel_key *) key;
200 struct channel_info *channel_info;
201
202 channel_info = caa_container_of(node, struct channel_info,
203 channels_ht_node);
204
205 return !!((channel_key->key == channel_info->key.key) &&
206 (channel_key->domain == channel_info->key.domain));
207}
208
209static
210int match_condition(struct cds_lfht_node *node, const void *key)
211{
212 struct lttng_condition *condition_key = (struct lttng_condition *) key;
213 struct lttng_trigger_ht_element *trigger;
214 struct lttng_condition *condition;
215
216 trigger = caa_container_of(node, struct lttng_trigger_ht_element,
217 node);
218 condition = lttng_trigger_get_condition(trigger->trigger);
219 assert(condition);
220
221 return !!lttng_condition_is_equal(condition_key, condition);
222}
223
224static
225int match_client_list(struct cds_lfht_node *node, const void *key)
226{
227 struct lttng_trigger *trigger_key = (struct lttng_trigger *) key;
228 struct notification_client_list *client_list;
229 struct lttng_condition *condition;
230 struct lttng_condition *condition_key = lttng_trigger_get_condition(
231 trigger_key);
232
233 assert(condition_key);
234
235 client_list = caa_container_of(node, struct notification_client_list,
236 notification_trigger_ht_node);
237 condition = lttng_trigger_get_condition(client_list->trigger);
238
239 return !!lttng_condition_is_equal(condition_key, condition);
240}
241
242static
243int match_client_list_condition(struct cds_lfht_node *node, const void *key)
244{
245 struct lttng_condition *condition_key = (struct lttng_condition *) key;
246 struct notification_client_list *client_list;
247 struct lttng_condition *condition;
248
249 assert(condition_key);
250
251 client_list = caa_container_of(node, struct notification_client_list,
252 notification_trigger_ht_node);
253 condition = lttng_trigger_get_condition(client_list->trigger);
254
255 return !!lttng_condition_is_equal(condition_key, condition);
256}
257
258static
259unsigned long lttng_condition_buffer_usage_hash(
260 struct lttng_condition *_condition)
261{
262 unsigned long hash = 0;
263 struct lttng_condition_buffer_usage *condition;
264
265 condition = container_of(_condition,
266 struct lttng_condition_buffer_usage, parent);
267
268 if (condition->session_name) {
269 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
270 }
271 if (condition->channel_name) {
8f56701f 272 hash ^= hash_key_str(condition->channel_name, lttng_ht_seed);
ab0ee2ca
JG
273 }
274 if (condition->domain.set) {
275 hash ^= hash_key_ulong(
276 (void *) condition->domain.type,
277 lttng_ht_seed);
278 }
279 if (condition->threshold_ratio.set) {
280 uint64_t val;
281
282 val = condition->threshold_ratio.value * (double) UINT32_MAX;
283 hash ^= hash_key_u64(&val, lttng_ht_seed);
284 } else if (condition->threshold_ratio.set) {
285 uint64_t val;
286
287 val = condition->threshold_bytes.value;
288 hash ^= hash_key_u64(&val, lttng_ht_seed);
289 }
290 return hash;
291}
292
293/*
294 * The lttng_condition hashing code is kept in this file (rather than
295 * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
296 * don't want to link in liblttng-ctl.
297 */
298static
299unsigned long lttng_condition_hash(struct lttng_condition *condition)
300{
301 switch (condition->type) {
302 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
303 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
304 return lttng_condition_buffer_usage_hash(condition);
305 default:
306 ERR("[notification-thread] Unexpected condition type caught");
307 abort();
308 }
309}
310
311static
312void channel_info_destroy(struct channel_info *channel_info)
313{
314 if (!channel_info) {
315 return;
316 }
317
318 if (channel_info->session_name) {
319 free(channel_info->session_name);
320 }
321 if (channel_info->channel_name) {
322 free(channel_info->channel_name);
323 }
324 free(channel_info);
325}
326
327static
328struct channel_info *channel_info_copy(struct channel_info *channel_info)
329{
330 struct channel_info *copy = zmalloc(sizeof(*channel_info));
331
332 assert(channel_info);
333 assert(channel_info->session_name);
334 assert(channel_info->channel_name);
335
336 if (!copy) {
337 goto end;
338 }
339
340 memcpy(copy, channel_info, sizeof(*channel_info));
341 copy->session_name = NULL;
342 copy->channel_name = NULL;
343
344 copy->session_name = strdup(channel_info->session_name);
345 if (!copy->session_name) {
346 goto error;
347 }
348 copy->channel_name = strdup(channel_info->channel_name);
349 if (!copy->channel_name) {
350 goto error;
351 }
352 cds_lfht_node_init(&channel_info->channels_ht_node);
353end:
354 return copy;
355error:
356 channel_info_destroy(copy);
357 return NULL;
358}
359
360static
361int notification_thread_client_subscribe(struct notification_client *client,
362 struct lttng_condition *condition,
363 struct notification_thread_state *state,
364 enum lttng_notification_channel_status *_status)
365{
366 int ret = 0;
367 struct cds_lfht_iter iter;
368 struct cds_lfht_node *node;
369 struct notification_client_list *client_list;
370 struct lttng_condition_list_element *condition_list_element = NULL;
371 struct notification_client_list_element *client_list_element = NULL;
372 enum lttng_notification_channel_status status =
373 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
374
375 /*
376 * Ensure that the client has not already subscribed to this condition
377 * before.
378 */
379 cds_list_for_each_entry(condition_list_element, &client->condition_list, node) {
380 if (lttng_condition_is_equal(condition_list_element->condition,
381 condition)) {
382 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED;
383 goto end;
384 }
385 }
386
387 condition_list_element = zmalloc(sizeof(*condition_list_element));
388 if (!condition_list_element) {
389 ret = -1;
390 goto error;
391 }
392 client_list_element = zmalloc(sizeof(*client_list_element));
393 if (!client_list_element) {
394 ret = -1;
395 goto error;
396 }
397
398 rcu_read_lock();
399
400 /*
401 * Add the newly-subscribed condition to the client's subscription list.
402 */
403 CDS_INIT_LIST_HEAD(&condition_list_element->node);
404 condition_list_element->condition = condition;
405 cds_list_add(&condition_list_element->node, &client->condition_list);
406
407 /*
408 * Add the client to the list of clients interested in a given trigger
409 * if a "notification" trigger with a corresponding condition was
410 * added prior.
411 */
412 cds_lfht_lookup(state->notification_trigger_clients_ht,
413 lttng_condition_hash(condition),
414 match_client_list_condition,
415 condition,
416 &iter);
417 node = cds_lfht_iter_get_node(&iter);
418 if (!node) {
4fb43b68 419 free(client_list_element);
ab0ee2ca
JG
420 goto end_unlock;
421 }
422
423 client_list = caa_container_of(node, struct notification_client_list,
424 notification_trigger_ht_node);
425 client_list_element->client = client;
426 CDS_INIT_LIST_HEAD(&client_list_element->node);
427 cds_list_add(&client_list_element->node, &client_list->list);
428end_unlock:
429 rcu_read_unlock();
430end:
431 if (_status) {
432 *_status = status;
433 }
434 return ret;
435error:
436 free(condition_list_element);
437 free(client_list_element);
438 return ret;
439}
440
441static
442int notification_thread_client_unsubscribe(
443 struct notification_client *client,
444 struct lttng_condition *condition,
445 struct notification_thread_state *state,
446 enum lttng_notification_channel_status *_status)
447{
448 struct cds_lfht_iter iter;
449 struct cds_lfht_node *node;
450 struct notification_client_list *client_list;
451 struct lttng_condition_list_element *condition_list_element,
452 *condition_tmp;
453 struct notification_client_list_element *client_list_element,
454 *client_tmp;
455 bool condition_found = false;
456 enum lttng_notification_channel_status status =
457 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
458
459 /* Remove the condition from the client's condition list. */
460 cds_list_for_each_entry_safe(condition_list_element, condition_tmp,
461 &client->condition_list, node) {
462 if (!lttng_condition_is_equal(condition_list_element->condition,
463 condition)) {
464 continue;
465 }
466
467 cds_list_del(&condition_list_element->node);
468 /*
469 * The caller may be iterating on the client's conditions to
470 * tear down a client's connection. In this case, the condition
471 * will be destroyed at the end.
472 */
473 if (condition != condition_list_element->condition) {
474 lttng_condition_destroy(
475 condition_list_element->condition);
476 }
477 free(condition_list_element);
478 condition_found = true;
479 break;
480 }
481
482 if (!condition_found) {
483 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION;
484 goto end;
485 }
486
487 /*
488 * Remove the client from the list of clients interested the trigger
489 * matching the condition.
490 */
491 rcu_read_lock();
492 cds_lfht_lookup(state->notification_trigger_clients_ht,
493 lttng_condition_hash(condition),
494 match_client_list_condition,
495 condition,
496 &iter);
497 node = cds_lfht_iter_get_node(&iter);
498 if (!node) {
499 goto end_unlock;
500 }
501
502 client_list = caa_container_of(node, struct notification_client_list,
503 notification_trigger_ht_node);
504 cds_list_for_each_entry_safe(client_list_element, client_tmp,
505 &client_list->list, node) {
506 if (client_list_element->client->socket != client->socket) {
507 continue;
508 }
509 cds_list_del(&client_list_element->node);
510 free(client_list_element);
511 break;
512 }
513end_unlock:
514 rcu_read_unlock();
515end:
516 lttng_condition_destroy(condition);
517 if (_status) {
518 *_status = status;
519 }
520 return 0;
521}
522
523static
524void notification_client_destroy(struct notification_client *client,
525 struct notification_thread_state *state)
526{
527 struct lttng_condition_list_element *condition_list_element, *tmp;
528
529 if (!client) {
530 return;
531 }
532
533 /* Release all conditions to which the client was subscribed. */
534 cds_list_for_each_entry_safe(condition_list_element, tmp,
535 &client->condition_list, node) {
536 (void) notification_thread_client_unsubscribe(client,
537 condition_list_element->condition, state, NULL);
538 }
539
540 if (client->socket >= 0) {
541 (void) lttcomm_close_unix_sock(client->socket);
542 }
543 lttng_dynamic_buffer_reset(&client->communication.inbound.buffer);
544 lttng_dynamic_buffer_reset(&client->communication.outbound.buffer);
545 free(client);
546}
547
548/*
549 * Call with rcu_read_lock held (and hold for the lifetime of the returned
550 * client pointer).
551 */
552static
553struct notification_client *get_client_from_socket(int socket,
554 struct notification_thread_state *state)
555{
556 struct cds_lfht_iter iter;
557 struct cds_lfht_node *node;
558 struct notification_client *client = NULL;
559
560 cds_lfht_lookup(state->client_socket_ht,
561 hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed),
562 match_client,
563 (void *) (unsigned long) socket,
564 &iter);
565 node = cds_lfht_iter_get_node(&iter);
566 if (!node) {
567 goto end;
568 }
569
570 client = caa_container_of(node, struct notification_client,
571 client_socket_ht_node);
572end:
573 return client;
574}
575
576static
577bool trigger_applies_to_channel(struct lttng_trigger *trigger,
578 struct channel_info *info)
579{
580 enum lttng_condition_status status;
581 struct lttng_condition *condition;
582 const char *trigger_session_name = NULL;
583 const char *trigger_channel_name = NULL;
584 enum lttng_domain_type trigger_domain;
585
586 condition = lttng_trigger_get_condition(trigger);
587 if (!condition) {
588 goto fail;
589 }
590
591 switch (lttng_condition_get_type(condition)) {
592 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
593 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
594 break;
595 default:
596 goto fail;
597 }
598
599 status = lttng_condition_buffer_usage_get_domain_type(condition,
600 &trigger_domain);
601 assert(status == LTTNG_CONDITION_STATUS_OK);
602 if (info->key.domain != trigger_domain) {
603 goto fail;
604 }
605
606 status = lttng_condition_buffer_usage_get_session_name(
607 condition, &trigger_session_name);
608 assert((status == LTTNG_CONDITION_STATUS_OK) && trigger_session_name);
609
610 status = lttng_condition_buffer_usage_get_channel_name(
611 condition, &trigger_channel_name);
612 assert((status == LTTNG_CONDITION_STATUS_OK) && trigger_channel_name);
613
614 if (strcmp(info->session_name, trigger_session_name)) {
615 goto fail;
616 }
617 if (strcmp(info->channel_name, trigger_channel_name)) {
618 goto fail;
619 }
620
621 return true;
622fail:
623 return false;
624}
625
626static
627bool trigger_applies_to_client(struct lttng_trigger *trigger,
628 struct notification_client *client)
629{
630 bool applies = false;
631 struct lttng_condition_list_element *condition_list_element;
632
633 cds_list_for_each_entry(condition_list_element, &client->condition_list,
634 node) {
635 applies = lttng_condition_is_equal(
636 condition_list_element->condition,
637 lttng_trigger_get_condition(trigger));
638 if (applies) {
639 break;
640 }
641 }
642 return applies;
643}
644
645static
646unsigned long hash_channel_key(struct channel_key *key)
647{
648 return hash_key_u64(&key->key, lttng_ht_seed) ^ hash_key_ulong(
649 (void *) (unsigned long) key->domain, lttng_ht_seed);
650}
651
652static
653int handle_notification_thread_command_add_channel(
654 struct notification_thread_state *state,
655 struct channel_info *channel_info,
656 enum lttng_error_code *cmd_result)
657{
658 struct cds_list_head trigger_list;
659 struct channel_info *new_channel_info;
660 struct channel_key *channel_key;
661 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
662 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
663 int trigger_count = 0;
664 struct cds_lfht_iter iter;
665
666 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
667 channel_info->channel_name, channel_info->session_name,
668 channel_info->key.key, channel_info->key.domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
669
670 CDS_INIT_LIST_HEAD(&trigger_list);
671
672 new_channel_info = channel_info_copy(channel_info);
673 if (!new_channel_info) {
674 goto error;
675 }
676
677 channel_key = &new_channel_info->key;
678
679 /* Build a list of all triggers applying to the new channel. */
680 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
681 node) {
682 struct lttng_trigger_list_element *new_element;
683
684 if (!trigger_applies_to_channel(trigger_ht_element->trigger,
685 channel_info)) {
686 continue;
687 }
688
689 new_element = zmalloc(sizeof(*new_element));
690 if (!new_element) {
691 goto error;
692 }
693 CDS_INIT_LIST_HEAD(&new_element->node);
694 new_element->trigger = trigger_ht_element->trigger;
695 cds_list_add(&new_element->node, &trigger_list);
696 trigger_count++;
697 }
698
699 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
700 trigger_count);
701 channel_trigger_list = zmalloc(sizeof(*channel_trigger_list));
702 if (!channel_trigger_list) {
703 goto error;
704 }
705 channel_trigger_list->channel_key = *channel_key;
706 CDS_INIT_LIST_HEAD(&channel_trigger_list->list);
707 cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
708 cds_list_splice(&trigger_list, &channel_trigger_list->list);
709
710 rcu_read_lock();
711 /* Add channel to the channel_ht which owns the channel_infos. */
712 cds_lfht_add(state->channels_ht,
713 hash_channel_key(channel_key),
714 &new_channel_info->channels_ht_node);
715 /*
716 * Add the list of triggers associated with this channel to the
717 * channel_triggers_ht.
718 */
719 cds_lfht_add(state->channel_triggers_ht,
720 hash_channel_key(channel_key),
721 &channel_trigger_list->channel_triggers_ht_node);
722 rcu_read_unlock();
723 *cmd_result = LTTNG_OK;
724 return 0;
725error:
726 /* Empty trigger list */
727 channel_info_destroy(new_channel_info);
728 return 1;
729}
730
731static
732int handle_notification_thread_command_remove_channel(
733 struct notification_thread_state *state,
734 uint64_t channel_key, enum lttng_domain_type domain,
735 enum lttng_error_code *cmd_result)
736{
737 struct cds_lfht_node *node;
738 struct cds_lfht_iter iter;
739 struct lttng_channel_trigger_list *trigger_list;
740 struct lttng_trigger_list_element *trigger_list_element, *tmp;
741 struct channel_key key = { .key = channel_key, .domain = domain };
742 struct channel_info *channel_info;
743
744 DBG("[notification-thread] Removing channel key = %" PRIu64 " in %s domain",
745 channel_key, domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
746
747 rcu_read_lock();
748
749 cds_lfht_lookup(state->channel_triggers_ht,
750 hash_channel_key(&key),
751 match_channel_trigger_list,
752 &key,
753 &iter);
754 node = cds_lfht_iter_get_node(&iter);
755 /*
756 * There is a severe internal error if we are being asked to remove a
757 * channel that doesn't exist.
758 */
759 if (!node) {
760 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
761 goto end;
762 }
763
764 /* Free the list of triggers associated with this channel. */
765 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
766 channel_triggers_ht_node);
767 cds_list_for_each_entry_safe(trigger_list_element, tmp,
768 &trigger_list->list, node) {
769 cds_list_del(&trigger_list_element->node);
770 free(trigger_list_element);
771 }
772 cds_lfht_del(state->channel_triggers_ht, node);
773 free(trigger_list);
774
775 /* Free sampled channel state. */
776 cds_lfht_lookup(state->channel_state_ht,
777 hash_channel_key(&key),
778 match_channel_state_sample,
779 &key,
780 &iter);
781 node = cds_lfht_iter_get_node(&iter);
782 /*
783 * This is expected to be NULL if the channel is destroyed before we
784 * received a sample.
785 */
786 if (node) {
787 struct channel_state_sample *sample = caa_container_of(node,
788 struct channel_state_sample,
789 channel_state_ht_node);
790
791 cds_lfht_del(state->channel_state_ht, node);
792 free(sample);
793 }
794
795 /* Remove the channel from the channels_ht and free it. */
796 cds_lfht_lookup(state->channels_ht,
797 hash_channel_key(&key),
798 match_channel_info,
799 &key,
800 &iter);
801 node = cds_lfht_iter_get_node(&iter);
802 assert(node);
803 channel_info = caa_container_of(node, struct channel_info,
804 channels_ht_node);
805 cds_lfht_del(state->channels_ht, node);
806 channel_info_destroy(channel_info);
807end:
808 rcu_read_unlock();
809 *cmd_result = LTTNG_OK;
810 return 0;
811}
812
1da26331
JG
813static
814int condition_is_supported(struct lttng_condition *condition)
815{
816 int ret;
817
818 switch (lttng_condition_get_type(condition)) {
819 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
820 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
821 {
822 enum lttng_domain_type domain;
823
824 ret = lttng_condition_buffer_usage_get_domain_type(condition,
825 &domain);
826 if (ret) {
827 ret = -1;
828 goto end;
829 }
830
831 if (domain != LTTNG_DOMAIN_KERNEL) {
832 ret = 1;
833 goto end;
834 }
835
836 /*
837 * Older kernel tracers don't expose the API to monitor their
838 * buffers. Therefore, we reject triggers that require that
839 * mechanism to be available to be evaluated.
840 */
841 ret = kernel_supports_ring_buffer_snapshot_sample_positions(
842 kernel_tracer_fd);
843 break;
844 }
845 default:
846 ret = 1;
847 }
848end:
849 return ret;
850}
851
ab0ee2ca
JG
852/*
853 * FIXME A client's credentials are not checked when registering a trigger, nor
854 * are they stored alongside with the trigger.
855 *
1da26331 856 * The effects of this are benign since:
ab0ee2ca
JG
857 * - The client will succeed in registering the trigger, as it is valid,
858 * - The trigger will, internally, be bound to the channel,
859 * - The notifications will not be sent since the client's credentials
860 * are checked against the channel at that moment.
1da26331
JG
861 *
862 * If this function returns a non-zero value, it means something is
863 * fundamentally and the whole subsystem/thread will be torn down.
864 *
865 * If a non-fatal error occurs, just set the cmd_result to the appropriate
866 * error code.
ab0ee2ca
JG
867 */
868static
869int handle_notification_thread_command_register_trigger(
870 struct notification_thread_state *state,
871 struct lttng_trigger *trigger,
872 enum lttng_error_code *cmd_result)
873{
874 int ret = 0;
875 struct lttng_condition *condition;
876 struct notification_client *client;
877 struct notification_client_list *client_list = NULL;
878 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
879 struct notification_client_list_element *client_list_element, *tmp;
880 struct cds_lfht_node *node;
881 struct cds_lfht_iter iter;
882 struct channel_info *channel;
883 bool free_trigger = true;
884
885 rcu_read_lock();
886
887 condition = lttng_trigger_get_condition(trigger);
1da26331
JG
888 assert(condition);
889
890 ret = condition_is_supported(condition);
891 if (ret < 0) {
892 goto error;
893 } else if (ret == 0) {
894 *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
895 goto error;
896 } else {
897 /* Feature is supported, continue. */
898 ret = 0;
899 }
900
ab0ee2ca
JG
901 trigger_ht_element = zmalloc(sizeof(*trigger_ht_element));
902 if (!trigger_ht_element) {
903 ret = -1;
904 goto error;
905 }
906
907 /* Add trigger to the trigger_ht. */
908 cds_lfht_node_init(&trigger_ht_element->node);
909 trigger_ht_element->trigger = trigger;
910
911 node = cds_lfht_add_unique(state->triggers_ht,
912 lttng_condition_hash(condition),
913 match_condition,
914 condition,
915 &trigger_ht_element->node);
916 if (node != &trigger_ht_element->node) {
917 /* Not a fatal error, simply report it to the client. */
918 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
919 goto error_free_ht_element;
920 }
921
922 /*
923 * Ownership of the trigger and of its wrapper was transfered to
924 * the triggers_ht.
925 */
926 trigger_ht_element = NULL;
927 free_trigger = false;
928
929 /*
930 * The rest only applies to triggers that have a "notify" action.
931 * It is not skipped as this is the only action type currently
932 * supported.
933 */
934 client_list = zmalloc(sizeof(*client_list));
935 if (!client_list) {
936 ret = -1;
937 goto error_free_ht_element;
938 }
939 cds_lfht_node_init(&client_list->notification_trigger_ht_node);
940 CDS_INIT_LIST_HEAD(&client_list->list);
941 client_list->trigger = trigger;
942
943 /* Build a list of clients to which this new trigger applies. */
944 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
945 client_socket_ht_node) {
946 if (!trigger_applies_to_client(trigger, client)) {
947 continue;
948 }
949
950 client_list_element = zmalloc(sizeof(*client_list_element));
951 if (!client_list_element) {
952 ret = -1;
953 goto error_free_client_list;
954 }
955 CDS_INIT_LIST_HEAD(&client_list_element->node);
956 client_list_element->client = client;
957 cds_list_add(&client_list_element->node, &client_list->list);
958 }
959
960 cds_lfht_add(state->notification_trigger_clients_ht,
961 lttng_condition_hash(condition),
962 &client_list->notification_trigger_ht_node);
963 /*
964 * Client list ownership transferred to the
965 * notification_trigger_clients_ht.
966 */
967 client_list = NULL;
968
969 /*
970 * Add the trigger to list of triggers bound to the channels currently
971 * known.
972 */
973 cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
974 channels_ht_node) {
975 struct lttng_trigger_list_element *trigger_list_element;
976 struct lttng_channel_trigger_list *trigger_list;
977
978 if (!trigger_applies_to_channel(trigger, channel)) {
979 continue;
980 }
981
982 cds_lfht_lookup(state->channel_triggers_ht,
983 hash_channel_key(&channel->key),
984 match_channel_trigger_list,
985 &channel->key,
986 &iter);
987 node = cds_lfht_iter_get_node(&iter);
988 assert(node);
989 /* Free the list of triggers associated with this channel. */
990 trigger_list = caa_container_of(node,
991 struct lttng_channel_trigger_list,
992 channel_triggers_ht_node);
993
994 trigger_list_element = zmalloc(sizeof(*trigger_list_element));
995 if (!trigger_list_element) {
996 ret = -1;
997 goto error_free_client_list;
998 }
999 CDS_INIT_LIST_HEAD(&trigger_list_element->node);
1000 trigger_list_element->trigger = trigger;
1001 cds_list_add(&trigger_list_element->node, &trigger_list->list);
1002 /* A trigger can only apply to one channel. */
1003 break;
1004 }
1005
1006 *cmd_result = LTTNG_OK;
1007error_free_client_list:
1008 if (client_list) {
1009 cds_list_for_each_entry_safe(client_list_element, tmp,
1010 &client_list->list, node) {
1011 free(client_list_element);
1012 }
1013 free(client_list);
1014 }
1015error_free_ht_element:
1016 free(trigger_ht_element);
1017error:
1018 if (free_trigger) {
1019 struct lttng_action *action = lttng_trigger_get_action(trigger);
1020
1021 lttng_condition_destroy(condition);
1022 lttng_action_destroy(action);
1023 lttng_trigger_destroy(trigger);
1024 }
1025 rcu_read_unlock();
1026 return ret;
1027}
1028
cc2295b5 1029static
ab0ee2ca
JG
1030int handle_notification_thread_command_unregister_trigger(
1031 struct notification_thread_state *state,
1032 struct lttng_trigger *trigger,
1033 enum lttng_error_code *_cmd_reply)
1034{
1035 struct cds_lfht_iter iter;
1036 struct cds_lfht_node *node, *triggers_ht_node;
1037 struct lttng_channel_trigger_list *trigger_list;
1038 struct notification_client_list *client_list;
1039 struct notification_client_list_element *client_list_element, *tmp;
1040 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1041 struct lttng_condition *condition = lttng_trigger_get_condition(
1042 trigger);
1043 struct lttng_action *action;
1044 enum lttng_error_code cmd_reply;
1045
1046 rcu_read_lock();
1047
1048 cds_lfht_lookup(state->triggers_ht,
1049 lttng_condition_hash(condition),
1050 match_condition,
1051 condition,
1052 &iter);
1053 triggers_ht_node = cds_lfht_iter_get_node(&iter);
1054 if (!triggers_ht_node) {
1055 cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND;
1056 goto end;
1057 } else {
1058 cmd_reply = LTTNG_OK;
1059 }
1060
1061 /* Remove trigger from channel_triggers_ht. */
1062 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
1063 channel_triggers_ht_node) {
1064 struct lttng_trigger_list_element *trigger_element, *tmp;
1065
1066 cds_list_for_each_entry_safe(trigger_element, tmp,
1067 &trigger_list->list, node) {
1068 struct lttng_condition *current_condition =
1069 lttng_trigger_get_condition(
1070 trigger_element->trigger);
1071
1072 assert(current_condition);
1073 if (!lttng_condition_is_equal(condition,
1074 current_condition)) {
1075 continue;
1076 }
1077
1078 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
1079 cds_list_del(&trigger_element->node);
1080 }
1081 }
1082
1083 /*
1084 * Remove and release the client list from
1085 * notification_trigger_clients_ht.
1086 */
1087 cds_lfht_lookup(state->notification_trigger_clients_ht,
1088 lttng_condition_hash(condition),
1089 match_client_list,
1090 trigger,
1091 &iter);
1092 node = cds_lfht_iter_get_node(&iter);
1093 assert(node);
1094 client_list = caa_container_of(node, struct notification_client_list,
1095 notification_trigger_ht_node);
1096 cds_list_for_each_entry_safe(client_list_element, tmp,
1097 &client_list->list, node) {
1098 free(client_list_element);
1099 }
1100 cds_lfht_del(state->notification_trigger_clients_ht, node);
1101 free(client_list);
1102
1103 /* Remove trigger from triggers_ht. */
1104 trigger_ht_element = caa_container_of(triggers_ht_node,
1105 struct lttng_trigger_ht_element, node);
1106 cds_lfht_del(state->triggers_ht, triggers_ht_node);
1107
1108 condition = lttng_trigger_get_condition(trigger_ht_element->trigger);
1109 lttng_condition_destroy(condition);
1110 action = lttng_trigger_get_action(trigger_ht_element->trigger);
1111 lttng_action_destroy(action);
1112 lttng_trigger_destroy(trigger_ht_element->trigger);
1113 free(trigger_ht_element);
1114end:
1115 rcu_read_unlock();
1116 if (_cmd_reply) {
1117 *_cmd_reply = cmd_reply;
1118 }
1119 return 0;
1120}
1121
1122/* Returns 0 on success, 1 on exit requested, negative value on error. */
1123int handle_notification_thread_command(
1124 struct notification_thread_handle *handle,
1125 struct notification_thread_state *state)
1126{
1127 int ret;
1128 uint64_t counter;
1129 struct notification_thread_command *cmd;
1130
1131 /* Read event_fd to put it back into a quiescent state. */
1132 ret = read(handle->cmd_queue.event_fd, &counter, sizeof(counter));
1133 if (ret == -1) {
1134 goto error;
1135 }
1136
1137 pthread_mutex_lock(&handle->cmd_queue.lock);
1138 cmd = cds_list_first_entry(&handle->cmd_queue.list,
1139 struct notification_thread_command, cmd_list_node);
1140 switch (cmd->type) {
1141 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
1142 DBG("[notification-thread] Received register trigger command");
1143 ret = handle_notification_thread_command_register_trigger(
1144 state, cmd->parameters.trigger,
1145 &cmd->reply_code);
1146 break;
1147 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
1148 DBG("[notification-thread] Received unregister trigger command");
1149 ret = handle_notification_thread_command_unregister_trigger(
1150 state, cmd->parameters.trigger,
1151 &cmd->reply_code);
1152 break;
1153 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
1154 DBG("[notification-thread] Received add channel command");
1155 ret = handle_notification_thread_command_add_channel(
1156 state, &cmd->parameters.add_channel,
1157 &cmd->reply_code);
1158 break;
1159 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
1160 DBG("[notification-thread] Received remove channel command");
1161 ret = handle_notification_thread_command_remove_channel(
1162 state, cmd->parameters.remove_channel.key,
1163 cmd->parameters.remove_channel.domain,
1164 &cmd->reply_code);
1165 break;
1166 case NOTIFICATION_COMMAND_TYPE_QUIT:
1167 DBG("[notification-thread] Received quit command");
1168 cmd->reply_code = LTTNG_OK;
1169 ret = 1;
1170 goto end;
1171 default:
1172 ERR("[notification-thread] Unknown internal command received");
1173 goto error_unlock;
1174 }
1175
1176 if (ret) {
1177 goto error_unlock;
1178 }
1179end:
1180 cds_list_del(&cmd->cmd_list_node);
8ada111f 1181 lttng_waiter_wake_up(&cmd->reply_waiter);
ab0ee2ca
JG
1182 pthread_mutex_unlock(&handle->cmd_queue.lock);
1183 return ret;
1184error_unlock:
1185 /* Wake-up and return a fatal error to the calling thread. */
8ada111f 1186 lttng_waiter_wake_up(&cmd->reply_waiter);
ab0ee2ca
JG
1187 pthread_mutex_unlock(&handle->cmd_queue.lock);
1188 cmd->reply_code = LTTNG_ERR_FATAL;
1189error:
1190 /* Indicate a fatal error to the caller. */
1191 return -1;
1192}
1193
1194static
1195unsigned long hash_client_socket(int socket)
1196{
1197 return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed);
1198}
1199
/*
 * Add O_NONBLOCK to the file status flags of `socket`.
 *
 * Returns the result of the F_SETFL fcntl() on success, -1 if either fcntl()
 * call fails (the error is logged).
 */
static
int socket_set_non_blocking(int socket)
{
	int status;
	int current_flags;

	/* Fetch the current flags so that only O_NONBLOCK is added. */
	current_flags = fcntl(socket, F_GETFL, 0);
	if (current_flags == -1) {
		PERROR("fcntl get socket flags");
		return current_flags;
	}

	status = fcntl(socket, F_SETFL, current_flags | O_NONBLOCK);
	if (status == -1) {
		PERROR("fcntl set O_NONBLOCK socket flag");
		return status;
	}

	DBG("Client socket (fd = %i) set as non-blocking", socket);
	return status;
}
1222
1223static
14fa22f8 1224int client_reset_inbound_state(struct notification_client *client)
ab0ee2ca
JG
1225{
1226 int ret;
1227
1228 ret = lttng_dynamic_buffer_set_size(
1229 &client->communication.inbound.buffer, 0);
1230 assert(!ret);
1231
1232 client->communication.inbound.bytes_to_receive =
1233 sizeof(struct lttng_notification_channel_message);
1234 client->communication.inbound.msg_type =
1235 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN;
ab0ee2ca
JG
1236 LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1);
1237 LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1);
14fa22f8
JG
1238 ret = lttng_dynamic_buffer_set_size(
1239 &client->communication.inbound.buffer,
1240 client->communication.inbound.bytes_to_receive);
1241 return ret;
ab0ee2ca
JG
1242}
1243
1244int handle_notification_thread_client_connect(
1245 struct notification_thread_state *state)
1246{
1247 int ret;
1248 struct notification_client *client;
1249
1250 DBG("[notification-thread] Handling new notification channel client connection");
1251
1252 client = zmalloc(sizeof(*client));
1253 if (!client) {
1254 /* Fatal error. */
1255 ret = -1;
1256 goto error;
1257 }
1258 CDS_INIT_LIST_HEAD(&client->condition_list);
1259 lttng_dynamic_buffer_init(&client->communication.inbound.buffer);
1260 lttng_dynamic_buffer_init(&client->communication.outbound.buffer);
01ea340e 1261 client->communication.inbound.expect_creds = true;
14fa22f8
JG
1262 ret = client_reset_inbound_state(client);
1263 if (ret) {
1264 ERR("[notification-thread] Failed to reset client communication's inbound state");
1265 ret = 0;
1266 goto error;
1267 }
ab0ee2ca
JG
1268
1269 ret = lttcomm_accept_unix_sock(state->notification_channel_socket);
1270 if (ret < 0) {
1271 ERR("[notification-thread] Failed to accept new notification channel client connection");
1272 ret = 0;
1273 goto error;
1274 }
1275
1276 client->socket = ret;
1277
1278 ret = socket_set_non_blocking(client->socket);
1279 if (ret) {
1280 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
1281 goto error;
1282 }
1283
1284 ret = lttcomm_setsockopt_creds_unix_sock(client->socket);
1285 if (ret < 0) {
1286 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
1287 ret = 0;
1288 goto error;
1289 }
1290
1291 ret = lttng_poll_add(&state->events, client->socket,
1292 LPOLLIN | LPOLLERR |
1293 LPOLLHUP | LPOLLRDHUP);
1294 if (ret < 0) {
1295 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
1296 ret = 0;
1297 goto error;
1298 }
1299 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
1300 client->socket);
1301
ab0ee2ca
JG
1302 rcu_read_lock();
1303 cds_lfht_add(state->client_socket_ht,
1304 hash_client_socket(client->socket),
1305 &client->client_socket_ht_node);
1306 rcu_read_unlock();
1307
1308 return ret;
1309error:
1310 notification_client_destroy(client, state);
1311 return ret;
1312}
1313
1314int handle_notification_thread_client_disconnect(
1315 int client_socket,
1316 struct notification_thread_state *state)
1317{
1318 int ret = 0;
1319 struct notification_client *client;
1320
1321 rcu_read_lock();
1322 DBG("[notification-thread] Closing client connection (socket fd = %i)",
1323 client_socket);
1324 client = get_client_from_socket(client_socket, state);
1325 if (!client) {
1326 /* Internal state corruption, fatal error. */
1327 ERR("[notification-thread] Unable to find client (socket fd = %i)",
1328 client_socket);
1329 ret = -1;
1330 goto end;
1331 }
1332
1333 ret = lttng_poll_del(&state->events, client_socket);
1334 if (ret) {
1335 ERR("[notification-thread] Failed to remove client socket from poll set");
1336 }
1337 cds_lfht_del(state->client_socket_ht,
1338 &client->client_socket_ht_node);
1339 notification_client_destroy(client, state);
1340end:
1341 rcu_read_unlock();
1342 return ret;
1343}
1344
1345int handle_notification_thread_client_disconnect_all(
1346 struct notification_thread_state *state)
1347{
1348 struct cds_lfht_iter iter;
1349 struct notification_client *client;
1350 bool error_encoutered = false;
1351
1352 rcu_read_lock();
1353 DBG("[notification-thread] Closing all client connections");
1354 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
1355 client_socket_ht_node) {
1356 int ret;
1357
1358 ret = handle_notification_thread_client_disconnect(
1359 client->socket, state);
1360 if (ret) {
1361 error_encoutered = true;
1362 }
1363 }
1364 rcu_read_unlock();
1365 return error_encoutered ? 1 : 0;
1366}
1367
1368int handle_notification_thread_trigger_unregister_all(
1369 struct notification_thread_state *state)
1370{
4149ace8 1371 bool error_occurred = false;
ab0ee2ca
JG
1372 struct cds_lfht_iter iter;
1373 struct lttng_trigger_ht_element *trigger_ht_element;
1374
1375 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1376 node) {
1377 int ret = handle_notification_thread_command_unregister_trigger(
1378 state, trigger_ht_element->trigger, NULL);
1379 if (ret) {
4149ace8 1380 error_occurred = true;
ab0ee2ca
JG
1381 }
1382 }
4149ace8 1383 return error_occurred ? -1 : 0;
ab0ee2ca
JG
1384}
1385
1386static
1387int client_flush_outgoing_queue(struct notification_client *client,
1388 struct notification_thread_state *state)
1389{
1390 ssize_t ret;
1391 size_t to_send_count;
1392
1393 assert(client->communication.outbound.buffer.size != 0);
1394 to_send_count = client->communication.outbound.buffer.size;
1395 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
1396 client->socket);
1397
1398 ret = lttcomm_send_unix_sock_non_block(client->socket,
1399 client->communication.outbound.buffer.data,
1400 to_send_count);
1401 if ((ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) ||
1402 (ret > 0 && ret < to_send_count)) {
1403 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
1404 client->socket);
1405 to_send_count -= max(ret, 0);
1406
1407 memcpy(client->communication.outbound.buffer.data,
1408 client->communication.outbound.buffer.data +
1409 client->communication.outbound.buffer.size - to_send_count,
1410 to_send_count);
1411 ret = lttng_dynamic_buffer_set_size(
1412 &client->communication.outbound.buffer,
1413 to_send_count);
1414 if (ret) {
1415 goto error;
1416 }
1417
1418 /*
1419 * We want to be notified whenever there is buffer space
1420 * available to send the rest of the payload.
1421 */
1422 ret = lttng_poll_mod(&state->events, client->socket,
1423 CLIENT_POLL_MASK_IN_OUT);
1424 if (ret) {
1425 goto error;
1426 }
1427 } else if (ret < 0) {
1428 /* Generic error, disconnect the client. */
1429 ERR("[notification-thread] Failed to send flush outgoing queue, disconnecting client (socket fd = %i)",
1430 client->socket);
1431 ret = handle_notification_thread_client_disconnect(
1432 client->socket, state);
1433 if (ret) {
1434 goto error;
1435 }
1436 } else {
1437 /* No error and flushed the queue completely. */
1438 ret = lttng_dynamic_buffer_set_size(
1439 &client->communication.outbound.buffer, 0);
1440 if (ret) {
1441 goto error;
1442 }
1443 ret = lttng_poll_mod(&state->events, client->socket,
1444 CLIENT_POLL_MASK_IN);
1445 if (ret) {
1446 goto error;
1447 }
1448
1449 client->communication.outbound.queued_command_reply = false;
1450 client->communication.outbound.dropped_notification = false;
1451 }
1452
1453 return 0;
1454error:
1455 return -1;
1456}
1457
1458static
1459int client_send_command_reply(struct notification_client *client,
1460 struct notification_thread_state *state,
1461 enum lttng_notification_channel_status status)
1462{
1463 int ret;
1464 struct lttng_notification_channel_command_reply reply = {
1465 .status = (int8_t) status,
1466 };
1467 struct lttng_notification_channel_message msg = {
1468 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
1469 .size = sizeof(reply),
1470 };
1471 char buffer[sizeof(msg) + sizeof(reply)];
1472
1473 if (client->communication.outbound.queued_command_reply) {
1474 /* Protocol error. */
1475 goto error;
1476 }
1477
1478 memcpy(buffer, &msg, sizeof(msg));
1479 memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
1480 DBG("[notification-thread] Send command reply (%i)", (int) status);
1481
1482 /* Enqueue buffer to outgoing queue and flush it. */
1483 ret = lttng_dynamic_buffer_append(
1484 &client->communication.outbound.buffer,
1485 buffer, sizeof(buffer));
1486 if (ret) {
1487 goto error;
1488 }
1489
1490 ret = client_flush_outgoing_queue(client, state);
1491 if (ret) {
1492 goto error;
1493 }
1494
1495 if (client->communication.outbound.buffer.size != 0) {
1496 /* Queue could not be emptied. */
1497 client->communication.outbound.queued_command_reply = true;
1498 }
1499
1500 return 0;
1501error:
1502 return -1;
1503}
1504
1505static
1506int client_dispatch_message(struct notification_client *client,
1507 struct notification_thread_state *state)
1508{
1509 int ret = 0;
1510
1511 if (client->communication.inbound.msg_type !=
1512 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE &&
1513 client->communication.inbound.msg_type !=
1514 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN &&
1515 !client->validated) {
1516 WARN("[notification-thread] client attempted a command before handshake");
1517 ret = -1;
1518 goto end;
1519 }
1520
1521 switch (client->communication.inbound.msg_type) {
1522 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN:
1523 {
1524 /*
1525 * Receiving message header. The function will be called again
1526 * once the rest of the message as been received and can be
1527 * interpreted.
1528 */
1529 const struct lttng_notification_channel_message *msg;
1530
1531 assert(sizeof(*msg) ==
1532 client->communication.inbound.buffer.size);
1533 msg = (const struct lttng_notification_channel_message *)
1534 client->communication.inbound.buffer.data;
1535
1536 if (msg->size == 0 || msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
1537 ERR("[notification-thread] Invalid notification channel message: length = %u", msg->size);
1538 ret = -1;
1539 goto end;
1540 }
1541
1542 switch (msg->type) {
1543 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
1544 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
1545 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
1546 break;
1547 default:
1548 ret = -1;
1549 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
1550 goto end;
1551 }
1552
1553 client->communication.inbound.bytes_to_receive = msg->size;
1554 client->communication.inbound.msg_type =
1555 (enum lttng_notification_channel_message_type) msg->type;
ab0ee2ca 1556 ret = lttng_dynamic_buffer_set_size(
14fa22f8 1557 &client->communication.inbound.buffer, msg->size);
ab0ee2ca
JG
1558 if (ret) {
1559 goto end;
1560 }
1561 break;
1562 }
1563 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
1564 {
1565 struct lttng_notification_channel_command_handshake *handshake_client;
1566 struct lttng_notification_channel_command_handshake handshake_reply = {
1567 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
1568 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
1569 };
1570 struct lttng_notification_channel_message msg_header = {
1571 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
1572 .size = sizeof(handshake_reply),
1573 };
1574 enum lttng_notification_channel_status status =
1575 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1576 char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
1577
1578 memcpy(send_buffer, &msg_header, sizeof(msg_header));
1579 memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
1580 sizeof(handshake_reply));
1581
1582 handshake_client =
1583 (struct lttng_notification_channel_command_handshake *)
1584 client->communication.inbound.buffer.data;
47a32869 1585 client->major = handshake_client->major;
ab0ee2ca
JG
1586 client->minor = handshake_client->minor;
1587 if (!client->communication.inbound.creds_received) {
1588 ERR("[notification-thread] No credentials received from client");
1589 ret = -1;
1590 goto end;
1591 }
1592
1593 client->uid = LTTNG_SOCK_GET_UID_CRED(
1594 &client->communication.inbound.creds);
1595 client->gid = LTTNG_SOCK_GET_GID_CRED(
1596 &client->communication.inbound.creds);
1597 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
1598 client->uid, client->gid, (int) client->major,
1599 (int) client->minor);
1600
1601 if (handshake_client->major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
1602 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
1603 }
1604
1605 ret = lttng_dynamic_buffer_append(&client->communication.outbound.buffer,
1606 send_buffer, sizeof(send_buffer));
1607 if (ret) {
1608 ERR("[notification-thread] Failed to send protocol version to notification channel client");
1609 goto end;
1610 }
1611
1612 ret = client_flush_outgoing_queue(client, state);
1613 if (ret) {
1614 goto end;
1615 }
1616
1617 ret = client_send_command_reply(client, state, status);
1618 if (ret) {
1619 ERR("[notification-thread] Failed to send reply to notification channel client");
1620 goto end;
1621 }
1622
1623 /* Set reception state to receive the next message header. */
14fa22f8
JG
1624 ret = client_reset_inbound_state(client);
1625 if (ret) {
1626 ERR("[notification-thread] Failed to reset client communication's inbound state");
1627 goto end;
1628 }
ab0ee2ca
JG
1629 client->validated = true;
1630 break;
1631 }
1632 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
1633 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
1634 {
1635 struct lttng_condition *condition;
1636 enum lttng_notification_channel_status status =
1637 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1638 const struct lttng_buffer_view condition_view =
1639 lttng_buffer_view_from_dynamic_buffer(
1640 &client->communication.inbound.buffer,
1641 0, -1);
1642 size_t expected_condition_size =
1643 client->communication.inbound.buffer.size;
1644
1645 ret = lttng_condition_create_from_buffer(&condition_view,
1646 &condition);
1647 if (ret != expected_condition_size) {
1648 ERR("[notification-thread] Malformed condition received from client");
1649 goto end;
1650 }
1651
1652 if (client->communication.inbound.msg_type ==
1653 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
1654 /*
1655 * FIXME The current state should be evaluated on
1656 * subscription.
1657 */
1658 ret = notification_thread_client_subscribe(client,
1659 condition, state, &status);
1660 } else {
1661 ret = notification_thread_client_unsubscribe(client,
1662 condition, state, &status);
1663 }
1664 if (ret) {
1665 goto end;
1666 }
1667
1668 ret = client_send_command_reply(client, state, status);
1669 if (ret) {
1670 ERR("[notification-thread] Failed to send reply to notification channel client");
1671 goto end;
1672 }
1673
1674 /* Set reception state to receive the next message header. */
14fa22f8
JG
1675 ret = client_reset_inbound_state(client);
1676 if (ret) {
1677 ERR("[notification-thread] Failed to reset client communication's inbound state");
1678 goto end;
1679 }
ab0ee2ca
JG
1680 break;
1681 }
1682 default:
1683 abort();
1684 }
1685end:
1686 return ret;
1687}
1688
1689/* Incoming data from client. */
1690int handle_notification_thread_client_in(
1691 struct notification_thread_state *state, int socket)
1692{
14fa22f8 1693 int ret = 0;
ab0ee2ca
JG
1694 struct notification_client *client;
1695 ssize_t recv_ret;
1696 size_t offset;
1697
1698 client = get_client_from_socket(socket, state);
1699 if (!client) {
1700 /* Internal error, abort. */
1701 ret = -1;
1702 goto end;
1703 }
1704
14fa22f8
JG
1705 offset = client->communication.inbound.buffer.size -
1706 client->communication.inbound.bytes_to_receive;
01ea340e 1707 if (client->communication.inbound.expect_creds) {
ab0ee2ca
JG
1708 recv_ret = lttcomm_recv_creds_unix_sock(socket,
1709 client->communication.inbound.buffer.data + offset,
1710 client->communication.inbound.bytes_to_receive,
1711 &client->communication.inbound.creds);
1712 if (recv_ret > 0) {
01ea340e 1713 client->communication.inbound.expect_creds = false;
ab0ee2ca
JG
1714 client->communication.inbound.creds_received = true;
1715 }
1716 } else {
1717 recv_ret = lttcomm_recv_unix_sock_non_block(socket,
1718 client->communication.inbound.buffer.data + offset,
1719 client->communication.inbound.bytes_to_receive);
1720 }
1721 if (recv_ret < 0) {
1722 goto error_disconnect_client;
1723 }
1724
1725 client->communication.inbound.bytes_to_receive -= recv_ret;
ab0ee2ca
JG
1726 if (client->communication.inbound.bytes_to_receive == 0) {
1727 ret = client_dispatch_message(client, state);
1728 if (ret) {
1729 /*
1730 * Only returns an error if this client must be
1731 * disconnected.
1732 */
1733 goto error_disconnect_client;
1734 }
1735 } else {
1736 goto end;
1737 }
1738end:
1739 return ret;
1740error_disconnect_client:
1741 ret = handle_notification_thread_client_disconnect(socket, state);
1742 return ret;
1743}
1744
/*
 * Client ready to receive outgoing data.
 *
 * Returns -1 if no client is associated with `socket`, otherwise the result
 * of flushing the client's outgoing queue.
 */
int handle_notification_thread_client_out(
		struct notification_thread_state *state, int socket)
{
	struct notification_client *client =
			get_client_from_socket(socket, state);

	if (!client) {
		/* Internal error, abort. */
		return -1;
	}

	return client_flush_outgoing_queue(client, state);
}
1766
1767static
1768bool evaluate_buffer_usage_condition(struct lttng_condition *condition,
1769 struct channel_state_sample *sample, uint64_t buffer_capacity)
1770{
1771 bool result = false;
1772 uint64_t threshold;
1773 enum lttng_condition_type condition_type;
1774 struct lttng_condition_buffer_usage *use_condition = container_of(
1775 condition, struct lttng_condition_buffer_usage,
1776 parent);
1777
1778 if (!sample) {
1779 goto end;
1780 }
1781
1782 if (use_condition->threshold_bytes.set) {
1783 threshold = use_condition->threshold_bytes.value;
1784 } else {
1785 /*
1786 * Threshold was expressed as a ratio.
1787 *
1788 * TODO the threshold (in bytes) of conditions expressed
1789 * as a ratio of total buffer size could be cached to
1790 * forego this double-multiplication or it could be performed
1791 * as fixed-point math.
1792 *
1793 * Note that caching should accomodate the case where the
1794 * condition applies to multiple channels (i.e. don't assume
1795 * that all channels matching my_chann* have the same size...)
1796 */
1797 threshold = (uint64_t) (use_condition->threshold_ratio.value *
1798 (double) buffer_capacity);
1799 }
1800
1801 condition_type = lttng_condition_get_type(condition);
1802 if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
1803 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
1804 threshold, sample->highest_usage);
1805
1806 /*
1807 * The low condition should only be triggered once _all_ of the
1808 * streams in a channel have gone below the "low" threshold.
1809 */
1810 if (sample->highest_usage <= threshold) {
1811 result = true;
1812 }
1813 } else {
1814 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
1815 threshold, sample->highest_usage);
1816
1817 /*
1818 * For high buffer usage scenarios, we want to trigger whenever
1819 * _any_ of the streams has reached the "high" threshold.
1820 */
1821 if (sample->highest_usage >= threshold) {
1822 result = true;
1823 }
1824 }
1825end:
1826 return result;
1827}
1828
1829static
1830int evaluate_condition(struct lttng_condition *condition,
1831 struct lttng_evaluation **evaluation,
1832 struct notification_thread_state *state,
1833 struct channel_state_sample *previous_sample,
1834 struct channel_state_sample *latest_sample,
1835 uint64_t buffer_capacity)
1836{
1837 int ret = 0;
1838 enum lttng_condition_type condition_type;
1839 bool previous_sample_result;
1840 bool latest_sample_result;
1841
1842 condition_type = lttng_condition_get_type(condition);
1843 /* No other condition type supported for the moment. */
1844 assert(condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW ||
1845 condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH);
1846
1847 previous_sample_result = evaluate_buffer_usage_condition(condition,
1848 previous_sample, buffer_capacity);
1849 latest_sample_result = evaluate_buffer_usage_condition(condition,
1850 latest_sample, buffer_capacity);
1851
1852 if (!latest_sample_result ||
1853 (previous_sample_result == latest_sample_result)) {
1854 /*
1855 * Only trigger on a condition evaluation transition.
1856 *
1857 * NOTE: This edge-triggered logic may not be appropriate for
1858 * future condition types.
1859 */
1860 goto end;
1861 }
1862
1863 if (evaluation && latest_sample_result) {
1864 *evaluation = lttng_evaluation_buffer_usage_create(
1865 condition_type,
1866 latest_sample->highest_usage,
1867 buffer_capacity);
1868 if (!*evaluation) {
1869 ret = -1;
1870 goto end;
1871 }
1872 }
1873end:
1874 return ret;
1875}
1876
1877static
1878int client_enqueue_dropped_notification(struct notification_client *client,
1879 struct notification_thread_state *state)
1880{
1881 int ret;
1882 struct lttng_notification_channel_message msg = {
1883 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
1884 .size = 0,
1885 };
1886
1887 ret = lttng_dynamic_buffer_append(
1888 &client->communication.outbound.buffer, &msg,
1889 sizeof(msg));
1890 return ret;
1891}
1892
1893static
1894int send_evaluation_to_clients(struct lttng_trigger *trigger,
1895 struct lttng_evaluation *evaluation,
1896 struct notification_client_list* client_list,
1897 struct notification_thread_state *state,
1898 uid_t channel_uid, gid_t channel_gid)
1899{
1900 int ret = 0;
1901 struct lttng_dynamic_buffer msg_buffer;
1902 struct notification_client_list_element *client_list_element, *tmp;
1903 struct lttng_notification *notification;
1904 struct lttng_condition *condition;
1905 ssize_t expected_notification_size, notification_size;
1906 struct lttng_notification_channel_message msg;
1907
1908 lttng_dynamic_buffer_init(&msg_buffer);
1909
1910 condition = lttng_trigger_get_condition(trigger);
1911 assert(condition);
1912
1913 notification = lttng_notification_create(condition, evaluation);
1914 if (!notification) {
1915 ret = -1;
1916 goto end;
1917 }
1918
1919 expected_notification_size = lttng_notification_serialize(notification,
1920 NULL);
1921 if (expected_notification_size < 0) {
1922 ERR("[notification-thread] Failed to get size of serialized notification");
1923 ret = -1;
1924 goto end;
1925 }
1926
1927 msg.type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION;
1928 msg.size = (uint32_t) expected_notification_size;
1929 ret = lttng_dynamic_buffer_append(&msg_buffer, &msg, sizeof(msg));
1930 if (ret) {
1931 goto end;
1932 }
1933
1934 ret = lttng_dynamic_buffer_set_size(&msg_buffer,
1935 msg_buffer.size + expected_notification_size);
1936 if (ret) {
1937 goto end;
1938 }
1939
1940 notification_size = lttng_notification_serialize(notification,
1941 msg_buffer.data + sizeof(msg));
1942 if (notification_size != expected_notification_size) {
1943 ERR("[notification-thread] Failed to serialize notification");
1944 ret = -1;
1945 goto end;
1946 }
1947
1948 cds_list_for_each_entry_safe(client_list_element, tmp,
1949 &client_list->list, node) {
1950 struct notification_client *client =
1951 client_list_element->client;
1952
1953 if (client->uid != channel_uid && client->gid != channel_gid &&
1954 client->uid != 0) {
1955 /* Client is not allowed to monitor this channel. */
1956 DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this channel");
1957 continue;
1958 }
1959
1960 DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
1961 client->socket, msg_buffer.size);
1962 if (client->communication.outbound.buffer.size) {
1963 /*
1964 * Outgoing data is already buffered for this client;
1965 * drop the notification and enqueue a "dropped
1966 * notification" message if this is the first dropped
1967 * notification since the socket spilled-over to the
1968 * queue.
1969 */
1970 DBG("[notification-thread] Dropping notification addressed to client (socket fd = %i)",
1971 client->socket);
1972 if (!client->communication.outbound.dropped_notification) {
1973 client->communication.outbound.dropped_notification = true;
1974 ret = client_enqueue_dropped_notification(
1975 client, state);
1976 if (ret) {
1977 goto end;
1978 }
1979 }
1980 continue;
1981 }
1982
1983 ret = lttng_dynamic_buffer_append_buffer(
1984 &client->communication.outbound.buffer,
1985 &msg_buffer);
1986 if (ret) {
1987 goto end;
1988 }
1989
1990 ret = client_flush_outgoing_queue(client, state);
1991 if (ret) {
1992 goto end;
1993 }
1994 }
1995 ret = 0;
1996end:
1997 lttng_notification_destroy(notification);
1998 lttng_dynamic_buffer_reset(&msg_buffer);
1999 return ret;
2000}
2001
2002int handle_notification_thread_channel_sample(
2003 struct notification_thread_state *state, int pipe,
2004 enum lttng_domain_type domain)
2005{
2006 int ret = 0;
2007 struct lttcomm_consumer_channel_monitor_msg sample_msg;
2008 struct channel_state_sample previous_sample, latest_sample;
2009 struct channel_info *channel_info;
2010 struct cds_lfht_node *node;
2011 struct cds_lfht_iter iter;
2012 struct lttng_channel_trigger_list *trigger_list;
2013 struct lttng_trigger_list_element *trigger_list_element;
2014 bool previous_sample_available = false;
2015
2016 /*
2017 * The monitoring pipe only holds messages smaller than PIPE_BUF,
2018 * ensuring that read/write of sampling messages are atomic.
2019 */
7c2551ef 2020 ret = lttng_read(pipe, &sample_msg, sizeof(sample_msg));
ab0ee2ca
JG
2021 if (ret != sizeof(sample_msg)) {
2022 ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
2023 pipe);
2024 ret = -1;
2025 goto end;
2026 }
2027
2028 ret = 0;
2029 latest_sample.key.key = sample_msg.key;
2030 latest_sample.key.domain = domain;
2031 latest_sample.highest_usage = sample_msg.highest;
2032 latest_sample.lowest_usage = sample_msg.lowest;
2033
2034 rcu_read_lock();
2035
2036 /* Retrieve the channel's informations */
2037 cds_lfht_lookup(state->channels_ht,
2038 hash_channel_key(&latest_sample.key),
2039 match_channel_info,
2040 &latest_sample.key,
2041 &iter);
2042 node = cds_lfht_iter_get_node(&iter);
2043 if (!node) {
2044 /*
2045 * Not an error since the consumer can push a sample to the pipe
2046 * and the rest of the session daemon could notify us of the
2047 * channel's destruction before we get a chance to process that
2048 * sample.
2049 */
2050 DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
2051 latest_sample.key.key,
2052 domain == LTTNG_DOMAIN_KERNEL ? "kernel" :
2053 "user space");
2054 goto end_unlock;
2055 }
2056 channel_info = caa_container_of(node, struct channel_info,
2057 channels_ht_node);
2058 DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64")",
2059 channel_info->channel_name,
2060 latest_sample.key.key,
2061 channel_info->session_name,
2062 latest_sample.highest_usage,
2063 latest_sample.lowest_usage);
2064
2065 /* Retrieve the channel's last sample, if it exists, and update it. */
2066 cds_lfht_lookup(state->channel_state_ht,
2067 hash_channel_key(&latest_sample.key),
2068 match_channel_state_sample,
2069 &latest_sample.key,
2070 &iter);
2071 node = cds_lfht_iter_get_node(&iter);
2072 if (node) {
2073 struct channel_state_sample *stored_sample;
2074
2075 /* Update the sample stored. */
2076 stored_sample = caa_container_of(node,
2077 struct channel_state_sample,
2078 channel_state_ht_node);
2079 memcpy(&previous_sample, stored_sample,
2080 sizeof(previous_sample));
2081 stored_sample->highest_usage = latest_sample.highest_usage;
2082 stored_sample->lowest_usage = latest_sample.lowest_usage;
2083 previous_sample_available = true;
2084 } else {
2085 /*
2086 * This is the channel's first sample, allocate space for and
2087 * store the new sample.
2088 */
2089 struct channel_state_sample *stored_sample;
2090
2091 stored_sample = zmalloc(sizeof(*stored_sample));
2092 if (!stored_sample) {
2093 ret = -1;
2094 goto end_unlock;
2095 }
2096
2097 memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
2098 cds_lfht_node_init(&stored_sample->channel_state_ht_node);
2099 cds_lfht_add(state->channel_state_ht,
2100 hash_channel_key(&stored_sample->key),
2101 &stored_sample->channel_state_ht_node);
2102 }
2103
2104 /* Find triggers associated with this channel. */
2105 cds_lfht_lookup(state->channel_triggers_ht,
2106 hash_channel_key(&latest_sample.key),
2107 match_channel_trigger_list,
2108 &latest_sample.key,
2109 &iter);
2110 node = cds_lfht_iter_get_node(&iter);
2111 if (!node) {
2112 goto end_unlock;
2113 }
2114
2115 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
2116 channel_triggers_ht_node);
2117 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
2118 node) {
2119 struct lttng_condition *condition;
2120 struct lttng_action *action;
2121 struct lttng_trigger *trigger;
2122 struct notification_client_list *client_list;
2123 struct lttng_evaluation *evaluation = NULL;
2124
2125 trigger = trigger_list_element->trigger;
2126 condition = lttng_trigger_get_condition(trigger);
2127 assert(condition);
2128 action = lttng_trigger_get_action(trigger);
2129
2130 /* Notify actions are the only type currently supported. */
2131 assert(lttng_action_get_type(action) ==
2132 LTTNG_ACTION_TYPE_NOTIFY);
2133
2134 /*
2135 * Check if any client is subscribed to the result of this
2136 * evaluation.
2137 */
2138 cds_lfht_lookup(state->notification_trigger_clients_ht,
2139 lttng_condition_hash(condition),
2140 match_client_list,
2141 trigger,
2142 &iter);
2143 node = cds_lfht_iter_get_node(&iter);
2144 assert(node);
2145
2146 client_list = caa_container_of(node,
2147 struct notification_client_list,
2148 notification_trigger_ht_node);
2149 if (cds_list_empty(&client_list->list)) {
2150 /*
2151 * No clients interested in the evaluation's result,
2152 * skip it.
2153 */
2154 continue;
2155 }
2156
2157 ret = evaluate_condition(condition, &evaluation, state,
2158 previous_sample_available ? &previous_sample : NULL,
2159 &latest_sample, channel_info->capacity);
2160 if (ret) {
2161 goto end_unlock;
2162 }
2163
2164 if (!evaluation) {
2165 continue;
2166 }
2167
2168 /* Dispatch evaluation result to all clients. */
2169 ret = send_evaluation_to_clients(trigger_list_element->trigger,
2170 evaluation, client_list, state,
2171 channel_info->uid, channel_info->gid);
2172 if (ret) {
2173 goto end_unlock;
2174 }
2175 }
2176end_unlock:
2177 rcu_read_unlock();
2178end:
2179 return ret;
2180}
This page took 0.110156 seconds and 5 git commands to generate.