Cleanup: use lttng_* string utility functions
[lttng-tools.git] / src / common / relayd / relayd.c
CommitLineData
00e2e675
DG
1/*
2 * Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License, version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
11 * more details.
12 *
13 * You should have received a copy of the GNU General Public License along with
14 * this program; if not, write to the Free Software Foundation, Inc., 51
15 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
16 */
17
6c1c0768 18#define _LGPL_SOURCE
00e2e675
DG
19#include <assert.h>
20#include <stdio.h>
21#include <stdlib.h>
22#include <string.h>
23#include <sys/stat.h>
77c7c900 24#include <inttypes.h>
00e2e675
DG
25
26#include <common/common.h>
27#include <common/defaults.h>
f263b7fd 28#include <common/compat/endian.h>
27283937 29#include <common/compat/string.h>
00e2e675 30#include <common/sessiond-comm/relayd.h>
50adc264 31#include <common/index/ctf-index.h>
00e2e675
DG
32
33#include "relayd.h"
34
35/*
36 * Send command. Fill up the header and append the data.
37 */
6151a90f 38static int send_command(struct lttcomm_relayd_sock *rsock,
76b9afaa 39 enum lttcomm_relayd_command cmd, const void *data, size_t size,
00e2e675
DG
40 int flags)
41{
42 int ret;
43 struct lttcomm_relayd_hdr header;
44 char *buf;
45 uint64_t buf_size = sizeof(header);
46
f96e4545
MD
47 if (rsock->sock.fd < 0) {
48 return -ECONNRESET;
49 }
50
00e2e675
DG
51 if (data) {
52 buf_size += size;
53 }
54
55 buf = zmalloc(buf_size);
56 if (buf == NULL) {
57 PERROR("zmalloc relayd send command buf");
58 ret = -1;
59 goto alloc_error;
60 }
61
53efb85a 62 memset(&header, 0, sizeof(header));
00e2e675
DG
63 header.cmd = htobe32(cmd);
64 header.data_size = htobe64(size);
65
66 /* Zeroed for now since not used. */
67 header.cmd_version = 0;
68 header.circuit_id = 0;
69
70 /* Prepare buffer to send. */
71 memcpy(buf, &header, sizeof(header));
72 if (data) {
73 memcpy(buf + sizeof(header), data, size);
74 }
75
06586bbe 76 DBG3("Relayd sending command %d of size %" PRIu64, (int) cmd, buf_size);
6151a90f 77 ret = rsock->sock.ops->sendmsg(&rsock->sock, buf, buf_size, flags);
00e2e675 78 if (ret < 0) {
06586bbe
JG
79 PERROR("Failed to send command %d of size %" PRIu64,
80 (int) cmd, buf_size);
8994307f 81 ret = -errno;
00e2e675
DG
82 goto error;
83 }
00e2e675
DG
84error:
85 free(buf);
86alloc_error:
87 return ret;
88}
89
90/*
91 * Receive reply data on socket. This MUST be call after send_command or else
92 * could result in unexpected behavior(s).
93 */
6151a90f 94static int recv_reply(struct lttcomm_relayd_sock *rsock, void *data, size_t size)
00e2e675
DG
95{
96 int ret;
97
f96e4545
MD
98 if (rsock->sock.fd < 0) {
99 return -ECONNRESET;
100 }
101
8fd623e0 102 DBG3("Relayd waiting for reply of size %zu", size);
00e2e675 103
6151a90f 104 ret = rsock->sock.ops->recvmsg(&rsock->sock, data, size, 0);
20275fe8
DG
105 if (ret <= 0 || ret != size) {
106 if (ret == 0) {
107 /* Orderly shutdown. */
6151a90f 108 DBG("Socket %d has performed an orderly shutdown", rsock->sock.fd);
20275fe8 109 } else {
8fd623e0 110 DBG("Receiving reply failed on sock %d for size %zu with ret %d",
6151a90f 111 rsock->sock.fd, size, ret);
20275fe8
DG
112 }
113 /* Always return -1 here and the caller can use errno. */
114 ret = -1;
00e2e675
DG
115 goto error;
116 }
117
118error:
119 return ret;
120}
121
d3e2ba59 122/*
f86f6389
JR
123 * Starting from 2.11, RELAYD_CREATE_SESSION payload (session_name & hostname)
124 * have no length restriction on the sender side.
125 * Length for both payloads is stored in the msg struct. A new dynamic size
126 * payload size is introduced.
127 */
128static int relayd_create_session_2_11(struct lttcomm_relayd_sock *rsock,
129 char *session_name, char *hostname,
130 int session_live_timer, unsigned int snapshot)
131{
132 int ret;
133 struct lttcomm_relayd_create_session_2_11 *msg = NULL;
134 size_t session_name_len;
135 size_t hostname_len;
136 size_t msg_length;
137
138 /* The two names are sent with a '\0' delimiter between them. */
139 session_name_len = strlen(session_name) + 1;
140 hostname_len = strlen(hostname) + 1;
141
142 msg_length = sizeof(*msg) + session_name_len + hostname_len;
143 msg = zmalloc(msg_length);
144 if (!msg) {
145 PERROR("zmalloc create_session_2_11 command message");
146 ret = -1;
147 goto error;
148 }
149
150 assert(session_name_len <= UINT32_MAX);
151 msg->session_name_len = htobe32(session_name_len);
152
153 assert(hostname_len <= UINT32_MAX);
154 msg->hostname_len = htobe32(hostname_len);
155
156 if (lttng_strncpy(msg->names, session_name, session_name_len)) {
157 ret = -1;
158 goto error;
159 }
160 if (lttng_strncpy(msg->names + session_name_len, hostname, hostname_len)) {
161 ret = -1;
162 goto error;
163 }
164
165 msg->live_timer = htobe32(session_live_timer);
166 msg->snapshot = !!snapshot;
167
168 /* Send command */
169 ret = send_command(rsock, RELAYD_CREATE_SESSION, msg, msg_length, 0);
170 if (ret < 0) {
171 goto error;
172 }
173error:
174 free(msg);
175 return ret;
176}
177/*
178 * From 2.4 to 2.10, RELAYD_CREATE_SESSION takes additional parameters to
d3e2ba59
JD
179 * support the live reading capability.
180 */
181static int relayd_create_session_2_4(struct lttcomm_relayd_sock *rsock,
42e9a27b
JR
182 char *session_name, char *hostname, int session_live_timer,
183 unsigned int snapshot)
d3e2ba59
JD
184{
185 int ret;
186 struct lttcomm_relayd_create_session_2_4 msg;
187
3a13ffd5
MD
188 if (lttng_strncpy(msg.session_name, session_name,
189 sizeof(msg.session_name))) {
246777db
MD
190 ret = -1;
191 goto error;
192 }
3a13ffd5 193 if (lttng_strncpy(msg.hostname, hostname, sizeof(msg.hostname))) {
246777db
MD
194 ret = -1;
195 goto error;
196 }
d3e2ba59 197 msg.live_timer = htobe32(session_live_timer);
7d2f7452 198 msg.snapshot = htobe32(snapshot);
d3e2ba59
JD
199
200 /* Send command */
201 ret = send_command(rsock, RELAYD_CREATE_SESSION, &msg, sizeof(msg), 0);
202 if (ret < 0) {
203 goto error;
204 }
205
206error:
207 return ret;
208}
209
210/*
211 * RELAYD_CREATE_SESSION from 2.1 to 2.3.
212 */
42e9a27b 213static int relayd_create_session_2_1(struct lttcomm_relayd_sock *rsock)
d3e2ba59
JD
214{
215 int ret;
216
217 /* Send command */
218 ret = send_command(rsock, RELAYD_CREATE_SESSION, NULL, 0, 0);
219 if (ret < 0) {
220 goto error;
221 }
222
223error:
224 return ret;
225}
226
c5b6f4f0
DG
227/*
228 * Send a RELAYD_CREATE_SESSION command to the relayd with the given socket and
229 * set session_id of the relayd if we have a successful reply from the relayd.
230 *
20275fe8
DG
231 * On success, return 0 else a negative value which is either an errno error or
232 * a lttng error code from the relayd.
c5b6f4f0 233 */
d3e2ba59 234int relayd_create_session(struct lttcomm_relayd_sock *rsock, uint64_t *session_id,
7d2f7452
DG
235 char *session_name, char *hostname, int session_live_timer,
236 unsigned int snapshot)
c5b6f4f0
DG
237{
238 int ret;
239 struct lttcomm_relayd_status_session reply;
240
6151a90f 241 assert(rsock);
c5b6f4f0
DG
242 assert(session_id);
243
244 DBG("Relayd create session");
245
f86f6389
JR
246 if (rsock->minor < 4) {
247 /* From 2.1 to 2.3 */
248 ret = relayd_create_session_2_1(rsock);
249 } else if (rsock->minor >= 4 && rsock->minor < 11) {
250 /* From 2.4 to 2.10 */
251 ret = relayd_create_session_2_4(rsock, session_name,
252 hostname, session_live_timer, snapshot);
253 } else {
254 /* From 2.11 to ... */
255 ret = relayd_create_session_2_11(rsock, session_name,
256 hostname, session_live_timer, snapshot);
d3e2ba59
JD
257 }
258
c5b6f4f0
DG
259 if (ret < 0) {
260 goto error;
261 }
262
20275fe8 263 /* Receive response */
6151a90f 264 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
c5b6f4f0
DG
265 if (ret < 0) {
266 goto error;
267 }
268
269 reply.session_id = be64toh(reply.session_id);
270 reply.ret_code = be32toh(reply.ret_code);
271
272 /* Return session id or negative ret code. */
273 if (reply.ret_code != LTTNG_OK) {
bb63afd9
DG
274 ret = -1;
275 ERR("Relayd create session replied error %d", reply.ret_code);
c5b6f4f0
DG
276 goto error;
277 } else {
278 ret = 0;
279 *session_id = reply.session_id;
280 }
281
282 DBG("Relayd session created with id %" PRIu64, reply.session_id);
283
284error:
285 return ret;
286}
287
2f21a469
JR
288static int relayd_add_stream_2_1(struct lttcomm_relayd_sock *rsock,
289 const char *channel_name, const char *pathname)
290{
291 int ret;
292 struct lttcomm_relayd_add_stream msg;
293
294 memset(&msg, 0, sizeof(msg));
295 if (lttng_strncpy(msg.channel_name, channel_name,
296 sizeof(msg.channel_name))) {
297 ret = -1;
298 goto error;
299 }
300
301 if (lttng_strncpy(msg.pathname, pathname,
302 sizeof(msg.pathname))) {
303 ret = -1;
304 goto error;
305 }
306
307 /* Send command */
308 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
309 if (ret < 0) {
310 ret = -1;
311 goto error;
312 }
313 ret = 0;
314error:
315 return ret;
316}
317
318static int relayd_add_stream_2_2(struct lttcomm_relayd_sock *rsock,
319 const char *channel_name, const char *pathname,
320 uint64_t tracefile_size, uint64_t tracefile_count)
321{
322 int ret;
323 struct lttcomm_relayd_add_stream_2_2 msg;
324
325 memset(&msg, 0, sizeof(msg));
326 /* Compat with relayd 2.2 to 2.10 */
327 if (lttng_strncpy(msg.channel_name, channel_name,
328 sizeof(msg.channel_name))) {
329 ret = -1;
330 goto error;
331 }
332 if (lttng_strncpy(msg.pathname, pathname,
333 sizeof(msg.pathname))) {
334 ret = -1;
335 goto error;
336 }
337 msg.tracefile_size = htobe64(tracefile_size);
338 msg.tracefile_count = htobe64(tracefile_count);
339
340 /* Send command */
341 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
342 if (ret < 0) {
343 goto error;
344 }
345 ret = 0;
346error:
347 return ret;
348}
349
350static int relayd_add_stream_2_11(struct lttcomm_relayd_sock *rsock,
351 const char *channel_name, const char *pathname,
352 uint64_t tracefile_size, uint64_t tracefile_count)
353{
354 int ret;
355 struct lttcomm_relayd_add_stream_2_11 *msg = NULL;
356 size_t channel_name_len;
357 size_t pathname_len;
358 size_t msg_length;
359
360 /* The two names are sent with a '\0' delimiter between them. */
361 channel_name_len = strlen(channel_name) + 1;
362 pathname_len = strlen(pathname) + 1;
363
364 msg_length = sizeof(*msg) + channel_name_len + pathname_len;
365 msg = zmalloc(msg_length);
366 if (!msg) {
367 PERROR("zmalloc add_stream_2_11 command message");
368 ret = -1;
369 goto error;
370 }
371
372 assert(channel_name_len <= UINT32_MAX);
373 msg->channel_name_len = htobe32(channel_name_len);
374
375 assert(pathname_len <= UINT32_MAX);
376 msg->pathname_len = htobe32(pathname_len);
377
378 if (lttng_strncpy(msg->names, channel_name, channel_name_len)) {
379 ret = -1;
380 goto error;
381 }
382 if (lttng_strncpy(msg->names + channel_name_len, pathname, pathname_len)) {
383 ret = -1;
384 goto error;
385 }
386
387 msg->tracefile_size = htobe64(tracefile_size);
388 msg->tracefile_count = htobe64(tracefile_count);
389
390 /* Send command */
391 ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) msg, msg_length, 0);
392 if (ret < 0) {
393 goto error;
394 }
395 ret = 0;
396error:
397 free(msg);
398 return ret;
399}
400
00e2e675
DG
401/*
402 * Add stream on the relayd and assign stream handle to the stream_id argument.
403 *
404 * On success return 0 else return ret_code negative value.
405 */
6151a90f 406int relayd_add_stream(struct lttcomm_relayd_sock *rsock, const char *channel_name,
0f907de1 407 const char *pathname, uint64_t *stream_id,
0b50e4b3
JG
408 uint64_t tracefile_size, uint64_t tracefile_count,
409 uint64_t trace_archive_id)
00e2e675
DG
410{
411 int ret;
00e2e675
DG
412 struct lttcomm_relayd_status_stream reply;
413
414 /* Code flow error. Safety net. */
6151a90f 415 assert(rsock);
00e2e675
DG
416 assert(channel_name);
417 assert(pathname);
418
419 DBG("Relayd adding stream for channel name %s", channel_name);
420
0f907de1
JD
421 /* Compat with relayd 2.1 */
422 if (rsock->minor == 1) {
2f21a469
JR
423 /* For 2.1 */
424 ret = relayd_add_stream_2_1(rsock, channel_name, pathname);
425
426 } else if (rsock->minor > 1 && rsock->minor < 11) {
427 /* From 2.2 to 2.10 */
428 ret = relayd_add_stream_2_2(rsock, channel_name, pathname,
429 tracefile_size, tracefile_count);
0f907de1 430 } else {
2f21a469
JR
431 /* From 2.11 to ...*/
432 ret = relayd_add_stream_2_11(rsock, channel_name, pathname,
433 tracefile_size, tracefile_count);
434 }
0f907de1 435
2f21a469
JR
436 if (ret) {
437 ret = -1;
438 goto error;
00e2e675
DG
439 }
440
633d0084 441 /* Waiting for reply */
6151a90f 442 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
00e2e675
DG
443 if (ret < 0) {
444 goto error;
445 }
446
447 /* Back to host bytes order. */
448 reply.handle = be64toh(reply.handle);
449 reply.ret_code = be32toh(reply.ret_code);
450
451 /* Return session id or negative ret code. */
f73fabfd 452 if (reply.ret_code != LTTNG_OK) {
bb63afd9
DG
453 ret = -1;
454 ERR("Relayd add stream replied error %d", reply.ret_code);
00e2e675
DG
455 } else {
456 /* Success */
457 ret = 0;
458 *stream_id = reply.handle;
459 }
460
77c7c900 461 DBG("Relayd stream added successfully with handle %" PRIu64,
2f21a469 462 reply.handle);
00e2e675
DG
463
464error:
465 return ret;
466}
467
a4baae1b
JD
468/*
469 * Inform the relay that all the streams for the current channel has been sent.
470 *
471 * On success return 0 else return ret_code negative value.
472 */
473int relayd_streams_sent(struct lttcomm_relayd_sock *rsock)
474{
475 int ret;
476 struct lttcomm_relayd_generic_reply reply;
477
478 /* Code flow error. Safety net. */
479 assert(rsock);
480
481 DBG("Relayd sending streams sent.");
482
483 /* This feature was introduced in 2.4, ignore it for earlier versions. */
484 if (rsock->minor < 4) {
485 ret = 0;
486 goto end;
487 }
488
489 /* Send command */
490 ret = send_command(rsock, RELAYD_STREAMS_SENT, NULL, 0, 0);
491 if (ret < 0) {
492 goto error;
493 }
494
495 /* Waiting for reply */
496 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
497 if (ret < 0) {
498 goto error;
499 }
500
501 /* Back to host bytes order. */
502 reply.ret_code = be32toh(reply.ret_code);
503
504 /* Return session id or negative ret code. */
505 if (reply.ret_code != LTTNG_OK) {
506 ret = -1;
507 ERR("Relayd streams sent replied error %d", reply.ret_code);
508 goto error;
509 } else {
510 /* Success */
511 ret = 0;
512 }
513
514 DBG("Relayd streams sent success");
515
516error:
517end:
518 return ret;
519}
520
00e2e675
DG
521/*
522 * Check version numbers on the relayd.
d4519fa3
JD
523 * If major versions are compatible, we assign minor_to_use to the
524 * minor version of the procotol we are going to use for this session.
00e2e675 525 *
67d5aa28
JD
526 * Return 0 if the two daemons are compatible, LTTNG_ERR_RELAYD_VERSION_FAIL
527 * otherwise, or a negative value on network errors.
00e2e675 528 */
6151a90f 529int relayd_version_check(struct lttcomm_relayd_sock *rsock)
00e2e675
DG
530{
531 int ret;
092b6259 532 struct lttcomm_relayd_version msg;
00e2e675
DG
533
534 /* Code flow error. Safety net. */
6151a90f 535 assert(rsock);
00e2e675 536
6151a90f
JD
537 DBG("Relayd version check for major.minor %u.%u", rsock->major,
538 rsock->minor);
00e2e675 539
53efb85a 540 memset(&msg, 0, sizeof(msg));
092b6259 541 /* Prepare network byte order before transmission. */
6151a90f
JD
542 msg.major = htobe32(rsock->major);
543 msg.minor = htobe32(rsock->minor);
092b6259 544
00e2e675 545 /* Send command */
6151a90f 546 ret = send_command(rsock, RELAYD_VERSION, (void *) &msg, sizeof(msg), 0);
00e2e675
DG
547 if (ret < 0) {
548 goto error;
549 }
550
20275fe8 551 /* Receive response */
6151a90f 552 ret = recv_reply(rsock, (void *) &msg, sizeof(msg));
00e2e675
DG
553 if (ret < 0) {
554 goto error;
555 }
556
557 /* Set back to host bytes order */
092b6259
DG
558 msg.major = be32toh(msg.major);
559 msg.minor = be32toh(msg.minor);
560
561 /*
562 * Only validate the major version. If the other side is higher,
563 * communication is not possible. Only major version equal can talk to each
564 * other. If the minor version differs, the lowest version is used by both
565 * sides.
092b6259 566 */
6151a90f 567 if (msg.major != rsock->major) {
d4519fa3 568 /* Not compatible */
67d5aa28 569 ret = LTTNG_ERR_RELAYD_VERSION_FAIL;
d4519fa3 570 DBG2("Relayd version is NOT compatible. Relayd version %u != %u (us)",
6151a90f 571 msg.major, rsock->major);
092b6259 572 goto error;
00e2e675
DG
573 }
574
092b6259 575 /*
6151a90f
JD
576 * If the relayd's minor version is higher, it will adapt to our version so
577 * we can continue to use the latest relayd communication data structure.
578 * If the received minor version is higher, the relayd should adapt to us.
092b6259 579 */
6151a90f
JD
580 if (rsock->minor > msg.minor) {
581 rsock->minor = msg.minor;
d4519fa3 582 }
092b6259 583
d4519fa3
JD
584 /* Version number compatible */
585 DBG2("Relayd version is compatible, using protocol version %u.%u",
6151a90f 586 rsock->major, rsock->minor);
d4519fa3 587 ret = 0;
00e2e675
DG
588
589error:
590 return ret;
591}
592
00e2e675
DG
593/*
594 * Add stream on the relayd and assign stream handle to the stream_id argument.
595 *
596 * On success return 0 else return ret_code negative value.
597 */
6151a90f 598int relayd_send_metadata(struct lttcomm_relayd_sock *rsock, size_t len)
00e2e675
DG
599{
600 int ret;
601
602 /* Code flow error. Safety net. */
6151a90f 603 assert(rsock);
00e2e675 604
77c7c900 605 DBG("Relayd sending metadata of size %zu", len);
00e2e675
DG
606
607 /* Send command */
6151a90f 608 ret = send_command(rsock, RELAYD_SEND_METADATA, NULL, len, 0);
00e2e675
DG
609 if (ret < 0) {
610 goto error;
611 }
612
613 DBG2("Relayd metadata added successfully");
614
615 /*
616 * After that call, the metadata data MUST be sent to the relayd so the
617 * receive size on the other end matches the len of the metadata packet
633d0084 618 * header. This is why we don't wait for a reply here.
00e2e675
DG
619 */
620
621error:
622 return ret;
623}
624
625/*
6151a90f 626 * Connect to relay daemon with an allocated lttcomm_relayd_sock.
00e2e675 627 */
6151a90f 628int relayd_connect(struct lttcomm_relayd_sock *rsock)
00e2e675
DG
629{
630 /* Code flow error. Safety net. */
6151a90f 631 assert(rsock);
00e2e675 632
f96e4545
MD
633 if (!rsock->sock.ops) {
634 /*
635 * Attempting a connect on a non-initialized socket.
636 */
637 return -ECONNRESET;
638 }
639
00e2e675
DG
640 DBG3("Relayd connect ...");
641
6151a90f 642 return rsock->sock.ops->connect(&rsock->sock);
00e2e675
DG
643}
644
645/*
6151a90f 646 * Close relayd socket with an allocated lttcomm_relayd_sock.
ffe60014
DG
647 *
648 * If no socket operations are found, simply return 0 meaning that everything
649 * is fine. Without operations, the socket can not possibly be opened or used.
650 * This is possible if the socket was allocated but not created. However, the
651 * caller could simply use it to store a valid file descriptor for instance
652 * passed over a Unix socket and call this to cleanup but still without a valid
653 * ops pointer.
654 *
655 * Return the close returned value. On error, a negative value is usually
656 * returned back from close(2).
00e2e675 657 */
6151a90f 658int relayd_close(struct lttcomm_relayd_sock *rsock)
00e2e675 659{
ffe60014
DG
660 int ret;
661
00e2e675 662 /* Code flow error. Safety net. */
6151a90f 663 assert(rsock);
00e2e675 664
ffe60014 665 /* An invalid fd is fine, return success. */
6151a90f 666 if (rsock->sock.fd < 0) {
ffe60014
DG
667 ret = 0;
668 goto end;
669 }
670
6151a90f 671 DBG3("Relayd closing socket %d", rsock->sock.fd);
00e2e675 672
6151a90f
JD
673 if (rsock->sock.ops) {
674 ret = rsock->sock.ops->close(&rsock->sock);
ffe60014
DG
675 } else {
676 /* Default call if no specific ops found. */
6151a90f 677 ret = close(rsock->sock.fd);
ffe60014
DG
678 if (ret < 0) {
679 PERROR("relayd_close default close");
680 }
681 }
f96e4545 682 rsock->sock.fd = -1;
ffe60014
DG
683
684end:
685 return ret;
00e2e675
DG
686}
687
688/*
689 * Send data header structure to the relayd.
690 */
6151a90f 691int relayd_send_data_hdr(struct lttcomm_relayd_sock *rsock,
00e2e675
DG
692 struct lttcomm_relayd_data_hdr *hdr, size_t size)
693{
694 int ret;
695
696 /* Code flow error. Safety net. */
6151a90f 697 assert(rsock);
00e2e675
DG
698 assert(hdr);
699
f96e4545
MD
700 if (rsock->sock.fd < 0) {
701 return -ECONNRESET;
702 }
703
8fd623e0 704 DBG3("Relayd sending data header of size %zu", size);
00e2e675
DG
705
706 /* Again, safety net */
707 if (size == 0) {
708 size = sizeof(struct lttcomm_relayd_data_hdr);
709 }
710
711 /* Only send data header. */
6151a90f 712 ret = rsock->sock.ops->sendmsg(&rsock->sock, hdr, size, 0);
00e2e675 713 if (ret < 0) {
8994307f 714 ret = -errno;
00e2e675
DG
715 goto error;
716 }
717
718 /*
719 * The data MUST be sent right after that command for the receive on the
720 * other end to match the size in the header.
721 */
722
723error:
724 return ret;
725}
173af62f
DG
726
727/*
728 * Send close stream command to the relayd.
729 */
6151a90f 730int relayd_send_close_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
173af62f
DG
731 uint64_t last_net_seq_num)
732{
733 int ret;
734 struct lttcomm_relayd_close_stream msg;
735 struct lttcomm_relayd_generic_reply reply;
736
737 /* Code flow error. Safety net. */
6151a90f 738 assert(rsock);
173af62f 739
77c7c900 740 DBG("Relayd closing stream id %" PRIu64, stream_id);
173af62f 741
53efb85a 742 memset(&msg, 0, sizeof(msg));
173af62f
DG
743 msg.stream_id = htobe64(stream_id);
744 msg.last_net_seq_num = htobe64(last_net_seq_num);
745
746 /* Send command */
6151a90f 747 ret = send_command(rsock, RELAYD_CLOSE_STREAM, (void *) &msg, sizeof(msg), 0);
173af62f
DG
748 if (ret < 0) {
749 goto error;
750 }
751
20275fe8 752 /* Receive response */
6151a90f 753 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
173af62f
DG
754 if (ret < 0) {
755 goto error;
756 }
757
758 reply.ret_code = be32toh(reply.ret_code);
759
760 /* Return session id or negative ret code. */
f73fabfd 761 if (reply.ret_code != LTTNG_OK) {
bb63afd9
DG
762 ret = -1;
763 ERR("Relayd close stream replied error %d", reply.ret_code);
173af62f
DG
764 } else {
765 /* Success */
766 ret = 0;
767 }
768
77c7c900 769 DBG("Relayd close stream id %" PRIu64 " successfully", stream_id);
173af62f
DG
770
771error:
772 return ret;
773}
c8f59ee5
DG
774
775/*
776 * Check for data availability for a given stream id.
777 *
6d805429 778 * Return 0 if NOT pending, 1 if so and a negative value on error.
c8f59ee5 779 */
6151a90f 780int relayd_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
c8f59ee5
DG
781 uint64_t last_net_seq_num)
782{
783 int ret;
6d805429 784 struct lttcomm_relayd_data_pending msg;
c8f59ee5
DG
785 struct lttcomm_relayd_generic_reply reply;
786
787 /* Code flow error. Safety net. */
6151a90f 788 assert(rsock);
c8f59ee5 789
6d805429 790 DBG("Relayd data pending for stream id %" PRIu64, stream_id);
c8f59ee5 791
53efb85a 792 memset(&msg, 0, sizeof(msg));
c8f59ee5
DG
793 msg.stream_id = htobe64(stream_id);
794 msg.last_net_seq_num = htobe64(last_net_seq_num);
795
796 /* Send command */
6151a90f 797 ret = send_command(rsock, RELAYD_DATA_PENDING, (void *) &msg,
c8f59ee5
DG
798 sizeof(msg), 0);
799 if (ret < 0) {
800 goto error;
801 }
802
20275fe8 803 /* Receive response */
6151a90f 804 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
c8f59ee5
DG
805 if (ret < 0) {
806 goto error;
807 }
808
809 reply.ret_code = be32toh(reply.ret_code);
810
811 /* Return session id or negative ret code. */
812 if (reply.ret_code >= LTTNG_OK) {
bb63afd9 813 ERR("Relayd data pending replied error %d", reply.ret_code);
c8f59ee5
DG
814 }
815
816 /* At this point, the ret code is either 1 or 0 */
817 ret = reply.ret_code;
818
6d805429 819 DBG("Relayd data is %s pending for stream id %" PRIu64,
9dd26bb9 820 ret == 1 ? "" : "NOT", stream_id);
c8f59ee5
DG
821
822error:
823 return ret;
824}
825
826/*
827 * Check on the relayd side for a quiescent state on the control socket.
828 */
6151a90f 829int relayd_quiescent_control(struct lttcomm_relayd_sock *rsock,
ad7051c0 830 uint64_t metadata_stream_id)
c8f59ee5
DG
831{
832 int ret;
ad7051c0 833 struct lttcomm_relayd_quiescent_control msg;
c8f59ee5
DG
834 struct lttcomm_relayd_generic_reply reply;
835
836 /* Code flow error. Safety net. */
6151a90f 837 assert(rsock);
c8f59ee5
DG
838
839 DBG("Relayd checking quiescent control state");
840
53efb85a 841 memset(&msg, 0, sizeof(msg));
ad7051c0
DG
842 msg.stream_id = htobe64(metadata_stream_id);
843
c8f59ee5 844 /* Send command */
6151a90f 845 ret = send_command(rsock, RELAYD_QUIESCENT_CONTROL, &msg, sizeof(msg), 0);
c8f59ee5
DG
846 if (ret < 0) {
847 goto error;
848 }
849
20275fe8 850 /* Receive response */
6151a90f 851 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
c8f59ee5
DG
852 if (ret < 0) {
853 goto error;
854 }
855
856 reply.ret_code = be32toh(reply.ret_code);
857
858 /* Return session id or negative ret code. */
859 if (reply.ret_code != LTTNG_OK) {
bb63afd9
DG
860 ret = -1;
861 ERR("Relayd quiescent control replied error %d", reply.ret_code);
c8f59ee5
DG
862 goto error;
863 }
864
865 /* Control socket is quiescent */
6d805429 866 return 0;
c8f59ee5
DG
867
868error:
869 return ret;
870}
f7079f67
DG
871
872/*
873 * Begin a data pending command for a specific session id.
874 */
6151a90f 875int relayd_begin_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id)
f7079f67
DG
876{
877 int ret;
878 struct lttcomm_relayd_begin_data_pending msg;
879 struct lttcomm_relayd_generic_reply reply;
880
881 /* Code flow error. Safety net. */
6151a90f 882 assert(rsock);
f7079f67
DG
883
884 DBG("Relayd begin data pending");
885
53efb85a 886 memset(&msg, 0, sizeof(msg));
f7079f67
DG
887 msg.session_id = htobe64(id);
888
889 /* Send command */
6151a90f 890 ret = send_command(rsock, RELAYD_BEGIN_DATA_PENDING, &msg, sizeof(msg), 0);
f7079f67
DG
891 if (ret < 0) {
892 goto error;
893 }
894
20275fe8 895 /* Receive response */
6151a90f 896 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
f7079f67
DG
897 if (ret < 0) {
898 goto error;
899 }
900
901 reply.ret_code = be32toh(reply.ret_code);
902
903 /* Return session id or negative ret code. */
904 if (reply.ret_code != LTTNG_OK) {
bb63afd9
DG
905 ret = -1;
906 ERR("Relayd begin data pending replied error %d", reply.ret_code);
f7079f67
DG
907 goto error;
908 }
909
910 return 0;
911
912error:
913 return ret;
914}
915
916/*
917 * End a data pending command for a specific session id.
918 *
919 * Return 0 on success and set is_data_inflight to 0 if no data is being
920 * streamed or 1 if it is the case.
921 */
6151a90f 922int relayd_end_data_pending(struct lttcomm_relayd_sock *rsock, uint64_t id,
f7079f67
DG
923 unsigned int *is_data_inflight)
924{
af6c30b5 925 int ret, recv_ret;
f7079f67
DG
926 struct lttcomm_relayd_end_data_pending msg;
927 struct lttcomm_relayd_generic_reply reply;
928
929 /* Code flow error. Safety net. */
6151a90f 930 assert(rsock);
f7079f67
DG
931
932 DBG("Relayd end data pending");
933
53efb85a 934 memset(&msg, 0, sizeof(msg));
f7079f67
DG
935 msg.session_id = htobe64(id);
936
937 /* Send command */
6151a90f 938 ret = send_command(rsock, RELAYD_END_DATA_PENDING, &msg, sizeof(msg), 0);
f7079f67
DG
939 if (ret < 0) {
940 goto error;
941 }
942
20275fe8 943 /* Receive response */
6151a90f 944 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
f7079f67
DG
945 if (ret < 0) {
946 goto error;
947 }
948
af6c30b5
DG
949 recv_ret = be32toh(reply.ret_code);
950 if (recv_ret < 0) {
951 ret = recv_ret;
f7079f67
DG
952 goto error;
953 }
954
af6c30b5 955 *is_data_inflight = recv_ret;
f7079f67 956
af6c30b5 957 DBG("Relayd end data pending is data inflight: %d", recv_ret);
f7079f67
DG
958
959 return 0;
960
961error:
962 return ret;
963}
1c20f0e2
JD
964
965/*
966 * Send index to the relayd.
967 */
968int relayd_send_index(struct lttcomm_relayd_sock *rsock,
50adc264 969 struct ctf_packet_index *index, uint64_t relay_stream_id,
1c20f0e2
JD
970 uint64_t net_seq_num)
971{
972 int ret;
973 struct lttcomm_relayd_index msg;
974 struct lttcomm_relayd_generic_reply reply;
975
976 /* Code flow error. Safety net. */
977 assert(rsock);
978
979 if (rsock->minor < 4) {
980 DBG("Not sending indexes before protocol 2.4");
981 ret = 0;
982 goto error;
983 }
984
985 DBG("Relayd sending index for stream ID %" PRIu64, relay_stream_id);
986
53efb85a 987 memset(&msg, 0, sizeof(msg));
1c20f0e2
JD
988 msg.relay_stream_id = htobe64(relay_stream_id);
989 msg.net_seq_num = htobe64(net_seq_num);
990
991 /* The index is already in big endian. */
992 msg.packet_size = index->packet_size;
993 msg.content_size = index->content_size;
994 msg.timestamp_begin = index->timestamp_begin;
995 msg.timestamp_end = index->timestamp_end;
996 msg.events_discarded = index->events_discarded;
997 msg.stream_id = index->stream_id;
998
234cd636
JD
999 if (rsock->minor >= 8) {
1000 msg.stream_instance_id = index->stream_instance_id;
1001 msg.packet_seq_num = index->packet_seq_num;
1002 }
1003
1c20f0e2 1004 /* Send command */
f8f3885c
MD
1005 ret = send_command(rsock, RELAYD_SEND_INDEX, &msg,
1006 lttcomm_relayd_index_len(lttng_to_index_major(rsock->major,
1007 rsock->minor),
1008 lttng_to_index_minor(rsock->major, rsock->minor)),
1009 0);
1c20f0e2
JD
1010 if (ret < 0) {
1011 goto error;
1012 }
1013
1014 /* Receive response */
1015 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1016 if (ret < 0) {
1017 goto error;
1018 }
1019
1020 reply.ret_code = be32toh(reply.ret_code);
1021
1022 /* Return session id or negative ret code. */
1023 if (reply.ret_code != LTTNG_OK) {
1024 ret = -1;
1025 ERR("Relayd send index replied error %d", reply.ret_code);
1026 } else {
1027 /* Success */
1028 ret = 0;
1029 }
1030
1031error:
1032 return ret;
1033}
93ec662e
JD
1034
1035/*
1036 * Ask the relay to reset the metadata trace file (regeneration).
1037 */
1038int relayd_reset_metadata(struct lttcomm_relayd_sock *rsock,
1039 uint64_t stream_id, uint64_t version)
1040{
1041 int ret;
1042 struct lttcomm_relayd_reset_metadata msg;
1043 struct lttcomm_relayd_generic_reply reply;
1044
1045 /* Code flow error. Safety net. */
1046 assert(rsock);
1047
1048 /* Should have been prevented by the sessiond. */
1049 if (rsock->minor < 8) {
1050 ERR("Metadata regeneration unsupported before 2.8");
1051 ret = -1;
1052 goto error;
1053 }
1054
1055 DBG("Relayd reset metadata stream id %" PRIu64, stream_id);
1056
1057 memset(&msg, 0, sizeof(msg));
1058 msg.stream_id = htobe64(stream_id);
1059 msg.version = htobe64(version);
1060
1061 /* Send command */
1062 ret = send_command(rsock, RELAYD_RESET_METADATA, (void *) &msg, sizeof(msg), 0);
1063 if (ret < 0) {
1064 goto error;
1065 }
1066
1067 /* Receive response */
1068 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1069 if (ret < 0) {
1070 goto error;
1071 }
1072
1073 reply.ret_code = be32toh(reply.ret_code);
1074
1075 /* Return session id or negative ret code. */
1076 if (reply.ret_code != LTTNG_OK) {
1077 ret = -1;
1078 ERR("Relayd reset metadata replied error %d", reply.ret_code);
1079 } else {
1080 /* Success */
1081 ret = 0;
1082 }
1083
1084 DBG("Relayd reset metadata stream id %" PRIu64 " successfully", stream_id);
1085
1086error:
1087 return ret;
1088}
a1ae2ea5 1089
d73bf3d7
JD
1090int relayd_rotate_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
1091 const char *new_pathname, uint64_t new_chunk_id,
1092 uint64_t seq_num)
1093{
1094 int ret;
1095 struct lttcomm_relayd_rotate_stream *msg = NULL;
1096 struct lttcomm_relayd_generic_reply reply;
1097 size_t len;
1098 int msg_len;
1099
1100 /* Code flow error. Safety net. */
1101 assert(rsock);
1102
1103 DBG("Sending rotate stream id %" PRIu64 " command to relayd", stream_id);
1104
1105 /* Account for the trailing NULL. */
27283937 1106 len = lttng_strnlen(new_pathname, LTTNG_PATH_MAX) + 1;
d73bf3d7
JD
1107 if (len > LTTNG_PATH_MAX) {
1108 ERR("Path used in relayd rotate stream command exceeds the maximal allowed length");
1109 ret = -1;
1110 goto error;
1111 }
1112
1113 msg_len = offsetof(struct lttcomm_relayd_rotate_stream, new_pathname) + len;
1114 msg = zmalloc(msg_len);
1115 if (!msg) {
1116 PERROR("Failed to allocate relayd rotate stream command of %d bytes",
1117 msg_len);
1118 ret = -1;
1119 goto error;
1120 }
1121
1122 if (lttng_strncpy(msg->new_pathname, new_pathname, len)) {
1123 ret = -1;
1124 ERR("Failed to copy relayd rotate stream command's new path name");
1125 goto error;
1126 }
1127
1128 msg->pathname_length = htobe32(len);
1129 msg->stream_id = htobe64(stream_id);
1130 msg->new_chunk_id = htobe64(new_chunk_id);
1131 /*
1132 * The seq_num is invalid for metadata streams, but it is ignored on
1133 * the relay.
1134 */
1135 msg->rotate_at_seq_num = htobe64(seq_num);
1136
1137 /* Send command. */
1138 ret = send_command(rsock, RELAYD_ROTATE_STREAM, (void *) msg, msg_len, 0);
1139 if (ret < 0) {
1140 ERR("Send rotate command");
1141 goto error;
1142 }
1143
1144 /* Receive response. */
1145 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1146 if (ret < 0) {
1147 ERR("Receive rotate reply");
1148 goto error;
1149 }
1150
1151 reply.ret_code = be32toh(reply.ret_code);
1152
1153 /* Return session id or negative ret code. */
1154 if (reply.ret_code != LTTNG_OK) {
1155 ret = -1;
1156 ERR("Relayd rotate stream replied error %d", reply.ret_code);
1157 } else {
1158 /* Success. */
1159 ret = 0;
1160 DBG("Relayd rotated stream id %" PRIu64 " successfully", stream_id);
1161 }
1162
1163error:
1164 free(msg);
1165 return ret;
1166}
1167
00fb02ac
JD
1168int relayd_rotate_rename(struct lttcomm_relayd_sock *rsock,
1169 const char *old_path, const char *new_path)
1170{
1171 int ret;
1172 struct lttcomm_relayd_rotate_rename *msg = NULL;
1173 struct lttcomm_relayd_generic_reply reply;
1174 size_t old_path_length, new_path_length;
1175 size_t msg_length;
1176
1177 /* Code flow error. Safety net. */
1178 assert(rsock);
1179
1180 DBG("Relayd rename chunk %s to %s", old_path, new_path);
1181
1182 /* The two paths are sent with a '\0' delimiter between them. */
1183 old_path_length = strlen(old_path) + 1;
1184 new_path_length = strlen(new_path) + 1;
1185
1186 msg_length = sizeof(*msg) + old_path_length + new_path_length;
1187 msg = zmalloc(msg_length);
1188 if (!msg) {
1189 PERROR("zmalloc rotate-rename command message");
1190 ret = -1;
1191 goto error;
1192 }
1193
1194 assert(old_path_length <= UINT32_MAX);
1195 msg->old_path_length = htobe32(old_path_length);
1196
1197 assert(new_path_length <= UINT32_MAX);
1198 msg->new_path_length = htobe32(new_path_length);
1199
1200 strcpy(msg->paths, old_path);
1201 strcpy(msg->paths + old_path_length, new_path);
1202
1203 /* Send command */
1204 ret = send_command(rsock, RELAYD_ROTATE_RENAME, (const void *) msg,
1205 msg_length, 0);
1206 if (ret < 0) {
1207 goto error;
1208 }
1209
1210 /* Receive response */
1211 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1212 if (ret < 0) {
1213 goto error;
1214 }
1215
1216 reply.ret_code = be32toh(reply.ret_code);
1217
1218 /* Return session id or negative ret code. */
1219 if (reply.ret_code != LTTNG_OK) {
1220 ret = -1;
1221 ERR("Relayd rotate rename replied error %d", reply.ret_code);
1222 } else {
1223 /* Success */
1224 ret = 0;
1225 }
1226
1227 DBG("Relayd rotate rename completed successfully");
1228
1229error:
1230 free(msg);
1231 return ret;
1232}
1233
d88744a4
JD
1234int relayd_rotate_pending(struct lttcomm_relayd_sock *rsock, uint64_t chunk_id)
1235{
1236 int ret;
1237 struct lttcomm_relayd_rotate_pending msg;
1238 struct lttcomm_relayd_rotate_pending_reply reply;
1239
1240 /* Code flow error. Safety net. */
1241 assert(rsock);
1242
1243 DBG("Querying relayd for rotate pending with chunk_id %" PRIu64,
1244 chunk_id);
1245
1246 memset(&msg, 0, sizeof(msg));
1247 msg.chunk_id = htobe64(chunk_id);
1248
1249 /* Send command */
1250 ret = send_command(rsock, RELAYD_ROTATE_PENDING, (void *) &msg,
1251 sizeof(msg), 0);
1252 if (ret < 0) {
1253 goto error;
1254 }
1255
1256 /* Receive response */
1257 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1258 if (ret < 0) {
1259 goto error;
1260 }
1261
1262 reply.generic.ret_code = be32toh(reply.generic.ret_code);
1263
1264 /* Return session id or negative ret code. */
1265 if (reply.generic.ret_code != LTTNG_OK) {
1266 ret = -reply.generic.ret_code;
1267 ERR("Relayd rotate pending replied with error %d", ret);
1268 goto error;
1269 } else {
1270 /* No error, just rotate pending state */
1271 if (reply.is_pending == 0 || reply.is_pending == 1) {
1272 ret = reply.is_pending;
1273 DBG("Relayd rotate pending command completed successfully with result \"%s\"",
1274 ret ? "rotation pending" : "rotation NOT pending");
1275 } else {
1276 ret = -LTTNG_ERR_UNK;
1277 }
1278 }
1279
1280error:
1281 return ret;
1282}
1283
a1ae2ea5
JD
1284int relayd_mkdir(struct lttcomm_relayd_sock *rsock, const char *path)
1285{
1286 int ret;
1287 struct lttcomm_relayd_mkdir *msg;
1288 struct lttcomm_relayd_generic_reply reply;
1289 size_t len;
1290
1291 /* Code flow error. Safety net. */
1292 assert(rsock);
1293
1294 DBG("Relayd mkdir path %s", path);
1295
1296 len = strlen(path) + 1;
1297 msg = zmalloc(sizeof(msg->length) + len);
1298 if (!msg) {
1299 PERROR("Alloc mkdir msg");
1300 ret = -1;
1301 goto error;
1302 }
1303 msg->length = htobe32((uint32_t) len);
1304
1305 if (lttng_strncpy(msg->path, path, len)) {
1306 ret = -1;
1307 goto error;
1308 }
1309
1310 /* Send command */
1311 ret = send_command(rsock, RELAYD_MKDIR, (void *) msg,
1312 sizeof(msg->length) + len, 0);
1313 if (ret < 0) {
1314 goto error;
1315 }
1316
1317 /* Receive response */
1318 ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
1319 if (ret < 0) {
1320 goto error;
1321 }
1322
1323 reply.ret_code = be32toh(reply.ret_code);
1324
1325 /* Return session id or negative ret code. */
1326 if (reply.ret_code != LTTNG_OK) {
1327 ret = -1;
1328 ERR("Relayd mkdir replied error %d", reply.ret_code);
1329 } else {
1330 /* Success */
1331 ret = 0;
1332 }
1333
1334 DBG("Relayd mkdir completed successfully");
1335
1336error:
1337 free(msg);
1338 return ret;
a1ae2ea5 1339}
This page took 0.11635 seconds and 5 git commands to generate.