From a1c0a0be5aef6e191eac8bec2c81c518c5200721 Mon Sep 17 00:00:00 2001 From: Philippe Proulx Date: Thu, 14 Feb 2019 18:12:34 -0500 Subject: [PATCH] src.ctf.fs: emit discarded events/packets messages Make a `src.ctf.fs` message iterator emit appropriate discarded events and packets messages when needed. For the moment the information is redundant with the equivalent counters within `bt_packet` objects, but those will be removed eventually before 2.0-rc1. To avoid getting a discarded packets message (therefore a warning with `sink.text.pretty`) for any partial trace, like it's the case with LTTng snapshots, it is not emitted when a stream's first packet's sequence number is not 0. However, if the stream's first packet's discarded event counter is greater than 0, a discarded events message is emitted, indicating an unknown number of discarded events between the packet's beginning time and the packet's end time. I believe this is the best approximation we can get for this exceptional scenario. I refactored some states of `msg-iter.c`, adding states dedicated to check if we have to emit or not some types of messages. Emitting or not stream beginning/end and stream activity beginning/end messages is handled by such states now: for example, check_emit_msg_stream_beginning_state() sets the current state to `STATE_EMIT_MSG_STREAM_BEGINNING` or `STATE_CHECK_EMIT_MSG_DISCARDED_EVENTS` depending on how bt_msg_iter_set_emit_stream_beginning_message() was called the last time. This makes bt_msg_iter_get_next_message() unconditionally create messages when getting emitting states, as some are completely skipped by the checking states. The `STATE_CHECK_EMIT_MSG_DISCARDED_EVENTS` checks whether or not a discarded events message must be emitted, and `STATE_CHECK_EMIT_MSG_DISCARDED_PACKETS` checks whether or not a discarded packets message must be emitted. Before a packet beginning message, a discarded events message can be emitted, and then a discarded packets message can be emitted, in this order. As of this patch, all other component classes ignore those messages. Signed-off-by: Philippe Proulx --- plugins/ctf/common/msg-iter/msg-iter.c | 369 +++++++++++++++++++++---- plugins/ctf/common/msg-iter/msg-iter.h | 6 + plugins/ctf/fs-src/fs.c | 3 +- 3 files changed, 317 insertions(+), 61 deletions(-) diff --git a/plugins/ctf/common/msg-iter/msg-iter.c b/plugins/ctf/common/msg-iter/msg-iter.c index 907b299a7..8e45e6edf 100644 --- a/plugins/ctf/common/msg-iter/msg-iter.c +++ b/plugins/ctf/common/msg-iter/msg-iter.c @@ -80,8 +80,13 @@ enum state { STATE_DSCOPE_STREAM_PACKET_CONTEXT_BEGIN, STATE_DSCOPE_STREAM_PACKET_CONTEXT_CONTINUE, STATE_AFTER_STREAM_PACKET_CONTEXT, + STATE_CHECK_EMIT_MSG_STREAM_BEGINNING, STATE_EMIT_MSG_STREAM_BEGINNING, STATE_EMIT_MSG_STREAM_ACTIVITY_BEGINNING, + STATE_CHECK_EMIT_MSG_DISCARDED_EVENTS, + STATE_CHECK_EMIT_MSG_DISCARDED_PACKETS, + STATE_EMIT_MSG_DISCARDED_EVENTS, + STATE_EMIT_MSG_DISCARDED_PACKETS, STATE_EMIT_MSG_PACKET_BEGINNING, STATE_DSCOPE_EVENT_HEADER_BEGIN, STATE_DSCOPE_EVENT_HEADER_CONTINUE, @@ -96,11 +101,19 @@ enum state { STATE_SKIP_PACKET_PADDING, STATE_EMIT_MSG_PACKET_END_MULTI, STATE_EMIT_MSG_PACKET_END_SINGLE, + STATE_CHECK_EMIT_MSG_STREAM_ACTIVITY_END, STATE_EMIT_MSG_STREAM_ACTIVITY_END, STATE_EMIT_MSG_STREAM_END, STATE_DONE, }; +struct end_of_packet_snapshots { + uint64_t discarded_events; + uint64_t packets; + uint64_t beginning_clock; + uint64_t end_clock; +}; + /* CTF message iterator */ struct bt_msg_iter { /* Visit stack */ @@ -115,11 +128,12 @@ struct bt_msg_iter { */ bool emit_stream_begin_msg; - /* - * True to emit stream end and stream activity end messages. - */ + /* True to emit stream end and stream activity end messages */ bool emit_stream_end_msg; + /* True to set the stream */ + bool set_stream; + /* * Current dynamic scope field pointer. * @@ -222,13 +236,11 @@ struct bt_msg_iter { /* Default clock's current value */ uint64_t default_clock_snapshot; - /* End of packet snapshots */ - struct { - uint64_t discarded_events; - uint64_t packets; - uint64_t beginning_clock; - uint64_t end_clock; - } snapshots; + /* End of current packet snapshots */ + struct end_of_packet_snapshots snapshots; + + /* End of previous packet snapshots */ + struct end_of_packet_snapshots prev_packet_snapshots; /* Stored values (for sequence lengths, variant tags) */ GArray *stored_values; @@ -258,6 +270,10 @@ const char *state_string(enum state state) return "STATE_EMIT_MSG_STREAM_ACTIVITY_BEGINNING"; case STATE_EMIT_MSG_PACKET_BEGINNING: return "STATE_EMIT_MSG_PACKET_BEGINNING"; + case STATE_EMIT_MSG_DISCARDED_EVENTS: + return "STATE_EMIT_MSG_DISCARDED_EVENTS"; + case STATE_EMIT_MSG_DISCARDED_PACKETS: + return "STATE_EMIT_MSG_DISCARDED_PACKETS"; case STATE_DSCOPE_EVENT_HEADER_BEGIN: return "STATE_DSCOPE_EVENT_HEADER_BEGIN"; case STATE_DSCOPE_EVENT_HEADER_CONTINUE: @@ -673,7 +689,7 @@ enum bt_msg_iter_status read_packet_header_begin_state( break; case BT_MSG_ITER_STATUS_EOF: ret = BT_MSG_ITER_STATUS_OK; - notit->state = STATE_EMIT_MSG_STREAM_ACTIVITY_END; + notit->state = STATE_CHECK_EMIT_MSG_STREAM_ACTIVITY_END; goto end; default: goto end; @@ -1007,9 +1023,9 @@ enum bt_msg_iter_status after_packet_context_state(struct bt_msg_iter *notit) * least one packet beginning message, so the initial * stream beginning message was also emitted. */ - notit->state = STATE_EMIT_MSG_PACKET_BEGINNING; + notit->state = STATE_CHECK_EMIT_MSG_DISCARDED_EVENTS; } else { - notit->state = STATE_EMIT_MSG_STREAM_BEGINNING; + notit->state = STATE_CHECK_EMIT_MSG_STREAM_BEGINNING; } end: @@ -1416,6 +1432,115 @@ end: return status; } +static +enum bt_msg_iter_status check_emit_msg_stream_beginning_state( + struct bt_msg_iter *notit) +{ + enum bt_msg_iter_status status = BT_MSG_ITER_STATUS_OK; + + if (notit->set_stream) { + status = set_current_stream(notit); + if (status != BT_MSG_ITER_STATUS_OK) { + goto end; + } + } + + if (notit->emit_stream_begin_msg) { + notit->state = STATE_EMIT_MSG_STREAM_BEGINNING; + } else { + /* Stream's first packet */ + notit->state = STATE_CHECK_EMIT_MSG_DISCARDED_EVENTS; + } + +end: + return status; +} + +static +enum bt_msg_iter_status check_emit_msg_discarded_events( + struct bt_msg_iter *notit) +{ + notit->state = STATE_EMIT_MSG_DISCARDED_EVENTS; + + if (notit->prev_packet_snapshots.discarded_events == UINT64_C(-1)) { + if (notit->snapshots.discarded_events == 0 || + notit->snapshots.discarded_events == UINT64_C(-1)) { + /* + * Stream's first packet with no discarded + * events or no information about discarded + * events: do not emit. + */ + notit->state = STATE_CHECK_EMIT_MSG_DISCARDED_PACKETS; + } + } else { + /* + * If the previous packet has a value for this counter, + * then this counter is defined for the whole stream. + */ + BT_ASSERT(notit->snapshots.discarded_events != UINT64_C(-1)); + + if (notit->snapshots.discarded_events - + notit->prev_packet_snapshots.discarded_events == 0) { + /* + * No discarded events since previous packet: do + * not emit. + */ + notit->state = STATE_CHECK_EMIT_MSG_DISCARDED_PACKETS; + } + } + + return BT_MSG_ITER_STATUS_OK; +} + +static +enum bt_msg_iter_status check_emit_msg_discarded_packets( + struct bt_msg_iter *notit) +{ + notit->state = STATE_EMIT_MSG_DISCARDED_PACKETS; + + if (notit->prev_packet_snapshots.packets == UINT64_C(-1)) { + /* + * Stream's first packet or no information about + * discarded packets: do not emit. In other words, if + * this is the first packet and its sequence number is + * not 0, do not consider that packets were previously + * lost: we might be reading a partial stream (LTTng + * snapshot for example). + */ + notit->state = STATE_EMIT_MSG_PACKET_BEGINNING; + } else { + /* + * If the previous packet has a value for this counter, + * then this counter is defined for the whole stream. + */ + BT_ASSERT(notit->snapshots.packets != UINT64_C(-1)); + + if (notit->snapshots.packets - + notit->prev_packet_snapshots.packets <= 1) { + /* + * No discarded packets since previous packet: + * do not emit. + */ + notit->state = STATE_EMIT_MSG_PACKET_BEGINNING; + } + } + + return BT_MSG_ITER_STATUS_OK; +} + +static +enum bt_msg_iter_status check_emit_msg_stream_activity_end( + struct bt_msg_iter *notit) +{ + if (notit->emit_stream_end_msg) { + notit->state = STATE_EMIT_MSG_STREAM_ACTIVITY_END; + } else { + notit->state = STATE_DONE; + } + + return BT_MSG_ITER_STATUS_OK; +} + static inline enum bt_msg_iter_status handle_state(struct bt_msg_iter *notit) { @@ -1448,10 +1573,25 @@ enum bt_msg_iter_status handle_state(struct bt_msg_iter *notit) case STATE_AFTER_STREAM_PACKET_CONTEXT: status = after_packet_context_state(notit); break; + case STATE_CHECK_EMIT_MSG_STREAM_BEGINNING: + status = check_emit_msg_stream_beginning_state(notit); + break; case STATE_EMIT_MSG_STREAM_BEGINNING: notit->state = STATE_EMIT_MSG_STREAM_ACTIVITY_BEGINNING; break; case STATE_EMIT_MSG_STREAM_ACTIVITY_BEGINNING: + notit->state = STATE_CHECK_EMIT_MSG_DISCARDED_EVENTS; + break; + case STATE_CHECK_EMIT_MSG_DISCARDED_EVENTS: + status = check_emit_msg_discarded_events(notit); + break; + case STATE_EMIT_MSG_DISCARDED_EVENTS: + notit->state = STATE_CHECK_EMIT_MSG_DISCARDED_PACKETS; + break; + case STATE_CHECK_EMIT_MSG_DISCARDED_PACKETS: + status = check_emit_msg_discarded_packets(notit); + break; + case STATE_EMIT_MSG_DISCARDED_PACKETS: notit->state = STATE_EMIT_MSG_PACKET_BEGINNING; break; case STATE_EMIT_MSG_PACKET_BEGINNING: @@ -1494,7 +1634,10 @@ enum bt_msg_iter_status handle_state(struct bt_msg_iter *notit) notit->state = STATE_SKIP_PACKET_PADDING; break; case STATE_EMIT_MSG_PACKET_END_SINGLE: - notit->state = STATE_EMIT_MSG_STREAM_ACTIVITY_END; + notit->state = STATE_CHECK_EMIT_MSG_STREAM_ACTIVITY_END; + break; + case STATE_CHECK_EMIT_MSG_STREAM_ACTIVITY_END: + status = check_emit_msg_stream_activity_end(notit); break; case STATE_EMIT_MSG_STREAM_ACTIVITY_END: notit->state = STATE_EMIT_MSG_STREAM_END; @@ -1517,11 +1660,8 @@ enum bt_msg_iter_status handle_state(struct bt_msg_iter *notit) return status; } -/** - * Resets the internal state of a CTF message iterator. - */ BT_HIDDEN -void bt_msg_iter_reset(struct bt_msg_iter *notit) +void bt_msg_iter_reset_for_next_stream_file(struct bt_msg_iter *notit) { BT_ASSERT(notit); BT_LOGD("Resetting message iterator: addr=%p", notit); @@ -1548,11 +1688,28 @@ void bt_msg_iter_reset(struct bt_msg_iter *notit) notit->cur_exp_packet_content_size = -1; notit->cur_exp_packet_total_size = -1; notit->cur_packet_offset = -1; - notit->cur_stream_class_id = -1; notit->cur_event_class_id = -1; + notit->snapshots.beginning_clock = UINT64_C(-1); + notit->snapshots.end_clock = UINT64_C(-1); +} + +/** + * Resets the internal state of a CTF message iterator. + */ +BT_HIDDEN +void bt_msg_iter_reset(struct bt_msg_iter *notit) +{ + bt_msg_iter_reset_for_next_stream_file(notit); + notit->cur_stream_class_id = -1; notit->cur_data_stream_id = -1; notit->emit_stream_begin_msg = true; notit->emit_stream_end_msg = true; + notit->snapshots.discarded_events = UINT64_C(-1); + notit->snapshots.packets = UINT64_C(-1); + notit->prev_packet_snapshots.discarded_events = UINT64_C(-1); + notit->prev_packet_snapshots.packets = UINT64_C(-1); + notit->prev_packet_snapshots.beginning_clock = UINT64_C(-1); + notit->prev_packet_snapshots.end_clock = UINT64_C(-1); } static @@ -1609,6 +1766,7 @@ int bt_msg_iter_switch_packet(struct bt_msg_iter *notit) notit->cur_stream_class_id = -1; notit->cur_event_class_id = -1; notit->cur_data_stream_id = -1; + notit->prev_packet_snapshots = notit->snapshots; notit->snapshots.discarded_events = UINT64_C(-1); notit->snapshots.packets = UINT64_C(-1); notit->snapshots.beginning_clock = UINT64_C(-1); @@ -2421,6 +2579,93 @@ void create_msg_packet_end(struct bt_msg_iter *notit, bt_message **message) *message = msg; } +static +void create_msg_discarded_events(struct bt_msg_iter *notit, + bt_message **message) +{ + bt_message *msg; + uint64_t beginning_raw_value = UINT64_C(-1); + uint64_t end_raw_value = UINT64_C(-1); + uint64_t count = UINT64_C(-1); + + BT_ASSERT(notit->msg_iter); + BT_ASSERT(notit->stream); + + if (notit->prev_packet_snapshots.discarded_events == UINT64_C(-1)) { + /* + * We discarded events, but before (and possibly + * including) the current packet: use this packet's time + * range, and do not have a specific count. + */ + beginning_raw_value = notit->snapshots.beginning_clock; + end_raw_value = notit->snapshots.end_clock; + } else { + count = notit->snapshots.discarded_events - + notit->prev_packet_snapshots.discarded_events; + BT_ASSERT(count > 0); + beginning_raw_value = notit->prev_packet_snapshots.end_clock; + end_raw_value = notit->snapshots.end_clock; + } + + if (beginning_raw_value != UINT64_C(-1) && + end_raw_value != UINT64_C(-1)) { + msg = bt_message_discarded_events_create_with_default_clock_snapshots( + notit->msg_iter, notit->stream, beginning_raw_value, + end_raw_value); + } else { + msg = bt_message_discarded_events_create(notit->msg_iter, + notit->stream); + } + + if (!msg) { + BT_LOGE("Cannot create discarded events message: " + "notit-addr=%p, stream-addr=%p", + notit, notit->stream); + return; + } + + if (count != UINT64_C(-1)) { + bt_message_discarded_events_set_count(msg, count); + } + + *message = msg; +} + +static +void create_msg_discarded_packets(struct bt_msg_iter *notit, + bt_message **message) +{ + bt_message *msg; + + BT_ASSERT(notit->msg_iter); + BT_ASSERT(notit->stream); + BT_ASSERT(notit->prev_packet_snapshots.packets != + UINT64_C(-1)); + + if (notit->prev_packet_snapshots.end_clock != UINT64_C(-1) && + notit->snapshots.beginning_clock != UINT64_C(-1)) { + msg = bt_message_discarded_packets_create_with_default_clock_snapshots( + notit->msg_iter, notit->stream, + notit->prev_packet_snapshots.end_clock, + notit->snapshots.beginning_clock); + } else { + msg = bt_message_discarded_packets_create(notit->msg_iter, + notit->stream); + } + + if (!msg) { + BT_LOGE("Cannot create discarded packets message: " + "notit-addr=%p, stream-addr=%p", + notit, notit->stream); + return; + } + + bt_message_discarded_packets_set_count(msg, + notit->snapshots.packets - + notit->prev_packet_snapshots.packets - 1); + *message = msg; +} + BT_HIDDEN struct bt_msg_iter *bt_msg_iter_create(struct ctf_trace_class *tc, size_t max_request_sz, @@ -2524,6 +2769,7 @@ enum bt_msg_iter_status bt_msg_iter_get_next_message( BT_ASSERT(notit); BT_ASSERT(message); notit->msg_iter = msg_iter; + notit->set_stream = true; BT_LOGV("Getting next message: notit-addr=%p", notit); while (true) { @@ -2543,6 +2789,24 @@ enum bt_msg_iter_status bt_msg_iter_get_next_message( set_event_default_clock_snapshot(notit); *message = notit->event_msg; notit->event_msg = NULL; + goto end; + case STATE_EMIT_MSG_DISCARDED_EVENTS: + /* create_msg_discared_events() logs errors */ + create_msg_discarded_events(notit, message); + + if (!*message) { + status = BT_MSG_ITER_STATUS_ERROR; + } + + goto end; + case STATE_EMIT_MSG_DISCARDED_PACKETS: + /* create_msg_discared_packets() logs errors */ + create_msg_discarded_packets(notit, message); + + if (!*message) { + status = BT_MSG_ITER_STATUS_ERROR; + } + goto end; case STATE_EMIT_MSG_PACKET_BEGINNING: /* create_msg_packet_beginning() logs errors */ @@ -2564,62 +2828,41 @@ enum bt_msg_iter_status bt_msg_iter_get_next_message( goto end; case STATE_EMIT_MSG_STREAM_ACTIVITY_BEGINNING: - if (notit->emit_stream_begin_msg) { - /* create_msg_stream_activity_beginning() logs errors */ - create_msg_stream_activity_beginning(notit, message); - - if (!*message) { - status = BT_MSG_ITER_STATUS_ERROR; - } + /* create_msg_stream_activity_beginning() logs errors */ + create_msg_stream_activity_beginning(notit, message); - goto end; + if (!*message) { + status = BT_MSG_ITER_STATUS_ERROR; } - break; + goto end; case STATE_EMIT_MSG_STREAM_ACTIVITY_END: - if (notit->emit_stream_end_msg) { - /* create_msg_stream_activity_end() logs errors */ - create_msg_stream_activity_end(notit, message); - - if (!*message) { - status = BT_MSG_ITER_STATUS_ERROR; - } + /* create_msg_stream_activity_end() logs errors */ + create_msg_stream_activity_end(notit, message); - goto end; + if (!*message) { + status = BT_MSG_ITER_STATUS_ERROR; } - break; + goto end; case STATE_EMIT_MSG_STREAM_BEGINNING: - status = set_current_stream(notit); - if (status != BT_MSG_ITER_STATUS_OK) { - goto end; - } - - if (notit->emit_stream_begin_msg) { - /* create_msg_stream_beginning() logs errors */ - create_msg_stream_beginning(notit, message); - - if (!*message) { - status = BT_MSG_ITER_STATUS_ERROR; - } + /* create_msg_stream_beginning() logs errors */ + create_msg_stream_beginning(notit, message); - goto end; + if (!*message) { + status = BT_MSG_ITER_STATUS_ERROR; } - break; + goto end; case STATE_EMIT_MSG_STREAM_END: - if (notit->emit_stream_end_msg) { - /* create_msg_stream_end() logs errors */ - create_msg_stream_end(notit, message); - - if (!*message) { - status = BT_MSG_ITER_STATUS_ERROR; - } + /* create_msg_stream_end() logs errors */ + create_msg_stream_end(notit, message); - goto end; + if (!*message) { + status = BT_MSG_ITER_STATUS_ERROR; } - break; + goto end; case STATE_DONE: status = BT_MSG_ITER_STATUS_EOF; goto end; @@ -2641,6 +2884,7 @@ enum bt_msg_iter_status read_packet_header_context_fields( enum bt_msg_iter_status status = BT_MSG_ITER_STATUS_OK; BT_ASSERT(notit); + notit->set_stream = false; if (notit->state == STATE_EMIT_MSG_PACKET_BEGINNING) { /* We're already there */ @@ -2672,8 +2916,13 @@ enum bt_msg_iter_status read_packet_header_context_fields( case STATE_DSCOPE_STREAM_PACKET_CONTEXT_BEGIN: case STATE_DSCOPE_STREAM_PACKET_CONTEXT_CONTINUE: case STATE_AFTER_STREAM_PACKET_CONTEXT: + case STATE_CHECK_EMIT_MSG_STREAM_BEGINNING: case STATE_EMIT_MSG_STREAM_BEGINNING: case STATE_EMIT_MSG_STREAM_ACTIVITY_BEGINNING: + case STATE_CHECK_EMIT_MSG_DISCARDED_EVENTS: + case STATE_EMIT_MSG_DISCARDED_EVENTS: + case STATE_CHECK_EMIT_MSG_DISCARDED_PACKETS: + case STATE_EMIT_MSG_DISCARDED_PACKETS: /* Non-emitting state: continue */ break; default: diff --git a/plugins/ctf/common/msg-iter/msg-iter.h b/plugins/ctf/common/msg-iter/msg-iter.h index b7830b90f..e2aaa9385 100644 --- a/plugins/ctf/common/msg-iter/msg-iter.h +++ b/plugins/ctf/common/msg-iter/msg-iter.h @@ -325,6 +325,12 @@ enum bt_msg_iter_status bt_msg_iter_seek( BT_HIDDEN void bt_msg_iter_reset(struct bt_msg_iter *notit); +/* + * Like bt_msg_iter_reset(), but preserves stream-dependent state. + */ +BT_HIDDEN +void bt_msg_iter_reset_for_next_stream_file(struct bt_msg_iter *notit); + BT_HIDDEN void bt_msg_iter_set_emit_stream_beginning_message(struct bt_msg_iter *notit, bool val); diff --git a/plugins/ctf/fs-src/fs.c b/plugins/ctf/fs-src/fs.c index f9eb089d6..29c22eb55 100644 --- a/plugins/ctf/fs-src/fs.c +++ b/plugins/ctf/fs-src/fs.c @@ -129,7 +129,8 @@ bt_self_message_iterator_status ctf_fs_iterator_next_one( } msg_iter_data->ds_file_info_index++; - bt_msg_iter_reset(msg_iter_data->msg_iter); + bt_msg_iter_reset_for_next_stream_file( + msg_iter_data->msg_iter); set_msg_iter_emits_stream_beginning_end_messages( msg_iter_data); -- 2.34.1