enum muxer_msg_iter_clock_class_expectation {
MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY = 0,
+ MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE,
MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ABSOLUTE,
MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID,
MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_NO_UUID,
}
static
-struct muxer_upstream_msg_iter *muxer_msg_iter_add_upstream_msg_iter(
- struct muxer_msg_iter *muxer_msg_iter,
+int muxer_msg_iter_add_upstream_msg_iter(struct muxer_msg_iter *muxer_msg_iter,
bt_self_component_port_input_message_iterator *self_msg_iter)
{
+ int ret = 0;
struct muxer_upstream_msg_iter *muxer_upstream_msg_iter =
g_new0(struct muxer_upstream_msg_iter, 1);
if (!muxer_upstream_msg_iter) {
BT_LOGE_STR("Failed to allocate one muxer's upstream message iterator wrapper.");
- goto end;
+ goto error;
}
muxer_upstream_msg_iter->msg_iter = self_msg_iter;
muxer_upstream_msg_iter->msgs = g_queue_new();
if (!muxer_upstream_msg_iter->msgs) {
BT_LOGE_STR("Failed to allocate a GQueue.");
- goto end;
+ goto error;
}
g_ptr_array_add(muxer_msg_iter->active_muxer_upstream_msg_iters,
muxer_upstream_msg_iter, muxer_msg_iter,
self_msg_iter);
+ goto end;
+
+error:
+ g_free(muxer_upstream_msg_iter);
+ ret = -1;
+
end:
- return muxer_upstream_msg_iter;
+ return ret;
}
static
const bt_message *msg, int64_t last_returned_ts_ns,
int64_t *ts_ns)
{
- const bt_clock_class *clock_class = NULL;
const bt_clock_snapshot *clock_snapshot = NULL;
int ret = 0;
- const unsigned char *cc_uuid;
- const char *cc_name;
- bt_clock_snapshot_state cs_state = BT_CLOCK_SNAPSHOT_STATE_KNOWN;
bt_message_stream_activity_clock_snapshot_state sa_cs_state;
+ const bt_stream_class *stream_class = NULL;
+ bt_message_type msg_type;
BT_ASSERT(msg);
BT_ASSERT(ts_ns);
-
BT_LOGV("Getting message's timestamp: "
"muxer-msg-iter-addr=%p, msg-addr=%p, "
"last-returned-ts=%" PRId64,
muxer_msg_iter, msg, last_returned_ts_ns);
- switch (bt_message_get_type(msg)) {
- case BT_MESSAGE_TYPE_EVENT:
- clock_class =
- bt_message_event_borrow_stream_class_default_clock_class_const(
- msg);
- if (!clock_class) {
- goto no_clock_snapshot;
- }
+ if (unlikely(muxer_msg_iter->clock_class_expectation ==
+ MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE)) {
+ *ts_ns = last_returned_ts_ns;
+ goto end;
+ }
- cs_state = bt_message_event_borrow_default_clock_snapshot_const(
- msg, &clock_snapshot);
+ msg_type = bt_message_get_type(msg);
+
+ if (unlikely(msg_type == BT_MESSAGE_TYPE_PACKET_BEGINNING)) {
+ stream_class = bt_stream_borrow_class_const(
+ bt_packet_borrow_stream_const(
+ bt_message_packet_beginning_borrow_packet_const(
+ msg)));
+ } else if (unlikely(msg_type == BT_MESSAGE_TYPE_PACKET_END)) {
+ stream_class = bt_stream_borrow_class_const(
+ bt_packet_borrow_stream_const(
+ bt_message_packet_end_borrow_packet_const(
+ msg)));
+ } else if (unlikely(msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS)) {
+ stream_class = bt_stream_borrow_class_const(
+ bt_message_discarded_events_borrow_stream_const(msg));
+ } else if (unlikely(msg_type == BT_MESSAGE_TYPE_DISCARDED_PACKETS)) {
+ stream_class = bt_stream_borrow_class_const(
+ bt_message_discarded_packets_borrow_stream_const(msg));
+ }
+
+ switch (msg_type) {
+ case BT_MESSAGE_TYPE_EVENT:
+ BT_ASSERT(bt_message_event_borrow_stream_class_default_clock_class_const(
+ msg));
+ clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const(
+ msg);
break;
case BT_MESSAGE_TYPE_PACKET_BEGINNING:
- bt_message_packet_beginning_borrow_stream_class_default_clock_class_const(
- msg);
- if (!clock_class) {
+ if (bt_stream_class_packets_have_beginning_default_clock_snapshot(
+ stream_class)) {
+ clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const(
+ msg);
+ } else {
goto no_clock_snapshot;
}
- cs_state = bt_message_packet_beginning_borrow_default_clock_snapshot_const(
- msg, &clock_snapshot);
break;
case BT_MESSAGE_TYPE_PACKET_END:
- bt_message_packet_end_borrow_stream_class_default_clock_class_const(
- msg);
- if (!clock_class) {
+ if (bt_stream_class_packets_have_end_default_clock_snapshot(
+ stream_class)) {
+ clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(
+ msg);
+ } else {
goto no_clock_snapshot;
}
- cs_state = bt_message_packet_end_borrow_default_clock_snapshot_const(
- msg, &clock_snapshot);
break;
case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
- bt_message_discarded_events_borrow_stream_class_default_clock_class_const(
- msg);
- if (!clock_class) {
+ if (bt_stream_class_discarded_events_have_default_clock_snapshots(
+ stream_class)) {
+ clock_snapshot = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
+ msg);
+ } else {
goto no_clock_snapshot;
}
- cs_state = bt_message_discarded_events_borrow_default_beginning_clock_snapshot_const(
- msg, &clock_snapshot);
break;
case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
- bt_message_discarded_packets_borrow_stream_class_default_clock_class_const(
- msg);
- if (!clock_class) {
+ if (bt_stream_class_discarded_packets_have_default_clock_snapshots(
+ stream_class)) {
+ clock_snapshot = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
+ msg);
+ } else {
goto no_clock_snapshot;
}
- cs_state = bt_message_discarded_packets_borrow_default_beginning_clock_snapshot_const(
- msg, &clock_snapshot);
break;
case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
- bt_message_stream_activity_beginning_borrow_stream_class_default_clock_class_const(
- msg);
- if (!clock_class) {
- goto no_clock_snapshot;
- }
-
+ BT_ASSERT(bt_message_stream_activity_beginning_borrow_stream_class_default_clock_class_const(
+ msg));
sa_cs_state = bt_message_stream_activity_beginning_borrow_default_clock_snapshot_const(
msg, &clock_snapshot);
if (sa_cs_state != BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN) {
break;
case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
- bt_message_stream_activity_end_borrow_stream_class_default_clock_class_const(
- msg);
- if (!clock_class) {
- goto no_clock_snapshot;
- }
-
+ BT_ASSERT(bt_message_stream_activity_end_borrow_stream_class_default_clock_class_const(
+ msg));
sa_cs_state = bt_message_stream_activity_end_borrow_default_clock_snapshot_const(
msg, &clock_snapshot);
if (sa_cs_state != BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN) {
break;
case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
- cs_state =
- bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const(
- msg, &clock_snapshot);
+ clock_snapshot = bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const(
+ msg);
break;
default:
/* All the other messages have a higher priority */
goto end;
}
- if (cs_state != BT_CLOCK_SNAPSHOT_STATE_KNOWN) {
- BT_LOGE_STR("Unsupported unknown clock snapshot.");
- ret = -1;
- goto end;
+ ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
+ if (ret) {
+ BT_LOGE("Cannot get nanoseconds from Epoch of clock snapshot: "
+ "clock-snapshot-addr=%p", clock_snapshot);
+ goto error;
}
- clock_class = bt_clock_snapshot_borrow_clock_class_const(clock_snapshot);
+ goto end;
+
+no_clock_snapshot:
+ BT_LOGV_STR("Message's default clock snapshot is missing: "
+ "using the last returned timestamp.");
+ *ts_ns = last_returned_ts_ns;
+ goto end;
+
+error:
+ ret = -1;
+
+end:
+ if (ret == 0) {
+ BT_LOGV("Found message's timestamp: "
+ "muxer-msg-iter-addr=%p, msg-addr=%p, "
+ "last-returned-ts=%" PRId64 ", ts=%" PRId64,
+ muxer_msg_iter, msg, last_returned_ts_ns,
+ *ts_ns);
+ }
+
+ return ret;
+}
+
+static inline
+int validate_clock_class(struct muxer_msg_iter *muxer_msg_iter,
+ struct muxer_comp *muxer_comp,
+ const bt_clock_class *clock_class)
+{
+ int ret = 0;
+ const unsigned char *cc_uuid;
+ const char *cc_name;
+
BT_ASSERT(clock_class);
cc_uuid = bt_clock_class_get_uuid(clock_class);
cc_name = bt_clock_class_get_name(clock_class);
goto error;
}
break;
+ case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE:
+ BT_LOGE("Expecting no clock class, but got one: "
+ "clock-class-addr=%p, clock-class-name=\"%s\"",
+ clock_class, cc_name);
+ goto error;
default:
/* Unexpected */
BT_LOGF("Unexpected clock class expectation: "
}
}
- ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
- if (ret) {
- BT_LOGE("Cannot get nanoseconds from Epoch of clock snapshot: "
- "clock-snapshot-addr=%p", clock_snapshot);
- goto error;
- }
-
- goto end;
-
-no_clock_snapshot:
- BT_LOGV_STR("Message's default clock snapshot is missing: "
- "using the last returned timestamp.");
- *ts_ns = last_returned_ts_ns;
goto end;
error:
ret = -1;
end:
- if (ret == 0) {
- BT_LOGV("Found message's timestamp: "
- "muxer-msg-iter-addr=%p, msg-addr=%p, "
- "last-returned-ts=%" PRId64 ", ts=%" PRId64,
- muxer_msg_iter, msg, last_returned_ts_ns,
- *ts_ns);
+ return ret;
+}
+
+static inline
+int validate_new_stream_clock_class(struct muxer_msg_iter *muxer_msg_iter,
+ struct muxer_comp *muxer_comp, const bt_stream *stream)
+{
+ int ret = 0;
+ const bt_stream_class *stream_class =
+ bt_stream_borrow_class_const(stream);
+ const bt_clock_class *clock_class =
+ bt_stream_class_borrow_default_clock_class_const(stream_class);
+
+ if (!clock_class) {
+ if (muxer_msg_iter->clock_class_expectation ==
+ MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY) {
+ /* Expect no clock class */
+ muxer_msg_iter->clock_class_expectation =
+ MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE;
+ } else {
+ BT_LOGE("Expecting stream class with a default clock class: "
+ "stream-class-addr=%p, stream-class-name=\"%s\", "
+ "stream-class-id=%" PRIu64,
+ stream_class, bt_stream_class_get_name(stream_class),
+ bt_stream_class_get_id(stream_class));
+ ret = -1;
+ }
+
+ goto end;
}
+ ret = validate_clock_class(muxer_msg_iter, muxer_comp, clock_class);
+
+end:
return ret;
}
BT_ASSERT(cur_muxer_upstream_msg_iter->msgs->length > 0);
msg = g_queue_peek_head(cur_muxer_upstream_msg_iter->msgs);
BT_ASSERT(msg);
+
+ if (unlikely(bt_message_get_type(msg) ==
+ BT_MESSAGE_TYPE_STREAM_BEGINNING)) {
+ ret = validate_new_stream_clock_class(
+ muxer_msg_iter, muxer_comp,
+ bt_message_stream_beginning_borrow_stream_const(
+ msg));
+ if (ret) {
+ /*
+ * validate_new_stream_clock_class() logs
+ * errors.
+ */
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
+ goto end;
+ }
+ } else if (unlikely(bt_message_get_type(msg) ==
+ BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY)) {
+ const bt_clock_snapshot *cs;
+
+ cs = bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const(
+ msg);
+ ret = validate_clock_class(muxer_msg_iter, muxer_comp,
+ bt_clock_snapshot_borrow_clock_class_const(cs));
+ if (ret) {
+ /* validate_clock_class() logs errors */
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
+ goto end;
+ }
+ }
+
ret = get_msg_ts_ns(muxer_comp, muxer_msg_iter, msg,
muxer_msg_iter->last_returned_ts_ns, &msg_ts_ns);
if (ret) {
for (i = 0; i < count; i++) {
bt_self_component_port_input_message_iterator *upstream_msg_iter;
- struct muxer_upstream_msg_iter *muxer_upstream_msg_iter;
bt_self_component_port_input *self_port =
bt_self_component_filter_borrow_input_port_by_index(
muxer_comp->self_comp, i);
goto end;
}
- muxer_upstream_msg_iter =
- muxer_msg_iter_add_upstream_msg_iter(
- muxer_msg_iter, upstream_msg_iter);
+ ret = muxer_msg_iter_add_upstream_msg_iter(muxer_msg_iter,
+ upstream_msg_iter);
bt_self_component_port_input_message_iterator_put_ref(
upstream_msg_iter);
- if (!muxer_upstream_msg_iter) {
+ if (ret) {
/* muxer_msg_iter_add_upstream_msg_iter() logs errors */
- ret = -1;
goto end;
}
}
{
struct muxer_msg_iter *muxer_msg_iter =
bt_self_message_iterator_get_data(self_msg_iter);
- int status;
+ bt_message_iterator_status status = BT_MESSAGE_ITERATOR_STATUS_OK;
uint64_t i;
/* Seek all ended upstream iterators first */
MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY;
end:
- return status;
+ return (bt_self_message_iterator_status) status;
}