#include <babeltrace/graph/private-port.h>
#include <plugins-common.h>
#include <glib.h>
+#include <stdbool.h>
#include <assert.h>
+#include <stdlib.h>
+
+#define ASSUME_ABSOLUTE_CLOCK_CLASSES_PARAM_NAME "assume-absolute-clock-classes"
struct muxer_comp {
/* Array of struct bt_private_notification_iterator * (weak refs) */
size_t available_input_ports;
bool error;
bool initializing_muxer_notif_iter;
+ bool ignore_absolute;
};
struct muxer_upstream_notif_iter {
/* Owned by this, NULL if ended */
struct bt_notification_iterator *notif_iter;
- /* Owned by this*/
- struct bt_private_port *priv_port;
-
/*
* This flag is true if the upstream notification iterator's
* current notification must be considered for the multiplexing
GPtrArray *muxer_upstream_notif_iters;
/*
- * List of "recently" connected input ports (owned by this) to
+ * List of "recently" connected input ports (weak) to
* handle by this muxer notification iterator.
* muxer_port_connected() adds entries to this list, and the
* entries are removed when a notification iterator is created
}
bt_put(muxer_upstream_notif_iter->notif_iter);
- bt_put(muxer_upstream_notif_iter->priv_port);
g_free(muxer_upstream_notif_iter);
}
}
muxer_upstream_notif_iter->notif_iter = bt_get(notif_iter);
- muxer_upstream_notif_iter->priv_port = bt_get(priv_port);
muxer_upstream_notif_iter->is_valid = false;
g_ptr_array_add(muxer_notif_iter->muxer_upstream_notif_iters,
muxer_upstream_notif_iter);
}
static
-int ensure_available_input_port(struct bt_private_component *priv_comp)
+enum bt_component_status ensure_available_input_port(
+ struct bt_private_component *priv_comp)
{
struct muxer_comp *muxer_comp =
bt_private_component_get_user_data(priv_comp);
- int ret = 0;
+ enum bt_component_status status = BT_COMPONENT_STATUS_OK;
GString *port_name = NULL;
- void *priv_port = NULL;
assert(muxer_comp);
port_name = g_string_new("in");
if (!port_name) {
- ret = -1;
+ status = BT_COMPONENT_STATUS_NOMEM;
goto end;
}
g_string_append_printf(port_name, "%u", muxer_comp->next_port_num);
- priv_port = bt_private_component_filter_add_input_private_port(
- priv_comp, port_name->str, NULL);
- if (!priv_port) {
- ret = -1;
+ status = bt_private_component_filter_add_input_private_port(
+ priv_comp, port_name->str, NULL, NULL);
+ if (status != BT_COMPONENT_STATUS_OK) {
goto end;
}
g_string_free(port_name, TRUE);
}
- BT_PUT(priv_port);
- return ret;
+ return status;
}
static
-int create_output_port(struct bt_private_component *priv_comp)
+enum bt_component_status create_output_port(
+ struct bt_private_component *priv_comp)
{
- void *priv_port;
- int ret = 0;
-
- priv_port = bt_private_component_filter_add_output_private_port(
- priv_comp, "out", NULL);
- if (!priv_port) {
- ret = -1;
- }
-
- bt_put(priv_port);
- return ret;
+ return bt_private_component_filter_add_output_private_port(
+ priv_comp, "out", NULL, NULL);
}
static
g_free(muxer_comp);
}
+static
+struct bt_value *get_default_params(void)
+{
+ struct bt_value *params;
+ int ret;
+
+ params = bt_value_map_create();
+ if (!params) {
+ goto error;
+ }
+
+ ret = bt_value_map_insert_bool(params,
+ ASSUME_ABSOLUTE_CLOCK_CLASSES_PARAM_NAME, false);
+ if (ret) {
+ goto error;
+ }
+
+ goto end;
+
+error:
+ BT_PUT(params);
+
+end:
+ return params;
+}
+
+static
+int configure_muxer_comp(struct muxer_comp *muxer_comp, struct bt_value *params)
+{
+ struct bt_value *default_params = NULL;
+ struct bt_value *real_params = NULL;
+ struct bt_value *ignore_absolute = NULL;
+ int ret = 0;
+ bt_bool bool_val;
+
+ default_params = get_default_params();
+ if (!default_params) {
+ goto error;
+ }
+
+ real_params = bt_value_map_extend(default_params, params);
+ if (!real_params) {
+ goto error;
+ }
+
+ ignore_absolute = bt_value_map_get(real_params,
+ ASSUME_ABSOLUTE_CLOCK_CLASSES_PARAM_NAME);
+ if (!bt_value_is_bool(ignore_absolute)) {
+ goto error;
+ }
+
+ if (bt_value_bool_get(ignore_absolute, &bool_val)) {
+ goto error;
+ }
+
+ muxer_comp->ignore_absolute = (bool) bool_val;
+
+ goto end;
+
+error:
+ ret = -1;
+
+end:
+ bt_put(default_params);
+ bt_put(real_params);
+ bt_put(ignore_absolute);
+ return ret;
+}
+
BT_HIDDEN
enum bt_component_status muxer_init(
struct bt_private_component *priv_comp,
goto error;
}
+ ret = configure_muxer_comp(muxer_comp, params);
+ if (ret) {
+ goto error;
+ }
+
muxer_comp->muxer_notif_iters = g_ptr_array_new();
if (!muxer_comp->muxer_notif_iters) {
goto error;
muxer_comp->priv_comp = priv_comp;
ret = bt_private_component_set_user_data(priv_comp, muxer_comp);
assert(ret == 0);
- ret = ensure_available_input_port(priv_comp);
- if (ret) {
+ status = ensure_available_input_port(priv_comp);
+ if (status != BT_COMPONENT_STATUS_OK) {
goto error;
}
destroy_muxer_comp(muxer_comp);
ret = bt_private_component_set_user_data(priv_comp, NULL);
assert(ret == 0);
- status = BT_COMPONENT_STATUS_ERROR;
+
+ if (status == BT_COMPONENT_STATUS_OK) {
+ status = BT_COMPONENT_STATUS_ERROR;
+ }
end:
return status;
// returned notification by the muxer notification
// iterator which creates it.
notif_iter = bt_private_connection_create_notification_iterator(
- priv_conn);
+ priv_conn, NULL);
if (!notif_iter) {
*ret = -1;
goto end;
*/
muxer_upstream_notif_iter->is_valid = false;
break;
- case BT_NOTIFICATION_ITERATOR_STATUS_END:
+ case BT_NOTIFICATION_ITERATOR_STATUS_END: /* Fall-through. */
+ case BT_NOTIFICATION_ITERATOR_STATUS_CANCELED:
/*
* Notification iterator reached the end: release it. It
* won't be considered again to find the youngest
&ret);
if (ret) {
assert(!upstream_notif_iter);
- bt_put(priv_port);
goto error;
}
muxer_notif_iter_add_upstream_notif_iter(
muxer_notif_iter, upstream_notif_iter,
priv_port);
- BT_PUT(priv_port);
BT_PUT(upstream_notif_iter);
if (!muxer_upstream_notif_iter) {
goto error;
remove_node:
bt_put(upstream_notif_iter);
bt_put(port);
- bt_put(priv_port);
muxer_notif_iter->newly_connected_priv_ports =
g_list_delete_link(
muxer_notif_iter->newly_connected_priv_ports,
goto error;
}
- if (!bt_ctf_clock_class_is_absolute(clock_class)) {
- // TODO: Allow this with an explicit parameter
+ if (!muxer_comp->ignore_absolute &&
+ !bt_ctf_clock_class_is_absolute(clock_class)) {
goto error;
}
notif, clock_class);
break;
default:
- assert(false);
+ abort();
}
if (!clock_value) {
if (status != BT_NOTIFICATION_ITERATOR_STATUS_OK) {
goto end;
}
+
+ /*
+ * Remove this muxer upstream notification iterator
+ * if it's ended or canceled.
+ */
+ if (!muxer_upstream_notif_iter->notif_iter) {
+ /*
+ * Use g_ptr_array_remove_fast() because the
+ * order of those elements is not important.
+ */
+ g_ptr_array_remove_index_fast(
+ muxer_notif_iter->muxer_upstream_notif_iters,
+ i);
+ i--;
+ }
}
end:
muxer_notif_iter, &muxer_upstream_notif_iter,
&next_return_ts);
if (next_return.status < 0 ||
- next_return.status == BT_NOTIFICATION_ITERATOR_STATUS_END) {
+ next_return.status == BT_NOTIFICATION_ITERATOR_STATUS_END ||
+ next_return.status == BT_NOTIFICATION_ITERATOR_STATUS_CANCELED) {
goto end;
}
static
void destroy_muxer_notif_iter(struct muxer_notif_iter *muxer_notif_iter)
{
- GList *node;
-
if (!muxer_notif_iter) {
return;
}
muxer_notif_iter->muxer_upstream_notif_iters, TRUE);
}
- for (node = muxer_notif_iter->newly_connected_priv_ports;
- node; node = g_list_next(node)) {
- bt_put(node->data);
- }
-
g_list_free(muxer_notif_iter->newly_connected_priv_ports);
g_free(muxer_notif_iter);
}
}
bt_put(port);
+ bt_put(priv_port);
muxer_notif_iter->newly_connected_priv_ports =
g_list_append(
muxer_notif_iter->newly_connected_priv_ports,
priv_port);
if (!muxer_notif_iter->newly_connected_priv_ports) {
- bt_put(priv_port);
ret = -1;
goto end;
}
struct muxer_comp *muxer_comp =
bt_private_component_get_user_data(priv_comp);
size_t i;
+ int ret;
assert(self_port);
assert(muxer_comp);
- if (bt_port_get_type(self_port) == BT_PORT_TYPE_INPUT) {
- int ret;
-
- /* One less available input port */
- muxer_comp->available_input_ports--;
- ret = ensure_available_input_port(priv_comp);
- if (ret) {
- /*
- * Only way to report an error later since this
- * method does not return anything.
- */
- muxer_comp->error = true;
- goto end;
- }
+ if (bt_port_get_type(self_port) == BT_PORT_TYPE_OUTPUT) {
+ goto end;
}
for (i = 0; i < muxer_comp->muxer_notif_iters->len; i++) {
muxer_notif_iter->newly_connected_priv_ports =
g_list_append(
muxer_notif_iter->newly_connected_priv_ports,
- bt_get(self_private_port));
+ self_private_port);
if (!muxer_notif_iter->newly_connected_priv_ports) {
/* Put reference taken by bt_get() above */
- bt_put(self_private_port);
muxer_comp->error = true;
goto end;
}
}
+ /* One less available input port */
+ muxer_comp->available_input_ports--;
+ ret = ensure_available_input_port(priv_comp);
+ if (ret) {
+ /*
+ * Only way to report an error later since this
+ * method does not return anything.
+ */
+ muxer_comp->error = true;
+ goto end;
+ }
+
end:
bt_put(self_port);
}