/*
 * src.ctf.lttng-live: use viewer_connection_close_socket more
 * src/plugins/ctf/lttng-live/viewer-connection.cpp
 */
1 /*
2 * SPDX-License-Identifier: MIT
3 *
4 * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
5 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
6 */
7
8 #include <glib.h>
9 #include <stdint.h>
10 #include <stdio.h>
11
12 #include <babeltrace2/babeltrace.h>
13
14 #include "common/common.h"
15 #include "compat/endian.h" /* IWYU pragma: keep */
16 #include "cpp-common/bt2s/make-unique.hpp"
17
18 #include "data-stream.hpp"
19 #include "lttng-live.hpp"
20 #include "lttng-viewer-abi.hpp"
21 #include "metadata.hpp"
22 #include "viewer-connection.hpp"
23
/*
 * Log the outcome of a failed send/recv helper call.
 *
 * `_status` is an `enum lttng_live_viewer_status` that is known not to
 * be LTTNG_LIVE_VIEWER_STATUS_OK (callers invoke this only on non-OK
 * paths): an interruption is silent, an error appends a cause through
 * the connection's logger, and any other value aborts since it would
 * indicate a programming error. `_action` and `_msg_str` must be
 * string literals (they are concatenated into the format string).
 * Expects a `viewer_connection` variable in scope at the expansion
 * site.
 */
#define viewer_handle_send_recv_status(_status, _action, _msg_str)                                 \
    do {                                                                                           \
        switch (_status) {                                                                         \
        case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:                                                 \
            break;                                                                                 \
        case LTTNG_LIVE_VIEWER_STATUS_ERROR:                                                       \
            BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,                                \
                                         "Error " _action " " _msg_str);                          \
            break;                                                                                 \
        default:                                                                                   \
            bt_common_abort();                                                                     \
        }                                                                                          \
    } while (0)

/* Convenience wrapper for send-side failures. */
#define viewer_handle_send_status(_status, _msg_str)                                               \
    viewer_handle_send_recv_status(_status, "sending", _msg_str)

/* Convenience wrapper for receive-side failures. */
#define viewer_handle_recv_status(_status, _msg_str)                                               \
    viewer_handle_send_recv_status(_status, "receiving", _msg_str)

/*
 * Append an error cause whose message includes the current socket
 * error string (bt_socket_errormsg()). Expects a `viewer_connection`
 * variable in scope at the expansion site.
 */
#define LTTNG_LIVE_CPPLOGE_APPEND_CAUSE_ERRNO(_msg, _fmt, ...)                                     \
    do {                                                                                           \
        BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, _msg ": {}" _fmt,                  \
                                     bt_socket_errormsg(), ##__VA_ARGS__);                         \
    } while (0)
49
50 static inline enum lttng_live_iterator_status
51 viewer_status_to_live_iterator_status(enum lttng_live_viewer_status viewer_status)
52 {
53 switch (viewer_status) {
54 case LTTNG_LIVE_VIEWER_STATUS_OK:
55 return LTTNG_LIVE_ITERATOR_STATUS_OK;
56 case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
57 return LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
58 case LTTNG_LIVE_VIEWER_STATUS_ERROR:
59 return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
60 }
61
62 bt_common_abort();
63 }
64
65 static inline enum ctf_msg_iter_medium_status
66 viewer_status_to_ctf_msg_iter_medium_status(enum lttng_live_viewer_status viewer_status)
67 {
68 switch (viewer_status) {
69 case LTTNG_LIVE_VIEWER_STATUS_OK:
70 return CTF_MSG_ITER_MEDIUM_STATUS_OK;
71 case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
72 return CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
73 case LTTNG_LIVE_VIEWER_STATUS_ERROR:
74 return CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
75 }
76
77 bt_common_abort();
78 }
79
80 static inline void viewer_connection_close_socket(struct live_viewer_connection *viewer_connection)
81 {
82 if (viewer_connection->control_sock == BT_INVALID_SOCKET) {
83 return;
84 }
85
86 int ret = bt_socket_close(viewer_connection->control_sock);
87 if (ret == -1) {
88 BT_CPPLOGW_ERRNO_SPEC(viewer_connection->logger,
89 "Error closing viewer connection socket: ", ".");
90 }
91
92 viewer_connection->control_sock = BT_INVALID_SOCKET;
93 }
94
/*
 * Receive exactly `len` bytes from the relay daemon into `buf`.
 *
 * Returns:
 *   LTTNG_LIVE_VIEWER_STATUS_OK          - the entire message was received;
 *   LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED - a SIGINT occurred while the
 *                                          graph is being torn down;
 *   LTTNG_LIVE_VIEWER_STATUS_ERROR       - any other socket error or an
 *                                          orderly shutdown by the peer
 *                                          (the socket is closed in both
 *                                          of those cases).
 */
static enum lttng_live_viewer_status
lttng_live_recv(struct live_viewer_connection *viewer_connection, void *buf, size_t len)
{
    ssize_t received;
    size_t total_received = 0, to_receive = len;
    struct lttng_live_msg_iter *lttng_live_msg_iter = viewer_connection->lttng_live_msg_iter;
    enum lttng_live_viewer_status status;
    BT_SOCKET sock = viewer_connection->control_sock;

    /*
     * Receive a message from the Relay.
     */
    do {
        received = bt_socket_recv(sock, (char *) buf + total_received, to_receive, 0);
        if (received == BT_SOCKET_ERROR) {
            if (bt_socket_interrupted()) {
                if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
                    /*
                     * This interruption was due to a
                     * SIGINT and the graph is being torn
                     * down.
                     */
                    status = LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED;
                    lttng_live_msg_iter->was_interrupted = true;
                    goto end;
                } else {
                    /*
                     * A signal was received, but the graph
                     * is not being torn down. Carry on.
                     */
                    continue;
                }
            } else {
                /*
                 * For any other types of socket error, close
                 * the socket and return an error.
                 */
                LTTNG_LIVE_CPPLOGE_APPEND_CAUSE_ERRNO("Error receiving from Relay", ".");

                viewer_connection_close_socket(viewer_connection);
                status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
                goto end;
            }
        } else if (received == 0) {
            /*
             * The recv() call returned 0. This means the
             * connection was orderly shutdown from the other peer.
             * If that happens when we are trying to receive
             * a message from it, it means something went wrong.
             * Close the socket and return an error.
             */
            BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
                                         "Remote side has closed connection");
            viewer_connection_close_socket(viewer_connection);
            status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
            goto end;
        }

        /* Partial receives are expected on a stream socket: keep looping. */
        BT_ASSERT(received <= to_receive);
        total_received += received;
        to_receive -= received;

    } while (to_receive > 0);

    BT_ASSERT(total_received == len);
    status = LTTNG_LIVE_VIEWER_STATUS_OK;

