Fix: usage of `bt_value_array_get_length()`
[babeltrace.git] / src / plugins / ctf / lttng-live / viewer-connection.c
CommitLineData
7cdc2bab 1/*
bb18709b 2 * Copyright 2019 - Francis Deslauriers <francis.deslauriers@efficios.com>
7cdc2bab
MD
3 * Copyright 2016 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 * SOFTWARE.
22 */
23
5278d5ba 24#define BT_COMP_LOG_SELF_COMP (viewer_connection->self_comp)
1c0ceb65 25#define BT_LOG_OUTPUT_LEVEL (viewer_connection->log_level)
b03487ab 26#define BT_LOG_TAG "PLUGIN/SRC.CTF.LTTNG-LIVE/VIEWER"
3fa1b6a3 27#include "logging/comp-logging.h"
020bc26f 28
7cdc2bab
MD
29#include <stdio.h>
30#include <stdint.h>
31#include <stdlib.h>
32#include <stdbool.h>
33#include <unistd.h>
34#include <glib.h>
35#include <inttypes.h>
7cdc2bab 36#include <sys/types.h>
7cdc2bab 37#include <fcntl.h>
7cdc2bab 38
57952005
MJ
39#include "compat/socket.h"
40#include "compat/endian.h"
41#include "compat/compiler.h"
42#include "common/common.h"
71c5da58 43#include <babeltrace2/babeltrace.h>
7cdc2bab 44
bb18709b 45#include "lttng-live.h"
7cdc2bab
MD
46#include "viewer-connection.h"
47#include "lttng-viewer-abi.h"
48#include "data-stream.h"
49#include "metadata.h"
50
bb18709b
FD
51static
52ssize_t lttng_live_recv(struct live_viewer_connection *viewer_connection,
4c66436f 53 void *buf, size_t len)
7cdc2bab
MD
54{
55 ssize_t ret;
56 size_t copied = 0, to_copy = len;
bb18709b
FD
57 struct lttng_live_msg_iter *lttng_live_msg_iter =
58 viewer_connection->lttng_live_msg_iter;
a2147eb2 59 BT_SOCKET sock = viewer_connection->control_sock;
7cdc2bab
MD
60
61 do {
a2147eb2 62 ret = bt_socket_recv(sock, buf + copied, to_copy, 0);
7cdc2bab 63 if (ret > 0) {
8b45963b 64 BT_ASSERT(ret <= to_copy);
7cdc2bab
MD
65 copied += ret;
66 to_copy -= ret;
67 }
a2147eb2 68 if (ret == BT_SOCKET_ERROR && bt_socket_interrupted()) {
bb18709b 69 if (!viewer_connection->in_query &&
d73bb381 70 lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
4c66436f
MD
71 break;
72 } else {
73 continue;
74 }
75 }
76 } while (ret > 0 && to_copy > 0);
7cdc2bab
MD
77 if (ret > 0)
78 ret = copied;
a2147eb2 79 /* ret = 0 means orderly shutdown, ret == BT_SOCKET_ERROR is error. */
7cdc2bab
MD
80 return ret;
81}
82
bb18709b
FD
83static
84ssize_t lttng_live_send(struct live_viewer_connection *viewer_connection,
4c66436f 85 const void *buf, size_t len)
7cdc2bab 86{
bb18709b
FD
87 struct lttng_live_msg_iter *lttng_live_msg_iter =
88 viewer_connection->lttng_live_msg_iter;
a2147eb2 89 BT_SOCKET sock = viewer_connection->control_sock;
7cdc2bab
MD
90 ssize_t ret;
91
4c66436f 92 for (;;) {
a2147eb2
MJ
93 ret = bt_socket_send_nosigpipe(sock, buf, len);
94 if (ret == BT_SOCKET_ERROR && bt_socket_interrupted()) {
bb18709b 95 if (!viewer_connection->in_query &&
d73bb381 96 lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
4c66436f
MD
97 break;
98 } else {
99 continue;
100 }
101 } else {
102 break;
103 }
104 }
7cdc2bab
MD
105 return ret;
106}
107
bb18709b
FD
108static
109int parse_url(struct live_viewer_connection *viewer_connection)
7cdc2bab 110{
94b828f3
MD
111 char error_buf[256] = { 0 };
112 struct bt_common_lttng_live_url_parts lttng_live_url_parts = { 0 };
113 int ret = -1;
7cdc2bab 114 const char *path = viewer_connection->url->str;
7cdc2bab
MD
115
116 if (!path) {
117 goto end;
118 }
7cdc2bab 119
0b6986b4
FD
120 lttng_live_url_parts = bt_common_parse_lttng_live_url(path, error_buf,
121 sizeof(error_buf));
94b828f3 122 if (!lttng_live_url_parts.proto) {
5278d5ba 123 BT_COMP_LOGW("Invalid LTTng live URL format: %s", error_buf);
7cdc2bab
MD
124 goto end;
125 }
7cdc2bab 126
0b6986b4 127 viewer_connection->relay_hostname = lttng_live_url_parts.hostname;
94b828f3
MD
128 lttng_live_url_parts.hostname = NULL;
129
130 if (lttng_live_url_parts.port >= 0) {
131 viewer_connection->port = lttng_live_url_parts.port;
132 } else {
7cdc2bab
MD
133 viewer_connection->port = LTTNG_DEFAULT_NETWORK_VIEWER_PORT;
134 }
135
0b6986b4 136 viewer_connection->target_hostname = lttng_live_url_parts.target_hostname;
94b828f3
MD
137 lttng_live_url_parts.target_hostname = NULL;
138
139 if (lttng_live_url_parts.session_name) {
0b6986b4 140 viewer_connection->session_name = lttng_live_url_parts.session_name;
94b828f3 141 lttng_live_url_parts.session_name = NULL;
7cdc2bab
MD
142 }
143
5278d5ba 144 BT_COMP_LOGI("Connecting to hostname : %s, port : %d, "
0b6986b4
FD
145 "target hostname : %s, session name : %s, proto : %s",
146 viewer_connection->relay_hostname->str,
147 viewer_connection->port,
148 !viewer_connection->target_hostname ?
149 "<none>" : viewer_connection->target_hostname->str,
150 !viewer_connection->session_name ?
151 "<none>" : viewer_connection->session_name->str,
152 lttng_live_url_parts.proto->str);
7cdc2bab
MD
153 ret = 0;
154
155end:
94b828f3 156 bt_common_destroy_lttng_live_url_parts(&lttng_live_url_parts);
7cdc2bab
MD
157 return ret;
158}
159
bb18709b
FD
160static
161int lttng_live_handshake(struct live_viewer_connection *viewer_connection)
7cdc2bab
MD
162{
163 struct lttng_viewer_cmd cmd;
164 struct lttng_viewer_connect connect;
02b23da7
JR
165 const size_t cmd_buf_len = sizeof(cmd) + sizeof(connect);
166 char cmd_buf[cmd_buf_len];
7cdc2bab
MD
167 int ret;
168 ssize_t ret_len;
169
170 cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
171 cmd.data_size = htobe64((uint64_t) sizeof(connect));
172 cmd.cmd_version = htobe32(0);
173
174 connect.viewer_session_id = -1ULL; /* will be set on recv */
175 connect.major = htobe32(LTTNG_LIVE_MAJOR);
176 connect.minor = htobe32(LTTNG_LIVE_MINOR);
177 connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
178
02b23da7
JR
179 /*
180 * Merge the cmd and connection request to prevent a write-write
181 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
182 * second write to be performed quickly in presence of Nagle's algorithm
183 */
184 memcpy(cmd_buf, &cmd, sizeof(cmd));
185 memcpy(cmd_buf + sizeof(cmd), &connect, sizeof(connect));
02b23da7 186 ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
a2147eb2 187 if (ret_len == BT_SOCKET_ERROR) {
5278d5ba 188 BT_COMP_LOGE("Error sending version: %s", bt_socket_errormsg());
7cdc2bab
MD
189 goto error;
190 }
8b45963b
PP
191
192 BT_ASSERT(ret_len == cmd_buf_len);
7cdc2bab 193
4c66436f 194 ret_len = lttng_live_recv(viewer_connection, &connect, sizeof(connect));
7cdc2bab 195 if (ret_len == 0) {
5278d5ba 196 BT_COMP_LOGI("Remote side has closed connection");
7cdc2bab
MD
197 goto error;
198 }
a2147eb2 199 if (ret_len == BT_SOCKET_ERROR) {
5278d5ba 200 BT_COMP_LOGE("Error receiving version: %s", bt_socket_errormsg());
7cdc2bab
MD
201 goto error;
202 }
8b45963b 203 BT_ASSERT(ret_len == sizeof(connect));
7cdc2bab 204
5278d5ba 205 BT_COMP_LOGI("Received viewer session ID : %" PRIu64,
ba412a2f 206 (uint64_t) be64toh(connect.viewer_session_id));
5278d5ba 207 BT_COMP_LOGI("Relayd version : %u.%u", be32toh(connect.major),
7cdc2bab
MD
208 be32toh(connect.minor));
209
210 if (LTTNG_LIVE_MAJOR != be32toh(connect.major)) {
5278d5ba 211 BT_COMP_LOGE("Incompatible lttng-relayd protocol");
7cdc2bab
MD
212 goto error;
213 }
214 /* Use the smallest protocol version implemented. */
215 if (LTTNG_LIVE_MINOR > be32toh(connect.minor)) {
216 viewer_connection->minor = be32toh(connect.minor);
217 } else {
218 viewer_connection->minor = LTTNG_LIVE_MINOR;
219 }
220 viewer_connection->major = LTTNG_LIVE_MAJOR;
221 ret = 0;
222 return ret;
223
224error:
5278d5ba 225 BT_COMP_LOGE("Unable to establish connection");
7cdc2bab
MD
226 return -1;
227}
228
bb18709b
FD
229static
230int lttng_live_connect_viewer(struct live_viewer_connection *viewer_connection)
7cdc2bab
MD
231{
232 struct hostent *host;
233 struct sockaddr_in server_addr;
234 int ret;
235
236 if (parse_url(viewer_connection)) {
237 goto error;
238 }
239
94b828f3 240 host = gethostbyname(viewer_connection->relay_hostname->str);
7cdc2bab 241 if (!host) {
5278d5ba 242 BT_COMP_LOGE("Cannot lookup hostname %s",
94b828f3 243 viewer_connection->relay_hostname->str);
7cdc2bab
MD
244 goto error;
245 }
246
a2147eb2 247 if ((viewer_connection->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == BT_INVALID_SOCKET) {
5278d5ba 248 BT_COMP_LOGE("Socket creation failed: %s", bt_socket_errormsg());
7cdc2bab
MD
249 goto error;
250 }
251
252 server_addr.sin_family = AF_INET;
253 server_addr.sin_port = htons(viewer_connection->port);
254 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
255 memset(&(server_addr.sin_zero), 0, 8);
256
257 if (connect(viewer_connection->control_sock, (struct sockaddr *) &server_addr,
a2147eb2 258 sizeof(struct sockaddr)) == BT_SOCKET_ERROR) {
5278d5ba 259 BT_COMP_LOGE("Connection failed: %s", bt_socket_errormsg());
7cdc2bab
MD
260 goto error;
261 }
262 if (lttng_live_handshake(viewer_connection)) {
263 goto error;
264 }
265
266 ret = 0;
267
268 return ret;
269
270error:
a2147eb2
MJ
271 if (viewer_connection->control_sock != BT_INVALID_SOCKET) {
272 if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) {
5278d5ba 273 BT_COMP_LOGE("Close: %s", bt_socket_errormsg());
7cdc2bab
MD
274 }
275 }
a2147eb2 276 viewer_connection->control_sock = BT_INVALID_SOCKET;
7cdc2bab
MD
277 return -1;
278}
279
bb18709b
FD
280static
281void lttng_live_disconnect_viewer(
282 struct live_viewer_connection *viewer_connection)
7cdc2bab 283{
a2147eb2 284 if (viewer_connection->control_sock == BT_INVALID_SOCKET) {
7cdc2bab
MD
285 return;
286 }
a2147eb2 287 if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) {
5278d5ba 288 BT_COMP_LOGE("Close: %s", bt_socket_errormsg());
a2147eb2 289 viewer_connection->control_sock = BT_INVALID_SOCKET;
7cdc2bab
MD
290 }
291}
292
bb18709b
FD
293static
294void connection_release(bt_object *obj)
7cdc2bab 295{
bb18709b
FD
296 struct live_viewer_connection *conn =
297 container_of(obj, struct live_viewer_connection, obj);
7cdc2bab 298
bb18709b 299 live_viewer_connection_destroy(conn);
7cdc2bab
MD
300}
301
302static
bb18709b 303int list_update_session(bt_value *results,
7cdc2bab 304 const struct lttng_viewer_session *session,
1c0ceb65 305 bool *_found, struct live_viewer_connection *viewer_connection)
7cdc2bab 306{
b602018b
FD
307 int ret = 0;
308 uint64_t i, len;
8eee8ea2
PP
309 bt_value *map = NULL;
310 bt_value *hostname = NULL;
311 bt_value *session_name = NULL;
312 bt_value *btval = NULL;
7cdc2bab
MD
313 bool found = false;
314
1270d0e9 315 len = bt_value_array_get_length(results);
7cdc2bab
MD
316 for (i = 0; i < len; i++) {
317 const char *hostname_str = NULL;
318 const char *session_name_str = NULL;
319
b602018b 320 map = bt_value_array_borrow_element_by_index(results, i);
7cdc2bab 321 if (!map) {
5278d5ba 322 BT_COMP_LOGE_STR("Error borrowing map.");
bb18709b 323 ret = -1;
7cdc2bab
MD
324 goto end;
325 }
bb18709b 326 hostname = bt_value_map_borrow_entry_value(map, "target-hostname");
7cdc2bab 327 if (!hostname) {
5278d5ba 328 BT_COMP_LOGE_STR("Error borrowing \"target-hostname\" entry.");
bb18709b 329 ret = -1;
7cdc2bab
MD
330 goto end;
331 }
bb18709b 332 session_name = bt_value_map_borrow_entry_value(map, "session-name");
7cdc2bab 333 if (!session_name) {
5278d5ba 334 BT_COMP_LOGE_STR("Error borrowing \"session-name\" entry.");
bb18709b 335 ret = -1;
7cdc2bab
MD
336 goto end;
337 }
b5cdc106
PP
338 hostname_str = bt_value_string_get(hostname);
339 session_name_str = bt_value_string_get(session_name);
7cdc2bab 340
acfa8112
PP
341 if (strcmp(session->hostname, hostname_str) == 0
342 && strcmp(session->session_name, session_name_str) == 0) {
7cdc2bab
MD
343 int64_t val;
344 uint32_t streams = be32toh(session->streams);
345 uint32_t clients = be32toh(session->clients);
346
347 found = true;
348
bb18709b 349 btval = bt_value_map_borrow_entry_value(map, "stream-count");
7cdc2bab 350 if (!btval) {
5278d5ba 351 BT_COMP_LOGE_STR("Error borrowing \"stream-count\" entry.");
bb18709b 352 ret = -1;
7cdc2bab
MD
353 goto end;
354 }
9d9daa04 355 val = bt_value_integer_unsigned_get(btval);
7cdc2bab
MD
356 /* sum */
357 val += streams;
9d9daa04 358 bt_value_integer_unsigned_set(btval, val);
7cdc2bab 359
bb18709b 360 btval = bt_value_map_borrow_entry_value(map, "client-count");
7cdc2bab 361 if (!btval) {
5278d5ba 362 BT_COMP_LOGE_STR("Error borrowing \"client-count\" entry.");
bb18709b 363 ret = -1;
7cdc2bab
MD
364 goto end;
365 }
9d9daa04 366 val = bt_value_integer_unsigned_get(btval);
7cdc2bab 367 /* max */
85e7137b 368 val = bt_max_t(int64_t, clients, val);
9d9daa04 369 bt_value_integer_unsigned_set(btval, val);
7cdc2bab
MD
370 }
371
7cdc2bab
MD
372 if (found) {
373 break;
374 }
375 }
376end:
7cdc2bab
MD
377 *_found = found;
378 return ret;
379}
380
381static
bb18709b 382int list_append_session(bt_value *results,
7cdc2bab 383 GString *base_url,
1c0ceb65
PP
384 const struct lttng_viewer_session *session,
385 struct live_viewer_connection *viewer_connection)
7cdc2bab 386{
bb18709b 387 int ret = 0;
fb25b9e3
PP
388 bt_value_map_insert_entry_status insert_status;
389 bt_value_array_append_element_status append_status;
8eee8ea2 390 bt_value *map = NULL;
7cdc2bab
MD
391 GString *url = NULL;
392 bool found = false;
393
394 /*
395 * If the session already exists, add the stream count to it,
396 * and do max of client counts.
397 */
1c0ceb65 398 ret = list_update_session(results, session, &found, viewer_connection);
bb18709b 399 if (ret || found) {
7cdc2bab
MD
400 goto end;
401 }
402
bb18709b 403 map = bt_value_map_create();
7cdc2bab 404 if (!map) {
5278d5ba 405 BT_COMP_LOGE_STR("Error creating map value.");
bb18709b 406 ret = -1;
7cdc2bab
MD
407 goto end;
408 }
409
410 if (base_url->len < 1) {
5278d5ba 411 BT_COMP_LOGE_STR("Error: base_url length smaller than 1.");
bb18709b 412 ret = -1;
7cdc2bab
MD
413 goto end;
414 }
415 /*
416 * key = "url",
417 * value = <string>,
418 */
419 url = g_string_new(base_url->str);
420 g_string_append(url, "/host/");
421 g_string_append(url, session->hostname);
422 g_string_append_c(url, '/');
423 g_string_append(url, session->session_name);
424
fb25b9e3
PP
425 insert_status = bt_value_map_insert_string_entry(map, "url", url->str);
426 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
5278d5ba 427 BT_COMP_LOGE_STR("Error inserting \"url\" entry.");
bb18709b 428 ret = -1;
7cdc2bab
MD
429 goto end;
430 }
431
432 /*
433 * key = "target-hostname",
434 * value = <string>,
435 */
fb25b9e3 436 insert_status = bt_value_map_insert_string_entry(map, "target-hostname",
7cdc2bab 437 session->hostname);
fb25b9e3 438 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
5278d5ba 439 BT_COMP_LOGE_STR("Error inserting \"target-hostname\" entry.");
bb18709b 440 ret = -1;
7cdc2bab
MD
441 goto end;
442 }
443
444 /*
445 * key = "session-name",
446 * value = <string>,
447 */
fb25b9e3 448 insert_status = bt_value_map_insert_string_entry(map, "session-name",
7cdc2bab 449 session->session_name);
fb25b9e3 450 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
5278d5ba 451 BT_COMP_LOGE_STR("Error inserting \"session-name\" entry.");
bb18709b 452 ret = -1;
7cdc2bab
MD
453 goto end;
454 }
455
456 /*
457 * key = "timer-us",
458 * value = <integer>,
459 */
460 {
461 uint32_t live_timer = be32toh(session->live_timer);
462
9d9daa04 463 insert_status = bt_value_map_insert_unsigned_integer_entry(
68d9d039 464 map, "timer-us", live_timer);
fb25b9e3 465 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
5278d5ba 466 BT_COMP_LOGE_STR("Error inserting \"timer-us\" entry.");
bb18709b 467 ret = -1;
7cdc2bab
MD
468 goto end;
469 }
470 }
471
472 /*
473 * key = "stream-count",
474 * value = <integer>,
475 */
476 {
477 uint32_t streams = be32toh(session->streams);
478
9d9daa04 479 insert_status = bt_value_map_insert_unsigned_integer_entry(map,
68d9d039 480 "stream-count", streams);
fb25b9e3 481 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
5278d5ba 482 BT_COMP_LOGE_STR("Error inserting \"stream-count\" entry.");
bb18709b 483 ret = -1;
7cdc2bab
MD
484 goto end;
485 }
486 }
487
7cdc2bab
MD
488 /*
489 * key = "client-count",
490 * value = <integer>,
491 */
492 {
493 uint32_t clients = be32toh(session->clients);
494
9d9daa04 495 insert_status = bt_value_map_insert_unsigned_integer_entry(map,
68d9d039 496 "client-count", clients);
fb25b9e3 497 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
5278d5ba 498 BT_COMP_LOGE_STR("Error inserting \"client-count\" entry.");
bb18709b 499 ret = -1;
7cdc2bab
MD
500 goto end;
501 }
502 }
503
fb25b9e3
PP
504 append_status = bt_value_array_append_element(results, map);
505 if (append_status != BT_VALUE_ARRAY_APPEND_ELEMENT_STATUS_OK) {
5278d5ba 506 BT_COMP_LOGE_STR("Error appending map to results.");
bb18709b
FD
507 ret = -1;
508 }
509
7cdc2bab
MD
510end:
511 if (url) {
bb18709b 512 g_string_free(url, true);
7cdc2bab 513 }
8c6884d9 514 BT_VALUE_PUT_REF_AND_RESET(map);
7cdc2bab
MD
515 return ret;
516}
517
518/*
519 * Data structure returned:
520 *
521 * {
522 * <array> = {
523 * [n] = {
524 * <map> = {
525 * {
526 * key = "url",
527 * value = <string>,
528 * },
529 * {
530 * key = "target-hostname",
531 * value = <string>,
532 * },
533 * {
534 * key = "session-name",
535 * value = <string>,
536 * },
537 * {
538 * key = "timer-us",
539 * value = <integer>,
540 * },
541 * {
542 * key = "stream-count",
543 * value = <integer>,
544 * },
545 * {
546 * key = "client-count",
547 * value = <integer>,
548 * },
549 * },
550 * }
551 * }
552 */
553
554BT_HIDDEN
fb25b9e3 555bt_component_class_query_method_status live_viewer_connection_list_sessions(
bb18709b
FD
556 struct live_viewer_connection *viewer_connection,
557 const bt_value **user_result)
7cdc2bab 558{
fb25b9e3
PP
559 bt_component_class_query_method_status status =
560 BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
bb18709b 561 bt_value *result = NULL;
7cdc2bab
MD
562 struct lttng_viewer_cmd cmd;
563 struct lttng_viewer_list_sessions list;
564 uint32_t i, sessions_count;
565 ssize_t ret_len;
566
bb18709b
FD
567 result = bt_value_array_create();
568 if (!result) {
5278d5ba 569 BT_COMP_LOGE("Error creating array");
fb25b9e3 570 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
7cdc2bab
MD
571 goto error;
572 }
573
574 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
575 cmd.data_size = htobe64((uint64_t) 0);
576 cmd.cmd_version = htobe32(0);
577
4c66436f 578 ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
a2147eb2 579 if (ret_len == BT_SOCKET_ERROR) {
5278d5ba 580 BT_COMP_LOGE("Error sending cmd: %s", bt_socket_errormsg());
fb25b9e3 581 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
7cdc2bab
MD
582 goto error;
583 }
8b45963b 584 BT_ASSERT(ret_len == sizeof(cmd));
7cdc2bab 585
4c66436f 586 ret_len = lttng_live_recv(viewer_connection, &list, sizeof(list));
7cdc2bab 587 if (ret_len == 0) {
5278d5ba 588 BT_COMP_LOGI("Remote side has closed connection");
fb25b9e3 589 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
7cdc2bab
MD
590 goto error;
591 }
a2147eb2 592 if (ret_len == BT_SOCKET_ERROR) {
5278d5ba 593 BT_COMP_LOGE("Error receiving session list: %s", bt_socket_errormsg());
fb25b9e3 594 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
7cdc2bab
MD
595 goto error;
596 }
8b45963b 597 BT_ASSERT(ret_len == sizeof(list));
7cdc2bab
MD
598
599 sessions_count = be32toh(list.sessions_count);
600 for (i = 0; i < sessions_count; i++) {
601 struct lttng_viewer_session lsession;
602
bb18709b
FD
603 ret_len = lttng_live_recv(viewer_connection, &lsession,
604 sizeof(lsession));
7cdc2bab 605 if (ret_len == 0) {
5278d5ba 606 BT_COMP_LOGI("Remote side has closed connection");
fb25b9e3 607 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
7cdc2bab
MD
608 goto error;
609 }
a2147eb2 610 if (ret_len == BT_SOCKET_ERROR) {
5278d5ba 611 BT_COMP_LOGE("Error receiving session: %s", bt_socket_errormsg());
fb25b9e3 612 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
7cdc2bab
MD
613 goto error;
614 }
8b45963b 615 BT_ASSERT(ret_len == sizeof(lsession));
7cdc2bab
MD
616 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
617 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
bb18709b 618 if (list_append_session(result, viewer_connection->url,
1c0ceb65 619 &lsession, viewer_connection)) {
fb25b9e3 620 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
7cdc2bab
MD
621 goto error;
622 }
623 }
bb18709b
FD
624
625 *user_result = result;
7cdc2bab
MD
626 goto end;
627error:
bb18709b 628 BT_VALUE_PUT_REF_AND_RESET(result);
7cdc2bab 629end:
bb18709b 630 return status;
7cdc2bab
MD
631}
632
633static
bb18709b 634int lttng_live_query_session_ids(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab
MD
635{
636 struct lttng_viewer_cmd cmd;
637 struct lttng_viewer_list_sessions list;
638 struct lttng_viewer_session lsession;
639 uint32_t i, sessions_count;
640 ssize_t ret_len;
641 uint64_t session_id;
bb18709b
FD
642 struct live_viewer_connection *viewer_connection =
643 lttng_live_msg_iter->viewer_connection;
7cdc2bab
MD
644
645 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
646 cmd.data_size = htobe64((uint64_t) 0);
647 cmd.cmd_version = htobe32(0);
648
4c66436f 649 ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
a2147eb2 650 if (ret_len == BT_SOCKET_ERROR) {
5278d5ba 651 BT_COMP_LOGE("Error sending cmd: %s", bt_socket_errormsg());
7cdc2bab
MD
652 goto error;
653 }
8b45963b 654 BT_ASSERT(ret_len == sizeof(cmd));
7cdc2bab 655
4c66436f 656 ret_len = lttng_live_recv(viewer_connection, &list, sizeof(list));
7cdc2bab 657 if (ret_len == 0) {
5278d5ba 658 BT_COMP_LOGI("Remote side has closed connection");
7cdc2bab
MD
659 goto error;
660 }
a2147eb2 661 if (ret_len == BT_SOCKET_ERROR) {
5278d5ba 662 BT_COMP_LOGE("Error receiving session list: %s", bt_socket_errormsg());
7cdc2bab
MD
663 goto error;
664 }
8b45963b 665 BT_ASSERT(ret_len == sizeof(list));
7cdc2bab
MD
666
667 sessions_count = be32toh(list.sessions_count);
668 for (i = 0; i < sessions_count; i++) {
4c66436f 669 ret_len = lttng_live_recv(viewer_connection,
7cdc2bab
MD
670 &lsession, sizeof(lsession));
671 if (ret_len == 0) {
5278d5ba 672 BT_COMP_LOGI("Remote side has closed connection");
7cdc2bab
MD
673 goto error;
674 }
a2147eb2 675 if (ret_len == BT_SOCKET_ERROR) {
5278d5ba 676 BT_COMP_LOGE("Error receiving session: %s", bt_socket_errormsg());
7cdc2bab
MD
677 goto error;
678 }
8b45963b 679 BT_ASSERT(ret_len == sizeof(lsession));
7cdc2bab
MD
680 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
681 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
682 session_id = be64toh(lsession.id);
683
5278d5ba 684 BT_COMP_LOGI("Adding session %" PRIu64 " hostname: %s session_name: %s",
06994c71
MD
685 session_id, lsession.hostname, lsession.session_name);
686
7cdc2bab 687 if ((strncmp(lsession.session_name,
94b828f3 688 viewer_connection->session_name->str,
a2147eb2 689 LTTNG_VIEWER_NAME_MAX) == 0) && (strncmp(lsession.hostname,
94b828f3 690 viewer_connection->target_hostname->str,
a2147eb2 691 LTTNG_VIEWER_HOST_NAME_MAX) == 0)) {
bb18709b 692 if (lttng_live_add_session(lttng_live_msg_iter, session_id,
06994c71
MD
693 lsession.hostname,
694 lsession.session_name)) {
7cdc2bab
MD
695 goto error;
696 }
697 }
698 }
699
700 return 0;
701
702error:
5278d5ba 703 BT_COMP_LOGE("Unable to query session ids");
7cdc2bab
MD
704 return -1;
705}
706
707BT_HIDDEN
bb18709b
FD
708int lttng_live_create_viewer_session(
709 struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab
MD
710{
711 struct lttng_viewer_cmd cmd;
712 struct lttng_viewer_create_session_response resp;
713 ssize_t ret_len;
bb18709b
FD
714 struct live_viewer_connection *viewer_connection =
715 lttng_live_msg_iter->viewer_connection;
7cdc2bab
MD
716
717 cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
718 cmd.data_size = htobe64((uint64_t) 0);
719 cmd.cmd_version = htobe32(0);
720
4c66436f 721 ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
a2147eb2 722 if (ret_len == BT_SOCKET_ERROR) {
5278d5ba 723 BT_COMP_LOGE("Error sending cmd: %s", bt_socket_errormsg());
7cdc2bab
MD
724 goto error;
725 }
8b45963b 726 BT_ASSERT(ret_len == sizeof(cmd));
7cdc2bab 727
4c66436f 728 ret_len = lttng_live_recv(viewer_connection, &resp, sizeof(resp));
7cdc2bab 729 if (ret_len == 0) {
5278d5ba 730 BT_COMP_LOGI("Remote side has closed connection");
7cdc2bab
MD
731 goto error;
732 }
a2147eb2 733 if (ret_len == BT_SOCKET_ERROR) {
5278d5ba 734 BT_COMP_LOGE("Error receiving create session reply: %s", bt_socket_errormsg());
7cdc2bab
MD
735 goto error;
736 }
8b45963b 737 BT_ASSERT(ret_len == sizeof(resp));
7cdc2bab
MD
738
739 if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
5278d5ba 740 BT_COMP_LOGE("Error creating viewer session");
7cdc2bab
MD
741 goto error;
742 }
bb18709b 743 if (lttng_live_query_session_ids(lttng_live_msg_iter)) {
7cdc2bab
MD
744 goto error;
745 }
746
747 return 0;
748
749error:
750 return -1;
751}
752
753static
754int receive_streams(struct lttng_live_session *session,
755 uint32_t stream_count)
756{
757 ssize_t ret_len;
758 uint32_t i;
bb18709b 759 struct lttng_live_msg_iter *lttng_live_msg_iter =
0b6986b4 760 session->lttng_live_msg_iter;
bb18709b 761 struct live_viewer_connection *viewer_connection =
0b6986b4 762 lttng_live_msg_iter->viewer_connection;
7cdc2bab 763
5278d5ba 764 BT_COMP_LOGI("Getting %" PRIu32 " new streams:", stream_count);
7cdc2bab
MD
765 for (i = 0; i < stream_count; i++) {
766 struct lttng_viewer_stream stream;
767 struct lttng_live_stream_iterator *live_stream;
768 uint64_t stream_id;
769 uint64_t ctf_trace_id;
770
4c66436f 771 ret_len = lttng_live_recv(viewer_connection, &stream, sizeof(stream));
7cdc2bab 772 if (ret_len == 0) {
5278d5ba 773 BT_COMP_LOGI("Remote side has closed connection");
7cdc2bab
MD
774 goto error;
775 }
a2147eb2 776 if (ret_len == BT_SOCKET_ERROR) {
5278d5ba 777 BT_COMP_LOGE("Error receiving stream");
7cdc2bab
MD
778 goto error;
779 }
8b45963b 780 BT_ASSERT(ret_len == sizeof(stream));
7cdc2bab
MD
781 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
782 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
783 stream_id = be64toh(stream.id);
784 ctf_trace_id = be64toh(stream.ctf_trace_id);
785
786 if (stream.metadata_flag) {
5278d5ba 787 BT_COMP_LOGI(" metadata stream %" PRIu64 " : %s/%s",
0b6986b4 788 stream_id, stream.path_name, stream.channel_name);
7cdc2bab 789 if (lttng_live_metadata_create_stream(session,
06994c71
MD
790 ctf_trace_id, stream_id,
791 stream.path_name)) {
5278d5ba 792 BT_COMP_LOGE("Error creating metadata stream");
7cdc2bab
MD
793
794 goto error;
795 }
b09a5592 796 session->lazy_stream_msg_init = true;
7cdc2bab 797 } else {
5278d5ba 798 BT_COMP_LOGI(" stream %" PRIu64 " : %s/%s",
0b6986b4 799 stream_id, stream.path_name, stream.channel_name);
7cdc2bab
MD
800 live_stream = lttng_live_stream_iterator_create(session,
801 ctf_trace_id, stream_id);
802 if (!live_stream) {
5278d5ba 803 BT_COMP_LOGE("Error creating streamn");
7cdc2bab
MD
804 goto error;
805 }
806 }
807 }
808 return 0;
809
810error:
811 return -1;
812}
813
814BT_HIDDEN
815int lttng_live_attach_session(struct lttng_live_session *session)
816{
817 struct lttng_viewer_cmd cmd;
818 struct lttng_viewer_attach_session_request rq;
819 struct lttng_viewer_attach_session_response rp;
0b6986b4
FD
820 struct lttng_live_msg_iter *lttng_live_msg_iter =
821 session->lttng_live_msg_iter;
bb18709b 822 struct live_viewer_connection *viewer_connection =
0b6986b4 823 lttng_live_msg_iter->viewer_connection;
7cdc2bab
MD
824 uint64_t session_id = session->id;
825 uint32_t streams_count;
02b23da7
JR
826 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
827 char cmd_buf[cmd_buf_len];
0b6986b4 828 ssize_t ret_len;
7cdc2bab 829
7cdc2bab
MD
830 cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
831 cmd.data_size = htobe64((uint64_t) sizeof(rq));
832 cmd.cmd_version = htobe32(0);
833
834 memset(&rq, 0, sizeof(rq));
835 rq.session_id = htobe64(session_id);
836 // TODO: add cmd line parameter to select seek beginning
837 // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
838 rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
839
02b23da7
JR
840 /*
841 * Merge the cmd and connection request to prevent a write-write
842 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
843 * second write to be performed quickly in presence of Nagle's algorithm.
844 */
845 memcpy(cmd_buf, &cmd, sizeof(cmd));
846 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
02b23da7 847 ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
a2147eb2 848 if (ret_len == BT_SOCKET_ERROR) {
5278d5ba 849 BT_COMP_LOGE("Error sending attach request: %s", bt_socket_errormsg());
7cdc2bab
MD
850 goto error;
851 }
7cdc2bab 852
8b45963b 853 BT_ASSERT(ret_len == cmd_buf_len);
4c66436f 854 ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
7cdc2bab 855 if (ret_len == 0) {
5278d5ba 856 BT_COMP_LOGI("Remote side has closed connection");
7cdc2bab
MD
857 goto error;
858 }
a2147eb2 859 if (ret_len == BT_SOCKET_ERROR) {
5278d5ba 860 BT_COMP_LOGE("Error receiving attach response: %s", bt_socket_errormsg());
7cdc2bab
MD
861 goto error;
862 }
8b45963b 863 BT_ASSERT(ret_len == sizeof(rp));
7cdc2bab
MD
864
865 streams_count = be32toh(rp.streams_count);
866 switch(be32toh(rp.status)) {
867 case LTTNG_VIEWER_ATTACH_OK:
868 break;
869 case LTTNG_VIEWER_ATTACH_UNK:
5278d5ba 870 BT_COMP_LOGW("Session id %" PRIu64 " is unknown", session_id);
7cdc2bab
MD
871 goto error;
872 case LTTNG_VIEWER_ATTACH_ALREADY:
5278d5ba 873 BT_COMP_LOGW("There is already a viewer attached to this session");
7cdc2bab
MD
874 goto error;
875 case LTTNG_VIEWER_ATTACH_NOT_LIVE:
5278d5ba 876 BT_COMP_LOGW("Not a live session");
7cdc2bab
MD
877 goto error;
878 case LTTNG_VIEWER_ATTACH_SEEK_ERR:
5278d5ba 879 BT_COMP_LOGE("Wrong seek parameter");
7cdc2bab
MD
880 goto error;
881 default:
5278d5ba 882 BT_COMP_LOGE("Unknown attach return code %u", be32toh(rp.status));
7cdc2bab
MD
883 goto error;
884 }
885
886 /* We receive the initial list of streams. */
887 if (receive_streams(session, streams_count)) {
888 goto error;
889 }
890
891 session->attached = true;
892 session->new_streams_needed = false;
893
894 return 0;
895
896error:
897 return -1;
898}
899
900BT_HIDDEN
901int lttng_live_detach_session(struct lttng_live_session *session)
902{
903 struct lttng_viewer_cmd cmd;
904 struct lttng_viewer_detach_session_request rq;
905 struct lttng_viewer_detach_session_response rp;
906 ssize_t ret_len;
0b6986b4
FD
907 struct lttng_live_msg_iter *lttng_live_msg_iter =
908 session->lttng_live_msg_iter;
bb18709b 909 struct live_viewer_connection *viewer_connection =
0b6986b4 910 lttng_live_msg_iter->viewer_connection;
7cdc2bab 911 uint64_t session_id = session->id;
02b23da7
JR
912 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
913 char cmd_buf[cmd_buf_len];
7cdc2bab
MD
914
915 if (!session->attached) {
916 return 0;
917 }
918
919 cmd.cmd = htobe32(LTTNG_VIEWER_DETACH_SESSION);
920 cmd.data_size = htobe64((uint64_t) sizeof(rq));
921 cmd.cmd_version = htobe32(0);
922
923 memset(&rq, 0, sizeof(rq));
924 rq.session_id = htobe64(session_id);
925
02b23da7
JR
926 /*
927 * Merge the cmd and connection request to prevent a write-write
928 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
929 * second write to be performed quickly in presence of Nagle's algorithm.
930 */
931 memcpy(cmd_buf, &cmd, sizeof(cmd));
932 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
02b23da7 933 ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
a2147eb2 934 if (ret_len == BT_SOCKET_ERROR) {
5278d5ba 935 BT_COMP_LOGE("Error sending detach request: %s", bt_socket_errormsg());
7cdc2bab
MD
936 goto error;
937 }
7cdc2bab 938
8b45963b 939 BT_ASSERT(ret_len == cmd_buf_len);
4c66436f 940 ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
7cdc2bab 941 if (ret_len == 0) {
5278d5ba 942 BT_COMP_LOGI("Remote side has closed connection");
7cdc2bab
MD
943 goto error;
944 }
a2147eb2 945 if (ret_len == BT_SOCKET_ERROR) {
5278d5ba 946 BT_COMP_LOGE("Error receiving detach response: %s", bt_socket_errormsg());
7cdc2bab
MD
947 goto error;
948 }
8b45963b 949 BT_ASSERT(ret_len == sizeof(rp));
7cdc2bab
MD
950
951 switch(be32toh(rp.status)) {
952 case LTTNG_VIEWER_DETACH_SESSION_OK:
953 break;
954 case LTTNG_VIEWER_DETACH_SESSION_UNK:
5278d5ba 955 BT_COMP_LOGW("Session id %" PRIu64 " is unknown", session_id);
7cdc2bab
MD
956 goto error;
957 case LTTNG_VIEWER_DETACH_SESSION_ERR:
5278d5ba 958 BT_COMP_LOGW("Error detaching session id %" PRIu64 "", session_id);
7cdc2bab
MD
959 goto error;
960 default:
5278d5ba 961 BT_COMP_LOGE("Unknown detach return code %u", be32toh(rp.status));
7cdc2bab
MD
962 goto error;
963 }
964
965 session->attached = false;
966
967 return 0;
968
969error:
970 return -1;
971}
972
973BT_HIDDEN
974ssize_t lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace,
975 FILE *fp)
976{
977 uint64_t len = 0;
978 int ret;
979 struct lttng_viewer_cmd cmd;
980 struct lttng_viewer_get_metadata rq;
981 struct lttng_viewer_metadata_packet rp;
982 char *data = NULL;
983 ssize_t ret_len;
984 struct lttng_live_session *session = trace->session;
0b6986b4
FD
985 struct lttng_live_msg_iter *lttng_live_msg_iter =
986 session->lttng_live_msg_iter;
7cdc2bab 987 struct lttng_live_metadata *metadata = trace->metadata;
bb18709b 988 struct live_viewer_connection *viewer_connection =
0b6986b4 989 lttng_live_msg_iter->viewer_connection;
02b23da7
JR
990 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
991 char cmd_buf[cmd_buf_len];
7cdc2bab
MD
992
993 rq.stream_id = htobe64(metadata->stream_id);
994 cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
995 cmd.data_size = htobe64((uint64_t) sizeof(rq));
996 cmd.cmd_version = htobe32(0);
997
02b23da7
JR
998 /*
999 * Merge the cmd and connection request to prevent a write-write
1000 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1001 * second write to be performed quickly in presence of Nagle's algorithm.
1002 */
1003 memcpy(cmd_buf, &cmd, sizeof(cmd));
1004 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
02b23da7 1005 ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
a2147eb2 1006 if (ret_len == BT_SOCKET_ERROR) {
5278d5ba 1007 BT_COMP_LOGE("Error sending get_metadata request: %s", bt_socket_errormsg());
7cdc2bab
MD
1008 goto error;
1009 }
7cdc2bab 1010
8b45963b 1011 BT_ASSERT(ret_len == cmd_buf_len);
4c66436f 1012 ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
7cdc2bab 1013 if (ret_len == 0) {
5278d5ba 1014 BT_COMP_LOGI("Remote side has closed connection");
7cdc2bab
MD
1015 goto error;
1016 }
a2147eb2 1017 if (ret_len == BT_SOCKET_ERROR) {
5278d5ba 1018 BT_COMP_LOGE("Error receiving get_metadata response: %s", bt_socket_errormsg());
7cdc2bab
MD
1019 goto error;
1020 }
8b45963b 1021 BT_ASSERT(ret_len == sizeof(rp));
7cdc2bab
MD
1022
1023 switch (be32toh(rp.status)) {
1024 case LTTNG_VIEWER_METADATA_OK:
5278d5ba 1025 BT_COMP_LOGD("get_metadata : OK");
7cdc2bab
MD
1026 break;
1027 case LTTNG_VIEWER_NO_NEW_METADATA:
5278d5ba 1028 BT_COMP_LOGD("get_metadata : NO NEW");
7cdc2bab
MD
1029 ret = 0;
1030 goto end;
1031 case LTTNG_VIEWER_METADATA_ERR:
5278d5ba 1032 BT_COMP_LOGD("get_metadata : ERR");
7cdc2bab
MD
1033 goto error;
1034 default:
5278d5ba 1035 BT_COMP_LOGD("get_metadata : UNKNOWN");
7cdc2bab
MD
1036 goto error;
1037 }
1038
1039 len = be64toh(rp.len);
5278d5ba 1040 BT_COMP_LOGD("Writing %" PRIu64" bytes to metadata", len);
7cdc2bab
MD
1041 if (len <= 0) {
1042 goto error;
1043 }
1044
85e7137b 1045 data = calloc(1, len);
7cdc2bab 1046 if (!data) {
5278d5ba 1047 BT_COMP_LOGE("relay data calloc: %s", strerror(errno));
7cdc2bab
MD
1048 goto error;
1049 }
4c66436f 1050 ret_len = lttng_live_recv(viewer_connection, data, len);
7cdc2bab 1051 if (ret_len == 0) {
5278d5ba 1052 BT_COMP_LOGI("Remote side has closed connection");
7cdc2bab
MD
1053 goto error_free_data;
1054 }
a2147eb2 1055 if (ret_len == BT_SOCKET_ERROR) {
5278d5ba 1056 BT_COMP_LOGE("Error receiving trace packet: %s", bt_socket_errormsg());
7cdc2bab
MD
1057 goto error_free_data;
1058 }
8b45963b 1059 BT_ASSERT(ret_len == len);
7cdc2bab
MD
1060
1061 do {
1062 ret_len = fwrite(data, 1, len, fp);
1063 } while (ret_len < 0 && errno == EINTR);
1064 if (ret_len < 0) {
5278d5ba 1065 BT_COMP_LOGE("Writing in the metadata fp");
7cdc2bab
MD
1066 goto error_free_data;
1067 }
8b45963b 1068 BT_ASSERT(ret_len == len);
7cdc2bab
MD
1069 free(data);
1070 ret = len;
1071end:
1072 return ret;
1073
1074error_free_data:
1075 free(data);
1076error:
1077 return -1;
1078}
1079
1080/*
1081 * Assign the fields from a lttng_viewer_index to a packet_index.
1082 */
1083static
1084void lttng_index_to_packet_index(struct lttng_viewer_index *lindex,
1085 struct packet_index *pindex)
1086{
8b45963b
PP
1087 BT_ASSERT(lindex);
1088 BT_ASSERT(pindex);
7cdc2bab
MD
1089
1090 pindex->offset = be64toh(lindex->offset);
1091 pindex->packet_size = be64toh(lindex->packet_size);
1092 pindex->content_size = be64toh(lindex->content_size);
1093 pindex->ts_cycles.timestamp_begin = be64toh(lindex->timestamp_begin);
1094 pindex->ts_cycles.timestamp_end = be64toh(lindex->timestamp_end);
1095 pindex->events_discarded = be64toh(lindex->events_discarded);
1096}
1097
1098BT_HIDDEN
bb18709b
FD
1099enum lttng_live_iterator_status lttng_live_get_next_index(
1100 struct lttng_live_msg_iter *lttng_live_msg_iter,
7cdc2bab
MD
1101 struct lttng_live_stream_iterator *stream,
1102 struct packet_index *index)
1103{
1104 struct lttng_viewer_cmd cmd;
1105 struct lttng_viewer_get_next_index rq;
7cdc2bab 1106 struct lttng_viewer_index rp;
0b6986b4 1107 enum lttng_live_iterator_status retstatus = LTTNG_LIVE_ITERATOR_STATUS_OK;
bb18709b
FD
1108 struct live_viewer_connection *viewer_connection =
1109 lttng_live_msg_iter->viewer_connection;
7cdc2bab 1110 struct lttng_live_trace *trace = stream->trace;
02b23da7
JR
1111 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1112 char cmd_buf[cmd_buf_len];
0b6986b4
FD
1113 uint32_t flags, status;
1114 ssize_t ret_len;
7cdc2bab
MD
1115
1116 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
1117 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1118 cmd.cmd_version = htobe32(0);
1119
02b23da7 1120
7cdc2bab
MD
1121 memset(&rq, 0, sizeof(rq));
1122 rq.stream_id = htobe64(stream->viewer_stream_id);
1123
02b23da7
JR
1124 /*
1125 * Merge the cmd and connection request to prevent a write-write
1126 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1127 * second write to be performed quickly in presence of Nagle's algorithm.
1128 */
1129 memcpy(cmd_buf, &cmd, sizeof(cmd));
1130 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
02b23da7 1131 ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
a2147eb2 1132 if (ret_len == BT_SOCKET_ERROR) {
5278d5ba 1133 BT_COMP_LOGE("Error sending get_next_index request: %s",
bb18709b 1134 bt_socket_errormsg());
7cdc2bab
MD
1135 goto error;
1136 }
7cdc2bab 1137
8b45963b 1138 BT_ASSERT(ret_len == cmd_buf_len);
4c66436f 1139 ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
7cdc2bab 1140 if (ret_len == 0) {
5278d5ba 1141 BT_COMP_LOGI("Remote side has closed connection");
7cdc2bab
MD
1142 goto error;
1143 }
a2147eb2 1144 if (ret_len == BT_SOCKET_ERROR) {
5278d5ba 1145 BT_COMP_LOGE("Error receiving get_next_index response: %s",
bb18709b 1146 bt_socket_errormsg());
7cdc2bab
MD
1147 goto error;
1148 }
8b45963b 1149 BT_ASSERT(ret_len == sizeof(rp));
7cdc2bab
MD
1150
1151 flags = be32toh(rp.flags);
1152 status = be32toh(rp.status);
1153
1154 switch (status) {
1155 case LTTNG_VIEWER_INDEX_INACTIVE:
1156 {
1157 uint64_t ctf_stream_class_id;
1158
5278d5ba 1159 BT_COMP_LOGD("get_next_index: inactive");
7cdc2bab
MD
1160 memset(index, 0, sizeof(struct packet_index));
1161 index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
bb18709b 1162 stream->current_inactivity_ts = index->ts_cycles.timestamp_end;
7cdc2bab
MD
1163 ctf_stream_class_id = be64toh(rp.stream_id);
1164 if (stream->ctf_stream_class_id != -1ULL) {
8b45963b 1165 BT_ASSERT(stream->ctf_stream_class_id ==
7cdc2bab
MD
1166 ctf_stream_class_id);
1167 } else {
1168 stream->ctf_stream_class_id = ctf_stream_class_id;
1169 }
1170 stream->state = LTTNG_LIVE_STREAM_QUIESCENT;
1171 break;
1172 }
1173 case LTTNG_VIEWER_INDEX_OK:
1174 {
1175 uint64_t ctf_stream_class_id;
1176
5278d5ba 1177 BT_COMP_LOGD("get_next_index: OK");
7cdc2bab
MD
1178 lttng_index_to_packet_index(&rp, index);
1179 ctf_stream_class_id = be64toh(rp.stream_id);
1180 if (stream->ctf_stream_class_id != -1ULL) {
8b45963b 1181 BT_ASSERT(stream->ctf_stream_class_id ==
7cdc2bab
MD
1182 ctf_stream_class_id);
1183 } else {
1184 stream->ctf_stream_class_id = ctf_stream_class_id;
1185 }
1186
1187 stream->state = LTTNG_LIVE_STREAM_ACTIVE_DATA;
7cdc2bab
MD
1188
1189 if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
5278d5ba 1190 BT_COMP_LOGD("get_next_index: new metadata needed");
7cdc2bab
MD
1191 trace->new_metadata_needed = true;
1192 }
1193 if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
5278d5ba 1194 BT_COMP_LOGD("get_next_index: new streams needed");
bb18709b 1195 lttng_live_need_new_streams(lttng_live_msg_iter);
7cdc2bab
MD
1196 }
1197 break;
1198 }
1199 case LTTNG_VIEWER_INDEX_RETRY:
5278d5ba 1200 BT_COMP_LOGD("get_next_index: retry");
7cdc2bab 1201 memset(index, 0, sizeof(struct packet_index));
bb18709b 1202 retstatus = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
7cdc2bab
MD
1203 stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA;
1204 goto end;
1205 case LTTNG_VIEWER_INDEX_HUP:
5278d5ba 1206 BT_COMP_LOGD("get_next_index: stream hung up");
7cdc2bab
MD
1207 memset(index, 0, sizeof(struct packet_index));
1208 index->offset = EOF;
bb18709b 1209 retstatus = LTTNG_LIVE_ITERATOR_STATUS_END;
7cdc2bab 1210 stream->state = LTTNG_LIVE_STREAM_EOF;
5b337d32 1211 stream->has_stream_hung_up = true;
7cdc2bab
MD
1212 break;
1213 case LTTNG_VIEWER_INDEX_ERR:
5278d5ba 1214 BT_COMP_LOGE("get_next_index: error");
7cdc2bab
MD
1215 memset(index, 0, sizeof(struct packet_index));
1216 stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA;
1217 goto error;
1218 default:
5278d5ba 1219 BT_COMP_LOGE("get_next_index: unknown value");
7cdc2bab
MD
1220 memset(index, 0, sizeof(struct packet_index));
1221 stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA;
1222 goto error;
1223 }
1224end:
1225 return retstatus;
1226
1227error:
d73bb381 1228 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
bb18709b 1229 retstatus = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
4c66436f 1230 } else {
bb18709b 1231 retstatus = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
4c66436f 1232 }
7cdc2bab
MD
1233 return retstatus;
1234}
1235
1236BT_HIDDEN
bb18709b
FD
1237enum bt_msg_iter_medium_status lttng_live_get_stream_bytes(
1238 struct lttng_live_msg_iter *lttng_live_msg_iter,
1239 struct lttng_live_stream_iterator *stream, uint8_t *buf,
1240 uint64_t offset, uint64_t req_len, uint64_t *recv_len)
7cdc2bab 1241{
b09a5592 1242 enum bt_msg_iter_medium_status retstatus = BT_MSG_ITER_MEDIUM_STATUS_OK;
0b6986b4 1243 struct lttng_viewer_trace_packet rp;
7cdc2bab
MD
1244 struct lttng_viewer_cmd cmd;
1245 struct lttng_viewer_get_packet rq;
bb18709b
FD
1246 struct live_viewer_connection *viewer_connection =
1247 lttng_live_msg_iter->viewer_connection;
7cdc2bab 1248 struct lttng_live_trace *trace = stream->trace;
02b23da7
JR
1249 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1250 char cmd_buf[cmd_buf_len];
0b6986b4
FD
1251 uint32_t flags, status;
1252 ssize_t ret_len;
7cdc2bab 1253
5278d5ba 1254 BT_COMP_LOGD("lttng_live_get_stream_bytes: offset=%" PRIu64 ", req_len=%" PRIu64,
7cdc2bab
MD
1255 offset, req_len);
1256 cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
1257 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1258 cmd.cmd_version = htobe32(0);
1259
1260 memset(&rq, 0, sizeof(rq));
1261 rq.stream_id = htobe64(stream->viewer_stream_id);
1262 rq.offset = htobe64(offset);
1263 rq.len = htobe32(req_len);
1264
02b23da7
JR
1265 /*
1266 * Merge the cmd and connection request to prevent a write-write
1267 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1268 * second write to be performed quickly in presence of Nagle's algorithm.
1269 */
1270 memcpy(cmd_buf, &cmd, sizeof(cmd));
1271 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
02b23da7 1272 ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
a2147eb2 1273 if (ret_len == BT_SOCKET_ERROR) {
5278d5ba 1274 BT_COMP_LOGE("Error sending get_data request: %s", bt_socket_errormsg());
7cdc2bab
MD
1275 goto error;
1276 }
7cdc2bab 1277
8b45963b 1278 BT_ASSERT(ret_len == cmd_buf_len);
4c66436f 1279 ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
7cdc2bab 1280 if (ret_len == 0) {
5278d5ba 1281 BT_COMP_LOGI("Remote side has closed connection");
7cdc2bab
MD
1282 goto error;
1283 }
a2147eb2 1284 if (ret_len == BT_SOCKET_ERROR) {
5278d5ba 1285 BT_COMP_LOGE("Error receiving get_data response: %s", bt_socket_errormsg());
7cdc2bab
MD
1286 goto error;
1287 }
1288 if (ret_len != sizeof(rp)) {
5278d5ba 1289 BT_COMP_LOGE("get_data_packet: expected %zu"
087bc060 1290 ", received %zd", sizeof(rp),
7cdc2bab
MD
1291 ret_len);
1292 goto error;
1293 }
1294
1295 flags = be32toh(rp.flags);
1296 status = be32toh(rp.status);
1297
1298 switch (status) {
1299 case LTTNG_VIEWER_GET_PACKET_OK:
1300 req_len = be32toh(rp.len);
5278d5ba 1301 BT_COMP_LOGD("get_data_packet: Ok, packet size : %" PRIu64 "", req_len);
7cdc2bab
MD
1302 break;
1303 case LTTNG_VIEWER_GET_PACKET_RETRY:
1304 /* Unimplemented by relay daemon */
5278d5ba 1305 BT_COMP_LOGD("get_data_packet: retry");
b09a5592 1306 retstatus = BT_MSG_ITER_MEDIUM_STATUS_AGAIN;
7cdc2bab
MD
1307 goto end;
1308 case LTTNG_VIEWER_GET_PACKET_ERR:
1309 if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
5278d5ba 1310 BT_COMP_LOGD("get_data_packet: new metadata needed, try again later");
7cdc2bab
MD
1311 trace->new_metadata_needed = true;
1312 }
1313 if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
5278d5ba 1314 BT_COMP_LOGD("get_data_packet: new streams needed, try again later");
bb18709b 1315 lttng_live_need_new_streams(lttng_live_msg_iter);
7cdc2bab
MD
1316 }
1317 if (flags & (LTTNG_VIEWER_FLAG_NEW_METADATA
1318 | LTTNG_VIEWER_FLAG_NEW_STREAM)) {
b09a5592 1319 retstatus = BT_MSG_ITER_MEDIUM_STATUS_AGAIN;
7cdc2bab
MD
1320 goto end;
1321 }
5278d5ba 1322 BT_COMP_LOGE("get_data_packet: error");
7cdc2bab
MD
1323 goto error;
1324 case LTTNG_VIEWER_GET_PACKET_EOF:
b09a5592 1325 retstatus = BT_MSG_ITER_MEDIUM_STATUS_EOF;
7cdc2bab
MD
1326 goto end;
1327 default:
5278d5ba 1328 BT_COMP_LOGE("get_data_packet: unknown");
7cdc2bab
MD
1329 goto error;
1330 }
1331
1332 if (req_len == 0) {
1333 goto error;
1334 }
1335
4c66436f 1336 ret_len = lttng_live_recv(viewer_connection, buf, req_len);
7cdc2bab 1337 if (ret_len == 0) {
5278d5ba 1338 BT_COMP_LOGI("Remote side has closed connection");
7cdc2bab
MD
1339 goto error;
1340 }
a2147eb2 1341 if (ret_len == BT_SOCKET_ERROR) {
5278d5ba 1342 BT_COMP_LOGE("Error receiving trace packet: %s", bt_socket_errormsg());
7cdc2bab
MD
1343 goto error;
1344 }
8b45963b 1345 BT_ASSERT(ret_len == req_len);
7cdc2bab
MD
1346 *recv_len = ret_len;
1347end:
1348 return retstatus;
1349
1350error:
d73bb381 1351 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
b09a5592 1352 retstatus = BT_MSG_ITER_MEDIUM_STATUS_AGAIN;
4c66436f 1353 } else {
b09a5592 1354 retstatus = BT_MSG_ITER_MEDIUM_STATUS_ERROR;
4c66436f 1355 }
7cdc2bab
MD
1356 return retstatus;
1357}
1358
1359/*
1360 * Request new streams for a session.
1361 */
1362BT_HIDDEN
bb18709b 1363enum lttng_live_iterator_status lttng_live_get_new_streams(
7cdc2bab
MD
1364 struct lttng_live_session *session)
1365{
bb18709b
FD
1366 enum lttng_live_iterator_status status =
1367 LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab
MD
1368 struct lttng_viewer_cmd cmd;
1369 struct lttng_viewer_new_streams_request rq;
1370 struct lttng_viewer_new_streams_response rp;
bb18709b
FD
1371 struct lttng_live_msg_iter *lttng_live_msg_iter =
1372 session->lttng_live_msg_iter;
1373 struct live_viewer_connection *viewer_connection =
1374 lttng_live_msg_iter->viewer_connection;
7cdc2bab 1375 uint32_t streams_count;
02b23da7
JR
1376 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1377 char cmd_buf[cmd_buf_len];
0b6986b4 1378 ssize_t ret_len;
7cdc2bab
MD
1379
1380 if (!session->new_streams_needed) {
bb18709b 1381 return LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab
MD
1382 }
1383
1384 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
1385 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1386 cmd.cmd_version = htobe32(0);
1387
1388 memset(&rq, 0, sizeof(rq));
1389 rq.session_id = htobe64(session->id);
1390
02b23da7
JR
1391 /*
1392 * Merge the cmd and connection request to prevent a write-write
1393 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1394 * second write to be performed quickly in presence of Nagle's algorithm.
1395 */
1396 memcpy(cmd_buf, &cmd, sizeof(cmd));
1397 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
02b23da7 1398 ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
a2147eb2 1399 if (ret_len == BT_SOCKET_ERROR) {
5278d5ba 1400 BT_COMP_LOGE("Error sending get_new_streams request: %s",
bb18709b 1401 bt_socket_errormsg());
7cdc2bab
MD
1402 goto error;
1403 }
7cdc2bab 1404
8b45963b 1405 BT_ASSERT(ret_len == cmd_buf_len);
4c66436f 1406 ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
7cdc2bab 1407 if (ret_len == 0) {
5278d5ba 1408 BT_COMP_LOGI("Remote side has closed connection");
7cdc2bab
MD
1409 goto error;
1410 }
a2147eb2 1411 if (ret_len == BT_SOCKET_ERROR) {
5278d5ba 1412 BT_COMP_LOGE("Error receiving get_new_streams response");
7cdc2bab
MD
1413 goto error;
1414 }
8b45963b 1415 BT_ASSERT(ret_len == sizeof(rp));
7cdc2bab
MD
1416
1417 streams_count = be32toh(rp.streams_count);
1418
1419 switch(be32toh(rp.status)) {
1420 case LTTNG_VIEWER_NEW_STREAMS_OK:
1421 session->new_streams_needed = false;
1422 break;
1423 case LTTNG_VIEWER_NEW_STREAMS_NO_NEW:
1424 session->new_streams_needed = false;
1425 goto end;
1426 case LTTNG_VIEWER_NEW_STREAMS_HUP:
1427 session->new_streams_needed = false;
1428 session->closed = true;
bb18709b 1429 status = LTTNG_LIVE_ITERATOR_STATUS_END;
7cdc2bab
MD
1430 goto end;
1431 case LTTNG_VIEWER_NEW_STREAMS_ERR:
5278d5ba 1432 BT_COMP_LOGE("get_new_streams error");
7cdc2bab
MD
1433 goto error;
1434 default:
5278d5ba 1435 BT_COMP_LOGE("Unknown return code %u", be32toh(rp.status));
7cdc2bab
MD
1436 goto error;
1437 }
1438
1439 if (receive_streams(session, streams_count)) {
1440 goto error;
1441 }
1442end:
1443 return status;
1444
1445error:
d73bb381 1446 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
bb18709b 1447 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
4c66436f 1448 } else {
bb18709b 1449 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
4c66436f 1450 }
7cdc2bab
MD
1451 return status;
1452}
1453
1454BT_HIDDEN
bb18709b
FD
1455struct live_viewer_connection *live_viewer_connection_create(
1456 const char *url, bool in_query,
ca06281d
PP
1457 struct lttng_live_msg_iter *lttng_live_msg_iter,
1458 bt_logging_level log_level)
7cdc2bab 1459{
bb18709b 1460 struct live_viewer_connection *viewer_connection;
7cdc2bab 1461
bb18709b 1462 viewer_connection = g_new0(struct live_viewer_connection, 1);
7cdc2bab 1463
ca06281d 1464 if (bt_socket_init(log_level) != 0) {
a2147eb2
MJ
1465 goto error;
1466 }
1467
ca06281d
PP
1468 viewer_connection->log_level = log_level;
1469
1470 if (lttng_live_msg_iter) {
1471 viewer_connection->self_comp = lttng_live_msg_iter->self_comp;
1472 }
1473
bb18709b 1474 bt_object_init_shared(&viewer_connection->obj, connection_release);
a2147eb2 1475 viewer_connection->control_sock = BT_INVALID_SOCKET;
7cdc2bab 1476 viewer_connection->port = -1;
bb18709b
FD
1477 viewer_connection->in_query = in_query;
1478 viewer_connection->lttng_live_msg_iter = lttng_live_msg_iter;
7cdc2bab
MD
1479 viewer_connection->url = g_string_new(url);
1480 if (!viewer_connection->url) {
1481 goto error;
1482 }
1483
5278d5ba 1484 BT_COMP_LOGI("Establishing connection to url \"%s\"...", url);
7cdc2bab
MD
1485 if (lttng_live_connect_viewer(viewer_connection)) {
1486 goto error_report;
1487 }
5278d5ba 1488 BT_COMP_LOGI("Connection to url \"%s\" is established", url);
7cdc2bab
MD
1489 return viewer_connection;
1490
1491error_report:
5278d5ba 1492 BT_COMP_LOGW("Failure to establish connection to url \"%s\"", url);
7cdc2bab 1493error:
487e99c4
PP
1494 if (viewer_connection) {
1495 live_viewer_connection_destroy(viewer_connection);
1496 }
1497
7cdc2bab
MD
1498 return NULL;
1499}
1500
1501BT_HIDDEN
bb18709b
FD
1502void live_viewer_connection_destroy(
1503 struct live_viewer_connection *viewer_connection)
7cdc2bab 1504{
5278d5ba 1505 BT_COMP_LOGI("Closing connection to url \"%s\"", viewer_connection->url->str);
7cdc2bab 1506 lttng_live_disconnect_viewer(viewer_connection);
487e99c4
PP
1507 if (viewer_connection->url) {
1508 g_string_free(viewer_connection->url, true);
1509 }
94b828f3 1510 if (viewer_connection->relay_hostname) {
bb18709b 1511 g_string_free(viewer_connection->relay_hostname, true);
94b828f3
MD
1512 }
1513 if (viewer_connection->target_hostname) {
bb18709b 1514 g_string_free(viewer_connection->target_hostname, true);
94b828f3
MD
1515 }
1516 if (viewer_connection->session_name) {
bb18709b 1517 g_string_free(viewer_connection->session_name, true);
94b828f3 1518 }
7cdc2bab 1519 g_free(viewer_connection);
a2147eb2
MJ
1520
1521 bt_socket_fini();
7cdc2bab 1522}
This page took 0.133062 seconds and 4 git commands to generate.