#include <stdlib.h>
#include <string.h>
+#include "muxer.h"
+
#define ASSUME_ABSOLUTE_CLOCK_CLASSES_PARAM_NAME "assume-absolute-clock-classes"
struct muxer_comp {
- /*
- * Array of struct
- * bt_self_message_iterator *
- * (weak refs)
- */
- GPtrArray *muxer_msg_iters;
-
/* Weak ref */
bt_self_component_filter *self_comp;
enum muxer_msg_iter_clock_class_expectation {
MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY = 0,
+ MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE,
MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ABSOLUTE,
MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID,
MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_NO_UUID,
* another data structure is faster than this for our typical
* use cases.
*/
- GPtrArray *muxer_upstream_msg_iters;
+ GPtrArray *active_muxer_upstream_msg_iters;
/*
- * List of "recently" connected input ports (weak) to
- * handle by this muxer message iterator.
- * muxer_port_connected() adds entries to this list, and the
- * entries are removed when a message iterator is created
- * on the port's connection and put into
- * muxer_upstream_msg_iters above by
- * muxer_msg_iter_handle_newly_connected_ports().
+ * Array of struct muxer_upstream_msg_iter * (owned by this).
+ *
+ * We move ended message iterators from
+ * `active_muxer_upstream_msg_iters` to this array so as to be
+ * able to restore them when seeking.
*/
- GList *newly_connected_self_ports;
+ GPtrArray *ended_muxer_upstream_msg_iters;
/* Last time returned in a message */
int64_t last_returned_ts_ns;
unsigned char expected_clock_class_uuid[BABELTRACE_UUID_LEN];
};
+static
+void empty_message_queue(struct muxer_upstream_msg_iter *upstream_msg_iter)
+{
+ const bt_message *msg;
+
+ while ((msg = g_queue_pop_head(upstream_msg_iter->msgs))) {
+ bt_message_put_ref(msg);
+ }
+}
+
static
void destroy_muxer_upstream_msg_iter(
struct muxer_upstream_msg_iter *muxer_upstream_msg_iter)
muxer_upstream_msg_iter,
muxer_upstream_msg_iter->msg_iter,
muxer_upstream_msg_iter->msgs->length);
- bt_self_component_port_input_message_iterator_put_ref(muxer_upstream_msg_iter->msg_iter);
+ bt_self_component_port_input_message_iterator_put_ref(
+ muxer_upstream_msg_iter->msg_iter);
if (muxer_upstream_msg_iter->msgs) {
- const bt_message *msg;
-
- while ((msg = g_queue_pop_head(
- muxer_upstream_msg_iter->msgs))) {
- bt_message_put_ref(msg);
- }
-
+ empty_message_queue(muxer_upstream_msg_iter);
g_queue_free(muxer_upstream_msg_iter->msgs);
}
}
static
-struct muxer_upstream_msg_iter *muxer_msg_iter_add_upstream_msg_iter(
- struct muxer_msg_iter *muxer_msg_iter,
+int muxer_msg_iter_add_upstream_msg_iter(struct muxer_msg_iter *muxer_msg_iter,
bt_self_component_port_input_message_iterator *self_msg_iter)
{
+ int ret = 0;
struct muxer_upstream_msg_iter *muxer_upstream_msg_iter =
g_new0(struct muxer_upstream_msg_iter, 1);
if (!muxer_upstream_msg_iter) {
BT_LOGE_STR("Failed to allocate one muxer's upstream message iterator wrapper.");
- goto end;
+ goto error;
}
muxer_upstream_msg_iter->msg_iter = self_msg_iter;
muxer_upstream_msg_iter->msgs = g_queue_new();
if (!muxer_upstream_msg_iter->msgs) {
BT_LOGE_STR("Failed to allocate a GQueue.");
- goto end;
+ goto error;
}
- g_ptr_array_add(muxer_msg_iter->muxer_upstream_msg_iters,
+ g_ptr_array_add(muxer_msg_iter->active_muxer_upstream_msg_iters,
muxer_upstream_msg_iter);
BT_LOGD("Added muxer's upstream message iterator wrapper: "
"addr=%p, muxer-msg-iter-addr=%p, msg-iter-addr=%p",
muxer_upstream_msg_iter, muxer_msg_iter,
self_msg_iter);
+ goto end;
+
+error:
+ g_free(muxer_upstream_msg_iter);
+ ret = -1;
+
end:
- return muxer_upstream_msg_iter;
+ return ret;
}
static
-enum bt_self_component_status ensure_available_input_port(
+bt_self_component_status add_available_input_port(
bt_self_component_filter *self_comp)
{
struct muxer_comp *muxer_comp = bt_self_component_get_data(
bt_self_component_filter_as_self_component(self_comp));
- enum bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
+ bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
GString *port_name = NULL;
BT_ASSERT(muxer_comp);
-
- if (muxer_comp->available_input_ports >= 1) {
- goto end;
- }
-
port_name = g_string_new("in");
if (!port_name) {
BT_LOGE_STR("Failed to allocate a GString.");
BT_LOGD("Added one input port to muxer component: "
"port-name=\"%s\", comp-addr=%p",
port_name->str, self_comp);
+
end:
if (port_name) {
g_string_free(port_name, TRUE);
}
static
-enum bt_self_component_status create_output_port(
+bt_self_component_status create_output_port(
bt_self_component_filter *self_comp)
{
return bt_self_component_filter_add_output_port(
return;
}
- BT_LOGD("Destroying muxer component: muxer-comp-addr=%p, "
- "muxer-msg-iter-count=%u", muxer_comp,
- muxer_comp->muxer_msg_iters ?
- muxer_comp->muxer_msg_iters->len : 0);
-
- if (muxer_comp->muxer_msg_iters) {
- g_ptr_array_free(muxer_comp->muxer_msg_iters, TRUE);
- }
-
g_free(muxer_comp);
}
}
BT_HIDDEN
-enum bt_self_component_status muxer_init(
+bt_self_component_status muxer_init(
bt_self_component_filter *self_comp,
- bt_value *params, void *init_data)
+ const bt_value *params, void *init_data)
{
int ret;
- enum bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
+ bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
struct muxer_comp *muxer_comp = g_new0(struct muxer_comp, 1);
BT_LOGD("Initializing muxer component: "
goto error;
}
- muxer_comp->muxer_msg_iters = g_ptr_array_new();
- if (!muxer_comp->muxer_msg_iters) {
- BT_LOGE_STR("Failed to allocate a GPtrArray.");
- goto error;
- }
-
muxer_comp->self_comp = self_comp;
bt_self_component_set_data(
bt_self_component_filter_as_self_component(self_comp),
muxer_comp);
- status = ensure_available_input_port(self_comp);
+ status = add_available_input_port(self_comp);
if (status != BT_SELF_COMPONENT_STATUS_OK) {
BT_LOGE("Cannot ensure that at least one muxer component's input port is available: "
"muxer-comp-addr=%p, status=%s",
static
bt_self_component_port_input_message_iterator *
-create_msg_iter_on_input_port(
- bt_self_component_port_input *self_port, int *ret)
+create_msg_iter_on_input_port(bt_self_component_port_input *self_port)
{
const bt_port *port = bt_self_component_port_as_port(
bt_self_component_port_input_as_self_component_port(
bt_self_component_port_input_message_iterator *msg_iter =
NULL;
- BT_ASSERT(ret);
- *ret = 0;
BT_ASSERT(port);
BT_ASSERT(bt_port_is_connected(port));
BT_LOGE("Cannot create upstream message iterator on input port: "
"port-addr=%p, port-name=\"%s\"",
port, bt_port_get_name(port));
- *ret = -1;
goto end;
}
}
static
-enum bt_message_iterator_status muxer_upstream_msg_iter_next(
- struct muxer_upstream_msg_iter *muxer_upstream_msg_iter)
+bt_self_message_iterator_status muxer_upstream_msg_iter_next(
+ struct muxer_upstream_msg_iter *muxer_upstream_msg_iter,
+ bool *is_ended)
{
- enum bt_message_iterator_status status;
+ bt_self_message_iterator_status status;
+ bt_message_iterator_status input_port_iter_status;
bt_message_array_const msgs;
uint64_t i;
uint64_t count;
"muxer-upstream-msg-iter-wrap-addr=%p, msg-iter-addr=%p",
muxer_upstream_msg_iter,
muxer_upstream_msg_iter->msg_iter);
- status = bt_self_component_port_input_message_iterator_next(
+ input_port_iter_status = bt_self_component_port_input_message_iterator_next(
muxer_upstream_msg_iter->msg_iter, &msgs, &count);
BT_LOGV("Upstream message iterator's \"next\" method returned: "
- "status=%s", bt_message_iterator_status_string(status));
+ "status=%s", bt_message_iterator_status_string(input_port_iter_status));
- switch (status) {
+ switch (input_port_iter_status) {
case BT_MESSAGE_ITERATOR_STATUS_OK:
/*
* Message iterator's current message is
g_queue_push_tail(muxer_upstream_msg_iter->msgs,
(void *) msgs[i]);
}
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
break;
case BT_MESSAGE_ITERATOR_STATUS_AGAIN:
/*
* valid anymore. Return
* BT_MESSAGE_ITERATOR_STATUS_AGAIN immediately.
*/
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_AGAIN;
break;
case BT_MESSAGE_ITERATOR_STATUS_END: /* Fall-through. */
- case BT_MESSAGE_ITERATOR_STATUS_CANCELED:
/*
* Message iterator reached the end: release it. It
* won't be considered again to find the youngest
* message.
*/
- BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(muxer_upstream_msg_iter->msg_iter);
- status = BT_MESSAGE_ITERATOR_STATUS_OK;
+ *is_ended = true;
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
break;
default:
/* Error or unsupported status code */
BT_LOGE("Error or unsupported status code: "
- "status-code=%d", status);
- status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
+ "status-code=%d", input_port_iter_status);
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
break;
}
return status;
}
-static
-int muxer_msg_iter_handle_newly_connected_ports(
- struct muxer_msg_iter *muxer_msg_iter)
-{
- int ret = 0;
-
- BT_LOGV("Handling newly connected ports: "
- "muxer-msg-iter-addr=%p", muxer_msg_iter);
-
- /*
- * Here we create one upstream message iterator for each
- * newly connected port. We do NOT perform an initial "next" on
- * those new upstream message iterators: they are
- * invalidated, to be validated later. The list of newly
- * connected ports to handle here is updated by
- * muxer_port_connected().
- */
- while (true) {
- GList *node = muxer_msg_iter->newly_connected_self_ports;
- bt_self_component_port_input *self_port;
- const bt_port *port;
- bt_self_component_port_input_message_iterator *
- upstream_msg_iter = NULL;
- struct muxer_upstream_msg_iter *muxer_upstream_msg_iter;
-
- if (!node) {
- break;
- }
-
- self_port = node->data;
- port = bt_self_component_port_as_port(
- bt_self_component_port_input_as_self_component_port(
- (self_port)));
- BT_ASSERT(port);
-
- if (!bt_port_is_connected(port)) {
- /*
- * Looks like this port is not connected
- * anymore: we can't create an upstream
- * message iterator on its (non-existing)
- * connection in this case.
- */
- goto remove_node;
- }
-
- upstream_msg_iter = create_msg_iter_on_input_port(
- self_port, &ret);
- if (ret) {
- /* create_msg_iter_on_input_port() logs errors */
- BT_ASSERT(!upstream_msg_iter);
- goto error;
- }
-
- muxer_upstream_msg_iter =
- muxer_msg_iter_add_upstream_msg_iter(
- muxer_msg_iter, upstream_msg_iter);
- BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(upstream_msg_iter);
- if (!muxer_upstream_msg_iter) {
- /*
- * muxer_msg_iter_add_upstream_msg_iter()
- * logs errors.
- */
- goto error;
- }
-
-remove_node:
- bt_self_component_port_input_message_iterator_put_ref(upstream_msg_iter);
- muxer_msg_iter->newly_connected_self_ports =
- g_list_delete_link(
- muxer_msg_iter->newly_connected_self_ports,
- node);
- }
-
- goto end;
-
-error:
- if (ret >= 0) {
- ret = -1;
- }
-
-end:
- return ret;
-}
-
static
int get_msg_ts_ns(struct muxer_comp *muxer_comp,
struct muxer_msg_iter *muxer_msg_iter,
const bt_message *msg, int64_t last_returned_ts_ns,
int64_t *ts_ns)
{
- const bt_clock_class *clock_class = NULL;
const bt_clock_snapshot *clock_snapshot = NULL;
- const bt_event *event = NULL;
int ret = 0;
- const unsigned char *cc_uuid;
- const char *cc_name;
- enum bt_clock_snapshot_state cs_state = BT_CLOCK_SNAPSHOT_STATE_KNOWN;
+ bt_message_stream_activity_clock_snapshot_state sa_cs_state;
+ const bt_stream_class *stream_class = NULL;
+ bt_message_type msg_type;
BT_ASSERT(msg);
BT_ASSERT(ts_ns);
-
BT_LOGV("Getting message's timestamp: "
"muxer-msg-iter-addr=%p, msg-addr=%p, "
"last-returned-ts=%" PRId64,
muxer_msg_iter, msg, last_returned_ts_ns);
- switch (bt_message_get_type(msg)) {
+ if (unlikely(muxer_msg_iter->clock_class_expectation ==
+ MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE)) {
+ *ts_ns = last_returned_ts_ns;
+ goto end;
+ }
+
+ msg_type = bt_message_get_type(msg);
+
+ if (unlikely(msg_type == BT_MESSAGE_TYPE_PACKET_BEGINNING)) {
+ stream_class = bt_stream_borrow_class_const(
+ bt_packet_borrow_stream_const(
+ bt_message_packet_beginning_borrow_packet_const(
+ msg)));
+ } else if (unlikely(msg_type == BT_MESSAGE_TYPE_PACKET_END)) {
+ stream_class = bt_stream_borrow_class_const(
+ bt_packet_borrow_stream_const(
+ bt_message_packet_end_borrow_packet_const(
+ msg)));
+ } else if (unlikely(msg_type == BT_MESSAGE_TYPE_DISCARDED_EVENTS)) {
+ stream_class = bt_stream_borrow_class_const(
+ bt_message_discarded_events_borrow_stream_const(msg));
+ } else if (unlikely(msg_type == BT_MESSAGE_TYPE_DISCARDED_PACKETS)) {
+ stream_class = bt_stream_borrow_class_const(
+ bt_message_discarded_packets_borrow_stream_const(msg));
+ }
+
+ switch (msg_type) {
case BT_MESSAGE_TYPE_EVENT:
- event = bt_message_event_borrow_event_const(msg);
- BT_ASSERT(event);
- cs_state = bt_event_borrow_default_clock_snapshot_const(event,
- &clock_snapshot);
+ BT_ASSERT(bt_message_event_borrow_stream_class_default_clock_class_const(
+ msg));
+ clock_snapshot = bt_message_event_borrow_default_clock_snapshot_const(
+ msg);
break;
+ case BT_MESSAGE_TYPE_PACKET_BEGINNING:
+ if (bt_stream_class_packets_have_beginning_default_clock_snapshot(
+ stream_class)) {
+ clock_snapshot = bt_message_packet_beginning_borrow_default_clock_snapshot_const(
+ msg);
+ } else {
+ goto no_clock_snapshot;
+ }
- case BT_MESSAGE_TYPE_INACTIVITY:
- cs_state =
- bt_message_inactivity_borrow_default_clock_snapshot_const(
- msg, &clock_snapshot);
+ break;
+ case BT_MESSAGE_TYPE_PACKET_END:
+ if (bt_stream_class_packets_have_end_default_clock_snapshot(
+ stream_class)) {
+ clock_snapshot = bt_message_packet_end_borrow_default_clock_snapshot_const(
+ msg);
+ } else {
+ goto no_clock_snapshot;
+ }
+
+ break;
+ case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
+ if (bt_stream_class_discarded_events_have_default_clock_snapshots(
+ stream_class)) {
+ clock_snapshot = bt_message_discarded_events_borrow_beginning_default_clock_snapshot_const(
+ msg);
+ } else {
+ goto no_clock_snapshot;
+ }
+
+ break;
+ case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
+ if (bt_stream_class_discarded_packets_have_default_clock_snapshots(
+ stream_class)) {
+ clock_snapshot = bt_message_discarded_packets_borrow_beginning_default_clock_snapshot_const(
+ msg);
+ } else {
+ goto no_clock_snapshot;
+ }
+
+ break;
+ case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
+ BT_ASSERT(bt_message_stream_activity_beginning_borrow_stream_class_default_clock_class_const(
+ msg));
+ sa_cs_state = bt_message_stream_activity_beginning_borrow_default_clock_snapshot_const(
+ msg, &clock_snapshot);
+ if (sa_cs_state != BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN) {
+ goto no_clock_snapshot;
+ }
+
+ break;
+ case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
+ BT_ASSERT(bt_message_stream_activity_end_borrow_stream_class_default_clock_class_const(
+ msg));
+ sa_cs_state = bt_message_stream_activity_end_borrow_default_clock_snapshot_const(
+ msg, &clock_snapshot);
+ if (sa_cs_state != BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN) {
+ goto no_clock_snapshot;
+ }
+
+ break;
+ case BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY:
+ clock_snapshot = bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const(
+ msg);
break;
default:
/* All the other messages have a higher priority */
goto end;
}
- if (cs_state != BT_CLOCK_SNAPSHOT_STATE_KNOWN) {
- BT_LOGE_STR("Unsupported unknown clock snapshot.");
- ret = -1;
- goto end;
+ ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
+ if (ret) {
+ BT_LOGE("Cannot get nanoseconds from Epoch of clock snapshot: "
+ "clock-snapshot-addr=%p", clock_snapshot);
+ goto error;
}
- /*
- * If the clock snapshot is missing, then we consider that this
- * message has no time. In this case it's always the
- * youngest.
- */
- if (!clock_snapshot) {
- BT_LOGV_STR("Message's default clock snapshot is missing: "
- "using the last returned timestamp.");
- *ts_ns = last_returned_ts_ns;
- goto end;
+ goto end;
+
+no_clock_snapshot:
+ BT_LOGV_STR("Message's default clock snapshot is missing: "
+ "using the last returned timestamp.");
+ *ts_ns = last_returned_ts_ns;
+ goto end;
+
+error:
+ ret = -1;
+
+end:
+ if (ret == 0) {
+ BT_LOGV("Found message's timestamp: "
+ "muxer-msg-iter-addr=%p, msg-addr=%p, "
+ "last-returned-ts=%" PRId64 ", ts=%" PRId64,
+ muxer_msg_iter, msg, last_returned_ts_ns,
+ *ts_ns);
}
- clock_class = bt_clock_snapshot_borrow_clock_class_const(clock_snapshot);
+ return ret;
+}
+
+static inline
+int validate_clock_class(struct muxer_msg_iter *muxer_msg_iter,
+ struct muxer_comp *muxer_comp,
+ const bt_clock_class *clock_class)
+{
+ int ret = 0;
+ const unsigned char *cc_uuid;
+ const char *cc_name;
+
BT_ASSERT(clock_class);
cc_uuid = bt_clock_class_get_uuid(clock_class);
cc_name = bt_clock_class_get_name(clock_class);
* the iterator without a true
* `assume-absolute-clock-classes` parameter.
*/
- if (bt_clock_class_is_absolute(clock_class)) {
+ if (bt_clock_class_origin_is_unix_epoch(clock_class)) {
/* Expect absolute clock classes */
muxer_msg_iter->clock_class_expectation =
MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ABSOLUTE;
if (!muxer_comp->assume_absolute_clock_classes) {
switch (muxer_msg_iter->clock_class_expectation) {
case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ABSOLUTE:
- if (!bt_clock_class_is_absolute(clock_class)) {
+ if (!bt_clock_class_origin_is_unix_epoch(clock_class)) {
BT_LOGE("Expecting an absolute clock class, "
"but got a non-absolute one: "
"clock-class-addr=%p, clock-class-name=\"%s\"",
}
break;
case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_NO_UUID:
- if (bt_clock_class_is_absolute(clock_class)) {
+ if (bt_clock_class_origin_is_unix_epoch(clock_class)) {
BT_LOGE("Expecting a non-absolute clock class with no UUID, "
"but got an absolute one: "
"clock-class-addr=%p, clock-class-name=\"%s\"",
}
break;
case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID:
- if (bt_clock_class_is_absolute(clock_class)) {
+ if (bt_clock_class_origin_is_unix_epoch(clock_class)) {
BT_LOGE("Expecting a non-absolute clock class with a specific UUID, "
"but got an absolute one: "
"clock-class-addr=%p, clock-class-name=\"%s\"",
goto error;
}
break;
+ case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE:
+ BT_LOGE("Expecting no clock class, but got one: "
+ "clock-class-addr=%p, clock-class-name=\"%s\"",
+ clock_class, cc_name);
+ goto error;
default:
/* Unexpected */
BT_LOGF("Unexpected clock class expectation: "
}
}
- ret = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, ts_ns);
- if (ret) {
- BT_LOGE("Cannot get nanoseconds from Epoch of clock snapshot: "
- "clock-snapshot-addr=%p", clock_snapshot);
- goto error;
- }
-
goto end;
error:
ret = -1;
end:
- if (ret == 0) {
- BT_LOGV("Found message's timestamp: "
- "muxer-msg-iter-addr=%p, msg-addr=%p, "
- "last-returned-ts=%" PRId64 ", ts=%" PRId64,
- muxer_msg_iter, msg, last_returned_ts_ns,
- *ts_ns);
+ return ret;
+}
+
+static inline
+int validate_new_stream_clock_class(struct muxer_msg_iter *muxer_msg_iter,
+ struct muxer_comp *muxer_comp, const bt_stream *stream)
+{
+ int ret = 0;
+ const bt_stream_class *stream_class =
+ bt_stream_borrow_class_const(stream);
+ const bt_clock_class *clock_class =
+ bt_stream_class_borrow_default_clock_class_const(stream_class);
+
+ if (!clock_class) {
+ if (muxer_msg_iter->clock_class_expectation ==
+ MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY) {
+ /* Expect no clock class */
+ muxer_msg_iter->clock_class_expectation =
+ MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NONE;
+ } else {
+ BT_LOGE("Expecting stream class with a default clock class: "
+ "stream-class-addr=%p, stream-class-name=\"%s\", "
+ "stream-class-id=%" PRIu64,
+ stream_class, bt_stream_class_get_name(stream_class),
+ bt_stream_class_get_id(stream_class));
+ ret = -1;
+ }
+
+ goto end;
}
+ ret = validate_clock_class(muxer_msg_iter, muxer_comp, clock_class);
+
+end:
return ret;
}
* This function does NOT:
*
* * Update any upstream message iterator.
- * * Check for newly connected ports.
* * Check the upstream message iterators to retry.
*
* On sucess, this function sets *muxer_upstream_msg_iter to the
* the youngest, and sets *ts_ns to its time.
*/
static
-enum bt_message_iterator_status
+bt_self_message_iterator_status
muxer_msg_iter_youngest_upstream_msg_iter(
struct muxer_comp *muxer_comp,
struct muxer_msg_iter *muxer_msg_iter,
size_t i;
int ret;
int64_t youngest_ts_ns = INT64_MAX;
- enum bt_message_iterator_status status =
- BT_MESSAGE_ITERATOR_STATUS_OK;
+ bt_self_message_iterator_status status =
+ BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
BT_ASSERT(muxer_comp);
BT_ASSERT(muxer_msg_iter);
BT_ASSERT(muxer_upstream_msg_iter);
*muxer_upstream_msg_iter = NULL;
- for (i = 0; i < muxer_msg_iter->muxer_upstream_msg_iters->len; i++) {
+ for (i = 0; i < muxer_msg_iter->active_muxer_upstream_msg_iters->len;
+ i++) {
const bt_message *msg;
struct muxer_upstream_msg_iter *cur_muxer_upstream_msg_iter =
- g_ptr_array_index(muxer_msg_iter->muxer_upstream_msg_iters, i);
+ g_ptr_array_index(
+ muxer_msg_iter->active_muxer_upstream_msg_iters,
+ i);
int64_t msg_ts_ns;
if (!cur_muxer_upstream_msg_iter->msg_iter) {
BT_ASSERT(cur_muxer_upstream_msg_iter->msgs->length > 0);
msg = g_queue_peek_head(cur_muxer_upstream_msg_iter->msgs);
BT_ASSERT(msg);
+
+ if (unlikely(bt_message_get_type(msg) ==
+ BT_MESSAGE_TYPE_STREAM_BEGINNING)) {
+ ret = validate_new_stream_clock_class(
+ muxer_msg_iter, muxer_comp,
+ bt_message_stream_beginning_borrow_stream_const(
+ msg));
+ if (ret) {
+ /*
+ * validate_new_stream_clock_class() logs
+ * errors.
+ */
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
+ goto end;
+ }
+ } else if (unlikely(bt_message_get_type(msg) ==
+ BT_MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY)) {
+ const bt_clock_snapshot *cs;
+
+ cs = bt_message_message_iterator_inactivity_borrow_default_clock_snapshot_const(
+ msg);
+ ret = validate_clock_class(muxer_msg_iter, muxer_comp,
+ bt_clock_snapshot_borrow_clock_class_const(cs));
+ if (ret) {
+ /* validate_clock_class() logs errors */
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
+ goto end;
+ }
+ }
+
ret = get_msg_ts_ns(muxer_comp, muxer_msg_iter, msg,
muxer_msg_iter->last_returned_ts_ns, &msg_ts_ns);
if (ret) {
/* get_msg_ts_ns() logs errors */
*muxer_upstream_msg_iter = NULL;
- status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
goto end;
}
}
if (!*muxer_upstream_msg_iter) {
- status = BT_MESSAGE_ITERATOR_STATUS_END;
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_END;
*ts_ns = INT64_MIN;
}
}
static
-enum bt_message_iterator_status validate_muxer_upstream_msg_iter(
- struct muxer_upstream_msg_iter *muxer_upstream_msg_iter)
+bt_self_message_iterator_status validate_muxer_upstream_msg_iter(
+ struct muxer_upstream_msg_iter *muxer_upstream_msg_iter,
+ bool *is_ended)
{
- enum bt_message_iterator_status status =
- BT_MESSAGE_ITERATOR_STATUS_OK;
+ bt_self_message_iterator_status status =
+ BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
BT_LOGV("Validating muxer's upstream message iterator wrapper: "
"muxer-upstream-msg-iter-wrap-addr=%p",
}
/* muxer_upstream_msg_iter_next() logs details/errors */
- status = muxer_upstream_msg_iter_next(muxer_upstream_msg_iter);
+ status = muxer_upstream_msg_iter_next(muxer_upstream_msg_iter,
+ is_ended);
end:
return status;
}
static
-enum bt_message_iterator_status validate_muxer_upstream_msg_iters(
+bt_self_message_iterator_status validate_muxer_upstream_msg_iters(
struct muxer_msg_iter *muxer_msg_iter)
{
- enum bt_message_iterator_status status =
- BT_MESSAGE_ITERATOR_STATUS_OK;
+ bt_self_message_iterator_status status =
+ BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
size_t i;
BT_LOGV("Validating muxer's upstream message iterator wrappers: "
"muxer-msg-iter-addr=%p", muxer_msg_iter);
- for (i = 0; i < muxer_msg_iter->muxer_upstream_msg_iters->len; i++) {
+ for (i = 0; i < muxer_msg_iter->active_muxer_upstream_msg_iters->len;
+ i++) {
+ bool is_ended = false;
struct muxer_upstream_msg_iter *muxer_upstream_msg_iter =
g_ptr_array_index(
- muxer_msg_iter->muxer_upstream_msg_iters,
+ muxer_msg_iter->active_muxer_upstream_msg_iters,
i);
status = validate_muxer_upstream_msg_iter(
- muxer_upstream_msg_iter);
- if (status != BT_MESSAGE_ITERATOR_STATUS_OK) {
+ muxer_upstream_msg_iter, &is_ended);
+ if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
if (status < 0) {
BT_LOGE("Cannot validate muxer's upstream message iterator wrapper: "
"muxer-msg-iter-addr=%p, "
}
/*
- * Remove this muxer upstream message iterator
- * if it's ended or canceled.
+ * Move this muxer upstream message iterator to the
+ * array of ended iterators if it's ended.
*/
- if (!muxer_upstream_msg_iter->msg_iter) {
+ if (unlikely(is_ended)) {
+ BT_LOGV("Muxer's upstream message iterator wrapper: ended or canceled: "
+ "muxer-msg-iter-addr=%p, "
+ "muxer-upstream-msg-iter-wrap-addr=%p",
+ muxer_msg_iter, muxer_upstream_msg_iter);
+ g_ptr_array_add(
+ muxer_msg_iter->ended_muxer_upstream_msg_iters,
+ muxer_upstream_msg_iter);
+ muxer_msg_iter->active_muxer_upstream_msg_iters->pdata[i] = NULL;
+
/*
* Use g_ptr_array_remove_fast() because the
* order of those elements is not important.
*/
- BT_LOGV("Removing muxer's upstream message iterator wrapper: ended or canceled: "
- "muxer-msg-iter-addr=%p, "
- "muxer-upstream-msg-iter-wrap-addr=%p",
- muxer_msg_iter, muxer_upstream_msg_iter);
g_ptr_array_remove_index_fast(
- muxer_msg_iter->muxer_upstream_msg_iters,
+ muxer_msg_iter->active_muxer_upstream_msg_iters,
i);
i--;
}
}
static inline
-enum bt_message_iterator_status muxer_msg_iter_do_next_one(
+bt_self_message_iterator_status muxer_msg_iter_do_next_one(
struct muxer_comp *muxer_comp,
struct muxer_msg_iter *muxer_msg_iter,
const bt_message **msg)
{
- enum bt_message_iterator_status status =
- BT_MESSAGE_ITERATOR_STATUS_OK;
+ bt_self_message_iterator_status status;
struct muxer_upstream_msg_iter *muxer_upstream_msg_iter = NULL;
int64_t next_return_ts;
- while (true) {
- int ret = muxer_msg_iter_handle_newly_connected_ports(
- muxer_msg_iter);
-
- if (ret) {
- BT_LOGE("Cannot handle newly connected input ports for muxer's message iterator: "
- "muxer-comp-addr=%p, muxer-msg-iter-addr=%p, "
- "ret=%d",
- muxer_comp, muxer_msg_iter, ret);
- status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
- goto end;
- }
-
- status = validate_muxer_upstream_msg_iters(muxer_msg_iter);
- if (status != BT_MESSAGE_ITERATOR_STATUS_OK) {
- /* validate_muxer_upstream_msg_iters() logs details */
- goto end;
- }
-
- /*
- * At this point, we know that all the existing upstream
- * message iterators are valid. However the
- * operations to validate them (during
- * validate_muxer_upstream_msg_iters()) may have
- * connected new ports. If no ports were connected
- * during this operation, exit the loop.
- */
- if (!muxer_msg_iter->newly_connected_self_ports) {
- BT_LOGV("Not breaking this loop: muxer's message iterator still has newly connected input ports to handle: "
- "muxer-comp-addr=%p", muxer_comp);
- break;
- }
+ status = validate_muxer_upstream_msg_iters(muxer_msg_iter);
+ if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
+ /* validate_muxer_upstream_msg_iters() logs details */
+ goto end;
}
- BT_ASSERT(!muxer_msg_iter->newly_connected_self_ports);
-
/*
* At this point we know that all the existing upstream
* message iterators are valid. We can find the one,
status = muxer_msg_iter_youngest_upstream_msg_iter(muxer_comp,
muxer_msg_iter, &muxer_upstream_msg_iter,
&next_return_ts);
- if (status < 0 || status == BT_MESSAGE_ITERATOR_STATUS_END ||
- status == BT_MESSAGE_ITERATOR_STATUS_CANCELED) {
+ if (status < 0 || status == BT_SELF_MESSAGE_ITERATOR_STATUS_END) {
if (status < 0) {
BT_LOGE("Cannot find the youngest upstream message iterator wrapper: "
"status=%s",
- bt_message_iterator_status_string(status));
+ bt_common_self_message_iterator_status_string(status));
} else {
BT_LOGV("Cannot find the youngest upstream message iterator wrapper: "
"status=%s",
- bt_message_iterator_status_string(status));
+ bt_common_self_message_iterator_status_string(status));
}
goto end;
"last-returned-ts=%" PRId64,
muxer_msg_iter, next_return_ts,
muxer_msg_iter->last_returned_ts_ns);
- status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
goto end;
}
"muxer-upstream-msg-iter-wrap-addr=%p, "
"ts=%" PRId64,
muxer_msg_iter, muxer_upstream_msg_iter, next_return_ts);
- BT_ASSERT(status == BT_MESSAGE_ITERATOR_STATUS_OK);
+ BT_ASSERT(status == BT_SELF_MESSAGE_ITERATOR_STATUS_OK);
BT_ASSERT(muxer_upstream_msg_iter);
/*
}
static
-enum bt_message_iterator_status muxer_msg_iter_do_next(
+bt_self_message_iterator_status muxer_msg_iter_do_next(
struct muxer_comp *muxer_comp,
struct muxer_msg_iter *muxer_msg_iter,
bt_message_array_const msgs, uint64_t capacity,
uint64_t *count)
{
- enum bt_message_iterator_status status =
- BT_MESSAGE_ITERATOR_STATUS_OK;
+ bt_self_message_iterator_status status =
+ BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
uint64_t i = 0;
- while (i < capacity && status == BT_MESSAGE_ITERATOR_STATUS_OK) {
+ while (i < capacity && status == BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
status = muxer_msg_iter_do_next_one(muxer_comp,
muxer_msg_iter, &msgs[i]);
- if (status == BT_MESSAGE_ITERATOR_STATUS_OK) {
+ if (status == BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
i++;
}
}
* message, in which case we'll return it.
*/
*count = i;
- status = BT_MESSAGE_ITERATOR_STATUS_OK;
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
}
return status;
BT_LOGD("Destroying muxer component's message iterator: "
"muxer-msg-iter-addr=%p", muxer_msg_iter);
- if (muxer_msg_iter->muxer_upstream_msg_iters) {
- BT_LOGD_STR("Destroying muxer's upstream message iterator wrappers.");
+ if (muxer_msg_iter->active_muxer_upstream_msg_iters) {
+ BT_LOGD_STR("Destroying muxer's active upstream message iterator wrappers.");
g_ptr_array_free(
- muxer_msg_iter->muxer_upstream_msg_iters, TRUE);
+ muxer_msg_iter->active_muxer_upstream_msg_iters, TRUE);
+ }
+
+ if (muxer_msg_iter->ended_muxer_upstream_msg_iters) {
+ BT_LOGD_STR("Destroying muxer's ended upstream message iterator wrappers.");
+ g_ptr_array_free(
+ muxer_msg_iter->ended_muxer_upstream_msg_iters, TRUE);
}
- g_list_free(muxer_msg_iter->newly_connected_self_ports);
g_free(muxer_msg_iter);
}
static
-int muxer_msg_iter_init_newly_connected_ports(struct muxer_comp *muxer_comp,
+int muxer_msg_iter_init_upstream_iterators(struct muxer_comp *muxer_comp,
struct muxer_msg_iter *muxer_msg_iter)
{
int64_t count;
int64_t i;
int ret = 0;
- /*
- * Add the connected input ports to this muxer message
- * iterator's list of newly connected ports. They will be
- * handled by muxer_msg_iter_handle_newly_connected_ports().
- */
count = bt_component_filter_get_input_port_count(
bt_self_component_filter_as_component_filter(
muxer_comp->self_comp));
}
for (i = 0; i < count; i++) {
+ bt_self_component_port_input_message_iterator *upstream_msg_iter;
bt_self_component_port_input *self_port =
bt_self_component_filter_borrow_input_port_by_index(
muxer_comp->self_comp, i);
BT_ASSERT(port);
if (!bt_port_is_connected(port)) {
- BT_LOGD("Skipping input port: not connected: "
- "muxer-comp-addr=%p, port-addr=%p, port-name\"%s\"",
- muxer_comp, port, bt_port_get_name(port));
+ /* Skip non-connected port */
continue;
}
- muxer_msg_iter->newly_connected_self_ports =
- g_list_append(
- muxer_msg_iter->newly_connected_self_ports,
- self_port);
- if (!muxer_msg_iter->newly_connected_self_ports) {
- BT_LOGE("Cannot append port to muxer's message iterator list of newly connected input ports: "
- "port-addr=%p, port-name=\"%s\", "
- "muxer-msg-iter-addr=%p", port,
- bt_port_get_name(port), muxer_msg_iter);
+ upstream_msg_iter = create_msg_iter_on_input_port(self_port);
+ if (!upstream_msg_iter) {
+ /* create_msg_iter_on_input_port() logs errors */
+ BT_ASSERT(!upstream_msg_iter);
ret = -1;
goto end;
}
- BT_LOGD("Appended port to muxer's message iterator list of newly connected input ports: "
- "port-addr=%p, port-name=\"%s\", "
- "muxer-msg-iter-addr=%p", port,
- bt_port_get_name(port), muxer_msg_iter);
+ ret = muxer_msg_iter_add_upstream_msg_iter(muxer_msg_iter,
+ upstream_msg_iter);
+ bt_self_component_port_input_message_iterator_put_ref(
+ upstream_msg_iter);
+ if (ret) {
+ /* muxer_msg_iter_add_upstream_msg_iter() logs errors */
+ goto end;
+ }
}
end:
}
BT_HIDDEN
-enum bt_self_message_iterator_status muxer_msg_iter_init(
+bt_self_message_iterator_status muxer_msg_iter_init(
bt_self_message_iterator *self_msg_iter,
bt_self_component_filter *self_comp,
bt_self_component_port_output *port)
{
struct muxer_comp *muxer_comp = NULL;
struct muxer_msg_iter *muxer_msg_iter = NULL;
- enum bt_self_message_iterator_status status =
- BT_MESSAGE_ITERATOR_STATUS_OK;
+ bt_self_message_iterator_status status =
+ BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
int ret;
muxer_comp = bt_self_component_get_data(
}
muxer_msg_iter->last_returned_ts_ns = INT64_MIN;
- muxer_msg_iter->muxer_upstream_msg_iters =
+ muxer_msg_iter->active_muxer_upstream_msg_iters =
g_ptr_array_new_with_free_func(
(GDestroyNotify) destroy_muxer_upstream_msg_iter);
- if (!muxer_msg_iter->muxer_upstream_msg_iters) {
+ if (!muxer_msg_iter->active_muxer_upstream_msg_iters) {
BT_LOGE_STR("Failed to allocate a GPtrArray.");
goto error;
}
- /*
- * Add the muxer message iterator to the component's array
- * of muxer message iterators here because
- * muxer_msg_iter_init_newly_connected_ports() can cause
- * muxer_port_connected() to be called, which adds the newly
- * connected port to each muxer message iterator's list of
- * newly connected ports.
- */
- g_ptr_array_add(muxer_comp->muxer_msg_iters, muxer_msg_iter);
- ret = muxer_msg_iter_init_newly_connected_ports(muxer_comp,
+ muxer_msg_iter->ended_muxer_upstream_msg_iters =
+ g_ptr_array_new_with_free_func(
+ (GDestroyNotify) destroy_muxer_upstream_msg_iter);
+ if (!muxer_msg_iter->ended_muxer_upstream_msg_iters) {
+ BT_LOGE_STR("Failed to allocate a GPtrArray.");
+ goto error;
+ }
+
+ ret = muxer_msg_iter_init_upstream_iterators(muxer_comp,
muxer_msg_iter);
if (ret) {
- BT_LOGE("Cannot initialize newly connected input ports for muxer component's message iterator: "
+ BT_LOGE("Cannot initialize connected input ports for muxer component's message iterator: "
"comp-addr=%p, muxer-comp-addr=%p, "
"muxer-msg-iter-addr=%p, msg-iter-addr=%p, ret=%d",
self_comp, muxer_comp, muxer_msg_iter,
goto error;
}
- bt_self_message_iterator_set_data(self_msg_iter,
- muxer_msg_iter);
+ bt_self_message_iterator_set_data(self_msg_iter, muxer_msg_iter);
BT_LOGD("Initialized muxer component's message iterator: "
"comp-addr=%p, muxer-comp-addr=%p, muxer-msg-iter-addr=%p, "
"msg-iter-addr=%p",
goto end;
error:
- if (g_ptr_array_index(muxer_comp->muxer_msg_iters,
- muxer_comp->muxer_msg_iters->len - 1) == muxer_msg_iter) {
- g_ptr_array_remove_index(muxer_comp->muxer_msg_iters,
- muxer_comp->muxer_msg_iters->len - 1);
- }
-
destroy_muxer_msg_iter(muxer_msg_iter);
- bt_self_message_iterator_set_data(self_msg_iter,
- NULL);
- status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
+ bt_self_message_iterator_set_data(self_msg_iter, NULL);
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
end:
muxer_comp->initializing_muxer_msg_iter = false;
}
BT_HIDDEN
-void muxer_msg_iter_finalize(
- bt_self_message_iterator *self_msg_iter)
+void muxer_msg_iter_finalize(bt_self_message_iterator *self_msg_iter)
{
struct muxer_msg_iter *muxer_msg_iter =
bt_self_message_iterator_get_data(self_msg_iter);
"msg-iter-addr=%p",
self_comp, muxer_comp, muxer_msg_iter, self_msg_iter);
- if (muxer_comp) {
- (void) g_ptr_array_remove_fast(muxer_comp->muxer_msg_iters,
- muxer_msg_iter);
+ if (muxer_msg_iter) {
destroy_muxer_msg_iter(muxer_msg_iter);
}
}
BT_HIDDEN
-enum bt_message_iterator_status muxer_msg_iter_next(
+bt_self_message_iterator_status muxer_msg_iter_next(
bt_self_message_iterator *self_msg_iter,
bt_message_array_const msgs, uint64_t capacity,
uint64_t *count)
{
- enum bt_message_iterator_status status;
+ bt_self_message_iterator_status status;
struct muxer_msg_iter *muxer_msg_iter =
bt_self_message_iterator_get_data(self_msg_iter);
bt_self_component *self_comp = NULL;
"comp-addr=%p, muxer-comp-addr=%p, muxer-msg-iter-addr=%p, "
"msg-iter-addr=%p, status=%s",
self_comp, muxer_comp, muxer_msg_iter, self_msg_iter,
- bt_message_iterator_status_string(status));
+ bt_common_self_message_iterator_status_string(status));
} else {
BT_LOGV("Returning from muxer component's message iterator's \"next\" method: "
"status=%s",
- bt_message_iterator_status_string(status));
+ bt_common_self_message_iterator_status_string(status));
}
return status;
}
BT_HIDDEN
-enum bt_self_component_status muxer_input_port_connected(
+bt_self_component_status muxer_input_port_connected(
bt_self_component_filter *self_comp,
bt_self_component_port_input *self_port,
const bt_port_output *other_port)
{
- enum bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
- const bt_port *port = bt_self_component_port_as_port(
- bt_self_component_port_input_as_self_component_port(
- self_port));
- struct muxer_comp *muxer_comp =
- bt_self_component_get_data(
- bt_self_component_filter_as_self_component(
- self_comp));
- size_t i;
- int ret;
-
- BT_ASSERT(port);
- BT_ASSERT(muxer_comp);
- BT_LOGD("Port connected: "
- "comp-addr=%p, muxer-comp-addr=%p, "
- "port-addr=%p, port-name=\"%s\", "
- "other-port-addr=%p, other-port-name=\"%s\"",
- self_comp, muxer_comp, self_port, bt_port_get_name(port),
- other_port,
- bt_port_get_name(bt_port_output_as_port_const(other_port)));
-
- for (i = 0; i < muxer_comp->muxer_msg_iters->len; i++) {
- struct muxer_msg_iter *muxer_msg_iter =
- g_ptr_array_index(muxer_comp->muxer_msg_iters, i);
+ bt_self_component_status status;
+ status = add_available_input_port(self_comp);
+ if (status) {
/*
- * Add this port to the list of newly connected ports
- * for this muxer message iterator. We append at
- * the end of this list while
- * muxer_msg_iter_handle_newly_connected_ports()
- * removes the nodes from the beginning.
+ * Only way to report an error later since this
+ * method does not return anything.
*/
- muxer_msg_iter->newly_connected_self_ports =
- g_list_append(
- muxer_msg_iter->newly_connected_self_ports,
- self_port);
- if (!muxer_msg_iter->newly_connected_self_ports) {
- BT_LOGE("Cannot append port to muxer's message iterator list of newly connected input ports: "
- "port-addr=%p, port-name=\"%s\", "
- "muxer-msg-iter-addr=%p", self_port,
- bt_port_get_name(port), muxer_msg_iter);
- status = BT_SELF_COMPONENT_STATUS_ERROR;
+ BT_LOGE("Cannot add one muxer component's input port: "
+ "status=%s",
+ bt_self_component_status_string(status));
+ goto end;
+ }
+
+end:
+ return status;
+}
+
+static inline
+bt_bool muxer_upstream_msg_iters_can_all_seek_beginning(
+ GPtrArray *muxer_upstream_msg_iters)
+{
+ uint64_t i;
+ bt_bool ret = BT_TRUE;
+
+ for (i = 0; i < muxer_upstream_msg_iters->len; i++) {
+ struct muxer_upstream_msg_iter *upstream_msg_iter =
+ muxer_upstream_msg_iters->pdata[i];
+
+ if (!bt_self_component_port_input_message_iterator_can_seek_beginning(
+ upstream_msg_iter->msg_iter)) {
+ ret = BT_FALSE;
goto end;
}
+ }
+
+end:
+ return ret;
+}
+
+BT_HIDDEN
+bt_bool muxer_msg_iter_can_seek_beginning(
+ bt_self_message_iterator *self_msg_iter)
+{
+ struct muxer_msg_iter *muxer_msg_iter =
+ bt_self_message_iterator_get_data(self_msg_iter);
+ bt_bool ret = BT_TRUE;
- BT_LOGD("Appended port to muxer's message iterator list of newly connected input ports: "
- "port-addr=%p, port-name=\"%s\", "
- "muxer-msg-iter-addr=%p", self_port,
- bt_port_get_name(port), muxer_msg_iter);
+ if (!muxer_upstream_msg_iters_can_all_seek_beginning(
+ muxer_msg_iter->active_muxer_upstream_msg_iters)) {
+ ret = BT_FALSE;
+ goto end;
}
- /* One less available input port */
- muxer_comp->available_input_ports--;
- ret = ensure_available_input_port(self_comp);
- if (ret) {
- /*
- * Only way to report an error later since this
- * method does not return anything.
- */
- BT_LOGE("Cannot ensure that at least one muxer component's input port is available: "
- "muxer-comp-addr=%p, status=%s",
- muxer_comp, bt_self_component_status_string(ret));
- status = BT_SELF_COMPONENT_STATUS_ERROR;
+ if (!muxer_upstream_msg_iters_can_all_seek_beginning(
+ muxer_msg_iter->ended_muxer_upstream_msg_iters)) {
+ ret = BT_FALSE;
goto end;
}
end:
- return status;
+ return ret;
}
BT_HIDDEN
-void muxer_input_port_disconnected(
- bt_self_component_filter *self_component,
- bt_self_component_port_input *self_port)
+bt_self_message_iterator_status muxer_msg_iter_seek_beginning(
+ bt_self_message_iterator *self_msg_iter)
{
- struct muxer_comp *muxer_comp =
- bt_self_component_get_data(
- bt_self_component_filter_as_self_component(
- self_component));
- const bt_port *port =
- bt_self_component_port_as_port(
- bt_self_component_port_input_as_self_component_port(
- self_port));
+ struct muxer_msg_iter *muxer_msg_iter =
+ bt_self_message_iterator_get_data(self_msg_iter);
+ bt_message_iterator_status status = BT_MESSAGE_ITERATOR_STATUS_OK;
+ uint64_t i;
- BT_ASSERT(port);
- BT_ASSERT(muxer_comp);
+ /* Seek all ended upstream iterators first */
+ for (i = 0; i < muxer_msg_iter->ended_muxer_upstream_msg_iters->len;
+ i++) {
+ struct muxer_upstream_msg_iter *upstream_msg_iter =
+ muxer_msg_iter->ended_muxer_upstream_msg_iters->pdata[i];
- /* One more available input port */
- muxer_comp->available_input_ports++;
- BT_LOGD("Leaving disconnected input port available for future connections: "
- "comp-addr=%p, muxer-comp-addr=%p, port-addr=%p, "
- "port-name=\"%s\", avail-input-port-count=%zu",
- self_component, muxer_comp, port, bt_port_get_name(port),
- muxer_comp->available_input_ports);
+ status = bt_self_component_port_input_message_iterator_seek_beginning(
+ upstream_msg_iter->msg_iter);
+ if (status != BT_MESSAGE_ITERATOR_STATUS_OK) {
+ goto end;
+ }
+
+ empty_message_queue(upstream_msg_iter);
+ }
+
+ /* Seek all previously active upstream iterators */
+ for (i = 0; i < muxer_msg_iter->active_muxer_upstream_msg_iters->len;
+ i++) {
+ struct muxer_upstream_msg_iter *upstream_msg_iter =
+ muxer_msg_iter->active_muxer_upstream_msg_iters->pdata[i];
+
+ status = bt_self_component_port_input_message_iterator_seek_beginning(
+ upstream_msg_iter->msg_iter);
+ if (status != BT_MESSAGE_ITERATOR_STATUS_OK) {
+ goto end;
+ }
+
+ empty_message_queue(upstream_msg_iter);
+ }
+
+ /* Make them all active */
+ for (i = 0; i < muxer_msg_iter->ended_muxer_upstream_msg_iters->len;
+ i++) {
+ struct muxer_upstream_msg_iter *upstream_msg_iter =
+ muxer_msg_iter->ended_muxer_upstream_msg_iters->pdata[i];
+
+ g_ptr_array_add(muxer_msg_iter->active_muxer_upstream_msg_iters,
+ upstream_msg_iter);
+ muxer_msg_iter->ended_muxer_upstream_msg_iters->pdata[i] = NULL;
+ }
+
+ g_ptr_array_remove_range(muxer_msg_iter->ended_muxer_upstream_msg_iters,
+ 0, muxer_msg_iter->ended_muxer_upstream_msg_iters->len);
+ muxer_msg_iter->last_returned_ts_ns = INT64_MIN;
+ muxer_msg_iter->clock_class_expectation =
+ MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ANY;
+
+end:
+ return (bt_self_message_iterator_status) status;
}