SoW-2019-0002: Dynamic Snapshot
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.c
CommitLineData
ab0ee2ca 1/*
ab5be9fa 2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
ab0ee2ca 3 *
ab5be9fa 4 * SPDX-License-Identifier: GPL-2.0-only
ab0ee2ca 5 *
ab0ee2ca
JG
6 */
7
8#define _LGPL_SOURCE
9#include <urcu.h>
10#include <urcu/rculfhash.h>
11
ab0ee2ca
JG
12#include <common/defaults.h>
13#include <common/error.h>
14#include <common/futex.h>
15#include <common/unix.h>
16#include <common/dynamic-buffer.h>
17#include <common/hashtable/utils.h>
18#include <common/sessiond-comm/sessiond-comm.h>
19#include <common/macros.h>
20#include <lttng/condition/condition.h>
9b63a4aa 21#include <lttng/action/action-internal.h>
1831ae68 22#include <lttng/action/group-internal.h>
ab0ee2ca
JG
23#include <lttng/notification/notification-internal.h>
24#include <lttng/condition/condition-internal.h>
25#include <lttng/condition/buffer-usage-internal.h>
e8360425 26#include <lttng/condition/session-consumed-size-internal.h>
ea9a44f0 27#include <lttng/condition/session-rotation-internal.h>
1831ae68 28#include <lttng/condition/event-rule-internal.h>
ab0ee2ca 29#include <lttng/notification/channel-internal.h>
1831ae68 30#include <lttng/trigger/trigger-internal.h>
1da26331 31
ab0ee2ca
JG
32#include <time.h>
33#include <unistd.h>
34#include <assert.h>
35#include <inttypes.h>
d53ea4e4 36#include <fcntl.h>
ab0ee2ca 37
1da26331
JG
38#include "notification-thread.h"
39#include "notification-thread-events.h"
40#include "notification-thread-commands.h"
41#include "lttng-sessiond.h"
42#include "kernel.h"
43
ab0ee2ca
JG
/* Poll events watched on a client socket: input, error and hang-up. */
#define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
/* Same events as CLIENT_POLL_MASK_IN, plus LPOLLOUT. */
#define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
46
51eab943
JG
/*
 * Kind of object a condition is bound to (see
 * get_condition_binding_object()).
 */
enum lttng_object_type {
	LTTNG_OBJECT_TYPE_UNKNOWN,
	LTTNG_OBJECT_TYPE_NONE,
	LTTNG_OBJECT_TYPE_CHANNEL,
	LTTNG_OBJECT_TYPE_SESSION,
};
53
ab0ee2ca 54struct lttng_trigger_list_element {
9e0bb80e 55 /* No ownership of the trigger object is assumed. */
1831ae68 56 struct lttng_trigger *trigger;
ab0ee2ca
JG
57 struct cds_list_head node;
58};
59
60struct lttng_channel_trigger_list {
61 struct channel_key channel_key;
ea9a44f0 62 /* List of struct lttng_trigger_list_element. */
ab0ee2ca 63 struct cds_list_head list;
ea9a44f0 64 /* Node in the channel_triggers_ht */
ab0ee2ca 65 struct cds_lfht_node channel_triggers_ht_node;
83b934ad
MD
66 /* call_rcu delayed reclaim. */
67 struct rcu_head rcu_node;
ab0ee2ca
JG
68};
69
ea9a44f0
JG
70/*
71 * List of triggers applying to a given session.
72 *
73 * See:
74 * - lttng_session_trigger_list_create()
75 * - lttng_session_trigger_list_build()
76 * - lttng_session_trigger_list_destroy()
77 * - lttng_session_trigger_list_add()
78 */
79struct lttng_session_trigger_list {
80 /*
81 * Not owned by this; points to the session_info structure's
82 * session name.
83 */
84 const char *session_name;
85 /* List of struct lttng_trigger_list_element. */
86 struct cds_list_head list;
87 /* Node in the session_triggers_ht */
88 struct cds_lfht_node session_triggers_ht_node;
89 /*
90 * Weak reference to the notification system's session triggers
91 * hashtable.
92 *
93 * The session trigger list structure structure is owned by
94 * the session's session_info.
95 *
96 * The session_info is kept alive the the channel_infos holding a
97 * reference to it (reference counting). When those channels are
98 * destroyed (at runtime or on teardown), the reference they hold
99 * to the session_info are released. On destruction of session_info,
100 * session_info_destroy() will remove the list of triggers applying
101 * to this session from the notification system's state.
102 *
103 * This implies that the session_triggers_ht must be destroyed
104 * after the channels.
105 */
106 struct cds_lfht *session_triggers_ht;
107 /* Used for delayed RCU reclaim. */
108 struct rcu_head rcu_node;
109};
110
ab0ee2ca
JG
111struct lttng_trigger_ht_element {
112 struct lttng_trigger *trigger;
113 struct cds_lfht_node node;
1831ae68 114 struct cds_lfht_node node_by_name;
83b934ad
MD
115 /* call_rcu delayed reclaim. */
116 struct rcu_head rcu_node;
ab0ee2ca
JG
117};
118
119struct lttng_condition_list_element {
120 struct lttng_condition *condition;
121 struct cds_list_head node;
122};
123
1831ae68
FD
124/*
125 * Facilities to carry the different notifications type in the action processing
126 * code path.
127 */
128struct lttng_trigger_notification {
129 union {
130 struct lttng_ust_trigger_notification *ust;
131 uint64_t *kernel;
132 } u;
133 uint64_t id;
134 enum lttng_domain_type type;
ab0ee2ca
JG
135};
136
137struct channel_state_sample {
138 struct channel_key key;
139 struct cds_lfht_node channel_state_ht_node;
140 uint64_t highest_usage;
141 uint64_t lowest_usage;
e8360425 142 uint64_t channel_total_consumed;
83b934ad
MD
143 /* call_rcu delayed reclaim. */
144 struct rcu_head rcu_node;
ab0ee2ca
JG
145};
146
/* Forward declarations of helpers defined further down in this file. */
static unsigned long hash_channel_key(struct channel_key *key);
static int evaluate_buffer_condition(const struct lttng_condition *condition,
		struct lttng_evaluation **evaluation,
		const struct notification_thread_state *state,
		const struct channel_state_sample *previous_sample,
		const struct channel_state_sample *latest_sample,
		uint64_t previous_session_consumed_total,
		uint64_t latest_session_consumed_total,
		struct channel_info *channel_info);
static
int send_evaluation_to_clients(const struct lttng_trigger *trigger,
		const struct lttng_evaluation *evaluation,
		struct notification_client_list *client_list,
		struct notification_thread_state *state,
		uid_t channel_uid, gid_t channel_gid);


/* session_info API */
static
void session_info_destroy(void *_data);
static
void session_info_get(struct session_info *session_info);
static
void session_info_put(struct session_info *session_info);
static
struct session_info *session_info_create(const char *name,
		uid_t uid, gid_t gid,
		struct lttng_session_trigger_list *trigger_list,
		struct cds_lfht *sessions_ht);
static
void session_info_add_channel(struct session_info *session_info,
		struct channel_info *channel_info);
static
void session_info_remove_channel(struct session_info *session_info,
		struct channel_info *channel_info);

/* lttng_session_trigger_list API */
static
struct lttng_session_trigger_list *lttng_session_trigger_list_create(
		const char *session_name,
		struct cds_lfht *session_triggers_ht);
static
struct lttng_session_trigger_list *lttng_session_trigger_list_build(
		const struct notification_thread_state *state,
		const char *session_name);
static
void lttng_session_trigger_list_destroy(
		struct lttng_session_trigger_list *list);
static
int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
		struct lttng_trigger *trigger);
