Send indexes in streaming mode
[lttng-tools.git] / src / bin / lttng-relayd / main.c
1 /*
2 * Copyright (C) 2012 - Julien Desfossez <jdesfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License, version 2 only,
7 * as published by the Free Software Foundation.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 * more details.
13 *
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
19 #define _GNU_SOURCE
20 #include <getopt.h>
21 #include <grp.h>
22 #include <limits.h>
23 #include <pthread.h>
24 #include <signal.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <sys/mman.h>
29 #include <sys/mount.h>
30 #include <sys/resource.h>
31 #include <sys/socket.h>
32 #include <sys/stat.h>
33 #include <sys/types.h>
34 #include <sys/wait.h>
35 #include <inttypes.h>
36 #include <urcu/futex.h>
37 #include <urcu/uatomic.h>
38 #include <unistd.h>
39 #include <fcntl.h>
40 #include <config.h>
41
42 #include <lttng/lttng.h>
43 #include <common/common.h>
44 #include <common/compat/poll.h>
45 #include <common/compat/socket.h>
46 #include <common/defaults.h>
47 #include <common/futex.h>
48 #include <common/sessiond-comm/sessiond-comm.h>
49 #include <common/sessiond-comm/inet.h>
50 #include <common/sessiond-comm/relayd.h>
51 #include <common/uri.h>
52 #include <common/utils.h>
53
54 #include "cmd.h"
55 #include "index.h"
56 #include "utils.h"
57 #include "lttng-relayd.h"
58
59 /* command line options */
60 char *opt_output_path;
61 static int opt_daemon;
62 static struct lttng_uri *control_uri;
63 static struct lttng_uri *data_uri;
64
65 const char *progname;
66
67 /*
68 * Quit pipe for all threads. This permits a single cancellation point
69 * for all threads when receiving an event on the pipe.
70 */
71 static int thread_quit_pipe[2] = { -1, -1 };
72
73 /*
74 * This pipe is used to inform the worker thread that a command is queued and
75 * ready to be processed.
76 */
77 static int relay_cmd_pipe[2] = { -1, -1 };
78
79 /* Shared between threads */
80 static int dispatch_thread_exit;
81
82 static pthread_t listener_thread;
83 static pthread_t dispatcher_thread;
84 static pthread_t worker_thread;
85
86 static uint64_t last_relay_stream_id;
87 static uint64_t last_relay_session_id;
88
89 /*
90 * Relay command queue.
91 *
92 * The relay_thread_listener and relay_thread_dispatcher communicate with this
93 * queue.
94 */
95 static struct relay_cmd_queue relay_cmd_queue;
96
97 /* buffer allocated at startup, used to store the trace data */
98 static char *data_buffer;
99 static unsigned int data_buffer_size;
100
101 /* Global hash table that stores relay index object. */
102 static struct lttng_ht *indexes_ht;
103
104 /* We need those values for the file/dir creation. */
105 static uid_t relayd_uid;
106 static gid_t relayd_gid;
107
108 /*
109 * usage function on stderr
110 */
111 static
112 void usage(void)
113 {
114 fprintf(stderr, "Usage: %s OPTIONS\n\nOptions:\n", progname);
115 fprintf(stderr, " -h, --help Display this usage.\n");
116 fprintf(stderr, " -d, --daemonize Start as a daemon.\n");
117 fprintf(stderr, " -C, --control-port URL Control port listening.\n");
118 fprintf(stderr, " -D, --data-port URL Data port listening.\n");
119 fprintf(stderr, " -o, --output PATH Output path for traces. Must use an absolute path.\n");
120 fprintf(stderr, " -v, --verbose Verbose mode. Activate DBG() macro.\n");
121 }
122
123 static
124 int parse_args(int argc, char **argv)
125 {
126 int c;
127 int ret = 0;
128 char *default_address;
129
130 static struct option long_options[] = {
131 { "control-port", 1, 0, 'C', },
132 { "data-port", 1, 0, 'D', },
133 { "daemonize", 0, 0, 'd', },
134 { "help", 0, 0, 'h', },
135 { "output", 1, 0, 'o', },
136 { "verbose", 0, 0, 'v', },
137 { NULL, 0, 0, 0, },
138 };
139
140 while (1) {
141 int option_index = 0;
142 c = getopt_long(argc, argv, "dhv" "C:D:o:",
143 long_options, &option_index);
144 if (c == -1) {
145 break;
146 }
147
148 switch (c) {
149 case 0:
150 fprintf(stderr, "option %s", long_options[option_index].name);
151 if (optarg) {
152 fprintf(stderr, " with arg %s\n", optarg);
153 }
154 break;
155 case 'C':
156 ret = uri_parse(optarg, &control_uri);
157 if (ret < 0) {
158 ERR("Invalid control URI specified");
159 goto exit;
160 }
161 if (control_uri->port == 0) {
162 control_uri->port = DEFAULT_NETWORK_CONTROL_PORT;
163 }
164 break;
165 case 'D':
166 ret = uri_parse(optarg, &data_uri);
167 if (ret < 0) {
168 ERR("Invalid data URI specified");
169 goto exit;
170 }
171 if (data_uri->port == 0) {
172 data_uri->port = DEFAULT_NETWORK_DATA_PORT;
173 }
174 break;
175 case 'd':
176 opt_daemon = 1;
177 break;
178 case 'h':
179 usage();
180 exit(EXIT_FAILURE);
181 case 'o':
182 ret = asprintf(&opt_output_path, "%s", optarg);
183 if (ret < 0) {
184 PERROR("asprintf opt_output_path");
185 goto exit;
186 }
187 break;
188 case 'v':
189 /* Verbose level can increase using multiple -v */
190 lttng_opt_verbose += 1;
191 break;
192 default:
193 /* Unknown option or other error.
194 * Error is printed by getopt, just return */
195 ret = -1;
196 goto exit;
197 }
198 }
199
200 /* assign default values */
201 if (control_uri == NULL) {
202 ret = asprintf(&default_address, "tcp://0.0.0.0:%d",
203 DEFAULT_NETWORK_CONTROL_PORT);
204 if (ret < 0) {
205 PERROR("asprintf default data address");
206 goto exit;
207 }
208
209 ret = uri_parse(default_address, &control_uri);
210 free(default_address);
211 if (ret < 0) {
212 ERR("Invalid control URI specified");
213 goto exit;
214 }
215 }
216 if (data_uri == NULL) {
217 ret = asprintf(&default_address, "tcp://0.0.0.0:%d",
218 DEFAULT_NETWORK_DATA_PORT);
219 if (ret < 0) {
220 PERROR("asprintf default data address");
221 goto exit;
222 }
223
224 ret = uri_parse(default_address, &data_uri);
225 free(default_address);
226 if (ret < 0) {
227 ERR("Invalid data URI specified");
228 goto exit;
229 }
230 }
231
232 exit:
233 return ret;
234 }
235
236 /*
237 * Cleanup the daemon
238 */
239 static
240 void cleanup(void)
241 {
242 DBG("Cleaning up");
243
244 /* free the dynamically allocated opt_output_path */
245 free(opt_output_path);
246
247 /* Close thread quit pipes */
248 utils_close_pipe(thread_quit_pipe);
249
250 uri_free(control_uri);
251 uri_free(data_uri);
252 }
253
/*
 * Wake up whoever waits on the read side of a pipe by writing one byte to
 * its write side (wpipe). The write is retried when interrupted by a signal.
 *
 * Return the result of write(): 1 on success, a negative value on error.
 */
static
int notify_thread_pipe(int wpipe)
{
	int written;

	for (;;) {
		written = write(wpipe, "!", 1);
		if (written >= 0 || errno != EINTR) {
			break;
		}
	}

	/* Anything but exactly one byte written is reported. */
	if (written != 1) {
		PERROR("write poll pipe");
	}

	return written;
}
271
272 /*
273 * Stop all threads by closing the thread quit pipe.
274 */
275 static
276 void stop_threads(void)
277 {
278 int ret;
279
280 /* Stopping all threads */
281 DBG("Terminating all threads");
282 ret = notify_thread_pipe(thread_quit_pipe[1]);
283 if (ret < 0) {
284 ERR("write error on thread quit pipe");
285 }
286
287 /* Dispatch thread */
288 CMM_STORE_SHARED(dispatch_thread_exit, 1);
289 futex_nto1_wake(&relay_cmd_queue.futex);
290 }
291
/*
 * Signal handler of the daemon.
 *
 * SIGINT and SIGTERM request the termination of all threads; SIGPIPE is only
 * logged. Any other signal is ignored.
 */
