src.ctf.lttng-live: make live_viewer_connection::{relay_hostname,target_hostname...
[deliverable/babeltrace.git] / src / plugins / ctf / lttng-live / viewer-connection.cpp
CommitLineData
7cdc2bab 1/*
0235b0db 2 * SPDX-License-Identifier: MIT
7cdc2bab 3 *
0235b0db
MJ
4 * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
5 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
7cdc2bab
MD
6 */
7
27a14e13
SM
8#define BT_CLOG_CFG (logCfg)
9#define BT_LOG_TAG "PLUGIN/SRC.CTF.LTTNG-LIVE/VIEWER"
020bc26f 10
3c22a242
FD
11#include <fcntl.h>
12#include <stdbool.h>
7cdc2bab 13#include <stdint.h>
3c22a242 14#include <stdio.h>
7cdc2bab 15#include <stdlib.h>
3c22a242 16#include <sys/types.h>
7cdc2bab 17#include <unistd.h>
3c22a242 18
7cdc2bab 19#include <glib.h>
7cdc2bab 20
578e048b
MJ
21#include "compat/socket.h"
22#include "compat/endian.h"
23#include "compat/compiler.h"
24#include "common/common.h"
3fadfbc0 25#include <babeltrace2/babeltrace.h>
7cdc2bab 26
087cd0f5
SM
27#include "lttng-live.hpp"
28#include "viewer-connection.hpp"
29#include "lttng-viewer-abi.hpp"
30#include "data-stream.hpp"
31#include "metadata.hpp"
27a14e13 32#include "cpp-common/cfg-logging-error-reporting.hpp"
7cdc2bab 33
27a14e13 34#define viewer_handle_send_recv_status(_logCfg, _status, _action, _msg_str) \
4164020e
SM
35 do { \
36 switch (_status) { \
37 case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED: \
38 break; \
39 case LTTNG_LIVE_VIEWER_STATUS_ERROR: \
27a14e13 40 BT_CLOGE_APPEND_CAUSE("Error " _action " " _msg_str); \
4164020e
SM
41 break; \
42 default: \
43 bt_common_abort(); \
44 } \
45 } while (0)
46
27a14e13
SM
47#define viewer_handle_send_status(_logCfg, _status, _msg_str) \
48 viewer_handle_send_recv_status(_logCfg, _status, "sending", _msg_str)
4164020e 49
27a14e13
SM
50#define viewer_handle_recv_status(_logCfg, _status, _msg_str) \
51 viewer_handle_send_recv_status(_logCfg, _status, "receiving", _msg_str)
4164020e 52
27a14e13 53#define LTTNG_LIVE_CLOGE_APPEND_CAUSE_ERRNO(_msg, _fmt, ...) \
4164020e 54 do { \
27a14e13 55 BT_CLOGE_APPEND_CAUSE(_msg ": %s" _fmt, bt_socket_errormsg(), ##__VA_ARGS__); \
4164020e
SM
56 } while (0)
57
58static const char *lttng_viewer_command_string(enum lttng_viewer_command cmd)
f93afbf9 59{
4164020e
SM
60 switch (cmd) {
61 case LTTNG_VIEWER_CONNECT:
62 return "CONNECT";
63 case LTTNG_VIEWER_LIST_SESSIONS:
64 return "LIST_SESSIONS";
65 case LTTNG_VIEWER_ATTACH_SESSION:
66 return "ATTACH_SESSION";
67 case LTTNG_VIEWER_GET_NEXT_INDEX:
68 return "GET_NEXT_INDEX";
69 case LTTNG_VIEWER_GET_PACKET:
70 return "GET_PACKET";
71 case LTTNG_VIEWER_GET_METADATA:
72 return "GET_METADATA";
73 case LTTNG_VIEWER_GET_NEW_STREAMS:
74 return "GET_NEW_STREAMS";
75 case LTTNG_VIEWER_CREATE_SESSION:
76 return "CREATE_SESSION";
77 case LTTNG_VIEWER_DETACH_SESSION:
78 return "DETACH_SESSION";
79 }
80
81 bt_common_abort();
f93afbf9
FD
82}
83
4164020e
SM
84static const char *
85lttng_viewer_next_index_return_code_string(enum lttng_viewer_next_index_return_code code)
f93afbf9 86{
4164020e
SM
87 switch (code) {
88 case LTTNG_VIEWER_INDEX_OK:
89 return "INDEX_OK";
90 case LTTNG_VIEWER_INDEX_RETRY:
91 return "INDEX_RETRY";
92 case LTTNG_VIEWER_INDEX_HUP:
93 return "INDEX_HUP";
94 case LTTNG_VIEWER_INDEX_ERR:
95 return "INDEX_ERR";
96 case LTTNG_VIEWER_INDEX_INACTIVE:
97 return "INDEX_INACTIVE";
98 case LTTNG_VIEWER_INDEX_EOF:
99 return "INDEX_EOF";
100 }
101
102 bt_common_abort();
f93afbf9
FD
103}
104
4164020e 105static const char *lttng_viewer_next_index_return_code_string(uint32_t code)
087cd0f5 106{
4164020e 107 return lttng_viewer_next_index_return_code_string((lttng_viewer_next_index_return_code) code);
087cd0f5
SM
108}
109
4164020e
SM
110static const char *
111lttng_viewer_get_packet_return_code_string(enum lttng_viewer_get_packet_return_code code)
f93afbf9 112{
4164020e
SM
113 switch (code) {
114 case LTTNG_VIEWER_GET_PACKET_OK:
115 return "GET_PACKET_OK";
116 case LTTNG_VIEWER_GET_PACKET_RETRY:
117 return "GET_PACKET_RETRY";
118 case LTTNG_VIEWER_GET_PACKET_ERR:
119 return "GET_PACKET_ERR";
120 case LTTNG_VIEWER_GET_PACKET_EOF:
121 return "GET_PACKET_EOF";
122 }
123
124 bt_common_abort();
f93afbf9
FD
125};
126
4164020e 127static const char *lttng_viewer_get_packet_return_code_string(uint32_t code)
087cd0f5 128{
4164020e 129 return lttng_viewer_get_packet_return_code_string((lttng_viewer_get_packet_return_code) code);
087cd0f5
SM
130}
131
4164020e 132static const char *lttng_viewer_seek_string(enum lttng_viewer_seek seek)
f93afbf9 133{
4164020e
SM
134 switch (seek) {
135 case LTTNG_VIEWER_SEEK_BEGINNING:
136 return "SEEK_BEGINNING";
137 case LTTNG_VIEWER_SEEK_LAST:
138 return "SEEK_LAST";
139 }
140
141 bt_common_abort();
f93afbf9
FD
142}
143
4164020e
SM
144static inline enum lttng_live_iterator_status
145viewer_status_to_live_iterator_status(enum lttng_live_viewer_status viewer_status)
f79c2d7a 146{
4164020e
SM
147 switch (viewer_status) {
148 case LTTNG_LIVE_VIEWER_STATUS_OK:
149 return LTTNG_LIVE_ITERATOR_STATUS_OK;
150 case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
151 return LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
152 case LTTNG_LIVE_VIEWER_STATUS_ERROR:
153 return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
154 }
155
156 bt_common_abort();
f79c2d7a
FD
157}
158
4164020e
SM
159static inline enum ctf_msg_iter_medium_status
160viewer_status_to_ctf_msg_iter_medium_status(enum lttng_live_viewer_status viewer_status)
f79c2d7a 161{
4164020e
SM
162 switch (viewer_status) {
163 case LTTNG_LIVE_VIEWER_STATUS_OK:
164 return CTF_MSG_ITER_MEDIUM_STATUS_OK;
165 case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
166 return CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
167 case LTTNG_LIVE_VIEWER_STATUS_ERROR:
168 return CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
169 }
170
171 bt_common_abort();
f79c2d7a
FD
172}
173
4164020e 174static inline void viewer_connection_close_socket(struct live_viewer_connection *viewer_connection)
b197ca37 175{
4164020e
SM
176 int ret = bt_socket_close(viewer_connection->control_sock);
177 if (ret == -1) {
27a14e13
SM
178 const bt2_common::LogCfg& logCfg = viewer_connection->logCfg;
179 BT_CLOGW_ERRNO("Error closing viewer connection socket: ", ".");
4164020e
SM
180 }
181
182 viewer_connection->control_sock = BT_INVALID_SOCKET;
b197ca37
FD
183}
184
f79c2d7a
FD
185/*
186 * This function receives a message from the Relay daemon.
187 * If it received the entire message, it returns _OK,
188 * If it's interrupted, it returns _INTERRUPTED,
189 * otherwise, it returns _ERROR.
190 */
4164020e
SM
191static enum lttng_live_viewer_status
192lttng_live_recv(struct live_viewer_connection *viewer_connection, void *buf, size_t len)
7cdc2bab 193{
4164020e 194 ssize_t received;
4164020e
SM
195 size_t total_received = 0, to_receive = len;
196 struct lttng_live_msg_iter *lttng_live_msg_iter = viewer_connection->lttng_live_msg_iter;
197 enum lttng_live_viewer_status status;
198 BT_SOCKET sock = viewer_connection->control_sock;
27a14e13 199 const bt2_common::LogCfg& logCfg = viewer_connection->logCfg;
4164020e
SM
200
201 /*
202 * Receive a message from the Relay.
203 */
204 do {
205 received = bt_socket_recv(sock, (char *) buf + total_received, to_receive, 0);
206 if (received == BT_SOCKET_ERROR) {
207 if (bt_socket_interrupted()) {
208 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
209 /*
210 * This interruption was due to a
211 * SIGINT and the graph is being torn
212 * down.
213 */
214 status = LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED;
215 lttng_live_msg_iter->was_interrupted = true;
216 goto end;
217 } else {
218 /*
219 * A signal was received, but the graph
220 * is not being torn down. Carry on.
221 */
222 continue;
223 }
224 } else {
225 /*
226 * For any other types of socket error, close
227 * the socket and return an error.
228 */
27a14e13 229 LTTNG_LIVE_CLOGE_APPEND_CAUSE_ERRNO("Error receiving from Relay", ".");
4164020e
SM
230
231 viewer_connection_close_socket(viewer_connection);
232 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
233 goto end;
234 }
235 } else if (received == 0) {
236 /*
237 * The recv() call returned 0. This means the
238 * connection was orderly shutdown from the other peer.
239 * If that happens when we are trying to receive
240 * a message from it, it means something when wrong.
241 * Close the socket and return an error.
242 */
27a14e13 243 BT_CLOGE_APPEND_CAUSE("Remote side has closed connection");
4164020e
SM
244 viewer_connection_close_socket(viewer_connection);
245 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
246 goto end;
247 }
248
249 BT_ASSERT(received <= to_receive);
250 total_received += received;
251 to_receive -= received;
252
253 } while (to_receive > 0);
254
255 BT_ASSERT(total_received == len);
256 status = LTTNG_LIVE_VIEWER_STATUS_OK;
f79c2d7a
FD
257
258end:
4164020e 259 return status;
7cdc2bab
MD
260}
261
f79c2d7a
FD
262/*
263 * This function sends a message to the Relay daemon.
264 * If it send the message, it returns _OK,
265 * If it's interrupted, it returns _INTERRUPTED,
266 * otherwise, it returns _ERROR.
267 */
4164020e
SM
268static enum lttng_live_viewer_status
269lttng_live_send(struct live_viewer_connection *viewer_connection, const void *buf, size_t len)
7cdc2bab 270{
4164020e 271 enum lttng_live_viewer_status status;
4164020e
SM
272 struct lttng_live_msg_iter *lttng_live_msg_iter = viewer_connection->lttng_live_msg_iter;
273 BT_SOCKET sock = viewer_connection->control_sock;
274 size_t to_send = len;
275 ssize_t total_sent = 0;
27a14e13 276 const bt2_common::LogCfg& logCfg = viewer_connection->logCfg;
4164020e
SM
277
278 do {
279 ssize_t sent = bt_socket_send_nosigpipe(sock, (char *) buf + total_sent, to_send);
280 if (sent == BT_SOCKET_ERROR) {
281 if (bt_socket_interrupted()) {
282 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
283 /*
284 * This interruption was a SIGINT and
285 * the graph is being teared down.
286 */
287 status = LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED;
288 lttng_live_msg_iter->was_interrupted = true;
289 goto end;
290 } else {
291 /*
292 * A signal was received, but the graph
293 * is not being teared down. Carry on.
294 */
295 continue;
296 }
297 } else {
298 /*
299 * For any other types of socket error, close
300 * the socket and return an error.
301 */
27a14e13 302 LTTNG_LIVE_CLOGE_APPEND_CAUSE_ERRNO("Error sending to Relay", ".");
4164020e
SM
303
304 viewer_connection_close_socket(viewer_connection);
305 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
306 goto end;
307 }
308 }
309
310 BT_ASSERT(sent <= to_send);
311 total_sent += sent;
312 to_send -= sent;
313
314 } while (to_send > 0);
315
316 BT_ASSERT(total_sent == len);
317 status = LTTNG_LIVE_VIEWER_STATUS_OK;
f79c2d7a
FD
318
319end:
4164020e 320 return status;
7cdc2bab
MD
321}
322
4164020e 323static int parse_url(struct live_viewer_connection *viewer_connection)
7cdc2bab 324{
4164020e 325 char error_buf[256] = {0};
27a14e13 326 const bt2_common::LogCfg& logCfg = viewer_connection->logCfg;
4164020e
SM
327 struct bt_common_lttng_live_url_parts lttng_live_url_parts = {0};
328 int ret = -1;
4164020e 329
d6ed88fe 330 if (viewer_connection->url.empty()) {
4164020e
SM
331 goto end;
332 }
333
d6ed88fe
SM
334 lttng_live_url_parts = bt_common_parse_lttng_live_url(viewer_connection->url.c_str(), error_buf,
335 sizeof(error_buf));
4164020e 336 if (!lttng_live_url_parts.proto) {
27a14e13 337 BT_CLOGE_APPEND_CAUSE("Invalid LTTng live URL format: %s", error_buf);
4164020e
SM
338 goto end;
339 }
2ec59b43 340 viewer_connection->proto.reset(lttng_live_url_parts.proto);
4164020e
SM
341 lttng_live_url_parts.proto = NULL;
342
2ec59b43 343 viewer_connection->relay_hostname.reset(lttng_live_url_parts.hostname);
4164020e
SM
344 lttng_live_url_parts.hostname = NULL;
345
346 if (lttng_live_url_parts.port >= 0) {
347 viewer_connection->port = lttng_live_url_parts.port;
348 } else {
349 viewer_connection->port = LTTNG_DEFAULT_NETWORK_VIEWER_PORT;
350 }
351
2ec59b43 352 viewer_connection->target_hostname.reset(lttng_live_url_parts.target_hostname);
4164020e
SM
353 lttng_live_url_parts.target_hostname = NULL;
354
355 if (lttng_live_url_parts.session_name) {
2ec59b43 356 viewer_connection->session_name.reset(lttng_live_url_parts.session_name);
4164020e
SM
357 lttng_live_url_parts.session_name = NULL;
358 }
359
360 ret = 0;
7cdc2bab
MD
361
362end:
4164020e
SM
363 bt_common_destroy_lttng_live_url_parts(&lttng_live_url_parts);
364 return ret;
7cdc2bab
MD
365}
366
4164020e
SM
367static enum lttng_live_viewer_status
368lttng_live_handshake(struct live_viewer_connection *viewer_connection)
7cdc2bab 369{
4164020e
SM
370 struct lttng_viewer_cmd cmd;
371 struct lttng_viewer_connect connect;
372 enum lttng_live_viewer_status status;
27a14e13 373 const bt2_common::LogCfg& logCfg = viewer_connection->logCfg;
4164020e
SM
374 const size_t cmd_buf_len = sizeof(cmd) + sizeof(connect);
375 char cmd_buf[cmd_buf_len];
376
27a14e13
SM
377 BT_CLOGD("Handshaking with the relay daemon: cmd=%s, major-version=%u, minor-version=%u",
378 lttng_viewer_command_string(LTTNG_VIEWER_CONNECT), LTTNG_LIVE_MAJOR, LTTNG_LIVE_MINOR);
4164020e
SM
379
380 cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
381 cmd.data_size = htobe64((uint64_t) sizeof(connect));
382 cmd.cmd_version = htobe32(0);
383
384 connect.viewer_session_id = -1ULL; /* will be set on recv */
385 connect.major = htobe32(LTTNG_LIVE_MAJOR);
386 connect.minor = htobe32(LTTNG_LIVE_MINOR);
387 connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
388
389 /*
390 * Merge the cmd and connection request to prevent a write-write
391 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
392 * second write to be performed quickly in presence of Nagle's algorithm
393 */
394 memcpy(cmd_buf, &cmd, sizeof(cmd));
395 memcpy(cmd_buf + sizeof(cmd), &connect, sizeof(connect));
396
397 status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
398 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
27a14e13 399 viewer_handle_send_status(logCfg, status, "viewer connect command");
4164020e
SM
400 goto end;
401 }
402
403 status = lttng_live_recv(viewer_connection, &connect, sizeof(connect));
404 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
27a14e13 405 viewer_handle_recv_status(logCfg, status, "viewer connect reply");
4164020e
SM
406 goto end;
407 }
408
27a14e13
SM
409 BT_CLOGI("Received viewer session ID : %" PRIu64,
410 (uint64_t) be64toh(connect.viewer_session_id));
411 BT_CLOGI("Relayd version : %u.%u", be32toh(connect.major), be32toh(connect.minor));
4164020e
SM
412
413 if (LTTNG_LIVE_MAJOR != be32toh(connect.major)) {
27a14e13 414 BT_CLOGE_APPEND_CAUSE("Incompatible lttng-relayd protocol");
4164020e
SM
415 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
416 goto end;
417 }
418 /* Use the smallest protocol version implemented. */
419 if (LTTNG_LIVE_MINOR > be32toh(connect.minor)) {
420 viewer_connection->minor = be32toh(connect.minor);
421 } else {
422 viewer_connection->minor = LTTNG_LIVE_MINOR;
423 }
424 viewer_connection->major = LTTNG_LIVE_MAJOR;
425
426 status = LTTNG_LIVE_VIEWER_STATUS_OK;
427
428 goto end;
f79c2d7a
FD
429
430end:
4164020e 431 return status;
7cdc2bab
MD
432}
433
4164020e
SM
434static enum lttng_live_viewer_status
435lttng_live_connect_viewer(struct live_viewer_connection *viewer_connection)
7cdc2bab 436{
4164020e
SM
437 struct hostent *host;
438 struct sockaddr_in server_addr;
439 enum lttng_live_viewer_status status;
27a14e13 440 const bt2_common::LogCfg& logCfg = viewer_connection->logCfg;
4164020e
SM
441
442 if (parse_url(viewer_connection)) {
27a14e13 443 BT_CLOGE_APPEND_CAUSE("Failed to parse URL");
4164020e
SM
444 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
445 goto error;
446 }
447
27a14e13
SM
448 BT_CLOGD("Connecting to hostname : %s, port : %d, "
449 "target hostname : %s, session name : %s, proto : %s",
450 viewer_connection->relay_hostname->str, viewer_connection->port,
451 !viewer_connection->target_hostname ? "<none>" :
452 viewer_connection->target_hostname->str,
453 !viewer_connection->session_name ? "<none>" : viewer_connection->session_name->str,
454 viewer_connection->proto->str);
4164020e
SM
455
456 host = gethostbyname(viewer_connection->relay_hostname->str);
457 if (!host) {
27a14e13
SM
458 BT_CLOGE_APPEND_CAUSE("Cannot lookup hostname: hostname=\"%s\"",
459 viewer_connection->relay_hostname->str);
4164020e
SM
460 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
461 goto error;
462 }
463
464 if ((viewer_connection->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == BT_INVALID_SOCKET) {
27a14e13 465 BT_CLOGE_APPEND_CAUSE("Socket creation failed: %s", bt_socket_errormsg());
4164020e
SM
466 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
467 goto error;
468 }
469
470 server_addr.sin_family = AF_INET;
471 server_addr.sin_port = htons(viewer_connection->port);
472 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
473 memset(&(server_addr.sin_zero), 0, 8);
474
475 if (connect(viewer_connection->control_sock, (struct sockaddr *) &server_addr,
476 sizeof(struct sockaddr)) == BT_SOCKET_ERROR) {
27a14e13 477 BT_CLOGE_APPEND_CAUSE("Connection failed: %s", bt_socket_errormsg());
4164020e
SM
478 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
479 goto error;
480 }
481
482 status = lttng_live_handshake(viewer_connection);
483
484 /*
485 * Only print error and append cause in case of error. not in case of
486 * interruption.
487 */
488 if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
27a14e13 489 BT_CLOGE_APPEND_CAUSE("Viewer handshake failed");
4164020e
SM
490 goto error;
491 } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
492 goto end;
493 }
494
495 goto end;
7cdc2bab
MD
496
497error:
4164020e
SM
498 if (viewer_connection->control_sock != BT_INVALID_SOCKET) {
499 if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) {
27a14e13 500 BT_CLOGW("Error closing socket: %s.", bt_socket_errormsg());
4164020e
SM
501 }
502 }
503 viewer_connection->control_sock = BT_INVALID_SOCKET;
f79c2d7a 504end:
4164020e 505 return status;
7cdc2bab
MD
506}
507
4164020e 508static void lttng_live_disconnect_viewer(struct live_viewer_connection *viewer_connection)
7cdc2bab 509{
27a14e13 510 const bt2_common::LogCfg& logCfg = viewer_connection->logCfg;
4164020e
SM
511
512 if (viewer_connection->control_sock == BT_INVALID_SOCKET) {
513 return;
514 }
515 if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) {
27a14e13 516 BT_CLOGW("Error closing socket: %s", bt_socket_errormsg());
4164020e
SM
517 viewer_connection->control_sock = BT_INVALID_SOCKET;
518 }
7cdc2bab
MD
519}
520
4164020e
SM
521static int list_update_session(bt_value *results, const struct lttng_viewer_session *session,
522 bool *_found, struct live_viewer_connection *viewer_connection)
7cdc2bab 523{
27a14e13 524 const bt2_common::LogCfg& logCfg = viewer_connection->logCfg;
4164020e
SM
525 int ret = 0;
526 uint64_t i, len;
527 bt_value *map = NULL;
528 bt_value *hostname = NULL;
529 bt_value *session_name = NULL;
530 bt_value *btval = NULL;
531 bool found = false;
532
533 len = bt_value_array_get_length(results);
534 for (i = 0; i < len; i++) {
535 const char *hostname_str = NULL;
536 const char *session_name_str = NULL;
537
538 map = bt_value_array_borrow_element_by_index(results, i);
539 hostname = bt_value_map_borrow_entry_value(map, "target-hostname");
540 if (!hostname) {
27a14e13 541 BT_CLOGE_APPEND_CAUSE("Error borrowing \"target-hostname\" entry.");
4164020e
SM
542 ret = -1;
543 goto end;
544 }
545 session_name = bt_value_map_borrow_entry_value(map, "session-name");
546 if (!session_name) {
27a14e13 547 BT_CLOGE_APPEND_CAUSE("Error borrowing \"session-name\" entry.");
4164020e
SM
548 ret = -1;
549 goto end;
550 }
551 hostname_str = bt_value_string_get(hostname);
552 session_name_str = bt_value_string_get(session_name);
553
554 if (strcmp(session->hostname, hostname_str) == 0 &&
555 strcmp(session->session_name, session_name_str) == 0) {
556 int64_t val;
557 uint32_t streams = be32toh(session->streams);
558 uint32_t clients = be32toh(session->clients);
559
560 found = true;
561
562 btval = bt_value_map_borrow_entry_value(map, "stream-count");
563 if (!btval) {
27a14e13 564 BT_CLOGE_APPEND_CAUSE("Error borrowing \"stream-count\" entry.");
4164020e
SM
565 ret = -1;
566 goto end;
567 }
568 val = bt_value_integer_unsigned_get(btval);
569 /* sum */
570 val += streams;
571 bt_value_integer_unsigned_set(btval, val);
572
573 btval = bt_value_map_borrow_entry_value(map, "client-count");
574 if (!btval) {
27a14e13 575 BT_CLOGE_APPEND_CAUSE("Error borrowing \"client-count\" entry.");
4164020e
SM
576 ret = -1;
577 goto end;
578 }
579 val = bt_value_integer_unsigned_get(btval);
580 /* max */
581 val = bt_max_t(int64_t, clients, val);
582 bt_value_integer_unsigned_set(btval, val);
583 }
584
585 if (found) {
586 break;
587 }
588 }
7cdc2bab 589end:
4164020e
SM
590 *_found = found;
591 return ret;
7cdc2bab
MD
592}
593
d6ed88fe 594static int list_append_session(bt_value *results, const std::string& base_url,
4164020e
SM
595 const struct lttng_viewer_session *session,
596 struct live_viewer_connection *viewer_connection)
7cdc2bab 597{
4164020e 598 int ret = 0;
27a14e13 599 const bt2_common::LogCfg& logCfg = viewer_connection->logCfg;
4164020e
SM
600 bt_value_map_insert_entry_status insert_status;
601 bt_value_array_append_element_status append_status;
602 bt_value *map = NULL;
4164020e 603 bool found = false;
cff22928 604 std::string url;
4164020e
SM
605
606 /*
607 * If the session already exists, add the stream count to it,
608 * and do max of client counts.
609 */
610 ret = list_update_session(results, session, &found, viewer_connection);
611 if (ret || found) {
612 goto end;
613 }
614
615 map = bt_value_map_create();
616 if (!map) {
27a14e13 617 BT_CLOGE_APPEND_CAUSE("Error creating map value.");
4164020e
SM
618 ret = -1;
619 goto end;
620 }
621
d6ed88fe
SM
622 if (base_url.empty()) {
623 BT_CLOGE_APPEND_CAUSE("Error: base_url empty.");
4164020e
SM
624 ret = -1;
625 goto end;
626 }
627 /*
628 * key = "url",
629 * value = <string>,
630 */
d6ed88fe 631 url = base_url;
cff22928
SM
632 url += "/host/";
633 url += session->hostname;
634 url += '/';
635 url += session->session_name;
4164020e 636
cff22928 637 insert_status = bt_value_map_insert_string_entry(map, "url", url.c_str());
4164020e 638 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
27a14e13 639 BT_CLOGE_APPEND_CAUSE("Error inserting \"url\" entry.");
4164020e
SM
640 ret = -1;
641 goto end;
642 }
643
644 /*
645 * key = "target-hostname",
646 * value = <string>,
647 */
648 insert_status = bt_value_map_insert_string_entry(map, "target-hostname", session->hostname);
649 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
27a14e13 650 BT_CLOGE_APPEND_CAUSE("Error inserting \"target-hostname\" entry.");
4164020e
SM
651 ret = -1;
652 goto end;
653 }
654
655 /*
656 * key = "session-name",
657 * value = <string>,
658 */
659 insert_status = bt_value_map_insert_string_entry(map, "session-name", session->session_name);
660 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
27a14e13 661 BT_CLOGE_APPEND_CAUSE("Error inserting \"session-name\" entry.");
4164020e
SM
662 ret = -1;
663 goto end;
664 }
665
666 /*
667 * key = "timer-us",
668 * value = <integer>,
669 */
670 {
671 uint32_t live_timer = be32toh(session->live_timer);
672
673 insert_status = bt_value_map_insert_unsigned_integer_entry(map, "timer-us", live_timer);
674 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
27a14e13 675 BT_CLOGE_APPEND_CAUSE("Error inserting \"timer-us\" entry.");
4164020e
SM
676 ret = -1;
677 goto end;
678 }
679 }
680
681 /*
682 * key = "stream-count",
683 * value = <integer>,
684 */
685 {
686 uint32_t streams = be32toh(session->streams);
687
688 insert_status = bt_value_map_insert_unsigned_integer_entry(map, "stream-count", streams);
689 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
27a14e13 690 BT_CLOGE_APPEND_CAUSE("Error inserting \"stream-count\" entry.");
4164020e
SM
691 ret = -1;
692 goto end;
693 }
694 }
695
696 /*
697 * key = "client-count",
698 * value = <integer>,
699 */
700 {
701 uint32_t clients = be32toh(session->clients);
702
703 insert_status = bt_value_map_insert_unsigned_integer_entry(map, "client-count", clients);
704 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
27a14e13 705 BT_CLOGE_APPEND_CAUSE("Error inserting \"client-count\" entry.");
4164020e
SM
706 ret = -1;
707 goto end;
708 }
709 }
710
711 append_status = bt_value_array_append_element(results, map);
712 if (append_status != BT_VALUE_ARRAY_APPEND_ELEMENT_STATUS_OK) {
27a14e13 713 BT_CLOGE_APPEND_CAUSE("Error appending map to results.");
4164020e
SM
714 ret = -1;
715 }
14f28187 716
7cdc2bab 717end:
4164020e
SM
718 BT_VALUE_PUT_REF_AND_RESET(map);
719 return ret;
7cdc2bab
MD
720}
721
722/*
723 * Data structure returned:
724 *
725 * {
726 * <array> = {
727 * [n] = {
728 * <map> = {
729 * {
730 * key = "url",
731 * value = <string>,
732 * },
733 * {
734 * key = "target-hostname",
735 * value = <string>,
736 * },
737 * {
738 * key = "session-name",
739 * value = <string>,
740 * },
741 * {
742 * key = "timer-us",
743 * value = <integer>,
744 * },
745 * {
746 * key = "stream-count",
747 * value = <integer>,
748 * },
749 * {
750 * key = "client-count",
751 * value = <integer>,
752 * },
753 * },
754 * }
755 * }
756 */
757
758BT_HIDDEN
4164020e
SM
759bt_component_class_query_method_status
760live_viewer_connection_list_sessions(struct live_viewer_connection *viewer_connection,
761 const bt_value **user_result)
7cdc2bab 762{
27a14e13 763 const bt2_common::LogCfg& logCfg = viewer_connection->logCfg;
4164020e
SM
764 bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
765 bt_value *result = NULL;
766 enum lttng_live_viewer_status viewer_status;
767 struct lttng_viewer_cmd cmd;
768 struct lttng_viewer_list_sessions list;
769 uint32_t i, sessions_count;
770
771 result = bt_value_array_create();
772 if (!result) {
27a14e13 773 BT_CLOGE_APPEND_CAUSE("Error creating array");
4164020e
SM
774 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
775 goto error;
776 }
777
778 BT_LOGD("Requesting list of sessions: cmd=%s",
779 lttng_viewer_command_string(LTTNG_VIEWER_LIST_SESSIONS));
780
781 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
782 cmd.data_size = htobe64((uint64_t) 0);
783 cmd.cmd_version = htobe32(0);
784
785 viewer_status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
786 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
27a14e13 787 BT_CLOGE_APPEND_CAUSE("Error sending list sessions command");
4164020e
SM
788 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
789 goto error;
790 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
791 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
792 goto error;
793 }
794
795 viewer_status = lttng_live_recv(viewer_connection, &list, sizeof(list));
796 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
27a14e13 797 BT_CLOGE_APPEND_CAUSE("Error receiving session list");
4164020e
SM
798 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
799 goto error;
800 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
801 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
802 goto error;
803 }
804
805 sessions_count = be32toh(list.sessions_count);
806 for (i = 0; i < sessions_count; i++) {
807 struct lttng_viewer_session lsession;
808
809 viewer_status = lttng_live_recv(viewer_connection, &lsession, sizeof(lsession));
810 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
27a14e13 811 BT_CLOGE_APPEND_CAUSE("Error receiving session:");
4164020e
SM
812 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
813 goto error;
814 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
815 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
816 goto error;
817 }
818
819 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
820 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
821 if (list_append_session(result, viewer_connection->url, &lsession, viewer_connection)) {
27a14e13 822 BT_CLOGE_APPEND_CAUSE("Error appending session");
4164020e
SM
823 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
824 goto error;
825 }
826 }
827
828 *user_result = result;
829 goto end;
7cdc2bab 830error:
4164020e 831 BT_VALUE_PUT_REF_AND_RESET(result);
7cdc2bab 832end:
4164020e 833 return status;
7cdc2bab
MD
834}
835
4164020e
SM
836static enum lttng_live_viewer_status
837lttng_live_query_session_ids(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 838{
4164020e
SM
839 struct lttng_viewer_cmd cmd;
840 struct lttng_viewer_list_sessions list;
841 struct lttng_viewer_session lsession;
842 uint32_t i, sessions_count;
843 uint64_t session_id;
844 enum lttng_live_viewer_status status;
845 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
27a14e13 846 const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
4164020e 847
27a14e13
SM
848 BT_CLOGD("Asking the relay daemon for the list of sessions: cmd=%s",
849 lttng_viewer_command_string(LTTNG_VIEWER_LIST_SESSIONS));
4164020e
SM
850
851 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
852 cmd.data_size = htobe64((uint64_t) 0);
853 cmd.cmd_version = htobe32(0);
854
855 status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
856 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
27a14e13 857 viewer_handle_send_status(logCfg, status, "list sessions command");
4164020e
SM
858 goto end;
859 }
860
861 status = lttng_live_recv(viewer_connection, &list, sizeof(list));
862 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
27a14e13 863 viewer_handle_recv_status(logCfg, status, "session list reply");
4164020e
SM
864 goto end;
865 }
866
867 sessions_count = be32toh(list.sessions_count);
868 for (i = 0; i < sessions_count; i++) {
869 status = lttng_live_recv(viewer_connection, &lsession, sizeof(lsession));
870 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
27a14e13 871 viewer_handle_recv_status(logCfg, status, "session reply");
4164020e
SM
872 goto end;
873 }
874 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
875 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
876 session_id = be64toh(lsession.id);
877
27a14e13
SM
878 BT_CLOGI("Adding session to internal list: "
879 "session-id=%" PRIu64 ", hostname=\"%s\", session-name=\"%s\"",
880 session_id, lsession.hostname, lsession.session_name);
4164020e
SM
881
882 if ((strncmp(lsession.session_name, viewer_connection->session_name->str,
883 LTTNG_VIEWER_NAME_MAX) == 0) &&
884 (strncmp(lsession.hostname, viewer_connection->target_hostname->str,
885 LTTNG_VIEWER_HOST_NAME_MAX) == 0)) {
886 if (lttng_live_add_session(lttng_live_msg_iter, session_id, lsession.hostname,
887 lsession.session_name)) {
27a14e13 888 BT_CLOGE_APPEND_CAUSE("Failed to add live session");
4164020e
SM
889 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
890 goto end;
891 }
892 }
893 }
894
895 status = LTTNG_LIVE_VIEWER_STATUS_OK;
7cdc2bab 896
f79c2d7a 897end:
4164020e 898 return status;
7cdc2bab
MD
899}
900
901BT_HIDDEN
4164020e
SM
902enum lttng_live_viewer_status
903lttng_live_create_viewer_session(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 904{
4164020e
SM
905 struct lttng_viewer_cmd cmd;
906 struct lttng_viewer_create_session_response resp;
907 enum lttng_live_viewer_status status;
908 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
27a14e13 909 const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
4164020e 910
27a14e13
SM
911 BT_CLOGD("Creating a viewer session: cmd=%s",
912 lttng_viewer_command_string(LTTNG_VIEWER_CREATE_SESSION));
4164020e
SM
913
914 cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
915 cmd.data_size = htobe64((uint64_t) 0);
916 cmd.cmd_version = htobe32(0);
917
918 status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
919 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
27a14e13 920 viewer_handle_send_status(logCfg, status, "create session command");
4164020e
SM
921 goto end;
922 }
923
924 status = lttng_live_recv(viewer_connection, &resp, sizeof(resp));
925 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
27a14e13 926 viewer_handle_recv_status(logCfg, status, "create session reply");
4164020e
SM
927 goto end;
928 }
929
930 if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
27a14e13 931 BT_CLOGE_APPEND_CAUSE("Error creating viewer session");
4164020e
SM
932 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
933 goto end;
934 }
935
936 status = lttng_live_query_session_ids(lttng_live_msg_iter);
937 if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
27a14e13 938 BT_CLOGE_APPEND_CAUSE("Failed to query live viewer session ids");
4164020e
SM
939 goto end;
940 } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
941 goto end;
942 }
7cdc2bab 943
f79c2d7a 944end:
4164020e 945 return status;
7cdc2bab
MD
946}
947
4164020e
SM
948static enum lttng_live_viewer_status receive_streams(struct lttng_live_session *session,
949 uint32_t stream_count,
950 bt_self_message_iterator *self_msg_iter)
7cdc2bab 951{
4164020e
SM
952 uint32_t i;
953 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
954 enum lttng_live_viewer_status status;
955 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
27a14e13 956 const bt2_common::LogCfg& logCfg = session->logCfg;
4164020e 957
27a14e13 958 BT_CLOGI("Getting %" PRIu32 " new streams", stream_count);
4164020e
SM
959 for (i = 0; i < stream_count; i++) {
960 struct lttng_viewer_stream stream;
961 struct lttng_live_stream_iterator *live_stream;
962 uint64_t stream_id;
963 uint64_t ctf_trace_id;
964
965 status = lttng_live_recv(viewer_connection, &stream, sizeof(stream));
966 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
27a14e13 967 viewer_handle_recv_status(logCfg, status, "stream reply");
4164020e
SM
968 goto end;
969 }
970 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
971 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
972 stream_id = be64toh(stream.id);
973 ctf_trace_id = be64toh(stream.ctf_trace_id);
974
975 if (stream.metadata_flag) {
27a14e13
SM
976 BT_CLOGI(" metadata stream %" PRIu64 " : %s/%s", stream_id, stream.path_name,
977 stream.channel_name);
4164020e
SM
978 if (lttng_live_metadata_create_stream(session, ctf_trace_id, stream_id,
979 stream.path_name)) {
27a14e13 980 BT_CLOGE_APPEND_CAUSE("Error creating metadata stream");
4164020e
SM
981 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
982 goto end;
983 }
984 session->lazy_stream_msg_init = true;
985 } else {
27a14e13
SM
986 BT_CLOGI(" stream %" PRIu64 " : %s/%s", stream_id, stream.path_name,
987 stream.channel_name);
4164020e
SM
988 live_stream =
989 lttng_live_stream_iterator_create(session, ctf_trace_id, stream_id, self_msg_iter);
990 if (!live_stream) {
27a14e13 991 BT_CLOGE_APPEND_CAUSE("Error creating stream");
4164020e
SM
992 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
993 goto end;
994 }
995 }
996 }
997 status = LTTNG_LIVE_VIEWER_STATUS_OK;
7cdc2bab 998
f79c2d7a 999end:
4164020e 1000 return status;
7cdc2bab
MD
1001}
1002
1003BT_HIDDEN
4164020e
SM
1004enum lttng_live_viewer_status lttng_live_session_attach(struct lttng_live_session *session,
1005 bt_self_message_iterator *self_msg_iter)
7cdc2bab 1006{
4164020e
SM
1007 struct lttng_viewer_cmd cmd;
1008 enum lttng_live_viewer_status status;
1009 struct lttng_viewer_attach_session_request rq;
1010 struct lttng_viewer_attach_session_response rp;
1011 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
1012 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
27a14e13 1013 const bt2_common::LogCfg& logCfg = session->logCfg;
4164020e
SM
1014 uint64_t session_id = session->id;
1015 uint32_t streams_count;
1016 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1017 char cmd_buf[cmd_buf_len];
1018
27a14e13
SM
1019 BT_CLOGD("Attaching to session: cmd=%s, session-id=%" PRIu64 ", seek=%s",
1020 lttng_viewer_command_string(LTTNG_VIEWER_ATTACH_SESSION), session_id,
1021 lttng_viewer_seek_string(LTTNG_VIEWER_SEEK_LAST));
4164020e
SM
1022
1023 cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
1024 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1025 cmd.cmd_version = htobe32(0);
1026
1027 memset(&rq, 0, sizeof(rq));
1028 rq.session_id = htobe64(session_id);
1029 // TODO: add cmd line parameter to select seek beginning
1030 // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
1031 rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
1032
1033 /*
1034 * Merge the cmd and connection request to prevent a write-write
1035 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1036 * second write to be performed quickly in presence of Nagle's algorithm.
1037 */
1038 memcpy(cmd_buf, &cmd, sizeof(cmd));
1039 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1040 status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1041 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
27a14e13 1042 viewer_handle_send_status(logCfg, status, "attach session command");
4164020e
SM
1043 goto end;
1044 }
1045
1046 status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1047 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
27a14e13 1048 viewer_handle_recv_status(logCfg, status, "attach session reply");
4164020e
SM
1049 goto end;
1050 }
1051
1052 streams_count = be32toh(rp.streams_count);
1053 switch (be32toh(rp.status)) {
1054 case LTTNG_VIEWER_ATTACH_OK:
1055 break;
1056 case LTTNG_VIEWER_ATTACH_UNK:
27a14e13 1057 BT_CLOGE_APPEND_CAUSE("Session id %" PRIu64 " is unknown", session_id);
4164020e
SM
1058 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1059 goto end;
1060 case LTTNG_VIEWER_ATTACH_ALREADY:
27a14e13 1061 BT_CLOGE_APPEND_CAUSE("There is already a viewer attached to this session");
4164020e
SM
1062 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1063 goto end;
1064 case LTTNG_VIEWER_ATTACH_NOT_LIVE:
27a14e13 1065 BT_CLOGE_APPEND_CAUSE("Not a live session");
4164020e
SM
1066 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1067 goto end;
1068 case LTTNG_VIEWER_ATTACH_SEEK_ERR:
27a14e13 1069 BT_CLOGE_APPEND_CAUSE("Wrong seek parameter");
4164020e
SM
1070 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1071 goto end;
1072 default:
27a14e13 1073 BT_CLOGE_APPEND_CAUSE("Unknown attach return code %u", be32toh(rp.status));
4164020e
SM
1074 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1075 goto end;
1076 }
1077
1078 /* We receive the initial list of streams. */
1079 status = receive_streams(session, streams_count, self_msg_iter);
1080 switch (status) {
1081 case LTTNG_LIVE_VIEWER_STATUS_OK:
1082 break;
1083 case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
1084 goto end;
1085 case LTTNG_LIVE_VIEWER_STATUS_ERROR:
27a14e13 1086 BT_CLOGE_APPEND_CAUSE("Error receiving streams");
4164020e
SM
1087 goto end;
1088 default:
1089 bt_common_abort();
1090 }
1091
1092 session->attached = true;
1093 session->new_streams_needed = false;
7cdc2bab 1094
eee8e741 1095end:
4164020e 1096 return status;
7cdc2bab
MD
1097}
1098
1099BT_HIDDEN
4164020e 1100enum lttng_live_viewer_status lttng_live_session_detach(struct lttng_live_session *session)
7cdc2bab 1101{
4164020e
SM
1102 struct lttng_viewer_cmd cmd;
1103 enum lttng_live_viewer_status status;
1104 struct lttng_viewer_detach_session_request rq;
1105 struct lttng_viewer_detach_session_response rp;
1106 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
4164020e
SM
1107 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1108 uint64_t session_id = session->id;
1109 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1110 char cmd_buf[cmd_buf_len];
27a14e13 1111 const bt2_common::LogCfg& logCfg = session->logCfg;
4164020e
SM
1112
1113 /*
1114 * The session might already be detached and the viewer socket might
1115 * already been closed. This happens when calling this function when
1116 * tearing down the graph after an error.
1117 */
1118 if (!session->attached || viewer_connection->control_sock == BT_INVALID_SOCKET) {
1119 return LTTNG_LIVE_VIEWER_STATUS_OK;
1120 }
1121
27a14e13
SM
1122 BT_CLOGD("Detaching from session: cmd=%s, session-id=%" PRIu64,
1123 lttng_viewer_command_string(LTTNG_VIEWER_DETACH_SESSION), session_id);
4164020e
SM
1124
1125 cmd.cmd = htobe32(LTTNG_VIEWER_DETACH_SESSION);
1126 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1127 cmd.cmd_version = htobe32(0);
1128
1129 memset(&rq, 0, sizeof(rq));
1130 rq.session_id = htobe64(session_id);
1131
1132 /*
1133 * Merge the cmd and connection request to prevent a write-write
1134 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1135 * second write to be performed quickly in presence of Nagle's algorithm.
1136 */
1137 memcpy(cmd_buf, &cmd, sizeof(cmd));
1138 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1139 status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1140 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
27a14e13 1141 viewer_handle_send_status(logCfg, status, "detach session command");
4164020e
SM
1142 goto end;
1143 }
1144
1145 status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1146 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
27a14e13 1147 viewer_handle_recv_status(logCfg, status, "detach session reply");
4164020e
SM
1148 goto end;
1149 }
1150
1151 switch (be32toh(rp.status)) {
1152 case LTTNG_VIEWER_DETACH_SESSION_OK:
1153 break;
1154 case LTTNG_VIEWER_DETACH_SESSION_UNK:
27a14e13 1155 BT_CLOGW("Session id %" PRIu64 " is unknown", session_id);
4164020e
SM
1156 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1157 goto end;
1158 case LTTNG_VIEWER_DETACH_SESSION_ERR:
27a14e13 1159 BT_CLOGW("Error detaching session id %" PRIu64 "", session_id);
4164020e
SM
1160 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1161 goto end;
1162 default:
27a14e13 1163 BT_CLOGE("Unknown detach return code %u", be32toh(rp.status));
4164020e
SM
1164 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1165 goto end;
1166 }
1167
1168 session->attached = false;
1169
1170 status = LTTNG_LIVE_VIEWER_STATUS_OK;
7cdc2bab 1171
f79c2d7a 1172end:
4164020e 1173 return status;
7cdc2bab
MD
1174}
1175
1176BT_HIDDEN
4164020e 1177enum lttng_live_get_one_metadata_status
c114204d 1178lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, std::vector<char>& buf)
7cdc2bab 1179{
4164020e
SM
1180 uint64_t len = 0;
1181 enum lttng_live_get_one_metadata_status status;
1182 enum lttng_live_viewer_status viewer_status;
1183 struct lttng_viewer_cmd cmd;
1184 struct lttng_viewer_get_metadata rq;
1185 struct lttng_viewer_metadata_packet rp;
dbd1827b 1186 std::vector<char> data;
4164020e
SM
1187 struct lttng_live_session *session = trace->session;
1188 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
a0843c0b 1189 struct lttng_live_metadata *metadata = trace->metadata.get();
4164020e 1190 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
27a14e13 1191 const bt2_common::LogCfg& logCfg = trace->logCfg;
4164020e
SM
1192 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1193 char cmd_buf[cmd_buf_len];
1194
27a14e13
SM
1195 BT_CLOGD("Requesting new metadata for trace:"
1196 "cmd=%s, trace-id=%" PRIu64 ", metadata-stream-id=%" PRIu64,
1197 lttng_viewer_command_string(LTTNG_VIEWER_GET_METADATA), trace->id,
1198 metadata->stream_id);
4164020e
SM
1199
1200 rq.stream_id = htobe64(metadata->stream_id);
1201 cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
1202 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1203 cmd.cmd_version = htobe32(0);
1204
1205 /*
1206 * Merge the cmd and connection request to prevent a write-write
1207 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1208 * second write to be performed quickly in presence of Nagle's algorithm.
1209 */
1210 memcpy(cmd_buf, &cmd, sizeof(cmd));
1211 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1212 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1213 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
27a14e13 1214 viewer_handle_send_status(logCfg, viewer_status, "get metadata command");
4164020e
SM
1215 status = (enum lttng_live_get_one_metadata_status) viewer_status;
1216 goto end;
1217 }
1218
1219 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1220 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
27a14e13 1221 viewer_handle_recv_status(logCfg, viewer_status, "get metadata reply");
4164020e
SM
1222 status = (enum lttng_live_get_one_metadata_status) viewer_status;
1223 goto end;
1224 }
1225
1226 switch (be32toh(rp.status)) {
1227 case LTTNG_VIEWER_METADATA_OK:
27a14e13 1228 BT_CLOGD("Received get_metadata response: ok");
4164020e
SM
1229 break;
1230 case LTTNG_VIEWER_NO_NEW_METADATA:
27a14e13 1231 BT_CLOGD("Received get_metadata response: no new");
4164020e
SM
1232 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_END;
1233 goto end;
1234 case LTTNG_VIEWER_METADATA_ERR:
1235 /*
1236 * The Relayd cannot find this stream id. Maybe its
1237 * gone already. This can happen in short lived UST app
1238 * in a per-pid session.
1239 */
27a14e13 1240 BT_CLOGD("Received get_metadata response: error");
4164020e
SM
1241 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_CLOSED;
1242 goto end;
1243 default:
27a14e13 1244 BT_CLOGE_APPEND_CAUSE("Received get_metadata response: unknown");
4164020e
SM
1245 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
1246 goto end;
1247 }
1248
1249 len = be64toh(rp.len);
c5ce3927
FD
1250 if (len == 0) {
1251 /*
1252 * We received a `LTTNG_VIEWER_METADATA_OK` with a packet
1253 * length of 0. This means we must try again. This scenario
1254 * arises when a clear command is performed on an lttng session.
1255 */
27a14e13 1256 BT_CLOGD("Expecting a metadata packet of size 0. Retry to get a packet from the relay.");
c5ce3927
FD
1257 goto empty_metadata_packet_retry;
1258 }
1259
27a14e13 1260 BT_CLOGD("Writing %" PRIu64 " bytes to metadata", len);
4164020e 1261 if (len <= 0) {
27a14e13 1262 BT_CLOGE_APPEND_CAUSE("Erroneous response length");
4164020e
SM
1263 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
1264 goto end;
1265 }
1266
dbd1827b 1267 data.resize(len);
4164020e 1268
dbd1827b 1269 viewer_status = lttng_live_recv(viewer_connection, data.data(), len);
4164020e 1270 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
27a14e13 1271 viewer_handle_recv_status(logCfg, viewer_status, "get metadata packet");
4164020e
SM
1272 status = (enum lttng_live_get_one_metadata_status) viewer_status;
1273 goto end;
1274 }
1275
1276 /*
1277 * Write the metadata to the file handle.
1278 */
dbd1827b 1279 buf.insert(buf.end(), data.begin(), data.end());
4164020e 1280
c5ce3927 1281empty_metadata_packet_retry:
4164020e 1282 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_OK;
c28512ab 1283
c28512ab 1284end:
4164020e 1285 return status;
7cdc2bab
MD
1286}
1287
1288/*
1289 * Assign the fields from a lttng_viewer_index to a packet_index.
1290 */
4164020e
SM
1291static void lttng_index_to_packet_index(struct lttng_viewer_index *lindex,
1292 struct packet_index *pindex)
7cdc2bab 1293{
4164020e
SM
1294 BT_ASSERT(lindex);
1295 BT_ASSERT(pindex);
1296
1297 pindex->offset = be64toh(lindex->offset);
1298 pindex->packet_size = be64toh(lindex->packet_size);
1299 pindex->content_size = be64toh(lindex->content_size);
1300 pindex->ts_cycles.timestamp_begin = be64toh(lindex->timestamp_begin);
1301 pindex->ts_cycles.timestamp_end = be64toh(lindex->timestamp_end);
1302 pindex->events_discarded = be64toh(lindex->events_discarded);
7cdc2bab
MD
1303}
1304
4164020e 1305static void lttng_live_need_new_streams(struct lttng_live_msg_iter *lttng_live_msg_iter)
36e94ad6 1306{
27a14e13 1307 const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
4164020e 1308
bef0ed48 1309 for (lttng_live_session::UP& session : lttng_live_msg_iter->sessions) {
27a14e13
SM
1310 BT_CLOGD("Marking session as needing new streams: "
1311 "session-id=%" PRIu64,
1312 session->id);
4164020e
SM
1313 session->new_streams_needed = true;
1314 }
36e94ad6
FD
1315}
1316
7cdc2bab 1317BT_HIDDEN
4164020e
SM
1318enum lttng_live_iterator_status
1319lttng_live_get_next_index(struct lttng_live_msg_iter *lttng_live_msg_iter,
1320 struct lttng_live_stream_iterator *stream, struct packet_index *index)
7cdc2bab 1321{
4164020e
SM
1322 struct lttng_viewer_cmd cmd;
1323 struct lttng_viewer_get_next_index rq;
1324 enum lttng_live_viewer_status viewer_status;
1325 struct lttng_viewer_index rp;
1326 enum lttng_live_iterator_status status;
1327 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
4164020e
SM
1328 struct lttng_live_trace *trace = stream->trace;
1329 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1330 char cmd_buf[cmd_buf_len];
1331 uint32_t flags, rp_status;
27a14e13 1332 const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
4164020e 1333
27a14e13
SM
1334 BT_CLOGD("Requesting next index for stream: cmd=%s, "
1335 "viewer-stream-id=%" PRIu64,
1336 lttng_viewer_command_string(LTTNG_VIEWER_GET_NEXT_INDEX), stream->viewer_stream_id);
4164020e
SM
1337 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
1338 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1339 cmd.cmd_version = htobe32(0);
1340
1341 memset(&rq, 0, sizeof(rq));
1342 rq.stream_id = htobe64(stream->viewer_stream_id);
1343
1344 /*
1345 * Merge the cmd and connection request to prevent a write-write
1346 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1347 * second write to be performed quickly in presence of Nagle's algorithm.
1348 */
1349 memcpy(cmd_buf, &cmd, sizeof(cmd));
1350 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1351
1352 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1353 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
27a14e13 1354 viewer_handle_send_status(logCfg, viewer_status, "get next index command");
4164020e
SM
1355 goto error;
1356 }
1357
1358 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1359 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
27a14e13 1360 viewer_handle_recv_status(logCfg, viewer_status, "get next index reply");
4164020e
SM
1361 goto error;
1362 }
1363
1364 flags = be32toh(rp.flags);
1365 rp_status = be32toh(rp.status);
1366
27a14e13
SM
1367 BT_CLOGD("Received response from relay daemon: cmd=%s, response=%s",
1368 lttng_viewer_command_string(LTTNG_VIEWER_GET_NEXT_INDEX),
1369 lttng_viewer_next_index_return_code_string(rp_status));
4164020e
SM
1370 switch (rp_status) {
1371 case LTTNG_VIEWER_INDEX_INACTIVE:
1372 {
1373 uint64_t ctf_stream_class_id;
1374
1375 memset(index, 0, sizeof(struct packet_index));
1376 index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
1377 stream->current_inactivity_ts = index->ts_cycles.timestamp_end;
1378 ctf_stream_class_id = be64toh(rp.stream_id);
1379 if (stream->ctf_stream_class_id.is_set) {
1380 BT_ASSERT(stream->ctf_stream_class_id.value == ctf_stream_class_id);
1381 } else {
1382 stream->ctf_stream_class_id.value = ctf_stream_class_id;
1383 stream->ctf_stream_class_id.is_set = true;
1384 }
1385 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_QUIESCENT);
1386 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1387 break;
1388 }
1389 case LTTNG_VIEWER_INDEX_OK:
1390 {
1391 uint64_t ctf_stream_class_id;
1392
1393 lttng_index_to_packet_index(&rp, index);
1394 ctf_stream_class_id = be64toh(rp.stream_id);
1395 if (stream->ctf_stream_class_id.is_set) {
1396 BT_ASSERT(stream->ctf_stream_class_id.value == ctf_stream_class_id);
1397 } else {
1398 stream->ctf_stream_class_id.value = ctf_stream_class_id;
1399 stream->ctf_stream_class_id.is_set = true;
1400 }
1401
1402 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_DATA);
1403
1404 if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
27a14e13
SM
1405 BT_CLOGD("Marking trace as needing new metadata: "
1406 "response=%s, response-flag=NEW_METADATA, trace-id=%" PRIu64,
1407 lttng_viewer_next_index_return_code_string(rp_status), trace->id);
4164020e
SM
1408 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
1409 }
1410 if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
27a14e13
SM
1411 BT_CLOGD("Marking all sessions as possibly needing new streams: "
1412 "response=%s, response-flag=NEW_STREAM",
1413 lttng_viewer_next_index_return_code_string(rp_status));
4164020e
SM
1414 lttng_live_need_new_streams(lttng_live_msg_iter);
1415 }
1416 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1417 break;
1418 }
1419 case LTTNG_VIEWER_INDEX_RETRY:
1420 memset(index, 0, sizeof(struct packet_index));
1421 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
1422 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
1423 goto end;
1424 case LTTNG_VIEWER_INDEX_HUP:
1425 memset(index, 0, sizeof(struct packet_index));
1426 index->offset = EOF;
1427 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_EOF);
1428 stream->has_stream_hung_up = true;
1429 status = LTTNG_LIVE_ITERATOR_STATUS_END;
1430 break;
1431 case LTTNG_VIEWER_INDEX_ERR:
1432 memset(index, 0, sizeof(struct packet_index));
1433 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
1434 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1435 goto end;
1436 default:
27a14e13 1437 BT_CLOGD("Received get_next_index response: unknown value");
4164020e
SM
1438 memset(index, 0, sizeof(struct packet_index));
1439 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
1440 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1441 goto end;
1442 }
1443 goto end;
7cdc2bab
MD
1444
1445error:
4164020e 1446 status = viewer_status_to_live_iterator_status(viewer_status);
f79c2d7a 1447end:
4164020e 1448 return status;
7cdc2bab
MD
1449}
1450
1451BT_HIDDEN
4164020e
SM
1452enum ctf_msg_iter_medium_status
1453lttng_live_get_stream_bytes(struct lttng_live_msg_iter *lttng_live_msg_iter,
1454 struct lttng_live_stream_iterator *stream, uint8_t *buf,
1455 uint64_t offset, uint64_t req_len, uint64_t *recv_len)
7cdc2bab 1456{
4164020e
SM
1457 enum ctf_msg_iter_medium_status status;
1458 enum lttng_live_viewer_status viewer_status;
1459 struct lttng_viewer_trace_packet rp;
1460 struct lttng_viewer_cmd cmd;
1461 struct lttng_viewer_get_packet rq;
1462 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
27a14e13 1463 const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
4164020e
SM
1464 struct lttng_live_trace *trace = stream->trace;
1465 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1466 char cmd_buf[cmd_buf_len];
1467 uint32_t flags, rp_status;
1468
27a14e13
SM
1469 BT_CLOGD("Requesting data from stream: cmd=%s, "
1470 "offset=%" PRIu64 ", request-len=%" PRIu64,
1471 lttng_viewer_command_string(LTTNG_VIEWER_GET_PACKET), offset, req_len);
4164020e
SM
1472
1473 cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
1474 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1475 cmd.cmd_version = htobe32(0);
1476
1477 memset(&rq, 0, sizeof(rq));
1478 rq.stream_id = htobe64(stream->viewer_stream_id);
1479 rq.offset = htobe64(offset);
1480 rq.len = htobe32(req_len);
1481
1482 /*
1483 * Merge the cmd and connection request to prevent a write-write
1484 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1485 * second write to be performed quickly in presence of Nagle's algorithm.
1486 */
1487 memcpy(cmd_buf, &cmd, sizeof(cmd));
1488 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1489
1490 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1491 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
27a14e13 1492 viewer_handle_send_status(logCfg, viewer_status, "get data packet command");
4164020e
SM
1493 goto error_convert_status;
1494 }
1495
1496 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1497 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
27a14e13 1498 viewer_handle_recv_status(logCfg, viewer_status, "get data packet reply");
4164020e
SM
1499 goto error_convert_status;
1500 }
1501
1502 flags = be32toh(rp.flags);
1503 rp_status = be32toh(rp.status);
1504
27a14e13
SM
1505 BT_CLOGD("Received response from relay daemon: cmd=%s, response=%s",
1506 lttng_viewer_command_string(LTTNG_VIEWER_GET_PACKET),
1507 lttng_viewer_get_packet_return_code_string(rp_status));
4164020e
SM
1508 switch (rp_status) {
1509 case LTTNG_VIEWER_GET_PACKET_OK:
1510 req_len = be32toh(rp.len);
27a14e13
SM
1511 BT_CLOGD("Got packet from relay daemon: response=%s, packet-len=%" PRIu64 "",
1512 lttng_viewer_get_packet_return_code_string(rp_status), req_len);
4164020e
SM
1513 break;
1514 case LTTNG_VIEWER_GET_PACKET_RETRY:
1515 /* Unimplemented by relay daemon */
1516 status = CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
1517 goto end;
1518 case LTTNG_VIEWER_GET_PACKET_ERR:
1519 if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
27a14e13
SM
1520 BT_CLOGD("Marking trace as needing new metadata: "
1521 "response=%s, response-flag=NEW_METADATA, trace-id=%" PRIu64,
1522 lttng_viewer_next_index_return_code_string(rp_status), trace->id);
4164020e
SM
1523 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
1524 }
1525 if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
27a14e13
SM
1526 BT_CLOGD("Marking all sessions as possibly needing new streams: "
1527 "response=%s, response-flag=NEW_STREAM",
1528 lttng_viewer_next_index_return_code_string(rp_status));
4164020e
SM
1529 lttng_live_need_new_streams(lttng_live_msg_iter);
1530 }
1531 if (flags & (LTTNG_VIEWER_FLAG_NEW_METADATA | LTTNG_VIEWER_FLAG_NEW_STREAM)) {
1532 status = CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
27a14e13
SM
1533 BT_CLOGD("Reply with any one flags set means we should retry: response=%s",
1534 lttng_viewer_get_packet_return_code_string(rp_status));
4164020e
SM
1535 goto end;
1536 }
27a14e13 1537 BT_CLOGE_APPEND_CAUSE("Received get_data_packet response: error");
4164020e
SM
1538 status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
1539 goto end;
1540 case LTTNG_VIEWER_GET_PACKET_EOF:
1541 status = CTF_MSG_ITER_MEDIUM_STATUS_EOF;
1542 goto end;
1543 default:
27a14e13 1544 BT_CLOGE_APPEND_CAUSE("Received get_data_packet response: unknown (%d)", rp_status);
4164020e
SM
1545 status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
1546 goto end;
1547 }
1548
1549 if (req_len == 0) {
1550 status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
1551 goto end;
1552 }
1553
1554 viewer_status = lttng_live_recv(viewer_connection, buf, req_len);
1555 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
27a14e13 1556 viewer_handle_recv_status(logCfg, viewer_status, "get data packet");
4164020e
SM
1557 goto error_convert_status;
1558 }
1559 *recv_len = req_len;
1560
1561 status = CTF_MSG_ITER_MEDIUM_STATUS_OK;
1562 goto end;
f79c2d7a 1563
535fb48a 1564error_convert_status:
4164020e 1565 status = viewer_status_to_ctf_msg_iter_medium_status(viewer_status);
f79c2d7a 1566end:
4164020e 1567 return status;
7cdc2bab
MD
1568}
1569
1570/*
1571 * Request new streams for a session.
1572 */
1573BT_HIDDEN
4164020e
SM
1574enum lttng_live_iterator_status
1575lttng_live_session_get_new_streams(struct lttng_live_session *session,
1576 bt_self_message_iterator *self_msg_iter)
7cdc2bab 1577{
4164020e
SM
1578 enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1579 struct lttng_viewer_cmd cmd;
1580 struct lttng_viewer_new_streams_request rq;
1581 struct lttng_viewer_new_streams_response rp;
1582 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
1583 enum lttng_live_viewer_status viewer_status;
1584 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
27a14e13 1585 const bt2_common::LogCfg& logCfg = lttng_live_msg_iter->logCfg;
4164020e
SM
1586 uint32_t streams_count;
1587 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1588 char cmd_buf[cmd_buf_len];
1589
1590 if (!session->new_streams_needed) {
1591 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1592 goto end;
1593 }
1594
27a14e13
SM
1595 BT_CLOGD("Requesting new streams for session: cmd=%s, "
1596 "session-id=%" PRIu64,
1597 lttng_viewer_command_string(LTTNG_VIEWER_GET_NEW_STREAMS), session->id);
4164020e
SM
1598
1599 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
1600 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1601 cmd.cmd_version = htobe32(0);
1602
1603 memset(&rq, 0, sizeof(rq));
1604 rq.session_id = htobe64(session->id);
1605
1606 /*
1607 * Merge the cmd and connection request to prevent a write-write
1608 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1609 * second write to be performed quickly in presence of Nagle's algorithm.
1610 */
1611 memcpy(cmd_buf, &cmd, sizeof(cmd));
1612 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1613
1614 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1615 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
27a14e13 1616 viewer_handle_send_status(logCfg, viewer_status, "get new streams command");
4164020e
SM
1617 status = viewer_status_to_live_iterator_status(viewer_status);
1618 goto end;
1619 }
1620
1621 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1622 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
27a14e13 1623 viewer_handle_recv_status(logCfg, viewer_status, "get new streams reply");
4164020e
SM
1624 status = viewer_status_to_live_iterator_status(viewer_status);
1625 goto end;
1626 }
1627
1628 streams_count = be32toh(rp.streams_count);
1629
1630 switch (be32toh(rp.status)) {
1631 case LTTNG_VIEWER_NEW_STREAMS_OK:
1632 session->new_streams_needed = false;
1633 break;
1634 case LTTNG_VIEWER_NEW_STREAMS_NO_NEW:
1635 session->new_streams_needed = false;
1636 goto end;
1637 case LTTNG_VIEWER_NEW_STREAMS_HUP:
1638 session->new_streams_needed = false;
1639 session->closed = true;
1640 status = LTTNG_LIVE_ITERATOR_STATUS_END;
1641 goto end;
1642 case LTTNG_VIEWER_NEW_STREAMS_ERR:
27a14e13 1643 BT_CLOGD("Received get_new_streams response: error");
4164020e
SM
1644 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1645 goto end;
1646 default:
27a14e13
SM
1647 BT_CLOGE_APPEND_CAUSE("Received get_new_streams response: Unknown:"
1648 "return code %u",
1649 be32toh(rp.status));
4164020e
SM
1650 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1651 goto end;
1652 }
1653
1654 viewer_status = receive_streams(session, streams_count, self_msg_iter);
1655 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
27a14e13 1656 viewer_handle_recv_status(logCfg, viewer_status, "new streams");
4164020e
SM
1657 status = viewer_status_to_live_iterator_status(viewer_status);
1658 goto end;
1659 }
1660
1661 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
f79c2d7a 1662end:
4164020e 1663 return status;
7cdc2bab
MD
1664}
1665
1666BT_HIDDEN
f79c2d7a 1667enum lttng_live_viewer_status live_viewer_connection_create(
27a14e13
SM
1668 const char *url, bool in_query, struct lttng_live_msg_iter *lttng_live_msg_iter,
1669 const bt2_common::LogCfg& logCfg, struct live_viewer_connection **viewer)
7cdc2bab 1670{
4164020e
SM
1671 enum lttng_live_viewer_status status;
1672
27a14e13 1673 live_viewer_connection *viewer_connection = new live_viewer_connection {logCfg};
4164020e 1674
27a14e13
SM
1675 if (bt_socket_init(logCfg.logLevel()) != 0) {
1676 BT_CLOGE_APPEND_CAUSE("Failed to init socket");
4164020e
SM
1677 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1678 goto error;
1679 }
1680
4164020e
SM
1681 viewer_connection->control_sock = BT_INVALID_SOCKET;
1682 viewer_connection->port = -1;
1683 viewer_connection->in_query = in_query;
1684 viewer_connection->lttng_live_msg_iter = lttng_live_msg_iter;
d6ed88fe 1685 viewer_connection->url = url;
4164020e 1686
27a14e13 1687 BT_CLOGD("Establishing connection to url \"%s\"...", url);
4164020e
SM
1688 status = lttng_live_connect_viewer(viewer_connection);
1689 /*
1690 * Only print error and append cause in case of error. not in case of
1691 * interruption.
1692 */
1693 if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
27a14e13
SM
1694 BT_CLOGE_APPEND_CAUSE("Failed to establish connection: "
1695 "url=\"%s\"",
1696 url);
4164020e
SM
1697 goto error;
1698 } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1699 goto error;
1700 }
27a14e13 1701 BT_CLOGD("Connection to url \"%s\" is established", url);
4164020e
SM
1702
1703 *viewer = viewer_connection;
1704 status = LTTNG_LIVE_VIEWER_STATUS_OK;
1705 goto end;
7cdc2bab 1706
7cdc2bab 1707error:
4164020e
SM
1708 if (viewer_connection) {
1709 live_viewer_connection_destroy(viewer_connection);
1710 }
f79c2d7a 1711end:
4164020e 1712 return status;
7cdc2bab
MD
1713}
1714
1715BT_HIDDEN
4164020e 1716void live_viewer_connection_destroy(struct live_viewer_connection *viewer_connection)
7cdc2bab 1717{
27a14e13 1718 const bt2_common::LogCfg& logCfg = viewer_connection->logCfg;
b9e6ec43 1719
4164020e
SM
1720 if (!viewer_connection) {
1721 goto end;
1722 }
b9e6ec43 1723
27a14e13
SM
1724 BT_CLOGD("Closing connection to relay:"
1725 "relay-url=\"%s\"",
d6ed88fe 1726 viewer_connection->url.c_str());
ecb4ba8a 1727
4164020e 1728 lttng_live_disconnect_viewer(viewer_connection);
b9e6ec43 1729
6269f212 1730 delete viewer_connection;
1cb3cdd7 1731
4164020e 1732 bt_socket_fini();
b9e6ec43
FD
1733
1734end:
4164020e 1735 return;
7cdc2bab 1736}
This page took 0.156517 seconds and 5 git commands to generate.