ea9a44f0
JG
198
199
ab0ee2ca 200static
1831ae68 201int match_client_socket(struct cds_lfht_node *node, const void *key)
ab0ee2ca
JG
202{
203 /* This double-cast is intended to supress pointer-to-cast warning. */
1831ae68
FD
204 const int socket = (int) (intptr_t) key;
205 const struct notification_client *client = caa_container_of(node,
206 struct notification_client, client_socket_ht_node);
ab0ee2ca 207
1831ae68
FD
208 return client->socket == socket;
209}
ab0ee2ca 210
1831ae68
FD
211static
212int match_client_id(struct cds_lfht_node *node, const void *key)
213{
214 /* This double-cast is intended to supress pointer-to-cast warning. */
215 const notification_client_id id = *((notification_client_id *) key);
216 const struct notification_client *client = caa_container_of(
217 node, struct notification_client, client_id_ht_node);
218
219 return client->id == id;
ab0ee2ca
JG
220}
221
222static
223int match_channel_trigger_list(struct cds_lfht_node *node, const void *key)
224{
225 struct channel_key *channel_key = (struct channel_key *) key;
226 struct lttng_channel_trigger_list *trigger_list;
227
228 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
229 channel_triggers_ht_node);
230
231 return !!((channel_key->key == trigger_list->channel_key.key) &&
232 (channel_key->domain == trigger_list->channel_key.domain));
233}
234
51eab943
JG
235static
236int match_session_trigger_list(struct cds_lfht_node *node, const void *key)
237{
238 const char *session_name = (const char *) key;
239 struct lttng_session_trigger_list *trigger_list;
240
241 trigger_list = caa_container_of(node, struct lttng_session_trigger_list,
242 session_triggers_ht_node);
243
244 return !!(strcmp(trigger_list->session_name, session_name) == 0);
245}
246
ab0ee2ca
JG
247static
248int match_channel_state_sample(struct cds_lfht_node *node, const void *key)
249{
250 struct channel_key *channel_key = (struct channel_key *) key;
251 struct channel_state_sample *sample;
252
253 sample = caa_container_of(node, struct channel_state_sample,
254 channel_state_ht_node);
255
256 return !!((channel_key->key == sample->key.key) &&
257 (channel_key->domain == sample->key.domain));
258}
259
260static
261int match_channel_info(struct cds_lfht_node *node, const void *key)
262{
263 struct channel_key *channel_key = (struct channel_key *) key;
264 struct channel_info *channel_info;
265
266 channel_info = caa_container_of(node, struct channel_info,
267 channels_ht_node);
268
269 return !!((channel_key->key == channel_info->key.key) &&
270 (channel_key->domain == channel_info->key.domain));
271}
272
273static
1831ae68 274int match_trigger(struct cds_lfht_node *node, const void *key)
ab0ee2ca 275{
1831ae68
FD
276 bool match = false;
277 struct lttng_trigger *trigger_key = (struct lttng_trigger *) key;
278 struct lttng_trigger_ht_element *trigger_ht_element;
279 const struct lttng_credentials *creds_key;
280 const struct lttng_credentials *creds_node;
ab0ee2ca 281
1831ae68 282 trigger_ht_element = caa_container_of(node, struct lttng_trigger_ht_element,
ab0ee2ca 283 node);
ab0ee2ca 284
1831ae68
FD
285 match = lttng_trigger_is_equal(trigger_key, trigger_ht_element->trigger);
286 if (!match) {
287 goto end;
288 }
289
290 /* Validate credential */
291 /* TODO: this could be moved to lttng_trigger_equal depending on how we
292 * handle root behaviour on disable and listing.
293 */
294 creds_key = lttng_trigger_get_credentials(trigger_key);
295 creds_node = lttng_trigger_get_credentials(trigger_ht_element->trigger);
296 match = lttng_credentials_is_equal(creds_key, creds_node);
297end:
298 return !!match;
299}
300
301static
302int match_trigger_token(struct cds_lfht_node *node, const void *key)
303{
304 const uint64_t *_key = key;
305 struct notification_trigger_tokens_ht_element *element;
306
307 element = caa_container_of(node, struct notification_trigger_tokens_ht_element,
308 node);
309 return *_key == element->token ;
ab0ee2ca
JG
310}
311
ab0ee2ca
JG
312static
313int match_client_list_condition(struct cds_lfht_node *node, const void *key)
314{
315 struct lttng_condition *condition_key = (struct lttng_condition *) key;
316 struct notification_client_list *client_list;
f82f93a1 317 const struct lttng_condition *condition;
ab0ee2ca
JG
318
319 assert(condition_key);
320
321 client_list = caa_container_of(node, struct notification_client_list,
1831ae68 322 notification_trigger_clients_ht_node);
f82f93a1 323 condition = lttng_trigger_get_const_condition(client_list->trigger);
ab0ee2ca
JG
324
325 return !!lttng_condition_is_equal(condition_key, condition);
326}
327
f82f93a1
JG
328static
329int match_session(struct cds_lfht_node *node, const void *key)
330{
331 const char *name = key;
332 struct session_info *session_info = caa_container_of(
333 node, struct session_info, sessions_ht_node);
334
335 return !strcmp(session_info->name, name);
336}
337
1831ae68
FD
338/*
339 * Match function for string node.
340 */
341static int match_str(struct cds_lfht_node *node, const void *key)
342{
343 struct lttng_trigger_ht_element *trigger_ht_element;
344 const char *name;
345
346 trigger_ht_element = caa_container_of(node, struct lttng_trigger_ht_element,
347 node_by_name);
348
349 /* TODO error checking */
350 lttng_trigger_get_name(trigger_ht_element->trigger, &name);
351
352 return hash_match_key_str(name, (void *) key);
353}
354
ab0ee2ca
JG
355static
356unsigned long lttng_condition_buffer_usage_hash(
9b63a4aa 357 const struct lttng_condition *_condition)
ab0ee2ca 358{
9a2746aa
JG
359 unsigned long hash;
360 unsigned long condition_type;
ab0ee2ca
JG
361 struct lttng_condition_buffer_usage *condition;
362
363 condition = container_of(_condition,
364 struct lttng_condition_buffer_usage, parent);
365
9a2746aa
JG
366 condition_type = (unsigned long) condition->parent.type;
367 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
ab0ee2ca
JG
368 if (condition->session_name) {
369 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
370 }
371 if (condition->channel_name) {
8f56701f 372 hash ^= hash_key_str(condition->channel_name, lttng_ht_seed);
ab0ee2ca
JG
373 }
374 if (condition->domain.set) {
375 hash ^= hash_key_ulong(
376 (void *) condition->domain.type,
377 lttng_ht_seed);
378 }
379 if (condition->threshold_ratio.set) {
380 uint64_t val;
381
382 val = condition->threshold_ratio.value * (double) UINT32_MAX;
383 hash ^= hash_key_u64(&val, lttng_ht_seed);
6633b0dd 384 } else if (condition->threshold_bytes.set) {
ab0ee2ca
JG
385 uint64_t val;
386
387 val = condition->threshold_bytes.value;
388 hash ^= hash_key_u64(&val, lttng_ht_seed);
389 }
390 return hash;
391}
392
e8360425
JD
393static
394unsigned long lttng_condition_session_consumed_size_hash(
9b63a4aa 395 const struct lttng_condition *_condition)
e8360425 396{
9a2746aa
JG
397 unsigned long hash;
398 unsigned long condition_type =
399 (unsigned long) LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE;
e8360425
JD
400 struct lttng_condition_session_consumed_size *condition;
401 uint64_t val;
402
403 condition = container_of(_condition,
404 struct lttng_condition_session_consumed_size, parent);
405
9a2746aa 406 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
e8360425
JD
407 if (condition->session_name) {
408 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
409 }
410 val = condition->consumed_threshold_bytes.value;
411 hash ^= hash_key_u64(&val, lttng_ht_seed);
412 return hash;
413}
414
a8393880
JG
415static
416unsigned long lttng_condition_session_rotation_hash(
417 const struct lttng_condition *_condition)
418{
419 unsigned long hash, condition_type;
420 struct lttng_condition_session_rotation *condition;
421
422 condition = container_of(_condition,
423 struct lttng_condition_session_rotation, parent);
424 condition_type = (unsigned long) condition->parent.type;
425 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
426 assert(condition->session_name);
427 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
428 return hash;
429}
9a2746aa 430
1831ae68
FD
431static
432unsigned long lttng_condition_event_rule_hash(
433 const struct lttng_condition *_condition)
434{
435 unsigned long hash, condition_type;
436 struct lttng_condition_event_rule *condition;
437
438 condition = container_of(_condition,
439 struct lttng_condition_event_rule, parent);
440 condition_type = (unsigned long) condition->parent.type;
441 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
442
443 /* TODO: further hasg using the event rule? on pattern maybe?*/
444 return hash;
445}
446
ab0ee2ca
JG
447/*
448 * The lttng_condition hashing code is kept in this file (rather than
449 * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
450 * don't want to link in liblttng-ctl.
451 */
452static
9b63a4aa 453unsigned long lttng_condition_hash(const struct lttng_condition *condition)
ab0ee2ca
JG
454{
455 switch (condition->type) {
456 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
457 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
458 return lttng_condition_buffer_usage_hash(condition);
e8360425
JD
459 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
460 return lttng_condition_session_consumed_size_hash(condition);
a8393880
JG
461 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
462 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
463 return lttng_condition_session_rotation_hash(condition);
1831ae68
FD
464 case LTTNG_CONDITION_TYPE_EVENT_RULE_HIT:
465 return lttng_condition_event_rule_hash(condition);
ab0ee2ca
JG
466 default:
467 ERR("[notification-thread] Unexpected condition type caught");
468 abort();
469 }
470}
471
e4db5ace
JR
472static
473unsigned long hash_channel_key(struct channel_key *key)
474{
475 unsigned long key_hash = hash_key_u64(&key->key, lttng_ht_seed);
476 unsigned long domain_hash = hash_key_ulong(
477 (void *) (unsigned long) key->domain, lttng_ht_seed);
478
479 return key_hash ^ domain_hash;
480}
481
1831ae68
FD
482static
483unsigned long hash_client_socket(int socket)
484{
485 return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed);
486}
487
488static
489unsigned long hash_client_id(notification_client_id id)
490{
491 return hash_key_u64(&id, lttng_ht_seed);
492}
493
f82f93a1
JG
494/*
495 * Get the type of object to which a given condition applies. Bindings let
496 * the notification system evaluate a trigger's condition when a given
497 * object's state is updated.
498 *
499 * For instance, a condition bound to a channel will be evaluated everytime
500 * the channel's state is changed by a channel monitoring sample.
501 */
502static
503enum lttng_object_type get_condition_binding_object(
504 const struct lttng_condition *condition)
505{
506 switch (lttng_condition_get_type(condition)) {
507 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
508 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
509 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
510 return LTTNG_OBJECT_TYPE_CHANNEL;
511 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
512 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
513 return LTTNG_OBJECT_TYPE_SESSION;
1831ae68
FD
514 case LTTNG_CONDITION_TYPE_EVENT_RULE_HIT:
515 return LTTNG_OBJECT_TYPE_NONE;
f82f93a1
JG
516 default:
517 return LTTNG_OBJECT_TYPE_UNKNOWN;
518 }
519}
520
83b934ad
MD
521static
522void free_channel_info_rcu(struct rcu_head *node)
523{
524 free(caa_container_of(node, struct channel_info, rcu_node));
525}
526
ab0ee2ca
JG
527static
528void channel_info_destroy(struct channel_info *channel_info)
529{
530 if (!channel_info) {
531 return;
532 }
533
8abe313a
JG
534 if (channel_info->session_info) {
535 session_info_remove_channel(channel_info->session_info,
536 channel_info);
537 session_info_put(channel_info->session_info);
ab0ee2ca 538 }
8abe313a
JG
539 if (channel_info->name) {
540 free(channel_info->name);
ab0ee2ca 541 }
83b934ad
MD
542 call_rcu(&channel_info->rcu_node, free_channel_info_rcu);
543}
544
545static
546void free_session_info_rcu(struct rcu_head *node)
547{
548 free(caa_container_of(node, struct session_info, rcu_node));
ab0ee2ca
JG
549}
550
8abe313a 551/* Don't call directly, use the ref-counting mechanism. */
ab0ee2ca 552static
8abe313a 553void session_info_destroy(void *_data)
ab0ee2ca 554{
8abe313a 555 struct session_info *session_info = _data;
3b68d0a3 556 int ret;
ab0ee2ca 557
8abe313a
JG
558 assert(session_info);
559 if (session_info->channel_infos_ht) {
3b68d0a3
JR
560 ret = cds_lfht_destroy(session_info->channel_infos_ht, NULL);
561 if (ret) {
4c3f302b 562 ERR("[notification-thread] Failed to destroy channel information hash table");
3b68d0a3 563 }
8abe313a 564 }
ea9a44f0 565 lttng_session_trigger_list_destroy(session_info->trigger_list);
1eee26c5
JG
566
567 rcu_read_lock();
568 cds_lfht_del(session_info->sessions_ht,
569 &session_info->sessions_ht_node);
570 rcu_read_unlock();
8abe313a 571 free(session_info->name);
83b934ad 572 call_rcu(&session_info->rcu_node, free_session_info_rcu);
8abe313a 573}
ab0ee2ca 574
8abe313a
JG
575static
576void session_info_get(struct session_info *session_info)
577{
578 if (!session_info) {
579 return;
580 }
581 lttng_ref_get(&session_info->ref);
582}
583
584static
585void session_info_put(struct session_info *session_info)
586{
587 if (!session_info) {
588 return;
ab0ee2ca 589 }
8abe313a
JG
590 lttng_ref_put(&session_info->ref);
591}
592
593static
ea9a44f0 594struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid,
1eee26c5
JG
595 struct lttng_session_trigger_list *trigger_list,
596 struct cds_lfht *sessions_ht)
8abe313a
JG
597{
598 struct session_info *session_info;
ab0ee2ca 599
8abe313a 600 assert(name);
ab0ee2ca 601
8abe313a
JG
602 session_info = zmalloc(sizeof(*session_info));
603 if (!session_info) {
604 goto end;
605 }
606 lttng_ref_init(&session_info->ref, session_info_destroy);
607
608 session_info->channel_infos_ht = cds_lfht_new(DEFAULT_HT_SIZE,
609 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
610 if (!session_info->channel_infos_ht) {
ab0ee2ca
JG
611 goto error;
612 }
8abe313a
JG
613
614 cds_lfht_node_init(&session_info->sessions_ht_node);
615 session_info->name = strdup(name);
616 if (!session_info->name) {
ab0ee2ca
JG
617 goto error;
618 }
8abe313a
JG
619 session_info->uid = uid;
620 session_info->gid = gid;
ea9a44f0 621 session_info->trigger_list = trigger_list;
1eee26c5 622 session_info->sessions_ht = sessions_ht;
8abe313a
JG
623end:
624 return session_info;
625error:
626 session_info_put(session_info);
627 return NULL;
628}
629
630static
631void session_info_add_channel(struct session_info *session_info,
632 struct channel_info *channel_info)
633{
634 rcu_read_lock();
635 cds_lfht_add(session_info->channel_infos_ht,
636 hash_channel_key(&channel_info->key),
637 &channel_info->session_info_channels_ht_node);
638 rcu_read_unlock();
639}
640
641static
642void session_info_remove_channel(struct session_info *session_info,
643 struct channel_info *channel_info)
644{
645 rcu_read_lock();
646 cds_lfht_del(session_info->channel_infos_ht,
647 &channel_info->session_info_channels_ht_node);
648 rcu_read_unlock();
649}
650
651static
652struct channel_info *channel_info_create(const char *channel_name,
653 struct channel_key *channel_key, uint64_t channel_capacity,
654 struct session_info *session_info)
655{
656 struct channel_info *channel_info = zmalloc(sizeof(*channel_info));
657
658 if (!channel_info) {
659 goto end;
660 }
661
ab0ee2ca 662 cds_lfht_node_init(&channel_info->channels_ht_node);
8abe313a
JG
663 cds_lfht_node_init(&channel_info->session_info_channels_ht_node);
664 memcpy(&channel_info->key, channel_key, sizeof(*channel_key));
665 channel_info->capacity = channel_capacity;
666
667 channel_info->name = strdup(channel_name);
668 if (!channel_info->name) {
669 goto error;
670 }
671
672 /*
673 * Set the references between session and channel infos:
674 * - channel_info holds a strong reference to session_info
675 * - session_info holds a weak reference to channel_info
676 */
677 session_info_get(session_info);
678 session_info_add_channel(session_info, channel_info);
679 channel_info->session_info = session_info;
ab0ee2ca 680end:
8abe313a 681 return channel_info;
ab0ee2ca 682error:
8abe313a 683 channel_info_destroy(channel_info);
ab0ee2ca
JG
684 return NULL;
685}
686
1831ae68
FD
687LTTNG_HIDDEN
688bool notification_client_list_get(struct notification_client_list *list)
689{
690 return urcu_ref_get_unless_zero(&list->ref);
691}
692
693static
694void free_notification_client_list_rcu(struct rcu_head *node)
695{
696 free(caa_container_of(node, struct notification_client_list,
697 rcu_node));
698}
699
700static
701void notification_client_list_release(struct urcu_ref *list_ref)
702{
703 struct notification_client_list *list =
704 container_of(list_ref, typeof(*list), ref);
705 struct notification_client_list_element *client_list_element, *tmp;
706
707 if (list->notification_trigger_clients_ht) {
708 rcu_read_lock();
709 cds_lfht_del(list->notification_trigger_clients_ht,
710 &list->notification_trigger_clients_ht_node);
711 rcu_read_unlock();
712 list->notification_trigger_clients_ht = NULL;
713 }
714 cds_list_for_each_entry_safe(client_list_element, tmp,
715 &list->list, node) {
716 free(client_list_element);
717 }
718 pthread_mutex_destroy(&list->lock);
719 call_rcu(&list->rcu_node, free_notification_client_list_rcu);
720}
721
722static
723struct notification_client_list *notification_client_list_create(
724 const struct lttng_trigger *trigger)
725{
726 struct notification_client_list *client_list =
727 zmalloc(sizeof(*client_list));
728
729 if (!client_list) {
730 goto error;
731 }
732 pthread_mutex_init(&client_list->lock, NULL);
733 urcu_ref_init(&client_list->ref);
734 cds_lfht_node_init(&client_list->notification_trigger_clients_ht_node);
735 CDS_INIT_LIST_HEAD(&client_list->list);
736 client_list->trigger = trigger;
737error:
738 return client_list;
739}
740
741static
742void publish_notification_client_list(
743 struct notification_thread_state *state,
744 struct notification_client_list *list)
745{
746 const struct lttng_condition *condition =
747 lttng_trigger_get_const_condition(list->trigger);
748
749 assert(!list->notification_trigger_clients_ht);
750
751 list->notification_trigger_clients_ht =
752 state->notification_trigger_clients_ht;
753
754 rcu_read_lock();
755 cds_lfht_add(state->notification_trigger_clients_ht,
756 lttng_condition_hash(condition),
757 &list->notification_trigger_clients_ht_node);
758 rcu_read_unlock();
759}
760
761LTTNG_HIDDEN
762void notification_client_list_put(struct notification_client_list *list)
763{
764 if (!list) {
765 return;
766 }
767 return urcu_ref_put(&list->ref, notification_client_list_release);
768}
769
770/* Provides a reference to the returned list. */
ed327204
JG
771static
772struct notification_client_list *get_client_list_from_condition(
773 struct notification_thread_state *state,
774 const struct lttng_condition *condition)
775{
776 struct cds_lfht_node *node;
777 struct cds_lfht_iter iter;
1831ae68 778 struct notification_client_list *list = NULL;
ed327204 779
1831ae68 780 rcu_read_lock();
ed327204
JG
781 cds_lfht_lookup(state->notification_trigger_clients_ht,
782 lttng_condition_hash(condition),
783 match_client_list_condition,
784 condition,
785 &iter);
786 node = cds_lfht_iter_get_node(&iter);
1831ae68
FD
787 if (node) {
788 list = container_of(node, struct notification_client_list,
789 notification_trigger_clients_ht_node);
790 list = notification_client_list_get(list) ? list : NULL;
791 }
792 rcu_read_unlock();
793 return list;
ed327204
JG
794}
795
e4db5ace 796static
f82f93a1
JG
797int evaluate_channel_condition_for_client(
798 const struct lttng_condition *condition,
799 struct notification_thread_state *state,
800 struct lttng_evaluation **evaluation,
801 uid_t *session_uid, gid_t *session_gid)
e4db5ace
JR
802{
803 int ret;
804 struct cds_lfht_iter iter;
805 struct cds_lfht_node *node;
806 struct channel_info *channel_info = NULL;
807 struct channel_key *channel_key = NULL;
808 struct channel_state_sample *last_sample = NULL;
809 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
e4db5ace 810
1831ae68
FD
811 rcu_read_lock();
812
f82f93a1 813 /* Find the channel associated with the condition. */
e4db5ace 814 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter,
f82f93a1 815 channel_trigger_list, channel_triggers_ht_node) {
e4db5ace
JR
816 struct lttng_trigger_list_element *element;
817
818 cds_list_for_each_entry(element, &channel_trigger_list->list, node) {
9b63a4aa 819 const struct lttng_condition *current_condition =
f82f93a1 820 lttng_trigger_get_const_condition(
e4db5ace
JR
821 element->trigger);
822
823 assert(current_condition);
824 if (!lttng_condition_is_equal(condition,
2ae99f0b 825 current_condition)) {
e4db5ace
JR
826 continue;
827 }
828
829 /* Found the trigger, save the channel key. */
830 channel_key = &channel_trigger_list->channel_key;
831 break;
832 }
833 if (channel_key) {
834 /* The channel key was found stop iteration. */
835 break;
836 }
837 }
838
839 if (!channel_key){
840 /* No channel found; normal exit. */
f82f93a1 841 DBG("[notification-thread] No known channel associated with newly subscribed-to condition");
e4db5ace
JR
842 ret = 0;
843 goto end;
844 }
845
846 /* Fetch channel info for the matching channel. */
847 cds_lfht_lookup(state->channels_ht,
848 hash_channel_key(channel_key),
849 match_channel_info,
850 channel_key,
851 &iter);
852 node = cds_lfht_iter_get_node(&iter);
853 assert(node);
854 channel_info = caa_container_of(node, struct channel_info,
855 channels_ht_node);
856
857 /* Retrieve the channel's last sample, if it exists. */
858 cds_lfht_lookup(state->channel_state_ht,
859 hash_channel_key(channel_key),
860 match_channel_state_sample,
861 channel_key,
862 &iter);
863 node = cds_lfht_iter_get_node(&iter);
864 if (node) {
865 last_sample = caa_container_of(node,
866 struct channel_state_sample,
867 channel_state_ht_node);
868 } else {
869 /* Nothing to evaluate, no sample was ever taken. Normal exit */
870 DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
871 ret = 0;
872 goto end;
873 }
874
f82f93a1 875 ret = evaluate_buffer_condition(condition, evaluation, state,
e8360425
JD
876 NULL, last_sample,
877 0, channel_info->session_info->consumed_data_size,
878 channel_info);
e4db5ace 879 if (ret) {
066a3c82 880 WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
e4db5ace
JR
881 goto end;
882 }
883
f82f93a1
JG
884 *session_uid = channel_info->session_info->uid;
885 *session_gid = channel_info->session_info->gid;
886end:
1831ae68 887 rcu_read_unlock();
f82f93a1
JG
888 return ret;
889}
890
891static
892const char *get_condition_session_name(const struct lttng_condition *condition)
893{
894 const char *session_name = NULL;
895 enum lttng_condition_status status;
896
897 switch (lttng_condition_get_type(condition)) {
898 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
899 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
900 status = lttng_condition_buffer_usage_get_session_name(
901 condition, &session_name);
902 break;
903 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
904 status = lttng_condition_session_consumed_size_get_session_name(
905 condition, &session_name);
906 break;
907 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
908 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
909 status = lttng_condition_session_rotation_get_session_name(
910 condition, &session_name);
911 break;
912 default:
913 abort();
914 }
915 if (status != LTTNG_CONDITION_STATUS_OK) {
916 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
917 goto end;
918 }
919end:
920 return session_name;
921}
922
f82f93a1
JG
923static
924int evaluate_session_condition_for_client(
925 const struct lttng_condition *condition,
926 struct notification_thread_state *state,
927 struct lttng_evaluation **evaluation,
928 uid_t *session_uid, gid_t *session_gid)
929{
930 int ret;
931 struct cds_lfht_iter iter;
932 struct cds_lfht_node *node;
933 const char *session_name;
934 struct session_info *session_info = NULL;
935
1831ae68 936 rcu_read_lock();
f82f93a1
JG
937 session_name = get_condition_session_name(condition);
938
939 /* Find the session associated with the trigger. */
940 cds_lfht_lookup(state->sessions_ht,
941 hash_key_str(session_name, lttng_ht_seed),
942 match_session,
943 session_name,
944 &iter);
945 node = cds_lfht_iter_get_node(&iter);
946 if (!node) {
947 DBG("[notification-thread] No known session matching name \"%s\"",
948 session_name);
949 ret = 0;
950 goto end;
951 }
952
953 session_info = caa_container_of(node, struct session_info,
954 sessions_ht_node);
955 session_info_get(session_info);
956
957 /*
958 * Evaluation is performed in-line here since only one type of
959 * session-bound condition is handled for the moment.
960 */
961 switch (lttng_condition_get_type(condition)) {
962 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
963 if (!session_info->rotation.ongoing) {
be558f88 964 ret = 0;
f82f93a1
JG
965 goto end_session_put;
966 }
967
968 *evaluation = lttng_evaluation_session_rotation_ongoing_create(
969 session_info->rotation.id);
970 if (!*evaluation) {
971 /* Fatal error. */
972 ERR("[notification-thread] Failed to create session rotation ongoing evaluation for session \"%s\"",
973 session_info->name);
974 ret = -1;
975 goto end_session_put;
976 }
977 ret = 0;
978 break;
979 default:
980 ret = 0;
981 goto end_session_put;
982 }
983
984 *session_uid = session_info->uid;
985 *session_gid = session_info->gid;
986
987end_session_put:
988 session_info_put(session_info);
989end:
1831ae68 990 rcu_read_unlock();
f82f93a1
JG
991 return ret;
992}
993
f82f93a1
JG
994static
995int evaluate_condition_for_client(const struct lttng_trigger *trigger,
996 const struct lttng_condition *condition,
997 struct notification_client *client,
998 struct notification_thread_state *state)
999{
1000 int ret;
1001 struct lttng_evaluation *evaluation = NULL;
1831ae68
FD
1002 struct notification_client_list client_list = {
1003 .lock = PTHREAD_MUTEX_INITIALIZER,
1004 };
f82f93a1
JG
1005 struct notification_client_list_element client_list_element = { 0 };
1006 uid_t object_uid = 0;
1007 gid_t object_gid = 0;
1008
1009 assert(trigger);
1010 assert(condition);
1011 assert(client);
1012 assert(state);
1013
1014 switch (get_condition_binding_object(condition)) {
1015 case LTTNG_OBJECT_TYPE_SESSION:
1016 ret = evaluate_session_condition_for_client(condition, state,
1017 &evaluation, &object_uid, &object_gid);
1018 break;
1019 case LTTNG_OBJECT_TYPE_CHANNEL:
1020 ret = evaluate_channel_condition_for_client(condition, state,
1021 &evaluation, &object_uid, &object_gid);
1022 break;
1023 case LTTNG_OBJECT_TYPE_NONE:
1831ae68 1024 DBG("[notification-thread] Newly subscribed-to condition not binded to object, nothing to evaluate");
f82f93a1
JG
1025 ret = 0;
1026 goto end;
1027 case LTTNG_OBJECT_TYPE_UNKNOWN:
1028 default:
1029 ret = -1;
1030 goto end;
1031 }
af0c318d
JG
1032 if (ret) {
1033 /* Fatal error. */
1034 goto end;
1035 }
e4db5ace
JR
1036 if (!evaluation) {
1037 /* Evaluation yielded nothing. Normal exit. */
1038 DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
1039 ret = 0;
1040 goto end;
1041 }
1042
1043 /*
1044 * Create a temporary client list with the client currently
1045 * subscribing.
1046 */
1831ae68 1047 cds_lfht_node_init(&client_list.notification_trigger_clients_ht_node);
e4db5ace
JR
1048 CDS_INIT_LIST_HEAD(&client_list.list);
1049 client_list.trigger = trigger;
1050
1051 CDS_INIT_LIST_HEAD(&client_list_element.node);
1052 client_list_element.client = client;
1053 cds_list_add(&client_list_element.node, &client_list.list);
1054
1055 /* Send evaluation result to the newly-subscribed client. */
1056 DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
1057 ret = send_evaluation_to_clients(trigger, evaluation, &client_list,
f82f93a1 1058 state, object_uid, object_gid);
e4db5ace
JR
1059
1060end:
1061 return ret;
1062}
1063
ab0ee2ca
JG
1064static
1065int notification_thread_client_subscribe(struct notification_client *client,
1066 struct lttng_condition *condition,
1067 struct notification_thread_state *state,
1068 enum lttng_notification_channel_status *_status)
1069{
1070 int ret = 0;
1831ae68 1071 struct notification_client_list *client_list = NULL;
ab0ee2ca
JG
1072 struct lttng_condition_list_element *condition_list_element = NULL;
1073 struct notification_client_list_element *client_list_element = NULL;
1074 enum lttng_notification_channel_status status =
1075 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1076
1077 /*
1078 * Ensure that the client has not already subscribed to this condition
1079 * before.
1080 */
1081 cds_list_for_each_entry(condition_list_element, &client->condition_list, node) {
1082 if (lttng_condition_is_equal(condition_list_element->condition,
1083 condition)) {
1084 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED;
1085 goto end;
1086 }
1087 }
1088
1089 condition_list_element = zmalloc(sizeof(*condition_list_element));
1090 if (!condition_list_element) {
1091 ret = -1;
1092 goto error;
1093 }
1094 client_list_element = zmalloc(sizeof(*client_list_element));
1095 if (!client_list_element) {
1096 ret = -1;
1097 goto error;
1098 }
1099
ab0ee2ca
JG
1100 /*
1101 * Add the newly-subscribed condition to the client's subscription list.
1102 */
1103 CDS_INIT_LIST_HEAD(&condition_list_element->node);
1104 condition_list_element->condition = condition;
1105 cds_list_add(&condition_list_element->node, &client->condition_list);
1106
ed327204
JG
1107 client_list = get_client_list_from_condition(state, condition);
1108 if (!client_list) {
2ae99f0b
JG
1109 /*
1110 * No notification-emiting trigger registered with this
1111 * condition. We don't evaluate the condition right away
1112 * since this trigger is not registered yet.
1113 */
4fb43b68 1114 free(client_list_element);
1831ae68 1115 goto end;
ab0ee2ca
JG
1116 }
1117
2ae99f0b
JG
1118 /*
1119 * The condition to which the client just subscribed is evaluated
1120 * at this point so that conditions that are already TRUE result
1121 * in a notification being sent out.
1831ae68
FD
1122 *
1123 * The client_list's trigger is used without locking the list itself.
1124 * This is correct since the list doesn't own the trigger and the
1125 * object is immutable.
2ae99f0b 1126 */
e4db5ace
JR
1127 if (evaluate_condition_for_client(client_list->trigger, condition,
1128 client, state)) {
1129 WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
1130 ret = -1;
4bd5b4af 1131 free(client_list_element);
1831ae68 1132 goto end;
e4db5ace
JR
1133 }
1134
1135 /*
1136 * Add the client to the list of clients interested in a given trigger
1137 * if a "notification" trigger with a corresponding condition was
1138 * added prior.
1139 */
ab0ee2ca
JG
1140 client_list_element->client = client;
1141 CDS_INIT_LIST_HEAD(&client_list_element->node);
1831ae68
FD
1142
1143 pthread_mutex_lock(&client_list->lock);
ab0ee2ca 1144 cds_list_add(&client_list_element->node, &client_list->list);
1831ae68 1145 pthread_mutex_unlock(&client_list->lock);
ab0ee2ca
JG
1146end:
1147 if (_status) {
1148 *_status = status;
1149 }
1831ae68
FD
1150 if (client_list) {
1151 notification_client_list_put(client_list);
1152 }
ab0ee2ca
JG
1153 return ret;
1154error:
1155 free(condition_list_element);
1156 free(client_list_element);
1157 return ret;
1158}
1159
1160static
1161int notification_thread_client_unsubscribe(
1162 struct notification_client *client,
1163 struct lttng_condition *condition,
1164 struct notification_thread_state *state,
1165 enum lttng_notification_channel_status *_status)
1166{
ab0ee2ca
JG
1167 struct notification_client_list *client_list;
1168 struct lttng_condition_list_element *condition_list_element,
1169 *condition_tmp;
1170 struct notification_client_list_element *client_list_element,
1171 *client_tmp;
1172 bool condition_found = false;
1173 enum lttng_notification_channel_status status =
1174 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1175
1176 /* Remove the condition from the client's condition list. */
1177 cds_list_for_each_entry_safe(condition_list_element, condition_tmp,
1178 &client->condition_list, node) {
1179 if (!lttng_condition_is_equal(condition_list_element->condition,
1180 condition)) {
1181 continue;
1182 }
1183
1184 cds_list_del(&condition_list_element->node);
1185 /*
1186 * The caller may be iterating on the client's conditions to
1187 * tear down a client's connection. In this case, the condition
1188 * will be destroyed at the end.
1189 */
1190 if (condition != condition_list_element->condition) {
1191 lttng_condition_destroy(
1192 condition_list_element->condition);
1193 }
1194 free(condition_list_element);
1195 condition_found = true;
1196 break;
1197 }
1198
1199 if (!condition_found) {
1200 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION;
1201 goto end;
1202 }
1203
1204 /*
1205 * Remove the client from the list of clients interested the trigger
1206 * matching the condition.
1207 */
ed327204
JG
1208 client_list = get_client_list_from_condition(state, condition);
1209 if (!client_list) {
1831ae68 1210 goto end;
ab0ee2ca
JG
1211 }
1212
1831ae68 1213 pthread_mutex_lock(&client_list->lock);
ab0ee2ca
JG
1214 cds_list_for_each_entry_safe(client_list_element, client_tmp,
1215 &client_list->list, node) {
1831ae68 1216 if (client_list_element->client->id != client->id) {
ab0ee2ca
JG
1217 continue;
1218 }
1219 cds_list_del(&client_list_element->node);
1220 free(client_list_element);
1221 break;
1222 }
1831ae68
FD
1223 pthread_mutex_unlock(&client_list->lock);
1224 notification_client_list_put(client_list);
1225 client_list = NULL;
ab0ee2ca
JG
1226end:
1227 lttng_condition_destroy(condition);
1228 if (_status) {
1229 *_status = status;
1230 }
1231 return 0;
1232}
1233
83b934ad
MD
1234static
1235void free_notification_client_rcu(struct rcu_head *node)
1236{
1237 free(caa_container_of(node, struct notification_client, rcu_node));
1238}
1239
ab0ee2ca
JG
1240static
1241void notification_client_destroy(struct notification_client *client,
1242 struct notification_thread_state *state)
1243{
ab0ee2ca
JG
1244 if (!client) {
1245 return;
1246 }
1247
1831ae68
FD
1248 /*
1249 * The client object is not reachable by other threads, no need to lock
1250 * the client here.
1251 */
ab0ee2ca
JG
1252 if (client->socket >= 0) {
1253 (void) lttcomm_close_unix_sock(client->socket);
1831ae68 1254 client->socket = -1;
ab0ee2ca 1255 }
1831ae68 1256 client->communication.active = false;
ab0ee2ca
JG
1257 lttng_dynamic_buffer_reset(&client->communication.inbound.buffer);
1258 lttng_dynamic_buffer_reset(&client->communication.outbound.buffer);
1831ae68 1259 pthread_mutex_destroy(&client->lock);
83b934ad 1260 call_rcu(&client->rcu_node, free_notification_client_rcu);
ab0ee2ca
JG
1261}
1262
1263/*
1264 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1265 * client pointer).
1266 */
1267static
1268struct notification_client *get_client_from_socket(int socket,
1269 struct notification_thread_state *state)
1270{
1271 struct cds_lfht_iter iter;
1272 struct cds_lfht_node *node;
1273 struct notification_client *client = NULL;
1274
1275 cds_lfht_lookup(state->client_socket_ht,
1831ae68
FD
1276 hash_client_socket(socket),
1277 match_client_socket,
ab0ee2ca
JG
1278 (void *) (unsigned long) socket,
1279 &iter);
1280 node = cds_lfht_iter_get_node(&iter);
1281 if (!node) {
1282 goto end;
1283 }
1284
1285 client = caa_container_of(node, struct notification_client,
1286 client_socket_ht_node);
1287end:
1288 return client;
1289}
1290
1831ae68
FD
1291/*
1292 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1293 * client pointer).
1294 */
1295static
1296struct notification_client *get_client_from_id(notification_client_id id,
1297 struct notification_thread_state *state)
1298{
1299 struct cds_lfht_iter iter;
1300 struct cds_lfht_node *node;
1301 struct notification_client *client = NULL;
1302
1303 cds_lfht_lookup(state->client_id_ht,
1304 hash_client_id(id),
1305 match_client_id,
1306 &id,
1307 &iter);
1308 node = cds_lfht_iter_get_node(&iter);
1309 if (!node) {
1310 goto end;
1311 }
1312
1313 client = caa_container_of(node, struct notification_client,
1314 client_id_ht_node);
1315end:
1316 return client;
1317}
1318
ab0ee2ca 1319static
e8360425 1320bool buffer_usage_condition_applies_to_channel(
51eab943
JG
1321 const struct lttng_condition *condition,
1322 const struct channel_info *channel_info)
ab0ee2ca
JG
1323{
1324 enum lttng_condition_status status;
e8360425
JD
1325 enum lttng_domain_type condition_domain;
1326 const char *condition_session_name = NULL;
1327 const char *condition_channel_name = NULL;
ab0ee2ca 1328
e8360425
JD
1329 status = lttng_condition_buffer_usage_get_domain_type(condition,
1330 &condition_domain);
1331 assert(status == LTTNG_CONDITION_STATUS_OK);
1332 if (channel_info->key.domain != condition_domain) {
ab0ee2ca
JG
1333 goto fail;
1334 }
1335
e8360425
JD
1336 status = lttng_condition_buffer_usage_get_session_name(
1337 condition, &condition_session_name);
1338 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1339
1340 status = lttng_condition_buffer_usage_get_channel_name(
1341 condition, &condition_channel_name);
1342 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_channel_name);
1343
1344 if (strcmp(channel_info->session_info->name, condition_session_name)) {
1345 goto fail;
1346 }
1347 if (strcmp(channel_info->name, condition_channel_name)) {
ab0ee2ca
JG
1348 goto fail;
1349 }
1350
e8360425
JD
1351 return true;
1352fail:
1353 return false;
1354}
1355
1356static
1357bool session_consumed_size_condition_applies_to_channel(
51eab943
JG
1358 const struct lttng_condition *condition,
1359 const struct channel_info *channel_info)
e8360425
JD
1360{
1361 enum lttng_condition_status status;
1362 const char *condition_session_name = NULL;
1363
1364 status = lttng_condition_session_consumed_size_get_session_name(
1365 condition, &condition_session_name);
1366 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1367
1368 if (strcmp(channel_info->session_info->name, condition_session_name)) {
ab0ee2ca
JG
1369 goto fail;
1370 }
1371
e8360425
JD
1372 return true;
1373fail:
1374 return false;
1375}
ab0ee2ca 1376
51eab943
JG
1377static
1378bool trigger_applies_to_channel(const struct lttng_trigger *trigger,
1379 const struct channel_info *channel_info)
1380{
1381 const struct lttng_condition *condition;
e8360425 1382 bool trigger_applies;
ab0ee2ca 1383
51eab943 1384 condition = lttng_trigger_get_const_condition(trigger);
e8360425 1385 if (!condition) {
ab0ee2ca
JG
1386 goto fail;
1387 }
e8360425
JD
1388
1389 switch (lttng_condition_get_type(condition)) {
1390 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1391 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1392 trigger_applies = buffer_usage_condition_applies_to_channel(
1393 condition, channel_info);
1394 break;
1395 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
1396 trigger_applies = session_consumed_size_condition_applies_to_channel(
1397 condition, channel_info);
1398 break;
1399 default:
ab0ee2ca
JG
1400 goto fail;
1401 }
1402
e8360425 1403 return trigger_applies;
ab0ee2ca
JG
1404fail:
1405 return false;
1406}
1407
1408static
1409bool trigger_applies_to_client(struct lttng_trigger *trigger,
1410 struct notification_client *client)
1411{
1412 bool applies = false;
1413 struct lttng_condition_list_element *condition_list_element;
1414
1415 cds_list_for_each_entry(condition_list_element, &client->condition_list,
1416 node) {
1417 applies = lttng_condition_is_equal(
1418 condition_list_element->condition,
1419 lttng_trigger_get_condition(trigger));
1420 if (applies) {
1421 break;
1422 }
1423 }
1424 return applies;
1425}
1426
ed327204
JG
1427/* Must be called with RCU read lock held. */
1428static
1429struct lttng_session_trigger_list *get_session_trigger_list(
1430 struct notification_thread_state *state,
1431 const char *session_name)
1432{
1433 struct lttng_session_trigger_list *list = NULL;
1434 struct cds_lfht_node *node;
1435 struct cds_lfht_iter iter;
1436
1437 cds_lfht_lookup(state->session_triggers_ht,
1438 hash_key_str(session_name, lttng_ht_seed),
1439 match_session_trigger_list,
1440 session_name,
1441 &iter);
1442 node = cds_lfht_iter_get_node(&iter);
1443 if (!node) {
1444 /*
1445 * Not an error, the list of triggers applying to that session
1446 * will be initialized when the session is created.
1447 */
1448 DBG("[notification-thread] No trigger list found for session \"%s\" as it is not yet known to the notification system",
1449 session_name);
1450 goto end;
1451 }
1452
1453 list = caa_container_of(node,
1454 struct lttng_session_trigger_list,
1455 session_triggers_ht_node);
1456end:
1457 return list;
1458}
1459
ea9a44f0
JG
1460/*
1461 * Allocate an empty lttng_session_trigger_list for the session named
1462 * 'session_name'.
1463 *
1464 * No ownership of 'session_name' is assumed by the session trigger list.
1465 * It is the caller's responsability to ensure the session name is alive
1466 * for as long as this list is.
1467 */
1468static
1469struct lttng_session_trigger_list *lttng_session_trigger_list_create(
1470 const char *session_name,
1471 struct cds_lfht *session_triggers_ht)
1472{
1473 struct lttng_session_trigger_list *list;
1474
1475 list = zmalloc(sizeof(*list));
1476 if (!list) {
1477 goto end;
1478 }
1479 list->session_name = session_name;
1480 CDS_INIT_LIST_HEAD(&list->list);
1481 cds_lfht_node_init(&list->session_triggers_ht_node);
1482 list->session_triggers_ht = session_triggers_ht;
1483
1484 rcu_read_lock();
1485 /* Publish the list through the session_triggers_ht. */
1486 cds_lfht_add(session_triggers_ht,
1487 hash_key_str(session_name, lttng_ht_seed),
1488 &list->session_triggers_ht_node);
1489 rcu_read_unlock();
1490end:
1491 return list;
1492}
1493
1494static
1495void free_session_trigger_list_rcu(struct rcu_head *node)
1496{
1497 free(caa_container_of(node, struct lttng_session_trigger_list,
1498 rcu_node));
1499}
1500
1501static
1502void lttng_session_trigger_list_destroy(struct lttng_session_trigger_list *list)
1503{
1504 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1505
1506 /* Empty the list element by element, and then free the list itself. */
1507 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1508 &list->list, node) {
1509 cds_list_del(&trigger_list_element->node);
1510 free(trigger_list_element);
1511 }
1512 rcu_read_lock();
1513 /* Unpublish the list from the session_triggers_ht. */
1514 cds_lfht_del(list->session_triggers_ht,
1515 &list->session_triggers_ht_node);
1516 rcu_read_unlock();
1517 call_rcu(&list->rcu_node, free_session_trigger_list_rcu);
1518}
1519
1520static
1521int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
1831ae68 1522 struct lttng_trigger *trigger)
ea9a44f0
JG
1523{
1524 int ret = 0;
1525 struct lttng_trigger_list_element *new_element =
1526 zmalloc(sizeof(*new_element));
1527
1528 if (!new_element) {
1529 ret = -1;
1530 goto end;
1531 }
1532 CDS_INIT_LIST_HEAD(&new_element->node);
1533 new_element->trigger = trigger;
1534 cds_list_add(&new_element->node, &list->list);
1535end:
1536 return ret;
1537}
1538
1539static
1540bool trigger_applies_to_session(const struct lttng_trigger *trigger,
1541 const char *session_name)
1542{
1543 bool applies = false;
1544 const struct lttng_condition *condition;
1545
1546 condition = lttng_trigger_get_const_condition(trigger);
1547 switch (lttng_condition_get_type(condition)) {
1548 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1549 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
1550 {
1551 enum lttng_condition_status condition_status;
1552 const char *condition_session_name;
1553
1554 condition_status = lttng_condition_session_rotation_get_session_name(
1555 condition, &condition_session_name);
1556 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
1557 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
1558 goto end;
1559 }
1560
1561 assert(condition_session_name);
1562 applies = !strcmp(condition_session_name, session_name);
1563 break;
1564 }
1565 default:
1566 goto end;
1567 }
1568end:
1569 return applies;
1570}
1571
1572/*
1573 * Allocate and initialize an lttng_session_trigger_list which contains
1574 * all triggers that apply to the session named 'session_name'.
1575 *
1576 * No ownership of 'session_name' is assumed by the session trigger list.
1577 * It is the caller's responsability to ensure the session name is alive
1578 * for as long as this list is.
1579 */
1580static
1581struct lttng_session_trigger_list *lttng_session_trigger_list_build(
1582 const struct notification_thread_state *state,
1583 const char *session_name)
1584{
1585 int trigger_count = 0;
1586 struct lttng_session_trigger_list *session_trigger_list = NULL;
1587 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1588 struct cds_lfht_iter iter;
1589
1590 session_trigger_list = lttng_session_trigger_list_create(session_name,
1591 state->session_triggers_ht);
1592
1593 /* Add all triggers applying to the session named 'session_name'. */
1594 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1595 node) {
1596 int ret;
1597
1598 if (!trigger_applies_to_session(trigger_ht_element->trigger,
1599 session_name)) {
1600 continue;
1601 }
1602
1603 ret = lttng_session_trigger_list_add(session_trigger_list,
1604 trigger_ht_element->trigger);
1605 if (ret) {
1606 goto error;
1607 }
1608
1609 trigger_count++;
1610 }
1611
1612 DBG("[notification-thread] Found %i triggers that apply to newly created session",
1613 trigger_count);
1614 return session_trigger_list;
1615error:
1616 lttng_session_trigger_list_destroy(session_trigger_list);
1617 return NULL;
1618}
1619
8abe313a
JG
1620static
1621struct session_info *find_or_create_session_info(
1622 struct notification_thread_state *state,
1623 const char *name, uid_t uid, gid_t gid)
1624{
1625 struct session_info *session = NULL;
1626 struct cds_lfht_node *node;
1627 struct cds_lfht_iter iter;
ea9a44f0 1628 struct lttng_session_trigger_list *trigger_list;
8abe313a
JG
1629
1630 rcu_read_lock();
1631 cds_lfht_lookup(state->sessions_ht,
1632 hash_key_str(name, lttng_ht_seed),
1633 match_session,
1634 name,
1635 &iter);
1636 node = cds_lfht_iter_get_node(&iter);
1637 if (node) {
1638 DBG("[notification-thread] Found session info of session \"%s\" (uid = %i, gid = %i)",
1639 name, uid, gid);
1640 session = caa_container_of(node, struct session_info,
1641 sessions_ht_node);
1642 assert(session->uid == uid);
1643 assert(session->gid == gid);
1eee26c5
JG
1644 session_info_get(session);
1645 goto end;
ea9a44f0
JG
1646 }
1647
1648 trigger_list = lttng_session_trigger_list_build(state, name);
1649 if (!trigger_list) {
1650 goto error;
8abe313a
JG
1651 }
1652
1eee26c5
JG
1653 session = session_info_create(name, uid, gid, trigger_list,
1654 state->sessions_ht);
8abe313a
JG
1655 if (!session) {
1656 ERR("[notification-thread] Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)",
1657 name, uid, gid);
b248644d 1658 lttng_session_trigger_list_destroy(trigger_list);
ea9a44f0 1659 goto error;
8abe313a 1660 }
ea9a44f0 1661 trigger_list = NULL;
577983cb
JG
1662
1663 cds_lfht_add(state->sessions_ht, hash_key_str(name, lttng_ht_seed),
9b63a4aa 1664 &session->sessions_ht_node);
1eee26c5 1665end:
8abe313a
JG
1666 rcu_read_unlock();
1667 return session;
ea9a44f0
JG
1668error:
1669 rcu_read_unlock();
1670 session_info_put(session);
1671 return NULL;
8abe313a
JG
1672}
1673
ab0ee2ca
JG
1674static
1675int handle_notification_thread_command_add_channel(
8abe313a
JG
1676 struct notification_thread_state *state,
1677 const char *session_name, uid_t session_uid, gid_t session_gid,
1678 const char *channel_name, enum lttng_domain_type channel_domain,
1679 uint64_t channel_key_int, uint64_t channel_capacity,
1680 enum lttng_error_code *cmd_result)
ab0ee2ca
JG
1681{
1682 struct cds_list_head trigger_list;
8abe313a
JG
1683 struct channel_info *new_channel_info = NULL;
1684 struct channel_key channel_key = {
1685 .key = channel_key_int,
1686 .domain = channel_domain,
1687 };
ab0ee2ca
JG
1688 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
1689 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1690 int trigger_count = 0;
1691 struct cds_lfht_iter iter;
8abe313a 1692 struct session_info *session_info = NULL;
ab0ee2ca
JG
1693
1694 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
8abe313a
JG
1695 channel_name, session_name, channel_key_int,
1696 channel_domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
ab0ee2ca
JG
1697
1698 CDS_INIT_LIST_HEAD(&trigger_list);
1699
8abe313a
JG
1700 session_info = find_or_create_session_info(state, session_name,
1701 session_uid, session_gid);
1702 if (!session_info) {
a9577b76 1703 /* Allocation error or an internal error occurred. */
ab0ee2ca
JG
1704 goto error;
1705 }
1706
8abe313a
JG
1707 new_channel_info = channel_info_create(channel_name, &channel_key,
1708 channel_capacity, session_info);
1709 if (!new_channel_info) {
1710 goto error;
1711 }
ab0ee2ca 1712
83b934ad 1713 rcu_read_lock();
ab0ee2ca
JG
1714 /* Build a list of all triggers applying to the new channel. */
1715 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1716 node) {
1717 struct lttng_trigger_list_element *new_element;
1718
1719 if (!trigger_applies_to_channel(trigger_ht_element->trigger,
8abe313a 1720 new_channel_info)) {
ab0ee2ca
JG
1721 continue;
1722 }
1723
1724 new_element = zmalloc(sizeof(*new_element));
1725 if (!new_element) {
83b934ad 1726 rcu_read_unlock();
ab0ee2ca
JG
1727 goto error;
1728 }
1729 CDS_INIT_LIST_HEAD(&new_element->node);
1730 new_element->trigger = trigger_ht_element->trigger;
1731 cds_list_add(&new_element->node, &trigger_list);
1732 trigger_count++;
1733 }
83b934ad 1734 rcu_read_unlock();
ab0ee2ca
JG
1735
1736 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
1737 trigger_count);
1738 channel_trigger_list = zmalloc(sizeof(*channel_trigger_list));
1739 if (!channel_trigger_list) {
1740 goto error;
1741 }
8abe313a 1742 channel_trigger_list->channel_key = new_channel_info->key;
ab0ee2ca
JG
1743 CDS_INIT_LIST_HEAD(&channel_trigger_list->list);
1744 cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
1745 cds_list_splice(&trigger_list, &channel_trigger_list->list);
1746
1747 rcu_read_lock();
1748 /* Add channel to the channel_ht which owns the channel_infos. */
1749 cds_lfht_add(state->channels_ht,
8abe313a 1750 hash_channel_key(&new_channel_info->key),
ab0ee2ca
JG
1751 &new_channel_info->channels_ht_node);
1752 /*
1753 * Add the list of triggers associated with this channel to the
1754 * channel_triggers_ht.
1755 */
1756 cds_lfht_add(state->channel_triggers_ht,
8abe313a 1757 hash_channel_key(&new_channel_info->key),
ab0ee2ca
JG
1758 &channel_trigger_list->channel_triggers_ht_node);
1759 rcu_read_unlock();
1eee26c5 1760 session_info_put(session_info);
ab0ee2ca
JG
1761 *cmd_result = LTTNG_OK;
1762 return 0;
1763error:
ab0ee2ca 1764 channel_info_destroy(new_channel_info);
8abe313a 1765 session_info_put(session_info);
ab0ee2ca
JG
1766 return 1;
1767}
1768
83b934ad
MD
1769static
1770void free_channel_trigger_list_rcu(struct rcu_head *node)
1771{
1772 free(caa_container_of(node, struct lttng_channel_trigger_list,
1773 rcu_node));
1774}
1775
1776static
1777void free_channel_state_sample_rcu(struct rcu_head *node)
1778{
1779 free(caa_container_of(node, struct channel_state_sample,
1780 rcu_node));
1781}
1782
ab0ee2ca
JG
1783static
1784int handle_notification_thread_command_remove_channel(
1785 struct notification_thread_state *state,
1786 uint64_t channel_key, enum lttng_domain_type domain,
1787 enum lttng_error_code *cmd_result)
1788{
1789 struct cds_lfht_node *node;
1790 struct cds_lfht_iter iter;
1791 struct lttng_channel_trigger_list *trigger_list;
1792 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1793 struct channel_key key = { .key = channel_key, .domain = domain };
1794 struct channel_info *channel_info;
1795
1796 DBG("[notification-thread] Removing channel key = %" PRIu64 " in %s domain",
1797 channel_key, domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
1798
1799 rcu_read_lock();
1800
1801 cds_lfht_lookup(state->channel_triggers_ht,
1802 hash_channel_key(&key),
1803 match_channel_trigger_list,
1804 &key,
1805 &iter);
1806 node = cds_lfht_iter_get_node(&iter);
1807 /*
1808 * There is a severe internal error if we are being asked to remove a
1809 * channel that doesn't exist.
1810 */
1811 if (!node) {
1812 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
1813 goto end;
1814 }
1815
1816 /* Free the list of triggers associated with this channel. */
1817 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
1818 channel_triggers_ht_node);
1819 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1820 &trigger_list->list, node) {
1821 cds_list_del(&trigger_list_element->node);
1822 free(trigger_list_element);
1823 }
1824 cds_lfht_del(state->channel_triggers_ht, node);
83b934ad 1825 call_rcu(&trigger_list->rcu_node, free_channel_trigger_list_rcu);
ab0ee2ca
JG
1826
1827 /* Free sampled channel state. */
1828 cds_lfht_lookup(state->channel_state_ht,
1829 hash_channel_key(&key),
1830 match_channel_state_sample,
1831 &key,
1832 &iter);
1833 node = cds_lfht_iter_get_node(&iter);
1834 /*
1835 * This is expected to be NULL if the channel is destroyed before we
1836 * received a sample.
1837 */
1838 if (node) {
1839 struct channel_state_sample *sample = caa_container_of(node,
1840 struct channel_state_sample,
1841 channel_state_ht_node);
1842
1843 cds_lfht_del(state->channel_state_ht, node);
83b934ad 1844 call_rcu(&sample->rcu_node, free_channel_state_sample_rcu);
ab0ee2ca
JG
1845 }
1846
1847 /* Remove the channel from the channels_ht and free it. */
1848 cds_lfht_lookup(state->channels_ht,
1849 hash_channel_key(&key),
1850 match_channel_info,
1851 &key,
1852 &iter);
1853 node = cds_lfht_iter_get_node(&iter);
1854 assert(node);
1855 channel_info = caa_container_of(node, struct channel_info,
1856 channels_ht_node);
1857 cds_lfht_del(state->channels_ht, node);
1858 channel_info_destroy(channel_info);
1859end:
1860 rcu_read_unlock();
1861 *cmd_result = LTTNG_OK;
1862 return 0;
1863}
1864
0ca52944 1865static
ed327204 1866int handle_notification_thread_command_session_rotation(
0ca52944 1867 struct notification_thread_state *state,
ed327204
JG
1868 enum notification_thread_command_type cmd_type,
1869 const char *session_name, uid_t session_uid, gid_t session_gid,
1870 uint64_t trace_archive_chunk_id,
1871 struct lttng_trace_archive_location *location,
1872 enum lttng_error_code *_cmd_result)
0ca52944 1873{
ed327204
JG
1874 int ret = 0;
1875 enum lttng_error_code cmd_result = LTTNG_OK;
1876 struct lttng_session_trigger_list *trigger_list;
1877 struct lttng_trigger_list_element *trigger_list_element;
1878 struct session_info *session_info;
0ca52944 1879
ed327204
JG
1880 rcu_read_lock();
1881
1882 session_info = find_or_create_session_info(state, session_name,
1883 session_uid, session_gid);
1884 if (!session_info) {
a9577b76 1885 /* Allocation error or an internal error occurred. */
ed327204
JG
1886 ret = -1;
1887 cmd_result = LTTNG_ERR_NOMEM;
1888 goto end;
1889 }
1890
1891 session_info->rotation.ongoing =
1892 cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING;
1893 session_info->rotation.id = trace_archive_chunk_id;
1894 trigger_list = get_session_trigger_list(state, session_name);
1895 if (!trigger_list) {
1896 DBG("[notification-thread] No triggers applying to session \"%s\" found",
1897 session_name);
1898 goto end;
1899 }
1900
1901 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
1902 node) {
1903 const struct lttng_condition *condition;
1904 const struct lttng_action *action;
1905 const struct lttng_trigger *trigger;
1906 struct notification_client_list *client_list;
1907 struct lttng_evaluation *evaluation = NULL;
1908 enum lttng_condition_type condition_type;
1831ae68 1909 bool client_list_is_empty;
ed327204
JG
1910
1911 trigger = trigger_list_element->trigger;
1912 condition = lttng_trigger_get_const_condition(trigger);
1913 assert(condition);
1914 condition_type = lttng_condition_get_type(condition);
1915
1916 if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING &&
1917 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1918 continue;
1919 } else if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED &&
1920 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED) {
1921 continue;
1922 }
1923
1924 action = lttng_trigger_get_const_action(trigger);
1925
1926 /* Notify actions are the only type currently supported. */
1927 assert(lttng_action_get_type_const(action) ==
1928 LTTNG_ACTION_TYPE_NOTIFY);
1929
1930 client_list = get_client_list_from_condition(state, condition);
1931 assert(client_list);
1932
1831ae68
FD
1933 pthread_mutex_lock(&client_list->lock);
1934 client_list_is_empty = cds_list_empty(&client_list->list);
1935 pthread_mutex_unlock(&client_list->lock);
1936 if (client_list_is_empty) {
ed327204
JG
1937 /*
1938 * No clients interested in the evaluation's result,
1939 * skip it.
1940 */
1941 continue;
1942 }
1943
1944 if (cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1945 evaluation = lttng_evaluation_session_rotation_ongoing_create(
1946 trace_archive_chunk_id);
1947 } else {
1948 evaluation = lttng_evaluation_session_rotation_completed_create(
1949 trace_archive_chunk_id, location);
1950 }
1951
1952 if (!evaluation) {
1953 /* Internal error */
1954 ret = -1;
1955 cmd_result = LTTNG_ERR_UNK;
1831ae68 1956 goto put_list;
ed327204
JG
1957 }
1958
1959 /* Dispatch evaluation result to all clients. */
1960 ret = send_evaluation_to_clients(trigger_list_element->trigger,
1961 evaluation, client_list, state,
1962 session_info->uid,
1963 session_info->gid);
1964 lttng_evaluation_destroy(evaluation);
1831ae68
FD
1965put_list:
1966 notification_client_list_put(client_list);
ed327204 1967 if (caa_unlikely(ret)) {
1831ae68 1968 break;
ed327204
JG
1969 }
1970 }
1971end:
1972 session_info_put(session_info);
1973 *_cmd_result = cmd_result;
1974 rcu_read_unlock();
1975 return ret;
0ca52944
JG
1976}
1977
1da26331 1978static
1831ae68
FD
1979int handle_notification_thread_command_add_application(
1980 struct notification_thread_handle *handle,
1981 struct notification_thread_state *state,
1982 int read_side_trigger_event_application_pipe,
1983 enum lttng_error_code *_cmd_result)
1da26331 1984{
1831ae68
FD
1985 int ret = 0;
1986 enum lttng_error_code cmd_result = LTTNG_OK;
1987 struct notification_event_trigger_source_element *element = NULL;
1da26331 1988
1831ae68
FD
1989 element = zmalloc(sizeof(*element));
1990 if (!element) {
1991 cmd_result = LTTNG_ERR_NOMEM;
1992 ret = -1;
1993 goto end;
1994 }
1da26331 1995
1831ae68
FD
1996 CDS_INIT_LIST_HEAD(&element->node);
1997 element->fd = read_side_trigger_event_application_pipe;
1da26331 1998
1831ae68
FD
1999 pthread_mutex_lock(&handle->event_trigger_sources.lock);
2000 cds_list_add(&element->node, &handle->event_trigger_sources.list);
2001 pthread_mutex_unlock(&handle->event_trigger_sources.lock);
1da26331 2002
1831ae68
FD
2003 /* TODO: remove on failure to add to list? */
2004
2005 /* Adding the read side pipe to the event poll */
2006 ret = lttng_poll_add(&state->events,
2007 read_side_trigger_event_application_pipe,
2008 LPOLLIN | LPOLLERR);
2009
2010 DBG3("[notification-thread] Adding application event source from fd: %d", read_side_trigger_event_application_pipe);
2011 if (ret < 0) {
2012 /* TODO: what should be the value of cmd_result??? */
2013 ERR("[notification-thread] Failed to add event source pipe fd to pollset");
2014 goto end;
1da26331 2015 }
1831ae68 2016
1da26331 2017end:
1831ae68 2018 *_cmd_result = cmd_result;
1da26331
JG
2019 return ret;
2020}
2021
51eab943 2022static
1831ae68
FD
2023int handle_notification_thread_command_remove_application(
2024 struct notification_thread_handle *handle,
2025 struct notification_thread_state *state,
2026 int read_side_trigger_event_application_pipe,
2027 enum lttng_error_code *_cmd_result)
51eab943
JG
2028{
2029 int ret = 0;
1831ae68 2030 enum lttng_error_code cmd_result = LTTNG_OK;
51eab943 2031
1831ae68
FD
2032 /* TODO: missing a lock propably to revisit */
2033 struct notification_event_trigger_source_element *source_element, *tmp;
2034 cds_list_for_each_entry_safe(source_element, tmp,
2035 &handle->event_trigger_sources.list, node) {
2036 if (source_element->fd != read_side_trigger_event_application_pipe) {
2037 continue;
2038 }
2039
2040 DBG("[notification-thread] Removed event source from event source list");
2041 cds_list_del(&source_element->node);
2042 break;
2043 }
2044
2045 DBG3("[notification-thread] Removing application event source from fd: %d", read_side_trigger_event_application_pipe);
2046 /* Removing the read side pipe to the event poll */
2047 ret = lttng_poll_del(&state->events,
2048 read_side_trigger_event_application_pipe);
2049 if (ret < 0) {
2050 /* TODO: what should be the value of cmd_result??? */
2051 ERR("[notification-thread] Failed to remove event source pipe fd from pollset");
2052 goto end;
2053 }
2054
2055end:
2056 *_cmd_result = cmd_result;
2057 return ret;
2058}
2059
2060static int handle_notification_thread_command_get_tokens(
2061 struct notification_thread_handle *handle,
2062 struct notification_thread_state *state,
2063 struct lttng_triggers **triggers,
2064 enum lttng_error_code *_cmd_result)
2065{
2066 int ret = 0, i = 0;
2067 enum lttng_error_code cmd_result = LTTNG_OK;
2068 struct cds_lfht_iter iter;
2069 struct notification_trigger_tokens_ht_element *element;
2070 struct lttng_triggers *local_triggers = NULL;
2071
2072 local_triggers = lttng_triggers_create();
2073 if (!local_triggers) {
2074 cmd_result = LTTNG_ERR_NOMEM;
2075 goto end;
2076 }
2077
2078 rcu_read_lock();
2079 cds_lfht_for_each_entry (
2080 state->trigger_tokens_ht, &iter, element, node) {
2081 ret = lttng_triggers_add(local_triggers, element->trigger);
2082 if (ret < 0) {
2083 cmd_result = LTTNG_ERR_FATAL;
2084 ret = -1;
2085 goto end;
2086 }
2087
2088 /* Ownership is shared with the lttng_triggers object */
2089 lttng_trigger_get(element->trigger);
2090
2091 i++;
2092 }
2093
2094 /* Passing ownership up */
2095 *triggers = local_triggers;
2096 local_triggers = NULL;
2097
2098end:
2099 rcu_read_unlock();
2100 lttng_triggers_destroy(local_triggers);
2101 *_cmd_result = cmd_result;
2102 return ret;
2103}
2104
2105static
2106int handle_notification_thread_command_list_triggers(
2107 struct notification_thread_handle *handle,
2108 struct notification_thread_state *state,
2109 uid_t uid,
2110 gid_t gid,
2111 struct lttng_triggers **triggers,
2112 enum lttng_error_code *_cmd_result)
2113{
2114 int ret = 0, i = 0;
2115 enum lttng_error_code cmd_result = LTTNG_OK;
2116 struct cds_lfht_iter iter;
2117 struct lttng_trigger_ht_element *trigger_ht_element;
2118 struct lttng_triggers *local_triggers = NULL;
2119 const struct lttng_credentials *creds;
2120
2121 long scb, sca;
2122 unsigned long count;
2123
2124 rcu_read_lock();
2125 cds_lfht_count_nodes(state->triggers_ht, &scb, &count, &sca);
2126
2127 /* TODO check downcasting */
2128 local_triggers = lttng_triggers_create();
2129 if (!local_triggers) {
2130 cmd_result = LTTNG_ERR_NOMEM;
2131 goto end;
2132 }
2133
2134 cds_lfht_for_each_entry (state->triggers_ht, &iter,
2135 trigger_ht_element, node) {
2136 /* Only return the trigger for which the requestion client have
2137 * access. For now the root user can only list its own
2138 * triggers.
2139 * TODO: root user behavior
2140 */
2141 creds = lttng_trigger_get_credentials(trigger_ht_element->trigger);
2142 if ((uid != creds->uid) || (gid != creds->gid)) {
2143 continue;
2144 }
2145
2146 ret = lttng_triggers_add(local_triggers, trigger_ht_element->trigger);
2147 if (ret < 0) {
2148 ret = -1;
2149 goto end;
2150 }
2151 /* Ownership is shared with the lttng_triggers object */
2152 lttng_trigger_get(trigger_ht_element->trigger);
2153
2154 i++;
2155 }
2156
2157 /* Passing ownership up */
2158 *triggers = local_triggers;
2159 local_triggers = NULL;
2160
2161end:
2162 rcu_read_unlock();
2163 lttng_triggers_destroy(local_triggers);
2164 *_cmd_result = cmd_result;
2165 return ret;
2166}
2167
2168static
2169int condition_is_supported(struct lttng_condition *condition)
2170{
2171 int ret;
2172
2173 switch (lttng_condition_get_type(condition)) {
2174 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
2175 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
2176 {
2177 enum lttng_domain_type domain;
2178
2179 ret = lttng_condition_buffer_usage_get_domain_type(condition,
2180 &domain);
2181 if (ret) {
2182 ret = -1;
2183 goto end;
2184 }
2185
2186 if (domain != LTTNG_DOMAIN_KERNEL) {
2187 ret = 1;
2188 goto end;
2189 }
2190
2191 /*
2192 * Older kernel tracers don't expose the API to monitor their
2193 * buffers. Therefore, we reject triggers that require that
2194 * mechanism to be available to be evaluated.
2195 */
2196 ret = kernel_supports_ring_buffer_snapshot_sample_positions();
2197 break;
2198 }
2199 case LTTNG_CONDITION_TYPE_EVENT_RULE_HIT:
2200 {
2201 /* TODO:
2202 * Check for kernel support.
2203 * Check for ust support ??
2204 */
2205 ret = 1;
2206 break;
2207 }
2208 default:
2209 ret = 1;
2210 }
2211end:
2212 return ret;
2213}
2214
2215static
2216int action_is_supported(struct lttng_action *action)
2217{
2218 int ret;
2219
2220 switch (lttng_action_get_type(action)) {
2221 case LTTNG_ACTION_TYPE_NOTIFY:
2222 case LTTNG_ACTION_TYPE_START_SESSION:
2223 case LTTNG_ACTION_TYPE_STOP_SESSION:
2224 case LTTNG_ACTION_TYPE_ROTATE_SESSION:
2225 case LTTNG_ACTION_TYPE_SNAPSHOT_SESSION:
2226 {
2227 /* TODO validate that this is true for kernel in regards to
2228 * rotation and snapshot. Start stop is not a problem notify
2229 * either.
2230 */
2231 /* For now all type of actions are supported */
2232 ret = 1;
2233 break;
2234 }
2235 case LTTNG_ACTION_TYPE_GROUP:
2236 {
2237 /* TODO: Iterate over all internal actions and validate that
2238 * they are supported
2239 */
2240 ret = 1;
2241 break;
2242
2243 }
2244 default:
2245 ret = 1;
2246 }
2247
2248 return ret;
2249}
2250
2251/* Must be called with RCU read lock held. */
2252static
2253int bind_trigger_to_matching_session(struct lttng_trigger *trigger,
2254 struct notification_thread_state *state)
2255{
2256 int ret = 0;
2257 const struct lttng_condition *condition;
2258 const char *session_name;
2259 struct lttng_session_trigger_list *trigger_list;
2260
2261 condition = lttng_trigger_get_const_condition(trigger);
2262 switch (lttng_condition_get_type(condition)) {
2263 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
2264 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
2265 {
2266 enum lttng_condition_status status;
51eab943
JG
2267
2268 status = lttng_condition_session_rotation_get_session_name(
2269 condition, &session_name);
2270 if (status != LTTNG_CONDITION_STATUS_OK) {
2271 ERR("[notification-thread] Failed to bind trigger to session: unable to get 'session_rotation' condition's session name");
2272 ret = -1;
2273 goto end;
2274 }
2275 break;
2276 }
2277 default:
2278 ret = -1;
2279 goto end;
2280 }
2281
ed327204
JG
2282 trigger_list = get_session_trigger_list(state, session_name);
2283 if (!trigger_list) {
51eab943
JG
2284 DBG("[notification-thread] Unable to bind trigger applying to session \"%s\" as it is not yet known to the notification system",
2285 session_name);
2286 goto end;
51eab943 2287
ed327204 2288 }
51eab943
JG
2289
2290 DBG("[notification-thread] Newly registered trigger bound to session \"%s\"",
2291 session_name);
2292 ret = lttng_session_trigger_list_add(trigger_list, trigger);
2293end:
2294 return ret;
2295}
2296
2297/* Must be called with RCU read lock held. */
2298static
1831ae68 2299int bind_trigger_to_matching_channels(struct lttng_trigger *trigger,
51eab943
JG
2300 struct notification_thread_state *state)
2301{
2302 int ret = 0;
2303 struct cds_lfht_node *node;
2304 struct cds_lfht_iter iter;
2305 struct channel_info *channel;
2306
2307 cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
2308 channels_ht_node) {
2309 struct lttng_trigger_list_element *trigger_list_element;
2310 struct lttng_channel_trigger_list *trigger_list;
a5d64ae7 2311 struct cds_lfht_iter lookup_iter;
51eab943
JG
2312
2313 if (!trigger_applies_to_channel(trigger, channel)) {
2314 continue;
2315 }
2316
2317 cds_lfht_lookup(state->channel_triggers_ht,
2318 hash_channel_key(&channel->key),
2319 match_channel_trigger_list,
2320 &channel->key,
a5d64ae7
MD
2321 &lookup_iter);
2322 node = cds_lfht_iter_get_node(&lookup_iter);
51eab943
JG
2323 assert(node);
2324 trigger_list = caa_container_of(node,
2325 struct lttng_channel_trigger_list,
2326 channel_triggers_ht_node);
2327
2328 trigger_list_element = zmalloc(sizeof(*trigger_list_element));
2329 if (!trigger_list_element) {
2330 ret = -1;
2331 goto end;
2332 }
2333 CDS_INIT_LIST_HEAD(&trigger_list_element->node);
2334 trigger_list_element->trigger = trigger;
2335 cds_list_add(&trigger_list_element->node, &trigger_list->list);
2336 DBG("[notification-thread] Newly registered trigger bound to channel \"%s\"",
2337 channel->name);
2338 }
2339end:
2340 return ret;
2341}
2342
1831ae68 2343static int action_notify_register_trigger(
8abe313a 2344 struct notification_thread_state *state,
1831ae68 2345 struct lttng_trigger *trigger)
ab0ee2ca 2346{
1831ae68 2347
ab0ee2ca
JG
2348 int ret = 0;
2349 struct lttng_condition *condition;
2350 struct notification_client *client;
2351 struct notification_client_list *client_list = NULL;
ab0ee2ca 2352 struct cds_lfht_iter iter;
1831ae68 2353 struct notification_client_list_element *client_list_element, *tmp;
ab0ee2ca
JG
2354
2355 condition = lttng_trigger_get_condition(trigger);
1da26331
JG
2356 assert(condition);
2357
1831ae68 2358 client_list = notification_client_list_create(trigger);
ab0ee2ca
JG
2359 if (!client_list) {
2360 ret = -1;
1831ae68 2361 goto end;
ab0ee2ca 2362 }
ab0ee2ca
JG
2363
2364 /* Build a list of clients to which this new trigger applies. */
2365 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
2366 client_socket_ht_node) {
2367 if (!trigger_applies_to_client(trigger, client)) {
2368 continue;
2369 }
2370
2371 client_list_element = zmalloc(sizeof(*client_list_element));
2372 if (!client_list_element) {
2373 ret = -1;
1831ae68 2374 goto error_put_client_list;
ab0ee2ca
JG
2375 }
2376 CDS_INIT_LIST_HEAD(&client_list_element->node);
2377 client_list_element->client = client;
2378 cds_list_add(&client_list_element->node, &client_list->list);
2379 }
2380
f82f93a1 2381 switch (get_condition_binding_object(condition)) {
51eab943
JG
2382 case LTTNG_OBJECT_TYPE_SESSION:
2383 /* Add the trigger to the list if it matches a known session. */
2384 ret = bind_trigger_to_matching_session(trigger, state);
2385 if (ret) {
1831ae68 2386 goto error_put_client_list;
ab0ee2ca 2387 }
f82f93a1 2388 break;
51eab943
JG
2389 case LTTNG_OBJECT_TYPE_CHANNEL:
2390 /*
2391 * Add the trigger to list of triggers bound to the channels
2392 * currently known.
2393 */
2394 ret = bind_trigger_to_matching_channels(trigger, state);
2395 if (ret) {
1831ae68 2396 goto error_put_client_list;
ab0ee2ca 2397 }
51eab943
JG
2398 break;
2399 case LTTNG_OBJECT_TYPE_NONE:
2400 break;
2401 default:
2402 ERR("[notification-thread] Unknown object type on which to bind a newly registered trigger was encountered");
2403 ret = -1;
1831ae68 2404 goto error_put_client_list;
ab0ee2ca
JG
2405 }
2406
2ae99f0b
JG
2407 /*
2408 * Since there is nothing preventing clients from subscribing to a
2409 * condition before the corresponding trigger is registered, we have
2410 * to evaluate this new condition right away.
2411 *
2412 * At some point, we were waiting for the next "evaluation" (e.g. on
2413 * reception of a channel sample) to evaluate this new condition, but
2414 * that was broken.
2415 *
2416 * The reason it was broken is that waiting for the next sample
2417 * does not allow us to properly handle transitions for edge-triggered
2418 * conditions.
2419 *
2420 * Consider this example: when we handle a new channel sample, we
2421 * evaluate each conditions twice: once with the previous state, and
2422 * again with the newest state. We then use those two results to
2423 * determine whether a state change happened: a condition was false and
2424 * became true. If a state change happened, we have to notify clients.
2425 *
2426 * Now, if a client subscribes to a given notification and registers
2427 * a trigger *after* that subscription, we have to make sure the
2428 * condition is evaluated at this point while considering only the
2429 * current state. Otherwise, the next evaluation cycle may only see
2430 * that the evaluations remain the same (true for samples n-1 and n) and
2431 * the client will never know that the condition has been met.
1831ae68
FD
2432 *
2433 * No need to lock the list here as it has not been published yet.
2ae99f0b
JG
2434 */
2435 cds_list_for_each_entry_safe(client_list_element, tmp,
2436 &client_list->list, node) {
2437 ret = evaluate_condition_for_client(trigger, condition,
2438 client_list_element->client, state);
2439 if (ret) {
1831ae68 2440 goto error_put_client_list;
2ae99f0b
JG
2441 }
2442 }
2443
2444 /*
2445 * Client list ownership transferred to the
2446 * notification_trigger_clients_ht.
2447 */
1831ae68 2448 publish_notification_client_list(state, client_list);
2ae99f0b 2449 client_list = NULL;
1831ae68
FD
2450error_put_client_list:
2451 notification_client_list_put(client_list);
2452end:
2453 return ret;
2454}
2ae99f0b 2455
1831ae68
FD
2456static
2457bool trigger_name_taken(struct notification_thread_state *state, const char *name)
2458{
2459 struct cds_lfht_node *triggers_by_name_ht_node;
2460 struct cds_lfht_iter iter;
2461 /* TODO change hashing for trigger */
2462 cds_lfht_lookup(state->triggers_by_name_ht,
2463 hash_key_str(name, lttng_ht_seed),
2464 match_str,
2465 name,
2466 &iter);
2467 triggers_by_name_ht_node = cds_lfht_iter_get_node(&iter);
2468 if (triggers_by_name_ht_node) {
2469 return true;
2470 } else {
2471 return false;
2472 }
2473
2474}
2475static
2476void generate_trigger_name(struct notification_thread_state *state, struct lttng_trigger *trigger, const char **name)
2477{
2478 /* Here the offset criteria guarantee an end. This will be a nice
2479 * bikeshedding conversation. I would simply generate uuid and use them
2480 * as trigger name.
2481 */
2482 bool taken = false;
2483 do {
2484 lttng_trigger_generate_name(trigger, state->trigger_id.name_offset);
2485 /* TODO error checking */
2486 lttng_trigger_get_name(trigger, name);
2487 taken = trigger_name_taken(state, *name);
2488 if (taken) {
2489 state->trigger_id.name_offset++;
2490 }
2491 } while (taken || state->trigger_id.name_offset == UINT32_MAX);
2492}
2493
2494static bool action_is_notify(const struct lttng_action *action)
2495{
2496 /* TODO for action groups we need to iterate over all of them */
2497 enum lttng_action_type type = lttng_action_get_type_const(action);
2498 bool ret = false;
2499 enum lttng_action_status status;
2500 const struct lttng_action *tmp;
2501 unsigned int i, count;
2502
2503 switch (type) {
2504 case LTTNG_ACTION_TYPE_NOTIFY:
2505 ret = true;
2506 break;
2507 case LTTNG_ACTION_TYPE_GROUP:
2508 status = lttng_action_group_get_count(action, &count);
2509 if (status != LTTNG_ACTION_STATUS_OK) {
2510 assert(0);
2511 }
2512 for (i = 0; i < count; i++) {
2513 tmp = lttng_action_group_get_at_index_const(action, i);
2514 assert(tmp);
2515 ret = action_is_notify(tmp);
2516 if (ret) {
2517 break;
2518 }
2519 }
2520 break;
2521 default:
2522 ret = false;
2523 break;
2524 }
2525
2526 return ret;
2527}
2528
2529/*
2530 * TODO: REVIEW THIS COMMENT.
2531 * FIXME A client's credentials are not checked when registering a trigger, nor
2532 * are they stored alongside with the trigger.
2533 *
2534 * The effects of this are benign since:
2535 * - The client will succeed in registering the trigger, as it is valid,
2536 * - The trigger will, internally, be bound to the channel/session,
2537 * - The notifications will not be sent since the client's credentials
2538 * are checked against the channel at that moment.
2539 *
2540 * If this function returns a non-zero value, it means something is
2541 * fundamentally broken and the whole subsystem/thread will be torn down.
2542 *
2543 * If a non-fatal error occurs, just set the cmd_result to the appropriate
2544 * error code.
2545 */
2546static
2547int handle_notification_thread_command_register_trigger(
2548 struct notification_thread_state *state,
2549 struct lttng_trigger *trigger,
2550 enum lttng_error_code *cmd_result)
2551{
2552 int ret = 0;
2553 int is_supported;
2554 struct lttng_condition *condition;
2555 struct lttng_action *action;
2556 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
2557 struct notification_trigger_tokens_ht_element *trigger_tokens_ht_element = NULL;
2558 struct cds_lfht_node *node;
2559 const char* trigger_name;
2560 bool free_trigger = true;
2561
2562 assert(trigger->creds.set);
2563
2564 rcu_read_lock();
2565
2566 /* Set the trigger's key */
2567 lttng_trigger_set_key(trigger, state->trigger_id.token_generator);
2568
2569 if (lttng_trigger_get_name(trigger, &trigger_name) == LTTNG_TRIGGER_STATUS_UNSET) {
2570 generate_trigger_name(state, trigger, &trigger_name);
2571 } else if (trigger_name_taken(state, trigger_name)) {
2572 /* Not a fatal error */
2573 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2574 ret = 0;
2575 goto error;
2576 }
2577
2578 condition = lttng_trigger_get_condition(trigger);
2579 assert(condition);
2580
2581 action = lttng_trigger_get_action(trigger);
2582 assert(action);
2583
2584 is_supported = condition_is_supported(condition);
2585 if (is_supported < 0) {
2586 goto error;
2587 } else if (is_supported == 0) {
2588 ret = 0;
2589 *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
2590 goto error;
2591 }
2592
2593 is_supported = action_is_supported(action);
2594 if (is_supported < 0) {
2595 goto error;
2596 } else if (is_supported == 0) {
2597 ret = 0;
2598 *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
2599 goto error;
2600 }
2601
2602 trigger_ht_element = zmalloc(sizeof(*trigger_ht_element));
2603 if (!trigger_ht_element) {
2604 ret = -1;
2605 goto error;
2606 }
2607
2608 /* Add trigger to the trigger_ht. */
2609 cds_lfht_node_init(&trigger_ht_element->node);
2610 cds_lfht_node_init(&trigger_ht_element->node_by_name);
2611
2612 /*
2613 * This element own the trigger object from now own, this is why there
2614 * is no lttng_trigger_get here.
2615 * This thread is now the owner of the trigger object.
2616 */
2617 trigger_ht_element->trigger = trigger;
2618
2619 node = cds_lfht_add_unique(state->triggers_ht,
2620 lttng_condition_hash(condition),
2621 match_trigger,
2622 trigger,
2623 &trigger_ht_element->node);
2624 if (node != &trigger_ht_element->node) {
2625 /* Not a fatal error, simply report it to the client. */
2626 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2627 goto error_free_ht_element;
2628 }
2629
2630 node = cds_lfht_add_unique(state->triggers_by_name_ht,
2631 hash_key_str(trigger_name, lttng_ht_seed),
2632 match_str,
2633 trigger_name,
2634 &trigger_ht_element->node_by_name);
2635 if (node != &trigger_ht_element->node_by_name) {
2636 /* This should never happen */
2637 /* Not a fatal error, simply report it to the client. */
2638 /* TODO remove from the trigger_ht */
2639 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2640 goto error_free_ht_element;
2641 }
2642
2643 if (lttng_condition_get_type(condition) == LTTNG_CONDITION_TYPE_EVENT_RULE_HIT) {
2644 trigger_tokens_ht_element = zmalloc(sizeof(*trigger_tokens_ht_element));
2645 if (!trigger_tokens_ht_element) {
2646 ret = -1;
2647 goto error;
2648 }
2649
2650 /* Add trigger token to the trigger_tokens_ht. */
2651 cds_lfht_node_init(&trigger_tokens_ht_element->node);
2652 trigger_tokens_ht_element->token = trigger->key.value;
2653 trigger_tokens_ht_element->trigger = trigger;
2654
2655 node = cds_lfht_add_unique(state->trigger_tokens_ht,
2656 hash_key_u64(&trigger_tokens_ht_element->token, lttng_ht_seed),
2657 match_trigger_token,
2658 &trigger_tokens_ht_element->token,
2659 &trigger_tokens_ht_element->node);
2660 if (node != &trigger_tokens_ht_element->node) {
2661 /* TODO: THIS IS A FATAL ERROR... should never happen */
2662 /* Not a fatal error, simply report it to the client. */
2663 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2664 goto error_free_ht_element;
2665 }
2666 }
2667
2668 /*
2669 * Ownership of the trigger and of its wrapper was transfered to
2670 * the triggers_ht. Same for token ht element if necessary.
2671 */
2672 trigger_tokens_ht_element = NULL;
2673 trigger_ht_element = NULL;
2674 free_trigger = false;
2675
2676 if (action_is_notify(action)) {
2677 ret = action_notify_register_trigger(state, trigger);
2678 if (ret < 0) {
2679 /* TODO should cmd_result be set here? */
2680 ret = -1;
2681 goto error_free_ht_element;
ab0ee2ca 2682 }
ab0ee2ca 2683 }
1831ae68
FD
2684
2685 /* Increment the trigger unique id generator */
2686 state->trigger_id.token_generator++;
2687 *cmd_result = LTTNG_OK;
2688
ab0ee2ca
JG
2689error_free_ht_element:
2690 free(trigger_ht_element);
1831ae68 2691 free(trigger_tokens_ht_element);
ab0ee2ca
JG
2692error:
2693 if (free_trigger) {
ab0ee2ca
JG
2694 lttng_trigger_destroy(trigger);
2695 }
2696 rcu_read_unlock();
2697 return ret;
2698}
2699
83b934ad 2700static
1831ae68 2701void free_lttng_trigger_ht_element_rcu(struct rcu_head *node)
83b934ad 2702{
1831ae68 2703 free(caa_container_of(node, struct lttng_trigger_ht_element,
83b934ad
MD
2704 rcu_node));
2705}
2706
2707static
1831ae68 2708void free_notification_trigger_tokens_ht_element_rcu(struct rcu_head *node)
83b934ad 2709{
1831ae68 2710 free(caa_container_of(node, struct notification_trigger_tokens_ht_element,
83b934ad
MD
2711 rcu_node));
2712}
2713
cc2295b5 2714static
ab0ee2ca
JG
2715int handle_notification_thread_command_unregister_trigger(
2716 struct notification_thread_state *state,
2717 struct lttng_trigger *trigger,
2718 enum lttng_error_code *_cmd_reply)
2719{
2720 struct cds_lfht_iter iter;
ed327204 2721 struct cds_lfht_node *triggers_ht_node;
ab0ee2ca
JG
2722 struct lttng_channel_trigger_list *trigger_list;
2723 struct notification_client_list *client_list;
ab0ee2ca
JG
2724 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
2725 struct lttng_condition *condition = lttng_trigger_get_condition(
2726 trigger);
1831ae68 2727 struct lttng_action *action = lttng_trigger_get_action(trigger);
ab0ee2ca
JG
2728 enum lttng_error_code cmd_reply;
2729
2730 rcu_read_lock();
2731
1831ae68
FD
2732 /* TODO change hashing for trigger */
2733 /* TODO Disabling for the root user is not complete, for now the root
2734 * user cannot disable the trigger from another user.
2735 */
ab0ee2ca
JG
2736 cds_lfht_lookup(state->triggers_ht,
2737 lttng_condition_hash(condition),
1831ae68
FD
2738 match_trigger,
2739 trigger,
ab0ee2ca
JG
2740 &iter);
2741 triggers_ht_node = cds_lfht_iter_get_node(&iter);
2742 if (!triggers_ht_node) {
2743 cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND;
2744 goto end;
2745 } else {
2746 cmd_reply = LTTNG_OK;
2747 }
2748
2749 /* Remove trigger from channel_triggers_ht. */
2750 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
2751 channel_triggers_ht_node) {
2752 struct lttng_trigger_list_element *trigger_element, *tmp;
2753
2754 cds_list_for_each_entry_safe(trigger_element, tmp,
2755 &trigger_list->list, node) {
1831ae68 2756 if (!lttng_trigger_is_equal(trigger, trigger_element->trigger)) {
ab0ee2ca
JG
2757 continue;
2758 }
2759
2760 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
2761 cds_list_del(&trigger_element->node);
e4db5ace
JR
2762 /* A trigger can only appear once per channel */
2763 break;
ab0ee2ca
JG
2764 }
2765 }
2766
1831ae68
FD
2767 if (lttng_condition_get_type(condition) == LTTNG_CONDITION_TYPE_EVENT_RULE_HIT) {
2768 struct notification_trigger_tokens_ht_element *trigger_tokens_ht_element;
2769 cds_lfht_for_each_entry(state->trigger_tokens_ht, &iter, trigger_tokens_ht_element,
2770 node) {
2771 if (!lttng_trigger_is_equal(trigger, trigger_tokens_ht_element->trigger)) {
2772 continue;
2773 }
ed327204 2774
1831ae68
FD
2775 /* TODO talk to all app and remove it */
2776 DBG("[notification-thread] Removed trigger from tokens_ht");
2777 cds_lfht_del(state->trigger_tokens_ht,
2778 &trigger_tokens_ht_element->node);
2779 call_rcu(&trigger_tokens_ht_element->rcu_node, free_notification_trigger_tokens_ht_element_rcu);
2780
2781 break;
2782 }
2783 }
2784
2785 if (action_is_notify(action)) {
2786 /*
2787 * Remove and release the client list from
2788 * notification_trigger_clients_ht.
2789 */
2790 client_list = get_client_list_from_condition(state, condition);
2791 assert(client_list);
2792
2793 /* Put new reference and the hashtable's reference. */
2794 notification_client_list_put(client_list);
2795 notification_client_list_put(client_list);
2796 client_list = NULL;
ab0ee2ca 2797 }
ab0ee2ca
JG
2798
2799 /* Remove trigger from triggers_ht. */
2800 trigger_ht_element = caa_container_of(triggers_ht_node,
2801 struct lttng_trigger_ht_element, node);
1831ae68 2802 cds_lfht_del(state->triggers_by_name_ht, &trigger_ht_element->node_by_name);
ab0ee2ca
JG
2803 cds_lfht_del(state->triggers_ht, triggers_ht_node);
2804
1831ae68 2805 /* Release the ownership of the trigger */
ab0ee2ca 2806 lttng_trigger_destroy(trigger_ht_element->trigger);
83b934ad 2807 call_rcu(&trigger_ht_element->rcu_node, free_lttng_trigger_ht_element_rcu);
ab0ee2ca
JG
2808end:
2809 rcu_read_unlock();
2810 if (_cmd_reply) {
2811 *_cmd_reply = cmd_reply;
2812 }
2813 return 0;
2814}
2815
2816/* Returns 0 on success, 1 on exit requested, negative value on error. */
2817int handle_notification_thread_command(
2818 struct notification_thread_handle *handle,
2819 struct notification_thread_state *state)
2820{
2821 int ret;
2822 uint64_t counter;
2823 struct notification_thread_command *cmd;
2824
8abe313a 2825 /* Read the event pipe to put it back into a quiescent state. */
18aeca05 2826 ret = lttng_read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter,
8abe313a 2827 sizeof(counter));
18aeca05 2828 if (ret != sizeof(counter)) {
ab0ee2ca
JG
2829 goto error;
2830 }
2831
2832 pthread_mutex_lock(&handle->cmd_queue.lock);
2833 cmd = cds_list_first_entry(&handle->cmd_queue.list,
2834 struct notification_thread_command, cmd_list_node);
2835 switch (cmd->type) {
2836 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
2837 DBG("[notification-thread] Received register trigger command");
2838 ret = handle_notification_thread_command_register_trigger(
2839 state, cmd->parameters.trigger,
2840 &cmd->reply_code);
2841 break;
2842 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
2843 DBG("[notification-thread] Received unregister trigger command");
2844 ret = handle_notification_thread_command_unregister_trigger(
2845 state, cmd->parameters.trigger,
2846 &cmd->reply_code);
2847 break;
2848 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
2849 DBG("[notification-thread] Received add channel command");
2850 ret = handle_notification_thread_command_add_channel(
8abe313a
JG
2851 state,
2852 cmd->parameters.add_channel.session.name,
2853 cmd->parameters.add_channel.session.uid,
2854 cmd->parameters.add_channel.session.gid,
2855 cmd->parameters.add_channel.channel.name,
2856 cmd->parameters.add_channel.channel.domain,
2857 cmd->parameters.add_channel.channel.key,
2858 cmd->parameters.add_channel.channel.capacity,
ab0ee2ca
JG
2859 &cmd->reply_code);
2860 break;
2861 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
2862 DBG("[notification-thread] Received remove channel command");
2863 ret = handle_notification_thread_command_remove_channel(
2864 state, cmd->parameters.remove_channel.key,
2865 cmd->parameters.remove_channel.domain,
2866 &cmd->reply_code);
2867 break;
0ca52944 2868 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING:
0ca52944 2869 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED:
ed327204
JG
2870 DBG("[notification-thread] Received session rotation %s command",
2871 cmd->type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING ?
2872 "ongoing" : "completed");
2873 ret = handle_notification_thread_command_session_rotation(
0ca52944 2874 state,
ed327204
JG
2875 cmd->type,
2876 cmd->parameters.session_rotation.session_name,
2877 cmd->parameters.session_rotation.uid,
2878 cmd->parameters.session_rotation.gid,
2879 cmd->parameters.session_rotation.trace_archive_chunk_id,
2880 cmd->parameters.session_rotation.location,
0ca52944
JG
2881 &cmd->reply_code);
2882 break;
1831ae68
FD
2883 case NOTIFICATION_COMMAND_TYPE_ADD_APPLICATION:
2884 ret = handle_notification_thread_command_add_application(
2885 handle,
2886 state,
2887 cmd->parameters.application.read_side_trigger_event_application_pipe,
2888 &cmd->reply_code);
2889 break;
2890 case NOTIFICATION_COMMAND_TYPE_REMOVE_APPLICATION:
2891 ret = handle_notification_thread_command_remove_application(
2892 handle,
2893 state,
2894 cmd->parameters.application.read_side_trigger_event_application_pipe,
2895 &cmd->reply_code);
2896 break;
2897 case NOTIFICATION_COMMAND_TYPE_GET_TOKENS:
2898 {
2899 struct lttng_triggers *triggers = NULL;
2900 ret = handle_notification_thread_command_get_tokens(
2901 handle, state, &triggers, &cmd->reply_code);
2902 cmd->reply.get_tokens.triggers = triggers;
2903 ret = 0;
2904 break;
2905
2906 cmd->reply_code = LTTNG_OK;
2907 ret = 0;
2908 break;
2909 }
2910 case NOTIFICATION_COMMAND_TYPE_LIST_TRIGGERS:
2911 {
2912 struct lttng_triggers *triggers = NULL;
2913 ret = handle_notification_thread_command_list_triggers(
2914 handle,
2915 state,
2916 cmd->parameters.list_triggers.uid,
2917 cmd->parameters.list_triggers.gid,
2918 &triggers,
2919 &cmd->reply_code);
2920 cmd->reply.list_triggers.triggers = triggers;
2921 ret = 0;
2922 break;
2923 }
ab0ee2ca
JG
2924 case NOTIFICATION_COMMAND_TYPE_QUIT:
2925 DBG("[notification-thread] Received quit command");
2926 cmd->reply_code = LTTNG_OK;
2927 ret = 1;
2928 goto end;
2929 default:
2930 ERR("[notification-thread] Unknown internal command received");
2931 goto error_unlock;
2932 }
2933
2934 if (ret) {
2935 goto error_unlock;
2936 }
2937end:
2938 cds_list_del(&cmd->cmd_list_node);
1831ae68
FD
2939 if (cmd->is_async) {
2940 free(cmd);
2941 cmd = NULL;
2942 } else {
2943 lttng_waiter_wake_up(&cmd->reply_waiter);
2944 }
ab0ee2ca
JG
2945 pthread_mutex_unlock(&handle->cmd_queue.lock);
2946 return ret;
2947error_unlock:
2948 /* Wake-up and return a fatal error to the calling thread. */
8ada111f 2949 lttng_waiter_wake_up(&cmd->reply_waiter);
ab0ee2ca
JG
2950 pthread_mutex_unlock(&handle->cmd_queue.lock);
2951 cmd->reply_code = LTTNG_ERR_FATAL;
2952error:
2953 /* Indicate a fatal error to the caller. */
2954 return -1;
2955}
2956
ab0ee2ca
JG
/*
 * Add O_NONBLOCK to the file status flags of `socket`.
 *
 * Returns -1 when the flags cannot be read or written (the failure is
 * logged with PERROR), otherwise the value returned by the F_SETFL call.
 */
