From 44269abb1a3769a1f76eab0de5b23cd7cb0cb58d Mon Sep 17 00:00:00 2001 From: Philippe Proulx Date: Tue, 6 Aug 2019 12:49:54 -0400 Subject: [PATCH] Fix: `ctf` plugin: msg-iter.c: do not switch packet twice Issue ===== In read_packet_header_begin_state(), bt_msg_iter_switch_packet() is called systematically at the beginning of the state. However, buf_ensure_available_bits() can fail below (the medium can return `BT_MSG_ITER_MEDIUM_STATUS_AGAIN`), and read_packet_header_begin_state() will be called again next time, calling bt_msg_iter_switch_packet() twice for the same packet. One of the problems of calling bt_msg_iter_switch_packet() twice is that, the first time, the current snapshots are copied to the previous snapshots and the current snapshots are reset; the second time, the current (reset) snapshots are copied to the previous snapshots. This shatters the whole snapshot states. Because the medium installed by `src.ctf.lttng-live` can return `BT_MSG_ITER_MEDIUM_STATUS_AGAIN`, all the messages that depend on the previous snapshots, like the discarded events/packets messages, are wrong. Solution ======== In `msg-iter.c`, add a new `STATE_SWITCH_PACKET` state which is dedicated to doing what bt_msg_iter_switch_packet() used to do. Once it completes, the next state is `STATE_DSCOPE_TRACE_PACKET_HEADER_BEGIN`. read_packet_header_begin_state() does not switch packets itself now (the `STATE_SWITCH_PACKET` state already occured at this point), so it can safely occur twice. Known drawbacks =============== There's one more state to handle per packet, but this have a significant performance hit. Signed-off-by: Philippe Proulx Reported-by: Jonathan Rajotte Change-Id: I24dd2482762c61e339ab0310fddee6c9aa69d438 Reviewed-on: https://review.lttng.org/c/babeltrace/+/1833 Tested-by: jenkins --- src/plugins/ctf/common/msg-iter/msg-iter.c | 152 ++++++++++----------- 1 file changed, 76 insertions(+), 76 deletions(-) diff --git a/src/plugins/ctf/common/msg-iter/msg-iter.c b/src/plugins/ctf/common/msg-iter/msg-iter.c index 7c475671..87ff39af 100644 --- a/src/plugins/ctf/common/msg-iter/msg-iter.c +++ b/src/plugins/ctf/common/msg-iter/msg-iter.c @@ -80,6 +80,7 @@ struct stack { /* State */ enum state { STATE_INIT, + STATE_SWITCH_PACKET, STATE_DSCOPE_TRACE_PACKET_HEADER_BEGIN, STATE_DSCOPE_TRACE_PACKET_HEADER_CONTINUE, STATE_AFTER_TRACE_PACKET_HEADER, @@ -259,6 +260,8 @@ const char *state_string(enum state state) switch (state) { case STATE_INIT: return "INIT"; + case STATE_SWITCH_PACKET: + return "SWITCH_PACKET"; case STATE_DSCOPE_TRACE_PACKET_HEADER_BEGIN: return "DSCOPE_TRACE_PACKET_HEADER_BEGIN"; case STATE_DSCOPE_TRACE_PACKET_HEADER_CONTINUE: @@ -314,9 +317,6 @@ const char *state_string(enum state state) } } -static -int bt_msg_iter_switch_packet(struct bt_msg_iter *notit); - static struct stack *stack_new(struct bt_msg_iter *notit) { @@ -677,6 +677,71 @@ void release_all_dscopes(struct bt_msg_iter *notit) release_event_dscopes(notit); } +static +enum bt_msg_iter_status switch_packet_state(struct bt_msg_iter *notit) +{ + enum bt_msg_iter_status status = BT_MSG_ITER_STATUS_OK; + + /* + * We don't put the stream class here because we need to make + * sure that all the packets processed by the same message + * iterator refer to the same stream class (the first one). + */ + BT_ASSERT(notit); + + if (notit->cur_exp_packet_total_size != -1) { + notit->cur_packet_offset += notit->cur_exp_packet_total_size; + } + + BT_COMP_LOGD("Switching packet: notit-addr=%p, cur=%zu, " + "packet-offset=%" PRId64, notit, notit->buf.at, + notit->cur_packet_offset); + stack_clear(notit->stack); + notit->meta.ec = NULL; + BT_PACKET_PUT_REF_AND_RESET(notit->packet); + BT_MESSAGE_PUT_REF_AND_RESET(notit->event_msg); + release_all_dscopes(notit); + notit->cur_dscope_field = NULL; + + /* + * Adjust current buffer so that addr points to the beginning of the new + * packet. + */ + if (notit->buf.addr) { + size_t consumed_bytes = (size_t) (notit->buf.at / CHAR_BIT); + + /* Packets are assumed to start on a byte frontier. */ + if (notit->buf.at % CHAR_BIT) { + BT_COMP_LOGW("Cannot switch packet: current position is not a multiple of 8: " + "notit-addr=%p, cur=%zu", notit, notit->buf.at); + status = BT_MSG_ITER_STATUS_ERROR; + goto end; + } + + notit->buf.addr += consumed_bytes; + notit->buf.sz -= consumed_bytes; + notit->buf.at = 0; + notit->buf.packet_offset = 0; + BT_COMP_LOGD("Adjusted buffer: addr=%p, size=%zu", + notit->buf.addr, notit->buf.sz); + } + + notit->cur_exp_packet_content_size = -1; + notit->cur_exp_packet_total_size = -1; + notit->cur_stream_class_id = -1; + notit->cur_event_class_id = -1; + notit->cur_data_stream_id = -1; + notit->prev_packet_snapshots = notit->snapshots; + notit->snapshots.discarded_events = UINT64_C(-1); + notit->snapshots.packets = UINT64_C(-1); + notit->snapshots.beginning_clock = UINT64_C(-1); + notit->snapshots.end_clock = UINT64_C(-1); + notit->state = STATE_DSCOPE_TRACE_PACKET_HEADER_BEGIN; + +end: + return status; +} + static enum bt_msg_iter_status read_packet_header_begin_state( struct bt_msg_iter *notit) @@ -684,12 +749,6 @@ enum bt_msg_iter_status read_packet_header_begin_state( struct ctf_field_class *packet_header_fc = NULL; enum bt_msg_iter_status ret = BT_MSG_ITER_STATUS_OK; - if (bt_msg_iter_switch_packet(notit)) { - BT_COMP_LOGW("Cannot switch packet: notit-addr=%p", notit); - ret = BT_MSG_ITER_STATUS_ERROR; - goto end; - } - /* * Make sure at least one bit is available for this packet. An * empty packet is impossible. If we reach the end of the medium @@ -1420,11 +1479,12 @@ enum bt_msg_iter_status skip_packet_padding_state(struct bt_msg_iter *notit) { enum bt_msg_iter_status status = BT_MSG_ITER_STATUS_OK; size_t bits_to_skip; + const enum state next_state = STATE_SWITCH_PACKET; BT_ASSERT(notit->cur_exp_packet_total_size > 0); bits_to_skip = notit->cur_exp_packet_total_size - packet_at(notit); if (bits_to_skip == 0) { - notit->state = STATE_DSCOPE_TRACE_PACKET_HEADER_BEGIN; + notit->state = next_state; goto end; } else { size_t bits_to_consume; @@ -1443,7 +1503,7 @@ enum bt_msg_iter_status skip_packet_padding_state(struct bt_msg_iter *notit) bits_to_skip = notit->cur_exp_packet_total_size - packet_at(notit); if (bits_to_skip == 0) { - notit->state = STATE_DSCOPE_TRACE_PACKET_HEADER_BEGIN; + notit->state = next_state; goto end; } } @@ -1585,7 +1645,10 @@ enum bt_msg_iter_status handle_state(struct bt_msg_iter *notit) // TODO: optimalize! switch (state) { case STATE_INIT: - notit->state = STATE_DSCOPE_TRACE_PACKET_HEADER_BEGIN; + notit->state = STATE_SWITCH_PACKET; + break; + case STATE_SWITCH_PACKET: + status = switch_packet_state(notit); break; case STATE_DSCOPE_TRACE_PACKET_HEADER_BEGIN: status = read_packet_header_begin_state(notit); @@ -1738,70 +1801,6 @@ void bt_msg_iter_reset(struct bt_msg_iter *notit) notit->prev_packet_snapshots.end_clock = UINT64_C(-1); } -static -int bt_msg_iter_switch_packet(struct bt_msg_iter *notit) -{ - int ret = 0; - - /* - * We don't put the stream class here because we need to make - * sure that all the packets processed by the same message - * iterator refer to the same stream class (the first one). - */ - BT_ASSERT(notit); - - if (notit->cur_exp_packet_total_size != -1) { - notit->cur_packet_offset += notit->cur_exp_packet_total_size; - } - - BT_COMP_LOGD("Switching packet: notit-addr=%p, cur=%zu, " - "packet-offset=%" PRId64, notit, notit->buf.at, - notit->cur_packet_offset); - stack_clear(notit->stack); - notit->meta.ec = NULL; - BT_PACKET_PUT_REF_AND_RESET(notit->packet); - BT_MESSAGE_PUT_REF_AND_RESET(notit->event_msg); - release_all_dscopes(notit); - notit->cur_dscope_field = NULL; - - /* - * Adjust current buffer so that addr points to the beginning of the new - * packet. - */ - if (notit->buf.addr) { - size_t consumed_bytes = (size_t) (notit->buf.at / CHAR_BIT); - - /* Packets are assumed to start on a byte frontier. */ - if (notit->buf.at % CHAR_BIT) { - BT_COMP_LOGW("Cannot switch packet: current position is not a multiple of 8: " - "notit-addr=%p, cur=%zu", notit, notit->buf.at); - ret = -1; - goto end; - } - - notit->buf.addr += consumed_bytes; - notit->buf.sz -= consumed_bytes; - notit->buf.at = 0; - notit->buf.packet_offset = 0; - BT_COMP_LOGD("Adjusted buffer: addr=%p, size=%zu", - notit->buf.addr, notit->buf.sz); - } - - notit->cur_exp_packet_content_size = -1; - notit->cur_exp_packet_total_size = -1; - notit->cur_stream_class_id = -1; - notit->cur_event_class_id = -1; - notit->cur_data_stream_id = -1; - notit->prev_packet_snapshots = notit->snapshots; - notit->snapshots.discarded_events = UINT64_C(-1); - notit->snapshots.packets = UINT64_C(-1); - notit->snapshots.beginning_clock = UINT64_C(-1); - notit->snapshots.end_clock = UINT64_C(-1); - -end: - return ret; -} - static bt_field *borrow_next_field(struct bt_msg_iter *notit) { @@ -2869,6 +2868,7 @@ enum bt_msg_iter_status read_packet_header_context_fields( */ goto end; case STATE_INIT: + case STATE_SWITCH_PACKET: case STATE_DSCOPE_TRACE_PACKET_HEADER_BEGIN: case STATE_DSCOPE_TRACE_PACKET_HEADER_CONTINUE: case STATE_AFTER_TRACE_PACKET_HEADER: -- 2.34.1