plugins/ctf/common/btr: use standard logging files and macros
[babeltrace.git] / plugins / ctf / lttng-live / viewer-connection.c
CommitLineData
7cdc2bab
MD
1/*
2 * Copyright 2016 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to deal
6 * in the Software without restriction, including without limitation the rights
7 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 * copies of the Software, and to permit persons to whom the Software is
9 * furnished to do so, subject to the following conditions:
10 *
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
13 *
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20 * SOFTWARE.
21 */
22
23#include <stdio.h>
24#include <stdint.h>
25#include <stdlib.h>
26#include <stdbool.h>
27#include <unistd.h>
28#include <glib.h>
29#include <inttypes.h>
30#include <sys/socket.h>
31#include <sys/types.h>
32#include <netinet/in.h>
33#include <netdb.h>
34#include <fcntl.h>
35#include <poll.h>
36
37#include <babeltrace/compat/send-internal.h>
38#include <babeltrace/compiler-internal.h>
94b828f3 39#include <babeltrace/common-internal.h>
4c66436f 40#include <babeltrace/graph/graph.h>
7cdc2bab 41
087bc060
MD
42#define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-VIEWER"
43
7cdc2bab
MD
44#include "lttng-live-internal.h"
45#include "viewer-connection.h"
46#include "lttng-viewer-abi.h"
47#include "data-stream.h"
48#include "metadata.h"
49
4c66436f
MD
50static ssize_t lttng_live_recv(struct bt_live_viewer_connection *viewer_connection,
51 void *buf, size_t len)
7cdc2bab
MD
52{
53 ssize_t ret;
54 size_t copied = 0, to_copy = len;
4c66436f
MD
55 struct lttng_live_component *lttng_live =
56 viewer_connection->lttng_live;
57 int fd = viewer_connection->control_sock;
7cdc2bab
MD
58
59 do {
60 ret = recv(fd, buf + copied, to_copy, 0);
61 if (ret > 0) {
62 assert(ret <= to_copy);
63 copied += ret;
64 to_copy -= ret;
65 }
4c66436f
MD
66 if (ret < 0 && errno == EINTR) {
67 if (lttng_live && bt_graph_is_canceled(lttng_live->graph)) {
68 break;
69 } else {
70 continue;
71 }
72 }
73 } while (ret > 0 && to_copy > 0);
7cdc2bab
MD
74 if (ret > 0)
75 ret = copied;
76 /* ret = 0 means orderly shutdown, ret < 0 is error. */
77 return ret;
78}
79
4c66436f
MD
80static ssize_t lttng_live_send(struct bt_live_viewer_connection *viewer_connection,
81 const void *buf, size_t len)
7cdc2bab 82{
4c66436f
MD
83 struct lttng_live_component *lttng_live =
84 viewer_connection->lttng_live;
85 int fd = viewer_connection->control_sock;
7cdc2bab
MD
86 ssize_t ret;
87
4c66436f 88 for (;;) {
7cdc2bab 89 ret = bt_send_nosigpipe(fd, buf, len);
4c66436f
MD
90 if (ret < 0 && errno == EINTR) {
91 if (lttng_live && bt_graph_is_canceled(lttng_live->graph)) {
92 break;
93 } else {
94 continue;
95 }
96 } else {
97 break;
98 }
99 }
7cdc2bab
MD
100 return ret;
101}
102
7cdc2bab
MD
103static int parse_url(struct bt_live_viewer_connection *viewer_connection)
104{
94b828f3
MD
105 char error_buf[256] = { 0 };
106 struct bt_common_lttng_live_url_parts lttng_live_url_parts = { 0 };
107 int ret = -1;
7cdc2bab 108 const char *path = viewer_connection->url->str;
7cdc2bab
MD
109
110 if (!path) {
111 goto end;
112 }
7cdc2bab 113
94b828f3
MD
114 lttng_live_url_parts = bt_common_parse_lttng_live_url(path,
115 error_buf, sizeof(error_buf));
116 if (!lttng_live_url_parts.proto) {
117 BT_LOGW("Invalid LTTng live URL format: %s", error_buf);
7cdc2bab
MD
118 goto end;
119 }
7cdc2bab 120
94b828f3
MD
121 viewer_connection->relay_hostname =
122 lttng_live_url_parts.hostname;
123 lttng_live_url_parts.hostname = NULL;
124
125 if (lttng_live_url_parts.port >= 0) {
126 viewer_connection->port = lttng_live_url_parts.port;
127 } else {
7cdc2bab
MD
128 viewer_connection->port = LTTNG_DEFAULT_NETWORK_VIEWER_PORT;
129 }
130
94b828f3
MD
131 viewer_connection->target_hostname =
132 lttng_live_url_parts.target_hostname;
133 lttng_live_url_parts.target_hostname = NULL;
134
135 if (lttng_live_url_parts.session_name) {
136 viewer_connection->session_name =
137 lttng_live_url_parts.session_name;
138 lttng_live_url_parts.session_name = NULL;
7cdc2bab
MD
139 }
140
087bc060 141 BT_LOGD("Connecting to hostname : %s, port : %d, "
7cdc2bab 142 "target hostname : %s, session name : %s, "
94b828f3
MD
143 "proto : %s",
144 viewer_connection->relay_hostname->str,
7cdc2bab 145 viewer_connection->port,
94b828f3
MD
146 viewer_connection->target_hostname == NULL ?
147 "<none>" : viewer_connection->target_hostname->str,
148 viewer_connection->session_name == NULL ?
149 "<none>" : viewer_connection->session_name->str,
150 lttng_live_url_parts.proto->str);
7cdc2bab
MD
151 ret = 0;
152
153end:
94b828f3 154 bt_common_destroy_lttng_live_url_parts(&lttng_live_url_parts);
7cdc2bab
MD
155 return ret;
156}
157
158static int lttng_live_handshake(struct bt_live_viewer_connection *viewer_connection)
159{
160 struct lttng_viewer_cmd cmd;
161 struct lttng_viewer_connect connect;
162 int ret;
163 ssize_t ret_len;
164
165 cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
166 cmd.data_size = htobe64((uint64_t) sizeof(connect));
167 cmd.cmd_version = htobe32(0);
168
169 connect.viewer_session_id = -1ULL; /* will be set on recv */
170 connect.major = htobe32(LTTNG_LIVE_MAJOR);
171 connect.minor = htobe32(LTTNG_LIVE_MINOR);
172 connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
173
4c66436f 174 ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
7cdc2bab 175 if (ret_len < 0) {
087bc060 176 BT_LOGE("Error sending cmd: %s", strerror(errno));
7cdc2bab
MD
177 goto error;
178 }
179 assert(ret_len == sizeof(cmd));
180
4c66436f 181 ret_len = lttng_live_send(viewer_connection, &connect, sizeof(connect));
7cdc2bab 182 if (ret_len < 0) {
087bc060 183 BT_LOGE("Error sending version: %s", strerror(errno));
7cdc2bab
MD
184 goto error;
185 }
186 assert(ret_len == sizeof(connect));
187
4c66436f 188 ret_len = lttng_live_recv(viewer_connection, &connect, sizeof(connect));
7cdc2bab 189 if (ret_len == 0) {
087bc060 190 BT_LOGI("Remote side has closed connection");
7cdc2bab
MD
191 goto error;
192 }
193 if (ret_len < 0) {
087bc060 194 BT_LOGE("Error receiving version: %s", strerror(errno));
7cdc2bab
MD
195 goto error;
196 }
197 assert(ret_len == sizeof(connect));
198
087bc060 199 BT_LOGD("Received viewer session ID : %" PRIu64,
7cdc2bab 200 be64toh(connect.viewer_session_id));
087bc060 201 BT_LOGD("Relayd version : %u.%u", be32toh(connect.major),
7cdc2bab
MD
202 be32toh(connect.minor));
203
204 if (LTTNG_LIVE_MAJOR != be32toh(connect.major)) {
087bc060 205 BT_LOGE("Incompatible lttng-relayd protocol");
7cdc2bab
MD
206 goto error;
207 }
208 /* Use the smallest protocol version implemented. */
209 if (LTTNG_LIVE_MINOR > be32toh(connect.minor)) {
210 viewer_connection->minor = be32toh(connect.minor);
211 } else {
212 viewer_connection->minor = LTTNG_LIVE_MINOR;
213 }
214 viewer_connection->major = LTTNG_LIVE_MAJOR;
215 ret = 0;
216 return ret;
217
218error:
087bc060 219 BT_LOGE("Unable to establish connection");
7cdc2bab
MD
220 return -1;
221}
222
223static int lttng_live_connect_viewer(struct bt_live_viewer_connection *viewer_connection)
224{
225 struct hostent *host;
226 struct sockaddr_in server_addr;
227 int ret;
228
229 if (parse_url(viewer_connection)) {
230 goto error;
231 }
232
94b828f3 233 host = gethostbyname(viewer_connection->relay_hostname->str);
7cdc2bab 234 if (!host) {
087bc060 235 BT_LOGE("Cannot lookup hostname %s",
94b828f3 236 viewer_connection->relay_hostname->str);
7cdc2bab
MD
237 goto error;
238 }
239
240 if ((viewer_connection->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
087bc060 241 BT_LOGE("Socket creation failed: %s", strerror(errno));
7cdc2bab
MD
242 goto error;
243 }
244
245 server_addr.sin_family = AF_INET;
246 server_addr.sin_port = htons(viewer_connection->port);
247 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
248 memset(&(server_addr.sin_zero), 0, 8);
249
250 if (connect(viewer_connection->control_sock, (struct sockaddr *) &server_addr,
251 sizeof(struct sockaddr)) == -1) {
087bc060 252 BT_LOGE("Connection failed: %s", strerror(errno));
7cdc2bab
MD
253 goto error;
254 }
255 if (lttng_live_handshake(viewer_connection)) {
256 goto error;
257 }
258
259 ret = 0;
260
261 return ret;
262
263error:
264 if (viewer_connection->control_sock >= 0) {
265 if (close(viewer_connection->control_sock)) {
087bc060 266 BT_LOGE("Close: %s", strerror(errno));
7cdc2bab
MD
267 }
268 }
269 viewer_connection->control_sock = -1;
270 return -1;
271}
272
273static void lttng_live_disconnect_viewer(struct bt_live_viewer_connection *viewer_connection)
274{
275 if (viewer_connection->control_sock < 0) {
276 return;
277 }
278 if (close(viewer_connection->control_sock)) {
087bc060 279 BT_LOGE("Close: %s", strerror(errno));
7cdc2bab
MD
280 viewer_connection->control_sock = -1;
281 }
282}
283
284static void connection_release(struct bt_object *obj)
285{
286 struct bt_live_viewer_connection *conn =
287 container_of(obj, struct bt_live_viewer_connection, obj);
288
289 bt_live_viewer_connection_destroy(conn);
290}
291
292static
293enum bt_value_status list_update_session(struct bt_value *results,
294 const struct lttng_viewer_session *session,
295 bool *_found)
296{
297 enum bt_value_status ret = BT_VALUE_STATUS_OK;
298 struct bt_value *map = NULL;
299 struct bt_value *hostname = NULL;
300 struct bt_value *session_name = NULL;
301 struct bt_value *btval = NULL;
302 int i, len;
303 bool found = false;
304
305 len = bt_value_array_size(results);
306 if (len < 0) {
307 ret = BT_VALUE_STATUS_ERROR;
308 goto end;
309 }
310 for (i = 0; i < len; i++) {
311 const char *hostname_str = NULL;
312 const char *session_name_str = NULL;
313
314 map = bt_value_array_get(results, (size_t) i);
315 if (!map) {
316 ret = BT_VALUE_STATUS_ERROR;
317 goto end;
318 }
319 hostname = bt_value_map_get(map, "target-hostname");
320 if (!hostname) {
321 ret = BT_VALUE_STATUS_ERROR;
322 goto end;
323 }
324 session_name = bt_value_map_get(map, "session-name");
325 if (!session_name) {
326 ret = BT_VALUE_STATUS_ERROR;
327 goto end;
328 }
329 ret = bt_value_string_get(hostname, &hostname_str);
330 if (ret != BT_VALUE_STATUS_OK) {
331 goto end;
332 }
333 ret = bt_value_string_get(session_name, &session_name_str);
334 if (ret != BT_VALUE_STATUS_OK) {
335 goto end;
336 }
337
338 if (!strcmp(session->hostname, hostname_str)
339 && !strcmp(session->session_name,
340 session_name_str)) {
341 int64_t val;
342 uint32_t streams = be32toh(session->streams);
343 uint32_t clients = be32toh(session->clients);
344
345 found = true;
346
347 btval = bt_value_map_get(map, "stream-count");
348 if (!btval) {
349 ret = BT_VALUE_STATUS_ERROR;
350 goto end;
351 }
352 ret = bt_value_integer_get(btval, &val);
353 if (ret != BT_VALUE_STATUS_OK) {
354 goto end;
355 }
356 /* sum */
357 val += streams;
358 ret = bt_value_integer_set(btval, val);
359 if (ret != BT_VALUE_STATUS_OK) {
360 goto end;
361 }
362 BT_PUT(btval);
363
364 btval = bt_value_map_get(map, "client-count");
365 if (!btval) {
366 ret = BT_VALUE_STATUS_ERROR;
367 goto end;
368 }
369 ret = bt_value_integer_get(btval, &val);
370 if (ret != BT_VALUE_STATUS_OK) {
371 goto end;
372 }
373 /* max */
374 val = max_t(int64_t, clients, val);
375 ret = bt_value_integer_set(btval, val);
376 if (ret != BT_VALUE_STATUS_OK) {
377 goto end;
378 }
379 BT_PUT(btval);
380 }
381
382 BT_PUT(hostname);
383 BT_PUT(session_name);
384 BT_PUT(map);
385
386 if (found) {
387 break;
388 }
389 }
390end:
391 BT_PUT(btval);
392 BT_PUT(hostname);
393 BT_PUT(session_name);
394 BT_PUT(map);
395 *_found = found;
396 return ret;
397}
398
399static
400enum bt_value_status list_append_session(struct bt_value *results,
401 GString *base_url,
402 const struct lttng_viewer_session *session)
403{
404 enum bt_value_status ret = BT_VALUE_STATUS_OK;
405 struct bt_value *map = NULL;
406 GString *url = NULL;
407 bool found = false;
408
409 /*
410 * If the session already exists, add the stream count to it,
411 * and do max of client counts.
412 */
413 ret = list_update_session(results, session, &found);
414 if (ret != BT_VALUE_STATUS_OK || found) {
415 goto end;
416 }
417
418 map = bt_value_map_create();
419 if (!map) {
420 ret = BT_VALUE_STATUS_ERROR;
421 goto end;
422 }
423
424 if (base_url->len < 1) {
425 ret = BT_VALUE_STATUS_ERROR;
426 goto end;
427 }
428 /*
429 * key = "url",
430 * value = <string>,
431 */
432 url = g_string_new(base_url->str);
433 g_string_append(url, "/host/");
434 g_string_append(url, session->hostname);
435 g_string_append_c(url, '/');
436 g_string_append(url, session->session_name);
437
438 ret = bt_value_map_insert_string(map, "url", url->str);
439 if (ret != BT_VALUE_STATUS_OK) {
440 goto end;
441 }
442
443 /*
444 * key = "target-hostname",
445 * value = <string>,
446 */
447 ret = bt_value_map_insert_string(map, "target-hostname",
448 session->hostname);
449 if (ret != BT_VALUE_STATUS_OK) {
450 goto end;
451 }
452
453 /*
454 * key = "session-name",
455 * value = <string>,
456 */
457 ret = bt_value_map_insert_string(map, "session-name",
458 session->session_name);
459 if (ret != BT_VALUE_STATUS_OK) {
460 goto end;
461 }
462
463 /*
464 * key = "timer-us",
465 * value = <integer>,
466 */
467 {
468 uint32_t live_timer = be32toh(session->live_timer);
469
470 ret = bt_value_map_insert_integer(map, "timer-us",
471 live_timer);
472 if (ret != BT_VALUE_STATUS_OK) {
473 goto end;
474 }
475 }
476
477 /*
478 * key = "stream-count",
479 * value = <integer>,
480 */
481 {
482 uint32_t streams = be32toh(session->streams);
483
484 ret = bt_value_map_insert_integer(map, "stream-count",
485 streams);
486 if (ret != BT_VALUE_STATUS_OK) {
487 goto end;
488 }
489 }
490
491
492 /*
493 * key = "client-count",
494 * value = <integer>,
495 */
496 {
497 uint32_t clients = be32toh(session->clients);
498
499 ret = bt_value_map_insert_integer(map, "client-count",
500 clients);
501 if (ret != BT_VALUE_STATUS_OK) {
502 goto end;
503 }
504 }
505
506 ret = bt_value_array_append(results, map);
507end:
508 if (url) {
509 g_string_free(url, TRUE);
510 }
511 BT_PUT(map);
512 return ret;
513}
514
515/*
516 * Data structure returned:
517 *
518 * {
519 * <array> = {
520 * [n] = {
521 * <map> = {
522 * {
523 * key = "url",
524 * value = <string>,
525 * },
526 * {
527 * key = "target-hostname",
528 * value = <string>,
529 * },
530 * {
531 * key = "session-name",
532 * value = <string>,
533 * },
534 * {
535 * key = "timer-us",
536 * value = <integer>,
537 * },
538 * {
539 * key = "stream-count",
540 * value = <integer>,
541 * },
542 * {
543 * key = "client-count",
544 * value = <integer>,
545 * },
546 * },
547 * }
548 * }
549 */
550
551BT_HIDDEN
552struct bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_connection *viewer_connection)
553{
554 struct bt_value *results = NULL;
555 struct lttng_viewer_cmd cmd;
556 struct lttng_viewer_list_sessions list;
557 uint32_t i, sessions_count;
558 ssize_t ret_len;
559
560 if (lttng_live_handshake(viewer_connection)) {
561 goto error;
562 }
563
564 results = bt_value_array_create();
565 if (!results) {
087bc060 566 BT_LOGE("Error creating array");
7cdc2bab
MD
567 goto error;
568 }
569
570 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
571 cmd.data_size = htobe64((uint64_t) 0);
572 cmd.cmd_version = htobe32(0);
573
4c66436f 574 ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
7cdc2bab 575 if (ret_len < 0) {
087bc060 576 BT_LOGE("Error sending cmd: %s", strerror(errno));
7cdc2bab
MD
577 goto error;
578 }
579 assert(ret_len == sizeof(cmd));
580
4c66436f 581 ret_len = lttng_live_recv(viewer_connection, &list, sizeof(list));
7cdc2bab 582 if (ret_len == 0) {
087bc060 583 BT_LOGI("Remote side has closed connection");
7cdc2bab
MD
584 goto error;
585 }
586 if (ret_len < 0) {
087bc060 587 BT_LOGE("Error receiving session list: %s", strerror(errno));
7cdc2bab
MD
588 goto error;
589 }
590 assert(ret_len == sizeof(list));
591
592 sessions_count = be32toh(list.sessions_count);
593 for (i = 0; i < sessions_count; i++) {
594 struct lttng_viewer_session lsession;
595
4c66436f 596 ret_len = lttng_live_recv(viewer_connection,
7cdc2bab
MD
597 &lsession, sizeof(lsession));
598 if (ret_len == 0) {
087bc060 599 BT_LOGI("Remote side has closed connection");
7cdc2bab
MD
600 goto error;
601 }
602 if (ret_len < 0) {
087bc060 603 BT_LOGE("Error receiving session: %s", strerror(errno));
7cdc2bab
MD
604 goto error;
605 }
606 assert(ret_len == sizeof(lsession));
607 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
608 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
609 if (list_append_session(results,
610 viewer_connection->url, &lsession)
611 != BT_VALUE_STATUS_OK) {
612 goto error;
613 }
614 }
615 goto end;
616error:
617 BT_PUT(results);
618end:
619 return results;
620}
621
622static
623int lttng_live_query_session_ids(struct lttng_live_component *lttng_live)
624{
625 struct lttng_viewer_cmd cmd;
626 struct lttng_viewer_list_sessions list;
627 struct lttng_viewer_session lsession;
628 uint32_t i, sessions_count;
629 ssize_t ret_len;
630 uint64_t session_id;
631 struct bt_live_viewer_connection *viewer_connection =
632 lttng_live->viewer_connection;
633
634 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
635 cmd.data_size = htobe64((uint64_t) 0);
636 cmd.cmd_version = htobe32(0);
637
4c66436f 638 ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
7cdc2bab 639 if (ret_len < 0) {
087bc060 640 BT_LOGE("Error sending cmd: %s", strerror(errno));
7cdc2bab
MD
641 goto error;
642 }
643 assert(ret_len == sizeof(cmd));
644
4c66436f 645 ret_len = lttng_live_recv(viewer_connection, &list, sizeof(list));
7cdc2bab 646 if (ret_len == 0) {
087bc060 647 BT_LOGI("Remote side has closed connection");
7cdc2bab
MD
648 goto error;
649 }
650 if (ret_len < 0) {
087bc060 651 BT_LOGE("Error receiving session list: %s", strerror(errno));
7cdc2bab
MD
652 goto error;
653 }
654 assert(ret_len == sizeof(list));
655
656 sessions_count = be32toh(list.sessions_count);
657 for (i = 0; i < sessions_count; i++) {
4c66436f 658 ret_len = lttng_live_recv(viewer_connection,
7cdc2bab
MD
659 &lsession, sizeof(lsession));
660 if (ret_len == 0) {
087bc060 661 BT_LOGI("Remote side has closed connection");
7cdc2bab
MD
662 goto error;
663 }
664 if (ret_len < 0) {
087bc060 665 BT_LOGE("Error receiving session: %s", strerror(errno));
7cdc2bab
MD
666 goto error;
667 }
668 assert(ret_len == sizeof(lsession));
669 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
670 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
671 session_id = be64toh(lsession.id);
672
673 if ((strncmp(lsession.session_name,
94b828f3 674 viewer_connection->session_name->str,
7cdc2bab 675 MAXNAMLEN) == 0) && (strncmp(lsession.hostname,
94b828f3 676 viewer_connection->target_hostname->str,
7cdc2bab
MD
677 MAXNAMLEN) == 0)) {
678 if (lttng_live_add_session(lttng_live, session_id)) {
679 goto error;
680 }
681 }
682 }
683
684 return 0;
685
686error:
087bc060 687 BT_LOGE("Unable to query session ids");
7cdc2bab
MD
688 return -1;
689}
690
691BT_HIDDEN
692int lttng_live_create_viewer_session(struct lttng_live_component *lttng_live)
693{
694 struct lttng_viewer_cmd cmd;
695 struct lttng_viewer_create_session_response resp;
696 ssize_t ret_len;
697 struct bt_live_viewer_connection *viewer_connection =
698 lttng_live->viewer_connection;
699
700 cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
701 cmd.data_size = htobe64((uint64_t) 0);
702 cmd.cmd_version = htobe32(0);
703
4c66436f 704 ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
7cdc2bab 705 if (ret_len < 0) {
087bc060 706 BT_LOGE("Error sending cmd: %s", strerror(errno));
7cdc2bab
MD
707 goto error;
708 }
709 assert(ret_len == sizeof(cmd));
710
4c66436f 711 ret_len = lttng_live_recv(viewer_connection, &resp, sizeof(resp));
7cdc2bab 712 if (ret_len == 0) {
087bc060 713 BT_LOGI("Remote side has closed connection");
7cdc2bab
MD
714 goto error;
715 }
716 if (ret_len < 0) {
087bc060 717 BT_LOGE("Error receiving create session reply: %s", strerror(errno));
7cdc2bab
MD
718 goto error;
719 }
720 assert(ret_len == sizeof(resp));
721
722 if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
087bc060 723 BT_LOGE("Error creating viewer session");
7cdc2bab
MD
724 goto error;
725 }
726 if (lttng_live_query_session_ids(lttng_live)) {
727 goto error;
728 }
729
730 return 0;
731
732error:
733 return -1;
734}
735
736static
737int receive_streams(struct lttng_live_session *session,
738 uint32_t stream_count)
739{
740 ssize_t ret_len;
741 uint32_t i;
742 struct lttng_live_component *lttng_live = session->lttng_live;
743 struct bt_live_viewer_connection *viewer_connection =
744 lttng_live->viewer_connection;
745
087bc060 746 BT_LOGD("Getting %" PRIu32 " new streams:", stream_count);
7cdc2bab
MD
747 for (i = 0; i < stream_count; i++) {
748 struct lttng_viewer_stream stream;
749 struct lttng_live_stream_iterator *live_stream;
750 uint64_t stream_id;
751 uint64_t ctf_trace_id;
752
4c66436f 753 ret_len = lttng_live_recv(viewer_connection, &stream, sizeof(stream));
7cdc2bab 754 if (ret_len == 0) {
087bc060 755 BT_LOGI("Remote side has closed connection");
7cdc2bab
MD
756 goto error;
757 }
758 if (ret_len < 0) {
087bc060 759 BT_LOGE("Error receiving stream");
7cdc2bab
MD
760 goto error;
761 }
762 assert(ret_len == sizeof(stream));
763 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
764 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
765 stream_id = be64toh(stream.id);
766 ctf_trace_id = be64toh(stream.ctf_trace_id);
767
768 if (stream.metadata_flag) {
087bc060 769 BT_LOGD(" metadata stream %" PRIu64 " : %s/%s",
7cdc2bab
MD
770 stream_id, stream.path_name,
771 stream.channel_name);
772 if (lttng_live_metadata_create_stream(session,
773 ctf_trace_id, stream_id)) {
087bc060 774 BT_LOGE("Error creating metadata stream");
7cdc2bab
MD
775
776 goto error;
777 }
778 session->lazy_stream_notif_init = true;
779 } else {
087bc060 780 BT_LOGD(" stream %" PRIu64 " : %s/%s",
7cdc2bab
MD
781 stream_id, stream.path_name,
782 stream.channel_name);
783 live_stream = lttng_live_stream_iterator_create(session,
784 ctf_trace_id, stream_id);
785 if (!live_stream) {
087bc060 786 BT_LOGE("Error creating streamn");
7cdc2bab
MD
787 goto error;
788 }
789 }
790 }
791 return 0;
792
793error:
794 return -1;
795}
796
797BT_HIDDEN
798int lttng_live_attach_session(struct lttng_live_session *session)
799{
800 struct lttng_viewer_cmd cmd;
801 struct lttng_viewer_attach_session_request rq;
802 struct lttng_viewer_attach_session_response rp;
803 ssize_t ret_len;
804 struct lttng_live_component *lttng_live = session->lttng_live;
805 struct bt_live_viewer_connection *viewer_connection =
806 lttng_live->viewer_connection;
807 uint64_t session_id = session->id;
808 uint32_t streams_count;
809
810 if (session->attached) {
811 return 0;
812 }
813
814 cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
815 cmd.data_size = htobe64((uint64_t) sizeof(rq));
816 cmd.cmd_version = htobe32(0);
817
818 memset(&rq, 0, sizeof(rq));
819 rq.session_id = htobe64(session_id);
820 // TODO: add cmd line parameter to select seek beginning
821 // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
822 rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
823
4c66436f 824 ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
7cdc2bab 825 if (ret_len < 0) {
087bc060 826 BT_LOGE("Error sending cmd: %s", strerror(errno));
7cdc2bab
MD
827 goto error;
828 }
829 assert(ret_len == sizeof(cmd));
830
4c66436f 831 ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
7cdc2bab 832 if (ret_len < 0) {
087bc060 833 BT_LOGE("Error sending attach request: %s", strerror(errno));
7cdc2bab
MD
834 goto error;
835 }
836 assert(ret_len == sizeof(rq));
837
4c66436f 838 ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
7cdc2bab 839 if (ret_len == 0) {
087bc060 840 BT_LOGI("Remote side has closed connection");
7cdc2bab
MD
841 goto error;
842 }
843 if (ret_len < 0) {
087bc060 844 BT_LOGE("Error receiving attach response: %s", strerror(errno));
7cdc2bab
MD
845 goto error;
846 }
847 assert(ret_len == sizeof(rp));
848
849 streams_count = be32toh(rp.streams_count);
850 switch(be32toh(rp.status)) {
851 case LTTNG_VIEWER_ATTACH_OK:
852 break;
853 case LTTNG_VIEWER_ATTACH_UNK:
087bc060 854 BT_LOGW("Session id %" PRIu64 " is unknown", session_id);
7cdc2bab
MD
855 goto error;
856 case LTTNG_VIEWER_ATTACH_ALREADY:
087bc060 857 BT_LOGW("There is already a viewer attached to this session");
7cdc2bab
MD
858 goto error;
859 case LTTNG_VIEWER_ATTACH_NOT_LIVE:
087bc060 860 BT_LOGW("Not a live session");
7cdc2bab
MD
861 goto error;
862 case LTTNG_VIEWER_ATTACH_SEEK_ERR:
087bc060 863 BT_LOGE("Wrong seek parameter");
7cdc2bab
MD
864 goto error;
865 default:
087bc060 866 BT_LOGE("Unknown attach return code %u", be32toh(rp.status));
7cdc2bab
MD
867 goto error;
868 }
869
870 /* We receive the initial list of streams. */
871 if (receive_streams(session, streams_count)) {
872 goto error;
873 }
874
875 session->attached = true;
876 session->new_streams_needed = false;
877
878 return 0;
879
880error:
881 return -1;
882}
883
884BT_HIDDEN
885int lttng_live_detach_session(struct lttng_live_session *session)
886{
887 struct lttng_viewer_cmd cmd;
888 struct lttng_viewer_detach_session_request rq;
889 struct lttng_viewer_detach_session_response rp;
890 ssize_t ret_len;
891 struct lttng_live_component *lttng_live = session->lttng_live;
892 struct bt_live_viewer_connection *viewer_connection =
893 lttng_live->viewer_connection;
894 uint64_t session_id = session->id;
895
896 if (!session->attached) {
897 return 0;
898 }
899
900 cmd.cmd = htobe32(LTTNG_VIEWER_DETACH_SESSION);
901 cmd.data_size = htobe64((uint64_t) sizeof(rq));
902 cmd.cmd_version = htobe32(0);
903
904 memset(&rq, 0, sizeof(rq));
905 rq.session_id = htobe64(session_id);
906
4c66436f 907 ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
7cdc2bab 908 if (ret_len < 0) {
087bc060 909 BT_LOGE("Error sending cmd: %s", strerror(errno));
7cdc2bab
MD
910 goto error;
911 }
912 assert(ret_len == sizeof(cmd));
913
4c66436f 914 ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
7cdc2bab 915 if (ret_len < 0) {
087bc060 916 BT_LOGE("Error sending detach request: %s", strerror(errno));
7cdc2bab
MD
917 goto error;
918 }
919 assert(ret_len == sizeof(rq));
920
4c66436f 921 ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
7cdc2bab 922 if (ret_len == 0) {
087bc060 923 BT_LOGI("Remote side has closed connection");
7cdc2bab
MD
924 goto error;
925 }
926 if (ret_len < 0) {
087bc060 927 BT_LOGE("Error receiving detach response: %s", strerror(errno));
7cdc2bab
MD
928 goto error;
929 }
930 assert(ret_len == sizeof(rp));
931
932 switch(be32toh(rp.status)) {
933 case LTTNG_VIEWER_DETACH_SESSION_OK:
934 break;
935 case LTTNG_VIEWER_DETACH_SESSION_UNK:
087bc060 936 BT_LOGW("Session id %" PRIu64 " is unknown", session_id);
7cdc2bab
MD
937 goto error;
938 case LTTNG_VIEWER_DETACH_SESSION_ERR:
087bc060 939 BT_LOGW("Error detaching session id %" PRIu64 "", session_id);
7cdc2bab
MD
940 goto error;
941 default:
087bc060 942 BT_LOGE("Unknown detach return code %u", be32toh(rp.status));
7cdc2bab
MD
943 goto error;
944 }
945
946 session->attached = false;
947
948 return 0;
949
950error:
951 return -1;
952}
953
954BT_HIDDEN
955ssize_t lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace,
956 FILE *fp)
957{
958 uint64_t len = 0;
959 int ret;
960 struct lttng_viewer_cmd cmd;
961 struct lttng_viewer_get_metadata rq;
962 struct lttng_viewer_metadata_packet rp;
963 char *data = NULL;
964 ssize_t ret_len;
965 struct lttng_live_session *session = trace->session;
966 struct lttng_live_component *lttng_live = session->lttng_live;
967 struct lttng_live_metadata *metadata = trace->metadata;
968 struct bt_live_viewer_connection *viewer_connection =
969 lttng_live->viewer_connection;
970
971 rq.stream_id = htobe64(metadata->stream_id);
972 cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
973 cmd.data_size = htobe64((uint64_t) sizeof(rq));
974 cmd.cmd_version = htobe32(0);
975
4c66436f 976 ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
7cdc2bab 977 if (ret_len < 0) {
087bc060 978 BT_LOGE("Error sending cmd: %s", strerror(errno));
7cdc2bab
MD
979 goto error;
980 }
981 assert(ret_len == sizeof(cmd));
982
4c66436f 983 ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
7cdc2bab 984 if (ret_len < 0) {
087bc060 985 BT_LOGE("Error sending get_metadata request: %s", strerror(errno));
7cdc2bab
MD
986 goto error;
987 }
988 assert(ret_len == sizeof(rq));
989
4c66436f 990 ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
7cdc2bab 991 if (ret_len == 0) {
087bc060 992 BT_LOGI("Remote side has closed connection");
7cdc2bab
MD
993 goto error;
994 }
995 if (ret_len < 0) {
087bc060 996 BT_LOGE("Error receiving get_metadata response: %s", strerror(errno));
7cdc2bab
MD
997 goto error;
998 }
999 assert(ret_len == sizeof(rp));
1000
1001 switch (be32toh(rp.status)) {
1002 case LTTNG_VIEWER_METADATA_OK:
087bc060 1003 BT_LOGD("get_metadata : OK");
7cdc2bab
MD
1004 break;
1005 case LTTNG_VIEWER_NO_NEW_METADATA:
087bc060 1006 BT_LOGD("get_metadata : NO NEW");
7cdc2bab
MD
1007 ret = 0;
1008 goto end;
1009 case LTTNG_VIEWER_METADATA_ERR:
087bc060 1010 BT_LOGD("get_metadata : ERR");
7cdc2bab
MD
1011 goto error;
1012 default:
087bc060 1013 BT_LOGD("get_metadata : UNKNOWN");
7cdc2bab
MD
1014 goto error;
1015 }
1016
1017 len = be64toh(rp.len);
087bc060 1018 BT_LOGD("Writing %" PRIu64" bytes to metadata", len);
7cdc2bab
MD
1019 if (len <= 0) {
1020 goto error;
1021 }
1022
1023 data = zmalloc(len);
1024 if (!data) {
087bc060 1025 BT_LOGE("relay data zmalloc: %s", strerror(errno));
7cdc2bab
MD
1026 goto error;
1027 }
4c66436f 1028 ret_len = lttng_live_recv(viewer_connection, data, len);
7cdc2bab 1029 if (ret_len == 0) {
087bc060 1030 BT_LOGI("Remote side has closed connection");
7cdc2bab
MD
1031 goto error_free_data;
1032 }
1033 if (ret_len < 0) {
087bc060 1034 BT_LOGE("Error receiving trace packet: %s", strerror(errno));
7cdc2bab
MD
1035 goto error_free_data;
1036 }
1037 assert(ret_len == len);
1038
1039 do {
1040 ret_len = fwrite(data, 1, len, fp);
1041 } while (ret_len < 0 && errno == EINTR);
1042 if (ret_len < 0) {
087bc060 1043 BT_LOGE("Writing in the metadata fp");
7cdc2bab
MD
1044 goto error_free_data;
1045 }
1046 assert(ret_len == len);
1047 free(data);
1048 ret = len;
1049end:
1050 return ret;
1051
1052error_free_data:
1053 free(data);
1054error:
1055 return -1;
1056}
1057
1058/*
1059 * Assign the fields from a lttng_viewer_index to a packet_index.
1060 */
1061static
1062void lttng_index_to_packet_index(struct lttng_viewer_index *lindex,
1063 struct packet_index *pindex)
1064{
1065 assert(lindex);
1066 assert(pindex);
1067
1068 pindex->offset = be64toh(lindex->offset);
1069 pindex->packet_size = be64toh(lindex->packet_size);
1070 pindex->content_size = be64toh(lindex->content_size);
1071 pindex->ts_cycles.timestamp_begin = be64toh(lindex->timestamp_begin);
1072 pindex->ts_cycles.timestamp_end = be64toh(lindex->timestamp_end);
1073 pindex->events_discarded = be64toh(lindex->events_discarded);
1074}
1075
1076BT_HIDDEN
1077enum bt_ctf_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_live_component *lttng_live,
1078 struct lttng_live_stream_iterator *stream,
1079 struct packet_index *index)
1080{
1081 struct lttng_viewer_cmd cmd;
1082 struct lttng_viewer_get_next_index rq;
1083 ssize_t ret_len;
1084 struct lttng_viewer_index rp;
1085 uint32_t flags, status;
1086 enum bt_ctf_lttng_live_iterator_status retstatus =
1087 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
1088 struct bt_live_viewer_connection *viewer_connection =
1089 lttng_live->viewer_connection;
1090 struct lttng_live_trace *trace = stream->trace;
1091
1092 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
1093 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1094 cmd.cmd_version = htobe32(0);
1095
1096 memset(&rq, 0, sizeof(rq));
1097 rq.stream_id = htobe64(stream->viewer_stream_id);
1098
4c66436f 1099 ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
7cdc2bab 1100 if (ret_len < 0) {
087bc060 1101 BT_LOGE("Error sending cmd: %s", strerror(errno));
7cdc2bab
MD
1102 goto error;
1103 }
1104 assert(ret_len == sizeof(cmd));
1105
4c66436f 1106 ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
7cdc2bab 1107 if (ret_len < 0) {
087bc060 1108 BT_LOGE("Error sending get_next_index request: %s", strerror(errno));
7cdc2bab
MD
1109 goto error;
1110 }
1111 assert(ret_len == sizeof(rq));
1112
4c66436f 1113 ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
7cdc2bab 1114 if (ret_len == 0) {
087bc060 1115 BT_LOGI("Remote side has closed connection");
7cdc2bab
MD
1116 goto error;
1117 }
1118 if (ret_len < 0) {
087bc060 1119 BT_LOGE("Error receiving get_next_index response: %s", strerror(errno));
7cdc2bab
MD
1120 goto error;
1121 }
1122 assert(ret_len == sizeof(rp));
1123
1124 flags = be32toh(rp.flags);
1125 status = be32toh(rp.status);
1126
1127 switch (status) {
1128 case LTTNG_VIEWER_INDEX_INACTIVE:
1129 {
1130 uint64_t ctf_stream_class_id;
1131
087bc060 1132 BT_LOGD("get_next_index: inactive");
7cdc2bab
MD
1133 memset(index, 0, sizeof(struct packet_index));
1134 index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
1135 stream->current_inactivity_timestamp = index->ts_cycles.timestamp_end;
1136 ctf_stream_class_id = be64toh(rp.stream_id);
1137 if (stream->ctf_stream_class_id != -1ULL) {
1138 assert(stream->ctf_stream_class_id ==
1139 ctf_stream_class_id);
1140 } else {
1141 stream->ctf_stream_class_id = ctf_stream_class_id;
1142 }
1143 stream->state = LTTNG_LIVE_STREAM_QUIESCENT;
1144 break;
1145 }
1146 case LTTNG_VIEWER_INDEX_OK:
1147 {
1148 uint64_t ctf_stream_class_id;
1149
087bc060 1150 BT_LOGD("get_next_index: OK");
7cdc2bab
MD
1151 lttng_index_to_packet_index(&rp, index);
1152 ctf_stream_class_id = be64toh(rp.stream_id);
1153 if (stream->ctf_stream_class_id != -1ULL) {
1154 assert(stream->ctf_stream_class_id ==
1155 ctf_stream_class_id);
1156 } else {
1157 stream->ctf_stream_class_id = ctf_stream_class_id;
1158 }
1159
1160 stream->state = LTTNG_LIVE_STREAM_ACTIVE_DATA;
1161 stream->current_packet_end_timestamp =
1162 index->ts_cycles.timestamp_end;
1163
1164 if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
087bc060 1165 BT_LOGD("get_next_index: new metadata needed");
7cdc2bab
MD
1166 trace->new_metadata_needed = true;
1167 }
1168 if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
087bc060 1169 BT_LOGD("get_next_index: new streams needed");
7cdc2bab
MD
1170 lttng_live_need_new_streams(lttng_live);
1171 }
1172 break;
1173 }
1174 case LTTNG_VIEWER_INDEX_RETRY:
087bc060 1175 BT_LOGD("get_next_index: retry");
7cdc2bab
MD
1176 memset(index, 0, sizeof(struct packet_index));
1177 retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
1178 stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA;
1179 goto end;
1180 case LTTNG_VIEWER_INDEX_HUP:
087bc060 1181 BT_LOGD("get_next_index: stream hung up");
7cdc2bab
MD
1182 memset(index, 0, sizeof(struct packet_index));
1183 index->offset = EOF;
1184 retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
1185 stream->state = LTTNG_LIVE_STREAM_EOF;
1186 break;
1187 case LTTNG_VIEWER_INDEX_ERR:
087bc060 1188 BT_LOGE("get_next_index: error");
7cdc2bab
MD
1189 memset(index, 0, sizeof(struct packet_index));
1190 stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA;
1191 goto error;
1192 default:
087bc060 1193 BT_LOGE("get_next_index: unknown value");
7cdc2bab
MD
1194 memset(index, 0, sizeof(struct packet_index));
1195 stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA;
1196 goto error;
1197 }
1198end:
1199 return retstatus;
1200
1201error:
4c66436f
MD
1202 if (bt_graph_is_canceled(lttng_live->graph)) {
1203 retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
1204 } else {
1205 retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1206 }
7cdc2bab
MD
1207 return retstatus;
1208}
1209
1210BT_HIDDEN
1211enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_live_component *lttng_live,
1212 struct lttng_live_stream_iterator *stream, uint8_t *buf, uint64_t offset,
1213 uint64_t req_len, uint64_t *recv_len)
1214{
1215 enum bt_ctf_notif_iter_medium_status retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_OK;
1216 struct lttng_viewer_cmd cmd;
1217 struct lttng_viewer_get_packet rq;
1218 struct lttng_viewer_trace_packet rp;
1219 ssize_t ret_len;
1220 uint32_t flags, status;
1221 struct bt_live_viewer_connection *viewer_connection =
1222 lttng_live->viewer_connection;
1223 struct lttng_live_trace *trace = stream->trace;
1224
087bc060 1225 BT_LOGD("lttng_live_get_stream_bytes: offset=%" PRIu64 ", req_len=%" PRIu64,
7cdc2bab
MD
1226 offset, req_len);
1227 cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
1228 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1229 cmd.cmd_version = htobe32(0);
1230
1231 memset(&rq, 0, sizeof(rq));
1232 rq.stream_id = htobe64(stream->viewer_stream_id);
1233 rq.offset = htobe64(offset);
1234 rq.len = htobe32(req_len);
1235
4c66436f 1236 ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
7cdc2bab 1237 if (ret_len < 0) {
087bc060 1238 BT_LOGE("Error sending cmd: %s", strerror(errno));
7cdc2bab
MD
1239 goto error;
1240 }
1241 assert(ret_len == sizeof(cmd));
1242
4c66436f 1243 ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
7cdc2bab 1244 if (ret_len < 0) {
087bc060 1245 BT_LOGE("Error sending get_data request: %s", strerror(errno));
7cdc2bab
MD
1246 goto error;
1247 }
1248 assert(ret_len == sizeof(rq));
1249
4c66436f 1250 ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
7cdc2bab 1251 if (ret_len == 0) {
087bc060 1252 BT_LOGI("Remote side has closed connection");
7cdc2bab
MD
1253 goto error;
1254 }
1255 if (ret_len < 0) {
087bc060 1256 BT_LOGE("Error receiving get_data response: %s", strerror(errno));
7cdc2bab
MD
1257 goto error;
1258 }
1259 if (ret_len != sizeof(rp)) {
087bc060
MD
1260 BT_LOGE("get_data_packet: expected %zu"
1261 ", received %zd", sizeof(rp),
7cdc2bab
MD
1262 ret_len);
1263 goto error;
1264 }
1265
1266 flags = be32toh(rp.flags);
1267 status = be32toh(rp.status);
1268
1269 switch (status) {
1270 case LTTNG_VIEWER_GET_PACKET_OK:
1271 req_len = be32toh(rp.len);
087bc060 1272 BT_LOGD("get_data_packet: Ok, packet size : %" PRIu64 "", req_len);
7cdc2bab
MD
1273 break;
1274 case LTTNG_VIEWER_GET_PACKET_RETRY:
1275 /* Unimplemented by relay daemon */
087bc060 1276 BT_LOGD("get_data_packet: retry");
7cdc2bab
MD
1277 retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN;
1278 goto end;
1279 case LTTNG_VIEWER_GET_PACKET_ERR:
1280 if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
087bc060 1281 BT_LOGD("get_data_packet: new metadata needed, try again later");
7cdc2bab
MD
1282 trace->new_metadata_needed = true;
1283 }
1284 if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
087bc060 1285 BT_LOGD("get_data_packet: new streams needed, try again later");
7cdc2bab
MD
1286 lttng_live_need_new_streams(lttng_live);
1287 }
1288 if (flags & (LTTNG_VIEWER_FLAG_NEW_METADATA
1289 | LTTNG_VIEWER_FLAG_NEW_STREAM)) {
1290 retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN;
1291 goto end;
1292 }
087bc060 1293 BT_LOGE("get_data_packet: error");
7cdc2bab
MD
1294 goto error;
1295 case LTTNG_VIEWER_GET_PACKET_EOF:
1296 retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_EOF;
1297 goto end;
1298 default:
087bc060 1299 BT_LOGE("get_data_packet: unknown");
7cdc2bab
MD
1300 goto error;
1301 }
1302
1303 if (req_len == 0) {
1304 goto error;
1305 }
1306
4c66436f 1307 ret_len = lttng_live_recv(viewer_connection, buf, req_len);
7cdc2bab 1308 if (ret_len == 0) {
087bc060 1309 BT_LOGI("Remote side has closed connection");
7cdc2bab
MD
1310 goto error;
1311 }
1312 if (ret_len < 0) {
087bc060 1313 BT_LOGE("Error receiving trace packet: %s", strerror(errno));
7cdc2bab
MD
1314 goto error;
1315 }
1316 assert(ret_len == req_len);
1317 *recv_len = ret_len;
1318end:
1319 return retstatus;
1320
1321error:
4c66436f
MD
1322 if (bt_graph_is_canceled(lttng_live->graph)) {
1323 retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN;
1324 } else {
1325 retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_ERROR;
1326 }
7cdc2bab
MD
1327 return retstatus;
1328}
1329
1330/*
1331 * Request new streams for a session.
1332 */
1333BT_HIDDEN
1334enum bt_ctf_lttng_live_iterator_status lttng_live_get_new_streams(
1335 struct lttng_live_session *session)
1336{
1337 enum bt_ctf_lttng_live_iterator_status status =
1338 BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
1339 struct lttng_viewer_cmd cmd;
1340 struct lttng_viewer_new_streams_request rq;
1341 struct lttng_viewer_new_streams_response rp;
1342 ssize_t ret_len;
1343 struct lttng_live_component *lttng_live = session->lttng_live;
1344 struct bt_live_viewer_connection *viewer_connection =
1345 lttng_live->viewer_connection;
1346 uint32_t streams_count;
1347
1348 if (!session->new_streams_needed) {
1349 return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK;
1350 }
1351
1352 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
1353 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1354 cmd.cmd_version = htobe32(0);
1355
1356 memset(&rq, 0, sizeof(rq));
1357 rq.session_id = htobe64(session->id);
1358
4c66436f 1359 ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
7cdc2bab 1360 if (ret_len < 0) {
087bc060 1361 BT_LOGE("Error sending cmd: %s", strerror(errno));
7cdc2bab
MD
1362 goto error;
1363 }
1364 assert(ret_len == sizeof(cmd));
1365
4c66436f 1366 ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
7cdc2bab 1367 if (ret_len < 0) {
087bc060 1368 BT_LOGE("Error sending get_new_streams request: %s", strerror(errno));
7cdc2bab
MD
1369 goto error;
1370 }
1371 assert(ret_len == sizeof(rq));
1372
4c66436f 1373 ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
7cdc2bab 1374 if (ret_len == 0) {
087bc060 1375 BT_LOGI("Remote side has closed connection");
7cdc2bab
MD
1376 goto error;
1377 }
1378 if (ret_len < 0) {
087bc060 1379 BT_LOGE("Error receiving get_new_streams response");
7cdc2bab
MD
1380 goto error;
1381 }
1382 assert(ret_len == sizeof(rp));
1383
1384 streams_count = be32toh(rp.streams_count);
1385
1386 switch(be32toh(rp.status)) {
1387 case LTTNG_VIEWER_NEW_STREAMS_OK:
1388 session->new_streams_needed = false;
1389 break;
1390 case LTTNG_VIEWER_NEW_STREAMS_NO_NEW:
1391 session->new_streams_needed = false;
1392 goto end;
1393 case LTTNG_VIEWER_NEW_STREAMS_HUP:
1394 session->new_streams_needed = false;
1395 session->closed = true;
1396 status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END;
1397 goto end;
1398 case LTTNG_VIEWER_NEW_STREAMS_ERR:
087bc060 1399 BT_LOGE("get_new_streams error");
7cdc2bab
MD
1400 goto error;
1401 default:
087bc060 1402 BT_LOGE("Unknown return code %u", be32toh(rp.status));
7cdc2bab
MD
1403 goto error;
1404 }
1405
1406 if (receive_streams(session, streams_count)) {
1407 goto error;
1408 }
1409end:
1410 return status;
1411
1412error:
4c66436f 1413 if (bt_graph_is_canceled(lttng_live->graph)) {
3cdf4234 1414 status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
4c66436f 1415 } else {
3cdf4234 1416 status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
4c66436f 1417 }
7cdc2bab
MD
1418 return status;
1419}
1420
1421BT_HIDDEN
1422struct bt_live_viewer_connection *
4c66436f
MD
1423 bt_live_viewer_connection_create(const char *url,
1424 struct lttng_live_component *lttng_live)
7cdc2bab
MD
1425{
1426 struct bt_live_viewer_connection *viewer_connection;
1427
1428 viewer_connection = g_new0(struct bt_live_viewer_connection, 1);
1429
1430 bt_object_init(&viewer_connection->obj, connection_release);
1431 viewer_connection->control_sock = -1;
1432 viewer_connection->port = -1;
4c66436f 1433 viewer_connection->lttng_live = lttng_live;
7cdc2bab
MD
1434 viewer_connection->url = g_string_new(url);
1435 if (!viewer_connection->url) {
1436 goto error;
1437 }
1438
087bc060 1439 BT_LOGD("Establishing connection to url \"%s\"...", url);
7cdc2bab
MD
1440 if (lttng_live_connect_viewer(viewer_connection)) {
1441 goto error_report;
1442 }
087bc060 1443 BT_LOGD("Connection to url \"%s\" is established", url);
7cdc2bab
MD
1444 return viewer_connection;
1445
1446error_report:
087bc060 1447 BT_LOGW("Failure to establish connection to url \"%s\"", url);
7cdc2bab
MD
1448error:
1449 g_free(viewer_connection);
1450 return NULL;
1451}
1452
1453BT_HIDDEN
1454void bt_live_viewer_connection_destroy(struct bt_live_viewer_connection *viewer_connection)
1455{
087bc060 1456 BT_LOGD("Closing connection to url \"%s\"", viewer_connection->url->str);
7cdc2bab
MD
1457 lttng_live_disconnect_viewer(viewer_connection);
1458 g_string_free(viewer_connection->url, TRUE);
94b828f3
MD
1459 if (viewer_connection->relay_hostname) {
1460 g_string_free(viewer_connection->relay_hostname, TRUE);
1461 }
1462 if (viewer_connection->target_hostname) {
1463 g_string_free(viewer_connection->target_hostname, TRUE);
1464 }
1465 if (viewer_connection->session_name) {
1466 g_string_free(viewer_connection->session_name, TRUE);
1467 }
7cdc2bab
MD
1468 g_free(viewer_connection);
1469}
This page took 0.084869 seconds and 4 git commands to generate.