X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fplugins%2Fctf%2Flttng-live%2Fviewer-connection.c;h=3548b639f60764b68d3afa5b2ab29473ab2c63ec;hb=516bf0a77e025cfccce2fa400b757e94dc0bf1d8;hp=2cf23d171ce8ec326654af8d3400fc6f97a0c96c;hpb=ecb4ba8aa48c8d0deab589b7c1f4c999a52aa355;p=babeltrace.git diff --git a/src/plugins/ctf/lttng-live/viewer-connection.c b/src/plugins/ctf/lttng-live/viewer-connection.c index 2cf23d17..3548b639 100644 --- a/src/plugins/ctf/lttng-live/viewer-connection.c +++ b/src/plugins/ctf/lttng-live/viewer-connection.c @@ -1,24 +1,8 @@ /* - * Copyright 2019 - Francis Deslauriers - * Copyright 2016 - Mathieu Desnoyers + * SPDX-License-Identifier: MIT * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. + * Copyright 2019 Francis Deslauriers + * Copyright 2016 Mathieu Desnoyers */ #define BT_COMP_LOG_SELF_COMP (viewer_connection->self_comp) @@ -78,6 +62,86 @@ do { \ _msg ": %s" _fmt, bt_socket_errormsg(), ##__VA_ARGS__); \ } while (0) +static +const char *lttng_viewer_command_string(enum lttng_viewer_command cmd) +{ + switch (cmd){ + case LTTNG_VIEWER_CONNECT: + return "CONNECT"; + case LTTNG_VIEWER_LIST_SESSIONS: + return "LIST_SESSIONS"; + case LTTNG_VIEWER_ATTACH_SESSION: + return "ATTACH_SESSION"; + case LTTNG_VIEWER_GET_NEXT_INDEX: + return "GET_NEXT_INDEX"; + case LTTNG_VIEWER_GET_PACKET: + return "GET_PACKET"; + case LTTNG_VIEWER_GET_METADATA: + return "GET_METADATA"; + case LTTNG_VIEWER_GET_NEW_STREAMS: + return "GET_NEW_STREAMS"; + case LTTNG_VIEWER_CREATE_SESSION: + return "CREATE_SESSION"; + case LTTNG_VIEWER_DETACH_SESSION: + return "DETACH_SESSION"; + } + + bt_common_abort(); +} + +static +const char *lttng_viewer_next_index_return_code_string( + enum lttng_viewer_next_index_return_code code) +{ + switch (code) { + case LTTNG_VIEWER_INDEX_OK: + return "INDEX_OK"; + case LTTNG_VIEWER_INDEX_RETRY: + return "INDEX_RETRY"; + case LTTNG_VIEWER_INDEX_HUP: + return "INDEX_HUP"; + case LTTNG_VIEWER_INDEX_ERR: + return "INDEX_ERR"; + case LTTNG_VIEWER_INDEX_INACTIVE: + return "INDEX_INACTIVE"; + case LTTNG_VIEWER_INDEX_EOF: + return "INDEX_EOF"; + } + + bt_common_abort(); +} + +static +const char *lttng_viewer_get_packet_return_code_string( + enum lttng_viewer_get_packet_return_code code) +{ + switch (code) { + case LTTNG_VIEWER_GET_PACKET_OK: + return "GET_PACKET_OK"; + case LTTNG_VIEWER_GET_PACKET_RETRY: + return "GET_PACKET_RETRY"; + case LTTNG_VIEWER_GET_PACKET_ERR: + return "GET_PACKET_ERR"; + case LTTNG_VIEWER_GET_PACKET_EOF: + return "GET_PACKET_EOF"; + } + + bt_common_abort(); +}; + +static +const char *lttng_viewer_seek_string(enum lttng_viewer_seek seek) +{ + switch (seek) { + case LTTNG_VIEWER_SEEK_BEGINNING: + return "SEEK_BEGINNING"; + case LTTNG_VIEWER_SEEK_LAST: + return "SEEK_LAST"; + } + + bt_common_abort(); +} + static inline enum lttng_live_iterator_status viewer_status_to_live_iterator_status( enum lttng_live_viewer_status viewer_status) @@ -89,9 +153,9 @@ enum lttng_live_iterator_status viewer_status_to_live_iterator_status( return LTTNG_LIVE_ITERATOR_STATUS_AGAIN; case LTTNG_LIVE_VIEWER_STATUS_ERROR: return LTTNG_LIVE_ITERATOR_STATUS_ERROR; - default: - bt_common_abort(); } + + bt_common_abort(); } static inline @@ -105,9 +169,27 @@ enum ctf_msg_iter_medium_status viewer_status_to_ctf_msg_iter_medium_status( return CTF_MSG_ITER_MEDIUM_STATUS_AGAIN; case LTTNG_LIVE_VIEWER_STATUS_ERROR: return CTF_MSG_ITER_MEDIUM_STATUS_ERROR; - default: - bt_common_abort(); } + + bt_common_abort(); +} + +static inline +void viewer_connection_close_socket( + struct live_viewer_connection *viewer_connection) +{ + bt_self_component_class *self_comp_class = + viewer_connection->self_comp_class; + bt_self_component *self_comp = + viewer_connection->self_comp; + int ret = bt_socket_close(viewer_connection->control_sock); + if (ret == -1) { + BT_COMP_OR_COMP_CLASS_LOGW_ERRNO( + self_comp, self_comp_class, + "Error closing viewer connection socket: ", "."); + } + + viewer_connection->control_sock = BT_INVALID_SOCKET; } /* @@ -157,12 +239,14 @@ enum lttng_live_viewer_status lttng_live_recv( } } else { /* - * For any other types of socket error, returng - * an error. + * For any other types of socket error, close + * the socket and return an error. */ LTTNG_LIVE_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE_ERRNO( self_comp, self_comp_class, "Error receiving from Relay", "."); + + viewer_connection_close_socket(viewer_connection); status = LTTNG_LIVE_VIEWER_STATUS_ERROR; goto end; } @@ -172,10 +256,12 @@ enum lttng_live_viewer_status lttng_live_recv( * connection was orderly shutdown from the other peer. * If that happens when we are trying to receive * a message from it, it means something when wrong. + * Close the socket and return an error. */ - status = LTTNG_LIVE_VIEWER_STATUS_ERROR; BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp, self_comp_class, "Remote side has closed connection"); + viewer_connection_close_socket(viewer_connection); + status = LTTNG_LIVE_VIEWER_STATUS_ERROR; goto end; } @@ -236,11 +322,14 @@ enum lttng_live_viewer_status lttng_live_send( } } else { /* - * The send() call returned an error. + * For any other types of socket error, close + * the socket and return an error. */ LTTNG_LIVE_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE_ERRNO( self_comp, self_comp_class, "Error sending to Relay", "."); + + viewer_connection_close_socket(viewer_connection); status = LTTNG_LIVE_VIEWER_STATUS_ERROR; goto end; } @@ -321,9 +410,9 @@ enum lttng_live_viewer_status lttng_live_handshake( char cmd_buf[cmd_buf_len]; BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class, - "Handshaking with the Relay: " - "major-version=%u, minor-version=%u", - LTTNG_LIVE_MAJOR, LTTNG_LIVE_MINOR); + "Handshaking with the relay daemon: cmd=%s, major-version=%u, minor-version=%u", + lttng_viewer_command_string(LTTNG_VIEWER_CONNECT), LTTNG_LIVE_MAJOR, + LTTNG_LIVE_MINOR); cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT); cmd.data_size = htobe64((uint64_t) sizeof(connect)); @@ -387,7 +476,7 @@ end: static enum lttng_live_viewer_status lttng_live_connect_viewer( - struct live_viewer_connection *viewer_connection) + struct live_viewer_connection *viewer_connection) { struct hostent *host; struct sockaddr_in server_addr; @@ -777,6 +866,9 @@ bt_component_class_query_method_status live_viewer_connection_list_sessions( goto error; } + BT_LOGD("Requesting list of sessions: cmd=%s", + lttng_viewer_command_string(LTTNG_VIEWER_LIST_SESSIONS)); + cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS); cmd.data_size = htobe64((uint64_t) 0); cmd.cmd_version = htobe32(0); @@ -854,7 +946,8 @@ enum lttng_live_viewer_status lttng_live_query_session_ids( bt_self_component_class *self_comp_class = viewer_connection->self_comp_class; - BT_COMP_LOGD("Asking the Relay for the list of sessions"); + BT_COMP_LOGD("Asking the relay daemon for the list of sessions: cmd=%s", + lttng_viewer_command_string(LTTNG_VIEWER_LIST_SESSIONS)); cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS); cmd.data_size = htobe64((uint64_t) 0); @@ -928,7 +1021,8 @@ enum lttng_live_viewer_status lttng_live_create_viewer_session( viewer_connection->self_comp_class; BT_COMP_OR_COMP_CLASS_LOGD(self_comp, self_comp_class, - "Creating a viewer session"); + "Creating a viewer session: cmd=%s", + lttng_viewer_command_string(LTTNG_VIEWER_CREATE_SESSION)); cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION); cmd.data_size = htobe64((uint64_t) 0); @@ -981,7 +1075,7 @@ enum lttng_live_viewer_status receive_streams(struct lttng_live_session *session lttng_live_msg_iter->viewer_connection; bt_self_component *self_comp = viewer_connection->self_comp; - BT_COMP_LOGI("Getting %" PRIu32 " new streams:", stream_count); + BT_COMP_LOGI("Getting %" PRIu32 " new streams", stream_count); for (i = 0; i < stream_count; i++) { struct lttng_viewer_stream stream; struct lttng_live_stream_iterator *live_stream; @@ -1032,7 +1126,7 @@ end: } BT_HIDDEN -enum lttng_live_viewer_status lttng_live_attach_session( +enum lttng_live_viewer_status lttng_live_session_attach( struct lttng_live_session *session, bt_self_message_iterator *self_msg_iter) { @@ -1050,7 +1144,11 @@ enum lttng_live_viewer_status lttng_live_attach_session( const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); char cmd_buf[cmd_buf_len]; - BT_COMP_LOGD("Attaching to session: session-id=%"PRIu64, session_id); + BT_COMP_LOGD("Attaching to session: cmd=%s, session-id=%" PRIu64 + ", seek=%s", + lttng_viewer_command_string(LTTNG_VIEWER_ATTACH_SESSION), + session_id, + lttng_viewer_seek_string(LTTNG_VIEWER_SEEK_LAST)); cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION); cmd.data_size = htobe64((uint64_t) sizeof(rq)); @@ -1134,7 +1232,7 @@ end: } BT_HIDDEN -enum lttng_live_viewer_status lttng_live_detach_session( +enum lttng_live_viewer_status lttng_live_session_detach( struct lttng_live_session *session) { struct lttng_viewer_cmd cmd; @@ -1150,10 +1248,19 @@ enum lttng_live_viewer_status lttng_live_detach_session( const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); char cmd_buf[cmd_buf_len]; - if (!session->attached) { + /* + * The session might already be detached and the viewer socket might + * already been closed. This happens when calling this function when + * tearing down the graph after an error. + */ + if (!session->attached || viewer_connection->control_sock == BT_INVALID_SOCKET) { return 0; } + BT_COMP_LOGD("Detaching from session: cmd=%s, session-id=%" PRIu64, + lttng_viewer_command_string(LTTNG_VIEWER_DETACH_SESSION), + session_id); + cmd.cmd = htobe32(LTTNG_VIEWER_DETACH_SESSION); cmd.data_size = htobe64((uint64_t) sizeof(rq)); cmd.cmd_version = htobe32(0); @@ -1217,8 +1324,8 @@ enum lttng_live_get_one_metadata_status lttng_live_get_one_metadata_packet( struct lttng_viewer_cmd cmd; struct lttng_viewer_get_metadata rq; struct lttng_viewer_metadata_packet rp; - char *data = NULL; - ssize_t ret_len; + gchar *data = NULL; + ssize_t writelen; struct lttng_live_session *session = trace->session; struct lttng_live_msg_iter *lttng_live_msg_iter = session->lttng_live_msg_iter; @@ -1229,8 +1336,9 @@ enum lttng_live_get_one_metadata_status lttng_live_get_one_metadata_packet( const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); char cmd_buf[cmd_buf_len]; - BT_COMP_LOGD("Requesting new metadata for trace: " - "trace-id=%"PRIu64", metadata-stream-id=%"PRIu64, + BT_COMP_LOGD("Requesting new metadata for trace:" + "cmd=%s, trace-id=%" PRIu64 ", metadata-stream-id=%" PRIu64, + lttng_viewer_command_string(LTTNG_VIEWER_GET_METADATA), trace->id, metadata->stream_id); rq.stream_id = htobe64(metadata->stream_id); @@ -1282,7 +1390,7 @@ enum lttng_live_get_one_metadata_status lttng_live_get_one_metadata_packet( BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Received get_metadata response: unknown"); status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR; - goto error; + goto end; } len = be64toh(rp.len); @@ -1291,15 +1399,15 @@ enum lttng_live_get_one_metadata_status lttng_live_get_one_metadata_packet( BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Erroneous response length"); status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR; - goto error; + goto end; } - data = calloc(1, len); + data = g_new0(gchar, len); if (!data) { BT_COMP_LOGE_APPEND_CAUSE_ERRNO(self_comp, "Failed to allocate data buffer", "."); status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR; - goto error; + goto end; } viewer_status = lttng_live_recv(viewer_connection, data, len); @@ -1307,29 +1415,25 @@ enum lttng_live_get_one_metadata_status lttng_live_get_one_metadata_packet( viewer_handle_recv_status(self_comp, NULL, viewer_status, "get metadata packet"); status = (enum lttng_live_get_one_metadata_status) viewer_status; - goto error; + goto end; } /* * Write the metadata to the file handle. */ - do { - ret_len = fwrite(data, 1, len, fp); - } while (ret_len < 0 && errno == EINTR); - if (ret_len < 0) { + writelen = fwrite(data, sizeof(uint8_t), len, fp); + if (writelen != len) { BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Writing in the metadata file stream"); status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR; - goto error; + goto end; } - BT_ASSERT(ret_len == len); + *reply_len = len; status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_OK; -error: - free(data); - end: + g_free(data); return status; } @@ -1351,6 +1455,23 @@ void lttng_index_to_packet_index(struct lttng_viewer_index *lindex, pindex->events_discarded = be64toh(lindex->events_discarded); } +static +void lttng_live_need_new_streams(struct lttng_live_msg_iter *lttng_live_msg_iter) +{ + uint64_t session_idx; + struct live_viewer_connection *viewer_connection = + lttng_live_msg_iter->viewer_connection; + + for (session_idx = 0; session_idx < lttng_live_msg_iter->sessions->len; + session_idx++) { + struct lttng_live_session *session = + g_ptr_array_index(lttng_live_msg_iter->sessions, session_idx); + BT_COMP_LOGD("Marking session as needing new streams: " + "session-id=%" PRIu64, session->id); + session->new_streams_needed = true; + } +} + BT_HIDDEN enum lttng_live_iterator_status lttng_live_get_next_index( struct lttng_live_msg_iter *lttng_live_msg_iter, @@ -1370,14 +1491,14 @@ enum lttng_live_iterator_status lttng_live_get_next_index( char cmd_buf[cmd_buf_len]; uint32_t flags, rp_status; - BT_COMP_LOGD("Requesting next index for stream: " - "stream-id=%"PRIu64, stream->viewer_stream_id); - + BT_COMP_LOGD("Requesting next index for stream: cmd=%s, " + "viewer-stream-id=%" PRIu64, + lttng_viewer_command_string(LTTNG_VIEWER_GET_NEXT_INDEX), + stream->viewer_stream_id); cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX); cmd.data_size = htobe64((uint64_t) sizeof(rq)); cmd.cmd_version = htobe32(0); - memset(&rq, 0, sizeof(rq)); rq.stream_id = htobe64(stream->viewer_stream_id); @@ -1388,6 +1509,7 @@ enum lttng_live_iterator_status lttng_live_get_next_index( */ memcpy(cmd_buf, &cmd, sizeof(cmd)); memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); + viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { viewer_handle_send_status(self_comp, NULL, @@ -1405,23 +1527,26 @@ enum lttng_live_iterator_status lttng_live_get_next_index( flags = be32toh(rp.flags); rp_status = be32toh(rp.status); + BT_COMP_LOGD("Received response from relay daemon: cmd=%s, response=%s", + lttng_viewer_command_string(LTTNG_VIEWER_GET_NEXT_INDEX), + lttng_viewer_next_index_return_code_string(rp_status)); switch (rp_status) { case LTTNG_VIEWER_INDEX_INACTIVE: { uint64_t ctf_stream_class_id; - BT_COMP_LOGD("Received get_next_index response: inactive"); memset(index, 0, sizeof(struct packet_index)); index->ts_cycles.timestamp_end = be64toh(rp.timestamp_end); stream->current_inactivity_ts = index->ts_cycles.timestamp_end; ctf_stream_class_id = be64toh(rp.stream_id); - if (stream->ctf_stream_class_id != -1ULL) { - BT_ASSERT(stream->ctf_stream_class_id == + if (stream->ctf_stream_class_id.is_set) { + BT_ASSERT(stream->ctf_stream_class_id.value== ctf_stream_class_id); } else { - stream->ctf_stream_class_id = ctf_stream_class_id; + stream->ctf_stream_class_id.value = ctf_stream_class_id; + stream->ctf_stream_class_id.is_set = true; } - stream->state = LTTNG_LIVE_STREAM_QUIESCENT; + lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_QUIESCENT); status = LTTNG_LIVE_ITERATOR_STATUS_OK; break; } @@ -1429,53 +1554,55 @@ enum lttng_live_iterator_status lttng_live_get_next_index( { uint64_t ctf_stream_class_id; - BT_COMP_LOGD("Received get_next_index response: OK"); lttng_index_to_packet_index(&rp, index); ctf_stream_class_id = be64toh(rp.stream_id); - if (stream->ctf_stream_class_id != -1ULL) { - BT_ASSERT(stream->ctf_stream_class_id == + if (stream->ctf_stream_class_id.is_set) { + BT_ASSERT(stream->ctf_stream_class_id.value== ctf_stream_class_id); } else { - stream->ctf_stream_class_id = ctf_stream_class_id; + stream->ctf_stream_class_id.value = ctf_stream_class_id; + stream->ctf_stream_class_id.is_set = true; } - stream->state = LTTNG_LIVE_STREAM_ACTIVE_DATA; + lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_DATA); if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) { - BT_COMP_LOGD("Received get_next_index response: new metadata needed"); - trace->new_metadata_needed = true; + BT_COMP_LOGD("Marking trace as needing new metadata: " + "response=%s, response-flag=NEW_METADATA, trace-id=%" PRIu64, + lttng_viewer_next_index_return_code_string(rp_status), + trace->id); + trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED; } if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) { - BT_COMP_LOGD("Received get_next_index response: new streams needed"); + BT_COMP_LOGD("Marking all sessions as possibly needing new streams: " + "response=%s, response-flag=NEW_STREAM", + lttng_viewer_next_index_return_code_string(rp_status)); lttng_live_need_new_streams(lttng_live_msg_iter); } status = LTTNG_LIVE_ITERATOR_STATUS_OK; break; } case LTTNG_VIEWER_INDEX_RETRY: - BT_COMP_LOGD("Received get_next_index response: retry"); memset(index, 0, sizeof(struct packet_index)); - stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA; + lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA); status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN; goto end; case LTTNG_VIEWER_INDEX_HUP: - BT_COMP_LOGD("Received get_next_index response: stream hung up"); memset(index, 0, sizeof(struct packet_index)); index->offset = EOF; - stream->state = LTTNG_LIVE_STREAM_EOF; + lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_EOF); stream->has_stream_hung_up = true; status = LTTNG_LIVE_ITERATOR_STATUS_END; break; case LTTNG_VIEWER_INDEX_ERR: - BT_COMP_LOGD("Received get_next_index response: error"); memset(index, 0, sizeof(struct packet_index)); - stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA; + lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA); status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; goto end; default: BT_COMP_LOGD("Received get_next_index response: unknown value"); memset(index, 0, sizeof(struct packet_index)); - stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA; + lttng_live_stream_iterator_set_state(stream, LTTNG_LIVE_STREAM_ACTIVE_NO_DATA); status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; goto end; } @@ -1506,8 +1633,11 @@ enum ctf_msg_iter_medium_status lttng_live_get_stream_bytes( char cmd_buf[cmd_buf_len]; uint32_t flags, rp_status; - BT_COMP_LOGD("lttng_live_get_stream_bytes: offset=%" PRIu64 ", req_len=%" PRIu64, + BT_COMP_LOGD("Requesting data from stream: cmd=%s, " + "offset=%" PRIu64 ", request-len=%" PRIu64, + lttng_viewer_command_string(LTTNG_VIEWER_GET_PACKET), offset, req_len); + cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET); cmd.data_size = htobe64((uint64_t) sizeof(rq)); cmd.cmd_version = htobe32(0); @@ -1524,47 +1654,58 @@ enum ctf_msg_iter_medium_status lttng_live_get_stream_bytes( */ memcpy(cmd_buf, &cmd, sizeof(cmd)); memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); + viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { viewer_handle_send_status(self_comp, NULL, viewer_status, "get data packet command"); - goto error; + goto error_convert_status; } viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { viewer_handle_recv_status(self_comp, NULL, viewer_status, "get data packet reply"); - goto error; + goto error_convert_status; } flags = be32toh(rp.flags); rp_status = be32toh(rp.status); + BT_COMP_LOGD("Received response from relay daemon: cmd=%s, response=%s", + lttng_viewer_command_string(LTTNG_VIEWER_GET_PACKET), + lttng_viewer_get_packet_return_code_string(rp_status)); switch (rp_status) { case LTTNG_VIEWER_GET_PACKET_OK: req_len = be32toh(rp.len); - BT_COMP_LOGD("Received get_data_packet response: Ok, " - "packet size : %" PRIu64 "", req_len); + BT_COMP_LOGD("Got packet from relay daemon: response=%s, packet-len=%" PRIu64 "", + lttng_viewer_get_packet_return_code_string(rp_status), + req_len); status = CTF_MSG_ITER_MEDIUM_STATUS_OK; break; case LTTNG_VIEWER_GET_PACKET_RETRY: /* Unimplemented by relay daemon */ - BT_COMP_LOGD("Received get_data_packet response: retry"); status = CTF_MSG_ITER_MEDIUM_STATUS_AGAIN; goto end; case LTTNG_VIEWER_GET_PACKET_ERR: if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) { - BT_COMP_LOGD("get_data_packet: new metadata needed, try again later"); - trace->new_metadata_needed = true; + BT_COMP_LOGD("Marking trace as needing new metadata: " + "response=%s, response-flag=NEW_METADATA, trace-id=%" PRIu64, + lttng_viewer_next_index_return_code_string(rp_status), + trace->id); + trace->metadata_stream_state = LTTNG_LIVE_METADATA_STREAM_STATE_NEEDED; } if (flags & LTTNG_VIEWER_FLAG_NEW_STREAM) { - BT_COMP_LOGD("get_data_packet: new streams needed, try again later"); + BT_COMP_LOGD("Marking all sessions as possibly needing new streams: " + "response=%s, response-flag=NEW_STREAM", + lttng_viewer_next_index_return_code_string(rp_status)); lttng_live_need_new_streams(lttng_live_msg_iter); } if (flags & (LTTNG_VIEWER_FLAG_NEW_METADATA | LTTNG_VIEWER_FLAG_NEW_STREAM)) { status = CTF_MSG_ITER_MEDIUM_STATUS_AGAIN; + BT_COMP_LOGD("Reply with any one flags set means we should retry: response=%s", + lttng_viewer_get_packet_return_code_string(rp_status)); goto end; } BT_COMP_LOGE_APPEND_CAUSE(self_comp, @@ -1576,28 +1717,28 @@ enum ctf_msg_iter_medium_status lttng_live_get_stream_bytes( goto end; default: BT_COMP_LOGE_APPEND_CAUSE(self_comp, - "Received get_data_packet response: unknown"); + "Received get_data_packet response: unknown (%d)", rp_status); status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR; - goto error; + goto end; } if (req_len == 0) { status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR; - goto error; + goto end; } viewer_status = lttng_live_recv(viewer_connection, buf, req_len); if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { viewer_handle_recv_status(self_comp, NULL, viewer_status, "get data packet"); - goto error; + goto error_convert_status; } *recv_len = req_len; status = CTF_MSG_ITER_MEDIUM_STATUS_OK; goto end; -error: +error_convert_status: status = viewer_status_to_ctf_msg_iter_medium_status(viewer_status); end: return status; @@ -1607,7 +1748,7 @@ end: * Request new streams for a session. */ BT_HIDDEN -enum lttng_live_iterator_status lttng_live_get_new_streams( +enum lttng_live_iterator_status lttng_live_session_get_new_streams( struct lttng_live_session *session, bt_self_message_iterator *self_msg_iter) { @@ -1631,8 +1772,10 @@ enum lttng_live_iterator_status lttng_live_get_new_streams( goto end; } - BT_COMP_LOGD("Requesting new streams for session: " - "session-id=%"PRIu64, session->id); + BT_COMP_LOGD("Requesting new streams for session: cmd=%s, " + "session-id=%" PRIu64, + lttng_viewer_command_string(LTTNG_VIEWER_GET_NEW_STREAMS), + session->id); cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS); cmd.data_size = htobe64((uint64_t) sizeof(rq)); @@ -1648,6 +1791,7 @@ enum lttng_live_iterator_status lttng_live_get_new_streams( */ memcpy(cmd_buf, &cmd, sizeof(cmd)); memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); + viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) { viewer_handle_send_status(self_comp, NULL,