X-Git-Url: http://git.efficios.com/?p=babeltrace.git;a=blobdiff_plain;f=src%2Fplugins%2Fctf%2Flttng-live%2Fviewer-connection.c;h=34bac35afa0fd7c24c3f32b2c3a8b78167d659c9;hp=d96a225601ea3a2bc5eccb16a369a64cd5bdd8ad;hb=1a25881946640f778a14e8f5491795193572d826;hpb=4a39caef874f11e1684e67fd33ad8f86b0a6d651 diff --git a/src/plugins/ctf/lttng-live/viewer-connection.c b/src/plugins/ctf/lttng-live/viewer-connection.c index d96a2256..34bac35a 100644 --- a/src/plugins/ctf/lttng-live/viewer-connection.c +++ b/src/plugins/ctf/lttng-live/viewer-connection.c @@ -1,24 +1,8 @@ /* - * Copyright 2019 - Francis Deslauriers - * Copyright 2016 - Mathieu Desnoyers + * SPDX-License-Identifier: MIT * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. + * Copyright 2019 Francis Deslauriers + * Copyright 2016 Mathieu Desnoyers */ #define BT_COMP_LOG_SELF_COMP (viewer_connection->self_comp) @@ -26,15 +10,15 @@ #define BT_LOG_TAG "PLUGIN/SRC.CTF.LTTNG-LIVE/VIEWER" #include "logging/comp-logging.h" -#include +#include +#include #include +#include #include -#include +#include #include + #include -#include -#include -#include #include "compat/socket.h" #include "compat/endian.h" @@ -48,67 +32,328 @@ #include "data-stream.h" #include "metadata.h" +#define viewer_handle_send_recv_status(_self_comp, _self_comp_class, \ + _status, _action, _msg_str) \ +do { \ + switch (_status) { \ + case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED: \ + break; \ + case LTTNG_LIVE_VIEWER_STATUS_ERROR: \ + BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(_self_comp, \ + _self_comp_class, "Error " _action " " _msg_str); \ + break; \ + default: \ + bt_common_abort(); \ + } \ +} while (0) + +#define viewer_handle_send_status(_self_comp, _self_comp_class, _status, _msg_str) \ + viewer_handle_send_recv_status(_self_comp, _self_comp_class, _status, \ + "sending", _msg_str) + +#define viewer_handle_recv_status(_self_comp, _self_comp_class, _status, _msg_str) \ + viewer_handle_send_recv_status(_self_comp, _self_comp_class, _status, \ + "receiving", _msg_str) + +#define LTTNG_LIVE_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE_ERRNO(_self_comp, \ + _self_comp_class, _msg, _fmt, ...) \ + do { \ + BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(_self_comp, _self_comp_class, \ + _msg ": %s" _fmt, bt_socket_errormsg(), ##__VA_ARGS__); \ + } while (0) + static -ssize_t lttng_live_recv(struct live_viewer_connection *viewer_connection, +const char *lttng_viewer_command_string(enum lttng_viewer_command cmd) +{ + switch (cmd){ + case LTTNG_VIEWER_CONNECT: + return "CONNECT"; + case LTTNG_VIEWER_LIST_SESSIONS: + return "LIST_SESSIONS"; + case LTTNG_VIEWER_ATTACH_SESSION: + return "ATTACH_SESSION"; + case LTTNG_VIEWER_GET_NEXT_INDEX: + return "GET_NEXT_INDEX"; + case LTTNG_VIEWER_GET_PACKET: + return "GET_PACKET"; + case LTTNG_VIEWER_GET_METADATA: + return "GET_METADATA"; + case LTTNG_VIEWER_GET_NEW_STREAMS: + return "GET_NEW_STREAMS"; + case LTTNG_VIEWER_CREATE_SESSION: + return "CREATE_SESSION"; + case LTTNG_VIEWER_DETACH_SESSION: + return "DETACH_SESSION"; + } + + bt_common_abort(); +} + +static +const char *lttng_viewer_next_index_return_code_string( + enum lttng_viewer_next_index_return_code code) +{ + switch (code) { + case LTTNG_VIEWER_INDEX_OK: + return "INDEX_OK"; + case LTTNG_VIEWER_INDEX_RETRY: + return "INDEX_RETRY"; + case LTTNG_VIEWER_INDEX_HUP: + return "INDEX_HUP"; + case LTTNG_VIEWER_INDEX_ERR: + return "INDEX_ERR"; + case LTTNG_VIEWER_INDEX_INACTIVE: + return "INDEX_INACTIVE"; + case LTTNG_VIEWER_INDEX_EOF: + return "INDEX_EOF"; + } + + bt_common_abort(); +} + +static +const char *lttng_viewer_get_packet_return_code_string( + enum lttng_viewer_get_packet_return_code code) +{ + switch (code) { + case LTTNG_VIEWER_GET_PACKET_OK: + return "GET_PACKET_OK"; + case LTTNG_VIEWER_GET_PACKET_RETRY: + return "GET_PACKET_RETRY"; + case LTTNG_VIEWER_GET_PACKET_ERR: + return "GET_PACKET_ERR"; + case LTTNG_VIEWER_GET_PACKET_EOF: + return "GET_PACKET_EOF"; + } + + bt_common_abort(); +}; + +static +const char *lttng_viewer_seek_string(enum lttng_viewer_seek seek) +{ + switch (seek) { + case LTTNG_VIEWER_SEEK_BEGINNING: + return "SEEK_BEGINNING"; + case LTTNG_VIEWER_SEEK_LAST: + return "SEEK_LAST"; + } + + bt_common_abort(); +} + +static inline +enum lttng_live_iterator_status viewer_status_to_live_iterator_status( + enum lttng_live_viewer_status viewer_status) +{ + switch (viewer_status) { + case LTTNG_LIVE_VIEWER_STATUS_OK: + return LTTNG_LIVE_ITERATOR_STATUS_OK; + case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED: + return LTTNG_LIVE_ITERATOR_STATUS_AGAIN; + case LTTNG_LIVE_VIEWER_STATUS_ERROR: + return LTTNG_LIVE_ITERATOR_STATUS_ERROR; + } + + bt_common_abort(); +} + +static inline +enum ctf_msg_iter_medium_status viewer_status_to_ctf_msg_iter_medium_status( + enum lttng_live_viewer_status viewer_status) +{ + switch (viewer_status) { + case LTTNG_LIVE_VIEWER_STATUS_OK: + return CTF_MSG_ITER_MEDIUM_STATUS_OK; + case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED: + return CTF_MSG_ITER_MEDIUM_STATUS_AGAIN; + case LTTNG_LIVE_VIEWER_STATUS_ERROR: + return CTF_MSG_ITER_MEDIUM_STATUS_ERROR; + } + + bt_common_abort(); +} + +static inline +void viewer_connection_close_socket( + struct live_viewer_connection *viewer_connection) +{ + bt_self_component_class *self_comp_class = + viewer_connection->self_comp_class; + bt_self_component *self_comp = + viewer_connection->self_comp; + int ret = bt_socket_close(viewer_connection->control_sock); + if (ret == -1) { + BT_COMP_OR_COMP_CLASS_LOGW_ERRNO( + self_comp, self_comp_class, + "Error closing viewer connection socket: ", "."); + } + + viewer_connection->control_sock = BT_INVALID_SOCKET; +} + +/* + * This function receives a message from the Relay daemon. + * If it received the entire message, it returns _OK, + * If it's interrupted, it returns _INTERRUPTED, + * otherwise, it returns _ERROR. + */ +static +enum lttng_live_viewer_status lttng_live_recv( + struct live_viewer_connection *viewer_connection, void *buf, size_t len) { - ssize_t ret; - size_t copied = 0, to_copy = len; + ssize_t received; + bt_self_component_class *self_comp_class = + viewer_connection->self_comp_class; + bt_self_component *self_comp = + viewer_connection->self_comp; + size_t total_received = 0, to_receive = len; struct lttng_live_msg_iter *lttng_live_msg_iter = viewer_connection->lttng_live_msg_iter; + enum lttng_live_viewer_status status; BT_SOCKET sock = viewer_connection->control_sock; + /* + * Receive a message from the Relay. + */ do { - ret = bt_socket_recv(sock, buf + copied, to_copy, 0); - if (ret > 0) { - BT_ASSERT(ret <= to_copy); - copied += ret; - to_copy -= ret; - } - if (ret == BT_SOCKET_ERROR && bt_socket_interrupted()) { - if (!viewer_connection->in_query && - lttng_live_graph_is_canceled(lttng_live_msg_iter)) { - break; + received = bt_socket_recv(sock, buf + total_received, to_receive, 0); + if (received == BT_SOCKET_ERROR) { + if (bt_socket_interrupted()) { + if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) { + /* + * This interruption was due to a + * SIGINT and the graph is being torn + * down. + */ + status = LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED; + lttng_live_msg_iter->was_interrupted = true; + goto end; + } else { + /* + * A signal was received, but the graph + * is not being torn down. Carry on. + */ + continue; + } } else { - continue; + /* + * For any other types of socket error, close + * the socket and return an error. + */ + LTTNG_LIVE_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE_ERRNO( + self_comp, self_comp_class, + "Error receiving from Relay", "."); + + viewer_connection_close_socket(viewer_connection); + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; + goto end; } + } else if (received == 0) { + /* + * The recv() call returned 0. This means the + * connection was orderly shutdown from the other peer. + * If that happens when we are trying to receive + * a message from it, it means something when wrong. + * Close the socket and return an error. + */ + BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, + self_comp_class, "Remote side has closed connection"); + viewer_connection_close_socket(viewer_connection); + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; + goto end; } - } while (ret > 0 && to_copy > 0); - if (ret > 0) - ret = copied; - /* ret = 0 means orderly shutdown, ret == BT_SOCKET_ERROR is error. */ - return ret; + + BT_ASSERT(received <= to_receive); + total_received += received; + to_receive -= received; + + } while (to_receive > 0); + + BT_ASSERT(total_received == len); + status = LTTNG_LIVE_VIEWER_STATUS_OK; + +end: + return status; } +/* + * This function sends a message to the Relay daemon. + * If it send the message, it returns _OK, + * If it's interrupted, it returns _INTERRUPTED, + * otherwise, it returns _ERROR. + */ static -ssize_t lttng_live_send(struct live_viewer_connection *viewer_connection, +enum lttng_live_viewer_status lttng_live_send( + struct live_viewer_connection *viewer_connection, const void *buf, size_t len) { + enum lttng_live_viewer_status status; + bt_self_component_class *self_comp_class = + viewer_connection->self_comp_class; + bt_self_component *self_comp = + viewer_connection->self_comp; struct lttng_live_msg_iter *lttng_live_msg_iter = viewer_connection->lttng_live_msg_iter; BT_SOCKET sock = viewer_connection->control_sock; - ssize_t ret; - - for (;;) { - ret = bt_socket_send_nosigpipe(sock, buf, len); - if (ret == BT_SOCKET_ERROR && bt_socket_interrupted()) { - if (!viewer_connection->in_query && - lttng_live_graph_is_canceled(lttng_live_msg_iter)) { - break; + size_t to_send = len; + ssize_t total_sent = 0; + + do { + ssize_t sent = bt_socket_send_nosigpipe(sock, buf + total_sent, + to_send); + if (sent == BT_SOCKET_ERROR) { + if (bt_socket_interrupted()) { + if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) { + /* + * This interruption was a SIGINT and + * the graph is being teared down. + */ + status = LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED; + lttng_live_msg_iter->was_interrupted = true; + goto end; + } else { + /* + * A signal was received, but the graph + * is not being teared down. Carry on. + */ + continue; + } } else { - continue; + /* + * For any other types of socket error, close + * the socket and return an error. + */ + LTTNG_LIVE_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE_ERRNO( + self_comp, self_comp_class, + "Error sending to Relay", "."); + + viewer_connection_close_socket(viewer_connection); + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; + goto end; } - } else { - break; } - } - return ret; + + BT_ASSERT(sent <= to_send); + total_sent += sent; + to_send -= sent; + + } while (to_send > 0); + + BT_ASSERT(total_sent == len); + status = LTTNG_LIVE_VIEWER_STATUS_OK; + +end: + return status; } static int parse_url(struct live_viewer_connection *viewer_connection) { char error_buf[256] = { 0 }; + bt_self_component *self_comp = viewer_connection->self_comp; + bt_self_component_class *self_comp_class = viewer_connection->self_comp_class; struct bt_common_lttng_live_url_parts lttng_live_url_parts = { 0 }; int ret = -1; const char *path = viewer_connection->url->str; @@ -117,15 +362,18 @@ int parse_url(struct live_viewer_connection *viewer_connection) goto end; } - lttng_live_url_parts = bt_common_parse_lttng_live_url(path, - error_buf, sizeof(error_buf)); + lttng_live_url_parts = bt_common_parse_lttng_live_url(path, error_buf, + sizeof(error_buf)); if (!lttng_live_url_parts.proto) { - BT_COMP_LOGW("Invalid LTTng live URL format: %s", error_buf); + BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, + self_comp_class,"Invalid LTTng live URL format: %s", + error_buf); goto end; } + viewer_connection->proto = lttng_live_url_parts.proto; + lttng_live_url_parts.proto = NULL; - viewer_connection->relay_hostname = - lttng_live_url_parts.hostname; + viewer_connection->relay_hostname = lttng_live_url_parts.hostname; lttng_live_url_parts.hostname = NULL; if (lttng_live_url_parts.port >= 0) { @@ -134,26 +382,14 @@ int parse_url(struct live_viewer_connection *viewer_connection) viewer_connection->port = LTTNG_DEFAULT_NETWORK_VIEWER_PORT; } - viewer_connection->target_hostname = - lttng_live_url_parts.target_hostname; + viewer_connection->target_hostname = lttng_live_url_parts.target_hostname; lttng_live_url_parts.target_hostname = NULL; if (lttng_live_url_parts.session_name) { - viewer_connection->session_name = - lttng_live_url_parts.session_name; + viewer_connection->session_name = lttng_live_url_parts.session_name; lttng_live_url_parts.session_name = NULL; } - BT_COMP_LOGI("Connecting to hostname : %s, port : %d, " - "target hostname : %s, session name : %s, " - "proto : %s", - viewer_connection->relay_hostname->str, - viewer_connection->port, - !viewer_connection->target_hostname ? - "" : viewer_connection->target_hostname->str, - !viewer_connection->session_name ? - "" : viewer_connection->session_name->str, - lttng_live_url_parts.proto->str); ret = 0; end: @@ -162,14 +398,21 @@ end: } static -int lttng_live_handshake(struct live_viewer_connection *viewer_connection) +enum lttng_live_viewer_status lttng_live_handshake( + struct live_viewer_connection *viewer_connection) { struct lttng_viewer_cmd cmd; struct lttng_viewer_connect connect; + enum lttng_live_viewer_status status; + bt_self_component_class *self_comp_class = viewer_connection->self_comp_class; + bt_self_component *self_comp = viewer_connection->self_comp; const size_t cmd_buf_len = sizeof(cmd) + sizeof(connect); char cmd_buf[cmd_buf_len]; - int ret; - ssize_t ret_len; + + BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class, + "Handshaking with the relay daemon: cmd=%s, major-version=%u, minor-version=%u", + lttng_viewer_command_string(LTTNG_VIEWER_CONNECT), LTTNG_LIVE_MAJOR, + LTTNG_LIVE_MINOR); cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT); cmd.data_size = htobe64((uint64_t) sizeof(connect)); @@ -187,33 +430,33 @@ int lttng_live_handshake(struct live_viewer_connection *viewer_connection) */ memcpy(cmd_buf, &cmd, sizeof(cmd)); memcpy(cmd_buf + sizeof(cmd), &connect, sizeof(connect)); - ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_LOGE("Error sending version: %s", bt_socket_errormsg()); - goto error; - } - - BT_ASSERT(ret_len == cmd_buf_len); - ret_len = lttng_live_recv(viewer_connection, &connect, sizeof(connect)); - if (ret_len == 0) { - BT_COMP_LOGI("Remote side has closed connection"); - goto error; + status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); + if (status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_send_status(self_comp, self_comp_class, + status, "viewer connect command"); + goto end; } - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_LOGE("Error receiving version: %s", bt_socket_errormsg()); - goto error; + + status = lttng_live_recv(viewer_connection, &connect, sizeof(connect)); + if (status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_recv_status(self_comp, self_comp_class, + status, "viewer connect reply"); + goto end; } - BT_ASSERT(ret_len == sizeof(connect)); - BT_COMP_LOGI("Received viewer session ID : %" PRIu64, - (uint64_t) be64toh(connect.viewer_session_id)); - BT_COMP_LOGI("Relayd version : %u.%u", be32toh(connect.major), - be32toh(connect.minor)); + BT_COMP_OR_COMP_CLASS_LOGI(self_comp, self_comp_class, + "Received viewer session ID : %" PRIu64, + (uint64_t) be64toh(connect.viewer_session_id)); + BT_COMP_OR_COMP_CLASS_LOGI(self_comp, self_comp_class, + "Relayd version : %u.%u", be32toh(connect.major), + be32toh(connect.minor)); if (LTTNG_LIVE_MAJOR != be32toh(connect.major)) { - BT_COMP_LOGE("Incompatible lttng-relayd protocol"); - goto error; + BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, + self_comp_class, "Incompatible lttng-relayd protocol"); + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; + goto end; } /* Use the smallest protocol version implemented. */ if (LTTNG_LIVE_MINOR > be32toh(connect.minor)) { @@ -222,34 +465,56 @@ int lttng_live_handshake(struct live_viewer_connection *viewer_connection) viewer_connection->minor = LTTNG_LIVE_MINOR; } viewer_connection->major = LTTNG_LIVE_MAJOR; - ret = 0; - return ret; -error: - BT_COMP_LOGE("Unable to establish connection"); - return -1; + status = LTTNG_LIVE_VIEWER_STATUS_OK; + + goto end; + +end: + return status; } static -int lttng_live_connect_viewer(struct live_viewer_connection *viewer_connection) +enum lttng_live_viewer_status lttng_live_connect_viewer( + struct live_viewer_connection *viewer_connection) { struct hostent *host; struct sockaddr_in server_addr; - int ret; + enum lttng_live_viewer_status status; + bt_self_component_class *self_comp_class = viewer_connection->self_comp_class; + bt_self_component *self_comp = viewer_connection->self_comp; if (parse_url(viewer_connection)) { + BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, + self_comp_class, "Failed to parse URL"); + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; goto error; } + BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class, + "Connecting to hostname : %s, port : %d, " + "target hostname : %s, session name : %s, proto : %s", + viewer_connection->relay_hostname->str, + viewer_connection->port, + !viewer_connection->target_hostname ? + "" : viewer_connection->target_hostname->str, + !viewer_connection->session_name ? + "" : viewer_connection->session_name->str, + viewer_connection->proto->str); + host = gethostbyname(viewer_connection->relay_hostname->str); if (!host) { - BT_COMP_LOGE("Cannot lookup hostname %s", + BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, + self_comp_class, "Cannot lookup hostname: hostname=\"%s\"", viewer_connection->relay_hostname->str); + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; goto error; } if ((viewer_connection->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == BT_INVALID_SOCKET) { - BT_COMP_LOGE("Socket creation failed: %s", bt_socket_errormsg()); + BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, + self_comp_class, "Socket creation failed: %s", bt_socket_errormsg()); + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; goto error; } @@ -260,87 +525,92 @@ int lttng_live_connect_viewer(struct live_viewer_connection *viewer_connection) if (connect(viewer_connection->control_sock, (struct sockaddr *) &server_addr, sizeof(struct sockaddr)) == BT_SOCKET_ERROR) { - BT_COMP_LOGE("Connection failed: %s", bt_socket_errormsg()); + BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, + self_comp_class, "Connection failed: %s", + bt_socket_errormsg()); + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; goto error; } - if (lttng_live_handshake(viewer_connection)) { + + status = lttng_live_handshake(viewer_connection); + + /* + * Only print error and append cause in case of error. not in case of + * interruption. + */ + if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) { + BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, + self_comp_class, "Viewer handshake failed"); goto error; + } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) { + goto end; } - ret = 0; - - return ret; + goto end; error: if (viewer_connection->control_sock != BT_INVALID_SOCKET) { if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) { - BT_COMP_LOGE("Close: %s", bt_socket_errormsg()); + BT_COMP_OR_COMP_CLASS_LOGW(self_comp, self_comp_class, + "Error closing socket: %s.", bt_socket_errormsg()); } } viewer_connection->control_sock = BT_INVALID_SOCKET; - return -1; +end: + return status; } static void lttng_live_disconnect_viewer( struct live_viewer_connection *viewer_connection) { + bt_self_component_class *self_comp_class = viewer_connection->self_comp_class; + bt_self_component *self_comp = viewer_connection->self_comp; + if (viewer_connection->control_sock == BT_INVALID_SOCKET) { return; } if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) { - BT_COMP_LOGE("Close: %s", bt_socket_errormsg()); + BT_COMP_OR_COMP_CLASS_LOGW(self_comp, self_comp_class, + "Error closing socket: %s", bt_socket_errormsg()); viewer_connection->control_sock = BT_INVALID_SOCKET; } } -static -void connection_release(bt_object *obj) -{ - struct live_viewer_connection *conn = - container_of(obj, struct live_viewer_connection, obj); - - live_viewer_connection_destroy(conn); -} - static int list_update_session(bt_value *results, const struct lttng_viewer_session *session, bool *_found, struct live_viewer_connection *viewer_connection) { + bt_self_component_class *self_comp_class = viewer_connection->self_comp_class; + bt_self_component *self_comp = viewer_connection->self_comp; int ret = 0; + uint64_t i, len; bt_value *map = NULL; bt_value *hostname = NULL; bt_value *session_name = NULL; bt_value *btval = NULL; - int i, len; bool found = false; - len = bt_value_array_get_size(results); - if (len < 0) { - BT_COMP_LOGE_STR("Error getting size of array."); - ret = -1; - goto end; - } + len = bt_value_array_get_length(results); for (i = 0; i < len; i++) { const char *hostname_str = NULL; const char *session_name_str = NULL; - map = bt_value_array_borrow_element_by_index(results, (size_t) i); - if (!map) { - BT_COMP_LOGE_STR("Error borrowing map."); - ret = -1; - goto end; - } + map = bt_value_array_borrow_element_by_index(results, i); hostname = bt_value_map_borrow_entry_value(map, "target-hostname"); if (!hostname) { - BT_COMP_LOGE_STR("Error borrowing \"target-hostname\" entry."); + BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, + self_comp_class, + "Error borrowing \"target-hostname\" entry."); ret = -1; goto end; } session_name = bt_value_map_borrow_entry_value(map, "session-name"); if (!session_name) { - BT_COMP_LOGE_STR("Error borrowing \"session-name\" entry."); + BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, + self_comp_class, + "Error borrowing \"session-name\" entry."); ret = -1; goto end; } @@ -357,25 +627,29 @@ int list_update_session(bt_value *results, btval = bt_value_map_borrow_entry_value(map, "stream-count"); if (!btval) { - BT_COMP_LOGE_STR("Error borrowing \"stream-count\" entry."); + BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE( + self_comp, self_comp_class, + "Error borrowing \"stream-count\" entry."); ret = -1; goto end; } - val = bt_value_integer_signed_get(btval); + val = bt_value_integer_unsigned_get(btval); /* sum */ val += streams; - bt_value_integer_signed_set(btval, val); + bt_value_integer_unsigned_set(btval, val); btval = bt_value_map_borrow_entry_value(map, "client-count"); if (!btval) { - BT_COMP_LOGE_STR("Error borrowing \"client-count\" entry."); + BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE( + self_comp, self_comp_class, + "Error borrowing \"client-count\" entry."); ret = -1; goto end; } - val = bt_value_integer_signed_get(btval); + val = bt_value_integer_unsigned_get(btval); /* max */ val = bt_max_t(int64_t, clients, val); - bt_value_integer_signed_set(btval, val); + bt_value_integer_unsigned_set(btval, val); } if (found) { @@ -394,6 +668,7 @@ int list_append_session(bt_value *results, struct live_viewer_connection *viewer_connection) { int ret = 0; + bt_self_component_class *self_comp_class = viewer_connection->self_comp_class; bt_value_map_insert_entry_status insert_status; bt_value_array_append_element_status append_status; bt_value *map = NULL; @@ -411,13 +686,15 @@ int list_append_session(bt_value *results, map = bt_value_map_create(); if (!map) { - BT_COMP_LOGE_STR("Error creating map value."); + BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, + "Error creating map value."); ret = -1; goto end; } if (base_url->len < 1) { - BT_COMP_LOGE_STR("Error: base_url length smaller than 1."); + BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, + "Error: base_url length smaller than 1."); ret = -1; goto end; } @@ -433,7 +710,8 @@ int list_append_session(bt_value *results, insert_status = bt_value_map_insert_string_entry(map, "url", url->str); if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) { - BT_COMP_LOGE_STR("Error inserting \"url\" entry."); + BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, + "Error inserting \"url\" entry."); ret = -1; goto end; } @@ -445,7 +723,8 @@ int list_append_session(bt_value *results, insert_status = bt_value_map_insert_string_entry(map, "target-hostname", session->hostname); if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) { - BT_COMP_LOGE_STR("Error inserting \"target-hostname\" entry."); + BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, + "Error inserting \"target-hostname\" entry."); ret = -1; goto end; } @@ -457,7 +736,8 @@ int list_append_session(bt_value *results, insert_status = bt_value_map_insert_string_entry(map, "session-name", session->session_name); if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) { - BT_COMP_LOGE_STR("Error inserting \"session-name\" entry."); + BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, + "Error inserting \"session-name\" entry."); ret = -1; goto end; } @@ -469,10 +749,11 @@ int list_append_session(bt_value *results, { uint32_t live_timer = be32toh(session->live_timer); - insert_status = bt_value_map_insert_signed_integer_entry( + insert_status = bt_value_map_insert_unsigned_integer_entry( map, "timer-us", live_timer); if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) { - BT_COMP_LOGE_STR("Error inserting \"timer-us\" entry."); + BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, + "Error inserting \"timer-us\" entry."); ret = -1; goto end; } @@ -485,10 +766,11 @@ int list_append_session(bt_value *results, { uint32_t streams = be32toh(session->streams); - insert_status = bt_value_map_insert_signed_integer_entry(map, + insert_status = bt_value_map_insert_unsigned_integer_entry(map, "stream-count", streams); if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) { - BT_COMP_LOGE_STR("Error inserting \"stream-count\" entry."); + BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, + "Error inserting \"stream-count\" entry."); ret = -1; goto end; } @@ -501,10 +783,11 @@ int list_append_session(bt_value *results, { uint32_t clients = be32toh(session->clients); - insert_status = bt_value_map_insert_signed_integer_entry(map, + insert_status = bt_value_map_insert_unsigned_integer_entry(map, "client-count", clients); if (insert_status != BT_VALUE_MAP_INSERT_ENTRY_STATUS_OK) { - BT_COMP_LOGE_STR("Error inserting \"client-count\" entry."); + BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, + "Error inserting \"client-count\" entry."); ret = -1; goto end; } @@ -512,7 +795,8 @@ int list_append_session(bt_value *results, append_status = bt_value_array_append_element(results, map); if (append_status != BT_VALUE_ARRAY_APPEND_ELEMENT_STATUS_OK) { - BT_COMP_LOGE_STR("Error appending map to results."); + BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, + "Error appending map to results."); ret = -1; } @@ -565,67 +849,74 @@ bt_component_class_query_method_status live_viewer_connection_list_sessions( struct live_viewer_connection *viewer_connection, const bt_value **user_result) { + bt_self_component_class *self_comp_class = viewer_connection->self_comp_class; bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK; bt_value *result = NULL; + enum lttng_live_viewer_status viewer_status; struct lttng_viewer_cmd cmd; struct lttng_viewer_list_sessions list; uint32_t i, sessions_count; - ssize_t ret_len; result = bt_value_array_create(); if (!result) { - BT_COMP_LOGE("Error creating array"); + BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, + "Error creating array"); status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_MEMORY_ERROR; goto error; } + BT_LOGD("Requesting list of sessions: cmd=%s", + lttng_viewer_command_string(LTTNG_VIEWER_LIST_SESSIONS)); + cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS); cmd.data_size = htobe64((uint64_t) 0); cmd.cmd_version = htobe32(0); - ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_LOGE("Error sending cmd: %s", bt_socket_errormsg()); + viewer_status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); + if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) { + BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, + "Error sending list sessions command"); status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR; goto error; + } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) { + status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN; + goto error; } - BT_ASSERT(ret_len == sizeof(cmd)); - ret_len = lttng_live_recv(viewer_connection, &list, sizeof(list)); - if (ret_len == 0) { - BT_COMP_LOGI("Remote side has closed connection"); + viewer_status = lttng_live_recv(viewer_connection, &list, sizeof(list)); + if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) { + BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, + "Error receiving session list"); status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR; goto error; - } - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_LOGE("Error receiving session list: %s", bt_socket_errormsg()); - status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR; + } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) { + status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN; goto error; } - BT_ASSERT(ret_len == sizeof(list)); sessions_count = be32toh(list.sessions_count); for (i = 0; i < sessions_count; i++) { struct lttng_viewer_session lsession; - ret_len = lttng_live_recv(viewer_connection, &lsession, + viewer_status = lttng_live_recv(viewer_connection, &lsession, sizeof(lsession)); - if (ret_len == 0) { - BT_COMP_LOGI("Remote side has closed connection"); + if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) { + BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, + "Error receiving session:"); status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR; goto error; - } - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_LOGE("Error receiving session: %s", bt_socket_errormsg()); - status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR; + } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) { + status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN; goto error; } - BT_ASSERT(ret_len == sizeof(lsession)); + lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0'; lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0'; if (list_append_session(result, viewer_connection->url, &lsession, viewer_connection)) { + BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, + "Error appending session"); status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR; goto error; } @@ -640,57 +931,57 @@ end: } static -int lttng_live_query_session_ids(struct lttng_live_msg_iter *lttng_live_msg_iter) +enum lttng_live_viewer_status lttng_live_query_session_ids( + struct lttng_live_msg_iter *lttng_live_msg_iter) { struct lttng_viewer_cmd cmd; struct lttng_viewer_list_sessions list; struct lttng_viewer_session lsession; uint32_t i, sessions_count; - ssize_t ret_len; uint64_t session_id; + enum lttng_live_viewer_status status; struct live_viewer_connection *viewer_connection = - lttng_live_msg_iter->viewer_connection; + lttng_live_msg_iter->viewer_connection; + bt_self_component *self_comp = viewer_connection->self_comp; + bt_self_component_class *self_comp_class = + viewer_connection->self_comp_class; + + BT_COMP_LOGD("Asking the relay daemon for the list of sessions: cmd=%s", + lttng_viewer_command_string(LTTNG_VIEWER_LIST_SESSIONS)); cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS); cmd.data_size = htobe64((uint64_t) 0); cmd.cmd_version = htobe32(0); - ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_LOGE("Error sending cmd: %s", bt_socket_errormsg()); - goto error; + status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); + if (status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_send_status(self_comp, self_comp_class, + status, "list sessions command"); + goto end; } - BT_ASSERT(ret_len == sizeof(cmd)); - ret_len = lttng_live_recv(viewer_connection, &list, sizeof(list)); - if (ret_len == 0) { - BT_COMP_LOGI("Remote side has closed connection"); - goto error; - } - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_LOGE("Error receiving session list: %s", bt_socket_errormsg()); - goto error; + status = lttng_live_recv(viewer_connection, &list, sizeof(list)); + if (status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_recv_status(self_comp, self_comp_class, + status, "session list reply"); + goto end; } - BT_ASSERT(ret_len == sizeof(list)); sessions_count = be32toh(list.sessions_count); for (i = 0; i < sessions_count; i++) { - ret_len = lttng_live_recv(viewer_connection, - &lsession, sizeof(lsession)); - if (ret_len == 0) { - BT_COMP_LOGI("Remote side has closed connection"); - goto error; - } - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_LOGE("Error receiving session: %s", bt_socket_errormsg()); - goto error; + status = lttng_live_recv(viewer_connection, &lsession, + sizeof(lsession)); + if (status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_recv_status(self_comp, self_comp_class, + status, "session reply"); + goto end; } - BT_ASSERT(ret_len == sizeof(lsession)); lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0'; lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0'; session_id = be64toh(lsession.id); - BT_COMP_LOGI("Adding session %" PRIu64 " hostname: %s session_name: %s", + BT_COMP_LOGI("Adding session to internal list: " + "session-id=%" PRIu64 ", hostname=\"%s\", session-name=\"%s\"", session_id, lsession.hostname, lsession.session_name); if ((strncmp(lsession.session_name, @@ -698,95 +989,106 @@ int lttng_live_query_session_ids(struct lttng_live_msg_iter *lttng_live_msg_iter LTTNG_VIEWER_NAME_MAX) == 0) && (strncmp(lsession.hostname, viewer_connection->target_hostname->str, LTTNG_VIEWER_HOST_NAME_MAX) == 0)) { + if (lttng_live_add_session(lttng_live_msg_iter, session_id, lsession.hostname, lsession.session_name)) { - goto error; + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Failed to add live session"); + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; + goto end; } } } - return 0; + status = LTTNG_LIVE_VIEWER_STATUS_OK; -error: - BT_COMP_LOGE("Unable to query session ids"); - return -1; +end: + return status; } BT_HIDDEN -int lttng_live_create_viewer_session( +enum lttng_live_viewer_status lttng_live_create_viewer_session( struct lttng_live_msg_iter *lttng_live_msg_iter) { struct lttng_viewer_cmd cmd; struct lttng_viewer_create_session_response resp; - ssize_t ret_len; + enum lttng_live_viewer_status status; struct live_viewer_connection *viewer_connection = - lttng_live_msg_iter->viewer_connection; + lttng_live_msg_iter->viewer_connection; + bt_self_component *self_comp = viewer_connection->self_comp; + bt_self_component_class *self_comp_class = + viewer_connection->self_comp_class; + + BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class, + "Creating a viewer session: cmd=%s", + lttng_viewer_command_string(LTTNG_VIEWER_CREATE_SESSION)); cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION); cmd.data_size = htobe64((uint64_t) 0); cmd.cmd_version = htobe32(0); - ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_LOGE("Error sending cmd: %s", bt_socket_errormsg()); - goto error; + status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); + if (status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_send_status(self_comp, self_comp_class, + status, "create session command"); + goto end; } - BT_ASSERT(ret_len == sizeof(cmd)); - ret_len = lttng_live_recv(viewer_connection, &resp, sizeof(resp)); - if (ret_len == 0) { - BT_COMP_LOGI("Remote side has closed connection"); - goto error; - } - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_LOGE("Error receiving create session reply: %s", bt_socket_errormsg()); - goto error; + status = lttng_live_recv(viewer_connection, &resp, sizeof(resp)); + if (status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_recv_status(self_comp, self_comp_class, + status, "create session reply"); + goto end; } - BT_ASSERT(ret_len == sizeof(resp)); if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) { - BT_COMP_LOGE("Error creating viewer session"); - goto error; - } - if (lttng_live_query_session_ids(lttng_live_msg_iter)) { - goto error; + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Error creating viewer session"); + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; + goto end; } - return 0; + status = lttng_live_query_session_ids(lttng_live_msg_iter); + if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) { + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Failed to query live viewer session ids"); + goto end; + } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) { + goto end; + } -error: - return -1; +end: + return status; } static -int receive_streams(struct lttng_live_session *session, - uint32_t stream_count) +enum lttng_live_viewer_status receive_streams(struct lttng_live_session *session, + uint32_t stream_count, + bt_self_message_iterator *self_msg_iter) { - ssize_t ret_len; uint32_t i; struct lttng_live_msg_iter *lttng_live_msg_iter = - session->lttng_live_msg_iter; + session->lttng_live_msg_iter; + enum lttng_live_viewer_status status; struct live_viewer_connection *viewer_connection = - lttng_live_msg_iter->viewer_connection; + lttng_live_msg_iter->viewer_connection; + bt_self_component *self_comp = viewer_connection->self_comp; - BT_COMP_LOGI("Getting %" PRIu32 " new streams:", stream_count); + BT_COMP_LOGI("Getting %" PRIu32 " new streams", stream_count); for (i = 0; i < stream_count; i++) { struct lttng_viewer_stream stream; struct lttng_live_stream_iterator *live_stream; uint64_t stream_id; uint64_t ctf_trace_id; - ret_len = lttng_live_recv(viewer_connection, &stream, sizeof(stream)); - if (ret_len == 0) { - BT_COMP_LOGI("Remote side has closed connection"); - goto error; - } - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_LOGE("Error receiving stream"); - goto error; + status = lttng_live_recv(viewer_connection, &stream, + sizeof(stream)); + if (status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_recv_status(self_comp, NULL, + status, "stream reply"); + goto end; } - BT_ASSERT(ret_len == sizeof(stream)); stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0'; stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0'; stream_id = be64toh(stream.id); @@ -794,49 +1096,60 @@ int receive_streams(struct lttng_live_session *session, if (stream.metadata_flag) { BT_COMP_LOGI(" metadata stream %" PRIu64 " : %s/%s", - stream_id, stream.path_name, - stream.channel_name); + stream_id, stream.path_name, stream.channel_name); if (lttng_live_metadata_create_stream(session, ctf_trace_id, stream_id, stream.path_name)) { - BT_COMP_LOGE("Error creating metadata stream"); - - goto error; + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Error creating metadata stream"); + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; + goto end; } session->lazy_stream_msg_init = true; } else { BT_COMP_LOGI(" stream %" PRIu64 " : %s/%s", - stream_id, stream.path_name, - stream.channel_name); + stream_id, stream.path_name, stream.channel_name); live_stream = lttng_live_stream_iterator_create(session, - ctf_trace_id, stream_id); + ctf_trace_id, stream_id, self_msg_iter); if (!live_stream) { - BT_COMP_LOGE("Error creating streamn"); - goto error; + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Error creating stream"); + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; + goto end; } } } - return 0; + status = LTTNG_LIVE_VIEWER_STATUS_OK; -error: - return -1; +end: + return status; } BT_HIDDEN -int lttng_live_attach_session(struct lttng_live_session *session) +enum lttng_live_viewer_status lttng_live_session_attach( + struct lttng_live_session *session, + bt_self_message_iterator *self_msg_iter) { struct lttng_viewer_cmd cmd; + enum lttng_live_viewer_status status; struct lttng_viewer_attach_session_request rq; struct lttng_viewer_attach_session_response rp; - ssize_t ret_len; - struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter; + struct lttng_live_msg_iter *lttng_live_msg_iter = + session->lttng_live_msg_iter; struct live_viewer_connection *viewer_connection = - lttng_live_msg_iter->viewer_connection; + lttng_live_msg_iter->viewer_connection; + bt_self_component *self_comp = viewer_connection->self_comp; uint64_t session_id = session->id; uint32_t streams_count; const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); char cmd_buf[cmd_buf_len]; + BT_COMP_LOGD("Attaching to session: cmd=%s, session-id=%" PRIu64 + ", seek=%s", + lttng_viewer_command_string(LTTNG_VIEWER_ATTACH_SESSION), + session_id, + lttng_viewer_seek_string(LTTNG_VIEWER_SEEK_LAST)); + cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION); cmd.data_size = htobe64((uint64_t) sizeof(rq)); cmd.cmd_version = htobe32(0); @@ -854,77 +1167,100 @@ int lttng_live_attach_session(struct lttng_live_session *session) */ memcpy(cmd_buf, &cmd, sizeof(cmd)); memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); - ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_LOGE("Error sending attach request: %s", bt_socket_errormsg()); - goto error; + status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); + if (status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_send_status(self_comp, NULL, + status, "attach session command"); + goto end; } - BT_ASSERT(ret_len == cmd_buf_len); - ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); - if (ret_len == 0) { - BT_COMP_LOGI("Remote side has closed connection"); - goto error; - } - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_LOGE("Error receiving attach response: %s", bt_socket_errormsg()); - goto error; + status = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); + if (status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_recv_status(self_comp, NULL, + status, "attach session reply"); + goto end; } - BT_ASSERT(ret_len == sizeof(rp)); streams_count = be32toh(rp.streams_count); switch(be32toh(rp.status)) { case LTTNG_VIEWER_ATTACH_OK: break; case LTTNG_VIEWER_ATTACH_UNK: - BT_COMP_LOGW("Session id %" PRIu64 " is unknown", session_id); - goto error; + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Session id %" PRIu64 " is unknown", session_id); + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; + goto end; case LTTNG_VIEWER_ATTACH_ALREADY: - BT_COMP_LOGW("There is already a viewer attached to this session"); - goto error; + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "There is already a viewer attached to this session"); + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; + goto end; case LTTNG_VIEWER_ATTACH_NOT_LIVE: - BT_COMP_LOGW("Not a live session"); - goto error; + BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Not a live session"); + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; + goto end; case LTTNG_VIEWER_ATTACH_SEEK_ERR: - BT_COMP_LOGE("Wrong seek parameter"); - goto error; + BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Wrong seek parameter"); + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; + goto end; default: - BT_COMP_LOGE("Unknown attach return code %u", be32toh(rp.status)); - goto error; + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Unknown attach return code %u", be32toh(rp.status)); + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; + goto end; } /* We receive the initial list of streams. */ - if (receive_streams(session, streams_count)) { - goto error; + status = receive_streams(session, streams_count, self_msg_iter); + switch (status) { + case LTTNG_LIVE_VIEWER_STATUS_OK: + break; + case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED: + goto end; + case LTTNG_LIVE_VIEWER_STATUS_ERROR: + BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error receiving streams"); + goto end; + default: + bt_common_abort(); } session->attached = true; session->new_streams_needed = false; - return 0; - -error: - return -1; +end: + return status; } BT_HIDDEN -int lttng_live_detach_session(struct lttng_live_session *session) +enum lttng_live_viewer_status lttng_live_session_detach( + struct lttng_live_session *session) { struct lttng_viewer_cmd cmd; + enum lttng_live_viewer_status status; struct lttng_viewer_detach_session_request rq; struct lttng_viewer_detach_session_response rp; - ssize_t ret_len; - struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter; + struct lttng_live_msg_iter *lttng_live_msg_iter = + session->lttng_live_msg_iter; + bt_self_component *self_comp = session->self_comp; struct live_viewer_connection *viewer_connection = - lttng_live_msg_iter->viewer_connection; + lttng_live_msg_iter->viewer_connection; uint64_t session_id = session->id; const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); char cmd_buf[cmd_buf_len]; - if (!session->attached) { + /* + * The session might already be detached and the viewer socket might + * already been closed. This happens when calling this function when + * tearing down the graph after an error. + */ + if (!session->attached || viewer_connection->control_sock == BT_INVALID_SOCKET) { return 0; } + BT_COMP_LOGD("Detaching from session: cmd=%s, session-id=%" PRIu64, + lttng_viewer_command_string(LTTNG_VIEWER_DETACH_SESSION), + session_id); + cmd.cmd = htobe32(LTTNG_VIEWER_DETACH_SESSION); cmd.data_size = htobe64((uint64_t) sizeof(rq)); cmd.cmd_version = htobe32(0); @@ -939,65 +1275,72 @@ int lttng_live_detach_session(struct lttng_live_session *session) */ memcpy(cmd_buf, &cmd, sizeof(cmd)); memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); - ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_LOGE("Error sending detach request: %s", bt_socket_errormsg()); - goto error; + status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); + if (status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_send_status(self_comp, NULL, + status, "detach session command"); + goto end; } - BT_ASSERT(ret_len == cmd_buf_len); - ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); - if (ret_len == 0) { - BT_COMP_LOGI("Remote side has closed connection"); - goto error; - } - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_LOGE("Error receiving detach response: %s", bt_socket_errormsg()); - goto error; + status = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); + if (status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_recv_status(self_comp, NULL, + status, "detach session reply"); + goto end; } - BT_ASSERT(ret_len == sizeof(rp)); switch(be32toh(rp.status)) { case LTTNG_VIEWER_DETACH_SESSION_OK: break; case LTTNG_VIEWER_DETACH_SESSION_UNK: BT_COMP_LOGW("Session id %" PRIu64 " is unknown", session_id); - goto error; + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; + goto end; case LTTNG_VIEWER_DETACH_SESSION_ERR: BT_COMP_LOGW("Error detaching session id %" PRIu64 "", session_id); - goto error; + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; + goto end; default: BT_COMP_LOGE("Unknown detach return code %u", be32toh(rp.status)); - goto error; + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; + goto end; } session->attached = false; - return 0; + status = LTTNG_LIVE_VIEWER_STATUS_OK; -error: - return -1; +end: + return status; } BT_HIDDEN -ssize_t lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, - FILE *fp) +enum lttng_live_get_one_metadata_status lttng_live_get_one_metadata_packet( + struct lttng_live_trace *trace, FILE *fp, size_t *reply_len) { uint64_t len = 0; - int ret; + enum lttng_live_get_one_metadata_status status; + enum lttng_live_viewer_status viewer_status; struct lttng_viewer_cmd cmd; struct lttng_viewer_get_metadata rq; struct lttng_viewer_metadata_packet rp; - char *data = NULL; - ssize_t ret_len; + gchar *data = NULL; + ssize_t writelen; struct lttng_live_session *session = trace->session; - struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter; + struct lttng_live_msg_iter *lttng_live_msg_iter = + session->lttng_live_msg_iter; struct lttng_live_metadata *metadata = trace->metadata; struct live_viewer_connection *viewer_connection = - lttng_live_msg_iter->viewer_connection; + lttng_live_msg_iter->viewer_connection; + bt_self_component *self_comp = viewer_connection->self_comp; const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); char cmd_buf[cmd_buf_len]; + BT_COMP_LOGD("Requesting new metadata for trace:" + "cmd=%s, trace-id=%" PRIu64 ", metadata-stream-id=%" PRIu64, + lttng_viewer_command_string(LTTNG_VIEWER_GET_METADATA), + trace->id, metadata->stream_id); + rq.stream_id = htobe64(metadata->stream_id); cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA); cmd.data_size = htobe64((uint64_t) sizeof(rq)); @@ -1010,79 +1353,88 @@ ssize_t lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, */ memcpy(cmd_buf, &cmd, sizeof(cmd)); memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); - ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_LOGE("Error sending get_metadata request: %s", bt_socket_errormsg()); - goto error; + viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); + if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_send_status(self_comp, NULL, + viewer_status, "get metadata command"); + status = (enum lttng_live_get_one_metadata_status) viewer_status; + goto end; } - BT_ASSERT(ret_len == cmd_buf_len); - ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); - if (ret_len == 0) { - BT_COMP_LOGI("Remote side has closed connection"); - goto error; - } - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_LOGE("Error receiving get_metadata response: %s", bt_socket_errormsg()); - goto error; + viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); + if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_recv_status(self_comp, NULL, + viewer_status, "get metadata reply"); + status = (enum lttng_live_get_one_metadata_status) viewer_status; + goto end; } - BT_ASSERT(ret_len == sizeof(rp)); switch (be32toh(rp.status)) { case LTTNG_VIEWER_METADATA_OK: - BT_COMP_LOGD("get_metadata : OK"); + BT_COMP_LOGD("Received get_metadata response: ok"); break; case LTTNG_VIEWER_NO_NEW_METADATA: - BT_COMP_LOGD("get_metadata : NO NEW"); - ret = 0; + BT_COMP_LOGD("Received get_metadata response: no new"); + status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_END; goto end; case LTTNG_VIEWER_METADATA_ERR: - BT_COMP_LOGD("get_metadata : ERR"); - goto error; + /* + * The Relayd cannot find this stream id. Maybe its + * gone already. This can happen in short lived UST app + * in a per-pid session. + */ + BT_COMP_LOGD("Received get_metadata response: error"); + status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_CLOSED; + goto end; default: - BT_COMP_LOGD("get_metadata : UNKNOWN"); - goto error; + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Received get_metadata response: unknown"); + status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR; + goto end; } len = be64toh(rp.len); BT_COMP_LOGD("Writing %" PRIu64" bytes to metadata", len); if (len <= 0) { - goto error; + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Erroneous response length"); + status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR; + goto end; } - data = calloc(1, len); + data = g_new0(gchar, len); if (!data) { - BT_COMP_LOGE("relay data calloc: %s", strerror(errno)); - goto error; + BT_COMP_LOGE_APPEND_CAUSE_ERRNO(self_comp, + "Failed to allocate data buffer", "."); + status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR; + goto end; } - ret_len = lttng_live_recv(viewer_connection, data, len); - if (ret_len == 0) { - BT_COMP_LOGI("Remote side has closed connection"); - goto error_free_data; + + viewer_status = lttng_live_recv(viewer_connection, data, len); + if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_recv_status(self_comp, NULL, + viewer_status, "get metadata packet"); + status = (enum lttng_live_get_one_metadata_status) viewer_status; + goto end; } - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_LOGE("Error receiving trace packet: %s", bt_socket_errormsg()); - goto error_free_data; + + /* + * Write the metadata to the file handle. + */ + writelen = fwrite(data, sizeof(uint8_t), len, fp); + if (writelen != len) { + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Writing in the metadata file stream"); + status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR; + goto end; } - BT_ASSERT(ret_len == len); - do { - ret_len = fwrite(data, 1, len, fp); - } while (ret_len < 0 && errno == EINTR); - if (ret_len < 0) { - BT_COMP_LOGE("Writing in the metadata fp"); - goto error_free_data; - } - BT_ASSERT(ret_len == len); - free(data); - ret = len; -end: - return ret; + *reply_len = len; + status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_OK; -error_free_data: - free(data); -error: - return -1; +end: + g_free(data); + return status; } /* @@ -1103,6 +1455,23 @@ void lttng_index_to_packet_index(struct lttng_viewer_index *lindex, pindex->events_discarded = be64toh(lindex->events_discarded); } +static +void lttng_live_need_new_streams(struct lttng_live_msg_iter *lttng_live_msg_iter) +{ + uint64_t session_idx; + struct live_viewer_connection *viewer_connection = + lttng_live_msg_iter->viewer_connection; + + for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; + session_idx++) { + struct lttng_live_session *session = + g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx); + BT_COMP_LOGD("Marking session as needing new streams: " + "session-id=%" PRIu64, session->id); + session->new_streams_needed = true; + } +} + BT_HIDDEN enum lttng_live_iterator_status lttng_live_get_next_index( struct lttng_live_msg_iter *lttng_live_msg_iter, @@ -1111,22 +1480,25 @@ enum lttng_live_iterator_status lttng_live_get_next_index( { struct lttng_viewer_cmd cmd; struct lttng_viewer_get_next_index rq; - ssize_t ret_len; + enum lttng_live_viewer_status viewer_status; struct lttng_viewer_index rp; - uint32_t flags, status; - enum lttng_live_iterator_status retstatus = - LTTNG_LIVE_ITERATOR_STATUS_OK; + enum lttng_live_iterator_status status; struct live_viewer_connection *viewer_connection = - lttng_live_msg_iter->viewer_connection; + lttng_live_msg_iter->viewer_connection; + bt_self_component *self_comp = viewer_connection->self_comp; struct lttng_live_trace *trace = stream->trace; const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); char cmd_buf[cmd_buf_len]; + uint32_t flags, rp_status; + BT_COMP_LOGD("Requesting next index for stream: cmd=%s, " + "viewer-stream-id=%" PRIu64, + lttng_viewer_command_string(LTTNG_VIEWER_GET_NEXT_INDEX), + stream->viewer_stream_id); cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX); cmd.data_size = htobe64((uint64_t) sizeof(rq)); cmd.cmd_version = htobe32(0); - memset(&rq, 0, sizeof(rq)); rq.stream_id = htobe64(stream->viewer_stream_id); @@ -1137,131 +1509,135 @@ enum lttng_live_iterator_status lttng_live_get_next_index( */ memcpy(cmd_buf, &cmd, sizeof(cmd)); memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); - ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_LOGE("Error sending get_next_index request: %s", - bt_socket_errormsg()); - goto error; - } - BT_ASSERT(ret_len == cmd_buf_len); - ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); - if (ret_len == 0) { - BT_COMP_LOGI("Remote side has closed connection"); + viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); + if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_send_status(self_comp, NULL, + viewer_status, "get next index command"); goto error; } - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_LOGE("Error receiving get_next_index response: %s", - bt_socket_errormsg()); + + viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); + if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_recv_status(self_comp, NULL, + viewer_status, "get next index reply"); goto error; } - BT_ASSERT(ret_len == sizeof(rp)); flags = be32toh(rp.flags); - status = be32toh(rp.status); + rp_status = be32toh(rp.status); - switch (status) { + BT_COMP_LOGD("Received response from relay daemon: cmd=%s, response=%s", + lttng_viewer_command_string(LTTNG_VIEWER_GET_NEXT_INDEX), + lttng_viewer_next_index_return_code_string(rp_status)); + switch (rp_status) { case LTTNG_VIEWER_INDEX_INACTIVE: { uint64_t ctf_stream_class_id; - BT_COMP_LOGD("get_next_index: inactive"); memset(index, 0, sizeof(struct packet_index)); index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end); stream->current_inactivity_ts = index->ts_cycles.timestamp_end; ctf_stream_class_id = be64toh(rp.stream_id); - if (stream->ctf_stream_class_id != -1ULL) { - BT_ASSERT(stream->ctf_stream_class_id == + if (stream->ctf_stream_class_id.is_set) { + BT_ASSERT(stream->ctf_stream_class_id.value== ctf_stream_class_id); } else { - stream->ctf_stream_class_id = ctf_stream_class_id; + stream->ctf_stream_class_id.value = ctf_stream_class_id; + stream->ctf_stream_class_id.is_set = true; } - stream->state = LTTNG_LIVE_STREAM_QUIESCENT; + lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_QUIESCENT); + status = LTTNG_LIVE_ITERATOR_STATUS_OK; break; } case LTTNG_VIEWER_INDEX_OK: { uint64_t ctf_stream_class_id; - BT_COMP_LOGD("get_next_index: OK"); lttng_index_to_packet_index(&rp, index); ctf_stream_class_id = be64toh(rp.stream_id); - if (stream->ctf_stream_class_id != -1ULL) { - BT_ASSERT(stream->ctf_stream_class_id == + if (stream->ctf_stream_class_id.is_set) { + BT_ASSERT(stream->ctf_stream_class_id.value== ctf_stream_class_id); } else { - stream->ctf_stream_class_id = ctf_stream_class_id; + stream->ctf_stream_class_id.value = ctf_stream_class_id; + stream->ctf_stream_class_id.is_set = true; } - stream->state = LTTNG_LIVE_STREAM_ACTIVE_DATA; + lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_DATA); if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) { - BT_COMP_LOGD("get_next_index: new metadata needed"); - trace->new_metadata_needed = true; + BT_COMP_LOGD("Marking trace as needing new metadata: " + "response=%s, response-flag=NEW_METADATA, trace-id=%" PRIu64, + lttng_viewer_next_index_return_code_string(rp_status), + trace->id); + trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED; } if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) { - BT_COMP_LOGD("get_next_index: new streams needed"); + BT_COMP_LOGD("Marking all sessions as possibly needing new streams: " + "response=%s, response-flag=NEW_STREAM", + lttng_viewer_next_index_return_code_string(rp_status)); lttng_live_need_new_streams(lttng_live_msg_iter); } + status = LTTNG_LIVE_ITERATOR_STATUS_OK; break; } case LTTNG_VIEWER_INDEX_RETRY: - BT_COMP_LOGD("get_next_index: retry"); memset(index, 0, sizeof(struct packet_index)); - retstatus = LTTNG_LIVE_ITERATOR_STATUS_AGAIN; - stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA; + lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA); + status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN; goto end; case LTTNG_VIEWER_INDEX_HUP: - BT_COMP_LOGD("get_next_index: stream hung up"); memset(index, 0, sizeof(struct packet_index)); index->offset = EOF; - retstatus = LTTNG_LIVE_ITERATOR_STATUS_END; - stream->state = LTTNG_LIVE_STREAM_EOF; + lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_EOF); stream->has_stream_hung_up = true; + status = LTTNG_LIVE_ITERATOR_STATUS_END; break; case LTTNG_VIEWER_INDEX_ERR: - BT_COMP_LOGE("get_next_index: error"); memset(index, 0, sizeof(struct packet_index)); - stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA; - goto error; + lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA); + status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; + goto end; default: - BT_COMP_LOGE("get_next_index: unknown value"); + BT_COMP_LOGD("Received get_next_index response: unknown value"); memset(index, 0, sizeof(struct packet_index)); - stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA; - goto error; + lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA); + status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; + goto end; } -end: - return retstatus; + goto end; error: - if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) { - retstatus = LTTNG_LIVE_ITERATOR_STATUS_AGAIN; - } else { - retstatus = LTTNG_LIVE_ITERATOR_STATUS_ERROR; - } - return retstatus; + status = viewer_status_to_live_iterator_status(viewer_status); +end: + return status; } BT_HIDDEN -enum bt_msg_iter_medium_status lttng_live_get_stream_bytes( +enum ctf_msg_iter_medium_status lttng_live_get_stream_bytes( struct lttng_live_msg_iter *lttng_live_msg_iter, struct lttng_live_stream_iterator *stream, uint8_t *buf, uint64_t offset, uint64_t req_len, uint64_t *recv_len) { - enum bt_msg_iter_medium_status retstatus = BT_MSG_ITER_MEDIUM_STATUS_OK; + enum ctf_msg_iter_medium_status status; + enum lttng_live_viewer_status viewer_status; + struct lttng_viewer_trace_packet rp; struct lttng_viewer_cmd cmd; struct lttng_viewer_get_packet rq; - struct lttng_viewer_trace_packet rp; - ssize_t ret_len; - uint32_t flags, status; struct live_viewer_connection *viewer_connection = - lttng_live_msg_iter->viewer_connection; + lttng_live_msg_iter->viewer_connection; + bt_self_component *self_comp = viewer_connection->self_comp; struct lttng_live_trace *trace = stream->trace; const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); char cmd_buf[cmd_buf_len]; + uint32_t flags, rp_status; - BT_COMP_LOGD("lttng_live_get_stream_bytes: offset=%" PRIu64 ", req_len=%" PRIu64, + BT_COMP_LOGD("Requesting data from stream: cmd=%s, " + "offset=%" PRIu64 ", request-len=%" PRIu64, + lttng_viewer_command_string(LTTNG_VIEWER_GET_PACKET), offset, req_len); + cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET); cmd.data_size = htobe64((uint64_t) sizeof(rq)); cmd.cmd_version = htobe32(0); @@ -1278,118 +1654,129 @@ enum bt_msg_iter_medium_status lttng_live_get_stream_bytes( */ memcpy(cmd_buf, &cmd, sizeof(cmd)); memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); - ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_LOGE("Error sending get_data request: %s", bt_socket_errormsg()); - goto error; - } - BT_ASSERT(ret_len == cmd_buf_len); - ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); - if (ret_len == 0) { - BT_COMP_LOGI("Remote side has closed connection"); - goto error; + viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); + if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_send_status(self_comp, NULL, + viewer_status, "get data packet command"); + goto error_convert_status; } - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_LOGE("Error receiving get_data response: %s", bt_socket_errormsg()); - goto error; - } - if (ret_len != sizeof(rp)) { - BT_COMP_LOGE("get_data_packet: expected %zu" - ", received %zd", sizeof(rp), - ret_len); - goto error; + + viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); + if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_recv_status(self_comp, NULL, + viewer_status, "get data packet reply"); + goto error_convert_status; } flags = be32toh(rp.flags); - status = be32toh(rp.status); + rp_status = be32toh(rp.status); - switch (status) { + BT_COMP_LOGD("Received response from relay daemon: cmd=%s, response=%s", + lttng_viewer_command_string(LTTNG_VIEWER_GET_PACKET), + lttng_viewer_get_packet_return_code_string(rp_status)); + switch (rp_status) { case LTTNG_VIEWER_GET_PACKET_OK: req_len = be32toh(rp.len); - BT_COMP_LOGD("get_data_packet: Ok, packet size : %" PRIu64 "", req_len); + BT_COMP_LOGD("Got packet from relay daemon: response=%s, packet-len=%" PRIu64 "", + lttng_viewer_get_packet_return_code_string(rp_status), + req_len); + status = CTF_MSG_ITER_MEDIUM_STATUS_OK; break; case LTTNG_VIEWER_GET_PACKET_RETRY: /* Unimplemented by relay daemon */ - BT_COMP_LOGD("get_data_packet: retry"); - retstatus = BT_MSG_ITER_MEDIUM_STATUS_AGAIN; + status = CTF_MSG_ITER_MEDIUM_STATUS_AGAIN; goto end; case LTTNG_VIEWER_GET_PACKET_ERR: if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) { - BT_COMP_LOGD("get_data_packet: new metadata needed, try again later"); - trace->new_metadata_needed = true; + BT_COMP_LOGD("Marking trace as needing new metadata: " + "response=%s, response-flag=NEW_METADATA, trace-id=%" PRIu64, + lttng_viewer_next_index_return_code_string(rp_status), + trace->id); + trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED; } if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) { - BT_COMP_LOGD("get_data_packet: new streams needed, try again later"); + BT_COMP_LOGD("Marking all sessions as possibly needing new streams: " + "response=%s, response-flag=NEW_STREAM", + lttng_viewer_next_index_return_code_string(rp_status)); lttng_live_need_new_streams(lttng_live_msg_iter); } if (flags & (LTTNG_VIEWER_FLAG_NEW_METADATA | LTTNG_VIEWER_FLAG_NEW_STREAM)) { - retstatus = BT_MSG_ITER_MEDIUM_STATUS_AGAIN; + status = CTF_MSG_ITER_MEDIUM_STATUS_AGAIN; + BT_COMP_LOGD("Reply with any one flags set means we should retry: response=%s", + lttng_viewer_get_packet_return_code_string(rp_status)); goto end; } - BT_COMP_LOGE("get_data_packet: error"); - goto error; + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Received get_data_packet response: error"); + status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR; + goto end; case LTTNG_VIEWER_GET_PACKET_EOF: - retstatus = BT_MSG_ITER_MEDIUM_STATUS_EOF; + status = CTF_MSG_ITER_MEDIUM_STATUS_EOF; goto end; default: - BT_COMP_LOGE("get_data_packet: unknown"); - goto error; + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Received get_data_packet response: unknown (%d)", rp_status); + status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR; + goto end; } if (req_len == 0) { - goto error; + status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR; + goto end; } - ret_len = lttng_live_recv(viewer_connection, buf, req_len); - if (ret_len == 0) { - BT_COMP_LOGI("Remote side has closed connection"); - goto error; + viewer_status = lttng_live_recv(viewer_connection, buf, req_len); + if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_recv_status(self_comp, NULL, + viewer_status, "get data packet"); + goto error_convert_status; } - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_LOGE("Error receiving trace packet: %s", bt_socket_errormsg()); - goto error; - } - BT_ASSERT(ret_len == req_len); - *recv_len = ret_len; -end: - return retstatus; + *recv_len = req_len; -error: - if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) { - retstatus = BT_MSG_ITER_MEDIUM_STATUS_AGAIN; - } else { - retstatus = BT_MSG_ITER_MEDIUM_STATUS_ERROR; - } - return retstatus; + status = CTF_MSG_ITER_MEDIUM_STATUS_OK; + goto end; + +error_convert_status: + status = viewer_status_to_ctf_msg_iter_medium_status(viewer_status); +end: + return status; } /* * Request new streams for a session. */ BT_HIDDEN -enum lttng_live_iterator_status lttng_live_get_new_streams( - struct lttng_live_session *session) +enum lttng_live_iterator_status lttng_live_session_get_new_streams( + struct lttng_live_session *session, + bt_self_message_iterator *self_msg_iter) { enum lttng_live_iterator_status status = LTTNG_LIVE_ITERATOR_STATUS_OK; struct lttng_viewer_cmd cmd; struct lttng_viewer_new_streams_request rq; struct lttng_viewer_new_streams_response rp; - ssize_t ret_len; struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter; + enum lttng_live_viewer_status viewer_status; struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection; + bt_self_component *self_comp = viewer_connection->self_comp; uint32_t streams_count; const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); char cmd_buf[cmd_buf_len]; if (!session->new_streams_needed) { - return LTTNG_LIVE_ITERATOR_STATUS_OK; + status = LTTNG_LIVE_ITERATOR_STATUS_OK; + goto end; } + BT_COMP_LOGD("Requesting new streams for session: cmd=%s" + "session-id=%" PRIu64, + lttng_viewer_command_string(LTTNG_VIEWER_GET_NEW_STREAMS), + session->id); + cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS); cmd.data_size = htobe64((uint64_t) sizeof(rq)); cmd.cmd_version = htobe32(0); @@ -1404,24 +1791,22 @@ enum lttng_live_iterator_status lttng_live_get_new_streams( */ memcpy(cmd_buf, &cmd, sizeof(cmd)); memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); - ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_LOGE("Error sending get_new_streams request: %s", - bt_socket_errormsg()); - goto error; - } - BT_ASSERT(ret_len == cmd_buf_len); - ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); - if (ret_len == 0) { - BT_COMP_LOGI("Remote side has closed connection"); - goto error; + viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); + if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_send_status(self_comp, NULL, + viewer_status, "get new streams command"); + status = viewer_status_to_live_iterator_status(viewer_status); + goto end; } - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_LOGE("Error receiving get_new_streams response"); - goto error; + + viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); + if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_recv_status(self_comp, NULL, + viewer_status, "get new streams reply"); + status = viewer_status_to_live_iterator_status(viewer_status); + goto end; } - BT_ASSERT(ret_len == sizeof(rp)); streams_count = be32toh(rp.streams_count); @@ -1438,89 +1823,139 @@ enum lttng_live_iterator_status lttng_live_get_new_streams( status = LTTNG_LIVE_ITERATOR_STATUS_END; goto end; case LTTNG_VIEWER_NEW_STREAMS_ERR: - BT_COMP_LOGE("get_new_streams error"); - goto error; + BT_COMP_LOGD("Received get_new_streams response: error"); + status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; + goto end; default: - BT_COMP_LOGE("Unknown return code %u", be32toh(rp.status)); - goto error; + BT_COMP_LOGE_APPEND_CAUSE(self_comp, + "Received get_new_streams response: Unknown:" + "return code %u", be32toh(rp.status)); + status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; + goto end; } - if (receive_streams(session, streams_count)) { - goto error; + viewer_status = receive_streams(session, streams_count, self_msg_iter); + if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_recv_status(self_comp, NULL, + viewer_status, "new streams"); + status = viewer_status_to_live_iterator_status(viewer_status); + goto end; } -end: - return status; -error: - if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) { - status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN; - } else { - status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; - } + status = LTTNG_LIVE_ITERATOR_STATUS_OK; +end: return status; } BT_HIDDEN -struct live_viewer_connection *live_viewer_connection_create( +enum lttng_live_viewer_status live_viewer_connection_create( + bt_self_component *self_comp, + bt_self_component_class *self_comp_class, + bt_logging_level log_level, const char *url, bool in_query, struct lttng_live_msg_iter *lttng_live_msg_iter, - bt_logging_level log_level) + struct live_viewer_connection **viewer) { struct live_viewer_connection *viewer_connection; + enum lttng_live_viewer_status status; viewer_connection = g_new0(struct live_viewer_connection, 1); if (bt_socket_init(log_level) != 0) { + BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, + self_comp_class, "Failed to init socket"); + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; goto error; } viewer_connection->log_level = log_level; - if (lttng_live_msg_iter) { - viewer_connection->self_comp = lttng_live_msg_iter->self_comp; - } + viewer_connection->self_comp = self_comp; + viewer_connection->self_comp_class = self_comp_class; - bt_object_init_shared(&viewer_connection->obj, connection_release); viewer_connection->control_sock = BT_INVALID_SOCKET; viewer_connection->port = -1; viewer_connection->in_query = in_query; viewer_connection->lttng_live_msg_iter = lttng_live_msg_iter; viewer_connection->url = g_string_new(url); if (!viewer_connection->url) { + BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, + self_comp_class, "Failed to allocate URL buffer"); + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; goto error; } - BT_COMP_LOGI("Establishing connection to url \"%s\"...", url); - if (lttng_live_connect_viewer(viewer_connection)) { - goto error_report; + BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class, + "Establishing connection to url \"%s\"...", url); + status = lttng_live_connect_viewer(viewer_connection); + /* + * Only print error and append cause in case of error. not in case of + * interruption. + */ + if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) { + BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, + self_comp_class, "Failed to establish connection: " + "url=\"%s\"", url); + goto error; + } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) { + goto error; } - BT_COMP_LOGI("Connection to url \"%s\" is established", url); - return viewer_connection; + BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class, + "Connection to url \"%s\" is established", url); + + *viewer = viewer_connection; + status = LTTNG_LIVE_VIEWER_STATUS_OK; + goto end; -error_report: - BT_COMP_LOGW("Failure to establish connection to url \"%s\"", url); error: - g_free(viewer_connection); - return NULL; + if (viewer_connection) { + live_viewer_connection_destroy(viewer_connection); + } +end: + return status; } BT_HIDDEN void live_viewer_connection_destroy( struct live_viewer_connection *viewer_connection) { - BT_COMP_LOGI("Closing connection to url \"%s\"", viewer_connection->url->str); + bt_self_component *self_comp = viewer_connection->self_comp; + bt_self_component_class *self_comp_class = viewer_connection->self_comp_class; + + if (!viewer_connection) { + goto end; + } + + BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class, + "Closing connection to relay:" + "relay-url=\"%s\"", viewer_connection->url->str); + lttng_live_disconnect_viewer(viewer_connection); - g_string_free(viewer_connection->url, true); + + if (viewer_connection->url) { + g_string_free(viewer_connection->url, true); + } + if (viewer_connection->relay_hostname) { g_string_free(viewer_connection->relay_hostname, true); } + if (viewer_connection->target_hostname) { g_string_free(viewer_connection->target_hostname, true); } + if (viewer_connection->session_name) { g_string_free(viewer_connection->session_name, true); } + + if (viewer_connection->proto) { + g_string_free(viewer_connection->proto, true); + } + g_free(viewer_connection); bt_socket_fini(); + +end: + return; }