src.ctf.lttng-live: remove unnecessary assignment in lttng_live_get_stream_bytes
[babeltrace.git] / src / plugins / ctf / lttng-live / viewer-connection.cpp
CommitLineData
7cdc2bab 1/*
0235b0db 2 * SPDX-License-Identifier: MIT
7cdc2bab 3 *
0235b0db
MJ
4 * Copyright 2019 Francis Deslauriers <francis.deslauriers@efficios.com>
5 * Copyright 2016 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
7cdc2bab
MD
6 */
7
2ece7dd0 8#define BT_COMP_LOG_SELF_COMP (viewer_connection->self_comp)
4164020e
SM
9#define BT_LOG_OUTPUT_LEVEL (viewer_connection->log_level)
10#define BT_LOG_TAG "PLUGIN/SRC.CTF.LTTNG-LIVE/VIEWER"
d9c39b0a 11#include "logging/comp-logging.h"
020bc26f 12
3c22a242
FD
13#include <fcntl.h>
14#include <stdbool.h>
7cdc2bab 15#include <stdint.h>
3c22a242 16#include <stdio.h>
7cdc2bab 17#include <stdlib.h>
3c22a242 18#include <sys/types.h>
7cdc2bab 19#include <unistd.h>
3c22a242 20
7cdc2bab 21#include <glib.h>
7cdc2bab 22
578e048b
MJ
23#include "compat/socket.h"
24#include "compat/endian.h"
25#include "compat/compiler.h"
26#include "common/common.h"
3fadfbc0 27#include <babeltrace2/babeltrace.h>
7cdc2bab 28
087cd0f5
SM
29#include "lttng-live.hpp"
30#include "viewer-connection.hpp"
31#include "lttng-viewer-abi.hpp"
32#include "data-stream.hpp"
33#include "metadata.hpp"
7cdc2bab 34
4164020e
SM
/*
 * Reacts to a non-OK `_status` obtained while sending to or receiving
 * from the relay daemon:
 *
 *   - an error is logged and appended as an error cause;
 *   - an interruption is silently ignored;
 *   - any other status aborts.
 *
 * `_action` and `_msg_str` must be string literals: they are
 * concatenated to build the logged message.
 */
#define viewer_handle_send_recv_status(_self_comp, _self_comp_class, _status, _action, _msg_str)   \
    do {                                                                                           \
        switch (_status) {                                                                         \
        case LTTNG_LIVE_VIEWER_STATUS_ERROR:                                                       \
            BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(_self_comp, _self_comp_class,                  \
                                                    "Error " _action " " _msg_str);                \
            break;                                                                                 \
        case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:                                                 \
            break;                                                                                 \
        default:                                                                                   \
            bt_common_abort();                                                                     \
        }                                                                                          \
    } while (0)

/* Same as viewer_handle_send_recv_status(), for a failed send operation. */
#define viewer_handle_send_status(_self_comp, _self_comp_class, _status, _msg_str)                 \
    viewer_handle_send_recv_status(_self_comp, _self_comp_class, _status, "sending", _msg_str)

/* Same as viewer_handle_send_recv_status(), for a failed receive operation. */
#define viewer_handle_recv_status(_self_comp, _self_comp_class, _status, _msg_str)                 \
    viewer_handle_send_recv_status(_self_comp, _self_comp_class, _status, "receiving", _msg_str)
54
/*
 * Logs an error and appends an error cause whose message is `_msg`,
 * then ": ", then the string returned by bt_socket_errormsg(), then
 * `_fmt` formatted with the remaining arguments.
 *
 * `_msg` and `_fmt` must be string literals.
 */
