ctf: allocate some structures with new
[babeltrace.git] / src / plugins / ctf / lttng-live / viewer-connection.cpp
CommitLineData
7cdc2bab 1/*
0235b0db 2 * SPDX-License-Identifier: MIT
7cdc2bab 3 *
0235b0db
MJ
4 * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
5 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
7cdc2bab
MD
6 */
7
c802cacb 8#include <glib.h>
7cdc2bab 9#include <stdint.h>
3c22a242 10#include <stdio.h>
3c22a242 11
c802cacb
SM
12#include <babeltrace2/babeltrace.h>
13
14#define BT_COMP_LOG_SELF_COMP (viewer_connection->self_comp)
71436ae4 15#define BT_LOG_OUTPUT_LEVEL ((enum bt_log_level) viewer_connection->log_level)
c802cacb
SM
16#define BT_LOG_TAG "PLUGIN/SRC.CTF.LTTNG-LIVE/VIEWER"
17#include "logging/comp-logging.h"
7cdc2bab 18
578e048b 19#include "common/common.h"
83ad336c 20#include "compat/endian.h" /* IWYU pragma: keep */
7cdc2bab 21
c802cacb 22#include "data-stream.hpp"
087cd0f5 23#include "lttng-live.hpp"
087cd0f5 24#include "lttng-viewer-abi.hpp"
087cd0f5 25#include "metadata.hpp"
c802cacb 26#include "viewer-connection.hpp"
7cdc2bab 27
4164020e
SM
28#define viewer_handle_send_recv_status(_self_comp, _self_comp_class, _status, _action, _msg_str) \
29 do { \
30 switch (_status) { \
31 case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED: \
32 break; \
33 case LTTNG_LIVE_VIEWER_STATUS_ERROR: \
34 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(_self_comp, _self_comp_class, \
35 "Error " _action " " _msg_str); \
36 break; \
37 default: \
38 bt_common_abort(); \
39 } \
40 } while (0)
41
42#define viewer_handle_send_status(_self_comp, _self_comp_class, _status, _msg_str) \
43 viewer_handle_send_recv_status(_self_comp, _self_comp_class, _status, "sending", _msg_str)
44
45#define viewer_handle_recv_status(_self_comp, _self_comp_class, _status, _msg_str) \
46 viewer_handle_send_recv_status(_self_comp, _self_comp_class, _status, "receiving", _msg_str)
47
48#define LTTNG_LIVE_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE_ERRNO(_self_comp, _self_comp_class, _msg, \
49 _fmt, ...) \
50 do { \
51 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(_self_comp, _self_comp_class, _msg ": %s" _fmt, \
52 bt_socket_errormsg(), ##__VA_ARGS__); \
53 } while (0)
54
55static const char *lttng_viewer_command_string(enum lttng_viewer_command cmd)
f93afbf9 56{
4164020e
SM
57 switch (cmd) {
58 case LTTNG_VIEWER_CONNECT:
59 return "CONNECT";
60 case LTTNG_VIEWER_LIST_SESSIONS:
61 return "LIST_SESSIONS";
62 case LTTNG_VIEWER_ATTACH_SESSION:
63 return "ATTACH_SESSION";
64 case LTTNG_VIEWER_GET_NEXT_INDEX:
65 return "GET_NEXT_INDEX";
66 case LTTNG_VIEWER_GET_PACKET:
67 return "GET_PACKET";
68 case LTTNG_VIEWER_GET_METADATA:
69 return "GET_METADATA";
70 case LTTNG_VIEWER_GET_NEW_STREAMS:
71 return "GET_NEW_STREAMS";
72 case LTTNG_VIEWER_CREATE_SESSION:
73 return "CREATE_SESSION";
74 case LTTNG_VIEWER_DETACH_SESSION:
75 return "DETACH_SESSION";
76 }
77
78 bt_common_abort();
f93afbf9
FD
79}
80
4164020e
SM
81static const char *
82lttng_viewer_next_index_return_code_string(enum lttng_viewer_next_index_return_code code)
f93afbf9 83{
4164020e
SM
84 switch (code) {
85 case LTTNG_VIEWER_INDEX_OK:
86 return "INDEX_OK";
87 case LTTNG_VIEWER_INDEX_RETRY:
88 return "INDEX_RETRY";
89 case LTTNG_VIEWER_INDEX_HUP:
90 return "INDEX_HUP";
91 case LTTNG_VIEWER_INDEX_ERR:
92 return "INDEX_ERR";
93 case LTTNG_VIEWER_INDEX_INACTIVE:
94 return "INDEX_INACTIVE";
95 case LTTNG_VIEWER_INDEX_EOF:
96 return "INDEX_EOF";
97 }
98
99 bt_common_abort();
f93afbf9
FD
100}
101
4164020e 102static const char *lttng_viewer_next_index_return_code_string(uint32_t code)
087cd0f5 103{
4164020e 104 return lttng_viewer_next_index_return_code_string((lttng_viewer_next_index_return_code) code);
087cd0f5
SM
105}
106
4164020e
SM
107static const char *
108lttng_viewer_get_packet_return_code_string(enum lttng_viewer_get_packet_return_code code)
f93afbf9 109{
4164020e
SM
110 switch (code) {
111 case LTTNG_VIEWER_GET_PACKET_OK:
112 return "GET_PACKET_OK";
113 case LTTNG_VIEWER_GET_PACKET_RETRY:
114 return "GET_PACKET_RETRY";
115 case LTTNG_VIEWER_GET_PACKET_ERR:
116 return "GET_PACKET_ERR";
117 case LTTNG_VIEWER_GET_PACKET_EOF:
118 return "GET_PACKET_EOF";
119 }
120
121 bt_common_abort();
f93afbf9
FD
122};
123
4164020e 124static const char *lttng_viewer_get_packet_return_code_string(uint32_t code)
087cd0f5 125{
4164020e 126 return lttng_viewer_get_packet_return_code_string((lttng_viewer_get_packet_return_code) code);
087cd0f5
SM
127}
128
4164020e 129static const char *lttng_viewer_seek_string(enum lttng_viewer_seek seek)
f93afbf9 130{
4164020e
SM
131 switch (seek) {
132 case LTTNG_VIEWER_SEEK_BEGINNING:
133 return "SEEK_BEGINNING";
134 case LTTNG_VIEWER_SEEK_LAST:
135 return "SEEK_LAST";
136 }
137
138 bt_common_abort();
f93afbf9
FD
139}
140
4164020e
SM
141static inline enum lttng_live_iterator_status
142viewer_status_to_live_iterator_status(enum lttng_live_viewer_status viewer_status)
f79c2d7a 143{
4164020e
SM
144 switch (viewer_status) {
145 case LTTNG_LIVE_VIEWER_STATUS_OK:
146 return LTTNG_LIVE_ITERATOR_STATUS_OK;
147 case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
148 return LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
149 case LTTNG_LIVE_VIEWER_STATUS_ERROR:
150 return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
151 }
152
153 bt_common_abort();
f79c2d7a
FD
154}
155
4164020e
SM
156static inline enum ctf_msg_iter_medium_status
157viewer_status_to_ctf_msg_iter_medium_status(enum lttng_live_viewer_status viewer_status)
f79c2d7a 158{
4164020e
SM
159 switch (viewer_status) {
160 case LTTNG_LIVE_VIEWER_STATUS_OK:
161 return CTF_MSG_ITER_MEDIUM_STATUS_OK;
162 case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
163 return CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
164 case LTTNG_LIVE_VIEWER_STATUS_ERROR:
165 return CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
166 }
167
168 bt_common_abort();
f79c2d7a
FD
169}
170
4164020e 171static inline void viewer_connection_close_socket(struct live_viewer_connection *viewer_connection)
b197ca37 172{
4164020e
SM
173 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
174 bt_self_component *self_comp = viewer_connection->self_comp;
175 int ret = bt_socket_close(viewer_connection->control_sock);
176 if (ret == -1) {
177 BT_COMP_OR_COMP_CLASS_LOGW_ERRNO(self_comp, self_comp_class,
178 "Error closing viewer connection socket: ", ".");
179 }
180
181 viewer_connection->control_sock = BT_INVALID_SOCKET;
b197ca37
FD
182}
183
f79c2d7a
FD
184/*
185 * This function receives a message from the Relay daemon.
186 * If it received the entire message, it returns _OK,
187 * If it's interrupted, it returns _INTERRUPTED,
188 * otherwise, it returns _ERROR.
189 */
4164020e
SM
190static enum lttng_live_viewer_status
191lttng_live_recv(struct live_viewer_connection *viewer_connection, void *buf, size_t len)
7cdc2bab 192{
4164020e
SM
193 ssize_t received;
194 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
195 bt_self_component *self_comp = viewer_connection->self_comp;
196 size_t total_received = 0, to_receive = len;
197 struct lttng_live_msg_iter *lttng_live_msg_iter = viewer_connection->lttng_live_msg_iter;
198 enum lttng_live_viewer_status status;
199 BT_SOCKET sock = viewer_connection->control_sock;
200
201 /*
202 * Receive a message from the Relay.
203 */
204 do {
205 received = bt_socket_recv(sock, (char *) buf + total_received, to_receive, 0);
206 if (received == BT_SOCKET_ERROR) {
207 if (bt_socket_interrupted()) {
208 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
209 /*
210 * This interruption was due to a
211 * SIGINT and the graph is being torn
212 * down.
213 */
214 status = LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED;
215 lttng_live_msg_iter->was_interrupted = true;
216 goto end;
217 } else {
218 /*
219 * A signal was received, but the graph
220 * is not being torn down. Carry on.
221 */
222 continue;
223 }
224 } else {
225 /*
226 * For any other types of socket error, close
227 * the socket and return an error.
228 */
229 LTTNG_LIVE_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE_ERRNO(
230 self_comp, self_comp_class, "Error receiving from Relay", ".");
231
232 viewer_connection_close_socket(viewer_connection);
233 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
234 goto end;
235 }
236 } else if (received == 0) {
237 /*
238 * The recv() call returned 0. This means the
239 * connection was orderly shutdown from the other peer.
240 * If that happens when we are trying to receive
241 * a message from it, it means something when wrong.
242 * Close the socket and return an error.
243 */
244 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
245 "Remote side has closed connection");
246 viewer_connection_close_socket(viewer_connection);
247 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
248 goto end;
249 }
250
251 BT_ASSERT(received <= to_receive);
252 total_received += received;
253 to_receive -= received;
254
255 } while (to_receive > 0);
256
257 BT_ASSERT(total_received == len);
258 status = LTTNG_LIVE_VIEWER_STATUS_OK;
f79c2d7a
FD
259
260end:
4164020e 261 return status;
7cdc2bab
MD
262}
263
f79c2d7a
FD
264/*
265 * This function sends a message to the Relay daemon.
266 * If it send the message, it returns _OK,
267 * If it's interrupted, it returns _INTERRUPTED,
268 * otherwise, it returns _ERROR.
269 */
4164020e
SM
270static enum lttng_live_viewer_status
271lttng_live_send(struct live_viewer_connection *viewer_connection, const void *buf, size_t len)
7cdc2bab 272{
4164020e
SM
273 enum lttng_live_viewer_status status;
274 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
275 bt_self_component *self_comp = viewer_connection->self_comp;
276 struct lttng_live_msg_iter *lttng_live_msg_iter = viewer_connection->lttng_live_msg_iter;
277 BT_SOCKET sock = viewer_connection->control_sock;
278 size_t to_send = len;
279 ssize_t total_sent = 0;
280
281 do {
282 ssize_t sent = bt_socket_send_nosigpipe(sock, (char *) buf + total_sent, to_send);
283 if (sent == BT_SOCKET_ERROR) {
284 if (bt_socket_interrupted()) {
285 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
286 /*
287 * This interruption was a SIGINT and
288 * the graph is being teared down.
289 */
290 status = LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED;
291 lttng_live_msg_iter->was_interrupted = true;
292 goto end;
293 } else {
294 /*
295 * A signal was received, but the graph
296 * is not being teared down. Carry on.
297 */
298 continue;
299 }
300 } else {
301 /*
302 * For any other types of socket error, close
303 * the socket and return an error.
304 */
305 LTTNG_LIVE_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE_ERRNO(
306 self_comp, self_comp_class, "Error sending to Relay", ".");
307
308 viewer_connection_close_socket(viewer_connection);
309 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
310 goto end;
311 }
312 }
313
314 BT_ASSERT(sent <= to_send);
315 total_sent += sent;
316 to_send -= sent;
317
318 } while (to_send > 0);
319
320 BT_ASSERT(total_sent == len);
321 status = LTTNG_LIVE_VIEWER_STATUS_OK;
f79c2d7a
FD
322
323end:
4164020e 324 return status;
7cdc2bab
MD
325}
326
4164020e 327static int parse_url(struct live_viewer_connection *viewer_connection)
7cdc2bab 328{
4164020e
SM
329 char error_buf[256] = {0};
330 bt_self_component *self_comp = viewer_connection->self_comp;
331 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
dd420a9b 332 struct bt_common_lttng_live_url_parts lttng_live_url_parts = {};
4164020e
SM
333 int ret = -1;
334 const char *path = viewer_connection->url->str;
335
336 if (!path) {
337 goto end;
338 }
339
340 lttng_live_url_parts = bt_common_parse_lttng_live_url(path, error_buf, sizeof(error_buf));
341 if (!lttng_live_url_parts.proto) {
342 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
343 "Invalid LTTng live URL format: %s", error_buf);
344 goto end;
345 }
346 viewer_connection->proto = lttng_live_url_parts.proto;
347 lttng_live_url_parts.proto = NULL;
348
349 viewer_connection->relay_hostname = lttng_live_url_parts.hostname;
350 lttng_live_url_parts.hostname = NULL;
351
352 if (lttng_live_url_parts.port >= 0) {
353 viewer_connection->port = lttng_live_url_parts.port;
354 } else {
355 viewer_connection->port = LTTNG_DEFAULT_NETWORK_VIEWER_PORT;
356 }
357
358 viewer_connection->target_hostname = lttng_live_url_parts.target_hostname;
359 lttng_live_url_parts.target_hostname = NULL;
360
361 if (lttng_live_url_parts.session_name) {
362 viewer_connection->session_name = lttng_live_url_parts.session_name;
363 lttng_live_url_parts.session_name = NULL;
364 }
365
366 ret = 0;
7cdc2bab
MD
367
368end:
4164020e
SM
369 bt_common_destroy_lttng_live_url_parts(&lttng_live_url_parts);
370 return ret;
7cdc2bab
MD
371}
372
4164020e
SM
373static enum lttng_live_viewer_status
374lttng_live_handshake(struct live_viewer_connection *viewer_connection)
7cdc2bab 375{
4164020e
SM
376 struct lttng_viewer_cmd cmd;
377 struct lttng_viewer_connect connect;
378 enum lttng_live_viewer_status status;
379 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
380 bt_self_component *self_comp = viewer_connection->self_comp;
381 const size_t cmd_buf_len = sizeof(cmd) + sizeof(connect);
382 char cmd_buf[cmd_buf_len];
383
384 BT_COMP_OR_COMP_CLASS_LOGD(
385 self_comp, self_comp_class,
386 "Handshaking with the relay daemon: cmd=%s, major-version=%u, minor-version=%u",
387 lttng_viewer_command_string(LTTNG_VIEWER_CONNECT), LTTNG_LIVE_MAJOR, LTTNG_LIVE_MINOR);
388
389 cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
390 cmd.data_size = htobe64((uint64_t) sizeof(connect));
391 cmd.cmd_version = htobe32(0);
392
393 connect.viewer_session_id = -1ULL; /* will be set on recv */
394 connect.major = htobe32(LTTNG_LIVE_MAJOR);
395 connect.minor = htobe32(LTTNG_LIVE_MINOR);
396 connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
397
398 /*
399 * Merge the cmd and connection request to prevent a write-write
400 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
401 * second write to be performed quickly in presence of Nagle's algorithm
402 */
403 memcpy(cmd_buf, &cmd, sizeof(cmd));
404 memcpy(cmd_buf + sizeof(cmd), &connect, sizeof(connect));
405
406 status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
407 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
408 viewer_handle_send_status(self_comp, self_comp_class, status, "viewer connect command");
409 goto end;
410 }
411
412 status = lttng_live_recv(viewer_connection, &connect, sizeof(connect));
413 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
414 viewer_handle_recv_status(self_comp, self_comp_class, status, "viewer connect reply");
415 goto end;
416 }
417
418 BT_COMP_OR_COMP_CLASS_LOGI(self_comp, self_comp_class, "Received viewer session ID : %" PRIu64,
419 (uint64_t) be64toh(connect.viewer_session_id));
420 BT_COMP_OR_COMP_CLASS_LOGI(self_comp, self_comp_class, "Relayd version : %u.%u",
421 be32toh(connect.major), be32toh(connect.minor));
422
423 if (LTTNG_LIVE_MAJOR != be32toh(connect.major)) {
424 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
425 "Incompatible lttng-relayd protocol");
426 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
427 goto end;
428 }
429 /* Use the smallest protocol version implemented. */
430 if (LTTNG_LIVE_MINOR > be32toh(connect.minor)) {
431 viewer_connection->minor = be32toh(connect.minor);
432 } else {
433 viewer_connection->minor = LTTNG_LIVE_MINOR;
434 }
435 viewer_connection->major = LTTNG_LIVE_MAJOR;
436
437 status = LTTNG_LIVE_VIEWER_STATUS_OK;
438
439 goto end;
f79c2d7a
FD
440
441end:
4164020e 442 return status;
7cdc2bab
MD
443}
444
4164020e
SM
445static enum lttng_live_viewer_status
446lttng_live_connect_viewer(struct live_viewer_connection *viewer_connection)
7cdc2bab 447{
4164020e
SM
448 struct hostent *host;
449 struct sockaddr_in server_addr;
450 enum lttng_live_viewer_status status;
451 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
452 bt_self_component *self_comp = viewer_connection->self_comp;
453
454 if (parse_url(viewer_connection)) {
455 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class, "Failed to parse URL");
456 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
457 goto error;
458 }
459
460 BT_COMP_OR_COMP_CLASS_LOGD(
461 self_comp, self_comp_class,
462 "Connecting to hostname : %s, port : %d, "
463 "target hostname : %s, session name : %s, proto : %s",
464 viewer_connection->relay_hostname->str, viewer_connection->port,
465 !viewer_connection->target_hostname ? "<none>" : viewer_connection->target_hostname->str,
466 !viewer_connection->session_name ? "<none>" : viewer_connection->session_name->str,
467 viewer_connection->proto->str);
468
469 host = gethostbyname(viewer_connection->relay_hostname->str);
470 if (!host) {
471 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
472 "Cannot lookup hostname: hostname=\"%s\"",
473 viewer_connection->relay_hostname->str);
474 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
475 goto error;
476 }
477
478 if ((viewer_connection->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == BT_INVALID_SOCKET) {
479 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
480 "Socket creation failed: %s", bt_socket_errormsg());
481 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
482 goto error;
483 }
484
485 server_addr.sin_family = AF_INET;
486 server_addr.sin_port = htons(viewer_connection->port);
487 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
488 memset(&(server_addr.sin_zero), 0, 8);
489
490 if (connect(viewer_connection->control_sock, (struct sockaddr *) &server_addr,
491 sizeof(struct sockaddr)) == BT_SOCKET_ERROR) {
492 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class, "Connection failed: %s",
493 bt_socket_errormsg());
494 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
495 goto error;
496 }
497
498 status = lttng_live_handshake(viewer_connection);
499
500 /*
501 * Only print error and append cause in case of error. not in case of
502 * interruption.
503 */
504 if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
505 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
506 "Viewer handshake failed");
507 goto error;
508 } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
509 goto end;
510 }
511
512 goto end;
7cdc2bab
MD
513
514error:
4164020e
SM
515 if (viewer_connection->control_sock != BT_INVALID_SOCKET) {
516 if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) {
517 BT_COMP_OR_COMP_CLASS_LOGW(self_comp, self_comp_class, "Error closing socket: %s.",
518 bt_socket_errormsg());
519 }
520 }
521 viewer_connection->control_sock = BT_INVALID_SOCKET;
f79c2d7a 522end:
4164020e 523 return status;
7cdc2bab
MD
524}
525
4164020e 526static void lttng_live_disconnect_viewer(struct live_viewer_connection *viewer_connection)
7cdc2bab 527{
4164020e
SM
528 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
529 bt_self_component *self_comp = viewer_connection->self_comp;
530
531 if (viewer_connection->control_sock == BT_INVALID_SOCKET) {
532 return;
533 }
534 if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) {
535 BT_COMP_OR_COMP_CLASS_LOGW(self_comp, self_comp_class, "Error closing socket: %s",
536 bt_socket_errormsg());
537 viewer_connection->control_sock = BT_INVALID_SOCKET;
538 }
7cdc2bab
MD
539}
540
4164020e
SM
541static int list_update_session(bt_value *results, const struct lttng_viewer_session *session,
542 bool *_found, struct live_viewer_connection *viewer_connection)
7cdc2bab 543{
4164020e
SM
544 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
545 bt_self_component *self_comp = viewer_connection->self_comp;
546 int ret = 0;
547 uint64_t i, len;
548 bt_value *map = NULL;
549 bt_value *hostname = NULL;
550 bt_value *session_name = NULL;
551 bt_value *btval = NULL;
552 bool found = false;
553
554 len = bt_value_array_get_length(results);
555 for (i = 0; i < len; i++) {
556 const char *hostname_str = NULL;
557 const char *session_name_str = NULL;
558
559 map = bt_value_array_borrow_element_by_index(results, i);
560 hostname = bt_value_map_borrow_entry_value(map, "target-hostname");
561 if (!hostname) {
562 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
563 "Error borrowing \"target-hostname\" entry.");
564 ret = -1;
565 goto end;
566 }
567 session_name = bt_value_map_borrow_entry_value(map, "session-name");
568 if (!session_name) {
569 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
570 "Error borrowing \"session-name\" entry.");
571 ret = -1;
572 goto end;
573 }
574 hostname_str = bt_value_string_get(hostname);
575 session_name_str = bt_value_string_get(session_name);
576
577 if (strcmp(session->hostname, hostname_str) == 0 &&
578 strcmp(session->session_name, session_name_str) == 0) {
579 int64_t val;
580 uint32_t streams = be32toh(session->streams);
581 uint32_t clients = be32toh(session->clients);
582
583 found = true;
584
585 btval = bt_value_map_borrow_entry_value(map, "stream-count");
586 if (!btval) {
587 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
588 "Error borrowing \"stream-count\" entry.");
589 ret = -1;
590 goto end;
591 }
592 val = bt_value_integer_unsigned_get(btval);
593 /* sum */
594 val += streams;
595 bt_value_integer_unsigned_set(btval, val);
596
597 btval = bt_value_map_borrow_entry_value(map, "client-count");
598 if (!btval) {
599 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
600 "Error borrowing \"client-count\" entry.");
601 ret = -1;
602 goto end;
603 }
604 val = bt_value_integer_unsigned_get(btval);
605 /* max */
606 val = bt_max_t(int64_t, clients, val);
607 bt_value_integer_unsigned_set(btval, val);
608 }
609
610 if (found) {
611 break;
612 }
613 }
7cdc2bab 614end:
4164020e
SM
615 *_found = found;
616 return ret;
7cdc2bab
MD
617}
618
4164020e
SM
619static int list_append_session(bt_value *results, GString *base_url,
620 const struct lttng_viewer_session *session,
621 struct live_viewer_connection *viewer_connection)
7cdc2bab 622{
4164020e
SM
623 int ret = 0;
624 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
625 bt_value_map_insert_entry_status insert_status;
626 bt_value_array_append_element_status append_status;
627 bt_value *map = NULL;
628 GString *url = NULL;
629 bool found = false;
630
631 /*
632 * If the session already exists, add the stream count to it,
633 * and do max of client counts.
634 */
635 ret = list_update_session(results, session, &found, viewer_connection);
636 if (ret || found) {
637 goto end;
638 }
639
640 map = bt_value_map_create();
641 if (!map) {
642 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error creating map value.");
643 ret = -1;
644 goto end;
645 }
646
647 if (base_url->len < 1) {
648 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error: base_url length smaller than 1.");
649 ret = -1;
650 goto end;
651 }
652 /*
653 * key = "url",
654 * value = <string>,
655 */
656 url = g_string_new(base_url->str);
657 g_string_append(url, "/host/");
658 g_string_append(url, session->hostname);
659 g_string_append_c(url, '/');
660 g_string_append(url, session->session_name);
661
662 insert_status = bt_value_map_insert_string_entry(map, "url", url->str);
663 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
664 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error inserting \"url\" entry.");
665 ret = -1;
666 goto end;
667 }
668
669 /*
670 * key = "target-hostname",
671 * value = <string>,
672 */
673 insert_status = bt_value_map_insert_string_entry(map, "target-hostname", session->hostname);
674 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
675 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
676 "Error inserting \"target-hostname\" entry.");
677 ret = -1;
678 goto end;
679 }
680
681 /*
682 * key = "session-name",
683 * value = <string>,
684 */
685 insert_status = bt_value_map_insert_string_entry(map, "session-name", session->session_name);
686 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
687 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error inserting \"session-name\" entry.");
688 ret = -1;
689 goto end;
690 }
691
692 /*
693 * key = "timer-us",
694 * value = <integer>,
695 */
696 {
697 uint32_t live_timer = be32toh(session->live_timer);
698
699 insert_status = bt_value_map_insert_unsigned_integer_entry(map, "timer-us", live_timer);
700 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
701 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error inserting \"timer-us\" entry.");
702 ret = -1;
703 goto end;
704 }
705 }
706
707 /*
708 * key = "stream-count",
709 * value = <integer>,
710 */
711 {
712 uint32_t streams = be32toh(session->streams);
713
714 insert_status = bt_value_map_insert_unsigned_integer_entry(map, "stream-count", streams);
715 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
716 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
717 "Error inserting \"stream-count\" entry.");
718 ret = -1;
719 goto end;
720 }
721 }
722
723 /*
724 * key = "client-count",
725 * value = <integer>,
726 */
727 {
728 uint32_t clients = be32toh(session->clients);
729
730 insert_status = bt_value_map_insert_unsigned_integer_entry(map, "client-count", clients);
731 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
732 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
733 "Error inserting \"client-count\" entry.");
734 ret = -1;
735 goto end;
736 }
737 }
738
739 append_status = bt_value_array_append_element(results, map);
740 if (append_status != BT_VALUE_ARRAY_APPEND_ELEMENT_STATUS_OK) {
741 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error appending map to results.");
742 ret = -1;
743 }
14f28187 744
7cdc2bab 745end:
4164020e
SM
746 if (url) {
747 g_string_free(url, true);
748 }
749 BT_VALUE_PUT_REF_AND_RESET(map);
750 return ret;
7cdc2bab
MD
751}
752
753/*
754 * Data structure returned:
755 *
756 * {
757 * <array> = {
758 * [n] = {
759 * <map> = {
760 * {
761 * key = "url",
762 * value = <string>,
763 * },
764 * {
765 * key = "target-hostname",
766 * value = <string>,
767 * },
768 * {
769 * key = "session-name",
770 * value = <string>,
771 * },
772 * {
773 * key = "timer-us",
774 * value = <integer>,
775 * },
776 * {
777 * key = "stream-count",
778 * value = <integer>,
779 * },
780 * {
781 * key = "client-count",
782 * value = <integer>,
783 * },
784 * },
785 * }
786 * }
787 */
788
4164020e
SM
789bt_component_class_query_method_status
790live_viewer_connection_list_sessions(struct live_viewer_connection *viewer_connection,
791 const bt_value **user_result)
7cdc2bab 792{
4164020e
SM
793 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
794 bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
795 bt_value *result = NULL;
796 enum lttng_live_viewer_status viewer_status;
797 struct lttng_viewer_cmd cmd;
798 struct lttng_viewer_list_sessions list;
799 uint32_t i, sessions_count;
800
801 result = bt_value_array_create();
802 if (!result) {
803 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error creating array");
804 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
805 goto error;
806 }
807
808 BT_LOGD("Requesting list of sessions: cmd=%s",
809 lttng_viewer_command_string(LTTNG_VIEWER_LIST_SESSIONS));
810
811 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
812 cmd.data_size = htobe64((uint64_t) 0);
813 cmd.cmd_version = htobe32(0);
814
815 viewer_status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
816 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
817 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error sending list sessions command");
818 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
819 goto error;
820 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
821 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
822 goto error;
823 }
824
825 viewer_status = lttng_live_recv(viewer_connection, &list, sizeof(list));
826 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
827 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error receiving session list");
828 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
829 goto error;
830 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
831 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
832 goto error;
833 }
834
835 sessions_count = be32toh(list.sessions_count);
836 for (i = 0; i < sessions_count; i++) {
837 struct lttng_viewer_session lsession;
838
839 viewer_status = lttng_live_recv(viewer_connection, &lsession, sizeof(lsession));
840 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
841 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error receiving session:");
842 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
843 goto error;
844 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
845 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
846 goto error;
847 }
848
849 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
850 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
851 if (list_append_session(result, viewer_connection->url, &lsession, viewer_connection)) {
852 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error appending session");
853 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
854 goto error;
855 }
856 }
857
858 *user_result = result;
859 goto end;
7cdc2bab 860error:
4164020e 861 BT_VALUE_PUT_REF_AND_RESET(result);
7cdc2bab 862end:
4164020e 863 return status;
7cdc2bab
MD
864}
865
4164020e
SM
866static enum lttng_live_viewer_status
867lttng_live_query_session_ids(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 868{
4164020e
SM
869 struct lttng_viewer_cmd cmd;
870 struct lttng_viewer_list_sessions list;
871 struct lttng_viewer_session lsession;
872 uint32_t i, sessions_count;
873 uint64_t session_id;
874 enum lttng_live_viewer_status status;
875 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
876 bt_self_component *self_comp = viewer_connection->self_comp;
877 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
878
879 BT_COMP_LOGD("Asking the relay daemon for the list of sessions: cmd=%s",
880 lttng_viewer_command_string(LTTNG_VIEWER_LIST_SESSIONS));
881
882 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
883 cmd.data_size = htobe64((uint64_t) 0);
884 cmd.cmd_version = htobe32(0);
885
886 status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
887 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
888 viewer_handle_send_status(self_comp, self_comp_class, status, "list sessions command");
889 goto end;
890 }
891
892 status = lttng_live_recv(viewer_connection, &list, sizeof(list));
893 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
894 viewer_handle_recv_status(self_comp, self_comp_class, status, "session list reply");
895 goto end;
896 }
897
898 sessions_count = be32toh(list.sessions_count);
899 for (i = 0; i < sessions_count; i++) {
900 status = lttng_live_recv(viewer_connection, &lsession, sizeof(lsession));
901 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
902 viewer_handle_recv_status(self_comp, self_comp_class, status, "session reply");
903 goto end;
904 }
905 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
906 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
907 session_id = be64toh(lsession.id);
908
909 BT_COMP_LOGI("Adding session to internal list: "
910 "session-id=%" PRIu64 ", hostname=\"%s\", session-name=\"%s\"",
911 session_id, lsession.hostname, lsession.session_name);
912
913 if ((strncmp(lsession.session_name, viewer_connection->session_name->str,
914 LTTNG_VIEWER_NAME_MAX) == 0) &&
915 (strncmp(lsession.hostname, viewer_connection->target_hostname->str,
916 LTTNG_VIEWER_HOST_NAME_MAX) == 0)) {
917 if (lttng_live_add_session(lttng_live_msg_iter, session_id, lsession.hostname,
918 lsession.session_name)) {
919 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to add live session");
920 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
921 goto end;
922 }
923 }
924 }
925
926 status = LTTNG_LIVE_VIEWER_STATUS_OK;
7cdc2bab 927
f79c2d7a 928end:
4164020e 929 return status;
7cdc2bab
MD
930}
931
4164020e
SM
932enum lttng_live_viewer_status
933lttng_live_create_viewer_session(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 934{
4164020e
SM
935 struct lttng_viewer_cmd cmd;
936 struct lttng_viewer_create_session_response resp;
937 enum lttng_live_viewer_status status;
938 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
939 bt_self_component *self_comp = viewer_connection->self_comp;
940 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
941
942 BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class, "Creating a viewer session: cmd=%s",
943 lttng_viewer_command_string(LTTNG_VIEWER_CREATE_SESSION));
944
945 cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
946 cmd.data_size = htobe64((uint64_t) 0);
947 cmd.cmd_version = htobe32(0);
948
949 status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
950 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
951 viewer_handle_send_status(self_comp, self_comp_class, status, "create session command");
952 goto end;
953 }
954
955 status = lttng_live_recv(viewer_connection, &resp, sizeof(resp));
956 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
957 viewer_handle_recv_status(self_comp, self_comp_class, status, "create session reply");
958 goto end;
959 }
960
961 if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
962 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error creating viewer session");
963 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
964 goto end;
965 }
966
967 status = lttng_live_query_session_ids(lttng_live_msg_iter);
968 if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
969 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to query live viewer session ids");
970 goto end;
971 } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
972 goto end;
973 }
7cdc2bab 974
f79c2d7a 975end:
4164020e 976 return status;
7cdc2bab
MD
977}
978
4164020e
SM
979static enum lttng_live_viewer_status receive_streams(struct lttng_live_session *session,
980 uint32_t stream_count,
981 bt_self_message_iterator *self_msg_iter)
7cdc2bab 982{
4164020e
SM
983 uint32_t i;
984 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
985 enum lttng_live_viewer_status status;
986 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
987 bt_self_component *self_comp = viewer_connection->self_comp;
988
989 BT_COMP_LOGI("Getting %" PRIu32 " new streams", stream_count);
990 for (i = 0; i < stream_count; i++) {
991 struct lttng_viewer_stream stream;
992 struct lttng_live_stream_iterator *live_stream;
993 uint64_t stream_id;
994 uint64_t ctf_trace_id;
995
996 status = lttng_live_recv(viewer_connection, &stream, sizeof(stream));
997 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
998 viewer_handle_recv_status(self_comp, NULL, status, "stream reply");
999 goto end;
1000 }
1001 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
1002 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
1003 stream_id = be64toh(stream.id);
1004 ctf_trace_id = be64toh(stream.ctf_trace_id);
1005
1006 if (stream.metadata_flag) {
1007 BT_COMP_LOGI(" metadata stream %" PRIu64 " : %s/%s", stream_id, stream.path_name,
1008 stream.channel_name);
44bd6303 1009 if (lttng_live_metadata_create_stream(session, ctf_trace_id, stream_id)) {
4164020e
SM
1010 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error creating metadata stream");
1011 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1012 goto end;
1013 }
1014 session->lazy_stream_msg_init = true;
1015 } else {
1016 BT_COMP_LOGI(" stream %" PRIu64 " : %s/%s", stream_id, stream.path_name,
1017 stream.channel_name);
1018 live_stream =
1019 lttng_live_stream_iterator_create(session, ctf_trace_id, stream_id, self_msg_iter);
1020 if (!live_stream) {
1021 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error creating stream");
1022 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1023 goto end;
1024 }
1025 }
1026 }
1027 status = LTTNG_LIVE_VIEWER_STATUS_OK;
7cdc2bab 1028
f79c2d7a 1029end:
4164020e 1030 return status;
7cdc2bab
MD
1031}
1032
4164020e
SM
1033enum lttng_live_viewer_status lttng_live_session_attach(struct lttng_live_session *session,
1034 bt_self_message_iterator *self_msg_iter)
7cdc2bab 1035{
4164020e
SM
1036 struct lttng_viewer_cmd cmd;
1037 enum lttng_live_viewer_status status;
1038 struct lttng_viewer_attach_session_request rq;
1039 struct lttng_viewer_attach_session_response rp;
1040 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
1041 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1042 bt_self_component *self_comp = viewer_connection->self_comp;
1043 uint64_t session_id = session->id;
1044 uint32_t streams_count;
1045 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1046 char cmd_buf[cmd_buf_len];
1047
1048 BT_COMP_LOGD("Attaching to session: cmd=%s, session-id=%" PRIu64 ", seek=%s",
1049 lttng_viewer_command_string(LTTNG_VIEWER_ATTACH_SESSION), session_id,
1050 lttng_viewer_seek_string(LTTNG_VIEWER_SEEK_LAST));
1051
1052 cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
1053 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1054 cmd.cmd_version = htobe32(0);
1055
1056 memset(&rq, 0, sizeof(rq));
1057 rq.session_id = htobe64(session_id);
1058 // TODO: add cmd line parameter to select seek beginning
1059 // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
1060 rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
1061
1062 /*
1063 * Merge the cmd and connection request to prevent a write-write
1064 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1065 * second write to be performed quickly in presence of Nagle's algorithm.
1066 */
1067 memcpy(cmd_buf, &cmd, sizeof(cmd));
1068 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1069 status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1070 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1071 viewer_handle_send_status(self_comp, NULL, status, "attach session command");
1072 goto end;
1073 }
1074
1075 status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1076 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1077 viewer_handle_recv_status(self_comp, NULL, status, "attach session reply");
1078 goto end;
1079 }
1080
1081 streams_count = be32toh(rp.streams_count);
1082 switch (be32toh(rp.status)) {
1083 case LTTNG_VIEWER_ATTACH_OK:
1084 break;
1085 case LTTNG_VIEWER_ATTACH_UNK:
1086 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Session id %" PRIu64 " is unknown", session_id);
1087 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1088 goto end;
1089 case LTTNG_VIEWER_ATTACH_ALREADY:
1090 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "There is already a viewer attached to this session");
1091 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1092 goto end;
1093 case LTTNG_VIEWER_ATTACH_NOT_LIVE:
1094 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Not a live session");
1095 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1096 goto end;
1097 case LTTNG_VIEWER_ATTACH_SEEK_ERR:
1098 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Wrong seek parameter");
1099 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1100 goto end;
1101 default:
1102 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Unknown attach return code %u", be32toh(rp.status));
1103 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1104 goto end;
1105 }
1106
1107 /* We receive the initial list of streams. */
1108 status = receive_streams(session, streams_count, self_msg_iter);
1109 switch (status) {
1110 case LTTNG_LIVE_VIEWER_STATUS_OK:
1111 break;
1112 case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
1113 goto end;
1114 case LTTNG_LIVE_VIEWER_STATUS_ERROR:
1115 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error receiving streams");
1116 goto end;
1117 default:
1118 bt_common_abort();
1119 }
1120
1121 session->attached = true;
1122 session->new_streams_needed = false;
7cdc2bab 1123
eee8e741 1124end:
4164020e 1125 return status;
7cdc2bab
MD
1126}
1127
4164020e 1128enum lttng_live_viewer_status lttng_live_session_detach(struct lttng_live_session *session)
7cdc2bab 1129{
4164020e
SM
1130 struct lttng_viewer_cmd cmd;
1131 enum lttng_live_viewer_status status;
1132 struct lttng_viewer_detach_session_request rq;
1133 struct lttng_viewer_detach_session_response rp;
1134 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
1135 bt_self_component *self_comp = session->self_comp;
1136 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1137 uint64_t session_id = session->id;
1138 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1139 char cmd_buf[cmd_buf_len];
1140
1141 /*
1142 * The session might already be detached and the viewer socket might
1143 * already been closed. This happens when calling this function when
1144 * tearing down the graph after an error.
1145 */
1146 if (!session->attached || viewer_connection->control_sock == BT_INVALID_SOCKET) {
1147 return LTTNG_LIVE_VIEWER_STATUS_OK;
1148 }
1149
1150 BT_COMP_LOGD("Detaching from session: cmd=%s, session-id=%" PRIu64,
1151 lttng_viewer_command_string(LTTNG_VIEWER_DETACH_SESSION), session_id);
1152
1153 cmd.cmd = htobe32(LTTNG_VIEWER_DETACH_SESSION);
1154 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1155 cmd.cmd_version = htobe32(0);
1156
1157 memset(&rq, 0, sizeof(rq));
1158 rq.session_id = htobe64(session_id);
1159
1160 /*
1161 * Merge the cmd and connection request to prevent a write-write
1162 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1163 * second write to be performed quickly in presence of Nagle's algorithm.
1164 */
1165 memcpy(cmd_buf, &cmd, sizeof(cmd));
1166 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1167 status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1168 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1169 viewer_handle_send_status(self_comp, NULL, status, "detach session command");
1170 goto end;
1171 }
1172
1173 status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1174 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1175 viewer_handle_recv_status(self_comp, NULL, status, "detach session reply");
1176 goto end;
1177 }
1178
1179 switch (be32toh(rp.status)) {
1180 case LTTNG_VIEWER_DETACH_SESSION_OK:
1181 break;
1182 case LTTNG_VIEWER_DETACH_SESSION_UNK:
1183 BT_COMP_LOGW("Session id %" PRIu64 " is unknown", session_id);
1184 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1185 goto end;
1186 case LTTNG_VIEWER_DETACH_SESSION_ERR:
1187 BT_COMP_LOGW("Error detaching session id %" PRIu64 "", session_id);
1188 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1189 goto end;
1190 default:
1191 BT_COMP_LOGE("Unknown detach return code %u", be32toh(rp.status));
1192 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1193 goto end;
1194 }
1195
1196 session->attached = false;
1197
1198 status = LTTNG_LIVE_VIEWER_STATUS_OK;
7cdc2bab 1199
f79c2d7a 1200end:
4164020e 1201 return status;
7cdc2bab
MD
1202}
1203
4164020e
SM
1204enum lttng_live_get_one_metadata_status
1205lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, FILE *fp, size_t *reply_len)
7cdc2bab 1206{
4164020e
SM
1207 uint64_t len = 0;
1208 enum lttng_live_get_one_metadata_status status;
1209 enum lttng_live_viewer_status viewer_status;
1210 struct lttng_viewer_cmd cmd;
1211 struct lttng_viewer_get_metadata rq;
1212 struct lttng_viewer_metadata_packet rp;
1213 gchar *data = NULL;
1214 ssize_t writelen;
1215 struct lttng_live_session *session = trace->session;
1216 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
1217 struct lttng_live_metadata *metadata = trace->metadata;
1218 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1219 bt_self_component *self_comp = viewer_connection->self_comp;
1220 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1221 char cmd_buf[cmd_buf_len];
1222
1223 BT_COMP_LOGD("Requesting new metadata for trace:"
1224 "cmd=%s, trace-id=%" PRIu64 ", metadata-stream-id=%" PRIu64,
1225 lttng_viewer_command_string(LTTNG_VIEWER_GET_METADATA), trace->id,
1226 metadata->stream_id);
1227
1228 rq.stream_id = htobe64(metadata->stream_id);
1229 cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
1230 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1231 cmd.cmd_version = htobe32(0);
1232
1233 /*
1234 * Merge the cmd and connection request to prevent a write-write
1235 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1236 * second write to be performed quickly in presence of Nagle's algorithm.
1237 */
1238 memcpy(cmd_buf, &cmd, sizeof(cmd));
1239 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1240 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1241 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1242 viewer_handle_send_status(self_comp, NULL, viewer_status, "get metadata command");
1243 status = (enum lttng_live_get_one_metadata_status) viewer_status;
1244 goto end;
1245 }
1246
1247 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1248 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1249 viewer_handle_recv_status(self_comp, NULL, viewer_status, "get metadata reply");
1250 status = (enum lttng_live_get_one_metadata_status) viewer_status;
1251 goto end;
1252 }
1253
1254 switch (be32toh(rp.status)) {
1255 case LTTNG_VIEWER_METADATA_OK:
1256 BT_COMP_LOGD("Received get_metadata response: ok");
1257 break;
1258 case LTTNG_VIEWER_NO_NEW_METADATA:
1259 BT_COMP_LOGD("Received get_metadata response: no new");
1260 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_END;
1261 goto end;
1262 case LTTNG_VIEWER_METADATA_ERR:
1263 /*
1264 * The Relayd cannot find this stream id. Maybe its
1265 * gone already. This can happen in short lived UST app
1266 * in a per-pid session.
1267 */
1268 BT_COMP_LOGD("Received get_metadata response: error");
1269 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_CLOSED;
1270 goto end;
1271 default:
1272 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Received get_metadata response: unknown");
1273 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
1274 goto end;
1275 }
1276
1277 len = be64toh(rp.len);
c5ce3927
FD
1278 if (len == 0) {
1279 /*
1280 * We received a `LTTNG_VIEWER_METADATA_OK` with a packet
1281 * length of 0. This means we must try again. This scenario
1282 * arises when a clear command is performed on an lttng session.
1283 */
1284 BT_COMP_LOGD(
1285 "Expecting a metadata packet of size 0. Retry to get a packet from the relay.");
1286 goto empty_metadata_packet_retry;
1287 }
1288
4164020e
SM
1289 BT_COMP_LOGD("Writing %" PRIu64 " bytes to metadata", len);
1290 if (len <= 0) {
1291 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Erroneous response length");
1292 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
1293 goto end;
1294 }
1295
1296 data = g_new0(gchar, len);
1297 if (!data) {
1298 BT_COMP_LOGE_APPEND_CAUSE_ERRNO(self_comp, "Failed to allocate data buffer", ".");
1299 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
1300 goto end;
1301 }
1302
1303 viewer_status = lttng_live_recv(viewer_connection, data, len);
1304 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1305 viewer_handle_recv_status(self_comp, NULL, viewer_status, "get metadata packet");
1306 status = (enum lttng_live_get_one_metadata_status) viewer_status;
1307 goto end;
1308 }
1309
1310 /*
1311 * Write the metadata to the file handle.
1312 */
1313 writelen = fwrite(data, sizeof(uint8_t), len, fp);
1314 if (writelen != len) {
1315 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Writing in the metadata file stream");
1316 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
1317 goto end;
1318 }
1319
c5ce3927 1320empty_metadata_packet_retry:
4164020e
SM
1321 *reply_len = len;
1322 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_OK;
c28512ab 1323
c28512ab 1324end:
4164020e
SM
1325 g_free(data);
1326 return status;
7cdc2bab
MD
1327}
1328
1329/*
1330 * Assign the fields from a lttng_viewer_index to a packet_index.
1331 */
4164020e
SM
1332static void lttng_index_to_packet_index(struct lttng_viewer_index *lindex,
1333 struct packet_index *pindex)
7cdc2bab 1334{
4164020e
SM
1335 BT_ASSERT(lindex);
1336 BT_ASSERT(pindex);
1337
1338 pindex->offset = be64toh(lindex->offset);
1339 pindex->packet_size = be64toh(lindex->packet_size);
1340 pindex->content_size = be64toh(lindex->content_size);
1341 pindex->ts_cycles.timestamp_begin = be64toh(lindex->timestamp_begin);
1342 pindex->ts_cycles.timestamp_end = be64toh(lindex->timestamp_end);
1343 pindex->events_discarded = be64toh(lindex->events_discarded);
7cdc2bab
MD
1344}
1345
4164020e 1346static void lttng_live_need_new_streams(struct lttng_live_msg_iter *lttng_live_msg_iter)
36e94ad6 1347{
4164020e
SM
1348 uint64_t session_idx;
1349 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1350
1351 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
1352 struct lttng_live_session *session =
1353 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
1354 BT_COMP_LOGD("Marking session as needing new streams: "
1355 "session-id=%" PRIu64,
1356 session->id);
1357 session->new_streams_needed = true;
1358 }
36e94ad6
FD
1359}
1360
4164020e
SM
1361enum lttng_live_iterator_status
1362lttng_live_get_next_index(struct lttng_live_msg_iter *lttng_live_msg_iter,
1363 struct lttng_live_stream_iterator *stream, struct packet_index *index)
7cdc2bab 1364{
4164020e
SM
1365 struct lttng_viewer_cmd cmd;
1366 struct lttng_viewer_get_next_index rq;
1367 enum lttng_live_viewer_status viewer_status;
1368 struct lttng_viewer_index rp;
1369 enum lttng_live_iterator_status status;
1370 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1371 bt_self_component *self_comp = viewer_connection->self_comp;
1372 struct lttng_live_trace *trace = stream->trace;
1373 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1374 char cmd_buf[cmd_buf_len];
1375 uint32_t flags, rp_status;
1376
1377 BT_COMP_LOGD("Requesting next index for stream: cmd=%s, "
1378 "viewer-stream-id=%" PRIu64,
1379 lttng_viewer_command_string(LTTNG_VIEWER_GET_NEXT_INDEX),
1380 stream->viewer_stream_id);
1381 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
1382 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1383 cmd.cmd_version = htobe32(0);
1384
1385 memset(&rq, 0, sizeof(rq));
1386 rq.stream_id = htobe64(stream->viewer_stream_id);
1387
1388 /*
1389 * Merge the cmd and connection request to prevent a write-write
1390 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1391 * second write to be performed quickly in presence of Nagle's algorithm.
1392 */
1393 memcpy(cmd_buf, &cmd, sizeof(cmd));
1394 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1395
1396 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1397 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1398 viewer_handle_send_status(self_comp, NULL, viewer_status, "get next index command");
1399 goto error;
1400 }
1401
1402 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1403 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1404 viewer_handle_recv_status(self_comp, NULL, viewer_status, "get next index reply");
1405 goto error;
1406 }
1407
1408 flags = be32toh(rp.flags);
1409 rp_status = be32toh(rp.status);
1410
1411 BT_COMP_LOGD("Received response from relay daemon: cmd=%s, response=%s",
1412 lttng_viewer_command_string(LTTNG_VIEWER_GET_NEXT_INDEX),
1413 lttng_viewer_next_index_return_code_string(rp_status));
ca622546
JG
1414
1415 if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
1416 BT_COMP_LOGD("Marking all sessions as possibly needing new streams: "
1417 "response=%s, response-flag=NEW_STREAM",
1418 lttng_viewer_next_index_return_code_string(rp_status));
1419 lttng_live_need_new_streams(lttng_live_msg_iter);
1420 }
1421
4164020e
SM
1422 switch (rp_status) {
1423 case LTTNG_VIEWER_INDEX_INACTIVE:
1424 {
1425 uint64_t ctf_stream_class_id;
1426
1427 memset(index, 0, sizeof(struct packet_index));
1428 index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
1429 stream->current_inactivity_ts = index->ts_cycles.timestamp_end;
1430 ctf_stream_class_id = be64toh(rp.stream_id);
1431 if (stream->ctf_stream_class_id.is_set) {
1432 BT_ASSERT(stream->ctf_stream_class_id.value == ctf_stream_class_id);
1433 } else {
1434 stream->ctf_stream_class_id.value = ctf_stream_class_id;
1435 stream->ctf_stream_class_id.is_set = true;
1436 }
1437 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_QUIESCENT);
1438 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1439 break;
1440 }
1441 case LTTNG_VIEWER_INDEX_OK:
1442 {
1443 uint64_t ctf_stream_class_id;
1444
1445 lttng_index_to_packet_index(&rp, index);
1446 ctf_stream_class_id = be64toh(rp.stream_id);
1447 if (stream->ctf_stream_class_id.is_set) {
1448 BT_ASSERT(stream->ctf_stream_class_id.value == ctf_stream_class_id);
1449 } else {
1450 stream->ctf_stream_class_id.value = ctf_stream_class_id;
1451 stream->ctf_stream_class_id.is_set = true;
1452 }
4164020e
SM
1453 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_DATA);
1454
1455 if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
1456 BT_COMP_LOGD("Marking trace as needing new metadata: "
1457 "response=%s, response-flag=NEW_METADATA, trace-id=%" PRIu64,
1458 lttng_viewer_next_index_return_code_string(rp_status), trace->id);
1459 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
1460 }
4164020e
SM
1461 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1462 break;
1463 }
1464 case LTTNG_VIEWER_INDEX_RETRY:
1465 memset(index, 0, sizeof(struct packet_index));
1466 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
1467 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
1468 goto end;
1469 case LTTNG_VIEWER_INDEX_HUP:
1470 memset(index, 0, sizeof(struct packet_index));
1471 index->offset = EOF;
1472 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_EOF);
1473 stream->has_stream_hung_up = true;
1474 status = LTTNG_LIVE_ITERATOR_STATUS_END;
1475 break;
1476 case LTTNG_VIEWER_INDEX_ERR:
1477 memset(index, 0, sizeof(struct packet_index));
1478 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
1479 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1480 goto end;
1481 default:
1482 BT_COMP_LOGD("Received get_next_index response: unknown value");
1483 memset(index, 0, sizeof(struct packet_index));
1484 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
1485 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1486 goto end;
1487 }
ca622546 1488
4164020e 1489 goto end;
7cdc2bab
MD
1490
1491error:
4164020e 1492 status = viewer_status_to_live_iterator_status(viewer_status);
f79c2d7a 1493end:
4164020e 1494 return status;
7cdc2bab
MD
1495}
1496
4164020e
SM
1497enum ctf_msg_iter_medium_status
1498lttng_live_get_stream_bytes(struct lttng_live_msg_iter *lttng_live_msg_iter,
1499 struct lttng_live_stream_iterator *stream, uint8_t *buf,
1500 uint64_t offset, uint64_t req_len, uint64_t *recv_len)
7cdc2bab 1501{
4164020e
SM
1502 enum ctf_msg_iter_medium_status status;
1503 enum lttng_live_viewer_status viewer_status;
1504 struct lttng_viewer_trace_packet rp;
1505 struct lttng_viewer_cmd cmd;
1506 struct lttng_viewer_get_packet rq;
1507 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1508 bt_self_component *self_comp = viewer_connection->self_comp;
1509 struct lttng_live_trace *trace = stream->trace;
1510 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1511 char cmd_buf[cmd_buf_len];
1512 uint32_t flags, rp_status;
1513
1514 BT_COMP_LOGD("Requesting data from stream: cmd=%s, "
1515 "offset=%" PRIu64 ", request-len=%" PRIu64,
1516 lttng_viewer_command_string(LTTNG_VIEWER_GET_PACKET), offset, req_len);
1517
1518 cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
1519 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1520 cmd.cmd_version = htobe32(0);
1521
1522 memset(&rq, 0, sizeof(rq));
1523 rq.stream_id = htobe64(stream->viewer_stream_id);
1524 rq.offset = htobe64(offset);
1525 rq.len = htobe32(req_len);
1526
1527 /*
1528 * Merge the cmd and connection request to prevent a write-write
1529 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1530 * second write to be performed quickly in presence of Nagle's algorithm.
1531 */
1532 memcpy(cmd_buf, &cmd, sizeof(cmd));
1533 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1534
1535 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1536 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1537 viewer_handle_send_status(self_comp, NULL, viewer_status, "get data packet command");
1538 goto error_convert_status;
1539 }
1540
1541 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1542 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1543 viewer_handle_recv_status(self_comp, NULL, viewer_status, "get data packet reply");
1544 goto error_convert_status;
1545 }
1546
1547 flags = be32toh(rp.flags);
1548 rp_status = be32toh(rp.status);
1549
1550 BT_COMP_LOGD("Received response from relay daemon: cmd=%s, response=%s",
1551 lttng_viewer_command_string(LTTNG_VIEWER_GET_PACKET),
1552 lttng_viewer_get_packet_return_code_string(rp_status));
1553 switch (rp_status) {
1554 case LTTNG_VIEWER_GET_PACKET_OK:
1555 req_len = be32toh(rp.len);
1556 BT_COMP_LOGD("Got packet from relay daemon: response=%s, packet-len=%" PRIu64 "",
1557 lttng_viewer_get_packet_return_code_string(rp_status), req_len);
4164020e
SM
1558 break;
1559 case LTTNG_VIEWER_GET_PACKET_RETRY:
1560 /* Unimplemented by relay daemon */
1561 status = CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
1562 goto end;
1563 case LTTNG_VIEWER_GET_PACKET_ERR:
1564 if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
1565 BT_COMP_LOGD("Marking trace as needing new metadata: "
1566 "response=%s, response-flag=NEW_METADATA, trace-id=%" PRIu64,
1567 lttng_viewer_next_index_return_code_string(rp_status), trace->id);
1568 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
1569 }
1570 if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
1571 BT_COMP_LOGD("Marking all sessions as possibly needing new streams: "
1572 "response=%s, response-flag=NEW_STREAM",
1573 lttng_viewer_next_index_return_code_string(rp_status));
1574 lttng_live_need_new_streams(lttng_live_msg_iter);
1575 }
1576 if (flags & (LTTNG_VIEWER_FLAG_NEW_METADATA | LTTNG_VIEWER_FLAG_NEW_STREAM)) {
1577 status = CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
1578 BT_COMP_LOGD("Reply with any one flags set means we should retry: response=%s",
1579 lttng_viewer_get_packet_return_code_string(rp_status));
1580 goto end;
1581 }
1582 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Received get_data_packet response: error");
1583 status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
1584 goto end;
1585 case LTTNG_VIEWER_GET_PACKET_EOF:
1586 status = CTF_MSG_ITER_MEDIUM_STATUS_EOF;
1587 goto end;
1588 default:
1589 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Received get_data_packet response: unknown (%d)",
1590 rp_status);
1591 status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
1592 goto end;
1593 }
1594
1595 if (req_len == 0) {
1596 status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
1597 goto end;
1598 }
1599
1600 viewer_status = lttng_live_recv(viewer_connection, buf, req_len);
1601 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1602 viewer_handle_recv_status(self_comp, NULL, viewer_status, "get data packet");
1603 goto error_convert_status;
1604 }
1605 *recv_len = req_len;
1606
1607 status = CTF_MSG_ITER_MEDIUM_STATUS_OK;
1608 goto end;
f79c2d7a 1609
535fb48a 1610error_convert_status:
4164020e 1611 status = viewer_status_to_ctf_msg_iter_medium_status(viewer_status);
f79c2d7a 1612end:
4164020e 1613 return status;
7cdc2bab
MD
1614}
1615
1616/*
1617 * Request new streams for a session.
1618 */
4164020e
SM
1619enum lttng_live_iterator_status
1620lttng_live_session_get_new_streams(struct lttng_live_session *session,
1621 bt_self_message_iterator *self_msg_iter)
7cdc2bab 1622{
4164020e
SM
1623 enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1624 struct lttng_viewer_cmd cmd;
1625 struct lttng_viewer_new_streams_request rq;
1626 struct lttng_viewer_new_streams_response rp;
1627 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
1628 enum lttng_live_viewer_status viewer_status;
1629 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1630 bt_self_component *self_comp = viewer_connection->self_comp;
1631 uint32_t streams_count;
1632 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1633 char cmd_buf[cmd_buf_len];
1634
1635 if (!session->new_streams_needed) {
1636 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1637 goto end;
1638 }
1639
1640 BT_COMP_LOGD("Requesting new streams for session: cmd=%s, "
1641 "session-id=%" PRIu64,
1642 lttng_viewer_command_string(LTTNG_VIEWER_GET_NEW_STREAMS), session->id);
1643
1644 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
1645 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1646 cmd.cmd_version = htobe32(0);
1647
1648 memset(&rq, 0, sizeof(rq));
1649 rq.session_id = htobe64(session->id);
1650
1651 /*
1652 * Merge the cmd and connection request to prevent a write-write
1653 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1654 * second write to be performed quickly in presence of Nagle's algorithm.
1655 */
1656 memcpy(cmd_buf, &cmd, sizeof(cmd));
1657 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1658
1659 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1660 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1661 viewer_handle_send_status(self_comp, NULL, viewer_status, "get new streams command");
1662 status = viewer_status_to_live_iterator_status(viewer_status);
1663 goto end;
1664 }
1665
1666 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1667 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1668 viewer_handle_recv_status(self_comp, NULL, viewer_status, "get new streams reply");
1669 status = viewer_status_to_live_iterator_status(viewer_status);
1670 goto end;
1671 }
1672
1673 streams_count = be32toh(rp.streams_count);
1674
1675 switch (be32toh(rp.status)) {
1676 case LTTNG_VIEWER_NEW_STREAMS_OK:
1677 session->new_streams_needed = false;
1678 break;
1679 case LTTNG_VIEWER_NEW_STREAMS_NO_NEW:
1680 session->new_streams_needed = false;
1681 goto end;
1682 case LTTNG_VIEWER_NEW_STREAMS_HUP:
1683 session->new_streams_needed = false;
1684 session->closed = true;
1685 status = LTTNG_LIVE_ITERATOR_STATUS_END;
1686 goto end;
1687 case LTTNG_VIEWER_NEW_STREAMS_ERR:
1688 BT_COMP_LOGD("Received get_new_streams response: error");
1689 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1690 goto end;
1691 default:
1692 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1693 "Received get_new_streams response: Unknown:"
1694 "return code %u",
1695 be32toh(rp.status));
1696 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1697 goto end;
1698 }
1699
1700 viewer_status = receive_streams(session, streams_count, self_msg_iter);
1701 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1702 viewer_handle_recv_status(self_comp, NULL, viewer_status, "new streams");
1703 status = viewer_status_to_live_iterator_status(viewer_status);
1704 goto end;
1705 }
1706
1707 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
f79c2d7a 1708end:
4164020e 1709 return status;
7cdc2bab
MD
1710}
1711
f79c2d7a 1712enum lttng_live_viewer_status live_viewer_connection_create(
4164020e
SM
1713 bt_self_component *self_comp, bt_self_component_class *self_comp_class,
1714 bt_logging_level log_level, const char *url, bool in_query,
1715 struct lttng_live_msg_iter *lttng_live_msg_iter, struct live_viewer_connection **viewer)
7cdc2bab 1716{
4164020e
SM
1717 enum lttng_live_viewer_status status;
1718
afb0f12b 1719 live_viewer_connection *viewer_connection = new live_viewer_connection;
4164020e
SM
1720
1721 if (bt_socket_init(log_level) != 0) {
1722 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
1723 "Failed to init socket");
1724 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1725 goto error;
1726 }
1727
1728 viewer_connection->log_level = log_level;
1729
1730 viewer_connection->self_comp = self_comp;
1731 viewer_connection->self_comp_class = self_comp_class;
1732
1733 viewer_connection->control_sock = BT_INVALID_SOCKET;
1734 viewer_connection->port = -1;
1735 viewer_connection->in_query = in_query;
1736 viewer_connection->lttng_live_msg_iter = lttng_live_msg_iter;
1737 viewer_connection->url = g_string_new(url);
1738 if (!viewer_connection->url) {
1739 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
1740 "Failed to allocate URL buffer");
1741 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1742 goto error;
1743 }
1744
1745 BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class,
1746 "Establishing connection to url \"%s\"...", url);
1747 status = lttng_live_connect_viewer(viewer_connection);
1748 /*
1749 * Only print error and append cause in case of error. not in case of
1750 * interruption.
1751 */
1752 if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1753 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
1754 "Failed to establish connection: "
1755 "url=\"%s\"",
1756 url);
1757 goto error;
1758 } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1759 goto error;
1760 }
1761 BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class,
1762 "Connection to url \"%s\" is established", url);
1763
1764 *viewer = viewer_connection;
1765 status = LTTNG_LIVE_VIEWER_STATUS_OK;
1766 goto end;
7cdc2bab 1767
7cdc2bab 1768error:
4164020e
SM
1769 if (viewer_connection) {
1770 live_viewer_connection_destroy(viewer_connection);
1771 }
f79c2d7a 1772end:
4164020e 1773 return status;
7cdc2bab
MD
1774}
1775
4164020e 1776void live_viewer_connection_destroy(struct live_viewer_connection *viewer_connection)
7cdc2bab 1777{
4164020e
SM
1778 bt_self_component *self_comp = viewer_connection->self_comp;
1779 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
b9e6ec43 1780
4164020e
SM
1781 if (!viewer_connection) {
1782 goto end;
1783 }
b9e6ec43 1784
4164020e
SM
1785 BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class,
1786 "Closing connection to relay:"
1787 "relay-url=\"%s\"",
1788 viewer_connection->url->str);
ecb4ba8a 1789
4164020e 1790 lttng_live_disconnect_viewer(viewer_connection);
b9e6ec43 1791
4164020e
SM
1792 if (viewer_connection->url) {
1793 g_string_free(viewer_connection->url, true);
1794 }
b9e6ec43 1795
4164020e
SM
1796 if (viewer_connection->relay_hostname) {
1797 g_string_free(viewer_connection->relay_hostname, true);
1798 }
b9e6ec43 1799
4164020e
SM
1800 if (viewer_connection->target_hostname) {
1801 g_string_free(viewer_connection->target_hostname, true);
1802 }
b9e6ec43 1803
4164020e
SM
1804 if (viewer_connection->session_name) {
1805 g_string_free(viewer_connection->session_name, true);
1806 }
b9e6ec43 1807
4164020e
SM
1808 if (viewer_connection->proto) {
1809 g_string_free(viewer_connection->proto, true);
1810 }
ecb4ba8a 1811
afb0f12b 1812 delete viewer_connection;
1cb3cdd7 1813
4164020e 1814 bt_socket_fini();
b9e6ec43
FD
1815
1816end:
4164020e 1817 return;
7cdc2bab 1818}
This page took 0.183075 seconds and 4 git commands to generate.