Backport: relayd: rename fd-cap parameter to fd-pool-size
[lttng-tools.git] / src / bin / lttng-relayd / main.c
1 /*
2 * Copyright (C) 2012 - Julien Desfossez <jdesfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 * 2013 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
5 * 2015 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License, version 2 only,
9 * as published by the Free Software Foundation.
10 *
11 * This program is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
14 * more details.
15 *
16 * You should have received a copy of the GNU General Public License along
17 * with this program; if not, write to the Free Software Foundation, Inc.,
18 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 */
20
21 #define _LGPL_SOURCE
22 #include <getopt.h>
23 #include <grp.h>
24 #include <limits.h>
25 #include <pthread.h>
26 #include <signal.h>
27 #include <stdio.h>
28 #include <stdlib.h>
29 #include <string.h>
30 #include <sys/mman.h>
31 #include <sys/mount.h>
32 #include <sys/resource.h>
33 #include <sys/socket.h>
34 #include <sys/stat.h>
35 #include <sys/types.h>
36 #include <sys/wait.h>
37 #include <sys/resource.h>
38 #include <inttypes.h>
39 #include <urcu/futex.h>
40 #include <urcu/uatomic.h>
41 #include <unistd.h>
42 #include <fcntl.h>
43 #include <ctype.h>
44
45 #include <lttng/lttng.h>
46 #include <common/common.h>
47 #include <common/compat/poll.h>
48 #include <common/compat/socket.h>
49 #include <common/compat/endian.h>
50 #include <common/compat/getenv.h>
51 #include <common/defaults.h>
52 #include <common/daemonize.h>
53 #include <common/futex.h>
54 #include <common/sessiond-comm/sessiond-comm.h>
55 #include <common/sessiond-comm/inet.h>
56 #include <common/sessiond-comm/relayd.h>
57 #include <common/uri.h>
58 #include <common/utils.h>
59 #include <common/config/session-config.h>
60 #include <common/dynamic-buffer.h>
61 #include <common/buffer-view.h>
62 #include <common/fd-tracker/utils.h>
63 #include <urcu/rculist.h>
64
65 #include "version.h"
66 #include "cmd.h"
67 #include "ctf-trace.h"
68 #include "index.h"
69 #include "utils.h"
70 #include "lttng-relayd.h"
71 #include "live.h"
72 #include "health-relayd.h"
73 #include "testpoint.h"
74 #include "viewer-stream.h"
75 #include "session.h"
76 #include "stream.h"
77 #include "connection.h"
78 #include "tracefile-array.h"
79 #include "tcp_keep_alive.h"
80
81 enum relay_connection_status {
82 RELAY_CONNECTION_STATUS_OK,
83 /* An error occured while processing an event on the connection. */
84 RELAY_CONNECTION_STATUS_ERROR,
85 /* Connection closed/shutdown cleanly. */
86 RELAY_CONNECTION_STATUS_CLOSED,
87 };
88
89 /* command line options */
90 char *opt_output_path, *opt_working_directory;
91 static int opt_daemon, opt_background, opt_print_version;
92 int opt_group_output_by_session;
93 int opt_group_output_by_host;
94
95 /*
96 * We need to wait for listener and live listener threads, as well as
97 * health check thread, before being ready to signal readiness.
98 */
99 #define NR_LTTNG_RELAY_READY 3
100 static int lttng_relay_ready = NR_LTTNG_RELAY_READY;
101
102 /* Size of receive buffer. */
103 #define RECV_DATA_BUFFER_SIZE 65536
104
105 static int recv_child_signal; /* Set to 1 when a SIGUSR1 signal is received. */
106 static pid_t child_ppid; /* Internal parent PID use with daemonize. */
107
108 static struct lttng_uri *control_uri;
109 static struct lttng_uri *data_uri;
110 static struct lttng_uri *live_uri;
111
112 const char *progname;
113
114 const char *tracing_group_name = DEFAULT_TRACING_GROUP;
115 static int tracing_group_name_override;
116
117 const char * const config_section_name = "relayd";
118
119 /*
120 * Quit pipe for all threads. This permits a single cancellation point
121 * for all threads when receiving an event on the pipe.
122 */
123 int thread_quit_pipe[2] = { -1, -1 };
124
125 /*
126 * This pipe is used to inform the worker thread that a command is queued and
127 * ready to be processed.
128 */
129 static int relay_conn_pipe[2] = { -1, -1 };
130
131 /* Shared between threads */
132 static int dispatch_thread_exit;
133
134 static pthread_t listener_thread;
135 static pthread_t dispatcher_thread;
136 static pthread_t worker_thread;
137 static pthread_t health_thread;
138
139 /*
140 * last_relay_stream_id_lock protects last_relay_stream_id increment
141 * atomicity on 32-bit architectures.
142 */
143 static pthread_mutex_t last_relay_stream_id_lock = PTHREAD_MUTEX_INITIALIZER;
144 static uint64_t last_relay_stream_id;
145
146 /*
147 * Relay command queue.
148 *
149 * The relay_thread_listener and relay_thread_dispatcher communicate with this
150 * queue.
151 */
152 static struct relay_conn_queue relay_conn_queue;
153
154 /* Cap of file desriptors to be in simultaneous use by the relay daemon. */
155 static unsigned int lttng_opt_fd_pool_size = -1;
156
157 /* Global relay stream hash table. */
158 struct lttng_ht *relay_streams_ht;
159
160 /* Global relay viewer stream hash table. */
161 struct lttng_ht *viewer_streams_ht;
162
163 /* Global relay sessions hash table. */
164 struct lttng_ht *sessions_ht;
165
166 /* Relayd health monitoring */
167 struct health_app *health_relayd;
168
169 /* Global fd tracker. */
170 struct fd_tracker *the_fd_tracker;
171
172 static struct option long_options[] = {
173 { "control-port", 1, 0, 'C', },
174 { "data-port", 1, 0, 'D', },
175 { "live-port", 1, 0, 'L', },
176 { "daemonize", 0, 0, 'd', },
177 { "background", 0, 0, 'b', },
178 { "group", 1, 0, 'g', },
179 { "fd-pool-size", 1, 0, '\0', },
180 { "help", 0, 0, 'h', },
181 { "output", 1, 0, 'o', },
182 { "verbose", 0, 0, 'v', },
183 { "config", 1, 0, 'f' },
184 { "version", 0, 0, 'V' },
185 { "working-directory", 1, 0, 'w', },
186 { "group-output-by-session", 0, 0, 's', },
187 { "group-output-by-host", 0, 0, 'p', },
188 { NULL, 0, 0, 0, },
189 };
190
191 static const char *config_ignore_options[] = { "help", "config", "version" };
192
193 static void print_version(void) {
194 fprintf(stdout, "%s\n", VERSION);
195 }
196
197 static void relayd_config_log(void)
198 {
199 DBG("LTTng-relayd " VERSION " - " VERSION_NAME "%s%s",
200 GIT_VERSION[0] == '\0' ? "" : " - " GIT_VERSION,
201 EXTRA_VERSION_NAME[0] == '\0' ? "" : " - " EXTRA_VERSION_NAME);
202 if (EXTRA_VERSION_DESCRIPTION[0] != '\0') {
203 DBG("LTTng-relayd extra version description:\n\t" EXTRA_VERSION_DESCRIPTION "\n");
204 }
205 if (EXTRA_VERSION_PATCHES[0] != '\0') {
206 DBG("LTTng-relayd extra patches:\n\t" EXTRA_VERSION_PATCHES "\n");
207 }
208 }
209
210 /*
211 * Take an option from the getopt output and set it in the right variable to be
212 * used later.
213 *
214 * Return 0 on success else a negative value.
215 */
216 static int set_option(int opt, const char *arg, const char *optname)
217 {
218 int ret;
219
220 switch (opt) {
221 case 0:
222 if (!strcmp(optname, "fd-pool-size")) {
223 unsigned long v;
224
225 errno = 0;
226 v = strtoul(arg, NULL, 0);
227 if (errno != 0 || !isdigit(arg[0])) {
228 ERR("Wrong value in --fd-pool-size parameter: %s", arg);
229 ret = -1;
230 goto end;
231 }
232 if (v >= UINT_MAX) {
233 ERR("File descriptor cap overflow in --fd-pool-size parameter: %s", arg);
234 ret = -1;
235 goto end;
236 }
237 lttng_opt_fd_pool_size = (unsigned int) v;
238 } else {
239 fprintf(stderr, "unknown option %s", optname);
240 if (arg) {
241 fprintf(stderr, " with arg %s\n", arg);
242 }
243 }
244 break;
245 case 'C':
246 if (lttng_is_setuid_setgid()) {
247 WARN("Getting '%s' argument from setuid/setgid binary refused for security reasons.",
248 "-C, --control-port");
249 } else {
250 ret = uri_parse(arg, &control_uri);
251 if (ret < 0) {
252 ERR("Invalid control URI specified");
253 goto end;
254 }
255 if (control_uri->port == 0) {
256 control_uri->port = DEFAULT_NETWORK_CONTROL_PORT;
257 }
258 }
259 break;
260 case 'D':
261 if (lttng_is_setuid_setgid()) {
262 WARN("Getting '%s' argument from setuid/setgid binary refused for security reasons.",
263 "-D, -data-port");
264 } else {
265 ret = uri_parse(arg, &data_uri);
266 if (ret < 0) {
267 ERR("Invalid data URI specified");
268 goto end;
269 }
270 if (data_uri->port == 0) {
271 data_uri->port = DEFAULT_NETWORK_DATA_PORT;
272 }
273 }
274 break;
275 case 'L':
276 if (lttng_is_setuid_setgid()) {
277 WARN("Getting '%s' argument from setuid/setgid binary refused for security reasons.",
278 "-L, -live-port");
279 } else {
280 ret = uri_parse(arg, &live_uri);
281 if (ret < 0) {
282 ERR("Invalid live URI specified");
283 goto end;
284 }
285 if (live_uri->port == 0) {
286 live_uri->port = DEFAULT_NETWORK_VIEWER_PORT;
287 }
288 }
289 break;
290 case 'd':
291 opt_daemon = 1;
292 break;
293 case 'b':
294 opt_background = 1;
295 break;
296 case 'g':
297 if (lttng_is_setuid_setgid()) {
298 WARN("Getting '%s' argument from setuid/setgid binary refused for security reasons.",
299 "-g, --group");
300 } else {
301 tracing_group_name = strdup(arg);
302 if (tracing_group_name == NULL) {
303 ret = -errno;
304 PERROR("strdup");
305 goto end;
306 }
307 tracing_group_name_override = 1;
308 }
309 break;
310 case 'h':
311 ret = utils_show_man_page(8, "lttng-relayd");
312 if (ret) {
313 ERR("Cannot view man page lttng-relayd(8)");
314 perror("exec");
315 }
316 exit(EXIT_FAILURE);
317 case 'V':
318 opt_print_version = 1;
319 break;
320 case 'o':
321 if (lttng_is_setuid_setgid()) {
322 WARN("Getting '%s' argument from setuid/setgid binary refused for security reasons.",
323 "-o, --output");
324 } else {
325 ret = asprintf(&opt_output_path, "%s", arg);
326 if (ret < 0) {
327 ret = -errno;
328 PERROR("asprintf opt_output_path");
329 goto end;
330 }
331 }
332 break;
333 case 'w':
334 if (lttng_is_setuid_setgid()) {
335 WARN("Getting '%s' argument from setuid/setgid binary refused for security reasons.",
336 "-w, --working-directory");
337 } else {
338 ret = asprintf(&opt_working_directory, "%s", arg);
339 if (ret < 0) {
340 ret = -errno;
341 PERROR("asprintf working_directory");
342 goto end;
343 }
344 }
345 break;
346
347 case 'v':
348 /* Verbose level can increase using multiple -v */
349 if (arg) {
350 lttng_opt_verbose = config_parse_value(arg);
351 } else {
352 /* Only 3 level of verbosity (-vvv). */
353 if (lttng_opt_verbose < 3) {
354 lttng_opt_verbose += 1;
355 }
356 }
357 break;
358 case 's':
359 if (opt_group_output_by_host) {
360 ERR("Cannot set --group-output-by-session, --group-output-by-host already defined");
361 exit(EXIT_FAILURE);
362 }
363 opt_group_output_by_session = 1;
364 break;
365 case 'p':
366 if (opt_group_output_by_session) {
367 ERR("Cannot set --group-output-by-host, --group-output-by-session already defined");
368 exit(EXIT_FAILURE);
369 }
370 opt_group_output_by_host = 1;
371 break;
372 default:
373 /* Unknown option or other error.
374 * Error is printed by getopt, just return */
375 ret = -1;
376 goto end;
377 }
378
379 /* All good. */
380 ret = 0;
381
382 end:
383 return ret;
384 }
385
386 /*
387 * config_entry_handler_cb used to handle options read from a config file.
388 * See config_entry_handler_cb comment in common/config/session-config.h for the
389 * return value conventions.
390 */
391 static int config_entry_handler(const struct config_entry *entry, void *unused)
392 {
393 int ret = 0, i;
394
395 if (!entry || !entry->name || !entry->value) {
396 ret = -EINVAL;
397 goto end;
398 }
399
400 /* Check if the option is to be ignored */
401 for (i = 0; i < sizeof(config_ignore_options) / sizeof(char *); i++) {
402 if (!strcmp(entry->name, config_ignore_options[i])) {
403 goto end;
404 }
405 }
406
407 for (i = 0; i < (sizeof(long_options) / sizeof(struct option)) - 1; i++) {
408 /* Ignore if entry name is not fully matched. */
409 if (strcmp(entry->name, long_options[i].name)) {
410 continue;
411 }
412
413 /*
414 * If the option takes no argument on the command line,
415 * we have to check if the value is "true". We support
416 * non-zero numeric values, true, on and yes.
417 */
418 if (!long_options[i].has_arg) {
419 ret = config_parse_value(entry->value);
420 if (ret <= 0) {
421 if (ret) {
422 WARN("Invalid configuration value \"%s\" for option %s",
423 entry->value, entry->name);
424 }
425 /* False, skip boolean config option. */
426 goto end;
427 }
428 }
429
430 ret = set_option(long_options[i].val, entry->value, entry->name);
431 goto end;
432 }
433
434 WARN("Unrecognized option \"%s\" in daemon configuration file.",
435 entry->name);
436
437 end:
438 return ret;
439 }
440
441 static void parse_env_options(void)
442 {
443 char *value = NULL;
444
445 value = lttng_secure_getenv(DEFAULT_LTTNG_RELAYD_WORKING_DIRECTORY_ENV);
446 if (value) {
447 opt_working_directory = value;
448 }
449 }
450
451 static int set_fd_pool_size(void)
452 {
453 int ret = 0;
454 struct rlimit rlimit;
455
456 ret = getrlimit(RLIMIT_NOFILE, &rlimit);
457 if (ret) {
458 PERROR("Failed to get file descriptor limit");
459 ret = -1;
460 goto end;
461 }
462
463 DBG("File descriptor count limits are %lu (soft) and %lu (hard)",
464 rlimit.rlim_cur, rlimit.rlim_max);
465 if (lttng_opt_fd_pool_size == -1) {
466 /* Use default value (soft limit - reserve). */
467 if (rlimit.rlim_cur < DEFAULT_RELAYD_MIN_FD_POOL_SIZE) {
468 ERR("The process' file number limit is too low (%lu). The process' file number limit must be set to at least %i.",
469 rlimit.rlim_cur, DEFAULT_RELAYD_MIN_FD_POOL_SIZE);
470 ret = -1;
471 goto end;
472 }
473 lttng_opt_fd_pool_size = rlimit.rlim_cur -
474 DEFAULT_RELAYD_FD_POOL_SIZE_RESERVE;
475 goto end;
476 }
477
478 if (lttng_opt_fd_pool_size < DEFAULT_RELAYD_MIN_FD_POOL_SIZE) {
479 ERR("File descriptor pool size must be set to at least %d",
480 DEFAULT_RELAYD_MIN_FD_POOL_SIZE);
481 ret = -1;
482 goto end;
483 }
484
485 if (lttng_opt_fd_pool_size > rlimit.rlim_cur) {
486 ERR("File descriptor pool size argument (%u) exceeds the process' soft limit (%lu).",
487 lttng_opt_fd_pool_size, rlimit.rlim_cur);
488 ret = -1;
489 goto end;
490 }
491
492
493 DBG("File descriptor pool size argument (%u) adjusted to %u to accomodate transient fd uses",
494 lttng_opt_fd_pool_size,
495 lttng_opt_fd_pool_size - DEFAULT_RELAYD_FD_POOL_SIZE_RESERVE);
496 lttng_opt_fd_pool_size -= DEFAULT_RELAYD_FD_POOL_SIZE_RESERVE;
497 end:
498 return ret;
499 }
500
501 static int set_options(int argc, char **argv)
502 {
503 int c, ret = 0, option_index = 0, retval = 0;
504 int orig_optopt = optopt, orig_optind = optind;
505 char *default_address, *optstring;
506 const char *config_path = NULL;
507
508 optstring = utils_generate_optstring(long_options,
509 sizeof(long_options) / sizeof(struct option));
510 if (!optstring) {
511 retval = -ENOMEM;
512 goto exit;
513 }
514
515 /* Check for the --config option */
516
517 while ((c = getopt_long(argc, argv, optstring, long_options,
518 &option_index)) != -1) {
519 if (c == '?') {
520 retval = -EINVAL;
521 goto exit;
522 } else if (c != 'f') {
523 continue;
524 }
525
526 if (lttng_is_setuid_setgid()) {
527 WARN("Getting '%s' argument from setuid/setgid binary refused for security reasons.",
528 "-f, --config");
529 } else {
530 config_path = utils_expand_path(optarg);
531 if (!config_path) {
532 ERR("Failed to resolve path: %s", optarg);
533 }
534 }
535 }
536
537 ret = config_get_section_entries(config_path, config_section_name,
538 config_entry_handler, NULL);
539 if (ret) {
540 if (ret > 0) {
541 ERR("Invalid configuration option at line %i", ret);
542 }
543 retval = -1;
544 goto exit;
545 }
546
547 /* Reset getopt's global state */
548 optopt = orig_optopt;
549 optind = orig_optind;
550 while (1) {
551 c = getopt_long(argc, argv, optstring, long_options, &option_index);
552 if (c == -1) {
553 break;
554 }
555
556 ret = set_option(c, optarg, long_options[option_index].name);
557 if (ret < 0) {
558 retval = -1;
559 goto exit;
560 }
561 }
562
563 /* assign default values */
564 if (control_uri == NULL) {
565 ret = asprintf(&default_address,
566 "tcp://" DEFAULT_NETWORK_CONTROL_BIND_ADDRESS ":%d",
567 DEFAULT_NETWORK_CONTROL_PORT);
568 if (ret < 0) {
569 PERROR("asprintf default data address");
570 retval = -1;
571 goto exit;
572 }
573
574 ret = uri_parse(default_address, &control_uri);
575 free(default_address);
576 if (ret < 0) {
577 ERR("Invalid control URI specified");
578 retval = -1;
579 goto exit;
580 }
581 }
582 if (data_uri == NULL) {
583 ret = asprintf(&default_address,
584 "tcp://" DEFAULT_NETWORK_DATA_BIND_ADDRESS ":%d",
585 DEFAULT_NETWORK_DATA_PORT);
586 if (ret < 0) {
587 PERROR("asprintf default data address");
588 retval = -1;
589 goto exit;
590 }
591
592 ret = uri_parse(default_address, &data_uri);
593 free(default_address);
594 if (ret < 0) {
595 ERR("Invalid data URI specified");
596 retval = -1;
597 goto exit;
598 }
599 }
600 if (live_uri == NULL) {
601 ret = asprintf(&default_address,
602 "tcp://" DEFAULT_NETWORK_VIEWER_BIND_ADDRESS ":%d",
603 DEFAULT_NETWORK_VIEWER_PORT);
604 if (ret < 0) {
605 PERROR("asprintf default viewer control address");
606 retval = -1;
607 goto exit;
608 }
609
610 ret = uri_parse(default_address, &live_uri);
611 free(default_address);
612 if (ret < 0) {
613 ERR("Invalid viewer control URI specified");
614 retval = -1;
615 goto exit;
616 }
617 }
618 ret = set_fd_pool_size();
619 if (ret) {
620 retval = -1;
621 goto exit;
622 }
623
624 if (!opt_group_output_by_session && !opt_group_output_by_host) {
625 /* Group by host by default */
626 opt_group_output_by_host = 1;
627 }
628
629 exit:
630 free(optstring);
631 return retval;
632 }
633
634 static void print_global_objects(void)
635 {
636 print_viewer_streams();
637 print_relay_streams();
638 print_sessions();
639 }
640
641 /*
642 * Cleanup the daemon
643 */
644 static void relayd_cleanup(void)
645 {
646 print_global_objects();
647
648 DBG("Cleaning up");
649
650 if (viewer_streams_ht)
651 lttng_ht_destroy(viewer_streams_ht);
652 if (relay_streams_ht)
653 lttng_ht_destroy(relay_streams_ht);
654 if (sessions_ht)
655 lttng_ht_destroy(sessions_ht);
656
657 /* free the dynamically allocated opt_output_path */
658 free(opt_output_path);
659
660 /* Close thread quit pipes */
661 (void) fd_tracker_util_pipe_close(the_fd_tracker, thread_quit_pipe);
662
663 uri_free(control_uri);
664 uri_free(data_uri);
665 /* Live URI is freed in the live thread. */
666
667 if (tracing_group_name_override) {
668 free((void *) tracing_group_name);
669 }
670 }
671
672 /*
673 * Write to writable pipe used to notify a thread.
674 */
675 static int notify_thread_pipe(int wpipe)
676 {
677 ssize_t ret;
678
679 ret = lttng_write(wpipe, "!", 1);
680 if (ret < 1) {
681 PERROR("write poll pipe");
682 goto end;
683 }
684 ret = 0;
685 end:
686 return ret;
687 }
688
689 static int notify_health_quit_pipe(int *pipe)
690 {
691 ssize_t ret;
692
693 ret = lttng_write(pipe[1], "4", 1);
694 if (ret < 1) {
695 PERROR("write relay health quit");
696 goto end;
697 }
698 ret = 0;
699 end:
700 return ret;
701 }
702
703 /*
704 * Stop all relayd and relayd-live threads.
705 */
706 int lttng_relay_stop_threads(void)
707 {
708 int retval = 0;
709
710 /* Stopping all threads */
711 DBG("Terminating all threads");
712 if (notify_thread_pipe(thread_quit_pipe[1])) {
713 ERR("write error on thread quit pipe");
714 retval = -1;
715 }
716
717 if (notify_health_quit_pipe(health_quit_pipe)) {
718 ERR("write error on health quit pipe");
719 }
720
721 /* Dispatch thread */
722 CMM_STORE_SHARED(dispatch_thread_exit, 1);
723 futex_nto1_wake(&relay_conn_queue.futex);
724
725 if (relayd_live_stop()) {
726 ERR("Error stopping live threads");
727 retval = -1;
728 }
729 return retval;
730 }
731
732 /*
733 * Signal handler for the daemon
734 *
735 * Simply stop all worker threads, leaving main() return gracefully after
736 * joining all threads and calling cleanup().
737 */
738 static void sighandler(int sig)
739 {
740 switch (sig) {
741 case SIGINT:
742 DBG("SIGINT caught");
743 if (lttng_relay_stop_threads()) {
744 ERR("Error stopping threads");
745 }
746 break;
747 case SIGTERM:
748 DBG("SIGTERM caught");
749 if (lttng_relay_stop_threads()) {
750 ERR("Error stopping threads");
751 }
752 break;
753 case SIGUSR1:
754 CMM_STORE_SHARED(recv_child_signal, 1);
755 break;
756 default:
757 break;
758 }
759 }
760
761 /*
762 * Setup signal handler for :
763 * SIGINT, SIGTERM, SIGPIPE
764 */
765 static int set_signal_handler(void)
766 {
767 int ret = 0;
768 struct sigaction sa;
769 sigset_t sigset;
770
771 if ((ret = sigemptyset(&sigset)) < 0) {
772 PERROR("sigemptyset");
773 return ret;
774 }
775
776 sa.sa_mask = sigset;
777 sa.sa_flags = 0;
778
779 sa.sa_handler = sighandler;
780 if ((ret = sigaction(SIGTERM, &sa, NULL)) < 0) {
781 PERROR("sigaction");
782 return ret;
783 }
784
785 if ((ret = sigaction(SIGINT, &sa, NULL)) < 0) {
786 PERROR("sigaction");
787 return ret;
788 }
789
790 if ((ret = sigaction(SIGUSR1, &sa, NULL)) < 0) {
791 PERROR("sigaction");
792 return ret;
793 }
794
795 sa.sa_handler = SIG_IGN;
796 if ((ret = sigaction(SIGPIPE, &sa, NULL)) < 0) {
797 PERROR("sigaction");
798 return ret;
799 }
800
801 DBG("Signal handler set for SIGTERM, SIGUSR1, SIGPIPE and SIGINT");
802
803 return ret;
804 }
805
806 void lttng_relay_notify_ready(void)
807 {
808 /* Notify the parent of the fork() process that we are ready. */
809 if (opt_daemon || opt_background) {
810 if (uatomic_sub_return(&lttng_relay_ready, 1) == 0) {
811 kill(child_ppid, SIGUSR1);
812 }
813 }
814 }
815
816 /*
817 * Init thread quit pipe.
818 *
819 * Return -1 on error or 0 if all pipes are created.
820 */
821 static int init_thread_quit_pipe(void)
822 {
823 return fd_tracker_util_pipe_open_cloexec(the_fd_tracker,
824 "Quit pipe", thread_quit_pipe);
825 }
826
827 /*
828 * Init health quit pipe.
829 *
830 * Return -1 on error or 0 if all pipes are created.
831 */
832 static int init_health_quit_pipe(void)
833 {
834 return fd_tracker_util_pipe_open_cloexec(the_fd_tracker,
835 "Health quit pipe", health_quit_pipe);
836 }
837
838 /*
839 * Create a poll set with O_CLOEXEC and add the thread quit pipe to the set.
840 */
841 static int create_named_thread_poll_set(struct lttng_poll_event *events,
842 int size, const char *name)
843 {
844 int ret;
845
846 if (events == NULL || size == 0) {
847 ret = -1;
848 goto error;
849 }
850
851 ret = fd_tracker_util_poll_create(the_fd_tracker,
852 name, events, 1, LTTNG_CLOEXEC);
853
854 /* Add quit pipe */
855 ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN | LPOLLERR);
856 if (ret < 0) {
857 goto error;
858 }
859
860 return 0;
861
862 error:
863 return ret;
864 }
865
866 /*
867 * Check if the thread quit pipe was triggered.
868 *
869 * Return 1 if it was triggered else 0;
870 */
871 static int check_thread_quit_pipe(int fd, uint32_t events)
872 {
873 if (fd == thread_quit_pipe[0] && (events & LPOLLIN)) {
874 return 1;
875 }
876
877 return 0;
878 }
879
880 static int create_sock(void *data, int *out_fd)
881 {
882 int ret;
883 struct lttcomm_sock *sock = data;
884
885 ret = lttcomm_create_sock(sock);
886 if (ret < 0) {
887 goto end;
888 }
889
890 *out_fd = sock->fd;
891 end:
892 return ret;
893 }
894
895 static int close_sock(void *data, int *in_fd)
896 {
897 struct lttcomm_sock *sock = data;
898
899 return sock->ops->close(sock);
900 }
901
902 static int accept_sock(void *data, int *out_fd)
903 {
904 int ret = 0;
905 /* Socks is an array of in_sock, out_sock. */
906 struct lttcomm_sock **socks = data;
907 struct lttcomm_sock *in_sock = socks[0];
908
909 socks[1] = in_sock->ops->accept(in_sock);
910 if (!socks[1]) {
911 ret = -1;
912 goto end;
913 }
914 *out_fd = socks[1]->fd;
915 end:
916 return ret;
917 }
918
919 /*
920 * Create and init socket from uri.
921 */
922 static struct lttcomm_sock *relay_socket_create(struct lttng_uri *uri,
923 const char *name)
924 {
925 int ret, sock_fd;
926 struct lttcomm_sock *sock = NULL;
927 char uri_str[PATH_MAX];
928 char *formated_name = NULL;
929
930 sock = lttcomm_alloc_sock_from_uri(uri);
931 if (sock == NULL) {
932 ERR("Allocating socket");
933 goto error;
934 }
935
936 /*
937 * Don't fail to create the socket if the name can't be built as it is
938 * only used for debugging purposes.
939 */
940 ret = uri_to_str_url(uri, uri_str, sizeof(uri_str));
941 uri_str[sizeof(uri_str) - 1] = '\0';
942 if (ret >= 0) {
943 ret = asprintf(&formated_name, "%s socket @ %s", name,
944 uri_str);
945 if (ret < 0) {
946 formated_name = NULL;
947 }
948 }
949
950 ret = fd_tracker_open_unsuspendable_fd(the_fd_tracker, &sock_fd,
951 (const char **) (formated_name ? &formated_name : NULL),
952 1, create_sock, sock);
953 free(formated_name);
954 DBG("Listening on %s socket %d", name, sock->fd);
955
956 ret = sock->ops->bind(sock);
957 if (ret < 0) {
958 goto error;
959 }
960
961 ret = sock->ops->listen(sock, -1);
962 if (ret < 0) {
963 goto error;
964
965 }
966
967 return sock;
968
969 error:
970 if (sock) {
971 lttcomm_destroy_sock(sock);
972 }
973 return NULL;
974 }
975
976 static
977 struct lttcomm_sock *accept_relayd_sock(struct lttcomm_sock *listening_sock,
978 const char *name)
979 {
980 int out_fd, ret;
981 struct lttcomm_sock *socks[2] = { listening_sock, NULL };
982 struct lttcomm_sock *new_sock = NULL;
983
984 ret = fd_tracker_open_unsuspendable_fd(
985 the_fd_tracker, &out_fd,
986 (const char **) &name,
987 1, accept_sock, &socks);
988 if (ret) {
989 goto end;
990 }
991 new_sock = socks[1];
992 DBG("%s accepted, socket %d", name, new_sock->fd);
993 end:
994 return new_sock;
995 }
996
997 /*
998 * This thread manages the listening for new connections on the network
999 */
1000 static void *relay_thread_listener(void *data)
1001 {
1002 int i, ret, pollfd, err = -1;
1003 uint32_t revents, nb_fd;
1004 struct lttng_poll_event events;
1005 struct lttcomm_sock *control_sock, *data_sock;
1006
1007 DBG("[thread] Relay listener started");
1008
1009 health_register(health_relayd, HEALTH_RELAYD_TYPE_LISTENER);
1010
1011 health_code_update();
1012
1013 control_sock = relay_socket_create(control_uri, "Control listener");
1014 if (!control_sock) {
1015 goto error_sock_control;
1016 }
1017
1018 data_sock = relay_socket_create(data_uri, "Data listener");
1019 if (!data_sock) {
1020 goto error_sock_relay;
1021 }
1022
1023 /*
1024 * Pass 3 as size here for the thread quit pipe, control and
1025 * data socket.
1026 */
1027 ret = create_named_thread_poll_set(&events, 3, "Listener thread epoll");
1028 if (ret < 0) {
1029 goto error_create_poll;
1030 }
1031
1032 /* Add the control socket */
1033 ret = lttng_poll_add(&events, control_sock->fd, LPOLLIN | LPOLLRDHUP);
1034 if (ret < 0) {
1035 goto error_poll_add;
1036 }
1037
1038 /* Add the data socket */
1039 ret = lttng_poll_add(&events, data_sock->fd, LPOLLIN | LPOLLRDHUP);
1040 if (ret < 0) {
1041 goto error_poll_add;
1042 }
1043
1044 lttng_relay_notify_ready();
1045
1046 if (testpoint(relayd_thread_listener)) {
1047 goto error_testpoint;
1048 }
1049
1050 while (1) {
1051 health_code_update();
1052
1053 DBG("Listener accepting connections");
1054
1055 restart:
1056 health_poll_entry();
1057 ret = lttng_poll_wait(&events, -1);
1058 health_poll_exit();
1059 if (ret < 0) {
1060 /*
1061 * Restart interrupted system call.
1062 */
1063 if (errno == EINTR) {
1064 goto restart;
1065 }
1066 goto error;
1067 }
1068
1069 nb_fd = ret;
1070
1071 DBG("Relay new connection received");
1072 for (i = 0; i < nb_fd; i++) {
1073 health_code_update();
1074
1075 /* Fetch once the poll data */
1076 revents = LTTNG_POLL_GETEV(&events, i);
1077 pollfd = LTTNG_POLL_GETFD(&events, i);
1078
1079 if (!revents) {
1080 /*
1081 * No activity for this FD (poll
1082 * implementation).
1083 */
1084 continue;
1085 }
1086
1087 /* Thread quit pipe has been closed. Killing thread. */
1088 ret = check_thread_quit_pipe(pollfd, revents);
1089 if (ret) {
1090 err = 0;
1091 goto exit;
1092 }
1093
1094 if (revents & LPOLLIN) {
1095 /*
1096 * A new connection is requested, therefore a
1097 * sessiond/consumerd connection is allocated in
1098 * this thread, enqueued to a global queue and
1099 * dequeued (and freed) in the worker thread.
1100 */
1101 int val = 1;
1102 struct relay_connection *new_conn;
1103 struct lttcomm_sock *newsock = NULL;
1104 enum connection_type type;
1105
1106 if (pollfd == data_sock->fd) {
1107 type = RELAY_DATA;
1108 newsock = accept_relayd_sock(data_sock,
1109 "Data socket to relayd");
1110 } else {
1111 assert(pollfd == control_sock->fd);
1112 type = RELAY_CONTROL;
1113 newsock = accept_relayd_sock(control_sock,
1114 "Control socket to relayd");
1115 }
1116 if (!newsock) {
1117 PERROR("accepting sock");
1118 goto error;
1119 }
1120
1121 ret = setsockopt(newsock->fd, SOL_SOCKET, SO_REUSEADDR, &val,
1122 sizeof(val));
1123 if (ret < 0) {
1124 PERROR("setsockopt inet");
1125 lttcomm_destroy_sock(newsock);
1126 goto error;
1127 }
1128
1129 ret = socket_apply_keep_alive_config(newsock->fd);
1130 if (ret < 0) {
1131 ERR("Failed to apply TCP keep-alive configuration on socket (%i)",
1132 newsock->fd);
1133 lttcomm_destroy_sock(newsock);
1134 goto error;
1135 }
1136
1137 new_conn = connection_create(newsock, type);
1138 if (!new_conn) {
1139 lttcomm_destroy_sock(newsock);
1140 goto error;
1141 }
1142
1143 /* Enqueue request for the dispatcher thread. */
1144 cds_wfcq_enqueue(&relay_conn_queue.head, &relay_conn_queue.tail,
1145 &new_conn->qnode);
1146
1147 /*
1148 * Wake the dispatch queue futex.
1149 * Implicit memory barrier with the
1150 * exchange in cds_wfcq_enqueue.
1151 */
1152 futex_nto1_wake(&relay_conn_queue.futex);
1153 } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
1154 ERR("socket poll error");
1155 goto error;
1156 } else {
1157 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
1158 goto error;
1159 }
1160 }
1161 }
1162
1163 exit:
1164 error:
1165 error_poll_add:
1166 error_testpoint:
1167 (void) fd_tracker_util_poll_clean(the_fd_tracker, &events);
1168 error_create_poll:
1169 if (data_sock->fd >= 0) {
1170 ret = fd_tracker_close_unsuspendable_fd(the_fd_tracker,
1171 &data_sock->fd, 1, close_sock,
1172 data_sock);
1173 if (ret) {
1174 PERROR("close");
1175 }
1176 }
1177 lttcomm_destroy_sock(data_sock);
1178 error_sock_relay:
1179 if (control_sock->fd >= 0) {
1180 ret = fd_tracker_close_unsuspendable_fd(the_fd_tracker,
1181 &control_sock->fd, 1, close_sock,
1182 control_sock);
1183 if (ret) {
1184 PERROR("close");
1185 }
1186 }
1187 lttcomm_destroy_sock(control_sock);
1188 error_sock_control:
1189 if (err) {
1190 health_error();
1191 ERR("Health error occurred in %s", __func__);
1192 }
1193 health_unregister(health_relayd);
1194 DBG("Relay listener thread cleanup complete");
1195 lttng_relay_stop_threads();
1196 return NULL;
1197 }
1198
1199 /*
1200 * This thread manages the dispatching of the requests to worker threads
1201 */
1202 static void *relay_thread_dispatcher(void *data)
1203 {
1204 int err = -1;
1205 ssize_t ret;
1206 struct cds_wfcq_node *node;
1207 struct relay_connection *new_conn = NULL;
1208
1209 DBG("[thread] Relay dispatcher started");
1210
1211 health_register(health_relayd, HEALTH_RELAYD_TYPE_DISPATCHER);
1212
1213 if (testpoint(relayd_thread_dispatcher)) {
1214 goto error_testpoint;
1215 }
1216
1217 health_code_update();
1218
1219 for (;;) {
1220 health_code_update();
1221
1222 /* Atomically prepare the queue futex */
1223 futex_nto1_prepare(&relay_conn_queue.futex);
1224
1225 if (CMM_LOAD_SHARED(dispatch_thread_exit)) {
1226 break;
1227 }
1228
1229 do {
1230 health_code_update();
1231
1232 /* Dequeue commands */
1233 node = cds_wfcq_dequeue_blocking(&relay_conn_queue.head,
1234 &relay_conn_queue.tail);
1235 if (node == NULL) {
1236 DBG("Woken up but nothing in the relay command queue");
1237 /* Continue thread execution */
1238 break;
1239 }
1240 new_conn = caa_container_of(node, struct relay_connection, qnode);
1241
1242 DBG("Dispatching request waiting on sock %d", new_conn->sock->fd);
1243
1244 /*
1245 * Inform worker thread of the new request. This
1246 * call is blocking so we can be assured that
1247 * the data will be read at some point in time
1248 * or wait to the end of the world :)
1249 */
1250 ret = lttng_write(relay_conn_pipe[1], &new_conn, sizeof(new_conn));
1251 if (ret < 0) {
1252 PERROR("write connection pipe");
1253 connection_put(new_conn);
1254 goto error;
1255 }
1256 } while (node != NULL);
1257
1258 /* Futex wait on queue. Blocking call on futex() */
1259 health_poll_entry();
1260 futex_nto1_wait(&relay_conn_queue.futex);
1261 health_poll_exit();
1262 }
1263
1264 /* Normal exit, no error */
1265 err = 0;
1266
1267 error:
1268 error_testpoint:
1269 if (err) {
1270 health_error();
1271 ERR("Health error occurred in %s", __func__);
1272 }
1273 health_unregister(health_relayd);
1274 DBG("Dispatch thread dying");
1275 lttng_relay_stop_threads();
1276 return NULL;
1277 }
1278
1279 /*
1280 * Set index data from the control port to a given index object.
1281 */
1282 static int set_index_control_data(struct relay_index *index,
1283 struct lttcomm_relayd_index *data,
1284 struct relay_connection *conn)
1285 {
1286 struct ctf_packet_index index_data;
1287
1288 /*
1289 * The index on disk is encoded in big endian.
1290 */
1291 index_data.packet_size = htobe64(data->packet_size);
1292 index_data.content_size = htobe64(data->content_size);
1293 index_data.timestamp_begin = htobe64(data->timestamp_begin);
1294 index_data.timestamp_end = htobe64(data->timestamp_end);
1295 index_data.events_discarded = htobe64(data->events_discarded);
1296 index_data.stream_id = htobe64(data->stream_id);
1297
1298 if (conn->minor >= 8) {
1299 index->index_data.stream_instance_id = htobe64(data->stream_instance_id);
1300 index->index_data.packet_seq_num = htobe64(data->packet_seq_num);
1301 }
1302
1303 return relay_index_set_data(index, &index_data);
1304 }
1305
1306 /*
1307 * Handle the RELAYD_CREATE_SESSION command.
1308 *
1309 * On success, send back the session id or else return a negative value.
1310 */
1311 static int relay_create_session(const struct lttcomm_relayd_hdr *recv_hdr,
1312 struct relay_connection *conn,
1313 const struct lttng_buffer_view *payload)
1314 {
1315 int ret = 0;
1316 ssize_t send_ret;
1317 struct relay_session *session;
1318 struct lttcomm_relayd_status_session reply;
1319 char session_name[LTTNG_NAME_MAX];
1320 char hostname[LTTNG_HOST_NAME_MAX];
1321 uint32_t live_timer = 0;
1322 bool snapshot = false;
1323
1324 memset(session_name, 0, LTTNG_NAME_MAX);
1325 memset(hostname, 0, LTTNG_HOST_NAME_MAX);
1326
1327 memset(&reply, 0, sizeof(reply));
1328
1329 switch (conn->minor) {
1330 case 1:
1331 case 2:
1332 case 3:
1333 break;
1334 case 4: /* LTTng sessiond 2.4 */
1335 default:
1336 ret = cmd_create_session_2_4(payload, session_name,
1337 hostname, &live_timer, &snapshot);
1338 }
1339 if (ret < 0) {
1340 goto send_reply;
1341 }
1342
1343 session = session_create(session_name, hostname, live_timer,
1344 snapshot, conn->major, conn->minor);
1345 if (!session) {
1346 ret = -1;
1347 goto send_reply;
1348 }
1349 assert(!conn->session);
1350 conn->session = session;
1351 DBG("Created session %" PRIu64, session->id);
1352
1353 reply.session_id = htobe64(session->id);
1354
1355 send_reply:
1356 if (ret < 0) {
1357 reply.ret_code = htobe32(LTTNG_ERR_FATAL);
1358 } else {
1359 reply.ret_code = htobe32(LTTNG_OK);
1360 }
1361
1362 send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0);
1363 if (send_ret < (ssize_t) sizeof(reply)) {
1364 ERR("Failed to send \"create session\" command reply (ret = %zd)",
1365 send_ret);
1366 ret = -1;
1367 }
1368
1369 return ret;
1370 }
1371
1372 /*
1373 * When we have received all the streams and the metadata for a channel,
1374 * we make them visible to the viewer threads.
1375 */
1376 static void publish_connection_local_streams(struct relay_connection *conn)
1377 {
1378 struct relay_stream *stream;
1379 struct relay_session *session = conn->session;
1380
1381 /*
1382 * We publish all streams belonging to a session atomically wrt
1383 * session lock.
1384 */
1385 pthread_mutex_lock(&session->lock);
1386 rcu_read_lock();
1387 cds_list_for_each_entry_rcu(stream, &session->recv_list,
1388 recv_node) {
1389 stream_publish(stream);
1390 }
1391 rcu_read_unlock();
1392
1393 /*
1394 * Inform the viewer that there are new streams in the session.
1395 */
1396 if (session->viewer_attached) {
1397 uatomic_set(&session->new_streams, 1);
1398 }
1399 pthread_mutex_unlock(&session->lock);
1400 }
1401
1402 /*
1403 * relay_add_stream: allocate a new stream for a session
1404 */
1405 static int relay_add_stream(const struct lttcomm_relayd_hdr *recv_hdr,
1406 struct relay_connection *conn,
1407 const struct lttng_buffer_view *payload)
1408 {
1409 int ret;
1410 ssize_t send_ret;
1411 struct relay_session *session = conn->session;
1412 struct relay_stream *stream = NULL;
1413 struct lttcomm_relayd_status_stream reply;
1414 struct ctf_trace *trace = NULL;
1415 uint64_t stream_handle = -1ULL;
1416 char *path_name = NULL, *channel_name = NULL;
1417 uint64_t tracefile_size = 0, tracefile_count = 0;
1418
1419 if (!session || !conn->version_check_done) {
1420 ERR("Trying to add a stream before version check");
1421 ret = -1;
1422 goto end_no_session;
1423 }
1424
1425 switch (session->minor) {
1426 case 1: /* LTTng sessiond 2.1. Allocates path_name and channel_name. */
1427 ret = cmd_recv_stream_2_1(payload, &path_name,
1428 &channel_name, session);
1429 break;
1430 case 2: /* LTTng sessiond 2.2. Allocates path_name and channel_name. */
1431 default:
1432 ret = cmd_recv_stream_2_2(payload, &path_name,
1433 &channel_name, &tracefile_size, &tracefile_count,
1434 session);
1435 break;
1436 }
1437 if (ret < 0) {
1438 goto send_reply;
1439 }
1440
1441 trace = ctf_trace_get_by_path_or_create(session, path_name);
1442 if (!trace) {
1443 goto send_reply;
1444 }
1445 /* This stream here has one reference on the trace. */
1446
1447 pthread_mutex_lock(&last_relay_stream_id_lock);
1448 stream_handle = ++last_relay_stream_id;
1449 pthread_mutex_unlock(&last_relay_stream_id_lock);
1450
1451 /* We pass ownership of path_name and channel_name. */
1452 stream = stream_create(trace, stream_handle, path_name,
1453 channel_name, tracefile_size, tracefile_count);
1454 path_name = NULL;
1455 channel_name = NULL;
1456
1457 /*
1458 * Streams are the owners of their trace. Reference to trace is
1459 * kept within stream_create().
1460 */
1461 ctf_trace_put(trace);
1462
1463 send_reply:
1464 memset(&reply, 0, sizeof(reply));
1465 reply.handle = htobe64(stream_handle);
1466 if (!stream) {
1467 reply.ret_code = htobe32(LTTNG_ERR_UNK);
1468 } else {
1469 reply.ret_code = htobe32(LTTNG_OK);
1470 }
1471
1472 send_ret = conn->sock->ops->sendmsg(conn->sock, &reply,
1473 sizeof(struct lttcomm_relayd_status_stream), 0);
1474 if (send_ret < (ssize_t) sizeof(reply)) {
1475 ERR("Failed to send \"add stream\" command reply (ret = %zd)",
1476 send_ret);
1477 ret = -1;
1478 }
1479
1480 end_no_session:
1481 free(path_name);
1482 free(channel_name);
1483 return ret;
1484 }
1485
1486 /*
1487 * relay_close_stream: close a specific stream
1488 */
1489 static int relay_close_stream(const struct lttcomm_relayd_hdr *recv_hdr,
1490 struct relay_connection *conn,
1491 const struct lttng_buffer_view *payload)
1492 {
1493 int ret;
1494 ssize_t send_ret;
1495 struct relay_session *session = conn->session;
1496 struct lttcomm_relayd_close_stream stream_info;
1497 struct lttcomm_relayd_generic_reply reply;
1498 struct relay_stream *stream;
1499
1500 DBG("Close stream received");
1501
1502 if (!session || !conn->version_check_done) {
1503 ERR("Trying to close a stream before version check");
1504 ret = -1;
1505 goto end_no_session;
1506 }
1507
1508 if (payload->size < sizeof(stream_info)) {
1509 ERR("Unexpected payload size in \"relay_close_stream\": expected >= %zu bytes, got %zu bytes",
1510 sizeof(stream_info), payload->size);
1511 ret = -1;
1512 goto end_no_session;
1513 }
1514 memcpy(&stream_info, payload->data, sizeof(stream_info));
1515 stream_info.stream_id = be64toh(stream_info.stream_id);
1516 stream_info.last_net_seq_num = be64toh(stream_info.last_net_seq_num);
1517
1518 stream = stream_get_by_id(stream_info.stream_id);
1519 if (!stream) {
1520 ret = -1;
1521 goto end;
1522 }
1523
1524 /*
1525 * Set last_net_seq_num before the close flag. Required by data
1526 * pending check.
1527 */
1528 pthread_mutex_lock(&stream->lock);
1529 stream->last_net_seq_num = stream_info.last_net_seq_num;
1530 pthread_mutex_unlock(&stream->lock);
1531
1532 /*
1533 * This is one of the conditions which may trigger a stream close
1534 * with the others being:
1535 * 1) A close command is received for a stream
1536 * 2) The control connection owning the stream is closed
1537 * 3) We have received all of the stream's data _after_ a close
1538 * request.
1539 */
1540 try_stream_close(stream);
1541 if (stream->is_metadata) {
1542 struct relay_viewer_stream *vstream;
1543
1544 vstream = viewer_stream_get_by_id(stream->stream_handle);
1545 if (vstream) {
1546 if (vstream->metadata_sent == stream->metadata_received) {
1547 /*
1548 * Since all the metadata has been sent to the
1549 * viewer and that we have a request to close
1550 * its stream, we can safely teardown the
1551 * corresponding metadata viewer stream.
1552 */
1553 viewer_stream_put(vstream);
1554 }
1555 /* Put local reference. */
1556 viewer_stream_put(vstream);
1557 }
1558 }
1559 stream_put(stream);
1560 ret = 0;
1561
1562 end:
1563 memset(&reply, 0, sizeof(reply));
1564 if (ret < 0) {
1565 reply.ret_code = htobe32(LTTNG_ERR_UNK);
1566 } else {
1567 reply.ret_code = htobe32(LTTNG_OK);
1568 }
1569 send_ret = conn->sock->ops->sendmsg(conn->sock, &reply,
1570 sizeof(struct lttcomm_relayd_generic_reply), 0);
1571 if (send_ret < (ssize_t) sizeof(reply)) {
1572 ERR("Failed to send \"close stream\" command reply (ret = %zd)",
1573 send_ret);
1574 ret = -1;
1575 }
1576
1577 end_no_session:
1578 return ret;
1579 }
1580
1581 /*
1582 * relay_reset_metadata: reset a metadata stream
1583 */
1584 static
1585 int relay_reset_metadata(const struct lttcomm_relayd_hdr *recv_hdr,
1586 struct relay_connection *conn,
1587 const struct lttng_buffer_view *payload)
1588 {
1589 int ret;
1590 ssize_t send_ret;
1591 struct relay_session *session = conn->session;
1592 struct lttcomm_relayd_reset_metadata stream_info;
1593 struct lttcomm_relayd_generic_reply reply;
1594 struct relay_stream *stream;
1595
1596 DBG("Reset metadata received");
1597
1598 if (!session || !conn->version_check_done) {
1599 ERR("Trying to reset a metadata stream before version check");
1600 ret = -1;
1601 goto end_no_session;
1602 }
1603
1604 if (payload->size < sizeof(stream_info)) {
1605 ERR("Unexpected payload size in \"relay_reset_metadata\": expected >= %zu bytes, got %zu bytes",
1606 sizeof(stream_info), payload->size);
1607 ret = -1;
1608 goto end_no_session;
1609 }
1610 memcpy(&stream_info, payload->data, sizeof(stream_info));
1611 stream_info.stream_id = be64toh(stream_info.stream_id);
1612 stream_info.version = be64toh(stream_info.version);
1613
1614 DBG("Update metadata to version %" PRIu64, stream_info.version);
1615
1616 /* Unsupported for live sessions for now. */
1617 if (session->live_timer != 0) {
1618 ret = -1;
1619 goto end;
1620 }
1621
1622 stream = stream_get_by_id(stream_info.stream_id);
1623 if (!stream) {
1624 ret = -1;
1625 goto end;
1626 }
1627 pthread_mutex_lock(&stream->lock);
1628 if (!stream->is_metadata) {
1629 ret = -1;
1630 goto end_unlock;
1631 }
1632
1633 ret = stream_fd_rotate(stream->stream_fd,
1634 stream->path_name, stream->channel_name, 0, 0, NULL);
1635 if (ret < 0) {
1636 ERR("Failed to rotate metadata file %s of channel %s",
1637 stream->path_name, stream->channel_name);
1638 goto end_unlock;
1639 }
1640
1641 end_unlock:
1642 pthread_mutex_unlock(&stream->lock);
1643 stream_put(stream);
1644
1645 end:
1646 memset(&reply, 0, sizeof(reply));
1647 if (ret < 0) {
1648 reply.ret_code = htobe32(LTTNG_ERR_UNK);
1649 } else {
1650 reply.ret_code = htobe32(LTTNG_OK);
1651 }
1652 send_ret = conn->sock->ops->sendmsg(conn->sock, &reply,
1653 sizeof(struct lttcomm_relayd_generic_reply), 0);
1654 if (send_ret < (ssize_t) sizeof(reply)) {
1655 ERR("Failed to send \"reset metadata\" command reply (ret = %zd)",
1656 send_ret);
1657 ret = -1;
1658 }
1659
1660 end_no_session:
1661 return ret;
1662 }
1663
1664 /*
1665 * relay_unknown_command: send -1 if received unknown command
1666 */
1667 static void relay_unknown_command(struct relay_connection *conn)
1668 {
1669 struct lttcomm_relayd_generic_reply reply;
1670 ssize_t send_ret;
1671
1672 memset(&reply, 0, sizeof(reply));
1673 reply.ret_code = htobe32(LTTNG_ERR_UNK);
1674 send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0);
1675 if (send_ret < sizeof(reply)) {
1676 ERR("Failed to send \"unknown command\" command reply (ret = %zd)", send_ret);
1677 }
1678 }
1679
1680 /*
1681 * relay_start: send an acknowledgment to the client to tell if we are
1682 * ready to receive data. We are ready if a session is established.
1683 */
1684 static int relay_start(const struct lttcomm_relayd_hdr *recv_hdr,
1685 struct relay_connection *conn,
1686 const struct lttng_buffer_view *payload)
1687 {
1688 int ret = 0;
1689 ssize_t send_ret;
1690 struct lttcomm_relayd_generic_reply reply;
1691 struct relay_session *session = conn->session;
1692
1693 if (!session) {
1694 DBG("Trying to start the streaming without a session established");
1695 ret = htobe32(LTTNG_ERR_UNK);
1696 }
1697
1698 memset(&reply, 0, sizeof(reply));
1699 reply.ret_code = htobe32(LTTNG_OK);
1700 send_ret = conn->sock->ops->sendmsg(conn->sock, &reply,
1701 sizeof(reply), 0);
1702 if (send_ret < (ssize_t) sizeof(reply)) {
1703 ERR("Failed to send \"relay_start\" command reply (ret = %zd)",
1704 send_ret);
1705 ret = -1;
1706 }
1707
1708 return ret;
1709 }
1710
1711 /*
1712 * Append padding to the file pointed by the file descriptor fd.
1713 */
1714 static int write_padding_to_file(int fd, uint32_t size)
1715 {
1716 ssize_t ret = 0;
1717 char *zeros;
1718
1719 if (size == 0) {
1720 goto end;
1721 }
1722
1723 zeros = zmalloc(size);
1724 if (zeros == NULL) {
1725 PERROR("zmalloc zeros for padding");
1726 ret = -1;
1727 goto end;
1728 }
1729
1730 ret = lttng_write(fd, zeros, size);
1731 if (ret < size) {
1732 PERROR("write padding to file");
1733 }
1734
1735 free(zeros);
1736
1737 end:
1738 return ret;
1739 }
1740
1741 /*
1742 * relay_recv_metadata: receive the metadata for the session.
1743 */
1744 static int relay_recv_metadata(const struct lttcomm_relayd_hdr *recv_hdr,
1745 struct relay_connection *conn,
1746 const struct lttng_buffer_view *payload)
1747 {
1748 int ret = 0;
1749 ssize_t size_ret;
1750 struct relay_session *session = conn->session;
1751 struct lttcomm_relayd_metadata_payload metadata_payload_header;
1752 struct relay_stream *metadata_stream;
1753 uint64_t metadata_payload_size;
1754 int metadata_fd = -1;
1755
1756 if (!session) {
1757 ERR("Metadata sent before version check");
1758 ret = -1;
1759 goto end;
1760 }
1761
1762 if (recv_hdr->data_size < sizeof(struct lttcomm_relayd_metadata_payload)) {
1763 ERR("Incorrect data size");
1764 ret = -1;
1765 goto end;
1766 }
1767 metadata_payload_size = recv_hdr->data_size -
1768 sizeof(struct lttcomm_relayd_metadata_payload);
1769
1770 memcpy(&metadata_payload_header, payload->data,
1771 sizeof(metadata_payload_header));
1772 metadata_payload_header.stream_id = be64toh(
1773 metadata_payload_header.stream_id);
1774 metadata_payload_header.padding_size = be32toh(
1775 metadata_payload_header.padding_size);
1776
1777 metadata_stream = stream_get_by_id(metadata_payload_header.stream_id);
1778 if (!metadata_stream) {
1779 ret = -1;
1780 goto end;
1781 }
1782
1783 pthread_mutex_lock(&metadata_stream->lock);
1784
1785 metadata_fd = stream_fd_get_fd(metadata_stream->stream_fd);
1786 if (metadata_fd < 0) {
1787 goto end_put;
1788 }
1789 size_ret = lttng_write(metadata_fd,
1790 payload->data + sizeof(metadata_payload_header),
1791 metadata_payload_size);
1792 if (size_ret < metadata_payload_size) {
1793 ERR("Relay error writing metadata on file");
1794 ret = -1;
1795 goto end_put_fd;
1796 }
1797
1798 size_ret = write_padding_to_file(metadata_fd,
1799 metadata_payload_header.padding_size);
1800 if (size_ret < (int64_t) metadata_payload_header.padding_size) {
1801 ret = -1;
1802 goto end_put_fd;
1803 }
1804
1805 metadata_stream->metadata_received +=
1806 metadata_payload_size + metadata_payload_header.padding_size;
1807 DBG2("Relay metadata written. Updated metadata_received %" PRIu64,
1808 metadata_stream->metadata_received);
1809
1810 end_put_fd:
1811 stream_fd_put_fd(metadata_stream->stream_fd);
1812 end_put:
1813 pthread_mutex_unlock(&metadata_stream->lock);
1814 stream_put(metadata_stream);
1815 end:
1816 return ret;
1817 }
1818
1819 /*
1820 * relay_send_version: send relayd version number
1821 */
1822 static int relay_send_version(const struct lttcomm_relayd_hdr *recv_hdr,
1823 struct relay_connection *conn,
1824 const struct lttng_buffer_view *payload)
1825 {
1826 int ret;
1827 ssize_t send_ret;
1828 struct lttcomm_relayd_version reply, msg;
1829 bool compatible = true;
1830
1831 conn->version_check_done = true;
1832
1833 /* Get version from the other side. */
1834 if (payload->size < sizeof(msg)) {
1835 ERR("Unexpected payload size in \"relay_send_version\": expected >= %zu bytes, got %zu bytes",
1836 sizeof(msg), payload->size);
1837 ret = -1;
1838 goto end;
1839 }
1840
1841 memcpy(&msg, payload->data, sizeof(msg));
1842 msg.major = be32toh(msg.major);
1843 msg.minor = be32toh(msg.minor);
1844
1845 memset(&reply, 0, sizeof(reply));
1846 reply.major = RELAYD_VERSION_COMM_MAJOR;
1847 reply.minor = RELAYD_VERSION_COMM_MINOR;
1848
1849 /* Major versions must be the same */
1850 if (reply.major != msg.major) {
1851 DBG("Incompatible major versions (%u vs %u), deleting session",
1852 reply.major, msg.major);
1853 compatible = false;
1854 }
1855
1856 conn->major = reply.major;
1857 /* We adapt to the lowest compatible version */
1858 if (reply.minor <= msg.minor) {
1859 conn->minor = reply.minor;
1860 } else {
1861 conn->minor = msg.minor;
1862 }
1863
1864 reply.major = htobe32(reply.major);
1865 reply.minor = htobe32(reply.minor);
1866 send_ret = conn->sock->ops->sendmsg(conn->sock, &reply,
1867 sizeof(reply), 0);
1868 if (send_ret < (ssize_t) sizeof(reply)) {
1869 ERR("Failed to send \"send version\" command reply (ret = %zd)",
1870 send_ret);
1871 ret = -1;
1872 goto end;
1873 } else {
1874 ret = 0;
1875 }
1876
1877 if (!compatible) {
1878 ret = -1;
1879 goto end;
1880 }
1881
1882 DBG("Version check done using protocol %u.%u", conn->major,
1883 conn->minor);
1884
1885 end:
1886 return ret;
1887 }
1888
1889 /*
1890 * Check for data pending for a given stream id from the session daemon.
1891 */
1892 static int relay_data_pending(const struct lttcomm_relayd_hdr *recv_hdr,
1893 struct relay_connection *conn,
1894 const struct lttng_buffer_view *payload)
1895 {
1896 struct relay_session *session = conn->session;
1897 struct lttcomm_relayd_data_pending msg;
1898 struct lttcomm_relayd_generic_reply reply;
1899 struct relay_stream *stream;
1900 ssize_t send_ret;
1901 int ret;
1902
1903 DBG("Data pending command received");
1904
1905 if (!session || !conn->version_check_done) {
1906 ERR("Trying to check for data before version check");
1907 ret = -1;
1908 goto end_no_session;
1909 }
1910
1911 if (payload->size < sizeof(msg)) {
1912 ERR("Unexpected payload size in \"relay_data_pending\": expected >= %zu bytes, got %zu bytes",
1913 sizeof(msg), payload->size);
1914 ret = -1;
1915 goto end_no_session;
1916 }
1917 memcpy(&msg, payload->data, sizeof(msg));
1918 msg.stream_id = be64toh(msg.stream_id);
1919 msg.last_net_seq_num = be64toh(msg.last_net_seq_num);
1920
1921 stream = stream_get_by_id(msg.stream_id);
1922 if (stream == NULL) {
1923 ret = -1;
1924 goto end;
1925 }
1926
1927 pthread_mutex_lock(&stream->lock);
1928
1929 DBG("Data pending for stream id %" PRIu64 " prev_seq %" PRIu64
1930 " and last_seq %" PRIu64, msg.stream_id,
1931 stream->prev_seq, msg.last_net_seq_num);
1932
1933 /* Avoid wrapping issue */
1934 if (((int64_t) (stream->prev_seq - msg.last_net_seq_num)) >= 0) {
1935 /* Data has in fact been written and is NOT pending */
1936 ret = 0;
1937 } else {
1938 /* Data still being streamed thus pending */
1939 ret = 1;
1940 }
1941
1942 stream->data_pending_check_done = true;
1943 pthread_mutex_unlock(&stream->lock);
1944
1945 stream_put(stream);
1946 end:
1947
1948 memset(&reply, 0, sizeof(reply));
1949 reply.ret_code = htobe32(ret);
1950 send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0);
1951 if (send_ret < (ssize_t) sizeof(reply)) {
1952 ERR("Failed to send \"data pending\" command reply (ret = %zd)",
1953 send_ret);
1954 ret = -1;
1955 }
1956
1957 end_no_session:
1958 return ret;
1959 }
1960
1961 /*
1962 * Wait for the control socket to reach a quiescent state.
1963 *
1964 * Note that for now, when receiving this command from the session
1965 * daemon, this means that every subsequent commands or data received on
1966 * the control socket has been handled. So, this is why we simply return
1967 * OK here.
1968 */
1969 static int relay_quiescent_control(const struct lttcomm_relayd_hdr *recv_hdr,
1970 struct relay_connection *conn,
1971 const struct lttng_buffer_view *payload)
1972 {
1973 int ret;
1974 ssize_t send_ret;
1975 struct relay_stream *stream;
1976 struct lttcomm_relayd_quiescent_control msg;
1977 struct lttcomm_relayd_generic_reply reply;
1978
1979 DBG("Checking quiescent state on control socket");
1980
1981 if (!conn->session || !conn->version_check_done) {
1982 ERR("Trying to check for data before version check");
1983 ret = -1;
1984 goto end_no_session;
1985 }
1986
1987 if (payload->size < sizeof(msg)) {
1988 ERR("Unexpected payload size in \"relay_quiescent_control\": expected >= %zu bytes, got %zu bytes",
1989 sizeof(msg), payload->size);
1990 ret = -1;
1991 goto end_no_session;
1992 }
1993 memcpy(&msg, payload->data, sizeof(msg));
1994 msg.stream_id = be64toh(msg.stream_id);
1995
1996 stream = stream_get_by_id(msg.stream_id);
1997 if (!stream) {
1998 goto reply;
1999 }
2000 pthread_mutex_lock(&stream->lock);
2001 stream->data_pending_check_done = true;
2002 pthread_mutex_unlock(&stream->lock);
2003
2004 DBG("Relay quiescent control pending flag set to %" PRIu64, msg.stream_id);
2005 stream_put(stream);
2006 reply:
2007 memset(&reply, 0, sizeof(reply));
2008 reply.ret_code = htobe32(LTTNG_OK);
2009 send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0);
2010 if (send_ret < (ssize_t) sizeof(reply)) {
2011 ERR("Failed to send \"quiescent control\" command reply (ret = %zd)",
2012 send_ret);
2013 ret = -1;
2014 } else {
2015 ret = 0;
2016 }
2017
2018 end_no_session:
2019 return ret;
2020 }
2021
2022 /*
2023 * Initialize a data pending command. This means that a consumer is about
2024 * to ask for data pending for each stream it holds. Simply iterate over
2025 * all streams of a session and set the data_pending_check_done flag.
2026 *
2027 * This command returns to the client a LTTNG_OK code.
2028 */
2029 static int relay_begin_data_pending(const struct lttcomm_relayd_hdr *recv_hdr,
2030 struct relay_connection *conn,
2031 const struct lttng_buffer_view *payload)
2032 {
2033 int ret;
2034 ssize_t send_ret;
2035 struct lttng_ht_iter iter;
2036 struct lttcomm_relayd_begin_data_pending msg;
2037 struct lttcomm_relayd_generic_reply reply;
2038 struct relay_stream *stream;
2039
2040 assert(recv_hdr);
2041 assert(conn);
2042
2043 DBG("Init streams for data pending");
2044
2045 if (!conn->session || !conn->version_check_done) {
2046 ERR("Trying to check for data before version check");
2047 ret = -1;
2048 goto end_no_session;
2049 }
2050
2051 if (payload->size < sizeof(msg)) {
2052 ERR("Unexpected payload size in \"relay_begin_data_pending\": expected >= %zu bytes, got %zu bytes",
2053 sizeof(msg), payload->size);
2054 ret = -1;
2055 goto end_no_session;
2056 }
2057 memcpy(&msg, payload->data, sizeof(msg));
2058 msg.session_id = be64toh(msg.session_id);
2059
2060 /*
2061 * Iterate over all streams to set the begin data pending flag.
2062 * For now, the streams are indexed by stream handle so we have
2063 * to iterate over all streams to find the one associated with
2064 * the right session_id.
2065 */
2066 rcu_read_lock();
2067 cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
2068 node.node) {
2069 if (!stream_get(stream)) {
2070 continue;
2071 }
2072 if (stream->trace->session->id == msg.session_id) {
2073 pthread_mutex_lock(&stream->lock);
2074 stream->data_pending_check_done = false;
2075 pthread_mutex_unlock(&stream->lock);
2076 DBG("Set begin data pending flag to stream %" PRIu64,
2077 stream->stream_handle);
2078 }
2079 stream_put(stream);
2080 }
2081 rcu_read_unlock();
2082
2083 memset(&reply, 0, sizeof(reply));
2084 /* All good, send back reply. */
2085 reply.ret_code = htobe32(LTTNG_OK);
2086
2087 send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0);
2088 if (send_ret < (ssize_t) sizeof(reply)) {
2089 ERR("Failed to send \"begin data pending\" command reply (ret = %zd)",
2090 send_ret);
2091 ret = -1;
2092 } else {
2093 ret = 0;
2094 }
2095
2096 end_no_session:
2097 return ret;
2098 }
2099
2100 /*
2101 * End data pending command. This will check, for a given session id, if
2102 * each stream associated with it has its data_pending_check_done flag
2103 * set. If not, this means that the client lost track of the stream but
2104 * the data is still being streamed on our side. In this case, we inform
2105 * the client that data is in flight.
2106 *
2107 * Return to the client if there is data in flight or not with a ret_code.
2108 */
2109 static int relay_end_data_pending(const struct lttcomm_relayd_hdr *recv_hdr,
2110 struct relay_connection *conn,
2111 const struct lttng_buffer_view *payload)
2112 {
2113 int ret;
2114 ssize_t send_ret;
2115 struct lttng_ht_iter iter;
2116 struct lttcomm_relayd_end_data_pending msg;
2117 struct lttcomm_relayd_generic_reply reply;
2118 struct relay_stream *stream;
2119 uint32_t is_data_inflight = 0;
2120
2121 DBG("End data pending command");
2122
2123 if (!conn->session || !conn->version_check_done) {
2124 ERR("Trying to check for data before version check");
2125 ret = -1;
2126 goto end_no_session;
2127 }
2128
2129 if (payload->size < sizeof(msg)) {
2130 ERR("Unexpected payload size in \"relay_end_data_pending\": expected >= %zu bytes, got %zu bytes",
2131 sizeof(msg), payload->size);
2132 ret = -1;
2133 goto end_no_session;
2134 }
2135 memcpy(&msg, payload->data, sizeof(msg));
2136 msg.session_id = be64toh(msg.session_id);
2137
2138 /*
2139 * Iterate over all streams to see if the begin data pending
2140 * flag is set.
2141 */
2142 rcu_read_lock();
2143 cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
2144 node.node) {
2145 if (!stream_get(stream)) {
2146 continue;
2147 }
2148 if (stream->trace->session->id != msg.session_id) {
2149 stream_put(stream);
2150 continue;
2151 }
2152 pthread_mutex_lock(&stream->lock);
2153 if (!stream->data_pending_check_done) {
2154 if (!stream->closed || !(((int64_t) (stream->prev_seq - stream->last_net_seq_num)) >= 0)) {
2155 is_data_inflight = 1;
2156 DBG("Data is still in flight for stream %" PRIu64,
2157 stream->stream_handle);
2158 pthread_mutex_unlock(&stream->lock);
2159 stream_put(stream);
2160 break;
2161 }
2162 }
2163 pthread_mutex_unlock(&stream->lock);
2164 stream_put(stream);
2165 }
2166 rcu_read_unlock();
2167
2168 memset(&reply, 0, sizeof(reply));
2169 /* All good, send back reply. */
2170 reply.ret_code = htobe32(is_data_inflight);
2171
2172 send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0);
2173 if (send_ret < (ssize_t) sizeof(reply)) {
2174 ERR("Failed to send \"end data pending\" command reply (ret = %zd)",
2175 send_ret);
2176 ret = -1;
2177 } else {
2178 ret = 0;
2179 }
2180
2181 end_no_session:
2182 return ret;
2183 }
2184
2185 /*
2186 * Receive an index for a specific stream.
2187 *
2188 * Return 0 on success else a negative value.
2189 */
2190 static int relay_recv_index(const struct lttcomm_relayd_hdr *recv_hdr,
2191 struct relay_connection *conn,
2192 const struct lttng_buffer_view *payload)
2193 {
2194 int ret;
2195 ssize_t send_ret;
2196 struct relay_session *session = conn->session;
2197 struct lttcomm_relayd_index index_info;
2198 struct relay_index *index;
2199 struct lttcomm_relayd_generic_reply reply;
2200 struct relay_stream *stream;
2201 size_t msg_len;
2202
2203 assert(conn);
2204
2205 DBG("Relay receiving index");
2206
2207 if (!session || !conn->version_check_done) {
2208 ERR("Trying to close a stream before version check");
2209 ret = -1;
2210 goto end_no_session;
2211 }
2212
2213 msg_len = lttcomm_relayd_index_len(
2214 lttng_to_index_major(conn->major, conn->minor),
2215 lttng_to_index_minor(conn->major, conn->minor));
2216 if (payload->size < msg_len) {
2217 ERR("Unexpected payload size in \"relay_recv_index\": expected >= %zu bytes, got %zu bytes",
2218 msg_len, payload->size);
2219 ret = -1;
2220 goto end_no_session;
2221 }
2222 memcpy(&index_info, payload->data, msg_len);
2223 index_info.relay_stream_id = be64toh(index_info.relay_stream_id);
2224 index_info.net_seq_num = be64toh(index_info.net_seq_num);
2225 index_info.packet_size = be64toh(index_info.packet_size);
2226 index_info.content_size = be64toh(index_info.content_size);
2227 index_info.timestamp_begin = be64toh(index_info.timestamp_begin);
2228 index_info.timestamp_end = be64toh(index_info.timestamp_end);
2229 index_info.events_discarded = be64toh(index_info.events_discarded);
2230 index_info.stream_id = be64toh(index_info.stream_id);
2231
2232 if (conn->minor >= 8) {
2233 index_info.stream_instance_id =
2234 be64toh(index_info.stream_instance_id);
2235 index_info.packet_seq_num = be64toh(index_info.packet_seq_num);
2236 }
2237
2238 stream = stream_get_by_id(index_info.relay_stream_id);
2239 if (!stream) {
2240 ERR("stream_get_by_id not found");
2241 ret = -1;
2242 goto end;
2243 }
2244 pthread_mutex_lock(&stream->lock);
2245
2246 /* Live beacon handling */
2247 if (index_info.packet_size == 0) {
2248 DBG("Received live beacon for stream %" PRIu64,
2249 stream->stream_handle);
2250
2251 /*
2252 * Only flag a stream inactive when it has already
2253 * received data and no indexes are in flight.
2254 */
2255 if (stream->index_received_seqcount > 0
2256 && stream->indexes_in_flight == 0) {
2257 stream->beacon_ts_end = index_info.timestamp_end;
2258 }
2259 ret = 0;
2260 goto end_stream_put;
2261 } else {
2262 stream->beacon_ts_end = -1ULL;
2263 }
2264
2265 if (stream->ctf_stream_id == -1ULL) {
2266 stream->ctf_stream_id = index_info.stream_id;
2267 }
2268 index = relay_index_get_by_id_or_create(stream, index_info.net_seq_num);
2269 if (!index) {
2270 ret = -1;
2271 ERR("relay_index_get_by_id_or_create index NULL");
2272 goto end_stream_put;
2273 }
2274 if (set_index_control_data(index, &index_info, conn)) {
2275 ERR("set_index_control_data error");
2276 relay_index_put(index);
2277 ret = -1;
2278 goto end_stream_put;
2279 }
2280 ret = relay_index_try_flush(index);
2281 if (ret == 0) {
2282 tracefile_array_commit_seq(stream->tfa);
2283 stream->index_received_seqcount++;
2284 } else if (ret > 0) {
2285 /* no flush. */
2286 ret = 0;
2287 } else {
2288 ERR("relay_index_try_flush error %d", ret);
2289 relay_index_put(index);
2290 ret = -1;
2291 }
2292
2293 end_stream_put:
2294 pthread_mutex_unlock(&stream->lock);
2295 stream_put(stream);
2296
2297 end:
2298
2299 memset(&reply, 0, sizeof(reply));
2300 if (ret < 0) {
2301 reply.ret_code = htobe32(LTTNG_ERR_UNK);
2302 } else {
2303 reply.ret_code = htobe32(LTTNG_OK);
2304 }
2305 send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0);
2306 if (send_ret < (ssize_t) sizeof(reply)) {
2307 ERR("Failed to send \"recv index\" command reply (ret = %zd)", send_ret);
2308 ret = -1;
2309 }
2310
2311 end_no_session:
2312 return ret;
2313 }
2314
2315 /*
2316 * Receive the streams_sent message.
2317 *
2318 * Return 0 on success else a negative value.
2319 */
2320 static int relay_streams_sent(const struct lttcomm_relayd_hdr *recv_hdr,
2321 struct relay_connection *conn,
2322 const struct lttng_buffer_view *payload)
2323 {
2324 int ret;
2325 ssize_t send_ret;
2326 struct lttcomm_relayd_generic_reply reply;
2327
2328 assert(conn);
2329
2330 DBG("Relay receiving streams_sent");
2331
2332 if (!conn->session || !conn->version_check_done) {
2333 ERR("Trying to close a stream before version check");
2334 ret = -1;
2335 goto end_no_session;
2336 }
2337
2338 /*
2339 * Publish every pending stream in the connection recv list which are
2340 * now ready to be used by the viewer.
2341 */
2342 publish_connection_local_streams(conn);
2343
2344 memset(&reply, 0, sizeof(reply));
2345 reply.ret_code = htobe32(LTTNG_OK);
2346 send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0);
2347 if (send_ret < (ssize_t) sizeof(reply)) {
2348 ERR("Failed to send \"streams sent\" command reply (ret = %zd)",
2349 send_ret);
2350 ret = -1;
2351 } else {
2352 /* Success. */
2353 ret = 0;
2354 }
2355
2356 end_no_session:
2357 return ret;
2358 }
2359
2360 #define DBG_CMD(cmd_name, conn) \
2361 DBG3("Processing \"%s\" command for socket %i", cmd_name, conn->sock->fd);
2362
2363 static int relay_process_control_command(struct relay_connection *conn,
2364 const struct lttcomm_relayd_hdr *header,
2365 const struct lttng_buffer_view *payload)
2366 {
2367 int ret = 0;
2368
2369 switch (header->cmd) {
2370 case RELAYD_CREATE_SESSION:
2371 DBG_CMD("RELAYD_CREATE_SESSION", conn);
2372 ret = relay_create_session(header, conn, payload);
2373 break;
2374 case RELAYD_ADD_STREAM:
2375 DBG_CMD("RELAYD_ADD_STREAM", conn);
2376 ret = relay_add_stream(header, conn, payload);
2377 break;
2378 case RELAYD_START_DATA:
2379 DBG_CMD("RELAYD_START_DATA", conn);
2380 ret = relay_start(header, conn, payload);
2381 break;
2382 case RELAYD_SEND_METADATA:
2383 DBG_CMD("RELAYD_SEND_METADATA", conn);
2384 ret = relay_recv_metadata(header, conn, payload);
2385 break;
2386 case RELAYD_VERSION:
2387 DBG_CMD("RELAYD_VERSION", conn);
2388 ret = relay_send_version(header, conn, payload);
2389 break;
2390 case RELAYD_CLOSE_STREAM:
2391 DBG_CMD("RELAYD_CLOSE_STREAM", conn);
2392 ret = relay_close_stream(header, conn, payload);
2393 break;
2394 case RELAYD_DATA_PENDING:
2395 DBG_CMD("RELAYD_DATA_PENDING", conn);
2396 ret = relay_data_pending(header, conn, payload);
2397 break;
2398 case RELAYD_QUIESCENT_CONTROL:
2399 DBG_CMD("RELAYD_QUIESCENT_CONTROL", conn);
2400 ret = relay_quiescent_control(header, conn, payload);
2401 break;
2402 case RELAYD_BEGIN_DATA_PENDING:
2403 DBG_CMD("RELAYD_BEGIN_DATA_PENDING", conn);
2404 ret = relay_begin_data_pending(header, conn, payload);
2405 break;
2406 case RELAYD_END_DATA_PENDING:
2407 DBG_CMD("RELAYD_END_DATA_PENDING", conn);
2408 ret = relay_end_data_pending(header, conn, payload);
2409 break;
2410 case RELAYD_SEND_INDEX:
2411 DBG_CMD("RELAYD_SEND_INDEX", conn);
2412 ret = relay_recv_index(header, conn, payload);
2413 break;
2414 case RELAYD_STREAMS_SENT:
2415 DBG_CMD("RELAYD_STREAMS_SENT", conn);
2416 ret = relay_streams_sent(header, conn, payload);
2417 break;
2418 case RELAYD_RESET_METADATA:
2419 DBG_CMD("RELAYD_RESET_METADATA", conn);
2420 ret = relay_reset_metadata(header, conn, payload);
2421 break;
2422 case RELAYD_UPDATE_SYNC_INFO:
2423 default:
2424 ERR("Received unknown command (%u)", header->cmd);
2425 relay_unknown_command(conn);
2426 ret = -1;
2427 goto end;
2428 }
2429
2430 end:
2431 return ret;
2432 }
2433
2434 static enum relay_connection_status relay_process_control_receive_payload(
2435 struct relay_connection *conn)
2436 {
2437 int ret = 0;
2438 enum relay_connection_status status = RELAY_CONNECTION_STATUS_OK;
2439 struct lttng_dynamic_buffer *reception_buffer =
2440 &conn->protocol.ctrl.reception_buffer;
2441 struct ctrl_connection_state_receive_payload *state =
2442 &conn->protocol.ctrl.state.receive_payload;
2443 struct lttng_buffer_view payload_view;
2444
2445 if (state->left_to_receive == 0) {
2446 /* Short-circuit for payload-less commands. */
2447 goto reception_complete;
2448 }
2449 ret = conn->sock->ops->recvmsg(conn->sock,
2450 reception_buffer->data + state->received,
2451 state->left_to_receive, MSG_DONTWAIT);
2452 if (ret < 0) {
2453 if (errno != EAGAIN && errno != EWOULDBLOCK) {
2454 PERROR("Unable to receive command payload on sock %d",
2455 conn->sock->fd);
2456 status = RELAY_CONNECTION_STATUS_ERROR;
2457 }
2458 goto end;
2459 } else if (ret == 0) {
2460 DBG("Socket %d performed an orderly shutdown (received EOF)", conn->sock->fd);
2461 status = RELAY_CONNECTION_STATUS_CLOSED;
2462 goto end;
2463 }
2464
2465 assert(ret > 0);
2466 assert(ret <= state->left_to_receive);
2467
2468 state->left_to_receive -= ret;
2469 state->received += ret;
2470
2471 if (state->left_to_receive > 0) {
2472 /*
2473 * Can't transition to the protocol's next state, wait to
2474 * receive the rest of the header.
2475 */
2476 DBG3("Partial reception of control connection protocol payload (received %" PRIu64 " bytes, %" PRIu64 " bytes left to receive, fd = %i)",
2477 state->received, state->left_to_receive,
2478 conn->sock->fd);
2479 goto end;
2480 }
2481
2482 reception_complete:
2483 DBG("Done receiving control command payload: fd = %i, payload size = %" PRIu64 " bytes",
2484 conn->sock->fd, state->received);
2485 /*
2486 * The payload required to process the command has been received.
2487 * A view to the reception buffer is forwarded to the various
2488 * commands and the state of the control is reset on success.
2489 *
2490 * Commands are responsible for sending their reply to the peer.
2491 */
2492 payload_view = lttng_buffer_view_from_dynamic_buffer(reception_buffer,
2493 0, -1);
2494 ret = relay_process_control_command(conn,
2495 &state->header, &payload_view);
2496 if (ret < 0) {
2497 status = RELAY_CONNECTION_STATUS_ERROR;
2498 goto end;
2499 }
2500
2501 ret = connection_reset_protocol_state(conn);
2502 if (ret) {
2503 status = RELAY_CONNECTION_STATUS_ERROR;
2504 }
2505 end:
2506 return status;
2507 }
2508
2509 static enum relay_connection_status relay_process_control_receive_header(
2510 struct relay_connection *conn)
2511 {
2512 int ret = 0;
2513 enum relay_connection_status status = RELAY_CONNECTION_STATUS_OK;
2514 struct lttcomm_relayd_hdr header;
2515 struct lttng_dynamic_buffer *reception_buffer =
2516 &conn->protocol.ctrl.reception_buffer;
2517 struct ctrl_connection_state_receive_header *state =
2518 &conn->protocol.ctrl.state.receive_header;
2519
2520 assert(state->left_to_receive != 0);
2521
2522 ret = conn->sock->ops->recvmsg(conn->sock,
2523 reception_buffer->data + state->received,
2524 state->left_to_receive, MSG_DONTWAIT);
2525 if (ret < 0) {
2526 if (errno != EAGAIN && errno != EWOULDBLOCK) {
2527 PERROR("Unable to receive control command header on sock %d",
2528 conn->sock->fd);
2529 status = RELAY_CONNECTION_STATUS_ERROR;
2530 }
2531 goto end;
2532 } else if (ret == 0) {
2533 DBG("Socket %d performed an orderly shutdown (received EOF)", conn->sock->fd);
2534 status = RELAY_CONNECTION_STATUS_CLOSED;
2535 goto end;
2536 }
2537
2538 assert(ret > 0);
2539 assert(ret <= state->left_to_receive);
2540
2541 state->left_to_receive -= ret;
2542 state->received += ret;
2543
2544 if (state->left_to_receive > 0) {
2545 /*
2546 * Can't transition to the protocol's next state, wait to
2547 * receive the rest of the header.
2548 */
2549 DBG3("Partial reception of control connection protocol header (received %" PRIu64 " bytes, %" PRIu64 " bytes left to receive, fd = %i)",
2550 state->received, state->left_to_receive,
2551 conn->sock->fd);
2552 goto end;
2553 }
2554
2555 /* Transition to next state: receiving the command's payload. */
2556 conn->protocol.ctrl.state_id =
2557 CTRL_CONNECTION_STATE_RECEIVE_PAYLOAD;
2558 memcpy(&header, reception_buffer->data, sizeof(header));
2559 header.circuit_id = be64toh(header.circuit_id);
2560 header.data_size = be64toh(header.data_size);
2561 header.cmd = be32toh(header.cmd);
2562 header.cmd_version = be32toh(header.cmd_version);
2563 memcpy(&conn->protocol.ctrl.state.receive_payload.header,
2564 &header, sizeof(header));
2565
2566 DBG("Done receiving control command header: fd = %i, cmd = %" PRIu32 ", cmd_version = %" PRIu32 ", payload size = %" PRIu64 " bytes",
2567 conn->sock->fd, header.cmd, header.cmd_version,
2568 header.data_size);
2569
2570 if (header.data_size > DEFAULT_NETWORK_RELAYD_CTRL_MAX_PAYLOAD_SIZE) {
2571 ERR("Command header indicates a payload (%" PRIu64 " bytes) that exceeds the maximal payload size allowed on a control connection.",
2572 header.data_size);
2573 status = RELAY_CONNECTION_STATUS_ERROR;
2574 goto end;
2575 }
2576
2577 conn->protocol.ctrl.state.receive_payload.left_to_receive =
2578 header.data_size;
2579 conn->protocol.ctrl.state.receive_payload.received = 0;
2580 ret = lttng_dynamic_buffer_set_size(reception_buffer,
2581 header.data_size);
2582 if (ret) {
2583 status = RELAY_CONNECTION_STATUS_ERROR;
2584 goto end;
2585 }
2586
2587 if (header.data_size == 0) {
2588 /*
2589 * Manually invoke the next state as the poll loop
2590 * will not wake-up to allow us to proceed further.
2591 */
2592 status = relay_process_control_receive_payload(conn);
2593 }
2594 end:
2595 return status;
2596 }
2597
2598 /*
2599 * Process the commands received on the control socket
2600 */
2601 static enum relay_connection_status relay_process_control(
2602 struct relay_connection *conn)
2603 {
2604 enum relay_connection_status status;
2605
2606 switch (conn->protocol.ctrl.state_id) {
2607 case CTRL_CONNECTION_STATE_RECEIVE_HEADER:
2608 status = relay_process_control_receive_header(conn);
2609 break;
2610 case CTRL_CONNECTION_STATE_RECEIVE_PAYLOAD:
2611 status = relay_process_control_receive_payload(conn);
2612 break;
2613 default:
2614 ERR("Unknown control connection protocol state encountered.");
2615 abort();
2616 }
2617
2618 return status;
2619 }
2620
2621 /*
2622 * Handle index for a data stream.
2623 *
2624 * Called with the stream lock held.
2625 *
2626 * Return 0 on success else a negative value.
2627 */
2628 static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
2629 bool rotate_index)
2630 {
2631 int ret = 0;
2632 uint64_t data_offset;
2633 struct relay_index *index;
2634
2635 /* Get data offset because we are about to update the index. */
2636 data_offset = htobe64(stream->tracefile_size_current);
2637
2638 DBG("handle_index_data: stream %" PRIu64 " net_seq_num %" PRIu64 " data offset %" PRIu64,
2639 stream->stream_handle, net_seq_num, stream->tracefile_size_current);
2640
2641 /*
2642 * Lookup for an existing index for that stream id/sequence
2643 * number. If it exists, the control thread has already received the
2644 * data for it, thus we need to write it to disk.
2645 */
2646 index = relay_index_get_by_id_or_create(stream, net_seq_num);
2647 if (!index) {
2648 ret = -1;
2649 goto end;
2650 }
2651
2652 if (rotate_index || !stream->index_file) {
2653 uint32_t major, minor;
2654
2655 /* Put ref on previous index_file. */
2656 if (stream->index_file) {
2657 relay_index_file_put(stream->index_file);
2658 stream->index_file = NULL;
2659 }
2660 major = stream->trace->session->major;
2661 minor = stream->trace->session->minor;
2662 stream->index_file = relay_index_file_create(stream->path_name,
2663 stream->channel_name,
2664 stream->tracefile_size,
2665 tracefile_array_get_file_index_head(stream->tfa),
2666 lttng_to_index_major(major, minor),
2667 lttng_to_index_minor(major, minor));
2668 if (!stream->index_file) {
2669 ret = -1;
2670 /* Put self-ref for this index due to error. */
2671 relay_index_put(index);
2672 index = NULL;
2673 goto end;
2674 }
2675 }
2676
2677 if (relay_index_set_file(index, stream->index_file, data_offset)) {
2678 ret = -1;
2679 /* Put self-ref for this index due to error. */
2680 relay_index_put(index);
2681 index = NULL;
2682 goto end;
2683 }
2684
2685 ret = relay_index_try_flush(index);
2686 if (ret == 0) {
2687 tracefile_array_commit_seq(stream->tfa);
2688 stream->index_received_seqcount++;
2689 } else if (ret > 0) {
2690 /* No flush. */
2691 ret = 0;
2692 } else {
2693 /* Put self-ref for this index due to error. */
2694 relay_index_put(index);
2695 index = NULL;
2696 ret = -1;
2697 }
2698 end:
2699 return ret;
2700 }
2701
2702 static enum relay_connection_status relay_process_data_receive_header(
2703 struct relay_connection *conn)
2704 {
2705 int ret;
2706 enum relay_connection_status status = RELAY_CONNECTION_STATUS_OK;
2707 struct data_connection_state_receive_header *state =
2708 &conn->protocol.data.state.receive_header;
2709 struct lttcomm_relayd_data_hdr header;
2710 struct relay_stream *stream;
2711
2712 assert(state->left_to_receive != 0);
2713
2714 ret = conn->sock->ops->recvmsg(conn->sock,
2715 state->header_reception_buffer + state->received,
2716 state->left_to_receive, MSG_DONTWAIT);
2717 if (ret < 0) {
2718 if (errno != EAGAIN && errno != EWOULDBLOCK) {
2719 PERROR("Unable to receive data header on sock %d", conn->sock->fd);
2720 status = RELAY_CONNECTION_STATUS_ERROR;
2721 }
2722 goto end;
2723 } else if (ret == 0) {
2724 /* Orderly shutdown. Not necessary to print an error. */
2725 DBG("Socket %d performed an orderly shutdown (received EOF)", conn->sock->fd);
2726 status = RELAY_CONNECTION_STATUS_CLOSED;
2727 goto end;
2728 }
2729
2730 assert(ret > 0);
2731 assert(ret <= state->left_to_receive);
2732
2733 state->left_to_receive -= ret;
2734 state->received += ret;
2735
2736 if (state->left_to_receive > 0) {
2737 /*
2738 * Can't transition to the protocol's next state, wait to
2739 * receive the rest of the header.
2740 */
2741 DBG3("Partial reception of data connection header (received %" PRIu64 " bytes, %" PRIu64 " bytes left to receive, fd = %i)",
2742 state->received, state->left_to_receive,
2743 conn->sock->fd);
2744 ret = 0;
2745 goto end;
2746 }
2747
2748 /* Transition to next state: receiving the payload. */
2749 conn->protocol.data.state_id = DATA_CONNECTION_STATE_RECEIVE_PAYLOAD;
2750
2751 memcpy(&header, state->header_reception_buffer, sizeof(header));
2752 header.circuit_id = be64toh(header.circuit_id);
2753 header.stream_id = be64toh(header.stream_id);
2754 header.data_size = be32toh(header.data_size);
2755 header.net_seq_num = be64toh(header.net_seq_num);
2756 header.padding_size = be32toh(header.padding_size);
2757 memcpy(&conn->protocol.data.state.receive_payload.header, &header, sizeof(header));
2758
2759 conn->protocol.data.state.receive_payload.left_to_receive =
2760 header.data_size;
2761 conn->protocol.data.state.receive_payload.received = 0;
2762 conn->protocol.data.state.receive_payload.rotate_index = false;
2763
2764 DBG("Received data connection header on fd %i: circuit_id = %" PRIu64 ", stream_id = %" PRIu64 ", data_size = %" PRIu32 ", net_seq_num = %" PRIu64 ", padding_size = %" PRIu32,
2765 conn->sock->fd, header.circuit_id,
2766 header.stream_id, header.data_size,
2767 header.net_seq_num, header.padding_size);
2768
2769 stream = stream_get_by_id(header.stream_id);
2770 if (!stream) {
2771 DBG("relay_process_data_receive_payload: Cannot find stream %" PRIu64,
2772 header.stream_id);
2773 /* Protocol error. */
2774 status = RELAY_CONNECTION_STATUS_ERROR;
2775 goto end;
2776 }
2777
2778 pthread_mutex_lock(&stream->lock);
2779
2780 /* Check if a rotation is needed. */
2781 if (stream->tracefile_size > 0 &&
2782 (stream->tracefile_size_current + header.data_size) >
2783 stream->tracefile_size) {
2784 uint64_t old_id, new_id;
2785
2786 old_id = tracefile_array_get_file_index_head(stream->tfa);
2787 tracefile_array_file_rotate(stream->tfa);
2788
2789 /* new_id is updated by utils_rotate_stream_file. */
2790 new_id = old_id;
2791
2792 ret = stream_fd_rotate(stream->stream_fd, stream->path_name,
2793 stream->channel_name, stream->tracefile_size,
2794 stream->tracefile_count, &new_id);
2795 if (ret < 0) {
2796 ERR("Failed to rotate stream output file");
2797 status = RELAY_CONNECTION_STATUS_ERROR;
2798 goto end_stream_unlock;
2799 }
2800
2801 /*
2802 * Reset current size because we just performed a stream
2803 * rotation.
2804 */
2805 stream->tracefile_size_current = 0;
2806 conn->protocol.data.state.receive_payload.rotate_index = true;
2807 }
2808
2809 ret = 0;
2810 end_stream_unlock:
2811 pthread_mutex_unlock(&stream->lock);
2812 stream_put(stream);
2813 end:
2814 return status;
2815 }
2816
2817 static enum relay_connection_status relay_process_data_receive_payload(
2818 struct relay_connection *conn)
2819 {
2820 int ret;
2821 enum relay_connection_status status = RELAY_CONNECTION_STATUS_OK;
2822 struct relay_stream *stream;
2823 struct data_connection_state_receive_payload *state =
2824 &conn->protocol.data.state.receive_payload;
2825 const size_t chunk_size = RECV_DATA_BUFFER_SIZE;
2826 char data_buffer[chunk_size];
2827 bool partial_recv = false;
2828 bool new_stream = false, close_requested = false;
2829 uint64_t left_to_receive = state->left_to_receive;
2830 struct relay_session *session;
2831 int stream_fd = -1;
2832
2833 DBG3("Receiving data for stream id %" PRIu64 " seqnum %" PRIu64 ", %" PRIu64" bytes received, %" PRIu64 " bytes left to receive",
2834 state->header.stream_id, state->header.net_seq_num,
2835 state->received, left_to_receive);
2836
2837 stream = stream_get_by_id(state->header.stream_id);
2838 if (!stream) {
2839 /* Protocol error. */
2840 ERR("relay_process_data_receive_payload: cannot find stream %" PRIu64,
2841 state->header.stream_id);
2842 status = RELAY_CONNECTION_STATUS_ERROR;
2843 goto end;
2844 }
2845
2846 pthread_mutex_lock(&stream->lock);
2847 session = stream->trace->session;
2848 if (!conn->session) {
2849 ret = connection_set_session(conn, session);
2850 if (ret) {
2851 status = RELAY_CONNECTION_STATUS_ERROR;
2852 goto end_stream_unlock;
2853 }
2854 }
2855
2856 stream_fd = stream_fd_get_fd(stream->stream_fd);
2857 if (stream_fd < 0) {
2858 status = RELAY_CONNECTION_STATUS_ERROR;
2859 goto end_stream_unlock;
2860 }
2861
2862 /*
2863 * The size of the "chunk" received on any iteration is bounded by:
2864 * - the data left to receive,
2865 * - the data immediately available on the socket,
2866 * - the on-stack data buffer
2867 */
2868 while (left_to_receive > 0 && !partial_recv) {
2869 ssize_t write_ret;
2870 size_t recv_size = min(left_to_receive, chunk_size);
2871
2872 ret = conn->sock->ops->recvmsg(conn->sock, data_buffer,
2873 recv_size, MSG_DONTWAIT);
2874 if (ret < 0) {
2875 if (errno != EAGAIN && errno != EWOULDBLOCK) {
2876 PERROR("Socket %d error", conn->sock->fd);
2877 status = RELAY_CONNECTION_STATUS_ERROR;
2878 }
2879 goto end_put_fd;
2880 } else if (ret == 0) {
2881 /* No more data ready to be consumed on socket. */
2882 DBG3("No more data ready for consumption on data socket of stream id %" PRIu64,
2883 state->header.stream_id);
2884 status = RELAY_CONNECTION_STATUS_CLOSED;
2885 break;
2886 } else if (ret < (int) recv_size) {
2887 /*
2888 * All the data available on the socket has been
2889 * consumed.
2890 */
2891 partial_recv = true;
2892 }
2893
2894 recv_size = ret;
2895
2896 /* Write data to stream output fd. */
2897 write_ret = lttng_write(stream_fd, data_buffer,
2898 recv_size);
2899 if (write_ret < (ssize_t) recv_size) {
2900 ERR("Relay error writing data to file");
2901 status = RELAY_CONNECTION_STATUS_ERROR;
2902 goto end_put_fd;
2903 }
2904
2905 left_to_receive -= recv_size;
2906 state->received += recv_size;
2907 state->left_to_receive = left_to_receive;
2908
2909 DBG2("Relay wrote %zd bytes to tracefile for stream id %" PRIu64,
2910 write_ret, stream->stream_handle);
2911 }
2912
2913 if (state->left_to_receive > 0) {
2914 /*
2915 * Did not receive all the data expected, wait for more data to
2916 * become available on the socket.
2917 */
2918 DBG3("Partial receive on data connection of stream id %" PRIu64 ", %" PRIu64 " bytes received, %" PRIu64 " bytes left to receive",
2919 state->header.stream_id, state->received,
2920 state->left_to_receive);
2921 goto end_put_fd;
2922 }
2923
2924 ret = write_padding_to_file(stream_fd,
2925 state->header.padding_size);
2926 if ((int64_t) ret < (int64_t) state->header.padding_size) {
2927 ERR("write_padding_to_file: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
2928 stream->stream_handle,
2929 state->header.net_seq_num, ret);
2930 status = RELAY_CONNECTION_STATUS_ERROR;
2931 goto end_put_fd;
2932 }
2933
2934
2935 if (session->minor >= 4 && !session->snapshot) {
2936 ret = handle_index_data(stream, state->header.net_seq_num,
2937 state->rotate_index);
2938 if (ret < 0) {
2939 ERR("handle_index_data: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
2940 stream->stream_handle,
2941 state->header.net_seq_num, ret);
2942 status = RELAY_CONNECTION_STATUS_ERROR;
2943 goto end_put_fd;
2944 }
2945 }
2946
2947 stream->tracefile_size_current += state->header.data_size +
2948 state->header.padding_size;
2949
2950 if (stream->prev_seq == -1ULL) {
2951 new_stream = true;
2952 }
2953
2954 stream->prev_seq = state->header.net_seq_num;
2955
2956 /*
2957 * Resetting the protocol state (to RECEIVE_HEADER) will trash the
2958 * contents of *state which are aliased (union) to the same location as
2959 * the new state. Don't use it beyond this point.
2960 */
2961 connection_reset_protocol_state(conn);
2962 state = NULL;
2963
2964 end_put_fd:
2965 stream_fd_put_fd(stream->stream_fd);
2966 end_stream_unlock:
2967 close_requested = stream->close_requested;
2968 pthread_mutex_unlock(&stream->lock);
2969 if (close_requested && left_to_receive == 0) {
2970 try_stream_close(stream);
2971 }
2972
2973 if (new_stream) {
2974 pthread_mutex_lock(&session->lock);
2975 uatomic_set(&session->new_streams, 1);
2976 pthread_mutex_unlock(&session->lock);
2977 }
2978
2979 stream_put(stream);
2980 end:
2981 return status;
2982 }
2983
2984 /*
2985 * relay_process_data: Process the data received on the data socket
2986 */
2987 static enum relay_connection_status relay_process_data(
2988 struct relay_connection *conn)
2989 {
2990 enum relay_connection_status status;
2991
2992 switch (conn->protocol.data.state_id) {
2993 case DATA_CONNECTION_STATE_RECEIVE_HEADER:
2994 status = relay_process_data_receive_header(conn);
2995 break;
2996 case DATA_CONNECTION_STATE_RECEIVE_PAYLOAD:
2997 status = relay_process_data_receive_payload(conn);
2998 break;
2999 default:
3000 ERR("Unexpected data connection communication state.");
3001 abort();
3002 }
3003
3004 return status;
3005 }
3006
3007 static void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd)
3008 {
3009 int ret;
3010
3011 (void) lttng_poll_del(events, pollfd);
3012
3013 ret = fd_tracker_close_unsuspendable_fd(the_fd_tracker, &pollfd, 1,
3014 fd_tracker_util_close_fd, NULL);
3015 if (ret < 0) {
3016 ERR("Closing pollfd %d", pollfd);
3017 }
3018 }
3019
3020 static void relay_thread_close_connection(struct lttng_poll_event *events,
3021 int pollfd, struct relay_connection *conn)
3022 {
3023 const char *type_str;
3024
3025 switch (conn->type) {
3026 case RELAY_DATA:
3027 type_str = "Data";
3028 break;
3029 case RELAY_CONTROL:
3030 type_str = "Control";
3031 break;
3032 case RELAY_VIEWER_COMMAND:
3033 type_str = "Viewer Command";
3034 break;
3035 case RELAY_VIEWER_NOTIFICATION:
3036 type_str = "Viewer Notification";
3037 break;
3038 default:
3039 type_str = "Unknown";
3040 }
3041 cleanup_connection_pollfd(events, pollfd);
3042 connection_put(conn);
3043 DBG("%s connection closed with %d", type_str, pollfd);
3044 }
3045
3046 /*
3047 * This thread does the actual work
3048 */
3049 static void *relay_thread_worker(void *data)
3050 {
3051 int ret, err = -1, last_seen_data_fd = -1;
3052 uint32_t nb_fd;
3053 struct lttng_poll_event events;
3054 struct lttng_ht *relay_connections_ht;
3055 struct lttng_ht_iter iter;
3056 struct relay_connection *destroy_conn = NULL;
3057
3058 DBG("[thread] Relay worker started");
3059
3060 rcu_register_thread();
3061
3062 health_register(health_relayd, HEALTH_RELAYD_TYPE_WORKER);
3063
3064 if (testpoint(relayd_thread_worker)) {
3065 goto error_testpoint;
3066 }
3067
3068 health_code_update();
3069
3070 /* table of connections indexed on socket */
3071 relay_connections_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
3072 if (!relay_connections_ht) {
3073 goto relay_connections_ht_error;
3074 }
3075
3076 ret = create_named_thread_poll_set(&events, 2, "Worker thread epoll");
3077 if (ret < 0) {
3078 goto error_poll_create;
3079 }
3080
3081 ret = lttng_poll_add(&events, relay_conn_pipe[0], LPOLLIN | LPOLLRDHUP);
3082 if (ret < 0) {
3083 goto error;
3084 }
3085
3086 restart:
3087 while (1) {
3088 int idx = -1, i, seen_control = 0, last_notdel_data_fd = -1;
3089
3090 health_code_update();
3091
3092 /* Infinite blocking call, waiting for transmission */
3093 DBG3("Relayd worker thread polling...");
3094 health_poll_entry();
3095 ret = lttng_poll_wait(&events, -1);
3096 health_poll_exit();
3097 if (ret < 0) {
3098 /*
3099 * Restart interrupted system call.
3100 */
3101 if (errno == EINTR) {
3102 goto restart;
3103 }
3104 goto error;
3105 }
3106
3107 nb_fd = ret;
3108
3109 /*
3110 * Process control. The control connection is
3111 * prioritized so we don't starve it with high
3112 * throughput tracing data on the data connection.
3113 */
3114 for (i = 0; i < nb_fd; i++) {
3115 /* Fetch once the poll data */
3116 uint32_t revents = LTTNG_POLL_GETEV(&events, i);
3117 int pollfd = LTTNG_POLL_GETFD(&events, i);
3118
3119 health_code_update();
3120
3121 if (!revents) {
3122 /*
3123 * No activity for this FD (poll
3124 * implementation).
3125 */
3126 continue;
3127 }
3128
3129 /* Thread quit pipe has been closed. Killing thread. */
3130 ret = check_thread_quit_pipe(pollfd, revents);
3131 if (ret) {
3132 err = 0;
3133 goto exit;
3134 }
3135
3136 /* Inspect the relay conn pipe for new connection */
3137 if (pollfd == relay_conn_pipe[0]) {
3138 if (revents & LPOLLIN) {
3139 struct relay_connection *conn;
3140
3141 ret = lttng_read(relay_conn_pipe[0], &conn, sizeof(conn));
3142 if (ret < 0) {
3143 goto error;
3144 }
3145 lttng_poll_add(&events, conn->sock->fd,
3146 LPOLLIN | LPOLLRDHUP);
3147 connection_ht_add(relay_connections_ht, conn);
3148 DBG("Connection socket %d added", conn->sock->fd);
3149 } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
3150 ERR("Relay connection pipe error");
3151 goto error;
3152 } else {
3153 ERR("Unexpected poll events %u for sock %d", revents, pollfd);
3154 goto error;
3155 }
3156 } else {
3157 struct relay_connection *ctrl_conn;
3158
3159 ctrl_conn = connection_get_by_sock(relay_connections_ht, pollfd);
3160 /* If not found, there is a synchronization issue. */
3161 assert(ctrl_conn);
3162
3163 if (ctrl_conn->type == RELAY_DATA) {
3164 if (revents & LPOLLIN) {
3165 /*
3166 * Flag the last seen data fd not deleted. It will be
3167 * used as the last seen fd if any fd gets deleted in
3168 * this first loop.
3169 */
3170 last_notdel_data_fd = pollfd;
3171 }
3172 goto put_ctrl_connection;
3173 }
3174 assert(ctrl_conn->type == RELAY_CONTROL);
3175
3176 if (revents & LPOLLIN) {
3177 enum relay_connection_status status;
3178
3179 status = relay_process_control(ctrl_conn);
3180 if (status != RELAY_CONNECTION_STATUS_OK) {
3181 /*
3182 * On socket error flag the session as aborted to force
3183 * the cleanup of its stream otherwise it can leak
3184 * during the lifetime of the relayd.
3185 *
3186 * This prevents situations in which streams can be
3187 * left opened because an index was received, the
3188 * control connection is closed, and the data
3189 * connection is closed (uncleanly) before the packet's
3190 * data provided.
3191 *
3192 * Since the control connection encountered an error,
3193 * it is okay to be conservative and close the
3194 * session right now as we can't rely on the protocol
3195 * being respected anymore.
3196 */
3197 if (status == RELAY_CONNECTION_STATUS_ERROR) {
3198 session_abort(ctrl_conn->session);
3199 }
3200
3201 /* Clear the connection on error or close. */
3202 relay_thread_close_connection(&events,
3203 pollfd,
3204 ctrl_conn);
3205 }
3206 seen_control = 1;
3207 } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
3208 relay_thread_close_connection(&events,
3209 pollfd, ctrl_conn);
3210 if (last_seen_data_fd == pollfd) {
3211 last_seen_data_fd = last_notdel_data_fd;
3212 }
3213 } else {
3214 ERR("Unexpected poll events %u for control sock %d",
3215 revents, pollfd);
3216 connection_put(ctrl_conn);
3217 goto error;
3218 }
3219 put_ctrl_connection:
3220 connection_put(ctrl_conn);
3221 }
3222 }
3223
3224 /*
3225 * The last loop handled a control request, go back to poll to make
3226 * sure we prioritise the control socket.
3227 */
3228 if (seen_control) {
3229 continue;
3230 }
3231
3232 if (last_seen_data_fd >= 0) {
3233 for (i = 0; i < nb_fd; i++) {
3234 int pollfd = LTTNG_POLL_GETFD(&events, i);
3235
3236 health_code_update();
3237
3238 if (last_seen_data_fd == pollfd) {
3239 idx = i;
3240 break;
3241 }
3242 }
3243 }
3244
3245 /* Process data connection. */
3246 for (i = idx + 1; i < nb_fd; i++) {
3247 /* Fetch the poll data. */
3248 uint32_t revents = LTTNG_POLL_GETEV(&events, i);
3249 int pollfd = LTTNG_POLL_GETFD(&events, i);
3250 struct relay_connection *data_conn;
3251
3252 health_code_update();
3253
3254 if (!revents) {
3255 /* No activity for this FD (poll implementation). */
3256 continue;
3257 }
3258
3259 /* Skip the command pipe. It's handled in the first loop. */
3260 if (pollfd == relay_conn_pipe[0]) {
3261 continue;
3262 }
3263
3264 data_conn = connection_get_by_sock(relay_connections_ht, pollfd);
3265 if (!data_conn) {
3266 /* Skip it. Might be removed before. */
3267 continue;
3268 }
3269 if (data_conn->type == RELAY_CONTROL) {
3270 goto put_data_connection;
3271 }
3272 assert(data_conn->type == RELAY_DATA);
3273
3274 if (revents & LPOLLIN) {
3275 enum relay_connection_status status;
3276
3277 status = relay_process_data(data_conn);
3278 /* Connection closed or error. */
3279 if (status != RELAY_CONNECTION_STATUS_OK) {
3280 /*
3281 * On socket error flag the session as aborted to force
3282 * the cleanup of its stream otherwise it can leak
3283 * during the lifetime of the relayd.
3284 *
3285 * This prevents situations in which streams can be
3286 * left opened because an index was received, the
3287 * control connection is closed, and the data
3288 * connection is closed (uncleanly) before the packet's
3289 * data provided.
3290 *
3291 * Since the data connection encountered an error,
3292 * it is okay to be conservative and close the
3293 * session right now as we can't rely on the protocol
3294 * being respected anymore.
3295 */
3296 if (status == RELAY_CONNECTION_STATUS_ERROR) {
3297 session_abort(data_conn->session);
3298 }
3299 relay_thread_close_connection(&events, pollfd,
3300 data_conn);
3301 /*
3302 * Every goto restart call sets the last seen fd where
3303 * here we don't really care since we gracefully
3304 * continue the loop after the connection is deleted.
3305 */
3306 } else {
3307 /* Keep last seen port. */
3308 last_seen_data_fd = pollfd;
3309 connection_put(data_conn);
3310 goto restart;
3311 }
3312 } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
3313 relay_thread_close_connection(&events, pollfd,
3314 data_conn);
3315 } else {
3316 ERR("Unknown poll events %u for data sock %d",
3317 revents, pollfd);
3318 }
3319 put_data_connection:
3320 connection_put(data_conn);
3321 }
3322 last_seen_data_fd = -1;
3323 }
3324
3325 /* Normal exit, no error */
3326 ret = 0;
3327
3328 exit:
3329 error:
3330 /* Cleanup reamaining connection object. */
3331 rcu_read_lock();
3332 cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter,
3333 destroy_conn,
3334 sock_n.node) {
3335 health_code_update();
3336
3337 session_abort(destroy_conn->session);
3338
3339 /*
3340 * No need to grab another ref, because we own
3341 * destroy_conn.
3342 */
3343 relay_thread_close_connection(&events, destroy_conn->sock->fd,
3344 destroy_conn);
3345 }
3346 rcu_read_unlock();
3347
3348 (void) fd_tracker_util_poll_clean(the_fd_tracker, &events);
3349 error_poll_create:
3350 lttng_ht_destroy(relay_connections_ht);
3351 relay_connections_ht_error:
3352 /* Close relay conn pipes */
3353 (void) fd_tracker_util_pipe_close(the_fd_tracker,
3354 relay_conn_pipe);
3355 if (err) {
3356 DBG("Thread exited with error");
3357 }
3358 DBG("Worker thread cleanup complete");
3359 error_testpoint:
3360 if (err) {
3361 health_error();
3362 ERR("Health error occurred in %s", __func__);
3363 }
3364 health_unregister(health_relayd);
3365 rcu_unregister_thread();
3366 lttng_relay_stop_threads();
3367 return NULL;
3368 }
3369
3370 /*
3371 * Create the relay command pipe to wake thread_manage_apps.
3372 * Closed in cleanup().
3373 */
3374 static int create_relay_conn_pipe(void)
3375 {
3376 return fd_tracker_util_pipe_open_cloexec(the_fd_tracker,
3377 "Relayd connection pipe", relay_conn_pipe);
3378 }
3379
3380 static
3381 int stdio_open(void *data, int *fds)
3382 {
3383 fds[0] = fileno(stdout);
3384 fds[1] = fileno(stderr);
3385 return 0;
3386 }
3387
3388 static
3389 int noop_close(void *data, int *fds)
3390 {
3391 return 0;
3392 }
3393
3394 static
3395 int track_stdio(void)
3396 {
3397 int fds[2];
3398 const char *names[] = { "stdout", "stderr" };
3399
3400 return fd_tracker_open_unsuspendable_fd(the_fd_tracker, fds,
3401 names, 2, stdio_open, NULL);
3402 }
3403
3404 static
3405 void untrack_stdio(void)
3406 {
3407 int fds[] = { fileno(stdout), fileno(stderr) };
3408
3409 /*
3410 * noop_close is used since we don't really want to close
3411 * the stdio output fds; we merely want to stop tracking them.
3412 */
3413 (void) fd_tracker_close_unsuspendable_fd(the_fd_tracker,
3414 fds, 2, noop_close, NULL);
3415 }
3416
3417 /*
3418 * main
3419 */
3420 int main(int argc, char **argv)
3421 {
3422 int ret = 0, retval = 0;
3423 void *status;
3424
3425 /* Parse environment variables */
3426 parse_env_options();
3427
3428 /*
3429 * Parse arguments.
3430 * Command line arguments overwrite environment.
3431 */
3432 progname = argv[0];
3433 if (set_options(argc, argv)) {
3434 retval = -1;
3435 goto exit_options;
3436 }
3437
3438 if (set_signal_handler()) {
3439 retval = -1;
3440 goto exit_options;
3441 }
3442
3443 relayd_config_log();
3444
3445 if (opt_print_version) {
3446 print_version();
3447 retval = 0;
3448 goto exit_options;
3449 }
3450
3451 ret = fclose(stdin);
3452 if (ret) {
3453 PERROR("Failed to close stdin");
3454 goto exit_options;
3455 }
3456 /* Try to create directory if -o, --output is specified. */
3457 if (opt_output_path) {
3458 if (*opt_output_path != '/') {
3459 ERR("Please specify an absolute path for -o, --output PATH");
3460 retval = -1;
3461 goto exit_options;
3462 }
3463
3464 ret = utils_mkdir_recursive(opt_output_path, S_IRWXU | S_IRWXG,
3465 -1, -1);
3466 if (ret < 0) {
3467 ERR("Unable to create %s", opt_output_path);
3468 retval = -1;
3469 goto exit_options;
3470 }
3471 }
3472
3473 /* Daemonize */
3474 if (opt_daemon || opt_background) {
3475 ret = lttng_daemonize(&child_ppid, &recv_child_signal,
3476 !opt_background);
3477 if (ret < 0) {
3478 retval = -1;
3479 goto exit_options;
3480 }
3481 }
3482
3483 if (opt_working_directory) {
3484 ret = utils_change_working_dir(opt_working_directory);
3485 if (ret) {
3486 ERR("Changing working directory");
3487 goto exit_options;
3488 }
3489 }
3490 /*
3491 * The RCU thread registration (and use, through the fd-tracker's
3492 * creation) is done after the daemonization to allow us to not
3493 * deal with liburcu's fork() management as the call RCU needs to
3494 * be restored.
3495 */
3496 rcu_register_thread();
3497
3498 the_fd_tracker = fd_tracker_create(lttng_opt_fd_pool_size);
3499 if (!the_fd_tracker) {
3500 retval = -1;
3501 goto exit_options;
3502 }
3503
3504 ret = track_stdio();
3505 if (ret) {
3506 retval = -1;
3507 goto exit_tracker;
3508 }
3509
3510 /* Initialize thread health monitoring */
3511 health_relayd = health_app_create(NR_HEALTH_RELAYD_TYPES);
3512 if (!health_relayd) {
3513 PERROR("health_app_create error");
3514 retval = -1;
3515 goto exit_health_app_create;
3516 }
3517
3518 /* Create thread quit pipe */
3519 if (init_thread_quit_pipe()) {
3520 retval = -1;
3521 goto exit_init_data;
3522 }
3523
3524 /* Setup the thread apps communication pipe. */
3525 if (create_relay_conn_pipe()) {
3526 retval = -1;
3527 goto exit_init_data;
3528 }
3529
3530 /* Init relay command queue. */
3531 cds_wfcq_init(&relay_conn_queue.head, &relay_conn_queue.tail);
3532
3533 /* Initialize communication library */
3534 lttcomm_init();
3535 lttcomm_inet_init();
3536
3537 /* tables of sessions indexed by session ID */
3538 sessions_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3539 if (!sessions_ht) {
3540 retval = -1;
3541 goto exit_init_data;
3542 }
3543
3544 /* tables of streams indexed by stream ID */
3545 relay_streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3546 if (!relay_streams_ht) {
3547 retval = -1;
3548 goto exit_init_data;
3549 }
3550
3551 /* tables of streams indexed by stream ID */
3552 viewer_streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
3553 if (!viewer_streams_ht) {
3554 retval = -1;
3555 goto exit_init_data;
3556 }
3557
3558 ret = init_health_quit_pipe();
3559 if (ret) {
3560 retval = -1;
3561 goto exit_health_quit_pipe;
3562 }
3563
3564 /* Create thread to manage the client socket */
3565 ret = pthread_create(&health_thread, default_pthread_attr(),
3566 thread_manage_health, (void *) NULL);
3567 if (ret) {
3568 errno = ret;
3569 PERROR("pthread_create health");
3570 retval = -1;
3571 goto exit_health_thread;
3572 }
3573
3574 /* Setup the dispatcher thread */
3575 ret = pthread_create(&dispatcher_thread, default_pthread_attr(),
3576 relay_thread_dispatcher, (void *) NULL);
3577 if (ret) {
3578 errno = ret;
3579 PERROR("pthread_create dispatcher");
3580 retval = -1;
3581 goto exit_dispatcher_thread;
3582 }
3583
3584 /* Setup the worker thread */
3585 ret = pthread_create(&worker_thread, default_pthread_attr(),
3586 relay_thread_worker, NULL);
3587 if (ret) {
3588 errno = ret;
3589 PERROR("pthread_create worker");
3590 retval = -1;
3591 goto exit_worker_thread;
3592 }
3593
3594 /* Setup the listener thread */
3595 ret = pthread_create(&listener_thread, default_pthread_attr(),
3596 relay_thread_listener, (void *) NULL);
3597 if (ret) {
3598 errno = ret;
3599 PERROR("pthread_create listener");
3600 retval = -1;
3601 goto exit_listener_thread;
3602 }
3603
3604 ret = relayd_live_create(live_uri);
3605 if (ret) {
3606 ERR("Starting live viewer threads");
3607 retval = -1;
3608 goto exit_live;
3609 }
3610
3611 /*
3612 * This is where we start awaiting program completion (e.g. through
3613 * signal that asks threads to teardown).
3614 */
3615
3616 ret = relayd_live_join();
3617 if (ret) {
3618 retval = -1;
3619 }
3620 exit_live:
3621
3622 ret = pthread_join(listener_thread, &status);
3623 if (ret) {
3624 errno = ret;
3625 PERROR("pthread_join listener_thread");
3626 retval = -1;
3627 }
3628
3629 exit_listener_thread:
3630 ret = pthread_join(worker_thread, &status);
3631 if (ret) {
3632 errno = ret;
3633 PERROR("pthread_join worker_thread");
3634 retval = -1;
3635 }
3636
3637 exit_worker_thread:
3638 ret = pthread_join(dispatcher_thread, &status);
3639 if (ret) {
3640 errno = ret;
3641 PERROR("pthread_join dispatcher_thread");
3642 retval = -1;
3643 }
3644 exit_dispatcher_thread:
3645
3646 ret = pthread_join(health_thread, &status);
3647 if (ret) {
3648 errno = ret;
3649 PERROR("pthread_join health_thread");
3650 retval = -1;
3651 }
3652 exit_health_thread:
3653
3654 (void) fd_tracker_util_pipe_close(the_fd_tracker, health_quit_pipe);
3655 exit_health_quit_pipe:
3656
3657 exit_init_data:
3658 health_app_destroy(health_relayd);
3659 exit_health_app_create:
3660
3661 /*
3662 * Wait for all pending call_rcu work to complete before tearing
3663 * down data structures. call_rcu worker may be trying to
3664 * perform lookups in those structures.
3665 */
3666 rcu_barrier();
3667 relayd_cleanup();
3668
3669 /* Ensure all prior call_rcu are done. */
3670 rcu_barrier();
3671 exit_tracker:
3672 untrack_stdio();
3673 /*
3674 * fd_tracker_destroy() will log the contents of the fd-tracker
3675 * if a leak is detected.
3676 */
3677 fd_tracker_destroy(the_fd_tracker);
3678 rcu_unregister_thread();
3679 exit_options:
3680 if (!retval) {
3681 exit(EXIT_SUCCESS);
3682 } else {
3683 exit(EXIT_FAILURE);
3684 }
3685 }
This page took 0.192368 seconds and 5 git commands to generate.