static
int socket_set_non_blocking(int socket)
{
	int status;
	const int current_flags = fcntl(socket, F_GETFL, 0);

	if (current_flags == -1) {
		PERROR("fcntl get socket flags");
		return -1;
	}

	/* Preserve the existing flags and only add O_NONBLOCK. */
	status = fcntl(socket, F_SETFL, current_flags | O_NONBLOCK);
	if (status == -1) {
		PERROR("fcntl set O_NONBLOCK socket flag");
		return status;
	}

	DBG("Client socket (fd = %i) set as non-blocking", socket);
	return status;
}
2979
1831ae68 2980/* Client lock must be acquired by caller. */
ab0ee2ca 2981static
14fa22f8 2982int client_reset_inbound_state(struct notification_client *client)
ab0ee2ca
JG
2983{
2984 int ret;
2985
1831ae68
FD
2986 ASSERT_LOCKED(client->lock);
2987
ab0ee2ca
JG
2988 ret = lttng_dynamic_buffer_set_size(
2989 &client->communication.inbound.buffer, 0);
2990 assert(!ret);
2991
2992 client->communication.inbound.bytes_to_receive =
2993 sizeof(struct lttng_notification_channel_message);
2994 client->communication.inbound.msg_type =
2995 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN;
ab0ee2ca
JG
2996 LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1);
2997 LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1);
14fa22f8
JG
2998 ret = lttng_dynamic_buffer_set_size(
2999 &client->communication.inbound.buffer,
3000 client->communication.inbound.bytes_to_receive);
3001 return ret;
ab0ee2ca
JG
3002}
3003
3004int handle_notification_thread_client_connect(
3005 struct notification_thread_state *state)
3006{
3007 int ret;
3008 struct notification_client *client;
3009
3010 DBG("[notification-thread] Handling new notification channel client connection");
3011
3012 client = zmalloc(sizeof(*client));
3013 if (!client) {
3014 /* Fatal error. */
3015 ret = -1;
3016 goto error;
3017 }
1831ae68
FD
3018 pthread_mutex_init(&client->lock, NULL);
3019 client->id = state->next_notification_client_id++;
ab0ee2ca
JG
3020 CDS_INIT_LIST_HEAD(&client->condition_list);
3021 lttng_dynamic_buffer_init(&client->communication.inbound.buffer);
3022 lttng_dynamic_buffer_init(&client->communication.outbound.buffer);
01ea340e 3023 client->communication.inbound.expect_creds = true;
1831ae68
FD
3024
3025 pthread_mutex_lock(&client->lock);
14fa22f8 3026 ret = client_reset_inbound_state(client);
1831ae68 3027 pthread_mutex_unlock(&client->lock);
14fa22f8
JG
3028 if (ret) {
3029 ERR("[notification-thread] Failed to reset client communication's inbound state");
3030 ret = 0;
3031 goto error;
3032 }
ab0ee2ca
JG
3033
3034 ret = lttcomm_accept_unix_sock(state->notification_channel_socket);
3035 if (ret < 0) {
3036 ERR("[notification-thread] Failed to accept new notification channel client connection");
3037 ret = 0;
3038 goto error;
3039 }
3040
3041 client->socket = ret;
3042
3043 ret = socket_set_non_blocking(client->socket);
3044 if (ret) {
3045 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
3046 goto error;
3047 }
3048
3049 ret = lttcomm_setsockopt_creds_unix_sock(client->socket);
3050 if (ret < 0) {
3051 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
3052 ret = 0;
3053 goto error;
3054 }
3055
3056 ret = lttng_poll_add(&state->events, client->socket,
3057 LPOLLIN | LPOLLERR |
3058 LPOLLHUP | LPOLLRDHUP);
3059 if (ret < 0) {
3060 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
3061 ret = 0;
3062 goto error;
3063 }
3064 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
3065 client->socket);
3066
ab0ee2ca
JG
3067 rcu_read_lock();
3068 cds_lfht_add(state->client_socket_ht,
3069 hash_client_socket(client->socket),
3070 &client->client_socket_ht_node);
1831ae68
FD
3071 cds_lfht_add(state->client_id_ht,
3072 hash_client_id(client->id),
3073 &client->client_id_ht_node);
ab0ee2ca
JG
3074 rcu_read_unlock();
3075
3076 return ret;
3077error:
3078 notification_client_destroy(client, state);
3079 return ret;
3080}
3081
1831ae68
FD
3082/* RCU read-lock must be held by the caller. */
3083static
3084int notification_thread_client_disconnect(
3085 struct notification_client *client,
ab0ee2ca 3086 struct notification_thread_state *state)
1831ae68
FD
3087{
3088 int ret;
3089 struct lttng_condition_list_element *condition_list_element, *tmp;
3090
3091 /* Acquire the client lock to disable its communication atomically. */
3092 pthread_mutex_lock(&client->lock);
3093 client->communication.active = false;
3094 ret = lttng_poll_del(&state->events, client->socket);
3095 if (ret) {
3096 ERR("[notification-thread] Failed to remove client socket %d from poll set",
3097 client->socket);
3098 }
3099 pthread_mutex_unlock(&client->lock);
3100
3101 cds_lfht_del(state->client_socket_ht, &client->client_socket_ht_node);
3102 cds_lfht_del(state->client_id_ht, &client->client_id_ht_node);
3103
3104 /* Release all conditions to which the client was subscribed. */
3105 cds_list_for_each_entry_safe(condition_list_element, tmp,
3106 &client->condition_list, node) {
3107 (void) notification_thread_client_unsubscribe(client,
3108 condition_list_element->condition, state, NULL);
3109 }
3110
3111 /*
3112 * Client no longer accessible to other threads (through the
3113 * client lists).
3114 */
3115 notification_client_destroy(client, state);
3116 return ret;
3117}
3118
/*
 * Disconnect the client associated with `client_socket`.
 *
 * Returns -1 when no client is registered for that socket (treated as
 * internal state corruption), otherwise the result of
 * notification_thread_client_disconnect().
 */
