73677fa2d1b94ad6bc695b94d29e225461933aae
[babeltrace.git] / src / plugins / ctf / lttng-live / viewer-connection.cpp
1 /*
2 * SPDX-License-Identifier: MIT
3 *
4 * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
5 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
6 */
7
8 #define BT_COMP_LOG_SELF_COMP (viewer_connection->self_comp)
9 #define BT_LOG_OUTPUT_LEVEL (viewer_connection->log_level)
10 #define BT_LOG_TAG "PLUGIN/SRC.CTF.LTTNG-LIVE/VIEWER"
11 #include "logging/comp-logging.h"
12
13 #include <fcntl.h>
14 #include <stdbool.h>
15 #include <stdint.h>
16 #include <stdio.h>
17 #include <stdlib.h>
18 #include <sys/types.h>
19 #include <unistd.h>
20
21 #include <glib.h>
22
23 #include "compat/socket.h"
24 #include "compat/endian.h"
25 #include "compat/compiler.h"
26 #include "common/common.h"
27 #include <babeltrace2/babeltrace.h>
28
29 #include "lttng-live.hpp"
30 #include "viewer-connection.hpp"
31 #include "lttng-viewer-abi.hpp"
32 #include "data-stream.hpp"
33 #include "metadata.hpp"
34
35 #define viewer_handle_send_recv_status(_self_comp, _self_comp_class, \
36 _status, _action, _msg_str) \
37 do { \
38 switch (_status) { \
39 case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED: \
40 break; \
41 case LTTNG_LIVE_VIEWER_STATUS_ERROR: \
42 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(_self_comp, \
43 _self_comp_class, "Error " _action " " _msg_str); \
44 break; \
45 default: \
46 bt_common_abort(); \
47 } \
48 } while (0)
49
50 #define viewer_handle_send_status(_self_comp, _self_comp_class, _status, _msg_str) \
51 viewer_handle_send_recv_status(_self_comp, _self_comp_class, _status, \
52 "sending", _msg_str)
53
54 #define viewer_handle_recv_status(_self_comp, _self_comp_class, _status, _msg_str) \
55 viewer_handle_send_recv_status(_self_comp, _self_comp_class, _status, \
56 "receiving", _msg_str)
57
58 #define LTTNG_LIVE_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE_ERRNO(_self_comp, \
59 _self_comp_class, _msg, _fmt, ...) \
60 do { \
61 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(_self_comp, _self_comp_class, \
62 _msg ": %s" _fmt, bt_socket_errormsg(), ##__VA_ARGS__); \
63 } while (0)
64
65 static
66 const char *lttng_viewer_command_string(enum lttng_viewer_command cmd)
67 {
68 switch (cmd){
69 case LTTNG_VIEWER_CONNECT:
70 return "CONNECT";
71 case LTTNG_VIEWER_LIST_SESSIONS:
72 return "LIST_SESSIONS";
73 case LTTNG_VIEWER_ATTACH_SESSION:
74 return "ATTACH_SESSION";
75 case LTTNG_VIEWER_GET_NEXT_INDEX:
76 return "GET_NEXT_INDEX";
77 case LTTNG_VIEWER_GET_PACKET:
78 return "GET_PACKET";
79 case LTTNG_VIEWER_GET_METADATA:
80 return "GET_METADATA";
81 case LTTNG_VIEWER_GET_NEW_STREAMS:
82 return "GET_NEW_STREAMS";
83 case LTTNG_VIEWER_CREATE_SESSION:
84 return "CREATE_SESSION";
85 case LTTNG_VIEWER_DETACH_SESSION:
86 return "DETACH_SESSION";
87 }
88
89 bt_common_abort();
90 }
91
92 static
93 const char *lttng_viewer_next_index_return_code_string(
94 enum lttng_viewer_next_index_return_code code)
95 {
96 switch (code) {
97 case LTTNG_VIEWER_INDEX_OK:
98 return "INDEX_OK";
99 case LTTNG_VIEWER_INDEX_RETRY:
100 return "INDEX_RETRY";
101 case LTTNG_VIEWER_INDEX_HUP:
102 return "INDEX_HUP";
103 case LTTNG_VIEWER_INDEX_ERR:
104 return "INDEX_ERR";
105 case LTTNG_VIEWER_INDEX_INACTIVE:
106 return "INDEX_INACTIVE";
107 case LTTNG_VIEWER_INDEX_EOF:
108 return "INDEX_EOF";
109 }
110
111 bt_common_abort();
112 }
113
114 static
115 const char *lttng_viewer_next_index_return_code_string(uint32_t code)
116 {
117 return lttng_viewer_next_index_return_code_string(
118 (lttng_viewer_next_index_return_code) code);
119 }
120
121 static
122 const char *lttng_viewer_get_packet_return_code_string(
123 enum lttng_viewer_get_packet_return_code code)
124 {
125 switch (code) {
126 case LTTNG_VIEWER_GET_PACKET_OK:
127 return "GET_PACKET_OK";
128 case LTTNG_VIEWER_GET_PACKET_RETRY:
129 return "GET_PACKET_RETRY";
130 case LTTNG_VIEWER_GET_PACKET_ERR:
131 return "GET_PACKET_ERR";
132 case LTTNG_VIEWER_GET_PACKET_EOF:
133 return "GET_PACKET_EOF";
134 }
135
136 bt_common_abort();
137 };
138
139 static
140 const char *lttng_viewer_get_packet_return_code_string(uint32_t code)
141 {
142 return lttng_viewer_get_packet_return_code_string(
143 (lttng_viewer_get_packet_return_code) code);
144 }
145
146 static
147 const char *lttng_viewer_seek_string(enum lttng_viewer_seek seek)
148 {
149 switch (seek) {
150 case LTTNG_VIEWER_SEEK_BEGINNING:
151 return "SEEK_BEGINNING";
152 case LTTNG_VIEWER_SEEK_LAST:
153 return "SEEK_LAST";
154 }
155
156 bt_common_abort();
157 }
158
159 static inline
160 enum lttng_live_iterator_status viewer_status_to_live_iterator_status(
161 enum lttng_live_viewer_status viewer_status)
162 {
163 switch (viewer_status) {
164 case LTTNG_LIVE_VIEWER_STATUS_OK:
165 return LTTNG_LIVE_ITERATOR_STATUS_OK;
166 case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
167 return LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
168 case LTTNG_LIVE_VIEWER_STATUS_ERROR:
169 return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
170 }
171
172 bt_common_abort();
173 }
174
175 static inline
176 enum ctf_msg_iter_medium_status viewer_status_to_ctf_msg_iter_medium_status(
177 enum lttng_live_viewer_status viewer_status)
178 {
179 switch (viewer_status) {
180 case LTTNG_LIVE_VIEWER_STATUS_OK:
181 return CTF_MSG_ITER_MEDIUM_STATUS_OK;
182 case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
183 return CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
184 case LTTNG_LIVE_VIEWER_STATUS_ERROR:
185 return CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
186 }
187
188 bt_common_abort();
189 }
190
191 static inline
192 void viewer_connection_close_socket(
193 struct live_viewer_connection *viewer_connection)
194 {
195 bt_self_component_class *self_comp_class =
196 viewer_connection->self_comp_class;
197 bt_self_component *self_comp =
198 viewer_connection->self_comp;
199 int ret = bt_socket_close(viewer_connection->control_sock);
200 if (ret == -1) {
201 BT_COMP_OR_COMP_CLASS_LOGW_ERRNO(
202 self_comp, self_comp_class,
203 "Error closing viewer connection socket: ", ".");
204 }
205
206 viewer_connection->control_sock = BT_INVALID_SOCKET;
207 }
208
209 /*
210 * This function receives a message from the Relay daemon.
211 * If it received the entire message, it returns _OK,
212 * If it's interrupted, it returns _INTERRUPTED,
213 * otherwise, it returns _ERROR.
214 */
215 static
216 enum lttng_live_viewer_status lttng_live_recv(
217 struct live_viewer_connection *viewer_connection,
218 void *buf, size_t len)
219 {
220 ssize_t received;
221 bt_self_component_class *self_comp_class =
222 viewer_connection->self_comp_class;
223 bt_self_component *self_comp =
224 viewer_connection->self_comp;
225 size_t total_received = 0, to_receive = len;
226 struct lttng_live_msg_iter *lttng_live_msg_iter =
227 viewer_connection->lttng_live_msg_iter;
228 enum lttng_live_viewer_status status;
229 BT_SOCKET sock = viewer_connection->control_sock;
230
231 /*
232 * Receive a message from the Relay.
233 */
234 do {
235 received = bt_socket_recv(sock, (char *) buf + total_received, to_receive, 0);
236 if (received == BT_SOCKET_ERROR) {
237 if (bt_socket_interrupted()) {
238 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
239 /*
240 * This interruption was due to a
241 * SIGINT and the graph is being torn
242 * down.
243 */
244 status = LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED;
245 lttng_live_msg_iter->was_interrupted = true;
246 goto end;
247 } else {
248 /*
249 * A signal was received, but the graph
250 * is not being torn down. Carry on.
251 */
252 continue;
253 }
254 } else {
255 /*
256 * For any other types of socket error, close
257 * the socket and return an error.
258 */
259 LTTNG_LIVE_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE_ERRNO(
260 self_comp, self_comp_class,
261 "Error receiving from Relay", ".");
262
263 viewer_connection_close_socket(viewer_connection);
264 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
265 goto end;
266 }
267 } else if (received == 0) {
268 /*
269 * The recv() call returned 0. This means the
270 * connection was orderly shutdown from the other peer.
271 * If that happens when we are trying to receive
272 * a message from it, it means something when wrong.
273 * Close the socket and return an error.
274 */
275 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
276 self_comp_class, "Remote side has closed connection");
277 viewer_connection_close_socket(viewer_connection);
278 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
279 goto end;
280 }
281
282 BT_ASSERT(received <= to_receive);
283 total_received += received;
284 to_receive -= received;
285
286 } while (to_receive > 0);
287
288 BT_ASSERT(total_received == len);
289 status = LTTNG_LIVE_VIEWER_STATUS_OK;
290
291 end:
292 return status;
293 }
294
295 /*
296 * This function sends a message to the Relay daemon.
297 * If it send the message, it returns _OK,
298 * If it's interrupted, it returns _INTERRUPTED,
299 * otherwise, it returns _ERROR.
300 */
301 static
302 enum lttng_live_viewer_status lttng_live_send(
303 struct live_viewer_connection *viewer_connection,
304 const void *buf, size_t len)
305 {
306 enum lttng_live_viewer_status status;
307 bt_self_component_class *self_comp_class =
308 viewer_connection->self_comp_class;
309 bt_self_component *self_comp =
310 viewer_connection->self_comp;
311 struct lttng_live_msg_iter *lttng_live_msg_iter =
312 viewer_connection->lttng_live_msg_iter;
313 BT_SOCKET sock = viewer_connection->control_sock;
314 size_t to_send = len;
315 ssize_t total_sent = 0;
316
317 do {
318 ssize_t sent = bt_socket_send_nosigpipe(sock, (char *) buf + total_sent,
319 to_send);
320 if (sent == BT_SOCKET_ERROR) {
321 if (bt_socket_interrupted()) {
322 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
323 /*
324 * This interruption was a SIGINT and
325 * the graph is being teared down.
326 */
327 status = LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED;
328 lttng_live_msg_iter->was_interrupted = true;
329 goto end;
330 } else {
331 /*
332 * A signal was received, but the graph
333 * is not being teared down. Carry on.
334 */
335 continue;
336 }
337 } else {
338 /*
339 * For any other types of socket error, close
340 * the socket and return an error.
341 */
342 LTTNG_LIVE_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE_ERRNO(
343 self_comp, self_comp_class,
344 "Error sending to Relay", ".");
345
346 viewer_connection_close_socket(viewer_connection);
347 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
348 goto end;
349 }
350 }
351
352 BT_ASSERT(sent <= to_send);
353 total_sent += sent;
354 to_send -= sent;
355
356 } while (to_send > 0);
357
358 BT_ASSERT(total_sent == len);
359 status = LTTNG_LIVE_VIEWER_STATUS_OK;
360
361 end:
362 return status;
363 }
364
365 static
366 int parse_url(struct live_viewer_connection *viewer_connection)
367 {
368 char error_buf[256] = { 0 };
369 bt_self_component *self_comp = viewer_connection->self_comp;
370 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
371 struct bt_common_lttng_live_url_parts lttng_live_url_parts = { 0 };
372 int ret = -1;
373 const char *path = viewer_connection->url->str;
374
375 if (!path) {
376 goto end;
377 }
378
379 lttng_live_url_parts = bt_common_parse_lttng_live_url(path, error_buf,
380 sizeof(error_buf));
381 if (!lttng_live_url_parts.proto) {
382 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
383 self_comp_class,"Invalid LTTng live URL format: %s",
384 error_buf);
385 goto end;
386 }
387 viewer_connection->proto = lttng_live_url_parts.proto;
388 lttng_live_url_parts.proto = NULL;
389
390 viewer_connection->relay_hostname = lttng_live_url_parts.hostname;
391 lttng_live_url_parts.hostname = NULL;
392
393 if (lttng_live_url_parts.port >= 0) {
394 viewer_connection->port = lttng_live_url_parts.port;
395 } else {
396 viewer_connection->port = LTTNG_DEFAULT_NETWORK_VIEWER_PORT;
397 }
398
399 viewer_connection->target_hostname = lttng_live_url_parts.target_hostname;
400 lttng_live_url_parts.target_hostname = NULL;
401
402 if (lttng_live_url_parts.session_name) {
403 viewer_connection->session_name = lttng_live_url_parts.session_name;
404 lttng_live_url_parts.session_name = NULL;
405 }
406
407 ret = 0;
408
409 end:
410 bt_common_destroy_lttng_live_url_parts(&lttng_live_url_parts);
411 return ret;
412 }
413
414 static
415 enum lttng_live_viewer_status lttng_live_handshake(
416 struct live_viewer_connection *viewer_connection)
417 {
418 struct lttng_viewer_cmd cmd;
419 struct lttng_viewer_connect connect;
420 enum lttng_live_viewer_status status;
421 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
422 bt_self_component *self_comp = viewer_connection->self_comp;
423 const size_t cmd_buf_len = sizeof(cmd) + sizeof(connect);
424 char cmd_buf[cmd_buf_len];
425
426 BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class,
427 "Handshaking with the relay daemon: cmd=%s, major-version=%u, minor-version=%u",
428 lttng_viewer_command_string(LTTNG_VIEWER_CONNECT), LTTNG_LIVE_MAJOR,
429 LTTNG_LIVE_MINOR);
430
431 cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
432 cmd.data_size = htobe64((uint64_t) sizeof(connect));
433 cmd.cmd_version = htobe32(0);
434
435 connect.viewer_session_id = -1ULL; /* will be set on recv */
436 connect.major = htobe32(LTTNG_LIVE_MAJOR);
437 connect.minor = htobe32(LTTNG_LIVE_MINOR);
438 connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
439
440 /*
441 * Merge the cmd and connection request to prevent a write-write
442 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
443 * second write to be performed quickly in presence of Nagle's algorithm
444 */
445 memcpy(cmd_buf, &cmd, sizeof(cmd));
446 memcpy(cmd_buf + sizeof(cmd), &connect, sizeof(connect));
447
448 status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
449 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
450 viewer_handle_send_status(self_comp, self_comp_class,
451 status, "viewer connect command");
452 goto end;
453 }
454
455 status = lttng_live_recv(viewer_connection, &connect, sizeof(connect));
456 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
457 viewer_handle_recv_status(self_comp, self_comp_class,
458 status, "viewer connect reply");
459 goto end;
460 }
461
462 BT_COMP_OR_COMP_CLASS_LOGI(self_comp, self_comp_class,
463 "Received viewer session ID : %" PRIu64,
464 (uint64_t) be64toh(connect.viewer_session_id));
465 BT_COMP_OR_COMP_CLASS_LOGI(self_comp, self_comp_class,
466 "Relayd version : %u.%u", be32toh(connect.major),
467 be32toh(connect.minor));
468
469 if (LTTNG_LIVE_MAJOR != be32toh(connect.major)) {
470 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
471 self_comp_class, "Incompatible lttng-relayd protocol");
472 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
473 goto end;
474 }
475 /* Use the smallest protocol version implemented. */
476 if (LTTNG_LIVE_MINOR > be32toh(connect.minor)) {
477 viewer_connection->minor = be32toh(connect.minor);
478 } else {
479 viewer_connection->minor = LTTNG_LIVE_MINOR;
480 }
481 viewer_connection->major = LTTNG_LIVE_MAJOR;
482
483 status = LTTNG_LIVE_VIEWER_STATUS_OK;
484
485 goto end;
486
487 end:
488 return status;
489 }
490
491 static
492 enum lttng_live_viewer_status lttng_live_connect_viewer(
493 struct live_viewer_connection *viewer_connection)
494 {
495 struct hostent *host;
496 struct sockaddr_in server_addr;
497 enum lttng_live_viewer_status status;
498 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
499 bt_self_component *self_comp = viewer_connection->self_comp;
500
501 if (parse_url(viewer_connection)) {
502 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
503 self_comp_class, "Failed to parse URL");
504 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
505 goto error;
506 }
507
508 BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class,
509 "Connecting to hostname : %s, port : %d, "
510 "target hostname : %s, session name : %s, proto : %s",
511 viewer_connection->relay_hostname->str,
512 viewer_connection->port,
513 !viewer_connection->target_hostname ?
514 "<none>" : viewer_connection->target_hostname->str,
515 !viewer_connection->session_name ?
516 "<none>" : viewer_connection->session_name->str,
517 viewer_connection->proto->str);
518
519 host = gethostbyname(viewer_connection->relay_hostname->str);
520 if (!host) {
521 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
522 self_comp_class, "Cannot lookup hostname: hostname=\"%s\"",
523 viewer_connection->relay_hostname->str);
524 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
525 goto error;
526 }
527
528 if ((viewer_connection->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == BT_INVALID_SOCKET) {
529 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
530 self_comp_class, "Socket creation failed: %s", bt_socket_errormsg());
531 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
532 goto error;
533 }
534
535 server_addr.sin_family = AF_INET;
536 server_addr.sin_port = htons(viewer_connection->port);
537 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
538 memset(&(server_addr.sin_zero), 0, 8);
539
540 if (connect(viewer_connection->control_sock, (struct sockaddr *) &server_addr,
541 sizeof(struct sockaddr)) == BT_SOCKET_ERROR) {
542 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
543 self_comp_class, "Connection failed: %s",
544 bt_socket_errormsg());
545 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
546 goto error;
547 }
548
549 status = lttng_live_handshake(viewer_connection);
550
551 /*
552 * Only print error and append cause in case of error. not in case of
553 * interruption.
554 */
555 if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
556 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
557 self_comp_class, "Viewer handshake failed");
558 goto error;
559 } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
560 goto end;
561 }
562
563 goto end;
564
565 error:
566 if (viewer_connection->control_sock != BT_INVALID_SOCKET) {
567 if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) {
568 BT_COMP_OR_COMP_CLASS_LOGW(self_comp, self_comp_class,
569 "Error closing socket: %s.", bt_socket_errormsg());
570 }
571 }
572 viewer_connection->control_sock = BT_INVALID_SOCKET;
573 end:
574 return status;
575 }
576
577 static
578 void lttng_live_disconnect_viewer(
579 struct live_viewer_connection *viewer_connection)
580 {
581 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
582 bt_self_component *self_comp = viewer_connection->self_comp;
583
584 if (viewer_connection->control_sock == BT_INVALID_SOCKET) {
585 return;
586 }
587 if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) {
588 BT_COMP_OR_COMP_CLASS_LOGW(self_comp, self_comp_class,
589 "Error closing socket: %s", bt_socket_errormsg());
590 viewer_connection->control_sock = BT_INVALID_SOCKET;
591 }
592 }
593
594 static
595 int list_update_session(bt_value *results,
596 const struct lttng_viewer_session *session,
597 bool *_found, struct live_viewer_connection *viewer_connection)
598 {
599 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
600 bt_self_component *self_comp = viewer_connection->self_comp;
601 int ret = 0;
602 uint64_t i, len;
603 bt_value *map = NULL;
604 bt_value *hostname = NULL;
605 bt_value *session_name = NULL;
606 bt_value *btval = NULL;
607 bool found = false;
608
609 len = bt_value_array_get_length(results);
610 for (i = 0; i < len; i++) {
611 const char *hostname_str = NULL;
612 const char *session_name_str = NULL;
613
614 map = bt_value_array_borrow_element_by_index(results, i);
615 hostname = bt_value_map_borrow_entry_value(map, "target-hostname");
616 if (!hostname) {
617 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
618 self_comp_class,
619 "Error borrowing \"target-hostname\" entry.");
620 ret = -1;
621 goto end;
622 }
623 session_name = bt_value_map_borrow_entry_value(map, "session-name");
624 if (!session_name) {
625 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
626 self_comp_class,
627 "Error borrowing \"session-name\" entry.");
628 ret = -1;
629 goto end;
630 }
631 hostname_str = bt_value_string_get(hostname);
632 session_name_str = bt_value_string_get(session_name);
633
634 if (strcmp(session->hostname, hostname_str) == 0
635 && strcmp(session->session_name, session_name_str) == 0) {
636 int64_t val;
637 uint32_t streams = be32toh(session->streams);
638 uint32_t clients = be32toh(session->clients);
639
640 found = true;
641
642 btval = bt_value_map_borrow_entry_value(map, "stream-count");
643 if (!btval) {
644 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(
645 self_comp, self_comp_class,
646 "Error borrowing \"stream-count\" entry.");
647 ret = -1;
648 goto end;
649 }
650 val = bt_value_integer_unsigned_get(btval);
651 /* sum */
652 val += streams;
653 bt_value_integer_unsigned_set(btval, val);
654
655 btval = bt_value_map_borrow_entry_value(map, "client-count");
656 if (!btval) {
657 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(
658 self_comp, self_comp_class,
659 "Error borrowing \"client-count\" entry.");
660 ret = -1;
661 goto end;
662 }
663 val = bt_value_integer_unsigned_get(btval);
664 /* max */
665 val = bt_max_t(int64_t, clients, val);
666 bt_value_integer_unsigned_set(btval, val);
667 }
668
669 if (found) {
670 break;
671 }
672 }
673 end:
674 *_found = found;
675 return ret;
676 }
677
678 static
679 int list_append_session(bt_value *results,
680 GString *base_url,
681 const struct lttng_viewer_session *session,
682 struct live_viewer_connection *viewer_connection)
683 {
684 int ret = 0;
685 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
686 bt_value_map_insert_entry_status insert_status;
687 bt_value_array_append_element_status append_status;
688 bt_value *map = NULL;
689 GString *url = NULL;
690 bool found = false;
691
692 /*
693 * If the session already exists, add the stream count to it,
694 * and do max of client counts.
695 */
696 ret = list_update_session(results, session, &found, viewer_connection);
697 if (ret || found) {
698 goto end;
699 }
700
701 map = bt_value_map_create();
702 if (!map) {
703 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
704 "Error creating map value.");
705 ret = -1;
706 goto end;
707 }
708
709 if (base_url->len < 1) {
710 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
711 "Error: base_url length smaller than 1.");
712 ret = -1;
713 goto end;
714 }
715 /*
716 * key = "url",
717 * value = <string>,
718 */
719 url = g_string_new(base_url->str);
720 g_string_append(url, "/host/");
721 g_string_append(url, session->hostname);
722 g_string_append_c(url, '/');
723 g_string_append(url, session->session_name);
724
725 insert_status = bt_value_map_insert_string_entry(map, "url", url->str);
726 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
727 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
728 "Error inserting \"url\" entry.");
729 ret = -1;
730 goto end;
731 }
732
733 /*
734 * key = "target-hostname",
735 * value = <string>,
736 */
737 insert_status = bt_value_map_insert_string_entry(map, "target-hostname",
738 session->hostname);
739 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
740 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
741 "Error inserting \"target-hostname\" entry.");
742 ret = -1;
743 goto end;
744 }
745
746 /*
747 * key = "session-name",
748 * value = <string>,
749 */
750 insert_status = bt_value_map_insert_string_entry(map, "session-name",
751 session->session_name);
752 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
753 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
754 "Error inserting \"session-name\" entry.");
755 ret = -1;
756 goto end;
757 }
758
759 /*
760 * key = "timer-us",
761 * value = <integer>,
762 */
763 {
764 uint32_t live_timer = be32toh(session->live_timer);
765
766 insert_status = bt_value_map_insert_unsigned_integer_entry(
767 map, "timer-us", live_timer);
768 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
769 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
770 "Error inserting \"timer-us\" entry.");
771 ret = -1;
772 goto end;
773 }
774 }
775
776 /*
777 * key = "stream-count",
778 * value = <integer>,
779 */
780 {
781 uint32_t streams = be32toh(session->streams);
782
783 insert_status = bt_value_map_insert_unsigned_integer_entry(map,
784 "stream-count", streams);
785 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
786 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
787 "Error inserting \"stream-count\" entry.");
788 ret = -1;
789 goto end;
790 }
791 }
792
793 /*
794 * key = "client-count",
795 * value = <integer>,
796 */
797 {
798 uint32_t clients = be32toh(session->clients);
799
800 insert_status = bt_value_map_insert_unsigned_integer_entry(map,
801 "client-count", clients);
802 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
803 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
804 "Error inserting \"client-count\" entry.");
805 ret = -1;
806 goto end;
807 }
808 }
809
810 append_status = bt_value_array_append_element(results, map);
811 if (append_status != BT_VALUE_ARRAY_APPEND_ELEMENT_STATUS_OK) {
812 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
813 "Error appending map to results.");
814 ret = -1;
815 }
816
817 end:
818 if (url) {
819 g_string_free(url, true);
820 }
821 BT_VALUE_PUT_REF_AND_RESET(map);
822 return ret;
823 }
824
825 /*
826 * Data structure returned:
827 *
828 * {
829 * <array> = {
830 * [n] = {
831 * <map> = {
832 * {
833 * key = "url",
834 * value = <string>,
835 * },
836 * {
837 * key = "target-hostname",
838 * value = <string>,
839 * },
840 * {
841 * key = "session-name",
842 * value = <string>,
843 * },
844 * {
845 * key = "timer-us",
846 * value = <integer>,
847 * },
848 * {
849 * key = "stream-count",
850 * value = <integer>,
851 * },
852 * {
853 * key = "client-count",
854 * value = <integer>,
855 * },
856 * },
857 * }
858 * }
859 */
860
861 BT_HIDDEN
862 bt_component_class_query_method_status live_viewer_connection_list_sessions(
863 struct live_viewer_connection *viewer_connection,
864 const bt_value **user_result)
865 {
866 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
867 bt_component_class_query_method_status status =
868 BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
869 bt_value *result = NULL;
870 enum lttng_live_viewer_status viewer_status;
871 struct lttng_viewer_cmd cmd;
872 struct lttng_viewer_list_sessions list;
873 uint32_t i, sessions_count;
874
875 result = bt_value_array_create();
876 if (!result) {
877 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
878 "Error creating array");
879 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
880 goto error;
881 }
882
883 BT_LOGD("Requesting list of sessions: cmd=%s",
884 lttng_viewer_command_string(LTTNG_VIEWER_LIST_SESSIONS));
885
886 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
887 cmd.data_size = htobe64((uint64_t) 0);
888 cmd.cmd_version = htobe32(0);
889
890 viewer_status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
891 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
892 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
893 "Error sending list sessions command");
894 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
895 goto error;
896 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
897 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
898 goto error;
899 }
900
901 viewer_status = lttng_live_recv(viewer_connection, &list, sizeof(list));
902 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
903 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
904 "Error receiving session list");
905 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
906 goto error;
907 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
908 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
909 goto error;
910 }
911
912 sessions_count = be32toh(list.sessions_count);
913 for (i = 0; i < sessions_count; i++) {
914 struct lttng_viewer_session lsession;
915
916 viewer_status = lttng_live_recv(viewer_connection, &lsession,
917 sizeof(lsession));
918 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
919 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
920 "Error receiving session:");
921 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
922 goto error;
923 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
924 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
925 goto error;
926 }
927
928 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
929 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
930 if (list_append_session(result, viewer_connection->url,
931 &lsession, viewer_connection)) {
932 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
933 "Error appending session");
934 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
935 goto error;
936 }
937 }
938
939 *user_result = result;
940 goto end;
941 error:
942 BT_VALUE_PUT_REF_AND_RESET(result);
943 end:
944 return status;
945 }
946
947 static
948 enum lttng_live_viewer_status lttng_live_query_session_ids(
949 struct lttng_live_msg_iter *lttng_live_msg_iter)
950 {
951 struct lttng_viewer_cmd cmd;
952 struct lttng_viewer_list_sessions list;
953 struct lttng_viewer_session lsession;
954 uint32_t i, sessions_count;
955 uint64_t session_id;
956 enum lttng_live_viewer_status status;
957 struct live_viewer_connection *viewer_connection =
958 lttng_live_msg_iter->viewer_connection;
959 bt_self_component *self_comp = viewer_connection->self_comp;
960 bt_self_component_class *self_comp_class =
961 viewer_connection->self_comp_class;
962
963 BT_COMP_LOGD("Asking the relay daemon for the list of sessions: cmd=%s",
964 lttng_viewer_command_string(LTTNG_VIEWER_LIST_SESSIONS));
965
966 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
967 cmd.data_size = htobe64((uint64_t) 0);
968 cmd.cmd_version = htobe32(0);
969
970 status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
971 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
972 viewer_handle_send_status(self_comp, self_comp_class,
973 status, "list sessions command");
974 goto end;
975 }
976
977 status = lttng_live_recv(viewer_connection, &list, sizeof(list));
978 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
979 viewer_handle_recv_status(self_comp, self_comp_class,
980 status, "session list reply");
981 goto end;
982 }
983
984 sessions_count = be32toh(list.sessions_count);
985 for (i = 0; i < sessions_count; i++) {
986 status = lttng_live_recv(viewer_connection, &lsession,
987 sizeof(lsession));
988 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
989 viewer_handle_recv_status(self_comp, self_comp_class,
990 status, "session reply");
991 goto end;
992 }
993 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
994 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
995 session_id = be64toh(lsession.id);
996
997 BT_COMP_LOGI("Adding session to internal list: "
998 "session-id=%" PRIu64 ", hostname=\"%s\", session-name=\"%s\"",
999 session_id, lsession.hostname, lsession.session_name);
1000
1001 if ((strncmp(lsession.session_name,
1002 viewer_connection->session_name->str,
1003 LTTNG_VIEWER_NAME_MAX) == 0) && (strncmp(lsession.hostname,
1004 viewer_connection->target_hostname->str,
1005 LTTNG_VIEWER_HOST_NAME_MAX) == 0)) {
1006
1007 if (lttng_live_add_session(lttng_live_msg_iter, session_id,
1008 lsession.hostname,
1009 lsession.session_name)) {
1010 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1011 "Failed to add live session");
1012 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1013 goto end;
1014 }
1015 }
1016 }
1017
1018 status = LTTNG_LIVE_VIEWER_STATUS_OK;
1019
1020 end:
1021 return status;
1022 }
1023
1024 BT_HIDDEN
1025 enum lttng_live_viewer_status lttng_live_create_viewer_session(
1026 struct lttng_live_msg_iter *lttng_live_msg_iter)
1027 {
1028 struct lttng_viewer_cmd cmd;
1029 struct lttng_viewer_create_session_response resp;
1030 enum lttng_live_viewer_status status;
1031 struct live_viewer_connection *viewer_connection =
1032 lttng_live_msg_iter->viewer_connection;
1033 bt_self_component *self_comp = viewer_connection->self_comp;
1034 bt_self_component_class *self_comp_class =
1035 viewer_connection->self_comp_class;
1036
1037 BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class,
1038 "Creating a viewer session: cmd=%s",
1039 lttng_viewer_command_string(LTTNG_VIEWER_CREATE_SESSION));
1040
1041 cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
1042 cmd.data_size = htobe64((uint64_t) 0);
1043 cmd.cmd_version = htobe32(0);
1044
1045 status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
1046 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1047 viewer_handle_send_status(self_comp, self_comp_class,
1048 status, "create session command");
1049 goto end;
1050 }
1051
1052 status = lttng_live_recv(viewer_connection, &resp, sizeof(resp));
1053 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1054 viewer_handle_recv_status(self_comp, self_comp_class,
1055 status, "create session reply");
1056 goto end;
1057 }
1058
1059 if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
1060 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1061 "Error creating viewer session");
1062 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1063 goto end;
1064 }
1065
1066 status = lttng_live_query_session_ids(lttng_live_msg_iter);
1067 if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1068 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1069 "Failed to query live viewer session ids");
1070 goto end;
1071 } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1072 goto end;
1073 }
1074
1075 end:
1076 return status;
1077 }
1078
1079 static
1080 enum lttng_live_viewer_status receive_streams(struct lttng_live_session *session,
1081 uint32_t stream_count,
1082 bt_self_message_iterator *self_msg_iter)
1083 {
1084 uint32_t i;
1085 struct lttng_live_msg_iter *lttng_live_msg_iter =
1086 session->lttng_live_msg_iter;
1087 enum lttng_live_viewer_status status;
1088 struct live_viewer_connection *viewer_connection =
1089 lttng_live_msg_iter->viewer_connection;
1090 bt_self_component *self_comp = viewer_connection->self_comp;
1091
1092 BT_COMP_LOGI("Getting %" PRIu32 " new streams", stream_count);
1093 for (i = 0; i < stream_count; i++) {
1094 struct lttng_viewer_stream stream;
1095 struct lttng_live_stream_iterator *live_stream;
1096 uint64_t stream_id;
1097 uint64_t ctf_trace_id;
1098
1099 status = lttng_live_recv(viewer_connection, &stream,
1100 sizeof(stream));
1101 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1102 viewer_handle_recv_status(self_comp, NULL,
1103 status, "stream reply");
1104 goto end;
1105 }
1106 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
1107 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
1108 stream_id = be64toh(stream.id);
1109 ctf_trace_id = be64toh(stream.ctf_trace_id);
1110
1111 if (stream.metadata_flag) {
1112 BT_COMP_LOGI(" metadata stream %" PRIu64 " : %s/%s",
1113 stream_id, stream.path_name, stream.channel_name);
1114 if (lttng_live_metadata_create_stream(session,
1115 ctf_trace_id, stream_id,
1116 stream.path_name)) {
1117 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1118 "Error creating metadata stream");
1119 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1120 goto end;
1121 }
1122 session->lazy_stream_msg_init = true;
1123 } else {
1124 BT_COMP_LOGI(" stream %" PRIu64 " : %s/%s",
1125 stream_id, stream.path_name, stream.channel_name);
1126 live_stream = lttng_live_stream_iterator_create(session,
1127 ctf_trace_id, stream_id, self_msg_iter);
1128 if (!live_stream) {
1129 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1130 "Error creating stream");
1131 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1132 goto end;
1133 }
1134 }
1135 }
1136 status = LTTNG_LIVE_VIEWER_STATUS_OK;
1137
1138 end:
1139 return status;
1140 }
1141
1142 BT_HIDDEN
1143 enum lttng_live_viewer_status lttng_live_session_attach(
1144 struct lttng_live_session *session,
1145 bt_self_message_iterator *self_msg_iter)
1146 {
1147 struct lttng_viewer_cmd cmd;
1148 enum lttng_live_viewer_status status;
1149 struct lttng_viewer_attach_session_request rq;
1150 struct lttng_viewer_attach_session_response rp;
1151 struct lttng_live_msg_iter *lttng_live_msg_iter =
1152 session->lttng_live_msg_iter;
1153 struct live_viewer_connection *viewer_connection =
1154 lttng_live_msg_iter->viewer_connection;
1155 bt_self_component *self_comp = viewer_connection->self_comp;
1156 uint64_t session_id = session->id;
1157 uint32_t streams_count;
1158 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1159 char cmd_buf[cmd_buf_len];
1160
1161 BT_COMP_LOGD("Attaching to session: cmd=%s, session-id=%" PRIu64
1162 ", seek=%s",
1163 lttng_viewer_command_string(LTTNG_VIEWER_ATTACH_SESSION),
1164 session_id,
1165 lttng_viewer_seek_string(LTTNG_VIEWER_SEEK_LAST));
1166
1167 cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
1168 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1169 cmd.cmd_version = htobe32(0);
1170
1171 memset(&rq, 0, sizeof(rq));
1172 rq.session_id = htobe64(session_id);
1173 // TODO: add cmd line parameter to select seek beginning
1174 // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
1175 rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
1176
1177 /*
1178 * Merge the cmd and connection request to prevent a write-write
1179 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1180 * second write to be performed quickly in presence of Nagle's algorithm.
1181 */
1182 memcpy(cmd_buf, &cmd, sizeof(cmd));
1183 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1184 status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1185 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1186 viewer_handle_send_status(self_comp, NULL,
1187 status, "attach session command");
1188 goto end;
1189 }
1190
1191 status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1192 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1193 viewer_handle_recv_status(self_comp, NULL,
1194 status, "attach session reply");
1195 goto end;
1196 }
1197
1198 streams_count = be32toh(rp.streams_count);
1199 switch(be32toh(rp.status)) {
1200 case LTTNG_VIEWER_ATTACH_OK:
1201 break;
1202 case LTTNG_VIEWER_ATTACH_UNK:
1203 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1204 "Session id %" PRIu64 " is unknown", session_id);
1205 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1206 goto end;
1207 case LTTNG_VIEWER_ATTACH_ALREADY:
1208 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1209 "There is already a viewer attached to this session");
1210 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1211 goto end;
1212 case LTTNG_VIEWER_ATTACH_NOT_LIVE:
1213 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Not a live session");
1214 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1215 goto end;
1216 case LTTNG_VIEWER_ATTACH_SEEK_ERR:
1217 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Wrong seek parameter");
1218 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1219 goto end;
1220 default:
1221 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1222 "Unknown attach return code %u", be32toh(rp.status));
1223 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1224 goto end;
1225 }
1226
1227 /* We receive the initial list of streams. */
1228 status = receive_streams(session, streams_count, self_msg_iter);
1229 switch (status) {
1230 case LTTNG_LIVE_VIEWER_STATUS_OK:
1231 break;
1232 case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
1233 goto end;
1234 case LTTNG_LIVE_VIEWER_STATUS_ERROR:
1235 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error receiving streams");
1236 goto end;
1237 default:
1238 bt_common_abort();
1239 }
1240
1241 session->attached = true;
1242 session->new_streams_needed = false;
1243
1244 end:
1245 return status;
1246 }
1247
1248 BT_HIDDEN
1249 enum lttng_live_viewer_status lttng_live_session_detach(
1250 struct lttng_live_session *session)
1251 {
1252 struct lttng_viewer_cmd cmd;
1253 enum lttng_live_viewer_status status;
1254 struct lttng_viewer_detach_session_request rq;
1255 struct lttng_viewer_detach_session_response rp;
1256 struct lttng_live_msg_iter *lttng_live_msg_iter =
1257 session->lttng_live_msg_iter;
1258 bt_self_component *self_comp = session->self_comp;
1259 struct live_viewer_connection *viewer_connection =
1260 lttng_live_msg_iter->viewer_connection;
1261 uint64_t session_id = session->id;
1262 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1263 char cmd_buf[cmd_buf_len];
1264
1265 /*
1266 * The session might already be detached and the viewer socket might
1267 * already been closed. This happens when calling this function when
1268 * tearing down the graph after an error.
1269 */
1270 if (!session->attached || viewer_connection->control_sock == BT_INVALID_SOCKET) {
1271 return LTTNG_LIVE_VIEWER_STATUS_OK;
1272 }
1273
1274 BT_COMP_LOGD("Detaching from session: cmd=%s, session-id=%" PRIu64,
1275 lttng_viewer_command_string(LTTNG_VIEWER_DETACH_SESSION),
1276 session_id);
1277
1278 cmd.cmd = htobe32(LTTNG_VIEWER_DETACH_SESSION);
1279 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1280 cmd.cmd_version = htobe32(0);
1281
1282 memset(&rq, 0, sizeof(rq));
1283 rq.session_id = htobe64(session_id);
1284
1285 /*
1286 * Merge the cmd and connection request to prevent a write-write
1287 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1288 * second write to be performed quickly in presence of Nagle's algorithm.
1289 */
1290 memcpy(cmd_buf, &cmd, sizeof(cmd));
1291 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1292 status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1293 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1294 viewer_handle_send_status(self_comp, NULL,
1295 status, "detach session command");
1296 goto end;
1297 }
1298
1299 status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1300 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1301 viewer_handle_recv_status(self_comp, NULL,
1302 status, "detach session reply");
1303 goto end;
1304 }
1305
1306 switch(be32toh(rp.status)) {
1307 case LTTNG_VIEWER_DETACH_SESSION_OK:
1308 break;
1309 case LTTNG_VIEWER_DETACH_SESSION_UNK:
1310 BT_COMP_LOGW("Session id %" PRIu64 " is unknown", session_id);
1311 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1312 goto end;
1313 case LTTNG_VIEWER_DETACH_SESSION_ERR:
1314 BT_COMP_LOGW("Error detaching session id %" PRIu64 "", session_id);
1315 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1316 goto end;
1317 default:
1318 BT_COMP_LOGE("Unknown detach return code %u", be32toh(rp.status));
1319 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1320 goto end;
1321 }
1322
1323 session->attached = false;
1324
1325 status = LTTNG_LIVE_VIEWER_STATUS_OK;
1326
1327 end:
1328 return status;
1329 }
1330
1331 BT_HIDDEN
1332 enum lttng_live_get_one_metadata_status lttng_live_get_one_metadata_packet(
1333 struct lttng_live_trace *trace, FILE *fp, size_t *reply_len)
1334 {
1335 uint64_t len = 0;
1336 enum lttng_live_get_one_metadata_status status;
1337 enum lttng_live_viewer_status viewer_status;
1338 struct lttng_viewer_cmd cmd;
1339 struct lttng_viewer_get_metadata rq;
1340 struct lttng_viewer_metadata_packet rp;
1341 gchar *data = NULL;
1342 ssize_t writelen;
1343 struct lttng_live_session *session = trace->session;
1344 struct lttng_live_msg_iter *lttng_live_msg_iter =
1345 session->lttng_live_msg_iter;
1346 struct lttng_live_metadata *metadata = trace->metadata;
1347 struct live_viewer_connection *viewer_connection =
1348 lttng_live_msg_iter->viewer_connection;
1349 bt_self_component *self_comp = viewer_connection->self_comp;
1350 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1351 char cmd_buf[cmd_buf_len];
1352
1353 BT_COMP_LOGD("Requesting new metadata for trace:"
1354 "cmd=%s, trace-id=%" PRIu64 ", metadata-stream-id=%" PRIu64,
1355 lttng_viewer_command_string(LTTNG_VIEWER_GET_METADATA),
1356 trace->id, metadata->stream_id);
1357
1358 rq.stream_id = htobe64(metadata->stream_id);
1359 cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
1360 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1361 cmd.cmd_version = htobe32(0);
1362
1363 /*
1364 * Merge the cmd and connection request to prevent a write-write
1365 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1366 * second write to be performed quickly in presence of Nagle's algorithm.
1367 */
1368 memcpy(cmd_buf, &cmd, sizeof(cmd));
1369 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1370 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1371 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1372 viewer_handle_send_status(self_comp, NULL,
1373 viewer_status, "get metadata command");
1374 status = (enum lttng_live_get_one_metadata_status) viewer_status;
1375 goto end;
1376 }
1377
1378 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1379 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1380 viewer_handle_recv_status(self_comp, NULL,
1381 viewer_status, "get metadata reply");
1382 status = (enum lttng_live_get_one_metadata_status) viewer_status;
1383 goto end;
1384 }
1385
1386 switch (be32toh(rp.status)) {
1387 case LTTNG_VIEWER_METADATA_OK:
1388 BT_COMP_LOGD("Received get_metadata response: ok");
1389 break;
1390 case LTTNG_VIEWER_NO_NEW_METADATA:
1391 BT_COMP_LOGD("Received get_metadata response: no new");
1392 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_END;
1393 goto end;
1394 case LTTNG_VIEWER_METADATA_ERR:
1395 /*
1396 * The Relayd cannot find this stream id. Maybe its
1397 * gone already. This can happen in short lived UST app
1398 * in a per-pid session.
1399 */
1400 BT_COMP_LOGD("Received get_metadata response: error");
1401 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_CLOSED;
1402 goto end;
1403 default:
1404 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1405 "Received get_metadata response: unknown");
1406 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
1407 goto end;
1408 }
1409
1410 len = be64toh(rp.len);
1411 BT_COMP_LOGD("Writing %" PRIu64" bytes to metadata", len);
1412 if (len <= 0) {
1413 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1414 "Erroneous response length");
1415 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
1416 goto end;
1417 }
1418
1419 data = g_new0(gchar, len);
1420 if (!data) {
1421 BT_COMP_LOGE_APPEND_CAUSE_ERRNO(self_comp,
1422 "Failed to allocate data buffer", ".");
1423 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
1424 goto end;
1425 }
1426
1427 viewer_status = lttng_live_recv(viewer_connection, data, len);
1428 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1429 viewer_handle_recv_status(self_comp, NULL,
1430 viewer_status, "get metadata packet");
1431 status = (enum lttng_live_get_one_metadata_status) viewer_status;
1432 goto end;
1433 }
1434
1435 /*
1436 * Write the metadata to the file handle.
1437 */
1438 writelen = fwrite(data, sizeof(uint8_t), len, fp);
1439 if (writelen != len) {
1440 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1441 "Writing in the metadata file stream");
1442 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
1443 goto end;
1444 }
1445
1446 *reply_len = len;
1447 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_OK;
1448
1449 end:
1450 g_free(data);
1451 return status;
1452 }
1453
1454 /*
1455 * Assign the fields from a lttng_viewer_index to a packet_index.
1456 */
1457 static
1458 void lttng_index_to_packet_index(struct lttng_viewer_index *lindex,
1459 struct packet_index *pindex)
1460 {
1461 BT_ASSERT(lindex);
1462 BT_ASSERT(pindex);
1463
1464 pindex->offset = be64toh(lindex->offset);
1465 pindex->packet_size = be64toh(lindex->packet_size);
1466 pindex->content_size = be64toh(lindex->content_size);
1467 pindex->ts_cycles.timestamp_begin = be64toh(lindex->timestamp_begin);
1468 pindex->ts_cycles.timestamp_end = be64toh(lindex->timestamp_end);
1469 pindex->events_discarded = be64toh(lindex->events_discarded);
1470 }
1471
1472 static
1473 void lttng_live_need_new_streams(struct lttng_live_msg_iter *lttng_live_msg_iter)
1474 {
1475 uint64_t session_idx;
1476 struct live_viewer_connection *viewer_connection =
1477 lttng_live_msg_iter->viewer_connection;
1478
1479 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
1480 session_idx++) {
1481 struct lttng_live_session *session =
1482 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
1483 BT_COMP_LOGD("Marking session as needing new streams: "
1484 "session-id=%" PRIu64, session->id);
1485 session->new_streams_needed = true;
1486 }
1487 }
1488
1489 BT_HIDDEN
1490 enum lttng_live_iterator_status lttng_live_get_next_index(
1491 struct lttng_live_msg_iter *lttng_live_msg_iter,
1492 struct lttng_live_stream_iterator *stream,
1493 struct packet_index *index)
1494 {
1495 struct lttng_viewer_cmd cmd;
1496 struct lttng_viewer_get_next_index rq;
1497 enum lttng_live_viewer_status viewer_status;
1498 struct lttng_viewer_index rp;
1499 enum lttng_live_iterator_status status;
1500 struct live_viewer_connection *viewer_connection =
1501 lttng_live_msg_iter->viewer_connection;
1502 bt_self_component *self_comp = viewer_connection->self_comp;
1503 struct lttng_live_trace *trace = stream->trace;
1504 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1505 char cmd_buf[cmd_buf_len];
1506 uint32_t flags, rp_status;
1507
1508 BT_COMP_LOGD("Requesting next index for stream: cmd=%s, "
1509 "viewer-stream-id=%" PRIu64,
1510 lttng_viewer_command_string(LTTNG_VIEWER_GET_NEXT_INDEX),
1511 stream->viewer_stream_id);
1512 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
1513 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1514 cmd.cmd_version = htobe32(0);
1515
1516 memset(&rq, 0, sizeof(rq));
1517 rq.stream_id = htobe64(stream->viewer_stream_id);
1518
1519 /*
1520 * Merge the cmd and connection request to prevent a write-write
1521 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1522 * second write to be performed quickly in presence of Nagle's algorithm.
1523 */
1524 memcpy(cmd_buf, &cmd, sizeof(cmd));
1525 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1526
1527 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1528 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1529 viewer_handle_send_status(self_comp, NULL,
1530 viewer_status, "get next index command");
1531 goto error;
1532 }
1533
1534 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1535 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1536 viewer_handle_recv_status(self_comp, NULL,
1537 viewer_status, "get next index reply");
1538 goto error;
1539 }
1540
1541 flags = be32toh(rp.flags);
1542 rp_status = be32toh(rp.status);
1543
1544 BT_COMP_LOGD("Received response from relay daemon: cmd=%s, response=%s",
1545 lttng_viewer_command_string(LTTNG_VIEWER_GET_NEXT_INDEX),
1546 lttng_viewer_next_index_return_code_string(rp_status));
1547 switch (rp_status) {
1548 case LTTNG_VIEWER_INDEX_INACTIVE:
1549 {
1550 uint64_t ctf_stream_class_id;
1551
1552 memset(index, 0, sizeof(struct packet_index));
1553 index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
1554 stream->current_inactivity_ts = index->ts_cycles.timestamp_end;
1555 ctf_stream_class_id = be64toh(rp.stream_id);
1556 if (stream->ctf_stream_class_id.is_set) {
1557 BT_ASSERT(stream->ctf_stream_class_id.value==
1558 ctf_stream_class_id);
1559 } else {
1560 stream->ctf_stream_class_id.value = ctf_stream_class_id;
1561 stream->ctf_stream_class_id.is_set = true;
1562 }
1563 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_QUIESCENT);
1564 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1565 break;
1566 }
1567 case LTTNG_VIEWER_INDEX_OK:
1568 {
1569 uint64_t ctf_stream_class_id;
1570
1571 lttng_index_to_packet_index(&rp, index);
1572 ctf_stream_class_id = be64toh(rp.stream_id);
1573 if (stream->ctf_stream_class_id.is_set) {
1574 BT_ASSERT(stream->ctf_stream_class_id.value==
1575 ctf_stream_class_id);
1576 } else {
1577 stream->ctf_stream_class_id.value = ctf_stream_class_id;
1578 stream->ctf_stream_class_id.is_set = true;
1579 }
1580
1581 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_DATA);
1582
1583 if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
1584 BT_COMP_LOGD("Marking trace as needing new metadata: "
1585 "response=%s, response-flag=NEW_METADATA, trace-id=%" PRIu64,
1586 lttng_viewer_next_index_return_code_string(rp_status),
1587 trace->id);
1588 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
1589 }
1590 if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
1591 BT_COMP_LOGD("Marking all sessions as possibly needing new streams: "
1592 "response=%s, response-flag=NEW_STREAM",
1593 lttng_viewer_next_index_return_code_string(rp_status));
1594 lttng_live_need_new_streams(lttng_live_msg_iter);
1595 }
1596 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1597 break;
1598 }
1599 case LTTNG_VIEWER_INDEX_RETRY:
1600 memset(index, 0, sizeof(struct packet_index));
1601 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
1602 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
1603 goto end;
1604 case LTTNG_VIEWER_INDEX_HUP:
1605 memset(index, 0, sizeof(struct packet_index));
1606 index->offset = EOF;
1607 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_EOF);
1608 stream->has_stream_hung_up = true;
1609 status = LTTNG_LIVE_ITERATOR_STATUS_END;
1610 break;
1611 case LTTNG_VIEWER_INDEX_ERR:
1612 memset(index, 0, sizeof(struct packet_index));
1613 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
1614 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1615 goto end;
1616 default:
1617 BT_COMP_LOGD("Received get_next_index response: unknown value");
1618 memset(index, 0, sizeof(struct packet_index));
1619 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
1620 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1621 goto end;
1622 }
1623 goto end;
1624
1625 error:
1626 status = viewer_status_to_live_iterator_status(viewer_status);
1627 end:
1628 return status;
1629 }
1630
1631 BT_HIDDEN
1632 enum ctf_msg_iter_medium_status lttng_live_get_stream_bytes(
1633 struct lttng_live_msg_iter *lttng_live_msg_iter,
1634 struct lttng_live_stream_iterator *stream, uint8_t *buf,
1635 uint64_t offset, uint64_t req_len, uint64_t *recv_len)
1636 {
1637 enum ctf_msg_iter_medium_status status;
1638 enum lttng_live_viewer_status viewer_status;
1639 struct lttng_viewer_trace_packet rp;
1640 struct lttng_viewer_cmd cmd;
1641 struct lttng_viewer_get_packet rq;
1642 struct live_viewer_connection *viewer_connection =
1643 lttng_live_msg_iter->viewer_connection;
1644 bt_self_component *self_comp = viewer_connection->self_comp;
1645 struct lttng_live_trace *trace = stream->trace;
1646 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1647 char cmd_buf[cmd_buf_len];
1648 uint32_t flags, rp_status;
1649
1650 BT_COMP_LOGD("Requesting data from stream: cmd=%s, "
1651 "offset=%" PRIu64 ", request-len=%" PRIu64,
1652 lttng_viewer_command_string(LTTNG_VIEWER_GET_PACKET),
1653 offset, req_len);
1654
1655 cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
1656 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1657 cmd.cmd_version = htobe32(0);
1658
1659 memset(&rq, 0, sizeof(rq));
1660 rq.stream_id = htobe64(stream->viewer_stream_id);
1661 rq.offset = htobe64(offset);
1662 rq.len = htobe32(req_len);
1663
1664 /*
1665 * Merge the cmd and connection request to prevent a write-write
1666 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1667 * second write to be performed quickly in presence of Nagle's algorithm.
1668 */
1669 memcpy(cmd_buf, &cmd, sizeof(cmd));
1670 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1671
1672 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1673 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1674 viewer_handle_send_status(self_comp, NULL,
1675 viewer_status, "get data packet command");
1676 goto error_convert_status;
1677 }
1678
1679 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1680 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1681 viewer_handle_recv_status(self_comp, NULL,
1682 viewer_status, "get data packet reply");
1683 goto error_convert_status;
1684 }
1685
1686 flags = be32toh(rp.flags);
1687 rp_status = be32toh(rp.status);
1688
1689 BT_COMP_LOGD("Received response from relay daemon: cmd=%s, response=%s",
1690 lttng_viewer_command_string(LTTNG_VIEWER_GET_PACKET),
1691 lttng_viewer_get_packet_return_code_string(rp_status));
1692 switch (rp_status) {
1693 case LTTNG_VIEWER_GET_PACKET_OK:
1694 req_len = be32toh(rp.len);
1695 BT_COMP_LOGD("Got packet from relay daemon: response=%s, packet-len=%" PRIu64 "",
1696 lttng_viewer_get_packet_return_code_string(rp_status),
1697 req_len);
1698 status = CTF_MSG_ITER_MEDIUM_STATUS_OK;
1699 break;
1700 case LTTNG_VIEWER_GET_PACKET_RETRY:
1701 /* Unimplemented by relay daemon */
1702 status = CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
1703 goto end;
1704 case LTTNG_VIEWER_GET_PACKET_ERR:
1705 if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
1706 BT_COMP_LOGD("Marking trace as needing new metadata: "
1707 "response=%s, response-flag=NEW_METADATA, trace-id=%" PRIu64,
1708 lttng_viewer_next_index_return_code_string(rp_status),
1709 trace->id);
1710 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
1711 }
1712 if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
1713 BT_COMP_LOGD("Marking all sessions as possibly needing new streams: "
1714 "response=%s, response-flag=NEW_STREAM",
1715 lttng_viewer_next_index_return_code_string(rp_status));
1716 lttng_live_need_new_streams(lttng_live_msg_iter);
1717 }
1718 if (flags & (LTTNG_VIEWER_FLAG_NEW_METADATA
1719 | LTTNG_VIEWER_FLAG_NEW_STREAM)) {
1720 status = CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
1721 BT_COMP_LOGD("Reply with any one flags set means we should retry: response=%s",
1722 lttng_viewer_get_packet_return_code_string(rp_status));
1723 goto end;
1724 }
1725 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1726 "Received get_data_packet response: error");
1727 status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
1728 goto end;
1729 case LTTNG_VIEWER_GET_PACKET_EOF:
1730 status = CTF_MSG_ITER_MEDIUM_STATUS_EOF;
1731 goto end;
1732 default:
1733 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1734 "Received get_data_packet response: unknown (%d)", rp_status);
1735 status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
1736 goto end;
1737 }
1738
1739 if (req_len == 0) {
1740 status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
1741 goto end;
1742 }
1743
1744 viewer_status = lttng_live_recv(viewer_connection, buf, req_len);
1745 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1746 viewer_handle_recv_status(self_comp, NULL,
1747 viewer_status, "get data packet");
1748 goto error_convert_status;
1749 }
1750 *recv_len = req_len;
1751
1752 status = CTF_MSG_ITER_MEDIUM_STATUS_OK;
1753 goto end;
1754
1755 error_convert_status:
1756 status = viewer_status_to_ctf_msg_iter_medium_status(viewer_status);
1757 end:
1758 return status;
1759 }
1760
1761 /*
1762 * Request new streams for a session.
1763 */
1764 BT_HIDDEN
1765 enum lttng_live_iterator_status lttng_live_session_get_new_streams(
1766 struct lttng_live_session *session,
1767 bt_self_message_iterator *self_msg_iter)
1768 {
1769 enum lttng_live_iterator_status status =
1770 LTTNG_LIVE_ITERATOR_STATUS_OK;
1771 struct lttng_viewer_cmd cmd;
1772 struct lttng_viewer_new_streams_request rq;
1773 struct lttng_viewer_new_streams_response rp;
1774 struct lttng_live_msg_iter *lttng_live_msg_iter =
1775 session->lttng_live_msg_iter;
1776 enum lttng_live_viewer_status viewer_status;
1777 struct live_viewer_connection *viewer_connection =
1778 lttng_live_msg_iter->viewer_connection;
1779 bt_self_component *self_comp = viewer_connection->self_comp;
1780 uint32_t streams_count;
1781 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1782 char cmd_buf[cmd_buf_len];
1783
1784 if (!session->new_streams_needed) {
1785 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1786 goto end;
1787 }
1788
1789 BT_COMP_LOGD("Requesting new streams for session: cmd=%s, "
1790 "session-id=%" PRIu64,
1791 lttng_viewer_command_string(LTTNG_VIEWER_GET_NEW_STREAMS),
1792 session->id);
1793
1794 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
1795 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1796 cmd.cmd_version = htobe32(0);
1797
1798 memset(&rq, 0, sizeof(rq));
1799 rq.session_id = htobe64(session->id);
1800
1801 /*
1802 * Merge the cmd and connection request to prevent a write-write
1803 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1804 * second write to be performed quickly in presence of Nagle's algorithm.
1805 */
1806 memcpy(cmd_buf, &cmd, sizeof(cmd));
1807 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1808
1809 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1810 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1811 viewer_handle_send_status(self_comp, NULL,
1812 viewer_status, "get new streams command");
1813 status = viewer_status_to_live_iterator_status(viewer_status);
1814 goto end;
1815 }
1816
1817 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1818 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1819 viewer_handle_recv_status(self_comp, NULL,
1820 viewer_status, "get new streams reply");
1821 status = viewer_status_to_live_iterator_status(viewer_status);
1822 goto end;
1823 }
1824
1825 streams_count = be32toh(rp.streams_count);
1826
1827 switch(be32toh(rp.status)) {
1828 case LTTNG_VIEWER_NEW_STREAMS_OK:
1829 session->new_streams_needed = false;
1830 break;
1831 case LTTNG_VIEWER_NEW_STREAMS_NO_NEW:
1832 session->new_streams_needed = false;
1833 goto end;
1834 case LTTNG_VIEWER_NEW_STREAMS_HUP:
1835 session->new_streams_needed = false;
1836 session->closed = true;
1837 status = LTTNG_LIVE_ITERATOR_STATUS_END;
1838 goto end;
1839 case LTTNG_VIEWER_NEW_STREAMS_ERR:
1840 BT_COMP_LOGD("Received get_new_streams response: error");
1841 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1842 goto end;
1843 default:
1844 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1845 "Received get_new_streams response: Unknown:"
1846 "return code %u", be32toh(rp.status));
1847 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1848 goto end;
1849 }
1850
1851 viewer_status = receive_streams(session, streams_count, self_msg_iter);
1852 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1853 viewer_handle_recv_status(self_comp, NULL,
1854 viewer_status, "new streams");
1855 status = viewer_status_to_live_iterator_status(viewer_status);
1856 goto end;
1857 }
1858
1859 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1860 end:
1861 return status;
1862 }
1863
1864 BT_HIDDEN
1865 enum lttng_live_viewer_status live_viewer_connection_create(
1866 bt_self_component *self_comp,
1867 bt_self_component_class *self_comp_class,
1868 bt_logging_level log_level,
1869 const char *url, bool in_query,
1870 struct lttng_live_msg_iter *lttng_live_msg_iter,
1871 struct live_viewer_connection **viewer)
1872 {
1873 struct live_viewer_connection *viewer_connection;
1874 enum lttng_live_viewer_status status;
1875
1876 viewer_connection = g_new0(struct live_viewer_connection, 1);
1877
1878 if (bt_socket_init(log_level) != 0) {
1879 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
1880 self_comp_class, "Failed to init socket");
1881 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1882 goto error;
1883 }
1884
1885 viewer_connection->log_level = log_level;
1886
1887 viewer_connection->self_comp = self_comp;
1888 viewer_connection->self_comp_class = self_comp_class;
1889
1890 viewer_connection->control_sock = BT_INVALID_SOCKET;
1891 viewer_connection->port = -1;
1892 viewer_connection->in_query = in_query;
1893 viewer_connection->lttng_live_msg_iter = lttng_live_msg_iter;
1894 viewer_connection->url = g_string_new(url);
1895 if (!viewer_connection->url) {
1896 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
1897 self_comp_class, "Failed to allocate URL buffer");
1898 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1899 goto error;
1900 }
1901
1902 BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class,
1903 "Establishing connection to url \"%s\"...", url);
1904 status = lttng_live_connect_viewer(viewer_connection);
1905 /*
1906 * Only print error and append cause in case of error. not in case of
1907 * interruption.
1908 */
1909 if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1910 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
1911 self_comp_class, "Failed to establish connection: "
1912 "url=\"%s\"", url);
1913 goto error;
1914 } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1915 goto error;
1916 }
1917 BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class,
1918 "Connection to url \"%s\" is established", url);
1919
1920 *viewer = viewer_connection;
1921 status = LTTNG_LIVE_VIEWER_STATUS_OK;
1922 goto end;
1923
1924 error:
1925 if (viewer_connection) {
1926 live_viewer_connection_destroy(viewer_connection);
1927 }
1928 end:
1929 return status;
1930 }
1931
1932 BT_HIDDEN
1933 void live_viewer_connection_destroy(
1934 struct live_viewer_connection *viewer_connection)
1935 {
1936 bt_self_component *self_comp = viewer_connection->self_comp;
1937 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
1938
1939 if (!viewer_connection) {
1940 goto end;
1941 }
1942
1943 BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class,
1944 "Closing connection to relay:"
1945 "relay-url=\"%s\"", viewer_connection->url->str);
1946
1947 lttng_live_disconnect_viewer(viewer_connection);
1948
1949 if (viewer_connection->url) {
1950 g_string_free(viewer_connection->url, true);
1951 }
1952
1953 if (viewer_connection->relay_hostname) {
1954 g_string_free(viewer_connection->relay_hostname, true);
1955 }
1956
1957 if (viewer_connection->target_hostname) {
1958 g_string_free(viewer_connection->target_hostname, true);
1959 }
1960
1961 if (viewer_connection->session_name) {
1962 g_string_free(viewer_connection->session_name, true);
1963 }
1964
1965 if (viewer_connection->proto) {
1966 g_string_free(viewer_connection->proto, true);
1967 }
1968
1969 g_free(viewer_connection);
1970
1971 bt_socket_fini();
1972
1973 end:
1974 return;
1975 }
This page took 0.116723 seconds and 3 git commands to generate.