end:
    return status;
}
171
172 /*
173 * This function sends a message to the Relay daemon.
174 * If it send the message, it returns _OK,
175 * If it's interrupted, it returns _INTERRUPTED,
176 * otherwise, it returns _ERROR.
177 */
178 static enum lttng_live_viewer_status
179 lttng_live_send(struct live_viewer_connection *viewer_connection, const void *buf, size_t len)
180 {
181 enum lttng_live_viewer_status status;
182 struct lttng_live_msg_iter *lttng_live_msg_iter = viewer_connection->lttng_live_msg_iter;
183 BT_SOCKET sock = viewer_connection->control_sock;
184 size_t to_send = len;
185 ssize_t total_sent = 0;
186
187 do {
188 ssize_t sent = bt_socket_send_nosigpipe(sock, (char *) buf + total_sent, to_send);
189 if (sent == BT_SOCKET_ERROR) {
190 if (bt_socket_interrupted()) {
191 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
192 /*
193 * This interruption was a SIGINT and
194 * the graph is being teared down.
195 */
196 status = LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED;
197 lttng_live_msg_iter->was_interrupted = true;
198 goto end;
199 } else {
200 /*
201 * A signal was received, but the graph
202 * is not being teared down. Carry on.
203 */
204 continue;
205 }
206 } else {
207 /*
208 * For any other types of socket error, close
209 * the socket and return an error.
210 */
211 LTTNG_LIVE_CPPLOGE_APPEND_CAUSE_ERRNO("Error sending to Relay", ".");
212
213 viewer_connection_close_socket(viewer_connection);
214 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
215 goto end;
216 }
217 }
218
219 BT_ASSERT(sent <= to_send);
220 total_sent += sent;
221 to_send -= sent;
222
223 } while (to_send > 0);
224
225 BT_ASSERT(total_sent == len);
226 status = LTTNG_LIVE_VIEWER_STATUS_OK;
227
228 end:
229 return status;
230 }
231
232 static int parse_url(struct live_viewer_connection *viewer_connection)
233 {
234 char error_buf[256] = {0};
235 struct bt_common_lttng_live_url_parts lttng_live_url_parts = {};
236 bt_common_lttng_live_url_parts_deleter partsDeleter {lttng_live_url_parts};
237 int ret = -1;
238
239 if (viewer_connection->url.empty()) {
240 goto end;
241 }
242
243 lttng_live_url_parts = bt_common_parse_lttng_live_url(viewer_connection->url.c_str(), error_buf,
244 sizeof(error_buf));
245 if (!lttng_live_url_parts.proto) {
246 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Invalid LTTng live URL format: {}",
247 error_buf);
248 goto end;
249 }
250 viewer_connection->proto.reset(lttng_live_url_parts.proto);
251 lttng_live_url_parts.proto = NULL;
252
253 viewer_connection->relay_hostname.reset(lttng_live_url_parts.hostname);
254 lttng_live_url_parts.hostname = NULL;
255
256 if (lttng_live_url_parts.port >= 0) {
257 viewer_connection->port = lttng_live_url_parts.port;
258 } else {
259 viewer_connection->port = LTTNG_DEFAULT_NETWORK_VIEWER_PORT;
260 }
261
262 viewer_connection->target_hostname.reset(lttng_live_url_parts.target_hostname);
263 lttng_live_url_parts.target_hostname = NULL;
264
265 if (lttng_live_url_parts.session_name) {
266 viewer_connection->session_name.reset(lttng_live_url_parts.session_name);
267 lttng_live_url_parts.session_name = NULL;
268 }
269
270 ret = 0;
271
272 end:
273 return ret;
274 }
275
276 static enum lttng_live_viewer_status
277 lttng_live_handshake(struct live_viewer_connection *viewer_connection)
278 {
279 struct lttng_viewer_cmd cmd;
280 struct lttng_viewer_connect connect;
281 enum lttng_live_viewer_status status;
282 const size_t cmd_buf_len = sizeof(cmd) + sizeof(connect);
283 char cmd_buf[cmd_buf_len];
284
285 BT_CPPLOGD_SPEC(viewer_connection->logger,
286 "Handshaking with the relay daemon: cmd={}, major-version={}, minor-version={}",
287 LTTNG_VIEWER_CONNECT, LTTNG_LIVE_MAJOR, LTTNG_LIVE_MINOR);
288
289 cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
290 cmd.data_size = htobe64((uint64_t) sizeof(connect));
291 cmd.cmd_version = htobe32(0);
292
293 connect.viewer_session_id = -1ULL; /* will be set on recv */
294 connect.major = htobe32(LTTNG_LIVE_MAJOR);
295 connect.minor = htobe32(LTTNG_LIVE_MINOR);
296 connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
297
298 /*
299 * Merge the cmd and connection request to prevent a write-write
300 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
301 * second write to be performed quickly in presence of Nagle's algorithm
302 */
303 memcpy(cmd_buf, &cmd, sizeof(cmd));
304 memcpy(cmd_buf + sizeof(cmd), &connect, sizeof(connect));
305
306 status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
307 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
308 viewer_handle_send_status(status, "viewer connect command");
309 goto end;
310 }
311
312 status = lttng_live_recv(viewer_connection, &connect, sizeof(connect));
313 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
314 viewer_handle_recv_status(status, "viewer connect reply");
315 goto end;
316 }
317
318 BT_CPPLOGI_SPEC(viewer_connection->logger, "Received viewer session ID : {}",
319 (uint64_t) be64toh(connect.viewer_session_id));
320 BT_CPPLOGI_SPEC(viewer_connection->logger, "Relayd version : {}.{}", be32toh(connect.major),
321 be32toh(connect.minor));
322
323 if (LTTNG_LIVE_MAJOR != be32toh(connect.major)) {
324 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
325 "Incompatible lttng-relayd protocol");
326 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
327 goto end;
328 }
329 /* Use the smallest protocol version implemented. */
330 if (LTTNG_LIVE_MINOR > be32toh(connect.minor)) {
331 viewer_connection->minor = be32toh(connect.minor);
332 } else {
333 viewer_connection->minor = LTTNG_LIVE_MINOR;
334 }
335 viewer_connection->major = LTTNG_LIVE_MAJOR;
336
337 status = LTTNG_LIVE_VIEWER_STATUS_OK;
338
339 goto end;
340
341 end:
342 return status;
343 }
344
345 static enum lttng_live_viewer_status
346 lttng_live_connect_viewer(struct live_viewer_connection *viewer_connection)
347 {
348 struct hostent *host;
349 struct sockaddr_in server_addr;
350 enum lttng_live_viewer_status status;
351
352 if (parse_url(viewer_connection)) {
353 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Failed to parse URL");
354 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
355 goto error;
356 }
357
358 BT_CPPLOGD_SPEC(
359 viewer_connection->logger,
360 "Connecting to hostname : {}, port : {}, target hostname : {}, session name : {}, proto : {}",
361 viewer_connection->relay_hostname->str, viewer_connection->port,
362 !viewer_connection->target_hostname ? "<none>" : viewer_connection->target_hostname->str,
363 !viewer_connection->session_name ? "<none>" : viewer_connection->session_name->str,
364 viewer_connection->proto->str);
365
366 host = gethostbyname(viewer_connection->relay_hostname->str);
367 if (!host) {
368 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
369 "Cannot lookup hostname: hostname=\"{}\"",
370 viewer_connection->relay_hostname->str);
371 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
372 goto error;
373 }
374
375 if ((viewer_connection->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == BT_INVALID_SOCKET) {
376 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Socket creation failed: {}",
377 bt_socket_errormsg());
378 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
379 goto error;
380 }
381
382 server_addr.sin_family = AF_INET;
383 server_addr.sin_port = htons(viewer_connection->port);
384 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
385 memset(&(server_addr.sin_zero), 0, 8);
386
387 if (connect(viewer_connection->control_sock, (struct sockaddr *) &server_addr,
388 sizeof(struct sockaddr)) == BT_SOCKET_ERROR) {
389 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Connection failed: {}",
390 bt_socket_errormsg());
391 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
392 goto error;
393 }
394
395 status = lttng_live_handshake(viewer_connection);
396
397 /*
398 * Only print error and append cause in case of error. not in case of
399 * interruption.
400 */
401 if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
402 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Viewer handshake failed");
403 goto error;
404 } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
405 goto end;
406 }
407
408 goto end;
409
410 error:
411 viewer_connection_close_socket(viewer_connection);
412
413 end:
414 return status;
415 }
416
/*
 * Look for `session` (matched by target hostname and session name) in
 * the `results` array built so far. If a matching entry exists, merge
 * the session's counters into it: "stream-count" values are summed and
 * "client-count" values are maximized (the same session can be
 * reported by the relay once per trace).
 *
 * `*_found` is set to whether a matching entry was found.
 * Returns 0 on success, -1 on error (missing expected map entry).
 */
