From 4a39caef874f11e1684e67fd33ad8f86b0a6d651 Mon Sep 17 00:00:00 2001 From: Francis Deslauriers Date: Thu, 8 Aug 2019 13:17:40 -0400 Subject: [PATCH] Fix: src.ctf.lttng-live: omitting stream end on stream hang up Issue ===== When receiving a LTTNG_VIEWER_INDEX_HUP request status, a `src.ctf.lttng-live` component quickly removes the associated stream iterator and move on to the next request without sending the stream end message to properly close the stream. This breaks the guarantee that stream end messages are always present. Solution ======== Whenever we receive an HUP status from the server, we trigger the `bt_msg_iter` code to hit an BT_MSG_ITER_MEDIUM_STATUS_EOF so it generates the stream end message that closes the stream. Signed-off-by: Francis Deslauriers Change-Id: I89924ee95b35ca62c103651ad44c9eaf71f6df53 Reviewed-on: https://review.lttng.org/c/babeltrace/+/1840 Reviewed-by: Philippe Proulx Tested-by: jenkins --- src/plugins/ctf/lttng-live/data-stream.c | 9 +++- src/plugins/ctf/lttng-live/lttng-live.c | 48 +++++++++++++++++++ src/plugins/ctf/lttng-live/lttng-live.h | 2 + .../ctf/lttng-live/viewer-connection.c | 1 + 4 files changed, 59 insertions(+), 1 deletion(-) diff --git a/src/plugins/ctf/lttng-live/data-stream.c b/src/plugins/ctf/lttng-live/data-stream.c index 1c89a87a..04496b81 100644 --- a/src/plugins/ctf/lttng-live/data-stream.c +++ b/src/plugins/ctf/lttng-live/data-stream.c @@ -58,12 +58,18 @@ enum bt_msg_iter_medium_status medop_request_bytes( uint64_t len_left; uint64_t read_len; + if (stream->has_stream_hung_up) { + status = BT_MSG_ITER_MEDIUM_STATUS_EOF; + goto end; + } + len_left = stream->base_offset + stream->len - stream->offset; if (!len_left) { stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA; status = BT_MSG_ITER_MEDIUM_STATUS_AGAIN; - return status; + goto end; } + read_len = MIN(request_sz, stream->buflen); read_len = MIN(read_len, len_left); status = lttng_live_get_stream_bytes(live_msg_iter, @@ -72,6 +78,7 @@ enum bt_msg_iter_medium_status medop_request_bytes( *buffer_addr = stream->buf; *buffer_sz = recv_len; stream->offset += recv_len; +end: return status; } diff --git a/src/plugins/ctf/lttng-live/lttng-live.c b/src/plugins/ctf/lttng-live/lttng-live.c index 40d76b9d..7760daf6 100644 --- a/src/plugins/ctf/lttng-live/lttng-live.c +++ b/src/plugins/ctf/lttng-live/lttng-live.c @@ -793,6 +793,34 @@ end: return ret; } +static +enum lttng_live_iterator_status lttng_live_iterator_close_stream( + struct lttng_live_msg_iter *lttng_live_msg_iter, + struct lttng_live_stream_iterator *stream_iter, + bt_message **curr_msg) +{ + enum lttng_live_iterator_status live_status = + LTTNG_LIVE_ITERATOR_STATUS_OK; + /* + * The viewer has hung up on us so we are closing the stream. The + * `bt_msg_iter` should simply realize that it needs to close the + * stream properly by emitting the necessary stream end message. + */ + enum bt_msg_iter_status status = + bt_msg_iter_get_next_message(stream_iter->msg_iter, + lttng_live_msg_iter->self_msg_iter, curr_msg); + + if (status == BT_MSG_ITER_STATUS_ERROR) { + live_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; + goto end; + } + + BT_ASSERT(status == BT_MSG_ITER_STATUS_OK); + +end: + return live_status; +} + /* * helper function: * handle_no_data_streams() @@ -851,6 +879,17 @@ enum lttng_live_iterator_status lttng_live_iterator_next_on_stream( bt_self_component *self_comp = lttng_live_msg_iter->self_comp; enum lttng_live_iterator_status live_status; + if (stream_iter->has_stream_hung_up) { + /* + * The stream has hung up and the stream was properly closed + * during the last call to the current function. Return _END + * status now so that this stream iterator is removed for the + * stream iterator list. + */ + live_status = LTTNG_LIVE_ITERATOR_STATUS_END; + goto end; + } + retry: print_stream_state(stream_iter); live_status = lttng_live_iterator_handle_new_streams_and_metadata( @@ -860,7 +899,16 @@ retry: } live_status = lttng_live_iterator_next_handle_one_no_data_stream( lttng_live_msg_iter, stream_iter); + if (live_status != LTTNG_LIVE_ITERATOR_STATUS_OK) { + if (live_status == LTTNG_LIVE_ITERATOR_STATUS_END) { + /* + * We overwrite `live_status` since `curr_msg` is + * likely set to a valid message in this function. + */ + live_status = lttng_live_iterator_close_stream( + lttng_live_msg_iter, stream_iter, curr_msg); + } goto end; } live_status = lttng_live_iterator_next_handle_one_quiescent_stream( diff --git a/src/plugins/ctf/lttng-live/lttng-live.h b/src/plugins/ctf/lttng-live/lttng-live.h index 7b72147f..967344dc 100644 --- a/src/plugins/ctf/lttng-live/lttng-live.h +++ b/src/plugins/ctf/lttng-live/lttng-live.h @@ -119,6 +119,8 @@ struct lttng_live_stream_iterator { /* Owned by this. */ GString *name; + + bool has_stream_hung_up; }; struct lttng_live_metadata { diff --git a/src/plugins/ctf/lttng-live/viewer-connection.c b/src/plugins/ctf/lttng-live/viewer-connection.c index 1ffba1cf..d96a2256 100644 --- a/src/plugins/ctf/lttng-live/viewer-connection.c +++ b/src/plugins/ctf/lttng-live/viewer-connection.c @@ -1217,6 +1217,7 @@ enum lttng_live_iterator_status lttng_live_get_next_index( index->offset = EOF; retstatus = LTTNG_LIVE_ITERATOR_STATUS_END; stream->state = LTTNG_LIVE_STREAM_EOF; + stream->has_stream_hung_up = true; break; case LTTNG_VIEWER_INDEX_ERR: BT_COMP_LOGE("get_next_index: error"); -- 2.34.1