src.ctf.lttng-live: use optional pattern for `ctf_stream_class_id`
[babeltrace.git] / src / plugins / ctf / lttng-live / viewer-connection.c
1 /*
2 * SPDX-License-Identifier: MIT
3 *
4 * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
5 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
6 */
7
8 #define BT_COMP_LOG_SELF_COMP (viewer_connection->self_comp)
9 #define BT_LOG_OUTPUT_LEVEL (viewer_connection->log_level)
10 #define BT_LOG_TAG "PLUGIN/SRC.CTF.LTTNG-LIVE/VIEWER"
11 #include "logging/comp-logging.h"
12
13 #include <fcntl.h>
14 #include <stdbool.h>
15 #include <stdint.h>
16 #include <stdio.h>
17 #include <stdlib.h>
18 #include <sys/types.h>
19 #include <unistd.h>
20
21 #include <glib.h>
22
23 #include "compat/socket.h"
24 #include "compat/endian.h"
25 #include "compat/compiler.h"
26 #include "common/common.h"
27 #include <babeltrace2/babeltrace.h>
28
29 #include "lttng-live.h"
30 #include "viewer-connection.h"
31 #include "lttng-viewer-abi.h"
32 #include "data-stream.h"
33 #include "metadata.h"
34
/*
 * Reports a non-OK viewer status returned by a send or receive call.
 *
 * An interruption is silent; an error appends the cause
 * "Error <_verb> <_what>"; any other status aborts. `_verb` and `_what`
 * must be string literals because they are concatenated at compile time.
 */
#define viewer_handle_send_recv_status(_comp, _comp_class, _st, \
		_verb, _what) \
	do { \
		switch (_st) { \
		case LTTNG_LIVE_VIEWER_STATUS_ERROR: \
			BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(_comp, \
				_comp_class, \
				"Error " _verb " " _what); \
			break; \
		case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED: \
			break; \
		default: \
			bt_common_abort(); \
		} \
	} while (0)
49
/* Reports a non-OK status of a send operation ("Error sending <_what>"). */
#define viewer_handle_send_status(_comp, _comp_class, _st, _what) \
	viewer_handle_send_recv_status(_comp, _comp_class, _st, \
		"sending", _what)
53
/* Reports a non-OK status of a receive operation ("Error receiving <_what>"). */
#define viewer_handle_recv_status(_comp, _comp_class, _st, _what) \
	viewer_handle_send_recv_status(_comp, _comp_class, _st, \
		"receiving", _what)
57
/*
 * Appends an error cause built from `_prefix`, followed by ": ", the
 * current socket error message (bt_socket_errormsg()), and finally
 * `_suffix_fmt` formatted with the remaining arguments. `_prefix` and
 * `_suffix_fmt` must be string literals.
 */
