src.ctf.fs: emit stream activity beginning/end messages
authorPhilippe Proulx <eeppeliteloop@gmail.com>
Wed, 13 Feb 2019 23:13:00 +0000 (18:13 -0500)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Fri, 3 May 2019 22:19:37 +0000 (18:19 -0400)
Emit the newly introduced stream activity beginning/end messages from
`src.ctf.fs`.

With CTF 1.8, there is no way to know the true beginning and end times
of a stream's activity. We cannot rely on the first packet's beginning
time and the last packet's end time because this is not what they mean:
they are, in fact, used to find the time ranges of the discarded events
and packets.

I'm adding (internal) bt_msg_iter_set_emit_stream_beginning_message()
and bt_msg_iter_set_emit_stream_end_message() which control whether or
not the stream and stream activity beginning messages, and the stream
and stream activity end messages are emitted. This is used to support
more easily and efficiently multiple files per data stream: beginning
messages are enabled for the first stream file and end messages are
enabled for the last stream file.

Signed-off-by: Philippe Proulx <eeppeliteloop@gmail.com>
plugins/ctf/common/msg-iter/msg-iter.c
plugins/ctf/common/msg-iter/msg-iter.h
plugins/ctf/fs-src/data-stream-file.c
plugins/ctf/fs-src/fs.c
plugins/ctf/fs-src/fs.h

