Propagate trace format to relayd on session creation
[deliverable/lttng-tools.git] / src / common / relayd / relayd.cpp
1 /*
2 * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
3 *
4 * SPDX-License-Identifier: GPL-2.0-only
5 *
6 */
7
8 #define _LGPL_SOURCE
9 #include <stdio.h>
10 #include <stdlib.h>
11 #include <string.h>
12 #include <sys/stat.h>
13 #include <inttypes.h>
14
15 #include <bitset>
16 #include <common/common.hpp>
17 #include <common/compat/endian.hpp>
18 #include <common/compat/string.hpp>
19 #include <common/defaults.hpp>
20 #include <common/index/ctf-index.hpp>
21 #include <common/sessiond-comm/relayd.hpp>
22 #include <common/string-utils/format.hpp>
23 #include <common/trace-chunk.hpp>
24 #include <lttng/trace-format-descriptor-internal.hpp>
25
26 #include "relayd.hpp"
27
28 static
29 bool relayd_supports_chunks(const struct lttcomm_relayd_sock *sock)
30 {
31 if (sock->major > 2) {
32 return true;
33 } else if (sock->major == 2 && sock->minor >= 11) {
34 return true;
35 }
36 return false;
37 }
38
39 static
40 bool relayd_supports_get_configuration(const struct lttcomm_relayd_sock *sock)
41 {
42 if (sock->major > 2) {
43 return true;
44 } else if (sock->major == 2 && sock->minor >= 12) {
45 return true;
46 }
47 return false;
48 }
49
50 static bool relayd_supports_get_configuration_trace_format(const struct lttcomm_relayd_sock *sock)
51 {
52 if (sock->major > 2) {
53 return true;
54 } else if (sock->major == 2 && sock->minor >= 15) {
55 return true;
56 }
57 return false;
58 }
59
60 /*
61 * Send command. Fill up the header and append the data.
62 */
63 static int send_command(struct lttcomm_relayd_sock *rsock,
64 enum lttcomm_relayd_command cmd, const void *data, size_t size,
65 int flags)
66 {
67 int ret;
68 struct lttcomm_relayd_hdr header;
69 char *buf;
70 uint64_t buf_size = sizeof(header);
71
72 if (rsock->sock.fd < 0) {
73 return -ECONNRESET;
74 }
75
76 if (data) {
77 buf_size += size;
78 }
79
80 buf = calloc<char>(buf_size);
81 if (buf == NULL) {
82 PERROR("zmalloc relayd send command buf");
83 ret = -ENOMEM;
84 goto alloc_error;
85 }
86
87 memset(&header, 0, sizeof(header));
88 header.cmd = htobe32(cmd);
89 header.data_size = htobe64(size);
90
91 /* Zeroed for now since not used. */
92 header.cmd_version = 0;
93 header.circuit_id = 0;
94
95 /* Prepare buffer to send. */
96 memcpy(buf, &header, sizeof(header));
97 if (data) {
98 memcpy(buf + sizeof(header), data, size);
99 }
100
101 DBG3("Relayd sending command %s of size %" PRIu64,
102 lttcomm_relayd_command_str(cmd), buf_size);
103 ret = rsock->sock.ops->sendmsg(&rsock->sock, buf, buf_size, flags);
104 if (ret < 0) {
105 PERROR("Failed to send command %s of size %" PRIu64,
106 lttcomm_relayd_command_str(cmd), buf_size);
107 ret = -errno;
108 goto error;
109 }
110 error:
111 free(buf);
112 alloc_error:
113 return ret;
114 }
115
116 /*
117 * Receive reply data on socket. This MUST be call after send_command or else
118 * could result in unexpected behavior(s).
119 */
120 static int recv_reply(struct lttcomm_relayd_sock *rsock, void *data, size_t size)
121 {
122 int ret;
123
124 if (rsock->sock.fd < 0) {
125 return -ECONNRESET;
126 }
127
128 DBG3("Relayd waiting for reply of size %zu", size);
129
130 ret = rsock->sock.ops->recvmsg(&rsock->sock, data, size, 0);
131 if (ret <= 0 || ret != size) {
132 if (ret == 0) {
133 /* Orderly shutdown. */
134 DBG("Socket %d has performed an orderly shutdown", rsock->sock.fd);
135 } else {
136 DBG("Receiving reply failed on sock %d for size %zu with ret %d",
137 rsock->sock.fd, size, ret);
138 }
139 /* Always return -1 here and the caller can use errno. */
140 ret = -1;
141 goto error;
142 }
143
144 error:
145 return ret;
146 }
147
148 /*
149 * Starting from 2.15, the trace format for the session is sent to lttng-relayd
150 * on creation.
151 * */
152 static int relayd_create_session_2_15(struct lttcomm_relayd_sock *rsock,
153 const char *session_name,
154 const char *hostname,
155 const char *base_path,
156 int session_live_timer,
157 unsigned int snapshot,
158 uint64_t sessiond_session_id,
159 const lttng_uuid& sessiond_uuid,
160 const uint64_t *current_chunk_id,
161 time_t creation_time,
162 bool session_name_contains_creation_time,
163 lttcomm_relayd_create_session_reply_2_15 *reply,
164 char *output_path,
165 const lttng::trace_format_descriptor& trace_format)
166 {
167 int ret;
168 struct lttcomm_relayd_create_session_2_15 *msg = NULL;
169 size_t session_name_len;
170 size_t hostname_len;
171 size_t base_path_len;
172 size_t msg_length;
173 char *dst;
174
175 if (!base_path) {
176 base_path = "";
177 }
178 /* The three names are sent with a '\0' delimiter between them. */
179 session_name_len = strlen(session_name) + 1;
180 hostname_len = strlen(hostname) + 1;
181 base_path_len = strlen(base_path) + 1;
182
183 msg_length = sizeof(*msg) + session_name_len + hostname_len + base_path_len;
184 msg = zmalloc<lttcomm_relayd_create_session_2_15>(msg_length);
185 if (!msg) {
186 PERROR("zmalloc create_session_2_11 command message");
187 ret = -1;
188 goto error;
189 }
190
191 LTTNG_ASSERT(session_name_len <= UINT32_MAX);
192 msg->session_name_len = htobe32(session_name_len);
193
194 LTTNG_ASSERT(hostname_len <= UINT32_MAX);
195 msg->hostname_len = htobe32(hostname_len);
196
197 LTTNG_ASSERT(base_path_len <= UINT32_MAX);
198 msg->base_path_len = htobe32(base_path_len);
199
200 dst = msg->names;
201 if (lttng_strncpy(dst, session_name, session_name_len)) {
202 ret = -1;
203 goto error;
204 }
205 dst += session_name_len;
206 if (lttng_strncpy(dst, hostname, hostname_len)) {
207 ret = -1;
208 goto error;
209 }
210 dst += hostname_len;
211 if (lttng_strncpy(dst, base_path, base_path_len)) {
212 ret = -1;
213 goto error;
214 }
215
216 msg->live_timer = htobe32(session_live_timer);
217 msg->snapshot = !!snapshot;
218
219 std::copy(sessiond_uuid.begin(), sessiond_uuid.end(), msg->sessiond_uuid);
220 msg->session_id = htobe64(sessiond_session_id);
221 msg->session_name_contains_creation_time = session_name_contains_creation_time;
222 msg->trace_format = (uint8_t) trace_format.relayd_type();
223 if (current_chunk_id) {
224 LTTNG_OPTIONAL_SET(&msg->current_chunk_id, htobe64(*current_chunk_id));
225 }
226
227 msg->creation_time = htobe64((uint64_t) creation_time);
228
229 /* Send command */
230 ret = send_command(rsock, RELAYD_CREATE_SESSION, msg, msg_length, 0);
231 if (ret < 0) {
232 goto error;
233 }
234 /* Receive response */
235 ret = recv_reply(rsock, reply, sizeof(*reply));
236 if (ret < 0) {
237 goto error;
238 }
239 reply->generic.session_id = be64toh(reply->generic.session_id);
240 reply->generic.ret_code = be32toh(reply->generic.ret_code);
241 reply->output_path_length = be32toh(reply->output_path_length);
242 if (reply->output_path_length >= LTTNG_PATH_MAX) {
243 ERR("Invalid session output path length in reply (%" PRIu32
244 " bytes) exceeds maximal allowed length (%d bytes)",
245 reply->output_path_length, LTTNG_PATH_MAX);
246 ret = -1;
247 goto error;
248 }
249 ret = recv_reply(rsock, output_path, reply->output_path_length);
250 if (ret < 0) {
251 goto error;
252 }
253 error:
254 free(msg);
255 return ret;
256 }
257
258 /*
259 * Starting from 2.11, RELAYD_CREATE_SESSION payload (session_name,
260 * hostname, and base_path) have no length restriction on the sender side.
261 * Length for both payloads is stored in the msg struct. A new dynamic size
262 * payload size is introduced.
263 */
264 static int relayd_create_session_2_11(struct lttcomm_relayd_sock *rsock,
265 const char *session_name, const char *hostname,
266 const char *base_path, int session_live_timer,
267 unsigned int snapshot, uint64_t sessiond_session_id,
268 const lttng_uuid& sessiond_uuid, const uint64_t *current_chunk_id,
269 time_t creation_time, bool session_name_contains_creation_time,
270 struct lttcomm_relayd_create_session_reply_2_11 *reply,
271 char *output_path)
272 {
273 int ret;
274 struct lttcomm_relayd_create_session_2_11 *msg = NULL;
275 size_t session_name_len;
276 size_t hostname_len;
277 size_t base_path_len;
278 size_t msg_length;
279 char *dst;
280
281 if (!base_path) {
282 base_path = "";
283 }
284 /* The three names are sent with a '\0' delimiter between them. */
285 session_name_len = strlen(session_name) + 1;
286 hostname_len = strlen(hostname) + 1;
287 base_path_len = strlen(base_path) + 1;
288
289 msg_length = sizeof(*msg) + session_name_len + hostname_len + base_path_len;
290 msg = zmalloc<lttcomm_relayd_create_session_2_11>(msg_length);
291 if (!msg) {
292 PERROR("zmalloc create_session_2_11 command message");
293 ret = -1;
294 goto error;
295 }
296
297 LTTNG_ASSERT(session_name_len <= UINT32_MAX);
298 msg->session_name_len = htobe32(session_name_len);
299
300 LTTNG_ASSERT(hostname_len <= UINT32_MAX);
301 msg->hostname_len = htobe32(hostname_len);
302
303 LTTNG_ASSERT(base_path_len <= UINT32_MAX);
304 msg->base_path_len = htobe32(base_path_len);
305
306 dst = msg->names;
307 if (lttng_strncpy(dst, session_name, session_name_len)) {
308 ret = -1;
309 goto error;
310 }
311 dst += session_name_len;
312 if (lttng_strncpy(dst, hostname, hostname_len)) {
313 ret = -1;
314 goto error;
315 }
316 dst += hostname_len;
317 if (lttng_strncpy(dst, base_path, base_path_len)) {
318 ret = -1;
319 goto error;
320 }
321
322 msg->live_timer = htobe32(session_live_timer);
323 msg->snapshot = !!snapshot;
324
325 std::copy(sessiond_uuid.begin(), sessiond_uuid.end(), msg->sessiond_uuid);
326 msg->session_id = htobe64(sessiond_session_id);
327 msg->session_name_contains_creation_time = session_name_contains_creation_time;
328 if (current_chunk_id) {
329 LTTNG_OPTIONAL_SET(&msg->current_chunk_id,
330 htobe64(*current_chunk_id));
331 }
332
333 msg->creation_time = htobe64((uint64_t) creation_time);
334
335 /* Send command */
336 ret = send_command(rsock, RELAYD_CREATE_SESSION, msg, msg_length, 0);
337 if (ret < 0) {
338 goto error;
339 }
340 /* Receive response */
341 ret = recv_reply(rsock, reply, sizeof(*reply));
342 if (ret < 0) {
343 goto error;
344 }
345 reply->generic.session_id = be64toh(reply->generic.session_id);
346 reply->generic.ret_code = be32toh(reply->generic.ret_code);
347 reply->output_path_length = be32toh(reply->output_path_length);
348 if (reply->output_path_length >= LTTNG_PATH_MAX) {
349 ERR("Invalid session output path length in reply (%" PRIu32 " bytes) exceeds maximal allowed length (%d bytes)",
350 reply->output_path_length, LTTNG_PATH_MAX);
351 ret = -1;
352 goto error;
353 }
354 ret = recv_reply(rsock, output_path, reply->output_path_length);
355 if (ret < 0) {
356 goto error;
357 }
358 error:
359 free(msg);
360 return ret;
361 }
362 /*
363 * From 2.4 to 2.10, RELAYD_CREATE_SESSION takes additional parameters to
364 * support the live reading capability.
365 */
366 static int relayd_create_session_2_4(struct lttcomm_relayd_sock *rsock,
367 const char *session_name, const char *hostname,
368 int session_live_timer, unsigned int snapshot,
369 struct lttcomm_relayd_status_session *reply)
370 {
371 int ret;
372 struct lttcomm_relayd_create_session_2_4 msg;
373
374 if (lttng_strncpy(msg.session_name, session_name,
375 sizeof(msg.session_name))) {
376 ret = -1;
377 goto error;
378 }
379 if (lttng_strncpy(msg.hostname, hostname, sizeof(msg.hostname))) {
380 ret = -1;
381 goto error;
382 }
383 msg.live_timer = htobe32(session_live_timer);
384 msg.snapshot = htobe32(snapshot);
385
386 /* Send command */
387 ret = send_command(rsock, RELAYD_CREATE_SESSION, &msg, sizeof(msg), 0);
388 if (ret < 0) {
389 goto error;
390 }
391
392 /* Receive response */
393 ret = recv_reply(rsock, reply, sizeof(*reply));
394 if (ret < 0) {
395 goto error;
396 }
397 reply->session_id = be64toh(reply->session_id);
398 reply->ret_code = be32toh(reply->ret_code);
399 error:
400 return ret;
401 }
402
403 /*
404 * RELAYD_CREATE_SESSION from 2.1 to 2.3.
405 */
406 static int relayd_create_session_2_1(struct lttcomm_relayd_sock *rsock,
407 struct lttcomm_relayd_status_session *reply)
408 {
409 int ret;
410
411 /* Send command */
412 ret = send_command(rsock, RELAYD_CREATE_SESSION, NULL, 0, 0);
413 if (ret < 0) {
414 goto error;
415 }
416
417 /* Receive response */
418 ret = recv_reply(rsock, reply, sizeof(*reply));
419 if (ret < 0) {
420 goto error;
421 }
422 reply->session_id = be64toh(reply->session_id);
423 reply->ret_code = be32toh(reply->ret_code);
424 error:
425 return ret;
426 }
427
428 /*
429 * Send a RELAYD_CREATE_SESSION command to the relayd with the given socket and
430 * set session_id of the relayd if we have a successful reply from the relayd.
431 *
432 * On success, return 0 else a negative value which is either an errno error or
433 * a lttng error code from the relayd.
434 */
435 int relayd_create_session(struct lttcomm_relayd_sock *rsock,
436 uint64_t *relayd_session_id,
437 const char *session_name,
438 const char *hostname,
439 const char *base_path,
440 int session_live_timer,
441 unsigned int snapshot,
442 uint64_t sessiond_session_id,
443 const lttng_uuid& sessiond_uuid,
444 const uint64_t *current_chunk_id,
445 time_t creation_time,
446 bool session_name_contains_creation_time,
447 char *output_path,
448 lttng::trace_format_descriptor& trace_format)
449 {
450 int ret;
451 lttcomm_relayd_create_session_reply_2_15 reply = {};
452
453 LTTNG_ASSERT(rsock);
454 LTTNG_ASSERT(relayd_session_id);
455
456 DBG("Relayd create session");
457
458 if (rsock->minor < 4) {
459 /* From 2.1 to 2.3 */
460 ret = relayd_create_session_2_1(rsock, &reply.generic);
461 } else if (rsock->minor >= 4 && rsock->minor < 11) {
462 /* From 2.4 to 2.10 */
463 ret = relayd_create_session_2_4(rsock, session_name, hostname, session_live_timer,
464 snapshot, &reply.generic);
465 } else if (rsock->minor < 15) {
466 /* From 2.11 to 2.14 */
467 ret = relayd_create_session_2_11(rsock, session_name, hostname, base_path,
468 session_live_timer, snapshot, sessiond_session_id, sessiond_uuid,
469 current_chunk_id, creation_time,
470 session_name_contains_creation_time, &reply, output_path);
471 } else {
472 ret = relayd_create_session_2_15(rsock, session_name, hostname, base_path,
473 session_live_timer, snapshot, sessiond_session_id, sessiond_uuid,
474 current_chunk_id, creation_time,
475 session_name_contains_creation_time, &reply, output_path,
476 trace_format);
477 }
478
479 if (ret < 0) {
480 goto error;
481 }
482
483 /* Return session id or negative ret code. */
484 if (reply.generic.ret_code != LTTNG_OK) {
485 ret = -1;
486 ERR("Relayd create session replied error %d",
487 reply.generic.ret_code);
488 goto error;
489 } else {
490 ret = 0;
491 *relayd_session_id = reply.generic.session_id;
492 }
493
494 DBG("Relayd session created with id %" PRIu64, reply.generic.session_id);
495
496 error:
497 return ret;
498 }
499
500 static int relayd_add_stream_2_1(struct lttcomm_relayd_sock *rsock,
501 const char *channel_name, const char *pathname)
502 {
503 int ret;
504 struct lttcomm_relayd_add_stream msg;
505
506 memset(&msg, 0, sizeof(msg));
507 if (lttng_strncpy(msg.channel_name, channel_name,
508 sizeof(msg.channel_name))) {
509 ret = -1;
510 goto error;
511 }
512
513 if (lttng_strncpy(msg.pathname, pathname,
514 sizeof(msg.pathname))) {
515 ret = -1;
516 goto error;
517 }
518
519 /* Send command */
520 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
521 if (ret < 0) {
522 ret = -1;
523 goto error;
524 }
525 ret = 0;
526 error:
527 return ret;
528 }
529
530 static int relayd_add_stream_2_2(struct lttcomm_relayd_sock *rsock,
531 const char *channel_name, const char *pathname,
532 uint64_t tracefile_size, uint64_t tracefile_count)
533 {
534 int ret;
535 struct lttcomm_relayd_add_stream_2_2 msg;
536
537 memset(&msg, 0, sizeof(msg));
538 /* Compat with relayd 2.2 to 2.10 */
539 if (lttng_strncpy(msg.channel_name, channel_name,
540 sizeof(msg.channel_name))) {
541 ret = -1;
542 goto error;
543 }
544 if (lttng_strncpy(msg.pathname, pathname,
545 sizeof(msg.pathname))) {
546 ret = -1;
547 goto error;
548 }
549 msg.tracefile_size = htobe64(tracefile_size);
550 msg.tracefile_count = htobe64(tracefile_count);
551
552 /* Send command */
553 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
554 if (ret < 0) {
555 goto error;
556 }
557 ret = 0;
558 error:
559 return ret;
560 }
561
562 static int relayd_add_stream_2_11(struct lttcomm_relayd_sock *rsock,
563 const char *channel_name, const char *pathname,
564 uint64_t tracefile_size, uint64_t tracefile_count,
565 uint64_t trace_archive_id)
566 {
567 int ret;
568 struct lttcomm_relayd_add_stream_2_11 *msg = NULL;
569 size_t channel_name_len;
570 size_t pathname_len;
571 size_t msg_length;
572
573 /* The two names are sent with a '\0' delimiter between them. */
574 channel_name_len = strlen(channel_name) + 1;
575 pathname_len = strlen(pathname) + 1;
576
577 msg_length = sizeof(*msg) + channel_name_len + pathname_len;
578 msg = zmalloc<lttcomm_relayd_add_stream_2_11>(msg_length);
579 if (!msg) {
580 PERROR("zmalloc add_stream_2_11 command message");
581 ret = -1;
582 goto error;
583 }
584
585 LTTNG_ASSERT(channel_name_len <= UINT32_MAX);
586 msg->channel_name_len = htobe32(channel_name_len);
587
588 LTTNG_ASSERT(pathname_len <= UINT32_MAX);
589 msg->pathname_len = htobe32(pathname_len);
590
591 if (lttng_strncpy(msg->names, channel_name, channel_name_len)) {
592 ret = -1;
593 goto error;
594 }
595 if (lttng_strncpy(msg->names + channel_name_len, pathname, pathname_len)) {
596 ret = -1;
597 goto error;
598 }
599
600 msg->tracefile_size = htobe64(tracefile_size);
601 msg->tracefile_count = htobe64(tracefile_count);
602 msg->trace_chunk_id = htobe64(trace_archive_id);
603
604 /* Send command */
605 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) msg, msg_length, 0);
606 if (ret < 0) {
607 goto error;
608 }
609 ret = 0;
610 error:
611 free(msg);
612 return ret;
613 }
614
615 /*
616 * Add stream on the relayd and assign stream handle to the stream_id argument.
617 *
618 * Chunks are not supported by relayd prior to 2.11, but are used to
619 * internally between session daemon and consumer daemon to keep track
620 * of the channel and stream output path.
621 *
622 * On success return 0 else return ret_code negative value.
623 */
624 int relayd_add_stream(struct lttcomm_relayd_sock *rsock, const char *channel_name,
625 const char *domain_name, const char *_pathname, uint64_t *stream_id,
626 uint64_t tracefile_size, uint64_t tracefile_count,
627 struct lttng_trace_chunk *trace_chunk)
628 {
629 int ret;
630 struct lttcomm_relayd_status_stream reply;
631 char pathname[RELAYD_COMM_LTTNG_PATH_MAX];
632
633 /* Code flow error. Safety net. */
634 LTTNG_ASSERT(rsock);
635 LTTNG_ASSERT(channel_name);
636 LTTNG_ASSERT(domain_name);
637 LTTNG_ASSERT(_pathname);
638 LTTNG_ASSERT(trace_chunk);
639
640 DBG("Relayd adding stream for channel name %s", channel_name);
641
642 /* Compat with relayd 2.1 */
643 if (rsock->minor == 1) {
644 /* For 2.1 */
645 ret = relayd_add_stream_2_1(rsock, channel_name, _pathname);
646
647 } else if (rsock->minor > 1 && rsock->minor < 11) {
648 /* From 2.2 to 2.10 */
649 ret = relayd_add_stream_2_2(rsock, channel_name, _pathname,
650 tracefile_size, tracefile_count);
651 } else {
652 const char *separator;
653 enum lttng_trace_chunk_status chunk_status;
654 uint64_t chunk_id;
655
656 if (_pathname[0] == '\0') {
657 separator = "";
658 } else {
659 separator = "/";
660 }
661
662 ret = snprintf(pathname, RELAYD_COMM_LTTNG_PATH_MAX, "%s%s%s",
663 domain_name, separator, _pathname);
664 if (ret <= 0 || ret >= RELAYD_COMM_LTTNG_PATH_MAX) {
665 ERR("Failed to format stream path: %s",
666 ret <= 0 ? "formatting error" :
667 "path exceeds maximal allowed length");
668 ret = -1;
669 goto error;
670 }
671
672 chunk_status = lttng_trace_chunk_get_id(trace_chunk,
673 &chunk_id);
674 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
675
676 /* From 2.11 to ...*/
677 ret = relayd_add_stream_2_11(rsock, channel_name, pathname,
678 tracefile_size, tracefile_count,
679 chunk_id);
680 }
681
682 if (ret) {
683 ret = -1;
684 goto error;
685 }
686
687 /* Waiting for reply */
688 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
689 if (ret < 0) {
690 goto error;
691 }
692
693 /* Back to host bytes order. */
694 reply.handle = be64toh(reply.handle);
695 reply.ret_code = be32toh(reply.ret_code);
696
697 /* Return session id or negative ret code. */
698 if (reply.ret_code != LTTNG_OK) {
699 ret = -1;
700 ERR("Relayd add stream replied error %d", reply.ret_code);
701 } else {
702 /* Success */
703 ret = 0;
704 *stream_id = reply.handle;
705 }
706
707 DBG("Relayd stream added successfully with handle %" PRIu64,
708 reply.handle);
709
710 error:
711 return ret;
712 }
713
714 /*
715 * Inform the relay that all the streams for the current channel has been sent.
716 *
717 * On success return 0 else return ret_code negative value.
718 */
719 int relayd_streams_sent(struct lttcomm_relayd_sock *rsock)
720 {
721 int ret;
722 struct lttcomm_relayd_generic_reply reply;
723
724 /* Code flow error. Safety net. */
725 LTTNG_ASSERT(rsock);
726
727 DBG("Relayd sending streams sent.");
728
729 /* This feature was introduced in 2.4, ignore it for earlier versions. */
730 if (rsock->minor < 4) {
731 ret = 0;
732 goto end;
733 }
734
735 /* Send command */
736 ret = send_command(rsock, RELAYD_STREAMS_SENT, NULL, 0, 0);
737 if (ret < 0) {
738 goto error;
739 }
740
741 /* Waiting for reply */
742 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
743 if (ret < 0) {
744 goto error;
745 }
746
747 /* Back to host bytes order. */
748 reply.ret_code = be32toh(reply.ret_code);
749
750 /* Return session id or negative ret code. */
751 if (reply.ret_code != LTTNG_OK) {
752 ret = -1;
753 ERR("Relayd streams sent replied error %d", reply.ret_code);
754 goto error;
755 } else {
756 /* Success */
757 ret = 0;
758 }
759
760 DBG("Relayd streams sent success");
761
762 error:
763 end:
764 return ret;
765 }
766
767 /*
768 * Check version numbers on the relayd.
769 * If major versions are compatible, we assign minor_to_use to the
770 * minor version of the procotol we are going to use for this session.
771 *
772 * Return 0 if the two daemons are compatible, LTTNG_ERR_RELAYD_VERSION_FAIL
773 * otherwise, or a negative value on network errors.
774 */
775 int relayd_version_check(struct lttcomm_relayd_sock *rsock)
776 {
777 int ret;
778 struct lttcomm_relayd_version msg;
779
780 /* Code flow error. Safety net. */
781 LTTNG_ASSERT(rsock);
782
783 DBG("Relayd version check for major.minor %u.%u", rsock->major,
784 rsock->minor);
785
786 memset(&msg, 0, sizeof(msg));
787 /* Prepare network byte order before transmission. */
788 msg.major = htobe32(rsock->major);
789 msg.minor = htobe32(rsock->minor);
790
791 /* Send command */
792 ret = send_command(rsock, RELAYD_VERSION, (void *) &msg, sizeof(msg), 0);
793 if (ret < 0) {
794 goto error;
795 }
796
797 /* Receive response */
798 ret = recv_reply(rsock, (void *) &msg, sizeof(msg));
799 if (ret < 0) {
800 goto error;
801 }
802
803 /* Set back to host bytes order */
804 msg.major = be32toh(msg.major);
805 msg.minor = be32toh(msg.minor);
806
807 /*
808 * Only validate the major version. If the other side is higher,
809 * communication is not possible. Only major version equal can talk to each
810 * other. If the minor version differs, the lowest version is used by both
811 * sides.
812 */
813 if (msg.major != rsock->major) {
814 /* Not compatible */
815 ret = LTTNG_ERR_RELAYD_VERSION_FAIL;
816 DBG2("Relayd version is NOT compatible. Relayd version %u != %u (us)",
817 msg.major, rsock->major);
818 goto error;
819 }
820
821 /*
822 * If the relayd's minor version is higher, it will adapt to our version so
823 * we can continue to use the latest relayd communication data structure.
824 * If the received minor version is higher, the relayd should adapt to us.
825 */
826 if (rsock->minor > msg.minor) {
827 rsock->minor = msg.minor;
828 }
829
830 /* Version number compatible */
831 DBG2("Relayd version is compatible, using protocol version %u.%u",
832 rsock->major, rsock->minor);
833 ret = 0;
834
835 error:
836 return ret;
837 }
838
839 /*
840 * Add stream on the relayd and assign stream handle to the stream_id argument.
841 *
842 * On success return 0 else return ret_code negative value.
843 */
844 int relayd_send_metadata(struct lttcomm_relayd_sock *rsock, size_t len)
845 {
846 int ret;
847
848 /* Code flow error. Safety net. */
849 LTTNG_ASSERT(rsock);
850
851 DBG("Relayd sending metadata of size %zu", len);
852
853 /* Send command */
854 ret = send_command(rsock, RELAYD_SEND_METADATA, NULL, len, 0);
855 if (ret < 0) {
856 goto error;
857 }
858
859 DBG2("Relayd metadata added successfully");
860
861 /*
862 * After that call, the metadata data MUST be sent to the relayd so the
863 * receive size on the other end matches the len of the metadata packet
864 * header. This is why we don't wait for a reply here.
865 */
866
867 error:
868 return ret;
869 }
870
871 /*
872 * Connect to relay daemon with an allocated lttcomm_relayd_sock.
873 */
874 int relayd_connect(struct lttcomm_relayd_sock *rsock)
875 {
876 /* Code flow error. Safety net. */
877 LTTNG_ASSERT(rsock);
878
879 if (!rsock->sock.ops) {
880 /*
881 * Attempting a connect on a non-initialized socket.
882 */
883 return -ECONNRESET;
884 }
885
886 DBG3("Relayd connect ...");
887
888 return rsock->sock.ops->connect(&rsock->sock);
889 }
890
891 /*
892 * Close relayd socket with an allocated lttcomm_relayd_sock.
893 *
894 * If no socket operations are found, simply return 0 meaning that everything
895 * is fine. Without operations, the socket can not possibly be opened or used.
896 * This is possible if the socket was allocated but not created. However, the
897 * caller could simply use it to store a valid file descriptor for instance
898 * passed over a Unix socket and call this to cleanup but still without a valid
899 * ops pointer.
900 *
901 * Return the close returned value. On error, a negative value is usually
902 * returned back from close(2).
903 */
904 int relayd_close(struct lttcomm_relayd_sock *rsock)
905 {
906 int ret;
907
908 /* Code flow error. Safety net. */
909 LTTNG_ASSERT(rsock);
910
911 /* An invalid fd is fine, return success. */
912 if (rsock->sock.fd < 0) {
913 ret = 0;
914 goto end;
915 }
916
917 DBG3("Relayd closing socket %d", rsock->sock.fd);
918
919 if (rsock->sock.ops) {
920 ret = rsock->sock.ops->close(&rsock->sock);
921 } else {
922 /* Default call if no specific ops found. */
923 ret = close(rsock->sock.fd);
924 if (ret < 0) {
925 PERROR("relayd_close default close");
926 }
927 }
928 rsock->sock.fd = -1;
929
930 end:
931 return ret;
932 }
933
934 /*
935 * Send data header structure to the relayd.
936 */
937 int relayd_send_data_hdr(struct lttcomm_relayd_sock *rsock,
938 struct lttcomm_relayd_data_hdr *hdr, size_t size)
939 {
940 int ret;
941
942 /* Code flow error. Safety net. */
943 LTTNG_ASSERT(rsock);
944 LTTNG_ASSERT(hdr);
945
946 if (rsock->sock.fd < 0) {
947 return -ECONNRESET;
948 }
949
950 DBG3("Relayd sending data header of size %zu", size);
951
952 /* Again, safety net */
953 if (size == 0) {
954 size = sizeof(struct lttcomm_relayd_data_hdr);
955 }
956
957 /* Only send data header. */
958 ret = rsock->sock.ops->sendmsg(&rsock->sock, hdr, size, 0);
959 if (ret < 0) {
960 ret = -errno;
961 goto error;
962 }
963
964 /*
965 * The data MUST be sent right after that command for the receive on the
966 * other end to match the size in the header.
967 */
968
969 error:
970 return ret;
971 }
972
973 /*
974 * Send close stream command to the relayd.
975 */
976 int relayd_send_close_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
977 uint64_t last_net_seq_num)
978 {
979 int ret;
980 struct lttcomm_relayd_close_stream msg;
981 struct lttcomm_relayd_generic_reply reply;
982
983 /* Code flow error. Safety net. */
984 LTTNG_ASSERT(rsock);
985
986 DBG("Relayd closing stream id %" PRIu64, stream_id);
987
988 memset(&msg, 0, sizeof(msg));
989 msg.stream_id = htobe64(stream_id);
990 msg.last_net_seq_num = htobe64(last_net_seq_num);
991
992 /* Send command */
993 ret = send_command(rsock, RELAYD_CLOSE_STREAM, (void *) &msg, sizeof(msg), 0);
994 if (ret < 0) {
995 goto error;
996 }
997
998 /* Receive response */
999 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1000 if (ret < 0) {
1001 goto error;
1002 }
1003
1004 reply.ret_code = be32toh(reply.ret_code);
1005
1006 /* Return session id or negative ret code. */
1007 if (reply.ret_code != LTTNG_OK) {
1008 ret = -1;
1009 ERR("Relayd close stream replied error %d", reply.ret_code);
1010 } else {
1011 /* Success */
1012 ret = 0;
1013 }
1014
1015 DBG("Relayd close stream id %" PRIu64 " successfully", stream_id);
1016
1017 error:
1018 return ret;
1019 }
1020
1021 /*
1022 * Check for data availability for a given stream id.
1023 *
1024 * Return 0 if NOT pending, 1 if so and a negative value on error.
1025 */
1026 int relayd_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
1027 uint64_t last_net_seq_num)
1028 {
1029 int ret;
1030 struct lttcomm_relayd_data_pending msg;
1031 struct lttcomm_relayd_generic_reply reply;
1032
1033 /* Code flow error. Safety net. */
1034 LTTNG_ASSERT(rsock);
1035
1036 DBG("Relayd data pending for stream id %" PRIu64, stream_id);
1037
1038 memset(&msg, 0, sizeof(msg));
1039 msg.stream_id = htobe64(stream_id);
1040 msg.last_net_seq_num = htobe64(last_net_seq_num);
1041
1042 /* Send command */
1043 ret = send_command(rsock, RELAYD_DATA_PENDING, (void *) &msg,
1044 sizeof(msg), 0);
1045 if (ret < 0) {
1046 goto error;
1047 }
1048
1049 /* Receive response */
1050 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1051 if (ret < 0) {
1052 goto error;
1053 }
1054
1055 reply.ret_code = be32toh(reply.ret_code);
1056
1057 /* Return session id or negative ret code. */
1058 if (reply.ret_code >= LTTNG_OK) {
1059 ERR("Relayd data pending replied error %d", reply.ret_code);
1060 }
1061
1062 /* At this point, the ret code is either 1 or 0 */
1063 ret = reply.ret_code;
1064
1065 DBG("Relayd data is %s pending for stream id %" PRIu64,
1066 ret == 1 ? "" : "NOT", stream_id);
1067
1068 error:
1069 return ret;
1070 }
1071
1072 /*
1073 * Check on the relayd side for a quiescent state on the control socket.
1074 */
1075 int relayd_quiescent_control(struct lttcomm_relayd_sock *rsock,
1076 uint64_t metadata_stream_id)
1077 {
1078 int ret;
1079 struct lttcomm_relayd_quiescent_control msg;
1080 struct lttcomm_relayd_generic_reply reply;
1081
1082 /* Code flow error. Safety net. */
1083 LTTNG_ASSERT(rsock);
1084
1085 DBG("Relayd checking quiescent control state");
1086
1087 memset(&msg, 0, sizeof(msg));
1088 msg.stream_id = htobe64(metadata_stream_id);
1089
1090 /* Send command */
1091 ret = send_command(rsock, RELAYD_QUIESCENT_CONTROL, &msg, sizeof(msg), 0);
1092 if (ret < 0) {
1093 goto error;
1094 }
1095
1096 /* Receive response */
1097 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1098 if (ret < 0) {
1099 goto error;
1100 }
1101
1102 reply.ret_code = be32toh(reply.ret_code);
1103
1104 /* Return session id or negative ret code. */
1105 if (reply.ret_code != LTTNG_OK) {
1106 ret = -1;
1107 ERR("Relayd quiescent control replied error %d", reply.ret_code);
1108 goto error;
1109 }
1110
1111 /* Control socket is quiescent */
1112 return 0;
1113
1114 error:
1115 return ret;
1116 }
1117
1118 /*
1119 * Begin a data pending command for a specific session id.
1120 */
1121 int relayd_begin_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id)
1122 {
1123 int ret;
1124 struct lttcomm_relayd_begin_data_pending msg;
1125 struct lttcomm_relayd_generic_reply reply;
1126
1127 /* Code flow error. Safety net. */
1128 LTTNG_ASSERT(rsock);
1129
1130 DBG("Relayd begin data pending");
1131
1132 memset(&msg, 0, sizeof(msg));
1133 msg.session_id = htobe64(id);
1134
1135 /* Send command */
1136 ret = send_command(rsock, RELAYD_BEGIN_DATA_PENDING, &msg, sizeof(msg), 0);
1137 if (ret < 0) {
1138 goto error;
1139 }
1140
1141 /* Receive response */
1142 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1143 if (ret < 0) {
1144 goto error;
1145 }
1146
1147 reply.ret_code = be32toh(reply.ret_code);
1148
1149 /* Return session id or negative ret code. */
1150 if (reply.ret_code != LTTNG_OK) {
1151 ret = -1;
1152 ERR("Relayd begin data pending replied error %d", reply.ret_code);
1153 goto error;
1154 }
1155
1156 return 0;
1157
1158 error:
1159 return ret;
1160 }
1161
1162 /*
1163 * End a data pending command for a specific session id.
1164 *
1165 * Return 0 on success and set is_data_inflight to 0 if no data is being
1166 * streamed or 1 if it is the case.
1167 */
1168 int relayd_end_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id,
1169 unsigned int *is_data_inflight)
1170 {
1171 int ret, recv_ret;
1172 struct lttcomm_relayd_end_data_pending msg;
1173 struct lttcomm_relayd_generic_reply reply;
1174
1175 /* Code flow error. Safety net. */
1176 LTTNG_ASSERT(rsock);
1177
1178 DBG("Relayd end data pending");
1179
1180 memset(&msg, 0, sizeof(msg));
1181 msg.session_id = htobe64(id);
1182
1183 /* Send command */
1184 ret = send_command(rsock, RELAYD_END_DATA_PENDING, &msg, sizeof(msg), 0);
1185 if (ret < 0) {
1186 goto error;
1187 }
1188
1189 /* Receive response */
1190 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1191 if (ret < 0) {
1192 goto error;
1193 }
1194
1195 recv_ret = be32toh(reply.ret_code);
1196 if (recv_ret < 0) {
1197 ret = recv_ret;
1198 goto error;
1199 }
1200
1201 *is_data_inflight = recv_ret;
1202
1203 DBG("Relayd end data pending is data inflight: %d", recv_ret);
1204
1205 return 0;
1206
1207 error:
1208 return ret;
1209 }
1210
1211 /*
1212 * Send index to the relayd.
1213 */
1214 int relayd_send_index(struct lttcomm_relayd_sock *rsock,
1215 struct ctf_packet_index *index, uint64_t relay_stream_id,
1216 uint64_t net_seq_num)
1217 {
1218 int ret;
1219 struct lttcomm_relayd_index msg;
1220 struct lttcomm_relayd_generic_reply reply;
1221
1222 /* Code flow error. Safety net. */
1223 LTTNG_ASSERT(rsock);
1224
1225 if (rsock->minor < 4) {
1226 DBG("Not sending indexes before protocol 2.4");
1227 ret = 0;
1228 goto error;
1229 }
1230
1231 DBG("Relayd sending index for stream ID %" PRIu64, relay_stream_id);
1232
1233 memset(&msg, 0, sizeof(msg));
1234 msg.relay_stream_id = htobe64(relay_stream_id);
1235 msg.net_seq_num = htobe64(net_seq_num);
1236
1237 /* The index is already in big endian. */
1238 msg.packet_size = index->packet_size;
1239 msg.content_size = index->content_size;
1240 msg.timestamp_begin = index->timestamp_begin;
1241 msg.timestamp_end = index->timestamp_end;
1242 msg.events_discarded = index->events_discarded;
1243 msg.stream_id = index->stream_id;
1244
1245 if (rsock->minor >= 8) {
1246 msg.stream_instance_id = index->stream_instance_id;
1247 msg.packet_seq_num = index->packet_seq_num;
1248 }
1249
1250 /* Send command */
1251 ret = send_command(rsock, RELAYD_SEND_INDEX, &msg,
1252 lttcomm_relayd_index_len(lttng_to_index_major(rsock->major,
1253 rsock->minor),
1254 lttng_to_index_minor(rsock->major, rsock->minor)),
1255 0);
1256 if (ret < 0) {
1257 goto error;
1258 }
1259
1260 /* Receive response */
1261 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1262 if (ret < 0) {
1263 goto error;
1264 }
1265
1266 reply.ret_code = be32toh(reply.ret_code);
1267
1268 /* Return session id or negative ret code. */
1269 if (reply.ret_code != LTTNG_OK) {
1270 ret = -1;
1271 ERR("Relayd send index replied error %d", reply.ret_code);
1272 } else {
1273 /* Success */
1274 ret = 0;
1275 }
1276
1277 error:
1278 return ret;
1279 }
1280
1281 /*
1282 * Ask the relay to reset the metadata trace file (regeneration).
1283 */
1284 int relayd_reset_metadata(struct lttcomm_relayd_sock *rsock,
1285 uint64_t stream_id, uint64_t version)
1286 {
1287 int ret;
1288 struct lttcomm_relayd_reset_metadata msg;
1289 struct lttcomm_relayd_generic_reply reply;
1290
1291 /* Code flow error. Safety net. */
1292 LTTNG_ASSERT(rsock);
1293
1294 /* Should have been prevented by the sessiond. */
1295 if (rsock->minor < 8) {
1296 ERR("Metadata regeneration unsupported before 2.8");
1297 ret = -1;
1298 goto error;
1299 }
1300
1301 DBG("Relayd reset metadata stream id %" PRIu64, stream_id);
1302
1303 memset(&msg, 0, sizeof(msg));
1304 msg.stream_id = htobe64(stream_id);
1305 msg.version = htobe64(version);
1306
1307 /* Send command */
1308 ret = send_command(rsock, RELAYD_RESET_METADATA, (void *) &msg, sizeof(msg), 0);
1309 if (ret < 0) {
1310 goto error;
1311 }
1312
1313 /* Receive response */
1314 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1315 if (ret < 0) {
1316 goto error;
1317 }
1318
1319 reply.ret_code = be32toh(reply.ret_code);
1320
1321 /* Return session id or negative ret code. */
1322 if (reply.ret_code != LTTNG_OK) {
1323 ret = -1;
1324 ERR("Relayd reset metadata replied error %d", reply.ret_code);
1325 } else {
1326 /* Success */
1327 ret = 0;
1328 }
1329
1330 DBG("Relayd reset metadata stream id %" PRIu64 " successfully", stream_id);
1331
1332 error:
1333 return ret;
1334 }
1335
1336 int relayd_rotate_streams(struct lttcomm_relayd_sock *sock,
1337 unsigned int stream_count, const uint64_t *new_chunk_id,
1338 const struct relayd_stream_rotation_position *positions)
1339 {
1340 int ret;
1341 unsigned int i;
1342 struct lttng_dynamic_buffer payload;
1343 struct lttcomm_relayd_generic_reply reply = {};
1344 struct lttcomm_relayd_rotate_streams msg;
1345 char new_chunk_id_buf[MAX_INT_DEC_LEN(*new_chunk_id)] = {};
1346 const char *new_chunk_id_str;
1347
1348 msg.stream_count = htobe32((uint32_t) stream_count);
1349 msg.new_chunk_id = (typeof(msg.new_chunk_id)){
1350 .is_set = !!new_chunk_id,
1351 .value = htobe64(new_chunk_id ? *new_chunk_id : 0),
1352 };
1353
1354 if (!relayd_supports_chunks(sock)) {
1355 DBG("Refusing to rotate remote streams: relayd does not support chunks");
1356 return 0;
1357 }
1358
1359 lttng_dynamic_buffer_init(&payload);
1360
1361 /* Code flow error. Safety net. */
1362 LTTNG_ASSERT(sock);
1363
1364 if (new_chunk_id) {
1365 ret = snprintf(new_chunk_id_buf, sizeof(new_chunk_id_buf),
1366 "%" PRIu64, *new_chunk_id);
1367 if (ret == -1 || ret >= sizeof(new_chunk_id_buf)) {
1368 new_chunk_id_str = "formatting error";
1369 } else {
1370 new_chunk_id_str = new_chunk_id_buf;
1371 }
1372 } else {
1373 new_chunk_id_str = "none";
1374 }
1375
1376 DBG("Preparing \"rotate streams\" command payload: new_chunk_id = %s, stream_count = %u",
1377 new_chunk_id_str, stream_count);
1378
1379 ret = lttng_dynamic_buffer_append(&payload, &msg, sizeof(msg));
1380 if (ret) {
1381 ERR("Failed to allocate \"rotate streams\" command payload");
1382 goto error;
1383 }
1384
1385 for (i = 0; i < stream_count; i++) {
1386 const struct relayd_stream_rotation_position *position =
1387 &positions[i];
1388 const struct lttcomm_relayd_stream_rotation_position comm_position = {
1389 .stream_id = htobe64(position->stream_id),
1390 .rotate_at_seq_num = htobe64(
1391 position->rotate_at_seq_num),
1392 };
1393
1394 DBG("Rotate stream %" PRIu64 " at sequence number %" PRIu64,
1395 position->stream_id,
1396 position->rotate_at_seq_num);
1397 ret = lttng_dynamic_buffer_append(&payload, &comm_position,
1398 sizeof(comm_position));
1399 if (ret) {
1400 ERR("Failed to allocate \"rotate streams\" command payload");
1401 goto error;
1402 }
1403 }
1404
1405 /* Send command. */
1406 ret = send_command(sock, RELAYD_ROTATE_STREAMS, payload.data,
1407 payload.size, 0);
1408 if (ret < 0) {
1409 ERR("Failed to send \"rotate stream\" command");
1410 goto error;
1411 }
1412
1413 /* Receive response. */
1414 ret = recv_reply(sock, &reply, sizeof(reply));
1415 if (ret < 0) {
1416 ERR("Failed to receive \"rotate streams\" command reply");
1417 goto error;
1418 }
1419
1420 reply.ret_code = be32toh(reply.ret_code);
1421 if (reply.ret_code != LTTNG_OK) {
1422 ret = -1;
1423 ERR("Relayd rotate streams replied error %d", reply.ret_code);
1424 } else {
1425 /* Success. */
1426 ret = 0;
1427 DBG("Relayd rotated streams successfully");
1428 }
1429
1430 error:
1431 lttng_dynamic_buffer_reset(&payload);
1432 return ret;
1433 }
1434
1435 int relayd_create_trace_chunk(struct lttcomm_relayd_sock *sock,
1436 struct lttng_trace_chunk *chunk)
1437 {
1438 int ret = 0;
1439 enum lttng_trace_chunk_status status;
1440 struct lttcomm_relayd_create_trace_chunk msg = {};
1441 struct lttcomm_relayd_generic_reply reply = {};
1442 struct lttng_dynamic_buffer payload;
1443 uint64_t chunk_id;
1444 time_t creation_timestamp;
1445 const char *chunk_name;
1446 size_t chunk_name_length;
1447 bool overridden_name;
1448
1449 lttng_dynamic_buffer_init(&payload);
1450
1451 if (!relayd_supports_chunks(sock)) {
1452 DBG("Refusing to create remote trace chunk: relayd does not support chunks");
1453 goto end;
1454 }
1455
1456 status = lttng_trace_chunk_get_id(chunk, &chunk_id);
1457 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1458 ret = -1;
1459 goto end;
1460 }
1461
1462 status = lttng_trace_chunk_get_creation_timestamp(
1463 chunk, &creation_timestamp);
1464 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1465 ret = -1;
1466 goto end;
1467 }
1468
1469 status = lttng_trace_chunk_get_name(
1470 chunk, &chunk_name, &overridden_name);
1471 if (status != LTTNG_TRACE_CHUNK_STATUS_OK &&
1472 status != LTTNG_TRACE_CHUNK_STATUS_NONE) {
1473 ret = -1;
1474 goto end;
1475 }
1476
1477 chunk_name_length = overridden_name ? (strlen(chunk_name) + 1) : 0;
1478 msg.chunk_id = htobe64(chunk_id);
1479 msg.creation_timestamp = htobe64((uint64_t) creation_timestamp);
1480 msg.override_name_length = htobe32((uint32_t) chunk_name_length);
1481
1482 ret = lttng_dynamic_buffer_append(&payload, &msg, sizeof(msg));
1483 if (ret) {
1484 goto end;
1485 }
1486 if (chunk_name_length) {
1487 ret = lttng_dynamic_buffer_append(
1488 &payload, chunk_name, chunk_name_length);
1489 if (ret) {
1490 goto end;
1491 }
1492 }
1493
1494 ret = send_command(sock, RELAYD_CREATE_TRACE_CHUNK, payload.data,
1495 payload.size, 0);
1496 if (ret < 0) {
1497 ERR("Failed to send trace chunk creation command to relay daemon");
1498 goto end;
1499 }
1500
1501 ret = recv_reply(sock, &reply, sizeof(reply));
1502 if (ret < 0) {
1503 ERR("Failed to receive relay daemon trace chunk creation command reply");
1504 goto end;
1505 }
1506
1507 reply.ret_code = be32toh(reply.ret_code);
1508 if (reply.ret_code != LTTNG_OK) {
1509 ret = -1;
1510 ERR("Relayd trace chunk create replied error %d",
1511 reply.ret_code);
1512 } else {
1513 ret = 0;
1514 DBG("Relayd successfully created trace chunk: chunk_id = %" PRIu64,
1515 chunk_id);
1516 }
1517
1518 end:
1519 lttng_dynamic_buffer_reset(&payload);
1520 return ret;
1521 }
1522
1523 int relayd_close_trace_chunk(struct lttcomm_relayd_sock *sock,
1524 struct lttng_trace_chunk *chunk,
1525 char *path)
1526 {
1527 int ret = 0;
1528 enum lttng_trace_chunk_status status;
1529 struct lttcomm_relayd_close_trace_chunk msg = {};
1530 struct lttcomm_relayd_close_trace_chunk_reply reply = {};
1531 uint64_t chunk_id;
1532 time_t close_timestamp;
1533 LTTNG_OPTIONAL(enum lttng_trace_chunk_command_type) close_command = {};
1534
1535 if (!relayd_supports_chunks(sock)) {
1536 DBG("Refusing to close remote trace chunk: relayd does not support chunks");
1537 goto end;
1538 }
1539
1540 status = lttng_trace_chunk_get_id(chunk, &chunk_id);
1541 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1542 ERR("Failed to get trace chunk id");
1543 ret = -1;
1544 goto end;
1545 }
1546
1547 status = lttng_trace_chunk_get_close_timestamp(chunk, &close_timestamp);
1548 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1549 ERR("Failed to get trace chunk close timestamp");
1550 ret = -1;
1551 goto end;
1552 }
1553
1554 status = lttng_trace_chunk_get_close_command(chunk,
1555 &close_command.value);
1556 switch (status) {
1557 case LTTNG_TRACE_CHUNK_STATUS_OK:
1558 close_command.is_set = 1;
1559 break;
1560 case LTTNG_TRACE_CHUNK_STATUS_NONE:
1561 break;
1562 default:
1563 ERR("Failed to get trace chunk close command");
1564 ret = -1;
1565 goto end;
1566 }
1567
1568 msg = (typeof(msg)){
1569 .chunk_id = htobe64(chunk_id),
1570 .close_timestamp = htobe64((uint64_t) close_timestamp),
1571 .close_command = {
1572 .is_set = close_command.is_set,
1573 .value = htobe32((uint32_t) close_command.value),
1574 },
1575 };
1576
1577 ret = send_command(sock, RELAYD_CLOSE_TRACE_CHUNK, &msg, sizeof(msg),
1578 0);
1579 if (ret < 0) {
1580 ERR("Failed to send trace chunk close command to relay daemon");
1581 goto end;
1582 }
1583
1584 ret = recv_reply(sock, &reply, sizeof(reply));
1585 if (ret < 0) {
1586 ERR("Failed to receive relay daemon trace chunk close command reply");
1587 goto end;
1588 }
1589
1590 reply.path_length = be32toh(reply.path_length);
1591 if (reply.path_length >= LTTNG_PATH_MAX) {
1592 ERR("Chunk path too long");
1593 ret = -1;
1594 goto end;
1595 }
1596
1597 ret = recv_reply(sock, path, reply.path_length);
1598 if (ret < 0) {
1599 ERR("Failed to receive relay daemon trace chunk close command reply");
1600 goto end;
1601 }
1602 if (path[reply.path_length - 1] != '\0') {
1603 ERR("Invalid trace chunk path returned by relay daemon (not null-terminated)");
1604 ret = -1;
1605 goto end;
1606 }
1607
1608 reply.generic.ret_code = be32toh(reply.generic.ret_code);
1609 if (reply.generic.ret_code != LTTNG_OK) {
1610 ret = -1;
1611 ERR("Relayd trace chunk close replied error %d",
1612 reply.generic.ret_code);
1613 } else {
1614 ret = 0;
1615 DBG("Relayd successfully closed trace chunk: chunk_id = %" PRIu64,
1616 chunk_id);
1617 }
1618 end:
1619 return ret;
1620 }
1621
1622 int relayd_trace_chunk_exists(struct lttcomm_relayd_sock *sock,
1623 uint64_t chunk_id, bool *chunk_exists)
1624 {
1625 int ret = 0;
1626 struct lttcomm_relayd_trace_chunk_exists msg = {};
1627 struct lttcomm_relayd_trace_chunk_exists_reply reply = {};
1628
1629 if (!relayd_supports_chunks(sock)) {
1630 DBG("Refusing to check for trace chunk existence: relayd does not support chunks");
1631 /* The chunk will never exist */
1632 *chunk_exists = false;
1633 goto end;
1634 }
1635
1636 msg = (typeof(msg)){
1637 .chunk_id = htobe64(chunk_id),
1638 };
1639
1640 ret = send_command(sock, RELAYD_TRACE_CHUNK_EXISTS, &msg, sizeof(msg),
1641 0);
1642 if (ret < 0) {
1643 ERR("Failed to send trace chunk exists command to relay daemon");
1644 goto end;
1645 }
1646
1647 ret = recv_reply(sock, &reply, sizeof(reply));
1648 if (ret < 0) {
1649 ERR("Failed to receive relay daemon trace chunk close command reply");
1650 goto end;
1651 }
1652
1653 reply.generic.ret_code = be32toh(reply.generic.ret_code);
1654 if (reply.generic.ret_code != LTTNG_OK) {
1655 ret = -1;
1656 ERR("Relayd trace chunk close replied error %d",
1657 reply.generic.ret_code);
1658 } else {
1659 ret = 0;
1660 DBG("Relayd successfully checked trace chunk existence: chunk_id = %" PRIu64
1661 ", exists = %s", chunk_id,
1662 reply.trace_chunk_exists ? "true" : "false");
1663 *chunk_exists = !!reply.trace_chunk_exists;
1664 }
1665 end:
1666 return ret;
1667 }
1668
1669 enum lttng_error_code relayd_get_configuration(struct lttcomm_relayd_sock *sock,
1670 uint64_t query_flags,
1671 uint64_t& result_flags,
1672 uint64_t *trace_format_query_results)
1673 {
1674 int ret;
1675 enum lttng_error_code ret_code = LTTNG_OK;
1676 struct lttcomm_relayd_get_configuration msg = {};
1677 struct lttcomm_relayd_get_configuration_reply reply = {};
1678 lttng_dynamic_buffer buffer;
1679 bool requesting_trace_format = query_flags &
1680 LTTCOMM_RELAYD_CONFIGURATION_QUERY_FLAG_SUPPORTED_TRACE_FORMAT;
1681
1682 lttng_dynamic_buffer_init(&buffer);
1683
1684 assert(!(query_flags & ~LTTCOMM_RELAYD_CONFIGURATION_QUERY_FLAG_MASK));
1685 assert(!(requesting_trace_format && !trace_format_query_results));
1686
1687 if (!relayd_supports_get_configuration(sock)) {
1688 DBG("Refusing to get relayd configuration (unsupported by relayd)");
1689 result_flags = 0;
1690 if (trace_format_query_results) {
1691 *trace_format_query_results =
1692 LTTCOMM_RELAYD_CONFIGURATION_TRACE_FORMAT_SUPPORTED_CTF1;
1693 }
1694 goto end;
1695 }
1696
1697 if (requesting_trace_format && !relayd_supports_get_configuration_trace_format(sock)) {
1698 /*
1699 * Provide default value for that query since lttng-relayd does
1700 * not know this query type.
1701 */
1702 if (trace_format_query_results) {
1703 *trace_format_query_results =
1704 LTTCOMM_RELAYD_CONFIGURATION_TRACE_FORMAT_SUPPORTED_CTF1;
1705 }
1706 /* Remove from the query set. */
1707 query_flags &= ~LTTCOMM_RELAYD_CONFIGURATION_QUERY_FLAG_SUPPORTED_TRACE_FORMAT;
1708 requesting_trace_format = false;
1709 }
1710
1711 msg.query_flags = htobe64(query_flags);
1712
1713 ret = send_command(sock, RELAYD_GET_CONFIGURATION, &msg, sizeof(msg), 0);
1714 if (ret < 0) {
1715 ERR("Failed to send get configuration command to relay daemon");
1716 ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL;
1717 goto end;
1718 }
1719
1720 ret = recv_reply(sock, &reply, sizeof(reply));
1721 if (ret < 0) {
1722 ERR("Failed to receive relay daemon get configuration command reply");
1723 ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL;
1724 goto end;
1725 }
1726
1727 ret_code = static_cast<enum lttng_error_code>(be32toh(reply.generic.ret_code));
1728 if (ret_code != LTTNG_OK) {
1729 ERR("Relayd get configuration replied error %d", ret_code);
1730 goto end;
1731 }
1732
1733 result_flags = be64toh(reply.relayd_configuration_flags);
1734 DBG("Relayd successfully got configuration: query_flags = %" PRIu64
1735 ", results_flags = %" PRIu64,
1736 query_flags, result_flags);
1737
1738 if (!requesting_trace_format) {
1739 ret_code = LTTNG_OK;
1740 goto end;
1741 }
1742
1743 /* Receive trace formats */
1744 {
1745 lttcomm_relayd_get_configuration_specialized_query_reply query_flag_reply = {};
1746 ret = recv_reply(sock, &query_flag_reply, sizeof(query_flag_reply));
1747 if (ret < 0) {
1748 ERR("Failed to receive relay daemon get configuration query flag data");
1749 ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL;
1750 goto end;
1751 }
1752
1753 query_flag_reply.query_flag = be64toh(query_flag_reply.query_flag);
1754 LTTNG_ASSERT(query_flag_reply.query_flag &
1755 LTTCOMM_RELAYD_CONFIGURATION_QUERY_FLAG_SUPPORTED_TRACE_FORMAT);
1756
1757 query_flag_reply.payload_len = be64toh(query_flag_reply.payload_len);
1758 LTTNG_ASSERT(query_flag_reply.payload_len == sizeof(uint64_t));
1759
1760 lttng_dynamic_buffer_set_size(&buffer, query_flag_reply.payload_len);
1761
1762 ret = recv_reply(sock, buffer.data, query_flag_reply.payload_len);
1763 if (ret < 0) {
1764 ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL;
1765 ERR("Failed to receive configuration dynamic payload for flag TODO");
1766 goto end;
1767 }
1768
1769 *trace_format_query_results = be64toh(*(uint64_t *) buffer.data);
1770 }
1771
1772 ret_code = LTTNG_OK;
1773
1774 end:
1775 lttng_dynamic_buffer_reset(&buffer);
1776 return ret_code;
1777 }
This page took 0.102343 seconds and 5 git commands to generate.