static
void sighandler(int sig)
{
	if (sig == SIGPIPE) {
		DBG("SIGPIPE caught");
	} else if (sig == SIGINT) {
		DBG("SIGINT caught");
		stop_threads();
	} else if (sig == SIGTERM) {
		DBG("SIGTERM caught");
		stop_threads();
	}
}
317
318 /*
319 * Setup signal handler for :
320 * SIGINT, SIGTERM, SIGPIPE
321 */
322 static
323 int set_signal_handler(void)
324 {
325 int ret = 0;
326 struct sigaction sa;
327 sigset_t sigset;
328
329 if ((ret = sigemptyset(&sigset)) < 0) {
330 PERROR("sigemptyset");
331 return ret;
332 }
333
334 sa.sa_handler = sighandler;
335 sa.sa_mask = sigset;
336 sa.sa_flags = 0;
337 if ((ret = sigaction(SIGTERM, &sa, NULL)) < 0) {
338 PERROR("sigaction");
339 return ret;
340 }
341
342 if ((ret = sigaction(SIGINT, &sa, NULL)) < 0) {
343 PERROR("sigaction");
344 return ret;
345 }
346
347 if ((ret = sigaction(SIGPIPE, &sa, NULL)) < 0) {
348 PERROR("sigaction");
349 return ret;
350 }
351
352 DBG("Signal handler set for SIGTERM, SIGPIPE and SIGINT");
353
354 return ret;
355 }
356
357 /*
358 * Init thread quit pipe.
359 *
360 * Return -1 on error or 0 if all pipes are created.
361 */
362 static
363 int init_thread_quit_pipe(void)
364 {
365 int ret;
366
367 ret = utils_create_pipe_cloexec(thread_quit_pipe);
368
369 return ret;
370 }
371
372 /*
373 * Create a poll set with O_CLOEXEC and add the thread quit pipe to the set.
374 */
375 static
376 int create_thread_poll_set(struct lttng_poll_event *events, int size)
377 {
378 int ret;
379
380 if (events == NULL || size == 0) {
381 ret = -1;
382 goto error;
383 }
384
385 ret = lttng_poll_create(events, size, LTTNG_CLOEXEC);
386 if (ret < 0) {
387 goto error;
388 }
389
390 /* Add quit pipe */
391 ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN);
392 if (ret < 0) {
393 goto error;
394 }
395
396 return 0;
397
398 error:
399 return ret;
400 }
401
402 /*
403 * Check if the thread quit pipe was triggered.
404 *
405 * Return 1 if it was triggered else 0;
406 */
407 static
408 int check_thread_quit_pipe(int fd, uint32_t events)
409 {
410 if (fd == thread_quit_pipe[0] && (events & LPOLLIN)) {
411 return 1;
412 }
413
414 return 0;
415 }
416
417 /*
418 * Create and init socket from uri.
419 */
420 static
421 struct lttcomm_sock *relay_init_sock(struct lttng_uri *uri)
422 {
423 int ret;
424 struct lttcomm_sock *sock = NULL;
425
426 sock = lttcomm_alloc_sock_from_uri(uri);
427 if (sock == NULL) {
428 ERR("Allocating socket");
429 goto error;
430 }
431
432 ret = lttcomm_create_sock(sock);
433 if (ret < 0) {
434 goto error;
435 }
436 DBG("Listening on sock %d", sock->fd);
437
438 ret = sock->ops->bind(sock);
439 if (ret < 0) {
440 goto error;
441 }
442
443 ret = sock->ops->listen(sock, -1);
444 if (ret < 0) {
445 goto error;
446
447 }
448
449 return sock;
450
451 error:
452 if (sock) {
453 lttcomm_destroy_sock(sock);
454 }
455 return NULL;
456 }
457
458 /*
459 * Return nonzero if stream needs to be closed.
460 */
461 static
462 int close_stream_check(struct relay_stream *stream)
463 {
464
465 if (stream->close_flag && stream->prev_seq == stream->last_net_seq_num) {
466 /*
467 * We are about to close the stream so set the data pending flag to 1
468 * which will make the end data pending command skip the stream which
469 * is now closed and ready. Note that after proceeding to a file close,
470 * the written file is ready for reading.
471 */
472 stream->data_pending_check_done = 1;
473 return 1;
474 }
475 return 0;
476 }
477
478 /*
479 * This thread manages the listening for new connections on the network
480 */
481 static
482 void *relay_thread_listener(void *data)
483 {
484 int i, ret, pollfd, err = -1;
485 int val = 1;
486 uint32_t revents, nb_fd;
487 struct lttng_poll_event events;
488 struct lttcomm_sock *control_sock, *data_sock;
489
490 DBG("[thread] Relay listener started");
491
492 control_sock = relay_init_sock(control_uri);
493 if (!control_sock) {
494 goto error_sock_control;
495 }
496
497 data_sock = relay_init_sock(data_uri);
498 if (!data_sock) {
499 goto error_sock_relay;
500 }
501
502 /*
503 * Pass 3 as size here for the thread quit pipe, control and data socket.
504 */
505 ret = create_thread_poll_set(&events, 3);
506 if (ret < 0) {
507 goto error_create_poll;
508 }
509
510 /* Add the control socket */
511 ret = lttng_poll_add(&events, control_sock->fd, LPOLLIN | LPOLLRDHUP);
512 if (ret < 0) {
513 goto error_poll_add;
514 }
515
516 /* Add the data socket */
517 ret = lttng_poll_add(&events, data_sock->fd, LPOLLIN | LPOLLRDHUP);
518 if (ret < 0) {
519 goto error_poll_add;
520 }
521
522 while (1) {
523 DBG("Listener accepting connections");
524
525 restart:
526 ret = lttng_poll_wait(&events, -1);
527 if (ret < 0) {
528 /*
529 * Restart interrupted system call.
530 */
531 if (errno == EINTR) {
532 goto restart;
533 }
534 goto error;
535 }
536
537 nb_fd = ret;
538
539 DBG("Relay new connection received");
540 for (i = 0; i < nb_fd; i++) {
541 /* Fetch once the poll data */
542 revents = LTTNG_POLL_GETEV(&events, i);
543 pollfd = LTTNG_POLL_GETFD(&events, i);
544
545 /* Thread quit pipe has been closed. Killing thread. */
546 ret = check_thread_quit_pipe(pollfd, revents);
547 if (ret) {
548 err = 0;
549 goto exit;
550 }
551
552 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
553 ERR("socket poll error");
554 goto error;
555 } else if (revents & LPOLLIN) {
556 /*
557 * Get allocated in this thread,
558 * enqueued to a global queue, dequeued
559 * and freed in the worker thread.
560 */
561 struct relay_command *relay_cmd;
562 struct lttcomm_sock *newsock;
563
564 relay_cmd = zmalloc(sizeof(struct relay_command));
565 if (relay_cmd == NULL) {
566 PERROR("relay command zmalloc");
567 goto error;
568 }
569
570 if (pollfd == data_sock->fd) {
571 newsock = data_sock->ops->accept(data_sock);
572 if (!newsock) {
573 PERROR("accepting data sock");
574 free(relay_cmd);
575 goto error;
576 }
577 relay_cmd->type = RELAY_DATA;
578 DBG("Relay data connection accepted, socket %d", newsock->fd);
579 } else {
580 assert(pollfd == control_sock->fd);
581 newsock = control_sock->ops->accept(control_sock);
582 if (!newsock) {
583 PERROR("accepting control sock");
584 free(relay_cmd);
585 goto error;
586 }
587 relay_cmd->type = RELAY_CONTROL;
588 DBG("Relay control connection accepted, socket %d", newsock->fd);
589 }
590 ret = setsockopt(newsock->fd, SOL_SOCKET, SO_REUSEADDR,
591 &val, sizeof(int));
592 if (ret < 0) {
593 PERROR("setsockopt inet");
594 lttcomm_destroy_sock(newsock);
595 free(relay_cmd);
596 goto error;
597 }
598 relay_cmd->sock = newsock;
599 /*
600 * Lock free enqueue the request.
601 */
602 cds_wfq_enqueue(&relay_cmd_queue.queue, &relay_cmd->node);
603
604 /*
605 * Wake the dispatch queue futex. Implicit memory
606 * barrier with the exchange in cds_wfq_enqueue.
607 */
608 futex_nto1_wake(&relay_cmd_queue.futex);
609 }
610 }
611 }
612
613 exit:
614 error:
615 error_poll_add:
616 lttng_poll_clean(&events);
617 error_create_poll:
618 if (data_sock->fd >= 0) {
619 ret = data_sock->ops->close(data_sock);
620 if (ret) {
621 PERROR("close");
622 }
623 }
624 lttcomm_destroy_sock(data_sock);
625 error_sock_relay:
626 if (control_sock->fd >= 0) {
627 ret = control_sock->ops->close(control_sock);
628 if (ret) {
629 PERROR("close");
630 }
631 }
632 lttcomm_destroy_sock(control_sock);
633 error_sock_control:
634 if (err) {
635 DBG("Thread exited with error");
636 }
637 DBG("Relay listener thread cleanup complete");
638 stop_threads();
639 return NULL;
640 }
641
642 /*
643 * This thread manages the dispatching of the requests to worker threads
644 */
645 static
646 void *relay_thread_dispatcher(void *data)
647 {
648 int ret;
649 struct cds_wfq_node *node;
650 struct relay_command *relay_cmd = NULL;
651
652 DBG("[thread] Relay dispatcher started");
653
654 while (!CMM_LOAD_SHARED(dispatch_thread_exit)) {
655 /* Atomically prepare the queue futex */
656 futex_nto1_prepare(&relay_cmd_queue.futex);
657
658 do {
659 /* Dequeue commands */
660 node = cds_wfq_dequeue_blocking(&relay_cmd_queue.queue);
661 if (node == NULL) {
662 DBG("Woken up but nothing in the relay command queue");
663 /* Continue thread execution */
664 break;
665 }
666
667 relay_cmd = caa_container_of(node, struct relay_command, node);
668 DBG("Dispatching request waiting on sock %d", relay_cmd->sock->fd);
669
670 /*
671 * Inform worker thread of the new request. This
672 * call is blocking so we can be assured that the data will be read
673 * at some point in time or wait to the end of the world :)
674 */
675 do {
676 ret = write(relay_cmd_pipe[1], relay_cmd,
677 sizeof(struct relay_command));
678 } while (ret < 0 && errno == EINTR);
679 free(relay_cmd);
680 if (ret < 0 || ret != sizeof(struct relay_command)) {
681 PERROR("write cmd pipe");
682 goto error;
683 }
684 } while (node != NULL);
685
686 /* Futex wait on queue. Blocking call on futex() */
687 futex_nto1_wait(&relay_cmd_queue.futex);
688 }
689
690 error:
691 DBG("Dispatch thread dying");
692 stop_threads();
693 return NULL;
694 }
695
696 /*
697 * Get stream from stream id.
698 * Need to be called with RCU read-side lock held.
699 */
700 static
701 struct relay_stream *relay_stream_from_stream_id(uint64_t stream_id,
702 struct lttng_ht *streams_ht)
703 {
704 struct lttng_ht_node_ulong *node;
705 struct lttng_ht_iter iter;
706 struct relay_stream *ret;
707
708 lttng_ht_lookup(streams_ht,
709 (void *)((unsigned long) stream_id),
710 &iter);
711 node = lttng_ht_iter_get_node_ulong(&iter);
712 if (node == NULL) {
713 DBG("Relay stream %" PRIu64 " not found", stream_id);
714 ret = NULL;
715 goto end;
716 }
717
718 ret = caa_container_of(node, struct relay_stream, stream_n);
719
720 end:
721 return ret;
722 }
723
724 static
725 void deferred_free_stream(struct rcu_head *head)
726 {
727 struct relay_stream *stream =
728 caa_container_of(head, struct relay_stream, rcu_node);
729 free(stream->path_name);
730 free(stream->channel_name);
731 free(stream);
732 }
733
734 /*
735 * relay_delete_session: Free all memory associated with a session and
736 * close all the FDs
737 */
738 static
739 void relay_delete_session(struct relay_command *cmd, struct lttng_ht *streams_ht)
740 {
741 struct lttng_ht_iter iter;
742 struct lttng_ht_node_ulong *node;
743 struct relay_stream *stream;
744 int ret;
745
746 if (!cmd->session) {
747 return;
748 }
749
750 DBG("Relay deleting session %" PRIu64, cmd->session->id);
751
752 rcu_read_lock();
753 cds_lfht_for_each_entry(streams_ht->ht, &iter.iter, node, node) {
754 node = lttng_ht_iter_get_node_ulong(&iter);
755 if (node) {
756 stream = caa_container_of(node,
757 struct relay_stream, stream_n);
758 if (stream->session == cmd->session) {
759 ret = close(stream->fd);
760 if (ret < 0) {
761 PERROR("close stream fd on delete session");
762 }
763 ret = lttng_ht_del(streams_ht, &iter);
764 assert(!ret);
765 call_rcu(&stream->rcu_node,
766 deferred_free_stream);
767 }
768 /* Cleanup index of that stream. */
769 relay_index_destroy_by_stream_id(stream->stream_handle,
770 indexes_ht);
771 }
772 }
773 rcu_read_unlock();
774
775 free(cmd->session);
776 }
777
778 /*
779 * Copy index data from the control port to a given index object.
780 */
781 static void copy_index_control_data(struct relay_index *index,
782 struct lttcomm_relayd_index *data)
783 {
784 assert(index);
785 assert(data);
786
787 /*
788 * The index on disk is encoded in big endian, so we don't need to convert
789 * the data received on the network. The data_offset value is NEVER
790 * modified here and is updated by the data thread.
791 */
792 index->index_data.packet_size = data->packet_size;
793 index->index_data.content_size = data->content_size;
794 index->index_data.timestamp_begin = data->timestamp_begin;
795 index->index_data.timestamp_end = data->timestamp_end;
796 index->index_data.events_discarded = data->events_discarded;
797 index->index_data.stream_id = data->stream_id;
798 }
799
800 /*
801 * Handle the RELAYD_CREATE_SESSION command.
802 *
803 * On success, send back the session id or else return a negative value.
804 */
805 static
806 int relay_create_session(struct lttcomm_relayd_hdr *recv_hdr,
807 struct relay_command *cmd)
808 {
809 int ret = 0, send_ret;
810 struct relay_session *session;
811 struct lttcomm_relayd_status_session reply;
812
813 assert(recv_hdr);
814 assert(cmd);
815
816 memset(&reply, 0, sizeof(reply));
817
818 session = zmalloc(sizeof(struct relay_session));
819 if (session == NULL) {
820 PERROR("relay session zmalloc");
821 ret = -1;
822 goto error;
823 }
824
825 session->id = ++last_relay_session_id;
826 session->sock = cmd->sock;
827 cmd->session = session;
828
829 reply.session_id = htobe64(session->id);
830
831 DBG("Created session %" PRIu64, session->id);
832
833 error:
834 if (ret < 0) {
835 reply.ret_code = htobe32(LTTNG_ERR_FATAL);
836 } else {
837 reply.ret_code = htobe32(LTTNG_OK);
838 }
839
840 send_ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
841 if (send_ret < 0) {
842 ERR("Relayd sending session id");
843 ret = send_ret;
844 }
845
846 return ret;
847 }
848
849 /*
850 * relay_add_stream: allocate a new stream for a session
851 */
852 static
853 int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
854 struct relay_command *cmd, struct lttng_ht *streams_ht)
855 {
856 struct relay_session *session = cmd->session;
857 struct relay_stream *stream = NULL;
858 struct lttcomm_relayd_status_stream reply;
859 int ret, send_ret;
860
861 if (!session || cmd->version_check_done == 0) {
862 ERR("Trying to add a stream before version check");
863 ret = -1;
864 goto end_no_session;
865 }
866
867 stream = zmalloc(sizeof(struct relay_stream));
868 if (stream == NULL) {
869 PERROR("relay stream zmalloc");
870 ret = -1;
871 goto end_no_session;
872 }
873
874 switch (cmd->minor) {
875 case 1: /* LTTng sessiond 2.1 */
876 ret = cmd_recv_stream_2_1(cmd, stream);
877 break;
878 case 2: /* LTTng sessiond 2.2 */
879 default:
880 ret = cmd_recv_stream_2_2(cmd, stream);
881 break;
882 }
883 if (ret < 0) {
884 goto err_free_stream;
885 }
886
887 rcu_read_lock();
888 stream->stream_handle = ++last_relay_stream_id;
889 stream->prev_seq = -1ULL;
890 stream->session = session;
891 stream->index_fd = -1;
892
893 ret = utils_mkdir_recursive(stream->path_name, S_IRWXU | S_IRWXG);
894 if (ret < 0) {
895 ERR("relay creating output directory");
896 goto end;
897 }
898
899 /*
900 * No need to use run_as API here because whatever we receives, the relayd
901 * uses its own credentials for the stream files.
902 */
903 ret = utils_create_stream_file(stream->path_name, stream->channel_name,
904 stream->tracefile_size, 0, relayd_uid, relayd_gid, NULL);
905 if (ret < 0) {
906 ERR("Create output file");
907 goto end;
908 }
909 stream->fd = ret;
910 if (stream->tracefile_size) {
911 DBG("Tracefile %s/%s_0 created", stream->path_name, stream->channel_name);
912 } else {
913 DBG("Tracefile %s/%s created", stream->path_name, stream->channel_name);
914 }
915
916 lttng_ht_node_init_ulong(&stream->stream_n,
917 (unsigned long) stream->stream_handle);
918 lttng_ht_add_unique_ulong(streams_ht,
919 &stream->stream_n);
920
921 DBG("Relay new stream added %s with ID %" PRIu64, stream->channel_name,
922 stream->stream_handle);
923
924 end:
925 reply.handle = htobe64(stream->stream_handle);
926 /* send the session id to the client or a negative return code on error */
927 if (ret < 0) {
928 reply.ret_code = htobe32(LTTNG_ERR_UNK);
929 /* stream was not properly added to the ht, so free it */
930 free(stream);
931 } else {
932 reply.ret_code = htobe32(LTTNG_OK);
933 }
934
935 send_ret = cmd->sock->ops->sendmsg(cmd->sock, &reply,
936 sizeof(struct lttcomm_relayd_status_stream), 0);
937 if (send_ret < 0) {
938 ERR("Relay sending stream id");
939 ret = send_ret;
940 }
941 rcu_read_unlock();
942
943 end_no_session:
944 return ret;
945
946 err_free_stream:
947 free(stream->path_name);
948 free(stream->channel_name);
949 free(stream);
950 return ret;
951 }
952
953 /*
954 * relay_close_stream: close a specific stream
955 */
956 static
957 int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr,
958 struct relay_command *cmd, struct lttng_ht *streams_ht)
959 {
960 struct relay_session *session = cmd->session;
961 struct lttcomm_relayd_close_stream stream_info;
962 struct lttcomm_relayd_generic_reply reply;
963 struct relay_stream *stream;
964 int ret, send_ret;
965 struct lttng_ht_iter iter;
966
967 DBG("Close stream received");
968
969 if (!session || cmd->version_check_done == 0) {
970 ERR("Trying to close a stream before version check");
971 ret = -1;
972 goto end_no_session;
973 }
974
975 ret = cmd->sock->ops->recvmsg(cmd->sock, &stream_info,
976 sizeof(struct lttcomm_relayd_close_stream), 0);
977 if (ret < sizeof(struct lttcomm_relayd_close_stream)) {
978 if (ret == 0) {
979 /* Orderly shutdown. Not necessary to print an error. */
980 DBG("Socket %d did an orderly shutdown", cmd->sock->fd);
981 } else {
982 ERR("Relay didn't receive valid add_stream struct size : %d", ret);
983 }
984 ret = -1;
985 goto end_no_session;
986 }
987
988 rcu_read_lock();
989 stream = relay_stream_from_stream_id(be64toh(stream_info.stream_id),
990 streams_ht);
991 if (!stream) {
992 ret = -1;
993 goto end_unlock;
994 }
995
996 stream->last_net_seq_num = be64toh(stream_info.last_net_seq_num);
997 stream->close_flag = 1;
998
999 if (close_stream_check(stream)) {
1000 int delret;
1001
1002 delret = close(stream->fd);
1003 if (delret < 0) {
1004 PERROR("close stream");
1005 }
1006
1007 if (stream->index_fd >= 0) {
1008 delret = close(stream->index_fd);
1009 if (delret < 0) {
1010 PERROR("close stream index_fd");
1011 }
1012 }
1013 iter.iter.node = &stream->stream_n.node;
1014 delret = lttng_ht_del(streams_ht, &iter);
1015 assert(!delret);
1016 call_rcu(&stream->rcu_node,
1017 deferred_free_stream);
1018 DBG("Closed tracefile %d from close stream", stream->fd);
1019 }
1020
1021 end_unlock:
1022 rcu_read_unlock();
1023
1024 if (ret < 0) {
1025 reply.ret_code = htobe32(LTTNG_ERR_UNK);
1026 } else {
1027 reply.ret_code = htobe32(LTTNG_OK);
1028 }
1029 send_ret = cmd->sock->ops->sendmsg(cmd->sock, &reply,
1030 sizeof(struct lttcomm_relayd_generic_reply), 0);
1031 if (send_ret < 0) {
1032 ERR("Relay sending stream id");
1033 ret = send_ret;
1034 }
1035
1036 end_no_session:
1037 return ret;
1038 }
1039
1040 /*
1041 * relay_unknown_command: send -1 if received unknown command
1042 */
1043 static
1044 void relay_unknown_command(struct relay_command *cmd)
1045 {
1046 struct lttcomm_relayd_generic_reply reply;
1047 int ret;
1048
1049 reply.ret_code = htobe32(LTTNG_ERR_UNK);
1050 ret = cmd->sock->ops->sendmsg(cmd->sock, &reply,
1051 sizeof(struct lttcomm_relayd_generic_reply), 0);
1052 if (ret < 0) {
1053 ERR("Relay sending unknown command");
1054 }
1055 }
1056
1057 /*
1058 * relay_start: send an acknowledgment to the client to tell if we are
1059 * ready to receive data. We are ready if a session is established.
1060 */
1061 static
1062 int relay_start(struct lttcomm_relayd_hdr *recv_hdr,
1063 struct relay_command *cmd)
1064 {
1065 int ret = htobe32(LTTNG_OK);
1066 struct lttcomm_relayd_generic_reply reply;
1067 struct relay_session *session = cmd->session;
1068
1069 if (!session) {
1070 DBG("Trying to start the streaming without a session established");
1071 ret = htobe32(LTTNG_ERR_UNK);
1072 }
1073
1074 reply.ret_code = ret;
1075 ret = cmd->sock->ops->sendmsg(cmd->sock, &reply,
1076 sizeof(struct lttcomm_relayd_generic_reply), 0);
1077 if (ret < 0) {
1078 ERR("Relay sending start ack");
1079 }
1080
1081 return ret;
1082 }
1083
/*
 * Append size zero bytes of padding to the file pointed by the file
 * descriptor fd.
 *
 * Writes interrupted by a signal are retried and a short write is continued
 * until all the padding is written.
 *
 * Return 0 when all the padding was written (or size is 0) and -1 on error.
 */
