sessiond: notification: maintain an id to notification_client ht
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.c
CommitLineData
ab0ee2ca 1/*
ab5be9fa 2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
ab0ee2ca 3 *
ab5be9fa 4 * SPDX-License-Identifier: GPL-2.0-only
ab0ee2ca 5 *
ab0ee2ca
JG
6 */
7
8#define _LGPL_SOURCE
9#include <urcu.h>
10#include <urcu/rculfhash.h>
11
ab0ee2ca
JG
12#include <common/defaults.h>
13#include <common/error.h>
14#include <common/futex.h>
15#include <common/unix.h>
16#include <common/dynamic-buffer.h>
17#include <common/hashtable/utils.h>
18#include <common/sessiond-comm/sessiond-comm.h>
19#include <common/macros.h>
20#include <lttng/condition/condition.h>
9b63a4aa 21#include <lttng/action/action-internal.h>
ab0ee2ca
JG
22#include <lttng/notification/notification-internal.h>
23#include <lttng/condition/condition-internal.h>
24#include <lttng/condition/buffer-usage-internal.h>
e8360425 25#include <lttng/condition/session-consumed-size-internal.h>
ea9a44f0 26#include <lttng/condition/session-rotation-internal.h>
ab0ee2ca 27#include <lttng/notification/channel-internal.h>
1da26331 28
ab0ee2ca
JG
29#include <time.h>
30#include <unistd.h>
31#include <assert.h>
32#include <inttypes.h>
d53ea4e4 33#include <fcntl.h>
ab0ee2ca 34
1da26331
JG
35#include "notification-thread.h"
36#include "notification-thread-events.h"
37#include "notification-thread-commands.h"
38#include "lttng-sessiond.h"
39#include "kernel.h"
40
ab0ee2ca
JG
41#define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
42#define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
43
51eab943
JG
44enum lttng_object_type {
45 LTTNG_OBJECT_TYPE_UNKNOWN,
46 LTTNG_OBJECT_TYPE_NONE,
47 LTTNG_OBJECT_TYPE_CHANNEL,
48 LTTNG_OBJECT_TYPE_SESSION,
49};
50
ab0ee2ca 51struct lttng_trigger_list_element {
9e0bb80e
JG
52 /* No ownership of the trigger object is assumed. */
53 const struct lttng_trigger *trigger;
ab0ee2ca
JG
54 struct cds_list_head node;
55};
56
57struct lttng_channel_trigger_list {
58 struct channel_key channel_key;
ea9a44f0 59 /* List of struct lttng_trigger_list_element. */
ab0ee2ca 60 struct cds_list_head list;
ea9a44f0 61 /* Node in the channel_triggers_ht */
ab0ee2ca 62 struct cds_lfht_node channel_triggers_ht_node;
83b934ad
MD
63 /* call_rcu delayed reclaim. */
64 struct rcu_head rcu_node;
ab0ee2ca
JG
65};
66
ea9a44f0
JG
67/*
68 * List of triggers applying to a given session.
69 *
70 * See:
71 * - lttng_session_trigger_list_create()
72 * - lttng_session_trigger_list_build()
73 * - lttng_session_trigger_list_destroy()
74 * - lttng_session_trigger_list_add()
75 */
76struct lttng_session_trigger_list {
77 /*
78 * Not owned by this; points to the session_info structure's
79 * session name.
80 */
81 const char *session_name;
82 /* List of struct lttng_trigger_list_element. */
83 struct cds_list_head list;
84 /* Node in the session_triggers_ht */
85 struct cds_lfht_node session_triggers_ht_node;
86 /*
87 * Weak reference to the notification system's session triggers
88 * hashtable.
89 *
90 * The session trigger list structure structure is owned by
91 * the session's session_info.
92 *
93 * The session_info is kept alive the the channel_infos holding a
94 * reference to it (reference counting). When those channels are
95 * destroyed (at runtime or on teardown), the reference they hold
96 * to the session_info are released. On destruction of session_info,
97 * session_info_destroy() will remove the list of triggers applying
98 * to this session from the notification system's state.
99 *
100 * This implies that the session_triggers_ht must be destroyed
101 * after the channels.
102 */
103 struct cds_lfht *session_triggers_ht;
104 /* Used for delayed RCU reclaim. */
105 struct rcu_head rcu_node;
106};
107
ab0ee2ca
JG
108struct lttng_trigger_ht_element {
109 struct lttng_trigger *trigger;
110 struct cds_lfht_node node;
83b934ad
MD
111 /* call_rcu delayed reclaim. */
112 struct rcu_head rcu_node;
ab0ee2ca
JG
113};
114
115struct lttng_condition_list_element {
116 struct lttng_condition *condition;
117 struct cds_list_head node;
118};
119
120struct notification_client_list_element {
121 struct notification_client *client;
122 struct cds_list_head node;
123};
124
125struct notification_client_list {
f82f93a1 126 const struct lttng_trigger *trigger;
ab0ee2ca
JG
127 struct cds_list_head list;
128 struct cds_lfht_node notification_trigger_ht_node;
83b934ad
MD
129 /* call_rcu delayed reclaim. */
130 struct rcu_head rcu_node;
ab0ee2ca
JG
131};
132
133struct notification_client {
ac1889bf 134 notification_client_id id;
ab0ee2ca
JG
135 int socket;
136 /* Client protocol version. */
137 uint8_t major, minor;
138 uid_t uid;
139 gid_t gid;
140 /*
5332364f 141 * Indicates if the credentials and versions of the client have been
ab0ee2ca
JG
142 * checked.
143 */
144 bool validated;
145 /*
146 * Conditions to which the client's notification channel is subscribed.
147 * List of struct lttng_condition_list_node. The condition member is
148 * owned by the client.
149 */
150 struct cds_list_head condition_list;
151 struct cds_lfht_node client_socket_ht_node;
ac1889bf 152 struct cds_lfht_node client_id_ht_node;
ab0ee2ca
JG
153 struct {
154 struct {
14fa22f8
JG
155 /*
156 * During the reception of a message, the reception
157 * buffers' "size" is set to contain the current
158 * message's complete payload.
159 */
ab0ee2ca
JG
160 struct lttng_dynamic_buffer buffer;
161 /* Bytes left to receive for the current message. */
162 size_t bytes_to_receive;
163 /* Type of the message being received. */
164 enum lttng_notification_channel_message_type msg_type;
165 /*
166 * Indicates whether or not credentials are expected
167 * from the client.
168 */
01ea340e 169 bool expect_creds;
ab0ee2ca
JG
170 /*
171 * Indicates whether or not credentials were received
172 * from the client.
173 */
174 bool creds_received;
14fa22f8 175 /* Only used during credentials reception. */
ab0ee2ca
JG
176 lttng_sock_cred creds;
177 } inbound;
178 struct {
179 /*
180 * Indicates whether or not a notification addressed to
181 * this client was dropped because a command reply was
182 * already buffered.
183 *
184 * A notification is dropped whenever the buffer is not
185 * empty.
186 */
187 bool dropped_notification;
188 /*
189 * Indicates whether or not a command reply is already
190 * buffered. In this case, it means that the client is
191 * not consuming command replies before emitting a new
192 * one. This could be caused by a protocol error or a
193 * misbehaving/malicious client.
194 */
195 bool queued_command_reply;
196 struct lttng_dynamic_buffer buffer;
197 } outbound;
198 } communication;
83b934ad
MD
199 /* call_rcu delayed reclaim. */
200 struct rcu_head rcu_node;
ab0ee2ca
JG
201};
202
203struct channel_state_sample {
204 struct channel_key key;
205 struct cds_lfht_node channel_state_ht_node;
206 uint64_t highest_usage;
207 uint64_t lowest_usage;
e8360425 208 uint64_t channel_total_consumed;
83b934ad
MD
209 /* call_rcu delayed reclaim. */
210 struct rcu_head rcu_node;
ab0ee2ca
JG
211};
212
e4db5ace 213static unsigned long hash_channel_key(struct channel_key *key);
ed327204 214static int evaluate_buffer_condition(const struct lttng_condition *condition,
e4db5ace 215 struct lttng_evaluation **evaluation,
e8360425
JD
216 const struct notification_thread_state *state,
217 const struct channel_state_sample *previous_sample,
218 const struct channel_state_sample *latest_sample,
219 uint64_t previous_session_consumed_total,
220 uint64_t latest_session_consumed_total,
221 struct channel_info *channel_info);
e4db5ace 222static
9b63a4aa
JG
223int send_evaluation_to_clients(const struct lttng_trigger *trigger,
224 const struct lttng_evaluation *evaluation,
e4db5ace
JR
225 struct notification_client_list *client_list,
226 struct notification_thread_state *state,
227 uid_t channel_uid, gid_t channel_gid);
228
8abe313a 229
ea9a44f0 230/* session_info API */
8abe313a
JG
231static
232void session_info_destroy(void *_data);
233static
234void session_info_get(struct session_info *session_info);
235static
236void session_info_put(struct session_info *session_info);
237static
238struct session_info *session_info_create(const char *name,
ea9a44f0 239 uid_t uid, gid_t gid,
1eee26c5
JG
240 struct lttng_session_trigger_list *trigger_list,
241 struct cds_lfht *sessions_ht);
8abe313a
JG
242static
243void session_info_add_channel(struct session_info *session_info,
244 struct channel_info *channel_info);
245static
246void session_info_remove_channel(struct session_info *session_info,
247 struct channel_info *channel_info);
248
ea9a44f0
JG
249/* lttng_session_trigger_list API */
250static
251struct lttng_session_trigger_list *lttng_session_trigger_list_create(
252 const char *session_name,
253 struct cds_lfht *session_triggers_ht);
254static
255struct lttng_session_trigger_list *lttng_session_trigger_list_build(
256 const struct notification_thread_state *state,
257 const char *session_name);
258static
259void lttng_session_trigger_list_destroy(
260 struct lttng_session_trigger_list *list);
261static
262int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
263 const struct lttng_trigger *trigger);
264
265
ab0ee2ca 266static
ac1889bf 267int match_client_socket(struct cds_lfht_node *node, const void *key)
ab0ee2ca
JG
268{
269 /* This double-cast is intended to supress pointer-to-cast warning. */
ac1889bf
JG
270 const int socket = (int) (intptr_t) key;
271 const struct notification_client *client = caa_container_of(node,
272 struct notification_client, client_socket_ht_node);
ab0ee2ca 273
ac1889bf
JG
274 return client->socket == socket;
275}
276
277static
278int match_client_id(struct cds_lfht_node *node, const void *key)
279{
280 /* This double-cast is intended to supress pointer-to-cast warning. */
281 const notification_client_id id = *((notification_client_id *) key);
282 const struct notification_client *client = caa_container_of(
283 node, struct notification_client, client_id_ht_node);
ab0ee2ca 284
ac1889bf 285 return client->id == id;
ab0ee2ca
JG
286}
287
288static
289int match_channel_trigger_list(struct cds_lfht_node *node, const void *key)
290{
291 struct channel_key *channel_key = (struct channel_key *) key;
292 struct lttng_channel_trigger_list *trigger_list;
293
294 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
295 channel_triggers_ht_node);
296
297 return !!((channel_key->key == trigger_list->channel_key.key) &&
298 (channel_key->domain == trigger_list->channel_key.domain));
299}
300
51eab943
JG
301static
302int match_session_trigger_list(struct cds_lfht_node *node, const void *key)
303{
304 const char *session_name = (const char *) key;
305 struct lttng_session_trigger_list *trigger_list;
306
307 trigger_list = caa_container_of(node, struct lttng_session_trigger_list,
308 session_triggers_ht_node);
309
310 return !!(strcmp(trigger_list->session_name, session_name) == 0);
311}
312
ab0ee2ca
JG
313static
314int match_channel_state_sample(struct cds_lfht_node *node, const void *key)
315{
316 struct channel_key *channel_key = (struct channel_key *) key;
317 struct channel_state_sample *sample;
318
319 sample = caa_container_of(node, struct channel_state_sample,
320 channel_state_ht_node);
321
322 return !!((channel_key->key == sample->key.key) &&
323 (channel_key->domain == sample->key.domain));
324}
325
326static
327int match_channel_info(struct cds_lfht_node *node, const void *key)
328{
329 struct channel_key *channel_key = (struct channel_key *) key;
330 struct channel_info *channel_info;
331
332 channel_info = caa_container_of(node, struct channel_info,
333 channels_ht_node);
334
335 return !!((channel_key->key == channel_info->key.key) &&
336 (channel_key->domain == channel_info->key.domain));
337}
338
339static
340int match_condition(struct cds_lfht_node *node, const void *key)
341{
342 struct lttng_condition *condition_key = (struct lttng_condition *) key;
343 struct lttng_trigger_ht_element *trigger;
344 struct lttng_condition *condition;
345
346 trigger = caa_container_of(node, struct lttng_trigger_ht_element,
347 node);
348 condition = lttng_trigger_get_condition(trigger->trigger);
349 assert(condition);
350
351 return !!lttng_condition_is_equal(condition_key, condition);
352}
353
ab0ee2ca
JG
354static
355int match_client_list_condition(struct cds_lfht_node *node, const void *key)
356{
357 struct lttng_condition *condition_key = (struct lttng_condition *) key;
358 struct notification_client_list *client_list;
f82f93a1 359 const struct lttng_condition *condition;
ab0ee2ca
JG
360
361 assert(condition_key);
362
363 client_list = caa_container_of(node, struct notification_client_list,
364 notification_trigger_ht_node);
f82f93a1 365 condition = lttng_trigger_get_const_condition(client_list->trigger);
ab0ee2ca
JG
366
367 return !!lttng_condition_is_equal(condition_key, condition);
368}
369
f82f93a1
JG
370static
371int match_session(struct cds_lfht_node *node, const void *key)
372{
373 const char *name = key;
374 struct session_info *session_info = caa_container_of(
375 node, struct session_info, sessions_ht_node);
376
377 return !strcmp(session_info->name, name);
378}
379
ab0ee2ca
JG
380static
381unsigned long lttng_condition_buffer_usage_hash(
9b63a4aa 382 const struct lttng_condition *_condition)
ab0ee2ca 383{
9a2746aa
JG
384 unsigned long hash;
385 unsigned long condition_type;
ab0ee2ca
JG
386 struct lttng_condition_buffer_usage *condition;
387
388 condition = container_of(_condition,
389 struct lttng_condition_buffer_usage, parent);
390
9a2746aa
JG
391 condition_type = (unsigned long) condition->parent.type;
392 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
ab0ee2ca
JG
393 if (condition->session_name) {
394 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
395 }
396 if (condition->channel_name) {
8f56701f 397 hash ^= hash_key_str(condition->channel_name, lttng_ht_seed);
ab0ee2ca
JG
398 }
399 if (condition->domain.set) {
400 hash ^= hash_key_ulong(
401 (void *) condition->domain.type,
402 lttng_ht_seed);
403 }
404 if (condition->threshold_ratio.set) {
405 uint64_t val;
406
407 val = condition->threshold_ratio.value * (double) UINT32_MAX;
408 hash ^= hash_key_u64(&val, lttng_ht_seed);
6633b0dd 409 } else if (condition->threshold_bytes.set) {
ab0ee2ca
JG
410 uint64_t val;
411
412 val = condition->threshold_bytes.value;
413 hash ^= hash_key_u64(&val, lttng_ht_seed);
414 }
415 return hash;
416}
417
e8360425
JD
418static
419unsigned long lttng_condition_session_consumed_size_hash(
9b63a4aa 420 const struct lttng_condition *_condition)
e8360425 421{
9a2746aa
JG
422 unsigned long hash;
423 unsigned long condition_type =
424 (unsigned long) LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE;
e8360425
JD
425 struct lttng_condition_session_consumed_size *condition;
426 uint64_t val;
427
428 condition = container_of(_condition,
429 struct lttng_condition_session_consumed_size, parent);
430
9a2746aa 431 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
e8360425
JD
432 if (condition->session_name) {
433 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
434 }
435 val = condition->consumed_threshold_bytes.value;
436 hash ^= hash_key_u64(&val, lttng_ht_seed);
437 return hash;
438}
439
a8393880
JG
440static
441unsigned long lttng_condition_session_rotation_hash(
442 const struct lttng_condition *_condition)
443{
444 unsigned long hash, condition_type;
445 struct lttng_condition_session_rotation *condition;
446
447 condition = container_of(_condition,
448 struct lttng_condition_session_rotation, parent);
449 condition_type = (unsigned long) condition->parent.type;
450 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
451 assert(condition->session_name);
452 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
453 return hash;
454}
9a2746aa 455
ab0ee2ca
JG
456/*
457 * The lttng_condition hashing code is kept in this file (rather than
458 * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
459 * don't want to link in liblttng-ctl.
460 */
461static
9b63a4aa 462unsigned long lttng_condition_hash(const struct lttng_condition *condition)
ab0ee2ca
JG
463{
464 switch (condition->type) {
465 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
466 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
467 return lttng_condition_buffer_usage_hash(condition);
e8360425
JD
468 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
469 return lttng_condition_session_consumed_size_hash(condition);
a8393880
JG
470 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
471 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
472 return lttng_condition_session_rotation_hash(condition);
ab0ee2ca
JG
473 default:
474 ERR("[notification-thread] Unexpected condition type caught");
475 abort();
476 }
477}
478
e4db5ace
JR
479static
480unsigned long hash_channel_key(struct channel_key *key)
481{
482 unsigned long key_hash = hash_key_u64(&key->key, lttng_ht_seed);
483 unsigned long domain_hash = hash_key_ulong(
484 (void *) (unsigned long) key->domain, lttng_ht_seed);
485
486 return key_hash ^ domain_hash;
487}
488
ac1889bf
JG
489static
490unsigned long hash_client_socket(int socket)
491{
492 return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed);
493}
494
495static
496unsigned long hash_client_id(notification_client_id id)
497{
498 return hash_key_u64(&id, lttng_ht_seed);
499}
500
f82f93a1
JG
501/*
502 * Get the type of object to which a given condition applies. Bindings let
503 * the notification system evaluate a trigger's condition when a given
504 * object's state is updated.
505 *
506 * For instance, a condition bound to a channel will be evaluated everytime
507 * the channel's state is changed by a channel monitoring sample.
508 */
509static
510enum lttng_object_type get_condition_binding_object(
511 const struct lttng_condition *condition)
512{
513 switch (lttng_condition_get_type(condition)) {
514 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
515 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
516 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
e742b055 517 return LTTNG_OBJECT_TYPE_CHANNEL;
f82f93a1
JG
518 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
519 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
520 return LTTNG_OBJECT_TYPE_SESSION;
521 default:
522 return LTTNG_OBJECT_TYPE_UNKNOWN;
523 }
524}
525
83b934ad
MD
526static
527void free_channel_info_rcu(struct rcu_head *node)
528{
529 free(caa_container_of(node, struct channel_info, rcu_node));
530}
531
ab0ee2ca
JG
532static
533void channel_info_destroy(struct channel_info *channel_info)
534{
535 if (!channel_info) {
536 return;
537 }
538
8abe313a
JG
539 if (channel_info->session_info) {
540 session_info_remove_channel(channel_info->session_info,
541 channel_info);
542 session_info_put(channel_info->session_info);
ab0ee2ca 543 }
8abe313a
JG
544 if (channel_info->name) {
545 free(channel_info->name);
ab0ee2ca 546 }
83b934ad
MD
547 call_rcu(&channel_info->rcu_node, free_channel_info_rcu);
548}
549
550static
551void free_session_info_rcu(struct rcu_head *node)
552{
553 free(caa_container_of(node, struct session_info, rcu_node));
ab0ee2ca
JG
554}
555
8abe313a 556/* Don't call directly, use the ref-counting mechanism. */
ab0ee2ca 557static
8abe313a 558void session_info_destroy(void *_data)
ab0ee2ca 559{
8abe313a 560 struct session_info *session_info = _data;
3b68d0a3 561 int ret;
ab0ee2ca 562
8abe313a
JG
563 assert(session_info);
564 if (session_info->channel_infos_ht) {
3b68d0a3
JR
565 ret = cds_lfht_destroy(session_info->channel_infos_ht, NULL);
566 if (ret) {
4c3f302b 567 ERR("[notification-thread] Failed to destroy channel information hash table");
3b68d0a3 568 }
8abe313a 569 }
ea9a44f0 570 lttng_session_trigger_list_destroy(session_info->trigger_list);
1eee26c5
JG
571
572 rcu_read_lock();
573 cds_lfht_del(session_info->sessions_ht,
574 &session_info->sessions_ht_node);
575 rcu_read_unlock();
8abe313a 576 free(session_info->name);
83b934ad 577 call_rcu(&session_info->rcu_node, free_session_info_rcu);
8abe313a 578}
ab0ee2ca 579
8abe313a
JG
580static
581void session_info_get(struct session_info *session_info)
582{
583 if (!session_info) {
584 return;
585 }
586 lttng_ref_get(&session_info->ref);
587}
588
589static
590void session_info_put(struct session_info *session_info)
591{
592 if (!session_info) {
593 return;
ab0ee2ca 594 }
8abe313a
JG
595 lttng_ref_put(&session_info->ref);
596}
597
598static
ea9a44f0 599struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid,
1eee26c5
JG
600 struct lttng_session_trigger_list *trigger_list,
601 struct cds_lfht *sessions_ht)
8abe313a
JG
602{
603 struct session_info *session_info;
ab0ee2ca 604
8abe313a 605 assert(name);
ab0ee2ca 606
8abe313a
JG
607 session_info = zmalloc(sizeof(*session_info));
608 if (!session_info) {
609 goto end;
610 }
611 lttng_ref_init(&session_info->ref, session_info_destroy);
612
613 session_info->channel_infos_ht = cds_lfht_new(DEFAULT_HT_SIZE,
614 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
615 if (!session_info->channel_infos_ht) {
ab0ee2ca
JG
616 goto error;
617 }
8abe313a
JG
618
619 cds_lfht_node_init(&session_info->sessions_ht_node);
620 session_info->name = strdup(name);
621 if (!session_info->name) {
ab0ee2ca
JG
622 goto error;
623 }
8abe313a
JG
624 session_info->uid = uid;
625 session_info->gid = gid;
ea9a44f0 626 session_info->trigger_list = trigger_list;
1eee26c5 627 session_info->sessions_ht = sessions_ht;
8abe313a
JG
628end:
629 return session_info;
630error:
631 session_info_put(session_info);
632 return NULL;
633}
634
635static
636void session_info_add_channel(struct session_info *session_info,
637 struct channel_info *channel_info)
638{
639 rcu_read_lock();
640 cds_lfht_add(session_info->channel_infos_ht,
641 hash_channel_key(&channel_info->key),
642 &channel_info->session_info_channels_ht_node);
643 rcu_read_unlock();
644}
645
646static
647void session_info_remove_channel(struct session_info *session_info,
648 struct channel_info *channel_info)
649{
650 rcu_read_lock();
651 cds_lfht_del(session_info->channel_infos_ht,
652 &channel_info->session_info_channels_ht_node);
653 rcu_read_unlock();
654}
655
656static
657struct channel_info *channel_info_create(const char *channel_name,
658 struct channel_key *channel_key, uint64_t channel_capacity,
659 struct session_info *session_info)
660{
661 struct channel_info *channel_info = zmalloc(sizeof(*channel_info));
662
663 if (!channel_info) {
664 goto end;
665 }
666
ab0ee2ca 667 cds_lfht_node_init(&channel_info->channels_ht_node);
8abe313a
JG
668 cds_lfht_node_init(&channel_info->session_info_channels_ht_node);
669 memcpy(&channel_info->key, channel_key, sizeof(*channel_key));
670 channel_info->capacity = channel_capacity;
671
672 channel_info->name = strdup(channel_name);
673 if (!channel_info->name) {
674 goto error;
675 }
676
677 /*
678 * Set the references between session and channel infos:
679 * - channel_info holds a strong reference to session_info
680 * - session_info holds a weak reference to channel_info
681 */
682 session_info_get(session_info);
683 session_info_add_channel(session_info, channel_info);
684 channel_info->session_info = session_info;
ab0ee2ca 685end:
8abe313a 686 return channel_info;
ab0ee2ca 687error:
8abe313a 688 channel_info_destroy(channel_info);
ab0ee2ca
JG
689 return NULL;
690}
691
ed327204
JG
692/* RCU read lock must be held by the caller. */
693static
694struct notification_client_list *get_client_list_from_condition(
695 struct notification_thread_state *state,
696 const struct lttng_condition *condition)
697{
698 struct cds_lfht_node *node;
699 struct cds_lfht_iter iter;
700
701 cds_lfht_lookup(state->notification_trigger_clients_ht,
702 lttng_condition_hash(condition),
703 match_client_list_condition,
704 condition,
705 &iter);
706 node = cds_lfht_iter_get_node(&iter);
707
e742b055 708 return node ? caa_container_of(node,
ed327204
JG
709 struct notification_client_list,
710 notification_trigger_ht_node) : NULL;
711}
712
e4db5ace
JR
713/* This function must be called with the RCU read lock held. */
714static
f82f93a1
JG
715int evaluate_channel_condition_for_client(
716 const struct lttng_condition *condition,
717 struct notification_thread_state *state,
718 struct lttng_evaluation **evaluation,
719 uid_t *session_uid, gid_t *session_gid)
e4db5ace
JR
720{
721 int ret;
722 struct cds_lfht_iter iter;
723 struct cds_lfht_node *node;
724 struct channel_info *channel_info = NULL;
725 struct channel_key *channel_key = NULL;
726 struct channel_state_sample *last_sample = NULL;
727 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
e4db5ace 728
f82f93a1 729 /* Find the channel associated with the condition. */
e4db5ace 730 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter,
f82f93a1 731 channel_trigger_list, channel_triggers_ht_node) {
e4db5ace
JR
732 struct lttng_trigger_list_element *element;
733
734 cds_list_for_each_entry(element, &channel_trigger_list->list, node) {
9b63a4aa 735 const struct lttng_condition *current_condition =
f82f93a1 736 lttng_trigger_get_const_condition(
e4db5ace
JR
737 element->trigger);
738
739 assert(current_condition);
740 if (!lttng_condition_is_equal(condition,
2ae99f0b 741 current_condition)) {
e4db5ace
JR
742 continue;
743 }
744
745 /* Found the trigger, save the channel key. */
746 channel_key = &channel_trigger_list->channel_key;
747 break;
748 }
749 if (channel_key) {
750 /* The channel key was found stop iteration. */
751 break;
752 }
753 }
754
755 if (!channel_key){
756 /* No channel found; normal exit. */
f82f93a1 757 DBG("[notification-thread] No known channel associated with newly subscribed-to condition");
e4db5ace
JR
758 ret = 0;
759 goto end;
760 }
761
762 /* Fetch channel info for the matching channel. */
763 cds_lfht_lookup(state->channels_ht,
764 hash_channel_key(channel_key),
765 match_channel_info,
766 channel_key,
767 &iter);
768 node = cds_lfht_iter_get_node(&iter);
769 assert(node);
770 channel_info = caa_container_of(node, struct channel_info,
771 channels_ht_node);
772
773 /* Retrieve the channel's last sample, if it exists. */
774 cds_lfht_lookup(state->channel_state_ht,
775 hash_channel_key(channel_key),
776 match_channel_state_sample,
777 channel_key,
778 &iter);
779 node = cds_lfht_iter_get_node(&iter);
780 if (node) {
781 last_sample = caa_container_of(node,
782 struct channel_state_sample,
783 channel_state_ht_node);
784 } else {
785 /* Nothing to evaluate, no sample was ever taken. Normal exit */
786 DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
787 ret = 0;
788 goto end;
789 }
790
f82f93a1 791 ret = evaluate_buffer_condition(condition, evaluation, state,
e8360425
JD
792 NULL, last_sample,
793 0, channel_info->session_info->consumed_data_size,
794 channel_info);
e4db5ace 795 if (ret) {
066a3c82 796 WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
e4db5ace
JR
797 goto end;
798 }
799
f82f93a1
JG
800 *session_uid = channel_info->session_info->uid;
801 *session_gid = channel_info->session_info->gid;
802end:
803 return ret;
804}
805
806static
807const char *get_condition_session_name(const struct lttng_condition *condition)
808{
809 const char *session_name = NULL;
810 enum lttng_condition_status status;
811
812 switch (lttng_condition_get_type(condition)) {
813 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
814 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
815 status = lttng_condition_buffer_usage_get_session_name(
816 condition, &session_name);
817 break;
818 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
819 status = lttng_condition_session_consumed_size_get_session_name(
820 condition, &session_name);
821 break;
822 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
823 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
824 status = lttng_condition_session_rotation_get_session_name(
825 condition, &session_name);
826 break;
827 default:
828 abort();
829 }
830 if (status != LTTNG_CONDITION_STATUS_OK) {
831 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
832 goto end;
833 }
834end:
835 return session_name;
836}
837
838/* This function must be called with the RCU read lock held. */
839static
840int evaluate_session_condition_for_client(
841 const struct lttng_condition *condition,
842 struct notification_thread_state *state,
843 struct lttng_evaluation **evaluation,
844 uid_t *session_uid, gid_t *session_gid)
845{
846 int ret;
847 struct cds_lfht_iter iter;
848 struct cds_lfht_node *node;
849 const char *session_name;
850 struct session_info *session_info = NULL;
851
852 session_name = get_condition_session_name(condition);
853
854 /* Find the session associated with the trigger. */
855 cds_lfht_lookup(state->sessions_ht,
856 hash_key_str(session_name, lttng_ht_seed),
857 match_session,
858 session_name,
859 &iter);
860 node = cds_lfht_iter_get_node(&iter);
861 if (!node) {
862 DBG("[notification-thread] No known session matching name \"%s\"",
863 session_name);
864 ret = 0;
865 goto end;
866 }
867
868 session_info = caa_container_of(node, struct session_info,
869 sessions_ht_node);
870 session_info_get(session_info);
871
872 /*
873 * Evaluation is performed in-line here since only one type of
874 * session-bound condition is handled for the moment.
875 */
876 switch (lttng_condition_get_type(condition)) {
877 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
878 if (!session_info->rotation.ongoing) {
be558f88 879 ret = 0;
f82f93a1
JG
880 goto end_session_put;
881 }
882
883 *evaluation = lttng_evaluation_session_rotation_ongoing_create(
884 session_info->rotation.id);
885 if (!*evaluation) {
886 /* Fatal error. */
887 ERR("[notification-thread] Failed to create session rotation ongoing evaluation for session \"%s\"",
888 session_info->name);
889 ret = -1;
890 goto end_session_put;
891 }
892 ret = 0;
893 break;
894 default:
895 ret = 0;
896 goto end_session_put;
897 }
898
899 *session_uid = session_info->uid;
900 *session_gid = session_info->gid;
901
902end_session_put:
903 session_info_put(session_info);
904end:
905 return ret;
906}
907
908/* This function must be called with the RCU read lock held. */
909static
910int evaluate_condition_for_client(const struct lttng_trigger *trigger,
911 const struct lttng_condition *condition,
912 struct notification_client *client,
913 struct notification_thread_state *state)
914{
915 int ret;
916 struct lttng_evaluation *evaluation = NULL;
917 struct notification_client_list client_list = { 0 };
918 struct notification_client_list_element client_list_element = { 0 };
919 uid_t object_uid = 0;
920 gid_t object_gid = 0;
921
922 assert(trigger);
923 assert(condition);
924 assert(client);
925 assert(state);
926
927 switch (get_condition_binding_object(condition)) {
928 case LTTNG_OBJECT_TYPE_SESSION:
929 ret = evaluate_session_condition_for_client(condition, state,
930 &evaluation, &object_uid, &object_gid);
931 break;
932 case LTTNG_OBJECT_TYPE_CHANNEL:
933 ret = evaluate_channel_condition_for_client(condition, state,
934 &evaluation, &object_uid, &object_gid);
935 break;
936 case LTTNG_OBJECT_TYPE_NONE:
937 ret = 0;
938 goto end;
939 case LTTNG_OBJECT_TYPE_UNKNOWN:
940 default:
941 ret = -1;
942 goto end;
943 }
af0c318d
JG
944 if (ret) {
945 /* Fatal error. */
946 goto end;
947 }
e4db5ace
JR
948 if (!evaluation) {
949 /* Evaluation yielded nothing. Normal exit. */
950 DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
951 ret = 0;
952 goto end;
953 }
954
955 /*
956 * Create a temporary client list with the client currently
957 * subscribing.
958 */
959 cds_lfht_node_init(&client_list.notification_trigger_ht_node);
960 CDS_INIT_LIST_HEAD(&client_list.list);
961 client_list.trigger = trigger;
962
963 CDS_INIT_LIST_HEAD(&client_list_element.node);
964 client_list_element.client = client;
965 cds_list_add(&client_list_element.node, &client_list.list);
966
967 /* Send evaluation result to the newly-subscribed client. */
968 DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
969 ret = send_evaluation_to_clients(trigger, evaluation, &client_list,
f82f93a1 970 state, object_uid, object_gid);
e4db5ace
JR
971
972end:
973 return ret;
974}
975
ab0ee2ca
JG
976static
977int notification_thread_client_subscribe(struct notification_client *client,
978 struct lttng_condition *condition,
979 struct notification_thread_state *state,
980 enum lttng_notification_channel_status *_status)
981{
982 int ret = 0;
ab0ee2ca
JG
983 struct notification_client_list *client_list;
984 struct lttng_condition_list_element *condition_list_element = NULL;
985 struct notification_client_list_element *client_list_element = NULL;
986 enum lttng_notification_channel_status status =
987 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
988
989 /*
990 * Ensure that the client has not already subscribed to this condition
991 * before.
992 */
993 cds_list_for_each_entry(condition_list_element, &client->condition_list, node) {
994 if (lttng_condition_is_equal(condition_list_element->condition,
995 condition)) {
996 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED;
997 goto end;
998 }
999 }
1000
1001 condition_list_element = zmalloc(sizeof(*condition_list_element));
1002 if (!condition_list_element) {
1003 ret = -1;
1004 goto error;
1005 }
1006 client_list_element = zmalloc(sizeof(*client_list_element));
1007 if (!client_list_element) {
1008 ret = -1;
1009 goto error;
1010 }
1011
1012 rcu_read_lock();
1013
1014 /*
1015 * Add the newly-subscribed condition to the client's subscription list.
1016 */
1017 CDS_INIT_LIST_HEAD(&condition_list_element->node);
1018 condition_list_element->condition = condition;
1019 cds_list_add(&condition_list_element->node, &client->condition_list);
1020
ed327204
JG
1021 client_list = get_client_list_from_condition(state, condition);
1022 if (!client_list) {
2ae99f0b
JG
1023 /*
1024 * No notification-emiting trigger registered with this
1025 * condition. We don't evaluate the condition right away
1026 * since this trigger is not registered yet.
1027 */
4fb43b68 1028 free(client_list_element);
ab0ee2ca
JG
1029 goto end_unlock;
1030 }
1031
2ae99f0b
JG
1032 /*
1033 * The condition to which the client just subscribed is evaluated
1034 * at this point so that conditions that are already TRUE result
1035 * in a notification being sent out.
1036 */
e4db5ace
JR
1037 if (evaluate_condition_for_client(client_list->trigger, condition,
1038 client, state)) {
1039 WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
1040 ret = -1;
4bd5b4af 1041 free(client_list_element);
e4db5ace
JR
1042 goto end_unlock;
1043 }
1044
1045 /*
1046 * Add the client to the list of clients interested in a given trigger
1047 * if a "notification" trigger with a corresponding condition was
1048 * added prior.
1049 */
ab0ee2ca
JG
1050 client_list_element->client = client;
1051 CDS_INIT_LIST_HEAD(&client_list_element->node);
1052 cds_list_add(&client_list_element->node, &client_list->list);
1053end_unlock:
1054 rcu_read_unlock();
1055end:
1056 if (_status) {
1057 *_status = status;
1058 }
1059 return ret;
1060error:
1061 free(condition_list_element);
1062 free(client_list_element);
1063 return ret;
1064}
1065
1066static
1067int notification_thread_client_unsubscribe(
1068 struct notification_client *client,
1069 struct lttng_condition *condition,
1070 struct notification_thread_state *state,
1071 enum lttng_notification_channel_status *_status)
1072{
ab0ee2ca
JG
1073 struct notification_client_list *client_list;
1074 struct lttng_condition_list_element *condition_list_element,
1075 *condition_tmp;
1076 struct notification_client_list_element *client_list_element,
1077 *client_tmp;
1078 bool condition_found = false;
1079 enum lttng_notification_channel_status status =
1080 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1081
1082 /* Remove the condition from the client's condition list. */
1083 cds_list_for_each_entry_safe(condition_list_element, condition_tmp,
1084 &client->condition_list, node) {
1085 if (!lttng_condition_is_equal(condition_list_element->condition,
1086 condition)) {
1087 continue;
1088 }
1089
1090 cds_list_del(&condition_list_element->node);
1091 /*
1092 * The caller may be iterating on the client's conditions to
1093 * tear down a client's connection. In this case, the condition
1094 * will be destroyed at the end.
1095 */
1096 if (condition != condition_list_element->condition) {
1097 lttng_condition_destroy(
1098 condition_list_element->condition);
1099 }
1100 free(condition_list_element);
1101 condition_found = true;
1102 break;
1103 }
1104
1105 if (!condition_found) {
1106 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION;
1107 goto end;
1108 }
1109
1110 /*
1111 * Remove the client from the list of clients interested the trigger
1112 * matching the condition.
1113 */
1114 rcu_read_lock();
ed327204
JG
1115 client_list = get_client_list_from_condition(state, condition);
1116 if (!client_list) {
ab0ee2ca
JG
1117 goto end_unlock;
1118 }
1119
ab0ee2ca
JG
1120 cds_list_for_each_entry_safe(client_list_element, client_tmp,
1121 &client_list->list, node) {
1122 if (client_list_element->client->socket != client->socket) {
1123 continue;
1124 }
1125 cds_list_del(&client_list_element->node);
1126 free(client_list_element);
1127 break;
1128 }
1129end_unlock:
1130 rcu_read_unlock();
1131end:
1132 lttng_condition_destroy(condition);
1133 if (_status) {
1134 *_status = status;
1135 }
1136 return 0;
1137}
1138
83b934ad
MD
1139static
1140void free_notification_client_rcu(struct rcu_head *node)
1141{
1142 free(caa_container_of(node, struct notification_client, rcu_node));
1143}
1144
ab0ee2ca
JG
1145static
1146void notification_client_destroy(struct notification_client *client,
1147 struct notification_thread_state *state)
1148{
1149 struct lttng_condition_list_element *condition_list_element, *tmp;
1150
1151 if (!client) {
1152 return;
1153 }
1154
1155 /* Release all conditions to which the client was subscribed. */
1156 cds_list_for_each_entry_safe(condition_list_element, tmp,
1157 &client->condition_list, node) {
1158 (void) notification_thread_client_unsubscribe(client,
1159 condition_list_element->condition, state, NULL);
1160 }
1161
1162 if (client->socket >= 0) {
1163 (void) lttcomm_close_unix_sock(client->socket);
ac1889bf 1164 client->socket = -1;
ab0ee2ca
JG
1165 }
1166 lttng_dynamic_buffer_reset(&client->communication.inbound.buffer);
1167 lttng_dynamic_buffer_reset(&client->communication.outbound.buffer);
83b934ad 1168 call_rcu(&client->rcu_node, free_notification_client_rcu);
ab0ee2ca
JG
1169}
1170
1171/*
1172 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1173 * client pointer).
1174 */
1175static
1176struct notification_client *get_client_from_socket(int socket,
1177 struct notification_thread_state *state)
1178{
1179 struct cds_lfht_iter iter;
1180 struct cds_lfht_node *node;
1181 struct notification_client *client = NULL;
1182
1183 cds_lfht_lookup(state->client_socket_ht,
ac1889bf
JG
1184 hash_client_socket(socket),
1185 match_client_socket,
ab0ee2ca
JG
1186 (void *) (unsigned long) socket,
1187 &iter);
1188 node = cds_lfht_iter_get_node(&iter);
1189 if (!node) {
1190 goto end;
1191 }
1192
1193 client = caa_container_of(node, struct notification_client,
1194 client_socket_ht_node);
1195end:
1196 return client;
1197}
1198
ac1889bf
JG
1199/*
1200 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1201 * client pointer).
1202 */
1203static
1204struct notification_client *get_client_from_id(notification_client_id id,
1205 struct notification_thread_state *state)
1206{
1207 struct cds_lfht_iter iter;
1208 struct cds_lfht_node *node;
1209 struct notification_client *client = NULL;
1210
1211 cds_lfht_lookup(state->client_id_ht,
1212 hash_client_id(id),
1213 match_client_id,
1214 &id,
1215 &iter);
1216 node = cds_lfht_iter_get_node(&iter);
1217 if (!node) {
1218 goto end;
1219 }
1220
1221 client = caa_container_of(node, struct notification_client,
1222 client_id_ht_node);
1223end:
1224 return client;
1225}
1226
ab0ee2ca 1227static
e8360425 1228bool buffer_usage_condition_applies_to_channel(
51eab943
JG
1229 const struct lttng_condition *condition,
1230 const struct channel_info *channel_info)
ab0ee2ca
JG
1231{
1232 enum lttng_condition_status status;
e8360425
JD
1233 enum lttng_domain_type condition_domain;
1234 const char *condition_session_name = NULL;
1235 const char *condition_channel_name = NULL;
ab0ee2ca 1236
e8360425
JD
1237 status = lttng_condition_buffer_usage_get_domain_type(condition,
1238 &condition_domain);
1239 assert(status == LTTNG_CONDITION_STATUS_OK);
1240 if (channel_info->key.domain != condition_domain) {
ab0ee2ca
JG
1241 goto fail;
1242 }
1243
e8360425
JD
1244 status = lttng_condition_buffer_usage_get_session_name(
1245 condition, &condition_session_name);
1246 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1247
1248 status = lttng_condition_buffer_usage_get_channel_name(
1249 condition, &condition_channel_name);
1250 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_channel_name);
1251
1252 if (strcmp(channel_info->session_info->name, condition_session_name)) {
1253 goto fail;
1254 }
1255 if (strcmp(channel_info->name, condition_channel_name)) {
ab0ee2ca
JG
1256 goto fail;
1257 }
1258
e8360425
JD
1259 return true;
1260fail:
1261 return false;
1262}
1263
1264static
1265bool session_consumed_size_condition_applies_to_channel(
51eab943
JG
1266 const struct lttng_condition *condition,
1267 const struct channel_info *channel_info)
e8360425
JD
1268{
1269 enum lttng_condition_status status;
1270 const char *condition_session_name = NULL;
1271
1272 status = lttng_condition_session_consumed_size_get_session_name(
1273 condition, &condition_session_name);
1274 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1275
1276 if (strcmp(channel_info->session_info->name, condition_session_name)) {
ab0ee2ca
JG
1277 goto fail;
1278 }
1279
e8360425
JD
1280 return true;
1281fail:
1282 return false;
1283}
ab0ee2ca 1284
51eab943
JG
1285static
1286bool trigger_applies_to_channel(const struct lttng_trigger *trigger,
1287 const struct channel_info *channel_info)
1288{
1289 const struct lttng_condition *condition;
e8360425 1290 bool trigger_applies;
ab0ee2ca 1291
51eab943 1292 condition = lttng_trigger_get_const_condition(trigger);
e8360425 1293 if (!condition) {
ab0ee2ca
JG
1294 goto fail;
1295 }
e8360425
JD
1296
1297 switch (lttng_condition_get_type(condition)) {
1298 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1299 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1300 trigger_applies = buffer_usage_condition_applies_to_channel(
1301 condition, channel_info);
1302 break;
1303 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
1304 trigger_applies = session_consumed_size_condition_applies_to_channel(
1305 condition, channel_info);
1306 break;
1307 default:
ab0ee2ca
JG
1308 goto fail;
1309 }
1310
e8360425 1311 return trigger_applies;
ab0ee2ca
JG
1312fail:
1313 return false;
1314}
1315
1316static
1317bool trigger_applies_to_client(struct lttng_trigger *trigger,
1318 struct notification_client *client)
1319{
1320 bool applies = false;
1321 struct lttng_condition_list_element *condition_list_element;
1322
1323 cds_list_for_each_entry(condition_list_element, &client->condition_list,
1324 node) {
1325 applies = lttng_condition_is_equal(
1326 condition_list_element->condition,
1327 lttng_trigger_get_condition(trigger));
1328 if (applies) {
1329 break;
1330 }
1331 }
1332 return applies;
1333}
1334
ed327204
JG
1335/* Must be called with RCU read lock held. */
1336static
1337struct lttng_session_trigger_list *get_session_trigger_list(
1338 struct notification_thread_state *state,
1339 const char *session_name)
1340{
1341 struct lttng_session_trigger_list *list = NULL;
1342 struct cds_lfht_node *node;
1343 struct cds_lfht_iter iter;
1344
1345 cds_lfht_lookup(state->session_triggers_ht,
1346 hash_key_str(session_name, lttng_ht_seed),
1347 match_session_trigger_list,
1348 session_name,
1349 &iter);
1350 node = cds_lfht_iter_get_node(&iter);
1351 if (!node) {
1352 /*
1353 * Not an error, the list of triggers applying to that session
1354 * will be initialized when the session is created.
1355 */
1356 DBG("[notification-thread] No trigger list found for session \"%s\" as it is not yet known to the notification system",
1357 session_name);
1358 goto end;
1359 }
1360
e742b055 1361 list = caa_container_of(node,
ed327204
JG
1362 struct lttng_session_trigger_list,
1363 session_triggers_ht_node);
1364end:
1365 return list;
1366}
1367
ea9a44f0
JG
1368/*
1369 * Allocate an empty lttng_session_trigger_list for the session named
1370 * 'session_name'.
1371 *
1372 * No ownership of 'session_name' is assumed by the session trigger list.
1373 * It is the caller's responsability to ensure the session name is alive
1374 * for as long as this list is.
1375 */
1376static
1377struct lttng_session_trigger_list *lttng_session_trigger_list_create(
1378 const char *session_name,
1379 struct cds_lfht *session_triggers_ht)
1380{
1381 struct lttng_session_trigger_list *list;
1382
1383 list = zmalloc(sizeof(*list));
1384 if (!list) {
1385 goto end;
1386 }
1387 list->session_name = session_name;
1388 CDS_INIT_LIST_HEAD(&list->list);
1389 cds_lfht_node_init(&list->session_triggers_ht_node);
1390 list->session_triggers_ht = session_triggers_ht;
1391
1392 rcu_read_lock();
1393 /* Publish the list through the session_triggers_ht. */
1394 cds_lfht_add(session_triggers_ht,
1395 hash_key_str(session_name, lttng_ht_seed),
1396 &list->session_triggers_ht_node);
1397 rcu_read_unlock();
1398end:
1399 return list;
1400}
1401
1402static
1403void free_session_trigger_list_rcu(struct rcu_head *node)
1404{
1405 free(caa_container_of(node, struct lttng_session_trigger_list,
1406 rcu_node));
1407}
1408
1409static
1410void lttng_session_trigger_list_destroy(struct lttng_session_trigger_list *list)
1411{
1412 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1413
1414 /* Empty the list element by element, and then free the list itself. */
1415 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1416 &list->list, node) {
1417 cds_list_del(&trigger_list_element->node);
1418 free(trigger_list_element);
1419 }
1420 rcu_read_lock();
1421 /* Unpublish the list from the session_triggers_ht. */
1422 cds_lfht_del(list->session_triggers_ht,
1423 &list->session_triggers_ht_node);
1424 rcu_read_unlock();
1425 call_rcu(&list->rcu_node, free_session_trigger_list_rcu);
1426}
1427
1428static
1429int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
1430 const struct lttng_trigger *trigger)
1431{
1432 int ret = 0;
1433 struct lttng_trigger_list_element *new_element =
1434 zmalloc(sizeof(*new_element));
1435
1436 if (!new_element) {
1437 ret = -1;
1438 goto end;
1439 }
1440 CDS_INIT_LIST_HEAD(&new_element->node);
1441 new_element->trigger = trigger;
1442 cds_list_add(&new_element->node, &list->list);
1443end:
1444 return ret;
1445}
1446
1447static
1448bool trigger_applies_to_session(const struct lttng_trigger *trigger,
1449 const char *session_name)
1450{
1451 bool applies = false;
1452 const struct lttng_condition *condition;
1453
1454 condition = lttng_trigger_get_const_condition(trigger);
1455 switch (lttng_condition_get_type(condition)) {
1456 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1457 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
1458 {
1459 enum lttng_condition_status condition_status;
1460 const char *condition_session_name;
1461
1462 condition_status = lttng_condition_session_rotation_get_session_name(
1463 condition, &condition_session_name);
1464 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
1465 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
1466 goto end;
1467 }
1468
1469 assert(condition_session_name);
1470 applies = !strcmp(condition_session_name, session_name);
1471 break;
1472 }
1473 default:
1474 goto end;
1475 }
1476end:
1477 return applies;
1478}
1479
1480/*
1481 * Allocate and initialize an lttng_session_trigger_list which contains
1482 * all triggers that apply to the session named 'session_name'.
1483 *
1484 * No ownership of 'session_name' is assumed by the session trigger list.
1485 * It is the caller's responsability to ensure the session name is alive
1486 * for as long as this list is.
1487 */
1488static
1489struct lttng_session_trigger_list *lttng_session_trigger_list_build(
1490 const struct notification_thread_state *state,
1491 const char *session_name)
1492{
1493 int trigger_count = 0;
1494 struct lttng_session_trigger_list *session_trigger_list = NULL;
1495 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1496 struct cds_lfht_iter iter;
1497
1498 session_trigger_list = lttng_session_trigger_list_create(session_name,
1499 state->session_triggers_ht);
1500
1501 /* Add all triggers applying to the session named 'session_name'. */
1502 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1503 node) {
1504 int ret;
1505
1506 if (!trigger_applies_to_session(trigger_ht_element->trigger,
1507 session_name)) {
1508 continue;
1509 }
1510
1511 ret = lttng_session_trigger_list_add(session_trigger_list,
1512 trigger_ht_element->trigger);
1513 if (ret) {
1514 goto error;
1515 }
1516
1517 trigger_count++;
1518 }
1519
1520 DBG("[notification-thread] Found %i triggers that apply to newly created session",
1521 trigger_count);
1522 return session_trigger_list;
1523error:
1524 lttng_session_trigger_list_destroy(session_trigger_list);
1525 return NULL;
1526}
1527
8abe313a
JG
1528static
1529struct session_info *find_or_create_session_info(
1530 struct notification_thread_state *state,
1531 const char *name, uid_t uid, gid_t gid)
1532{
1533 struct session_info *session = NULL;
1534 struct cds_lfht_node *node;
1535 struct cds_lfht_iter iter;
ea9a44f0 1536 struct lttng_session_trigger_list *trigger_list;
8abe313a
JG
1537
1538 rcu_read_lock();
1539 cds_lfht_lookup(state->sessions_ht,
1540 hash_key_str(name, lttng_ht_seed),
1541 match_session,
1542 name,
1543 &iter);
1544 node = cds_lfht_iter_get_node(&iter);
1545 if (node) {
1546 DBG("[notification-thread] Found session info of session \"%s\" (uid = %i, gid = %i)",
1547 name, uid, gid);
1548 session = caa_container_of(node, struct session_info,
1549 sessions_ht_node);
1550 assert(session->uid == uid);
1551 assert(session->gid == gid);
1eee26c5
JG
1552 session_info_get(session);
1553 goto end;
ea9a44f0
JG
1554 }
1555
1556 trigger_list = lttng_session_trigger_list_build(state, name);
1557 if (!trigger_list) {
1558 goto error;
8abe313a
JG
1559 }
1560
1eee26c5
JG
1561 session = session_info_create(name, uid, gid, trigger_list,
1562 state->sessions_ht);
8abe313a
JG
1563 if (!session) {
1564 ERR("[notification-thread] Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)",
1565 name, uid, gid);
b248644d 1566 lttng_session_trigger_list_destroy(trigger_list);
ea9a44f0 1567 goto error;
8abe313a 1568 }
ea9a44f0 1569 trigger_list = NULL;
577983cb
JG
1570
1571 cds_lfht_add(state->sessions_ht, hash_key_str(name, lttng_ht_seed),
9b63a4aa 1572 &session->sessions_ht_node);
1eee26c5 1573end:
8abe313a
JG
1574 rcu_read_unlock();
1575 return session;
ea9a44f0
JG
1576error:
1577 rcu_read_unlock();
1578 session_info_put(session);
1579 return NULL;
8abe313a
JG
1580}
1581
ab0ee2ca
JG
1582static
1583int handle_notification_thread_command_add_channel(
8abe313a
JG
1584 struct notification_thread_state *state,
1585 const char *session_name, uid_t session_uid, gid_t session_gid,
1586 const char *channel_name, enum lttng_domain_type channel_domain,
1587 uint64_t channel_key_int, uint64_t channel_capacity,
1588 enum lttng_error_code *cmd_result)
ab0ee2ca
JG
1589{
1590 struct cds_list_head trigger_list;
8abe313a
JG
1591 struct channel_info *new_channel_info = NULL;
1592 struct channel_key channel_key = {
1593 .key = channel_key_int,
1594 .domain = channel_domain,
1595 };
ab0ee2ca
JG
1596 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
1597 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1598 int trigger_count = 0;
1599 struct cds_lfht_iter iter;
8abe313a 1600 struct session_info *session_info = NULL;
ab0ee2ca
JG
1601
1602 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
8abe313a
JG
1603 channel_name, session_name, channel_key_int,
1604 channel_domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
ab0ee2ca
JG
1605
1606 CDS_INIT_LIST_HEAD(&trigger_list);
1607
8abe313a
JG
1608 session_info = find_or_create_session_info(state, session_name,
1609 session_uid, session_gid);
1610 if (!session_info) {
a9577b76 1611 /* Allocation error or an internal error occurred. */
ab0ee2ca
JG
1612 goto error;
1613 }
1614
8abe313a
JG
1615 new_channel_info = channel_info_create(channel_name, &channel_key,
1616 channel_capacity, session_info);
1617 if (!new_channel_info) {
1618 goto error;
1619 }
ab0ee2ca 1620
83b934ad 1621 rcu_read_lock();
ab0ee2ca
JG
1622 /* Build a list of all triggers applying to the new channel. */
1623 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1624 node) {
1625 struct lttng_trigger_list_element *new_element;
1626
1627 if (!trigger_applies_to_channel(trigger_ht_element->trigger,
8abe313a 1628 new_channel_info)) {
ab0ee2ca
JG
1629 continue;
1630 }
1631
1632 new_element = zmalloc(sizeof(*new_element));
1633 if (!new_element) {
83b934ad 1634 rcu_read_unlock();
ab0ee2ca
JG
1635 goto error;
1636 }
1637 CDS_INIT_LIST_HEAD(&new_element->node);
1638 new_element->trigger = trigger_ht_element->trigger;
1639 cds_list_add(&new_element->node, &trigger_list);
1640 trigger_count++;
1641 }
83b934ad 1642 rcu_read_unlock();
ab0ee2ca
JG
1643
1644 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
1645 trigger_count);
1646 channel_trigger_list = zmalloc(sizeof(*channel_trigger_list));
1647 if (!channel_trigger_list) {
1648 goto error;
1649 }
8abe313a 1650 channel_trigger_list->channel_key = new_channel_info->key;
ab0ee2ca
JG
1651 CDS_INIT_LIST_HEAD(&channel_trigger_list->list);
1652 cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
1653 cds_list_splice(&trigger_list, &channel_trigger_list->list);
1654
1655 rcu_read_lock();
1656 /* Add channel to the channel_ht which owns the channel_infos. */
1657 cds_lfht_add(state->channels_ht,
8abe313a 1658 hash_channel_key(&new_channel_info->key),
ab0ee2ca
JG
1659 &new_channel_info->channels_ht_node);
1660 /*
1661 * Add the list of triggers associated with this channel to the
1662 * channel_triggers_ht.
1663 */
1664 cds_lfht_add(state->channel_triggers_ht,
8abe313a 1665 hash_channel_key(&new_channel_info->key),
ab0ee2ca
JG
1666 &channel_trigger_list->channel_triggers_ht_node);
1667 rcu_read_unlock();
1eee26c5 1668 session_info_put(session_info);
ab0ee2ca
JG
1669 *cmd_result = LTTNG_OK;
1670 return 0;
1671error:
ab0ee2ca 1672 channel_info_destroy(new_channel_info);
8abe313a 1673 session_info_put(session_info);
ab0ee2ca
JG
1674 return 1;
1675}
1676
83b934ad
MD
1677static
1678void free_channel_trigger_list_rcu(struct rcu_head *node)
1679{
1680 free(caa_container_of(node, struct lttng_channel_trigger_list,
1681 rcu_node));
1682}
1683
1684static
1685void free_channel_state_sample_rcu(struct rcu_head *node)
1686{
1687 free(caa_container_of(node, struct channel_state_sample,
1688 rcu_node));
1689}
1690
ab0ee2ca
JG
1691static
1692int handle_notification_thread_command_remove_channel(
1693 struct notification_thread_state *state,
1694 uint64_t channel_key, enum lttng_domain_type domain,
1695 enum lttng_error_code *cmd_result)
1696{
1697 struct cds_lfht_node *node;
1698 struct cds_lfht_iter iter;
1699 struct lttng_channel_trigger_list *trigger_list;
1700 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1701 struct channel_key key = { .key = channel_key, .domain = domain };
1702 struct channel_info *channel_info;
1703
1704 DBG("[notification-thread] Removing channel key = %" PRIu64 " in %s domain",
1705 channel_key, domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
1706
1707 rcu_read_lock();
1708
1709 cds_lfht_lookup(state->channel_triggers_ht,
1710 hash_channel_key(&key),
1711 match_channel_trigger_list,
1712 &key,
1713 &iter);
1714 node = cds_lfht_iter_get_node(&iter);
1715 /*
1716 * There is a severe internal error if we are being asked to remove a
1717 * channel that doesn't exist.
1718 */
1719 if (!node) {
1720 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
1721 goto end;
1722 }
1723
1724 /* Free the list of triggers associated with this channel. */
1725 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
1726 channel_triggers_ht_node);
1727 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1728 &trigger_list->list, node) {
1729 cds_list_del(&trigger_list_element->node);
1730 free(trigger_list_element);
1731 }
1732 cds_lfht_del(state->channel_triggers_ht, node);
83b934ad 1733 call_rcu(&trigger_list->rcu_node, free_channel_trigger_list_rcu);
ab0ee2ca
JG
1734
1735 /* Free sampled channel state. */
1736 cds_lfht_lookup(state->channel_state_ht,
1737 hash_channel_key(&key),
1738 match_channel_state_sample,
1739 &key,
1740 &iter);
1741 node = cds_lfht_iter_get_node(&iter);
1742 /*
1743 * This is expected to be NULL if the channel is destroyed before we
1744 * received a sample.
1745 */
1746 if (node) {
1747 struct channel_state_sample *sample = caa_container_of(node,
1748 struct channel_state_sample,
1749 channel_state_ht_node);
1750
1751 cds_lfht_del(state->channel_state_ht, node);
83b934ad 1752 call_rcu(&sample->rcu_node, free_channel_state_sample_rcu);
ab0ee2ca
JG
1753 }
1754
1755 /* Remove the channel from the channels_ht and free it. */
1756 cds_lfht_lookup(state->channels_ht,
1757 hash_channel_key(&key),
1758 match_channel_info,
1759 &key,
1760 &iter);
1761 node = cds_lfht_iter_get_node(&iter);
1762 assert(node);
1763 channel_info = caa_container_of(node, struct channel_info,
1764 channels_ht_node);
1765 cds_lfht_del(state->channels_ht, node);
1766 channel_info_destroy(channel_info);
1767end:
1768 rcu_read_unlock();
1769 *cmd_result = LTTNG_OK;
1770 return 0;
1771}
1772
0ca52944 1773static
ed327204 1774int handle_notification_thread_command_session_rotation(
0ca52944 1775 struct notification_thread_state *state,
ed327204
JG
1776 enum notification_thread_command_type cmd_type,
1777 const char *session_name, uid_t session_uid, gid_t session_gid,
1778 uint64_t trace_archive_chunk_id,
1779 struct lttng_trace_archive_location *location,
1780 enum lttng_error_code *_cmd_result)
0ca52944 1781{
ed327204
JG
1782 int ret = 0;
1783 enum lttng_error_code cmd_result = LTTNG_OK;
1784 struct lttng_session_trigger_list *trigger_list;
1785 struct lttng_trigger_list_element *trigger_list_element;
1786 struct session_info *session_info;
0ca52944 1787
ed327204
JG
1788 rcu_read_lock();
1789
1790 session_info = find_or_create_session_info(state, session_name,
1791 session_uid, session_gid);
1792 if (!session_info) {
a9577b76 1793 /* Allocation error or an internal error occurred. */
ed327204
JG
1794 ret = -1;
1795 cmd_result = LTTNG_ERR_NOMEM;
1796 goto end;
1797 }
1798
1799 session_info->rotation.ongoing =
1800 cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING;
1801 session_info->rotation.id = trace_archive_chunk_id;
1802 trigger_list = get_session_trigger_list(state, session_name);
1803 if (!trigger_list) {
1804 DBG("[notification-thread] No triggers applying to session \"%s\" found",
1805 session_name);
1806 goto end;
1807 }
1808
1809 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
1810 node) {
1811 const struct lttng_condition *condition;
1812 const struct lttng_action *action;
1813 const struct lttng_trigger *trigger;
1814 struct notification_client_list *client_list;
1815 struct lttng_evaluation *evaluation = NULL;
1816 enum lttng_condition_type condition_type;
1817
1818 trigger = trigger_list_element->trigger;
1819 condition = lttng_trigger_get_const_condition(trigger);
1820 assert(condition);
1821 condition_type = lttng_condition_get_type(condition);
1822
1823 if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING &&
1824 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1825 continue;
1826 } else if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED &&
1827 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED) {
1828 continue;
1829 }
1830
1831 action = lttng_trigger_get_const_action(trigger);
1832
1833 /* Notify actions are the only type currently supported. */
1834 assert(lttng_action_get_type_const(action) ==
1835 LTTNG_ACTION_TYPE_NOTIFY);
1836
1837 client_list = get_client_list_from_condition(state, condition);
1838 assert(client_list);
1839
1840 if (cds_list_empty(&client_list->list)) {
1841 /*
1842 * No clients interested in the evaluation's result,
1843 * skip it.
1844 */
1845 continue;
1846 }
1847
1848 if (cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1849 evaluation = lttng_evaluation_session_rotation_ongoing_create(
1850 trace_archive_chunk_id);
1851 } else {
1852 evaluation = lttng_evaluation_session_rotation_completed_create(
1853 trace_archive_chunk_id, location);
1854 }
1855
1856 if (!evaluation) {
1857 /* Internal error */
1858 ret = -1;
1859 cmd_result = LTTNG_ERR_UNK;
1860 goto end;
1861 }
1862
1863 /* Dispatch evaluation result to all clients. */
1864 ret = send_evaluation_to_clients(trigger_list_element->trigger,
1865 evaluation, client_list, state,
1866 session_info->uid,
1867 session_info->gid);
1868 lttng_evaluation_destroy(evaluation);
1869 if (caa_unlikely(ret)) {
1870 goto end;
1871 }
1872 }
1873end:
1874 session_info_put(session_info);
1875 *_cmd_result = cmd_result;
1876 rcu_read_unlock();
1877 return ret;
0ca52944
JG
1878}
1879
1da26331
JG
1880static
1881int condition_is_supported(struct lttng_condition *condition)
1882{
1883 int ret;
1884
1885 switch (lttng_condition_get_type(condition)) {
1886 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1887 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1888 {
1889 enum lttng_domain_type domain;
1890
1891 ret = lttng_condition_buffer_usage_get_domain_type(condition,
1892 &domain);
1893 if (ret) {
1894 ret = -1;
1895 goto end;
1896 }
1897
1898 if (domain != LTTNG_DOMAIN_KERNEL) {
1899 ret = 1;
1900 goto end;
1901 }
1902
1903 /*
1904 * Older kernel tracers don't expose the API to monitor their
1905 * buffers. Therefore, we reject triggers that require that
1906 * mechanism to be available to be evaluated.
1907 */
7d268848 1908 ret = kernel_supports_ring_buffer_snapshot_sample_positions();
1da26331
JG
1909 break;
1910 }
1911 default:
1912 ret = 1;
1913 }
1914end:
1915 return ret;
1916}
1917
51eab943
JG
1918/* Must be called with RCU read lock held. */
1919static
1920int bind_trigger_to_matching_session(const struct lttng_trigger *trigger,
1921 struct notification_thread_state *state)
1922{
1923 int ret = 0;
51eab943
JG
1924 const struct lttng_condition *condition;
1925 const char *session_name;
1926 struct lttng_session_trigger_list *trigger_list;
1927
1928 condition = lttng_trigger_get_const_condition(trigger);
1929 switch (lttng_condition_get_type(condition)) {
1930 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1931 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
1932 {
1933 enum lttng_condition_status status;
1934
1935 status = lttng_condition_session_rotation_get_session_name(
1936 condition, &session_name);
1937 if (status != LTTNG_CONDITION_STATUS_OK) {
1938 ERR("[notification-thread] Failed to bind trigger to session: unable to get 'session_rotation' condition's session name");
1939 ret = -1;
1940 goto end;
1941 }
1942 break;
1943 }
1944 default:
1945 ret = -1;
1946 goto end;
1947 }
1948
ed327204
JG
1949 trigger_list = get_session_trigger_list(state, session_name);
1950 if (!trigger_list) {
51eab943
JG
1951 DBG("[notification-thread] Unable to bind trigger applying to session \"%s\" as it is not yet known to the notification system",
1952 session_name);
1953 goto end;
51eab943 1954
ed327204 1955 }
51eab943
JG
1956
1957 DBG("[notification-thread] Newly registered trigger bound to session \"%s\"",
1958 session_name);
1959 ret = lttng_session_trigger_list_add(trigger_list, trigger);
1960end:
1961 return ret;
1962}
1963
1964/* Must be called with RCU read lock held. */
1965static
1966int bind_trigger_to_matching_channels(const struct lttng_trigger *trigger,
1967 struct notification_thread_state *state)
1968{
1969 int ret = 0;
1970 struct cds_lfht_node *node;
1971 struct cds_lfht_iter iter;
1972 struct channel_info *channel;
1973
1974 cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
1975 channels_ht_node) {
1976 struct lttng_trigger_list_element *trigger_list_element;
1977 struct lttng_channel_trigger_list *trigger_list;
a5d64ae7 1978 struct cds_lfht_iter lookup_iter;
51eab943
JG
1979
1980 if (!trigger_applies_to_channel(trigger, channel)) {
1981 continue;
1982 }
1983
1984 cds_lfht_lookup(state->channel_triggers_ht,
1985 hash_channel_key(&channel->key),
1986 match_channel_trigger_list,
1987 &channel->key,
a5d64ae7
MD
1988 &lookup_iter);
1989 node = cds_lfht_iter_get_node(&lookup_iter);
51eab943
JG
1990 assert(node);
1991 trigger_list = caa_container_of(node,
1992 struct lttng_channel_trigger_list,
1993 channel_triggers_ht_node);
1994
1995 trigger_list_element = zmalloc(sizeof(*trigger_list_element));
1996 if (!trigger_list_element) {
1997 ret = -1;
1998 goto end;
1999 }
2000 CDS_INIT_LIST_HEAD(&trigger_list_element->node);
2001 trigger_list_element->trigger = trigger;
2002 cds_list_add(&trigger_list_element->node, &trigger_list->list);
2003 DBG("[notification-thread] Newly registered trigger bound to channel \"%s\"",
2004 channel->name);
2005 }
2006end:
2007 return ret;
2008}
2009
ab0ee2ca
JG
2010/*
2011 * FIXME A client's credentials are not checked when registering a trigger, nor
2012 * are they stored alongside with the trigger.
2013 *
1da26331 2014 * The effects of this are benign since:
ab0ee2ca 2015 * - The client will succeed in registering the trigger, as it is valid,
51eab943 2016 * - The trigger will, internally, be bound to the channel/session,
ab0ee2ca
JG
2017 * - The notifications will not be sent since the client's credentials
2018 * are checked against the channel at that moment.
1da26331
JG
2019 *
2020 * If this function returns a non-zero value, it means something is
50ca7858 2021 * fundamentally broken and the whole subsystem/thread will be torn down.
1da26331
JG
2022 *
2023 * If a non-fatal error occurs, just set the cmd_result to the appropriate
2024 * error code.
ab0ee2ca
JG
2025 */
2026static
2027int handle_notification_thread_command_register_trigger(
8abe313a
JG
2028 struct notification_thread_state *state,
2029 struct lttng_trigger *trigger,
2030 enum lttng_error_code *cmd_result)
ab0ee2ca
JG
2031{
2032 int ret = 0;
2033 struct lttng_condition *condition;
2034 struct notification_client *client;
2035 struct notification_client_list *client_list = NULL;
2036 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
2037 struct notification_client_list_element *client_list_element, *tmp;
2038 struct cds_lfht_node *node;
2039 struct cds_lfht_iter iter;
ab0ee2ca
JG
2040 bool free_trigger = true;
2041
2042 rcu_read_lock();
2043
2044 condition = lttng_trigger_get_condition(trigger);
1da26331
JG
2045 assert(condition);
2046
2047 ret = condition_is_supported(condition);
2048 if (ret < 0) {
2049 goto error;
2050 } else if (ret == 0) {
2051 *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
2052 goto error;
2053 } else {
2054 /* Feature is supported, continue. */
2055 ret = 0;
2056 }
2057
ab0ee2ca
JG
2058 trigger_ht_element = zmalloc(sizeof(*trigger_ht_element));
2059 if (!trigger_ht_element) {
2060 ret = -1;
2061 goto error;
2062 }
2063
2064 /* Add trigger to the trigger_ht. */
2065 cds_lfht_node_init(&trigger_ht_element->node);
2066 trigger_ht_element->trigger = trigger;
2067
2068 node = cds_lfht_add_unique(state->triggers_ht,
2069 lttng_condition_hash(condition),
2070 match_condition,
2071 condition,
2072 &trigger_ht_element->node);
2073 if (node != &trigger_ht_element->node) {
2074 /* Not a fatal error, simply report it to the client. */
2075 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2076 goto error_free_ht_element;
2077 }
2078
2079 /*
2080 * Ownership of the trigger and of its wrapper was transfered to
2081 * the triggers_ht.
2082 */
2083 trigger_ht_element = NULL;
2084 free_trigger = false;
2085
2086 /*
2087 * The rest only applies to triggers that have a "notify" action.
2088 * It is not skipped as this is the only action type currently
2089 * supported.
2090 */
2091 client_list = zmalloc(sizeof(*client_list));
2092 if (!client_list) {
2093 ret = -1;
2094 goto error_free_ht_element;
2095 }
2096 cds_lfht_node_init(&client_list->notification_trigger_ht_node);
2097 CDS_INIT_LIST_HEAD(&client_list->list);
2098 client_list->trigger = trigger;
2099
2100 /* Build a list of clients to which this new trigger applies. */
2101 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
2102 client_socket_ht_node) {
2103 if (!trigger_applies_to_client(trigger, client)) {
2104 continue;
2105 }
2106
2107 client_list_element = zmalloc(sizeof(*client_list_element));
2108 if (!client_list_element) {
2109 ret = -1;
2110 goto error_free_client_list;
2111 }
2112 CDS_INIT_LIST_HEAD(&client_list_element->node);
2113 client_list_element->client = client;
2114 cds_list_add(&client_list_element->node, &client_list->list);
2115 }
2116
2117 cds_lfht_add(state->notification_trigger_clients_ht,
2118 lttng_condition_hash(condition),
2119 &client_list->notification_trigger_ht_node);
ab0ee2ca 2120
f82f93a1 2121 switch (get_condition_binding_object(condition)) {
51eab943
JG
2122 case LTTNG_OBJECT_TYPE_SESSION:
2123 /* Add the trigger to the list if it matches a known session. */
2124 ret = bind_trigger_to_matching_session(trigger, state);
2125 if (ret) {
2126 goto error_free_client_list;
ab0ee2ca 2127 }
f82f93a1 2128 break;
51eab943
JG
2129 case LTTNG_OBJECT_TYPE_CHANNEL:
2130 /*
2131 * Add the trigger to list of triggers bound to the channels
2132 * currently known.
2133 */
2134 ret = bind_trigger_to_matching_channels(trigger, state);
2135 if (ret) {
ab0ee2ca
JG
2136 goto error_free_client_list;
2137 }
51eab943
JG
2138 break;
2139 case LTTNG_OBJECT_TYPE_NONE:
2140 break;
2141 default:
2142 ERR("[notification-thread] Unknown object type on which to bind a newly registered trigger was encountered");
2143 ret = -1;
2144 goto error_free_client_list;
ab0ee2ca
JG
2145 }
2146
2ae99f0b
JG
2147 /*
2148 * Since there is nothing preventing clients from subscribing to a
2149 * condition before the corresponding trigger is registered, we have
2150 * to evaluate this new condition right away.
2151 *
2152 * At some point, we were waiting for the next "evaluation" (e.g. on
2153 * reception of a channel sample) to evaluate this new condition, but
2154 * that was broken.
2155 *
2156 * The reason it was broken is that waiting for the next sample
2157 * does not allow us to properly handle transitions for edge-triggered
2158 * conditions.
2159 *
2160 * Consider this example: when we handle a new channel sample, we
2161 * evaluate each conditions twice: once with the previous state, and
2162 * again with the newest state. We then use those two results to
2163 * determine whether a state change happened: a condition was false and
2164 * became true. If a state change happened, we have to notify clients.
2165 *
2166 * Now, if a client subscribes to a given notification and registers
2167 * a trigger *after* that subscription, we have to make sure the
2168 * condition is evaluated at this point while considering only the
2169 * current state. Otherwise, the next evaluation cycle may only see
2170 * that the evaluations remain the same (true for samples n-1 and n) and
2171 * the client will never know that the condition has been met.
2172 */
2173 cds_list_for_each_entry_safe(client_list_element, tmp,
2174 &client_list->list, node) {
2175 ret = evaluate_condition_for_client(trigger, condition,
2176 client_list_element->client, state);
2177 if (ret) {
2178 goto error_free_client_list;
2179 }
2180 }
2181
2182 /*
2183 * Client list ownership transferred to the
2184 * notification_trigger_clients_ht.
2185 */
2186 client_list = NULL;
2187
ab0ee2ca
JG
2188 *cmd_result = LTTNG_OK;
2189error_free_client_list:
2190 if (client_list) {
2191 cds_list_for_each_entry_safe(client_list_element, tmp,
2192 &client_list->list, node) {
2193 free(client_list_element);
2194 }
2195 free(client_list);
2196 }
2197error_free_ht_element:
2198 free(trigger_ht_element);
2199error:
2200 if (free_trigger) {
ab0ee2ca
JG
2201 lttng_trigger_destroy(trigger);
2202 }
2203 rcu_read_unlock();
2204 return ret;
2205}
2206
83b934ad
MD
2207static
2208void free_notification_client_list_rcu(struct rcu_head *node)
2209{
2210 free(caa_container_of(node, struct notification_client_list,
2211 rcu_node));
2212}
2213
2214static
2215void free_lttng_trigger_ht_element_rcu(struct rcu_head *node)
2216{
2217 free(caa_container_of(node, struct lttng_trigger_ht_element,
2218 rcu_node));
2219}
2220
cc2295b5 2221static
ab0ee2ca
JG
2222int handle_notification_thread_command_unregister_trigger(
2223 struct notification_thread_state *state,
2224 struct lttng_trigger *trigger,
2225 enum lttng_error_code *_cmd_reply)
2226{
2227 struct cds_lfht_iter iter;
ed327204 2228 struct cds_lfht_node *triggers_ht_node;
ab0ee2ca
JG
2229 struct lttng_channel_trigger_list *trigger_list;
2230 struct notification_client_list *client_list;
2231 struct notification_client_list_element *client_list_element, *tmp;
2232 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
2233 struct lttng_condition *condition = lttng_trigger_get_condition(
2234 trigger);
ab0ee2ca
JG
2235 enum lttng_error_code cmd_reply;
2236
2237 rcu_read_lock();
2238
2239 cds_lfht_lookup(state->triggers_ht,
2240 lttng_condition_hash(condition),
2241 match_condition,
2242 condition,
2243 &iter);
2244 triggers_ht_node = cds_lfht_iter_get_node(&iter);
2245 if (!triggers_ht_node) {
2246 cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND;
2247 goto end;
2248 } else {
2249 cmd_reply = LTTNG_OK;
2250 }
2251
2252 /* Remove trigger from channel_triggers_ht. */
2253 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
2254 channel_triggers_ht_node) {
2255 struct lttng_trigger_list_element *trigger_element, *tmp;
2256
2257 cds_list_for_each_entry_safe(trigger_element, tmp,
2258 &trigger_list->list, node) {
9b63a4aa
JG
2259 const struct lttng_condition *current_condition =
2260 lttng_trigger_get_const_condition(
ab0ee2ca
JG
2261 trigger_element->trigger);
2262
2263 assert(current_condition);
2264 if (!lttng_condition_is_equal(condition,
2265 current_condition)) {
2266 continue;
2267 }
2268
2269 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
2270 cds_list_del(&trigger_element->node);
e4db5ace
JR
2271 /* A trigger can only appear once per channel */
2272 break;
ab0ee2ca
JG
2273 }
2274 }
2275
2276 /*
2277 * Remove and release the client list from
2278 * notification_trigger_clients_ht.
2279 */
ed327204
JG
2280 client_list = get_client_list_from_condition(state, condition);
2281 assert(client_list);
2282
ab0ee2ca
JG
2283 cds_list_for_each_entry_safe(client_list_element, tmp,
2284 &client_list->list, node) {
2285 free(client_list_element);
2286 }
ed327204
JG
2287 cds_lfht_del(state->notification_trigger_clients_ht,
2288 &client_list->notification_trigger_ht_node);
83b934ad 2289 call_rcu(&client_list->rcu_node, free_notification_client_list_rcu);
ab0ee2ca
JG
2290
2291 /* Remove trigger from triggers_ht. */
2292 trigger_ht_element = caa_container_of(triggers_ht_node,
2293 struct lttng_trigger_ht_element, node);
2294 cds_lfht_del(state->triggers_ht, triggers_ht_node);
2295
7ca172c1 2296 /* Release the ownership of the trigger. */
ab0ee2ca 2297 lttng_trigger_destroy(trigger_ht_element->trigger);
83b934ad 2298 call_rcu(&trigger_ht_element->rcu_node, free_lttng_trigger_ht_element_rcu);
ab0ee2ca
JG
2299end:
2300 rcu_read_unlock();
2301 if (_cmd_reply) {
2302 *_cmd_reply = cmd_reply;
2303 }
2304 return 0;
2305}
2306
2307/* Returns 0 on success, 1 on exit requested, negative value on error. */
2308int handle_notification_thread_command(
2309 struct notification_thread_handle *handle,
2310 struct notification_thread_state *state)
2311{
2312 int ret;
2313 uint64_t counter;
2314 struct notification_thread_command *cmd;
2315
8abe313a 2316 /* Read the event pipe to put it back into a quiescent state. */
18aeca05 2317 ret = lttng_read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter,
8abe313a 2318 sizeof(counter));
18aeca05 2319 if (ret != sizeof(counter)) {
ab0ee2ca
JG
2320 goto error;
2321 }
2322
2323 pthread_mutex_lock(&handle->cmd_queue.lock);
2324 cmd = cds_list_first_entry(&handle->cmd_queue.list,
2325 struct notification_thread_command, cmd_list_node);
2326 switch (cmd->type) {
2327 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
2328 DBG("[notification-thread] Received register trigger command");
2329 ret = handle_notification_thread_command_register_trigger(
2330 state, cmd->parameters.trigger,
2331 &cmd->reply_code);
2332 break;
2333 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
2334 DBG("[notification-thread] Received unregister trigger command");
2335 ret = handle_notification_thread_command_unregister_trigger(
2336 state, cmd->parameters.trigger,
2337 &cmd->reply_code);
2338 break;
2339 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
2340 DBG("[notification-thread] Received add channel command");
2341 ret = handle_notification_thread_command_add_channel(
8abe313a
JG
2342 state,
2343 cmd->parameters.add_channel.session.name,
2344 cmd->parameters.add_channel.session.uid,
2345 cmd->parameters.add_channel.session.gid,
2346 cmd->parameters.add_channel.channel.name,
2347 cmd->parameters.add_channel.channel.domain,
2348 cmd->parameters.add_channel.channel.key,
2349 cmd->parameters.add_channel.channel.capacity,
ab0ee2ca
JG
2350 &cmd->reply_code);
2351 break;
2352 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
2353 DBG("[notification-thread] Received remove channel command");
2354 ret = handle_notification_thread_command_remove_channel(
2355 state, cmd->parameters.remove_channel.key,
2356 cmd->parameters.remove_channel.domain,
2357 &cmd->reply_code);
2358 break;
0ca52944 2359 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING:
0ca52944 2360 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED:
ed327204
JG
2361 DBG("[notification-thread] Received session rotation %s command",
2362 cmd->type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING ?
2363 "ongoing" : "completed");
2364 ret = handle_notification_thread_command_session_rotation(
0ca52944 2365 state,
ed327204
JG
2366 cmd->type,
2367 cmd->parameters.session_rotation.session_name,
2368 cmd->parameters.session_rotation.uid,
2369 cmd->parameters.session_rotation.gid,
2370 cmd->parameters.session_rotation.trace_archive_chunk_id,
2371 cmd->parameters.session_rotation.location,
0ca52944
JG
2372 &cmd->reply_code);
2373 break;
ab0ee2ca
JG
2374 case NOTIFICATION_COMMAND_TYPE_QUIT:
2375 DBG("[notification-thread] Received quit command");
2376 cmd->reply_code = LTTNG_OK;
2377 ret = 1;
2378 goto end;
2379 default:
2380 ERR("[notification-thread] Unknown internal command received");
2381 goto error_unlock;
2382 }
2383
2384 if (ret) {
2385 goto error_unlock;
2386 }
2387end:
2388 cds_list_del(&cmd->cmd_list_node);
8ada111f 2389 lttng_waiter_wake_up(&cmd->reply_waiter);
ab0ee2ca
JG
2390 pthread_mutex_unlock(&handle->cmd_queue.lock);
2391 return ret;
2392error_unlock:
2393 /* Wake-up and return a fatal error to the calling thread. */
8ada111f 2394 lttng_waiter_wake_up(&cmd->reply_waiter);
ab0ee2ca
JG
2395 pthread_mutex_unlock(&handle->cmd_queue.lock);
2396 cmd->reply_code = LTTNG_ERR_FATAL;
2397error:
2398 /* Indicate a fatal error to the caller. */
2399 return -1;
2400}
2401
ab0ee2ca
JG
2402static
2403int socket_set_non_blocking(int socket)
2404{
2405 int ret, flags;
2406
2407 /* Set the pipe as non-blocking. */
2408 ret = fcntl(socket, F_GETFL, 0);
2409 if (ret == -1) {
2410 PERROR("fcntl get socket flags");
2411 goto end;
2412 }
2413 flags = ret;
2414
2415 ret = fcntl(socket, F_SETFL, flags | O_NONBLOCK);
2416 if (ret == -1) {
2417 PERROR("fcntl set O_NONBLOCK socket flag");
2418 goto end;
2419 }
2420 DBG("Client socket (fd = %i) set as non-blocking", socket);
2421end:
2422 return ret;
2423}
2424
2425static
14fa22f8 2426int client_reset_inbound_state(struct notification_client *client)
ab0ee2ca
JG
2427{
2428 int ret;
2429
2430 ret = lttng_dynamic_buffer_set_size(
2431 &client->communication.inbound.buffer, 0);
2432 assert(!ret);
2433
2434 client->communication.inbound.bytes_to_receive =
2435 sizeof(struct lttng_notification_channel_message);
2436 client->communication.inbound.msg_type =
2437 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN;
ab0ee2ca
JG
2438 LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1);
2439 LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1);
14fa22f8
JG
2440 ret = lttng_dynamic_buffer_set_size(
2441 &client->communication.inbound.buffer,
2442 client->communication.inbound.bytes_to_receive);
2443 return ret;
ab0ee2ca
JG
2444}
2445
2446int handle_notification_thread_client_connect(
2447 struct notification_thread_state *state)
2448{
2449 int ret;
2450 struct notification_client *client;
2451
2452 DBG("[notification-thread] Handling new notification channel client connection");
2453
2454 client = zmalloc(sizeof(*client));
2455 if (!client) {
2456 /* Fatal error. */
2457 ret = -1;
2458 goto error;
2459 }
ac1889bf 2460 client->id = state->next_notification_client_id++;
ab0ee2ca
JG
2461 CDS_INIT_LIST_HEAD(&client->condition_list);
2462 lttng_dynamic_buffer_init(&client->communication.inbound.buffer);
2463 lttng_dynamic_buffer_init(&client->communication.outbound.buffer);
01ea340e 2464 client->communication.inbound.expect_creds = true;
14fa22f8
JG
2465 ret = client_reset_inbound_state(client);
2466 if (ret) {
2467 ERR("[notification-thread] Failed to reset client communication's inbound state");
e742b055 2468 ret = 0;
14fa22f8
JG
2469 goto error;
2470 }
ab0ee2ca
JG
2471
2472 ret = lttcomm_accept_unix_sock(state->notification_channel_socket);
2473 if (ret < 0) {
2474 ERR("[notification-thread] Failed to accept new notification channel client connection");
2475 ret = 0;
2476 goto error;
2477 }
2478
2479 client->socket = ret;
2480
2481 ret = socket_set_non_blocking(client->socket);
2482 if (ret) {
2483 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
2484 goto error;
2485 }
2486
2487 ret = lttcomm_setsockopt_creds_unix_sock(client->socket);
2488 if (ret < 0) {
2489 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
2490 ret = 0;
2491 goto error;
2492 }
2493
2494 ret = lttng_poll_add(&state->events, client->socket,
2495 LPOLLIN | LPOLLERR |
2496 LPOLLHUP | LPOLLRDHUP);
2497 if (ret < 0) {
2498 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
2499 ret = 0;
2500 goto error;
2501 }
2502 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
2503 client->socket);
2504
ab0ee2ca
JG
2505 rcu_read_lock();
2506 cds_lfht_add(state->client_socket_ht,
2507 hash_client_socket(client->socket),
2508 &client->client_socket_ht_node);
ac1889bf
JG
2509 cds_lfht_add(state->client_id_ht,
2510 hash_client_id(client->id),
2511 &client->client_id_ht_node);
ab0ee2ca
JG
2512 rcu_read_unlock();
2513
2514 return ret;
2515error:
2516 notification_client_destroy(client, state);
2517 return ret;
2518}
2519
2520int handle_notification_thread_client_disconnect(
2521 int client_socket,
2522 struct notification_thread_state *state)
2523{
2524 int ret = 0;
2525 struct notification_client *client;
2526
2527 rcu_read_lock();
2528 DBG("[notification-thread] Closing client connection (socket fd = %i)",
2529 client_socket);
2530 client = get_client_from_socket(client_socket, state);
2531 if (!client) {
2532 /* Internal state corruption, fatal error. */
2533 ERR("[notification-thread] Unable to find client (socket fd = %i)",
2534 client_socket);
2535 ret = -1;
2536 goto end;
2537 }
2538
2539 ret = lttng_poll_del(&state->events, client_socket);
2540 if (ret) {
2541 ERR("[notification-thread] Failed to remove client socket from poll set");
2542 }
e742b055 2543 cds_lfht_del(state->client_socket_ht,
ab0ee2ca 2544 &client->client_socket_ht_node);
ac1889bf
JG
2545 cds_lfht_del(state->client_id_ht,
2546 &client->client_id_ht_node);
ab0ee2ca
JG
2547 notification_client_destroy(client, state);
2548end:
2549 rcu_read_unlock();
2550 return ret;
2551}
2552
2553int handle_notification_thread_client_disconnect_all(
2554 struct notification_thread_state *state)
2555{
2556 struct cds_lfht_iter iter;
2557 struct notification_client *client;
2558 bool error_encoutered = false;
2559
2560 rcu_read_lock();
2561 DBG("[notification-thread] Closing all client connections");
2562 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
2563 client_socket_ht_node) {
2564 int ret;
2565
2566 ret = handle_notification_thread_client_disconnect(
2567 client->socket, state);
2568 if (ret) {
2569 error_encoutered = true;
2570 }
2571 }
2572 rcu_read_unlock();
2573 return error_encoutered ? 1 : 0;
2574}
2575
2576int handle_notification_thread_trigger_unregister_all(
2577 struct notification_thread_state *state)
2578{
4149ace8 2579 bool error_occurred = false;
ab0ee2ca
JG
2580 struct cds_lfht_iter iter;
2581 struct lttng_trigger_ht_element *trigger_ht_element;
2582
763de384 2583 rcu_read_lock();
ab0ee2ca
JG
2584 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
2585 node) {
2586 int ret = handle_notification_thread_command_unregister_trigger(
2587 state, trigger_ht_element->trigger, NULL);
2588 if (ret) {
4149ace8 2589 error_occurred = true;
ab0ee2ca
JG
2590 }
2591 }
763de384 2592 rcu_read_unlock();
4149ace8 2593 return error_occurred ? -1 : 0;
ab0ee2ca
JG
2594}
2595
2596static
2597int client_flush_outgoing_queue(struct notification_client *client,
2598 struct notification_thread_state *state)
2599{
2600 ssize_t ret;
2601 size_t to_send_count;
2602
2603 assert(client->communication.outbound.buffer.size != 0);
2604 to_send_count = client->communication.outbound.buffer.size;
2605 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
2606 client->socket);
2607
2608 ret = lttcomm_send_unix_sock_non_block(client->socket,
2609 client->communication.outbound.buffer.data,
2610 to_send_count);
44c180ca 2611 if ((ret >= 0 && ret < to_send_count)) {
ab0ee2ca
JG
2612 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
2613 client->socket);
2614 to_send_count -= max(ret, 0);
2615
2616 memcpy(client->communication.outbound.buffer.data,
2617 client->communication.outbound.buffer.data +
2618 client->communication.outbound.buffer.size - to_send_count,
2619 to_send_count);
2620 ret = lttng_dynamic_buffer_set_size(
2621 &client->communication.outbound.buffer,
2622 to_send_count);
2623 if (ret) {
2624 goto error;
2625 }
2626
2627 /*
2628 * We want to be notified whenever there is buffer space
2629 * available to send the rest of the payload.
2630 */
2631 ret = lttng_poll_mod(&state->events, client->socket,
2632 CLIENT_POLL_MASK_IN_OUT);
2633 if (ret) {
2634 goto error;
2635 }
2636 } else if (ret < 0) {
2637 /* Generic error, disconnect the client. */
2638 ERR("[notification-thread] Failed to send flush outgoing queue, disconnecting client (socket fd = %i)",
2639 client->socket);
2640 ret = handle_notification_thread_client_disconnect(
2641 client->socket, state);
2642 if (ret) {
2643 goto error;
2644 }
2645 } else {
2646 /* No error and flushed the queue completely. */
2647 ret = lttng_dynamic_buffer_set_size(
2648 &client->communication.outbound.buffer, 0);
2649 if (ret) {
2650 goto error;
2651 }
2652 ret = lttng_poll_mod(&state->events, client->socket,
2653 CLIENT_POLL_MASK_IN);
2654 if (ret) {
2655 goto error;
2656 }
2657
2658 client->communication.outbound.queued_command_reply = false;
2659 client->communication.outbound.dropped_notification = false;
2660 }
2661
2662 return 0;
2663error:
2664 return -1;
2665}
2666
2667static
2668int client_send_command_reply(struct notification_client *client,
2669 struct notification_thread_state *state,
2670 enum lttng_notification_channel_status status)
2671{
2672 int ret;
2673 struct lttng_notification_channel_command_reply reply = {
2674 .status = (int8_t) status,
2675 };
2676 struct lttng_notification_channel_message msg = {
2677 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
2678 .size = sizeof(reply),
2679 };
2680 char buffer[sizeof(msg) + sizeof(reply)];
2681
2682 if (client->communication.outbound.queued_command_reply) {
2683 /* Protocol error. */
2684 goto error;
2685 }
2686
2687 memcpy(buffer, &msg, sizeof(msg));
2688 memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
2689 DBG("[notification-thread] Send command reply (%i)", (int) status);
2690
2691 /* Enqueue buffer to outgoing queue and flush it. */
2692 ret = lttng_dynamic_buffer_append(
2693 &client->communication.outbound.buffer,
2694 buffer, sizeof(buffer));
2695 if (ret) {
2696 goto error;
2697 }
2698
2699 ret = client_flush_outgoing_queue(client, state);
2700 if (ret) {
2701 goto error;
2702 }
2703
2704 if (client->communication.outbound.buffer.size != 0) {
2705 /* Queue could not be emptied. */
2706 client->communication.outbound.queued_command_reply = true;
2707 }
2708
2709 return 0;
2710error:
2711 return -1;
2712}
2713
2714static
2715int client_dispatch_message(struct notification_client *client,
2716 struct notification_thread_state *state)
2717{
2718 int ret = 0;
2719
2720 if (client->communication.inbound.msg_type !=
2721 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE &&
2722 client->communication.inbound.msg_type !=
2723 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN &&
2724 !client->validated) {
2725 WARN("[notification-thread] client attempted a command before handshake");
2726 ret = -1;
2727 goto end;
2728 }
2729
2730 switch (client->communication.inbound.msg_type) {
2731 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN:
2732 {
2733 /*
2734 * Receiving message header. The function will be called again
2735 * once the rest of the message as been received and can be
2736 * interpreted.
2737 */
2738 const struct lttng_notification_channel_message *msg;
2739
2740 assert(sizeof(*msg) ==
2741 client->communication.inbound.buffer.size);
2742 msg = (const struct lttng_notification_channel_message *)
2743 client->communication.inbound.buffer.data;
2744
2745 if (msg->size == 0 || msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
2746 ERR("[notification-thread] Invalid notification channel message: length = %u", msg->size);
2747 ret = -1;
2748 goto end;
2749 }
2750
2751 switch (msg->type) {
2752 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
2753 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
2754 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
2755 break;
2756 default:
2757 ret = -1;
2758 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
2759 goto end;
2760 }
2761
2762 client->communication.inbound.bytes_to_receive = msg->size;
2763 client->communication.inbound.msg_type =
2764 (enum lttng_notification_channel_message_type) msg->type;
ab0ee2ca 2765 ret = lttng_dynamic_buffer_set_size(
14fa22f8 2766 &client->communication.inbound.buffer, msg->size);
ab0ee2ca
JG
2767 if (ret) {
2768 goto end;
2769 }
2770 break;
2771 }
2772 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
2773 {
2774 struct lttng_notification_channel_command_handshake *handshake_client;
2775 struct lttng_notification_channel_command_handshake handshake_reply = {
2776 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
2777 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
2778 };
2779 struct lttng_notification_channel_message msg_header = {
2780 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
2781 .size = sizeof(handshake_reply),
2782 };
2783 enum lttng_notification_channel_status status =
2784 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
2785 char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
2786
2787 memcpy(send_buffer, &msg_header, sizeof(msg_header));
2788 memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
2789 sizeof(handshake_reply));
2790
2791 handshake_client =
2792 (struct lttng_notification_channel_command_handshake *)
2793 client->communication.inbound.buffer.data;
47a32869 2794 client->major = handshake_client->major;
ab0ee2ca
JG
2795 client->minor = handshake_client->minor;
2796 if (!client->communication.inbound.creds_received) {
2797 ERR("[notification-thread] No credentials received from client");
2798 ret = -1;
2799 goto end;
2800 }
2801
2802 client->uid = LTTNG_SOCK_GET_UID_CRED(
2803 &client->communication.inbound.creds);
2804 client->gid = LTTNG_SOCK_GET_GID_CRED(
2805 &client->communication.inbound.creds);
2806 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
2807 client->uid, client->gid, (int) client->major,
2808 (int) client->minor);
2809
2810 if (handshake_client->major != LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
2811 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
2812 }
2813
2814 ret = lttng_dynamic_buffer_append(&client->communication.outbound.buffer,
2815 send_buffer, sizeof(send_buffer));
2816 if (ret) {
2817 ERR("[notification-thread] Failed to send protocol version to notification channel client");
2818 goto end;
2819 }
2820
2821 ret = client_flush_outgoing_queue(client, state);
2822 if (ret) {
2823 goto end;
2824 }
2825
2826 ret = client_send_command_reply(client, state, status);
2827 if (ret) {
2828 ERR("[notification-thread] Failed to send reply to notification channel client");
2829 goto end;
2830 }
2831
2832 /* Set reception state to receive the next message header. */
14fa22f8
JG
2833 ret = client_reset_inbound_state(client);
2834 if (ret) {
2835 ERR("[notification-thread] Failed to reset client communication's inbound state");
2836 goto end;
2837 }
ab0ee2ca
JG
2838 client->validated = true;
2839 break;
2840 }
2841 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
2842 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
2843 {
2844 struct lttng_condition *condition;
2845 enum lttng_notification_channel_status status =
2846 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
c0a66c84
JG
2847 struct lttng_payload_view condition_view =
2848 lttng_payload_view_from_dynamic_buffer(
ab0ee2ca
JG
2849 &client->communication.inbound.buffer,
2850 0, -1);
2851 size_t expected_condition_size =
2852 client->communication.inbound.buffer.size;
2853
c0a66c84 2854 ret = lttng_condition_create_from_payload(&condition_view,
ab0ee2ca
JG
2855 &condition);
2856 if (ret != expected_condition_size) {
2857 ERR("[notification-thread] Malformed condition received from client");
2858 goto end;
2859 }
2860
2861 if (client->communication.inbound.msg_type ==
2862 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
ab0ee2ca
JG
2863 ret = notification_thread_client_subscribe(client,
2864 condition, state, &status);
2865 } else {
2866 ret = notification_thread_client_unsubscribe(client,
2867 condition, state, &status);
2868 }
2869 if (ret) {
2870 goto end;
2871 }
2872
2873 ret = client_send_command_reply(client, state, status);
2874 if (ret) {
2875 ERR("[notification-thread] Failed to send reply to notification channel client");
2876 goto end;
2877 }
2878
2879 /* Set reception state to receive the next message header. */
14fa22f8
JG
2880 ret = client_reset_inbound_state(client);
2881 if (ret) {
2882 ERR("[notification-thread] Failed to reset client communication's inbound state");
2883 goto end;
2884 }
ab0ee2ca
JG
2885 break;
2886 }
2887 default:
2888 abort();
2889 }
2890end:
2891 return ret;
2892}
2893
2894/* Incoming data from client. */
2895int handle_notification_thread_client_in(
2896 struct notification_thread_state *state, int socket)
2897{
14fa22f8 2898 int ret = 0;
ab0ee2ca
JG
2899 struct notification_client *client;
2900 ssize_t recv_ret;
2901 size_t offset;
2902
2903 client = get_client_from_socket(socket, state);
2904 if (!client) {
2905 /* Internal error, abort. */
2906 ret = -1;
2907 goto end;
2908 }
2909
14fa22f8
JG
2910 offset = client->communication.inbound.buffer.size -
2911 client->communication.inbound.bytes_to_receive;
01ea340e 2912 if (client->communication.inbound.expect_creds) {
ab0ee2ca
JG
2913 recv_ret = lttcomm_recv_creds_unix_sock(socket,
2914 client->communication.inbound.buffer.data + offset,
2915 client->communication.inbound.bytes_to_receive,
2916 &client->communication.inbound.creds);
2917 if (recv_ret > 0) {
01ea340e 2918 client->communication.inbound.expect_creds = false;
ab0ee2ca
JG
2919 client->communication.inbound.creds_received = true;
2920 }
2921 } else {
2922 recv_ret = lttcomm_recv_unix_sock_non_block(socket,
2923 client->communication.inbound.buffer.data + offset,
2924 client->communication.inbound.bytes_to_receive);
2925 }
2926 if (recv_ret < 0) {
2927 goto error_disconnect_client;
2928 }
2929
2930 client->communication.inbound.bytes_to_receive -= recv_ret;
ab0ee2ca
JG
2931 if (client->communication.inbound.bytes_to_receive == 0) {
2932 ret = client_dispatch_message(client, state);
2933 if (ret) {
2934 /*
2935 * Only returns an error if this client must be
2936 * disconnected.
2937 */
2938 goto error_disconnect_client;
2939 }
2940 } else {
2941 goto end;
2942 }
2943end:
2944 return ret;
2945error_disconnect_client:
2946 ret = handle_notification_thread_client_disconnect(socket, state);
2947 return ret;
2948}
2949
2950/* Client ready to receive outgoing data. */
2951int handle_notification_thread_client_out(
2952 struct notification_thread_state *state, int socket)
2953{
2954 int ret;
2955 struct notification_client *client;
2956
2957 client = get_client_from_socket(socket, state);
2958 if (!client) {
2959 /* Internal error, abort. */
2960 ret = -1;
2961 goto end;
2962 }
2963
2964 ret = client_flush_outgoing_queue(client, state);
2965 if (ret) {
2966 goto end;
2967 }
2968end:
2969 return ret;
2970}
2971
2972static
e8360425
JD
2973bool evaluate_buffer_usage_condition(const struct lttng_condition *condition,
2974 const struct channel_state_sample *sample,
2975 uint64_t buffer_capacity)
ab0ee2ca
JG
2976{
2977 bool result = false;
2978 uint64_t threshold;
2979 enum lttng_condition_type condition_type;
e8360425 2980 const struct lttng_condition_buffer_usage *use_condition = container_of(
ab0ee2ca
JG
2981 condition, struct lttng_condition_buffer_usage,
2982 parent);
2983
ab0ee2ca
JG
2984 if (use_condition->threshold_bytes.set) {
2985 threshold = use_condition->threshold_bytes.value;
2986 } else {
2987 /*
2988 * Threshold was expressed as a ratio.
2989 *
2990 * TODO the threshold (in bytes) of conditions expressed
2991 * as a ratio of total buffer size could be cached to
2992 * forego this double-multiplication or it could be performed
2993 * as fixed-point math.
2994 *
d49487dc 2995 * Note that caching should accommodates the case where the
ab0ee2ca
JG
2996 * condition applies to multiple channels (i.e. don't assume
2997 * that all channels matching my_chann* have the same size...)
2998 */
2999 threshold = (uint64_t) (use_condition->threshold_ratio.value *
3000 (double) buffer_capacity);
3001 }
3002
3003 condition_type = lttng_condition_get_type(condition);
3004 if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
3005 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
3006 threshold, sample->highest_usage);
3007
3008 /*
3009 * The low condition should only be triggered once _all_ of the
3010 * streams in a channel have gone below the "low" threshold.
3011 */
3012 if (sample->highest_usage <= threshold) {
3013 result = true;
3014 }
3015 } else {
3016 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
3017 threshold, sample->highest_usage);
3018
3019 /*
3020 * For high buffer usage scenarios, we want to trigger whenever
3021 * _any_ of the streams has reached the "high" threshold.
3022 */
3023 if (sample->highest_usage >= threshold) {
3024 result = true;
3025 }
3026 }
e8360425 3027
ab0ee2ca
JG
3028 return result;
3029}
3030
3031static
e8360425
JD
3032bool evaluate_session_consumed_size_condition(
3033 const struct lttng_condition *condition,
3034 uint64_t session_consumed_size)
3035{
3036 uint64_t threshold;
3037 const struct lttng_condition_session_consumed_size *size_condition =
3038 container_of(condition,
3039 struct lttng_condition_session_consumed_size,
3040 parent);
3041
3042 threshold = size_condition->consumed_threshold_bytes.value;
3043 DBG("[notification-thread] Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
3044 threshold, session_consumed_size);
3045 return session_consumed_size >= threshold;
3046}
3047
3048static
ed327204 3049int evaluate_buffer_condition(const struct lttng_condition *condition,
ab0ee2ca 3050 struct lttng_evaluation **evaluation,
e8360425
JD
3051 const struct notification_thread_state *state,
3052 const struct channel_state_sample *previous_sample,
3053 const struct channel_state_sample *latest_sample,
3054 uint64_t previous_session_consumed_total,
3055 uint64_t latest_session_consumed_total,
3056 struct channel_info *channel_info)
ab0ee2ca
JG
3057{
3058 int ret = 0;
3059 enum lttng_condition_type condition_type;
e8360425
JD
3060 const bool previous_sample_available = !!previous_sample;
3061 bool previous_sample_result = false;
ab0ee2ca
JG
3062 bool latest_sample_result;
3063
3064 condition_type = lttng_condition_get_type(condition);
ab0ee2ca 3065
e8360425
JD
3066 switch (condition_type) {
3067 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
3068 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
3069 if (caa_likely(previous_sample_available)) {
3070 previous_sample_result =
3071 evaluate_buffer_usage_condition(condition,
3072 previous_sample, channel_info->capacity);
3073 }
3074 latest_sample_result = evaluate_buffer_usage_condition(
3075 condition, latest_sample,
3076 channel_info->capacity);
3077 break;
3078 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
3079 if (caa_likely(previous_sample_available)) {
3080 previous_sample_result =
3081 evaluate_session_consumed_size_condition(
3082 condition,
3083 previous_session_consumed_total);
3084 }
3085 latest_sample_result =
3086 evaluate_session_consumed_size_condition(
3087 condition,
3088 latest_session_consumed_total);
3089 break;
3090 default:
3091 /* Unknown condition type; internal error. */
3092 abort();
3093 }
ab0ee2ca
JG
3094
3095 if (!latest_sample_result ||
3096 (previous_sample_result == latest_sample_result)) {
3097 /*
3098 * Only trigger on a condition evaluation transition.
3099 *
3100 * NOTE: This edge-triggered logic may not be appropriate for
3101 * future condition types.
3102 */
3103 goto end;
3104 }
3105
e8360425
JD
3106 if (!evaluation || !latest_sample_result) {
3107 goto end;
3108 }
3109
3110 switch (condition_type) {
3111 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
3112 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
ab0ee2ca
JG
3113 *evaluation = lttng_evaluation_buffer_usage_create(
3114 condition_type,
3115 latest_sample->highest_usage,
e8360425
JD
3116 channel_info->capacity);
3117 break;
3118 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
3119 *evaluation = lttng_evaluation_session_consumed_size_create(
e8360425
JD
3120 latest_session_consumed_total);
3121 break;
3122 default:
3123 abort();
3124 }
3125
3126 if (!*evaluation) {
3127 ret = -1;
3128 goto end;
ab0ee2ca
JG
3129 }
3130end:
3131 return ret;
3132}
3133
3134static
adc93634 3135int client_enqueue_dropped_notification(struct notification_client *client)
ab0ee2ca
JG
3136{
3137 int ret;
3138 struct lttng_notification_channel_message msg = {
3139 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
3140 .size = 0,
3141 };
3142
3143 ret = lttng_dynamic_buffer_append(
3144 &client->communication.outbound.buffer, &msg,
3145 sizeof(msg));
3146 return ret;
3147}
3148
f37d0f86
JG
3149/*
3150 * Permission checks relative to notification channel clients are performed
3151 * here. Notice how object, client, and trigger credentials are involved in
3152 * this check.
3153 *
3154 * The `object` credentials are the credentials associated with the "subject"
3155 * of a condition. For instance, a `rotation completed` condition applies
3156 * to a session. When that condition is met, it will produce an evaluation
3157 * against a session. Hence, in this case, the `object` credentials are the
3158 * credentials of the "subject" session.
3159 *
3160 * The `trigger` credentials are the credentials of the user that registered the
3161 * trigger.
3162 *
3163 * The `client` credentials are the credentials of the user that created a given
3164 * notification channel.
3165 *
3166 * In terms of visibility, it is expected that non-privilieged users can only
3167 * register triggers against "their" objects (their own sessions and
3168 * applications they are allowed to interact with). They can then open a
3169 * notification channel and subscribe to notifications associated with those
3170 * triggers.
3171 *
3172 * As for privilieged users, they can register triggers against the objects of
3173 * other users. They can then subscribe to the notifications associated to their
3174 * triggers. Privilieged users _can't_ subscribe to the notifications of
3175 * triggers owned by other users; they must create their own triggers.
3176 *
3177 * This is more a concern of usability than security. It would be difficult for
3178 * a root user reliably subscribe to a specific set of conditions without
3179 * interference from external users (those could, for instance, unregister
3180 * their triggers).
3181 */
ab0ee2ca 3182static
9b63a4aa
JG
3183int send_evaluation_to_clients(const struct lttng_trigger *trigger,
3184 const struct lttng_evaluation *evaluation,
ab0ee2ca
JG
3185 struct notification_client_list* client_list,
3186 struct notification_thread_state *state,
90843f49 3187 uid_t object_uid, gid_t object_gid)
ab0ee2ca
JG
3188{
3189 int ret = 0;
c0a66c84 3190 struct lttng_payload msg_payload;
ab0ee2ca 3191 struct notification_client_list_element *client_list_element, *tmp;
9b63a4aa
JG
3192 const struct lttng_notification notification = {
3193 .condition = (struct lttng_condition *) lttng_trigger_get_const_condition(trigger),
3194 .evaluation = (struct lttng_evaluation *) evaluation,
3195 };
3647288f
JG
3196 struct lttng_notification_channel_message msg_header = {
3197 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION,
3198 };
90843f49 3199 const struct lttng_credentials *trigger_creds = lttng_trigger_get_credentials(trigger);
ab0ee2ca 3200
c0a66c84 3201 lttng_payload_init(&msg_payload);
ab0ee2ca 3202
c0a66c84 3203 ret = lttng_dynamic_buffer_append(&msg_payload.buffer, &msg_header,
3647288f 3204 sizeof(msg_header));
ab0ee2ca
JG
3205 if (ret) {
3206 goto end;
3207 }
3208
c0a66c84 3209 ret = lttng_notification_serialize(&notification, &msg_payload);
ab0ee2ca 3210 if (ret) {
ab0ee2ca
JG
3211 ERR("[notification-thread] Failed to serialize notification");
3212 ret = -1;
3213 goto end;
3214 }
3215
3647288f 3216 /* Update payload size. */
c0a66c84
JG
3217 ((struct lttng_notification_channel_message * ) msg_payload.buffer.data)->size =
3218 (uint32_t) (msg_payload.buffer.size - sizeof(msg_header));
3647288f 3219
ab0ee2ca
JG
3220 cds_list_for_each_entry_safe(client_list_element, tmp,
3221 &client_list->list, node) {
3222 struct notification_client *client =
3223 client_list_element->client;
3224
90843f49 3225 if (client->uid != object_uid && client->gid != object_gid &&
ab0ee2ca
JG
3226 client->uid != 0) {
3227 /* Client is not allowed to monitor this channel. */
90843f49
JR
3228 DBG("[notification-thread] Skipping client at it does not have the object permission to receive notification for this trigger");
3229 continue;
3230 }
3231
3232 if (client->uid != trigger_creds->uid && client->gid != trigger_creds->gid) {
3233 DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this trigger");
ab0ee2ca
JG
3234 continue;
3235 }
3236
3237 DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
c0a66c84 3238 client->socket, msg_payload.buffer.size);
ab0ee2ca
JG
3239 if (client->communication.outbound.buffer.size) {
3240 /*
3241 * Outgoing data is already buffered for this client;
3242 * drop the notification and enqueue a "dropped
3243 * notification" message if this is the first dropped
3244 * notification since the socket spilled-over to the
3245 * queue.
3246 */
3247 DBG("[notification-thread] Dropping notification addressed to client (socket fd = %i)",
3248 client->socket);
3249 if (!client->communication.outbound.dropped_notification) {
3250 client->communication.outbound.dropped_notification = true;
3251 ret = client_enqueue_dropped_notification(
adc93634 3252 client);
ab0ee2ca
JG
3253 if (ret) {
3254 goto end;
3255 }
3256 }
3257 continue;
3258 }
3259
3260 ret = lttng_dynamic_buffer_append_buffer(
3261 &client->communication.outbound.buffer,
c0a66c84 3262 &msg_payload.buffer);
ab0ee2ca
JG
3263 if (ret) {
3264 goto end;
3265 }
3266
3267 ret = client_flush_outgoing_queue(client, state);
3268 if (ret) {
3269 goto end;
3270 }
3271 }
3272 ret = 0;
3273end:
c0a66c84 3274 lttng_payload_reset(&msg_payload);
ab0ee2ca
JG
3275 return ret;
3276}
3277
3278int handle_notification_thread_channel_sample(
3279 struct notification_thread_state *state, int pipe,
3280 enum lttng_domain_type domain)
3281{
3282 int ret = 0;
3283 struct lttcomm_consumer_channel_monitor_msg sample_msg;
ab0ee2ca
JG
3284 struct channel_info *channel_info;
3285 struct cds_lfht_node *node;
3286 struct cds_lfht_iter iter;
3287 struct lttng_channel_trigger_list *trigger_list;
3288 struct lttng_trigger_list_element *trigger_list_element;
3289 bool previous_sample_available = false;
e8360425
JD
3290 struct channel_state_sample previous_sample, latest_sample;
3291 uint64_t previous_session_consumed_total, latest_session_consumed_total;
ab0ee2ca
JG
3292
3293 /*
3294 * The monitoring pipe only holds messages smaller than PIPE_BUF,
3295 * ensuring that read/write of sampling messages are atomic.
3296 */
7c2551ef 3297 ret = lttng_read(pipe, &sample_msg, sizeof(sample_msg));
ab0ee2ca
JG
3298 if (ret != sizeof(sample_msg)) {
3299 ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
3300 pipe);
3301 ret = -1;
3302 goto end;
3303 }
3304
3305 ret = 0;
3306 latest_sample.key.key = sample_msg.key;
3307 latest_sample.key.domain = domain;
3308 latest_sample.highest_usage = sample_msg.highest;
3309 latest_sample.lowest_usage = sample_msg.lowest;
e8360425 3310 latest_sample.channel_total_consumed = sample_msg.total_consumed;
ab0ee2ca
JG
3311
3312 rcu_read_lock();
3313
3314 /* Retrieve the channel's informations */
3315 cds_lfht_lookup(state->channels_ht,
3316 hash_channel_key(&latest_sample.key),
3317 match_channel_info,
3318 &latest_sample.key,
3319 &iter);
3320 node = cds_lfht_iter_get_node(&iter);
e0b5f87b 3321 if (caa_unlikely(!node)) {
ab0ee2ca
JG
3322 /*
3323 * Not an error since the consumer can push a sample to the pipe
3324 * and the rest of the session daemon could notify us of the
3325 * channel's destruction before we get a chance to process that
3326 * sample.
3327 */
3328 DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
3329 latest_sample.key.key,
3330 domain == LTTNG_DOMAIN_KERNEL ? "kernel" :
3331 "user space");
3332 goto end_unlock;
3333 }
3334 channel_info = caa_container_of(node, struct channel_info,
3335 channels_ht_node);
e8360425 3336 DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64", total consumed = %" PRIu64")",
8abe313a 3337 channel_info->name,
ab0ee2ca 3338 latest_sample.key.key,
8abe313a 3339 channel_info->session_info->name,
ab0ee2ca 3340 latest_sample.highest_usage,
e8360425
JD
3341 latest_sample.lowest_usage,
3342 latest_sample.channel_total_consumed);
3343
3344 previous_session_consumed_total =
3345 channel_info->session_info->consumed_data_size;
ab0ee2ca
JG
3346
3347 /* Retrieve the channel's last sample, if it exists, and update it. */
3348 cds_lfht_lookup(state->channel_state_ht,
3349 hash_channel_key(&latest_sample.key),
3350 match_channel_state_sample,
3351 &latest_sample.key,
3352 &iter);
3353 node = cds_lfht_iter_get_node(&iter);
e0b5f87b 3354 if (caa_likely(node)) {
ab0ee2ca
JG
3355 struct channel_state_sample *stored_sample;
3356
3357 /* Update the sample stored. */
3358 stored_sample = caa_container_of(node,
3359 struct channel_state_sample,
3360 channel_state_ht_node);
e8360425 3361
ab0ee2ca
JG
3362 memcpy(&previous_sample, stored_sample,
3363 sizeof(previous_sample));
3364 stored_sample->highest_usage = latest_sample.highest_usage;
3365 stored_sample->lowest_usage = latest_sample.lowest_usage;
0f0479d7 3366 stored_sample->channel_total_consumed = latest_sample.channel_total_consumed;
ab0ee2ca 3367 previous_sample_available = true;
e8360425
JD
3368
3369 latest_session_consumed_total =
3370 previous_session_consumed_total +
3371 (latest_sample.channel_total_consumed - previous_sample.channel_total_consumed);
ab0ee2ca
JG
3372 } else {
3373 /*
3374 * This is the channel's first sample, allocate space for and
3375 * store the new sample.
3376 */
3377 struct channel_state_sample *stored_sample;
3378
3379 stored_sample = zmalloc(sizeof(*stored_sample));
3380 if (!stored_sample) {
3381 ret = -1;
3382 goto end_unlock;
3383 }
3384
3385 memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
3386 cds_lfht_node_init(&stored_sample->channel_state_ht_node);
3387 cds_lfht_add(state->channel_state_ht,
3388 hash_channel_key(&stored_sample->key),
3389 &stored_sample->channel_state_ht_node);
e8360425
JD
3390
3391 latest_session_consumed_total =
3392 previous_session_consumed_total +
3393 latest_sample.channel_total_consumed;
ab0ee2ca
JG
3394 }
3395
e8360425
JD
3396 channel_info->session_info->consumed_data_size =
3397 latest_session_consumed_total;
3398
ab0ee2ca
JG
3399 /* Find triggers associated with this channel. */
3400 cds_lfht_lookup(state->channel_triggers_ht,
3401 hash_channel_key(&latest_sample.key),
3402 match_channel_trigger_list,
3403 &latest_sample.key,
3404 &iter);
3405 node = cds_lfht_iter_get_node(&iter);
e0b5f87b 3406 if (caa_likely(!node)) {
ab0ee2ca
JG
3407 goto end_unlock;
3408 }
3409
3410 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
3411 channel_triggers_ht_node);
3412 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
e742b055 3413 node) {
9b63a4aa
JG
3414 const struct lttng_condition *condition;
3415 const struct lttng_action *action;
3416 const struct lttng_trigger *trigger;
ab0ee2ca
JG
3417 struct notification_client_list *client_list;
3418 struct lttng_evaluation *evaluation = NULL;
3419
3420 trigger = trigger_list_element->trigger;
9b63a4aa 3421 condition = lttng_trigger_get_const_condition(trigger);
ab0ee2ca 3422 assert(condition);
9b63a4aa 3423 action = lttng_trigger_get_const_action(trigger);
ab0ee2ca
JG
3424
3425 /* Notify actions are the only type currently supported. */
9b63a4aa 3426 assert(lttng_action_get_type_const(action) ==
ab0ee2ca
JG
3427 LTTNG_ACTION_TYPE_NOTIFY);
3428
3429 /*
3430 * Check if any client is subscribed to the result of this
3431 * evaluation.
3432 */
ed327204
JG
3433 client_list = get_client_list_from_condition(state, condition);
3434 assert(client_list);
ab0ee2ca
JG
3435 if (cds_list_empty(&client_list->list)) {
3436 /*
3437 * No clients interested in the evaluation's result,
3438 * skip it.
3439 */
3440 continue;
3441 }
3442
ed327204 3443 ret = evaluate_buffer_condition(condition, &evaluation, state,
ab0ee2ca 3444 previous_sample_available ? &previous_sample : NULL,
e8360425
JD
3445 &latest_sample,
3446 previous_session_consumed_total,
3447 latest_session_consumed_total,
3448 channel_info);
3449 if (caa_unlikely(ret)) {
ab0ee2ca
JG
3450 goto end_unlock;
3451 }
3452
e8360425 3453 if (caa_likely(!evaluation)) {
ab0ee2ca
JG
3454 continue;
3455 }
3456
3457 /* Dispatch evaluation result to all clients. */
3458 ret = send_evaluation_to_clients(trigger_list_element->trigger,
3459 evaluation, client_list, state,
8abe313a
JG
3460 channel_info->session_info->uid,
3461 channel_info->session_info->gid);
3462 lttng_evaluation_destroy(evaluation);
e0b5f87b 3463 if (caa_unlikely(ret)) {
ab0ee2ca
JG
3464 goto end_unlock;
3465 }
3466 }
3467end_unlock:
3468 rcu_read_unlock();
3469end:
3470 return ret;
3471}
This page took 0.206971 seconds and 5 git commands to generate.