relayd: Extend relayd get configuration command to check for trace format support
[deliverable/lttng-tools.git] / src / common / relayd / relayd.cpp
1 /*
2 * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
3 *
4 * SPDX-License-Identifier: GPL-2.0-only
5 *
6 */
7
8 #define _LGPL_SOURCE
9 #include <stdio.h>
10 #include <stdlib.h>
11 #include <string.h>
12 #include <sys/stat.h>
13 #include <inttypes.h>
14
15 #include <bitset>
16 #include <common/common.hpp>
17 #include <common/compat/endian.hpp>
18 #include <common/compat/string.hpp>
19 #include <common/defaults.hpp>
20 #include <common/index/ctf-index.hpp>
21 #include <common/sessiond-comm/relayd.hpp>
22 #include <common/string-utils/format.hpp>
23 #include <common/trace-chunk.hpp>
24
25 #include "relayd.hpp"
26
27 static
28 bool relayd_supports_chunks(const struct lttcomm_relayd_sock *sock)
29 {
30 if (sock->major > 2) {
31 return true;
32 } else if (sock->major == 2 && sock->minor >= 11) {
33 return true;
34 }
35 return false;
36 }
37
38 static
39 bool relayd_supports_get_configuration(const struct lttcomm_relayd_sock *sock)
40 {
41 if (sock->major > 2) {
42 return true;
43 } else if (sock->major == 2 && sock->minor >= 12) {
44 return true;
45 }
46 return false;
47 }
48
49 static bool relayd_supports_get_configuration_trace_format(const struct lttcomm_relayd_sock *sock)
50 {
51 if (sock->major > 2) {
52 return true;
53 } else if (sock->major == 2 && sock->minor >= 15) {
54 return true;
55 }
56 return false;
57 }
58
59 /*
60 * Send command. Fill up the header and append the data.
61 */
62 static int send_command(struct lttcomm_relayd_sock *rsock,
63 enum lttcomm_relayd_command cmd, const void *data, size_t size,
64 int flags)
65 {
66 int ret;
67 struct lttcomm_relayd_hdr header;
68 char *buf;
69 uint64_t buf_size = sizeof(header);
70
71 if (rsock->sock.fd < 0) {
72 return -ECONNRESET;
73 }
74
75 if (data) {
76 buf_size += size;
77 }
78
79 buf = calloc<char>(buf_size);
80 if (buf == NULL) {
81 PERROR("zmalloc relayd send command buf");
82 ret = -ENOMEM;
83 goto alloc_error;
84 }
85
86 memset(&header, 0, sizeof(header));
87 header.cmd = htobe32(cmd);
88 header.data_size = htobe64(size);
89
90 /* Zeroed for now since not used. */
91 header.cmd_version = 0;
92 header.circuit_id = 0;
93
94 /* Prepare buffer to send. */
95 memcpy(buf, &header, sizeof(header));
96 if (data) {
97 memcpy(buf + sizeof(header), data, size);
98 }
99
100 DBG3("Relayd sending command %s of size %" PRIu64,
101 lttcomm_relayd_command_str(cmd), buf_size);
102 ret = rsock->sock.ops->sendmsg(&rsock->sock, buf, buf_size, flags);
103 if (ret < 0) {
104 PERROR("Failed to send command %s of size %" PRIu64,
105 lttcomm_relayd_command_str(cmd), buf_size);
106 ret = rsock->sock.ops->sendmsg(&rsock->sock, buf, buf_size, flags);
107 ret = -errno;
108 goto error;
109 }
110 error:
111 free(buf);
112 alloc_error:
113 return ret;
114 }
115
116 /*
117 * Receive reply data on socket. This MUST be call after send_command or else
118 * could result in unexpected behavior(s).
119 */
120 static int recv_reply(struct lttcomm_relayd_sock *rsock, void *data, size_t size)
121 {
122 int ret;
123
124 if (rsock->sock.fd < 0) {
125 return -ECONNRESET;
126 }
127
128 DBG3("Relayd waiting for reply of size %zu", size);
129
130 ret = rsock->sock.ops->recvmsg(&rsock->sock, data, size, 0);
131 if (ret <= 0 || ret != size) {
132 if (ret == 0) {
133 /* Orderly shutdown. */
134 DBG("Socket %d has performed an orderly shutdown", rsock->sock.fd);
135 } else {
136 DBG("Receiving reply failed on sock %d for size %zu with ret %d",
137 rsock->sock.fd, size, ret);
138 }
139 /* Always return -1 here and the caller can use errno. */
140 ret = -1;
141 goto error;
142 }
143
144 error:
145 return ret;
146 }
147
148 /*
149 * Starting from 2.11, RELAYD_CREATE_SESSION payload (session_name,
150 * hostname, and base_path) have no length restriction on the sender side.
151 * Length for both payloads is stored in the msg struct. A new dynamic size
152 * payload size is introduced.
153 */
154 static int relayd_create_session_2_11(struct lttcomm_relayd_sock *rsock,
155 const char *session_name, const char *hostname,
156 const char *base_path, int session_live_timer,
157 unsigned int snapshot, uint64_t sessiond_session_id,
158 const lttng_uuid& sessiond_uuid, const uint64_t *current_chunk_id,
159 time_t creation_time, bool session_name_contains_creation_time,
160 struct lttcomm_relayd_create_session_reply_2_11 *reply,
161 char *output_path)
162 {
163 int ret;
164 struct lttcomm_relayd_create_session_2_11 *msg = NULL;
165 size_t session_name_len;
166 size_t hostname_len;
167 size_t base_path_len;
168 size_t msg_length;
169 char *dst;
170
171 if (!base_path) {
172 base_path = "";
173 }
174 /* The three names are sent with a '\0' delimiter between them. */
175 session_name_len = strlen(session_name) + 1;
176 hostname_len = strlen(hostname) + 1;
177 base_path_len = strlen(base_path) + 1;
178
179 msg_length = sizeof(*msg) + session_name_len + hostname_len + base_path_len;
180 msg = zmalloc<lttcomm_relayd_create_session_2_11>(msg_length);
181 if (!msg) {
182 PERROR("zmalloc create_session_2_11 command message");
183 ret = -1;
184 goto error;
185 }
186
187 LTTNG_ASSERT(session_name_len <= UINT32_MAX);
188 msg->session_name_len = htobe32(session_name_len);
189
190 LTTNG_ASSERT(hostname_len <= UINT32_MAX);
191 msg->hostname_len = htobe32(hostname_len);
192
193 LTTNG_ASSERT(base_path_len <= UINT32_MAX);
194 msg->base_path_len = htobe32(base_path_len);
195
196 dst = msg->names;
197 if (lttng_strncpy(dst, session_name, session_name_len)) {
198 ret = -1;
199 goto error;
200 }
201 dst += session_name_len;
202 if (lttng_strncpy(dst, hostname, hostname_len)) {
203 ret = -1;
204 goto error;
205 }
206 dst += hostname_len;
207 if (lttng_strncpy(dst, base_path, base_path_len)) {
208 ret = -1;
209 goto error;
210 }
211
212 msg->live_timer = htobe32(session_live_timer);
213 msg->snapshot = !!snapshot;
214
215 std::copy(sessiond_uuid.begin(), sessiond_uuid.end(), msg->sessiond_uuid);
216 msg->session_id = htobe64(sessiond_session_id);
217 msg->session_name_contains_creation_time = session_name_contains_creation_time;
218 if (current_chunk_id) {
219 LTTNG_OPTIONAL_SET(&msg->current_chunk_id,
220 htobe64(*current_chunk_id));
221 }
222
223 msg->creation_time = htobe64((uint64_t) creation_time);
224
225 /* Send command */
226 ret = send_command(rsock, RELAYD_CREATE_SESSION, msg, msg_length, 0);
227 if (ret < 0) {
228 goto error;
229 }
230 /* Receive response */
231 ret = recv_reply(rsock, reply, sizeof(*reply));
232 if (ret < 0) {
233 goto error;
234 }
235 reply->generic.session_id = be64toh(reply->generic.session_id);
236 reply->generic.ret_code = be32toh(reply->generic.ret_code);
237 reply->output_path_length = be32toh(reply->output_path_length);
238 if (reply->output_path_length >= LTTNG_PATH_MAX) {
239 ERR("Invalid session output path length in reply (%" PRIu32 " bytes) exceeds maximal allowed length (%d bytes)",
240 reply->output_path_length, LTTNG_PATH_MAX);
241 ret = -1;
242 goto error;
243 }
244 ret = recv_reply(rsock, output_path, reply->output_path_length);
245 if (ret < 0) {
246 goto error;
247 }
248 error:
249 free(msg);
250 return ret;
251 }
252 /*
253 * From 2.4 to 2.10, RELAYD_CREATE_SESSION takes additional parameters to
254 * support the live reading capability.
255 */
256 static int relayd_create_session_2_4(struct lttcomm_relayd_sock *rsock,
257 const char *session_name, const char *hostname,
258 int session_live_timer, unsigned int snapshot,
259 struct lttcomm_relayd_status_session *reply)
260 {
261 int ret;
262 struct lttcomm_relayd_create_session_2_4 msg;
263
264 if (lttng_strncpy(msg.session_name, session_name,
265 sizeof(msg.session_name))) {
266 ret = -1;
267 goto error;
268 }
269 if (lttng_strncpy(msg.hostname, hostname, sizeof(msg.hostname))) {
270 ret = -1;
271 goto error;
272 }
273 msg.live_timer = htobe32(session_live_timer);
274 msg.snapshot = htobe32(snapshot);
275
276 /* Send command */
277 ret = send_command(rsock, RELAYD_CREATE_SESSION, &msg, sizeof(msg), 0);
278 if (ret < 0) {
279 goto error;
280 }
281
282 /* Receive response */
283 ret = recv_reply(rsock, reply, sizeof(*reply));
284 if (ret < 0) {
285 goto error;
286 }
287 reply->session_id = be64toh(reply->session_id);
288 reply->ret_code = be32toh(reply->ret_code);
289 error:
290 return ret;
291 }
292
293 /*
294 * RELAYD_CREATE_SESSION from 2.1 to 2.3.
295 */
296 static int relayd_create_session_2_1(struct lttcomm_relayd_sock *rsock,
297 struct lttcomm_relayd_status_session *reply)
298 {
299 int ret;
300
301 /* Send command */
302 ret = send_command(rsock, RELAYD_CREATE_SESSION, NULL, 0, 0);
303 if (ret < 0) {
304 goto error;
305 }
306
307 /* Receive response */
308 ret = recv_reply(rsock, reply, sizeof(*reply));
309 if (ret < 0) {
310 goto error;
311 }
312 reply->session_id = be64toh(reply->session_id);
313 reply->ret_code = be32toh(reply->ret_code);
314 error:
315 return ret;
316 }
317
318 /*
319 * Send a RELAYD_CREATE_SESSION command to the relayd with the given socket and
320 * set session_id of the relayd if we have a successful reply from the relayd.
321 *
322 * On success, return 0 else a negative value which is either an errno error or
323 * a lttng error code from the relayd.
324 */
325 int relayd_create_session(struct lttcomm_relayd_sock *rsock,
326 uint64_t *relayd_session_id,
327 const char *session_name, const char *hostname,
328 const char *base_path, int session_live_timer,
329 unsigned int snapshot, uint64_t sessiond_session_id,
330 const lttng_uuid& sessiond_uuid,
331 const uint64_t *current_chunk_id,
332 time_t creation_time, bool session_name_contains_creation_time,
333 char *output_path)
334 {
335 int ret;
336 struct lttcomm_relayd_create_session_reply_2_11 reply = {};
337
338 LTTNG_ASSERT(rsock);
339 LTTNG_ASSERT(relayd_session_id);
340
341 DBG("Relayd create session");
342
343 if (rsock->minor < 4) {
344 /* From 2.1 to 2.3 */
345 ret = relayd_create_session_2_1(rsock, &reply.generic);
346 } else if (rsock->minor >= 4 && rsock->minor < 11) {
347 /* From 2.4 to 2.10 */
348 ret = relayd_create_session_2_4(rsock, session_name,
349 hostname, session_live_timer, snapshot,
350 &reply.generic);
351 } else {
352 /* From 2.11 to ... */
353 ret = relayd_create_session_2_11(rsock, session_name,
354 hostname, base_path, session_live_timer, snapshot,
355 sessiond_session_id, sessiond_uuid,
356 current_chunk_id, creation_time,
357 session_name_contains_creation_time,
358 &reply, output_path);
359 }
360
361 if (ret < 0) {
362 goto error;
363 }
364
365 /* Return session id or negative ret code. */
366 if (reply.generic.ret_code != LTTNG_OK) {
367 ret = -1;
368 ERR("Relayd create session replied error %d",
369 reply.generic.ret_code);
370 goto error;
371 } else {
372 ret = 0;
373 *relayd_session_id = reply.generic.session_id;
374 }
375
376 DBG("Relayd session created with id %" PRIu64, reply.generic.session_id);
377
378 error:
379 return ret;
380 }
381
382 static int relayd_add_stream_2_1(struct lttcomm_relayd_sock *rsock,
383 const char *channel_name, const char *pathname)
384 {
385 int ret;
386 struct lttcomm_relayd_add_stream msg;
387
388 memset(&msg, 0, sizeof(msg));
389 if (lttng_strncpy(msg.channel_name, channel_name,
390 sizeof(msg.channel_name))) {
391 ret = -1;
392 goto error;
393 }
394
395 if (lttng_strncpy(msg.pathname, pathname,
396 sizeof(msg.pathname))) {
397 ret = -1;
398 goto error;
399 }
400
401 /* Send command */
402 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
403 if (ret < 0) {
404 ret = -1;
405 goto error;
406 }
407 ret = 0;
408 error:
409 return ret;
410 }
411
412 static int relayd_add_stream_2_2(struct lttcomm_relayd_sock *rsock,
413 const char *channel_name, const char *pathname,
414 uint64_t tracefile_size, uint64_t tracefile_count)
415 {
416 int ret;
417 struct lttcomm_relayd_add_stream_2_2 msg;
418
419 memset(&msg, 0, sizeof(msg));
420 /* Compat with relayd 2.2 to 2.10 */
421 if (lttng_strncpy(msg.channel_name, channel_name,
422 sizeof(msg.channel_name))) {
423 ret = -1;
424 goto error;
425 }
426 if (lttng_strncpy(msg.pathname, pathname,
427 sizeof(msg.pathname))) {
428 ret = -1;
429 goto error;
430 }
431 msg.tracefile_size = htobe64(tracefile_size);
432 msg.tracefile_count = htobe64(tracefile_count);
433
434 /* Send command */
435 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
436 if (ret < 0) {
437 goto error;
438 }
439 ret = 0;
440 error:
441 return ret;
442 }
443
444 static int relayd_add_stream_2_11(struct lttcomm_relayd_sock *rsock,
445 const char *channel_name, const char *pathname,
446 uint64_t tracefile_size, uint64_t tracefile_count,
447 uint64_t trace_archive_id)
448 {
449 int ret;
450 struct lttcomm_relayd_add_stream_2_11 *msg = NULL;
451 size_t channel_name_len;
452 size_t pathname_len;
453 size_t msg_length;
454
455 /* The two names are sent with a '\0' delimiter between them. */
456 channel_name_len = strlen(channel_name) + 1;
457 pathname_len = strlen(pathname) + 1;
458
459 msg_length = sizeof(*msg) + channel_name_len + pathname_len;
460 msg = zmalloc<lttcomm_relayd_add_stream_2_11>(msg_length);
461 if (!msg) {
462 PERROR("zmalloc add_stream_2_11 command message");
463 ret = -1;
464 goto error;
465 }
466
467 LTTNG_ASSERT(channel_name_len <= UINT32_MAX);
468 msg->channel_name_len = htobe32(channel_name_len);
469
470 LTTNG_ASSERT(pathname_len <= UINT32_MAX);
471 msg->pathname_len = htobe32(pathname_len);
472
473 if (lttng_strncpy(msg->names, channel_name, channel_name_len)) {
474 ret = -1;
475 goto error;
476 }
477 if (lttng_strncpy(msg->names + channel_name_len, pathname, pathname_len)) {
478 ret = -1;
479 goto error;
480 }
481
482 msg->tracefile_size = htobe64(tracefile_size);
483 msg->tracefile_count = htobe64(tracefile_count);
484 msg->trace_chunk_id = htobe64(trace_archive_id);
485
486 /* Send command */
487 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) msg, msg_length, 0);
488 if (ret < 0) {
489 goto error;
490 }
491 ret = 0;
492 error:
493 free(msg);
494 return ret;
495 }
496
497 /*
498 * Add stream on the relayd and assign stream handle to the stream_id argument.
499 *
500 * Chunks are not supported by relayd prior to 2.11, but are used to
501 * internally between session daemon and consumer daemon to keep track
502 * of the channel and stream output path.
503 *
504 * On success return 0 else return ret_code negative value.
505 */
506 int relayd_add_stream(struct lttcomm_relayd_sock *rsock, const char *channel_name,
507 const char *domain_name, const char *_pathname, uint64_t *stream_id,
508 uint64_t tracefile_size, uint64_t tracefile_count,
509 struct lttng_trace_chunk *trace_chunk)
510 {
511 int ret;
512 struct lttcomm_relayd_status_stream reply;
513 char pathname[RELAYD_COMM_LTTNG_PATH_MAX];
514
515 /* Code flow error. Safety net. */
516 LTTNG_ASSERT(rsock);
517 LTTNG_ASSERT(channel_name);
518 LTTNG_ASSERT(domain_name);
519 LTTNG_ASSERT(_pathname);
520 LTTNG_ASSERT(trace_chunk);
521
522 DBG("Relayd adding stream for channel name %s", channel_name);
523
524 /* Compat with relayd 2.1 */
525 if (rsock->minor == 1) {
526 /* For 2.1 */
527 ret = relayd_add_stream_2_1(rsock, channel_name, _pathname);
528
529 } else if (rsock->minor > 1 && rsock->minor < 11) {
530 /* From 2.2 to 2.10 */
531 ret = relayd_add_stream_2_2(rsock, channel_name, _pathname,
532 tracefile_size, tracefile_count);
533 } else {
534 const char *separator;
535 enum lttng_trace_chunk_status chunk_status;
536 uint64_t chunk_id;
537
538 if (_pathname[0] == '\0') {
539 separator = "";
540 } else {
541 separator = "/";
542 }
543
544 ret = snprintf(pathname, RELAYD_COMM_LTTNG_PATH_MAX, "%s%s%s",
545 domain_name, separator, _pathname);
546 if (ret <= 0 || ret >= RELAYD_COMM_LTTNG_PATH_MAX) {
547 ERR("Failed to format stream path: %s",
548 ret <= 0 ? "formatting error" :
549 "path exceeds maximal allowed length");
550 ret = -1;
551 goto error;
552 }
553
554 chunk_status = lttng_trace_chunk_get_id(trace_chunk,
555 &chunk_id);
556 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
557
558 /* From 2.11 to ...*/
559 ret = relayd_add_stream_2_11(rsock, channel_name, pathname,
560 tracefile_size, tracefile_count,
561 chunk_id);
562 }
563
564 if (ret) {
565 ret = -1;
566 goto error;
567 }
568
569 /* Waiting for reply */
570 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
571 if (ret < 0) {
572 goto error;
573 }
574
575 /* Back to host bytes order. */
576 reply.handle = be64toh(reply.handle);
577 reply.ret_code = be32toh(reply.ret_code);
578
579 /* Return session id or negative ret code. */
580 if (reply.ret_code != LTTNG_OK) {
581 ret = -1;
582 ERR("Relayd add stream replied error %d", reply.ret_code);
583 } else {
584 /* Success */
585 ret = 0;
586 *stream_id = reply.handle;
587 }
588
589 DBG("Relayd stream added successfully with handle %" PRIu64,
590 reply.handle);
591
592 error:
593 return ret;
594 }
595
596 /*
597 * Inform the relay that all the streams for the current channel has been sent.
598 *
599 * On success return 0 else return ret_code negative value.
600 */
601 int relayd_streams_sent(struct lttcomm_relayd_sock *rsock)
602 {
603 int ret;
604 struct lttcomm_relayd_generic_reply reply;
605
606 /* Code flow error. Safety net. */
607 LTTNG_ASSERT(rsock);
608
609 DBG("Relayd sending streams sent.");
610
611 /* This feature was introduced in 2.4, ignore it for earlier versions. */
612 if (rsock->minor < 4) {
613 ret = 0;
614 goto end;
615 }
616
617 /* Send command */
618 ret = send_command(rsock, RELAYD_STREAMS_SENT, NULL, 0, 0);
619 if (ret < 0) {
620 goto error;
621 }
622
623 /* Waiting for reply */
624 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
625 if (ret < 0) {
626 goto error;
627 }
628
629 /* Back to host bytes order. */
630 reply.ret_code = be32toh(reply.ret_code);
631
632 /* Return session id or negative ret code. */
633 if (reply.ret_code != LTTNG_OK) {
634 ret = -1;
635 ERR("Relayd streams sent replied error %d", reply.ret_code);
636 goto error;
637 } else {
638 /* Success */
639 ret = 0;
640 }
641
642 DBG("Relayd streams sent success");
643
644 error:
645 end:
646 return ret;
647 }
648
649 /*
650 * Check version numbers on the relayd.
651 * If major versions are compatible, we assign minor_to_use to the
652 * minor version of the procotol we are going to use for this session.
653 *
654 * Return 0 if the two daemons are compatible, LTTNG_ERR_RELAYD_VERSION_FAIL
655 * otherwise, or a negative value on network errors.
656 */
657 int relayd_version_check(struct lttcomm_relayd_sock *rsock)
658 {
659 int ret;
660 struct lttcomm_relayd_version msg;
661
662 /* Code flow error. Safety net. */
663 LTTNG_ASSERT(rsock);
664
665 DBG("Relayd version check for major.minor %u.%u", rsock->major,
666 rsock->minor);
667
668 memset(&msg, 0, sizeof(msg));
669 /* Prepare network byte order before transmission. */
670 msg.major = htobe32(rsock->major);
671 msg.minor = htobe32(rsock->minor);
672
673 /* Send command */
674 ret = send_command(rsock, RELAYD_VERSION, (void *) &msg, sizeof(msg), 0);
675 if (ret < 0) {
676 goto error;
677 }
678
679 /* Receive response */
680 ret = recv_reply(rsock, (void *) &msg, sizeof(msg));
681 if (ret < 0) {
682 goto error;
683 }
684
685 /* Set back to host bytes order */
686 msg.major = be32toh(msg.major);
687 msg.minor = be32toh(msg.minor);
688
689 /*
690 * Only validate the major version. If the other side is higher,
691 * communication is not possible. Only major version equal can talk to each
692 * other. If the minor version differs, the lowest version is used by both
693 * sides.
694 */
695 if (msg.major != rsock->major) {
696 /* Not compatible */
697 ret = LTTNG_ERR_RELAYD_VERSION_FAIL;
698 DBG2("Relayd version is NOT compatible. Relayd version %u != %u (us)",
699 msg.major, rsock->major);
700 goto error;
701 }
702
703 /*
704 * If the relayd's minor version is higher, it will adapt to our version so
705 * we can continue to use the latest relayd communication data structure.
706 * If the received minor version is higher, the relayd should adapt to us.
707 */
708 if (rsock->minor > msg.minor) {
709 rsock->minor = msg.minor;
710 }
711
712 /* Version number compatible */
713 DBG2("Relayd version is compatible, using protocol version %u.%u",
714 rsock->major, rsock->minor);
715 ret = 0;
716
717 error:
718 return ret;
719 }
720
721 /*
722 * Add stream on the relayd and assign stream handle to the stream_id argument.
723 *
724 * On success return 0 else return ret_code negative value.
725 */
726 int relayd_send_metadata(struct lttcomm_relayd_sock *rsock, size_t len)
727 {
728 int ret;
729
730 /* Code flow error. Safety net. */
731 LTTNG_ASSERT(rsock);
732
733 DBG("Relayd sending metadata of size %zu", len);
734
735 /* Send command */
736 ret = send_command(rsock, RELAYD_SEND_METADATA, NULL, len, 0);
737 if (ret < 0) {
738 goto error;
739 }
740
741 DBG2("Relayd metadata added successfully");
742
743 /*
744 * After that call, the metadata data MUST be sent to the relayd so the
745 * receive size on the other end matches the len of the metadata packet
746 * header. This is why we don't wait for a reply here.
747 */
748
749 error:
750 return ret;
751 }
752
753 /*
754 * Connect to relay daemon with an allocated lttcomm_relayd_sock.
755 */
756 int relayd_connect(struct lttcomm_relayd_sock *rsock)
757 {
758 /* Code flow error. Safety net. */
759 LTTNG_ASSERT(rsock);
760
761 if (!rsock->sock.ops) {
762 /*
763 * Attempting a connect on a non-initialized socket.
764 */
765 return -ECONNRESET;
766 }
767
768 DBG3("Relayd connect ...");
769
770 return rsock->sock.ops->connect(&rsock->sock);
771 }
772
773 /*
774 * Close relayd socket with an allocated lttcomm_relayd_sock.
775 *
776 * If no socket operations are found, simply return 0 meaning that everything
777 * is fine. Without operations, the socket can not possibly be opened or used.
778 * This is possible if the socket was allocated but not created. However, the
779 * caller could simply use it to store a valid file descriptor for instance
780 * passed over a Unix socket and call this to cleanup but still without a valid
781 * ops pointer.
782 *
783 * Return the close returned value. On error, a negative value is usually
784 * returned back from close(2).
785 */
786 int relayd_close(struct lttcomm_relayd_sock *rsock)
787 {
788 int ret;
789
790 /* Code flow error. Safety net. */
791 LTTNG_ASSERT(rsock);
792
793 /* An invalid fd is fine, return success. */
794 if (rsock->sock.fd < 0) {
795 ret = 0;
796 goto end;
797 }
798
799 DBG3("Relayd closing socket %d", rsock->sock.fd);
800
801 if (rsock->sock.ops) {
802 ret = rsock->sock.ops->close(&rsock->sock);
803 } else {
804 /* Default call if no specific ops found. */
805 ret = close(rsock->sock.fd);
806 if (ret < 0) {
807 PERROR("relayd_close default close");
808 }
809 }
810 rsock->sock.fd = -1;
811
812 end:
813 return ret;
814 }
815
816 /*
817 * Send data header structure to the relayd.
818 */
819 int relayd_send_data_hdr(struct lttcomm_relayd_sock *rsock,
820 struct lttcomm_relayd_data_hdr *hdr, size_t size)
821 {
822 int ret;
823
824 /* Code flow error. Safety net. */
825 LTTNG_ASSERT(rsock);
826 LTTNG_ASSERT(hdr);
827
828 if (rsock->sock.fd < 0) {
829 return -ECONNRESET;
830 }
831
832 DBG3("Relayd sending data header of size %zu", size);
833
834 /* Again, safety net */
835 if (size == 0) {
836 size = sizeof(struct lttcomm_relayd_data_hdr);
837 }
838
839 /* Only send data header. */
840 ret = rsock->sock.ops->sendmsg(&rsock->sock, hdr, size, 0);
841 if (ret < 0) {
842 ret = -errno;
843 goto error;
844 }
845
846 /*
847 * The data MUST be sent right after that command for the receive on the
848 * other end to match the size in the header.
849 */
850
851 error:
852 return ret;
853 }
854
855 /*
856 * Send close stream command to the relayd.
857 */
858 int relayd_send_close_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
859 uint64_t last_net_seq_num)
860 {
861 int ret;
862 struct lttcomm_relayd_close_stream msg;
863 struct lttcomm_relayd_generic_reply reply;
864
865 /* Code flow error. Safety net. */
866 LTTNG_ASSERT(rsock);
867
868 DBG("Relayd closing stream id %" PRIu64, stream_id);
869
870 memset(&msg, 0, sizeof(msg));
871 msg.stream_id = htobe64(stream_id);
872 msg.last_net_seq_num = htobe64(last_net_seq_num);
873
874 /* Send command */
875 ret = send_command(rsock, RELAYD_CLOSE_STREAM, (void *) &msg, sizeof(msg), 0);
876 if (ret < 0) {
877 goto error;
878 }
879
880 /* Receive response */
881 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
882 if (ret < 0) {
883 goto error;
884 }
885
886 reply.ret_code = be32toh(reply.ret_code);
887
888 /* Return session id or negative ret code. */
889 if (reply.ret_code != LTTNG_OK) {
890 ret = -1;
891 ERR("Relayd close stream replied error %d", reply.ret_code);
892 } else {
893 /* Success */
894 ret = 0;
895 }
896
897 DBG("Relayd close stream id %" PRIu64 " successfully", stream_id);
898
899 error:
900 return ret;
901 }
902
903 /*
904 * Check for data availability for a given stream id.
905 *
906 * Return 0 if NOT pending, 1 if so and a negative value on error.
907 */
908 int relayd_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
909 uint64_t last_net_seq_num)
910 {
911 int ret;
912 struct lttcomm_relayd_data_pending msg;
913 struct lttcomm_relayd_generic_reply reply;
914
915 /* Code flow error. Safety net. */
916 LTTNG_ASSERT(rsock);
917
918 DBG("Relayd data pending for stream id %" PRIu64, stream_id);
919
920 memset(&msg, 0, sizeof(msg));
921 msg.stream_id = htobe64(stream_id);
922 msg.last_net_seq_num = htobe64(last_net_seq_num);
923
924 /* Send command */
925 ret = send_command(rsock, RELAYD_DATA_PENDING, (void *) &msg,
926 sizeof(msg), 0);
927 if (ret < 0) {
928 goto error;
929 }
930
931 /* Receive response */
932 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
933 if (ret < 0) {
934 goto error;
935 }
936
937 reply.ret_code = be32toh(reply.ret_code);
938
939 /* Return session id or negative ret code. */
940 if (reply.ret_code >= LTTNG_OK) {
941 ERR("Relayd data pending replied error %d", reply.ret_code);
942 }
943
944 /* At this point, the ret code is either 1 or 0 */
945 ret = reply.ret_code;
946
947 DBG("Relayd data is %s pending for stream id %" PRIu64,
948 ret == 1 ? "" : "NOT", stream_id);
949
950 error:
951 return ret;
952 }
953
954 /*
955 * Check on the relayd side for a quiescent state on the control socket.
956 */
957 int relayd_quiescent_control(struct lttcomm_relayd_sock *rsock,
958 uint64_t metadata_stream_id)
959 {
960 int ret;
961 struct lttcomm_relayd_quiescent_control msg;
962 struct lttcomm_relayd_generic_reply reply;
963
964 /* Code flow error. Safety net. */
965 LTTNG_ASSERT(rsock);
966
967 DBG("Relayd checking quiescent control state");
968
969 memset(&msg, 0, sizeof(msg));
970 msg.stream_id = htobe64(metadata_stream_id);
971
972 /* Send command */
973 ret = send_command(rsock, RELAYD_QUIESCENT_CONTROL, &msg, sizeof(msg), 0);
974 if (ret < 0) {
975 goto error;
976 }
977
978 /* Receive response */
979 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
980 if (ret < 0) {
981 goto error;
982 }
983
984 reply.ret_code = be32toh(reply.ret_code);
985
986 /* Return session id or negative ret code. */
987 if (reply.ret_code != LTTNG_OK) {
988 ret = -1;
989 ERR("Relayd quiescent control replied error %d", reply.ret_code);
990 goto error;
991 }
992
993 /* Control socket is quiescent */
994 return 0;
995
996 error:
997 return ret;
998 }
999
1000 /*
1001 * Begin a data pending command for a specific session id.
1002 */
1003 int relayd_begin_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id)
1004 {
1005 int ret;
1006 struct lttcomm_relayd_begin_data_pending msg;
1007 struct lttcomm_relayd_generic_reply reply;
1008
1009 /* Code flow error. Safety net. */
1010 LTTNG_ASSERT(rsock);
1011
1012 DBG("Relayd begin data pending");
1013
1014 memset(&msg, 0, sizeof(msg));
1015 msg.session_id = htobe64(id);
1016
1017 /* Send command */
1018 ret = send_command(rsock, RELAYD_BEGIN_DATA_PENDING, &msg, sizeof(msg), 0);
1019 if (ret < 0) {
1020 goto error;
1021 }
1022
1023 /* Receive response */
1024 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1025 if (ret < 0) {
1026 goto error;
1027 }
1028
1029 reply.ret_code = be32toh(reply.ret_code);
1030
1031 /* Return session id or negative ret code. */
1032 if (reply.ret_code != LTTNG_OK) {
1033 ret = -1;
1034 ERR("Relayd begin data pending replied error %d", reply.ret_code);
1035 goto error;
1036 }
1037
1038 return 0;
1039
1040 error:
1041 return ret;
1042 }
1043
1044 /*
1045 * End a data pending command for a specific session id.
1046 *
1047 * Return 0 on success and set is_data_inflight to 0 if no data is being
1048 * streamed or 1 if it is the case.
1049 */
1050 int relayd_end_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id,
1051 unsigned int *is_data_inflight)
1052 {
1053 int ret, recv_ret;
1054 struct lttcomm_relayd_end_data_pending msg;
1055 struct lttcomm_relayd_generic_reply reply;
1056
1057 /* Code flow error. Safety net. */
1058 LTTNG_ASSERT(rsock);
1059
1060 DBG("Relayd end data pending");
1061
1062 memset(&msg, 0, sizeof(msg));
1063 msg.session_id = htobe64(id);
1064
1065 /* Send command */
1066 ret = send_command(rsock, RELAYD_END_DATA_PENDING, &msg, sizeof(msg), 0);
1067 if (ret < 0) {
1068 goto error;
1069 }
1070
1071 /* Receive response */
1072 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1073 if (ret < 0) {
1074 goto error;
1075 }
1076
1077 recv_ret = be32toh(reply.ret_code);
1078 if (recv_ret < 0) {
1079 ret = recv_ret;
1080 goto error;
1081 }
1082
1083 *is_data_inflight = recv_ret;
1084
1085 DBG("Relayd end data pending is data inflight: %d", recv_ret);
1086
1087 return 0;
1088
1089 error:
1090 return ret;
1091 }
1092
1093 /*
1094 * Send index to the relayd.
1095 */
1096 int relayd_send_index(struct lttcomm_relayd_sock *rsock,
1097 struct ctf_packet_index *index, uint64_t relay_stream_id,
1098 uint64_t net_seq_num)
1099 {
1100 int ret;
1101 struct lttcomm_relayd_index msg;
1102 struct lttcomm_relayd_generic_reply reply;
1103
1104 /* Code flow error. Safety net. */
1105 LTTNG_ASSERT(rsock);
1106
1107 if (rsock->minor < 4) {
1108 DBG("Not sending indexes before protocol 2.4");
1109 ret = 0;
1110 goto error;
1111 }
1112
1113 DBG("Relayd sending index for stream ID %" PRIu64, relay_stream_id);
1114
1115 memset(&msg, 0, sizeof(msg));
1116 msg.relay_stream_id = htobe64(relay_stream_id);
1117 msg.net_seq_num = htobe64(net_seq_num);
1118
1119 /* The index is already in big endian. */
1120 msg.packet_size = index->packet_size;
1121 msg.content_size = index->content_size;
1122 msg.timestamp_begin = index->timestamp_begin;
1123 msg.timestamp_end = index->timestamp_end;
1124 msg.events_discarded = index->events_discarded;
1125 msg.stream_id = index->stream_id;
1126
1127 if (rsock->minor >= 8) {
1128 msg.stream_instance_id = index->stream_instance_id;
1129 msg.packet_seq_num = index->packet_seq_num;
1130 }
1131
1132 /* Send command */
1133 ret = send_command(rsock, RELAYD_SEND_INDEX, &msg,
1134 lttcomm_relayd_index_len(lttng_to_index_major(rsock->major,
1135 rsock->minor),
1136 lttng_to_index_minor(rsock->major, rsock->minor)),
1137 0);
1138 if (ret < 0) {
1139 goto error;
1140 }
1141
1142 /* Receive response */
1143 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1144 if (ret < 0) {
1145 goto error;
1146 }
1147
1148 reply.ret_code = be32toh(reply.ret_code);
1149
1150 /* Return session id or negative ret code. */
1151 if (reply.ret_code != LTTNG_OK) {
1152 ret = -1;
1153 ERR("Relayd send index replied error %d", reply.ret_code);
1154 } else {
1155 /* Success */
1156 ret = 0;
1157 }
1158
1159 error:
1160 return ret;
1161 }
1162
1163 /*
1164 * Ask the relay to reset the metadata trace file (regeneration).
1165 */
1166 int relayd_reset_metadata(struct lttcomm_relayd_sock *rsock,
1167 uint64_t stream_id, uint64_t version)
1168 {
1169 int ret;
1170 struct lttcomm_relayd_reset_metadata msg;
1171 struct lttcomm_relayd_generic_reply reply;
1172
1173 /* Code flow error. Safety net. */
1174 LTTNG_ASSERT(rsock);
1175
1176 /* Should have been prevented by the sessiond. */
1177 if (rsock->minor < 8) {
1178 ERR("Metadata regeneration unsupported before 2.8");
1179 ret = -1;
1180 goto error;
1181 }
1182
1183 DBG("Relayd reset metadata stream id %" PRIu64, stream_id);
1184
1185 memset(&msg, 0, sizeof(msg));
1186 msg.stream_id = htobe64(stream_id);
1187 msg.version = htobe64(version);
1188
1189 /* Send command */
1190 ret = send_command(rsock, RELAYD_RESET_METADATA, (void *) &msg, sizeof(msg), 0);
1191 if (ret < 0) {
1192 goto error;
1193 }
1194
1195 /* Receive response */
1196 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1197 if (ret < 0) {
1198 goto error;
1199 }
1200
1201 reply.ret_code = be32toh(reply.ret_code);
1202
1203 /* Return session id or negative ret code. */
1204 if (reply.ret_code != LTTNG_OK) {
1205 ret = -1;
1206 ERR("Relayd reset metadata replied error %d", reply.ret_code);
1207 } else {
1208 /* Success */
1209 ret = 0;
1210 }
1211
1212 DBG("Relayd reset metadata stream id %" PRIu64 " successfully", stream_id);
1213
1214 error:
1215 return ret;
1216 }
1217
1218 int relayd_rotate_streams(struct lttcomm_relayd_sock *sock,
1219 unsigned int stream_count, const uint64_t *new_chunk_id,
1220 const struct relayd_stream_rotation_position *positions)
1221 {
1222 int ret;
1223 unsigned int i;
1224 struct lttng_dynamic_buffer payload;
1225 struct lttcomm_relayd_generic_reply reply = {};
1226 struct lttcomm_relayd_rotate_streams msg;
1227 char new_chunk_id_buf[MAX_INT_DEC_LEN(*new_chunk_id)] = {};
1228 const char *new_chunk_id_str;
1229
1230 msg.stream_count = htobe32((uint32_t) stream_count);
1231 msg.new_chunk_id = (typeof(msg.new_chunk_id)){
1232 .is_set = !!new_chunk_id,
1233 .value = htobe64(new_chunk_id ? *new_chunk_id : 0),
1234 };
1235
1236 if (!relayd_supports_chunks(sock)) {
1237 DBG("Refusing to rotate remote streams: relayd does not support chunks");
1238 return 0;
1239 }
1240
1241 lttng_dynamic_buffer_init(&payload);
1242
1243 /* Code flow error. Safety net. */
1244 LTTNG_ASSERT(sock);
1245
1246 if (new_chunk_id) {
1247 ret = snprintf(new_chunk_id_buf, sizeof(new_chunk_id_buf),
1248 "%" PRIu64, *new_chunk_id);
1249 if (ret == -1 || ret >= sizeof(new_chunk_id_buf)) {
1250 new_chunk_id_str = "formatting error";
1251 } else {
1252 new_chunk_id_str = new_chunk_id_buf;
1253 }
1254 } else {
1255 new_chunk_id_str = "none";
1256 }
1257
1258 DBG("Preparing \"rotate streams\" command payload: new_chunk_id = %s, stream_count = %u",
1259 new_chunk_id_str, stream_count);
1260
1261 ret = lttng_dynamic_buffer_append(&payload, &msg, sizeof(msg));
1262 if (ret) {
1263 ERR("Failed to allocate \"rotate streams\" command payload");
1264 goto error;
1265 }
1266
1267 for (i = 0; i < stream_count; i++) {
1268 const struct relayd_stream_rotation_position *position =
1269 &positions[i];
1270 const struct lttcomm_relayd_stream_rotation_position comm_position = {
1271 .stream_id = htobe64(position->stream_id),
1272 .rotate_at_seq_num = htobe64(
1273 position->rotate_at_seq_num),
1274 };
1275
1276 DBG("Rotate stream %" PRIu64 " at sequence number %" PRIu64,
1277 position->stream_id,
1278 position->rotate_at_seq_num);
1279 ret = lttng_dynamic_buffer_append(&payload, &comm_position,
1280 sizeof(comm_position));
1281 if (ret) {
1282 ERR("Failed to allocate \"rotate streams\" command payload");
1283 goto error;
1284 }
1285 }
1286
1287 /* Send command. */
1288 ret = send_command(sock, RELAYD_ROTATE_STREAMS, payload.data,
1289 payload.size, 0);
1290 if (ret < 0) {
1291 ERR("Failed to send \"rotate stream\" command");
1292 goto error;
1293 }
1294
1295 /* Receive response. */
1296 ret = recv_reply(sock, &reply, sizeof(reply));
1297 if (ret < 0) {
1298 ERR("Failed to receive \"rotate streams\" command reply");
1299 goto error;
1300 }
1301
1302 reply.ret_code = be32toh(reply.ret_code);
1303 if (reply.ret_code != LTTNG_OK) {
1304 ret = -1;
1305 ERR("Relayd rotate streams replied error %d", reply.ret_code);
1306 } else {
1307 /* Success. */
1308 ret = 0;
1309 DBG("Relayd rotated streams successfully");
1310 }
1311
1312 error:
1313 lttng_dynamic_buffer_reset(&payload);
1314 return ret;
1315 }
1316
1317 int relayd_create_trace_chunk(struct lttcomm_relayd_sock *sock,
1318 struct lttng_trace_chunk *chunk)
1319 {
1320 int ret = 0;
1321 enum lttng_trace_chunk_status status;
1322 struct lttcomm_relayd_create_trace_chunk msg = {};
1323 struct lttcomm_relayd_generic_reply reply = {};
1324 struct lttng_dynamic_buffer payload;
1325 uint64_t chunk_id;
1326 time_t creation_timestamp;
1327 const char *chunk_name;
1328 size_t chunk_name_length;
1329 bool overridden_name;
1330
1331 lttng_dynamic_buffer_init(&payload);
1332
1333 if (!relayd_supports_chunks(sock)) {
1334 DBG("Refusing to create remote trace chunk: relayd does not support chunks");
1335 goto end;
1336 }
1337
1338 status = lttng_trace_chunk_get_id(chunk, &chunk_id);
1339 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1340 ret = -1;
1341 goto end;
1342 }
1343
1344 status = lttng_trace_chunk_get_creation_timestamp(
1345 chunk, &creation_timestamp);
1346 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1347 ret = -1;
1348 goto end;
1349 }
1350
1351 status = lttng_trace_chunk_get_name(
1352 chunk, &chunk_name, &overridden_name);
1353 if (status != LTTNG_TRACE_CHUNK_STATUS_OK &&
1354 status != LTTNG_TRACE_CHUNK_STATUS_NONE) {
1355 ret = -1;
1356 goto end;
1357 }
1358
1359 chunk_name_length = overridden_name ? (strlen(chunk_name) + 1) : 0;
1360 msg.chunk_id = htobe64(chunk_id);
1361 msg.creation_timestamp = htobe64((uint64_t) creation_timestamp);
1362 msg.override_name_length = htobe32((uint32_t) chunk_name_length);
1363
1364 ret = lttng_dynamic_buffer_append(&payload, &msg, sizeof(msg));
1365 if (ret) {
1366 goto end;
1367 }
1368 if (chunk_name_length) {
1369 ret = lttng_dynamic_buffer_append(
1370 &payload, chunk_name, chunk_name_length);
1371 if (ret) {
1372 goto end;
1373 }
1374 }
1375
1376 ret = send_command(sock, RELAYD_CREATE_TRACE_CHUNK, payload.data,
1377 payload.size, 0);
1378 if (ret < 0) {
1379 ERR("Failed to send trace chunk creation command to relay daemon");
1380 goto end;
1381 }
1382
1383 ret = recv_reply(sock, &reply, sizeof(reply));
1384 if (ret < 0) {
1385 ERR("Failed to receive relay daemon trace chunk creation command reply");
1386 goto end;
1387 }
1388
1389 reply.ret_code = be32toh(reply.ret_code);
1390 if (reply.ret_code != LTTNG_OK) {
1391 ret = -1;
1392 ERR("Relayd trace chunk create replied error %d",
1393 reply.ret_code);
1394 } else {
1395 ret = 0;
1396 DBG("Relayd successfully created trace chunk: chunk_id = %" PRIu64,
1397 chunk_id);
1398 }
1399
1400 end:
1401 lttng_dynamic_buffer_reset(&payload);
1402 return ret;
1403 }
1404
1405 int relayd_close_trace_chunk(struct lttcomm_relayd_sock *sock,
1406 struct lttng_trace_chunk *chunk,
1407 char *path)
1408 {
1409 int ret = 0;
1410 enum lttng_trace_chunk_status status;
1411 struct lttcomm_relayd_close_trace_chunk msg = {};
1412 struct lttcomm_relayd_close_trace_chunk_reply reply = {};
1413 uint64_t chunk_id;
1414 time_t close_timestamp;
1415 LTTNG_OPTIONAL(enum lttng_trace_chunk_command_type) close_command = {};
1416
1417 if (!relayd_supports_chunks(sock)) {
1418 DBG("Refusing to close remote trace chunk: relayd does not support chunks");
1419 goto end;
1420 }
1421
1422 status = lttng_trace_chunk_get_id(chunk, &chunk_id);
1423 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1424 ERR("Failed to get trace chunk id");
1425 ret = -1;
1426 goto end;
1427 }
1428
1429 status = lttng_trace_chunk_get_close_timestamp(chunk, &close_timestamp);
1430 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1431 ERR("Failed to get trace chunk close timestamp");
1432 ret = -1;
1433 goto end;
1434 }
1435
1436 status = lttng_trace_chunk_get_close_command(chunk,
1437 &close_command.value);
1438 switch (status) {
1439 case LTTNG_TRACE_CHUNK_STATUS_OK:
1440 close_command.is_set = 1;
1441 break;
1442 case LTTNG_TRACE_CHUNK_STATUS_NONE:
1443 break;
1444 default:
1445 ERR("Failed to get trace chunk close command");
1446 ret = -1;
1447 goto end;
1448 }
1449
1450 msg = (typeof(msg)){
1451 .chunk_id = htobe64(chunk_id),
1452 .close_timestamp = htobe64((uint64_t) close_timestamp),
1453 .close_command = {
1454 .is_set = close_command.is_set,
1455 .value = htobe32((uint32_t) close_command.value),
1456 },
1457 };
1458
1459 ret = send_command(sock, RELAYD_CLOSE_TRACE_CHUNK, &msg, sizeof(msg),
1460 0);
1461 if (ret < 0) {
1462 ERR("Failed to send trace chunk close command to relay daemon");
1463 goto end;
1464 }
1465
1466 ret = recv_reply(sock, &reply, sizeof(reply));
1467 if (ret < 0) {
1468 ERR("Failed to receive relay daemon trace chunk close command reply");
1469 goto end;
1470 }
1471
1472 reply.path_length = be32toh(reply.path_length);
1473 if (reply.path_length >= LTTNG_PATH_MAX) {
1474 ERR("Chunk path too long");
1475 ret = -1;
1476 goto end;
1477 }
1478
1479 ret = recv_reply(sock, path, reply.path_length);
1480 if (ret < 0) {
1481 ERR("Failed to receive relay daemon trace chunk close command reply");
1482 goto end;
1483 }
1484 if (path[reply.path_length - 1] != '\0') {
1485 ERR("Invalid trace chunk path returned by relay daemon (not null-terminated)");
1486 ret = -1;
1487 goto end;
1488 }
1489
1490 reply.generic.ret_code = be32toh(reply.generic.ret_code);
1491 if (reply.generic.ret_code != LTTNG_OK) {
1492 ret = -1;
1493 ERR("Relayd trace chunk close replied error %d",
1494 reply.generic.ret_code);
1495 } else {
1496 ret = 0;
1497 DBG("Relayd successfully closed trace chunk: chunk_id = %" PRIu64,
1498 chunk_id);
1499 }
1500 end:
1501 return ret;
1502 }
1503
1504 int relayd_trace_chunk_exists(struct lttcomm_relayd_sock *sock,
1505 uint64_t chunk_id, bool *chunk_exists)
1506 {
1507 int ret = 0;
1508 struct lttcomm_relayd_trace_chunk_exists msg = {};
1509 struct lttcomm_relayd_trace_chunk_exists_reply reply = {};
1510
1511 if (!relayd_supports_chunks(sock)) {
1512 DBG("Refusing to check for trace chunk existence: relayd does not support chunks");
1513 /* The chunk will never exist */
1514 *chunk_exists = false;
1515 goto end;
1516 }
1517
1518 msg = (typeof(msg)){
1519 .chunk_id = htobe64(chunk_id),
1520 };
1521
1522 ret = send_command(sock, RELAYD_TRACE_CHUNK_EXISTS, &msg, sizeof(msg),
1523 0);
1524 if (ret < 0) {
1525 ERR("Failed to send trace chunk exists command to relay daemon");
1526 goto end;
1527 }
1528
1529 ret = recv_reply(sock, &reply, sizeof(reply));
1530 if (ret < 0) {
1531 ERR("Failed to receive relay daemon trace chunk close command reply");
1532 goto end;
1533 }
1534
1535 reply.generic.ret_code = be32toh(reply.generic.ret_code);
1536 if (reply.generic.ret_code != LTTNG_OK) {
1537 ret = -1;
1538 ERR("Relayd trace chunk close replied error %d",
1539 reply.generic.ret_code);
1540 } else {
1541 ret = 0;
1542 DBG("Relayd successfully checked trace chunk existence: chunk_id = %" PRIu64
1543 ", exists = %s", chunk_id,
1544 reply.trace_chunk_exists ? "true" : "false");
1545 *chunk_exists = !!reply.trace_chunk_exists;
1546 }
1547 end:
1548 return ret;
1549 }
1550
1551 enum lttng_error_code relayd_get_configuration(struct lttcomm_relayd_sock *sock,
1552 uint64_t query_flags,
1553 uint64_t& result_flags,
1554 uint64_t *trace_format_query_results)
1555 {
1556 int ret;
1557 enum lttng_error_code ret_code = LTTNG_OK;
1558 struct lttcomm_relayd_get_configuration msg = {};
1559 struct lttcomm_relayd_get_configuration_reply reply = {};
1560 lttng_dynamic_buffer buffer;
1561 bool requesting_trace_format = query_flags &
1562 LTTCOMM_RELAYD_CONFIGURATION_QUERY_FLAG_SUPPORTED_TRACE_FORMAT;
1563
1564 lttng_dynamic_buffer_init(&buffer);
1565
1566 assert(!(query_flags & ~LTTCOMM_RELAYD_CONFIGURATION_QUERY_FLAG_MASK));
1567 assert(!(requesting_trace_format && !trace_format_query_results));
1568
1569 if (!relayd_supports_get_configuration(sock)) {
1570 DBG("Refusing to get relayd configuration (unsupported by relayd)");
1571 result_flags = 0;
1572 if (trace_format_query_results) {
1573 *trace_format_query_results =
1574 LTTCOMM_RELAYD_CONFIGURATION_TRACE_FORMAT_SUPPORTED_CTF1;
1575 }
1576 goto end;
1577 }
1578
1579 if (requesting_trace_format && !relayd_supports_get_configuration_trace_format(sock)) {
1580 /*
1581 * Provide default value for that query since lttng-relayd does
1582 * not know this query type.
1583 */
1584 if (trace_format_query_results) {
1585 *trace_format_query_results =
1586 LTTCOMM_RELAYD_CONFIGURATION_TRACE_FORMAT_SUPPORTED_CTF1;
1587 }
1588 /* Remove from the query set. */
1589 query_flags &= ~LTTCOMM_RELAYD_CONFIGURATION_QUERY_FLAG_SUPPORTED_TRACE_FORMAT;
1590 requesting_trace_format = false;
1591 }
1592
1593 msg.query_flags = htobe64(query_flags);
1594
1595 ret = send_command(sock, RELAYD_GET_CONFIGURATION, &msg, sizeof(msg), 0);
1596 if (ret < 0) {
1597 ERR("Failed to send get configuration command to relay daemon");
1598 ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL;
1599 goto end;
1600 }
1601
1602 ret = recv_reply(sock, &reply, sizeof(reply));
1603 if (ret < 0) {
1604 ERR("Failed to receive relay daemon get configuration command reply");
1605 ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL;
1606 goto end;
1607 }
1608
1609 ret_code = static_cast<enum lttng_error_code>(be32toh(reply.generic.ret_code));
1610 if (ret_code != LTTNG_OK) {
1611 ERR("Relayd get configuration replied error %d", ret_code);
1612 goto end;
1613 }
1614
1615 result_flags = be64toh(reply.relayd_configuration_flags);
1616 DBG("Relayd successfully got configuration: query_flags = %" PRIu64
1617 ", results_flags = %" PRIu64,
1618 query_flags, result_flags);
1619
1620 if (!requesting_trace_format) {
1621 ret_code = LTTNG_OK;
1622 goto end;
1623 }
1624
1625 /* Receive trace formats */
1626 {
1627 lttcomm_relayd_get_configuration_specialized_query_reply query_flag_reply = {};
1628 ret = recv_reply(sock, &query_flag_reply, sizeof(query_flag_reply));
1629 if (ret < 0) {
1630 ERR("Failed to receive relay daemon get configuration query flag data");
1631 ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL;
1632 goto end;
1633 }
1634
1635 query_flag_reply.query_flag = be64toh(query_flag_reply.query_flag);
1636 LTTNG_ASSERT(query_flag_reply.query_flag &
1637 LTTCOMM_RELAYD_CONFIGURATION_QUERY_FLAG_SUPPORTED_TRACE_FORMAT);
1638
1639 query_flag_reply.payload_len = be64toh(query_flag_reply.payload_len);
1640 LTTNG_ASSERT(query_flag_reply.payload_len == sizeof(uint64_t));
1641
1642 lttng_dynamic_buffer_set_size(&buffer, query_flag_reply.payload_len);
1643
1644 ret = recv_reply(sock, buffer.data, query_flag_reply.payload_len);
1645 if (ret < 0) {
1646 ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL;
1647 ERR("Failed to receive configuration dynamic payload for flag TODO");
1648 goto end;
1649 }
1650
1651 *trace_format_query_results = be64toh(*(uint64_t *) buffer.data);
1652 }
1653
1654 ret_code = LTTNG_OK;
1655
1656 end:
1657 lttng_dynamic_buffer_reset(&buffer);
1658 return ret_code;
1659 }
This page took 0.06767 seconds and 5 git commands to generate.