int handle_notification_thread_client_disconnect(
		int client_socket, struct notification_thread_state *state)
{
	int ret;
	struct notification_client *client;

	rcu_read_lock();
	DBG("[notification-thread] Closing client connection (socket fd = %i)",
			client_socket);

	client = get_client_from_socket(client_socket, state);
	if (client) {
		ret = notification_thread_client_disconnect(client, state);
	} else {
		/* Internal state corruption, fatal error. */
		ERR("[notification-thread] Unable to find client (socket fd = %i)",
				client_socket);
		ret = -1;
	}

	rcu_read_unlock();
	return ret;
}
3142
3143int handle_notification_thread_client_disconnect_all(
3144 struct notification_thread_state *state)
3145{
3146 struct cds_lfht_iter iter;
3147 struct notification_client *client;
3148 bool error_encoutered = false;
3149
3150 rcu_read_lock();
3151 DBG("[notification-thread] Closing all client connections");
3152 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
1831ae68 3153 client_socket_ht_node) {
ab0ee2ca
JG
3154 int ret;
3155
1831ae68
FD
3156 ret = notification_thread_client_disconnect(
3157 client, state);
ab0ee2ca
JG
3158 if (ret) {
3159 error_encoutered = true;
3160 }
3161 }
3162 rcu_read_unlock();
3163 return error_encoutered ? 1 : 0;
3164}
3165
3166int handle_notification_thread_trigger_unregister_all(
3167 struct notification_thread_state *state)
3168{
4149ace8 3169 bool error_occurred = false;
ab0ee2ca
JG
3170 struct cds_lfht_iter iter;
3171 struct lttng_trigger_ht_element *trigger_ht_element;
3172
763de384 3173 rcu_read_lock();
ab0ee2ca
JG
3174 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
3175 node) {
3176 int ret = handle_notification_thread_command_unregister_trigger(
3177 state, trigger_ht_element->trigger, NULL);
3178 if (ret) {
4149ace8 3179 error_occurred = true;
ab0ee2ca
JG
3180 }
3181 }
763de384 3182 rcu_read_unlock();
4149ace8 3183 return error_occurred ? -1 : 0;
ab0ee2ca
JG
3184}
3185
3186static
1831ae68
FD
3187int client_handle_transmission_status(
3188 struct notification_client *client,
3189 enum client_transmission_status transmission_status,
ab0ee2ca 3190 struct notification_thread_state *state)
1831ae68
FD
3191{
3192 int ret = 0;
3193
3194 ASSERT_LOCKED(client->lock);
3195
3196 switch (transmission_status) {
3197 case CLIENT_TRANSMISSION_STATUS_COMPLETE:
3198 ret = lttng_poll_mod(&state->events, client->socket,
3199 CLIENT_POLL_MASK_IN);
3200 if (ret) {
3201 goto end;
3202 }
3203
3204 client->communication.outbound.queued_command_reply = false;
3205 client->communication.outbound.dropped_notification = false;
3206 break;
3207 case CLIENT_TRANSMISSION_STATUS_QUEUED:
3208 /*
3209 * We want to be notified whenever there is buffer space
3210 * available to send the rest of the payload.
3211 */
3212 ret = lttng_poll_mod(&state->events, client->socket,
3213 CLIENT_POLL_MASK_IN_OUT);
3214 if (ret) {
3215 goto end;
3216 }
3217 break;
3218 case CLIENT_TRANSMISSION_STATUS_FAIL:
3219 ret = notification_thread_client_disconnect(client, state);
3220 if (ret) {
3221 goto end;
3222 }
3223 break;
3224 case CLIENT_TRANSMISSION_STATUS_ERROR:
3225 ret = -1;
3226 goto end;
3227 default:
3228 abort();
3229 }
3230end:
3231 return ret;
3232}
3233
3234/* Client lock must be acquired by caller. */
3235static
3236enum client_transmission_status client_flush_outgoing_queue(
3237 struct notification_client *client)
ab0ee2ca
JG
3238{
3239 ssize_t ret;
3240 size_t to_send_count;
1831ae68
FD
3241 enum client_transmission_status status;
3242
3243 ASSERT_LOCKED(client->lock);
ab0ee2ca
JG
3244
3245 assert(client->communication.outbound.buffer.size != 0);
3246 to_send_count = client->communication.outbound.buffer.size;
3247 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
3248 client->socket);
3249
3250 ret = lttcomm_send_unix_sock_non_block(client->socket,
3251 client->communication.outbound.buffer.data,
3252 to_send_count);
3253 if ((ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) ||
3254 (ret > 0 && ret < to_send_count)) {
3255 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
3256 client->socket);
3257 to_send_count -= max(ret, 0);
3258
3259 memcpy(client->communication.outbound.buffer.data,
3260 client->communication.outbound.buffer.data +
3261 client->communication.outbound.buffer.size - to_send_count,
3262 to_send_count);
3263 ret = lttng_dynamic_buffer_set_size(
3264 &client->communication.outbound.buffer,
3265 to_send_count);
3266 if (ret) {
3267 goto error;
3268 }
1831ae68 3269 status = CLIENT_TRANSMISSION_STATUS_QUEUED;
ab0ee2ca
JG
3270 } else if (ret < 0) {
3271 /* Generic error, disconnect the client. */
1831ae68 3272 ERR("[notification-thread] Failed to flush outgoing queue, disconnecting client (socket fd = %i)",
ab0ee2ca 3273 client->socket);
1831ae68 3274 status = CLIENT_TRANSMISSION_STATUS_FAIL;
ab0ee2ca
JG
3275 } else {
3276 /* No error and flushed the queue completely. */
3277 ret = lttng_dynamic_buffer_set_size(
3278 &client->communication.outbound.buffer, 0);
3279 if (ret) {
3280 goto error;
3281 }
1831ae68 3282 status = CLIENT_TRANSMISSION_STATUS_COMPLETE;
ab0ee2ca
JG
3283 }
3284
1831ae68 3285 return status;
ab0ee2ca 3286error:
1831ae68 3287 return CLIENT_TRANSMISSION_STATUS_ERROR;
ab0ee2ca
JG
3288}
3289
1831ae68 3290/* Client lock must be acquired by caller. */
ab0ee2ca
JG
3291static
3292int client_send_command_reply(struct notification_client *client,
3293 struct notification_thread_state *state,
3294 enum lttng_notification_channel_status status)
3295{
3296 int ret;
3297 struct lttng_notification_channel_command_reply reply = {
3298 .status = (int8_t) status,
3299 };
3300 struct lttng_notification_channel_message msg = {
3301 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
3302 .size = sizeof(reply),
3303 };
3304 char buffer[sizeof(msg) + sizeof(reply)];
1831ae68
FD
3305 enum client_transmission_status transmission_status;
3306
3307 ASSERT_LOCKED(client->lock);
ab0ee2ca
JG
3308
3309 if (client->communication.outbound.queued_command_reply) {
3310 /* Protocol error. */
3311 goto error;
3312 }
3313
3314 memcpy(buffer, &msg, sizeof(msg));
3315 memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
3316 DBG("[notification-thread] Send command reply (%i)", (int) status);
3317
3318 /* Enqueue buffer to outgoing queue and flush it. */
3319 ret = lttng_dynamic_buffer_append(
3320 &client->communication.outbound.buffer,
3321 buffer, sizeof(buffer));
3322 if (ret) {
1831ae68
FD
3323 goto error;
3324 }
3325
3326 transmission_status = client_flush_outgoing_queue(client);
3327 ret = client_handle_transmission_status(
3328 client, transmission_status, state);
3329 if (ret) {
3330 goto error;
3331 }
3332
3333 if (client->communication.outbound.buffer.size != 0) {
3334 /* Queue could not be emptied. */
3335 client->communication.outbound.queued_command_reply = true;
3336 }
3337
3338 return 0;
3339error:
3340 return -1;
3341}
3342
3343static
3344int client_handle_message_unknown(struct notification_client *client,
3345 struct notification_thread_state *state)
3346{
3347 int ret;
3348
3349 pthread_mutex_lock(&client->lock);
3350
3351 /*
3352 * Receiving message header. The function will be called again
3353 * once the rest of the message as been received and can be
3354 * interpreted.
3355 */
3356 const struct lttng_notification_channel_message *msg;
3357
3358 assert(sizeof(*msg) == client->communication.inbound.buffer.size);
3359 msg = (const struct lttng_notification_channel_message *)
3360 client->communication.inbound.buffer.data;
3361
3362 if (msg->size == 0 ||
3363 msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
3364 ERR("[notification-thread] Invalid notification channel message: length = %u",
3365 msg->size);
3366 ret = -1;
3367 goto end;
3368 }
3369
3370 switch (msg->type) {
3371 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
3372 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
3373 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
3374 break;
3375 default:
3376 ret = -1;
3377 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
3378 goto end;
3379 }
3380
3381 client->communication.inbound.bytes_to_receive = msg->size;
3382 client->communication.inbound.msg_type =
3383 (enum lttng_notification_channel_message_type) msg->type;
3384 ret = lttng_dynamic_buffer_set_size(
3385 &client->communication.inbound.buffer, msg->size);
3386end:
3387 pthread_mutex_unlock(&client->lock);
3388 return ret;
3389}
3390
3391static
3392int client_handle_message_handshake(struct notification_client *client,
3393 struct notification_thread_state *state)
3394{
3395 int ret;
3396 struct lttng_notification_channel_command_handshake *handshake_client;
3397 const struct lttng_notification_channel_command_handshake handshake_reply = {
3398 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
3399 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
3400 };
3401 const struct lttng_notification_channel_message msg_header = {
3402 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
3403 .size = sizeof(handshake_reply),
3404 };
3405 enum lttng_notification_channel_status status =
3406 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
3407 char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
3408 enum client_transmission_status transmission_status;
3409
3410 pthread_mutex_lock(&client->lock);
3411
3412 memcpy(send_buffer, &msg_header, sizeof(msg_header));
3413 memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
3414 sizeof(handshake_reply));
3415
3416 handshake_client =
3417 (struct lttng_notification_channel_command_handshake *)
3418 client->communication.inbound.buffer
3419 .data;
3420 client->major = handshake_client->major;
3421 client->minor = handshake_client->minor;
3422 if (!client->communication.inbound.creds_received) {
3423 ERR("[notification-thread] No credentials received from client");
3424 ret = -1;
3425 goto end;
3426 }
3427
3428 client->uid = LTTNG_SOCK_GET_UID_CRED(
3429 &client->communication.inbound.creds);
3430 client->gid = LTTNG_SOCK_GET_GID_CRED(
3431 &client->communication.inbound.creds);
3432 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
3433 client->uid, client->gid, (int) client->major,
3434 (int) client->minor);
3435
3436 if (handshake_client->major !=
3437 LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
3438 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
3439 }
3440
3441 ret = lttng_dynamic_buffer_append(
3442 &client->communication.outbound.buffer, send_buffer,
3443 sizeof(send_buffer));
3444 if (ret) {
3445 ERR("[notification-thread] Failed to send protocol version to notification channel client");
3446 goto end;
3447 }
3448
3449 transmission_status = client_flush_outgoing_queue(client);
3450 ret = client_handle_transmission_status(
3451 client, transmission_status, state);
3452 if (ret) {
3453 goto end;
3454 }
3455
3456 ret = client_send_command_reply(client, state, status);
3457 if (ret) {
3458 ERR("[notification-thread] Failed to send reply to notification channel client");
3459 goto end;
3460 }
3461
3462 /* Set reception state to receive the next message header. */
3463 ret = client_reset_inbound_state(client);
3464 if (ret) {
3465 ERR("[notification-thread] Failed to reset client communication's inbound state");
3466 goto end;
3467 }
3468 client->validated = true;
3469 client->communication.active = true;
3470
3471end:
3472 pthread_mutex_unlock(&client->lock);
3473 return ret;
3474}
3475
3476static
3477int client_handle_message_subscription(
3478 struct notification_client *client,
3479 enum lttng_notification_channel_message_type msg_type,
3480 struct notification_thread_state *state)
3481{
3482 int ret;
3483 struct lttng_condition *condition;
3484 enum lttng_notification_channel_status status =
3485 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
3486 const struct lttng_buffer_view condition_view =
3487 lttng_buffer_view_from_dynamic_buffer(
3488 &client->communication.inbound.buffer,
3489 0, -1);
3490 size_t expected_condition_size;
3491
3492 pthread_mutex_lock(&client->lock);
3493 expected_condition_size = client->communication.inbound.buffer.size;
3494 pthread_mutex_unlock(&client->lock);
3495
3496 ret = lttng_condition_create_from_buffer(&condition_view, &condition);
3497 if (ret != expected_condition_size) {
3498 ERR("[notification-thread] Malformed condition received from client");
3499 goto end;
3500 }
3501
3502 if (msg_type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
3503 ret = notification_thread_client_subscribe(
3504 client, condition, state, &status);
3505 } else {
3506 ret = notification_thread_client_unsubscribe(
3507 client, condition, state, &status);
3508 }
3509 if (ret) {
3510 goto end;
ab0ee2ca
JG
3511 }
3512
1831ae68
FD
3513 pthread_mutex_lock(&client->lock);
3514 ret = client_send_command_reply(client, state, status);
ab0ee2ca 3515 if (ret) {
1831ae68
FD
3516 ERR("[notification-thread] Failed to send reply to notification channel client");
3517 goto end_unlock;
ab0ee2ca
JG
3518 }
3519
1831ae68
FD
3520 /* Set reception state to receive the next message header. */
3521 ret = client_reset_inbound_state(client);
3522 if (ret) {
3523 ERR("[notification-thread] Failed to reset client communication's inbound state");
3524 goto end_unlock;
ab0ee2ca
JG
3525 }
3526
1831ae68
FD
3527end_unlock:
3528 pthread_mutex_unlock(&client->lock);
3529end:
3530 return ret;
ab0ee2ca
JG
3531}
3532
3533static
3534int client_dispatch_message(struct notification_client *client,
3535 struct notification_thread_state *state)
3536{
3537 int ret = 0;
3538
3539 if (client->communication.inbound.msg_type !=
3540 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE &&
3541 client->communication.inbound.msg_type !=
3542 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN &&
3543 !client->validated) {
3544 WARN("[notification-thread] client attempted a command before handshake");
3545 ret = -1;
3546 goto end;
3547 }
3548
3549 switch (client->communication.inbound.msg_type) {
3550 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN:
3551 {
1831ae68 3552 ret = client_handle_message_unknown(client, state);
ab0ee2ca
JG
3553 break;
3554 }
3555 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
3556 {
1831ae68 3557 ret = client_handle_message_handshake(client, state);
ab0ee2ca
JG
3558 break;
3559 }
3560 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
3561 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
3562 {
1831ae68
FD
3563 ret = client_handle_message_subscription(client,
3564 client->communication.inbound.msg_type, state);
ab0ee2ca
JG
3565 break;
3566 }
3567 default:
3568 abort();
3569 }
3570end:
3571 return ret;
3572}
3573
3574/* Incoming data from client. */
3575int handle_notification_thread_client_in(
3576 struct notification_thread_state *state, int socket)
3577{
14fa22f8 3578 int ret = 0;
ab0ee2ca
JG
3579 struct notification_client *client;
3580 ssize_t recv_ret;
3581 size_t offset;
1831ae68 3582 bool message_is_complete = false;
ab0ee2ca
JG
3583
3584 client = get_client_from_socket(socket, state);
3585 if (!client) {
3586 /* Internal error, abort. */
3587 ret = -1;
3588 goto end;
3589 }
3590
1831ae68 3591 pthread_mutex_lock(&client->lock);
14fa22f8
JG
3592 offset = client->communication.inbound.buffer.size -
3593 client->communication.inbound.bytes_to_receive;
01ea340e 3594 if (client->communication.inbound.expect_creds) {
ab0ee2ca
JG
3595 recv_ret = lttcomm_recv_creds_unix_sock(socket,
3596 client->communication.inbound.buffer.data + offset,
3597 client->communication.inbound.bytes_to_receive,
3598 &client->communication.inbound.creds);
3599 if (recv_ret > 0) {
01ea340e 3600 client->communication.inbound.expect_creds = false;
ab0ee2ca
JG
3601 client->communication.inbound.creds_received = true;
3602 }
3603 } else {
3604 recv_ret = lttcomm_recv_unix_sock_non_block(socket,
3605 client->communication.inbound.buffer.data + offset,
3606 client->communication.inbound.bytes_to_receive);
3607 }
1831ae68
FD
3608 if (recv_ret >= 0) {
3609 client->communication.inbound.bytes_to_receive -= recv_ret;
3610 message_is_complete = client->communication.inbound
3611 .bytes_to_receive == 0;
3612 }
3613 pthread_mutex_unlock(&client->lock);
ab0ee2ca
JG
3614 if (recv_ret < 0) {
3615 goto error_disconnect_client;
3616 }
3617
1831ae68 3618 if (message_is_complete) {
ab0ee2ca
JG
3619 ret = client_dispatch_message(client, state);
3620 if (ret) {
3621 /*
3622 * Only returns an error if this client must be
3623 * disconnected.
3624 */
3625 goto error_disconnect_client;
3626 }
ab0ee2ca
JG
3627 }
3628end:
3629 return ret;
3630error_disconnect_client:
1831ae68 3631 ret = notification_thread_client_disconnect(client, state);
ab0ee2ca
JG
3632 return ret;
3633}
3634
3635/* Client ready to receive outgoing data. */
3636int handle_notification_thread_client_out(
3637 struct notification_thread_state *state, int socket)
3638{
3639 int ret;
3640 struct notification_client *client;
1831ae68 3641 enum client_transmission_status transmission_status;
ab0ee2ca
JG
3642
3643 client = get_client_from_socket(socket, state);
3644 if (!client) {
3645 /* Internal error, abort. */
3646 ret = -1;
3647 goto end;
3648 }
3649
1831ae68
FD
3650 pthread_mutex_lock(&client->lock);
3651 transmission_status = client_flush_outgoing_queue(client);
3652 ret = client_handle_transmission_status(
3653 client, transmission_status, state);
3654 pthread_mutex_unlock(&client->lock);
ab0ee2ca
JG
3655 if (ret) {
3656 goto end;
3657 }
3658end:
3659 return ret;
3660}
3661
3662static
e8360425
JD
3663bool evaluate_buffer_usage_condition(const struct lttng_condition *condition,
3664 const struct channel_state_sample *sample,
3665 uint64_t buffer_capacity)
ab0ee2ca
JG
3666{
3667 bool result = false;
3668 uint64_t threshold;
3669 enum lttng_condition_type condition_type;
e8360425 3670 const struct lttng_condition_buffer_usage *use_condition = container_of(
ab0ee2ca
JG
3671 condition, struct lttng_condition_buffer_usage,
3672 parent);
3673
ab0ee2ca
JG
3674 if (use_condition->threshold_bytes.set) {
3675 threshold = use_condition->threshold_bytes.value;
3676 } else {
3677 /*
3678 * Threshold was expressed as a ratio.
3679 *
3680 * TODO the threshold (in bytes) of conditions expressed
3681 * as a ratio of total buffer size could be cached to
3682 * forego this double-multiplication or it could be performed
3683 * as fixed-point math.
3684 *
3685 * Note that caching should accomodate the case where the
3686 * condition applies to multiple channels (i.e. don't assume
3687 * that all channels matching my_chann* have the same size...)
3688 */
3689 threshold = (uint64_t) (use_condition->threshold_ratio.value *
3690 (double) buffer_capacity);
3691 }
3692
3693 condition_type = lttng_condition_get_type(condition);
3694 if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
3695 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
3696 threshold, sample->highest_usage);
3697
3698 /*
3699 * The low condition should only be triggered once _all_ of the
3700 * streams in a channel have gone below the "low" threshold.
3701 */
3702 if (sample->highest_usage <= threshold) {
3703 result = true;
3704 }
3705 } else {
3706 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
3707 threshold, sample->highest_usage);
3708
3709 /*
3710 * For high buffer usage scenarios, we want to trigger whenever
3711 * _any_ of the streams has reached the "high" threshold.
3712 */
3713 if (sample->highest_usage >= threshold) {
3714 result = true;
3715 }
3716 }
e8360425 3717
ab0ee2ca
JG
3718 return result;
3719}
3720
3721static
e8360425
JD
3722bool evaluate_session_consumed_size_condition(
3723 const struct lttng_condition *condition,
3724 uint64_t session_consumed_size)
3725{
3726 uint64_t threshold;
3727 const struct lttng_condition_session_consumed_size *size_condition =
3728 container_of(condition,
3729 struct lttng_condition_session_consumed_size,
3730 parent);
3731
3732 threshold = size_condition->consumed_threshold_bytes.value;
3733 DBG("[notification-thread] Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
3734 threshold, session_consumed_size);
3735 return session_consumed_size >= threshold;
3736}
3737
3738static
ed327204 3739int evaluate_buffer_condition(const struct lttng_condition *condition,
ab0ee2ca 3740 struct lttng_evaluation **evaluation,
e8360425
JD
3741 const struct notification_thread_state *state,
3742 const struct channel_state_sample *previous_sample,
3743 const struct channel_state_sample *latest_sample,
3744 uint64_t previous_session_consumed_total,
3745 uint64_t latest_session_consumed_total,
3746 struct channel_info *channel_info)
ab0ee2ca
JG
3747{
3748 int ret = 0;
3749 enum lttng_condition_type condition_type;
e8360425
JD
3750 const bool previous_sample_available = !!previous_sample;
3751 bool previous_sample_result = false;
ab0ee2ca
JG
3752 bool latest_sample_result;
3753
3754 condition_type = lttng_condition_get_type(condition);
ab0ee2ca 3755
e8360425
JD
3756 switch (condition_type) {
3757 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
3758 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
3759 if (caa_likely(previous_sample_available)) {
3760 previous_sample_result =
3761 evaluate_buffer_usage_condition(condition,
3762 previous_sample, channel_info->capacity);
3763 }
3764 latest_sample_result = evaluate_buffer_usage_condition(
3765 condition, latest_sample,
3766 channel_info->capacity);
3767 break;
3768 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
3769 if (caa_likely(previous_sample_available)) {
3770 previous_sample_result =
3771 evaluate_session_consumed_size_condition(
3772 condition,
3773 previous_session_consumed_total);
3774 }
3775 latest_sample_result =
3776 evaluate_session_consumed_size_condition(
3777 condition,
3778 latest_session_consumed_total);
3779 break;
3780 default:
3781 /* Unknown condition type; internal error. */
3782 abort();
3783 }
ab0ee2ca
JG
3784
3785 if (!latest_sample_result ||
3786 (previous_sample_result == latest_sample_result)) {
3787 /*
3788 * Only trigger on a condition evaluation transition.
3789 *
3790 * NOTE: This edge-triggered logic may not be appropriate for
3791 * future condition types.
3792 */
3793 goto end;
3794 }
3795
e8360425
JD
3796 if (!evaluation || !latest_sample_result) {
3797 goto end;
3798 }
3799
3800 switch (condition_type) {
3801 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
3802 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
ab0ee2ca
JG
3803 *evaluation = lttng_evaluation_buffer_usage_create(
3804 condition_type,
3805 latest_sample->highest_usage,
e8360425
JD
3806 channel_info->capacity);
3807 break;
3808 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
3809 *evaluation = lttng_evaluation_session_consumed_size_create(
e8360425
JD
3810 latest_session_consumed_total);
3811 break;
3812 default:
3813 abort();
3814 }
3815
3816 if (!*evaluation) {
3817 ret = -1;
3818 goto end;
ab0ee2ca
JG
3819 }
3820end:
3821 return ret;
3822}
3823
3824static
1831ae68 3825int client_notification_overflow(struct notification_client *client)
ab0ee2ca 3826{
1831ae68
FD
3827 int ret = 0;
3828 const struct lttng_notification_channel_message msg = {
ab0ee2ca 3829 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
ab0ee2ca
JG
3830 };
3831
1831ae68
FD
3832 ASSERT_LOCKED(client->lock);
3833
3834 DBG("Dropping notification addressed to client (socket fd = %i)",
3835 client->socket);
3836 if (client->communication.outbound.dropped_notification) {
3837 /*
3838 * The client already has a "notification dropped" message
3839 * in its outgoing queue. Nothing to do since all
3840 * of those messages are coalesced.
3841 */
3842 goto end;
3843 }
3844
3845 client->communication.outbound.dropped_notification = true;
ab0ee2ca
JG
3846 ret = lttng_dynamic_buffer_append(
3847 &client->communication.outbound.buffer, &msg,
3848 sizeof(msg));
1831ae68
FD
3849 if (ret) {
3850 PERROR("Failed to enqueue \"dropped notification\" message in client's (socket fd = %i) outgoing queue",
3851 client->socket);
3852 }
3853end:
ab0ee2ca
JG
3854 return ret;
3855}
3856
1831ae68
FD
3857static int client_handle_transmission_status_wrapper(
3858 struct notification_client *client,
3859 enum client_transmission_status status,
3860 void *user_data)
3861{
3862 return client_handle_transmission_status(client, status,
3863 (struct notification_thread_state *) user_data);
3864}
3865
ab0ee2ca 3866static
9b63a4aa
JG
3867int send_evaluation_to_clients(const struct lttng_trigger *trigger,
3868 const struct lttng_evaluation *evaluation,
ab0ee2ca
JG
3869 struct notification_client_list* client_list,
3870 struct notification_thread_state *state,
1831ae68
FD
3871 uid_t object_uid, gid_t object_gid)
3872{
3873 return notification_client_list_send_evaluation(client_list,
3874 lttng_trigger_get_const_condition(trigger), evaluation,
3875 lttng_trigger_get_credentials(trigger),
3876 &(struct lttng_credentials){
3877 .uid = object_uid, .gid = object_gid},
3878 client_handle_transmission_status_wrapper, state);
3879}
3880
3881LTTNG_HIDDEN
3882int notification_client_list_send_evaluation(
3883 struct notification_client_list *client_list,
3884 const struct lttng_condition *condition,
3885 const struct lttng_evaluation *evaluation,
3886 const struct lttng_credentials *trigger_creds,
3887 const struct lttng_credentials *source_object_creds,
3888 report_client_transmission_result_cb client_report,
3889 void *user_data)
ab0ee2ca
JG
3890{
3891 int ret = 0;
3892 struct lttng_dynamic_buffer msg_buffer;
3893 struct notification_client_list_element *client_list_element, *tmp;
9b63a4aa 3894 const struct lttng_notification notification = {
1831ae68 3895 .condition = (struct lttng_condition *) condition,
9b63a4aa
JG
3896 .evaluation = (struct lttng_evaluation *) evaluation,
3897 };
3647288f
JG
3898 struct lttng_notification_channel_message msg_header = {
3899 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION,
3900 };
ab0ee2ca
JG
3901
3902 lttng_dynamic_buffer_init(&msg_buffer);
3903
3647288f
JG
3904 ret = lttng_dynamic_buffer_append(&msg_buffer, &msg_header,
3905 sizeof(msg_header));
ab0ee2ca
JG
3906 if (ret) {
3907 goto end;
3908 }
3909
9b63a4aa 3910 ret = lttng_notification_serialize(&notification, &msg_buffer);
ab0ee2ca 3911 if (ret) {
ab0ee2ca
JG
3912 ERR("[notification-thread] Failed to serialize notification");
3913 ret = -1;
3914 goto end;
3915 }
3916
3647288f
JG
3917 /* Update payload size. */
3918 ((struct lttng_notification_channel_message * ) msg_buffer.data)->size =
3919 (uint32_t) (msg_buffer.size - sizeof(msg_header));
3920
1831ae68 3921 pthread_mutex_lock(&client_list->lock);
ab0ee2ca
JG
3922 cds_list_for_each_entry_safe(client_list_element, tmp,
3923 &client_list->list, node) {
1831ae68 3924 enum client_transmission_status transmission_status;
ab0ee2ca
JG
3925 struct notification_client *client =
3926 client_list_element->client;
3927
1831ae68
FD
3928 ret = 0;
3929 pthread_mutex_lock(&client->lock);
3930 if (source_object_creds) {
3931 if (client->uid != source_object_creds->uid &&
3932 client->gid != source_object_creds->gid &&
3933 client->uid != 0) {
3934 /*
3935 * Client is not allowed to monitor this
3936 * object.
3937 */
3938 DBG("[notification-thread] Skipping client at it does not have the object permission to receive notification for this trigger");
3939 goto unlock_client;
3940 }
3941 }
3942
3943 /* TODO: what is the behavior for root client on non root
3944 * trigger? Since multiple triggers (different user) can have the same condition
3945 * but with different action group that can have each a notify.
3946 * Does the root client receive multiple notification for all
3947 * those triggers with the same condition or only notification
3948 * for triggers the root user configured?
3949 * For now we do the later. All users including the root user
3950 * can only receive notification from trigger it registered.
3951 */
3952 if (client->uid != trigger_creds->uid && client->gid != trigger_creds->gid) {
3953 DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this trigger");
3954 goto unlock_client;
ab0ee2ca
JG
3955 }
3956
3957 DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
3958 client->socket, msg_buffer.size);
3959 if (client->communication.outbound.buffer.size) {
3960 /*
3961 * Outgoing data is already buffered for this client;
3962 * drop the notification and enqueue a "dropped
3963 * notification" message if this is the first dropped
3964 * notification since the socket spilled-over to the
3965 * queue.
3966 */
1831ae68
FD
3967 ret = client_notification_overflow(client);
3968 if (ret) {
3969 goto unlock_client;
ab0ee2ca 3970 }
ab0ee2ca
JG
3971 }
3972
3973 ret = lttng_dynamic_buffer_append_buffer(
3974 &client->communication.outbound.buffer,
3975 &msg_buffer);
3976 if (ret) {
1831ae68 3977 goto unlock_client;
ab0ee2ca
JG
3978 }
3979
1831ae68
FD
3980 transmission_status = client_flush_outgoing_queue(client);
3981 ret = client_report(client, transmission_status, user_data);
ab0ee2ca 3982 if (ret) {
1831ae68
FD
3983 goto unlock_client;
3984 }
3985unlock_client:
3986 pthread_mutex_unlock(&client->lock);
3987 if (ret) {
3988 goto end_unlock_list;
ab0ee2ca
JG
3989 }
3990 }
3991 ret = 0;
1831ae68
FD
3992
3993end_unlock_list:
3994 pthread_mutex_unlock(&client_list->lock);
ab0ee2ca 3995end:
ab0ee2ca
JG
3996 lttng_dynamic_buffer_reset(&msg_buffer);
3997 return ret;
3998}
3999
1831ae68
FD
4000int perform_event_action_notify(struct notification_thread_state *state,
4001 const struct lttng_trigger *trigger,
4002 const struct lttng_trigger_notification *notification,
4003 const struct lttng_action *action)
4004{
4005 int ret;
4006 struct notification_client_list *client_list;
4007 struct lttng_evaluation *evaluation = NULL;
4008 const struct lttng_credentials *creds = lttng_trigger_get_credentials(trigger);
4009
4010 /*
4011 * Check if any client is subscribed to the result of this
4012 * evaluation.
4013 */
4014 client_list = get_client_list_from_condition(state, trigger->condition);
4015 assert(client_list);
4016 if (cds_list_empty(&client_list->list)) {
4017 ret = 0;
4018 goto end;
4019 }
4020
4021 evaluation = lttng_evaluation_event_rule_create(trigger->name);
4022 if (!evaluation) {
4023 ERR("Failed to create event rule hit evaluation");
4024 ret = -1;
4025 goto end;
4026 }
4027
4028 /* Dispatch evaluation result to all clients.
4029 * Note that here the passed credentials are the one from the trigger,
4030 * this is because there is no internal object binded to the trigger per
4031 * see and the credential validation is done at the registration level
4032 * for the event rule based trigger. For a channel the credential
4033 * validation can only be done on notify since the trigger can be
4034 * registered before the channel/session creation.
4035 */
4036 ret = send_evaluation_to_clients(trigger,
4037 evaluation, client_list, state,
4038 creds->uid,
4039 creds->gid);
4040 lttng_evaluation_destroy(evaluation);
4041 if (caa_unlikely(ret)) {
4042 goto end;
4043 }
4044end:
4045 return ret;
4046
4047}
4048
4049/* This can be called recursively, pass NULL for action on the first iteration */
4050int perform_event_action(struct notification_thread_state *state,
4051 const struct lttng_trigger *trigger,
4052 const struct lttng_trigger_notification *notification,
4053 const struct lttng_action *action)
4054{
4055 int ret = 0;
4056 enum lttng_action_type action_type;
4057
4058 assert(trigger);
4059
4060 if (!action) {
4061 action = lttng_trigger_get_const_action(trigger);
4062 }
4063
4064 action_type = lttng_action_get_type_const(action);
4065 DBG("Handling action %s for trigger id %s (%" PRIu64 ")",
4066 lttng_action_type_string(action_type), trigger->name,
4067 trigger->key.value);
4068
4069 switch (action_type) {
4070 case LTTNG_ACTION_TYPE_GROUP:
4071 {
4072 /* Recurse into the group */
4073 const struct lttng_action *tmp = NULL;
4074 unsigned int count = 0;
4075 (void) lttng_action_group_get_count(action, &count);
4076 for (int i = 0; i < count; i++) {
4077 tmp = lttng_action_group_get_at_index_const(action, i);
4078 assert(tmp);
4079 ret = perform_event_action(state, trigger, notification, tmp);
4080 if (ret < 0) {
4081 goto end;
4082 }
4083 }
4084 break;
4085 }
4086 case LTTNG_ACTION_TYPE_NOTIFY:
4087 {
4088 ret = perform_event_action_notify(state, trigger, notification, action);
4089 break;
4090 }
4091 default:
4092 break;
4093 }
4094end:
4095 return ret;
4096}
4097
4098int handle_notification_thread_event(struct notification_thread_state *state,
4099 int pipe,
4100 enum lttng_domain_type domain)
4101{
4102 int ret;
4103 struct lttng_ust_trigger_notification ust_notification;
4104 uint64_t kernel_notification;
4105 struct cds_lfht_node *node;
4106 struct cds_lfht_iter iter;
4107 struct notification_trigger_tokens_ht_element *element;
4108 struct lttng_trigger_notification notification;
4109 void *reception_buffer;
4110 size_t reception_size;
4111 enum action_executor_status executor_status;
4112 struct notification_client_list *client_list = NULL;
4113
4114 notification.type = domain;
4115
4116 switch(domain) {
4117 case LTTNG_DOMAIN_UST:
4118 reception_buffer = (void *) &ust_notification;
4119 reception_size = sizeof(ust_notification);
4120 notification.u.ust = &ust_notification;
4121 break;
4122 case LTTNG_DOMAIN_KERNEL:
4123 reception_buffer = (void *) &kernel_notification;
4124 reception_size = sizeof(kernel_notification);
4125 notification.u.kernel = &kernel_notification;
4126 break;
4127 default:
4128 assert(0);
4129 }
4130
4131 /*
4132 * The monitoring pipe only holds messages smaller than PIPE_BUF,
4133 * ensuring that read/write of sampling messages are atomic.
4134 */
4135 /* TODO: should we read as much as we can ? EWOULDBLOCK? */
4136
4137 ret = lttng_read(pipe, reception_buffer, reception_size);
4138 if (ret != reception_size) {
4139 ERR("[notification-thread] Failed to read from event source pipe (fd = %i)",
4140 pipe);
4141 /* TODO: Should this error out completly.
4142 * This can happen when an app is killed as of today
4143 * ret = -1 cause the whole thread to die and fuck up
4144 * everything.
4145 */
4146 goto end;
4147 }
4148
4149 switch(domain) {
4150 case LTTNG_DOMAIN_UST:
4151 notification.id = ust_notification.id;
4152 break;
4153 case LTTNG_DOMAIN_KERNEL:
4154 notification.id = kernel_notification;
4155 break;
4156 default:
4157 assert(0);
4158 }
4159
4160 /* Find triggers associated with this token. */
4161 rcu_read_lock();
4162 cds_lfht_lookup(state->trigger_tokens_ht,
4163 hash_key_u64(&notification.id, lttng_ht_seed), match_trigger_token,
4164 &notification.id, &iter);
4165 node = cds_lfht_iter_get_node(&iter);
4166 if (caa_unlikely(!node)) {
4167 /* TODO: is this an error? This might happen if the receive side
4168 * is slow to process event from source and that the trigger was
4169 * removed but the app still kicking. This yield another
4170 * question on the trigger lifetime and when we can remove a
4171 * trigger. How to guarantee that all event with the token idea
4172 * have be processed? Do we want to provide this guarantee?
4173 *
4174 * Update: I have encountered this when using a trigger on
4175 * sched_switch and then removing it. The frequency is quite
4176 * high hence we en up exactly in the mentionned scenario.
4177 * AFAIK this might be the best way to handle this.
4178 */
4179 ret = 0;
4180 goto end_unlock;
4181 }
4182 element = caa_container_of(node,
4183 struct notification_trigger_tokens_ht_element,
4184 node);
4185
4186 if (!lttng_trigger_is_ready_to_fire(element->trigger)) {
4187 ret = 0;
4188 goto end_unlock;
4189 }
4190
4191 client_list = get_client_list_from_condition(state,
4192 lttng_trigger_get_const_condition(element->trigger));
4193 executor_status = action_executor_enqueue(
4194 state->executor, element->trigger, client_list);
4195 switch (executor_status) {
4196 case ACTION_EXECUTOR_STATUS_OK:
4197 ret = 0;
4198 break;
4199 case ACTION_EXECUTOR_STATUS_OVERFLOW:
4200 {
4201 struct notification_client_list_element *client_list_element,
4202 *tmp;
4203
4204 /*
4205 * Not a fatal error; this is expected and simply means the
4206 * executor has too much work queued already.
4207 */
4208 ret = 0;
4209
4210 if (!client_list) {
4211 break;
4212 }
4213
4214 /* Warn clients that a notification (or more) was dropped. */
4215 pthread_mutex_lock(&client_list->lock);
4216 cds_list_for_each_entry_safe(client_list_element, tmp,
4217 &client_list->list, node) {
4218 enum client_transmission_status transmission_status;
4219 struct notification_client *client =
4220 client_list_element->client;
4221
4222 pthread_mutex_lock(&client->lock);
4223 ret = client_notification_overflow(client);
4224 if (ret) {
4225 /* Fatal error. */
4226 goto next_client;
4227 }
4228
4229 transmission_status =
4230 client_flush_outgoing_queue(client);
4231 ret = client_handle_transmission_status(
4232 client, transmission_status, state);
4233 if (ret) {
4234 /* Fatal error. */
4235 goto next_client;
4236 }
4237next_client:
4238 pthread_mutex_unlock(&client->lock);
4239 if (ret) {
4240 break;
4241 }
4242 }
4243 pthread_mutex_lock(&client_list->lock);
4244 break;
4245 }
4246 case ACTION_EXECUTOR_STATUS_ERROR:
4247 /* Fatal error, shut down everything. */
4248 ERR("Fatal error encoutered while enqueuing action");
4249 ret = -1;
4250 goto end_unlock;
4251 default:
4252 /* Unhandled error. */
4253 abort();
4254 }
4255
4256end_unlock:
4257 notification_client_list_put(client_list);
4258 rcu_read_unlock();
4259end:
4260 return ret;
4261}
4262
ab0ee2ca
JG
4263int handle_notification_thread_channel_sample(
4264 struct notification_thread_state *state, int pipe,
4265 enum lttng_domain_type domain)
4266{
4267 int ret = 0;
4268 struct lttcomm_consumer_channel_monitor_msg sample_msg;
ab0ee2ca
JG
4269 struct channel_info *channel_info;
4270 struct cds_lfht_node *node;
4271 struct cds_lfht_iter iter;
4272 struct lttng_channel_trigger_list *trigger_list;
4273 struct lttng_trigger_list_element *trigger_list_element;
4274 bool previous_sample_available = false;
e8360425
JD
4275 struct channel_state_sample previous_sample, latest_sample;
4276 uint64_t previous_session_consumed_total, latest_session_consumed_total;
ab0ee2ca
JG
4277
4278 /*
4279 * The monitoring pipe only holds messages smaller than PIPE_BUF,
4280 * ensuring that read/write of sampling messages are atomic.
4281 */
7c2551ef 4282 ret = lttng_read(pipe, &sample_msg, sizeof(sample_msg));
ab0ee2ca
JG
4283 if (ret != sizeof(sample_msg)) {
4284 ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
4285 pipe);
4286 ret = -1;
4287 goto end;
4288 }
4289
4290 ret = 0;
4291 latest_sample.key.key = sample_msg.key;
4292 latest_sample.key.domain = domain;
4293 latest_sample.highest_usage = sample_msg.highest;
4294 latest_sample.lowest_usage = sample_msg.lowest;
e8360425 4295 latest_sample.channel_total_consumed = sample_msg.total_consumed;
ab0ee2ca
JG
4296
4297 rcu_read_lock();
4298
4299 /* Retrieve the channel's informations */
4300 cds_lfht_lookup(state->channels_ht,
4301 hash_channel_key(&latest_sample.key),
4302 match_channel_info,
4303 &latest_sample.key,
4304 &iter);
4305 node = cds_lfht_iter_get_node(&iter);
e0b5f87b 4306 if (caa_unlikely(!node)) {
ab0ee2ca
JG
4307 /*
4308 * Not an error since the consumer can push a sample to the pipe
4309 * and the rest of the session daemon could notify us of the
4310 * channel's destruction before we get a chance to process that
4311 * sample.
4312 */
4313 DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
4314 latest_sample.key.key,
4315 domain == LTTNG_DOMAIN_KERNEL ? "kernel" :
4316 "user space");
4317 goto end_unlock;
4318 }
4319 channel_info = caa_container_of(node, struct channel_info,
4320 channels_ht_node);
e8360425 4321 DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64", total consumed = %" PRIu64")",
8abe313a 4322 channel_info->name,
ab0ee2ca 4323 latest_sample.key.key,
8abe313a 4324 channel_info->session_info->name,
ab0ee2ca 4325 latest_sample.highest_usage,
e8360425
JD
4326 latest_sample.lowest_usage,
4327 latest_sample.channel_total_consumed);
4328
4329 previous_session_consumed_total =
4330 channel_info->session_info->consumed_data_size;
ab0ee2ca
JG
4331
4332 /* Retrieve the channel's last sample, if it exists, and update it. */
4333 cds_lfht_lookup(state->channel_state_ht,
4334 hash_channel_key(&latest_sample.key),
4335 match_channel_state_sample,
4336 &latest_sample.key,
4337 &iter);
4338 node = cds_lfht_iter_get_node(&iter);
e0b5f87b 4339 if (caa_likely(node)) {
ab0ee2ca
JG
4340 struct channel_state_sample *stored_sample;
4341
4342 /* Update the sample stored. */
4343 stored_sample = caa_container_of(node,
4344 struct channel_state_sample,
4345 channel_state_ht_node);
e8360425 4346
ab0ee2ca
JG
4347 memcpy(&previous_sample, stored_sample,
4348 sizeof(previous_sample));
4349 stored_sample->highest_usage = latest_sample.highest_usage;
4350 stored_sample->lowest_usage = latest_sample.lowest_usage;
0f0479d7 4351 stored_sample->channel_total_consumed = latest_sample.channel_total_consumed;
ab0ee2ca 4352 previous_sample_available = true;
e8360425
JD
4353
4354 latest_session_consumed_total =
4355 previous_session_consumed_total +
4356 (latest_sample.channel_total_consumed - previous_sample.channel_total_consumed);
ab0ee2ca
JG
4357 } else {
4358 /*
4359 * This is the channel's first sample, allocate space for and
4360 * store the new sample.
4361 */
4362 struct channel_state_sample *stored_sample;
4363
4364 stored_sample = zmalloc(sizeof(*stored_sample));
4365 if (!stored_sample) {
4366 ret = -1;
4367 goto end_unlock;
4368 }
4369
4370 memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
4371 cds_lfht_node_init(&stored_sample->channel_state_ht_node);
4372 cds_lfht_add(state->channel_state_ht,
4373 hash_channel_key(&stored_sample->key),
4374 &stored_sample->channel_state_ht_node);
e8360425
JD
4375
4376 latest_session_consumed_total =
4377 previous_session_consumed_total +
4378 latest_sample.channel_total_consumed;
ab0ee2ca
JG
4379 }
4380
e8360425
JD
4381 channel_info->session_info->consumed_data_size =
4382 latest_session_consumed_total;
4383
ab0ee2ca
JG
4384 /* Find triggers associated with this channel. */
4385 cds_lfht_lookup(state->channel_triggers_ht,
4386 hash_channel_key(&latest_sample.key),
4387 match_channel_trigger_list,
4388 &latest_sample.key,
4389 &iter);
4390 node = cds_lfht_iter_get_node(&iter);
e0b5f87b 4391 if (caa_likely(!node)) {
ab0ee2ca
JG
4392 goto end_unlock;
4393 }
4394
4395 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
4396 channel_triggers_ht_node);
4397 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
4398 node) {
9b63a4aa
JG
4399 const struct lttng_condition *condition;
4400 const struct lttng_action *action;
1831ae68
FD
4401 struct lttng_trigger *trigger;
4402 struct notification_client_list *client_list = NULL;
ab0ee2ca 4403 struct lttng_evaluation *evaluation = NULL;
1831ae68 4404 bool client_list_is_empty;
ab0ee2ca 4405
1831ae68 4406 ret = 0;
ab0ee2ca 4407 trigger = trigger_list_element->trigger;
9b63a4aa 4408 condition = lttng_trigger_get_const_condition(trigger);
ab0ee2ca 4409 assert(condition);
9b63a4aa 4410 action = lttng_trigger_get_const_action(trigger);
1831ae68
FD
4411
4412 if (!lttng_trigger_is_ready_to_fire(trigger)) {
4413 goto put_list;
4414 }
ab0ee2ca
JG
4415
4416 /* Notify actions are the only type currently supported. */
1831ae68 4417 /* TODO support other type of action */
9b63a4aa 4418 assert(lttng_action_get_type_const(action) ==
ab0ee2ca
JG
4419 LTTNG_ACTION_TYPE_NOTIFY);
4420
4421 /*
4422 * Check if any client is subscribed to the result of this
4423 * evaluation.
4424 */
ed327204
JG
4425 client_list = get_client_list_from_condition(state, condition);
4426 assert(client_list);
1831ae68
FD
4427 client_list_is_empty = cds_list_empty(&client_list->list);
4428 if (client_list_is_empty) {
ab0ee2ca
JG
4429 /*
4430 * No clients interested in the evaluation's result,
4431 * skip it.
4432 */
1831ae68 4433 goto put_list;
ab0ee2ca
JG
4434 }
4435
ed327204 4436 ret = evaluate_buffer_condition(condition, &evaluation, state,
ab0ee2ca 4437 previous_sample_available ? &previous_sample : NULL,
e8360425
JD
4438 &latest_sample,
4439 previous_session_consumed_total,
4440 latest_session_consumed_total,
4441 channel_info);
4442 if (caa_unlikely(ret)) {
1831ae68 4443 goto put_list;
ab0ee2ca
JG
4444 }
4445
e8360425 4446 if (caa_likely(!evaluation)) {
1831ae68 4447 goto put_list;
ab0ee2ca
JG
4448 }
4449
4450 /* Dispatch evaluation result to all clients. */
4451 ret = send_evaluation_to_clients(trigger_list_element->trigger,
4452 evaluation, client_list, state,
8abe313a
JG
4453 channel_info->session_info->uid,
4454 channel_info->session_info->gid);
4455 lttng_evaluation_destroy(evaluation);
1831ae68
FD
4456put_list:
4457 notification_client_list_put(client_list);
e0b5f87b 4458 if (caa_unlikely(ret)) {
1831ae68 4459 break;
ab0ee2ca
JG
4460 }
4461 }
4462end_unlock:
4463 rcu_read_unlock();
4464end:
4465 return ret;
4466}
This page took 0.245889 seconds and 5 git commands to generate.