X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fplugins%2Fctf%2Flttng-live%2Fviewer-connection.c;h=be9c79e0035347660c98be2ef013b23a29c52f85;hb=0235b0db7de5bcacdb3650c92461f2ce5eb2143d;hp=69a2a25e7dbff67457aec9be1654687171b19623;hpb=f79c2d7a3575db7cd07f502b39e1db76619b70a6;p=babeltrace.git diff --git a/src/plugins/ctf/lttng-live/viewer-connection.c b/src/plugins/ctf/lttng-live/viewer-connection.c index 69a2a25e..be9c79e0 100644 --- a/src/plugins/ctf/lttng-live/viewer-connection.c +++ b/src/plugins/ctf/lttng-live/viewer-connection.c @@ -1,24 +1,8 @@ /* - * Copyright 2019 - Francis Deslauriers - * Copyright 2016 - Mathieu Desnoyers + * SPDX-License-Identifier: MIT * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. + * Copyright 2019 Francis Deslauriers + * Copyright 2016 Mathieu Desnoyers */ #define BT_COMP_LOG_SELF_COMP (viewer_connection->self_comp) @@ -110,6 +94,24 @@ enum ctf_msg_iter_medium_status viewer_status_to_ctf_msg_iter_medium_status( } } +static inline +void viewer_connection_close_socket( + struct live_viewer_connection *viewer_connection) +{ + bt_self_component_class *self_comp_class = + viewer_connection->self_comp_class; + bt_self_component *self_comp = + viewer_connection->self_comp; + int ret = bt_socket_close(viewer_connection->control_sock); + if (ret == -1) { + BT_COMP_OR_COMP_CLASS_LOGW_ERRNO( + self_comp, self_comp_class, + "Error closing viewer connection socket: ", "."); + } + + viewer_connection->control_sock = BT_INVALID_SOCKET; +} + /* * This function receives a message from the Relay daemon. * If it received the entire message, it returns _OK, @@ -157,12 +159,14 @@ enum lttng_live_viewer_status lttng_live_recv( } } else { /* - * For any other types of socket error, returng - * an error. + * For any other types of socket error, close + * the socket and return an error. */ LTTNG_LIVE_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE_ERRNO( self_comp, self_comp_class, "Error receiving from Relay", "."); + + viewer_connection_close_socket(viewer_connection); status = LTTNG_LIVE_VIEWER_STATUS_ERROR; goto end; } @@ -172,10 +176,12 @@ enum lttng_live_viewer_status lttng_live_recv( * connection was orderly shutdown from the other peer. * If that happens when we are trying to receive * a message from it, it means something when wrong. + * Close the socket and return an error. */ - status = LTTNG_LIVE_VIEWER_STATUS_ERROR; BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class, "Remote side has closed connection"); + viewer_connection_close_socket(viewer_connection); + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; goto end; } @@ -236,11 +242,14 @@ enum lttng_live_viewer_status lttng_live_send( } } else { /* - * The send() call returned an error. + * For any other types of socket error, close + * the socket and return an error. */ LTTNG_LIVE_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE_ERRNO( self_comp, self_comp_class, "Error sending to Relay", "."); + + viewer_connection_close_socket(viewer_connection); status = LTTNG_LIVE_VIEWER_STATUS_ERROR; goto end; } @@ -281,6 +290,8 @@ int parse_url(struct live_viewer_connection *viewer_connection) error_buf); goto end; } + viewer_connection->proto = lttng_live_url_parts.proto; + lttng_live_url_parts.proto = NULL; viewer_connection->relay_hostname = lttng_live_url_parts.hostname; lttng_live_url_parts.hostname = NULL; @@ -299,15 +310,6 @@ int parse_url(struct live_viewer_connection *viewer_connection) lttng_live_url_parts.session_name = NULL; } - BT_COMP_LOGI("Connecting to hostname : %s, port : %d, " - "target hostname : %s, session name : %s, proto : %s", - viewer_connection->relay_hostname->str, - viewer_connection->port, - !viewer_connection->target_hostname ? - "" : viewer_connection->target_hostname->str, - !viewer_connection->session_name ? - "" : viewer_connection->session_name->str, - lttng_live_url_parts.proto->str); ret = 0; end: @@ -327,6 +329,11 @@ enum lttng_live_viewer_status lttng_live_handshake( const size_t cmd_buf_len = sizeof(cmd) + sizeof(connect); char cmd_buf[cmd_buf_len]; + BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class, + "Handshaking with the Relay: " + "major-version=%u, minor-version=%u", + LTTNG_LIVE_MAJOR, LTTNG_LIVE_MINOR); + cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT); cmd.data_size = htobe64((uint64_t) sizeof(connect)); cmd.cmd_version = htobe32(0); @@ -358,10 +365,12 @@ enum lttng_live_viewer_status lttng_live_handshake( goto end; } - BT_COMP_LOGI("Received viewer session ID : %" PRIu64, - (uint64_t) be64toh(connect.viewer_session_id)); - BT_COMP_LOGI("Relayd version : %u.%u", be32toh(connect.major), - be32toh(connect.minor)); + BT_COMP_OR_COMP_CLASS_LOGI(self_comp, self_comp_class, + "Received viewer session ID : %" PRIu64, + (uint64_t) be64toh(connect.viewer_session_id)); + BT_COMP_OR_COMP_CLASS_LOGI(self_comp, self_comp_class, + "Relayd version : %u.%u", be32toh(connect.major), + be32toh(connect.minor)); if (LTTNG_LIVE_MAJOR != be32toh(connect.major)) { BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, @@ -387,7 +396,7 @@ end: static enum lttng_live_viewer_status lttng_live_connect_viewer( - struct live_viewer_connection *viewer_connection) + struct live_viewer_connection *viewer_connection) { struct hostent *host; struct sockaddr_in server_addr; @@ -402,6 +411,17 @@ enum lttng_live_viewer_status lttng_live_connect_viewer( goto error; } + BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class, + "Connecting to hostname : %s, port : %d, " + "target hostname : %s, session name : %s, proto : %s", + viewer_connection->relay_hostname->str, + viewer_connection->port, + !viewer_connection->target_hostname ? + "" : viewer_connection->target_hostname->str, + !viewer_connection->session_name ? + "" : viewer_connection->session_name->str, + viewer_connection->proto->str); + host = gethostbyname(viewer_connection->relay_hostname->str); if (!host) { BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, @@ -451,8 +471,8 @@ enum lttng_live_viewer_status lttng_live_connect_viewer( error: if (viewer_connection->control_sock != BT_INVALID_SOCKET) { if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) { - BT_COMP_OR_COMP_CLASS_LOGE(self_comp, self_comp_class, - "Error closing socket: %s", bt_socket_errormsg()); + BT_COMP_OR_COMP_CLASS_LOGW(self_comp, self_comp_class, + "Error closing socket: %s.", bt_socket_errormsg()); } } viewer_connection->control_sock = BT_INVALID_SOCKET; @@ -464,12 +484,15 @@ static void lttng_live_disconnect_viewer( struct live_viewer_connection *viewer_connection) { + bt_self_component_class *self_comp_class = viewer_connection->self_comp_class; + bt_self_component *self_comp = viewer_connection->self_comp; + if (viewer_connection->control_sock == BT_INVALID_SOCKET) { return; } if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) { - BT_COMP_LOGE("Error closing socket: %s", - bt_socket_errormsg()); + BT_COMP_OR_COMP_CLASS_LOGW(self_comp, self_comp_class, + "Error closing socket: %s", bt_socket_errormsg()); viewer_connection->control_sock = BT_INVALID_SOCKET; } } @@ -840,6 +863,8 @@ enum lttng_live_viewer_status lttng_live_query_session_ids( bt_self_component_class *self_comp_class = viewer_connection->self_comp_class; + BT_COMP_LOGD("Asking the Relay for the list of sessions"); + cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS); cmd.data_size = htobe64((uint64_t) 0); cmd.cmd_version = htobe32(0); @@ -871,7 +896,8 @@ enum lttng_live_viewer_status lttng_live_query_session_ids( lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0'; session_id = be64toh(lsession.id); - BT_COMP_LOGI("Adding session %" PRIu64 " hostname: %s session_name: %s", + BT_COMP_LOGI("Adding session to internal list: " + "session-id=%" PRIu64 ", hostname=\"%s\", session-name=\"%s\"", session_id, lsession.hostname, lsession.session_name); if ((strncmp(lsession.session_name, @@ -879,6 +905,7 @@ enum lttng_live_viewer_status lttng_live_query_session_ids( LTTNG_VIEWER_NAME_MAX) == 0) && (strncmp(lsession.hostname, viewer_connection->target_hostname->str, LTTNG_VIEWER_HOST_NAME_MAX) == 0)) { + if (lttng_live_add_session(lttng_live_msg_iter, session_id, lsession.hostname, lsession.session_name)) { @@ -909,6 +936,9 @@ enum lttng_live_viewer_status lttng_live_create_viewer_session( bt_self_component_class *self_comp_class = viewer_connection->self_comp_class; + BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class, + "Creating a viewer session"); + cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION); cmd.data_size = htobe64((uint64_t) 0); cmd.cmd_version = htobe32(0); @@ -1011,7 +1041,7 @@ end: } BT_HIDDEN -enum lttng_live_viewer_status lttng_live_attach_session( +enum lttng_live_viewer_status lttng_live_session_attach( struct lttng_live_session *session, bt_self_message_iterator *self_msg_iter) { @@ -1029,6 +1059,8 @@ enum lttng_live_viewer_status lttng_live_attach_session( const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); char cmd_buf[cmd_buf_len]; + BT_COMP_LOGD("Attaching to session: session-id=%"PRIu64, session_id); + cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION); cmd.data_size = htobe64((uint64_t) sizeof(rq)); cmd.cmd_version = htobe32(0); @@ -1111,7 +1143,7 @@ end: } BT_HIDDEN -enum lttng_live_viewer_status lttng_live_detach_session( +enum lttng_live_viewer_status lttng_live_session_detach( struct lttng_live_session *session) { struct lttng_viewer_cmd cmd; @@ -1127,7 +1159,12 @@ enum lttng_live_viewer_status lttng_live_detach_session( const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); char cmd_buf[cmd_buf_len]; - if (!session->attached) { + /* + * The session might already be detached and the viewer socket might + * already been closed. This happens when calling this function when + * tearing down the graph after an error. + */ + if (!session->attached || viewer_connection->control_sock == BT_INVALID_SOCKET) { return 0; } @@ -1194,8 +1231,8 @@ enum lttng_live_get_one_metadata_status lttng_live_get_one_metadata_packet( struct lttng_viewer_cmd cmd; struct lttng_viewer_get_metadata rq; struct lttng_viewer_metadata_packet rp; - char *data = NULL; - ssize_t ret_len; + gchar *data = NULL; + ssize_t writelen; struct lttng_live_session *session = trace->session; struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter; @@ -1206,6 +1243,10 @@ enum lttng_live_get_one_metadata_status lttng_live_get_one_metadata_packet( const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); char cmd_buf[cmd_buf_len]; + BT_COMP_LOGD("Requesting new metadata for trace: " + "trace-id=%"PRIu64", metadata-stream-id=%"PRIu64, + trace->id, metadata->stream_id); + rq.stream_id = htobe64(metadata->stream_id); cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA); cmd.data_size = htobe64((uint64_t) sizeof(rq)); @@ -1255,7 +1296,7 @@ enum lttng_live_get_one_metadata_status lttng_live_get_one_metadata_packet( BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Received get_metadata response: unknown"); status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR; - goto error; + goto end; } len = be64toh(rp.len); @@ -1264,15 +1305,15 @@ enum lttng_live_get_one_metadata_status lttng_live_get_one_metadata_packet( BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Erroneous response length"); status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR; - goto error; + goto end; } - data = calloc(1, len); + data = g_new0(gchar, len); if (!data) { BT_COMP_LOGE_APPEND_CAUSE_ERRNO(self_comp, "Failed to allocate data buffer", "."); status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR; - goto error; + goto end; } viewer_status = lttng_live_recv(viewer_connection, data, len); @@ -1280,29 +1321,25 @@ enum lttng_live_get_one_metadata_status lttng_live_get_one_metadata_packet( viewer_handle_recv_status(self_comp, NULL, viewer_status, "get metadata packet"); status = (enum lttng_live_get_one_metadata_status) viewer_status; - goto error; + goto end; } /* * Write the metadata to the file handle. */ - do { - ret_len = fwrite(data, 1, len, fp); - } while (ret_len < 0 && errno == EINTR); - if (ret_len < 0) { + writelen = fwrite(data, sizeof(uint8_t), len, fp); + if (writelen != len) { BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Writing in the metadata file stream"); status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR; - goto error; + goto end; } - BT_ASSERT(ret_len == len); + *reply_len = len; status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_OK; -error: - free(data); - end: + g_free(data); return status; } @@ -1324,6 +1361,19 @@ void lttng_index_to_packet_index(struct lttng_viewer_index *lindex, pindex->events_discarded = be64toh(lindex->events_discarded); } +static +void lttng_live_need_new_streams(struct lttng_live_msg_iter *lttng_live_msg_iter) +{ + uint64_t session_idx; + + for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; + session_idx++) { + struct lttng_live_session *session = + g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx); + session->new_streams_needed = true; + } +} + BT_HIDDEN enum lttng_live_iterator_status lttng_live_get_next_index( struct lttng_live_msg_iter *lttng_live_msg_iter, @@ -1343,6 +1393,9 @@ enum lttng_live_iterator_status lttng_live_get_next_index( char cmd_buf[cmd_buf_len]; uint32_t flags, rp_status; + BT_COMP_LOGD("Requesting next index for stream: " + "stream-id=%"PRIu64, stream->viewer_stream_id); + cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX); cmd.data_size = htobe64((uint64_t) sizeof(rq)); cmd.cmd_version = htobe32(0); @@ -1413,7 +1466,7 @@ enum lttng_live_iterator_status lttng_live_get_next_index( if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) { BT_COMP_LOGD("Received get_next_index response: new metadata needed"); - trace->new_metadata_needed = true; + trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED; } if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) { BT_COMP_LOGD("Received get_next_index response: new streams needed"); @@ -1526,7 +1579,7 @@ enum ctf_msg_iter_medium_status lttng_live_get_stream_bytes( case LTTNG_VIEWER_GET_PACKET_ERR: if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) { BT_COMP_LOGD("get_data_packet: new metadata needed, try again later"); - trace->new_metadata_needed = true; + trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED; } if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) { BT_COMP_LOGD("get_data_packet: new streams needed, try again later"); @@ -1577,7 +1630,7 @@ end: * Request new streams for a session. */ BT_HIDDEN -enum lttng_live_iterator_status lttng_live_get_new_streams( +enum lttng_live_iterator_status lttng_live_session_get_new_streams( struct lttng_live_session *session, bt_self_message_iterator *self_msg_iter) { @@ -1601,6 +1654,9 @@ enum lttng_live_iterator_status lttng_live_get_new_streams( goto end; } + BT_COMP_LOGD("Requesting new streams for session: " + "session-id=%"PRIu64, session->id); + cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS); cmd.data_size = htobe64((uint64_t) sizeof(rq)); cmd.cmd_version = htobe32(0); @@ -1708,7 +1764,8 @@ enum lttng_live_viewer_status live_viewer_connection_create( goto error; } - BT_COMP_LOGI("Establishing connection to url \"%s\"...", url); + BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class, + "Establishing connection to url \"%s\"...", url); status = lttng_live_connect_viewer(viewer_connection); /* * Only print error and append cause in case of error. not in case of @@ -1722,7 +1779,8 @@ enum lttng_live_viewer_status live_viewer_connection_create( } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) { goto error; } - BT_COMP_LOGI("Connection to url \"%s\" is established", url); + BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class, + "Connection to url \"%s\" is established", url); *viewer = viewer_connection; status = LTTNG_LIVE_VIEWER_STATUS_OK; @@ -1740,12 +1798,17 @@ BT_HIDDEN void live_viewer_connection_destroy( struct live_viewer_connection *viewer_connection) { - BT_COMP_LOGI("Closing connection to url \"%s\"", viewer_connection->url->str); + bt_self_component *self_comp = viewer_connection->self_comp; + bt_self_component_class *self_comp_class = viewer_connection->self_comp_class; if (!viewer_connection) { goto end; } + BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class, + "Closing connection to relay:" + "relay-url=\"%s\"", viewer_connection->url->str); + lttng_live_disconnect_viewer(viewer_connection); if (viewer_connection->url) { @@ -1764,6 +1827,10 @@ void live_viewer_connection_destroy( g_string_free(viewer_connection->session_name, true); } + if (viewer_connection->proto) { + g_string_free(viewer_connection->proto, true); + } + g_free(viewer_connection); bt_socket_fini();