static int write_padding_to_file(int fd, uint32_t size)
{
	int ret = 0;
	char *zeros;
	uint32_t done = 0;

	if (size == 0) {
		goto end;
	}

	zeros = zmalloc(size);
	if (zeros == NULL) {
		PERROR("zmalloc zeros for padding");
		ret = -1;
		goto end;
	}

	while (done < size) {
		ssize_t len;

		do {
			len = write(fd, zeros + done, size - done);
		} while (len < 0 && errno == EINTR);
		if (len < 0) {
			PERROR("write padding to file");
			ret = -1;
			break;
		}
		if (len == 0) {
			/* errno is meaningless here and looping would never end. */
			ERR("write padding to file: no progress");
			ret = -1;
			break;
		}
		done += (uint32_t) len;
	}

	free(zeros);

end:
	return ret;
}
1115
1116 /*
1117 * relay_recv_metadata: receive the metada for the session.
1118 */
1119 static
1120 int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr,
1121 struct relay_command *cmd, struct lttng_ht *streams_ht)
1122 {
1123 int ret = htobe32(LTTNG_OK);
1124 struct relay_session *session = cmd->session;
1125 struct lttcomm_relayd_metadata_payload *metadata_struct;
1126 struct relay_stream *metadata_stream;
1127 uint64_t data_size, payload_size;
1128
1129 if (!session) {
1130 ERR("Metadata sent before version check");
1131 ret = -1;
1132 goto end;
1133 }
1134
1135 data_size = payload_size = be64toh(recv_hdr->data_size);
1136 if (data_size < sizeof(struct lttcomm_relayd_metadata_payload)) {
1137 ERR("Incorrect data size");
1138 ret = -1;
1139 goto end;
1140 }
1141 payload_size -= sizeof(struct lttcomm_relayd_metadata_payload);
1142
1143 if (data_buffer_size < data_size) {
1144 /* In case the realloc fails, we can free the memory */
1145 char *tmp_data_ptr;
1146
1147 tmp_data_ptr = realloc(data_buffer, data_size);
1148 if (!tmp_data_ptr) {
1149 ERR("Allocating data buffer");
1150 free(data_buffer);
1151 ret = -1;
1152 goto end;
1153 }
1154 data_buffer = tmp_data_ptr;
1155 data_buffer_size = data_size;
1156 }
1157 memset(data_buffer, 0, data_size);
1158 DBG2("Relay receiving metadata, waiting for %" PRIu64 " bytes", data_size);
1159 ret = cmd->sock->ops->recvmsg(cmd->sock, data_buffer, data_size, 0);
1160 if (ret < 0 || ret != data_size) {
1161 if (ret == 0) {
1162 /* Orderly shutdown. Not necessary to print an error. */
1163 DBG("Socket %d did an orderly shutdown", cmd->sock->fd);
1164 } else {
1165 ERR("Relay didn't receive the whole metadata");
1166 }
1167 ret = -1;
1168 goto end;
1169 }
1170 metadata_struct = (struct lttcomm_relayd_metadata_payload *) data_buffer;
1171
1172 rcu_read_lock();
1173 metadata_stream = relay_stream_from_stream_id(
1174 be64toh(metadata_struct->stream_id), streams_ht);
1175 if (!metadata_stream) {
1176 ret = -1;
1177 goto end_unlock;
1178 }
1179
1180 do {
1181 ret = write(metadata_stream->fd, metadata_struct->payload,
1182 payload_size);
1183 } while (ret < 0 && errno == EINTR);
1184 if (ret < 0 || ret != payload_size) {
1185 ERR("Relay error writing metadata on file");
1186 ret = -1;
1187 goto end_unlock;
1188 }
1189
1190 ret = write_padding_to_file(metadata_stream->fd,
1191 be32toh(metadata_struct->padding_size));
1192 if (ret < 0) {
1193 goto end_unlock;
1194 }
1195
1196 DBG2("Relay metadata written");
1197
1198 end_unlock:
1199 rcu_read_unlock();
1200 end:
1201 return ret;
1202 }
1203
1204 /*
1205 * relay_send_version: send relayd version number
1206 */
1207 static
1208 int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr,
1209 struct relay_command *cmd, struct lttng_ht *streams_ht)
1210 {
1211 int ret;
1212 struct lttcomm_relayd_version reply, msg;
1213
1214 assert(cmd);
1215
1216 cmd->version_check_done = 1;
1217
1218 /* Get version from the other side. */
1219 ret = cmd->sock->ops->recvmsg(cmd->sock, &msg, sizeof(msg), 0);
1220 if (ret < 0 || ret != sizeof(msg)) {
1221 if (ret == 0) {
1222 /* Orderly shutdown. Not necessary to print an error. */
1223 DBG("Socket %d did an orderly shutdown", cmd->sock->fd);
1224 } else {
1225 ERR("Relay failed to receive the version values.");
1226 }
1227 ret = -1;
1228 goto end;
1229 }
1230
1231 reply.major = RELAYD_VERSION_COMM_MAJOR;
1232 reply.minor = RELAYD_VERSION_COMM_MINOR;
1233
1234 /* Major versions must be the same */
1235 if (reply.major != be32toh(msg.major)) {
1236 DBG("Incompatible major versions (%u vs %u), deleting session",
1237 reply.major, be32toh(msg.major));
1238 relay_delete_session(cmd, streams_ht);
1239 ret = 0;
1240 goto end;
1241 }
1242
1243 cmd->major = reply.major;
1244 /* We adapt to the lowest compatible version */
1245 if (reply.minor <= be32toh(msg.minor)) {
1246 cmd->minor = reply.minor;
1247 } else {
1248 cmd->minor = be32toh(msg.minor);
1249 }
1250
1251 reply.major = htobe32(reply.major);
1252 reply.minor = htobe32(reply.minor);
1253 ret = cmd->sock->ops->sendmsg(cmd->sock, &reply,
1254 sizeof(struct lttcomm_relayd_version), 0);
1255 if (ret < 0) {
1256 ERR("Relay sending version");
1257 }
1258
1259 DBG("Version check done using protocol %u.%u", cmd->major,
1260 cmd->minor);
1261
1262 end:
1263 return ret;
1264 }
1265
1266 /*
1267 * Check for data pending for a given stream id from the session daemon.
1268 */
1269 static
1270 int relay_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
1271 struct relay_command *cmd, struct lttng_ht *streams_ht)
1272 {
1273 struct relay_session *session = cmd->session;
1274 struct lttcomm_relayd_data_pending msg;
1275 struct lttcomm_relayd_generic_reply reply;
1276 struct relay_stream *stream;
1277 int ret;
1278 uint64_t last_net_seq_num, stream_id;
1279
1280 DBG("Data pending command received");
1281
1282 if (!session || cmd->version_check_done == 0) {
1283 ERR("Trying to check for data before version check");
1284 ret = -1;
1285 goto end_no_session;
1286 }
1287
1288 ret = cmd->sock->ops->recvmsg(cmd->sock, &msg, sizeof(msg), 0);
1289 if (ret < sizeof(msg)) {
1290 if (ret == 0) {
1291 /* Orderly shutdown. Not necessary to print an error. */
1292 DBG("Socket %d did an orderly shutdown", cmd->sock->fd);
1293 } else {
1294 ERR("Relay didn't receive valid data_pending struct size : %d",
1295 ret);
1296 }
1297 ret = -1;
1298 goto end_no_session;
1299 }
1300
1301 stream_id = be64toh(msg.stream_id);
1302 last_net_seq_num = be64toh(msg.last_net_seq_num);
1303
1304 rcu_read_lock();
1305 stream = relay_stream_from_stream_id(stream_id, streams_ht);
1306 if (stream == NULL) {
1307 ret = -1;
1308 goto end_unlock;
1309 }
1310
1311 DBG("Data pending for stream id %" PRIu64 " prev_seq %" PRIu64
1312 " and last_seq %" PRIu64, stream_id, stream->prev_seq,
1313 last_net_seq_num);
1314
1315 /* Avoid wrapping issue */
1316 if (((int64_t) (stream->prev_seq - last_net_seq_num)) >= 0) {
1317 /* Data has in fact been written and is NOT pending */
1318 ret = 0;
1319 } else {
1320 /* Data still being streamed thus pending */
1321 ret = 1;
1322 }
1323
1324 /* Pending check is now done. */
1325 stream->data_pending_check_done = 1;
1326
1327 end_unlock:
1328 rcu_read_unlock();
1329
1330 reply.ret_code = htobe32(ret);
1331 ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
1332 if (ret < 0) {
1333 ERR("Relay data pending ret code failed");
1334 }
1335
1336 end_no_session:
1337 return ret;
1338 }
1339
1340 /*
1341 * Wait for the control socket to reach a quiescent state.
1342 *
1343 * Note that for now, when receiving this command from the session daemon, this
1344 * means that every subsequent commands or data received on the control socket
1345 * has been handled. So, this is why we simply return OK here.
1346 */
1347 static
1348 int relay_quiescent_control(struct lttcomm_relayd_hdr *recv_hdr,
1349 struct relay_command *cmd, struct lttng_ht *streams_ht)
1350 {
1351 int ret;
1352 uint64_t stream_id;
1353 struct relay_stream *stream;
1354 struct lttng_ht_iter iter;
1355 struct lttcomm_relayd_quiescent_control msg;
1356 struct lttcomm_relayd_generic_reply reply;
1357
1358 DBG("Checking quiescent state on control socket");
1359
1360 if (!cmd->session || cmd->version_check_done == 0) {
1361 ERR("Trying to check for data before version check");
1362 ret = -1;
1363 goto end_no_session;
1364 }
1365
1366 ret = cmd->sock->ops->recvmsg(cmd->sock, &msg, sizeof(msg), 0);
1367 if (ret < sizeof(msg)) {
1368 if (ret == 0) {
1369 /* Orderly shutdown. Not necessary to print an error. */
1370 DBG("Socket %d did an orderly shutdown", cmd->sock->fd);
1371 } else {
1372 ERR("Relay didn't receive valid begin data_pending struct size: %d",
1373 ret);
1374 }
1375 ret = -1;
1376 goto end_no_session;
1377 }
1378
1379 stream_id = be64toh(msg.stream_id);
1380
1381 rcu_read_lock();
1382 cds_lfht_for_each_entry(streams_ht->ht, &iter.iter, stream, stream_n.node) {
1383 if (stream->stream_handle == stream_id) {
1384 stream->data_pending_check_done = 1;
1385 DBG("Relay quiescent control pending flag set to %" PRIu64,
1386 stream_id);
1387 break;
1388 }
1389 }
1390 rcu_read_unlock();
1391
1392 reply.ret_code = htobe32(LTTNG_OK);
1393 ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
1394 if (ret < 0) {
1395 ERR("Relay data quiescent control ret code failed");
1396 }
1397
1398 end_no_session:
1399 return ret;
1400 }
1401
1402 /*
1403 * Initialize a data pending command. This means that a client is about to ask
1404 * for data pending for each stream he/she holds. Simply iterate over all
1405 * streams of a session and set the data_pending_check_done flag.
1406 *
1407 * This command returns to the client a LTTNG_OK code.
1408 */
1409 static
1410 int relay_begin_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
1411 struct relay_command *cmd, struct lttng_ht *streams_ht)
1412 {
1413 int ret;
1414 struct lttng_ht_iter iter;
1415 struct lttcomm_relayd_begin_data_pending msg;
1416 struct lttcomm_relayd_generic_reply reply;
1417 struct relay_stream *stream;
1418 uint64_t session_id;
1419
1420 assert(recv_hdr);
1421 assert(cmd);
1422 assert(streams_ht);
1423
1424 DBG("Init streams for data pending");
1425
1426 if (!cmd->session || cmd->version_check_done == 0) {
1427 ERR("Trying to check for data before version check");
1428 ret = -1;
1429 goto end_no_session;
1430 }
1431
1432 ret = cmd->sock->ops->recvmsg(cmd->sock, &msg, sizeof(msg), 0);
1433 if (ret < sizeof(msg)) {
1434 if (ret == 0) {
1435 /* Orderly shutdown. Not necessary to print an error. */
1436 DBG("Socket %d did an orderly shutdown", cmd->sock->fd);
1437 } else {
1438 ERR("Relay didn't receive valid begin data_pending struct size: %d",
1439 ret);
1440 }
1441 ret = -1;
1442 goto end_no_session;
1443 }
1444
1445 session_id = be64toh(msg.session_id);
1446
1447 /*
1448 * Iterate over all streams to set the begin data pending flag. For now, the
1449 * streams are indexed by stream handle so we have to iterate over all
1450 * streams to find the one associated with the right session_id.
1451 */
1452 rcu_read_lock();
1453 cds_lfht_for_each_entry(streams_ht->ht, &iter.iter, stream, stream_n.node) {
1454 if (stream->session->id == session_id) {
1455 stream->data_pending_check_done = 0;
1456 DBG("Set begin data pending flag to stream %" PRIu64,
1457 stream->stream_handle);
1458 }
1459 }
1460 rcu_read_unlock();
1461
1462 /* All good, send back reply. */
1463 reply.ret_code = htobe32(LTTNG_OK);
1464
1465 ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
1466 if (ret < 0) {
1467 ERR("Relay begin data pending send reply failed");
1468 }
1469
1470 end_no_session:
1471 return ret;
1472 }
1473
1474 /*
1475 * End data pending command. This will check, for a given session id, if each
1476 * stream associated with it has its data_pending_check_done flag set. If not,
1477 * this means that the client lost track of the stream but the data is still
1478 * being streamed on our side. In this case, we inform the client that data is
1479 * inflight.
1480 *
1481 * Return to the client if there is data in flight or not with a ret_code.
1482 */
1483 static
1484 int relay_end_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
1485 struct relay_command *cmd, struct lttng_ht *streams_ht)
1486 {
1487 int ret;
1488 struct lttng_ht_iter iter;
1489 struct lttcomm_relayd_end_data_pending msg;
1490 struct lttcomm_relayd_generic_reply reply;
1491 struct relay_stream *stream;
1492 uint64_t session_id;
1493 uint32_t is_data_inflight = 0;
1494
1495 assert(recv_hdr);
1496 assert(cmd);
1497 assert(streams_ht);
1498
1499 DBG("End data pending command");
1500
1501 if (!cmd->session || cmd->version_check_done == 0) {
1502 ERR("Trying to check for data before version check");
1503 ret = -1;
1504 goto end_no_session;
1505 }
1506
1507 ret = cmd->sock->ops->recvmsg(cmd->sock, &msg, sizeof(msg), 0);
1508 if (ret < sizeof(msg)) {
1509 if (ret == 0) {
1510 /* Orderly shutdown. Not necessary to print an error. */
1511 DBG("Socket %d did an orderly shutdown", cmd->sock->fd);
1512 } else {
1513 ERR("Relay didn't receive valid end data_pending struct size: %d",
1514 ret);
1515 }
1516 ret = -1;
1517 goto end_no_session;
1518 }
1519
1520 session_id = be64toh(msg.session_id);
1521
1522 /* Iterate over all streams to see if the begin data pending flag is set. */
1523 rcu_read_lock();
1524 cds_lfht_for_each_entry(streams_ht->ht, &iter.iter, stream, stream_n.node) {
1525 if (stream->session->id == session_id &&
1526 !stream->data_pending_check_done) {
1527 is_data_inflight = 1;
1528 DBG("Data is still in flight for stream %" PRIu64,
1529 stream->stream_handle);
1530 break;
1531 }
1532 }
1533 rcu_read_unlock();
1534
1535 /* All good, send back reply. */
1536 reply.ret_code = htobe32(is_data_inflight);
1537
1538 ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
1539 if (ret < 0) {
1540 ERR("Relay end data pending send reply failed");
1541 }
1542
1543 end_no_session:
1544 return ret;
1545 }
1546
1547 /*
1548 * Receive an index for a specific stream.
1549 *
1550 * Return 0 on success else a negative value.
1551 */
1552 static
1553 int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
1554 struct relay_command *cmd, struct lttng_ht *streams_ht,
1555 struct lttng_ht *indexes_ht)
1556 {
1557 int ret, send_ret, index_created = 0;
1558 struct relay_session *session = cmd->session;
1559 struct lttcomm_relayd_index index_info;
1560 struct relay_index *index, *wr_index = NULL;
1561 struct lttcomm_relayd_generic_reply reply;
1562 struct relay_stream *stream;
1563 uint64_t net_seq_num;
1564
1565 assert(cmd);
1566 assert(streams_ht);
1567 assert(indexes_ht);
1568
1569 DBG("Relay receiving index");
1570
1571 if (!session || cmd->version_check_done == 0) {
1572 ERR("Trying to close a stream before version check");
1573 ret = -1;
1574 goto end_no_session;
1575 }
1576
1577 ret = cmd->sock->ops->recvmsg(cmd->sock, &index_info,
1578 sizeof(index_info), 0);
1579 if (ret < sizeof(index_info)) {
1580 if (ret == 0) {
1581 /* Orderly shutdown. Not necessary to print an error. */
1582 DBG("Socket %d did an orderly shutdown", cmd->sock->fd);
1583 } else {
1584 ERR("Relay didn't receive valid index struct size : %d", ret);
1585 }
1586 ret = -1;
1587 goto end_no_session;
1588 }
1589
1590 net_seq_num = be64toh(index_info.net_seq_num);
1591
1592 rcu_read_lock();
1593 stream = relay_stream_from_stream_id(be64toh(index_info.relay_stream_id),
1594 streams_ht);
1595 if (!stream) {
1596 ret = -1;
1597 goto end_rcu_unlock;
1598 }
1599
1600 index = relay_index_find(stream->stream_handle, net_seq_num, indexes_ht);
1601 if (!index) {
1602 /* A successful creation will add the object to the HT. */
1603 index = relay_index_create(stream->stream_handle, net_seq_num);
1604 if (!index) {
1605 goto end_rcu_unlock;
1606 }
1607 index_created = 1;
1608 }
1609
1610 copy_index_control_data(index, &index_info);
1611
1612 if (index_created) {
1613 /*
1614 * Try to add the relay index object to the hash table. If an object
1615 * already exist, destroy back the index created, set the data in this
1616 * object and write it on disk.
1617 */
1618 relay_index_add(index, indexes_ht, &wr_index);
1619 if (wr_index) {
1620 copy_index_control_data(wr_index, &index_info);
1621 free(index);
1622 }
1623 } else {
1624 /* The index already exists so write it on disk. */
1625 wr_index = index;
1626 }
1627
1628 /* Do we have a writable ready index to write on disk. */
1629 if (wr_index) {
1630 /* Starting at 2.4, create the index file if none available. */
1631 if (cmd->minor >= 4 && stream->index_fd < 0) {
1632 ret = index_create_file(stream->path_name, stream->channel_name,
1633 relayd_uid, relayd_gid, stream->tracefile_size,
1634 stream->tracefile_count_current);
1635 if (ret < 0) {
1636 goto end_rcu_unlock;
1637 }
1638 stream->index_fd = ret;
1639 }
1640
1641 ret = relay_index_write(wr_index->fd, wr_index, indexes_ht);
1642 if (ret < 0) {
1643 goto end_rcu_unlock;
1644 }
1645 }
1646
1647 end_rcu_unlock:
1648 rcu_read_unlock();
1649
1650 if (ret < 0) {
1651 reply.ret_code = htobe32(LTTNG_ERR_UNK);
1652 } else {
1653 reply.ret_code = htobe32(LTTNG_OK);
1654 }
1655 send_ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0);
1656 if (send_ret < 0) {
1657 ERR("Relay sending close index id reply");
1658 ret = send_ret;
1659 }
1660
1661 end_no_session:
1662 return ret;
1663 }
1664
1665 /*
1666 * relay_process_control: Process the commands received on the control socket
1667 */
1668 static
1669 int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr,
1670 struct relay_command *cmd, struct lttng_ht *streams_ht,
1671 struct lttng_ht *index_streams_ht,
1672 struct lttng_ht *indexes_ht)
1673 {
1674 int ret = 0;
1675
1676 switch (be32toh(recv_hdr->cmd)) {
1677 case RELAYD_CREATE_SESSION:
1678 ret = relay_create_session(recv_hdr, cmd);
1679 break;
1680 case RELAYD_ADD_STREAM:
1681 ret = relay_add_stream(recv_hdr, cmd, streams_ht);
1682 break;
1683 case RELAYD_START_DATA:
1684 ret = relay_start(recv_hdr, cmd);
1685 break;
1686 case RELAYD_SEND_METADATA:
1687 ret = relay_recv_metadata(recv_hdr, cmd, streams_ht);
1688 break;
1689 case RELAYD_VERSION:
1690 ret = relay_send_version(recv_hdr, cmd, streams_ht);
1691 break;
1692 case RELAYD_CLOSE_STREAM:
1693 ret = relay_close_stream(recv_hdr, cmd, streams_ht);
1694 break;
1695 case RELAYD_DATA_PENDING:
1696 ret = relay_data_pending(recv_hdr, cmd, streams_ht);
1697 break;
1698 case RELAYD_QUIESCENT_CONTROL:
1699 ret = relay_quiescent_control(recv_hdr, cmd, streams_ht);
1700 break;
1701 case RELAYD_BEGIN_DATA_PENDING:
1702 ret = relay_begin_data_pending(recv_hdr, cmd, streams_ht);
1703 break;
1704 case RELAYD_END_DATA_PENDING:
1705 ret = relay_end_data_pending(recv_hdr, cmd, streams_ht);
1706 break;
1707 case RELAYD_SEND_INDEX:
1708 ret = relay_recv_index(recv_hdr, cmd, streams_ht, indexes_ht);
1709 break;
1710 case RELAYD_UPDATE_SYNC_INFO:
1711 default:
1712 ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd));
1713 relay_unknown_command(cmd);
1714 ret = -1;
1715 goto end;
1716 }
1717
1718 end:
1719 return ret;
1720 }
1721
1722 /*
1723 * relay_process_data: Process the data received on the data socket
1724 */
1725 static
1726 int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht,
1727 struct lttng_ht *indexes_ht)
1728 {
1729 int ret = 0, rotate_index = 0, index_created = 0;
1730 struct relay_stream *stream;
1731 struct relay_index *index, *wr_index = NULL;
1732 struct lttcomm_relayd_data_hdr data_hdr;
1733 uint64_t stream_id, data_offset;
1734 uint64_t net_seq_num;
1735 uint32_t data_size;
1736
1737 ret = cmd->sock->ops->recvmsg(cmd->sock, &data_hdr,
1738 sizeof(struct lttcomm_relayd_data_hdr), 0);
1739 if (ret <= 0) {
1740 if (ret == 0) {
1741 /* Orderly shutdown. Not necessary to print an error. */
1742 DBG("Socket %d did an orderly shutdown", cmd->sock->fd);
1743 } else {
1744 ERR("Unable to receive data header on sock %d", cmd->sock->fd);
1745 }
1746 ret = -1;
1747 goto end;
1748 }
1749
1750 stream_id = be64toh(data_hdr.stream_id);
1751
1752 rcu_read_lock();
1753 stream = relay_stream_from_stream_id(stream_id, streams_ht);
1754 if (!stream) {
1755 ret = -1;
1756 goto end_rcu_unlock;
1757 }
1758
1759 data_size = be32toh(data_hdr.data_size);
1760 if (data_buffer_size < data_size) {
1761 char *tmp_data_ptr;
1762
1763 tmp_data_ptr = realloc(data_buffer, data_size);
1764 if (!tmp_data_ptr) {
1765 ERR("Allocating data buffer");
1766 free(data_buffer);
1767 ret = -1;
1768 goto end_rcu_unlock;
1769 }
1770 data_buffer = tmp_data_ptr;
1771 data_buffer_size = data_size;
1772 }
1773 memset(data_buffer, 0, data_size);
1774
1775 net_seq_num = be64toh(data_hdr.net_seq_num);
1776
1777 DBG3("Receiving data of size %u for stream id %" PRIu64 " seqnum %" PRIu64,
1778 data_size, stream_id, net_seq_num);
1779 ret = cmd->sock->ops->recvmsg(cmd->sock, data_buffer, data_size, 0);
1780 if (ret <= 0) {
1781 if (ret == 0) {
1782 /* Orderly shutdown. Not necessary to print an error. */
1783 DBG("Socket %d did an orderly shutdown", cmd->sock->fd);
1784 }
1785 ret = -1;
1786 goto end_rcu_unlock;
1787 }
1788
1789 /* Check if a rotation is needed. */
1790 if (stream->tracefile_size > 0 &&
1791 (stream->tracefile_size_current + data_size) >
1792 stream->tracefile_size) {
1793 ret = utils_rotate_stream_file(stream->path_name, stream->channel_name,
1794 stream->tracefile_size, stream->tracefile_count,
1795 relayd_uid, relayd_gid, stream->fd,
1796 &(stream->tracefile_count_current), &stream->fd);
1797 if (ret < 0) {
1798 ERR("Rotating stream output file");
1799 goto end_rcu_unlock;
1800 }
1801 /* Reset current size because we just perform a stream rotation. */
1802 stream->tracefile_size_current = 0;
1803 rotate_index = 1;
1804 }
1805
1806 /* Get data offset because we are about to update the index. */
1807 data_offset = htobe64(stream->tracefile_size_current);
1808
1809 /*
1810 * Lookup for an existing index for that stream id/sequence number. If on
1811 * exists, the control thread already received the data for it thus we need
1812 * to write it on disk.
1813 */
1814 index = relay_index_find(stream_id, net_seq_num, indexes_ht);
1815 if (!index) {
1816 /* A successful creation will add the object to the HT. */
1817 index = relay_index_create(stream->stream_handle, net_seq_num);
1818 if (!index) {
1819 goto end_rcu_unlock;
1820 }
1821 index_created = 1;
1822 }
1823
1824 if (rotate_index || stream->index_fd < 0) {
1825 index->to_close_fd = stream->index_fd;
1826 ret = index_create_file(stream->path_name, stream->channel_name,
1827 relayd_uid, relayd_gid, stream->tracefile_size,
1828 stream->tracefile_count_current);
1829 if (ret < 0) {
1830 /* This will close the stream's index fd if one. */
1831 relay_index_free_safe(index);
1832 goto end_rcu_unlock;
1833 }
1834 stream->index_fd = ret;
1835 }
1836 index->fd = stream->index_fd;
1837 index->index_data.offset = data_offset;
1838
1839 if (index_created) {
1840 /*
1841 * Try to add the relay index object to the hash table. If an object
1842 * already exist, destroy back the index created and set the data.
1843 */
1844 relay_index_add(index, indexes_ht, &wr_index);
1845 if (wr_index) {
1846 /* Copy back data from the created index. */
1847 wr_index->fd = index->fd;
1848 wr_index->to_close_fd = index->to_close_fd;
1849 wr_index->index_data.offset = data_offset;
1850 free(index);
1851 }
1852 } else {
1853 /* The index already exists so write it on disk. */
1854 wr_index = index;
1855 }
1856
1857 /* Do we have a writable ready index to write on disk. */
1858 if (wr_index) {
1859 /* Starting at 2.4, create the index file if none available. */
1860 if (cmd->minor >= 4 && stream->index_fd < 0) {
1861 ret = index_create_file(stream->path_name, stream->channel_name,
1862 relayd_uid, relayd_gid, stream->tracefile_size,
1863 stream->tracefile_count_current);
1864 if (ret < 0) {
1865 goto end_rcu_unlock;
1866 }
1867 stream->index_fd = ret;
1868 }
1869
1870 ret = relay_index_write(wr_index->fd, wr_index, indexes_ht);
1871 if (ret < 0) {
1872 goto end_rcu_unlock;
1873 }
1874 }
1875
1876 do {
1877 ret = write(stream->fd, data_buffer, data_size);
1878 } while (ret < 0 && errno == EINTR);
1879 if (ret < 0 || ret != data_size) {
1880 ERR("Relay error writing data to file");
1881 ret = -1;
1882 goto end_rcu_unlock;
1883 }
1884
1885 DBG2("Relay wrote %d bytes to tracefile for stream id %" PRIu64,
1886 ret, stream->stream_handle);
1887
1888 ret = write_padding_to_file(stream->fd, be32toh(data_hdr.padding_size));
1889 if (ret < 0) {
1890 goto end_rcu_unlock;
1891 }
1892 stream->tracefile_size_current += data_size + be32toh(data_hdr.padding_size);
1893
1894 stream->prev_seq = net_seq_num;
1895
1896 /* Check if we need to close the FD */
1897 if (close_stream_check(stream)) {
1898 int cret;
1899 struct lttng_ht_iter iter;
1900
1901 cret = close(stream->fd);
1902 if (cret < 0) {
1903 PERROR("close stream process data");
1904 }
1905
1906 cret = close(stream->index_fd);
1907 if (cret < 0) {
1908 PERROR("close stream index_fd");
1909 }
1910 iter.iter.node = &stream->stream_n.node;
1911 ret = lttng_ht_del(streams_ht, &iter);
1912 assert(!ret);
1913 call_rcu(&stream->rcu_node,
1914 deferred_free_stream);
1915 DBG("Closed tracefile %d after recv data", stream->fd);
1916 }
1917
1918 end_rcu_unlock:
1919 rcu_read_unlock();
1920 end:
1921 return ret;
1922 }
1923
/*
 * Remove a file descriptor from the poll set and close it. A failing close()
 * is only logged.
 */
