#include <stdint.h>
#include <inttypes.h>
#include <glib.h>
+#include "compat/glib.h"
#include "trimmer.h"
};
struct trimmer_iterator_stream_state {
- /*
- * True if the last pushed message for this stream was a stream
- * activity end message.
- */
- bool last_msg_is_stream_activity_end;
-
- /*
- * Time to use for a generated stream end activity message when
- * ending the stream.
- */
- int64_t stream_act_end_ns_from_origin;
-
/* Weak */
const bt_stream *stream;
+ /* Have we seen a message with clock_snapshot going through this stream? */
+ bool seen_clock_snapshot;
+
/* Owned by this (`NULL` initially and between packets) */
const bt_packet *cur_packet;
};
* FALSE. If there's no match, we have no use for it, so free
* it immediatly and don't return it to the caller.
*/
- g_match_info_unref(*match_info);
+ g_match_info_free(*match_info);
*match_info = NULL;
}
char tmp_arg[64];
if (bt_value_is_signed_integer(param)) {
- int64_t value = bt_value_signed_integer_get(param);
+ int64_t value = bt_value_integer_signed_get(param);
/*
* Just convert it to a temporary string to handle
trimmer_it->begin = trimmer_it->trimmer_comp->begin;
trimmer_it->end = trimmer_it->trimmer_comp->end;
trimmer_it->upstream_iter =
- bt_self_component_port_input_message_iterator_create(
+ bt_self_component_port_input_message_iterator_create_from_message_iterator(
+ self_msg_iter,
bt_self_component_filter_borrow_input_port_by_name(
self_comp, in_port_name));
if (!trimmer_it->upstream_iter) {
static inline
int get_msg_ns_from_origin(const bt_message *msg, int64_t *ns_from_origin,
- bool *skip)
+ bool *has_clock_snapshot)
{
const bt_clock_class *clock_class = NULL;
const bt_clock_snapshot *clock_snapshot = NULL;
- bt_message_stream_activity_clock_snapshot_state sa_cs_state;
int ret = 0;
BT_ASSERT(msg);
BT_ASSERT(ns_from_origin);
- BT_ASSERT(skip);
+ BT_ASSERT(has_clock_snapshot);
switch (bt_message_get_type(msg)) {
case BT_MESSAGE_TYPE_EVENT:
clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(
msg);
break;
- case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
+ case BT_MESSAGE_TYPE_STREAM_BEGINNING:
+ {
+ enum bt_message_stream_clock_snapshot_state cs_state;
+
clock_class =
- bt_message_discarded_events_borrow_stream_class_default_clock_class_const(
- msg);
+ bt_message_stream_beginning_borrow_stream_class_default_clock_class_const(msg);
if (G_UNLIKELY(!clock_class)) {
goto error;
}
- clock_snapshot = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
- msg);
- break;
- case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
- clock_class =
- bt_message_discarded_packets_borrow_stream_class_default_clock_class_const(
- msg);
- if (G_UNLIKELY(!clock_class)) {
- goto error;
+ cs_state = bt_message_stream_beginning_borrow_default_clock_snapshot_const(msg, &clock_snapshot);
+ if (cs_state != BT_MESSAGE_STREAM_CLOCK_SNAPSHOT_STATE_KNOWN) {
+ goto no_clock_snapshot;
}
- clock_snapshot = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
- msg);
break;
- case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
+ }
+ case BT_MESSAGE_TYPE_STREAM_END:
+ {
+ enum bt_message_stream_clock_snapshot_state cs_state;
+
clock_class =
- bt_message_stream_activity_beginning_borrow_stream_class_default_clock_class_const(
- msg);
+ bt_message_stream_end_borrow_stream_class_default_clock_class_const(msg);
if (G_UNLIKELY(!clock_class)) {
goto error;
}
- sa_cs_state = bt_message_stream_activity_beginning_borrow_default_clock_snapshot_const(
- msg, &clock_snapshot);
- if (sa_cs_state == BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_UNKNOWN ||
- sa_cs_state == BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_INFINITE) {
- /* Lowest possible time to always include them */
- *ns_from_origin = INT64_MIN;
+ cs_state = bt_message_stream_end_borrow_default_clock_snapshot_const(msg, &clock_snapshot);
+ if (cs_state != BT_MESSAGE_STREAM_CLOCK_SNAPSHOT_STATE_KNOWN) {
goto no_clock_snapshot;
}
break;
- case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
+ }
+ case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
clock_class =
- bt_message_stream_activity_end_borrow_stream_class_default_clock_class_const(
+ bt_message_discarded_events_borrow_stream_class_default_clock_class_const(
msg);
if (G_UNLIKELY(!clock_class)) {
goto error;
}
- sa_cs_state = bt_message_stream_activity_end_borrow_default_clock_snapshot_const(
- msg, &clock_snapshot);
- if (sa_cs_state == BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_UNKNOWN) {
- /* Lowest time to always include it */
- *ns_from_origin = INT64_MIN;
- goto no_clock_snapshot;
- } else if (sa_cs_state == BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_INFINITE) {
- /* Greatest time to always exclude it */
- *ns_from_origin = INT64_MAX;
- goto no_clock_snapshot;
+ clock_snapshot = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
+ msg);
+ break;
+ case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
+ clock_class =
+ bt_message_discarded_packets_borrow_stream_class_default_clock_class_const(
+ msg);
+ if (G_UNLIKELY(!clock_class)) {
+ goto error;
}
+ clock_snapshot = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
+ msg);
break;
case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
clock_snapshot =
goto error;
}
+ *has_clock_snapshot = true;
goto end;
no_clock_snapshot:
- *skip = true;
+ *has_clock_snapshot = false;
goto end;
error:
{
struct trimmer_comp *trimmer_comp = trimmer_it->trimmer_comp;
struct tm tm;
+ struct tm *res;
time_t time_seconds = (time_t) (ns_from_origin / NS_PER_S);
int ret = 0;
/* We only need to extract the date from this time */
if (is_gmt) {
- bt_gmtime_r(&time_seconds, &tm);
+ res = bt_gmtime_r(&time_seconds, &tm);
} else {
- bt_localtime_r(&time_seconds, &tm);
+ res = bt_localtime_r(&time_seconds, &tm);
}
- if (errno) {
+ if (!res) {
BT_COMP_LOGE_ERRNO("Cannot convert timestamp to date and time",
- "ts=%" PRId64, (int64_t) time_seconds);
+ ": ts=%" PRId64, (int64_t) time_seconds);
ret = -1;
goto end;
}
for (i = 0; i < count; i++) {
const bt_message *msg = msgs[i];
- bool skip = false;
+ bool has_ns_from_origin;
int ret;
ret = get_msg_ns_from_origin(msg, &ns_from_origin,
- &skip);
+ &has_ns_from_origin);
if (ret) {
goto error;
}
- if (skip) {
+ if (!has_ns_from_origin) {
continue;
}
{
bt_component_class_message_iterator_next_method_status status =
BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK;
- uint64_t raw_value;
- const bt_clock_class *clock_class;
- int ret;
+ /* Initialize to silence maybe-uninitialized warning. */
+ uint64_t raw_value = 0;
bt_message *msg = NULL;
BT_ASSERT(!trimmer_it->end.is_infinite);
+ BT_ASSERT(sstate->stream);
- if (!sstate->stream) {
- goto end;
- }
-
- if (sstate->cur_packet) {
- /*
- * The last message could not have been a stream
- * activity end message if we have a current packet.
- */
- BT_ASSERT(!sstate->last_msg_is_stream_activity_end);
+ /*
+ * If we haven't seen a message with a clock snapshot, we don't know if the trimmer's end bound is within
+ * the clock's range, so it wouldn't be safe to try to convert ns_from_origin to a clock value.
+ *
+ * Also, it would be a bit of a lie to generate a stream end message with the end bound as its
+ * clock snapshot, because we don't really know if the stream existed at that time. If we have
+ * seen a message with a clock snapshot and the stream is cut short by another message with a
+ * clock snapshot, then we are sure that the the end bound time is not below the clock range,
+ * and we know the stream was active at that time (and that we cut it short).
+ */
+ if (sstate->seen_clock_snapshot) {
+ const bt_clock_class *clock_class;
+ int ret;
- /*
- * Create and push a packet end message, making its time
- * the trimming range's end time.
- */
clock_class = bt_stream_class_borrow_default_clock_class_const(
bt_stream_borrow_class_const(sstate->stream));
BT_ASSERT(clock_class);
status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
goto end;
}
+ }
+
+ if (sstate->cur_packet) {
+ /*
+ * Create and push a packet end message, making its time
+ * the trimming range's end time.
+ *
+ * We know that we must have seen a clock snapshot, the one in
+ * the packet beginning message, since trimmer currently
+ * requires packet messages to have clock snapshots (see comment
+ * in create_stream_state_entry).
+ */
+ BT_ASSERT(sstate->seen_clock_snapshot);
msg = bt_message_packet_end_create_with_default_clock_snapshot(
trimmer_it->self_msg_iter, sstate->cur_packet,
push_message(trimmer_it, msg);
msg = NULL;
BT_PACKET_PUT_REF_AND_RESET(sstate->cur_packet);
-
- /*
- * Because we generated a packet end message, set the
- * stream activity end message's time to use to the
- * trimming range's end time (this packet end message's
- * time).
- */
- sstate->stream_act_end_ns_from_origin =
- trimmer_it->end.ns_from_origin;
}
- if (!sstate->last_msg_is_stream_activity_end) {
- /* Create and push a stream activity end message */
- msg = bt_message_stream_activity_end_create(
- trimmer_it->self_msg_iter, sstate->stream);
- if (!msg) {
- status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_MEMORY_ERROR;
- goto end;
- }
-
- clock_class = bt_stream_class_borrow_default_clock_class_const(
- bt_stream_borrow_class_const(sstate->stream));
- BT_ASSERT(clock_class);
-
- if (sstate->stream_act_end_ns_from_origin == INT64_MIN) {
- /*
- * We received at least what is necessary to
- * have a stream state (stream beginning and
- * stream activity beginning messages), but
- * nothing else: use the trimmer range's end
- * time.
- */
- sstate->stream_act_end_ns_from_origin =
- trimmer_it->end.ns_from_origin;
- }
-
- ret = clock_raw_value_from_ns_from_origin(clock_class,
- sstate->stream_act_end_ns_from_origin, &raw_value);
- if (ret) {
- status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
- goto end;
- }
-
- bt_message_stream_activity_end_set_default_clock_snapshot(
- msg, raw_value);
- push_message(trimmer_it, msg);
- msg = NULL;
- }
-
- /* Create and push a stream end message */
+ /* Create and push a stream end message. */
msg = bt_message_stream_end_create(trimmer_it->self_msg_iter,
sstate->stream);
if (!msg) {
goto end;
}
+ if (sstate->seen_clock_snapshot) {
+ bt_message_stream_end_set_default_clock_snapshot(msg, raw_value);
+ }
+
push_message(trimmer_it, msg);
msg = NULL;
return status;
}
+static
+bt_component_class_message_iterator_next_method_status
+create_stream_state_entry(
+ struct trimmer_iterator *trimmer_it,
+ const struct bt_stream *stream,
+ struct trimmer_iterator_stream_state **stream_state)
+{
+ struct trimmer_comp *trimmer_comp = trimmer_it->trimmer_comp;
+ bt_component_class_message_iterator_next_method_status status;
+ struct trimmer_iterator_stream_state *sstate;
+ const bt_stream_class *sc;
+
+ BT_ASSERT(!bt_g_hash_table_contains(trimmer_it->stream_states, stream));
+
+ /*
+ * Validate right now that the stream's class
+ * has a registered default clock class so that
+ * an existing stream state guarantees existing
+ * default clock snapshots for its associated
+ * messages.
+ *
+ * Also check that clock snapshots are always
+ * known.
+ */
+ sc = bt_stream_borrow_class_const(stream);
+ if (!bt_stream_class_borrow_default_clock_class_const(sc)) {
+ BT_COMP_LOGE("Unsupported stream: stream class does "
+ "not have a default clock class: "
+ "stream-addr=%p, "
+ "stream-id=%" PRIu64 ", "
+ "stream-name=\"%s\"",
+ stream, bt_stream_get_id(stream),
+ bt_stream_get_name(stream));
+ status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
+ goto end;
+ }
+
+ /*
+ * Temporary: make sure packet beginning, packet
+ * end, discarded events, and discarded packets
+ * messages have default clock snapshots until
+ * the support for not having them is
+ * implemented.
+ */
+ if (!bt_stream_class_packets_have_beginning_default_clock_snapshot(
+ sc)) {
+ BT_COMP_LOGE("Unsupported stream: packets have "
+ "no beginning clock snapshot: "
+ "stream-addr=%p, "
+ "stream-id=%" PRIu64 ", "
+ "stream-name=\"%s\"",
+ stream, bt_stream_get_id(stream),
+ bt_stream_get_name(stream));
+ status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
+ goto end;
+ }
+
+ if (!bt_stream_class_packets_have_end_default_clock_snapshot(
+ sc)) {
+ BT_COMP_LOGE("Unsupported stream: packets have "
+ "no end clock snapshot: "
+ "stream-addr=%p, "
+ "stream-id=%" PRIu64 ", "
+ "stream-name=\"%s\"",
+ stream, bt_stream_get_id(stream),
+ bt_stream_get_name(stream));
+ status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
+ goto end;
+ }
+
+ if (bt_stream_class_supports_discarded_events(sc) &&
+ !bt_stream_class_discarded_events_have_default_clock_snapshots(sc)) {
+ BT_COMP_LOGE("Unsupported stream: discarded events "
+ "have no clock snapshots: "
+ "stream-addr=%p, "
+ "stream-id=%" PRIu64 ", "
+ "stream-name=\"%s\"",
+ stream, bt_stream_get_id(stream),
+ bt_stream_get_name(stream));
+ status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
+ goto end;
+ }
+
+ if (bt_stream_class_supports_discarded_packets(sc) &&
+ !bt_stream_class_discarded_packets_have_default_clock_snapshots(sc)) {
+ BT_COMP_LOGE("Unsupported stream: discarded packets "
+ "have no clock snapshots: "
+ "stream-addr=%p, "
+ "stream-id=%" PRIu64 ", "
+ "stream-name=\"%s\"",
+ stream, bt_stream_get_id(stream),
+ bt_stream_get_name(stream));
+ status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
+ goto end;
+ }
+
+ sstate = g_new0(struct trimmer_iterator_stream_state, 1);
+ if (!sstate) {
+ status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_MEMORY_ERROR;
+ goto end;
+ }
+
+ sstate->stream = stream;
+
+ g_hash_table_insert(trimmer_it->stream_states, (void *) stream, sstate);
+
+ *stream_state = sstate;
+
+ status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK;
+
+end:
+ return status;
+}
+
+static
+struct trimmer_iterator_stream_state *get_stream_state_entry(
+ struct trimmer_iterator *trimmer_it,
+ const struct bt_stream *stream)
+{
+ struct trimmer_iterator_stream_state *sstate;
+
+ BT_ASSERT(stream);
+ sstate = g_hash_table_lookup(trimmer_it->stream_states, stream);
+ BT_ASSERT(sstate);
+
+ return sstate;
+}
+
/*
* Handles a message which is associated to a given stream state. This
* _could_ make the iterator's output message queue grow; this could
*
* This function consumes the `msg` reference, _whatever the outcome_.
*
- * `ns_from_origin` is the message's time, as given by
- * get_msg_ns_from_origin().
+ * If non-NULL, `ns_from_origin` is the message's time, as given by
+ * get_msg_ns_from_origin(). If NULL, the message doesn't have a time.
*
* This function sets `reached_end` if handling this message made the
* iterator reach the end of the trimming range. Note that the output
* message queue could contain messages even if this function sets
* `reached_end`.
*/
-static inline
+static
bt_component_class_message_iterator_next_method_status
-handle_message_with_stream_state(
+handle_message_with_stream(
struct trimmer_iterator *trimmer_it, const bt_message *msg,
- struct trimmer_iterator_stream_state *sstate,
- int64_t ns_from_origin, bool *reached_end)
+ const struct bt_stream *stream, const int64_t *ns_from_origin,
+ bool *reached_end)
{
bt_component_class_message_iterator_next_method_status status =
BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK;
bt_message_type msg_type = bt_message_get_type(msg);
int ret;
+ struct trimmer_iterator_stream_state *sstate = NULL;
+
+ /*
+ * Retrieve the stream's state - except if the message is stream
+ * beginning, in which case we don't know about about this stream yet.
+ */
+ if (msg_type != BT_MESSAGE_TYPE_STREAM_BEGINNING) {
+ sstate = get_stream_state_entry(trimmer_it, stream);
+ }
switch (msg_type) {
case BT_MESSAGE_TYPE_EVENT:
+ /*
+ * Event messages always have a clock snapshot if the stream
+ * class has a clock class. And we know it has, otherwise we
+ * couldn't be using the trimmer component.
+ */
+ BT_ASSERT(ns_from_origin);
+
if (G_UNLIKELY(!trimmer_it->end.is_infinite &&
- ns_from_origin > trimmer_it->end.ns_from_origin)) {
+ *ns_from_origin > trimmer_it->end.ns_from_origin)) {
status = end_iterator_streams(trimmer_it);
*reached_end = true;
break;
}
- BT_ASSERT(sstate->cur_packet);
+ sstate->seen_clock_snapshot = true;
+
push_message(trimmer_it, msg);
msg = NULL;
break;
+
case BT_MESSAGE_TYPE_PACKET_BEGINNING:
+ /*
+ * Packet beginning messages won't have a clock snapshot if
+ * stream_class->packets_have_beginning_default_clock_snapshot
+ * is false. But for now, assume they always do.
+ */
+ BT_ASSERT(ns_from_origin);
+ BT_ASSERT(!sstate->cur_packet);
+
if (G_UNLIKELY(!trimmer_it->end.is_infinite &&
- ns_from_origin > trimmer_it->end.ns_from_origin)) {
+ *ns_from_origin > trimmer_it->end.ns_from_origin)) {
status = end_iterator_streams(trimmer_it);
*reached_end = true;
break;
}
- BT_ASSERT(!sstate->cur_packet);
sstate->cur_packet =
bt_message_packet_beginning_borrow_packet_const(msg);
bt_packet_get_ref(sstate->cur_packet);
+
+ sstate->seen_clock_snapshot = true;
+
push_message(trimmer_it, msg);
msg = NULL;
break;
+
case BT_MESSAGE_TYPE_PACKET_END:
- sstate->stream_act_end_ns_from_origin = ns_from_origin;
+ /*
+ * Packet end messages won't have a clock snapshot if
+ * stream_class->packets_have_end_default_clock_snapshot
+ * is false. But for now, assume they always do.
+ */
+ BT_ASSERT(ns_from_origin);
+ BT_ASSERT(sstate->cur_packet);
if (G_UNLIKELY(!trimmer_it->end.is_infinite &&
- ns_from_origin > trimmer_it->end.ns_from_origin)) {
+ *ns_from_origin > trimmer_it->end.ns_from_origin)) {
status = end_iterator_streams(trimmer_it);
*reached_end = true;
break;
}
- BT_ASSERT(sstate->cur_packet);
BT_PACKET_PUT_REF_AND_RESET(sstate->cur_packet);
+
+ sstate->seen_clock_snapshot = true;
+
push_message(trimmer_it, msg);
msg = NULL;
break;
+
case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
{
int64_t end_ns_from_origin;
const bt_clock_snapshot *end_cs;
+ BT_ASSERT(ns_from_origin);
+
+ sstate->seen_clock_snapshot = true;
+
if (bt_message_get_type(msg) ==
BT_MESSAGE_TYPE_DISCARDED_EVENTS) {
/*
goto end;
}
- sstate->stream_act_end_ns_from_origin = end_ns_from_origin;
-
if (!trimmer_it->end.is_infinite &&
- ns_from_origin > trimmer_it->end.ns_from_origin) {
+ *ns_from_origin > trimmer_it->end.ns_from_origin) {
status = end_iterator_streams(trimmer_it);
*reached_end = true;
break;
msg = NULL;
break;
}
- case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
- if (!trimmer_it->end.is_infinite &&
- ns_from_origin > trimmer_it->end.ns_from_origin) {
- /*
- * This only happens when the message's time is
- * known and is greater than the trimming
- * range's end time. Unknown and -inf times are
- * always less than
- * `trimmer_it->end.ns_from_origin`.
- */
+
+ case BT_MESSAGE_TYPE_STREAM_BEGINNING:
+ /*
+ * If this message has a time and this time is greater than the
+ * trimmer's end bound, it triggers the end of the trim window.
+ */
+ if (G_UNLIKELY(ns_from_origin && !trimmer_it->end.is_infinite &&
+ *ns_from_origin > trimmer_it->end.ns_from_origin)) {
status = end_iterator_streams(trimmer_it);
*reached_end = true;
break;
}
- push_message(trimmer_it, msg);
- msg = NULL;
- break;
- case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
- if (trimmer_it->end.is_infinite) {
- push_message(trimmer_it, msg);
- msg = NULL;
- break;
+ /* Learn about this stream. */
+ status = create_stream_state_entry(trimmer_it, stream, &sstate);
+ if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
+ goto end;
}
- if (ns_from_origin == INT64_MIN) {
- /* Unknown: consider it to be in the trimmer window. */
- push_message(trimmer_it, msg);
- msg = NULL;
- sstate->last_msg_is_stream_activity_end = true;
- } else if (ns_from_origin == INT64_MAX) {
- /* Infinite: use trimming range's end time */
- sstate->stream_act_end_ns_from_origin =
- trimmer_it->end.ns_from_origin;
- } else {
- /* Known: check if outside of trimming range */
- if (ns_from_origin > trimmer_it->end.ns_from_origin) {
- sstate->stream_act_end_ns_from_origin =
- trimmer_it->end.ns_from_origin;
- status = end_iterator_streams(trimmer_it);
- *reached_end = true;
- break;
- }
-
- push_message(trimmer_it, msg);
- msg = NULL;
- sstate->last_msg_is_stream_activity_end = true;
- sstate->stream_act_end_ns_from_origin = ns_from_origin;
+ if (ns_from_origin) {
+ sstate->seen_clock_snapshot = true;
}
- break;
- case BT_MESSAGE_TYPE_STREAM_BEGINNING:
push_message(trimmer_it, msg);
msg = NULL;
break;
case BT_MESSAGE_TYPE_STREAM_END:
+ {
+ gboolean removed;
+
/*
- * This is the end of a stream: end this
- * stream if its stream activity end message
- * time is not the trimming range's end time
- * (which means the final stream activity end
- * message had an infinite time). end_stream()
- * will generate its own stream end message.
+ * If this message has a time and this time is greater than the
+ * trimmer's end bound, it triggers the end of the trim window.
*/
- if (trimmer_it->end.is_infinite) {
- push_message(trimmer_it, msg);
- msg = NULL;
+ if (G_UNLIKELY(ns_from_origin && !trimmer_it->end.is_infinite &&
+ *ns_from_origin > trimmer_it->end.ns_from_origin)) {
+ status = end_iterator_streams(trimmer_it);
+ *reached_end = true;
+ break;
+ }
- /* We won't need this stream state again */
- g_hash_table_remove(trimmer_it->stream_states, sstate->stream);
- } else if (sstate->stream_act_end_ns_from_origin <
- trimmer_it->end.ns_from_origin) {
- status = end_stream(trimmer_it, sstate);
- if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK) {
- goto end;
- }
+ /*
+ * Either the stream end message's time is within the trimmer's
+ * bounds, or it doesn't have a time. In both cases, pass
+ * the message unmodified.
+ */
+ push_message(trimmer_it, msg);
+ msg = NULL;
- /* We won't need this stream state again */
- g_hash_table_remove(trimmer_it->stream_states, sstate->stream);
- }
+ /* Forget about this stream. */
+ removed = g_hash_table_remove(trimmer_it->stream_states, sstate->stream);
+ BT_ASSERT(removed);
break;
+ }
default:
break;
}
end:
/* We release the message's reference whatever the outcome */
bt_message_put_ref(msg);
- return BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK;
+ return status;
}
/*
bt_component_class_message_iterator_next_method_status status;
const bt_stream *stream = NULL;
int64_t ns_from_origin = INT64_MIN;
- bool skip;
+ bool has_ns_from_origin = false;
int ret;
- struct trimmer_iterator_stream_state *sstate = NULL;
- struct trimmer_comp *trimmer_comp = trimmer_it->trimmer_comp;
/* Find message's associated stream */
switch (bt_message_get_type(msg)) {
case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
stream = bt_message_discarded_packets_borrow_stream_const(msg);
break;
- case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
- stream = bt_message_stream_activity_beginning_borrow_stream_const(msg);
- break;
- case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
- stream = bt_message_stream_activity_end_borrow_stream_const(msg);
- break;
case BT_MESSAGE_TYPE_STREAM_BEGINNING:
stream = bt_message_stream_beginning_borrow_stream_const(msg);
break;
break;
}
- if (G_LIKELY(stream)) {
- /* Find stream state */
- sstate = g_hash_table_lookup(trimmer_it->stream_states,
- stream);
- if (G_UNLIKELY(!sstate)) {
- /* No stream state yet: create one now */
- const bt_stream_class *sc;
-
- /*
- * Validate right now that the stream's class
- * has a registered default clock class so that
- * an existing stream state guarantees existing
- * default clock snapshots for its associated
- * messages.
- *
- * Also check that clock snapshots are always
- * known.
- */
- sc = bt_stream_borrow_class_const(stream);
- if (!bt_stream_class_borrow_default_clock_class_const(sc)) {
- BT_COMP_LOGE("Unsupported stream: stream class does "
- "not have a default clock class: "
- "stream-addr=%p, "
- "stream-id=%" PRIu64 ", "
- "stream-name=\"%s\"",
- stream, bt_stream_get_id(stream),
- bt_stream_get_name(stream));
- status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
- goto end;
- }
-
- /*
- * Temporary: make sure packet beginning, packet
- * end, discarded events, and discarded packets
- * messages have default clock snapshots until
- * the support for not having them is
- * implemented.
- */
- if (!bt_stream_class_packets_have_beginning_default_clock_snapshot(
- sc)) {
- BT_COMP_LOGE("Unsupported stream: packets have "
- "no beginning clock snapshot: "
- "stream-addr=%p, "
- "stream-id=%" PRIu64 ", "
- "stream-name=\"%s\"",
- stream, bt_stream_get_id(stream),
- bt_stream_get_name(stream));
- status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
- goto end;
- }
-
- if (!bt_stream_class_packets_have_end_default_clock_snapshot(
- sc)) {
- BT_COMP_LOGE("Unsupported stream: packets have "
- "no end clock snapshot: "
- "stream-addr=%p, "
- "stream-id=%" PRIu64 ", "
- "stream-name=\"%s\"",
- stream, bt_stream_get_id(stream),
- bt_stream_get_name(stream));
- status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
- goto end;
- }
-
- if (bt_stream_class_supports_discarded_events(sc) &&
- !bt_stream_class_discarded_events_have_default_clock_snapshots(sc)) {
- BT_COMP_LOGE("Unsupported stream: discarded events "
- "have no clock snapshots: "
- "stream-addr=%p, "
- "stream-id=%" PRIu64 ", "
- "stream-name=\"%s\"",
- stream, bt_stream_get_id(stream),
- bt_stream_get_name(stream));
- status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
- goto end;
- }
-
- if (bt_stream_class_supports_discarded_packets(sc) &&
- !bt_stream_class_discarded_packets_have_default_clock_snapshots(sc)) {
- BT_COMP_LOGE("Unsupported stream: discarded packets "
- "have no clock snapshots: "
- "stream-addr=%p, "
- "stream-id=%" PRIu64 ", "
- "stream-name=\"%s\"",
- stream, bt_stream_get_id(stream),
- bt_stream_get_name(stream));
- status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
- goto end;
- }
-
- sstate = g_new0(struct trimmer_iterator_stream_state,
- 1);
- if (!sstate) {
- status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_MEMORY_ERROR;
- goto end;
- }
-
- sstate->stream = stream;
- sstate->stream_act_end_ns_from_origin = INT64_MIN;
- g_hash_table_insert(trimmer_it->stream_states,
- (void *) stream, sstate);
- }
- }
-
/* Retrieve the message's time */
- ret = get_msg_ns_from_origin(msg, &ns_from_origin, &skip);
+ ret = get_msg_ns_from_origin(msg, &ns_from_origin, &has_ns_from_origin);
if (G_UNLIKELY(ret)) {
status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
goto end;
}
- if (G_LIKELY(sstate)) {
+ if (G_LIKELY(stream)) {
/* Message associated to a stream */
- status = handle_message_with_stream_state(trimmer_it, msg,
- sstate, ns_from_origin, reached_end);
+ status = handle_message_with_stream(trimmer_it, msg,
+ stream, has_ns_from_origin ? &ns_from_origin : NULL, reached_end);
/*
* handle_message_with_stream_state() unconditionally