X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=plugins%2Futils%2Fmuxer%2Fmuxer.c;h=663bd3460d0a7fe741631eba2a02aba1e9bdf7c4;hb=d4393e0875e7b08f6ee97d617cc5f2c9286742a4;hp=61cd4883319ff3332037d844df015e75d93fada7;hpb=26e21a82c47a15d1080dc142cb20c0b0b0b5a929;p=babeltrace.git diff --git a/plugins/utils/muxer/muxer.c b/plugins/utils/muxer/muxer.c index 61cd4883..663bd346 100644 --- a/plugins/utils/muxer/muxer.c +++ b/plugins/utils/muxer/muxer.c @@ -61,21 +61,8 @@ struct muxer_upstream_notif_iter { /* Owned by this, NULL if ended */ struct bt_notification_iterator *notif_iter; - /* Owned by this */ - struct bt_notification *notif; - - /* - * This flag is true if the upstream notification iterator's - * current notification must be considered for the multiplexing - * operations. If the upstream iterator returns - * BT_NOTIFICATION_ITERATOR_STATUS_AGAIN, then this object - * is considered invalid, because its current notification is - * still the previous one, but we already took it into account. - * - * The value of this flag is not important if notif_iter above - * is NULL (which means the upstream iterator is finished). - */ - bool is_valid; + /* Contains `struct bt_notification *`, owned by this */ + GQueue *notifs; }; enum muxer_notif_iter_clock_class_expectation { @@ -107,9 +94,6 @@ struct muxer_notif_iter { */ GList *newly_connected_priv_ports; - /* Next thing to return by the "next" method */ - struct bt_notification_iterator_next_method_return next_next_return; - /* Last time returned in a notification */ int64_t last_returned_ts_ns; @@ -133,12 +117,23 @@ void destroy_muxer_upstream_notif_iter( } BT_LOGD("Destroying muxer's upstream notification iterator wrapper: " - "addr=%p, notif-iter-addr=%p, is-valid=%d", + "addr=%p, notif-iter-addr=%p, queue-len=%u", muxer_upstream_notif_iter, muxer_upstream_notif_iter->notif_iter, - muxer_upstream_notif_iter->is_valid); + muxer_upstream_notif_iter->notifs->length); bt_put(muxer_upstream_notif_iter->notif_iter); - bt_put(muxer_upstream_notif_iter->notif); + + if (muxer_upstream_notif_iter->notifs) { + struct bt_notification *notif; + + while ((notif = g_queue_pop_head( + muxer_upstream_notif_iter->notifs))) { + bt_put(notif); + } + + g_queue_free(muxer_upstream_notif_iter->notifs); + } + g_free(muxer_upstream_notif_iter); } @@ -157,7 +152,13 @@ struct muxer_upstream_notif_iter *muxer_notif_iter_add_upstream_notif_iter( } muxer_upstream_notif_iter->notif_iter = bt_get(notif_iter); - muxer_upstream_notif_iter->is_valid = false; + muxer_upstream_notif_iter->notifs = g_queue_new(); + if (!muxer_upstream_notif_iter->notifs) { + BT_LOGE_STR("Failed to allocate a GQueue."); + + goto end; + } + g_ptr_array_add(muxer_notif_iter->muxer_upstream_notif_iters, muxer_upstream_notif_iter); BT_LOGD("Added muxer's upstream notification iterator wrapper: " @@ -452,36 +453,45 @@ enum bt_notification_iterator_status muxer_upstream_notif_iter_next( struct muxer_upstream_notif_iter *muxer_upstream_notif_iter) { enum bt_notification_iterator_status status; - struct bt_notification *notif = NULL; + bt_notification_array notifs; + uint64_t i; + uint64_t count; BT_LOGV("Calling upstream notification iterator's \"next\" method: " "muxer-upstream-notif-iter-wrap-addr=%p, notif-iter-addr=%p", muxer_upstream_notif_iter, muxer_upstream_notif_iter->notif_iter); status = bt_private_connection_notification_iterator_next( - muxer_upstream_notif_iter->notif_iter, ¬if); + muxer_upstream_notif_iter->notif_iter, ¬ifs, &count); BT_LOGV("Upstream notification iterator's \"next\" method returned: " "status=%s", bt_notification_iterator_status_string(status)); switch (status) { case BT_NOTIFICATION_ITERATOR_STATUS_OK: /* - * Notification iterator's current notification is valid: - * it must be considered for muxing operations. + * Notification iterator's current notification is + * valid: it must be considered for muxing operations. */ BT_LOGV_STR("Validated upstream notification iterator wrapper."); - muxer_upstream_notif_iter->is_valid = true; - BT_MOVE(muxer_upstream_notif_iter->notif, notif); + BT_ASSERT(count > 0); + + /* Move notifications to our queue */ + for (i = 0; i < count; i++) { + /* + * Push to tail in order; other side + * (muxer_notif_iter_do_next_one()) consumes + * from the head first. + */ + g_queue_push_tail(muxer_upstream_notif_iter->notifs, + notifs[i]); + } break; case BT_NOTIFICATION_ITERATOR_STATUS_AGAIN: /* * Notification iterator's current notification is not * valid anymore. Return - * BT_NOTIFICATION_ITERATOR_STATUS_AGAIN - * immediately. + * BT_NOTIFICATION_ITERATOR_STATUS_AGAIN immediately. */ - BT_LOGV_STR("Invalidated upstream notification iterator wrapper because of BT_NOTIFICATION_ITERATOR_STATUS_AGAIN."); - muxer_upstream_notif_iter->is_valid = false; break; case BT_NOTIFICATION_ITERATOR_STATUS_END: /* Fall-through. */ case BT_NOTIFICATION_ITERATOR_STATUS_CANCELED: @@ -490,9 +500,7 @@ enum bt_notification_iterator_status muxer_upstream_notif_iter_next( * won't be considered again to find the youngest * notification. */ - BT_LOGV_STR("Invalidated upstream notification iterator wrapper because of BT_NOTIFICATION_ITERATOR_STATUS_END or BT_NOTIFICATION_ITERATOR_STATUS_CANCELED."); BT_PUT(muxer_upstream_notif_iter->notif_iter); - muxer_upstream_notif_iter->is_valid = false; status = BT_NOTIFICATION_ITERATOR_STATUS_OK; break; default: @@ -503,7 +511,6 @@ enum bt_notification_iterator_status muxer_upstream_notif_iter_next( break; } - BT_ASSERT(!notif); return status; } @@ -518,7 +525,7 @@ int muxer_notif_iter_handle_newly_connected_ports( /* * Here we create one upstream notification iterator for each - * newly connected port. We do not perform an initial "next" on + * newly connected port. We do NOT perform an initial "next" on * those new upstream notification iterators: they are * invalidated, to be validated later. The list of newly * connected ports to handle here is updated by @@ -906,8 +913,8 @@ muxer_notif_iter_youngest_upstream_notif_iter( continue; } - BT_ASSERT(cur_muxer_upstream_notif_iter->is_valid); - notif = cur_muxer_upstream_notif_iter->notif; + BT_ASSERT(cur_muxer_upstream_notif_iter->notifs->length > 0); + notif = g_queue_peek_head(cur_muxer_upstream_notif_iter->notifs); BT_ASSERT(notif); ret = get_notif_ts_ns(muxer_comp, muxer_notif_iter, notif, muxer_notif_iter->last_returned_ts_ns, ¬if_ts_ns); @@ -946,8 +953,12 @@ enum bt_notification_iterator_status validate_muxer_upstream_notif_iter( "muxer-upstream-notif-iter-wrap-addr=%p", muxer_upstream_notif_iter); - if (muxer_upstream_notif_iter->is_valid || + if (muxer_upstream_notif_iter->notifs->length > 0 || !muxer_upstream_notif_iter->notif_iter) { + BT_LOGV("Already valid or not considered: " + "queue-len=%u, upstream-notif-iter-addr=%p", + muxer_upstream_notif_iter->notifs->length, + muxer_upstream_notif_iter->notif_iter); goto end; } @@ -960,7 +971,7 @@ end: static enum bt_notification_iterator_status validate_muxer_upstream_notif_iters( - struct muxer_notif_iter *muxer_notif_iter) + struct muxer_notif_iter *muxer_notif_iter) { enum bt_notification_iterator_status status = BT_NOTIFICATION_ITERATOR_STATUS_OK; @@ -1019,16 +1030,15 @@ end: return status; } -static -struct bt_notification_iterator_next_method_return muxer_notif_iter_do_next( +static inline +enum bt_notification_iterator_status muxer_notif_iter_do_next_one( struct muxer_comp *muxer_comp, - struct muxer_notif_iter *muxer_notif_iter) + struct muxer_notif_iter *muxer_notif_iter, + struct bt_notification **notif) { + enum bt_notification_iterator_status status = + BT_NOTIFICATION_ITERATOR_STATUS_OK; struct muxer_upstream_notif_iter *muxer_upstream_notif_iter = NULL; - struct bt_notification_iterator_next_method_return next_return = { - .notification = NULL, - .status = BT_NOTIFICATION_ITERATOR_STATUS_OK, - }; int64_t next_return_ts; while (true) { @@ -1040,14 +1050,12 @@ struct bt_notification_iterator_next_method_return muxer_notif_iter_do_next( "muxer-comp-addr=%p, muxer-notif-iter-addr=%p, " "ret=%d", muxer_comp, muxer_notif_iter, ret); - next_return.status = - BT_NOTIFICATION_ITERATOR_STATUS_ERROR; + status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR; goto end; } - next_return.status = - validate_muxer_upstream_notif_iters(muxer_notif_iter); - if (next_return.status != BT_NOTIFICATION_ITERATOR_STATUS_OK) { + status = validate_muxer_upstream_notif_iters(muxer_notif_iter); + if (status != BT_NOTIFICATION_ITERATOR_STATUS_OK) { /* validate_muxer_upstream_notif_iters() logs details */ goto end; } @@ -1075,21 +1083,19 @@ struct bt_notification_iterator_next_method_return muxer_notif_iter_do_next( * amongst those, of which the current notification is the * youngest. */ - next_return.status = - muxer_notif_iter_youngest_upstream_notif_iter(muxer_comp, + status = muxer_notif_iter_youngest_upstream_notif_iter(muxer_comp, muxer_notif_iter, &muxer_upstream_notif_iter, &next_return_ts); - if (next_return.status < 0 || - next_return.status == BT_NOTIFICATION_ITERATOR_STATUS_END || - next_return.status == BT_NOTIFICATION_ITERATOR_STATUS_CANCELED) { - if (next_return.status < 0) { + if (status < 0 || status == BT_NOTIFICATION_ITERATOR_STATUS_END || + status == BT_NOTIFICATION_ITERATOR_STATUS_CANCELED) { + if (status < 0) { BT_LOGE("Cannot find the youngest upstream notification iterator wrapper: " "status=%s", - bt_notification_iterator_status_string(next_return.status)); + bt_notification_iterator_status_string(status)); } else { BT_LOGV("Cannot find the youngest upstream notification iterator wrapper: " "status=%s", - bt_notification_iterator_status_string(next_return.status)); + bt_notification_iterator_status_string(status)); } goto end; @@ -1101,7 +1107,7 @@ struct bt_notification_iterator_next_method_return muxer_notif_iter_do_next( "last-returned-ts=%" PRId64, muxer_notif_iter, next_return_ts, muxer_notif_iter->last_returned_ts_ns); - next_return.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR; + status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR; goto end; } @@ -1110,21 +1116,58 @@ struct bt_notification_iterator_next_method_return muxer_notif_iter_do_next( "muxer-upstream-notif-iter-wrap-addr=%p, " "ts=%" PRId64, muxer_notif_iter, muxer_upstream_notif_iter, next_return_ts); - BT_ASSERT(next_return.status == BT_NOTIFICATION_ITERATOR_STATUS_OK); + BT_ASSERT(status == BT_NOTIFICATION_ITERATOR_STATUS_OK); BT_ASSERT(muxer_upstream_notif_iter); - next_return.notification = bt_get(muxer_upstream_notif_iter->notif); - BT_ASSERT(next_return.notification); /* - * We invalidate the upstream notification iterator so that, the - * next time this function is called, - * validate_muxer_upstream_notif_iters() will make it valid. + * Consume from the queue's head: other side + * (muxer_upstream_notif_iter_next()) writes to the tail. */ - muxer_upstream_notif_iter->is_valid = false; + *notif = g_queue_pop_head(muxer_upstream_notif_iter->notifs); + BT_ASSERT(*notif); muxer_notif_iter->last_returned_ts_ns = next_return_ts; end: - return next_return; + return status; +} + +static +enum bt_notification_iterator_status muxer_notif_iter_do_next( + struct muxer_comp *muxer_comp, + struct muxer_notif_iter *muxer_notif_iter, + bt_notification_array notifs, uint64_t capacity, + uint64_t *count) +{ + enum bt_notification_iterator_status status = + BT_NOTIFICATION_ITERATOR_STATUS_OK; + uint64_t i = 0; + + while (i < capacity && status == BT_NOTIFICATION_ITERATOR_STATUS_OK) { + status = muxer_notif_iter_do_next_one(muxer_comp, + muxer_notif_iter, ¬ifs[i]); + if (status == BT_NOTIFICATION_ITERATOR_STATUS_OK) { + i++; + } + } + + if (i > 0) { + /* + * Even if muxer_notif_iter_do_next_one() returned + * something else than + * BT_NOTIFICATION_ITERATOR_STATUS_OK, we accumulated + * notification objects in the output notification + * array, so we need to return + * BT_NOTIFICATION_ITERATOR_STATUS_OK so that they are + * transfered to downstream. This other status occurs + * again the next time muxer_notif_iter_do_next() is + * called, possibly without any accumulated + * notification, in which case we'll return it. + */ + *count = i; + status = BT_NOTIFICATION_ITERATOR_STATUS_OK; + } + + return status; } static @@ -1338,10 +1381,12 @@ void muxer_notif_iter_finalize( } BT_HIDDEN -struct bt_notification_iterator_next_method_return muxer_notif_iter_next( - struct bt_private_connection_private_notification_iterator *priv_notif_iter) +enum bt_notification_iterator_status muxer_notif_iter_next( + struct bt_private_connection_private_notification_iterator *priv_notif_iter, + bt_notification_array notifs, uint64_t capacity, + uint64_t *count) { - struct bt_notification_iterator_next_method_return next_ret; + enum bt_notification_iterator_status status; struct muxer_notif_iter *muxer_notif_iter = bt_private_connection_private_notification_iterator_get_user_data(priv_notif_iter); struct bt_private_component *priv_comp = NULL; @@ -1353,7 +1398,6 @@ struct bt_notification_iterator_next_method_return muxer_notif_iter_next( BT_ASSERT(priv_comp); muxer_comp = bt_private_component_get_user_data(priv_comp); BT_ASSERT(muxer_comp); - BT_LOGV("Muxer component's notification iterator's \"next\" method called: " "comp-addr=%p, muxer-comp-addr=%p, muxer-notif-iter-addr=%p, " "notif-iter-addr=%p", @@ -1365,28 +1409,27 @@ struct bt_notification_iterator_next_method_return muxer_notif_iter_next( "comp-addr=%p, muxer-comp-addr=%p, muxer-notif-iter-addr=%p, " "notif-iter-addr=%p", priv_comp, muxer_comp, muxer_notif_iter, priv_notif_iter); - next_ret.notification = NULL; - next_ret.status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR; + status = BT_NOTIFICATION_ITERATOR_STATUS_ERROR; goto end; } - next_ret = muxer_notif_iter_do_next(muxer_comp, muxer_notif_iter); - if (next_ret.status < 0) { + status = muxer_notif_iter_do_next(muxer_comp, muxer_notif_iter, + notifs, capacity, count); + if (status < 0) { BT_LOGE("Cannot get next notification: " "comp-addr=%p, muxer-comp-addr=%p, muxer-notif-iter-addr=%p, " "notif-iter-addr=%p, status=%s", priv_comp, muxer_comp, muxer_notif_iter, priv_notif_iter, - bt_notification_iterator_status_string(next_ret.status)); + bt_notification_iterator_status_string(status)); } else { BT_LOGV("Returning from muxer component's notification iterator's \"next\" method: " - "status=%s, notif-addr=%p", - bt_notification_iterator_status_string(next_ret.status), - next_ret.notification); + "status=%s", + bt_notification_iterator_status_string(status)); } end: bt_put(priv_comp); - return next_ret; + return status; } BT_HIDDEN