Propagate trace format to relayd on session creation
[lttng-tools.git] / src / common / relayd / relayd.cpp
CommitLineData
00e2e675 1/*
ab5be9fa 2 * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
00e2e675 3 *
ab5be9fa 4 * SPDX-License-Identifier: GPL-2.0-only
00e2e675 5 *
00e2e675
DG
6 */
7
6c1c0768 8#define _LGPL_SOURCE
00e2e675
DG
9#include <stdio.h>
10#include <stdlib.h>
11#include <string.h>
12#include <sys/stat.h>
77c7c900 13#include <inttypes.h>
00e2e675 14
f4c5b127 15#include <bitset>
c9e313bc 16#include <common/common.hpp>
c9e313bc
SM
17#include <common/compat/endian.hpp>
18#include <common/compat/string.hpp>
f4c5b127 19#include <common/defaults.hpp>
c9e313bc 20#include <common/index/ctf-index.hpp>
f4c5b127 21#include <common/sessiond-comm/relayd.hpp>
c9e313bc 22#include <common/string-utils/format.hpp>
f4c5b127 23#include <common/trace-chunk.hpp>
8476ce3a 24#include <lttng/trace-format-descriptor-internal.hpp>
00e2e675 25
c9e313bc 26#include "relayd.hpp"
00e2e675 27
070b6a86
MD
28static
29bool relayd_supports_chunks(const struct lttcomm_relayd_sock *sock)
30{
31 if (sock->major > 2) {
32 return true;
33 } else if (sock->major == 2 && sock->minor >= 11) {
34 return true;
35 }
36 return false;
37}
38
8614e600
MD
39static
40bool relayd_supports_get_configuration(const struct lttcomm_relayd_sock *sock)
41{
42 if (sock->major > 2) {
43 return true;
44 } else if (sock->major == 2 && sock->minor >= 12) {
45 return true;
46 }
47 return false;
48}
49
f4c5b127
JR
50static bool relayd_supports_get_configuration_trace_format(const struct lttcomm_relayd_sock *sock)
51{
52 if (sock->major > 2) {
53 return true;
54 } else if (sock->major == 2 && sock->minor >= 15) {
55 return true;
56 }
57 return false;
58}
59
00e2e675
DG
60/*
61 * Send command. Fill up the header and append the data.
62 */
6151a90f 63static int send_command(struct lttcomm_relayd_sock *rsock,
76b9afaa 64 enum lttcomm_relayd_command cmd, const void *data, size_t size,
00e2e675
DG
65 int flags)
66{
67 int ret;
68 struct lttcomm_relayd_hdr header;
69 char *buf;
70 uint64_t buf_size = sizeof(header);
71
f96e4545
MD
72 if (rsock->sock.fd < 0) {
73 return -ECONNRESET;
74 }
75
00e2e675
DG
76 if (data) {
77 buf_size += size;
78 }
79
64803277 80 buf = calloc<char>(buf_size);
00e2e675
DG
81 if (buf == NULL) {
82 PERROR("zmalloc relayd send command buf");
f4c5b127 83 ret = -ENOMEM;
00e2e675
DG
84 goto alloc_error;
85 }
86
53efb85a 87 memset(&header, 0, sizeof(header));
00e2e675
DG
88 header.cmd = htobe32(cmd);
89 header.data_size = htobe64(size);
90
91 /* Zeroed for now since not used. */
92 header.cmd_version = 0;
93 header.circuit_id = 0;
94
95 /* Prepare buffer to send. */
96 memcpy(buf, &header, sizeof(header));
97 if (data) {
98 memcpy(buf + sizeof(header), data, size);
99 }
100
00e71031
FD
101 DBG3("Relayd sending command %s of size %" PRIu64,
102 lttcomm_relayd_command_str(cmd), buf_size);
6151a90f 103 ret = rsock->sock.ops->sendmsg(&rsock->sock, buf, buf_size, flags);
00e2e675 104 if (ret < 0) {
00e71031
FD
105 PERROR("Failed to send command %s of size %" PRIu64,
106 lttcomm_relayd_command_str(cmd), buf_size);
8994307f 107 ret = -errno;
00e2e675
DG
108 goto error;
109 }
00e2e675
DG
110error:
111 free(buf);
112alloc_error:
113 return ret;
114}
115
116/*
117 * Receive reply data on socket. This MUST be called after send_command or else
118 * could result in unexpected behavior(s).
119 */
6151a90f 120static int recv_reply(struct lttcomm_relayd_sock *rsock, void *data, size_t size)
00e2e675
DG
121{
122 int ret;
123
f96e4545
MD
124 if (rsock->sock.fd < 0) {
125 return -ECONNRESET;
126 }
127
8fd623e0 128 DBG3("Relayd waiting for reply of size %zu", size);
00e2e675 129
6151a90f 130 ret = rsock->sock.ops->recvmsg(&rsock->sock, data, size, 0);
20275fe8
DG
131 if (ret <= 0 || ret != size) {
132 if (ret == 0) {
133 /* Orderly shutdown. */
6151a90f 134 DBG("Socket %d has performed an orderly shutdown", rsock->sock.fd);
20275fe8 135 } else {
8fd623e0 136 DBG("Receiving reply failed on sock %d for size %zu with ret %d",
6151a90f 137 rsock->sock.fd, size, ret);
20275fe8
DG
138 }
139 /* Always return -1 here and the caller can use errno. */
140 ret = -1;
00e2e675
DG
141 goto error;
142 }
143
144error:
145 return ret;
146}
147
8476ce3a
JR
148/*
149 * Starting from 2.15, the trace format for the session is sent to lttng-relayd
150 * on creation.
151 * */
152static int relayd_create_session_2_15(struct lttcomm_relayd_sock *rsock,
153 const char *session_name,
154 const char *hostname,
155 const char *base_path,
156 int session_live_timer,
157 unsigned int snapshot,
158 uint64_t sessiond_session_id,
159 const lttng_uuid& sessiond_uuid,
160 const uint64_t *current_chunk_id,
161 time_t creation_time,
162 bool session_name_contains_creation_time,
163 lttcomm_relayd_create_session_reply_2_15 *reply,
164 char *output_path,
165 const lttng::trace_format_descriptor& trace_format)
166{
167 int ret;
168 struct lttcomm_relayd_create_session_2_15 *msg = NULL;
169 size_t session_name_len;
170 size_t hostname_len;
171 size_t base_path_len;
172 size_t msg_length;
173 char *dst;
174
175 if (!base_path) {
176 base_path = "";
177 }
178 /* The three names are sent with a '\0' delimiter between them. */
179 session_name_len = strlen(session_name) + 1;
180 hostname_len = strlen(hostname) + 1;
181 base_path_len = strlen(base_path) + 1;
182
183 msg_length = sizeof(*msg) + session_name_len + hostname_len + base_path_len;
184 msg = zmalloc<lttcomm_relayd_create_session_2_15>(msg_length);
185 if (!msg) {
186 PERROR("zmalloc create_session_2_11 command message");
187 ret = -1;
188 goto error;
189 }
190
191 LTTNG_ASSERT(session_name_len <= UINT32_MAX);
192 msg->session_name_len = htobe32(session_name_len);
193
194 LTTNG_ASSERT(hostname_len <= UINT32_MAX);
195 msg->hostname_len = htobe32(hostname_len);
196
197 LTTNG_ASSERT(base_path_len <= UINT32_MAX);
198 msg->base_path_len = htobe32(base_path_len);
199
200 dst = msg->names;
201 if (lttng_strncpy(dst, session_name, session_name_len)) {
202 ret = -1;
203 goto error;
204 }
205 dst += session_name_len;
206 if (lttng_strncpy(dst, hostname, hostname_len)) {
207 ret = -1;
208 goto error;
209 }
210 dst += hostname_len;
211 if (lttng_strncpy(dst, base_path, base_path_len)) {
212 ret = -1;
213 goto error;
214 }
215
216 msg->live_timer = htobe32(session_live_timer);
217 msg->snapshot = !!snapshot;
218
219 std::copy(sessiond_uuid.begin(), sessiond_uuid.end(), msg->sessiond_uuid);
220 msg->session_id = htobe64(sessiond_session_id);
221 msg->session_name_contains_creation_time = session_name_contains_creation_time;
222 msg->trace_format = (uint8_t) trace_format.relayd_type();
223 if (current_chunk_id) {
224 LTTNG_OPTIONAL_SET(&msg->current_chunk_id, htobe64(*current_chunk_id));
225 }
226
227 msg->creation_time = htobe64((uint64_t) creation_time);
228
229 /* Send command */
230 ret = send_command(rsock, RELAYD_CREATE_SESSION, msg, msg_length, 0);
231 if (ret < 0) {
232 goto error;
233 }
234 /* Receive response */
235 ret = recv_reply(rsock, reply, sizeof(*reply));
236 if (ret < 0) {
237 goto error;
238 }
239 reply->generic.session_id = be64toh(reply->generic.session_id);
240 reply->generic.ret_code = be32toh(reply->generic.ret_code);
241 reply->output_path_length = be32toh(reply->output_path_length);
242 if (reply->output_path_length >= LTTNG_PATH_MAX) {
243 ERR("Invalid session output path length in reply (%" PRIu32
244 " bytes) exceeds maximal allowed length (%d bytes)",
245 reply->output_path_length, LTTNG_PATH_MAX);
246 ret = -1;
247 goto error;
248 }
249 ret = recv_reply(rsock, output_path, reply->output_path_length);
250 if (ret < 0) {
251 goto error;
252 }
253error:
254 free(msg);
255 return ret;
256}
257
d3e2ba59 258/*
6fa5fe7c
MD
259 * Starting from 2.11, RELAYD_CREATE_SESSION payload (session_name,
260 * hostname, and base_path) have no length restriction on the sender side.
f86f6389
JR
261 * Length for both payloads is stored in the msg struct. A new dynamic size
262 * payload size is introduced.
263 */
264static int relayd_create_session_2_11(struct lttcomm_relayd_sock *rsock,
fb9a95c4 265 const char *session_name, const char *hostname,
6fa5fe7c
MD
266 const char *base_path, int session_live_timer,
267 unsigned int snapshot, uint64_t sessiond_session_id,
328c2fe7 268 const lttng_uuid& sessiond_uuid, const uint64_t *current_chunk_id,
ecd1a12f
MD
269 time_t creation_time, bool session_name_contains_creation_time,
270 struct lttcomm_relayd_create_session_reply_2_11 *reply,
271 char *output_path)
f86f6389
JR
272{
273 int ret;
274 struct lttcomm_relayd_create_session_2_11 *msg = NULL;
275 size_t session_name_len;
276 size_t hostname_len;
6fa5fe7c 277 size_t base_path_len;
f86f6389 278 size_t msg_length;
6fa5fe7c 279 char *dst;
f86f6389 280
46ef2188
MD
281 if (!base_path) {
282 base_path = "";
283 }
284 /* The three names are sent with a '\0' delimiter between them. */
f86f6389
JR
285 session_name_len = strlen(session_name) + 1;
286 hostname_len = strlen(hostname) + 1;
17e736a5 287 base_path_len = strlen(base_path) + 1;
f86f6389 288
6fa5fe7c 289 msg_length = sizeof(*msg) + session_name_len + hostname_len + base_path_len;
64803277 290 msg = zmalloc<lttcomm_relayd_create_session_2_11>(msg_length);
f86f6389
JR
291 if (!msg) {
292 PERROR("zmalloc create_session_2_11 command message");
293 ret = -1;
294 goto error;
295 }
296
a0377dfe 297 LTTNG_ASSERT(session_name_len <= UINT32_MAX);
f86f6389
JR
298 msg->session_name_len = htobe32(session_name_len);
299
a0377dfe 300 LTTNG_ASSERT(hostname_len <= UINT32_MAX);
f86f6389
JR
301 msg->hostname_len = htobe32(hostname_len);
302
a0377dfe 303 LTTNG_ASSERT(base_path_len <= UINT32_MAX);
6fa5fe7c
MD
304 msg->base_path_len = htobe32(base_path_len);
305
306 dst = msg->names;
307 if (lttng_strncpy(dst, session_name, session_name_len)) {
308 ret = -1;
309 goto error;
310 }
311 dst += session_name_len;
312 if (lttng_strncpy(dst, hostname, hostname_len)) {
f86f6389
JR
313 ret = -1;
314 goto error;
315 }
6fa5fe7c 316 dst += hostname_len;
36f9f13b 317 if (lttng_strncpy(dst, base_path, base_path_len)) {
f86f6389
JR
318 ret = -1;
319 goto error;
320 }
321
322 msg->live_timer = htobe32(session_live_timer);
323 msg->snapshot = !!snapshot;
324
328c2fe7 325 std::copy(sessiond_uuid.begin(), sessiond_uuid.end(), msg->sessiond_uuid);
658f12fa 326 msg->session_id = htobe64(sessiond_session_id);
46ef2188 327 msg->session_name_contains_creation_time = session_name_contains_creation_time;
f39bd140
JG
328 if (current_chunk_id) {
329 LTTNG_OPTIONAL_SET(&msg->current_chunk_id,
330 htobe64(*current_chunk_id));
331 }
332
db1da059
JG
333 msg->creation_time = htobe64((uint64_t) creation_time);
334
f86f6389
JR
335 /* Send command */
336 ret = send_command(rsock, RELAYD_CREATE_SESSION, msg, msg_length, 0);
337 if (ret < 0) {
338 goto error;
339 }
ecd1a12f
MD
340 /* Receive response */
341 ret = recv_reply(rsock, reply, sizeof(*reply));
342 if (ret < 0) {
343 goto error;
344 }
345 reply->generic.session_id = be64toh(reply->generic.session_id);
346 reply->generic.ret_code = be32toh(reply->generic.ret_code);
347 reply->output_path_length = be32toh(reply->output_path_length);
348 if (reply->output_path_length >= LTTNG_PATH_MAX) {
349 ERR("Invalid session output path length in reply (%" PRIu32 " bytes) exceeds maximal allowed length (%d bytes)",
350 reply->output_path_length, LTTNG_PATH_MAX);
351 ret = -1;
352 goto error;
353 }
354 ret = recv_reply(rsock, output_path, reply->output_path_length);
355 if (ret < 0) {
356 goto error;
357 }
f86f6389
JR
358error:
359 free(msg);
360 return ret;
361}
362/*
363 * From 2.4 to 2.10, RELAYD_CREATE_SESSION takes additional parameters to
d3e2ba59
JD
364 * support the live reading capability.
365 */
366static int relayd_create_session_2_4(struct lttcomm_relayd_sock *rsock,
fb9a95c4 367 const char *session_name, const char *hostname,
ecd1a12f
MD
368 int session_live_timer, unsigned int snapshot,
369 struct lttcomm_relayd_status_session *reply)
d3e2ba59
JD
370{
371 int ret;
372 struct lttcomm_relayd_create_session_2_4 msg;
373
3a13ffd5
MD
374 if (lttng_strncpy(msg.session_name, session_name,
375 sizeof(msg.session_name))) {
246777db
MD
376 ret = -1;
377 goto error;
378 }
3a13ffd5 379 if (lttng_strncpy(msg.hostname, hostname, sizeof(msg.hostname))) {
246777db
MD
380 ret = -1;
381 goto error;
382 }
d3e2ba59 383 msg.live_timer = htobe32(session_live_timer);
7d2f7452 384 msg.snapshot = htobe32(snapshot);
d3e2ba59
JD
385
386 /* Send command */
387 ret = send_command(rsock, RELAYD_CREATE_SESSION, &msg, sizeof(msg), 0);
388 if (ret < 0) {
389 goto error;
390 }
391
ecd1a12f
MD
392 /* Receive response */
393 ret = recv_reply(rsock, reply, sizeof(*reply));
394 if (ret < 0) {
395 goto error;
396 }
397 reply->session_id = be64toh(reply->session_id);
398 reply->ret_code = be32toh(reply->ret_code);
d3e2ba59
JD
399error:
400 return ret;
401}
402
403/*
404 * RELAYD_CREATE_SESSION from 2.1 to 2.3.
405 */
ecd1a12f
MD
406static int relayd_create_session_2_1(struct lttcomm_relayd_sock *rsock,
407 struct lttcomm_relayd_status_session *reply)
d3e2ba59
JD
408{
409 int ret;
410
411 /* Send command */
412 ret = send_command(rsock, RELAYD_CREATE_SESSION, NULL, 0, 0);
413 if (ret < 0) {
414 goto error;
415 }
416
ecd1a12f
MD
417 /* Receive response */
418 ret = recv_reply(rsock, reply, sizeof(*reply));
419 if (ret < 0) {
420 goto error;
421 }
422 reply->session_id = be64toh(reply->session_id);
423 reply->ret_code = be32toh(reply->ret_code);
d3e2ba59
JD
424error:
425 return ret;
426}
427
c5b6f4f0
DG
428/*
429 * Send a RELAYD_CREATE_SESSION command to the relayd with the given socket and
430 * set session_id of the relayd if we have a successful reply from the relayd.
431 *
20275fe8
DG
432 * On success, return 0 else a negative value which is either an errno error or
433 * a lttng error code from the relayd.
c5b6f4f0 434 */
fb9a95c4
JG
435int relayd_create_session(struct lttcomm_relayd_sock *rsock,
436 uint64_t *relayd_session_id,
8476ce3a
JR
437 const char *session_name,
438 const char *hostname,
439 const char *base_path,
440 int session_live_timer,
441 unsigned int snapshot,
442 uint64_t sessiond_session_id,
328c2fe7 443 const lttng_uuid& sessiond_uuid,
db1da059 444 const uint64_t *current_chunk_id,
8476ce3a
JR
445 time_t creation_time,
446 bool session_name_contains_creation_time,
447 char *output_path,
448 lttng::trace_format_descriptor& trace_format)
c5b6f4f0
DG
449{
450 int ret;
8476ce3a 451 lttcomm_relayd_create_session_reply_2_15 reply = {};
c5b6f4f0 452
a0377dfe
FD
453 LTTNG_ASSERT(rsock);
454 LTTNG_ASSERT(relayd_session_id);
c5b6f4f0
DG
455
456 DBG("Relayd create session");
457
f86f6389
JR
458 if (rsock->minor < 4) {
459 /* From 2.1 to 2.3 */
ecd1a12f 460 ret = relayd_create_session_2_1(rsock, &reply.generic);
f86f6389
JR
461 } else if (rsock->minor >= 4 && rsock->minor < 11) {
462 /* From 2.4 to 2.10 */
8476ce3a
JR
463 ret = relayd_create_session_2_4(rsock, session_name, hostname, session_live_timer,
464 snapshot, &reply.generic);
465 } else if (rsock->minor < 15) {
466 /* From 2.11 to 2.14 */
467 ret = relayd_create_session_2_11(rsock, session_name, hostname, base_path,
468 session_live_timer, snapshot, sessiond_session_id, sessiond_uuid,
469 current_chunk_id, creation_time,
470 session_name_contains_creation_time, &reply, output_path);
f86f6389 471 } else {
8476ce3a
JR
472 ret = relayd_create_session_2_15(rsock, session_name, hostname, base_path,
473 session_live_timer, snapshot, sessiond_session_id, sessiond_uuid,
46ef2188 474 current_chunk_id, creation_time,
8476ce3a
JR
475 session_name_contains_creation_time, &reply, output_path,
476 trace_format);
c5b6f4f0
DG
477 }
478
c5b6f4f0
DG
479 if (ret < 0) {
480 goto error;
481 }
482
c5b6f4f0 483 /* Return session id or negative ret code. */
ecd1a12f 484 if (reply.generic.ret_code != LTTNG_OK) {
bb63afd9 485 ret = -1;
ecd1a12f
MD
486 ERR("Relayd create session replied error %d",
487 reply.generic.ret_code);
c5b6f4f0
DG
488 goto error;
489 } else {
490 ret = 0;
ecd1a12f 491 *relayd_session_id = reply.generic.session_id;
c5b6f4f0
DG
492 }
493
ecd1a12f 494 DBG("Relayd session created with id %" PRIu64, reply.generic.session_id);
c5b6f4f0
DG
495
496error:
497 return ret;
498}
499
2f21a469
JR
500static int relayd_add_stream_2_1(struct lttcomm_relayd_sock *rsock,
501 const char *channel_name, const char *pathname)
502{
503 int ret;
504 struct lttcomm_relayd_add_stream msg;
505
506 memset(&msg, 0, sizeof(msg));
507 if (lttng_strncpy(msg.channel_name, channel_name,
508 sizeof(msg.channel_name))) {
509 ret = -1;
510 goto error;
511 }
512
513 if (lttng_strncpy(msg.pathname, pathname,
514 sizeof(msg.pathname))) {
515 ret = -1;
516 goto error;
517 }
518
519 /* Send command */
520 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
521 if (ret < 0) {
522 ret = -1;
523 goto error;
524 }
525 ret = 0;
526error:
527 return ret;
528}
529
530static int relayd_add_stream_2_2(struct lttcomm_relayd_sock *rsock,
531 const char *channel_name, const char *pathname,
532 uint64_t tracefile_size, uint64_t tracefile_count)
533{
534 int ret;
535 struct lttcomm_relayd_add_stream_2_2 msg;
536
537 memset(&msg, 0, sizeof(msg));
538 /* Compat with relayd 2.2 to 2.10 */
539 if (lttng_strncpy(msg.channel_name, channel_name,
540 sizeof(msg.channel_name))) {
541 ret = -1;
542 goto error;
543 }
544 if (lttng_strncpy(msg.pathname, pathname,
545 sizeof(msg.pathname))) {
546 ret = -1;
547 goto error;
548 }
549 msg.tracefile_size = htobe64(tracefile_size);
550 msg.tracefile_count = htobe64(tracefile_count);
551
552 /* Send command */
553 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
554 if (ret < 0) {
555 goto error;
556 }
557 ret = 0;
558error:
559 return ret;
560}
561
562static int relayd_add_stream_2_11(struct lttcomm_relayd_sock *rsock,
563 const char *channel_name, const char *pathname,
b00e554e
JG
564 uint64_t tracefile_size, uint64_t tracefile_count,
565 uint64_t trace_archive_id)
2f21a469
JR
566{
567 int ret;
568 struct lttcomm_relayd_add_stream_2_11 *msg = NULL;
569 size_t channel_name_len;
570 size_t pathname_len;
571 size_t msg_length;
572
573 /* The two names are sent with a '\0' delimiter between them. */
574 channel_name_len = strlen(channel_name) + 1;
575 pathname_len = strlen(pathname) + 1;
576
577 msg_length = sizeof(*msg) + channel_name_len + pathname_len;
64803277 578 msg = zmalloc<lttcomm_relayd_add_stream_2_11>(msg_length);
2f21a469
JR
579 if (!msg) {
580 PERROR("zmalloc add_stream_2_11 command message");
581 ret = -1;
582 goto error;
583 }
584
a0377dfe 585 LTTNG_ASSERT(channel_name_len <= UINT32_MAX);
2f21a469
JR
586 msg->channel_name_len = htobe32(channel_name_len);
587
a0377dfe 588 LTTNG_ASSERT(pathname_len <= UINT32_MAX);
2f21a469
JR
589 msg->pathname_len = htobe32(pathname_len);
590
591 if (lttng_strncpy(msg->names, channel_name, channel_name_len)) {
592 ret = -1;
593 goto error;
594 }
595 if (lttng_strncpy(msg->names + channel_name_len, pathname, pathname_len)) {
596 ret = -1;
597 goto error;
598 }
599
600 msg->tracefile_size = htobe64(tracefile_size);
601 msg->tracefile_count = htobe64(tracefile_count);
348a81dc 602 msg->trace_chunk_id = htobe64(trace_archive_id);
2f21a469
JR
603
604 /* Send command */
605 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) msg, msg_length, 0);
606 if (ret < 0) {
607 goto error;
608 }
609 ret = 0;
610error:
611 free(msg);
612 return ret;
613}
614
00e2e675
DG
615/*
616 * Add stream on the relayd and assign stream handle to the stream_id argument.
617 *
070b6a86
MD
618 * Chunks are not supported by relayd prior to 2.11, but are used
619 * internally between session daemon and consumer daemon to keep track
620 * of the channel and stream output path.
621 *
00e2e675
DG
622 * On success return 0 else return ret_code negative value.
623 */
6151a90f 624int relayd_add_stream(struct lttcomm_relayd_sock *rsock, const char *channel_name,
5da88b0f 625 const char *domain_name, const char *_pathname, uint64_t *stream_id,
0b50e4b3 626 uint64_t tracefile_size, uint64_t tracefile_count,
d2956687 627 struct lttng_trace_chunk *trace_chunk)
00e2e675
DG
628{
629 int ret;
00e2e675 630 struct lttcomm_relayd_status_stream reply;
5da88b0f 631 char pathname[RELAYD_COMM_LTTNG_PATH_MAX];
00e2e675
DG
632
633 /* Code flow error. Safety net. */
a0377dfe
FD
634 LTTNG_ASSERT(rsock);
635 LTTNG_ASSERT(channel_name);
636 LTTNG_ASSERT(domain_name);
637 LTTNG_ASSERT(_pathname);
638 LTTNG_ASSERT(trace_chunk);
00e2e675
DG
639
640 DBG("Relayd adding stream for channel name %s", channel_name);
641
0f907de1
JD
642 /* Compat with relayd 2.1 */
643 if (rsock->minor == 1) {
2f21a469 644 /* For 2.1 */
5fab2976 645 ret = relayd_add_stream_2_1(rsock, channel_name, _pathname);
55caead7 646
2f21a469
JR
647 } else if (rsock->minor > 1 && rsock->minor < 11) {
648 /* From 2.2 to 2.10 */
5fab2976 649 ret = relayd_add_stream_2_2(rsock, channel_name, _pathname,
2f21a469 650 tracefile_size, tracefile_count);
0f907de1 651 } else {
5fab2976 652 const char *separator;
d2956687
JG
653 enum lttng_trace_chunk_status chunk_status;
654 uint64_t chunk_id;
655
5fab2976
JR
656 if (_pathname[0] == '\0') {
657 separator = "";
658 } else {
659 separator = "/";
660 }
661
662 ret = snprintf(pathname, RELAYD_COMM_LTTNG_PATH_MAX, "%s%s%s",
663 domain_name, separator, _pathname);
664 if (ret <= 0 || ret >= RELAYD_COMM_LTTNG_PATH_MAX) {
665 ERR("Failed to format stream path: %s",
666 ret <= 0 ? "formatting error" :
667 "path exceeds maximal allowed length");
668 ret = -1;
669 goto error;
670 }
671
d2956687
JG
672 chunk_status = lttng_trace_chunk_get_id(trace_chunk,
673 &chunk_id);
a0377dfe 674 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
d2956687 675
2f21a469
JR
676 /* From 2.11 to ...*/
677 ret = relayd_add_stream_2_11(rsock, channel_name, pathname,
b00e554e 678 tracefile_size, tracefile_count,
d2956687 679 chunk_id);
2f21a469 680 }
0f907de1 681
2f21a469
JR
682 if (ret) {
683 ret = -1;
684 goto error;
00e2e675
DG
685 }
686
633d0084 687 /* Waiting for reply */
6151a90f 688 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
00e2e675
DG
689 if (ret < 0) {
690 goto error;
691 }
692
693 /* Back to host bytes order. */
694 reply.handle = be64toh(reply.handle);
695 reply.ret_code = be32toh(reply.ret_code);
696
697 /* Return session id or negative ret code. */
f73fabfd 698 if (reply.ret_code != LTTNG_OK) {
bb63afd9
DG
699 ret = -1;
700 ERR("Relayd add stream replied error %d", reply.ret_code);
00e2e675
DG
701 } else {
702 /* Success */
703 ret = 0;
704 *stream_id = reply.handle;
705 }
706
77c7c900 707 DBG("Relayd stream added successfully with handle %" PRIu64,
2f21a469 708 reply.handle);
00e2e675
DG
709
710error:
711 return ret;
712}
713
a4baae1b
JD
714/*
715 * Inform the relay that all the streams for the current channel have been sent.
716 *
717 * On success return 0 else return ret_code negative value.
718 */
719int relayd_streams_sent(struct lttcomm_relayd_sock *rsock)
720{
721 int ret;
722 struct lttcomm_relayd_generic_reply reply;
723
724 /* Code flow error. Safety net. */
a0377dfe 725 LTTNG_ASSERT(rsock);
a4baae1b
JD
726
727 DBG("Relayd sending streams sent.");
728
729 /* This feature was introduced in 2.4, ignore it for earlier versions. */
730 if (rsock->minor < 4) {
731 ret = 0;
732 goto end;
733 }
734
735 /* Send command */
736 ret = send_command(rsock, RELAYD_STREAMS_SENT, NULL, 0, 0);
737 if (ret < 0) {
738 goto error;
739 }
740
741 /* Waiting for reply */
742 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
743 if (ret < 0) {
744 goto error;
745 }
746
747 /* Back to host bytes order. */
748 reply.ret_code = be32toh(reply.ret_code);
749
750 /* Return session id or negative ret code. */
751 if (reply.ret_code != LTTNG_OK) {
752 ret = -1;
753 ERR("Relayd streams sent replied error %d", reply.ret_code);
754 goto error;
755 } else {
756 /* Success */
757 ret = 0;
758 }
759
760 DBG("Relayd streams sent success");
761
762error:
763end:
764 return ret;
765}
766
00e2e675
DG
767/*
768 * Check version numbers on the relayd.
d4519fa3
JD
769 * If major versions are compatible, we assign minor_to_use to the
770 * minor version of the protocol we are going to use for this session.
00e2e675 771 *
67d5aa28
JD
772 * Return 0 if the two daemons are compatible, LTTNG_ERR_RELAYD_VERSION_FAIL
773 * otherwise, or a negative value on network errors.
00e2e675 774 */
6151a90f 775int relayd_version_check(struct lttcomm_relayd_sock *rsock)
00e2e675
DG
776{
777 int ret;
092b6259 778 struct lttcomm_relayd_version msg;
00e2e675
DG
779
780 /* Code flow error. Safety net. */
a0377dfe 781 LTTNG_ASSERT(rsock);
00e2e675 782
6151a90f
JD
783 DBG("Relayd version check for major.minor %u.%u", rsock->major,
784 rsock->minor);
00e2e675 785
53efb85a 786 memset(&msg, 0, sizeof(msg));
092b6259 787 /* Prepare network byte order before transmission. */
6151a90f
JD
788 msg.major = htobe32(rsock->major);
789 msg.minor = htobe32(rsock->minor);
092b6259 790
00e2e675 791 /* Send command */
6151a90f 792 ret = send_command(rsock, RELAYD_VERSION, (void *) &msg, sizeof(msg), 0);
00e2e675
DG
793 if (ret < 0) {
794 goto error;
795 }
796
20275fe8 797 /* Receive response */
6151a90f 798 ret = recv_reply(rsock, (void *) &msg, sizeof(msg));
00e2e675
DG
799 if (ret < 0) {
800 goto error;
801 }
802
803 /* Set back to host bytes order */
092b6259
DG
804 msg.major = be32toh(msg.major);
805 msg.minor = be32toh(msg.minor);
806
807 /*
808 * Only validate the major version. If the other side is higher,
809 * communication is not possible. Only major version equal can talk to each
810 * other. If the minor version differs, the lowest version is used by both
811 * sides.
092b6259 812 */
6151a90f 813 if (msg.major != rsock->major) {
d4519fa3 814 /* Not compatible */
67d5aa28 815 ret = LTTNG_ERR_RELAYD_VERSION_FAIL;
d4519fa3 816 DBG2("Relayd version is NOT compatible. Relayd version %u != %u (us)",
6151a90f 817 msg.major, rsock->major);
092b6259 818 goto error;
00e2e675
DG
819 }
820
092b6259 821 /*
6151a90f
JD
822 * If the relayd's minor version is higher, it will adapt to our version so
823 * we can continue to use the latest relayd communication data structure.
824 * If the received minor version is higher, the relayd should adapt to us.
092b6259 825 */
6151a90f
JD
826 if (rsock->minor > msg.minor) {
827 rsock->minor = msg.minor;
d4519fa3 828 }
092b6259 829
d4519fa3
JD
830 /* Version number compatible */
831 DBG2("Relayd version is compatible, using protocol version %u.%u",
6151a90f 832 rsock->major, rsock->minor);
d4519fa3 833 ret = 0;
00e2e675
DG
834
835error:
836 return ret;
837}
838
00e2e675
DG
839/*
840 * Send the RELAYD_SEND_METADATA command header announcing a metadata payload of len bytes.
841 *
842 * On success return 0 else return ret_code negative value.
843 */
6151a90f 844int relayd_send_metadata(struct lttcomm_relayd_sock *rsock, size_t len)
00e2e675
DG
845{
846 int ret;
847
848 /* Code flow error. Safety net. */
a0377dfe 849 LTTNG_ASSERT(rsock);
00e2e675 850
77c7c900 851 DBG("Relayd sending metadata of size %zu", len);
00e2e675
DG
852
853 /* Send command */
6151a90f 854 ret = send_command(rsock, RELAYD_SEND_METADATA, NULL, len, 0);
00e2e675
DG
855 if (ret < 0) {
856 goto error;
857 }
858
859 DBG2("Relayd metadata added successfully");
860
861 /*
862 * After that call, the metadata data MUST be sent to the relayd so the
863 * receive size on the other end matches the len of the metadata packet
633d0084 864 * header. This is why we don't wait for a reply here.
00e2e675
DG
865 */
866
867error:
868 return ret;
869}
870
871/*
6151a90f 872 * Connect to relay daemon with an allocated lttcomm_relayd_sock.
00e2e675 873 */
6151a90f 874int relayd_connect(struct lttcomm_relayd_sock *rsock)
00e2e675
DG
875{
876 /* Code flow error. Safety net. */
a0377dfe 877 LTTNG_ASSERT(rsock);
00e2e675 878
f96e4545
MD
879 if (!rsock->sock.ops) {
880 /*
881 * Attempting a connect on a non-initialized socket.
882 */
883 return -ECONNRESET;
884 }
885
00e2e675
DG
886 DBG3("Relayd connect ...");
887
6151a90f 888 return rsock->sock.ops->connect(&rsock->sock);
00e2e675
DG
889}
890
891/*
6151a90f 892 * Close relayd socket with an allocated lttcomm_relayd_sock.
ffe60014
DG
893 *
894 * If no socket operations are found, simply return 0 meaning that everything
895 * is fine. Without operations, the socket can not possibly be opened or used.
896 * This is possible if the socket was allocated but not created. However, the
897 * caller could simply use it to store a valid file descriptor for instance
898 * passed over a Unix socket and call this to cleanup but still without a valid
899 * ops pointer.
900 *
901 * Return the close returned value. On error, a negative value is usually
902 * returned back from close(2).
00e2e675 903 */
6151a90f 904int relayd_close(struct lttcomm_relayd_sock *rsock)
00e2e675 905{
ffe60014
DG
906 int ret;
907
00e2e675 908 /* Code flow error. Safety net. */
a0377dfe 909 LTTNG_ASSERT(rsock);
00e2e675 910
ffe60014 911 /* An invalid fd is fine, return success. */
6151a90f 912 if (rsock->sock.fd < 0) {
ffe60014
DG
913 ret = 0;
914 goto end;
915 }
916
6151a90f 917 DBG3("Relayd closing socket %d", rsock->sock.fd);
00e2e675 918
6151a90f
JD
919 if (rsock->sock.ops) {
920 ret = rsock->sock.ops->close(&rsock->sock);
ffe60014
DG
921 } else {
922 /* Default call if no specific ops found. */
6151a90f 923 ret = close(rsock->sock.fd);
ffe60014
DG
924 if (ret < 0) {
925 PERROR("relayd_close default close");
926 }
927 }
f96e4545 928 rsock->sock.fd = -1;
ffe60014
DG
929
930end:
931 return ret;
00e2e675
DG
932}
933
934/*
935 * Send data header structure to the relayd.
936 */
6151a90f 937int relayd_send_data_hdr(struct lttcomm_relayd_sock *rsock,
00e2e675
DG
938 struct lttcomm_relayd_data_hdr *hdr, size_t size)
939{
940 int ret;
941
942 /* Code flow error. Safety net. */
a0377dfe
FD
943 LTTNG_ASSERT(rsock);
944 LTTNG_ASSERT(hdr);
00e2e675 945
f96e4545
MD
946 if (rsock->sock.fd < 0) {
947 return -ECONNRESET;
948 }
949
8fd623e0 950 DBG3("Relayd sending data header of size %zu", size);
00e2e675
DG
951
952 /* Again, safety net */
953 if (size == 0) {
954 size = sizeof(struct lttcomm_relayd_data_hdr);
955 }
956
957 /* Only send data header. */
6151a90f 958 ret = rsock->sock.ops->sendmsg(&rsock->sock, hdr, size, 0);
00e2e675 959 if (ret < 0) {
8994307f 960 ret = -errno;
00e2e675
DG
961 goto error;
962 }
963
964 /*
965 * The data MUST be sent right after that command for the receive on the
966 * other end to match the size in the header.
967 */
968
969error:
970 return ret;
971}
173af62f
DG
972
973/*
974 * Send close stream command to the relayd.
975 */
6151a90f 976int relayd_send_close_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
173af62f
DG
977 uint64_t last_net_seq_num)
978{
979 int ret;
980 struct lttcomm_relayd_close_stream msg;
981 struct lttcomm_relayd_generic_reply reply;
982
983 /* Code flow error. Safety net. */
a0377dfe 984 LTTNG_ASSERT(rsock);
173af62f 985
77c7c900 986 DBG("Relayd closing stream id %" PRIu64, stream_id);
173af62f 987
53efb85a 988 memset(&msg, 0, sizeof(msg));
173af62f
DG
989 msg.stream_id = htobe64(stream_id);
990 msg.last_net_seq_num = htobe64(last_net_seq_num);
991
992 /* Send command */
6151a90f 993 ret = send_command(rsock, RELAYD_CLOSE_STREAM, (void *) &msg, sizeof(msg), 0);
173af62f
DG
994 if (ret < 0) {
995 goto error;
996 }
997
20275fe8 998 /* Receive response */
6151a90f 999 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
173af62f
DG
1000 if (ret < 0) {
1001 goto error;
1002 }
1003
1004 reply.ret_code = be32toh(reply.ret_code);
1005
1006 /* Return session id or negative ret code. */
f73fabfd 1007 if (reply.ret_code != LTTNG_OK) {
bb63afd9
DG
1008 ret = -1;
1009 ERR("Relayd close stream replied error %d", reply.ret_code);
173af62f
DG
1010 } else {
1011 /* Success */
1012 ret = 0;
1013 }
1014
77c7c900 1015 DBG("Relayd close stream id %" PRIu64 " successfully", stream_id);
173af62f
DG
1016
1017error:
1018 return ret;
1019}
c8f59ee5
DG
1020
1021/*
1022 * Check for data availability for a given stream id.
1023 *
6d805429 1024 * Return 0 if NOT pending, 1 if so and a negative value on error.
c8f59ee5 1025 */
6151a90f 1026int relayd_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
c8f59ee5
DG
1027 uint64_t last_net_seq_num)
1028{
1029 int ret;
6d805429 1030 struct lttcomm_relayd_data_pending msg;
c8f59ee5
DG
1031 struct lttcomm_relayd_generic_reply reply;
1032
1033 /* Code flow error. Safety net. */
a0377dfe 1034 LTTNG_ASSERT(rsock);
c8f59ee5 1035
6d805429 1036 DBG("Relayd data pending for stream id %" PRIu64, stream_id);
c8f59ee5 1037
53efb85a 1038 memset(&msg, 0, sizeof(msg));
c8f59ee5
DG
1039 msg.stream_id = htobe64(stream_id);
1040 msg.last_net_seq_num = htobe64(last_net_seq_num);
1041
1042 /* Send command */
6151a90f 1043 ret = send_command(rsock, RELAYD_DATA_PENDING, (void *) &msg,
c8f59ee5
DG
1044 sizeof(msg), 0);
1045 if (ret < 0) {
1046 goto error;
1047 }
1048
20275fe8 1049 /* Receive response */
6151a90f 1050 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
c8f59ee5
DG
1051 if (ret < 0) {
1052 goto error;
1053 }
1054
1055 reply.ret_code = be32toh(reply.ret_code);
1056
1057 /* Return session id or negative ret code. */
1058 if (reply.ret_code >= LTTNG_OK) {
bb63afd9 1059 ERR("Relayd data pending replied error %d", reply.ret_code);
c8f59ee5
DG
1060 }
1061
1062 /* At this point, the ret code is either 1 or 0 */
1063 ret = reply.ret_code;
1064
6d805429 1065 DBG("Relayd data is %s pending for stream id %" PRIu64,
9dd26bb9 1066 ret == 1 ? "" : "NOT", stream_id);
c8f59ee5
DG
1067
1068error:
1069 return ret;
1070}
1071
1072/*
1073 * Check on the relayd side for a quiescent state on the control socket.
1074 */
6151a90f 1075int relayd_quiescent_control(struct lttcomm_relayd_sock *rsock,
ad7051c0 1076 uint64_t metadata_stream_id)
c8f59ee5
DG
1077{
1078 int ret;
ad7051c0 1079 struct lttcomm_relayd_quiescent_control msg;
c8f59ee5
DG
1080 struct lttcomm_relayd_generic_reply reply;
1081
1082 /* Code flow error. Safety net. */
a0377dfe 1083 LTTNG_ASSERT(rsock);
c8f59ee5
DG
1084
1085 DBG("Relayd checking quiescent control state");
1086
53efb85a 1087 memset(&msg, 0, sizeof(msg));
ad7051c0
DG
1088 msg.stream_id = htobe64(metadata_stream_id);
1089
c8f59ee5 1090 /* Send command */
6151a90f 1091 ret = send_command(rsock, RELAYD_QUIESCENT_CONTROL, &msg, sizeof(msg), 0);
c8f59ee5
DG
1092 if (ret < 0) {
1093 goto error;
1094 }
1095
20275fe8 1096 /* Receive response */
6151a90f 1097 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
c8f59ee5
DG
1098 if (ret < 0) {
1099 goto error;
1100 }
1101
1102 reply.ret_code = be32toh(reply.ret_code);
1103
1104 /* Return session id or negative ret code. */
1105 if (reply.ret_code != LTTNG_OK) {
bb63afd9
DG
1106 ret = -1;
1107 ERR("Relayd quiescent control replied error %d", reply.ret_code);
c8f59ee5
DG
1108 goto error;
1109 }
1110
1111 /* Control socket is quiescent */
6d805429 1112 return 0;
c8f59ee5
DG
1113
1114error:
1115 return ret;
1116}
f7079f67
DG
1117
1118/*
1119 * Begin a data pending command for a specific session id.
1120 */
6151a90f 1121int relayd_begin_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id)
f7079f67
DG
1122{
1123 int ret;
1124 struct lttcomm_relayd_begin_data_pending msg;
1125 struct lttcomm_relayd_generic_reply reply;
1126
1127 /* Code flow error. Safety net. */
a0377dfe 1128 LTTNG_ASSERT(rsock);
f7079f67
DG
1129
1130 DBG("Relayd begin data pending");
1131
53efb85a 1132 memset(&msg, 0, sizeof(msg));
f7079f67
DG
1133 msg.session_id = htobe64(id);
1134
1135 /* Send command */
6151a90f 1136 ret = send_command(rsock, RELAYD_BEGIN_DATA_PENDING, &msg, sizeof(msg), 0);
f7079f67
DG
1137 if (ret < 0) {
1138 goto error;
1139 }
1140
20275fe8 1141 /* Receive response */
6151a90f 1142 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
f7079f67
DG
1143 if (ret < 0) {
1144 goto error;
1145 }
1146
1147 reply.ret_code = be32toh(reply.ret_code);
1148
1149 /* Return session id or negative ret code. */
1150 if (reply.ret_code != LTTNG_OK) {
bb63afd9
DG
1151 ret = -1;
1152 ERR("Relayd begin data pending replied error %d", reply.ret_code);
f7079f67
DG
1153 goto error;
1154 }
1155
1156 return 0;
1157
1158error:
1159 return ret;
1160}
1161
1162/*
1163 * End a data pending command for a specific session id.
1164 *
1165 * Return 0 on success and set is_data_inflight to 0 if no data is being
1166 * streamed or 1 if it is the case.
1167 */
6151a90f 1168int relayd_end_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id,
f7079f67
DG
1169 unsigned int *is_data_inflight)
1170{
af6c30b5 1171 int ret, recv_ret;
f7079f67
DG
1172 struct lttcomm_relayd_end_data_pending msg;
1173 struct lttcomm_relayd_generic_reply reply;
1174
1175 /* Code flow error. Safety net. */
a0377dfe 1176 LTTNG_ASSERT(rsock);
f7079f67
DG
1177
1178 DBG("Relayd end data pending");
1179
53efb85a 1180 memset(&msg, 0, sizeof(msg));
f7079f67
DG
1181 msg.session_id = htobe64(id);
1182
1183 /* Send command */
6151a90f 1184 ret = send_command(rsock, RELAYD_END_DATA_PENDING, &msg, sizeof(msg), 0);
f7079f67
DG
1185 if (ret < 0) {
1186 goto error;
1187 }
1188
20275fe8 1189 /* Receive response */
6151a90f 1190 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
f7079f67
DG
1191 if (ret < 0) {
1192 goto error;
1193 }
1194
af6c30b5
DG
1195 recv_ret = be32toh(reply.ret_code);
1196 if (recv_ret < 0) {
1197 ret = recv_ret;
f7079f67
DG
1198 goto error;
1199 }
1200
af6c30b5 1201 *is_data_inflight = recv_ret;
f7079f67 1202
af6c30b5 1203 DBG("Relayd end data pending is data inflight: %d", recv_ret);
f7079f67
DG
1204
1205 return 0;
1206
1207error:
1208 return ret;
1209}
1c20f0e2
JD
1210
1211/*
1212 * Send index to the relayd.
1213 */
1214int relayd_send_index(struct lttcomm_relayd_sock *rsock,
50adc264 1215 struct ctf_packet_index *index, uint64_t relay_stream_id,
1c20f0e2
JD
1216 uint64_t net_seq_num)
1217{
1218 int ret;
1219 struct lttcomm_relayd_index msg;
1220 struct lttcomm_relayd_generic_reply reply;
1221
1222 /* Code flow error. Safety net. */
a0377dfe 1223 LTTNG_ASSERT(rsock);
1c20f0e2
JD
1224
1225 if (rsock->minor < 4) {
1226 DBG("Not sending indexes before protocol 2.4");
1227 ret = 0;
1228 goto error;
1229 }
1230
1231 DBG("Relayd sending index for stream ID %" PRIu64, relay_stream_id);
1232
53efb85a 1233 memset(&msg, 0, sizeof(msg));
1c20f0e2
JD
1234 msg.relay_stream_id = htobe64(relay_stream_id);
1235 msg.net_seq_num = htobe64(net_seq_num);
1236
1237 /* The index is already in big endian. */
1238 msg.packet_size = index->packet_size;
1239 msg.content_size = index->content_size;
1240 msg.timestamp_begin = index->timestamp_begin;
1241 msg.timestamp_end = index->timestamp_end;
1242 msg.events_discarded = index->events_discarded;
1243 msg.stream_id = index->stream_id;
1244
234cd636
JD
1245 if (rsock->minor >= 8) {
1246 msg.stream_instance_id = index->stream_instance_id;
1247 msg.packet_seq_num = index->packet_seq_num;
1248 }
1249
1c20f0e2 1250 /* Send command */
f8f3885c
MD
1251 ret = send_command(rsock, RELAYD_SEND_INDEX, &msg,
1252 lttcomm_relayd_index_len(lttng_to_index_major(rsock->major,
1253 rsock->minor),
1254 lttng_to_index_minor(rsock->major, rsock->minor)),
1255 0);
1c20f0e2
JD
1256 if (ret < 0) {
1257 goto error;
1258 }
1259
1260 /* Receive response */
1261 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1262 if (ret < 0) {
1263 goto error;
1264 }
1265
1266 reply.ret_code = be32toh(reply.ret_code);
1267
1268 /* Return session id or negative ret code. */
1269 if (reply.ret_code != LTTNG_OK) {
1270 ret = -1;
1271 ERR("Relayd send index replied error %d", reply.ret_code);
1272 } else {
1273 /* Success */
1274 ret = 0;
1275 }
1276
1277error:
1278 return ret;
1279}
93ec662e
JD
1280
1281/*
1282 * Ask the relay to reset the metadata trace file (regeneration).
1283 */
1284int relayd_reset_metadata(struct lttcomm_relayd_sock *rsock,
1285 uint64_t stream_id, uint64_t version)
1286{
1287 int ret;
1288 struct lttcomm_relayd_reset_metadata msg;
1289 struct lttcomm_relayd_generic_reply reply;
1290
1291 /* Code flow error. Safety net. */
a0377dfe 1292 LTTNG_ASSERT(rsock);
93ec662e
JD
1293
1294 /* Should have been prevented by the sessiond. */
1295 if (rsock->minor < 8) {
1296 ERR("Metadata regeneration unsupported before 2.8");
1297 ret = -1;
1298 goto error;
1299 }
1300
1301 DBG("Relayd reset metadata stream id %" PRIu64, stream_id);
1302
1303 memset(&msg, 0, sizeof(msg));
1304 msg.stream_id = htobe64(stream_id);
1305 msg.version = htobe64(version);
1306
1307 /* Send command */
1308 ret = send_command(rsock, RELAYD_RESET_METADATA, (void *) &msg, sizeof(msg), 0);
1309 if (ret < 0) {
1310 goto error;
1311 }
1312
1313 /* Receive response */
1314 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1315 if (ret < 0) {
1316 goto error;
1317 }
1318
1319 reply.ret_code = be32toh(reply.ret_code);
1320
1321 /* Return session id or negative ret code. */
1322 if (reply.ret_code != LTTNG_OK) {
1323 ret = -1;
1324 ERR("Relayd reset metadata replied error %d", reply.ret_code);
1325 } else {
1326 /* Success */
1327 ret = 0;
1328 }
1329
1330 DBG("Relayd reset metadata stream id %" PRIu64 " successfully", stream_id);
1331
1332error:
1333 return ret;
1334}
a1ae2ea5 1335
c35f9726 1336int relayd_rotate_streams(struct lttcomm_relayd_sock *sock,
ebb29c10 1337 unsigned int stream_count, const uint64_t *new_chunk_id,
c35f9726 1338 const struct relayd_stream_rotation_position *positions)
d73bf3d7
JD
1339{
1340 int ret;
c35f9726
JG
1341 unsigned int i;
1342 struct lttng_dynamic_buffer payload;
1343 struct lttcomm_relayd_generic_reply reply = {};
6d15ee45 1344 struct lttcomm_relayd_rotate_streams msg;
c35f9726
JG
1345 char new_chunk_id_buf[MAX_INT_DEC_LEN(*new_chunk_id)] = {};
1346 const char *new_chunk_id_str;
d73bf3d7 1347
6d15ee45
JG
1348 msg.stream_count = htobe32((uint32_t) stream_count);
1349 msg.new_chunk_id = (typeof(msg.new_chunk_id)){
1350 .is_set = !!new_chunk_id,
1351 .value = htobe64(new_chunk_id ? *new_chunk_id : 0),
1352 };
1353
070b6a86
MD
1354 if (!relayd_supports_chunks(sock)) {
1355 DBG("Refusing to rotate remote streams: relayd does not support chunks");
1356 return 0;
1357 }
1358
c35f9726 1359 lttng_dynamic_buffer_init(&payload);
d73bf3d7 1360
c35f9726 1361 /* Code flow error. Safety net. */
a0377dfe 1362 LTTNG_ASSERT(sock);
d73bf3d7 1363
c35f9726
JG
1364 if (new_chunk_id) {
1365 ret = snprintf(new_chunk_id_buf, sizeof(new_chunk_id_buf),
1366 "%" PRIu64, *new_chunk_id);
1367 if (ret == -1 || ret >= sizeof(new_chunk_id_buf)) {
1368 new_chunk_id_str = "formatting error";
1369 } else {
1370 new_chunk_id_str = new_chunk_id_buf;
1371 }
1372 } else {
1373 new_chunk_id_str = "none";
d73bf3d7
JD
1374 }
1375
c35f9726
JG
1376 DBG("Preparing \"rotate streams\" command payload: new_chunk_id = %s, stream_count = %u",
1377 new_chunk_id_str, stream_count);
d73bf3d7 1378
c35f9726
JG
1379 ret = lttng_dynamic_buffer_append(&payload, &msg, sizeof(msg));
1380 if (ret) {
1381 ERR("Failed to allocate \"rotate streams\" command payload");
d73bf3d7
JD
1382 goto error;
1383 }
1384
c35f9726
JG
1385 for (i = 0; i < stream_count; i++) {
1386 const struct relayd_stream_rotation_position *position =
1387 &positions[i];
1388 const struct lttcomm_relayd_stream_rotation_position comm_position = {
1389 .stream_id = htobe64(position->stream_id),
1390 .rotate_at_seq_num = htobe64(
1391 position->rotate_at_seq_num),
1392 };
1393
a269202e 1394 DBG("Rotate stream %" PRIu64 " at sequence number %" PRIu64,
c35f9726
JG
1395 position->stream_id,
1396 position->rotate_at_seq_num);
1397 ret = lttng_dynamic_buffer_append(&payload, &comm_position,
1398 sizeof(comm_position));
1399 if (ret) {
1400 ERR("Failed to allocate \"rotate streams\" command payload");
1401 goto error;
1402 }
1403 }
d73bf3d7
JD
1404
1405 /* Send command. */
c35f9726
JG
1406 ret = send_command(sock, RELAYD_ROTATE_STREAMS, payload.data,
1407 payload.size, 0);
d73bf3d7 1408 if (ret < 0) {
c35f9726 1409 ERR("Failed to send \"rotate stream\" command");
d73bf3d7
JD
1410 goto error;
1411 }
1412
1413 /* Receive response. */
c35f9726 1414 ret = recv_reply(sock, &reply, sizeof(reply));
d73bf3d7 1415 if (ret < 0) {
c35f9726 1416 ERR("Failed to receive \"rotate streams\" command reply");
d73bf3d7
JD
1417 goto error;
1418 }
1419
1420 reply.ret_code = be32toh(reply.ret_code);
d73bf3d7
JD
1421 if (reply.ret_code != LTTNG_OK) {
1422 ret = -1;
c35f9726 1423 ERR("Relayd rotate streams replied error %d", reply.ret_code);
d73bf3d7
JD
1424 } else {
1425 /* Success. */
1426 ret = 0;
c35f9726 1427 DBG("Relayd rotated streams successfully");
d73bf3d7
JD
1428 }
1429
1430error:
c35f9726 1431 lttng_dynamic_buffer_reset(&payload);
d73bf3d7
JD
1432 return ret;
1433}
e5add6d0
JG
1434
1435int relayd_create_trace_chunk(struct lttcomm_relayd_sock *sock,
1436 struct lttng_trace_chunk *chunk)
1437{
1438 int ret = 0;
1439 enum lttng_trace_chunk_status status;
1440 struct lttcomm_relayd_create_trace_chunk msg = {};
1441 struct lttcomm_relayd_generic_reply reply = {};
1442 struct lttng_dynamic_buffer payload;
1443 uint64_t chunk_id;
1444 time_t creation_timestamp;
1445 const char *chunk_name;
1446 size_t chunk_name_length;
913a542b 1447 bool overridden_name;
e5add6d0
JG
1448
1449 lttng_dynamic_buffer_init(&payload);
1450
070b6a86
MD
1451 if (!relayd_supports_chunks(sock)) {
1452 DBG("Refusing to create remote trace chunk: relayd does not support chunks");
1453 goto end;
1454 }
1455
e5add6d0
JG
1456 status = lttng_trace_chunk_get_id(chunk, &chunk_id);
1457 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1458 ret = -1;
1459 goto end;
1460 }
1461
1462 status = lttng_trace_chunk_get_creation_timestamp(
1463 chunk, &creation_timestamp);
1464 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1465 ret = -1;
1466 goto end;
1467 }
1468
1469 status = lttng_trace_chunk_get_name(
913a542b 1470 chunk, &chunk_name, &overridden_name);
e5add6d0
JG
1471 if (status != LTTNG_TRACE_CHUNK_STATUS_OK &&
1472 status != LTTNG_TRACE_CHUNK_STATUS_NONE) {
1473 ret = -1;
1474 goto end;
1475 }
1476
913a542b 1477 chunk_name_length = overridden_name ? (strlen(chunk_name) + 1) : 0;
c15e2d3d
JG
1478 msg.chunk_id = htobe64(chunk_id);
1479 msg.creation_timestamp = htobe64((uint64_t) creation_timestamp);
1480 msg.override_name_length = htobe32((uint32_t) chunk_name_length);
e5add6d0
JG
1481
1482 ret = lttng_dynamic_buffer_append(&payload, &msg, sizeof(msg));
1483 if (ret) {
1484 goto end;
1485 }
1486 if (chunk_name_length) {
1487 ret = lttng_dynamic_buffer_append(
1488 &payload, chunk_name, chunk_name_length);
1489 if (ret) {
1490 goto end;
1491 }
1492 }
1493
bbc4768c
JG
1494 ret = send_command(sock, RELAYD_CREATE_TRACE_CHUNK, payload.data,
1495 payload.size, 0);
e5add6d0
JG
1496 if (ret < 0) {
1497 ERR("Failed to send trace chunk creation command to relay daemon");
1498 goto end;
1499 }
1500
1501 ret = recv_reply(sock, &reply, sizeof(reply));
1502 if (ret < 0) {
1503 ERR("Failed to receive relay daemon trace chunk creation command reply");
1504 goto end;
1505 }
1506
1507 reply.ret_code = be32toh(reply.ret_code);
1508 if (reply.ret_code != LTTNG_OK) {
1509 ret = -1;
1510 ERR("Relayd trace chunk create replied error %d",
1511 reply.ret_code);
1512 } else {
1513 ret = 0;
1514 DBG("Relayd successfully created trace chunk: chunk_id = %" PRIu64,
1515 chunk_id);
1516 }
1517
1518end:
1519 lttng_dynamic_buffer_reset(&payload);
1520 return ret;
1521}
bbc4768c
JG
1522
1523int relayd_close_trace_chunk(struct lttcomm_relayd_sock *sock,
ecd1a12f
MD
1524 struct lttng_trace_chunk *chunk,
1525 char *path)
bbc4768c
JG
1526{
1527 int ret = 0;
1528 enum lttng_trace_chunk_status status;
1529 struct lttcomm_relayd_close_trace_chunk msg = {};
ecd1a12f 1530 struct lttcomm_relayd_close_trace_chunk_reply reply = {};
bbc4768c
JG
1531 uint64_t chunk_id;
1532 time_t close_timestamp;
1533 LTTNG_OPTIONAL(enum lttng_trace_chunk_command_type) close_command = {};
1534
070b6a86
MD
1535 if (!relayd_supports_chunks(sock)) {
1536 DBG("Refusing to close remote trace chunk: relayd does not support chunks");
1537 goto end;
1538 }
1539
bbc4768c
JG
1540 status = lttng_trace_chunk_get_id(chunk, &chunk_id);
1541 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1542 ERR("Failed to get trace chunk id");
1543 ret = -1;
1544 goto end;
1545 }
1546
1547 status = lttng_trace_chunk_get_close_timestamp(chunk, &close_timestamp);
1548 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1549 ERR("Failed to get trace chunk close timestamp");
1550 ret = -1;
1551 goto end;
1552 }
1553
1554 status = lttng_trace_chunk_get_close_command(chunk,
1555 &close_command.value);
1556 switch (status) {
1557 case LTTNG_TRACE_CHUNK_STATUS_OK:
1558 close_command.is_set = 1;
1559 break;
1560 case LTTNG_TRACE_CHUNK_STATUS_NONE:
1561 break;
1562 default:
1563 ERR("Failed to get trace chunk close command");
1564 ret = -1;
1565 goto end;
1566 }
1567
1568 msg = (typeof(msg)){
1569 .chunk_id = htobe64(chunk_id),
1570 .close_timestamp = htobe64((uint64_t) close_timestamp),
1571 .close_command = {
bbc4768c 1572 .is_set = close_command.is_set,
55caead7 1573 .value = htobe32((uint32_t) close_command.value),
bbc4768c
JG
1574 },
1575 };
1576
1577 ret = send_command(sock, RELAYD_CLOSE_TRACE_CHUNK, &msg, sizeof(msg),
1578 0);
1579 if (ret < 0) {
1580 ERR("Failed to send trace chunk close command to relay daemon");
1581 goto end;
1582 }
1583
1584 ret = recv_reply(sock, &reply, sizeof(reply));
1585 if (ret < 0) {
1586 ERR("Failed to receive relay daemon trace chunk close command reply");
1587 goto end;
1588 }
1589
ecd1a12f
MD
1590 reply.path_length = be32toh(reply.path_length);
1591 if (reply.path_length >= LTTNG_PATH_MAX) {
1592 ERR("Chunk path too long");
1593 ret = -1;
1594 goto end;
1595 }
1596
1597 ret = recv_reply(sock, path, reply.path_length);
1598 if (ret < 0) {
1599 ERR("Failed to receive relay daemon trace chunk close command reply");
1600 goto end;
1601 }
1602 if (path[reply.path_length - 1] != '\0') {
1603 ERR("Invalid trace chunk path returned by relay daemon (not null-terminated)");
1604 ret = -1;
1605 goto end;
1606 }
1607
1608 reply.generic.ret_code = be32toh(reply.generic.ret_code);
1609 if (reply.generic.ret_code != LTTNG_OK) {
bbc4768c
JG
1610 ret = -1;
1611 ERR("Relayd trace chunk close replied error %d",
ecd1a12f 1612 reply.generic.ret_code);
bbc4768c
JG
1613 } else {
1614 ret = 0;
1615 DBG("Relayd successfully closed trace chunk: chunk_id = %" PRIu64,
1616 chunk_id);
1617 }
1618end:
1619 return ret;
1620}
c35f9726
JG
1621
1622int relayd_trace_chunk_exists(struct lttcomm_relayd_sock *sock,
1623 uint64_t chunk_id, bool *chunk_exists)
1624{
1625 int ret = 0;
1626 struct lttcomm_relayd_trace_chunk_exists msg = {};
1627 struct lttcomm_relayd_trace_chunk_exists_reply reply = {};
1628
070b6a86
MD
1629 if (!relayd_supports_chunks(sock)) {
1630 DBG("Refusing to check for trace chunk existence: relayd does not support chunks");
caa15afd
JR
1631 /* The chunk will never exist */
1632 *chunk_exists = false;
070b6a86
MD
1633 goto end;
1634 }
1635
c35f9726
JG
1636 msg = (typeof(msg)){
1637 .chunk_id = htobe64(chunk_id),
1638 };
1639
1640 ret = send_command(sock, RELAYD_TRACE_CHUNK_EXISTS, &msg, sizeof(msg),
1641 0);
1642 if (ret < 0) {
1643 ERR("Failed to send trace chunk exists command to relay daemon");
1644 goto end;
1645 }
1646
1647 ret = recv_reply(sock, &reply, sizeof(reply));
1648 if (ret < 0) {
1649 ERR("Failed to receive relay daemon trace chunk close command reply");
1650 goto end;
1651 }
1652
1653 reply.generic.ret_code = be32toh(reply.generic.ret_code);
1654 if (reply.generic.ret_code != LTTNG_OK) {
1655 ret = -1;
1656 ERR("Relayd trace chunk close replied error %d",
1657 reply.generic.ret_code);
1658 } else {
1659 ret = 0;
1660 DBG("Relayd successfully checked trace chunk existence: chunk_id = %" PRIu64
1661 ", exists = %s", chunk_id,
1662 reply.trace_chunk_exists ? "true" : "false");
1663 *chunk_exists = !!reply.trace_chunk_exists;
1664 }
1665end:
1666 return ret;
1667}
8614e600 1668
f4c5b127 1669enum lttng_error_code relayd_get_configuration(struct lttcomm_relayd_sock *sock,
8614e600 1670 uint64_t query_flags,
f4c5b127
JR
1671 uint64_t& result_flags,
1672 uint64_t *trace_format_query_results)
8614e600 1673{
f4c5b127
JR
1674 int ret;
1675 enum lttng_error_code ret_code = LTTNG_OK;
1676 struct lttcomm_relayd_get_configuration msg = {};
8614e600 1677 struct lttcomm_relayd_get_configuration_reply reply = {};
f4c5b127
JR
1678 lttng_dynamic_buffer buffer;
1679 bool requesting_trace_format = query_flags &
1680 LTTCOMM_RELAYD_CONFIGURATION_QUERY_FLAG_SUPPORTED_TRACE_FORMAT;
1681
1682 lttng_dynamic_buffer_init(&buffer);
1683
1684 assert(!(query_flags & ~LTTCOMM_RELAYD_CONFIGURATION_QUERY_FLAG_MASK));
1685 assert(!(requesting_trace_format && !trace_format_query_results));
8614e600
MD
1686
1687 if (!relayd_supports_get_configuration(sock)) {
1688 DBG("Refusing to get relayd configuration (unsupported by relayd)");
f4c5b127
JR
1689 result_flags = 0;
1690 if (trace_format_query_results) {
1691 *trace_format_query_results =
1692 LTTCOMM_RELAYD_CONFIGURATION_TRACE_FORMAT_SUPPORTED_CTF1;
1693 }
8614e600
MD
1694 goto end;
1695 }
1696
f4c5b127
JR
1697 if (requesting_trace_format && !relayd_supports_get_configuration_trace_format(sock)) {
1698 /*
1699 * Provide default value for that query since lttng-relayd does
1700 * not know this query type.
1701 */
1702 if (trace_format_query_results) {
1703 *trace_format_query_results =
1704 LTTCOMM_RELAYD_CONFIGURATION_TRACE_FORMAT_SUPPORTED_CTF1;
1705 }
1706 /* Remove from the query set. */
1707 query_flags &= ~LTTCOMM_RELAYD_CONFIGURATION_QUERY_FLAG_SUPPORTED_TRACE_FORMAT;
1708 requesting_trace_format = false;
1709 }
1710
1711 msg.query_flags = htobe64(query_flags);
1712
1713 ret = send_command(sock, RELAYD_GET_CONFIGURATION, &msg, sizeof(msg), 0);
8614e600
MD
1714 if (ret < 0) {
1715 ERR("Failed to send get configuration command to relay daemon");
f4c5b127 1716 ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL;
8614e600
MD
1717 goto end;
1718 }
1719
1720 ret = recv_reply(sock, &reply, sizeof(reply));
1721 if (ret < 0) {
1722 ERR("Failed to receive relay daemon get configuration command reply");
f4c5b127 1723 ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL;
8614e600
MD
1724 goto end;
1725 }
1726
f4c5b127
JR
1727 ret_code = static_cast<enum lttng_error_code>(be32toh(reply.generic.ret_code));
1728 if (ret_code != LTTNG_OK) {
1729 ERR("Relayd get configuration replied error %d", ret_code);
1730 goto end;
8614e600 1731 }
f4c5b127
JR
1732
1733 result_flags = be64toh(reply.relayd_configuration_flags);
1734 DBG("Relayd successfully got configuration: query_flags = %" PRIu64
1735 ", results_flags = %" PRIu64,
1736 query_flags, result_flags);
1737
1738 if (!requesting_trace_format) {
1739 ret_code = LTTNG_OK;
1740 goto end;
1741 }
1742
1743 /* Receive trace formats */
1744 {
1745 lttcomm_relayd_get_configuration_specialized_query_reply query_flag_reply = {};
1746 ret = recv_reply(sock, &query_flag_reply, sizeof(query_flag_reply));
1747 if (ret < 0) {
1748 ERR("Failed to receive relay daemon get configuration query flag data");
1749 ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL;
1750 goto end;
1751 }
1752
1753 query_flag_reply.query_flag = be64toh(query_flag_reply.query_flag);
1754 LTTNG_ASSERT(query_flag_reply.query_flag &
1755 LTTCOMM_RELAYD_CONFIGURATION_QUERY_FLAG_SUPPORTED_TRACE_FORMAT);
1756
1757 query_flag_reply.payload_len = be64toh(query_flag_reply.payload_len);
1758 LTTNG_ASSERT(query_flag_reply.payload_len == sizeof(uint64_t));
1759
1760 lttng_dynamic_buffer_set_size(&buffer, query_flag_reply.payload_len);
1761
1762 ret = recv_reply(sock, buffer.data, query_flag_reply.payload_len);
1763 if (ret < 0) {
1764 ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL;
1765 ERR("Failed to receive configuration dynamic payload for flag TODO");
1766 goto end;
1767 }
1768
1769 *trace_format_query_results = be64toh(*(uint64_t *) buffer.data);
1770 }
1771
1772 ret_code = LTTNG_OK;
1773
8614e600 1774end:
f4c5b127
JR
1775 lttng_dynamic_buffer_reset(&buffer);
1776 return ret_code;
8614e600 1777}
This page took 0.177754 seconds and 5 git commands to generate.