Re-format new C++ files
[babeltrace.git] / src / plugins / ctf / lttng-live / viewer-connection.cpp
1 /*
2 * SPDX-License-Identifier: MIT
3 *
4 * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
5 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
6 */
7
8 #define BT_COMP_LOG_SELF_COMP (viewer_connection->self_comp)
9 #define BT_LOG_OUTPUT_LEVEL (viewer_connection->log_level)
10 #define BT_LOG_TAG "PLUGIN/SRC.CTF.LTTNG-LIVE/VIEWER"
11 #include "logging/comp-logging.h"
12
13 #include <fcntl.h>
14 #include <stdbool.h>
15 #include <stdint.h>
16 #include <stdio.h>
17 #include <stdlib.h>
18 #include <sys/types.h>
19 #include <unistd.h>
20
21 #include <glib.h>
22
23 #include "compat/socket.h"
24 #include "compat/endian.h"
25 #include "compat/compiler.h"
26 #include "common/common.h"
27 #include <babeltrace2/babeltrace.h>
28
29 #include "lttng-live.hpp"
30 #include "viewer-connection.hpp"
31 #include "lttng-viewer-abi.hpp"
32 #include "data-stream.hpp"
33 #include "metadata.hpp"
34
35 #define viewer_handle_send_recv_status(_self_comp, _self_comp_class, _status, _action, _msg_str) \
36 do { \
37 switch (_status) { \
38 case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED: \
39 break; \
40 case LTTNG_LIVE_VIEWER_STATUS_ERROR: \
41 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(_self_comp, _self_comp_class, \
42 "Error " _action " " _msg_str); \
43 break; \
44 default: \
45 bt_common_abort(); \
46 } \
47 } while (0)
48
49 #define viewer_handle_send_status(_self_comp, _self_comp_class, _status, _msg_str) \
50 viewer_handle_send_recv_status(_self_comp, _self_comp_class, _status, "sending", _msg_str)
51
52 #define viewer_handle_recv_status(_self_comp, _self_comp_class, _status, _msg_str) \
53 viewer_handle_send_recv_status(_self_comp, _self_comp_class, _status, "receiving", _msg_str)
54
55 #define LTTNG_LIVE_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE_ERRNO(_self_comp, _self_comp_class, _msg, \
56 _fmt, ...) \
57 do { \
58 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(_self_comp, _self_comp_class, _msg ": %s" _fmt, \
59 bt_socket_errormsg(), ##__VA_ARGS__); \
60 } while (0)
61
62 static const char *lttng_viewer_command_string(enum lttng_viewer_command cmd)
63 {
64 switch (cmd) {
65 case LTTNG_VIEWER_CONNECT:
66 return "CONNECT";
67 case LTTNG_VIEWER_LIST_SESSIONS:
68 return "LIST_SESSIONS";
69 case LTTNG_VIEWER_ATTACH_SESSION:
70 return "ATTACH_SESSION";
71 case LTTNG_VIEWER_GET_NEXT_INDEX:
72 return "GET_NEXT_INDEX";
73 case LTTNG_VIEWER_GET_PACKET:
74 return "GET_PACKET";
75 case LTTNG_VIEWER_GET_METADATA:
76 return "GET_METADATA";
77 case LTTNG_VIEWER_GET_NEW_STREAMS:
78 return "GET_NEW_STREAMS";
79 case LTTNG_VIEWER_CREATE_SESSION:
80 return "CREATE_SESSION";
81 case LTTNG_VIEWER_DETACH_SESSION:
82 return "DETACH_SESSION";
83 }
84
85 bt_common_abort();
86 }
87
88 static const char *
89 lttng_viewer_next_index_return_code_string(enum lttng_viewer_next_index_return_code code)
90 {
91 switch (code) {
92 case LTTNG_VIEWER_INDEX_OK:
93 return "INDEX_OK";
94 case LTTNG_VIEWER_INDEX_RETRY:
95 return "INDEX_RETRY";
96 case LTTNG_VIEWER_INDEX_HUP:
97 return "INDEX_HUP";
98 case LTTNG_VIEWER_INDEX_ERR:
99 return "INDEX_ERR";
100 case LTTNG_VIEWER_INDEX_INACTIVE:
101 return "INDEX_INACTIVE";
102 case LTTNG_VIEWER_INDEX_EOF:
103 return "INDEX_EOF";
104 }
105
106 bt_common_abort();
107 }
108
109 static const char *lttng_viewer_next_index_return_code_string(uint32_t code)
110 {
111 return lttng_viewer_next_index_return_code_string((lttng_viewer_next_index_return_code) code);
112 }
113
114 static const char *
115 lttng_viewer_get_packet_return_code_string(enum lttng_viewer_get_packet_return_code code)
116 {
117 switch (code) {
118 case LTTNG_VIEWER_GET_PACKET_OK:
119 return "GET_PACKET_OK";
120 case LTTNG_VIEWER_GET_PACKET_RETRY:
121 return "GET_PACKET_RETRY";
122 case LTTNG_VIEWER_GET_PACKET_ERR:
123 return "GET_PACKET_ERR";
124 case LTTNG_VIEWER_GET_PACKET_EOF:
125 return "GET_PACKET_EOF";
126 }
127
128 bt_common_abort();
129 };
130
131 static const char *lttng_viewer_get_packet_return_code_string(uint32_t code)
132 {
133 return lttng_viewer_get_packet_return_code_string((lttng_viewer_get_packet_return_code) code);
134 }
135
136 static const char *lttng_viewer_seek_string(enum lttng_viewer_seek seek)
137 {
138 switch (seek) {
139 case LTTNG_VIEWER_SEEK_BEGINNING:
140 return "SEEK_BEGINNING";
141 case LTTNG_VIEWER_SEEK_LAST:
142 return "SEEK_LAST";
143 }
144
145 bt_common_abort();
146 }
147
148 static inline enum lttng_live_iterator_status
149 viewer_status_to_live_iterator_status(enum lttng_live_viewer_status viewer_status)
150 {
151 switch (viewer_status) {
152 case LTTNG_LIVE_VIEWER_STATUS_OK:
153 return LTTNG_LIVE_ITERATOR_STATUS_OK;
154 case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
155 return LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
156 case LTTNG_LIVE_VIEWER_STATUS_ERROR:
157 return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
158 }
159
160 bt_common_abort();
161 }
162
163 static inline enum ctf_msg_iter_medium_status
164 viewer_status_to_ctf_msg_iter_medium_status(enum lttng_live_viewer_status viewer_status)
165 {
166 switch (viewer_status) {
167 case LTTNG_LIVE_VIEWER_STATUS_OK:
168 return CTF_MSG_ITER_MEDIUM_STATUS_OK;
169 case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
170 return CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
171 case LTTNG_LIVE_VIEWER_STATUS_ERROR:
172 return CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
173 }
174
175 bt_common_abort();
176 }
177
178 static inline void viewer_connection_close_socket(struct live_viewer_connection *viewer_connection)
179 {
180 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
181 bt_self_component *self_comp = viewer_connection->self_comp;
182 int ret = bt_socket_close(viewer_connection->control_sock);
183 if (ret == -1) {
184 BT_COMP_OR_COMP_CLASS_LOGW_ERRNO(self_comp, self_comp_class,
185 "Error closing viewer connection socket: ", ".");
186 }
187
188 viewer_connection->control_sock = BT_INVALID_SOCKET;
189 }
190
191 /*
192 * This function receives a message from the Relay daemon.
193 * If it received the entire message, it returns _OK,
194 * If it's interrupted, it returns _INTERRUPTED,
195 * otherwise, it returns _ERROR.
196 */
197 static enum lttng_live_viewer_status
198 lttng_live_recv(struct live_viewer_connection *viewer_connection, void *buf, size_t len)
199 {
200 ssize_t received;
201 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
202 bt_self_component *self_comp = viewer_connection->self_comp;
203 size_t total_received = 0, to_receive = len;
204 struct lttng_live_msg_iter *lttng_live_msg_iter = viewer_connection->lttng_live_msg_iter;
205 enum lttng_live_viewer_status status;
206 BT_SOCKET sock = viewer_connection->control_sock;
207
208 /*
209 * Receive a message from the Relay.
210 */
211 do {
212 received = bt_socket_recv(sock, (char *) buf + total_received, to_receive, 0);
213 if (received == BT_SOCKET_ERROR) {
214 if (bt_socket_interrupted()) {
215 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
216 /*
217 * This interruption was due to a
218 * SIGINT and the graph is being torn
219 * down.
220 */
221 status = LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED;
222 lttng_live_msg_iter->was_interrupted = true;
223 goto end;
224 } else {
225 /*
226 * A signal was received, but the graph
227 * is not being torn down. Carry on.
228 */
229 continue;
230 }
231 } else {
232 /*
233 * For any other types of socket error, close
234 * the socket and return an error.
235 */
236 LTTNG_LIVE_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE_ERRNO(
237 self_comp, self_comp_class, "Error receiving from Relay", ".");
238
239 viewer_connection_close_socket(viewer_connection);
240 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
241 goto end;
242 }
243 } else if (received == 0) {
244 /*
245 * The recv() call returned 0. This means the
246 * connection was orderly shutdown from the other peer.
247 * If that happens when we are trying to receive
248 * a message from it, it means something when wrong.
249 * Close the socket and return an error.
250 */
251 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
252 "Remote side has closed connection");
253 viewer_connection_close_socket(viewer_connection);
254 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
255 goto end;
256 }
257
258 BT_ASSERT(received <= to_receive);
259 total_received += received;
260 to_receive -= received;
261
262 } while (to_receive > 0);
263
264 BT_ASSERT(total_received == len);
265 status = LTTNG_LIVE_VIEWER_STATUS_OK;
266
267 end:
268 return status;
269 }
270
271 /*
272 * This function sends a message to the Relay daemon.
273 * If it send the message, it returns _OK,
274 * If it's interrupted, it returns _INTERRUPTED,
275 * otherwise, it returns _ERROR.
276 */
277 static enum lttng_live_viewer_status
278 lttng_live_send(struct live_viewer_connection *viewer_connection, const void *buf, size_t len)
279 {
280 enum lttng_live_viewer_status status;
281 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
282 bt_self_component *self_comp = viewer_connection->self_comp;
283 struct lttng_live_msg_iter *lttng_live_msg_iter = viewer_connection->lttng_live_msg_iter;
284 BT_SOCKET sock = viewer_connection->control_sock;
285 size_t to_send = len;
286 ssize_t total_sent = 0;
287
288 do {
289 ssize_t sent = bt_socket_send_nosigpipe(sock, (char *) buf + total_sent, to_send);
290 if (sent == BT_SOCKET_ERROR) {
291 if (bt_socket_interrupted()) {
292 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
293 /*
294 * This interruption was a SIGINT and
295 * the graph is being teared down.
296 */
297 status = LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED;
298 lttng_live_msg_iter->was_interrupted = true;
299 goto end;
300 } else {
301 /*
302 * A signal was received, but the graph
303 * is not being teared down. Carry on.
304 */
305 continue;
306 }
307 } else {
308 /*
309 * For any other types of socket error, close
310 * the socket and return an error.
311 */
312 LTTNG_LIVE_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE_ERRNO(
313 self_comp, self_comp_class, "Error sending to Relay", ".");
314
315 viewer_connection_close_socket(viewer_connection);
316 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
317 goto end;
318 }
319 }
320
321 BT_ASSERT(sent <= to_send);
322 total_sent += sent;
323 to_send -= sent;
324
325 } while (to_send > 0);
326
327 BT_ASSERT(total_sent == len);
328 status = LTTNG_LIVE_VIEWER_STATUS_OK;
329
330 end:
331 return status;
332 }
333
334 static int parse_url(struct live_viewer_connection *viewer_connection)
335 {
336 char error_buf[256] = {0};
337 bt_self_component *self_comp = viewer_connection->self_comp;
338 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
339 struct bt_common_lttng_live_url_parts lttng_live_url_parts = {0};
340 int ret = -1;
341 const char *path = viewer_connection->url->str;
342
343 if (!path) {
344 goto end;
345 }
346
347 lttng_live_url_parts = bt_common_parse_lttng_live_url(path, error_buf, sizeof(error_buf));
348 if (!lttng_live_url_parts.proto) {
349 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
350 "Invalid LTTng live URL format: %s", error_buf);
351 goto end;
352 }
353 viewer_connection->proto = lttng_live_url_parts.proto;
354 lttng_live_url_parts.proto = NULL;
355
356 viewer_connection->relay_hostname = lttng_live_url_parts.hostname;
357 lttng_live_url_parts.hostname = NULL;
358
359 if (lttng_live_url_parts.port >= 0) {
360 viewer_connection->port = lttng_live_url_parts.port;
361 } else {
362 viewer_connection->port = LTTNG_DEFAULT_NETWORK_VIEWER_PORT;
363 }
364
365 viewer_connection->target_hostname = lttng_live_url_parts.target_hostname;
366 lttng_live_url_parts.target_hostname = NULL;
367
368 if (lttng_live_url_parts.session_name) {
369 viewer_connection->session_name = lttng_live_url_parts.session_name;
370 lttng_live_url_parts.session_name = NULL;
371 }
372
373 ret = 0;
374
375 end:
376 bt_common_destroy_lttng_live_url_parts(&lttng_live_url_parts);
377 return ret;
378 }
379
380 static enum lttng_live_viewer_status
381 lttng_live_handshake(struct live_viewer_connection *viewer_connection)
382 {
383 struct lttng_viewer_cmd cmd;
384 struct lttng_viewer_connect connect;
385 enum lttng_live_viewer_status status;
386 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
387 bt_self_component *self_comp = viewer_connection->self_comp;
388 const size_t cmd_buf_len = sizeof(cmd) + sizeof(connect);
389 char cmd_buf[cmd_buf_len];
390
391 BT_COMP_OR_COMP_CLASS_LOGD(
392 self_comp, self_comp_class,
393 "Handshaking with the relay daemon: cmd=%s, major-version=%u, minor-version=%u",
394 lttng_viewer_command_string(LTTNG_VIEWER_CONNECT), LTTNG_LIVE_MAJOR, LTTNG_LIVE_MINOR);
395
396 cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
397 cmd.data_size = htobe64((uint64_t) sizeof(connect));
398 cmd.cmd_version = htobe32(0);
399
400 connect.viewer_session_id = -1ULL; /* will be set on recv */
401 connect.major = htobe32(LTTNG_LIVE_MAJOR);
402 connect.minor = htobe32(LTTNG_LIVE_MINOR);
403 connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
404
405 /*
406 * Merge the cmd and connection request to prevent a write-write
407 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
408 * second write to be performed quickly in presence of Nagle's algorithm
409 */
410 memcpy(cmd_buf, &cmd, sizeof(cmd));
411 memcpy(cmd_buf + sizeof(cmd), &connect, sizeof(connect));
412
413 status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
414 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
415 viewer_handle_send_status(self_comp, self_comp_class, status, "viewer connect command");
416 goto end;
417 }
418
419 status = lttng_live_recv(viewer_connection, &connect, sizeof(connect));
420 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
421 viewer_handle_recv_status(self_comp, self_comp_class, status, "viewer connect reply");
422 goto end;
423 }
424
425 BT_COMP_OR_COMP_CLASS_LOGI(self_comp, self_comp_class, "Received viewer session ID : %" PRIu64,
426 (uint64_t) be64toh(connect.viewer_session_id));
427 BT_COMP_OR_COMP_CLASS_LOGI(self_comp, self_comp_class, "Relayd version : %u.%u",
428 be32toh(connect.major), be32toh(connect.minor));
429
430 if (LTTNG_LIVE_MAJOR != be32toh(connect.major)) {
431 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
432 "Incompatible lttng-relayd protocol");
433 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
434 goto end;
435 }
436 /* Use the smallest protocol version implemented. */
437 if (LTTNG_LIVE_MINOR > be32toh(connect.minor)) {
438 viewer_connection->minor = be32toh(connect.minor);
439 } else {
440 viewer_connection->minor = LTTNG_LIVE_MINOR;
441 }
442 viewer_connection->major = LTTNG_LIVE_MAJOR;
443
444 status = LTTNG_LIVE_VIEWER_STATUS_OK;
445
446 goto end;
447
448 end:
449 return status;
450 }
451
452 static enum lttng_live_viewer_status
453 lttng_live_connect_viewer(struct live_viewer_connection *viewer_connection)
454 {
455 struct hostent *host;
456 struct sockaddr_in server_addr;
457 enum lttng_live_viewer_status status;
458 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
459 bt_self_component *self_comp = viewer_connection->self_comp;
460
461 if (parse_url(viewer_connection)) {
462 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class, "Failed to parse URL");
463 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
464 goto error;
465 }
466
467 BT_COMP_OR_COMP_CLASS_LOGD(
468 self_comp, self_comp_class,
469 "Connecting to hostname : %s, port : %d, "
470 "target hostname : %s, session name : %s, proto : %s",
471 viewer_connection->relay_hostname->str, viewer_connection->port,
472 !viewer_connection->target_hostname ? "<none>" : viewer_connection->target_hostname->str,
473 !viewer_connection->session_name ? "<none>" : viewer_connection->session_name->str,
474 viewer_connection->proto->str);
475
476 host = gethostbyname(viewer_connection->relay_hostname->str);
477 if (!host) {
478 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
479 "Cannot lookup hostname: hostname=\"%s\"",
480 viewer_connection->relay_hostname->str);
481 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
482 goto error;
483 }
484
485 if ((viewer_connection->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == BT_INVALID_SOCKET) {
486 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
487 "Socket creation failed: %s", bt_socket_errormsg());
488 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
489 goto error;
490 }
491
492 server_addr.sin_family = AF_INET;
493 server_addr.sin_port = htons(viewer_connection->port);
494 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
495 memset(&(server_addr.sin_zero), 0, 8);
496
497 if (connect(viewer_connection->control_sock, (struct sockaddr *) &server_addr,
498 sizeof(struct sockaddr)) == BT_SOCKET_ERROR) {
499 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class, "Connection failed: %s",
500 bt_socket_errormsg());
501 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
502 goto error;
503 }
504
505 status = lttng_live_handshake(viewer_connection);
506
507 /*
508 * Only print error and append cause in case of error. not in case of
509 * interruption.
510 */
511 if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
512 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
513 "Viewer handshake failed");
514 goto error;
515 } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
516 goto end;
517 }
518
519 goto end;
520
521 error:
522 if (viewer_connection->control_sock != BT_INVALID_SOCKET) {
523 if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) {
524 BT_COMP_OR_COMP_CLASS_LOGW(self_comp, self_comp_class, "Error closing socket: %s.",
525 bt_socket_errormsg());
526 }
527 }
528 viewer_connection->control_sock = BT_INVALID_SOCKET;
529 end:
530 return status;
531 }
532
533 static void lttng_live_disconnect_viewer(struct live_viewer_connection *viewer_connection)
534 {
535 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
536 bt_self_component *self_comp = viewer_connection->self_comp;
537
538 if (viewer_connection->control_sock == BT_INVALID_SOCKET) {
539 return;
540 }
541 if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) {
542 BT_COMP_OR_COMP_CLASS_LOGW(self_comp, self_comp_class, "Error closing socket: %s",
543 bt_socket_errormsg());
544 viewer_connection->control_sock = BT_INVALID_SOCKET;
545 }
546 }
547
548 static int list_update_session(bt_value *results, const struct lttng_viewer_session *session,
549 bool *_found, struct live_viewer_connection *viewer_connection)
550 {
551 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
552 bt_self_component *self_comp = viewer_connection->self_comp;
553 int ret = 0;
554 uint64_t i, len;
555 bt_value *map = NULL;
556 bt_value *hostname = NULL;
557 bt_value *session_name = NULL;
558 bt_value *btval = NULL;
559 bool found = false;
560
561 len = bt_value_array_get_length(results);
562 for (i = 0; i < len; i++) {
563 const char *hostname_str = NULL;
564 const char *session_name_str = NULL;
565
566 map = bt_value_array_borrow_element_by_index(results, i);
567 hostname = bt_value_map_borrow_entry_value(map, "target-hostname");
568 if (!hostname) {
569 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
570 "Error borrowing \"target-hostname\" entry.");
571 ret = -1;
572 goto end;
573 }
574 session_name = bt_value_map_borrow_entry_value(map, "session-name");
575 if (!session_name) {
576 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
577 "Error borrowing \"session-name\" entry.");
578 ret = -1;
579 goto end;
580 }
581 hostname_str = bt_value_string_get(hostname);
582 session_name_str = bt_value_string_get(session_name);
583
584 if (strcmp(session->hostname, hostname_str) == 0 &&
585 strcmp(session->session_name, session_name_str) == 0) {
586 int64_t val;
587 uint32_t streams = be32toh(session->streams);
588 uint32_t clients = be32toh(session->clients);
589
590 found = true;
591
592 btval = bt_value_map_borrow_entry_value(map, "stream-count");
593 if (!btval) {
594 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
595 "Error borrowing \"stream-count\" entry.");
596 ret = -1;
597 goto end;
598 }
599 val = bt_value_integer_unsigned_get(btval);
600 /* sum */
601 val += streams;
602 bt_value_integer_unsigned_set(btval, val);
603
604 btval = bt_value_map_borrow_entry_value(map, "client-count");
605 if (!btval) {
606 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
607 "Error borrowing \"client-count\" entry.");
608 ret = -1;
609 goto end;
610 }
611 val = bt_value_integer_unsigned_get(btval);
612 /* max */
613 val = bt_max_t(int64_t, clients, val);
614 bt_value_integer_unsigned_set(btval, val);
615 }
616
617 if (found) {
618 break;
619 }
620 }
621 end:
622 *_found = found;
623 return ret;
624 }
625
626 static int list_append_session(bt_value *results, GString *base_url,
627 const struct lttng_viewer_session *session,
628 struct live_viewer_connection *viewer_connection)
629 {
630 int ret = 0;
631 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
632 bt_value_map_insert_entry_status insert_status;
633 bt_value_array_append_element_status append_status;
634 bt_value *map = NULL;
635 GString *url = NULL;
636 bool found = false;
637
638 /*
639 * If the session already exists, add the stream count to it,
640 * and do max of client counts.
641 */
642 ret = list_update_session(results, session, &found, viewer_connection);
643 if (ret || found) {
644 goto end;
645 }
646
647 map = bt_value_map_create();
648 if (!map) {
649 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error creating map value.");
650 ret = -1;
651 goto end;
652 }
653
654 if (base_url->len < 1) {
655 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error: base_url length smaller than 1.");
656 ret = -1;
657 goto end;
658 }
659 /*
660 * key = "url",
661 * value = <string>,
662 */
663 url = g_string_new(base_url->str);
664 g_string_append(url, "/host/");
665 g_string_append(url, session->hostname);
666 g_string_append_c(url, '/');
667 g_string_append(url, session->session_name);
668
669 insert_status = bt_value_map_insert_string_entry(map, "url", url->str);
670 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
671 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error inserting \"url\" entry.");
672 ret = -1;
673 goto end;
674 }
675
676 /*
677 * key = "target-hostname",
678 * value = <string>,
679 */
680 insert_status = bt_value_map_insert_string_entry(map, "target-hostname", session->hostname);
681 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
682 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
683 "Error inserting \"target-hostname\" entry.");
684 ret = -1;
685 goto end;
686 }
687
688 /*
689 * key = "session-name",
690 * value = <string>,
691 */
692 insert_status = bt_value_map_insert_string_entry(map, "session-name", session->session_name);
693 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
694 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error inserting \"session-name\" entry.");
695 ret = -1;
696 goto end;
697 }
698
699 /*
700 * key = "timer-us",
701 * value = <integer>,
702 */
703 {
704 uint32_t live_timer = be32toh(session->live_timer);
705
706 insert_status = bt_value_map_insert_unsigned_integer_entry(map, "timer-us", live_timer);
707 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
708 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error inserting \"timer-us\" entry.");
709 ret = -1;
710 goto end;
711 }
712 }
713
714 /*
715 * key = "stream-count",
716 * value = <integer>,
717 */
718 {
719 uint32_t streams = be32toh(session->streams);
720
721 insert_status = bt_value_map_insert_unsigned_integer_entry(map, "stream-count", streams);
722 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
723 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
724 "Error inserting \"stream-count\" entry.");
725 ret = -1;
726 goto end;
727 }
728 }
729
730 /*
731 * key = "client-count",
732 * value = <integer>,
733 */
734 {
735 uint32_t clients = be32toh(session->clients);
736
737 insert_status = bt_value_map_insert_unsigned_integer_entry(map, "client-count", clients);
738 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
739 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
740 "Error inserting \"client-count\" entry.");
741 ret = -1;
742 goto end;
743 }
744 }
745
746 append_status = bt_value_array_append_element(results, map);
747 if (append_status != BT_VALUE_ARRAY_APPEND_ELEMENT_STATUS_OK) {
748 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error appending map to results.");
749 ret = -1;
750 }
751
752 end:
753 if (url) {
754 g_string_free(url, true);
755 }
756 BT_VALUE_PUT_REF_AND_RESET(map);
757 return ret;
758 }
759
760 /*
761 * Data structure returned:
762 *
763 * {
764 * <array> = {
765 * [n] = {
766 * <map> = {
767 * {
768 * key = "url",
769 * value = <string>,
770 * },
771 * {
772 * key = "target-hostname",
773 * value = <string>,
774 * },
775 * {
776 * key = "session-name",
777 * value = <string>,
778 * },
779 * {
780 * key = "timer-us",
781 * value = <integer>,
782 * },
783 * {
784 * key = "stream-count",
785 * value = <integer>,
786 * },
787 * {
788 * key = "client-count",
789 * value = <integer>,
790 * },
791 * },
792 * }
793 * }
794 */
795
796 BT_HIDDEN
797 bt_component_class_query_method_status
798 live_viewer_connection_list_sessions(struct live_viewer_connection *viewer_connection,
799 const bt_value **user_result)
800 {
801 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
802 bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
803 bt_value *result = NULL;
804 enum lttng_live_viewer_status viewer_status;
805 struct lttng_viewer_cmd cmd;
806 struct lttng_viewer_list_sessions list;
807 uint32_t i, sessions_count;
808
809 result = bt_value_array_create();
810 if (!result) {
811 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error creating array");
812 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
813 goto error;
814 }
815
816 BT_LOGD("Requesting list of sessions: cmd=%s",
817 lttng_viewer_command_string(LTTNG_VIEWER_LIST_SESSIONS));
818
819 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
820 cmd.data_size = htobe64((uint64_t) 0);
821 cmd.cmd_version = htobe32(0);
822
823 viewer_status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
824 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
825 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error sending list sessions command");
826 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
827 goto error;
828 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
829 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
830 goto error;
831 }
832
833 viewer_status = lttng_live_recv(viewer_connection, &list, sizeof(list));
834 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
835 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error receiving session list");
836 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
837 goto error;
838 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
839 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
840 goto error;
841 }
842
843 sessions_count = be32toh(list.sessions_count);
844 for (i = 0; i < sessions_count; i++) {
845 struct lttng_viewer_session lsession;
846
847 viewer_status = lttng_live_recv(viewer_connection, &lsession, sizeof(lsession));
848 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
849 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error receiving session:");
850 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
851 goto error;
852 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
853 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
854 goto error;
855 }
856
857 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
858 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
859 if (list_append_session(result, viewer_connection->url, &lsession, viewer_connection)) {
860 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error appending session");
861 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
862 goto error;
863 }
864 }
865
866 *user_result = result;
867 goto end;
868 error:
869 BT_VALUE_PUT_REF_AND_RESET(result);
870 end:
871 return status;
872 }
873
874 static enum lttng_live_viewer_status
875 lttng_live_query_session_ids(struct lttng_live_msg_iter *lttng_live_msg_iter)
876 {
877 struct lttng_viewer_cmd cmd;
878 struct lttng_viewer_list_sessions list;
879 struct lttng_viewer_session lsession;
880 uint32_t i, sessions_count;
881 uint64_t session_id;
882 enum lttng_live_viewer_status status;
883 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
884 bt_self_component *self_comp = viewer_connection->self_comp;
885 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
886
887 BT_COMP_LOGD("Asking the relay daemon for the list of sessions: cmd=%s",
888 lttng_viewer_command_string(LTTNG_VIEWER_LIST_SESSIONS));
889
890 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
891 cmd.data_size = htobe64((uint64_t) 0);
892 cmd.cmd_version = htobe32(0);
893
894 status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
895 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
896 viewer_handle_send_status(self_comp, self_comp_class, status, "list sessions command");
897 goto end;
898 }
899
900 status = lttng_live_recv(viewer_connection, &list, sizeof(list));
901 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
902 viewer_handle_recv_status(self_comp, self_comp_class, status, "session list reply");
903 goto end;
904 }
905
906 sessions_count = be32toh(list.sessions_count);
907 for (i = 0; i < sessions_count; i++) {
908 status = lttng_live_recv(viewer_connection, &lsession, sizeof(lsession));
909 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
910 viewer_handle_recv_status(self_comp, self_comp_class, status, "session reply");
911 goto end;
912 }
913 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
914 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
915 session_id = be64toh(lsession.id);
916
917 BT_COMP_LOGI("Adding session to internal list: "
918 "session-id=%" PRIu64 ", hostname=\"%s\", session-name=\"%s\"",
919 session_id, lsession.hostname, lsession.session_name);
920
921 if ((strncmp(lsession.session_name, viewer_connection->session_name->str,
922 LTTNG_VIEWER_NAME_MAX) == 0) &&
923 (strncmp(lsession.hostname, viewer_connection->target_hostname->str,
924 LTTNG_VIEWER_HOST_NAME_MAX) == 0)) {
925 if (lttng_live_add_session(lttng_live_msg_iter, session_id, lsession.hostname,
926 lsession.session_name)) {
927 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to add live session");
928 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
929 goto end;
930 }
931 }
932 }
933
934 status = LTTNG_LIVE_VIEWER_STATUS_OK;
935
936 end:
937 return status;
938 }
939
940 BT_HIDDEN
941 enum lttng_live_viewer_status
942 lttng_live_create_viewer_session(struct lttng_live_msg_iter *lttng_live_msg_iter)
943 {
944 struct lttng_viewer_cmd cmd;
945 struct lttng_viewer_create_session_response resp;
946 enum lttng_live_viewer_status status;
947 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
948 bt_self_component *self_comp = viewer_connection->self_comp;
949 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
950
951 BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class, "Creating a viewer session: cmd=%s",
952 lttng_viewer_command_string(LTTNG_VIEWER_CREATE_SESSION));
953
954 cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
955 cmd.data_size = htobe64((uint64_t) 0);
956 cmd.cmd_version = htobe32(0);
957
958 status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
959 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
960 viewer_handle_send_status(self_comp, self_comp_class, status, "create session command");
961 goto end;
962 }
963
964 status = lttng_live_recv(viewer_connection, &resp, sizeof(resp));
965 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
966 viewer_handle_recv_status(self_comp, self_comp_class, status, "create session reply");
967 goto end;
968 }
969
970 if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
971 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error creating viewer session");
972 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
973 goto end;
974 }
975
976 status = lttng_live_query_session_ids(lttng_live_msg_iter);
977 if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
978 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to query live viewer session ids");
979 goto end;
980 } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
981 goto end;
982 }
983
984 end:
985 return status;
986 }
987
988 static enum lttng_live_viewer_status receive_streams(struct lttng_live_session *session,
989 uint32_t stream_count,
990 bt_self_message_iterator *self_msg_iter)
991 {
992 uint32_t i;
993 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
994 enum lttng_live_viewer_status status;
995 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
996 bt_self_component *self_comp = viewer_connection->self_comp;
997
998 BT_COMP_LOGI("Getting %" PRIu32 " new streams", stream_count);
999 for (i = 0; i < stream_count; i++) {
1000 struct lttng_viewer_stream stream;
1001 struct lttng_live_stream_iterator *live_stream;
1002 uint64_t stream_id;
1003 uint64_t ctf_trace_id;
1004
1005 status = lttng_live_recv(viewer_connection, &stream, sizeof(stream));
1006 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1007 viewer_handle_recv_status(self_comp, NULL, status, "stream reply");
1008 goto end;
1009 }
1010 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
1011 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
1012 stream_id = be64toh(stream.id);
1013 ctf_trace_id = be64toh(stream.ctf_trace_id);
1014
1015 if (stream.metadata_flag) {
1016 BT_COMP_LOGI(" metadata stream %" PRIu64 " : %s/%s", stream_id, stream.path_name,
1017 stream.channel_name);
1018 if (lttng_live_metadata_create_stream(session, ctf_trace_id, stream_id,
1019 stream.path_name)) {
1020 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error creating metadata stream");
1021 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1022 goto end;
1023 }
1024 session->lazy_stream_msg_init = true;
1025 } else {
1026 BT_COMP_LOGI(" stream %" PRIu64 " : %s/%s", stream_id, stream.path_name,
1027 stream.channel_name);
1028 live_stream =
1029 lttng_live_stream_iterator_create(session, ctf_trace_id, stream_id, self_msg_iter);
1030 if (!live_stream) {
1031 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error creating stream");
1032 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1033 goto end;
1034 }
1035 }
1036 }
1037 status = LTTNG_LIVE_VIEWER_STATUS_OK;
1038
1039 end:
1040 return status;
1041 }
1042
1043 BT_HIDDEN
1044 enum lttng_live_viewer_status lttng_live_session_attach(struct lttng_live_session *session,
1045 bt_self_message_iterator *self_msg_iter)
1046 {
1047 struct lttng_viewer_cmd cmd;
1048 enum lttng_live_viewer_status status;
1049 struct lttng_viewer_attach_session_request rq;
1050 struct lttng_viewer_attach_session_response rp;
1051 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
1052 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1053 bt_self_component *self_comp = viewer_connection->self_comp;
1054 uint64_t session_id = session->id;
1055 uint32_t streams_count;
1056 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1057 char cmd_buf[cmd_buf_len];
1058
1059 BT_COMP_LOGD("Attaching to session: cmd=%s, session-id=%" PRIu64 ", seek=%s",
1060 lttng_viewer_command_string(LTTNG_VIEWER_ATTACH_SESSION), session_id,
1061 lttng_viewer_seek_string(LTTNG_VIEWER_SEEK_LAST));
1062
1063 cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
1064 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1065 cmd.cmd_version = htobe32(0);
1066
1067 memset(&rq, 0, sizeof(rq));
1068 rq.session_id = htobe64(session_id);
1069 // TODO: add cmd line parameter to select seek beginning
1070 // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
1071 rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
1072
1073 /*
1074 * Merge the cmd and connection request to prevent a write-write
1075 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1076 * second write to be performed quickly in presence of Nagle's algorithm.
1077 */
1078 memcpy(cmd_buf, &cmd, sizeof(cmd));
1079 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1080 status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1081 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1082 viewer_handle_send_status(self_comp, NULL, status, "attach session command");
1083 goto end;
1084 }
1085
1086 status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1087 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1088 viewer_handle_recv_status(self_comp, NULL, status, "attach session reply");
1089 goto end;
1090 }
1091
1092 streams_count = be32toh(rp.streams_count);
1093 switch (be32toh(rp.status)) {
1094 case LTTNG_VIEWER_ATTACH_OK:
1095 break;
1096 case LTTNG_VIEWER_ATTACH_UNK:
1097 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Session id %" PRIu64 " is unknown", session_id);
1098 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1099 goto end;
1100 case LTTNG_VIEWER_ATTACH_ALREADY:
1101 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "There is already a viewer attached to this session");
1102 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1103 goto end;
1104 case LTTNG_VIEWER_ATTACH_NOT_LIVE:
1105 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Not a live session");
1106 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1107 goto end;
1108 case LTTNG_VIEWER_ATTACH_SEEK_ERR:
1109 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Wrong seek parameter");
1110 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1111 goto end;
1112 default:
1113 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Unknown attach return code %u", be32toh(rp.status));
1114 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1115 goto end;
1116 }
1117
1118 /* We receive the initial list of streams. */
1119 status = receive_streams(session, streams_count, self_msg_iter);
1120 switch (status) {
1121 case LTTNG_LIVE_VIEWER_STATUS_OK:
1122 break;
1123 case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
1124 goto end;
1125 case LTTNG_LIVE_VIEWER_STATUS_ERROR:
1126 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error receiving streams");
1127 goto end;
1128 default:
1129 bt_common_abort();
1130 }
1131
1132 session->attached = true;
1133 session->new_streams_needed = false;
1134
1135 end:
1136 return status;
1137 }
1138
1139 BT_HIDDEN
1140 enum lttng_live_viewer_status lttng_live_session_detach(struct lttng_live_session *session)
1141 {
1142 struct lttng_viewer_cmd cmd;
1143 enum lttng_live_viewer_status status;
1144 struct lttng_viewer_detach_session_request rq;
1145 struct lttng_viewer_detach_session_response rp;
1146 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
1147 bt_self_component *self_comp = session->self_comp;
1148 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1149 uint64_t session_id = session->id;
1150 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1151 char cmd_buf[cmd_buf_len];
1152
1153 /*
1154 * The session might already be detached and the viewer socket might
1155 * already been closed. This happens when calling this function when
1156 * tearing down the graph after an error.
1157 */
1158 if (!session->attached || viewer_connection->control_sock == BT_INVALID_SOCKET) {
1159 return LTTNG_LIVE_VIEWER_STATUS_OK;
1160 }
1161
1162 BT_COMP_LOGD("Detaching from session: cmd=%s, session-id=%" PRIu64,
1163 lttng_viewer_command_string(LTTNG_VIEWER_DETACH_SESSION), session_id);
1164
1165 cmd.cmd = htobe32(LTTNG_VIEWER_DETACH_SESSION);
1166 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1167 cmd.cmd_version = htobe32(0);
1168
1169 memset(&rq, 0, sizeof(rq));
1170 rq.session_id = htobe64(session_id);
1171
1172 /*
1173 * Merge the cmd and connection request to prevent a write-write
1174 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1175 * second write to be performed quickly in presence of Nagle's algorithm.
1176 */
1177 memcpy(cmd_buf, &cmd, sizeof(cmd));
1178 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1179 status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1180 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1181 viewer_handle_send_status(self_comp, NULL, status, "detach session command");
1182 goto end;
1183 }
1184
1185 status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1186 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1187 viewer_handle_recv_status(self_comp, NULL, status, "detach session reply");
1188 goto end;
1189 }
1190
1191 switch (be32toh(rp.status)) {
1192 case LTTNG_VIEWER_DETACH_SESSION_OK:
1193 break;
1194 case LTTNG_VIEWER_DETACH_SESSION_UNK:
1195 BT_COMP_LOGW("Session id %" PRIu64 " is unknown", session_id);
1196 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1197 goto end;
1198 case LTTNG_VIEWER_DETACH_SESSION_ERR:
1199 BT_COMP_LOGW("Error detaching session id %" PRIu64 "", session_id);
1200 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1201 goto end;
1202 default:
1203 BT_COMP_LOGE("Unknown detach return code %u", be32toh(rp.status));
1204 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1205 goto end;
1206 }
1207
1208 session->attached = false;
1209
1210 status = LTTNG_LIVE_VIEWER_STATUS_OK;
1211
1212 end:
1213 return status;
1214 }
1215
1216 BT_HIDDEN
1217 enum lttng_live_get_one_metadata_status
1218 lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, FILE *fp, size_t *reply_len)
1219 {
1220 uint64_t len = 0;
1221 enum lttng_live_get_one_metadata_status status;
1222 enum lttng_live_viewer_status viewer_status;
1223 struct lttng_viewer_cmd cmd;
1224 struct lttng_viewer_get_metadata rq;
1225 struct lttng_viewer_metadata_packet rp;
1226 gchar *data = NULL;
1227 ssize_t writelen;
1228 struct lttng_live_session *session = trace->session;
1229 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
1230 struct lttng_live_metadata *metadata = trace->metadata;
1231 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1232 bt_self_component *self_comp = viewer_connection->self_comp;
1233 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1234 char cmd_buf[cmd_buf_len];
1235
1236 BT_COMP_LOGD("Requesting new metadata for trace:"
1237 "cmd=%s, trace-id=%" PRIu64 ", metadata-stream-id=%" PRIu64,
1238 lttng_viewer_command_string(LTTNG_VIEWER_GET_METADATA), trace->id,
1239 metadata->stream_id);
1240
1241 rq.stream_id = htobe64(metadata->stream_id);
1242 cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
1243 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1244 cmd.cmd_version = htobe32(0);
1245
1246 /*
1247 * Merge the cmd and connection request to prevent a write-write
1248 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1249 * second write to be performed quickly in presence of Nagle's algorithm.
1250 */
1251 memcpy(cmd_buf, &cmd, sizeof(cmd));
1252 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1253 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1254 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1255 viewer_handle_send_status(self_comp, NULL, viewer_status, "get metadata command");
1256 status = (enum lttng_live_get_one_metadata_status) viewer_status;
1257 goto end;
1258 }
1259
1260 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1261 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1262 viewer_handle_recv_status(self_comp, NULL, viewer_status, "get metadata reply");
1263 status = (enum lttng_live_get_one_metadata_status) viewer_status;
1264 goto end;
1265 }
1266
1267 switch (be32toh(rp.status)) {
1268 case LTTNG_VIEWER_METADATA_OK:
1269 BT_COMP_LOGD("Received get_metadata response: ok");
1270 break;
1271 case LTTNG_VIEWER_NO_NEW_METADATA:
1272 BT_COMP_LOGD("Received get_metadata response: no new");
1273 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_END;
1274 goto end;
1275 case LTTNG_VIEWER_METADATA_ERR:
1276 /*
1277 * The Relayd cannot find this stream id. Maybe its
1278 * gone already. This can happen in short lived UST app
1279 * in a per-pid session.
1280 */
1281 BT_COMP_LOGD("Received get_metadata response: error");
1282 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_CLOSED;
1283 goto end;
1284 default:
1285 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Received get_metadata response: unknown");
1286 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
1287 goto end;
1288 }
1289
1290 len = be64toh(rp.len);
1291 BT_COMP_LOGD("Writing %" PRIu64 " bytes to metadata", len);
1292 if (len <= 0) {
1293 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Erroneous response length");
1294 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
1295 goto end;
1296 }
1297
1298 data = g_new0(gchar, len);
1299 if (!data) {
1300 BT_COMP_LOGE_APPEND_CAUSE_ERRNO(self_comp, "Failed to allocate data buffer", ".");
1301 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
1302 goto end;
1303 }
1304
1305 viewer_status = lttng_live_recv(viewer_connection, data, len);
1306 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1307 viewer_handle_recv_status(self_comp, NULL, viewer_status, "get metadata packet");
1308 status = (enum lttng_live_get_one_metadata_status) viewer_status;
1309 goto end;
1310 }
1311
1312 /*
1313 * Write the metadata to the file handle.
1314 */
1315 writelen = fwrite(data, sizeof(uint8_t), len, fp);
1316 if (writelen != len) {
1317 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Writing in the metadata file stream");
1318 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
1319 goto end;
1320 }
1321
1322 *reply_len = len;
1323 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_OK;
1324
1325 end:
1326 g_free(data);
1327 return status;
1328 }
1329
1330 /*
1331 * Assign the fields from a lttng_viewer_index to a packet_index.
1332 */
1333 static void lttng_index_to_packet_index(struct lttng_viewer_index *lindex,
1334 struct packet_index *pindex)
1335 {
1336 BT_ASSERT(lindex);
1337 BT_ASSERT(pindex);
1338
1339 pindex->offset = be64toh(lindex->offset);
1340 pindex->packet_size = be64toh(lindex->packet_size);
1341 pindex->content_size = be64toh(lindex->content_size);
1342 pindex->ts_cycles.timestamp_begin = be64toh(lindex->timestamp_begin);
1343 pindex->ts_cycles.timestamp_end = be64toh(lindex->timestamp_end);
1344 pindex->events_discarded = be64toh(lindex->events_discarded);
1345 }
1346
1347 static void lttng_live_need_new_streams(struct lttng_live_msg_iter *lttng_live_msg_iter)
1348 {
1349 uint64_t session_idx;
1350 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1351
1352 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
1353 struct lttng_live_session *session =
1354 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
1355 BT_COMP_LOGD("Marking session as needing new streams: "
1356 "session-id=%" PRIu64,
1357 session->id);
1358 session->new_streams_needed = true;
1359 }
1360 }
1361
1362 BT_HIDDEN
1363 enum lttng_live_iterator_status
1364 lttng_live_get_next_index(struct lttng_live_msg_iter *lttng_live_msg_iter,
1365 struct lttng_live_stream_iterator *stream, struct packet_index *index)
1366 {
1367 struct lttng_viewer_cmd cmd;
1368 struct lttng_viewer_get_next_index rq;
1369 enum lttng_live_viewer_status viewer_status;
1370 struct lttng_viewer_index rp;
1371 enum lttng_live_iterator_status status;
1372 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1373 bt_self_component *self_comp = viewer_connection->self_comp;
1374 struct lttng_live_trace *trace = stream->trace;
1375 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1376 char cmd_buf[cmd_buf_len];
1377 uint32_t flags, rp_status;
1378
1379 BT_COMP_LOGD("Requesting next index for stream: cmd=%s, "
1380 "viewer-stream-id=%" PRIu64,
1381 lttng_viewer_command_string(LTTNG_VIEWER_GET_NEXT_INDEX),
1382 stream->viewer_stream_id);
1383 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
1384 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1385 cmd.cmd_version = htobe32(0);
1386
1387 memset(&rq, 0, sizeof(rq));
1388 rq.stream_id = htobe64(stream->viewer_stream_id);
1389
1390 /*
1391 * Merge the cmd and connection request to prevent a write-write
1392 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1393 * second write to be performed quickly in presence of Nagle's algorithm.
1394 */
1395 memcpy(cmd_buf, &cmd, sizeof(cmd));
1396 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1397
1398 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1399 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1400 viewer_handle_send_status(self_comp, NULL, viewer_status, "get next index command");
1401 goto error;
1402 }
1403
1404 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1405 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1406 viewer_handle_recv_status(self_comp, NULL, viewer_status, "get next index reply");
1407 goto error;
1408 }
1409
1410 flags = be32toh(rp.flags);
1411 rp_status = be32toh(rp.status);
1412
1413 BT_COMP_LOGD("Received response from relay daemon: cmd=%s, response=%s",
1414 lttng_viewer_command_string(LTTNG_VIEWER_GET_NEXT_INDEX),
1415 lttng_viewer_next_index_return_code_string(rp_status));
1416 switch (rp_status) {
1417 case LTTNG_VIEWER_INDEX_INACTIVE:
1418 {
1419 uint64_t ctf_stream_class_id;
1420
1421 memset(index, 0, sizeof(struct packet_index));
1422 index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
1423 stream->current_inactivity_ts = index->ts_cycles.timestamp_end;
1424 ctf_stream_class_id = be64toh(rp.stream_id);
1425 if (stream->ctf_stream_class_id.is_set) {
1426 BT_ASSERT(stream->ctf_stream_class_id.value == ctf_stream_class_id);
1427 } else {
1428 stream->ctf_stream_class_id.value = ctf_stream_class_id;
1429 stream->ctf_stream_class_id.is_set = true;
1430 }
1431 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_QUIESCENT);
1432 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1433 break;
1434 }
1435 case LTTNG_VIEWER_INDEX_OK:
1436 {
1437 uint64_t ctf_stream_class_id;
1438
1439 lttng_index_to_packet_index(&rp, index);
1440 ctf_stream_class_id = be64toh(rp.stream_id);
1441 if (stream->ctf_stream_class_id.is_set) {
1442 BT_ASSERT(stream->ctf_stream_class_id.value == ctf_stream_class_id);
1443 } else {
1444 stream->ctf_stream_class_id.value = ctf_stream_class_id;
1445 stream->ctf_stream_class_id.is_set = true;
1446 }
1447
1448 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_DATA);
1449
1450 if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
1451 BT_COMP_LOGD("Marking trace as needing new metadata: "
1452 "response=%s, response-flag=NEW_METADATA, trace-id=%" PRIu64,
1453 lttng_viewer_next_index_return_code_string(rp_status), trace->id);
1454 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
1455 }
1456 if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
1457 BT_COMP_LOGD("Marking all sessions as possibly needing new streams: "
1458 "response=%s, response-flag=NEW_STREAM",
1459 lttng_viewer_next_index_return_code_string(rp_status));
1460 lttng_live_need_new_streams(lttng_live_msg_iter);
1461 }
1462 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1463 break;
1464 }
1465 case LTTNG_VIEWER_INDEX_RETRY:
1466 memset(index, 0, sizeof(struct packet_index));
1467 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
1468 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
1469 goto end;
1470 case LTTNG_VIEWER_INDEX_HUP:
1471 memset(index, 0, sizeof(struct packet_index));
1472 index->offset = EOF;
1473 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_EOF);
1474 stream->has_stream_hung_up = true;
1475 status = LTTNG_LIVE_ITERATOR_STATUS_END;
1476 break;
1477 case LTTNG_VIEWER_INDEX_ERR:
1478 memset(index, 0, sizeof(struct packet_index));
1479 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
1480 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1481 goto end;
1482 default:
1483 BT_COMP_LOGD("Received get_next_index response: unknown value");
1484 memset(index, 0, sizeof(struct packet_index));
1485 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
1486 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1487 goto end;
1488 }
1489 goto end;
1490
1491 error:
1492 status = viewer_status_to_live_iterator_status(viewer_status);
1493 end:
1494 return status;
1495 }
1496
1497 BT_HIDDEN
1498 enum ctf_msg_iter_medium_status
1499 lttng_live_get_stream_bytes(struct lttng_live_msg_iter *lttng_live_msg_iter,
1500 struct lttng_live_stream_iterator *stream, uint8_t *buf,
1501 uint64_t offset, uint64_t req_len, uint64_t *recv_len)
1502 {
1503 enum ctf_msg_iter_medium_status status;
1504 enum lttng_live_viewer_status viewer_status;
1505 struct lttng_viewer_trace_packet rp;
1506 struct lttng_viewer_cmd cmd;
1507 struct lttng_viewer_get_packet rq;
1508 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1509 bt_self_component *self_comp = viewer_connection->self_comp;
1510 struct lttng_live_trace *trace = stream->trace;
1511 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1512 char cmd_buf[cmd_buf_len];
1513 uint32_t flags, rp_status;
1514
1515 BT_COMP_LOGD("Requesting data from stream: cmd=%s, "
1516 "offset=%" PRIu64 ", request-len=%" PRIu64,
1517 lttng_viewer_command_string(LTTNG_VIEWER_GET_PACKET), offset, req_len);
1518
1519 cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
1520 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1521 cmd.cmd_version = htobe32(0);
1522
1523 memset(&rq, 0, sizeof(rq));
1524 rq.stream_id = htobe64(stream->viewer_stream_id);
1525 rq.offset = htobe64(offset);
1526 rq.len = htobe32(req_len);
1527
1528 /*
1529 * Merge the cmd and connection request to prevent a write-write
1530 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1531 * second write to be performed quickly in presence of Nagle's algorithm.
1532 */
1533 memcpy(cmd_buf, &cmd, sizeof(cmd));
1534 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1535
1536 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1537 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1538 viewer_handle_send_status(self_comp, NULL, viewer_status, "get data packet command");
1539 goto error_convert_status;
1540 }
1541
1542 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1543 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1544 viewer_handle_recv_status(self_comp, NULL, viewer_status, "get data packet reply");
1545 goto error_convert_status;
1546 }
1547
1548 flags = be32toh(rp.flags);
1549 rp_status = be32toh(rp.status);
1550
1551 BT_COMP_LOGD("Received response from relay daemon: cmd=%s, response=%s",
1552 lttng_viewer_command_string(LTTNG_VIEWER_GET_PACKET),
1553 lttng_viewer_get_packet_return_code_string(rp_status));
1554 switch (rp_status) {
1555 case LTTNG_VIEWER_GET_PACKET_OK:
1556 req_len = be32toh(rp.len);
1557 BT_COMP_LOGD("Got packet from relay daemon: response=%s, packet-len=%" PRIu64 "",
1558 lttng_viewer_get_packet_return_code_string(rp_status), req_len);
1559 status = CTF_MSG_ITER_MEDIUM_STATUS_OK;
1560 break;
1561 case LTTNG_VIEWER_GET_PACKET_RETRY:
1562 /* Unimplemented by relay daemon */
1563 status = CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
1564 goto end;
1565 case LTTNG_VIEWER_GET_PACKET_ERR:
1566 if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
1567 BT_COMP_LOGD("Marking trace as needing new metadata: "
1568 "response=%s, response-flag=NEW_METADATA, trace-id=%" PRIu64,
1569 lttng_viewer_next_index_return_code_string(rp_status), trace->id);
1570 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
1571 }
1572 if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
1573 BT_COMP_LOGD("Marking all sessions as possibly needing new streams: "
1574 "response=%s, response-flag=NEW_STREAM",
1575 lttng_viewer_next_index_return_code_string(rp_status));
1576 lttng_live_need_new_streams(lttng_live_msg_iter);
1577 }
1578 if (flags & (LTTNG_VIEWER_FLAG_NEW_METADATA | LTTNG_VIEWER_FLAG_NEW_STREAM)) {
1579 status = CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
1580 BT_COMP_LOGD("Reply with any one flags set means we should retry: response=%s",
1581 lttng_viewer_get_packet_return_code_string(rp_status));
1582 goto end;
1583 }
1584 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Received get_data_packet response: error");
1585 status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
1586 goto end;
1587 case LTTNG_VIEWER_GET_PACKET_EOF:
1588 status = CTF_MSG_ITER_MEDIUM_STATUS_EOF;
1589 goto end;
1590 default:
1591 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Received get_data_packet response: unknown (%d)",
1592 rp_status);
1593 status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
1594 goto end;
1595 }
1596
1597 if (req_len == 0) {
1598 status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
1599 goto end;
1600 }
1601
1602 viewer_status = lttng_live_recv(viewer_connection, buf, req_len);
1603 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1604 viewer_handle_recv_status(self_comp, NULL, viewer_status, "get data packet");
1605 goto error_convert_status;
1606 }
1607 *recv_len = req_len;
1608
1609 status = CTF_MSG_ITER_MEDIUM_STATUS_OK;
1610 goto end;
1611
1612 error_convert_status:
1613 status = viewer_status_to_ctf_msg_iter_medium_status(viewer_status);
1614 end:
1615 return status;
1616 }
1617
1618 /*
1619 * Request new streams for a session.
1620 */
1621 BT_HIDDEN
1622 enum lttng_live_iterator_status
1623 lttng_live_session_get_new_streams(struct lttng_live_session *session,
1624 bt_self_message_iterator *self_msg_iter)
1625 {
1626 enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1627 struct lttng_viewer_cmd cmd;
1628 struct lttng_viewer_new_streams_request rq;
1629 struct lttng_viewer_new_streams_response rp;
1630 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
1631 enum lttng_live_viewer_status viewer_status;
1632 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1633 bt_self_component *self_comp = viewer_connection->self_comp;
1634 uint32_t streams_count;
1635 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1636 char cmd_buf[cmd_buf_len];
1637
1638 if (!session->new_streams_needed) {
1639 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1640 goto end;
1641 }
1642
1643 BT_COMP_LOGD("Requesting new streams for session: cmd=%s, "
1644 "session-id=%" PRIu64,
1645 lttng_viewer_command_string(LTTNG_VIEWER_GET_NEW_STREAMS), session->id);
1646
1647 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
1648 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1649 cmd.cmd_version = htobe32(0);
1650
1651 memset(&rq, 0, sizeof(rq));
1652 rq.session_id = htobe64(session->id);
1653
1654 /*
1655 * Merge the cmd and connection request to prevent a write-write
1656 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1657 * second write to be performed quickly in presence of Nagle's algorithm.
1658 */
1659 memcpy(cmd_buf, &cmd, sizeof(cmd));
1660 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1661
1662 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1663 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1664 viewer_handle_send_status(self_comp, NULL, viewer_status, "get new streams command");
1665 status = viewer_status_to_live_iterator_status(viewer_status);
1666 goto end;
1667 }
1668
1669 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1670 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1671 viewer_handle_recv_status(self_comp, NULL, viewer_status, "get new streams reply");
1672 status = viewer_status_to_live_iterator_status(viewer_status);
1673 goto end;
1674 }
1675
1676 streams_count = be32toh(rp.streams_count);
1677
1678 switch (be32toh(rp.status)) {
1679 case LTTNG_VIEWER_NEW_STREAMS_OK:
1680 session->new_streams_needed = false;
1681 break;
1682 case LTTNG_VIEWER_NEW_STREAMS_NO_NEW:
1683 session->new_streams_needed = false;
1684 goto end;
1685 case LTTNG_VIEWER_NEW_STREAMS_HUP:
1686 session->new_streams_needed = false;
1687 session->closed = true;
1688 status = LTTNG_LIVE_ITERATOR_STATUS_END;
1689 goto end;
1690 case LTTNG_VIEWER_NEW_STREAMS_ERR:
1691 BT_COMP_LOGD("Received get_new_streams response: error");
1692 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1693 goto end;
1694 default:
1695 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1696 "Received get_new_streams response: Unknown:"
1697 "return code %u",
1698 be32toh(rp.status));
1699 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1700 goto end;
1701 }
1702
1703 viewer_status = receive_streams(session, streams_count, self_msg_iter);
1704 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1705 viewer_handle_recv_status(self_comp, NULL, viewer_status, "new streams");
1706 status = viewer_status_to_live_iterator_status(viewer_status);
1707 goto end;
1708 }
1709
1710 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1711 end:
1712 return status;
1713 }
1714
1715 BT_HIDDEN
1716 enum lttng_live_viewer_status live_viewer_connection_create(
1717 bt_self_component *self_comp, bt_self_component_class *self_comp_class,
1718 bt_logging_level log_level, const char *url, bool in_query,
1719 struct lttng_live_msg_iter *lttng_live_msg_iter, struct live_viewer_connection **viewer)
1720 {
1721 struct live_viewer_connection *viewer_connection;
1722 enum lttng_live_viewer_status status;
1723
1724 viewer_connection = g_new0(struct live_viewer_connection, 1);
1725
1726 if (bt_socket_init(log_level) != 0) {
1727 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
1728 "Failed to init socket");
1729 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1730 goto error;
1731 }
1732
1733 viewer_connection->log_level = log_level;
1734
1735 viewer_connection->self_comp = self_comp;
1736 viewer_connection->self_comp_class = self_comp_class;
1737
1738 viewer_connection->control_sock = BT_INVALID_SOCKET;
1739 viewer_connection->port = -1;
1740 viewer_connection->in_query = in_query;
1741 viewer_connection->lttng_live_msg_iter = lttng_live_msg_iter;
1742 viewer_connection->url = g_string_new(url);
1743 if (!viewer_connection->url) {
1744 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
1745 "Failed to allocate URL buffer");
1746 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1747 goto error;
1748 }
1749
1750 BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class,
1751 "Establishing connection to url \"%s\"...", url);
1752 status = lttng_live_connect_viewer(viewer_connection);
1753 /*
1754 * Only print error and append cause in case of error. not in case of
1755 * interruption.
1756 */
1757 if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1758 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
1759 "Failed to establish connection: "
1760 "url=\"%s\"",
1761 url);
1762 goto error;
1763 } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1764 goto error;
1765 }
1766 BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class,
1767 "Connection to url \"%s\" is established", url);
1768
1769 *viewer = viewer_connection;
1770 status = LTTNG_LIVE_VIEWER_STATUS_OK;
1771 goto end;
1772
1773 error:
1774 if (viewer_connection) {
1775 live_viewer_connection_destroy(viewer_connection);
1776 }
1777 end:
1778 return status;
1779 }
1780
1781 BT_HIDDEN
1782 void live_viewer_connection_destroy(struct live_viewer_connection *viewer_connection)
1783 {
1784 bt_self_component *self_comp = viewer_connection->self_comp;
1785 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
1786
1787 if (!viewer_connection) {
1788 goto end;
1789 }
1790
1791 BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class,
1792 "Closing connection to relay:"
1793 "relay-url=\"%s\"",
1794 viewer_connection->url->str);
1795
1796 lttng_live_disconnect_viewer(viewer_connection);
1797
1798 if (viewer_connection->url) {
1799 g_string_free(viewer_connection->url, true);
1800 }
1801
1802 if (viewer_connection->relay_hostname) {
1803 g_string_free(viewer_connection->relay_hostname, true);
1804 }
1805
1806 if (viewer_connection->target_hostname) {
1807 g_string_free(viewer_connection->target_hostname, true);
1808 }
1809
1810 if (viewer_connection->session_name) {
1811 g_string_free(viewer_connection->session_name, true);
1812 }
1813
1814 if (viewer_connection->proto) {
1815 g_string_free(viewer_connection->proto, true);
1816 }
1817
1818 g_free(viewer_connection);
1819
1820 bt_socket_fini();
1821
1822 end:
1823 return;
1824 }
This page took 0.068466 seconds and 4 git commands to generate.