Save/load: support session trace format
[deliverable/lttng-tools.git] / src / common / relayd / relayd.cpp
1 /*
2 * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
3 *
4 * SPDX-License-Identifier: GPL-2.0-only
5 *
6 */
7
8 #define _LGPL_SOURCE
9 #include <stdio.h>
10 #include <stdlib.h>
11 #include <string.h>
12 #include <sys/stat.h>
13 #include <inttypes.h>
14
15 #include <bitset>
16 #include <common/common.hpp>
17 #include <common/compat/endian.hpp>
18 #include <common/compat/string.hpp>
19 #include <common/defaults.hpp>
20 #include <common/index/ctf-index.hpp>
21 #include <common/sessiond-comm/relayd.hpp>
22 #include <common/string-utils/format.hpp>
23 #include <common/trace-chunk.hpp>
24
25 #include "relayd.hpp"
26
27 static
28 bool relayd_supports_chunks(const struct lttcomm_relayd_sock *sock)
29 {
30 if (sock->major > 2) {
31 return true;
32 } else if (sock->major == 2 && sock->minor >= 11) {
33 return true;
34 }
35 return false;
36 }
37
38 static
39 bool relayd_supports_get_configuration(const struct lttcomm_relayd_sock *sock)
40 {
41 if (sock->major > 2) {
42 return true;
43 } else if (sock->major == 2 && sock->minor >= 12) {
44 return true;
45 }
46 return false;
47 }
48
49 static bool relayd_supports_get_configuration_trace_format(const struct lttcomm_relayd_sock *sock)
50 {
51 if (sock->major > 2) {
52 return true;
53 } else if (sock->major == 2 && sock->minor >= 15) {
54 return true;
55 }
56 return false;
57 }
58
59 /*
60 * Send command. Fill up the header and append the data.
61 */
62 static int send_command(struct lttcomm_relayd_sock *rsock,
63 enum lttcomm_relayd_command cmd, const void *data, size_t size,
64 int flags)
65 {
66 int ret;
67 struct lttcomm_relayd_hdr header;
68 char *buf;
69 uint64_t buf_size = sizeof(header);
70
71 if (rsock->sock.fd < 0) {
72 return -ECONNRESET;
73 }
74
75 if (data) {
76 buf_size += size;
77 }
78
79 buf = calloc<char>(buf_size);
80 if (buf == NULL) {
81 PERROR("zmalloc relayd send command buf");
82 ret = -ENOMEM;
83 goto alloc_error;
84 }
85
86 memset(&header, 0, sizeof(header));
87 header.cmd = htobe32(cmd);
88 header.data_size = htobe64(size);
89
90 /* Zeroed for now since not used. */
91 header.cmd_version = 0;
92 header.circuit_id = 0;
93
94 /* Prepare buffer to send. */
95 memcpy(buf, &header, sizeof(header));
96 if (data) {
97 memcpy(buf + sizeof(header), data, size);
98 }
99
100 DBG3("Relayd sending command %s of size %" PRIu64,
101 lttcomm_relayd_command_str(cmd), buf_size);
102 ret = rsock->sock.ops->sendmsg(&rsock->sock, buf, buf_size, flags);
103 if (ret < 0) {
104 PERROR("Failed to send command %s of size %" PRIu64,
105 lttcomm_relayd_command_str(cmd), buf_size);
106 ret = -errno;
107 goto error;
108 }
109 error:
110 free(buf);
111 alloc_error:
112 return ret;
113 }
114
115 /*
116 * Receive reply data on socket. This MUST be call after send_command or else
117 * could result in unexpected behavior(s).
118 */
119 static int recv_reply(struct lttcomm_relayd_sock *rsock, void *data, size_t size)
120 {
121 int ret;
122
123 if (rsock->sock.fd < 0) {
124 return -ECONNRESET;
125 }
126
127 DBG3("Relayd waiting for reply of size %zu", size);
128
129 ret = rsock->sock.ops->recvmsg(&rsock->sock, data, size, 0);
130 if (ret <= 0 || ret != size) {
131 if (ret == 0) {
132 /* Orderly shutdown. */
133 DBG("Socket %d has performed an orderly shutdown", rsock->sock.fd);
134 } else {
135 DBG("Receiving reply failed on sock %d for size %zu with ret %d",
136 rsock->sock.fd, size, ret);
137 }
138 /* Always return -1 here and the caller can use errno. */
139 ret = -1;
140 goto error;
141 }
142
143 error:
144 return ret;
145 }
146
147 /*
148 * Starting from 2.11, RELAYD_CREATE_SESSION payload (session_name,
149 * hostname, and base_path) have no length restriction on the sender side.
150 * Length for both payloads is stored in the msg struct. A new dynamic size
151 * payload size is introduced.
152 */
153 static int relayd_create_session_2_11(struct lttcomm_relayd_sock *rsock,
154 const char *session_name, const char *hostname,
155 const char *base_path, int session_live_timer,
156 unsigned int snapshot, uint64_t sessiond_session_id,
157 const lttng_uuid& sessiond_uuid, const uint64_t *current_chunk_id,
158 time_t creation_time, bool session_name_contains_creation_time,
159 struct lttcomm_relayd_create_session_reply_2_11 *reply,
160 char *output_path)
161 {
162 int ret;
163 struct lttcomm_relayd_create_session_2_11 *msg = NULL;
164 size_t session_name_len;
165 size_t hostname_len;
166 size_t base_path_len;
167 size_t msg_length;
168 char *dst;
169
170 if (!base_path) {
171 base_path = "";
172 }
173 /* The three names are sent with a '\0' delimiter between them. */
174 session_name_len = strlen(session_name) + 1;
175 hostname_len = strlen(hostname) + 1;
176 base_path_len = strlen(base_path) + 1;
177
178 msg_length = sizeof(*msg) + session_name_len + hostname_len + base_path_len;
179 msg = zmalloc<lttcomm_relayd_create_session_2_11>(msg_length);
180 if (!msg) {
181 PERROR("zmalloc create_session_2_11 command message");
182 ret = -1;
183 goto error;
184 }
185
186 LTTNG_ASSERT(session_name_len <= UINT32_MAX);
187 msg->session_name_len = htobe32(session_name_len);
188
189 LTTNG_ASSERT(hostname_len <= UINT32_MAX);
190 msg->hostname_len = htobe32(hostname_len);
191
192 LTTNG_ASSERT(base_path_len <= UINT32_MAX);
193 msg->base_path_len = htobe32(base_path_len);
194
195 dst = msg->names;
196 if (lttng_strncpy(dst, session_name, session_name_len)) {
197 ret = -1;
198 goto error;
199 }
200 dst += session_name_len;
201 if (lttng_strncpy(dst, hostname, hostname_len)) {
202 ret = -1;
203 goto error;
204 }
205 dst += hostname_len;
206 if (lttng_strncpy(dst, base_path, base_path_len)) {
207 ret = -1;
208 goto error;
209 }
210
211 msg->live_timer = htobe32(session_live_timer);
212 msg->snapshot = !!snapshot;
213
214 std::copy(sessiond_uuid.begin(), sessiond_uuid.end(), msg->sessiond_uuid);
215 msg->session_id = htobe64(sessiond_session_id);
216 msg->session_name_contains_creation_time = session_name_contains_creation_time;
217 if (current_chunk_id) {
218 LTTNG_OPTIONAL_SET(&msg->current_chunk_id,
219 htobe64(*current_chunk_id));
220 }
221
222 msg->creation_time = htobe64((uint64_t) creation_time);
223
224 /* Send command */
225 ret = send_command(rsock, RELAYD_CREATE_SESSION, msg, msg_length, 0);
226 if (ret < 0) {
227 goto error;
228 }
229 /* Receive response */
230 ret = recv_reply(rsock, reply, sizeof(*reply));
231 if (ret < 0) {
232 goto error;
233 }
234 reply->generic.session_id = be64toh(reply->generic.session_id);
235 reply->generic.ret_code = be32toh(reply->generic.ret_code);
236 reply->output_path_length = be32toh(reply->output_path_length);
237 if (reply->output_path_length >= LTTNG_PATH_MAX) {
238 ERR("Invalid session output path length in reply (%" PRIu32 " bytes) exceeds maximal allowed length (%d bytes)",
239 reply->output_path_length, LTTNG_PATH_MAX);
240 ret = -1;
241 goto error;
242 }
243 ret = recv_reply(rsock, output_path, reply->output_path_length);
244 if (ret < 0) {
245 goto error;
246 }
247 error:
248 free(msg);
249 return ret;
250 }
251 /*
252 * From 2.4 to 2.10, RELAYD_CREATE_SESSION takes additional parameters to
253 * support the live reading capability.
254 */
255 static int relayd_create_session_2_4(struct lttcomm_relayd_sock *rsock,
256 const char *session_name, const char *hostname,
257 int session_live_timer, unsigned int snapshot,
258 struct lttcomm_relayd_status_session *reply)
259 {
260 int ret;
261 struct lttcomm_relayd_create_session_2_4 msg;
262
263 if (lttng_strncpy(msg.session_name, session_name,
264 sizeof(msg.session_name))) {
265 ret = -1;
266 goto error;
267 }
268 if (lttng_strncpy(msg.hostname, hostname, sizeof(msg.hostname))) {
269 ret = -1;
270 goto error;
271 }
272 msg.live_timer = htobe32(session_live_timer);
273 msg.snapshot = htobe32(snapshot);
274
275 /* Send command */
276 ret = send_command(rsock, RELAYD_CREATE_SESSION, &msg, sizeof(msg), 0);
277 if (ret < 0) {
278 goto error;
279 }
280
281 /* Receive response */
282 ret = recv_reply(rsock, reply, sizeof(*reply));
283 if (ret < 0) {
284 goto error;
285 }
286 reply->session_id = be64toh(reply->session_id);
287 reply->ret_code = be32toh(reply->ret_code);
288 error:
289 return ret;
290 }
291
292 /*
293 * RELAYD_CREATE_SESSION from 2.1 to 2.3.
294 */
295 static int relayd_create_session_2_1(struct lttcomm_relayd_sock *rsock,
296 struct lttcomm_relayd_status_session *reply)
297 {
298 int ret;
299
300 /* Send command */
301 ret = send_command(rsock, RELAYD_CREATE_SESSION, NULL, 0, 0);
302 if (ret < 0) {
303 goto error;
304 }
305
306 /* Receive response */
307 ret = recv_reply(rsock, reply, sizeof(*reply));
308 if (ret < 0) {
309 goto error;
310 }
311 reply->session_id = be64toh(reply->session_id);
312 reply->ret_code = be32toh(reply->ret_code);
313 error:
314 return ret;
315 }
316
317 /*
318 * Send a RELAYD_CREATE_SESSION command to the relayd with the given socket and
319 * set session_id of the relayd if we have a successful reply from the relayd.
320 *
321 * On success, return 0 else a negative value which is either an errno error or
322 * a lttng error code from the relayd.
323 */
324 int relayd_create_session(struct lttcomm_relayd_sock *rsock,
325 uint64_t *relayd_session_id,
326 const char *session_name, const char *hostname,
327 const char *base_path, int session_live_timer,
328 unsigned int snapshot, uint64_t sessiond_session_id,
329 const lttng_uuid& sessiond_uuid,
330 const uint64_t *current_chunk_id,
331 time_t creation_time, bool session_name_contains_creation_time,
332 char *output_path)
333 {
334 int ret;
335 struct lttcomm_relayd_create_session_reply_2_11 reply = {};
336
337 LTTNG_ASSERT(rsock);
338 LTTNG_ASSERT(relayd_session_id);
339
340 DBG("Relayd create session");
341
342 if (rsock->minor < 4) {
343 /* From 2.1 to 2.3 */
344 ret = relayd_create_session_2_1(rsock, &reply.generic);
345 } else if (rsock->minor >= 4 && rsock->minor < 11) {
346 /* From 2.4 to 2.10 */
347 ret = relayd_create_session_2_4(rsock, session_name,
348 hostname, session_live_timer, snapshot,
349 &reply.generic);
350 } else {
351 /* From 2.11 to ... */
352 ret = relayd_create_session_2_11(rsock, session_name,
353 hostname, base_path, session_live_timer, snapshot,
354 sessiond_session_id, sessiond_uuid,
355 current_chunk_id, creation_time,
356 session_name_contains_creation_time,
357 &reply, output_path);
358 }
359
360 if (ret < 0) {
361 goto error;
362 }
363
364 /* Return session id or negative ret code. */
365 if (reply.generic.ret_code != LTTNG_OK) {
366 ret = -1;
367 ERR("Relayd create session replied error %d",
368 reply.generic.ret_code);
369 goto error;
370 } else {
371 ret = 0;
372 *relayd_session_id = reply.generic.session_id;
373 }
374
375 DBG("Relayd session created with id %" PRIu64, reply.generic.session_id);
376
377 error:
378 return ret;
379 }
380
381 static int relayd_add_stream_2_1(struct lttcomm_relayd_sock *rsock,
382 const char *channel_name, const char *pathname)
383 {
384 int ret;
385 struct lttcomm_relayd_add_stream msg;
386
387 memset(&msg, 0, sizeof(msg));
388 if (lttng_strncpy(msg.channel_name, channel_name,
389 sizeof(msg.channel_name))) {
390 ret = -1;
391 goto error;
392 }
393
394 if (lttng_strncpy(msg.pathname, pathname,
395 sizeof(msg.pathname))) {
396 ret = -1;
397 goto error;
398 }
399
400 /* Send command */
401 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
402 if (ret < 0) {
403 ret = -1;
404 goto error;
405 }
406 ret = 0;
407 error:
408 return ret;
409 }
410
411 static int relayd_add_stream_2_2(struct lttcomm_relayd_sock *rsock,
412 const char *channel_name, const char *pathname,
413 uint64_t tracefile_size, uint64_t tracefile_count)
414 {
415 int ret;
416 struct lttcomm_relayd_add_stream_2_2 msg;
417
418 memset(&msg, 0, sizeof(msg));
419 /* Compat with relayd 2.2 to 2.10 */
420 if (lttng_strncpy(msg.channel_name, channel_name,
421 sizeof(msg.channel_name))) {
422 ret = -1;
423 goto error;
424 }
425 if (lttng_strncpy(msg.pathname, pathname,
426 sizeof(msg.pathname))) {
427 ret = -1;
428 goto error;
429 }
430 msg.tracefile_size = htobe64(tracefile_size);
431 msg.tracefile_count = htobe64(tracefile_count);
432
433 /* Send command */
434 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
435 if (ret < 0) {
436 goto error;
437 }
438 ret = 0;
439 error:
440 return ret;
441 }
442
443 static int relayd_add_stream_2_11(struct lttcomm_relayd_sock *rsock,
444 const char *channel_name, const char *pathname,
445 uint64_t tracefile_size, uint64_t tracefile_count,
446 uint64_t trace_archive_id)
447 {
448 int ret;
449 struct lttcomm_relayd_add_stream_2_11 *msg = NULL;
450 size_t channel_name_len;
451 size_t pathname_len;
452 size_t msg_length;
453
454 /* The two names are sent with a '\0' delimiter between them. */
455 channel_name_len = strlen(channel_name) + 1;
456 pathname_len = strlen(pathname) + 1;
457
458 msg_length = sizeof(*msg) + channel_name_len + pathname_len;
459 msg = zmalloc<lttcomm_relayd_add_stream_2_11>(msg_length);
460 if (!msg) {
461 PERROR("zmalloc add_stream_2_11 command message");
462 ret = -1;
463 goto error;
464 }
465
466 LTTNG_ASSERT(channel_name_len <= UINT32_MAX);
467 msg->channel_name_len = htobe32(channel_name_len);
468
469 LTTNG_ASSERT(pathname_len <= UINT32_MAX);
470 msg->pathname_len = htobe32(pathname_len);
471
472 if (lttng_strncpy(msg->names, channel_name, channel_name_len)) {
473 ret = -1;
474 goto error;
475 }
476 if (lttng_strncpy(msg->names + channel_name_len, pathname, pathname_len)) {
477 ret = -1;
478 goto error;
479 }
480
481 msg->tracefile_size = htobe64(tracefile_size);
482 msg->tracefile_count = htobe64(tracefile_count);
483 msg->trace_chunk_id = htobe64(trace_archive_id);
484
485 /* Send command */
486 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) msg, msg_length, 0);
487 if (ret < 0) {
488 goto error;
489 }
490 ret = 0;
491 error:
492 free(msg);
493 return ret;
494 }
495
496 /*
497 * Add stream on the relayd and assign stream handle to the stream_id argument.
498 *
499 * Chunks are not supported by relayd prior to 2.11, but are used to
500 * internally between session daemon and consumer daemon to keep track
501 * of the channel and stream output path.
502 *
503 * On success return 0 else return ret_code negative value.
504 */
505 int relayd_add_stream(struct lttcomm_relayd_sock *rsock, const char *channel_name,
506 const char *domain_name, const char *_pathname, uint64_t *stream_id,
507 uint64_t tracefile_size, uint64_t tracefile_count,
508 struct lttng_trace_chunk *trace_chunk)
509 {
510 int ret;
511 struct lttcomm_relayd_status_stream reply;
512 char pathname[RELAYD_COMM_LTTNG_PATH_MAX];
513
514 /* Code flow error. Safety net. */
515 LTTNG_ASSERT(rsock);
516 LTTNG_ASSERT(channel_name);
517 LTTNG_ASSERT(domain_name);
518 LTTNG_ASSERT(_pathname);
519 LTTNG_ASSERT(trace_chunk);
520
521 DBG("Relayd adding stream for channel name %s", channel_name);
522
523 /* Compat with relayd 2.1 */
524 if (rsock->minor == 1) {
525 /* For 2.1 */
526 ret = relayd_add_stream_2_1(rsock, channel_name, _pathname);
527
528 } else if (rsock->minor > 1 && rsock->minor < 11) {
529 /* From 2.2 to 2.10 */
530 ret = relayd_add_stream_2_2(rsock, channel_name, _pathname,
531 tracefile_size, tracefile_count);
532 } else {
533 const char *separator;
534 enum lttng_trace_chunk_status chunk_status;
535 uint64_t chunk_id;
536
537 if (_pathname[0] == '\0') {
538 separator = "";
539 } else {
540 separator = "/";
541 }
542
543 ret = snprintf(pathname, RELAYD_COMM_LTTNG_PATH_MAX, "%s%s%s",
544 domain_name, separator, _pathname);
545 if (ret <= 0 || ret >= RELAYD_COMM_LTTNG_PATH_MAX) {
546 ERR("Failed to format stream path: %s",
547 ret <= 0 ? "formatting error" :
548 "path exceeds maximal allowed length");
549 ret = -1;
550 goto error;
551 }
552
553 chunk_status = lttng_trace_chunk_get_id(trace_chunk,
554 &chunk_id);
555 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
556
557 /* From 2.11 to ...*/
558 ret = relayd_add_stream_2_11(rsock, channel_name, pathname,
559 tracefile_size, tracefile_count,
560 chunk_id);
561 }
562
563 if (ret) {
564 ret = -1;
565 goto error;
566 }
567
568 /* Waiting for reply */
569 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
570 if (ret < 0) {
571 goto error;
572 }
573
574 /* Back to host bytes order. */
575 reply.handle = be64toh(reply.handle);
576 reply.ret_code = be32toh(reply.ret_code);
577
578 /* Return session id or negative ret code. */
579 if (reply.ret_code != LTTNG_OK) {
580 ret = -1;
581 ERR("Relayd add stream replied error %d", reply.ret_code);
582 } else {
583 /* Success */
584 ret = 0;
585 *stream_id = reply.handle;
586 }
587
588 DBG("Relayd stream added successfully with handle %" PRIu64,
589 reply.handle);
590
591 error:
592 return ret;
593 }
594
595 /*
596 * Inform the relay that all the streams for the current channel has been sent.
597 *
598 * On success return 0 else return ret_code negative value.
599 */
600 int relayd_streams_sent(struct lttcomm_relayd_sock *rsock)
601 {
602 int ret;
603 struct lttcomm_relayd_generic_reply reply;
604
605 /* Code flow error. Safety net. */
606 LTTNG_ASSERT(rsock);
607
608 DBG("Relayd sending streams sent.");
609
610 /* This feature was introduced in 2.4, ignore it for earlier versions. */
611 if (rsock->minor < 4) {
612 ret = 0;
613 goto end;
614 }
615
616 /* Send command */
617 ret = send_command(rsock, RELAYD_STREAMS_SENT, NULL, 0, 0);
618 if (ret < 0) {
619 goto error;
620 }
621
622 /* Waiting for reply */
623 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
624 if (ret < 0) {
625 goto error;
626 }
627
628 /* Back to host bytes order. */
629 reply.ret_code = be32toh(reply.ret_code);
630
631 /* Return session id or negative ret code. */
632 if (reply.ret_code != LTTNG_OK) {
633 ret = -1;
634 ERR("Relayd streams sent replied error %d", reply.ret_code);
635 goto error;
636 } else {
637 /* Success */
638 ret = 0;
639 }
640
641 DBG("Relayd streams sent success");
642
643 error:
644 end:
645 return ret;
646 }
647
648 /*
649 * Check version numbers on the relayd.
650 * If major versions are compatible, we assign minor_to_use to the
651 * minor version of the procotol we are going to use for this session.
652 *
653 * Return 0 if the two daemons are compatible, LTTNG_ERR_RELAYD_VERSION_FAIL
654 * otherwise, or a negative value on network errors.
655 */
656 int relayd_version_check(struct lttcomm_relayd_sock *rsock)
657 {
658 int ret;
659 struct lttcomm_relayd_version msg;
660
661 /* Code flow error. Safety net. */
662 LTTNG_ASSERT(rsock);
663
664 DBG("Relayd version check for major.minor %u.%u", rsock->major,
665 rsock->minor);
666
667 memset(&msg, 0, sizeof(msg));
668 /* Prepare network byte order before transmission. */
669 msg.major = htobe32(rsock->major);
670 msg.minor = htobe32(rsock->minor);
671
672 /* Send command */
673 ret = send_command(rsock, RELAYD_VERSION, (void *) &msg, sizeof(msg), 0);
674 if (ret < 0) {
675 goto error;
676 }
677
678 /* Receive response */
679 ret = recv_reply(rsock, (void *) &msg, sizeof(msg));
680 if (ret < 0) {
681 goto error;
682 }
683
684 /* Set back to host bytes order */
685 msg.major = be32toh(msg.major);
686 msg.minor = be32toh(msg.minor);
687
688 /*
689 * Only validate the major version. If the other side is higher,
690 * communication is not possible. Only major version equal can talk to each
691 * other. If the minor version differs, the lowest version is used by both
692 * sides.
693 */
694 if (msg.major != rsock->major) {
695 /* Not compatible */
696 ret = LTTNG_ERR_RELAYD_VERSION_FAIL;
697 DBG2("Relayd version is NOT compatible. Relayd version %u != %u (us)",
698 msg.major, rsock->major);
699 goto error;
700 }
701
702 /*
703 * If the relayd's minor version is higher, it will adapt to our version so
704 * we can continue to use the latest relayd communication data structure.
705 * If the received minor version is higher, the relayd should adapt to us.
706 */
707 if (rsock->minor > msg.minor) {
708 rsock->minor = msg.minor;
709 }
710
711 /* Version number compatible */
712 DBG2("Relayd version is compatible, using protocol version %u.%u",
713 rsock->major, rsock->minor);
714 ret = 0;
715
716 error:
717 return ret;
718 }
719
720 /*
721 * Add stream on the relayd and assign stream handle to the stream_id argument.
722 *
723 * On success return 0 else return ret_code negative value.
724 */
725 int relayd_send_metadata(struct lttcomm_relayd_sock *rsock, size_t len)
726 {
727 int ret;
728
729 /* Code flow error. Safety net. */
730 LTTNG_ASSERT(rsock);
731
732 DBG("Relayd sending metadata of size %zu", len);
733
734 /* Send command */
735 ret = send_command(rsock, RELAYD_SEND_METADATA, NULL, len, 0);
736 if (ret < 0) {
737 goto error;
738 }
739
740 DBG2("Relayd metadata added successfully");
741
742 /*
743 * After that call, the metadata data MUST be sent to the relayd so the
744 * receive size on the other end matches the len of the metadata packet
745 * header. This is why we don't wait for a reply here.
746 */
747
748 error:
749 return ret;
750 }
751
752 /*
753 * Connect to relay daemon with an allocated lttcomm_relayd_sock.
754 */
755 int relayd_connect(struct lttcomm_relayd_sock *rsock)
756 {
757 /* Code flow error. Safety net. */
758 LTTNG_ASSERT(rsock);
759
760 if (!rsock->sock.ops) {
761 /*
762 * Attempting a connect on a non-initialized socket.
763 */
764 return -ECONNRESET;
765 }
766
767 DBG3("Relayd connect ...");
768
769 return rsock->sock.ops->connect(&rsock->sock);
770 }
771
772 /*
773 * Close relayd socket with an allocated lttcomm_relayd_sock.
774 *
775 * If no socket operations are found, simply return 0 meaning that everything
776 * is fine. Without operations, the socket can not possibly be opened or used.
777 * This is possible if the socket was allocated but not created. However, the
778 * caller could simply use it to store a valid file descriptor for instance
779 * passed over a Unix socket and call this to cleanup but still without a valid
780 * ops pointer.
781 *
782 * Return the close returned value. On error, a negative value is usually
783 * returned back from close(2).
784 */
785 int relayd_close(struct lttcomm_relayd_sock *rsock)
786 {
787 int ret;
788
789 /* Code flow error. Safety net. */
790 LTTNG_ASSERT(rsock);
791
792 /* An invalid fd is fine, return success. */
793 if (rsock->sock.fd < 0) {
794 ret = 0;
795 goto end;
796 }
797
798 DBG3("Relayd closing socket %d", rsock->sock.fd);
799
800 if (rsock->sock.ops) {
801 ret = rsock->sock.ops->close(&rsock->sock);
802 } else {
803 /* Default call if no specific ops found. */
804 ret = close(rsock->sock.fd);
805 if (ret < 0) {
806 PERROR("relayd_close default close");
807 }
808 }
809 rsock->sock.fd = -1;
810
811 end:
812 return ret;
813 }
814
815 /*
816 * Send data header structure to the relayd.
817 */
818 int relayd_send_data_hdr(struct lttcomm_relayd_sock *rsock,
819 struct lttcomm_relayd_data_hdr *hdr, size_t size)
820 {
821 int ret;
822
823 /* Code flow error. Safety net. */
824 LTTNG_ASSERT(rsock);
825 LTTNG_ASSERT(hdr);
826
827 if (rsock->sock.fd < 0) {
828 return -ECONNRESET;
829 }
830
831 DBG3("Relayd sending data header of size %zu", size);
832
833 /* Again, safety net */
834 if (size == 0) {
835 size = sizeof(struct lttcomm_relayd_data_hdr);
836 }
837
838 /* Only send data header. */
839 ret = rsock->sock.ops->sendmsg(&rsock->sock, hdr, size, 0);
840 if (ret < 0) {
841 ret = -errno;
842 goto error;
843 }
844
845 /*
846 * The data MUST be sent right after that command for the receive on the
847 * other end to match the size in the header.
848 */
849
850 error:
851 return ret;
852 }
853
854 /*
855 * Send close stream command to the relayd.
856 */
857 int relayd_send_close_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
858 uint64_t last_net_seq_num)
859 {
860 int ret;
861 struct lttcomm_relayd_close_stream msg;
862 struct lttcomm_relayd_generic_reply reply;
863
864 /* Code flow error. Safety net. */
865 LTTNG_ASSERT(rsock);
866
867 DBG("Relayd closing stream id %" PRIu64, stream_id);
868
869 memset(&msg, 0, sizeof(msg));
870 msg.stream_id = htobe64(stream_id);
871 msg.last_net_seq_num = htobe64(last_net_seq_num);
872
873 /* Send command */
874 ret = send_command(rsock, RELAYD_CLOSE_STREAM, (void *) &msg, sizeof(msg), 0);
875 if (ret < 0) {
876 goto error;
877 }
878
879 /* Receive response */
880 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
881 if (ret < 0) {
882 goto error;
883 }
884
885 reply.ret_code = be32toh(reply.ret_code);
886
887 /* Return session id or negative ret code. */
888 if (reply.ret_code != LTTNG_OK) {
889 ret = -1;
890 ERR("Relayd close stream replied error %d", reply.ret_code);
891 } else {
892 /* Success */
893 ret = 0;
894 }
895
896 DBG("Relayd close stream id %" PRIu64 " successfully", stream_id);
897
898 error:
899 return ret;
900 }
901
902 /*
903 * Check for data availability for a given stream id.
904 *
905 * Return 0 if NOT pending, 1 if so and a negative value on error.
906 */
907 int relayd_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
908 uint64_t last_net_seq_num)
909 {
910 int ret;
911 struct lttcomm_relayd_data_pending msg;
912 struct lttcomm_relayd_generic_reply reply;
913
914 /* Code flow error. Safety net. */
915 LTTNG_ASSERT(rsock);
916
917 DBG("Relayd data pending for stream id %" PRIu64, stream_id);
918
919 memset(&msg, 0, sizeof(msg));
920 msg.stream_id = htobe64(stream_id);
921 msg.last_net_seq_num = htobe64(last_net_seq_num);
922
923 /* Send command */
924 ret = send_command(rsock, RELAYD_DATA_PENDING, (void *) &msg,
925 sizeof(msg), 0);
926 if (ret < 0) {
927 goto error;
928 }
929
930 /* Receive response */
931 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
932 if (ret < 0) {
933 goto error;
934 }
935
936 reply.ret_code = be32toh(reply.ret_code);
937
938 /* Return session id or negative ret code. */
939 if (reply.ret_code >= LTTNG_OK) {
940 ERR("Relayd data pending replied error %d", reply.ret_code);
941 }
942
943 /* At this point, the ret code is either 1 or 0 */
944 ret = reply.ret_code;
945
946 DBG("Relayd data is %s pending for stream id %" PRIu64,
947 ret == 1 ? "" : "NOT", stream_id);
948
949 error:
950 return ret;
951 }
952
953 /*
954 * Check on the relayd side for a quiescent state on the control socket.
955 */
956 int relayd_quiescent_control(struct lttcomm_relayd_sock *rsock,
957 uint64_t metadata_stream_id)
958 {
959 int ret;
960 struct lttcomm_relayd_quiescent_control msg;
961 struct lttcomm_relayd_generic_reply reply;
962
963 /* Code flow error. Safety net. */
964 LTTNG_ASSERT(rsock);
965
966 DBG("Relayd checking quiescent control state");
967
968 memset(&msg, 0, sizeof(msg));
969 msg.stream_id = htobe64(metadata_stream_id);
970
971 /* Send command */
972 ret = send_command(rsock, RELAYD_QUIESCENT_CONTROL, &msg, sizeof(msg), 0);
973 if (ret < 0) {
974 goto error;
975 }
976
977 /* Receive response */
978 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
979 if (ret < 0) {
980 goto error;
981 }
982
983 reply.ret_code = be32toh(reply.ret_code);
984
985 /* Return session id or negative ret code. */
986 if (reply.ret_code != LTTNG_OK) {
987 ret = -1;
988 ERR("Relayd quiescent control replied error %d", reply.ret_code);
989 goto error;
990 }
991
992 /* Control socket is quiescent */
993 return 0;
994
995 error:
996 return ret;
997 }
998
999 /*
1000 * Begin a data pending command for a specific session id.
1001 */
1002 int relayd_begin_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id)
1003 {
1004 int ret;
1005 struct lttcomm_relayd_begin_data_pending msg;
1006 struct lttcomm_relayd_generic_reply reply;
1007
1008 /* Code flow error. Safety net. */
1009 LTTNG_ASSERT(rsock);
1010
1011 DBG("Relayd begin data pending");
1012
1013 memset(&msg, 0, sizeof(msg));
1014 msg.session_id = htobe64(id);
1015
1016 /* Send command */
1017 ret = send_command(rsock, RELAYD_BEGIN_DATA_PENDING, &msg, sizeof(msg), 0);
1018 if (ret < 0) {
1019 goto error;
1020 }
1021
1022 /* Receive response */
1023 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1024 if (ret < 0) {
1025 goto error;
1026 }
1027
1028 reply.ret_code = be32toh(reply.ret_code);
1029
1030 /* Return session id or negative ret code. */
1031 if (reply.ret_code != LTTNG_OK) {
1032 ret = -1;
1033 ERR("Relayd begin data pending replied error %d", reply.ret_code);
1034 goto error;
1035 }
1036
1037 return 0;
1038
1039 error:
1040 return ret;
1041 }
1042
1043 /*
1044 * End a data pending command for a specific session id.
1045 *
1046 * Return 0 on success and set is_data_inflight to 0 if no data is being
1047 * streamed or 1 if it is the case.
1048 */
1049 int relayd_end_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id,
1050 unsigned int *is_data_inflight)
1051 {
1052 int ret, recv_ret;
1053 struct lttcomm_relayd_end_data_pending msg;
1054 struct lttcomm_relayd_generic_reply reply;
1055
1056 /* Code flow error. Safety net. */
1057 LTTNG_ASSERT(rsock);
1058
1059 DBG("Relayd end data pending");
1060
1061 memset(&msg, 0, sizeof(msg));
1062 msg.session_id = htobe64(id);
1063
1064 /* Send command */
1065 ret = send_command(rsock, RELAYD_END_DATA_PENDING, &msg, sizeof(msg), 0);
1066 if (ret < 0) {
1067 goto error;
1068 }
1069
1070 /* Receive response */
1071 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1072 if (ret < 0) {
1073 goto error;
1074 }
1075
1076 recv_ret = be32toh(reply.ret_code);
1077 if (recv_ret < 0) {
1078 ret = recv_ret;
1079 goto error;
1080 }
1081
1082 *is_data_inflight = recv_ret;
1083
1084 DBG("Relayd end data pending is data inflight: %d", recv_ret);
1085
1086 return 0;
1087
1088 error:
1089 return ret;
1090 }
1091
1092 /*
1093 * Send index to the relayd.
1094 */
1095 int relayd_send_index(struct lttcomm_relayd_sock *rsock,
1096 struct ctf_packet_index *index, uint64_t relay_stream_id,
1097 uint64_t net_seq_num)
1098 {
1099 int ret;
1100 struct lttcomm_relayd_index msg;
1101 struct lttcomm_relayd_generic_reply reply;
1102
1103 /* Code flow error. Safety net. */
1104 LTTNG_ASSERT(rsock);
1105
1106 if (rsock->minor < 4) {
1107 DBG("Not sending indexes before protocol 2.4");
1108 ret = 0;
1109 goto error;
1110 }
1111
1112 DBG("Relayd sending index for stream ID %" PRIu64, relay_stream_id);
1113
1114 memset(&msg, 0, sizeof(msg));
1115 msg.relay_stream_id = htobe64(relay_stream_id);
1116 msg.net_seq_num = htobe64(net_seq_num);
1117
1118 /* The index is already in big endian. */
1119 msg.packet_size = index->packet_size;
1120 msg.content_size = index->content_size;
1121 msg.timestamp_begin = index->timestamp_begin;
1122 msg.timestamp_end = index->timestamp_end;
1123 msg.events_discarded = index->events_discarded;
1124 msg.stream_id = index->stream_id;
1125
1126 if (rsock->minor >= 8) {
1127 msg.stream_instance_id = index->stream_instance_id;
1128 msg.packet_seq_num = index->packet_seq_num;
1129 }
1130
1131 /* Send command */
1132 ret = send_command(rsock, RELAYD_SEND_INDEX, &msg,
1133 lttcomm_relayd_index_len(lttng_to_index_major(rsock->major,
1134 rsock->minor),
1135 lttng_to_index_minor(rsock->major, rsock->minor)),
1136 0);
1137 if (ret < 0) {
1138 goto error;
1139 }
1140
1141 /* Receive response */
1142 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1143 if (ret < 0) {
1144 goto error;
1145 }
1146
1147 reply.ret_code = be32toh(reply.ret_code);
1148
1149 /* Return session id or negative ret code. */
1150 if (reply.ret_code != LTTNG_OK) {
1151 ret = -1;
1152 ERR("Relayd send index replied error %d", reply.ret_code);
1153 } else {
1154 /* Success */
1155 ret = 0;
1156 }
1157
1158 error:
1159 return ret;
1160 }
1161
1162 /*
1163 * Ask the relay to reset the metadata trace file (regeneration).
1164 */
1165 int relayd_reset_metadata(struct lttcomm_relayd_sock *rsock,
1166 uint64_t stream_id, uint64_t version)
1167 {
1168 int ret;
1169 struct lttcomm_relayd_reset_metadata msg;
1170 struct lttcomm_relayd_generic_reply reply;
1171
1172 /* Code flow error. Safety net. */
1173 LTTNG_ASSERT(rsock);
1174
1175 /* Should have been prevented by the sessiond. */
1176 if (rsock->minor < 8) {
1177 ERR("Metadata regeneration unsupported before 2.8");
1178 ret = -1;
1179 goto error;
1180 }
1181
1182 DBG("Relayd reset metadata stream id %" PRIu64, stream_id);
1183
1184 memset(&msg, 0, sizeof(msg));
1185 msg.stream_id = htobe64(stream_id);
1186 msg.version = htobe64(version);
1187
1188 /* Send command */
1189 ret = send_command(rsock, RELAYD_RESET_METADATA, (void *) &msg, sizeof(msg), 0);
1190 if (ret < 0) {
1191 goto error;
1192 }
1193
1194 /* Receive response */
1195 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1196 if (ret < 0) {
1197 goto error;
1198 }
1199
1200 reply.ret_code = be32toh(reply.ret_code);
1201
1202 /* Return session id or negative ret code. */
1203 if (reply.ret_code != LTTNG_OK) {
1204 ret = -1;
1205 ERR("Relayd reset metadata replied error %d", reply.ret_code);
1206 } else {
1207 /* Success */
1208 ret = 0;
1209 }
1210
1211 DBG("Relayd reset metadata stream id %" PRIu64 " successfully", stream_id);
1212
1213 error:
1214 return ret;
1215 }
1216
1217 int relayd_rotate_streams(struct lttcomm_relayd_sock *sock,
1218 unsigned int stream_count, const uint64_t *new_chunk_id,
1219 const struct relayd_stream_rotation_position *positions)
1220 {
1221 int ret;
1222 unsigned int i;
1223 struct lttng_dynamic_buffer payload;
1224 struct lttcomm_relayd_generic_reply reply = {};
1225 struct lttcomm_relayd_rotate_streams msg;
1226 char new_chunk_id_buf[MAX_INT_DEC_LEN(*new_chunk_id)] = {};
1227 const char *new_chunk_id_str;
1228
1229 msg.stream_count = htobe32((uint32_t) stream_count);
1230 msg.new_chunk_id = (typeof(msg.new_chunk_id)){
1231 .is_set = !!new_chunk_id,
1232 .value = htobe64(new_chunk_id ? *new_chunk_id : 0),
1233 };
1234
1235 if (!relayd_supports_chunks(sock)) {
1236 DBG("Refusing to rotate remote streams: relayd does not support chunks");
1237 return 0;
1238 }
1239
1240 lttng_dynamic_buffer_init(&payload);
1241
1242 /* Code flow error. Safety net. */
1243 LTTNG_ASSERT(sock);
1244
1245 if (new_chunk_id) {
1246 ret = snprintf(new_chunk_id_buf, sizeof(new_chunk_id_buf),
1247 "%" PRIu64, *new_chunk_id);
1248 if (ret == -1 || ret >= sizeof(new_chunk_id_buf)) {
1249 new_chunk_id_str = "formatting error";
1250 } else {
1251 new_chunk_id_str = new_chunk_id_buf;
1252 }
1253 } else {
1254 new_chunk_id_str = "none";
1255 }
1256
1257 DBG("Preparing \"rotate streams\" command payload: new_chunk_id = %s, stream_count = %u",
1258 new_chunk_id_str, stream_count);
1259
1260 ret = lttng_dynamic_buffer_append(&payload, &msg, sizeof(msg));
1261 if (ret) {
1262 ERR("Failed to allocate \"rotate streams\" command payload");
1263 goto error;
1264 }
1265
1266 for (i = 0; i < stream_count; i++) {
1267 const struct relayd_stream_rotation_position *position =
1268 &positions[i];
1269 const struct lttcomm_relayd_stream_rotation_position comm_position = {
1270 .stream_id = htobe64(position->stream_id),
1271 .rotate_at_seq_num = htobe64(
1272 position->rotate_at_seq_num),
1273 };
1274
1275 DBG("Rotate stream %" PRIu64 " at sequence number %" PRIu64,
1276 position->stream_id,
1277 position->rotate_at_seq_num);
1278 ret = lttng_dynamic_buffer_append(&payload, &comm_position,
1279 sizeof(comm_position));
1280 if (ret) {
1281 ERR("Failed to allocate \"rotate streams\" command payload");
1282 goto error;
1283 }
1284 }
1285
1286 /* Send command. */
1287 ret = send_command(sock, RELAYD_ROTATE_STREAMS, payload.data,
1288 payload.size, 0);
1289 if (ret < 0) {
1290 ERR("Failed to send \"rotate stream\" command");
1291 goto error;
1292 }
1293
1294 /* Receive response. */
1295 ret = recv_reply(sock, &reply, sizeof(reply));
1296 if (ret < 0) {
1297 ERR("Failed to receive \"rotate streams\" command reply");
1298 goto error;
1299 }
1300
1301 reply.ret_code = be32toh(reply.ret_code);
1302 if (reply.ret_code != LTTNG_OK) {
1303 ret = -1;
1304 ERR("Relayd rotate streams replied error %d", reply.ret_code);
1305 } else {
1306 /* Success. */
1307 ret = 0;
1308 DBG("Relayd rotated streams successfully");
1309 }
1310
1311 error:
1312 lttng_dynamic_buffer_reset(&payload);
1313 return ret;
1314 }
1315
1316 int relayd_create_trace_chunk(struct lttcomm_relayd_sock *sock,
1317 struct lttng_trace_chunk *chunk)
1318 {
1319 int ret = 0;
1320 enum lttng_trace_chunk_status status;
1321 struct lttcomm_relayd_create_trace_chunk msg = {};
1322 struct lttcomm_relayd_generic_reply reply = {};
1323 struct lttng_dynamic_buffer payload;
1324 uint64_t chunk_id;
1325 time_t creation_timestamp;
1326 const char *chunk_name;
1327 size_t chunk_name_length;
1328 bool overridden_name;
1329
1330 lttng_dynamic_buffer_init(&payload);
1331
1332 if (!relayd_supports_chunks(sock)) {
1333 DBG("Refusing to create remote trace chunk: relayd does not support chunks");
1334 goto end;
1335 }
1336
1337 status = lttng_trace_chunk_get_id(chunk, &chunk_id);
1338 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1339 ret = -1;
1340 goto end;
1341 }
1342
1343 status = lttng_trace_chunk_get_creation_timestamp(
1344 chunk, &creation_timestamp);
1345 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1346 ret = -1;
1347 goto end;
1348 }
1349
1350 status = lttng_trace_chunk_get_name(
1351 chunk, &chunk_name, &overridden_name);
1352 if (status != LTTNG_TRACE_CHUNK_STATUS_OK &&
1353 status != LTTNG_TRACE_CHUNK_STATUS_NONE) {
1354 ret = -1;
1355 goto end;
1356 }
1357
1358 chunk_name_length = overridden_name ? (strlen(chunk_name) + 1) : 0;
1359 msg.chunk_id = htobe64(chunk_id);
1360 msg.creation_timestamp = htobe64((uint64_t) creation_timestamp);
1361 msg.override_name_length = htobe32((uint32_t) chunk_name_length);
1362
1363 ret = lttng_dynamic_buffer_append(&payload, &msg, sizeof(msg));
1364 if (ret) {
1365 goto end;
1366 }
1367 if (chunk_name_length) {
1368 ret = lttng_dynamic_buffer_append(
1369 &payload, chunk_name, chunk_name_length);
1370 if (ret) {
1371 goto end;
1372 }
1373 }
1374
1375 ret = send_command(sock, RELAYD_CREATE_TRACE_CHUNK, payload.data,
1376 payload.size, 0);
1377 if (ret < 0) {
1378 ERR("Failed to send trace chunk creation command to relay daemon");
1379 goto end;
1380 }
1381
1382 ret = recv_reply(sock, &reply, sizeof(reply));
1383 if (ret < 0) {
1384 ERR("Failed to receive relay daemon trace chunk creation command reply");
1385 goto end;
1386 }
1387
1388 reply.ret_code = be32toh(reply.ret_code);
1389 if (reply.ret_code != LTTNG_OK) {
1390 ret = -1;
1391 ERR("Relayd trace chunk create replied error %d",
1392 reply.ret_code);
1393 } else {
1394 ret = 0;
1395 DBG("Relayd successfully created trace chunk: chunk_id = %" PRIu64,
1396 chunk_id);
1397 }
1398
1399 end:
1400 lttng_dynamic_buffer_reset(&payload);
1401 return ret;
1402 }
1403
1404 int relayd_close_trace_chunk(struct lttcomm_relayd_sock *sock,
1405 struct lttng_trace_chunk *chunk,
1406 char *path)
1407 {
1408 int ret = 0;
1409 enum lttng_trace_chunk_status status;
1410 struct lttcomm_relayd_close_trace_chunk msg = {};
1411 struct lttcomm_relayd_close_trace_chunk_reply reply = {};
1412 uint64_t chunk_id;
1413 time_t close_timestamp;
1414 LTTNG_OPTIONAL(enum lttng_trace_chunk_command_type) close_command = {};
1415
1416 if (!relayd_supports_chunks(sock)) {
1417 DBG("Refusing to close remote trace chunk: relayd does not support chunks");
1418 goto end;
1419 }
1420
1421 status = lttng_trace_chunk_get_id(chunk, &chunk_id);
1422 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1423 ERR("Failed to get trace chunk id");
1424 ret = -1;
1425 goto end;
1426 }
1427
1428 status = lttng_trace_chunk_get_close_timestamp(chunk, &close_timestamp);
1429 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1430 ERR("Failed to get trace chunk close timestamp");
1431 ret = -1;
1432 goto end;
1433 }
1434
1435 status = lttng_trace_chunk_get_close_command(chunk,
1436 &close_command.value);
1437 switch (status) {
1438 case LTTNG_TRACE_CHUNK_STATUS_OK:
1439 close_command.is_set = 1;
1440 break;
1441 case LTTNG_TRACE_CHUNK_STATUS_NONE:
1442 break;
1443 default:
1444 ERR("Failed to get trace chunk close command");
1445 ret = -1;
1446 goto end;
1447 }
1448
1449 msg = (typeof(msg)){
1450 .chunk_id = htobe64(chunk_id),
1451 .close_timestamp = htobe64((uint64_t) close_timestamp),
1452 .close_command = {
1453 .is_set = close_command.is_set,
1454 .value = htobe32((uint32_t) close_command.value),
1455 },
1456 };
1457
1458 ret = send_command(sock, RELAYD_CLOSE_TRACE_CHUNK, &msg, sizeof(msg),
1459 0);
1460 if (ret < 0) {
1461 ERR("Failed to send trace chunk close command to relay daemon");
1462 goto end;
1463 }
1464
1465 ret = recv_reply(sock, &reply, sizeof(reply));
1466 if (ret < 0) {
1467 ERR("Failed to receive relay daemon trace chunk close command reply");
1468 goto end;
1469 }
1470
1471 reply.path_length = be32toh(reply.path_length);
1472 if (reply.path_length >= LTTNG_PATH_MAX) {
1473 ERR("Chunk path too long");
1474 ret = -1;
1475 goto end;
1476 }
1477
1478 ret = recv_reply(sock, path, reply.path_length);
1479 if (ret < 0) {
1480 ERR("Failed to receive relay daemon trace chunk close command reply");
1481 goto end;
1482 }
1483 if (path[reply.path_length - 1] != '\0') {
1484 ERR("Invalid trace chunk path returned by relay daemon (not null-terminated)");
1485 ret = -1;
1486 goto end;
1487 }
1488
1489 reply.generic.ret_code = be32toh(reply.generic.ret_code);
1490 if (reply.generic.ret_code != LTTNG_OK) {
1491 ret = -1;
1492 ERR("Relayd trace chunk close replied error %d",
1493 reply.generic.ret_code);
1494 } else {
1495 ret = 0;
1496 DBG("Relayd successfully closed trace chunk: chunk_id = %" PRIu64,
1497 chunk_id);
1498 }
1499 end:
1500 return ret;
1501 }
1502
1503 int relayd_trace_chunk_exists(struct lttcomm_relayd_sock *sock,
1504 uint64_t chunk_id, bool *chunk_exists)
1505 {
1506 int ret = 0;
1507 struct lttcomm_relayd_trace_chunk_exists msg = {};
1508 struct lttcomm_relayd_trace_chunk_exists_reply reply = {};
1509
1510 if (!relayd_supports_chunks(sock)) {
1511 DBG("Refusing to check for trace chunk existence: relayd does not support chunks");
1512 /* The chunk will never exist */
1513 *chunk_exists = false;
1514 goto end;
1515 }
1516
1517 msg = (typeof(msg)){
1518 .chunk_id = htobe64(chunk_id),
1519 };
1520
1521 ret = send_command(sock, RELAYD_TRACE_CHUNK_EXISTS, &msg, sizeof(msg),
1522 0);
1523 if (ret < 0) {
1524 ERR("Failed to send trace chunk exists command to relay daemon");
1525 goto end;
1526 }
1527
1528 ret = recv_reply(sock, &reply, sizeof(reply));
1529 if (ret < 0) {
1530 ERR("Failed to receive relay daemon trace chunk close command reply");
1531 goto end;
1532 }
1533
1534 reply.generic.ret_code = be32toh(reply.generic.ret_code);
1535 if (reply.generic.ret_code != LTTNG_OK) {
1536 ret = -1;
1537 ERR("Relayd trace chunk close replied error %d",
1538 reply.generic.ret_code);
1539 } else {
1540 ret = 0;
1541 DBG("Relayd successfully checked trace chunk existence: chunk_id = %" PRIu64
1542 ", exists = %s", chunk_id,
1543 reply.trace_chunk_exists ? "true" : "false");
1544 *chunk_exists = !!reply.trace_chunk_exists;
1545 }
1546 end:
1547 return ret;
1548 }
1549
1550 enum lttng_error_code relayd_get_configuration(struct lttcomm_relayd_sock *sock,
1551 uint64_t query_flags,
1552 uint64_t& result_flags,
1553 uint64_t *trace_format_query_results)
1554 {
1555 int ret;
1556 enum lttng_error_code ret_code = LTTNG_OK;
1557 struct lttcomm_relayd_get_configuration msg = {};
1558 struct lttcomm_relayd_get_configuration_reply reply = {};
1559 lttng_dynamic_buffer buffer;
1560 bool requesting_trace_format = query_flags &
1561 LTTCOMM_RELAYD_CONFIGURATION_QUERY_FLAG_SUPPORTED_TRACE_FORMAT;
1562
1563 lttng_dynamic_buffer_init(&buffer);
1564
1565 assert(!(query_flags & ~LTTCOMM_RELAYD_CONFIGURATION_QUERY_FLAG_MASK));
1566 assert(!(requesting_trace_format && !trace_format_query_results));
1567
1568 if (!relayd_supports_get_configuration(sock)) {
1569 DBG("Refusing to get relayd configuration (unsupported by relayd)");
1570 result_flags = 0;
1571 if (trace_format_query_results) {
1572 *trace_format_query_results =
1573 LTTCOMM_RELAYD_CONFIGURATION_TRACE_FORMAT_SUPPORTED_CTF1;
1574 }
1575 goto end;
1576 }
1577
1578 if (requesting_trace_format && !relayd_supports_get_configuration_trace_format(sock)) {
1579 /*
1580 * Provide default value for that query since lttng-relayd does
1581 * not know this query type.
1582 */
1583 if (trace_format_query_results) {
1584 *trace_format_query_results =
1585 LTTCOMM_RELAYD_CONFIGURATION_TRACE_FORMAT_SUPPORTED_CTF1;
1586 }
1587 /* Remove from the query set. */
1588 query_flags &= ~LTTCOMM_RELAYD_CONFIGURATION_QUERY_FLAG_SUPPORTED_TRACE_FORMAT;
1589 requesting_trace_format = false;
1590 }
1591
1592 msg.query_flags = htobe64(query_flags);
1593
1594 ret = send_command(sock, RELAYD_GET_CONFIGURATION, &msg, sizeof(msg), 0);
1595 if (ret < 0) {
1596 ERR("Failed to send get configuration command to relay daemon");
1597 ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL;
1598 goto end;
1599 }
1600
1601 ret = recv_reply(sock, &reply, sizeof(reply));
1602 if (ret < 0) {
1603 ERR("Failed to receive relay daemon get configuration command reply");
1604 ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL;
1605 goto end;
1606 }
1607
1608 ret_code = static_cast<enum lttng_error_code>(be32toh(reply.generic.ret_code));
1609 if (ret_code != LTTNG_OK) {
1610 ERR("Relayd get configuration replied error %d", ret_code);
1611 goto end;
1612 }
1613
1614 result_flags = be64toh(reply.relayd_configuration_flags);
1615 DBG("Relayd successfully got configuration: query_flags = %" PRIu64
1616 ", results_flags = %" PRIu64,
1617 query_flags, result_flags);
1618
1619 if (!requesting_trace_format) {
1620 ret_code = LTTNG_OK;
1621 goto end;
1622 }
1623
1624 /* Receive trace formats */
1625 {
1626 lttcomm_relayd_get_configuration_specialized_query_reply query_flag_reply = {};
1627 ret = recv_reply(sock, &query_flag_reply, sizeof(query_flag_reply));
1628 if (ret < 0) {
1629 ERR("Failed to receive relay daemon get configuration query flag data");
1630 ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL;
1631 goto end;
1632 }
1633
1634 query_flag_reply.query_flag = be64toh(query_flag_reply.query_flag);
1635 LTTNG_ASSERT(query_flag_reply.query_flag &
1636 LTTCOMM_RELAYD_CONFIGURATION_QUERY_FLAG_SUPPORTED_TRACE_FORMAT);
1637
1638 query_flag_reply.payload_len = be64toh(query_flag_reply.payload_len);
1639 LTTNG_ASSERT(query_flag_reply.payload_len == sizeof(uint64_t));
1640
1641 lttng_dynamic_buffer_set_size(&buffer, query_flag_reply.payload_len);
1642
1643 ret = recv_reply(sock, buffer.data, query_flag_reply.payload_len);
1644 if (ret < 0) {
1645 ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL;
1646 ERR("Failed to receive configuration dynamic payload for flag TODO");
1647 goto end;
1648 }
1649
1650 *trace_format_query_results = be64toh(*(uint64_t *) buffer.data);
1651 }
1652
1653 ret_code = LTTNG_OK;
1654
1655 end:
1656 lttng_dynamic_buffer_reset(&buffer);
1657 return ret_code;
1658 }
This page took 0.066471 seconds and 5 git commands to generate.