Implement ctf.lttng-live component
[babeltrace.git] / plugins / ctf / lttng-live / viewer-connection.c
CommitLineData
7cdc2bab
MD
1/*
2 * Copyright 2016 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to deal
6 * in the Software without restriction, including without limitation the rights
7 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 * copies of the Software, and to permit persons to whom the Software is
9 * furnished to do so, subject to the following conditions:
10 *
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
13 *
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20 * SOFTWARE.
21 */
22
23#include <stdio.h>
24#include <stdint.h>
25#include <stdlib.h>
26#include <stdbool.h>
27#include <unistd.h>
28#include <glib.h>
29#include <inttypes.h>
30#include <sys/socket.h>
31#include <sys/types.h>
32#include <netinet/in.h>
33#include <netdb.h>
34#include <fcntl.h>
35#include <poll.h>
36
37#include <babeltrace/compat/send-internal.h>
38#include <babeltrace/compiler-internal.h>
39
40#include "lttng-live-internal.h"
41#include "viewer-connection.h"
42#include "lttng-viewer-abi.h"
43#include "data-stream.h"
44#include "metadata.h"
45
46#define PRINT_ERR_STREAM viewer_connection->error_fp
47#define PRINT_PREFIX "lttng-live-viewer-connection"
48#define PRINT_DBG_CHECK lttng_live_debug
49#include "../print.h"
50
51static ssize_t lttng_live_recv(int fd, void *buf, size_t len)
52{
53 ssize_t ret;
54 size_t copied = 0, to_copy = len;
55
56 do {
57 ret = recv(fd, buf + copied, to_copy, 0);
58 if (ret > 0) {
59 assert(ret <= to_copy);
60 copied += ret;
61 to_copy -= ret;
62 }
63 } while ((ret > 0 && to_copy > 0)
64 || (ret < 0 && errno == EINTR));
65 if (ret > 0)
66 ret = copied;
67 /* ret = 0 means orderly shutdown, ret < 0 is error. */
68 return ret;
69}
70
71static ssize_t lttng_live_send(int fd, const void *buf, size_t len)
72{
73 ssize_t ret;
74
75 do {
76 ret = bt_send_nosigpipe(fd, buf, len);
77 } while (ret < 0 && errno == EINTR);
78 return ret;
79}
80
81/*
82 * hostname parameter needs to hold MAXNAMLEN chars.
83 */
84static int parse_url(struct bt_live_viewer_connection *viewer_connection)
85{
86 char remain[3][MAXNAMLEN];
87 int ret = -1, proto, proto_offset = 0;
88 const char *path = viewer_connection->url->str;
89 size_t path_len;
90
91 if (!path) {
92 goto end;
93 }
94 path_len = strlen(path); /* not accounting \0 */
95
96 /*
97 * Since sscanf API does not allow easily checking string length
98 * against a size defined by a macro. Test it beforehand on the
99 * input. We know the output is always <= than the input length.
100 */
101 if (path_len >= MAXNAMLEN) {
102 goto end;
103 }
104 ret = sscanf(path, "net%d://", &proto);
105 if (ret < 1) {
106 proto = 4;
107 /* net:// */
108 proto_offset = strlen("net://");
109 } else {
110 /* net4:// or net6:// */
111 proto_offset = strlen("netX://");
112 }
113 if (proto_offset > path_len) {
114 goto end;
115 }
116 if (proto == 6) {
117 PERR("[error] IPv6 is currently unsupported by lttng-live\n");
118 goto end;
119 }
120 /* TODO : parse for IPv6 as well */
121 /* Parse the hostname or IP */
122 ret = sscanf(&path[proto_offset], "%[a-zA-Z.0-9%-]%s",
123 viewer_connection->relay_hostname, remain[0]);
124 if (ret == 2) {
125 /* Optional port number */
126 switch (remain[0][0]) {
127 case ':':
128 ret = sscanf(remain[0], ":%d%s", &viewer_connection->port, remain[1]);
129 /* Optional session ID with port number */
130 if (ret == 2) {
131 ret = sscanf(remain[1], "/%s", remain[2]);
132 /* Accept 0 or 1 (optional) */
133 if (ret < 0) {
134 goto end;
135 }
136 } else if (ret == 0) {
137 PERR("[error] Missing port number after delimitor ':'\n");
138 ret = -1;
139 goto end;
140 }
141 break;
142 case '/':
143 /* Optional session ID */
144 ret = sscanf(remain[0], "/%s", remain[2]);
145 /* Accept 0 or 1 (optional) */
146 if (ret < 0) {
147 goto end;
148 }
149 break;
150 default:
151 PERR("[error] wrong delimitor : %c\n", remain[0][0]);
152 ret = -1;
153 goto end;
154 }
155 }
156
157 if (viewer_connection->port < 0) {
158 viewer_connection->port = LTTNG_DEFAULT_NETWORK_VIEWER_PORT;
159 }
160
161 if (strlen(remain[2]) == 0) {
162 PDBG("Connecting to hostname : %s, port : %d, "
163 "proto : IPv%d\n",
164 viewer_connection->relay_hostname,
165 viewer_connection->port,
166 proto);
167 ret = 0;
168 goto end;
169 }
170 ret = sscanf(remain[2], "host/%[a-zA-Z.0-9%-]/%s",
171 viewer_connection->target_hostname,
172 viewer_connection->session_name);
173 if (ret != 2) {
174 PERR("[error] Format : "
175 "net://<hostname>/host/<target_hostname>/<session_name>\n");
176 goto end;
177 }
178
179 PDBG("Connecting to hostname : %s, port : %d, "
180 "target hostname : %s, session name : %s, "
181 "proto : IPv%d\n",
182 viewer_connection->relay_hostname,
183 viewer_connection->port,
184 viewer_connection->target_hostname,
185 viewer_connection->session_name, proto);
186 ret = 0;
187
188end:
189 return ret;
190}
191
192static int lttng_live_handshake(struct bt_live_viewer_connection *viewer_connection)
193{
194 struct lttng_viewer_cmd cmd;
195 struct lttng_viewer_connect connect;
196 int ret;
197 ssize_t ret_len;
198
199 cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
200 cmd.data_size = htobe64((uint64_t) sizeof(connect));
201 cmd.cmd_version = htobe32(0);
202
203 connect.viewer_session_id = -1ULL; /* will be set on recv */
204 connect.major = htobe32(LTTNG_LIVE_MAJOR);
205 connect.minor = htobe32(LTTNG_LIVE_MINOR);
206 connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
207
208 ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
209 if (ret_len < 0) {
210 PERR("Error sending cmd: %s\n", strerror(errno));
211 goto error;
212 }
213 assert(ret_len == sizeof(cmd));
214
215 ret_len = lttng_live_send(viewer_connection->control_sock, &connect, sizeof(connect));
216 if (ret_len < 0) {
217 PERR("Error sending version: %s\n", strerror(errno));
218 goto error;
219 }
220 assert(ret_len == sizeof(connect));
221
222 ret_len = lttng_live_recv(viewer_connection->control_sock, &connect, sizeof(connect));
223 if (ret_len == 0) {
224 PERR("Remote side has closed connection\n");
225 goto error;
226 }
227 if (ret_len < 0) {
228 PERR("[error] Error receiving version: %s", strerror(errno));
229 goto error;
230 }
231 assert(ret_len == sizeof(connect));
232
233 PDBG("Received viewer session ID : %" PRIu64 "\n",
234 be64toh(connect.viewer_session_id));
235 PDBG("Relayd version : %u.%u\n", be32toh(connect.major),
236 be32toh(connect.minor));
237
238 if (LTTNG_LIVE_MAJOR != be32toh(connect.major)) {
239 PERR("Incompatible lttng-relayd protocol\n");
240 goto error;
241 }
242 /* Use the smallest protocol version implemented. */
243 if (LTTNG_LIVE_MINOR > be32toh(connect.minor)) {
244 viewer_connection->minor = be32toh(connect.minor);
245 } else {
246 viewer_connection->minor = LTTNG_LIVE_MINOR;
247 }
248 viewer_connection->major = LTTNG_LIVE_MAJOR;
249 ret = 0;
250 return ret;
251
252error:
253 PERR("Unable to establish connection\n");
254 return -1;
255}
256
257static int lttng_live_connect_viewer(struct bt_live_viewer_connection *viewer_connection)
258{
259 struct hostent *host;
260 struct sockaddr_in server_addr;
261 int ret;
262
263 if (parse_url(viewer_connection)) {
264 goto error;
265 }
266
267 host = gethostbyname(viewer_connection->relay_hostname);
268 if (!host) {
269 PERR("[error] Cannot lookup hostname %s\n",
270 viewer_connection->relay_hostname);
271 goto error;
272 }
273
274 if ((viewer_connection->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
275 PERR("[error] Socket creation failed: %s\n", strerror(errno));
276 goto error;
277 }
278
279 server_addr.sin_family = AF_INET;
280 server_addr.sin_port = htons(viewer_connection->port);
281 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
282 memset(&(server_addr.sin_zero), 0, 8);
283
284 if (connect(viewer_connection->control_sock, (struct sockaddr *) &server_addr,
285 sizeof(struct sockaddr)) == -1) {
286 PERR("[error] Connection failed: %s\n", strerror(errno));
287 goto error;
288 }
289 if (lttng_live_handshake(viewer_connection)) {
290 goto error;
291 }
292
293 ret = 0;
294
295 return ret;
296
297error:
298 if (viewer_connection->control_sock >= 0) {
299 if (close(viewer_connection->control_sock)) {
300 PERR("Close: %s", strerror(errno));
301 }
302 }
303 viewer_connection->control_sock = -1;
304 return -1;
305}
306
307static void lttng_live_disconnect_viewer(struct bt_live_viewer_connection *viewer_connection)
308{
309 if (viewer_connection->control_sock < 0) {
310 return;
311 }
312 if (close(viewer_connection->control_sock)) {
313 PERR("Close: %s", strerror(errno));
314 viewer_connection->control_sock = -1;
315 }
316}
317
318static void connection_release(struct bt_object *obj)
319{
320 struct bt_live_viewer_connection *conn =
321 container_of(obj, struct bt_live_viewer_connection, obj);
322
323 bt_live_viewer_connection_destroy(conn);
324}
325
326static
327enum bt_value_status list_update_session(struct bt_value *results,
328 const struct lttng_viewer_session *session,
329 bool *_found)
330{
331 enum bt_value_status ret = BT_VALUE_STATUS_OK;
332 struct bt_value *map = NULL;
333 struct bt_value *hostname = NULL;
334 struct bt_value *session_name = NULL;
335 struct bt_value *btval = NULL;
336 int i, len;
337 bool found = false;
338
339 len = bt_value_array_size(results);
340 if (len < 0) {
341 ret = BT_VALUE_STATUS_ERROR;
342 goto end;
343 }
344 for (i = 0; i < len; i++) {
345 const char *hostname_str = NULL;
346 const char *session_name_str = NULL;
347
348 map = bt_value_array_get(results, (size_t) i);
349 if (!map) {
350 ret = BT_VALUE_STATUS_ERROR;
351 goto end;
352 }
353 hostname = bt_value_map_get(map, "target-hostname");
354 if (!hostname) {
355 ret = BT_VALUE_STATUS_ERROR;
356 goto end;
357 }
358 session_name = bt_value_map_get(map, "session-name");
359 if (!session_name) {
360 ret = BT_VALUE_STATUS_ERROR;
361 goto end;
362 }
363 ret = bt_value_string_get(hostname, &hostname_str);
364 if (ret != BT_VALUE_STATUS_OK) {
365 goto end;
366 }
367 ret = bt_value_string_get(session_name, &session_name_str);
368 if (ret != BT_VALUE_STATUS_OK) {
369 goto end;
370 }
371
372 if (!strcmp(session->hostname, hostname_str)
373 && !strcmp(session->session_name,
374 session_name_str)) {
375 int64_t val;
376 uint32_t streams = be32toh(session->streams);
377 uint32_t clients = be32toh(session->clients);
378
379 found = true;
380
381 btval = bt_value_map_get(map, "stream-count");
382 if (!btval) {
383 ret = BT_VALUE_STATUS_ERROR;
384 goto end;
385 }
386 ret = bt_value_integer_get(btval, &val);
387 if (ret != BT_VALUE_STATUS_OK) {
388 goto end;
389 }
390 /* sum */
391 val += streams;
392 ret = bt_value_integer_set(btval, val);
393 if (ret != BT_VALUE_STATUS_OK) {
394 goto end;
395 }
396 BT_PUT(btval);
397
398 btval = bt_value_map_get(map, "client-count");
399 if (!btval) {
400 ret = BT_VALUE_STATUS_ERROR;
401 goto end;
402 }
403 ret = bt_value_integer_get(btval, &val);
404 if (ret != BT_VALUE_STATUS_OK) {
405 goto end;
406 }
407 /* max */
408 val = max_t(int64_t, clients, val);
409 ret = bt_value_integer_set(btval, val);
410 if (ret != BT_VALUE_STATUS_OK) {
411 goto end;
412 }
413 BT_PUT(btval);
414 }
415
416 BT_PUT(hostname);
417 BT_PUT(session_name);
418 BT_PUT(map);
419
420 if (found) {
421 break;
422 }
423 }
424end:
425 BT_PUT(btval);
426 BT_PUT(hostname);
427 BT_PUT(session_name);
428 BT_PUT(map);
429 *_found = found;
430 return ret;
431}
432
433static
434enum bt_value_status list_append_session(struct bt_value *results,
435 GString *base_url,
436 const struct lttng_viewer_session *session)
437{
438 enum bt_value_status ret = BT_VALUE_STATUS_OK;
439 struct bt_value *map = NULL;
440 GString *url = NULL;
441 bool found = false;
442
443 /*
444 * If the session already exists, add the stream count to it,
445 * and do max of client counts.
446 */
447 ret = list_update_session(results, session, &found);
448 if (ret != BT_VALUE_STATUS_OK || found) {
449 goto end;
450 }
451
452 map = bt_value_map_create();
453 if (!map) {
454 ret = BT_VALUE_STATUS_ERROR;
455 goto end;
456 }
457
458 if (base_url->len < 1) {
459 ret = BT_VALUE_STATUS_ERROR;
460 goto end;
461 }
462 /*
463 * key = "url",
464 * value = <string>,
465 */
466 url = g_string_new(base_url->str);
467 g_string_append(url, "/host/");
468 g_string_append(url, session->hostname);
469 g_string_append_c(url, '/');
470 g_string_append(url, session->session_name);
471
472 ret = bt_value_map_insert_string(map, "url", url->str);
473 if (ret != BT_VALUE_STATUS_OK) {
474 goto end;
475 }
476
477 /*
478 * key = "target-hostname",
479 * value = <string>,
480 */
481 ret = bt_value_map_insert_string(map, "target-hostname",
482 session->hostname);
483 if (ret != BT_VALUE_STATUS_OK) {
484 goto end;
485 }
486
487 /*
488 * key = "session-name",
489 * value = <string>,
490 */
491 ret = bt_value_map_insert_string(map, "session-name",
492 session->session_name);
493 if (ret != BT_VALUE_STATUS_OK) {
494 goto end;
495 }
496
497 /*
498 * key = "timer-us",
499 * value = <integer>,
500 */
501 {
502 uint32_t live_timer = be32toh(session->live_timer);
503
504 ret = bt_value_map_insert_integer(map, "timer-us",
505 live_timer);
506 if (ret != BT_VALUE_STATUS_OK) {
507 goto end;
508 }
509 }
510
511 /*
512 * key = "stream-count",
513 * value = <integer>,
514 */
515 {
516 uint32_t streams = be32toh(session->streams);
517
518 ret = bt_value_map_insert_integer(map, "stream-count",
519 streams);
520 if (ret != BT_VALUE_STATUS_OK) {
521 goto end;
522 }
523 }
524
525
526 /*
527 * key = "client-count",
528 * value = <integer>,
529 */
530 {
531 uint32_t clients = be32toh(session->clients);
532
533 ret = bt_value_map_insert_integer(map, "client-count",
534 clients);
535 if (ret != BT_VALUE_STATUS_OK) {
536 goto end;
537 }
538 }
539
540 ret = bt_value_array_append(results, map);
541end:
542 if (url) {
543 g_string_free(url, TRUE);
544 }
545 BT_PUT(map);
546 return ret;
547}
548
549/*
550 * Data structure returned:
551 *
552 * {
553 * <array> = {
554 * [n] = {
555 * <map> = {
556 * {
557 * key = "url",
558 * value = <string>,
559 * },
560 * {
561 * key = "target-hostname",
562 * value = <string>,
563 * },
564 * {
565 * key = "session-name",
566 * value = <string>,
567 * },
568 * {
569 * key = "timer-us",
570 * value = <integer>,
571 * },
572 * {
573 * key = "stream-count",
574 * value = <integer>,
575 * },
576 * {
577 * key = "client-count",
578 * value = <integer>,
579 * },
580 * },
581 * }
582 * }
583 */
584
585BT_HIDDEN
586struct bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_connection *viewer_connection)
587{
588 struct bt_value *results = NULL;
589 struct lttng_viewer_cmd cmd;
590 struct lttng_viewer_list_sessions list;
591 uint32_t i, sessions_count;
592 ssize_t ret_len;
593
594 if (lttng_live_handshake(viewer_connection)) {
595 goto error;
596 }
597
598 results = bt_value_array_create();
599 if (!results) {
600 fprintf(stderr, "Error creating array\n");
601 goto error;
602 }
603
604 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
605 cmd.data_size = htobe64((uint64_t) 0);
606 cmd.cmd_version = htobe32(0);
607
608 ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
609 if (ret_len < 0) {
610 fprintf(stderr, "Error sending cmd: %s\n", strerror(errno));
611 goto error;
612 }
613 assert(ret_len == sizeof(cmd));
614
615 ret_len = lttng_live_recv(viewer_connection->control_sock, &list, sizeof(list));
616 if (ret_len == 0) {
617 fprintf(stderr, "Remote side has closed connection\n");
618 goto error;
619 }
620 if (ret_len < 0) {
621 fprintf(stderr, "Error receiving session list: %s\n", strerror(errno));
622 goto error;
623 }
624 assert(ret_len == sizeof(list));
625
626 sessions_count = be32toh(list.sessions_count);
627 for (i = 0; i < sessions_count; i++) {
628 struct lttng_viewer_session lsession;
629
630 ret_len = lttng_live_recv(viewer_connection->control_sock,
631 &lsession, sizeof(lsession));
632 if (ret_len == 0) {
633 fprintf(stderr, "Remote side has closed connection\n");
634 goto error;
635 }
636 if (ret_len < 0) {
637 fprintf(stderr, "Error receiving session: %s\n", strerror(errno));
638 goto error;
639 }
640 assert(ret_len == sizeof(lsession));
641 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
642 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
643 if (list_append_session(results,
644 viewer_connection->url, &lsession)
645 != BT_VALUE_STATUS_OK) {
646 goto error;
647 }
648 }
649 goto end;
650error:
651 BT_PUT(results);
652end:
653 return results;
654}
655
656static
657int lttng_live_query_session_ids(struct lttng_live_component *lttng_live)
658{
659 struct lttng_viewer_cmd cmd;
660 struct lttng_viewer_list_sessions list;
661 struct lttng_viewer_session lsession;
662 uint32_t i, sessions_count;
663 ssize_t ret_len;
664 uint64_t session_id;
665 struct bt_live_viewer_connection *viewer_connection =
666 lttng_live->viewer_connection;
667
668 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
669 cmd.data_size = htobe64((uint64_t) 0);
670 cmd.cmd_version = htobe32(0);
671
672 ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
673 if (ret_len < 0) {
674 PERR("Error sending cmd: %s\n", strerror(errno));
675 goto error;
676 }
677 assert(ret_len == sizeof(cmd));
678
679 ret_len = lttng_live_recv(viewer_connection->control_sock, &list, sizeof(list));
680 if (ret_len == 0) {
681 PERR("Remote side has closed connection\n");
682 goto error;
683 }
684 if (ret_len < 0) {
685 PERR("Error receiving session list: %s\n", strerror(errno));
686 goto error;
687 }
688 assert(ret_len == sizeof(list));
689
690 sessions_count = be32toh(list.sessions_count);
691 for (i = 0; i < sessions_count; i++) {
692 ret_len = lttng_live_recv(viewer_connection->control_sock,
693 &lsession, sizeof(lsession));
694 if (ret_len == 0) {
695 PERR("Remote side has closed connection\n");
696 goto error;
697 }
698 if (ret_len < 0) {
699 PERR("Error receiving session: %s\n", strerror(errno));
700 goto error;
701 }
702 assert(ret_len == sizeof(lsession));
703 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
704 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
705 session_id = be64toh(lsession.id);
706
707 if ((strncmp(lsession.session_name,
708 viewer_connection->session_name,
709 MAXNAMLEN) == 0) && (strncmp(lsession.hostname,
710 viewer_connection->target_hostname,
711 MAXNAMLEN) == 0)) {
712 if (lttng_live_add_session(lttng_live, session_id)) {
713 goto error;
714 }
715 }
716 }
717
718 return 0;
719
720error:
721 PERR("Unable to query session ids\n");
722 return -1;
723}
724
725BT_HIDDEN
726int lttng_live_create_viewer_session(struct lttng_live_component *lttng_live)
727{
728 struct lttng_viewer_cmd cmd;
729 struct lttng_viewer_create_session_response resp;
730 ssize_t ret_len;
731 struct bt_live_viewer_connection *viewer_connection =
732 lttng_live->viewer_connection;
733
734 cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
735 cmd.data_size = htobe64((uint64_t) 0);
736 cmd.cmd_version = htobe32(0);
737
738 ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
739 if (ret_len < 0) {
740 PERR("Error sending cmd: %s\n", strerror(errno));
741 goto error;
742 }
743 assert(ret_len == sizeof(cmd));
744
745 ret_len = lttng_live_recv(viewer_connection->control_sock, &resp, sizeof(resp));
746 if (ret_len == 0) {
747 PERR("Remote side has closed connection\n");
748 goto error;
749 }
750 if (ret_len < 0) {
751 PERR("Error receiving create session reply: %s\n", strerror(errno));
752 goto error;
753 }
754 assert(ret_len == sizeof(resp));
755
756 if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
757 PERR("Error creating viewer session\n");
758 goto error;
759 }
760 if (lttng_live_query_session_ids(lttng_live)) {
761 goto error;
762 }
763
764 return 0;
765
766error:
767 return -1;
768}
769
770static
771int receive_streams(struct lttng_live_session *session,
772 uint32_t stream_count)
773{
774 ssize_t ret_len;
775 uint32_t i;
776 struct lttng_live_component *lttng_live = session->lttng_live;
777 struct bt_live_viewer_connection *viewer_connection =
778 lttng_live->viewer_connection;
779
780 PDBG("Getting %" PRIu32 " new streams:\n", stream_count);
781 for (i = 0; i < stream_count; i++) {
782 struct lttng_viewer_stream stream;
783 struct lttng_live_stream_iterator *live_stream;
784 uint64_t stream_id;
785 uint64_t ctf_trace_id;
786
787 ret_len = lttng_live_recv(viewer_connection->control_sock, &stream, sizeof(stream));
788 if (ret_len == 0) {
789 PERR("Remote side has closed connection\n");
790 goto error;
791 }
792 if (ret_len < 0) {
793 PERR("Error receiving stream\n");
794 goto error;
795 }
796 assert(ret_len == sizeof(stream));
797 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
798 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
799 stream_id = be64toh(stream.id);
800 ctf_trace_id = be64toh(stream.ctf_trace_id);
801
802 if (stream.metadata_flag) {
803 PDBG(" metadata stream %" PRIu64 " : %s/%s\n",
804 stream_id, stream.path_name,
805 stream.channel_name);
806 if (lttng_live_metadata_create_stream(session,
807 ctf_trace_id, stream_id)) {
808 PERR("Error creating metadata stream\n");
809
810 goto error;
811 }
812 session->lazy_stream_notif_init = true;
813 } else {
814 PDBG(" stream %" PRIu64 " : %s/%s\n",
815 stream_id, stream.path_name,
816 stream.channel_name);
817 live_stream = lttng_live_stream_iterator_create(session,
818 ctf_trace_id, stream_id);
819 if (!live_stream) {
820 PERR("Error creating stream\n");
821 goto error;
822 }
823 }
824 }
825 return 0;
826
827error:
828 return -1;
829}
830
831BT_HIDDEN
832int lttng_live_attach_session(struct lttng_live_session *session)
833{
834 struct lttng_viewer_cmd cmd;
835 struct lttng_viewer_attach_session_request rq;
836 struct lttng_viewer_attach_session_response rp;
837 ssize_t ret_len;
838 struct lttng_live_component *lttng_live = session->lttng_live;
839 struct bt_live_viewer_connection *viewer_connection =
840 lttng_live->viewer_connection;
841 uint64_t session_id = session->id;
842 uint32_t streams_count;
843
844 if (session->attached) {
845 return 0;
846 }
847
848 cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
849 cmd.data_size = htobe64((uint64_t) sizeof(rq));
850 cmd.cmd_version = htobe32(0);
851
852 memset(&rq, 0, sizeof(rq));
853 rq.session_id = htobe64(session_id);
854 // TODO: add cmd line parameter to select seek beginning
855 // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
856 rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
857
858 ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
859 if (ret_len < 0) {
860 PERR("Error sending cmd: %s\n", strerror(errno));
861 goto error;
862 }
863 assert(ret_len == sizeof(cmd));
864
865 ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq));
866 if (ret_len < 0) {
867 PERR("Error sending attach request: %s\n", strerror(errno));
868 goto error;
869 }
870 assert(ret_len == sizeof(rq));
871
872 ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp));
873 if (ret_len == 0) {
874 PERR("Remote side has closed connection\n");
875 goto error;
876 }
877 if (ret_len < 0) {
878 PERR("Error receiving attach response: %s\n", strerror(errno));
879 goto error;
880 }
881 assert(ret_len == sizeof(rp));
882
883 streams_count = be32toh(rp.streams_count);
884 switch(be32toh(rp.status)) {
885 case LTTNG_VIEWER_ATTACH_OK:
886 break;
887 case LTTNG_VIEWER_ATTACH_UNK:
888 PERR("Session id %" PRIu64 " is unknown\n", session_id);
889 goto error;
890 case LTTNG_VIEWER_ATTACH_ALREADY:
891 PERR("There is already a viewer attached to this session\n");
892 goto error;
893 case LTTNG_VIEWER_ATTACH_NOT_LIVE:
894 PERR("Not a live session\n");
895 goto error;
896 case LTTNG_VIEWER_ATTACH_SEEK_ERR:
897 PERR("Wrong seek parameter\n");
898 goto error;
899 default:
900 PERR("Unknown attach return code %u\n", be32toh(rp.status));
901 goto error;
902 }
903
904 /* We receive the initial list of streams. */
905 if (receive_streams(session, streams_count)) {
906 goto error;
907 }
908
909 session->attached = true;
910 session->new_streams_needed = false;
911
912 return 0;
913
914error:
915 return -1;
916}
917
918BT_HIDDEN
919int lttng_live_detach_session(struct lttng_live_session *session)
920{
921 struct lttng_viewer_cmd cmd;
922 struct lttng_viewer_detach_session_request rq;
923 struct lttng_viewer_detach_session_response rp;
924 ssize_t ret_len;
925 struct lttng_live_component *lttng_live = session->lttng_live;
926 struct bt_live_viewer_connection *viewer_connection =
927 lttng_live->viewer_connection;
928 uint64_t session_id = session->id;
929
930 if (!session->attached) {
931 return 0;
932 }
933
934 cmd.cmd = htobe32(LTTNG_VIEWER_DETACH_SESSION);
935 cmd.data_size = htobe64((uint64_t) sizeof(rq));
936 cmd.cmd_version = htobe32(0);
937
938 memset(&rq, 0, sizeof(rq));
939 rq.session_id = htobe64(session_id);
940
941 ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
942 if (ret_len < 0) {
943 PERR("Error sending cmd: %s\n", strerror(errno));
944 goto error;
945 }
946 assert(ret_len == sizeof(cmd));
947
948 ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq));
949 if (ret_len < 0) {
950 PERR("Error sending detach request: %s\n", strerror(errno));
951 goto error;
952 }
953 assert(ret_len == sizeof(rq));
954
955 ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp));
956 if (ret_len == 0) {
957 PERR("Remote side has closed connection\n");
958 goto error;
959 }
960 if (ret_len < 0) {
961 PERR("Error receiving detach response: %s\n", strerror(errno));
962 goto error;
963 }
964 assert(ret_len == sizeof(rp));
965
966 switch(be32toh(rp.status)) {
967 case LTTNG_VIEWER_DETACH_SESSION_OK:
968 break;
969 case LTTNG_VIEWER_DETACH_SESSION_UNK:
970 PERR("Session id %" PRIu64 " is unknown\n", session_id);
971 goto error;
972 case LTTNG_VIEWER_DETACH_SESSION_ERR:
973 PERR("Error detaching session id %" PRIu64 "\n", session_id);
974 goto error;
975 default:
976 PERR("Unknown detach return code %u\n", be32toh(rp.status));
977 goto error;
978 }
979
980 session->attached = false;
981
982 return 0;
983
984error:
985 return -1;
986}
987
988BT_HIDDEN
989ssize_t lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace,
990 FILE *fp)
991{
992 uint64_t len = 0;
993 int ret;
994 struct lttng_viewer_cmd cmd;
995 struct lttng_viewer_get_metadata rq;
996 struct lttng_viewer_metadata_packet rp;
997 char *data = NULL;
998 ssize_t ret_len;
999 struct lttng_live_session *session = trace->session;
1000 struct lttng_live_component *lttng_live = session->lttng_live;
1001 struct lttng_live_metadata *metadata = trace->metadata;
1002 struct bt_live_viewer_connection *viewer_connection =
1003 lttng_live->viewer_connection;
1004
1005 rq.stream_id = htobe64(metadata->stream_id);
1006 cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
1007 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1008 cmd.cmd_version = htobe32(0);
1009
1010 ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
1011 if (ret_len < 0) {
1012 PERR("Error sending cmd: %s\n", strerror(errno));
1013 goto error;
1014 }
1015 assert(ret_len == sizeof(cmd));
1016
1017 ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq));
1018 if (ret_len < 0) {
1019 PERR("Error sending get_metadata request: %s\n", strerror(errno));
1020 goto error;
1021 }
1022 assert(ret_len == sizeof(rq));
1023
1024 ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp));
1025 if (ret_len == 0) {
1026 PERR("Remote side has closed connection\n");
1027 goto error;
1028 }
1029 if (ret_len < 0) {
1030 PERR("Error receiving get_metadata response: %s\n", strerror(errno));
1031 goto error;
1032 }
1033 assert(ret_len == sizeof(rp));
1034
1035 switch (be32toh(rp.status)) {
1036 case LTTNG_VIEWER_METADATA_OK:
1037 PDBG("get_metadata : OK\n");
1038 break;
1039 case LTTNG_VIEWER_NO_NEW_METADATA:
1040 PDBG("get_metadata : NO NEW\n");
1041 ret = 0;
1042 goto end;
1043 case LTTNG_VIEWER_METADATA_ERR:
1044 PDBG("get_metadata : ERR\n");
1045 goto error;
1046 default:
1047 PDBG("get_metadata : UNKNOWN\n");
1048 goto error;
1049 }
1050
1051 len = be64toh(rp.len);
1052 PDBG("Writing %" PRIu64" bytes to metadata\n", len);
1053 if (len <= 0) {
1054 goto error;
1055 }
1056
1057 data = zmalloc(len);
1058 if (!data) {
1059 PERR("relay data zmalloc: %s", strerror(errno));
1060 goto error;
1061 }
1062 ret_len = lttng_live_recv(viewer_connection->control_sock, data, len);
1063 if (ret_len == 0) {
1064 PERR("[error] Remote side has closed connection\n");
1065 goto error_free_data;
1066 }
1067 if (ret_len < 0) {
1068 PERR("[error] Error receiving trace packet: %s", strerror(errno));
1069 goto error_free_data;
1070 }
1071 assert(ret_len == len);
1072
1073 do {
1074 ret_len = fwrite(data, 1, len, fp);
1075 } while (ret_len < 0 && errno == EINTR);
1076 if (ret_len < 0) {
1077 PERR("[error] Writing in the metadata fp\n");
1078 goto error_free_data;
1079 }
1080 assert(ret_len == len);
1081 free(data);
1082 ret = len;
1083end:
1084 return ret;
1085
1086error_free_data:
1087 free(data);
1088error:
1089 return -1;
1090}
1091
1092/*
1093 * Assign the fields from a lttng_viewer_index to a packet_index.
1094 */
1095static
1096void lttng_index_to_packet_index(struct lttng_viewer_index *lindex,
1097 struct packet_index *pindex)
1098{
1099 assert(lindex);
1100 assert(pindex);
1101
1102 pindex->offset = be64toh(lindex->offset);
1103 pindex->packet_size = be64toh(lindex->packet_size);
1104 pindex->content_size = be64toh(lindex->content_size);
1105 pindex->ts_cycles.timestamp_begin = be64toh(lindex->timestamp_begin);
1106 pindex->ts_cycles.timestamp_end = be64toh(lindex->timestamp_end);
1107 pindex->events_discarded = be64toh(lindex->events_discarded);
1108}
1109
1110BT_HIDDEN
1111enum bt_ctf_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_live_component *lttng_live,
1112 struct lttng_live_stream_iterator *stream,
1113 struct packet_index *index)
1114{
1115 struct lttng_viewer_cmd cmd;
1116 struct lttng_viewer_get_next_index rq;
1117 ssize_t ret_len;
1118 struct lttng_viewer_index rp;
1119 uint32_t flags, status;
1120 enum bt_ctf_lttng_live_iterator_status retstatus =
1121 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
1122 struct bt_live_viewer_connection *viewer_connection =
1123 lttng_live->viewer_connection;
1124 struct lttng_live_trace *trace = stream->trace;
1125
1126 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
1127 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1128 cmd.cmd_version = htobe32(0);
1129
1130 memset(&rq, 0, sizeof(rq));
1131 rq.stream_id = htobe64(stream->viewer_stream_id);
1132
1133 ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
1134 if (ret_len < 0) {
1135 PERR("Error sending cmd: %s\n", strerror(errno));
1136 goto error;
1137 }
1138 assert(ret_len == sizeof(cmd));
1139
1140 ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq));
1141 if (ret_len < 0) {
1142 PERR("Error sending get_next_index request: %s\n", strerror(errno));
1143 goto error;
1144 }
1145 assert(ret_len == sizeof(rq));
1146
1147 ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp));
1148 if (ret_len == 0) {
1149 PERR("Remote side has closed connection\n");
1150 goto error;
1151 }
1152 if (ret_len < 0) {
1153 PERR("Error receiving get_next_index response: %s\n", strerror(errno));
1154 goto error;
1155 }
1156 assert(ret_len == sizeof(rp));
1157
1158 flags = be32toh(rp.flags);
1159 status = be32toh(rp.status);
1160
1161 switch (status) {
1162 case LTTNG_VIEWER_INDEX_INACTIVE:
1163 {
1164 uint64_t ctf_stream_class_id;
1165
1166 PDBG("get_next_index: inactive\n");
1167 memset(index, 0, sizeof(struct packet_index));
1168 index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
1169 stream->current_inactivity_timestamp = index->ts_cycles.timestamp_end;
1170 ctf_stream_class_id = be64toh(rp.stream_id);
1171 if (stream->ctf_stream_class_id != -1ULL) {
1172 assert(stream->ctf_stream_class_id ==
1173 ctf_stream_class_id);
1174 } else {
1175 stream->ctf_stream_class_id = ctf_stream_class_id;
1176 }
1177 stream->state = LTTNG_LIVE_STREAM_QUIESCENT;
1178 break;
1179 }
1180 case LTTNG_VIEWER_INDEX_OK:
1181 {
1182 uint64_t ctf_stream_class_id;
1183
1184 PDBG("get_next_index: OK\n");
1185 lttng_index_to_packet_index(&rp, index);
1186 ctf_stream_class_id = be64toh(rp.stream_id);
1187 if (stream->ctf_stream_class_id != -1ULL) {
1188 assert(stream->ctf_stream_class_id ==
1189 ctf_stream_class_id);
1190 } else {
1191 stream->ctf_stream_class_id = ctf_stream_class_id;
1192 }
1193
1194 stream->state = LTTNG_LIVE_STREAM_ACTIVE_DATA;
1195 stream->current_packet_end_timestamp =
1196 index->ts_cycles.timestamp_end;
1197
1198 if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
1199 PDBG("get_next_index: new metadata needed\n");
1200 trace->new_metadata_needed = true;
1201 }
1202 if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
1203 PDBG("get_next_index: new streams needed\n");
1204 lttng_live_need_new_streams(lttng_live);
1205 }
1206 break;
1207 }
1208 case LTTNG_VIEWER_INDEX_RETRY:
1209 PDBG("get_next_index: retry\n");
1210 memset(index, 0, sizeof(struct packet_index));
1211 retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
1212 stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA;
1213 goto end;
1214 case LTTNG_VIEWER_INDEX_HUP:
1215 PDBG("get_next_index: stream hung up\n");
1216 memset(index, 0, sizeof(struct packet_index));
1217 index->offset = EOF;
1218 retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
1219 stream->state = LTTNG_LIVE_STREAM_EOF;
1220 break;
1221 case LTTNG_VIEWER_INDEX_ERR:
1222 PERR("get_next_index: error\n");
1223 memset(index, 0, sizeof(struct packet_index));
1224 stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA;
1225 goto error;
1226 default:
1227 PERR("get_next_index: unkwown value\n");
1228 memset(index, 0, sizeof(struct packet_index));
1229 stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA;
1230 goto error;
1231 }
1232end:
1233 return retstatus;
1234
1235error:
1236 retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1237 return retstatus;
1238}
1239
1240BT_HIDDEN
1241enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_live_component *lttng_live,
1242 struct lttng_live_stream_iterator *stream, uint8_t *buf, uint64_t offset,
1243 uint64_t req_len, uint64_t *recv_len)
1244{
1245 enum bt_ctf_notif_iter_medium_status retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_OK;
1246 struct lttng_viewer_cmd cmd;
1247 struct lttng_viewer_get_packet rq;
1248 struct lttng_viewer_trace_packet rp;
1249 ssize_t ret_len;
1250 uint32_t flags, status;
1251 struct bt_live_viewer_connection *viewer_connection =
1252 lttng_live->viewer_connection;
1253 struct lttng_live_trace *trace = stream->trace;
1254
1255 PDBG("lttng_live_get_stream_bytes: offset=%" PRIu64 ", req_len=%" PRIu64 "\n",
1256 offset, req_len);
1257 cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
1258 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1259 cmd.cmd_version = htobe32(0);
1260
1261 memset(&rq, 0, sizeof(rq));
1262 rq.stream_id = htobe64(stream->viewer_stream_id);
1263 rq.offset = htobe64(offset);
1264 rq.len = htobe32(req_len);
1265
1266 ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
1267 if (ret_len < 0) {
1268 PERR("Error sending cmd: %s\n", strerror(errno));
1269 goto error;
1270 }
1271 assert(ret_len == sizeof(cmd));
1272
1273 ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq));
1274 if (ret_len < 0) {
1275 PERR("Error sending get_data request: %s\n", strerror(errno));
1276 goto error;
1277 }
1278 assert(ret_len == sizeof(rq));
1279
1280 ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp));
1281 if (ret_len == 0) {
1282 PERR("Remote side has closed connection\n");
1283 goto error;
1284 }
1285 if (ret_len < 0) {
1286 PERR("Error receiving get_data response: %s\n", strerror(errno));
1287 goto error;
1288 }
1289 if (ret_len != sizeof(rp)) {
1290 PERR("[error] get_data_packet: expected %zu"
1291 ", received %zd\n", sizeof(rp),
1292 ret_len);
1293 goto error;
1294 }
1295
1296 flags = be32toh(rp.flags);
1297 status = be32toh(rp.status);
1298
1299 switch (status) {
1300 case LTTNG_VIEWER_GET_PACKET_OK:
1301 req_len = be32toh(rp.len);
1302 PDBG("get_data_packet: Ok, packet size : %" PRIu64 "\n", req_len);
1303 break;
1304 case LTTNG_VIEWER_GET_PACKET_RETRY:
1305 /* Unimplemented by relay daemon */
1306 PDBG("get_data_packet: retry\n");
1307 retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN;
1308 goto end;
1309 case LTTNG_VIEWER_GET_PACKET_ERR:
1310 if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
1311 PDBG("get_data_packet: new metadata needed, try again later\n");
1312 trace->new_metadata_needed = true;
1313 }
1314 if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
1315 PDBG("get_data_packet: new streams needed, try again later\n");
1316 lttng_live_need_new_streams(lttng_live);
1317 }
1318 if (flags & (LTTNG_VIEWER_FLAG_NEW_METADATA
1319 | LTTNG_VIEWER_FLAG_NEW_STREAM)) {
1320 retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN;
1321 goto end;
1322 }
1323 PERR("get_data_packet: error\n");
1324 goto error;
1325 case LTTNG_VIEWER_GET_PACKET_EOF:
1326 retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_EOF;
1327 goto end;
1328 default:
1329 PDBG("get_data_packet: unknown\n");
1330 goto error;
1331 }
1332
1333 if (req_len == 0) {
1334 goto error;
1335 }
1336
1337 ret_len = lttng_live_recv(viewer_connection->control_sock, buf, req_len);
1338 if (ret_len == 0) {
1339 PERR("Remote side has closed connection\n");
1340 goto error;
1341 }
1342 if (ret_len < 0) {
1343 PERR("Error receiving trace packet: %s\n", strerror(errno));
1344 goto error;
1345 }
1346 assert(ret_len == req_len);
1347 *recv_len = ret_len;
1348end:
1349 return retstatus;
1350
1351error:
1352 retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_ERROR;
1353 return retstatus;
1354}
1355
1356/*
1357 * Request new streams for a session.
1358 */
1359BT_HIDDEN
1360enum bt_ctf_lttng_live_iterator_status lttng_live_get_new_streams(
1361 struct lttng_live_session *session)
1362{
1363 enum bt_ctf_lttng_live_iterator_status status =
1364 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
1365 struct lttng_viewer_cmd cmd;
1366 struct lttng_viewer_new_streams_request rq;
1367 struct lttng_viewer_new_streams_response rp;
1368 ssize_t ret_len;
1369 struct lttng_live_component *lttng_live = session->lttng_live;
1370 struct bt_live_viewer_connection *viewer_connection =
1371 lttng_live->viewer_connection;
1372 uint32_t streams_count;
1373
1374 if (!session->new_streams_needed) {
1375 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
1376 }
1377
1378 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
1379 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1380 cmd.cmd_version = htobe32(0);
1381
1382 memset(&rq, 0, sizeof(rq));
1383 rq.session_id = htobe64(session->id);
1384
1385 ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
1386 if (ret_len < 0) {
1387 PERR("Error sending cmd: %s\n", strerror(errno));
1388 goto error;
1389 }
1390 assert(ret_len == sizeof(cmd));
1391
1392 ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq));
1393 if (ret_len < 0) {
1394 PERR("Error sending get_new_streams request: %s\n", strerror(errno));
1395 goto error;
1396 }
1397 assert(ret_len == sizeof(rq));
1398
1399 ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp));
1400 if (ret_len == 0) {
1401 PERR("Remote side has closed connection\n");
1402 goto error;
1403 }
1404 if (ret_len < 0) {
1405 PERR("Error receiving get_new_streams response\n");
1406 goto error;
1407 }
1408 assert(ret_len == sizeof(rp));
1409
1410 streams_count = be32toh(rp.streams_count);
1411
1412 switch(be32toh(rp.status)) {
1413 case LTTNG_VIEWER_NEW_STREAMS_OK:
1414 session->new_streams_needed = false;
1415 break;
1416 case LTTNG_VIEWER_NEW_STREAMS_NO_NEW:
1417 session->new_streams_needed = false;
1418 goto end;
1419 case LTTNG_VIEWER_NEW_STREAMS_HUP:
1420 session->new_streams_needed = false;
1421 session->closed = true;
1422 status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
1423 goto end;
1424 case LTTNG_VIEWER_NEW_STREAMS_ERR:
1425 PERR("get_new_streams error\n");
1426 goto error;
1427 default:
1428 PERR("Unknown return code %u\n", be32toh(rp.status));
1429 goto error;
1430 }
1431
1432 if (receive_streams(session, streams_count)) {
1433 goto error;
1434 }
1435end:
1436 return status;
1437
1438error:
1439 status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1440 return status;
1441}
1442
1443BT_HIDDEN
1444struct bt_live_viewer_connection *
1445 bt_live_viewer_connection_create(const char *url, FILE *error_fp)
1446{
1447 struct bt_live_viewer_connection *viewer_connection;
1448
1449 viewer_connection = g_new0(struct bt_live_viewer_connection, 1);
1450
1451 bt_object_init(&viewer_connection->obj, connection_release);
1452 viewer_connection->control_sock = -1;
1453 viewer_connection->port = -1;
1454 viewer_connection->error_fp = error_fp;
1455 viewer_connection->url = g_string_new(url);
1456 if (!viewer_connection->url) {
1457 goto error;
1458 }
1459
1460 PDBG("Establishing connection to url \"%s\"...\n", url);
1461 if (lttng_live_connect_viewer(viewer_connection)) {
1462 goto error_report;
1463 }
1464 PDBG("Connection to url \"%s\" is established\n", url);
1465 return viewer_connection;
1466
1467error_report:
1468 printf_verbose("Failure to establish connection to url \"%s\"\n", url);
1469error:
1470 g_free(viewer_connection);
1471 return NULL;
1472}
1473
1474BT_HIDDEN
1475void bt_live_viewer_connection_destroy(struct bt_live_viewer_connection *viewer_connection)
1476{
1477 PDBG("Closing connection to url \"%s\"\n", viewer_connection->url->str);
1478 lttng_live_disconnect_viewer(viewer_connection);
1479 g_string_free(viewer_connection->url, TRUE);
1480 g_free(viewer_connection);
1481}
This page took 0.07372 seconds and 4 git commands to generate.