9f8823a3d6d7a9560a6895039b690c3c1d52ec11
[babeltrace.git] / plugins / ctf / lttng-live / viewer-connection.c
1 /*
2 * Copyright 2016 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to deal
6 * in the Software without restriction, including without limitation the rights
7 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 * copies of the Software, and to permit persons to whom the Software is
9 * furnished to do so, subject to the following conditions:
10 *
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
13 *
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20 * SOFTWARE.
21 */
22
23 #include <stdio.h>
24 #include <stdint.h>
25 #include <stdlib.h>
26 #include <stdbool.h>
27 #include <unistd.h>
28 #include <glib.h>
29 #include <inttypes.h>
30 #include <sys/socket.h>
31 #include <sys/types.h>
32 #include <netinet/in.h>
33 #include <netdb.h>
34 #include <fcntl.h>
35 #include <poll.h>
36
37 #include <babeltrace/compat/send-internal.h>
38 #include <babeltrace/compiler-internal.h>
39 #include <babeltrace/common-internal.h>
40 #include <babeltrace/graph/graph.h>
41
42 #define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-VIEWER"
43
44 #include "lttng-live-internal.h"
45 #include "viewer-connection.h"
46 #include "lttng-viewer-abi.h"
47 #include "data-stream.h"
48 #include "metadata.h"
49
50 static ssize_t lttng_live_recv(struct bt_live_viewer_connection *viewer_connection,
51 void *buf, size_t len)
52 {
53 ssize_t ret;
54 size_t copied = 0, to_copy = len;
55 struct lttng_live_component *lttng_live =
56 viewer_connection->lttng_live;
57 int fd = viewer_connection->control_sock;
58
59 do {
60 ret = recv(fd, buf + copied, to_copy, 0);
61 if (ret > 0) {
62 assert(ret <= to_copy);
63 copied += ret;
64 to_copy -= ret;
65 }
66 if (ret < 0 && errno == EINTR) {
67 if (lttng_live && bt_graph_is_canceled(lttng_live->graph)) {
68 break;
69 } else {
70 continue;
71 }
72 }
73 } while (ret > 0 && to_copy > 0);
74 if (ret > 0)
75 ret = copied;
76 /* ret = 0 means orderly shutdown, ret < 0 is error. */
77 return ret;
78 }
79
80 static ssize_t lttng_live_send(struct bt_live_viewer_connection *viewer_connection,
81 const void *buf, size_t len)
82 {
83 struct lttng_live_component *lttng_live =
84 viewer_connection->lttng_live;
85 int fd = viewer_connection->control_sock;
86 ssize_t ret;
87
88 for (;;) {
89 ret = bt_send_nosigpipe(fd, buf, len);
90 if (ret < 0 && errno == EINTR) {
91 if (lttng_live && bt_graph_is_canceled(lttng_live->graph)) {
92 break;
93 } else {
94 continue;
95 }
96 } else {
97 break;
98 }
99 }
100 return ret;
101 }
102
103 static int parse_url(struct bt_live_viewer_connection *viewer_connection)
104 {
105 char error_buf[256] = { 0 };
106 struct bt_common_lttng_live_url_parts lttng_live_url_parts = { 0 };
107 int ret = -1;
108 const char *path = viewer_connection->url->str;
109
110 if (!path) {
111 goto end;
112 }
113
114 lttng_live_url_parts = bt_common_parse_lttng_live_url(path,
115 error_buf, sizeof(error_buf));
116 if (!lttng_live_url_parts.proto) {
117 BT_LOGW("Invalid LTTng live URL format: %s", error_buf);
118 goto end;
119 }
120
121 viewer_connection->relay_hostname =
122 lttng_live_url_parts.hostname;
123 lttng_live_url_parts.hostname = NULL;
124
125 if (lttng_live_url_parts.port >= 0) {
126 viewer_connection->port = lttng_live_url_parts.port;
127 } else {
128 viewer_connection->port = LTTNG_DEFAULT_NETWORK_VIEWER_PORT;
129 }
130
131 viewer_connection->target_hostname =
132 lttng_live_url_parts.target_hostname;
133 lttng_live_url_parts.target_hostname = NULL;
134
135 if (lttng_live_url_parts.session_name) {
136 viewer_connection->session_name =
137 lttng_live_url_parts.session_name;
138 lttng_live_url_parts.session_name = NULL;
139 }
140
141 BT_LOGD("Connecting to hostname : %s, port : %d, "
142 "target hostname : %s, session name : %s, "
143 "proto : %s",
144 viewer_connection->relay_hostname->str,
145 viewer_connection->port,
146 viewer_connection->target_hostname == NULL ?
147 "<none>" : viewer_connection->target_hostname->str,
148 viewer_connection->session_name == NULL ?
149 "<none>" : viewer_connection->session_name->str,
150 lttng_live_url_parts.proto->str);
151 ret = 0;
152
153 end:
154 bt_common_destroy_lttng_live_url_parts(&lttng_live_url_parts);
155 return ret;
156 }
157
158 static int lttng_live_handshake(struct bt_live_viewer_connection *viewer_connection)
159 {
160 struct lttng_viewer_cmd cmd;
161 struct lttng_viewer_connect connect;
162 int ret;
163 ssize_t ret_len;
164
165 cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
166 cmd.data_size = htobe64((uint64_t) sizeof(connect));
167 cmd.cmd_version = htobe32(0);
168
169 connect.viewer_session_id = -1ULL; /* will be set on recv */
170 connect.major = htobe32(LTTNG_LIVE_MAJOR);
171 connect.minor = htobe32(LTTNG_LIVE_MINOR);
172 connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
173
174 ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
175 if (ret_len < 0) {
176 BT_LOGE("Error sending cmd: %s", strerror(errno));
177 goto error;
178 }
179 assert(ret_len == sizeof(cmd));
180
181 ret_len = lttng_live_send(viewer_connection, &connect, sizeof(connect));
182 if (ret_len < 0) {
183 BT_LOGE("Error sending version: %s", strerror(errno));
184 goto error;
185 }
186 assert(ret_len == sizeof(connect));
187
188 ret_len = lttng_live_recv(viewer_connection, &connect, sizeof(connect));
189 if (ret_len == 0) {
190 BT_LOGI("Remote side has closed connection");
191 goto error;
192 }
193 if (ret_len < 0) {
194 BT_LOGE("Error receiving version: %s", strerror(errno));
195 goto error;
196 }
197 assert(ret_len == sizeof(connect));
198
199 BT_LOGD("Received viewer session ID : %" PRIu64,
200 be64toh(connect.viewer_session_id));
201 BT_LOGD("Relayd version : %u.%u", be32toh(connect.major),
202 be32toh(connect.minor));
203
204 if (LTTNG_LIVE_MAJOR != be32toh(connect.major)) {
205 BT_LOGE("Incompatible lttng-relayd protocol");
206 goto error;
207 }
208 /* Use the smallest protocol version implemented. */
209 if (LTTNG_LIVE_MINOR > be32toh(connect.minor)) {
210 viewer_connection->minor = be32toh(connect.minor);
211 } else {
212 viewer_connection->minor = LTTNG_LIVE_MINOR;
213 }
214 viewer_connection->major = LTTNG_LIVE_MAJOR;
215 ret = 0;
216 return ret;
217
218 error:
219 BT_LOGE("Unable to establish connection");
220 return -1;
221 }
222
223 static int lttng_live_connect_viewer(struct bt_live_viewer_connection *viewer_connection)
224 {
225 struct hostent *host;
226 struct sockaddr_in server_addr;
227 int ret;
228
229 if (parse_url(viewer_connection)) {
230 goto error;
231 }
232
233 host = gethostbyname(viewer_connection->relay_hostname->str);
234 if (!host) {
235 BT_LOGE("Cannot lookup hostname %s",
236 viewer_connection->relay_hostname->str);
237 goto error;
238 }
239
240 if ((viewer_connection->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
241 BT_LOGE("Socket creation failed: %s", strerror(errno));
242 goto error;
243 }
244
245 server_addr.sin_family = AF_INET;
246 server_addr.sin_port = htons(viewer_connection->port);
247 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
248 memset(&(server_addr.sin_zero), 0, 8);
249
250 if (connect(viewer_connection->control_sock, (struct sockaddr *) &server_addr,
251 sizeof(struct sockaddr)) == -1) {
252 BT_LOGE("Connection failed: %s", strerror(errno));
253 goto error;
254 }
255 if (lttng_live_handshake(viewer_connection)) {
256 goto error;
257 }
258
259 ret = 0;
260
261 return ret;
262
263 error:
264 if (viewer_connection->control_sock >= 0) {
265 if (close(viewer_connection->control_sock)) {
266 BT_LOGE("Close: %s", strerror(errno));
267 }
268 }
269 viewer_connection->control_sock = -1;
270 return -1;
271 }
272
273 static void lttng_live_disconnect_viewer(struct bt_live_viewer_connection *viewer_connection)
274 {
275 if (viewer_connection->control_sock < 0) {
276 return;
277 }
278 if (close(viewer_connection->control_sock)) {
279 BT_LOGE("Close: %s", strerror(errno));
280 viewer_connection->control_sock = -1;
281 }
282 }
283
284 static void connection_release(struct bt_object *obj)
285 {
286 struct bt_live_viewer_connection *conn =
287 container_of(obj, struct bt_live_viewer_connection, obj);
288
289 bt_live_viewer_connection_destroy(conn);
290 }
291
292 static
293 enum bt_value_status list_update_session(struct bt_value *results,
294 const struct lttng_viewer_session *session,
295 bool *_found)
296 {
297 enum bt_value_status ret = BT_VALUE_STATUS_OK;
298 struct bt_value *map = NULL;
299 struct bt_value *hostname = NULL;
300 struct bt_value *session_name = NULL;
301 struct bt_value *btval = NULL;
302 int i, len;
303 bool found = false;
304
305 len = bt_value_array_size(results);
306 if (len < 0) {
307 ret = BT_VALUE_STATUS_ERROR;
308 goto end;
309 }
310 for (i = 0; i < len; i++) {
311 const char *hostname_str = NULL;
312 const char *session_name_str = NULL;
313
314 map = bt_value_array_get(results, (size_t) i);
315 if (!map) {
316 ret = BT_VALUE_STATUS_ERROR;
317 goto end;
318 }
319 hostname = bt_value_map_get(map, "target-hostname");
320 if (!hostname) {
321 ret = BT_VALUE_STATUS_ERROR;
322 goto end;
323 }
324 session_name = bt_value_map_get(map, "session-name");
325 if (!session_name) {
326 ret = BT_VALUE_STATUS_ERROR;
327 goto end;
328 }
329 ret = bt_value_string_get(hostname, &hostname_str);
330 if (ret != BT_VALUE_STATUS_OK) {
331 goto end;
332 }
333 ret = bt_value_string_get(session_name, &session_name_str);
334 if (ret != BT_VALUE_STATUS_OK) {
335 goto end;
336 }
337
338 if (!strcmp(session->hostname, hostname_str)
339 && !strcmp(session->session_name,
340 session_name_str)) {
341 int64_t val;
342 uint32_t streams = be32toh(session->streams);
343 uint32_t clients = be32toh(session->clients);
344
345 found = true;
346
347 btval = bt_value_map_get(map, "stream-count");
348 if (!btval) {
349 ret = BT_VALUE_STATUS_ERROR;
350 goto end;
351 }
352 ret = bt_value_integer_get(btval, &val);
353 if (ret != BT_VALUE_STATUS_OK) {
354 goto end;
355 }
356 /* sum */
357 val += streams;
358 ret = bt_value_integer_set(btval, val);
359 if (ret != BT_VALUE_STATUS_OK) {
360 goto end;
361 }
362 BT_PUT(btval);
363
364 btval = bt_value_map_get(map, "client-count");
365 if (!btval) {
366 ret = BT_VALUE_STATUS_ERROR;
367 goto end;
368 }
369 ret = bt_value_integer_get(btval, &val);
370 if (ret != BT_VALUE_STATUS_OK) {
371 goto end;
372 }
373 /* max */
374 val = max_t(int64_t, clients, val);
375 ret = bt_value_integer_set(btval, val);
376 if (ret != BT_VALUE_STATUS_OK) {
377 goto end;
378 }
379 BT_PUT(btval);
380 }
381
382 BT_PUT(hostname);
383 BT_PUT(session_name);
384 BT_PUT(map);
385
386 if (found) {
387 break;
388 }
389 }
390 end:
391 BT_PUT(btval);
392 BT_PUT(hostname);
393 BT_PUT(session_name);
394 BT_PUT(map);
395 *_found = found;
396 return ret;
397 }
398
399 static
400 enum bt_value_status list_append_session(struct bt_value *results,
401 GString *base_url,
402 const struct lttng_viewer_session *session)
403 {
404 enum bt_value_status ret = BT_VALUE_STATUS_OK;
405 struct bt_value *map = NULL;
406 GString *url = NULL;
407 bool found = false;
408
409 /*
410 * If the session already exists, add the stream count to it,
411 * and do max of client counts.
412 */
413 ret = list_update_session(results, session, &found);
414 if (ret != BT_VALUE_STATUS_OK || found) {
415 goto end;
416 }
417
418 map = bt_value_map_create();
419 if (!map) {
420 ret = BT_VALUE_STATUS_ERROR;
421 goto end;
422 }
423
424 if (base_url->len < 1) {
425 ret = BT_VALUE_STATUS_ERROR;
426 goto end;
427 }
428 /*
429 * key = "url",
430 * value = <string>,
431 */
432 url = g_string_new(base_url->str);
433 g_string_append(url, "/host/");
434 g_string_append(url, session->hostname);
435 g_string_append_c(url, '/');
436 g_string_append(url, session->session_name);
437
438 ret = bt_value_map_insert_string(map, "url", url->str);
439 if (ret != BT_VALUE_STATUS_OK) {
440 goto end;
441 }
442
443 /*
444 * key = "target-hostname",
445 * value = <string>,
446 */
447 ret = bt_value_map_insert_string(map, "target-hostname",
448 session->hostname);
449 if (ret != BT_VALUE_STATUS_OK) {
450 goto end;
451 }
452
453 /*
454 * key = "session-name",
455 * value = <string>,
456 */
457 ret = bt_value_map_insert_string(map, "session-name",
458 session->session_name);
459 if (ret != BT_VALUE_STATUS_OK) {
460 goto end;
461 }
462
463 /*
464 * key = "timer-us",
465 * value = <integer>,
466 */
467 {
468 uint32_t live_timer = be32toh(session->live_timer);
469
470 ret = bt_value_map_insert_integer(map, "timer-us",
471 live_timer);
472 if (ret != BT_VALUE_STATUS_OK) {
473 goto end;
474 }
475 }
476
477 /*
478 * key = "stream-count",
479 * value = <integer>,
480 */
481 {
482 uint32_t streams = be32toh(session->streams);
483
484 ret = bt_value_map_insert_integer(map, "stream-count",
485 streams);
486 if (ret != BT_VALUE_STATUS_OK) {
487 goto end;
488 }
489 }
490
491
492 /*
493 * key = "client-count",
494 * value = <integer>,
495 */
496 {
497 uint32_t clients = be32toh(session->clients);
498
499 ret = bt_value_map_insert_integer(map, "client-count",
500 clients);
501 if (ret != BT_VALUE_STATUS_OK) {
502 goto end;
503 }
504 }
505
506 ret = bt_value_array_append(results, map);
507 end:
508 if (url) {
509 g_string_free(url, TRUE);
510 }
511 BT_PUT(map);
512 return ret;
513 }
514
515 /*
516 * Data structure returned:
517 *
518 * {
519 * <array> = {
520 * [n] = {
521 * <map> = {
522 * {
523 * key = "url",
524 * value = <string>,
525 * },
526 * {
527 * key = "target-hostname",
528 * value = <string>,
529 * },
530 * {
531 * key = "session-name",
532 * value = <string>,
533 * },
534 * {
535 * key = "timer-us",
536 * value = <integer>,
537 * },
538 * {
539 * key = "stream-count",
540 * value = <integer>,
541 * },
542 * {
543 * key = "client-count",
544 * value = <integer>,
545 * },
546 * },
547 * }
548 * }
549 */
550
551 BT_HIDDEN
552 struct bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_connection *viewer_connection)
553 {
554 struct bt_value *results = NULL;
555 struct lttng_viewer_cmd cmd;
556 struct lttng_viewer_list_sessions list;
557 uint32_t i, sessions_count;
558 ssize_t ret_len;
559
560 if (lttng_live_handshake(viewer_connection)) {
561 goto error;
562 }
563
564 results = bt_value_array_create();
565 if (!results) {
566 BT_LOGE("Error creating array");
567 goto error;
568 }
569
570 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
571 cmd.data_size = htobe64((uint64_t) 0);
572 cmd.cmd_version = htobe32(0);
573
574 ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
575 if (ret_len < 0) {
576 BT_LOGE("Error sending cmd: %s", strerror(errno));
577 goto error;
578 }
579 assert(ret_len == sizeof(cmd));
580
581 ret_len = lttng_live_recv(viewer_connection, &list, sizeof(list));
582 if (ret_len == 0) {
583 BT_LOGI("Remote side has closed connection");
584 goto error;
585 }
586 if (ret_len < 0) {
587 BT_LOGE("Error receiving session list: %s", strerror(errno));
588 goto error;
589 }
590 assert(ret_len == sizeof(list));
591
592 sessions_count = be32toh(list.sessions_count);
593 for (i = 0; i < sessions_count; i++) {
594 struct lttng_viewer_session lsession;
595
596 ret_len = lttng_live_recv(viewer_connection,
597 &lsession, sizeof(lsession));
598 if (ret_len == 0) {
599 BT_LOGI("Remote side has closed connection");
600 goto error;
601 }
602 if (ret_len < 0) {
603 BT_LOGE("Error receiving session: %s", strerror(errno));
604 goto error;
605 }
606 assert(ret_len == sizeof(lsession));
607 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
608 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
609 if (list_append_session(results,
610 viewer_connection->url, &lsession)
611 != BT_VALUE_STATUS_OK) {
612 goto error;
613 }
614 }
615 goto end;
616 error:
617 BT_PUT(results);
618 end:
619 return results;
620 }
621
622 static
623 int lttng_live_query_session_ids(struct lttng_live_component *lttng_live)
624 {
625 struct lttng_viewer_cmd cmd;
626 struct lttng_viewer_list_sessions list;
627 struct lttng_viewer_session lsession;
628 uint32_t i, sessions_count;
629 ssize_t ret_len;
630 uint64_t session_id;
631 struct bt_live_viewer_connection *viewer_connection =
632 lttng_live->viewer_connection;
633
634 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
635 cmd.data_size = htobe64((uint64_t) 0);
636 cmd.cmd_version = htobe32(0);
637
638 ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
639 if (ret_len < 0) {
640 BT_LOGE("Error sending cmd: %s", strerror(errno));
641 goto error;
642 }
643 assert(ret_len == sizeof(cmd));
644
645 ret_len = lttng_live_recv(viewer_connection, &list, sizeof(list));
646 if (ret_len == 0) {
647 BT_LOGI("Remote side has closed connection");
648 goto error;
649 }
650 if (ret_len < 0) {
651 BT_LOGE("Error receiving session list: %s", strerror(errno));
652 goto error;
653 }
654 assert(ret_len == sizeof(list));
655
656 sessions_count = be32toh(list.sessions_count);
657 for (i = 0; i < sessions_count; i++) {
658 ret_len = lttng_live_recv(viewer_connection,
659 &lsession, sizeof(lsession));
660 if (ret_len == 0) {
661 BT_LOGI("Remote side has closed connection");
662 goto error;
663 }
664 if (ret_len < 0) {
665 BT_LOGE("Error receiving session: %s", strerror(errno));
666 goto error;
667 }
668 assert(ret_len == sizeof(lsession));
669 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
670 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
671 session_id = be64toh(lsession.id);
672
673 if ((strncmp(lsession.session_name,
674 viewer_connection->session_name->str,
675 MAXNAMLEN) == 0) && (strncmp(lsession.hostname,
676 viewer_connection->target_hostname->str,
677 MAXNAMLEN) == 0)) {
678 if (lttng_live_add_session(lttng_live, session_id)) {
679 goto error;
680 }
681 }
682 }
683
684 return 0;
685
686 error:
687 BT_LOGE("Unable to query session ids");
688 return -1;
689 }
690
691 BT_HIDDEN
692 int lttng_live_create_viewer_session(struct lttng_live_component *lttng_live)
693 {
694 struct lttng_viewer_cmd cmd;
695 struct lttng_viewer_create_session_response resp;
696 ssize_t ret_len;
697 struct bt_live_viewer_connection *viewer_connection =
698 lttng_live->viewer_connection;
699
700 cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
701 cmd.data_size = htobe64((uint64_t) 0);
702 cmd.cmd_version = htobe32(0);
703
704 ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
705 if (ret_len < 0) {
706 BT_LOGE("Error sending cmd: %s", strerror(errno));
707 goto error;
708 }
709 assert(ret_len == sizeof(cmd));
710
711 ret_len = lttng_live_recv(viewer_connection, &resp, sizeof(resp));
712 if (ret_len == 0) {
713 BT_LOGI("Remote side has closed connection");
714 goto error;
715 }
716 if (ret_len < 0) {
717 BT_LOGE("Error receiving create session reply: %s", strerror(errno));
718 goto error;
719 }
720 assert(ret_len == sizeof(resp));
721
722 if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
723 BT_LOGE("Error creating viewer session");
724 goto error;
725 }
726 if (lttng_live_query_session_ids(lttng_live)) {
727 goto error;
728 }
729
730 return 0;
731
732 error:
733 return -1;
734 }
735
736 static
737 int receive_streams(struct lttng_live_session *session,
738 uint32_t stream_count)
739 {
740 ssize_t ret_len;
741 uint32_t i;
742 struct lttng_live_component *lttng_live = session->lttng_live;
743 struct bt_live_viewer_connection *viewer_connection =
744 lttng_live->viewer_connection;
745
746 BT_LOGD("Getting %" PRIu32 " new streams:", stream_count);
747 for (i = 0; i < stream_count; i++) {
748 struct lttng_viewer_stream stream;
749 struct lttng_live_stream_iterator *live_stream;
750 uint64_t stream_id;
751 uint64_t ctf_trace_id;
752
753 ret_len = lttng_live_recv(viewer_connection, &stream, sizeof(stream));
754 if (ret_len == 0) {
755 BT_LOGI("Remote side has closed connection");
756 goto error;
757 }
758 if (ret_len < 0) {
759 BT_LOGE("Error receiving stream");
760 goto error;
761 }
762 assert(ret_len == sizeof(stream));
763 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
764 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
765 stream_id = be64toh(stream.id);
766 ctf_trace_id = be64toh(stream.ctf_trace_id);
767
768 if (stream.metadata_flag) {
769 BT_LOGD(" metadata stream %" PRIu64 " : %s/%s",
770 stream_id, stream.path_name,
771 stream.channel_name);
772 if (lttng_live_metadata_create_stream(session,
773 ctf_trace_id, stream_id)) {
774 BT_LOGE("Error creating metadata stream");
775
776 goto error;
777 }
778 session->lazy_stream_notif_init = true;
779 } else {
780 BT_LOGD(" stream %" PRIu64 " : %s/%s",
781 stream_id, stream.path_name,
782 stream.channel_name);
783 live_stream = lttng_live_stream_iterator_create(session,
784 ctf_trace_id, stream_id);
785 if (!live_stream) {
786 BT_LOGE("Error creating streamn");
787 goto error;
788 }
789 }
790 }
791 return 0;
792
793 error:
794 return -1;
795 }
796
797 BT_HIDDEN
798 int lttng_live_attach_session(struct lttng_live_session *session)
799 {
800 struct lttng_viewer_cmd cmd;
801 struct lttng_viewer_attach_session_request rq;
802 struct lttng_viewer_attach_session_response rp;
803 ssize_t ret_len;
804 struct lttng_live_component *lttng_live = session->lttng_live;
805 struct bt_live_viewer_connection *viewer_connection =
806 lttng_live->viewer_connection;
807 uint64_t session_id = session->id;
808 uint32_t streams_count;
809
810 if (session->attached) {
811 return 0;
812 }
813
814 cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
815 cmd.data_size = htobe64((uint64_t) sizeof(rq));
816 cmd.cmd_version = htobe32(0);
817
818 memset(&rq, 0, sizeof(rq));
819 rq.session_id = htobe64(session_id);
820 // TODO: add cmd line parameter to select seek beginning
821 // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
822 rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
823
824 ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
825 if (ret_len < 0) {
826 BT_LOGE("Error sending cmd: %s", strerror(errno));
827 goto error;
828 }
829 assert(ret_len == sizeof(cmd));
830
831 ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
832 if (ret_len < 0) {
833 BT_LOGE("Error sending attach request: %s", strerror(errno));
834 goto error;
835 }
836 assert(ret_len == sizeof(rq));
837
838 ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
839 if (ret_len == 0) {
840 BT_LOGI("Remote side has closed connection");
841 goto error;
842 }
843 if (ret_len < 0) {
844 BT_LOGE("Error receiving attach response: %s", strerror(errno));
845 goto error;
846 }
847 assert(ret_len == sizeof(rp));
848
849 streams_count = be32toh(rp.streams_count);
850 switch(be32toh(rp.status)) {
851 case LTTNG_VIEWER_ATTACH_OK:
852 break;
853 case LTTNG_VIEWER_ATTACH_UNK:
854 BT_LOGW("Session id %" PRIu64 " is unknown", session_id);
855 goto error;
856 case LTTNG_VIEWER_ATTACH_ALREADY:
857 BT_LOGW("There is already a viewer attached to this session");
858 goto error;
859 case LTTNG_VIEWER_ATTACH_NOT_LIVE:
860 BT_LOGW("Not a live session");
861 goto error;
862 case LTTNG_VIEWER_ATTACH_SEEK_ERR:
863 BT_LOGE("Wrong seek parameter");
864 goto error;
865 default:
866 BT_LOGE("Unknown attach return code %u", be32toh(rp.status));
867 goto error;
868 }
869
870 /* We receive the initial list of streams. */
871 if (receive_streams(session, streams_count)) {
872 goto error;
873 }
874
875 session->attached = true;
876 session->new_streams_needed = false;
877
878 return 0;
879
880 error:
881 return -1;
882 }
883
884 BT_HIDDEN
885 int lttng_live_detach_session(struct lttng_live_session *session)
886 {
887 struct lttng_viewer_cmd cmd;
888 struct lttng_viewer_detach_session_request rq;
889 struct lttng_viewer_detach_session_response rp;
890 ssize_t ret_len;
891 struct lttng_live_component *lttng_live = session->lttng_live;
892 struct bt_live_viewer_connection *viewer_connection =
893 lttng_live->viewer_connection;
894 uint64_t session_id = session->id;
895
896 if (!session->attached) {
897 return 0;
898 }
899
900 cmd.cmd = htobe32(LTTNG_VIEWER_DETACH_SESSION);
901 cmd.data_size = htobe64((uint64_t) sizeof(rq));
902 cmd.cmd_version = htobe32(0);
903
904 memset(&rq, 0, sizeof(rq));
905 rq.session_id = htobe64(session_id);
906
907 ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
908 if (ret_len < 0) {
909 BT_LOGE("Error sending cmd: %s", strerror(errno));
910 goto error;
911 }
912 assert(ret_len == sizeof(cmd));
913
914 ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
915 if (ret_len < 0) {
916 BT_LOGE("Error sending detach request: %s", strerror(errno));
917 goto error;
918 }
919 assert(ret_len == sizeof(rq));
920
921 ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
922 if (ret_len == 0) {
923 BT_LOGI("Remote side has closed connection");
924 goto error;
925 }
926 if (ret_len < 0) {
927 BT_LOGE("Error receiving detach response: %s", strerror(errno));
928 goto error;
929 }
930 assert(ret_len == sizeof(rp));
931
932 switch(be32toh(rp.status)) {
933 case LTTNG_VIEWER_DETACH_SESSION_OK:
934 break;
935 case LTTNG_VIEWER_DETACH_SESSION_UNK:
936 BT_LOGW("Session id %" PRIu64 " is unknown", session_id);
937 goto error;
938 case LTTNG_VIEWER_DETACH_SESSION_ERR:
939 BT_LOGW("Error detaching session id %" PRIu64 "", session_id);
940 goto error;
941 default:
942 BT_LOGE("Unknown detach return code %u", be32toh(rp.status));
943 goto error;
944 }
945
946 session->attached = false;
947
948 return 0;
949
950 error:
951 return -1;
952 }
953
954 BT_HIDDEN
955 ssize_t lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace,
956 FILE *fp)
957 {
958 uint64_t len = 0;
959 int ret;
960 struct lttng_viewer_cmd cmd;
961 struct lttng_viewer_get_metadata rq;
962 struct lttng_viewer_metadata_packet rp;
963 char *data = NULL;
964 ssize_t ret_len;
965 struct lttng_live_session *session = trace->session;
966 struct lttng_live_component *lttng_live = session->lttng_live;
967 struct lttng_live_metadata *metadata = trace->metadata;
968 struct bt_live_viewer_connection *viewer_connection =
969 lttng_live->viewer_connection;
970
971 rq.stream_id = htobe64(metadata->stream_id);
972 cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
973 cmd.data_size = htobe64((uint64_t) sizeof(rq));
974 cmd.cmd_version = htobe32(0);
975
976 ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
977 if (ret_len < 0) {
978 BT_LOGE("Error sending cmd: %s", strerror(errno));
979 goto error;
980 }
981 assert(ret_len == sizeof(cmd));
982
983 ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
984 if (ret_len < 0) {
985 BT_LOGE("Error sending get_metadata request: %s", strerror(errno));
986 goto error;
987 }
988 assert(ret_len == sizeof(rq));
989
990 ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
991 if (ret_len == 0) {
992 BT_LOGI("Remote side has closed connection");
993 goto error;
994 }
995 if (ret_len < 0) {
996 BT_LOGE("Error receiving get_metadata response: %s", strerror(errno));
997 goto error;
998 }
999 assert(ret_len == sizeof(rp));
1000
1001 switch (be32toh(rp.status)) {
1002 case LTTNG_VIEWER_METADATA_OK:
1003 BT_LOGD("get_metadata : OK");
1004 break;
1005 case LTTNG_VIEWER_NO_NEW_METADATA:
1006 BT_LOGD("get_metadata : NO NEW");
1007 ret = 0;
1008 goto end;
1009 case LTTNG_VIEWER_METADATA_ERR:
1010 BT_LOGD("get_metadata : ERR");
1011 goto error;
1012 default:
1013 BT_LOGD("get_metadata : UNKNOWN");
1014 goto error;
1015 }
1016
1017 len = be64toh(rp.len);
1018 BT_LOGD("Writing %" PRIu64" bytes to metadata", len);
1019 if (len <= 0) {
1020 goto error;
1021 }
1022
1023 data = zmalloc(len);
1024 if (!data) {
1025 BT_LOGE("relay data zmalloc: %s", strerror(errno));
1026 goto error;
1027 }
1028 ret_len = lttng_live_recv(viewer_connection, data, len);
1029 if (ret_len == 0) {
1030 BT_LOGI("Remote side has closed connection");
1031 goto error_free_data;
1032 }
1033 if (ret_len < 0) {
1034 BT_LOGE("Error receiving trace packet: %s", strerror(errno));
1035 goto error_free_data;
1036 }
1037 assert(ret_len == len);
1038
1039 do {
1040 ret_len = fwrite(data, 1, len, fp);
1041 } while (ret_len < 0 && errno == EINTR);
1042 if (ret_len < 0) {
1043 BT_LOGE("Writing in the metadata fp");
1044 goto error_free_data;
1045 }
1046 assert(ret_len == len);
1047 free(data);
1048 ret = len;
1049 end:
1050 return ret;
1051
1052 error_free_data:
1053 free(data);
1054 error:
1055 return -1;
1056 }
1057
1058 /*
1059 * Assign the fields from a lttng_viewer_index to a packet_index.
1060 */
1061 static
1062 void lttng_index_to_packet_index(struct lttng_viewer_index *lindex,
1063 struct packet_index *pindex)
1064 {
1065 assert(lindex);
1066 assert(pindex);
1067
1068 pindex->offset = be64toh(lindex->offset);
1069 pindex->packet_size = be64toh(lindex->packet_size);
1070 pindex->content_size = be64toh(lindex->content_size);
1071 pindex->ts_cycles.timestamp_begin = be64toh(lindex->timestamp_begin);
1072 pindex->ts_cycles.timestamp_end = be64toh(lindex->timestamp_end);
1073 pindex->events_discarded = be64toh(lindex->events_discarded);
1074 }
1075
1076 BT_HIDDEN
1077 enum bt_ctf_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_live_component *lttng_live,
1078 struct lttng_live_stream_iterator *stream,
1079 struct packet_index *index)
1080 {
1081 struct lttng_viewer_cmd cmd;
1082 struct lttng_viewer_get_next_index rq;
1083 ssize_t ret_len;
1084 struct lttng_viewer_index rp;
1085 uint32_t flags, status;
1086 enum bt_ctf_lttng_live_iterator_status retstatus =
1087 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
1088 struct bt_live_viewer_connection *viewer_connection =
1089 lttng_live->viewer_connection;
1090 struct lttng_live_trace *trace = stream->trace;
1091
1092 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
1093 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1094 cmd.cmd_version = htobe32(0);
1095
1096 memset(&rq, 0, sizeof(rq));
1097 rq.stream_id = htobe64(stream->viewer_stream_id);
1098
1099 ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
1100 if (ret_len < 0) {
1101 BT_LOGE("Error sending cmd: %s", strerror(errno));
1102 goto error;
1103 }
1104 assert(ret_len == sizeof(cmd));
1105
1106 ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
1107 if (ret_len < 0) {
1108 BT_LOGE("Error sending get_next_index request: %s", strerror(errno));
1109 goto error;
1110 }
1111 assert(ret_len == sizeof(rq));
1112
1113 ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1114 if (ret_len == 0) {
1115 BT_LOGI("Remote side has closed connection");
1116 goto error;
1117 }
1118 if (ret_len < 0) {
1119 BT_LOGE("Error receiving get_next_index response: %s", strerror(errno));
1120 goto error;
1121 }
1122 assert(ret_len == sizeof(rp));
1123
1124 flags = be32toh(rp.flags);
1125 status = be32toh(rp.status);
1126
1127 switch (status) {
1128 case LTTNG_VIEWER_INDEX_INACTIVE:
1129 {
1130 uint64_t ctf_stream_class_id;
1131
1132 BT_LOGD("get_next_index: inactive");
1133 memset(index, 0, sizeof(struct packet_index));
1134 index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
1135 stream->current_inactivity_timestamp = index->ts_cycles.timestamp_end;
1136 ctf_stream_class_id = be64toh(rp.stream_id);
1137 if (stream->ctf_stream_class_id != -1ULL) {
1138 assert(stream->ctf_stream_class_id ==
1139 ctf_stream_class_id);
1140 } else {
1141 stream->ctf_stream_class_id = ctf_stream_class_id;
1142 }
1143 stream->state = LTTNG_LIVE_STREAM_QUIESCENT;
1144 break;
1145 }
1146 case LTTNG_VIEWER_INDEX_OK:
1147 {
1148 uint64_t ctf_stream_class_id;
1149
1150 BT_LOGD("get_next_index: OK");
1151 lttng_index_to_packet_index(&rp, index);
1152 ctf_stream_class_id = be64toh(rp.stream_id);
1153 if (stream->ctf_stream_class_id != -1ULL) {
1154 assert(stream->ctf_stream_class_id ==
1155 ctf_stream_class_id);
1156 } else {
1157 stream->ctf_stream_class_id = ctf_stream_class_id;
1158 }
1159
1160 stream->state = LTTNG_LIVE_STREAM_ACTIVE_DATA;
1161 stream->current_packet_end_timestamp =
1162 index->ts_cycles.timestamp_end;
1163
1164 if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
1165 BT_LOGD("get_next_index: new metadata needed");
1166 trace->new_metadata_needed = true;
1167 }
1168 if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
1169 BT_LOGD("get_next_index: new streams needed");
1170 lttng_live_need_new_streams(lttng_live);
1171 }
1172 break;
1173 }
1174 case LTTNG_VIEWER_INDEX_RETRY:
1175 BT_LOGD("get_next_index: retry");
1176 memset(index, 0, sizeof(struct packet_index));
1177 retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
1178 stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA;
1179 goto end;
1180 case LTTNG_VIEWER_INDEX_HUP:
1181 BT_LOGD("get_next_index: stream hung up");
1182 memset(index, 0, sizeof(struct packet_index));
1183 index->offset = EOF;
1184 retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
1185 stream->state = LTTNG_LIVE_STREAM_EOF;
1186 break;
1187 case LTTNG_VIEWER_INDEX_ERR:
1188 BT_LOGE("get_next_index: error");
1189 memset(index, 0, sizeof(struct packet_index));
1190 stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA;
1191 goto error;
1192 default:
1193 BT_LOGE("get_next_index: unknown value");
1194 memset(index, 0, sizeof(struct packet_index));
1195 stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA;
1196 goto error;
1197 }
1198 end:
1199 return retstatus;
1200
1201 error:
1202 if (bt_graph_is_canceled(lttng_live->graph)) {
1203 retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
1204 } else {
1205 retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1206 }
1207 return retstatus;
1208 }
1209
1210 BT_HIDDEN
1211 enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_live_component *lttng_live,
1212 struct lttng_live_stream_iterator *stream, uint8_t *buf, uint64_t offset,
1213 uint64_t req_len, uint64_t *recv_len)
1214 {
1215 enum bt_ctf_notif_iter_medium_status retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_OK;
1216 struct lttng_viewer_cmd cmd;
1217 struct lttng_viewer_get_packet rq;
1218 struct lttng_viewer_trace_packet rp;
1219 ssize_t ret_len;
1220 uint32_t flags, status;
1221 struct bt_live_viewer_connection *viewer_connection =
1222 lttng_live->viewer_connection;
1223 struct lttng_live_trace *trace = stream->trace;
1224
1225 BT_LOGD("lttng_live_get_stream_bytes: offset=%" PRIu64 ", req_len=%" PRIu64,
1226 offset, req_len);
1227 cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
1228 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1229 cmd.cmd_version = htobe32(0);
1230
1231 memset(&rq, 0, sizeof(rq));
1232 rq.stream_id = htobe64(stream->viewer_stream_id);
1233 rq.offset = htobe64(offset);
1234 rq.len = htobe32(req_len);
1235
1236 ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
1237 if (ret_len < 0) {
1238 BT_LOGE("Error sending cmd: %s", strerror(errno));
1239 goto error;
1240 }
1241 assert(ret_len == sizeof(cmd));
1242
1243 ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
1244 if (ret_len < 0) {
1245 BT_LOGE("Error sending get_data request: %s", strerror(errno));
1246 goto error;
1247 }
1248 assert(ret_len == sizeof(rq));
1249
1250 ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1251 if (ret_len == 0) {
1252 BT_LOGI("Remote side has closed connection");
1253 goto error;
1254 }
1255 if (ret_len < 0) {
1256 BT_LOGE("Error receiving get_data response: %s", strerror(errno));
1257 goto error;
1258 }
1259 if (ret_len != sizeof(rp)) {
1260 BT_LOGE("get_data_packet: expected %zu"
1261 ", received %zd", sizeof(rp),
1262 ret_len);
1263 goto error;
1264 }
1265
1266 flags = be32toh(rp.flags);
1267 status = be32toh(rp.status);
1268
1269 switch (status) {
1270 case LTTNG_VIEWER_GET_PACKET_OK:
1271 req_len = be32toh(rp.len);
1272 BT_LOGD("get_data_packet: Ok, packet size : %" PRIu64 "", req_len);
1273 break;
1274 case LTTNG_VIEWER_GET_PACKET_RETRY:
1275 /* Unimplemented by relay daemon */
1276 BT_LOGD("get_data_packet: retry");
1277 retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN;
1278 goto end;
1279 case LTTNG_VIEWER_GET_PACKET_ERR:
1280 if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
1281 BT_LOGD("get_data_packet: new metadata needed, try again later");
1282 trace->new_metadata_needed = true;
1283 }
1284 if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
1285 BT_LOGD("get_data_packet: new streams needed, try again later");
1286 lttng_live_need_new_streams(lttng_live);
1287 }
1288 if (flags & (LTTNG_VIEWER_FLAG_NEW_METADATA
1289 | LTTNG_VIEWER_FLAG_NEW_STREAM)) {
1290 retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN;
1291 goto end;
1292 }
1293 BT_LOGE("get_data_packet: error");
1294 goto error;
1295 case LTTNG_VIEWER_GET_PACKET_EOF:
1296 retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_EOF;
1297 goto end;
1298 default:
1299 BT_LOGE("get_data_packet: unknown");
1300 goto error;
1301 }
1302
1303 if (req_len == 0) {
1304 goto error;
1305 }
1306
1307 ret_len = lttng_live_recv(viewer_connection, buf, req_len);
1308 if (ret_len == 0) {
1309 BT_LOGI("Remote side has closed connection");
1310 goto error;
1311 }
1312 if (ret_len < 0) {
1313 BT_LOGE("Error receiving trace packet: %s", strerror(errno));
1314 goto error;
1315 }
1316 assert(ret_len == req_len);
1317 *recv_len = ret_len;
1318 end:
1319 return retstatus;
1320
1321 error:
1322 if (bt_graph_is_canceled(lttng_live->graph)) {
1323 retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN;
1324 } else {
1325 retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_ERROR;
1326 }
1327 return retstatus;
1328 }
1329
1330 /*
1331 * Request new streams for a session.
1332 */
1333 BT_HIDDEN
1334 enum bt_ctf_lttng_live_iterator_status lttng_live_get_new_streams(
1335 struct lttng_live_session *session)
1336 {
1337 enum bt_ctf_lttng_live_iterator_status status =
1338 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
1339 struct lttng_viewer_cmd cmd;
1340 struct lttng_viewer_new_streams_request rq;
1341 struct lttng_viewer_new_streams_response rp;
1342 ssize_t ret_len;
1343 struct lttng_live_component *lttng_live = session->lttng_live;
1344 struct bt_live_viewer_connection *viewer_connection =
1345 lttng_live->viewer_connection;
1346 uint32_t streams_count;
1347
1348 if (!session->new_streams_needed) {
1349 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
1350 }
1351
1352 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
1353 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1354 cmd.cmd_version = htobe32(0);
1355
1356 memset(&rq, 0, sizeof(rq));
1357 rq.session_id = htobe64(session->id);
1358
1359 ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
1360 if (ret_len < 0) {
1361 BT_LOGE("Error sending cmd: %s", strerror(errno));
1362 goto error;
1363 }
1364 assert(ret_len == sizeof(cmd));
1365
1366 ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
1367 if (ret_len < 0) {
1368 BT_LOGE("Error sending get_new_streams request: %s", strerror(errno));
1369 goto error;
1370 }
1371 assert(ret_len == sizeof(rq));
1372
1373 ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1374 if (ret_len == 0) {
1375 BT_LOGI("Remote side has closed connection");
1376 goto error;
1377 }
1378 if (ret_len < 0) {
1379 BT_LOGE("Error receiving get_new_streams response");
1380 goto error;
1381 }
1382 assert(ret_len == sizeof(rp));
1383
1384 streams_count = be32toh(rp.streams_count);
1385
1386 switch(be32toh(rp.status)) {
1387 case LTTNG_VIEWER_NEW_STREAMS_OK:
1388 session->new_streams_needed = false;
1389 break;
1390 case LTTNG_VIEWER_NEW_STREAMS_NO_NEW:
1391 session->new_streams_needed = false;
1392 goto end;
1393 case LTTNG_VIEWER_NEW_STREAMS_HUP:
1394 session->new_streams_needed = false;
1395 session->closed = true;
1396 status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
1397 goto end;
1398 case LTTNG_VIEWER_NEW_STREAMS_ERR:
1399 BT_LOGE("get_new_streams error");
1400 goto error;
1401 default:
1402 BT_LOGE("Unknown return code %u", be32toh(rp.status));
1403 goto error;
1404 }
1405
1406 if (receive_streams(session, streams_count)) {
1407 goto error;
1408 }
1409 end:
1410 return status;
1411
1412 error:
1413 if (bt_graph_is_canceled(lttng_live->graph)) {
1414 status = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN;
1415 } else {
1416 status = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_ERROR;
1417 }
1418 return status;
1419 }
1420
1421 BT_HIDDEN
1422 struct bt_live_viewer_connection *
1423 bt_live_viewer_connection_create(const char *url,
1424 struct lttng_live_component *lttng_live)
1425 {
1426 struct bt_live_viewer_connection *viewer_connection;
1427
1428 viewer_connection = g_new0(struct bt_live_viewer_connection, 1);
1429
1430 bt_object_init(&viewer_connection->obj, connection_release);
1431 viewer_connection->control_sock = -1;
1432 viewer_connection->port = -1;
1433 viewer_connection->lttng_live = lttng_live;
1434 viewer_connection->url = g_string_new(url);
1435 if (!viewer_connection->url) {
1436 goto error;
1437 }
1438
1439 BT_LOGD("Establishing connection to url \"%s\"...", url);
1440 if (lttng_live_connect_viewer(viewer_connection)) {
1441 goto error_report;
1442 }
1443 BT_LOGD("Connection to url \"%s\" is established", url);
1444 return viewer_connection;
1445
1446 error_report:
1447 BT_LOGW("Failure to establish connection to url \"%s\"", url);
1448 error:
1449 g_free(viewer_connection);
1450 return NULL;
1451 }
1452
1453 BT_HIDDEN
1454 void bt_live_viewer_connection_destroy(struct bt_live_viewer_connection *viewer_connection)
1455 {
1456 BT_LOGD("Closing connection to url \"%s\"", viewer_connection->url->str);
1457 lttng_live_disconnect_viewer(viewer_connection);
1458 g_string_free(viewer_connection->url, TRUE);
1459 if (viewer_connection->relay_hostname) {
1460 g_string_free(viewer_connection->relay_hostname, TRUE);
1461 }
1462 if (viewer_connection->target_hostname) {
1463 g_string_free(viewer_connection->target_hostname, TRUE);
1464 }
1465 if (viewer_connection->session_name) {
1466 g_string_free(viewer_connection->session_name, TRUE);
1467 }
1468 g_free(viewer_connection);
1469 }
This page took 0.072848 seconds and 3 git commands to generate.