projects
/
babeltrace.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
flt.utils.muxer: replace queue with array
[babeltrace.git]
/
src
/
plugins
/
utils
/
muxer
/
muxer.c
diff --git
a/src/plugins/utils/muxer/muxer.c
b/src/plugins/utils/muxer/muxer.c
index f340f38d227a77592b7fe30474a0be0a0b73a928..fa049cf0dd939b65272dc1d02dc1ab1f82bfdb42 100644
(file)
--- a/
src/plugins/utils/muxer/muxer.c
+++ b/
src/plugins/utils/muxer/muxer.c
@@
-43,7
+43,10
@@
struct muxer_upstream_msg_iter {
bt_message_iterator *msg_iter;
/* Contains `const bt_message *`, owned by this */
bt_message_iterator *msg_iter;
/* Contains `const bt_message *`, owned by this */
- GQueue *msgs;
+ GPtrArray *msgs;
+
+ /* Index of the next message in `msgs` to return */
+ guint next_msg;
};
enum muxer_msg_iter_clock_class_expectation {
};
enum muxer_msg_iter_clock_class_expectation {
@@
-104,11
+107,7
@@
struct muxer_msg_iter {
static
void empty_message_queue(struct muxer_upstream_msg_iter *upstream_msg_iter)
{
static
void empty_message_queue(struct muxer_upstream_msg_iter *upstream_msg_iter)
{
- const bt_message *msg;
-
- while ((msg = g_queue_pop_head(upstream_msg_iter->msgs))) {
- bt_message_put_ref(msg);
- }
+ g_ptr_array_set_size(upstream_msg_iter->msgs, 0);
}
static
}
static
@@
-123,16
+122,17
@@
void destroy_muxer_upstream_msg_iter(
muxer_comp = muxer_upstream_msg_iter->muxer_comp;
BT_COMP_LOGD("Destroying muxer's upstream message iterator wrapper: "
muxer_comp = muxer_upstream_msg_iter->muxer_comp;
BT_COMP_LOGD("Destroying muxer's upstream message iterator wrapper: "
- "addr=%p, msg-iter-addr=%p, queue-len=%u",
+ "addr=%p, msg-iter-addr=%p, queue-len=%u
, next-msg=%u
",
muxer_upstream_msg_iter,
muxer_upstream_msg_iter->msg_iter,
muxer_upstream_msg_iter,
muxer_upstream_msg_iter->msg_iter,
- muxer_upstream_msg_iter->msgs->length);
+ muxer_upstream_msg_iter->msgs->len,
+ muxer_upstream_msg_iter->next_msg);
+
bt_message_iterator_put_ref(
muxer_upstream_msg_iter->msg_iter);
if (muxer_upstream_msg_iter->msgs) {
bt_message_iterator_put_ref(
muxer_upstream_msg_iter->msg_iter);
if (muxer_upstream_msg_iter->msgs) {
- empty_message_queue(muxer_upstream_msg_iter);
- g_queue_free(muxer_upstream_msg_iter->msgs);
+ g_ptr_array_free(muxer_upstream_msg_iter->msgs, TRUE);
}
g_free(muxer_upstream_msg_iter);
}
g_free(muxer_upstream_msg_iter);
@@
-156,10
+156,11
@@
int muxer_msg_iter_add_upstream_msg_iter(struct muxer_msg_iter *muxer_msg_iter,
muxer_upstream_msg_iter->muxer_comp = muxer_comp;
muxer_upstream_msg_iter->msg_iter = self_msg_iter;
bt_message_iterator_get_ref(muxer_upstream_msg_iter->msg_iter);
muxer_upstream_msg_iter->muxer_comp = muxer_comp;
muxer_upstream_msg_iter->msg_iter = self_msg_iter;
bt_message_iterator_get_ref(muxer_upstream_msg_iter->msg_iter);
- muxer_upstream_msg_iter->msgs = g_queue_new();
+ muxer_upstream_msg_iter->msgs =
+ g_ptr_array_new_with_free_func((GDestroyNotify) bt_message_put_ref);
if (!muxer_upstream_msg_iter->msgs) {
BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp,
if (!muxer_upstream_msg_iter->msgs) {
BT_COMP_LOGE_APPEND_CAUSE(muxer_comp->self_comp,
- "Failed to allocate a G
Queue
.");
+ "Failed to allocate a G
PtrArray
.");
goto error;
}
goto error;
}
@@
-411,6
+412,9
@@
bt_message_iterator_class_next_method_status muxer_upstream_msg_iter_next(
BT_COMP_LOGD_STR("Validated upstream message iterator wrapper.");
BT_ASSERT_DBG(count > 0);
BT_COMP_LOGD_STR("Validated upstream message iterator wrapper.");
BT_ASSERT_DBG(count > 0);
+ g_ptr_array_set_size(muxer_upstream_msg_iter->msgs, count);
+ muxer_upstream_msg_iter->next_msg = 0;
+
/* Move messages to our queue */
for (i = 0; i < count; i++) {
/*
/* Move messages to our queue */
for (i = 0; i < count; i++) {
/*
@@
-418,8
+422,8
@@
bt_message_iterator_class_next_method_status muxer_upstream_msg_iter_next(
* (muxer_msg_iter_do_next_one()) consumes
* from the head first.
*/
* (muxer_msg_iter_do_next_one()) consumes
* from the head first.
*/
- g_
queue_push_tail(muxer_upstream_msg_iter->msgs,
-
(void *) msgs[i])
;
+ g_
ptr_array_index(muxer_upstream_msg_iter->msgs, i)
+
= (gpointer *) msgs[i]
;
}
status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
break;
}
status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
break;
@@
-833,8
+837,10
@@
muxer_msg_iter_youngest_upstream_msg_iter(
continue;
}
continue;
}
- BT_ASSERT_DBG(cur_muxer_upstream_msg_iter->msgs->length > 0);
- msg = g_queue_peek_head(cur_muxer_upstream_msg_iter->msgs);
+ BT_ASSERT_DBG(cur_muxer_upstream_msg_iter->next_msg <
+ cur_muxer_upstream_msg_iter->msgs->len);
+ msg = g_ptr_array_index(cur_muxer_upstream_msg_iter->msgs,
+ cur_muxer_upstream_msg_iter->next_msg);
BT_ASSERT_DBG(msg);
if (G_UNLIKELY(bt_message_get_type(msg) ==
BT_ASSERT_DBG(msg);
if (G_UNLIKELY(bt_message_get_type(msg) ==
@@
-893,8
+899,11
@@
muxer_msg_iter_youngest_upstream_msg_iter(
* current candidate message. We must break the tie
* in a predictable manner.
*/
* current candidate message. We must break the tie
* in a predictable manner.
*/
- const bt_message *selected_msg = g_queue_peek_head(
- (*muxer_upstream_msg_iter)->msgs);
+ BT_ASSERT_DBG((*muxer_upstream_msg_iter)->next_msg <
+ (*muxer_upstream_msg_iter)->msgs->len);
+ const bt_message *selected_msg =
+ g_ptr_array_index((*muxer_upstream_msg_iter)->msgs,
+ (*muxer_upstream_msg_iter)->next_msg);
BT_COMP_LOGD_STR("Two of the next message candidates have the same timestamps, pick one deterministically.");
/*
BT_COMP_LOGD_STR("Two of the next message candidates have the same timestamps, pick one deterministically.");
/*
@@
-944,11
+953,12
@@
validate_muxer_upstream_msg_iter(
"muxer-upstream-msg-iter-wrap-addr=%p",
muxer_upstream_msg_iter);
"muxer-upstream-msg-iter-wrap-addr=%p",
muxer_upstream_msg_iter);
- if (muxer_upstream_msg_iter->
msgs->length > 0
||
+ if (muxer_upstream_msg_iter->
next_msg < muxer_upstream_msg_iter->msgs->len
||
!muxer_upstream_msg_iter->msg_iter) {
BT_COMP_LOGD("Already valid or not considered: "
!muxer_upstream_msg_iter->msg_iter) {
BT_COMP_LOGD("Already valid or not considered: "
- "queue-len=%u, upstream-msg-iter-addr=%p",
- muxer_upstream_msg_iter->msgs->length,
+ "queue-len=%u, next-msg=%u, upstream-msg-iter-addr=%p",
+ muxer_upstream_msg_iter->msgs->len,
+ muxer_upstream_msg_iter->next_msg,
muxer_upstream_msg_iter->msg_iter);
status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
goto end;
muxer_upstream_msg_iter->msg_iter);
status = BT_MESSAGE_ITERATOR_CLASS_NEXT_METHOD_STATUS_OK;
goto end;
@@
-1099,7
+1109,11
@@
bt_message_iterator_class_next_method_status muxer_msg_iter_do_next_one(
* Consume from the queue's head: other side
* (muxer_upstream_msg_iter_next()) writes to the tail.
*/
* Consume from the queue's head: other side
* (muxer_upstream_msg_iter_next()) writes to the tail.
*/
- *msg = g_queue_pop_head(muxer_upstream_msg_iter->msgs);
+ *msg = g_ptr_array_index(muxer_upstream_msg_iter->msgs,
+ muxer_upstream_msg_iter->next_msg);
+ g_ptr_array_index(muxer_upstream_msg_iter->msgs,
+ muxer_upstream_msg_iter->next_msg) = NULL;
+ ++muxer_upstream_msg_iter->next_msg;
BT_ASSERT_DBG(*msg);
muxer_msg_iter->last_returned_ts_ns = next_return_ts;
BT_ASSERT_DBG(*msg);
muxer_msg_iter->last_returned_ts_ns = next_return_ts;
This page took
0.027672 seconds
and
4
git commands to generate.