Add a comment clarifying the ownership of triggers
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.c
... / ...
CommitLineData
1/*
2 * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18#define _LGPL_SOURCE
19#include <urcu.h>
20#include <urcu/rculfhash.h>
21
22#include <common/defaults.h>
23#include <common/error.h>
24#include <common/futex.h>
25#include <common/unix.h>
26#include <common/dynamic-buffer.h>
27#include <common/hashtable/utils.h>
28#include <common/sessiond-comm/sessiond-comm.h>
29#include <common/macros.h>
30#include <lttng/condition/condition.h>
31#include <lttng/action/action-internal.h>
32#include <lttng/notification/notification-internal.h>
33#include <lttng/condition/condition-internal.h>
34#include <lttng/condition/buffer-usage-internal.h>
35#include <lttng/condition/session-consumed-size-internal.h>
36#include <lttng/notification/channel-internal.h>
37
38#include <time.h>
39#include <unistd.h>
40#include <assert.h>
41#include <inttypes.h>
42#include <fcntl.h>
43
44#include "notification-thread.h"
45#include "notification-thread-events.h"
46#include "notification-thread-commands.h"
47#include "lttng-sessiond.h"
48#include "kernel.h"
49
/* Poll events watched on a client socket for the inbound direction. */
#define CLIENT_POLL_MASK_IN \
	(LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
/* Inbound events plus LPOLLOUT. */
#define CLIENT_POLL_MASK_IN_OUT \
	(CLIENT_POLL_MASK_IN | LPOLLOUT)
52
53struct lttng_trigger_list_element {
54 /* No ownership of the trigger object is assumed. */
55 const struct lttng_trigger *trigger;
56 struct cds_list_head node;
57};
58
59struct lttng_channel_trigger_list {
60 struct channel_key channel_key;
61 struct cds_list_head list;
62 struct cds_lfht_node channel_triggers_ht_node;
63};
64
65struct lttng_trigger_ht_element {
66 struct lttng_trigger *trigger;
67 struct cds_lfht_node node;
68};
69
70struct lttng_condition_list_element {
71 struct lttng_condition *condition;
72 struct cds_list_head node;
73};
74
75struct notification_client_list_element {
76 struct notification_client *client;
77 struct cds_list_head node;
78};
79
80struct notification_client_list {
81 struct lttng_trigger *trigger;
82 struct cds_list_head list;
83 struct cds_lfht_node notification_trigger_ht_node;
84};
85
86struct notification_client {
87 int socket;
88 /* Client protocol version. */
89 uint8_t major, minor;
90 uid_t uid;
91 gid_t gid;
92 /*
93 * Indicates if the credentials and versions of the client have been
94 * checked.
95 */
96 bool validated;
97 /*
98 * Conditions to which the client's notification channel is subscribed.
99 * List of struct lttng_condition_list_node. The condition member is
100 * owned by the client.
101 */
102 struct cds_list_head condition_list;
103 struct cds_lfht_node client_socket_ht_node;
104 struct {
105 struct {
106 /*
107 * During the reception of a message, the reception
108 * buffers' "size" is set to contain the current
109 * message's complete payload.
110 */
111 struct lttng_dynamic_buffer buffer;
112 /* Bytes left to receive for the current message. */
113 size_t bytes_to_receive;
114 /* Type of the message being received. */
115 enum lttng_notification_channel_message_type msg_type;
116 /*
117 * Indicates whether or not credentials are expected
118 * from the client.
119 */
120 bool expect_creds;
121 /*
122 * Indicates whether or not credentials were received
123 * from the client.
124 */
125 bool creds_received;
126 /* Only used during credentials reception. */
127 lttng_sock_cred creds;
128 } inbound;
129 struct {
130 /*
131 * Indicates whether or not a notification addressed to
132 * this client was dropped because a command reply was
133 * already buffered.
134 *
135 * A notification is dropped whenever the buffer is not
136 * empty.
137 */
138 bool dropped_notification;
139 /*
140 * Indicates whether or not a command reply is already
141 * buffered. In this case, it means that the client is
142 * not consuming command replies before emitting a new
143 * one. This could be caused by a protocol error or a
144 * misbehaving/malicious client.
145 */
146 bool queued_command_reply;
147 struct lttng_dynamic_buffer buffer;
148 } outbound;
149 } communication;
150};
151
152struct channel_state_sample {
153 struct channel_key key;
154 struct cds_lfht_node channel_state_ht_node;
155 uint64_t highest_usage;
156 uint64_t lowest_usage;
157 uint64_t channel_total_consumed;
158};
159
/* Forward declarations of helpers used before their definition. */
static unsigned long hash_channel_key(struct channel_key *key);

static int evaluate_condition(const struct lttng_condition *condition,
		struct lttng_evaluation **evaluation,
		const struct notification_thread_state *state,
		const struct channel_state_sample *previous_sample,
		const struct channel_state_sample *latest_sample,
		uint64_t previous_session_consumed_total,
		uint64_t latest_session_consumed_total,
		struct channel_info *channel_info);

static int send_evaluation_to_clients(const struct lttng_trigger *trigger,
		const struct lttng_evaluation *evaluation,
		struct notification_client_list *client_list,
		struct notification_thread_state *state,
		uid_t channel_uid, gid_t channel_gid);

/* Session information life-cycle (reference counted). */
static void session_info_destroy(void *_data);
static void session_info_get(struct session_info *session_info);
static void session_info_put(struct session_info *session_info);
static struct session_info *session_info_create(const char *name,
		uid_t uid, gid_t gid);

/* Tracking of the channels that belong to a session. */
static void session_info_add_channel(struct session_info *session_info,
		struct channel_info *channel_info);
static void session_info_remove_channel(struct session_info *session_info,
		struct channel_info *channel_info);
192
193static
194int match_client(struct cds_lfht_node *node, const void *key)
195{
196 /* This double-cast is intended to supress pointer-to-cast warning. */
197 int socket = (int) (intptr_t) key;
198 struct notification_client *client;
199
200 client = caa_container_of(node, struct notification_client,
201 client_socket_ht_node);
202
203 return !!(client->socket == socket);
204}
205
206static
207int match_channel_trigger_list(struct cds_lfht_node *node, const void *key)
208{
209 struct channel_key *channel_key = (struct channel_key *) key;
210 struct lttng_channel_trigger_list *trigger_list;
211
212 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
213 channel_triggers_ht_node);
214
215 return !!((channel_key->key == trigger_list->channel_key.key) &&
216 (channel_key->domain == trigger_list->channel_key.domain));
217}
218
219static
220int match_channel_state_sample(struct cds_lfht_node *node, const void *key)
221{
222 struct channel_key *channel_key = (struct channel_key *) key;
223 struct channel_state_sample *sample;
224
225 sample = caa_container_of(node, struct channel_state_sample,
226 channel_state_ht_node);
227
228 return !!((channel_key->key == sample->key.key) &&
229 (channel_key->domain == sample->key.domain));
230}
231
232static
233int match_channel_info(struct cds_lfht_node *node, const void *key)
234{
235 struct channel_key *channel_key = (struct channel_key *) key;
236 struct channel_info *channel_info;
237
238 channel_info = caa_container_of(node, struct channel_info,
239 channels_ht_node);
240
241 return !!((channel_key->key == channel_info->key.key) &&
242 (channel_key->domain == channel_info->key.domain));
243}
244
245static
246int match_condition(struct cds_lfht_node *node, const void *key)
247{
248 struct lttng_condition *condition_key = (struct lttng_condition *) key;
249 struct lttng_trigger_ht_element *trigger;
250 struct lttng_condition *condition;
251
252 trigger = caa_container_of(node, struct lttng_trigger_ht_element,
253 node);
254 condition = lttng_trigger_get_condition(trigger->trigger);
255 assert(condition);
256
257 return !!lttng_condition_is_equal(condition_key, condition);
258}
259
260static
261int match_client_list(struct cds_lfht_node *node, const void *key)
262{
263 struct lttng_trigger *trigger_key = (struct lttng_trigger *) key;
264 struct notification_client_list *client_list;
265 struct lttng_condition *condition;
266 struct lttng_condition *condition_key = lttng_trigger_get_condition(
267 trigger_key);
268
269 assert(condition_key);
270
271 client_list = caa_container_of(node, struct notification_client_list,
272 notification_trigger_ht_node);
273 condition = lttng_trigger_get_condition(client_list->trigger);
274
275 return !!lttng_condition_is_equal(condition_key, condition);
276}
277
278static
279int match_client_list_condition(struct cds_lfht_node *node, const void *key)
280{
281 struct lttng_condition *condition_key = (struct lttng_condition *) key;
282 struct notification_client_list *client_list;
283 struct lttng_condition *condition;
284
285 assert(condition_key);
286
287 client_list = caa_container_of(node, struct notification_client_list,
288 notification_trigger_ht_node);
289 condition = lttng_trigger_get_condition(client_list->trigger);
290
291 return !!lttng_condition_is_equal(condition_key, condition);
292}
293
294static
295unsigned long lttng_condition_buffer_usage_hash(
296 const struct lttng_condition *_condition)
297{
298 unsigned long hash;
299 unsigned long condition_type;
300 struct lttng_condition_buffer_usage *condition;
301
302 condition = container_of(_condition,
303 struct lttng_condition_buffer_usage, parent);
304
305 condition_type = (unsigned long) condition->parent.type;
306 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
307 if (condition->session_name) {
308 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
309 }
310 if (condition->channel_name) {
311 hash ^= hash_key_str(condition->channel_name, lttng_ht_seed);
312 }
313 if (condition->domain.set) {
314 hash ^= hash_key_ulong(
315 (void *) condition->domain.type,
316 lttng_ht_seed);
317 }
318 if (condition->threshold_ratio.set) {
319 uint64_t val;
320
321 val = condition->threshold_ratio.value * (double) UINT32_MAX;
322 hash ^= hash_key_u64(&val, lttng_ht_seed);
323 } else if (condition->threshold_bytes.set) {
324 uint64_t val;
325
326 val = condition->threshold_bytes.value;
327 hash ^= hash_key_u64(&val, lttng_ht_seed);
328 }
329 return hash;
330}
331
332static
333unsigned long lttng_condition_session_consumed_size_hash(
334 const struct lttng_condition *_condition)
335{
336 unsigned long hash;
337 unsigned long condition_type =
338 (unsigned long) LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE;
339 struct lttng_condition_session_consumed_size *condition;
340 uint64_t val;
341
342 condition = container_of(_condition,
343 struct lttng_condition_session_consumed_size, parent);
344
345 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
346 if (condition->session_name) {
347 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
348 }
349 val = condition->consumed_threshold_bytes.value;
350 hash ^= hash_key_u64(&val, lttng_ht_seed);
351 return hash;
352}
353
354
355/*
356 * The lttng_condition hashing code is kept in this file (rather than
357 * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
358 * don't want to link in liblttng-ctl.
359 */
360static
361unsigned long lttng_condition_hash(const struct lttng_condition *condition)
362{
363 switch (condition->type) {
364 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
365 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
366 return lttng_condition_buffer_usage_hash(condition);
367 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
368 return lttng_condition_session_consumed_size_hash(condition);
369 default:
370 ERR("[notification-thread] Unexpected condition type caught");
371 abort();
372 }
373}
374
375static
376unsigned long hash_channel_key(struct channel_key *key)
377{
378 unsigned long key_hash = hash_key_u64(&key->key, lttng_ht_seed);
379 unsigned long domain_hash = hash_key_ulong(
380 (void *) (unsigned long) key->domain, lttng_ht_seed);
381
382 return key_hash ^ domain_hash;
383}
384
385static
386void channel_info_destroy(struct channel_info *channel_info)
387{
388 if (!channel_info) {
389 return;
390 }
391
392 if (channel_info->session_info) {
393 session_info_remove_channel(channel_info->session_info,
394 channel_info);
395 session_info_put(channel_info->session_info);
396 }
397 if (channel_info->name) {
398 free(channel_info->name);
399 }
400 free(channel_info);
401}
402
403/* Don't call directly, use the ref-counting mechanism. */
404static
405void session_info_destroy(void *_data)
406{
407 struct session_info *session_info = _data;
408 int ret;
409
410 assert(session_info);
411 if (session_info->channel_infos_ht) {
412 ret = cds_lfht_destroy(session_info->channel_infos_ht, NULL);
413 if (ret) {
414 ERR("[notification-thread] Failed to destroy channel information hash table");
415 }
416 }
417 free(session_info->name);
418 free(session_info);
419}
420
421static
422void session_info_get(struct session_info *session_info)
423{
424 if (!session_info) {
425 return;
426 }
427 lttng_ref_get(&session_info->ref);
428}
429
430static
431void session_info_put(struct session_info *session_info)
432{
433 if (!session_info) {
434 return;
435 }
436 lttng_ref_put(&session_info->ref);
437}
438
439static
440struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid)
441{
442 struct session_info *session_info;
443
444 assert(name);
445
446 session_info = zmalloc(sizeof(*session_info));
447 if (!session_info) {
448 goto end;
449 }
450 lttng_ref_init(&session_info->ref, session_info_destroy);
451
452 session_info->channel_infos_ht = cds_lfht_new(DEFAULT_HT_SIZE,
453 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
454 if (!session_info->channel_infos_ht) {
455 goto error;
456 }
457
458 cds_lfht_node_init(&session_info->sessions_ht_node);
459 session_info->name = strdup(name);
460 if (!session_info->name) {
461 goto error;
462 }
463 session_info->uid = uid;
464 session_info->gid = gid;
465end:
466 return session_info;
467error:
468 session_info_put(session_info);
469 return NULL;
470}
471
472static
473void session_info_add_channel(struct session_info *session_info,
474 struct channel_info *channel_info)
475{
476 rcu_read_lock();
477 cds_lfht_add(session_info->channel_infos_ht,
478 hash_channel_key(&channel_info->key),
479 &channel_info->session_info_channels_ht_node);
480 rcu_read_unlock();
481}
482
483static
484void session_info_remove_channel(struct session_info *session_info,
485 struct channel_info *channel_info)
486{
487 rcu_read_lock();
488 cds_lfht_del(session_info->channel_infos_ht,
489 &channel_info->session_info_channels_ht_node);
490 rcu_read_unlock();
491}
492
493static
494struct channel_info *channel_info_create(const char *channel_name,
495 struct channel_key *channel_key, uint64_t channel_capacity,
496 struct session_info *session_info)
497{
498 struct channel_info *channel_info = zmalloc(sizeof(*channel_info));
499
500 if (!channel_info) {
501 goto end;
502 }
503
504 cds_lfht_node_init(&channel_info->channels_ht_node);
505 cds_lfht_node_init(&channel_info->session_info_channels_ht_node);
506 memcpy(&channel_info->key, channel_key, sizeof(*channel_key));
507 channel_info->capacity = channel_capacity;
508
509 channel_info->name = strdup(channel_name);
510 if (!channel_info->name) {
511 goto error;
512 }
513
514 /*
515 * Set the references between session and channel infos:
516 * - channel_info holds a strong reference to session_info
517 * - session_info holds a weak reference to channel_info
518 */
519 session_info_get(session_info);
520 session_info_add_channel(session_info, channel_info);
521 channel_info->session_info = session_info;
522end:
523 return channel_info;
524error:
525 channel_info_destroy(channel_info);
526 return NULL;
527}
528
529/* This function must be called with the RCU read lock held. */
530static
531int evaluate_condition_for_client(struct lttng_trigger *trigger,
532 struct lttng_condition *condition,
533 struct notification_client *client,
534 struct notification_thread_state *state)
535{
536 int ret;
537 struct cds_lfht_iter iter;
538 struct cds_lfht_node *node;
539 struct channel_info *channel_info = NULL;
540 struct channel_key *channel_key = NULL;
541 struct channel_state_sample *last_sample = NULL;
542 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
543 struct lttng_evaluation *evaluation = NULL;
544 struct notification_client_list client_list = { 0 };
545 struct notification_client_list_element client_list_element = { 0 };
546
547 assert(trigger);
548 assert(condition);
549 assert(client);
550 assert(state);
551
552 /* Find the channel associated with the trigger. */
553 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter,
554 channel_trigger_list , channel_triggers_ht_node) {
555 struct lttng_trigger_list_element *element;
556
557 cds_list_for_each_entry(element, &channel_trigger_list->list, node) {
558 const struct lttng_condition *current_condition =
559 lttng_trigger_get_const_condition(
560 element->trigger);
561
562 assert(current_condition);
563 if (!lttng_condition_is_equal(condition,
564 current_condition)) {
565 continue;
566 }
567
568 /* Found the trigger, save the channel key. */
569 channel_key = &channel_trigger_list->channel_key;
570 break;
571 }
572 if (channel_key) {
573 /* The channel key was found stop iteration. */
574 break;
575 }
576 }
577
578 if (!channel_key){
579 /* No channel found; normal exit. */
580 DBG("[notification-thread] No channel associated with newly subscribed-to condition");
581 ret = 0;
582 goto end;
583 }
584
585 /* Fetch channel info for the matching channel. */
586 cds_lfht_lookup(state->channels_ht,
587 hash_channel_key(channel_key),
588 match_channel_info,
589 channel_key,
590 &iter);
591 node = cds_lfht_iter_get_node(&iter);
592 assert(node);
593 channel_info = caa_container_of(node, struct channel_info,
594 channels_ht_node);
595
596 /* Retrieve the channel's last sample, if it exists. */
597 cds_lfht_lookup(state->channel_state_ht,
598 hash_channel_key(channel_key),
599 match_channel_state_sample,
600 channel_key,
601 &iter);
602 node = cds_lfht_iter_get_node(&iter);
603 if (node) {
604 last_sample = caa_container_of(node,
605 struct channel_state_sample,
606 channel_state_ht_node);
607 } else {
608 /* Nothing to evaluate, no sample was ever taken. Normal exit */
609 DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
610 ret = 0;
611 goto end;
612 }
613
614 ret = evaluate_condition(condition, &evaluation, state,
615 NULL, last_sample,
616 0, channel_info->session_info->consumed_data_size,
617 channel_info);
618 if (ret) {
619 WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
620 goto end;
621 }
622
623 if (!evaluation) {
624 /* Evaluation yielded nothing. Normal exit. */
625 DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
626 ret = 0;
627 goto end;
628 }
629
630 /*
631 * Create a temporary client list with the client currently
632 * subscribing.
633 */
634 cds_lfht_node_init(&client_list.notification_trigger_ht_node);
635 CDS_INIT_LIST_HEAD(&client_list.list);
636 client_list.trigger = trigger;
637
638 CDS_INIT_LIST_HEAD(&client_list_element.node);
639 client_list_element.client = client;
640 cds_list_add(&client_list_element.node, &client_list.list);
641
642 /* Send evaluation result to the newly-subscribed client. */
643 DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
644 ret = send_evaluation_to_clients(trigger, evaluation, &client_list,
645 state, channel_info->session_info->uid,
646 channel_info->session_info->gid);
647
648end:
649 return ret;
650}
651
652static
653int notification_thread_client_subscribe(struct notification_client *client,
654 struct lttng_condition *condition,
655 struct notification_thread_state *state,
656 enum lttng_notification_channel_status *_status)
657{
658 int ret = 0;
659 struct cds_lfht_iter iter;
660 struct cds_lfht_node *node;
661 struct notification_client_list *client_list;
662 struct lttng_condition_list_element *condition_list_element = NULL;
663 struct notification_client_list_element *client_list_element = NULL;
664 enum lttng_notification_channel_status status =
665 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
666
667 /*
668 * Ensure that the client has not already subscribed to this condition
669 * before.
670 */
671 cds_list_for_each_entry(condition_list_element, &client->condition_list, node) {
672 if (lttng_condition_is_equal(condition_list_element->condition,
673 condition)) {
674 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED;
675 goto end;
676 }
677 }
678
679 condition_list_element = zmalloc(sizeof(*condition_list_element));
680 if (!condition_list_element) {
681 ret = -1;
682 goto error;
683 }
684 client_list_element = zmalloc(sizeof(*client_list_element));
685 if (!client_list_element) {
686 ret = -1;
687 goto error;
688 }
689
690 rcu_read_lock();
691
692 /*
693 * Add the newly-subscribed condition to the client's subscription list.
694 */
695 CDS_INIT_LIST_HEAD(&condition_list_element->node);
696 condition_list_element->condition = condition;
697 cds_list_add(&condition_list_element->node, &client->condition_list);
698
699 cds_lfht_lookup(state->notification_trigger_clients_ht,
700 lttng_condition_hash(condition),
701 match_client_list_condition,
702 condition,
703 &iter);
704 node = cds_lfht_iter_get_node(&iter);
705 if (!node) {
706 /*
707 * No notification-emiting trigger registered with this
708 * condition. We don't evaluate the condition right away
709 * since this trigger is not registered yet.
710 */
711 free(client_list_element);
712 goto end_unlock;
713 }
714
715 client_list = caa_container_of(node, struct notification_client_list,
716 notification_trigger_ht_node);
717 /*
718 * The condition to which the client just subscribed is evaluated
719 * at this point so that conditions that are already TRUE result
720 * in a notification being sent out.
721 */
722 if (evaluate_condition_for_client(client_list->trigger, condition,
723 client, state)) {
724 WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
725 ret = -1;
726 free(client_list_element);
727 goto end_unlock;
728 }
729
730 /*
731 * Add the client to the list of clients interested in a given trigger
732 * if a "notification" trigger with a corresponding condition was
733 * added prior.
734 */
735 client_list_element->client = client;
736 CDS_INIT_LIST_HEAD(&client_list_element->node);
737 cds_list_add(&client_list_element->node, &client_list->list);
738end_unlock:
739 rcu_read_unlock();
740end:
741 if (_status) {
742 *_status = status;
743 }
744 return ret;
745error:
746 free(condition_list_element);
747 free(client_list_element);
748 return ret;
749}
750
751static
752int notification_thread_client_unsubscribe(
753 struct notification_client *client,
754 struct lttng_condition *condition,
755 struct notification_thread_state *state,
756 enum lttng_notification_channel_status *_status)
757{
758 struct cds_lfht_iter iter;
759 struct cds_lfht_node *node;
760 struct notification_client_list *client_list;
761 struct lttng_condition_list_element *condition_list_element,
762 *condition_tmp;
763 struct notification_client_list_element *client_list_element,
764 *client_tmp;
765 bool condition_found = false;
766 enum lttng_notification_channel_status status =
767 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
768
769 /* Remove the condition from the client's condition list. */
770 cds_list_for_each_entry_safe(condition_list_element, condition_tmp,
771 &client->condition_list, node) {
772 if (!lttng_condition_is_equal(condition_list_element->condition,
773 condition)) {
774 continue;
775 }
776
777 cds_list_del(&condition_list_element->node);
778 /*
779 * The caller may be iterating on the client's conditions to
780 * tear down a client's connection. In this case, the condition
781 * will be destroyed at the end.
782 */
783 if (condition != condition_list_element->condition) {
784 lttng_condition_destroy(
785 condition_list_element->condition);
786 }
787 free(condition_list_element);
788 condition_found = true;
789 break;
790 }
791
792 if (!condition_found) {
793 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION;
794 goto end;
795 }
796
797 /*
798 * Remove the client from the list of clients interested the trigger
799 * matching the condition.
800 */
801 rcu_read_lock();
802 cds_lfht_lookup(state->notification_trigger_clients_ht,
803 lttng_condition_hash(condition),
804 match_client_list_condition,
805 condition,
806 &iter);
807 node = cds_lfht_iter_get_node(&iter);
808 if (!node) {
809 goto end_unlock;
810 }
811
812 client_list = caa_container_of(node, struct notification_client_list,
813 notification_trigger_ht_node);
814 cds_list_for_each_entry_safe(client_list_element, client_tmp,
815 &client_list->list, node) {
816 if (client_list_element->client->socket != client->socket) {
817 continue;
818 }
819 cds_list_del(&client_list_element->node);
820 free(client_list_element);
821 break;
822 }
823end_unlock:
824 rcu_read_unlock();
825end:
826 lttng_condition_destroy(condition);
827 if (_status) {
828 *_status = status;
829 }
830 return 0;
831}
832
833static
834void notification_client_destroy(struct notification_client *client,
835 struct notification_thread_state *state)
836{
837 struct lttng_condition_list_element *condition_list_element, *tmp;
838
839 if (!client) {
840 return;
841 }
842
843 /* Release all conditions to which the client was subscribed. */
844 cds_list_for_each_entry_safe(condition_list_element, tmp,
845 &client->condition_list, node) {
846 (void) notification_thread_client_unsubscribe(client,
847 condition_list_element->condition, state, NULL);
848 }
849
850 if (client->socket >= 0) {
851 (void) lttcomm_close_unix_sock(client->socket);
852 }
853 lttng_dynamic_buffer_reset(&client->communication.inbound.buffer);
854 lttng_dynamic_buffer_reset(&client->communication.outbound.buffer);
855 free(client);
856}
857
858/*
859 * Call with rcu_read_lock held (and hold for the lifetime of the returned
860 * client pointer).
861 */
862static
863struct notification_client *get_client_from_socket(int socket,
864 struct notification_thread_state *state)
865{
866 struct cds_lfht_iter iter;
867 struct cds_lfht_node *node;
868 struct notification_client *client = NULL;
869
870 cds_lfht_lookup(state->client_socket_ht,
871 hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed),
872 match_client,
873 (void *) (unsigned long) socket,
874 &iter);
875 node = cds_lfht_iter_get_node(&iter);
876 if (!node) {
877 goto end;
878 }
879
880 client = caa_container_of(node, struct notification_client,
881 client_socket_ht_node);
882end:
883 return client;
884}
885
886static
887bool buffer_usage_condition_applies_to_channel(
888 struct lttng_condition *condition,
889 struct channel_info *channel_info)
890{
891 enum lttng_condition_status status;
892 enum lttng_domain_type condition_domain;
893 const char *condition_session_name = NULL;
894 const char *condition_channel_name = NULL;
895
896 status = lttng_condition_buffer_usage_get_domain_type(condition,
897 &condition_domain);
898 assert(status == LTTNG_CONDITION_STATUS_OK);
899 if (channel_info->key.domain != condition_domain) {
900 goto fail;
901 }
902
903 status = lttng_condition_buffer_usage_get_session_name(
904 condition, &condition_session_name);
905 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
906
907 status = lttng_condition_buffer_usage_get_channel_name(
908 condition, &condition_channel_name);
909 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_channel_name);
910
911 if (strcmp(channel_info->session_info->name, condition_session_name)) {
912 goto fail;
913 }
914 if (strcmp(channel_info->name, condition_channel_name)) {
915 goto fail;
916 }
917
918 return true;
919fail:
920 return false;
921}
922
923static
924bool session_consumed_size_condition_applies_to_channel(
925 struct lttng_condition *condition,
926 struct channel_info *channel_info)
927{
928 enum lttng_condition_status status;
929 const char *condition_session_name = NULL;
930
931 status = lttng_condition_session_consumed_size_get_session_name(
932 condition, &condition_session_name);
933 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
934
935 if (strcmp(channel_info->session_info->name, condition_session_name)) {
936 goto fail;
937 }
938
939 return true;
940fail:
941 return false;
942}
943
944static
945bool trigger_applies_to_channel(struct lttng_trigger *trigger,
946 struct channel_info *channel_info)
947{
948 struct lttng_condition *condition;
949 bool trigger_applies;
950
951 condition = lttng_trigger_get_condition(trigger);
952 if (!condition) {
953 goto fail;
954 }
955
956 switch (lttng_condition_get_type(condition)) {
957 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
958 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
959 trigger_applies = buffer_usage_condition_applies_to_channel(
960 condition, channel_info);
961 break;
962 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
963 trigger_applies = session_consumed_size_condition_applies_to_channel(
964 condition, channel_info);
965 break;
966 default:
967 goto fail;
968 }
969
970 return trigger_applies;
971fail:
972 return false;
973}
974
975static
976bool trigger_applies_to_client(struct lttng_trigger *trigger,
977 struct notification_client *client)
978{
979 bool applies = false;
980 struct lttng_condition_list_element *condition_list_element;
981
982 cds_list_for_each_entry(condition_list_element, &client->condition_list,
983 node) {
984 applies = lttng_condition_is_equal(
985 condition_list_element->condition,
986 lttng_trigger_get_condition(trigger));
987 if (applies) {
988 break;
989 }
990 }
991 return applies;
992}
993
994static
995int match_session(struct cds_lfht_node *node, const void *key)
996{
997 const char *name = key;
998 struct session_info *session_info = caa_container_of(
999 node, struct session_info, sessions_ht_node);
1000
1001 return !strcmp(session_info->name, name);
1002}
1003
1004static
1005struct session_info *find_or_create_session_info(
1006 struct notification_thread_state *state,
1007 const char *name, uid_t uid, gid_t gid)
1008{
1009 struct session_info *session = NULL;
1010 struct cds_lfht_node *node;
1011 struct cds_lfht_iter iter;
1012
1013 rcu_read_lock();
1014 cds_lfht_lookup(state->sessions_ht,
1015 hash_key_str(name, lttng_ht_seed),
1016 match_session,
1017 name,
1018 &iter);
1019 node = cds_lfht_iter_get_node(&iter);
1020 if (node) {
1021 DBG("[notification-thread] Found session info of session \"%s\" (uid = %i, gid = %i)",
1022 name, uid, gid);
1023 session = caa_container_of(node, struct session_info,
1024 sessions_ht_node);
1025 assert(session->uid == uid);
1026 assert(session->gid == gid);
1027 goto end;
1028 }
1029
1030 session = session_info_create(name, uid, gid);
1031 if (!session) {
1032 ERR("[notification-thread] Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)",
1033 name, uid, gid);
1034 goto end;
1035 }
1036
1037 cds_lfht_add(state->sessions_ht, hash_key_str(name, lttng_ht_seed),
1038 &session->sessions_ht_node);
1039end:
1040 rcu_read_unlock();
1041 return session;
1042}
1043
1044static
1045int handle_notification_thread_command_add_channel(
1046 struct notification_thread_state *state,
1047 const char *session_name, uid_t session_uid, gid_t session_gid,
1048 const char *channel_name, enum lttng_domain_type channel_domain,
1049 uint64_t channel_key_int, uint64_t channel_capacity,
1050 enum lttng_error_code *cmd_result)
1051{
1052 struct cds_list_head trigger_list;
1053 struct channel_info *new_channel_info = NULL;
1054 struct channel_key channel_key = {
1055 .key = channel_key_int,
1056 .domain = channel_domain,
1057 };
1058 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
1059 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1060 int trigger_count = 0;
1061 struct cds_lfht_iter iter;
1062 struct session_info *session_info = NULL;
1063
1064 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
1065 channel_name, session_name, channel_key_int,
1066 channel_domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
1067
1068 CDS_INIT_LIST_HEAD(&trigger_list);
1069
1070 session_info = find_or_create_session_info(state, session_name,
1071 session_uid, session_gid);
1072 if (!session_info) {
1073 /* Allocation error or an internal error occured. */
1074 goto error;
1075 }
1076
1077 new_channel_info = channel_info_create(channel_name, &channel_key,
1078 channel_capacity, session_info);
1079 if (!new_channel_info) {
1080 goto error;
1081 }
1082
1083 /* Build a list of all triggers applying to the new channel. */
1084 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1085 node) {
1086 struct lttng_trigger_list_element *new_element;
1087
1088 if (!trigger_applies_to_channel(trigger_ht_element->trigger,
1089 new_channel_info)) {
1090 continue;
1091 }
1092
1093 new_element = zmalloc(sizeof(*new_element));
1094 if (!new_element) {
1095 goto error;
1096 }
1097 CDS_INIT_LIST_HEAD(&new_element->node);
1098 new_element->trigger = trigger_ht_element->trigger;
1099 cds_list_add(&new_element->node, &trigger_list);
1100 trigger_count++;
1101 }
1102
1103 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
1104 trigger_count);
1105 channel_trigger_list = zmalloc(sizeof(*channel_trigger_list));
1106 if (!channel_trigger_list) {
1107 goto error;
1108 }
1109 channel_trigger_list->channel_key = new_channel_info->key;
1110 CDS_INIT_LIST_HEAD(&channel_trigger_list->list);
1111 cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
1112 cds_list_splice(&trigger_list, &channel_trigger_list->list);
1113
1114 rcu_read_lock();
1115 /* Add channel to the channel_ht which owns the channel_infos. */
1116 cds_lfht_add(state->channels_ht,
1117 hash_channel_key(&new_channel_info->key),
1118 &new_channel_info->channels_ht_node);
1119 /*
1120 * Add the list of triggers associated with this channel to the
1121 * channel_triggers_ht.
1122 */
1123 cds_lfht_add(state->channel_triggers_ht,
1124 hash_channel_key(&new_channel_info->key),
1125 &channel_trigger_list->channel_triggers_ht_node);
1126 rcu_read_unlock();
1127 *cmd_result = LTTNG_OK;
1128 return 0;
1129error:
1130 channel_info_destroy(new_channel_info);
1131 session_info_put(session_info);
1132 return 1;
1133}
1134
1135static
1136int handle_notification_thread_command_remove_channel(
1137 struct notification_thread_state *state,
1138 uint64_t channel_key, enum lttng_domain_type domain,
1139 enum lttng_error_code *cmd_result)
1140{
1141 struct cds_lfht_node *node;
1142 struct cds_lfht_iter iter;
1143 struct lttng_channel_trigger_list *trigger_list;
1144 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1145 struct channel_key key = { .key = channel_key, .domain = domain };
1146 struct channel_info *channel_info;
1147
1148 DBG("[notification-thread] Removing channel key = %" PRIu64 " in %s domain",
1149 channel_key, domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
1150
1151 rcu_read_lock();
1152
1153 cds_lfht_lookup(state->channel_triggers_ht,
1154 hash_channel_key(&key),
1155 match_channel_trigger_list,
1156 &key,
1157 &iter);
1158 node = cds_lfht_iter_get_node(&iter);
1159 /*
1160 * There is a severe internal error if we are being asked to remove a
1161 * channel that doesn't exist.
1162 */
1163 if (!node) {
1164 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
1165 goto end;
1166 }
1167
1168 /* Free the list of triggers associated with this channel. */
1169 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
1170 channel_triggers_ht_node);
1171 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1172 &trigger_list->list, node) {
1173 cds_list_del(&trigger_list_element->node);
1174 free(trigger_list_element);
1175 }
1176 cds_lfht_del(state->channel_triggers_ht, node);
1177 free(trigger_list);
1178
1179 /* Free sampled channel state. */
1180 cds_lfht_lookup(state->channel_state_ht,
1181 hash_channel_key(&key),
1182 match_channel_state_sample,
1183 &key,
1184 &iter);
1185 node = cds_lfht_iter_get_node(&iter);
1186 /*
1187 * This is expected to be NULL if the channel is destroyed before we
1188 * received a sample.
1189 */
1190 if (node) {
1191 struct channel_state_sample *sample = caa_container_of(node,
1192 struct channel_state_sample,
1193 channel_state_ht_node);
1194
1195 cds_lfht_del(state->channel_state_ht, node);
1196 free(sample);
1197 }
1198
1199 /* Remove the channel from the channels_ht and free it. */
1200 cds_lfht_lookup(state->channels_ht,
1201 hash_channel_key(&key),
1202 match_channel_info,
1203 &key,
1204 &iter);
1205 node = cds_lfht_iter_get_node(&iter);
1206 assert(node);
1207 channel_info = caa_container_of(node, struct channel_info,
1208 channels_ht_node);
1209 cds_lfht_del(state->channels_ht, node);
1210 channel_info_destroy(channel_info);
1211end:
1212 rcu_read_unlock();
1213 *cmd_result = LTTNG_OK;
1214 return 0;
1215}
1216
1217static
1218int condition_is_supported(struct lttng_condition *condition)
1219{
1220 int ret;
1221
1222 switch (lttng_condition_get_type(condition)) {
1223 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1224 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1225 {
1226 enum lttng_domain_type domain;
1227
1228 ret = lttng_condition_buffer_usage_get_domain_type(condition,
1229 &domain);
1230 if (ret) {
1231 ret = -1;
1232 goto end;
1233 }
1234
1235 if (domain != LTTNG_DOMAIN_KERNEL) {
1236 ret = 1;
1237 goto end;
1238 }
1239
1240 /*
1241 * Older kernel tracers don't expose the API to monitor their
1242 * buffers. Therefore, we reject triggers that require that
1243 * mechanism to be available to be evaluated.
1244 */
1245 ret = kernel_supports_ring_buffer_snapshot_sample_positions(
1246 kernel_tracer_fd);
1247 break;
1248 }
1249 default:
1250 ret = 1;
1251 }
1252end:
1253 return ret;
1254}
1255
1256/*
1257 * FIXME A client's credentials are not checked when registering a trigger, nor
1258 * are they stored alongside with the trigger.
1259 *
1260 * The effects of this are benign since:
1261 * - The client will succeed in registering the trigger, as it is valid,
1262 * - The trigger will, internally, be bound to the channel,
1263 * - The notifications will not be sent since the client's credentials
1264 * are checked against the channel at that moment.
1265 *
1266 * If this function returns a non-zero value, it means something is
1267 * fundamentally broken and the whole subsystem/thread will be torn down.
1268 *
1269 * If a non-fatal error occurs, just set the cmd_result to the appropriate
1270 * error code.
1271 */
1272static
1273int handle_notification_thread_command_register_trigger(
1274 struct notification_thread_state *state,
1275 struct lttng_trigger *trigger,
1276 enum lttng_error_code *cmd_result)
1277{
1278 int ret = 0;
1279 struct lttng_condition *condition;
1280 struct notification_client *client;
1281 struct notification_client_list *client_list = NULL;
1282 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1283 struct notification_client_list_element *client_list_element, *tmp;
1284 struct cds_lfht_node *node;
1285 struct cds_lfht_iter iter;
1286 struct channel_info *channel;
1287 bool free_trigger = true;
1288
1289 rcu_read_lock();
1290
1291 condition = lttng_trigger_get_condition(trigger);
1292 assert(condition);
1293
1294 ret = condition_is_supported(condition);
1295 if (ret < 0) {
1296 goto error;
1297 } else if (ret == 0) {
1298 *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
1299 goto error;
1300 } else {
1301 /* Feature is supported, continue. */
1302 ret = 0;
1303 }
1304
1305 trigger_ht_element = zmalloc(sizeof(*trigger_ht_element));
1306 if (!trigger_ht_element) {
1307 ret = -1;
1308 goto error;
1309 }
1310
1311 /* Add trigger to the trigger_ht. */
1312 cds_lfht_node_init(&trigger_ht_element->node);
1313 trigger_ht_element->trigger = trigger;
1314
1315 node = cds_lfht_add_unique(state->triggers_ht,
1316 lttng_condition_hash(condition),
1317 match_condition,
1318 condition,
1319 &trigger_ht_element->node);
1320 if (node != &trigger_ht_element->node) {
1321 /* Not a fatal error, simply report it to the client. */
1322 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
1323 goto error_free_ht_element;
1324 }
1325
1326 /*
1327 * Ownership of the trigger and of its wrapper was transfered to
1328 * the triggers_ht.
1329 */
1330 trigger_ht_element = NULL;
1331 free_trigger = false;
1332
1333 /*
1334 * The rest only applies to triggers that have a "notify" action.
1335 * It is not skipped as this is the only action type currently
1336 * supported.
1337 */
1338 client_list = zmalloc(sizeof(*client_list));
1339 if (!client_list) {
1340 ret = -1;
1341 goto error_free_ht_element;
1342 }
1343 cds_lfht_node_init(&client_list->notification_trigger_ht_node);
1344 CDS_INIT_LIST_HEAD(&client_list->list);
1345 client_list->trigger = trigger;
1346
1347 /* Build a list of clients to which this new trigger applies. */
1348 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
1349 client_socket_ht_node) {
1350 if (!trigger_applies_to_client(trigger, client)) {
1351 continue;
1352 }
1353
1354 client_list_element = zmalloc(sizeof(*client_list_element));
1355 if (!client_list_element) {
1356 ret = -1;
1357 goto error_free_client_list;
1358 }
1359 CDS_INIT_LIST_HEAD(&client_list_element->node);
1360 client_list_element->client = client;
1361 cds_list_add(&client_list_element->node, &client_list->list);
1362 }
1363
1364 cds_lfht_add(state->notification_trigger_clients_ht,
1365 lttng_condition_hash(condition),
1366 &client_list->notification_trigger_ht_node);
1367
1368 /*
1369 * Add the trigger to list of triggers bound to the channels currently
1370 * known.
1371 */
1372 cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
1373 channels_ht_node) {
1374 struct lttng_trigger_list_element *trigger_list_element;
1375 struct lttng_channel_trigger_list *trigger_list;
1376
1377 if (!trigger_applies_to_channel(trigger, channel)) {
1378 continue;
1379 }
1380
1381 cds_lfht_lookup(state->channel_triggers_ht,
1382 hash_channel_key(&channel->key),
1383 match_channel_trigger_list,
1384 &channel->key,
1385 &iter);
1386 node = cds_lfht_iter_get_node(&iter);
1387 assert(node);
1388 trigger_list = caa_container_of(node,
1389 struct lttng_channel_trigger_list,
1390 channel_triggers_ht_node);
1391
1392 trigger_list_element = zmalloc(sizeof(*trigger_list_element));
1393 if (!trigger_list_element) {
1394 ret = -1;
1395 goto error_free_client_list;
1396 }
1397 CDS_INIT_LIST_HEAD(&trigger_list_element->node);
1398 trigger_list_element->trigger = trigger;
1399 cds_list_add(&trigger_list_element->node, &trigger_list->list);
1400 }
1401
1402 /*
1403 * Since there is nothing preventing clients from subscribing to a
1404 * condition before the corresponding trigger is registered, we have
1405 * to evaluate this new condition right away.
1406 *
1407 * At some point, we were waiting for the next "evaluation" (e.g. on
1408 * reception of a channel sample) to evaluate this new condition, but
1409 * that was broken.
1410 *
1411 * The reason it was broken is that waiting for the next sample
1412 * does not allow us to properly handle transitions for edge-triggered
1413 * conditions.
1414 *
1415 * Consider this example: when we handle a new channel sample, we
1416 * evaluate each conditions twice: once with the previous state, and
1417 * again with the newest state. We then use those two results to
1418 * determine whether a state change happened: a condition was false and
1419 * became true. If a state change happened, we have to notify clients.
1420 *
1421 * Now, if a client subscribes to a given notification and registers
1422 * a trigger *after* that subscription, we have to make sure the
1423 * condition is evaluated at this point while considering only the
1424 * current state. Otherwise, the next evaluation cycle may only see
1425 * that the evaluations remain the same (true for samples n-1 and n) and
1426 * the client will never know that the condition has been met.
1427 */
1428 cds_list_for_each_entry_safe(client_list_element, tmp,
1429 &client_list->list, node) {
1430 ret = evaluate_condition_for_client(trigger, condition,
1431 client_list_element->client, state);
1432 if (ret) {
1433 goto error_free_client_list;
1434 }
1435 }
1436
1437 /*
1438 * Client list ownership transferred to the
1439 * notification_trigger_clients_ht.
1440 */
1441 client_list = NULL;
1442
1443 *cmd_result = LTTNG_OK;
1444error_free_client_list:
1445 if (client_list) {
1446 cds_list_for_each_entry_safe(client_list_element, tmp,
1447 &client_list->list, node) {
1448 free(client_list_element);
1449 }
1450 free(client_list);
1451 }
1452error_free_ht_element:
1453 free(trigger_ht_element);
1454error:
1455 if (free_trigger) {
1456 struct lttng_action *action = lttng_trigger_get_action(trigger);
1457
1458 lttng_condition_destroy(condition);
1459 lttng_action_destroy(action);
1460 lttng_trigger_destroy(trigger);
1461 }
1462 rcu_read_unlock();
1463 return ret;
1464}
1465
1466static
1467int handle_notification_thread_command_unregister_trigger(
1468 struct notification_thread_state *state,
1469 struct lttng_trigger *trigger,
1470 enum lttng_error_code *_cmd_reply)
1471{
1472 struct cds_lfht_iter iter;
1473 struct cds_lfht_node *node, *triggers_ht_node;
1474 struct lttng_channel_trigger_list *trigger_list;
1475 struct notification_client_list *client_list;
1476 struct notification_client_list_element *client_list_element, *tmp;
1477 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1478 struct lttng_condition *condition = lttng_trigger_get_condition(
1479 trigger);
1480 struct lttng_action *action;
1481 enum lttng_error_code cmd_reply;
1482
1483 rcu_read_lock();
1484
1485 cds_lfht_lookup(state->triggers_ht,
1486 lttng_condition_hash(condition),
1487 match_condition,
1488 condition,
1489 &iter);
1490 triggers_ht_node = cds_lfht_iter_get_node(&iter);
1491 if (!triggers_ht_node) {
1492 cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND;
1493 goto end;
1494 } else {
1495 cmd_reply = LTTNG_OK;
1496 }
1497
1498 /* Remove trigger from channel_triggers_ht. */
1499 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
1500 channel_triggers_ht_node) {
1501 struct lttng_trigger_list_element *trigger_element, *tmp;
1502
1503 cds_list_for_each_entry_safe(trigger_element, tmp,
1504 &trigger_list->list, node) {
1505 const struct lttng_condition *current_condition =
1506 lttng_trigger_get_const_condition(
1507 trigger_element->trigger);
1508
1509 assert(current_condition);
1510 if (!lttng_condition_is_equal(condition,
1511 current_condition)) {
1512 continue;
1513 }
1514
1515 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
1516 cds_list_del(&trigger_element->node);
1517 /* A trigger can only appear once per channel */
1518 break;
1519 }
1520 }
1521
1522 /*
1523 * Remove and release the client list from
1524 * notification_trigger_clients_ht.
1525 */
1526 cds_lfht_lookup(state->notification_trigger_clients_ht,
1527 lttng_condition_hash(condition),
1528 match_client_list,
1529 trigger,
1530 &iter);
1531 node = cds_lfht_iter_get_node(&iter);
1532 assert(node);
1533 client_list = caa_container_of(node, struct notification_client_list,
1534 notification_trigger_ht_node);
1535 cds_list_for_each_entry_safe(client_list_element, tmp,
1536 &client_list->list, node) {
1537 free(client_list_element);
1538 }
1539 cds_lfht_del(state->notification_trigger_clients_ht, node);
1540 free(client_list);
1541
1542 /* Remove trigger from triggers_ht. */
1543 trigger_ht_element = caa_container_of(triggers_ht_node,
1544 struct lttng_trigger_ht_element, node);
1545 cds_lfht_del(state->triggers_ht, triggers_ht_node);
1546
1547 condition = lttng_trigger_get_condition(trigger_ht_element->trigger);
1548 lttng_condition_destroy(condition);
1549 action = lttng_trigger_get_action(trigger_ht_element->trigger);
1550 lttng_action_destroy(action);
1551 lttng_trigger_destroy(trigger_ht_element->trigger);
1552 free(trigger_ht_element);
1553end:
1554 rcu_read_unlock();
1555 if (_cmd_reply) {
1556 *_cmd_reply = cmd_reply;
1557 }
1558 return 0;
1559}
1560
1561/* Returns 0 on success, 1 on exit requested, negative value on error. */
1562int handle_notification_thread_command(
1563 struct notification_thread_handle *handle,
1564 struct notification_thread_state *state)
1565{
1566 int ret;
1567 uint64_t counter;
1568 struct notification_thread_command *cmd;
1569
1570 /* Read the event pipe to put it back into a quiescent state. */
1571 ret = read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter,
1572 sizeof(counter));
1573 if (ret == -1) {
1574 goto error;
1575 }
1576
1577 pthread_mutex_lock(&handle->cmd_queue.lock);
1578 cmd = cds_list_first_entry(&handle->cmd_queue.list,
1579 struct notification_thread_command, cmd_list_node);
1580 switch (cmd->type) {
1581 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
1582 DBG("[notification-thread] Received register trigger command");
1583 ret = handle_notification_thread_command_register_trigger(
1584 state, cmd->parameters.trigger,
1585 &cmd->reply_code);
1586 break;
1587 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
1588 DBG("[notification-thread] Received unregister trigger command");
1589 ret = handle_notification_thread_command_unregister_trigger(
1590 state, cmd->parameters.trigger,
1591 &cmd->reply_code);
1592 break;
1593 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
1594 DBG("[notification-thread] Received add channel command");
1595 ret = handle_notification_thread_command_add_channel(
1596 state,
1597 cmd->parameters.add_channel.session.name,
1598 cmd->parameters.add_channel.session.uid,
1599 cmd->parameters.add_channel.session.gid,
1600 cmd->parameters.add_channel.channel.name,
1601 cmd->parameters.add_channel.channel.domain,
1602 cmd->parameters.add_channel.channel.key,
1603 cmd->parameters.add_channel.channel.capacity,
1604 &cmd->reply_code);
1605 break;
1606 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
1607 DBG("[notification-thread] Received remove channel command");
1608 ret = handle_notification_thread_command_remove_channel(
1609 state, cmd->parameters.remove_channel.key,
1610 cmd->parameters.remove_channel.domain,
1611 &cmd->reply_code);
1612 break;
1613 case NOTIFICATION_COMMAND_TYPE_QUIT:
1614 DBG("[notification-thread] Received quit command");
1615 cmd->reply_code = LTTNG_OK;
1616 ret = 1;
1617 goto end;
1618 default:
1619 ERR("[notification-thread] Unknown internal command received");
1620 goto error_unlock;
1621 }
1622
1623 if (ret) {
1624 goto error_unlock;
1625 }
1626end:
1627 cds_list_del(&cmd->cmd_list_node);
1628 lttng_waiter_wake_up(&cmd->reply_waiter);
1629 pthread_mutex_unlock(&handle->cmd_queue.lock);
1630 return ret;
1631error_unlock:
1632 /* Wake-up and return a fatal error to the calling thread. */
1633 lttng_waiter_wake_up(&cmd->reply_waiter);
1634 pthread_mutex_unlock(&handle->cmd_queue.lock);
1635 cmd->reply_code = LTTNG_ERR_FATAL;
1636error:
1637 /* Indicate a fatal error to the caller. */
1638 return -1;
1639}
1640
1641static
1642unsigned long hash_client_socket(int socket)
1643{
1644 return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed);
1645}
1646
/*
 * Add O_NONBLOCK to the file status flags of `socket`.
 *
 * Returns the value returned by the last fcntl() call performed: -1 when
 * either the retrieval or the update of the flags fails.
 */
