X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fplugins%2Fctf%2Flttng-live%2Fviewer-connection.c;h=be9c79e0035347660c98be2ef013b23a29c52f85;hb=0235b0db7de5bcacdb3650c92461f2ce5eb2143d;hp=b1f37f04a0516bc04addd95af1f4c48ba2de7678;hpb=851de941fc0cdea645b905098fb8f2a61dfdd161;p=babeltrace.git diff --git a/src/plugins/ctf/lttng-live/viewer-connection.c b/src/plugins/ctf/lttng-live/viewer-connection.c index b1f37f04..be9c79e0 100644 --- a/src/plugins/ctf/lttng-live/viewer-connection.c +++ b/src/plugins/ctf/lttng-live/viewer-connection.c @@ -1,24 +1,8 @@ /* - * Copyright 2019 - Francis Deslauriers - * Copyright 2016 - Mathieu Desnoyers + * SPDX-License-Identifier: MIT * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. + * Copyright 2019 Francis Deslauriers + * Copyright 2016 Mathieu Desnoyers */ #define BT_COMP_LOG_SELF_COMP (viewer_connection->self_comp) @@ -48,64 +32,240 @@ #include "data-stream.h" #include "metadata.h" +#define viewer_handle_send_recv_status(_self_comp, _self_comp_class, \ + _status, _action, _msg_str) \ +do { \ + switch (_status) { \ + case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED: \ + break; \ + case LTTNG_LIVE_VIEWER_STATUS_ERROR: \ + BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(_self_comp, \ + _self_comp_class, "Error " _action " " _msg_str); \ + break; \ + default: \ + bt_common_abort(); \ + } \ +} while (0) + +#define viewer_handle_send_status(_self_comp, _self_comp_class, _status, _msg_str) \ + viewer_handle_send_recv_status(_self_comp, _self_comp_class, _status, \ + "sending", _msg_str) + +#define viewer_handle_recv_status(_self_comp, _self_comp_class, _status, _msg_str) \ + viewer_handle_send_recv_status(_self_comp, _self_comp_class, _status, \ + "receiving", _msg_str) + +#define LTTNG_LIVE_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE_ERRNO(_self_comp, \ + _self_comp_class, _msg, _fmt, ...) \ + do { \ + BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(_self_comp, _self_comp_class, \ + _msg ": %s" _fmt, bt_socket_errormsg(), ##__VA_ARGS__); \ + } while (0) + +static inline +enum lttng_live_iterator_status viewer_status_to_live_iterator_status( + enum lttng_live_viewer_status viewer_status) +{ + switch (viewer_status) { + case LTTNG_LIVE_VIEWER_STATUS_OK: + return LTTNG_LIVE_ITERATOR_STATUS_OK; + case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED: + return LTTNG_LIVE_ITERATOR_STATUS_AGAIN; + case LTTNG_LIVE_VIEWER_STATUS_ERROR: + return LTTNG_LIVE_ITERATOR_STATUS_ERROR; + default: + bt_common_abort(); + } +} + +static inline +enum ctf_msg_iter_medium_status viewer_status_to_ctf_msg_iter_medium_status( + enum lttng_live_viewer_status viewer_status) +{ + switch (viewer_status) { + case LTTNG_LIVE_VIEWER_STATUS_OK: + return CTF_MSG_ITER_MEDIUM_STATUS_OK; + case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED: + return CTF_MSG_ITER_MEDIUM_STATUS_AGAIN; + case LTTNG_LIVE_VIEWER_STATUS_ERROR: + return CTF_MSG_ITER_MEDIUM_STATUS_ERROR; + default: + bt_common_abort(); + } +} + +static inline +void viewer_connection_close_socket( + struct live_viewer_connection *viewer_connection) +{ + bt_self_component_class *self_comp_class = + viewer_connection->self_comp_class; + bt_self_component *self_comp = + viewer_connection->self_comp; + int ret = bt_socket_close(viewer_connection->control_sock); + if (ret == -1) { + BT_COMP_OR_COMP_CLASS_LOGW_ERRNO( + self_comp, self_comp_class, + "Error closing viewer connection socket: ", "."); + } + + viewer_connection->control_sock = BT_INVALID_SOCKET; +} + +/* + * This function receives a message from the Relay daemon. + * If it received the entire message, it returns _OK, + * If it's interrupted, it returns _INTERRUPTED, + * otherwise, it returns _ERROR. + */ static -ssize_t lttng_live_recv(struct live_viewer_connection *viewer_connection, +enum lttng_live_viewer_status lttng_live_recv( + struct live_viewer_connection *viewer_connection, void *buf, size_t len) { - ssize_t ret; - size_t copied = 0, to_copy = len; + ssize_t received; + bt_self_component_class *self_comp_class = + viewer_connection->self_comp_class; + bt_self_component *self_comp = + viewer_connection->self_comp; + size_t total_received = 0, to_receive = len; struct lttng_live_msg_iter *lttng_live_msg_iter = viewer_connection->lttng_live_msg_iter; + enum lttng_live_viewer_status status; BT_SOCKET sock = viewer_connection->control_sock; + /* + * Receive a message from the Relay. + */ do { - ret = bt_socket_recv(sock, buf + copied, to_copy, 0); - if (ret > 0) { - BT_ASSERT(ret <= to_copy); - copied += ret; - to_copy -= ret; - } - if (ret == BT_SOCKET_ERROR && bt_socket_interrupted()) { - if (!viewer_connection->in_query && - lttng_live_graph_is_canceled(lttng_live_msg_iter)) { - break; + received = bt_socket_recv(sock, buf + total_received, to_receive, 0); + if (received == BT_SOCKET_ERROR) { + if (bt_socket_interrupted()) { + if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) { + /* + * This interruption was due to a + * SIGINT and the graph is being torn + * down. + */ + status = LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED; + lttng_live_msg_iter->was_interrupted = true; + goto end; + } else { + /* + * A signal was received, but the graph + * is not being torn down. Carry on. + */ + continue; + } } else { - continue; + /* + * For any other types of socket error, close + * the socket and return an error. + */ + LTTNG_LIVE_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE_ERRNO( + self_comp, self_comp_class, + "Error receiving from Relay", "."); + + viewer_connection_close_socket(viewer_connection); + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; + goto end; } + } else if (received == 0) { + /* + * The recv() call returned 0. This means the + * connection was orderly shutdown from the other peer. + * If that happens when we are trying to receive + * a message from it, it means something when wrong. + * Close the socket and return an error. + */ + BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, + self_comp_class, "Remote side has closed connection"); + viewer_connection_close_socket(viewer_connection); + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; + goto end; } - } while (ret > 0 && to_copy > 0); - if (ret > 0) { - ret = copied; - } + BT_ASSERT(received <= to_receive); + total_received += received; + to_receive -= received; - /* ret = 0 means orderly shutdown, ret == BT_SOCKET_ERROR is error. */ - return ret; + } while (to_receive > 0); + + BT_ASSERT(total_received == len); + status = LTTNG_LIVE_VIEWER_STATUS_OK; + +end: + return status; } +/* + * This function sends a message to the Relay daemon. + * If it send the message, it returns _OK, + * If it's interrupted, it returns _INTERRUPTED, + * otherwise, it returns _ERROR. + */ static -ssize_t lttng_live_send(struct live_viewer_connection *viewer_connection, +enum lttng_live_viewer_status lttng_live_send( + struct live_viewer_connection *viewer_connection, const void *buf, size_t len) { + enum lttng_live_viewer_status status; + bt_self_component_class *self_comp_class = + viewer_connection->self_comp_class; + bt_self_component *self_comp = + viewer_connection->self_comp; struct lttng_live_msg_iter *lttng_live_msg_iter = viewer_connection->lttng_live_msg_iter; BT_SOCKET sock = viewer_connection->control_sock; - ssize_t ret; - - for (;;) { - ret = bt_socket_send_nosigpipe(sock, buf, len); - if (ret == BT_SOCKET_ERROR && bt_socket_interrupted()) { - if (!viewer_connection->in_query && - lttng_live_graph_is_canceled(lttng_live_msg_iter)) { - break; + size_t to_send = len; + ssize_t total_sent = 0; + + do { + ssize_t sent = bt_socket_send_nosigpipe(sock, buf + total_sent, + to_send); + if (sent == BT_SOCKET_ERROR) { + if (bt_socket_interrupted()) { + if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) { + /* + * This interruption was a SIGINT and + * the graph is being teared down. + */ + status = LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED; + lttng_live_msg_iter->was_interrupted = true; + goto end; + } else { + /* + * A signal was received, but the graph + * is not being teared down. Carry on. + */ + continue; + } } else { - continue; + /* + * For any other types of socket error, close + * the socket and return an error. + */ + LTTNG_LIVE_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE_ERRNO( + self_comp, self_comp_class, + "Error sending to Relay", "."); + + viewer_connection_close_socket(viewer_connection); + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; + goto end; } - } else { - break; } - } - return ret; + + BT_ASSERT(sent <= to_send); + total_sent += sent; + to_send -= sent; + + } while (to_send > 0); + + BT_ASSERT(total_sent == len); + status = LTTNG_LIVE_VIEWER_STATUS_OK; + +end: + return status; } static @@ -130,6 +290,8 @@ int parse_url(struct live_viewer_connection *viewer_connection) error_buf); goto end; } + viewer_connection->proto = lttng_live_url_parts.proto; + lttng_live_url_parts.proto = NULL; viewer_connection->relay_hostname = lttng_live_url_parts.hostname; lttng_live_url_parts.hostname = NULL; @@ -148,15 +310,6 @@ int parse_url(struct live_viewer_connection *viewer_connection) lttng_live_url_parts.session_name = NULL; } - BT_COMP_LOGI("Connecting to hostname : %s, port : %d, " - "target hostname : %s, session name : %s, proto : %s", - viewer_connection->relay_hostname->str, - viewer_connection->port, - !viewer_connection->target_hostname ? - "" : viewer_connection->target_hostname->str, - !viewer_connection->session_name ? - "" : viewer_connection->session_name->str, - lttng_live_url_parts.proto->str); ret = 0; end: @@ -165,16 +318,21 @@ end: } static -int lttng_live_handshake(struct live_viewer_connection *viewer_connection) +enum lttng_live_viewer_status lttng_live_handshake( + struct live_viewer_connection *viewer_connection) { struct lttng_viewer_cmd cmd; struct lttng_viewer_connect connect; + enum lttng_live_viewer_status status; bt_self_component_class *self_comp_class = viewer_connection->self_comp_class; bt_self_component *self_comp = viewer_connection->self_comp; const size_t cmd_buf_len = sizeof(cmd) + sizeof(connect); char cmd_buf[cmd_buf_len]; - int ret; - ssize_t ret_len; + + BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class, + "Handshaking with the Relay: " + "major-version=%u, minor-version=%u", + LTTNG_LIVE_MAJOR, LTTNG_LIVE_MINOR); cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT); cmd.data_size = htobe64((uint64_t) sizeof(connect)); @@ -192,39 +350,33 @@ int lttng_live_handshake(struct live_viewer_connection *viewer_connection) */ memcpy(cmd_buf, &cmd, sizeof(cmd)); memcpy(cmd_buf + sizeof(cmd), &connect, sizeof(connect)); - ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, - self_comp_class, "Error sending version: %s", - bt_socket_errormsg()); - goto error; - } - - BT_ASSERT(ret_len == cmd_buf_len); - ret_len = lttng_live_recv(viewer_connection, &connect, sizeof(connect)); - if (ret_len == 0) { - BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, - self_comp_class, "Remote side has closed connection"); - goto error; + status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); + if (status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_send_status(self_comp, self_comp_class, + status, "viewer connect command"); + goto end; } - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, - self_comp_class, "Error receiving version: %s", - bt_socket_errormsg()); - goto error; + + status = lttng_live_recv(viewer_connection, &connect, sizeof(connect)); + if (status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_recv_status(self_comp, self_comp_class, + status, "viewer connect reply"); + goto end; } - BT_ASSERT(ret_len == sizeof(connect)); - BT_COMP_LOGI("Received viewer session ID : %" PRIu64, - (uint64_t) be64toh(connect.viewer_session_id)); - BT_COMP_LOGI("Relayd version : %u.%u", be32toh(connect.major), - be32toh(connect.minor)); + BT_COMP_OR_COMP_CLASS_LOGI(self_comp, self_comp_class, + "Received viewer session ID : %" PRIu64, + (uint64_t) be64toh(connect.viewer_session_id)); + BT_COMP_OR_COMP_CLASS_LOGI(self_comp, self_comp_class, + "Relayd version : %u.%u", be32toh(connect.major), + be32toh(connect.minor)); if (LTTNG_LIVE_MAJOR != be32toh(connect.major)) { BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class, "Incompatible lttng-relayd protocol"); - goto error; + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; + goto end; } /* Use the smallest protocol version implemented. */ if (LTTNG_LIVE_MINOR > be32toh(connect.minor)) { @@ -233,39 +385,56 @@ int lttng_live_handshake(struct live_viewer_connection *viewer_connection) viewer_connection->minor = LTTNG_LIVE_MINOR; } viewer_connection->major = LTTNG_LIVE_MAJOR; - ret = 0; - return ret; -error: - return -1; + status = LTTNG_LIVE_VIEWER_STATUS_OK; + + goto end; + +end: + return status; } static -int lttng_live_connect_viewer(struct live_viewer_connection *viewer_connection) +enum lttng_live_viewer_status lttng_live_connect_viewer( + struct live_viewer_connection *viewer_connection) { struct hostent *host; struct sockaddr_in server_addr; + enum lttng_live_viewer_status status; bt_self_component_class *self_comp_class = viewer_connection->self_comp_class; bt_self_component *self_comp = viewer_connection->self_comp; - int ret; if (parse_url(viewer_connection)) { BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class, "Failed to parse URL"); + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; goto error; } + BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class, + "Connecting to hostname : %s, port : %d, " + "target hostname : %s, session name : %s, proto : %s", + viewer_connection->relay_hostname->str, + viewer_connection->port, + !viewer_connection->target_hostname ? + "" : viewer_connection->target_hostname->str, + !viewer_connection->session_name ? + "" : viewer_connection->session_name->str, + viewer_connection->proto->str); + host = gethostbyname(viewer_connection->relay_hostname->str); if (!host) { BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class, "Cannot lookup hostname: hostname=\"%s\"", viewer_connection->relay_hostname->str); + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; goto error; } if ((viewer_connection->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == BT_INVALID_SOCKET) { BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class, "Socket creation failed: %s", bt_socket_errormsg()); + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; goto error; } @@ -279,39 +448,51 @@ int lttng_live_connect_viewer(struct live_viewer_connection *viewer_connection) BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class, "Connection failed: %s", bt_socket_errormsg()); + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; goto error; } - if (lttng_live_handshake(viewer_connection)) { + + status = lttng_live_handshake(viewer_connection); + + /* + * Only print error and append cause in case of error. not in case of + * interruption. + */ + if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) { BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class, "Viewer handshake failed"); goto error; + } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) { + goto end; } - ret = 0; - - return ret; + goto end; error: if (viewer_connection->control_sock != BT_INVALID_SOCKET) { if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) { - BT_COMP_OR_COMP_CLASS_LOGE(self_comp, self_comp_class, - "Error closing socket: %s", bt_socket_errormsg()); + BT_COMP_OR_COMP_CLASS_LOGW(self_comp, self_comp_class, + "Error closing socket: %s.", bt_socket_errormsg()); } } viewer_connection->control_sock = BT_INVALID_SOCKET; - return -1; +end: + return status; } static void lttng_live_disconnect_viewer( struct live_viewer_connection *viewer_connection) { + bt_self_component_class *self_comp_class = viewer_connection->self_comp_class; + bt_self_component *self_comp = viewer_connection->self_comp; + if (viewer_connection->control_sock == BT_INVALID_SOCKET) { return; } if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) { - BT_COMP_LOGE("Error closing socket: %s", - bt_socket_errormsg()); + BT_COMP_OR_COMP_CLASS_LOGW(self_comp, self_comp_class, + "Error closing socket: %s", bt_socket_errormsg()); viewer_connection->control_sock = BT_INVALID_SOCKET; } } @@ -592,10 +773,10 @@ bt_component_class_query_method_status live_viewer_connection_list_sessions( bt_component_class_query_method_status status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK; bt_value *result = NULL; + enum lttng_live_viewer_status viewer_status; struct lttng_viewer_cmd cmd; struct lttng_viewer_list_sessions list; uint32_t i, sessions_count; - ssize_t ret_len; result = bt_value_array_create(); if (!result) { @@ -609,51 +790,44 @@ bt_component_class_query_method_status live_viewer_connection_list_sessions( cmd.data_size = htobe64((uint64_t) 0); cmd.cmd_version = htobe32(0); - ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); - if (ret_len == BT_SOCKET_ERROR) { + viewer_status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); + if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) { BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, - "Error sending cmd: %s", bt_socket_errormsg()); + "Error sending list sessions command"); status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR; goto error; + } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) { + status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN; + goto error; } - BT_ASSERT(ret_len == sizeof(cmd)); - ret_len = lttng_live_recv(viewer_connection, &list, sizeof(list)); - if (ret_len == 0) { + viewer_status = lttng_live_recv(viewer_connection, &list, sizeof(list)); + if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) { BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, - "Remote side has closed connection"); + "Error receiving session list"); status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR; goto error; - } - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, - "Error receiving session list: %s", - bt_socket_errormsg()); - status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR; + } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) { + status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN; goto error; } - BT_ASSERT(ret_len == sizeof(list)); sessions_count = be32toh(list.sessions_count); for (i = 0; i < sessions_count; i++) { struct lttng_viewer_session lsession; - ret_len = lttng_live_recv(viewer_connection, &lsession, + viewer_status = lttng_live_recv(viewer_connection, &lsession, sizeof(lsession)); - if (ret_len == 0) { + if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) { BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, - "Remote side has closed connection"); + "Error receiving session:"); status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR; goto error; - } - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class, - "Error receiving session: %s", - bt_socket_errormsg()); - status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR; + } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) { + status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN; goto error; } - BT_ASSERT(ret_len == sizeof(lsession)); + lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0'; lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0'; if (list_append_session(result, viewer_connection->url, @@ -674,65 +848,56 @@ end: } static -int lttng_live_query_session_ids(struct lttng_live_msg_iter *lttng_live_msg_iter) +enum lttng_live_viewer_status lttng_live_query_session_ids( + struct lttng_live_msg_iter *lttng_live_msg_iter) { struct lttng_viewer_cmd cmd; struct lttng_viewer_list_sessions list; struct lttng_viewer_session lsession; uint32_t i, sessions_count; - ssize_t ret_len; uint64_t session_id; + enum lttng_live_viewer_status status; struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection; bt_self_component *self_comp = viewer_connection->self_comp; + bt_self_component_class *self_comp_class = + viewer_connection->self_comp_class; + + BT_COMP_LOGD("Asking the Relay for the list of sessions"); cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS); cmd.data_size = htobe64((uint64_t) 0); cmd.cmd_version = htobe32(0); - ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error sending cmd: %s", - bt_socket_errormsg()); - goto error; + status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); + if (status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_send_status(self_comp, self_comp_class, + status, "list sessions command"); + goto end; } - BT_ASSERT(ret_len == sizeof(cmd)); - ret_len = lttng_live_recv(viewer_connection, &list, sizeof(list)); - if (ret_len == 0) { - BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "Remote side has closed connection"); - goto error; - } - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "Error receiving session list: %s", - bt_socket_errormsg()); - goto error; + status = lttng_live_recv(viewer_connection, &list, sizeof(list)); + if (status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_recv_status(self_comp, self_comp_class, + status, "session list reply"); + goto end; } - BT_ASSERT(ret_len == sizeof(list)); sessions_count = be32toh(list.sessions_count); for (i = 0; i < sessions_count; i++) { - ret_len = lttng_live_recv(viewer_connection, - &lsession, sizeof(lsession)); - if (ret_len == 0) { - BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "Remote side has closed connection"); - goto error; - } - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "Error receiving session: %s", - bt_socket_errormsg()); - goto error; + status = lttng_live_recv(viewer_connection, &lsession, + sizeof(lsession)); + if (status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_recv_status(self_comp, self_comp_class, + status, "session reply"); + goto end; } - BT_ASSERT(ret_len == sizeof(lsession)); lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0'; lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0'; session_id = be64toh(lsession.id); - BT_COMP_LOGI("Adding session %" PRIu64 " hostname: %s session_name: %s", + BT_COMP_LOGI("Adding session to internal list: " + "session-id=%" PRIu64 ", hostname=\"%s\", session-name=\"%s\"", session_id, lsession.hostname, lsession.session_name); if ((strncmp(lsession.session_name, @@ -740,85 +905,87 @@ int lttng_live_query_session_ids(struct lttng_live_msg_iter *lttng_live_msg_iter LTTNG_VIEWER_NAME_MAX) == 0) && (strncmp(lsession.hostname, viewer_connection->target_hostname->str, LTTNG_VIEWER_HOST_NAME_MAX) == 0)) { + if (lttng_live_add_session(lttng_live_msg_iter, session_id, lsession.hostname, lsession.session_name)) { BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to add live session"); - goto error; + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; + goto end; } } } - return 0; + status = LTTNG_LIVE_VIEWER_STATUS_OK; -error: - return -1; +end: + return status; } BT_HIDDEN -int lttng_live_create_viewer_session( +enum lttng_live_viewer_status lttng_live_create_viewer_session( struct lttng_live_msg_iter *lttng_live_msg_iter) { struct lttng_viewer_cmd cmd; struct lttng_viewer_create_session_response resp; - ssize_t ret_len; + enum lttng_live_viewer_status status; struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection; bt_self_component *self_comp = viewer_connection->self_comp; + bt_self_component_class *self_comp_class = + viewer_connection->self_comp_class; + + BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class, + "Creating a viewer session"); cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION); cmd.data_size = htobe64((uint64_t) 0); cmd.cmd_version = htobe32(0); - ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error sending cmd: %s", - bt_socket_errormsg()); - goto error; + status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); + if (status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_send_status(self_comp, self_comp_class, + status, "create session command"); + goto end; } - BT_ASSERT(ret_len == sizeof(cmd)); - ret_len = lttng_live_recv(viewer_connection, &resp, sizeof(resp)); - if (ret_len == 0) { - BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "Remote side has closed connection"); - goto error; - } - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "Error receiving create session reply: %s", - bt_socket_errormsg()); - goto error; + status = lttng_live_recv(viewer_connection, &resp, sizeof(resp)); + if (status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_recv_status(self_comp, self_comp_class, + status, "create session reply"); + goto end; } - BT_ASSERT(ret_len == sizeof(resp)); if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) { BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error creating viewer session"); - goto error; + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; + goto end; } - if (lttng_live_query_session_ids(lttng_live_msg_iter)) { + + status = lttng_live_query_session_ids(lttng_live_msg_iter); + if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) { BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Failed to query live viewer session ids"); - goto error; + goto end; + } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) { + goto end; } - return 0; - -error: - return -1; +end: + return status; } static -int receive_streams(struct lttng_live_session *session, +enum lttng_live_viewer_status receive_streams(struct lttng_live_session *session, uint32_t stream_count, bt_self_message_iterator *self_msg_iter) { - ssize_t ret_len; uint32_t i; struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter; + enum lttng_live_viewer_status status; struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection; bt_self_component *self_comp = viewer_connection->self_comp; @@ -830,18 +997,13 @@ int receive_streams(struct lttng_live_session *session, uint64_t stream_id; uint64_t ctf_trace_id; - ret_len = lttng_live_recv(viewer_connection, &stream, sizeof(stream)); - if (ret_len == 0) { - BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "Remote side has closed connection"); - goto error; - } - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "Error receiving stream"); - goto error; + status = lttng_live_recv(viewer_connection, &stream, + sizeof(stream)); + if (status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_recv_status(self_comp, NULL, + status, "stream reply"); + goto end; } - BT_ASSERT(ret_len == sizeof(stream)); stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0'; stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0'; stream_id = be64toh(stream.id); @@ -855,8 +1017,8 @@ int receive_streams(struct lttng_live_session *session, stream.path_name)) { BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error creating metadata stream"); - - goto error; + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; + goto end; } session->lazy_stream_msg_init = true; } else { @@ -867,23 +1029,24 @@ int receive_streams(struct lttng_live_session *session, if (!live_stream) { BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error creating stream"); - goto error; + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; + goto end; } } } - return 0; + status = LTTNG_LIVE_VIEWER_STATUS_OK; -error: - return -1; +end: + return status; } BT_HIDDEN -enum lttng_live_attach_session_status lttng_live_attach_session( +enum lttng_live_viewer_status lttng_live_session_attach( struct lttng_live_session *session, bt_self_message_iterator *self_msg_iter) { struct lttng_viewer_cmd cmd; - enum lttng_live_attach_session_status attach_status; + enum lttng_live_viewer_status status; struct lttng_viewer_attach_session_request rq; struct lttng_viewer_attach_session_response rp; struct lttng_live_msg_iter *lttng_live_msg_iter = @@ -895,7 +1058,8 @@ enum lttng_live_attach_session_status lttng_live_attach_session( uint32_t streams_count; const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); char cmd_buf[cmd_buf_len]; - ssize_t ret_len; + + BT_COMP_LOGD("Attaching to session: session-id=%"PRIu64, session_id); cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION); cmd.data_size = htobe64((uint64_t) sizeof(rq)); @@ -914,28 +1078,19 @@ enum lttng_live_attach_session_status lttng_live_attach_session( */ memcpy(cmd_buf, &cmd, sizeof(cmd)); memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); - ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "Error sending attach request: %s", - bt_socket_errormsg()); - goto error; + status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); + if (status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_send_status(self_comp, NULL, + status, "attach session command"); + goto end; } - BT_ASSERT(ret_len == cmd_buf_len); - ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); - if (ret_len == 0) { - BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "Remote side has closed connection"); - goto error; - } - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "Error receiving attach response: %s", - bt_socket_errormsg()); - goto error; + status = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); + if (status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_recv_status(self_comp, NULL, + status, "attach session reply"); + goto end; } - BT_ASSERT(ret_len == sizeof(rp)); streams_count = be32toh(rp.streams_count); switch(be32toh(rp.status)) { @@ -944,58 +1099,72 @@ enum lttng_live_attach_session_status lttng_live_attach_session( case LTTNG_VIEWER_ATTACH_UNK: BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Session id %" PRIu64 " is unknown", session_id); - goto error; + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; + goto end; case LTTNG_VIEWER_ATTACH_ALREADY: BT_COMP_LOGE_APPEND_CAUSE(self_comp, "There is already a viewer attached to this session"); - goto error; + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; + goto end; case LTTNG_VIEWER_ATTACH_NOT_LIVE: BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Not a live session"); - goto error; + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; + goto end; case LTTNG_VIEWER_ATTACH_SEEK_ERR: BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Wrong seek parameter"); - goto error; + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; + goto end; default: BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Unknown attach return code %u", be32toh(rp.status)); - goto error; + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; + goto end; } /* We receive the initial list of streams. */ - if (receive_streams(session, streams_count, self_msg_iter)) { + status = receive_streams(session, streams_count, self_msg_iter); + switch (status) { + case LTTNG_LIVE_VIEWER_STATUS_OK: + break; + case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED: + goto end; + case LTTNG_LIVE_VIEWER_STATUS_ERROR: BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error receiving streams"); - goto error; + goto end; + default: + bt_common_abort(); } session->attached = true; session->new_streams_needed = false; - attach_status = LTTNG_LIVE_ATTACH_SESSION_STATUS_OK; - goto end; - -error: - attach_status = LTTNG_LIVE_ATTACH_SESSION_STATUS_ERROR; - end: - return attach_status; + return status; } BT_HIDDEN -int lttng_live_detach_session(struct lttng_live_session *session) +enum lttng_live_viewer_status lttng_live_session_detach( + struct lttng_live_session *session) { struct lttng_viewer_cmd cmd; + enum lttng_live_viewer_status status; struct lttng_viewer_detach_session_request rq; struct lttng_viewer_detach_session_response rp; - ssize_t ret_len; struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter; + bt_self_component *self_comp = session->self_comp; struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection; uint64_t session_id = session->id; const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); char cmd_buf[cmd_buf_len]; - if (!session->attached) { + /* + * The session might already be detached and the viewer socket might + * already been closed. This happens when calling this function when + * tearing down the graph after an error. + */ + if (!session->attached || viewer_connection->control_sock == BT_INVALID_SOCKET) { return 0; } @@ -1013,46 +1182,43 @@ int lttng_live_detach_session(struct lttng_live_session *session) */ memcpy(cmd_buf, &cmd, sizeof(cmd)); memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); - ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_LOGE("Error sending detach request: %s", - bt_socket_errormsg()); - goto error; + status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); + if (status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_send_status(self_comp, NULL, + status, "detach session command"); + goto end; } - BT_ASSERT(ret_len == cmd_buf_len); - ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); - if (ret_len == 0) { - BT_COMP_LOGE("Remote side has closed connection"); - goto error; - } - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_LOGE("Error receiving detach response: %s", - bt_socket_errormsg()); - goto error; + status = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); + if (status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_recv_status(self_comp, NULL, + status, "detach session reply"); + goto end; } - BT_ASSERT(ret_len == sizeof(rp)); switch(be32toh(rp.status)) { case LTTNG_VIEWER_DETACH_SESSION_OK: break; case LTTNG_VIEWER_DETACH_SESSION_UNK: BT_COMP_LOGW("Session id %" PRIu64 " is unknown", session_id); - goto error; + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; + goto end; case LTTNG_VIEWER_DETACH_SESSION_ERR: BT_COMP_LOGW("Error detaching session id %" PRIu64 "", session_id); - goto error; + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; + goto end; default: BT_COMP_LOGE("Unknown detach return code %u", be32toh(rp.status)); - goto error; + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; + goto end; } session->attached = false; - return 0; + status = LTTNG_LIVE_VIEWER_STATUS_OK; -error: - return -1; +end: + return status; } BT_HIDDEN @@ -1060,12 +1226,13 @@ enum lttng_live_get_one_metadata_status lttng_live_get_one_metadata_packet( struct lttng_live_trace *trace, FILE *fp, size_t *reply_len) { uint64_t len = 0; - enum lttng_live_get_one_metadata_status metadata_status; + enum lttng_live_get_one_metadata_status status; + enum lttng_live_viewer_status viewer_status; struct lttng_viewer_cmd cmd; struct lttng_viewer_get_metadata rq; struct lttng_viewer_metadata_packet rp; - char *data = NULL; - ssize_t ret_len; + gchar *data = NULL; + ssize_t writelen; struct lttng_live_session *session = trace->session; struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter; @@ -1076,6 +1243,10 @@ enum lttng_live_get_one_metadata_status lttng_live_get_one_metadata_packet( const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); char cmd_buf[cmd_buf_len]; + BT_COMP_LOGD("Requesting new metadata for trace: " + "trace-id=%"PRIu64", metadata-stream-id=%"PRIu64, + trace->id, metadata->stream_id); + rq.stream_id = htobe64(metadata->stream_id); cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA); cmd.data_size = htobe64((uint64_t) sizeof(rq)); @@ -1088,28 +1259,21 @@ enum lttng_live_get_one_metadata_status lttng_live_get_one_metadata_packet( */ memcpy(cmd_buf, &cmd, sizeof(cmd)); memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); - ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "Error sending get_metadata request: %s", - bt_socket_errormsg()); - goto error; + viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); + if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_send_status(self_comp, NULL, + viewer_status, "get metadata command"); + status = (enum lttng_live_get_one_metadata_status) viewer_status; + goto end; } - BT_ASSERT(ret_len == cmd_buf_len); - ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); - if (ret_len == 0) { - BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "Remote side has closed connection"); - goto error; - } - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "Error receiving get_metadata response: %s", - bt_socket_errormsg()); - goto error; + viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); + if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_recv_status(self_comp, NULL, + viewer_status, "get metadata reply"); + status = (enum lttng_live_get_one_metadata_status) viewer_status; + goto end; } - BT_ASSERT(ret_len == sizeof(rp)); switch (be32toh(rp.status)) { case LTTNG_VIEWER_METADATA_OK: @@ -1117,7 +1281,7 @@ enum lttng_live_get_one_metadata_status lttng_live_get_one_metadata_packet( break; case LTTNG_VIEWER_NO_NEW_METADATA: BT_COMP_LOGD("Received get_metadata response: no new"); - metadata_status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_END; + status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_END; goto end; case LTTNG_VIEWER_METADATA_ERR: /* @@ -1126,12 +1290,13 @@ enum lttng_live_get_one_metadata_status lttng_live_get_one_metadata_packet( * in a per-pid session. */ BT_COMP_LOGD("Received get_metadata response: error"); - metadata_status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_CLOSED; + status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_CLOSED; goto end; default: BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Received get_metadata response: unknown"); - goto error; + status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR; + goto end; } len = be64toh(rp.len); @@ -1139,49 +1304,43 @@ enum lttng_live_get_one_metadata_status lttng_live_get_one_metadata_packet( if (len <= 0) { BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Erroneous response length"); - goto error; + status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR; + goto end; } - data = calloc(1, len); + data = g_new0(gchar, len); if (!data) { BT_COMP_LOGE_APPEND_CAUSE_ERRNO(self_comp, "Failed to allocate data buffer", "."); - goto error; - } - ret_len = lttng_live_recv(viewer_connection, data, len); - if (ret_len == 0) { - BT_COMP_LOGI("Remote side has closed connection"); - goto error_free_data; + status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR; + goto end; } - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "Error receiving trace packet: %s", bt_socket_errormsg()); - goto error_free_data; + + viewer_status = lttng_live_recv(viewer_connection, data, len); + if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_recv_status(self_comp, NULL, + viewer_status, "get metadata packet"); + status = (enum lttng_live_get_one_metadata_status) viewer_status; + goto end; } - BT_ASSERT(ret_len == len); - do { - ret_len = fwrite(data, 1, len, fp); - } while (ret_len < 0 && errno == EINTR); - if (ret_len < 0) { + /* + * Write the metadata to the file handle. + */ + writelen = fwrite(data, sizeof(uint8_t), len, fp); + if (writelen != len) { BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Writing in the metadata file stream"); - goto error_free_data; + status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR; + goto end; } - BT_ASSERT(ret_len == len); - free(data); - *reply_len = len; - metadata_status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_OK; - - goto end; -error_free_data: - free(data); -error: - metadata_status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR; + *reply_len = len; + status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_OK; end: - return metadata_status; + g_free(data); + return status; } /* @@ -1202,6 +1361,19 @@ void lttng_index_to_packet_index(struct lttng_viewer_index *lindex, pindex->events_discarded = be64toh(lindex->events_discarded); } +static +void lttng_live_need_new_streams(struct lttng_live_msg_iter *lttng_live_msg_iter) +{ + uint64_t session_idx; + + for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; + session_idx++) { + struct lttng_live_session *session = + g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx); + session->new_streams_needed = true; + } +} + BT_HIDDEN enum lttng_live_iterator_status lttng_live_get_next_index( struct lttng_live_msg_iter *lttng_live_msg_iter, @@ -1210,16 +1382,19 @@ enum lttng_live_iterator_status lttng_live_get_next_index( { struct lttng_viewer_cmd cmd; struct lttng_viewer_get_next_index rq; + enum lttng_live_viewer_status viewer_status; struct lttng_viewer_index rp; - enum lttng_live_iterator_status retstatus = LTTNG_LIVE_ITERATOR_STATUS_OK; + enum lttng_live_iterator_status status; struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection; bt_self_component *self_comp = viewer_connection->self_comp; struct lttng_live_trace *trace = stream->trace; const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); char cmd_buf[cmd_buf_len]; - uint32_t flags, status; - ssize_t ret_len; + uint32_t flags, rp_status; + + BT_COMP_LOGD("Requesting next index for stream: " + "stream-id=%"PRIu64, stream->viewer_stream_id); cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX); cmd.data_size = htobe64((uint64_t) sizeof(rq)); @@ -1236,33 +1411,24 @@ enum lttng_live_iterator_status lttng_live_get_next_index( */ memcpy(cmd_buf, &cmd, sizeof(cmd)); memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); - ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "Error sending get_next_index request: %s", - bt_socket_errormsg()); + viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); + if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_send_status(self_comp, NULL, + viewer_status, "get next index command"); goto error; } - BT_ASSERT(ret_len == cmd_buf_len); - ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); - if (ret_len == 0) { - BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "Remote side has closed connection"); + viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); + if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_recv_status(self_comp, NULL, + viewer_status, "get next index reply"); goto error; } - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "Error receiving get_next_index response: %s", - bt_socket_errormsg()); - goto error; - } - BT_ASSERT(ret_len == sizeof(rp)); flags = be32toh(rp.flags); - status = be32toh(rp.status); + rp_status = be32toh(rp.status); - switch (status) { + switch (rp_status) { case LTTNG_VIEWER_INDEX_INACTIVE: { uint64_t ctf_stream_class_id; @@ -1279,6 +1445,7 @@ enum lttng_live_iterator_status lttng_live_get_next_index( stream->ctf_stream_class_id = ctf_stream_class_id; } stream->state = LTTNG_LIVE_STREAM_QUIESCENT; + status = LTTNG_LIVE_ITERATOR_STATUS_OK; break; } case LTTNG_VIEWER_INDEX_OK: @@ -1299,49 +1466,48 @@ enum lttng_live_iterator_status lttng_live_get_next_index( if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) { BT_COMP_LOGD("Received get_next_index response: new metadata needed"); - trace->new_metadata_needed = true; + trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED; } if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) { BT_COMP_LOGD("Received get_next_index response: new streams needed"); lttng_live_need_new_streams(lttng_live_msg_iter); } + status = LTTNG_LIVE_ITERATOR_STATUS_OK; break; } case LTTNG_VIEWER_INDEX_RETRY: BT_COMP_LOGD("Received get_next_index response: retry"); memset(index, 0, sizeof(struct packet_index)); - retstatus = LTTNG_LIVE_ITERATOR_STATUS_AGAIN; stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA; + status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN; goto end; case LTTNG_VIEWER_INDEX_HUP: BT_COMP_LOGD("Received get_next_index response: stream hung up"); memset(index, 0, sizeof(struct packet_index)); index->offset = EOF; - retstatus = LTTNG_LIVE_ITERATOR_STATUS_END; stream->state = LTTNG_LIVE_STREAM_EOF; stream->has_stream_hung_up = true; + status = LTTNG_LIVE_ITERATOR_STATUS_END; break; case LTTNG_VIEWER_INDEX_ERR: BT_COMP_LOGD("Received get_next_index response: error"); memset(index, 0, sizeof(struct packet_index)); stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA; - goto error; + status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; + goto end; default: BT_COMP_LOGD("Received get_next_index response: unknown value"); memset(index, 0, sizeof(struct packet_index)); stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA; - goto error; + status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; + goto end; } -end: - return retstatus; + goto end; error: - if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) { - retstatus = LTTNG_LIVE_ITERATOR_STATUS_AGAIN; - } else { - retstatus = LTTNG_LIVE_ITERATOR_STATUS_ERROR; - } - return retstatus; + status = viewer_status_to_live_iterator_status(viewer_status); +end: + return status; } BT_HIDDEN @@ -1350,7 +1516,8 @@ enum ctf_msg_iter_medium_status lttng_live_get_stream_bytes( struct lttng_live_stream_iterator *stream, uint8_t *buf, uint64_t offset, uint64_t req_len, uint64_t *recv_len) { - enum ctf_msg_iter_medium_status retstatus = CTF_MSG_ITER_MEDIUM_STATUS_OK; + enum ctf_msg_iter_medium_status status; + enum lttng_live_viewer_status viewer_status; struct lttng_viewer_trace_packet rp; struct lttng_viewer_cmd cmd; struct lttng_viewer_get_packet rq; @@ -1360,8 +1527,7 @@ enum ctf_msg_iter_medium_status lttng_live_get_stream_bytes( struct lttng_live_trace *trace = stream->trace; const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); char cmd_buf[cmd_buf_len]; - uint32_t flags, status; - ssize_t ret_len; + uint32_t flags, rp_status; BT_COMP_LOGD("lttng_live_get_stream_bytes: offset=%" PRIu64 ", req_len=%" PRIu64, offset, req_len); @@ -1381,51 +1547,39 @@ enum ctf_msg_iter_medium_status lttng_live_get_stream_bytes( */ memcpy(cmd_buf, &cmd, sizeof(cmd)); memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); - ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "Error sending get_data_packet request: %s", - bt_socket_errormsg()); + viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); + if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_send_status(self_comp, NULL, + viewer_status, "get data packet command"); goto error; } - BT_ASSERT(ret_len == cmd_buf_len); - ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); - if (ret_len == 0) { - BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "Remote side has closed connection"); - goto error; - } - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "Socket error receiving get_data response: %s", - bt_socket_errormsg()); - goto error; - } - if (ret_len != sizeof(rp)) { - BT_COMP_LOGE_APPEND_CAUSE(self_comp, "get_data_packet: " - "expected %zu, received %zd", sizeof(rp), ret_len); + viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); + if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_recv_status(self_comp, NULL, + viewer_status, "get data packet reply"); goto error; } flags = be32toh(rp.flags); - status = be32toh(rp.status); + rp_status = be32toh(rp.status); - switch (status) { + switch (rp_status) { case LTTNG_VIEWER_GET_PACKET_OK: req_len = be32toh(rp.len); BT_COMP_LOGD("Received get_data_packet response: Ok, " "packet size : %" PRIu64 "", req_len); + status = CTF_MSG_ITER_MEDIUM_STATUS_OK; break; case LTTNG_VIEWER_GET_PACKET_RETRY: /* Unimplemented by relay daemon */ BT_COMP_LOGD("Received get_data_packet response: retry"); - retstatus = CTF_MSG_ITER_MEDIUM_STATUS_AGAIN; + status = CTF_MSG_ITER_MEDIUM_STATUS_AGAIN; goto end; case LTTNG_VIEWER_GET_PACKET_ERR: if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) { BT_COMP_LOGD("get_data_packet: new metadata needed, try again later"); - trace->new_metadata_needed = true; + trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED; } if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) { BT_COMP_LOGD("get_data_packet: new streams needed, try again later"); @@ -1433,56 +1587,50 @@ enum ctf_msg_iter_medium_status lttng_live_get_stream_bytes( } if (flags & (LTTNG_VIEWER_FLAG_NEW_METADATA | LTTNG_VIEWER_FLAG_NEW_STREAM)) { - retstatus = CTF_MSG_ITER_MEDIUM_STATUS_AGAIN; + status = CTF_MSG_ITER_MEDIUM_STATUS_AGAIN; goto end; } BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Received get_data_packet response: error"); - goto error; + status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR; + goto end; case LTTNG_VIEWER_GET_PACKET_EOF: - retstatus = CTF_MSG_ITER_MEDIUM_STATUS_EOF; + status = CTF_MSG_ITER_MEDIUM_STATUS_EOF; goto end; default: BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Received get_data_packet response: unknown"); + status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR; goto error; } if (req_len == 0) { + status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR; goto error; } - ret_len = lttng_live_recv(viewer_connection, buf, req_len); - if (ret_len == 0) { - BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "Remote side has closed connection"); - goto error; - } - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "Error receiving trace packet: %s", - bt_socket_errormsg()); + viewer_status = lttng_live_recv(viewer_connection, buf, req_len); + if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_recv_status(self_comp, NULL, + viewer_status, "get data packet"); goto error; } - BT_ASSERT(ret_len == req_len); - *recv_len = ret_len; -end: - return retstatus; + *recv_len = req_len; + status = CTF_MSG_ITER_MEDIUM_STATUS_OK; + goto end; error: - if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) { - retstatus = CTF_MSG_ITER_MEDIUM_STATUS_AGAIN; - } else { - retstatus = CTF_MSG_ITER_MEDIUM_STATUS_ERROR; - } - return retstatus; + + status = viewer_status_to_ctf_msg_iter_medium_status(viewer_status); +end: + return status; } /* * Request new streams for a session. */ BT_HIDDEN -enum lttng_live_iterator_status lttng_live_get_new_streams( +enum lttng_live_iterator_status lttng_live_session_get_new_streams( struct lttng_live_session *session, bt_self_message_iterator *self_msg_iter) { @@ -1493,18 +1641,22 @@ enum lttng_live_iterator_status lttng_live_get_new_streams( struct lttng_viewer_new_streams_response rp; struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter; + enum lttng_live_viewer_status viewer_status; struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection; bt_self_component *self_comp = viewer_connection->self_comp; uint32_t streams_count; const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); char cmd_buf[cmd_buf_len]; - ssize_t ret_len; if (!session->new_streams_needed) { - return LTTNG_LIVE_ITERATOR_STATUS_OK; + status = LTTNG_LIVE_ITERATOR_STATUS_OK; + goto end; } + BT_COMP_LOGD("Requesting new streams for session: " + "session-id=%"PRIu64, session->id); + cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS); cmd.data_size = htobe64((uint64_t) sizeof(rq)); cmd.cmd_version = htobe32(0); @@ -1519,27 +1671,21 @@ enum lttng_live_iterator_status lttng_live_get_new_streams( */ memcpy(cmd_buf, &cmd, sizeof(cmd)); memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); - ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "Error sending get_new_streams request: %s", - bt_socket_errormsg()); - goto error; + viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); + if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_send_status(self_comp, NULL, + viewer_status, "get new streams command"); + status = viewer_status_to_live_iterator_status(viewer_status); + goto end; } - BT_ASSERT(ret_len == cmd_buf_len); - ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); - if (ret_len == 0) { - BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "Remote side has closed connection"); - goto error; - } - if (ret_len == BT_SOCKET_ERROR) { - BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "Error receiving get_new_streams response"); - goto error; + viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); + if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_recv_status(self_comp, NULL, + viewer_status, "get new streams reply"); + status = viewer_status_to_live_iterator_status(viewer_status); + goto end; } - BT_ASSERT(ret_len == sizeof(rp)); streams_count = be32toh(rp.streams_count); @@ -1557,46 +1703,47 @@ enum lttng_live_iterator_status lttng_live_get_new_streams( goto end; case LTTNG_VIEWER_NEW_STREAMS_ERR: BT_COMP_LOGD("Received get_new_streams response: error"); - goto error; + status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; + goto end; default: BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Received get_new_streams response: Unknown:" "return code %u", be32toh(rp.status)); - goto error; + status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; + goto end; } - if (receive_streams(session, streams_count, self_msg_iter)) { - BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error receiving streams"); - goto error; + viewer_status = receive_streams(session, streams_count, self_msg_iter); + if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { + viewer_handle_recv_status(self_comp, NULL, + viewer_status, "new streams"); + status = viewer_status_to_live_iterator_status(viewer_status); + goto end; } -end: - return status; -error: - if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) { - status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN; - } else { - status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; - BT_COMP_LOGE("Error receiving streams."); - } + status = LTTNG_LIVE_ITERATOR_STATUS_OK; +end: return status; } BT_HIDDEN -struct live_viewer_connection *live_viewer_connection_create( - const char *url, bool in_query, - struct lttng_live_msg_iter *lttng_live_msg_iter, +enum lttng_live_viewer_status live_viewer_connection_create( bt_self_component *self_comp, bt_self_component_class *self_comp_class, - bt_logging_level log_level) + bt_logging_level log_level, + const char *url, bool in_query, + struct lttng_live_msg_iter *lttng_live_msg_iter, + struct live_viewer_connection **viewer) { struct live_viewer_connection *viewer_connection; + enum lttng_live_viewer_status status; viewer_connection = g_new0(struct live_viewer_connection, 1); if (bt_socket_init(log_level) != 0) { BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class, "Failed to init socket"); + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; goto error; } @@ -1613,37 +1760,55 @@ struct live_viewer_connection *live_viewer_connection_create( if (!viewer_connection->url) { BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class, "Failed to allocate URL buffer"); + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; goto error; } - BT_COMP_LOGI("Establishing connection to url \"%s\"...", url); - if (lttng_live_connect_viewer(viewer_connection)) { + BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class, + "Establishing connection to url \"%s\"...", url); + status = lttng_live_connect_viewer(viewer_connection); + /* + * Only print error and append cause in case of error. not in case of + * interruption. + */ + if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) { BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class, "Failed to establish connection: " "url=\"%s\"", url); goto error; + } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) { + goto error; } - BT_COMP_LOGI("Connection to url \"%s\" is established", url); - return viewer_connection; + BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class, + "Connection to url \"%s\" is established", url); + + *viewer = viewer_connection; + status = LTTNG_LIVE_VIEWER_STATUS_OK; + goto end; error: if (viewer_connection) { live_viewer_connection_destroy(viewer_connection); } - - return NULL; +end: + return status; } BT_HIDDEN void live_viewer_connection_destroy( struct live_viewer_connection *viewer_connection) { - BT_COMP_LOGI("Closing connection to url \"%s\"", viewer_connection->url->str); + bt_self_component *self_comp = viewer_connection->self_comp; + bt_self_component_class *self_comp_class = viewer_connection->self_comp_class; if (!viewer_connection) { goto end; } + BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class, + "Closing connection to relay:" + "relay-url=\"%s\"", viewer_connection->url->str); + lttng_live_disconnect_viewer(viewer_connection); if (viewer_connection->url) { @@ -1662,6 +1827,10 @@ void live_viewer_connection_destroy( g_string_free(viewer_connection->session_name, true); } + if (viewer_connection->proto) { + g_string_free(viewer_connection->proto, true); + } + g_free(viewer_connection); bt_socket_fini();