lib: lib-logging.c: `Babeltrace library` -> `libbabeltrace2`
[babeltrace.git] / src / plugins / ctf / lttng-live / viewer-connection.c
CommitLineData
7cdc2bab 1/*
14f28187 2 * Copyright 2019 - Francis Deslauriers <francis.deslauriers@efficios.com>
7cdc2bab
MD
3 * Copyright 2016 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to deal
7 * in the Software without restriction, including without limitation the rights
8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 * copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 * SOFTWARE.
22 */
23
2ece7dd0 24#define BT_COMP_LOG_SELF_COMP (viewer_connection->self_comp)
c01594de 25#define BT_LOG_OUTPUT_LEVEL (viewer_connection->log_level)
350ad6c1 26#define BT_LOG_TAG "PLUGIN/SRC.CTF.LTTNG-LIVE/VIEWER"
d9c39b0a 27#include "logging/comp-logging.h"
020bc26f 28
7cdc2bab
MD
29#include <stdio.h>
30#include <stdint.h>
31#include <stdlib.h>
32#include <stdbool.h>
33#include <unistd.h>
34#include <glib.h>
35#include <inttypes.h>
7cdc2bab 36#include <sys/types.h>
7cdc2bab 37#include <fcntl.h>
7cdc2bab 38
578e048b
MJ
39#include "compat/socket.h"
40#include "compat/endian.h"
41#include "compat/compiler.h"
42#include "common/common.h"
3fadfbc0 43#include <babeltrace2/babeltrace.h>
7cdc2bab 44
14f28187 45#include "lttng-live.h"
7cdc2bab
MD
46#include "viewer-connection.h"
47#include "lttng-viewer-abi.h"
48#include "data-stream.h"
49#include "metadata.h"
50
14f28187
FD
51static
52ssize_t lttng_live_recv(struct live_viewer_connection *viewer_connection,
4c66436f 53 void *buf, size_t len)
7cdc2bab
MD
54{
55 ssize_t ret;
56 size_t copied = 0, to_copy = len;
14f28187
FD
57 struct lttng_live_msg_iter *lttng_live_msg_iter =
58 viewer_connection->lttng_live_msg_iter;
1cb3cdd7 59 BT_SOCKET sock = viewer_connection->control_sock;
7cdc2bab
MD
60
61 do {
1cb3cdd7 62 ret = bt_socket_recv(sock, buf + copied, to_copy, 0);
7cdc2bab 63 if (ret > 0) {
f6ccaed9 64 BT_ASSERT(ret <= to_copy);
7cdc2bab
MD
65 copied += ret;
66 to_copy -= ret;
67 }
1cb3cdd7 68 if (ret == BT_SOCKET_ERROR && bt_socket_interrupted()) {
14f28187 69 if (!viewer_connection->in_query &&
9b4f9b42 70 lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
4c66436f
MD
71 break;
72 } else {
73 continue;
74 }
75 }
76 } while (ret > 0 && to_copy > 0);
710d900e
FD
77
78 if (ret > 0) {
7cdc2bab 79 ret = copied;
710d900e
FD
80 }
81
1cb3cdd7 82 /* ret = 0 means orderly shutdown, ret == BT_SOCKET_ERROR is error. */
7cdc2bab
MD
83 return ret;
84}
85
14f28187
FD
86static
87ssize_t lttng_live_send(struct live_viewer_connection *viewer_connection,
4c66436f 88 const void *buf, size_t len)
7cdc2bab 89{
14f28187
FD
90 struct lttng_live_msg_iter *lttng_live_msg_iter =
91 viewer_connection->lttng_live_msg_iter;
1cb3cdd7 92 BT_SOCKET sock = viewer_connection->control_sock;
7cdc2bab
MD
93 ssize_t ret;
94
4c66436f 95 for (;;) {
1cb3cdd7
MJ
96 ret = bt_socket_send_nosigpipe(sock, buf, len);
97 if (ret == BT_SOCKET_ERROR && bt_socket_interrupted()) {
14f28187 98 if (!viewer_connection->in_query &&
9b4f9b42 99 lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
4c66436f
MD
100 break;
101 } else {
102 continue;
103 }
104 } else {
105 break;
106 }
107 }
7cdc2bab
MD
108 return ret;
109}
110
14f28187
FD
111static
112int parse_url(struct live_viewer_connection *viewer_connection)
7cdc2bab 113{
94b828f3 114 char error_buf[256] = { 0 };
1419db2b
FD
115 bt_self_component *self_comp = viewer_connection->self_comp;
116 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
94b828f3
MD
117 struct bt_common_lttng_live_url_parts lttng_live_url_parts = { 0 };
118 int ret = -1;
7cdc2bab 119 const char *path = viewer_connection->url->str;
7cdc2bab
MD
120
121 if (!path) {
122 goto end;
123 }
7cdc2bab 124
0f1979c3
FD
125 lttng_live_url_parts = bt_common_parse_lttng_live_url(path, error_buf,
126 sizeof(error_buf));
94b828f3 127 if (!lttng_live_url_parts.proto) {
1419db2b
FD
128 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
129 self_comp_class,"Invalid LTTng live URL format: %s",
130 error_buf);
7cdc2bab
MD
131 goto end;
132 }
7cdc2bab 133
0f1979c3 134 viewer_connection->relay_hostname = lttng_live_url_parts.hostname;
94b828f3
MD
135 lttng_live_url_parts.hostname = NULL;
136
137 if (lttng_live_url_parts.port >= 0) {
138 viewer_connection->port = lttng_live_url_parts.port;
139 } else {
7cdc2bab
MD
140 viewer_connection->port = LTTNG_DEFAULT_NETWORK_VIEWER_PORT;
141 }
142
0f1979c3 143 viewer_connection->target_hostname = lttng_live_url_parts.target_hostname;
94b828f3
MD
144 lttng_live_url_parts.target_hostname = NULL;
145
146 if (lttng_live_url_parts.session_name) {
0f1979c3 147 viewer_connection->session_name = lttng_live_url_parts.session_name;
94b828f3 148 lttng_live_url_parts.session_name = NULL;
7cdc2bab
MD
149 }
150
2ece7dd0 151 BT_COMP_LOGI("Connecting to hostname : %s, port : %d, "
0f1979c3
FD
152 "target hostname : %s, session name : %s, proto : %s",
153 viewer_connection->relay_hostname->str,
154 viewer_connection->port,
155 !viewer_connection->target_hostname ?
156 "<none>" : viewer_connection->target_hostname->str,
157 !viewer_connection->session_name ?
158 "<none>" : viewer_connection->session_name->str,
159 lttng_live_url_parts.proto->str);
7cdc2bab
MD
160 ret = 0;
161
162end:
94b828f3 163 bt_common_destroy_lttng_live_url_parts(&lttng_live_url_parts);
7cdc2bab
MD
164 return ret;
165}
166
14f28187
FD
167static
168int lttng_live_handshake(struct live_viewer_connection *viewer_connection)
7cdc2bab
MD
169{
170 struct lttng_viewer_cmd cmd;
171 struct lttng_viewer_connect connect;
1419db2b
FD
172 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
173 bt_self_component *self_comp = viewer_connection->self_comp;
bdcbd52e
JR
174 const size_t cmd_buf_len = sizeof(cmd) + sizeof(connect);
175 char cmd_buf[cmd_buf_len];
7cdc2bab
MD
176 int ret;
177 ssize_t ret_len;
178
179 cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
180 cmd.data_size = htobe64((uint64_t) sizeof(connect));
181 cmd.cmd_version = htobe32(0);
182
183 connect.viewer_session_id = -1ULL; /* will be set on recv */
184 connect.major = htobe32(LTTNG_LIVE_MAJOR);
185 connect.minor = htobe32(LTTNG_LIVE_MINOR);
186 connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
187
bdcbd52e
JR
188 /*
189 * Merge the cmd and connection request to prevent a write-write
190 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
191 * second write to be performed quickly in presence of Nagle's algorithm
192 */
193 memcpy(cmd_buf, &cmd, sizeof(cmd));
194 memcpy(cmd_buf + sizeof(cmd), &connect, sizeof(connect));
bdcbd52e 195 ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1cb3cdd7 196 if (ret_len == BT_SOCKET_ERROR) {
1419db2b
FD
197 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
198 self_comp_class, "Error sending version: %s",
199 bt_socket_errormsg());
7cdc2bab
MD
200 goto error;
201 }
f6ccaed9
PP
202
203 BT_ASSERT(ret_len == cmd_buf_len);
7cdc2bab 204
4c66436f 205 ret_len = lttng_live_recv(viewer_connection, &connect, sizeof(connect));
7cdc2bab 206 if (ret_len == 0) {
1419db2b
FD
207 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
208 self_comp_class, "Remote side has closed connection");
7cdc2bab
MD
209 goto error;
210 }
1cb3cdd7 211 if (ret_len == BT_SOCKET_ERROR) {
1419db2b
FD
212 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
213 self_comp_class, "Error receiving version: %s",
214 bt_socket_errormsg());
7cdc2bab
MD
215 goto error;
216 }
f6ccaed9 217 BT_ASSERT(ret_len == sizeof(connect));
7cdc2bab 218
2ece7dd0 219 BT_COMP_LOGI("Received viewer session ID : %" PRIu64,
83c78c35 220 (uint64_t) be64toh(connect.viewer_session_id));
2ece7dd0 221 BT_COMP_LOGI("Relayd version : %u.%u", be32toh(connect.major),
7cdc2bab
MD
222 be32toh(connect.minor));
223
224 if (LTTNG_LIVE_MAJOR != be32toh(connect.major)) {
1419db2b
FD
225 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
226 self_comp_class, "Incompatible lttng-relayd protocol");
7cdc2bab
MD
227 goto error;
228 }
229 /* Use the smallest protocol version implemented. */
230 if (LTTNG_LIVE_MINOR > be32toh(connect.minor)) {
231 viewer_connection->minor = be32toh(connect.minor);
232 } else {
233 viewer_connection->minor = LTTNG_LIVE_MINOR;
234 }
235 viewer_connection->major = LTTNG_LIVE_MAJOR;
236 ret = 0;
237 return ret;
238
239error:
7cdc2bab
MD
240 return -1;
241}
242
14f28187
FD
243static
244int lttng_live_connect_viewer(struct live_viewer_connection *viewer_connection)
7cdc2bab
MD
245{
246 struct hostent *host;
247 struct sockaddr_in server_addr;
1419db2b
FD
248 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
249 bt_self_component *self_comp = viewer_connection->self_comp;
7cdc2bab
MD
250 int ret;
251
252 if (parse_url(viewer_connection)) {
1419db2b
FD
253 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
254 self_comp_class, "Failed to parse URL");
7cdc2bab
MD
255 goto error;
256 }
257
94b828f3 258 host = gethostbyname(viewer_connection->relay_hostname->str);
7cdc2bab 259 if (!host) {
1419db2b
FD
260 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
261 self_comp_class, "Cannot lookup hostname: hostname=\"%s\"",
94b828f3 262 viewer_connection->relay_hostname->str);
7cdc2bab
MD
263 goto error;
264 }
265
1cb3cdd7 266 if ((viewer_connection->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == BT_INVALID_SOCKET) {
1419db2b
FD
267 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
268 self_comp_class, "Socket creation failed: %s", bt_socket_errormsg());
7cdc2bab
MD
269 goto error;
270 }
271
272 server_addr.sin_family = AF_INET;
273 server_addr.sin_port = htons(viewer_connection->port);
274 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
275 memset(&(server_addr.sin_zero), 0, 8);
276
277 if (connect(viewer_connection->control_sock, (struct sockaddr *) &server_addr,
1cb3cdd7 278 sizeof(struct sockaddr)) == BT_SOCKET_ERROR) {
1419db2b
FD
279 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
280 self_comp_class, "Connection failed: %s",
281 bt_socket_errormsg());
7cdc2bab
MD
282 goto error;
283 }
284 if (lttng_live_handshake(viewer_connection)) {
1419db2b
FD
285 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
286 self_comp_class, "Viewer handshake failed");
7cdc2bab
MD
287 goto error;
288 }
289
290 ret = 0;
291
292 return ret;
293
294error:
1cb3cdd7
MJ
295 if (viewer_connection->control_sock != BT_INVALID_SOCKET) {
296 if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) {
1419db2b
FD
297 BT_COMP_OR_COMP_CLASS_LOGE(self_comp, self_comp_class,
298 "Error closing socket: %s", bt_socket_errormsg());
7cdc2bab
MD
299 }
300 }
1cb3cdd7 301 viewer_connection->control_sock = BT_INVALID_SOCKET;
7cdc2bab
MD
302 return -1;
303}
304
14f28187
FD
305static
306void lttng_live_disconnect_viewer(
307 struct live_viewer_connection *viewer_connection)
7cdc2bab 308{
1cb3cdd7 309 if (viewer_connection->control_sock == BT_INVALID_SOCKET) {
7cdc2bab
MD
310 return;
311 }
1cb3cdd7 312 if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) {
1419db2b
FD
313 BT_COMP_LOGE("Error closing socket: %s",
314 bt_socket_errormsg());
1cb3cdd7 315 viewer_connection->control_sock = BT_INVALID_SOCKET;
7cdc2bab
MD
316 }
317}
318
14f28187
FD
319static
320void connection_release(bt_object *obj)
7cdc2bab 321{
14f28187
FD
322 struct live_viewer_connection *conn =
323 container_of(obj, struct live_viewer_connection, obj);
7cdc2bab 324
14f28187 325 live_viewer_connection_destroy(conn);
7cdc2bab
MD
326}
327
328static
14f28187 329int list_update_session(bt_value *results,
7cdc2bab 330 const struct lttng_viewer_session *session,
c01594de 331 bool *_found, struct live_viewer_connection *viewer_connection)
7cdc2bab 332{
1419db2b
FD
333 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
334 bt_self_component *self_comp = viewer_connection->self_comp;
f80e9ec1
FD
335 int ret = 0;
336 uint64_t i, len;
b19ff26f
PP
337 bt_value *map = NULL;
338 bt_value *hostname = NULL;
339 bt_value *session_name = NULL;
340 bt_value *btval = NULL;
7cdc2bab
MD
341 bool found = false;
342
393729a6 343 len = bt_value_array_get_length(results);
7cdc2bab
MD
344 for (i = 0; i < len; i++) {
345 const char *hostname_str = NULL;
346 const char *session_name_str = NULL;
347
f80e9ec1 348 map = bt_value_array_borrow_element_by_index(results, i);
14f28187 349 hostname = bt_value_map_borrow_entry_value(map, "target-hostname");
7cdc2bab 350 if (!hostname) {
1419db2b
FD
351 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
352 self_comp_class,
353 "Error borrowing \"target-hostname\" entry.");
14f28187 354 ret = -1;
7cdc2bab
MD
355 goto end;
356 }
14f28187 357 session_name = bt_value_map_borrow_entry_value(map, "session-name");
7cdc2bab 358 if (!session_name) {
1419db2b
FD
359 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
360 self_comp_class,
361 "Error borrowing \"session-name\" entry.");
14f28187 362 ret = -1;
7cdc2bab
MD
363 goto end;
364 }
601b0d3c
PP
365 hostname_str = bt_value_string_get(hostname);
366 session_name_str = bt_value_string_get(session_name);
7cdc2bab 367
2242b43d
PP
368 if (strcmp(session->hostname, hostname_str) == 0
369 && strcmp(session->session_name, session_name_str) == 0) {
7cdc2bab
MD
370 int64_t val;
371 uint32_t streams = be32toh(session->streams);
372 uint32_t clients = be32toh(session->clients);
373
374 found = true;
375
14f28187 376 btval = bt_value_map_borrow_entry_value(map, "stream-count");
7cdc2bab 377 if (!btval) {
1419db2b
FD
378 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(
379 self_comp, self_comp_class,
380 "Error borrowing \"stream-count\" entry.");
14f28187 381 ret = -1;
7cdc2bab
MD
382 goto end;
383 }
a91cb83e 384 val = bt_value_integer_unsigned_get(btval);
7cdc2bab
MD
385 /* sum */
386 val += streams;
a91cb83e 387 bt_value_integer_unsigned_set(btval, val);
7cdc2bab 388
14f28187 389 btval = bt_value_map_borrow_entry_value(map, "client-count");
7cdc2bab 390 if (!btval) {
1419db2b
FD
391 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(
392 self_comp, self_comp_class,
393 "Error borrowing \"client-count\" entry.");
14f28187 394 ret = -1;
7cdc2bab
MD
395 goto end;
396 }
a91cb83e 397 val = bt_value_integer_unsigned_get(btval);
7cdc2bab 398 /* max */
91d81473 399 val = bt_max_t(int64_t, clients, val);
a91cb83e 400 bt_value_integer_unsigned_set(btval, val);
7cdc2bab
MD
401 }
402
7cdc2bab
MD
403 if (found) {
404 break;
405 }
406 }
407end:
7cdc2bab
MD
408 *_found = found;
409 return ret;
410}
411
412static
14f28187 413int list_append_session(bt_value *results,
7cdc2bab 414 GString *base_url,
c01594de
PP
415 const struct lttng_viewer_session *session,
416 struct live_viewer_connection *viewer_connection)
7cdc2bab 417{
14f28187 418 int ret = 0;
1419db2b 419 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
d24d5663
PP
420 bt_value_map_insert_entry_status insert_status;
421 bt_value_array_append_element_status append_status;
b19ff26f 422 bt_value *map = NULL;
7cdc2bab
MD
423 GString *url = NULL;
424 bool found = false;
425
426 /*
427 * If the session already exists, add the stream count to it,
428 * and do max of client counts.
429 */
c01594de 430 ret = list_update_session(results, session, &found, viewer_connection);
14f28187 431 if (ret || found) {
7cdc2bab
MD
432 goto end;
433 }
434
14f28187 435 map = bt_value_map_create();
7cdc2bab 436 if (!map) {
1419db2b
FD
437 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
438 "Error creating map value.");
14f28187 439 ret = -1;
7cdc2bab
MD
440 goto end;
441 }
442
443 if (base_url->len < 1) {
1419db2b
FD
444 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
445 "Error: base_url length smaller than 1.");
14f28187 446 ret = -1;
7cdc2bab
MD
447 goto end;
448 }
449 /*
450 * key = "url",
451 * value = <string>,
452 */
453 url = g_string_new(base_url->str);
454 g_string_append(url, "/host/");
455 g_string_append(url, session->hostname);
456 g_string_append_c(url, '/');
457 g_string_append(url, session->session_name);
458
d24d5663
PP
459 insert_status = bt_value_map_insert_string_entry(map, "url", url->str);
460 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
1419db2b
FD
461 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
462 "Error inserting \"url\" entry.");
14f28187 463 ret = -1;
7cdc2bab
MD
464 goto end;
465 }
466
467 /*
468 * key = "target-hostname",
469 * value = <string>,
470 */
d24d5663 471 insert_status = bt_value_map_insert_string_entry(map, "target-hostname",
7cdc2bab 472 session->hostname);
d24d5663 473 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
1419db2b
FD
474 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
475 "Error inserting \"target-hostname\" entry.");
14f28187 476 ret = -1;
7cdc2bab
MD
477 goto end;
478 }
479
480 /*
481 * key = "session-name",
482 * value = <string>,
483 */
d24d5663 484 insert_status = bt_value_map_insert_string_entry(map, "session-name",
7cdc2bab 485 session->session_name);
d24d5663 486 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
1419db2b
FD
487 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
488 "Error inserting \"session-name\" entry.");
14f28187 489 ret = -1;
7cdc2bab
MD
490 goto end;
491 }
492
493 /*
494 * key = "timer-us",
495 * value = <integer>,
496 */
497 {
498 uint32_t live_timer = be32toh(session->live_timer);
499
a91cb83e 500 insert_status = bt_value_map_insert_unsigned_integer_entry(
fdd3a2da 501 map, "timer-us", live_timer);
d24d5663 502 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
1419db2b
FD
503 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
504 "Error inserting \"timer-us\" entry.");
14f28187 505 ret = -1;
7cdc2bab
MD
506 goto end;
507 }
508 }
509
510 /*
511 * key = "stream-count",
512 * value = <integer>,
513 */
514 {
515 uint32_t streams = be32toh(session->streams);
516
a91cb83e 517 insert_status = bt_value_map_insert_unsigned_integer_entry(map,
fdd3a2da 518 "stream-count", streams);
d24d5663 519 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
1419db2b
FD
520 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
521 "Error inserting \"stream-count\" entry.");
14f28187 522 ret = -1;
7cdc2bab
MD
523 goto end;
524 }
525 }
526
7cdc2bab
MD
527 /*
528 * key = "client-count",
529 * value = <integer>,
530 */
531 {
532 uint32_t clients = be32toh(session->clients);
533
a91cb83e 534 insert_status = bt_value_map_insert_unsigned_integer_entry(map,
fdd3a2da 535 "client-count", clients);
d24d5663 536 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
1419db2b
FD
537 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
538 "Error inserting \"client-count\" entry.");
14f28187 539 ret = -1;
7cdc2bab
MD
540 goto end;
541 }
542 }
543
d24d5663
PP
544 append_status = bt_value_array_append_element(results, map);
545 if (append_status != BT_VALUE_ARRAY_APPEND_ELEMENT_STATUS_OK) {
1419db2b
FD
546 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
547 "Error appending map to results.");
14f28187
FD
548 ret = -1;
549 }
550
7cdc2bab
MD
551end:
552 if (url) {
14f28187 553 g_string_free(url, true);
7cdc2bab 554 }
c5b9b441 555 BT_VALUE_PUT_REF_AND_RESET(map);
7cdc2bab
MD
556 return ret;
557}
558
559/*
560 * Data structure returned:
561 *
562 * {
563 * <array> = {
564 * [n] = {
565 * <map> = {
566 * {
567 * key = "url",
568 * value = <string>,
569 * },
570 * {
571 * key = "target-hostname",
572 * value = <string>,
573 * },
574 * {
575 * key = "session-name",
576 * value = <string>,
577 * },
578 * {
579 * key = "timer-us",
580 * value = <integer>,
581 * },
582 * {
583 * key = "stream-count",
584 * value = <integer>,
585 * },
586 * {
587 * key = "client-count",
588 * value = <integer>,
589 * },
590 * },
591 * }
592 * }
593 */
594
595BT_HIDDEN
d24d5663 596bt_component_class_query_method_status live_viewer_connection_list_sessions(
14f28187
FD
597 struct live_viewer_connection *viewer_connection,
598 const bt_value **user_result)
7cdc2bab 599{
1419db2b 600 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
d24d5663
PP
601 bt_component_class_query_method_status status =
602 BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
14f28187 603 bt_value *result = NULL;
7cdc2bab
MD
604 struct lttng_viewer_cmd cmd;
605 struct lttng_viewer_list_sessions list;
606 uint32_t i, sessions_count;
607 ssize_t ret_len;
608
14f28187
FD
609 result = bt_value_array_create();
610 if (!result) {
1419db2b
FD
611 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
612 "Error creating array");
d24d5663 613 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
7cdc2bab
MD
614 goto error;
615 }
616
617 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
618 cmd.data_size = htobe64((uint64_t) 0);
619 cmd.cmd_version = htobe32(0);
620
4c66436f 621 ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
1cb3cdd7 622 if (ret_len == BT_SOCKET_ERROR) {
1419db2b
FD
623 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
624 "Error sending cmd: %s", bt_socket_errormsg());
d24d5663 625 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
7cdc2bab
MD
626 goto error;
627 }
f6ccaed9 628 BT_ASSERT(ret_len == sizeof(cmd));
7cdc2bab 629
4c66436f 630 ret_len = lttng_live_recv(viewer_connection, &list, sizeof(list));
7cdc2bab 631 if (ret_len == 0) {
1419db2b
FD
632 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
633 "Remote side has closed connection");
d24d5663 634 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
7cdc2bab
MD
635 goto error;
636 }
1cb3cdd7 637 if (ret_len == BT_SOCKET_ERROR) {
1419db2b
FD
638 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
639 "Error receiving session list: %s",
640 bt_socket_errormsg());
d24d5663 641 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
7cdc2bab
MD
642 goto error;
643 }
f6ccaed9 644 BT_ASSERT(ret_len == sizeof(list));
7cdc2bab
MD
645
646 sessions_count = be32toh(list.sessions_count);
647 for (i = 0; i < sessions_count; i++) {
648 struct lttng_viewer_session lsession;
649
14f28187
FD
650 ret_len = lttng_live_recv(viewer_connection, &lsession,
651 sizeof(lsession));
7cdc2bab 652 if (ret_len == 0) {
1419db2b
FD
653 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
654 "Remote side has closed connection");
d24d5663 655 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
7cdc2bab
MD
656 goto error;
657 }
1cb3cdd7 658 if (ret_len == BT_SOCKET_ERROR) {
1419db2b
FD
659 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
660 "Error receiving session: %s",
661 bt_socket_errormsg());
d24d5663 662 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
7cdc2bab
MD
663 goto error;
664 }
f6ccaed9 665 BT_ASSERT(ret_len == sizeof(lsession));
7cdc2bab
MD
666 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
667 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
14f28187 668 if (list_append_session(result, viewer_connection->url,
c01594de 669 &lsession, viewer_connection)) {
1419db2b
FD
670 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
671 "Error appending session");
d24d5663 672 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
7cdc2bab
MD
673 goto error;
674 }
675 }
14f28187
FD
676
677 *user_result = result;
7cdc2bab
MD
678 goto end;
679error:
14f28187 680 BT_VALUE_PUT_REF_AND_RESET(result);
7cdc2bab 681end:
14f28187 682 return status;
7cdc2bab
MD
683}
684
685static
14f28187 686int lttng_live_query_session_ids(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab
MD
687{
688 struct lttng_viewer_cmd cmd;
689 struct lttng_viewer_list_sessions list;
690 struct lttng_viewer_session lsession;
691 uint32_t i, sessions_count;
692 ssize_t ret_len;
693 uint64_t session_id;
14f28187 694 struct live_viewer_connection *viewer_connection =
1419db2b
FD
695 lttng_live_msg_iter->viewer_connection;
696 bt_self_component *self_comp = viewer_connection->self_comp;
7cdc2bab
MD
697
698 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
699 cmd.data_size = htobe64((uint64_t) 0);
700 cmd.cmd_version = htobe32(0);
701
4c66436f 702 ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
1cb3cdd7 703 if (ret_len == BT_SOCKET_ERROR) {
1419db2b
FD
704 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error sending cmd: %s",
705 bt_socket_errormsg());
7cdc2bab
MD
706 goto error;
707 }
f6ccaed9 708 BT_ASSERT(ret_len == sizeof(cmd));
7cdc2bab 709
4c66436f 710 ret_len = lttng_live_recv(viewer_connection, &list, sizeof(list));
7cdc2bab 711 if (ret_len == 0) {
1419db2b
FD
712 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
713 "Remote side has closed connection");
7cdc2bab
MD
714 goto error;
715 }
1cb3cdd7 716 if (ret_len == BT_SOCKET_ERROR) {
1419db2b
FD
717 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
718 "Error receiving session list: %s",
719 bt_socket_errormsg());
7cdc2bab
MD
720 goto error;
721 }
f6ccaed9 722 BT_ASSERT(ret_len == sizeof(list));
7cdc2bab
MD
723
724 sessions_count = be32toh(list.sessions_count);
725 for (i = 0; i < sessions_count; i++) {
4c66436f 726 ret_len = lttng_live_recv(viewer_connection,
7cdc2bab
MD
727 &lsession, sizeof(lsession));
728 if (ret_len == 0) {
1419db2b
FD
729 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
730 "Remote side has closed connection");
7cdc2bab
MD
731 goto error;
732 }
1cb3cdd7 733 if (ret_len == BT_SOCKET_ERROR) {
1419db2b
FD
734 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
735 "Error receiving session: %s",
736 bt_socket_errormsg());
7cdc2bab
MD
737 goto error;
738 }
f6ccaed9 739 BT_ASSERT(ret_len == sizeof(lsession));
7cdc2bab
MD
740 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
741 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
742 session_id = be64toh(lsession.id);
743
2ece7dd0 744 BT_COMP_LOGI("Adding session %" PRIu64 " hostname: %s session_name: %s",
06994c71
MD
745 session_id, lsession.hostname, lsession.session_name);
746
7cdc2bab 747 if ((strncmp(lsession.session_name,
94b828f3 748 viewer_connection->session_name->str,
1cb3cdd7 749 LTTNG_VIEWER_NAME_MAX) == 0) && (strncmp(lsession.hostname,
94b828f3 750 viewer_connection->target_hostname->str,
1cb3cdd7 751 LTTNG_VIEWER_HOST_NAME_MAX) == 0)) {
14f28187 752 if (lttng_live_add_session(lttng_live_msg_iter, session_id,
06994c71
MD
753 lsession.hostname,
754 lsession.session_name)) {
1419db2b
FD
755 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
756 "Failed to add live session");
7cdc2bab
MD
757 goto error;
758 }
759 }
760 }
761
762 return 0;
763
764error:
7cdc2bab
MD
765 return -1;
766}
767
768BT_HIDDEN
14f28187
FD
769int lttng_live_create_viewer_session(
770 struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab
MD
771{
772 struct lttng_viewer_cmd cmd;
773 struct lttng_viewer_create_session_response resp;
774 ssize_t ret_len;
14f28187 775 struct live_viewer_connection *viewer_connection =
1419db2b
FD
776 lttng_live_msg_iter->viewer_connection;
777 bt_self_component *self_comp = viewer_connection->self_comp;
7cdc2bab
MD
778
779 cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
780 cmd.data_size = htobe64((uint64_t) 0);
781 cmd.cmd_version = htobe32(0);
782
4c66436f 783 ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
1cb3cdd7 784 if (ret_len == BT_SOCKET_ERROR) {
1419db2b
FD
785 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error sending cmd: %s",
786 bt_socket_errormsg());
7cdc2bab
MD
787 goto error;
788 }
f6ccaed9 789 BT_ASSERT(ret_len == sizeof(cmd));
7cdc2bab 790
4c66436f 791 ret_len = lttng_live_recv(viewer_connection, &resp, sizeof(resp));
7cdc2bab 792 if (ret_len == 0) {
1419db2b
FD
793 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
794 "Remote side has closed connection");
7cdc2bab
MD
795 goto error;
796 }
1cb3cdd7 797 if (ret_len == BT_SOCKET_ERROR) {
1419db2b
FD
798 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
799 "Error receiving create session reply: %s",
800 bt_socket_errormsg());
7cdc2bab
MD
801 goto error;
802 }
f6ccaed9 803 BT_ASSERT(ret_len == sizeof(resp));
7cdc2bab
MD
804
805 if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
1419db2b
FD
806 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
807 "Error creating viewer session");
7cdc2bab
MD
808 goto error;
809 }
14f28187 810 if (lttng_live_query_session_ids(lttng_live_msg_iter)) {
1419db2b
FD
811 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
812 "Failed to query live viewer session ids");
7cdc2bab
MD
813 goto error;
814 }
815
816 return 0;
817
818error:
819 return -1;
820}
821
822static
823int receive_streams(struct lttng_live_session *session,
824 uint32_t stream_count)
825{
826 ssize_t ret_len;
827 uint32_t i;
14f28187 828 struct lttng_live_msg_iter *lttng_live_msg_iter =
0f1979c3 829 session->lttng_live_msg_iter;
14f28187 830 struct live_viewer_connection *viewer_connection =
0f1979c3 831 lttng_live_msg_iter->viewer_connection;
1419db2b 832 bt_self_component *self_comp = viewer_connection->self_comp;
7cdc2bab 833
2ece7dd0 834 BT_COMP_LOGI("Getting %" PRIu32 " new streams:", stream_count);
7cdc2bab
MD
835 for (i = 0; i < stream_count; i++) {
836 struct lttng_viewer_stream stream;
837 struct lttng_live_stream_iterator *live_stream;
838 uint64_t stream_id;
839 uint64_t ctf_trace_id;
840
4c66436f 841 ret_len = lttng_live_recv(viewer_connection, &stream, sizeof(stream));
7cdc2bab 842 if (ret_len == 0) {
1419db2b
FD
843 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
844 "Remote side has closed connection");
7cdc2bab
MD
845 goto error;
846 }
1cb3cdd7 847 if (ret_len == BT_SOCKET_ERROR) {
1419db2b
FD
848 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
849 "Error receiving stream");
7cdc2bab
MD
850 goto error;
851 }
f6ccaed9 852 BT_ASSERT(ret_len == sizeof(stream));
7cdc2bab
MD
853 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
854 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
855 stream_id = be64toh(stream.id);
856 ctf_trace_id = be64toh(stream.ctf_trace_id);
857
858 if (stream.metadata_flag) {
2ece7dd0 859 BT_COMP_LOGI(" metadata stream %" PRIu64 " : %s/%s",
0f1979c3 860 stream_id, stream.path_name, stream.channel_name);
7cdc2bab 861 if (lttng_live_metadata_create_stream(session,
06994c71
MD
862 ctf_trace_id, stream_id,
863 stream.path_name)) {
1419db2b
FD
864 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
865 "Error creating metadata stream");
7cdc2bab
MD
866
867 goto error;
868 }
d6e69534 869 session->lazy_stream_msg_init = true;
7cdc2bab 870 } else {
2ece7dd0 871 BT_COMP_LOGI(" stream %" PRIu64 " : %s/%s",
0f1979c3 872 stream_id, stream.path_name, stream.channel_name);
7cdc2bab
MD
873 live_stream = lttng_live_stream_iterator_create(session,
874 ctf_trace_id, stream_id);
875 if (!live_stream) {
1419db2b
FD
876 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
877 "Error creating stream");
7cdc2bab
MD
878 goto error;
879 }
880 }
881 }
882 return 0;
883
884error:
885 return -1;
886}
887
888BT_HIDDEN
eee8e741
FD
889enum lttng_live_attach_session_status lttng_live_attach_session(
890 struct lttng_live_session *session)
7cdc2bab
MD
891{
892 struct lttng_viewer_cmd cmd;
eee8e741 893 enum lttng_live_attach_session_status attach_status;
7cdc2bab
MD
894 struct lttng_viewer_attach_session_request rq;
895 struct lttng_viewer_attach_session_response rp;
0f1979c3
FD
896 struct lttng_live_msg_iter *lttng_live_msg_iter =
897 session->lttng_live_msg_iter;
14f28187 898 struct live_viewer_connection *viewer_connection =
0f1979c3 899 lttng_live_msg_iter->viewer_connection;
1419db2b 900 bt_self_component *self_comp = viewer_connection->self_comp;
7cdc2bab
MD
901 uint64_t session_id = session->id;
902 uint32_t streams_count;
bdcbd52e
JR
903 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
904 char cmd_buf[cmd_buf_len];
0f1979c3 905 ssize_t ret_len;
7cdc2bab 906
7cdc2bab
MD
907 cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
908 cmd.data_size = htobe64((uint64_t) sizeof(rq));
909 cmd.cmd_version = htobe32(0);
910
911 memset(&rq, 0, sizeof(rq));
912 rq.session_id = htobe64(session_id);
913 // TODO: add cmd line parameter to select seek beginning
914 // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
915 rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
916
bdcbd52e
JR
917 /*
918 * Merge the cmd and connection request to prevent a write-write
919 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
920 * second write to be performed quickly in presence of Nagle's algorithm.
921 */
922 memcpy(cmd_buf, &cmd, sizeof(cmd));
923 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
bdcbd52e 924 ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1cb3cdd7 925 if (ret_len == BT_SOCKET_ERROR) {
1419db2b
FD
926 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
927 "Error sending attach request: %s",
928 bt_socket_errormsg());
7cdc2bab
MD
929 goto error;
930 }
7cdc2bab 931
f6ccaed9 932 BT_ASSERT(ret_len == cmd_buf_len);
4c66436f 933 ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
7cdc2bab 934 if (ret_len == 0) {
1419db2b
FD
935 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
936 "Remote side has closed connection");
7cdc2bab
MD
937 goto error;
938 }
1cb3cdd7 939 if (ret_len == BT_SOCKET_ERROR) {
1419db2b
FD
940 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
941 "Error receiving attach response: %s",
942 bt_socket_errormsg());
7cdc2bab
MD
943 goto error;
944 }
f6ccaed9 945 BT_ASSERT(ret_len == sizeof(rp));
7cdc2bab
MD
946
947 streams_count = be32toh(rp.streams_count);
948 switch(be32toh(rp.status)) {
949 case LTTNG_VIEWER_ATTACH_OK:
950 break;
951 case LTTNG_VIEWER_ATTACH_UNK:
1419db2b
FD
952 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
953 "Session id %" PRIu64 " is unknown", session_id);
7cdc2bab
MD
954 goto error;
955 case LTTNG_VIEWER_ATTACH_ALREADY:
1419db2b
FD
956 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
957 "There is already a viewer attached to this session");
7cdc2bab
MD
958 goto error;
959 case LTTNG_VIEWER_ATTACH_NOT_LIVE:
1419db2b 960 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Not a live session");
7cdc2bab
MD
961 goto error;
962 case LTTNG_VIEWER_ATTACH_SEEK_ERR:
1419db2b 963 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Wrong seek parameter");
7cdc2bab
MD
964 goto error;
965 default:
1419db2b
FD
966 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
967 "Unknown attach return code %u", be32toh(rp.status));
7cdc2bab
MD
968 goto error;
969 }
970
971 /* We receive the initial list of streams. */
972 if (receive_streams(session, streams_count)) {
1419db2b 973 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error receiving streams");
7cdc2bab
MD
974 goto error;
975 }
976
977 session->attached = true;
978 session->new_streams_needed = false;
979
eee8e741
FD
980 attach_status = LTTNG_LIVE_ATTACH_SESSION_STATUS_OK;
981 goto end;
7cdc2bab
MD
982
983error:
eee8e741
FD
984 attach_status = LTTNG_LIVE_ATTACH_SESSION_STATUS_ERROR;
985
986end:
987 return attach_status;
7cdc2bab
MD
988}
989
990BT_HIDDEN
991int lttng_live_detach_session(struct lttng_live_session *session)
992{
993 struct lttng_viewer_cmd cmd;
994 struct lttng_viewer_detach_session_request rq;
995 struct lttng_viewer_detach_session_response rp;
996 ssize_t ret_len;
0f1979c3
FD
997 struct lttng_live_msg_iter *lttng_live_msg_iter =
998 session->lttng_live_msg_iter;
14f28187 999 struct live_viewer_connection *viewer_connection =
0f1979c3 1000 lttng_live_msg_iter->viewer_connection;
7cdc2bab 1001 uint64_t session_id = session->id;
bdcbd52e
JR
1002 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1003 char cmd_buf[cmd_buf_len];
7cdc2bab
MD
1004
1005 if (!session->attached) {
1006 return 0;
1007 }
1008
1009 cmd.cmd = htobe32(LTTNG_VIEWER_DETACH_SESSION);
1010 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1011 cmd.cmd_version = htobe32(0);
1012
1013 memset(&rq, 0, sizeof(rq));
1014 rq.session_id = htobe64(session_id);
1015
bdcbd52e
JR
1016 /*
1017 * Merge the cmd and connection request to prevent a write-write
1018 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1019 * second write to be performed quickly in presence of Nagle's algorithm.
1020 */
1021 memcpy(cmd_buf, &cmd, sizeof(cmd));
1022 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
bdcbd52e 1023 ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1cb3cdd7 1024 if (ret_len == BT_SOCKET_ERROR) {
1419db2b
FD
1025 BT_COMP_LOGE("Error sending detach request: %s",
1026 bt_socket_errormsg());
7cdc2bab
MD
1027 goto error;
1028 }
7cdc2bab 1029
f6ccaed9 1030 BT_ASSERT(ret_len == cmd_buf_len);
4c66436f 1031 ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
7cdc2bab 1032 if (ret_len == 0) {
1419db2b 1033 BT_COMP_LOGE("Remote side has closed connection");
7cdc2bab
MD
1034 goto error;
1035 }
1cb3cdd7 1036 if (ret_len == BT_SOCKET_ERROR) {
1419db2b
FD
1037 BT_COMP_LOGE("Error receiving detach response: %s",
1038 bt_socket_errormsg());
7cdc2bab
MD
1039 goto error;
1040 }
f6ccaed9 1041 BT_ASSERT(ret_len == sizeof(rp));
7cdc2bab
MD
1042
1043 switch(be32toh(rp.status)) {
1044 case LTTNG_VIEWER_DETACH_SESSION_OK:
1045 break;
1046 case LTTNG_VIEWER_DETACH_SESSION_UNK:
2ece7dd0 1047 BT_COMP_LOGW("Session id %" PRIu64 " is unknown", session_id);
7cdc2bab
MD
1048 goto error;
1049 case LTTNG_VIEWER_DETACH_SESSION_ERR:
2ece7dd0 1050 BT_COMP_LOGW("Error detaching session id %" PRIu64 "", session_id);
7cdc2bab
MD
1051 goto error;
1052 default:
2ece7dd0 1053 BT_COMP_LOGE("Unknown detach return code %u", be32toh(rp.status));
7cdc2bab
MD
1054 goto error;
1055 }
1056
1057 session->attached = false;
1058
1059 return 0;
1060
1061error:
1062 return -1;
1063}
1064
1065BT_HIDDEN
c28512ab
FD
1066enum lttng_live_get_one_metadata_status lttng_live_get_one_metadata_packet(
1067 struct lttng_live_trace *trace, FILE *fp, size_t *reply_len)
7cdc2bab
MD
1068{
1069 uint64_t len = 0;
c28512ab 1070 enum lttng_live_get_one_metadata_status metadata_status;
7cdc2bab
MD
1071 struct lttng_viewer_cmd cmd;
1072 struct lttng_viewer_get_metadata rq;
1073 struct lttng_viewer_metadata_packet rp;
1074 char *data = NULL;
1075 ssize_t ret_len;
1076 struct lttng_live_session *session = trace->session;
0f1979c3
FD
1077 struct lttng_live_msg_iter *lttng_live_msg_iter =
1078 session->lttng_live_msg_iter;
7cdc2bab 1079 struct lttng_live_metadata *metadata = trace->metadata;
14f28187 1080 struct live_viewer_connection *viewer_connection =
0f1979c3 1081 lttng_live_msg_iter->viewer_connection;
1419db2b 1082 bt_self_component *self_comp = viewer_connection->self_comp;
bdcbd52e
JR
1083 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1084 char cmd_buf[cmd_buf_len];
7cdc2bab
MD
1085
1086 rq.stream_id = htobe64(metadata->stream_id);
1087 cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
1088 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1089 cmd.cmd_version = htobe32(0);
1090
bdcbd52e
JR
1091 /*
1092 * Merge the cmd and connection request to prevent a write-write
1093 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1094 * second write to be performed quickly in presence of Nagle's algorithm.
1095 */
1096 memcpy(cmd_buf, &cmd, sizeof(cmd));
1097 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
bdcbd52e 1098 ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1cb3cdd7 1099 if (ret_len == BT_SOCKET_ERROR) {
1419db2b
FD
1100 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1101 "Error sending get_metadata request: %s",
1102 bt_socket_errormsg());
7cdc2bab
MD
1103 goto error;
1104 }
7cdc2bab 1105
f6ccaed9 1106 BT_ASSERT(ret_len == cmd_buf_len);
4c66436f 1107 ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
7cdc2bab 1108 if (ret_len == 0) {
1419db2b
FD
1109 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1110 "Remote side has closed connection");
7cdc2bab
MD
1111 goto error;
1112 }
1cb3cdd7 1113 if (ret_len == BT_SOCKET_ERROR) {
1419db2b
FD
1114 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1115 "Error receiving get_metadata response: %s",
1116 bt_socket_errormsg());
7cdc2bab
MD
1117 goto error;
1118 }
f6ccaed9 1119 BT_ASSERT(ret_len == sizeof(rp));
7cdc2bab
MD
1120
1121 switch (be32toh(rp.status)) {
1122 case LTTNG_VIEWER_METADATA_OK:
1419db2b 1123 BT_COMP_LOGD("Received get_metadata response: ok");
7cdc2bab
MD
1124 break;
1125 case LTTNG_VIEWER_NO_NEW_METADATA:
c28512ab
FD
1126 BT_COMP_LOGD("Received get_metadata response: no new");
1127 metadata_status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_END;
7cdc2bab
MD
1128 goto end;
1129 case LTTNG_VIEWER_METADATA_ERR:
c28512ab
FD
1130 /*
1131 * The Relayd cannot find this stream id. Maybe its
1132 * gone already. This can happen in short lived UST app
1133 * in a per-pid session.
1134 */
1135 BT_COMP_LOGD("Received get_metadata response: error");
1136 metadata_status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_CLOSED;
1137 goto end;
7cdc2bab 1138 default:
1419db2b
FD
1139 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1140 "Received get_metadata response: unknown");
7cdc2bab
MD
1141 goto error;
1142 }
1143
1144 len = be64toh(rp.len);
2ece7dd0 1145 BT_COMP_LOGD("Writing %" PRIu64" bytes to metadata", len);
7cdc2bab 1146 if (len <= 0) {
1419db2b
FD
1147 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1148 "Erroneous response length");
7cdc2bab
MD
1149 goto error;
1150 }
1151
91d81473 1152 data = calloc(1, len);
7cdc2bab 1153 if (!data) {
1419db2b
FD
1154 BT_COMP_LOGE_APPEND_CAUSE_ERRNO(self_comp,
1155 "Failed to allocate data buffer", ".");
7cdc2bab
MD
1156 goto error;
1157 }
4c66436f 1158 ret_len = lttng_live_recv(viewer_connection, data, len);
7cdc2bab 1159 if (ret_len == 0) {
2ece7dd0 1160 BT_COMP_LOGI("Remote side has closed connection");
7cdc2bab
MD
1161 goto error_free_data;
1162 }
1cb3cdd7 1163 if (ret_len == BT_SOCKET_ERROR) {
1419db2b
FD
1164 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1165 "Error receiving trace packet: %s", bt_socket_errormsg());
7cdc2bab
MD
1166 goto error_free_data;
1167 }
f6ccaed9 1168 BT_ASSERT(ret_len == len);
7cdc2bab
MD
1169
1170 do {
1171 ret_len = fwrite(data, 1, len, fp);
1172 } while (ret_len < 0 && errno == EINTR);
1173 if (ret_len < 0) {
1419db2b
FD
1174 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1175 "Writing in the metadata file stream");
7cdc2bab
MD
1176 goto error_free_data;
1177 }
f6ccaed9 1178 BT_ASSERT(ret_len == len);
7cdc2bab 1179 free(data);
c28512ab
FD
1180 *reply_len = len;
1181 metadata_status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_OK;
1182
1183 goto end;
7cdc2bab
MD
1184
1185error_free_data:
1186 free(data);
1187error:
c28512ab
FD
1188 metadata_status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
1189
1190end:
1191 return metadata_status;
7cdc2bab
MD
1192}
1193
1194/*
1195 * Assign the fields from a lttng_viewer_index to a packet_index.
1196 */
1197static
1198void lttng_index_to_packet_index(struct lttng_viewer_index *lindex,
1199 struct packet_index *pindex)
1200{
f6ccaed9
PP
1201 BT_ASSERT(lindex);
1202 BT_ASSERT(pindex);
7cdc2bab
MD
1203
1204 pindex->offset = be64toh(lindex->offset);
1205 pindex->packet_size = be64toh(lindex->packet_size);
1206 pindex->content_size = be64toh(lindex->content_size);
1207 pindex->ts_cycles.timestamp_begin = be64toh(lindex->timestamp_begin);
1208 pindex->ts_cycles.timestamp_end = be64toh(lindex->timestamp_end);
1209 pindex->events_discarded = be64toh(lindex->events_discarded);
1210}
1211
1212BT_HIDDEN
14f28187
FD
1213enum lttng_live_iterator_status lttng_live_get_next_index(
1214 struct lttng_live_msg_iter *lttng_live_msg_iter,
7cdc2bab
MD
1215 struct lttng_live_stream_iterator *stream,
1216 struct packet_index *index)
1217{
1218 struct lttng_viewer_cmd cmd;
1219 struct lttng_viewer_get_next_index rq;
7cdc2bab 1220 struct lttng_viewer_index rp;
0f1979c3 1221 enum lttng_live_iterator_status retstatus = LTTNG_LIVE_ITERATOR_STATUS_OK;
14f28187 1222 struct live_viewer_connection *viewer_connection =
1419db2b
FD
1223 lttng_live_msg_iter->viewer_connection;
1224 bt_self_component *self_comp = viewer_connection->self_comp;
7cdc2bab 1225 struct lttng_live_trace *trace = stream->trace;
bdcbd52e
JR
1226 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1227 char cmd_buf[cmd_buf_len];
0f1979c3
FD
1228 uint32_t flags, status;
1229 ssize_t ret_len;
7cdc2bab
MD
1230
1231 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
1232 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1233 cmd.cmd_version = htobe32(0);
1234
bdcbd52e 1235
7cdc2bab
MD
1236 memset(&rq, 0, sizeof(rq));
1237 rq.stream_id = htobe64(stream->viewer_stream_id);
1238
bdcbd52e
JR
1239 /*
1240 * Merge the cmd and connection request to prevent a write-write
1241 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1242 * second write to be performed quickly in presence of Nagle's algorithm.
1243 */
1244 memcpy(cmd_buf, &cmd, sizeof(cmd));
1245 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
bdcbd52e 1246 ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1cb3cdd7 1247 if (ret_len == BT_SOCKET_ERROR) {
1419db2b
FD
1248 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1249 "Error sending get_next_index request: %s",
1250 bt_socket_errormsg());
7cdc2bab
MD
1251 goto error;
1252 }
7cdc2bab 1253
f6ccaed9 1254 BT_ASSERT(ret_len == cmd_buf_len);
4c66436f 1255 ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
7cdc2bab 1256 if (ret_len == 0) {
1419db2b
FD
1257 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1258 "Remote side has closed connection");
7cdc2bab
MD
1259 goto error;
1260 }
1cb3cdd7 1261 if (ret_len == BT_SOCKET_ERROR) {
1419db2b
FD
1262 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1263 "Error receiving get_next_index response: %s",
1264 bt_socket_errormsg());
7cdc2bab
MD
1265 goto error;
1266 }
f6ccaed9 1267 BT_ASSERT(ret_len == sizeof(rp));
7cdc2bab
MD
1268
1269 flags = be32toh(rp.flags);
1270 status = be32toh(rp.status);
1271
1272 switch (status) {
1273 case LTTNG_VIEWER_INDEX_INACTIVE:
1274 {
1275 uint64_t ctf_stream_class_id;
1276
1419db2b 1277 BT_COMP_LOGD("Received get_next_index response: inactive");
7cdc2bab
MD
1278 memset(index, 0, sizeof(struct packet_index));
1279 index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
14f28187 1280 stream->current_inactivity_ts = index->ts_cycles.timestamp_end;
7cdc2bab
MD
1281 ctf_stream_class_id = be64toh(rp.stream_id);
1282 if (stream->ctf_stream_class_id != -1ULL) {
f6ccaed9 1283 BT_ASSERT(stream->ctf_stream_class_id ==
7cdc2bab
MD
1284 ctf_stream_class_id);
1285 } else {
1286 stream->ctf_stream_class_id = ctf_stream_class_id;
1287 }
1288 stream->state = LTTNG_LIVE_STREAM_QUIESCENT;
1289 break;
1290 }
1291 case LTTNG_VIEWER_INDEX_OK:
1292 {
1293 uint64_t ctf_stream_class_id;
1294
1419db2b 1295 BT_COMP_LOGD("Received get_next_index response: OK");
7cdc2bab
MD
1296 lttng_index_to_packet_index(&rp, index);
1297 ctf_stream_class_id = be64toh(rp.stream_id);
1298 if (stream->ctf_stream_class_id != -1ULL) {
f6ccaed9 1299 BT_ASSERT(stream->ctf_stream_class_id ==
7cdc2bab
MD
1300 ctf_stream_class_id);
1301 } else {
1302 stream->ctf_stream_class_id = ctf_stream_class_id;
1303 }
1304
1305 stream->state = LTTNG_LIVE_STREAM_ACTIVE_DATA;
7cdc2bab
MD
1306
1307 if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
1419db2b 1308 BT_COMP_LOGD("Received get_next_index response: new metadata needed");
7cdc2bab
MD
1309 trace->new_metadata_needed = true;
1310 }
1311 if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
1419db2b 1312 BT_COMP_LOGD("Received get_next_index response: new streams needed");
14f28187 1313 lttng_live_need_new_streams(lttng_live_msg_iter);
7cdc2bab
MD
1314 }
1315 break;
1316 }
1317 case LTTNG_VIEWER_INDEX_RETRY:
1419db2b 1318 BT_COMP_LOGD("Received get_next_index response: retry");
7cdc2bab 1319 memset(index, 0, sizeof(struct packet_index));
14f28187 1320 retstatus = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
7cdc2bab
MD
1321 stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA;
1322 goto end;
1323 case LTTNG_VIEWER_INDEX_HUP:
1419db2b 1324 BT_COMP_LOGD("Received get_next_index response: stream hung up");
7cdc2bab
MD
1325 memset(index, 0, sizeof(struct packet_index));
1326 index->offset = EOF;
14f28187 1327 retstatus = LTTNG_LIVE_ITERATOR_STATUS_END;
7cdc2bab 1328 stream->state = LTTNG_LIVE_STREAM_EOF;
4a39caef 1329 stream->has_stream_hung_up = true;
7cdc2bab
MD
1330 break;
1331 case LTTNG_VIEWER_INDEX_ERR:
1419db2b 1332 BT_COMP_LOGD("Received get_next_index response: error");
7cdc2bab
MD
1333 memset(index, 0, sizeof(struct packet_index));
1334 stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA;
1335 goto error;
1336 default:
1419db2b 1337 BT_COMP_LOGD("Received get_next_index response: unknown value");
7cdc2bab
MD
1338 memset(index, 0, sizeof(struct packet_index));
1339 stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA;
1340 goto error;
1341 }
1342end:
1343 return retstatus;
1344
1345error:
9b4f9b42 1346 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
14f28187 1347 retstatus = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
4c66436f 1348 } else {
14f28187 1349 retstatus = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
4c66436f 1350 }
7cdc2bab
MD
1351 return retstatus;
1352}
1353
1354BT_HIDDEN
14f28187
FD
1355enum bt_msg_iter_medium_status lttng_live_get_stream_bytes(
1356 struct lttng_live_msg_iter *lttng_live_msg_iter,
1357 struct lttng_live_stream_iterator *stream, uint8_t *buf,
1358 uint64_t offset, uint64_t req_len, uint64_t *recv_len)
7cdc2bab 1359{
d6e69534 1360 enum bt_msg_iter_medium_status retstatus = BT_MSG_ITER_MEDIUM_STATUS_OK;
0f1979c3 1361 struct lttng_viewer_trace_packet rp;
7cdc2bab
MD
1362 struct lttng_viewer_cmd cmd;
1363 struct lttng_viewer_get_packet rq;
14f28187 1364 struct live_viewer_connection *viewer_connection =
1419db2b
FD
1365 lttng_live_msg_iter->viewer_connection;
1366 bt_self_component *self_comp = viewer_connection->self_comp;
7cdc2bab 1367 struct lttng_live_trace *trace = stream->trace;
bdcbd52e
JR
1368 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1369 char cmd_buf[cmd_buf_len];
0f1979c3
FD
1370 uint32_t flags, status;
1371 ssize_t ret_len;
7cdc2bab 1372
2ece7dd0 1373 BT_COMP_LOGD("lttng_live_get_stream_bytes: offset=%" PRIu64 ", req_len=%" PRIu64,
7cdc2bab
MD
1374 offset, req_len);
1375 cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
1376 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1377 cmd.cmd_version = htobe32(0);
1378
1379 memset(&rq, 0, sizeof(rq));
1380 rq.stream_id = htobe64(stream->viewer_stream_id);
1381 rq.offset = htobe64(offset);
1382 rq.len = htobe32(req_len);
1383
bdcbd52e
JR
1384 /*
1385 * Merge the cmd and connection request to prevent a write-write
1386 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1387 * second write to be performed quickly in presence of Nagle's algorithm.
1388 */
1389 memcpy(cmd_buf, &cmd, sizeof(cmd));
1390 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
bdcbd52e 1391 ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1cb3cdd7 1392 if (ret_len == BT_SOCKET_ERROR) {
1419db2b
FD
1393 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1394 "Error sending get_data_packet request: %s",
1395 bt_socket_errormsg());
7cdc2bab
MD
1396 goto error;
1397 }
7cdc2bab 1398
f6ccaed9 1399 BT_ASSERT(ret_len == cmd_buf_len);
4c66436f 1400 ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
7cdc2bab 1401 if (ret_len == 0) {
1419db2b
FD
1402 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1403 "Remote side has closed connection");
7cdc2bab
MD
1404 goto error;
1405 }
1cb3cdd7 1406 if (ret_len == BT_SOCKET_ERROR) {
1419db2b
FD
1407 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1408 "Socket error receiving get_data response: %s",
1409 bt_socket_errormsg());
7cdc2bab
MD
1410 goto error;
1411 }
1412 if (ret_len != sizeof(rp)) {
1419db2b
FD
1413 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "get_data_packet: "
1414 "expected %zu, received %zd", sizeof(rp), ret_len);
7cdc2bab
MD
1415 goto error;
1416 }
1417
1418 flags = be32toh(rp.flags);
1419 status = be32toh(rp.status);
1420
1421 switch (status) {
1422 case LTTNG_VIEWER_GET_PACKET_OK:
1423 req_len = be32toh(rp.len);
1419db2b
FD
1424 BT_COMP_LOGD("Received get_data_packet response: Ok, "
1425 "packet size : %" PRIu64 "", req_len);
7cdc2bab
MD
1426 break;
1427 case LTTNG_VIEWER_GET_PACKET_RETRY:
1428 /* Unimplemented by relay daemon */
1419db2b 1429 BT_COMP_LOGD("Received get_data_packet response: retry");
d6e69534 1430 retstatus = BT_MSG_ITER_MEDIUM_STATUS_AGAIN;
7cdc2bab
MD
1431 goto end;
1432 case LTTNG_VIEWER_GET_PACKET_ERR:
1433 if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
2ece7dd0 1434 BT_COMP_LOGD("get_data_packet: new metadata needed, try again later");
7cdc2bab
MD
1435 trace->new_metadata_needed = true;
1436 }
1437 if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
2ece7dd0 1438 BT_COMP_LOGD("get_data_packet: new streams needed, try again later");
14f28187 1439 lttng_live_need_new_streams(lttng_live_msg_iter);
7cdc2bab
MD
1440 }
1441 if (flags & (LTTNG_VIEWER_FLAG_NEW_METADATA
1442 | LTTNG_VIEWER_FLAG_NEW_STREAM)) {
d6e69534 1443 retstatus = BT_MSG_ITER_MEDIUM_STATUS_AGAIN;
7cdc2bab
MD
1444 goto end;
1445 }
1419db2b
FD
1446 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1447 "Received get_data_packet response: error");
7cdc2bab
MD
1448 goto error;
1449 case LTTNG_VIEWER_GET_PACKET_EOF:
d6e69534 1450 retstatus = BT_MSG_ITER_MEDIUM_STATUS_EOF;
7cdc2bab
MD
1451 goto end;
1452 default:
1419db2b
FD
1453 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1454 "Received get_data_packet response: unknown");
7cdc2bab
MD
1455 goto error;
1456 }
1457
1458 if (req_len == 0) {
1459 goto error;
1460 }
1461
4c66436f 1462 ret_len = lttng_live_recv(viewer_connection, buf, req_len);
7cdc2bab 1463 if (ret_len == 0) {
1419db2b
FD
1464 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1465 "Remote side has closed connection");
7cdc2bab
MD
1466 goto error;
1467 }
1cb3cdd7 1468 if (ret_len == BT_SOCKET_ERROR) {
1419db2b
FD
1469 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1470 "Error receiving trace packet: %s",
1471 bt_socket_errormsg());
7cdc2bab
MD
1472 goto error;
1473 }
f6ccaed9 1474 BT_ASSERT(ret_len == req_len);
7cdc2bab
MD
1475 *recv_len = ret_len;
1476end:
1477 return retstatus;
1478
1479error:
9b4f9b42 1480 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
d6e69534 1481 retstatus = BT_MSG_ITER_MEDIUM_STATUS_AGAIN;
4c66436f 1482 } else {
d6e69534 1483 retstatus = BT_MSG_ITER_MEDIUM_STATUS_ERROR;
4c66436f 1484 }
7cdc2bab
MD
1485 return retstatus;
1486}
1487
1488/*
1489 * Request new streams for a session.
1490 */
1491BT_HIDDEN
14f28187 1492enum lttng_live_iterator_status lttng_live_get_new_streams(
7cdc2bab
MD
1493 struct lttng_live_session *session)
1494{
14f28187
FD
1495 enum lttng_live_iterator_status status =
1496 LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab
MD
1497 struct lttng_viewer_cmd cmd;
1498 struct lttng_viewer_new_streams_request rq;
1499 struct lttng_viewer_new_streams_response rp;
14f28187
FD
1500 struct lttng_live_msg_iter *lttng_live_msg_iter =
1501 session->lttng_live_msg_iter;
1502 struct live_viewer_connection *viewer_connection =
1503 lttng_live_msg_iter->viewer_connection;
1419db2b 1504 bt_self_component *self_comp = viewer_connection->self_comp;
7cdc2bab 1505 uint32_t streams_count;
bdcbd52e
JR
1506 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1507 char cmd_buf[cmd_buf_len];
0f1979c3 1508 ssize_t ret_len;
7cdc2bab
MD
1509
1510 if (!session->new_streams_needed) {
14f28187 1511 return LTTNG_LIVE_ITERATOR_STATUS_OK;
7cdc2bab
MD
1512 }
1513
1514 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
1515 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1516 cmd.cmd_version = htobe32(0);
1517
1518 memset(&rq, 0, sizeof(rq));
1519 rq.session_id = htobe64(session->id);
1520
bdcbd52e
JR
1521 /*
1522 * Merge the cmd and connection request to prevent a write-write
1523 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1524 * second write to be performed quickly in presence of Nagle's algorithm.
1525 */
1526 memcpy(cmd_buf, &cmd, sizeof(cmd));
1527 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
bdcbd52e 1528 ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1cb3cdd7 1529 if (ret_len == BT_SOCKET_ERROR) {
1419db2b
FD
1530 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1531 "Error sending get_new_streams request: %s",
1532 bt_socket_errormsg());
7cdc2bab
MD
1533 goto error;
1534 }
7cdc2bab 1535
f6ccaed9 1536 BT_ASSERT(ret_len == cmd_buf_len);
4c66436f 1537 ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
7cdc2bab 1538 if (ret_len == 0) {
1419db2b
FD
1539 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1540 "Remote side has closed connection");
7cdc2bab
MD
1541 goto error;
1542 }
1cb3cdd7 1543 if (ret_len == BT_SOCKET_ERROR) {
1419db2b
FD
1544 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1545 "Error receiving get_new_streams response");
7cdc2bab
MD
1546 goto error;
1547 }
f6ccaed9 1548 BT_ASSERT(ret_len == sizeof(rp));
7cdc2bab
MD
1549
1550 streams_count = be32toh(rp.streams_count);
1551
1552 switch(be32toh(rp.status)) {
1553 case LTTNG_VIEWER_NEW_STREAMS_OK:
1554 session->new_streams_needed = false;
1555 break;
1556 case LTTNG_VIEWER_NEW_STREAMS_NO_NEW:
1557 session->new_streams_needed = false;
1558 goto end;
1559 case LTTNG_VIEWER_NEW_STREAMS_HUP:
1560 session->new_streams_needed = false;
1561 session->closed = true;
14f28187 1562 status = LTTNG_LIVE_ITERATOR_STATUS_END;
7cdc2bab
MD
1563 goto end;
1564 case LTTNG_VIEWER_NEW_STREAMS_ERR:
1419db2b 1565 BT_COMP_LOGD("Received get_new_streams response: error");
7cdc2bab
MD
1566 goto error;
1567 default:
1419db2b
FD
1568 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1569 "Received get_new_streams response: Unknown:"
1570 "return code %u", be32toh(rp.status));
7cdc2bab
MD
1571 goto error;
1572 }
1573
1574 if (receive_streams(session, streams_count)) {
1419db2b 1575 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error receiving streams");
7cdc2bab
MD
1576 goto error;
1577 }
1578end:
1579 return status;
1580
1581error:
9b4f9b42 1582 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
14f28187 1583 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
4c66436f 1584 } else {
14f28187 1585 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1419db2b 1586 BT_COMP_LOGE("Error receiving streams.");
4c66436f 1587 }
7cdc2bab
MD
1588 return status;
1589}
1590
1591BT_HIDDEN
14f28187
FD
1592struct live_viewer_connection *live_viewer_connection_create(
1593 const char *url, bool in_query,
550004b4 1594 struct lttng_live_msg_iter *lttng_live_msg_iter,
1419db2b
FD
1595 bt_self_component *self_comp,
1596 bt_self_component_class *self_comp_class,
550004b4 1597 bt_logging_level log_level)
7cdc2bab 1598{
14f28187 1599 struct live_viewer_connection *viewer_connection;
7cdc2bab 1600
14f28187 1601 viewer_connection = g_new0(struct live_viewer_connection, 1);
7cdc2bab 1602
550004b4 1603 if (bt_socket_init(log_level) != 0) {
1419db2b
FD
1604 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
1605 self_comp_class, "Failed to init socket");
1cb3cdd7
MJ
1606 goto error;
1607 }
1608
550004b4
PP
1609 viewer_connection->log_level = log_level;
1610
1419db2b
FD
1611 viewer_connection->self_comp = self_comp;
1612 viewer_connection->self_comp_class = self_comp_class;
550004b4 1613
14f28187 1614 bt_object_init_shared(&viewer_connection->obj, connection_release);
1cb3cdd7 1615 viewer_connection->control_sock = BT_INVALID_SOCKET;
7cdc2bab 1616 viewer_connection->port = -1;
14f28187
FD
1617 viewer_connection->in_query = in_query;
1618 viewer_connection->lttng_live_msg_iter = lttng_live_msg_iter;
7cdc2bab
MD
1619 viewer_connection->url = g_string_new(url);
1620 if (!viewer_connection->url) {
1419db2b
FD
1621 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
1622 self_comp_class, "Failed to allocate URL buffer");
7cdc2bab
MD
1623 goto error;
1624 }
1625
2ece7dd0 1626 BT_COMP_LOGI("Establishing connection to url \"%s\"...", url);
7cdc2bab 1627 if (lttng_live_connect_viewer(viewer_connection)) {
1419db2b
FD
1628 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
1629 self_comp_class, "Failed to establish connection: "
1630 "url=\"%s\"", url);
1631 goto error;
7cdc2bab 1632 }
2ece7dd0 1633 BT_COMP_LOGI("Connection to url \"%s\" is established", url);
7cdc2bab
MD
1634 return viewer_connection;
1635
7cdc2bab 1636error:
2f767475
PP
1637 if (viewer_connection) {
1638 live_viewer_connection_destroy(viewer_connection);
1639 }
1640
7cdc2bab
MD
1641 return NULL;
1642}
1643
1644BT_HIDDEN
14f28187
FD
1645void live_viewer_connection_destroy(
1646 struct live_viewer_connection *viewer_connection)
7cdc2bab 1647{
2ece7dd0 1648 BT_COMP_LOGI("Closing connection to url \"%s\"", viewer_connection->url->str);
7cdc2bab 1649 lttng_live_disconnect_viewer(viewer_connection);
2f767475
PP
1650 if (viewer_connection->url) {
1651 g_string_free(viewer_connection->url, true);
1652 }
94b828f3 1653 if (viewer_connection->relay_hostname) {
14f28187 1654 g_string_free(viewer_connection->relay_hostname, true);
94b828f3
MD
1655 }
1656 if (viewer_connection->target_hostname) {
14f28187 1657 g_string_free(viewer_connection->target_hostname, true);
94b828f3
MD
1658 }
1659 if (viewer_connection->session_name) {
14f28187 1660 g_string_free(viewer_connection->session_name, true);
94b828f3 1661 }
7cdc2bab 1662 g_free(viewer_connection);
1cb3cdd7
MJ
1663
1664 bt_socket_fini();
7cdc2bab 1665}
This page took 0.141899 seconds and 4 git commands to generate.