logging: strip down and clean `log.h` and `log.c`
[babeltrace.git] / src / plugins / ctf / lttng-live / viewer-connection.cpp
1 /*
2 * SPDX-License-Identifier: MIT
3 *
4 * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
5 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
6 */
7
8 #include <glib.h>
9 #include <stdint.h>
10 #include <stdio.h>
11
12 #include <babeltrace2/babeltrace.h>
13
14 #define BT_COMP_LOG_SELF_COMP (viewer_connection->self_comp)
15 #define BT_LOG_OUTPUT_LEVEL ((enum bt_log_level) viewer_connection->log_level)
16 #define BT_LOG_TAG "PLUGIN/SRC.CTF.LTTNG-LIVE/VIEWER"
17 #include "logging/comp-logging.h"
18
19 #include "common/common.h"
20 #include "compat/endian.h" /* IWYU pragma: keep */
21
22 #include "data-stream.hpp"
23 #include "lttng-live.hpp"
24 #include "lttng-viewer-abi.hpp"
25 #include "metadata.hpp"
26 #include "viewer-connection.hpp"
27
28 #define viewer_handle_send_recv_status(_self_comp, _self_comp_class, _status, _action, _msg_str) \
29 do { \
30 switch (_status) { \
31 case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED: \
32 break; \
33 case LTTNG_LIVE_VIEWER_STATUS_ERROR: \
34 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(_self_comp, _self_comp_class, \
35 "Error " _action " " _msg_str); \
36 break; \
37 default: \
38 bt_common_abort(); \
39 } \
40 } while (0)
41
42 #define viewer_handle_send_status(_self_comp, _self_comp_class, _status, _msg_str) \
43 viewer_handle_send_recv_status(_self_comp, _self_comp_class, _status, "sending", _msg_str)
44
45 #define viewer_handle_recv_status(_self_comp, _self_comp_class, _status, _msg_str) \
46 viewer_handle_send_recv_status(_self_comp, _self_comp_class, _status, "receiving", _msg_str)
47
48 #define LTTNG_LIVE_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE_ERRNO(_self_comp, _self_comp_class, _msg, \
49 _fmt, ...) \
50 do { \
51 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(_self_comp, _self_comp_class, _msg ": %s" _fmt, \
52 bt_socket_errormsg(), ##__VA_ARGS__); \
53 } while (0)
54
55 static const char *lttng_viewer_command_string(enum lttng_viewer_command cmd)
56 {
57 switch (cmd) {
58 case LTTNG_VIEWER_CONNECT:
59 return "CONNECT";
60 case LTTNG_VIEWER_LIST_SESSIONS:
61 return "LIST_SESSIONS";
62 case LTTNG_VIEWER_ATTACH_SESSION:
63 return "ATTACH_SESSION";
64 case LTTNG_VIEWER_GET_NEXT_INDEX:
65 return "GET_NEXT_INDEX";
66 case LTTNG_VIEWER_GET_PACKET:
67 return "GET_PACKET";
68 case LTTNG_VIEWER_GET_METADATA:
69 return "GET_METADATA";
70 case LTTNG_VIEWER_GET_NEW_STREAMS:
71 return "GET_NEW_STREAMS";
72 case LTTNG_VIEWER_CREATE_SESSION:
73 return "CREATE_SESSION";
74 case LTTNG_VIEWER_DETACH_SESSION:
75 return "DETACH_SESSION";
76 }
77
78 bt_common_abort();
79 }
80
81 static const char *
82 lttng_viewer_next_index_return_code_string(enum lttng_viewer_next_index_return_code code)
83 {
84 switch (code) {
85 case LTTNG_VIEWER_INDEX_OK:
86 return "INDEX_OK";
87 case LTTNG_VIEWER_INDEX_RETRY:
88 return "INDEX_RETRY";
89 case LTTNG_VIEWER_INDEX_HUP:
90 return "INDEX_HUP";
91 case LTTNG_VIEWER_INDEX_ERR:
92 return "INDEX_ERR";
93 case LTTNG_VIEWER_INDEX_INACTIVE:
94 return "INDEX_INACTIVE";
95 case LTTNG_VIEWER_INDEX_EOF:
96 return "INDEX_EOF";
97 }
98
99 bt_common_abort();
100 }
101
102 static const char *lttng_viewer_next_index_return_code_string(uint32_t code)
103 {
104 return lttng_viewer_next_index_return_code_string((lttng_viewer_next_index_return_code) code);
105 }
106
107 static const char *
108 lttng_viewer_get_packet_return_code_string(enum lttng_viewer_get_packet_return_code code)
109 {
110 switch (code) {
111 case LTTNG_VIEWER_GET_PACKET_OK:
112 return "GET_PACKET_OK";
113 case LTTNG_VIEWER_GET_PACKET_RETRY:
114 return "GET_PACKET_RETRY";
115 case LTTNG_VIEWER_GET_PACKET_ERR:
116 return "GET_PACKET_ERR";
117 case LTTNG_VIEWER_GET_PACKET_EOF:
118 return "GET_PACKET_EOF";
119 }
120
121 bt_common_abort();
122 };
123
124 static const char *lttng_viewer_get_packet_return_code_string(uint32_t code)
125 {
126 return lttng_viewer_get_packet_return_code_string((lttng_viewer_get_packet_return_code) code);
127 }
128
129 static const char *lttng_viewer_seek_string(enum lttng_viewer_seek seek)
130 {
131 switch (seek) {
132 case LTTNG_VIEWER_SEEK_BEGINNING:
133 return "SEEK_BEGINNING";
134 case LTTNG_VIEWER_SEEK_LAST:
135 return "SEEK_LAST";
136 }
137
138 bt_common_abort();
139 }
140
141 static inline enum lttng_live_iterator_status
142 viewer_status_to_live_iterator_status(enum lttng_live_viewer_status viewer_status)
143 {
144 switch (viewer_status) {
145 case LTTNG_LIVE_VIEWER_STATUS_OK:
146 return LTTNG_LIVE_ITERATOR_STATUS_OK;
147 case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
148 return LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
149 case LTTNG_LIVE_VIEWER_STATUS_ERROR:
150 return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
151 }
152
153 bt_common_abort();
154 }
155
156 static inline enum ctf_msg_iter_medium_status
157 viewer_status_to_ctf_msg_iter_medium_status(enum lttng_live_viewer_status viewer_status)
158 {
159 switch (viewer_status) {
160 case LTTNG_LIVE_VIEWER_STATUS_OK:
161 return CTF_MSG_ITER_MEDIUM_STATUS_OK;
162 case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
163 return CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
164 case LTTNG_LIVE_VIEWER_STATUS_ERROR:
165 return CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
166 }
167
168 bt_common_abort();
169 }
170
171 static inline void viewer_connection_close_socket(struct live_viewer_connection *viewer_connection)
172 {
173 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
174 bt_self_component *self_comp = viewer_connection->self_comp;
175 int ret = bt_socket_close(viewer_connection->control_sock);
176 if (ret == -1) {
177 BT_COMP_OR_COMP_CLASS_LOGW_ERRNO(self_comp, self_comp_class,
178 "Error closing viewer connection socket: ", ".");
179 }
180
181 viewer_connection->control_sock = BT_INVALID_SOCKET;
182 }
183
184 /*
185 * This function receives a message from the Relay daemon.
186 * If it received the entire message, it returns _OK,
187 * If it's interrupted, it returns _INTERRUPTED,
188 * otherwise, it returns _ERROR.
189 */
190 static enum lttng_live_viewer_status
191 lttng_live_recv(struct live_viewer_connection *viewer_connection, void *buf, size_t len)
192 {
193 ssize_t received;
194 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
195 bt_self_component *self_comp = viewer_connection->self_comp;
196 size_t total_received = 0, to_receive = len;
197 struct lttng_live_msg_iter *lttng_live_msg_iter = viewer_connection->lttng_live_msg_iter;
198 enum lttng_live_viewer_status status;
199 BT_SOCKET sock = viewer_connection->control_sock;
200
201 /*
202 * Receive a message from the Relay.
203 */
204 do {
205 received = bt_socket_recv(sock, (char *) buf + total_received, to_receive, 0);
206 if (received == BT_SOCKET_ERROR) {
207 if (bt_socket_interrupted()) {
208 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
209 /*
210 * This interruption was due to a
211 * SIGINT and the graph is being torn
212 * down.
213 */
214 status = LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED;
215 lttng_live_msg_iter->was_interrupted = true;
216 goto end;
217 } else {
218 /*
219 * A signal was received, but the graph
220 * is not being torn down. Carry on.
221 */
222 continue;
223 }
224 } else {
225 /*
226 * For any other types of socket error, close
227 * the socket and return an error.
228 */
229 LTTNG_LIVE_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE_ERRNO(
230 self_comp, self_comp_class, "Error receiving from Relay", ".");
231
232 viewer_connection_close_socket(viewer_connection);
233 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
234 goto end;
235 }
236 } else if (received == 0) {
237 /*
238 * The recv() call returned 0. This means the
239 * connection was orderly shutdown from the other peer.
240 * If that happens when we are trying to receive
241 * a message from it, it means something when wrong.
242 * Close the socket and return an error.
243 */
244 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
245 "Remote side has closed connection");
246 viewer_connection_close_socket(viewer_connection);
247 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
248 goto end;
249 }
250
251 BT_ASSERT(received <= to_receive);
252 total_received += received;
253 to_receive -= received;
254
255 } while (to_receive > 0);
256
257 BT_ASSERT(total_received == len);
258 status = LTTNG_LIVE_VIEWER_STATUS_OK;
259
260 end:
261 return status;
262 }
263
264 /*
265 * This function sends a message to the Relay daemon.
266 * If it send the message, it returns _OK,
267 * If it's interrupted, it returns _INTERRUPTED,
268 * otherwise, it returns _ERROR.
269 */
270 static enum lttng_live_viewer_status
271 lttng_live_send(struct live_viewer_connection *viewer_connection, const void *buf, size_t len)
272 {
273 enum lttng_live_viewer_status status;
274 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
275 bt_self_component *self_comp = viewer_connection->self_comp;
276 struct lttng_live_msg_iter *lttng_live_msg_iter = viewer_connection->lttng_live_msg_iter;
277 BT_SOCKET sock = viewer_connection->control_sock;
278 size_t to_send = len;
279 ssize_t total_sent = 0;
280
281 do {
282 ssize_t sent = bt_socket_send_nosigpipe(sock, (char *) buf + total_sent, to_send);
283 if (sent == BT_SOCKET_ERROR) {
284 if (bt_socket_interrupted()) {
285 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
286 /*
287 * This interruption was a SIGINT and
288 * the graph is being teared down.
289 */
290 status = LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED;
291 lttng_live_msg_iter->was_interrupted = true;
292 goto end;
293 } else {
294 /*
295 * A signal was received, but the graph
296 * is not being teared down. Carry on.
297 */
298 continue;
299 }
300 } else {
301 /*
302 * For any other types of socket error, close
303 * the socket and return an error.
304 */
305 LTTNG_LIVE_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE_ERRNO(
306 self_comp, self_comp_class, "Error sending to Relay", ".");
307
308 viewer_connection_close_socket(viewer_connection);
309 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
310 goto end;
311 }
312 }
313
314 BT_ASSERT(sent <= to_send);
315 total_sent += sent;
316 to_send -= sent;
317
318 } while (to_send > 0);
319
320 BT_ASSERT(total_sent == len);
321 status = LTTNG_LIVE_VIEWER_STATUS_OK;
322
323 end:
324 return status;
325 }
326
327 static int parse_url(struct live_viewer_connection *viewer_connection)
328 {
329 char error_buf[256] = {0};
330 bt_self_component *self_comp = viewer_connection->self_comp;
331 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
332 struct bt_common_lttng_live_url_parts lttng_live_url_parts = {};
333 int ret = -1;
334 const char *path = viewer_connection->url->str;
335
336 if (!path) {
337 goto end;
338 }
339
340 lttng_live_url_parts = bt_common_parse_lttng_live_url(path, error_buf, sizeof(error_buf));
341 if (!lttng_live_url_parts.proto) {
342 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
343 "Invalid LTTng live URL format: %s", error_buf);
344 goto end;
345 }
346 viewer_connection->proto = lttng_live_url_parts.proto;
347 lttng_live_url_parts.proto = NULL;
348
349 viewer_connection->relay_hostname = lttng_live_url_parts.hostname;
350 lttng_live_url_parts.hostname = NULL;
351
352 if (lttng_live_url_parts.port >= 0) {
353 viewer_connection->port = lttng_live_url_parts.port;
354 } else {
355 viewer_connection->port = LTTNG_DEFAULT_NETWORK_VIEWER_PORT;
356 }
357
358 viewer_connection->target_hostname = lttng_live_url_parts.target_hostname;
359 lttng_live_url_parts.target_hostname = NULL;
360
361 if (lttng_live_url_parts.session_name) {
362 viewer_connection->session_name = lttng_live_url_parts.session_name;
363 lttng_live_url_parts.session_name = NULL;
364 }
365
366 ret = 0;
367
368 end:
369 bt_common_destroy_lttng_live_url_parts(&lttng_live_url_parts);
370 return ret;
371 }
372
373 static enum lttng_live_viewer_status
374 lttng_live_handshake(struct live_viewer_connection *viewer_connection)
375 {
376 struct lttng_viewer_cmd cmd;
377 struct lttng_viewer_connect connect;
378 enum lttng_live_viewer_status status;
379 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
380 bt_self_component *self_comp = viewer_connection->self_comp;
381 const size_t cmd_buf_len = sizeof(cmd) + sizeof(connect);
382 char cmd_buf[cmd_buf_len];
383
384 BT_COMP_OR_COMP_CLASS_LOGD(
385 self_comp, self_comp_class,
386 "Handshaking with the relay daemon: cmd=%s, major-version=%u, minor-version=%u",
387 lttng_viewer_command_string(LTTNG_VIEWER_CONNECT), LTTNG_LIVE_MAJOR, LTTNG_LIVE_MINOR);
388
389 cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
390 cmd.data_size = htobe64((uint64_t) sizeof(connect));
391 cmd.cmd_version = htobe32(0);
392
393 connect.viewer_session_id = -1ULL; /* will be set on recv */
394 connect.major = htobe32(LTTNG_LIVE_MAJOR);
395 connect.minor = htobe32(LTTNG_LIVE_MINOR);
396 connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
397
398 /*
399 * Merge the cmd and connection request to prevent a write-write
400 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
401 * second write to be performed quickly in presence of Nagle's algorithm
402 */
403 memcpy(cmd_buf, &cmd, sizeof(cmd));
404 memcpy(cmd_buf + sizeof(cmd), &connect, sizeof(connect));
405
406 status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
407 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
408 viewer_handle_send_status(self_comp, self_comp_class, status, "viewer connect command");
409 goto end;
410 }
411
412 status = lttng_live_recv(viewer_connection, &connect, sizeof(connect));
413 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
414 viewer_handle_recv_status(self_comp, self_comp_class, status, "viewer connect reply");
415 goto end;
416 }
417
418 BT_COMP_OR_COMP_CLASS_LOGI(self_comp, self_comp_class, "Received viewer session ID : %" PRIu64,
419 (uint64_t) be64toh(connect.viewer_session_id));
420 BT_COMP_OR_COMP_CLASS_LOGI(self_comp, self_comp_class, "Relayd version : %u.%u",
421 be32toh(connect.major), be32toh(connect.minor));
422
423 if (LTTNG_LIVE_MAJOR != be32toh(connect.major)) {
424 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
425 "Incompatible lttng-relayd protocol");
426 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
427 goto end;
428 }
429 /* Use the smallest protocol version implemented. */
430 if (LTTNG_LIVE_MINOR > be32toh(connect.minor)) {
431 viewer_connection->minor = be32toh(connect.minor);
432 } else {
433 viewer_connection->minor = LTTNG_LIVE_MINOR;
434 }
435 viewer_connection->major = LTTNG_LIVE_MAJOR;
436
437 status = LTTNG_LIVE_VIEWER_STATUS_OK;
438
439 goto end;
440
441 end:
442 return status;
443 }
444
445 static enum lttng_live_viewer_status
446 lttng_live_connect_viewer(struct live_viewer_connection *viewer_connection)
447 {
448 struct hostent *host;
449 struct sockaddr_in server_addr;
450 enum lttng_live_viewer_status status;
451 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
452 bt_self_component *self_comp = viewer_connection->self_comp;
453
454 if (parse_url(viewer_connection)) {
455 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class, "Failed to parse URL");
456 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
457 goto error;
458 }
459
460 BT_COMP_OR_COMP_CLASS_LOGD(
461 self_comp, self_comp_class,
462 "Connecting to hostname : %s, port : %d, "
463 "target hostname : %s, session name : %s, proto : %s",
464 viewer_connection->relay_hostname->str, viewer_connection->port,
465 !viewer_connection->target_hostname ? "<none>" : viewer_connection->target_hostname->str,
466 !viewer_connection->session_name ? "<none>" : viewer_connection->session_name->str,
467 viewer_connection->proto->str);
468
469 host = gethostbyname(viewer_connection->relay_hostname->str);
470 if (!host) {
471 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
472 "Cannot lookup hostname: hostname=\"%s\"",
473 viewer_connection->relay_hostname->str);
474 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
475 goto error;
476 }
477
478 if ((viewer_connection->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == BT_INVALID_SOCKET) {
479 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
480 "Socket creation failed: %s", bt_socket_errormsg());
481 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
482 goto error;
483 }
484
485 server_addr.sin_family = AF_INET;
486 server_addr.sin_port = htons(viewer_connection->port);
487 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
488 memset(&(server_addr.sin_zero), 0, 8);
489
490 if (connect(viewer_connection->control_sock, (struct sockaddr *) &server_addr,
491 sizeof(struct sockaddr)) == BT_SOCKET_ERROR) {
492 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class, "Connection failed: %s",
493 bt_socket_errormsg());
494 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
495 goto error;
496 }
497
498 status = lttng_live_handshake(viewer_connection);
499
500 /*
501 * Only print error and append cause in case of error. not in case of
502 * interruption.
503 */
504 if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
505 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
506 "Viewer handshake failed");
507 goto error;
508 } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
509 goto end;
510 }
511
512 goto end;
513
514 error:
515 if (viewer_connection->control_sock != BT_INVALID_SOCKET) {
516 if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) {
517 BT_COMP_OR_COMP_CLASS_LOGW(self_comp, self_comp_class, "Error closing socket: %s.",
518 bt_socket_errormsg());
519 }
520 }
521 viewer_connection->control_sock = BT_INVALID_SOCKET;
522 end:
523 return status;
524 }
525
526 static void lttng_live_disconnect_viewer(struct live_viewer_connection *viewer_connection)
527 {
528 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
529 bt_self_component *self_comp = viewer_connection->self_comp;
530
531 if (viewer_connection->control_sock == BT_INVALID_SOCKET) {
532 return;
533 }
534 if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) {
535 BT_COMP_OR_COMP_CLASS_LOGW(self_comp, self_comp_class, "Error closing socket: %s",
536 bt_socket_errormsg());
537 viewer_connection->control_sock = BT_INVALID_SOCKET;
538 }
539 }
540
541 static int list_update_session(bt_value *results, const struct lttng_viewer_session *session,
542 bool *_found, struct live_viewer_connection *viewer_connection)
543 {
544 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
545 bt_self_component *self_comp = viewer_connection->self_comp;
546 int ret = 0;
547 uint64_t i, len;
548 bt_value *map = NULL;
549 bt_value *hostname = NULL;
550 bt_value *session_name = NULL;
551 bt_value *btval = NULL;
552 bool found = false;
553
554 len = bt_value_array_get_length(results);
555 for (i = 0; i < len; i++) {
556 const char *hostname_str = NULL;
557 const char *session_name_str = NULL;
558
559 map = bt_value_array_borrow_element_by_index(results, i);
560 hostname = bt_value_map_borrow_entry_value(map, "target-hostname");
561 if (!hostname) {
562 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
563 "Error borrowing \"target-hostname\" entry.");
564 ret = -1;
565 goto end;
566 }
567 session_name = bt_value_map_borrow_entry_value(map, "session-name");
568 if (!session_name) {
569 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
570 "Error borrowing \"session-name\" entry.");
571 ret = -1;
572 goto end;
573 }
574 hostname_str = bt_value_string_get(hostname);
575 session_name_str = bt_value_string_get(session_name);
576
577 if (strcmp(session->hostname, hostname_str) == 0 &&
578 strcmp(session->session_name, session_name_str) == 0) {
579 int64_t val;
580 uint32_t streams = be32toh(session->streams);
581 uint32_t clients = be32toh(session->clients);
582
583 found = true;
584
585 btval = bt_value_map_borrow_entry_value(map, "stream-count");
586 if (!btval) {
587 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
588 "Error borrowing \"stream-count\" entry.");
589 ret = -1;
590 goto end;
591 }
592 val = bt_value_integer_unsigned_get(btval);
593 /* sum */
594 val += streams;
595 bt_value_integer_unsigned_set(btval, val);
596
597 btval = bt_value_map_borrow_entry_value(map, "client-count");
598 if (!btval) {
599 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
600 "Error borrowing \"client-count\" entry.");
601 ret = -1;
602 goto end;
603 }
604 val = bt_value_integer_unsigned_get(btval);
605 /* max */
606 val = bt_max_t(int64_t, clients, val);
607 bt_value_integer_unsigned_set(btval, val);
608 }
609
610 if (found) {
611 break;
612 }
613 }
614 end:
615 *_found = found;
616 return ret;
617 }
618
619 static int list_append_session(bt_value *results, GString *base_url,
620 const struct lttng_viewer_session *session,
621 struct live_viewer_connection *viewer_connection)
622 {
623 int ret = 0;
624 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
625 bt_value_map_insert_entry_status insert_status;
626 bt_value_array_append_element_status append_status;
627 bt_value *map = NULL;
628 GString *url = NULL;
629 bool found = false;
630
631 /*
632 * If the session already exists, add the stream count to it,
633 * and do max of client counts.
634 */
635 ret = list_update_session(results, session, &found, viewer_connection);
636 if (ret || found) {
637 goto end;
638 }
639
640 map = bt_value_map_create();
641 if (!map) {
642 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error creating map value.");
643 ret = -1;
644 goto end;
645 }
646
647 if (base_url->len < 1) {
648 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error: base_url length smaller than 1.");
649 ret = -1;
650 goto end;
651 }
652 /*
653 * key = "url",
654 * value = <string>,
655 */
656 url = g_string_new(base_url->str);
657 g_string_append(url, "/host/");
658 g_string_append(url, session->hostname);
659 g_string_append_c(url, '/');
660 g_string_append(url, session->session_name);
661
662 insert_status = bt_value_map_insert_string_entry(map, "url", url->str);
663 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
664 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error inserting \"url\" entry.");
665 ret = -1;
666 goto end;
667 }
668
669 /*
670 * key = "target-hostname",
671 * value = <string>,
672 */
673 insert_status = bt_value_map_insert_string_entry(map, "target-hostname", session->hostname);
674 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
675 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
676 "Error inserting \"target-hostname\" entry.");
677 ret = -1;
678 goto end;
679 }
680
681 /*
682 * key = "session-name",
683 * value = <string>,
684 */
685 insert_status = bt_value_map_insert_string_entry(map, "session-name", session->session_name);
686 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
687 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error inserting \"session-name\" entry.");
688 ret = -1;
689 goto end;
690 }
691
692 /*
693 * key = "timer-us",
694 * value = <integer>,
695 */
696 {
697 uint32_t live_timer = be32toh(session->live_timer);
698
699 insert_status = bt_value_map_insert_unsigned_integer_entry(map, "timer-us", live_timer);
700 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
701 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error inserting \"timer-us\" entry.");
702 ret = -1;
703 goto end;
704 }
705 }
706
707 /*
708 * key = "stream-count",
709 * value = <integer>,
710 */
711 {
712 uint32_t streams = be32toh(session->streams);
713
714 insert_status = bt_value_map_insert_unsigned_integer_entry(map, "stream-count", streams);
715 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
716 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
717 "Error inserting \"stream-count\" entry.");
718 ret = -1;
719 goto end;
720 }
721 }
722
723 /*
724 * key = "client-count",
725 * value = <integer>,
726 */
727 {
728 uint32_t clients = be32toh(session->clients);
729
730 insert_status = bt_value_map_insert_unsigned_integer_entry(map, "client-count", clients);
731 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
732 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
733 "Error inserting \"client-count\" entry.");
734 ret = -1;
735 goto end;
736 }
737 }
738
739 append_status = bt_value_array_append_element(results, map);
740 if (append_status != BT_VALUE_ARRAY_APPEND_ELEMENT_STATUS_OK) {
741 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error appending map to results.");
742 ret = -1;
743 }
744
745 end:
746 if (url) {
747 g_string_free(url, true);
748 }
749 BT_VALUE_PUT_REF_AND_RESET(map);
750 return ret;
751 }
752
753 /*
754 * Data structure returned:
755 *
756 * {
757 * <array> = {
758 * [n] = {
759 * <map> = {
760 * {
761 * key = "url",
762 * value = <string>,
763 * },
764 * {
765 * key = "target-hostname",
766 * value = <string>,
767 * },
768 * {
769 * key = "session-name",
770 * value = <string>,
771 * },
772 * {
773 * key = "timer-us",
774 * value = <integer>,
775 * },
776 * {
777 * key = "stream-count",
778 * value = <integer>,
779 * },
780 * {
781 * key = "client-count",
782 * value = <integer>,
783 * },
784 * },
785 * }
786 * }
787 */
788
789 bt_component_class_query_method_status
790 live_viewer_connection_list_sessions(struct live_viewer_connection *viewer_connection,
791 const bt_value **user_result)
792 {
793 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
794 bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
795 bt_value *result = NULL;
796 enum lttng_live_viewer_status viewer_status;
797 struct lttng_viewer_cmd cmd;
798 struct lttng_viewer_list_sessions list;
799 uint32_t i, sessions_count;
800
801 result = bt_value_array_create();
802 if (!result) {
803 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error creating array");
804 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
805 goto error;
806 }
807
808 BT_LOGD("Requesting list of sessions: cmd=%s",
809 lttng_viewer_command_string(LTTNG_VIEWER_LIST_SESSIONS));
810
811 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
812 cmd.data_size = htobe64((uint64_t) 0);
813 cmd.cmd_version = htobe32(0);
814
815 viewer_status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
816 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
817 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error sending list sessions command");
818 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
819 goto error;
820 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
821 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
822 goto error;
823 }
824
825 viewer_status = lttng_live_recv(viewer_connection, &list, sizeof(list));
826 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
827 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error receiving session list");
828 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
829 goto error;
830 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
831 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
832 goto error;
833 }
834
835 sessions_count = be32toh(list.sessions_count);
836 for (i = 0; i < sessions_count; i++) {
837 struct lttng_viewer_session lsession;
838
839 viewer_status = lttng_live_recv(viewer_connection, &lsession, sizeof(lsession));
840 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
841 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error receiving session:");
842 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
843 goto error;
844 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
845 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
846 goto error;
847 }
848
849 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
850 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
851 if (list_append_session(result, viewer_connection->url, &lsession, viewer_connection)) {
852 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error appending session");
853 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
854 goto error;
855 }
856 }
857
858 *user_result = result;
859 goto end;
860 error:
861 BT_VALUE_PUT_REF_AND_RESET(result);
862 end:
863 return status;
864 }
865
866 static enum lttng_live_viewer_status
867 lttng_live_query_session_ids(struct lttng_live_msg_iter *lttng_live_msg_iter)
868 {
869 struct lttng_viewer_cmd cmd;
870 struct lttng_viewer_list_sessions list;
871 struct lttng_viewer_session lsession;
872 uint32_t i, sessions_count;
873 uint64_t session_id;
874 enum lttng_live_viewer_status status;
875 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
876 bt_self_component *self_comp = viewer_connection->self_comp;
877 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
878
879 BT_COMP_LOGD("Asking the relay daemon for the list of sessions: cmd=%s",
880 lttng_viewer_command_string(LTTNG_VIEWER_LIST_SESSIONS));
881
882 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
883 cmd.data_size = htobe64((uint64_t) 0);
884 cmd.cmd_version = htobe32(0);
885
886 status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
887 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
888 viewer_handle_send_status(self_comp, self_comp_class, status, "list sessions command");
889 goto end;
890 }
891
892 status = lttng_live_recv(viewer_connection, &list, sizeof(list));
893 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
894 viewer_handle_recv_status(self_comp, self_comp_class, status, "session list reply");
895 goto end;
896 }
897
898 sessions_count = be32toh(list.sessions_count);
899 for (i = 0; i < sessions_count; i++) {
900 status = lttng_live_recv(viewer_connection, &lsession, sizeof(lsession));
901 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
902 viewer_handle_recv_status(self_comp, self_comp_class, status, "session reply");
903 goto end;
904 }
905 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
906 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
907 session_id = be64toh(lsession.id);
908
909 BT_COMP_LOGI("Adding session to internal list: "
910 "session-id=%" PRIu64 ", hostname=\"%s\", session-name=\"%s\"",
911 session_id, lsession.hostname, lsession.session_name);
912
913 if ((strncmp(lsession.session_name, viewer_connection->session_name->str,
914 LTTNG_VIEWER_NAME_MAX) == 0) &&
915 (strncmp(lsession.hostname, viewer_connection->target_hostname->str,
916 LTTNG_VIEWER_HOST_NAME_MAX) == 0)) {
917 if (lttng_live_add_session(lttng_live_msg_iter, session_id, lsession.hostname,
918 lsession.session_name)) {
919 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to add live session");
920 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
921 goto end;
922 }
923 }
924 }
925
926 status = LTTNG_LIVE_VIEWER_STATUS_OK;
927
928 end:
929 return status;
930 }
931
932 enum lttng_live_viewer_status
933 lttng_live_create_viewer_session(struct lttng_live_msg_iter *lttng_live_msg_iter)
934 {
935 struct lttng_viewer_cmd cmd;
936 struct lttng_viewer_create_session_response resp;
937 enum lttng_live_viewer_status status;
938 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
939 bt_self_component *self_comp = viewer_connection->self_comp;
940 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
941
942 BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class, "Creating a viewer session: cmd=%s",
943 lttng_viewer_command_string(LTTNG_VIEWER_CREATE_SESSION));
944
945 cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
946 cmd.data_size = htobe64((uint64_t) 0);
947 cmd.cmd_version = htobe32(0);
948
949 status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
950 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
951 viewer_handle_send_status(self_comp, self_comp_class, status, "create session command");
952 goto end;
953 }
954
955 status = lttng_live_recv(viewer_connection, &resp, sizeof(resp));
956 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
957 viewer_handle_recv_status(self_comp, self_comp_class, status, "create session reply");
958 goto end;
959 }
960
961 if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
962 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error creating viewer session");
963 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
964 goto end;
965 }
966
967 status = lttng_live_query_session_ids(lttng_live_msg_iter);
968 if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
969 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to query live viewer session ids");
970 goto end;
971 } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
972 goto end;
973 }
974
975 end:
976 return status;
977 }
978
979 static enum lttng_live_viewer_status receive_streams(struct lttng_live_session *session,
980 uint32_t stream_count,
981 bt_self_message_iterator *self_msg_iter)
982 {
983 uint32_t i;
984 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
985 enum lttng_live_viewer_status status;
986 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
987 bt_self_component *self_comp = viewer_connection->self_comp;
988
989 BT_COMP_LOGI("Getting %" PRIu32 " new streams", stream_count);
990 for (i = 0; i < stream_count; i++) {
991 struct lttng_viewer_stream stream;
992 struct lttng_live_stream_iterator *live_stream;
993 uint64_t stream_id;
994 uint64_t ctf_trace_id;
995
996 status = lttng_live_recv(viewer_connection, &stream, sizeof(stream));
997 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
998 viewer_handle_recv_status(self_comp, NULL, status, "stream reply");
999 goto end;
1000 }
1001 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
1002 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
1003 stream_id = be64toh(stream.id);
1004 ctf_trace_id = be64toh(stream.ctf_trace_id);
1005
1006 if (stream.metadata_flag) {
1007 BT_COMP_LOGI(" metadata stream %" PRIu64 " : %s/%s", stream_id, stream.path_name,
1008 stream.channel_name);
1009 if (lttng_live_metadata_create_stream(session, ctf_trace_id, stream_id)) {
1010 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error creating metadata stream");
1011 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1012 goto end;
1013 }
1014 session->lazy_stream_msg_init = true;
1015 } else {
1016 BT_COMP_LOGI(" stream %" PRIu64 " : %s/%s", stream_id, stream.path_name,
1017 stream.channel_name);
1018 live_stream =
1019 lttng_live_stream_iterator_create(session, ctf_trace_id, stream_id, self_msg_iter);
1020 if (!live_stream) {
1021 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error creating stream");
1022 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1023 goto end;
1024 }
1025 }
1026 }
1027 status = LTTNG_LIVE_VIEWER_STATUS_OK;
1028
1029 end:
1030 return status;
1031 }
1032
1033 enum lttng_live_viewer_status lttng_live_session_attach(struct lttng_live_session *session,
1034 bt_self_message_iterator *self_msg_iter)
1035 {
1036 struct lttng_viewer_cmd cmd;
1037 enum lttng_live_viewer_status status;
1038 struct lttng_viewer_attach_session_request rq;
1039 struct lttng_viewer_attach_session_response rp;
1040 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
1041 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1042 bt_self_component *self_comp = viewer_connection->self_comp;
1043 uint64_t session_id = session->id;
1044 uint32_t streams_count;
1045 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1046 char cmd_buf[cmd_buf_len];
1047
1048 BT_COMP_LOGD("Attaching to session: cmd=%s, session-id=%" PRIu64 ", seek=%s",
1049 lttng_viewer_command_string(LTTNG_VIEWER_ATTACH_SESSION), session_id,
1050 lttng_viewer_seek_string(LTTNG_VIEWER_SEEK_LAST));
1051
1052 cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
1053 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1054 cmd.cmd_version = htobe32(0);
1055
1056 memset(&rq, 0, sizeof(rq));
1057 rq.session_id = htobe64(session_id);
1058 // TODO: add cmd line parameter to select seek beginning
1059 // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
1060 rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
1061
1062 /*
1063 * Merge the cmd and connection request to prevent a write-write
1064 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1065 * second write to be performed quickly in presence of Nagle's algorithm.
1066 */
1067 memcpy(cmd_buf, &cmd, sizeof(cmd));
1068 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1069 status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1070 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1071 viewer_handle_send_status(self_comp, NULL, status, "attach session command");
1072 goto end;
1073 }
1074
1075 status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1076 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1077 viewer_handle_recv_status(self_comp, NULL, status, "attach session reply");
1078 goto end;
1079 }
1080
1081 streams_count = be32toh(rp.streams_count);
1082 switch (be32toh(rp.status)) {
1083 case LTTNG_VIEWER_ATTACH_OK:
1084 break;
1085 case LTTNG_VIEWER_ATTACH_UNK:
1086 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Session id %" PRIu64 " is unknown", session_id);
1087 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1088 goto end;
1089 case LTTNG_VIEWER_ATTACH_ALREADY:
1090 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "There is already a viewer attached to this session");
1091 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1092 goto end;
1093 case LTTNG_VIEWER_ATTACH_NOT_LIVE:
1094 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Not a live session");
1095 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1096 goto end;
1097 case LTTNG_VIEWER_ATTACH_SEEK_ERR:
1098 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Wrong seek parameter");
1099 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1100 goto end;
1101 default:
1102 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Unknown attach return code %u", be32toh(rp.status));
1103 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1104 goto end;
1105 }
1106
1107 /* We receive the initial list of streams. */
1108 status = receive_streams(session, streams_count, self_msg_iter);
1109 switch (status) {
1110 case LTTNG_LIVE_VIEWER_STATUS_OK:
1111 break;
1112 case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
1113 goto end;
1114 case LTTNG_LIVE_VIEWER_STATUS_ERROR:
1115 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error receiving streams");
1116 goto end;
1117 default:
1118 bt_common_abort();
1119 }
1120
1121 session->attached = true;
1122 session->new_streams_needed = false;
1123
1124 end:
1125 return status;
1126 }
1127
1128 enum lttng_live_viewer_status lttng_live_session_detach(struct lttng_live_session *session)
1129 {
1130 struct lttng_viewer_cmd cmd;
1131 enum lttng_live_viewer_status status;
1132 struct lttng_viewer_detach_session_request rq;
1133 struct lttng_viewer_detach_session_response rp;
1134 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
1135 bt_self_component *self_comp = session->self_comp;
1136 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1137 uint64_t session_id = session->id;
1138 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1139 char cmd_buf[cmd_buf_len];
1140
1141 /*
1142 * The session might already be detached and the viewer socket might
1143 * already been closed. This happens when calling this function when
1144 * tearing down the graph after an error.
1145 */
1146 if (!session->attached || viewer_connection->control_sock == BT_INVALID_SOCKET) {
1147 return LTTNG_LIVE_VIEWER_STATUS_OK;
1148 }
1149
1150 BT_COMP_LOGD("Detaching from session: cmd=%s, session-id=%" PRIu64,
1151 lttng_viewer_command_string(LTTNG_VIEWER_DETACH_SESSION), session_id);
1152
1153 cmd.cmd = htobe32(LTTNG_VIEWER_DETACH_SESSION);
1154 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1155 cmd.cmd_version = htobe32(0);
1156
1157 memset(&rq, 0, sizeof(rq));
1158 rq.session_id = htobe64(session_id);
1159
1160 /*
1161 * Merge the cmd and connection request to prevent a write-write
1162 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1163 * second write to be performed quickly in presence of Nagle's algorithm.
1164 */
1165 memcpy(cmd_buf, &cmd, sizeof(cmd));
1166 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1167 status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1168 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1169 viewer_handle_send_status(self_comp, NULL, status, "detach session command");
1170 goto end;
1171 }
1172
1173 status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1174 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1175 viewer_handle_recv_status(self_comp, NULL, status, "detach session reply");
1176 goto end;
1177 }
1178
1179 switch (be32toh(rp.status)) {
1180 case LTTNG_VIEWER_DETACH_SESSION_OK:
1181 break;
1182 case LTTNG_VIEWER_DETACH_SESSION_UNK:
1183 BT_COMP_LOGW("Session id %" PRIu64 " is unknown", session_id);
1184 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1185 goto end;
1186 case LTTNG_VIEWER_DETACH_SESSION_ERR:
1187 BT_COMP_LOGW("Error detaching session id %" PRIu64 "", session_id);
1188 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1189 goto end;
1190 default:
1191 BT_COMP_LOGE("Unknown detach return code %u", be32toh(rp.status));
1192 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1193 goto end;
1194 }
1195
1196 session->attached = false;
1197
1198 status = LTTNG_LIVE_VIEWER_STATUS_OK;
1199
1200 end:
1201 return status;
1202 }
1203
1204 enum lttng_live_get_one_metadata_status
1205 lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, FILE *fp, size_t *reply_len)
1206 {
1207 uint64_t len = 0;
1208 enum lttng_live_get_one_metadata_status status;
1209 enum lttng_live_viewer_status viewer_status;
1210 struct lttng_viewer_cmd cmd;
1211 struct lttng_viewer_get_metadata rq;
1212 struct lttng_viewer_metadata_packet rp;
1213 gchar *data = NULL;
1214 ssize_t writelen;
1215 struct lttng_live_session *session = trace->session;
1216 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
1217 struct lttng_live_metadata *metadata = trace->metadata;
1218 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1219 bt_self_component *self_comp = viewer_connection->self_comp;
1220 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1221 char cmd_buf[cmd_buf_len];
1222
1223 BT_COMP_LOGD("Requesting new metadata for trace:"
1224 "cmd=%s, trace-id=%" PRIu64 ", metadata-stream-id=%" PRIu64,
1225 lttng_viewer_command_string(LTTNG_VIEWER_GET_METADATA), trace->id,
1226 metadata->stream_id);
1227
1228 rq.stream_id = htobe64(metadata->stream_id);
1229 cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
1230 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1231 cmd.cmd_version = htobe32(0);
1232
1233 /*
1234 * Merge the cmd and connection request to prevent a write-write
1235 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1236 * second write to be performed quickly in presence of Nagle's algorithm.
1237 */
1238 memcpy(cmd_buf, &cmd, sizeof(cmd));
1239 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1240 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1241 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1242 viewer_handle_send_status(self_comp, NULL, viewer_status, "get metadata command");
1243 status = (enum lttng_live_get_one_metadata_status) viewer_status;
1244 goto end;
1245 }
1246
1247 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1248 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1249 viewer_handle_recv_status(self_comp, NULL, viewer_status, "get metadata reply");
1250 status = (enum lttng_live_get_one_metadata_status) viewer_status;
1251 goto end;
1252 }
1253
1254 switch (be32toh(rp.status)) {
1255 case LTTNG_VIEWER_METADATA_OK:
1256 BT_COMP_LOGD("Received get_metadata response: ok");
1257 break;
1258 case LTTNG_VIEWER_NO_NEW_METADATA:
1259 BT_COMP_LOGD("Received get_metadata response: no new");
1260 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_END;
1261 goto end;
1262 case LTTNG_VIEWER_METADATA_ERR:
1263 /*
1264 * The Relayd cannot find this stream id. Maybe its
1265 * gone already. This can happen in short lived UST app
1266 * in a per-pid session.
1267 */
1268 BT_COMP_LOGD("Received get_metadata response: error");
1269 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_CLOSED;
1270 goto end;
1271 default:
1272 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Received get_metadata response: unknown");
1273 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
1274 goto end;
1275 }
1276
1277 len = be64toh(rp.len);
1278 if (len == 0) {
1279 /*
1280 * We received a `LTTNG_VIEWER_METADATA_OK` with a packet
1281 * length of 0. This means we must try again. This scenario
1282 * arises when a clear command is performed on an lttng session.
1283 */
1284 BT_COMP_LOGD(
1285 "Expecting a metadata packet of size 0. Retry to get a packet from the relay.");
1286 goto empty_metadata_packet_retry;
1287 }
1288
1289 BT_COMP_LOGD("Writing %" PRIu64 " bytes to metadata", len);
1290 if (len <= 0) {
1291 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Erroneous response length");
1292 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
1293 goto end;
1294 }
1295
1296 data = g_new0(gchar, len);
1297 if (!data) {
1298 BT_COMP_LOGE_APPEND_CAUSE_ERRNO(self_comp, "Failed to allocate data buffer", ".");
1299 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
1300 goto end;
1301 }
1302
1303 viewer_status = lttng_live_recv(viewer_connection, data, len);
1304 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1305 viewer_handle_recv_status(self_comp, NULL, viewer_status, "get metadata packet");
1306 status = (enum lttng_live_get_one_metadata_status) viewer_status;
1307 goto end;
1308 }
1309
1310 /*
1311 * Write the metadata to the file handle.
1312 */
1313 writelen = fwrite(data, sizeof(uint8_t), len, fp);
1314 if (writelen != len) {
1315 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Writing in the metadata file stream");
1316 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
1317 goto end;
1318 }
1319
1320 empty_metadata_packet_retry:
1321 *reply_len = len;
1322 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_OK;
1323
1324 end:
1325 g_free(data);
1326 return status;
1327 }
1328
1329 /*
1330 * Assign the fields from a lttng_viewer_index to a packet_index.
1331 */
1332 static void lttng_index_to_packet_index(struct lttng_viewer_index *lindex,
1333 struct packet_index *pindex)
1334 {
1335 BT_ASSERT(lindex);
1336 BT_ASSERT(pindex);
1337
1338 pindex->offset = be64toh(lindex->offset);
1339 pindex->packet_size = be64toh(lindex->packet_size);
1340 pindex->content_size = be64toh(lindex->content_size);
1341 pindex->ts_cycles.timestamp_begin = be64toh(lindex->timestamp_begin);
1342 pindex->ts_cycles.timestamp_end = be64toh(lindex->timestamp_end);
1343 pindex->events_discarded = be64toh(lindex->events_discarded);
1344 }
1345
1346 static void lttng_live_need_new_streams(struct lttng_live_msg_iter *lttng_live_msg_iter)
1347 {
1348 uint64_t session_idx;
1349 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1350
1351 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
1352 struct lttng_live_session *session =
1353 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
1354 BT_COMP_LOGD("Marking session as needing new streams: "
1355 "session-id=%" PRIu64,
1356 session->id);
1357 session->new_streams_needed = true;
1358 }
1359 }
1360
1361 enum lttng_live_iterator_status
1362 lttng_live_get_next_index(struct lttng_live_msg_iter *lttng_live_msg_iter,
1363 struct lttng_live_stream_iterator *stream, struct packet_index *index)
1364 {
1365 struct lttng_viewer_cmd cmd;
1366 struct lttng_viewer_get_next_index rq;
1367 enum lttng_live_viewer_status viewer_status;
1368 struct lttng_viewer_index rp;
1369 enum lttng_live_iterator_status status;
1370 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1371 bt_self_component *self_comp = viewer_connection->self_comp;
1372 struct lttng_live_trace *trace = stream->trace;
1373 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1374 char cmd_buf[cmd_buf_len];
1375 uint32_t flags, rp_status;
1376
1377 BT_COMP_LOGD("Requesting next index for stream: cmd=%s, "
1378 "viewer-stream-id=%" PRIu64,
1379 lttng_viewer_command_string(LTTNG_VIEWER_GET_NEXT_INDEX),
1380 stream->viewer_stream_id);
1381 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
1382 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1383 cmd.cmd_version = htobe32(0);
1384
1385 memset(&rq, 0, sizeof(rq));
1386 rq.stream_id = htobe64(stream->viewer_stream_id);
1387
1388 /*
1389 * Merge the cmd and connection request to prevent a write-write
1390 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1391 * second write to be performed quickly in presence of Nagle's algorithm.
1392 */
1393 memcpy(cmd_buf, &cmd, sizeof(cmd));
1394 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1395
1396 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1397 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1398 viewer_handle_send_status(self_comp, NULL, viewer_status, "get next index command");
1399 goto error;
1400 }
1401
1402 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1403 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1404 viewer_handle_recv_status(self_comp, NULL, viewer_status, "get next index reply");
1405 goto error;
1406 }
1407
1408 flags = be32toh(rp.flags);
1409 rp_status = be32toh(rp.status);
1410
1411 BT_COMP_LOGD("Received response from relay daemon: cmd=%s, response=%s",
1412 lttng_viewer_command_string(LTTNG_VIEWER_GET_NEXT_INDEX),
1413 lttng_viewer_next_index_return_code_string(rp_status));
1414 switch (rp_status) {
1415 case LTTNG_VIEWER_INDEX_INACTIVE:
1416 {
1417 uint64_t ctf_stream_class_id;
1418
1419 memset(index, 0, sizeof(struct packet_index));
1420 index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
1421 stream->current_inactivity_ts = index->ts_cycles.timestamp_end;
1422 ctf_stream_class_id = be64toh(rp.stream_id);
1423 if (stream->ctf_stream_class_id.is_set) {
1424 BT_ASSERT(stream->ctf_stream_class_id.value == ctf_stream_class_id);
1425 } else {
1426 stream->ctf_stream_class_id.value = ctf_stream_class_id;
1427 stream->ctf_stream_class_id.is_set = true;
1428 }
1429 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_QUIESCENT);
1430 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1431 break;
1432 }
1433 case LTTNG_VIEWER_INDEX_OK:
1434 {
1435 uint64_t ctf_stream_class_id;
1436
1437 lttng_index_to_packet_index(&rp, index);
1438 ctf_stream_class_id = be64toh(rp.stream_id);
1439 if (stream->ctf_stream_class_id.is_set) {
1440 BT_ASSERT(stream->ctf_stream_class_id.value == ctf_stream_class_id);
1441 } else {
1442 stream->ctf_stream_class_id.value = ctf_stream_class_id;
1443 stream->ctf_stream_class_id.is_set = true;
1444 }
1445
1446 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_DATA);
1447
1448 if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
1449 BT_COMP_LOGD("Marking trace as needing new metadata: "
1450 "response=%s, response-flag=NEW_METADATA, trace-id=%" PRIu64,
1451 lttng_viewer_next_index_return_code_string(rp_status), trace->id);
1452 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
1453 }
1454 if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
1455 BT_COMP_LOGD("Marking all sessions as possibly needing new streams: "
1456 "response=%s, response-flag=NEW_STREAM",
1457 lttng_viewer_next_index_return_code_string(rp_status));
1458 lttng_live_need_new_streams(lttng_live_msg_iter);
1459 }
1460 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1461 break;
1462 }
1463 case LTTNG_VIEWER_INDEX_RETRY:
1464 memset(index, 0, sizeof(struct packet_index));
1465 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
1466 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
1467 goto end;
1468 case LTTNG_VIEWER_INDEX_HUP:
1469 memset(index, 0, sizeof(struct packet_index));
1470 index->offset = EOF;
1471 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_EOF);
1472 stream->has_stream_hung_up = true;
1473 status = LTTNG_LIVE_ITERATOR_STATUS_END;
1474 break;
1475 case LTTNG_VIEWER_INDEX_ERR:
1476 memset(index, 0, sizeof(struct packet_index));
1477 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
1478 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1479 goto end;
1480 default:
1481 BT_COMP_LOGD("Received get_next_index response: unknown value");
1482 memset(index, 0, sizeof(struct packet_index));
1483 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
1484 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1485 goto end;
1486 }
1487 goto end;
1488
1489 error:
1490 status = viewer_status_to_live_iterator_status(viewer_status);
1491 end:
1492 return status;
1493 }
1494
1495 enum ctf_msg_iter_medium_status
1496 lttng_live_get_stream_bytes(struct lttng_live_msg_iter *lttng_live_msg_iter,
1497 struct lttng_live_stream_iterator *stream, uint8_t *buf,
1498 uint64_t offset, uint64_t req_len, uint64_t *recv_len)
1499 {
1500 enum ctf_msg_iter_medium_status status;
1501 enum lttng_live_viewer_status viewer_status;
1502 struct lttng_viewer_trace_packet rp;
1503 struct lttng_viewer_cmd cmd;
1504 struct lttng_viewer_get_packet rq;
1505 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1506 bt_self_component *self_comp = viewer_connection->self_comp;
1507 struct lttng_live_trace *trace = stream->trace;
1508 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1509 char cmd_buf[cmd_buf_len];
1510 uint32_t flags, rp_status;
1511
1512 BT_COMP_LOGD("Requesting data from stream: cmd=%s, "
1513 "offset=%" PRIu64 ", request-len=%" PRIu64,
1514 lttng_viewer_command_string(LTTNG_VIEWER_GET_PACKET), offset, req_len);
1515
1516 cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
1517 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1518 cmd.cmd_version = htobe32(0);
1519
1520 memset(&rq, 0, sizeof(rq));
1521 rq.stream_id = htobe64(stream->viewer_stream_id);
1522 rq.offset = htobe64(offset);
1523 rq.len = htobe32(req_len);
1524
1525 /*
1526 * Merge the cmd and connection request to prevent a write-write
1527 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1528 * second write to be performed quickly in presence of Nagle's algorithm.
1529 */
1530 memcpy(cmd_buf, &cmd, sizeof(cmd));
1531 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1532
1533 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1534 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1535 viewer_handle_send_status(self_comp, NULL, viewer_status, "get data packet command");
1536 goto error_convert_status;
1537 }
1538
1539 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1540 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1541 viewer_handle_recv_status(self_comp, NULL, viewer_status, "get data packet reply");
1542 goto error_convert_status;
1543 }
1544
1545 flags = be32toh(rp.flags);
1546 rp_status = be32toh(rp.status);
1547
1548 BT_COMP_LOGD("Received response from relay daemon: cmd=%s, response=%s",
1549 lttng_viewer_command_string(LTTNG_VIEWER_GET_PACKET),
1550 lttng_viewer_get_packet_return_code_string(rp_status));
1551 switch (rp_status) {
1552 case LTTNG_VIEWER_GET_PACKET_OK:
1553 req_len = be32toh(rp.len);
1554 BT_COMP_LOGD("Got packet from relay daemon: response=%s, packet-len=%" PRIu64 "",
1555 lttng_viewer_get_packet_return_code_string(rp_status), req_len);
1556 break;
1557 case LTTNG_VIEWER_GET_PACKET_RETRY:
1558 /* Unimplemented by relay daemon */
1559 status = CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
1560 goto end;
1561 case LTTNG_VIEWER_GET_PACKET_ERR:
1562 if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
1563 BT_COMP_LOGD("Marking trace as needing new metadata: "
1564 "response=%s, response-flag=NEW_METADATA, trace-id=%" PRIu64,
1565 lttng_viewer_next_index_return_code_string(rp_status), trace->id);
1566 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
1567 }
1568 if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
1569 BT_COMP_LOGD("Marking all sessions as possibly needing new streams: "
1570 "response=%s, response-flag=NEW_STREAM",
1571 lttng_viewer_next_index_return_code_string(rp_status));
1572 lttng_live_need_new_streams(lttng_live_msg_iter);
1573 }
1574 if (flags & (LTTNG_VIEWER_FLAG_NEW_METADATA | LTTNG_VIEWER_FLAG_NEW_STREAM)) {
1575 status = CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
1576 BT_COMP_LOGD("Reply with any one flags set means we should retry: response=%s",
1577 lttng_viewer_get_packet_return_code_string(rp_status));
1578 goto end;
1579 }
1580 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Received get_data_packet response: error");
1581 status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
1582 goto end;
1583 case LTTNG_VIEWER_GET_PACKET_EOF:
1584 status = CTF_MSG_ITER_MEDIUM_STATUS_EOF;
1585 goto end;
1586 default:
1587 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Received get_data_packet response: unknown (%d)",
1588 rp_status);
1589 status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
1590 goto end;
1591 }
1592
1593 if (req_len == 0) {
1594 status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
1595 goto end;
1596 }
1597
1598 viewer_status = lttng_live_recv(viewer_connection, buf, req_len);
1599 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1600 viewer_handle_recv_status(self_comp, NULL, viewer_status, "get data packet");
1601 goto error_convert_status;
1602 }
1603 *recv_len = req_len;
1604
1605 status = CTF_MSG_ITER_MEDIUM_STATUS_OK;
1606 goto end;
1607
1608 error_convert_status:
1609 status = viewer_status_to_ctf_msg_iter_medium_status(viewer_status);
1610 end:
1611 return status;
1612 }
1613
1614 /*
1615 * Request new streams for a session.
1616 */
1617 enum lttng_live_iterator_status
1618 lttng_live_session_get_new_streams(struct lttng_live_session *session,
1619 bt_self_message_iterator *self_msg_iter)
1620 {
1621 enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1622 struct lttng_viewer_cmd cmd;
1623 struct lttng_viewer_new_streams_request rq;
1624 struct lttng_viewer_new_streams_response rp;
1625 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
1626 enum lttng_live_viewer_status viewer_status;
1627 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1628 bt_self_component *self_comp = viewer_connection->self_comp;
1629 uint32_t streams_count;
1630 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1631 char cmd_buf[cmd_buf_len];
1632
1633 if (!session->new_streams_needed) {
1634 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1635 goto end;
1636 }
1637
1638 BT_COMP_LOGD("Requesting new streams for session: cmd=%s, "
1639 "session-id=%" PRIu64,
1640 lttng_viewer_command_string(LTTNG_VIEWER_GET_NEW_STREAMS), session->id);
1641
1642 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
1643 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1644 cmd.cmd_version = htobe32(0);
1645
1646 memset(&rq, 0, sizeof(rq));
1647 rq.session_id = htobe64(session->id);
1648
1649 /*
1650 * Merge the cmd and connection request to prevent a write-write
1651 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1652 * second write to be performed quickly in presence of Nagle's algorithm.
1653 */
1654 memcpy(cmd_buf, &cmd, sizeof(cmd));
1655 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1656
1657 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1658 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1659 viewer_handle_send_status(self_comp, NULL, viewer_status, "get new streams command");
1660 status = viewer_status_to_live_iterator_status(viewer_status);
1661 goto end;
1662 }
1663
1664 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1665 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1666 viewer_handle_recv_status(self_comp, NULL, viewer_status, "get new streams reply");
1667 status = viewer_status_to_live_iterator_status(viewer_status);
1668 goto end;
1669 }
1670
1671 streams_count = be32toh(rp.streams_count);
1672
1673 switch (be32toh(rp.status)) {
1674 case LTTNG_VIEWER_NEW_STREAMS_OK:
1675 session->new_streams_needed = false;
1676 break;
1677 case LTTNG_VIEWER_NEW_STREAMS_NO_NEW:
1678 session->new_streams_needed = false;
1679 goto end;
1680 case LTTNG_VIEWER_NEW_STREAMS_HUP:
1681 session->new_streams_needed = false;
1682 session->closed = true;
1683 status = LTTNG_LIVE_ITERATOR_STATUS_END;
1684 goto end;
1685 case LTTNG_VIEWER_NEW_STREAMS_ERR:
1686 BT_COMP_LOGD("Received get_new_streams response: error");
1687 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1688 goto end;
1689 default:
1690 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1691 "Received get_new_streams response: Unknown:"
1692 "return code %u",
1693 be32toh(rp.status));
1694 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1695 goto end;
1696 }
1697
1698 viewer_status = receive_streams(session, streams_count, self_msg_iter);
1699 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1700 viewer_handle_recv_status(self_comp, NULL, viewer_status, "new streams");
1701 status = viewer_status_to_live_iterator_status(viewer_status);
1702 goto end;
1703 }
1704
1705 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1706 end:
1707 return status;
1708 }
1709
1710 enum lttng_live_viewer_status live_viewer_connection_create(
1711 bt_self_component *self_comp, bt_self_component_class *self_comp_class,
1712 bt_logging_level log_level, const char *url, bool in_query,
1713 struct lttng_live_msg_iter *lttng_live_msg_iter, struct live_viewer_connection **viewer)
1714 {
1715 struct live_viewer_connection *viewer_connection;
1716 enum lttng_live_viewer_status status;
1717
1718 viewer_connection = g_new0(struct live_viewer_connection, 1);
1719
1720 if (bt_socket_init(log_level) != 0) {
1721 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
1722 "Failed to init socket");
1723 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1724 goto error;
1725 }
1726
1727 viewer_connection->log_level = log_level;
1728
1729 viewer_connection->self_comp = self_comp;
1730 viewer_connection->self_comp_class = self_comp_class;
1731
1732 viewer_connection->control_sock = BT_INVALID_SOCKET;
1733 viewer_connection->port = -1;
1734 viewer_connection->in_query = in_query;
1735 viewer_connection->lttng_live_msg_iter = lttng_live_msg_iter;
1736 viewer_connection->url = g_string_new(url);
1737 if (!viewer_connection->url) {
1738 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
1739 "Failed to allocate URL buffer");
1740 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1741 goto error;
1742 }
1743
1744 BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class,
1745 "Establishing connection to url \"%s\"...", url);
1746 status = lttng_live_connect_viewer(viewer_connection);
1747 /*
1748 * Only print error and append cause in case of error. not in case of
1749 * interruption.
1750 */
1751 if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1752 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
1753 "Failed to establish connection: "
1754 "url=\"%s\"",
1755 url);
1756 goto error;
1757 } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1758 goto error;
1759 }
1760 BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class,
1761 "Connection to url \"%s\" is established", url);
1762
1763 *viewer = viewer_connection;
1764 status = LTTNG_LIVE_VIEWER_STATUS_OK;
1765 goto end;
1766
1767 error:
1768 if (viewer_connection) {
1769 live_viewer_connection_destroy(viewer_connection);
1770 }
1771 end:
1772 return status;
1773 }
1774
1775 void live_viewer_connection_destroy(struct live_viewer_connection *viewer_connection)
1776 {
1777 bt_self_component *self_comp = viewer_connection->self_comp;
1778 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
1779
1780 if (!viewer_connection) {
1781 goto end;
1782 }
1783
1784 BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class,
1785 "Closing connection to relay:"
1786 "relay-url=\"%s\"",
1787 viewer_connection->url->str);
1788
1789 lttng_live_disconnect_viewer(viewer_connection);
1790
1791 if (viewer_connection->url) {
1792 g_string_free(viewer_connection->url, true);
1793 }
1794
1795 if (viewer_connection->relay_hostname) {
1796 g_string_free(viewer_connection->relay_hostname, true);
1797 }
1798
1799 if (viewer_connection->target_hostname) {
1800 g_string_free(viewer_connection->target_hostname, true);
1801 }
1802
1803 if (viewer_connection->session_name) {
1804 g_string_free(viewer_connection->session_name, true);
1805 }
1806
1807 if (viewer_connection->proto) {
1808 g_string_free(viewer_connection->proto, true);
1809 }
1810
1811 g_free(viewer_connection);
1812
1813 bt_socket_fini();
1814
1815 end:
1816 return;
1817 }
This page took 0.070431 seconds and 4 git commands to generate.