X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=plugins%2Fctf%2Fcommon%2Fmsg-iter%2Fmsg-iter.c;h=0671d9c3344bfb4c275f4552cbc51d0a69414a7d;hb=68b66a256a54d32992dfefeaad11eea88b7df234;hp=91d06e9da78355f4c33778ba69790fb741149f75;hpb=83ebb7f1751a20c7ba771442487f863331c754ef;p=babeltrace.git diff --git a/plugins/ctf/common/msg-iter/msg-iter.c b/plugins/ctf/common/msg-iter/msg-iter.c index 91d06e9d..0671d9c3 100644 --- a/plugins/ctf/common/msg-iter/msg-iter.c +++ b/plugins/ctf/common/msg-iter/msg-iter.c @@ -31,10 +31,10 @@ #include #include #include -#include +#include #include -#include -#include +#include +#include #include #include @@ -80,8 +80,14 @@ enum state { STATE_DSCOPE_STREAM_PACKET_CONTEXT_BEGIN, STATE_DSCOPE_STREAM_PACKET_CONTEXT_CONTINUE, STATE_AFTER_STREAM_PACKET_CONTEXT, - STATE_EMIT_MSG_NEW_STREAM, - STATE_EMIT_MSG_NEW_PACKET, + STATE_CHECK_EMIT_MSG_STREAM_BEGINNING, + STATE_EMIT_MSG_STREAM_BEGINNING, + STATE_EMIT_MSG_STREAM_ACTIVITY_BEGINNING, + STATE_CHECK_EMIT_MSG_DISCARDED_EVENTS, + STATE_CHECK_EMIT_MSG_DISCARDED_PACKETS, + STATE_EMIT_MSG_DISCARDED_EVENTS, + STATE_EMIT_MSG_DISCARDED_PACKETS, + STATE_EMIT_MSG_PACKET_BEGINNING, STATE_DSCOPE_EVENT_HEADER_BEGIN, STATE_DSCOPE_EVENT_HEADER_CONTINUE, STATE_AFTER_EVENT_HEADER, @@ -92,9 +98,20 @@ enum state { STATE_DSCOPE_EVENT_PAYLOAD_BEGIN, STATE_DSCOPE_EVENT_PAYLOAD_CONTINUE, STATE_EMIT_MSG_EVENT, - STATE_EMIT_MSG_END_OF_PACKET, - STATE_DONE, STATE_SKIP_PACKET_PADDING, + STATE_EMIT_MSG_PACKET_END_MULTI, + STATE_EMIT_MSG_PACKET_END_SINGLE, + STATE_CHECK_EMIT_MSG_STREAM_ACTIVITY_END, + STATE_EMIT_MSG_STREAM_ACTIVITY_END, + STATE_EMIT_MSG_STREAM_END, + STATE_DONE, +}; + +struct end_of_packet_snapshots { + uint64_t discarded_events; + uint64_t packets; + uint64_t beginning_clock; + uint64_t end_clock; }; /* CTF message iterator */ @@ -105,6 +122,18 @@ struct bt_msg_iter { /* Current message iterator to create messages (weak) */ bt_self_message_iterator *msg_iter; + /* + * True to emit stream beginning and stream activity beginning + * messages. + */ + bool emit_stream_begin_msg; + + /* True to emit stream end and stream activity end messages */ + bool emit_stream_end_msg; + + /* True to set the stream */ + bool set_stream; + /* * Current dynamic scope field pointer. * @@ -183,9 +212,6 @@ struct bt_msg_iter { void *data; } medium; - /* Stream beginning was emitted */ - bool stream_begin_emitted; - /* Current packet size (bits) (-1 if unknown) */ int64_t cur_exp_packet_total_size; @@ -210,13 +236,11 @@ struct bt_msg_iter { /* Default clock's current value */ uint64_t default_clock_snapshot; - /* End of packet snapshots */ - struct { - uint64_t discarded_events; - uint64_t packets; - uint64_t beginning_clock; - uint64_t end_clock; - } snapshots; + /* End of current packet snapshots */ + struct end_of_packet_snapshots snapshots; + + /* End of previous packet snapshots */ + struct end_of_packet_snapshots prev_packet_snapshots; /* Stored values (for sequence lengths, variant tags) */ GArray *stored_values; @@ -240,10 +264,16 @@ const char *state_string(enum state state) return "STATE_DSCOPE_STREAM_PACKET_CONTEXT_CONTINUE"; case STATE_AFTER_STREAM_PACKET_CONTEXT: return "STATE_AFTER_STREAM_PACKET_CONTEXT"; - case STATE_EMIT_MSG_NEW_PACKET: - return "STATE_EMIT_MSG_NEW_PACKET"; - case STATE_EMIT_MSG_NEW_STREAM: - return "STATE_EMIT_MSG_NEW_STREAM"; + case STATE_EMIT_MSG_STREAM_BEGINNING: + return "STATE_EMIT_MSG_STREAM_BEGINNING"; + case STATE_EMIT_MSG_STREAM_ACTIVITY_BEGINNING: + return "STATE_EMIT_MSG_STREAM_ACTIVITY_BEGINNING"; + case STATE_EMIT_MSG_PACKET_BEGINNING: + return "STATE_EMIT_MSG_PACKET_BEGINNING"; + case STATE_EMIT_MSG_DISCARDED_EVENTS: + return "STATE_EMIT_MSG_DISCARDED_EVENTS"; + case STATE_EMIT_MSG_DISCARDED_PACKETS: + return "STATE_EMIT_MSG_DISCARDED_PACKETS"; case STATE_DSCOPE_EVENT_HEADER_BEGIN: return "STATE_DSCOPE_EVENT_HEADER_BEGIN"; case STATE_DSCOPE_EVENT_HEADER_CONTINUE: @@ -264,12 +294,18 @@ const char *state_string(enum state state) return "STATE_DSCOPE_EVENT_PAYLOAD_CONTINUE"; case STATE_EMIT_MSG_EVENT: return "STATE_EMIT_MSG_EVENT"; - case STATE_EMIT_MSG_END_OF_PACKET: - return "STATE_EMIT_MSG_END_OF_PACKET"; - case STATE_DONE: - return "STATE_DONE"; case STATE_SKIP_PACKET_PADDING: return "STATE_SKIP_PACKET_PADDING"; + case STATE_EMIT_MSG_PACKET_END_MULTI: + return "STATE_EMIT_MSG_PACKET_END_MULTI"; + case STATE_EMIT_MSG_PACKET_END_SINGLE: + return "STATE_EMIT_MSG_PACKET_END_SINGLE"; + case STATE_EMIT_MSG_STREAM_ACTIVITY_END: + return "STATE_EMIT_MSG_STREAM_ACTIVITY_END"; + case STATE_EMIT_MSG_STREAM_END: + return "STATE_EMIT_MSG_STREAM_END"; + case STATE_DONE: + return "STATE_DONE"; default: return "(unknown)"; } @@ -642,6 +678,23 @@ enum bt_msg_iter_status read_packet_header_begin_state( goto end; } + /* + * Make sure at least one bit is available for this packet. An + * empty packet is impossible. If we reach the end of the medium + * at this point, then it's considered the end of the stream. + */ + ret = buf_ensure_available_bits(notit); + switch (ret) { + case BT_MSG_ITER_STATUS_OK: + break; + case BT_MSG_ITER_STATUS_EOF: + ret = BT_MSG_ITER_STATUS_OK; + notit->state = STATE_CHECK_EMIT_MSG_STREAM_ACTIVITY_END; + goto end; + default: + goto end; + } + /* Packet header class is common to the whole trace class. */ packet_header_fc = notit->meta.tc->packet_header_fc; if (!packet_header_fc) { @@ -916,14 +969,8 @@ enum bt_msg_iter_status set_current_packet_content_sizes( if (notit->cur_exp_packet_total_size == -1) { if (notit->cur_exp_packet_content_size != -1) { - BT_LOGW("Content size is set, but packet size is not: " - "notit-addr=%p, packet-context-field-addr=%p, " - "packet-size=%" PRId64 ", content-size=%" PRId64, - notit, notit->dscopes.stream_packet_context, - notit->cur_exp_packet_total_size, - notit->cur_exp_packet_content_size); - status = BT_MSG_ITER_STATUS_ERROR; - goto end; + notit->cur_exp_packet_total_size = + notit->cur_exp_packet_content_size; } } else { if (notit->cur_exp_packet_content_size == -1) { @@ -932,6 +979,11 @@ enum bt_msg_iter_status set_current_packet_content_sizes( } } + BT_ASSERT((notit->cur_exp_packet_total_size >= 0 && + notit->cur_exp_packet_content_size >= 0) || + (notit->cur_exp_packet_total_size < 0 && + notit->cur_exp_packet_content_size < 0)); + if (notit->cur_exp_packet_content_size > notit->cur_exp_packet_total_size) { BT_LOGW("Invalid packet or content size: " @@ -949,13 +1001,13 @@ enum bt_msg_iter_status set_current_packet_content_sizes( "notit-addr=%p, packet-size=%" PRIu64 ", content-size=%" PRIu64, notit, notit->cur_exp_packet_total_size, notit->cur_exp_packet_content_size); + end: return status; } static -enum bt_msg_iter_status after_packet_context_state( - struct bt_msg_iter *notit) +enum bt_msg_iter_status after_packet_context_state(struct bt_msg_iter *notit) { enum bt_msg_iter_status status; @@ -964,10 +1016,15 @@ enum bt_msg_iter_status after_packet_context_state( goto end; } - if (notit->stream_begin_emitted) { - notit->state = STATE_EMIT_MSG_NEW_PACKET; + if (notit->stream) { + /* + * Stream exists, which means we already emitted at + * least one packet beginning message, so the initial + * stream beginning message was also emitted. + */ + notit->state = STATE_CHECK_EMIT_MSG_DISCARDED_EVENTS; } else { - notit->state = STATE_EMIT_MSG_NEW_STREAM; + notit->state = STATE_CHECK_EMIT_MSG_STREAM_BEGINNING; } end: @@ -975,8 +1032,7 @@ end: } static -enum bt_msg_iter_status read_event_header_begin_state( - struct bt_msg_iter *notit) +enum bt_msg_iter_status read_event_header_begin_state(struct bt_msg_iter *notit) { enum bt_msg_iter_status status = BT_MSG_ITER_STATUS_OK; struct ctf_field_class *event_header_fc = NULL; @@ -992,7 +1048,7 @@ enum bt_msg_iter_status read_event_header_begin_state( /* No more events! */ BT_LOGV("Reached end of packet: notit-addr=%p, " "cur=%zu", notit, packet_at(notit)); - notit->state = STATE_EMIT_MSG_END_OF_PACKET; + notit->state = STATE_EMIT_MSG_PACKET_END_MULTI; goto end; } else if (unlikely(packet_at(notit) > notit->cur_exp_packet_content_size)) { @@ -1011,22 +1067,14 @@ enum bt_msg_iter_status read_event_header_begin_state( * nothing else for us. */ status = buf_ensure_available_bits(notit); - if (status != BT_MSG_ITER_STATUS_OK) { - /* - * If this function returns - * `BT_MSG_ITER_STATUS_EOF`: - * - * 1. bt_msg_iter_get_next_message() - * emits a "packet end" message. This - * resets the current packet. The state - * remains unchanged otherwise. - * 2. This function is called again. It returns - * `BT_MSG_ITER_STATUS_EOF` again. - * 3. bt_msg_iter_get_next_message() - * emits a "stream end" message because - * there's no current packet. It sets the - * current state to `STATE_DONE`. - */ + switch (status) { + case BT_MSG_ITER_STATUS_OK: + break; + case BT_MSG_ITER_STATUS_EOF: + status = BT_MSG_ITER_STATUS_OK; + notit->state = STATE_EMIT_MSG_PACKET_END_SINGLE; + goto end; + default: goto end; } } @@ -1134,8 +1182,17 @@ enum bt_msg_iter_status set_current_event_message( notit->meta.ec->name->str, notit->packet); BT_ASSERT(notit->msg_iter); - msg = bt_message_event_create(notit->msg_iter, - notit->meta.ec->ir_ec, notit->packet); + BT_ASSERT(notit->meta.sc); + + if (bt_stream_class_borrow_default_clock_class(notit->meta.sc->ir_sc)) { + msg = bt_message_event_create_with_default_clock_snapshot( + notit->msg_iter, notit->meta.ec->ir_ec, + notit->packet, notit->default_clock_snapshot); + } else { + msg = bt_message_event_create(notit->msg_iter, + notit->meta.ec->ir_ec, notit->packet); + } + if (!msg) { BT_LOGE("Cannot create event message: " "notit-addr=%p, ec-addr=%p, ec-name=\"%s\", " @@ -1347,8 +1404,7 @@ enum bt_msg_iter_status read_event_payload_continue_state( } static -enum bt_msg_iter_status skip_packet_padding_state( - struct bt_msg_iter *notit) +enum bt_msg_iter_status skip_packet_padding_state(struct bt_msg_iter *notit) { enum bt_msg_iter_status status = BT_MSG_ITER_STATUS_OK; size_t bits_to_skip; @@ -1384,6 +1440,127 @@ end: return status; } +static +enum bt_msg_iter_status check_emit_msg_stream_beginning_state( + struct bt_msg_iter *notit) +{ + enum bt_msg_iter_status status = BT_MSG_ITER_STATUS_OK; + + if (notit->set_stream) { + status = set_current_stream(notit); + if (status != BT_MSG_ITER_STATUS_OK) { + goto end; + } + } + + if (notit->emit_stream_begin_msg) { + notit->state = STATE_EMIT_MSG_STREAM_BEGINNING; + } else { + /* Stream's first packet */ + notit->state = STATE_CHECK_EMIT_MSG_DISCARDED_EVENTS; + } + +end: + return status; +} + +static +enum bt_msg_iter_status check_emit_msg_discarded_events( + struct bt_msg_iter *notit) +{ + notit->state = STATE_EMIT_MSG_DISCARDED_EVENTS; + + if (!notit->meta.sc->has_discarded_events) { + notit->state = STATE_CHECK_EMIT_MSG_DISCARDED_PACKETS; + goto end; + } + + if (notit->prev_packet_snapshots.discarded_events == UINT64_C(-1)) { + if (notit->snapshots.discarded_events == 0 || + notit->snapshots.discarded_events == UINT64_C(-1)) { + /* + * Stream's first packet with no discarded + * events or no information about discarded + * events: do not emit. + */ + notit->state = STATE_CHECK_EMIT_MSG_DISCARDED_PACKETS; + } + } else { + /* + * If the previous packet has a value for this counter, + * then this counter is defined for the whole stream. + */ + BT_ASSERT(notit->snapshots.discarded_events != UINT64_C(-1)); + + if (notit->snapshots.discarded_events - + notit->prev_packet_snapshots.discarded_events == 0) { + /* + * No discarded events since previous packet: do + * not emit. + */ + notit->state = STATE_CHECK_EMIT_MSG_DISCARDED_PACKETS; + } + } + +end: + return BT_MSG_ITER_STATUS_OK; +} + +static +enum bt_msg_iter_status check_emit_msg_discarded_packets( + struct bt_msg_iter *notit) +{ + notit->state = STATE_EMIT_MSG_DISCARDED_PACKETS; + + if (!notit->meta.sc->has_discarded_packets) { + notit->state = STATE_EMIT_MSG_PACKET_BEGINNING; + goto end; + } + + if (notit->prev_packet_snapshots.packets == UINT64_C(-1)) { + /* + * Stream's first packet or no information about + * discarded packets: do not emit. In other words, if + * this is the first packet and its sequence number is + * not 0, do not consider that packets were previously + * lost: we might be reading a partial stream (LTTng + * snapshot for example). + */ + notit->state = STATE_EMIT_MSG_PACKET_BEGINNING; + } else { + /* + * If the previous packet has a value for this counter, + * then this counter is defined for the whole stream. + */ + BT_ASSERT(notit->snapshots.packets != UINT64_C(-1)); + + if (notit->snapshots.packets - + notit->prev_packet_snapshots.packets <= 1) { + /* + * No discarded packets since previous packet: + * do not emit. + */ + notit->state = STATE_EMIT_MSG_PACKET_BEGINNING; + } + } + +end: + return BT_MSG_ITER_STATUS_OK; +} + +static +enum bt_msg_iter_status check_emit_msg_stream_activity_end( + struct bt_msg_iter *notit) +{ + if (notit->emit_stream_end_msg) { + notit->state = STATE_EMIT_MSG_STREAM_ACTIVITY_END; + } else { + notit->state = STATE_DONE; + } + + return BT_MSG_ITER_STATUS_OK; +} + static inline enum bt_msg_iter_status handle_state(struct bt_msg_iter *notit) { @@ -1416,10 +1593,28 @@ enum bt_msg_iter_status handle_state(struct bt_msg_iter *notit) case STATE_AFTER_STREAM_PACKET_CONTEXT: status = after_packet_context_state(notit); break; - case STATE_EMIT_MSG_NEW_STREAM: - notit->state = STATE_EMIT_MSG_NEW_PACKET; + case STATE_CHECK_EMIT_MSG_STREAM_BEGINNING: + status = check_emit_msg_stream_beginning_state(notit); + break; + case STATE_EMIT_MSG_STREAM_BEGINNING: + notit->state = STATE_EMIT_MSG_STREAM_ACTIVITY_BEGINNING; + break; + case STATE_EMIT_MSG_STREAM_ACTIVITY_BEGINNING: + notit->state = STATE_CHECK_EMIT_MSG_DISCARDED_EVENTS; + break; + case STATE_CHECK_EMIT_MSG_DISCARDED_EVENTS: + status = check_emit_msg_discarded_events(notit); + break; + case STATE_EMIT_MSG_DISCARDED_EVENTS: + notit->state = STATE_CHECK_EMIT_MSG_DISCARDED_PACKETS; + break; + case STATE_CHECK_EMIT_MSG_DISCARDED_PACKETS: + status = check_emit_msg_discarded_packets(notit); break; - case STATE_EMIT_MSG_NEW_PACKET: + case STATE_EMIT_MSG_DISCARDED_PACKETS: + notit->state = STATE_EMIT_MSG_PACKET_BEGINNING; + break; + case STATE_EMIT_MSG_PACKET_BEGINNING: notit->state = STATE_DSCOPE_EVENT_HEADER_BEGIN; break; case STATE_DSCOPE_EVENT_HEADER_BEGIN: @@ -1455,9 +1650,23 @@ enum bt_msg_iter_status handle_state(struct bt_msg_iter *notit) case STATE_SKIP_PACKET_PADDING: status = skip_packet_padding_state(notit); break; - case STATE_EMIT_MSG_END_OF_PACKET: + case STATE_EMIT_MSG_PACKET_END_MULTI: notit->state = STATE_SKIP_PACKET_PADDING; break; + case STATE_EMIT_MSG_PACKET_END_SINGLE: + notit->state = STATE_CHECK_EMIT_MSG_STREAM_ACTIVITY_END; + break; + case STATE_CHECK_EMIT_MSG_STREAM_ACTIVITY_END: + status = check_emit_msg_stream_activity_end(notit); + break; + case STATE_EMIT_MSG_STREAM_ACTIVITY_END: + notit->state = STATE_EMIT_MSG_STREAM_END; + break; + case STATE_EMIT_MSG_STREAM_END: + notit->state = STATE_DONE; + break; + case STATE_DONE: + break; default: BT_LOGD("Unknown CTF plugin message iterator state: " "notit-addr=%p, state=%d", notit, notit->state); @@ -1471,11 +1680,8 @@ enum bt_msg_iter_status handle_state(struct bt_msg_iter *notit) return status; } -/** - * Resets the internal state of a CTF message iterator. - */ BT_HIDDEN -void bt_msg_iter_reset(struct bt_msg_iter *notit) +void bt_msg_iter_reset_for_next_stream_file(struct bt_msg_iter *notit) { BT_ASSERT(notit); BT_LOGD("Resetting message iterator: addr=%p", notit); @@ -1502,10 +1708,28 @@ void bt_msg_iter_reset(struct bt_msg_iter *notit) notit->cur_exp_packet_content_size = -1; notit->cur_exp_packet_total_size = -1; notit->cur_packet_offset = -1; - notit->cur_stream_class_id = -1; notit->cur_event_class_id = -1; + notit->snapshots.beginning_clock = UINT64_C(-1); + notit->snapshots.end_clock = UINT64_C(-1); +} + +/** + * Resets the internal state of a CTF message iterator. + */ +BT_HIDDEN +void bt_msg_iter_reset(struct bt_msg_iter *notit) +{ + bt_msg_iter_reset_for_next_stream_file(notit); + notit->cur_stream_class_id = -1; notit->cur_data_stream_id = -1; - notit->stream_begin_emitted = false; + notit->emit_stream_begin_msg = true; + notit->emit_stream_end_msg = true; + notit->snapshots.discarded_events = UINT64_C(-1); + notit->snapshots.packets = UINT64_C(-1); + notit->prev_packet_snapshots.discarded_events = UINT64_C(-1); + notit->prev_packet_snapshots.packets = UINT64_C(-1); + notit->prev_packet_snapshots.beginning_clock = UINT64_C(-1); + notit->prev_packet_snapshots.end_clock = UINT64_C(-1); } static @@ -1562,6 +1786,7 @@ int bt_msg_iter_switch_packet(struct bt_msg_iter *notit) notit->cur_stream_class_id = -1; notit->cur_event_class_id = -1; notit->cur_data_stream_id = -1; + notit->prev_packet_snapshots = notit->snapshots; notit->snapshots.discarded_events = UINT64_C(-1); notit->snapshots.packets = UINT64_C(-1); notit->snapshots.beginning_clock = UINT64_C(-1); @@ -2075,11 +2300,23 @@ int64_t bfcr_get_sequence_length_cb(struct ctf_field_class *fc, void *data) seq_fc->stored_length_index); seq_field = stack_top(notit->stack)->base; BT_ASSERT(seq_field); - ret = bt_field_dynamic_array_set_length(seq_field, (uint64_t) length); - if (ret) { - BT_LOGE("Cannot set dynamic array field's length field: " - "notit-addr=%p, field-addr=%p, " - "length=%" PRIu64, notit, seq_field, length); + + /* + * bfcr_get_sequence_length_cb() also gets called back for a + * text sequence, but the destination field is a string field. + * Only set the field's sequence length if the destination field + * is a sequence field. + */ + if (!seq_fc->base.is_text) { + BT_ASSERT(bt_field_get_class_type(seq_field) == + BT_FIELD_CLASS_TYPE_DYNAMIC_ARRAY); + ret = bt_field_dynamic_array_set_length(seq_field, + (uint64_t) length); + if (ret) { + BT_LOGE("Cannot set dynamic array field's length field: " + "notit-addr=%p, field-addr=%p, " + "length=%" PRIu64, notit, seq_field, length); + } } return length; @@ -2165,52 +2402,73 @@ end: } static -void set_event_default_clock_snapshot(struct bt_msg_iter *notit) +void create_msg_stream_beginning(struct bt_msg_iter *notit, + bt_message **message) { - bt_event *event = - bt_message_event_borrow_event( - notit->event_msg); - bt_stream_class *sc = notit->meta.sc->ir_sc; + bt_message *ret = NULL; - BT_ASSERT(event); + BT_ASSERT(notit->stream); + BT_ASSERT(notit->msg_iter); + ret = bt_message_stream_beginning_create(notit->msg_iter, + notit->stream); + if (!ret) { + BT_LOGE("Cannot create stream beginning message: " + "notit-addr=%p, stream-addr=%p", + notit, notit->stream); + return; + } - if (bt_stream_class_borrow_default_clock_class(sc)) { - bt_event_set_default_clock_snapshot(event, - notit->default_clock_snapshot); + *message = ret; +} + +static +void create_msg_stream_activity_beginning(struct bt_msg_iter *notit, + bt_message **message) +{ + bt_message *ret = NULL; + + BT_ASSERT(notit->stream); + BT_ASSERT(notit->msg_iter); + ret = bt_message_stream_activity_beginning_create(notit->msg_iter, + notit->stream); + if (!ret) { + BT_LOGE("Cannot create stream activity beginning message: " + "notit-addr=%p, stream-addr=%p", + notit, notit->stream); + return; } + + *message = ret; } static -void notify_new_stream(struct bt_msg_iter *notit, +void create_msg_stream_activity_end(struct bt_msg_iter *notit, bt_message **message) { - enum bt_msg_iter_status status; bt_message *ret = NULL; - status = set_current_stream(notit); - if (status != BT_MSG_ITER_STATUS_OK) { - BT_MESSAGE_PUT_REF_AND_RESET(ret); - goto end; + if (!notit->stream) { + BT_LOGE("Cannot create stream for stream message: " + "notit-addr=%p", notit); + return; } BT_ASSERT(notit->stream); BT_ASSERT(notit->msg_iter); - ret = bt_message_stream_beginning_create(notit->msg_iter, + ret = bt_message_stream_activity_end_create(notit->msg_iter, notit->stream); if (!ret) { - BT_LOGE("Cannot create stream beginning message: " + BT_LOGE("Cannot create stream activity end message: " "notit-addr=%p, stream-addr=%p", notit, notit->stream); return; } -end: *message = ret; } static -void notify_end_of_stream(struct bt_msg_iter *notit, - bt_message **message) +void create_msg_stream_end(struct bt_msg_iter *notit, bt_message **message) { bt_message *ret; @@ -2224,16 +2482,17 @@ void notify_end_of_stream(struct bt_msg_iter *notit, ret = bt_message_stream_end_create(notit->msg_iter, notit->stream); if (!ret) { - BT_LOGE("Cannot create stream beginning message: " + BT_LOGE("Cannot create stream end message: " "notit-addr=%p, stream-addr=%p", notit, notit->stream); return; } + *message = ret; } static -void notify_new_packet(struct bt_msg_iter *notit, +void create_msg_packet_beginning(struct bt_msg_iter *notit, bt_message **message) { int ret; @@ -2250,30 +2509,6 @@ void notify_new_packet(struct bt_msg_iter *notit, sc = notit->meta.sc->ir_sc; BT_ASSERT(sc); - if (bt_stream_class_packets_have_discarded_event_counter_snapshot(sc)) { - BT_ASSERT(notit->snapshots.discarded_events != UINT64_C(-1)); - bt_packet_set_discarded_event_counter_snapshot( - notit->packet, notit->snapshots.discarded_events); - } - - if (bt_stream_class_packets_have_packet_counter_snapshot(sc)) { - BT_ASSERT(notit->snapshots.packets != UINT64_C(-1)); - bt_packet_set_packet_counter_snapshot( - notit->packet, notit->snapshots.packets); - } - - if (bt_stream_class_packets_have_default_beginning_clock_snapshot(sc)) { - BT_ASSERT(notit->snapshots.beginning_clock != UINT64_C(-1)); - bt_packet_set_default_beginning_clock_snapshot( - notit->packet, notit->snapshots.beginning_clock); - } - - if (bt_stream_class_packets_have_default_end_clock_snapshot(sc)) { - BT_ASSERT(notit->snapshots.end_clock != UINT64_C(-1)); - bt_packet_set_default_end_clock_snapshot( - notit->packet, notit->snapshots.end_clock); - } - if (notit->packet_context_field) { ret = bt_packet_move_context_field( notit->packet, notit->packet_context_field); @@ -2294,8 +2529,17 @@ void notify_new_packet(struct bt_msg_iter *notit, } BT_ASSERT(notit->msg_iter); - msg = bt_message_packet_beginning_create(notit->msg_iter, - notit->packet); + + if (notit->meta.sc->packets_have_ts_begin) { + BT_ASSERT(notit->snapshots.beginning_clock != UINT64_C(-1)); + msg = bt_message_packet_beginning_create_with_default_clock_snapshot( + notit->msg_iter, notit->packet, + notit->snapshots.beginning_clock); + } else { + msg = bt_message_packet_beginning_create(notit->msg_iter, + notit->packet); + } + if (!msg) { BT_LOGE("Cannot create packet beginning message: " "notit-addr=%p, packet-addr=%p", @@ -2310,8 +2554,7 @@ end: } static -void notify_end_of_packet(struct bt_msg_iter *notit, - bt_message **message) +void create_msg_packet_end(struct bt_msg_iter *notit, bt_message **message) { bt_message *msg; @@ -2325,8 +2568,17 @@ void notify_end_of_packet(struct bt_msg_iter *notit, } BT_ASSERT(notit->msg_iter); - msg = bt_message_packet_end_create(notit->msg_iter, - notit->packet); + + if (notit->meta.sc->packets_have_ts_end) { + BT_ASSERT(notit->snapshots.end_clock != UINT64_C(-1)); + msg = bt_message_packet_end_create_with_default_clock_snapshot( + notit->msg_iter, notit->packet, + notit->snapshots.end_clock); + } else { + msg = bt_message_packet_end_create(notit->msg_iter, + notit->packet); + } + if (!msg) { BT_LOGE("Cannot create packet end message: " "notit-addr=%p, packet-addr=%p", @@ -2339,6 +2591,95 @@ void notify_end_of_packet(struct bt_msg_iter *notit, *message = msg; } +static +void create_msg_discarded_events(struct bt_msg_iter *notit, + bt_message **message) +{ + bt_message *msg; + uint64_t beginning_raw_value = UINT64_C(-1); + uint64_t end_raw_value = UINT64_C(-1); + + BT_ASSERT(notit->msg_iter); + BT_ASSERT(notit->stream); + BT_ASSERT(notit->meta.sc->has_discarded_events); + + if (notit->meta.sc->discarded_events_have_default_cs) { + if (notit->prev_packet_snapshots.discarded_events == UINT64_C(-1)) { + /* + * We discarded events, but before (and possibly + * including) the current packet: use this packet's time + * range, and do not have a specific count. + */ + beginning_raw_value = notit->snapshots.beginning_clock; + end_raw_value = notit->snapshots.end_clock; + } else { + beginning_raw_value = notit->prev_packet_snapshots.end_clock; + end_raw_value = notit->snapshots.end_clock; + } + + BT_ASSERT(beginning_raw_value != UINT64_C(-1)); + BT_ASSERT(end_raw_value != UINT64_C(-1)); + msg = bt_message_discarded_events_create_with_default_clock_snapshots( + notit->msg_iter, notit->stream, beginning_raw_value, + end_raw_value); + } else { + msg = bt_message_discarded_events_create(notit->msg_iter, + notit->stream); + } + + if (!msg) { + BT_LOGE("Cannot create discarded events message: " + "notit-addr=%p, stream-addr=%p", + notit, notit->stream); + return; + } + + if (notit->prev_packet_snapshots.discarded_events != UINT64_C(-1)) { + bt_message_discarded_events_set_count(msg, + notit->snapshots.discarded_events - + notit->prev_packet_snapshots.discarded_events); + } + + *message = msg; +} + +static +void create_msg_discarded_packets(struct bt_msg_iter *notit, + bt_message **message) +{ + bt_message *msg; + + BT_ASSERT(notit->msg_iter); + BT_ASSERT(notit->stream); + BT_ASSERT(notit->meta.sc->has_discarded_packets); + BT_ASSERT(notit->prev_packet_snapshots.packets != + UINT64_C(-1)); + + if (notit->meta.sc->discarded_packets_have_default_cs) { + BT_ASSERT(notit->prev_packet_snapshots.end_clock != UINT64_C(-1)); + BT_ASSERT(notit->snapshots.beginning_clock != UINT64_C(-1)); + msg = bt_message_discarded_packets_create_with_default_clock_snapshots( + notit->msg_iter, notit->stream, + notit->prev_packet_snapshots.end_clock, + notit->snapshots.beginning_clock); + } else { + msg = bt_message_discarded_packets_create(notit->msg_iter, + notit->stream); + } + + if (!msg) { + BT_LOGE("Cannot create discarded packets message: " + "notit-addr=%p, stream-addr=%p", + notit, notit->stream); + return; + } + + bt_message_discarded_packets_set_count(msg, + notit->snapshots.packets - + notit->prev_packet_snapshots.packets - 1); + *message = msg; +} + BT_HIDDEN struct bt_msg_iter *bt_msg_iter_create(struct ctf_trace_class *tc, size_t max_request_sz, @@ -2435,95 +2776,108 @@ void bt_msg_iter_destroy(struct bt_msg_iter *notit) enum bt_msg_iter_status bt_msg_iter_get_next_message( struct bt_msg_iter *notit, - bt_self_message_iterator *msg_iter, - bt_message **message) + bt_self_message_iterator *msg_iter, bt_message **message) { enum bt_msg_iter_status status = BT_MSG_ITER_STATUS_OK; BT_ASSERT(notit); BT_ASSERT(message); - - if (notit->state == STATE_DONE) { - status = BT_MSG_ITER_STATUS_EOF; - goto end; - } - notit->msg_iter = msg_iter; - + notit->set_stream = true; BT_LOGV("Getting next message: notit-addr=%p", notit); while (true) { status = handle_state(notit); - if (status == BT_MSG_ITER_STATUS_AGAIN) { + if (unlikely(status == BT_MSG_ITER_STATUS_AGAIN)) { BT_LOGV_STR("Medium returned BT_MSG_ITER_STATUS_AGAIN."); goto end; + } else if (unlikely(status != BT_MSG_ITER_STATUS_OK)) { + BT_LOGW("Cannot handle state: notit-addr=%p, state=%s", + notit, state_string(notit->state)); + goto end; } - if (status != BT_MSG_ITER_STATUS_OK) { - if (status == BT_MSG_ITER_STATUS_EOF) { - enum state next_state = notit->state; - - BT_LOGV_STR("Medium returned BT_MSG_ITER_STATUS_EOF."); - - if (notit->packet) { - notify_end_of_packet(notit, - message); - } else { - notify_end_of_stream(notit, - message); - next_state = STATE_DONE; - } - - if (!*message) { - status = BT_MSG_ITER_STATUS_ERROR; - goto end; - } - - status = BT_MSG_ITER_STATUS_OK; - notit->state = next_state; - } else { - BT_LOGW("Cannot handle state: " - "notit-addr=%p, state=%s", - notit, state_string(notit->state)); + switch (notit->state) { + case STATE_EMIT_MSG_EVENT: + BT_ASSERT(notit->event_msg); + *message = notit->event_msg; + notit->event_msg = NULL; + goto end; + case STATE_EMIT_MSG_DISCARDED_EVENTS: + /* create_msg_discared_events() logs errors */ + create_msg_discarded_events(notit, message); + + if (!*message) { + status = BT_MSG_ITER_STATUS_ERROR; } goto end; - } + case STATE_EMIT_MSG_DISCARDED_PACKETS: + /* create_msg_discared_packets() logs errors */ + create_msg_discarded_packets(notit, message); - switch (notit->state) { - case STATE_EMIT_MSG_NEW_STREAM: - /* notify_new_stream() logs errors */ - notify_new_stream(notit, message); + if (!*message) { + status = BT_MSG_ITER_STATUS_ERROR; + } + + goto end; + case STATE_EMIT_MSG_PACKET_BEGINNING: + /* create_msg_packet_beginning() logs errors */ + create_msg_packet_beginning(notit, message); if (!*message) { status = BT_MSG_ITER_STATUS_ERROR; } - notit->stream_begin_emitted = true; goto end; - case STATE_EMIT_MSG_NEW_PACKET: - /* notify_new_packet() logs errors */ - notify_new_packet(notit, message); + case STATE_EMIT_MSG_PACKET_END_SINGLE: + case STATE_EMIT_MSG_PACKET_END_MULTI: + /* create_msg_packet_end() logs errors */ + create_msg_packet_end(notit, message); if (!*message) { status = BT_MSG_ITER_STATUS_ERROR; } goto end; - case STATE_EMIT_MSG_EVENT: - BT_ASSERT(notit->event_msg); - set_event_default_clock_snapshot(notit); - *message = notit->event_msg; - notit->event_msg = NULL; + case STATE_EMIT_MSG_STREAM_ACTIVITY_BEGINNING: + /* create_msg_stream_activity_beginning() logs errors */ + create_msg_stream_activity_beginning(notit, message); + + if (!*message) { + status = BT_MSG_ITER_STATUS_ERROR; + } + goto end; - case STATE_EMIT_MSG_END_OF_PACKET: - /* notify_end_of_packet() logs errors */ - notify_end_of_packet(notit, message); + case STATE_EMIT_MSG_STREAM_ACTIVITY_END: + /* create_msg_stream_activity_end() logs errors */ + create_msg_stream_activity_end(notit, message); if (!*message) { status = BT_MSG_ITER_STATUS_ERROR; } + goto end; + case STATE_EMIT_MSG_STREAM_BEGINNING: + /* create_msg_stream_beginning() logs errors */ + create_msg_stream_beginning(notit, message); + + if (!*message) { + status = BT_MSG_ITER_STATUS_ERROR; + } + + goto end; + case STATE_EMIT_MSG_STREAM_END: + /* create_msg_stream_end() logs errors */ + create_msg_stream_end(notit, message); + + if (!*message) { + status = BT_MSG_ITER_STATUS_ERROR; + } + + goto end; + case STATE_DONE: + status = BT_MSG_ITER_STATUS_EOF; goto end; default: /* Non-emitting state: continue */ @@ -2543,50 +2897,51 @@ enum bt_msg_iter_status read_packet_header_context_fields( enum bt_msg_iter_status status = BT_MSG_ITER_STATUS_OK; BT_ASSERT(notit); + notit->set_stream = false; - if (notit->state == STATE_EMIT_MSG_NEW_PACKET) { + if (notit->state == STATE_EMIT_MSG_PACKET_BEGINNING) { /* We're already there */ goto end; } while (true) { status = handle_state(notit); - if (status == BT_MSG_ITER_STATUS_AGAIN) { + if (unlikely(status == BT_MSG_ITER_STATUS_AGAIN)) { BT_LOGV_STR("Medium returned BT_MSG_ITER_STATUS_AGAIN."); goto end; - } - if (status != BT_MSG_ITER_STATUS_OK) { - if (status == BT_MSG_ITER_STATUS_EOF) { - BT_LOGV_STR("Medium returned BT_MSG_ITER_STATUS_EOF."); - } else { - BT_LOGW("Cannot handle state: " - "notit-addr=%p, state=%s", - notit, state_string(notit->state)); - } + } else if (unlikely(status != BT_MSG_ITER_STATUS_OK)) { + BT_LOGW("Cannot handle state: notit-addr=%p, state=%s", + notit, state_string(notit->state)); goto end; } switch (notit->state) { - case STATE_EMIT_MSG_NEW_PACKET: + case STATE_EMIT_MSG_PACKET_BEGINNING: /* * Packet header and context fields are * potentially decoded (or they don't exist). */ goto end; case STATE_INIT: - case STATE_EMIT_MSG_NEW_STREAM: case STATE_DSCOPE_TRACE_PACKET_HEADER_BEGIN: case STATE_DSCOPE_TRACE_PACKET_HEADER_CONTINUE: case STATE_AFTER_TRACE_PACKET_HEADER: case STATE_DSCOPE_STREAM_PACKET_CONTEXT_BEGIN: case STATE_DSCOPE_STREAM_PACKET_CONTEXT_CONTINUE: case STATE_AFTER_STREAM_PACKET_CONTEXT: + case STATE_CHECK_EMIT_MSG_STREAM_BEGINNING: + case STATE_EMIT_MSG_STREAM_BEGINNING: + case STATE_EMIT_MSG_STREAM_ACTIVITY_BEGINNING: + case STATE_CHECK_EMIT_MSG_DISCARDED_EVENTS: + case STATE_EMIT_MSG_DISCARDED_EVENTS: + case STATE_CHECK_EMIT_MSG_DISCARDED_PACKETS: + case STATE_EMIT_MSG_DISCARDED_PACKETS: /* Non-emitting state: continue */ break; default: /* * We should never get past the - * STATE_EMIT_MSG_NEW_PACKET state. + * STATE_EMIT_MSG_PACKET_BEGINNING state. */ BT_LOGF("Unexpected state: notit-addr=%p, state=%s", notit, state_string(notit->state)); @@ -2598,7 +2953,6 @@ end: ret = set_current_packet_content_sizes(notit); if (ret) { status = BT_MSG_ITER_STATUS_ERROR; - goto end; } return status; @@ -2613,8 +2967,8 @@ void bt_msg_iter_set_medops_data(struct bt_msg_iter *notit, } BT_HIDDEN -enum bt_msg_iter_status bt_msg_iter_seek( - struct bt_msg_iter *notit, off_t offset) +enum bt_msg_iter_status bt_msg_iter_seek(struct bt_msg_iter *notit, + off_t offset) { enum bt_msg_iter_status ret = BT_MSG_ITER_STATUS_OK; enum bt_msg_iter_medium_status medium_status; @@ -2650,30 +3004,6 @@ end: return ret; } -BT_HIDDEN -off_t bt_msg_iter_get_current_packet_offset(struct bt_msg_iter *notit) -{ - BT_ASSERT(notit); - return notit->cur_packet_offset; -} - -BT_HIDDEN -off_t bt_msg_iter_get_current_packet_size( - struct bt_msg_iter *notit) -{ - BT_ASSERT(notit); - return notit->cur_exp_packet_total_size; -} - -BT_HIDDEN -void bt_msg_trace_class_changed(struct bt_msg_iter *notit) -{ - if (notit->meta.tc->stored_value_count > notit->stored_values->len) { - g_array_set_size(notit->stored_values, - notit->meta.tc->stored_value_count); - } -} - BT_HIDDEN enum bt_msg_iter_status bt_msg_iter_get_packet_properties( struct bt_msg_iter *notit, @@ -2688,11 +3018,8 @@ enum bt_msg_iter_status bt_msg_iter_get_packet_properties( goto end; } - props->exp_packet_total_size = - (uint64_t) notit->cur_exp_packet_total_size; - props->exp_packet_content_size = - (uint64_t) notit->cur_exp_packet_content_size; - BT_ASSERT(props->stream_class_id >= 0); + props->exp_packet_total_size = notit->cur_exp_packet_total_size; + props->exp_packet_content_size = notit->cur_exp_packet_content_size; props->stream_class_id = (uint64_t) notit->cur_stream_class_id; props->data_stream_id = notit->cur_data_stream_id; props->snapshots.discarded_events = notit->snapshots.discarded_events; @@ -2703,3 +3030,17 @@ enum bt_msg_iter_status bt_msg_iter_get_packet_properties( end: return status; } + +BT_HIDDEN +void bt_msg_iter_set_emit_stream_beginning_message(struct bt_msg_iter *notit, + bool val) +{ + notit->emit_stream_begin_msg = val; +} + +BT_HIDDEN +void bt_msg_iter_set_emit_stream_end_message(struct bt_msg_iter *notit, + bool val) +{ + notit->emit_stream_end_msg = val; +}