#define LTTNG_LIVE_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE_ERRNO(_self_comp, _self_comp_class, _msg,  \
                                                              _fmt, ...)                           \
    do {                                                                                           \
        BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(_self_comp, _self_comp_class,                      \
                                                _msg ": %s" _fmt,                                  \
                                                bt_socket_errormsg(), ##__VA_ARGS__);              \
    } while (0)
61
62static const char *lttng_viewer_command_string(enum lttng_viewer_command cmd)
f93afbf9 63{
4164020e
SM
64 switch (cmd) {
65 case LTTNG_VIEWER_CONNECT:
66 return "CONNECT";
67 case LTTNG_VIEWER_LIST_SESSIONS:
68 return "LIST_SESSIONS";
69 case LTTNG_VIEWER_ATTACH_SESSION:
70 return "ATTACH_SESSION";
71 case LTTNG_VIEWER_GET_NEXT_INDEX:
72 return "GET_NEXT_INDEX";
73 case LTTNG_VIEWER_GET_PACKET:
74 return "GET_PACKET";
75 case LTTNG_VIEWER_GET_METADATA:
76 return "GET_METADATA";
77 case LTTNG_VIEWER_GET_NEW_STREAMS:
78 return "GET_NEW_STREAMS";
79 case LTTNG_VIEWER_CREATE_SESSION:
80 return "CREATE_SESSION";
81 case LTTNG_VIEWER_DETACH_SESSION:
82 return "DETACH_SESSION";
83 }
84
85 bt_common_abort();
f93afbf9
FD
86}
87
4164020e
SM
88static const char *
89lttng_viewer_next_index_return_code_string(enum lttng_viewer_next_index_return_code code)
f93afbf9 90{
4164020e
SM
91 switch (code) {
92 case LTTNG_VIEWER_INDEX_OK:
93 return "INDEX_OK";
94 case LTTNG_VIEWER_INDEX_RETRY:
95 return "INDEX_RETRY";
96 case LTTNG_VIEWER_INDEX_HUP:
97 return "INDEX_HUP";
98 case LTTNG_VIEWER_INDEX_ERR:
99 return "INDEX_ERR";
100 case LTTNG_VIEWER_INDEX_INACTIVE:
101 return "INDEX_INACTIVE";
102 case LTTNG_VIEWER_INDEX_EOF:
103 return "INDEX_EOF";
104 }
105
106 bt_common_abort();
f93afbf9
FD
107}
108
4164020e 109static const char *lttng_viewer_next_index_return_code_string(uint32_t code)
087cd0f5 110{
4164020e 111 return lttng_viewer_next_index_return_code_string((lttng_viewer_next_index_return_code) code);
087cd0f5
SM
112}
113
4164020e
SM
114static const char *
115lttng_viewer_get_packet_return_code_string(enum lttng_viewer_get_packet_return_code code)
f93afbf9 116{
4164020e
SM
117 switch (code) {
118 case LTTNG_VIEWER_GET_PACKET_OK:
119 return "GET_PACKET_OK";
120 case LTTNG_VIEWER_GET_PACKET_RETRY:
121 return "GET_PACKET_RETRY";
122 case LTTNG_VIEWER_GET_PACKET_ERR:
123 return "GET_PACKET_ERR";
124 case LTTNG_VIEWER_GET_PACKET_EOF:
125 return "GET_PACKET_EOF";
126 }
127
128 bt_common_abort();
f93afbf9
FD
129};
130
4164020e 131static const char *lttng_viewer_get_packet_return_code_string(uint32_t code)
087cd0f5 132{
4164020e 133 return lttng_viewer_get_packet_return_code_string((lttng_viewer_get_packet_return_code) code);
087cd0f5
SM
134}
135
4164020e 136static const char *lttng_viewer_seek_string(enum lttng_viewer_seek seek)
f93afbf9 137{
4164020e
SM
138 switch (seek) {
139 case LTTNG_VIEWER_SEEK_BEGINNING:
140 return "SEEK_BEGINNING";
141 case LTTNG_VIEWER_SEEK_LAST:
142 return "SEEK_LAST";
143 }
144
145 bt_common_abort();
f93afbf9
FD
146}
147
4164020e
SM
148static inline enum lttng_live_iterator_status
149viewer_status_to_live_iterator_status(enum lttng_live_viewer_status viewer_status)
f79c2d7a 150{
4164020e
SM
151 switch (viewer_status) {
152 case LTTNG_LIVE_VIEWER_STATUS_OK:
153 return LTTNG_LIVE_ITERATOR_STATUS_OK;
154 case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
155 return LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
156 case LTTNG_LIVE_VIEWER_STATUS_ERROR:
157 return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
158 }
159
160 bt_common_abort();
f79c2d7a
FD
161}
162
4164020e
SM
163static inline enum ctf_msg_iter_medium_status
164viewer_status_to_ctf_msg_iter_medium_status(enum lttng_live_viewer_status viewer_status)
f79c2d7a 165{
4164020e
SM
166 switch (viewer_status) {
167 case LTTNG_LIVE_VIEWER_STATUS_OK:
168 return CTF_MSG_ITER_MEDIUM_STATUS_OK;
169 case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
170 return CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
171 case LTTNG_LIVE_VIEWER_STATUS_ERROR:
172 return CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
173 }
174
175 bt_common_abort();
f79c2d7a
FD
176}
177
4164020e 178static inline void viewer_connection_close_socket(struct live_viewer_connection *viewer_connection)
b197ca37 179{
4164020e
SM
180 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
181 bt_self_component *self_comp = viewer_connection->self_comp;
182 int ret = bt_socket_close(viewer_connection->control_sock);
183 if (ret == -1) {
184 BT_COMP_OR_COMP_CLASS_LOGW_ERRNO(self_comp, self_comp_class,
185 "Error closing viewer connection socket: ", ".");
186 }
187
188 viewer_connection->control_sock = BT_INVALID_SOCKET;
b197ca37
FD
189}
190
f79c2d7a
FD
191/*
192 * This function receives a message from the Relay daemon.
193 * If it received the entire message, it returns _OK,
194 * If it's interrupted, it returns _INTERRUPTED,
195 * otherwise, it returns _ERROR.
196 */
4164020e
SM
197static enum lttng_live_viewer_status
198lttng_live_recv(struct live_viewer_connection *viewer_connection, void *buf, size_t len)
7cdc2bab 199{
4164020e
SM
200 ssize_t received;
201 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
202 bt_self_component *self_comp = viewer_connection->self_comp;
203 size_t total_received = 0, to_receive = len;
204 struct lttng_live_msg_iter *lttng_live_msg_iter = viewer_connection->lttng_live_msg_iter;
205 enum lttng_live_viewer_status status;
206 BT_SOCKET sock = viewer_connection->control_sock;
207
208 /*
209 * Receive a message from the Relay.
210 */
211 do {
212 received = bt_socket_recv(sock, (char *) buf + total_received, to_receive, 0);
213 if (received == BT_SOCKET_ERROR) {
214 if (bt_socket_interrupted()) {
215 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
216 /*
217 * This interruption was due to a
218 * SIGINT and the graph is being torn
219 * down.
220 */
221 status = LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED;
222 lttng_live_msg_iter->was_interrupted = true;
223 goto end;
224 } else {
225 /*
226 * A signal was received, but the graph
227 * is not being torn down. Carry on.
228 */
229 continue;
230 }
231 } else {
232 /*
233 * For any other types of socket error, close
234 * the socket and return an error.
235 */
236 LTTNG_LIVE_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE_ERRNO(
237 self_comp, self_comp_class, "Error receiving from Relay", ".");
238
239 viewer_connection_close_socket(viewer_connection);
240 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
241 goto end;
242 }
243 } else if (received == 0) {
244 /*
245 * The recv() call returned 0. This means the
246 * connection was orderly shutdown from the other peer.
247 * If that happens when we are trying to receive
248 * a message from it, it means something when wrong.
249 * Close the socket and return an error.
250 */
251 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
252 "Remote side has closed connection");
253 viewer_connection_close_socket(viewer_connection);
254 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
255 goto end;
256 }
257
258 BT_ASSERT(received <= to_receive);
259 total_received += received;
260 to_receive -= received;
261
262 } while (to_receive > 0);
263
264 BT_ASSERT(total_received == len);
265 status = LTTNG_LIVE_VIEWER_STATUS_OK;
f79c2d7a
FD
266
267end:
4164020e 268 return status;
7cdc2bab
MD
269}
270
f79c2d7a
FD
271/*
272 * This function sends a message to the Relay daemon.
273 * If it send the message, it returns _OK,
274 * If it's interrupted, it returns _INTERRUPTED,
275 * otherwise, it returns _ERROR.
276 */
4164020e
SM
277static enum lttng_live_viewer_status
278lttng_live_send(struct live_viewer_connection *viewer_connection, const void *buf, size_t len)
7cdc2bab 279{
4164020e
SM
280 enum lttng_live_viewer_status status;
281 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
282 bt_self_component *self_comp = viewer_connection->self_comp;
283 struct lttng_live_msg_iter *lttng_live_msg_iter = viewer_connection->lttng_live_msg_iter;
284 BT_SOCKET sock = viewer_connection->control_sock;
285 size_t to_send = len;
286 ssize_t total_sent = 0;
287
288 do {
289 ssize_t sent = bt_socket_send_nosigpipe(sock, (char *) buf + total_sent, to_send);
290 if (sent == BT_SOCKET_ERROR) {
291 if (bt_socket_interrupted()) {
292 if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
293 /*
294 * This interruption was a SIGINT and
295 * the graph is being teared down.
296 */
297 status = LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED;
298 lttng_live_msg_iter->was_interrupted = true;
299 goto end;
300 } else {
301 /*
302 * A signal was received, but the graph
303 * is not being teared down. Carry on.
304 */
305 continue;
306 }
307 } else {
308 /*
309 * For any other types of socket error, close
310 * the socket and return an error.
311 */
312 LTTNG_LIVE_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE_ERRNO(
313 self_comp, self_comp_class, "Error sending to Relay", ".");
314
315 viewer_connection_close_socket(viewer_connection);
316 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
317 goto end;
318 }
319 }
320
321 BT_ASSERT(sent <= to_send);
322 total_sent += sent;
323 to_send -= sent;
324
325 } while (to_send > 0);
326
327 BT_ASSERT(total_sent == len);
328 status = LTTNG_LIVE_VIEWER_STATUS_OK;
f79c2d7a
FD
329
330end:
4164020e 331 return status;
7cdc2bab
MD
332}
333
4164020e 334static int parse_url(struct live_viewer_connection *viewer_connection)
7cdc2bab 335{
4164020e
SM
336 char error_buf[256] = {0};
337 bt_self_component *self_comp = viewer_connection->self_comp;
338 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
339 struct bt_common_lttng_live_url_parts lttng_live_url_parts = {0};
340 int ret = -1;
341 const char *path = viewer_connection->url->str;
342
343 if (!path) {
344 goto end;
345 }
346
347 lttng_live_url_parts = bt_common_parse_lttng_live_url(path, error_buf, sizeof(error_buf));
348 if (!lttng_live_url_parts.proto) {
349 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
350 "Invalid LTTng live URL format: %s", error_buf);
351 goto end;
352 }
353 viewer_connection->proto = lttng_live_url_parts.proto;
354 lttng_live_url_parts.proto = NULL;
355
356 viewer_connection->relay_hostname = lttng_live_url_parts.hostname;
357 lttng_live_url_parts.hostname = NULL;
358
359 if (lttng_live_url_parts.port >= 0) {
360 viewer_connection->port = lttng_live_url_parts.port;
361 } else {
362 viewer_connection->port = LTTNG_DEFAULT_NETWORK_VIEWER_PORT;
363 }
364
365 viewer_connection->target_hostname = lttng_live_url_parts.target_hostname;
366 lttng_live_url_parts.target_hostname = NULL;
367
368 if (lttng_live_url_parts.session_name) {
369 viewer_connection->session_name = lttng_live_url_parts.session_name;
370 lttng_live_url_parts.session_name = NULL;
371 }
372
373 ret = 0;
7cdc2bab
MD
374
375end:
4164020e
SM
376 bt_common_destroy_lttng_live_url_parts(&lttng_live_url_parts);
377 return ret;
7cdc2bab
MD
378}
379
4164020e
SM
380static enum lttng_live_viewer_status
381lttng_live_handshake(struct live_viewer_connection *viewer_connection)
7cdc2bab 382{
4164020e
SM
383 struct lttng_viewer_cmd cmd;
384 struct lttng_viewer_connect connect;
385 enum lttng_live_viewer_status status;
386 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
387 bt_self_component *self_comp = viewer_connection->self_comp;
388 const size_t cmd_buf_len = sizeof(cmd) + sizeof(connect);
389 char cmd_buf[cmd_buf_len];
390
391 BT_COMP_OR_COMP_CLASS_LOGD(
392 self_comp, self_comp_class,
393 "Handshaking with the relay daemon: cmd=%s, major-version=%u, minor-version=%u",
394 lttng_viewer_command_string(LTTNG_VIEWER_CONNECT), LTTNG_LIVE_MAJOR, LTTNG_LIVE_MINOR);
395
396 cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
397 cmd.data_size = htobe64((uint64_t) sizeof(connect));
398 cmd.cmd_version = htobe32(0);
399
400 connect.viewer_session_id = -1ULL; /* will be set on recv */
401 connect.major = htobe32(LTTNG_LIVE_MAJOR);
402 connect.minor = htobe32(LTTNG_LIVE_MINOR);
403 connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
404
405 /*
406 * Merge the cmd and connection request to prevent a write-write
407 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
408 * second write to be performed quickly in presence of Nagle's algorithm
409 */
410 memcpy(cmd_buf, &cmd, sizeof(cmd));
411 memcpy(cmd_buf + sizeof(cmd), &connect, sizeof(connect));
412
413 status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
414 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
415 viewer_handle_send_status(self_comp, self_comp_class, status, "viewer connect command");
416 goto end;
417 }
418
419 status = lttng_live_recv(viewer_connection, &connect, sizeof(connect));
420 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
421 viewer_handle_recv_status(self_comp, self_comp_class, status, "viewer connect reply");
422 goto end;
423 }
424
425 BT_COMP_OR_COMP_CLASS_LOGI(self_comp, self_comp_class, "Received viewer session ID : %" PRIu64,
426 (uint64_t) be64toh(connect.viewer_session_id));
427 BT_COMP_OR_COMP_CLASS_LOGI(self_comp, self_comp_class, "Relayd version : %u.%u",
428 be32toh(connect.major), be32toh(connect.minor));
429
430 if (LTTNG_LIVE_MAJOR != be32toh(connect.major)) {
431 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
432 "Incompatible lttng-relayd protocol");
433 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
434 goto end;
435 }
436 /* Use the smallest protocol version implemented. */
437 if (LTTNG_LIVE_MINOR > be32toh(connect.minor)) {
438 viewer_connection->minor = be32toh(connect.minor);
439 } else {
440 viewer_connection->minor = LTTNG_LIVE_MINOR;
441 }
442 viewer_connection->major = LTTNG_LIVE_MAJOR;
443
444 status = LTTNG_LIVE_VIEWER_STATUS_OK;
445
446 goto end;
f79c2d7a
FD
447
448end:
4164020e 449 return status;
7cdc2bab
MD
450}
451
4164020e
SM
452static enum lttng_live_viewer_status
453lttng_live_connect_viewer(struct live_viewer_connection *viewer_connection)
7cdc2bab 454{
4164020e
SM
455 struct hostent *host;
456 struct sockaddr_in server_addr;
457 enum lttng_live_viewer_status status;
458 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
459 bt_self_component *self_comp = viewer_connection->self_comp;
460
461 if (parse_url(viewer_connection)) {
462 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class, "Failed to parse URL");
463 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
464 goto error;
465 }
466
467 BT_COMP_OR_COMP_CLASS_LOGD(
468 self_comp, self_comp_class,
469 "Connecting to hostname : %s, port : %d, "
470 "target hostname : %s, session name : %s, proto : %s",
471 viewer_connection->relay_hostname->str, viewer_connection->port,
472 !viewer_connection->target_hostname ? "<none>" : viewer_connection->target_hostname->str,
473 !viewer_connection->session_name ? "<none>" : viewer_connection->session_name->str,
474 viewer_connection->proto->str);
475
476 host = gethostbyname(viewer_connection->relay_hostname->str);
477 if (!host) {
478 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
479 "Cannot lookup hostname: hostname=\"%s\"",
480 viewer_connection->relay_hostname->str);
481 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
482 goto error;
483 }
484
485 if ((viewer_connection->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == BT_INVALID_SOCKET) {
486 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
487 "Socket creation failed: %s", bt_socket_errormsg());
488 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
489 goto error;
490 }
491
492 server_addr.sin_family = AF_INET;
493 server_addr.sin_port = htons(viewer_connection->port);
494 server_addr.sin_addr = *((struct in_addr *) host->h_addr);
495 memset(&(server_addr.sin_zero), 0, 8);
496
497 if (connect(viewer_connection->control_sock, (struct sockaddr *) &server_addr,
498 sizeof(struct sockaddr)) == BT_SOCKET_ERROR) {
499 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class, "Connection failed: %s",
500 bt_socket_errormsg());
501 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
502 goto error;
503 }
504
505 status = lttng_live_handshake(viewer_connection);
506
507 /*
508 * Only print error and append cause in case of error. not in case of
509 * interruption.
510 */
511 if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
512 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
513 "Viewer handshake failed");
514 goto error;
515 } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
516 goto end;
517 }
518
519 goto end;
7cdc2bab
MD
520
521error:
4164020e
SM
522 if (viewer_connection->control_sock != BT_INVALID_SOCKET) {
523 if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) {
524 BT_COMP_OR_COMP_CLASS_LOGW(self_comp, self_comp_class, "Error closing socket: %s.",
525 bt_socket_errormsg());
526 }
527 }
528 viewer_connection->control_sock = BT_INVALID_SOCKET;
f79c2d7a 529end:
4164020e 530 return status;
7cdc2bab
MD
531}
532
4164020e 533static void lttng_live_disconnect_viewer(struct live_viewer_connection *viewer_connection)
7cdc2bab 534{
4164020e
SM
535 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
536 bt_self_component *self_comp = viewer_connection->self_comp;
537
538 if (viewer_connection->control_sock == BT_INVALID_SOCKET) {
539 return;
540 }
541 if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) {
542 BT_COMP_OR_COMP_CLASS_LOGW(self_comp, self_comp_class, "Error closing socket: %s",
543 bt_socket_errormsg());
544 viewer_connection->control_sock = BT_INVALID_SOCKET;
545 }
7cdc2bab
MD
546}
547
4164020e
SM
548static int list_update_session(bt_value *results, const struct lttng_viewer_session *session,
549 bool *_found, struct live_viewer_connection *viewer_connection)
7cdc2bab 550{
4164020e
SM
551 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
552 bt_self_component *self_comp = viewer_connection->self_comp;
553 int ret = 0;
554 uint64_t i, len;
555 bt_value *map = NULL;
556 bt_value *hostname = NULL;
557 bt_value *session_name = NULL;
558 bt_value *btval = NULL;
559 bool found = false;
560
561 len = bt_value_array_get_length(results);
562 for (i = 0; i < len; i++) {
563 const char *hostname_str = NULL;
564 const char *session_name_str = NULL;
565
566 map = bt_value_array_borrow_element_by_index(results, i);
567 hostname = bt_value_map_borrow_entry_value(map, "target-hostname");
568 if (!hostname) {
569 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
570 "Error borrowing \"target-hostname\" entry.");
571 ret = -1;
572 goto end;
573 }
574 session_name = bt_value_map_borrow_entry_value(map, "session-name");
575 if (!session_name) {
576 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
577 "Error borrowing \"session-name\" entry.");
578 ret = -1;
579 goto end;
580 }
581 hostname_str = bt_value_string_get(hostname);
582 session_name_str = bt_value_string_get(session_name);
583
584 if (strcmp(session->hostname, hostname_str) == 0 &&
585 strcmp(session->session_name, session_name_str) == 0) {
586 int64_t val;
587 uint32_t streams = be32toh(session->streams);
588 uint32_t clients = be32toh(session->clients);
589
590 found = true;
591
592 btval = bt_value_map_borrow_entry_value(map, "stream-count");
593 if (!btval) {
594 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
595 "Error borrowing \"stream-count\" entry.");
596 ret = -1;
597 goto end;
598 }
599 val = bt_value_integer_unsigned_get(btval);
600 /* sum */
601 val += streams;
602 bt_value_integer_unsigned_set(btval, val);
603
604 btval = bt_value_map_borrow_entry_value(map, "client-count");
605 if (!btval) {
606 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
607 "Error borrowing \"client-count\" entry.");
608 ret = -1;
609 goto end;
610 }
611 val = bt_value_integer_unsigned_get(btval);
612 /* max */
613 val = bt_max_t(int64_t, clients, val);
614 bt_value_integer_unsigned_set(btval, val);
615 }
616
617 if (found) {
618 break;
619 }
620 }
7cdc2bab 621end:
4164020e
SM
622 *_found = found;
623 return ret;
7cdc2bab
MD
624}
625
4164020e
SM
626static int list_append_session(bt_value *results, GString *base_url,
627 const struct lttng_viewer_session *session,
628 struct live_viewer_connection *viewer_connection)
7cdc2bab 629{
4164020e
SM
630 int ret = 0;
631 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
632 bt_value_map_insert_entry_status insert_status;
633 bt_value_array_append_element_status append_status;
634 bt_value *map = NULL;
635 GString *url = NULL;
636 bool found = false;
637
638 /*
639 * If the session already exists, add the stream count to it,
640 * and do max of client counts.
641 */
642 ret = list_update_session(results, session, &found, viewer_connection);
643 if (ret || found) {
644 goto end;
645 }
646
647 map = bt_value_map_create();
648 if (!map) {
649 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error creating map value.");
650 ret = -1;
651 goto end;
652 }
653
654 if (base_url->len < 1) {
655 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error: base_url length smaller than 1.");
656 ret = -1;
657 goto end;
658 }
659 /*
660 * key = "url",
661 * value = <string>,
662 */
663 url = g_string_new(base_url->str);
664 g_string_append(url, "/host/");
665 g_string_append(url, session->hostname);
666 g_string_append_c(url, '/');
667 g_string_append(url, session->session_name);
668
669 insert_status = bt_value_map_insert_string_entry(map, "url", url->str);
670 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
671 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error inserting \"url\" entry.");
672 ret = -1;
673 goto end;
674 }
675
676 /*
677 * key = "target-hostname",
678 * value = <string>,
679 */
680 insert_status = bt_value_map_insert_string_entry(map, "target-hostname", session->hostname);
681 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
682 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
683 "Error inserting \"target-hostname\" entry.");
684 ret = -1;
685 goto end;
686 }
687
688 /*
689 * key = "session-name",
690 * value = <string>,
691 */
692 insert_status = bt_value_map_insert_string_entry(map, "session-name", session->session_name);
693 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
694 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error inserting \"session-name\" entry.");
695 ret = -1;
696 goto end;
697 }
698
699 /*
700 * key = "timer-us",
701 * value = <integer>,
702 */
703 {
704 uint32_t live_timer = be32toh(session->live_timer);
705
706 insert_status = bt_value_map_insert_unsigned_integer_entry(map, "timer-us", live_timer);
707 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
708 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error inserting \"timer-us\" entry.");
709 ret = -1;
710 goto end;
711 }
712 }
713
714 /*
715 * key = "stream-count",
716 * value = <integer>,
717 */
718 {
719 uint32_t streams = be32toh(session->streams);
720
721 insert_status = bt_value_map_insert_unsigned_integer_entry(map, "stream-count", streams);
722 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
723 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
724 "Error inserting \"stream-count\" entry.");
725 ret = -1;
726 goto end;
727 }
728 }
729
730 /*
731 * key = "client-count",
732 * value = <integer>,
733 */
734 {
735 uint32_t clients = be32toh(session->clients);
736
737 insert_status = bt_value_map_insert_unsigned_integer_entry(map, "client-count", clients);
738 if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) {
739 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
740 "Error inserting \"client-count\" entry.");
741 ret = -1;
742 goto end;
743 }
744 }
745
746 append_status = bt_value_array_append_element(results, map);
747 if (append_status != BT_VALUE_ARRAY_APPEND_ELEMENT_STATUS_OK) {
748 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error appending map to results.");
749 ret = -1;
750 }
14f28187 751
7cdc2bab 752end:
4164020e
SM
753 if (url) {
754 g_string_free(url, true);
755 }
756 BT_VALUE_PUT_REF_AND_RESET(map);
757 return ret;
7cdc2bab
MD
758}
759
760/*
761 * Data structure returned:
762 *
763 * {
764 * <array> = {
765 * [n] = {
766 * <map> = {
767 * {
768 * key = "url",
769 * value = <string>,
770 * },
771 * {
772 * key = "target-hostname",
773 * value = <string>,
774 * },
775 * {
776 * key = "session-name",
777 * value = <string>,
778 * },
779 * {
780 * key = "timer-us",
781 * value = <integer>,
782 * },
783 * {
784 * key = "stream-count",
785 * value = <integer>,
786 * },
787 * {
788 * key = "client-count",
789 * value = <integer>,
790 * },
791 * },
792 * }
793 * }
794 */
795
796BT_HIDDEN
4164020e
SM
797bt_component_class_query_method_status
798live_viewer_connection_list_sessions(struct live_viewer_connection *viewer_connection,
799 const bt_value **user_result)
7cdc2bab 800{
4164020e
SM
801 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
802 bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
803 bt_value *result = NULL;
804 enum lttng_live_viewer_status viewer_status;
805 struct lttng_viewer_cmd cmd;
806 struct lttng_viewer_list_sessions list;
807 uint32_t i, sessions_count;
808
809 result = bt_value_array_create();
810 if (!result) {
811 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error creating array");
812 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR;
813 goto error;
814 }
815
816 BT_LOGD("Requesting list of sessions: cmd=%s",
817 lttng_viewer_command_string(LTTNG_VIEWER_LIST_SESSIONS));
818
819 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
820 cmd.data_size = htobe64((uint64_t) 0);
821 cmd.cmd_version = htobe32(0);
822
823 viewer_status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
824 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
825 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error sending list sessions command");
826 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
827 goto error;
828 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
829 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
830 goto error;
831 }
832
833 viewer_status = lttng_live_recv(viewer_connection, &list, sizeof(list));
834 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
835 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error receiving session list");
836 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
837 goto error;
838 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
839 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
840 goto error;
841 }
842
843 sessions_count = be32toh(list.sessions_count);
844 for (i = 0; i < sessions_count; i++) {
845 struct lttng_viewer_session lsession;
846
847 viewer_status = lttng_live_recv(viewer_connection, &lsession, sizeof(lsession));
848 if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
849 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error receiving session:");
850 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
851 goto error;
852 } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
853 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
854 goto error;
855 }
856
857 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
858 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
859 if (list_append_session(result, viewer_connection->url, &lsession, viewer_connection)) {
860 BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, "Error appending session");
861 status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
862 goto error;
863 }
864 }
865
866 *user_result = result;
867 goto end;
7cdc2bab 868error:
4164020e 869 BT_VALUE_PUT_REF_AND_RESET(result);
7cdc2bab 870end:
4164020e 871 return status;
7cdc2bab
MD
872}
873
4164020e
SM
874static enum lttng_live_viewer_status
875lttng_live_query_session_ids(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 876{
4164020e
SM
877 struct lttng_viewer_cmd cmd;
878 struct lttng_viewer_list_sessions list;
879 struct lttng_viewer_session lsession;
880 uint32_t i, sessions_count;
881 uint64_t session_id;
882 enum lttng_live_viewer_status status;
883 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
884 bt_self_component *self_comp = viewer_connection->self_comp;
885 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
886
887 BT_COMP_LOGD("Asking the relay daemon for the list of sessions: cmd=%s",
888 lttng_viewer_command_string(LTTNG_VIEWER_LIST_SESSIONS));
889
890 cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
891 cmd.data_size = htobe64((uint64_t) 0);
892 cmd.cmd_version = htobe32(0);
893
894 status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
895 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
896 viewer_handle_send_status(self_comp, self_comp_class, status, "list sessions command");
897 goto end;
898 }
899
900 status = lttng_live_recv(viewer_connection, &list, sizeof(list));
901 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
902 viewer_handle_recv_status(self_comp, self_comp_class, status, "session list reply");
903 goto end;
904 }
905
906 sessions_count = be32toh(list.sessions_count);
907 for (i = 0; i < sessions_count; i++) {
908 status = lttng_live_recv(viewer_connection, &lsession, sizeof(lsession));
909 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
910 viewer_handle_recv_status(self_comp, self_comp_class, status, "session reply");
911 goto end;
912 }
913 lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
914 lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
915 session_id = be64toh(lsession.id);
916
917 BT_COMP_LOGI("Adding session to internal list: "
918 "session-id=%" PRIu64 ", hostname=\"%s\", session-name=\"%s\"",
919 session_id, lsession.hostname, lsession.session_name);
920
921 if ((strncmp(lsession.session_name, viewer_connection->session_name->str,
922 LTTNG_VIEWER_NAME_MAX) == 0) &&
923 (strncmp(lsession.hostname, viewer_connection->target_hostname->str,
924 LTTNG_VIEWER_HOST_NAME_MAX) == 0)) {
925 if (lttng_live_add_session(lttng_live_msg_iter, session_id, lsession.hostname,
926 lsession.session_name)) {
927 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to add live session");
928 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
929 goto end;
930 }
931 }
932 }
933
934 status = LTTNG_LIVE_VIEWER_STATUS_OK;
7cdc2bab 935
f79c2d7a 936end:
4164020e 937 return status;
7cdc2bab
MD
938}
939
940BT_HIDDEN
4164020e
SM
941enum lttng_live_viewer_status
942lttng_live_create_viewer_session(struct lttng_live_msg_iter *lttng_live_msg_iter)
7cdc2bab 943{
4164020e
SM
944 struct lttng_viewer_cmd cmd;
945 struct lttng_viewer_create_session_response resp;
946 enum lttng_live_viewer_status status;
947 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
948 bt_self_component *self_comp = viewer_connection->self_comp;
949 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
950
951 BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class, "Creating a viewer session: cmd=%s",
952 lttng_viewer_command_string(LTTNG_VIEWER_CREATE_SESSION));
953
954 cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
955 cmd.data_size = htobe64((uint64_t) 0);
956 cmd.cmd_version = htobe32(0);
957
958 status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
959 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
960 viewer_handle_send_status(self_comp, self_comp_class, status, "create session command");
961 goto end;
962 }
963
964 status = lttng_live_recv(viewer_connection, &resp, sizeof(resp));
965 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
966 viewer_handle_recv_status(self_comp, self_comp_class, status, "create session reply");
967 goto end;
968 }
969
970 if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
971 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error creating viewer session");
972 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
973 goto end;
974 }
975
976 status = lttng_live_query_session_ids(lttng_live_msg_iter);
977 if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
978 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to query live viewer session ids");
979 goto end;
980 } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
981 goto end;
982 }
7cdc2bab 983
f79c2d7a 984end:
4164020e 985 return status;
7cdc2bab
MD
986}
987
4164020e
SM
988static enum lttng_live_viewer_status receive_streams(struct lttng_live_session *session,
989 uint32_t stream_count,
990 bt_self_message_iterator *self_msg_iter)
7cdc2bab 991{
4164020e
SM
992 uint32_t i;
993 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
994 enum lttng_live_viewer_status status;
995 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
996 bt_self_component *self_comp = viewer_connection->self_comp;
997
998 BT_COMP_LOGI("Getting %" PRIu32 " new streams", stream_count);
999 for (i = 0; i < stream_count; i++) {
1000 struct lttng_viewer_stream stream;
1001 struct lttng_live_stream_iterator *live_stream;
1002 uint64_t stream_id;
1003 uint64_t ctf_trace_id;
1004
1005 status = lttng_live_recv(viewer_connection, &stream, sizeof(stream));
1006 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1007 viewer_handle_recv_status(self_comp, NULL, status, "stream reply");
1008 goto end;
1009 }
1010 stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
1011 stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
1012 stream_id = be64toh(stream.id);
1013 ctf_trace_id = be64toh(stream.ctf_trace_id);
1014
1015 if (stream.metadata_flag) {
1016 BT_COMP_LOGI(" metadata stream %" PRIu64 " : %s/%s", stream_id, stream.path_name,
1017 stream.channel_name);
1018 if (lttng_live_metadata_create_stream(session, ctf_trace_id, stream_id,
1019 stream.path_name)) {
1020 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error creating metadata stream");
1021 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1022 goto end;
1023 }
1024 session->lazy_stream_msg_init = true;
1025 } else {
1026 BT_COMP_LOGI(" stream %" PRIu64 " : %s/%s", stream_id, stream.path_name,
1027 stream.channel_name);
1028 live_stream =
1029 lttng_live_stream_iterator_create(session, ctf_trace_id, stream_id, self_msg_iter);
1030 if (!live_stream) {
1031 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error creating stream");
1032 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1033 goto end;
1034 }
1035 }
1036 }
1037 status = LTTNG_LIVE_VIEWER_STATUS_OK;
7cdc2bab 1038
f79c2d7a 1039end:
4164020e 1040 return status;
7cdc2bab
MD
1041}
1042
1043BT_HIDDEN
4164020e
SM
1044enum lttng_live_viewer_status lttng_live_session_attach(struct lttng_live_session *session,
1045 bt_self_message_iterator *self_msg_iter)
7cdc2bab 1046{
4164020e
SM
1047 struct lttng_viewer_cmd cmd;
1048 enum lttng_live_viewer_status status;
1049 struct lttng_viewer_attach_session_request rq;
1050 struct lttng_viewer_attach_session_response rp;
1051 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
1052 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1053 bt_self_component *self_comp = viewer_connection->self_comp;
1054 uint64_t session_id = session->id;
1055 uint32_t streams_count;
1056 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1057 char cmd_buf[cmd_buf_len];
1058
1059 BT_COMP_LOGD("Attaching to session: cmd=%s, session-id=%" PRIu64 ", seek=%s",
1060 lttng_viewer_command_string(LTTNG_VIEWER_ATTACH_SESSION), session_id,
1061 lttng_viewer_seek_string(LTTNG_VIEWER_SEEK_LAST));
1062
1063 cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
1064 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1065 cmd.cmd_version = htobe32(0);
1066
1067 memset(&rq, 0, sizeof(rq));
1068 rq.session_id = htobe64(session_id);
1069 // TODO: add cmd line parameter to select seek beginning
1070 // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
1071 rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
1072
1073 /*
1074 * Merge the cmd and connection request to prevent a write-write
1075 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1076 * second write to be performed quickly in presence of Nagle's algorithm.
1077 */
1078 memcpy(cmd_buf, &cmd, sizeof(cmd));
1079 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1080 status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1081 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1082 viewer_handle_send_status(self_comp, NULL, status, "attach session command");
1083 goto end;
1084 }
1085
1086 status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1087 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1088 viewer_handle_recv_status(self_comp, NULL, status, "attach session reply");
1089 goto end;
1090 }
1091
1092 streams_count = be32toh(rp.streams_count);
1093 switch (be32toh(rp.status)) {
1094 case LTTNG_VIEWER_ATTACH_OK:
1095 break;
1096 case LTTNG_VIEWER_ATTACH_UNK:
1097 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Session id %" PRIu64 " is unknown", session_id);
1098 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1099 goto end;
1100 case LTTNG_VIEWER_ATTACH_ALREADY:
1101 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "There is already a viewer attached to this session");
1102 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1103 goto end;
1104 case LTTNG_VIEWER_ATTACH_NOT_LIVE:
1105 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Not a live session");
1106 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1107 goto end;
1108 case LTTNG_VIEWER_ATTACH_SEEK_ERR:
1109 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Wrong seek parameter");
1110 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1111 goto end;
1112 default:
1113 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Unknown attach return code %u", be32toh(rp.status));
1114 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1115 goto end;
1116 }
1117
1118 /* We receive the initial list of streams. */
1119 status = receive_streams(session, streams_count, self_msg_iter);
1120 switch (status) {
1121 case LTTNG_LIVE_VIEWER_STATUS_OK:
1122 break;
1123 case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
1124 goto end;
1125 case LTTNG_LIVE_VIEWER_STATUS_ERROR:
1126 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error receiving streams");
1127 goto end;
1128 default:
1129 bt_common_abort();
1130 }
1131
1132 session->attached = true;
1133 session->new_streams_needed = false;
7cdc2bab 1134
eee8e741 1135end:
4164020e 1136 return status;
7cdc2bab
MD
1137}
1138
1139BT_HIDDEN
4164020e 1140enum lttng_live_viewer_status lttng_live_session_detach(struct lttng_live_session *session)
7cdc2bab 1141{
4164020e
SM
1142 struct lttng_viewer_cmd cmd;
1143 enum lttng_live_viewer_status status;
1144 struct lttng_viewer_detach_session_request rq;
1145 struct lttng_viewer_detach_session_response rp;
1146 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
1147 bt_self_component *self_comp = session->self_comp;
1148 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1149 uint64_t session_id = session->id;
1150 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1151 char cmd_buf[cmd_buf_len];
1152
1153 /*
1154 * The session might already be detached and the viewer socket might
1155 * already been closed. This happens when calling this function when
1156 * tearing down the graph after an error.
1157 */
1158 if (!session->attached || viewer_connection->control_sock == BT_INVALID_SOCKET) {
1159 return LTTNG_LIVE_VIEWER_STATUS_OK;
1160 }
1161
1162 BT_COMP_LOGD("Detaching from session: cmd=%s, session-id=%" PRIu64,
1163 lttng_viewer_command_string(LTTNG_VIEWER_DETACH_SESSION), session_id);
1164
1165 cmd.cmd = htobe32(LTTNG_VIEWER_DETACH_SESSION);
1166 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1167 cmd.cmd_version = htobe32(0);
1168
1169 memset(&rq, 0, sizeof(rq));
1170 rq.session_id = htobe64(session_id);
1171
1172 /*
1173 * Merge the cmd and connection request to prevent a write-write
1174 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1175 * second write to be performed quickly in presence of Nagle's algorithm.
1176 */
1177 memcpy(cmd_buf, &cmd, sizeof(cmd));
1178 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1179 status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1180 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1181 viewer_handle_send_status(self_comp, NULL, status, "detach session command");
1182 goto end;
1183 }
1184
1185 status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1186 if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1187 viewer_handle_recv_status(self_comp, NULL, status, "detach session reply");
1188 goto end;
1189 }
1190
1191 switch (be32toh(rp.status)) {
1192 case LTTNG_VIEWER_DETACH_SESSION_OK:
1193 break;
1194 case LTTNG_VIEWER_DETACH_SESSION_UNK:
1195 BT_COMP_LOGW("Session id %" PRIu64 " is unknown", session_id);
1196 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1197 goto end;
1198 case LTTNG_VIEWER_DETACH_SESSION_ERR:
1199 BT_COMP_LOGW("Error detaching session id %" PRIu64 "", session_id);
1200 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1201 goto end;
1202 default:
1203 BT_COMP_LOGE("Unknown detach return code %u", be32toh(rp.status));
1204 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1205 goto end;
1206 }
1207
1208 session->attached = false;
1209
1210 status = LTTNG_LIVE_VIEWER_STATUS_OK;
7cdc2bab 1211
f79c2d7a 1212end:
4164020e 1213 return status;
7cdc2bab
MD
1214}
1215
1216BT_HIDDEN
4164020e
SM
1217enum lttng_live_get_one_metadata_status
1218lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, FILE *fp, size_t *reply_len)
7cdc2bab 1219{
4164020e
SM
1220 uint64_t len = 0;
1221 enum lttng_live_get_one_metadata_status status;
1222 enum lttng_live_viewer_status viewer_status;
1223 struct lttng_viewer_cmd cmd;
1224 struct lttng_viewer_get_metadata rq;
1225 struct lttng_viewer_metadata_packet rp;
1226 gchar *data = NULL;
1227 ssize_t writelen;
1228 struct lttng_live_session *session = trace->session;
1229 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
1230 struct lttng_live_metadata *metadata = trace->metadata;
1231 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1232 bt_self_component *self_comp = viewer_connection->self_comp;
1233 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1234 char cmd_buf[cmd_buf_len];
1235
1236 BT_COMP_LOGD("Requesting new metadata for trace:"
1237 "cmd=%s, trace-id=%" PRIu64 ", metadata-stream-id=%" PRIu64,
1238 lttng_viewer_command_string(LTTNG_VIEWER_GET_METADATA), trace->id,
1239 metadata->stream_id);
1240
1241 rq.stream_id = htobe64(metadata->stream_id);
1242 cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
1243 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1244 cmd.cmd_version = htobe32(0);
1245
1246 /*
1247 * Merge the cmd and connection request to prevent a write-write
1248 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1249 * second write to be performed quickly in presence of Nagle's algorithm.
1250 */
1251 memcpy(cmd_buf, &cmd, sizeof(cmd));
1252 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1253 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1254 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1255 viewer_handle_send_status(self_comp, NULL, viewer_status, "get metadata command");
1256 status = (enum lttng_live_get_one_metadata_status) viewer_status;
1257 goto end;
1258 }
1259
1260 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1261 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1262 viewer_handle_recv_status(self_comp, NULL, viewer_status, "get metadata reply");
1263 status = (enum lttng_live_get_one_metadata_status) viewer_status;
1264 goto end;
1265 }
1266
1267 switch (be32toh(rp.status)) {
1268 case LTTNG_VIEWER_METADATA_OK:
1269 BT_COMP_LOGD("Received get_metadata response: ok");
1270 break;
1271 case LTTNG_VIEWER_NO_NEW_METADATA:
1272 BT_COMP_LOGD("Received get_metadata response: no new");
1273 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_END;
1274 goto end;
1275 case LTTNG_VIEWER_METADATA_ERR:
1276 /*
1277 * The Relayd cannot find this stream id. Maybe its
1278 * gone already. This can happen in short lived UST app
1279 * in a per-pid session.
1280 */
1281 BT_COMP_LOGD("Received get_metadata response: error");
1282 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_CLOSED;
1283 goto end;
1284 default:
1285 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Received get_metadata response: unknown");
1286 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
1287 goto end;
1288 }
1289
1290 len = be64toh(rp.len);
1291 BT_COMP_LOGD("Writing %" PRIu64 " bytes to metadata", len);
1292 if (len <= 0) {
1293 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Erroneous response length");
1294 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
1295 goto end;
1296 }
1297
1298 data = g_new0(gchar, len);
1299 if (!data) {
1300 BT_COMP_LOGE_APPEND_CAUSE_ERRNO(self_comp, "Failed to allocate data buffer", ".");
1301 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
1302 goto end;
1303 }
1304
1305 viewer_status = lttng_live_recv(viewer_connection, data, len);
1306 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1307 viewer_handle_recv_status(self_comp, NULL, viewer_status, "get metadata packet");
1308 status = (enum lttng_live_get_one_metadata_status) viewer_status;
1309 goto end;
1310 }
1311
1312 /*
1313 * Write the metadata to the file handle.
1314 */
1315 writelen = fwrite(data, sizeof(uint8_t), len, fp);
1316 if (writelen != len) {
1317 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Writing in the metadata file stream");
1318 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
1319 goto end;
1320 }
1321
1322 *reply_len = len;
1323 status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_OK;
c28512ab 1324
c28512ab 1325end:
4164020e
SM
1326 g_free(data);
1327 return status;
7cdc2bab
MD
1328}
1329
1330/*
1331 * Assign the fields from a lttng_viewer_index to a packet_index.
1332 */
4164020e
SM
1333static void lttng_index_to_packet_index(struct lttng_viewer_index *lindex,
1334 struct packet_index *pindex)
7cdc2bab 1335{
4164020e
SM
1336 BT_ASSERT(lindex);
1337 BT_ASSERT(pindex);
1338
1339 pindex->offset = be64toh(lindex->offset);
1340 pindex->packet_size = be64toh(lindex->packet_size);
1341 pindex->content_size = be64toh(lindex->content_size);
1342 pindex->ts_cycles.timestamp_begin = be64toh(lindex->timestamp_begin);
1343 pindex->ts_cycles.timestamp_end = be64toh(lindex->timestamp_end);
1344 pindex->events_discarded = be64toh(lindex->events_discarded);
7cdc2bab
MD
1345}
1346
4164020e 1347static void lttng_live_need_new_streams(struct lttng_live_msg_iter *lttng_live_msg_iter)
36e94ad6 1348{
4164020e
SM
1349 uint64_t session_idx;
1350 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1351
1352 for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; session_idx++) {
1353 struct lttng_live_session *session =
1354 (lttng_live_session *) g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx);
1355 BT_COMP_LOGD("Marking session as needing new streams: "
1356 "session-id=%" PRIu64,
1357 session->id);
1358 session->new_streams_needed = true;
1359 }
36e94ad6
FD
1360}
1361
7cdc2bab 1362BT_HIDDEN
4164020e
SM
1363enum lttng_live_iterator_status
1364lttng_live_get_next_index(struct lttng_live_msg_iter *lttng_live_msg_iter,
1365 struct lttng_live_stream_iterator *stream, struct packet_index *index)
7cdc2bab 1366{
4164020e
SM
1367 struct lttng_viewer_cmd cmd;
1368 struct lttng_viewer_get_next_index rq;
1369 enum lttng_live_viewer_status viewer_status;
1370 struct lttng_viewer_index rp;
1371 enum lttng_live_iterator_status status;
1372 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1373 bt_self_component *self_comp = viewer_connection->self_comp;
1374 struct lttng_live_trace *trace = stream->trace;
1375 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1376 char cmd_buf[cmd_buf_len];
1377 uint32_t flags, rp_status;
1378
1379 BT_COMP_LOGD("Requesting next index for stream: cmd=%s, "
1380 "viewer-stream-id=%" PRIu64,
1381 lttng_viewer_command_string(LTTNG_VIEWER_GET_NEXT_INDEX),
1382 stream->viewer_stream_id);
1383 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
1384 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1385 cmd.cmd_version = htobe32(0);
1386
1387 memset(&rq, 0, sizeof(rq));
1388 rq.stream_id = htobe64(stream->viewer_stream_id);
1389
1390 /*
1391 * Merge the cmd and connection request to prevent a write-write
1392 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1393 * second write to be performed quickly in presence of Nagle's algorithm.
1394 */
1395 memcpy(cmd_buf, &cmd, sizeof(cmd));
1396 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1397
1398 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1399 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1400 viewer_handle_send_status(self_comp, NULL, viewer_status, "get next index command");
1401 goto error;
1402 }
1403
1404 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1405 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1406 viewer_handle_recv_status(self_comp, NULL, viewer_status, "get next index reply");
1407 goto error;
1408 }
1409
1410 flags = be32toh(rp.flags);
1411 rp_status = be32toh(rp.status);
1412
1413 BT_COMP_LOGD("Received response from relay daemon: cmd=%s, response=%s",
1414 lttng_viewer_command_string(LTTNG_VIEWER_GET_NEXT_INDEX),
1415 lttng_viewer_next_index_return_code_string(rp_status));
1416 switch (rp_status) {
1417 case LTTNG_VIEWER_INDEX_INACTIVE:
1418 {
1419 uint64_t ctf_stream_class_id;
1420
1421 memset(index, 0, sizeof(struct packet_index));
1422 index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end);
1423 stream->current_inactivity_ts = index->ts_cycles.timestamp_end;
1424 ctf_stream_class_id = be64toh(rp.stream_id);
1425 if (stream->ctf_stream_class_id.is_set) {
1426 BT_ASSERT(stream->ctf_stream_class_id.value == ctf_stream_class_id);
1427 } else {
1428 stream->ctf_stream_class_id.value = ctf_stream_class_id;
1429 stream->ctf_stream_class_id.is_set = true;
1430 }
1431 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_QUIESCENT);
1432 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1433 break;
1434 }
1435 case LTTNG_VIEWER_INDEX_OK:
1436 {
1437 uint64_t ctf_stream_class_id;
1438
1439 lttng_index_to_packet_index(&rp, index);
1440 ctf_stream_class_id = be64toh(rp.stream_id);
1441 if (stream->ctf_stream_class_id.is_set) {
1442 BT_ASSERT(stream->ctf_stream_class_id.value == ctf_stream_class_id);
1443 } else {
1444 stream->ctf_stream_class_id.value = ctf_stream_class_id;
1445 stream->ctf_stream_class_id.is_set = true;
1446 }
1447
1448 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_DATA);
1449
1450 if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
1451 BT_COMP_LOGD("Marking trace as needing new metadata: "
1452 "response=%s, response-flag=NEW_METADATA, trace-id=%" PRIu64,
1453 lttng_viewer_next_index_return_code_string(rp_status), trace->id);
1454 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
1455 }
1456 if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
1457 BT_COMP_LOGD("Marking all sessions as possibly needing new streams: "
1458 "response=%s, response-flag=NEW_STREAM",
1459 lttng_viewer_next_index_return_code_string(rp_status));
1460 lttng_live_need_new_streams(lttng_live_msg_iter);
1461 }
1462 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1463 break;
1464 }
1465 case LTTNG_VIEWER_INDEX_RETRY:
1466 memset(index, 0, sizeof(struct packet_index));
1467 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
1468 status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
1469 goto end;
1470 case LTTNG_VIEWER_INDEX_HUP:
1471 memset(index, 0, sizeof(struct packet_index));
1472 index->offset = EOF;
1473 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_EOF);
1474 stream->has_stream_hung_up = true;
1475 status = LTTNG_LIVE_ITERATOR_STATUS_END;
1476 break;
1477 case LTTNG_VIEWER_INDEX_ERR:
1478 memset(index, 0, sizeof(struct packet_index));
1479 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
1480 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1481 goto end;
1482 default:
1483 BT_COMP_LOGD("Received get_next_index response: unknown value");
1484 memset(index, 0, sizeof(struct packet_index));
1485 lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA);
1486 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1487 goto end;
1488 }
1489 goto end;
7cdc2bab
MD
1490
1491error:
4164020e 1492 status = viewer_status_to_live_iterator_status(viewer_status);
f79c2d7a 1493end:
4164020e 1494 return status;
7cdc2bab
MD
1495}
1496
1497BT_HIDDEN
4164020e
SM
1498enum ctf_msg_iter_medium_status
1499lttng_live_get_stream_bytes(struct lttng_live_msg_iter *lttng_live_msg_iter,
1500 struct lttng_live_stream_iterator *stream, uint8_t *buf,
1501 uint64_t offset, uint64_t req_len, uint64_t *recv_len)
7cdc2bab 1502{
4164020e
SM
1503 enum ctf_msg_iter_medium_status status;
1504 enum lttng_live_viewer_status viewer_status;
1505 struct lttng_viewer_trace_packet rp;
1506 struct lttng_viewer_cmd cmd;
1507 struct lttng_viewer_get_packet rq;
1508 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1509 bt_self_component *self_comp = viewer_connection->self_comp;
1510 struct lttng_live_trace *trace = stream->trace;
1511 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1512 char cmd_buf[cmd_buf_len];
1513 uint32_t flags, rp_status;
1514
1515 BT_COMP_LOGD("Requesting data from stream: cmd=%s, "
1516 "offset=%" PRIu64 ", request-len=%" PRIu64,
1517 lttng_viewer_command_string(LTTNG_VIEWER_GET_PACKET), offset, req_len);
1518
1519 cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
1520 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1521 cmd.cmd_version = htobe32(0);
1522
1523 memset(&rq, 0, sizeof(rq));
1524 rq.stream_id = htobe64(stream->viewer_stream_id);
1525 rq.offset = htobe64(offset);
1526 rq.len = htobe32(req_len);
1527
1528 /*
1529 * Merge the cmd and connection request to prevent a write-write
1530 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1531 * second write to be performed quickly in presence of Nagle's algorithm.
1532 */
1533 memcpy(cmd_buf, &cmd, sizeof(cmd));
1534 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1535
1536 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1537 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1538 viewer_handle_send_status(self_comp, NULL, viewer_status, "get data packet command");
1539 goto error_convert_status;
1540 }
1541
1542 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1543 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1544 viewer_handle_recv_status(self_comp, NULL, viewer_status, "get data packet reply");
1545 goto error_convert_status;
1546 }
1547
1548 flags = be32toh(rp.flags);
1549 rp_status = be32toh(rp.status);
1550
1551 BT_COMP_LOGD("Received response from relay daemon: cmd=%s, response=%s",
1552 lttng_viewer_command_string(LTTNG_VIEWER_GET_PACKET),
1553 lttng_viewer_get_packet_return_code_string(rp_status));
1554 switch (rp_status) {
1555 case LTTNG_VIEWER_GET_PACKET_OK:
1556 req_len = be32toh(rp.len);
1557 BT_COMP_LOGD("Got packet from relay daemon: response=%s, packet-len=%" PRIu64 "",
1558 lttng_viewer_get_packet_return_code_string(rp_status), req_len);
4164020e
SM
1559 break;
1560 case LTTNG_VIEWER_GET_PACKET_RETRY:
1561 /* Unimplemented by relay daemon */
1562 status = CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
1563 goto end;
1564 case LTTNG_VIEWER_GET_PACKET_ERR:
1565 if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
1566 BT_COMP_LOGD("Marking trace as needing new metadata: "
1567 "response=%s, response-flag=NEW_METADATA, trace-id=%" PRIu64,
1568 lttng_viewer_next_index_return_code_string(rp_status), trace->id);
1569 trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED;
1570 }
1571 if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) {
1572 BT_COMP_LOGD("Marking all sessions as possibly needing new streams: "
1573 "response=%s, response-flag=NEW_STREAM",
1574 lttng_viewer_next_index_return_code_string(rp_status));
1575 lttng_live_need_new_streams(lttng_live_msg_iter);
1576 }
1577 if (flags & (LTTNG_VIEWER_FLAG_NEW_METADATA | LTTNG_VIEWER_FLAG_NEW_STREAM)) {
1578 status = CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
1579 BT_COMP_LOGD("Reply with any one flags set means we should retry: response=%s",
1580 lttng_viewer_get_packet_return_code_string(rp_status));
1581 goto end;
1582 }
1583 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Received get_data_packet response: error");
1584 status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
1585 goto end;
1586 case LTTNG_VIEWER_GET_PACKET_EOF:
1587 status = CTF_MSG_ITER_MEDIUM_STATUS_EOF;
1588 goto end;
1589 default:
1590 BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Received get_data_packet response: unknown (%d)",
1591 rp_status);
1592 status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
1593 goto end;
1594 }
1595
1596 if (req_len == 0) {
1597 status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
1598 goto end;
1599 }
1600
1601 viewer_status = lttng_live_recv(viewer_connection, buf, req_len);
1602 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1603 viewer_handle_recv_status(self_comp, NULL, viewer_status, "get data packet");
1604 goto error_convert_status;
1605 }
1606 *recv_len = req_len;
1607
1608 status = CTF_MSG_ITER_MEDIUM_STATUS_OK;
1609 goto end;
f79c2d7a 1610
535fb48a 1611error_convert_status:
4164020e 1612 status = viewer_status_to_ctf_msg_iter_medium_status(viewer_status);
f79c2d7a 1613end:
4164020e 1614 return status;
7cdc2bab
MD
1615}
1616
1617/*
1618 * Request new streams for a session.
1619 */
1620BT_HIDDEN
4164020e
SM
1621enum lttng_live_iterator_status
1622lttng_live_session_get_new_streams(struct lttng_live_session *session,
1623 bt_self_message_iterator *self_msg_iter)
7cdc2bab 1624{
4164020e
SM
1625 enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1626 struct lttng_viewer_cmd cmd;
1627 struct lttng_viewer_new_streams_request rq;
1628 struct lttng_viewer_new_streams_response rp;
1629 struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter;
1630 enum lttng_live_viewer_status viewer_status;
1631 struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection;
1632 bt_self_component *self_comp = viewer_connection->self_comp;
1633 uint32_t streams_count;
1634 const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
1635 char cmd_buf[cmd_buf_len];
1636
1637 if (!session->new_streams_needed) {
1638 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
1639 goto end;
1640 }
1641
1642 BT_COMP_LOGD("Requesting new streams for session: cmd=%s, "
1643 "session-id=%" PRIu64,
1644 lttng_viewer_command_string(LTTNG_VIEWER_GET_NEW_STREAMS), session->id);
1645
1646 cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
1647 cmd.data_size = htobe64((uint64_t) sizeof(rq));
1648 cmd.cmd_version = htobe32(0);
1649
1650 memset(&rq, 0, sizeof(rq));
1651 rq.session_id = htobe64(session->id);
1652
1653 /*
1654 * Merge the cmd and connection request to prevent a write-write
1655 * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
1656 * second write to be performed quickly in presence of Nagle's algorithm.
1657 */
1658 memcpy(cmd_buf, &cmd, sizeof(cmd));
1659 memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
1660
1661 viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
1662 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1663 viewer_handle_send_status(self_comp, NULL, viewer_status, "get new streams command");
1664 status = viewer_status_to_live_iterator_status(viewer_status);
1665 goto end;
1666 }
1667
1668 viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
1669 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1670 viewer_handle_recv_status(self_comp, NULL, viewer_status, "get new streams reply");
1671 status = viewer_status_to_live_iterator_status(viewer_status);
1672 goto end;
1673 }
1674
1675 streams_count = be32toh(rp.streams_count);
1676
1677 switch (be32toh(rp.status)) {
1678 case LTTNG_VIEWER_NEW_STREAMS_OK:
1679 session->new_streams_needed = false;
1680 break;
1681 case LTTNG_VIEWER_NEW_STREAMS_NO_NEW:
1682 session->new_streams_needed = false;
1683 goto end;
1684 case LTTNG_VIEWER_NEW_STREAMS_HUP:
1685 session->new_streams_needed = false;
1686 session->closed = true;
1687 status = LTTNG_LIVE_ITERATOR_STATUS_END;
1688 goto end;
1689 case LTTNG_VIEWER_NEW_STREAMS_ERR:
1690 BT_COMP_LOGD("Received get_new_streams response: error");
1691 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1692 goto end;
1693 default:
1694 BT_COMP_LOGE_APPEND_CAUSE(self_comp,
1695 "Received get_new_streams response: Unknown:"
1696 "return code %u",
1697 be32toh(rp.status));
1698 status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
1699 goto end;
1700 }
1701
1702 viewer_status = receive_streams(session, streams_count, self_msg_iter);
1703 if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
1704 viewer_handle_recv_status(self_comp, NULL, viewer_status, "new streams");
1705 status = viewer_status_to_live_iterator_status(viewer_status);
1706 goto end;
1707 }
1708
1709 status = LTTNG_LIVE_ITERATOR_STATUS_OK;
f79c2d7a 1710end:
4164020e 1711 return status;
7cdc2bab
MD
1712}
1713
1714BT_HIDDEN
f79c2d7a 1715enum lttng_live_viewer_status live_viewer_connection_create(
4164020e
SM
1716 bt_self_component *self_comp, bt_self_component_class *self_comp_class,
1717 bt_logging_level log_level, const char *url, bool in_query,
1718 struct lttng_live_msg_iter *lttng_live_msg_iter, struct live_viewer_connection **viewer)
7cdc2bab 1719{
4164020e
SM
1720 struct live_viewer_connection *viewer_connection;
1721 enum lttng_live_viewer_status status;
1722
1723 viewer_connection = g_new0(struct live_viewer_connection, 1);
1724
1725 if (bt_socket_init(log_level) != 0) {
1726 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
1727 "Failed to init socket");
1728 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1729 goto error;
1730 }
1731
1732 viewer_connection->log_level = log_level;
1733
1734 viewer_connection->self_comp = self_comp;
1735 viewer_connection->self_comp_class = self_comp_class;
1736
1737 viewer_connection->control_sock = BT_INVALID_SOCKET;
1738 viewer_connection->port = -1;
1739 viewer_connection->in_query = in_query;
1740 viewer_connection->lttng_live_msg_iter = lttng_live_msg_iter;
1741 viewer_connection->url = g_string_new(url);
1742 if (!viewer_connection->url) {
1743 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
1744 "Failed to allocate URL buffer");
1745 status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
1746 goto error;
1747 }
1748
1749 BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class,
1750 "Establishing connection to url \"%s\"...", url);
1751 status = lttng_live_connect_viewer(viewer_connection);
1752 /*
1753 * Only print error and append cause in case of error. not in case of
1754 * interruption.
1755 */
1756 if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
1757 BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class,
1758 "Failed to establish connection: "
1759 "url=\"%s\"",
1760 url);
1761 goto error;
1762 } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
1763 goto error;
1764 }
1765 BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class,
1766 "Connection to url \"%s\" is established", url);
1767
1768 *viewer = viewer_connection;
1769 status = LTTNG_LIVE_VIEWER_STATUS_OK;
1770 goto end;
7cdc2bab 1771
7cdc2bab 1772error:
4164020e
SM
1773 if (viewer_connection) {
1774 live_viewer_connection_destroy(viewer_connection);
1775 }
f79c2d7a 1776end:
4164020e 1777 return status;
7cdc2bab
MD
1778}
1779
1780BT_HIDDEN
4164020e 1781void live_viewer_connection_destroy(struct live_viewer_connection *viewer_connection)
7cdc2bab 1782{
4164020e
SM
1783 bt_self_component *self_comp = viewer_connection->self_comp;
1784 bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
b9e6ec43 1785
4164020e
SM
1786 if (!viewer_connection) {
1787 goto end;
1788 }
b9e6ec43 1789
4164020e
SM
1790 BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class,
1791 "Closing connection to relay:"
1792 "relay-url=\"%s\"",
1793 viewer_connection->url->str);
ecb4ba8a 1794
4164020e 1795 lttng_live_disconnect_viewer(viewer_connection);
b9e6ec43 1796
4164020e
SM
1797 if (viewer_connection->url) {
1798 g_string_free(viewer_connection->url, true);
1799 }
b9e6ec43 1800
4164020e
SM
1801 if (viewer_connection->relay_hostname) {
1802 g_string_free(viewer_connection->relay_hostname, true);
1803 }
b9e6ec43 1804
4164020e
SM
1805 if (viewer_connection->target_hostname) {
1806 g_string_free(viewer_connection->target_hostname, true);
1807 }
b9e6ec43 1808
4164020e
SM
1809 if (viewer_connection->session_name) {
1810 g_string_free(viewer_connection->session_name, true);
1811 }
b9e6ec43 1812
4164020e
SM
1813 if (viewer_connection->proto) {
1814 g_string_free(viewer_connection->proto, true);
1815 }
ecb4ba8a 1816
4164020e 1817 g_free(viewer_connection);
1cb3cdd7 1818
4164020e 1819 bt_socket_fini();
b9e6ec43
FD
1820
1821end:
4164020e 1822 return;
7cdc2bab 1823}
This page took 0.161795 seconds and 4 git commands to generate.