bt_message_iterator *msg_iter;
/* Contains `const bt_message *`, owned by this */
- GQueue *msgs;
+ GPtrArray *msgs;
+
+ /* Index of the next message in `msgs` to return */
+ guint next_msg;
};
enum muxer_msg_iter_clock_class_expectation {
static
void empty_message_queue(struct muxer_upstream_msg_iter *upstream_msg_iter)
{
- const bt_message *msg;
-
- while ((msg = g_queue_pop_head(upstream_msg_iter->msgs))) {
- bt_message_put_ref(msg);
- }
+ g_ptr_array_set_size(upstream_msg_iter->msgs, 0);
}
static
muxer_comp = muxer_upstream_msg_iter->muxer_comp;
BT_COMP_LOGD("Destroying muxer's upstream message iterator wrapper: "
- "addr=%p, msg-iter-addr=%p, queue-len=%u",
+ "addr=%p, msg-iter-addr=%p, queue-len=%u, next-msg=%u",
muxer_upstream_msg_iter,
muxer_upstream_msg_iter->msg_iter,
- muxer_upstream_msg_iter->msgs->length);
+ muxer_upstream_msg_iter->msgs->len,
+ muxer_upstream_msg_iter->next_msg);
+
bt_message_iterator_put_ref(
muxer_upstream_msg_iter->msg_iter);
if (muxer_upstream_msg_iter->msgs) {
- empty_message_queue(muxer_upstream_msg_iter);
- g_queue_free(muxer_upstream_msg_iter->msgs);
+ g_ptr_array_free(muxer_upstream_msg_iter->msgs, TRUE);
}
g_free(muxer_upstream_msg_iter);
muxer_upstream_msg_iter->muxer_comp = muxer_comp;
muxer_upstream_msg_iter->msg_iter = self_msg_iter;
bt_message_iterator_get_ref(muxer_upstream_msg_iter->msg_iter);
- muxer_upstream_msg_iter->msgs = g_queue_new();
+ muxer_upstream_msg_iter->msgs =
+ g_ptr_array_new_with_free_func((GDestroyNotify) bt_message_put_ref);
if (!muxer_upstream_msg_iter->msgs) {
BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp,
- "Failed to allocate a GQueue.");
+ "Failed to allocate a GPtrArray.");
goto error;
}
BT_COMP_LOGD_STR("Validated upstream message iterator wrapper.");
BT_ASSERT_DBG(count > 0);
+ g_ptr_array_set_size(muxer_upstream_msg_iter->msgs, count);
+ muxer_upstream_msg_iter->next_msg = 0;
+
/* Move messages to our queue */
for (i = 0; i < count; i++) {
/*
* (muxer_msg_iter_do_next_one()) consumes
* from the head first.
*/
- g_queue_push_tail(muxer_upstream_msg_iter->msgs,
- (void *) msgs[i]);
+ g_ptr_array_index(muxer_upstream_msg_iter->msgs, i)
+ = (gpointer *) msgs[i];
}
status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
break;
continue;
}
- BT_ASSERT_DBG(cur_muxer_upstream_msg_iter->msgs->length > 0);
- msg = g_queue_peek_head(cur_muxer_upstream_msg_iter->msgs);
+ BT_ASSERT_DBG(cur_muxer_upstream_msg_iter->next_msg <
+ cur_muxer_upstream_msg_iter->msgs->len);
+ msg = g_ptr_array_index(cur_muxer_upstream_msg_iter->msgs,
+ cur_muxer_upstream_msg_iter->next_msg);
BT_ASSERT_DBG(msg);
if (G_UNLIKELY(bt_message_get_type(msg) ==
* current candidate message. We must break the tie
* in a predictable manner.
*/
- const bt_message *selected_msg = g_queue_peek_head(
- (*muxer_upstream_msg_iter)->msgs);
+ BT_ASSERT_DBG((*muxer_upstream_msg_iter)->next_msg <
+ (*muxer_upstream_msg_iter)->msgs->len);
+ const bt_message *selected_msg =
+ g_ptr_array_index((*muxer_upstream_msg_iter)->msgs,
+ (*muxer_upstream_msg_iter)->next_msg);
BT_COMP_LOGD_STR("Two of the next message candidates have the same timestamps, pick one deterministically.");
/*
"muxer-upstream-msg-iter-wrap-addr=%p",
muxer_upstream_msg_iter);
- if (muxer_upstream_msg_iter->msgs->length > 0 ||
+ if (muxer_upstream_msg_iter->next_msg < muxer_upstream_msg_iter->msgs->len ||
!muxer_upstream_msg_iter->msg_iter) {
BT_COMP_LOGD("Already valid or not considered: "
- "queue-len=%u, upstream-msg-iter-addr=%p",
- muxer_upstream_msg_iter->msgs->length,
+ "queue-len=%u, next-msg=%u, upstream-msg-iter-addr=%p",
+ muxer_upstream_msg_iter->msgs->len,
+ muxer_upstream_msg_iter->next_msg,
muxer_upstream_msg_iter->msg_iter);
status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
goto end;
* Consume from the queue's head: other side
* (muxer_upstream_msg_iter_next()) writes to the tail.
*/
- *msg = g_queue_pop_head(muxer_upstream_msg_iter->msgs);
+ *msg = g_ptr_array_index(muxer_upstream_msg_iter->msgs,
+ muxer_upstream_msg_iter->next_msg);
+ g_ptr_array_index(muxer_upstream_msg_iter->msgs,
+ muxer_upstream_msg_iter->next_msg) = NULL;
+ ++muxer_upstream_msg_iter->next_msg;
BT_ASSERT_DBG(*msg);
muxer_msg_iter->last_returned_ts_ns = next_return_ts;