static int list_update_session(const bt2::ArrayValue results,
                               const struct lttng_viewer_session *session, bool *_found,
                               struct live_viewer_connection *viewer_connection)
{
    bool found = false;

    for (const auto value : results) {
        const auto map = value.asMap();
        const auto hostnameVal = map["target-hostname"];

        if (!hostnameVal) {
            BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
                                         "Error borrowing \"target-hostname\" entry.");
            return -1;
        }

        const auto sessionNameVal = map["session-name"];

        if (!sessionNameVal) {
            BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
                                         "Error borrowing \"session-name\" entry.");
            return -1;
        }

        const auto hostname_str = hostnameVal->asString().value();
        const auto session_name_str = sessionNameVal->asString().value();

        if (strcmp(session->hostname, hostname_str) == 0 &&
            strcmp(session->session_name, session_name_str) == 0) {
            /* Counters arrive in big-endian from the relay daemon. */
            uint32_t streams = be32toh(session->streams);
            uint32_t clients = be32toh(session->clients);

            found = true;

            const auto streamCountVal = map["stream-count"];

            if (!streamCountVal) {
                BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
                                             "Error borrowing \"stream-count\" entry.");
                return -1;
            }

            auto val = streamCountVal->asUnsignedInteger().value();

            /* sum */
            val += streams;
            streamCountVal->asUnsignedInteger().value(val);

            const auto clientCountVal = map["client-count"];

            if (!clientCountVal) {
                BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
                                             "Error borrowing \"client-count\" entry.");
                return -1;
            }

            val = clientCountVal->asUnsignedInteger().value();

            /* max */
            val = std::max<uint64_t>(clients, val);
            clientCountVal->asUnsignedInteger().value(val);
        }

        if (found) {
            break;
        }
    }

    *_found = found;
    return 0;
}
488
/*
 * Add `session` to the `results` array. If an entry for the same
 * hostname/session name already exists, its counters are merged via
 * list_update_session() instead of appending a duplicate. Otherwise a
 * new map is appended with the keys "url", "target-hostname",
 * "session-name", "timer-us", "stream-count" and "client-count" (see
 * the data-structure comment below this function).
 *
 * Returns 0 on success, -1 on error.
 */
static int list_append_session(const bt2::ArrayValue results, const std::string& base_url,
                               const struct lttng_viewer_session *session,
                               struct live_viewer_connection *viewer_connection)
{
    int ret = 0;
    bool found = false;

    /*
     * If the session already exists, add the stream count to it,
     * and do max of client counts.
     */
    ret = list_update_session(results, session, &found, viewer_connection);
    if (ret || found) {
        return ret;
    }

    const auto map = bt2::MapValue::create();

    if (base_url.empty()) {
        BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Error: base_url empty.");
        return -1;
    }

    /*
     * key = "url",
     * value = <string>,
     */
    map->insert("url",
                fmt::format("{}/host/{}/{}", base_url, session->hostname, session->session_name));

    /*
     * key = "target-hostname",
     * value = <string>,
     */
    map->insert("target-hostname", session->hostname);

    /*
     * key = "session-name",
     * value = <string>,
     */
    map->insert("session-name", session->session_name);

    /*
     * key = "timer-us",
     * value = <integer>,
     */
    {
        /* Live timer period, big-endian on the wire. */
        uint32_t live_timer = be32toh(session->live_timer);

        map->insert("timer-us", (uint64_t) live_timer);
    }

    /*
     * key = "stream-count",
     * value = <integer>,
     */
    {
        uint32_t streams = be32toh(session->streams);

        map->insert("stream-count", (uint64_t) streams);
    }

    /*
     * key = "client-count",
     * value = <integer>,
     */
    {
        uint32_t clients = be32toh(session->clients);

        map->insert("client-count", (uint64_t) clients);
    }

    results.append(*map);
    return 0;
}
564
565 /*
566 * Data structure returned:
567 *
568 * {
569 * <array> = {
570 * [n] = {
571 * <map> = {
572 * {
573 * key = "url",
574 * value = <string>,
575 * },
576 * {
577 * key = "target-hostname",
578 * value = <string>,
579 * },
580 * {
581 * key = "session-name",
582 * value = <string>,
583 * },
584 * {
585 * key = "timer-us",
586 * value = <integer>,
587 * },
588 * {
589 * key = "stream-count",
590 * value = <integer>,
591 * },
592 * {
593 * key = "client-count",
594 * value = <integer>,
595 * },
596 * },
597 * }
598 * }
599 */
600
/*
 * Query the relay daemon for its list of sessions
 * (LTTNG_VIEWER_LIST_SESSIONS) and return them as the array-of-maps
 * structure documented above.
 *
 * Throws bt2::Error on any send/receive or session-append error, and
 * bt2c::TryAgain if the exchange was interrupted.
 */