static
int socket_set_non_blocking(int socket)
{
	int ret;
	int current_flags;

	/* The current flags are fetched first so that they are preserved. */
	current_flags = fcntl(socket, F_GETFL, 0);
	if (current_flags == -1) {
		PERROR("fcntl get socket flags");
		return -1;
	}

	ret = fcntl(socket, F_SETFL, current_flags | O_NONBLOCK);
	if (ret == -1) {
		PERROR("fcntl set O_NONBLOCK socket flag");
		return ret;
	}

	DBG("Client socket (fd = %i) set as non-blocking", socket);
	return ret;
}
1669
1670static
1671int client_reset_inbound_state(struct notification_client *client)
1672{
1673 int ret;
1674
1675 ret = lttng_dynamic_buffer_set_size(
1676 &client->communication.inbound.buffer, 0);
1677 assert(!ret);
1678
1679 client->communication.inbound.bytes_to_receive =
1680 sizeof(struct lttng_notification_channel_message);
1681 client->communication.inbound.msg_type =
1682 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN;
1683 LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1);
1684 LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1);
1685 ret = lttng_dynamic_buffer_set_size(
1686 &client->communication.inbound.buffer,
1687 client->communication.inbound.bytes_to_receive);
1688 return ret;
1689}
1690
1691int handle_notification_thread_client_connect(
1692 struct notification_thread_state *state)
1693{
1694 int ret;
1695 struct notification_client *client;
1696
1697 DBG("[notification-thread] Handling new notification channel client connection");
1698
1699 client = zmalloc(sizeof(*client));
1700 if (!client) {
1701 /* Fatal error. */
1702 ret = -1;
1703 goto error;
1704 }
1705 CDS_INIT_LIST_HEAD(&client->condition_list);
1706 lttng_dynamic_buffer_init(&client->communication.inbound.buffer);
1707 lttng_dynamic_buffer_init(&client->communication.outbound.buffer);
1708 client->communication.inbound.expect_creds = true;
1709 ret = client_reset_inbound_state(client);
1710 if (ret) {
1711 ERR("[notification-thread] Failed to reset client communication's inbound state");
1712 ret = 0;
1713 goto error;
1714 }
1715
1716 ret = lttcomm_accept_unix_sock(state->notification_channel_socket);
1717 if (ret < 0) {
1718 ERR("[notification-thread] Failed to accept new notification channel client connection");
1719 ret = 0;
1720 goto error;
1721 }
1722
1723 client->socket = ret;
1724
1725 ret = socket_set_non_blocking(client->socket);
1726 if (ret) {
1727 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
1728 goto error;
1729 }
1730
1731 ret = lttcomm_setsockopt_creds_unix_sock(client->socket);
1732 if (ret < 0) {
1733 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
1734 ret = 0;
1735 goto error;
1736 }
1737
1738 ret = lttng_poll_add(&state->events, client->socket,
1739 LPOLLIN | LPOLLERR |
1740 LPOLLHUP | LPOLLRDHUP);
1741 if (ret < 0) {
1742 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
1743 ret = 0;
1744 goto error;
1745 }
1746 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
1747 client->socket);
1748
1749 rcu_read_lock();
1750 cds_lfht_add(state->client_socket_ht,
1751 hash_client_socket(client->socket),
1752 &client->client_socket_ht_node);
1753 rcu_read_unlock();
1754
1755 return ret;
1756error:
1757 notification_client_destroy(client, state);
1758 return ret;
1759}
1760
1761int handle_notification_thread_client_disconnect(
1762 int client_socket,
1763 struct notification_thread_state *state)
1764{
1765 int ret = 0;
1766 struct notification_client *client;
1767
1768 rcu_read_lock();
1769 DBG("[notification-thread] Closing client connection (socket fd = %i)",
1770 client_socket);
1771 client = get_client_from_socket(client_socket, state);
1772 if (!client) {
1773 /* Internal state corruption, fatal error. */
1774 ERR("[notification-thread] Unable to find client (socket fd = %i)",
1775 client_socket);
1776 ret = -1;
1777 goto end;
1778 }
1779
1780 ret = lttng_poll_del(&state->events, client_socket);
1781 if (ret) {
1782 ERR("[notification-thread] Failed to remove client socket from poll set");
1783 }
1784 cds_lfht_del(state->client_socket_ht,
1785 &client->client_socket_ht_node);
1786 notification_client_destroy(client, state);
1787end:
1788 rcu_read_unlock();
1789 return ret;
1790}
1791
1792int handle_notification_thread_client_disconnect_all(
1793 struct notification_thread_state *state)
1794{
1795 struct cds_lfht_iter iter;
1796 struct notification_client *client;
1797 bool error_encoutered = false;
1798
1799 rcu_read_lock();
1800 DBG("[notification-thread] Closing all client connections");
1801 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
1802 client_socket_ht_node) {
1803 int ret;
1804
1805 ret = handle_notification_thread_client_disconnect(
1806 client->socket, state);
1807 if (ret) {
1808 error_encoutered = true;
1809 }
1810 }
1811 rcu_read_unlock();
1812 return error_encoutered ? 1 : 0;
1813}
1814
1815int handle_notification_thread_trigger_unregister_all(
1816 struct notification_thread_state *state)
1817{
1818 bool error_occurred = false;
1819 struct cds_lfht_iter iter;
1820 struct lttng_trigger_ht_element *trigger_ht_element;
1821
1822 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1823 node) {
1824 int ret = handle_notification_thread_command_unregister_trigger(
1825 state, trigger_ht_element->trigger, NULL);
1826 if (ret) {
1827 error_occurred = true;
1828 }
1829 }
1830 return error_occurred ? -1 : 0;
1831}
1832
1833static
1834int client_flush_outgoing_queue(struct notification_client *client,
1835 struct notification_thread_state *state)
1836{
1837 ssize_t ret;
1838 size_t to_send_count;
1839
1840 assert(client->communication.outbound.buffer.size != 0);
1841 to_send_count = client->communication.outbound.buffer.size;
1842 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
1843 client->socket);
1844
1845 ret = lttcomm_send_unix_sock_non_block(client->socket,
1846 client->communication.outbound.buffer.data,
1847 to_send_count);
1848 if ((ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) ||
1849 (ret > 0 && ret < to_send_count)) {
1850 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
1851 client->socket);
1852 to_send_count -= max(ret, 0);
1853
1854 memcpy(client->communication.outbound.buffer.data,
1855 client->communication.outbound.buffer.data +
1856 client->communication.outbound.buffer.size - to_send_count,
1857 to_send_count);
1858 ret = lttng_dynamic_buffer_set_size(
1859 &client->communication.outbound.buffer,
1860 to_send_count);
1861 if (ret) {
1862 goto error;
1863 }
1864
1865 /*
1866 * We want to be notified whenever there is buffer space
1867 * available to send the rest of the payload.
1868 */
1869 ret = lttng_poll_mod(&state->events, client->socket,
1870 CLIENT_POLL_MASK_IN_OUT);
1871 if (ret) {
1872 goto error;
1873 }
1874 } else if (ret < 0) {
1875 /* Generic error, disconnect the client. */
1876 ERR("[notification-thread] Failed to send flush outgoing queue, disconnecting client (socket fd = %i)",
1877 client->socket);
1878 ret = handle_notification_thread_client_disconnect(
1879 client->socket, state);
1880 if (ret) {
1881 goto error;
1882 }
1883 } else {
1884 /* No error and flushed the queue completely. */
1885 ret = lttng_dynamic_buffer_set_size(
1886 &client->communication.outbound.buffer, 0);
1887 if (ret) {
1888 goto error;
1889 }
1890 ret = lttng_poll_mod(&state->events, client->socket,
1891 CLIENT_POLL_MASK_IN);
1892 if (ret) {
1893 goto error;
1894 }
1895
1896 client->communication.outbound.queued_command_reply = false;
1897 client->communication.outbound.dropped_notification = false;
1898 }
1899
1900 return 0;
1901error:
1902 return -1;
1903}
1904
1905static
1906int client_send_command_reply(struct notification_client *client,
1907 struct notification_thread_state *state,
1908 enum lttng_notification_channel_status status)
1909{
1910 int ret;
1911 struct lttng_notification_channel_command_reply reply = {
1912 .status = (int8_t) status,
1913 };
1914 struct lttng_notification_channel_message msg = {
1915 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
1916 .size = sizeof(reply),
1917 };
1918 char buffer[sizeof(msg) + sizeof(reply)];
1919
1920 if (client->communication.outbound.queued_command_reply) {
1921 /* Protocol error. */
1922 goto error;
1923 }
1924
1925 memcpy(buffer, &msg, sizeof(msg));
1926 memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
1927 DBG("[notification-thread] Send command reply (%i)", (int) status);
1928
1929 /* Enqueue buffer to outgoing queue and flush it. */
1930 ret = lttng_dynamic_buffer_append(
1931 &client->communication.outbound.buffer,
1932 buffer, sizeof(buffer));
1933 if (ret) {
1934 goto error;
1935 }
1936
1937 ret = client_flush_outgoing_queue(client, state);
1938 if (ret) {
1939 goto error;
1940 }
1941
1942 if (client->communication.outbound.buffer.size != 0) {
1943 /* Queue could not be emptied. */
1944 client->communication.outbound.queued_command_reply = true;
1945 }
1946
1947 return 0;
1948error:
1949 return -1;
1950}
1951
1952static
1953int client_dispatch_message(struct notification_client *client,
1954 struct notification_thread_state *state)
1955{
1956 int ret = 0;
1957
1958 if (client->communication.inbound.msg_type !=
1959 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE &&
1960 client->communication.inbound.msg_type !=
1961 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN &&
1962 !client->validated) {
1963 WARN("[notification-thread] client attempted a command before handshake");
1964 ret = -1;
1965 goto end;
1966 }
1967
1968 switch (client->communication.inbound.msg_type) {
1969 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN:
1970 {
1971 /*
1972 * Receiving message header. The function will be called again
1973 * once the rest of the message as been received and can be
1974 * interpreted.
1975 */
1976 const struct lttng_notification_channel_message *msg;
1977
1978 assert(sizeof(*msg) ==
1979 client->communication.inbound.buffer.size);
1980 msg = (const struct lttng_notification_channel_message *)
1981 client->communication.inbound.buffer.data;
1982
1983 if (msg->size == 0 || msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
1984 ERR("[notification-thread] Invalid notification channel message: length = %u", msg->size);
1985 ret = -1;
1986 goto end;
1987 }
1988
1989 switch (msg->type) {
1990 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
1991 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
1992 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
1993 break;
1994 default:
1995 ret = -1;
1996 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
1997 goto end;
1998 }
1999
2000 client->communication.inbound.bytes_to_receive = msg->size;
2001 client->communication.inbound.msg_type =
2002 (enum lttng_notification_channel_message_type) msg->type;
2003 ret = lttng_dynamic_buffer_set_size(
2004 &client->communication.inbound.buffer, msg->size);
2005 if (ret) {
2006 goto end;
2007 }
2008 break;
2009 }
2010 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
2011 {
2012 struct lttng_notification_channel_command_handshake *handshake_client;
2013 struct lttng_notification_channel_command_handshake handshake_reply = {
2014 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
2015 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
2016 };
2017 struct lttng_notification_channel_message msg_header = {
2018 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
2019 .size = sizeof(handshake_reply),
2020 };
2021 enum lttng_notification_channel_status status =
2022 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
2023 char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
2024
2025 memcpy(send_buffer, &msg_header, sizeof(msg_header));
2026 memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
2027 sizeof(handshake_reply));
2028
2029 handshake_client =
2030 (struct lttng_notification_channel_command_handshake *)
2031 client->communication.inbound.buffer.data;
2032 client->major = handshake_client->major;
2033 client->minor = handshake_client->minor;
2034 if (!client->communication.inbound.creds_received) {
2035 ERR("[notification-thread] No credentials received from client");
2036 ret = -1;
2037 goto end;
2038 }
2039
2040 client->uid = LTTNG_SOCK_GET_UID_CRED(
2041 &client->communication.inbound.creds);
2042 client->gid = LTTNG_SOCK_GET_GID_CRED(
2043 &client->communication.inbound.creds);
2044 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
2045 client->uid, client->gid, (int) client->major,
2046 (int) client->minor);
2047
2048 if (handshake_client->major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
2049 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
2050 }
2051
2052 ret = lttng_dynamic_buffer_append(&client->communication.outbound.buffer,
2053 send_buffer, sizeof(send_buffer));
2054 if (ret) {
2055 ERR("[notification-thread] Failed to send protocol version to notification channel client");
2056 goto end;
2057 }
2058
2059 ret = client_flush_outgoing_queue(client, state);
2060 if (ret) {
2061 goto end;
2062 }
2063
2064 ret = client_send_command_reply(client, state, status);
2065 if (ret) {
2066 ERR("[notification-thread] Failed to send reply to notification channel client");
2067 goto end;
2068 }
2069
2070 /* Set reception state to receive the next message header. */
2071 ret = client_reset_inbound_state(client);
2072 if (ret) {
2073 ERR("[notification-thread] Failed to reset client communication's inbound state");
2074 goto end;
2075 }
2076 client->validated = true;
2077 break;
2078 }
2079 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
2080 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
2081 {
2082 struct lttng_condition *condition;
2083 enum lttng_notification_channel_status status =
2084 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
2085 const struct lttng_buffer_view condition_view =
2086 lttng_buffer_view_from_dynamic_buffer(
2087 &client->communication.inbound.buffer,
2088 0, -1);
2089 size_t expected_condition_size =
2090 client->communication.inbound.buffer.size;
2091
2092 ret = lttng_condition_create_from_buffer(&condition_view,
2093 &condition);
2094 if (ret != expected_condition_size) {
2095 ERR("[notification-thread] Malformed condition received from client");
2096 goto end;
2097 }
2098
2099 if (client->communication.inbound.msg_type ==
2100 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
2101 ret = notification_thread_client_subscribe(client,
2102 condition, state, &status);
2103 } else {
2104 ret = notification_thread_client_unsubscribe(client,
2105 condition, state, &status);
2106 }
2107 if (ret) {
2108 goto end;
2109 }
2110
2111 ret = client_send_command_reply(client, state, status);
2112 if (ret) {
2113 ERR("[notification-thread] Failed to send reply to notification channel client");
2114 goto end;
2115 }
2116
2117 /* Set reception state to receive the next message header. */
2118 ret = client_reset_inbound_state(client);
2119 if (ret) {
2120 ERR("[notification-thread] Failed to reset client communication's inbound state");
2121 goto end;
2122 }
2123 break;
2124 }
2125 default:
2126 abort();
2127 }
2128end:
2129 return ret;
2130}
2131
2132/* Incoming data from client. */
2133int handle_notification_thread_client_in(
2134 struct notification_thread_state *state, int socket)
2135{
2136 int ret = 0;
2137 struct notification_client *client;
2138 ssize_t recv_ret;
2139 size_t offset;
2140
2141 client = get_client_from_socket(socket, state);
2142 if (!client) {
2143 /* Internal error, abort. */
2144 ret = -1;
2145 goto end;
2146 }
2147
2148 offset = client->communication.inbound.buffer.size -
2149 client->communication.inbound.bytes_to_receive;
2150 if (client->communication.inbound.expect_creds) {
2151 recv_ret = lttcomm_recv_creds_unix_sock(socket,
2152 client->communication.inbound.buffer.data + offset,
2153 client->communication.inbound.bytes_to_receive,
2154 &client->communication.inbound.creds);
2155 if (recv_ret > 0) {
2156 client->communication.inbound.expect_creds = false;
2157 client->communication.inbound.creds_received = true;
2158 }
2159 } else {
2160 recv_ret = lttcomm_recv_unix_sock_non_block(socket,
2161 client->communication.inbound.buffer.data + offset,
2162 client->communication.inbound.bytes_to_receive);
2163 }
2164 if (recv_ret < 0) {
2165 goto error_disconnect_client;
2166 }
2167
2168 client->communication.inbound.bytes_to_receive -= recv_ret;
2169 if (client->communication.inbound.bytes_to_receive == 0) {
2170 ret = client_dispatch_message(client, state);
2171 if (ret) {
2172 /*
2173 * Only returns an error if this client must be
2174 * disconnected.
2175 */
2176 goto error_disconnect_client;
2177 }
2178 } else {
2179 goto end;
2180 }
2181end:
2182 return ret;
2183error_disconnect_client:
2184 ret = handle_notification_thread_client_disconnect(socket, state);
2185 return ret;
2186}
2187
/*
 * Client ready to receive outgoing data.
 *
 * Looks up the client bound to `socket` and flushes its outgoing queue.
 * Returns -1 when no such client exists, otherwise the result of
 * client_flush_outgoing_queue().
 */