static
void relay_cleanup_poll_connection(struct lttng_poll_event *events, int pollfd)
{
	lttng_poll_del(events, pollfd);

	if (close(pollfd) < 0) {
		ERR("Closing pollfd %d", pollfd);
	}
}
1936
1937 static
1938 int relay_add_connection(int fd, struct lttng_poll_event *events,
1939 struct lttng_ht *relay_connections_ht)
1940 {
1941 struct relay_command *relay_connection;
1942 int ret;
1943
1944 relay_connection = zmalloc(sizeof(struct relay_command));
1945 if (relay_connection == NULL) {
1946 PERROR("Relay command zmalloc");
1947 goto error;
1948 }
1949 do {
1950 ret = read(fd, relay_connection, sizeof(struct relay_command));
1951 } while (ret < 0 && errno == EINTR);
1952 if (ret < 0 || ret < sizeof(struct relay_command)) {
1953 PERROR("read relay cmd pipe");
1954 goto error_read;
1955 }
1956
1957 lttng_ht_node_init_ulong(&relay_connection->sock_n,
1958 (unsigned long) relay_connection->sock->fd);
1959 rcu_read_lock();
1960 lttng_ht_add_unique_ulong(relay_connections_ht,
1961 &relay_connection->sock_n);
1962 rcu_read_unlock();
1963 return lttng_poll_add(events,
1964 relay_connection->sock->fd,
1965 LPOLLIN | LPOLLRDHUP);
1966
1967 error_read:
1968 free(relay_connection);
1969 error:
1970 return -1;
1971 }
1972
1973 static
1974 void deferred_free_connection(struct rcu_head *head)
1975 {
1976 struct relay_command *relay_connection =
1977 caa_container_of(head, struct relay_command, rcu_node);
1978
1979 lttcomm_destroy_sock(relay_connection->sock);
1980 free(relay_connection);
1981 }
1982
1983 static
1984 void relay_del_connection(struct lttng_ht *relay_connections_ht,
1985 struct lttng_ht *streams_ht, struct lttng_ht_iter *iter,
1986 struct relay_command *relay_connection)
1987 {
1988 int ret;
1989
1990 ret = lttng_ht_del(relay_connections_ht, iter);
1991 assert(!ret);
1992 if (relay_connection->type == RELAY_CONTROL) {
1993 relay_delete_session(relay_connection, streams_ht);
1994 }
1995
1996 call_rcu(&relay_connection->rcu_node,
1997 deferred_free_connection);
1998 }
1999
2000 /*
2001 * This thread does the actual work
2002 */
2003 static
2004 void *relay_thread_worker(void *data)
2005 {
2006 int ret, err = -1, last_seen_data_fd = -1;
2007 uint32_t nb_fd;
2008 struct relay_command *relay_connection;
2009 struct lttng_poll_event events;
2010 struct lttng_ht *relay_connections_ht;
2011 struct lttng_ht_node_ulong *node;
2012 struct lttng_ht_iter iter;
2013 struct lttng_ht *streams_ht;
2014 struct lttng_ht *index_streams_ht;
2015 struct lttcomm_relayd_hdr recv_hdr;
2016
2017 DBG("[thread] Relay worker started");
2018
2019 rcu_register_thread();
2020
2021 /* table of connections indexed on socket */
2022 relay_connections_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
2023 if (!relay_connections_ht) {
2024 goto relay_connections_ht_error;
2025 }
2026
2027 /* tables of streams indexed by stream ID */
2028 streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
2029 if (!streams_ht) {
2030 goto streams_ht_error;
2031 }
2032
2033 /* Tables of received indexes indexed by index handle and net_seq_num. */
2034 indexes_ht = lttng_ht_new(0, LTTNG_HT_TYPE_TWO_U64);
2035 if (!indexes_ht) {
2036 goto indexes_ht_error;
2037 }
2038
2039 ret = create_thread_poll_set(&events, 2);
2040 if (ret < 0) {
2041 goto error_poll_create;
2042 }
2043
2044 ret = lttng_poll_add(&events, relay_cmd_pipe[0], LPOLLIN | LPOLLRDHUP);
2045 if (ret < 0) {
2046 goto error;
2047 }
2048
2049 restart:
2050 while (1) {
2051 int idx = -1, i, seen_control = 0, last_notdel_data_fd = -1;
2052
2053 /* Infinite blocking call, waiting for transmission */
2054 DBG3("Relayd worker thread polling...");
2055 ret = lttng_poll_wait(&events, -1);
2056 if (ret < 0) {
2057 /*
2058 * Restart interrupted system call.
2059 */
2060 if (errno == EINTR) {
2061 goto restart;
2062 }
2063 goto error;
2064 }
2065
2066 nb_fd = ret;
2067
2068 /*
2069 * Process control. The control connection is prioritised so we don't
2070 * starve it with high throughout put tracing data on the data
2071 * connection.
2072 */
2073 for (i = 0; i < nb_fd; i++) {
2074 /* Fetch once the poll data */
2075 uint32_t revents = LTTNG_POLL_GETEV(&events, i);
2076 int pollfd = LTTNG_POLL_GETFD(&events, i);
2077
2078 /* Thread quit pipe has been closed. Killing thread. */
2079 ret = check_thread_quit_pipe(pollfd, revents);
2080 if (ret) {
2081 err = 0;
2082 goto exit;
2083 }
2084
2085 /* Inspect the relay cmd pipe for new connection */
2086 if (pollfd == relay_cmd_pipe[0]) {
2087 if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
2088 ERR("Relay pipe error");
2089 goto error;
2090 } else if (revents & LPOLLIN) {
2091 DBG("Relay command received");
2092 ret = relay_add_connection(relay_cmd_pipe[0],
2093 &events, relay_connections_ht);
2094 if (ret < 0) {
2095 goto error;
2096 }
2097 }
2098 } else if (revents) {
2099 rcu_read_lock();
2100 lttng_ht_lookup(relay_connections_ht,
2101 (void *)((unsigned long) pollfd),
2102 &iter);
2103 node = lttng_ht_iter_get_node_ulong(&iter);
2104 if (node == NULL) {
2105 DBG2("Relay sock %d not found", pollfd);
2106 rcu_read_unlock();
2107 goto error;
2108 }
2109 relay_connection = caa_container_of(node,
2110 struct relay_command, sock_n);
2111
2112 if (revents & (LPOLLERR)) {
2113 ERR("POLL ERROR");
2114 relay_cleanup_poll_connection(&events, pollfd);
2115 relay_del_connection(relay_connections_ht,
2116 streams_ht, &iter,
2117 relay_connection);
2118 if (last_seen_data_fd == pollfd) {
2119 last_seen_data_fd = last_notdel_data_fd;
2120 }
2121 } else if (revents & (LPOLLHUP | LPOLLRDHUP)) {
2122 DBG("Socket %d hung up", pollfd);
2123 relay_cleanup_poll_connection(&events, pollfd);
2124 relay_del_connection(relay_connections_ht,
2125 streams_ht, &iter,
2126 relay_connection);
2127 if (last_seen_data_fd == pollfd) {
2128 last_seen_data_fd = last_notdel_data_fd;
2129 }
2130 } else if (revents & LPOLLIN) {
2131 /* control socket */
2132 if (relay_connection->type == RELAY_CONTROL) {
2133 ret = relay_connection->sock->ops->recvmsg(
2134 relay_connection->sock, &recv_hdr,
2135 sizeof(struct lttcomm_relayd_hdr), 0);
2136 /* connection closed */
2137 if (ret <= 0) {
2138 relay_cleanup_poll_connection(&events, pollfd);
2139 relay_del_connection(relay_connections_ht,
2140 streams_ht, &iter,
2141 relay_connection);
2142 DBG("Control connection closed with %d", pollfd);
2143 } else {
2144 if (relay_connection->session) {
2145 DBG2("Relay worker receiving data for session : %" PRIu64,
2146 relay_connection->session->id);
2147 }
2148 ret = relay_process_control(&recv_hdr,
2149 relay_connection,
2150 streams_ht,
2151 index_streams_ht,
2152 indexes_ht);
2153 if (ret < 0) {
2154 /* Clear the session on error. */
2155 relay_cleanup_poll_connection(&events, pollfd);
2156 relay_del_connection(relay_connections_ht,
2157 streams_ht, &iter,
2158 relay_connection);
2159 DBG("Connection closed with %d", pollfd);
2160 }
2161 seen_control = 1;
2162 }
2163 } else {
2164 /*
2165 * Flag the last seen data fd not deleted. It will be
2166 * used as the last seen fd if any fd gets deleted in
2167 * this first loop.
2168 */
2169 last_notdel_data_fd = pollfd;
2170 }
2171 }
2172 rcu_read_unlock();
2173 }
2174 }
2175
2176 /*
2177 * The last loop handled a control request, go back to poll to make
2178 * sure we prioritise the control socket.
2179 */
2180 if (seen_control) {
2181 continue;
2182 }
2183
2184 if (last_seen_data_fd >= 0) {
2185 for (i = 0; i < nb_fd; i++) {
2186 int pollfd = LTTNG_POLL_GETFD(&events, i);
2187 if (last_seen_data_fd == pollfd) {
2188 idx = i;
2189 break;
2190 }
2191 }
2192 }
2193
2194 /* Process data connection. */
2195 for (i = idx + 1; i < nb_fd; i++) {
2196 /* Fetch the poll data. */
2197 uint32_t revents = LTTNG_POLL_GETEV(&events, i);
2198 int pollfd = LTTNG_POLL_GETFD(&events, i);
2199
2200 /* Skip the command pipe. It's handled in the first loop. */
2201 if (pollfd == relay_cmd_pipe[0]) {
2202 continue;
2203 }
2204
2205 if (revents) {
2206 rcu_read_lock();
2207 lttng_ht_lookup(relay_connections_ht,
2208 (void *)((unsigned long) pollfd),
2209 &iter);
2210 node = lttng_ht_iter_get_node_ulong(&iter);
2211 if (node == NULL) {
2212 /* Skip it. Might be removed before. */
2213 rcu_read_unlock();
2214 continue;
2215 }
2216 relay_connection = caa_container_of(node,
2217 struct relay_command, sock_n);
2218
2219 if (revents & LPOLLIN) {
2220 if (relay_connection->type != RELAY_DATA) {
2221 continue;
2222 }
2223
2224 ret = relay_process_data(relay_connection, streams_ht,
2225 indexes_ht);
2226 /* connection closed */
2227 if (ret < 0) {
2228 relay_cleanup_poll_connection(&events, pollfd);
2229 relay_del_connection(relay_connections_ht,
2230 streams_ht, &iter,
2231 relay_connection);
2232 DBG("Data connection closed with %d", pollfd);
2233 /*
2234 * Every goto restart call sets the last seen fd where
2235 * here we don't really care since we gracefully
2236 * continue the loop after the connection is deleted.
2237 */
2238 } else {
2239 /* Keep last seen port. */
2240 last_seen_data_fd = pollfd;
2241 rcu_read_unlock();
2242 goto restart;
2243 }
2244 }
2245 rcu_read_unlock();
2246 }
2247 }
2248 last_seen_data_fd = -1;
2249 }
2250
2251 exit:
2252 error:
2253 lttng_poll_clean(&events);
2254
2255 /* empty the hash table and free the memory */
2256 rcu_read_lock();
2257 cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter, node, node) {
2258 node = lttng_ht_iter_get_node_ulong(&iter);
2259 if (node) {
2260 relay_connection = caa_container_of(node,
2261 struct relay_command, sock_n);
2262 relay_del_connection(relay_connections_ht,
2263 streams_ht, &iter,
2264 relay_connection);
2265 }
2266 }
2267 rcu_read_unlock();
2268 error_poll_create:
2269 lttng_ht_destroy(indexes_ht);
2270 indexes_ht_error:
2271 lttng_ht_destroy(streams_ht);
2272 streams_ht_error:
2273 lttng_ht_destroy(relay_connections_ht);
2274 relay_connections_ht_error:
2275 /* Close relay cmd pipes */
2276 utils_close_pipe(relay_cmd_pipe);
2277 if (err) {
2278 DBG("Thread exited with error");
2279 }
2280 DBG("Worker thread cleanup complete");
2281 free(data_buffer);
2282 stop_threads();
2283 rcu_unregister_thread();
2284 return NULL;
2285 }
2286
2287 /*
2288 * Create the relay command pipe to wake thread_manage_apps.
2289 * Closed in cleanup().
2290 */
2291 static int create_relay_cmd_pipe(void)
2292 {
2293 int ret;
2294
2295 ret = utils_create_pipe_cloexec(relay_cmd_pipe);
2296
2297 return ret;
2298 }
2299
2300 /*
2301 * main
2302 */
2303 int main(int argc, char **argv)
2304 {
2305 int ret = 0;
2306 void *status;
2307
2308 /* Create thread quit pipe */
2309 if ((ret = init_thread_quit_pipe()) < 0) {
2310 goto error;
2311 }
2312
2313 /* Parse arguments */
2314 progname = argv[0];
2315 if ((ret = parse_args(argc, argv)) < 0) {
2316 goto exit;
2317 }
2318
2319 if ((ret = set_signal_handler()) < 0) {
2320 goto exit;
2321 }
2322
2323 /* Try to create directory if -o, --output is specified. */
2324 if (opt_output_path) {
2325 if (*opt_output_path != '/') {
2326 ERR("Please specify an absolute path for -o, --output PATH");
2327 goto exit;
2328 }
2329
2330 ret = utils_mkdir_recursive(opt_output_path, S_IRWXU | S_IRWXG);
2331 if (ret < 0) {
2332 ERR("Unable to create %s", opt_output_path);
2333 goto exit;
2334 }
2335 }
2336
2337 /* Daemonize */
2338 if (opt_daemon) {
2339 ret = daemon(0, 0);
2340 if (ret < 0) {
2341 PERROR("daemon");
2342 goto exit;
2343 }
2344 }
2345
2346 /* We need those values for the file/dir creation. */
2347 relayd_uid = getuid();
2348 relayd_gid = getgid();
2349
2350 /* Check if daemon is UID = 0 */
2351 if (relayd_uid == 0) {
2352 if (control_uri->port < 1024 || data_uri->port < 1024) {
2353 ERR("Need to be root to use ports < 1024");
2354 ret = -1;
2355 goto exit;
2356 }
2357 }
2358
2359 /* Setup the thread apps communication pipe. */
2360 if ((ret = create_relay_cmd_pipe()) < 0) {
2361 goto exit;
2362 }
2363
2364 /* Init relay command queue. */
2365 cds_wfq_init(&relay_cmd_queue.queue);
2366
2367 /* Set up max poll set size */
2368 lttng_poll_set_max_size();
2369
2370 /* Initialize communication library */
2371 lttcomm_init();
2372
2373 /* Setup the dispatcher thread */
2374 ret = pthread_create(&dispatcher_thread, NULL,
2375 relay_thread_dispatcher, (void *) NULL);
2376 if (ret != 0) {
2377 PERROR("pthread_create dispatcher");
2378 goto exit_dispatcher;
2379 }
2380
2381 /* Setup the worker thread */
2382 ret = pthread_create(&worker_thread, NULL,
2383 relay_thread_worker, (void *) NULL);
2384 if (ret != 0) {
2385 PERROR("pthread_create worker");
2386 goto exit_worker;
2387 }
2388
2389 /* Setup the listener thread */
2390 ret = pthread_create(&listener_thread, NULL,
2391 relay_thread_listener, (void *) NULL);
2392 if (ret != 0) {
2393 PERROR("pthread_create listener");
2394 goto exit_listener;
2395 }
2396
2397 exit_listener:
2398 ret = pthread_join(listener_thread, &status);
2399 if (ret != 0) {
2400 PERROR("pthread_join");
2401 goto error; /* join error, exit without cleanup */
2402 }
2403
2404 exit_worker:
2405 ret = pthread_join(worker_thread, &status);
2406 if (ret != 0) {
2407 PERROR("pthread_join");
2408 goto error; /* join error, exit without cleanup */
2409 }
2410
2411 exit_dispatcher:
2412 ret = pthread_join(dispatcher_thread, &status);
2413 if (ret != 0) {
2414 PERROR("pthread_join");
2415 goto error; /* join error, exit without cleanup */
2416 }
2417
2418 exit:
2419 cleanup();
2420 if (!ret) {
2421 exit(EXIT_SUCCESS);
2422 }
2423
2424 error:
2425 exit(EXIT_FAILURE);
2426 }
This page took 0.131329 seconds and 6 git commands to generate.