src.ctf.lttng-live: make queries report errors with exceptions
[babeltrace.git] / src / plugins / ctf / lttng-live / viewer-connection.cpp
CommitLineData
7cdc2bab 1/*
0235b0db 2 * SPDX-License-Identifier: MIT
7cdc2bab 3 *
0235b0db
MJ
4 * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
5 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
7cdc2bab
MD
6 */
7
c802cacb 8#include <glib.h>
7cdc2bab 9#include <stdint.h>
3c22a242 10#include <stdio.h>
3c22a242 11
c802cacb
SM
12#include <babeltrace2/babeltrace.h>
13
578e048b 14#include "common/common.h"
83ad336c 15#include "compat/endian.h" /* IWYU pragma: keep */
d721bef8 16#include "cpp-common/bt2s/make-unique.hpp"
7cdc2bab 17
c802cacb 18#include "data-stream.hpp"
087cd0f5 19#include "lttng-live.hpp"
087cd0f5 20#include "lttng-viewer-abi.hpp"
087cd0f5 21#include "metadata.hpp"
c802cacb 22#include "viewer-connection.hpp"
7cdc2bab 23
/*
 * Reacts to the non-OK status `_status` of a send or receive operation:
 *
 * - interrupted: nothing to do;
 * - error: appends an error cause built from the `_action` and
 *   `_msg_str` string literals;
 * - anything else: aborts.
 *
 * `_status` is evaluated exactly once. The expansion refers to a
 * `viewer_connection` variable which must exist in the calling scope.
 */
#define viewer_handle_send_recv_status(_status, _action, _msg_str)                                 \
    do {                                                                                           \
        const auto handled_status_ = (_status);                                                    \
        if (handled_status_ == LTTNG_LIVE_VIEWER_STATUS_ERROR) {                                   \
            BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,                                \
                                         "Error " _action " " _msg_str);                           \
        } else if (handled_status_ != LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {                      \
            bt_common_abort();                                                                     \
        }                                                                                          \
    } while (0)
37
0f5c5d5c
SM
/* Handles the non-OK status of a send operation ("Error sending ..."). */
#define viewer_handle_send_status(_status, _msg_str)                                               \
    viewer_handle_send_recv_status(_status, "sending", _msg_str)
4164020e 40
0f5c5d5c
SM
/* Handles the non-OK status of a receive operation ("Error receiving ..."). */
#define viewer_handle_recv_status(_status, _msg_str)                                               \
    viewer_handle_send_recv_status(_status, "receiving", _msg_str)
4164020e 43
/*
 * Appends an error cause made of `_msg`, followed by ": ", the current
 * socket error message (bt_socket_errormsg()), and then `_fmt` formatted
 * with the optional extra arguments.
 *
 * `_msg` and `_fmt` must be string literals. The expansion refers to a
 * `viewer_connection` variable which must exist in the calling scope.
 */
#define LTTNG_LIVE_CPPLOGE_APPEND_CAUSE_ERRNO(_msg, _fmt, ...)                                     \
    do {                                                                                           \
        BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,                                    \
                                     _msg ": {}" _fmt,                                             \
                                     bt_socket_errormsg(),                                         \
                                     ##__VA_ARGS__);                                               \
    } while (0)