bt2::Value::Shared
live_viewer_connection_list_sessions(struct live_viewer_connection *viewer_connection)
{
    enum lttng_live_viewer_status viewer_status;
    struct lttng_viewer_cmd cmd;
    struct lttng_viewer_list_sessions list;
    uint32_t i, sessions_count;
    auto result = bt2::ArrayValue::create();

    BT_CPPLOGD_SPEC(viewer_connection->logger, "Requesting list of sessions: cmd={}",
                    LTTNG_VIEWER_LIST_SESSIONS);

    cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
    cmd.data_size = htobe64((uint64_t) 0);
    cmd.cmd_version = htobe32(0);

    viewer_status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
    if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
        BT_CPPLOGE_APPEND_CAUSE_AND_THROW_SPEC(viewer_connection->logger, bt2::Error,
                                               "Error sending list sessions command");
    } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
        throw bt2c::TryAgain {};
    }

    viewer_status = lttng_live_recv(viewer_connection, &list, sizeof(list));
    if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
        BT_CPPLOGE_APPEND_CAUSE_AND_THROW_SPEC(viewer_connection->logger, bt2::Error,
                                               "Error receiving session list");
    } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
        throw bt2c::TryAgain {};
    }

    /* One lttng_viewer_session record follows per advertised session. */
    sessions_count = be32toh(list.sessions_count);
    for (i = 0; i < sessions_count; i++) {
        struct lttng_viewer_session lsession;

        viewer_status = lttng_live_recv(viewer_connection, &lsession, sizeof(lsession));
        if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
            BT_CPPLOGE_APPEND_CAUSE_AND_THROW_SPEC(viewer_connection->logger, bt2::Error,
                                                   "Error receiving session:");
        } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
            throw bt2c::TryAgain {};
        }

        /* Defensive NUL-termination of the wire-supplied strings. */
        lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
        lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
        if (list_append_session(*result, viewer_connection->url, &lsession, viewer_connection)) {
            BT_CPPLOGE_APPEND_CAUSE_AND_THROW_SPEC(viewer_connection->logger, bt2::Error,
                                                   "Error appending session");
        }
    }

    return result;
}
655
/*
 * Ask the relay daemon for its session list and register, in the
 * message iterator, every session whose hostname and session name
 * match the ones requested in the connection URL.
 *
 * NOTE(review): this dereferences viewer_connection->session_name->str
 * and ->target_hostname->str unconditionally; parse_url() only sets
 * session_name when present in the URL, so callers presumably reach
 * this path only for URLs that carry both — confirm.
 */
static enum lttng_live_viewer_status
lttng_live_query_session_ids(struct lttng_live_msg_iter *lttng_live_msg_iter)
{
    struct lttng_viewer_cmd cmd;
    struct lttng_viewer_list_sessions list;
    struct lttng_viewer_session lsession;
    uint32_t i, sessions_count;
    uint64_t session_id;
    enum lttng_live_viewer_status status;
    live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection.get();

    BT_CPPLOGD_SPEC(viewer_connection->logger,
                    "Asking the relay daemon for the list of sessions: cmd={}",
                    LTTNG_VIEWER_LIST_SESSIONS);

    cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
    cmd.data_size = htobe64((uint64_t) 0);
    cmd.cmd_version = htobe32(0);

    status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
    if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
        viewer_handle_send_status(status, "list sessions command");
        goto end;
    }

    status = lttng_live_recv(viewer_connection, &list, sizeof(list));
    if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
        viewer_handle_recv_status(status, "session list reply");
        goto end;
    }

    /* One lttng_viewer_session record follows per advertised session. */
    sessions_count = be32toh(list.sessions_count);
    for (i = 0; i < sessions_count; i++) {
        status = lttng_live_recv(viewer_connection, &lsession, sizeof(lsession));
        if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
            viewer_handle_recv_status(status, "session reply");
            goto end;
        }
        /* Defensive NUL-termination of the wire-supplied strings. */
        lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
        lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
        session_id = be64toh(lsession.id);

        BT_CPPLOGI_SPEC(viewer_connection->logger,
                        "Adding session to internal list: "
                        "session-id={}, hostname=\"{}\", session-name=\"{}\"",
                        session_id, lsession.hostname, lsession.session_name);

        if ((strncmp(lsession.session_name, viewer_connection->session_name->str,
                     LTTNG_VIEWER_NAME_MAX) == 0) &&
            (strncmp(lsession.hostname, viewer_connection->target_hostname->str,
                     LTTNG_VIEWER_HOST_NAME_MAX) == 0)) {
            if (lttng_live_add_session(lttng_live_msg_iter, session_id, lsession.hostname,
                                       lsession.session_name)) {
                BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
                                             "Failed to add live session");
                status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
                goto end;
            }
        }
    }

    status = LTTNG_LIVE_VIEWER_STATUS_OK;

end:
    return status;
}
722
/*
 * Create a viewer session on the relay daemon
 * (LTTNG_VIEWER_CREATE_SESSION), then query and register the IDs of
 * the sessions matching the connection URL.
 *
 * Returns the status of the last step that ran; interruptions are
 * forwarded silently, errors append a cause.
 */
enum lttng_live_viewer_status
lttng_live_create_viewer_session(struct lttng_live_msg_iter *lttng_live_msg_iter)
{
    struct lttng_viewer_cmd cmd;
    struct lttng_viewer_create_session_response resp;
    enum lttng_live_viewer_status status;
    live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection.get();

    BT_CPPLOGD_SPEC(viewer_connection->logger, "Creating a viewer session: cmd={}",
                    LTTNG_VIEWER_CREATE_SESSION);

    cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
    cmd.data_size = htobe64((uint64_t) 0);
    cmd.cmd_version = htobe32(0);

    status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
    if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
        viewer_handle_send_status(status, "create session command");
        goto end;
    }

    status = lttng_live_recv(viewer_connection, &resp, sizeof(resp));
    if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
        viewer_handle_recv_status(status, "create session reply");
        goto end;
    }

    if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
        BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Error creating viewer session");
        status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
        goto end;
    }

    status = lttng_live_query_session_ids(lttng_live_msg_iter);
    if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
        BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
                                     "Failed to query live viewer session ids");
        goto end;
    } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
        goto end;
    }

end:
    return status;
}
768
/*
 * Receive `stream_count` lttng_viewer_stream records from the relay
 * daemon and create the corresponding stream objects in `session`:
 * metadata streams go through lttng_live_metadata_create_stream()
 * (which also arms lazy stream message initialization), data streams
 * through lttng_live_stream_iterator_create().
 *
 * NOTE(review): the created stream iterator is only checked for NULL
 * here; ownership appears to be held by the session — confirm in
 * lttng_live_stream_iterator_create().
 */
static enum lttng_live_viewer_status receive_streams(struct lttng_live_session *session,
                                                     uint32_t stream_count,
                                                     bt_self_message_iterator *self_msg_iter)
{
    uint32_t i;
    struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
    enum lttng_live_viewer_status status;
    live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection.get();

    BT_CPPLOGI_SPEC(viewer_connection->logger, "Getting {} new streams", stream_count);
    for (i = 0; i < stream_count; i++) {
        struct lttng_viewer_stream stream;
        struct lttng_live_stream_iterator *live_stream;
        uint64_t stream_id;
        uint64_t ctf_trace_id;

        status = lttng_live_recv(viewer_connection, &stream, sizeof(stream));
        if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
            viewer_handle_recv_status(status, "stream reply");
            goto end;
        }
        /* Defensive NUL-termination of the wire-supplied strings. */
        stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
        stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
        stream_id = be64toh(stream.id);
        ctf_trace_id = be64toh(stream.ctf_trace_id);

        if (stream.metadata_flag) {
            BT_CPPLOGI_SPEC(viewer_connection->logger, "    metadata stream {} : {}/{}", stream_id,
                            stream.path_name, stream.channel_name);
            if (lttng_live_metadata_create_stream(session, ctf_trace_id, stream_id)) {
                BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
                                             "Error creating metadata stream");
                status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
                goto end;
            }
            session->lazy_stream_msg_init = true;
        } else {
            BT_CPPLOGI_SPEC(viewer_connection->logger, "    stream {} : {}/{}", stream_id,
                            stream.path_name, stream.channel_name);
            live_stream =
                lttng_live_stream_iterator_create(session, ctf_trace_id, stream_id, self_msg_iter);
            if (!live_stream) {
                BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Error creating stream");
                status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
                goto end;
            }
        }
    }
    status = LTTNG_LIVE_VIEWER_STATUS_OK;

