Sort includes in C++ files
[babeltrace.git] / src / plugins / ctf / lttng-live / viewer-connection.cpp
1 /*
2 * SPDX-License-Identifier: MIT
3 *
4 * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
5 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
6 */
7
8 #include <fcntl.h>
9 #include <glib.h>
10 #include <stdbool.h>
11 #include <stdint.h>
12 #include <stdio.h>
13 #include <stdlib.h>
14 #include <sys/types.h>
15 #include <unistd.h>
16
17 #include <babeltrace2/babeltrace.h>
18
19 #define BT_COMP_LOG_SELF_COMP (viewer_connection->self_comp)
20 #define BT_LOG_OUTPUT_LEVEL (viewer_connection->log_level)
21 #define BT_LOG_TAG "PLUGIN/SRC.CTF.LTTNG-LIVE/VIEWER"
22 #include "logging/comp-logging.h"
23
24 #include "common/common.h"
25 #include "compat/compiler.h"
26 #include "compat/endian.h"
27 #include "compat/socket.h"
28
29 #include "data-stream.hpp"
30 #include "lttng-live.hpp"
31 #include "lttng-viewer-abi.hpp"
32 #include "metadata.hpp"
33 #include "viewer-connection.hpp"
34
35 #define viewer_handle_send_recv_status(_self_comp, _self_comp_class, _status, _action, _msg_str) \
36 do { \
37 switch (_status) { \
38 case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED: \
39 break; \
40 case LTTNG_LIVE_VIEWER_STATUS_ERROR: \
41 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(_self_comp, _self_comp_class, \
42 "Error " _action " " _msg_str); \
43 break; \
44 default: \
45 bt_common_abort(); \
46 } \
47 } while (0)
48
49 #define viewer_handle_send_status(_self_comp, _self_comp_class, _status, _msg_str) \
50 viewer_handle_send_recv_status(_self_comp, _self_comp_class, _status, "sending", _msg_str)
51
52 #define viewer_handle_recv_status(_self_comp, _self_comp_class, _status, _msg_str) \
53 viewer_handle_send_recv_status(_self_comp, _self_comp_class, _status, "receiving", _msg_str)
54
55 #define LTTNG_LIVE_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE_ERRNO(_self_comp, _self_comp_class, _msg, \
56 _fmt, ...) \
57 do { \
58 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(_self_comp, _self_comp_class, _msg ": %s" _fmt, \
59 bt_socket_errormsg(), ##__VA_ARGS__); \
60 } while (0)
61
62 static const char *lttng_viewer_command_string(enum lttng_viewer_command cmd)
63 {
64 switch (cmd) {
65 case LTTNG_VIEWER_CONNECT:
66 return "CONNECT";
67 case LTTNG_VIEWER_LIST_SESSIONS:
68 return "LIST_SESSIONS";
69 case LTTNG_VIEWER_ATTACH_SESSION:
70 return "ATTACH_SESSION";
71 case LTTNG_VIEWER_GET_NEXT_INDEX:
72 return "GET_NEXT_INDEX";
73 case LTTNG_VIEWER_GET_PACKET:
74 return "GET_PACKET";
75 case LTTNG_VIEWER_GET_METADATA:
76 return "GET_METADATA";
77 case LTTNG_VIEWER_GET_NEW_STREAMS:
78 return "GET_NEW_STREAMS";
79 case LTTNG_VIEWER_CREATE_SESSION:
80 return "CREATE_SESSION";
81 case LTTNG_VIEWER_DETACH_SESSION:
82 return "DETACH_SESSION";
83 }
84
85 bt_common_abort();
86 }
87
88 static const char *
89 lttng_viewer_next_index_return_code_string(enum lttng_viewer_next_index_return_code code)
90 {
91 switch (code) {
92 case LTTNG_VIEWER_INDEX_OK:
93 return "INDEX_OK";
94 case LTTNG_VIEWER_INDEX_RETRY:
95 return "INDEX_RETRY";
96 case LTTNG_VIEWER_INDEX_HUP:
97 return "INDEX_HUP";
98 case LTTNG_VIEWER_INDEX_ERR:
99 return "INDEX_ERR";
100 case LTTNG_VIEWER_INDEX_INACTIVE:
101 return "INDEX_INACTIVE";
102 case LTTNG_VIEWER_INDEX_EOF:
103 return "INDEX_EOF";
104 }
105
106 bt_common_abort();
107 }
108
109 static const char *lttng_viewer_next_index_return_code_string(uint32_t code)
110 {
111 return lttng_viewer_next_index_return_code_string((lttng_viewer_next_index_return_code) code);
112 }
113
114 static const char *
115 lttng_viewer_get_packet_return_code_string(enum lttng_viewer_get_packet_return_code code)
116 {
117 switch (code) {
118 case LTTNG_VIEWER_GET_PACKET_OK:
119 return "GET_PACKET_OK";
120 case LTTNG_VIEWER_GET_PACKET_RETRY:
121 return "GET_PACKET_RETRY";
122 case LTTNG_VIEWER_GET_PACKET_ERR:
123 return "GET_PACKET_ERR";
124 case LTTNG_VIEWER_GET_PACKET_EOF:
125 return "GET_PACKET_EOF";
126 }
127
128 bt_common_abort();
129 };
130
131 static const char *lttng_viewer_get_packet_return_code_string(uint32_t code)
132 {
133 return lttng_viewer_get_packet_return_code_string((lttng_viewer_get_packet_return_code) code);
134 }
135
136 static const char *lttng_viewer_seek_string(enum lttng_viewer_seek seek)
137 {
138 switch (seek) {
139 case LTTNG_VIEWER_SEEK_BEGINNING:
140 return "SEEK_BEGINNING";
141 case LTTNG_VIEWER_SEEK_LAST:
142 return "SEEK_LAST";
143 }
144
145 bt_common_abort();
146 }
147
148 static inline enum lttng_live_iterator_status
149 viewer_status_to_live_iterator_status(enum lttng_live_viewer_status viewer_status)
150 {
151 switch (viewer_status) {
152 case LTTNG_LIVE_VIEWER_STATUS_OK:
153 return LTTNG_LIVE_ITERATOR_STATUS_OK;
154 case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
155 return LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
156 case LTTNG_LIVE_VIEWER_STATUS_ERROR:
157 return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
158 }
159
160 bt_common_abort();
161 }
162
163 static inline enum ctf_msg_iter_medium_status
164 viewer_status_to_ctf_msg_iter_medium_status(enum lttng_live_viewer_status viewer_status)
165 {
166 switch (viewer_status) {
167 case LTTNG_LIVE_VIEWER_STATUS_OK:
168 return CTF_MSG_ITER_MEDIUM_STATUS_OK;
169 case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
170 return CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
171 case LTTNG_LIVE_VIEWER_STATUS_ERROR:
172 return CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
173 }
174
175 bt_common_abort();
176 }
177
178 static inline void viewer_connection_close_socket(struct live_viewer_connection *viewer_connection)
179 {
180 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
181 bt_self_component *self_comp = viewer_connection->self_comp;
182 int ret = bt_socket_close(viewer_connection->control_sock);
183 if (ret == -1) {
184 BT_COMP_OR_COMP_CLASS_LOGW_ERRNO(self_comp, self_comp_class,
185 "Error closing viewer connection socket: ", ".");
186 }
187
188 viewer_connection->control_sock = BT_INVALID_SOCKET;
189 }
190
191 /*
192 * This function receives a message from the Relay daemon.
193 * If it received the entire message, it returns _OK,
194 * If it's interrupted, it returns _INTERRUPTED,
195 * otherwise, it returns _ERROR.
196 */
197 static enum lttng_live_viewer_status
198 lttng_live_recv(struct live_viewer_connection *viewer_connection, void *buf, size_t len)
199 {
200 ssize_t received;
201 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
202 bt_self_component *self_comp = viewer_connection->self_comp;
203 size_t total_received = 0, to_receive = len;
204 struct lttng_live_msg_iter *lttng_live_msg_iter = viewer_connection->lttng_live_msg_iter;
205 enum lttng_live_viewer_status status;
206 BT_SOCKET sock = viewer_connection->control_sock;
207
208 /*
209 * Receive a message from the Relay.
210 */
211 do {
212 received = bt_socket_recv(sock, (char *) buf + total_received, to_receive, 0);
213 if (received == BT_SOCKET_ERROR) {
214 if (bt_socket_interrupted()) {
215 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
216 /*
217 * This interruption was due to a
218 * SIGINT and the graph is being torn
219 * down.
220 */
221 status = LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED;
222 lttng_live_msg_iter->was_interrupted = true;
223 goto end;
224 } else {
225 /*
226 * A signal was received, but the graph
227 * is not being torn down. Carry on.
228 */
229 continue;
230 }
231 } else {
232 /*
233 * For any other types of socket error, close
234 * the socket and return an error.
235 */
236 LTTNG_LIVE_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE_ERRNO(
237 self_comp, self_comp_class, "Error receiving from Relay", ".");
238
239 viewer_connection_close_socket(viewer_connection);
240 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
241 goto end;
242 }
243 } else if (received == 0) {
244 /*
245 * The recv() call returned 0. This means the
246 * connection was orderly shutdown from the other peer.
247 * If that happens when we are trying to receive
248 * a message from it, it means something when wrong.
249 * Close the socket and return an error.
250 */
251 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
252 "Remote side has closed connection");
253 viewer_connection_close_socket(viewer_connection);
254 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
255 goto end;
256 }
257
258 BT_ASSERT(received <= to_receive);
259 total_received += received;
260 to_receive -= received;
261
262 } while (to_receive > 0);
263
264 BT_ASSERT(total_received == len);
265 status = LTTNG_LIVE_VIEWER_STATUS_OK;
266
267 end:
268 return status;
269 }
270
271 /*
272 * This function sends a message to the Relay daemon.
273 * If it send the message, it returns _OK,
274 * If it's interrupted, it returns _INTERRUPTED,
275 * otherwise, it returns _ERROR.
276 */
277 static enum lttng_live_viewer_status
278 lttng_live_send(struct live_viewer_connection *viewer_connection, const void *buf, size_t len)
279 {
280 enum lttng_live_viewer_status status;
281 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
282 bt_self_component *self_comp = viewer_connection->self_comp;
283 struct lttng_live_msg_iter *lttng_live_msg_iter = viewer_connection->lttng_live_msg_iter;
284 BT_SOCKET sock = viewer_connection->control_sock;
285 size_t to_send = len;
286 ssize_t total_sent = 0;
287
288 do {
289 ssize_t sent = bt_socket_send_nosigpipe(sock, (char *) buf + total_sent, to_send);
290 if (sent == BT_SOCKET_ERROR) {
291 if (bt_socket_interrupted()) {
292 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
293 /*
294 * This interruption was a SIGINT and
295 * the graph is being teared down.
296 */
297 status = LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED;
298 lttng_live_msg_iter->was_interrupted = true;
299 goto end;
300 } else {
301 /*
302 * A signal was received, but the graph
303 * is not being teared down. Carry on.
304 */
305 continue;
306 }
307 } else {
308 /*
309 * For any other types of socket error, close
310 * the socket and return an error.
311 */
312 LTTNG_LIVE_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE_ERRNO(
313 self_comp, self_comp_class, "Error sending to Relay", ".");
314
315 viewer_connection_close_socket(viewer_connection);
316 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
317 goto end;
318 }
319 }
320
321 BT_ASSERT(sent <= to_send);
322 total_sent += sent;
323 to_send -= sent;
324
325 } while (to_send > 0);
326
327 BT_ASSERT(total_sent == len);
328 status = LTTNG_LIVE_VIEWER_STATUS_OK;
329
330 end:
331 return status;
332 }
333
334 static int parse_url(struct live_viewer_connection *viewer_connection)
335 {
336 char error_buf[256] = {0};
337 bt_self_component *self_comp = viewer_connection->self_comp;
338 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
339 struct bt_common_lttng_live_url_parts lttng_live_url_parts = {};
340 int ret = -1;
341 const char *path = viewer_connection->url->str;
342
343 if (!path) {
344 goto end;
345 }
346
347 lttng_live_url_parts = bt_common_parse_lttng_live_url(path, error_buf, sizeof(error_buf));
348 if (!lttng_live_url_parts.proto) {
349 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
350 "Invalid LTTng live URL format: %s", error_buf);
351 goto end;
352 }
353 viewer_connection->proto = lttng_live_url_parts.proto;
354 lttng_live_url_parts.proto = NULL;
355
356 viewer_connection->relay_hostname = lttng_live_url_parts.hostname;
357 lttng_live_url_parts.hostname = NULL;
358
359 if (lttng_live_url_parts.port >= 0) {
360 viewer_connection->port = lttng_live_url_parts.port;
361 } else {
362 viewer_connection->port = LTTNG_DEFAULT_NETWORK_VIEWER_PORT;
363 }
364
365 viewer_connection->target_hostname = lttng_live_url_parts.target_hostname;
366 lttng_live_url_parts.target_hostname = NULL;
367
368 if (lttng_live_url_parts.session_name) {
369 viewer_connection->session_name = lttng_live_url_parts.session_name;
370 lttng_live_url_parts.session_name = NULL;
371 }
372
373 ret = 0;
374
375 end:
376 bt_common_destroy_lttng_live_url_parts(&lttng_live_url_parts);
377 return ret;
378 }
379
380 static enum lttng_live_viewer_status
381 lttng_live_handshake(struct live_viewer_connection *viewer_connection)
382 {
383 struct lttng_viewer_cmd cmd;
384 struct lttng_viewer_connect connect;
385 enum lttng_live_viewer_status status;
386 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
387 bt_self_component *self_comp = viewer_connection->self_comp;
388 const size_t cmd_buf_len = sizeof(cmd) + sizeof(connect);
389 char cmd_buf[cmd_buf_len];
390
391 BT_COMP_OR_COMP_CLASS_LOGD(
392 self_comp, self_comp_class,
393 "Handshaking with the relay daemon: cmd=%s, major-version=%u, minor-version=%u",
394 lttng_viewer_command_string(LTTNG_VIEWER_CONNECT), LTTNG_LIVE_MAJOR, LTTNG_LIVE_MINOR);
395
396 cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
397 cmd.data_size = htobe64((uint64_t) sizeof(connect));
398 cmd.cmd_version = htobe32(0);
399
400 connect.viewer_session_id = -1ULL; /* will be set on recv */
401 connect.major = htobe32(LTTNG_LIVE_MAJOR);
402 connect.minor = htobe32(LTTNG_LIVE_MINOR);
403 connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
404
405 /*
406 * Merge the cmd and connection request to prevent a write-write
407 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
408 * second write to be performed quickly in presence of Nagle's algorithm
409 */
410 memcpy(cmd_buf, &cmd, sizeof(cmd));
411 memcpy(cmd_buf + sizeof(cmd), &connect, sizeof(connect));
412
413 status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
414 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
415 viewer_handle_send_status(self_comp, self_comp_class, status, "viewer connect command");
416 goto end;
417 }
418
419 status = lttng_live_recv(viewer_connection, &connect, sizeof(connect));
420 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
421 viewer_handle_recv_status(self_comp, self_comp_class, status, "viewer connect reply");
422 goto end;
423 }
424
425 BT_COMP_OR_COMP_CLASS_LOGI(self_comp, self_comp_class, "Received viewer session ID : %" PRIu64,
426 (uint64_t) be64toh(connect.viewer_session_id));
427 BT_COMP_OR_COMP_CLASS_LOGI(self_comp, self_comp_class, "Relayd version : %u.%u",
428 be32toh(connect.major), be32toh(connect.minor));
429
430 if (LTTNG_LIVE_MAJOR != be32toh(connect.major)) {
431 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
432 "Incompatible lttng-relayd protocol");
433 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
434 goto end;
435 }
436 /* Use the smallest protocol version implemented. */
437 if (LTTNG_LIVE_MINOR > be32toh(connect.minor)) {
438 viewer_connection->minor = be32toh(connect.minor);
439 } else {
440 viewer_connection->minor = LTTNG_LIVE_MINOR;
441 }
442 viewer_connection->major = LTTNG_LIVE_MAJOR;
443
444 status = LTTNG_LIVE_VIEWER_STATUS_OK;
445
446 goto end;
447
448 end:
449 return status;
450 }
451
452 static enum lttng_live_viewer_status
453 lttng_live_connect_viewer(struct live_viewer_connection *viewer_connection)
454 {
455 struct hostent *host;
456 struct sockaddr_in server_addr;
457 enum lttng_live_viewer_status status;
458 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
459 bt_self_component *self_comp = viewer_connection->self_comp;
460
461 if (parse_url(viewer_connection)) {
462 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class, "Failed to parse URL");
463 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
464 goto error;
465 }
466
467 BT_COMP_OR_COMP_CLASS_LOGD(
468 self_comp, self_comp_class,
469 "Connecting to hostname : %s, port : %d, "
470 "target hostname : %s, session name : %s, proto : %s",
471 viewer_connection->relay_hostname->str, viewer_connection->port,
472 !viewer_connection->target_hostname ? "<none>" : viewer_connection->target_hostname->str,
473 !viewer_connection->session_name ? "<none>" : viewer_connection->session_name->str,
474 viewer_connection->proto->str);
475
476 host = gethostbyname(viewer_connection->relay_hostname->str);
477 if (!host) {
478 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
479 "Cannot lookup hostname: hostname=\"%s\"",
480 viewer_connection->relay_hostname->str);
481 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
482 goto error;
483 }
484
485 if ((viewer_connection->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == BT_INVALID_SOCKET) {
486 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
487 "Socket creation failed: %s", bt_socket_errormsg());
488 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
489 goto error;
490 }
491
492 server_addr.sin_family = AF_INET;
493 server_addr.sin_port = htons(viewer_connection->port);
494 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
495 memset(&(server_addr.sin_zero), 0, 8);
496
497 if (connect(viewer_connection->control_sock, (struct sockaddr *) &server_addr,
498 sizeof(struct sockaddr)) == BT_SOCKET_ERROR) {
499 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class, "Connection failed: %s",
500 bt_socket_errormsg());
501 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
502 goto error;
503 }
504
505 status = lttng_live_handshake(viewer_connection);
506
507 /*
508 * Only print error and append cause in case of error. not in case of
509 * interruption.
510 */
511 if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
512 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
513 "Viewer handshake failed");
514 goto error;
515 } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
516 goto end;
517 }
518
519 goto end;
520
521 error:
522 if (viewer_connection->control_sock != BT_INVALID_SOCKET) {
523 if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) {
524 BT_COMP_OR_COMP_CLASS_LOGW(self_comp, self_comp_class, "Error closing socket: %s.",
525 bt_socket_errormsg());
526 }
527 }
528 viewer_connection->control_sock = BT_INVALID_SOCKET;
529 end:
530 return status;
531 }
532
533 static void lttng_live_disconnect_viewer(struct live_viewer_connection *viewer_connection)
534 {
535 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
536 bt_self_component *self_comp = viewer_connection->self_comp;
537
538 if (viewer_connection->control_sock == BT_INVALID_SOCKET) {
539 return;
540 }
541 if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) {
542 BT_COMP_OR_COMP_CLASS_LOGW(self_comp, self_comp_class, "Error closing socket: %s",
543 bt_socket_errormsg());
544 viewer_connection->control_sock = BT_INVALID_SOCKET;
545 }
546 }
547
548 static int list_update_session(bt_value *results, const struct lttng_viewer_session *session,
549 bool *_found, struct live_viewer_connection *viewer_connection)
550 {
551 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
552 bt_self_component *self_comp = viewer_connection->self_comp;
553 int ret = 0;
554 uint64_t i, len;
555 bt_value *map = NULL;
556 bt_value *hostname = NULL;
557 bt_value *session_name = NULL;
558 bt_value *btval = NULL;
559 bool found = false;
560
561 len = bt_value_array_get_length(results);
562 for (i = 0; i < len; i++) {
563 const char *hostname_str = NULL;
564 const char *session_name_str = NULL;
565
566 map = bt_value_array_borrow_element_by_index(results, i);
567 hostname = bt_value_map_borrow_entry_value(map, "target-hostname");
568 if (!hostname) {
569 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
570 "Error borrowing \"target-hostname\" entry.");
571 ret = -1;
572 goto end;
573 }
574 session_name = bt_value_map_borrow_entry_value(map, "session-name");
575 if (!session_name) {
576 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
577 "Error borrowing \"session-name\" entry.");
578 ret = -1;
579 goto end;
580 }
581 hostname_str = bt_value_string_get(hostname);
582 session_name_str = bt_value_string_get(session_name);
583
584 if (strcmp(session->hostname, hostname_str) == 0 &&
585 strcmp(session->session_name, session_name_str) == 0) {
586 int64_t val;
587 uint32_t streams = be32toh(session->streams);
588 uint32_t clients = be32toh(session->clients);
589
590 found = true;
591
592 btval = bt_value_map_borrow_entry_value(map, "stream-count");
593 if (!btval) {
594 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
595 "Error borrowing \"stream-count\" entry.");
596 ret = -1;
597 goto end;
598 }
599 val = bt_value_integer_unsigned_get(btval);
600 /* sum */
601 val += streams;
602 bt_value_integer_unsigned_set(btval, val);
603
604 btval = bt_value_map_borrow_entry_value(map, "client-count");
605 if (!btval) {
606 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
607 "Error borrowing \"client-count\" entry.");
608 ret = -1;
609 goto end;
610 }
611 val = bt_value_integer_unsigned_get(btval);
612 /* max */
613 val = bt_max_t(int64_t, clients, val);
614 bt_value_integer_unsigned_set(btval, val);
615 }
616
617 if (found) {
618 break;
619 }
620 }
621 end:
622 *_found = found;
623 return ret;
624 }
625
626 static int list_append_session(bt_value *results, GString *base_url,
627 const struct lttng_viewer_session *session,
628 struct live_viewer_connection *viewer_connection)
629 {
630 int ret = 0;
631 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
632 bt_value_map_insert_entry_status insert_status;
633 bt_value_array_append_element_status append_status;
634 bt_value *map = NULL;
635 GString *url = NULL;
636 bool found = false;
637
638 /*
639 * If the session already exists, add the stream count to it,
640 * and do max of client counts.
641 */
642 ret = list_update_session(results, session, &found, viewer_connection);
643 if (ret || found) {
644 goto end;
645 }
646
647 map = bt_value_map_create();
648 if (!map) {
649 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error creating map value.");
650 ret = -1;
651 goto end;
652 }
653
654 if (base_url->len < 1) {
655 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error: base_url length smaller than 1.");
656 ret = -1;
657 goto end;
658 }
659 /*
660 * key = "url",
661 * value = <string>,
662 */
663 url = g_string_new(base_url->str);
664 g_string_append(url, "/host/");
665 g_string_append(url, session->hostname);
666 g_string_append_c(url, '/');
667 g_string_append(url, session->session_name);
668
669 insert_status = bt_value_map_insert_string_entry(map, "url", url->str);
670 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
671 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error inserting \"url\" entry.");
672 ret = -1;
673 goto end;
674 }
675
676 /*
677 * key = "target-hostname",
678 * value = <string>,
679 */
680 insert_status = bt_value_map_insert_string_entry(map, "target-hostname", session->hostname);
681 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
682 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
683 "Error inserting \"target-hostname\" entry.");
684 ret = -1;
685 goto end;
686 }
687
688 /*
689 * key = "session-name",
690 * value = <string>,
691 */
692 insert_status = bt_value_map_insert_string_entry(map, "session-name", session->session_name);
693 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
694 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error inserting \"session-name\" entry.");
695 ret = -1;
696 goto end;
697 }
698
699 /*
700 * key = "timer-us",
701 * value = <integer>,
702 */
703 {
704 uint32_t live_timer = be32toh(session->live_timer);
705
706 insert_status = bt_value_map_insert_unsigned_integer_entry(map, "timer-us", live_timer);
707 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
708 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error inserting \"timer-us\" entry.");
709 ret = -1;
710 goto end;
711 }
712 }
713
714 /*
715 * key = "stream-count",
716 * value = <integer>,
717 */
718 {
719 uint32_t streams = be32toh(session->streams);
720
721 insert_status = bt_value_map_insert_unsigned_integer_entry(map, "stream-count", streams);
722 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
723 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
724 "Error inserting \"stream-count\" entry.");
725 ret = -1;
726 goto end;
727 }
728 }
729
730 /*
731 * key = "client-count",
732 * value = <integer>,
733 */
734 {
735 uint32_t clients = be32toh(session->clients);
736
737 insert_status = bt_value_map_insert_unsigned_integer_entry(map, "client-count", clients);
738 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
739 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
740 "Error inserting \"client-count\" entry.");
741 ret = -1;
742 goto end;
743 }
744 }
745
746 append_status = bt_value_array_append_element(results, map);
747 if (append_status != BT_VALUE_ARRAY_APPEND_ELEMENT_STATUS_OK) {
748 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error appending map to results.");
749 ret = -1;
750 }
751
752 end:
753 if (url) {
754 g_string_free(url, true);
755 }
756 BT_VALUE_PUT_REF_AND_RESET(map);
757 return ret;
758 }
759
760 /*
761 * Data structure returned:
762 *
763 * {
764 * <array> = {
765 * [n] = {
766 * <map> = {
767 * {
768 * key = "url",
769 * value = <string>,
770 * },
771 * {
772 * key = "target-hostname",
773 * value = <string>,
774 * },
775 * {
776 * key = "session-name",
777 * value = <string>,
778 * },
779 * {
780 * key = "timer-us",
781 * value = <integer>,
782 * },
783 * {
784 * key = "stream-count",
785 * value = <integer>,
786 * },
787 * {
788 * key = "client-count",
789 * value = <integer>,
790 * },
791 * },
792 * }
793 * }
794 */
795
796 bt_component_class_query_method_status
797 live_viewer_connection_list_sessions(struct live_viewer_connection *viewer_connection,
798 const bt_value **user_result)
799 {
800 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
801 bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
802 bt_value *result = NULL;
803 enum lttng_live_viewer_status viewer_status;
804 struct lttng_viewer_cmd cmd;
805 struct lttng_viewer_list_sessions list;
806 uint32_t i, sessions_count;
807
808 result = bt_value_array_create();
809 if (!result) {
810 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error creating array");
811 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
812 goto error;
813 }
814
815 BT_LOGD("Requesting list of sessions: cmd=%s",
816 lttng_viewer_command_string(LTTNG_VIEWER_LIST_SESSIONS));
817
818 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
819 cmd.data_size = htobe64((uint64_t) 0);
820 cmd.cmd_version = htobe32(0);
821
822 viewer_status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
823 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
824 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error sending list sessions command");
825 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
826 goto error;
827 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
828 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
829 goto error;
830 }
831
832 viewer_status = lttng_live_recv(viewer_connection, &list, sizeof(list));
833 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
834 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error receiving session list");
835 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
836 goto error;
837 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
838 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
839 goto error;
840 }
841
842 sessions_count = be32toh(list.sessions_count);
843 for (i = 0; i < sessions_count; i++) {
844 struct lttng_viewer_session lsession;
845
846 viewer_status = lttng_live_recv(viewer_connection, &lsession, sizeof(lsession));
847 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
848 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error receiving session:");
849 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
850 goto error;
851 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
852 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
853 goto error;
854 }
855
856 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
857 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
858 if (list_append_session(result, viewer_connection->url, &lsession, viewer_connection)) {
859 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error appending session");
860 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
861 goto error;
862 }
863 }
864
865 *user_result = result;
866 goto end;
867 error:
868 BT_VALUE_PUT_REF_AND_RESET(result);
869 end:
870 return status;
871 }
872
873 static enum lttng_live_viewer_status
874 lttng_live_query_session_ids(struct lttng_live_msg_iter *lttng_live_msg_iter)
875 {
876 struct lttng_viewer_cmd cmd;
877 struct lttng_viewer_list_sessions list;
878 struct lttng_viewer_session lsession;
879 uint32_t i, sessions_count;
880 uint64_t session_id;
881 enum lttng_live_viewer_status status;
882 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
883 bt_self_component *self_comp = viewer_connection->self_comp;
884 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
885
886 BT_COMP_LOGD("Asking the relay daemon for the list of sessions: cmd=%s",
887 lttng_viewer_command_string(LTTNG_VIEWER_LIST_SESSIONS));
888
889 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
890 cmd.data_size = htobe64((uint64_t) 0);
891 cmd.cmd_version = htobe32(0);
892
893 status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
894 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
895 viewer_handle_send_status(self_comp, self_comp_class, status, "list sessions command");
896 goto end;
897 }
898
899 status = lttng_live_recv(viewer_connection, &list, sizeof(list));
900 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
901 viewer_handle_recv_status(self_comp, self_comp_class, status, "session list reply");
902 goto end;
903 }
904
905 sessions_count = be32toh(list.sessions_count);
906 for (i = 0; i < sessions_count; i++) {
907 status = lttng_live_recv(viewer_connection, &lsession, sizeof(lsession));
908 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
909 viewer_handle_recv_status(self_comp, self_comp_class, status, "session reply");
910 goto end;
911 }
912 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
913 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
914 session_id = be64toh(lsession.id);
915
916 BT_COMP_LOGI("Adding session to internal list: "
917 "session-id=%" PRIu64 ", hostname=\"%s\", session-name=\"%s\"",
918 session_id, lsession.hostname, lsession.session_name);
919
920 if ((strncmp(lsession.session_name, viewer_connection->session_name->str,
921 LTTNG_VIEWER_NAME_MAX) == 0) &&
922 (strncmp(lsession.hostname, viewer_connection->target_hostname->str,
923 LTTNG_VIEWER_HOST_NAME_MAX) == 0)) {
924 if (lttng_live_add_session(lttng_live_msg_iter, session_id, lsession.hostname,
925 lsession.session_name)) {
926 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to add live session");
927 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
928 goto end;
929 }
930 }
931 }
932
933 status = LTTNG_LIVE_VIEWER_STATUS_OK;
934
935 end:
936 return status;
937 }
938
939 enum lttng_live_viewer_status
940 lttng_live_create_viewer_session(struct lttng_live_msg_iter *lttng_live_msg_iter)
941 {
942 struct lttng_viewer_cmd cmd;
943 struct lttng_viewer_create_session_response resp;
944 enum lttng_live_viewer_status status;
945 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
946 bt_self_component *self_comp = viewer_connection->self_comp;
947 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
948
949 BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class, "Creating a viewer session: cmd=%s",
950 lttng_viewer_command_string(LTTNG_VIEWER_CREATE_SESSION));
951
952 cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
953 cmd.data_size = htobe64((uint64_t) 0);
954 cmd.cmd_version = htobe32(0);
955
956 status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
957 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
958 viewer_handle_send_status(self_comp, self_comp_class, status, "create session command");
959 goto end;
960 }
961
962 status = lttng_live_recv(viewer_connection, &resp, sizeof(resp));
963 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
964 viewer_handle_recv_status(self_comp, self_comp_class, status, "create session reply");
965 goto end;
966 }
967
968 if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
969 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error creating viewer session");
970 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
971 goto end;
972 }
973
974 status = lttng_live_query_session_ids(lttng_live_msg_iter);
975 if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
976 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to query live viewer session ids");
977 goto end;
978 } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
979 goto end;
980 }
981
982 end:
983 return status;
984 }
985
986 static enum lttng_live_viewer_status receive_streams(struct lttng_live_session *session,
987 uint32_t stream_count,
988 bt_self_message_iterator *self_msg_iter)
989 {
990 uint32_t i;
991 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
992 enum lttng_live_viewer_status status;
993 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
994 bt_self_component *self_comp = viewer_connection->self_comp;
995
996 BT_COMP_LOGI("Getting %" PRIu32 " new streams", stream_count);
997 for (i = 0; i < stream_count; i++) {
998 struct lttng_viewer_stream stream;
999 struct lttng_live_stream_iterator *live_stream;
1000 uint64_t stream_id;
1001 uint64_t ctf_trace_id;
1002
1003 status = lttng_live_recv(viewer_connection, &stream, sizeof(stream));
1004 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1005 viewer_handle_recv_status(self_comp, NULL, status, "stream reply");
1006 goto end;
1007 }
1008 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
1009 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
1010 stream_id = be64toh(stream.id);
1011 ctf_trace_id = be64toh(stream.ctf_trace_id);
1012
1013 if (stream.metadata_flag) {
1014 BT_COMP_LOGI(" metadata stream %" PRIu64 " : %s/%s", stream_id, stream.path_name,
1015 stream.channel_name);
1016 if (lttng_live_metadata_create_stream(session, ctf_trace_id, stream_id)) {
1017 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error creating metadata stream");
1018 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1019 goto end;
1020 }
1021 session->lazy_stream_msg_init = true;
1022 } else {
1023 BT_COMP_LOGI(" stream %" PRIu64 " : %s/%s", stream_id, stream.path_name,
1024 stream.channel_name);
1025 live_stream =
1026 lttng_live_stream_iterator_create(session, ctf_trace_id, stream_id, self_msg_iter);
1027 if (!live_stream) {
1028 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error creating stream");
1029 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1030 goto end;
1031 }
1032 }
1033 }
1034 status = LTTNG_LIVE_VIEWER_STATUS_OK;
1035
1036 end:
1037 return status;
1038 }
1039
1040 enum lttng_live_viewer_status lttng_live_session_attach(struct lttng_live_session *session,
1041 bt_self_message_iterator *self_msg_iter)
1042 {
1043 struct lttng_viewer_cmd cmd;
1044 enum lttng_live_viewer_status status;
1045 struct lttng_viewer_attach_session_request rq;
1046 struct lttng_viewer_attach_session_response rp;
1047 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
1048 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1049 bt_self_component *self_comp = viewer_connection->self_comp;
1050 uint64_t session_id = session->id;
1051 uint32_t streams_count;
1052 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1053 char cmd_buf[cmd_buf_len];
1054
1055 BT_COMP_LOGD("Attaching to session: cmd=%s, session-id=%" PRIu64 ", seek=%s",
1056 lttng_viewer_command_string(LTTNG_VIEWER_ATTACH_SESSION), session_id,
1057 lttng_viewer_seek_string(LTTNG_VIEWER_SEEK_LAST));
1058
1059 cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
1060 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1061 cmd.cmd_version = htobe32(0);
1062
1063 memset(&rq, 0, sizeof(rq));
1064 rq.session_id = htobe64(session_id);
1065 // TODO: add cmd line parameter to select seek beginning
1066 // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
1067 rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
1068
1069 /*
1070 * Merge the cmd and connection request to prevent a write-write
1071 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1072 * second write to be performed quickly in presence of Nagle's algorithm.
1073 */
1074 memcpy(cmd_buf, &cmd, sizeof(cmd));
1075 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1076 status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1077 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1078 viewer_handle_send_status(self_comp, NULL, status, "attach session command");
1079 goto end;
1080 }
1081
1082 status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1083 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1084 viewer_handle_recv_status(self_comp, NULL, status, "attach session reply");
1085 goto end;
1086 }
1087
1088 streams_count = be32toh(rp.streams_count);
1089 switch (be32toh(rp.status)) {
1090 case LTTNG_VIEWER_ATTACH_OK:
1091 break;
1092 case LTTNG_VIEWER_ATTACH_UNK:
1093 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Session id %" PRIu64 " is unknown", session_id);
1094 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1095 goto end;
1096 case LTTNG_VIEWER_ATTACH_ALREADY:
1097 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "There is already a viewer attached to this session");
1098 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1099 goto end;
1100 case LTTNG_VIEWER_ATTACH_NOT_LIVE:
1101 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Not a live session");
1102 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1103 goto end;
1104 case LTTNG_VIEWER_ATTACH_SEEK_ERR:
1105 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Wrong seek parameter");
1106 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1107 goto end;
1108 default:
1109 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Unknown attach return code %u", be32toh(rp.status));
1110 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1111 goto end;
1112 }
1113
1114 /* We receive the initial list of streams. */
1115 status = receive_streams(session, streams_count, self_msg_iter);
1116 switch (status) {
1117 case LTTNG_LIVE_VIEWER_STATUS_OK:
1118 break;
1119 case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
1120 goto end;
1121 case LTTNG_LIVE_VIEWER_STATUS_ERROR:
1122 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error receiving streams");
1123 goto end;
1124 default:
1125 bt_common_abort();
1126 }
1127
1128 session->attached = true;
1129 session->new_streams_needed = false;
1130
1131 end:
1132 return status;
1133 }
1134
1135 enum lttng_live_viewer_status lttng_live_session_detach(struct lttng_live_session *session)
1136 {
1137 struct lttng_viewer_cmd cmd;
1138 enum lttng_live_viewer_status status;
1139 struct lttng_viewer_detach_session_request rq;
1140 struct lttng_viewer_detach_session_response rp;
1141 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
1142 bt_self_component *self_comp = session->self_comp;
1143 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1144 uint64_t session_id = session->id;
1145 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1146 char cmd_buf[cmd_buf_len];
1147
1148 /*
1149 * The session might already be detached and the viewer socket might
1150 * already been closed. This happens when calling this function when
1151 * tearing down the graph after an error.
1152 */
1153 if (!session->attached || viewer_connection->control_sock == BT_INVALID_SOCKET) {
1154 return LTTNG_LIVE_VIEWER_STATUS_OK;
1155 }
1156
1157 BT_COMP_LOGD("Detaching from session: cmd=%s, session-id=%" PRIu64,
1158 lttng_viewer_command_string(LTTNG_VIEWER_DETACH_SESSION), session_id);
1159
1160 cmd.cmd = htobe32(LTTNG_VIEWER_DETACH_SESSION);
1161 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1162 cmd.cmd_version = htobe32(0);
1163
1164 memset(&rq, 0, sizeof(rq));
1165 rq.session_id = htobe64(session_id);
1166
1167 /*
1168 * Merge the cmd and connection request to prevent a write-write
1169 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1170 * second write to be performed quickly in presence of Nagle's algorithm.
1171 */
1172 memcpy(cmd_buf, &cmd, sizeof(cmd));
1173 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1174 status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1175 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1176 viewer_handle_send_status(self_comp, NULL, status, "detach session command");
1177 goto end;
1178 }
1179
1180 status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1181 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1182 viewer_handle_recv_status(self_comp, NULL, status, "detach session reply");
1183 goto end;
1184 }
1185
1186 switch (be32toh(rp.status)) {
1187 case LTTNG_VIEWER_DETACH_SESSION_OK:
1188 break;
1189 case LTTNG_VIEWER_DETACH_SESSION_UNK:
1190 BT_COMP_LOGW("Session id %" PRIu64 " is unknown", session_id);
1191 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1192 goto end;
1193 case LTTNG_VIEWER_DETACH_SESSION_ERR:
1194 BT_COMP_LOGW("Error detaching session id %" PRIu64 "", session_id);
1195 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1196 goto end;
1197 default:
1198 BT_COMP_LOGE("Unknown detach return code %u", be32toh(rp.status));
1199 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1200 goto end;
1201 }
1202
1203 session->attached = false;
1204
1205 status = LTTNG_LIVE_VIEWER_STATUS_OK;
1206
1207 end:
1208 return status;
1209 }
1210
1211 enum lttng_live_get_one_metadata_status
1212 lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, FILE *fp, size_t *reply_len)
1213 {
1214 uint64_t len = 0;
1215 enum lttng_live_get_one_metadata_status status;
1216 enum lttng_live_viewer_status viewer_status;
1217 struct lttng_viewer_cmd cmd;
1218 struct lttng_viewer_get_metadata rq;
1219 struct lttng_viewer_metadata_packet rp;
1220 gchar *data = NULL;
1221 ssize_t writelen;
1222 struct lttng_live_session *session = trace->session;
1223 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
1224 struct lttng_live_metadata *metadata = trace->metadata;
1225 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1226 bt_self_component *self_comp = viewer_connection->self_comp;
1227 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1228 char cmd_buf[cmd_buf_len];
1229
1230 BT_COMP_LOGD("Requesting new metadata for trace:"
1231 "cmd=%s, trace-id=%" PRIu64 ", metadata-stream-id=%" PRIu64,
1232 lttng_viewer_command_string(LTTNG_VIEWER_GET_METADATA), trace->id,
1233 metadata->stream_id);
1234
1235 rq.stream_id = htobe64(metadata->stream_id);
1236 cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
1237 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1238 cmd.cmd_version = htobe32(0);
1239
1240 /*
1241 * Merge the cmd and connection request to prevent a write-write
1242 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1243 * second write to be performed quickly in presence of Nagle's algorithm.
1244 */
1245 memcpy(cmd_buf, &cmd, sizeof(cmd));
1246 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1247 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1248 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1249 viewer_handle_send_status(self_comp, NULL, viewer_status, "get metadata command");
1250 status = (enum lttng_live_get_one_metadata_status) viewer_status;
1251 goto end;
1252 }
1253
1254 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1255 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1256 viewer_handle_recv_status(self_comp, NULL, viewer_status, "get metadata reply");
1257 status = (enum lttng_live_get_one_metadata_status) viewer_status;
1258 goto end;
1259 }
1260
1261 switch (be32toh(rp.status)) {
1262 case LTTNG_VIEWER_METADATA_OK:
1263 BT_COMP_LOGD("Received get_metadata response: ok");
1264 break;
1265 case LTTNG_VIEWER_NO_NEW_METADATA:
1266 BT_COMP_LOGD("Received get_metadata response: no new");
1267 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_END;
1268 goto end;
1269 case LTTNG_VIEWER_METADATA_ERR:
1270 /*
1271 * The Relayd cannot find this stream id. Maybe its
1272 * gone already. This can happen in short lived UST app
1273 * in a per-pid session.
1274 */
1275 BT_COMP_LOGD("Received get_metadata response: error");
1276 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_CLOSED;
1277 goto end;
1278 default:
1279 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Received get_metadata response: unknown");
1280 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
1281 goto end;
1282 }
1283
1284 len = be64toh(rp.len);
1285 if (len == 0) {
1286 /*
1287 * We received a `LTTNG_VIEWER_METADATA_OK` with a packet
1288 * length of 0. This means we must try again. This scenario
1289 * arises when a clear command is performed on an lttng session.
1290 */
1291 BT_COMP_LOGD(
1292 "Expecting a metadata packet of size 0. Retry to get a packet from the relay.");
1293 goto empty_metadata_packet_retry;
1294 }
1295
1296 BT_COMP_LOGD("Writing %" PRIu64 " bytes to metadata", len);
1297 if (len <= 0) {
1298 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Erroneous response length");
1299 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
1300 goto end;
1301 }
1302
1303 data = g_new0(gchar, len);
1304 if (!data) {
1305 BT_COMP_LOGE_APPEND_CAUSE_ERRNO(self_comp, "Failed to allocate data buffer", ".");
1306 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
1307 goto end;
1308 }
1309
1310 viewer_status = lttng_live_recv(viewer_connection, data, len);
1311 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1312 viewer_handle_recv_status(self_comp, NULL, viewer_status, "get metadata packet");
1313 status = (enum lttng_live_get_one_metadata_status) viewer_status;
1314 goto end;
1315 }
1316
1317 /*
1318 * Write the metadata to the file handle.
1319 */
1320 writelen = fwrite(data, sizeof(uint8_t), len, fp);
1321 if (writelen != len) {
1322 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Writing in the metadata file stream");
1323 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
1324 goto end;
1325 }
1326
1327 empty_metadata_packet_retry:
1328 *reply_len = len;
1329 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_OK;
1330
1331 end:
1332 g_free(data);
1333 return status;
1334 }
1335
1336 /*
1337 * Assign the fields from a lttng_viewer_index to a packet_index.
1338 */
1339 static void lttng_index_to_packet_index(struct lttng_viewer_index *lindex,
1340 struct packet_index *pindex)
1341 {
1342 BT_ASSERT(lindex);
1343 BT_ASSERT(pindex);
1344
1345 pindex->offset = be64toh(lindex->offset);
1346 pindex->packet_size = be64toh(lindex->packet_size);
1347 pindex->content_size = be64toh(lindex->content_size);
1348 pindex->ts_cycles.timestamp_begin = be64toh(lindex->timestamp_begin);
1349 pindex->ts_cycles.timestamp_end = be64toh(lindex->timestamp_end);
1350 pindex->events_discarded = be64toh(lindex->events_discarded);
1351 }
1352
1353 static void lttng_live_need_new_streams(struct lttng_live_msg_iter *lttng_live_msg_iter)
1354 {
1355 uint64_t session_idx;
1356 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1357
1358 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
1359 struct lttng_live_session *session =
1360 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
1361 BT_COMP_LOGD("Marking session as needing new streams: "
1362 "session-id=%" PRIu64,
1363 session->id);
1364 session->new_streams_needed = true;
1365 }
1366 }
1367
1368 enum lttng_live_iterator_status
1369 lttng_live_get_next_index(struct lttng_live_msg_iter *lttng_live_msg_iter,
1370 struct lttng_live_stream_iterator *stream, struct packet_index *index)
1371 {
1372 struct lttng_viewer_cmd cmd;
1373 struct lttng_viewer_get_next_index rq;
1374 enum lttng_live_viewer_status viewer_status;
1375 struct lttng_viewer_index rp;
1376 enum lttng_live_iterator_status status;
1377 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1378 bt_self_component *self_comp = viewer_connection->self_comp;
1379 struct lttng_live_trace *trace = stream->trace;
1380 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1381 char cmd_buf[cmd_buf_len];
1382 uint32_t flags, rp_status;
1383
1384 BT_COMP_LOGD("Requesting next index for stream: cmd=%s, "
1385 "viewer-stream-id=%" PRIu64,
1386 lttng_viewer_command_string(LTTNG_VIEWER_GET_NEXT_INDEX),
1387 stream->viewer_stream_id);
1388 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
1389 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1390 cmd.cmd_version = htobe32(0);
1391
1392 memset(&rq, 0, sizeof(rq));
1393 rq.stream_id = htobe64(stream->viewer_stream_id);
1394
1395 /*
1396 * Merge the cmd and connection request to prevent a write-write
1397 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1398 * second write to be performed quickly in presence of Nagle's algorithm.
1399 */
1400 memcpy(cmd_buf, &cmd, sizeof(cmd));
1401 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1402
1403 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1404 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1405 viewer_handle_send_status(self_comp, NULL, viewer_status, "get next index command");
1406 goto error;
1407 }
1408
1409 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1410 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1411 viewer_handle_recv_status(self_comp, NULL, viewer_status, "get next index reply");
1412 goto error;
1413 }
1414
1415 flags = be32toh(rp.flags);
1416 rp_status = be32toh(rp.status);
1417
1418 BT_COMP_LOGD("Received response from relay daemon: cmd=%s, response=%s",
1419 lttng_viewer_command_string(LTTNG_VIEWER_GET_NEXT_INDEX),
1420 lttng_viewer_next_index_return_code_string(rp_status));
1421 switch (rp_status) {
1422 case LTTNG_VIEWER_INDEX_INACTIVE:
1423 {
1424 uint64_t ctf_stream_class_id;
1425
1426 memset(index, 0, sizeof(struct packet_index));
1427 index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
1428 stream->current_inactivity_ts = index->ts_cycles.timestamp_end;
1429 ctf_stream_class_id = be64toh(rp.stream_id);
1430 if (stream->ctf_stream_class_id.is_set) {
1431 BT_ASSERT(stream->ctf_stream_class_id.value == ctf_stream_class_id);
1432 } else {
1433 stream->ctf_stream_class_id.value = ctf_stream_class_id;
1434 stream->ctf_stream_class_id.is_set = true;
1435 }
1436 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_QUIESCENT);
1437 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1438 break;
1439 }
1440 case LTTNG_VIEWER_INDEX_OK:
1441 {
1442 uint64_t ctf_stream_class_id;
1443
1444 lttng_index_to_packet_index(&rp, index);
1445 ctf_stream_class_id = be64toh(rp.stream_id);
1446 if (stream->ctf_stream_class_id.is_set) {
1447 BT_ASSERT(stream->ctf_stream_class_id.value == ctf_stream_class_id);
1448 } else {
1449 stream->ctf_stream_class_id.value = ctf_stream_class_id;
1450 stream->ctf_stream_class_id.is_set = true;
1451 }
1452
1453 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_DATA);
1454
1455 if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
1456 BT_COMP_LOGD("Marking trace as needing new metadata: "
1457 "response=%s, response-flag=NEW_METADATA, trace-id=%" PRIu64,
1458 lttng_viewer_next_index_return_code_string(rp_status), trace->id);
1459 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
1460 }
1461 if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
1462 BT_COMP_LOGD("Marking all sessions as possibly needing new streams: "
1463 "response=%s, response-flag=NEW_STREAM",
1464 lttng_viewer_next_index_return_code_string(rp_status));
1465 lttng_live_need_new_streams(lttng_live_msg_iter);
1466 }
1467 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1468 break;
1469 }
1470 case LTTNG_VIEWER_INDEX_RETRY:
1471 memset(index, 0, sizeof(struct packet_index));
1472 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
1473 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
1474 goto end;
1475 case LTTNG_VIEWER_INDEX_HUP:
1476 memset(index, 0, sizeof(struct packet_index));
1477 index->offset = EOF;
1478 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_EOF);
1479 stream->has_stream_hung_up = true;
1480 status = LTTNG_LIVE_ITERATOR_STATUS_END;
1481 break;
1482 case LTTNG_VIEWER_INDEX_ERR:
1483 memset(index, 0, sizeof(struct packet_index));
1484 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
1485 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1486 goto end;
1487 default:
1488 BT_COMP_LOGD("Received get_next_index response: unknown value");
1489 memset(index, 0, sizeof(struct packet_index));
1490 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
1491 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1492 goto end;
1493 }
1494 goto end;
1495
1496 error:
1497 status = viewer_status_to_live_iterator_status(viewer_status);
1498 end:
1499 return status;
1500 }
1501
1502 enum ctf_msg_iter_medium_status
1503 lttng_live_get_stream_bytes(struct lttng_live_msg_iter *lttng_live_msg_iter,
1504 struct lttng_live_stream_iterator *stream, uint8_t *buf,
1505 uint64_t offset, uint64_t req_len, uint64_t *recv_len)
1506 {
1507 enum ctf_msg_iter_medium_status status;
1508 enum lttng_live_viewer_status viewer_status;
1509 struct lttng_viewer_trace_packet rp;
1510 struct lttng_viewer_cmd cmd;
1511 struct lttng_viewer_get_packet rq;
1512 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1513 bt_self_component *self_comp = viewer_connection->self_comp;
1514 struct lttng_live_trace *trace = stream->trace;
1515 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1516 char cmd_buf[cmd_buf_len];
1517 uint32_t flags, rp_status;
1518
1519 BT_COMP_LOGD("Requesting data from stream: cmd=%s, "
1520 "offset=%" PRIu64 ", request-len=%" PRIu64,
1521 lttng_viewer_command_string(LTTNG_VIEWER_GET_PACKET), offset, req_len);
1522
1523 cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
1524 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1525 cmd.cmd_version = htobe32(0);
1526
1527 memset(&rq, 0, sizeof(rq));
1528 rq.stream_id = htobe64(stream->viewer_stream_id);
1529 rq.offset = htobe64(offset);
1530 rq.len = htobe32(req_len);
1531
1532 /*
1533 * Merge the cmd and connection request to prevent a write-write
1534 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1535 * second write to be performed quickly in presence of Nagle's algorithm.
1536 */
1537 memcpy(cmd_buf, &cmd, sizeof(cmd));
1538 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1539
1540 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1541 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1542 viewer_handle_send_status(self_comp, NULL, viewer_status, "get data packet command");
1543 goto error_convert_status;
1544 }
1545
1546 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1547 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1548 viewer_handle_recv_status(self_comp, NULL, viewer_status, "get data packet reply");
1549 goto error_convert_status;
1550 }
1551
1552 flags = be32toh(rp.flags);
1553 rp_status = be32toh(rp.status);
1554
1555 BT_COMP_LOGD("Received response from relay daemon: cmd=%s, response=%s",
1556 lttng_viewer_command_string(LTTNG_VIEWER_GET_PACKET),
1557 lttng_viewer_get_packet_return_code_string(rp_status));
1558 switch (rp_status) {
1559 case LTTNG_VIEWER_GET_PACKET_OK:
1560 req_len = be32toh(rp.len);
1561 BT_COMP_LOGD("Got packet from relay daemon: response=%s, packet-len=%" PRIu64 "",
1562 lttng_viewer_get_packet_return_code_string(rp_status), req_len);
1563 break;
1564 case LTTNG_VIEWER_GET_PACKET_RETRY:
1565 /* Unimplemented by relay daemon */
1566 status = CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
1567 goto end;
1568 case LTTNG_VIEWER_GET_PACKET_ERR:
1569 if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
1570 BT_COMP_LOGD("Marking trace as needing new metadata: "
1571 "response=%s, response-flag=NEW_METADATA, trace-id=%" PRIu64,
1572 lttng_viewer_next_index_return_code_string(rp_status), trace->id);
1573 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
1574 }
1575 if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
1576 BT_COMP_LOGD("Marking all sessions as possibly needing new streams: "
1577 "response=%s, response-flag=NEW_STREAM",
1578 lttng_viewer_next_index_return_code_string(rp_status));
1579 lttng_live_need_new_streams(lttng_live_msg_iter);
1580 }
1581 if (flags & (LTTNG_VIEWER_FLAG_NEW_METADATA | LTTNG_VIEWER_FLAG_NEW_STREAM)) {
1582 status = CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
1583 BT_COMP_LOGD("Reply with any one flags set means we should retry: response=%s",
1584 lttng_viewer_get_packet_return_code_string(rp_status));
1585 goto end;
1586 }
1587 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Received get_data_packet response: error");
1588 status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
1589 goto end;
1590 case LTTNG_VIEWER_GET_PACKET_EOF:
1591 status = CTF_MSG_ITER_MEDIUM_STATUS_EOF;
1592 goto end;
1593 default:
1594 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Received get_data_packet response: unknown (%d)",
1595 rp_status);
1596 status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
1597 goto end;
1598 }
1599
1600 if (req_len == 0) {
1601 status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
1602 goto end;
1603 }
1604
1605 viewer_status = lttng_live_recv(viewer_connection, buf, req_len);
1606 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1607 viewer_handle_recv_status(self_comp, NULL, viewer_status, "get data packet");
1608 goto error_convert_status;
1609 }
1610 *recv_len = req_len;
1611
1612 status = CTF_MSG_ITER_MEDIUM_STATUS_OK;
1613 goto end;
1614
1615 error_convert_status:
1616 status = viewer_status_to_ctf_msg_iter_medium_status(viewer_status);
1617 end:
1618 return status;
1619 }
1620
1621 /*
1622 * Request new streams for a session.
1623 */
1624 enum lttng_live_iterator_status
1625 lttng_live_session_get_new_streams(struct lttng_live_session *session,
1626 bt_self_message_iterator *self_msg_iter)
1627 {
1628 enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1629 struct lttng_viewer_cmd cmd;
1630 struct lttng_viewer_new_streams_request rq;
1631 struct lttng_viewer_new_streams_response rp;
1632 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
1633 enum lttng_live_viewer_status viewer_status;
1634 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1635 bt_self_component *self_comp = viewer_connection->self_comp;
1636 uint32_t streams_count;
1637 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1638 char cmd_buf[cmd_buf_len];
1639
1640 if (!session->new_streams_needed) {
1641 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1642 goto end;
1643 }
1644
1645 BT_COMP_LOGD("Requesting new streams for session: cmd=%s, "
1646 "session-id=%" PRIu64,
1647 lttng_viewer_command_string(LTTNG_VIEWER_GET_NEW_STREAMS), session->id);
1648
1649 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
1650 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1651 cmd.cmd_version = htobe32(0);
1652
1653 memset(&rq, 0, sizeof(rq));
1654 rq.session_id = htobe64(session->id);
1655
1656 /*
1657 * Merge the cmd and connection request to prevent a write-write
1658 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1659 * second write to be performed quickly in presence of Nagle's algorithm.
1660 */
1661 memcpy(cmd_buf, &cmd, sizeof(cmd));
1662 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1663
1664 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1665 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1666 viewer_handle_send_status(self_comp, NULL, viewer_status, "get new streams command");
1667 status = viewer_status_to_live_iterator_status(viewer_status);
1668 goto end;
1669 }
1670
1671 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1672 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1673 viewer_handle_recv_status(self_comp, NULL, viewer_status, "get new streams reply");
1674 status = viewer_status_to_live_iterator_status(viewer_status);
1675 goto end;
1676 }
1677
1678 streams_count = be32toh(rp.streams_count);
1679
1680 switch (be32toh(rp.status)) {
1681 case LTTNG_VIEWER_NEW_STREAMS_OK:
1682 session->new_streams_needed = false;
1683 break;
1684 case LTTNG_VIEWER_NEW_STREAMS_NO_NEW:
1685 session->new_streams_needed = false;
1686 goto end;
1687 case LTTNG_VIEWER_NEW_STREAMS_HUP:
1688 session->new_streams_needed = false;
1689 session->closed = true;
1690 status = LTTNG_LIVE_ITERATOR_STATUS_END;
1691 goto end;
1692 case LTTNG_VIEWER_NEW_STREAMS_ERR:
1693 BT_COMP_LOGD("Received get_new_streams response: error");
1694 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1695 goto end;
1696 default:
1697 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1698 "Received get_new_streams response: Unknown:"
1699 "return code %u",
1700 be32toh(rp.status));
1701 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1702 goto end;
1703 }
1704
1705 viewer_status = receive_streams(session, streams_count, self_msg_iter);
1706 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1707 viewer_handle_recv_status(self_comp, NULL, viewer_status, "new streams");
1708 status = viewer_status_to_live_iterator_status(viewer_status);
1709 goto end;
1710 }
1711
1712 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1713 end:
1714 return status;
1715 }
1716
1717 enum lttng_live_viewer_status live_viewer_connection_create(
1718 bt_self_component *self_comp, bt_self_component_class *self_comp_class,
1719 bt_logging_level log_level, const char *url, bool in_query,
1720 struct lttng_live_msg_iter *lttng_live_msg_iter, struct live_viewer_connection **viewer)
1721 {
1722 struct live_viewer_connection *viewer_connection;
1723 enum lttng_live_viewer_status status;
1724
1725 viewer_connection = g_new0(struct live_viewer_connection, 1);
1726
1727 if (bt_socket_init(log_level) != 0) {
1728 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
1729 "Failed to init socket");
1730 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1731 goto error;
1732 }
1733
1734 viewer_connection->log_level = log_level;
1735
1736 viewer_connection->self_comp = self_comp;
1737 viewer_connection->self_comp_class = self_comp_class;
1738
1739 viewer_connection->control_sock = BT_INVALID_SOCKET;
1740 viewer_connection->port = -1;
1741 viewer_connection->in_query = in_query;
1742 viewer_connection->lttng_live_msg_iter = lttng_live_msg_iter;
1743 viewer_connection->url = g_string_new(url);
1744 if (!viewer_connection->url) {
1745 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
1746 "Failed to allocate URL buffer");
1747 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1748 goto error;
1749 }
1750
1751 BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class,
1752 "Establishing connection to url \"%s\"...", url);
1753 status = lttng_live_connect_viewer(viewer_connection);
1754 /*
1755 * Only print error and append cause in case of error. not in case of
1756 * interruption.
1757 */
1758 if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1759 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
1760 "Failed to establish connection: "
1761 "url=\"%s\"",
1762 url);
1763 goto error;
1764 } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1765 goto error;
1766 }
1767 BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class,
1768 "Connection to url \"%s\" is established", url);
1769
1770 *viewer = viewer_connection;
1771 status = LTTNG_LIVE_VIEWER_STATUS_OK;
1772 goto end;
1773
1774 error:
1775 if (viewer_connection) {
1776 live_viewer_connection_destroy(viewer_connection);
1777 }
1778 end:
1779 return status;
1780 }
1781
1782 void live_viewer_connection_destroy(struct live_viewer_connection *viewer_connection)
1783 {
1784 bt_self_component *self_comp = viewer_connection->self_comp;
1785 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
1786
1787 if (!viewer_connection) {
1788 goto end;
1789 }
1790
1791 BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class,
1792 "Closing connection to relay:"
1793 "relay-url=\"%s\"",
1794 viewer_connection->url->str);
1795
1796 lttng_live_disconnect_viewer(viewer_connection);
1797
1798 if (viewer_connection->url) {
1799 g_string_free(viewer_connection->url, true);
1800 }
1801
1802 if (viewer_connection->relay_hostname) {
1803 g_string_free(viewer_connection->relay_hostname, true);
1804 }
1805
1806 if (viewer_connection->target_hostname) {
1807 g_string_free(viewer_connection->target_hostname, true);
1808 }
1809
1810 if (viewer_connection->session_name) {
1811 g_string_free(viewer_connection->session_name, true);
1812 }
1813
1814 if (viewer_connection->proto) {
1815 g_string_free(viewer_connection->proto, true);
1816 }
1817
1818 g_free(viewer_connection);
1819
1820 bt_socket_fini();
1821
1822 end:
1823 return;
1824 }
This page took 0.1054 seconds and 4 git commands to generate.