X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=plugins%2Futils%2Fmuxer%2Fmuxer.c;h=032fef41be05dac7bbe47874a839007fc28894e4;hb=4725a2013cb518374822ccb490610b45f74dbdbf;hp=8270e14b7113382d09d8037e9195d1147b1a8349;hpb=d0fea13089e4ea4825826b1022ff0d8110ef2898;p=babeltrace.git diff --git a/plugins/utils/muxer/muxer.c b/plugins/utils/muxer/muxer.c index 8270e14b..032fef41 100644 --- a/plugins/utils/muxer/muxer.c +++ b/plugins/utils/muxer/muxer.c @@ -39,6 +39,8 @@ #include #include +#include "muxer.h" + #define ASSUME_ABSOLUTE_CLOCK_CLASSES_PARAM_NAME "assume-absolute-clock-classes" struct muxer_comp { @@ -328,7 +330,9 @@ end: BT_HIDDEN bt_self_component_status muxer_init( bt_self_component_filter *self_comp, - bt_value *params, void *init_data) + const bt_value *params, void *init_data) + + { int ret; bt_self_component_status status = BT_SELF_COMPONENT_STATUS_OK; @@ -447,10 +451,11 @@ end: } static -bt_message_iterator_status muxer_upstream_msg_iter_next( +bt_self_message_iterator_status muxer_upstream_msg_iter_next( struct muxer_upstream_msg_iter *muxer_upstream_msg_iter) { - bt_message_iterator_status status; + bt_self_message_iterator_status status; + bt_message_iterator_status input_port_iter_status; bt_message_array_const msgs; uint64_t i; uint64_t count; @@ -459,12 +464,12 @@ bt_message_iterator_status muxer_upstream_msg_iter_next( "muxer-upstream-msg-iter-wrap-addr=%p, msg-iter-addr=%p", muxer_upstream_msg_iter, muxer_upstream_msg_iter->msg_iter); - status = bt_self_component_port_input_message_iterator_next( + input_port_iter_status = bt_self_component_port_input_message_iterator_next( muxer_upstream_msg_iter->msg_iter, &msgs, &count); BT_LOGV("Upstream message iterator's \"next\" method returned: " - "status=%s", bt_message_iterator_status_string(status)); + "status=%s", bt_message_iterator_status_string(input_port_iter_status)); - switch (status) { + switch (input_port_iter_status) { case BT_MESSAGE_ITERATOR_STATUS_OK: /* * Message iterator's current message is @@ -483,6 +488,7 @@ bt_message_iterator_status muxer_upstream_msg_iter_next( g_queue_push_tail(muxer_upstream_msg_iter->msgs, (void *) msgs[i]); } + status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK; break; case BT_MESSAGE_ITERATOR_STATUS_AGAIN: /* @@ -490,6 +496,7 @@ bt_message_iterator_status muxer_upstream_msg_iter_next( * valid anymore. Return * BT_MESSAGE_ITERATOR_STATUS_AGAIN immediately. */ + status = BT_SELF_MESSAGE_ITERATOR_STATUS_AGAIN; break; case BT_MESSAGE_ITERATOR_STATUS_END: /* Fall-through. */ /* @@ -498,13 +505,13 @@ bt_message_iterator_status muxer_upstream_msg_iter_next( * message. */ BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(muxer_upstream_msg_iter->msg_iter); - status = BT_MESSAGE_ITERATOR_STATUS_OK; + status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK; break; default: /* Error or unsupported status code */ BT_LOGE("Error or unsupported status code: " - "status-code=%d", status); - status = BT_MESSAGE_ITERATOR_STATUS_ERROR; + "status-code=%d", input_port_iter_status); + status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR; break; } @@ -850,7 +857,7 @@ end: * the youngest, and sets *ts_ns to its time. */ static -bt_message_iterator_status +bt_self_message_iterator_status muxer_msg_iter_youngest_upstream_msg_iter( struct muxer_comp *muxer_comp, struct muxer_msg_iter *muxer_msg_iter, @@ -860,8 +867,8 @@ muxer_msg_iter_youngest_upstream_msg_iter( size_t i; int ret; int64_t youngest_ts_ns = INT64_MAX; - bt_message_iterator_status status = - BT_MESSAGE_ITERATOR_STATUS_OK; + bt_self_message_iterator_status status = + BT_SELF_MESSAGE_ITERATOR_STATUS_OK; BT_ASSERT(muxer_comp); BT_ASSERT(muxer_msg_iter); @@ -890,7 +897,7 @@ muxer_msg_iter_youngest_upstream_msg_iter( if (ret) { /* get_msg_ts_ns() logs errors */ *muxer_upstream_msg_iter = NULL; - status = BT_MESSAGE_ITERATOR_STATUS_ERROR; + status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR; goto end; } @@ -903,7 +910,7 @@ muxer_msg_iter_youngest_upstream_msg_iter( } if (!*muxer_upstream_msg_iter) { - status = BT_MESSAGE_ITERATOR_STATUS_END; + status = BT_SELF_MESSAGE_ITERATOR_STATUS_END; *ts_ns = INT64_MIN; } @@ -912,11 +919,11 @@ end: } static -bt_message_iterator_status validate_muxer_upstream_msg_iter( +bt_self_message_iterator_status validate_muxer_upstream_msg_iter( struct muxer_upstream_msg_iter *muxer_upstream_msg_iter) { - bt_message_iterator_status status = - BT_MESSAGE_ITERATOR_STATUS_OK; + bt_self_message_iterator_status status = + BT_SELF_MESSAGE_ITERATOR_STATUS_OK; BT_LOGV("Validating muxer's upstream message iterator wrapper: " "muxer-upstream-msg-iter-wrap-addr=%p", @@ -939,11 +946,11 @@ end: } static -bt_message_iterator_status validate_muxer_upstream_msg_iters( +bt_self_message_iterator_status validate_muxer_upstream_msg_iters( struct muxer_msg_iter *muxer_msg_iter) { - bt_message_iterator_status status = - BT_MESSAGE_ITERATOR_STATUS_OK; + bt_self_message_iterator_status status = + BT_SELF_MESSAGE_ITERATOR_STATUS_OK; size_t i; BT_LOGV("Validating muxer's upstream message iterator wrappers: " @@ -957,7 +964,7 @@ bt_message_iterator_status validate_muxer_upstream_msg_iters( status = validate_muxer_upstream_msg_iter( muxer_upstream_msg_iter); - if (status != BT_MESSAGE_ITERATOR_STATUS_OK) { + if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) { if (status < 0) { BT_LOGE("Cannot validate muxer's upstream message iterator wrapper: " "muxer-msg-iter-addr=%p, " @@ -1000,13 +1007,13 @@ end: } static inline -bt_message_iterator_status muxer_msg_iter_do_next_one( +bt_self_message_iterator_status muxer_msg_iter_do_next_one( struct muxer_comp *muxer_comp, struct muxer_msg_iter *muxer_msg_iter, const bt_message **msg) { - bt_message_iterator_status status = - BT_MESSAGE_ITERATOR_STATUS_OK; + bt_self_message_iterator_status status = + BT_SELF_MESSAGE_ITERATOR_STATUS_OK; struct muxer_upstream_msg_iter *muxer_upstream_msg_iter = NULL; int64_t next_return_ts; @@ -1019,12 +1026,12 @@ bt_message_iterator_status muxer_msg_iter_do_next_one( "muxer-comp-addr=%p, muxer-msg-iter-addr=%p, " "ret=%d", muxer_comp, muxer_msg_iter, ret); - status = BT_MESSAGE_ITERATOR_STATUS_ERROR; + status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR; goto end; } status = validate_muxer_upstream_msg_iters(muxer_msg_iter); - if (status != BT_MESSAGE_ITERATOR_STATUS_OK) { + if (status != BT_SELF_MESSAGE_ITERATOR_STATUS_OK) { /* validate_muxer_upstream_msg_iters() logs details */ goto end; } @@ -1055,15 +1062,15 @@ bt_message_iterator_status muxer_msg_iter_do_next_one( status = muxer_msg_iter_youngest_upstream_msg_iter(muxer_comp, muxer_msg_iter, &muxer_upstream_msg_iter, &next_return_ts); - if (status < 0 || status == BT_MESSAGE_ITERATOR_STATUS_END) { + if (status < 0 || status == BT_SELF_MESSAGE_ITERATOR_STATUS_END) { if (status < 0) { BT_LOGE("Cannot find the youngest upstream message iterator wrapper: " "status=%s", - bt_message_iterator_status_string(status)); + bt_self_message_iterator_status_string(status)); } else { BT_LOGV("Cannot find the youngest upstream message iterator wrapper: " "status=%s", - bt_message_iterator_status_string(status)); + bt_self_message_iterator_status_string(status)); } goto end; @@ -1075,7 +1082,7 @@ bt_message_iterator_status muxer_msg_iter_do_next_one( "last-returned-ts=%" PRId64, muxer_msg_iter, next_return_ts, muxer_msg_iter->last_returned_ts_ns); - status = BT_MESSAGE_ITERATOR_STATUS_ERROR; + status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR; goto end; } @@ -1084,7 +1091,7 @@ bt_message_iterator_status muxer_msg_iter_do_next_one( "muxer-upstream-msg-iter-wrap-addr=%p, " "ts=%" PRId64, muxer_msg_iter, muxer_upstream_msg_iter, next_return_ts); - BT_ASSERT(status == BT_MESSAGE_ITERATOR_STATUS_OK); + BT_ASSERT(status == BT_SELF_MESSAGE_ITERATOR_STATUS_OK); BT_ASSERT(muxer_upstream_msg_iter); /* @@ -1100,20 +1107,20 @@ end: } static -bt_message_iterator_status muxer_msg_iter_do_next( +bt_self_message_iterator_status muxer_msg_iter_do_next( struct muxer_comp *muxer_comp, struct muxer_msg_iter *muxer_msg_iter, bt_message_array_const msgs, uint64_t capacity, uint64_t *count) { - bt_message_iterator_status status = - BT_MESSAGE_ITERATOR_STATUS_OK; + bt_self_message_iterator_status status = + BT_SELF_MESSAGE_ITERATOR_STATUS_OK; uint64_t i = 0; - while (i < capacity && status == BT_MESSAGE_ITERATOR_STATUS_OK) { + while (i < capacity && status == BT_SELF_MESSAGE_ITERATOR_STATUS_OK) { status = muxer_msg_iter_do_next_one(muxer_comp, muxer_msg_iter, &msgs[i]); - if (status == BT_MESSAGE_ITERATOR_STATUS_OK) { + if (status == BT_SELF_MESSAGE_ITERATOR_STATUS_OK) { i++; } } @@ -1132,7 +1139,7 @@ bt_message_iterator_status muxer_msg_iter_do_next( * message, in which case we'll return it. */ *count = i; - status = BT_MESSAGE_ITERATOR_STATUS_OK; + status = BT_SELF_MESSAGE_ITERATOR_STATUS_OK; } return status; @@ -1232,7 +1239,7 @@ bt_self_message_iterator_status muxer_msg_iter_init( struct muxer_comp *muxer_comp = NULL; struct muxer_msg_iter *muxer_msg_iter = NULL; bt_self_message_iterator_status status = - BT_MESSAGE_ITERATOR_STATUS_OK; + BT_SELF_MESSAGE_ITERATOR_STATUS_OK; int ret; muxer_comp = bt_self_component_get_data( @@ -1308,7 +1315,7 @@ error: destroy_muxer_msg_iter(muxer_msg_iter); bt_self_message_iterator_set_data(self_msg_iter, NULL); - status = BT_MESSAGE_ITERATOR_STATUS_ERROR; + status = BT_SELF_MESSAGE_ITERATOR_STATUS_ERROR; end: muxer_comp->initializing_muxer_msg_iter = false; @@ -1341,12 +1348,12 @@ void muxer_msg_iter_finalize( } BT_HIDDEN -bt_message_iterator_status muxer_msg_iter_next( +bt_self_message_iterator_status muxer_msg_iter_next( bt_self_message_iterator *self_msg_iter, bt_message_array_const msgs, uint64_t capacity, uint64_t *count) { - bt_message_iterator_status status; + bt_self_message_iterator_status status; struct muxer_msg_iter *muxer_msg_iter = bt_self_message_iterator_get_data(self_msg_iter); bt_self_component *self_comp = NULL; @@ -1370,11 +1377,11 @@ bt_message_iterator_status muxer_msg_iter_next( "comp-addr=%p, muxer-comp-addr=%p, muxer-msg-iter-addr=%p, " "msg-iter-addr=%p, status=%s", self_comp, muxer_comp, muxer_msg_iter, self_msg_iter, - bt_message_iterator_status_string(status)); + bt_self_message_iterator_status_string(status)); } else { BT_LOGV("Returning from muxer component's message iterator's \"next\" method: " "status=%s", - bt_message_iterator_status_string(status)); + bt_self_message_iterator_status_string(status)); } return status; @@ -1455,29 +1462,3 @@ bt_self_component_status muxer_input_port_connected( end: return status; } - -BT_HIDDEN -void muxer_input_port_disconnected( - bt_self_component_filter *self_component, - bt_self_component_port_input *self_port) -{ - struct muxer_comp *muxer_comp = - bt_self_component_get_data( - bt_self_component_filter_as_self_component( - self_component)); - const bt_port *port = - bt_self_component_port_as_port( - bt_self_component_port_input_as_self_component_port( - self_port)); - - BT_ASSERT(port); - BT_ASSERT(muxer_comp); - - /* One more available input port */ - muxer_comp->available_input_ports++; - BT_LOGD("Leaving disconnected input port available for future connections: " - "comp-addr=%p, muxer-comp-addr=%p, port-addr=%p, " - "port-name=\"%s\", avail-input-port-count=%zu", - self_component, muxer_comp, port, bt_port_get_name(port), - muxer_comp->available_input_ports); -}