From: Francis Deslauriers Date: Thu, 19 Dec 2019 21:39:45 +0000 (-0500) Subject: Fix: src.ctf.lttng-live: emitting stream end msg with no stream X-Git-Tag: v2.0.0~11 X-Git-Url: http://git.efficios.com/?p=babeltrace.git;a=commitdiff_plain;h=901f0ce8eca409028109888889ddeafb08d7dd69 Fix: src.ctf.lttng-live: emitting stream end msg with no stream Background ========== When a stream hangs up on the `src.ctf.lttng-live` component, we make sure we send a stream end message to ensure we honor The Contract which states that any stream beginning must eventually be followed by its stream end counterpart. We do this by calling `ctf_msg_iter_get_next_message()` one last time to emit any missing messages. Using the upcoming lttng clear feature in conjunction with a per-pid session makes it highly likely that a live stream hangs up on the `src.ctf.lttng-live` component between the moment we learn about it and the moment we first ask for its live index. In such event, the live stream iterator and its `ctf_msg_it` are both created but the corresponding stream is uninitialized. When the component realized that a live stream has hung up, it calls `ctf_msg_iter_get_next_message()` to respect The Contract but then errors out here: CAUSED BY [lttng-live: 'source.ctf.lttng-live'] (msg-iter.c:2474) Cannot create stream end message because stream is NULL: msg-it-addr=0x555fba864010 The `stream` field is null because we never got the chance to received any index for this stream. Issue ===== It's possible for a `ctf_msg` state machine to pass by the `STATE_EMIT_MSG_STREAM_END` state without having passed by the `STATE_EMIT_MSG_STREAM_BEGINNING` state. Solution ======== Keep track of the fact that we sent a stream beginning message downstream and that we need to send its respective stream end message. If no message were send for a particular stream, we can omit sending a stream end message. Signed-off-by: Francis Deslauriers Change-Id: If7f52f43162e7263785713c01c226907fe475d94 Reviewed-on: https://review.lttng.org/c/babeltrace/+/2719 CI-Build: Simon Marchi Tested-by: jenkins Reviewed-by: Simon Marchi --- diff --git a/src/plugins/ctf/common/msg-iter/msg-iter.c b/src/plugins/ctf/common/msg-iter/msg-iter.c index fc617f3d..a49462b8 100644 --- a/src/plugins/ctf/common/msg-iter/msg-iter.c +++ b/src/plugins/ctf/common/msg-iter/msg-iter.c @@ -108,6 +108,7 @@ enum state { STATE_EMIT_MSG_PACKET_END_MULTI, STATE_EMIT_MSG_PACKET_END_SINGLE, STATE_EMIT_QUEUED_MSG_PACKET_END, + STATE_CHECK_EMIT_MSG_STREAM_END, STATE_EMIT_MSG_STREAM_END, STATE_DONE, }; @@ -181,6 +182,14 @@ struct ctf_msg_iter { */ bool emit_stream_beginning_message; + /* + * True if we need to emit a stream end message at the end of the + * current stream. A live stream may never receive any data and thus + * never send a stream beginning message which removes the need to emit + * a stream end message. + */ + bool emit_stream_end_message; + /* Database of current dynamic scopes */ struct { bt_field *stream_packet_context; @@ -318,6 +327,8 @@ const char *state_string(enum state state) return "EMIT_MSG_PACKET_END_SINGLE"; case STATE_EMIT_QUEUED_MSG_PACKET_END: return "EMIT_QUEUED_MSG_PACKET_END"; + case STATE_CHECK_EMIT_MSG_STREAM_END: + return "CHECK_EMIT_MSG_STREAM_END"; case STATE_EMIT_MSG_STREAM_END: return "EMIT_MSG_STREAM_END"; case STATE_DONE: @@ -725,7 +736,7 @@ enum ctf_msg_iter_status switch_packet_state(struct ctf_msg_iter *msg_it) medium_status = msg_it->medium.medops.switch_packet(msg_it->medium.data); if (medium_status == CTF_MSG_ITER_MEDIUM_STATUS_EOF) { /* No more packets. */ - msg_it->state = STATE_EMIT_MSG_STREAM_END; + msg_it->state = STATE_CHECK_EMIT_MSG_STREAM_END; status = CTF_MSG_ITER_STATUS_OK; goto end; } else if (medium_status != CTF_MSG_ITER_MEDIUM_STATUS_OK) { @@ -803,7 +814,7 @@ enum ctf_msg_iter_status read_packet_header_begin_state( break; case CTF_MSG_ITER_STATUS_EOF: status = CTF_MSG_ITER_STATUS_OK; - msg_it->state = STATE_EMIT_MSG_STREAM_END; + msg_it->state = STATE_CHECK_EMIT_MSG_STREAM_END; goto end; default: goto end; @@ -1657,6 +1668,20 @@ end: return CTF_MSG_ITER_STATUS_OK; } +static inline +enum state check_emit_msg_stream_end(struct ctf_msg_iter *msg_it) +{ + enum state next_state; + + if (msg_it->emit_stream_end_message) { + next_state = STATE_EMIT_MSG_STREAM_END; + } else { + next_state = STATE_DONE; + } + + return next_state; +} + static inline enum ctf_msg_iter_status handle_state(struct ctf_msg_iter *msg_it) { @@ -1755,6 +1780,9 @@ enum ctf_msg_iter_status handle_state(struct ctf_msg_iter *msg_it) case STATE_EMIT_QUEUED_MSG_PACKET_END: msg_it->state = STATE_EMIT_MSG_PACKET_END_SINGLE; break; + case STATE_CHECK_EMIT_MSG_STREAM_END: + msg_it->state = check_emit_msg_stream_end(msg_it); + break; case STATE_EMIT_MSG_STREAM_END: msg_it->state = STATE_DONE; break; @@ -1817,6 +1845,7 @@ void ctf_msg_iter_reset(struct ctf_msg_iter *msg_it) msg_it->prev_packet_snapshots.beginning_clock = UINT64_C(-1); msg_it->prev_packet_snapshots.end_clock = UINT64_C(-1); msg_it->emit_stream_beginning_message = true; + msg_it->emit_stream_end_message = false; } static @@ -2947,6 +2976,7 @@ enum ctf_msg_iter_status ctf_msg_iter_get_next_message( /* create_msg_stream_beginning() logs errors */ *message = create_msg_stream_beginning(msg_it); msg_it->emit_stream_beginning_message = false; + msg_it->emit_stream_end_message = true; if (!*message) { status = CTF_MSG_ITER_STATUS_ERROR; @@ -2956,6 +2986,7 @@ enum ctf_msg_iter_status ctf_msg_iter_get_next_message( case STATE_EMIT_MSG_STREAM_END: /* create_msg_stream_end() logs errors */ *message = create_msg_stream_end(msg_it); + msg_it->emit_stream_end_message = false; if (!*message) { status = CTF_MSG_ITER_STATUS_ERROR; diff --git a/src/plugins/ctf/lttng-live/lttng-live.c b/src/plugins/ctf/lttng-live/lttng-live.c index 35e1037f..bef21b09 100644 --- a/src/plugins/ctf/lttng-live/lttng-live.c +++ b/src/plugins/ctf/lttng-live/lttng-live.c @@ -873,6 +873,10 @@ enum lttng_live_iterator_status lttng_live_iterator_close_stream( "Error getting the next message from CTF message iterator"); live_status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; goto end; + } else if (status == CTF_MSG_ITER_STATUS_EOF) { + BT_COMP_LOGI("Reached the end of the live stream iterator."); + live_status = LTTNG_LIVE_ITERATOR_STATUS_END; + goto end; } BT_ASSERT(status == CTF_MSG_ITER_STATUS_OK);