tests: add base-path tests
[lttng-tools.git] / src / common / relayd / relayd.c
CommitLineData
00e2e675
DG
1/*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
6c1c0768 18#define _LGPL_SOURCE
00e2e675
DG
19#include <assert.h>
20#include <stdio.h>
21#include <stdlib.h>
22#include <string.h>
23#include <sys/stat.h>
77c7c900 24#include <inttypes.h>
00e2e675
DG
25
26#include <common/common.h>
27#include <common/defaults.h>
f263b7fd 28#include <common/compat/endian.h>
27283937 29#include <common/compat/string.h>
00e2e675 30#include <common/sessiond-comm/relayd.h>
50adc264 31#include <common/index/ctf-index.h>
d2956687 32#include <common/trace-chunk.h>
c35f9726 33#include <common/string-utils/format.h>
00e2e675
DG
34
35#include "relayd.h"
36
37/*
38 * Send command. Fill up the header and append the data.
39 */
6151a90f 40static int send_command(struct lttcomm_relayd_sock *rsock,
76b9afaa 41 enum lttcomm_relayd_command cmd, const void *data, size_t size,
00e2e675
DG
42 int flags)
43{
44 int ret;
45 struct lttcomm_relayd_hdr header;
46 char *buf;
47 uint64_t buf_size = sizeof(header);
48
f96e4545
MD
49 if (rsock->sock.fd < 0) {
50 return -ECONNRESET;
51 }
52
00e2e675
DG
53 if (data) {
54 buf_size += size;
55 }
56
57 buf = zmalloc(buf_size);
58 if (buf == NULL) {
59 PERROR("zmalloc relayd send command buf");
60 ret = -1;
61 goto alloc_error;
62 }
63
53efb85a 64 memset(&header, 0, sizeof(header));
00e2e675
DG
65 header.cmd = htobe32(cmd);
66 header.data_size = htobe64(size);
67
68 /* Zeroed for now since not used. */
69 header.cmd_version = 0;
70 header.circuit_id = 0;
71
72 /* Prepare buffer to send. */
73 memcpy(buf, &header, sizeof(header));
74 if (data) {
75 memcpy(buf + sizeof(header), data, size);
76 }
77
06586bbe 78 DBG3("Relayd sending command %d of size %" PRIu64, (int) cmd, buf_size);
6151a90f 79 ret = rsock->sock.ops->sendmsg(&rsock->sock, buf, buf_size, flags);
00e2e675 80 if (ret < 0) {
06586bbe
JG
81 PERROR("Failed to send command %d of size %" PRIu64,
82 (int) cmd, buf_size);
8994307f 83 ret = -errno;
00e2e675
DG
84 goto error;
85 }
00e2e675
DG
86error:
87 free(buf);
88alloc_error:
89 return ret;
90}
91
92/*
93 * Receive reply data on socket. This MUST be call after send_command or else
94 * could result in unexpected behavior(s).
95 */
6151a90f 96static int recv_reply(struct lttcomm_relayd_sock *rsock, void *data, size_t size)
00e2e675
DG
97{
98 int ret;
99
f96e4545
MD
100 if (rsock->sock.fd < 0) {
101 return -ECONNRESET;
102 }
103
8fd623e0 104 DBG3("Relayd waiting for reply of size %zu", size);
00e2e675 105
6151a90f 106 ret = rsock->sock.ops->recvmsg(&rsock->sock, data, size, 0);
20275fe8
DG
107 if (ret <= 0 || ret != size) {
108 if (ret == 0) {
109 /* Orderly shutdown. */
6151a90f 110 DBG("Socket %d has performed an orderly shutdown", rsock->sock.fd);
20275fe8 111 } else {
8fd623e0 112 DBG("Receiving reply failed on sock %d for size %zu with ret %d",
6151a90f 113 rsock->sock.fd, size, ret);
20275fe8
DG
114 }
115 /* Always return -1 here and the caller can use errno. */
116 ret = -1;
00e2e675
DG
117 goto error;
118 }
119
120error:
121 return ret;
122}
123
d3e2ba59 124/*
6fa5fe7c
MD
125 * Starting from 2.11, RELAYD_CREATE_SESSION payload (session_name,
126 * hostname, and base_path) have no length restriction on the sender side.
f86f6389
JR
127 * Length for both payloads is stored in the msg struct. A new dynamic size
128 * payload size is introduced.
129 */
130static int relayd_create_session_2_11(struct lttcomm_relayd_sock *rsock,
fb9a95c4 131 const char *session_name, const char *hostname,
6fa5fe7c
MD
132 const char *base_path, int session_live_timer,
133 unsigned int snapshot, uint64_t sessiond_session_id,
134 const lttng_uuid sessiond_uuid, const uint64_t *current_chunk_id,
db1da059 135 time_t creation_time)
f86f6389
JR
136{
137 int ret;
138 struct lttcomm_relayd_create_session_2_11 *msg = NULL;
139 size_t session_name_len;
140 size_t hostname_len;
6fa5fe7c 141 size_t base_path_len;
f86f6389 142 size_t msg_length;
6fa5fe7c 143 char *dst;
f86f6389
JR
144
145 /* The two names are sent with a '\0' delimiter between them. */
146 session_name_len = strlen(session_name) + 1;
147 hostname_len = strlen(hostname) + 1;
6fa5fe7c 148 base_path_len = base_path ? strlen(base_path) + 1 : 0;
f86f6389 149
6fa5fe7c 150 msg_length = sizeof(*msg) + session_name_len + hostname_len + base_path_len;
f86f6389
JR
151 msg = zmalloc(msg_length);
152 if (!msg) {
153 PERROR("zmalloc create_session_2_11 command message");
154 ret = -1;
155 goto error;
156 }
157
158 assert(session_name_len <= UINT32_MAX);
159 msg->session_name_len = htobe32(session_name_len);
160
161 assert(hostname_len <= UINT32_MAX);
162 msg->hostname_len = htobe32(hostname_len);
163
6fa5fe7c
MD
164 assert(base_path_len <= UINT32_MAX);
165 msg->base_path_len = htobe32(base_path_len);
166
167 dst = msg->names;
168 if (lttng_strncpy(dst, session_name, session_name_len)) {
169 ret = -1;
170 goto error;
171 }
172 dst += session_name_len;
173 if (lttng_strncpy(dst, hostname, hostname_len)) {
f86f6389
JR
174 ret = -1;
175 goto error;
176 }
6fa5fe7c
MD
177 dst += hostname_len;
178 if (base_path && lttng_strncpy(dst, base_path, base_path_len)) {
f86f6389
JR
179 ret = -1;
180 goto error;
181 }
182
183 msg->live_timer = htobe32(session_live_timer);
184 msg->snapshot = !!snapshot;
185
658f12fa
JG
186 lttng_uuid_copy(msg->sessiond_uuid, sessiond_uuid);
187 msg->session_id = htobe64(sessiond_session_id);
188
f39bd140
JG
189 if (current_chunk_id) {
190 LTTNG_OPTIONAL_SET(&msg->current_chunk_id,
191 htobe64(*current_chunk_id));
192 }
193
db1da059
JG
194 msg->creation_time = htobe64((uint64_t) creation_time);
195
f86f6389
JR
196 /* Send command */
197 ret = send_command(rsock, RELAYD_CREATE_SESSION, msg, msg_length, 0);
198 if (ret < 0) {
199 goto error;
200 }
201error:
202 free(msg);
203 return ret;
204}
205/*
206 * From 2.4 to 2.10, RELAYD_CREATE_SESSION takes additional parameters to
d3e2ba59
JD
207 * support the live reading capability.
208 */
209static int relayd_create_session_2_4(struct lttcomm_relayd_sock *rsock,
fb9a95c4
JG
210 const char *session_name, const char *hostname,
211 int session_live_timer, unsigned int snapshot)
d3e2ba59
JD
212{
213 int ret;
214 struct lttcomm_relayd_create_session_2_4 msg;
215
3a13ffd5
MD
216 if (lttng_strncpy(msg.session_name, session_name,
217 sizeof(msg.session_name))) {
246777db
MD
218 ret = -1;
219 goto error;
220 }
3a13ffd5 221 if (lttng_strncpy(msg.hostname, hostname, sizeof(msg.hostname))) {
246777db
MD
222 ret = -1;
223 goto error;
224 }
d3e2ba59 225 msg.live_timer = htobe32(session_live_timer);
7d2f7452 226 msg.snapshot = htobe32(snapshot);
d3e2ba59
JD
227
228 /* Send command */
229 ret = send_command(rsock, RELAYD_CREATE_SESSION, &msg, sizeof(msg), 0);
230 if (ret < 0) {
231 goto error;
232 }
233
234error:
235 return ret;
236}
237
238/*
239 * RELAYD_CREATE_SESSION from 2.1 to 2.3.
240 */
42e9a27b 241static int relayd_create_session_2_1(struct lttcomm_relayd_sock *rsock)
d3e2ba59
JD
242{
243 int ret;
244
245 /* Send command */
246 ret = send_command(rsock, RELAYD_CREATE_SESSION, NULL, 0, 0);
247 if (ret < 0) {
248 goto error;
249 }
250
251error:
252 return ret;
253}
254
c5b6f4f0
DG
255/*
256 * Send a RELAYD_CREATE_SESSION command to the relayd with the given socket and
257 * set session_id of the relayd if we have a successful reply from the relayd.
258 *
20275fe8
DG
259 * On success, return 0 else a negative value which is either an errno error or
260 * a lttng error code from the relayd.
c5b6f4f0 261 */
fb9a95c4
JG
262int relayd_create_session(struct lttcomm_relayd_sock *rsock,
263 uint64_t *relayd_session_id,
264 const char *session_name, const char *hostname,
6fa5fe7c 265 const char *base_path, int session_live_timer,
658f12fa 266 unsigned int snapshot, uint64_t sessiond_session_id,
f39bd140 267 const lttng_uuid sessiond_uuid,
db1da059
JG
268 const uint64_t *current_chunk_id,
269 time_t creation_time)
c5b6f4f0
DG
270{
271 int ret;
272 struct lttcomm_relayd_status_session reply;
273
6151a90f 274 assert(rsock);
658f12fa 275 assert(relayd_session_id);
c5b6f4f0
DG
276
277 DBG("Relayd create session");
278
f86f6389
JR
279 if (rsock->minor < 4) {
280 /* From 2.1 to 2.3 */
281 ret = relayd_create_session_2_1(rsock);
282 } else if (rsock->minor >= 4 && rsock->minor < 11) {
283 /* From 2.4 to 2.10 */
284 ret = relayd_create_session_2_4(rsock, session_name,
285 hostname, session_live_timer, snapshot);
286 } else {
287 /* From 2.11 to ... */
288 ret = relayd_create_session_2_11(rsock, session_name,
6fa5fe7c 289 hostname, base_path, session_live_timer, snapshot,
f39bd140 290 sessiond_session_id, sessiond_uuid,
db1da059 291 current_chunk_id, creation_time);
d3e2ba59
JD
292 }
293
c5b6f4f0
DG
294 if (ret < 0) {
295 goto error;
296 }
297
20275fe8 298 /* Receive response */
6151a90f 299 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
c5b6f4f0
DG
300 if (ret < 0) {
301 goto error;
302 }
303
304 reply.session_id = be64toh(reply.session_id);
305 reply.ret_code = be32toh(reply.ret_code);
306
307 /* Return session id or negative ret code. */
308 if (reply.ret_code != LTTNG_OK) {
bb63afd9
DG
309 ret = -1;
310 ERR("Relayd create session replied error %d", reply.ret_code);
c5b6f4f0
DG
311 goto error;
312 } else {
313 ret = 0;
658f12fa 314 *relayd_session_id = reply.session_id;
c5b6f4f0
DG
315 }
316
317 DBG("Relayd session created with id %" PRIu64, reply.session_id);
318
319error:
320 return ret;
321}
322
2f21a469
JR
323static int relayd_add_stream_2_1(struct lttcomm_relayd_sock *rsock,
324 const char *channel_name, const char *pathname)
325{
326 int ret;
327 struct lttcomm_relayd_add_stream msg;
328
329 memset(&msg, 0, sizeof(msg));
330 if (lttng_strncpy(msg.channel_name, channel_name,
331 sizeof(msg.channel_name))) {
332 ret = -1;
333 goto error;
334 }
335
336 if (lttng_strncpy(msg.pathname, pathname,
337 sizeof(msg.pathname))) {
338 ret = -1;
339 goto error;
340 }
341
342 /* Send command */
343 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
344 if (ret < 0) {
345 ret = -1;
346 goto error;
347 }
348 ret = 0;
349error:
350 return ret;
351}
352
353static int relayd_add_stream_2_2(struct lttcomm_relayd_sock *rsock,
354 const char *channel_name, const char *pathname,
355 uint64_t tracefile_size, uint64_t tracefile_count)
356{
357 int ret;
358 struct lttcomm_relayd_add_stream_2_2 msg;
359
360 memset(&msg, 0, sizeof(msg));
361 /* Compat with relayd 2.2 to 2.10 */
362 if (lttng_strncpy(msg.channel_name, channel_name,
363 sizeof(msg.channel_name))) {
364 ret = -1;
365 goto error;
366 }
367 if (lttng_strncpy(msg.pathname, pathname,
368 sizeof(msg.pathname))) {
369 ret = -1;
370 goto error;
371 }
372 msg.tracefile_size = htobe64(tracefile_size);
373 msg.tracefile_count = htobe64(tracefile_count);
374
375 /* Send command */
376 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
377 if (ret < 0) {
378 goto error;
379 }
380 ret = 0;
381error:
382 return ret;
383}
384
385static int relayd_add_stream_2_11(struct lttcomm_relayd_sock *rsock,
386 const char *channel_name, const char *pathname,
b00e554e
JG
387 uint64_t tracefile_size, uint64_t tracefile_count,
388 uint64_t trace_archive_id)
2f21a469
JR
389{
390 int ret;
391 struct lttcomm_relayd_add_stream_2_11 *msg = NULL;
392 size_t channel_name_len;
393 size_t pathname_len;
394 size_t msg_length;
395
396 /* The two names are sent with a '\0' delimiter between them. */
397 channel_name_len = strlen(channel_name) + 1;
398 pathname_len = strlen(pathname) + 1;
399
400 msg_length = sizeof(*msg) + channel_name_len + pathname_len;
401 msg = zmalloc(msg_length);
402 if (!msg) {
403 PERROR("zmalloc add_stream_2_11 command message");
404 ret = -1;
405 goto error;
406 }
407
408 assert(channel_name_len <= UINT32_MAX);
409 msg->channel_name_len = htobe32(channel_name_len);
410
411 assert(pathname_len <= UINT32_MAX);
412 msg->pathname_len = htobe32(pathname_len);
413
414 if (lttng_strncpy(msg->names, channel_name, channel_name_len)) {
415 ret = -1;
416 goto error;
417 }
418 if (lttng_strncpy(msg->names + channel_name_len, pathname, pathname_len)) {
419 ret = -1;
420 goto error;
421 }
422
423 msg->tracefile_size = htobe64(tracefile_size);
424 msg->tracefile_count = htobe64(tracefile_count);
348a81dc 425 msg->trace_chunk_id = htobe64(trace_archive_id);
2f21a469
JR
426
427 /* Send command */
428 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) msg, msg_length, 0);
429 if (ret < 0) {
430 goto error;
431 }
432 ret = 0;
433error:
434 free(msg);
435 return ret;
436}
437
00e2e675
DG
438/*
439 * Add stream on the relayd and assign stream handle to the stream_id argument.
440 *
441 * On success return 0 else return ret_code negative value.
442 */
6151a90f 443int relayd_add_stream(struct lttcomm_relayd_sock *rsock, const char *channel_name,
0f907de1 444 const char *pathname, uint64_t *stream_id,
0b50e4b3 445 uint64_t tracefile_size, uint64_t tracefile_count,
d2956687 446 struct lttng_trace_chunk *trace_chunk)
00e2e675
DG
447{
448 int ret;
00e2e675
DG
449 struct lttcomm_relayd_status_stream reply;
450
451 /* Code flow error. Safety net. */
6151a90f 452 assert(rsock);
00e2e675
DG
453 assert(channel_name);
454 assert(pathname);
455
456 DBG("Relayd adding stream for channel name %s", channel_name);
457
0f907de1
JD
458 /* Compat with relayd 2.1 */
459 if (rsock->minor == 1) {
2f21a469 460 /* For 2.1 */
d2956687 461 assert(!trace_chunk);
2f21a469
JR
462 ret = relayd_add_stream_2_1(rsock, channel_name, pathname);
463
464 } else if (rsock->minor > 1 && rsock->minor < 11) {
465 /* From 2.2 to 2.10 */
d2956687 466 assert(!trace_chunk);
2f21a469
JR
467 ret = relayd_add_stream_2_2(rsock, channel_name, pathname,
468 tracefile_size, tracefile_count);
0f907de1 469 } else {
d2956687
JG
470 enum lttng_trace_chunk_status chunk_status;
471 uint64_t chunk_id;
472
473 assert(trace_chunk);
474 chunk_status = lttng_trace_chunk_get_id(trace_chunk,
475 &chunk_id);
476 assert(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
477
2f21a469
JR
478 /* From 2.11 to ...*/
479 ret = relayd_add_stream_2_11(rsock, channel_name, pathname,
b00e554e 480 tracefile_size, tracefile_count,
d2956687 481 chunk_id);
2f21a469 482 }
0f907de1 483
2f21a469
JR
484 if (ret) {
485 ret = -1;
486 goto error;
00e2e675
DG
487 }
488
633d0084 489 /* Waiting for reply */
6151a90f 490 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
00e2e675
DG
491 if (ret < 0) {
492 goto error;
493 }
494
495 /* Back to host bytes order. */
496 reply.handle = be64toh(reply.handle);
497 reply.ret_code = be32toh(reply.ret_code);
498
499 /* Return session id or negative ret code. */
f73fabfd 500 if (reply.ret_code != LTTNG_OK) {
bb63afd9
DG
501 ret = -1;
502 ERR("Relayd add stream replied error %d", reply.ret_code);
00e2e675
DG
503 } else {
504 /* Success */
505 ret = 0;
506 *stream_id = reply.handle;
507 }
508
77c7c900 509 DBG("Relayd stream added successfully with handle %" PRIu64,
2f21a469 510 reply.handle);
00e2e675
DG
511
512error:
513 return ret;
514}
515
a4baae1b
JD
516/*
517 * Inform the relay that all the streams for the current channel has been sent.
518 *
519 * On success return 0 else return ret_code negative value.
520 */
521int relayd_streams_sent(struct lttcomm_relayd_sock *rsock)
522{
523 int ret;
524 struct lttcomm_relayd_generic_reply reply;
525
526 /* Code flow error. Safety net. */
527 assert(rsock);
528
529 DBG("Relayd sending streams sent.");
530
531 /* This feature was introduced in 2.4, ignore it for earlier versions. */
532 if (rsock->minor < 4) {
533 ret = 0;
534 goto end;
535 }
536
537 /* Send command */
538 ret = send_command(rsock, RELAYD_STREAMS_SENT, NULL, 0, 0);
539 if (ret < 0) {
540 goto error;
541 }
542
543 /* Waiting for reply */
544 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
545 if (ret < 0) {
546 goto error;
547 }
548
549 /* Back to host bytes order. */
550 reply.ret_code = be32toh(reply.ret_code);
551
552 /* Return session id or negative ret code. */
553 if (reply.ret_code != LTTNG_OK) {
554 ret = -1;
555 ERR("Relayd streams sent replied error %d", reply.ret_code);
556 goto error;
557 } else {
558 /* Success */
559 ret = 0;
560 }
561
562 DBG("Relayd streams sent success");
563
564error:
565end:
566 return ret;
567}
568
00e2e675
DG
569/*
570 * Check version numbers on the relayd.
d4519fa3
JD
571 * If major versions are compatible, we assign minor_to_use to the
572 * minor version of the procotol we are going to use for this session.
00e2e675 573 *
67d5aa28
JD
574 * Return 0 if the two daemons are compatible, LTTNG_ERR_RELAYD_VERSION_FAIL
575 * otherwise, or a negative value on network errors.
00e2e675 576 */
6151a90f 577int relayd_version_check(struct lttcomm_relayd_sock *rsock)
00e2e675
DG
578{
579 int ret;
092b6259 580 struct lttcomm_relayd_version msg;
00e2e675
DG
581
582 /* Code flow error. Safety net. */
6151a90f 583 assert(rsock);
00e2e675 584
6151a90f
JD
585 DBG("Relayd version check for major.minor %u.%u", rsock->major,
586 rsock->minor);
00e2e675 587
53efb85a 588 memset(&msg, 0, sizeof(msg));
092b6259 589 /* Prepare network byte order before transmission. */
6151a90f
JD
590 msg.major = htobe32(rsock->major);
591 msg.minor = htobe32(rsock->minor);
092b6259 592
00e2e675 593 /* Send command */
6151a90f 594 ret = send_command(rsock, RELAYD_VERSION, (void *) &msg, sizeof(msg), 0);
00e2e675
DG
595 if (ret < 0) {
596 goto error;
597 }
598
20275fe8 599 /* Receive response */
6151a90f 600 ret = recv_reply(rsock, (void *) &msg, sizeof(msg));
00e2e675
DG
601 if (ret < 0) {
602 goto error;
603 }
604
605 /* Set back to host bytes order */
092b6259
DG
606 msg.major = be32toh(msg.major);
607 msg.minor = be32toh(msg.minor);
608
609 /*
610 * Only validate the major version. If the other side is higher,
611 * communication is not possible. Only major version equal can talk to each
612 * other. If the minor version differs, the lowest version is used by both
613 * sides.
092b6259 614 */
6151a90f 615 if (msg.major != rsock->major) {
d4519fa3 616 /* Not compatible */
67d5aa28 617 ret = LTTNG_ERR_RELAYD_VERSION_FAIL;
d4519fa3 618 DBG2("Relayd version is NOT compatible. Relayd version %u != %u (us)",
6151a90f 619 msg.major, rsock->major);
092b6259 620 goto error;
00e2e675
DG
621 }
622
092b6259 623 /*
6151a90f
JD
624 * If the relayd's minor version is higher, it will adapt to our version so
625 * we can continue to use the latest relayd communication data structure.
626 * If the received minor version is higher, the relayd should adapt to us.
092b6259 627 */
6151a90f
JD
628 if (rsock->minor > msg.minor) {
629 rsock->minor = msg.minor;
d4519fa3 630 }
092b6259 631
d4519fa3
JD
632 /* Version number compatible */
633 DBG2("Relayd version is compatible, using protocol version %u.%u",
6151a90f 634 rsock->major, rsock->minor);
d4519fa3 635 ret = 0;
00e2e675
DG
636
637error:
638 return ret;
639}
640
00e2e675
DG
641/*
642 * Add stream on the relayd and assign stream handle to the stream_id argument.
643 *
644 * On success return 0 else return ret_code negative value.
645 */
6151a90f 646int relayd_send_metadata(struct lttcomm_relayd_sock *rsock, size_t len)
00e2e675
DG
647{
648 int ret;
649
650 /* Code flow error. Safety net. */
6151a90f 651 assert(rsock);
00e2e675 652
77c7c900 653 DBG("Relayd sending metadata of size %zu", len);
00e2e675
DG
654
655 /* Send command */
6151a90f 656 ret = send_command(rsock, RELAYD_SEND_METADATA, NULL, len, 0);
00e2e675
DG
657 if (ret < 0) {
658 goto error;
659 }
660
661 DBG2("Relayd metadata added successfully");
662
663 /*
664 * After that call, the metadata data MUST be sent to the relayd so the
665 * receive size on the other end matches the len of the metadata packet
633d0084 666 * header. This is why we don't wait for a reply here.
00e2e675
DG
667 */
668
669error:
670 return ret;
671}
672
673/*
6151a90f 674 * Connect to relay daemon with an allocated lttcomm_relayd_sock.
00e2e675 675 */
6151a90f 676int relayd_connect(struct lttcomm_relayd_sock *rsock)
00e2e675
DG
677{
678 /* Code flow error. Safety net. */
6151a90f 679 assert(rsock);
00e2e675 680
f96e4545
MD
681 if (!rsock->sock.ops) {
682 /*
683 * Attempting a connect on a non-initialized socket.
684 */
685 return -ECONNRESET;
686 }
687
00e2e675
DG
688 DBG3("Relayd connect ...");
689
6151a90f 690 return rsock->sock.ops->connect(&rsock->sock);
00e2e675
DG
691}
692
693/*
6151a90f 694 * Close relayd socket with an allocated lttcomm_relayd_sock.
ffe60014
DG
695 *
696 * If no socket operations are found, simply return 0 meaning that everything
697 * is fine. Without operations, the socket can not possibly be opened or used.
698 * This is possible if the socket was allocated but not created. However, the
699 * caller could simply use it to store a valid file descriptor for instance
700 * passed over a Unix socket and call this to cleanup but still without a valid
701 * ops pointer.
702 *
703 * Return the close returned value. On error, a negative value is usually
704 * returned back from close(2).
00e2e675 705 */
6151a90f 706int relayd_close(struct lttcomm_relayd_sock *rsock)
00e2e675 707{
ffe60014
DG
708 int ret;
709
00e2e675 710 /* Code flow error. Safety net. */
6151a90f 711 assert(rsock);
00e2e675 712
ffe60014 713 /* An invalid fd is fine, return success. */
6151a90f 714 if (rsock->sock.fd < 0) {
ffe60014
DG
715 ret = 0;
716 goto end;
717 }
718
6151a90f 719 DBG3("Relayd closing socket %d", rsock->sock.fd);
00e2e675 720
6151a90f
JD
721 if (rsock->sock.ops) {
722 ret = rsock->sock.ops->close(&rsock->sock);
ffe60014
DG
723 } else {
724 /* Default call if no specific ops found. */
6151a90f 725 ret = close(rsock->sock.fd);
ffe60014
DG
726 if (ret < 0) {
727 PERROR("relayd_close default close");
728 }
729 }
f96e4545 730 rsock->sock.fd = -1;
ffe60014
DG
731
732end:
733 return ret;
00e2e675
DG
734}
735
736/*
737 * Send data header structure to the relayd.
738 */
6151a90f 739int relayd_send_data_hdr(struct lttcomm_relayd_sock *rsock,
00e2e675
DG
740 struct lttcomm_relayd_data_hdr *hdr, size_t size)
741{
742 int ret;
743
744 /* Code flow error. Safety net. */
6151a90f 745 assert(rsock);
00e2e675
DG
746 assert(hdr);
747
f96e4545
MD
748 if (rsock->sock.fd < 0) {
749 return -ECONNRESET;
750 }
751
8fd623e0 752 DBG3("Relayd sending data header of size %zu", size);
00e2e675
DG
753
754 /* Again, safety net */
755 if (size == 0) {
756 size = sizeof(struct lttcomm_relayd_data_hdr);
757 }
758
759 /* Only send data header. */
6151a90f 760 ret = rsock->sock.ops->sendmsg(&rsock->sock, hdr, size, 0);
00e2e675 761 if (ret < 0) {
8994307f 762 ret = -errno;
00e2e675
DG
763 goto error;
764 }
765
766 /*
767 * The data MUST be sent right after that command for the receive on the
768 * other end to match the size in the header.
769 */
770
771error:
772 return ret;
773}
173af62f
DG
774
775/*
776 * Send close stream command to the relayd.
777 */
6151a90f 778int relayd_send_close_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
173af62f
DG
779 uint64_t last_net_seq_num)
780{
781 int ret;
782 struct lttcomm_relayd_close_stream msg;
783 struct lttcomm_relayd_generic_reply reply;
784
785 /* Code flow error. Safety net. */
6151a90f 786 assert(rsock);
173af62f 787
77c7c900 788 DBG("Relayd closing stream id %" PRIu64, stream_id);
173af62f 789
53efb85a 790 memset(&msg, 0, sizeof(msg));
173af62f
DG
791 msg.stream_id = htobe64(stream_id);
792 msg.last_net_seq_num = htobe64(last_net_seq_num);
793
794 /* Send command */
6151a90f 795 ret = send_command(rsock, RELAYD_CLOSE_STREAM, (void *) &msg, sizeof(msg), 0);
173af62f
DG
796 if (ret < 0) {
797 goto error;
798 }
799
20275fe8 800 /* Receive response */
6151a90f 801 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
173af62f
DG
802 if (ret < 0) {
803 goto error;
804 }
805
806 reply.ret_code = be32toh(reply.ret_code);
807
808 /* Return session id or negative ret code. */
f73fabfd 809 if (reply.ret_code != LTTNG_OK) {
bb63afd9
DG
810 ret = -1;
811 ERR("Relayd close stream replied error %d", reply.ret_code);
173af62f
DG
812 } else {
813 /* Success */
814 ret = 0;
815 }
816
77c7c900 817 DBG("Relayd close stream id %" PRIu64 " successfully", stream_id);
173af62f
DG
818
819error:
820 return ret;
821}
c8f59ee5
DG
822
823/*
824 * Check for data availability for a given stream id.
825 *
6d805429 826 * Return 0 if NOT pending, 1 if so and a negative value on error.
c8f59ee5 827 */
6151a90f 828int relayd_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
c8f59ee5
DG
829 uint64_t last_net_seq_num)
830{
831 int ret;
6d805429 832 struct lttcomm_relayd_data_pending msg;
c8f59ee5
DG
833 struct lttcomm_relayd_generic_reply reply;
834
835 /* Code flow error. Safety net. */
6151a90f 836 assert(rsock);
c8f59ee5 837
6d805429 838 DBG("Relayd data pending for stream id %" PRIu64, stream_id);
c8f59ee5 839
53efb85a 840 memset(&msg, 0, sizeof(msg));
c8f59ee5
DG
841 msg.stream_id = htobe64(stream_id);
842 msg.last_net_seq_num = htobe64(last_net_seq_num);
843
844 /* Send command */
6151a90f 845 ret = send_command(rsock, RELAYD_DATA_PENDING, (void *) &msg,
c8f59ee5
DG
846 sizeof(msg), 0);
847 if (ret < 0) {
848 goto error;
849 }
850
20275fe8 851 /* Receive response */
6151a90f 852 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
c8f59ee5
DG
853 if (ret < 0) {
854 goto error;
855 }
856
857 reply.ret_code = be32toh(reply.ret_code);
858
859 /* Return session id or negative ret code. */
860 if (reply.ret_code >= LTTNG_OK) {
bb63afd9 861 ERR("Relayd data pending replied error %d", reply.ret_code);
c8f59ee5
DG
862 }
863
864 /* At this point, the ret code is either 1 or 0 */
865 ret = reply.ret_code;
866
6d805429 867 DBG("Relayd data is %s pending for stream id %" PRIu64,
9dd26bb9 868 ret == 1 ? "" : "NOT", stream_id);
c8f59ee5
DG
869
870error:
871 return ret;
872}
873
874/*
875 * Check on the relayd side for a quiescent state on the control socket.
876 */
6151a90f 877int relayd_quiescent_control(struct lttcomm_relayd_sock *rsock,
ad7051c0 878 uint64_t metadata_stream_id)
c8f59ee5
DG
879{
880 int ret;
ad7051c0 881 struct lttcomm_relayd_quiescent_control msg;
c8f59ee5
DG
882 struct lttcomm_relayd_generic_reply reply;
883
884 /* Code flow error. Safety net. */
6151a90f 885 assert(rsock);
c8f59ee5
DG
886
887 DBG("Relayd checking quiescent control state");
888
53efb85a 889 memset(&msg, 0, sizeof(msg));
ad7051c0
DG
890 msg.stream_id = htobe64(metadata_stream_id);
891
c8f59ee5 892 /* Send command */
6151a90f 893 ret = send_command(rsock, RELAYD_QUIESCENT_CONTROL, &msg, sizeof(msg), 0);
c8f59ee5
DG
894 if (ret < 0) {
895 goto error;
896 }
897
20275fe8 898 /* Receive response */
6151a90f 899 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
c8f59ee5
DG
900 if (ret < 0) {
901 goto error;
902 }
903
904 reply.ret_code = be32toh(reply.ret_code);
905
906 /* Return session id or negative ret code. */
907 if (reply.ret_code != LTTNG_OK) {
bb63afd9
DG
908 ret = -1;
909 ERR("Relayd quiescent control replied error %d", reply.ret_code);
c8f59ee5
DG
910 goto error;
911 }
912
913 /* Control socket is quiescent */
6d805429 914 return 0;
c8f59ee5
DG
915
916error:
917 return ret;
918}
f7079f67
DG
919
920/*
921 * Begin a data pending command for a specific session id.
922 */
6151a90f 923int relayd_begin_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id)
f7079f67
DG
924{
925 int ret;
926 struct lttcomm_relayd_begin_data_pending msg;
927 struct lttcomm_relayd_generic_reply reply;
928
929 /* Code flow error. Safety net. */
6151a90f 930 assert(rsock);
f7079f67
DG
931
932 DBG("Relayd begin data pending");
933
53efb85a 934 memset(&msg, 0, sizeof(msg));
f7079f67
DG
935 msg.session_id = htobe64(id);
936
937 /* Send command */
6151a90f 938 ret = send_command(rsock, RELAYD_BEGIN_DATA_PENDING, &msg, sizeof(msg), 0);
f7079f67
DG
939 if (ret < 0) {
940 goto error;
941 }
942
20275fe8 943 /* Receive response */
6151a90f 944 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
f7079f67
DG
945 if (ret < 0) {
946 goto error;
947 }
948
949 reply.ret_code = be32toh(reply.ret_code);
950
951 /* Return session id or negative ret code. */
952 if (reply.ret_code != LTTNG_OK) {
bb63afd9
DG
953 ret = -1;
954 ERR("Relayd begin data pending replied error %d", reply.ret_code);
f7079f67
DG
955 goto error;
956 }
957
958 return 0;
959
960error:
961 return ret;
962}
963
964/*
965 * End a data pending command for a specific session id.
966 *
967 * Return 0 on success and set is_data_inflight to 0 if no data is being
968 * streamed or 1 if it is the case.
969 */
6151a90f 970int relayd_end_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id,
f7079f67
DG
971 unsigned int *is_data_inflight)
972{
af6c30b5 973 int ret, recv_ret;
f7079f67
DG
974 struct lttcomm_relayd_end_data_pending msg;
975 struct lttcomm_relayd_generic_reply reply;
976
977 /* Code flow error. Safety net. */
6151a90f 978 assert(rsock);
f7079f67
DG
979
980 DBG("Relayd end data pending");
981
53efb85a 982 memset(&msg, 0, sizeof(msg));
f7079f67
DG
983 msg.session_id = htobe64(id);
984
985 /* Send command */
6151a90f 986 ret = send_command(rsock, RELAYD_END_DATA_PENDING, &msg, sizeof(msg), 0);
f7079f67
DG
987 if (ret < 0) {
988 goto error;
989 }
990
20275fe8 991 /* Receive response */
6151a90f 992 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
f7079f67
DG
993 if (ret < 0) {
994 goto error;
995 }
996
af6c30b5
DG
997 recv_ret = be32toh(reply.ret_code);
998 if (recv_ret < 0) {
999 ret = recv_ret;
f7079f67
DG
1000 goto error;
1001 }
1002
af6c30b5 1003 *is_data_inflight = recv_ret;
f7079f67 1004
af6c30b5 1005 DBG("Relayd end data pending is data inflight: %d", recv_ret);
f7079f67
DG
1006
1007 return 0;
1008
1009error:
1010 return ret;
1011}
1c20f0e2
JD
1012
1013/*
1014 * Send index to the relayd.
1015 */
1016int relayd_send_index(struct lttcomm_relayd_sock *rsock,
50adc264 1017 struct ctf_packet_index *index, uint64_t relay_stream_id,
1c20f0e2
JD
1018 uint64_t net_seq_num)
1019{
1020 int ret;
1021 struct lttcomm_relayd_index msg;
1022 struct lttcomm_relayd_generic_reply reply;
1023
1024 /* Code flow error. Safety net. */
1025 assert(rsock);
1026
1027 if (rsock->minor < 4) {
1028 DBG("Not sending indexes before protocol 2.4");
1029 ret = 0;
1030 goto error;
1031 }
1032
1033 DBG("Relayd sending index for stream ID %" PRIu64, relay_stream_id);
1034
53efb85a 1035 memset(&msg, 0, sizeof(msg));
1c20f0e2
JD
1036 msg.relay_stream_id = htobe64(relay_stream_id);
1037 msg.net_seq_num = htobe64(net_seq_num);
1038
1039 /* The index is already in big endian. */
1040 msg.packet_size = index->packet_size;
1041 msg.content_size = index->content_size;
1042 msg.timestamp_begin = index->timestamp_begin;
1043 msg.timestamp_end = index->timestamp_end;
1044 msg.events_discarded = index->events_discarded;
1045 msg.stream_id = index->stream_id;
1046
234cd636
JD
1047 if (rsock->minor >= 8) {
1048 msg.stream_instance_id = index->stream_instance_id;
1049 msg.packet_seq_num = index->packet_seq_num;
1050 }
1051
1c20f0e2 1052 /* Send command */
f8f3885c
MD
1053 ret = send_command(rsock, RELAYD_SEND_INDEX, &msg,
1054 lttcomm_relayd_index_len(lttng_to_index_major(rsock->major,
1055 rsock->minor),
1056 lttng_to_index_minor(rsock->major, rsock->minor)),
1057 0);
1c20f0e2
JD
1058 if (ret < 0) {
1059 goto error;
1060 }
1061
1062 /* Receive response */
1063 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1064 if (ret < 0) {
1065 goto error;
1066 }
1067
1068 reply.ret_code = be32toh(reply.ret_code);
1069
1070 /* Return session id or negative ret code. */
1071 if (reply.ret_code != LTTNG_OK) {
1072 ret = -1;
1073 ERR("Relayd send index replied error %d", reply.ret_code);
1074 } else {
1075 /* Success */
1076 ret = 0;
1077 }
1078
1079error:
1080 return ret;
1081}
93ec662e
JD
1082
1083/*
1084 * Ask the relay to reset the metadata trace file (regeneration).
1085 */
1086int relayd_reset_metadata(struct lttcomm_relayd_sock *rsock,
1087 uint64_t stream_id, uint64_t version)
1088{
1089 int ret;
1090 struct lttcomm_relayd_reset_metadata msg;
1091 struct lttcomm_relayd_generic_reply reply;
1092
1093 /* Code flow error. Safety net. */
1094 assert(rsock);
1095
1096 /* Should have been prevented by the sessiond. */
1097 if (rsock->minor < 8) {
1098 ERR("Metadata regeneration unsupported before 2.8");
1099 ret = -1;
1100 goto error;
1101 }
1102
1103 DBG("Relayd reset metadata stream id %" PRIu64, stream_id);
1104
1105 memset(&msg, 0, sizeof(msg));
1106 msg.stream_id = htobe64(stream_id);
1107 msg.version = htobe64(version);
1108
1109 /* Send command */
1110 ret = send_command(rsock, RELAYD_RESET_METADATA, (void *) &msg, sizeof(msg), 0);
1111 if (ret < 0) {
1112 goto error;
1113 }
1114
1115 /* Receive response */
1116 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1117 if (ret < 0) {
1118 goto error;
1119 }
1120
1121 reply.ret_code = be32toh(reply.ret_code);
1122
1123 /* Return session id or negative ret code. */
1124 if (reply.ret_code != LTTNG_OK) {
1125 ret = -1;
1126 ERR("Relayd reset metadata replied error %d", reply.ret_code);
1127 } else {
1128 /* Success */
1129 ret = 0;
1130 }
1131
1132 DBG("Relayd reset metadata stream id %" PRIu64 " successfully", stream_id);
1133
1134error:
1135 return ret;
1136}
a1ae2ea5 1137
c35f9726 1138int relayd_rotate_streams(struct lttcomm_relayd_sock *sock,
ebb29c10 1139 unsigned int stream_count, const uint64_t *new_chunk_id,
c35f9726 1140 const struct relayd_stream_rotation_position *positions)
d73bf3d7
JD
1141{
1142 int ret;
c35f9726
JG
1143 unsigned int i;
1144 struct lttng_dynamic_buffer payload;
1145 struct lttcomm_relayd_generic_reply reply = {};
1146 const struct lttcomm_relayd_rotate_streams msg = {
1147 .stream_count = htobe32((uint32_t) stream_count),
1148 .new_chunk_id = (typeof(msg.new_chunk_id)) {
1149 .is_set = !!new_chunk_id,
1150 .value = htobe64(new_chunk_id ? *new_chunk_id : 0),
1151 },
1152 };
1153 char new_chunk_id_buf[MAX_INT_DEC_LEN(*new_chunk_id)] = {};
1154 const char *new_chunk_id_str;
d73bf3d7 1155
c35f9726 1156 lttng_dynamic_buffer_init(&payload);
d73bf3d7 1157
c35f9726
JG
1158 /* Code flow error. Safety net. */
1159 assert(sock);
d73bf3d7 1160
c35f9726
JG
1161 if (new_chunk_id) {
1162 ret = snprintf(new_chunk_id_buf, sizeof(new_chunk_id_buf),
1163 "%" PRIu64, *new_chunk_id);
1164 if (ret == -1 || ret >= sizeof(new_chunk_id_buf)) {
1165 new_chunk_id_str = "formatting error";
1166 } else {
1167 new_chunk_id_str = new_chunk_id_buf;
1168 }
1169 } else {
1170 new_chunk_id_str = "none";
d73bf3d7
JD
1171 }
1172
c35f9726
JG
1173 DBG("Preparing \"rotate streams\" command payload: new_chunk_id = %s, stream_count = %u",
1174 new_chunk_id_str, stream_count);
d73bf3d7 1175
c35f9726
JG
1176 ret = lttng_dynamic_buffer_append(&payload, &msg, sizeof(msg));
1177 if (ret) {
1178 ERR("Failed to allocate \"rotate streams\" command payload");
d73bf3d7
JD
1179 goto error;
1180 }
1181
c35f9726
JG
1182 for (i = 0; i < stream_count; i++) {
1183 const struct relayd_stream_rotation_position *position =
1184 &positions[i];
1185 const struct lttcomm_relayd_stream_rotation_position comm_position = {
1186 .stream_id = htobe64(position->stream_id),
1187 .rotate_at_seq_num = htobe64(
1188 position->rotate_at_seq_num),
1189 };
1190
1191 DBG("Rotate stream %" PRIu64 "at sequence number %" PRIu64,
1192 position->stream_id,
1193 position->rotate_at_seq_num);
1194 ret = lttng_dynamic_buffer_append(&payload, &comm_position,
1195 sizeof(comm_position));
1196 if (ret) {
1197 ERR("Failed to allocate \"rotate streams\" command payload");
1198 goto error;
1199 }
1200 }
d73bf3d7
JD
1201
1202 /* Send command. */
c35f9726
JG
1203 ret = send_command(sock, RELAYD_ROTATE_STREAMS, payload.data,
1204 payload.size, 0);
d73bf3d7 1205 if (ret < 0) {
c35f9726 1206 ERR("Failed to send \"rotate stream\" command");
d73bf3d7
JD
1207 goto error;
1208 }
1209
1210 /* Receive response. */
c35f9726 1211 ret = recv_reply(sock, &reply, sizeof(reply));
d73bf3d7 1212 if (ret < 0) {
c35f9726 1213 ERR("Failed to receive \"rotate streams\" command reply");
d73bf3d7
JD
1214 goto error;
1215 }
1216
1217 reply.ret_code = be32toh(reply.ret_code);
d73bf3d7
JD
1218 if (reply.ret_code != LTTNG_OK) {
1219 ret = -1;
c35f9726 1220 ERR("Relayd rotate streams replied error %d", reply.ret_code);
d73bf3d7
JD
1221 } else {
1222 /* Success. */
1223 ret = 0;
c35f9726 1224 DBG("Relayd rotated streams successfully");
d73bf3d7
JD
1225 }
1226
1227error:
c35f9726 1228 lttng_dynamic_buffer_reset(&payload);
d73bf3d7
JD
1229 return ret;
1230}
e5add6d0
JG
1231
1232int relayd_create_trace_chunk(struct lttcomm_relayd_sock *sock,
1233 struct lttng_trace_chunk *chunk)
1234{
1235 int ret = 0;
1236 enum lttng_trace_chunk_status status;
1237 struct lttcomm_relayd_create_trace_chunk msg = {};
1238 struct lttcomm_relayd_generic_reply reply = {};
1239 struct lttng_dynamic_buffer payload;
1240 uint64_t chunk_id;
1241 time_t creation_timestamp;
1242 const char *chunk_name;
1243 size_t chunk_name_length;
913a542b 1244 bool overridden_name;
e5add6d0
JG
1245
1246 lttng_dynamic_buffer_init(&payload);
1247
1248 status = lttng_trace_chunk_get_id(chunk, &chunk_id);
1249 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1250 ret = -1;
1251 goto end;
1252 }
1253
1254 status = lttng_trace_chunk_get_creation_timestamp(
1255 chunk, &creation_timestamp);
1256 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1257 ret = -1;
1258 goto end;
1259 }
1260
1261 status = lttng_trace_chunk_get_name(
913a542b 1262 chunk, &chunk_name, &overridden_name);
e5add6d0
JG
1263 if (status != LTTNG_TRACE_CHUNK_STATUS_OK &&
1264 status != LTTNG_TRACE_CHUNK_STATUS_NONE) {
1265 ret = -1;
1266 goto end;
1267 }
1268
913a542b 1269 chunk_name_length = overridden_name ? (strlen(chunk_name) + 1) : 0;
e5add6d0
JG
1270 msg = (typeof(msg)){
1271 .chunk_id = htobe64(chunk_id),
1272 .creation_timestamp = htobe64((uint64_t) creation_timestamp),
1273 .override_name_length = htobe32((uint32_t) chunk_name_length),
1274 };
1275
1276 ret = lttng_dynamic_buffer_append(&payload, &msg, sizeof(msg));
1277 if (ret) {
1278 goto end;
1279 }
1280 if (chunk_name_length) {
1281 ret = lttng_dynamic_buffer_append(
1282 &payload, chunk_name, chunk_name_length);
1283 if (ret) {
1284 goto end;
1285 }
1286 }
1287
bbc4768c
JG
1288 ret = send_command(sock, RELAYD_CREATE_TRACE_CHUNK, payload.data,
1289 payload.size, 0);
e5add6d0
JG
1290 if (ret < 0) {
1291 ERR("Failed to send trace chunk creation command to relay daemon");
1292 goto end;
1293 }
1294
1295 ret = recv_reply(sock, &reply, sizeof(reply));
1296 if (ret < 0) {
1297 ERR("Failed to receive relay daemon trace chunk creation command reply");
1298 goto end;
1299 }
1300
1301 reply.ret_code = be32toh(reply.ret_code);
1302 if (reply.ret_code != LTTNG_OK) {
1303 ret = -1;
1304 ERR("Relayd trace chunk create replied error %d",
1305 reply.ret_code);
1306 } else {
1307 ret = 0;
1308 DBG("Relayd successfully created trace chunk: chunk_id = %" PRIu64,
1309 chunk_id);
1310 }
1311
1312end:
1313 lttng_dynamic_buffer_reset(&payload);
1314 return ret;
1315}
bbc4768c
JG
1316
1317int relayd_close_trace_chunk(struct lttcomm_relayd_sock *sock,
1318 struct lttng_trace_chunk *chunk)
1319{
1320 int ret = 0;
1321 enum lttng_trace_chunk_status status;
1322 struct lttcomm_relayd_close_trace_chunk msg = {};
1323 struct lttcomm_relayd_generic_reply reply = {};
1324 uint64_t chunk_id;
1325 time_t close_timestamp;
1326 LTTNG_OPTIONAL(enum lttng_trace_chunk_command_type) close_command = {};
1327
1328 status = lttng_trace_chunk_get_id(chunk, &chunk_id);
1329 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1330 ERR("Failed to get trace chunk id");
1331 ret = -1;
1332 goto end;
1333 }
1334
1335 status = lttng_trace_chunk_get_close_timestamp(chunk, &close_timestamp);
1336 if (status != LTTNG_TRACE_CHUNK_STATUS_OK) {
1337 ERR("Failed to get trace chunk close timestamp");
1338 ret = -1;
1339 goto end;
1340 }
1341
1342 status = lttng_trace_chunk_get_close_command(chunk,
1343 &close_command.value);
1344 switch (status) {
1345 case LTTNG_TRACE_CHUNK_STATUS_OK:
1346 close_command.is_set = 1;
1347 break;
1348 case LTTNG_TRACE_CHUNK_STATUS_NONE:
1349 break;
1350 default:
1351 ERR("Failed to get trace chunk close command");
1352 ret = -1;
1353 goto end;
1354 }
1355
1356 msg = (typeof(msg)){
1357 .chunk_id = htobe64(chunk_id),
1358 .close_timestamp = htobe64((uint64_t) close_timestamp),
1359 .close_command = {
1360 .value = htobe32((uint32_t) close_command.value),
1361 .is_set = close_command.is_set,
1362 },
1363 };
1364
1365 ret = send_command(sock, RELAYD_CLOSE_TRACE_CHUNK, &msg, sizeof(msg),
1366 0);
1367 if (ret < 0) {
1368 ERR("Failed to send trace chunk close command to relay daemon");
1369 goto end;
1370 }
1371
1372 ret = recv_reply(sock, &reply, sizeof(reply));
1373 if (ret < 0) {
1374 ERR("Failed to receive relay daemon trace chunk close command reply");
1375 goto end;
1376 }
1377
1378 reply.ret_code = be32toh(reply.ret_code);
1379 if (reply.ret_code != LTTNG_OK) {
1380 ret = -1;
1381 ERR("Relayd trace chunk close replied error %d",
1382 reply.ret_code);
1383 } else {
1384 ret = 0;
1385 DBG("Relayd successfully closed trace chunk: chunk_id = %" PRIu64,
1386 chunk_id);
1387 }
1388end:
1389 return ret;
1390}
c35f9726
JG
1391
1392int relayd_trace_chunk_exists(struct lttcomm_relayd_sock *sock,
1393 uint64_t chunk_id, bool *chunk_exists)
1394{
1395 int ret = 0;
1396 struct lttcomm_relayd_trace_chunk_exists msg = {};
1397 struct lttcomm_relayd_trace_chunk_exists_reply reply = {};
1398
1399 msg = (typeof(msg)){
1400 .chunk_id = htobe64(chunk_id),
1401 };
1402
1403 ret = send_command(sock, RELAYD_TRACE_CHUNK_EXISTS, &msg, sizeof(msg),
1404 0);
1405 if (ret < 0) {
1406 ERR("Failed to send trace chunk exists command to relay daemon");
1407 goto end;
1408 }
1409
1410 ret = recv_reply(sock, &reply, sizeof(reply));
1411 if (ret < 0) {
1412 ERR("Failed to receive relay daemon trace chunk close command reply");
1413 goto end;
1414 }
1415
1416 reply.generic.ret_code = be32toh(reply.generic.ret_code);
1417 if (reply.generic.ret_code != LTTNG_OK) {
1418 ret = -1;
1419 ERR("Relayd trace chunk close replied error %d",
1420 reply.generic.ret_code);
1421 } else {
1422 ret = 0;
1423 DBG("Relayd successfully checked trace chunk existence: chunk_id = %" PRIu64
1424 ", exists = %s", chunk_id,
1425 reply.trace_chunk_exists ? "true" : "false");
1426 *chunk_exists = !!reply.trace_chunk_exists;
1427 }
1428end:
1429 return ret;
1430}
This page took 0.127394 seconds and 5 git commands to generate.