#include <stdlib.h>
#include <string.h>
+#include "muxer.h"
+
#define ASSUME_ABSOLUTE_CLOCK_CLASSES_PARAM_NAME "assume-absolute-clock-classes"
struct muxer_comp {
}
static
-enum bt_self_component_status ensure_available_input_port(
+bt_self_component_status ensure_available_input_port(
bt_self_component_filter *self_comp)
{
struct muxer_comp *muxer_comp = bt_self_component_get_data(
bt_self_component_filter_as_self_component(self_comp));
- enum bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
+ bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
GString *port_name = NULL;
BT_ASSERT(muxer_comp);
}
static
-enum bt_self_component_status create_output_port(
+bt_self_component_status create_output_port(
bt_self_component_filter *self_comp)
{
return bt_self_component_filter_add_output_port(
}
BT_HIDDEN
-enum bt_self_component_status muxer_init(
+bt_self_component_status muxer_init(
bt_self_component_filter *self_comp,
- bt_value *params, void *init_data)
+ const bt_value *params, void *init_data)
+
+
{
int ret;
- enum bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
+ bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
struct muxer_comp *muxer_comp = g_new0(struct muxer_comp, 1);
BT_LOGD("Initializing muxer component: "
}
static
-enum bt_message_iterator_status muxer_upstream_msg_iter_next(
+bt_self_message_iterator_status muxer_upstream_msg_iter_next(
struct muxer_upstream_msg_iter *muxer_upstream_msg_iter)
{
- enum bt_message_iterator_status status;
+ bt_self_message_iterator_status status;
+ bt_message_iterator_status input_port_iter_status;
bt_message_array_const msgs;
uint64_t i;
uint64_t count;
"muxer-upstream-msg-iter-wrap-addr=%p, msg-iter-addr=%p",
muxer_upstream_msg_iter,
muxer_upstream_msg_iter->msg_iter);
- status = bt_self_component_port_input_message_iterator_next(
+ input_port_iter_status = bt_self_component_port_input_message_iterator_next(
muxer_upstream_msg_iter->msg_iter, &msgs, &count);
BT_LOGV("Upstream message iterator's \"next\" method returned: "
- "status=%s", bt_message_iterator_status_string(status));
+ "status=%s", bt_message_iterator_status_string(input_port_iter_status));
- switch (status) {
+ switch (input_port_iter_status) {
case BT_MESSAGE_ITERATOR_STATUS_OK:
/*
* Message iterator's current message is
g_queue_push_tail(muxer_upstream_msg_iter->msgs,
(void *) msgs[i]);
}
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
break;
case BT_MESSAGE_ITERATOR_STATUS_AGAIN:
/*
* valid anymore. Return
* BT_MESSAGE_ITERATOR_STATUS_AGAIN immediately.
*/
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_AGAIN;
break;
case BT_MESSAGE_ITERATOR_STATUS_END: /* Fall-through. */
- case BT_MESSAGE_ITERATOR_STATUS_CANCELED:
/*
* Message iterator reached the end: release it. It
* won't be considered again to find the youngest
* message.
*/
BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(muxer_upstream_msg_iter->msg_iter);
- status = BT_MESSAGE_ITERATOR_STATUS_OK;
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
break;
default:
/* Error or unsupported status code */
BT_LOGE("Error or unsupported status code: "
- "status-code=%d", status);
- status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
+ "status-code=%d", input_port_iter_status);
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
break;
}
{
const bt_clock_class *clock_class = NULL;
const bt_clock_snapshot *clock_snapshot = NULL;
- const bt_event *event = NULL;
int ret = 0;
const unsigned char *cc_uuid;
const char *cc_name;
- enum bt_clock_snapshot_status cv_status = BT_CLOCK_SNAPSHOT_STATUS_KNOWN;
+ bt_clock_snapshot_state cs_state = BT_CLOCK_SNAPSHOT_STATE_KNOWN;
+ bt_message_stream_activity_clock_snapshot_state sa_cs_state;
BT_ASSERT(msg);
BT_ASSERT(ts_ns);
switch (bt_message_get_type(msg)) {
case BT_MESSAGE_TYPE_EVENT:
- event = bt_message_event_borrow_event_const(msg);
- BT_ASSERT(event);
- cv_status = bt_event_borrow_default_clock_snapshot_const(event,
- &clock_snapshot);
+ clock_class =
+ bt_message_event_borrow_stream_class_default_clock_class_const(
+ msg);
+ if (!clock_class) {
+ goto no_clock_snapshot;
+ }
+
+ cs_state = bt_message_event_borrow_default_clock_snapshot_const(
+ msg, &clock_snapshot);
+ break;
+ case BT_MESSAGE_TYPE_PACKET_BEGINNING:
+ bt_message_packet_beginning_borrow_stream_class_default_clock_class_const(
+ msg);
+ if (!clock_class) {
+ goto no_clock_snapshot;
+ }
+
+ cs_state = bt_message_packet_beginning_borrow_default_clock_snapshot_const(
+ msg, &clock_snapshot);
+ break;
+ case BT_MESSAGE_TYPE_PACKET_END:
+ bt_message_packet_end_borrow_stream_class_default_clock_class_const(
+ msg);
+ if (!clock_class) {
+ goto no_clock_snapshot;
+ }
+
+ cs_state = bt_message_packet_end_borrow_default_clock_snapshot_const(
+ msg, &clock_snapshot);
+ break;
+ case BT_MESSAGE_TYPE_DISCARDED_EVENTS:
+ bt_message_discarded_events_borrow_stream_class_default_clock_class_const(
+ msg);
+ if (!clock_class) {
+ goto no_clock_snapshot;
+ }
+
+ cs_state = bt_message_discarded_events_borrow_default_beginning_clock_snapshot_const(
+ msg, &clock_snapshot);
break;
+ case BT_MESSAGE_TYPE_DISCARDED_PACKETS:
+ bt_message_discarded_packets_borrow_stream_class_default_clock_class_const(
+ msg);
+ if (!clock_class) {
+ goto no_clock_snapshot;
+ }
+ cs_state = bt_message_discarded_packets_borrow_default_beginning_clock_snapshot_const(
+ msg, &clock_snapshot);
+ break;
+ case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING:
+ bt_message_stream_activity_beginning_borrow_stream_class_default_clock_class_const(
+ msg);
+ if (!clock_class) {
+ goto no_clock_snapshot;
+ }
+
+ sa_cs_state = bt_message_stream_activity_beginning_borrow_default_clock_snapshot_const(
+ msg, &clock_snapshot);
+ if (sa_cs_state != BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN) {
+ goto no_clock_snapshot;
+ }
+
+ break;
+ case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END:
+ bt_message_stream_activity_end_borrow_stream_class_default_clock_class_const(
+ msg);
+ if (!clock_class) {
+ goto no_clock_snapshot;
+ }
+
+ sa_cs_state = bt_message_stream_activity_end_borrow_default_clock_snapshot_const(
+ msg, &clock_snapshot);
+ if (sa_cs_state != BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN) {
+ goto no_clock_snapshot;
+ }
+
+ break;
case BT_MESSAGE_TYPE_INACTIVITY:
- clock_snapshot =
+ cs_state =
bt_message_inactivity_borrow_default_clock_snapshot_const(
- msg);
+ msg, &clock_snapshot);
break;
default:
/* All the other messages have a higher priority */
goto end;
}
- if (cv_status != BT_CLOCK_SNAPSHOT_STATUS_KNOWN) {
+ if (cs_state != BT_CLOCK_SNAPSHOT_STATE_KNOWN) {
BT_LOGE_STR("Unsupported unknown clock snapshot.");
ret = -1;
goto end;
}
- /*
- * If the clock snapshot is missing, then we consider that this
- * message has no time. In this case it's always the
- * youngest.
- */
- if (!clock_snapshot) {
- BT_LOGV_STR("Message's default clock snapshot is missing: "
- "using the last returned timestamp.");
- *ts_ns = last_returned_ts_ns;
- goto end;
- }
-
clock_class = bt_clock_snapshot_borrow_clock_class_const(clock_snapshot);
BT_ASSERT(clock_class);
cc_uuid = bt_clock_class_get_uuid(clock_class);
* the iterator without a true
* `assume-absolute-clock-classes` parameter.
*/
- if (bt_clock_class_is_absolute(clock_class)) {
+ if (bt_clock_class_origin_is_unix_epoch(clock_class)) {
/* Expect absolute clock classes */
muxer_msg_iter->clock_class_expectation =
MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ABSOLUTE;
if (!muxer_comp->assume_absolute_clock_classes) {
switch (muxer_msg_iter->clock_class_expectation) {
case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_ABSOLUTE:
- if (!bt_clock_class_is_absolute(clock_class)) {
+ if (!bt_clock_class_origin_is_unix_epoch(clock_class)) {
BT_LOGE("Expecting an absolute clock class, "
"but got a non-absolute one: "
"clock-class-addr=%p, clock-class-name=\"%s\"",
}
break;
case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_NO_UUID:
- if (bt_clock_class_is_absolute(clock_class)) {
+ if (bt_clock_class_origin_is_unix_epoch(clock_class)) {
BT_LOGE("Expecting a non-absolute clock class with no UUID, "
"but got an absolute one: "
"clock-class-addr=%p, clock-class-name=\"%s\"",
}
break;
case MUXER_MSG_ITER_CLOCK_CLASS_EXPECTATION_NOT_ABS_SPEC_UUID:
- if (bt_clock_class_is_absolute(clock_class)) {
+ if (bt_clock_class_origin_is_unix_epoch(clock_class)) {
BT_LOGE("Expecting a non-absolute clock class with a specific UUID, "
"but got an absolute one: "
"clock-class-addr=%p, clock-class-name=\"%s\"",
goto end;
+no_clock_snapshot:
+ BT_LOGV_STR("Message's default clock snapshot is missing: "
+ "using the last returned timestamp.");
+ *ts_ns = last_returned_ts_ns;
+ goto end;
+
error:
ret = -1;
* the youngest, and sets *ts_ns to its time.
*/
static
-enum bt_message_iterator_status
+bt_self_message_iterator_status
muxer_msg_iter_youngest_upstream_msg_iter(
struct muxer_comp *muxer_comp,
struct muxer_msg_iter *muxer_msg_iter,
size_t i;
int ret;
int64_t youngest_ts_ns = INT64_MAX;
- enum bt_message_iterator_status status =
- BT_MESSAGE_ITERATOR_STATUS_OK;
+ bt_self_message_iterator_status status =
+ BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
BT_ASSERT(muxer_comp);
BT_ASSERT(muxer_msg_iter);
if (ret) {
/* get_msg_ts_ns() logs errors */
*muxer_upstream_msg_iter = NULL;
- status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
goto end;
}
}
if (!*muxer_upstream_msg_iter) {
- status = BT_MESSAGE_ITERATOR_STATUS_END;
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_END;
*ts_ns = INT64_MIN;
}
}
static
-enum bt_message_iterator_status validate_muxer_upstream_msg_iter(
+bt_self_message_iterator_status validate_muxer_upstream_msg_iter(
struct muxer_upstream_msg_iter *muxer_upstream_msg_iter)
{
- enum bt_message_iterator_status status =
- BT_MESSAGE_ITERATOR_STATUS_OK;
+ bt_self_message_iterator_status status =
+ BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
BT_LOGV("Validating muxer's upstream message iterator wrapper: "
"muxer-upstream-msg-iter-wrap-addr=%p",
}
static
-enum bt_message_iterator_status validate_muxer_upstream_msg_iters(
+bt_self_message_iterator_status validate_muxer_upstream_msg_iters(
struct muxer_msg_iter *muxer_msg_iter)
{
- enum bt_message_iterator_status status =
- BT_MESSAGE_ITERATOR_STATUS_OK;
+ bt_self_message_iterator_status status =
+ BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
size_t i;
BT_LOGV("Validating muxer's upstream message iterator wrappers: "
status = validate_muxer_upstream_msg_iter(
muxer_upstream_msg_iter);
- if (status != BT_MESSAGE_ITERATOR_STATUS_OK) {
+ if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
if (status < 0) {
BT_LOGE("Cannot validate muxer's upstream message iterator wrapper: "
"muxer-msg-iter-addr=%p, "
}
static inline
-enum bt_message_iterator_status muxer_msg_iter_do_next_one(
+bt_self_message_iterator_status muxer_msg_iter_do_next_one(
struct muxer_comp *muxer_comp,
struct muxer_msg_iter *muxer_msg_iter,
const bt_message **msg)
{
- enum bt_message_iterator_status status =
- BT_MESSAGE_ITERATOR_STATUS_OK;
+ bt_self_message_iterator_status status =
+ BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
struct muxer_upstream_msg_iter *muxer_upstream_msg_iter = NULL;
int64_t next_return_ts;
"muxer-comp-addr=%p, muxer-msg-iter-addr=%p, "
"ret=%d",
muxer_comp, muxer_msg_iter, ret);
- status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
goto end;
}
status = validate_muxer_upstream_msg_iters(muxer_msg_iter);
- if (status != BT_MESSAGE_ITERATOR_STATUS_OK) {
+ if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
/* validate_muxer_upstream_msg_iters() logs details */
goto end;
}
status = muxer_msg_iter_youngest_upstream_msg_iter(muxer_comp,
muxer_msg_iter, &muxer_upstream_msg_iter,
&next_return_ts);
- if (status < 0 || status == BT_MESSAGE_ITERATOR_STATUS_END ||
- status == BT_MESSAGE_ITERATOR_STATUS_CANCELED) {
+ if (status < 0 || status == BT_SELF_MESSAGE_ITERATOR_STATUS_END) {
if (status < 0) {
BT_LOGE("Cannot find the youngest upstream message iterator wrapper: "
"status=%s",
- bt_message_iterator_status_string(status));
+ bt_self_message_iterator_status_string(status));
} else {
BT_LOGV("Cannot find the youngest upstream message iterator wrapper: "
"status=%s",
- bt_message_iterator_status_string(status));
+ bt_self_message_iterator_status_string(status));
}
goto end;
"last-returned-ts=%" PRId64,
muxer_msg_iter, next_return_ts,
muxer_msg_iter->last_returned_ts_ns);
- status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
goto end;
}
"muxer-upstream-msg-iter-wrap-addr=%p, "
"ts=%" PRId64,
muxer_msg_iter, muxer_upstream_msg_iter, next_return_ts);
- BT_ASSERT(status == BT_MESSAGE_ITERATOR_STATUS_OK);
+ BT_ASSERT(status == BT_SELF_MESSAGE_ITERATOR_STATUS_OK);
BT_ASSERT(muxer_upstream_msg_iter);
/*
}
static
-enum bt_message_iterator_status muxer_msg_iter_do_next(
+bt_self_message_iterator_status muxer_msg_iter_do_next(
struct muxer_comp *muxer_comp,
struct muxer_msg_iter *muxer_msg_iter,
bt_message_array_const msgs, uint64_t capacity,
uint64_t *count)
{
- enum bt_message_iterator_status status =
- BT_MESSAGE_ITERATOR_STATUS_OK;
+ bt_self_message_iterator_status status =
+ BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
uint64_t i = 0;
- while (i < capacity && status == BT_MESSAGE_ITERATOR_STATUS_OK) {
+ while (i < capacity && status == BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
status = muxer_msg_iter_do_next_one(muxer_comp,
muxer_msg_iter, &msgs[i]);
- if (status == BT_MESSAGE_ITERATOR_STATUS_OK) {
+ if (status == BT_SELF_MESSAGE_ITERATOR_STATUS_OK) {
i++;
}
}
* message, in which case we'll return it.
*/
*count = i;
- status = BT_MESSAGE_ITERATOR_STATUS_OK;
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
}
return status;
}
BT_HIDDEN
-enum bt_self_message_iterator_status muxer_msg_iter_init(
+bt_self_message_iterator_status muxer_msg_iter_init(
bt_self_message_iterator *self_msg_iter,
bt_self_component_filter *self_comp,
bt_self_component_port_output *port)
{
struct muxer_comp *muxer_comp = NULL;
struct muxer_msg_iter *muxer_msg_iter = NULL;
- enum bt_self_message_iterator_status status =
- BT_MESSAGE_ITERATOR_STATUS_OK;
+ bt_self_message_iterator_status status =
+ BT_SELF_MESSAGE_ITERATOR_STATUS_OK;
int ret;
muxer_comp = bt_self_component_get_data(
destroy_muxer_msg_iter(muxer_msg_iter);
bt_self_message_iterator_set_data(self_msg_iter,
NULL);
- status = BT_MESSAGE_ITERATOR_STATUS_ERROR;
+ status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR;
end:
muxer_comp->initializing_muxer_msg_iter = false;
}
BT_HIDDEN
-enum bt_message_iterator_status muxer_msg_iter_next(
+bt_self_message_iterator_status muxer_msg_iter_next(
bt_self_message_iterator *self_msg_iter,
bt_message_array_const msgs, uint64_t capacity,
uint64_t *count)
{
- enum bt_message_iterator_status status;
+ bt_self_message_iterator_status status;
struct muxer_msg_iter *muxer_msg_iter =
bt_self_message_iterator_get_data(self_msg_iter);
bt_self_component *self_comp = NULL;
"comp-addr=%p, muxer-comp-addr=%p, muxer-msg-iter-addr=%p, "
"msg-iter-addr=%p, status=%s",
self_comp, muxer_comp, muxer_msg_iter, self_msg_iter,
- bt_message_iterator_status_string(status));
+ bt_self_message_iterator_status_string(status));
} else {
BT_LOGV("Returning from muxer component's message iterator's \"next\" method: "
"status=%s",
- bt_message_iterator_status_string(status));
+ bt_self_message_iterator_status_string(status));
}
return status;
}
BT_HIDDEN
-enum bt_self_component_status muxer_input_port_connected(
+bt_self_component_status muxer_input_port_connected(
bt_self_component_filter *self_comp,
bt_self_component_port_input *self_port,
const bt_port_output *other_port)
{
- enum bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
+ bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK;
const bt_port *port = bt_self_component_port_as_port(
bt_self_component_port_input_as_self_component_port(
self_port));
}
BT_HIDDEN
-void muxer_input_port_disconnected(
- bt_self_component_filter *self_component,
- bt_self_component_port_input *self_port)
+bt_bool muxer_msg_iter_can_seek_beginning(
+ bt_self_message_iterator *self_msg_iter)
{
- struct muxer_comp *muxer_comp =
- bt_self_component_get_data(
- bt_self_component_filter_as_self_component(
- self_component));
- const bt_port *port =
- bt_self_component_port_as_port(
- bt_self_component_port_input_as_self_component_port(
- self_port));
+ struct muxer_msg_iter *muxer_msg_iter =
+ bt_self_message_iterator_get_data(self_msg_iter);
+ uint64_t i;
+ bt_bool ret = BT_TRUE;
- BT_ASSERT(port);
- BT_ASSERT(muxer_comp);
+ for (i = 0; i < muxer_msg_iter->muxer_upstream_msg_iters->len; i++) {
+ struct muxer_upstream_msg_iter *upstream_msg_iter =
+ muxer_msg_iter->muxer_upstream_msg_iters->pdata[i];
- /* One more available input port */
- muxer_comp->available_input_ports++;
- BT_LOGD("Leaving disconnected input port available for future connections: "
- "comp-addr=%p, muxer-comp-addr=%p, port-addr=%p, "
- "port-name=\"%s\", avail-input-port-count=%zu",
- self_component, muxer_comp, port, bt_port_get_name(port),
- muxer_comp->available_input_ports);
+ if (!bt_self_component_port_input_message_iterator_can_seek_beginning(
+ upstream_msg_iter->msg_iter)) {
+ ret = BT_FALSE;
+ goto end;
+ }
+ }
+
+end:
+ return ret;
+}
+
+BT_HIDDEN
+bt_self_message_iterator_status muxer_msg_iter_seek_beginning(
+ bt_self_message_iterator *self_msg_iter)
+{
+ struct muxer_msg_iter *muxer_msg_iter =
+ bt_self_message_iterator_get_data(self_msg_iter);
+ int status;
+ uint64_t i;
+
+ for (i = 0; i < muxer_msg_iter->muxer_upstream_msg_iters->len; i++) {
+ struct muxer_upstream_msg_iter *upstream_msg_iter =
+ muxer_msg_iter->muxer_upstream_msg_iters->pdata[i];
+
+ status = bt_self_component_port_input_message_iterator_seek_beginning(
+ upstream_msg_iter->msg_iter);
+ if (status != BT_MESSAGE_ITERATOR_STATUS_OK) {
+ goto end;
+ }
+ }
+
+ muxer_msg_iter->last_returned_ts_ns = INT64_MIN;
+
+end:
+ return status;
}