end:
    return status;
}
822
/*
 * Attach to the relay session identified by `session->id`
 * (LTTNG_VIEWER_ATTACH_SESSION, currently always with
 * LTTNG_VIEWER_SEEK_LAST) and receive the initial list of its streams.
 * On success, marks the session as attached and clears its
 * new-streams-needed flag.
 */
enum lttng_live_viewer_status lttng_live_session_attach(struct lttng_live_session *session,
                                                        bt_self_message_iterator *self_msg_iter)
{
    struct lttng_viewer_cmd cmd;
    enum lttng_live_viewer_status status;
    struct lttng_viewer_attach_session_request rq;
    struct lttng_viewer_attach_session_response rp;
    struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
    live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection.get();
    uint64_t session_id = session->id;
    uint32_t streams_count;
    const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
    char cmd_buf[cmd_buf_len];

    BT_CPPLOGD_SPEC(viewer_connection->logger,
                    "Attaching to session: cmd={}, session-id={}, seek={}",
                    LTTNG_VIEWER_ATTACH_SESSION, session_id, LTTNG_VIEWER_SEEK_LAST);

    cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
    cmd.data_size = htobe64((uint64_t) sizeof(rq));
    cmd.cmd_version = htobe32(0);

    memset(&rq, 0, sizeof(rq));
    rq.session_id = htobe64(session_id);
    // TODO: add cmd line parameter to select seek beginning
    // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
    rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);

    /*
     * Merge the cmd and connection request to prevent a write-write
     * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
     * second write to be performed quickly in presence of Nagle's algorithm.
     */
    memcpy(cmd_buf, &cmd, sizeof(cmd));
    memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
    status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
    if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
        viewer_handle_send_status(status, "attach session command");
        goto end;
    }

    status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
    if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
        viewer_handle_recv_status(status, "attach session reply");
        goto end;
    }

    /* Dispatch on the relay's attach status code. */
    streams_count = be32toh(rp.streams_count);
    switch (be32toh(rp.status)) {
    case LTTNG_VIEWER_ATTACH_OK:
        break;
    case LTTNG_VIEWER_ATTACH_UNK:
        BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Session id {} is unknown",
                                     session_id);
        status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
        goto end;
    case LTTNG_VIEWER_ATTACH_ALREADY:
        BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
                                     "There is already a viewer attached to this session");
        status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
        goto end;
    case LTTNG_VIEWER_ATTACH_NOT_LIVE:
        BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Not a live session");
        status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
        goto end;
    case LTTNG_VIEWER_ATTACH_SEEK_ERR:
        BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Wrong seek parameter");
        status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
        goto end;
    default:
        BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Unknown attach return code {}",
                                     be32toh(rp.status));
        status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
        goto end;
    }

    /* We receive the initial list of streams. */
    status = receive_streams(session, streams_count, self_msg_iter);
    switch (status) {
    case LTTNG_LIVE_VIEWER_STATUS_OK:
        break;
    case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
        goto end;
    case LTTNG_LIVE_VIEWER_STATUS_ERROR:
        BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Error receiving streams");
        goto end;
    default:
        bt_common_abort();
    }

    session->attached = true;
    session->new_streams_needed = false;

end:
    return status;
}
919
/*
 * Detach from the relay session identified by `session->id`
 * (LTTNG_VIEWER_DETACH_SESSION). A session that is not attached, or
 * whose control socket is already closed, is treated as a successful
 * no-op. On success, clears the session's attached flag.
 */
enum lttng_live_viewer_status lttng_live_session_detach(struct lttng_live_session *session)
{
    struct lttng_viewer_cmd cmd;
    enum lttng_live_viewer_status status;
    struct lttng_viewer_detach_session_request rq;
    struct lttng_viewer_detach_session_response rp;
    struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
    live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection.get();
    uint64_t session_id = session->id;
    const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
    char cmd_buf[cmd_buf_len];

    /*
     * The session might already be detached and the viewer socket might
     * already been closed. This happens when calling this function when
     * tearing down the graph after an error.
     */
    if (!session->attached || viewer_connection->control_sock == BT_INVALID_SOCKET) {
        return LTTNG_LIVE_VIEWER_STATUS_OK;
    }

    BT_CPPLOGD_SPEC(viewer_connection->logger, "Detaching from session: cmd={}, session-id={}",
                    LTTNG_VIEWER_DETACH_SESSION, session_id);

    cmd.cmd = htobe32(LTTNG_VIEWER_DETACH_SESSION);
    cmd.data_size = htobe64((uint64_t) sizeof(rq));
    cmd.cmd_version = htobe32(0);

    memset(&rq, 0, sizeof(rq));
    rq.session_id = htobe64(session_id);

    /*
     * Merge the cmd and connection request to prevent a write-write
     * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
     * second write to be performed quickly in presence of Nagle's algorithm.
     */
    memcpy(cmd_buf, &cmd, sizeof(cmd));
    memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
    status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
    if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
        viewer_handle_send_status(status, "detach session command");
        goto end;
    }

    status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
    if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
        viewer_handle_recv_status(status, "detach session reply");
        goto end;
    }

    /* Dispatch on the relay's detach status code. */
    switch (be32toh(rp.status)) {
    case LTTNG_VIEWER_DETACH_SESSION_OK:
        break;
    case LTTNG_VIEWER_DETACH_SESSION_UNK:
        BT_CPPLOGW_SPEC(viewer_connection->logger, "Session id {} is unknown", session_id);
        status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
        goto end;
    case LTTNG_VIEWER_DETACH_SESSION_ERR:
        BT_CPPLOGW_SPEC(viewer_connection->logger, "Error detaching session id {}", session_id);
        status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
        goto end;
    default:
        BT_CPPLOGE_SPEC(viewer_connection->logger, "Unknown detach return code {}",
                        be32toh(rp.status));
        status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
        goto end;
    }

    session->attached = false;

    status = LTTNG_LIVE_VIEWER_STATUS_OK;

