+
+static
+bt_self_message_iterator_status state_set_trimmer_iterator_bounds(
+ struct trimmer_iterator *trimmer_it)
+{
+ bt_message_iterator_status upstream_iter_status =
+ BT_MESSAGE_ITERATOR_STATUS_OK;
+ struct trimmer_comp *trimmer_comp = trimmer_it->trimmer_comp;
+ bt_message_array_const msgs;
+ uint64_t count = 0;
+ int64_t ns_from_origin = INT64_MIN;
+ uint64_t i;
+ int ret;
+
+ BT_ASSERT(!trimmer_it->begin.is_set ||
+ !trimmer_it->end.is_set);
+
+ while (true) {
+ upstream_iter_status =
+ bt_self_component_port_input_message_iterator_next(
+ trimmer_it->upstream_iter, &msgs, &count);
+ if (upstream_iter_status != BT_MESSAGE_ITERATOR_STATUS_OK) {
+ goto end;
+ }
+
+ for (i = 0; i < count; i++) {
+ const bt_message *msg = msgs[i];
+ bool skip = false;
+ int ret;
+
+ ret = get_msg_ns_from_origin(msg, &ns_from_origin,
+ &skip);
+ if (ret) {
+ goto error;
+ }
+
+ if (skip) {
+ continue;
+ }
+
+ BT_ASSERT(ns_from_origin != INT64_MIN &&
+ ns_from_origin != INT64_MAX);
+ put_messages(msgs, count);
+ goto found;
+ }
+
+ put_messages(msgs, count);
+ }
+
+found:
+ if (!trimmer_it->begin.is_set) {
+ BT_ASSERT(!trimmer_it->begin.is_infinite);
+ ret = set_trimmer_iterator_bound(&trimmer_it->begin,
+ ns_from_origin, trimmer_comp->is_gmt);
+ if (ret) {
+ goto error;
+ }
+ }
+
+ if (!trimmer_it->end.is_set) {
+ BT_ASSERT(!trimmer_it->end.is_infinite);
+ ret = set_trimmer_iterator_bound(&trimmer_it->end,
+ ns_from_origin, trimmer_comp->is_gmt);
+ if (ret) {
+ goto error;
+ }
+ }
+
+ ret = validate_trimmer_bounds(&trimmer_it->begin,
+ &trimmer_it->end);
+ if (ret) {
+ goto error;
+ }
+
+ goto end;
+
+error:
+ put_messages(msgs, count);
+ upstream_iter_status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
+
+end:
+ return (int) upstream_iter_status;
+}
+
+static
+bt_self_message_iterator_status state_seek_initially(
+ struct trimmer_iterator *trimmer_it)
+{
+ bt_self_message_iterator_status status =
+ BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
+
+ BT_ASSERT(trimmer_it->begin.is_set);
+
+ if (trimmer_it->begin.is_infinite) {
+ if (!bt_self_component_port_input_message_iterator_can_seek_beginning(
+ trimmer_it->upstream_iter)) {
+ BT_LOGE_STR("Cannot make upstream message iterator initially seek its beginning.");
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
+ goto end;
+ }
+
+ status = (int) bt_self_component_port_input_message_iterator_seek_beginning(
+ trimmer_it->upstream_iter);
+ } else {
+ if (!bt_self_component_port_input_message_iterator_can_seek_ns_from_origin(
+ trimmer_it->upstream_iter,
+ trimmer_it->begin.ns_from_origin)) {
+ BT_LOGE("Cannot make upstream message iterator initially seek: "
+ "seek-ns-from-origin=%" PRId64,
+ trimmer_it->begin.ns_from_origin);
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
+ goto end;
+ }
+
+ status = (int) bt_self_component_port_input_message_iterator_seek_ns_from_origin(
+ trimmer_it->upstream_iter, trimmer_it->begin.ns_from_origin);
+ }
+
+ if (status == BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
+ trimmer_it->state = TRIMMER_ITERATOR_STATE_TRIM;
+ }
+
+end:
+ return status;
+}
+
+static inline
+void push_message(struct trimmer_iterator *trimmer_it, const bt_message *msg)
+{
+ g_queue_push_head(trimmer_it->output_messages, (void *) msg);
+}
+
+static inline
+const bt_message *pop_message(struct trimmer_iterator *trimmer_it)
+{
+ return g_queue_pop_tail(trimmer_it->output_messages);
+}
+
+static inline
+int clock_raw_value_from_ns_from_origin(const bt_clock_class *clock_class,
+ int64_t ns_from_origin, uint64_t *raw_value)
+{
+
+ int64_t cc_offset_s;
+ uint64_t cc_offset_cycles;
+ uint64_t cc_freq;
+
+ bt_clock_class_get_offset(clock_class, &cc_offset_s, &cc_offset_cycles);
+ cc_freq = bt_clock_class_get_frequency(clock_class);
+ return bt_common_clock_value_from_ns_from_origin(cc_offset_s,
+ cc_offset_cycles, cc_freq, ns_from_origin, raw_value);
+}
+
+static inline
+bt_self_message_iterator_status end_stream(struct trimmer_iterator *trimmer_it,
+ struct trimmer_iterator_stream_state *sstate)
+{
+ bt_self_message_iterator_status status =
+ BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
+ uint64_t raw_value;
+ const bt_clock_class *clock_class;
+ int ret;
+ bt_message *msg = NULL;
+
+ BT_ASSERT(!trimmer_it->end.is_infinite);
+
+ if (!sstate->stream) {
+ goto end;
+ }
+
+ if (sstate->cur_packet) {
+ /*
+ * The last message could not have been a stream
+ * activity end message if we have a current packet.
+ */
+ BT_ASSERT(!sstate->last_msg_is_stream_activity_end);
+
+ /*
+ * Create and push a packet end message, making its time
+ * the trimming range's end time.
+ */
+ clock_class = bt_stream_class_borrow_default_clock_class_const(
+ bt_stream_borrow_class_const(sstate->stream));
+ BT_ASSERT(clock_class);
+ ret = clock_raw_value_from_ns_from_origin(clock_class,
+ trimmer_it->end.ns_from_origin, &raw_value);
+ if (ret) {
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
+ goto end;
+ }
+
+ msg = bt_message_packet_end_create_with_default_clock_snapshot(
+ trimmer_it->self_msg_iter, sstate->cur_packet,
+ raw_value);
+ if (!msg) {
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
+ goto end;
+ }
+
+ push_message(trimmer_it, msg);
+ msg = NULL;
+ BT_PACKET_PUT_REF_AND_RESET(sstate->cur_packet);
+
+ /*
+ * Because we generated a packet end message, set the
+ * stream activity end message's time to use to the
+ * trimming range's end time (this packet end message's
+ * time).
+ */
+ sstate->stream_act_end_ns_from_origin =
+ trimmer_it->end.ns_from_origin;
+ }
+
+ if (!sstate->last_msg_is_stream_activity_end) {
+ /* Create and push a stream activity end message */
+ msg = bt_message_stream_activity_end_create(
+ trimmer_it->self_msg_iter, sstate->stream);
+ if (!msg) {
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
+ goto end;
+ }
+
+ clock_class = bt_stream_class_borrow_default_clock_class_const(
+ bt_stream_borrow_class_const(sstate->stream));
+ BT_ASSERT(clock_class);
+
+ if (sstate->stream_act_end_ns_from_origin == INT64_MIN) {
+ /*
+ * We received at least what is necessary to
+ * have a stream state (stream beginning and
+ * stream activity beginning messages), but
+ * nothing else: use the trimmer range's end
+ * time.
+ */
+ sstate->stream_act_end_ns_from_origin =
+ trimmer_it->end.ns_from_origin;
+ }
+
+ ret = clock_raw_value_from_ns_from_origin(clock_class,
+ sstate->stream_act_end_ns_from_origin, &raw_value);
+ if (ret) {
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
+ goto end;
+ }
+
+ bt_message_stream_activity_end_set_default_clock_snapshot(
+ msg, raw_value);
+ push_message(trimmer_it, msg);
+ msg = NULL;
+ }
+
+ /* Create and push a stream end message */
+ msg = bt_message_stream_end_create(trimmer_it->self_msg_iter,
+ sstate->stream);
+ if (!msg) {
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
+ goto end;
+ }
+
+ push_message(trimmer_it, msg);
+ msg = NULL;
+
+ /*
+ * Just to make sure that we don't use this stream state again
+ * in the future without an obvious error.
+ */
+ sstate->stream = NULL;
+
+end:
+ bt_message_put_ref(msg);
+ return status;
+}
+
+static inline
+bt_self_message_iterator_status end_iterator_streams(
+ struct trimmer_iterator *trimmer_it)
+{
+ bt_self_message_iterator_status status =
+ BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
+ GHashTableIter iter;
+ gpointer key, sstate;
+
+ if (trimmer_it->end.is_infinite) {
+ /*
+ * An infinite trimming range's end time guarantees that
+ * we received (and pushed) all the appropriate end
+ * messages.
+ */
+ goto remove_all;
+ }
+
+ /*
+ * End each stream and then remove them from the hash table of
+ * stream states to release unneeded references.
+ */
+ g_hash_table_iter_init(&iter, trimmer_it->stream_states);
+
+ while (g_hash_table_iter_next(&iter, &key, &sstate)) {
+ status = end_stream(trimmer_it, sstate);
+ if (status) {
+ goto end;
+ }
+ }
+
+remove_all:
+ g_hash_table_remove_all(trimmer_it->stream_states);
+
+end:
+ return status;
+}
+
+static inline
+bt_self_message_iterator_status create_stream_beginning_activity_message(
+ struct trimmer_iterator *trimmer_it,
+ const bt_stream *stream,
+ const bt_clock_class *clock_class, bt_message **msg)
+{
+ bt_self_message_iterator_status status =
+ BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
+
+ BT_ASSERT(msg);
+ BT_ASSERT(!trimmer_it->begin.is_infinite);
+
+ *msg = bt_message_stream_activity_beginning_create(
+ trimmer_it->self_msg_iter, stream);
+ if (!*msg) {
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
+ goto end;
+ }
+
+ if (clock_class) {
+ int ret;
+ uint64_t raw_value;
+
+ ret = clock_raw_value_from_ns_from_origin(clock_class,
+ trimmer_it->begin.ns_from_origin, &raw_value);
+ if (ret) {
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
+ bt_message_put_ref(*msg);
+ goto end;
+ }
+
+ bt_message_stream_activity_beginning_set_default_clock_snapshot(
+ *msg, raw_value);
+ }
+
+end:
+ return status;
+}
+
+/*
+ * Makes sure to initialize a stream state, pushing the appropriate
+ * initial messages.
+ *
+ * `stream_act_beginning_msg` is an initial stream activity beginning
+ * message to potentially use, depending on its clock snapshot state.
+ * This function consumes `stream_act_beginning_msg` unconditionally.
+ */
+static inline
+bt_self_message_iterator_status ensure_stream_state_is_inited(
+ struct trimmer_iterator *trimmer_it,
+ struct trimmer_iterator_stream_state *sstate,
+ const bt_message *stream_act_beginning_msg)
+{
+ bt_self_message_iterator_status status =
+ BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
+ bt_message *new_msg = NULL;
+ const bt_clock_class *clock_class =
+ bt_stream_class_borrow_default_clock_class_const(
+ bt_stream_borrow_class_const(sstate->stream));
+
+ BT_ASSERT(!sstate->inited);
+
+ if (!sstate->stream_beginning_msg) {
+ /* No initial stream beginning message: create one */
+ sstate->stream_beginning_msg =
+ bt_message_stream_beginning_create(
+ trimmer_it->self_msg_iter, sstate->stream);
+ if (!sstate->stream_beginning_msg) {
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
+ goto end;
+ }
+ }
+
+ /* Push initial stream beginning message */
+ BT_ASSERT(sstate->stream_beginning_msg);
+ push_message(trimmer_it, sstate->stream_beginning_msg);
+ sstate->stream_beginning_msg = NULL;
+
+ if (stream_act_beginning_msg) {
+ /*
+ * Initial stream activity beginning message exists: if
+ * its time is -inf, then create and push a new one
+ * having the trimming range's beginning time. Otherwise
+ * push it as is (known and unknown).
+ */
+ const bt_clock_snapshot *cs;
+ bt_message_stream_activity_clock_snapshot_state sa_cs_state;
+
+ sa_cs_state = bt_message_stream_activity_beginning_borrow_default_clock_snapshot_const(
+ stream_act_beginning_msg, &cs);
+ if (sa_cs_state == BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_INFINITE &&
+ !trimmer_it->begin.is_infinite) {
+ /*
+ * -inf time: use trimming range's beginning
+ * time (which is not -inf).
+ */
+ status = create_stream_beginning_activity_message(
+ trimmer_it, sstate->stream, clock_class,
+ &new_msg);
+ if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
+ goto end;
+ }
+
+ push_message(trimmer_it, new_msg);
+ new_msg = NULL;
+ } else {
+ /* Known/unknown: push as is */
+ push_message(trimmer_it, stream_act_beginning_msg);
+ stream_act_beginning_msg = NULL;
+ }
+ } else {
+ BT_ASSERT(!trimmer_it->begin.is_infinite);
+
+ /*
+ * No stream beginning activity message: create and push
+ * a new message.
+ */
+ status = create_stream_beginning_activity_message(
+ trimmer_it, sstate->stream, clock_class, &new_msg);
+ if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
+ goto end;
+ }
+
+ push_message(trimmer_it, new_msg);
+ new_msg = NULL;
+ }
+
+ sstate->inited = true;
+
+end:
+ bt_message_put_ref(new_msg);
+ bt_message_put_ref(stream_act_beginning_msg);
+ return status;
+}
+
+static inline
+bt_self_message_iterator_status ensure_cur_packet_exists(
+ struct trimmer_iterator *trimmer_it,
+ struct trimmer_iterator_stream_state *sstate,
+ const bt_packet *packet)
+{
+ bt_self_message_iterator_status status =
+ BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
+ int ret;
+ const bt_clock_class *clock_class =
+ bt_stream_class_borrow_default_clock_class_const(
+ bt_stream_borrow_class_const(sstate->stream));
+ bt_message *msg = NULL;
+ uint64_t raw_value;
+
+ BT_ASSERT(!trimmer_it->begin.is_infinite);
+ BT_ASSERT(!sstate->cur_packet);
+
+ /*
+ * Create and push an initial packet beginning message,
+ * making its time the trimming range's beginning time.
+ */
+ ret = clock_raw_value_from_ns_from_origin(clock_class,
+ trimmer_it->begin.ns_from_origin, &raw_value);
+ if (ret) {
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
+ goto end;
+ }
+
+ msg = bt_message_packet_beginning_create_with_default_clock_snapshot(
+ trimmer_it->self_msg_iter, packet, raw_value);
+ if (!msg) {
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
+ goto end;
+ }
+
+ push_message(trimmer_it, msg);
+ msg = NULL;
+
+ /* Set packet as this stream's current packet */
+ sstate->cur_packet = packet;
+ bt_packet_get_ref(sstate->cur_packet);
+
+end:
+ bt_message_put_ref(msg);
+ return status;
+}
+
+/*
+ * Handles a message which is associated to a given stream state. This
+ * _could_ make the iterator's output message queue grow; this could
+ * also consume the message without pushing anything to this queue, only
+ * modifying the stream state.
+ *
+ * This function consumes the `msg` reference, _whatever the outcome_.
+ *
+ * `ns_from_origin` is the message's time, as given by
+ * get_msg_ns_from_origin().
+ *
+ * This function sets `reached_end` if handling this message made the
+ * iterator reach the end of the trimming range. Note that the output
+ * message queue could contain messages even if this function sets
+ * `reached_end`.
+ */
+static inline
+bt_self_message_iterator_status handle_message_with_stream_state(
+ struct trimmer_iterator *trimmer_it, const bt_message *msg,
+ struct trimmer_iterator_stream_state *sstate,
+ int64_t ns_from_origin, bool *reached_end)
+{
+ bt_self_message_iterator_status status =
+ BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
+ bt_message_type msg_type = bt_message_get_type(msg);
+ int ret;
+
+ switch (msg_type) {
+ case BT_MESSAGE_TYPE_EVENT:
+ if (unlikely(!trimmer_it->end.is_infinite &&
+ ns_from_origin > trimmer_it->end.ns_from_origin)) {
+ status = end_iterator_streams(trimmer_it);
+ *reached_end = true;
+ break;
+ }
+
+ if (unlikely(!sstate->inited)) {
+ status = ensure_stream_state_is_inited(trimmer_it,
+ sstate, NULL);
+ if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
+ goto end;
+ }
+ }
+
+ if (unlikely(!sstate->cur_packet)) {
+ const bt_event *event =
+ bt_message_event_borrow_event_const(msg);
+ const bt_packet *packet = bt_event_borrow_packet_const(
+ event);
+
+ status = ensure_cur_packet_exists(trimmer_it, sstate,
+ packet);
+ if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
+ goto end;
+ }
+ }
+
+ BT_ASSERT(sstate->cur_packet);
+ push_message(trimmer_it, msg);
+ msg = NULL;
+ break;
+ case BT_MESSAGE_TYPE_PACKET_BEGINNING:
+ if (unlikely(!trimmer_it->end.is_infinite &&
+ ns_from_origin > trimmer_it->end.ns_from_origin)) {
+ status = end_iterator_streams(trimmer_it);
+ *reached_end = true;
+ break;
+ }
+
+ if (unlikely(!sstate->inited)) {
+ status = ensure_stream_state_is_inited(trimmer_it,
+ sstate, NULL);
+ if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
+ goto end;
+ }
+ }
+
+ BT_ASSERT(!sstate->cur_packet);
+ sstate->cur_packet =
+ bt_message_packet_beginning_borrow_packet_const(msg);
+ bt_packet_get_ref(sstate->cur_packet);
+ push_message(trimmer_it, msg);
+ msg = NULL;
+ break;
+ case BT_MESSAGE_TYPE_PACKET_END:
+ sstate->stream_act_end_ns_from_origin = ns_from_origin;
+
+ if (unlikely(!trimmer_it->end.is_infinite &&
+ ns_from_origin > trimmer_it->end.ns_from_origin)) {
+ status = end_iterator_streams(trimmer_it);
+ *reached_end = true;
+ break;
+ }
+
+ if (unlikely(!sstate->inited)) {
+ status = ensure_stream_state_is_inited(trimmer_it,
+ sstate, NULL);
+ if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
+ goto end;
+ }
+ }
+
+ if (unlikely(!sstate->cur_packet)) {
+ const bt_packet *packet =
+ bt_message_packet_end_borrow_packet_const(msg);
+
+ status = ensure_cur_packet_exists(trimmer_it, sstate,
+ packet);
+ if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
+ goto end;
+ }
+ }
+
+ BT_ASSERT(sstate->cur_packet);
+ BT_PACKET_PUT_REF_AND_RESET(sstate->cur_packet);
+ push_message(trimmer_it, msg);
+ msg = NULL;
+ break;
+ case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
+ case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
+ {
+ /*
+ * `ns_from_origin` is the message's time range's
+ * beginning time here.
+ */
+ int64_t end_ns_from_origin;
+ const bt_clock_snapshot *end_cs;
+
+ if (bt_message_get_type(msg) ==
+ BT_MESSAGE_TYPE_DISCARDED_EVENTS) {
+ /*
+ * Safe to ignore the return value because we
+ * know there's a default clock and it's always
+ * known.
+ */
+ (void) bt_message_discarded_events_borrow_default_end_clock_snapshot_const(
+ msg, &end_cs);
+ } else {
+ /*
+ * Safe to ignore the return value because we
+ * know there's a default clock and it's always
+ * known.
+ */
+ (void) bt_message_discarded_packets_borrow_default_end_clock_snapshot_const(
+ msg, &end_cs);
+ }
+
+ if (bt_clock_snapshot_get_ns_from_origin(end_cs,
+ &end_ns_from_origin)) {
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
+ goto end;
+ }
+
+ sstate->stream_act_end_ns_from_origin = end_ns_from_origin;
+
+ if (!trimmer_it->end.is_infinite &&
+ ns_from_origin > trimmer_it->end.ns_from_origin) {
+ status = end_iterator_streams(trimmer_it);
+ *reached_end = true;
+ break;
+ }
+
+ if (!trimmer_it->end.is_infinite &&
+ end_ns_from_origin > trimmer_it->end.ns_from_origin) {
+ /*
+ * This message's end time is outside the
+ * trimming time range: replace it with a new
+ * message having an end time equal to the
+ * trimming time range's end and without a
+ * count.
+ */
+ const bt_clock_class *clock_class =
+ bt_clock_snapshot_borrow_clock_class_const(
+ end_cs);
+ const bt_clock_snapshot *begin_cs;
+ bt_message *new_msg;
+ uint64_t end_raw_value;
+
+ ret = clock_raw_value_from_ns_from_origin(clock_class,
+ trimmer_it->end.ns_from_origin, &end_raw_value);
+ if (ret) {
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
+ goto end;
+ }
+
+ if (msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS) {
+ (void) bt_message_discarded_events_borrow_default_beginning_clock_snapshot_const(
+ msg, &begin_cs);
+ new_msg = bt_message_discarded_events_create_with_default_clock_snapshots(
+ trimmer_it->self_msg_iter,
+ sstate->stream,
+ bt_clock_snapshot_get_value(begin_cs),
+ end_raw_value);
+ } else {
+ (void) bt_message_discarded_packets_borrow_default_beginning_clock_snapshot_const(
+ msg, &begin_cs);
+ new_msg = bt_message_discarded_packets_create_with_default_clock_snapshots(
+ trimmer_it->self_msg_iter,
+ sstate->stream,
+ bt_clock_snapshot_get_value(begin_cs),
+ end_raw_value);
+ }
+
+ if (!new_msg) {
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
+ goto end;
+ }
+
+ /* Replace the original message */
+ BT_MESSAGE_MOVE_REF(msg, new_msg);
+ }
+
+ if (unlikely(!sstate->inited)) {
+ status = ensure_stream_state_is_inited(trimmer_it,
+ sstate, NULL);
+ if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
+ goto end;
+ }
+ }
+
+ push_message(trimmer_it, msg);
+ msg = NULL;
+ break;
+ }
+ case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
+ if (!trimmer_it->end.is_infinite &&
+ ns_from_origin > trimmer_it->end.ns_from_origin) {
+ /*
+ * This only happens when the message's time is
+ * known and is greater than the trimming
+ * range's end time. Unknown and -inf times are
+ * always less than
+ * `trimmer_it->end.ns_from_origin`.
+ */
+ status = end_iterator_streams(trimmer_it);
+ *reached_end = true;
+ break;
+ }
+
+ if (!sstate->inited) {
+ status = ensure_stream_state_is_inited(trimmer_it,
+ sstate, msg);
+ msg = NULL;
+ if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
+ goto end;
+ }
+ } else {
+ push_message(trimmer_it, msg);
+ msg = NULL;
+ }
+
+ break;
+ case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
+ if (trimmer_it->end.is_infinite) {
+ push_message(trimmer_it, msg);
+ msg = NULL;
+ break;
+ }
+
+ if (ns_from_origin == INT64_MIN) {
+ /* Unknown: push as is if stream state is inited */
+ if (sstate->inited) {
+ push_message(trimmer_it, msg);
+ msg = NULL;
+ sstate->last_msg_is_stream_activity_end = true;
+ }
+ } else if (ns_from_origin == INT64_MAX) {
+ /* Infinite: use trimming range's end time */
+ sstate->stream_act_end_ns_from_origin =
+ trimmer_it->end.ns_from_origin;
+ } else {
+ /* Known: check if outside of trimming range */
+ if (ns_from_origin > trimmer_it->end.ns_from_origin) {
+ sstate->stream_act_end_ns_from_origin =
+ trimmer_it->end.ns_from_origin;
+ status = end_iterator_streams(trimmer_it);
+ *reached_end = true;
+ break;
+ }
+
+ if (!sstate->inited) {
+ /*
+ * First message for this stream is a
+ * stream activity end: we can't deduce
+ * anything about the stream activity
+ * beginning's time, and using this
+ * message's time would make a useless
+ * pair of stream activity beginning/end
+ * with the same time. Just skip this
+ * message and wait for something
+ * useful.
+ */
+ break;
+ }
+
+ push_message(trimmer_it, msg);
+ msg = NULL;
+ sstate->last_msg_is_stream_activity_end = true;
+ sstate->stream_act_end_ns_from_origin = ns_from_origin;
+ }
+
+ break;
+ case BT_MESSAGE_TYPE_STREAM_BEGINNING:
+ /*
+ * We don't know what follows at this point, so just
+ * keep this message until we know what to do with it
+ * (it will be used in ensure_stream_state_is_inited()).
+ */
+ BT_ASSERT(!sstate->inited);
+ BT_MESSAGE_MOVE_REF(sstate->stream_beginning_msg, msg);
+ break;
+ case BT_MESSAGE_TYPE_STREAM_END:
+ if (sstate->inited) {
+ /*
+ * This is the end of an inited stream: end this
+ * stream if its stream activity end message
+ * time is not the trimming range's end time
+ * (which means the final stream activity end
+ * message had an infinite time). end_stream()
+ * will generate its own stream end message.
+ */
+ if (trimmer_it->end.is_infinite) {
+ push_message(trimmer_it, msg);
+ msg = NULL;
+ g_hash_table_remove(trimmer_it->stream_states,
+ sstate->stream);
+ } else if (sstate->stream_act_end_ns_from_origin <
+ trimmer_it->end.ns_from_origin) {
+ status = end_stream(trimmer_it, sstate);
+ if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
+ goto end;
+ }
+
+ /* We won't need this stream state again */
+ g_hash_table_remove(trimmer_it->stream_states,
+ sstate->stream);
+ }
+ } else {
+ /* We dont't need this stream state anymore */
+ g_hash_table_remove(trimmer_it->stream_states, sstate->stream);
+ }
+
+ break;
+ default:
+ break;
+ }
+
+end:
+ /* We release the message's reference whatever the outcome */
+ bt_message_put_ref(msg);
+ return BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
+}
+
+/*
+ * Handles an input message. This _could_ make the iterator's output
+ * message queue grow; this could also consume the message without
+ * pushing anything to this queue, only modifying the stream state.
+ *
+ * This function consumes the `msg` reference, _whatever the outcome_.
+ *
+ * This function sets `reached_end` if handling this message made the
+ * iterator reach the end of the trimming range. Note that the output
+ * message queue could contain messages even if this function sets
+ * `reached_end`.
+ */
+static inline
+bt_self_message_iterator_status handle_message(
+ struct trimmer_iterator *trimmer_it, const bt_message *msg,
+ bool *reached_end)
+{
+ bt_self_message_iterator_status status;
+ const bt_stream *stream = NULL;
+ int64_t ns_from_origin = INT64_MIN;
+ bool skip;
+ int ret;
+ struct trimmer_iterator_stream_state *sstate = NULL;
+
+ /* Find message's associated stream */
+ switch (bt_message_get_type(msg)) {
+ case BT_MESSAGE_TYPE_EVENT:
+ stream = bt_event_borrow_stream_const(
+ bt_message_event_borrow_event_const(msg));
+ break;
+ case BT_MESSAGE_TYPE_PACKET_BEGINNING:
+ stream = bt_packet_borrow_stream_const(
+ bt_message_packet_beginning_borrow_packet_const(msg));
+ break;
+ case BT_MESSAGE_TYPE_PACKET_END:
+ stream = bt_packet_borrow_stream_const(
+ bt_message_packet_end_borrow_packet_const(msg));
+ break;
+ case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
+ stream = bt_message_discarded_events_borrow_stream_const(msg);
+ break;
+ case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
+ stream = bt_message_discarded_packets_borrow_stream_const(msg);
+ break;
+ case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
+ stream = bt_message_stream_activity_beginning_borrow_stream_const(msg);
+ break;
+ case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
+ stream = bt_message_stream_activity_end_borrow_stream_const(msg);
+ break;
+ case BT_MESSAGE_TYPE_STREAM_BEGINNING:
+ stream = bt_message_stream_beginning_borrow_stream_const(msg);
+ break;
+ case BT_MESSAGE_TYPE_STREAM_END:
+ stream = bt_message_stream_end_borrow_stream_const(msg);
+ break;
+ default:
+ break;
+ }
+
+ if (likely(stream)) {
+ /* Find stream state */
+ sstate = g_hash_table_lookup(trimmer_it->stream_states,
+ stream);
+ if (unlikely(!sstate)) {
+ /* No stream state yet: create one now */
+ const bt_stream_class *sc;
+
+ /*
+ * Validate right now that the stream's class
+ * has a registered default clock class so that
+ * an existing stream state guarantees existing
+ * default clock snapshots for its associated
+ * messages.
+ *
+ * Also check that clock snapshots are always
+ * known.
+ */
+ sc = bt_stream_borrow_class_const(stream);
+ if (!bt_stream_class_borrow_default_clock_class_const(sc)) {
+ BT_LOGE("Unsupported stream: stream class does "
+ "not have a default clock class: "
+ "stream-addr=%p, "
+ "stream-id=%" PRIu64 ", "
+ "stream-name=\"%s\"",
+ stream, bt_stream_get_id(stream),
+ bt_stream_get_name(stream));
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
+ goto end;
+ }
+
+ if (!bt_stream_class_default_clock_is_always_known(sc)) {
+ BT_LOGE("Unsupported stream: clock does not "
+ "always have a known value: "
+ "stream-addr=%p, "
+ "stream-id=%" PRIu64 ", "
+ "stream-name=\"%s\"",
+ stream, bt_stream_get_id(stream),
+ bt_stream_get_name(stream));
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
+ goto end;
+ }
+
+ sstate = g_new0(struct trimmer_iterator_stream_state,
+ 1);
+ if (!sstate) {
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_NOMEM;
+ goto end;
+ }
+
+ sstate->stream = stream;
+ sstate->stream_act_end_ns_from_origin = INT64_MIN;
+ g_hash_table_insert(trimmer_it->stream_states,
+ (void *) stream, sstate);
+ }
+ }
+
+ /* Retrieve the message's time */
+ ret = get_msg_ns_from_origin(msg, &ns_from_origin, &skip);
+ if (unlikely(ret)) {
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
+ goto end;
+ }
+
+ if (likely(sstate)) {
+ /* Message associated to a stream */
+ status = handle_message_with_stream_state(trimmer_it, msg,
+ sstate, ns_from_origin, reached_end);
+
+ /*
+ * handle_message_with_stream_state() unconditionally
+ * consumes `msg`.
+ */
+ msg = NULL;
+ } else {
+ /*
+ * Message not associated to a stream (message iterator
+ * inactivity).
+ */
+ if (unlikely(ns_from_origin > trimmer_it->end.ns_from_origin)) {
+ BT_MESSAGE_PUT_REF_AND_RESET(msg);
+ status = end_iterator_streams(trimmer_it);
+ *reached_end = true;
+ } else {
+ push_message(trimmer_it, msg);
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
+ msg = NULL;
+ }
+ }
+
+end:
+ /* We release the message's reference whatever the outcome */
+ bt_message_put_ref(msg);
+ return status;
+}
+
+static inline
+void fill_message_array_from_output_messages(
+ struct trimmer_iterator *trimmer_it,
+ bt_message_array_const msgs, uint64_t capacity, uint64_t *count)
+{
+ *count = 0;
+
+ /*
+ * Move auto-seek messages to the output array (which is this
+ * iterator's base message array).
+ */
+ while (capacity > 0 && !g_queue_is_empty(trimmer_it->output_messages)) {
+ msgs[*count] = pop_message(trimmer_it);
+ capacity--;
+ (*count)++;
+ }
+
+ BT_ASSERT(*count > 0);
+}
+
+static inline
+bt_self_message_iterator_status state_ending(
+ struct trimmer_iterator *trimmer_it,
+ bt_message_array_const msgs, uint64_t capacity,
+ uint64_t *count)
+{
+ bt_self_message_iterator_status status =
+ BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
+
+ if (g_queue_is_empty(trimmer_it->output_messages)) {
+ trimmer_it->state = TRIMMER_ITERATOR_STATE_ENDED;
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_END;
+ goto end;
+ }
+
+ fill_message_array_from_output_messages(trimmer_it, msgs,
+ capacity, count);
+
+end:
+ return status;
+}
+
+static inline
+bt_self_message_iterator_status state_trim(struct trimmer_iterator *trimmer_it,
+ bt_message_array_const msgs, uint64_t capacity,
+ uint64_t *count)
+{
+ bt_self_message_iterator_status status =
+ BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
+ bt_message_array_const my_msgs;
+ uint64_t my_count;
+ uint64_t i;
+ bool reached_end = false;
+
+ while (g_queue_is_empty(trimmer_it->output_messages)) {
+ status = (int) bt_self_component_port_input_message_iterator_next(
+ trimmer_it->upstream_iter, &my_msgs, &my_count);
+ if (unlikely(status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK)) {
+ if (status == BT_SELF_MESSAGE_ITERATOR_STATUS_END) {
+ status = end_iterator_streams(trimmer_it);
+ if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
+ goto end;
+ }
+
+ trimmer_it->state =
+ TRIMMER_ITERATOR_STATE_ENDING;
+ status = state_ending(trimmer_it, msgs,
+ capacity, count);
+ }
+
+ goto end;
+ }
+
+ BT_ASSERT(my_count > 0);
+
+ for (i = 0; i < my_count; i++) {
+ status = handle_message(trimmer_it, my_msgs[i],
+ &reached_end);
+
+ /*
+ * handle_message() unconditionally consumes the
+ * message reference.
+ */
+ my_msgs[i] = NULL;
+
+ if (unlikely(status !=
+ BT_SELF_MESSAGE_ITERATOR_STATUS_OK)) {
+ put_messages(my_msgs, my_count);
+ goto end;
+ }
+
+ if (unlikely(reached_end)) {
+ /*
+ * This message's time was passed the
+ * trimming time range's end time: we
+ * are done. Their might still be
+ * messages in the output message queue,
+ * so move to the "ending" state and
+ * apply it immediately since
+ * state_trim() is called within the
+ * "next" method.
+ */
+ put_messages(my_msgs, my_count);
+ trimmer_it->state =
+ TRIMMER_ITERATOR_STATE_ENDING;
+ status = state_ending(trimmer_it, msgs,
+ capacity, count);
+ goto end;
+ }
+ }
+ }
+
+ /*
+ * There's at least one message in the output message queue:
+ * move the messages to the output message array.
+ */
+ BT_ASSERT(!g_queue_is_empty(trimmer_it->output_messages));
+ fill_message_array_from_output_messages(trimmer_it, msgs,
+ capacity, count);
+
+end:
+ return status;
+}
+
+BT_HIDDEN
+bt_self_message_iterator_status trimmer_msg_iter_next(
+ bt_self_message_iterator *self_msg_iter,
+ bt_message_array_const msgs, uint64_t capacity,
+ uint64_t *count)
+{
+ struct trimmer_iterator *trimmer_it =
+ bt_self_message_iterator_get_data(self_msg_iter);
+ bt_self_message_iterator_status status =
+ BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
+
+ BT_ASSERT(trimmer_it);
+
+ if (likely(trimmer_it->state == TRIMMER_ITERATOR_STATE_TRIM)) {
+ status = state_trim(trimmer_it, msgs, capacity, count);
+ if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
+ goto end;
+ }
+ } else {
+ switch (trimmer_it->state) {
+ case TRIMMER_ITERATOR_STATE_SET_BOUNDS_NS_FROM_ORIGIN:
+ status = state_set_trimmer_iterator_bounds(trimmer_it);
+ if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
+ goto end;
+ }
+
+ status = state_seek_initially(trimmer_it);
+ if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
+ goto end;
+ }
+
+ status = state_trim(trimmer_it, msgs, capacity, count);
+ if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
+ goto end;
+ }
+
+ break;
+ case TRIMMER_ITERATOR_STATE_SEEK_INITIALLY:
+ status = state_seek_initially(trimmer_it);
+ if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
+ goto end;
+ }
+
+ status = state_trim(trimmer_it, msgs, capacity, count);
+ if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
+ goto end;
+ }
+
+ break;
+ case TRIMMER_ITERATOR_STATE_ENDING:
+ status = state_ending(trimmer_it, msgs, capacity,
+ count);
+ if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
+ goto end;
+ }
+
+ break;
+ case TRIMMER_ITERATOR_STATE_ENDED:
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_END;
+ break;
+ default:
+ abort();
+ }
+ }
+
+end:
+ return status;
+}
+
+BT_HIDDEN
+void trimmer_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
+{
+ struct trimmer_iterator *trimmer_it =
+ bt_self_message_iterator_get_data(self_msg_iter);
+
+ BT_ASSERT(trimmer_it);
+ destroy_trimmer_iterator(trimmer_it);
+}