49
4164020e
SM
50static inline enum lttng_live_iterator_status
51viewer_status_to_live_iterator_status(enum lttng_live_viewer_status viewer_status)
f79c2d7a 52{
4164020e
SM
53 switch (viewer_status) {
54 case LTTNG_LIVE_VIEWER_STATUS_OK:
55 return LTTNG_LIVE_ITERATOR_STATUS_OK;
56 case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
57 return LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
58 case LTTNG_LIVE_VIEWER_STATUS_ERROR:
59 return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
60 }
61
62 bt_common_abort();
f79c2d7a
FD
63}
64
4164020e
SM
65static inline enum ctf_msg_iter_medium_status
66viewer_status_to_ctf_msg_iter_medium_status(enum lttng_live_viewer_status viewer_status)
f79c2d7a 67{
4164020e
SM
68 switch (viewer_status) {
69 case LTTNG_LIVE_VIEWER_STATUS_OK:
70 return CTF_MSG_ITER_MEDIUM_STATUS_OK;
71 case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
72 return CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
73 case LTTNG_LIVE_VIEWER_STATUS_ERROR:
74 return CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
75 }
76
77 bt_common_abort();
f79c2d7a
FD
78}
79
4164020e 80static inline void viewer_connection_close_socket(struct live_viewer_connection *viewer_connection)
b197ca37 81{
4164020e
SM
82 int ret = bt_socket_close(viewer_connection->control_sock);
83 if (ret == -1) {
0f5c5d5c
SM
84 BT_CPPLOGW_ERRNO_SPEC(viewer_connection->logger,
85 "Error closing viewer connection socket: ", ".");
4164020e
SM
86 }
87
88 viewer_connection->control_sock = BT_INVALID_SOCKET;
b197ca37
FD
89}
90
f79c2d7a
FD
91/*
92 * This function receives a message from the Relay daemon.
93 * If it received the entire message, it returns _OK,
94 * If it's interrupted, it returns _INTERRUPTED,
95 * otherwise, it returns _ERROR.
96 */
4164020e
SM
97static enum lttng_live_viewer_status
98lttng_live_recv(struct live_viewer_connection *viewer_connection, void *buf, size_t len)
7cdc2bab 99{
4164020e 100 ssize_t received;
4164020e
SM
101 size_t total_received = 0, to_receive = len;
102 struct lttng_live_msg_iter *lttng_live_msg_iter = viewer_connection->lttng_live_msg_iter;
103 enum lttng_live_viewer_status status;
104 BT_SOCKET sock = viewer_connection->control_sock;
105
106 /*
107 * Receive a message from the Relay.
108 */
109 do {
110 received = bt_socket_recv(sock, (char *) buf + total_received, to_receive, 0);
111 if (received == BT_SOCKET_ERROR) {
112 if (bt_socket_interrupted()) {
113 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
114 /*
115 * This interruption was due to a
116 * SIGINT and the graph is being torn
117 * down.
118 */
119 status = LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED;
120 lttng_live_msg_iter->was_interrupted = true;
121 goto end;
122 } else {
123 /*
124 * A signal was received, but the graph
125 * is not being torn down. Carry on.
126 */
127 continue;
128 }
129 } else {
130 /*
131 * For any other types of socket error, close
132 * the socket and return an error.
133 */
0f5c5d5c 134 LTTNG_LIVE_CPPLOGE_APPEND_CAUSE_ERRNO("Error receiving from Relay", ".");
4164020e
SM
135
136 viewer_connection_close_socket(viewer_connection);
137 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
138 goto end;
139 }
140 } else if (received == 0) {
141 /*
142 * The recv() call returned 0. This means the
143 * connection was orderly shutdown from the other peer.
144 * If that happens when we are trying to receive
145 * a message from it, it means something when wrong.
146 * Close the socket and return an error.
147 */
0f5c5d5c
SM
148 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
149 "Remote side has closed connection");
4164020e
SM
150 viewer_connection_close_socket(viewer_connection);
151 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
152 goto end;
153 }
154
155 BT_ASSERT(received <= to_receive);
156 total_received += received;
157 to_receive -= received;
158
159 } while (to_receive > 0);
160
161 BT_ASSERT(total_received == len);
162 status = LTTNG_LIVE_VIEWER_STATUS_OK;
f79c2d7a
FD
163
164end:
4164020e 165 return status;
7cdc2bab
MD
166}
167
f79c2d7a
FD
168/*
169 * This function sends a message to the Relay daemon.
170 * If it send the message, it returns _OK,
171 * If it's interrupted, it returns _INTERRUPTED,
172 * otherwise, it returns _ERROR.
173 */
4164020e
SM
174static enum lttng_live_viewer_status
175lttng_live_send(struct live_viewer_connection *viewer_connection, const void *buf, size_t len)
7cdc2bab 176{
4164020e 177 enum lttng_live_viewer_status status;
4164020e
SM
178 struct lttng_live_msg_iter *lttng_live_msg_iter = viewer_connection->lttng_live_msg_iter;
179 BT_SOCKET sock = viewer_connection->control_sock;
180 size_t to_send = len;
181 ssize_t total_sent = 0;
182
183 do {
184 ssize_t sent = bt_socket_send_nosigpipe(sock, (char *) buf + total_sent, to_send);
185 if (sent == BT_SOCKET_ERROR) {
186 if (bt_socket_interrupted()) {
187 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
188 /*
189 * This interruption was a SIGINT and
190 * the graph is being teared down.
191 */
192 status = LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED;
193 lttng_live_msg_iter->was_interrupted = true;
194 goto end;
195 } else {
196 /*
197 * A signal was received, but the graph
198 * is not being teared down. Carry on.
199 */
200 continue;
201 }
202 } else {
203 /*
204 * For any other types of socket error, close
205 * the socket and return an error.
206 */
0f5c5d5c 207 LTTNG_LIVE_CPPLOGE_APPEND_CAUSE_ERRNO("Error sending to Relay", ".");
4164020e
SM
208
209 viewer_connection_close_socket(viewer_connection);
210 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
211 goto end;
212 }
213 }
214
215 BT_ASSERT(sent <= to_send);
216 total_sent += sent;
217 to_send -= sent;
218
219 } while (to_send > 0);
220
221 BT_ASSERT(total_sent == len);
222 status = LTTNG_LIVE_VIEWER_STATUS_OK;
f79c2d7a
FD
223
224end:
4164020e 225 return status;
7cdc2bab
MD
226}
227
4164020e 228static int parse_url(struct live_viewer_connection *viewer_connection)
7cdc2bab 229{
4164020e 230 char error_buf[256] = {0};
dd420a9b 231 struct bt_common_lttng_live_url_parts lttng_live_url_parts = {};
9d0b798a 232 bt_common_lttng_live_url_parts_deleter partsDeleter {lttng_live_url_parts};
4164020e 233 int ret = -1;
4164020e 234
30d81897 235 if (viewer_connection->url.empty()) {
4164020e
SM
236 goto end;
237 }
238
30d81897
SM
239 lttng_live_url_parts = bt_common_parse_lttng_live_url(viewer_connection->url.c_str(), error_buf,
240 sizeof(error_buf));
4164020e 241 if (!lttng_live_url_parts.proto) {
0f5c5d5c
SM
242 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Invalid LTTng live URL format: {}",
243 error_buf);
4164020e
SM
244 goto end;
245 }
198e7911 246 viewer_connection->proto.reset(lttng_live_url_parts.proto);
4164020e
SM
247 lttng_live_url_parts.proto = NULL;
248
198e7911 249 viewer_connection->relay_hostname.reset(lttng_live_url_parts.hostname);
4164020e
SM
250 lttng_live_url_parts.hostname = NULL;
251
252 if (lttng_live_url_parts.port >= 0) {
253 viewer_connection->port = lttng_live_url_parts.port;
254 } else {
255 viewer_connection->port = LTTNG_DEFAULT_NETWORK_VIEWER_PORT;
256 }
257
198e7911 258 viewer_connection->target_hostname.reset(lttng_live_url_parts.target_hostname);
4164020e
SM
259 lttng_live_url_parts.target_hostname = NULL;
260
261 if (lttng_live_url_parts.session_name) {
198e7911 262 viewer_connection->session_name.reset(lttng_live_url_parts.session_name);
4164020e
SM
263 lttng_live_url_parts.session_name = NULL;
264 }
265
266 ret = 0;
7cdc2bab
MD
267
268end:
4164020e 269 return ret;
7cdc2bab
MD
270}
271
4164020e
SM
272static enum lttng_live_viewer_status
273lttng_live_handshake(struct live_viewer_connection *viewer_connection)
7cdc2bab 274{
4164020e
SM
275 struct lttng_viewer_cmd cmd;
276 struct lttng_viewer_connect connect;
277 enum lttng_live_viewer_status status;
4164020e
SM
278 const size_t cmd_buf_len = sizeof(cmd) + sizeof(connect);
279 char cmd_buf[cmd_buf_len];
280
0f5c5d5c
SM
281 BT_CPPLOGD_SPEC(viewer_connection->logger,
282 "Handshaking with the relay daemon: cmd={}, major-version={}, minor-version={}",
283 LTTNG_VIEWER_CONNECT, LTTNG_LIVE_MAJOR, LTTNG_LIVE_MINOR);
4164020e
SM
284
285 cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
286 cmd.data_size = htobe64((uint64_t) sizeof(connect));
287 cmd.cmd_version = htobe32(0);
288
289 connect.viewer_session_id = -1ULL; /* will be set on recv */
290 connect.major = htobe32(LTTNG_LIVE_MAJOR);
291 connect.minor = htobe32(LTTNG_LIVE_MINOR);
292 connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
293
294 /*
295 * Merge the cmd and connection request to prevent a write-write
296 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
297 * second write to be performed quickly in presence of Nagle's algorithm
298 */
299 memcpy(cmd_buf, &cmd, sizeof(cmd));
300 memcpy(cmd_buf + sizeof(cmd), &connect, sizeof(connect));
301
302 status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
303 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 304 viewer_handle_send_status(status, "viewer connect command");
4164020e
SM
305 goto end;
306 }
307
308 status = lttng_live_recv(viewer_connection, &connect, sizeof(connect));
309 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 310 viewer_handle_recv_status(status, "viewer connect reply");
4164020e
SM
311 goto end;
312 }
313
0f5c5d5c
SM
314 BT_CPPLOGI_SPEC(viewer_connection->logger, "Received viewer session ID : {}",
315 (uint64_t) be64toh(connect.viewer_session_id));
316 BT_CPPLOGI_SPEC(viewer_connection->logger, "Relayd version : {}.{}", be32toh(connect.major),
317 be32toh(connect.minor));
4164020e
SM
318
319 if (LTTNG_LIVE_MAJOR != be32toh(connect.major)) {
0f5c5d5c
SM
320 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
321 "Incompatible lttng-relayd protocol");
4164020e
SM
322 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
323 goto end;
324 }
325 /* Use the smallest protocol version implemented. */
326 if (LTTNG_LIVE_MINOR > be32toh(connect.minor)) {
327 viewer_connection->minor = be32toh(connect.minor);
328 } else {
329 viewer_connection->minor = LTTNG_LIVE_MINOR;
330 }
331 viewer_connection->major = LTTNG_LIVE_MAJOR;
332
333 status = LTTNG_LIVE_VIEWER_STATUS_OK;
334
335 goto end;
f79c2d7a
FD
336
337end:
4164020e 338 return status;
7cdc2bab
MD
339}
340
4164020e
SM
341static enum lttng_live_viewer_status
342lttng_live_connect_viewer(struct live_viewer_connection *viewer_connection)
7cdc2bab 343{
4164020e
SM
344 struct hostent *host;
345 struct sockaddr_in server_addr;
346 enum lttng_live_viewer_status status;
4164020e
SM
347
348 if (parse_url(viewer_connection)) {
0f5c5d5c 349 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Failed to parse URL");
4164020e
SM
350 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
351 goto error;
352 }
353
0f5c5d5c
SM
354 BT_CPPLOGD_SPEC(
355 viewer_connection->logger,
356 "Connecting to hostname : {}, port : {}, target hostname : {}, session name : {}, proto : {}",
4164020e
SM
357 viewer_connection->relay_hostname->str, viewer_connection->port,
358 !viewer_connection->target_hostname ? "<none>" : viewer_connection->target_hostname->str,
359 !viewer_connection->session_name ? "<none>" : viewer_connection->session_name->str,
360 viewer_connection->proto->str);
361
362 host = gethostbyname(viewer_connection->relay_hostname->str);
363 if (!host) {
0f5c5d5c
SM
364 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
365 "Cannot lookup hostname: hostname=\"{}\"",
366 viewer_connection->relay_hostname->str);
4164020e
SM
367 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
368 goto error;
369 }
370
371 if ((viewer_connection->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == BT_INVALID_SOCKET) {
0f5c5d5c
SM
372 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Socket creation failed: {}",
373 bt_socket_errormsg());
4164020e
SM
374 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
375 goto error;
376 }
377
378 server_addr.sin_family = AF_INET;
379 server_addr.sin_port = htons(viewer_connection->port);
380 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
381 memset(&(server_addr.sin_zero), 0, 8);
382
383 if (connect(viewer_connection->control_sock, (struct sockaddr *) &server_addr,
384 sizeof(struct sockaddr)) == BT_SOCKET_ERROR) {
0f5c5d5c
SM
385 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Connection failed: {}",
386 bt_socket_errormsg());
4164020e
SM
387 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
388 goto error;
389 }
390
391 status = lttng_live_handshake(viewer_connection);
392
393 /*
394 * Only print error and append cause in case of error. not in case of
395 * interruption.
396 */
397 if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
0f5c5d5c 398 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Viewer handshake failed");
4164020e
SM
399 goto error;
400 } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
401 goto end;
402 }
403
404 goto end;
7cdc2bab
MD
405
406error:
4164020e
SM
407 if (viewer_connection->control_sock != BT_INVALID_SOCKET) {
408 if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) {
0f5c5d5c
SM
409 BT_CPPLOGW_SPEC(viewer_connection->logger, "Error closing socket: {}.",
410 bt_socket_errormsg());
4164020e
SM
411 }
412 }
413 viewer_connection->control_sock = BT_INVALID_SOCKET;
f79c2d7a 414end:
4164020e 415 return status;
7cdc2bab
MD
416}
417
4164020e 418static void lttng_live_disconnect_viewer(struct live_viewer_connection *viewer_connection)
7cdc2bab 419{
4164020e
SM
420 if (viewer_connection->control_sock == BT_INVALID_SOCKET) {
421 return;
422 }
423 if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) {
0f5c5d5c
SM
424 BT_CPPLOGW_SPEC(viewer_connection->logger, "Error closing socket: {}",
425 bt_socket_errormsg());
4164020e
SM
426 viewer_connection->control_sock = BT_INVALID_SOCKET;
427 }
7cdc2bab
MD
428}
429
bf35aaef
SM
430static int list_update_session(const bt2::ArrayValue results,
431 const struct lttng_viewer_session *session, bool *_found,
432 struct live_viewer_connection *viewer_connection)
7cdc2bab 433{
4164020e
SM
434 bool found = false;
435
bf35aaef
SM
436 for (const auto value : results) {
437 const auto map = value.asMap();
438 const auto hostnameVal = map["target-hostname"];
4164020e 439
bf35aaef 440 if (!hostnameVal) {
0f5c5d5c
SM
441 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
442 "Error borrowing \"target-hostname\" entry.");
bf35aaef 443 return -1;
4164020e 444 }
bf35aaef
SM
445
446 const auto sessionNameVal = map["session-name"];
447
448 if (!sessionNameVal) {
0f5c5d5c
SM
449 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
450 "Error borrowing \"session-name\" entry.");
bf35aaef 451 return -1;
4164020e 452 }
bf35aaef
SM
453
454 const auto hostname_str = hostnameVal->asString().value();
455 const auto session_name_str = sessionNameVal->asString().value();
4164020e
SM
456
457 if (strcmp(session->hostname, hostname_str) == 0 &&
458 strcmp(session->session_name, session_name_str) == 0) {
4164020e
SM
459 uint32_t streams = be32toh(session->streams);
460 uint32_t clients = be32toh(session->clients);
461
462 found = true;
463
bf35aaef
SM
464 const auto streamCountVal = map["stream-count"];
465
466 if (!streamCountVal) {
0f5c5d5c
SM
467 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
468 "Error borrowing \"stream-count\" entry.");
bf35aaef 469 return -1;
4164020e 470 }
bf35aaef
SM
471
472 auto val = streamCountVal->asUnsignedInteger().value();
473
4164020e
SM
474 /* sum */
475 val += streams;
bf35aaef 476 streamCountVal->asUnsignedInteger().value(val);
4164020e 477
bf35aaef
SM
478 const auto clientCountVal = map["client-count"];
479
480 if (!clientCountVal) {
0f5c5d5c
SM
481 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
482 "Error borrowing \"client-count\" entry.");
bf35aaef 483 return -1;
4164020e 484 }
bf35aaef
SM
485
486 val = clientCountVal->asUnsignedInteger().value();
487
4164020e 488 /* max */
bf35aaef
SM
489 val = std::max<uint64_t>(clients, val);
490 clientCountVal->asUnsignedInteger().value(val);
4164020e
SM
491 }
492
493 if (found) {
494 break;
495 }
496 }
bf35aaef 497
4164020e 498 *_found = found;
bf35aaef 499 return 0;
7cdc2bab
MD
500}
501
bf35aaef 502static int list_append_session(const bt2::ArrayValue results, const std::string& base_url,
4164020e
SM
503 const struct lttng_viewer_session *session,
504 struct live_viewer_connection *viewer_connection)
7cdc2bab 505{
4164020e 506 int ret = 0;
4164020e
SM
507 bool found = false;
508
509 /*
510 * If the session already exists, add the stream count to it,
511 * and do max of client counts.
512 */
513 ret = list_update_session(results, session, &found, viewer_connection);
514 if (ret || found) {
bf35aaef 515 return ret;
4164020e
SM
516 }
517
bf35aaef 518 const auto map = bt2::MapValue::create();
4164020e 519
30d81897 520 if (base_url.empty()) {
bf35aaef
SM
521 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Error: base_url empty.");
522 return -1;
4164020e 523 }
30d81897 524
4164020e
SM
525 /*
526 * key = "url",
527 * value = <string>,
528 */
bf35aaef
SM
529 map->insert("url",
530 fmt::format("{}/host/{}/{}", base_url, session->hostname, session->session_name));
4164020e
SM
531
532 /*
533 * key = "target-hostname",
534 * value = <string>,
535 */
bf35aaef 536 map->insert("target-hostname", session->hostname);
4164020e
SM
537
538 /*
539 * key = "session-name",
540 * value = <string>,
541 */
bf35aaef 542 map->insert("session-name", session->session_name);
4164020e
SM
543
544 /*
545 * key = "timer-us",
546 * value = <integer>,
547 */
548 {
549 uint32_t live_timer = be32toh(session->live_timer);
550
bf35aaef 551 map->insert("timer-us", (uint64_t) live_timer);
4164020e
SM
552 }
553
554 /*
555 * key = "stream-count",
556 * value = <integer>,
557 */
558 {
559 uint32_t streams = be32toh(session->streams);
560
bf35aaef 561 map->insert("stream-count", (uint64_t) streams);
4164020e
SM
562 }
563
564 /*
565 * key = "client-count",
566 * value = <integer>,
567 */
568 {
569 uint32_t clients = be32toh(session->clients);
570
bf35aaef 571 map->insert("client-count", (uint64_t) clients);
4164020e 572 }
14f28187 573
bf35aaef
SM
574 results.append(*map);
575 return 0;
7cdc2bab
MD
576}
577
578/*
579 * Data structure returned:
580 *
581 * {
582 * <array> = {
583 * [n] = {
584 * <map> = {
585 * {
586 * key = "url",
587 * value = <string>,
588 * },
589 * {
590 * key = "target-hostname",
591 * value = <string>,
592 * },
593 * {
594 * key = "session-name",
595 * value = <string>,
596 * },
597 * {
598 * key = "timer-us",
599 * value = <integer>,
600 * },
601 * {
602 * key = "stream-count",
603 * value = <integer>,
604 * },
605 * {
606 * key = "client-count",
607 * value = <integer>,
608 * },
609 * },
610 * }
611 * }
612 */
613
5ad6219b
SM
614bt2::Value::Shared
615live_viewer_connection_list_sessions(struct live_viewer_connection *viewer_connection)
7cdc2bab 616{
4164020e
SM
617 enum lttng_live_viewer_status viewer_status;
618 struct lttng_viewer_cmd cmd;
619 struct lttng_viewer_list_sessions list;
620 uint32_t i, sessions_count;
bf35aaef 621 auto result = bt2::ArrayValue::create();
4164020e 622
0f5c5d5c
SM
623 BT_CPPLOGD_SPEC(viewer_connection->logger, "Requesting list of sessions: cmd={}",
624 LTTNG_VIEWER_LIST_SESSIONS);
4164020e
SM
625
626 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
627 cmd.data_size = htobe64((uint64_t) 0);
628 cmd.cmd_version = htobe32(0);
629
630 viewer_status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
631 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
5ad6219b
SM
632 BT_CPPLOGE_APPEND_CAUSE_AND_THROW_SPEC(viewer_connection->logger, bt2::Error,
633 "Error sending list sessions command");
4164020e 634 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
5ad6219b 635 throw bt2c::TryAgain {};
4164020e
SM
636 }
637
638 viewer_status = lttng_live_recv(viewer_connection, &list, sizeof(list));
639 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
5ad6219b
SM
640 BT_CPPLOGE_APPEND_CAUSE_AND_THROW_SPEC(viewer_connection->logger, bt2::Error,
641 "Error receiving session list");
4164020e 642 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
5ad6219b 643 throw bt2c::TryAgain {};
4164020e
SM
644 }
645
646 sessions_count = be32toh(list.sessions_count);
647 for (i = 0; i < sessions_count; i++) {
648 struct lttng_viewer_session lsession;
649
650 viewer_status = lttng_live_recv(viewer_connection, &lsession, sizeof(lsession));
651 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
5ad6219b
SM
652 BT_CPPLOGE_APPEND_CAUSE_AND_THROW_SPEC(viewer_connection->logger, bt2::Error,
653 "Error receiving session:");
4164020e 654 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
5ad6219b 655 throw bt2c::TryAgain {};
4164020e
SM
656 }
657
658 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
659 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
bf35aaef 660 if (list_append_session(*result, viewer_connection->url, &lsession, viewer_connection)) {
5ad6219b
SM
661 BT_CPPLOGE_APPEND_CAUSE_AND_THROW_SPEC(viewer_connection->logger, bt2::Error,
662 "Error appending session");
4164020e
SM
663 }
664 }
665
5ad6219b 666 return result;
7cdc2bab
MD
667}
668
4164020e
SM
669static enum lttng_live_viewer_status
670lttng_live_query_session_ids(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 671{
4164020e
SM
672 struct lttng_viewer_cmd cmd;
673 struct lttng_viewer_list_sessions list;
674 struct lttng_viewer_session lsession;
675 uint32_t i, sessions_count;
676 uint64_t session_id;
677 enum lttng_live_viewer_status status;
d721bef8 678 live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection.get();
4164020e 679
0f5c5d5c
SM
680 BT_CPPLOGD_SPEC(viewer_connection->logger,
681 "Asking the relay daemon for the list of sessions: cmd={}",
682 LTTNG_VIEWER_LIST_SESSIONS);
4164020e
SM
683
684 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
685 cmd.data_size = htobe64((uint64_t) 0);
686 cmd.cmd_version = htobe32(0);
687
688 status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
689 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 690 viewer_handle_send_status(status, "list sessions command");
4164020e
SM
691 goto end;
692 }
693
694 status = lttng_live_recv(viewer_connection, &list, sizeof(list));
695 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 696 viewer_handle_recv_status(status, "session list reply");
4164020e
SM
697 goto end;
698 }
699
700 sessions_count = be32toh(list.sessions_count);
701 for (i = 0; i < sessions_count; i++) {
702 status = lttng_live_recv(viewer_connection, &lsession, sizeof(lsession));
703 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 704 viewer_handle_recv_status(status, "session reply");
4164020e
SM
705 goto end;
706 }
707 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
708 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
709 session_id = be64toh(lsession.id);
710
0f5c5d5c
SM
711 BT_CPPLOGI_SPEC(viewer_connection->logger,
712 "Adding session to internal list: "
713 "session-id={}, hostname=\"{}\", session-name=\"{}\"",
714 session_id, lsession.hostname, lsession.session_name);
4164020e
SM
715
716 if ((strncmp(lsession.session_name, viewer_connection->session_name->str,
717 LTTNG_VIEWER_NAME_MAX) == 0) &&
718 (strncmp(lsession.hostname, viewer_connection->target_hostname->str,
719 LTTNG_VIEWER_HOST_NAME_MAX) == 0)) {
720 if (lttng_live_add_session(lttng_live_msg_iter, session_id, lsession.hostname,
721 lsession.session_name)) {
0f5c5d5c
SM
722 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
723 "Failed to add live session");
4164020e
SM
724 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
725 goto end;
726 }
727 }
728 }
729
730 status = LTTNG_LIVE_VIEWER_STATUS_OK;
7cdc2bab 731
f79c2d7a 732end:
4164020e 733 return status;
7cdc2bab
MD
734}
735
4164020e
SM
736enum lttng_live_viewer_status
737lttng_live_create_viewer_session(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 738{
4164020e
SM
739 struct lttng_viewer_cmd cmd;
740 struct lttng_viewer_create_session_response resp;
741 enum lttng_live_viewer_status status;
d721bef8 742 live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection.get();
4164020e 743
0f5c5d5c
SM
744 BT_CPPLOGD_SPEC(viewer_connection->logger, "Creating a viewer session: cmd={}",
745 LTTNG_VIEWER_CREATE_SESSION);
4164020e
SM
746
747 cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
748 cmd.data_size = htobe64((uint64_t) 0);
749 cmd.cmd_version = htobe32(0);
750
751 status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
752 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 753 viewer_handle_send_status(status, "create session command");
4164020e
SM
754 goto end;
755 }
756
757 status = lttng_live_recv(viewer_connection, &resp, sizeof(resp));
758 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 759 viewer_handle_recv_status(status, "create session reply");
4164020e
SM
760 goto end;
761 }
762
763 if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
0f5c5d5c 764 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Error creating viewer session");
4164020e
SM
765 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
766 goto end;
767 }
768
769 status = lttng_live_query_session_ids(lttng_live_msg_iter);
770 if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
0f5c5d5c
SM
771 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
772 "Failed to query live viewer session ids");
4164020e
SM
773 goto end;
774 } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
775 goto end;
776 }
7cdc2bab 777
f79c2d7a 778end:
4164020e 779 return status;
7cdc2bab
MD
780}
781
4164020e
SM
782static enum lttng_live_viewer_status receive_streams(struct lttng_live_session *session,
783 uint32_t stream_count,
784 bt_self_message_iterator *self_msg_iter)
7cdc2bab 785{
4164020e
SM
786 uint32_t i;
787 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
788 enum lttng_live_viewer_status status;
d721bef8 789 live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection.get();
4164020e 790
0f5c5d5c 791 BT_CPPLOGI_SPEC(viewer_connection->logger, "Getting {} new streams", stream_count);
4164020e
SM
792 for (i = 0; i < stream_count; i++) {
793 struct lttng_viewer_stream stream;
794 struct lttng_live_stream_iterator *live_stream;
795 uint64_t stream_id;
796 uint64_t ctf_trace_id;
797
798 status = lttng_live_recv(viewer_connection, &stream, sizeof(stream));
799 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 800 viewer_handle_recv_status(status, "stream reply");
4164020e
SM
801 goto end;
802 }
803 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
804 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
805 stream_id = be64toh(stream.id);
806 ctf_trace_id = be64toh(stream.ctf_trace_id);
807
808 if (stream.metadata_flag) {
0f5c5d5c
SM
809 BT_CPPLOGI_SPEC(viewer_connection->logger, " metadata stream {} : {}/{}", stream_id,
810 stream.path_name, stream.channel_name);
44bd6303 811 if (lttng_live_metadata_create_stream(session, ctf_trace_id, stream_id)) {
0f5c5d5c
SM
812 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
813 "Error creating metadata stream");
4164020e
SM
814 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
815 goto end;
816 }
817 session->lazy_stream_msg_init = true;
818 } else {
0f5c5d5c
SM
819 BT_CPPLOGI_SPEC(viewer_connection->logger, " stream {} : {}/{}", stream_id,
820 stream.path_name, stream.channel_name);
4164020e
SM
821 live_stream =
822 lttng_live_stream_iterator_create(session, ctf_trace_id, stream_id, self_msg_iter);
823 if (!live_stream) {
0f5c5d5c 824 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Error creating stream");
4164020e
SM
825 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
826 goto end;
827 }
828 }
829 }
830 status = LTTNG_LIVE_VIEWER_STATUS_OK;
7cdc2bab 831
f79c2d7a 832end:
4164020e 833 return status;
7cdc2bab
MD
834}
835
4164020e
SM
836enum lttng_live_viewer_status lttng_live_session_attach(struct lttng_live_session *session,
837 bt_self_message_iterator *self_msg_iter)
7cdc2bab 838{
4164020e
SM
839 struct lttng_viewer_cmd cmd;
840 enum lttng_live_viewer_status status;
841 struct lttng_viewer_attach_session_request rq;
842 struct lttng_viewer_attach_session_response rp;
843 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
d721bef8 844 live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection.get();
4164020e
SM
845 uint64_t session_id = session->id;
846 uint32_t streams_count;
847 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
848 char cmd_buf[cmd_buf_len];
849
0f5c5d5c
SM
850 BT_CPPLOGD_SPEC(viewer_connection->logger,
851 "Attaching to session: cmd={}, session-id={}, seek={}",
852 LTTNG_VIEWER_ATTACH_SESSION, session_id, LTTNG_VIEWER_SEEK_LAST);
4164020e
SM
853
854 cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
855 cmd.data_size = htobe64((uint64_t) sizeof(rq));
856 cmd.cmd_version = htobe32(0);
857
858 memset(&rq, 0, sizeof(rq));
859 rq.session_id = htobe64(session_id);
860 // TODO: add cmd line parameter to select seek beginning
861 // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
862 rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
863
864 /*
865 * Merge the cmd and connection request to prevent a write-write
866 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
867 * second write to be performed quickly in presence of Nagle's algorithm.
868 */
869 memcpy(cmd_buf, &cmd, sizeof(cmd));
870 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
871 status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
872 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 873 viewer_handle_send_status(status, "attach session command");
4164020e
SM
874 goto end;
875 }
876
877 status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
878 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 879 viewer_handle_recv_status(status, "attach session reply");
4164020e
SM
880 goto end;
881 }
882
883 streams_count = be32toh(rp.streams_count);
884 switch (be32toh(rp.status)) {
885 case LTTNG_VIEWER_ATTACH_OK:
886 break;
887 case LTTNG_VIEWER_ATTACH_UNK:
0f5c5d5c
SM
888 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Session id {} is unknown",
889 session_id);
4164020e
SM
890 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
891 goto end;
892 case LTTNG_VIEWER_ATTACH_ALREADY:
0f5c5d5c
SM
893 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
894 "There is already a viewer attached to this session");
4164020e
SM
895 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
896 goto end;
897 case LTTNG_VIEWER_ATTACH_NOT_LIVE:
0f5c5d5c 898 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Not a live session");
4164020e
SM
899 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
900 goto end;
901 case LTTNG_VIEWER_ATTACH_SEEK_ERR:
0f5c5d5c 902 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Wrong seek parameter");
4164020e
SM
903 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
904 goto end;
905 default:
0f5c5d5c
SM
906 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Unknown attach return code {}",
907 be32toh(rp.status));
4164020e
SM
908 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
909 goto end;
910 }
911
912 /* We receive the initial list of streams. */
913 status = receive_streams(session, streams_count, self_msg_iter);
914 switch (status) {
915 case LTTNG_LIVE_VIEWER_STATUS_OK:
916 break;
917 case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
918 goto end;
919 case LTTNG_LIVE_VIEWER_STATUS_ERROR:
0f5c5d5c 920 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Error receiving streams");
4164020e
SM
921 goto end;
922 default:
923 bt_common_abort();
924 }
925
926 session->attached = true;
927 session->new_streams_needed = false;
7cdc2bab 928
eee8e741 929end:
4164020e 930 return status;
7cdc2bab
MD
931}
932
4164020e 933enum lttng_live_viewer_status lttng_live_session_detach(struct lttng_live_session *session)
7cdc2bab 934{
4164020e
SM
935 struct lttng_viewer_cmd cmd;
936 enum lttng_live_viewer_status status;
937 struct lttng_viewer_detach_session_request rq;
938 struct lttng_viewer_detach_session_response rp;
939 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
d721bef8 940 live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection.get();
4164020e
SM
941 uint64_t session_id = session->id;
942 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
943 char cmd_buf[cmd_buf_len];
944
945 /*
946 * The session might already be detached and the viewer socket might
947 * already been closed. This happens when calling this function when
948 * tearing down the graph after an error.
949 */
950 if (!session->attached || viewer_connection->control_sock == BT_INVALID_SOCKET) {
951 return LTTNG_LIVE_VIEWER_STATUS_OK;
952 }
953
0f5c5d5c
SM
954 BT_CPPLOGD_SPEC(viewer_connection->logger, "Detaching from session: cmd={}, session-id={}",
955 LTTNG_VIEWER_DETACH_SESSION, session_id);
4164020e
SM
956
957 cmd.cmd = htobe32(LTTNG_VIEWER_DETACH_SESSION);
958 cmd.data_size = htobe64((uint64_t) sizeof(rq));
959 cmd.cmd_version = htobe32(0);
960
961 memset(&rq, 0, sizeof(rq));
962 rq.session_id = htobe64(session_id);
963
964 /*
965 * Merge the cmd and connection request to prevent a write-write
966 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
967 * second write to be performed quickly in presence of Nagle's algorithm.
968 */
969 memcpy(cmd_buf, &cmd, sizeof(cmd));
970 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
971 status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
972 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 973 viewer_handle_send_status(status, "detach session command");
4164020e
SM
974 goto end;
975 }
976
977 status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
978 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 979 viewer_handle_recv_status(status, "detach session reply");
4164020e
SM
980 goto end;
981 }
982
983 switch (be32toh(rp.status)) {
984 case LTTNG_VIEWER_DETACH_SESSION_OK:
985 break;
986 case LTTNG_VIEWER_DETACH_SESSION_UNK:
0f5c5d5c 987 BT_CPPLOGW_SPEC(viewer_connection->logger, "Session id {} is unknown", session_id);
4164020e
SM
988 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
989 goto end;
990 case LTTNG_VIEWER_DETACH_SESSION_ERR:
0f5c5d5c 991 BT_CPPLOGW_SPEC(viewer_connection->logger, "Error detaching session id {}", session_id);
4164020e
SM
992 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
993 goto end;
994 default:
0f5c5d5c
SM
995 BT_CPPLOGE_SPEC(viewer_connection->logger, "Unknown detach return code {}",
996 be32toh(rp.status));
4164020e
SM
997 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
998 goto end;
999 }
1000
1001 session->attached = false;
1002
1003 status = LTTNG_LIVE_VIEWER_STATUS_OK;
7cdc2bab 1004
f79c2d7a 1005end:
4164020e 1006 return status;
7cdc2bab
MD
1007}
1008
4164020e 1009enum lttng_live_get_one_metadata_status
15fcc425 1010lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, std::vector<char>& buf)
7cdc2bab 1011{
4164020e
SM
1012 uint64_t len = 0;
1013 enum lttng_live_get_one_metadata_status status;
1014 enum lttng_live_viewer_status viewer_status;
1015 struct lttng_viewer_cmd cmd;
1016 struct lttng_viewer_get_metadata rq;
1017 struct lttng_viewer_metadata_packet rp;
76edb16f 1018 std::vector<char> data;
4164020e
SM
1019 struct lttng_live_session *session = trace->session;
1020 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
e66be7c3 1021 struct lttng_live_metadata *metadata = trace->metadata.get();
d721bef8 1022 live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection.get();
4164020e
SM
1023 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1024 char cmd_buf[cmd_buf_len];
1025
0f5c5d5c
SM
1026 BT_CPPLOGD_SPEC(viewer_connection->logger,
1027 "Requesting new metadata for trace:"
1028 "cmd={}, trace-id={}, metadata-stream-id={}",
1029 LTTNG_VIEWER_GET_METADATA, trace->id, metadata->stream_id);
4164020e
SM
1030
1031 rq.stream_id = htobe64(metadata->stream_id);
1032 cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
1033 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1034 cmd.cmd_version = htobe32(0);
1035
1036 /*
1037 * Merge the cmd and connection request to prevent a write-write
1038 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1039 * second write to be performed quickly in presence of Nagle's algorithm.
1040 */
1041 memcpy(cmd_buf, &cmd, sizeof(cmd));
1042 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1043 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1044 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 1045 viewer_handle_send_status(viewer_status, "get metadata command");
4164020e
SM
1046 status = (enum lttng_live_get_one_metadata_status) viewer_status;
1047 goto end;
1048 }
1049
1050 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1051 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 1052 viewer_handle_recv_status(viewer_status, "get metadata reply");
4164020e
SM
1053 status = (enum lttng_live_get_one_metadata_status) viewer_status;
1054 goto end;
1055 }
1056
1057 switch (be32toh(rp.status)) {
1058 case LTTNG_VIEWER_METADATA_OK:
0f5c5d5c 1059 BT_CPPLOGD_SPEC(viewer_connection->logger, "Received get_metadata response: ok");
4164020e
SM
1060 break;
1061 case LTTNG_VIEWER_NO_NEW_METADATA:
0f5c5d5c 1062 BT_CPPLOGD_SPEC(viewer_connection->logger, "Received get_metadata response: no new");
4164020e
SM
1063 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_END;
1064 goto end;
1065 case LTTNG_VIEWER_METADATA_ERR:
1066 /*
1067 * The Relayd cannot find this stream id. Maybe its
1068 * gone already. This can happen in short lived UST app
1069 * in a per-pid session.
1070 */
0f5c5d5c 1071 BT_CPPLOGD_SPEC(viewer_connection->logger, "Received get_metadata response: error");
4164020e
SM
1072 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_CLOSED;
1073 goto end;
1074 default:
0f5c5d5c
SM
1075 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
1076 "Received get_metadata response: unknown");
4164020e
SM
1077 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
1078 goto end;
1079 }
1080
1081 len = be64toh(rp.len);
c5ce3927
FD
1082 if (len == 0) {
1083 /*
1084 * We received a `LTTNG_VIEWER_METADATA_OK` with a packet
1085 * length of 0. This means we must try again. This scenario
1086 * arises when a clear command is performed on an lttng session.
1087 */
0f5c5d5c
SM
1088 BT_CPPLOGD_SPEC(
1089 viewer_connection->logger,
c5ce3927
FD
1090 "Expecting a metadata packet of size 0. Retry to get a packet from the relay.");
1091 goto empty_metadata_packet_retry;
1092 }
1093
0f5c5d5c 1094 BT_CPPLOGD_SPEC(viewer_connection->logger, "Writing {} bytes to metadata", len);
4164020e 1095 if (len <= 0) {
0f5c5d5c 1096 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Erroneous response length");
4164020e
SM
1097 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
1098 goto end;
1099 }
1100
76edb16f 1101 data.resize(len);
4164020e 1102
76edb16f 1103 viewer_status = lttng_live_recv(viewer_connection, data.data(), len);
4164020e 1104 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 1105 viewer_handle_recv_status(viewer_status, "get metadata packet");
4164020e
SM
1106 status = (enum lttng_live_get_one_metadata_status) viewer_status;
1107 goto end;
1108 }
1109
1110 /*
1111 * Write the metadata to the file handle.
1112 */
76edb16f 1113 buf.insert(buf.end(), data.begin(), data.end());
4164020e 1114
c5ce3927 1115empty_metadata_packet_retry:
4164020e 1116 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_OK;
c28512ab 1117
c28512ab 1118end:
4164020e 1119 return status;
7cdc2bab
MD
1120}
1121
1122/*
1123 * Assign the fields from a lttng_viewer_index to a packet_index.
1124 */
4164020e
SM
1125static void lttng_index_to_packet_index(struct lttng_viewer_index *lindex,
1126 struct packet_index *pindex)
7cdc2bab 1127{
4164020e
SM
1128 BT_ASSERT(lindex);
1129 BT_ASSERT(pindex);
1130
1131 pindex->offset = be64toh(lindex->offset);
1132 pindex->packet_size = be64toh(lindex->packet_size);
1133 pindex->content_size = be64toh(lindex->content_size);
1134 pindex->ts_cycles.timestamp_begin = be64toh(lindex->timestamp_begin);
1135 pindex->ts_cycles.timestamp_end = be64toh(lindex->timestamp_end);
1136 pindex->events_discarded = be64toh(lindex->events_discarded);
7cdc2bab
MD
1137}
1138
4164020e 1139static void lttng_live_need_new_streams(struct lttng_live_msg_iter *lttng_live_msg_iter)
36e94ad6 1140{
751aaa62 1141 for (const auto& session : lttng_live_msg_iter->sessions) {
0f5c5d5c
SM
1142 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
1143 "Marking session as needing new streams: "
1144 "session-id={}",
1145 session->id);
4164020e
SM
1146 session->new_streams_needed = true;
1147 }
36e94ad6
FD
1148}
1149
4164020e
SM
1150enum lttng_live_iterator_status
1151lttng_live_get_next_index(struct lttng_live_msg_iter *lttng_live_msg_iter,
1152 struct lttng_live_stream_iterator *stream, struct packet_index *index)
7cdc2bab 1153{
4164020e
SM
1154 struct lttng_viewer_cmd cmd;
1155 struct lttng_viewer_get_next_index rq;
1156 enum lttng_live_viewer_status viewer_status;
1157 struct lttng_viewer_index rp;
1158 enum lttng_live_iterator_status status;
d721bef8 1159 live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection.get();
4164020e
SM
1160 struct lttng_live_trace *trace = stream->trace;
1161 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1162 char cmd_buf[cmd_buf_len];
1163 uint32_t flags, rp_status;
1164
0f5c5d5c
SM
1165 BT_CPPLOGD_SPEC(viewer_connection->logger,
1166 "Requesting next index for stream: cmd={}, "
1167 "viewer-stream-id={}",
1168 LTTNG_VIEWER_GET_NEXT_INDEX, stream->viewer_stream_id);
4164020e
SM
1169 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
1170 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1171 cmd.cmd_version = htobe32(0);
1172
1173 memset(&rq, 0, sizeof(rq));
1174 rq.stream_id = htobe64(stream->viewer_stream_id);
1175
1176 /*
1177 * Merge the cmd and connection request to prevent a write-write
1178 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1179 * second write to be performed quickly in presence of Nagle's algorithm.
1180 */
1181 memcpy(cmd_buf, &cmd, sizeof(cmd));
1182 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1183
1184 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1185 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 1186 viewer_handle_send_status(viewer_status, "get next index command");
4164020e
SM
1187 goto error;
1188 }
1189
1190 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1191 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 1192 viewer_handle_recv_status(viewer_status, "get next index reply");
4164020e
SM
1193 goto error;
1194 }
1195
1196 flags = be32toh(rp.flags);
1197 rp_status = be32toh(rp.status);
1198
0f5c5d5c
SM
1199 BT_CPPLOGD_SPEC(
1200 viewer_connection->logger, "Received response from relay daemon: cmd=%s, response={}",
1201 LTTNG_VIEWER_GET_NEXT_INDEX, static_cast<lttng_viewer_next_index_return_code>(rp_status));
ca622546
JG
1202
1203 if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
0f5c5d5c
SM
1204 BT_CPPLOGD_SPEC(viewer_connection->logger,
1205 "Marking all sessions as possibly needing new streams: "
1206 "response={}, response-flag=NEW_STREAM",
1207 static_cast<lttng_viewer_next_index_return_code>(rp_status));
ca622546
JG
1208 lttng_live_need_new_streams(lttng_live_msg_iter);
1209 }
1210
4164020e
SM
1211 switch (rp_status) {
1212 case LTTNG_VIEWER_INDEX_INACTIVE:
1213 {
1214 uint64_t ctf_stream_class_id;
1215
1216 memset(index, 0, sizeof(struct packet_index));
1217 index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
1218 stream->current_inactivity_ts = index->ts_cycles.timestamp_end;
1219 ctf_stream_class_id = be64toh(rp.stream_id);
1220 if (stream->ctf_stream_class_id.is_set) {
1221 BT_ASSERT(stream->ctf_stream_class_id.value == ctf_stream_class_id);
1222 } else {
1223 stream->ctf_stream_class_id.value = ctf_stream_class_id;
1224 stream->ctf_stream_class_id.is_set = true;
1225 }
1226 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_QUIESCENT);
1227 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1228 break;
1229 }
1230 case LTTNG_VIEWER_INDEX_OK:
1231 {
1232 uint64_t ctf_stream_class_id;
1233
1234 lttng_index_to_packet_index(&rp, index);
1235 ctf_stream_class_id = be64toh(rp.stream_id);
1236 if (stream->ctf_stream_class_id.is_set) {
1237 BT_ASSERT(stream->ctf_stream_class_id.value == ctf_stream_class_id);
1238 } else {
1239 stream->ctf_stream_class_id.value = ctf_stream_class_id;
1240 stream->ctf_stream_class_id.is_set = true;
1241 }
4164020e
SM
1242 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_DATA);
1243
1244 if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
0f5c5d5c
SM
1245 BT_CPPLOGD_SPEC(viewer_connection->logger,
1246 "Marking trace as needing new metadata: "
1247 "response={}, response-flag=NEW_METADATA, trace-id={}",
1248 static_cast<lttng_viewer_next_index_return_code>(rp_status), trace->id);
4164020e
SM
1249 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
1250 }
4164020e
SM
1251 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1252 break;
1253 }
1254 case LTTNG_VIEWER_INDEX_RETRY:
1255 memset(index, 0, sizeof(struct packet_index));
1256 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
1257 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
1258 goto end;
1259 case LTTNG_VIEWER_INDEX_HUP:
1260 memset(index, 0, sizeof(struct packet_index));
1261 index->offset = EOF;
1262 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_EOF);
1263 stream->has_stream_hung_up = true;
1264 status = LTTNG_LIVE_ITERATOR_STATUS_END;
1265 break;
1266 case LTTNG_VIEWER_INDEX_ERR:
1267 memset(index, 0, sizeof(struct packet_index));
1268 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
1269 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1270 goto end;
1271 default:
0f5c5d5c
SM
1272 BT_CPPLOGD_SPEC(viewer_connection->logger,
1273 "Received get_next_index response: unknown value");
4164020e
SM
1274 memset(index, 0, sizeof(struct packet_index));
1275 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
1276 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1277 goto end;
1278 }
ca622546 1279
4164020e 1280 goto end;
7cdc2bab
MD
1281
1282error:
4164020e 1283 status = viewer_status_to_live_iterator_status(viewer_status);
f79c2d7a 1284end:
4164020e 1285 return status;
7cdc2bab
MD
1286}
1287
4164020e
SM
1288enum ctf_msg_iter_medium_status
1289lttng_live_get_stream_bytes(struct lttng_live_msg_iter *lttng_live_msg_iter,
1290 struct lttng_live_stream_iterator *stream, uint8_t *buf,
1291 uint64_t offset, uint64_t req_len, uint64_t *recv_len)
7cdc2bab 1292{
4164020e
SM
1293 enum ctf_msg_iter_medium_status status;
1294 enum lttng_live_viewer_status viewer_status;
1295 struct lttng_viewer_trace_packet rp;
1296 struct lttng_viewer_cmd cmd;
1297 struct lttng_viewer_get_packet rq;
d721bef8 1298 live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection.get();
4164020e
SM
1299 struct lttng_live_trace *trace = stream->trace;
1300 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1301 char cmd_buf[cmd_buf_len];
1302 uint32_t flags, rp_status;
1303
0f5c5d5c
SM
1304 BT_CPPLOGD_SPEC(viewer_connection->logger,
1305 "Requesting data from stream: cmd={}, "
1306 "offset={}, request-len={}",
1307 LTTNG_VIEWER_GET_PACKET, offset, req_len);
4164020e
SM
1308
1309 cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
1310 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1311 cmd.cmd_version = htobe32(0);
1312
1313 memset(&rq, 0, sizeof(rq));
1314 rq.stream_id = htobe64(stream->viewer_stream_id);
1315 rq.offset = htobe64(offset);
1316 rq.len = htobe32(req_len);
1317
1318 /*
1319 * Merge the cmd and connection request to prevent a write-write
1320 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1321 * second write to be performed quickly in presence of Nagle's algorithm.
1322 */
1323 memcpy(cmd_buf, &cmd, sizeof(cmd));
1324 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1325
1326 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1327 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 1328 viewer_handle_send_status(viewer_status, "get data packet command");
4164020e
SM
1329 goto error_convert_status;
1330 }
1331
1332 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1333 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 1334 viewer_handle_recv_status(viewer_status, "get data packet reply");
4164020e
SM
1335 goto error_convert_status;
1336 }
1337
1338 flags = be32toh(rp.flags);
1339 rp_status = be32toh(rp.status);
1340
0f5c5d5c
SM
1341 BT_CPPLOGD_SPEC(
1342 viewer_connection->logger, "Received response from relay daemon: cmd={}, response={}",
1343 LTTNG_VIEWER_GET_PACKET, static_cast<lttng_viewer_get_packet_return_code>(rp_status));
4164020e
SM
1344 switch (rp_status) {
1345 case LTTNG_VIEWER_GET_PACKET_OK:
1346 req_len = be32toh(rp.len);
0f5c5d5c
SM
1347 BT_CPPLOGD_SPEC(viewer_connection->logger,
1348 "Got packet from relay daemon: response={}, packet-len={}",
1349 static_cast<lttng_viewer_get_packet_return_code>(rp_status), req_len);
4164020e
SM
1350 break;
1351 case LTTNG_VIEWER_GET_PACKET_RETRY:
1352 /* Unimplemented by relay daemon */
1353 status = CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
1354 goto end;
1355 case LTTNG_VIEWER_GET_PACKET_ERR:
1356 if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
0f5c5d5c
SM
1357 BT_CPPLOGD_SPEC(viewer_connection->logger,
1358 "Marking trace as needing new metadata: "
1359 "response={}, response-flag=NEW_METADATA, trace-id={}",
1360 static_cast<lttng_viewer_get_packet_return_code>(rp_status), trace->id);
4164020e
SM
1361 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
1362 }
1363 if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
0f5c5d5c
SM
1364 BT_CPPLOGD_SPEC(viewer_connection->logger,
1365 "Marking all sessions as possibly needing new streams: "
1366 "response={}, response-flag=NEW_STREAM",
1367 static_cast<lttng_viewer_get_packet_return_code>(rp_status));
4164020e
SM
1368 lttng_live_need_new_streams(lttng_live_msg_iter);
1369 }
1370 if (flags & (LTTNG_VIEWER_FLAG_NEW_METADATA | LTTNG_VIEWER_FLAG_NEW_STREAM)) {
1371 status = CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
0f5c5d5c
SM
1372 BT_CPPLOGD_SPEC(viewer_connection->logger,
1373 "Reply with any one flags set means we should retry: response={}",
1374 static_cast<lttng_viewer_get_packet_return_code>(rp_status));
4164020e
SM
1375 goto end;
1376 }
0f5c5d5c
SM
1377 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
1378 "Received get_data_packet response: error");
4164020e
SM
1379 status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
1380 goto end;
1381 case LTTNG_VIEWER_GET_PACKET_EOF:
1382 status = CTF_MSG_ITER_MEDIUM_STATUS_EOF;
1383 goto end;
1384 default:
0f5c5d5c
SM
1385 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
1386 "Received get_data_packet response: unknown ({})", rp_status);
4164020e
SM
1387 status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
1388 goto end;
1389 }
1390
1391 if (req_len == 0) {
1392 status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
1393 goto end;
1394 }
1395
1396 viewer_status = lttng_live_recv(viewer_connection, buf, req_len);
1397 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 1398 viewer_handle_recv_status(viewer_status, "get data packet");
4164020e
SM
1399 goto error_convert_status;
1400 }
1401 *recv_len = req_len;
1402
1403 status = CTF_MSG_ITER_MEDIUM_STATUS_OK;
1404 goto end;
f79c2d7a 1405
535fb48a 1406error_convert_status:
4164020e 1407 status = viewer_status_to_ctf_msg_iter_medium_status(viewer_status);
f79c2d7a 1408end:
4164020e 1409 return status;
7cdc2bab
MD
1410}
1411
1412/*
1413 * Request new streams for a session.
1414 */
4164020e
SM
1415enum lttng_live_iterator_status
1416lttng_live_session_get_new_streams(struct lttng_live_session *session,
1417 bt_self_message_iterator *self_msg_iter)
7cdc2bab 1418{
4164020e
SM
1419 enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1420 struct lttng_viewer_cmd cmd;
1421 struct lttng_viewer_new_streams_request rq;
1422 struct lttng_viewer_new_streams_response rp;
1423 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
1424 enum lttng_live_viewer_status viewer_status;
d721bef8 1425 live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection.get();
4164020e
SM
1426 uint32_t streams_count;
1427 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1428 char cmd_buf[cmd_buf_len];
1429
1430 if (!session->new_streams_needed) {
1431 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1432 goto end;
1433 }
1434
0f5c5d5c
SM
1435 BT_CPPLOGD_SPEC(viewer_connection->logger,
1436 "Requesting new streams for session: cmd={}, session-id={}",
1437 LTTNG_VIEWER_GET_NEW_STREAMS, session->id);
4164020e
SM
1438
1439 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
1440 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1441 cmd.cmd_version = htobe32(0);
1442
1443 memset(&rq, 0, sizeof(rq));
1444 rq.session_id = htobe64(session->id);
1445
1446 /*
1447 * Merge the cmd and connection request to prevent a write-write
1448 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1449 * second write to be performed quickly in presence of Nagle's algorithm.
1450 */
1451 memcpy(cmd_buf, &cmd, sizeof(cmd));
1452 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1453
1454 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1455 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 1456 viewer_handle_send_status(viewer_status, "get new streams command");
4164020e
SM
1457 status = viewer_status_to_live_iterator_status(viewer_status);
1458 goto end;
1459 }
1460
1461 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1462 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 1463 viewer_handle_recv_status(viewer_status, "get new streams reply");
4164020e
SM
1464 status = viewer_status_to_live_iterator_status(viewer_status);
1465 goto end;
1466 }
1467
1468 streams_count = be32toh(rp.streams_count);
1469
1470 switch (be32toh(rp.status)) {
1471 case LTTNG_VIEWER_NEW_STREAMS_OK:
1472 session->new_streams_needed = false;
1473 break;
1474 case LTTNG_VIEWER_NEW_STREAMS_NO_NEW:
1475 session->new_streams_needed = false;
1476 goto end;
1477 case LTTNG_VIEWER_NEW_STREAMS_HUP:
1478 session->new_streams_needed = false;
1479 session->closed = true;
1480 status = LTTNG_LIVE_ITERATOR_STATUS_END;
1481 goto end;
1482 case LTTNG_VIEWER_NEW_STREAMS_ERR:
0f5c5d5c 1483 BT_CPPLOGD_SPEC(viewer_connection->logger, "Received get_new_streams response: error");
4164020e
SM
1484 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1485 goto end;
1486 default:
0f5c5d5c
SM
1487 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
1488 "Received get_new_streams response: Unknown:"
1489 "return code {}",
1490 be32toh(rp.status));
4164020e
SM
1491 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1492 goto end;
1493 }
1494
1495 viewer_status = receive_streams(session, streams_count, self_msg_iter);
1496 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
0f5c5d5c 1497 viewer_handle_recv_status(viewer_status, "new streams");
4164020e
SM
1498 status = viewer_status_to_live_iterator_status(viewer_status);
1499 goto end;
1500 }
1501
1502 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
f79c2d7a 1503end:
4164020e 1504 return status;
7cdc2bab
MD
1505}
1506
d721bef8
SM
1507enum lttng_live_viewer_status
1508live_viewer_connection_create(const char *url, bool in_query,
1509 struct lttng_live_msg_iter *lttng_live_msg_iter,
1510 const bt2c::Logger& parentLogger, live_viewer_connection::UP& viewer)
7cdc2bab 1511{
d721bef8 1512 auto viewer_connection = bt2s::make_unique<live_viewer_connection>(parentLogger);
4164020e 1513
0f5c5d5c
SM
1514 if (bt_socket_init(viewer_connection->logger) != 0) {
1515 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Failed to init socket");
d721bef8 1516 return LTTNG_LIVE_VIEWER_STATUS_ERROR;
4164020e
SM
1517 }
1518
4164020e
SM
1519 viewer_connection->control_sock = BT_INVALID_SOCKET;
1520 viewer_connection->port = -1;
1521 viewer_connection->in_query = in_query;
1522 viewer_connection->lttng_live_msg_iter = lttng_live_msg_iter;
30d81897 1523 viewer_connection->url = url;
4164020e 1524
0f5c5d5c 1525 BT_CPPLOGD_SPEC(viewer_connection->logger, "Establishing connection to url \"{}\"...", url);
d721bef8
SM
1526 const auto status = lttng_live_connect_viewer(viewer_connection.get());
1527
4164020e
SM
1528 /*
1529 * Only print error and append cause in case of error. not in case of
1530 * interruption.
1531 */
1532 if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
0f5c5d5c
SM
1533 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
1534 "Failed to establish connection: "
1535 "url=\"{}\"",
1536 url);
d721bef8 1537 return status;
4164020e 1538 } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
d721bef8 1539 return status;
4164020e 1540 }
0f5c5d5c 1541 BT_CPPLOGD_SPEC(viewer_connection->logger, "Connection to url \"{}\" is established", url);
4164020e 1542
d721bef8
SM
1543 viewer = std::move(viewer_connection);
1544 return LTTNG_LIVE_VIEWER_STATUS_OK;
7cdc2bab
MD
1545}
1546
277bcb7f 1547live_viewer_connection::~live_viewer_connection()
7cdc2bab 1548{
277bcb7f 1549 BT_CPPLOGD_SPEC(this->logger, "Closing connection to relay: relay-url=\"{}\"", this->url);
b9e6ec43 1550
277bcb7f 1551 lttng_live_disconnect_viewer(this);
1cb3cdd7 1552
4164020e 1553 bt_socket_fini();
7cdc2bab 1554}
This page took 0.177812 seconds and 4 git commands to generate.