src.ctf.lttng-live: use std::vector in lttng_live_get_one_metadata_packet
[deliverable/babeltrace.git] / src / plugins / ctf / lttng-live / viewer-connection.cpp
1 /*
2 * SPDX-License-Identifier: MIT
3 *
4 * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
5 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
6 */
7
8 #define BT_CLOG_CFG (logCfg)
9 #define BT_LOG_TAG "PLUGIN/SRC.CTF.LTTNG-LIVE/VIEWER"
10
11 #include <fcntl.h>
12 #include <stdbool.h>
13 #include <stdint.h>
14 #include <stdio.h>
15 #include <stdlib.h>
16 #include <sys/types.h>
17 #include <unistd.h>
18
19 #include <glib.h>
20
21 #include "compat/socket.h"
22 #include "compat/endian.h"
23 #include "compat/compiler.h"
24 #include "common/common.h"
25 #include <babeltrace2/babeltrace.h>
26
27 #include "lttng-live.hpp"
28 #include "viewer-connection.hpp"
29 #include "lttng-viewer-abi.hpp"
30 #include "data-stream.hpp"
31 #include "metadata.hpp"
32 #include "cpp-common/cfg-logging-error-reporting.hpp"
33
34 #define viewer_handle_send_recv_status(_logCfg, _status, _action, _msg_str) \
35 do { \
36 switch (_status) { \
37 case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED: \
38 break; \
39 case LTTNG_LIVE_VIEWER_STATUS_ERROR: \
40 BT_CLOGE_APPEND_CAUSE("Error " _action " " _msg_str); \
41 break; \
42 default: \
43 bt_common_abort(); \
44 } \
45 } while (0)
46
47 #define viewer_handle_send_status(_logCfg, _status, _msg_str) \
48 viewer_handle_send_recv_status(_logCfg, _status, "sending", _msg_str)
49
50 #define viewer_handle_recv_status(_logCfg, _status, _msg_str) \
51 viewer_handle_send_recv_status(_logCfg, _status, "receiving", _msg_str)
52
53 #define LTTNG_LIVE_CLOGE_APPEND_CAUSE_ERRNO(_msg, _fmt, ...) \
54 do { \
55 BT_CLOGE_APPEND_CAUSE(_msg ": %s" _fmt, bt_socket_errormsg(), ##__VA_ARGS__); \
56 } while (0)
57
58 static const char *lttng_viewer_command_string(enum lttng_viewer_command cmd)
59 {
60 switch (cmd) {
61 case LTTNG_VIEWER_CONNECT:
62 return "CONNECT";
63 case LTTNG_VIEWER_LIST_SESSIONS:
64 return "LIST_SESSIONS";
65 case LTTNG_VIEWER_ATTACH_SESSION:
66 return "ATTACH_SESSION";
67 case LTTNG_VIEWER_GET_NEXT_INDEX:
68 return "GET_NEXT_INDEX";
69 case LTTNG_VIEWER_GET_PACKET:
70 return "GET_PACKET";
71 case LTTNG_VIEWER_GET_METADATA:
72 return "GET_METADATA";
73 case LTTNG_VIEWER_GET_NEW_STREAMS:
74 return "GET_NEW_STREAMS";
75 case LTTNG_VIEWER_CREATE_SESSION:
76 return "CREATE_SESSION";
77 case LTTNG_VIEWER_DETACH_SESSION:
78 return "DETACH_SESSION";
79 }
80
81 bt_common_abort();
82 }
83
84 static const char *
85 lttng_viewer_next_index_return_code_string(enum lttng_viewer_next_index_return_code code)
86 {
87 switch (code) {
88 case LTTNG_VIEWER_INDEX_OK:
89 return "INDEX_OK";
90 case LTTNG_VIEWER_INDEX_RETRY:
91 return "INDEX_RETRY";
92 case LTTNG_VIEWER_INDEX_HUP:
93 return "INDEX_HUP";
94 case LTTNG_VIEWER_INDEX_ERR:
95 return "INDEX_ERR";
96 case LTTNG_VIEWER_INDEX_INACTIVE:
97 return "INDEX_INACTIVE";
98 case LTTNG_VIEWER_INDEX_EOF:
99 return "INDEX_EOF";
100 }
101
102 bt_common_abort();
103 }
104
105 static const char *lttng_viewer_next_index_return_code_string(uint32_t code)
106 {
107 return lttng_viewer_next_index_return_code_string((lttng_viewer_next_index_return_code) code);
108 }
109
110 static const char *
111 lttng_viewer_get_packet_return_code_string(enum lttng_viewer_get_packet_return_code code)
112 {
113 switch (code) {
114 case LTTNG_VIEWER_GET_PACKET_OK:
115 return "GET_PACKET_OK";
116 case LTTNG_VIEWER_GET_PACKET_RETRY:
117 return "GET_PACKET_RETRY";
118 case LTTNG_VIEWER_GET_PACKET_ERR:
119 return "GET_PACKET_ERR";
120 case LTTNG_VIEWER_GET_PACKET_EOF:
121 return "GET_PACKET_EOF";
122 }
123
124 bt_common_abort();
125 };
126
127 static const char *lttng_viewer_get_packet_return_code_string(uint32_t code)
128 {
129 return lttng_viewer_get_packet_return_code_string((lttng_viewer_get_packet_return_code) code);
130 }
131
132 static const char *lttng_viewer_seek_string(enum lttng_viewer_seek seek)
133 {
134 switch (seek) {
135 case LTTNG_VIEWER_SEEK_BEGINNING:
136 return "SEEK_BEGINNING";
137 case LTTNG_VIEWER_SEEK_LAST:
138 return "SEEK_LAST";
139 }
140
141 bt_common_abort();
142 }
143
144 static inline enum lttng_live_iterator_status
145 viewer_status_to_live_iterator_status(enum lttng_live_viewer_status viewer_status)
146 {
147 switch (viewer_status) {
148 case LTTNG_LIVE_VIEWER_STATUS_OK:
149 return LTTNG_LIVE_ITERATOR_STATUS_OK;
150 case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
151 return LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
152 case LTTNG_LIVE_VIEWER_STATUS_ERROR:
153 return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
154 }
155
156 bt_common_abort();
157 }
158
159 static inline enum ctf_msg_iter_medium_status
160 viewer_status_to_ctf_msg_iter_medium_status(enum lttng_live_viewer_status viewer_status)
161 {
162 switch (viewer_status) {
163 case LTTNG_LIVE_VIEWER_STATUS_OK:
164 return CTF_MSG_ITER_MEDIUM_STATUS_OK;
165 case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
166 return CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
167 case LTTNG_LIVE_VIEWER_STATUS_ERROR:
168 return CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
169 }
170
171 bt_common_abort();
172 }
173
174 static inline void viewer_connection_close_socket(struct live_viewer_connection *viewer_connection)
175 {
176 int ret = bt_socket_close(viewer_connection->control_sock);
177 if (ret == -1) {
178 const bt2_common::LogCfg& logCfg = viewer_connection->logCfg;
179 BT_CLOGW_ERRNO("Error closing viewer connection socket: ", ".");
180 }
181
182 viewer_connection->control_sock = BT_INVALID_SOCKET;
183 }
184
185 /*
186 * This function receives a message from the Relay daemon.
187 * If it received the entire message, it returns _OK,
188 * If it's interrupted, it returns _INTERRUPTED,
189 * otherwise, it returns _ERROR.
190 */
191 static enum lttng_live_viewer_status
192 lttng_live_recv(struct live_viewer_connection *viewer_connection, void *buf, size_t len)
193 {
194 ssize_t received;
195 size_t total_received = 0, to_receive = len;
196 struct lttng_live_msg_iter *lttng_live_msg_iter = viewer_connection->lttng_live_msg_iter;
197 enum lttng_live_viewer_status status;
198 BT_SOCKET sock = viewer_connection->control_sock;
199 const bt2_common::LogCfg& logCfg = viewer_connection->logCfg;
200
201 /*
202 * Receive a message from the Relay.
203 */
204 do {
205 received = bt_socket_recv(sock, (char *) buf + total_received, to_receive, 0);
206 if (received == BT_SOCKET_ERROR) {
207 if (bt_socket_interrupted()) {
208 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
209 /*
210 * This interruption was due to a
211 * SIGINT and the graph is being torn
212 * down.
213 */
214 status = LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED;
215 lttng_live_msg_iter->was_interrupted = true;
216 goto end;
217 } else {
218 /*
219 * A signal was received, but the graph
220 * is not being torn down. Carry on.
221 */
222 continue;
223 }
224 } else {
225 /*
226 * For any other types of socket error, close
227 * the socket and return an error.
228 */
229 LTTNG_LIVE_CLOGE_APPEND_CAUSE_ERRNO("Error receiving from Relay", ".");
230
231 viewer_connection_close_socket(viewer_connection);
232 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
233 goto end;
234 }
235 } else if (received == 0) {
236 /*
237 * The recv() call returned 0. This means the
238 * connection was orderly shutdown from the other peer.
239 * If that happens when we are trying to receive
240 * a message from it, it means something when wrong.
241 * Close the socket and return an error.
242 */
243 BT_CLOGE_APPEND_CAUSE("Remote side has closed connection");
244 viewer_connection_close_socket(viewer_connection);
245 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
246 goto end;
247 }
248
249 BT_ASSERT(received <= to_receive);
250 total_received += received;
251 to_receive -= received;
252
253 } while (to_receive > 0);
254
255 BT_ASSERT(total_received == len);
256 status = LTTNG_LIVE_VIEWER_STATUS_OK;
257
258 end:
259 return status;
260 }
261
262 /*
263 * This function sends a message to the Relay daemon.
264 * If it send the message, it returns _OK,
265 * If it's interrupted, it returns _INTERRUPTED,
266 * otherwise, it returns _ERROR.
267 */
268 static enum lttng_live_viewer_status
269 lttng_live_send(struct live_viewer_connection *viewer_connection, const void *buf, size_t len)
270 {
271 enum lttng_live_viewer_status status;
272 struct lttng_live_msg_iter *lttng_live_msg_iter = viewer_connection->lttng_live_msg_iter;
273 BT_SOCKET sock = viewer_connection->control_sock;
274 size_t to_send = len;
275 ssize_t total_sent = 0;
276 const bt2_common::LogCfg& logCfg = viewer_connection->logCfg;
277
278 do {
279 ssize_t sent = bt_socket_send_nosigpipe(sock, (char *) buf + total_sent, to_send);
280 if (sent == BT_SOCKET_ERROR) {
281 if (bt_socket_interrupted()) {
282 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
283 /*
284 * This interruption was a SIGINT and
285 * the graph is being teared down.
286 */
287 status = LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED;
288 lttng_live_msg_iter->was_interrupted = true;
289 goto end;
290 } else {
291 /*
292 * A signal was received, but the graph
293 * is not being teared down. Carry on.
294 */
295 continue;
296 }
297 } else {
298 /*
299 * For any other types of socket error, close
300 * the socket and return an error.
301 */
302 LTTNG_LIVE_CLOGE_APPEND_CAUSE_ERRNO("Error sending to Relay", ".");
303
304 viewer_connection_close_socket(viewer_connection);
305 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
306 goto end;
307 }
308 }
309
310 BT_ASSERT(sent <= to_send);
311 total_sent += sent;
312 to_send -= sent;
313
314 } while (to_send > 0);
315
316 BT_ASSERT(total_sent == len);
317 status = LTTNG_LIVE_VIEWER_STATUS_OK;
318
319 end:
320 return status;
321 }
322
323 static int parse_url(struct live_viewer_connection *viewer_connection)
324 {
325 char error_buf[256] = {0};
326 const bt2_common::LogCfg& logCfg = viewer_connection->logCfg;
327 struct bt_common_lttng_live_url_parts lttng_live_url_parts = {0};
328 int ret = -1;
329 const char *path = viewer_connection->url->str;
330
331 if (!path) {
332 goto end;
333 }
334
335 lttng_live_url_parts = bt_common_parse_lttng_live_url(path, error_buf, sizeof(error_buf));
336 if (!lttng_live_url_parts.proto) {
337 BT_CLOGE_APPEND_CAUSE("Invalid LTTng live URL format: %s", error_buf);
338 goto end;
339 }
340 viewer_connection->proto = lttng_live_url_parts.proto;
341 lttng_live_url_parts.proto = NULL;
342
343 viewer_connection->relay_hostname = lttng_live_url_parts.hostname;
344 lttng_live_url_parts.hostname = NULL;
345
346 if (lttng_live_url_parts.port >= 0) {
347 viewer_connection->port = lttng_live_url_parts.port;
348 } else {
349 viewer_connection->port = LTTNG_DEFAULT_NETWORK_VIEWER_PORT;
350 }
351
352 viewer_connection->target_hostname = lttng_live_url_parts.target_hostname;
353 lttng_live_url_parts.target_hostname = NULL;
354
355 if (lttng_live_url_parts.session_name) {
356 viewer_connection->session_name = lttng_live_url_parts.session_name;
357 lttng_live_url_parts.session_name = NULL;
358 }
359
360 ret = 0;
361
362 end:
363 bt_common_destroy_lttng_live_url_parts(&lttng_live_url_parts);
364 return ret;
365 }
366
367 static enum lttng_live_viewer_status
368 lttng_live_handshake(struct live_viewer_connection *viewer_connection)
369 {
370 struct lttng_viewer_cmd cmd;
371 struct lttng_viewer_connect connect;
372 enum lttng_live_viewer_status status;
373 const bt2_common::LogCfg& logCfg = viewer_connection->logCfg;
374 const size_t cmd_buf_len = sizeof(cmd) + sizeof(connect);
375 char cmd_buf[cmd_buf_len];
376
377 BT_CLOGD("Handshaking with the relay daemon: cmd=%s, major-version=%u, minor-version=%u",
378 lttng_viewer_command_string(LTTNG_VIEWER_CONNECT), LTTNG_LIVE_MAJOR, LTTNG_LIVE_MINOR);
379
380 cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
381 cmd.data_size = htobe64((uint64_t) sizeof(connect));
382 cmd.cmd_version = htobe32(0);
383
384 connect.viewer_session_id = -1ULL; /* will be set on recv */
385 connect.major = htobe32(LTTNG_LIVE_MAJOR);
386 connect.minor = htobe32(LTTNG_LIVE_MINOR);
387 connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
388
389 /*
390 * Merge the cmd and connection request to prevent a write-write
391 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
392 * second write to be performed quickly in presence of Nagle's algorithm
393 */
394 memcpy(cmd_buf, &cmd, sizeof(cmd));
395 memcpy(cmd_buf + sizeof(cmd), &connect, sizeof(connect));
396
397 status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
398 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
399 viewer_handle_send_status(logCfg, status, "viewer connect command");
400 goto end;
401 }
402
403 status = lttng_live_recv(viewer_connection, &connect, sizeof(connect));
404 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
405 viewer_handle_recv_status(logCfg, status, "viewer connect reply");
406 goto end;
407 }
408
409 BT_CLOGI("Received viewer session ID : %" PRIu64,
410 (uint64_t) be64toh(connect.viewer_session_id));
411 BT_CLOGI("Relayd version : %u.%u", be32toh(connect.major), be32toh(connect.minor));
412
413 if (LTTNG_LIVE_MAJOR != be32toh(connect.major)) {
414 BT_CLOGE_APPEND_CAUSE("Incompatible lttng-relayd protocol");
415 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
416 goto end;
417 }
418 /* Use the smallest protocol version implemented. */
419 if (LTTNG_LIVE_MINOR > be32toh(connect.minor)) {
420 viewer_connection->minor = be32toh(connect.minor);
421 } else {
422 viewer_connection->minor = LTTNG_LIVE_MINOR;
423 }
424 viewer_connection->major = LTTNG_LIVE_MAJOR;
425
426 status = LTTNG_LIVE_VIEWER_STATUS_OK;
427
428 goto end;
429
430 end:
431 return status;
432 }
433
434 static enum lttng_live_viewer_status
435 lttng_live_connect_viewer(struct live_viewer_connection *viewer_connection)
436 {
437 struct hostent *host;
438 struct sockaddr_in server_addr;
439 enum lttng_live_viewer_status status;
440 const bt2_common::LogCfg& logCfg = viewer_connection->logCfg;
441
442 if (parse_url(viewer_connection)) {
443 BT_CLOGE_APPEND_CAUSE("Failed to parse URL");
444 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
445 goto error;
446 }
447
448 BT_CLOGD("Connecting to hostname : %s, port : %d, "
449 "target hostname : %s, session name : %s, proto : %s",
450 viewer_connection->relay_hostname->str, viewer_connection->port,
451 !viewer_connection->target_hostname ? "<none>" :
452 viewer_connection->target_hostname->str,
453 !viewer_connection->session_name ? "<none>" : viewer_connection->session_name->str,
454 viewer_connection->proto->str);
455
456 host = gethostbyname(viewer_connection->relay_hostname->str);
457 if (!host) {
458 BT_CLOGE_APPEND_CAUSE("Cannot lookup hostname: hostname=\"%s\"",
459 viewer_connection->relay_hostname->str);
460 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
461 goto error;
462 }
463
464 if ((viewer_connection->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == BT_INVALID_SOCKET) {
465 BT_CLOGE_APPEND_CAUSE("Socket creation failed: %s", bt_socket_errormsg());
466 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
467 goto error;
468 }
469
470 server_addr.sin_family = AF_INET;
471 server_addr.sin_port = htons(viewer_connection->port);
472 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
473 memset(&(server_addr.sin_zero), 0, 8);
474
475 if (connect(viewer_connection->control_sock, (struct sockaddr *) &server_addr,
476 sizeof(struct sockaddr)) == BT_SOCKET_ERROR) {
477 BT_CLOGE_APPEND_CAUSE("Connection failed: %s", bt_socket_errormsg());
478 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
479 goto error;
480 }
481
482 status = lttng_live_handshake(viewer_connection);
483
484 /*
485 * Only print error and append cause in case of error. not in case of
486 * interruption.
487 */
488 if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
489 BT_CLOGE_APPEND_CAUSE("Viewer handshake failed");
490 goto error;
491 } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
492 goto end;
493 }
494
495 goto end;
496
497 error:
498 if (viewer_connection->control_sock != BT_INVALID_SOCKET) {
499 if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) {
500 BT_CLOGW("Error closing socket: %s.", bt_socket_errormsg());
501 }
502 }
503 viewer_connection->control_sock = BT_INVALID_SOCKET;
504 end:
505 return status;
506 }
507
508 static void lttng_live_disconnect_viewer(struct live_viewer_connection *viewer_connection)
509 {
510 const bt2_common::LogCfg& logCfg = viewer_connection->logCfg;
511
512 if (viewer_connection->control_sock == BT_INVALID_SOCKET) {
513 return;
514 }
515 if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) {
516 BT_CLOGW("Error closing socket: %s", bt_socket_errormsg());
517 viewer_connection->control_sock = BT_INVALID_SOCKET;
518 }
519 }
520
521 static int list_update_session(bt_value *results, const struct lttng_viewer_session *session,
522 bool *_found, struct live_viewer_connection *viewer_connection)
523 {
524 const bt2_common::LogCfg& logCfg = viewer_connection->logCfg;
525 int ret = 0;
526 uint64_t i, len;
527 bt_value *map = NULL;
528 bt_value *hostname = NULL;
529 bt_value *session_name = NULL;
530 bt_value *btval = NULL;
531 bool found = false;
532
533 len = bt_value_array_get_length(results);
534 for (i = 0; i < len; i++) {
535 const char *hostname_str = NULL;
536 const char *session_name_str = NULL;
537
538 map = bt_value_array_borrow_element_by_index(results, i);
539 hostname = bt_value_map_borrow_entry_value(map, "target-hostname");
540 if (!hostname) {
541 BT_CLOGE_APPEND_CAUSE("Error borrowing \"target-hostname\" entry.");
542 ret = -1;
543 goto end;
544 }
545 session_name = bt_value_map_borrow_entry_value(map, "session-name");
546 if (!session_name) {
547 BT_CLOGE_APPEND_CAUSE("Error borrowing \"session-name\" entry.");
548 ret = -1;
549 goto end;
550 }
551 hostname_str = bt_value_string_get(hostname);
552 session_name_str = bt_value_string_get(session_name);
553
554 if (strcmp(session->hostname, hostname_str) == 0 &&
555 strcmp(session->session_name, session_name_str) == 0) {
556 int64_t val;
557 uint32_t streams = be32toh(session->streams);
558 uint32_t clients = be32toh(session->clients);
559
560 found = true;
561
562 btval = bt_value_map_borrow_entry_value(map, "stream-count");
563 if (!btval) {
564 BT_CLOGE_APPEND_CAUSE("Error borrowing \"stream-count\" entry.");
565 ret = -1;
566 goto end;
567 }
568 val = bt_value_integer_unsigned_get(btval);
569 /* sum */
570 val += streams;
571 bt_value_integer_unsigned_set(btval, val);
572
573 btval = bt_value_map_borrow_entry_value(map, "client-count");
574 if (!btval) {
575 BT_CLOGE_APPEND_CAUSE("Error borrowing \"client-count\" entry.");
576 ret = -1;
577 goto end;
578 }
579 val = bt_value_integer_unsigned_get(btval);
580 /* max */
581 val = bt_max_t(int64_t, clients, val);
582 bt_value_integer_unsigned_set(btval, val);
583 }
584
585 if (found) {
586 break;
587 }
588 }
589 end:
590 *_found = found;
591 return ret;
592 }
593
594 static int list_append_session(bt_value *results, GString *base_url,
595 const struct lttng_viewer_session *session,
596 struct live_viewer_connection *viewer_connection)
597 {
598 int ret = 0;
599 const bt2_common::LogCfg& logCfg = viewer_connection->logCfg;
600 bt_value_map_insert_entry_status insert_status;
601 bt_value_array_append_element_status append_status;
602 bt_value *map = NULL;
603 GString *url = NULL;
604 bool found = false;
605
606 /*
607 * If the session already exists, add the stream count to it,
608 * and do max of client counts.
609 */
610 ret = list_update_session(results, session, &found, viewer_connection);
611 if (ret || found) {
612 goto end;
613 }
614
615 map = bt_value_map_create();
616 if (!map) {
617 BT_CLOGE_APPEND_CAUSE("Error creating map value.");
618 ret = -1;
619 goto end;
620 }
621
622 if (base_url->len < 1) {
623 BT_CLOGE_APPEND_CAUSE("Error: base_url length smaller than 1.");
624 ret = -1;
625 goto end;
626 }
627 /*
628 * key = "url",
629 * value = <string>,
630 */
631 url = g_string_new(base_url->str);
632 g_string_append(url, "/host/");
633 g_string_append(url, session->hostname);
634 g_string_append_c(url, '/');
635 g_string_append(url, session->session_name);
636
637 insert_status = bt_value_map_insert_string_entry(map, "url", url->str);
638 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
639 BT_CLOGE_APPEND_CAUSE("Error inserting \"url\" entry.");
640 ret = -1;
641 goto end;
642 }
643
644 /*
645 * key = "target-hostname",
646 * value = <string>,
647 */
648 insert_status = bt_value_map_insert_string_entry(map, "target-hostname", session->hostname);
649 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
650 BT_CLOGE_APPEND_CAUSE("Error inserting \"target-hostname\" entry.");
651 ret = -1;
652 goto end;
653 }
654
655 /*
656 * key = "session-name",
657 * value = <string>,
658 */
659 insert_status = bt_value_map_insert_string_entry(map, "session-name", session->session_name);
660 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
661 BT_CLOGE_APPEND_CAUSE("Error inserting \"session-name\" entry.");
662 ret = -1;
663 goto end;
664 }
665
666 /*
667 * key = "timer-us",
668 * value = <integer>,
669 */
670 {
671 uint32_t live_timer = be32toh(session->live_timer);
672
673 insert_status = bt_value_map_insert_unsigned_integer_entry(map, "timer-us", live_timer);
674 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
675 BT_CLOGE_APPEND_CAUSE("Error inserting \"timer-us\" entry.");
676 ret = -1;
677 goto end;
678 }
679 }
680
681 /*
682 * key = "stream-count",
683 * value = <integer>,
684 */
685 {
686 uint32_t streams = be32toh(session->streams);
687
688 insert_status = bt_value_map_insert_unsigned_integer_entry(map, "stream-count", streams);
689 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
690 BT_CLOGE_APPEND_CAUSE("Error inserting \"stream-count\" entry.");
691 ret = -1;
692 goto end;
693 }
694 }
695
696 /*
697 * key = "client-count",
698 * value = <integer>,
699 */
700 {
701 uint32_t clients = be32toh(session->clients);
702
703 insert_status = bt_value_map_insert_unsigned_integer_entry(map, "client-count", clients);
704 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
705 BT_CLOGE_APPEND_CAUSE("Error inserting \"client-count\" entry.");
706 ret = -1;
707 goto end;
708 }
709 }
710
711 append_status = bt_value_array_append_element(results, map);
712 if (append_status != BT_VALUE_ARRAY_APPEND_ELEMENT_STATUS_OK) {
713 BT_CLOGE_APPEND_CAUSE("Error appending map to results.");
714 ret = -1;
715 }
716
717 end:
718 if (url) {
719 g_string_free(url, true);
720 }
721 BT_VALUE_PUT_REF_AND_RESET(map);
722 return ret;
723 }
724
725 /*
726 * Data structure returned:
727 *
728 * {
729 * <array> = {
730 * [n] = {
731 * <map> = {
732 * {
733 * key = "url",
734 * value = <string>,
735 * },
736 * {
737 * key = "target-hostname",
738 * value = <string>,
739 * },
740 * {
741 * key = "session-name",
742 * value = <string>,
743 * },
744 * {
745 * key = "timer-us",
746 * value = <integer>,
747 * },
748 * {
749 * key = "stream-count",
750 * value = <integer>,
751 * },
752 * {
753 * key = "client-count",
754 * value = <integer>,
755 * },
756 * },
757 * }
758 * }
759 */
760
761 BT_HIDDEN
762 bt_component_class_query_method_status
763 live_viewer_connection_list_sessions(struct live_viewer_connection *viewer_connection,
764 const bt_value **user_result)
765 {
766 const bt2_common::LogCfg& logCfg = viewer_connection->logCfg;
767 bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
768 bt_value *result = NULL;
769 enum lttng_live_viewer_status viewer_status;
770 struct lttng_viewer_cmd cmd;
771 struct lttng_viewer_list_sessions list;
772 uint32_t i, sessions_count;
773
774 result = bt_value_array_create();
775 if (!result) {
776 BT_CLOGE_APPEND_CAUSE("Error creating array");
777 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
778 goto error;
779 }
780
781 BT_LOGD("Requesting list of sessions: cmd=%s",
782 lttng_viewer_command_string(LTTNG_VIEWER_LIST_SESSIONS));
783
784 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
785 cmd.data_size = htobe64((uint64_t) 0);
786 cmd.cmd_version = htobe32(0);
787
788 viewer_status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
789 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
790 BT_CLOGE_APPEND_CAUSE("Error sending list sessions command");
791 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
792 goto error;
793 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
794 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
795 goto error;
796 }
797
798 viewer_status = lttng_live_recv(viewer_connection, &list, sizeof(list));
799 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
800 BT_CLOGE_APPEND_CAUSE("Error receiving session list");
801 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
802 goto error;
803 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
804 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
805 goto error;
806 }
807
808 sessions_count = be32toh(list.sessions_count);
809 for (i = 0; i < sessions_count; i++) {
810 struct lttng_viewer_session lsession;
811
812 viewer_status = lttng_live_recv(viewer_connection, &lsession, sizeof(lsession));
813 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
814 BT_CLOGE_APPEND_CAUSE("Error receiving session:");
815 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
816 goto error;
817 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
818 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
819 goto error;
820 }
821
822 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
823 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
824 if (list_append_session(result, viewer_connection->url, &lsession, viewer_connection)) {
825 BT_CLOGE_APPEND_CAUSE("Error appending session");
826 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
827 goto error;
828 }
829 }
830
831 *user_result = result;
832 goto end;
833 error:
834 BT_VALUE_PUT_REF_AND_RESET(result);
835 end:
836 return status;
837 }
838
839 static enum lttng_live_viewer_status
840 lttng_live_query_session_ids(struct lttng_live_msg_iter *lttng_live_msg_iter)
841 {
842 struct lttng_viewer_cmd cmd;
843 struct lttng_viewer_list_sessions list;
844 struct lttng_viewer_session lsession;
845 uint32_t i, sessions_count;
846 uint64_t session_id;
847 enum lttng_live_viewer_status status;
848 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
849 const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
850
851 BT_CLOGD("Asking the relay daemon for the list of sessions: cmd=%s",
852 lttng_viewer_command_string(LTTNG_VIEWER_LIST_SESSIONS));
853
854 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
855 cmd.data_size = htobe64((uint64_t) 0);
856 cmd.cmd_version = htobe32(0);
857
858 status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
859 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
860 viewer_handle_send_status(logCfg, status, "list sessions command");
861 goto end;
862 }
863
864 status = lttng_live_recv(viewer_connection, &list, sizeof(list));
865 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
866 viewer_handle_recv_status(logCfg, status, "session list reply");
867 goto end;
868 }
869
870 sessions_count = be32toh(list.sessions_count);
871 for (i = 0; i < sessions_count; i++) {
872 status = lttng_live_recv(viewer_connection, &lsession, sizeof(lsession));
873 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
874 viewer_handle_recv_status(logCfg, status, "session reply");
875 goto end;
876 }
877 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
878 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
879 session_id = be64toh(lsession.id);
880
881 BT_CLOGI("Adding session to internal list: "
882 "session-id=%" PRIu64 ", hostname=\"%s\", session-name=\"%s\"",
883 session_id, lsession.hostname, lsession.session_name);
884
885 if ((strncmp(lsession.session_name, viewer_connection->session_name->str,
886 LTTNG_VIEWER_NAME_MAX) == 0) &&
887 (strncmp(lsession.hostname, viewer_connection->target_hostname->str,
888 LTTNG_VIEWER_HOST_NAME_MAX) == 0)) {
889 if (lttng_live_add_session(lttng_live_msg_iter, session_id, lsession.hostname,
890 lsession.session_name)) {
891 BT_CLOGE_APPEND_CAUSE("Failed to add live session");
892 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
893 goto end;
894 }
895 }
896 }
897
898 status = LTTNG_LIVE_VIEWER_STATUS_OK;
899
900 end:
901 return status;
902 }
903
904 BT_HIDDEN
905 enum lttng_live_viewer_status
906 lttng_live_create_viewer_session(struct lttng_live_msg_iter *lttng_live_msg_iter)
907 {
908 struct lttng_viewer_cmd cmd;
909 struct lttng_viewer_create_session_response resp;
910 enum lttng_live_viewer_status status;
911 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
912 const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
913
914 BT_CLOGD("Creating a viewer session: cmd=%s",
915 lttng_viewer_command_string(LTTNG_VIEWER_CREATE_SESSION));
916
917 cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
918 cmd.data_size = htobe64((uint64_t) 0);
919 cmd.cmd_version = htobe32(0);
920
921 status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
922 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
923 viewer_handle_send_status(logCfg, status, "create session command");
924 goto end;
925 }
926
927 status = lttng_live_recv(viewer_connection, &resp, sizeof(resp));
928 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
929 viewer_handle_recv_status(logCfg, status, "create session reply");
930 goto end;
931 }
932
933 if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
934 BT_CLOGE_APPEND_CAUSE("Error creating viewer session");
935 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
936 goto end;
937 }
938
939 status = lttng_live_query_session_ids(lttng_live_msg_iter);
940 if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
941 BT_CLOGE_APPEND_CAUSE("Failed to query live viewer session ids");
942 goto end;
943 } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
944 goto end;
945 }
946
947 end:
948 return status;
949 }
950
951 static enum lttng_live_viewer_status receive_streams(struct lttng_live_session *session,
952 uint32_t stream_count,
953 bt_self_message_iterator *self_msg_iter)
954 {
955 uint32_t i;
956 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
957 enum lttng_live_viewer_status status;
958 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
959 const bt2_common::LogCfg& logCfg = session->logCfg;
960
961 BT_CLOGI("Getting %" PRIu32 " new streams", stream_count);
962 for (i = 0; i < stream_count; i++) {
963 struct lttng_viewer_stream stream;
964 struct lttng_live_stream_iterator *live_stream;
965 uint64_t stream_id;
966 uint64_t ctf_trace_id;
967
968 status = lttng_live_recv(viewer_connection, &stream, sizeof(stream));
969 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
970 viewer_handle_recv_status(logCfg, status, "stream reply");
971 goto end;
972 }
973 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
974 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
975 stream_id = be64toh(stream.id);
976 ctf_trace_id = be64toh(stream.ctf_trace_id);
977
978 if (stream.metadata_flag) {
979 BT_CLOGI(" metadata stream %" PRIu64 " : %s/%s", stream_id, stream.path_name,
980 stream.channel_name);
981 if (lttng_live_metadata_create_stream(session, ctf_trace_id, stream_id,
982 stream.path_name)) {
983 BT_CLOGE_APPEND_CAUSE("Error creating metadata stream");
984 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
985 goto end;
986 }
987 session->lazy_stream_msg_init = true;
988 } else {
989 BT_CLOGI(" stream %" PRIu64 " : %s/%s", stream_id, stream.path_name,
990 stream.channel_name);
991 live_stream =
992 lttng_live_stream_iterator_create(session, ctf_trace_id, stream_id, self_msg_iter);
993 if (!live_stream) {
994 BT_CLOGE_APPEND_CAUSE("Error creating stream");
995 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
996 goto end;
997 }
998 }
999 }
1000 status = LTTNG_LIVE_VIEWER_STATUS_OK;
1001
1002 end:
1003 return status;
1004 }
1005
1006 BT_HIDDEN
1007 enum lttng_live_viewer_status lttng_live_session_attach(struct lttng_live_session *session,
1008 bt_self_message_iterator *self_msg_iter)
1009 {
1010 struct lttng_viewer_cmd cmd;
1011 enum lttng_live_viewer_status status;
1012 struct lttng_viewer_attach_session_request rq;
1013 struct lttng_viewer_attach_session_response rp;
1014 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
1015 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1016 const bt2_common::LogCfg& logCfg = session->logCfg;
1017 uint64_t session_id = session->id;
1018 uint32_t streams_count;
1019 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1020 char cmd_buf[cmd_buf_len];
1021
1022 BT_CLOGD("Attaching to session: cmd=%s, session-id=%" PRIu64 ", seek=%s",
1023 lttng_viewer_command_string(LTTNG_VIEWER_ATTACH_SESSION), session_id,
1024 lttng_viewer_seek_string(LTTNG_VIEWER_SEEK_LAST));
1025
1026 cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
1027 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1028 cmd.cmd_version = htobe32(0);
1029
1030 memset(&rq, 0, sizeof(rq));
1031 rq.session_id = htobe64(session_id);
1032 // TODO: add cmd line parameter to select seek beginning
1033 // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
1034 rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
1035
1036 /*
1037 * Merge the cmd and connection request to prevent a write-write
1038 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1039 * second write to be performed quickly in presence of Nagle's algorithm.
1040 */
1041 memcpy(cmd_buf, &cmd, sizeof(cmd));
1042 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1043 status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1044 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1045 viewer_handle_send_status(logCfg, status, "attach session command");
1046 goto end;
1047 }
1048
1049 status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1050 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1051 viewer_handle_recv_status(logCfg, status, "attach session reply");
1052 goto end;
1053 }
1054
1055 streams_count = be32toh(rp.streams_count);
1056 switch (be32toh(rp.status)) {
1057 case LTTNG_VIEWER_ATTACH_OK:
1058 break;
1059 case LTTNG_VIEWER_ATTACH_UNK:
1060 BT_CLOGE_APPEND_CAUSE("Session id %" PRIu64 " is unknown", session_id);
1061 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1062 goto end;
1063 case LTTNG_VIEWER_ATTACH_ALREADY:
1064 BT_CLOGE_APPEND_CAUSE("There is already a viewer attached to this session");
1065 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1066 goto end;
1067 case LTTNG_VIEWER_ATTACH_NOT_LIVE:
1068 BT_CLOGE_APPEND_CAUSE("Not a live session");
1069 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1070 goto end;
1071 case LTTNG_VIEWER_ATTACH_SEEK_ERR:
1072 BT_CLOGE_APPEND_CAUSE("Wrong seek parameter");
1073 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1074 goto end;
1075 default:
1076 BT_CLOGE_APPEND_CAUSE("Unknown attach return code %u", be32toh(rp.status));
1077 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1078 goto end;
1079 }
1080
1081 /* We receive the initial list of streams. */
1082 status = receive_streams(session, streams_count, self_msg_iter);
1083 switch (status) {
1084 case LTTNG_LIVE_VIEWER_STATUS_OK:
1085 break;
1086 case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
1087 goto end;
1088 case LTTNG_LIVE_VIEWER_STATUS_ERROR:
1089 BT_CLOGE_APPEND_CAUSE("Error receiving streams");
1090 goto end;
1091 default:
1092 bt_common_abort();
1093 }
1094
1095 session->attached = true;
1096 session->new_streams_needed = false;
1097
1098 end:
1099 return status;
1100 }
1101
1102 BT_HIDDEN
1103 enum lttng_live_viewer_status lttng_live_session_detach(struct lttng_live_session *session)
1104 {
1105 struct lttng_viewer_cmd cmd;
1106 enum lttng_live_viewer_status status;
1107 struct lttng_viewer_detach_session_request rq;
1108 struct lttng_viewer_detach_session_response rp;
1109 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
1110 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1111 uint64_t session_id = session->id;
1112 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1113 char cmd_buf[cmd_buf_len];
1114 const bt2_common::LogCfg& logCfg = session->logCfg;
1115
1116 /*
1117 * The session might already be detached and the viewer socket might
1118 * already been closed. This happens when calling this function when
1119 * tearing down the graph after an error.
1120 */
1121 if (!session->attached || viewer_connection->control_sock == BT_INVALID_SOCKET) {
1122 return LTTNG_LIVE_VIEWER_STATUS_OK;
1123 }
1124
1125 BT_CLOGD("Detaching from session: cmd=%s, session-id=%" PRIu64,
1126 lttng_viewer_command_string(LTTNG_VIEWER_DETACH_SESSION), session_id);
1127
1128 cmd.cmd = htobe32(LTTNG_VIEWER_DETACH_SESSION);
1129 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1130 cmd.cmd_version = htobe32(0);
1131
1132 memset(&rq, 0, sizeof(rq));
1133 rq.session_id = htobe64(session_id);
1134
1135 /*
1136 * Merge the cmd and connection request to prevent a write-write
1137 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1138 * second write to be performed quickly in presence of Nagle's algorithm.
1139 */
1140 memcpy(cmd_buf, &cmd, sizeof(cmd));
1141 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1142 status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1143 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1144 viewer_handle_send_status(logCfg, status, "detach session command");
1145 goto end;
1146 }
1147
1148 status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1149 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1150 viewer_handle_recv_status(logCfg, status, "detach session reply");
1151 goto end;
1152 }
1153
1154 switch (be32toh(rp.status)) {
1155 case LTTNG_VIEWER_DETACH_SESSION_OK:
1156 break;
1157 case LTTNG_VIEWER_DETACH_SESSION_UNK:
1158 BT_CLOGW("Session id %" PRIu64 " is unknown", session_id);
1159 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1160 goto end;
1161 case LTTNG_VIEWER_DETACH_SESSION_ERR:
1162 BT_CLOGW("Error detaching session id %" PRIu64 "", session_id);
1163 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1164 goto end;
1165 default:
1166 BT_CLOGE("Unknown detach return code %u", be32toh(rp.status));
1167 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1168 goto end;
1169 }
1170
1171 session->attached = false;
1172
1173 status = LTTNG_LIVE_VIEWER_STATUS_OK;
1174
1175 end:
1176 return status;
1177 }
1178
1179 BT_HIDDEN
1180 enum lttng_live_get_one_metadata_status
1181 lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, std::vector<char>& buf)
1182 {
1183 uint64_t len = 0;
1184 enum lttng_live_get_one_metadata_status status;
1185 enum lttng_live_viewer_status viewer_status;
1186 struct lttng_viewer_cmd cmd;
1187 struct lttng_viewer_get_metadata rq;
1188 struct lttng_viewer_metadata_packet rp;
1189 std::vector<char> data;
1190 struct lttng_live_session *session = trace->session;
1191 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
1192 struct lttng_live_metadata *metadata = trace->metadata;
1193 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1194 const bt2_common::LogCfg& logCfg = trace->logCfg;
1195 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1196 char cmd_buf[cmd_buf_len];
1197
1198 BT_CLOGD("Requesting new metadata for trace:"
1199 "cmd=%s, trace-id=%" PRIu64 ", metadata-stream-id=%" PRIu64,
1200 lttng_viewer_command_string(LTTNG_VIEWER_GET_METADATA), trace->id,
1201 metadata->stream_id);
1202
1203 rq.stream_id = htobe64(metadata->stream_id);
1204 cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
1205 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1206 cmd.cmd_version = htobe32(0);
1207
1208 /*
1209 * Merge the cmd and connection request to prevent a write-write
1210 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1211 * second write to be performed quickly in presence of Nagle's algorithm.
1212 */
1213 memcpy(cmd_buf, &cmd, sizeof(cmd));
1214 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1215 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1216 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1217 viewer_handle_send_status(logCfg, viewer_status, "get metadata command");
1218 status = (enum lttng_live_get_one_metadata_status) viewer_status;
1219 goto end;
1220 }
1221
1222 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1223 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1224 viewer_handle_recv_status(logCfg, viewer_status, "get metadata reply");
1225 status = (enum lttng_live_get_one_metadata_status) viewer_status;
1226 goto end;
1227 }
1228
1229 switch (be32toh(rp.status)) {
1230 case LTTNG_VIEWER_METADATA_OK:
1231 BT_CLOGD("Received get_metadata response: ok");
1232 break;
1233 case LTTNG_VIEWER_NO_NEW_METADATA:
1234 BT_CLOGD("Received get_metadata response: no new");
1235 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_END;
1236 goto end;
1237 case LTTNG_VIEWER_METADATA_ERR:
1238 /*
1239 * The Relayd cannot find this stream id. Maybe its
1240 * gone already. This can happen in short lived UST app
1241 * in a per-pid session.
1242 */
1243 BT_CLOGD("Received get_metadata response: error");
1244 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_CLOSED;
1245 goto end;
1246 default:
1247 BT_CLOGE_APPEND_CAUSE("Received get_metadata response: unknown");
1248 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
1249 goto end;
1250 }
1251
1252 len = be64toh(rp.len);
1253 if (len == 0) {
1254 /*
1255 * We received a `LTTNG_VIEWER_METADATA_OK` with a packet
1256 * length of 0. This means we must try again. This scenario
1257 * arises when a clear command is performed on an lttng session.
1258 */
1259 BT_CLOGD("Expecting a metadata packet of size 0. Retry to get a packet from the relay.");
1260 goto empty_metadata_packet_retry;
1261 }
1262
1263 BT_CLOGD("Writing %" PRIu64 " bytes to metadata", len);
1264 if (len <= 0) {
1265 BT_CLOGE_APPEND_CAUSE("Erroneous response length");
1266 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
1267 goto end;
1268 }
1269
1270 data.resize(len);
1271
1272 viewer_status = lttng_live_recv(viewer_connection, data.data(), len);
1273 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1274 viewer_handle_recv_status(logCfg, viewer_status, "get metadata packet");
1275 status = (enum lttng_live_get_one_metadata_status) viewer_status;
1276 goto end;
1277 }
1278
1279 /*
1280 * Write the metadata to the file handle.
1281 */
1282 buf.insert(buf.end(), data.begin(), data.end());
1283
1284 empty_metadata_packet_retry:
1285 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_OK;
1286
1287 end:
1288 return status;
1289 }
1290
1291 /*
1292 * Assign the fields from a lttng_viewer_index to a packet_index.
1293 */
1294 static void lttng_index_to_packet_index(struct lttng_viewer_index *lindex,
1295 struct packet_index *pindex)
1296 {
1297 BT_ASSERT(lindex);
1298 BT_ASSERT(pindex);
1299
1300 pindex->offset = be64toh(lindex->offset);
1301 pindex->packet_size = be64toh(lindex->packet_size);
1302 pindex->content_size = be64toh(lindex->content_size);
1303 pindex->ts_cycles.timestamp_begin = be64toh(lindex->timestamp_begin);
1304 pindex->ts_cycles.timestamp_end = be64toh(lindex->timestamp_end);
1305 pindex->events_discarded = be64toh(lindex->events_discarded);
1306 }
1307
1308 static void lttng_live_need_new_streams(struct lttng_live_msg_iter *lttng_live_msg_iter)
1309 {
1310 uint64_t session_idx;
1311 const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
1312
1313 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
1314 struct lttng_live_session *session =
1315 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
1316 BT_CLOGD("Marking session as needing new streams: "
1317 "session-id=%" PRIu64,
1318 session->id);
1319 session->new_streams_needed = true;
1320 }
1321 }
1322
1323 BT_HIDDEN
1324 enum lttng_live_iterator_status
1325 lttng_live_get_next_index(struct lttng_live_msg_iter *lttng_live_msg_iter,
1326 struct lttng_live_stream_iterator *stream, struct packet_index *index)
1327 {
1328 struct lttng_viewer_cmd cmd;
1329 struct lttng_viewer_get_next_index rq;
1330 enum lttng_live_viewer_status viewer_status;
1331 struct lttng_viewer_index rp;
1332 enum lttng_live_iterator_status status;
1333 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1334 struct lttng_live_trace *trace = stream->trace;
1335 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1336 char cmd_buf[cmd_buf_len];
1337 uint32_t flags, rp_status;
1338 const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
1339
1340 BT_CLOGD("Requesting next index for stream: cmd=%s, "
1341 "viewer-stream-id=%" PRIu64,
1342 lttng_viewer_command_string(LTTNG_VIEWER_GET_NEXT_INDEX), stream->viewer_stream_id);
1343 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
1344 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1345 cmd.cmd_version = htobe32(0);
1346
1347 memset(&rq, 0, sizeof(rq));
1348 rq.stream_id = htobe64(stream->viewer_stream_id);
1349
1350 /*
1351 * Merge the cmd and connection request to prevent a write-write
1352 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1353 * second write to be performed quickly in presence of Nagle's algorithm.
1354 */
1355 memcpy(cmd_buf, &cmd, sizeof(cmd));
1356 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1357
1358 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1359 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1360 viewer_handle_send_status(logCfg, viewer_status, "get next index command");
1361 goto error;
1362 }
1363
1364 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1365 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1366 viewer_handle_recv_status(logCfg, viewer_status, "get next index reply");
1367 goto error;
1368 }
1369
1370 flags = be32toh(rp.flags);
1371 rp_status = be32toh(rp.status);
1372
1373 BT_CLOGD("Received response from relay daemon: cmd=%s, response=%s",
1374 lttng_viewer_command_string(LTTNG_VIEWER_GET_NEXT_INDEX),
1375 lttng_viewer_next_index_return_code_string(rp_status));
1376 switch (rp_status) {
1377 case LTTNG_VIEWER_INDEX_INACTIVE:
1378 {
1379 uint64_t ctf_stream_class_id;
1380
1381 memset(index, 0, sizeof(struct packet_index));
1382 index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
1383 stream->current_inactivity_ts = index->ts_cycles.timestamp_end;
1384 ctf_stream_class_id = be64toh(rp.stream_id);
1385 if (stream->ctf_stream_class_id.is_set) {
1386 BT_ASSERT(stream->ctf_stream_class_id.value == ctf_stream_class_id);
1387 } else {
1388 stream->ctf_stream_class_id.value = ctf_stream_class_id;
1389 stream->ctf_stream_class_id.is_set = true;
1390 }
1391 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_QUIESCENT);
1392 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1393 break;
1394 }
1395 case LTTNG_VIEWER_INDEX_OK:
1396 {
1397 uint64_t ctf_stream_class_id;
1398
1399 lttng_index_to_packet_index(&rp, index);
1400 ctf_stream_class_id = be64toh(rp.stream_id);
1401 if (stream->ctf_stream_class_id.is_set) {
1402 BT_ASSERT(stream->ctf_stream_class_id.value == ctf_stream_class_id);
1403 } else {
1404 stream->ctf_stream_class_id.value = ctf_stream_class_id;
1405 stream->ctf_stream_class_id.is_set = true;
1406 }
1407
1408 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_DATA);
1409
1410 if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
1411 BT_CLOGD("Marking trace as needing new metadata: "
1412 "response=%s, response-flag=NEW_METADATA, trace-id=%" PRIu64,
1413 lttng_viewer_next_index_return_code_string(rp_status), trace->id);
1414 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
1415 }
1416 if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
1417 BT_CLOGD("Marking all sessions as possibly needing new streams: "
1418 "response=%s, response-flag=NEW_STREAM",
1419 lttng_viewer_next_index_return_code_string(rp_status));
1420 lttng_live_need_new_streams(lttng_live_msg_iter);
1421 }
1422 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1423 break;
1424 }
1425 case LTTNG_VIEWER_INDEX_RETRY:
1426 memset(index, 0, sizeof(struct packet_index));
1427 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
1428 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
1429 goto end;
1430 case LTTNG_VIEWER_INDEX_HUP:
1431 memset(index, 0, sizeof(struct packet_index));
1432 index->offset = EOF;
1433 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_EOF);
1434 stream->has_stream_hung_up = true;
1435 status = LTTNG_LIVE_ITERATOR_STATUS_END;
1436 break;
1437 case LTTNG_VIEWER_INDEX_ERR:
1438 memset(index, 0, sizeof(struct packet_index));
1439 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
1440 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1441 goto end;
1442 default:
1443 BT_CLOGD("Received get_next_index response: unknown value");
1444 memset(index, 0, sizeof(struct packet_index));
1445 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
1446 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1447 goto end;
1448 }
1449 goto end;
1450
1451 error:
1452 status = viewer_status_to_live_iterator_status(viewer_status);
1453 end:
1454 return status;
1455 }
1456
1457 BT_HIDDEN
1458 enum ctf_msg_iter_medium_status
1459 lttng_live_get_stream_bytes(struct lttng_live_msg_iter *lttng_live_msg_iter,
1460 struct lttng_live_stream_iterator *stream, uint8_t *buf,
1461 uint64_t offset, uint64_t req_len, uint64_t *recv_len)
1462 {
1463 enum ctf_msg_iter_medium_status status;
1464 enum lttng_live_viewer_status viewer_status;
1465 struct lttng_viewer_trace_packet rp;
1466 struct lttng_viewer_cmd cmd;
1467 struct lttng_viewer_get_packet rq;
1468 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1469 const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
1470 struct lttng_live_trace *trace = stream->trace;
1471 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1472 char cmd_buf[cmd_buf_len];
1473 uint32_t flags, rp_status;
1474
1475 BT_CLOGD("Requesting data from stream: cmd=%s, "
1476 "offset=%" PRIu64 ", request-len=%" PRIu64,
1477 lttng_viewer_command_string(LTTNG_VIEWER_GET_PACKET), offset, req_len);
1478
1479 cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
1480 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1481 cmd.cmd_version = htobe32(0);
1482
1483 memset(&rq, 0, sizeof(rq));
1484 rq.stream_id = htobe64(stream->viewer_stream_id);
1485 rq.offset = htobe64(offset);
1486 rq.len = htobe32(req_len);
1487
1488 /*
1489 * Merge the cmd and connection request to prevent a write-write
1490 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1491 * second write to be performed quickly in presence of Nagle's algorithm.
1492 */
1493 memcpy(cmd_buf, &cmd, sizeof(cmd));
1494 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1495
1496 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1497 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1498 viewer_handle_send_status(logCfg, viewer_status, "get data packet command");
1499 goto error_convert_status;
1500 }
1501
1502 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1503 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1504 viewer_handle_recv_status(logCfg, viewer_status, "get data packet reply");
1505 goto error_convert_status;
1506 }
1507
1508 flags = be32toh(rp.flags);
1509 rp_status = be32toh(rp.status);
1510
1511 BT_CLOGD("Received response from relay daemon: cmd=%s, response=%s",
1512 lttng_viewer_command_string(LTTNG_VIEWER_GET_PACKET),
1513 lttng_viewer_get_packet_return_code_string(rp_status));
1514 switch (rp_status) {
1515 case LTTNG_VIEWER_GET_PACKET_OK:
1516 req_len = be32toh(rp.len);
1517 BT_CLOGD("Got packet from relay daemon: response=%s, packet-len=%" PRIu64 "",
1518 lttng_viewer_get_packet_return_code_string(rp_status), req_len);
1519 break;
1520 case LTTNG_VIEWER_GET_PACKET_RETRY:
1521 /* Unimplemented by relay daemon */
1522 status = CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
1523 goto end;
1524 case LTTNG_VIEWER_GET_PACKET_ERR:
1525 if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
1526 BT_CLOGD("Marking trace as needing new metadata: "
1527 "response=%s, response-flag=NEW_METADATA, trace-id=%" PRIu64,
1528 lttng_viewer_next_index_return_code_string(rp_status), trace->id);
1529 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
1530 }
1531 if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
1532 BT_CLOGD("Marking all sessions as possibly needing new streams: "
1533 "response=%s, response-flag=NEW_STREAM",
1534 lttng_viewer_next_index_return_code_string(rp_status));
1535 lttng_live_need_new_streams(lttng_live_msg_iter);
1536 }
1537 if (flags & (LTTNG_VIEWER_FLAG_NEW_METADATA | LTTNG_VIEWER_FLAG_NEW_STREAM)) {
1538 status = CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
1539 BT_CLOGD("Reply with any one flags set means we should retry: response=%s",
1540 lttng_viewer_get_packet_return_code_string(rp_status));
1541 goto end;
1542 }
1543 BT_CLOGE_APPEND_CAUSE("Received get_data_packet response: error");
1544 status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
1545 goto end;
1546 case LTTNG_VIEWER_GET_PACKET_EOF:
1547 status = CTF_MSG_ITER_MEDIUM_STATUS_EOF;
1548 goto end;
1549 default:
1550 BT_CLOGE_APPEND_CAUSE("Received get_data_packet response: unknown (%d)", rp_status);
1551 status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
1552 goto end;
1553 }
1554
1555 if (req_len == 0) {
1556 status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
1557 goto end;
1558 }
1559
1560 viewer_status = lttng_live_recv(viewer_connection, buf, req_len);
1561 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1562 viewer_handle_recv_status(logCfg, viewer_status, "get data packet");
1563 goto error_convert_status;
1564 }
1565 *recv_len = req_len;
1566
1567 status = CTF_MSG_ITER_MEDIUM_STATUS_OK;
1568 goto end;
1569
1570 error_convert_status:
1571 status = viewer_status_to_ctf_msg_iter_medium_status(viewer_status);
1572 end:
1573 return status;
1574 }
1575
1576 /*
1577 * Request new streams for a session.
1578 */
1579 BT_HIDDEN
1580 enum lttng_live_iterator_status
1581 lttng_live_session_get_new_streams(struct lttng_live_session *session,
1582 bt_self_message_iterator *self_msg_iter)
1583 {
1584 enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1585 struct lttng_viewer_cmd cmd;
1586 struct lttng_viewer_new_streams_request rq;
1587 struct lttng_viewer_new_streams_response rp;
1588 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
1589 enum lttng_live_viewer_status viewer_status;
1590 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1591 const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
1592 uint32_t streams_count;
1593 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1594 char cmd_buf[cmd_buf_len];
1595
1596 if (!session->new_streams_needed) {
1597 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1598 goto end;
1599 }
1600
1601 BT_CLOGD("Requesting new streams for session: cmd=%s, "
1602 "session-id=%" PRIu64,
1603 lttng_viewer_command_string(LTTNG_VIEWER_GET_NEW_STREAMS), session->id);
1604
1605 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
1606 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1607 cmd.cmd_version = htobe32(0);
1608
1609 memset(&rq, 0, sizeof(rq));
1610 rq.session_id = htobe64(session->id);
1611
1612 /*
1613 * Merge the cmd and connection request to prevent a write-write
1614 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1615 * second write to be performed quickly in presence of Nagle's algorithm.
1616 */
1617 memcpy(cmd_buf, &cmd, sizeof(cmd));
1618 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1619
1620 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1621 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1622 viewer_handle_send_status(logCfg, viewer_status, "get new streams command");
1623 status = viewer_status_to_live_iterator_status(viewer_status);
1624 goto end;
1625 }
1626
1627 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1628 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1629 viewer_handle_recv_status(logCfg, viewer_status, "get new streams reply");
1630 status = viewer_status_to_live_iterator_status(viewer_status);
1631 goto end;
1632 }
1633
1634 streams_count = be32toh(rp.streams_count);
1635
1636 switch (be32toh(rp.status)) {
1637 case LTTNG_VIEWER_NEW_STREAMS_OK:
1638 session->new_streams_needed = false;
1639 break;
1640 case LTTNG_VIEWER_NEW_STREAMS_NO_NEW:
1641 session->new_streams_needed = false;
1642 goto end;
1643 case LTTNG_VIEWER_NEW_STREAMS_HUP:
1644 session->new_streams_needed = false;
1645 session->closed = true;
1646 status = LTTNG_LIVE_ITERATOR_STATUS_END;
1647 goto end;
1648 case LTTNG_VIEWER_NEW_STREAMS_ERR:
1649 BT_CLOGD("Received get_new_streams response: error");
1650 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1651 goto end;
1652 default:
1653 BT_CLOGE_APPEND_CAUSE("Received get_new_streams response: Unknown:"
1654 "return code %u",
1655 be32toh(rp.status));
1656 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1657 goto end;
1658 }
1659
1660 viewer_status = receive_streams(session, streams_count, self_msg_iter);
1661 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1662 viewer_handle_recv_status(logCfg, viewer_status, "new streams");
1663 status = viewer_status_to_live_iterator_status(viewer_status);
1664 goto end;
1665 }
1666
1667 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1668 end:
1669 return status;
1670 }
1671
1672 BT_HIDDEN
1673 enum lttng_live_viewer_status live_viewer_connection_create(
1674 const char *url, bool in_query, struct lttng_live_msg_iter *lttng_live_msg_iter,
1675 const bt2_common::LogCfg& logCfg, struct live_viewer_connection **viewer)
1676 {
1677 enum lttng_live_viewer_status status;
1678
1679 live_viewer_connection *viewer_connection = new live_viewer_connection {logCfg};
1680
1681 if (bt_socket_init(logCfg.logLevel()) != 0) {
1682 BT_CLOGE_APPEND_CAUSE("Failed to init socket");
1683 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1684 goto error;
1685 }
1686
1687 viewer_connection->control_sock = BT_INVALID_SOCKET;
1688 viewer_connection->port = -1;
1689 viewer_connection->in_query = in_query;
1690 viewer_connection->lttng_live_msg_iter = lttng_live_msg_iter;
1691 viewer_connection->url = g_string_new(url);
1692 if (!viewer_connection->url) {
1693 BT_CLOGE_APPEND_CAUSE("Failed to allocate URL buffer");
1694 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1695 goto error;
1696 }
1697
1698 BT_CLOGD("Establishing connection to url \"%s\"...", url);
1699 status = lttng_live_connect_viewer(viewer_connection);
1700 /*
1701 * Only print error and append cause in case of error. not in case of
1702 * interruption.
1703 */
1704 if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1705 BT_CLOGE_APPEND_CAUSE("Failed to establish connection: "
1706 "url=\"%s\"",
1707 url);
1708 goto error;
1709 } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1710 goto error;
1711 }
1712 BT_CLOGD("Connection to url \"%s\" is established", url);
1713
1714 *viewer = viewer_connection;
1715 status = LTTNG_LIVE_VIEWER_STATUS_OK;
1716 goto end;
1717
1718 error:
1719 if (viewer_connection) {
1720 live_viewer_connection_destroy(viewer_connection);
1721 }
1722 end:
1723 return status;
1724 }
1725
1726 BT_HIDDEN
1727 void live_viewer_connection_destroy(struct live_viewer_connection *viewer_connection)
1728 {
1729 const bt2_common::LogCfg& logCfg = viewer_connection->logCfg;
1730
1731 if (!viewer_connection) {
1732 goto end;
1733 }
1734
1735 BT_CLOGD("Closing connection to relay:"
1736 "relay-url=\"%s\"",
1737 viewer_connection->url->str);
1738
1739 lttng_live_disconnect_viewer(viewer_connection);
1740
1741 if (viewer_connection->url) {
1742 g_string_free(viewer_connection->url, true);
1743 }
1744
1745 if (viewer_connection->relay_hostname) {
1746 g_string_free(viewer_connection->relay_hostname, true);
1747 }
1748
1749 if (viewer_connection->target_hostname) {
1750 g_string_free(viewer_connection->target_hostname, true);
1751 }
1752
1753 if (viewer_connection->session_name) {
1754 g_string_free(viewer_connection->session_name, true);
1755 }
1756
1757 if (viewer_connection->proto) {
1758 g_string_free(viewer_connection->proto, true);
1759 }
1760
1761 delete viewer_connection;
1762
1763 bt_socket_fini();
1764
1765 end:
1766 return;
1767 }
This page took 0.07042 seconds and 5 git commands to generate.