int handle_notification_thread_client_out(
		struct notification_thread_state *state, int socket)
{
	struct notification_client *client =
			get_client_from_socket(socket, state);

	if (!client) {
		/* Internal error, abort. */
		return -1;
	}

	return client_flush_outgoing_queue(client, state);
}
2209
2210static
2211bool evaluate_buffer_usage_condition(const struct lttng_condition *condition,
2212 const struct channel_state_sample *sample,
2213 uint64_t buffer_capacity)
2214{
2215 bool result = false;
2216 uint64_t threshold;
2217 enum lttng_condition_type condition_type;
2218 const struct lttng_condition_buffer_usage *use_condition = container_of(
2219 condition, struct lttng_condition_buffer_usage,
2220 parent);
2221
2222 if (use_condition->threshold_bytes.set) {
2223 threshold = use_condition->threshold_bytes.value;
2224 } else {
2225 /*
2226 * Threshold was expressed as a ratio.
2227 *
2228 * TODO the threshold (in bytes) of conditions expressed
2229 * as a ratio of total buffer size could be cached to
2230 * forego this double-multiplication or it could be performed
2231 * as fixed-point math.
2232 *
2233 * Note that caching should accomodate the case where the
2234 * condition applies to multiple channels (i.e. don't assume
2235 * that all channels matching my_chann* have the same size...)
2236 */
2237 threshold = (uint64_t) (use_condition->threshold_ratio.value *
2238 (double) buffer_capacity);
2239 }
2240
2241 condition_type = lttng_condition_get_type(condition);
2242 if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
2243 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
2244 threshold, sample->highest_usage);
2245
2246 /*
2247 * The low condition should only be triggered once _all_ of the
2248 * streams in a channel have gone below the "low" threshold.
2249 */
2250 if (sample->highest_usage <= threshold) {
2251 result = true;
2252 }
2253 } else {
2254 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
2255 threshold, sample->highest_usage);
2256
2257 /*
2258 * For high buffer usage scenarios, we want to trigger whenever
2259 * _any_ of the streams has reached the "high" threshold.
2260 */
2261 if (sample->highest_usage >= threshold) {
2262 result = true;
2263 }
2264 }
2265
2266 return result;
2267}
2268
2269static
2270bool evaluate_session_consumed_size_condition(
2271 const struct lttng_condition *condition,
2272 uint64_t session_consumed_size)
2273{
2274 uint64_t threshold;
2275 const struct lttng_condition_session_consumed_size *size_condition =
2276 container_of(condition,
2277 struct lttng_condition_session_consumed_size,
2278 parent);
2279
2280 threshold = size_condition->consumed_threshold_bytes.value;
2281 DBG("[notification-thread] Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
2282 threshold, session_consumed_size);
2283 return session_consumed_size >= threshold;
2284}
2285
2286static
2287int evaluate_condition(const struct lttng_condition *condition,
2288 struct lttng_evaluation **evaluation,
2289 const struct notification_thread_state *state,
2290 const struct channel_state_sample *previous_sample,
2291 const struct channel_state_sample *latest_sample,
2292 uint64_t previous_session_consumed_total,
2293 uint64_t latest_session_consumed_total,
2294 struct channel_info *channel_info)
2295{
2296 int ret = 0;
2297 enum lttng_condition_type condition_type;
2298 const bool previous_sample_available = !!previous_sample;
2299 bool previous_sample_result = false;
2300 bool latest_sample_result;
2301
2302 condition_type = lttng_condition_get_type(condition);
2303
2304 switch (condition_type) {
2305 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
2306 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
2307 if (caa_likely(previous_sample_available)) {
2308 previous_sample_result =
2309 evaluate_buffer_usage_condition(condition,
2310 previous_sample, channel_info->capacity);
2311 }
2312 latest_sample_result = evaluate_buffer_usage_condition(
2313 condition, latest_sample,
2314 channel_info->capacity);
2315 break;
2316 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
2317 if (caa_likely(previous_sample_available)) {
2318 previous_sample_result =
2319 evaluate_session_consumed_size_condition(
2320 condition,
2321 previous_session_consumed_total);
2322 }
2323 latest_sample_result =
2324 evaluate_session_consumed_size_condition(
2325 condition,
2326 latest_session_consumed_total);
2327 break;
2328 default:
2329 /* Unknown condition type; internal error. */
2330 abort();
2331 }
2332
2333 if (!latest_sample_result ||
2334 (previous_sample_result == latest_sample_result)) {
2335 /*
2336 * Only trigger on a condition evaluation transition.
2337 *
2338 * NOTE: This edge-triggered logic may not be appropriate for
2339 * future condition types.
2340 */
2341 goto end;
2342 }
2343
2344 if (!evaluation || !latest_sample_result) {
2345 goto end;
2346 }
2347
2348 switch (condition_type) {
2349 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
2350 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
2351 *evaluation = lttng_evaluation_buffer_usage_create(
2352 condition_type,
2353 latest_sample->highest_usage,
2354 channel_info->capacity);
2355 break;
2356 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
2357 *evaluation = lttng_evaluation_session_consumed_size_create(
2358 latest_session_consumed_total);
2359 break;
2360 default:
2361 abort();
2362 }
2363
2364 if (!*evaluation) {
2365 ret = -1;
2366 goto end;
2367 }
2368end:
2369 return ret;
2370}
2371
2372static
2373int client_enqueue_dropped_notification(struct notification_client *client,
2374 struct notification_thread_state *state)
2375{
2376 int ret;
2377 struct lttng_notification_channel_message msg = {
2378 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
2379 .size = 0,
2380 };
2381
2382 ret = lttng_dynamic_buffer_append(
2383 &client->communication.outbound.buffer, &msg,
2384 sizeof(msg));
2385 return ret;
2386}
2387
2388static
2389int send_evaluation_to_clients(const struct lttng_trigger *trigger,
2390 const struct lttng_evaluation *evaluation,
2391 struct notification_client_list* client_list,
2392 struct notification_thread_state *state,
2393 uid_t channel_uid, gid_t channel_gid)
2394{
2395 int ret = 0;
2396 struct lttng_dynamic_buffer msg_buffer;
2397 struct notification_client_list_element *client_list_element, *tmp;
2398 const struct lttng_notification notification = {
2399 .condition = (struct lttng_condition *) lttng_trigger_get_const_condition(trigger),
2400 .evaluation = (struct lttng_evaluation *) evaluation,
2401 };
2402 struct lttng_notification_channel_message msg_header = {
2403 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION,
2404 };
2405
2406 lttng_dynamic_buffer_init(&msg_buffer);
2407
2408 ret = lttng_dynamic_buffer_append(&msg_buffer, &msg_header,
2409 sizeof(msg_header));
2410 if (ret) {
2411 goto end;
2412 }
2413
2414 ret = lttng_notification_serialize(&notification, &msg_buffer);
2415 if (ret) {
2416 ERR("[notification-thread] Failed to serialize notification");
2417 ret = -1;
2418 goto end;
2419 }
2420
2421 /* Update payload size. */
2422 ((struct lttng_notification_channel_message * ) msg_buffer.data)->size =
2423 (uint32_t) (msg_buffer.size - sizeof(msg_header));
2424
2425 cds_list_for_each_entry_safe(client_list_element, tmp,
2426 &client_list->list, node) {
2427 struct notification_client *client =
2428 client_list_element->client;
2429
2430 if (client->uid != channel_uid && client->gid != channel_gid &&
2431 client->uid != 0) {
2432 /* Client is not allowed to monitor this channel. */
2433 DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this channel");
2434 continue;
2435 }
2436
2437 DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
2438 client->socket, msg_buffer.size);
2439 if (client->communication.outbound.buffer.size) {
2440 /*
2441 * Outgoing data is already buffered for this client;
2442 * drop the notification and enqueue a "dropped
2443 * notification" message if this is the first dropped
2444 * notification since the socket spilled-over to the
2445 * queue.
2446 */
2447 DBG("[notification-thread] Dropping notification addressed to client (socket fd = %i)",
2448 client->socket);
2449 if (!client->communication.outbound.dropped_notification) {
2450 client->communication.outbound.dropped_notification = true;
2451 ret = client_enqueue_dropped_notification(
2452 client, state);
2453 if (ret) {
2454 goto end;
2455 }
2456 }
2457 continue;
2458 }
2459
2460 ret = lttng_dynamic_buffer_append_buffer(
2461 &client->communication.outbound.buffer,
2462 &msg_buffer);
2463 if (ret) {
2464 goto end;
2465 }
2466
2467 ret = client_flush_outgoing_queue(client, state);
2468 if (ret) {
2469 goto end;
2470 }
2471 }
2472 ret = 0;
2473end:
2474 lttng_dynamic_buffer_reset(&msg_buffer);
2475 return ret;
2476}
2477
2478int handle_notification_thread_channel_sample(
2479 struct notification_thread_state *state, int pipe,
2480 enum lttng_domain_type domain)
2481{
2482 int ret = 0;
2483 struct lttcomm_consumer_channel_monitor_msg sample_msg;
2484 struct channel_info *channel_info;
2485 struct cds_lfht_node *node;
2486 struct cds_lfht_iter iter;
2487 struct lttng_channel_trigger_list *trigger_list;
2488 struct lttng_trigger_list_element *trigger_list_element;
2489 bool previous_sample_available = false;
2490 struct channel_state_sample previous_sample, latest_sample;
2491 uint64_t previous_session_consumed_total, latest_session_consumed_total;
2492
2493 /*
2494 * The monitoring pipe only holds messages smaller than PIPE_BUF,
2495 * ensuring that read/write of sampling messages are atomic.
2496 */
2497 ret = lttng_read(pipe, &sample_msg, sizeof(sample_msg));
2498 if (ret != sizeof(sample_msg)) {
2499 ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
2500 pipe);
2501 ret = -1;
2502 goto end;
2503 }
2504
2505 ret = 0;
2506 latest_sample.key.key = sample_msg.key;
2507 latest_sample.key.domain = domain;
2508 latest_sample.highest_usage = sample_msg.highest;
2509 latest_sample.lowest_usage = sample_msg.lowest;
2510 latest_sample.channel_total_consumed = sample_msg.total_consumed;
2511
2512 rcu_read_lock();
2513
2514 /* Retrieve the channel's informations */
2515 cds_lfht_lookup(state->channels_ht,
2516 hash_channel_key(&latest_sample.key),
2517 match_channel_info,
2518 &latest_sample.key,
2519 &iter);
2520 node = cds_lfht_iter_get_node(&iter);
2521 if (caa_unlikely(!node)) {
2522 /*
2523 * Not an error since the consumer can push a sample to the pipe
2524 * and the rest of the session daemon could notify us of the
2525 * channel's destruction before we get a chance to process that
2526 * sample.
2527 */
2528 DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
2529 latest_sample.key.key,
2530 domain == LTTNG_DOMAIN_KERNEL ? "kernel" :
2531 "user space");
2532 goto end_unlock;
2533 }
2534 channel_info = caa_container_of(node, struct channel_info,
2535 channels_ht_node);
2536 DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64", total consumed = %" PRIu64")",
2537 channel_info->name,
2538 latest_sample.key.key,
2539 channel_info->session_info->name,
2540 latest_sample.highest_usage,
2541 latest_sample.lowest_usage,
2542 latest_sample.channel_total_consumed);
2543
2544 previous_session_consumed_total =
2545 channel_info->session_info->consumed_data_size;
2546
2547 /* Retrieve the channel's last sample, if it exists, and update it. */
2548 cds_lfht_lookup(state->channel_state_ht,
2549 hash_channel_key(&latest_sample.key),
2550 match_channel_state_sample,
2551 &latest_sample.key,
2552 &iter);
2553 node = cds_lfht_iter_get_node(&iter);
2554 if (caa_likely(node)) {
2555 struct channel_state_sample *stored_sample;
2556
2557 /* Update the sample stored. */
2558 stored_sample = caa_container_of(node,
2559 struct channel_state_sample,
2560 channel_state_ht_node);
2561
2562 memcpy(&previous_sample, stored_sample,
2563 sizeof(previous_sample));
2564 stored_sample->highest_usage = latest_sample.highest_usage;
2565 stored_sample->lowest_usage = latest_sample.lowest_usage;
2566 stored_sample->channel_total_consumed = latest_sample.channel_total_consumed;
2567 previous_sample_available = true;
2568
2569 latest_session_consumed_total =
2570 previous_session_consumed_total +
2571 (latest_sample.channel_total_consumed - previous_sample.channel_total_consumed);
2572 } else {
2573 /*
2574 * This is the channel's first sample, allocate space for and
2575 * store the new sample.
2576 */
2577 struct channel_state_sample *stored_sample;
2578
2579 stored_sample = zmalloc(sizeof(*stored_sample));
2580 if (!stored_sample) {
2581 ret = -1;
2582 goto end_unlock;
2583 }
2584
2585 memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
2586 cds_lfht_node_init(&stored_sample->channel_state_ht_node);
2587 cds_lfht_add(state->channel_state_ht,
2588 hash_channel_key(&stored_sample->key),
2589 &stored_sample->channel_state_ht_node);
2590
2591 latest_session_consumed_total =
2592 previous_session_consumed_total +
2593 latest_sample.channel_total_consumed;
2594 }
2595
2596 channel_info->session_info->consumed_data_size =
2597 latest_session_consumed_total;
2598
2599 /* Find triggers associated with this channel. */
2600 cds_lfht_lookup(state->channel_triggers_ht,
2601 hash_channel_key(&latest_sample.key),
2602 match_channel_trigger_list,
2603 &latest_sample.key,
2604 &iter);
2605 node = cds_lfht_iter_get_node(&iter);
2606 if (caa_likely(!node)) {
2607 goto end_unlock;
2608 }
2609
2610 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
2611 channel_triggers_ht_node);
2612 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
2613 node) {
2614 const struct lttng_condition *condition;
2615 const struct lttng_action *action;
2616 const struct lttng_trigger *trigger;
2617 struct notification_client_list *client_list;
2618 struct lttng_evaluation *evaluation = NULL;
2619
2620 trigger = trigger_list_element->trigger;
2621 condition = lttng_trigger_get_const_condition(trigger);
2622 assert(condition);
2623 action = lttng_trigger_get_const_action(trigger);
2624
2625 /* Notify actions are the only type currently supported. */
2626 assert(lttng_action_get_type_const(action) ==
2627 LTTNG_ACTION_TYPE_NOTIFY);
2628
2629 /*
2630 * Check if any client is subscribed to the result of this
2631 * evaluation.
2632 */
2633 cds_lfht_lookup(state->notification_trigger_clients_ht,
2634 lttng_condition_hash(condition),
2635 match_client_list,
2636 trigger,
2637 &iter);
2638 node = cds_lfht_iter_get_node(&iter);
2639 assert(node);
2640
2641 client_list = caa_container_of(node,
2642 struct notification_client_list,
2643 notification_trigger_ht_node);
2644 if (cds_list_empty(&client_list->list)) {
2645 /*
2646 * No clients interested in the evaluation's result,
2647 * skip it.
2648 */
2649 continue;
2650 }
2651
2652 ret = evaluate_condition(condition, &evaluation, state,
2653 previous_sample_available ? &previous_sample : NULL,
2654 &latest_sample,
2655 previous_session_consumed_total,
2656 latest_session_consumed_total,
2657 channel_info);
2658 if (caa_unlikely(ret)) {
2659 goto end_unlock;
2660 }
2661
2662 if (caa_likely(!evaluation)) {
2663 continue;
2664 }
2665
2666 /* Dispatch evaluation result to all clients. */
2667 ret = send_evaluation_to_clients(trigger_list_element->trigger,
2668 evaluation, client_list, state,
2669 channel_info->session_info->uid,
2670 channel_info->session_info->gid);
2671 lttng_evaluation_destroy(evaluation);
2672 if (caa_unlikely(ret)) {
2673 goto end_unlock;
2674 }
2675 }
2676end_unlock:
2677 rcu_read_unlock();
2678end:
2679 return ret;
2680}
This page took 0.102251 seconds and 5 git commands to generate.