X-Git-Url: https://git.efficios.com/?a=blobdiff_plain;f=src%2Fplugins%2Fctf%2Fcommon%2Fmsg-iter%2Fmsg-iter.c;h=a49462b85977810e9c3883a8ac4aff6a3d986b15;hb=901f0ce8eca409028109888889ddeafb08d7dd69;hp=fc617f3d366ccee0f8a9b8815a23b3222c432602;hpb=62988c56ccecb3f9b5415824dde056feda67986b;p=babeltrace.git diff --git a/src/plugins/ctf/common/msg-iter/msg-iter.c b/src/plugins/ctf/common/msg-iter/msg-iter.c index fc617f3d..a49462b8 100644 --- a/src/plugins/ctf/common/msg-iter/msg-iter.c +++ b/src/plugins/ctf/common/msg-iter/msg-iter.c @@ -108,6 +108,7 @@ enum state { STATE_EMIT_MSG_PACKET_END_MULTI, STATE_EMIT_MSG_PACKET_END_SINGLE, STATE_EMIT_QUEUED_MSG_PACKET_END, + STATE_CHECK_EMIT_MSG_STREAM_END, STATE_EMIT_MSG_STREAM_END, STATE_DONE, }; @@ -181,6 +182,14 @@ struct ctf_msg_iter { */ bool emit_stream_beginning_message; + /* + * True if we need to emit a stream end message at the end of the + * current stream. A live stream may never receive any data and thus + * never send a stream beginning message which removes the need to emit + * a stream end message. + */ + bool emit_stream_end_message; + /* Database of current dynamic scopes */ struct { bt_field *stream_packet_context; @@ -318,6 +327,8 @@ const char *state_string(enum state state) return "EMIT_MSG_PACKET_END_SINGLE"; case STATE_EMIT_QUEUED_MSG_PACKET_END: return "EMIT_QUEUED_MSG_PACKET_END"; + case STATE_CHECK_EMIT_MSG_STREAM_END: + return "CHECK_EMIT_MSG_STREAM_END"; case STATE_EMIT_MSG_STREAM_END: return "EMIT_MSG_STREAM_END"; case STATE_DONE: @@ -725,7 +736,7 @@ enum ctf_msg_iter_status switch_packet_state(struct ctf_msg_iter *msg_it) medium_status = msg_it->medium.medops.switch_packet(msg_it->medium.data); if (medium_status == CTF_MSG_ITER_MEDIUM_STATUS_EOF) { /* No more packets. */ - msg_it->state = STATE_EMIT_MSG_STREAM_END; + msg_it->state = STATE_CHECK_EMIT_MSG_STREAM_END; status = CTF_MSG_ITER_STATUS_OK; goto end; } else if (medium_status != CTF_MSG_ITER_MEDIUM_STATUS_OK) { @@ -803,7 +814,7 @@ enum ctf_msg_iter_status read_packet_header_begin_state( break; case CTF_MSG_ITER_STATUS_EOF: status = CTF_MSG_ITER_STATUS_OK; - msg_it->state = STATE_EMIT_MSG_STREAM_END; + msg_it->state = STATE_CHECK_EMIT_MSG_STREAM_END; goto end; default: goto end; @@ -1657,6 +1668,20 @@ end: return CTF_MSG_ITER_STATUS_OK; } +static inline +enum state check_emit_msg_stream_end(struct ctf_msg_iter *msg_it) +{ + enum state next_state; + + if (msg_it->emit_stream_end_message) { + next_state = STATE_EMIT_MSG_STREAM_END; + } else { + next_state = STATE_DONE; + } + + return next_state; +} + static inline enum ctf_msg_iter_status handle_state(struct ctf_msg_iter *msg_it) { @@ -1755,6 +1780,9 @@ enum ctf_msg_iter_status handle_state(struct ctf_msg_iter *msg_it) case STATE_EMIT_QUEUED_MSG_PACKET_END: msg_it->state = STATE_EMIT_MSG_PACKET_END_SINGLE; break; + case STATE_CHECK_EMIT_MSG_STREAM_END: + msg_it->state = check_emit_msg_stream_end(msg_it); + break; case STATE_EMIT_MSG_STREAM_END: msg_it->state = STATE_DONE; break; @@ -1817,6 +1845,7 @@ void ctf_msg_iter_reset(struct ctf_msg_iter *msg_it) msg_it->prev_packet_snapshots.beginning_clock = UINT64_C(-1); msg_it->prev_packet_snapshots.end_clock = UINT64_C(-1); msg_it->emit_stream_beginning_message = true; + msg_it->emit_stream_end_message = false; } static @@ -2947,6 +2976,7 @@ enum ctf_msg_iter_status ctf_msg_iter_get_next_message( /* create_msg_stream_beginning() logs errors */ *message = create_msg_stream_beginning(msg_it); msg_it->emit_stream_beginning_message = false; + msg_it->emit_stream_end_message = true; if (!*message) { status = CTF_MSG_ITER_STATUS_ERROR; @@ -2956,6 +2986,7 @@ enum ctf_msg_iter_status ctf_msg_iter_get_next_message( case STATE_EMIT_MSG_STREAM_END: /* create_msg_stream_end() logs errors */ *message = create_msg_stream_end(msg_it); + msg_it->emit_stream_end_message = false; if (!*message) { status = CTF_MSG_ITER_STATUS_ERROR;