sessiond: notification: synchronize notification client (and list)
[lttng-tools.git] / src / bin / lttng-sessiond / notification-thread-events.c
CommitLineData
ab0ee2ca 1/*
ab5be9fa 2 * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
ab0ee2ca 3 *
ab5be9fa 4 * SPDX-License-Identifier: GPL-2.0-only
ab0ee2ca 5 *
ab0ee2ca
JG
6 */
7
8#define _LGPL_SOURCE
9#include <urcu.h>
10#include <urcu/rculfhash.h>
11
ab0ee2ca
JG
12#include <common/defaults.h>
13#include <common/error.h>
14#include <common/futex.h>
15#include <common/unix.h>
16#include <common/dynamic-buffer.h>
17#include <common/hashtable/utils.h>
18#include <common/sessiond-comm/sessiond-comm.h>
19#include <common/macros.h>
20#include <lttng/condition/condition.h>
9b63a4aa 21#include <lttng/action/action-internal.h>
ab0ee2ca
JG
22#include <lttng/notification/notification-internal.h>
23#include <lttng/condition/condition-internal.h>
24#include <lttng/condition/buffer-usage-internal.h>
e8360425 25#include <lttng/condition/session-consumed-size-internal.h>
ea9a44f0 26#include <lttng/condition/session-rotation-internal.h>
ab0ee2ca 27#include <lttng/notification/channel-internal.h>
1da26331 28
ab0ee2ca
JG
29#include <time.h>
30#include <unistd.h>
31#include <assert.h>
32#include <inttypes.h>
d53ea4e4 33#include <fcntl.h>
ab0ee2ca 34
1da26331
JG
35#include "notification-thread.h"
36#include "notification-thread-events.h"
37#include "notification-thread-commands.h"
38#include "lttng-sessiond.h"
39#include "kernel.h"
40
ab0ee2ca
JG
41#define CLIENT_POLL_MASK_IN (LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP)
42#define CLIENT_POLL_MASK_IN_OUT (CLIENT_POLL_MASK_IN | LPOLLOUT)
43
51eab943
JG
44enum lttng_object_type {
45 LTTNG_OBJECT_TYPE_UNKNOWN,
46 LTTNG_OBJECT_TYPE_NONE,
47 LTTNG_OBJECT_TYPE_CHANNEL,
48 LTTNG_OBJECT_TYPE_SESSION,
49};
50
ab0ee2ca 51struct lttng_trigger_list_element {
9e0bb80e
JG
52 /* No ownership of the trigger object is assumed. */
53 const struct lttng_trigger *trigger;
ab0ee2ca
JG
54 struct cds_list_head node;
55};
56
57struct lttng_channel_trigger_list {
58 struct channel_key channel_key;
ea9a44f0 59 /* List of struct lttng_trigger_list_element. */
ab0ee2ca 60 struct cds_list_head list;
ea9a44f0 61 /* Node in the channel_triggers_ht */
ab0ee2ca 62 struct cds_lfht_node channel_triggers_ht_node;
83b934ad
MD
63 /* call_rcu delayed reclaim. */
64 struct rcu_head rcu_node;
ab0ee2ca
JG
65};
66
ea9a44f0
JG
67/*
68 * List of triggers applying to a given session.
69 *
70 * See:
71 * - lttng_session_trigger_list_create()
72 * - lttng_session_trigger_list_build()
73 * - lttng_session_trigger_list_destroy()
74 * - lttng_session_trigger_list_add()
75 */
76struct lttng_session_trigger_list {
77 /*
78 * Not owned by this; points to the session_info structure's
79 * session name.
80 */
81 const char *session_name;
82 /* List of struct lttng_trigger_list_element. */
83 struct cds_list_head list;
84 /* Node in the session_triggers_ht */
85 struct cds_lfht_node session_triggers_ht_node;
86 /*
87 * Weak reference to the notification system's session triggers
88 * hashtable.
89 *
90 * The session trigger list structure structure is owned by
91 * the session's session_info.
92 *
93 * The session_info is kept alive the the channel_infos holding a
94 * reference to it (reference counting). When those channels are
95 * destroyed (at runtime or on teardown), the reference they hold
96 * to the session_info are released. On destruction of session_info,
97 * session_info_destroy() will remove the list of triggers applying
98 * to this session from the notification system's state.
99 *
100 * This implies that the session_triggers_ht must be destroyed
101 * after the channels.
102 */
103 struct cds_lfht *session_triggers_ht;
104 /* Used for delayed RCU reclaim. */
105 struct rcu_head rcu_node;
106};
107
ab0ee2ca
JG
108struct lttng_trigger_ht_element {
109 struct lttng_trigger *trigger;
110 struct cds_lfht_node node;
83b934ad
MD
111 /* call_rcu delayed reclaim. */
112 struct rcu_head rcu_node;
ab0ee2ca
JG
113};
114
115struct lttng_condition_list_element {
116 struct lttng_condition *condition;
117 struct cds_list_head node;
118};
119
120struct notification_client_list_element {
121 struct notification_client *client;
122 struct cds_list_head node;
123};
124
505b2d90
JG
125/*
126 * Thread safety of notification_client and notification_client_list.
127 *
128 * The notification thread (main thread) and the action executor
129 * interact through client lists. Hence, when the action executor
130 * thread looks-up the list of clients subscribed to a given
131 * condition, it will acquire a reference to the list and lock it
132 * while attempting to communicate with the various clients.
133 *
134 * It is not necessary to reference-count clients as they are guaranteed
135 * to be 'alive' if they are present in a list and that list is locked. Indeed,
136 * removing references to the client from those subscription lists is part of
137 * the work performed on destruction of a client.
138 *
139 * No provision for other access scenarios are taken into account;
140 * this is the bare minimum to make these accesses safe and the
141 * notification thread's state is _not_ "thread-safe" in any general
142 * sense.
143 */
ab0ee2ca 144struct notification_client_list {
505b2d90
JG
145 pthread_mutex_t lock;
146 struct urcu_ref ref;
f82f93a1 147 const struct lttng_trigger *trigger;
ab0ee2ca 148 struct cds_list_head list;
505b2d90
JG
149 /* Weak reference to container. */
150 struct cds_lfht *notification_trigger_clients_ht;
151 struct cds_lfht_node notification_trigger_clients_ht_node;
83b934ad
MD
152 /* call_rcu delayed reclaim. */
153 struct rcu_head rcu_node;
ab0ee2ca
JG
154};
155
156struct notification_client {
505b2d90
JG
157 /* Nests within the notification_client_list lock. */
158 pthread_mutex_t lock;
ac1889bf 159 notification_client_id id;
ab0ee2ca
JG
160 int socket;
161 /* Client protocol version. */
162 uint8_t major, minor;
163 uid_t uid;
164 gid_t gid;
165 /*
5332364f 166 * Indicates if the credentials and versions of the client have been
ab0ee2ca
JG
167 * checked.
168 */
169 bool validated;
170 /*
171 * Conditions to which the client's notification channel is subscribed.
172 * List of struct lttng_condition_list_node. The condition member is
173 * owned by the client.
174 */
175 struct cds_list_head condition_list;
176 struct cds_lfht_node client_socket_ht_node;
ac1889bf 177 struct cds_lfht_node client_id_ht_node;
ab0ee2ca 178 struct {
1b5edb83 179 /*
505b2d90
JG
180 * If a client's communication is inactive, it means that a
181 * fatal error has occurred (could be either a protocol error or
182 * the socket API returned a fatal error). No further
183 * communication should be attempted; the client is queued for
184 * clean-up.
1b5edb83
JG
185 */
186 bool active;
ab0ee2ca 187 struct {
14fa22f8
JG
188 /*
189 * During the reception of a message, the reception
190 * buffers' "size" is set to contain the current
191 * message's complete payload.
192 */
ab0ee2ca
JG
193 struct lttng_dynamic_buffer buffer;
194 /* Bytes left to receive for the current message. */
195 size_t bytes_to_receive;
196 /* Type of the message being received. */
197 enum lttng_notification_channel_message_type msg_type;
198 /*
199 * Indicates whether or not credentials are expected
200 * from the client.
201 */
01ea340e 202 bool expect_creds;
ab0ee2ca
JG
203 /*
204 * Indicates whether or not credentials were received
205 * from the client.
206 */
207 bool creds_received;
14fa22f8 208 /* Only used during credentials reception. */
ab0ee2ca
JG
209 lttng_sock_cred creds;
210 } inbound;
211 struct {
212 /*
213 * Indicates whether or not a notification addressed to
214 * this client was dropped because a command reply was
215 * already buffered.
216 *
217 * A notification is dropped whenever the buffer is not
218 * empty.
219 */
220 bool dropped_notification;
221 /*
222 * Indicates whether or not a command reply is already
223 * buffered. In this case, it means that the client is
224 * not consuming command replies before emitting a new
225 * one. This could be caused by a protocol error or a
226 * misbehaving/malicious client.
227 */
228 bool queued_command_reply;
229 struct lttng_dynamic_buffer buffer;
230 } outbound;
231 } communication;
83b934ad
MD
232 /* call_rcu delayed reclaim. */
233 struct rcu_head rcu_node;
ab0ee2ca
JG
234};
235
236struct channel_state_sample {
237 struct channel_key key;
238 struct cds_lfht_node channel_state_ht_node;
239 uint64_t highest_usage;
240 uint64_t lowest_usage;
e8360425 241 uint64_t channel_total_consumed;
83b934ad
MD
242 /* call_rcu delayed reclaim. */
243 struct rcu_head rcu_node;
ab0ee2ca
JG
244};
245
e4db5ace 246static unsigned long hash_channel_key(struct channel_key *key);
ed327204 247static int evaluate_buffer_condition(const struct lttng_condition *condition,
e4db5ace 248 struct lttng_evaluation **evaluation,
e8360425
JD
249 const struct notification_thread_state *state,
250 const struct channel_state_sample *previous_sample,
251 const struct channel_state_sample *latest_sample,
252 uint64_t previous_session_consumed_total,
253 uint64_t latest_session_consumed_total,
254 struct channel_info *channel_info);
e4db5ace 255static
9b63a4aa
JG
256int send_evaluation_to_clients(const struct lttng_trigger *trigger,
257 const struct lttng_evaluation *evaluation,
e4db5ace
JR
258 struct notification_client_list *client_list,
259 struct notification_thread_state *state,
260 uid_t channel_uid, gid_t channel_gid);
261
8abe313a 262
ea9a44f0 263/* session_info API */
8abe313a
JG
264static
265void session_info_destroy(void *_data);
266static
267void session_info_get(struct session_info *session_info);
268static
269void session_info_put(struct session_info *session_info);
270static
271struct session_info *session_info_create(const char *name,
ea9a44f0 272 uid_t uid, gid_t gid,
1eee26c5
JG
273 struct lttng_session_trigger_list *trigger_list,
274 struct cds_lfht *sessions_ht);
8abe313a
JG
275static
276void session_info_add_channel(struct session_info *session_info,
277 struct channel_info *channel_info);
278static
279void session_info_remove_channel(struct session_info *session_info,
280 struct channel_info *channel_info);
281
ea9a44f0
JG
282/* lttng_session_trigger_list API */
283static
284struct lttng_session_trigger_list *lttng_session_trigger_list_create(
285 const char *session_name,
286 struct cds_lfht *session_triggers_ht);
287static
288struct lttng_session_trigger_list *lttng_session_trigger_list_build(
289 const struct notification_thread_state *state,
290 const char *session_name);
291static
292void lttng_session_trigger_list_destroy(
293 struct lttng_session_trigger_list *list);
294static
295int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
296 const struct lttng_trigger *trigger);
297
298
ab0ee2ca 299static
ac1889bf 300int match_client_socket(struct cds_lfht_node *node, const void *key)
ab0ee2ca
JG
301{
302 /* This double-cast is intended to supress pointer-to-cast warning. */
ac1889bf
JG
303 const int socket = (int) (intptr_t) key;
304 const struct notification_client *client = caa_container_of(node,
305 struct notification_client, client_socket_ht_node);
ab0ee2ca 306
ac1889bf
JG
307 return client->socket == socket;
308}
309
310static
311int match_client_id(struct cds_lfht_node *node, const void *key)
312{
313 /* This double-cast is intended to supress pointer-to-cast warning. */
314 const notification_client_id id = *((notification_client_id *) key);
315 const struct notification_client *client = caa_container_of(
316 node, struct notification_client, client_id_ht_node);
ab0ee2ca 317
ac1889bf 318 return client->id == id;
ab0ee2ca
JG
319}
320
321static
322int match_channel_trigger_list(struct cds_lfht_node *node, const void *key)
323{
324 struct channel_key *channel_key = (struct channel_key *) key;
325 struct lttng_channel_trigger_list *trigger_list;
326
327 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
328 channel_triggers_ht_node);
329
330 return !!((channel_key->key == trigger_list->channel_key.key) &&
331 (channel_key->domain == trigger_list->channel_key.domain));
332}
333
51eab943
JG
334static
335int match_session_trigger_list(struct cds_lfht_node *node, const void *key)
336{
337 const char *session_name = (const char *) key;
338 struct lttng_session_trigger_list *trigger_list;
339
340 trigger_list = caa_container_of(node, struct lttng_session_trigger_list,
341 session_triggers_ht_node);
342
343 return !!(strcmp(trigger_list->session_name, session_name) == 0);
344}
345
ab0ee2ca
JG
346static
347int match_channel_state_sample(struct cds_lfht_node *node, const void *key)
348{
349 struct channel_key *channel_key = (struct channel_key *) key;
350 struct channel_state_sample *sample;
351
352 sample = caa_container_of(node, struct channel_state_sample,
353 channel_state_ht_node);
354
355 return !!((channel_key->key == sample->key.key) &&
356 (channel_key->domain == sample->key.domain));
357}
358
359static
360int match_channel_info(struct cds_lfht_node *node, const void *key)
361{
362 struct channel_key *channel_key = (struct channel_key *) key;
363 struct channel_info *channel_info;
364
365 channel_info = caa_container_of(node, struct channel_info,
366 channels_ht_node);
367
368 return !!((channel_key->key == channel_info->key.key) &&
369 (channel_key->domain == channel_info->key.domain));
370}
371
372static
373int match_condition(struct cds_lfht_node *node, const void *key)
374{
375 struct lttng_condition *condition_key = (struct lttng_condition *) key;
376 struct lttng_trigger_ht_element *trigger;
377 struct lttng_condition *condition;
378
379 trigger = caa_container_of(node, struct lttng_trigger_ht_element,
380 node);
381 condition = lttng_trigger_get_condition(trigger->trigger);
382 assert(condition);
383
384 return !!lttng_condition_is_equal(condition_key, condition);
385}
386
ab0ee2ca
JG
387static
388int match_client_list_condition(struct cds_lfht_node *node, const void *key)
389{
390 struct lttng_condition *condition_key = (struct lttng_condition *) key;
391 struct notification_client_list *client_list;
f82f93a1 392 const struct lttng_condition *condition;
ab0ee2ca
JG
393
394 assert(condition_key);
395
396 client_list = caa_container_of(node, struct notification_client_list,
505b2d90 397 notification_trigger_clients_ht_node);
f82f93a1 398 condition = lttng_trigger_get_const_condition(client_list->trigger);
ab0ee2ca
JG
399
400 return !!lttng_condition_is_equal(condition_key, condition);
401}
402
f82f93a1
JG
403static
404int match_session(struct cds_lfht_node *node, const void *key)
405{
406 const char *name = key;
407 struct session_info *session_info = caa_container_of(
408 node, struct session_info, sessions_ht_node);
409
410 return !strcmp(session_info->name, name);
411}
412
ab0ee2ca
JG
413static
414unsigned long lttng_condition_buffer_usage_hash(
9b63a4aa 415 const struct lttng_condition *_condition)
ab0ee2ca 416{
9a2746aa
JG
417 unsigned long hash;
418 unsigned long condition_type;
ab0ee2ca
JG
419 struct lttng_condition_buffer_usage *condition;
420
421 condition = container_of(_condition,
422 struct lttng_condition_buffer_usage, parent);
423
9a2746aa
JG
424 condition_type = (unsigned long) condition->parent.type;
425 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
ab0ee2ca
JG
426 if (condition->session_name) {
427 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
428 }
429 if (condition->channel_name) {
8f56701f 430 hash ^= hash_key_str(condition->channel_name, lttng_ht_seed);
ab0ee2ca
JG
431 }
432 if (condition->domain.set) {
433 hash ^= hash_key_ulong(
434 (void *) condition->domain.type,
435 lttng_ht_seed);
436 }
437 if (condition->threshold_ratio.set) {
438 uint64_t val;
439
440 val = condition->threshold_ratio.value * (double) UINT32_MAX;
441 hash ^= hash_key_u64(&val, lttng_ht_seed);
6633b0dd 442 } else if (condition->threshold_bytes.set) {
ab0ee2ca
JG
443 uint64_t val;
444
445 val = condition->threshold_bytes.value;
446 hash ^= hash_key_u64(&val, lttng_ht_seed);
447 }
448 return hash;
449}
450
e8360425
JD
451static
452unsigned long lttng_condition_session_consumed_size_hash(
9b63a4aa 453 const struct lttng_condition *_condition)
e8360425 454{
9a2746aa
JG
455 unsigned long hash;
456 unsigned long condition_type =
457 (unsigned long) LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE;
e8360425
JD
458 struct lttng_condition_session_consumed_size *condition;
459 uint64_t val;
460
461 condition = container_of(_condition,
462 struct lttng_condition_session_consumed_size, parent);
463
9a2746aa 464 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
e8360425
JD
465 if (condition->session_name) {
466 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
467 }
468 val = condition->consumed_threshold_bytes.value;
469 hash ^= hash_key_u64(&val, lttng_ht_seed);
470 return hash;
471}
472
a8393880
JG
473static
474unsigned long lttng_condition_session_rotation_hash(
475 const struct lttng_condition *_condition)
476{
477 unsigned long hash, condition_type;
478 struct lttng_condition_session_rotation *condition;
479
480 condition = container_of(_condition,
481 struct lttng_condition_session_rotation, parent);
482 condition_type = (unsigned long) condition->parent.type;
483 hash = hash_key_ulong((void *) condition_type, lttng_ht_seed);
484 assert(condition->session_name);
485 hash ^= hash_key_str(condition->session_name, lttng_ht_seed);
486 return hash;
487}
9a2746aa 488
ab0ee2ca
JG
489/*
490 * The lttng_condition hashing code is kept in this file (rather than
491 * condition.c) since it makes use of GPLv2 code (hashtable utils), which we
492 * don't want to link in liblttng-ctl.
493 */
494static
9b63a4aa 495unsigned long lttng_condition_hash(const struct lttng_condition *condition)
ab0ee2ca
JG
496{
497 switch (condition->type) {
498 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
499 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
500 return lttng_condition_buffer_usage_hash(condition);
e8360425
JD
501 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
502 return lttng_condition_session_consumed_size_hash(condition);
a8393880
JG
503 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
504 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
505 return lttng_condition_session_rotation_hash(condition);
ab0ee2ca
JG
506 default:
507 ERR("[notification-thread] Unexpected condition type caught");
508 abort();
509 }
510}
511
e4db5ace
JR
512static
513unsigned long hash_channel_key(struct channel_key *key)
514{
515 unsigned long key_hash = hash_key_u64(&key->key, lttng_ht_seed);
516 unsigned long domain_hash = hash_key_ulong(
517 (void *) (unsigned long) key->domain, lttng_ht_seed);
518
519 return key_hash ^ domain_hash;
520}
521
ac1889bf
JG
522static
523unsigned long hash_client_socket(int socket)
524{
525 return hash_key_ulong((void *) (unsigned long) socket, lttng_ht_seed);
526}
527
528static
529unsigned long hash_client_id(notification_client_id id)
530{
531 return hash_key_u64(&id, lttng_ht_seed);
532}
533
f82f93a1
JG
534/*
535 * Get the type of object to which a given condition applies. Bindings let
536 * the notification system evaluate a trigger's condition when a given
537 * object's state is updated.
538 *
539 * For instance, a condition bound to a channel will be evaluated everytime
540 * the channel's state is changed by a channel monitoring sample.
541 */
542static
543enum lttng_object_type get_condition_binding_object(
544 const struct lttng_condition *condition)
545{
546 switch (lttng_condition_get_type(condition)) {
547 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
548 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
549 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
e742b055 550 return LTTNG_OBJECT_TYPE_CHANNEL;
f82f93a1
JG
551 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
552 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
553 return LTTNG_OBJECT_TYPE_SESSION;
554 default:
555 return LTTNG_OBJECT_TYPE_UNKNOWN;
556 }
557}
558
83b934ad
MD
559static
560void free_channel_info_rcu(struct rcu_head *node)
561{
562 free(caa_container_of(node, struct channel_info, rcu_node));
563}
564
ab0ee2ca
JG
565static
566void channel_info_destroy(struct channel_info *channel_info)
567{
568 if (!channel_info) {
569 return;
570 }
571
8abe313a
JG
572 if (channel_info->session_info) {
573 session_info_remove_channel(channel_info->session_info,
574 channel_info);
575 session_info_put(channel_info->session_info);
ab0ee2ca 576 }
8abe313a
JG
577 if (channel_info->name) {
578 free(channel_info->name);
ab0ee2ca 579 }
83b934ad
MD
580 call_rcu(&channel_info->rcu_node, free_channel_info_rcu);
581}
582
583static
584void free_session_info_rcu(struct rcu_head *node)
585{
586 free(caa_container_of(node, struct session_info, rcu_node));
ab0ee2ca
JG
587}
588
8abe313a 589/* Don't call directly, use the ref-counting mechanism. */
ab0ee2ca 590static
8abe313a 591void session_info_destroy(void *_data)
ab0ee2ca 592{
8abe313a 593 struct session_info *session_info = _data;
3b68d0a3 594 int ret;
ab0ee2ca 595
8abe313a
JG
596 assert(session_info);
597 if (session_info->channel_infos_ht) {
3b68d0a3
JR
598 ret = cds_lfht_destroy(session_info->channel_infos_ht, NULL);
599 if (ret) {
4c3f302b 600 ERR("[notification-thread] Failed to destroy channel information hash table");
3b68d0a3 601 }
8abe313a 602 }
ea9a44f0 603 lttng_session_trigger_list_destroy(session_info->trigger_list);
1eee26c5
JG
604
605 rcu_read_lock();
606 cds_lfht_del(session_info->sessions_ht,
607 &session_info->sessions_ht_node);
608 rcu_read_unlock();
8abe313a 609 free(session_info->name);
83b934ad 610 call_rcu(&session_info->rcu_node, free_session_info_rcu);
8abe313a 611}
ab0ee2ca 612
8abe313a
JG
613static
614void session_info_get(struct session_info *session_info)
615{
616 if (!session_info) {
617 return;
618 }
619 lttng_ref_get(&session_info->ref);
620}
621
622static
623void session_info_put(struct session_info *session_info)
624{
625 if (!session_info) {
626 return;
ab0ee2ca 627 }
8abe313a
JG
628 lttng_ref_put(&session_info->ref);
629}
630
631static
ea9a44f0 632struct session_info *session_info_create(const char *name, uid_t uid, gid_t gid,
1eee26c5
JG
633 struct lttng_session_trigger_list *trigger_list,
634 struct cds_lfht *sessions_ht)
8abe313a
JG
635{
636 struct session_info *session_info;
ab0ee2ca 637
8abe313a 638 assert(name);
ab0ee2ca 639
8abe313a
JG
640 session_info = zmalloc(sizeof(*session_info));
641 if (!session_info) {
642 goto end;
643 }
644 lttng_ref_init(&session_info->ref, session_info_destroy);
645
646 session_info->channel_infos_ht = cds_lfht_new(DEFAULT_HT_SIZE,
647 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
648 if (!session_info->channel_infos_ht) {
ab0ee2ca
JG
649 goto error;
650 }
8abe313a
JG
651
652 cds_lfht_node_init(&session_info->sessions_ht_node);
653 session_info->name = strdup(name);
654 if (!session_info->name) {
ab0ee2ca
JG
655 goto error;
656 }
8abe313a
JG
657 session_info->uid = uid;
658 session_info->gid = gid;
ea9a44f0 659 session_info->trigger_list = trigger_list;
1eee26c5 660 session_info->sessions_ht = sessions_ht;
8abe313a
JG
661end:
662 return session_info;
663error:
664 session_info_put(session_info);
665 return NULL;
666}
667
668static
669void session_info_add_channel(struct session_info *session_info,
670 struct channel_info *channel_info)
671{
672 rcu_read_lock();
673 cds_lfht_add(session_info->channel_infos_ht,
674 hash_channel_key(&channel_info->key),
675 &channel_info->session_info_channels_ht_node);
676 rcu_read_unlock();
677}
678
679static
680void session_info_remove_channel(struct session_info *session_info,
681 struct channel_info *channel_info)
682{
683 rcu_read_lock();
684 cds_lfht_del(session_info->channel_infos_ht,
685 &channel_info->session_info_channels_ht_node);
686 rcu_read_unlock();
687}
688
689static
690struct channel_info *channel_info_create(const char *channel_name,
691 struct channel_key *channel_key, uint64_t channel_capacity,
692 struct session_info *session_info)
693{
694 struct channel_info *channel_info = zmalloc(sizeof(*channel_info));
695
696 if (!channel_info) {
697 goto end;
698 }
699
ab0ee2ca 700 cds_lfht_node_init(&channel_info->channels_ht_node);
8abe313a
JG
701 cds_lfht_node_init(&channel_info->session_info_channels_ht_node);
702 memcpy(&channel_info->key, channel_key, sizeof(*channel_key));
703 channel_info->capacity = channel_capacity;
704
705 channel_info->name = strdup(channel_name);
706 if (!channel_info->name) {
707 goto error;
708 }
709
710 /*
711 * Set the references between session and channel infos:
712 * - channel_info holds a strong reference to session_info
713 * - session_info holds a weak reference to channel_info
714 */
715 session_info_get(session_info);
716 session_info_add_channel(session_info, channel_info);
717 channel_info->session_info = session_info;
ab0ee2ca 718end:
8abe313a 719 return channel_info;
ab0ee2ca 720error:
8abe313a 721 channel_info_destroy(channel_info);
ab0ee2ca
JG
722 return NULL;
723}
724
505b2d90
JG
725static
726bool notification_client_list_get(struct notification_client_list *list)
727{
728 return urcu_ref_get_unless_zero(&list->ref);
729}
730
731static
732void free_notification_client_list_rcu(struct rcu_head *node)
733{
734 free(caa_container_of(node, struct notification_client_list,
735 rcu_node));
736}
737
738static
739void notification_client_list_release(struct urcu_ref *list_ref)
740{
741 struct notification_client_list *list =
742 container_of(list_ref, typeof(*list), ref);
743 struct notification_client_list_element *client_list_element, *tmp;
744
745 if (list->notification_trigger_clients_ht) {
746 rcu_read_lock();
747 cds_lfht_del(list->notification_trigger_clients_ht,
748 &list->notification_trigger_clients_ht_node);
749 rcu_read_unlock();
750 list->notification_trigger_clients_ht = NULL;
751 }
752 cds_list_for_each_entry_safe(client_list_element, tmp,
753 &list->list, node) {
754 free(client_list_element);
755 }
756 pthread_mutex_destroy(&list->lock);
757 call_rcu(&list->rcu_node, free_notification_client_list_rcu);
758}
759
760static
761struct notification_client_list *notification_client_list_create(
762 const struct lttng_trigger *trigger)
763{
764 struct notification_client_list *client_list =
765 zmalloc(sizeof(*client_list));
766
767 if (!client_list) {
768 goto error;
769 }
770 pthread_mutex_init(&client_list->lock, NULL);
771 urcu_ref_init(&client_list->ref);
772 cds_lfht_node_init(&client_list->notification_trigger_clients_ht_node);
773 CDS_INIT_LIST_HEAD(&client_list->list);
774 client_list->trigger = trigger;
775error:
776 return client_list;
777}
778
779static
780void publish_notification_client_list(
781 struct notification_thread_state *state,
782 struct notification_client_list *list)
783{
784 const struct lttng_condition *condition =
785 lttng_trigger_get_const_condition(list->trigger);
786
787 assert(!list->notification_trigger_clients_ht);
788
789 list->notification_trigger_clients_ht =
790 state->notification_trigger_clients_ht;
791
792 rcu_read_lock();
793 cds_lfht_add(state->notification_trigger_clients_ht,
794 lttng_condition_hash(condition),
795 &list->notification_trigger_clients_ht_node);
796 rcu_read_unlock();
797}
798
799static
800void notification_client_list_put(struct notification_client_list *list)
801{
802 if (!list) {
803 return;
804 }
805 return urcu_ref_put(&list->ref, notification_client_list_release);
806}
807
808/* Provides a reference to the returned list. */
ed327204
JG
809static
810struct notification_client_list *get_client_list_from_condition(
811 struct notification_thread_state *state,
812 const struct lttng_condition *condition)
813{
814 struct cds_lfht_node *node;
815 struct cds_lfht_iter iter;
505b2d90 816 struct notification_client_list *list = NULL;
ed327204 817
505b2d90 818 rcu_read_lock();
ed327204
JG
819 cds_lfht_lookup(state->notification_trigger_clients_ht,
820 lttng_condition_hash(condition),
821 match_client_list_condition,
822 condition,
823 &iter);
824 node = cds_lfht_iter_get_node(&iter);
505b2d90
JG
825 if (node) {
826 list = container_of(node, struct notification_client_list,
827 notification_trigger_clients_ht_node);
828 list = notification_client_list_get(list) ? list : NULL;
829 }
ed327204 830
505b2d90
JG
831 rcu_read_unlock();
832 return list;
ed327204
JG
833}
834
e4db5ace 835static
f82f93a1
JG
836int evaluate_channel_condition_for_client(
837 const struct lttng_condition *condition,
838 struct notification_thread_state *state,
839 struct lttng_evaluation **evaluation,
840 uid_t *session_uid, gid_t *session_gid)
e4db5ace
JR
841{
842 int ret;
843 struct cds_lfht_iter iter;
844 struct cds_lfht_node *node;
845 struct channel_info *channel_info = NULL;
846 struct channel_key *channel_key = NULL;
847 struct channel_state_sample *last_sample = NULL;
848 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
e4db5ace 849
505b2d90
JG
850 rcu_read_lock();
851
f82f93a1 852 /* Find the channel associated with the condition. */
e4db5ace 853 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter,
f82f93a1 854 channel_trigger_list, channel_triggers_ht_node) {
e4db5ace
JR
855 struct lttng_trigger_list_element *element;
856
857 cds_list_for_each_entry(element, &channel_trigger_list->list, node) {
9b63a4aa 858 const struct lttng_condition *current_condition =
f82f93a1 859 lttng_trigger_get_const_condition(
e4db5ace
JR
860 element->trigger);
861
862 assert(current_condition);
863 if (!lttng_condition_is_equal(condition,
2ae99f0b 864 current_condition)) {
e4db5ace
JR
865 continue;
866 }
867
868 /* Found the trigger, save the channel key. */
869 channel_key = &channel_trigger_list->channel_key;
870 break;
871 }
872 if (channel_key) {
873 /* The channel key was found stop iteration. */
874 break;
875 }
876 }
877
878 if (!channel_key){
879 /* No channel found; normal exit. */
f82f93a1 880 DBG("[notification-thread] No known channel associated with newly subscribed-to condition");
e4db5ace
JR
881 ret = 0;
882 goto end;
883 }
884
885 /* Fetch channel info for the matching channel. */
886 cds_lfht_lookup(state->channels_ht,
887 hash_channel_key(channel_key),
888 match_channel_info,
889 channel_key,
890 &iter);
891 node = cds_lfht_iter_get_node(&iter);
892 assert(node);
893 channel_info = caa_container_of(node, struct channel_info,
894 channels_ht_node);
895
896 /* Retrieve the channel's last sample, if it exists. */
897 cds_lfht_lookup(state->channel_state_ht,
898 hash_channel_key(channel_key),
899 match_channel_state_sample,
900 channel_key,
901 &iter);
902 node = cds_lfht_iter_get_node(&iter);
903 if (node) {
904 last_sample = caa_container_of(node,
905 struct channel_state_sample,
906 channel_state_ht_node);
907 } else {
908 /* Nothing to evaluate, no sample was ever taken. Normal exit */
909 DBG("[notification-thread] No channel sample associated with newly subscribed-to condition");
910 ret = 0;
911 goto end;
912 }
913
f82f93a1 914 ret = evaluate_buffer_condition(condition, evaluation, state,
e8360425
JD
915 NULL, last_sample,
916 0, channel_info->session_info->consumed_data_size,
917 channel_info);
e4db5ace 918 if (ret) {
066a3c82 919 WARN("[notification-thread] Fatal error occurred while evaluating a newly subscribed-to condition");
e4db5ace
JR
920 goto end;
921 }
922
f82f93a1
JG
923 *session_uid = channel_info->session_info->uid;
924 *session_gid = channel_info->session_info->gid;
925end:
505b2d90 926 rcu_read_unlock();
f82f93a1
JG
927 return ret;
928}
929
930static
931const char *get_condition_session_name(const struct lttng_condition *condition)
932{
933 const char *session_name = NULL;
934 enum lttng_condition_status status;
935
936 switch (lttng_condition_get_type(condition)) {
937 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
938 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
939 status = lttng_condition_buffer_usage_get_session_name(
940 condition, &session_name);
941 break;
942 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
943 status = lttng_condition_session_consumed_size_get_session_name(
944 condition, &session_name);
945 break;
946 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
947 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
948 status = lttng_condition_session_rotation_get_session_name(
949 condition, &session_name);
950 break;
951 default:
952 abort();
953 }
954 if (status != LTTNG_CONDITION_STATUS_OK) {
955 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
956 goto end;
957 }
958end:
959 return session_name;
960}
961
f82f93a1
JG
962static
963int evaluate_session_condition_for_client(
964 const struct lttng_condition *condition,
965 struct notification_thread_state *state,
966 struct lttng_evaluation **evaluation,
967 uid_t *session_uid, gid_t *session_gid)
968{
969 int ret;
970 struct cds_lfht_iter iter;
971 struct cds_lfht_node *node;
972 const char *session_name;
973 struct session_info *session_info = NULL;
974
505b2d90 975 rcu_read_lock();
f82f93a1
JG
976 session_name = get_condition_session_name(condition);
977
978 /* Find the session associated with the trigger. */
979 cds_lfht_lookup(state->sessions_ht,
980 hash_key_str(session_name, lttng_ht_seed),
981 match_session,
982 session_name,
983 &iter);
984 node = cds_lfht_iter_get_node(&iter);
985 if (!node) {
986 DBG("[notification-thread] No known session matching name \"%s\"",
987 session_name);
988 ret = 0;
989 goto end;
990 }
991
992 session_info = caa_container_of(node, struct session_info,
993 sessions_ht_node);
994 session_info_get(session_info);
995
996 /*
997 * Evaluation is performed in-line here since only one type of
998 * session-bound condition is handled for the moment.
999 */
1000 switch (lttng_condition_get_type(condition)) {
1001 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1002 if (!session_info->rotation.ongoing) {
be558f88 1003 ret = 0;
f82f93a1
JG
1004 goto end_session_put;
1005 }
1006
1007 *evaluation = lttng_evaluation_session_rotation_ongoing_create(
1008 session_info->rotation.id);
1009 if (!*evaluation) {
1010 /* Fatal error. */
1011 ERR("[notification-thread] Failed to create session rotation ongoing evaluation for session \"%s\"",
1012 session_info->name);
1013 ret = -1;
1014 goto end_session_put;
1015 }
1016 ret = 0;
1017 break;
1018 default:
1019 ret = 0;
1020 goto end_session_put;
1021 }
1022
1023 *session_uid = session_info->uid;
1024 *session_gid = session_info->gid;
1025
1026end_session_put:
1027 session_info_put(session_info);
1028end:
505b2d90 1029 rcu_read_unlock();
f82f93a1
JG
1030 return ret;
1031}
1032
f82f93a1
JG
1033static
1034int evaluate_condition_for_client(const struct lttng_trigger *trigger,
1035 const struct lttng_condition *condition,
1036 struct notification_client *client,
1037 struct notification_thread_state *state)
1038{
1039 int ret;
1040 struct lttng_evaluation *evaluation = NULL;
505b2d90
JG
1041 struct notification_client_list client_list = {
1042 .lock = PTHREAD_MUTEX_INITIALIZER,
1043 };
f82f93a1
JG
1044 struct notification_client_list_element client_list_element = { 0 };
1045 uid_t object_uid = 0;
1046 gid_t object_gid = 0;
1047
1048 assert(trigger);
1049 assert(condition);
1050 assert(client);
1051 assert(state);
1052
1053 switch (get_condition_binding_object(condition)) {
1054 case LTTNG_OBJECT_TYPE_SESSION:
1055 ret = evaluate_session_condition_for_client(condition, state,
1056 &evaluation, &object_uid, &object_gid);
1057 break;
1058 case LTTNG_OBJECT_TYPE_CHANNEL:
1059 ret = evaluate_channel_condition_for_client(condition, state,
1060 &evaluation, &object_uid, &object_gid);
1061 break;
1062 case LTTNG_OBJECT_TYPE_NONE:
1063 ret = 0;
1064 goto end;
1065 case LTTNG_OBJECT_TYPE_UNKNOWN:
1066 default:
1067 ret = -1;
1068 goto end;
1069 }
af0c318d
JG
1070 if (ret) {
1071 /* Fatal error. */
1072 goto end;
1073 }
e4db5ace
JR
1074 if (!evaluation) {
1075 /* Evaluation yielded nothing. Normal exit. */
1076 DBG("[notification-thread] Newly subscribed-to condition evaluated to false, nothing to report to client");
1077 ret = 0;
1078 goto end;
1079 }
1080
1081 /*
1082 * Create a temporary client list with the client currently
1083 * subscribing.
1084 */
505b2d90 1085 cds_lfht_node_init(&client_list.notification_trigger_clients_ht_node);
e4db5ace
JR
1086 CDS_INIT_LIST_HEAD(&client_list.list);
1087 client_list.trigger = trigger;
1088
1089 CDS_INIT_LIST_HEAD(&client_list_element.node);
1090 client_list_element.client = client;
1091 cds_list_add(&client_list_element.node, &client_list.list);
1092
1093 /* Send evaluation result to the newly-subscribed client. */
1094 DBG("[notification-thread] Newly subscribed-to condition evaluated to true, notifying client");
1095 ret = send_evaluation_to_clients(trigger, evaluation, &client_list,
f82f93a1 1096 state, object_uid, object_gid);
e4db5ace
JR
1097
1098end:
1099 return ret;
1100}
1101
ab0ee2ca
JG
1102static
1103int notification_thread_client_subscribe(struct notification_client *client,
1104 struct lttng_condition *condition,
1105 struct notification_thread_state *state,
1106 enum lttng_notification_channel_status *_status)
1107{
1108 int ret = 0;
505b2d90 1109 struct notification_client_list *client_list = NULL;
ab0ee2ca
JG
1110 struct lttng_condition_list_element *condition_list_element = NULL;
1111 struct notification_client_list_element *client_list_element = NULL;
1112 enum lttng_notification_channel_status status =
1113 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1114
1115 /*
1116 * Ensure that the client has not already subscribed to this condition
1117 * before.
1118 */
1119 cds_list_for_each_entry(condition_list_element, &client->condition_list, node) {
1120 if (lttng_condition_is_equal(condition_list_element->condition,
1121 condition)) {
1122 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_ALREADY_SUBSCRIBED;
1123 goto end;
1124 }
1125 }
1126
1127 condition_list_element = zmalloc(sizeof(*condition_list_element));
1128 if (!condition_list_element) {
1129 ret = -1;
1130 goto error;
1131 }
1132 client_list_element = zmalloc(sizeof(*client_list_element));
1133 if (!client_list_element) {
1134 ret = -1;
1135 goto error;
1136 }
1137
ab0ee2ca
JG
1138 /*
1139 * Add the newly-subscribed condition to the client's subscription list.
1140 */
1141 CDS_INIT_LIST_HEAD(&condition_list_element->node);
1142 condition_list_element->condition = condition;
1143 cds_list_add(&condition_list_element->node, &client->condition_list);
1144
ed327204
JG
1145 client_list = get_client_list_from_condition(state, condition);
1146 if (!client_list) {
2ae99f0b
JG
1147 /*
1148 * No notification-emiting trigger registered with this
1149 * condition. We don't evaluate the condition right away
1150 * since this trigger is not registered yet.
1151 */
4fb43b68 1152 free(client_list_element);
505b2d90 1153 goto end;
ab0ee2ca
JG
1154 }
1155
2ae99f0b
JG
1156 /*
1157 * The condition to which the client just subscribed is evaluated
1158 * at this point so that conditions that are already TRUE result
1159 * in a notification being sent out.
505b2d90
JG
1160 *
1161 * The client_list's trigger is used without locking the list itself.
1162 * This is correct since the list doesn't own the trigger and the
1163 * object is immutable.
2ae99f0b 1164 */
e4db5ace
JR
1165 if (evaluate_condition_for_client(client_list->trigger, condition,
1166 client, state)) {
1167 WARN("[notification-thread] Evaluation of a condition on client subscription failed, aborting.");
1168 ret = -1;
4bd5b4af 1169 free(client_list_element);
505b2d90 1170 goto end;
e4db5ace
JR
1171 }
1172
1173 /*
1174 * Add the client to the list of clients interested in a given trigger
1175 * if a "notification" trigger with a corresponding condition was
1176 * added prior.
1177 */
ab0ee2ca
JG
1178 client_list_element->client = client;
1179 CDS_INIT_LIST_HEAD(&client_list_element->node);
505b2d90
JG
1180
1181 pthread_mutex_lock(&client_list->lock);
ab0ee2ca 1182 cds_list_add(&client_list_element->node, &client_list->list);
505b2d90 1183 pthread_mutex_unlock(&client_list->lock);
ab0ee2ca
JG
1184end:
1185 if (_status) {
1186 *_status = status;
1187 }
505b2d90
JG
1188 if (client_list) {
1189 notification_client_list_put(client_list);
1190 }
ab0ee2ca
JG
1191 return ret;
1192error:
1193 free(condition_list_element);
1194 free(client_list_element);
1195 return ret;
1196}
1197
1198static
1199int notification_thread_client_unsubscribe(
1200 struct notification_client *client,
1201 struct lttng_condition *condition,
1202 struct notification_thread_state *state,
1203 enum lttng_notification_channel_status *_status)
1204{
ab0ee2ca
JG
1205 struct notification_client_list *client_list;
1206 struct lttng_condition_list_element *condition_list_element,
1207 *condition_tmp;
1208 struct notification_client_list_element *client_list_element,
1209 *client_tmp;
1210 bool condition_found = false;
1211 enum lttng_notification_channel_status status =
1212 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
1213
1214 /* Remove the condition from the client's condition list. */
1215 cds_list_for_each_entry_safe(condition_list_element, condition_tmp,
1216 &client->condition_list, node) {
1217 if (!lttng_condition_is_equal(condition_list_element->condition,
1218 condition)) {
1219 continue;
1220 }
1221
1222 cds_list_del(&condition_list_element->node);
1223 /*
1224 * The caller may be iterating on the client's conditions to
1225 * tear down a client's connection. In this case, the condition
1226 * will be destroyed at the end.
1227 */
1228 if (condition != condition_list_element->condition) {
1229 lttng_condition_destroy(
1230 condition_list_element->condition);
1231 }
1232 free(condition_list_element);
1233 condition_found = true;
1234 break;
1235 }
1236
1237 if (!condition_found) {
1238 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNKNOWN_CONDITION;
1239 goto end;
1240 }
1241
1242 /*
1243 * Remove the client from the list of clients interested the trigger
1244 * matching the condition.
1245 */
ed327204
JG
1246 client_list = get_client_list_from_condition(state, condition);
1247 if (!client_list) {
505b2d90 1248 goto end;
ab0ee2ca
JG
1249 }
1250
505b2d90 1251 pthread_mutex_lock(&client_list->lock);
ab0ee2ca
JG
1252 cds_list_for_each_entry_safe(client_list_element, client_tmp,
1253 &client_list->list, node) {
505b2d90 1254 if (client_list_element->client->id != client->id) {
ab0ee2ca
JG
1255 continue;
1256 }
1257 cds_list_del(&client_list_element->node);
1258 free(client_list_element);
1259 break;
1260 }
505b2d90
JG
1261 pthread_mutex_unlock(&client_list->lock);
1262 notification_client_list_put(client_list);
1263 client_list = NULL;
ab0ee2ca
JG
1264end:
1265 lttng_condition_destroy(condition);
1266 if (_status) {
1267 *_status = status;
1268 }
1269 return 0;
1270}
1271
83b934ad
MD
1272static
1273void free_notification_client_rcu(struct rcu_head *node)
1274{
1275 free(caa_container_of(node, struct notification_client, rcu_node));
1276}
1277
ab0ee2ca
JG
1278static
1279void notification_client_destroy(struct notification_client *client,
1280 struct notification_thread_state *state)
1281{
ab0ee2ca
JG
1282 if (!client) {
1283 return;
1284 }
1285
505b2d90
JG
1286 /*
1287 * The client object is not reachable by other threads, no need to lock
1288 * the client here.
1289 */
ab0ee2ca
JG
1290 if (client->socket >= 0) {
1291 (void) lttcomm_close_unix_sock(client->socket);
ac1889bf 1292 client->socket = -1;
ab0ee2ca 1293 }
505b2d90 1294 client->communication.active = false;
ab0ee2ca
JG
1295 lttng_dynamic_buffer_reset(&client->communication.inbound.buffer);
1296 lttng_dynamic_buffer_reset(&client->communication.outbound.buffer);
505b2d90 1297 pthread_mutex_destroy(&client->lock);
83b934ad 1298 call_rcu(&client->rcu_node, free_notification_client_rcu);
ab0ee2ca
JG
1299}
1300
1301/*
1302 * Call with rcu_read_lock held (and hold for the lifetime of the returned
1303 * client pointer).
1304 */
1305static
1306struct notification_client *get_client_from_socket(int socket,
1307 struct notification_thread_state *state)
1308{
1309 struct cds_lfht_iter iter;
1310 struct cds_lfht_node *node;
1311 struct notification_client *client = NULL;
1312
1313 cds_lfht_lookup(state->client_socket_ht,
ac1889bf
JG
1314 hash_client_socket(socket),
1315 match_client_socket,
ab0ee2ca
JG
1316 (void *) (unsigned long) socket,
1317 &iter);
1318 node = cds_lfht_iter_get_node(&iter);
1319 if (!node) {
1320 goto end;
1321 }
1322
1323 client = caa_container_of(node, struct notification_client,
1324 client_socket_ht_node);
1325end:
1326 return client;
1327}
1328
1329static
e8360425 1330bool buffer_usage_condition_applies_to_channel(
51eab943
JG
1331 const struct lttng_condition *condition,
1332 const struct channel_info *channel_info)
ab0ee2ca
JG
1333{
1334 enum lttng_condition_status status;
e8360425
JD
1335 enum lttng_domain_type condition_domain;
1336 const char *condition_session_name = NULL;
1337 const char *condition_channel_name = NULL;
ab0ee2ca 1338
e8360425
JD
1339 status = lttng_condition_buffer_usage_get_domain_type(condition,
1340 &condition_domain);
1341 assert(status == LTTNG_CONDITION_STATUS_OK);
1342 if (channel_info->key.domain != condition_domain) {
ab0ee2ca
JG
1343 goto fail;
1344 }
1345
e8360425
JD
1346 status = lttng_condition_buffer_usage_get_session_name(
1347 condition, &condition_session_name);
1348 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1349
1350 status = lttng_condition_buffer_usage_get_channel_name(
1351 condition, &condition_channel_name);
1352 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_channel_name);
1353
1354 if (strcmp(channel_info->session_info->name, condition_session_name)) {
1355 goto fail;
1356 }
1357 if (strcmp(channel_info->name, condition_channel_name)) {
ab0ee2ca
JG
1358 goto fail;
1359 }
1360
e8360425
JD
1361 return true;
1362fail:
1363 return false;
1364}
1365
1366static
1367bool session_consumed_size_condition_applies_to_channel(
51eab943
JG
1368 const struct lttng_condition *condition,
1369 const struct channel_info *channel_info)
e8360425
JD
1370{
1371 enum lttng_condition_status status;
1372 const char *condition_session_name = NULL;
1373
1374 status = lttng_condition_session_consumed_size_get_session_name(
1375 condition, &condition_session_name);
1376 assert((status == LTTNG_CONDITION_STATUS_OK) && condition_session_name);
1377
1378 if (strcmp(channel_info->session_info->name, condition_session_name)) {
ab0ee2ca
JG
1379 goto fail;
1380 }
1381
e8360425
JD
1382 return true;
1383fail:
1384 return false;
1385}
ab0ee2ca 1386
51eab943
JG
1387static
1388bool trigger_applies_to_channel(const struct lttng_trigger *trigger,
1389 const struct channel_info *channel_info)
1390{
1391 const struct lttng_condition *condition;
e8360425 1392 bool trigger_applies;
ab0ee2ca 1393
51eab943 1394 condition = lttng_trigger_get_const_condition(trigger);
e8360425 1395 if (!condition) {
ab0ee2ca
JG
1396 goto fail;
1397 }
e8360425
JD
1398
1399 switch (lttng_condition_get_type(condition)) {
1400 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1401 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1402 trigger_applies = buffer_usage_condition_applies_to_channel(
1403 condition, channel_info);
1404 break;
1405 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
1406 trigger_applies = session_consumed_size_condition_applies_to_channel(
1407 condition, channel_info);
1408 break;
1409 default:
ab0ee2ca
JG
1410 goto fail;
1411 }
1412
e8360425 1413 return trigger_applies;
ab0ee2ca
JG
1414fail:
1415 return false;
1416}
1417
1418static
1419bool trigger_applies_to_client(struct lttng_trigger *trigger,
1420 struct notification_client *client)
1421{
1422 bool applies = false;
1423 struct lttng_condition_list_element *condition_list_element;
1424
1425 cds_list_for_each_entry(condition_list_element, &client->condition_list,
1426 node) {
1427 applies = lttng_condition_is_equal(
1428 condition_list_element->condition,
1429 lttng_trigger_get_condition(trigger));
1430 if (applies) {
1431 break;
1432 }
1433 }
1434 return applies;
1435}
1436
ed327204
JG
1437/* Must be called with RCU read lock held. */
1438static
1439struct lttng_session_trigger_list *get_session_trigger_list(
1440 struct notification_thread_state *state,
1441 const char *session_name)
1442{
1443 struct lttng_session_trigger_list *list = NULL;
1444 struct cds_lfht_node *node;
1445 struct cds_lfht_iter iter;
1446
1447 cds_lfht_lookup(state->session_triggers_ht,
1448 hash_key_str(session_name, lttng_ht_seed),
1449 match_session_trigger_list,
1450 session_name,
1451 &iter);
1452 node = cds_lfht_iter_get_node(&iter);
1453 if (!node) {
1454 /*
1455 * Not an error, the list of triggers applying to that session
1456 * will be initialized when the session is created.
1457 */
1458 DBG("[notification-thread] No trigger list found for session \"%s\" as it is not yet known to the notification system",
1459 session_name);
1460 goto end;
1461 }
1462
e742b055 1463 list = caa_container_of(node,
ed327204
JG
1464 struct lttng_session_trigger_list,
1465 session_triggers_ht_node);
1466end:
1467 return list;
1468}
1469
ea9a44f0
JG
1470/*
1471 * Allocate an empty lttng_session_trigger_list for the session named
1472 * 'session_name'.
1473 *
1474 * No ownership of 'session_name' is assumed by the session trigger list.
1475 * It is the caller's responsability to ensure the session name is alive
1476 * for as long as this list is.
1477 */
1478static
1479struct lttng_session_trigger_list *lttng_session_trigger_list_create(
1480 const char *session_name,
1481 struct cds_lfht *session_triggers_ht)
1482{
1483 struct lttng_session_trigger_list *list;
1484
1485 list = zmalloc(sizeof(*list));
1486 if (!list) {
1487 goto end;
1488 }
1489 list->session_name = session_name;
1490 CDS_INIT_LIST_HEAD(&list->list);
1491 cds_lfht_node_init(&list->session_triggers_ht_node);
1492 list->session_triggers_ht = session_triggers_ht;
1493
1494 rcu_read_lock();
1495 /* Publish the list through the session_triggers_ht. */
1496 cds_lfht_add(session_triggers_ht,
1497 hash_key_str(session_name, lttng_ht_seed),
1498 &list->session_triggers_ht_node);
1499 rcu_read_unlock();
1500end:
1501 return list;
1502}
1503
1504static
1505void free_session_trigger_list_rcu(struct rcu_head *node)
1506{
1507 free(caa_container_of(node, struct lttng_session_trigger_list,
1508 rcu_node));
1509}
1510
1511static
1512void lttng_session_trigger_list_destroy(struct lttng_session_trigger_list *list)
1513{
1514 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1515
1516 /* Empty the list element by element, and then free the list itself. */
1517 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1518 &list->list, node) {
1519 cds_list_del(&trigger_list_element->node);
1520 free(trigger_list_element);
1521 }
1522 rcu_read_lock();
1523 /* Unpublish the list from the session_triggers_ht. */
1524 cds_lfht_del(list->session_triggers_ht,
1525 &list->session_triggers_ht_node);
1526 rcu_read_unlock();
1527 call_rcu(&list->rcu_node, free_session_trigger_list_rcu);
1528}
1529
1530static
1531int lttng_session_trigger_list_add(struct lttng_session_trigger_list *list,
1532 const struct lttng_trigger *trigger)
1533{
1534 int ret = 0;
1535 struct lttng_trigger_list_element *new_element =
1536 zmalloc(sizeof(*new_element));
1537
1538 if (!new_element) {
1539 ret = -1;
1540 goto end;
1541 }
1542 CDS_INIT_LIST_HEAD(&new_element->node);
1543 new_element->trigger = trigger;
1544 cds_list_add(&new_element->node, &list->list);
1545end:
1546 return ret;
1547}
1548
1549static
1550bool trigger_applies_to_session(const struct lttng_trigger *trigger,
1551 const char *session_name)
1552{
1553 bool applies = false;
1554 const struct lttng_condition *condition;
1555
1556 condition = lttng_trigger_get_const_condition(trigger);
1557 switch (lttng_condition_get_type(condition)) {
1558 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
1559 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
1560 {
1561 enum lttng_condition_status condition_status;
1562 const char *condition_session_name;
1563
1564 condition_status = lttng_condition_session_rotation_get_session_name(
1565 condition, &condition_session_name);
1566 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
1567 ERR("[notification-thread] Failed to retrieve session rotation condition's session name");
1568 goto end;
1569 }
1570
1571 assert(condition_session_name);
1572 applies = !strcmp(condition_session_name, session_name);
1573 break;
1574 }
1575 default:
1576 goto end;
1577 }
1578end:
1579 return applies;
1580}
1581
1582/*
1583 * Allocate and initialize an lttng_session_trigger_list which contains
1584 * all triggers that apply to the session named 'session_name'.
1585 *
1586 * No ownership of 'session_name' is assumed by the session trigger list.
1587 * It is the caller's responsability to ensure the session name is alive
1588 * for as long as this list is.
1589 */
1590static
1591struct lttng_session_trigger_list *lttng_session_trigger_list_build(
1592 const struct notification_thread_state *state,
1593 const char *session_name)
1594{
1595 int trigger_count = 0;
1596 struct lttng_session_trigger_list *session_trigger_list = NULL;
1597 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1598 struct cds_lfht_iter iter;
1599
1600 session_trigger_list = lttng_session_trigger_list_create(session_name,
1601 state->session_triggers_ht);
1602
1603 /* Add all triggers applying to the session named 'session_name'. */
1604 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1605 node) {
1606 int ret;
1607
1608 if (!trigger_applies_to_session(trigger_ht_element->trigger,
1609 session_name)) {
1610 continue;
1611 }
1612
1613 ret = lttng_session_trigger_list_add(session_trigger_list,
1614 trigger_ht_element->trigger);
1615 if (ret) {
1616 goto error;
1617 }
1618
1619 trigger_count++;
1620 }
1621
1622 DBG("[notification-thread] Found %i triggers that apply to newly created session",
1623 trigger_count);
1624 return session_trigger_list;
1625error:
1626 lttng_session_trigger_list_destroy(session_trigger_list);
1627 return NULL;
1628}
1629
8abe313a
JG
1630static
1631struct session_info *find_or_create_session_info(
1632 struct notification_thread_state *state,
1633 const char *name, uid_t uid, gid_t gid)
1634{
1635 struct session_info *session = NULL;
1636 struct cds_lfht_node *node;
1637 struct cds_lfht_iter iter;
ea9a44f0 1638 struct lttng_session_trigger_list *trigger_list;
8abe313a
JG
1639
1640 rcu_read_lock();
1641 cds_lfht_lookup(state->sessions_ht,
1642 hash_key_str(name, lttng_ht_seed),
1643 match_session,
1644 name,
1645 &iter);
1646 node = cds_lfht_iter_get_node(&iter);
1647 if (node) {
1648 DBG("[notification-thread] Found session info of session \"%s\" (uid = %i, gid = %i)",
1649 name, uid, gid);
1650 session = caa_container_of(node, struct session_info,
1651 sessions_ht_node);
1652 assert(session->uid == uid);
1653 assert(session->gid == gid);
1eee26c5
JG
1654 session_info_get(session);
1655 goto end;
ea9a44f0
JG
1656 }
1657
1658 trigger_list = lttng_session_trigger_list_build(state, name);
1659 if (!trigger_list) {
1660 goto error;
8abe313a
JG
1661 }
1662
1eee26c5
JG
1663 session = session_info_create(name, uid, gid, trigger_list,
1664 state->sessions_ht);
8abe313a
JG
1665 if (!session) {
1666 ERR("[notification-thread] Failed to allocation session info for session \"%s\" (uid = %i, gid = %i)",
1667 name, uid, gid);
b248644d 1668 lttng_session_trigger_list_destroy(trigger_list);
ea9a44f0 1669 goto error;
8abe313a 1670 }
ea9a44f0 1671 trigger_list = NULL;
577983cb
JG
1672
1673 cds_lfht_add(state->sessions_ht, hash_key_str(name, lttng_ht_seed),
9b63a4aa 1674 &session->sessions_ht_node);
1eee26c5 1675end:
8abe313a
JG
1676 rcu_read_unlock();
1677 return session;
ea9a44f0
JG
1678error:
1679 rcu_read_unlock();
1680 session_info_put(session);
1681 return NULL;
8abe313a
JG
1682}
1683
ab0ee2ca
JG
1684static
1685int handle_notification_thread_command_add_channel(
8abe313a
JG
1686 struct notification_thread_state *state,
1687 const char *session_name, uid_t session_uid, gid_t session_gid,
1688 const char *channel_name, enum lttng_domain_type channel_domain,
1689 uint64_t channel_key_int, uint64_t channel_capacity,
1690 enum lttng_error_code *cmd_result)
ab0ee2ca
JG
1691{
1692 struct cds_list_head trigger_list;
8abe313a
JG
1693 struct channel_info *new_channel_info = NULL;
1694 struct channel_key channel_key = {
1695 .key = channel_key_int,
1696 .domain = channel_domain,
1697 };
ab0ee2ca
JG
1698 struct lttng_channel_trigger_list *channel_trigger_list = NULL;
1699 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
1700 int trigger_count = 0;
1701 struct cds_lfht_iter iter;
8abe313a 1702 struct session_info *session_info = NULL;
ab0ee2ca
JG
1703
1704 DBG("[notification-thread] Adding channel %s from session %s, channel key = %" PRIu64 " in %s domain",
8abe313a
JG
1705 channel_name, session_name, channel_key_int,
1706 channel_domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
ab0ee2ca
JG
1707
1708 CDS_INIT_LIST_HEAD(&trigger_list);
1709
8abe313a
JG
1710 session_info = find_or_create_session_info(state, session_name,
1711 session_uid, session_gid);
1712 if (!session_info) {
a9577b76 1713 /* Allocation error or an internal error occurred. */
ab0ee2ca
JG
1714 goto error;
1715 }
1716
8abe313a
JG
1717 new_channel_info = channel_info_create(channel_name, &channel_key,
1718 channel_capacity, session_info);
1719 if (!new_channel_info) {
1720 goto error;
1721 }
ab0ee2ca 1722
83b934ad 1723 rcu_read_lock();
ab0ee2ca
JG
1724 /* Build a list of all triggers applying to the new channel. */
1725 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
1726 node) {
1727 struct lttng_trigger_list_element *new_element;
1728
1729 if (!trigger_applies_to_channel(trigger_ht_element->trigger,
8abe313a 1730 new_channel_info)) {
ab0ee2ca
JG
1731 continue;
1732 }
1733
1734 new_element = zmalloc(sizeof(*new_element));
1735 if (!new_element) {
83b934ad 1736 rcu_read_unlock();
ab0ee2ca
JG
1737 goto error;
1738 }
1739 CDS_INIT_LIST_HEAD(&new_element->node);
1740 new_element->trigger = trigger_ht_element->trigger;
1741 cds_list_add(&new_element->node, &trigger_list);
1742 trigger_count++;
1743 }
83b934ad 1744 rcu_read_unlock();
ab0ee2ca
JG
1745
1746 DBG("[notification-thread] Found %i triggers that apply to newly added channel",
1747 trigger_count);
1748 channel_trigger_list = zmalloc(sizeof(*channel_trigger_list));
1749 if (!channel_trigger_list) {
1750 goto error;
1751 }
8abe313a 1752 channel_trigger_list->channel_key = new_channel_info->key;
ab0ee2ca
JG
1753 CDS_INIT_LIST_HEAD(&channel_trigger_list->list);
1754 cds_lfht_node_init(&channel_trigger_list->channel_triggers_ht_node);
1755 cds_list_splice(&trigger_list, &channel_trigger_list->list);
1756
1757 rcu_read_lock();
1758 /* Add channel to the channel_ht which owns the channel_infos. */
1759 cds_lfht_add(state->channels_ht,
8abe313a 1760 hash_channel_key(&new_channel_info->key),
ab0ee2ca
JG
1761 &new_channel_info->channels_ht_node);
1762 /*
1763 * Add the list of triggers associated with this channel to the
1764 * channel_triggers_ht.
1765 */
1766 cds_lfht_add(state->channel_triggers_ht,
8abe313a 1767 hash_channel_key(&new_channel_info->key),
ab0ee2ca
JG
1768 &channel_trigger_list->channel_triggers_ht_node);
1769 rcu_read_unlock();
1eee26c5 1770 session_info_put(session_info);
ab0ee2ca
JG
1771 *cmd_result = LTTNG_OK;
1772 return 0;
1773error:
ab0ee2ca 1774 channel_info_destroy(new_channel_info);
8abe313a 1775 session_info_put(session_info);
ab0ee2ca
JG
1776 return 1;
1777}
1778
83b934ad
MD
1779static
1780void free_channel_trigger_list_rcu(struct rcu_head *node)
1781{
1782 free(caa_container_of(node, struct lttng_channel_trigger_list,
1783 rcu_node));
1784}
1785
1786static
1787void free_channel_state_sample_rcu(struct rcu_head *node)
1788{
1789 free(caa_container_of(node, struct channel_state_sample,
1790 rcu_node));
1791}
1792
ab0ee2ca
JG
1793static
1794int handle_notification_thread_command_remove_channel(
1795 struct notification_thread_state *state,
1796 uint64_t channel_key, enum lttng_domain_type domain,
1797 enum lttng_error_code *cmd_result)
1798{
1799 struct cds_lfht_node *node;
1800 struct cds_lfht_iter iter;
1801 struct lttng_channel_trigger_list *trigger_list;
1802 struct lttng_trigger_list_element *trigger_list_element, *tmp;
1803 struct channel_key key = { .key = channel_key, .domain = domain };
1804 struct channel_info *channel_info;
1805
1806 DBG("[notification-thread] Removing channel key = %" PRIu64 " in %s domain",
1807 channel_key, domain == LTTNG_DOMAIN_KERNEL ? "kernel" : "user space");
1808
1809 rcu_read_lock();
1810
1811 cds_lfht_lookup(state->channel_triggers_ht,
1812 hash_channel_key(&key),
1813 match_channel_trigger_list,
1814 &key,
1815 &iter);
1816 node = cds_lfht_iter_get_node(&iter);
1817 /*
1818 * There is a severe internal error if we are being asked to remove a
1819 * channel that doesn't exist.
1820 */
1821 if (!node) {
1822 ERR("[notification-thread] Channel being removed is unknown to the notification thread");
1823 goto end;
1824 }
1825
1826 /* Free the list of triggers associated with this channel. */
1827 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
1828 channel_triggers_ht_node);
1829 cds_list_for_each_entry_safe(trigger_list_element, tmp,
1830 &trigger_list->list, node) {
1831 cds_list_del(&trigger_list_element->node);
1832 free(trigger_list_element);
1833 }
1834 cds_lfht_del(state->channel_triggers_ht, node);
83b934ad 1835 call_rcu(&trigger_list->rcu_node, free_channel_trigger_list_rcu);
ab0ee2ca
JG
1836
1837 /* Free sampled channel state. */
1838 cds_lfht_lookup(state->channel_state_ht,
1839 hash_channel_key(&key),
1840 match_channel_state_sample,
1841 &key,
1842 &iter);
1843 node = cds_lfht_iter_get_node(&iter);
1844 /*
1845 * This is expected to be NULL if the channel is destroyed before we
1846 * received a sample.
1847 */
1848 if (node) {
1849 struct channel_state_sample *sample = caa_container_of(node,
1850 struct channel_state_sample,
1851 channel_state_ht_node);
1852
1853 cds_lfht_del(state->channel_state_ht, node);
83b934ad 1854 call_rcu(&sample->rcu_node, free_channel_state_sample_rcu);
ab0ee2ca
JG
1855 }
1856
1857 /* Remove the channel from the channels_ht and free it. */
1858 cds_lfht_lookup(state->channels_ht,
1859 hash_channel_key(&key),
1860 match_channel_info,
1861 &key,
1862 &iter);
1863 node = cds_lfht_iter_get_node(&iter);
1864 assert(node);
1865 channel_info = caa_container_of(node, struct channel_info,
1866 channels_ht_node);
1867 cds_lfht_del(state->channels_ht, node);
1868 channel_info_destroy(channel_info);
1869end:
1870 rcu_read_unlock();
1871 *cmd_result = LTTNG_OK;
1872 return 0;
1873}
1874
0ca52944 1875static
ed327204 1876int handle_notification_thread_command_session_rotation(
0ca52944 1877 struct notification_thread_state *state,
ed327204
JG
1878 enum notification_thread_command_type cmd_type,
1879 const char *session_name, uid_t session_uid, gid_t session_gid,
1880 uint64_t trace_archive_chunk_id,
1881 struct lttng_trace_archive_location *location,
1882 enum lttng_error_code *_cmd_result)
0ca52944 1883{
ed327204
JG
1884 int ret = 0;
1885 enum lttng_error_code cmd_result = LTTNG_OK;
1886 struct lttng_session_trigger_list *trigger_list;
1887 struct lttng_trigger_list_element *trigger_list_element;
1888 struct session_info *session_info;
0ca52944 1889
ed327204
JG
1890 rcu_read_lock();
1891
1892 session_info = find_or_create_session_info(state, session_name,
1893 session_uid, session_gid);
1894 if (!session_info) {
a9577b76 1895 /* Allocation error or an internal error occurred. */
ed327204
JG
1896 ret = -1;
1897 cmd_result = LTTNG_ERR_NOMEM;
1898 goto end;
1899 }
1900
1901 session_info->rotation.ongoing =
1902 cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING;
1903 session_info->rotation.id = trace_archive_chunk_id;
1904 trigger_list = get_session_trigger_list(state, session_name);
1905 if (!trigger_list) {
1906 DBG("[notification-thread] No triggers applying to session \"%s\" found",
1907 session_name);
1908 goto end;
1909 }
1910
1911 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
1912 node) {
1913 const struct lttng_condition *condition;
1914 const struct lttng_action *action;
1915 const struct lttng_trigger *trigger;
1916 struct notification_client_list *client_list;
1917 struct lttng_evaluation *evaluation = NULL;
1918 enum lttng_condition_type condition_type;
505b2d90 1919 bool client_list_is_empty;
ed327204
JG
1920
1921 trigger = trigger_list_element->trigger;
1922 condition = lttng_trigger_get_const_condition(trigger);
1923 assert(condition);
1924 condition_type = lttng_condition_get_type(condition);
1925
1926 if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING &&
1927 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1928 continue;
1929 } else if (condition_type == LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED &&
1930 cmd_type != NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED) {
1931 continue;
1932 }
1933
1934 action = lttng_trigger_get_const_action(trigger);
1935
1936 /* Notify actions are the only type currently supported. */
1937 assert(lttng_action_get_type_const(action) ==
1938 LTTNG_ACTION_TYPE_NOTIFY);
1939
1940 client_list = get_client_list_from_condition(state, condition);
1941 assert(client_list);
1942
505b2d90
JG
1943 pthread_mutex_lock(&client_list->lock);
1944 client_list_is_empty = cds_list_empty(&client_list->list);
1945 pthread_mutex_unlock(&client_list->lock);
1946 if (client_list_is_empty) {
ed327204
JG
1947 /*
1948 * No clients interested in the evaluation's result,
1949 * skip it.
1950 */
1951 continue;
1952 }
1953
1954 if (cmd_type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING) {
1955 evaluation = lttng_evaluation_session_rotation_ongoing_create(
1956 trace_archive_chunk_id);
1957 } else {
1958 evaluation = lttng_evaluation_session_rotation_completed_create(
1959 trace_archive_chunk_id, location);
1960 }
1961
1962 if (!evaluation) {
1963 /* Internal error */
1964 ret = -1;
1965 cmd_result = LTTNG_ERR_UNK;
505b2d90 1966 goto put_list;
ed327204
JG
1967 }
1968
1969 /* Dispatch evaluation result to all clients. */
1970 ret = send_evaluation_to_clients(trigger_list_element->trigger,
1971 evaluation, client_list, state,
1972 session_info->uid,
1973 session_info->gid);
1974 lttng_evaluation_destroy(evaluation);
505b2d90
JG
1975put_list:
1976 notification_client_list_put(client_list);
ed327204 1977 if (caa_unlikely(ret)) {
505b2d90 1978 break;
ed327204
JG
1979 }
1980 }
1981end:
1982 session_info_put(session_info);
1983 *_cmd_result = cmd_result;
1984 rcu_read_unlock();
1985 return ret;
0ca52944
JG
1986}
1987
1da26331
JG
1988static
1989int condition_is_supported(struct lttng_condition *condition)
1990{
1991 int ret;
1992
1993 switch (lttng_condition_get_type(condition)) {
1994 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
1995 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
1996 {
1997 enum lttng_domain_type domain;
1998
1999 ret = lttng_condition_buffer_usage_get_domain_type(condition,
2000 &domain);
2001 if (ret) {
2002 ret = -1;
2003 goto end;
2004 }
2005
2006 if (domain != LTTNG_DOMAIN_KERNEL) {
2007 ret = 1;
2008 goto end;
2009 }
2010
2011 /*
2012 * Older kernel tracers don't expose the API to monitor their
2013 * buffers. Therefore, we reject triggers that require that
2014 * mechanism to be available to be evaluated.
2015 */
7d268848 2016 ret = kernel_supports_ring_buffer_snapshot_sample_positions();
1da26331
JG
2017 break;
2018 }
2019 default:
2020 ret = 1;
2021 }
2022end:
2023 return ret;
2024}
2025
51eab943
JG
2026/* Must be called with RCU read lock held. */
2027static
2028int bind_trigger_to_matching_session(const struct lttng_trigger *trigger,
2029 struct notification_thread_state *state)
2030{
2031 int ret = 0;
51eab943
JG
2032 const struct lttng_condition *condition;
2033 const char *session_name;
2034 struct lttng_session_trigger_list *trigger_list;
2035
2036 condition = lttng_trigger_get_const_condition(trigger);
2037 switch (lttng_condition_get_type(condition)) {
2038 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_ONGOING:
2039 case LTTNG_CONDITION_TYPE_SESSION_ROTATION_COMPLETED:
2040 {
2041 enum lttng_condition_status status;
2042
2043 status = lttng_condition_session_rotation_get_session_name(
2044 condition, &session_name);
2045 if (status != LTTNG_CONDITION_STATUS_OK) {
2046 ERR("[notification-thread] Failed to bind trigger to session: unable to get 'session_rotation' condition's session name");
2047 ret = -1;
2048 goto end;
2049 }
2050 break;
2051 }
2052 default:
2053 ret = -1;
2054 goto end;
2055 }
2056
ed327204
JG
2057 trigger_list = get_session_trigger_list(state, session_name);
2058 if (!trigger_list) {
51eab943
JG
2059 DBG("[notification-thread] Unable to bind trigger applying to session \"%s\" as it is not yet known to the notification system",
2060 session_name);
2061 goto end;
51eab943 2062
ed327204 2063 }
51eab943
JG
2064
2065 DBG("[notification-thread] Newly registered trigger bound to session \"%s\"",
2066 session_name);
2067 ret = lttng_session_trigger_list_add(trigger_list, trigger);
2068end:
2069 return ret;
2070}
2071
2072/* Must be called with RCU read lock held. */
2073static
2074int bind_trigger_to_matching_channels(const struct lttng_trigger *trigger,
2075 struct notification_thread_state *state)
2076{
2077 int ret = 0;
2078 struct cds_lfht_node *node;
2079 struct cds_lfht_iter iter;
2080 struct channel_info *channel;
2081
2082 cds_lfht_for_each_entry(state->channels_ht, &iter, channel,
2083 channels_ht_node) {
2084 struct lttng_trigger_list_element *trigger_list_element;
2085 struct lttng_channel_trigger_list *trigger_list;
a5d64ae7 2086 struct cds_lfht_iter lookup_iter;
51eab943
JG
2087
2088 if (!trigger_applies_to_channel(trigger, channel)) {
2089 continue;
2090 }
2091
2092 cds_lfht_lookup(state->channel_triggers_ht,
2093 hash_channel_key(&channel->key),
2094 match_channel_trigger_list,
2095 &channel->key,
a5d64ae7
MD
2096 &lookup_iter);
2097 node = cds_lfht_iter_get_node(&lookup_iter);
51eab943
JG
2098 assert(node);
2099 trigger_list = caa_container_of(node,
2100 struct lttng_channel_trigger_list,
2101 channel_triggers_ht_node);
2102
2103 trigger_list_element = zmalloc(sizeof(*trigger_list_element));
2104 if (!trigger_list_element) {
2105 ret = -1;
2106 goto end;
2107 }
2108 CDS_INIT_LIST_HEAD(&trigger_list_element->node);
2109 trigger_list_element->trigger = trigger;
2110 cds_list_add(&trigger_list_element->node, &trigger_list->list);
2111 DBG("[notification-thread] Newly registered trigger bound to channel \"%s\"",
2112 channel->name);
2113 }
2114end:
2115 return ret;
2116}
2117
ab0ee2ca
JG
2118/*
2119 * FIXME A client's credentials are not checked when registering a trigger, nor
2120 * are they stored alongside with the trigger.
2121 *
1da26331 2122 * The effects of this are benign since:
ab0ee2ca 2123 * - The client will succeed in registering the trigger, as it is valid,
51eab943 2124 * - The trigger will, internally, be bound to the channel/session,
ab0ee2ca
JG
2125 * - The notifications will not be sent since the client's credentials
2126 * are checked against the channel at that moment.
1da26331
JG
2127 *
2128 * If this function returns a non-zero value, it means something is
50ca7858 2129 * fundamentally broken and the whole subsystem/thread will be torn down.
1da26331
JG
2130 *
2131 * If a non-fatal error occurs, just set the cmd_result to the appropriate
2132 * error code.
ab0ee2ca
JG
2133 */
2134static
2135int handle_notification_thread_command_register_trigger(
8abe313a
JG
2136 struct notification_thread_state *state,
2137 struct lttng_trigger *trigger,
2138 enum lttng_error_code *cmd_result)
ab0ee2ca
JG
2139{
2140 int ret = 0;
2141 struct lttng_condition *condition;
2142 struct notification_client *client;
2143 struct notification_client_list *client_list = NULL;
2144 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
2145 struct notification_client_list_element *client_list_element, *tmp;
2146 struct cds_lfht_node *node;
2147 struct cds_lfht_iter iter;
ab0ee2ca
JG
2148 bool free_trigger = true;
2149
2150 rcu_read_lock();
2151
2152 condition = lttng_trigger_get_condition(trigger);
1da26331
JG
2153 assert(condition);
2154
2155 ret = condition_is_supported(condition);
2156 if (ret < 0) {
2157 goto error;
2158 } else if (ret == 0) {
2159 *cmd_result = LTTNG_ERR_NOT_SUPPORTED;
2160 goto error;
2161 } else {
2162 /* Feature is supported, continue. */
2163 ret = 0;
2164 }
2165
ab0ee2ca
JG
2166 trigger_ht_element = zmalloc(sizeof(*trigger_ht_element));
2167 if (!trigger_ht_element) {
2168 ret = -1;
2169 goto error;
2170 }
2171
2172 /* Add trigger to the trigger_ht. */
2173 cds_lfht_node_init(&trigger_ht_element->node);
2174 trigger_ht_element->trigger = trigger;
2175
2176 node = cds_lfht_add_unique(state->triggers_ht,
2177 lttng_condition_hash(condition),
2178 match_condition,
2179 condition,
2180 &trigger_ht_element->node);
2181 if (node != &trigger_ht_element->node) {
2182 /* Not a fatal error, simply report it to the client. */
2183 *cmd_result = LTTNG_ERR_TRIGGER_EXISTS;
2184 goto error_free_ht_element;
2185 }
2186
2187 /*
2188 * Ownership of the trigger and of its wrapper was transfered to
2189 * the triggers_ht.
2190 */
2191 trigger_ht_element = NULL;
2192 free_trigger = false;
2193
2194 /*
2195 * The rest only applies to triggers that have a "notify" action.
2196 * It is not skipped as this is the only action type currently
2197 * supported.
2198 */
505b2d90 2199 client_list = notification_client_list_create(trigger);
ab0ee2ca
JG
2200 if (!client_list) {
2201 ret = -1;
2202 goto error_free_ht_element;
2203 }
ab0ee2ca
JG
2204
2205 /* Build a list of clients to which this new trigger applies. */
2206 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
2207 client_socket_ht_node) {
2208 if (!trigger_applies_to_client(trigger, client)) {
2209 continue;
2210 }
2211
2212 client_list_element = zmalloc(sizeof(*client_list_element));
2213 if (!client_list_element) {
2214 ret = -1;
505b2d90 2215 goto error_put_client_list;
ab0ee2ca
JG
2216 }
2217 CDS_INIT_LIST_HEAD(&client_list_element->node);
2218 client_list_element->client = client;
2219 cds_list_add(&client_list_element->node, &client_list->list);
2220 }
2221
f82f93a1 2222 switch (get_condition_binding_object(condition)) {
51eab943
JG
2223 case LTTNG_OBJECT_TYPE_SESSION:
2224 /* Add the trigger to the list if it matches a known session. */
2225 ret = bind_trigger_to_matching_session(trigger, state);
2226 if (ret) {
505b2d90 2227 goto error_put_client_list;
ab0ee2ca 2228 }
f82f93a1 2229 break;
51eab943
JG
2230 case LTTNG_OBJECT_TYPE_CHANNEL:
2231 /*
2232 * Add the trigger to list of triggers bound to the channels
2233 * currently known.
2234 */
2235 ret = bind_trigger_to_matching_channels(trigger, state);
2236 if (ret) {
505b2d90 2237 goto error_put_client_list;
ab0ee2ca 2238 }
51eab943
JG
2239 break;
2240 case LTTNG_OBJECT_TYPE_NONE:
2241 break;
2242 default:
2243 ERR("[notification-thread] Unknown object type on which to bind a newly registered trigger was encountered");
2244 ret = -1;
505b2d90 2245 goto error_put_client_list;
ab0ee2ca
JG
2246 }
2247
2ae99f0b
JG
2248 /*
2249 * Since there is nothing preventing clients from subscribing to a
2250 * condition before the corresponding trigger is registered, we have
2251 * to evaluate this new condition right away.
2252 *
2253 * At some point, we were waiting for the next "evaluation" (e.g. on
2254 * reception of a channel sample) to evaluate this new condition, but
2255 * that was broken.
2256 *
2257 * The reason it was broken is that waiting for the next sample
2258 * does not allow us to properly handle transitions for edge-triggered
2259 * conditions.
2260 *
2261 * Consider this example: when we handle a new channel sample, we
2262 * evaluate each conditions twice: once with the previous state, and
2263 * again with the newest state. We then use those two results to
2264 * determine whether a state change happened: a condition was false and
2265 * became true. If a state change happened, we have to notify clients.
2266 *
2267 * Now, if a client subscribes to a given notification and registers
2268 * a trigger *after* that subscription, we have to make sure the
2269 * condition is evaluated at this point while considering only the
2270 * current state. Otherwise, the next evaluation cycle may only see
2271 * that the evaluations remain the same (true for samples n-1 and n) and
2272 * the client will never know that the condition has been met.
505b2d90
JG
2273 *
2274 * No need to lock the list here as it has not been published yet.
2ae99f0b
JG
2275 */
2276 cds_list_for_each_entry_safe(client_list_element, tmp,
2277 &client_list->list, node) {
2278 ret = evaluate_condition_for_client(trigger, condition,
2279 client_list_element->client, state);
2280 if (ret) {
505b2d90 2281 goto error_put_client_list;
2ae99f0b
JG
2282 }
2283 }
2284
2285 /*
2286 * Client list ownership transferred to the
2287 * notification_trigger_clients_ht.
2288 */
505b2d90 2289 publish_notification_client_list(state, client_list);
2ae99f0b
JG
2290 client_list = NULL;
2291
ab0ee2ca 2292 *cmd_result = LTTNG_OK;
505b2d90
JG
2293
2294error_put_client_list:
2295 notification_client_list_put(client_list);
2296
ab0ee2ca
JG
2297error_free_ht_element:
2298 free(trigger_ht_element);
2299error:
2300 if (free_trigger) {
ab0ee2ca
JG
2301 lttng_trigger_destroy(trigger);
2302 }
2303 rcu_read_unlock();
2304 return ret;
2305}
2306
83b934ad
MD
2307static
2308void free_lttng_trigger_ht_element_rcu(struct rcu_head *node)
2309{
2310 free(caa_container_of(node, struct lttng_trigger_ht_element,
2311 rcu_node));
2312}
2313
cc2295b5 2314static
ab0ee2ca
JG
2315int handle_notification_thread_command_unregister_trigger(
2316 struct notification_thread_state *state,
2317 struct lttng_trigger *trigger,
2318 enum lttng_error_code *_cmd_reply)
2319{
2320 struct cds_lfht_iter iter;
ed327204 2321 struct cds_lfht_node *triggers_ht_node;
ab0ee2ca
JG
2322 struct lttng_channel_trigger_list *trigger_list;
2323 struct notification_client_list *client_list;
ab0ee2ca
JG
2324 struct lttng_trigger_ht_element *trigger_ht_element = NULL;
2325 struct lttng_condition *condition = lttng_trigger_get_condition(
2326 trigger);
ab0ee2ca
JG
2327 enum lttng_error_code cmd_reply;
2328
2329 rcu_read_lock();
2330
2331 cds_lfht_lookup(state->triggers_ht,
2332 lttng_condition_hash(condition),
2333 match_condition,
2334 condition,
2335 &iter);
2336 triggers_ht_node = cds_lfht_iter_get_node(&iter);
2337 if (!triggers_ht_node) {
2338 cmd_reply = LTTNG_ERR_TRIGGER_NOT_FOUND;
2339 goto end;
2340 } else {
2341 cmd_reply = LTTNG_OK;
2342 }
2343
2344 /* Remove trigger from channel_triggers_ht. */
2345 cds_lfht_for_each_entry(state->channel_triggers_ht, &iter, trigger_list,
2346 channel_triggers_ht_node) {
2347 struct lttng_trigger_list_element *trigger_element, *tmp;
2348
2349 cds_list_for_each_entry_safe(trigger_element, tmp,
2350 &trigger_list->list, node) {
9b63a4aa
JG
2351 const struct lttng_condition *current_condition =
2352 lttng_trigger_get_const_condition(
ab0ee2ca
JG
2353 trigger_element->trigger);
2354
2355 assert(current_condition);
2356 if (!lttng_condition_is_equal(condition,
2357 current_condition)) {
2358 continue;
2359 }
2360
2361 DBG("[notification-thread] Removed trigger from channel_triggers_ht");
2362 cds_list_del(&trigger_element->node);
e4db5ace
JR
2363 /* A trigger can only appear once per channel */
2364 break;
ab0ee2ca
JG
2365 }
2366 }
2367
2368 /*
2369 * Remove and release the client list from
2370 * notification_trigger_clients_ht.
2371 */
ed327204
JG
2372 client_list = get_client_list_from_condition(state, condition);
2373 assert(client_list);
2374
505b2d90
JG
2375 /* Put new reference and the hashtable's reference. */
2376 notification_client_list_put(client_list);
2377 notification_client_list_put(client_list);
2378 client_list = NULL;
ab0ee2ca
JG
2379
2380 /* Remove trigger from triggers_ht. */
2381 trigger_ht_element = caa_container_of(triggers_ht_node,
2382 struct lttng_trigger_ht_element, node);
2383 cds_lfht_del(state->triggers_ht, triggers_ht_node);
2384
7ca172c1 2385 /* Release the ownership of the trigger. */
ab0ee2ca 2386 lttng_trigger_destroy(trigger_ht_element->trigger);
83b934ad 2387 call_rcu(&trigger_ht_element->rcu_node, free_lttng_trigger_ht_element_rcu);
ab0ee2ca
JG
2388end:
2389 rcu_read_unlock();
2390 if (_cmd_reply) {
2391 *_cmd_reply = cmd_reply;
2392 }
2393 return 0;
2394}
2395
2396/* Returns 0 on success, 1 on exit requested, negative value on error. */
2397int handle_notification_thread_command(
2398 struct notification_thread_handle *handle,
2399 struct notification_thread_state *state)
2400{
2401 int ret;
2402 uint64_t counter;
2403 struct notification_thread_command *cmd;
2404
8abe313a 2405 /* Read the event pipe to put it back into a quiescent state. */
18aeca05 2406 ret = lttng_read(lttng_pipe_get_readfd(handle->cmd_queue.event_pipe), &counter,
8abe313a 2407 sizeof(counter));
18aeca05 2408 if (ret != sizeof(counter)) {
ab0ee2ca
JG
2409 goto error;
2410 }
2411
2412 pthread_mutex_lock(&handle->cmd_queue.lock);
2413 cmd = cds_list_first_entry(&handle->cmd_queue.list,
2414 struct notification_thread_command, cmd_list_node);
2415 switch (cmd->type) {
2416 case NOTIFICATION_COMMAND_TYPE_REGISTER_TRIGGER:
2417 DBG("[notification-thread] Received register trigger command");
2418 ret = handle_notification_thread_command_register_trigger(
2419 state, cmd->parameters.trigger,
2420 &cmd->reply_code);
2421 break;
2422 case NOTIFICATION_COMMAND_TYPE_UNREGISTER_TRIGGER:
2423 DBG("[notification-thread] Received unregister trigger command");
2424 ret = handle_notification_thread_command_unregister_trigger(
2425 state, cmd->parameters.trigger,
2426 &cmd->reply_code);
2427 break;
2428 case NOTIFICATION_COMMAND_TYPE_ADD_CHANNEL:
2429 DBG("[notification-thread] Received add channel command");
2430 ret = handle_notification_thread_command_add_channel(
8abe313a
JG
2431 state,
2432 cmd->parameters.add_channel.session.name,
2433 cmd->parameters.add_channel.session.uid,
2434 cmd->parameters.add_channel.session.gid,
2435 cmd->parameters.add_channel.channel.name,
2436 cmd->parameters.add_channel.channel.domain,
2437 cmd->parameters.add_channel.channel.key,
2438 cmd->parameters.add_channel.channel.capacity,
ab0ee2ca
JG
2439 &cmd->reply_code);
2440 break;
2441 case NOTIFICATION_COMMAND_TYPE_REMOVE_CHANNEL:
2442 DBG("[notification-thread] Received remove channel command");
2443 ret = handle_notification_thread_command_remove_channel(
2444 state, cmd->parameters.remove_channel.key,
2445 cmd->parameters.remove_channel.domain,
2446 &cmd->reply_code);
2447 break;
0ca52944 2448 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING:
0ca52944 2449 case NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_COMPLETED:
ed327204
JG
2450 DBG("[notification-thread] Received session rotation %s command",
2451 cmd->type == NOTIFICATION_COMMAND_TYPE_SESSION_ROTATION_ONGOING ?
2452 "ongoing" : "completed");
2453 ret = handle_notification_thread_command_session_rotation(
0ca52944 2454 state,
ed327204
JG
2455 cmd->type,
2456 cmd->parameters.session_rotation.session_name,
2457 cmd->parameters.session_rotation.uid,
2458 cmd->parameters.session_rotation.gid,
2459 cmd->parameters.session_rotation.trace_archive_chunk_id,
2460 cmd->parameters.session_rotation.location,
0ca52944
JG
2461 &cmd->reply_code);
2462 break;
ab0ee2ca
JG
2463 case NOTIFICATION_COMMAND_TYPE_QUIT:
2464 DBG("[notification-thread] Received quit command");
2465 cmd->reply_code = LTTNG_OK;
2466 ret = 1;
2467 goto end;
2468 default:
2469 ERR("[notification-thread] Unknown internal command received");
2470 goto error_unlock;
2471 }
2472
2473 if (ret) {
2474 goto error_unlock;
2475 }
2476end:
2477 cds_list_del(&cmd->cmd_list_node);
8ada111f 2478 lttng_waiter_wake_up(&cmd->reply_waiter);
ab0ee2ca
JG
2479 pthread_mutex_unlock(&handle->cmd_queue.lock);
2480 return ret;
2481error_unlock:
2482 /* Wake-up and return a fatal error to the calling thread. */
8ada111f 2483 lttng_waiter_wake_up(&cmd->reply_waiter);
ab0ee2ca
JG
2484 pthread_mutex_unlock(&handle->cmd_queue.lock);
2485 cmd->reply_code = LTTNG_ERR_FATAL;
2486error:
2487 /* Indicate a fatal error to the caller. */
2488 return -1;
2489}
2490
ab0ee2ca
JG
2491static
2492int socket_set_non_blocking(int socket)
2493{
2494 int ret, flags;
2495
2496 /* Set the pipe as non-blocking. */
2497 ret = fcntl(socket, F_GETFL, 0);
2498 if (ret == -1) {
2499 PERROR("fcntl get socket flags");
2500 goto end;
2501 }
2502 flags = ret;
2503
2504 ret = fcntl(socket, F_SETFL, flags | O_NONBLOCK);
2505 if (ret == -1) {
2506 PERROR("fcntl set O_NONBLOCK socket flag");
2507 goto end;
2508 }
2509 DBG("Client socket (fd = %i) set as non-blocking", socket);
2510end:
2511 return ret;
2512}
2513
505b2d90 2514/* Client lock must be acquired by caller. */
ab0ee2ca 2515static
14fa22f8 2516int client_reset_inbound_state(struct notification_client *client)
ab0ee2ca
JG
2517{
2518 int ret;
2519
505b2d90
JG
2520 ASSERT_LOCKED(client->lock);
2521
ab0ee2ca
JG
2522 ret = lttng_dynamic_buffer_set_size(
2523 &client->communication.inbound.buffer, 0);
2524 assert(!ret);
2525
2526 client->communication.inbound.bytes_to_receive =
2527 sizeof(struct lttng_notification_channel_message);
2528 client->communication.inbound.msg_type =
2529 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN;
ab0ee2ca
JG
2530 LTTNG_SOCK_SET_UID_CRED(&client->communication.inbound.creds, -1);
2531 LTTNG_SOCK_SET_GID_CRED(&client->communication.inbound.creds, -1);
14fa22f8
JG
2532 ret = lttng_dynamic_buffer_set_size(
2533 &client->communication.inbound.buffer,
2534 client->communication.inbound.bytes_to_receive);
2535 return ret;
ab0ee2ca
JG
2536}
2537
2538int handle_notification_thread_client_connect(
2539 struct notification_thread_state *state)
2540{
2541 int ret;
2542 struct notification_client *client;
2543
2544 DBG("[notification-thread] Handling new notification channel client connection");
2545
2546 client = zmalloc(sizeof(*client));
2547 if (!client) {
2548 /* Fatal error. */
2549 ret = -1;
2550 goto error;
2551 }
505b2d90 2552 pthread_mutex_init(&client->lock, NULL);
ac1889bf 2553 client->id = state->next_notification_client_id++;
ab0ee2ca
JG
2554 CDS_INIT_LIST_HEAD(&client->condition_list);
2555 lttng_dynamic_buffer_init(&client->communication.inbound.buffer);
2556 lttng_dynamic_buffer_init(&client->communication.outbound.buffer);
01ea340e 2557 client->communication.inbound.expect_creds = true;
505b2d90
JG
2558
2559 pthread_mutex_lock(&client->lock);
14fa22f8 2560 ret = client_reset_inbound_state(client);
505b2d90 2561 pthread_mutex_unlock(&client->lock);
14fa22f8
JG
2562 if (ret) {
2563 ERR("[notification-thread] Failed to reset client communication's inbound state");
e742b055 2564 ret = 0;
14fa22f8
JG
2565 goto error;
2566 }
ab0ee2ca
JG
2567
2568 ret = lttcomm_accept_unix_sock(state->notification_channel_socket);
2569 if (ret < 0) {
2570 ERR("[notification-thread] Failed to accept new notification channel client connection");
2571 ret = 0;
2572 goto error;
2573 }
2574
2575 client->socket = ret;
2576
2577 ret = socket_set_non_blocking(client->socket);
2578 if (ret) {
2579 ERR("[notification-thread] Failed to set new notification channel client connection socket as non-blocking");
2580 goto error;
2581 }
2582
2583 ret = lttcomm_setsockopt_creds_unix_sock(client->socket);
2584 if (ret < 0) {
2585 ERR("[notification-thread] Failed to set socket options on new notification channel client socket");
2586 ret = 0;
2587 goto error;
2588 }
2589
2590 ret = lttng_poll_add(&state->events, client->socket,
2591 LPOLLIN | LPOLLERR |
2592 LPOLLHUP | LPOLLRDHUP);
2593 if (ret < 0) {
2594 ERR("[notification-thread] Failed to add notification channel client socket to poll set");
2595 ret = 0;
2596 goto error;
2597 }
2598 DBG("[notification-thread] Added new notification channel client socket (%i) to poll set",
2599 client->socket);
2600
ab0ee2ca
JG
2601 rcu_read_lock();
2602 cds_lfht_add(state->client_socket_ht,
2603 hash_client_socket(client->socket),
2604 &client->client_socket_ht_node);
ac1889bf
JG
2605 cds_lfht_add(state->client_id_ht,
2606 hash_client_id(client->id),
2607 &client->client_id_ht_node);
ab0ee2ca
JG
2608 rcu_read_unlock();
2609
2610 return ret;
2611error:
2612 notification_client_destroy(client, state);
2613 return ret;
2614}
2615
505b2d90
JG
2616/* RCU read-lock must be held by the caller. */
2617/* Client lock must be held by the caller */
2618static
2619int notification_thread_client_disconnect(
2620 struct notification_client *client,
ab0ee2ca 2621 struct notification_thread_state *state)
505b2d90
JG
2622{
2623 int ret;
2624 struct lttng_condition_list_element *condition_list_element, *tmp;
2625
2626 /* Acquire the client lock to disable its communication atomically. */
2627 client->communication.active = false;
2628 ret = lttng_poll_del(&state->events, client->socket);
2629 if (ret) {
2630 ERR("[notification-thread] Failed to remove client socket %d from poll set",
2631 client->socket);
2632 }
2633
2634 cds_lfht_del(state->client_socket_ht, &client->client_socket_ht_node);
2635 cds_lfht_del(state->client_id_ht, &client->client_id_ht_node);
2636
2637 /* Release all conditions to which the client was subscribed. */
2638 cds_list_for_each_entry_safe(condition_list_element, tmp,
2639 &client->condition_list, node) {
2640 (void) notification_thread_client_unsubscribe(client,
2641 condition_list_element->condition, state, NULL);
2642 }
2643
2644 /*
2645 * Client no longer accessible to other threads (through the
2646 * client lists).
2647 */
2648 notification_client_destroy(client, state);
2649 return ret;
2650}
2651
2652int handle_notification_thread_client_disconnect(
2653 int client_socket, struct notification_thread_state *state)
ab0ee2ca
JG
2654{
2655 int ret = 0;
2656 struct notification_client *client;
2657
2658 rcu_read_lock();
2659 DBG("[notification-thread] Closing client connection (socket fd = %i)",
2660 client_socket);
2661 client = get_client_from_socket(client_socket, state);
2662 if (!client) {
2663 /* Internal state corruption, fatal error. */
2664 ERR("[notification-thread] Unable to find client (socket fd = %i)",
2665 client_socket);
2666 ret = -1;
2667 goto end;
2668 }
2669
505b2d90
JG
2670 pthread_mutex_lock(&client->lock);
2671 ret = notification_thread_client_disconnect(client, state);
2672 pthread_mutex_unlock(&client->lock);
ab0ee2ca
JG
2673end:
2674 rcu_read_unlock();
2675 return ret;
2676}
2677
2678int handle_notification_thread_client_disconnect_all(
2679 struct notification_thread_state *state)
2680{
2681 struct cds_lfht_iter iter;
2682 struct notification_client *client;
2683 bool error_encoutered = false;
2684
2685 rcu_read_lock();
2686 DBG("[notification-thread] Closing all client connections");
2687 cds_lfht_for_each_entry(state->client_socket_ht, &iter, client,
505b2d90 2688 client_socket_ht_node) {
ab0ee2ca
JG
2689 int ret;
2690
505b2d90
JG
2691 pthread_mutex_lock(&client->lock);
2692 ret = notification_thread_client_disconnect(
2693 client, state);
2694 pthread_mutex_unlock(&client->lock);
ab0ee2ca
JG
2695 if (ret) {
2696 error_encoutered = true;
2697 }
2698 }
2699 rcu_read_unlock();
2700 return error_encoutered ? 1 : 0;
2701}
2702
2703int handle_notification_thread_trigger_unregister_all(
2704 struct notification_thread_state *state)
2705{
4149ace8 2706 bool error_occurred = false;
ab0ee2ca
JG
2707 struct cds_lfht_iter iter;
2708 struct lttng_trigger_ht_element *trigger_ht_element;
2709
763de384 2710 rcu_read_lock();
ab0ee2ca
JG
2711 cds_lfht_for_each_entry(state->triggers_ht, &iter, trigger_ht_element,
2712 node) {
2713 int ret = handle_notification_thread_command_unregister_trigger(
2714 state, trigger_ht_element->trigger, NULL);
2715 if (ret) {
4149ace8 2716 error_occurred = true;
ab0ee2ca
JG
2717 }
2718 }
763de384 2719 rcu_read_unlock();
4149ace8 2720 return error_occurred ? -1 : 0;
ab0ee2ca
JG
2721}
2722
505b2d90 2723/* Client lock must be acquired by caller. */
ab0ee2ca
JG
2724static
2725int client_flush_outgoing_queue(struct notification_client *client,
2726 struct notification_thread_state *state)
2727{
2728 ssize_t ret;
2729 size_t to_send_count;
2730
505b2d90
JG
2731 ASSERT_LOCKED(client->lock);
2732
ab0ee2ca
JG
2733 assert(client->communication.outbound.buffer.size != 0);
2734 to_send_count = client->communication.outbound.buffer.size;
2735 DBG("[notification-thread] Flushing client (socket fd = %i) outgoing queue",
2736 client->socket);
2737
2738 ret = lttcomm_send_unix_sock_non_block(client->socket,
2739 client->communication.outbound.buffer.data,
2740 to_send_count);
44c180ca 2741 if ((ret >= 0 && ret < to_send_count)) {
ab0ee2ca
JG
2742 DBG("[notification-thread] Client (socket fd = %i) outgoing queue could not be completely flushed",
2743 client->socket);
2744 to_send_count -= max(ret, 0);
2745
2746 memcpy(client->communication.outbound.buffer.data,
2747 client->communication.outbound.buffer.data +
2748 client->communication.outbound.buffer.size - to_send_count,
2749 to_send_count);
2750 ret = lttng_dynamic_buffer_set_size(
2751 &client->communication.outbound.buffer,
2752 to_send_count);
2753 if (ret) {
2754 goto error;
2755 }
2756
2757 /*
2758 * We want to be notified whenever there is buffer space
2759 * available to send the rest of the payload.
2760 */
2761 ret = lttng_poll_mod(&state->events, client->socket,
2762 CLIENT_POLL_MASK_IN_OUT);
2763 if (ret) {
2764 goto error;
2765 }
2766 } else if (ret < 0) {
2767 /* Generic error, disconnect the client. */
2768 ERR("[notification-thread] Failed to send flush outgoing queue, disconnecting client (socket fd = %i)",
2769 client->socket);
505b2d90 2770 ret = notification_thread_client_disconnect(client, state);
ab0ee2ca
JG
2771 if (ret) {
2772 goto error;
2773 }
2774 } else {
2775 /* No error and flushed the queue completely. */
2776 ret = lttng_dynamic_buffer_set_size(
2777 &client->communication.outbound.buffer, 0);
2778 if (ret) {
2779 goto error;
2780 }
2781 ret = lttng_poll_mod(&state->events, client->socket,
2782 CLIENT_POLL_MASK_IN);
2783 if (ret) {
2784 goto error;
2785 }
2786
2787 client->communication.outbound.queued_command_reply = false;
2788 client->communication.outbound.dropped_notification = false;
2789 }
2790
2791 return 0;
2792error:
2793 return -1;
2794}
2795
505b2d90 2796/* Client lock must be acquired by caller. */
ab0ee2ca
JG
2797static
2798int client_send_command_reply(struct notification_client *client,
2799 struct notification_thread_state *state,
2800 enum lttng_notification_channel_status status)
2801{
2802 int ret;
2803 struct lttng_notification_channel_command_reply reply = {
2804 .status = (int8_t) status,
2805 };
2806 struct lttng_notification_channel_message msg = {
2807 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_COMMAND_REPLY,
2808 .size = sizeof(reply),
2809 };
2810 char buffer[sizeof(msg) + sizeof(reply)];
2811
505b2d90
JG
2812 ASSERT_LOCKED(client->lock);
2813
ab0ee2ca
JG
2814 if (client->communication.outbound.queued_command_reply) {
2815 /* Protocol error. */
2816 goto error;
2817 }
2818
2819 memcpy(buffer, &msg, sizeof(msg));
2820 memcpy(buffer + sizeof(msg), &reply, sizeof(reply));
2821 DBG("[notification-thread] Send command reply (%i)", (int) status);
2822
2823 /* Enqueue buffer to outgoing queue and flush it. */
2824 ret = lttng_dynamic_buffer_append(
2825 &client->communication.outbound.buffer,
2826 buffer, sizeof(buffer));
2827 if (ret) {
2828 goto error;
2829 }
2830
2831 ret = client_flush_outgoing_queue(client, state);
2832 if (ret) {
2833 goto error;
2834 }
2835
2836 if (client->communication.outbound.buffer.size != 0) {
2837 /* Queue could not be emptied. */
2838 client->communication.outbound.queued_command_reply = true;
2839 }
2840
2841 return 0;
2842error:
2843 return -1;
2844}
2845
505b2d90
JG
2846static
2847int client_handle_message_unknown(struct notification_client *client,
2848 struct notification_thread_state *state)
2849{
2850 int ret;
2851
2852 pthread_mutex_lock(&client->lock);
2853
2854 /*
2855 * Receiving message header. The function will be called again
2856 * once the rest of the message as been received and can be
2857 * interpreted.
2858 */
2859 const struct lttng_notification_channel_message *msg;
2860
2861 assert(sizeof(*msg) == client->communication.inbound.buffer.size);
2862 msg = (const struct lttng_notification_channel_message *)
2863 client->communication.inbound.buffer.data;
2864
2865 if (msg->size == 0 ||
2866 msg->size > DEFAULT_MAX_NOTIFICATION_CLIENT_MESSAGE_PAYLOAD_SIZE) {
2867 ERR("[notification-thread] Invalid notification channel message: length = %u",
2868 msg->size);
2869 ret = -1;
2870 goto end;
2871 }
2872
2873 switch (msg->type) {
2874 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
2875 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
2876 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
2877 break;
2878 default:
2879 ret = -1;
2880 ERR("[notification-thread] Invalid notification channel message: unexpected message type");
2881 goto end;
2882 }
2883
2884 client->communication.inbound.bytes_to_receive = msg->size;
2885 client->communication.inbound.msg_type =
2886 (enum lttng_notification_channel_message_type) msg->type;
2887 ret = lttng_dynamic_buffer_set_size(
2888 &client->communication.inbound.buffer, msg->size);
2889end:
2890 pthread_mutex_unlock(&client->lock);
2891 return ret;
2892}
2893
2894static
2895int client_handle_message_handshake(struct notification_client *client,
2896 struct notification_thread_state *state)
2897{
2898 int ret;
2899 struct lttng_notification_channel_command_handshake *handshake_client;
2900 const struct lttng_notification_channel_command_handshake handshake_reply = {
2901 .major = LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR,
2902 .minor = LTTNG_NOTIFICATION_CHANNEL_VERSION_MINOR,
2903 };
2904 const struct lttng_notification_channel_message msg_header = {
2905 .type = LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE,
2906 .size = sizeof(handshake_reply),
2907 };
2908 enum lttng_notification_channel_status status =
2909 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
2910 char send_buffer[sizeof(msg_header) + sizeof(handshake_reply)];
2911
2912 pthread_mutex_lock(&client->lock);
2913
2914 memcpy(send_buffer, &msg_header, sizeof(msg_header));
2915 memcpy(send_buffer + sizeof(msg_header), &handshake_reply,
2916 sizeof(handshake_reply));
2917
2918 handshake_client =
2919 (struct lttng_notification_channel_command_handshake *)
2920 client->communication.inbound.buffer
2921 .data;
2922 client->major = handshake_client->major;
2923 client->minor = handshake_client->minor;
2924 if (!client->communication.inbound.creds_received) {
2925 ERR("[notification-thread] No credentials received from client");
2926 ret = -1;
2927 goto end;
2928 }
2929
2930 client->uid = LTTNG_SOCK_GET_UID_CRED(
2931 &client->communication.inbound.creds);
2932 client->gid = LTTNG_SOCK_GET_GID_CRED(
2933 &client->communication.inbound.creds);
2934 DBG("[notification-thread] Received handshake from client (uid = %u, gid = %u) with version %i.%i",
2935 client->uid, client->gid, (int) client->major,
2936 (int) client->minor);
2937
2938 if (handshake_client->major !=
2939 LTTNG_NOTIFICATION_CHANNEL_VERSION_MAJOR) {
2940 status = LTTNG_NOTIFICATION_CHANNEL_STATUS_UNSUPPORTED_VERSION;
2941 }
2942
2943 ret = lttng_dynamic_buffer_append(
2944 &client->communication.outbound.buffer, send_buffer,
2945 sizeof(send_buffer));
2946 if (ret) {
2947 ERR("[notification-thread] Failed to send protocol version to notification channel client");
2948 goto end;
2949 }
2950
2951 client->validated = true;
2952 client->communication.active = true;
2953
2954 ret = client_flush_outgoing_queue(client, state);
2955 if (ret) {
2956 goto end;
2957 }
2958
2959 ret = client_send_command_reply(client, state, status);
2960 if (ret) {
2961 ERR("[notification-thread] Failed to send reply to notification channel client");
2962 goto end;
2963 }
2964
2965 /* Set reception state to receive the next message header. */
2966 ret = client_reset_inbound_state(client);
2967 if (ret) {
2968 ERR("[notification-thread] Failed to reset client communication's inbound state");
2969 goto end;
2970 }
2971
2972end:
2973 pthread_mutex_unlock(&client->lock);
2974 return ret;
2975}
2976
2977static
2978int client_handle_message_subscription(
2979 struct notification_client *client,
2980 enum lttng_notification_channel_message_type msg_type,
2981 struct notification_thread_state *state)
2982{
2983 int ret;
2984 struct lttng_condition *condition;
2985 enum lttng_notification_channel_status status =
2986 LTTNG_NOTIFICATION_CHANNEL_STATUS_OK;
2987 struct lttng_payload_view condition_view =
2988 lttng_payload_view_from_dynamic_buffer(
2989 &client->communication.inbound.buffer,
2990 0, -1);
2991 size_t expected_condition_size;
2992
2993 pthread_mutex_lock(&client->lock);
2994 expected_condition_size = client->communication.inbound.buffer.size;
2995 pthread_mutex_unlock(&client->lock);
2996
2997 ret = lttng_condition_create_from_payload(&condition_view, &condition);
2998 if (ret != expected_condition_size) {
2999 ERR("[notification-thread] Malformed condition received from client");
3000 goto end;
3001 }
3002
3003 if (msg_type == LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE) {
3004 ret = notification_thread_client_subscribe(
3005 client, condition, state, &status);
3006 } else {
3007 ret = notification_thread_client_unsubscribe(
3008 client, condition, state, &status);
3009 }
3010 if (ret) {
3011 goto end;
3012 }
3013
3014 pthread_mutex_lock(&client->lock);
3015 ret = client_send_command_reply(client, state, status);
3016 if (ret) {
3017 ERR("[notification-thread] Failed to send reply to notification channel client");
3018 goto end_unlock;
3019 }
3020
3021 /* Set reception state to receive the next message header. */
3022 ret = client_reset_inbound_state(client);
3023 if (ret) {
3024 ERR("[notification-thread] Failed to reset client communication's inbound state");
3025 goto end_unlock;
3026 }
3027
3028end_unlock:
3029 pthread_mutex_unlock(&client->lock);
3030end:
3031 return ret;
3032}
3033
ab0ee2ca
JG
3034static
3035int client_dispatch_message(struct notification_client *client,
3036 struct notification_thread_state *state)
3037{
3038 int ret = 0;
3039
3040 if (client->communication.inbound.msg_type !=
3041 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE &&
3042 client->communication.inbound.msg_type !=
3043 LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN &&
3044 !client->validated) {
3045 WARN("[notification-thread] client attempted a command before handshake");
3046 ret = -1;
3047 goto end;
3048 }
3049
3050 switch (client->communication.inbound.msg_type) {
3051 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNKNOWN:
3052 {
505b2d90 3053 ret = client_handle_message_unknown(client, state);
ab0ee2ca
JG
3054 break;
3055 }
3056 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_HANDSHAKE:
3057 {
505b2d90 3058 ret = client_handle_message_handshake(client, state);
ab0ee2ca
JG
3059 break;
3060 }
3061 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_SUBSCRIBE:
3062 case LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_UNSUBSCRIBE:
3063 {
505b2d90
JG
3064 ret = client_handle_message_subscription(client,
3065 client->communication.inbound.msg_type, state);
ab0ee2ca
JG
3066 break;
3067 }
3068 default:
3069 abort();
3070 }
3071end:
3072 return ret;
3073}
3074
3075/* Incoming data from client. */
3076int handle_notification_thread_client_in(
3077 struct notification_thread_state *state, int socket)
3078{
14fa22f8 3079 int ret = 0;
ab0ee2ca
JG
3080 struct notification_client *client;
3081 ssize_t recv_ret;
3082 size_t offset;
505b2d90 3083 bool message_is_complete = false;
ab0ee2ca
JG
3084
3085 client = get_client_from_socket(socket, state);
3086 if (!client) {
3087 /* Internal error, abort. */
3088 ret = -1;
3089 goto end;
3090 }
3091
505b2d90 3092 pthread_mutex_lock(&client->lock);
14fa22f8
JG
3093 offset = client->communication.inbound.buffer.size -
3094 client->communication.inbound.bytes_to_receive;
01ea340e 3095 if (client->communication.inbound.expect_creds) {
ab0ee2ca
JG
3096 recv_ret = lttcomm_recv_creds_unix_sock(socket,
3097 client->communication.inbound.buffer.data + offset,
3098 client->communication.inbound.bytes_to_receive,
3099 &client->communication.inbound.creds);
3100 if (recv_ret > 0) {
01ea340e 3101 client->communication.inbound.expect_creds = false;
ab0ee2ca
JG
3102 client->communication.inbound.creds_received = true;
3103 }
3104 } else {
3105 recv_ret = lttcomm_recv_unix_sock_non_block(socket,
3106 client->communication.inbound.buffer.data + offset,
3107 client->communication.inbound.bytes_to_receive);
3108 }
505b2d90
JG
3109 if (recv_ret >= 0) {
3110 client->communication.inbound.bytes_to_receive -= recv_ret;
3111 message_is_complete = client->communication.inbound
3112 .bytes_to_receive == 0;
3113 }
3114 pthread_mutex_unlock(&client->lock);
ab0ee2ca
JG
3115 if (recv_ret < 0) {
3116 goto error_disconnect_client;
3117 }
3118
505b2d90 3119 if (message_is_complete) {
ab0ee2ca
JG
3120 ret = client_dispatch_message(client, state);
3121 if (ret) {
3122 /*
3123 * Only returns an error if this client must be
3124 * disconnected.
3125 */
3126 goto error_disconnect_client;
3127 }
ab0ee2ca
JG
3128 }
3129end:
3130 return ret;
3131error_disconnect_client:
505b2d90
JG
3132 pthread_mutex_lock(&client->lock);
3133 ret = notification_thread_client_disconnect(client, state);
3134 pthread_mutex_unlock(&client->lock);
ab0ee2ca
JG
3135 return ret;
3136}
3137
3138/* Client ready to receive outgoing data. */
3139int handle_notification_thread_client_out(
3140 struct notification_thread_state *state, int socket)
3141{
3142 int ret;
3143 struct notification_client *client;
3144
3145 client = get_client_from_socket(socket, state);
3146 if (!client) {
3147 /* Internal error, abort. */
3148 ret = -1;
3149 goto end;
3150 }
3151
505b2d90 3152 pthread_mutex_lock(&client->lock);
ab0ee2ca 3153 ret = client_flush_outgoing_queue(client, state);
505b2d90 3154 pthread_mutex_unlock(&client->lock);
ab0ee2ca
JG
3155 if (ret) {
3156 goto end;
3157 }
3158end:
3159 return ret;
3160}
3161
3162static
e8360425
JD
3163bool evaluate_buffer_usage_condition(const struct lttng_condition *condition,
3164 const struct channel_state_sample *sample,
3165 uint64_t buffer_capacity)
ab0ee2ca
JG
3166{
3167 bool result = false;
3168 uint64_t threshold;
3169 enum lttng_condition_type condition_type;
e8360425 3170 const struct lttng_condition_buffer_usage *use_condition = container_of(
ab0ee2ca
JG
3171 condition, struct lttng_condition_buffer_usage,
3172 parent);
3173
ab0ee2ca
JG
3174 if (use_condition->threshold_bytes.set) {
3175 threshold = use_condition->threshold_bytes.value;
3176 } else {
3177 /*
3178 * Threshold was expressed as a ratio.
3179 *
3180 * TODO the threshold (in bytes) of conditions expressed
3181 * as a ratio of total buffer size could be cached to
3182 * forego this double-multiplication or it could be performed
3183 * as fixed-point math.
3184 *
d49487dc 3185 * Note that caching should accommodates the case where the
ab0ee2ca
JG
3186 * condition applies to multiple channels (i.e. don't assume
3187 * that all channels matching my_chann* have the same size...)
3188 */
3189 threshold = (uint64_t) (use_condition->threshold_ratio.value *
3190 (double) buffer_capacity);
3191 }
3192
3193 condition_type = lttng_condition_get_type(condition);
3194 if (condition_type == LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW) {
3195 DBG("[notification-thread] Low buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
3196 threshold, sample->highest_usage);
3197
3198 /*
3199 * The low condition should only be triggered once _all_ of the
3200 * streams in a channel have gone below the "low" threshold.
3201 */
3202 if (sample->highest_usage <= threshold) {
3203 result = true;
3204 }
3205 } else {
3206 DBG("[notification-thread] High buffer usage condition being evaluated: threshold = %" PRIu64 ", highest usage = %" PRIu64,
3207 threshold, sample->highest_usage);
3208
3209 /*
3210 * For high buffer usage scenarios, we want to trigger whenever
3211 * _any_ of the streams has reached the "high" threshold.
3212 */
3213 if (sample->highest_usage >= threshold) {
3214 result = true;
3215 }
3216 }
e8360425 3217
ab0ee2ca
JG
3218 return result;
3219}
3220
3221static
e8360425
JD
3222bool evaluate_session_consumed_size_condition(
3223 const struct lttng_condition *condition,
3224 uint64_t session_consumed_size)
3225{
3226 uint64_t threshold;
3227 const struct lttng_condition_session_consumed_size *size_condition =
3228 container_of(condition,
3229 struct lttng_condition_session_consumed_size,
3230 parent);
3231
3232 threshold = size_condition->consumed_threshold_bytes.value;
3233 DBG("[notification-thread] Session consumed size condition being evaluated: threshold = %" PRIu64 ", current size = %" PRIu64,
3234 threshold, session_consumed_size);
3235 return session_consumed_size >= threshold;
3236}
3237
3238static
ed327204 3239int evaluate_buffer_condition(const struct lttng_condition *condition,
ab0ee2ca 3240 struct lttng_evaluation **evaluation,
e8360425
JD
3241 const struct notification_thread_state *state,
3242 const struct channel_state_sample *previous_sample,
3243 const struct channel_state_sample *latest_sample,
3244 uint64_t previous_session_consumed_total,
3245 uint64_t latest_session_consumed_total,
3246 struct channel_info *channel_info)
ab0ee2ca
JG
3247{
3248 int ret = 0;
3249 enum lttng_condition_type condition_type;
e8360425
JD
3250 const bool previous_sample_available = !!previous_sample;
3251 bool previous_sample_result = false;
ab0ee2ca
JG
3252 bool latest_sample_result;
3253
3254 condition_type = lttng_condition_get_type(condition);
ab0ee2ca 3255
e8360425
JD
3256 switch (condition_type) {
3257 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
3258 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
3259 if (caa_likely(previous_sample_available)) {
3260 previous_sample_result =
3261 evaluate_buffer_usage_condition(condition,
3262 previous_sample, channel_info->capacity);
3263 }
3264 latest_sample_result = evaluate_buffer_usage_condition(
3265 condition, latest_sample,
3266 channel_info->capacity);
3267 break;
3268 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
3269 if (caa_likely(previous_sample_available)) {
3270 previous_sample_result =
3271 evaluate_session_consumed_size_condition(
3272 condition,
3273 previous_session_consumed_total);
3274 }
3275 latest_sample_result =
3276 evaluate_session_consumed_size_condition(
3277 condition,
3278 latest_session_consumed_total);
3279 break;
3280 default:
3281 /* Unknown condition type; internal error. */
3282 abort();
3283 }
ab0ee2ca
JG
3284
3285 if (!latest_sample_result ||
3286 (previous_sample_result == latest_sample_result)) {
3287 /*
3288 * Only trigger on a condition evaluation transition.
3289 *
3290 * NOTE: This edge-triggered logic may not be appropriate for
3291 * future condition types.
3292 */
3293 goto end;
3294 }
3295
e8360425
JD
3296 if (!evaluation || !latest_sample_result) {
3297 goto end;
3298 }
3299
3300 switch (condition_type) {
3301 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_LOW:
3302 case LTTNG_CONDITION_TYPE_BUFFER_USAGE_HIGH:
ab0ee2ca
JG
3303 *evaluation = lttng_evaluation_buffer_usage_create(
3304 condition_type,
3305 latest_sample->highest_usage,
e8360425
JD
3306 channel_info->capacity);
3307 break;
3308 case LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE:
3309 *evaluation = lttng_evaluation_session_consumed_size_create(
e8360425
JD
3310 latest_session_consumed_total);
3311 break;
3312 default:
3313 abort();
3314 }
3315
3316 if (!*evaluation) {
3317 ret = -1;
3318 goto end;
ab0ee2ca
JG
3319 }
3320end:
3321 return ret;
3322}
3323
3324static
adc93634 3325int client_enqueue_dropped_notification(struct notification_client *client)
ab0ee2ca
JG
3326{
3327 int ret;
3328 struct lttng_notification_channel_message msg = {
3329 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION_DROPPED,
3330 .size = 0,
3331 };
3332
505b2d90
JG
3333 ASSERT_LOCKED(client->lock);
3334
ab0ee2ca
JG
3335 ret = lttng_dynamic_buffer_append(
3336 &client->communication.outbound.buffer, &msg,
3337 sizeof(msg));
3338 return ret;
3339}
3340
f37d0f86
JG
3341/*
3342 * Permission checks relative to notification channel clients are performed
3343 * here. Notice how object, client, and trigger credentials are involved in
3344 * this check.
3345 *
3346 * The `object` credentials are the credentials associated with the "subject"
3347 * of a condition. For instance, a `rotation completed` condition applies
3348 * to a session. When that condition is met, it will produce an evaluation
3349 * against a session. Hence, in this case, the `object` credentials are the
3350 * credentials of the "subject" session.
3351 *
3352 * The `trigger` credentials are the credentials of the user that registered the
3353 * trigger.
3354 *
3355 * The `client` credentials are the credentials of the user that created a given
3356 * notification channel.
3357 *
3358 * In terms of visibility, it is expected that non-privilieged users can only
3359 * register triggers against "their" objects (their own sessions and
3360 * applications they are allowed to interact with). They can then open a
3361 * notification channel and subscribe to notifications associated with those
3362 * triggers.
3363 *
3364 * As for privilieged users, they can register triggers against the objects of
3365 * other users. They can then subscribe to the notifications associated to their
3366 * triggers. Privilieged users _can't_ subscribe to the notifications of
3367 * triggers owned by other users; they must create their own triggers.
3368 *
3369 * This is more a concern of usability than security. It would be difficult for
3370 * a root user reliably subscribe to a specific set of conditions without
3371 * interference from external users (those could, for instance, unregister
3372 * their triggers).
3373 */
ab0ee2ca 3374static
9b63a4aa
JG
3375int send_evaluation_to_clients(const struct lttng_trigger *trigger,
3376 const struct lttng_evaluation *evaluation,
ab0ee2ca
JG
3377 struct notification_client_list* client_list,
3378 struct notification_thread_state *state,
90843f49 3379 uid_t object_uid, gid_t object_gid)
ab0ee2ca
JG
3380{
3381 int ret = 0;
c0a66c84 3382 struct lttng_payload msg_payload;
ab0ee2ca 3383 struct notification_client_list_element *client_list_element, *tmp;
9b63a4aa
JG
3384 const struct lttng_notification notification = {
3385 .condition = (struct lttng_condition *) lttng_trigger_get_const_condition(trigger),
3386 .evaluation = (struct lttng_evaluation *) evaluation,
3387 };
3647288f
JG
3388 struct lttng_notification_channel_message msg_header = {
3389 .type = (int8_t) LTTNG_NOTIFICATION_CHANNEL_MESSAGE_TYPE_NOTIFICATION,
3390 };
90843f49 3391 const struct lttng_credentials *trigger_creds = lttng_trigger_get_credentials(trigger);
ab0ee2ca 3392
c0a66c84 3393 lttng_payload_init(&msg_payload);
ab0ee2ca 3394
c0a66c84 3395 ret = lttng_dynamic_buffer_append(&msg_payload.buffer, &msg_header,
3647288f 3396 sizeof(msg_header));
ab0ee2ca
JG
3397 if (ret) {
3398 goto end;
3399 }
3400
c0a66c84 3401 ret = lttng_notification_serialize(&notification, &msg_payload);
ab0ee2ca 3402 if (ret) {
ab0ee2ca
JG
3403 ERR("[notification-thread] Failed to serialize notification");
3404 ret = -1;
3405 goto end;
3406 }
3407
3647288f 3408 /* Update payload size. */
505b2d90
JG
3409 ((struct lttng_notification_channel_message *) msg_payload.buffer.data)
3410 ->size = (uint32_t)(
3411 msg_payload.buffer.size - sizeof(msg_header));
3647288f 3412
505b2d90 3413 pthread_mutex_lock(&client_list->lock);
ab0ee2ca
JG
3414 cds_list_for_each_entry_safe(client_list_element, tmp,
3415 &client_list->list, node) {
3416 struct notification_client *client =
3417 client_list_element->client;
3418
505b2d90
JG
3419 ret = 0;
3420 pthread_mutex_lock(&client->lock);
90843f49 3421 if (client->uid != object_uid && client->gid != object_gid &&
ab0ee2ca
JG
3422 client->uid != 0) {
3423 /* Client is not allowed to monitor this channel. */
90843f49 3424 DBG("[notification-thread] Skipping client at it does not have the object permission to receive notification for this trigger");
505b2d90 3425 goto unlock_client;
90843f49
JR
3426 }
3427
3428 if (client->uid != trigger_creds->uid && client->gid != trigger_creds->gid) {
3429 DBG("[notification-thread] Skipping client at it does not have the permission to receive notification for this trigger");
505b2d90 3430 goto unlock_client;
ab0ee2ca
JG
3431 }
3432
3433 DBG("[notification-thread] Sending notification to client (fd = %i, %zu bytes)",
c0a66c84 3434 client->socket, msg_payload.buffer.size);
ab0ee2ca
JG
3435 if (client->communication.outbound.buffer.size) {
3436 /*
3437 * Outgoing data is already buffered for this client;
3438 * drop the notification and enqueue a "dropped
3439 * notification" message if this is the first dropped
3440 * notification since the socket spilled-over to the
3441 * queue.
3442 */
3443 DBG("[notification-thread] Dropping notification addressed to client (socket fd = %i)",
3444 client->socket);
3445 if (!client->communication.outbound.dropped_notification) {
3446 client->communication.outbound.dropped_notification = true;
3447 ret = client_enqueue_dropped_notification(
adc93634 3448 client);
ab0ee2ca 3449 if (ret) {
505b2d90 3450 goto unlock_client;
ab0ee2ca
JG
3451 }
3452 }
505b2d90 3453 goto unlock_client;
ab0ee2ca
JG
3454 }
3455
3456 ret = lttng_dynamic_buffer_append_buffer(
3457 &client->communication.outbound.buffer,
c0a66c84 3458 &msg_payload.buffer);
ab0ee2ca 3459 if (ret) {
505b2d90 3460 goto unlock_client;
ab0ee2ca
JG
3461 }
3462
3463 ret = client_flush_outgoing_queue(client, state);
3464 if (ret) {
505b2d90
JG
3465 goto unlock_client;
3466 }
3467unlock_client:
3468 pthread_mutex_unlock(&client->lock);
3469 if (ret) {
3470 goto end_unlock_list;
ab0ee2ca
JG
3471 }
3472 }
3473 ret = 0;
505b2d90
JG
3474
3475end_unlock_list:
3476 pthread_mutex_unlock(&client_list->lock);
ab0ee2ca 3477end:
c0a66c84 3478 lttng_payload_reset(&msg_payload);
ab0ee2ca
JG
3479 return ret;
3480}
3481
3482int handle_notification_thread_channel_sample(
3483 struct notification_thread_state *state, int pipe,
3484 enum lttng_domain_type domain)
3485{
3486 int ret = 0;
3487 struct lttcomm_consumer_channel_monitor_msg sample_msg;
ab0ee2ca
JG
3488 struct channel_info *channel_info;
3489 struct cds_lfht_node *node;
3490 struct cds_lfht_iter iter;
3491 struct lttng_channel_trigger_list *trigger_list;
3492 struct lttng_trigger_list_element *trigger_list_element;
3493 bool previous_sample_available = false;
e8360425
JD
3494 struct channel_state_sample previous_sample, latest_sample;
3495 uint64_t previous_session_consumed_total, latest_session_consumed_total;
ab0ee2ca
JG
3496
3497 /*
3498 * The monitoring pipe only holds messages smaller than PIPE_BUF,
3499 * ensuring that read/write of sampling messages are atomic.
3500 */
7c2551ef 3501 ret = lttng_read(pipe, &sample_msg, sizeof(sample_msg));
ab0ee2ca
JG
3502 if (ret != sizeof(sample_msg)) {
3503 ERR("[notification-thread] Failed to read from monitoring pipe (fd = %i)",
3504 pipe);
3505 ret = -1;
3506 goto end;
3507 }
3508
3509 ret = 0;
3510 latest_sample.key.key = sample_msg.key;
3511 latest_sample.key.domain = domain;
3512 latest_sample.highest_usage = sample_msg.highest;
3513 latest_sample.lowest_usage = sample_msg.lowest;
e8360425 3514 latest_sample.channel_total_consumed = sample_msg.total_consumed;
ab0ee2ca
JG
3515
3516 rcu_read_lock();
3517
3518 /* Retrieve the channel's informations */
3519 cds_lfht_lookup(state->channels_ht,
3520 hash_channel_key(&latest_sample.key),
3521 match_channel_info,
3522 &latest_sample.key,
3523 &iter);
3524 node = cds_lfht_iter_get_node(&iter);
e0b5f87b 3525 if (caa_unlikely(!node)) {
ab0ee2ca
JG
3526 /*
3527 * Not an error since the consumer can push a sample to the pipe
3528 * and the rest of the session daemon could notify us of the
3529 * channel's destruction before we get a chance to process that
3530 * sample.
3531 */
3532 DBG("[notification-thread] Received a sample for an unknown channel from consumerd, key = %" PRIu64 " in %s domain",
3533 latest_sample.key.key,
3534 domain == LTTNG_DOMAIN_KERNEL ? "kernel" :
3535 "user space");
3536 goto end_unlock;
3537 }
3538 channel_info = caa_container_of(node, struct channel_info,
3539 channels_ht_node);
e8360425 3540 DBG("[notification-thread] Handling channel sample for channel %s (key = %" PRIu64 ") in session %s (highest usage = %" PRIu64 ", lowest usage = %" PRIu64", total consumed = %" PRIu64")",
8abe313a 3541 channel_info->name,
ab0ee2ca 3542 latest_sample.key.key,
8abe313a 3543 channel_info->session_info->name,
ab0ee2ca 3544 latest_sample.highest_usage,
e8360425
JD
3545 latest_sample.lowest_usage,
3546 latest_sample.channel_total_consumed);
3547
3548 previous_session_consumed_total =
3549 channel_info->session_info->consumed_data_size;
ab0ee2ca
JG
3550
3551 /* Retrieve the channel's last sample, if it exists, and update it. */
3552 cds_lfht_lookup(state->channel_state_ht,
3553 hash_channel_key(&latest_sample.key),
3554 match_channel_state_sample,
3555 &latest_sample.key,
3556 &iter);
3557 node = cds_lfht_iter_get_node(&iter);
e0b5f87b 3558 if (caa_likely(node)) {
ab0ee2ca
JG
3559 struct channel_state_sample *stored_sample;
3560
3561 /* Update the sample stored. */
3562 stored_sample = caa_container_of(node,
3563 struct channel_state_sample,
3564 channel_state_ht_node);
e8360425 3565
ab0ee2ca
JG
3566 memcpy(&previous_sample, stored_sample,
3567 sizeof(previous_sample));
3568 stored_sample->highest_usage = latest_sample.highest_usage;
3569 stored_sample->lowest_usage = latest_sample.lowest_usage;
0f0479d7 3570 stored_sample->channel_total_consumed = latest_sample.channel_total_consumed;
ab0ee2ca 3571 previous_sample_available = true;
e8360425
JD
3572
3573 latest_session_consumed_total =
3574 previous_session_consumed_total +
3575 (latest_sample.channel_total_consumed - previous_sample.channel_total_consumed);
ab0ee2ca
JG
3576 } else {
3577 /*
3578 * This is the channel's first sample, allocate space for and
3579 * store the new sample.
3580 */
3581 struct channel_state_sample *stored_sample;
3582
3583 stored_sample = zmalloc(sizeof(*stored_sample));
3584 if (!stored_sample) {
3585 ret = -1;
3586 goto end_unlock;
3587 }
3588
3589 memcpy(stored_sample, &latest_sample, sizeof(*stored_sample));
3590 cds_lfht_node_init(&stored_sample->channel_state_ht_node);
3591 cds_lfht_add(state->channel_state_ht,
3592 hash_channel_key(&stored_sample->key),
3593 &stored_sample->channel_state_ht_node);
e8360425
JD
3594
3595 latest_session_consumed_total =
3596 previous_session_consumed_total +
3597 latest_sample.channel_total_consumed;
ab0ee2ca
JG
3598 }
3599
e8360425
JD
3600 channel_info->session_info->consumed_data_size =
3601 latest_session_consumed_total;
3602
ab0ee2ca
JG
3603 /* Find triggers associated with this channel. */
3604 cds_lfht_lookup(state->channel_triggers_ht,
3605 hash_channel_key(&latest_sample.key),
3606 match_channel_trigger_list,
3607 &latest_sample.key,
3608 &iter);
3609 node = cds_lfht_iter_get_node(&iter);
e0b5f87b 3610 if (caa_likely(!node)) {
ab0ee2ca
JG
3611 goto end_unlock;
3612 }
3613
3614 trigger_list = caa_container_of(node, struct lttng_channel_trigger_list,
3615 channel_triggers_ht_node);
3616 cds_list_for_each_entry(trigger_list_element, &trigger_list->list,
e742b055 3617 node) {
9b63a4aa
JG
3618 const struct lttng_condition *condition;
3619 const struct lttng_action *action;
3620 const struct lttng_trigger *trigger;
505b2d90 3621 struct notification_client_list *client_list = NULL;
ab0ee2ca 3622 struct lttng_evaluation *evaluation = NULL;
505b2d90 3623 bool client_list_is_empty;
ab0ee2ca 3624
505b2d90 3625 ret = 0;
ab0ee2ca 3626 trigger = trigger_list_element->trigger;
9b63a4aa 3627 condition = lttng_trigger_get_const_condition(trigger);
ab0ee2ca 3628 assert(condition);
9b63a4aa 3629 action = lttng_trigger_get_const_action(trigger);
ab0ee2ca
JG
3630
3631 /* Notify actions are the only type currently supported. */
9b63a4aa 3632 assert(lttng_action_get_type_const(action) ==
ab0ee2ca
JG
3633 LTTNG_ACTION_TYPE_NOTIFY);
3634
3635 /*
3636 * Check if any client is subscribed to the result of this
3637 * evaluation.
3638 */
ed327204
JG
3639 client_list = get_client_list_from_condition(state, condition);
3640 assert(client_list);
505b2d90
JG
3641 client_list_is_empty = cds_list_empty(&client_list->list);
3642 if (client_list_is_empty) {
ab0ee2ca
JG
3643 /*
3644 * No clients interested in the evaluation's result,
3645 * skip it.
3646 */
505b2d90 3647 goto put_list;
ab0ee2ca
JG
3648 }
3649
ed327204 3650 ret = evaluate_buffer_condition(condition, &evaluation, state,
ab0ee2ca 3651 previous_sample_available ? &previous_sample : NULL,
e8360425
JD
3652 &latest_sample,
3653 previous_session_consumed_total,
3654 latest_session_consumed_total,
3655 channel_info);
3656 if (caa_unlikely(ret)) {
505b2d90 3657 goto put_list;
ab0ee2ca
JG
3658 }
3659
e8360425 3660 if (caa_likely(!evaluation)) {
505b2d90 3661 goto put_list;
ab0ee2ca
JG
3662 }
3663
3664 /* Dispatch evaluation result to all clients. */
3665 ret = send_evaluation_to_clients(trigger_list_element->trigger,
3666 evaluation, client_list, state,
8abe313a
JG
3667 channel_info->session_info->uid,
3668 channel_info->session_info->gid);
3669 lttng_evaluation_destroy(evaluation);
505b2d90
JG
3670put_list:
3671 notification_client_list_put(client_list);
e0b5f87b 3672 if (caa_unlikely(ret)) {
505b2d90 3673 break;
ab0ee2ca
JG
3674 }
3675 }
3676end_unlock:
3677 rcu_read_unlock();
3678end:
3679 return ret;
3680}
This page took 0.229219 seconds and 5 git commands to generate.