Fix: missing rcu read locking in trigger "unregister all" command
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.c
CommitLineData
ab0ee2ca
JG
1/*
2 * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18#define _LGPL_SOURCE
19#include <urcu.h>
20#include <urcu/rculfhash.h>
21
ab0ee2ca
JG
22#include <common/defaults.h>
23#include <common/error.h>
24#include <common/futex.h>
25#include <common/unix.h>
26#include <common/dynamic-buffer.h>
27#include <common/hashtable/utils.h>
28#include <common/sessiond-comm/sessiond-comm.h>
29#include <common/macros.h>
30#include <lttng/condition/condition.h>
9b63a4aa 31#include <lttng/action/action-internal.h>
ab0ee2ca
JG
32#include <lttng/notification/notification-internal.h>
33#include <lttng/condition/condition-internal.h>
34#include <lttng/condition/buffer-usage-internal.h>
e8360425 35#include <lttng/condition/session-consumed-size-internal.h>
ea9a44f0 36#include <lttng/condition/session-rotation-internal.h>
ab0ee2ca 37#include <lttng/notification/channel-internal.h>
1da26331 38
ab0ee2ca
JG
39#include <time.h>
40#include <unistd.h>
41#include <assert.h>
42#include <inttypes.h>
d53ea4e4 43#include <fcntl.h>
ab0ee2ca 44
1da26331
JG
45#include "notification-thread.h"
46#include "notification-thread-events.h"
47#include "notification-thread-commands.h"
48#include "lttng-sessiond.h"
49#include "kernel.h"
50
ab0ee2ca
JG
51#define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
52#define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
53
51eab943
JG
54enum lttng_object_type {
55 LTTNG_OBJECT_TYPE_UNKNOWN,
56 LTTNG_OBJECT_TYPE_NONE,
57 LTTNG_OBJECT_TYPE_CHANNEL,
58 LTTNG_OBJECT_TYPE_SESSION,
59};
60
ab0ee2ca 61struct lttng_trigger_list_element {
9e0bb80e
JG
62 /* No ownership of the trigger object is assumed. */
63 const struct lttng_trigger *trigger;
ab0ee2ca
JG
64 struct cds_list_head node;
65};
66
67struct lttng_channel_trigger_list {
68 struct channel_key channel_key;
ea9a44f0 69 /* List of struct lttng_trigger_list_element. */
ab0ee2ca 70 struct cds_list_head list;
ea9a44f0 71 /* Node in the channel_triggers_ht */
ab0ee2ca 72 struct cds_lfht_node channel_triggers_ht_node;
83b934ad
MD
73 /* call_rcu delayed reclaim. */
74 struct rcu_head rcu_node;
ab0ee2ca
JG
75};
76
ea9a44f0
JG
77/*
78 * List of triggers applying to a given session.
79 *
80 * See:
81 * - lttng_session_trigger_list_create()
82 * - lttng_session_trigger_list_build()
83 * - lttng_session_trigger_list_destroy()
84 * - lttng_session_trigger_list_add()
85 */
86struct lttng_session_trigger_list {
87 /*
88 * Not owned by this; points to the session_info structure's
89 * session name.
90 */
91 const char *session_name;
92 /* List of struct lttng_trigger_list_element. */
93 struct cds_list_head list;
94 /* Node in the session_triggers_ht */
95 struct cds_lfht_node session_triggers_ht_node;
96 /*
97 * Weak reference to the notification system's session triggers
98 * hashtable.
99 *
100 * The session trigger list structure structure is owned by
101 * the session's session_info.
102 *
103 * The session_info is kept alive the the channel_infos holding a
104 * reference to it (reference counting). When those channels are
105 * destroyed (at runtime or on teardown), the reference they hold
106 * to the session_info are released. On destruction of session_info,
107 * session_info_destroy() will remove the list of triggers applying
108 * to this session from the notification system's state.
109 *
110 * This implies that the session_triggers_ht must be destroyed
111 * after the channels.
112 */
113 struct cds_lfht *session_triggers_ht;
114 /* Used for delayed RCU reclaim. */
115 struct rcu_head rcu_node;
116};
117
ab0ee2ca
JG
118struct lttng_trigger_ht_element {
119 struct lttng_trigger *trigger;
120 struct cds_lfht_node node;
83b934ad
MD
121 /* call_rcu delayed reclaim. */
122 struct rcu_head rcu_node;
ab0ee2ca
JG
123};
124
125struct lttng_condition_list_element {
126 struct lttng_condition *condition;
127 struct cds_list_head node;
128};
129
130struct notification_client_list_element {
131 struct notification_client *client;
132 struct cds_list_head node;
133};
134
135struct notification_client_list {
f82f93a1 136 const struct lttng_trigger *trigger;
ab0ee2ca
JG
137 struct cds_list_head list;
138 struct cds_lfht_node notification_trigger_ht_node;
83b934ad
MD
139 /* call_rcu delayed reclaim. */
140 struct rcu_head rcu_node;
ab0ee2ca
JG
141};
142
143struct notification_client {
144 int socket;
145 /* Client protocol version. */
146 uint8_t major, minor;
147 uid_t uid;
148 gid_t gid;
149 /*
5332364f 150 * Indicates if the credentials and versions of the client have been
ab0ee2ca
JG
151 * checked.
152 */
153 bool validated;
154 /*
155 * Conditions to which the client's notification channel is subscribed.
156 * List of struct lttng_condition_list_node. The condition member is
157 * owned by the client.
158 */
159 struct cds_list_head condition_list;
160 struct cds_lfht_node client_socket_ht_node;
161 struct {
162 struct {
14fa22f8
JG
163 /*
164 * During the reception of a message, the reception
165 * buffers' "size" is set to contain the current
166 * message's complete payload.
167 */
ab0ee2ca
JG
168 struct lttng_dynamic_buffer buffer;
169 /* Bytes left to receive for the current message. */
170 size_t bytes_to_receive;
171 /* Type of the message being received. */
172 enum lttng_notification_channel_message_type msg_type;
173 /*
174 * Indicates whether or not credentials are expected
175 * from the client.
176 */
01ea340e 177 bool expect_creds;
ab0ee2ca
JG
178 /*
179 * Indicates whether or not credentials were received
180 * from the client.
181 */
182 bool creds_received;
14fa22f8 183 /* Only used during credentials reception. */
ab0ee2ca
JG
184 lttng_sock_cred creds;
185 } inbound;
186 struct {
187 /*
188 * Indicates whether or not a notification addressed to
189 * this client was dropped because a command reply was
190 * already buffered.
191 *
192 * A notification is dropped whenever the buffer is not
193 * empty.
194 */
195 bool dropped_notification;
196 /*
197 * Indicates whether or not a command reply is already
198 * buffered. In this case, it means that the client is
199 * not consuming command replies before emitting a new
200 * one. This could be caused by a protocol error or a
201 * misbehaving/malicious client.
202 */
203 bool queued_command_reply;
204 struct lttng_dynamic_buffer buffer;
205 } outbound;
206 } communication;
83b934ad
MD
207 /* call_rcu delayed reclaim. */
208 struct rcu_head rcu_node;
ab0ee2ca
JG
209};
210
211struct channel_state_sample {
212 struct channel_key key;
213 struct cds_lfht_node channel_state_ht_node;
214 uint64_t highest_usage;
215 uint64_t lowest_usage;
e8360425 216 uint64_t channel_total_consumed;
83b934ad
MD
217 /* call_rcu delayed reclaim. */
218 struct rcu_head rcu_node;
ab0ee2ca
JG
219};
220
e4db5ace 221static unsigned long hash_channel_key(struct channel_key *key);
ed327204 222static int evaluate_buffer_condition(const struct lttng_condition *condition,
e4db5ace 223 struct lttng_evaluation **evaluation,
e8360425
JD
224 const struct notification_thread_state *state,
225 const struct channel_state_sample *previous_sample,
226 const struct channel_state_sample *latest_sample,
227 uint64_t previous_session_consumed_total,
228 uint64_t latest_session_consumed_total,
229 struct channel_info *channel_info);
e4db5ace 230static
9b63a4aa
JG
231int send_evaluation_to_clients(const struct lttng_trigger *trigger,
232 const struct lttng_evaluation *evaluation,
e4db5ace
JR
233 struct notification_client_list *client_list,
234 struct notification_thread_state *state,
235 uid_t channel_uid, gid_t channel_gid);
236
8abe313a 237
ea9a44f0 238/* session_info API */
8abe313a
JG
239static
240void session_info_destroy(void *_data);
241static
242void session_info_get(struct session_info *session_info);
243static
244void session_info_put(struct session_info *session_info);
245static
246struct session_info *session_info_create(const char *name,
ea9a44f0 247 uid_t uid, gid_t gid,
1eee26c5
JG
248 struct lttng_session_trigger_list *trigger_list,
249 struct cds_lfht *sessions_ht);
8abe313a
JG
250static
251void session_info_add_channel(struct session_info *session_info,
252 struct channel_info *channel_info);
253static
254void session_info_remove_channel(struct session_info *session_info,
255 struct channel_info *channel_info);
256
ea9a44f0
JG
257/* lttng_session_trigger_list API */
258static
259struct lttng_session_trigger_list *lttng_session_trigger_list_create(
260 const char *session_name,
261 struct cds_lfht *session_triggers_ht);
262static
263struct lttng_session_trigger_list *lttng_session_trigger_list_build(
264 const struct notification_thread_state *state,
265 const char *session_name);
266static
267void lttng_session_trigger_list_destroy(
268 struct lttng_session_trigger_list *list);
269static
270int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
271 const struct lttng_trigger *trigger);
272
273
ab0ee2ca
JG
274static
275int match_client(struct cds_lfht_node *node, const void *key)
276{
277 /* This double-cast is intended to supress pointer-to-cast warning. */
278 int socket = (int) (intptr_t) key;
279 struct notification_client *client;
280
281 client = caa_container_of(node, struct notification_client,
282 client_socket_ht_node);
283
284 return !!(client->socket == socket);
285}
286
287static
288int match_channel_trigger_list(struct cds_lfht_node *node, const void *key)
289{
290 struct channel_key *channel_key = (struct channel_key *) key;
291 struct lttng_channel_trigger_list *trigger_list;
292
293 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
294 channel_triggers_ht_node);
295
296 return !!((channel_key->key == trigger_list->channel_key.key) &&
297 (channel_key->domain == trigger_list->channel_key.domain));
298}
299
51eab943
JG
300static
301int match_session_trigger_list(struct cds_lfht_node *node, const void *key)
302{
303 const char *session_name = (const char *) key;
304 struct lttng_session_trigger_list *trigger_list;
305
306 trigger_list = caa_container_of(node, struct lttng_session_trigger_list,
307 session_triggers_ht_node);
308
309 return !!(strcmp(trigger_list->session_name, session_name) == 0);
310}
311
ab0ee2ca
JG
312static
313int match_channel_state_sample(struct cds_lfht_node *node, const void *key)
314{
315 struct channel_key *channel_key = (struct channel_key *) key;
316 struct channel_state_sample *sample;
317
318 sample = caa_container_of(node, struct channel_state_sample,
319 channel_state_ht_node);
320
321 return !!((channel_key->key == sample->key.key) &&
322 (channel_key->domain == sample->key.domain));
323}
324
325static
326int match_channel_info(struct cds_lfht_node *node, const void *key)
327{
328 struct channel_key *channel_key = (struct channel_key *) key;
329 struct channel_info *channel_info;
330
331 channel_info = caa_container_of(node, struct channel_info,
332 channels_ht_node);
333
334 return !!((channel_key->key == channel_info->key.key) &&
335 (channel_key->domain == channel_info->key.domain));
336}
337
338static
339int match_condition(struct cds_lfht_node *node, const void *key)
340{
341 struct lttng_condition *condition_key = (struct lttng_condition *) key;
342 struct lttng_trigger_ht_element *trigger;
343 struct lttng_condition *condition;
344
345 trigger = caa_container_of(node, struct lttng_trigger_ht_element,
346 node);
347 condition = lttng_trigger_get_condition(trigger->trigger);
348 assert(condition);
349
350 return !!lttng_condition_is_equal(condition_key, condition);
351}
352
ab0ee2ca
JG
353static
354int match_client_list_condition(struct cds_lfht_node *node, const void *key)
355{
356 struct lttng_condition *condition_key = (struct lttng_condition *) key;
357 struct notification_client_list *client_list;
f82f93a1 358 const struct lttng_condition *condition;
ab0ee2ca
JG
359
360 assert(condition_key);
361
362 client_list = caa_container_of(node, struct notification_client_list,
363 notification_trigger_ht_node);
f82f93a1 364 condition = lttng_trigger_get_const_condition(client_list->trigger);
ab0ee2ca
JG
365
366 return !!lttng_condition_is_equal(condition_key, condition);
367}
368
f82f93a1
JG
369static
370int match_session(struct cds_lfht_node *node, const void *key)
371{
372 const char *name = key;
373 struct session_info *session_info = caa_container_of(
374 node, struct session_info, sessions_ht_node);
375
376 return !strcmp(session_info->name, name);
377}
378
ab0ee2ca
JG
379static
380unsigned long lttng_condition_buffer_usage_hash(
9b63a4aa 381 const struct lttng_condition *_condition)
ab0ee2ca 382{
9a2746aa
JG
383 unsigned long hash;
384 unsigned long condition_type;
ab0ee2ca
JG
385 struct lttng_condition_buffer_usage *condition;
386
387 condition = container_of(_condition,
388 struct lttng_condition_buffer_usage, parent);
389
9a2746aa
JG
390 condition_type = (unsigned long) condition->parent.type;
391 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
ab0ee2ca
JG
392 if (condition->session_name) {
393 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
394 }
395 if (condition->channel_name) {
8f56701f 396 hash ^= hash_key_str(condition->channel_name, lttng_ht_seed);
ab0ee2ca
JG
397 }
398 if (condition->domain.set) {
399 hash ^= hash_key_ulong(
400 (void *) condition->domain.type,
401 lttng_ht_seed);
402 }
403 if (condition->threshold_ratio.set) {
404 uint64_t val;
405
406 val = condition->threshold_ratio.value * (double) UINT32_MAX;
407 hash ^= hash_key_u64(&val, lttng_ht_seed);
6633b0dd 408 } else if (condition->threshold_bytes.set) {
ab0ee2ca
JG
409 uint64_t val;
410
411 val = condition->threshold_bytes.value;
412 hash ^= hash_key_u64(&val, lttng_ht_seed);
413 }
414 return hash;
415}
416
e8360425
JD
417static
418unsigned long lttng_condition_session_consumed_size_hash(
9b63a4aa 419 const struct lttng_condition *_condition)
e8360425 420{
9a2746aa
JG
421 unsigned long hash;
422 unsigned long condition_type =
423 (unsigned long) LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE;
e8360425
JD
424 struct lttng_condition_session_consumed_size *condition;
425 uint64_t val;
426
427 condition = container_of(_condition,
428 struct lttng_condition_session_consumed_size, parent);
429
9a2746aa 430 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
e8360425
JD
431 if (condition->session_name) {
432 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
433 }
434 val = condition->consumed_threshold_bytes.value;
435 hash ^= hash_key_u64(&val, lttng_ht_seed);
436 return hash;
437}
438
a8393880
JG
439static
440unsigned long lttng_condition_session_rotation_hash(
441 const struct lttng_condition *_condition)
442{
443 unsigned long hash, condition_type;
444 struct lttng_condition_session_rotation *condition;
445
446 condition = container_of(_condition,
447 struct lttng_condition_session_rotation, parent);
448 condition_type = (unsigned long) condition->parent.type;
449 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
450 assert(condition->session_name);
451 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
452 return hash;
453}
9a2746aa 454
ab0ee2ca
JG
455/*
456 * The lttng_condition hashing code is kept in this file (rather than
457 * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
458 * don't want to link in liblttng-ctl.
459 */
460static
9b63a4aa 461unsigned long lttng_condition_hash(const struct lttng_condition *condition)
ab0ee2ca
JG
462{
463 switch (condition->type) {
464 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
465 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
466 return lttng_condition_buffer_usage_hash(condition);
e8360425
JD
467 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
468 return lttng_condition_session_consumed_size_hash(condition);
a8393880
JG
469 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
470 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
471 return lttng_condition_session_rotation_hash(condition);
ab0ee2ca
JG
472 default:
473 ERR("[notification-thread] Unexpected condition type caught");
474 abort();
475 }
476}
477
e4db5ace
JR
478static
479unsigned long hash_channel_key(struct channel_key *key)
480{
481 unsigned long key_hash = hash_key_u64(&key->key, lttng_ht_seed);
482 unsigned long domain_hash = hash_key_ulong(
483 (void *) (unsigned long) key->domain, lttng_ht_seed);
484
485 return key_hash ^ domain_hash;
486}
487
f82f93a1
JG
488/*
489 * Get the type of object to which a given condition applies. Bindings let
490 * the notification system evaluate a trigger's condition when a given
491 * object's state is updated.
492 *
493 * For instance, a condition bound to a channel will be evaluated everytime
494 * the channel's state is changed by a channel monitoring sample.
495 */
496static
497enum lttng_object_type get_condition_binding_object(
498 const struct lttng_condition *condition)
499{
500 switch (lttng_condition_get_type(condition)) {
501 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
502 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
503 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
504 return LTTNG_OBJECT_TYPE_CHANNEL;
505 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
506 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
507 return LTTNG_OBJECT_TYPE_SESSION;
508 default:
509 return LTTNG_OBJECT_TYPE_UNKNOWN;
510 }
511}
512
83b934ad
MD
513static
514void free_channel_info_rcu(struct rcu_head *node)
515{
516 free(caa_container_of(node, struct channel_info, rcu_node));
517}
518
ab0ee2ca
JG
519static
520void channel_info_destroy(struct channel_info *channel_info)
521{
522 if (!channel_info) {
523 return;
524 }
525
8abe313a
JG
526 if (channel_info->session_info) {
527 session_info_remove_channel(channel_info->session_info,
528 channel_info);
529 session_info_put(channel_info->session_info);
ab0ee2ca 530 }
8abe313a
JG
531 if (channel_info->name) {
532 free(channel_info->name);
ab0ee2ca 533 }
83b934ad
MD
534 call_rcu(&channel_info->rcu_node, free_channel_info_rcu);
535}
536
537static
538void free_session_info_rcu(struct rcu_head *node)
539{
540 free(caa_container_of(node, struct session_info, rcu_node));
ab0ee2ca
JG
541}
542
8abe313a 543/* Don't call directly, use the ref-counting mechanism. */
ab0ee2ca 544static
8abe313a 545void session_info_destroy(void *_data)
ab0ee2ca 546{
8abe313a 547 struct session_info *session_info = _data;
3b68d0a3 548 int ret;
ab0ee2ca 549
8abe313a
JG
550 assert(session_info);
551 if (session_info->channel_infos_ht) {
3b68d0a3
JR
552 ret = cds_lfht_destroy(session_info->channel_infos_ht, NULL);
553 if (ret) {
4c3f302b 554 ERR("[notification-thread] Failed to destroy channel information hash table");
3b68d0a3 555 }
8abe313a 556 }
ea9a44f0 557 lttng_session_trigger_list_destroy(session_info->trigger_list);
1eee26c5
JG
558
559 rcu_read_lock();
560 cds_lfht_del(session_info->sessions_ht,
561 &session_info->sessions_ht_node);
562 rcu_read_unlock();
8abe313a 563 free(session_info->name);
83b934ad 564 call_rcu(&session_info->rcu_node, free_session_info_rcu);
8abe313a 565}
ab0ee2ca 566
8abe313a
JG
567static
568void session_info_get(struct session_info *session_info)
569{
570 if (!session_info) {
571 return;
572 }
573 lttng_ref_get(&session_info->ref);
574}
575
576static
577void session_info_put(struct session_info *session_info)
578{
579 if (!session_info) {
580 return;
ab0ee2ca 581 }
8abe313a
JG
582 lttng_ref_put(&session_info->ref);
583}
584
585static
ea9a44f0 586struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid,
1eee26c5
JG
587 struct lttng_session_trigger_list *trigger_list,
588 struct cds_lfht *sessions_ht)
8abe313a
JG
589{
590 struct session_info *session_info;
ab0ee2ca 591
8abe313a 592 assert(name);
ab0ee2ca 593
8abe313a
JG
594 session_info = zmalloc(sizeof(*session_info));
595 if (!session_info) {
596 goto end;
597 }
598 lttng_ref_init(&session_info->ref, session_info_destroy);
599
600 session_info->channel_infos_ht = cds_lfht_new(DEFAULT_HT_SIZE,
601 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
602 if (!session_info->channel_infos_ht) {
ab0ee2ca
JG
603 goto error;
604 }
8abe313a
JG
605
606 cds_lfht_node_init(&session_info->sessions_ht_node);
607 session_info->name = strdup(name);
608 if (!session_info->name) {
ab0ee2ca
JG
609 goto error;
610 }
8abe313a
JG
611 session_info->uid = uid;
612 session_info->gid = gid;
ea9a44f0 613 session_info->trigger_list = trigger_list;
1eee26c5 614 session_info->sessions_ht = sessions_ht;
8abe313a
JG
615end:
616 return session_info;
617error:
618 session_info_put(session_info);
619 return NULL;
620}
621
622static
623void session_info_add_channel(struct session_info *session_info,
624 struct channel_info *channel_info)
625{
626 rcu_read_lock();
627 cds_lfht_add(session_info->channel_infos_ht,
628 hash_channel_key(&channel_info->key),
629 &channel_info->session_info_channels_ht_node);
630 rcu_read_unlock();
631}
632
633static
634void session_info_remove_channel(struct session_info *session_info,
635 struct channel_info *channel_info)
636{
637 rcu_read_lock();
638 cds_lfht_del(session_info->channel_infos_ht,
639 &channel_info->session_info_channels_ht_node);
640 rcu_read_unlock();
641}
642
643static
644struct channel_info *channel_info_create(const char *channel_name,
645 struct channel_key *channel_key, uint64_t channel_capacity,
646 struct session_info *session_info)
647{
648 struct channel_info *channel_info = zmalloc(sizeof(*channel_info));
649
650 if (!channel_info) {
651 goto end;
652 }
653
ab0ee2ca 654 cds_lfht_node_init(&channel_info->channels_ht_node);
8abe313a
JG
655 cds_lfht_node_init(&channel_info->session_info_channels_ht_node);
656 memcpy(&channel_info->key, channel_key, sizeof(*channel_key));
657 channel_info->capacity = channel_capacity;
658
659 channel_info->name = strdup(channel_name);
660 if (!channel_info->name) {
661 goto error;
662 }
663
664 /*
665 * Set the references between session and channel infos:
666 * - channel_info holds a strong reference to session_info
667 * - session_info holds a weak reference to channel_info
668 */
669 session_info_get(session_info);
670 session_info_add_channel(session_info, channel_info);
671 channel_info->session_info = session_info;
ab0ee2ca 672end:
8abe313a 673 return channel_info;
ab0ee2ca 674error:
8abe313a 675 channel_info_destroy(channel_info);
ab0ee2ca
JG
676 return NULL;
677}
678
ed327204
JG
679/* RCU read lock must be held by the caller. */
680static
681struct notification_client_list *get_client_list_from_condition(
682 struct notification_thread_state *state,
683 const struct lttng_condition *condition)
684{
685 struct cds_lfht_node *node;
686 struct cds_lfht_iter iter;
687
688 cds_lfht_lookup(state->notification_trigger_clients_ht,
689 lttng_condition_hash(condition),
690 match_client_list_condition,
691 condition,
692 &iter);
693 node = cds_lfht_iter_get_node(&iter);
694
695 return node ? caa_container_of(node,
696 struct notification_client_list,
697 notification_trigger_ht_node) : NULL;
698}
699
e4db5ace
JR
700/* This function must be called with the RCU read lock held. */
701static
f82f93a1
JG
702int evaluate_channel_condition_for_client(
703 const struct lttng_condition *condition,
704 struct notification_thread_state *state,
705 struct lttng_evaluation **evaluation,
706 uid_t *session_uid, gid_t *session_gid)
e4db5ace
JR
707{
708 int ret;
709 struct cds_lfht_iter iter;
710 struct cds_lfht_node *node;
711 struct channel_info *channel_info = NULL;
712 struct channel_key *channel_key = NULL;
713 struct channel_state_sample *last_sample = NULL;
714 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
e4db5ace 715
f82f93a1 716 /* Find the channel associated with the condition. */
e4db5ace 717 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter,
f82f93a1 718 channel_trigger_list, channel_triggers_ht_node) {
e4db5ace
JR
719 struct lttng_trigger_list_element *element;
720
721 cds_list_for_each_entry(element, &channel_trigger_list->list, node) {
9b63a4aa 722 const struct lttng_condition *current_condition =
f82f93a1 723 lttng_trigger_get_const_condition(
e4db5ace
JR
724 element->trigger);
725
726 assert(current_condition);
727 if (!lttng_condition_is_equal(condition,
2ae99f0b 728 current_condition)) {
e4db5ace
JR
729 continue;
730 }
731
732 /* Found the trigger, save the channel key. */
733 channel_key = &channel_trigger_list->channel_key;
734 break;
735 }
736 if (channel_key) {
737 /* The channel key was found stop iteration. */
738 break;
739 }
740 }
741
742 if (!channel_key){
743 /* No channel found; normal exit. */
f82f93a1 744 DBG("[notification-thread] No known channel associated with newly subscribed-to condition");
e4db5ace
JR
745 ret = 0;
746 goto end;
747 }
748
749 /* Fetch channel info for the matching channel. */
750 cds_lfht_lookup(state->channels_ht,
751 hash_channel_key(channel_key),
752 match_channel_info,
753 channel_key,
754 &iter);
755 node = cds_lfht_iter_get_node(&iter);
756 assert(node);
757 channel_info = caa_container_of(node, struct channel_info,
758 channels_ht_node);
759
760 /* Retrieve the channel's last sample, if it exists. */
761 cds_lfht_lookup(state->channel_state_ht,
762 hash_channel_key(channel_key),
763 match_channel_state_sample,
764 channel_key,
765 &iter);
766 node = cds_lfht_iter_get_node(&iter);
767 if (node) {
768 last_sample = caa_container_of(node,
769 struct channel_state_sample,
770 channel_state_ht_node);
771 } else {
772 /* Nothing to evaluate, no sample was ever taken. Normal exit */
773 DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
774 ret = 0;
775 goto end;
776 }
777
f82f93a1 778 ret = evaluate_buffer_condition(condition, evaluation, state,
e8360425
JD
779 NULL, last_sample,
780 0, channel_info->session_info->consumed_data_size,
781 channel_info);
e4db5ace 782 if (ret) {
066a3c82 783 WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
e4db5ace
JR
784 goto end;
785 }
786
f82f93a1
JG
787 *session_uid = channel_info->session_info->uid;
788 *session_gid = channel_info->session_info->gid;
789end:
790 return ret;
791}
792
793static
794const char *get_condition_session_name(const struct lttng_condition *condition)
795{
796 const char *session_name = NULL;
797 enum lttng_condition_status status;
798
799 switch (lttng_condition_get_type(condition)) {
800 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
801 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
802 status = lttng_condition_buffer_usage_get_session_name(
803 condition, &session_name);
804 break;
805 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
806 status = lttng_condition_session_consumed_size_get_session_name(
807 condition, &session_name);
808 break;
809 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
810 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
811 status = lttng_condition_session_rotation_get_session_name(
812 condition, &session_name);
813 break;
814 default:
815 abort();
816 }
817 if (status != LTTNG_CONDITION_STATUS_OK) {
818 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
819 goto end;
820 }
821end:
822 return session_name;
823}
824
825/* This function must be called with the RCU read lock held. */
826static
827int evaluate_session_condition_for_client(
828 const struct lttng_condition *condition,
829 struct notification_thread_state *state,
830 struct lttng_evaluation **evaluation,
831 uid_t *session_uid, gid_t *session_gid)
832{
833 int ret;
834 struct cds_lfht_iter iter;
835 struct cds_lfht_node *node;
836 const char *session_name;
837 struct session_info *session_info = NULL;
838
839 session_name = get_condition_session_name(condition);
840
841 /* Find the session associated with the trigger. */
842 cds_lfht_lookup(state->sessions_ht,
843 hash_key_str(session_name, lttng_ht_seed),
844 match_session,
845 session_name,
846 &iter);
847 node = cds_lfht_iter_get_node(&iter);
848 if (!node) {
849 DBG("[notification-thread] No known session matching name \"%s\"",
850 session_name);
851 ret = 0;
852 goto end;
853 }
854
855 session_info = caa_container_of(node, struct session_info,
856 sessions_ht_node);
857 session_info_get(session_info);
858
859 /*
860 * Evaluation is performed in-line here since only one type of
861 * session-bound condition is handled for the moment.
862 */
863 switch (lttng_condition_get_type(condition)) {
864 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
865 if (!session_info->rotation.ongoing) {
be558f88 866 ret = 0;
f82f93a1
JG
867 goto end_session_put;
868 }
869
870 *evaluation = lttng_evaluation_session_rotation_ongoing_create(
871 session_info->rotation.id);
872 if (!*evaluation) {
873 /* Fatal error. */
874 ERR("[notification-thread] Failed to create session rotation ongoing evaluation for session \"%s\"",
875 session_info->name);
876 ret = -1;
877 goto end_session_put;
878 }
879 ret = 0;
880 break;
881 default:
882 ret = 0;
883 goto end_session_put;
884 }
885
886 *session_uid = session_info->uid;
887 *session_gid = session_info->gid;
888
889end_session_put:
890 session_info_put(session_info);
891end:
892 return ret;
893}
894
895/* This function must be called with the RCU read lock held. */
896static
897int evaluate_condition_for_client(const struct lttng_trigger *trigger,
898 const struct lttng_condition *condition,
899 struct notification_client *client,
900 struct notification_thread_state *state)
901{
902 int ret;
903 struct lttng_evaluation *evaluation = NULL;
904 struct notification_client_list client_list = { 0 };
905 struct notification_client_list_element client_list_element = { 0 };
906 uid_t object_uid = 0;
907 gid_t object_gid = 0;
908
909 assert(trigger);
910 assert(condition);
911 assert(client);
912 assert(state);
913
914 switch (get_condition_binding_object(condition)) {
915 case LTTNG_OBJECT_TYPE_SESSION:
916 ret = evaluate_session_condition_for_client(condition, state,
917 &evaluation, &object_uid, &object_gid);
918 break;
919 case LTTNG_OBJECT_TYPE_CHANNEL:
920 ret = evaluate_channel_condition_for_client(condition, state,
921 &evaluation, &object_uid, &object_gid);
922 break;
923 case LTTNG_OBJECT_TYPE_NONE:
924 ret = 0;
925 goto end;
926 case LTTNG_OBJECT_TYPE_UNKNOWN:
927 default:
928 ret = -1;
929 goto end;
930 }
931
e4db5ace
JR
932 if (!evaluation) {
933 /* Evaluation yielded nothing. Normal exit. */
934 DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
935 ret = 0;
936 goto end;
937 }
938
939 /*
940 * Create a temporary client list with the client currently
941 * subscribing.
942 */
943 cds_lfht_node_init(&client_list.notification_trigger_ht_node);
944 CDS_INIT_LIST_HEAD(&client_list.list);
945 client_list.trigger = trigger;
946
947 CDS_INIT_LIST_HEAD(&client_list_element.node);
948 client_list_element.client = client;
949 cds_list_add(&client_list_element.node, &client_list.list);
950
951 /* Send evaluation result to the newly-subscribed client. */
952 DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
953 ret = send_evaluation_to_clients(trigger, evaluation, &client_list,
f82f93a1 954 state, object_uid, object_gid);
e4db5ace
JR
955
956end:
957 return ret;
958}
959
ab0ee2ca
JG
960static
961int notification_thread_client_subscribe(struct notification_client *client,
962 struct lttng_condition *condition,
963 struct notification_thread_state *state,
964 enum lttng_notification_channel_status *_status)
965{
966 int ret = 0;
ab0ee2ca
JG
967 struct notification_client_list *client_list;
968 struct lttng_condition_list_element *condition_list_element = NULL;
969 struct notification_client_list_element *client_list_element = NULL;
970 enum lttng_notification_channel_status status =
971 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
972
973 /*
974 * Ensure that the client has not already subscribed to this condition
975 * before.
976 */
977 cds_list_for_each_entry(condition_list_element, &client->condition_list, node) {
978 if (lttng_condition_is_equal(condition_list_element->condition,
979 condition)) {
980 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED;
981 goto end;
982 }
983 }
984
985 condition_list_element = zmalloc(sizeof(*condition_list_element));
986 if (!condition_list_element) {
987 ret = -1;
988 goto error;
989 }
990 client_list_element = zmalloc(sizeof(*client_list_element));
991 if (!client_list_element) {
992 ret = -1;
993 goto error;
994 }
995
996 rcu_read_lock();
997
998 /*
999 * Add the newly-subscribed condition to the client's subscription list.
1000 */
1001 CDS_INIT_LIST_HEAD(&condition_list_element->node);
1002 condition_list_element->condition = condition;
1003 cds_list_add(&condition_list_element->node, &client->condition_list);
1004
ed327204
JG
1005 client_list = get_client_list_from_condition(state, condition);
1006 if (!client_list) {
2ae99f0b
JG
1007 /*
1008 * No notification-emiting trigger registered with this
1009 * condition. We don't evaluate the condition right away
1010 * since this trigger is not registered yet.
1011 */
4fb43b68 1012 free(client_list_element);
ab0ee2ca
JG
1013 goto end_unlock;
1014 }
1015
2ae99f0b
JG
1016 /*
1017 * The condition to which the client just subscribed is evaluated
1018 * at this point so that conditions that are already TRUE result
1019 * in a notification being sent out.
1020 */
e4db5ace
JR
1021 if (evaluate_condition_for_client(client_list->trigger, condition,
1022 client, state)) {
1023 WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
1024 ret = -1;
4bd5b4af 1025 free(client_list_element);
e4db5ace
JR
1026 goto end_unlock;
1027 }
1028
1029 /*
1030 * Add the client to the list of clients interested in a given trigger
1031 * if a "notification" trigger with a corresponding condition was
1032 * added prior.
1033 */
ab0ee2ca
JG
1034 client_list_element->client = client;
1035 CDS_INIT_LIST_HEAD(&client_list_element->node);
1036 cds_list_add(&client_list_element->node, &client_list->list);
1037end_unlock:
1038 rcu_read_unlock();
1039end:
1040 if (_status) {
1041 *_status = status;
1042 }
1043 return ret;
1044error:
1045 free(condition_list_element);
1046 free(client_list_element);
1047 return ret;
1048}
1049
1050static
1051int notification_thread_client_unsubscribe(
1052 struct notification_client *client,
1053 struct lttng_condition *condition,
1054 struct notification_thread_state *state,
1055 enum lttng_notification_channel_status *_status)
1056{
ab0ee2ca
JG
1057 struct notification_client_list *client_list;
1058 struct lttng_condition_list_element *condition_list_element,
1059 *condition_tmp;
1060 struct notification_client_list_element *client_list_element,
1061 *client_tmp;
1062 bool condition_found = false;
1063 enum lttng_notification_channel_status status =
1064 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1065
1066 /* Remove the condition from the client's condition list. */
1067 cds_list_for_each_entry_safe(condition_list_element, condition_tmp,
1068 &client->condition_list, node) {
1069 if (!lttng_condition_is_equal(condition_list_element->condition,
1070 condition)) {
1071 continue;
1072 }
1073
1074 cds_list_del(&condition_list_element->node);
1075 /*
1076 * The caller may be iterating on the client's conditions to
1077 * tear down a client's connection. In this case, the condition
1078 * will be destroyed at the end.
1079 */
1080 if (condition != condition_list_element->condition) {
1081 lttng_condition_destroy(
1082 condition_list_element->condition);
1083 }
1084 free(condition_list_element);
1085 condition_found = true;
1086 break;
1087 }
1088
1089 if (!condition_found) {
1090 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION;
1091 goto end;
1092 }
1093
1094 /*
1095 * Remove the client from the list of clients interested the trigger
1096 * matching the condition.
1097 */
1098 rcu_read_lock();
ed327204
JG
1099 client_list = get_client_list_from_condition(state, condition);
1100 if (!client_list) {
ab0ee2ca
JG
1101 goto end_unlock;
1102 }
1103
ab0ee2ca
JG
1104 cds_list_for_each_entry_safe(client_list_element, client_tmp,
1105 &client_list->list, node) {
1106 if (client_list_element->client->socket != client->socket) {
1107 continue;
1108 }
1109 cds_list_del(&client_list_element->node);
1110 free(client_list_element);
1111 break;
1112 }
1113end_unlock:
1114 rcu_read_unlock();
1115end:
1116 lttng_condition_destroy(condition);
1117 if (_status) {
1118 *_status = status;
1119 }
1120 return 0;
1121}
1122
83b934ad
MD
1123static
1124void free_notification_client_rcu(struct rcu_head *node)
1125{
1126 free(caa_container_of(node, struct notification_client, rcu_node));
1127}
1128
ab0ee2ca
JG
1129static
1130void notification_client_destroy(struct notification_client *client,
1131 struct notification_thread_state *state)
1132{
1133 struct lttng_condition_list_element *condition_list_element, *tmp;
1134
1135 if (!client) {
1136 return;
1137 }
1138
1139 /* Release all conditions to which the client was subscribed. */
1140 cds_list_for_each_entry_safe(condition_list_element, tmp,
1141 &client->condition_list, node) {
1142 (void) notification_thread_client_unsubscribe(client,
1143 condition_list_element->condition, state, NULL);
1144 }
1145
1146 if (client->socket >= 0) {
1147 (void) lttcomm_close_unix_sock(client->socket);
1148 }
1149 lttng_dynamic_buffer_reset(&client->communication.inbound.buffer);
1150 lttng_dynamic_buffer_reset(&client->communication.outbound.buffer);
83b934ad 1151 call_rcu(&client->rcu_node, free_notification_client_rcu);
ab0ee2ca
JG
1152}
1153
1154/*
1155 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1156 * client pointer).
1157 */
1158static
1159struct notification_client *get_client_from_socket(int socket,
1160 struct notification_thread_state *state)
1161{
1162 struct cds_lfht_iter iter;
1163 struct cds_lfht_node *node;
1164 struct notification_client *client = NULL;
1165
1166 cds_lfht_lookup(state->client_socket_ht,
1167 hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed),
1168 match_client,
1169 (void *) (unsigned long) socket,
1170 &iter);
1171 node = cds_lfht_iter_get_node(&iter);
1172 if (!node) {
1173 goto end;
1174 }
1175
1176 client = caa_container_of(node, struct notification_client,
1177 client_socket_ht_node);
1178end:
1179 return client;
1180}
1181
1182static
e8360425 1183bool buffer_usage_condition_applies_to_channel(
51eab943
JG
1184 const struct lttng_condition *condition,
1185 const struct channel_info *channel_info)
ab0ee2ca
JG
1186{
1187 enum lttng_condition_status status;
e8360425
JD
1188 enum lttng_domain_type condition_domain;
1189 const char *condition_session_name = NULL;
1190 const char *condition_channel_name = NULL;
ab0ee2ca 1191
e8360425
JD
1192 status = lttng_condition_buffer_usage_get_domain_type(condition,
1193 &condition_domain);
1194 assert(status == LTTNG_CONDITION_STATUS_OK);
1195 if (channel_info->key.domain != condition_domain) {
ab0ee2ca
JG
1196 goto fail;
1197 }
1198
e8360425
JD
1199 status = lttng_condition_buffer_usage_get_session_name(
1200 condition, &condition_session_name);
1201 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1202
1203 status = lttng_condition_buffer_usage_get_channel_name(
1204 condition, &condition_channel_name);
1205 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_channel_name);
1206
1207 if (strcmp(channel_info->session_info->name, condition_session_name)) {
1208 goto fail;
1209 }
1210 if (strcmp(channel_info->name, condition_channel_name)) {
ab0ee2ca
JG
1211 goto fail;
1212 }
1213
e8360425
JD
1214 return true;
1215fail:
1216 return false;
1217}
1218
1219static
1220bool session_consumed_size_condition_applies_to_channel(
51eab943
JG
1221 const struct lttng_condition *condition,
1222 const struct channel_info *channel_info)
e8360425
JD
1223{
1224 enum lttng_condition_status status;
1225 const char *condition_session_name = NULL;
1226
1227 status = lttng_condition_session_consumed_size_get_session_name(
1228 condition, &condition_session_name);
1229 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1230
1231 if (strcmp(channel_info->session_info->name, condition_session_name)) {
ab0ee2ca
JG
1232 goto fail;
1233 }
1234
e8360425
JD
1235 return true;
1236fail:
1237 return false;
1238}
ab0ee2ca 1239
51eab943
JG
1240static
1241bool trigger_applies_to_channel(const struct lttng_trigger *trigger,
1242 const struct channel_info *channel_info)
1243{
1244 const struct lttng_condition *condition;
e8360425 1245 bool trigger_applies;
ab0ee2ca 1246
51eab943 1247 condition = lttng_trigger_get_const_condition(trigger);
e8360425 1248 if (!condition) {
ab0ee2ca
JG
1249 goto fail;
1250 }
e8360425
JD
1251
1252 switch (lttng_condition_get_type(condition)) {
1253 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1254 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1255 trigger_applies = buffer_usage_condition_applies_to_channel(
1256 condition, channel_info);
1257 break;
1258 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
1259 trigger_applies = session_consumed_size_condition_applies_to_channel(
1260 condition, channel_info);
1261 break;
1262 default:
ab0ee2ca
JG
1263 goto fail;
1264 }
1265
e8360425 1266 return trigger_applies;
ab0ee2ca
JG
1267fail:
1268 return false;
1269}
1270
1271static
1272bool trigger_applies_to_client(struct lttng_trigger *trigger,
1273 struct notification_client *client)
1274{
1275 bool applies = false;
1276 struct lttng_condition_list_element *condition_list_element;
1277
1278 cds_list_for_each_entry(condition_list_element, &client->condition_list,
1279 node) {
1280 applies = lttng_condition_is_equal(
1281 condition_list_element->condition,
1282 lttng_trigger_get_condition(trigger));
1283 if (applies) {
1284 break;
1285 }
1286 }
1287 return applies;
1288}
1289
ed327204
JG
1290/* Must be called with RCU read lock held. */
1291static
1292struct lttng_session_trigger_list *get_session_trigger_list(
1293 struct notification_thread_state *state,
1294 const char *session_name)
1295{
1296 struct lttng_session_trigger_list *list = NULL;
1297 struct cds_lfht_node *node;
1298 struct cds_lfht_iter iter;
1299
1300 cds_lfht_lookup(state->session_triggers_ht,
1301 hash_key_str(session_name, lttng_ht_seed),
1302 match_session_trigger_list,
1303 session_name,
1304 &iter);
1305 node = cds_lfht_iter_get_node(&iter);
1306 if (!node) {
1307 /*
1308 * Not an error, the list of triggers applying to that session
1309 * will be initialized when the session is created.
1310 */
1311 DBG("[notification-thread] No trigger list found for session \"%s\" as it is not yet known to the notification system",
1312 session_name);
1313 goto end;
1314 }
1315
1316 list = caa_container_of(node,
1317 struct lttng_session_trigger_list,
1318 session_triggers_ht_node);
1319end:
1320 return list;
1321}
1322
ea9a44f0
JG
1323/*
1324 * Allocate an empty lttng_session_trigger_list for the session named
1325 * 'session_name'.
1326 *
1327 * No ownership of 'session_name' is assumed by the session trigger list.
1328 * It is the caller's responsability to ensure the session name is alive
1329 * for as long as this list is.
1330 */
1331static
1332struct lttng_session_trigger_list *lttng_session_trigger_list_create(
1333 const char *session_name,
1334 struct cds_lfht *session_triggers_ht)
1335{
1336 struct lttng_session_trigger_list *list;
1337
1338 list = zmalloc(sizeof(*list));
1339 if (!list) {
1340 goto end;
1341 }
1342 list->session_name = session_name;
1343 CDS_INIT_LIST_HEAD(&list->list);
1344 cds_lfht_node_init(&list->session_triggers_ht_node);
1345 list->session_triggers_ht = session_triggers_ht;
1346
1347 rcu_read_lock();
1348 /* Publish the list through the session_triggers_ht. */
1349 cds_lfht_add(session_triggers_ht,
1350 hash_key_str(session_name, lttng_ht_seed),
1351 &list->session_triggers_ht_node);
1352 rcu_read_unlock();
1353end:
1354 return list;
1355}
1356
1357static
1358void free_session_trigger_list_rcu(struct rcu_head *node)
1359{
1360 free(caa_container_of(node, struct lttng_session_trigger_list,
1361 rcu_node));
1362}
1363
1364static
1365void lttng_session_trigger_list_destroy(struct lttng_session_trigger_list *list)
1366{
1367 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1368
1369 /* Empty the list element by element, and then free the list itself. */
1370 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1371 &list->list, node) {
1372 cds_list_del(&trigger_list_element->node);
1373 free(trigger_list_element);
1374 }
1375 rcu_read_lock();
1376 /* Unpublish the list from the session_triggers_ht. */
1377 cds_lfht_del(list->session_triggers_ht,
1378 &list->session_triggers_ht_node);
1379 rcu_read_unlock();
1380 call_rcu(&list->rcu_node, free_session_trigger_list_rcu);
1381}
1382
1383static
1384int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
1385 const struct lttng_trigger *trigger)
1386{
1387 int ret = 0;
1388 struct lttng_trigger_list_element *new_element =
1389 zmalloc(sizeof(*new_element));
1390
1391 if (!new_element) {
1392 ret = -1;
1393 goto end;
1394 }
1395 CDS_INIT_LIST_HEAD(&new_element->node);
1396 new_element->trigger = trigger;
1397 cds_list_add(&new_element->node, &list->list);
1398end:
1399 return ret;
1400}
1401
1402static
1403bool trigger_applies_to_session(const struct lttng_trigger *trigger,
1404 const char *session_name)
1405{
1406 bool applies = false;
1407 const struct lttng_condition *condition;
1408
1409 condition = lttng_trigger_get_const_condition(trigger);
1410 switch (lttng_condition_get_type(condition)) {
1411 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1412 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
1413 {
1414 enum lttng_condition_status condition_status;
1415 const char *condition_session_name;
1416
1417 condition_status = lttng_condition_session_rotation_get_session_name(
1418 condition, &condition_session_name);
1419 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
1420 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
1421 goto end;
1422 }
1423
1424 assert(condition_session_name);
1425 applies = !strcmp(condition_session_name, session_name);
1426 break;
1427 }
1428 default:
1429 goto end;
1430 }
1431end:
1432 return applies;
1433}
1434
1435/*
1436 * Allocate and initialize an lttng_session_trigger_list which contains
1437 * all triggers that apply to the session named 'session_name'.
1438 *
1439 * No ownership of 'session_name' is assumed by the session trigger list.
1440 * It is the caller's responsability to ensure the session name is alive
1441 * for as long as this list is.
1442 */
1443static
1444struct lttng_session_trigger_list *lttng_session_trigger_list_build(
1445 const struct notification_thread_state *state,
1446 const char *session_name)
1447{
1448 int trigger_count = 0;
1449 struct lttng_session_trigger_list *session_trigger_list = NULL;
1450 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1451 struct cds_lfht_iter iter;
1452
1453 session_trigger_list = lttng_session_trigger_list_create(session_name,
1454 state->session_triggers_ht);
1455
1456 /* Add all triggers applying to the session named 'session_name'. */
1457 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1458 node) {
1459 int ret;
1460
1461 if (!trigger_applies_to_session(trigger_ht_element->trigger,
1462 session_name)) {
1463 continue;
1464 }
1465
1466 ret = lttng_session_trigger_list_add(session_trigger_list,
1467 trigger_ht_element->trigger);
1468 if (ret) {
1469 goto error;
1470 }
1471
1472 trigger_count++;
1473 }
1474
1475 DBG("[notification-thread] Found %i triggers that apply to newly created session",
1476 trigger_count);
1477 return session_trigger_list;
1478error:
1479 lttng_session_trigger_list_destroy(session_trigger_list);
1480 return NULL;
1481}
1482
8abe313a
JG
1483static
1484struct session_info *find_or_create_session_info(
1485 struct notification_thread_state *state,
1486 const char *name, uid_t uid, gid_t gid)
1487{
1488 struct session_info *session = NULL;
1489 struct cds_lfht_node *node;
1490 struct cds_lfht_iter iter;
ea9a44f0 1491 struct lttng_session_trigger_list *trigger_list;
8abe313a
JG
1492
1493 rcu_read_lock();
1494 cds_lfht_lookup(state->sessions_ht,
1495 hash_key_str(name, lttng_ht_seed),
1496 match_session,
1497 name,
1498 &iter);
1499 node = cds_lfht_iter_get_node(&iter);
1500 if (node) {
1501 DBG("[notification-thread] Found session info of session \"%s\" (uid = %i, gid = %i)",
1502 name, uid, gid);
1503 session = caa_container_of(node, struct session_info,
1504 sessions_ht_node);
1505 assert(session->uid == uid);
1506 assert(session->gid == gid);
1eee26c5
JG
1507 session_info_get(session);
1508 goto end;
ea9a44f0
JG
1509 }
1510
1511 trigger_list = lttng_session_trigger_list_build(state, name);
1512 if (!trigger_list) {
1513 goto error;
8abe313a
JG
1514 }
1515
1eee26c5
JG
1516 session = session_info_create(name, uid, gid, trigger_list,
1517 state->sessions_ht);
8abe313a
JG
1518 if (!session) {
1519 ERR("[notification-thread] Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)",
1520 name, uid, gid);
b248644d 1521 lttng_session_trigger_list_destroy(trigger_list);
ea9a44f0 1522 goto error;
8abe313a 1523 }
ea9a44f0 1524 trigger_list = NULL;
577983cb
JG
1525
1526 cds_lfht_add(state->sessions_ht, hash_key_str(name, lttng_ht_seed),
9b63a4aa 1527 &session->sessions_ht_node);
1eee26c5 1528end:
8abe313a
JG
1529 rcu_read_unlock();
1530 return session;
ea9a44f0
JG
1531error:
1532 rcu_read_unlock();
1533 session_info_put(session);
1534 return NULL;
8abe313a
JG
1535}
1536
ab0ee2ca
JG
1537static
1538int handle_notification_thread_command_add_channel(
8abe313a
JG
1539 struct notification_thread_state *state,
1540 const char *session_name, uid_t session_uid, gid_t session_gid,
1541 const char *channel_name, enum lttng_domain_type channel_domain,
1542 uint64_t channel_key_int, uint64_t channel_capacity,
1543 enum lttng_error_code *cmd_result)
ab0ee2ca
JG
1544{
1545 struct cds_list_head trigger_list;
8abe313a
JG
1546 struct channel_info *new_channel_info = NULL;
1547 struct channel_key channel_key = {
1548 .key = channel_key_int,
1549 .domain = channel_domain,
1550 };
ab0ee2ca
JG
1551 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
1552 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1553 int trigger_count = 0;
1554 struct cds_lfht_iter iter;
8abe313a 1555 struct session_info *session_info = NULL;
ab0ee2ca
JG
1556
1557 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
8abe313a
JG
1558 channel_name, session_name, channel_key_int,
1559 channel_domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
ab0ee2ca
JG
1560
1561 CDS_INIT_LIST_HEAD(&trigger_list);
1562
8abe313a
JG
1563 session_info = find_or_create_session_info(state, session_name,
1564 session_uid, session_gid);
1565 if (!session_info) {
a9577b76 1566 /* Allocation error or an internal error occurred. */
ab0ee2ca
JG
1567 goto error;
1568 }
1569
8abe313a
JG
1570 new_channel_info = channel_info_create(channel_name, &channel_key,
1571 channel_capacity, session_info);
1572 if (!new_channel_info) {
1573 goto error;
1574 }
ab0ee2ca 1575
83b934ad 1576 rcu_read_lock();
ab0ee2ca
JG
1577 /* Build a list of all triggers applying to the new channel. */
1578 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1579 node) {
1580 struct lttng_trigger_list_element *new_element;
1581
1582 if (!trigger_applies_to_channel(trigger_ht_element->trigger,
8abe313a 1583 new_channel_info)) {
ab0ee2ca
JG
1584 continue;
1585 }
1586
1587 new_element = zmalloc(sizeof(*new_element));
1588 if (!new_element) {
83b934ad 1589 rcu_read_unlock();
ab0ee2ca
JG
1590 goto error;
1591 }
1592 CDS_INIT_LIST_HEAD(&new_element->node);
1593 new_element->trigger = trigger_ht_element->trigger;
1594 cds_list_add(&new_element->node, &trigger_list);
1595 trigger_count++;
1596 }
83b934ad 1597 rcu_read_unlock();
ab0ee2ca
JG
1598
1599 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
1600 trigger_count);
1601 channel_trigger_list = zmalloc(sizeof(*channel_trigger_list));
1602 if (!channel_trigger_list) {
1603 goto error;
1604 }
8abe313a 1605 channel_trigger_list->channel_key = new_channel_info->key;
ab0ee2ca
JG
1606 CDS_INIT_LIST_HEAD(&channel_trigger_list->list);
1607 cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
1608 cds_list_splice(&trigger_list, &channel_trigger_list->list);
1609
1610 rcu_read_lock();
1611 /* Add channel to the channel_ht which owns the channel_infos. */
1612 cds_lfht_add(state->channels_ht,
8abe313a 1613 hash_channel_key(&new_channel_info->key),
ab0ee2ca
JG
1614 &new_channel_info->channels_ht_node);
1615 /*
1616 * Add the list of triggers associated with this channel to the
1617 * channel_triggers_ht.
1618 */
1619 cds_lfht_add(state->channel_triggers_ht,
8abe313a 1620 hash_channel_key(&new_channel_info->key),
ab0ee2ca
JG
1621 &channel_trigger_list->channel_triggers_ht_node);
1622 rcu_read_unlock();
1eee26c5 1623 session_info_put(session_info);
ab0ee2ca
JG
1624 *cmd_result = LTTNG_OK;
1625 return 0;
1626error:
ab0ee2ca 1627 channel_info_destroy(new_channel_info);
8abe313a 1628 session_info_put(session_info);
ab0ee2ca
JG
1629 return 1;
1630}
1631
83b934ad
MD
1632static
1633void free_channel_trigger_list_rcu(struct rcu_head *node)
1634{
1635 free(caa_container_of(node, struct lttng_channel_trigger_list,
1636 rcu_node));
1637}
1638
1639static
1640void free_channel_state_sample_rcu(struct rcu_head *node)
1641{
1642 free(caa_container_of(node, struct channel_state_sample,
1643 rcu_node));
1644}
1645
ab0ee2ca
JG
1646static
1647int handle_notification_thread_command_remove_channel(
1648 struct notification_thread_state *state,
1649 uint64_t channel_key, enum lttng_domain_type domain,
1650 enum lttng_error_code *cmd_result)
1651{
1652 struct cds_lfht_node *node;
1653 struct cds_lfht_iter iter;
1654 struct lttng_channel_trigger_list *trigger_list;
1655 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1656 struct channel_key key = { .key = channel_key, .domain = domain };
1657 struct channel_info *channel_info;
1658
1659 DBG("[notification-thread] Removing channel key = %" PRIu64 " in %s domain",
1660 channel_key, domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
1661
1662 rcu_read_lock();
1663
1664 cds_lfht_lookup(state->channel_triggers_ht,
1665 hash_channel_key(&key),
1666 match_channel_trigger_list,
1667 &key,
1668 &iter);
1669 node = cds_lfht_iter_get_node(&iter);
1670 /*
1671 * There is a severe internal error if we are being asked to remove a
1672 * channel that doesn't exist.
1673 */
1674 if (!node) {
1675 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
1676 goto end;
1677 }
1678
1679 /* Free the list of triggers associated with this channel. */
1680 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
1681 channel_triggers_ht_node);
1682 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1683 &trigger_list->list, node) {
1684 cds_list_del(&trigger_list_element->node);
1685 free(trigger_list_element);
1686 }
1687 cds_lfht_del(state->channel_triggers_ht, node);
83b934ad 1688 call_rcu(&trigger_list->rcu_node, free_channel_trigger_list_rcu);
ab0ee2ca
JG
1689
1690 /* Free sampled channel state. */
1691 cds_lfht_lookup(state->channel_state_ht,
1692 hash_channel_key(&key),
1693 match_channel_state_sample,
1694 &key,
1695 &iter);
1696 node = cds_lfht_iter_get_node(&iter);
1697 /*
1698 * This is expected to be NULL if the channel is destroyed before we
1699 * received a sample.
1700 */
1701 if (node) {
1702 struct channel_state_sample *sample = caa_container_of(node,
1703 struct channel_state_sample,
1704 channel_state_ht_node);
1705
1706 cds_lfht_del(state->channel_state_ht, node);
83b934ad 1707 call_rcu(&sample->rcu_node, free_channel_state_sample_rcu);
ab0ee2ca
JG
1708 }
1709
1710 /* Remove the channel from the channels_ht and free it. */
1711 cds_lfht_lookup(state->channels_ht,
1712 hash_channel_key(&key),
1713 match_channel_info,
1714 &key,
1715 &iter);
1716 node = cds_lfht_iter_get_node(&iter);
1717 assert(node);
1718 channel_info = caa_container_of(node, struct channel_info,
1719 channels_ht_node);
1720 cds_lfht_del(state->channels_ht, node);
1721 channel_info_destroy(channel_info);
1722end:
1723 rcu_read_unlock();
1724 *cmd_result = LTTNG_OK;
1725 return 0;
1726}
1727
0ca52944 1728static
ed327204 1729int handle_notification_thread_command_session_rotation(
0ca52944 1730 struct notification_thread_state *state,
ed327204
JG
1731 enum notification_thread_command_type cmd_type,
1732 const char *session_name, uid_t session_uid, gid_t session_gid,
1733 uint64_t trace_archive_chunk_id,
1734 struct lttng_trace_archive_location *location,
1735 enum lttng_error_code *_cmd_result)
0ca52944 1736{
ed327204
JG
1737 int ret = 0;
1738 enum lttng_error_code cmd_result = LTTNG_OK;
1739 struct lttng_session_trigger_list *trigger_list;
1740 struct lttng_trigger_list_element *trigger_list_element;
1741 struct session_info *session_info;
0ca52944 1742
ed327204
JG
1743 rcu_read_lock();
1744
1745 session_info = find_or_create_session_info(state, session_name,
1746 session_uid, session_gid);
1747 if (!session_info) {
a9577b76 1748 /* Allocation error or an internal error occurred. */
ed327204
JG
1749 ret = -1;
1750 cmd_result = LTTNG_ERR_NOMEM;
1751 goto end;
1752 }
1753
1754 session_info->rotation.ongoing =
1755 cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING;
1756 session_info->rotation.id = trace_archive_chunk_id;
1757 trigger_list = get_session_trigger_list(state, session_name);
1758 if (!trigger_list) {
1759 DBG("[notification-thread] No triggers applying to session \"%s\" found",
1760 session_name);
1761 goto end;
1762 }
1763
1764 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
1765 node) {
1766 const struct lttng_condition *condition;
1767 const struct lttng_action *action;
1768 const struct lttng_trigger *trigger;
1769 struct notification_client_list *client_list;
1770 struct lttng_evaluation *evaluation = NULL;
1771 enum lttng_condition_type condition_type;
1772
1773 trigger = trigger_list_element->trigger;
1774 condition = lttng_trigger_get_const_condition(trigger);
1775 assert(condition);
1776 condition_type = lttng_condition_get_type(condition);
1777
1778 if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING &&
1779 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1780 continue;
1781 } else if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED &&
1782 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED) {
1783 continue;
1784 }
1785
1786 action = lttng_trigger_get_const_action(trigger);
1787
1788 /* Notify actions are the only type currently supported. */
1789 assert(lttng_action_get_type_const(action) ==
1790 LTTNG_ACTION_TYPE_NOTIFY);
1791
1792 client_list = get_client_list_from_condition(state, condition);
1793 assert(client_list);
1794
1795 if (cds_list_empty(&client_list->list)) {
1796 /*
1797 * No clients interested in the evaluation's result,
1798 * skip it.
1799 */
1800 continue;
1801 }
1802
1803 if (cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1804 evaluation = lttng_evaluation_session_rotation_ongoing_create(
1805 trace_archive_chunk_id);
1806 } else {
1807 evaluation = lttng_evaluation_session_rotation_completed_create(
1808 trace_archive_chunk_id, location);
1809 }
1810
1811 if (!evaluation) {
1812 /* Internal error */
1813 ret = -1;
1814 cmd_result = LTTNG_ERR_UNK;
1815 goto end;
1816 }
1817
1818 /* Dispatch evaluation result to all clients. */
1819 ret = send_evaluation_to_clients(trigger_list_element->trigger,
1820 evaluation, client_list, state,
1821 session_info->uid,
1822 session_info->gid);
1823 lttng_evaluation_destroy(evaluation);
1824 if (caa_unlikely(ret)) {
1825 goto end;
1826 }
1827 }
1828end:
1829 session_info_put(session_info);
1830 *_cmd_result = cmd_result;
1831 rcu_read_unlock();
1832 return ret;
0ca52944
JG
1833}
1834
1da26331
JG
1835static
1836int condition_is_supported(struct lttng_condition *condition)
1837{
1838 int ret;
1839
1840 switch (lttng_condition_get_type(condition)) {
1841 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1842 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1843 {
1844 enum lttng_domain_type domain;
1845
1846 ret = lttng_condition_buffer_usage_get_domain_type(condition,
1847 &domain);
1848 if (ret) {
1849 ret = -1;
1850 goto end;
1851 }
1852
1853 if (domain != LTTNG_DOMAIN_KERNEL) {
1854 ret = 1;
1855 goto end;
1856 }
1857
1858 /*
1859 * Older kernel tracers don't expose the API to monitor their
1860 * buffers. Therefore, we reject triggers that require that
1861 * mechanism to be available to be evaluated.
1862 */
1863 ret = kernel_supports_ring_buffer_snapshot_sample_positions(
1864 kernel_tracer_fd);
1865 break;
1866 }
1867 default:
1868 ret = 1;
1869 }
1870end:
1871 return ret;
1872}
1873
51eab943
JG
1874/* Must be called with RCU read lock held. */
1875static
1876int bind_trigger_to_matching_session(const struct lttng_trigger *trigger,
1877 struct notification_thread_state *state)
1878{
1879 int ret = 0;
51eab943
JG
1880 const struct lttng_condition *condition;
1881 const char *session_name;
1882 struct lttng_session_trigger_list *trigger_list;
1883
1884 condition = lttng_trigger_get_const_condition(trigger);
1885 switch (lttng_condition_get_type(condition)) {
1886 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1887 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
1888 {
1889 enum lttng_condition_status status;
1890
1891 status = lttng_condition_session_rotation_get_session_name(
1892 condition, &session_name);
1893 if (status != LTTNG_CONDITION_STATUS_OK) {
1894 ERR("[notification-thread] Failed to bind trigger to session: unable to get 'session_rotation' condition's session name");
1895 ret = -1;
1896 goto end;
1897 }
1898 break;
1899 }
1900 default:
1901 ret = -1;
1902 goto end;
1903 }
1904
ed327204
JG
1905 trigger_list = get_session_trigger_list(state, session_name);
1906 if (!trigger_list) {
51eab943
JG
1907 DBG("[notification-thread] Unable to bind trigger applying to session \"%s\" as it is not yet known to the notification system",
1908 session_name);
1909 goto end;
51eab943 1910
ed327204 1911 }
51eab943
JG
1912
1913 DBG("[notification-thread] Newly registered trigger bound to session \"%s\"",
1914 session_name);
1915 ret = lttng_session_trigger_list_add(trigger_list, trigger);
1916end:
1917 return ret;
1918}
1919
1920/* Must be called with RCU read lock held. */
1921static
1922int bind_trigger_to_matching_channels(const struct lttng_trigger *trigger,
1923 struct notification_thread_state *state)
1924{
1925 int ret = 0;
1926 struct cds_lfht_node *node;
1927 struct cds_lfht_iter iter;
1928 struct channel_info *channel;
1929
1930 cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
1931 channels_ht_node) {
1932 struct lttng_trigger_list_element *trigger_list_element;
1933 struct lttng_channel_trigger_list *trigger_list;
a5d64ae7 1934 struct cds_lfht_iter lookup_iter;
51eab943
JG
1935
1936 if (!trigger_applies_to_channel(trigger, channel)) {
1937 continue;
1938 }
1939
1940 cds_lfht_lookup(state->channel_triggers_ht,
1941 hash_channel_key(&channel->key),
1942 match_channel_trigger_list,
1943 &channel->key,
a5d64ae7
MD
1944 &lookup_iter);
1945 node = cds_lfht_iter_get_node(&lookup_iter);
51eab943
JG
1946 assert(node);
1947 trigger_list = caa_container_of(node,
1948 struct lttng_channel_trigger_list,
1949 channel_triggers_ht_node);
1950
1951 trigger_list_element = zmalloc(sizeof(*trigger_list_element));
1952 if (!trigger_list_element) {
1953 ret = -1;
1954 goto end;
1955 }
1956 CDS_INIT_LIST_HEAD(&trigger_list_element->node);
1957 trigger_list_element->trigger = trigger;
1958 cds_list_add(&trigger_list_element->node, &trigger_list->list);
1959 DBG("[notification-thread] Newly registered trigger bound to channel \"%s\"",
1960 channel->name);
1961 }
1962end:
1963 return ret;
1964}
1965
ab0ee2ca
JG
1966/*
1967 * FIXME A client's credentials are not checked when registering a trigger, nor
1968 * are they stored alongside with the trigger.
1969 *
1da26331 1970 * The effects of this are benign since:
ab0ee2ca 1971 * - The client will succeed in registering the trigger, as it is valid,
51eab943 1972 * - The trigger will, internally, be bound to the channel/session,
ab0ee2ca
JG
1973 * - The notifications will not be sent since the client's credentials
1974 * are checked against the channel at that moment.
1da26331
JG
1975 *
1976 * If this function returns a non-zero value, it means something is
50ca7858 1977 * fundamentally broken and the whole subsystem/thread will be torn down.
1da26331
JG
1978 *
1979 * If a non-fatal error occurs, just set the cmd_result to the appropriate
1980 * error code.
ab0ee2ca
JG
1981 */
1982static
1983int handle_notification_thread_command_register_trigger(
8abe313a
JG
1984 struct notification_thread_state *state,
1985 struct lttng_trigger *trigger,
1986 enum lttng_error_code *cmd_result)
ab0ee2ca
JG
1987{
1988 int ret = 0;
1989 struct lttng_condition *condition;
1990 struct notification_client *client;
1991 struct notification_client_list *client_list = NULL;
1992 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1993 struct notification_client_list_element *client_list_element, *tmp;
1994 struct cds_lfht_node *node;
1995 struct cds_lfht_iter iter;
ab0ee2ca
JG
1996 bool free_trigger = true;
1997
1998 rcu_read_lock();
1999
2000 condition = lttng_trigger_get_condition(trigger);
1da26331
JG
2001 assert(condition);
2002
2003 ret = condition_is_supported(condition);
2004 if (ret < 0) {
2005 goto error;
2006 } else if (ret == 0) {
2007 *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
2008 goto error;
2009 } else {
2010 /* Feature is supported, continue. */
2011 ret = 0;
2012 }
2013
ab0ee2ca
JG
2014 trigger_ht_element = zmalloc(sizeof(*trigger_ht_element));
2015 if (!trigger_ht_element) {
2016 ret = -1;
2017 goto error;
2018 }
2019
2020 /* Add trigger to the trigger_ht. */
2021 cds_lfht_node_init(&trigger_ht_element->node);
2022 trigger_ht_element->trigger = trigger;
2023
2024 node = cds_lfht_add_unique(state->triggers_ht,
2025 lttng_condition_hash(condition),
2026 match_condition,
2027 condition,
2028 &trigger_ht_element->node);
2029 if (node != &trigger_ht_element->node) {
2030 /* Not a fatal error, simply report it to the client. */
2031 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2032 goto error_free_ht_element;
2033 }
2034
2035 /*
2036 * Ownership of the trigger and of its wrapper was transfered to
2037 * the triggers_ht.
2038 */
2039 trigger_ht_element = NULL;
2040 free_trigger = false;
2041
2042 /*
2043 * The rest only applies to triggers that have a "notify" action.
2044 * It is not skipped as this is the only action type currently
2045 * supported.
2046 */
2047 client_list = zmalloc(sizeof(*client_list));
2048 if (!client_list) {
2049 ret = -1;
2050 goto error_free_ht_element;
2051 }
2052 cds_lfht_node_init(&client_list->notification_trigger_ht_node);
2053 CDS_INIT_LIST_HEAD(&client_list->list);
2054 client_list->trigger = trigger;
2055
2056 /* Build a list of clients to which this new trigger applies. */
2057 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
2058 client_socket_ht_node) {
2059 if (!trigger_applies_to_client(trigger, client)) {
2060 continue;
2061 }
2062
2063 client_list_element = zmalloc(sizeof(*client_list_element));
2064 if (!client_list_element) {
2065 ret = -1;
2066 goto error_free_client_list;
2067 }
2068 CDS_INIT_LIST_HEAD(&client_list_element->node);
2069 client_list_element->client = client;
2070 cds_list_add(&client_list_element->node, &client_list->list);
2071 }
2072
2073 cds_lfht_add(state->notification_trigger_clients_ht,
2074 lttng_condition_hash(condition),
2075 &client_list->notification_trigger_ht_node);
ab0ee2ca 2076
f82f93a1 2077 switch (get_condition_binding_object(condition)) {
51eab943
JG
2078 case LTTNG_OBJECT_TYPE_SESSION:
2079 /* Add the trigger to the list if it matches a known session. */
2080 ret = bind_trigger_to_matching_session(trigger, state);
2081 if (ret) {
2082 goto error_free_client_list;
ab0ee2ca 2083 }
f82f93a1 2084 break;
51eab943
JG
2085 case LTTNG_OBJECT_TYPE_CHANNEL:
2086 /*
2087 * Add the trigger to list of triggers bound to the channels
2088 * currently known.
2089 */
2090 ret = bind_trigger_to_matching_channels(trigger, state);
2091 if (ret) {
ab0ee2ca
JG
2092 goto error_free_client_list;
2093 }
51eab943
JG
2094 break;
2095 case LTTNG_OBJECT_TYPE_NONE:
2096 break;
2097 default:
2098 ERR("[notification-thread] Unknown object type on which to bind a newly registered trigger was encountered");
2099 ret = -1;
2100 goto error_free_client_list;
ab0ee2ca
JG
2101 }
2102
2ae99f0b
JG
2103 /*
2104 * Since there is nothing preventing clients from subscribing to a
2105 * condition before the corresponding trigger is registered, we have
2106 * to evaluate this new condition right away.
2107 *
2108 * At some point, we were waiting for the next "evaluation" (e.g. on
2109 * reception of a channel sample) to evaluate this new condition, but
2110 * that was broken.
2111 *
2112 * The reason it was broken is that waiting for the next sample
2113 * does not allow us to properly handle transitions for edge-triggered
2114 * conditions.
2115 *
2116 * Consider this example: when we handle a new channel sample, we
2117 * evaluate each conditions twice: once with the previous state, and
2118 * again with the newest state. We then use those two results to
2119 * determine whether a state change happened: a condition was false and
2120 * became true. If a state change happened, we have to notify clients.
2121 *
2122 * Now, if a client subscribes to a given notification and registers
2123 * a trigger *after* that subscription, we have to make sure the
2124 * condition is evaluated at this point while considering only the
2125 * current state. Otherwise, the next evaluation cycle may only see
2126 * that the evaluations remain the same (true for samples n-1 and n) and
2127 * the client will never know that the condition has been met.
2128 */
2129 cds_list_for_each_entry_safe(client_list_element, tmp,
2130 &client_list->list, node) {
2131 ret = evaluate_condition_for_client(trigger, condition,
2132 client_list_element->client, state);
2133 if (ret) {
2134 goto error_free_client_list;
2135 }
2136 }
2137
2138 /*
2139 * Client list ownership transferred to the
2140 * notification_trigger_clients_ht.
2141 */
2142 client_list = NULL;
2143
ab0ee2ca
JG
2144 *cmd_result = LTTNG_OK;
2145error_free_client_list:
2146 if (client_list) {
2147 cds_list_for_each_entry_safe(client_list_element, tmp,
2148 &client_list->list, node) {
2149 free(client_list_element);
2150 }
2151 free(client_list);
2152 }
2153error_free_ht_element:
2154 free(trigger_ht_element);
2155error:
2156 if (free_trigger) {
2157 struct lttng_action *action = lttng_trigger_get_action(trigger);
2158
2159 lttng_condition_destroy(condition);
2160 lttng_action_destroy(action);
2161 lttng_trigger_destroy(trigger);
2162 }
2163 rcu_read_unlock();
2164 return ret;
2165}
2166
83b934ad
MD
2167static
2168void free_notification_client_list_rcu(struct rcu_head *node)
2169{
2170 free(caa_container_of(node, struct notification_client_list,
2171 rcu_node));
2172}
2173
2174static
2175void free_lttng_trigger_ht_element_rcu(struct rcu_head *node)
2176{
2177 free(caa_container_of(node, struct lttng_trigger_ht_element,
2178 rcu_node));
2179}
2180
cc2295b5 2181static
ab0ee2ca
JG
2182int handle_notification_thread_command_unregister_trigger(
2183 struct notification_thread_state *state,
2184 struct lttng_trigger *trigger,
2185 enum lttng_error_code *_cmd_reply)
2186{
2187 struct cds_lfht_iter iter;
ed327204 2188 struct cds_lfht_node *triggers_ht_node;
ab0ee2ca
JG
2189 struct lttng_channel_trigger_list *trigger_list;
2190 struct notification_client_list *client_list;
2191 struct notification_client_list_element *client_list_element, *tmp;
2192 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
2193 struct lttng_condition *condition = lttng_trigger_get_condition(
2194 trigger);
2195 struct lttng_action *action;
2196 enum lttng_error_code cmd_reply;
2197
2198 rcu_read_lock();
2199
2200 cds_lfht_lookup(state->triggers_ht,
2201 lttng_condition_hash(condition),
2202 match_condition,
2203 condition,
2204 &iter);
2205 triggers_ht_node = cds_lfht_iter_get_node(&iter);
2206 if (!triggers_ht_node) {
2207 cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND;
2208 goto end;
2209 } else {
2210 cmd_reply = LTTNG_OK;
2211 }
2212
2213 /* Remove trigger from channel_triggers_ht. */
2214 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
2215 channel_triggers_ht_node) {
2216 struct lttng_trigger_list_element *trigger_element, *tmp;
2217
2218 cds_list_for_each_entry_safe(trigger_element, tmp,
2219 &trigger_list->list, node) {
9b63a4aa
JG
2220 const struct lttng_condition *current_condition =
2221 lttng_trigger_get_const_condition(
ab0ee2ca
JG
2222 trigger_element->trigger);
2223
2224 assert(current_condition);
2225 if (!lttng_condition_is_equal(condition,
2226 current_condition)) {
2227 continue;
2228 }
2229
2230 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
2231 cds_list_del(&trigger_element->node);
e4db5ace
JR
2232 /* A trigger can only appear once per channel */
2233 break;
ab0ee2ca
JG
2234 }
2235 }
2236
2237 /*
2238 * Remove and release the client list from
2239 * notification_trigger_clients_ht.
2240 */
ed327204
JG
2241 client_list = get_client_list_from_condition(state, condition);
2242 assert(client_list);
2243
ab0ee2ca
JG
2244 cds_list_for_each_entry_safe(client_list_element, tmp,
2245 &client_list->list, node) {
2246 free(client_list_element);
2247 }
ed327204
JG
2248 cds_lfht_del(state->notification_trigger_clients_ht,
2249 &client_list->notification_trigger_ht_node);
83b934ad 2250 call_rcu(&client_list->rcu_node, free_notification_client_list_rcu);
ab0ee2ca
JG
2251
2252 /* Remove trigger from triggers_ht. */
2253 trigger_ht_element = caa_container_of(triggers_ht_node,
2254 struct lttng_trigger_ht_element, node);
2255 cds_lfht_del(state->triggers_ht, triggers_ht_node);
2256
2257 condition = lttng_trigger_get_condition(trigger_ht_element->trigger);
2258 lttng_condition_destroy(condition);
2259 action = lttng_trigger_get_action(trigger_ht_element->trigger);
2260 lttng_action_destroy(action);
2261 lttng_trigger_destroy(trigger_ht_element->trigger);
83b934ad 2262 call_rcu(&trigger_ht_element->rcu_node, free_lttng_trigger_ht_element_rcu);
ab0ee2ca
JG
2263end:
2264 rcu_read_unlock();
2265 if (_cmd_reply) {
2266 *_cmd_reply = cmd_reply;
2267 }
2268 return 0;
2269}
2270
2271/* Returns 0 on success, 1 on exit requested, negative value on error. */
2272int handle_notification_thread_command(
2273 struct notification_thread_handle *handle,
2274 struct notification_thread_state *state)
2275{
2276 int ret;
2277 uint64_t counter;
2278 struct notification_thread_command *cmd;
2279
8abe313a 2280 /* Read the event pipe to put it back into a quiescent state. */
18aeca05 2281 ret = lttng_read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter,
8abe313a 2282 sizeof(counter));
18aeca05 2283 if (ret != sizeof(counter)) {
ab0ee2ca
JG
2284 goto error;
2285 }
2286
2287 pthread_mutex_lock(&handle->cmd_queue.lock);
2288 cmd = cds_list_first_entry(&handle->cmd_queue.list,
2289 struct notification_thread_command, cmd_list_node);
2290 switch (cmd->type) {
2291 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
2292 DBG("[notification-thread] Received register trigger command");
2293 ret = handle_notification_thread_command_register_trigger(
2294 state, cmd->parameters.trigger,
2295 &cmd->reply_code);
2296 break;
2297 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
2298 DBG("[notification-thread] Received unregister trigger command");
2299 ret = handle_notification_thread_command_unregister_trigger(
2300 state, cmd->parameters.trigger,
2301 &cmd->reply_code);
2302 break;
2303 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
2304 DBG("[notification-thread] Received add channel command");
2305 ret = handle_notification_thread_command_add_channel(
8abe313a
JG
2306 state,
2307 cmd->parameters.add_channel.session.name,
2308 cmd->parameters.add_channel.session.uid,
2309 cmd->parameters.add_channel.session.gid,
2310 cmd->parameters.add_channel.channel.name,
2311 cmd->parameters.add_channel.channel.domain,
2312 cmd->parameters.add_channel.channel.key,
2313 cmd->parameters.add_channel.channel.capacity,
ab0ee2ca
JG
2314 &cmd->reply_code);
2315 break;
2316 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
2317 DBG("[notification-thread] Received remove channel command");
2318 ret = handle_notification_thread_command_remove_channel(
2319 state, cmd->parameters.remove_channel.key,
2320 cmd->parameters.remove_channel.domain,
2321 &cmd->reply_code);
2322 break;
0ca52944 2323 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING:
0ca52944 2324 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED:
ed327204
JG
2325 DBG("[notification-thread] Received session rotation %s command",
2326 cmd->type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING ?
2327 "ongoing" : "completed");
2328 ret = handle_notification_thread_command_session_rotation(
0ca52944 2329 state,
ed327204
JG
2330 cmd->type,
2331 cmd->parameters.session_rotation.session_name,
2332 cmd->parameters.session_rotation.uid,
2333 cmd->parameters.session_rotation.gid,
2334 cmd->parameters.session_rotation.trace_archive_chunk_id,
2335 cmd->parameters.session_rotation.location,
0ca52944
JG
2336 &cmd->reply_code);
2337 break;
ab0ee2ca
JG
2338 case NOTIFICATION_COMMAND_TYPE_QUIT:
2339 DBG("[notification-thread] Received quit command");
2340 cmd->reply_code = LTTNG_OK;
2341 ret = 1;
2342 goto end;
2343 default:
2344 ERR("[notification-thread] Unknown internal command received");
2345 goto error_unlock;
2346 }
2347
2348 if (ret) {
2349 goto error_unlock;
2350 }
2351end:
2352 cds_list_del(&cmd->cmd_list_node);
8ada111f 2353 lttng_waiter_wake_up(&cmd->reply_waiter);
ab0ee2ca
JG
2354 pthread_mutex_unlock(&handle->cmd_queue.lock);
2355 return ret;
2356error_unlock:
2357 /* Wake-up and return a fatal error to the calling thread. */
8ada111f 2358 lttng_waiter_wake_up(&cmd->reply_waiter);
ab0ee2ca
JG
2359 pthread_mutex_unlock(&handle->cmd_queue.lock);
2360 cmd->reply_code = LTTNG_ERR_FATAL;
2361error:
2362 /* Indicate a fatal error to the caller. */
2363 return -1;
2364}
2365
2366static
2367unsigned long hash_client_socket(int socket)
2368{
2369 return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed);
2370}
2371
2372static
2373int socket_set_non_blocking(int socket)
2374{
2375 int ret, flags;
2376
2377 /* Set the pipe as non-blocking. */
2378 ret = fcntl(socket, F_GETFL, 0);
2379 if (ret == -1) {
2380 PERROR("fcntl get socket flags");
2381 goto end;
2382 }
2383 flags = ret;
2384
2385 ret = fcntl(socket, F_SETFL, flags | O_NONBLOCK);
2386 if (ret == -1) {
2387 PERROR("fcntl set O_NONBLOCK socket flag");
2388 goto end;
2389 }
2390 DBG("Client socket (fd = %i) set as non-blocking", socket);
2391end:
2392 return ret;
2393}
2394
2395static
14fa22f8 2396int client_reset_inbound_state(struct notification_client *client)
ab0ee2ca
JG
2397{
2398 int ret;
2399
2400 ret = lttng_dynamic_buffer_set_size(
2401 &client->communication.inbound.buffer, 0);
2402 assert(!ret);
2403
2404 client->communication.inbound.bytes_to_receive =
2405 sizeof(struct lttng_notification_channel_message);
2406 client->communication.inbound.msg_type =
2407 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN;
ab0ee2ca
JG
2408 LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1);
2409 LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1);
14fa22f8
JG
2410 ret = lttng_dynamic_buffer_set_size(
2411 &client->communication.inbound.buffer,
2412 client->communication.inbound.bytes_to_receive);
2413 return ret;
ab0ee2ca
JG
2414}
2415
2416int handle_notification_thread_client_connect(
2417 struct notification_thread_state *state)
2418{
2419 int ret;
2420 struct notification_client *client;
2421
2422 DBG("[notification-thread] Handling new notification channel client connection");
2423
2424 client = zmalloc(sizeof(*client));
2425 if (!client) {
2426 /* Fatal error. */
2427 ret = -1;
2428 goto error;
2429 }
2430 CDS_INIT_LIST_HEAD(&client->condition_list);
2431 lttng_dynamic_buffer_init(&client->communication.inbound.buffer);
2432 lttng_dynamic_buffer_init(&client->communication.outbound.buffer);
01ea340e 2433 client->communication.inbound.expect_creds = true;
14fa22f8
JG
2434 ret = client_reset_inbound_state(client);
2435 if (ret) {
2436 ERR("[notification-thread] Failed to reset client communication's inbound state");
2437 ret = 0;
2438 goto error;
2439 }
ab0ee2ca
JG
2440
2441 ret = lttcomm_accept_unix_sock(state->notification_channel_socket);
2442 if (ret < 0) {
2443 ERR("[notification-thread] Failed to accept new notification channel client connection");
2444 ret = 0;
2445 goto error;
2446 }
2447
2448 client->socket = ret;
2449
2450 ret = socket_set_non_blocking(client->socket);
2451 if (ret) {
2452 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
2453 goto error;
2454 }
2455
2456 ret = lttcomm_setsockopt_creds_unix_sock(client->socket);
2457 if (ret < 0) {
2458 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
2459 ret = 0;
2460 goto error;
2461 }
2462
2463 ret = lttng_poll_add(&state->events, client->socket,
2464 LPOLLIN | LPOLLERR |
2465 LPOLLHUP | LPOLLRDHUP);
2466 if (ret < 0) {
2467 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
2468 ret = 0;
2469 goto error;
2470 }
2471 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
2472 client->socket);
2473
ab0ee2ca
JG
2474 rcu_read_lock();
2475 cds_lfht_add(state->client_socket_ht,
2476 hash_client_socket(client->socket),
2477 &client->client_socket_ht_node);
2478 rcu_read_unlock();
2479
2480 return ret;
2481error:
2482 notification_client_destroy(client, state);
2483 return ret;
2484}
2485
2486int handle_notification_thread_client_disconnect(
2487 int client_socket,
2488 struct notification_thread_state *state)
2489{
2490 int ret = 0;
2491 struct notification_client *client;
2492
2493 rcu_read_lock();
2494 DBG("[notification-thread] Closing client connection (socket fd = %i)",
2495 client_socket);
2496 client = get_client_from_socket(client_socket, state);
2497 if (!client) {
2498 /* Internal state corruption, fatal error. */
2499 ERR("[notification-thread] Unable to find client (socket fd = %i)",
2500 client_socket);
2501 ret = -1;
2502 goto end;
2503 }
2504
2505 ret = lttng_poll_del(&state->events, client_socket);
2506 if (ret) {
2507 ERR("[notification-thread] Failed to remove client socket from poll set");
2508 }
2509 cds_lfht_del(state->client_socket_ht,
2510 &client->client_socket_ht_node);
2511 notification_client_destroy(client, state);
2512end:
2513 rcu_read_unlock();
2514 return ret;
2515}
2516
2517int handle_notification_thread_client_disconnect_all(
2518 struct notification_thread_state *state)
2519{
2520 struct cds_lfht_iter iter;
2521 struct notification_client *client;
2522 bool error_encoutered = false;
2523
2524 rcu_read_lock();
2525 DBG("[notification-thread] Closing all client connections");
2526 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
2527 client_socket_ht_node) {
2528 int ret;
2529
2530 ret = handle_notification_thread_client_disconnect(
2531 client->socket, state);
2532 if (ret) {
2533 error_encoutered = true;
2534 }
2535 }
2536 rcu_read_unlock();
2537 return error_encoutered ? 1 : 0;
2538}
2539
2540int handle_notification_thread_trigger_unregister_all(
2541 struct notification_thread_state *state)
2542{
4149ace8 2543 bool error_occurred = false;
ab0ee2ca
JG
2544 struct cds_lfht_iter iter;
2545 struct lttng_trigger_ht_element *trigger_ht_element;
2546
763de384 2547 rcu_read_lock();
ab0ee2ca
JG
2548 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
2549 node) {
2550 int ret = handle_notification_thread_command_unregister_trigger(
2551 state, trigger_ht_element->trigger, NULL);
2552 if (ret) {
4149ace8 2553 error_occurred = true;
ab0ee2ca
JG
2554 }
2555 }
763de384 2556 rcu_read_unlock();
4149ace8 2557 return error_occurred ? -1 : 0;
ab0ee2ca
JG
2558}
2559
2560static
2561int client_flush_outgoing_queue(struct notification_client *client,
2562 struct notification_thread_state *state)
2563{
2564 ssize_t ret;
2565 size_t to_send_count;
2566
2567 assert(client->communication.outbound.buffer.size != 0);
2568 to_send_count = client->communication.outbound.buffer.size;
2569 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
2570 client->socket);
2571
2572 ret = lttcomm_send_unix_sock_non_block(client->socket,
2573 client->communication.outbound.buffer.data,
2574 to_send_count);
2575 if ((ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) ||
2576 (ret > 0 && ret < to_send_count)) {
2577 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
2578 client->socket);
2579 to_send_count -= max(ret, 0);
2580
2581 memcpy(client->communication.outbound.buffer.data,
2582 client->communication.outbound.buffer.data +
2583 client->communication.outbound.buffer.size - to_send_count,
2584 to_send_count);
2585 ret = lttng_dynamic_buffer_set_size(
2586 &client->communication.outbound.buffer,
2587 to_send_count);
2588 if (ret) {
2589 goto error;
2590 }
2591
2592 /*
2593 * We want to be notified whenever there is buffer space
2594 * available to send the rest of the payload.
2595 */
2596 ret = lttng_poll_mod(&state->events, client->socket,
2597 CLIENT_POLL_MASK_IN_OUT);
2598 if (ret) {
2599 goto error;
2600 }
2601 } else if (ret < 0) {
2602 /* Generic error, disconnect the client. */
2603 ERR("[notification-thread] Failed to send flush outgoing queue, disconnecting client (socket fd = %i)",
2604 client->socket);
2605 ret = handle_notification_thread_client_disconnect(
2606 client->socket, state);
2607 if (ret) {
2608 goto error;
2609 }
2610 } else {
2611 /* No error and flushed the queue completely. */
2612 ret = lttng_dynamic_buffer_set_size(
2613 &client->communication.outbound.buffer, 0);
2614 if (ret) {
2615 goto error;
2616 }
2617 ret = lttng_poll_mod(&state->events, client->socket,
2618 CLIENT_POLL_MASK_IN);
2619 if (ret) {
2620 goto error;
2621 }
2622
2623 client->communication.outbound.queued_command_reply = false;
2624 client->communication.outbound.dropped_notification = false;
2625 }
2626
2627 return 0;
2628error:
2629 return -1;
2630}
2631
2632static
2633int client_send_command_reply(struct notification_client *client,
2634 struct notification_thread_state *state,
2635 enum lttng_notification_channel_status status)
2636{
2637 int ret;
2638 struct lttng_notification_channel_command_reply reply = {
2639 .status = (int8_t) status,
2640 };
2641 struct lttng_notification_channel_message msg = {
2642 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
2643 .size = sizeof(reply),
2644 };
2645 char buffer[sizeof(msg) + sizeof(reply)];
2646
2647 if (client->communication.outbound.queued_command_reply) {
2648 /* Protocol error. */
2649 goto error;
2650 }
2651
2652 memcpy(buffer, &msg, sizeof(msg));
2653 memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
2654 DBG("[notification-thread] Send command reply (%i)", (int) status);
2655
2656 /* Enqueue buffer to outgoing queue and flush it. */
2657 ret = lttng_dynamic_buffer_append(
2658 &client->communication.outbound.buffer,
2659 buffer, sizeof(buffer));
2660 if (ret) {
2661 goto error;
2662 }
2663
2664 ret = client_flush_outgoing_queue(client, state);
2665 if (ret) {
2666 goto error;
2667 }
2668
2669 if (client->communication.outbound.buffer.size != 0) {
2670 /* Queue could not be emptied. */
2671 client->communication.outbound.queued_command_reply = true;
2672 }
2673
2674 return 0;
2675error:
2676 return -1;
2677}
2678
2679static
2680int client_dispatch_message(struct notification_client *client,
2681 struct notification_thread_state *state)
2682{
2683 int ret = 0;
2684
2685 if (client->communication.inbound.msg_type !=
2686 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE &&
2687 client->communication.inbound.msg_type !=
2688 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN &&
2689 !client->validated) {
2690 WARN("[notification-thread] client attempted a command before handshake");
2691 ret = -1;
2692 goto end;
2693 }
2694
2695 switch (client->communication.inbound.msg_type) {
2696 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN:
2697 {
2698 /*
2699 * Receiving message header. The function will be called again
2700 * once the rest of the message as been received and can be
2701 * interpreted.
2702 */
2703 const struct lttng_notification_channel_message *msg;
2704
2705 assert(sizeof(*msg) ==
2706 client->communication.inbound.buffer.size);
2707 msg = (const struct lttng_notification_channel_message *)
2708 client->communication.inbound.buffer.data;
2709
2710 if (msg->size == 0 || msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
2711 ERR("[notification-thread] Invalid notification channel message: length = %u", msg->size);
2712 ret = -1;
2713 goto end;
2714 }
2715
2716 switch (msg->type) {
2717 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
2718 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
2719 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
2720 break;
2721 default:
2722 ret = -1;
2723 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
2724 goto end;
2725 }
2726
2727 client->communication.inbound.bytes_to_receive = msg->size;
2728 client->communication.inbound.msg_type =
2729 (enum lttng_notification_channel_message_type) msg->type;
ab0ee2ca 2730 ret = lttng_dynamic_buffer_set_size(
14fa22f8 2731 &client->communication.inbound.buffer, msg->size);
ab0ee2ca
JG
2732 if (ret) {
2733 goto end;
2734 }
2735 break;
2736 }
2737 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
2738 {
2739 struct lttng_notification_channel_command_handshake *handshake_client;
2740 struct lttng_notification_channel_command_handshake handshake_reply = {
2741 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
2742 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
2743 };
2744 struct lttng_notification_channel_message msg_header = {
2745 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
2746 .size = sizeof(handshake_reply),
2747 };
2748 enum lttng_notification_channel_status status =
2749 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
2750 char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
2751
2752 memcpy(send_buffer, &msg_header, sizeof(msg_header));
2753 memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
2754 sizeof(handshake_reply));
2755
2756 handshake_client =
2757 (struct lttng_notification_channel_command_handshake *)
2758 client->communication.inbound.buffer.data;
47a32869 2759 client->major = handshake_client->major;
ab0ee2ca
JG
2760 client->minor = handshake_client->minor;
2761 if (!client->communication.inbound.creds_received) {
2762 ERR("[notification-thread] No credentials received from client");
2763 ret = -1;
2764 goto end;
2765 }
2766
2767 client->uid = LTTNG_SOCK_GET_UID_CRED(
2768 &client->communication.inbound.creds);
2769 client->gid = LTTNG_SOCK_GET_GID_CRED(
2770 &client->communication.inbound.creds);
2771 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
2772 client->uid, client->gid, (int) client->major,
2773 (int) client->minor);
2774
2775 if (handshake_client->major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
2776 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
2777 }
2778
2779 ret = lttng_dynamic_buffer_append(&client->communication.outbound.buffer,
2780 send_buffer, sizeof(send_buffer));
2781 if (ret) {
2782 ERR("[notification-thread] Failed to send protocol version to notification channel client");
2783 goto end;
2784 }
2785
2786 ret = client_flush_outgoing_queue(client, state);
2787 if (ret) {
2788 goto end;
2789 }
2790
2791 ret = client_send_command_reply(client, state, status);
2792 if (ret) {
2793 ERR("[notification-thread] Failed to send reply to notification channel client");
2794 goto end;
2795 }
2796
2797 /* Set reception state to receive the next message header. */
14fa22f8
JG
2798 ret = client_reset_inbound_state(client);
2799 if (ret) {
2800 ERR("[notification-thread] Failed to reset client communication's inbound state");
2801 goto end;
2802 }
ab0ee2ca
JG
2803 client->validated = true;
2804 break;
2805 }
2806 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
2807 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
2808 {
2809 struct lttng_condition *condition;
2810 enum lttng_notification_channel_status status =
2811 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
2812 const struct lttng_buffer_view condition_view =
2813 lttng_buffer_view_from_dynamic_buffer(
2814 &client->communication.inbound.buffer,
2815 0, -1);
2816 size_t expected_condition_size =
2817 client->communication.inbound.buffer.size;
2818
2819 ret = lttng_condition_create_from_buffer(&condition_view,
2820 &condition);
2821 if (ret != expected_condition_size) {
2822 ERR("[notification-thread] Malformed condition received from client");
2823 goto end;
2824 }
2825
2826 if (client->communication.inbound.msg_type ==
2827 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
ab0ee2ca
JG
2828 ret = notification_thread_client_subscribe(client,
2829 condition, state, &status);
2830 } else {
2831 ret = notification_thread_client_unsubscribe(client,
2832 condition, state, &status);
2833 }
2834 if (ret) {
2835 goto end;
2836 }
2837
2838 ret = client_send_command_reply(client, state, status);
2839 if (ret) {
2840 ERR("[notification-thread] Failed to send reply to notification channel client");
2841 goto end;
2842 }
2843
2844 /* Set reception state to receive the next message header. */
14fa22f8
JG
2845 ret = client_reset_inbound_state(client);
2846 if (ret) {
2847 ERR("[notification-thread] Failed to reset client communication's inbound state");
2848 goto end;
2849 }
ab0ee2ca
JG
2850 break;
2851 }
2852 default:
2853 abort();
2854 }
2855end:
2856 return ret;
2857}
2858
2859/* Incoming data from client. */
2860int handle_notification_thread_client_in(
2861 struct notification_thread_state *state, int socket)
2862{
14fa22f8 2863 int ret = 0;
ab0ee2ca
JG
2864 struct notification_client *client;
2865 ssize_t recv_ret;
2866 size_t offset;
2867
2868 client = get_client_from_socket(socket, state);
2869 if (!client) {
2870 /* Internal error, abort. */
2871 ret = -1;
2872 goto end;
2873 }
2874
14fa22f8
JG
2875 offset = client->communication.inbound.buffer.size -
2876 client->communication.inbound.bytes_to_receive;
01ea340e 2877 if (client->communication.inbound.expect_creds) {
ab0ee2ca
JG
2878 recv_ret = lttcomm_recv_creds_unix_sock(socket,
2879 client->communication.inbound.buffer.data + offset,
2880 client->communication.inbound.bytes_to_receive,
2881 &client->communication.inbound.creds);
2882 if (recv_ret > 0) {
01ea340e 2883 client->communication.inbound.expect_creds = false;
ab0ee2ca
JG
2884 client->communication.inbound.creds_received = true;
2885 }
2886 } else {
2887 recv_ret = lttcomm_recv_unix_sock_non_block(socket,
2888 client->communication.inbound.buffer.data + offset,
2889 client->communication.inbound.bytes_to_receive);
2890 }
2891 if (recv_ret < 0) {
2892 goto error_disconnect_client;
2893 }
2894
2895 client->communication.inbound.bytes_to_receive -= recv_ret;
ab0ee2ca
JG
2896 if (client->communication.inbound.bytes_to_receive == 0) {
2897 ret = client_dispatch_message(client, state);
2898 if (ret) {
2899 /*
2900 * Only returns an error if this client must be
2901 * disconnected.
2902 */
2903 goto error_disconnect_client;
2904 }
2905 } else {
2906 goto end;
2907 }
2908end:
2909 return ret;
2910error_disconnect_client:
2911 ret = handle_notification_thread_client_disconnect(socket, state);
2912 return ret;
2913}
2914
2915/* Client ready to receive outgoing data. */
2916int handle_notification_thread_client_out(
2917 struct notification_thread_state *state, int socket)
2918{
2919 int ret;
2920 struct notification_client *client;
2921
2922 client = get_client_from_socket(socket, state);
2923 if (!client) {
2924 /* Internal error, abort. */
2925 ret = -1;
2926 goto end;
2927 }
2928
2929 ret = client_flush_outgoing_queue(client, state);
2930 if (ret) {
2931 goto end;
2932 }
2933end:
2934 return ret;
2935}
2936
2937static
e8360425
JD
2938bool evaluate_buffer_usage_condition(const struct lttng_condition *condition,
2939 const struct channel_state_sample *sample,
2940 uint64_t buffer_capacity)
ab0ee2ca
JG
2941{
2942 bool result = false;
2943 uint64_t threshold;
2944 enum lttng_condition_type condition_type;
e8360425 2945 const struct lttng_condition_buffer_usage *use_condition = container_of(
ab0ee2ca
JG
2946 condition, struct lttng_condition_buffer_usage,
2947 parent);
2948
ab0ee2ca
JG
2949 if (use_condition->threshold_bytes.set) {
2950 threshold = use_condition->threshold_bytes.value;
2951 } else {
2952 /*
2953 * Threshold was expressed as a ratio.
2954 *
2955 * TODO the threshold (in bytes) of conditions expressed
2956 * as a ratio of total buffer size could be cached to
2957 * forego this double-multiplication or it could be performed
2958 * as fixed-point math.
2959 *
2960 * Note that caching should accomodate the case where the
2961 * condition applies to multiple channels (i.e. don't assume
2962 * that all channels matching my_chann* have the same size...)
2963 */
2964 threshold = (uint64_t) (use_condition->threshold_ratio.value *
2965 (double) buffer_capacity);
2966 }
2967
2968 condition_type = lttng_condition_get_type(condition);
2969 if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
2970 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
2971 threshold, sample->highest_usage);
2972
2973 /*
2974 * The low condition should only be triggered once _all_ of the
2975 * streams in a channel have gone below the "low" threshold.
2976 */
2977 if (sample->highest_usage <= threshold) {
2978 result = true;
2979 }
2980 } else {
2981 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
2982 threshold, sample->highest_usage);
2983
2984 /*
2985 * For high buffer usage scenarios, we want to trigger whenever
2986 * _any_ of the streams has reached the "high" threshold.
2987 */
2988 if (sample->highest_usage >= threshold) {
2989 result = true;
2990 }
2991 }
e8360425 2992
ab0ee2ca
JG
2993 return result;
2994}
2995
2996static
e8360425
JD
2997bool evaluate_session_consumed_size_condition(
2998 const struct lttng_condition *condition,
2999 uint64_t session_consumed_size)
3000{
3001 uint64_t threshold;
3002 const struct lttng_condition_session_consumed_size *size_condition =
3003 container_of(condition,
3004 struct lttng_condition_session_consumed_size,
3005 parent);
3006
3007 threshold = size_condition->consumed_threshold_bytes.value;
3008 DBG("[notification-thread] Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
3009 threshold, session_consumed_size);
3010 return session_consumed_size >= threshold;
3011}
3012
3013static
ed327204 3014int evaluate_buffer_condition(const struct lttng_condition *condition,
ab0ee2ca 3015 struct lttng_evaluation **evaluation,
e8360425
JD
3016 const struct notification_thread_state *state,
3017 const struct channel_state_sample *previous_sample,
3018 const struct channel_state_sample *latest_sample,
3019 uint64_t previous_session_consumed_total,
3020 uint64_t latest_session_consumed_total,
3021 struct channel_info *channel_info)
ab0ee2ca
JG
3022{
3023 int ret = 0;
3024 enum lttng_condition_type condition_type;
e8360425
JD
3025 const bool previous_sample_available = !!previous_sample;
3026 bool previous_sample_result = false;
ab0ee2ca
JG
3027 bool latest_sample_result;
3028
3029 condition_type = lttng_condition_get_type(condition);
ab0ee2ca 3030
e8360425
JD
3031 switch (condition_type) {
3032 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
3033 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
3034 if (caa_likely(previous_sample_available)) {
3035 previous_sample_result =
3036 evaluate_buffer_usage_condition(condition,
3037 previous_sample, channel_info->capacity);
3038 }
3039 latest_sample_result = evaluate_buffer_usage_condition(
3040 condition, latest_sample,
3041 channel_info->capacity);
3042 break;
3043 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
3044 if (caa_likely(previous_sample_available)) {
3045 previous_sample_result =
3046 evaluate_session_consumed_size_condition(
3047 condition,
3048 previous_session_consumed_total);
3049 }
3050 latest_sample_result =
3051 evaluate_session_consumed_size_condition(
3052 condition,
3053 latest_session_consumed_total);
3054 break;
3055 default:
3056 /* Unknown condition type; internal error. */
3057 abort();
3058 }
ab0ee2ca
JG
3059
3060 if (!latest_sample_result ||
3061 (previous_sample_result == latest_sample_result)) {
3062 /*
3063 * Only trigger on a condition evaluation transition.
3064 *
3065 * NOTE: This edge-triggered logic may not be appropriate for
3066 * future condition types.
3067 */
3068 goto end;
3069 }
3070
e8360425
JD
3071 if (!evaluation || !latest_sample_result) {
3072 goto end;
3073 }
3074
3075 switch (condition_type) {
3076 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
3077 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
ab0ee2ca
JG
3078 *evaluation = lttng_evaluation_buffer_usage_create(
3079 condition_type,
3080 latest_sample->highest_usage,
e8360425
JD
3081 channel_info->capacity);
3082 break;
3083 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
3084 *evaluation = lttng_evaluation_session_consumed_size_create(
e8360425
JD
3085 latest_session_consumed_total);
3086 break;
3087 default:
3088 abort();
3089 }
3090
3091 if (!*evaluation) {
3092 ret = -1;
3093 goto end;
ab0ee2ca
JG
3094 }
3095end:
3096 return ret;
3097}
3098
3099static
3100int client_enqueue_dropped_notification(struct notification_client *client,
3101 struct notification_thread_state *state)
3102{
3103 int ret;
3104 struct lttng_notification_channel_message msg = {
3105 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
3106 .size = 0,
3107 };
3108
3109 ret = lttng_dynamic_buffer_append(
3110 &client->communication.outbound.buffer, &msg,
3111 sizeof(msg));
3112 return ret;
3113}
3114
3115static
9b63a4aa
JG
3116int send_evaluation_to_clients(const struct lttng_trigger *trigger,
3117 const struct lttng_evaluation *evaluation,
ab0ee2ca
JG
3118 struct notification_client_list* client_list,
3119 struct notification_thread_state *state,
3120 uid_t channel_uid, gid_t channel_gid)
3121{
3122 int ret = 0;
3123 struct lttng_dynamic_buffer msg_buffer;
3124 struct notification_client_list_element *client_list_element, *tmp;
9b63a4aa
JG
3125 const struct lttng_notification notification = {
3126 .condition = (struct lttng_condition *) lttng_trigger_get_const_condition(trigger),
3127 .evaluation = (struct lttng_evaluation *) evaluation,
3128 };
3647288f
JG
3129 struct lttng_notification_channel_message msg_header = {
3130 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION,
3131 };
ab0ee2ca
JG
3132
3133 lttng_dynamic_buffer_init(&msg_buffer);
3134
3647288f
JG
3135 ret = lttng_dynamic_buffer_append(&msg_buffer, &msg_header,
3136 sizeof(msg_header));
ab0ee2ca
JG
3137 if (ret) {
3138 goto end;
3139 }
3140
9b63a4aa 3141 ret = lttng_notification_serialize(&notification, &msg_buffer);
ab0ee2ca 3142 if (ret) {
ab0ee2ca
JG
3143 ERR("[notification-thread] Failed to serialize notification");
3144 ret = -1;
3145 goto end;
3146 }
3147
3647288f
JG
3148 /* Update payload size. */
3149 ((struct lttng_notification_channel_message * ) msg_buffer.data)->size =
3150 (uint32_t) (msg_buffer.size - sizeof(msg_header));
3151
ab0ee2ca
JG
3152 cds_list_for_each_entry_safe(client_list_element, tmp,
3153 &client_list->list, node) {
3154 struct notification_client *client =
3155 client_list_element->client;
3156
3157 if (client->uid != channel_uid && client->gid != channel_gid &&
3158 client->uid != 0) {
3159 /* Client is not allowed to monitor this channel. */
3160 DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this channel");
3161 continue;
3162 }
3163
3164 DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
3165 client->socket, msg_buffer.size);
3166 if (client->communication.outbound.buffer.size) {
3167 /*
3168 * Outgoing data is already buffered for this client;
3169 * drop the notification and enqueue a "dropped
3170 * notification" message if this is the first dropped
3171 * notification since the socket spilled-over to the
3172 * queue.
3173 */
3174 DBG("[notification-thread] Dropping notification addressed to client (socket fd = %i)",
3175 client->socket);
3176 if (!client->communication.outbound.dropped_notification) {
3177 client->communication.outbound.dropped_notification = true;
3178 ret = client_enqueue_dropped_notification(
3179 client, state);
3180 if (ret) {
3181 goto end;
3182 }
3183 }
3184 continue;
3185 }
3186
3187 ret = lttng_dynamic_buffer_append_buffer(
3188 &client->communication.outbound.buffer,
3189 &msg_buffer);
3190 if (ret) {
3191 goto end;
3192 }
3193
3194 ret = client_flush_outgoing_queue(client, state);
3195 if (ret) {
3196 goto end;
3197 }
3198 }
3199 ret = 0;
3200end:
ab0ee2ca
JG
3201 lttng_dynamic_buffer_reset(&msg_buffer);
3202 return ret;
3203}
3204
3205int handle_notification_thread_channel_sample(
3206 struct notification_thread_state *state, int pipe,
3207 enum lttng_domain_type domain)
3208{
3209 int ret = 0;
3210 struct lttcomm_consumer_channel_monitor_msg sample_msg;
ab0ee2ca
JG
3211 struct channel_info *channel_info;
3212 struct cds_lfht_node *node;
3213 struct cds_lfht_iter iter;
3214 struct lttng_channel_trigger_list *trigger_list;
3215 struct lttng_trigger_list_element *trigger_list_element;
3216 bool previous_sample_available = false;
e8360425
JD
3217 struct channel_state_sample previous_sample, latest_sample;
3218 uint64_t previous_session_consumed_total, latest_session_consumed_total;
ab0ee2ca
JG
3219
3220 /*
3221 * The monitoring pipe only holds messages smaller than PIPE_BUF,
3222 * ensuring that read/write of sampling messages are atomic.
3223 */
7c2551ef 3224 ret = lttng_read(pipe, &sample_msg, sizeof(sample_msg));
ab0ee2ca
JG
3225 if (ret != sizeof(sample_msg)) {
3226 ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
3227 pipe);
3228 ret = -1;
3229 goto end;
3230 }
3231
3232 ret = 0;
3233 latest_sample.key.key = sample_msg.key;
3234 latest_sample.key.domain = domain;
3235 latest_sample.highest_usage = sample_msg.highest;
3236 latest_sample.lowest_usage = sample_msg.lowest;
e8360425 3237 latest_sample.channel_total_consumed = sample_msg.total_consumed;
ab0ee2ca
JG
3238
3239 rcu_read_lock();
3240
3241 /* Retrieve the channel's informations */
3242 cds_lfht_lookup(state->channels_ht,
3243 hash_channel_key(&latest_sample.key),
3244 match_channel_info,
3245 &latest_sample.key,
3246 &iter);
3247 node = cds_lfht_iter_get_node(&iter);
e0b5f87b 3248 if (caa_unlikely(!node)) {
ab0ee2ca
JG
3249 /*
3250 * Not an error since the consumer can push a sample to the pipe
3251 * and the rest of the session daemon could notify us of the
3252 * channel's destruction before we get a chance to process that
3253 * sample.
3254 */
3255 DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
3256 latest_sample.key.key,
3257 domain == LTTNG_DOMAIN_KERNEL ? "kernel" :
3258 "user space");
3259 goto end_unlock;
3260 }
3261 channel_info = caa_container_of(node, struct channel_info,
3262 channels_ht_node);
e8360425 3263 DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64", total consumed = %" PRIu64")",
8abe313a 3264 channel_info->name,
ab0ee2ca 3265 latest_sample.key.key,
8abe313a 3266 channel_info->session_info->name,
ab0ee2ca 3267 latest_sample.highest_usage,
e8360425
JD
3268 latest_sample.lowest_usage,
3269 latest_sample.channel_total_consumed);
3270
3271 previous_session_consumed_total =
3272 channel_info->session_info->consumed_data_size;
ab0ee2ca
JG
3273
3274 /* Retrieve the channel's last sample, if it exists, and update it. */
3275 cds_lfht_lookup(state->channel_state_ht,
3276 hash_channel_key(&latest_sample.key),
3277 match_channel_state_sample,
3278 &latest_sample.key,
3279 &iter);
3280 node = cds_lfht_iter_get_node(&iter);
e0b5f87b 3281 if (caa_likely(node)) {
ab0ee2ca
JG
3282 struct channel_state_sample *stored_sample;
3283
3284 /* Update the sample stored. */
3285 stored_sample = caa_container_of(node,
3286 struct channel_state_sample,
3287 channel_state_ht_node);
e8360425 3288
ab0ee2ca
JG
3289 memcpy(&previous_sample, stored_sample,
3290 sizeof(previous_sample));
3291 stored_sample->highest_usage = latest_sample.highest_usage;
3292 stored_sample->lowest_usage = latest_sample.lowest_usage;
0f0479d7 3293 stored_sample->channel_total_consumed = latest_sample.channel_total_consumed;
ab0ee2ca 3294 previous_sample_available = true;
e8360425
JD
3295
3296 latest_session_consumed_total =
3297 previous_session_consumed_total +
3298 (latest_sample.channel_total_consumed - previous_sample.channel_total_consumed);
ab0ee2ca
JG
3299 } else {
3300 /*
3301 * This is the channel's first sample, allocate space for and
3302 * store the new sample.
3303 */
3304 struct channel_state_sample *stored_sample;
3305
3306 stored_sample = zmalloc(sizeof(*stored_sample));
3307 if (!stored_sample) {
3308 ret = -1;
3309 goto end_unlock;
3310 }
3311
3312 memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
3313 cds_lfht_node_init(&stored_sample->channel_state_ht_node);
3314 cds_lfht_add(state->channel_state_ht,
3315 hash_channel_key(&stored_sample->key),
3316 &stored_sample->channel_state_ht_node);
e8360425
JD
3317
3318 latest_session_consumed_total =
3319 previous_session_consumed_total +
3320 latest_sample.channel_total_consumed;
ab0ee2ca
JG
3321 }
3322
e8360425
JD
3323 channel_info->session_info->consumed_data_size =
3324 latest_session_consumed_total;
3325
ab0ee2ca
JG
3326 /* Find triggers associated with this channel. */
3327 cds_lfht_lookup(state->channel_triggers_ht,
3328 hash_channel_key(&latest_sample.key),
3329 match_channel_trigger_list,
3330 &latest_sample.key,
3331 &iter);
3332 node = cds_lfht_iter_get_node(&iter);
e0b5f87b 3333 if (caa_likely(!node)) {
ab0ee2ca
JG
3334 goto end_unlock;
3335 }
3336
3337 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
3338 channel_triggers_ht_node);
3339 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
3340 node) {
9b63a4aa
JG
3341 const struct lttng_condition *condition;