X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fplugins%2Fctf%2Fcommon%2Fmsg-iter%2Fmsg-iter.c;h=c4b98da03456c5a8e8c67eb2c5aa952ba914ba8c;hb=2c78269579ffc368585c5c7eae53fb4388b68d06;hp=0a16eed412fcfe3b2269a3d1248ddd3041446cdc;hpb=ebe4c684d397992e70455847890cc2fe13d2b79d;p=babeltrace.git diff --git a/src/plugins/ctf/common/msg-iter/msg-iter.c b/src/plugins/ctf/common/msg-iter/msg-iter.c index 0a16eed4..c4b98da0 100644 --- a/src/plugins/ctf/common/msg-iter/msg-iter.c +++ b/src/plugins/ctf/common/msg-iter/msg-iter.c @@ -87,7 +87,6 @@ enum state { STATE_DSCOPE_STREAM_PACKET_CONTEXT_BEGIN, STATE_DSCOPE_STREAM_PACKET_CONTEXT_CONTINUE, STATE_AFTER_STREAM_PACKET_CONTEXT, - STATE_CHECK_EMIT_MSG_STREAM_BEGINNING, STATE_EMIT_MSG_STREAM_BEGINNING, STATE_CHECK_EMIT_MSG_DISCARDED_EVENTS, STATE_CHECK_EMIT_MSG_DISCARDED_PACKETS, @@ -108,7 +107,6 @@ enum state { STATE_SKIP_PACKET_PADDING, STATE_EMIT_MSG_PACKET_END_MULTI, STATE_EMIT_MSG_PACKET_END_SINGLE, - STATE_CHECK_EMIT_MSG_STREAM_END, STATE_EMIT_QUEUED_MSG_PACKET_END, STATE_EMIT_MSG_STREAM_END, STATE_DONE, @@ -129,12 +127,6 @@ struct ctf_msg_iter { /* Current message iterator to create messages (weak) */ bt_self_message_iterator *self_msg_iter; - /* True to emit a stream beginning message. */ - bool emit_stream_begin_msg; - - /* True to emit a stream end message. */ - bool emit_stream_end_msg; - /* * True if library objects are unavailable during the decoding and * should not be created/used. @@ -708,7 +700,7 @@ void release_all_dscopes(struct ctf_msg_iter *msg_it) static enum ctf_msg_iter_status switch_packet_state(struct ctf_msg_iter *msg_it) { - enum ctf_msg_iter_status status = CTF_MSG_ITER_STATUS_OK; + enum ctf_msg_iter_status status; bt_self_component *self_comp = msg_it->self_comp; /* @@ -732,6 +724,30 @@ enum ctf_msg_iter_status switch_packet_state(struct ctf_msg_iter *msg_it) release_all_dscopes(msg_it); msg_it->cur_dscope_field = NULL; + if (msg_it->medium.medops.switch_packet) { + enum ctf_msg_iter_medium_status medium_status; + + medium_status = msg_it->medium.medops.switch_packet(msg_it->medium.data); + if (medium_status == CTF_MSG_ITER_MEDIUM_STATUS_EOF) { + /* No more packets. */ + msg_it->state = STATE_EMIT_MSG_STREAM_END; + status = CTF_MSG_ITER_STATUS_OK; + goto end; + } else if (medium_status != CTF_MSG_ITER_MEDIUM_STATUS_OK) { + status = (int) medium_status; + goto end; + } + + /* + * After the packet switch, the medium might want to give us a + * different buffer for the new packet. + */ + status = request_medium_bytes(msg_it); + if (status != CTF_MSG_ITER_STATUS_OK) { + goto end; + } + } + /* * Adjust current buffer so that addr points to the beginning of the new * packet. @@ -768,6 +784,7 @@ enum ctf_msg_iter_status switch_packet_state(struct ctf_msg_iter *msg_it) msg_it->snapshots.end_clock = UINT64_C(-1); msg_it->state = STATE_DSCOPE_TRACE_PACKET_HEADER_BEGIN; + status = CTF_MSG_ITER_STATUS_OK; end: return status; } @@ -791,7 +808,7 @@ enum ctf_msg_iter_status read_packet_header_begin_state( break; case CTF_MSG_ITER_STATUS_EOF: status = CTF_MSG_ITER_STATUS_OK; - msg_it->state = STATE_CHECK_EMIT_MSG_STREAM_END; + msg_it->state = STATE_EMIT_MSG_STREAM_END; goto end; default: goto end; @@ -1137,7 +1154,7 @@ enum ctf_msg_iter_status after_packet_context_state(struct ctf_msg_iter *msg_it) */ msg_it->state = STATE_CHECK_EMIT_MSG_DISCARDED_EVENTS; } else { - msg_it->state = STATE_CHECK_EMIT_MSG_STREAM_BEGINNING; + msg_it->state = STATE_EMIT_MSG_STREAM_BEGINNING; } end: @@ -1572,30 +1589,6 @@ end: return status; } -static -enum ctf_msg_iter_status check_emit_msg_stream_beginning_state( - struct ctf_msg_iter *msg_it) -{ - enum ctf_msg_iter_status status = CTF_MSG_ITER_STATUS_OK; - - if (msg_it->set_stream) { - status = set_current_stream(msg_it); - if (status != CTF_MSG_ITER_STATUS_OK) { - goto end; - } - } - - if (msg_it->emit_stream_begin_msg) { - msg_it->state = STATE_EMIT_MSG_STREAM_BEGINNING; - } else { - /* Stream's first packet */ - msg_it->state = STATE_CHECK_EMIT_MSG_DISCARDED_EVENTS; - } - -end: - return status; -} - static enum ctf_msg_iter_status check_emit_msg_discarded_events( struct ctf_msg_iter *msg_it) @@ -1680,19 +1673,6 @@ end: return CTF_MSG_ITER_STATUS_OK; } -static -enum ctf_msg_iter_status check_emit_msg_stream_end( - struct ctf_msg_iter *msg_it) -{ - if (msg_it->emit_stream_end_msg) { - msg_it->state = STATE_EMIT_MSG_STREAM_END; - } else { - msg_it->state = STATE_DONE; - } - - return CTF_MSG_ITER_STATUS_OK; -} - static inline enum ctf_msg_iter_status handle_state(struct ctf_msg_iter *msg_it) { @@ -1728,9 +1708,6 @@ enum ctf_msg_iter_status handle_state(struct ctf_msg_iter *msg_it) case STATE_AFTER_STREAM_PACKET_CONTEXT: status = after_packet_context_state(msg_it); break; - case STATE_CHECK_EMIT_MSG_STREAM_BEGINNING: - status = check_emit_msg_stream_beginning_state(msg_it); - break; case STATE_EMIT_MSG_STREAM_BEGINNING: msg_it->state = STATE_CHECK_EMIT_MSG_DISCARDED_EVENTS; break; @@ -1789,10 +1766,7 @@ enum ctf_msg_iter_status handle_state(struct ctf_msg_iter *msg_it) msg_it->state = STATE_SKIP_PACKET_PADDING; break; case STATE_EMIT_MSG_PACKET_END_SINGLE: - msg_it->state = STATE_CHECK_EMIT_MSG_STREAM_END; - break; - case STATE_CHECK_EMIT_MSG_STREAM_END: - status = check_emit_msg_stream_end(msg_it); + msg_it->state = STATE_EMIT_MSG_STREAM_END; break; case STATE_EMIT_QUEUED_MSG_PACKET_END: msg_it->state = STATE_EMIT_MSG_PACKET_END_SINGLE; @@ -2837,6 +2811,8 @@ struct ctf_msg_iter *ctf_msg_iter_create( BT_ASSERT(tc); BT_ASSERT(medops.request_bytes); BT_ASSERT(medops.borrow_stream); + BT_ASSERT(max_request_sz > 0); + BT_COMP_LOG_CUR_LVL(BT_LOG_DEBUG, log_level, self_comp, "Creating CTF plugin message iterator: " "trace-addr=%p, max-request-size=%zu, " @@ -3016,6 +2992,12 @@ enum ctf_msg_iter_status ctf_msg_iter_get_next_message( goto end; case STATE_EMIT_MSG_STREAM_BEGINNING: + BT_ASSERT(!msg_it->stream); + status = set_current_stream(msg_it); + if (status != CTF_MSG_ITER_STATUS_OK) { + goto end; + } + /* create_msg_stream_beginning() logs errors */ *message = create_msg_stream_beginning(msg_it); @@ -3086,7 +3068,6 @@ enum ctf_msg_iter_status decode_until_state( struct ctf_msg_iter *msg_it, case STATE_DSCOPE_STREAM_PACKET_CONTEXT_BEGIN: case STATE_DSCOPE_STREAM_PACKET_CONTEXT_CONTINUE: case STATE_AFTER_STREAM_PACKET_CONTEXT: - case STATE_CHECK_EMIT_MSG_STREAM_BEGINNING: case STATE_EMIT_MSG_STREAM_BEGINNING: case STATE_CHECK_EMIT_MSG_DISCARDED_EVENTS: case STATE_EMIT_MSG_DISCARDED_EVENTS: @@ -3108,7 +3089,6 @@ enum ctf_msg_iter_status decode_until_state( struct ctf_msg_iter *msg_it, case STATE_EMIT_MSG_PACKET_END_MULTI: case STATE_EMIT_MSG_PACKET_END_SINGLE: case STATE_EMIT_QUEUED_MSG_PACKET_END: - case STATE_CHECK_EMIT_MSG_STREAM_END: case STATE_EMIT_MSG_STREAM_END: break; case STATE_DONE: @@ -3147,14 +3127,6 @@ end: return status; } -BT_HIDDEN -void ctf_msg_iter_set_medops_data(struct ctf_msg_iter *msg_it, - void *medops_data) -{ - BT_ASSERT(msg_it); - msg_it->medium.data = medops_data; -} - BT_HIDDEN enum ctf_msg_iter_status ctf_msg_iter_seek(struct ctf_msg_iter *msg_it, off_t offset) @@ -3164,12 +3136,7 @@ enum ctf_msg_iter_status ctf_msg_iter_seek(struct ctf_msg_iter *msg_it, BT_ASSERT(msg_it); BT_ASSERT(offset >= 0); - - if (!msg_it->medium.medops.seek) { - status = CTF_MSG_ITER_STATUS_UNSUPPORTED; - BT_COMP_LOGD("Aborting seek as the iterator's underlying media does not implement seek support."); - goto end; - } + BT_ASSERT(msg_it->medium.medops.seek); medium_status = msg_it->medium.medops.seek(offset, msg_it->medium.data); if (medium_status != CTF_MSG_ITER_MEDIUM_STATUS_OK) { @@ -3251,20 +3218,6 @@ end: return status; } -BT_HIDDEN -void ctf_msg_iter_set_emit_stream_beginning_message(struct ctf_msg_iter *msg_it, - bool val) -{ - msg_it->emit_stream_begin_msg = val; -} - -BT_HIDDEN -void ctf_msg_iter_set_emit_stream_end_message(struct ctf_msg_iter *msg_it, - bool val) -{ - msg_it->emit_stream_end_msg = val; -} - BT_HIDDEN void ctf_msg_iter_set_dry_run(struct ctf_msg_iter *msg_it, bool val)