Deliverables 3 and 4
[deliverable/lttng-tools.git] / src / bin / lttng-sessiond / notification-thread.c
1 /*
2 * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18 #define _LGPL_SOURCE
19 #include <lttng/trigger/trigger.h>
20 #include <lttng/notification/channel-internal.h>
21 #include <lttng/notification/notification-internal.h>
22 #include <lttng/condition/condition-internal.h>
23 #include <lttng/condition/buffer-usage-internal.h>
24 #include <common/error.h>
25 #include <common/config/session-config.h>
26 #include <common/defaults.h>
27 #include <common/utils.h>
28 #include <common/futex.h>
29 #include <common/align.h>
30 #include <common/time.h>
31 #include <sys/eventfd.h>
32 #include <sys/stat.h>
33 #include <time.h>
34 #include <signal.h>
35
36 #include "notification-thread.h"
37 #include "notification-thread-events.h"
38 #include "notification-thread-commands.h"
39 #include "lttng-sessiond.h"
40 #include "health-sessiond.h"
41
42 #include <urcu.h>
43 #include <urcu/list.h>
44 #include <urcu/rculfhash.h>
45
46 /**
47 * This thread maintains an internal state associating clients and triggers.
48 *
49 * In order to speed-up and simplify queries, hash tables providing the
50 * following associations are maintained:
51 *
52 * - client_socket_ht: associate a client's socket (fd) to its "struct client"
53 * This hash table owns the "struct client" which must thus be
54 * disposed-of on removal from the hash table.
55 *
56 * - channel_triggers_ht:
57 * associates a channel key to a list of
58 * struct lttng_trigger_list_nodes. The triggers in this list are
59 * those that have conditions that apply to this channel.
60 * This hash table owns the list, but not the triggers themselves.
61 *
62 * - channel_state_ht:
63 * associates a pair (channel key, channel domain) to its last
64 * sampled state received from the consumer daemon
65 * (struct channel_state).
66 * This previous sample is kept to implement edge-triggered
67 * conditions as we need to detect the state transitions.
68 * This hash table owns the channel state.
69 *
70 * - notification_trigger_clients_ht:
71 * associates notification-emitting triggers to clients
72 * (struct notification_client_ht_node) subscribed to those
73 * conditions.
74 * The condition's hash and match functions are used directly since
75 * all triggers in this hash table have the "notify" action.
76 * This hash table holds no ownership.
77 *
78 * - channels_ht:
79 * associates a channel_key to a struct channel_info. The hash table
80 * holds the ownership of the struct channel_info.
81 *
82 * - triggers_ht:
83 * associated a condition to a struct lttng_trigger_ht_element.
84 * The hash table holds the ownership of the
85 * lttng_trigger_ht_elements along with the triggers themselves.
86 *
87 * The thread reacts to the following internal events:
88 * 1) creation of a tracing channel,
89 * 2) destruction of a tracing channel,
90 * 3) registration of a trigger,
91 * 4) unregistration of a trigger,
92 * 5) reception of a channel monitor sample from the consumer daemon.
93 *
94 * Events specific to notification-emitting triggers:
95 * 6) connection of a notification client,
96 * 7) disconnection of a notification client,
97 * 8) subscription of a client to a conditions' notifications,
98 * 9) unsubscription of a client from a conditions' notifications,
99 *
100 *
101 * 1) Creation of a tracing channel
102 * - notification_trigger_clients_ht is traversed to identify
103 * triggers which apply to this new channel,
104 * - triggers identified are added to the channel_triggers_ht.
105 * - add channel to channels_ht
106 *
107 * 2) Destruction of a tracing channel
108 * - remove entry from channel_triggers_ht, releasing the list wrapper and
109 * elements,
110 * - remove entry from the channel_state_ht.
111 * - remove channel from channels_ht
112 *
113 * 3) Registration of a trigger
114 * - if the trigger's action is of type "notify",
115 * - traverse the list of conditions of every client to build a list of
116 * clients which have to be notified when this trigger's condition is met,
117 * - add list of clients (even if it is empty) to the
118 * notification_trigger_clients_ht,
119 * - add trigger to channel_triggers_ht (if applicable),
120 * - add trigger to triggers_ht
121 *
122 * 4) Unregistration of a trigger
123 * - if the trigger's action is of type "notify",
124 * - remove the trigger from the notification_trigger_clients_ht,
125 * - remove trigger from channel_triggers_ht (if applicable),
126 * - remove trigger from triggers_ht
127 *
128 * 5) Reception of a channel monitor sample from the consumer daemon
129 * - evaluate the conditions associated with the triggers found in
130 * the channel_triggers_ht,
131 * - if a condition evaluates to "true" and the condition is of type
132 * "notify", query the notification_trigger_clients_ht and send
133 * a notification to the clients.
134 *
135 * 6) Connection of a client
136 * - add client socket to the client_socket_ht.
137 *
138 * 7) Disconnection of a client
139 * - remove client socket from the client_socket_ht,
140 * - traverse all conditions to which the client is subscribed and remove
141 * the client from the notification_trigger_clients_ht.
142 *
143 * 8) Subscription of a client to a condition's notifications
144 * - Add the condition to the client's list of subscribed conditions,
145 * - Look-up notification_trigger_clients_ht and add the client to
146 * list of clients.
147 *
148 * 9) Unsubscription of a client to a condition's notifications
149 * - Remove the condition from the client's list of subscribed conditions,
150 * - Look-up notification_trigger_clients_ht and remove the client
151 * from the list of clients.
152 */
153
154 /*
155 * Destroy the thread data previously created by the init function.
156 */
157 void notification_thread_handle_destroy(
158 struct notification_thread_handle *handle)
159 {
160 int ret;
161 struct notification_thread_command *cmd, *tmp;
162
163 if (!handle) {
164 goto end;
165 }
166
167 if (handle->cmd_queue.event_fd < 0) {
168 goto end;
169 }
170 ret = close(handle->cmd_queue.event_fd);
171 if (ret < 0) {
172 PERROR("close notification command queue event_fd");
173 }
174
175 pthread_mutex_lock(&handle->cmd_queue.lock);
176 /* Purge queue of in-flight commands and mark them as cancelled. */
177 cds_list_for_each_entry_safe(cmd, tmp, &handle->cmd_queue.list,
178 cmd_list_node) {
179 cds_list_del(&cmd->cmd_list_node);
180 cmd->reply_code = LTTNG_ERR_COMMAND_CANCELLED;
181 futex_nto1_wake(&cmd->reply_futex);
182 }
183 pthread_mutex_unlock(&handle->cmd_queue.lock);
184 pthread_mutex_destroy(&handle->cmd_queue.lock);
185
186 if (handle->channel_monitoring_pipes.ust32_consumer >= 0) {
187 ret = close(handle->channel_monitoring_pipes.ust32_consumer);
188 if (ret) {
189 PERROR("close 32-bit consumer channel monitoring pipe");
190 }
191 }
192 if (handle->channel_monitoring_pipes.ust64_consumer >= 0) {
193 ret = close(handle->channel_monitoring_pipes.ust64_consumer);
194 if (ret) {
195 PERROR("close 64-bit consumer channel monitoring pipe");
196 }
197 }
198 if (handle->channel_monitoring_pipes.kernel_consumer >= 0) {
199 ret = close(handle->channel_monitoring_pipes.kernel_consumer);
200 if (ret) {
201 PERROR("close kernel consumer channel monitoring pipe");
202 }
203 }
204 end:
205 free(handle);
206 }
207
208 struct notification_thread_handle *notification_thread_handle_create(
209 struct lttng_pipe *ust32_channel_monitor_pipe,
210 struct lttng_pipe *ust64_channel_monitor_pipe,
211 struct lttng_pipe *kernel_channel_monitor_pipe)
212 {
213 int ret;
214 struct notification_thread_handle *handle;
215
216 handle = zmalloc(sizeof(*handle));
217 if (!handle) {
218 goto end;
219 }
220
221 /* FIXME Replace eventfd by a pipe to support older kernels. */
222 handle->cmd_queue.event_fd = eventfd(0, EFD_CLOEXEC);
223 if (handle->cmd_queue.event_fd < 0) {
224 PERROR("eventfd notification command queue");
225 goto error;
226 }
227 CDS_INIT_LIST_HEAD(&handle->cmd_queue.list);
228 ret = pthread_mutex_init(&handle->cmd_queue.lock, NULL);
229 if (ret) {
230 goto error;
231 }
232
233 if (ust32_channel_monitor_pipe) {
234 handle->channel_monitoring_pipes.ust32_consumer =
235 lttng_pipe_release_readfd(
236 ust32_channel_monitor_pipe);
237 if (handle->channel_monitoring_pipes.ust32_consumer < 0) {
238 goto error;
239 }
240 } else {
241 handle->channel_monitoring_pipes.ust32_consumer = -1;
242 }
243 if (ust64_channel_monitor_pipe) {
244 handle->channel_monitoring_pipes.ust64_consumer =
245 lttng_pipe_release_readfd(
246 ust64_channel_monitor_pipe);
247 if (handle->channel_monitoring_pipes.ust64_consumer < 0) {
248 goto error;
249 }
250 } else {
251 handle->channel_monitoring_pipes.ust64_consumer = -1;
252 }
253 if (kernel_channel_monitor_pipe) {
254 handle->channel_monitoring_pipes.kernel_consumer =
255 lttng_pipe_release_readfd(
256 kernel_channel_monitor_pipe);
257 if (handle->channel_monitoring_pipes.kernel_consumer < 0) {
258 goto error;
259 }
260 } else {
261 handle->channel_monitoring_pipes.kernel_consumer = -1;
262 }
263 end:
264 return handle;
265 error:
266 notification_thread_handle_destroy(handle);
267 return NULL;
268 }
269
270 static
271 char *get_notification_channel_sock_path(void)
272 {
273 int ret;
274 bool is_root = !getuid();
275 char *sock_path;
276
277 sock_path = zmalloc(LTTNG_PATH_MAX);
278 if (!sock_path) {
279 goto error;
280 }
281
282 if (is_root) {
283 ret = snprintf(sock_path, LTTNG_PATH_MAX,
284 DEFAULT_GLOBAL_NOTIFICATION_CHANNEL_UNIX_SOCK);
285 if (ret < 0) {
286 goto error;
287 }
288 } else {
289 char *home_path = utils_get_home_dir();
290
291 if (!home_path) {
292 ERR("Can't get HOME directory for socket creation");
293 goto error;
294 }
295
296 ret = snprintf(sock_path, LTTNG_PATH_MAX,
297 DEFAULT_HOME_NOTIFICATION_CHANNEL_UNIX_SOCK,
298 home_path);
299 if (ret < 0) {
300 goto error;
301 }
302 }
303
304 return sock_path;
305 error:
306 free(sock_path);
307 return NULL;
308 }
309
310 static
311 void notification_channel_socket_destroy(int fd)
312 {
313 int ret;
314 char *sock_path = get_notification_channel_sock_path();
315
316 DBG("[notification-thread] Destroying notification channel socket");
317
318 if (sock_path) {
319 ret = unlink(sock_path);
320 free(sock_path);
321 if (ret < 0) {
322 PERROR("unlink notification channel socket");
323 }
324 }
325
326 ret = close(fd);
327 if (ret) {
328 PERROR("close notification channel socket");
329 }
330 }
331
332 static
333 int notification_channel_socket_create(void)
334 {
335 int fd = -1, ret;
336 char *sock_path = get_notification_channel_sock_path();
337
338 DBG("[notification-thread] Creating notification channel UNIX socket at %s",
339 sock_path);
340
341 ret = lttcomm_create_unix_sock(sock_path);
342 if (ret < 0) {
343 ERR("[notification-thread] Failed to create notification socket");
344 goto error;
345 }
346 fd = ret;
347
348 ret = chmod(sock_path, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
349 if (ret < 0) {
350 ERR("Set file permissions failed: %s", sock_path);
351 PERROR("chmod notification channel socket");
352 goto error;
353 }
354
355 DBG("[notification-thread] Notification channel UNIX socket created (fd = %i)",
356 fd);
357 free(sock_path);
358 return fd;
359 error:
360 if (fd >= 0 && close(fd) < 0) {
361 PERROR("close notification channel socket");
362 }
363 free(sock_path);
364 return ret;
365 }
366
367 static
368 int init_poll_set(struct lttng_poll_event *poll_set,
369 struct notification_thread_handle *handle,
370 int notification_channel_socket)
371 {
372 int ret;
373
374 /*
375 * Create pollset with size 6:
376 * - quit pipe,
377 * - notification channel socket (listen for new connections),
378 * - command queue event fd (internal sessiond commands),
379 * - consumerd (32-bit user space) channel monitor pipe,
380 * - consumerd (64-but user space) channel monitor pipe,
381 * - consumerd (kernel) channel monitor pipe.
382 */
383 ret = sessiond_set_thread_pollset(poll_set, 6);
384 if (ret < 0) {
385 goto end;
386 }
387
388 ret = lttng_poll_add(poll_set, notification_channel_socket,
389 LPOLLIN | LPOLLERR | LPOLLHUP | LPOLLRDHUP);
390 if (ret < 0) {
391 ERR("[notification-thread] Failed to add notification channel socket to pollset");
392 goto error;
393 }
394 ret = lttng_poll_add(poll_set, handle->cmd_queue.event_fd,
395 LPOLLIN | LPOLLERR);
396 if (ret < 0) {
397 ERR("[notification-thread] Failed to add notification command queue event fd to pollset");
398 goto error;
399 }
400 ret = lttng_poll_add(poll_set,
401 handle->channel_monitoring_pipes.ust32_consumer,
402 LPOLLIN | LPOLLERR);
403 if (ret < 0) {
404 ERR("[notification-thread] Failed to add ust-32 channel monitoring pipe fd to pollset");
405 goto error;
406 }
407 ret = lttng_poll_add(poll_set,
408 handle->channel_monitoring_pipes.ust64_consumer,
409 LPOLLIN | LPOLLERR);
410 if (ret < 0) {
411 ERR("[notification-thread] Failed to add ust-64 channel monitoring pipe fd to pollset");
412 goto error;
413 }
414 if (handle->channel_monitoring_pipes.kernel_consumer < 0) {
415 goto end;
416 }
417 ret = lttng_poll_add(poll_set,
418 handle->channel_monitoring_pipes.kernel_consumer,
419 LPOLLIN | LPOLLERR);
420 if (ret < 0) {
421 ERR("[notification-thread] Failed to add kernel channel monitoring pipe fd to pollset");
422 goto error;
423 }
424 end:
425 return ret;
426 error:
427 lttng_poll_clean(poll_set);
428 return ret;
429 }
430
431 static
432 void fini_thread_state(struct notification_thread_state *state)
433 {
434 int ret;
435
436 if (state->client_socket_ht) {
437 ret = handle_notification_thread_client_disconnect_all(state);
438 assert(!ret);
439 ret = cds_lfht_destroy(state->client_socket_ht, NULL);
440 assert(!ret);
441 }
442 if (state->triggers_ht) {
443 ret = handle_notification_thread_trigger_unregister_all(state);
444 assert(!ret);
445 ret = cds_lfht_destroy(state->triggers_ht, NULL);
446 assert(!ret);
447 }
448 if (state->channel_triggers_ht) {
449 ret = cds_lfht_destroy(state->channel_triggers_ht, NULL);
450 assert(!ret);
451 }
452 if (state->channel_state_ht) {
453 ret = cds_lfht_destroy(state->channel_state_ht, NULL);
454 assert(!ret);
455 }
456 if (state->notification_trigger_clients_ht) {
457 ret = cds_lfht_destroy(state->notification_trigger_clients_ht,
458 NULL);
459 assert(!ret);
460 }
461 if (state->channels_ht) {
462 ret = cds_lfht_destroy(state->channels_ht,
463 NULL);
464 assert(!ret);
465 }
466
467 if (state->notification_channel_socket >= 0) {
468 notification_channel_socket_destroy(
469 state->notification_channel_socket);
470 }
471 lttng_poll_clean(&state->events);
472 }
473
474 static
475 int init_thread_state(struct notification_thread_handle *handle,
476 struct notification_thread_state *state)
477 {
478 int ret;
479
480 memset(state, 0, sizeof(*state));
481 state->notification_channel_socket = -1;
482 lttng_poll_init(&state->events);
483
484 ret = notification_channel_socket_create();
485 if (ret < 0) {
486 goto end;
487 }
488 state->notification_channel_socket = ret;
489
490 ret = init_poll_set(&state->events, handle,
491 state->notification_channel_socket);
492 if (ret) {
493 goto end;
494 }
495
496 DBG("[notification-thread] Listening on notification channel socket");
497 ret = lttcomm_listen_unix_sock(state->notification_channel_socket);
498 if (ret < 0) {
499 ERR("[notification-thread] Listen failed on notification channel socket");
500 goto error;
501 }
502
503 state->client_socket_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0,
504 CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
505 if (!state->client_socket_ht) {
506 goto error;
507 }
508
509 state->channel_triggers_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0,
510 CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
511 if (!state->channel_triggers_ht) {
512 goto error;
513 }
514
515 state->channel_state_ht = cds_lfht_new(DEFAULT_HT_SIZE, 1, 0,
516 CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
517 if (!state->channel_state_ht) {
518 goto error;
519 }
520
521 state->notification_trigger_clients_ht = cds_lfht_new(DEFAULT_HT_SIZE,
522 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
523 if (!state->notification_trigger_clients_ht) {
524 goto error;
525 }
526
527 state->channels_ht = cds_lfht_new(DEFAULT_HT_SIZE,
528 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
529 if (!state->channels_ht) {
530 goto error;
531 }
532
533 state->triggers_ht = cds_lfht_new(DEFAULT_HT_SIZE,
534 1, 0, CDS_LFHT_AUTO_RESIZE | CDS_LFHT_ACCOUNTING, NULL);
535 if (!state->triggers_ht) {
536 goto error;
537 }
538 end:
539 return 0;
540 error:
541 fini_thread_state(state);
542 return -1;
543 }
544
545 /*
546 * This thread services notification channel clients and received notifications
547 * from various lttng-sessiond components over a command queue.
548 */
549 void *thread_notification(void *data)
550 {
551 int ret;
552 struct notification_thread_handle *handle = data;
553 struct notification_thread_state state;
554
555 DBG("[notification-thread] Started notification thread");
556
557 if (!handle) {
558 ERR("[notification-thread] Invalid thread context provided");
559 goto end;
560 }
561
562 rcu_register_thread();
563 rcu_thread_online();
564
565 health_register(health_sessiond, HEALTH_SESSIOND_TYPE_NOTIFICATION);
566 health_code_update();
567
568 ret = init_thread_state(handle, &state);
569 if (ret) {
570 goto end;
571 }
572
573 /* Ready to handle client connections. */
574 sessiond_notify_ready();
575
576 while (true) {
577 int fd_count, i;
578
579 health_poll_entry();
580 DBG("[notification-thread] Entering poll wait");
581 ret = lttng_poll_wait(&state.events, -1);
582 DBG("[notification-thread] Poll wait returned (%i)", ret);
583 health_poll_exit();
584 if (ret < 0) {
585 /*
586 * Restart interrupted system call.
587 */
588 if (errno == EINTR) {
589 continue;
590 }
591 ERR("[notification-thread] Error encountered during lttng_poll_wait (%i)", ret);
592 goto error;
593 }
594
595 fd_count = ret;
596 for (i = 0; i < fd_count; i++) {
597 int fd = LTTNG_POLL_GETFD(&state.events, i);
598 uint32_t revents = LTTNG_POLL_GETEV(&state.events, i);
599
600 DBG("[notification-thread] Handling fd (%i) activity (%u)", fd, revents);
601
602 /* Thread quit pipe has been closed. Killing thread. */
603 if (sessiond_check_thread_quit_pipe(fd, revents)) {
604 DBG("[notification-thread] Quit pipe signaled, exiting.");
605 goto exit;
606 }
607
608 if (fd == state.notification_channel_socket) {
609 if (revents & LPOLLIN) {
610 ret = handle_notification_thread_client_connect(
611 &state);
612 if (ret < 0) {
613 goto error;
614 }
615 } else if (revents &
616 (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
617 ERR("[notification-thread] Notification socket poll error");
618 goto error;
619 } else {
620 ERR("[notification-thread] Unexpected poll events %u for notification socket %i", revents, fd);
621 goto error;
622 }
623 } else if (fd == handle->cmd_queue.event_fd) {
624 ret = handle_notification_thread_command(handle,
625 &state);
626 if (ret) {
627 goto error;
628 }
629 } else if (fd == handle->channel_monitoring_pipes.ust32_consumer) {
630 ret = handle_notification_thread_channel_sample(
631 &state, fd, LTTNG_DOMAIN_UST);
632 if (ret) {
633 goto error;
634 }
635 } else if (fd == handle->channel_monitoring_pipes.ust64_consumer) {
636 ret = handle_notification_thread_channel_sample(
637 &state, fd, LTTNG_DOMAIN_UST);
638 if (ret) {
639 goto error;
640 }
641 } else if (fd == handle->channel_monitoring_pipes.kernel_consumer) {
642 ret = handle_notification_thread_channel_sample(
643 &state, fd, LTTNG_DOMAIN_KERNEL);
644 if (ret) {
645 goto error;
646 }
647 } else {
648 /* Activity on a client's socket. */
649 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
650 /*
651 * It doesn't matter if a command was
652 * pending on the client socket at this
653 * point since it now has now way to
654 * receive the notifications to which
655 * it was subscribing or unsubscribing.
656 */
657 ret = handle_notification_thread_client_disconnect(
658 fd, &state);
659 if (ret) {
660 goto error;
661 }
662 } else if (revents & LPOLLIN) {
663 ret = handle_notification_thread_client(
664 &state, fd);
665 if (ret) {
666 goto error;
667 }
668 } else {
669 DBG("[notification-thread] Unexpected poll events %u for notification socket %i", revents, fd);
670 }
671 }
672 }
673 }
674 exit:
675 error:
676 fini_thread_state(&state);
677 health_unregister(health_sessiond);
678 rcu_thread_offline();
679 rcu_unregister_thread();
680 end:
681 return NULL;
682 }
This page took 0.049671 seconds and 5 git commands to generate.