Save/load: support session trace format
[lttng-tools.git] / src / common / relayd / relayd.cpp
CommitLineData
00e2e675 1/*
ab5be9fa 2 * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
00e2e675 3 *
ab5be9fa 4 * SPDX-License-Identifier: GPL-2.0-only
00e2e675 5 *
00e2e675
DG
6 */
7
6c1c0768 8#define _LGPL_SOURCE
00e2e675
DG
9#include <stdio.h>
10#include <stdlib.h>
11#include <string.h>
12#include <sys/stat.h>
77c7c900 13#include <inttypes.h>
00e2e675 14
f4c5b127 15#include <bitset>
c9e313bc 16#include <common/common.hpp>
c9e313bc
SM
17#include <common/compat/endian.hpp>
18#include <common/compat/string.hpp>
f4c5b127 19#include <common/defaults.hpp>
c9e313bc 20#include <common/index/ctf-index.hpp>
f4c5b127 21#include <common/sessiond-comm/relayd.hpp>
c9e313bc 22#include <common/string-utils/format.hpp>
f4c5b127 23#include <common/trace-chunk.hpp>
00e2e675 24
c9e313bc 25#include "relayd.hpp"
00e2e675 26
070b6a86
MD
27static
28bool relayd_supports_chunks(const struct lttcomm_relayd_sock *sock)
29{
30 if (sock->major > 2) {
31 return true;
32 } else if (sock->major == 2 && sock->minor >= 11) {
33 return true;
34 }
35 return false;
36}
37
8614e600
MD
38static
39bool relayd_supports_get_configuration(const struct lttcomm_relayd_sock *sock)
40{
41 if (sock->major > 2) {
42 return true;
43 } else if (sock->major == 2 && sock->minor >= 12) {
44 return true;
45 }
46 return false;
47}
48
f4c5b127
JR
49static bool relayd_supports_get_configuration_trace_format(const struct lttcomm_relayd_sock *sock)
50{
51 if (sock->major > 2) {
52 return true;
53 } else if (sock->major == 2 && sock->minor >= 15) {
54 return true;
55 }
56 return false;
57}
58
00e2e675
DG
59/*
60 * Send command. Fill up the header and append the data.
61 */
6151a90f 62static int send_command(struct lttcomm_relayd_sock *rsock,
76b9afaa 63 enum lttcomm_relayd_command cmd, const void *data, size_t size,
00e2e675
DG
64 int flags)
65{
66 int ret;
67 struct lttcomm_relayd_hdr header;
68 char *buf;
69 uint64_t buf_size = sizeof(header);
70
f96e4545
MD
71 if (rsock->sock.fd < 0) {
72 return -ECONNRESET;
73 }
74
00e2e675
DG
75 if (data) {
76 buf_size += size;
77 }
78
64803277 79 buf = calloc<char>(buf_size);
00e2e675
DG
80 if (buf == NULL) {
81 PERROR("zmalloc relayd send command buf");
f4c5b127 82 ret = -ENOMEM;
00e2e675
DG
83 goto alloc_error;
84 }
85
53efb85a 86 memset(&header, 0, sizeof(header));
00e2e675
DG
87 header.cmd = htobe32(cmd);
88 header.data_size = htobe64(size);
89
90 /* Zeroed for now since not used. */
91 header.cmd_version = 0;
92 header.circuit_id = 0;
93
94 /* Prepare buffer to send. */
95 memcpy(buf, &header, sizeof(header));
96 if (data) {
97 memcpy(buf + sizeof(header), data, size);
98 }
99
00e71031
FD
100 DBG3("Relayd sending command %s of size %" PRIu64,
101 lttcomm_relayd_command_str(cmd), buf_size);
6151a90f 102 ret = rsock->sock.ops->sendmsg(&rsock->sock, buf, buf_size, flags);
00e2e675 103 if (ret < 0) {
00e71031
FD
104 PERROR("Failed to send command %s of size %" PRIu64,
105 lttcomm_relayd_command_str(cmd), buf_size);
8994307f 106 ret = -errno;
00e2e675
DG
107 goto error;
108 }
00e2e675
DG
109error:
110 free(buf);
111alloc_error:
112 return ret;
113}
114
115/*
116 * Receive reply data on socket. This MUST be call after send_command or else
117 * could result in unexpected behavior(s).
118 */
6151a90f 119static int recv_reply(struct lttcomm_relayd_sock *rsock, void *data, size_t size)
00e2e675
DG
120{
121 int ret;
122
f96e4545
MD
123 if (rsock->sock.fd < 0) {
124 return -ECONNRESET;
125 }
126
8fd623e0 127 DBG3("Relayd waiting for reply of size %zu", size);
00e2e675 128
6151a90f 129 ret = rsock->sock.ops->recvmsg(&rsock->sock, data, size, 0);
20275fe8
DG
130 if (ret <= 0 || ret != size) {
131 if (ret == 0) {
132 /* Orderly shutdown. */
6151a90f 133 DBG("Socket %d has performed an orderly shutdown", rsock->sock.fd);
20275fe8 134 } else {
8fd623e0 135 DBG("Receiving reply failed on sock %d for size %zu with ret %d",
6151a90f 136 rsock->sock.fd, size, ret);
20275fe8
DG
137 }
138 /* Always return -1 here and the caller can use errno. */
139 ret = -1;
00e2e675
DG
140 goto error;
141 }
142
143error:
144 return ret;
145}
146
d3e2ba59 147/*
6fa5fe7c
MD
148 * Starting from 2.11, RELAYD_CREATE_SESSION payload (session_name,
149 * hostname, and base_path) have no length restriction on the sender side.
f86f6389
JR
150 * Length for both payloads is stored in the msg struct. A new dynamic size
151 * payload size is introduced.
152 */
153static int relayd_create_session_2_11(struct lttcomm_relayd_sock *rsock,
fb9a95c4 154 const char *session_name, const char *hostname,
6fa5fe7c
MD
155 const char *base_path, int session_live_timer,
156 unsigned int snapshot, uint64_t sessiond_session_id,
328c2fe7 157 const lttng_uuid& sessiond_uuid, const uint64_t *current_chunk_id,
ecd1a12f
MD
158 time_t creation_time, bool session_name_contains_creation_time,
159 struct lttcomm_relayd_create_session_reply_2_11 *reply,
160 char *output_path)
f86f6389
JR
161{
162 int ret;
163 struct lttcomm_relayd_create_session_2_11 *msg = NULL;
164 size_t session_name_len;
165 size_t hostname_len;
6fa5fe7c 166 size_t base_path_len;
f86f6389 167 size_t msg_length;
6fa5fe7c 168 char *dst;
f86f6389 169
46ef2188
MD
170 if (!base_path) {
171 base_path = "";
172 }
173 /* The three names are sent with a '\0' delimiter between them. */
f86f6389
JR
174 session_name_len = strlen(session_name) + 1;
175 hostname_len = strlen(hostname) + 1;
17e736a5 176 base_path_len = strlen(base_path) + 1;
f86f6389 177
6fa5fe7c 178 msg_length = sizeof(*msg) + session_name_len + hostname_len + base_path_len;
64803277 179 msg = zmalloc<lttcomm_relayd_create_session_2_11>(msg_length);
f86f6389
JR
180 if (!msg) {
181 PERROR("zmalloc create_session_2_11 command message");
182 ret = -1;
183 goto error;
184 }
185
a0377dfe 186 LTTNG_ASSERT(session_name_len <= UINT32_MAX);
f86f6389
JR
187 msg->session_name_len = htobe32(session_name_len);
188
a0377dfe 189 LTTNG_ASSERT(hostname_len <= UINT32_MAX);
f86f6389
JR
190 msg->hostname_len = htobe32(hostname_len);
191
a0377dfe 192 LTTNG_ASSERT(base_path_len <= UINT32_MAX);
6fa5fe7c
MD
193 msg->base_path_len = htobe32(base_path_len);
194
195 dst = msg->names;
196 if (lttng_strncpy(dst, session_name, session_name_len)) {
197 ret = -1;
198 goto error;
199 }
200 dst += session_name_len;
201 if (lttng_strncpy(dst, hostname, hostname_len)) {
f86f6389
JR
202 ret = -1;
203 goto error;
204 }
6fa5fe7c 205 dst += hostname_len;
36f9f13b 206 if (lttng_strncpy(dst, base_path, base_path_len)) {
f86f6389
JR
207 ret = -1;
208 goto error;
209 }
210
211 msg->live_timer = htobe32(session_live_timer);
212 msg->snapshot = !!snapshot;
213
328c2fe7 214 std::copy(sessiond_uuid.begin(), sessiond_uuid.end(), msg->sessiond_uuid);
658f12fa 215 msg->session_id = htobe64(sessiond_session_id);
46ef2188 216 msg->session_name_contains_creation_time = session_name_contains_creation_time;
f39bd140
JG
217 if (current_chunk_id) {
218 LTTNG_OPTIONAL_SET(&msg->current_chunk_id,
219 htobe64(*current_chunk_id));
220 }
221
db1da059
JG
222 msg->creation_time = htobe64((uint64_t) creation_time);
223
f86f6389
JR
224 /* Send command */
225 ret = send_command(rsock, RELAYD_CREATE_SESSION, msg, msg_length, 0);
226 if (ret < 0) {
227 goto error;
228 }
ecd1a12f
MD
229 /* Receive response */
230 ret = recv_reply(rsock, reply, sizeof(*reply));
231 if (ret < 0) {
232 goto error;
233 }
234 reply->generic.session_id = be64toh(reply->generic.session_id);
235 reply->generic.ret_code = be32toh(reply->generic.ret_code);
236 reply->output_path_length = be32toh(reply->output_path_length);
237 if (reply->output_path_length >= LTTNG_PATH_MAX) {
238 ERR("Invalid session output path length in reply (%" PRIu32 " bytes) exceeds maximal allowed length (%d bytes)",
239 reply->output_path_length, LTTNG_PATH_MAX);
240 ret = -1;
241 goto error;
242 }
243 ret = recv_reply(rsock, output_path, reply->output_path_length);
244 if (ret < 0) {
245 goto error;
246 }
f86f6389
JR
247error:
248 free(msg);
249 return ret;
250}
251/*
252 * From 2.4 to 2.10, RELAYD_CREATE_SESSION takes additional parameters to
d3e2ba59
JD
253 * support the live reading capability.
254 */
255static int relayd_create_session_2_4(struct lttcomm_relayd_sock *rsock,
fb9a95c4 256 const char *session_name, const char *hostname,
ecd1a12f
MD
257 int session_live_timer, unsigned int snapshot,
258 struct lttcomm_relayd_status_session *reply)
d3e2ba59
JD
259{
260 int ret;
261 struct lttcomm_relayd_create_session_2_4 msg;
262
3a13ffd5
MD
263 if (lttng_strncpy(msg.session_name, session_name,
264 sizeof(msg.session_name))) {
246777db
MD
265 ret = -1;
266 goto error;
267 }
3a13ffd5 268 if (lttng_strncpy(msg.hostname, hostname, sizeof(msg.hostname))) {
246777db
MD
269 ret = -1;
270 goto error;
271 }
d3e2ba59 272 msg.live_timer = htobe32(session_live_timer);
7d2f7452 273 msg.snapshot = htobe32(snapshot);
d3e2ba59
JD
274
275 /* Send command */
276 ret = send_command(rsock, RELAYD_CREATE_SESSION, &msg, sizeof(msg), 0);
277 if (ret < 0) {
278 goto error;
279 }
280
ecd1a12f
MD
281 /* Receive response */
282 ret = recv_reply(rsock, reply, sizeof(*reply));
283 if (ret < 0) {
284 goto error;
285 }
286 reply->session_id = be64toh(reply->session_id);
287 reply->ret_code = be32toh(reply->ret_code);
d3e2ba59
JD
288error:
289 return ret;
290}
291
292/*
293 * RELAYD_CREATE_SESSION from 2.1 to 2.3.
294 */
ecd1a12f
MD
295static int relayd_create_session_2_1(struct lttcomm_relayd_sock *rsock,
296 struct lttcomm_relayd_status_session *reply)
d3e2ba59
JD
297{
298 int ret;
299
300 /* Send command */
301 ret = send_command(rsock, RELAYD_CREATE_SESSION, NULL, 0, 0);
302 if (ret < 0) {
303 goto error;
304 }
305
ecd1a12f
MD
306 /* Receive response */
307 ret = recv_reply(rsock, reply, sizeof(*reply));
308 if (ret < 0) {
309 goto error;
310 }
311 reply->session_id = be64toh(reply->session_id);
312 reply->ret_code = be32toh(reply->ret_code);
d3e2ba59
JD
313error:
314 return ret;
315}
316
c5b6f4f0
DG
317/*
318 * Send a RELAYD_CREATE_SESSION command to the relayd with the given socket and
319 * set session_id of the relayd if we have a successful reply from the relayd.
320 *
20275fe8
DG
321 * On success, return 0 else a negative value which is either an errno error or
322 * a lttng error code from the relayd.
c5b6f4f0 323 */
fb9a95c4
JG
324int relayd_create_session(struct lttcomm_relayd_sock *rsock,
325 uint64_t *relayd_session_id,
326 const char *session_name, const char *hostname,
6fa5fe7c 327 const char *base_path, int session_live_timer,
658f12fa 328 unsigned int snapshot, uint64_t sessiond_session_id,
328c2fe7 329 const lttng_uuid& sessiond_uuid,
db1da059 330 const uint64_t *current_chunk_id,
ecd1a12f
MD
331 time_t creation_time, bool session_name_contains_creation_time,
332 char *output_path)
c5b6f4f0
DG
333{
334 int ret;
ecd1a12f 335 struct lttcomm_relayd_create_session_reply_2_11 reply = {};
c5b6f4f0 336
a0377dfe
FD
337 LTTNG_ASSERT(rsock);
338 LTTNG_ASSERT(relayd_session_id);
c5b6f4f0
DG
339
340 DBG("Relayd create session");
341
f86f6389
JR
342 if (rsock->minor < 4) {
343 /* From 2.1 to 2.3 */
ecd1a12f 344 ret = relayd_create_session_2_1(rsock, &reply.generic);
f86f6389
JR
345 } else if (rsock->minor >= 4 && rsock->minor < 11) {
346 /* From 2.4 to 2.10 */
347 ret = relayd_create_session_2_4(rsock, session_name,
ecd1a12f
MD
348 hostname, session_live_timer, snapshot,
349 &reply.generic);
f86f6389
JR
350 } else {
351 /* From 2.11 to ... */
352 ret = relayd_create_session_2_11(rsock, session_name,
6fa5fe7c 353 hostname, base_path, session_live_timer, snapshot,
f39bd140 354 sessiond_session_id, sessiond_uuid,
46ef2188 355 current_chunk_id, creation_time,
ecd1a12f
MD
356 session_name_contains_creation_time,
357 &reply, output_path);
c5b6f4f0
DG
358 }
359
c5b6f4f0
DG
360 if (ret < 0) {
361 goto error;
362 }
363
c5b6f4f0 364 /* Return session id or negative ret code. */
ecd1a12f 365 if (reply.generic.ret_code != LTTNG_OK) {
bb63afd9 366 ret = -1;
ecd1a12f
MD
367 ERR("Relayd create session replied error %d",
368 reply.generic.ret_code);
c5b6f4f0
DG
369 goto error;
370 } else {
371 ret = 0;
ecd1a12f 372 *relayd_session_id = reply.generic.session_id;
c5b6f4f0
DG
373 }
374
ecd1a12f 375 DBG("Relayd session created with id %" PRIu64, reply.generic.session_id);
c5b6f4f0
DG
376
377error:
378 return ret;
379}
380
2f21a469
JR
381static int relayd_add_stream_2_1(struct lttcomm_relayd_sock *rsock,
382 const char *channel_name, const char *pathname)
383{
384 int ret;
385 struct lttcomm_relayd_add_stream msg;
386
387 memset(&msg, 0, sizeof(msg));
388 if (lttng_strncpy(msg.channel_name, channel_name,
389 sizeof(msg.channel_name))) {
390 ret = -1;
391 goto error;
392 }
393
394 if (lttng_strncpy(msg.pathname, pathname,
395 sizeof(msg.pathname))) {
396 ret = -1;
397 goto error;
398 }
399
400 /* Send command */
401 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
402 if (ret < 0) {
403 ret = -1;
404 goto error;
405 }
406 ret = 0;
407error:
408 return ret;
409}
410
411static int relayd_add_stream_2_2(struct lttcomm_relayd_sock *rsock,
412 const char *channel_name, const char *pathname,
413 uint64_t tracefile_size, uint64_t tracefile_count)
414{
415 int ret;
416 struct lttcomm_relayd_add_stream_2_2 msg;
417
418 memset(&msg, 0, sizeof(msg));
419 /* Compat with relayd 2.2 to 2.10 */
420 if (lttng_strncpy(msg.channel_name, channel_name,
421 sizeof(msg.channel_name))) {
422 ret = -1;
423 goto error;
424 }
425 if (lttng_strncpy(msg.pathname, pathname,
426 sizeof(msg.pathname))) {
427 ret = -1;
428 goto error;
429 }
430 msg.tracefile_size = htobe64(tracefile_size);
431 msg.tracefile_count = htobe64(tracefile_count);
432
433 /* Send command */
434 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
435 if (ret < 0) {
436 goto error;
437 }
438 ret = 0;
439error:
440 return ret;
441}
442
443static int relayd_add_stream_2_11(struct lttcomm_relayd_sock *rsock,
444 const char *channel_name, const char *pathname,
b00e554e
JG
445 uint64_t tracefile_size, uint64_t tracefile_count,
446 uint64_t trace_archive_id)
2f21a469
JR
447{
448 int ret;
449 struct lttcomm_relayd_add_stream_2_11 *msg = NULL;
450 size_t channel_name_len;
451 size_t pathname_len;
452 size_t msg_length;
453
454 /* The two names are sent with a '\0' delimiter between them. */
455 channel_name_len = strlen(channel_name) + 1;
456 pathname_len = strlen(pathname) + 1;
457
458 msg_length = sizeof(*msg) + channel_name_len + pathname_len;
64803277 459 msg = zmalloc<lttcomm_relayd_add_stream_2_11>(msg_length);
2f21a469
JR
460 if (!msg) {
461 PERROR("zmalloc add_stream_2_11 command message");
462 ret = -1;
463 goto error;
464 }
465
a0377dfe 466 LTTNG_ASSERT(channel_name_len <= UINT32_MAX);
2f21a469
JR
467 msg->channel_name_len = htobe32(channel_name_len);
468
a0377dfe 469 LTTNG_ASSERT(pathname_len <= UINT32_MAX);
2f21a469
JR
470 msg->pathname_len = htobe32(pathname_len);
471
472 if (lttng_strncpy(msg->names, channel_name, channel_name_len)) {
473 ret = -1;
474 goto error;
475 }
476 if (lttng_strncpy(msg->names + channel_name_len, pathname, pathname_len)) {
477 ret = -1;
478 goto error;
479 }
480
481 msg->tracefile_size = htobe64(tracefile_size);
482 msg->tracefile_count = htobe64(tracefile_count);
348a81dc 483 msg->trace_chunk_id = htobe64(trace_archive_id);
2f21a469
JR
484
485 /* Send command */
486 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) msg, msg_length, 0);
487 if (ret < 0) {
488 goto error;
489 }
490 ret = 0;
491error:
492 free(msg);
493 return ret;
494}
495
00e2e675
DG
496/*
497 * Add stream on the relayd and assign stream handle to the stream_id argument.
498 *
070b6a86
MD
499 * Chunks are not supported by relayd prior to 2.11, but are used to
500 * internally between session daemon and consumer daemon to keep track
501 * of the channel and stream output path.
502 *
00e2e675
DG
503 * On success return 0 else return ret_code negative value.
504 */
6151a90f 505int relayd_add_stream(struct lttcomm_relayd_sock *rsock, const char *channel_name,
5da88b0f 506 const char *domain_name, const char *_pathname, uint64_t *stream_id,
0b50e4b3 507 uint64_t tracefile_size, uint64_t tracefile_count,
d2956687 508 struct lttng_trace_chunk *trace_chunk)
00e2e675
DG
509{
510 int ret;
00e2e675 511 struct lttcomm_relayd_status_stream reply;
5da88b0f 512 char pathname[RELAYD_COMM_LTTNG_PATH_MAX];
00e2e675
DG
513
514 /* Code flow error. Safety net. */
a0377dfe
FD
515 LTTNG_ASSERT(rsock);
516 LTTNG_ASSERT(channel_name);
517 LTTNG_ASSERT(domain_name);
518 LTTNG_ASSERT(_pathname);
519 LTTNG_ASSERT(trace_chunk);
00e2e675
DG
520
521 DBG("Relayd adding stream for channel name %s", channel_name);
522
0f907de1
JD
523 /* Compat with relayd 2.1 */
524 if (rsock->minor == 1) {
2f21a469 525 /* For 2.1 */
5fab2976 526 ret = relayd_add_stream_2_1(rsock, channel_name, _pathname);
55caead7 527
2f21a469
JR
528 } else if (rsock->minor > 1 && rsock->minor < 11) {
529 /* From 2.2 to 2.10 */
5fab2976 530 ret = relayd_add_stream_2_2(rsock, channel_name, _pathname,
2f21a469 531 tracefile_size, tracefile_count);
0f907de1 532 } else {
5fab2976 533 const char *separator;
d2956687
JG
534 enum lttng_trace_chunk_status chunk_status;
535 uint64_t chunk_id;
536
5fab2976
JR
537 if (_pathname[0] == '\0') {
538 separator = "";
539 } else {
540 separator = "/";
541 }
542
543 ret = snprintf(pathname, RELAYD_COMM_LTTNG_PATH_MAX, "%s%s%s",
544 domain_name, separator, _pathname);
545 if (ret <= 0 || ret >= RELAYD_COMM_LTTNG_PATH_MAX) {
546 ERR("Failed to format stream path: %s",
547 ret <= 0 ? "formatting error" :
548 "path exceeds maximal allowed length");
549 ret = -1;
550 goto error;
551 }
552
d2956687
JG
553 chunk_status = lttng_trace_chunk_get_id(trace_chunk,
554 &chunk_id);
a0377dfe 555 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
d2956687 556
2f21a469
JR
557 /* From 2.11 to ...*/
558 ret = relayd_add_stream_2_11(rsock, channel_name, pathname,
b00e554e 559 tracefile_size, tracefile_count,
d2956687 560 chunk_id);
2f21a469 561 }
0f907de1 562
2f21a469
JR
563 if (ret) {
564 ret = -1;
565 goto error;
00e2e675
DG
566 }
567
633d0084 568 /* Waiting for reply */
6151a90f 569 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
00e2e675
DG
570 if (ret < 0) {
571 goto error;
572 }
573
574 /* Back to host bytes order. */
575 reply.handle = be64toh(reply.handle);
576 reply.ret_code = be32toh(reply.ret_code);
577
578 /* Return session id or negative ret code. */
f73fabfd 579 if (reply.ret_code != LTTNG_OK) {
bb63afd9
DG
580 ret = -1;
581 ERR("Relayd add stream replied error %d", reply.ret_code);
00e2e675
DG
582 } else {
583 /* Success */
584 ret = 0;
585 *stream_id = reply.handle;
586 }
587
77c7c900 588 DBG("Relayd stream added successfully with handle %" PRIu64,
2f21a469 589 reply.handle);
00e2e675
DG
590
591error:
592 return ret;
593}
594
a4baae1b
JD
595/*
596 * Inform the relay that all the streams for the current channel has been sent.
597 *
598 * On success return 0 else return ret_code negative value.
599 */
600int relayd_streams_sent(struct lttcomm_relayd_sock *rsock)
601{
602 int ret;
603 struct lttcomm_relayd_generic_reply reply;
604
605 /* Code flow error. Safety net. */
a0377dfe 606 LTTNG_ASSERT(rsock);
a4baae1b
JD
607
608 DBG("Relayd sending streams sent.");
609
610 /* This feature was introduced in 2.4, ignore it for earlier versions. */
611 if (rsock->minor < 4) {
612 ret = 0;
613 goto end;
614 }
615
616 /* Send command */
617 ret = send_command(rsock, RELAYD_STREAMS_SENT, NULL, 0, 0);
618 if (ret < 0) {
619 goto error;
620 }
621
622 /* Waiting for reply */
623 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
624 if (ret < 0) {
625 goto error;
626 }
627
628 /* Back to host bytes order. */
629 reply.ret_code = be32toh(reply.ret_code);
630
631 /* Return session id or negative ret code. */
632 if (reply.ret_code != LTTNG_OK) {
633 ret = -1;
634 ERR("Relayd streams sent replied error %d", reply.ret_code);
635 goto error;
636 } else {
637 /* Success */
638 ret = 0;
639 }
640
641 DBG("Relayd streams sent success");
642
643error:
644end:
645 return ret;
646}
647
00e2e675
DG
648/*
649 * Check version numbers on the relayd.
d4519fa3
JD
650 * If major versions are compatible, we assign minor_to_use to the
651 * minor version of the procotol we are going to use for this session.
00e2e675 652 *
67d5aa28
JD
653 * Return 0 if the two daemons are compatible, LTTNG_ERR_RELAYD_VERSION_FAIL
654 * otherwise, or a negative value on network errors.
00e2e675 655 */
6151a90f 656int relayd_version_check(struct lttcomm_relayd_sock *rsock)
00e2e675
DG
657{
658 int ret;
092b6259 659 struct lttcomm_relayd_version msg;
00e2e675
DG
660
661 /* Code flow error. Safety net. */
a0377dfe 662 LTTNG_ASSERT(rsock);
00e2e675 663
6151a90f
JD
664 DBG("Relayd version check for major.minor %u.%u", rsock->major,
665 rsock->minor);
00e2e675 666
53efb85a 667 memset(&msg, 0, sizeof(msg));
092b6259 668 /* Prepare network byte order before transmission. */
6151a90f
JD
669 msg.major = htobe32(rsock->major);
670 msg.minor = htobe32(rsock->minor);
092b6259 671
00e2e675 672 /* Send command */
6151a90f 673 ret = send_command(rsock, RELAYD_VERSION, (void *) &msg, sizeof(msg), 0);
00e2e675
DG
674 if (ret < 0) {
675 goto error;
676 }
677
20275fe8 678 /* Receive response */
6151a90f 679 ret = recv_reply(rsock, (void *) &msg, sizeof(msg));
00e2e675
DG
680 if (ret < 0) {
681 goto error;
682 }
683
684 /* Set back to host bytes order */
092b6259
DG
685 msg.major = be32toh(msg.major);
686 msg.minor = be32toh(msg.minor);
687
688 /*
689 * Only validate the major version. If the other side is higher,
690 * communication is not possible. Only major version equal can talk to each
691 * other. If the minor version differs, the lowest version is used by both
692 * sides.
092b6259 693 */
6151a90f 694 if (msg.major != rsock->major) {
d4519fa3 695 /* Not compatible */
67d5aa28 696 ret = LTTNG_ERR_RELAYD_VERSION_FAIL;
d4519fa3 697 DBG2("Relayd version is NOT compatible. Relayd version %u != %u (us)",
6151a90f 698 msg.major, rsock->major);
092b6259 699 goto error;
00e2e675
DG
700 }
701
092b6259 702 /*
6151a90f
JD
703 * If the relayd's minor version is higher, it will adapt to our version so
704 * we can continue to use the latest relayd communication data structure.
705 * If the received minor version is higher, the relayd should adapt to us.
092b6259 706 */
6151a90f
JD
707 if (rsock->minor > msg.minor) {
708 rsock->minor = msg.minor;
d4519fa3 709 }
092b6259 710
d4519fa3
JD
711 /* Version number compatible */
712 DBG2("Relayd version is compatible, using protocol version %u.%u",
6151a90f 713 rsock->major, rsock->minor);
d4519fa3 714 ret = 0;
00e2e675
DG
715
716error:
717 return ret;
718}
719
00e2e675
DG
720/*
721 * Add stream on the relayd and assign stream handle to the stream_id argument.
722 *
723 * On success return 0 else return ret_code negative value.
724 */
6151a90f 725int relayd_send_metadata(struct lttcomm_relayd_sock *rsock, size_t len)
00e2e675
DG
726{
727 int ret;
728
729 /* Code flow error. Safety net. */
a0377dfe 730 LTTNG_ASSERT(rsock);
00e2e675 731
77c7c900 732 DBG("Relayd sending metadata of size %zu", len);
00e2e675
DG
733
734 /* Send command */
6151a90f 735 ret = send_command(rsock, RELAYD_SEND_METADATA, NULL, len, 0);
00e2e675
DG
736 if (ret < 0) {
737 goto error;
738 }
739
740 DBG2("Relayd metadata added successfully");
741
742 /*
743 * After that call, the metadata data MUST be sent to the relayd so the
744 * receive size on the other end matches the len of the metadata packet
633d0084 745 * header. This is why we don't wait for a reply here.
00e2e675
DG
746 */
747
748error:
749 return ret;
750}
751
752/*
6151a90f 753 * Connect to relay daemon with an allocated lttcomm_relayd_sock.
00e2e675 754 */
6151a90f 755int relayd_connect(struct lttcomm_relayd_sock *rsock)
00e2e675
DG
756{
757 /* Code flow error. Safety net. */
a0377dfe 758 LTTNG_ASSERT(rsock);
00e2e675 759
f96e4545
MD
760 if (!rsock->sock.ops) {
761 /*
762 * Attempting a connect on a non-initialized socket.
763 */
764 return -ECONNRESET;
765 }
766
00e2e675
DG
767 DBG3("Relayd connect ...");
768
6151a90f 769 return rsock->sock.ops->connect(&rsock->sock);
00e2e675
DG
770}
771
772/*
6151a90f 773 * Close relayd socket with an allocated lttcomm_relayd_sock.
ffe60014
DG
774 *
775 * If no socket operations are found, simply return 0 meaning that everything
776 * is fine. Without operations, the socket can not possibly be opened or used.
777 * This is possible if the socket was allocated but not created. However, the
778 * caller could simply use it to store a valid file descriptor for instance
779 * passed over a Unix socket and call this to cleanup but still without a valid
780 * ops pointer.
781 *
782 * Return the close returned value. On error, a negative value is usually
783 * returned back from close(2).
00e2e675 784 */
6151a90f 785int relayd_close(struct lttcomm_relayd_sock *rsock)
00e2e675 786{
ffe60014
DG
787 int ret;
788
00e2e675 789 /* Code flow error. Safety net. */
a0377dfe 790 LTTNG_ASSERT(rsock);
00e2e675 791
ffe60014 792 /* An invalid fd is fine, return success. */
6151a90f 793 if (rsock->sock.fd < 0) {
ffe60014
DG
794 ret = 0;
795 goto end;
796 }
797
6151a90f 798 DBG3("Relayd closing socket %d", rsock->sock.fd);
00e2e675 799
6151a90f
JD
800 if (rsock->sock.ops) {
801 ret = rsock->sock.ops->close(&rsock->sock);
ffe60014
DG
802 } else {
803 /* Default call if no specific ops found. */
6151a90f 804 ret = close(rsock->sock.fd);
ffe60014
DG
805 if (ret < 0) {
806 PERROR("relayd_close default close");
807 }
808 }
f96e4545 809 rsock->sock.fd = -1;
ffe60014
DG
810
811end:
812 return ret;
00e2e675
DG
813}
814
815/*
816 * Send data header structure to the relayd.
817 */
6151a90f 818int relayd_send_data_hdr(struct lttcomm_relayd_sock *rsock,
00e2e675
DG
819 struct lttcomm_relayd_data_hdr *hdr, size_t size)
820{
821 int ret;
822
823 /* Code flow error. Safety net. */
a0377dfe
FD
824 LTTNG_ASSERT(rsock);
825 LTTNG_ASSERT(hdr);
00e2e675 826
f96e4545
MD
827 if (rsock->sock.fd < 0) {
828 return -ECONNRESET;
829 }
830
8fd623e0 831 DBG3("Relayd sending data header of size %zu", size);
00e2e675
DG
832
833 /* Again, safety net */
834 if (size == 0) {
835 size = sizeof(struct lttcomm_relayd_data_hdr);
836 }
837
838 /* Only send data header. */
6151a90f 839 ret = rsock->sock.ops->sendmsg(&rsock->sock, hdr, size, 0);
00e2e675 840 if (ret < 0) {
8994307f 841 ret = -errno;
00e2e675
DG
842 goto error;
843 }
844
845 /*
846 * The data MUST be sent right after that command for the receive on the
847 * other end to match the size in the header.
848 */
849
850error:
851 return ret;
852}
173af62f
DG
853
854/*
855 * Send close stream command to the relayd.
856 */
6151a90f 857int relayd_send_close_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
173af62f
DG
858 uint64_t last_net_seq_num)
859{
860 int ret;
861 struct lttcomm_relayd_close_stream msg;
862 struct lttcomm_relayd_generic_reply reply;
863
864 /* Code flow error. Safety net. */
a0377dfe 865 LTTNG_ASSERT(rsock);
173af62f 866
77c7c900 867 DBG("Relayd closing stream id %" PRIu64, stream_id);
173af62f 868
53efb85a 869 memset(&msg, 0, sizeof(msg));
173af62f
DG
870 msg.stream_id = htobe64(stream_id);
871 msg.last_net_seq_num = htobe64(last_net_seq_num);
872
873 /* Send command */
6151a90f 874 ret = send_command(rsock, RELAYD_CLOSE_STREAM, (void *) &msg, sizeof(msg), 0);
173af62f
DG
875 if (ret < 0) {
876 goto error;
877 }
878
20275fe8 879 /* Receive response */
6151a90f 880 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
173af62f
DG
881 if (ret < 0) {
882 goto error;
883 }
884
885 reply.ret_code = be32toh(reply.ret_code);
886
887 /* Return session id or negative ret code. */
f73fabfd 888 if (reply.ret_code != LTTNG_OK) {
bb63afd9
DG
889 ret = -1;
890 ERR("Relayd close stream replied error %d", reply.ret_code);
173af62f
DG
891 } else {
892 /* Success */
893 ret = 0;
894 }
895
77c7c900 896 DBG("Relayd close stream id %" PRIu64 " successfully", stream_id);
173af62f
DG
897
898error:
899 return ret;
900}
c8f59ee5
DG
901
902/*
903 * Check for data availability for a given stream id.
904 *
6d805429 905 * Return 0 if NOT pending, 1 if so and a negative value on error.
c8f59ee5 906 */
6151a90f 907int relayd_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
c8f59ee5
DG
908 uint64_t last_net_seq_num)
909{
910 int ret;
6d805429 911 struct lttcomm_relayd_data_pending msg;
c8f59ee5
DG
912 struct lttcomm_relayd_generic_reply reply;
913
914 /* Code flow error. Safety net. */
a0377dfe 915 LTTNG_ASSERT(rsock);
c8f59ee5 916
6d805429 917 DBG("Relayd data pending for stream id %" PRIu64, stream_id);
c8f59ee5 918
53efb85a 919 memset(&msg, 0, sizeof(msg));
c8f59ee5
DG
920 msg.stream_id = htobe64(stream_id);
921 msg.last_net_seq_num = htobe64(last_net_seq_num);
922
923 /* Send command */
6151a90f 924 ret = send_command(rsock, RELAYD_DATA_PENDING, (void *) &msg,
c8f59ee5
DG
925 sizeof(msg), 0);
926 if (ret < 0) {
927 goto error;
928 }
929
20275fe8 930 /* Receive response */
6151a90f 931 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
c8f59ee5
DG
932 if (ret < 0) {
933 goto error;
934 }
935
936 reply.ret_code = be32toh(reply.ret_code);
937
938 /* Return session id or negative ret code. */
939 if (reply.ret_code >= LTTNG_OK) {
bb63afd9 940 ERR("Relayd data pending replied error %d", reply.ret_code);
c8f59ee5
DG
941 }
942
943 /* At this point, the ret code is either 1 or 0 */
944 ret = reply.ret_code;
945
6d805429 946 DBG("Relayd data is %s pending for stream id %" PRIu64,
9dd26bb9 947 ret == 1 ? "" : "NOT", stream_id);
c8f59ee5
DG
948
949error:
950 return ret;
951}
952
953/*
954 * Check on the relayd side for a quiescent state on the control socket.
955 */
6151a90f 956int relayd_quiescent_control(struct lttcomm_relayd_sock *rsock,
ad7051c0 957 uint64_t metadata_stream_id)
c8f59ee5
DG
958{
959 int ret;
ad7051c0 960 struct lttcomm_relayd_quiescent_control msg;
c8f59ee5
DG
961 struct lttcomm_relayd_generic_reply reply;
962
963 /* Code flow error. Safety net. */
a0377dfe 964 LTTNG_ASSERT(rsock);
c8f59ee5
DG
965
966 DBG("Relayd checking quiescent control state");
967
53efb85a 968 memset(&msg, 0, sizeof(msg));
ad7051c0
DG
969 msg.stream_id = htobe64(metadata_stream_id);
970
c8f59ee5 971 /* Send command */
6151a90f 972 ret = send_command(rsock, RELAYD_QUIESCENT_CONTROL, &msg, sizeof(msg), 0);
c8f59ee5
DG
973 if (ret < 0) {
974 goto error;
975 }
976
20275fe8 977 /* Receive response */
6151a90f 978 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
c8f59ee5
DG
979 if (ret < 0) {
980 goto error;
981 }
982
983 reply.ret_code = be32toh(reply.ret_code);
984
985 /* Return session id or negative ret code. */
986 if (reply.ret_code != LTTNG_OK) {
bb63afd9
DG
987 ret = -1;
988 ERR("Relayd quiescent control replied error %d", reply.ret_code);
c8f59ee5
DG
989 goto error;
990 }
991
992 /* Control socket is quiescent */
6d805429 993 return 0;
c8f59ee5
DG
994
995error:
996 return ret;
997}
f7079f67
DG
998
999/*
1000 * Begin a data pending command for a specific session id.
1001 */
6151a90f 1002int relayd_begin_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id)
f7079f67
DG
1003{
1004 int ret;
1005 struct lttcomm_relayd_begin_data_pending msg;
1006 struct lttcomm_relayd_generic_reply reply;
1007
1008 /* Code flow error. Safety net. */
a0377dfe 1009 LTTNG_ASSERT(rsock);
f7079f67
DG
1010
1011 DBG("Relayd begin data pending");
1012
53efb85a 1013 memset(&msg, 0, sizeof(msg));
f7079f67
DG
1014 msg.session_id = htobe64(id);
1015
1016 /* Send command */
6151a90f 1017 ret = send_command(rsock, RELAYD_BEGIN_DATA_PENDING, &msg, sizeof(msg), 0);
f7079f67
DG
1018 if (ret < 0) {
1019 goto error;
1020 }
1021
20275fe8 1022 /* Receive response */
6151a90f 1023 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
f7079f67
DG
1024 if (ret < 0) {
1025 goto error;
1026 }
1027
1028 reply.ret_code = be32toh(reply.ret_code);
1029
1030 /* Return session id or negative ret code. */
1031 if (reply.ret_code != LTTNG_OK) {
bb63afd9
DG
1032 ret = -1;
1033 ERR("Relayd begin data pending replied error %d", reply.ret_code);
f7079f67
DG
1034 goto error;
1035 }
1036
1037 return 0;
1038
1039error:
1040 return ret;
1041}
1042
1043/*
1044 * End a data pending command for a specific session id.
1045 *
1046 * Return 0 on success and set is_data_inflight to 0 if no data is being
1047 * streamed or 1 if it is the case.
1048 */
6151a90f 1049int relayd_end_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id,
f7079f67
DG
1050 unsigned int *is_data_inflight)
1051{
af6c30b5 1052 int ret, recv_ret;
f7079f67
DG
1053 struct lttcomm_relayd_end_data_pending msg;
1054 struct lttcomm_relayd_generic_reply reply;
1055
1056 /* Code flow error. Safety net. */
a0377dfe 1057 LTTNG_ASSERT(rsock);
f7079f67
DG
1058
1059 DBG("Relayd end data pending");
1060
53efb85a 1061 memset(&msg, 0, sizeof(msg));
f7079f67
DG
1062 msg.session_id = htobe64(id);
1063
1064 /* Send command */
6151a90f 1065 ret = send_command(rsock, RELAYD_END_DATA_PENDING, &msg, sizeof(msg), 0);
f7079f67
DG
1066 if (ret < 0) {
1067 goto error;
1068 }
1069
20275fe8 1070 /* Receive response */
6151a90f 1071 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
f7079f67
DG
1072 if (ret < 0) {
1073 goto error;
1074 }
1075
af6c30b5
DG
1076 recv_ret = be32toh(reply.ret_code);
1077 if (recv_ret < 0) {
1078 ret = recv_ret;
f7079f67
DG
1079 goto error;
1080 }
1081
af6c30b5 1082 *is_data_inflight = recv_ret;
f7079f67 1083
af6c30b5 1084 DBG("Relayd end data pending is data inflight: %d", recv_ret);
f7079f67
DG
1085
1086 return 0;
1087
1088error:
1089 return ret;
1090}
1c20f0e2
JD
1091
1092/*
1093 * Send index to the relayd.
1094 */
1095int relayd_send_index(struct lttcomm_relayd_sock *rsock,
50adc264 1096 struct ctf_packet_index *index, uint64_t relay_stream_id,
1c20f0e2
JD
1097 uint64_t net_seq_num)
1098{
1099 int ret;
1100 struct lttcomm_relayd_index msg;
1101 struct lttcomm_relayd_generic_reply reply;
1102
1103 /* Code flow error. Safety net. */
a0377dfe 1104 LTTNG_ASSERT(rsock);
1c20f0e2
JD
1105
1106 if (rsock->minor < 4) {
1107 DBG("Not sending indexes before protocol 2.4");
1108 ret = 0;
1109 goto error;
1110 }
1111
1112 DBG("Relayd sending index for stream ID %" PRIu64, relay_stream_id);
1113
53efb85a 1114 memset(&msg, 0, sizeof(msg));
1c20f0e2
JD
1115 msg.relay_stream_id = htobe64(relay_stream_id);
1116 msg.net_seq_num = htobe64(net_seq_num);
1117
1118 /* The index is already in big endian. */
1119 msg.packet_size = index->packet_size;
1120 msg.content_size = index->content_size;
1121 msg.timestamp_begin = index->timestamp_begin;
1122 msg.timestamp_end = index->timestamp_end;
1123 msg.events_discarded = index->events_discarded;
1124 msg.stream_id = index->stream_id;
1125
234cd636
JD
1126 if (rsock->minor >= 8) {
1127 msg.stream_instance_id = index->stream_instance_id;
1128 msg.packet_seq_num = index->packet_seq_num;
1129 }
1130
1c20f0e2 1131 /* Send command */
f8f3885c
MD
1132 ret = send_command(rsock, RELAYD_SEND_INDEX, &msg,
1133 lttcomm_relayd_index_len(lttng_to_index_major(rsock->major,
1134 rsock->minor),
1135 lttng_to_index_minor(rsock->major, rsock->minor)),
1136 0);
1c20f0e2
JD
1137 if (ret < 0) {
1138 goto error;
1139 }
1140
1141 /* Receive response */
1142 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1143 if (ret < 0) {
1144 goto error;
1145 }
1146
1147 reply.ret_code = be32toh(reply.ret_code);
1148
1149 /* Return session id or negative ret code. */
1150 if (reply.ret_code != LTTNG_OK) {
1151 ret = -1;
1152 ERR("Relayd send index replied error %d", reply.ret_code);
1153 } else {
1154 /* Success */
1155 ret = 0;
1156 }
1157
1158error:
1159 return ret;
1160}
93ec662e
JD
1161
1162/*
1163 * Ask the relay to reset the metadata trace file (regeneration).
1164 */
1165int relayd_reset_metadata(struct lttcomm_relayd_sock *rsock,
1166 uint64_t stream_id, uint64_t version)
1167{
1168 int ret;
1169 struct lttcomm_relayd_reset_metadata msg;
1170 struct lttcomm_relayd_generic_reply reply;
1171
1172 /* Code flow error. Safety net. */
a0377dfe 1173 LTTNG_ASSERT(rsock);
93ec662e
JD
1174
1175 /* Should have been prevented by the sessiond. */
1176 if (rsock->minor < 8) {
1177 ERR("Metadata regeneration unsupported before 2.8");
1178 ret = -1;
1179 goto error;
1180 }
1181
1182 DBG("Relayd reset metadata stream id %" PRIu64, stream_id);
1183
1184 memset(&msg, 0, sizeof(msg));
1185 msg.stream_id = htobe64(stream_id);
1186 msg.version = htobe64(version);
1187
1188 /* Send command */
1189 ret = send_command(rsock, RELAYD_RESET_METADATA, (void *) &msg, sizeof(msg), 0);
1190 if (ret < 0) {
1191 goto error;
1192 }
1193
1194 /* Receive response */
1195 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1196 if (ret < 0) {
1197 goto error;
1198 }
1199
1200 reply.ret_code = be32toh(reply.ret_code);
1201
1202 /* Return session id or negative ret code. */
1203 if (reply.ret_code != LTTNG_OK) {
1204 ret = -1;
1205 ERR("Relayd reset metadata replied error %d", reply.ret_code);
1206 } else {
1207 /* Success */
1208 ret = 0;
1209 }
1210
1211 DBG("Relayd reset metadata stream id %" PRIu64 " successfully", stream_id);
1212
1213error:
1214 return ret;
1215}
a1ae2ea5 1216
c35f9726 1217int relayd_rotate_streams(struct lttcomm_relayd_sock *sock,
ebb29c10 1218 unsigned int stream_count, const uint64_t *new_chunk_id,
c35f9726 1219 const struct relayd_stream_rotation_position *positions)
d73bf3d7
JD
1220{
1221 int ret;
c35f9726
JG
1222 unsigned int i;
1223 struct lttng_dynamic_buffer payload;
1224 struct lttcomm_relayd_generic_reply reply = {};
6d15ee45 1225 struct lttcomm_relayd_rotate_streams msg;
c35f9726
JG
1226 char new_chunk_id_buf[MAX_INT_DEC_LEN(*new_chunk_id)] = {};
1227 const char *new_chunk_id_str;
d73bf3d7 1228
6d15ee45
JG
1229 msg.stream_count = htobe32((uint32_t) stream_count);
1230 msg.new_chunk_id = (typeof(msg.new_chunk_id)){
1231 .is_set = !!new_chunk_id,
1232 .value = htobe64(new_chunk_id ? *new_chunk_id : 0),
1233 };
1234
070b6a86
MD
1235 if (!relayd_supports_chunks(sock)) {
1236 DBG("Refusing to rotate remote streams: relayd does not support chunks");
1237 return 0;
1238 }
1239
c35f9726 1240 lttng_dynamic_buffer_init(&payload);
d73bf3d7 1241
c35f9726 1242 /* Code flow error. Safety net. */
a0377dfe 1243 LTTNG_ASSERT(sock);
d73bf3d7 1244
c35f9726
JG
1245 if (new_chunk_id) {
1246 ret = snprintf(new_chunk_id_buf, sizeof(new_chunk_id_buf),
1247 "%" PRIu64, *new_chunk_id);
1248 if (ret == -1 || ret >= sizeof(new_chunk_id_buf)) {
1249 new_chunk_id_str = "formatting error";
1250 } else {
1251 new_chunk_id_str = new_chunk_id_buf;
1252 }
1253 } else {
1254 new_chunk_id_str = "none";
d73bf3d7
JD
1255 }
1256
c35f9726
JG
1257 DBG("Preparing \"rotate streams\" command payload: new_chunk_id = %s, stream_count = %u",
1258 new_chunk_id_str, stream_count);
d73bf3d7 1259
c35f9726
JG
1260 ret = lttng_dynamic_buffer_append(&payload, &msg, sizeof(msg));
1261 if (ret) {
1262 ERR("Failed to allocate \"rotate streams\" command payload");
d73bf3d7
JD
1263 goto error;
1264 }
1265
c35f9726
JG
1266 for (i = 0; i < stream_count; i++) {
1267 const struct relayd_stream_rotation_position *position =
1268 &positions[i];
1269 const struct lttcomm_relayd_stream_rotation_position comm_position = {
1270 .stream_id = htobe64(position->stream_id),
1271 .rotate_at_seq_num = htobe64(
1272 position->rotate_at_seq_num),
1273 };
1274
a269202e 1275 DBG("Rotate stream %" PRIu64 " at sequence number %" PRIu64,
c35f9726
JG
1276 position->stream_id,
1277 position->rotate_at_seq_num);
1278 ret = lttng_dynamic_buffer_append(&payload, &comm_position,
1279 sizeof(comm_position));
1280 if (ret) {
1281 ERR("Failed to allocate \"rotate streams\" command payload");
1282 goto error;
1283 }
1284 }
d73bf3d7
JD
1285
1286 /* Send command. */
c35f9726
JG
1287 ret = send_command(sock, RELAYD_ROTATE_STREAMS, payload.data,
1288 payload.size, 0);
d73bf3d7 1289 if (ret < 0) {
c35f9726 1290 ERR("Failed to send \"rotate stream\" command");
d73bf3d7
JD
1291 goto error;
1292 }
1293
1294 /* Receive response. */
c35f9726 1295 ret = recv_reply(sock, &reply, sizeof(reply));
d73bf3d7 1296 if (ret < 0) {
c35f9726 1297 ERR("Failed to receive \"rotate streams\" command reply");
d73bf3d7
JD
1298 goto error;
1299 }
1300
1301 reply.ret_code = be32toh(reply.ret_code);
d73bf3d7
JD
1302 if (reply.ret_code != LTTNG_OK) {
1303 ret = -1;
c35f9726 1304 ERR("Relayd rotate streams replied error %d", reply.ret_code);
d73bf3d7
JD
1305 } else {
1306 /* Success. */
1307 ret = 0;
c35f9726 1308 DBG("Relayd rotated streams successfully");
d73bf3d7
JD
1309 }
1310
1311error:
c35f9726 1312 lttng_dynamic_buffer_reset(&payload);
d73bf3d7
JD
1313 return ret;
1314}
e5add6d0
JG
1315
1316int relayd_create_trace_chunk(struct lttcomm_relayd_sock *sock,
1317 struct lttng_trace_chunk *chunk)
1318{
1319 int ret = 0;
1320 enum lttng_trace_chunk_status status;
1321 struct lttcomm_relayd_create_trace_chunk msg = {};
1322 struct lttcomm_relayd_generic_reply reply = {};
1323 struct lttng_dynamic_buffer payload;
1324 uint64_t chunk_id;
1325 time_t creation_timestamp;
1326 const char *chunk_name;
1327 size_t chunk_name_length;
913a542b 1328 bool overridden_name;
e5add6d0
JG
1329
1330 lttng_dynamic_buffer_init(&payload);
1331
070b6a86
MD
1332 if (!relayd_supports_chunks(sock)) {
1333 DBG("Refusing to create remote trace chunk: relayd does not support chunks");
1334 goto end;
1335 }
1336
e5add6d0
JG
1337 status = lttng_trace_chunk_get_id(chunk, &chunk_id);
1338 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1339 ret = -1;
1340 goto end;
1341 }
1342
1343 status = lttng_trace_chunk_get_creation_timestamp(
1344 chunk, &creation_timestamp);
1345 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1346 ret = -1;
1347 goto end;
1348 }
1349
1350 status = lttng_trace_chunk_get_name(
913a542b 1351 chunk, &chunk_name, &overridden_name);
e5add6d0
JG
1352 if (status != LTTNG_TRACE_CHUNK_STATUS_OK &&
1353 status != LTTNG_TRACE_CHUNK_STATUS_NONE) {
1354 ret = -1;
1355 goto end;
1356 }
1357
913a542b 1358 chunk_name_length = overridden_name ? (strlen(chunk_name) + 1) : 0;
c15e2d3d
JG
1359 msg.chunk_id = htobe64(chunk_id);
1360 msg.creation_timestamp = htobe64((uint64_t) creation_timestamp);
1361 msg.override_name_length = htobe32((uint32_t) chunk_name_length);
e5add6d0
JG
1362
1363 ret = lttng_dynamic_buffer_append(&payload, &msg, sizeof(msg));
1364 if (ret) {
1365 goto end;
1366 }
1367 if (chunk_name_length) {
1368 ret = lttng_dynamic_buffer_append(
1369 &payload, chunk_name, chunk_name_length);
1370 if (ret) {
1371 goto end;
1372 }
1373 }
1374
bbc4768c
JG
1375 ret = send_command(sock, RELAYD_CREATE_TRACE_CHUNK, payload.data,
1376 payload.size, 0);
e5add6d0
JG
1377 if (ret < 0) {
1378 ERR("Failed to send trace chunk creation command to relay daemon");
1379 goto end;
1380 }
1381
1382 ret = recv_reply(sock, &reply, sizeof(reply));
1383 if (ret < 0) {
1384 ERR("Failed to receive relay daemon trace chunk creation command reply");
1385 goto end;
1386 }
1387
1388 reply.ret_code = be32toh(reply.ret_code);
1389 if (reply.ret_code != LTTNG_OK) {
1390 ret = -1;
1391 ERR("Relayd trace chunk create replied error %d",
1392 reply.ret_code);
1393 } else {
1394 ret = 0;
1395 DBG("Relayd successfully created trace chunk: chunk_id = %" PRIu64,
1396 chunk_id);
1397 }
1398
1399end:
1400 lttng_dynamic_buffer_reset(&payload);
1401 return ret;
1402}
bbc4768c
JG
1403
1404int relayd_close_trace_chunk(struct lttcomm_relayd_sock *sock,
ecd1a12f
MD
1405 struct lttng_trace_chunk *chunk,
1406 char *path)
bbc4768c
JG
1407{
1408 int ret = 0;
1409 enum lttng_trace_chunk_status status;
1410 struct lttcomm_relayd_close_trace_chunk msg = {};
ecd1a12f 1411 struct lttcomm_relayd_close_trace_chunk_reply reply = {};
bbc4768c
JG
1412 uint64_t chunk_id;
1413 time_t close_timestamp;
1414 LTTNG_OPTIONAL(enum lttng_trace_chunk_command_type) close_command = {};
1415
070b6a86
MD
1416 if (!relayd_supports_chunks(sock)) {
1417 DBG("Refusing to close remote trace chunk: relayd does not support chunks");
1418 goto end;
1419 }
1420
bbc4768c
JG
1421 status = lttng_trace_chunk_get_id(chunk, &chunk_id);
1422 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1423 ERR("Failed to get trace chunk id");
1424 ret = -1;
1425 goto end;
1426 }
1427
1428 status = lttng_trace_chunk_get_close_timestamp(chunk, &close_timestamp);
1429 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1430 ERR("Failed to get trace chunk close timestamp");
1431 ret = -1;
1432 goto end;
1433 }
1434
1435 status = lttng_trace_chunk_get_close_command(chunk,
1436 &close_command.value);
1437 switch (status) {
1438 case LTTNG_TRACE_CHUNK_STATUS_OK:
1439 close_command.is_set = 1;
1440 break;
1441 case LTTNG_TRACE_CHUNK_STATUS_NONE:
1442 break;
1443 default:
1444 ERR("Failed to get trace chunk close command");
1445 ret = -1;
1446 goto end;
1447 }
1448
1449 msg = (typeof(msg)){
1450 .chunk_id = htobe64(chunk_id),
1451 .close_timestamp = htobe64((uint64_t) close_timestamp),
1452 .close_command = {
bbc4768c 1453 .is_set = close_command.is_set,
55caead7 1454 .value = htobe32((uint32_t) close_command.value),
bbc4768c
JG
1455 },
1456 };
1457
1458 ret = send_command(sock, RELAYD_CLOSE_TRACE_CHUNK, &msg, sizeof(msg),
1459 0);
1460 if (ret < 0) {
1461 ERR("Failed to send trace chunk close command to relay daemon");
1462 goto end;
1463 }
1464
1465 ret = recv_reply(sock, &reply, sizeof(reply));
1466 if (ret < 0) {
1467 ERR("Failed to receive relay daemon trace chunk close command reply");
1468 goto end;
1469 }
1470
ecd1a12f
MD
1471 reply.path_length = be32toh(reply.path_length);
1472 if (reply.path_length >= LTTNG_PATH_MAX) {
1473 ERR("Chunk path too long");
1474 ret = -1;
1475 goto end;
1476 }
1477
1478 ret = recv_reply(sock, path, reply.path_length);
1479 if (ret < 0) {
1480 ERR("Failed to receive relay daemon trace chunk close command reply");
1481 goto end;
1482 }
1483 if (path[reply.path_length - 1] != '\0') {
1484 ERR("Invalid trace chunk path returned by relay daemon (not null-terminated)");
1485 ret = -1;
1486 goto end;
1487 }
1488
1489 reply.generic.ret_code = be32toh(reply.generic.ret_code);
1490 if (reply.generic.ret_code != LTTNG_OK) {
bbc4768c
JG
1491 ret = -1;
1492 ERR("Relayd trace chunk close replied error %d",
ecd1a12f 1493 reply.generic.ret_code);
bbc4768c
JG
1494 } else {
1495 ret = 0;
1496 DBG("Relayd successfully closed trace chunk: chunk_id = %" PRIu64,
1497 chunk_id);
1498 }
1499end:
1500 return ret;
1501}
c35f9726
JG
1502
1503int relayd_trace_chunk_exists(struct lttcomm_relayd_sock *sock,
1504 uint64_t chunk_id, bool *chunk_exists)
1505{
1506 int ret = 0;
1507 struct lttcomm_relayd_trace_chunk_exists msg = {};
1508 struct lttcomm_relayd_trace_chunk_exists_reply reply = {};
1509
070b6a86
MD
1510 if (!relayd_supports_chunks(sock)) {
1511 DBG("Refusing to check for trace chunk existence: relayd does not support chunks");
caa15afd
JR
1512 /* The chunk will never exist */
1513 *chunk_exists = false;
070b6a86
MD
1514 goto end;
1515 }
1516
c35f9726
JG
1517 msg = (typeof(msg)){
1518 .chunk_id = htobe64(chunk_id),
1519 };
1520
1521 ret = send_command(sock, RELAYD_TRACE_CHUNK_EXISTS, &msg, sizeof(msg),
1522 0);
1523 if (ret < 0) {
1524 ERR("Failed to send trace chunk exists command to relay daemon");
1525 goto end;
1526 }
1527
1528 ret = recv_reply(sock, &reply, sizeof(reply));
1529 if (ret < 0) {
1530 ERR("Failed to receive relay daemon trace chunk close command reply");
1531 goto end;
1532 }
1533
1534 reply.generic.ret_code = be32toh(reply.generic.ret_code);
1535 if (reply.generic.ret_code != LTTNG_OK) {
1536 ret = -1;
1537 ERR("Relayd trace chunk close replied error %d",
1538 reply.generic.ret_code);
1539 } else {
1540 ret = 0;
1541 DBG("Relayd successfully checked trace chunk existence: chunk_id = %" PRIu64
1542 ", exists = %s", chunk_id,
1543 reply.trace_chunk_exists ? "true" : "false");
1544 *chunk_exists = !!reply.trace_chunk_exists;
1545 }
1546end:
1547 return ret;
1548}
8614e600 1549
f4c5b127 1550enum lttng_error_code relayd_get_configuration(struct lttcomm_relayd_sock *sock,
8614e600 1551 uint64_t query_flags,
f4c5b127
JR
1552 uint64_t& result_flags,
1553 uint64_t *trace_format_query_results)
8614e600 1554{
f4c5b127
JR
1555 int ret;
1556 enum lttng_error_code ret_code = LTTNG_OK;
1557 struct lttcomm_relayd_get_configuration msg = {};
8614e600 1558 struct lttcomm_relayd_get_configuration_reply reply = {};
f4c5b127
JR
1559 lttng_dynamic_buffer buffer;
1560 bool requesting_trace_format = query_flags &
1561 LTTCOMM_RELAYD_CONFIGURATION_QUERY_FLAG_SUPPORTED_TRACE_FORMAT;
1562
1563 lttng_dynamic_buffer_init(&buffer);
1564
1565 assert(!(query_flags & ~LTTCOMM_RELAYD_CONFIGURATION_QUERY_FLAG_MASK));
1566 assert(!(requesting_trace_format && !trace_format_query_results));
8614e600
MD
1567
1568 if (!relayd_supports_get_configuration(sock)) {
1569 DBG("Refusing to get relayd configuration (unsupported by relayd)");
f4c5b127
JR
1570 result_flags = 0;
1571 if (trace_format_query_results) {
1572 *trace_format_query_results =
1573 LTTCOMM_RELAYD_CONFIGURATION_TRACE_FORMAT_SUPPORTED_CTF1;
1574 }
8614e600
MD
1575 goto end;
1576 }
1577
f4c5b127
JR
1578 if (requesting_trace_format && !relayd_supports_get_configuration_trace_format(sock)) {
1579 /*
1580 * Provide default value for that query since lttng-relayd does
1581 * not know this query type.
1582 */
1583 if (trace_format_query_results) {
1584 *trace_format_query_results =
1585 LTTCOMM_RELAYD_CONFIGURATION_TRACE_FORMAT_SUPPORTED_CTF1;
1586 }
1587 /* Remove from the query set. */
1588 query_flags &= ~LTTCOMM_RELAYD_CONFIGURATION_QUERY_FLAG_SUPPORTED_TRACE_FORMAT;
1589 requesting_trace_format = false;
1590 }
1591
1592 msg.query_flags = htobe64(query_flags);
1593
1594 ret = send_command(sock, RELAYD_GET_CONFIGURATION, &msg, sizeof(msg), 0);
8614e600
MD
1595 if (ret < 0) {
1596 ERR("Failed to send get configuration command to relay daemon");
f4c5b127 1597 ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL;
8614e600
MD
1598 goto end;
1599 }
1600
1601 ret = recv_reply(sock, &reply, sizeof(reply));
1602 if (ret < 0) {
1603 ERR("Failed to receive relay daemon get configuration command reply");
f4c5b127 1604 ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL;
8614e600
MD
1605 goto end;
1606 }
1607
f4c5b127
JR
1608 ret_code = static_cast<enum lttng_error_code>(be32toh(reply.generic.ret_code));
1609 if (ret_code != LTTNG_OK) {
1610 ERR("Relayd get configuration replied error %d", ret_code);
1611 goto end;
8614e600 1612 }
f4c5b127
JR
1613
1614 result_flags = be64toh(reply.relayd_configuration_flags);
1615 DBG("Relayd successfully got configuration: query_flags = %" PRIu64
1616 ", results_flags = %" PRIu64,
1617 query_flags, result_flags);
1618
1619 if (!requesting_trace_format) {
1620 ret_code = LTTNG_OK;
1621 goto end;
1622 }
1623
1624 /* Receive trace formats */
1625 {
1626 lttcomm_relayd_get_configuration_specialized_query_reply query_flag_reply = {};
1627 ret = recv_reply(sock, &query_flag_reply, sizeof(query_flag_reply));
1628 if (ret < 0) {
1629 ERR("Failed to receive relay daemon get configuration query flag data");
1630 ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL;
1631 goto end;
1632 }
1633
1634 query_flag_reply.query_flag = be64toh(query_flag_reply.query_flag);
1635 LTTNG_ASSERT(query_flag_reply.query_flag &
1636 LTTCOMM_RELAYD_CONFIGURATION_QUERY_FLAG_SUPPORTED_TRACE_FORMAT);
1637
1638 query_flag_reply.payload_len = be64toh(query_flag_reply.payload_len);
1639 LTTNG_ASSERT(query_flag_reply.payload_len == sizeof(uint64_t));
1640
1641 lttng_dynamic_buffer_set_size(&buffer, query_flag_reply.payload_len);
1642
1643 ret = recv_reply(sock, buffer.data, query_flag_reply.payload_len);
1644 if (ret < 0) {
1645 ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL;
1646 ERR("Failed to receive configuration dynamic payload for flag TODO");
1647 goto end;
1648 }
1649
1650 *trace_format_query_results = be64toh(*(uint64_t *) buffer.data);
1651 }
1652
1653 ret_code = LTTNG_OK;
1654
8614e600 1655end:
f4c5b127
JR
1656 lttng_dynamic_buffer_reset(&buffer);
1657 return ret_code;
8614e600 1658}
This page took 0.178247 seconds and 5 git commands to generate.