#define LTTNG_LIVE_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE_ERRNO(_comp, \
		_comp_class, _prefix, _suffix_fmt, ...) \
	do { \
		BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(_comp, _comp_class, \
			_prefix ": %s" _suffix_fmt, \
			bt_socket_errormsg(), ##__VA_ARGS__); \
	} while (0)
64
65 static
66 const char *lttng_viewer_command_string(enum lttng_viewer_command cmd)
67 {
68 switch (cmd){
69 case LTTNG_VIEWER_CONNECT:
70 return "CONNECT";
71 case LTTNG_VIEWER_LIST_SESSIONS:
72 return "LIST_SESSIONS";
73 case LTTNG_VIEWER_ATTACH_SESSION:
74 return "ATTACH_SESSION";
75 case LTTNG_VIEWER_GET_NEXT_INDEX:
76 return "GET_NEXT_INDEX";
77 case LTTNG_VIEWER_GET_PACKET:
78 return "GET_PACKET";
79 case LTTNG_VIEWER_GET_METADATA:
80 return "GET_METADATA";
81 case LTTNG_VIEWER_GET_NEW_STREAMS:
82 return "GET_NEW_STREAMS";
83 case LTTNG_VIEWER_CREATE_SESSION:
84 return "CREATE_SESSION";
85 case LTTNG_VIEWER_DETACH_SESSION:
86 return "DETACH_SESSION";
87 }
88
89 bt_common_abort();
90 }
91
92 static
93 const char *lttng_viewer_next_index_return_code_string(
94 enum lttng_viewer_next_index_return_code code)
95 {
96 switch (code) {
97 case LTTNG_VIEWER_INDEX_OK:
98 return "INDEX_OK";
99 case LTTNG_VIEWER_INDEX_RETRY:
100 return "INDEX_RETRY";
101 case LTTNG_VIEWER_INDEX_HUP:
102 return "INDEX_HUP";
103 case LTTNG_VIEWER_INDEX_ERR:
104 return "INDEX_ERR";
105 case LTTNG_VIEWER_INDEX_INACTIVE:
106 return "INDEX_INACTIVE";
107 case LTTNG_VIEWER_INDEX_EOF:
108 return "INDEX_EOF";
109 }
110
111 bt_common_abort();
112 }
113
114 static
115 const char *lttng_viewer_get_packet_return_code_string(
116 enum lttng_viewer_get_packet_return_code code)
117 {
118 switch (code) {
119 case LTTNG_VIEWER_GET_PACKET_OK:
120 return "GET_PACKET_OK";
121 case LTTNG_VIEWER_GET_PACKET_RETRY:
122 return "GET_PACKET_RETRY";
123 case LTTNG_VIEWER_GET_PACKET_ERR:
124 return "GET_PACKET_ERR";
125 case LTTNG_VIEWER_GET_PACKET_EOF:
126 return "GET_PACKET_EOF";
127 }
128
129 bt_common_abort();
130 };
131
132 static
133 const char *lttng_viewer_seek_string(enum lttng_viewer_seek seek)
134 {
135 switch (seek) {
136 case LTTNG_VIEWER_SEEK_BEGINNING:
137 return "SEEK_BEGINNING";
138 case LTTNG_VIEWER_SEEK_LAST:
139 return "SEEK_LAST";
140 }
141
142 bt_common_abort();
143 }
144
145 static inline
146 enum lttng_live_iterator_status viewer_status_to_live_iterator_status(
147 enum lttng_live_viewer_status viewer_status)
148 {
149 switch (viewer_status) {
150 case LTTNG_LIVE_VIEWER_STATUS_OK:
151 return LTTNG_LIVE_ITERATOR_STATUS_OK;
152 case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
153 return LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
154 case LTTNG_LIVE_VIEWER_STATUS_ERROR:
155 return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
156 }
157
158 bt_common_abort();
159 }
160
161 static inline
162 enum ctf_msg_iter_medium_status viewer_status_to_ctf_msg_iter_medium_status(
163 enum lttng_live_viewer_status viewer_status)
164 {
165 switch (viewer_status) {
166 case LTTNG_LIVE_VIEWER_STATUS_OK:
167 return CTF_MSG_ITER_MEDIUM_STATUS_OK;
168 case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
169 return CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
170 case LTTNG_LIVE_VIEWER_STATUS_ERROR:
171 return CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
172 }
173
174 bt_common_abort();
175 }
176
177 static inline
178 void viewer_connection_close_socket(
179 struct live_viewer_connection *viewer_connection)
180 {
181 bt_self_component_class *self_comp_class =
182 viewer_connection->self_comp_class;
183 bt_self_component *self_comp =
184 viewer_connection->self_comp;
185 int ret = bt_socket_close(viewer_connection->control_sock);
186 if (ret == -1) {
187 BT_COMP_OR_COMP_CLASS_LOGW_ERRNO(
188 self_comp, self_comp_class,
189 "Error closing viewer connection socket: ", ".");
190 }
191
192 viewer_connection->control_sock = BT_INVALID_SOCKET;
193 }
194
195 /*
196 * This function receives a message from the Relay daemon.
197 * If it received the entire message, it returns _OK,
198 * If it's interrupted, it returns _INTERRUPTED,
199 * otherwise, it returns _ERROR.
200 */
201 static
202 enum lttng_live_viewer_status lttng_live_recv(
203 struct live_viewer_connection *viewer_connection,
204 void *buf, size_t len)
205 {
206 ssize_t received;
207 bt_self_component_class *self_comp_class =
208 viewer_connection->self_comp_class;
209 bt_self_component *self_comp =
210 viewer_connection->self_comp;
211 size_t total_received = 0, to_receive = len;
212 struct lttng_live_msg_iter *lttng_live_msg_iter =
213 viewer_connection->lttng_live_msg_iter;
214 enum lttng_live_viewer_status status;
215 BT_SOCKET sock = viewer_connection->control_sock;
216
217 /*
218 * Receive a message from the Relay.
219 */
220 do {
221 received = bt_socket_recv(sock, buf + total_received, to_receive, 0);
222 if (received == BT_SOCKET_ERROR) {
223 if (bt_socket_interrupted()) {
224 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
225 /*
226 * This interruption was due to a
227 * SIGINT and the graph is being torn
228 * down.
229 */
230 status = LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED;
231 lttng_live_msg_iter->was_interrupted = true;
232 goto end;
233 } else {
234 /*
235 * A signal was received, but the graph
236 * is not being torn down. Carry on.
237 */
238 continue;
239 }
240 } else {
241 /*
242 * For any other types of socket error, close
243 * the socket and return an error.
244 */
245 LTTNG_LIVE_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE_ERRNO(
246 self_comp, self_comp_class,
247 "Error receiving from Relay", ".");
248
249 viewer_connection_close_socket(viewer_connection);
250 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
251 goto end;
252 }
253 } else if (received == 0) {
254 /*
255 * The recv() call returned 0. This means the
256 * connection was orderly shutdown from the other peer.
257 * If that happens when we are trying to receive
258 * a message from it, it means something when wrong.
259 * Close the socket and return an error.
260 */
261 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
262 self_comp_class, "Remote side has closed connection");
263 viewer_connection_close_socket(viewer_connection);
264 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
265 goto end;
266 }
267
268 BT_ASSERT(received <= to_receive);
269 total_received += received;
270 to_receive -= received;
271
272 } while (to_receive > 0);
273
274 BT_ASSERT(total_received == len);
275 status = LTTNG_LIVE_VIEWER_STATUS_OK;
276
277 end:
278 return status;
279 }
280
281 /*
282 * This function sends a message to the Relay daemon.
283 * If it send the message, it returns _OK,
284 * If it's interrupted, it returns _INTERRUPTED,
285 * otherwise, it returns _ERROR.
286 */
287 static
288 enum lttng_live_viewer_status lttng_live_send(
289 struct live_viewer_connection *viewer_connection,
290 const void *buf, size_t len)
291 {
292 enum lttng_live_viewer_status status;
293 bt_self_component_class *self_comp_class =
294 viewer_connection->self_comp_class;
295 bt_self_component *self_comp =
296 viewer_connection->self_comp;
297 struct lttng_live_msg_iter *lttng_live_msg_iter =
298 viewer_connection->lttng_live_msg_iter;
299 BT_SOCKET sock = viewer_connection->control_sock;
300 size_t to_send = len;
301 ssize_t total_sent = 0;
302
303 do {
304 ssize_t sent = bt_socket_send_nosigpipe(sock, buf + total_sent,
305 to_send);
306 if (sent == BT_SOCKET_ERROR) {
307 if (bt_socket_interrupted()) {
308 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
309 /*
310 * This interruption was a SIGINT and
311 * the graph is being teared down.
312 */
313 status = LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED;
314 lttng_live_msg_iter->was_interrupted = true;
315 goto end;
316 } else {
317 /*
318 * A signal was received, but the graph
319 * is not being teared down. Carry on.
320 */
321 continue;
322 }
323 } else {
324 /*
325 * For any other types of socket error, close
326 * the socket and return an error.
327 */
328 LTTNG_LIVE_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE_ERRNO(
329 self_comp, self_comp_class,
330 "Error sending to Relay", ".");
331
332 viewer_connection_close_socket(viewer_connection);
333 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
334 goto end;
335 }
336 }
337
338 BT_ASSERT(sent <= to_send);
339 total_sent += sent;
340 to_send -= sent;
341
342 } while (to_send > 0);
343
344 BT_ASSERT(total_sent == len);
345 status = LTTNG_LIVE_VIEWER_STATUS_OK;
346
347 end:
348 return status;
349 }
350
351 static
352 int parse_url(struct live_viewer_connection *viewer_connection)
353 {
354 char error_buf[256] = { 0 };
355 bt_self_component *self_comp = viewer_connection->self_comp;
356 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
357 struct bt_common_lttng_live_url_parts lttng_live_url_parts = { 0 };
358 int ret = -1;
359 const char *path = viewer_connection->url->str;
360
361 if (!path) {
362 goto end;
363 }
364
365 lttng_live_url_parts = bt_common_parse_lttng_live_url(path, error_buf,
366 sizeof(error_buf));
367 if (!lttng_live_url_parts.proto) {
368 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
369 self_comp_class,"Invalid LTTng live URL format: %s",
370 error_buf);
371 goto end;
372 }
373 viewer_connection->proto = lttng_live_url_parts.proto;
374 lttng_live_url_parts.proto = NULL;
375
376 viewer_connection->relay_hostname = lttng_live_url_parts.hostname;
377 lttng_live_url_parts.hostname = NULL;
378
379 if (lttng_live_url_parts.port >= 0) {
380 viewer_connection->port = lttng_live_url_parts.port;
381 } else {
382 viewer_connection->port = LTTNG_DEFAULT_NETWORK_VIEWER_PORT;
383 }
384
385 viewer_connection->target_hostname = lttng_live_url_parts.target_hostname;
386 lttng_live_url_parts.target_hostname = NULL;
387
388 if (lttng_live_url_parts.session_name) {
389 viewer_connection->session_name = lttng_live_url_parts.session_name;
390 lttng_live_url_parts.session_name = NULL;
391 }
392
393 ret = 0;
394
395 end:
396 bt_common_destroy_lttng_live_url_parts(&lttng_live_url_parts);
397 return ret;
398 }
399
400 static
401 enum lttng_live_viewer_status lttng_live_handshake(
402 struct live_viewer_connection *viewer_connection)
403 {
404 struct lttng_viewer_cmd cmd;
405 struct lttng_viewer_connect connect;
406 enum lttng_live_viewer_status status;
407 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
408 bt_self_component *self_comp = viewer_connection->self_comp;
409 const size_t cmd_buf_len = sizeof(cmd) + sizeof(connect);
410 char cmd_buf[cmd_buf_len];
411
412 BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class,
413 "Handshaking with the relay daemon: cmd=%s, major-version=%u, minor-version=%u",
414 lttng_viewer_command_string(LTTNG_VIEWER_CONNECT), LTTNG_LIVE_MAJOR,
415 LTTNG_LIVE_MINOR);
416
417 cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
418 cmd.data_size = htobe64((uint64_t) sizeof(connect));
419 cmd.cmd_version = htobe32(0);
420
421 connect.viewer_session_id = -1ULL; /* will be set on recv */
422 connect.major = htobe32(LTTNG_LIVE_MAJOR);
423 connect.minor = htobe32(LTTNG_LIVE_MINOR);
424 connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
425
426 /*
427 * Merge the cmd and connection request to prevent a write-write
428 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
429 * second write to be performed quickly in presence of Nagle's algorithm
430 */
431 memcpy(cmd_buf, &cmd, sizeof(cmd));
432 memcpy(cmd_buf + sizeof(cmd), &connect, sizeof(connect));
433
434 status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
435 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
436 viewer_handle_send_status(self_comp, self_comp_class,
437 status, "viewer connect command");
438 goto end;
439 }
440
441 status = lttng_live_recv(viewer_connection, &connect, sizeof(connect));
442 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
443 viewer_handle_recv_status(self_comp, self_comp_class,
444 status, "viewer connect reply");
445 goto end;
446 }
447
448 BT_COMP_OR_COMP_CLASS_LOGI(self_comp, self_comp_class,
449 "Received viewer session ID : %" PRIu64,
450 (uint64_t) be64toh(connect.viewer_session_id));
451 BT_COMP_OR_COMP_CLASS_LOGI(self_comp, self_comp_class,
452 "Relayd version : %u.%u", be32toh(connect.major),
453 be32toh(connect.minor));
454
455 if (LTTNG_LIVE_MAJOR != be32toh(connect.major)) {
456 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
457 self_comp_class, "Incompatible lttng-relayd protocol");
458 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
459 goto end;
460 }
461 /* Use the smallest protocol version implemented. */
462 if (LTTNG_LIVE_MINOR > be32toh(connect.minor)) {
463 viewer_connection->minor = be32toh(connect.minor);
464 } else {
465 viewer_connection->minor = LTTNG_LIVE_MINOR;
466 }
467 viewer_connection->major = LTTNG_LIVE_MAJOR;
468
469 status = LTTNG_LIVE_VIEWER_STATUS_OK;
470
471 goto end;
472
473 end:
474 return status;
475 }
476
477 static
478 enum lttng_live_viewer_status lttng_live_connect_viewer(
479 struct live_viewer_connection *viewer_connection)
480 {
481 struct hostent *host;
482 struct sockaddr_in server_addr;
483 enum lttng_live_viewer_status status;
484 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
485 bt_self_component *self_comp = viewer_connection->self_comp;
486
487 if (parse_url(viewer_connection)) {
488 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
489 self_comp_class, "Failed to parse URL");
490 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
491 goto error;
492 }
493
494 BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class,
495 "Connecting to hostname : %s, port : %d, "
496 "target hostname : %s, session name : %s, proto : %s",
497 viewer_connection->relay_hostname->str,
498 viewer_connection->port,
499 !viewer_connection->target_hostname ?
500 "<none>" : viewer_connection->target_hostname->str,
501 !viewer_connection->session_name ?
502 "<none>" : viewer_connection->session_name->str,
503 viewer_connection->proto->str);
504
505 host = gethostbyname(viewer_connection->relay_hostname->str);
506 if (!host) {
507 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
508 self_comp_class, "Cannot lookup hostname: hostname=\"%s\"",
509 viewer_connection->relay_hostname->str);
510 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
511 goto error;
512 }
513
514 if ((viewer_connection->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == BT_INVALID_SOCKET) {
515 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
516 self_comp_class, "Socket creation failed: %s", bt_socket_errormsg());
517 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
518 goto error;
519 }
520
521 server_addr.sin_family = AF_INET;
522 server_addr.sin_port = htons(viewer_connection->port);
523 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
524 memset(&(server_addr.sin_zero), 0, 8);
525
526 if (connect(viewer_connection->control_sock, (struct sockaddr *) &server_addr,
527 sizeof(struct sockaddr)) == BT_SOCKET_ERROR) {
528 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
529 self_comp_class, "Connection failed: %s",
530 bt_socket_errormsg());
531 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
532 goto error;
533 }
534
535 status = lttng_live_handshake(viewer_connection);
536
537 /*
538 * Only print error and append cause in case of error. not in case of
539 * interruption.
540 */
541 if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
542 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
543 self_comp_class, "Viewer handshake failed");
544 goto error;
545 } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
546 goto end;
547 }
548
549 goto end;
550
551 error:
552 if (viewer_connection->control_sock != BT_INVALID_SOCKET) {
553 if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) {
554 BT_COMP_OR_COMP_CLASS_LOGW(self_comp, self_comp_class,
555 "Error closing socket: %s.", bt_socket_errormsg());
556 }
557 }
558 viewer_connection->control_sock = BT_INVALID_SOCKET;
559 end:
560 return status;
561 }
562
563 static
564 void lttng_live_disconnect_viewer(
565 struct live_viewer_connection *viewer_connection)
566 {
567 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
568 bt_self_component *self_comp = viewer_connection->self_comp;
569
570 if (viewer_connection->control_sock == BT_INVALID_SOCKET) {
571 return;
572 }
573 if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) {
574 BT_COMP_OR_COMP_CLASS_LOGW(self_comp, self_comp_class,
575 "Error closing socket: %s", bt_socket_errormsg());
576 viewer_connection->control_sock = BT_INVALID_SOCKET;
577 }
578 }
579
580 static
581 int list_update_session(bt_value *results,
582 const struct lttng_viewer_session *session,
583 bool *_found, struct live_viewer_connection *viewer_connection)
584 {
585 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
586 bt_self_component *self_comp = viewer_connection->self_comp;
587 int ret = 0;
588 uint64_t i, len;
589 bt_value *map = NULL;
590 bt_value *hostname = NULL;
591 bt_value *session_name = NULL;
592 bt_value *btval = NULL;
593 bool found = false;
594
595 len = bt_value_array_get_length(results);
596 for (i = 0; i < len; i++) {
597 const char *hostname_str = NULL;
598 const char *session_name_str = NULL;
599
600 map = bt_value_array_borrow_element_by_index(results, i);
601 hostname = bt_value_map_borrow_entry_value(map, "target-hostname");
602 if (!hostname) {
603 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
604 self_comp_class,
605 "Error borrowing \"target-hostname\" entry.");
606 ret = -1;
607 goto end;
608 }
609 session_name = bt_value_map_borrow_entry_value(map, "session-name");
610 if (!session_name) {
611 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
612 self_comp_class,
613 "Error borrowing \"session-name\" entry.");
614 ret = -1;
615 goto end;
616 }
617 hostname_str = bt_value_string_get(hostname);
618 session_name_str = bt_value_string_get(session_name);
619
620 if (strcmp(session->hostname, hostname_str) == 0
621 && strcmp(session->session_name, session_name_str) == 0) {
622 int64_t val;
623 uint32_t streams = be32toh(session->streams);
624 uint32_t clients = be32toh(session->clients);
625
626 found = true;
627
628 btval = bt_value_map_borrow_entry_value(map, "stream-count");
629 if (!btval) {
630 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(
631 self_comp, self_comp_class,
632 "Error borrowing \"stream-count\" entry.");
633 ret = -1;
634 goto end;
635 }
636 val = bt_value_integer_unsigned_get(btval);
637 /* sum */
638 val += streams;
639 bt_value_integer_unsigned_set(btval, val);
640
641 btval = bt_value_map_borrow_entry_value(map, "client-count");
642 if (!btval) {
643 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(
644 self_comp, self_comp_class,
645 "Error borrowing \"client-count\" entry.");
646 ret = -1;
647 goto end;
648 }
649 val = bt_value_integer_unsigned_get(btval);
650 /* max */
651 val = bt_max_t(int64_t, clients, val);
652 bt_value_integer_unsigned_set(btval, val);
653 }
654
655 if (found) {
656 break;
657 }
658 }
659 end:
660 *_found = found;
661 return ret;
662 }
663
664 static
665 int list_append_session(bt_value *results,
666 GString *base_url,
667 const struct lttng_viewer_session *session,
668 struct live_viewer_connection *viewer_connection)
669 {
670 int ret = 0;
671 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
672 bt_value_map_insert_entry_status insert_status;
673 bt_value_array_append_element_status append_status;
674 bt_value *map = NULL;
675 GString *url = NULL;
676 bool found = false;
677
678 /*
679 * If the session already exists, add the stream count to it,
680 * and do max of client counts.
681 */
682 ret = list_update_session(results, session, &found, viewer_connection);
683 if (ret || found) {
684 goto end;
685 }
686
687 map = bt_value_map_create();
688 if (!map) {
689 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
690 "Error creating map value.");
691 ret = -1;
692 goto end;
693 }
694
695 if (base_url->len < 1) {
696 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
697 "Error: base_url length smaller than 1.");
698 ret = -1;
699 goto end;
700 }
701 /*
702 * key = "url",
703 * value = <string>,
704 */
705 url = g_string_new(base_url->str);
706 g_string_append(url, "/host/");
707 g_string_append(url, session->hostname);
708 g_string_append_c(url, '/');
709 g_string_append(url, session->session_name);
710
711 insert_status = bt_value_map_insert_string_entry(map, "url", url->str);
712 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
713 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
714 "Error inserting \"url\" entry.");
715 ret = -1;
716 goto end;
717 }
718
719 /*
720 * key = "target-hostname",
721 * value = <string>,
722 */
723 insert_status = bt_value_map_insert_string_entry(map, "target-hostname",
724 session->hostname);
725 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
726 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
727 "Error inserting \"target-hostname\" entry.");
728 ret = -1;
729 goto end;
730 }
731
732 /*
733 * key = "session-name",
734 * value = <string>,
735 */
736 insert_status = bt_value_map_insert_string_entry(map, "session-name",
737 session->session_name);
738 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
739 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
740 "Error inserting \"session-name\" entry.");
741 ret = -1;
742 goto end;
743 }
744
745 /*
746 * key = "timer-us",
747 * value = <integer>,
748 */
749 {
750 uint32_t live_timer = be32toh(session->live_timer);
751
752 insert_status = bt_value_map_insert_unsigned_integer_entry(
753 map, "timer-us", live_timer);
754 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
755 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
756 "Error inserting \"timer-us\" entry.");
757 ret = -1;
758 goto end;
759 }
760 }
761
762 /*
763 * key = "stream-count",
764 * value = <integer>,
765 */
766 {
767 uint32_t streams = be32toh(session->streams);
768
769 insert_status = bt_value_map_insert_unsigned_integer_entry(map,
770 "stream-count", streams);
771 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
772 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
773 "Error inserting \"stream-count\" entry.");
774 ret = -1;
775 goto end;
776 }
777 }
778
779 /*
780 * key = "client-count",
781 * value = <integer>,
782 */
783 {
784 uint32_t clients = be32toh(session->clients);
785
786 insert_status = bt_value_map_insert_unsigned_integer_entry(map,
787 "client-count", clients);
788 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
789 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
790 "Error inserting \"client-count\" entry.");
791 ret = -1;
792 goto end;
793 }
794 }
795
796 append_status = bt_value_array_append_element(results, map);
797 if (append_status != BT_VALUE_ARRAY_APPEND_ELEMENT_STATUS_OK) {
798 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
799 "Error appending map to results.");
800 ret = -1;
801 }
802
803 end:
804 if (url) {
805 g_string_free(url, true);
806 }
807 BT_VALUE_PUT_REF_AND_RESET(map);
808 return ret;
809 }
810
811 /*
812 * Data structure returned:
813 *
814 * {
815 * <array> = {
816 * [n] = {
817 * <map> = {
818 * {
819 * key = "url",
820 * value = <string>,
821 * },
822 * {
823 * key = "target-hostname",
824 * value = <string>,
825 * },
826 * {
827 * key = "session-name",
828 * value = <string>,
829 * },
830 * {
831 * key = "timer-us",
832 * value = <integer>,
833 * },
834 * {
835 * key = "stream-count",
836 * value = <integer>,
837 * },
838 * {
839 * key = "client-count",
840 * value = <integer>,
841 * },
842 * },
843 * }
844 * }
845 */
846
847 BT_HIDDEN
848 bt_component_class_query_method_status live_viewer_connection_list_sessions(
849 struct live_viewer_connection *viewer_connection,
850 const bt_value **user_result)
851 {
852 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
853 bt_component_class_query_method_status status =
854 BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
855 bt_value *result = NULL;
856 enum lttng_live_viewer_status viewer_status;
857 struct lttng_viewer_cmd cmd;
858 struct lttng_viewer_list_sessions list;
859 uint32_t i, sessions_count;
860
861 result = bt_value_array_create();
862 if (!result) {
863 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
864 "Error creating array");
865 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
866 goto error;
867 }
868
869 BT_LOGD("Requesting list of sessions: cmd=%s",
870 lttng_viewer_command_string(LTTNG_VIEWER_LIST_SESSIONS));
871
872 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
873 cmd.data_size = htobe64((uint64_t) 0);
874 cmd.cmd_version = htobe32(0);
875
876 viewer_status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
877 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
878 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
879 "Error sending list sessions command");
880 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
881 goto error;
882 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
883 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
884 goto error;
885 }
886
887 viewer_status = lttng_live_recv(viewer_connection, &list, sizeof(list));
888 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
889 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
890 "Error receiving session list");
891 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
892 goto error;
893 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
894 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
895 goto error;
896 }
897
898 sessions_count = be32toh(list.sessions_count);
899 for (i = 0; i < sessions_count; i++) {
900 struct lttng_viewer_session lsession;
901
902 viewer_status = lttng_live_recv(viewer_connection, &lsession,
903 sizeof(lsession));
904 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
905 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
906 "Error receiving session:");
907 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
908 goto error;
909 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
910 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
911 goto error;
912 }
913
914 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
915 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
916 if (list_append_session(result, viewer_connection->url,
917 &lsession, viewer_connection)) {
918 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
919 "Error appending session");
920 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
921 goto error;
922 }
923 }
924
925 *user_result = result;
926 goto end;
927 error:
928 BT_VALUE_PUT_REF_AND_RESET(result);
929 end:
930 return status;
931 }
932
933 static
934 enum lttng_live_viewer_status lttng_live_query_session_ids(
935 struct lttng_live_msg_iter *lttng_live_msg_iter)
936 {
937 struct lttng_viewer_cmd cmd;
938 struct lttng_viewer_list_sessions list;
939 struct lttng_viewer_session lsession;
940 uint32_t i, sessions_count;
941 uint64_t session_id;
942 enum lttng_live_viewer_status status;
943 struct live_viewer_connection *viewer_connection =
944 lttng_live_msg_iter->viewer_connection;
945 bt_self_component *self_comp = viewer_connection->self_comp;
946 bt_self_component_class *self_comp_class =
947 viewer_connection->self_comp_class;
948
949 BT_COMP_LOGD("Asking the relay daemon for the list of sessions: cmd=%s",
950 lttng_viewer_command_string(LTTNG_VIEWER_LIST_SESSIONS));
951
952 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
953 cmd.data_size = htobe64((uint64_t) 0);
954 cmd.cmd_version = htobe32(0);
955
956 status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
957 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
958 viewer_handle_send_status(self_comp, self_comp_class,
959 status, "list sessions command");
960 goto end;
961 }
962
963 status = lttng_live_recv(viewer_connection, &list, sizeof(list));
964 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
965 viewer_handle_recv_status(self_comp, self_comp_class,
966 status, "session list reply");
967 goto end;
968 }
969
970 sessions_count = be32toh(list.sessions_count);
971 for (i = 0; i < sessions_count; i++) {
972 status = lttng_live_recv(viewer_connection, &lsession,
973 sizeof(lsession));
974 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
975 viewer_handle_recv_status(self_comp, self_comp_class,
976 status, "session reply");
977 goto end;
978 }
979 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
980 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
981 session_id = be64toh(lsession.id);
982
983 BT_COMP_LOGI("Adding session to internal list: "
984 "session-id=%" PRIu64 ", hostname=\"%s\", session-name=\"%s\"",
985 session_id, lsession.hostname, lsession.session_name);
986
987 if ((strncmp(lsession.session_name,
988 viewer_connection->session_name->str,
989 LTTNG_VIEWER_NAME_MAX) == 0) && (strncmp(lsession.hostname,
990 viewer_connection->target_hostname->str,
991 LTTNG_VIEWER_HOST_NAME_MAX) == 0)) {
992
993 if (lttng_live_add_session(lttng_live_msg_iter, session_id,
994 lsession.hostname,
995 lsession.session_name)) {
996 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
997 "Failed to add live session");
998 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
999 goto end;
1000 }
1001 }
1002 }
1003
1004 status = LTTNG_LIVE_VIEWER_STATUS_OK;
1005
1006 end:
1007 return status;
1008 }
1009
1010 BT_HIDDEN
1011 enum lttng_live_viewer_status lttng_live_create_viewer_session(
1012 struct lttng_live_msg_iter *lttng_live_msg_iter)
1013 {
1014 struct lttng_viewer_cmd cmd;
1015 struct lttng_viewer_create_session_response resp;
1016 enum lttng_live_viewer_status status;
1017 struct live_viewer_connection *viewer_connection =
1018 lttng_live_msg_iter->viewer_connection;
1019 bt_self_component *self_comp = viewer_connection->self_comp;
1020 bt_self_component_class *self_comp_class =
1021 viewer_connection->self_comp_class;
1022
1023 BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class,
1024 "Creating a viewer session: cmd=%s",
1025 lttng_viewer_command_string(LTTNG_VIEWER_CREATE_SESSION));
1026
1027 cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
1028 cmd.data_size = htobe64((uint64_t) 0);
1029 cmd.cmd_version = htobe32(0);
1030
1031 status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
1032 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1033 viewer_handle_send_status(self_comp, self_comp_class,
1034 status, "create session command");
1035 goto end;
1036 }
1037
1038 status = lttng_live_recv(viewer_connection, &resp, sizeof(resp));
1039 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1040 viewer_handle_recv_status(self_comp, self_comp_class,
1041 status, "create session reply");
1042 goto end;
1043 }
1044
1045 if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
1046 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1047 "Error creating viewer session");
1048 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1049 goto end;
1050 }
1051
1052 status = lttng_live_query_session_ids(lttng_live_msg_iter);
1053 if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1054 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1055 "Failed to query live viewer session ids");
1056 goto end;
1057 } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1058 goto end;
1059 }
1060
1061 end:
1062 return status;
1063 }
1064
1065 static
1066 enum lttng_live_viewer_status receive_streams(struct lttng_live_session *session,
1067 uint32_t stream_count,
1068 bt_self_message_iterator *self_msg_iter)
1069 {
1070 uint32_t i;
1071 struct lttng_live_msg_iter *lttng_live_msg_iter =
1072 session->lttng_live_msg_iter;
1073 enum lttng_live_viewer_status status;
1074 struct live_viewer_connection *viewer_connection =
1075 lttng_live_msg_iter->viewer_connection;
1076 bt_self_component *self_comp = viewer_connection->self_comp;
1077
1078 BT_COMP_LOGI("Getting %" PRIu32 " new streams", stream_count);
1079 for (i = 0; i < stream_count; i++) {
1080 struct lttng_viewer_stream stream;
1081 struct lttng_live_stream_iterator *live_stream;
1082 uint64_t stream_id;
1083 uint64_t ctf_trace_id;
1084
1085 status = lttng_live_recv(viewer_connection, &stream,
1086 sizeof(stream));
1087 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1088 viewer_handle_recv_status(self_comp, NULL,
1089 status, "stream reply");
1090 goto end;
1091 }
1092 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
1093 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
1094 stream_id = be64toh(stream.id);
1095 ctf_trace_id = be64toh(stream.ctf_trace_id);
1096
1097 if (stream.metadata_flag) {
1098 BT_COMP_LOGI(" metadata stream %" PRIu64 " : %s/%s",
1099 stream_id, stream.path_name, stream.channel_name);
1100 if (lttng_live_metadata_create_stream(session,
1101 ctf_trace_id, stream_id,
1102 stream.path_name)) {
1103 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1104 "Error creating metadata stream");
1105 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1106 goto end;
1107 }
1108 session->lazy_stream_msg_init = true;
1109 } else {
1110 BT_COMP_LOGI(" stream %" PRIu64 " : %s/%s",
1111 stream_id, stream.path_name, stream.channel_name);
1112 live_stream = lttng_live_stream_iterator_create(session,
1113 ctf_trace_id, stream_id, self_msg_iter);
1114 if (!live_stream) {
1115 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1116 "Error creating stream");
1117 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1118 goto end;
1119 }
1120 }
1121 }
1122 status = LTTNG_LIVE_VIEWER_STATUS_OK;
1123
1124 end:
1125 return status;
1126 }
1127
1128 BT_HIDDEN
1129 enum lttng_live_viewer_status lttng_live_session_attach(
1130 struct lttng_live_session *session,
1131 bt_self_message_iterator *self_msg_iter)
1132 {
1133 struct lttng_viewer_cmd cmd;
1134 enum lttng_live_viewer_status status;
1135 struct lttng_viewer_attach_session_request rq;
1136 struct lttng_viewer_attach_session_response rp;
1137 struct lttng_live_msg_iter *lttng_live_msg_iter =
1138 session->lttng_live_msg_iter;
1139 struct live_viewer_connection *viewer_connection =
1140 lttng_live_msg_iter->viewer_connection;
1141 bt_self_component *self_comp = viewer_connection->self_comp;
1142 uint64_t session_id = session->id;
1143 uint32_t streams_count;
1144 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1145 char cmd_buf[cmd_buf_len];
1146
1147 BT_COMP_LOGD("Attaching to session: cmd=%s, session-id=%" PRIu64
1148 ", seek=%s",
1149 lttng_viewer_command_string(LTTNG_VIEWER_ATTACH_SESSION),
1150 session_id,
1151 lttng_viewer_seek_string(LTTNG_VIEWER_SEEK_LAST));
1152
1153 cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
1154 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1155 cmd.cmd_version = htobe32(0);
1156
1157 memset(&rq, 0, sizeof(rq));
1158 rq.session_id = htobe64(session_id);
1159 // TODO: add cmd line parameter to select seek beginning
1160 // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
1161 rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
1162
1163 /*
1164 * Merge the cmd and connection request to prevent a write-write
1165 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1166 * second write to be performed quickly in presence of Nagle's algorithm.
1167 */
1168 memcpy(cmd_buf, &cmd, sizeof(cmd));
1169 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1170 status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1171 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1172 viewer_handle_send_status(self_comp, NULL,
1173 status, "attach session command");
1174 goto end;
1175 }
1176
1177 status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1178 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1179 viewer_handle_recv_status(self_comp, NULL,
1180 status, "attach session reply");
1181 goto end;
1182 }
1183
1184 streams_count = be32toh(rp.streams_count);
1185 switch(be32toh(rp.status)) {
1186 case LTTNG_VIEWER_ATTACH_OK:
1187 break;
1188 case LTTNG_VIEWER_ATTACH_UNK:
1189 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1190 "Session id %" PRIu64 " is unknown", session_id);
1191 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1192 goto end;
1193 case LTTNG_VIEWER_ATTACH_ALREADY:
1194 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1195 "There is already a viewer attached to this session");
1196 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1197 goto end;
1198 case LTTNG_VIEWER_ATTACH_NOT_LIVE:
1199 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Not a live session");
1200 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1201 goto end;
1202 case LTTNG_VIEWER_ATTACH_SEEK_ERR:
1203 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Wrong seek parameter");
1204 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1205 goto end;
1206 default:
1207 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1208 "Unknown attach return code %u", be32toh(rp.status));
1209 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1210 goto end;
1211 }
1212
1213 /* We receive the initial list of streams. */
1214 status = receive_streams(session, streams_count, self_msg_iter);
1215 switch (status) {
1216 case LTTNG_LIVE_VIEWER_STATUS_OK:
1217 break;
1218 case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
1219 goto end;
1220 case LTTNG_LIVE_VIEWER_STATUS_ERROR:
1221 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error receiving streams");
1222 goto end;
1223 default:
1224 bt_common_abort();
1225 }
1226
1227 session->attached = true;
1228 session->new_streams_needed = false;
1229
1230 end:
1231 return status;
1232 }
1233
1234 BT_HIDDEN
1235 enum lttng_live_viewer_status lttng_live_session_detach(
1236 struct lttng_live_session *session)
1237 {
1238 struct lttng_viewer_cmd cmd;
1239 enum lttng_live_viewer_status status;
1240 struct lttng_viewer_detach_session_request rq;
1241 struct lttng_viewer_detach_session_response rp;
1242 struct lttng_live_msg_iter *lttng_live_msg_iter =
1243 session->lttng_live_msg_iter;
1244 bt_self_component *self_comp = session->self_comp;
1245 struct live_viewer_connection *viewer_connection =
1246 lttng_live_msg_iter->viewer_connection;
1247 uint64_t session_id = session->id;
1248 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1249 char cmd_buf[cmd_buf_len];
1250
1251 /*
1252 * The session might already be detached and the viewer socket might
1253 * already been closed. This happens when calling this function when
1254 * tearing down the graph after an error.
1255 */
1256 if (!session->attached || viewer_connection->control_sock == BT_INVALID_SOCKET) {
1257 return 0;
1258 }
1259
1260 BT_COMP_LOGD("Detaching from session: cmd=%s, session-id=%" PRIu64,
1261 lttng_viewer_command_string(LTTNG_VIEWER_DETACH_SESSION),
1262 session_id);
1263
1264 cmd.cmd = htobe32(LTTNG_VIEWER_DETACH_SESSION);
1265 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1266 cmd.cmd_version = htobe32(0);
1267
1268 memset(&rq, 0, sizeof(rq));
1269 rq.session_id = htobe64(session_id);
1270
1271 /*
1272 * Merge the cmd and connection request to prevent a write-write
1273 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1274 * second write to be performed quickly in presence of Nagle's algorithm.
1275 */
1276 memcpy(cmd_buf, &cmd, sizeof(cmd));
1277 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1278 status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1279 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1280 viewer_handle_send_status(self_comp, NULL,
1281 status, "detach session command");
1282 goto end;
1283 }
1284
1285 status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1286 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1287 viewer_handle_recv_status(self_comp, NULL,
1288 status, "detach session reply");
1289 goto end;
1290 }
1291
1292 switch(be32toh(rp.status)) {
1293 case LTTNG_VIEWER_DETACH_SESSION_OK:
1294 break;
1295 case LTTNG_VIEWER_DETACH_SESSION_UNK:
1296 BT_COMP_LOGW("Session id %" PRIu64 " is unknown", session_id);
1297 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1298 goto end;
1299 case LTTNG_VIEWER_DETACH_SESSION_ERR:
1300 BT_COMP_LOGW("Error detaching session id %" PRIu64 "", session_id);
1301 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1302 goto end;
1303 default:
1304 BT_COMP_LOGE("Unknown detach return code %u", be32toh(rp.status));
1305 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1306 goto end;
1307 }
1308
1309 session->attached = false;
1310
1311 status = LTTNG_LIVE_VIEWER_STATUS_OK;
1312
1313 end:
1314 return status;
1315 }
1316
1317 BT_HIDDEN
1318 enum lttng_live_get_one_metadata_status lttng_live_get_one_metadata_packet(
1319 struct lttng_live_trace *trace, FILE *fp, size_t *reply_len)
1320 {
1321 uint64_t len = 0;
1322 enum lttng_live_get_one_metadata_status status;
1323 enum lttng_live_viewer_status viewer_status;
1324 struct lttng_viewer_cmd cmd;
1325 struct lttng_viewer_get_metadata rq;
1326 struct lttng_viewer_metadata_packet rp;
1327 gchar *data = NULL;
1328 ssize_t writelen;
1329 struct lttng_live_session *session = trace->session;
1330 struct lttng_live_msg_iter *lttng_live_msg_iter =
1331 session->lttng_live_msg_iter;
1332 struct lttng_live_metadata *metadata = trace->metadata;
1333 struct live_viewer_connection *viewer_connection =
1334 lttng_live_msg_iter->viewer_connection;
1335 bt_self_component *self_comp = viewer_connection->self_comp;
1336 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1337 char cmd_buf[cmd_buf_len];
1338
1339 BT_COMP_LOGD("Requesting new metadata for trace:"
1340 "cmd=%s, trace-id=%" PRIu64 ", metadata-stream-id=%" PRIu64,
1341 lttng_viewer_command_string(LTTNG_VIEWER_GET_METADATA),
1342 trace->id, metadata->stream_id);
1343
1344 rq.stream_id = htobe64(metadata->stream_id);
1345 cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
1346 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1347 cmd.cmd_version = htobe32(0);
1348
1349 /*
1350 * Merge the cmd and connection request to prevent a write-write
1351 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1352 * second write to be performed quickly in presence of Nagle's algorithm.
1353 */
1354 memcpy(cmd_buf, &cmd, sizeof(cmd));
1355 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1356 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1357 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1358 viewer_handle_send_status(self_comp, NULL,
1359 viewer_status, "get metadata command");
1360 status = (enum lttng_live_get_one_metadata_status) viewer_status;
1361 goto end;
1362 }
1363
1364 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1365 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1366 viewer_handle_recv_status(self_comp, NULL,
1367 viewer_status, "get metadata reply");
1368 status = (enum lttng_live_get_one_metadata_status) viewer_status;
1369 goto end;
1370 }
1371
1372 switch (be32toh(rp.status)) {
1373 case LTTNG_VIEWER_METADATA_OK:
1374 BT_COMP_LOGD("Received get_metadata response: ok");
1375 break;
1376 case LTTNG_VIEWER_NO_NEW_METADATA:
1377 BT_COMP_LOGD("Received get_metadata response: no new");
1378 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_END;
1379 goto end;
1380 case LTTNG_VIEWER_METADATA_ERR:
1381 /*
1382 * The Relayd cannot find this stream id. Maybe its
1383 * gone already. This can happen in short lived UST app
1384 * in a per-pid session.
1385 */
1386 BT_COMP_LOGD("Received get_metadata response: error");
1387 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_CLOSED;
1388 goto end;
1389 default:
1390 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1391 "Received get_metadata response: unknown");
1392 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
1393 goto end;
1394 }
1395
1396 len = be64toh(rp.len);
1397 BT_COMP_LOGD("Writing %" PRIu64" bytes to metadata", len);
1398 if (len <= 0) {
1399 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1400 "Erroneous response length");
1401 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
1402 goto end;
1403 }
1404
1405 data = g_new0(gchar, len);
1406 if (!data) {
1407 BT_COMP_LOGE_APPEND_CAUSE_ERRNO(self_comp,
1408 "Failed to allocate data buffer", ".");
1409 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
1410 goto end;
1411 }
1412
1413 viewer_status = lttng_live_recv(viewer_connection, data, len);
1414 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1415 viewer_handle_recv_status(self_comp, NULL,
1416 viewer_status, "get metadata packet");
1417 status = (enum lttng_live_get_one_metadata_status) viewer_status;
1418 goto end;
1419 }
1420
1421 /*
1422 * Write the metadata to the file handle.
1423 */
1424 writelen = fwrite(data, sizeof(uint8_t), len, fp);
1425 if (writelen != len) {
1426 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1427 "Writing in the metadata file stream");
1428 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
1429 goto end;
1430 }
1431
1432 *reply_len = len;
1433 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_OK;
1434
1435 end:
1436 g_free(data);
1437 return status;
1438 }
1439
1440 /*
1441 * Assign the fields from a lttng_viewer_index to a packet_index.
1442 */
1443 static
1444 void lttng_index_to_packet_index(struct lttng_viewer_index *lindex,
1445 struct packet_index *pindex)
1446 {
1447 BT_ASSERT(lindex);
1448 BT_ASSERT(pindex);
1449
1450 pindex->offset = be64toh(lindex->offset);
1451 pindex->packet_size = be64toh(lindex->packet_size);
1452 pindex->content_size = be64toh(lindex->content_size);
1453 pindex->ts_cycles.timestamp_begin = be64toh(lindex->timestamp_begin);
1454 pindex->ts_cycles.timestamp_end = be64toh(lindex->timestamp_end);
1455 pindex->events_discarded = be64toh(lindex->events_discarded);
1456 }
1457
1458 static
1459 void lttng_live_need_new_streams(struct lttng_live_msg_iter *lttng_live_msg_iter)
1460 {
1461 uint64_t session_idx;
1462 struct live_viewer_connection *viewer_connection =
1463 lttng_live_msg_iter->viewer_connection;
1464
1465 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len;
1466 session_idx++) {
1467 struct lttng_live_session *session =
1468 g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
1469 BT_COMP_LOGD("Marking session as needing new streams: "
1470 "session-id=%" PRIu64, session->id);
1471 session->new_streams_needed = true;
1472 }
1473 }
1474
1475 BT_HIDDEN
1476 enum lttng_live_iterator_status lttng_live_get_next_index(
1477 struct lttng_live_msg_iter *lttng_live_msg_iter,
1478 struct lttng_live_stream_iterator *stream,
1479 struct packet_index *index)
1480 {
1481 struct lttng_viewer_cmd cmd;
1482 struct lttng_viewer_get_next_index rq;
1483 enum lttng_live_viewer_status viewer_status;
1484 struct lttng_viewer_index rp;
1485 enum lttng_live_iterator_status status;
1486 struct live_viewer_connection *viewer_connection =
1487 lttng_live_msg_iter->viewer_connection;
1488 bt_self_component *self_comp = viewer_connection->self_comp;
1489 struct lttng_live_trace *trace = stream->trace;
1490 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1491 char cmd_buf[cmd_buf_len];
1492 uint32_t flags, rp_status;
1493
1494 BT_COMP_LOGD("Requesting next index for stream: cmd=%s, "
1495 "viewer-stream-id=%" PRIu64,
1496 lttng_viewer_command_string(LTTNG_VIEWER_GET_NEXT_INDEX),
1497 stream->viewer_stream_id);
1498 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
1499 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1500 cmd.cmd_version = htobe32(0);
1501
1502 memset(&rq, 0, sizeof(rq));
1503 rq.stream_id = htobe64(stream->viewer_stream_id);
1504
1505 /*
1506 * Merge the cmd and connection request to prevent a write-write
1507 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1508 * second write to be performed quickly in presence of Nagle's algorithm.
1509 */
1510 memcpy(cmd_buf, &cmd, sizeof(cmd));
1511 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1512
1513 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1514 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1515 viewer_handle_send_status(self_comp, NULL,
1516 viewer_status, "get next index command");
1517 goto error;
1518 }
1519
1520 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1521 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1522 viewer_handle_recv_status(self_comp, NULL,
1523 viewer_status, "get next index reply");
1524 goto error;
1525 }
1526
1527 flags = be32toh(rp.flags);
1528 rp_status = be32toh(rp.status);
1529
1530 BT_COMP_LOGD("Received response from relay daemon: cmd=%s, response=%s",
1531 lttng_viewer_command_string(LTTNG_VIEWER_GET_NEXT_INDEX),
1532 lttng_viewer_next_index_return_code_string(rp_status));
1533 switch (rp_status) {
1534 case LTTNG_VIEWER_INDEX_INACTIVE:
1535 {
1536 uint64_t ctf_stream_class_id;
1537
1538 memset(index, 0, sizeof(struct packet_index));
1539 index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
1540 stream->current_inactivity_ts = index->ts_cycles.timestamp_end;
1541 ctf_stream_class_id = be64toh(rp.stream_id);
1542 if (stream->ctf_stream_class_id.is_set) {
1543 BT_ASSERT(stream->ctf_stream_class_id.value==
1544 ctf_stream_class_id);
1545 } else {
1546 stream->ctf_stream_class_id.value = ctf_stream_class_id;
1547 stream->ctf_stream_class_id.is_set = true;
1548 }
1549 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_QUIESCENT);
1550 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1551 break;
1552 }
1553 case LTTNG_VIEWER_INDEX_OK:
1554 {
1555 uint64_t ctf_stream_class_id;
1556
1557 lttng_index_to_packet_index(&rp, index);
1558 ctf_stream_class_id = be64toh(rp.stream_id);
1559 if (stream->ctf_stream_class_id.is_set) {
1560 BT_ASSERT(stream->ctf_stream_class_id.value==
1561 ctf_stream_class_id);
1562 } else {
1563 stream->ctf_stream_class_id.value = ctf_stream_class_id;
1564 stream->ctf_stream_class_id.is_set = true;
1565 }
1566
1567 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_DATA);
1568
1569 if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
1570 BT_COMP_LOGD("Marking trace as needing new metadata: "
1571 "response=%s, response-flag=NEW_METADATA, trace-id=%" PRIu64,
1572 lttng_viewer_next_index_return_code_string(rp_status),
1573 trace->id);
1574 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
1575 }
1576 if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
1577 BT_COMP_LOGD("Marking all sessions as possibly needing new streams: "
1578 "response=%s, response-flag=NEW_STREAM",
1579 lttng_viewer_next_index_return_code_string(rp_status));
1580 lttng_live_need_new_streams(lttng_live_msg_iter);
1581 }
1582 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1583 break;
1584 }
1585 case LTTNG_VIEWER_INDEX_RETRY:
1586 memset(index, 0, sizeof(struct packet_index));
1587 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
1588 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
1589 goto end;
1590 case LTTNG_VIEWER_INDEX_HUP:
1591 memset(index, 0, sizeof(struct packet_index));
1592 index->offset = EOF;
1593 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_EOF);
1594 stream->has_stream_hung_up = true;
1595 status = LTTNG_LIVE_ITERATOR_STATUS_END;
1596 break;
1597 case LTTNG_VIEWER_INDEX_ERR:
1598 memset(index, 0, sizeof(struct packet_index));
1599 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
1600 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1601 goto end;
1602 default:
1603 BT_COMP_LOGD("Received get_next_index response: unknown value");
1604 memset(index, 0, sizeof(struct packet_index));
1605 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
1606 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1607 goto end;
1608 }
1609 goto end;
1610
1611 error:
1612 status = viewer_status_to_live_iterator_status(viewer_status);
1613 end:
1614 return status;
1615 }
1616
1617 BT_HIDDEN
1618 enum ctf_msg_iter_medium_status lttng_live_get_stream_bytes(
1619 struct lttng_live_msg_iter *lttng_live_msg_iter,
1620 struct lttng_live_stream_iterator *stream, uint8_t *buf,
1621 uint64_t offset, uint64_t req_len, uint64_t *recv_len)
1622 {
1623 enum ctf_msg_iter_medium_status status;
1624 enum lttng_live_viewer_status viewer_status;
1625 struct lttng_viewer_trace_packet rp;
1626 struct lttng_viewer_cmd cmd;
1627 struct lttng_viewer_get_packet rq;
1628 struct live_viewer_connection *viewer_connection =
1629 lttng_live_msg_iter->viewer_connection;
1630 bt_self_component *self_comp = viewer_connection->self_comp;
1631 struct lttng_live_trace *trace = stream->trace;
1632 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1633 char cmd_buf[cmd_buf_len];
1634 uint32_t flags, rp_status;
1635
1636 BT_COMP_LOGD("Requesting data from stream: cmd=%s, "
1637 "offset=%" PRIu64 ", request-len=%" PRIu64,
1638 lttng_viewer_command_string(LTTNG_VIEWER_GET_PACKET),
1639 offset, req_len);
1640
1641 cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
1642 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1643 cmd.cmd_version = htobe32(0);
1644
1645 memset(&rq, 0, sizeof(rq));
1646 rq.stream_id = htobe64(stream->viewer_stream_id);
1647 rq.offset = htobe64(offset);
1648 rq.len = htobe32(req_len);
1649
1650 /*
1651 * Merge the cmd and connection request to prevent a write-write
1652 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1653 * second write to be performed quickly in presence of Nagle's algorithm.
1654 */
1655 memcpy(cmd_buf, &cmd, sizeof(cmd));
1656 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1657
1658 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1659 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1660 viewer_handle_send_status(self_comp, NULL,
1661 viewer_status, "get data packet command");
1662 goto error_convert_status;
1663 }
1664
1665 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1666 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1667 viewer_handle_recv_status(self_comp, NULL,
1668 viewer_status, "get data packet reply");
1669 goto error_convert_status;
1670 }
1671
1672 flags = be32toh(rp.flags);
1673 rp_status = be32toh(rp.status);
1674
1675 BT_COMP_LOGD("Received response from relay daemon: cmd=%s, response=%s",
1676 lttng_viewer_command_string(LTTNG_VIEWER_GET_PACKET),
1677 lttng_viewer_get_packet_return_code_string(rp_status));
1678 switch (rp_status) {
1679 case LTTNG_VIEWER_GET_PACKET_OK:
1680 req_len = be32toh(rp.len);
1681 BT_COMP_LOGD("Got packet from relay daemon: response=%s, packet-len=%" PRIu64 "",
1682 lttng_viewer_get_packet_return_code_string(rp_status),
1683 req_len);
1684 status = CTF_MSG_ITER_MEDIUM_STATUS_OK;
1685 break;
1686 case LTTNG_VIEWER_GET_PACKET_RETRY:
1687 /* Unimplemented by relay daemon */
1688 status = CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
1689 goto end;
1690 case LTTNG_VIEWER_GET_PACKET_ERR:
1691 if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
1692 BT_COMP_LOGD("Marking trace as needing new metadata: "
1693 "response=%s, response-flag=NEW_METADATA, trace-id=%" PRIu64,
1694 lttng_viewer_next_index_return_code_string(rp_status),
1695 trace->id);
1696 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
1697 }
1698 if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
1699 BT_COMP_LOGD("Marking all sessions as possibly needing new streams: "
1700 "response=%s, response-flag=NEW_STREAM",
1701 lttng_viewer_next_index_return_code_string(rp_status));
1702 lttng_live_need_new_streams(lttng_live_msg_iter);
1703 }
1704 if (flags & (LTTNG_VIEWER_FLAG_NEW_METADATA
1705 | LTTNG_VIEWER_FLAG_NEW_STREAM)) {
1706 status = CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
1707 BT_COMP_LOGD("Reply with any one flags set means we should retry: response=%s",
1708 lttng_viewer_get_packet_return_code_string(rp_status));
1709 goto end;
1710 }
1711 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1712 "Received get_data_packet response: error");
1713 status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
1714 goto end;
1715 case LTTNG_VIEWER_GET_PACKET_EOF:
1716 status = CTF_MSG_ITER_MEDIUM_STATUS_EOF;
1717 goto end;
1718 default:
1719 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1720 "Received get_data_packet response: unknown (%d)", rp_status);
1721 status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
1722 goto end;
1723 }
1724
1725 if (req_len == 0) {
1726 status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
1727 goto end;
1728 }
1729
1730 viewer_status = lttng_live_recv(viewer_connection, buf, req_len);
1731 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1732 viewer_handle_recv_status(self_comp, NULL,
1733 viewer_status, "get data packet");
1734 goto error_convert_status;
1735 }
1736 *recv_len = req_len;
1737
1738 status = CTF_MSG_ITER_MEDIUM_STATUS_OK;
1739 goto end;
1740
1741 error_convert_status:
1742 status = viewer_status_to_ctf_msg_iter_medium_status(viewer_status);
1743 end:
1744 return status;
1745 }
1746
1747 /*
1748 * Request new streams for a session.
1749 */
1750 BT_HIDDEN
1751 enum lttng_live_iterator_status lttng_live_session_get_new_streams(
1752 struct lttng_live_session *session,
1753 bt_self_message_iterator *self_msg_iter)
1754 {
1755 enum lttng_live_iterator_status status =
1756 LTTNG_LIVE_ITERATOR_STATUS_OK;
1757 struct lttng_viewer_cmd cmd;
1758 struct lttng_viewer_new_streams_request rq;
1759 struct lttng_viewer_new_streams_response rp;
1760 struct lttng_live_msg_iter *lttng_live_msg_iter =
1761 session->lttng_live_msg_iter;
1762 enum lttng_live_viewer_status viewer_status;
1763 struct live_viewer_connection *viewer_connection =
1764 lttng_live_msg_iter->viewer_connection;
1765 bt_self_component *self_comp = viewer_connection->self_comp;
1766 uint32_t streams_count;
1767 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1768 char cmd_buf[cmd_buf_len];
1769
1770 if (!session->new_streams_needed) {
1771 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1772 goto end;
1773 }
1774
1775 BT_COMP_LOGD("Requesting new streams for session: cmd=%s"
1776 "session-id=%" PRIu64,
1777 lttng_viewer_command_string(LTTNG_VIEWER_GET_NEW_STREAMS),
1778 session->id);
1779
1780 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
1781 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1782 cmd.cmd_version = htobe32(0);
1783
1784 memset(&rq, 0, sizeof(rq));
1785 rq.session_id = htobe64(session->id);
1786
1787 /*
1788 * Merge the cmd and connection request to prevent a write-write
1789 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1790 * second write to be performed quickly in presence of Nagle's algorithm.
1791 */
1792 memcpy(cmd_buf, &cmd, sizeof(cmd));
1793 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1794
1795 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1796 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1797 viewer_handle_send_status(self_comp, NULL,
1798 viewer_status, "get new streams command");
1799 status = viewer_status_to_live_iterator_status(viewer_status);
1800 goto end;
1801 }
1802
1803 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1804 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1805 viewer_handle_recv_status(self_comp, NULL,
1806 viewer_status, "get new streams reply");
1807 status = viewer_status_to_live_iterator_status(viewer_status);
1808 goto end;
1809 }
1810
1811 streams_count = be32toh(rp.streams_count);
1812
1813 switch(be32toh(rp.status)) {
1814 case LTTNG_VIEWER_NEW_STREAMS_OK:
1815 session->new_streams_needed = false;
1816 break;
1817 case LTTNG_VIEWER_NEW_STREAMS_NO_NEW:
1818 session->new_streams_needed = false;
1819 goto end;
1820 case LTTNG_VIEWER_NEW_STREAMS_HUP:
1821 session->new_streams_needed = false;
1822 session->closed = true;
1823 status = LTTNG_LIVE_ITERATOR_STATUS_END;
1824 goto end;
1825 case LTTNG_VIEWER_NEW_STREAMS_ERR:
1826 BT_COMP_LOGD("Received get_new_streams response: error");
1827 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1828 goto end;
1829 default:
1830 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1831 "Received get_new_streams response: Unknown:"
1832 "return code %u", be32toh(rp.status));
1833 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1834 goto end;
1835 }
1836
1837 viewer_status = receive_streams(session, streams_count, self_msg_iter);
1838 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1839 viewer_handle_recv_status(self_comp, NULL,
1840 viewer_status, "new streams");
1841 status = viewer_status_to_live_iterator_status(viewer_status);
1842 goto end;
1843 }
1844
1845 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1846 end:
1847 return status;
1848 }
1849
1850 BT_HIDDEN
1851 enum lttng_live_viewer_status live_viewer_connection_create(
1852 bt_self_component *self_comp,
1853 bt_self_component_class *self_comp_class,
1854 bt_logging_level log_level,
1855 const char *url, bool in_query,
1856 struct lttng_live_msg_iter *lttng_live_msg_iter,
1857 struct live_viewer_connection **viewer)
1858 {
1859 struct live_viewer_connection *viewer_connection;
1860 enum lttng_live_viewer_status status;
1861
1862 viewer_connection = g_new0(struct live_viewer_connection, 1);
1863
1864 if (bt_socket_init(log_level) != 0) {
1865 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
1866 self_comp_class, "Failed to init socket");
1867 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1868 goto error;
1869 }
1870
1871 viewer_connection->log_level = log_level;
1872
1873 viewer_connection->self_comp = self_comp;
1874 viewer_connection->self_comp_class = self_comp_class;
1875
1876 viewer_connection->control_sock = BT_INVALID_SOCKET;
1877 viewer_connection->port = -1;
1878 viewer_connection->in_query = in_query;
1879 viewer_connection->lttng_live_msg_iter = lttng_live_msg_iter;
1880 viewer_connection->url = g_string_new(url);
1881 if (!viewer_connection->url) {
1882 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
1883 self_comp_class, "Failed to allocate URL buffer");
1884 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1885 goto error;
1886 }
1887
1888 BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class,
1889 "Establishing connection to url \"%s\"...", url);
1890 status = lttng_live_connect_viewer(viewer_connection);
1891 /*
1892 * Only print error and append cause in case of error. not in case of
1893 * interruption.
1894 */
1895 if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1896 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
1897 self_comp_class, "Failed to establish connection: "
1898 "url=\"%s\"", url);
1899 goto error;
1900 } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1901 goto error;
1902 }
1903 BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class,
1904 "Connection to url \"%s\" is established", url);
1905
1906 *viewer = viewer_connection;
1907 status = LTTNG_LIVE_VIEWER_STATUS_OK;
1908 goto end;
1909
1910 error:
1911 if (viewer_connection) {
1912 live_viewer_connection_destroy(viewer_connection);
1913 }
1914 end:
1915 return status;
1916 }
1917
1918 BT_HIDDEN
1919 void live_viewer_connection_destroy(
1920 struct live_viewer_connection *viewer_connection)
1921 {
1922 bt_self_component *self_comp = viewer_connection->self_comp;
1923 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
1924
1925 if (!viewer_connection) {
1926 goto end;
1927 }
1928
1929 BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class,
1930 "Closing connection to relay:"
1931 "relay-url=\"%s\"", viewer_connection->url->str);
1932
1933 lttng_live_disconnect_viewer(viewer_connection);
1934
1935 if (viewer_connection->url) {
1936 g_string_free(viewer_connection->url, true);
1937 }
1938
1939 if (viewer_connection->relay_hostname) {
1940 g_string_free(viewer_connection->relay_hostname, true);
1941 }
1942
1943 if (viewer_connection->target_hostname) {
1944 g_string_free(viewer_connection->target_hostname, true);
1945 }
1946
1947 if (viewer_connection->session_name) {
1948 g_string_free(viewer_connection->session_name, true);
1949 }
1950
1951 if (viewer_connection->proto) {
1952 g_string_free(viewer_connection->proto, true);
1953 }
1954
1955 g_free(viewer_connection);
1956
1957 bt_socket_fini();
1958
1959 end:
1960 return;
1961 }
This page took 0.141071 seconds and 4 git commands to generate.