745bde816aa7471222f871c3859d981389e22638
[lttng-tools.git] / src / common / relayd / relayd.c
1 /*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
18 #define _LGPL_SOURCE
19 #include <assert.h>
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <string.h>
23 #include <sys/stat.h>
24 #include <inttypes.h>
25
26 #include <common/common.h>
27 #include <common/defaults.h>
28 #include <common/compat/endian.h>
29 #include <common/compat/string.h>
30 #include <common/sessiond-comm/relayd.h>
31 #include <common/index/ctf-index.h>
32
33 #include "relayd.h"
34
35 /*
36 * Send command. Fill up the header and append the data.
37 */
38 static int send_command(struct lttcomm_relayd_sock *rsock,
39 enum lttcomm_relayd_command cmd, const void *data, size_t size,
40 int flags)
41 {
42 int ret;
43 struct lttcomm_relayd_hdr header;
44 char *buf;
45 uint64_t buf_size = sizeof(header);
46
47 if (rsock->sock.fd < 0) {
48 return -ECONNRESET;
49 }
50
51 if (data) {
52 buf_size += size;
53 }
54
55 buf = zmalloc(buf_size);
56 if (buf == NULL) {
57 PERROR("zmalloc relayd send command buf");
58 ret = -1;
59 goto alloc_error;
60 }
61
62 memset(&header, 0, sizeof(header));
63 header.cmd = htobe32(cmd);
64 header.data_size = htobe64(size);
65
66 /* Zeroed for now since not used. */
67 header.cmd_version = 0;
68 header.circuit_id = 0;
69
70 /* Prepare buffer to send. */
71 memcpy(buf, &header, sizeof(header));
72 if (data) {
73 memcpy(buf + sizeof(header), data, size);
74 }
75
76 DBG3("Relayd sending command %d of size %" PRIu64, (int) cmd, buf_size);
77 ret = rsock->sock.ops->sendmsg(&rsock->sock, buf, buf_size, flags);
78 if (ret < 0) {
79 PERROR("Failed to send command %d of size %" PRIu64,
80 (int) cmd, buf_size);
81 ret = -errno;
82 goto error;
83 }
84 error:
85 free(buf);
86 alloc_error:
87 return ret;
88 }
89
90 /*
91 * Receive reply data on socket. This MUST be call after send_command or else
92 * could result in unexpected behavior(s).
93 */
94 static int recv_reply(struct lttcomm_relayd_sock *rsock, void *data, size_t size)
95 {
96 int ret;
97
98 if (rsock->sock.fd < 0) {
99 return -ECONNRESET;
100 }
101
102 DBG3("Relayd waiting for reply of size %zu", size);
103
104 ret = rsock->sock.ops->recvmsg(&rsock->sock, data, size, 0);
105 if (ret <= 0 || ret != size) {
106 if (ret == 0) {
107 /* Orderly shutdown. */
108 DBG("Socket %d has performed an orderly shutdown", rsock->sock.fd);
109 } else {
110 DBG("Receiving reply failed on sock %d for size %zu with ret %d",
111 rsock->sock.fd, size, ret);
112 }
113 /* Always return -1 here and the caller can use errno. */
114 ret = -1;
115 goto error;
116 }
117
118 error:
119 return ret;
120 }
121
122 /*
123 * Starting from 2.11, RELAYD_CREATE_SESSION payload (session_name & hostname)
124 * have no length restriction on the sender side.
125 * Length for both payloads is stored in the msg struct. A new dynamic size
126 * payload size is introduced.
127 */
128 static int relayd_create_session_2_11(struct lttcomm_relayd_sock *rsock,
129 char *session_name, char *hostname,
130 int session_live_timer, unsigned int snapshot)
131 {
132 int ret;
133 struct lttcomm_relayd_create_session_2_11 *msg = NULL;
134 size_t session_name_len;
135 size_t hostname_len;
136 size_t msg_length;
137
138 /* The two names are sent with a '\0' delimiter between them. */
139 session_name_len = strlen(session_name) + 1;
140 hostname_len = strlen(hostname) + 1;
141
142 msg_length = sizeof(*msg) + session_name_len + hostname_len;
143 msg = zmalloc(msg_length);
144 if (!msg) {
145 PERROR("zmalloc create_session_2_11 command message");
146 ret = -1;
147 goto error;
148 }
149
150 assert(session_name_len <= UINT32_MAX);
151 msg->session_name_len = htobe32(session_name_len);
152
153 assert(hostname_len <= UINT32_MAX);
154 msg->hostname_len = htobe32(hostname_len);
155
156 if (lttng_strncpy(msg->names, session_name, session_name_len)) {
157 ret = -1;
158 goto error;
159 }
160 if (lttng_strncpy(msg->names + session_name_len, hostname, hostname_len)) {
161 ret = -1;
162 goto error;
163 }
164
165 msg->live_timer = htobe32(session_live_timer);
166 msg->snapshot = !!snapshot;
167
168 /* Send command */
169 ret = send_command(rsock, RELAYD_CREATE_SESSION, msg, msg_length, 0);
170 if (ret < 0) {
171 goto error;
172 }
173 error:
174 free(msg);
175 return ret;
176 }
177 /*
178 * From 2.4 to 2.10, RELAYD_CREATE_SESSION takes additional parameters to
179 * support the live reading capability.
180 */
181 static int relayd_create_session_2_4(struct lttcomm_relayd_sock *rsock,
182 char *session_name, char *hostname, int session_live_timer,
183 unsigned int snapshot)
184 {
185 int ret;
186 struct lttcomm_relayd_create_session_2_4 msg;
187
188 if (lttng_strncpy(msg.session_name, session_name,
189 sizeof(msg.session_name))) {
190 ret = -1;
191 goto error;
192 }
193 if (lttng_strncpy(msg.hostname, hostname, sizeof(msg.hostname))) {
194 ret = -1;
195 goto error;
196 }
197 msg.live_timer = htobe32(session_live_timer);
198 msg.snapshot = htobe32(snapshot);
199
200 /* Send command */
201 ret = send_command(rsock, RELAYD_CREATE_SESSION, &msg, sizeof(msg), 0);
202 if (ret < 0) {
203 goto error;
204 }
205
206 error:
207 return ret;
208 }
209
210 /*
211 * RELAYD_CREATE_SESSION from 2.1 to 2.3.
212 */
213 static int relayd_create_session_2_1(struct lttcomm_relayd_sock *rsock)
214 {
215 int ret;
216
217 /* Send command */
218 ret = send_command(rsock, RELAYD_CREATE_SESSION, NULL, 0, 0);
219 if (ret < 0) {
220 goto error;
221 }
222
223 error:
224 return ret;
225 }
226
227 /*
228 * Send a RELAYD_CREATE_SESSION command to the relayd with the given socket and
229 * set session_id of the relayd if we have a successful reply from the relayd.
230 *
231 * On success, return 0 else a negative value which is either an errno error or
232 * a lttng error code from the relayd.
233 */
234 int relayd_create_session(struct lttcomm_relayd_sock *rsock, uint64_t *session_id,
235 char *session_name, char *hostname, int session_live_timer,
236 unsigned int snapshot)
237 {
238 int ret;
239 struct lttcomm_relayd_status_session reply;
240
241 assert(rsock);
242 assert(session_id);
243
244 DBG("Relayd create session");
245
246 if (rsock->minor < 4) {
247 /* From 2.1 to 2.3 */
248 ret = relayd_create_session_2_1(rsock);
249 } else if (rsock->minor >= 4 && rsock->minor < 11) {
250 /* From 2.4 to 2.10 */
251 ret = relayd_create_session_2_4(rsock, session_name,
252 hostname, session_live_timer, snapshot);
253 } else {
254 /* From 2.11 to ... */
255 ret = relayd_create_session_2_11(rsock, session_name,
256 hostname, session_live_timer, snapshot);
257 }
258
259 if (ret < 0) {
260 goto error;
261 }
262
263 /* Receive response */
264 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
265 if (ret < 0) {
266 goto error;
267 }
268
269 reply.session_id = be64toh(reply.session_id);
270 reply.ret_code = be32toh(reply.ret_code);
271
272 /* Return session id or negative ret code. */
273 if (reply.ret_code != LTTNG_OK) {
274 ret = -1;
275 ERR("Relayd create session replied error %d", reply.ret_code);
276 goto error;
277 } else {
278 ret = 0;
279 *session_id = reply.session_id;
280 }
281
282 DBG("Relayd session created with id %" PRIu64, reply.session_id);
283
284 error:
285 return ret;
286 }
287
288 static int relayd_add_stream_2_1(struct lttcomm_relayd_sock *rsock,
289 const char *channel_name, const char *pathname)
290 {
291 int ret;
292 struct lttcomm_relayd_add_stream msg;
293
294 memset(&msg, 0, sizeof(msg));
295 if (lttng_strncpy(msg.channel_name, channel_name,
296 sizeof(msg.channel_name))) {
297 ret = -1;
298 goto error;
299 }
300
301 if (lttng_strncpy(msg.pathname, pathname,
302 sizeof(msg.pathname))) {
303 ret = -1;
304 goto error;
305 }
306
307 /* Send command */
308 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
309 if (ret < 0) {
310 ret = -1;
311 goto error;
312 }
313 ret = 0;
314 error:
315 return ret;
316 }
317
318 static int relayd_add_stream_2_2(struct lttcomm_relayd_sock *rsock,
319 const char *channel_name, const char *pathname,
320 uint64_t tracefile_size, uint64_t tracefile_count)
321 {
322 int ret;
323 struct lttcomm_relayd_add_stream_2_2 msg;
324
325 memset(&msg, 0, sizeof(msg));
326 /* Compat with relayd 2.2 to 2.10 */
327 if (lttng_strncpy(msg.channel_name, channel_name,
328 sizeof(msg.channel_name))) {
329 ret = -1;
330 goto error;
331 }
332 if (lttng_strncpy(msg.pathname, pathname,
333 sizeof(msg.pathname))) {
334 ret = -1;
335 goto error;
336 }
337 msg.tracefile_size = htobe64(tracefile_size);
338 msg.tracefile_count = htobe64(tracefile_count);
339
340 /* Send command */
341 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
342 if (ret < 0) {
343 goto error;
344 }
345 ret = 0;
346 error:
347 return ret;
348 }
349
350 static int relayd_add_stream_2_11(struct lttcomm_relayd_sock *rsock,
351 const char *channel_name, const char *pathname,
352 uint64_t tracefile_size, uint64_t tracefile_count)
353 {
354 int ret;
355 struct lttcomm_relayd_add_stream_2_11 *msg = NULL;
356 size_t channel_name_len;
357 size_t pathname_len;
358 size_t msg_length;
359
360 /* The two names are sent with a '\0' delimiter between them. */
361 channel_name_len = strlen(channel_name) + 1;
362 pathname_len = strlen(pathname) + 1;
363
364 msg_length = sizeof(*msg) + channel_name_len + pathname_len;
365 msg = zmalloc(msg_length);
366 if (!msg) {
367 PERROR("zmalloc add_stream_2_11 command message");
368 ret = -1;
369 goto error;
370 }
371
372 assert(channel_name_len <= UINT32_MAX);
373 msg->channel_name_len = htobe32(channel_name_len);
374
375 assert(pathname_len <= UINT32_MAX);
376 msg->pathname_len = htobe32(pathname_len);
377
378 if (lttng_strncpy(msg->names, channel_name, channel_name_len)) {
379 ret = -1;
380 goto error;
381 }
382 if (lttng_strncpy(msg->names + channel_name_len, pathname, pathname_len)) {
383 ret = -1;
384 goto error;
385 }
386
387 msg->tracefile_size = htobe64(tracefile_size);
388 msg->tracefile_count = htobe64(tracefile_count);
389
390 /* Send command */
391 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) msg, msg_length, 0);
392 if (ret < 0) {
393 goto error;
394 }
395 ret = 0;
396 error:
397 free(msg);
398 return ret;
399 }
400
401 /*
402 * Add stream on the relayd and assign stream handle to the stream_id argument.
403 *
404 * On success return 0 else return ret_code negative value.
405 */
406 int relayd_add_stream(struct lttcomm_relayd_sock *rsock, const char *channel_name,
407 const char *pathname, uint64_t *stream_id,
408 uint64_t tracefile_size, uint64_t tracefile_count,
409 uint64_t trace_archive_id)
410 {
411 int ret;
412 struct lttcomm_relayd_status_stream reply;
413
414 /* Code flow error. Safety net. */
415 assert(rsock);
416 assert(channel_name);
417 assert(pathname);
418
419 DBG("Relayd adding stream for channel name %s", channel_name);
420
421 /* Compat with relayd 2.1 */
422 if (rsock->minor == 1) {
423 /* For 2.1 */
424 ret = relayd_add_stream_2_1(rsock, channel_name, pathname);
425
426 } else if (rsock->minor > 1 && rsock->minor < 11) {
427 /* From 2.2 to 2.10 */
428 ret = relayd_add_stream_2_2(rsock, channel_name, pathname,
429 tracefile_size, tracefile_count);
430 } else {
431 /* From 2.11 to ...*/
432 ret = relayd_add_stream_2_11(rsock, channel_name, pathname,
433 tracefile_size, tracefile_count);
434 }
435
436 if (ret) {
437 ret = -1;
438 goto error;
439 }
440
441 /* Waiting for reply */
442 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
443 if (ret < 0) {
444 goto error;
445 }
446
447 /* Back to host bytes order. */
448 reply.handle = be64toh(reply.handle);
449 reply.ret_code = be32toh(reply.ret_code);
450
451 /* Return session id or negative ret code. */
452 if (reply.ret_code != LTTNG_OK) {
453 ret = -1;
454 ERR("Relayd add stream replied error %d", reply.ret_code);
455 } else {
456 /* Success */
457 ret = 0;
458 *stream_id = reply.handle;
459 }
460
461 DBG("Relayd stream added successfully with handle %" PRIu64,
462 reply.handle);
463
464 error:
465 return ret;
466 }
467
468 /*
469 * Inform the relay that all the streams for the current channel has been sent.
470 *
471 * On success return 0 else return ret_code negative value.
472 */
473 int relayd_streams_sent(struct lttcomm_relayd_sock *rsock)
474 {
475 int ret;
476 struct lttcomm_relayd_generic_reply reply;
477
478 /* Code flow error. Safety net. */
479 assert(rsock);
480
481 DBG("Relayd sending streams sent.");
482
483 /* This feature was introduced in 2.4, ignore it for earlier versions. */
484 if (rsock->minor < 4) {
485 ret = 0;
486 goto end;
487 }
488
489 /* Send command */
490 ret = send_command(rsock, RELAYD_STREAMS_SENT, NULL, 0, 0);
491 if (ret < 0) {
492 goto error;
493 }
494
495 /* Waiting for reply */
496 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
497 if (ret < 0) {
498 goto error;
499 }
500
501 /* Back to host bytes order. */
502 reply.ret_code = be32toh(reply.ret_code);
503
504 /* Return session id or negative ret code. */
505 if (reply.ret_code != LTTNG_OK) {
506 ret = -1;
507 ERR("Relayd streams sent replied error %d", reply.ret_code);
508 goto error;
509 } else {
510 /* Success */
511 ret = 0;
512 }
513
514 DBG("Relayd streams sent success");
515
516 error:
517 end:
518 return ret;
519 }
520
521 /*
522 * Check version numbers on the relayd.
523 * If major versions are compatible, we assign minor_to_use to the
524 * minor version of the procotol we are going to use for this session.
525 *
526 * Return 0 if the two daemons are compatible, LTTNG_ERR_RELAYD_VERSION_FAIL
527 * otherwise, or a negative value on network errors.
528 */
529 int relayd_version_check(struct lttcomm_relayd_sock *rsock)
530 {
531 int ret;
532 struct lttcomm_relayd_version msg;
533
534 /* Code flow error. Safety net. */
535 assert(rsock);
536
537 DBG("Relayd version check for major.minor %u.%u", rsock->major,
538 rsock->minor);
539
540 memset(&msg, 0, sizeof(msg));
541 /* Prepare network byte order before transmission. */
542 msg.major = htobe32(rsock->major);
543 msg.minor = htobe32(rsock->minor);
544
545 /* Send command */
546 ret = send_command(rsock, RELAYD_VERSION, (void *) &msg, sizeof(msg), 0);
547 if (ret < 0) {
548 goto error;
549 }
550
551 /* Receive response */
552 ret = recv_reply(rsock, (void *) &msg, sizeof(msg));
553 if (ret < 0) {
554 goto error;
555 }
556
557 /* Set back to host bytes order */
558 msg.major = be32toh(msg.major);
559 msg.minor = be32toh(msg.minor);
560
561 /*
562 * Only validate the major version. If the other side is higher,
563 * communication is not possible. Only major version equal can talk to each
564 * other. If the minor version differs, the lowest version is used by both
565 * sides.
566 */
567 if (msg.major != rsock->major) {
568 /* Not compatible */
569 ret = LTTNG_ERR_RELAYD_VERSION_FAIL;
570 DBG2("Relayd version is NOT compatible. Relayd version %u != %u (us)",
571 msg.major, rsock->major);
572 goto error;
573 }
574
575 /*
576 * If the relayd's minor version is higher, it will adapt to our version so
577 * we can continue to use the latest relayd communication data structure.
578 * If the received minor version is higher, the relayd should adapt to us.
579 */
580 if (rsock->minor > msg.minor) {
581 rsock->minor = msg.minor;
582 }
583
584 /* Version number compatible */
585 DBG2("Relayd version is compatible, using protocol version %u.%u",
586 rsock->major, rsock->minor);
587 ret = 0;
588
589 error:
590 return ret;
591 }
592
593 /*
594 * Add stream on the relayd and assign stream handle to the stream_id argument.
595 *
596 * On success return 0 else return ret_code negative value.
597 */
598 int relayd_send_metadata(struct lttcomm_relayd_sock *rsock, size_t len)
599 {
600 int ret;
601
602 /* Code flow error. Safety net. */
603 assert(rsock);
604
605 DBG("Relayd sending metadata of size %zu", len);
606
607 /* Send command */
608 ret = send_command(rsock, RELAYD_SEND_METADATA, NULL, len, 0);
609 if (ret < 0) {
610 goto error;
611 }
612
613 DBG2("Relayd metadata added successfully");
614
615 /*
616 * After that call, the metadata data MUST be sent to the relayd so the
617 * receive size on the other end matches the len of the metadata packet
618 * header. This is why we don't wait for a reply here.
619 */
620
621 error:
622 return ret;
623 }
624
625 /*
626 * Connect to relay daemon with an allocated lttcomm_relayd_sock.
627 */
628 int relayd_connect(struct lttcomm_relayd_sock *rsock)
629 {
630 /* Code flow error. Safety net. */
631 assert(rsock);
632
633 if (!rsock->sock.ops) {
634 /*
635 * Attempting a connect on a non-initialized socket.
636 */
637 return -ECONNRESET;
638 }
639
640 DBG3("Relayd connect ...");
641
642 return rsock->sock.ops->connect(&rsock->sock);
643 }
644
645 /*
646 * Close relayd socket with an allocated lttcomm_relayd_sock.
647 *
648 * If no socket operations are found, simply return 0 meaning that everything
649 * is fine. Without operations, the socket can not possibly be opened or used.
650 * This is possible if the socket was allocated but not created. However, the
651 * caller could simply use it to store a valid file descriptor for instance
652 * passed over a Unix socket and call this to cleanup but still without a valid
653 * ops pointer.
654 *
655 * Return the close returned value. On error, a negative value is usually
656 * returned back from close(2).
657 */
658 int relayd_close(struct lttcomm_relayd_sock *rsock)
659 {
660 int ret;
661
662 /* Code flow error. Safety net. */
663 assert(rsock);
664
665 /* An invalid fd is fine, return success. */
666 if (rsock->sock.fd < 0) {
667 ret = 0;
668 goto end;
669 }
670
671 DBG3("Relayd closing socket %d", rsock->sock.fd);
672
673 if (rsock->sock.ops) {
674 ret = rsock->sock.ops->close(&rsock->sock);
675 } else {
676 /* Default call if no specific ops found. */
677 ret = close(rsock->sock.fd);
678 if (ret < 0) {
679 PERROR("relayd_close default close");
680 }
681 }
682 rsock->sock.fd = -1;
683
684 end:
685 return ret;
686 }
687
688 /*
689 * Send data header structure to the relayd.
690 */
691 int relayd_send_data_hdr(struct lttcomm_relayd_sock *rsock,
692 struct lttcomm_relayd_data_hdr *hdr, size_t size)
693 {
694 int ret;
695
696 /* Code flow error. Safety net. */
697 assert(rsock);
698 assert(hdr);
699
700 if (rsock->sock.fd < 0) {
701 return -ECONNRESET;
702 }
703
704 DBG3("Relayd sending data header of size %zu", size);
705
706 /* Again, safety net */
707 if (size == 0) {
708 size = sizeof(struct lttcomm_relayd_data_hdr);
709 }
710
711 /* Only send data header. */
712 ret = rsock->sock.ops->sendmsg(&rsock->sock, hdr, size, 0);
713 if (ret < 0) {
714 ret = -errno;
715 goto error;
716 }
717
718 /*
719 * The data MUST be sent right after that command for the receive on the
720 * other end to match the size in the header.
721 */
722
723 error:
724 return ret;
725 }
726
727 /*
728 * Send close stream command to the relayd.
729 */
730 int relayd_send_close_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
731 uint64_t last_net_seq_num)
732 {
733 int ret;
734 struct lttcomm_relayd_close_stream msg;
735 struct lttcomm_relayd_generic_reply reply;
736
737 /* Code flow error. Safety net. */
738 assert(rsock);
739
740 DBG("Relayd closing stream id %" PRIu64, stream_id);
741
742 memset(&msg, 0, sizeof(msg));
743 msg.stream_id = htobe64(stream_id);
744 msg.last_net_seq_num = htobe64(last_net_seq_num);
745
746 /* Send command */
747 ret = send_command(rsock, RELAYD_CLOSE_STREAM, (void *) &msg, sizeof(msg), 0);
748 if (ret < 0) {
749 goto error;
750 }
751
752 /* Receive response */
753 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
754 if (ret < 0) {
755 goto error;
756 }
757
758 reply.ret_code = be32toh(reply.ret_code);
759
760 /* Return session id or negative ret code. */
761 if (reply.ret_code != LTTNG_OK) {
762 ret = -1;
763 ERR("Relayd close stream replied error %d", reply.ret_code);
764 } else {
765 /* Success */
766 ret = 0;
767 }
768
769 DBG("Relayd close stream id %" PRIu64 " successfully", stream_id);
770
771 error:
772 return ret;
773 }
774
775 /*
776 * Check for data availability for a given stream id.
777 *
778 * Return 0 if NOT pending, 1 if so and a negative value on error.
779 */
780 int relayd_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
781 uint64_t last_net_seq_num)
782 {
783 int ret;
784 struct lttcomm_relayd_data_pending msg;
785 struct lttcomm_relayd_generic_reply reply;
786
787 /* Code flow error. Safety net. */
788 assert(rsock);
789
790 DBG("Relayd data pending for stream id %" PRIu64, stream_id);
791
792 memset(&msg, 0, sizeof(msg));
793 msg.stream_id = htobe64(stream_id);
794 msg.last_net_seq_num = htobe64(last_net_seq_num);
795
796 /* Send command */
797 ret = send_command(rsock, RELAYD_DATA_PENDING, (void *) &msg,
798 sizeof(msg), 0);
799 if (ret < 0) {
800 goto error;
801 }
802
803 /* Receive response */
804 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
805 if (ret < 0) {
806 goto error;
807 }
808
809 reply.ret_code = be32toh(reply.ret_code);
810
811 /* Return session id or negative ret code. */
812 if (reply.ret_code >= LTTNG_OK) {
813 ERR("Relayd data pending replied error %d", reply.ret_code);
814 }
815
816 /* At this point, the ret code is either 1 or 0 */
817 ret = reply.ret_code;
818
819 DBG("Relayd data is %s pending for stream id %" PRIu64,
820 ret == 1 ? "" : "NOT", stream_id);
821
822 error:
823 return ret;
824 }
825
826 /*
827 * Check on the relayd side for a quiescent state on the control socket.
828 */
829 int relayd_quiescent_control(struct lttcomm_relayd_sock *rsock,
830 uint64_t metadata_stream_id)
831 {
832 int ret;
833 struct lttcomm_relayd_quiescent_control msg;
834 struct lttcomm_relayd_generic_reply reply;
835
836 /* Code flow error. Safety net. */
837 assert(rsock);
838
839 DBG("Relayd checking quiescent control state");
840
841 memset(&msg, 0, sizeof(msg));
842 msg.stream_id = htobe64(metadata_stream_id);
843
844 /* Send command */
845 ret = send_command(rsock, RELAYD_QUIESCENT_CONTROL, &msg, sizeof(msg), 0);
846 if (ret < 0) {
847 goto error;
848 }
849
850 /* Receive response */
851 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
852 if (ret < 0) {
853 goto error;
854 }
855
856 reply.ret_code = be32toh(reply.ret_code);
857
858 /* Return session id or negative ret code. */
859 if (reply.ret_code != LTTNG_OK) {
860 ret = -1;
861 ERR("Relayd quiescent control replied error %d", reply.ret_code);
862 goto error;
863 }
864
865 /* Control socket is quiescent */
866 return 0;
867
868 error:
869 return ret;
870 }
871
872 /*
873 * Begin a data pending command for a specific session id.
874 */
875 int relayd_begin_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id)
876 {
877 int ret;
878 struct lttcomm_relayd_begin_data_pending msg;
879 struct lttcomm_relayd_generic_reply reply;
880
881 /* Code flow error. Safety net. */
882 assert(rsock);
883
884 DBG("Relayd begin data pending");
885
886 memset(&msg, 0, sizeof(msg));
887 msg.session_id = htobe64(id);
888
889 /* Send command */
890 ret = send_command(rsock, RELAYD_BEGIN_DATA_PENDING, &msg, sizeof(msg), 0);
891 if (ret < 0) {
892 goto error;
893 }
894
895 /* Receive response */
896 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
897 if (ret < 0) {
898 goto error;
899 }
900
901 reply.ret_code = be32toh(reply.ret_code);
902
903 /* Return session id or negative ret code. */
904 if (reply.ret_code != LTTNG_OK) {
905 ret = -1;
906 ERR("Relayd begin data pending replied error %d", reply.ret_code);
907 goto error;
908 }
909
910 return 0;
911
912 error:
913 return ret;
914 }
915
916 /*
917 * End a data pending command for a specific session id.
918 *
919 * Return 0 on success and set is_data_inflight to 0 if no data is being
920 * streamed or 1 if it is the case.
921 */
922 int relayd_end_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id,
923 unsigned int *is_data_inflight)
924 {
925 int ret, recv_ret;
926 struct lttcomm_relayd_end_data_pending msg;
927 struct lttcomm_relayd_generic_reply reply;
928
929 /* Code flow error. Safety net. */
930 assert(rsock);
931
932 DBG("Relayd end data pending");
933
934 memset(&msg, 0, sizeof(msg));
935 msg.session_id = htobe64(id);
936
937 /* Send command */
938 ret = send_command(rsock, RELAYD_END_DATA_PENDING, &msg, sizeof(msg), 0);
939 if (ret < 0) {
940 goto error;
941 }
942
943 /* Receive response */
944 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
945 if (ret < 0) {
946 goto error;
947 }
948
949 recv_ret = be32toh(reply.ret_code);
950 if (recv_ret < 0) {
951 ret = recv_ret;
952 goto error;
953 }
954
955 *is_data_inflight = recv_ret;
956
957 DBG("Relayd end data pending is data inflight: %d", recv_ret);
958
959 return 0;
960
961 error:
962 return ret;
963 }
964
965 /*
966 * Send index to the relayd.
967 */
968 int relayd_send_index(struct lttcomm_relayd_sock *rsock,
969 struct ctf_packet_index *index, uint64_t relay_stream_id,
970 uint64_t net_seq_num)
971 {
972 int ret;
973 struct lttcomm_relayd_index msg;
974 struct lttcomm_relayd_generic_reply reply;
975
976 /* Code flow error. Safety net. */
977 assert(rsock);
978
979 if (rsock->minor < 4) {
980 DBG("Not sending indexes before protocol 2.4");
981 ret = 0;
982 goto error;
983 }
984
985 DBG("Relayd sending index for stream ID %" PRIu64, relay_stream_id);
986
987 memset(&msg, 0, sizeof(msg));
988 msg.relay_stream_id = htobe64(relay_stream_id);
989 msg.net_seq_num = htobe64(net_seq_num);
990
991 /* The index is already in big endian. */
992 msg.packet_size = index->packet_size;
993 msg.content_size = index->content_size;
994 msg.timestamp_begin = index->timestamp_begin;
995 msg.timestamp_end = index->timestamp_end;
996 msg.events_discarded = index->events_discarded;
997 msg.stream_id = index->stream_id;
998
999 if (rsock->minor >= 8) {
1000 msg.stream_instance_id = index->stream_instance_id;
1001 msg.packet_seq_num = index->packet_seq_num;
1002 }
1003
1004 /* Send command */
1005 ret = send_command(rsock, RELAYD_SEND_INDEX, &msg,
1006 lttcomm_relayd_index_len(lttng_to_index_major(rsock->major,
1007 rsock->minor),
1008 lttng_to_index_minor(rsock->major, rsock->minor)),
1009 0);
1010 if (ret < 0) {
1011 goto error;
1012 }
1013
1014 /* Receive response */
1015 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1016 if (ret < 0) {
1017 goto error;
1018 }
1019
1020 reply.ret_code = be32toh(reply.ret_code);
1021
1022 /* Return session id or negative ret code. */
1023 if (reply.ret_code != LTTNG_OK) {
1024 ret = -1;
1025 ERR("Relayd send index replied error %d", reply.ret_code);
1026 } else {
1027 /* Success */
1028 ret = 0;
1029 }
1030
1031 error:
1032 return ret;
1033 }
1034
1035 /*
1036 * Ask the relay to reset the metadata trace file (regeneration).
1037 */
1038 int relayd_reset_metadata(struct lttcomm_relayd_sock *rsock,
1039 uint64_t stream_id, uint64_t version)
1040 {
1041 int ret;
1042 struct lttcomm_relayd_reset_metadata msg;
1043 struct lttcomm_relayd_generic_reply reply;
1044
1045 /* Code flow error. Safety net. */
1046 assert(rsock);
1047
1048 /* Should have been prevented by the sessiond. */
1049 if (rsock->minor < 8) {
1050 ERR("Metadata regeneration unsupported before 2.8");
1051 ret = -1;
1052 goto error;
1053 }
1054
1055 DBG("Relayd reset metadata stream id %" PRIu64, stream_id);
1056
1057 memset(&msg, 0, sizeof(msg));
1058 msg.stream_id = htobe64(stream_id);
1059 msg.version = htobe64(version);
1060
1061 /* Send command */
1062 ret = send_command(rsock, RELAYD_RESET_METADATA, (void *) &msg, sizeof(msg), 0);
1063 if (ret < 0) {
1064 goto error;
1065 }
1066
1067 /* Receive response */
1068 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1069 if (ret < 0) {
1070 goto error;
1071 }
1072
1073 reply.ret_code = be32toh(reply.ret_code);
1074
1075 /* Return session id or negative ret code. */
1076 if (reply.ret_code != LTTNG_OK) {
1077 ret = -1;
1078 ERR("Relayd reset metadata replied error %d", reply.ret_code);
1079 } else {
1080 /* Success */
1081 ret = 0;
1082 }
1083
1084 DBG("Relayd reset metadata stream id %" PRIu64 " successfully", stream_id);
1085
1086 error:
1087 return ret;
1088 }
1089
1090 int relayd_rotate_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
1091 const char *new_pathname, uint64_t new_chunk_id,
1092 uint64_t seq_num)
1093 {
1094 int ret;
1095 struct lttcomm_relayd_rotate_stream *msg = NULL;
1096 struct lttcomm_relayd_generic_reply reply;
1097 size_t len;
1098 int msg_len;
1099
1100 /* Code flow error. Safety net. */
1101 assert(rsock);
1102
1103 DBG("Sending rotate stream id %" PRIu64 " command to relayd", stream_id);
1104
1105 /* Account for the trailing NULL. */
1106 len = lttng_strnlen(new_pathname, LTTNG_PATH_MAX) + 1;
1107 if (len > LTTNG_PATH_MAX) {
1108 ERR("Path used in relayd rotate stream command exceeds the maximal allowed length");
1109 ret = -1;
1110 goto error;
1111 }
1112
1113 msg_len = offsetof(struct lttcomm_relayd_rotate_stream, new_pathname) + len;
1114 msg = zmalloc(msg_len);
1115 if (!msg) {
1116 PERROR("Failed to allocate relayd rotate stream command of %d bytes",
1117 msg_len);
1118 ret = -1;
1119 goto error;
1120 }
1121
1122 if (lttng_strncpy(msg->new_pathname, new_pathname, len)) {
1123 ret = -1;
1124 ERR("Failed to copy relayd rotate stream command's new path name");
1125 goto error;
1126 }
1127
1128 msg->pathname_length = htobe32(len);
1129 msg->stream_id = htobe64(stream_id);
1130 msg->new_chunk_id = htobe64(new_chunk_id);
1131 /*
1132 * The seq_num is invalid for metadata streams, but it is ignored on
1133 * the relay.
1134 */
1135 msg->rotate_at_seq_num = htobe64(seq_num);
1136
1137 /* Send command. */
1138 ret = send_command(rsock, RELAYD_ROTATE_STREAM, (void *) msg, msg_len, 0);
1139 if (ret < 0) {
1140 ERR("Send rotate command");
1141 goto error;
1142 }
1143
1144 /* Receive response. */
1145 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1146 if (ret < 0) {
1147 ERR("Receive rotate reply");
1148 goto error;
1149 }
1150
1151 reply.ret_code = be32toh(reply.ret_code);
1152
1153 /* Return session id or negative ret code. */
1154 if (reply.ret_code != LTTNG_OK) {
1155 ret = -1;
1156 ERR("Relayd rotate stream replied error %d", reply.ret_code);
1157 } else {
1158 /* Success. */
1159 ret = 0;
1160 DBG("Relayd rotated stream id %" PRIu64 " successfully", stream_id);
1161 }
1162
1163 error:
1164 free(msg);
1165 return ret;
1166 }
1167
1168 int relayd_rotate_rename(struct lttcomm_relayd_sock *rsock,
1169 const char *old_path, const char *new_path)
1170 {
1171 int ret;
1172 struct lttcomm_relayd_rotate_rename *msg = NULL;
1173 struct lttcomm_relayd_generic_reply reply;
1174 size_t old_path_length, new_path_length;
1175 size_t msg_length;
1176
1177 /* Code flow error. Safety net. */
1178 assert(rsock);
1179
1180 DBG("Relayd rename chunk %s to %s", old_path, new_path);
1181
1182 /* The two paths are sent with a '\0' delimiter between them. */
1183 old_path_length = strlen(old_path) + 1;
1184 new_path_length = strlen(new_path) + 1;
1185
1186 msg_length = sizeof(*msg) + old_path_length + new_path_length;
1187 msg = zmalloc(msg_length);
1188 if (!msg) {
1189 PERROR("zmalloc rotate-rename command message");
1190 ret = -1;
1191 goto error;
1192 }
1193
1194 assert(old_path_length <= UINT32_MAX);
1195 msg->old_path_length = htobe32(old_path_length);
1196
1197 assert(new_path_length <= UINT32_MAX);
1198 msg->new_path_length = htobe32(new_path_length);
1199
1200 strcpy(msg->paths, old_path);
1201 strcpy(msg->paths + old_path_length, new_path);
1202
1203 /* Send command */
1204 ret = send_command(rsock, RELAYD_ROTATE_RENAME, (const void *) msg,
1205 msg_length, 0);
1206 if (ret < 0) {
1207 goto error;
1208 }
1209
1210 /* Receive response */
1211 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1212 if (ret < 0) {
1213 goto error;
1214 }
1215
1216 reply.ret_code = be32toh(reply.ret_code);
1217
1218 /* Return session id or negative ret code. */
1219 if (reply.ret_code != LTTNG_OK) {
1220 ret = -1;
1221 ERR("Relayd rotate rename replied error %d", reply.ret_code);
1222 } else {
1223 /* Success */
1224 ret = 0;
1225 }
1226
1227 DBG("Relayd rotate rename completed successfully");
1228
1229 error:
1230 free(msg);
1231 return ret;
1232 }
1233
1234 int relayd_rotate_pending(struct lttcomm_relayd_sock *rsock, uint64_t chunk_id)
1235 {
1236 int ret;
1237 struct lttcomm_relayd_rotate_pending msg;
1238 struct lttcomm_relayd_rotate_pending_reply reply;
1239
1240 /* Code flow error. Safety net. */
1241 assert(rsock);
1242
1243 DBG("Querying relayd for rotate pending with chunk_id %" PRIu64,
1244 chunk_id);
1245
1246 memset(&msg, 0, sizeof(msg));
1247 msg.chunk_id = htobe64(chunk_id);
1248
1249 /* Send command */
1250 ret = send_command(rsock, RELAYD_ROTATE_PENDING, (void *) &msg,
1251 sizeof(msg), 0);
1252 if (ret < 0) {
1253 goto error;
1254 }
1255
1256 /* Receive response */
1257 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1258 if (ret < 0) {
1259 goto error;
1260 }
1261
1262 reply.generic.ret_code = be32toh(reply.generic.ret_code);
1263
1264 /* Return session id or negative ret code. */
1265 if (reply.generic.ret_code != LTTNG_OK) {
1266 ret = -reply.generic.ret_code;
1267 ERR("Relayd rotate pending replied with error %d", ret);
1268 goto error;
1269 } else {
1270 /* No error, just rotate pending state */
1271 if (reply.is_pending == 0 || reply.is_pending == 1) {
1272 ret = reply.is_pending;
1273 DBG("Relayd rotate pending command completed successfully with result \"%s\"",
1274 ret ? "rotation pending" : "rotation NOT pending");
1275 } else {
1276 ret = -LTTNG_ERR_UNK;
1277 }
1278 }
1279
1280 error:
1281 return ret;
1282 }
1283
1284 int relayd_mkdir(struct lttcomm_relayd_sock *rsock, const char *path)
1285 {
1286 int ret;
1287 struct lttcomm_relayd_mkdir *msg;
1288 struct lttcomm_relayd_generic_reply reply;
1289 size_t len;
1290
1291 /* Code flow error. Safety net. */
1292 assert(rsock);
1293
1294 DBG("Relayd mkdir path %s", path);
1295
1296 len = strlen(path) + 1;
1297 msg = zmalloc(sizeof(msg->length) + len);
1298 if (!msg) {
1299 PERROR("Alloc mkdir msg");
1300 ret = -1;
1301 goto error;
1302 }
1303 msg->length = htobe32((uint32_t) len);
1304
1305 if (lttng_strncpy(msg->path, path, len)) {
1306 ret = -1;
1307 goto error;
1308 }
1309
1310 /* Send command */
1311 ret = send_command(rsock, RELAYD_MKDIR, (void *) msg,
1312 sizeof(msg->length) + len, 0);
1313 if (ret < 0) {
1314 goto error;
1315 }
1316
1317 /* Receive response */
1318 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1319 if (ret < 0) {
1320 goto error;
1321 }
1322
1323 reply.ret_code = be32toh(reply.ret_code);
1324
1325 /* Return session id or negative ret code. */
1326 if (reply.ret_code != LTTNG_OK) {
1327 ret = -1;
1328 ERR("Relayd mkdir replied error %d", reply.ret_code);
1329 } else {
1330 /* Success */
1331 ret = 0;
1332 }
1333
1334 DBG("Relayd mkdir completed successfully");
1335
1336 error:
1337 free(msg);
1338 return ret;
1339 }
This page took 0.092967 seconds and 4 git commands to generate.