end:
    return status;
}
995
996 enum lttng_live_get_one_metadata_status
997 lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, std::vector<char>& buf)
998 {
999 uint64_t len = 0;
1000 enum lttng_live_get_one_metadata_status status;
1001 enum lttng_live_viewer_status viewer_status;
1002 struct lttng_viewer_cmd cmd;
1003 struct lttng_viewer_get_metadata rq;
1004 struct lttng_viewer_metadata_packet rp;
1005 std::vector<char> data;
1006 struct lttng_live_session *session = trace->session;
1007 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
1008 struct lttng_live_metadata *metadata = trace->metadata.get();
1009 live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection.get();
1010 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1011 char cmd_buf[cmd_buf_len];
1012
1013 BT_CPPLOGD_SPEC(viewer_connection->logger,
1014 "Requesting new metadata for trace:"
1015 "cmd={}, trace-id={}, metadata-stream-id={}",
1016 LTTNG_VIEWER_GET_METADATA, trace->id, metadata->stream_id);
1017
1018 rq.stream_id = htobe64(metadata->stream_id);
1019 cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
1020 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1021 cmd.cmd_version = htobe32(0);
1022
1023 /*
1024 * Merge the cmd and connection request to prevent a write-write
1025 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1026 * second write to be performed quickly in presence of Nagle's algorithm.
1027 */
1028 memcpy(cmd_buf, &cmd, sizeof(cmd));
1029 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1030 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1031 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1032 viewer_handle_send_status(viewer_status, "get metadata command");
1033 status = (enum lttng_live_get_one_metadata_status) viewer_status;
1034 goto end;
1035 }
1036
1037 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1038 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1039 viewer_handle_recv_status(viewer_status, "get metadata reply");
1040 status = (enum lttng_live_get_one_metadata_status) viewer_status;
1041 goto end;
1042 }
1043
1044 switch (be32toh(rp.status)) {
1045 case LTTNG_VIEWER_METADATA_OK:
1046 BT_CPPLOGD_SPEC(viewer_connection->logger, "Received get_metadata response: ok");
1047 break;
1048 case LTTNG_VIEWER_NO_NEW_METADATA:
1049 BT_CPPLOGD_SPEC(viewer_connection->logger, "Received get_metadata response: no new");
1050 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_END;
1051 goto end;
1052 case LTTNG_VIEWER_METADATA_ERR:
1053 /*
1054 * The Relayd cannot find this stream id. Maybe its
1055 * gone already. This can happen in short lived UST app
1056 * in a per-pid session.
1057 */
1058 BT_CPPLOGD_SPEC(viewer_connection->logger, "Received get_metadata response: error");
1059 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_CLOSED;
1060 goto end;
1061 default:
1062 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
1063 "Received get_metadata response: unknown");
1064 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
1065 goto end;
1066 }
1067
1068 len = be64toh(rp.len);
1069 if (len == 0) {
1070 /*
1071 * We received a `LTTNG_VIEWER_METADATA_OK` with a packet
1072 * length of 0. This means we must try again. This scenario
1073 * arises when a clear command is performed on an lttng session.
1074 */
1075 BT_CPPLOGD_SPEC(
1076 viewer_connection->logger,
1077 "Expecting a metadata packet of size 0. Retry to get a packet from the relay.");
1078 goto empty_metadata_packet_retry;
1079 }
1080
1081 BT_CPPLOGD_SPEC(viewer_connection->logger, "Writing {} bytes to metadata", len);
1082 if (len <= 0) {
1083 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Erroneous response length");
1084 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
1085 goto end;
1086 }
1087
1088 data.resize(len);
1089
1090 viewer_status = lttng_live_recv(viewer_connection, data.data(), len);
1091 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1092 viewer_handle_recv_status(viewer_status, "get metadata packet");
1093 status = (enum lttng_live_get_one_metadata_status) viewer_status;
1094 goto end;
1095 }
1096
1097 /*
1098 * Write the metadata to the file handle.
1099 */
1100 buf.insert(buf.end(), data.begin(), data.end());
1101
1102 empty_metadata_packet_retry:
1103 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_OK;
1104
1105 end:
1106 return status;
1107 }
1108
1109 /*
1110 * Assign the fields from a lttng_viewer_index to a packet_index.
1111 */
1112 static void lttng_index_to_packet_index(struct lttng_viewer_index *lindex,
1113 struct packet_index *pindex)
1114 {
1115 BT_ASSERT(lindex);
1116 BT_ASSERT(pindex);
1117
1118 pindex->offset = be64toh(lindex->offset);
1119 pindex->packet_size = be64toh(lindex->packet_size);
1120 pindex->content_size = be64toh(lindex->content_size);
1121 pindex->ts_cycles.timestamp_begin = be64toh(lindex->timestamp_begin);
1122 pindex->ts_cycles.timestamp_end = be64toh(lindex->timestamp_end);
1123 pindex->events_discarded = be64toh(lindex->events_discarded);
1124 }
1125
1126 static void lttng_live_need_new_streams(struct lttng_live_msg_iter *lttng_live_msg_iter)
1127 {
1128 for (const auto& session : lttng_live_msg_iter->sessions) {
1129 BT_CPPLOGD_SPEC(lttng_live_msg_iter->logger,
1130 "Marking session as needing new streams: "
1131 "session-id={}",
1132 session->id);
1133 session->new_streams_needed = true;
1134 }
1135 }
1136
1137 enum lttng_live_iterator_status
1138 lttng_live_get_next_index(struct lttng_live_msg_iter *lttng_live_msg_iter,
1139 struct lttng_live_stream_iterator *stream, struct packet_index *index)
1140 {
1141 struct lttng_viewer_cmd cmd;
1142 struct lttng_viewer_get_next_index rq;
1143 enum lttng_live_viewer_status viewer_status;
1144 struct lttng_viewer_index rp;
1145 enum lttng_live_iterator_status status;
1146 live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection.get();
1147 struct lttng_live_trace *trace = stream->trace;
1148 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1149 char cmd_buf[cmd_buf_len];
1150 uint32_t flags, rp_status;
1151
1152 BT_CPPLOGD_SPEC(viewer_connection->logger,
1153 "Requesting next index for stream: cmd={}, "
1154 "viewer-stream-id={}",
1155 LTTNG_VIEWER_GET_NEXT_INDEX, stream->viewer_stream_id);
1156 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
1157 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1158 cmd.cmd_version = htobe32(0);
1159
1160 memset(&rq, 0, sizeof(rq));
1161 rq.stream_id = htobe64(stream->viewer_stream_id);
1162
1163 /*
1164 * Merge the cmd and connection request to prevent a write-write
1165 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1166 * second write to be performed quickly in presence of Nagle's algorithm.
1167 */
1168 memcpy(cmd_buf, &cmd, sizeof(cmd));
1169 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1170
1171 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1172 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1173 viewer_handle_send_status(viewer_status, "get next index command");
1174 goto error;
1175 }
1176
1177 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1178 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1179 viewer_handle_recv_status(viewer_status, "get next index reply");
1180 goto error;
1181 }
1182
1183 flags = be32toh(rp.flags);
1184 rp_status = be32toh(rp.status);
1185
1186 BT_CPPLOGD_SPEC(
1187 viewer_connection->logger, "Received response from relay daemon: cmd=%s, response={}",
1188 LTTNG_VIEWER_GET_NEXT_INDEX, static_cast<lttng_viewer_next_index_return_code>(rp_status));
1189
1190 if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
1191 BT_CPPLOGD_SPEC(viewer_connection->logger,
1192 "Marking all sessions as possibly needing new streams: "
1193 "response={}, response-flag=NEW_STREAM",
1194 static_cast<lttng_viewer_next_index_return_code>(rp_status));
1195 lttng_live_need_new_streams(lttng_live_msg_iter);
1196 }
1197
1198 switch (rp_status) {
1199 case LTTNG_VIEWER_INDEX_INACTIVE:
1200 {
1201 uint64_t ctf_stream_class_id;
1202
1203 memset(index, 0, sizeof(struct packet_index));
1204 index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
1205 stream->current_inactivity_ts = index->ts_cycles.timestamp_end;
1206 ctf_stream_class_id = be64toh(rp.stream_id);
1207 if (stream->ctf_stream_class_id.is_set) {
1208 BT_ASSERT(stream->ctf_stream_class_id.value == ctf_stream_class_id);
1209 } else {
1210 stream->ctf_stream_class_id.value = ctf_stream_class_id;
1211 stream->ctf_stream_class_id.is_set = true;
1212 }
1213 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_QUIESCENT);
1214 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1215 break;
1216 }
1217 case LTTNG_VIEWER_INDEX_OK:
1218 {
1219 uint64_t ctf_stream_class_id;
1220
1221 lttng_index_to_packet_index(&rp, index);
1222 ctf_stream_class_id = be64toh(rp.stream_id);
1223 if (stream->ctf_stream_class_id.is_set) {
1224 BT_ASSERT(stream->ctf_stream_class_id.value == ctf_stream_class_id);
1225 } else {
1226 stream->ctf_stream_class_id.value = ctf_stream_class_id;
1227 stream->ctf_stream_class_id.is_set = true;
1228 }
1229 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_DATA);
1230
1231 if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
1232 BT_CPPLOGD_SPEC(viewer_connection->logger,
1233 "Marking trace as needing new metadata: "
1234 "response={}, response-flag=NEW_METADATA, trace-id={}",
1235 static_cast<lttng_viewer_next_index_return_code>(rp_status), trace->id);
1236 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
1237 }
1238 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1239 break;
1240 }
1241 case LTTNG_VIEWER_INDEX_RETRY:
1242 memset(index, 0, sizeof(struct packet_index));
1243 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
1244 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
1245 goto end;
1246 case LTTNG_VIEWER_INDEX_HUP:
1247 memset(index, 0, sizeof(struct packet_index));
1248 index->offset = EOF;
1249 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_EOF);
1250 stream->has_stream_hung_up = true;
1251 status = LTTNG_LIVE_ITERATOR_STATUS_END;
1252 break;
1253 case LTTNG_VIEWER_INDEX_ERR:
1254 memset(index, 0, sizeof(struct packet_index));
1255 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
1256 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1257 goto end;
1258 default:
1259 BT_CPPLOGD_SPEC(viewer_connection->logger,
1260 "Received get_next_index response: unknown value");
1261 memset(index, 0, sizeof(struct packet_index));
1262 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
1263 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1264 goto end;
1265 }
1266
1267 goto end;
1268
1269 error:
1270 status = viewer_status_to_live_iterator_status(viewer_status);
1271 end:
1272 return status;
1273 }
1274
/*
 * Download up to `req_len` bytes of packet data for `stream`, starting at
 * `offset` within the remote stream, into `buf`. On success, `*recv_len`
 * is set to the number of bytes actually received (the relay may shrink
 * the request).
 *
 * Returns CTF_MSG_ITER_MEDIUM_STATUS_OK on success,
 * .._STATUS_AGAIN when the caller must retry (relay asked for a retry, or
 * new metadata/streams must be fetched first), .._STATUS_EOF at end of
 * stream, and .._STATUS_ERROR otherwise.
 */