index 91d06e9da78355f4c33778ba69790fb741149f75..907b299a7247fb3c62b4dc150c079f58f5c4030d 100644 (file)
@@ -80,8 +80,9 @@ enum state {
        STATE_DSCOPE_STREAM_PACKET_CONTEXT_BEGIN,
        STATE_DSCOPE_STREAM_PACKET_CONTEXT_CONTINUE,
        STATE_AFTER_STREAM_PACKET_CONTEXT,
-       STATE_EMIT_MSG_NEW_STREAM,
-       STATE_EMIT_MSG_NEW_PACKET,
+       STATE_EMIT_MSG_STREAM_BEGINNING,
+       STATE_EMIT_MSG_STREAM_ACTIVITY_BEGINNING,
+       STATE_EMIT_MSG_PACKET_BEGINNING,
        STATE_DSCOPE_EVENT_HEADER_BEGIN,
        STATE_DSCOPE_EVENT_HEADER_CONTINUE,
        STATE_AFTER_EVENT_HEADER,
@@ -92,9 +93,12 @@ enum state {
        STATE_DSCOPE_EVENT_PAYLOAD_BEGIN,
        STATE_DSCOPE_EVENT_PAYLOAD_CONTINUE,
        STATE_EMIT_MSG_EVENT,
-       STATE_EMIT_MSG_END_OF_PACKET,
-       STATE_DONE,
        STATE_SKIP_PACKET_PADDING,
+       STATE_EMIT_MSG_PACKET_END_MULTI,
+       STATE_EMIT_MSG_PACKET_END_SINGLE,
+       STATE_EMIT_MSG_STREAM_ACTIVITY_END,
+       STATE_EMIT_MSG_STREAM_END,
+       STATE_DONE,
 };
 
 /* CTF message iterator */
@@ -105,6 +109,17 @@ struct bt_msg_iter {
        /* Current message iterator to create messages (weak) */
        bt_self_message_iterator *msg_iter;
 
+       /*
+        * True to emit stream beginning and stream activity beginning
+        * messages.
+        */
+       bool emit_stream_begin_msg;
+
+       /*
+        * True to emit stream end and stream activity end messages.
+        */
+       bool emit_stream_end_msg;
+
        /*
         * Current dynamic scope field pointer.
         *
@@ -183,9 +198,6 @@ struct bt_msg_iter {
                void *data;
        } medium;
 
-       /* Stream beginning was emitted */
-       bool stream_begin_emitted;
-
        /* Current packet size (bits) (-1 if unknown) */
        int64_t cur_exp_packet_total_size;
 
@@ -240,10 +252,12 @@ const char *state_string(enum state state)
                return "STATE_DSCOPE_STREAM_PACKET_CONTEXT_CONTINUE";
        case STATE_AFTER_STREAM_PACKET_CONTEXT:
                return "STATE_AFTER_STREAM_PACKET_CONTEXT";
-       case STATE_EMIT_MSG_NEW_PACKET:
-               return "STATE_EMIT_MSG_NEW_PACKET";
-       case STATE_EMIT_MSG_NEW_STREAM:
-               return "STATE_EMIT_MSG_NEW_STREAM";
+       case STATE_EMIT_MSG_STREAM_BEGINNING:
+               return "STATE_EMIT_MSG_STREAM_BEGINNING";
+       case STATE_EMIT_MSG_STREAM_ACTIVITY_BEGINNING:
+               return "STATE_EMIT_MSG_STREAM_ACTIVITY_BEGINNING";
+       case STATE_EMIT_MSG_PACKET_BEGINNING:
+               return "STATE_EMIT_MSG_PACKET_BEGINNING";
        case STATE_DSCOPE_EVENT_HEADER_BEGIN:
                return "STATE_DSCOPE_EVENT_HEADER_BEGIN";
        case STATE_DSCOPE_EVENT_HEADER_CONTINUE:
@@ -264,12 +278,18 @@ const char *state_string(enum state state)
                return "STATE_DSCOPE_EVENT_PAYLOAD_CONTINUE";
        case STATE_EMIT_MSG_EVENT:
                return "STATE_EMIT_MSG_EVENT";
-       case STATE_EMIT_MSG_END_OF_PACKET:
-               return "STATE_EMIT_MSG_END_OF_PACKET";
-       case STATE_DONE:
-               return "STATE_DONE";
        case STATE_SKIP_PACKET_PADDING:
                return "STATE_SKIP_PACKET_PADDING";
+       case STATE_EMIT_MSG_PACKET_END_MULTI:
+               return "STATE_EMIT_MSG_PACKET_END_MULTI";
+       case STATE_EMIT_MSG_PACKET_END_SINGLE:
+               return "STATE_EMIT_MSG_PACKET_END_SINGLE";
+       case STATE_EMIT_MSG_STREAM_ACTIVITY_END:
+               return "STATE_EMIT_MSG_STREAM_ACTIVITY_END";
+       case STATE_EMIT_MSG_STREAM_END:
+               return "STATE_EMIT_MSG_STREAM_END";
+       case STATE_DONE:
+               return "STATE_DONE";
        default:
                return "(unknown)";
        }
@@ -642,6 +662,23 @@ enum bt_msg_iter_status read_packet_header_begin_state(
                goto end;
        }
 
+       /*
+        * Make sure at least one bit is available for this packet. An
+        * empty packet is impossible. If we reach the end of the medium
+        * at this point, then it's considered the end of the stream.
+        */
+       ret = buf_ensure_available_bits(notit);
+       switch (ret) {
+       case BT_MSG_ITER_STATUS_OK:
+               break;
+       case BT_MSG_ITER_STATUS_EOF:
+               ret = BT_MSG_ITER_STATUS_OK;
+               notit->state = STATE_EMIT_MSG_STREAM_ACTIVITY_END;
+               goto end;
+       default:
+               goto end;
+       }
+
        /* Packet header class is common to the whole trace class. */
        packet_header_fc = notit->meta.tc->packet_header_fc;
        if (!packet_header_fc) {
@@ -949,13 +986,13 @@ enum bt_msg_iter_status set_current_packet_content_sizes(
                "notit-addr=%p, packet-size=%" PRIu64 ", content-size=%" PRIu64,
                notit, notit->cur_exp_packet_total_size,
                notit->cur_exp_packet_content_size);
+
 end:
        return status;
 }
 
 static
-enum bt_msg_iter_status after_packet_context_state(
-               struct bt_msg_iter *notit)
+enum bt_msg_iter_status after_packet_context_state(struct bt_msg_iter *notit)
 {
        enum bt_msg_iter_status status;
 
@@ -964,10 +1001,15 @@ enum bt_msg_iter_status after_packet_context_state(
                goto end;
        }
 
-       if (notit->stream_begin_emitted) {
-               notit->state = STATE_EMIT_MSG_NEW_PACKET;
+       if (notit->stream) {
+               /*
+                * Stream exists, which means we already emitted at
+                * least one packet beginning message, so the initial
+                * stream beginning message was also emitted.
+                */
+               notit->state = STATE_EMIT_MSG_PACKET_BEGINNING;
        } else {
-               notit->state = STATE_EMIT_MSG_NEW_STREAM;
+               notit->state = STATE_EMIT_MSG_STREAM_BEGINNING;
        }
 
 end:
@@ -975,8 +1017,7 @@ end:
 }
 
 static
-enum bt_msg_iter_status read_event_header_begin_state(
-               struct bt_msg_iter *notit)
+enum bt_msg_iter_status read_event_header_begin_state(struct bt_msg_iter *notit)
 {
        enum bt_msg_iter_status status = BT_MSG_ITER_STATUS_OK;
        struct ctf_field_class *event_header_fc = NULL;
@@ -992,7 +1033,7 @@ enum bt_msg_iter_status read_event_header_begin_state(
                        /* No more events! */
                        BT_LOGV("Reached end of packet: notit-addr=%p, "
                                "cur=%zu", notit, packet_at(notit));
-                       notit->state = STATE_EMIT_MSG_END_OF_PACKET;
+                       notit->state = STATE_EMIT_MSG_PACKET_END_MULTI;
                        goto end;
                } else if (unlikely(packet_at(notit) >
                                notit->cur_exp_packet_content_size)) {
@@ -1011,22 +1052,14 @@ enum bt_msg_iter_status read_event_header_begin_state(
                 * nothing else for us.
                 */
                status = buf_ensure_available_bits(notit);
-               if (status != BT_MSG_ITER_STATUS_OK) {
-                       /*
-                        * If this function returns
-                        * `BT_MSG_ITER_STATUS_EOF`:
-                        *
-                        * 1. bt_msg_iter_get_next_message()
-                        *    emits a "packet end" message. This
-                        *    resets the current packet. The state
-                        *    remains unchanged otherwise.
-                        * 2. This function is called again. It returns
-                        *    `BT_MSG_ITER_STATUS_EOF` again.
-                        * 3. bt_msg_iter_get_next_message()
-                        *    emits a "stream end" message because
-                        *    there's no current packet. It sets the
-                        *    current state to `STATE_DONE`.
-                        */
+               switch (status) {
+               case BT_MSG_ITER_STATUS_OK:
+                       break;
+               case BT_MSG_ITER_STATUS_EOF:
+                       status = BT_MSG_ITER_STATUS_OK;
+                       notit->state = STATE_EMIT_MSG_PACKET_END_SINGLE;
+                       goto end;
+               default:
                        goto end;
                }
        }
@@ -1347,8 +1380,7 @@ enum bt_msg_iter_status read_event_payload_continue_state(
 }
 
 static
-enum bt_msg_iter_status skip_packet_padding_state(
-               struct bt_msg_iter *notit)
+enum bt_msg_iter_status skip_packet_padding_state(struct bt_msg_iter *notit)
 {
        enum bt_msg_iter_status status = BT_MSG_ITER_STATUS_OK;
        size_t bits_to_skip;
@@ -1416,10 +1448,13 @@ enum bt_msg_iter_status handle_state(struct bt_msg_iter *notit)
        case STATE_AFTER_STREAM_PACKET_CONTEXT:
                status = after_packet_context_state(notit);
                break;
-       case STATE_EMIT_MSG_NEW_STREAM:
-               notit->state = STATE_EMIT_MSG_NEW_PACKET;
+       case STATE_EMIT_MSG_STREAM_BEGINNING:
+               notit->state = STATE_EMIT_MSG_STREAM_ACTIVITY_BEGINNING;
+               break;
+       case STATE_EMIT_MSG_STREAM_ACTIVITY_BEGINNING:
+               notit->state = STATE_EMIT_MSG_PACKET_BEGINNING;
                break;
-       case STATE_EMIT_MSG_NEW_PACKET:
+       case STATE_EMIT_MSG_PACKET_BEGINNING:
                notit->state = STATE_DSCOPE_EVENT_HEADER_BEGIN;
                break;
        case STATE_DSCOPE_EVENT_HEADER_BEGIN:
@@ -1455,9 +1490,20 @@ enum bt_msg_iter_status handle_state(struct bt_msg_iter *notit)
        case STATE_SKIP_PACKET_PADDING:
                status = skip_packet_padding_state(notit);
                break;
-       case STATE_EMIT_MSG_END_OF_PACKET:
+       case STATE_EMIT_MSG_PACKET_END_MULTI:
                notit->state = STATE_SKIP_PACKET_PADDING;
                break;
+       case STATE_EMIT_MSG_PACKET_END_SINGLE:
+               notit->state = STATE_EMIT_MSG_STREAM_ACTIVITY_END;
+               break;
+       case STATE_EMIT_MSG_STREAM_ACTIVITY_END:
+               notit->state = STATE_EMIT_MSG_STREAM_END;
+               break;
+       case STATE_EMIT_MSG_STREAM_END:
+               notit->state = STATE_DONE;
+               break;
+       case STATE_DONE:
+               break;
        default:
                BT_LOGD("Unknown CTF plugin message iterator state: "
                        "notit-addr=%p, state=%d", notit, notit->state);
@@ -1505,7 +1551,8 @@ void bt_msg_iter_reset(struct bt_msg_iter *notit)
        notit->cur_stream_class_id = -1;
        notit->cur_event_class_id = -1;
        notit->cur_data_stream_id = -1;
-       notit->stream_begin_emitted = false;
+       notit->emit_stream_begin_msg = true;
+       notit->emit_stream_end_msg = true;
 }
 
 static
@@ -2167,9 +2214,7 @@ end:
 static
 void set_event_default_clock_snapshot(struct bt_msg_iter *notit)
 {
-       bt_event *event =
-               bt_message_event_borrow_event(
-                       notit->event_msg);
+       bt_event *event = bt_message_event_borrow_event(notit->event_msg);
        bt_stream_class *sc = notit->meta.sc->ir_sc;
 
        BT_ASSERT(event);
@@ -2181,18 +2226,11 @@ void set_event_default_clock_snapshot(struct bt_msg_iter *notit)
 }
 
 static
-void notify_new_stream(struct bt_msg_iter *notit,
+void create_msg_stream_beginning(struct bt_msg_iter *notit,
                bt_message **message)
 {
-       enum bt_msg_iter_status status;
        bt_message *ret = NULL;
 
-       status = set_current_stream(notit);
-       if (status != BT_MSG_ITER_STATUS_OK) {
-               BT_MESSAGE_PUT_REF_AND_RESET(ret);
-               goto end;
-       }
-
        BT_ASSERT(notit->stream);
        BT_ASSERT(notit->msg_iter);
        ret = bt_message_stream_beginning_create(notit->msg_iter,
@@ -2204,13 +2242,57 @@ void notify_new_stream(struct bt_msg_iter *notit,
                return;
        }
 
-end:
        *message = ret;
 }
 
 static
-void notify_end_of_stream(struct bt_msg_iter *notit,
+void create_msg_stream_activity_beginning(struct bt_msg_iter *notit,
+               bt_message **message)
+{
+       bt_message *ret = NULL;
+
+       BT_ASSERT(notit->stream);
+       BT_ASSERT(notit->msg_iter);
+       ret = bt_message_stream_activity_beginning_create(notit->msg_iter,
+               notit->stream);
+       if (!ret) {
+               BT_LOGE("Cannot create stream activity beginning message: "
+                       "notit-addr=%p, stream-addr=%p",
+                       notit, notit->stream);
+               return;
+       }
+
+       *message = ret;
+}
+
+static
+void create_msg_stream_activity_end(struct bt_msg_iter *notit,
                bt_message **message)
+{
+       bt_message *ret = NULL;
+
+       if (!notit->stream) {
+               BT_LOGE("Cannot create stream for stream message: "
+                       "notit-addr=%p", notit);
+               return;
+       }
+
+       BT_ASSERT(notit->stream);
+       BT_ASSERT(notit->msg_iter);
+       ret = bt_message_stream_activity_end_create(notit->msg_iter,
+               notit->stream);
+       if (!ret) {
+               BT_LOGE("Cannot create stream activity end message: "
+                       "notit-addr=%p, stream-addr=%p",
+                       notit, notit->stream);
+               return;
+       }
+
+       *message = ret;
+}
+
+static
+void create_msg_stream_end(struct bt_msg_iter *notit, bt_message **message)
 {
        bt_message *ret;
 
@@ -2224,16 +2306,17 @@ void notify_end_of_stream(struct bt_msg_iter *notit,
        ret = bt_message_stream_end_create(notit->msg_iter,
                notit->stream);
        if (!ret) {
-               BT_LOGE("Cannot create stream beginning message: "
+               BT_LOGE("Cannot create stream end message: "
                        "notit-addr=%p, stream-addr=%p",
                        notit, notit->stream);
                return;
        }
+
        *message = ret;
 }
 
 static
-void notify_new_packet(struct bt_msg_iter *notit,
+void create_msg_packet_beginning(struct bt_msg_iter *notit,
                bt_message **message)
 {
        int ret;
@@ -2310,8 +2393,7 @@ end:
 }
 
 static
-void notify_end_of_packet(struct bt_msg_iter *notit,
-               bt_message **message)
+void create_msg_packet_end(struct bt_msg_iter *notit, bt_message **message)
 {
        bt_message *msg;
 
@@ -2435,95 +2517,111 @@ void bt_msg_iter_destroy(struct bt_msg_iter *notit)
 
 enum bt_msg_iter_status bt_msg_iter_get_next_message(
                struct bt_msg_iter *notit,
-               bt_self_message_iterator *msg_iter,
-               bt_message **message)
+               bt_self_message_iterator *msg_iter, bt_message **message)
 {
        enum bt_msg_iter_status status = BT_MSG_ITER_STATUS_OK;
 
        BT_ASSERT(notit);
        BT_ASSERT(message);
-
-       if (notit->state == STATE_DONE) {
-               status = BT_MSG_ITER_STATUS_EOF;
-               goto end;
-       }
-
        notit->msg_iter = msg_iter;
-
        BT_LOGV("Getting next message: notit-addr=%p", notit);
 
        while (true) {
                status = handle_state(notit);
-               if (status == BT_MSG_ITER_STATUS_AGAIN) {
+               if (unlikely(status == BT_MSG_ITER_STATUS_AGAIN)) {
                        BT_LOGV_STR("Medium returned BT_MSG_ITER_STATUS_AGAIN.");
                        goto end;
-               }
-
-               if (status != BT_MSG_ITER_STATUS_OK) {
-                       if (status == BT_MSG_ITER_STATUS_EOF) {
-                               enum state next_state = notit->state;
-
-                               BT_LOGV_STR("Medium returned BT_MSG_ITER_STATUS_EOF.");
-
-                               if (notit->packet) {
-                                       notify_end_of_packet(notit,
-                                               message);
-                               } else {
-                                       notify_end_of_stream(notit,
-                                               message);
-                                       next_state = STATE_DONE;
-                               }
-
-                               if (!*message) {
-                                       status = BT_MSG_ITER_STATUS_ERROR;
-                                       goto end;
-                               }
-
-                               status = BT_MSG_ITER_STATUS_OK;
-                               notit->state = next_state;
-                       } else {
-                               BT_LOGW("Cannot handle state: "
-                                       "notit-addr=%p, state=%s",
-                                       notit, state_string(notit->state));
-                       }
-
+               } else if (unlikely(status != BT_MSG_ITER_STATUS_OK)) {
+                       BT_LOGW("Cannot handle state: notit-addr=%p, state=%s",
+                               notit, state_string(notit->state));
                        goto end;
                }
 
                switch (notit->state) {
-               case STATE_EMIT_MSG_NEW_STREAM:
-                       /* notify_new_stream() logs errors */
-                       notify_new_stream(notit, message);
+               case STATE_EMIT_MSG_EVENT:
+                       BT_ASSERT(notit->event_msg);
+                       set_event_default_clock_snapshot(notit);
+                       *message = notit->event_msg;
+                       notit->event_msg = NULL;
+                       goto end;
+               case STATE_EMIT_MSG_PACKET_BEGINNING:
+                       /* create_msg_packet_beginning() logs errors */
+                       create_msg_packet_beginning(notit, message);
 
                        if (!*message) {
                                status = BT_MSG_ITER_STATUS_ERROR;
                        }
 
-                       notit->stream_begin_emitted = true;
                        goto end;
-               case STATE_EMIT_MSG_NEW_PACKET:
-                       /* notify_new_packet() logs errors */
-                       notify_new_packet(notit, message);
+               case STATE_EMIT_MSG_PACKET_END_SINGLE:
+               case STATE_EMIT_MSG_PACKET_END_MULTI:
+                       /* create_msg_packet_end() logs errors */
+                       create_msg_packet_end(notit, message);
 
                        if (!*message) {
                                status = BT_MSG_ITER_STATUS_ERROR;
                        }
 
                        goto end;
-               case STATE_EMIT_MSG_EVENT:
-                       BT_ASSERT(notit->event_msg);
-                       set_event_default_clock_snapshot(notit);
-                       *message = notit->event_msg;
-                       notit->event_msg = NULL;
-                       goto end;
-               case STATE_EMIT_MSG_END_OF_PACKET:
-                       /* notify_end_of_packet() logs errors */
-                       notify_end_of_packet(notit, message);
+               case STATE_EMIT_MSG_STREAM_ACTIVITY_BEGINNING:
+                       if (notit->emit_stream_begin_msg) {
+                               /* create_msg_stream_activity_beginning() logs errors */
+                               create_msg_stream_activity_beginning(notit, message);
 
-                       if (!*message) {
-                               status = BT_MSG_ITER_STATUS_ERROR;
+                               if (!*message) {
+                                       status = BT_MSG_ITER_STATUS_ERROR;
+                               }
+
+                               goto end;
                        }
 
+                       break;
+               case STATE_EMIT_MSG_STREAM_ACTIVITY_END:
+                       if (notit->emit_stream_end_msg) {
+                               /* create_msg_stream_activity_end() logs errors */
+                               create_msg_stream_activity_end(notit, message);
+
+                               if (!*message) {
+                                       status = BT_MSG_ITER_STATUS_ERROR;
+                               }
+
+                               goto end;
+                       }
+
+                       break;
+               case STATE_EMIT_MSG_STREAM_BEGINNING:
+                       status = set_current_stream(notit);
+                       if (status != BT_MSG_ITER_STATUS_OK) {
+                               goto end;
+                       }
+
+                       if (notit->emit_stream_begin_msg) {
+                               /* create_msg_stream_beginning() logs errors */
+                               create_msg_stream_beginning(notit, message);
+
+                               if (!*message) {
+                                       status = BT_MSG_ITER_STATUS_ERROR;
+                               }
+
+                               goto end;
+                       }
+
+                       break;
+               case STATE_EMIT_MSG_STREAM_END:
+                       if (notit->emit_stream_end_msg) {
+                               /* create_msg_stream_end() logs errors */
+                               create_msg_stream_end(notit, message);
+
+                               if (!*message) {
+                                       status = BT_MSG_ITER_STATUS_ERROR;
+                               }
+
+                               goto end;
+                       }
+
+                       break;
+               case STATE_DONE:
+                       status = BT_MSG_ITER_STATUS_EOF;
                        goto end;
                default:
                        /* Non-emitting state: continue */
@@ -2544,49 +2642,44 @@ enum bt_msg_iter_status read_packet_header_context_fields(
 
        BT_ASSERT(notit);
 
-       if (notit->state == STATE_EMIT_MSG_NEW_PACKET) {
+       if (notit->state == STATE_EMIT_MSG_PACKET_BEGINNING) {
                /* We're already there */
                goto end;
        }
 
        while (true) {
                status = handle_state(notit);
-               if (status == BT_MSG_ITER_STATUS_AGAIN) {
+               if (unlikely(status == BT_MSG_ITER_STATUS_AGAIN)) {
                        BT_LOGV_STR("Medium returned BT_MSG_ITER_STATUS_AGAIN.");
                        goto end;
-               }
-               if (status != BT_MSG_ITER_STATUS_OK) {
-                       if (status == BT_MSG_ITER_STATUS_EOF) {
-                               BT_LOGV_STR("Medium returned BT_MSG_ITER_STATUS_EOF.");
-                       } else {
-                               BT_LOGW("Cannot handle state: "
-                                       "notit-addr=%p, state=%s",
-                                       notit, state_string(notit->state));
-                       }
+               } else if (unlikely(status != BT_MSG_ITER_STATUS_OK)) {
+                       BT_LOGW("Cannot handle state: notit-addr=%p, state=%s",
+                               notit, state_string(notit->state));
                        goto end;
                }
 
                switch (notit->state) {
-               case STATE_EMIT_MSG_NEW_PACKET:
+               case STATE_EMIT_MSG_PACKET_BEGINNING:
                        /*
                         * Packet header and context fields are
                         * potentially decoded (or they don't exist).
                         */
                        goto end;
                case STATE_INIT:
-               case STATE_EMIT_MSG_NEW_STREAM:
                case STATE_DSCOPE_TRACE_PACKET_HEADER_BEGIN:
                case STATE_DSCOPE_TRACE_PACKET_HEADER_CONTINUE:
                case STATE_AFTER_TRACE_PACKET_HEADER:
                case STATE_DSCOPE_STREAM_PACKET_CONTEXT_BEGIN:
                case STATE_DSCOPE_STREAM_PACKET_CONTEXT_CONTINUE:
                case STATE_AFTER_STREAM_PACKET_CONTEXT:
+               case STATE_EMIT_MSG_STREAM_BEGINNING:
+               case STATE_EMIT_MSG_STREAM_ACTIVITY_BEGINNING:
                        /* Non-emitting state: continue */
                        break;
                default:
                        /*
                         * We should never get past the
-                        * STATE_EMIT_MSG_NEW_PACKET state.
+                        * STATE_EMIT_MSG_PACKET_BEGINNING state.
                         */
                        BT_LOGF("Unexpected state: notit-addr=%p, state=%s",
                                notit, state_string(notit->state));
@@ -2613,8 +2706,8 @@ void bt_msg_iter_set_medops_data(struct bt_msg_iter *notit,
 }
 
 BT_HIDDEN
-enum bt_msg_iter_status bt_msg_iter_seek(
-               struct bt_msg_iter *notit, off_t offset)
+enum bt_msg_iter_status bt_msg_iter_seek(struct bt_msg_iter *notit,
+               off_t offset)
 {
        enum bt_msg_iter_status ret = BT_MSG_ITER_STATUS_OK;
        enum bt_msg_iter_medium_status medium_status;
@@ -2650,30 +2743,6 @@ end:
        return ret;
 }
 
-BT_HIDDEN
-off_t bt_msg_iter_get_current_packet_offset(struct bt_msg_iter *notit)
-{
-       BT_ASSERT(notit);
-       return notit->cur_packet_offset;
-}
-
-BT_HIDDEN
-off_t bt_msg_iter_get_current_packet_size(
-               struct bt_msg_iter *notit)
-{
-       BT_ASSERT(notit);
-       return notit->cur_exp_packet_total_size;
-}
-
-BT_HIDDEN
-void bt_msg_trace_class_changed(struct bt_msg_iter *notit)
-{
-       if (notit->meta.tc->stored_value_count > notit->stored_values->len) {
-               g_array_set_size(notit->stored_values,
-                       notit->meta.tc->stored_value_count);
-       }
-}
-
 BT_HIDDEN
 enum bt_msg_iter_status bt_msg_iter_get_packet_properties(
                struct bt_msg_iter *notit,
@@ -2688,10 +2757,8 @@ enum bt_msg_iter_status bt_msg_iter_get_packet_properties(
                goto end;
        }
 
-       props->exp_packet_total_size =
-               (uint64_t) notit->cur_exp_packet_total_size;
-       props->exp_packet_content_size =
-               (uint64_t) notit->cur_exp_packet_content_size;
+       props->exp_packet_total_size = notit->cur_exp_packet_total_size;
+       props->exp_packet_content_size = notit->cur_exp_packet_content_size;
        BT_ASSERT(props->stream_class_id >= 0);
        props->stream_class_id = (uint64_t) notit->cur_stream_class_id;
        props->data_stream_id = notit->cur_data_stream_id;
@@ -2703,3 +2770,17 @@ enum bt_msg_iter_status bt_msg_iter_get_packet_properties(
 end:
        return status;
 }
+
+BT_HIDDEN
+void bt_msg_iter_set_emit_stream_beginning_message(struct bt_msg_iter *notit,
+               bool val)
+{
+       notit->emit_stream_begin_msg = val;
+}
+
+BT_HIDDEN
+void bt_msg_iter_set_emit_stream_end_message(struct bt_msg_iter *notit,
+               bool val)
+{
+       notit->emit_stream_end_msg = val;
+}
index 269d6b28ee9cd293bcde5377de5de9ecd20e393b..b7830b90f1f957f44689c558bc045b3c22317faf 100644 (file)
@@ -3,7 +3,7 @@
 
 /*
  * Babeltrace - CTF message iterator
- *                  ¯¯¯¯¯        ¯¯¯¯
+ *
  * Copyright (c) 2015-2016 EfficiOS Inc. and Linux Foundation
  * Copyright (c) 2015-2016 Philippe Proulx <pproulx@efficios.com>
  *
@@ -38,7 +38,7 @@
  * @file ctf-msg-iter.h
  *
  * CTF message iterator
- *     ¯¯¯¯¯        ¯¯¯¯
+ *
  * This is a common internal API used by CTF source plugins. It allows
  * one to get messages from a user-provided medium.
  */
@@ -193,9 +193,8 @@ struct bt_msg_iter_medium_ops {
         * @param data          User data
         * @returns             Status code (see description above)
         */
-       enum bt_msg_iter_medium_status (* request_bytes)(
-                       size_t request_sz, uint8_t **buffer_addr,
-                       size_t *buffer_sz, void *data);
+       enum bt_msg_iter_medium_status (* request_bytes)(size_t request_sz,
+                       uint8_t **buffer_addr, size_t *buffer_sz, void *data);
 
        /**
         * Repositions the underlying stream's position.
@@ -228,8 +227,7 @@ struct bt_msg_iter_medium_ops {
         * @returns             Stream instance (weak reference) or
         *                      \c NULL on error
         */
-       bt_stream * (* borrow_stream)(
-                       bt_stream_class *stream_class,
+       bt_stream * (* borrow_stream)(bt_stream_class *stream_class,
                        int64_t stream_id, void *data);
 };
 
@@ -291,8 +289,8 @@ enum bt_msg_iter_status bt_msg_iter_get_next_message(
                bt_message **message);
 
 struct bt_msg_iter_packet_properties {
-       uint64_t exp_packet_total_size;
-       uint64_t exp_packet_content_size;
+       int64_t exp_packet_total_size;
+       int64_t exp_packet_content_size;
        uint64_t stream_class_id;
        int64_t data_stream_id;
 
@@ -317,34 +315,23 @@ BT_HIDDEN
 enum bt_msg_iter_status bt_msg_iter_seek(
                struct bt_msg_iter *notit, off_t offset);
 
-/*
- * Get the current packet's offset in bytes relative to the media's initial
- * position.
- */
-BT_HIDDEN
-off_t bt_msg_iter_get_current_packet_offset(
-               struct bt_msg_iter *notit);
-
-/* Get the current packet's size (in bits). */
-BT_HIDDEN
-off_t bt_msg_iter_get_current_packet_size(
-               struct bt_msg_iter *notit);
-
 /*
  * Resets the iterator so that the next requested medium bytes are
- * assumed to be the first bytes of a new stream. The first message
- * which this iterator emits after calling bt_msg_iter_reset() is a
- * BT_MESSAGE_TYPE_STREAM_BEGINNING one.
+ * assumed to be the first bytes of a new stream. Depending on
+ * bt_msg_iter_set_emit_stream_beginning_message(), the first message
+ * which this iterator emits after calling bt_msg_iter_reset() is of
+ * type `BT_MESSAGE_TYPE_STREAM_BEGINNING`.
  */
 BT_HIDDEN
 void bt_msg_iter_reset(struct bt_msg_iter *notit);
 
-/*
- * Notify the iterator that the trace class changed somehow (new
- * stream/event classes).
- */
 BT_HIDDEN
-void bt_msg_trace_class_changed(struct bt_msg_iter *notit);
+void bt_msg_iter_set_emit_stream_beginning_message(struct bt_msg_iter *notit,
+               bool val);
+
+BT_HIDDEN
+void bt_msg_iter_set_emit_stream_end_message(struct bt_msg_iter *notit,
+               bool val);
 
 static inline
 const char *bt_msg_iter_medium_status_string(
index 45e426bc64ab5b4682f1155d6714bbc327debdf4..bafa80de5a6378fd8c783aa99123e232d070e880 100644 (file)
@@ -173,8 +173,7 @@ end:
 }
 
 static
-bt_stream *medop_borrow_stream(
-               bt_stream_class *stream_class, int64_t stream_id,
+bt_stream *medop_borrow_stream(bt_stream_class *stream_class, int64_t stream_id,
                void *data)
 {
        struct ctf_fs_ds_file *ds_file = data;
@@ -199,9 +198,8 @@ end:
 }
 
 static
-enum bt_msg_iter_medium_status medop_seek(
-               enum bt_msg_iter_seek_whence whence, off_t offset,
-               void *data)
+enum bt_msg_iter_medium_status medop_seek(enum bt_msg_iter_seek_whence whence,
+               off_t offset, void *data)
 {
        enum bt_msg_iter_medium_status ret =
                        BT_MSG_ITER_MEDIUM_STATUS_OK;
@@ -539,6 +537,7 @@ struct ctf_fs_ds_index *build_index_from_stream_file(
        int ret;
        struct ctf_fs_ds_index *index = NULL;
        enum bt_msg_iter_status iter_status;
+       off_t current_packet_offset_bytes = 0;
 
        BT_LOGD("Indexing stream file %s", ds_file->file->path->str);
 
@@ -548,50 +547,57 @@ struct ctf_fs_ds_index *build_index_from_stream_file(
        }
 
        do {
-               off_t current_packet_offset;
-               off_t next_packet_offset;
                off_t current_packet_size_bytes;
                struct ctf_fs_ds_index_entry *entry;
                struct bt_msg_iter_packet_properties props;
 
-               iter_status = bt_msg_iter_get_packet_properties(
-                       ds_file->msg_iter, &props);
+               if (current_packet_offset_bytes < 0) {
+                       BT_LOGE_STR("Cannot get the current packet's offset.");
+                       goto error;
+               } else if (current_packet_offset_bytes > ds_file->file->size) {
+                       BT_LOGE_STR("Unexpected current packet's offset (larger than file).");
+                       goto error;
+               } else if (current_packet_offset_bytes == ds_file->file->size) {
+                       /* No more data */
+                       break;
+               }
+
+               iter_status = bt_msg_iter_seek(ds_file->msg_iter,
+                               current_packet_offset_bytes);
                if (iter_status != BT_MSG_ITER_STATUS_OK) {
-                       if (iter_status == BT_MSG_ITER_STATUS_EOF) {
-                               break;
-                       }
                        goto error;
                }
 
-               current_packet_offset =
-                       bt_msg_iter_get_current_packet_offset(
-                               ds_file->msg_iter);
-               if (current_packet_offset < 0) {
-                       BT_LOGE_STR("Cannot get the current packet's offset.");
+               iter_status = bt_msg_iter_get_packet_properties(
+                       ds_file->msg_iter, &props);
+               if (iter_status != BT_MSG_ITER_STATUS_OK) {
                        goto error;
                }
 
-               current_packet_size_bytes =
-                       ((props.exp_packet_total_size + 7) & ~7) / CHAR_BIT;
+               if (props.exp_packet_total_size >= 0) {
+                       current_packet_size_bytes =
+                               (uint64_t) props.exp_packet_total_size / 8;
+               } else {
+                       current_packet_size_bytes = ds_file->file->size;
+               }
 
-               if (current_packet_offset + current_packet_size_bytes >
+               if (current_packet_offset_bytes + current_packet_size_bytes >
                                ds_file->file->size) {
                        BT_LOGW("Invalid packet size reported in file: stream=\"%s\", "
                                        "packet-offset=%jd, packet-size-bytes=%jd, "
                                        "file-size=%jd",
                                        ds_file->file->path->str,
-                                       current_packet_offset,
+                                       current_packet_offset_bytes,
                                        current_packet_size_bytes,
                                        ds_file->file->size);
                        goto error;
                }
 
-               next_packet_offset = current_packet_offset +
-                       current_packet_size_bytes;
+               current_packet_offset_bytes += current_packet_size_bytes;
                BT_LOGD("Seeking to next packet: current-packet-offset=%jd, "
-                       "next-packet-offset=%jd", current_packet_offset,
-                       next_packet_offset);
-
+                       "next-packet-offset=%jd",
+                       current_packet_offset_bytes - current_packet_size_bytes,
+                       current_packet_offset_bytes);
                entry = ctf_fs_ds_index_add_new_entry(index);
                if (!entry) {
                        BT_LOGE_STR("Failed to allocate a new index entry.");
@@ -599,13 +605,10 @@ struct ctf_fs_ds_index *build_index_from_stream_file(
                }
 
                ret = init_index_entry(entry, ds_file, &props,
-                       current_packet_size_bytes, current_packet_offset);
+                       current_packet_size_bytes, current_packet_offset_bytes);
                if (ret) {
                        goto error;
                }
-
-               iter_status = bt_msg_iter_seek(ds_file->msg_iter,
-                               next_packet_offset);
        } while (iter_status == BT_MSG_ITER_STATUS_OK);
 
        if (iter_status != BT_MSG_ITER_STATUS_EOF) {
index 46086905259d8d9869249b60892a978b60eda76f..f9eb089d670b72da5e0bf8e439e4b3efb58c8a52 100644 (file)
@@ -87,105 +87,67 @@ void ctf_fs_msg_iter_data_destroy(
        g_free(msg_iter_data);
 }
 
+static
+void set_msg_iter_emits_stream_beginning_end_messages(
+               struct ctf_fs_msg_iter_data *msg_iter_data)
+{
+       bt_msg_iter_set_emit_stream_beginning_message(
+               msg_iter_data->ds_file->msg_iter,
+               msg_iter_data->ds_file_info_index == 0);
+       bt_msg_iter_set_emit_stream_end_message(
+               msg_iter_data->ds_file->msg_iter,
+               msg_iter_data->ds_file_info_index ==
+                       msg_iter_data->ds_file_group->ds_file_infos->len - 1);
+}
+
 static
 bt_self_message_iterator_status ctf_fs_iterator_next_one(
                struct ctf_fs_msg_iter_data *msg_iter_data,
-               const bt_message **msg)
+               const bt_message **out_msg)
 {
        bt_self_message_iterator_status status;
-       bt_message *priv_msg;
-       int ret;
 
        BT_ASSERT(msg_iter_data->ds_file);
-       status = ctf_fs_ds_file_next(msg_iter_data->ds_file, &priv_msg);
-       *msg = priv_msg;
 
-       if (status == BT_SELF_MESSAGE_ITERATOR_STATUS_OK &&
-                       bt_message_get_type(*msg) ==
-                       BT_MESSAGE_TYPE_STREAM_BEGINNING) {
-               if (msg_iter_data->skip_stream_begin_msgs) {
-                       /*
-                        * We already emitted a
-                        * BT_MESSAGE_TYPE_STREAM_BEGINNING
-                        * message: skip this one, get a new one.
-                        */
-                       BT_MESSAGE_PUT_REF_AND_RESET(*msg);
-                       status = ctf_fs_ds_file_next(msg_iter_data->ds_file,
-                               &priv_msg);
-                       *msg = priv_msg;
-                       BT_ASSERT(status != BT_SELF_MESSAGE_ITERATOR_STATUS_END);
-                       goto end;
-               } else {
-                       /*
-                        * First BT_MESSAGE_TYPE_STREAM_BEGINNING
-                        * message: skip all following.
-                        */
-                       msg_iter_data->skip_stream_begin_msgs = true;
+       while (true) {
+               bt_message *msg;
+
+               status = ctf_fs_ds_file_next(msg_iter_data->ds_file, &msg);
+               switch (status) {
+               case BT_SELF_MESSAGE_ITERATOR_STATUS_OK:
+                       *out_msg = msg;
+                       msg = NULL;
                        goto end;
-               }
-       }
+               case BT_SELF_MESSAGE_ITERATOR_STATUS_END:
+               {
+                       int ret;
+
+                       if (msg_iter_data->ds_file_info_index ==
+                                       msg_iter_data->ds_file_group->ds_file_infos->len - 1) {
+                               /* End of all group's stream files */
+                               goto end;
+                       }
 
-       if (status == BT_SELF_MESSAGE_ITERATOR_STATUS_OK &&
-                       bt_message_get_type(*msg) ==
-                       BT_MESSAGE_TYPE_STREAM_END) {
-               msg_iter_data->ds_file_info_index++;
+                       msg_iter_data->ds_file_info_index++;
+                       bt_msg_iter_reset(msg_iter_data->msg_iter);
+                       set_msg_iter_emits_stream_beginning_end_messages(
+                               msg_iter_data);
 
-               if (msg_iter_data->ds_file_info_index ==
-                               msg_iter_data->ds_file_group->ds_file_infos->len) {
                        /*
-                        * No more stream files to read: we reached the
-                        * real end. Emit this
-                        * BT_MESSAGE_TYPE_STREAM_END message.
-                        * The next time ctf_fs_iterator_next() is
-                        * called for this message iterator,
-                        * ctf_fs_ds_file_next() will return
-                        * BT_SELF_MESSAGE_ITERATOR_STATUS_END().
+                        * Open and start reading the next stream file
+                        * within our stream file group.
                         */
-                       goto end;
-               }
+                       ret = msg_iter_data_set_current_ds_file(msg_iter_data);
+                       if (ret) {
+                               status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
+                               goto end;
+                       }
 
-               BT_MESSAGE_PUT_REF_AND_RESET(*msg);
-               bt_msg_iter_reset(msg_iter_data->msg_iter);
-
-               /*
-                * Open and start reading the next stream file within
-                * our stream file group.
-                */
-               ret = msg_iter_data_set_current_ds_file(msg_iter_data);
-               if (ret) {
-                       status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
-                       goto end;
+                       /* Continue the loop to get the next message */
+                       break;
                }
-
-               status = ctf_fs_ds_file_next(msg_iter_data->ds_file, &priv_msg);
-               *msg = priv_msg;
-
-               /*
-                * If we get a message, we expect to get a
-                * BT_MESSAGE_TYPE_STREAM_BEGINNING message
-                * because the iterator's state machine emits one before
-                * even requesting the first block of data from the
-                * medium. Skip this message because we're not
-                * really starting a new stream here, and try getting a
-                * new message (which, if it works, is a
-                * BT_MESSAGE_TYPE_PACKET_BEGINNING one). We're sure to
-                * get at least one pair of
-                * BT_MESSAGE_TYPE_PACKET_BEGINNING and
-                * BT_MESSAGE_TYPE_PACKET_END messages in the
-                * case of a single, empty packet. We know there's at
-                * least one packet because the stream file group does
-                * not contain empty stream files.
-                */
-               BT_ASSERT(msg_iter_data->skip_stream_begin_msgs);
-
-               if (status == BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
-                       BT_ASSERT(bt_message_get_type(*msg) ==
-                               BT_MESSAGE_TYPE_STREAM_BEGINNING);
-                       BT_MESSAGE_PUT_REF_AND_RESET(*msg);
-                       status = ctf_fs_ds_file_next(msg_iter_data->ds_file,
-                               &priv_msg);
-                       *msg = priv_msg;
-                       BT_ASSERT(status != BT_SELF_MESSAGE_ITERATOR_STATUS_END);
+               default:
+                       goto end;
                }
        }
 
@@ -276,6 +238,7 @@ bt_self_message_iterator_status ctf_fs_iterator_init(
                goto error;
        }
 
+       set_msg_iter_emits_stream_beginning_end_messages(msg_iter_data);
        bt_self_message_iterator_set_data(self_msg_iter,
                msg_iter_data);
        if (ret != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
index 56fe4891931ceccfbda0879a0fe284b880457e43..d43885f09fbbbab0f528e315a405364ddce6b6e7 100644 (file)
@@ -149,9 +149,6 @@ struct ctf_fs_msg_iter_data {
 
        /* Owned by this */
        struct bt_msg_iter *msg_iter;
-
-       /* True to skip BT_MESSAGE_TYPE_STREAM_BEGINNING messages */
-       bool skip_stream_begin_msgs;
 };
 
 BT_HIDDEN
This page took 0.039223 seconds and 4 git commands to generate.