Visibility hidden by default
[babeltrace.git] / src / plugins / ctf / lttng-live / viewer-connection.cpp
1 /*
2 * SPDX-License-Identifier: MIT
3 *
4 * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
5 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
6 */
7
8 #define BT_COMP_LOG_SELF_COMP (viewer_connection->self_comp)
9 #define BT_LOG_OUTPUT_LEVEL (viewer_connection->log_level)
10 #define BT_LOG_TAG "PLUGIN/SRC.CTF.LTTNG-LIVE/VIEWER"
11 #include "logging/comp-logging.h"
12
13 #include <fcntl.h>
14 #include <stdbool.h>
15 #include <stdint.h>
16 #include <stdio.h>
17 #include <stdlib.h>
18 #include <sys/types.h>
19 #include <unistd.h>
20
21 #include <glib.h>
22
23 #include "compat/socket.h"
24 #include "compat/endian.h"
25 #include "compat/compiler.h"
26 #include "common/common.h"
27 #include <babeltrace2/babeltrace.h>
28
29 #include "lttng-live.hpp"
30 #include "viewer-connection.hpp"
31 #include "lttng-viewer-abi.hpp"
32 #include "data-stream.hpp"
33 #include "metadata.hpp"
34
/*
 * Reports the outcome of a send or receive operation which did not
 * succeed.
 *
 * Depending on `_status`:
 *
 * - `LTTNG_LIVE_VIEWER_STATUS_ERROR`: logs an error and appends the
 *   cause "Error <_action> <_msg_str>".
 * - `LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED`: does nothing.
 * - Any other value: aborts.
 *
 * `_action` and `_msg_str` must be string literals because they are
 * concatenated with the message prefix at compile time.
 */
#define viewer_handle_send_recv_status(_self_comp, _self_comp_class, _status, _action, _msg_str) \
    do { \
        switch (_status) { \
        case LTTNG_LIVE_VIEWER_STATUS_ERROR: \
            BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(_self_comp, _self_comp_class, \
                                                    "Error " _action " " _msg_str); \
            break; \
        case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED: \
            /* An interruption is not an error: nothing to report. */ \
            break; \
        default: \
            bt_common_abort(); \
        } \
    } while (0)
48
/*
 * Specialization of viewer_handle_send_recv_status() for a failed
 * send operation: the reported action is "sending".
 */
#define viewer_handle_send_status(_self_comp, _self_comp_class, _status, _msg_str) \
    viewer_handle_send_recv_status(_self_comp, _self_comp_class, _status, \
                                   "sending", _msg_str)

/*
 * Specialization of viewer_handle_send_recv_status() for a failed
 * receive operation: the reported action is "receiving".
 */
#define viewer_handle_recv_status(_self_comp, _self_comp_class, _status, _msg_str) \
    viewer_handle_send_recv_status(_self_comp, _self_comp_class, _status, \
                                   "receiving", _msg_str)
54
/*
 * Logs an error and appends an error cause which includes the message
 * returned by bt_socket_errormsg().
 *
 * The resulting format string is `_msg`, followed by ": %s" (filled
 * with the socket error message), followed by `_fmt`. Any extra
 * arguments are consumed by the conversion specifiers of `_fmt`.
 *
 * `_msg` and `_fmt` must be string literals.
 */
