Fix: `ctf` plugin: msg-iter.c: do not switch packet twice
authorPhilippe Proulx <eeppeliteloop@gmail.com>
Tue, 6 Aug 2019 16:49:54 +0000 (12:49 -0400)
committerPhilippe Proulx <eeppeliteloop@gmail.com>
Tue, 6 Aug 2019 21:39:18 +0000 (17:39 -0400)
Issue
=====
In read_packet_header_begin_state(), bt_msg_iter_switch_packet() is
called systematically at the beginning of the state. However,
buf_ensure_available_bits() can fail below (the medium can return
`BT_MSG_ITER_MEDIUM_STATUS_AGAIN`), and read_packet_header_begin_state()
will be called again next time, calling bt_msg_iter_switch_packet()
twice for the same packet.

One of the problems of calling bt_msg_iter_switch_packet() twice is
that, the first time, the current snapshots are copied to the previous
snapshots and the current snapshots are reset; the second time, the
current (reset) snapshots are copied to the previous snapshots. This
shatters the whole snapshot states. Because the medium installed by
`src.ctf.lttng-live` can return `BT_MSG_ITER_MEDIUM_STATUS_AGAIN`, all
the messages that depend on the previous snapshots, like the discarded
events/packets messages, are wrong.

Solution
========
In `msg-iter.c`, add a new `STATE_SWITCH_PACKET` state which is
dedicated to doing what bt_msg_iter_switch_packet() used to do. Once it
completes, the next state is `STATE_DSCOPE_TRACE_PACKET_HEADER_BEGIN`.

read_packet_header_begin_state() does not switch packets itself now (the
`STATE_SWITCH_PACKET` state already occured at this point), so it can
safely occur twice.

Known drawbacks
===============
There's one more state to handle per packet, but this have a significant
performance hit.

Signed-off-by: Philippe Proulx <eeppeliteloop@gmail.com>
Reported-by: Jonathan Rajotte <jonathan.rajotte-julien@efficios.com>
Change-Id: I24dd2482762c61e339ab0310fddee6c9aa69d438
Reviewed-on: https://review.lttng.org/c/babeltrace/+/1833
Tested-by: jenkins <jenkins@lttng.org>
src/plugins/ctf/common/msg-iter/msg-iter.c

