src.ctf.lttng-live: use viewer_connection_close_socket more
[babeltrace.git] / src / plugins / ctf / lttng-live / viewer-connection.cpp
CommitLineData
7cdc2bab 1/*
0235b0db 2 * SPDX-License-Identifier: MIT
7cdc2bab 3 *
0235b0db
MJ
4 * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
5 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
7cdc2bab
MD
6 */
7
c802cacb 8#include <glib.h>
7cdc2bab 9#include <stdint.h>
3c22a242 10#include <stdio.h>
3c22a242 11
c802cacb
SM
12#include <babeltrace2/babeltrace.h>
13
578e048b 14#include "common/common.h"
83ad336c 15#include "compat/endian.h" /* IWYU pragma: keep */
d721bef8 16#include "cpp-common/bt2s/make-unique.hpp"
7cdc2bab 17
c802cacb 18#include "data-stream.hpp"
087cd0f5 19#include "lttng-live.hpp"
087cd0f5 20#include "lttng-viewer-abi.hpp"
087cd0f5 21#include "metadata.hpp"
c802cacb 22#include "viewer-connection.hpp"
7cdc2bab 23
/*
 * Reacts to a non-OK viewer status obtained while sending to or
 * receiving from the relay daemon:
 *
 *   - _INTERRUPTED: nothing to report.
 *   - _ERROR: appends an error cause built from `_action` and `_msg_str`
 *     (both must be string literals, as they are concatenated at
 *     compile time).
 *   - anything else: aborts.
 *
 * A `viewer_connection` variable must be in scope at the expansion site.
 */
#define viewer_handle_send_recv_status(_status, _action, _msg_str) \
    do { \
        switch ((_status)) { \
        case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED: \
            break; \
        case LTTNG_LIVE_VIEWER_STATUS_ERROR: \
            BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, \
                                         "Error " _action " " _msg_str); \
            break; \
        default: \
            bt_common_abort(); \
        } \
    } while (0)

/* Same as viewer_handle_send_recv_status(), for a failed send. */
#define viewer_handle_send_status(_status, _msg_str) \
    viewer_handle_send_recv_status(_status, "sending", _msg_str)

/* Same as viewer_handle_send_recv_status(), for a failed receive. */
#define viewer_handle_recv_status(_status, _msg_str) \
    viewer_handle_send_recv_status(_status, "receiving", _msg_str)

/*
 * Appends an error cause made of `_msg`, the current socket error
 * message (bt_socket_errormsg()), and the `_fmt` suffix with its
 * optional arguments.
 *
 * `_msg` and `_fmt` must be string literals. A `viewer_connection`
 * variable must be in scope at the expansion site.
 */
#define LTTNG_LIVE_CPPLOGE_APPEND_CAUSE_ERRNO(_msg, _fmt, ...) \
    do { \
        BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, _msg ": {}" _fmt, \
                                     bt_socket_errormsg(), ##__VA_ARGS__); \
    } while (0)