enum ctf_msg_iter_medium_status
lttng_live_get_stream_bytes(struct lttng_live_msg_iter *lttng_live_msg_iter,
                            struct lttng_live_stream_iterator *stream, uint8_t *buf,
                            uint64_t offset, uint64_t req_len, uint64_t *recv_len)
{
    enum ctf_msg_iter_medium_status status;
    enum lttng_live_viewer_status viewer_status;
    struct lttng_viewer_trace_packet rp;
    struct lttng_viewer_cmd cmd;
    struct lttng_viewer_get_packet rq;
    live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection.get();
    struct lttng_live_trace *trace = stream->trace;
    const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
    char cmd_buf[cmd_buf_len];
    uint32_t flags, rp_status;

    BT_CPPLOGD_SPEC(viewer_connection->logger,
                    "Requesting data from stream: cmd={}, "
                    "offset={}, request-len={}",
                    LTTNG_VIEWER_GET_PACKET, offset, req_len);

    cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
    cmd.data_size = htobe64((uint64_t) sizeof(rq));
    cmd.cmd_version = htobe32(0);

    memset(&rq, 0, sizeof(rq));
    rq.stream_id = htobe64(stream->viewer_stream_id);
    rq.offset = htobe64(offset);
    /* NOTE(review): the wire protocol carries a 32-bit length, so `req_len`
     * is truncated here — presumably callers never request > 4 GiB; verify. */
    rq.len = htobe32(req_len);

    /*
     * Merge the cmd and connection request to prevent a write-write
     * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
     * second write to be performed quickly in presence of Nagle's algorithm.
     */
    memcpy(cmd_buf, &cmd, sizeof(cmd));
    memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));

    viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
    if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
        viewer_handle_send_status(viewer_status, "get data packet command");
        goto error_convert_status;
    }

    viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
    if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
        viewer_handle_recv_status(viewer_status, "get data packet reply");
        goto error_convert_status;
    }

    /* Reply fields arrive big-endian; convert to host order before use. */
    flags = be32toh(rp.flags);
    rp_status = be32toh(rp.status);

    BT_CPPLOGD_SPEC(
        viewer_connection->logger, "Received response from relay daemon: cmd={}, response={}",
        LTTNG_VIEWER_GET_PACKET, static_cast<lttng_viewer_get_packet_return_code>(rp_status));
    switch (rp_status) {
    case LTTNG_VIEWER_GET_PACKET_OK:
        /* The relay may return fewer bytes than requested. */
        req_len = be32toh(rp.len);
        BT_CPPLOGD_SPEC(viewer_connection->logger,
                        "Got packet from relay daemon: response={}, packet-len={}",
                        static_cast<lttng_viewer_get_packet_return_code>(rp_status), req_len);
        break;
    case LTTNG_VIEWER_GET_PACKET_RETRY:
        /* Unimplemented by relay daemon */
        status = CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
        goto end;
    case LTTNG_VIEWER_GET_PACKET_ERR:
        /*
         * An "error" reply with NEW_METADATA/NEW_STREAM flags is not a
         * real failure: it tells us to refresh state and retry.
         */
        if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
            BT_CPPLOGD_SPEC(viewer_connection->logger,
                            "Marking trace as needing new metadata: "
                            "response={}, response-flag=NEW_METADATA, trace-id={}",
                            static_cast<lttng_viewer_get_packet_return_code>(rp_status), trace->id);
            trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
        }
        if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
            BT_CPPLOGD_SPEC(viewer_connection->logger,
                            "Marking all sessions as possibly needing new streams: "
                            "response={}, response-flag=NEW_STREAM",
                            static_cast<lttng_viewer_get_packet_return_code>(rp_status));
            lttng_live_need_new_streams(lttng_live_msg_iter);
        }
        if (flags & (LTTNG_VIEWER_FLAG_NEW_METADATA | LTTNG_VIEWER_FLAG_NEW_STREAM)) {
            status = CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
            BT_CPPLOGD_SPEC(viewer_connection->logger,
                            "Reply with any one flags set means we should retry: response={}",
                            static_cast<lttng_viewer_get_packet_return_code>(rp_status));
            goto end;
        }
        /* No flag set: a genuine error. */
        BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
                                     "Received get_data_packet response: error");
        status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
        goto end;
    case LTTNG_VIEWER_GET_PACKET_EOF:
        status = CTF_MSG_ITER_MEDIUM_STATUS_EOF;
        goto end;
    default:
        BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
                                     "Received get_data_packet response: unknown ({})", rp_status);
        status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
        goto end;
    }

    /* An OK reply with a zero-length payload is a protocol violation. */
    if (req_len == 0) {
        status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
        goto end;
    }

    /* Receive the packet payload itself. */
    viewer_status = lttng_live_recv(viewer_connection, buf, req_len);
    if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
        viewer_handle_recv_status(viewer_status, "get data packet");
        goto error_convert_status;
    }
    *recv_len = req_len;

    status = CTF_MSG_ITER_MEDIUM_STATUS_OK;
    goto end;

