X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=lib%2Fgraph%2Fiterator.c;h=6a969826cc4da08a253d023360e01ead5851a99c;hb=2cf86e2068e546ad060b45612d1cdf28178a7ace;hp=a7c9ea17ea9ca0cb2017879ee6da1b79391edeaf;hpb=40bf6fd0de34c06399c8272ba703b43ea042d20a;p=babeltrace.git diff --git a/lib/graph/iterator.c b/lib/graph/iterator.c index a7c9ea17..6a969826 100644 --- a/lib/graph/iterator.c +++ b/lib/graph/iterator.c @@ -25,6 +25,8 @@ #include #include +#include +#include #include #include #include @@ -81,6 +83,7 @@ (_iter)->state == BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_LAST_SEEKING_RETURNED_ERROR, \ "Message iterator is in the wrong state: %!+i", _iter) +BT_ASSERT_PRE_FUNC static inline void _set_self_comp_port_input_msg_iterator_state( struct bt_self_component_port_input_message_iterator *iterator, @@ -95,7 +98,7 @@ void _set_self_comp_port_input_msg_iterator_state( #ifdef BT_DEV_MODE # define set_self_comp_port_input_msg_iterator_state _set_self_comp_port_input_msg_iterator_state #else -# define set_self_comp_port_input_msg_iterator_state(_a, _b) +# define set_self_comp_port_input_msg_iterator_state(_a, _b) ((void) _a); ((void) _b); #endif static @@ -148,17 +151,12 @@ void bt_self_component_port_input_message_iterator_destroy(struct bt_object *obj } if (iterator->auto_seek_msgs) { - uint64_t i; - - /* Put any remaining message in the auto-seek array */ - for (i = 0; i < iterator->auto_seek_msgs->len; i++) { - if (iterator->auto_seek_msgs->pdata[i]) { - bt_object_put_no_null_check( - iterator->auto_seek_msgs->pdata[i]); - } + while (!g_queue_is_empty(iterator->auto_seek_msgs)) { + bt_object_put_no_null_check( + g_queue_pop_tail(iterator->auto_seek_msgs)); } - g_ptr_array_free(iterator->auto_seek_msgs, TRUE); + g_queue_free(iterator->auto_seek_msgs); iterator->auto_seek_msgs = NULL; } @@ -324,14 +322,13 @@ bt_self_component_port_input_message_iterator_create_initial( goto end; } - iterator->auto_seek_msgs = g_ptr_array_new(); + iterator->auto_seek_msgs = g_queue_new(); if (!iterator->auto_seek_msgs) { - BT_LOGE_STR("Failed to allocate a GPtrArray."); + BT_LOGE_STR("Failed to allocate a GQueue."); ret = -1; goto end; } - g_ptr_array_set_size(iterator->auto_seek_msgs, MSG_BATCH_SIZE); iterator->upstream_component = upstream_comp; iterator->upstream_port = upstream_port; iterator->connection = iterator->upstream_port->connection; @@ -530,39 +527,6 @@ void bt_self_message_iterator_set_data( "%!+i, user-data-addr=%p", iterator, data); } -BT_ASSERT_PRE_FUNC -static inline -void bt_message_borrow_packet_stream(const struct bt_message *msg, - const struct bt_stream **stream, - const struct bt_packet **packet) -{ - BT_ASSERT(msg); - - switch (msg->type) { - case BT_MESSAGE_TYPE_EVENT: - *packet = bt_event_borrow_packet_const( - bt_message_event_borrow_event_const(msg)); - *stream = bt_packet_borrow_stream_const(*packet); - break; - case BT_MESSAGE_TYPE_STREAM_BEGINNING: - *stream = bt_message_stream_beginning_borrow_stream_const(msg); - break; - case BT_MESSAGE_TYPE_STREAM_END: - *stream = bt_message_stream_end_borrow_stream_const(msg); - break; - case BT_MESSAGE_TYPE_PACKET_BEGINNING: - *packet = bt_message_packet_beginning_borrow_packet_const(msg); - *stream = bt_packet_borrow_stream_const(*packet); - break; - case BT_MESSAGE_TYPE_PACKET_END: - *packet = bt_message_packet_end_borrow_packet_const(msg); - *stream = bt_packet_borrow_stream_const(*packet); - break; - default: - break; - } -} - enum bt_message_iterator_status bt_self_component_port_input_message_iterator_next( struct bt_self_component_port_input_message_iterator *iterator, @@ -754,6 +718,8 @@ bt_port_output_message_iterator_create(struct bt_graph *graph, (void *) graph, "Output port is not part of graph: %![graph-]+g, %![port-]+p", graph, output_port); + BT_ASSERT_PRE(!graph->has_sink, + "Graph already has a sink component: %![graph-]+g"); /* Create message iterator */ BT_LIB_LOGD("Creating message iterator on output port: " @@ -916,6 +882,7 @@ bt_bool bt_self_component_port_input_message_iterator_can_seek_beginning( return can; } +BT_ASSERT_PRE_FUNC static inline void _set_iterator_state_after_seeking( struct bt_self_component_port_input_message_iterator *iterator, @@ -948,7 +915,7 @@ void _set_iterator_state_after_seeking( #ifdef BT_DEV_MODE # define set_iterator_state_after_seeking _set_iterator_state_after_seeking #else -# define set_iterator_state_after_seeking(_iter, _status) +# define set_iterator_state_after_seeking(_iter, _status) ((void) _iter); ((void) _status); #endif enum bt_message_iterator_status @@ -979,17 +946,24 @@ bt_self_component_port_input_message_iterator_seek_beginning( status == BT_MESSAGE_ITERATOR_STATUS_NOMEM || status == BT_MESSAGE_ITERATOR_STATUS_AGAIN, "Unexpected status: %![iter-]+i, status=%s", - iterator, bt_self_message_iterator_status_string(status)); + iterator, bt_common_self_message_iterator_status_string(status)); set_iterator_state_after_seeking(iterator, status); return status; } static inline -int get_message_ns_from_origin(const struct bt_message *msg, - int64_t *ns_from_origin, bool *ignore) +enum bt_message_iterator_status auto_seek_handle_message( + struct bt_self_component_port_input_message_iterator *iterator, + int64_t ns_from_origin, const struct bt_message *msg, + bool *got_first) { + enum bt_message_iterator_status status = BT_MESSAGE_ITERATOR_STATUS_OK; + int64_t msg_ns_from_origin; const struct bt_clock_snapshot *clk_snapshot = NULL; - int ret = 0; + int ret; + + BT_ASSERT(msg); + BT_ASSERT(got_first); switch (msg->type) { case BT_MESSAGE_TYPE_EVENT: @@ -1027,14 +1001,68 @@ int get_message_ns_from_origin(const struct bt_message *msg, case BT_MESSAGE_TYPE_DISCARDED_EVENTS: case BT_MESSAGE_TYPE_DISCARDED_PACKETS: { - const struct bt_message_discarded_items *disc_items_msg = - (const void *) msg; + struct bt_message_discarded_items *msg_disc_items = + (void *) msg; + + BT_ASSERT_PRE(msg_disc_items->default_begin_cs && + msg_disc_items->default_end_cs, + "Discarded events/packets message has no default clock snapshots: %!+n", + msg_disc_items); + ret = bt_clock_snapshot_get_ns_from_origin( + msg_disc_items->default_begin_cs, + &msg_ns_from_origin); + if (ret) { + status = BT_MESSAGE_ITERATOR_STATUS_ERROR; + goto end; + } - clk_snapshot = disc_items_msg->default_begin_cs; - BT_ASSERT_PRE(clk_snapshot, - "Discarded events/packets message has no default clock snapshot: %!+n", - msg); - break; + if (msg_ns_from_origin >= ns_from_origin) { + *got_first = true; + goto push_msg; + } + + ret = bt_clock_snapshot_get_ns_from_origin( + msg_disc_items->default_end_cs, + &msg_ns_from_origin); + if (ret) { + status = BT_MESSAGE_ITERATOR_STATUS_ERROR; + goto end; + } + + if (msg_ns_from_origin >= ns_from_origin) { + /* + * The discarded items message's beginning time + * is before the requested seeking time, but its + * end time is after. Modify the message so as + * to set its beginning time to the requested + * seeking time, and make its item count unknown + * as we don't know if items were really + * discarded within the new time range. + */ + uint64_t new_begin_raw_value; + + ret = bt_clock_class_clock_value_from_ns_from_origin( + msg_disc_items->default_end_cs->clock_class, + ns_from_origin, &new_begin_raw_value); + if (ret) { + status = BT_MESSAGE_ITERATOR_STATUS_ERROR; + goto end; + } + + bt_clock_snapshot_set_raw_value( + msg_disc_items->default_begin_cs, + new_begin_raw_value); + msg_disc_items->count.base.avail = + BT_PROPERTY_AVAILABILITY_NOT_AVAILABLE; + + /* + * It is safe to push it because its beginning + * time is exactly the requested seeking time. + */ + goto push_msg; + } else { + goto skip_msg; + } } case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING: { @@ -1049,7 +1077,7 @@ int get_message_ns_from_origin(const struct bt_message *msg, * and we can't assume any specific time for an * unknown clock snapshot, so skip this. */ - goto set_ignore; + goto skip_msg; case BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN: clk_snapshot = stream_act_msg->default_cs; BT_ASSERT(clk_snapshot); @@ -1071,14 +1099,14 @@ int get_message_ns_from_origin(const struct bt_message *msg, * We can't assume any specific time for an * unknown clock snapshot, so skip this. */ - goto set_ignore; + goto skip_msg; case BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_INFINITE: /* * +inf is always greater than any requested * time. */ - *ns_from_origin = INT64_MAX; - goto end; + *got_first = true; + goto push_msg; case BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN: clk_snapshot = stream_act_msg->default_cs; BT_ASSERT(clk_snapshot); @@ -1092,22 +1120,36 @@ int get_message_ns_from_origin(const struct bt_message *msg, case BT_MESSAGE_TYPE_STREAM_BEGINNING: case BT_MESSAGE_TYPE_STREAM_END: /* Ignore */ - break; + goto skip_msg; default: abort(); } -set_ignore: - if (!clk_snapshot) { - *ignore = true; + BT_ASSERT(clk_snapshot); + ret = bt_clock_snapshot_get_ns_from_origin(clk_snapshot, + &msg_ns_from_origin); + if (ret) { + status = BT_MESSAGE_ITERATOR_STATUS_ERROR; goto end; } - ret = bt_clock_snapshot_get_ns_from_origin(clk_snapshot, - ns_from_origin); + if (msg_ns_from_origin >= ns_from_origin) { + *got_first = true; + goto push_msg; + } + +skip_msg: + bt_object_put_no_null_check(msg); + msg = NULL; + goto end; + +push_msg: + g_queue_push_head(iterator->auto_seek_msgs, (void *) msg); + msg = NULL; end: - return ret; + BT_ASSERT(!msg || status != BT_MESSAGE_ITERATOR_STATUS_OK); + return status; } static @@ -1121,6 +1163,7 @@ enum bt_message_iterator_status find_message_ge_ns_from_origin( const struct bt_message *messages[MSG_BATCH_SIZE]; uint64_t user_count = 0; uint64_t i; + bool got_first = false; BT_ASSERT(iterator); memset(&messages[0], 0, sizeof(messages[0]) * MSG_BATCH_SIZE); @@ -1134,7 +1177,7 @@ enum bt_message_iterator_status find_message_ge_ns_from_origin( BT_ASSERT(iterator->methods.next); - while (true) { + while (!got_first) { /* * Call the user's "next" method to get the next * messages and status. @@ -1170,60 +1213,22 @@ enum bt_message_iterator_status find_message_ge_ns_from_origin( abort(); } - /* - * Find first message which has a default clock snapshot - * that is greater than or equal to the requested value. - * - * For event and message iterator inactivity messages, compare with the - * default clock snapshot. - * - * For packet beginning messages, compare with the - * default beginning clock snapshot, if any. - * - * For packet end messages, compare with the default end - * clock snapshot, if any. - * - * For stream beginning, stream end, ignore. - */ for (i = 0; i < user_count; i++) { - const struct bt_message *msg = messages[i]; - int64_t msg_ns_from_origin; - bool ignore = false; - int ret; - - BT_ASSERT(msg); - ret = get_message_ns_from_origin(msg, &msg_ns_from_origin, - &ignore); - if (ret) { - status = BT_MESSAGE_ITERATOR_STATUS_ERROR; - goto end; - } - - if (ignore) { - /* Skip message without a clock snapshot */ + if (got_first) { + g_queue_push_head(iterator->auto_seek_msgs, + (void *) messages[i]); + messages[i] = NULL; continue; } - if (msg_ns_from_origin >= ns_from_origin) { - /* - * We found it: move this message and - * the following ones to the iterator's - * auto-seek message array. - */ - uint64_t j; - - for (j = i; j < user_count; j++) { - iterator->auto_seek_msgs->pdata[j - i] = - (void *) messages[j]; - messages[j] = NULL; - } - - iterator->auto_seek_msg_count = user_count - i; + status = auto_seek_handle_message(iterator, + ns_from_origin, messages[i], &got_first); + if (status == BT_MESSAGE_ITERATOR_STATUS_OK) { + /* Message was either pushed or moved */ + messages[i] = NULL; + } else { goto end; } - - bt_object_put_no_null_check(msg); - messages[i] = NULL; } } @@ -1244,44 +1249,47 @@ enum bt_self_message_iterator_status post_auto_seek_next( bt_message_array_const msgs, uint64_t capacity, uint64_t *count) { - BT_ASSERT(iterator->auto_seek_msg_count <= capacity); - BT_ASSERT(iterator->auto_seek_msg_count > 0); + BT_ASSERT(!g_queue_is_empty(iterator->auto_seek_msgs)); + *count = 0; /* * Move auto-seek messages to the output array (which is this - * iterator's base message array. + * iterator's base message array). */ - memcpy(&msgs[0], &iterator->auto_seek_msgs->pdata[0], - sizeof(msgs[0]) * iterator->auto_seek_msg_count); - memset(&iterator->auto_seek_msgs->pdata[0], 0, - sizeof(iterator->auto_seek_msgs->pdata[0]) * - iterator->auto_seek_msg_count); - *count = iterator->auto_seek_msg_count; - - /* Restore real user's "next" method */ - switch (iterator->upstream_component->class->type) { - case BT_COMPONENT_CLASS_TYPE_SOURCE: - { - struct bt_component_class_source *src_comp_cls = - (void *) iterator->upstream_component->class; - - iterator->methods.next = - (bt_self_component_port_input_message_iterator_next_method) - src_comp_cls->methods.msg_iter_next; - break; + while (capacity > 0 && !g_queue_is_empty(iterator->auto_seek_msgs)) { + msgs[*count] = g_queue_pop_tail(iterator->auto_seek_msgs); + capacity--; + (*count)++; } - case BT_COMPONENT_CLASS_TYPE_FILTER: - { - struct bt_component_class_filter *flt_comp_cls = - (void *) iterator->upstream_component->class; - iterator->methods.next = - (bt_self_component_port_input_message_iterator_next_method) - flt_comp_cls->methods.msg_iter_next; - break; - } - default: - abort(); + BT_ASSERT(*count > 0); + + if (g_queue_is_empty(iterator->auto_seek_msgs)) { + /* No more auto-seek messages */ + switch (iterator->upstream_component->class->type) { + case BT_COMPONENT_CLASS_TYPE_SOURCE: + { + struct bt_component_class_source *src_comp_cls = + (void *) iterator->upstream_component->class; + + iterator->methods.next = + (bt_self_component_port_input_message_iterator_next_method) + src_comp_cls->methods.msg_iter_next; + break; + } + case BT_COMPONENT_CLASS_TYPE_FILTER: + { + struct bt_component_class_filter *flt_comp_cls = + (void *) iterator->upstream_component->class; + + iterator->methods.next = + (bt_self_component_port_input_message_iterator_next_method) + flt_comp_cls->methods.msg_iter_next; + break; + } + default: + abort(); + } } return BT_SELF_MESSAGE_ITERATOR_STATUS_OK; @@ -1322,7 +1330,7 @@ bt_self_component_port_input_message_iterator_seek_ns_from_origin( status == BT_MESSAGE_ITERATOR_STATUS_AGAIN, "Unexpected status: %![iter-]+i, status=%s", iterator, - bt_self_message_iterator_status_string(status)); + bt_common_self_message_iterator_status_string(status)); } else { /* Start automatic seeking: seek beginning first */ BT_ASSERT(iterator->methods.can_seek_beginning(iterator)); @@ -1338,7 +1346,7 @@ bt_self_component_port_input_message_iterator_seek_ns_from_origin( status == BT_MESSAGE_ITERATOR_STATUS_AGAIN, "Unexpected status: %![iter-]+i, status=%s", iterator, - bt_self_message_iterator_status_string(status)); + bt_common_self_message_iterator_status_string(status)); switch (status) { case BT_MESSAGE_ITERATOR_STATUS_OK: break; @@ -1353,36 +1361,45 @@ bt_self_component_port_input_message_iterator_seek_ns_from_origin( /* * Find the first message which has a default clock * snapshot greater than or equal to the requested - * nanoseconds from origin, and move the received - * messages from this point in the batch to this - * iterator's auto-seek message array. + * seeking time, and move the received messages from + * this point in the batch to this iterator's auto-seek + * message queue. */ + while (!g_queue_is_empty(iterator->auto_seek_msgs)) { + bt_object_put_no_null_check( + g_queue_pop_tail(iterator->auto_seek_msgs)); + } + status = find_message_ge_ns_from_origin(iterator, ns_from_origin); switch (status) { case BT_MESSAGE_ITERATOR_STATUS_OK: + case BT_MESSAGE_ITERATOR_STATUS_END: /* - * Replace the user's "next" method with a - * custom, temporary "next" method which returns - * the messages in the iterator's message array. + * If there are messages in the auto-seek + * message queue, replace the user's "next" + * method with a custom, temporary "next" method + * which returns them. */ - iterator->methods.next = - (bt_self_component_port_input_message_iterator_next_method) - post_auto_seek_next; + if (!g_queue_is_empty(iterator->auto_seek_msgs)) { + iterator->methods.next = + (bt_self_component_port_input_message_iterator_next_method) + post_auto_seek_next; + } + + /* + * `BT_MESSAGE_ITERATOR_STATUS_END` becomes + * `BT_MESSAGE_ITERATOR_STATUS_OK`: the next + * time this iterator's "next" method is called, + * it will return + * `BT_MESSAGE_ITERATOR_STATUS_END`. + */ + status = BT_MESSAGE_ITERATOR_STATUS_OK; break; case BT_MESSAGE_ITERATOR_STATUS_ERROR: case BT_MESSAGE_ITERATOR_STATUS_NOMEM: case BT_MESSAGE_ITERATOR_STATUS_AGAIN: goto end; - case BT_MESSAGE_ITERATOR_STATUS_END: - /* - * The iterator reached the end: just return - * `BT_MESSAGE_ITERATOR_STATUS_OK` here, as if - * the seeking operation occured: the next - * "next" method will return - * `BT_MESSAGE_ITERATOR_STATUS_END` itself. - */ - break; default: abort(); } @@ -1390,11 +1407,6 @@ bt_self_component_port_input_message_iterator_seek_ns_from_origin( end: set_iterator_state_after_seeking(iterator, status); - - if (status == BT_MESSAGE_ITERATOR_STATUS_END) { - status = BT_MESSAGE_ITERATOR_STATUS_OK; - } - return status; }