#define LTTNG_LIVE_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE_ERRNO(_self_comp, _self_comp_class, \
                                                              _msg, _fmt, ...) \
    do { \
        BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(_self_comp, _self_comp_class, \
                                                _msg ": %s" _fmt, \
                                                bt_socket_errormsg(), ##__VA_ARGS__); \
    } while (0)
61
62 static const char *lttng_viewer_command_string(enum lttng_viewer_command cmd)
63 {
64 switch (cmd) {
65 case LTTNG_VIEWER_CONNECT:
66 return "CONNECT";
67 case LTTNG_VIEWER_LIST_SESSIONS:
68 return "LIST_SESSIONS";
69 case LTTNG_VIEWER_ATTACH_SESSION:
70 return "ATTACH_SESSION";
71 case LTTNG_VIEWER_GET_NEXT_INDEX:
72 return "GET_NEXT_INDEX";
73 case LTTNG_VIEWER_GET_PACKET:
74 return "GET_PACKET";
75 case LTTNG_VIEWER_GET_METADATA:
76 return "GET_METADATA";
77 case LTTNG_VIEWER_GET_NEW_STREAMS:
78 return "GET_NEW_STREAMS";
79 case LTTNG_VIEWER_CREATE_SESSION:
80 return "CREATE_SESSION";
81 case LTTNG_VIEWER_DETACH_SESSION:
82 return "DETACH_SESSION";
83 }
84
85 bt_common_abort();
86 }
87
88 static const char *
89 lttng_viewer_next_index_return_code_string(enum lttng_viewer_next_index_return_code code)
90 {
91 switch (code) {
92 case LTTNG_VIEWER_INDEX_OK:
93 return "INDEX_OK";
94 case LTTNG_VIEWER_INDEX_RETRY:
95 return "INDEX_RETRY";
96 case LTTNG_VIEWER_INDEX_HUP:
97 return "INDEX_HUP";
98 case LTTNG_VIEWER_INDEX_ERR:
99 return "INDEX_ERR";
100 case LTTNG_VIEWER_INDEX_INACTIVE:
101 return "INDEX_INACTIVE";
102 case LTTNG_VIEWER_INDEX_EOF:
103 return "INDEX_EOF";
104 }
105
106 bt_common_abort();
107 }
108
109 static const char *lttng_viewer_next_index_return_code_string(uint32_t code)
110 {
111 return lttng_viewer_next_index_return_code_string((lttng_viewer_next_index_return_code) code);
112 }
113
114 static const char *
115 lttng_viewer_get_packet_return_code_string(enum lttng_viewer_get_packet_return_code code)
116 {
117 switch (code) {
118 case LTTNG_VIEWER_GET_PACKET_OK:
119 return "GET_PACKET_OK";
120 case LTTNG_VIEWER_GET_PACKET_RETRY:
121 return "GET_PACKET_RETRY";
122 case LTTNG_VIEWER_GET_PACKET_ERR:
123 return "GET_PACKET_ERR";
124 case LTTNG_VIEWER_GET_PACKET_EOF:
125 return "GET_PACKET_EOF";
126 }
127
128 bt_common_abort();
129 };
130
131 static const char *lttng_viewer_get_packet_return_code_string(uint32_t code)
132 {
133 return lttng_viewer_get_packet_return_code_string((lttng_viewer_get_packet_return_code) code);
134 }
135
136 static const char *lttng_viewer_seek_string(enum lttng_viewer_seek seek)
137 {
138 switch (seek) {
139 case LTTNG_VIEWER_SEEK_BEGINNING:
140 return "SEEK_BEGINNING";
141 case LTTNG_VIEWER_SEEK_LAST:
142 return "SEEK_LAST";
143 }
144
145 bt_common_abort();
146 }
147
148 static inline enum lttng_live_iterator_status
149 viewer_status_to_live_iterator_status(enum lttng_live_viewer_status viewer_status)
150 {
151 switch (viewer_status) {
152 case LTTNG_LIVE_VIEWER_STATUS_OK:
153 return LTTNG_LIVE_ITERATOR_STATUS_OK;
154 case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
155 return LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
156 case LTTNG_LIVE_VIEWER_STATUS_ERROR:
157 return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
158 }
159
160 bt_common_abort();
161 }
162
163 static inline enum ctf_msg_iter_medium_status
164 viewer_status_to_ctf_msg_iter_medium_status(enum lttng_live_viewer_status viewer_status)
165 {
166 switch (viewer_status) {
167 case LTTNG_LIVE_VIEWER_STATUS_OK:
168 return CTF_MSG_ITER_MEDIUM_STATUS_OK;
169 case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
170 return CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
171 case LTTNG_LIVE_VIEWER_STATUS_ERROR:
172 return CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
173 }
174
175 bt_common_abort();
176 }
177
178 static inline void viewer_connection_close_socket(struct live_viewer_connection *viewer_connection)
179 {
180 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
181 bt_self_component *self_comp = viewer_connection->self_comp;
182 int ret = bt_socket_close(viewer_connection->control_sock);
183 if (ret == -1) {
184 BT_COMP_OR_COMP_CLASS_LOGW_ERRNO(self_comp, self_comp_class,
185 "Error closing viewer connection socket: ", ".");
186 }
187
188 viewer_connection->control_sock = BT_INVALID_SOCKET;
189 }
190
191 /*
192 * This function receives a message from the Relay daemon.
193 * If it received the entire message, it returns _OK,
194 * If it's interrupted, it returns _INTERRUPTED,
195 * otherwise, it returns _ERROR.
196 */
197 static enum lttng_live_viewer_status
198 lttng_live_recv(struct live_viewer_connection *viewer_connection, void *buf, size_t len)
199 {
200 ssize_t received;
201 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
202 bt_self_component *self_comp = viewer_connection->self_comp;
203 size_t total_received = 0, to_receive = len;
204 struct lttng_live_msg_iter *lttng_live_msg_iter = viewer_connection->lttng_live_msg_iter;
205 enum lttng_live_viewer_status status;
206 BT_SOCKET sock = viewer_connection->control_sock;
207
208 /*
209 * Receive a message from the Relay.
210 */
211 do {
212 received = bt_socket_recv(sock, (char *) buf + total_received, to_receive, 0);
213 if (received == BT_SOCKET_ERROR) {
214 if (bt_socket_interrupted()) {
215 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
216 /*
217 * This interruption was due to a
218 * SIGINT and the graph is being torn
219 * down.
220 */
221 status = LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED;
222 lttng_live_msg_iter->was_interrupted = true;
223 goto end;
224 } else {
225 /*
226 * A signal was received, but the graph
227 * is not being torn down. Carry on.
228 */
229 continue;
230 }
231 } else {
232 /*
233 * For any other types of socket error, close
234 * the socket and return an error.
235 */
236 LTTNG_LIVE_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE_ERRNO(
237 self_comp, self_comp_class, "Error receiving from Relay", ".");
238
239 viewer_connection_close_socket(viewer_connection);
240 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
241 goto end;
242 }
243 } else if (received == 0) {
244 /*
245 * The recv() call returned 0. This means the
246 * connection was orderly shutdown from the other peer.
247 * If that happens when we are trying to receive
248 * a message from it, it means something when wrong.
249 * Close the socket and return an error.
250 */
251 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
252 "Remote side has closed connection");
253 viewer_connection_close_socket(viewer_connection);
254 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
255 goto end;
256 }
257
258 BT_ASSERT(received <= to_receive);
259 total_received += received;
260 to_receive -= received;
261
262 } while (to_receive > 0);
263
264 BT_ASSERT(total_received == len);
265 status = LTTNG_LIVE_VIEWER_STATUS_OK;
266
267 end:
268 return status;
269 }
270
271 /*
272 * This function sends a message to the Relay daemon.
273 * If it send the message, it returns _OK,
274 * If it's interrupted, it returns _INTERRUPTED,
275 * otherwise, it returns _ERROR.
276 */
277 static enum lttng_live_viewer_status
278 lttng_live_send(struct live_viewer_connection *viewer_connection, const void *buf, size_t len)
279 {
280 enum lttng_live_viewer_status status;
281 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
282 bt_self_component *self_comp = viewer_connection->self_comp;
283 struct lttng_live_msg_iter *lttng_live_msg_iter = viewer_connection->lttng_live_msg_iter;
284 BT_SOCKET sock = viewer_connection->control_sock;
285 size_t to_send = len;
286 ssize_t total_sent = 0;
287
288 do {
289 ssize_t sent = bt_socket_send_nosigpipe(sock, (char *) buf + total_sent, to_send);
290 if (sent == BT_SOCKET_ERROR) {
291 if (bt_socket_interrupted()) {
292 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
293 /*
294 * This interruption was a SIGINT and
295 * the graph is being teared down.
296 */
297 status = LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED;
298 lttng_live_msg_iter->was_interrupted = true;
299 goto end;
300 } else {
301 /*
302 * A signal was received, but the graph
303 * is not being teared down. Carry on.
304 */
305 continue;
306 }
307 } else {
308 /*
309 * For any other types of socket error, close
310 * the socket and return an error.
311 */
312 LTTNG_LIVE_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE_ERRNO(
313 self_comp, self_comp_class, "Error sending to Relay", ".");
314
315 viewer_connection_close_socket(viewer_connection);
316 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
317 goto end;
318 }
319 }
320
321 BT_ASSERT(sent <= to_send);
322 total_sent += sent;
323 to_send -= sent;
324
325 } while (to_send > 0);
326
327 BT_ASSERT(total_sent == len);
328 status = LTTNG_LIVE_VIEWER_STATUS_OK;
329
330 end:
331 return status;
332 }
333
334 static int parse_url(struct live_viewer_connection *viewer_connection)
335 {
336 char error_buf[256] = {0};
337 bt_self_component *self_comp = viewer_connection->self_comp;
338 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
339 struct bt_common_lttng_live_url_parts lttng_live_url_parts = {0};
340 int ret = -1;
341 const char *path = viewer_connection->url->str;
342
343 if (!path) {
344 goto end;
345 }
346
347 lttng_live_url_parts = bt_common_parse_lttng_live_url(path, error_buf, sizeof(error_buf));
348 if (!lttng_live_url_parts.proto) {
349 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
350 "Invalid LTTng live URL format: %s", error_buf);
351 goto end;
352 }
353 viewer_connection->proto = lttng_live_url_parts.proto;
354 lttng_live_url_parts.proto = NULL;
355
356 viewer_connection->relay_hostname = lttng_live_url_parts.hostname;
357 lttng_live_url_parts.hostname = NULL;
358
359 if (lttng_live_url_parts.port >= 0) {
360 viewer_connection->port = lttng_live_url_parts.port;
361 } else {
362 viewer_connection->port = LTTNG_DEFAULT_NETWORK_VIEWER_PORT;
363 }
364
365 viewer_connection->target_hostname = lttng_live_url_parts.target_hostname;
366 lttng_live_url_parts.target_hostname = NULL;
367
368 if (lttng_live_url_parts.session_name) {
369 viewer_connection->session_name = lttng_live_url_parts.session_name;
370 lttng_live_url_parts.session_name = NULL;
371 }
372
373 ret = 0;
374
375 end:
376 bt_common_destroy_lttng_live_url_parts(&lttng_live_url_parts);
377 return ret;
378 }
379
380 static enum lttng_live_viewer_status
381 lttng_live_handshake(struct live_viewer_connection *viewer_connection)
382 {
383 struct lttng_viewer_cmd cmd;
384 struct lttng_viewer_connect connect;
385 enum lttng_live_viewer_status status;
386 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
387 bt_self_component *self_comp = viewer_connection->self_comp;
388 const size_t cmd_buf_len = sizeof(cmd) + sizeof(connect);
389 char cmd_buf[cmd_buf_len];
390
391 BT_COMP_OR_COMP_CLASS_LOGD(
392 self_comp, self_comp_class,
393 "Handshaking with the relay daemon: cmd=%s, major-version=%u, minor-version=%u",
394 lttng_viewer_command_string(LTTNG_VIEWER_CONNECT), LTTNG_LIVE_MAJOR, LTTNG_LIVE_MINOR);
395
396 cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
397 cmd.data_size = htobe64((uint64_t) sizeof(connect));
398 cmd.cmd_version = htobe32(0);
399
400 connect.viewer_session_id = -1ULL; /* will be set on recv */
401 connect.major = htobe32(LTTNG_LIVE_MAJOR);
402 connect.minor = htobe32(LTTNG_LIVE_MINOR);
403 connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
404
405 /*
406 * Merge the cmd and connection request to prevent a write-write
407 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
408 * second write to be performed quickly in presence of Nagle's algorithm
409 */
410 memcpy(cmd_buf, &cmd, sizeof(cmd));
411 memcpy(cmd_buf + sizeof(cmd), &connect, sizeof(connect));
412
413 status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
414 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
415 viewer_handle_send_status(self_comp, self_comp_class, status, "viewer connect command");
416 goto end;
417 }
418
419 status = lttng_live_recv(viewer_connection, &connect, sizeof(connect));
420 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
421 viewer_handle_recv_status(self_comp, self_comp_class, status, "viewer connect reply");
422 goto end;
423 }
424
425 BT_COMP_OR_COMP_CLASS_LOGI(self_comp, self_comp_class, "Received viewer session ID : %" PRIu64,
426 (uint64_t) be64toh(connect.viewer_session_id));
427 BT_COMP_OR_COMP_CLASS_LOGI(self_comp, self_comp_class, "Relayd version : %u.%u",
428 be32toh(connect.major), be32toh(connect.minor));
429
430 if (LTTNG_LIVE_MAJOR != be32toh(connect.major)) {
431 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
432 "Incompatible lttng-relayd protocol");
433 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
434 goto end;
435 }
436 /* Use the smallest protocol version implemented. */
437 if (LTTNG_LIVE_MINOR > be32toh(connect.minor)) {
438 viewer_connection->minor = be32toh(connect.minor);
439 } else {
440 viewer_connection->minor = LTTNG_LIVE_MINOR;
441 }
442 viewer_connection->major = LTTNG_LIVE_MAJOR;
443
444 status = LTTNG_LIVE_VIEWER_STATUS_OK;
445
446 goto end;
447
448 end:
449 return status;
450 }
451
452 static enum lttng_live_viewer_status
453 lttng_live_connect_viewer(struct live_viewer_connection *viewer_connection)
454 {
455 struct hostent *host;
456 struct sockaddr_in server_addr;
457 enum lttng_live_viewer_status status;
458 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
459 bt_self_component *self_comp = viewer_connection->self_comp;
460
461 if (parse_url(viewer_connection)) {
462 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class, "Failed to parse URL");
463 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
464 goto error;
465 }
466
467 BT_COMP_OR_COMP_CLASS_LOGD(
468 self_comp, self_comp_class,
469 "Connecting to hostname : %s, port : %d, "
470 "target hostname : %s, session name : %s, proto : %s",
471 viewer_connection->relay_hostname->str, viewer_connection->port,
472 !viewer_connection->target_hostname ? "<none>" : viewer_connection->target_hostname->str,
473 !viewer_connection->session_name ? "<none>" : viewer_connection->session_name->str,
474 viewer_connection->proto->str);
475
476 host = gethostbyname(viewer_connection->relay_hostname->str);
477 if (!host) {
478 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
479 "Cannot lookup hostname: hostname=\"%s\"",
480 viewer_connection->relay_hostname->str);
481 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
482 goto error;
483 }
484
485 if ((viewer_connection->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == BT_INVALID_SOCKET) {
486 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
487 "Socket creation failed: %s", bt_socket_errormsg());
488 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
489 goto error;
490 }
491
492 server_addr.sin_family = AF_INET;
493 server_addr.sin_port = htons(viewer_connection->port);
494 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
495 memset(&(server_addr.sin_zero), 0, 8);
496
497 if (connect(viewer_connection->control_sock, (struct sockaddr *) &server_addr,
498 sizeof(struct sockaddr)) == BT_SOCKET_ERROR) {
499 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class, "Connection failed: %s",
500 bt_socket_errormsg());
501 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
502 goto error;
503 }
504
505 status = lttng_live_handshake(viewer_connection);
506
507 /*
508 * Only print error and append cause in case of error. not in case of
509 * interruption.
510 */
511 if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
512 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
513 "Viewer handshake failed");
514 goto error;
515 } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
516 goto end;
517 }
518
519 goto end;
520
521 error:
522 if (viewer_connection->control_sock != BT_INVALID_SOCKET) {
523 if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) {
524 BT_COMP_OR_COMP_CLASS_LOGW(self_comp, self_comp_class, "Error closing socket: %s.",
525 bt_socket_errormsg());
526 }
527 }
528 viewer_connection->control_sock = BT_INVALID_SOCKET;
529 end:
530 return status;
531 }
532
533 static void lttng_live_disconnect_viewer(struct live_viewer_connection *viewer_connection)
534 {
535 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
536 bt_self_component *self_comp = viewer_connection->self_comp;
537
538 if (viewer_connection->control_sock == BT_INVALID_SOCKET) {
539 return;
540 }
541 if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) {
542 BT_COMP_OR_COMP_CLASS_LOGW(self_comp, self_comp_class, "Error closing socket: %s",
543 bt_socket_errormsg());
544 viewer_connection->control_sock = BT_INVALID_SOCKET;
545 }
546 }
547
548 static int list_update_session(bt_value *results, const struct lttng_viewer_session *session,
549 bool *_found, struct live_viewer_connection *viewer_connection)
550 {
551 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
552 bt_self_component *self_comp = viewer_connection->self_comp;
553 int ret = 0;
554 uint64_t i, len;
555 bt_value *map = NULL;
556 bt_value *hostname = NULL;
557 bt_value *session_name = NULL;
558 bt_value *btval = NULL;
559 bool found = false;
560
561 len = bt_value_array_get_length(results);
562 for (i = 0; i < len; i++) {
563 const char *hostname_str = NULL;
564 const char *session_name_str = NULL;
565
566 map = bt_value_array_borrow_element_by_index(results, i);
567 hostname = bt_value_map_borrow_entry_value(map, "target-hostname");
568 if (!hostname) {
569 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
570 "Error borrowing \"target-hostname\" entry.");
571 ret = -1;
572 goto end;
573 }
574 session_name = bt_value_map_borrow_entry_value(map, "session-name");
575 if (!session_name) {
576 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
577 "Error borrowing \"session-name\" entry.");
578 ret = -1;
579 goto end;
580 }
581 hostname_str = bt_value_string_get(hostname);
582 session_name_str = bt_value_string_get(session_name);
583
584 if (strcmp(session->hostname, hostname_str) == 0 &&
585 strcmp(session->session_name, session_name_str) == 0) {
586 int64_t val;
587 uint32_t streams = be32toh(session->streams);
588 uint32_t clients = be32toh(session->clients);
589
590 found = true;
591
592 btval = bt_value_map_borrow_entry_value(map, "stream-count");
593 if (!btval) {
594 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
595 "Error borrowing \"stream-count\" entry.");
596 ret = -1;
597 goto end;
598 }
599 val = bt_value_integer_unsigned_get(btval);
600 /* sum */
601 val += streams;
602 bt_value_integer_unsigned_set(btval, val);
603
604 btval = bt_value_map_borrow_entry_value(map, "client-count");
605 if (!btval) {
606 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
607 "Error borrowing \"client-count\" entry.");
608 ret = -1;
609 goto end;
610 }
611 val = bt_value_integer_unsigned_get(btval);
612 /* max */
613 val = bt_max_t(int64_t, clients, val);
614 bt_value_integer_unsigned_set(btval, val);
615 }
616
617 if (found) {
618 break;
619 }
620 }
621 end:
622 *_found = found;
623 return ret;
624 }
625
626 static int list_append_session(bt_value *results, GString *base_url,
627 const struct lttng_viewer_session *session,
628 struct live_viewer_connection *viewer_connection)
629 {
630 int ret = 0;
631 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
632 bt_value_map_insert_entry_status insert_status;
633 bt_value_array_append_element_status append_status;
634 bt_value *map = NULL;
635 GString *url = NULL;
636 bool found = false;
637
638 /*
639 * If the session already exists, add the stream count to it,
640 * and do max of client counts.
641 */
642 ret = list_update_session(results, session, &found, viewer_connection);
643 if (ret || found) {
644 goto end;
645 }
646
647 map = bt_value_map_create();
648 if (!map) {
649 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error creating map value.");
650 ret = -1;
651 goto end;
652 }
653
654 if (base_url->len < 1) {
655 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error: base_url length smaller than 1.");
656 ret = -1;
657 goto end;
658 }
659 /*
660 * key = "url",
661 * value = <string>,
662 */
663 url = g_string_new(base_url->str);
664 g_string_append(url, "/host/");
665 g_string_append(url, session->hostname);
666 g_string_append_c(url, '/');
667 g_string_append(url, session->session_name);
668
669 insert_status = bt_value_map_insert_string_entry(map, "url", url->str);
670 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
671 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error inserting \"url\" entry.");
672 ret = -1;
673 goto end;
674 }
675
676 /*
677 * key = "target-hostname",
678 * value = <string>,
679 */
680 insert_status = bt_value_map_insert_string_entry(map, "target-hostname", session->hostname);
681 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
682 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
683 "Error inserting \"target-hostname\" entry.");
684 ret = -1;
685 goto end;
686 }
687
688 /*
689 * key = "session-name",
690 * value = <string>,
691 */
692 insert_status = bt_value_map_insert_string_entry(map, "session-name", session->session_name);
693 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
694 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error inserting \"session-name\" entry.");
695 ret = -1;
696 goto end;
697 }
698
699 /*
700 * key = "timer-us",
701 * value = <integer>,
702 */
703 {
704 uint32_t live_timer = be32toh(session->live_timer);
705
706 insert_status = bt_value_map_insert_unsigned_integer_entry(map, "timer-us", live_timer);
707 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
708 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error inserting \"timer-us\" entry.");
709 ret = -1;
710 goto end;
711 }
712 }
713
714 /*
715 * key = "stream-count",
716 * value = <integer>,
717 */
718 {
719 uint32_t streams = be32toh(session->streams);
720
721 insert_status = bt_value_map_insert_unsigned_integer_entry(map, "stream-count", streams);
722 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
723 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
724 "Error inserting \"stream-count\" entry.");
725 ret = -1;
726 goto end;
727 }
728 }
729
730 /*
731 * key = "client-count",
732 * value = <integer>,
733 */
734 {
735 uint32_t clients = be32toh(session->clients);
736
737 insert_status = bt_value_map_insert_unsigned_integer_entry(map, "client-count", clients);
738 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
739 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
740 "Error inserting \"client-count\" entry.");
741 ret = -1;
742 goto end;
743 }
744 }
745
746 append_status = bt_value_array_append_element(results, map);
747 if (append_status != BT_VALUE_ARRAY_APPEND_ELEMENT_STATUS_OK) {
748 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error appending map to results.");
749 ret = -1;
750 }
751
752 end:
753 if (url) {
754 g_string_free(url, true);
755 }
756 BT_VALUE_PUT_REF_AND_RESET(map);
757 return ret;
758 }
759
760 /*
761 * Data structure returned:
762 *
763 * {
764 * <array> = {
765 * [n] = {
766 * <map> = {
767 * {
768 * key = "url",
769 * value = <string>,
770 * },
771 * {
772 * key = "target-hostname",
773 * value = <string>,
774 * },
775 * {
776 * key = "session-name",
777 * value = <string>,
778 * },
779 * {
780 * key = "timer-us",
781 * value = <integer>,
782 * },
783 * {
784 * key = "stream-count",
785 * value = <integer>,
786 * },
787 * {
788 * key = "client-count",
789 * value = <integer>,
790 * },
791 * },
792 * }
793 * }
794 */
795
796 bt_component_class_query_method_status
797 live_viewer_connection_list_sessions(struct live_viewer_connection *viewer_connection,
798 const bt_value **user_result)
799 {
800 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
801 bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
802 bt_value *result = NULL;
803 enum lttng_live_viewer_status viewer_status;
804 struct lttng_viewer_cmd cmd;
805 struct lttng_viewer_list_sessions list;
806 uint32_t i, sessions_count;
807
808 result = bt_value_array_create();
809 if (!result) {
810 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error creating array");
811 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
812 goto error;
813 }
814
815 BT_LOGD("Requesting list of sessions: cmd=%s",
816 lttng_viewer_command_string(LTTNG_VIEWER_LIST_SESSIONS));
817
818 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
819 cmd.data_size = htobe64((uint64_t) 0);
820 cmd.cmd_version = htobe32(0);
821
822 viewer_status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
823 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
824 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error sending list sessions command");
825 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
826 goto error;
827 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
828 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
829 goto error;
830 }
831
832 viewer_status = lttng_live_recv(viewer_connection, &list, sizeof(list));
833 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
834 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error receiving session list");
835 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
836 goto error;
837 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
838 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
839 goto error;
840 }
841
842 sessions_count = be32toh(list.sessions_count);
843 for (i = 0; i < sessions_count; i++) {
844 struct lttng_viewer_session lsession;
845
846 viewer_status = lttng_live_recv(viewer_connection, &lsession, sizeof(lsession));
847 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
848 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error receiving session:");
849 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
850 goto error;
851 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
852 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
853 goto error;
854 }
855
856 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
857 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
858 if (list_append_session(result, viewer_connection->url, &lsession, viewer_connection)) {
859 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error appending session");
860 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
861 goto error;
862 }
863 }
864
865 *user_result = result;
866 goto end;
867 error:
868 BT_VALUE_PUT_REF_AND_RESET(result);
869 end:
870 return status;
871 }
872
873 static enum lttng_live_viewer_status
874 lttng_live_query_session_ids(struct lttng_live_msg_iter *lttng_live_msg_iter)
875 {
876 struct lttng_viewer_cmd cmd;
877 struct lttng_viewer_list_sessions list;
878 struct lttng_viewer_session lsession;
879 uint32_t i, sessions_count;
880 uint64_t session_id;
881 enum lttng_live_viewer_status status;
882 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
883 bt_self_component *self_comp = viewer_connection->self_comp;
884 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
885
886 BT_COMP_LOGD("Asking the relay daemon for the list of sessions: cmd=%s",
887 lttng_viewer_command_string(LTTNG_VIEWER_LIST_SESSIONS));
888
889 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
890 cmd.data_size = htobe64((uint64_t) 0);
891 cmd.cmd_version = htobe32(0);
892
893 status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
894 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
895 viewer_handle_send_status(self_comp, self_comp_class, status, "list sessions command");
896 goto end;
897 }
898
899 status = lttng_live_recv(viewer_connection, &list, sizeof(list));
900 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
901 viewer_handle_recv_status(self_comp, self_comp_class, status, "session list reply");
902 goto end;
903 }
904
905 sessions_count = be32toh(list.sessions_count);
906 for (i = 0; i < sessions_count; i++) {
907 status = lttng_live_recv(viewer_connection, &lsession, sizeof(lsession));
908 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
909 viewer_handle_recv_status(self_comp, self_comp_class, status, "session reply");
910 goto end;
911 }
912 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
913 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
914 session_id = be64toh(lsession.id);
915
916 BT_COMP_LOGI("Adding session to internal list: "
917 "session-id=%" PRIu64 ", hostname=\"%s\", session-name=\"%s\"",
918 session_id, lsession.hostname, lsession.session_name);
919
920 if ((strncmp(lsession.session_name, viewer_connection->session_name->str,
921 LTTNG_VIEWER_NAME_MAX) == 0) &&
922 (strncmp(lsession.hostname, viewer_connection->target_hostname->str,
923 LTTNG_VIEWER_HOST_NAME_MAX) == 0)) {
924 if (lttng_live_add_session(lttng_live_msg_iter, session_id, lsession.hostname,
925 lsession.session_name)) {
926 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to add live session");
927 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
928 goto end;
929 }
930 }
931 }
932
933 status = LTTNG_LIVE_VIEWER_STATUS_OK;
934
935 end:
936 return status;
937 }
938
939 enum lttng_live_viewer_status
940 lttng_live_create_viewer_session(struct lttng_live_msg_iter *lttng_live_msg_iter)
941 {
942 struct lttng_viewer_cmd cmd;
943 struct lttng_viewer_create_session_response resp;
944 enum lttng_live_viewer_status status;
945 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
946 bt_self_component *self_comp = viewer_connection->self_comp;
947 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
948
949 BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class, "Creating a viewer session: cmd=%s",
950 lttng_viewer_command_string(LTTNG_VIEWER_CREATE_SESSION));
951
952 cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
953 cmd.data_size = htobe64((uint64_t) 0);
954 cmd.cmd_version = htobe32(0);
955
956 status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
957 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
958 viewer_handle_send_status(self_comp, self_comp_class, status, "create session command");
959 goto end;
960 }
961
962 status = lttng_live_recv(viewer_connection, &resp, sizeof(resp));
963 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
964 viewer_handle_recv_status(self_comp, self_comp_class, status, "create session reply");
965 goto end;
966 }
967
968 if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
969 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error creating viewer session");
970 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
971 goto end;
972 }
973
974 status = lttng_live_query_session_ids(lttng_live_msg_iter);
975 if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
976 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to query live viewer session ids");
977 goto end;
978 } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
979 goto end;
980 }
981
982 end:
983 return status;
984 }
985
986 static enum lttng_live_viewer_status receive_streams(struct lttng_live_session *session,
987 uint32_t stream_count,
988 bt_self_message_iterator *self_msg_iter)
989 {
990 uint32_t i;
991 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
992 enum lttng_live_viewer_status status;
993 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
994 bt_self_component *self_comp = viewer_connection->self_comp;
995
996 BT_COMP_LOGI("Getting %" PRIu32 " new streams", stream_count);
997 for (i = 0; i < stream_count; i++) {
998 struct lttng_viewer_stream stream;
999 struct lttng_live_stream_iterator *live_stream;
1000 uint64_t stream_id;
1001 uint64_t ctf_trace_id;
1002
1003 status = lttng_live_recv(viewer_connection, &stream, sizeof(stream));
1004 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1005 viewer_handle_recv_status(self_comp, NULL, status, "stream reply");
1006 goto end;
1007 }
1008 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
1009 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
1010 stream_id = be64toh(stream.id);
1011 ctf_trace_id = be64toh(stream.ctf_trace_id);
1012
1013 if (stream.metadata_flag) {
1014 BT_COMP_LOGI(" metadata stream %" PRIu64 " : %s/%s", stream_id, stream.path_name,
1015 stream.channel_name);
1016 if (lttng_live_metadata_create_stream(session, ctf_trace_id, stream_id,
1017 stream.path_name)) {
1018 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error creating metadata stream");
1019 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1020 goto end;
1021 }
1022 session->lazy_stream_msg_init = true;
1023 } else {
1024 BT_COMP_LOGI(" stream %" PRIu64 " : %s/%s", stream_id, stream.path_name,
1025 stream.channel_name);
1026 live_stream =
1027 lttng_live_stream_iterator_create(session, ctf_trace_id, stream_id, self_msg_iter);
1028 if (!live_stream) {
1029 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error creating stream");
1030 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1031 goto end;
1032 }
1033 }
1034 }
1035 status = LTTNG_LIVE_VIEWER_STATUS_OK;
1036
1037 end:
1038 return status;
1039 }
1040
1041 enum lttng_live_viewer_status lttng_live_session_attach(struct lttng_live_session *session,
1042 bt_self_message_iterator *self_msg_iter)
1043 {
1044 struct lttng_viewer_cmd cmd;
1045 enum lttng_live_viewer_status status;
1046 struct lttng_viewer_attach_session_request rq;
1047 struct lttng_viewer_attach_session_response rp;
1048 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
1049 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1050 bt_self_component *self_comp = viewer_connection->self_comp;
1051 uint64_t session_id = session->id;
1052 uint32_t streams_count;
1053 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1054 char cmd_buf[cmd_buf_len];
1055
1056 BT_COMP_LOGD("Attaching to session: cmd=%s, session-id=%" PRIu64 ", seek=%s",
1057 lttng_viewer_command_string(LTTNG_VIEWER_ATTACH_SESSION), session_id,
1058 lttng_viewer_seek_string(LTTNG_VIEWER_SEEK_LAST));
1059
1060 cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
1061 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1062 cmd.cmd_version = htobe32(0);
1063
1064 memset(&rq, 0, sizeof(rq));
1065 rq.session_id = htobe64(session_id);
1066 // TODO: add cmd line parameter to select seek beginning
1067 // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
1068 rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
1069
1070 /*
1071 * Merge the cmd and connection request to prevent a write-write
1072 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1073 * second write to be performed quickly in presence of Nagle's algorithm.
1074 */
1075 memcpy(cmd_buf, &cmd, sizeof(cmd));
1076 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1077 status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1078 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1079 viewer_handle_send_status(self_comp, NULL, status, "attach session command");
1080 goto end;
1081 }
1082
1083 status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1084 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1085 viewer_handle_recv_status(self_comp, NULL, status, "attach session reply");
1086 goto end;
1087 }
1088
1089 streams_count = be32toh(rp.streams_count);
1090 switch (be32toh(rp.status)) {
1091 case LTTNG_VIEWER_ATTACH_OK:
1092 break;
1093 case LTTNG_VIEWER_ATTACH_UNK:
1094 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Session id %" PRIu64 " is unknown", session_id);
1095 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1096 goto end;
1097 case LTTNG_VIEWER_ATTACH_ALREADY:
1098 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "There is already a viewer attached to this session");
1099 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1100 goto end;
1101 case LTTNG_VIEWER_ATTACH_NOT_LIVE:
1102 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Not a live session");
1103 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1104 goto end;
1105 case LTTNG_VIEWER_ATTACH_SEEK_ERR:
1106 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Wrong seek parameter");
1107 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1108 goto end;
1109 default:
1110 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Unknown attach return code %u", be32toh(rp.status));
1111 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1112 goto end;
1113 }
1114
1115 /* We receive the initial list of streams. */
1116 status = receive_streams(session, streams_count, self_msg_iter);
1117 switch (status) {
1118 case LTTNG_LIVE_VIEWER_STATUS_OK:
1119 break;
1120 case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
1121 goto end;
1122 case LTTNG_LIVE_VIEWER_STATUS_ERROR:
1123 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error receiving streams");
1124 goto end;
1125 default:
1126 bt_common_abort();
1127 }
1128
1129 session->attached = true;
1130 session->new_streams_needed = false;
1131
1132 end:
1133 return status;
1134 }
1135
1136 enum lttng_live_viewer_status lttng_live_session_detach(struct lttng_live_session *session)
1137 {
1138 struct lttng_viewer_cmd cmd;
1139 enum lttng_live_viewer_status status;
1140 struct lttng_viewer_detach_session_request rq;
1141 struct lttng_viewer_detach_session_response rp;
1142 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
1143 bt_self_component *self_comp = session->self_comp;
1144 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1145 uint64_t session_id = session->id;
1146 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1147 char cmd_buf[cmd_buf_len];
1148
1149 /*
1150 * The session might already be detached and the viewer socket might
1151 * already been closed. This happens when calling this function when
1152 * tearing down the graph after an error.
1153 */
1154 if (!session->attached || viewer_connection->control_sock == BT_INVALID_SOCKET) {
1155 return LTTNG_LIVE_VIEWER_STATUS_OK;
1156 }
1157
1158 BT_COMP_LOGD("Detaching from session: cmd=%s, session-id=%" PRIu64,
1159 lttng_viewer_command_string(LTTNG_VIEWER_DETACH_SESSION), session_id);
1160
1161 cmd.cmd = htobe32(LTTNG_VIEWER_DETACH_SESSION);
1162 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1163 cmd.cmd_version = htobe32(0);
1164
1165 memset(&rq, 0, sizeof(rq));
1166 rq.session_id = htobe64(session_id);
1167
1168 /*
1169 * Merge the cmd and connection request to prevent a write-write
1170 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1171 * second write to be performed quickly in presence of Nagle's algorithm.
1172 */
1173 memcpy(cmd_buf, &cmd, sizeof(cmd));
1174 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1175 status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1176 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1177 viewer_handle_send_status(self_comp, NULL, status, "detach session command");
1178 goto end;
1179 }
1180
1181 status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1182 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1183 viewer_handle_recv_status(self_comp, NULL, status, "detach session reply");
1184 goto end;
1185 }
1186
1187 switch (be32toh(rp.status)) {
1188 case LTTNG_VIEWER_DETACH_SESSION_OK:
1189 break;
1190 case LTTNG_VIEWER_DETACH_SESSION_UNK:
1191 BT_COMP_LOGW("Session id %" PRIu64 " is unknown", session_id);
1192 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1193 goto end;
1194 case LTTNG_VIEWER_DETACH_SESSION_ERR:
1195 BT_COMP_LOGW("Error detaching session id %" PRIu64 "", session_id);
1196 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1197 goto end;
1198 default:
1199 BT_COMP_LOGE("Unknown detach return code %u", be32toh(rp.status));
1200 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1201 goto end;
1202 }
1203
1204 session->attached = false;
1205
1206 status = LTTNG_LIVE_VIEWER_STATUS_OK;
1207
1208 end:
1209 return status;
1210 }
1211
1212 enum lttng_live_get_one_metadata_status
1213 lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, FILE *fp, size_t *reply_len)
1214 {
1215 uint64_t len = 0;
1216 enum lttng_live_get_one_metadata_status status;
1217 enum lttng_live_viewer_status viewer_status;
1218 struct lttng_viewer_cmd cmd;
1219 struct lttng_viewer_get_metadata rq;
1220 struct lttng_viewer_metadata_packet rp;
1221 gchar *data = NULL;
1222 ssize_t writelen;
1223 struct lttng_live_session *session = trace->session;
1224 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
1225 struct lttng_live_metadata *metadata = trace->metadata;
1226 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1227 bt_self_component *self_comp = viewer_connection->self_comp;
1228 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1229 char cmd_buf[cmd_buf_len];
1230
1231 BT_COMP_LOGD("Requesting new metadata for trace:"
1232 "cmd=%s, trace-id=%" PRIu64 ", metadata-stream-id=%" PRIu64,
1233 lttng_viewer_command_string(LTTNG_VIEWER_GET_METADATA), trace->id,
1234 metadata->stream_id);
1235
1236 rq.stream_id = htobe64(metadata->stream_id);
1237 cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
1238 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1239 cmd.cmd_version = htobe32(0);
1240
1241 /*
1242 * Merge the cmd and connection request to prevent a write-write
1243 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1244 * second write to be performed quickly in presence of Nagle's algorithm.
1245 */
1246 memcpy(cmd_buf, &cmd, sizeof(cmd));
1247 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1248 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1249 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1250 viewer_handle_send_status(self_comp, NULL, viewer_status, "get metadata command");
1251 status = (enum lttng_live_get_one_metadata_status) viewer_status;
1252 goto end;
1253 }
1254
1255 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1256 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1257 viewer_handle_recv_status(self_comp, NULL, viewer_status, "get metadata reply");
1258 status = (enum lttng_live_get_one_metadata_status) viewer_status;
1259 goto end;
1260 }
1261
1262 switch (be32toh(rp.status)) {
1263 case LTTNG_VIEWER_METADATA_OK:
1264 BT_COMP_LOGD("Received get_metadata response: ok");
1265 break;
1266 case LTTNG_VIEWER_NO_NEW_METADATA:
1267 BT_COMP_LOGD("Received get_metadata response: no new");
1268 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_END;
1269 goto end;
1270 case LTTNG_VIEWER_METADATA_ERR:
1271 /*
1272 * The Relayd cannot find this stream id. Maybe its
1273 * gone already. This can happen in short lived UST app
1274 * in a per-pid session.
1275 */
1276 BT_COMP_LOGD("Received get_metadata response: error");
1277 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_CLOSED;
1278 goto end;
1279 default:
1280 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Received get_metadata response: unknown");
1281 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
1282 goto end;
1283 }
1284
1285 len = be64toh(rp.len);
1286 if (len == 0) {
1287 /*
1288 * We received a `LTTNG_VIEWER_METADATA_OK` with a packet
1289 * length of 0. This means we must try again. This scenario
1290 * arises when a clear command is performed on an lttng session.
1291 */
1292 BT_COMP_LOGD(
1293 "Expecting a metadata packet of size 0. Retry to get a packet from the relay.");
1294 goto empty_metadata_packet_retry;
1295 }
1296
1297 BT_COMP_LOGD("Writing %" PRIu64 " bytes to metadata", len);
1298 if (len <= 0) {
1299 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Erroneous response length");
1300 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
1301 goto end;
1302 }
1303
1304 data = g_new0(gchar, len);
1305 if (!data) {
1306 BT_COMP_LOGE_APPEND_CAUSE_ERRNO(self_comp, "Failed to allocate data buffer", ".");
1307 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
1308 goto end;
1309 }
1310
1311 viewer_status = lttng_live_recv(viewer_connection, data, len);
1312 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1313 viewer_handle_recv_status(self_comp, NULL, viewer_status, "get metadata packet");
1314 status = (enum lttng_live_get_one_metadata_status) viewer_status;
1315 goto end;
1316 }
1317
1318 /*
1319 * Write the metadata to the file handle.
1320 */
1321 writelen = fwrite(data, sizeof(uint8_t), len, fp);
1322 if (writelen != len) {
1323 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Writing in the metadata file stream");
1324 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
1325 goto end;
1326 }
1327
1328 empty_metadata_packet_retry:
1329 *reply_len = len;
1330 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_OK;
1331
1332 end:
1333 g_free(data);
1334 return status;
1335 }
1336
1337 /*
1338 * Assign the fields from a lttng_viewer_index to a packet_index.
1339 */
1340 static void lttng_index_to_packet_index(struct lttng_viewer_index *lindex,
1341 struct packet_index *pindex)
1342 {
1343 BT_ASSERT(lindex);
1344 BT_ASSERT(pindex);
1345
1346 pindex->offset = be64toh(lindex->offset);
1347 pindex->packet_size = be64toh(lindex->packet_size);
1348 pindex->content_size = be64toh(lindex->content_size);
1349 pindex->ts_cycles.timestamp_begin = be64toh(lindex->timestamp_begin);
1350 pindex->ts_cycles.timestamp_end = be64toh(lindex->timestamp_end);
1351 pindex->events_discarded = be64toh(lindex->events_discarded);
1352 }
1353
1354 static void lttng_live_need_new_streams(struct lttng_live_msg_iter *lttng_live_msg_iter)
1355 {
1356 uint64_t session_idx;
1357 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1358
1359 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
1360 struct lttng_live_session *session =
1361 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
1362 BT_COMP_LOGD("Marking session as needing new streams: "
1363 "session-id=%" PRIu64,
1364 session->id);
1365 session->new_streams_needed = true;
1366 }
1367 }
1368
1369 enum lttng_live_iterator_status
1370 lttng_live_get_next_index(struct lttng_live_msg_iter *lttng_live_msg_iter,
1371 struct lttng_live_stream_iterator *stream, struct packet_index *index)
1372 {
1373 struct lttng_viewer_cmd cmd;
1374 struct lttng_viewer_get_next_index rq;
1375 enum lttng_live_viewer_status viewer_status;
1376 struct lttng_viewer_index rp;
1377 enum lttng_live_iterator_status status;
1378 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1379 bt_self_component *self_comp = viewer_connection->self_comp;
1380 struct lttng_live_trace *trace = stream->trace;
1381 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1382 char cmd_buf[cmd_buf_len];
1383 uint32_t flags, rp_status;
1384
1385 BT_COMP_LOGD("Requesting next index for stream: cmd=%s, "
1386 "viewer-stream-id=%" PRIu64,
1387 lttng_viewer_command_string(LTTNG_VIEWER_GET_NEXT_INDEX),
1388 stream->viewer_stream_id);
1389 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
1390 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1391 cmd.cmd_version = htobe32(0);
1392
1393 memset(&rq, 0, sizeof(rq));
1394 rq.stream_id = htobe64(stream->viewer_stream_id);
1395
1396 /*
1397 * Merge the cmd and connection request to prevent a write-write
1398 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1399 * second write to be performed quickly in presence of Nagle's algorithm.
1400 */
1401 memcpy(cmd_buf, &cmd, sizeof(cmd));
1402 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1403
1404 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1405 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1406 viewer_handle_send_status(self_comp, NULL, viewer_status, "get next index command");
1407 goto error;
1408 }
1409
1410 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1411 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1412 viewer_handle_recv_status(self_comp, NULL, viewer_status, "get next index reply");
1413 goto error;
1414 }
1415
1416 flags = be32toh(rp.flags);
1417 rp_status = be32toh(rp.status);
1418
1419 BT_COMP_LOGD("Received response from relay daemon: cmd=%s, response=%s",
1420 lttng_viewer_command_string(LTTNG_VIEWER_GET_NEXT_INDEX),
1421 lttng_viewer_next_index_return_code_string(rp_status));
1422 switch (rp_status) {
1423 case LTTNG_VIEWER_INDEX_INACTIVE:
1424 {
1425 uint64_t ctf_stream_class_id;
1426
1427 memset(index, 0, sizeof(struct packet_index));
1428 index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
1429 stream->current_inactivity_ts = index->ts_cycles.timestamp_end;
1430 ctf_stream_class_id = be64toh(rp.stream_id);
1431 if (stream->ctf_stream_class_id.is_set) {
1432 BT_ASSERT(stream->ctf_stream_class_id.value == ctf_stream_class_id);
1433 } else {
1434 stream->ctf_stream_class_id.value = ctf_stream_class_id;
1435 stream->ctf_stream_class_id.is_set = true;
1436 }
1437 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_QUIESCENT);
1438 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1439 break;
1440 }
1441 case LTTNG_VIEWER_INDEX_OK:
1442 {
1443 uint64_t ctf_stream_class_id;
1444
1445 lttng_index_to_packet_index(&rp, index);
1446 ctf_stream_class_id = be64toh(rp.stream_id);
1447 if (stream->ctf_stream_class_id.is_set) {
1448 BT_ASSERT(stream->ctf_stream_class_id.value == ctf_stream_class_id);
1449 } else {
1450 stream->ctf_stream_class_id.value = ctf_stream_class_id;
1451 stream->ctf_stream_class_id.is_set = true;
1452 }
1453
1454 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_DATA);
1455
1456 if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
1457 BT_COMP_LOGD("Marking trace as needing new metadata: "
1458 "response=%s, response-flag=NEW_METADATA, trace-id=%" PRIu64,
1459 lttng_viewer_next_index_return_code_string(rp_status), trace->id);
1460 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
1461 }
1462 if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
1463 BT_COMP_LOGD("Marking all sessions as possibly needing new streams: "
1464 "response=%s, response-flag=NEW_STREAM",
1465 lttng_viewer_next_index_return_code_string(rp_status));
1466 lttng_live_need_new_streams(lttng_live_msg_iter);
1467 }
1468 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1469 break;
1470 }
1471 case LTTNG_VIEWER_INDEX_RETRY:
1472 memset(index, 0, sizeof(struct packet_index));
1473 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
1474 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
1475 goto end;
1476 case LTTNG_VIEWER_INDEX_HUP:
1477 memset(index, 0, sizeof(struct packet_index));
1478 index->offset = EOF;
1479 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_EOF);
1480 stream->has_stream_hung_up = true;
1481 status = LTTNG_LIVE_ITERATOR_STATUS_END;
1482 break;
1483 case LTTNG_VIEWER_INDEX_ERR:
1484 memset(index, 0, sizeof(struct packet_index));
1485 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
1486 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1487 goto end;
1488 default:
1489 BT_COMP_LOGD("Received get_next_index response: unknown value");
1490 memset(index, 0, sizeof(struct packet_index));
1491 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
1492 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1493 goto end;
1494 }
1495 goto end;
1496
1497 error:
1498 status = viewer_status_to_live_iterator_status(viewer_status);
1499 end:
1500 return status;
1501 }
1502
1503 enum ctf_msg_iter_medium_status
1504 lttng_live_get_stream_bytes(struct lttng_live_msg_iter *lttng_live_msg_iter,
1505 struct lttng_live_stream_iterator *stream, uint8_t *buf,
1506 uint64_t offset, uint64_t req_len, uint64_t *recv_len)
1507 {
1508 enum ctf_msg_iter_medium_status status;
1509 enum lttng_live_viewer_status viewer_status;
1510 struct lttng_viewer_trace_packet rp;
1511 struct lttng_viewer_cmd cmd;
1512 struct lttng_viewer_get_packet rq;
1513 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1514 bt_self_component *self_comp = viewer_connection->self_comp;
1515 struct lttng_live_trace *trace = stream->trace;
1516 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1517 char cmd_buf[cmd_buf_len];
1518 uint32_t flags, rp_status;
1519
1520 BT_COMP_LOGD("Requesting data from stream: cmd=%s, "
1521 "offset=%" PRIu64 ", request-len=%" PRIu64,
1522 lttng_viewer_command_string(LTTNG_VIEWER_GET_PACKET), offset, req_len);
1523
1524 cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
1525 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1526 cmd.cmd_version = htobe32(0);
1527
1528 memset(&rq, 0, sizeof(rq));
1529 rq.stream_id = htobe64(stream->viewer_stream_id);
1530 rq.offset = htobe64(offset);
1531 rq.len = htobe32(req_len);
1532
1533 /*
1534 * Merge the cmd and connection request to prevent a write-write
1535 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1536 * second write to be performed quickly in presence of Nagle's algorithm.
1537 */
1538 memcpy(cmd_buf, &cmd, sizeof(cmd));
1539 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1540
1541 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1542 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1543 viewer_handle_send_status(self_comp, NULL, viewer_status, "get data packet command");
1544 goto error_convert_status;
1545 }
1546
1547 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1548 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1549 viewer_handle_recv_status(self_comp, NULL, viewer_status, "get data packet reply");
1550 goto error_convert_status;
1551 }
1552
1553 flags = be32toh(rp.flags);
1554 rp_status = be32toh(rp.status);
1555
1556 BT_COMP_LOGD("Received response from relay daemon: cmd=%s, response=%s",
1557 lttng_viewer_command_string(LTTNG_VIEWER_GET_PACKET),
1558 lttng_viewer_get_packet_return_code_string(rp_status));
1559 switch (rp_status) {
1560 case LTTNG_VIEWER_GET_PACKET_OK:
1561 req_len = be32toh(rp.len);
1562 BT_COMP_LOGD("Got packet from relay daemon: response=%s, packet-len=%" PRIu64 "",
1563 lttng_viewer_get_packet_return_code_string(rp_status), req_len);
1564 break;
1565 case LTTNG_VIEWER_GET_PACKET_RETRY:
1566 /* Unimplemented by relay daemon */
1567 status = CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
1568 goto end;
1569 case LTTNG_VIEWER_GET_PACKET_ERR:
1570 if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
1571 BT_COMP_LOGD("Marking trace as needing new metadata: "
1572 "response=%s, response-flag=NEW_METADATA, trace-id=%" PRIu64,
1573 lttng_viewer_next_index_return_code_string(rp_status), trace->id);
1574 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
1575 }
1576 if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
1577 BT_COMP_LOGD("Marking all sessions as possibly needing new streams: "
1578 "response=%s, response-flag=NEW_STREAM",
1579 lttng_viewer_next_index_return_code_string(rp_status));
1580 lttng_live_need_new_streams(lttng_live_msg_iter);
1581 }
1582 if (flags & (LTTNG_VIEWER_FLAG_NEW_METADATA | LTTNG_VIEWER_FLAG_NEW_STREAM)) {
1583 status = CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
1584 BT_COMP_LOGD("Reply with any one flags set means we should retry: response=%s",
1585 lttng_viewer_get_packet_return_code_string(rp_status));
1586 goto end;
1587 }
1588 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Received get_data_packet response: error");
1589 status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
1590 goto end;
1591 case LTTNG_VIEWER_GET_PACKET_EOF:
1592 status = CTF_MSG_ITER_MEDIUM_STATUS_EOF;
1593 goto end;
1594 default:
1595 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Received get_data_packet response: unknown (%d)",
1596 rp_status);
1597 status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
1598 goto end;
1599 }
1600
1601 if (req_len == 0) {
1602 status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
1603 goto end;
1604 }
1605
1606 viewer_status = lttng_live_recv(viewer_connection, buf, req_len);
1607 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1608 viewer_handle_recv_status(self_comp, NULL, viewer_status, "get data packet");
1609 goto error_convert_status;
1610 }
1611 *recv_len = req_len;
1612
1613 status = CTF_MSG_ITER_MEDIUM_STATUS_OK;
1614 goto end;
1615
1616 error_convert_status:
1617 status = viewer_status_to_ctf_msg_iter_medium_status(viewer_status);
1618 end:
1619 return status;
1620 }
1621
1622 /*
1623 * Request new streams for a session.
1624 */
1625 enum lttng_live_iterator_status
1626 lttng_live_session_get_new_streams(struct lttng_live_session *session,
1627 bt_self_message_iterator *self_msg_iter)
1628 {
1629 enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1630 struct lttng_viewer_cmd cmd;
1631 struct lttng_viewer_new_streams_request rq;
1632 struct lttng_viewer_new_streams_response rp;
1633 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
1634 enum lttng_live_viewer_status viewer_status;
1635 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1636 bt_self_component *self_comp = viewer_connection->self_comp;
1637 uint32_t streams_count;
1638 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1639 char cmd_buf[cmd_buf_len];
1640
1641 if (!session->new_streams_needed) {
1642 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1643 goto end;
1644 }
1645
1646 BT_COMP_LOGD("Requesting new streams for session: cmd=%s, "
1647 "session-id=%" PRIu64,
1648 lttng_viewer_command_string(LTTNG_VIEWER_GET_NEW_STREAMS), session->id);
1649
1650 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
1651 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1652 cmd.cmd_version = htobe32(0);
1653
1654 memset(&rq, 0, sizeof(rq));
1655 rq.session_id = htobe64(session->id);
1656
1657 /*
1658 * Merge the cmd and connection request to prevent a write-write
1659 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1660 * second write to be performed quickly in presence of Nagle's algorithm.
1661 */
1662 memcpy(cmd_buf, &cmd, sizeof(cmd));
1663 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1664
1665 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1666 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1667 viewer_handle_send_status(self_comp, NULL, viewer_status, "get new streams command");
1668 status = viewer_status_to_live_iterator_status(viewer_status);
1669 goto end;
1670 }
1671
1672 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1673 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1674 viewer_handle_recv_status(self_comp, NULL, viewer_status, "get new streams reply");
1675 status = viewer_status_to_live_iterator_status(viewer_status);
1676 goto end;
1677 }
1678
1679 streams_count = be32toh(rp.streams_count);
1680
1681 switch (be32toh(rp.status)) {
1682 case LTTNG_VIEWER_NEW_STREAMS_OK:
1683 session->new_streams_needed = false;
1684 break;
1685 case LTTNG_VIEWER_NEW_STREAMS_NO_NEW:
1686 session->new_streams_needed = false;
1687 goto end;
1688 case LTTNG_VIEWER_NEW_STREAMS_HUP:
1689 session->new_streams_needed = false;
1690 session->closed = true;
1691 status = LTTNG_LIVE_ITERATOR_STATUS_END;
1692 goto end;
1693 case LTTNG_VIEWER_NEW_STREAMS_ERR:
1694 BT_COMP_LOGD("Received get_new_streams response: error");
1695 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1696 goto end;
1697 default:
1698 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1699 "Received get_new_streams response: Unknown:"
1700 "return code %u",
1701 be32toh(rp.status));
1702 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1703 goto end;
1704 }
1705
1706 viewer_status = receive_streams(session, streams_count, self_msg_iter);
1707 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1708 viewer_handle_recv_status(self_comp, NULL, viewer_status, "new streams");
1709 status = viewer_status_to_live_iterator_status(viewer_status);
1710 goto end;
1711 }
1712
1713 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1714 end:
1715 return status;
1716 }
1717
1718 enum lttng_live_viewer_status live_viewer_connection_create(
1719 bt_self_component *self_comp, bt_self_component_class *self_comp_class,
1720 bt_logging_level log_level, const char *url, bool in_query,
1721 struct lttng_live_msg_iter *lttng_live_msg_iter, struct live_viewer_connection **viewer)
1722 {
1723 struct live_viewer_connection *viewer_connection;
1724 enum lttng_live_viewer_status status;
1725
1726 viewer_connection = g_new0(struct live_viewer_connection, 1);
1727
1728 if (bt_socket_init(log_level) != 0) {
1729 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
1730 "Failed to init socket");
1731 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1732 goto error;
1733 }
1734
1735 viewer_connection->log_level = log_level;
1736
1737 viewer_connection->self_comp = self_comp;
1738 viewer_connection->self_comp_class = self_comp_class;
1739
1740 viewer_connection->control_sock = BT_INVALID_SOCKET;
1741 viewer_connection->port = -1;
1742 viewer_connection->in_query = in_query;
1743 viewer_connection->lttng_live_msg_iter = lttng_live_msg_iter;
1744 viewer_connection->url = g_string_new(url);
1745 if (!viewer_connection->url) {
1746 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
1747 "Failed to allocate URL buffer");
1748 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1749 goto error;
1750 }
1751
1752 BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class,
1753 "Establishing connection to url \"%s\"...", url);
1754 status = lttng_live_connect_viewer(viewer_connection);
1755 /*
1756 * Only print error and append cause in case of error. not in case of
1757 * interruption.
1758 */
1759 if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1760 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
1761 "Failed to establish connection: "
1762 "url=\"%s\"",
1763 url);
1764 goto error;
1765 } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1766 goto error;
1767 }
1768 BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class,
1769 "Connection to url \"%s\" is established", url);
1770
1771 *viewer = viewer_connection;
1772 status = LTTNG_LIVE_VIEWER_STATUS_OK;
1773 goto end;
1774
1775 error:
1776 if (viewer_connection) {
1777 live_viewer_connection_destroy(viewer_connection);
1778 }
1779 end:
1780 return status;
1781 }
1782
1783 void live_viewer_connection_destroy(struct live_viewer_connection *viewer_connection)
1784 {
1785 bt_self_component *self_comp = viewer_connection->self_comp;
1786 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
1787
1788 if (!viewer_connection) {
1789 goto end;
1790 }
1791
1792 BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class,
1793 "Closing connection to relay:"
1794 "relay-url=\"%s\"",
1795 viewer_connection->url->str);
1796
1797 lttng_live_disconnect_viewer(viewer_connection);
1798
1799 if (viewer_connection->url) {
1800 g_string_free(viewer_connection->url, true);
1801 }
1802
1803 if (viewer_connection->relay_hostname) {
1804 g_string_free(viewer_connection->relay_hostname, true);
1805 }
1806
1807 if (viewer_connection->target_hostname) {
1808 g_string_free(viewer_connection->target_hostname, true);
1809 }
1810
1811 if (viewer_connection->session_name) {
1812 g_string_free(viewer_connection->session_name, true);
1813 }
1814
1815 if (viewer_connection->proto) {
1816 g_string_free(viewer_connection->proto, true);
1817 }
1818
1819 g_free(viewer_connection);
1820
1821 bt_socket_fini();
1822
1823 end:
1824 return;
1825 }
This page took 0.125426 seconds and 4 git commands to generate.