49
4164020e
SM
50static inline enum lttng_live_iterator_status
51viewer_status_to_live_iterator_status(enum lttng_live_viewer_status viewer_status)
f79c2d7a 52{
4164020e
SM
53 switch (viewer_status) {
54 case LTTNG_LIVE_VIEWER_STATUS_OK:
55 return LTTNG_LIVE_ITERATOR_STATUS_OK;
56 case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
57 return LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
58 case LTTNG_LIVE_VIEWER_STATUS_ERROR:
59 return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
60 }
61
62 bt_common_abort();
f79c2d7a
FD
63}
64
4164020e
SM
65static inline enum ctf_msg_iter_medium_status
66viewer_status_to_ctf_msg_iter_medium_status(enum lttng_live_viewer_status viewer_status)
f79c2d7a 67{
4164020e
SM
68 switch (viewer_status) {
69 case LTTNG_LIVE_VIEWER_STATUS_OK:
70 return CTF_MSG_ITER_MEDIUM_STATUS_OK;
71 case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
72 return CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
73 case LTTNG_LIVE_VIEWER_STATUS_ERROR:
74 return CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
75 }
76
77 bt_common_abort();
f79c2d7a
FD
78}
79
4164020e 80static inline void viewer_connection_close_socket(struct live_viewer_connection *viewer_connection)
b197ca37 81{
e5694f0e
SM
82 if (viewer_connection->control_sock == BT_INVALID_SOCKET) {
83 return;
84 }
85
4164020e
SM
86 int ret = bt_socket_close(viewer_connection->control_sock);
87 if (ret == -1) {
0f5c5d5c
SM
88 BT_CPPLOGW_ERRNO_SPEC(viewer_connection->logger,
89 "Error closing viewer connection socket: ", ".");
4164020e
SM
90 }
91
92 viewer_connection->control_sock = BT_INVALID_SOCKET;
b197ca37
FD
93}
94
f79c2d7a
FD
95/*
96 * This function receives a message from the Relay daemon.
97 * If it received the entire message, it returns _OK,
98 * If it's interrupted, it returns _INTERRUPTED,
99 * otherwise, it returns _ERROR.
100 */
4164020e
SM
101static enum lttng_live_viewer_status
102lttng_live_recv(struct live_viewer_connection *viewer_connection, void *buf, size_t len)
7cdc2bab 103{
4164020e 104 ssize_t received;
4164020e
SM
105 size_t total_received = 0, to_receive = len;
106 struct lttng_live_msg_iter *lttng_live_msg_iter = viewer_connection->lttng_live_msg_iter;
107 enum lttng_live_viewer_status status;
108 BT_SOCKET sock = viewer_connection->control_sock;
109
110 /*
111 * Receive a message from the Relay.
112 */
113 do {
114 received = bt_socket_recv(sock, (char *) buf + total_received, to_receive, 0);
115 if (received == BT_SOCKET_ERROR) {
116 if (bt_socket_interrupted()) {
117 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
118 /*
119 * This interruption was due to a
120 * SIGINT and the graph is being torn
121 * down.
122 */
123 status = LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED;
124 lttng_live_msg_iter->was_interrupted = true;
125 goto end;
126 } else {
127 /*
128 * A signal was received, but the graph
129 * is not being torn down. Carry on.
130 */
131 continue;
132 }
133 } else {
134 /*
135 * For any other types of socket error, close
136 * the socket and return an error.
137 */
0f5c5d5c 138 LTTNG_LIVE_CPPLOGE_APPEND_CAUSE_ERRNO("Error receiving from Relay", ".");
4164020e
SM
139
140 viewer_connection_close_socket(viewer_connection);
141 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
142 goto end;
143 }
144 } else if (received == 0) {
145 /*
146 * The recv() call returned 0. This means the
147 * connection was orderly shutdown from the other peer.
148 * If that happens when we are trying to receive
149 * a message from it, it means something when wrong.
150 * Close the socket and return an error.
151 */
0f5c5d5c
SM
152 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
153 "Remote side has closed connection");
4164020e
SM
154 viewer_connection_close_socket(viewer_connection);
155 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
156 goto end;
157 }
158
159 BT_ASSERT(received <= to_receive);
160 total_received += received;
161 to_receive -= received;
162
163 } while (to_receive > 0);
164
165 BT_ASSERT(total_received == len);
166 status = LTTNG_LIVE_VIEWER_STATUS_OK;
f79c2d7a
FD
167
168end:
4164020e 169 return status;
7cdc2bab
MD
170}
171
f79c2d7a
FD
172/*
173 * This function sends a message to the Relay daemon.
174 * If it send the message, it returns _OK,
175 * If it's interrupted, it returns _INTERRUPTED,
176 * otherwise, it returns _ERROR.
177 */
4164020e
SM
178static enum lttng_live_viewer_status
179lttng_live_send(struct live_viewer_connection *viewer_connection, const void *buf, size_t len)
7cdc2bab 180{
4164020e 181 enum lttng_live_viewer_status status;
4164020e
SM
182 struct lttng_live_msg_iter *lttng_live_msg_iter = viewer_connection->lttng_live_msg_iter;
183 BT_SOCKET sock = viewer_connection->control_sock;
184 size_t to_send = len;
185 ssize_t total_sent = 0;
186
187 do {
188 ssize_t sent = bt_socket_send_nosigpipe(sock, (char *) buf + total_sent, to_send);
189 if (sent == BT_SOCKET_ERROR) {
190 if (bt_socket_interrupted()) {
191 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
192 /*
193 * This interruption was a SIGINT and
194 * the graph is being teared down.
195 */
196 status = LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED;
197 lttng_live_msg_iter->was_interrupted = true;
198 goto end;
199 } else {
200 /*
201 * A signal was received, but the graph
202 * is not being teared down. Carry on.
203 */
204 continue;
205 }
206 } else {
207 /*
208 * For any other types of socket error, close
209 * the socket and return an error.
210 */
0f5c5d5c 211 LTTNG_LIVE_CPPLOGE_APPEND_CAUSE_ERRNO("Error sending to Relay", ".");
4164020e
SM
212
213 viewer_connection_close_socket(viewer_connection);
214 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
215 goto end;
216 }
217 }
218
219 BT_ASSERT(sent <= to_send);
220 total_sent += sent;
221 to_send -= sent;
222
223 } while (to_send > 0);
224
225 BT_ASSERT(total_sent == len);
226 status = LTTNG_LIVE_VIEWER_STATUS_OK;
f79c2d7a
FD
227
228end:
4164020e 229 return status;
7cdc2bab
MD
230}
231
4164020e 232static int parse_url(struct live_viewer_connection *viewer_connection)
7cdc2bab 233{
4164020e 234 char error_buf[256] = {0};
dd420a9b 235 struct bt_common_lttng_live_url_parts lttng_live_url_parts = {};
9d0b798a 236 bt_common_lttng_live_url_parts_deleter partsDeleter {lttng_live_url_parts};
4164020e 237 int ret = -1;
4164020e 238
30d81897 239 if (viewer_connection->url.empty()) {
4164020e
SM
240 goto end;
241 }
242
30d81897
SM
243 lttng_live_url_parts = bt_common_parse_lttng_live_url(viewer_connection->url.c_str(), error_buf,
244 sizeof(error_buf));
4164020e 245 if (!lttng_live_url_parts.proto) {
0f5c5d5c
SM
246 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Invalid LTTng live URL format: {}",
247 error_buf);
4164020e
SM
248 goto end;
249 }
198e7911 250 viewer_connection->proto.reset(lttng_live_url_parts.proto);
4164020e
SM
251 lttng_live_url_parts.proto = NULL;
252
198e7911 253 viewer_connection->relay_hostname.reset(lttng_live_url_parts.hostname);
4164020e
SM
254 lttng_live_url_parts.hostname = NULL;
255
256 if (lttng_live_url_parts.port >= 0) {
257 viewer_connection->port = lttng_live_url_parts.port;
258 } else {
259 viewer_connection->port = LTTNG_DEFAULT_NETWORK_VIEWER_PORT;
260 }
261
198e7911 262 viewer_connection->target_hostname.reset(lttng_live_url_parts.target_hostname);
4164020e
SM
263 lttng_live_url_parts.target_hostname = NULL;
264
265 if (lttng_live_url_parts.session_name) {
198e7911 266 viewer_connection->session_name.reset(lttng_live_url_parts.session_name);
4164020e
SM
267 lttng_live_url_parts.session_name = NULL;
268 }
269
270 ret = 0;
7cdc2bab
MD
271
272end:
4164020e 273 return ret;
7cdc2bab
MD
274}
275
4164020e
SM
276static enum lttng_live_viewer_status
277lttng_live_handshake(struct live_viewer_connection *viewer_connection)
7cdc2bab 278{
4164020e
SM
279 struct lttng_viewer_cmd cmd;
280 struct lttng_viewer_connect connect;
281 enum lttng_live_viewer_status status;
4164020e
SM
282 const size_t cmd_buf_len = sizeof(cmd) + sizeof(connect);
283 char cmd_buf[cmd_buf_len];
284
0f5c5d5c
SM
285 BT_CPPLOGD_SPEC(viewer_connection->logger,
286 "Handshaking with the relay daemon: cmd={}, major-version={}, minor-version={}",
287 LTTNG_VIEWER_CONNECT, LTTNG_LIVE_MAJOR, LTTNG_LIVE_MINOR);
4164020e
SM
288
289 cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
290 cmd.data_size = htobe64((uint64_t) sizeof(connect));
291 cmd.cmd_version = htobe32(0);
292
293 connect.viewer_session_id = -1ULL; /* will be set on recv */
294 connect.major = htobe32(LTTNG_LIVE_MAJOR);
295 connect.minor = htobe32(LTTNG_LIVE_MINOR);
296 connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
297
298 /*
299 * Merge the cmd and connection request to prevent a write-write
300 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
301 * second write to be performed quickly in presence of Nagle's algorithm
302 */
303 memcpy(cmd_buf, &cmd, sizeof(cmd));
304 memcpy(cmd_buf + sizeof(cmd), &connect, sizeof(connect));
305
306 status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
307 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 308 viewer_handle_send_status(status, "viewer connect command");
4164020e
SM
309 goto end;
310 }
311
312 status = lttng_live_recv(viewer_connection, &connect, sizeof(connect));
313 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 314 viewer_handle_recv_status(status, "viewer connect reply");
4164020e
SM
315 goto end;
316 }
317
0f5c5d5c
SM
318 BT_CPPLOGI_SPEC(viewer_connection->logger, "Received viewer session ID : {}",
319 (uint64_t) be64toh(connect.viewer_session_id));
320 BT_CPPLOGI_SPEC(viewer_connection->logger, "Relayd version : {}.{}", be32toh(connect.major),
321 be32toh(connect.minor));
4164020e
SM
322
323 if (LTTNG_LIVE_MAJOR != be32toh(connect.major)) {
0f5c5d5c
SM
324 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
325 "Incompatible lttng-relayd protocol");
4164020e
SM
326 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
327 goto end;
328 }
329 /* Use the smallest protocol version implemented. */
330 if (LTTNG_LIVE_MINOR > be32toh(connect.minor)) {
331 viewer_connection->minor = be32toh(connect.minor);
332 } else {
333 viewer_connection->minor = LTTNG_LIVE_MINOR;
334 }
335 viewer_connection->major = LTTNG_LIVE_MAJOR;
336
337 status = LTTNG_LIVE_VIEWER_STATUS_OK;
338
339 goto end;
f79c2d7a
FD
340
341end:
4164020e 342 return status;
7cdc2bab
MD
343}
344
4164020e
SM
345static enum lttng_live_viewer_status
346lttng_live_connect_viewer(struct live_viewer_connection *viewer_connection)
7cdc2bab 347{
4164020e
SM
348 struct hostent *host;
349 struct sockaddr_in server_addr;
350 enum lttng_live_viewer_status status;
4164020e
SM
351
352 if (parse_url(viewer_connection)) {
0f5c5d5c 353 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Failed to parse URL");
4164020e
SM
354 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
355 goto error;
356 }
357
0f5c5d5c
SM
358 BT_CPPLOGD_SPEC(
359 viewer_connection->logger,
360 "Connecting to hostname : {}, port : {}, target hostname : {}, session name : {}, proto : {}",
4164020e
SM
361 viewer_connection->relay_hostname->str, viewer_connection->port,
362 !viewer_connection->target_hostname ? "<none>" : viewer_connection->target_hostname->str,
363 !viewer_connection->session_name ? "<none>" : viewer_connection->session_name->str,
364 viewer_connection->proto->str);
365
366 host = gethostbyname(viewer_connection->relay_hostname->str);
367 if (!host) {
0f5c5d5c
SM
368 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
369 "Cannot lookup hostname: hostname=\"{}\"",
370 viewer_connection->relay_hostname->str);
4164020e
SM
371 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
372 goto error;
373 }
374
375 if ((viewer_connection->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == BT_INVALID_SOCKET) {
0f5c5d5c
SM
376 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Socket creation failed: {}",
377 bt_socket_errormsg());
4164020e
SM
378 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
379 goto error;
380 }
381
382 server_addr.sin_family = AF_INET;
383 server_addr.sin_port = htons(viewer_connection->port);
384 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
385 memset(&(server_addr.sin_zero), 0, 8);
386
387 if (connect(viewer_connection->control_sock, (struct sockaddr *) &server_addr,
388 sizeof(struct sockaddr)) == BT_SOCKET_ERROR) {
0f5c5d5c
SM
389 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Connection failed: {}",
390 bt_socket_errormsg());
4164020e
SM
391 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
392 goto error;
393 }
394
395 status = lttng_live_handshake(viewer_connection);
396
397 /*
398 * Only print error and append cause in case of error. not in case of
399 * interruption.
400 */
401 if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
0f5c5d5c 402 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Viewer handshake failed");
4164020e
SM
403 goto error;
404 } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
405 goto end;
406 }
407
408 goto end;
7cdc2bab
MD
409
410error:
e5694f0e
SM
411 viewer_connection_close_socket(viewer_connection);
412
f79c2d7a 413end:
4164020e 414 return status;
7cdc2bab
MD
415}
416
bf35aaef
SM
417static int list_update_session(const bt2::ArrayValue results,
418 const struct lttng_viewer_session *session, bool *_found,
419 struct live_viewer_connection *viewer_connection)
7cdc2bab 420{
4164020e
SM
421 bool found = false;
422
bf35aaef
SM
423 for (const auto value : results) {
424 const auto map = value.asMap();
425 const auto hostnameVal = map["target-hostname"];
4164020e 426
bf35aaef 427 if (!hostnameVal) {
0f5c5d5c
SM
428 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
429 "Error borrowing \"target-hostname\" entry.");
bf35aaef 430 return -1;
4164020e 431 }
bf35aaef
SM
432
433 const auto sessionNameVal = map["session-name"];
434
435 if (!sessionNameVal) {
0f5c5d5c
SM
436 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
437 "Error borrowing \"session-name\" entry.");
bf35aaef 438 return -1;
4164020e 439 }
bf35aaef
SM
440
441 const auto hostname_str = hostnameVal->asString().value();
442 const auto session_name_str = sessionNameVal->asString().value();
4164020e
SM
443
444 if (strcmp(session->hostname, hostname_str) == 0 &&
445 strcmp(session->session_name, session_name_str) == 0) {
4164020e
SM
446 uint32_t streams = be32toh(session->streams);
447 uint32_t clients = be32toh(session->clients);
448
449 found = true;
450
bf35aaef
SM
451 const auto streamCountVal = map["stream-count"];
452
453 if (!streamCountVal) {
0f5c5d5c
SM
454 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
455 "Error borrowing \"stream-count\" entry.");
bf35aaef 456 return -1;
4164020e 457 }
bf35aaef
SM
458
459 auto val = streamCountVal->asUnsignedInteger().value();
460
4164020e
SM
461 /* sum */
462 val += streams;
bf35aaef 463 streamCountVal->asUnsignedInteger().value(val);
4164020e 464
bf35aaef
SM
465 const auto clientCountVal = map["client-count"];
466
467 if (!clientCountVal) {
0f5c5d5c
SM
468 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
469 "Error borrowing \"client-count\" entry.");
bf35aaef 470 return -1;
4164020e 471 }
bf35aaef
SM
472
473 val = clientCountVal->asUnsignedInteger().value();
474
4164020e 475 /* max */
bf35aaef
SM
476 val = std::max<uint64_t>(clients, val);
477 clientCountVal->asUnsignedInteger().value(val);
4164020e
SM
478 }
479
480 if (found) {
481 break;
482 }
483 }
bf35aaef 484
4164020e 485 *_found = found;
bf35aaef 486 return 0;
7cdc2bab
MD
487}
488
bf35aaef 489static int list_append_session(const bt2::ArrayValue results, const std::string& base_url,
4164020e
SM
490 const struct lttng_viewer_session *session,
491 struct live_viewer_connection *viewer_connection)
7cdc2bab 492{
4164020e 493 int ret = 0;
4164020e
SM
494 bool found = false;
495
496 /*
497 * If the session already exists, add the stream count to it,
498 * and do max of client counts.
499 */
500 ret = list_update_session(results, session, &found, viewer_connection);
501 if (ret || found) {
bf35aaef 502 return ret;
4164020e
SM
503 }
504
bf35aaef 505 const auto map = bt2::MapValue::create();
4164020e 506
30d81897 507 if (base_url.empty()) {
bf35aaef
SM
508 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Error: base_url empty.");
509 return -1;
4164020e 510 }
30d81897 511
4164020e
SM
512 /*
513 * key = "url",
514 * value = <string>,
515 */
bf35aaef
SM
516 map->insert("url",
517 fmt::format("{}/host/{}/{}", base_url, session->hostname, session->session_name));
4164020e
SM
518
519 /*
520 * key = "target-hostname",
521 * value = <string>,
522 */
bf35aaef 523 map->insert("target-hostname", session->hostname);
4164020e
SM
524
525 /*
526 * key = "session-name",
527 * value = <string>,
528 */
bf35aaef 529 map->insert("session-name", session->session_name);
4164020e
SM
530
531 /*
532 * key = "timer-us",
533 * value = <integer>,
534 */
535 {
536 uint32_t live_timer = be32toh(session->live_timer);
537
bf35aaef 538 map->insert("timer-us", (uint64_t) live_timer);
4164020e
SM
539 }
540
541 /*
542 * key = "stream-count",
543 * value = <integer>,
544 */
545 {
546 uint32_t streams = be32toh(session->streams);
547
bf35aaef 548 map->insert("stream-count", (uint64_t) streams);
4164020e
SM
549 }
550
551 /*
552 * key = "client-count",
553 * value = <integer>,
554 */
555 {
556 uint32_t clients = be32toh(session->clients);
557
bf35aaef 558 map->insert("client-count", (uint64_t) clients);
4164020e 559 }
14f28187 560
bf35aaef
SM
561 results.append(*map);
562 return 0;
7cdc2bab
MD
563}
564
565/*
566 * Data structure returned:
567 *
568 * {
569 * <array> = {
570 * [n] = {
571 * <map> = {
572 * {
573 * key = "url",
574 * value = <string>,
575 * },
576 * {
577 * key = "target-hostname",
578 * value = <string>,
579 * },
580 * {
581 * key = "session-name",
582 * value = <string>,
583 * },
584 * {
585 * key = "timer-us",
586 * value = <integer>,
587 * },
588 * {
589 * key = "stream-count",
590 * value = <integer>,
591 * },
592 * {
593 * key = "client-count",
594 * value = <integer>,
595 * },
596 * },
597 * }
598 * }
599 */
600
5ad6219b
SM
601bt2::Value::Shared
602live_viewer_connection_list_sessions(struct live_viewer_connection *viewer_connection)
7cdc2bab 603{
4164020e
SM
604 enum lttng_live_viewer_status viewer_status;
605 struct lttng_viewer_cmd cmd;
606 struct lttng_viewer_list_sessions list;
607 uint32_t i, sessions_count;
bf35aaef 608 auto result = bt2::ArrayValue::create();
4164020e 609
0f5c5d5c
SM
610 BT_CPPLOGD_SPEC(viewer_connection->logger, "Requesting list of sessions: cmd={}",
611 LTTNG_VIEWER_LIST_SESSIONS);
4164020e
SM
612
613 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
614 cmd.data_size = htobe64((uint64_t) 0);
615 cmd.cmd_version = htobe32(0);
616
617 viewer_status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
618 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
5ad6219b
SM
619 BT_CPPLOGE_APPEND_CAUSE_AND_THROW_SPEC(viewer_connection->logger, bt2::Error,
620 "Error sending list sessions command");
4164020e 621 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
5ad6219b 622 throw bt2c::TryAgain {};
4164020e
SM
623 }
624
625 viewer_status = lttng_live_recv(viewer_connection, &list, sizeof(list));
626 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
5ad6219b
SM
627 BT_CPPLOGE_APPEND_CAUSE_AND_THROW_SPEC(viewer_connection->logger, bt2::Error,
628 "Error receiving session list");
4164020e 629 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
5ad6219b 630 throw bt2c::TryAgain {};
4164020e
SM
631 }
632
633 sessions_count = be32toh(list.sessions_count);
634 for (i = 0; i < sessions_count; i++) {
635 struct lttng_viewer_session lsession;
636
637 viewer_status = lttng_live_recv(viewer_connection, &lsession, sizeof(lsession));
638 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
5ad6219b
SM
639 BT_CPPLOGE_APPEND_CAUSE_AND_THROW_SPEC(viewer_connection->logger, bt2::Error,
640 "Error receiving session:");
4164020e 641 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
5ad6219b 642 throw bt2c::TryAgain {};
4164020e
SM
643 }
644
645 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
646 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
bf35aaef 647 if (list_append_session(*result, viewer_connection->url, &lsession, viewer_connection)) {
5ad6219b
SM
648 BT_CPPLOGE_APPEND_CAUSE_AND_THROW_SPEC(viewer_connection->logger, bt2::Error,
649 "Error appending session");
4164020e
SM
650 }
651 }
652
5ad6219b 653 return result;
7cdc2bab
MD
654}
655
4164020e
SM
656static enum lttng_live_viewer_status
657lttng_live_query_session_ids(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 658{
4164020e
SM
659 struct lttng_viewer_cmd cmd;
660 struct lttng_viewer_list_sessions list;
661 struct lttng_viewer_session lsession;
662 uint32_t i, sessions_count;
663 uint64_t session_id;
664 enum lttng_live_viewer_status status;
d721bef8 665 live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection.get();
4164020e 666
0f5c5d5c
SM
667 BT_CPPLOGD_SPEC(viewer_connection->logger,
668 "Asking the relay daemon for the list of sessions: cmd={}",
669 LTTNG_VIEWER_LIST_SESSIONS);
4164020e
SM
670
671 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
672 cmd.data_size = htobe64((uint64_t) 0);
673 cmd.cmd_version = htobe32(0);
674
675 status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
676 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 677 viewer_handle_send_status(status, "list sessions command");
4164020e
SM
678 goto end;
679 }
680
681 status = lttng_live_recv(viewer_connection, &list, sizeof(list));
682 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 683 viewer_handle_recv_status(status, "session list reply");
4164020e
SM
684 goto end;
685 }
686
687 sessions_count = be32toh(list.sessions_count);
688 for (i = 0; i < sessions_count; i++) {
689 status = lttng_live_recv(viewer_connection, &lsession, sizeof(lsession));
690 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 691 viewer_handle_recv_status(status, "session reply");
4164020e
SM
692 goto end;
693 }
694 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
695 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
696 session_id = be64toh(lsession.id);
697
0f5c5d5c
SM
698 BT_CPPLOGI_SPEC(viewer_connection->logger,
699 "Adding session to internal list: "
700 "session-id={}, hostname=\"{}\", session-name=\"{}\"",
701 session_id, lsession.hostname, lsession.session_name);
4164020e
SM
702
703 if ((strncmp(lsession.session_name, viewer_connection->session_name->str,
704 LTTNG_VIEWER_NAME_MAX) == 0) &&
705 (strncmp(lsession.hostname, viewer_connection->target_hostname->str,
706 LTTNG_VIEWER_HOST_NAME_MAX) == 0)) {
707 if (lttng_live_add_session(lttng_live_msg_iter, session_id, lsession.hostname,
708 lsession.session_name)) {
0f5c5d5c
SM
709 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
710 "Failed to add live session");
4164020e
SM
711 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
712 goto end;
713 }
714 }
715 }
716
717 status = LTTNG_LIVE_VIEWER_STATUS_OK;
7cdc2bab 718
f79c2d7a 719end:
4164020e 720 return status;
7cdc2bab
MD
721}
722
4164020e
SM
723enum lttng_live_viewer_status
724lttng_live_create_viewer_session(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 725{
4164020e
SM
726 struct lttng_viewer_cmd cmd;
727 struct lttng_viewer_create_session_response resp;
728 enum lttng_live_viewer_status status;
d721bef8 729 live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection.get();
4164020e 730
0f5c5d5c
SM
731 BT_CPPLOGD_SPEC(viewer_connection->logger, "Creating a viewer session: cmd={}",
732 LTTNG_VIEWER_CREATE_SESSION);
4164020e
SM
733
734 cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
735 cmd.data_size = htobe64((uint64_t) 0);
736 cmd.cmd_version = htobe32(0);
737
738 status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
739 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 740 viewer_handle_send_status(status, "create session command");
4164020e
SM
741 goto end;
742 }
743
744 status = lttng_live_recv(viewer_connection, &resp, sizeof(resp));
745 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 746 viewer_handle_recv_status(status, "create session reply");
4164020e
SM
747 goto end;
748 }
749
750 if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
0f5c5d5c 751 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Error creating viewer session");
4164020e
SM
752 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
753 goto end;
754 }
755
756 status = lttng_live_query_session_ids(lttng_live_msg_iter);
757 if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
0f5c5d5c
SM
758 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
759 "Failed to query live viewer session ids");
4164020e
SM
760 goto end;
761 } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
762 goto end;
763 }
7cdc2bab 764
f79c2d7a 765end:
4164020e 766 return status;
7cdc2bab
MD
767}
768
4164020e
SM
769static enum lttng_live_viewer_status receive_streams(struct lttng_live_session *session,
770 uint32_t stream_count,
771 bt_self_message_iterator *self_msg_iter)
7cdc2bab 772{
4164020e
SM
773 uint32_t i;
774 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
775 enum lttng_live_viewer_status status;
d721bef8 776 live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection.get();
4164020e 777
0f5c5d5c 778 BT_CPPLOGI_SPEC(viewer_connection->logger, "Getting {} new streams", stream_count);
4164020e
SM
779 for (i = 0; i < stream_count; i++) {
780 struct lttng_viewer_stream stream;
781 struct lttng_live_stream_iterator *live_stream;
782 uint64_t stream_id;
783 uint64_t ctf_trace_id;
784
785 status = lttng_live_recv(viewer_connection, &stream, sizeof(stream));
786 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 787 viewer_handle_recv_status(status, "stream reply");
4164020e
SM
788 goto end;
789 }
790 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
791 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
792 stream_id = be64toh(stream.id);
793 ctf_trace_id = be64toh(stream.ctf_trace_id);
794
795 if (stream.metadata_flag) {
0f5c5d5c
SM
796 BT_CPPLOGI_SPEC(viewer_connection->logger, " metadata stream {} : {}/{}", stream_id,
797 stream.path_name, stream.channel_name);
44bd6303 798 if (lttng_live_metadata_create_stream(session, ctf_trace_id, stream_id)) {
0f5c5d5c
SM
799 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
800 "Error creating metadata stream");
4164020e
SM
801 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
802 goto end;
803 }
804 session->lazy_stream_msg_init = true;
805 } else {
0f5c5d5c
SM
806 BT_CPPLOGI_SPEC(viewer_connection->logger, " stream {} : {}/{}", stream_id,
807 stream.path_name, stream.channel_name);
4164020e
SM
808 live_stream =
809 lttng_live_stream_iterator_create(session, ctf_trace_id, stream_id, self_msg_iter);
810 if (!live_stream) {
0f5c5d5c 811 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Error creating stream");
4164020e
SM
812 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
813 goto end;
814 }
815 }
816 }
817 status = LTTNG_LIVE_VIEWER_STATUS_OK;
7cdc2bab 818
f79c2d7a 819end:
4164020e 820 return status;
7cdc2bab
MD
821}
822
4164020e
SM
823enum lttng_live_viewer_status lttng_live_session_attach(struct lttng_live_session *session,
824 bt_self_message_iterator *self_msg_iter)
7cdc2bab 825{
4164020e
SM
826 struct lttng_viewer_cmd cmd;
827 enum lttng_live_viewer_status status;
828 struct lttng_viewer_attach_session_request rq;
829 struct lttng_viewer_attach_session_response rp;
830 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
d721bef8 831 live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection.get();
4164020e
SM
832 uint64_t session_id = session->id;
833 uint32_t streams_count;
834 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
835 char cmd_buf[cmd_buf_len];
836
0f5c5d5c
SM
837 BT_CPPLOGD_SPEC(viewer_connection->logger,
838 "Attaching to session: cmd={}, session-id={}, seek={}",
839 LTTNG_VIEWER_ATTACH_SESSION, session_id, LTTNG_VIEWER_SEEK_LAST);
4164020e
SM
840
841 cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
842 cmd.data_size = htobe64((uint64_t) sizeof(rq));
843 cmd.cmd_version = htobe32(0);
844
845 memset(&rq, 0, sizeof(rq));
846 rq.session_id = htobe64(session_id);
847 // TODO: add cmd line parameter to select seek beginning
848 // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
849 rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
850
851 /*
852 * Merge the cmd and connection request to prevent a write-write
853 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
854 * second write to be performed quickly in presence of Nagle's algorithm.
855 */
856 memcpy(cmd_buf, &cmd, sizeof(cmd));
857 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
858 status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
859 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 860 viewer_handle_send_status(status, "attach session command");
4164020e
SM
861 goto end;
862 }
863
864 status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
865 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 866 viewer_handle_recv_status(status, "attach session reply");
4164020e
SM
867 goto end;
868 }
869
870 streams_count = be32toh(rp.streams_count);
871 switch (be32toh(rp.status)) {
872 case LTTNG_VIEWER_ATTACH_OK:
873 break;
874 case LTTNG_VIEWER_ATTACH_UNK:
0f5c5d5c
SM
875 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Session id {} is unknown",
876 session_id);
4164020e
SM
877 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
878 goto end;
879 case LTTNG_VIEWER_ATTACH_ALREADY:
0f5c5d5c
SM
880 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
881 "There is already a viewer attached to this session");
4164020e
SM
882 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
883 goto end;
884 case LTTNG_VIEWER_ATTACH_NOT_LIVE:
0f5c5d5c 885 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Not a live session");
4164020e
SM
886 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
887 goto end;
888 case LTTNG_VIEWER_ATTACH_SEEK_ERR:
0f5c5d5c 889 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Wrong seek parameter");
4164020e
SM
890 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
891 goto end;
892 default:
0f5c5d5c
SM
893 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Unknown attach return code {}",
894 be32toh(rp.status));
4164020e
SM
895 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
896 goto end;
897 }
898
899 /* We receive the initial list of streams. */
900 status = receive_streams(session, streams_count, self_msg_iter);
901 switch (status) {
902 case LTTNG_LIVE_VIEWER_STATUS_OK:
903 break;
904 case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
905 goto end;
906 case LTTNG_LIVE_VIEWER_STATUS_ERROR:
0f5c5d5c 907 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Error receiving streams");
4164020e
SM
908 goto end;
909 default:
910 bt_common_abort();
911 }
912
913 session->attached = true;
914 session->new_streams_needed = false;
7cdc2bab 915
eee8e741 916end:
4164020e 917 return status;
7cdc2bab
MD
918}
919
4164020e 920enum lttng_live_viewer_status lttng_live_session_detach(struct lttng_live_session *session)
7cdc2bab 921{
4164020e
SM
922 struct lttng_viewer_cmd cmd;
923 enum lttng_live_viewer_status status;
924 struct lttng_viewer_detach_session_request rq;
925 struct lttng_viewer_detach_session_response rp;
926 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
d721bef8 927 live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection.get();
4164020e
SM
928 uint64_t session_id = session->id;
929 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
930 char cmd_buf[cmd_buf_len];
931
932 /*
933 * The session might already be detached and the viewer socket might
934 * already been closed. This happens when calling this function when
935 * tearing down the graph after an error.
936 */
937 if (!session->attached || viewer_connection->control_sock == BT_INVALID_SOCKET) {
938 return LTTNG_LIVE_VIEWER_STATUS_OK;
939 }
940
0f5c5d5c
SM
941 BT_CPPLOGD_SPEC(viewer_connection->logger, "Detaching from session: cmd={}, session-id={}",
942 LTTNG_VIEWER_DETACH_SESSION, session_id);
4164020e
SM
943
944 cmd.cmd = htobe32(LTTNG_VIEWER_DETACH_SESSION);
945 cmd.data_size = htobe64((uint64_t) sizeof(rq));
946 cmd.cmd_version = htobe32(0);
947
948 memset(&rq, 0, sizeof(rq));
949 rq.session_id = htobe64(session_id);
950
951 /*
952 * Merge the cmd and connection request to prevent a write-write
953 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
954 * second write to be performed quickly in presence of Nagle's algorithm.
955 */
956 memcpy(cmd_buf, &cmd, sizeof(cmd));
957 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
958 status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
959 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 960 viewer_handle_send_status(status, "detach session command");
4164020e
SM
961 goto end;
962 }
963
964 status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
965 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 966 viewer_handle_recv_status(status, "detach session reply");
4164020e
SM
967 goto end;
968 }
969
970 switch (be32toh(rp.status)) {
971 case LTTNG_VIEWER_DETACH_SESSION_OK:
972 break;
973 case LTTNG_VIEWER_DETACH_SESSION_UNK:
0f5c5d5c 974 BT_CPPLOGW_SPEC(viewer_connection->logger, "Session id {} is unknown", session_id);
4164020e
SM
975 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
976 goto end;
977 case LTTNG_VIEWER_DETACH_SESSION_ERR:
0f5c5d5c 978 BT_CPPLOGW_SPEC(viewer_connection->logger, "Error detaching session id {}", session_id);
4164020e
SM
979 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
980 goto end;
981 default:
0f5c5d5c
SM
982 BT_CPPLOGE_SPEC(viewer_connection->logger, "Unknown detach return code {}",
983 be32toh(rp.status));
4164020e
SM
984 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
985 goto end;
986 }
987
988 session->attached = false;
989
990 status = LTTNG_LIVE_VIEWER_STATUS_OK;
7cdc2bab 991
f79c2d7a 992end:
4164020e 993 return status;
7cdc2bab
MD
994}
995
4164020e 996enum lttng_live_get_one_metadata_status
15fcc425 997lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, std::vector<char>& buf)
7cdc2bab 998{
4164020e
SM
999 uint64_t len = 0;
1000 enum lttng_live_get_one_metadata_status status;
1001 enum lttng_live_viewer_status viewer_status;
1002 struct lttng_viewer_cmd cmd;
1003 struct lttng_viewer_get_metadata rq;
1004 struct lttng_viewer_metadata_packet rp;
76edb16f 1005 std::vector<char> data;
4164020e
SM
1006 struct lttng_live_session *session = trace->session;
1007 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
e66be7c3 1008 struct lttng_live_metadata *metadata = trace->metadata.get();
d721bef8 1009 live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection.get();
4164020e
SM
1010 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1011 char cmd_buf[cmd_buf_len];
1012
0f5c5d5c
SM
1013 BT_CPPLOGD_SPEC(viewer_connection->logger,
1014 "Requesting new metadata for trace:"
1015 "cmd={}, trace-id={}, metadata-stream-id={}",
1016 LTTNG_VIEWER_GET_METADATA, trace->id, metadata->stream_id);
4164020e
SM
1017
1018 rq.stream_id = htobe64(metadata->stream_id);
1019 cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
1020 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1021 cmd.cmd_version = htobe32(0);
1022
1023 /*
1024 * Merge the cmd and connection request to prevent a write-write
1025 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1026 * second write to be performed quickly in presence of Nagle's algorithm.
1027 */
1028 memcpy(cmd_buf, &cmd, sizeof(cmd));
1029 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1030 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1031 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 1032 viewer_handle_send_status(viewer_status, "get metadata command");
4164020e
SM
1033 status = (enum lttng_live_get_one_metadata_status) viewer_status;
1034 goto end;
1035 }
1036
1037 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1038 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 1039 viewer_handle_recv_status(viewer_status, "get metadata reply");
4164020e
SM
1040 status = (enum lttng_live_get_one_metadata_status) viewer_status;
1041 goto end;
1042 }
1043
1044 switch (be32toh(rp.status)) {
1045 case LTTNG_VIEWER_METADATA_OK:
0f5c5d5c 1046 BT_CPPLOGD_SPEC(viewer_connection->logger, "Received get_metadata response: ok");
4164020e
SM
1047 break;
1048 case LTTNG_VIEWER_NO_NEW_METADATA:
0f5c5d5c 1049 BT_CPPLOGD_SPEC(viewer_connection->logger, "Received get_metadata response: no new");
4164020e
SM
1050 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_END;
1051 goto end;
1052 case LTTNG_VIEWER_METADATA_ERR:
1053 /*
1054 * The Relayd cannot find this stream id. Maybe its
1055 * gone already. This can happen in short lived UST app
1056 * in a per-pid session.
1057 */
0f5c5d5c 1058 BT_CPPLOGD_SPEC(viewer_connection->logger, "Received get_metadata response: error");
4164020e
SM
1059 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_CLOSED;
1060 goto end;
1061 default:
0f5c5d5c
SM
1062 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
1063 "Received get_metadata response: unknown");
4164020e
SM
1064 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
1065 goto end;
1066 }
1067
1068 len = be64toh(rp.len);
c5ce3927
FD
1069 if (len == 0) {
1070 /*
1071 * We received a `LTTNG_VIEWER_METADATA_OK` with a packet
1072 * length of 0. This means we must try again. This scenario
1073 * arises when a clear command is performed on an lttng session.
1074 */
0f5c5d5c
SM
1075 BT_CPPLOGD_SPEC(
1076 viewer_connection->logger,
c5ce3927
FD
1077 "Expecting a metadata packet of size 0. Retry to get a packet from the relay.");
1078 goto empty_metadata_packet_retry;
1079 }
1080
0f5c5d5c 1081 BT_CPPLOGD_SPEC(viewer_connection->logger, "Writing {} bytes to metadata", len);
4164020e 1082 if (len <= 0) {
0f5c5d5c 1083 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Erroneous response length");
4164020e
SM
1084 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
1085 goto end;
1086 }
1087
76edb16f 1088 data.resize(len);
4164020e 1089
76edb16f 1090 viewer_status = lttng_live_recv(viewer_connection, data.data(), len);
4164020e 1091 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 1092 viewer_handle_recv_status(viewer_status, "get metadata packet");
4164020e
SM
1093 status = (enum lttng_live_get_one_metadata_status) viewer_status;
1094 goto end;
1095 }
1096
1097 /*
1098 * Write the metadata to the file handle.
1099 */
76edb16f 1100 buf.insert(buf.end(), data.begin(), data.end());
4164020e 1101
c5ce3927 1102empty_metadata_packet_retry:
4164020e 1103 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_OK;
c28512ab 1104
c28512ab 1105end:
4164020e 1106 return status;
7cdc2bab
MD
1107}
1108
1109/*
1110 * Assign the fields from a lttng_viewer_index to a packet_index.
1111 */
4164020e
SM
1112static void lttng_index_to_packet_index(struct lttng_viewer_index *lindex,
1113 struct packet_index *pindex)
7cdc2bab 1114{
4164020e
SM
1115 BT_ASSERT(lindex);
1116 BT_ASSERT(pindex);
1117
1118 pindex->offset = be64toh(lindex->offset);
1119 pindex->packet_size = be64toh(lindex->packet_size);
1120 pindex->content_size = be64toh(lindex->content_size);
1121 pindex->ts_cycles.timestamp_begin = be64toh(lindex->timestamp_begin);
1122 pindex->ts_cycles.timestamp_end = be64toh(lindex->timestamp_end);
1123 pindex->events_discarded = be64toh(lindex->events_discarded);
7cdc2bab
MD
1124}
1125
4164020e 1126static void lttng_live_need_new_streams(struct lttng_live_msg_iter *lttng_live_msg_iter)
36e94ad6 1127{
751aaa62 1128 for (const auto& session : lttng_live_msg_iter->sessions) {
0f5c5d5c
SM
1129 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
1130 "Marking session as needing new streams: "
1131 "session-id={}",
1132 session->id);
4164020e
SM
1133 session->new_streams_needed = true;
1134 }
36e94ad6
FD
1135}
1136
4164020e
SM
1137enum lttng_live_iterator_status
1138lttng_live_get_next_index(struct lttng_live_msg_iter *lttng_live_msg_iter,
1139 struct lttng_live_stream_iterator *stream, struct packet_index *index)
7cdc2bab 1140{
4164020e
SM
1141 struct lttng_viewer_cmd cmd;
1142 struct lttng_viewer_get_next_index rq;
1143 enum lttng_live_viewer_status viewer_status;
1144 struct lttng_viewer_index rp;
1145 enum lttng_live_iterator_status status;
d721bef8 1146 live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection.get();
4164020e
SM
1147 struct lttng_live_trace *trace = stream->trace;
1148 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1149 char cmd_buf[cmd_buf_len];
1150 uint32_t flags, rp_status;
1151
0f5c5d5c
SM
1152 BT_CPPLOGD_SPEC(viewer_connection->logger,
1153 "Requesting next index for stream: cmd={}, "
1154 "viewer-stream-id={}",
1155 LTTNG_VIEWER_GET_NEXT_INDEX, stream->viewer_stream_id);
4164020e
SM
1156 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
1157 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1158 cmd.cmd_version = htobe32(0);
1159
1160 memset(&rq, 0, sizeof(rq));
1161 rq.stream_id = htobe64(stream->viewer_stream_id);
1162
1163 /*
1164 * Merge the cmd and connection request to prevent a write-write
1165 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1166 * second write to be performed quickly in presence of Nagle's algorithm.
1167 */
1168 memcpy(cmd_buf, &cmd, sizeof(cmd));
1169 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1170
1171 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1172 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 1173 viewer_handle_send_status(viewer_status, "get next index command");
4164020e
SM
1174 goto error;
1175 }
1176
1177 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1178 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 1179 viewer_handle_recv_status(viewer_status, "get next index reply");
4164020e
SM
1180 goto error;
1181 }
1182
1183 flags = be32toh(rp.flags);
1184 rp_status = be32toh(rp.status);
1185
0f5c5d5c
SM
1186 BT_CPPLOGD_SPEC(
1187 viewer_connection->logger, "Received response from relay daemon: cmd=%s, response={}",
1188 LTTNG_VIEWER_GET_NEXT_INDEX, static_cast<lttng_viewer_next_index_return_code>(rp_status));
ca622546
JG
1189
1190 if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
0f5c5d5c
SM
1191 BT_CPPLOGD_SPEC(viewer_connection->logger,
1192 "Marking all sessions as possibly needing new streams: "
1193 "response={}, response-flag=NEW_STREAM",
1194 static_cast<lttng_viewer_next_index_return_code>(rp_status));
ca622546
JG
1195 lttng_live_need_new_streams(lttng_live_msg_iter);
1196 }
1197
4164020e
SM
1198 switch (rp_status) {
1199 case LTTNG_VIEWER_INDEX_INACTIVE:
1200 {
1201 uint64_t ctf_stream_class_id;
1202
1203 memset(index, 0, sizeof(struct packet_index));
1204 index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
1205 stream->current_inactivity_ts = index->ts_cycles.timestamp_end;
1206 ctf_stream_class_id = be64toh(rp.stream_id);
1207 if (stream->ctf_stream_class_id.is_set) {
1208 BT_ASSERT(stream->ctf_stream_class_id.value == ctf_stream_class_id);
1209 } else {
1210 stream->ctf_stream_class_id.value = ctf_stream_class_id;
1211 stream->ctf_stream_class_id.is_set = true;
1212 }
1213 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_QUIESCENT);
1214 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1215 break;
1216 }
1217 case LTTNG_VIEWER_INDEX_OK:
1218 {
1219 uint64_t ctf_stream_class_id;
1220
1221 lttng_index_to_packet_index(&rp, index);
1222 ctf_stream_class_id = be64toh(rp.stream_id);
1223 if (stream->ctf_stream_class_id.is_set) {
1224 BT_ASSERT(stream->ctf_stream_class_id.value == ctf_stream_class_id);
1225 } else {
1226 stream->ctf_stream_class_id.value = ctf_stream_class_id;
1227 stream->ctf_stream_class_id.is_set = true;
1228 }
4164020e
SM
1229 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_DATA);
1230
1231 if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
0f5c5d5c
SM
1232 BT_CPPLOGD_SPEC(viewer_connection->logger,
1233 "Marking trace as needing new metadata: "
1234 "response={}, response-flag=NEW_METADATA, trace-id={}",
1235 static_cast<lttng_viewer_next_index_return_code>(rp_status), trace->id);
4164020e
SM
1236 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
1237 }
4164020e
SM
1238 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1239 break;
1240 }
1241 case LTTNG_VIEWER_INDEX_RETRY:
1242 memset(index, 0, sizeof(struct packet_index));
1243 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
1244 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
1245 goto end;
1246 case LTTNG_VIEWER_INDEX_HUP:
1247 memset(index, 0, sizeof(struct packet_index));
1248 index->offset = EOF;
1249 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_EOF);
1250 stream->has_stream_hung_up = true;
1251 status = LTTNG_LIVE_ITERATOR_STATUS_END;
1252 break;
1253 case LTTNG_VIEWER_INDEX_ERR:
1254 memset(index, 0, sizeof(struct packet_index));
1255 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
1256 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1257 goto end;
1258 default:
0f5c5d5c
SM
1259 BT_CPPLOGD_SPEC(viewer_connection->logger,
1260 "Received get_next_index response: unknown value");
4164020e
SM
1261 memset(index, 0, sizeof(struct packet_index));
1262 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
1263 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1264 goto end;
1265 }
ca622546 1266
4164020e 1267 goto end;
7cdc2bab
MD
1268
1269error:
4164020e 1270 status = viewer_status_to_live_iterator_status(viewer_status);
f79c2d7a 1271end:
4164020e 1272 return status;
7cdc2bab
MD
1273}
1274
4164020e
SM
1275enum ctf_msg_iter_medium_status
1276lttng_live_get_stream_bytes(struct lttng_live_msg_iter *lttng_live_msg_iter,
1277 struct lttng_live_stream_iterator *stream, uint8_t *buf,
1278 uint64_t offset, uint64_t req_len, uint64_t *recv_len)
7cdc2bab 1279{
4164020e
SM
1280 enum ctf_msg_iter_medium_status status;
1281 enum lttng_live_viewer_status viewer_status;
1282 struct lttng_viewer_trace_packet rp;
1283 struct lttng_viewer_cmd cmd;
1284 struct lttng_viewer_get_packet rq;
d721bef8 1285 live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection.get();
4164020e
SM
1286 struct lttng_live_trace *trace = stream->trace;
1287 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1288 char cmd_buf[cmd_buf_len];
1289 uint32_t flags, rp_status;
1290
0f5c5d5c
SM
1291 BT_CPPLOGD_SPEC(viewer_connection->logger,
1292 "Requesting data from stream: cmd={}, "
1293 "offset={}, request-len={}",
1294 LTTNG_VIEWER_GET_PACKET, offset, req_len);
4164020e
SM
1295
1296 cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
1297 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1298 cmd.cmd_version = htobe32(0);
1299
1300 memset(&rq, 0, sizeof(rq));
1301 rq.stream_id = htobe64(stream->viewer_stream_id);
1302 rq.offset = htobe64(offset);
1303 rq.len = htobe32(req_len);
1304
1305 /*
1306 * Merge the cmd and connection request to prevent a write-write
1307 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1308 * second write to be performed quickly in presence of Nagle's algorithm.
1309 */
1310 memcpy(cmd_buf, &cmd, sizeof(cmd));
1311 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1312
1313 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1314 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 1315 viewer_handle_send_status(viewer_status, "get data packet command");
4164020e
SM
1316 goto error_convert_status;
1317 }
1318
1319 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1320 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 1321 viewer_handle_recv_status(viewer_status, "get data packet reply");
4164020e
SM
1322 goto error_convert_status;
1323 }
1324
1325 flags = be32toh(rp.flags);
1326 rp_status = be32toh(rp.status);
1327
0f5c5d5c
SM
1328 BT_CPPLOGD_SPEC(
1329 viewer_connection->logger, "Received response from relay daemon: cmd={}, response={}",
1330 LTTNG_VIEWER_GET_PACKET, static_cast<lttng_viewer_get_packet_return_code>(rp_status));
4164020e
SM
1331 switch (rp_status) {
1332 case LTTNG_VIEWER_GET_PACKET_OK:
1333 req_len = be32toh(rp.len);
0f5c5d5c
SM
1334 BT_CPPLOGD_SPEC(viewer_connection->logger,
1335 "Got packet from relay daemon: response={}, packet-len={}",
1336 static_cast<lttng_viewer_get_packet_return_code>(rp_status), req_len);
4164020e
SM
1337 break;
1338 case LTTNG_VIEWER_GET_PACKET_RETRY:
1339 /* Unimplemented by relay daemon */
1340 status = CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
1341 goto end;
1342 case LTTNG_VIEWER_GET_PACKET_ERR:
1343 if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
0f5c5d5c
SM
1344 BT_CPPLOGD_SPEC(viewer_connection->logger,
1345 "Marking trace as needing new metadata: "
1346 "response={}, response-flag=NEW_METADATA, trace-id={}",
1347 static_cast<lttng_viewer_get_packet_return_code>(rp_status), trace->id);
4164020e
SM
1348 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
1349 }
1350 if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
0f5c5d5c
SM
1351 BT_CPPLOGD_SPEC(viewer_connection->logger,
1352 "Marking all sessions as possibly needing new streams: "
1353 "response={}, response-flag=NEW_STREAM",
1354 static_cast<lttng_viewer_get_packet_return_code>(rp_status));
4164020e
SM
1355 lttng_live_need_new_streams(lttng_live_msg_iter);
1356 }
1357 if (flags & (LTTNG_VIEWER_FLAG_NEW_METADATA | LTTNG_VIEWER_FLAG_NEW_STREAM)) {
1358 status = CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
0f5c5d5c
SM
1359 BT_CPPLOGD_SPEC(viewer_connection->logger,
1360 "Reply with any one flags set means we should retry: response={}",
1361 static_cast<lttng_viewer_get_packet_return_code>(rp_status));
4164020e
SM
1362 goto end;
1363 }
0f5c5d5c
SM
1364 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
1365 "Received get_data_packet response: error");
4164020e
SM
1366 status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
1367 goto end;
1368 case LTTNG_VIEWER_GET_PACKET_EOF:
1369 status = CTF_MSG_ITER_MEDIUM_STATUS_EOF;
1370 goto end;
1371 default:
0f5c5d5c
SM
1372 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
1373 "Received get_data_packet response: unknown ({})", rp_status);
4164020e
SM
1374 status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
1375 goto end;
1376 }
1377
1378 if (req_len == 0) {
1379 status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
1380 goto end;
1381 }
1382
1383 viewer_status = lttng_live_recv(viewer_connection, buf, req_len);
1384 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 1385 viewer_handle_recv_status(viewer_status, "get data packet");
4164020e
SM
1386 goto error_convert_status;
1387 }
1388 *recv_len = req_len;
1389
1390 status = CTF_MSG_ITER_MEDIUM_STATUS_OK;
1391 goto end;
f79c2d7a 1392
535fb48a 1393error_convert_status:
4164020e 1394 status = viewer_status_to_ctf_msg_iter_medium_status(viewer_status);
f79c2d7a 1395end:
4164020e 1396 return status;
7cdc2bab
MD
1397}
1398
1399/*
1400 * Request new streams for a session.
1401 */
4164020e
SM
1402enum lttng_live_iterator_status
1403lttng_live_session_get_new_streams(struct lttng_live_session *session,
1404 bt_self_message_iterator *self_msg_iter)
7cdc2bab 1405{
4164020e
SM
1406 enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1407 struct lttng_viewer_cmd cmd;
1408 struct lttng_viewer_new_streams_request rq;
1409 struct lttng_viewer_new_streams_response rp;
1410 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
1411 enum lttng_live_viewer_status viewer_status;
d721bef8 1412 live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection.get();
4164020e
SM
1413 uint32_t streams_count;
1414 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1415 char cmd_buf[cmd_buf_len];
1416
1417 if (!session->new_streams_needed) {
1418 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1419 goto end;
1420 }
1421
0f5c5d5c
SM
1422 BT_CPPLOGD_SPEC(viewer_connection->logger,
1423 "Requesting new streams for session: cmd={}, session-id={}",
1424 LTTNG_VIEWER_GET_NEW_STREAMS, session->id);
4164020e
SM
1425
1426 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
1427 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1428 cmd.cmd_version = htobe32(0);
1429
1430 memset(&rq, 0, sizeof(rq));
1431 rq.session_id = htobe64(session->id);
1432
1433 /*
1434 * Merge the cmd and connection request to prevent a write-write
1435 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1436 * second write to be performed quickly in presence of Nagle's algorithm.
1437 */
1438 memcpy(cmd_buf, &cmd, sizeof(cmd));
1439 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1440
1441 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1442 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 1443 viewer_handle_send_status(viewer_status, "get new streams command");
4164020e
SM
1444 status = viewer_status_to_live_iterator_status(viewer_status);
1445 goto end;
1446 }
1447
1448 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1449 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 1450 viewer_handle_recv_status(viewer_status, "get new streams reply");
4164020e
SM
1451 status = viewer_status_to_live_iterator_status(viewer_status);
1452 goto end;
1453 }
1454
1455 streams_count = be32toh(rp.streams_count);
1456
1457 switch (be32toh(rp.status)) {
1458 case LTTNG_VIEWER_NEW_STREAMS_OK:
1459 session->new_streams_needed = false;
1460 break;
1461 case LTTNG_VIEWER_NEW_STREAMS_NO_NEW:
1462 session->new_streams_needed = false;
1463 goto end;
1464 case LTTNG_VIEWER_NEW_STREAMS_HUP:
1465 session->new_streams_needed = false;
1466 session->closed = true;
1467 status = LTTNG_LIVE_ITERATOR_STATUS_END;
1468 goto end;
1469 case LTTNG_VIEWER_NEW_STREAMS_ERR:
0f5c5d5c 1470 BT_CPPLOGD_SPEC(viewer_connection->logger, "Received get_new_streams response: error");
4164020e
SM
1471 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1472 goto end;
1473 default:
0f5c5d5c
SM
1474 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
1475 "Received get_new_streams response: Unknown:"
1476 "return code {}",
1477 be32toh(rp.status));
4164020e
SM
1478 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1479 goto end;
1480 }
1481
1482 viewer_status = receive_streams(session, streams_count, self_msg_iter);
1483 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 1484 viewer_handle_recv_status(viewer_status, "new streams");
4164020e
SM
1485 status = viewer_status_to_live_iterator_status(viewer_status);
1486 goto end;
1487 }
1488
1489 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
f79c2d7a 1490end:
4164020e 1491 return status;
7cdc2bab
MD
1492}
1493
d721bef8
SM
1494enum lttng_live_viewer_status
1495live_viewer_connection_create(const char *url, bool in_query,
1496 struct lttng_live_msg_iter *lttng_live_msg_iter,
1497 const bt2c::Logger& parentLogger, live_viewer_connection::UP& viewer)
7cdc2bab 1498{
d721bef8 1499 auto viewer_connection = bt2s::make_unique<live_viewer_connection>(parentLogger);
4164020e 1500
0f5c5d5c
SM
1501 if (bt_socket_init(viewer_connection->logger) != 0) {
1502 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Failed to init socket");
d721bef8 1503 return LTTNG_LIVE_VIEWER_STATUS_ERROR;
4164020e
SM
1504 }
1505
4164020e
SM
1506 viewer_connection->control_sock = BT_INVALID_SOCKET;
1507 viewer_connection->port = -1;
1508 viewer_connection->in_query = in_query;
1509 viewer_connection->lttng_live_msg_iter = lttng_live_msg_iter;
30d81897 1510 viewer_connection->url = url;
4164020e 1511
0f5c5d5c 1512 BT_CPPLOGD_SPEC(viewer_connection->logger, "Establishing connection to url \"{}\"...", url);
d721bef8
SM
1513 const auto status = lttng_live_connect_viewer(viewer_connection.get());
1514
4164020e
SM
1515 /*
1516 * Only print error and append cause in case of error. not in case of
1517 * interruption.
1518 */
1519 if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
0f5c5d5c
SM
1520 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
1521 "Failed to establish connection: "
1522 "url=\"{}\"",
1523 url);
d721bef8 1524 return status;
4164020e 1525 } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
d721bef8 1526 return status;
4164020e 1527 }
0f5c5d5c 1528 BT_CPPLOGD_SPEC(viewer_connection->logger, "Connection to url \"{}\" is established", url);
4164020e 1529
d721bef8
SM
1530 viewer = std::move(viewer_connection);
1531 return LTTNG_LIVE_VIEWER_STATUS_OK;
7cdc2bab
MD
1532}
1533
277bcb7f 1534live_viewer_connection::~live_viewer_connection()
7cdc2bab 1535{
277bcb7f 1536 BT_CPPLOGD_SPEC(this->logger, "Closing connection to relay: relay-url=\"{}\"", this->url);
b9e6ec43 1537
e5694f0e 1538 viewer_connection_close_socket(this);
1cb3cdd7 1539
4164020e 1540 bt_socket_fini();
7cdc2bab 1541}
This page took 0.177703 seconds and 4 git commands to generate.