X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fplugins%2Futils%2Fmuxer%2Fmuxer.c;h=d6ca31bb63b435383c45c1ee55d5505425b44472;hb=f2fb1b3297ca0bc13b53189a063b63944be7fae9;hp=ecd5d8d92f059147f0bec1f01dceae1b6ed94b8a;hpb=9244b07ab1598e27d633136124409fff7dd75ff7;p=babeltrace.git diff --git a/src/plugins/utils/muxer/muxer.c b/src/plugins/utils/muxer/muxer.c index ecd5d8d9..d6ca31bb 100644 --- a/src/plugins/utils/muxer/muxer.c +++ b/src/plugins/utils/muxer/muxer.c @@ -430,16 +430,17 @@ void muxer_finalize(bt_self_component_filter *self_comp) } static -bt_self_component_port_input_message_iterator * +bt_self_component_port_input_message_iterator_create_from_message_iterator_status create_msg_iter_on_input_port(struct muxer_comp *muxer_comp, struct muxer_msg_iter *muxer_msg_iter, - bt_self_component_port_input *self_port) + bt_self_component_port_input *self_port, + bt_self_component_port_input_message_iterator **msg_iter) { const bt_port *port = bt_self_component_port_as_port( bt_self_component_port_input_as_self_component_port( self_port)); - bt_self_component_port_input_message_iterator *msg_iter = - NULL; + bt_self_component_port_input_message_iterator_create_from_message_iterator_status + status; BT_ASSERT(port); BT_ASSERT(bt_port_is_connected(port)); @@ -447,9 +448,9 @@ create_msg_iter_on_input_port(struct muxer_comp *muxer_comp, // TODO: Advance the iterator to >= the time of the latest // returned message by the muxer message // iterator which creates it. - msg_iter = bt_self_component_port_input_message_iterator_create_from_message_iterator( - muxer_msg_iter->self_msg_iter, self_port); - if (!msg_iter) { + status = bt_self_component_port_input_message_iterator_create_from_message_iterator( + muxer_msg_iter->self_msg_iter, self_port, msg_iter); + if (status != BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_CREATE_FROM_MESSAGE_ITERATOR_STATUS_OK) { BT_COMP_LOGE("Cannot create upstream message iterator on input port: " "port-addr=%p, port-name=\"%s\"", port, bt_port_get_name(port)); @@ -461,7 +462,7 @@ create_msg_iter_on_input_port(struct muxer_comp *muxer_comp, port, bt_port_get_name(port), msg_iter); end: - return msg_iter; + return status; } static @@ -946,7 +947,13 @@ muxer_msg_iter_youngest_upstream_msg_iter( goto end; } - if (msg_ts_ns < youngest_ts_ns) { + /* + * Update the current message iterator if it has not been set + * yet, or if its current message has a timestamp smaller than + * the previously selected youngest message. + */ + if (G_UNLIKELY(*muxer_upstream_msg_iter == NULL) || + msg_ts_ns < youngest_ts_ns) { *muxer_upstream_msg_iter = cur_muxer_upstream_msg_iter; youngest_ts_ns = msg_ts_ns; @@ -1235,12 +1242,13 @@ void destroy_muxer_msg_iter(struct muxer_msg_iter *muxer_msg_iter) } static -int muxer_msg_iter_init_upstream_iterators(struct muxer_comp *muxer_comp, +bt_component_class_message_iterator_init_method_status +muxer_msg_iter_init_upstream_iterators(struct muxer_comp *muxer_comp, struct muxer_msg_iter *muxer_msg_iter) { int64_t count; int64_t i; - int ret = 0; + bt_component_class_message_iterator_init_method_status status; count = bt_component_filter_get_input_port_count( bt_self_component_filter_as_component_filter( @@ -1249,6 +1257,7 @@ int muxer_msg_iter_init_upstream_iterators(struct muxer_comp *muxer_comp, BT_COMP_LOGD("No input port to initialize for muxer component's message iterator: " "muxer-comp-addr=%p, muxer-msg-iter-addr=%p", muxer_comp, muxer_msg_iter); + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_OK; goto end; } @@ -1258,6 +1267,9 @@ int muxer_msg_iter_init_upstream_iterators(struct muxer_comp *muxer_comp, bt_self_component_filter_borrow_input_port_by_index( muxer_comp->self_comp_flt, i); const bt_port *port; + bt_self_component_port_input_message_iterator_create_from_message_iterator_status + msg_iter_status; + int int_status; BT_ASSERT(self_port); port = bt_self_component_port_as_port( @@ -1270,27 +1282,29 @@ int muxer_msg_iter_init_upstream_iterators(struct muxer_comp *muxer_comp, continue; } - upstream_msg_iter = create_msg_iter_on_input_port(muxer_comp, - muxer_msg_iter, self_port); - if (!upstream_msg_iter) { + msg_iter_status = create_msg_iter_on_input_port(muxer_comp, + muxer_msg_iter, self_port, &upstream_msg_iter); + if (msg_iter_status != BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_CREATE_FROM_MESSAGE_ITERATOR_STATUS_OK) { /* create_msg_iter_on_input_port() logs errors */ - BT_ASSERT(!upstream_msg_iter); - ret = -1; + status = (int) msg_iter_status; goto end; } - ret = muxer_msg_iter_add_upstream_msg_iter(muxer_msg_iter, + int_status = muxer_msg_iter_add_upstream_msg_iter(muxer_msg_iter, upstream_msg_iter); bt_self_component_port_input_message_iterator_put_ref( upstream_msg_iter); - if (ret) { + if (int_status) { + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_ERROR; /* muxer_msg_iter_add_upstream_msg_iter() logs errors */ goto end; } } + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_OK; + end: - return ret; + return status; } BT_HIDDEN @@ -1301,9 +1315,7 @@ bt_component_class_message_iterator_init_method_status muxer_msg_iter_init( { struct muxer_comp *muxer_comp = NULL; struct muxer_msg_iter *muxer_msg_iter = NULL; - bt_component_class_message_iterator_init_method_status status = - BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_OK; - int ret; + bt_component_class_message_iterator_init_method_status status; muxer_comp = bt_self_component_get_data( bt_self_component_filter_as_self_component(self_comp)); @@ -1321,6 +1333,7 @@ bt_component_class_message_iterator_init_method_status muxer_msg_iter_init( BT_COMP_LOGE("Recursive initialization of muxer component's message iterator: " "comp-addr=%p, muxer-comp-addr=%p, msg-iter-addr=%p", self_comp, muxer_comp, self_msg_iter); + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_ERROR; goto error; } @@ -1328,6 +1341,7 @@ bt_component_class_message_iterator_init_method_status muxer_msg_iter_init( muxer_msg_iter = g_new0(struct muxer_msg_iter, 1); if (!muxer_msg_iter) { BT_COMP_LOGE_STR("Failed to allocate one muxer component's message iterator."); + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_MEMORY_ERROR; goto error; } @@ -1339,6 +1353,7 @@ bt_component_class_message_iterator_init_method_status muxer_msg_iter_init( (GDestroyNotify) destroy_muxer_upstream_msg_iter); if (!muxer_msg_iter->active_muxer_upstream_msg_iters) { BT_COMP_LOGE_STR("Failed to allocate a GPtrArray."); + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_MEMORY_ERROR; goto error; } @@ -1347,17 +1362,18 @@ bt_component_class_message_iterator_init_method_status muxer_msg_iter_init( (GDestroyNotify) destroy_muxer_upstream_msg_iter); if (!muxer_msg_iter->ended_muxer_upstream_msg_iters) { BT_COMP_LOGE_STR("Failed to allocate a GPtrArray."); + status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_MEMORY_ERROR; goto error; } - ret = muxer_msg_iter_init_upstream_iterators(muxer_comp, + status = muxer_msg_iter_init_upstream_iterators(muxer_comp, muxer_msg_iter); - if (ret) { + if (status) { BT_COMP_LOGE("Cannot initialize connected input ports for muxer component's message iterator: " "comp-addr=%p, muxer-comp-addr=%p, " "muxer-msg-iter-addr=%p, msg-iter-addr=%p, ret=%d", self_comp, muxer_comp, muxer_msg_iter, - self_msg_iter, ret); + self_msg_iter, status); goto error; } @@ -1371,7 +1387,6 @@ bt_component_class_message_iterator_init_method_status muxer_msg_iter_init( error: destroy_muxer_msg_iter(muxer_msg_iter); bt_self_message_iterator_set_data(self_msg_iter, NULL); - status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INIT_METHOD_STATUS_ERROR; end: muxer_comp->initializing_muxer_msg_iter = false; @@ -1473,49 +1488,57 @@ end: } static inline -bt_bool muxer_upstream_msg_iters_can_all_seek_beginning( - GPtrArray *muxer_upstream_msg_iters) +bt_component_class_message_iterator_can_seek_beginning_method_status +muxer_upstream_msg_iters_can_all_seek_beginning( + GPtrArray *muxer_upstream_msg_iters, bt_bool *can_seek) { + bt_component_class_message_iterator_can_seek_beginning_method_status status; uint64_t i; - bt_bool ret = BT_TRUE; for (i = 0; i < muxer_upstream_msg_iters->len; i++) { struct muxer_upstream_msg_iter *upstream_msg_iter = muxer_upstream_msg_iters->pdata[i]; + status = (int) bt_self_component_port_input_message_iterator_can_seek_beginning( + upstream_msg_iter->msg_iter, can_seek); + if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_CAN_SEEK_BEGINNING_METHOD_STATUS_OK) { + goto end; + } - if (!bt_self_component_port_input_message_iterator_can_seek_beginning( - upstream_msg_iter->msg_iter)) { - ret = BT_FALSE; + if (!*can_seek) { goto end; } } + *can_seek = BT_TRUE; + end: - return ret; + return status; } BT_HIDDEN -bt_bool muxer_msg_iter_can_seek_beginning( - bt_self_message_iterator *self_msg_iter) +bt_component_class_message_iterator_can_seek_beginning_method_status +muxer_msg_iter_can_seek_beginning( + bt_self_message_iterator *self_msg_iter, bt_bool *can_seek) { struct muxer_msg_iter *muxer_msg_iter = bt_self_message_iterator_get_data(self_msg_iter); - bt_bool ret = BT_TRUE; + bt_component_class_message_iterator_can_seek_beginning_method_status status; - if (!muxer_upstream_msg_iters_can_all_seek_beginning( - muxer_msg_iter->active_muxer_upstream_msg_iters)) { - ret = BT_FALSE; + status = muxer_upstream_msg_iters_can_all_seek_beginning( + muxer_msg_iter->active_muxer_upstream_msg_iters, can_seek); + if (status != BT_COMPONENT_CLASS_MESSAGE_ITERATOR_CAN_SEEK_BEGINNING_METHOD_STATUS_OK) { goto end; } - if (!muxer_upstream_msg_iters_can_all_seek_beginning( - muxer_msg_iter->ended_muxer_upstream_msg_iters)) { - ret = BT_FALSE; + if (!*can_seek) { goto end; } + status = muxer_upstream_msg_iters_can_all_seek_beginning( + muxer_msg_iter->ended_muxer_upstream_msg_iters, can_seek); + end: - return ret; + return status; } BT_HIDDEN