From 5b9e151d6a8c0327e67bca1706ef16525d1d319d Mon Sep 17 00:00:00 2001 From: Philippe Proulx Date: Wed, 27 Feb 2019 14:16:51 -0500 Subject: [PATCH] lib: iterator auto-seeking: handle intersecting discarded items messages This patch makes the message iterator auto-seeking algorithm handle discarded events and packets messages of which the seeking time intersects the message's time range, for example: discarded events msg v | ===*=========*=*===|==*== * * * ^ event msg | ^ seeking time In this scenario, we obviously don't want to keep the three event messages which occur before the seeking time, but we want to keep a part of the discarded events message because discarding it entirely (as it was done before this patch) leads to information loss, i.e.: | | * * * * | Instead, when this happens, we set the discarded events/packets message's beginning time to the seeking time, make its count unavailable, and we keep the message: | |==*== * * * | The message iterator's internal auto-seeking message array is replaced with a message queue as there can be more messages in this queue than the iterator's message array capacity now. Because of this, the post-seeking temporary "next" method fills the output message array as long as messages exist in the auto-seeking message queue. When the message queue is finally empty, it resets the iterator's "next" method to the original user method. Because setting a discarded items message's beginning time to a seeking time (in nanoseconds from origin) requires a conversion from nanoseconds from origin to a raw clock value, the internal bt_clock_class_clock_value_from_ns_from_origin() utility is added. This function can fail if there's an overflow in arithmetic operations. Signed-off-by: Philippe Proulx --- include/babeltrace/babeltrace-internal.h | 33 ++ .../graph/message-iterator-internal.h | 3 +- .../trace-ir/clock-class-internal.h | 92 ++++++ lib/graph/iterator.c | 312 ++++++++++-------- 4 files changed, 302 insertions(+), 138 deletions(-) diff --git a/include/babeltrace/babeltrace-internal.h b/include/babeltrace/babeltrace-internal.h index d58b9860..981f84b1 100644 --- a/include/babeltrace/babeltrace-internal.h +++ b/include/babeltrace/babeltrace-internal.h @@ -26,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -61,6 +62,38 @@ ((type) (a) > (type) (b) ? (type) (a) : (type) (b)) #endif +static inline +bool bt_safe_to_mul_int64(int64_t a, int64_t b) +{ + if (a == 0 || b == 0) { + return true; + } + + return a < INT64_MAX / b; +} + +static inline +bool bt_safe_to_mul_uint64(uint64_t a, uint64_t b) +{ + if (a == 0 || b == 0) { + return true; + } + + return a < UINT64_MAX / b; +} + +static inline +bool bt_safe_to_add_int64(int64_t a, int64_t b) +{ + return a <= INT64_MAX - b; +} + +static inline +bool bt_safe_to_add_uint64(uint64_t a, uint64_t b) +{ + return a <= UINT64_MAX - b; +} + /* * Memory allocation zeroed */ diff --git a/include/babeltrace/graph/message-iterator-internal.h b/include/babeltrace/graph/message-iterator-internal.h index 3a57f5da..00ce436e 100644 --- a/include/babeltrace/graph/message-iterator-internal.h +++ b/include/babeltrace/graph/message-iterator-internal.h @@ -112,8 +112,7 @@ struct bt_self_component_port_input_message_iterator { } methods; enum bt_self_component_port_input_message_iterator_state state; - uint64_t auto_seek_msg_count; - GPtrArray *auto_seek_msgs; + GQueue *auto_seek_msgs; void *user_data; }; diff --git a/include/babeltrace/trace-ir/clock-class-internal.h b/include/babeltrace/trace-ir/clock-class-internal.h index d91d549f..80147a63 100644 --- a/include/babeltrace/trace-ir/clock-class-internal.h +++ b/include/babeltrace/trace-ir/clock-class-internal.h @@ -31,10 +31,14 @@ #include #include #include +#include #include #include #include +#define NS_PER_S_I INT64_C(1000000000) +#define NS_PER_S_U UINT64_C(1000000000) + struct bt_clock_class { struct bt_object base; @@ -102,4 +106,92 @@ void _bt_clock_class_freeze(const struct bt_clock_class *clock_class); BT_HIDDEN bt_bool bt_clock_class_is_valid(struct bt_clock_class *clock_class); +static inline +int bt_clock_class_clock_value_from_ns_from_origin( + struct bt_clock_class *cc, int64_t ns_from_origin, + uint64_t *raw_value) +{ + int ret = 0; + int64_t offset_in_ns; + uint64_t value_in_ns; + uint64_t rem_value_in_ns; + uint64_t value_periods; + uint64_t value_period_cycles; + int64_t ns_to_add; + + BT_ASSERT(cc); + BT_ASSERT(raw_value); + + /* Compute offset part of requested value, in nanoseconds */ + if (!bt_safe_to_mul_int64(cc->offset_seconds, NS_PER_S_I)) { + ret = -1; + goto end; + } + + offset_in_ns = cc->offset_seconds * NS_PER_S_I; + + if (cc->frequency == NS_PER_S_U) { + ns_to_add = (int64_t) cc->offset_cycles; + } else { + if (!bt_safe_to_mul_int64((int64_t) cc->offset_cycles, + NS_PER_S_I)) { + ret = -1; + goto end; + } + + ns_to_add = ((int64_t) cc->offset_cycles * NS_PER_S_I) / + (int64_t) cc->frequency; + } + + if (!bt_safe_to_add_int64(offset_in_ns, ns_to_add)) { + ret = -1; + goto end; + } + + offset_in_ns += ns_to_add; + + /* Value part in nanoseconds */ + if (ns_from_origin < offset_in_ns) { + ret = -1; + goto end; + } + + value_in_ns = (uint64_t) (ns_from_origin - offset_in_ns); + + /* Number of whole clock periods in `value_in_ns` */ + value_periods = value_in_ns / NS_PER_S_U; + + /* Remaining nanoseconds in cycles + whole clock periods in cycles */ + rem_value_in_ns = value_in_ns - value_periods * NS_PER_S_U; + + if (value_periods > UINT64_MAX / cc->frequency) { + ret = -1; + goto end; + } + + if (!bt_safe_to_mul_uint64(value_periods, cc->frequency)) { + ret = -1; + goto end; + } + + value_period_cycles = value_periods * cc->frequency; + + if (!bt_safe_to_mul_uint64(cc->frequency, rem_value_in_ns)) { + ret = -1; + goto end; + } + + if (!bt_safe_to_add_uint64(cc->frequency * rem_value_in_ns / NS_PER_S_U, + value_period_cycles)) { + ret = -1; + goto end; + } + + *raw_value = cc->frequency * rem_value_in_ns / NS_PER_S_U + + value_period_cycles; + +end: + return ret; +} + #endif /* BABELTRACE_TRACE_IR_CLOCK_CLASS_INTERNAL_H */ diff --git a/lib/graph/iterator.c b/lib/graph/iterator.c index a7c9ea17..5cfda703 100644 --- a/lib/graph/iterator.c +++ b/lib/graph/iterator.c @@ -25,6 +25,8 @@ #include #include +#include +#include #include #include #include @@ -148,17 +150,12 @@ void bt_self_component_port_input_message_iterator_destroy(struct bt_object *obj } if (iterator->auto_seek_msgs) { - uint64_t i; - - /* Put any remaining message in the auto-seek array */ - for (i = 0; i < iterator->auto_seek_msgs->len; i++) { - if (iterator->auto_seek_msgs->pdata[i]) { - bt_object_put_no_null_check( - iterator->auto_seek_msgs->pdata[i]); - } + while (!g_queue_is_empty(iterator->auto_seek_msgs)) { + bt_object_put_no_null_check( + g_queue_pop_tail(iterator->auto_seek_msgs)); } - g_ptr_array_free(iterator->auto_seek_msgs, TRUE); + g_queue_free(iterator->auto_seek_msgs); iterator->auto_seek_msgs = NULL; } @@ -324,14 +321,13 @@ bt_self_component_port_input_message_iterator_create_initial( goto end; } - iterator->auto_seek_msgs = g_ptr_array_new(); + iterator->auto_seek_msgs = g_queue_new(); if (!iterator->auto_seek_msgs) { - BT_LOGE_STR("Failed to allocate a GPtrArray."); + BT_LOGE_STR("Failed to allocate a GQueue."); ret = -1; goto end; } - g_ptr_array_set_size(iterator->auto_seek_msgs, MSG_BATCH_SIZE); iterator->upstream_component = upstream_comp; iterator->upstream_port = upstream_port; iterator->connection = iterator->upstream_port->connection; @@ -985,11 +981,18 @@ bt_self_component_port_input_message_iterator_seek_beginning( } static inline -int get_message_ns_from_origin(const struct bt_message *msg, - int64_t *ns_from_origin, bool *ignore) +enum bt_message_iterator_status auto_seek_handle_message( + struct bt_self_component_port_input_message_iterator *iterator, + int64_t ns_from_origin, const struct bt_message *msg, + bool *got_first) { + enum bt_message_iterator_status status = BT_MESSAGE_ITERATOR_STATUS_OK; + int64_t msg_ns_from_origin; const struct bt_clock_snapshot *clk_snapshot = NULL; - int ret = 0; + int ret; + + BT_ASSERT(msg); + BT_ASSERT(got_first); switch (msg->type) { case BT_MESSAGE_TYPE_EVENT: @@ -1027,14 +1030,68 @@ int get_message_ns_from_origin(const struct bt_message *msg, case BT_MESSAGE_TYPE_DISCARDED_EVENTS: case BT_MESSAGE_TYPE_DISCARDED_PACKETS: { - const struct bt_message_discarded_items *disc_items_msg = - (const void *) msg; + struct bt_message_discarded_items *msg_disc_items = + (void *) msg; + + BT_ASSERT_PRE(msg_disc_items->default_begin_cs && + msg_disc_items->default_end_cs, + "Discarded events/packets message has no default clock snapshots: %!+n", + msg_disc_items); + ret = bt_clock_snapshot_get_ns_from_origin( + msg_disc_items->default_begin_cs, + &msg_ns_from_origin); + if (ret) { + status = BT_MESSAGE_ITERATOR_STATUS_ERROR; + goto end; + } - clk_snapshot = disc_items_msg->default_begin_cs; - BT_ASSERT_PRE(clk_snapshot, - "Discarded events/packets message has no default clock snapshot: %!+n", - msg); - break; + if (msg_ns_from_origin >= ns_from_origin) { + *got_first = true; + goto push_msg; + } + + ret = bt_clock_snapshot_get_ns_from_origin( + msg_disc_items->default_end_cs, + &msg_ns_from_origin); + if (ret) { + status = BT_MESSAGE_ITERATOR_STATUS_ERROR; + goto end; + } + + if (msg_ns_from_origin >= ns_from_origin) { + /* + * The discarded items message's beginning time + * is before the requested seeking time, but its + * end time is after. Modify the message so as + * to set its beginning time to the requested + * seeking time, and make its item count unknown + * as we don't know if items were really + * discarded within the new time range. + */ + uint64_t new_begin_raw_value; + + ret = bt_clock_class_clock_value_from_ns_from_origin( + msg_disc_items->default_end_cs->clock_class, + ns_from_origin, &new_begin_raw_value); + if (ret) { + status = BT_MESSAGE_ITERATOR_STATUS_ERROR; + goto end; + } + + bt_clock_snapshot_set_raw_value( + msg_disc_items->default_begin_cs, + new_begin_raw_value); + msg_disc_items->count.base.avail = + BT_PROPERTY_AVAILABILITY_NOT_AVAILABLE; + + /* + * It is safe to push it because its beginning + * time is exactly the requested seeking time. + */ + goto push_msg; + } else { + goto skip_msg; + } } case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING: { @@ -1049,7 +1106,7 @@ int get_message_ns_from_origin(const struct bt_message *msg, * and we can't assume any specific time for an * unknown clock snapshot, so skip this. */ - goto set_ignore; + goto skip_msg; case BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN: clk_snapshot = stream_act_msg->default_cs; BT_ASSERT(clk_snapshot); @@ -1071,14 +1128,14 @@ int get_message_ns_from_origin(const struct bt_message *msg, * We can't assume any specific time for an * unknown clock snapshot, so skip this. */ - goto set_ignore; + goto skip_msg; case BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_INFINITE: /* * +inf is always greater than any requested * time. */ - *ns_from_origin = INT64_MAX; - goto end; + *got_first = true; + goto push_msg; case BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN: clk_snapshot = stream_act_msg->default_cs; BT_ASSERT(clk_snapshot); @@ -1092,22 +1149,35 @@ int get_message_ns_from_origin(const struct bt_message *msg, case BT_MESSAGE_TYPE_STREAM_BEGINNING: case BT_MESSAGE_TYPE_STREAM_END: /* Ignore */ - break; + goto skip_msg; default: abort(); } -set_ignore: - if (!clk_snapshot) { - *ignore = true; + BT_ASSERT(clk_snapshot); + ret = bt_clock_snapshot_get_ns_from_origin(clk_snapshot, + &msg_ns_from_origin); + if (ret) { + status = BT_MESSAGE_ITERATOR_STATUS_ERROR; goto end; } - ret = bt_clock_snapshot_get_ns_from_origin(clk_snapshot, - ns_from_origin); + if (msg_ns_from_origin >= ns_from_origin) { + *got_first = true; + goto push_msg; + } + +skip_msg: + bt_object_put_no_null_check(msg); + goto end; + +push_msg: + g_queue_push_head(iterator->auto_seek_msgs, (void *) msg); + msg = NULL; end: - return ret; + BT_ASSERT(!msg || status != BT_MESSAGE_ITERATOR_STATUS_OK); + return status; } static @@ -1121,6 +1191,7 @@ enum bt_message_iterator_status find_message_ge_ns_from_origin( const struct bt_message *messages[MSG_BATCH_SIZE]; uint64_t user_count = 0; uint64_t i; + bool got_first = false; BT_ASSERT(iterator); memset(&messages[0], 0, sizeof(messages[0]) * MSG_BATCH_SIZE); @@ -1170,60 +1241,22 @@ enum bt_message_iterator_status find_message_ge_ns_from_origin( abort(); } - /* - * Find first message which has a default clock snapshot - * that is greater than or equal to the requested value. - * - * For event and message iterator inactivity messages, compare with the - * default clock snapshot. - * - * For packet beginning messages, compare with the - * default beginning clock snapshot, if any. - * - * For packet end messages, compare with the default end - * clock snapshot, if any. - * - * For stream beginning, stream end, ignore. - */ for (i = 0; i < user_count; i++) { - const struct bt_message *msg = messages[i]; - int64_t msg_ns_from_origin; - bool ignore = false; - int ret; - - BT_ASSERT(msg); - ret = get_message_ns_from_origin(msg, &msg_ns_from_origin, - &ignore); - if (ret) { - status = BT_MESSAGE_ITERATOR_STATUS_ERROR; - goto end; - } - - if (ignore) { - /* Skip message without a clock snapshot */ + if (got_first) { + g_queue_push_head(iterator->auto_seek_msgs, + (void *) messages[i]); + messages[i] = NULL; continue; } - if (msg_ns_from_origin >= ns_from_origin) { - /* - * We found it: move this message and - * the following ones to the iterator's - * auto-seek message array. - */ - uint64_t j; - - for (j = i; j < user_count; j++) { - iterator->auto_seek_msgs->pdata[j - i] = - (void *) messages[j]; - messages[j] = NULL; - } - - iterator->auto_seek_msg_count = user_count - i; + status = auto_seek_handle_message(iterator, + ns_from_origin, messages[i], &got_first); + if (status == BT_MESSAGE_ITERATOR_STATUS_OK) { + /* Message was either put or moved */ + messages[i] = NULL; + } else { goto end; } - - bt_object_put_no_null_check(msg); - messages[i] = NULL; } } @@ -1244,44 +1277,47 @@ enum bt_self_message_iterator_status post_auto_seek_next( bt_message_array_const msgs, uint64_t capacity, uint64_t *count) { - BT_ASSERT(iterator->auto_seek_msg_count <= capacity); - BT_ASSERT(iterator->auto_seek_msg_count > 0); + BT_ASSERT(!g_queue_is_empty(iterator->auto_seek_msgs)); + *count = 0; /* * Move auto-seek messages to the output array (which is this - * iterator's base message array. + * iterator's base message array). */ - memcpy(&msgs[0], &iterator->auto_seek_msgs->pdata[0], - sizeof(msgs[0]) * iterator->auto_seek_msg_count); - memset(&iterator->auto_seek_msgs->pdata[0], 0, - sizeof(iterator->auto_seek_msgs->pdata[0]) * - iterator->auto_seek_msg_count); - *count = iterator->auto_seek_msg_count; - - /* Restore real user's "next" method */ - switch (iterator->upstream_component->class->type) { - case BT_COMPONENT_CLASS_TYPE_SOURCE: - { - struct bt_component_class_source *src_comp_cls = - (void *) iterator->upstream_component->class; - - iterator->methods.next = - (bt_self_component_port_input_message_iterator_next_method) - src_comp_cls->methods.msg_iter_next; - break; + while (capacity > 0 && !g_queue_is_empty(iterator->auto_seek_msgs)) { + msgs[*count] = g_queue_pop_tail(iterator->auto_seek_msgs); + capacity--; + (*count)++; } - case BT_COMPONENT_CLASS_TYPE_FILTER: - { - struct bt_component_class_filter *flt_comp_cls = - (void *) iterator->upstream_component->class; - iterator->methods.next = - (bt_self_component_port_input_message_iterator_next_method) - flt_comp_cls->methods.msg_iter_next; - break; - } - default: - abort(); + BT_ASSERT(*count > 0); + + if (g_queue_is_empty(iterator->auto_seek_msgs)) { + /* No more auto-seek messages */ + switch (iterator->upstream_component->class->type) { + case BT_COMPONENT_CLASS_TYPE_SOURCE: + { + struct bt_component_class_source *src_comp_cls = + (void *) iterator->upstream_component->class; + + iterator->methods.next = + (bt_self_component_port_input_message_iterator_next_method) + src_comp_cls->methods.msg_iter_next; + break; + } + case BT_COMPONENT_CLASS_TYPE_FILTER: + { + struct bt_component_class_filter *flt_comp_cls = + (void *) iterator->upstream_component->class; + + iterator->methods.next = + (bt_self_component_port_input_message_iterator_next_method) + flt_comp_cls->methods.msg_iter_next; + break; + } + default: + abort(); + } } return BT_SELF_MESSAGE_ITERATOR_STATUS_OK; @@ -1353,36 +1389,45 @@ bt_self_component_port_input_message_iterator_seek_ns_from_origin( /* * Find the first message which has a default clock * snapshot greater than or equal to the requested - * nanoseconds from origin, and move the received - * messages from this point in the batch to this - * iterator's auto-seek message array. + * seeking time, and move the received messages from + * this point in the batch to this iterator's auto-seek + * message queue. */ + while (!g_queue_is_empty(iterator->auto_seek_msgs)) { + bt_object_put_no_null_check( + g_queue_pop_tail(iterator->auto_seek_msgs)); + } + status = find_message_ge_ns_from_origin(iterator, ns_from_origin); switch (status) { case BT_MESSAGE_ITERATOR_STATUS_OK: + case BT_MESSAGE_ITERATOR_STATUS_END: /* - * Replace the user's "next" method with a - * custom, temporary "next" method which returns - * the messages in the iterator's message array. + * If there are messages in the auto-seek + * message queue, replace the user's "next" + * method with a custom, temporary "next" method + * which returns them. */ - iterator->methods.next = - (bt_self_component_port_input_message_iterator_next_method) - post_auto_seek_next; + if (!g_queue_is_empty(iterator->auto_seek_msgs)) { + iterator->methods.next = + (bt_self_component_port_input_message_iterator_next_method) + post_auto_seek_next; + } + + /* + * `BT_MESSAGE_ITERATOR_STATUS_END` becomes + * `BT_MESSAGE_ITERATOR_STATUS_OK`: the next + * time this iterator's "next" method is called, + * it will return + * `BT_MESSAGE_ITERATOR_STATUS_END`. + */ + status = BT_MESSAGE_ITERATOR_STATUS_OK; break; case BT_MESSAGE_ITERATOR_STATUS_ERROR: case BT_MESSAGE_ITERATOR_STATUS_NOMEM: case BT_MESSAGE_ITERATOR_STATUS_AGAIN: goto end; - case BT_MESSAGE_ITERATOR_STATUS_END: - /* - * The iterator reached the end: just return - * `BT_MESSAGE_ITERATOR_STATUS_OK` here, as if - * the seeking operation occured: the next - * "next" method will return - * `BT_MESSAGE_ITERATOR_STATUS_END` itself. - */ - break; default: abort(); } @@ -1390,11 +1435,6 @@ bt_self_component_port_input_message_iterator_seek_ns_from_origin( end: set_iterator_state_after_seeking(iterator, status); - - if (status == BT_MESSAGE_ITERATOR_STATUS_END) { - status = BT_MESSAGE_ITERATOR_STATUS_OK; - } - return status; } -- 2.34.1