error_convert_status:
    status = viewer_status_to_ctf_msg_iter_medium_status(viewer_status);
end:
    return status;
}
1398
/*
 * Request new streams for a session.
 *
 * No-op when the session is not flagged `new_streams_needed`. Otherwise,
 * sends a GET_NEW_STREAMS command to the relay daemon and, on an OK reply,
 * receives the announced stream descriptors via receive_streams().
 *
 * Returns LTTNG_LIVE_ITERATOR_STATUS_OK on success (including "no new
 * streams"), .._STATUS_END when the session hung up (session marked
 * closed), .._STATUS_AGAIN on interruption, and .._STATUS_ERROR otherwise.
 */
enum lttng_live_iterator_status
lttng_live_session_get_new_streams(struct lttng_live_session *session,
                                   bt_self_message_iterator *self_msg_iter)
{
    enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
    struct lttng_viewer_cmd cmd;
    struct lttng_viewer_new_streams_request rq;
    struct lttng_viewer_new_streams_response rp;
    struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
    enum lttng_live_viewer_status viewer_status;
    live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection.get();
    uint32_t streams_count;
    const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
    char cmd_buf[cmd_buf_len];

    /* Nothing to do unless the session was flagged for new streams. */
    if (!session->new_streams_needed) {
        status = LTTNG_LIVE_ITERATOR_STATUS_OK;
        goto end;
    }

    BT_CPPLOGD_SPEC(viewer_connection->logger,
                    "Requesting new streams for session: cmd={}, session-id={}",
                    LTTNG_VIEWER_GET_NEW_STREAMS, session->id);

    cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
    cmd.data_size = htobe64((uint64_t) sizeof(rq));
    cmd.cmd_version = htobe32(0);

    memset(&rq, 0, sizeof(rq));
    rq.session_id = htobe64(session->id);

    /*
     * Merge the cmd and connection request to prevent a write-write
     * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
     * second write to be performed quickly in presence of Nagle's algorithm.
     */
    memcpy(cmd_buf, &cmd, sizeof(cmd));
    memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));

    viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
    if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
        viewer_handle_send_status(viewer_status, "get new streams command");
        status = viewer_status_to_live_iterator_status(viewer_status);
        goto end;
    }

    viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
    if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
        viewer_handle_recv_status(viewer_status, "get new streams reply");
        status = viewer_status_to_live_iterator_status(viewer_status);
        goto end;
    }

    streams_count = be32toh(rp.streams_count);

    switch (be32toh(rp.status)) {
    case LTTNG_VIEWER_NEW_STREAMS_OK:
        session->new_streams_needed = false;
        break;
    case LTTNG_VIEWER_NEW_STREAMS_NO_NEW:
        /* Nothing new: clear the flag and report OK. */
        session->new_streams_needed = false;
        goto end;
    case LTTNG_VIEWER_NEW_STREAMS_HUP:
        /* The session ended on the relay side. */
        session->new_streams_needed = false;
        session->closed = true;
        status = LTTNG_LIVE_ITERATOR_STATUS_END;
        goto end;
    case LTTNG_VIEWER_NEW_STREAMS_ERR:
        BT_CPPLOGD_SPEC(viewer_connection->logger, "Received get_new_streams response: error");
        status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
        goto end;
    default:
        BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
                                     "Received get_new_streams response: Unknown:"
                                     "return code {}",
                                     be32toh(rp.status));
        status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
        goto end;
    }

    /* Receive the `streams_count` stream descriptors announced above. */
    viewer_status = receive_streams(session, streams_count, self_msg_iter);
    if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
        viewer_handle_recv_status(viewer_status, "new streams");
        status = viewer_status_to_live_iterator_status(viewer_status);
        goto end;
    }

    status = LTTNG_LIVE_ITERATOR_STATUS_OK;
end:
    return status;
}
1493
1494 enum lttng_live_viewer_status
1495 live_viewer_connection_create(const char *url, bool in_query,
1496 struct lttng_live_msg_iter *lttng_live_msg_iter,
1497 const bt2c::Logger& parentLogger, live_viewer_connection::UP& viewer)
1498 {
1499 auto viewer_connection = bt2s::make_unique<live_viewer_connection>(parentLogger);
1500
1501 if (bt_socket_init(viewer_connection->logger) != 0) {
1502 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger, "Failed to init socket");
1503 return LTTNG_LIVE_VIEWER_STATUS_ERROR;
1504 }
1505
1506 viewer_connection->control_sock = BT_INVALID_SOCKET;
1507 viewer_connection->port = -1;
1508 viewer_connection->in_query = in_query;
1509 viewer_connection->lttng_live_msg_iter = lttng_live_msg_iter;
1510 viewer_connection->url = url;
1511
1512 BT_CPPLOGD_SPEC(viewer_connection->logger, "Establishing connection to url \"{}\"...", url);
1513 const auto status = lttng_live_connect_viewer(viewer_connection.get());
1514
1515 /*
1516 * Only print error and append cause in case of error. not in case of
1517 * interruption.
1518 */
1519 if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1520 BT_CPPLOGE_APPEND_CAUSE_SPEC(viewer_connection->logger,
1521 "Failed to establish connection: "
1522 "url=\"{}\"",
1523 url);
1524 return status;
1525 } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1526 return status;
1527 }
1528 BT_CPPLOGD_SPEC(viewer_connection->logger, "Connection to url \"{}\" is established", url);
1529
1530 viewer = std::move(viewer_connection);
1531 return LTTNG_LIVE_VIEWER_STATUS_OK;
1532 }
1533
1534 live_viewer_connection::~live_viewer_connection()
1535 {
1536 BT_CPPLOGD_SPEC(this->logger, "Closing connection to relay: relay-url=\"{}\"", this->url);
1537
1538 viewer_connection_close_socket(this);
1539
1540 bt_socket_fini();
1541 }
This page took 0.062404 seconds and 5 git commands to generate.