index 7c4756716d3321f22980cefef5e2aa9b58f42032..87ff39af87a22a6bac395fea033dad0a4ececd5a 100644 (file)
@@ -80,6 +80,7 @@ struct stack {
 /* State */
 enum state {
        STATE_INIT,
+       STATE_SWITCH_PACKET,
        STATE_DSCOPE_TRACE_PACKET_HEADER_BEGIN,
        STATE_DSCOPE_TRACE_PACKET_HEADER_CONTINUE,
        STATE_AFTER_TRACE_PACKET_HEADER,
@@ -259,6 +260,8 @@ const char *state_string(enum state state)
        switch (state) {
        case STATE_INIT:
                return "INIT";
+       case STATE_SWITCH_PACKET:
+               return "SWITCH_PACKET";
        case STATE_DSCOPE_TRACE_PACKET_HEADER_BEGIN:
                return "DSCOPE_TRACE_PACKET_HEADER_BEGIN";
        case STATE_DSCOPE_TRACE_PACKET_HEADER_CONTINUE:
@@ -314,9 +317,6 @@ const char *state_string(enum state state)
        }
 }
 
-static
-int bt_msg_iter_switch_packet(struct bt_msg_iter *notit);
-
 static
 struct stack *stack_new(struct bt_msg_iter *notit)
 {
@@ -677,6 +677,71 @@ void release_all_dscopes(struct bt_msg_iter *notit)
        release_event_dscopes(notit);
 }
 
+static
+enum bt_msg_iter_status switch_packet_state(struct bt_msg_iter *notit)
+{
+       enum bt_msg_iter_status status = BT_MSG_ITER_STATUS_OK;
+
+       /*
+        * We don't put the stream class here because we need to make
+        * sure that all the packets processed by the same message
+        * iterator refer to the same stream class (the first one).
+        */
+       BT_ASSERT(notit);
+
+       if (notit->cur_exp_packet_total_size != -1) {
+               notit->cur_packet_offset += notit->cur_exp_packet_total_size;
+       }
+
+       BT_COMP_LOGD("Switching packet: notit-addr=%p, cur=%zu, "
+               "packet-offset=%" PRId64, notit, notit->buf.at,
+               notit->cur_packet_offset);
+       stack_clear(notit->stack);
+       notit->meta.ec = NULL;
+       BT_PACKET_PUT_REF_AND_RESET(notit->packet);
+       BT_MESSAGE_PUT_REF_AND_RESET(notit->event_msg);
+       release_all_dscopes(notit);
+       notit->cur_dscope_field = NULL;
+
+       /*
+        * Adjust current buffer so that addr points to the beginning of the new
+        * packet.
+        */
+       if (notit->buf.addr) {
+               size_t consumed_bytes = (size_t) (notit->buf.at / CHAR_BIT);
+
+               /* Packets are assumed to start on a byte frontier. */
+               if (notit->buf.at % CHAR_BIT) {
+                       BT_COMP_LOGW("Cannot switch packet: current position is not a multiple of 8: "
+                               "notit-addr=%p, cur=%zu", notit, notit->buf.at);
+                       status = BT_MSG_ITER_STATUS_ERROR;
+                       goto end;
+               }
+
+               notit->buf.addr += consumed_bytes;
+               notit->buf.sz -= consumed_bytes;
+               notit->buf.at = 0;
+               notit->buf.packet_offset = 0;
+               BT_COMP_LOGD("Adjusted buffer: addr=%p, size=%zu",
+                       notit->buf.addr, notit->buf.sz);
+       }
+
+       notit->cur_exp_packet_content_size = -1;
+       notit->cur_exp_packet_total_size = -1;
+       notit->cur_stream_class_id = -1;
+       notit->cur_event_class_id = -1;
+       notit->cur_data_stream_id = -1;
+       notit->prev_packet_snapshots = notit->snapshots;
+       notit->snapshots.discarded_events = UINT64_C(-1);
+       notit->snapshots.packets = UINT64_C(-1);
+       notit->snapshots.beginning_clock = UINT64_C(-1);
+       notit->snapshots.end_clock = UINT64_C(-1);
+       notit->state = STATE_DSCOPE_TRACE_PACKET_HEADER_BEGIN;
+
+end:
+       return status;
+}
+
 static
 enum bt_msg_iter_status read_packet_header_begin_state(
                struct bt_msg_iter *notit)
@@ -684,12 +749,6 @@ enum bt_msg_iter_status read_packet_header_begin_state(
        struct ctf_field_class *packet_header_fc = NULL;
        enum bt_msg_iter_status ret = BT_MSG_ITER_STATUS_OK;
 
-       if (bt_msg_iter_switch_packet(notit)) {
-               BT_COMP_LOGW("Cannot switch packet: notit-addr=%p", notit);
-               ret = BT_MSG_ITER_STATUS_ERROR;
-               goto end;
-       }
-
        /*
         * Make sure at least one bit is available for this packet. An
         * empty packet is impossible. If we reach the end of the medium
@@ -1420,11 +1479,12 @@ enum bt_msg_iter_status skip_packet_padding_state(struct bt_msg_iter *notit)
 {
        enum bt_msg_iter_status status = BT_MSG_ITER_STATUS_OK;
        size_t bits_to_skip;
+       const enum state next_state = STATE_SWITCH_PACKET;
 
        BT_ASSERT(notit->cur_exp_packet_total_size > 0);
        bits_to_skip = notit->cur_exp_packet_total_size - packet_at(notit);
        if (bits_to_skip == 0) {
-               notit->state = STATE_DSCOPE_TRACE_PACKET_HEADER_BEGIN;
+               notit->state = next_state;
                goto end;
        } else {
                size_t bits_to_consume;
@@ -1443,7 +1503,7 @@ enum bt_msg_iter_status skip_packet_padding_state(struct bt_msg_iter *notit)
                bits_to_skip = notit->cur_exp_packet_total_size -
                        packet_at(notit);
                if (bits_to_skip == 0) {
-                       notit->state = STATE_DSCOPE_TRACE_PACKET_HEADER_BEGIN;
+                       notit->state = next_state;
                        goto end;
                }
        }
@@ -1585,7 +1645,10 @@ enum bt_msg_iter_status handle_state(struct bt_msg_iter *notit)
        // TODO: optimalize!
        switch (state) {
        case STATE_INIT:
-               notit->state = STATE_DSCOPE_TRACE_PACKET_HEADER_BEGIN;
+               notit->state = STATE_SWITCH_PACKET;
+               break;
+       case STATE_SWITCH_PACKET:
+               status = switch_packet_state(notit);
                break;
        case STATE_DSCOPE_TRACE_PACKET_HEADER_BEGIN:
                status = read_packet_header_begin_state(notit);
@@ -1738,70 +1801,6 @@ void bt_msg_iter_reset(struct bt_msg_iter *notit)
        notit->prev_packet_snapshots.end_clock = UINT64_C(-1);
 }
 
-static
-int bt_msg_iter_switch_packet(struct bt_msg_iter *notit)
-{
-       int ret = 0;
-
-       /*
-        * We don't put the stream class here because we need to make
-        * sure that all the packets processed by the same message
-        * iterator refer to the same stream class (the first one).
-        */
-       BT_ASSERT(notit);
-
-       if (notit->cur_exp_packet_total_size != -1) {
-               notit->cur_packet_offset += notit->cur_exp_packet_total_size;
-       }
-
-       BT_COMP_LOGD("Switching packet: notit-addr=%p, cur=%zu, "
-               "packet-offset=%" PRId64, notit, notit->buf.at,
-               notit->cur_packet_offset);
-       stack_clear(notit->stack);
-       notit->meta.ec = NULL;
-       BT_PACKET_PUT_REF_AND_RESET(notit->packet);
-       BT_MESSAGE_PUT_REF_AND_RESET(notit->event_msg);
-       release_all_dscopes(notit);
-       notit->cur_dscope_field = NULL;
-
-       /*
-        * Adjust current buffer so that addr points to the beginning of the new
-        * packet.
-        */
-       if (notit->buf.addr) {
-               size_t consumed_bytes = (size_t) (notit->buf.at / CHAR_BIT);
-
-               /* Packets are assumed to start on a byte frontier. */
-               if (notit->buf.at % CHAR_BIT) {
-                       BT_COMP_LOGW("Cannot switch packet: current position is not a multiple of 8: "
-                               "notit-addr=%p, cur=%zu", notit, notit->buf.at);
-                       ret = -1;
-                       goto end;
-               }
-
-               notit->buf.addr += consumed_bytes;
-               notit->buf.sz -= consumed_bytes;
-               notit->buf.at = 0;
-               notit->buf.packet_offset = 0;
-               BT_COMP_LOGD("Adjusted buffer: addr=%p, size=%zu",
-                       notit->buf.addr, notit->buf.sz);
-       }
-
-       notit->cur_exp_packet_content_size = -1;
-       notit->cur_exp_packet_total_size = -1;
-       notit->cur_stream_class_id = -1;
-       notit->cur_event_class_id = -1;
-       notit->cur_data_stream_id = -1;
-       notit->prev_packet_snapshots = notit->snapshots;
-       notit->snapshots.discarded_events = UINT64_C(-1);
-       notit->snapshots.packets = UINT64_C(-1);
-       notit->snapshots.beginning_clock = UINT64_C(-1);
-       notit->snapshots.end_clock = UINT64_C(-1);
-
-end:
-       return ret;
-}
-
 static
 bt_field *borrow_next_field(struct bt_msg_iter *notit)
 {
@@ -2869,6 +2868,7 @@ enum bt_msg_iter_status read_packet_header_context_fields(
                         */
                        goto end;
                case STATE_INIT:
+               case STATE_SWITCH_PACKET:
                case STATE_DSCOPE_TRACE_PACKET_HEADER_BEGIN:
                case STATE_DSCOPE_TRACE_PACKET_HEADER_CONTINUE:
                case STATE_AFTER_TRACE_PACKET_HEADER:
This page took 0.028602 seconds and 4 git commands to generate.