X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Flib%2Fgraph%2Fiterator.c;h=4dd69c6ae69477c26b54ac35e3c22b2033c79ef0;hb=a3f0c7db90f4cfc81090a83a7442b7bc624d5789;hp=db8fa5375a2fda8ca45a95d2f3acc0f7d2f03a03;hpb=f6f301d78f17061caca68a882f25bff0a53d00b3;p=babeltrace.git diff --git a/src/lib/graph/iterator.c b/src/lib/graph/iterator.c index db8fa537..4dd69c6a 100644 --- a/src/lib/graph/iterator.c +++ b/src/lib/graph/iterator.c @@ -34,21 +34,20 @@ #include #include "lib/trace-ir/packet.h" #include "lib/trace-ir/stream.h" +#include +#include +#include #include #include #include #include -#include +#include #include -#include #include #include #include #include #include -#include -#include -#include #include #include #include @@ -61,15 +60,16 @@ #include "lib/assert-post.h" #include #include +#include #include #include "component-class.h" -#include "component-class-sink-colander.h" #include "component.h" #include "component-sink.h" #include "component-source.h" #include "connection.h" #include "graph.h" +#include "message-iterator-class.h" #include "message/discarded-items.h" #include "message/event.h" #include "message/iterator.h" @@ -77,7 +77,7 @@ #include "message/message-iterator-inactivity.h" #include "message/stream.h" #include "message/packet.h" -#include "message/stream-activity.h" +#include "lib/func-status.h" /* * TODO: Use graph's state (number of active iterators, etc.) and @@ -97,27 +97,12 @@ void set_self_comp_port_input_msg_iterator_state( struct bt_self_component_port_input_message_iterator *iterator, enum bt_self_component_port_input_message_iterator_state state) { - BT_ASSERT(iterator); + BT_ASSERT_DBG(iterator); BT_LIB_LOGD("Updating message iterator's state: new-state=%s", bt_self_component_port_input_message_iterator_state_string(state)); iterator->state = state; } -static -void destroy_base_message_iterator(struct bt_object *obj) -{ - struct bt_message_iterator *iterator = (void *) obj; - - BT_ASSERT(iterator); - - if (iterator->msgs) { - g_ptr_array_free(iterator->msgs, TRUE); - iterator->msgs = NULL; - } - - g_free(iterator); -} - static void bt_self_component_port_input_message_iterator_destroy(struct bt_object *obj) { @@ -154,7 +139,7 @@ void bt_self_component_port_input_message_iterator_destroy(struct bt_object *obj if (iterator->auto_seek.msgs) { while (!g_queue_is_empty(iterator->auto_seek.msgs)) { - bt_object_put_no_null_check( + bt_object_put_ref_no_null_check( g_queue_pop_tail(iterator->auto_seek.msgs)); } @@ -162,36 +147,57 @@ void bt_self_component_port_input_message_iterator_destroy(struct bt_object *obj iterator->auto_seek.msgs = NULL; } - destroy_base_message_iterator(obj); + if (iterator->upstream_msg_iters) { + /* + * At this point the message iterator is finalized, so + * it's detached from any upstream message iterator. + */ + BT_ASSERT(iterator->upstream_msg_iters->len == 0); + g_ptr_array_free(iterator->upstream_msg_iters, TRUE); + iterator->upstream_msg_iters = NULL; + } + + if (iterator->msgs) { + g_ptr_array_free(iterator->msgs, TRUE); + iterator->msgs = NULL; + } + + g_free(iterator); } BT_HIDDEN void bt_self_component_port_input_message_iterator_try_finalize( struct bt_self_component_port_input_message_iterator *iterator) { - typedef void (*method_t)(void *); - - struct bt_component_class *comp_class = NULL; - method_t method = NULL; + uint64_t i; + bool call_user_finalize = true; BT_ASSERT(iterator); switch (iterator->state) { case BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_NON_INITIALIZED: - /* Skip user finalization if user initialization failed */ - BT_LIB_LOGD("Not finalizing non-initialized message iterator: " - "%!+i", iterator); - goto end; + /* + * If this function is called while the iterator is in the + * NON_INITIALIZED state, it means the user initialization + * method has either not been called, or has failed. We + * therefore don't want to call the user finalization method. + * However, the initialization method might have created some + * upstream message iterators before failing, so we want to + * execute the rest of this function, which unlinks the related + * iterators. + */ + call_user_finalize = false; + break; case BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED: /* Already finalized */ BT_LIB_LOGD("Not finalizing message iterator: already finalized: " "%!+i", iterator); goto end; case BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZING: - /* Already finalized */ + /* Finalizing */ BT_LIB_LOGF("Message iterator is already being finalized: " "%!+i", iterator); - abort(); + bt_common_abort(); default: break; } @@ -200,35 +206,70 @@ void bt_self_component_port_input_message_iterator_try_finalize( set_self_comp_port_input_msg_iterator_state(iterator, BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZING); BT_ASSERT(iterator->upstream_component); - comp_class = iterator->upstream_component->class; /* Call user-defined destroy method */ - switch (comp_class->type) { - case BT_COMPONENT_CLASS_TYPE_SOURCE: - { - struct bt_component_class_source *src_comp_cls = - (void *) comp_class; + if (call_user_finalize) { + typedef void (*method_t)(void *); + method_t method = NULL; + struct bt_component_class *comp_class = + iterator->upstream_component->class; + + switch (comp_class->type) { + case BT_COMPONENT_CLASS_TYPE_SOURCE: + { + struct bt_component_class_source *src_comp_cls = + (void *) comp_class; - method = (method_t) src_comp_cls->methods.msg_iter_finalize; - break; - } - case BT_COMPONENT_CLASS_TYPE_FILTER: - { - struct bt_component_class_filter *flt_comp_cls = - (void *) comp_class; + method = (method_t) src_comp_cls->msg_iter_cls->methods.finalize; + break; + } + case BT_COMPONENT_CLASS_TYPE_FILTER: + { + struct bt_component_class_filter *flt_comp_cls = + (void *) comp_class; - method = (method_t) flt_comp_cls->methods.msg_iter_finalize; - break; + method = (method_t) flt_comp_cls->msg_iter_cls->methods.finalize; + break; + } + default: + /* Unreachable */ + bt_common_abort(); + } + + if (method) { + const bt_error *saved_error; + + saved_error = bt_current_thread_take_error(); + + BT_LIB_LOGD("Calling user's finalization method: %!+i", + iterator); + method(iterator); + + if (saved_error) { + BT_CURRENT_THREAD_MOVE_ERROR_AND_RESET(saved_error); + } + } } - default: - /* Unreachable */ - abort(); + + /* Detach upstream message iterators */ + for (i = 0; i < iterator->upstream_msg_iters->len; i++) { + struct bt_self_component_port_input_message_iterator *upstream_msg_iter = + iterator->upstream_msg_iters->pdata[i]; + + upstream_msg_iter->downstream_msg_iter = NULL; } - if (method) { - BT_LIB_LOGD("Calling user's finalization method: %!+i", + g_ptr_array_set_size(iterator->upstream_msg_iters, 0); + + /* Detach downstream message iterator */ + if (iterator->downstream_msg_iter) { + gboolean existed; + + BT_ASSERT(iterator->downstream_msg_iter->upstream_msg_iters); + existed = g_ptr_array_remove_fast( + iterator->downstream_msg_iter->upstream_msg_iters, iterator); - method(iterator); + BT_ASSERT(existed); } iterator->upstream_component = NULL; @@ -253,84 +294,102 @@ void bt_self_component_port_input_message_iterator_set_connection( } static -int init_message_iterator(struct bt_message_iterator *iterator, - enum bt_message_iterator_type type, - bt_object_release_func destroy) +enum bt_message_iterator_can_seek_beginning_status can_seek_ns_from_origin_true( + struct bt_self_component_port_input_message_iterator *iterator, + int64_t ns_from_origin, bt_bool *can_seek) { - int ret = 0; - - bt_object_init_shared(&iterator->base, destroy); - iterator->type = type; - iterator->msgs = g_ptr_array_new(); - if (!iterator->msgs) { - BT_LOGE_STR("Failed to allocate a GPtrArray."); - ret = -1; - goto end; - } + *can_seek = BT_TRUE; - g_ptr_array_set_size(iterator->msgs, MSG_BATCH_SIZE); - -end: - return ret; + return BT_FUNC_STATUS_OK; } static -bt_bool can_seek_ns_from_origin_true( +enum bt_message_iterator_can_seek_beginning_status can_seek_beginning_true( struct bt_self_component_port_input_message_iterator *iterator, - int64_t ns_from_origin) + bt_bool *can_seek) { - return BT_TRUE; -} + *can_seek = BT_TRUE; -static -bt_bool can_seek_beginning_true( - struct bt_self_component_port_input_message_iterator *iterator) -{ - return BT_TRUE; + return BT_FUNC_STATUS_OK; } static -struct bt_self_component_port_input_message_iterator * -bt_self_component_port_input_message_iterator_create_initial( - struct bt_component *upstream_comp, - struct bt_port *upstream_port) +int create_self_component_input_port_message_iterator( + struct bt_self_message_iterator *self_downstream_msg_iter, + struct bt_self_component_port_input *self_port, + struct bt_self_component_port_input_message_iterator **message_iterator) { - int ret; - struct bt_self_component_port_input_message_iterator *iterator = NULL; + bt_message_iterator_class_initialize_method init_method = NULL; + struct bt_self_component_port_input_message_iterator *iterator = + NULL; + struct bt_self_component_port_input_message_iterator *downstream_msg_iter = + (void *) self_downstream_msg_iter; + struct bt_port *port = (void *) self_port; + struct bt_port *upstream_port; + struct bt_component *comp; + struct bt_component *upstream_comp; + struct bt_component_class *upstream_comp_cls; + int status; - BT_ASSERT(upstream_comp); + BT_ASSERT_PRE_NON_NULL(message_iterator, "Created message iterator"); + BT_ASSERT_PRE_NON_NULL(port, "Input port"); + comp = bt_port_borrow_component_inline(port); + BT_ASSERT_PRE(bt_port_is_connected(port), + "Input port is not connected: %![port-]+p", port); + BT_ASSERT_PRE(comp, "Input port is not part of a component: %![port-]+p", + port); + BT_ASSERT(port->connection); + upstream_port = port->connection->upstream_port; BT_ASSERT(upstream_port); - BT_ASSERT(bt_port_is_connected(upstream_port)); - BT_LIB_LOGI("Creating initial message iterator on self component input port: " - "%![up-comp-]+c, %![up-port-]+p", upstream_comp, upstream_port); - BT_ASSERT(bt_component_get_class_type(upstream_comp) == + upstream_comp = bt_port_borrow_component_inline(upstream_port); + BT_ASSERT(upstream_comp); + BT_ASSERT_PRE( + bt_component_borrow_graph(upstream_comp)->config_state == + BT_GRAPH_CONFIGURATION_STATE_PARTIALLY_CONFIGURED || + bt_component_borrow_graph(upstream_comp)->config_state == + BT_GRAPH_CONFIGURATION_STATE_CONFIGURED, + "Graph is not configured: %!+g", + bt_component_borrow_graph(upstream_comp)); + upstream_comp_cls = upstream_comp->class; + BT_ASSERT(upstream_comp->class->type == BT_COMPONENT_CLASS_TYPE_SOURCE || - bt_component_get_class_type(upstream_comp) == + upstream_comp->class->type == BT_COMPONENT_CLASS_TYPE_FILTER); + BT_LIB_LOGI("Creating message iterator on self component input port: " + "%![up-comp-]+c, %![up-port-]+p", upstream_comp, upstream_port); iterator = g_new0( struct bt_self_component_port_input_message_iterator, 1); if (!iterator) { - BT_LOGE_STR("Failed to allocate one self component input port " + BT_LIB_LOGE_APPEND_CAUSE( + "Failed to allocate one self component input port " "message iterator."); - goto end; + status = BT_FUNC_STATUS_MEMORY_ERROR; + goto error; } - ret = init_message_iterator((void *) iterator, - BT_MESSAGE_ITERATOR_TYPE_SELF_COMPONENT_PORT_INPUT, + bt_object_init_shared(&iterator->base, bt_self_component_port_input_message_iterator_destroy); - if (ret) { - /* init_message_iterator() logs errors */ - BT_OBJECT_PUT_REF_AND_RESET(iterator); - goto end; + iterator->msgs = g_ptr_array_new(); + if (!iterator->msgs) { + BT_LIB_LOGE_APPEND_CAUSE("Failed to allocate a GPtrArray."); + status = BT_FUNC_STATUS_MEMORY_ERROR; + goto error; } + g_ptr_array_set_size(iterator->msgs, MSG_BATCH_SIZE); iterator->last_ns_from_origin = INT64_MIN; - iterator->auto_seek.msgs = g_queue_new(); if (!iterator->auto_seek.msgs) { - BT_LOGE_STR("Failed to allocate a GQueue."); - ret = -1; - goto end; + BT_LIB_LOGE_APPEND_CAUSE("Failed to allocate a GQueue."); + status = BT_FUNC_STATUS_MEMORY_ERROR; + goto error; + } + + iterator->upstream_msg_iters = g_ptr_array_new(); + if (!iterator->upstream_msg_iters) { + BT_LIB_LOGE_APPEND_CAUSE("Failed to allocate a GPtrArray."); + status = BT_FUNC_STATUS_MEMORY_ERROR; + goto error; } iterator->upstream_component = upstream_comp; @@ -348,19 +407,19 @@ bt_self_component_port_input_message_iterator_create_initial( iterator->methods.next = (bt_self_component_port_input_message_iterator_next_method) - src_comp_cls->methods.msg_iter_next; + src_comp_cls->msg_iter_cls->methods.next; iterator->methods.seek_ns_from_origin = (bt_self_component_port_input_message_iterator_seek_ns_from_origin_method) - src_comp_cls->methods.msg_iter_seek_ns_from_origin; + src_comp_cls->msg_iter_cls->methods.seek_ns_from_origin; iterator->methods.seek_beginning = (bt_self_component_port_input_message_iterator_seek_beginning_method) - src_comp_cls->methods.msg_iter_seek_beginning; + src_comp_cls->msg_iter_cls->methods.seek_beginning; iterator->methods.can_seek_ns_from_origin = (bt_self_component_port_input_message_iterator_can_seek_ns_from_origin_method) - src_comp_cls->methods.msg_iter_can_seek_ns_from_origin; + src_comp_cls->msg_iter_cls->methods.can_seek_ns_from_origin; iterator->methods.can_seek_beginning = (bt_self_component_port_input_message_iterator_can_seek_beginning_method) - src_comp_cls->methods.msg_iter_can_seek_beginning; + src_comp_cls->msg_iter_cls->methods.can_seek_beginning; break; } case BT_COMPONENT_CLASS_TYPE_FILTER: @@ -370,23 +429,23 @@ bt_self_component_port_input_message_iterator_create_initial( iterator->methods.next = (bt_self_component_port_input_message_iterator_next_method) - flt_comp_cls->methods.msg_iter_next; + flt_comp_cls->msg_iter_cls->methods.next; iterator->methods.seek_ns_from_origin = (bt_self_component_port_input_message_iterator_seek_ns_from_origin_method) - flt_comp_cls->methods.msg_iter_seek_ns_from_origin; + flt_comp_cls->msg_iter_cls->methods.seek_ns_from_origin; iterator->methods.seek_beginning = (bt_self_component_port_input_message_iterator_seek_beginning_method) - flt_comp_cls->methods.msg_iter_seek_beginning; + flt_comp_cls->msg_iter_cls->methods.seek_beginning; iterator->methods.can_seek_ns_from_origin = (bt_self_component_port_input_message_iterator_can_seek_ns_from_origin_method) - flt_comp_cls->methods.msg_iter_can_seek_ns_from_origin; + flt_comp_cls->msg_iter_cls->methods.can_seek_ns_from_origin; iterator->methods.can_seek_beginning = (bt_self_component_port_input_message_iterator_can_seek_beginning_method) - flt_comp_cls->methods.msg_iter_can_seek_beginning; + flt_comp_cls->msg_iter_cls->methods.can_seek_beginning; break; } default: - abort(); + bt_common_abort(); } if (iterator->methods.seek_ns_from_origin && @@ -399,74 +458,17 @@ bt_self_component_port_input_message_iterator_create_initial( if (iterator->methods.seek_beginning && !iterator->methods.can_seek_beginning) { iterator->methods.can_seek_beginning = - (bt_self_component_port_input_message_iterator_seek_beginning_method) + (bt_self_component_port_input_message_iterator_can_seek_beginning_method) can_seek_beginning_true; } - BT_LIB_LOGI("Created initial message iterator on self component input port: " - "%![up-port-]+p, %![up-comp-]+c, %![iter-]+i", - upstream_port, upstream_comp, iterator); - -end: - return iterator; -} - -struct bt_self_component_port_input_message_iterator * -bt_self_component_port_input_message_iterator_create( - struct bt_self_component_port_input *self_port) -{ - typedef enum bt_self_message_iterator_status (*init_method_t)( - void *, void *, void *); - - init_method_t init_method = NULL; - struct bt_self_component_port_input_message_iterator *iterator = - NULL; - struct bt_port *port = (void *) self_port; - struct bt_port *upstream_port; - struct bt_component *comp; - struct bt_component *upstream_comp; - struct bt_component_class *upstream_comp_cls; - - BT_ASSERT_PRE_NON_NULL(port, "Port"); - comp = bt_port_borrow_component_inline(port); - BT_ASSERT_PRE(bt_port_is_connected(port), - "Port is not connected: %![port-]+p", port); - BT_ASSERT_PRE(comp, "Port is not part of a component: %![port-]+p", - port); - BT_ASSERT_PRE(!bt_component_graph_is_canceled(comp), - "Port's component's graph is canceled: " - "%![port-]+p, %![comp-]+c", port, comp); - BT_ASSERT(port->connection); - upstream_port = port->connection->upstream_port; - BT_ASSERT(upstream_port); - upstream_comp = bt_port_borrow_component_inline(upstream_port); - BT_ASSERT(upstream_comp); - BT_ASSERT_PRE( - bt_component_borrow_graph(upstream_comp)->config_state != - BT_GRAPH_CONFIGURATION_STATE_CONFIGURING, - "Graph is not configured: %!+g", - bt_component_borrow_graph(upstream_comp)); - upstream_comp_cls = upstream_comp->class; - BT_ASSERT(upstream_comp->class->type == - BT_COMPONENT_CLASS_TYPE_SOURCE || - upstream_comp->class->type == - BT_COMPONENT_CLASS_TYPE_FILTER); - iterator = bt_self_component_port_input_message_iterator_create_initial( - upstream_comp, upstream_port); - if (!iterator) { - BT_LOGW_STR("Cannot create self component input port " - "message iterator."); - goto end; - } - switch (upstream_comp_cls->type) { case BT_COMPONENT_CLASS_TYPE_SOURCE: { struct bt_component_class_source *src_comp_cls = (void *) upstream_comp_cls; - init_method = - (init_method_t) src_comp_cls->methods.msg_iter_init; + init_method = src_comp_cls->msg_iter_cls->methods.initialize; break; } case BT_COMPONENT_CLASS_TYPE_FILTER: @@ -474,28 +476,49 @@ bt_self_component_port_input_message_iterator_create( struct bt_component_class_filter *flt_comp_cls = (void *) upstream_comp_cls; - init_method = - (init_method_t) flt_comp_cls->methods.msg_iter_init; + init_method = flt_comp_cls->msg_iter_cls->methods.initialize; break; } default: /* Unreachable */ - abort(); + bt_common_abort(); } if (init_method) { - int iter_status; + enum bt_message_iterator_class_initialize_method_status iter_status; BT_LIB_LOGD("Calling user's initialization method: %!+i", iterator); - iter_status = init_method(iterator, upstream_comp, - upstream_port); + iter_status = init_method( + (struct bt_self_message_iterator *) iterator, + &iterator->config, + (struct bt_self_component *) upstream_comp, + (struct bt_self_component_port_output *) upstream_port); BT_LOGD("User method returned: status=%s", - bt_message_iterator_status_string(iter_status)); - if (iter_status != BT_MESSAGE_ITERATOR_STATUS_OK) { - BT_LOGW_STR("Initialization method failed."); - BT_OBJECT_PUT_REF_AND_RESET(iterator); - goto end; + bt_common_func_status_string(iter_status)); + BT_ASSERT_POST_NO_ERROR_IF_NO_ERROR_STATUS(iter_status); + if (iter_status != BT_FUNC_STATUS_OK) { + BT_LIB_LOGW_APPEND_CAUSE( + "Component input port message iterator initialization method failed: " + "%![iter-]+i, status=%s", + iterator, + bt_common_func_status_string(iter_status)); + status = iter_status; + goto error; } + + iterator->config.frozen = true; + } + + if (downstream_msg_iter) { + /* Set this message iterator's downstream message iterator */ + iterator->downstream_msg_iter = downstream_msg_iter; + + /* + * Add this message iterator to the downstream message + * iterator's array of upstream message iterators. + */ + g_ptr_array_add(downstream_msg_iter->upstream_msg_iters, + iterator); } set_self_comp_port_input_msg_iterator_state(iterator, @@ -505,8 +528,39 @@ bt_self_component_port_input_message_iterator_create( "%![up-port-]+p, %![up-comp-]+c, %![iter-]+i", upstream_port, upstream_comp, iterator); + *message_iterator = iterator; + status = BT_FUNC_STATUS_OK; + goto end; + +error: + BT_OBJECT_PUT_REF_AND_RESET(iterator); + end: - return iterator; + return status; +} + +bt_self_component_port_input_message_iterator_create_from_message_iterator_status +bt_self_component_port_input_message_iterator_create_from_message_iterator( + struct bt_self_message_iterator *self_msg_iter, + struct bt_self_component_port_input *input_port, + struct bt_self_component_port_input_message_iterator **message_iterator) +{ + BT_ASSERT_PRE_NO_ERROR(); + BT_ASSERT_PRE_NON_NULL(self_msg_iter, "Message iterator"); + return create_self_component_input_port_message_iterator(self_msg_iter, + input_port, message_iterator); +} + +bt_self_component_port_input_message_iterator_create_from_sink_component_status +bt_self_component_port_input_message_iterator_create_from_sink_component( + struct bt_self_component_sink *self_comp, + struct bt_self_component_port_input *input_port, + struct bt_self_component_port_input_message_iterator **message_iterator) +{ + BT_ASSERT_PRE_NO_ERROR(); + BT_ASSERT_PRE_NON_NULL(self_comp, "Sink component"); + return create_self_component_input_port_message_iterator(NULL, + input_port, message_iterator); } void *bt_self_message_iterator_get_data( @@ -515,7 +569,7 @@ void *bt_self_message_iterator_get_data( struct bt_self_component_port_input_message_iterator *iterator = (void *) self_iterator; - BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); + BT_ASSERT_PRE_DEV_NON_NULL(iterator, "Message iterator"); return iterator->user_data; } @@ -525,18 +579,28 @@ void bt_self_message_iterator_set_data( struct bt_self_component_port_input_message_iterator *iterator = (void *) self_iterator; - BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); + BT_ASSERT_PRE_DEV_NON_NULL(iterator, "Message iterator"); iterator->user_data = data; BT_LIB_LOGD("Set message iterator's user data: " "%!+i, user-data-addr=%p", iterator, data); } +void bt_self_message_iterator_configuration_set_can_seek_forward( + bt_self_message_iterator_configuration *config, + bt_bool can_seek_forward) +{ + BT_ASSERT_PRE_NON_NULL(config, "Message iterator configuration"); + BT_ASSERT_PRE_DEV_HOT(config, "Message iterator configuration", ""); + + config->can_seek_forward = can_seek_forward; +} + /* * Validate that the default clock snapshot in `msg` doesn't make us go back in * time. */ -BT_ASSERT_POST_FUNC +BT_ASSERT_POST_DEV_FUNC static bool clock_snapshots_are_monotonic_one( struct bt_self_component_port_input_message_iterator *iterator, @@ -545,7 +609,7 @@ bool clock_snapshots_are_monotonic_one( const struct bt_clock_snapshot *clock_snapshot = NULL; bt_message_type message_type = bt_message_get_type(msg); int64_t ns_from_origin; - enum bt_clock_snapshot_status clock_snapshot_status; + enum bt_clock_snapshot_get_ns_from_origin_status clock_snapshot_status; /* * The default is true: if we can't figure out the clock snapshot @@ -574,21 +638,17 @@ bool clock_snapshots_are_monotonic_one( clock_snapshot = packet_msg->default_cs; break; } - case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING: - case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END: + case BT_MESSAGE_TYPE_STREAM_BEGINNING: + case BT_MESSAGE_TYPE_STREAM_END: { - struct bt_message_stream_activity *str_act_msg = - (struct bt_message_stream_activity *) msg; - - if (str_act_msg->default_cs_state == BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN) { - clock_snapshot = str_act_msg->default_cs; + struct bt_message_stream *stream_msg = (struct bt_message_stream *) msg; + if (stream_msg->default_cs_state != BT_MESSAGE_STREAM_CLOCK_SNAPSHOT_STATE_KNOWN) { + goto end; } + + clock_snapshot = stream_msg->default_cs; break; } - case BT_MESSAGE_TYPE_STREAM_BEGINNING: - case BT_MESSAGE_TYPE_STREAM_END: - /* These messages don't have clock snapshots. */ - goto end; case BT_MESSAGE_TYPE_DISCARDED_EVENTS: case BT_MESSAGE_TYPE_DISCARDED_PACKETS: { @@ -604,8 +664,15 @@ bool clock_snapshots_are_monotonic_one( goto end; } - clock_snapshot_status = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, &ns_from_origin); - if (clock_snapshot_status != BT_CLOCK_SNAPSHOT_STATUS_OK) { + clock_snapshot_status = bt_clock_snapshot_get_ns_from_origin( + clock_snapshot, &ns_from_origin); + if (clock_snapshot_status != BT_FUNC_STATUS_OK) { + /* + * bt_clock_snapshot_get_ns_from_origin can return + * OVERFLOW_ERROR. We don't really want to report an error to + * our caller, so just clear it. + */ + bt_current_thread_clear_error(); goto end; } @@ -615,7 +682,7 @@ end: return result; } -BT_ASSERT_POST_FUNC +BT_ASSERT_POST_DEV_FUNC static bool clock_snapshots_are_monotonic( struct bt_self_component_port_input_message_iterator *iterator, @@ -642,7 +709,7 @@ end: * stream is compatible with what we've seen before. */ -BT_ASSERT_POST_FUNC +BT_ASSERT_POST_DEV_FUNC static bool clock_classes_are_compatible_one(struct bt_self_component_port_input_message_iterator *iterator, const struct bt_message *msg) @@ -674,7 +741,7 @@ bool clock_classes_are_compatible_one(struct bt_self_component_port_input_messag iterator->clock_expectation.type = CLOCK_EXPECTATION_ORIGIN_UNIX; } else if (clock_class_uuid) { iterator->clock_expectation.type = CLOCK_EXPECTATION_ORIGIN_OTHER_UUID; - memcpy(iterator->clock_expectation.uuid, clock_class_uuid, BABELTRACE_UUID_LEN); + bt_uuid_copy(iterator->clock_expectation.uuid, clock_class_uuid); } else { iterator->clock_expectation.type = CLOCK_EXPECTATION_ORIGIN_OTHER_NO_UUID; } @@ -682,7 +749,8 @@ bool clock_classes_are_compatible_one(struct bt_self_component_port_input_messag case CLOCK_EXPECTATION_NONE: if (clock_class) { - BT_ASSERT_POST_MSG("Expecting no clock class, got one: %![cc-]+K", + BT_ASSERT_POST_DEV_MSG( + "Expecting no clock class, got one: %![cc-]+K", clock_class); result = false; goto end; @@ -692,13 +760,15 @@ bool clock_classes_are_compatible_one(struct bt_self_component_port_input_messag case CLOCK_EXPECTATION_ORIGIN_UNIX: if (!clock_class) { - BT_ASSERT_POST_MSG("Expecting a clock class, got none."); + BT_ASSERT_POST_DEV_MSG( + "Expecting a clock class, got none."); result = false; goto end; } if (!bt_clock_class_origin_is_unix_epoch(clock_class)) { - BT_ASSERT_POST_MSG("Expecting a clock class with Unix epoch origin: %![cc-]+K", + BT_ASSERT_POST_DEV_MSG( + "Expecting a clock class with Unix epoch origin: %![cc-]+K", clock_class); result = false; goto end; @@ -707,27 +777,31 @@ bool clock_classes_are_compatible_one(struct bt_self_component_port_input_messag case CLOCK_EXPECTATION_ORIGIN_OTHER_UUID: if (!clock_class) { - BT_ASSERT_POST_MSG("Expecting a clock class, got none."); + BT_ASSERT_POST_DEV_MSG( + "Expecting a clock class, got none."); result = false; goto end; } if (bt_clock_class_origin_is_unix_epoch(clock_class)) { - BT_ASSERT_POST_MSG("Expecting a clock class without Unix epoch origin: %![cc-]+K", + BT_ASSERT_POST_DEV_MSG( + "Expecting a clock class without Unix epoch origin: %![cc-]+K", clock_class); result = false; goto end; } if (!clock_class_uuid) { - BT_ASSERT_POST_MSG("Expecting a clock class with UUID: %![cc-]+K", + BT_ASSERT_POST_DEV_MSG( + "Expecting a clock class with UUID: %![cc-]+K", clock_class); result = false; goto end; } if (bt_uuid_compare(iterator->clock_expectation.uuid, clock_class_uuid)) { - BT_ASSERT_POST_MSG("Expecting a clock class with UUID, got one " + BT_ASSERT_POST_DEV_MSG( + "Expecting a clock class with UUID, got one " "with a different UUID: %![cc-]+K, expected-uuid=%!u", clock_class, iterator->clock_expectation.uuid); result = false; @@ -737,20 +811,23 @@ bool clock_classes_are_compatible_one(struct bt_self_component_port_input_messag case CLOCK_EXPECTATION_ORIGIN_OTHER_NO_UUID: if (!clock_class) { - BT_ASSERT_POST_MSG("Expecting a clock class, got none."); + BT_ASSERT_POST_DEV_MSG( + "Expecting a clock class, got none."); result = false; goto end; } if (bt_clock_class_origin_is_unix_epoch(clock_class)) { - BT_ASSERT_POST_MSG("Expecting a clock class without Unix epoch origin: %![cc-]+K", + BT_ASSERT_POST_DEV_MSG( + "Expecting a clock class without Unix epoch origin: %![cc-]+K", clock_class); result = false; goto end; } if (clock_class_uuid) { - BT_ASSERT_POST_MSG("Expecting a clock class without UUID: %![cc-]+K", + BT_ASSERT_POST_DEV_MSG( + "Expecting a clock class without UUID: %![cc-]+K", clock_class); result = false; goto end; @@ -765,7 +842,7 @@ end: return result; } -BT_ASSERT_POST_FUNC +BT_ASSERT_POST_DEV_FUNC static bool clock_classes_are_compatible( struct bt_self_component_port_input_message_iterator *iterator, @@ -793,45 +870,49 @@ end: */ static -bt_message_iterator_status call_iterator_next_method( +enum bt_message_iterator_class_next_method_status +call_iterator_next_method( struct bt_self_component_port_input_message_iterator *iterator, bt_message_array_const msgs, uint64_t capacity, uint64_t *user_count) { - bt_message_iterator_status status; + enum bt_message_iterator_class_next_method_status status; - BT_ASSERT(iterator->methods.next); + BT_ASSERT_DBG(iterator->methods.next); BT_LOGD_STR("Calling user's \"next\" method."); status = iterator->methods.next(iterator, msgs, capacity, user_count); BT_LOGD("User method returned: status=%s, msg-count=%" PRIu64, - bt_message_iterator_status_string(status), *user_count); + bt_common_func_status_string(status), *user_count); - if (status == BT_MESSAGE_ITERATOR_STATUS_OK) { - BT_ASSERT_POST(clock_classes_are_compatible(iterator, msgs, *user_count), + if (status == BT_FUNC_STATUS_OK) { + BT_ASSERT_POST_DEV(clock_classes_are_compatible(iterator, msgs, *user_count), "Clocks are not compatible"); - BT_ASSERT_POST(clock_snapshots_are_monotonic(iterator, msgs, *user_count), + BT_ASSERT_POST_DEV(clock_snapshots_are_monotonic(iterator, msgs, *user_count), "Clock snapshots are not monotonic"); } + BT_ASSERT_POST_DEV_NO_ERROR_IF_NO_ERROR_STATUS(status); + return status; } -enum bt_message_iterator_status +enum bt_message_iterator_next_status bt_self_component_port_input_message_iterator_next( struct bt_self_component_port_input_message_iterator *iterator, bt_message_array_const *msgs, uint64_t *user_count) { - int status = BT_MESSAGE_ITERATOR_STATUS_OK; + enum bt_message_iterator_next_status status = BT_FUNC_STATUS_OK; - BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); - BT_ASSERT_PRE_NON_NULL(msgs, "Message array (output)"); - BT_ASSERT_PRE_NON_NULL(user_count, "Message count (output)"); - BT_ASSERT_PRE(iterator->state == + BT_ASSERT_PRE_DEV_NO_ERROR(); + BT_ASSERT_PRE_DEV_NON_NULL(iterator, "Message iterator"); + BT_ASSERT_PRE_DEV_NON_NULL(msgs, "Message array (output)"); + BT_ASSERT_PRE_DEV_NON_NULL(user_count, "Message count (output)"); + BT_ASSERT_PRE_DEV(iterator->state == BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE, "Message iterator's \"next\" called, but " "message iterator is in the wrong state: %!+i", iterator); - BT_ASSERT(iterator->upstream_component); - BT_ASSERT(iterator->upstream_component->class); - BT_ASSERT_PRE( + BT_ASSERT_DBG(iterator->upstream_component); + BT_ASSERT_DBG(iterator->upstream_component->class); + BT_ASSERT_PRE_DEV( bt_component_borrow_graph(iterator->upstream_component)->config_state != BT_GRAPH_CONFIGURATION_STATE_CONFIGURING, "Graph is not configured: %!+g", @@ -845,11 +926,16 @@ bt_self_component_port_input_message_iterator_next( * and status. */ *user_count = 0; - status = call_iterator_next_method(iterator, - (void *) iterator->base.msgs->pdata, MSG_BATCH_SIZE, + status = (int) call_iterator_next_method(iterator, + (void *) iterator->msgs->pdata, MSG_BATCH_SIZE, user_count); + BT_LOGD("User method returned: status=%s, msg-count=%" PRIu64, + bt_common_func_status_string(status), *user_count); if (status < 0) { - BT_LOGW_STR("User method failed."); + BT_LIB_LOGW_APPEND_CAUSE( + "Component input port message iterator's \"next\" method failed: " + "%![iter-]+i, status=%s", + iterator, bt_common_func_status_string(status)); goto end; } @@ -862,86 +948,37 @@ bt_self_component_port_input_message_iterator_next( * For the same reason, there is no way that this iterator could * have seeked (cannot seek a self message iterator). */ - BT_ASSERT(iterator->state == + BT_ASSERT_DBG(iterator->state == BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE); switch (status) { - case BT_MESSAGE_ITERATOR_STATUS_OK: - BT_ASSERT_POST(*user_count <= MSG_BATCH_SIZE, + case BT_FUNC_STATUS_OK: + BT_ASSERT_POST_DEV(*user_count <= MSG_BATCH_SIZE, "Invalid returned message count: greater than " "batch size: count=%" PRIu64 ", batch-size=%u", *user_count, MSG_BATCH_SIZE); - *msgs = (void *) iterator->base.msgs->pdata; + *msgs = (void *) iterator->msgs->pdata; break; - case BT_MESSAGE_ITERATOR_STATUS_AGAIN: + case BT_FUNC_STATUS_AGAIN: goto end; - case BT_MESSAGE_ITERATOR_STATUS_END: + case BT_FUNC_STATUS_END: set_self_comp_port_input_msg_iterator_state(iterator, BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ENDED); goto end; default: /* Unknown non-error status */ - abort(); + bt_common_abort(); } end: return status; } -enum bt_message_iterator_status bt_port_output_message_iterator_next( - struct bt_port_output_message_iterator *iterator, - bt_message_array_const *msgs_to_user, - uint64_t *count_to_user) -{ - enum bt_message_iterator_status status; - enum bt_graph_status graph_status; - - BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); - BT_ASSERT_PRE_NON_NULL(msgs_to_user, "Message array (output)"); - BT_ASSERT_PRE_NON_NULL(count_to_user, "Message count (output)"); - BT_LIB_LOGD("Getting next output port message iterator's messages: " - "%!+i", iterator); - graph_status = bt_graph_consume_sink_no_check(iterator->graph, - iterator->colander); - switch (graph_status) { - case BT_GRAPH_STATUS_CANCELED: - case BT_GRAPH_STATUS_AGAIN: - case BT_GRAPH_STATUS_END: - case BT_GRAPH_STATUS_NOMEM: - status = (int) graph_status; - break; - case BT_GRAPH_STATUS_OK: - status = BT_MESSAGE_ITERATOR_STATUS_OK; - - /* - * On success, the colander sink moves the messages - * to this iterator's array and sets this iterator's - * message count: move them to the user. - */ - *msgs_to_user = (void *) iterator->base.msgs->pdata; - *count_to_user = iterator->count; - break; - default: - /* Other errors */ - status = BT_MESSAGE_ITERATOR_STATUS_ERROR; - } - - return status; -} - struct bt_component * bt_self_component_port_input_message_iterator_borrow_component( struct bt_self_component_port_input_message_iterator *iterator) { - BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); - return iterator->upstream_component; -} - -const struct bt_component * -bt_self_component_port_input_message_iterator_borrow_component_const( - const struct bt_self_component_port_input_message_iterator *iterator) -{ - BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); + BT_ASSERT_PRE_DEV_NON_NULL(iterator, "Message iterator"); return iterator->upstream_component; } @@ -951,186 +988,30 @@ struct bt_self_component *bt_self_message_iterator_borrow_component( struct bt_self_component_port_input_message_iterator *iterator = (void *) self_iterator; - BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); + BT_ASSERT_PRE_DEV_NON_NULL(iterator, "Message iterator"); return (void *) iterator->upstream_component; } -struct bt_self_port_output *bt_self_message_iterator_borrow_port( +struct bt_self_component_port_output *bt_self_message_iterator_borrow_port( struct bt_self_message_iterator *self_iterator) { struct bt_self_component_port_input_message_iterator *iterator = (void *) self_iterator; - BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); + BT_ASSERT_PRE_DEV_NON_NULL(iterator, "Message iterator"); return (void *) iterator->upstream_port; } -static -void bt_port_output_message_iterator_destroy(struct bt_object *obj) -{ - struct bt_port_output_message_iterator *iterator = (void *) obj; - - BT_LIB_LOGI("Destroying output port message iterator object: %!+i", - iterator); - BT_LOGD_STR("Putting graph."); - BT_OBJECT_PUT_REF_AND_RESET(iterator->graph); - BT_LOGD_STR("Putting colander sink component."); - BT_OBJECT_PUT_REF_AND_RESET(iterator->colander); - destroy_base_message_iterator(obj); -} - -struct bt_port_output_message_iterator * -bt_port_output_message_iterator_create(struct bt_graph *graph, - const struct bt_port_output *output_port) -{ - struct bt_port_output_message_iterator *iterator = NULL; - struct bt_component_class_sink *colander_comp_cls = NULL; - struct bt_component *output_port_comp = NULL; - struct bt_component_sink *colander_comp; - enum bt_graph_status graph_status; - struct bt_port_input *colander_in_port = NULL; - struct bt_component_class_sink_colander_data colander_data; - int ret; - - BT_ASSERT_PRE_NON_NULL(graph, "Graph"); - BT_ASSERT_PRE_NON_NULL(output_port, "Output port"); - output_port_comp = bt_port_borrow_component_inline( - (const void *) output_port); - BT_ASSERT_PRE(output_port_comp, - "Output port has no component: %!+p", output_port); - BT_ASSERT_PRE(bt_component_borrow_graph(output_port_comp) == - (void *) graph, - "Output port is not part of graph: %![graph-]+g, %![port-]+p", - graph, output_port); - BT_ASSERT_PRE(!graph->has_sink, - "Graph already has a sink component: %![graph-]+g"); - - /* Create message iterator */ - BT_LIB_LOGI("Creating message iterator on output port: " - "%![port-]+p, %![comp-]+c", output_port, output_port_comp); - iterator = g_new0(struct bt_port_output_message_iterator, 1); - if (!iterator) { - BT_LOGE_STR("Failed to allocate one output port message iterator."); - goto error; - } - - ret = init_message_iterator((void *) iterator, - BT_MESSAGE_ITERATOR_TYPE_PORT_OUTPUT, - bt_port_output_message_iterator_destroy); - if (ret) { - /* init_message_iterator() logs errors */ - BT_OBJECT_PUT_REF_AND_RESET(iterator); - goto end; - } - - /* Create colander component */ - colander_comp_cls = bt_component_class_sink_colander_get(); - if (!colander_comp_cls) { - BT_LOGW("Cannot get colander sink component class."); - goto error; - } - - iterator->graph = graph; - bt_object_get_no_null_check(iterator->graph); - colander_data.msgs = (void *) iterator->base.msgs->pdata; - colander_data.count_addr = &iterator->count; - - /* - * Hope that nobody uses this very unique name. - * - * We pass `BT_LOGGING_LEVEL_NONE` but the colander component - * class module does not use this level anyway since it belongs - * to the library. - */ - graph_status = - bt_graph_add_sink_component_with_init_method_data( - (void *) graph, colander_comp_cls, - "colander-36ac3409-b1a8-4d60-ab1f-4fdf341a8fb1", - NULL, &colander_data, BT_LOGGING_LEVEL_NONE, - (void *) &iterator->colander); - if (graph_status != BT_GRAPH_STATUS_OK) { - BT_LIB_LOGW("Cannot add colander sink component to graph: " - "%1[graph-]+g, status=%s", graph, - bt_graph_status_string(graph_status)); - goto error; - } - - /* - * Connect provided output port to the colander component's - * input port. - */ - colander_in_port = - (void *) bt_component_sink_borrow_input_port_by_index_const( - (void *) iterator->colander, 0); - BT_ASSERT(colander_in_port); - graph_status = bt_graph_connect_ports(graph, - output_port, colander_in_port, NULL); - if (graph_status != BT_GRAPH_STATUS_OK) { - BT_LIB_LOGW("Cannot add colander sink component to graph: " - "%![graph-]+g, %![comp-]+c, status=%s", graph, - iterator->colander, - bt_graph_status_string(graph_status)); - goto error; - } - - /* - * At this point everything went fine. Make the graph - * nonconsumable forever so that only this message iterator - * can consume (thanks to bt_graph_consume_sink_no_check()). - * This avoids leaking the message created by the colander - * sink and moved to the message iterator's message - * member. - */ - bt_graph_set_can_consume(iterator->graph, false); - - /* Also set the graph as being configured. */ - graph_status = bt_graph_configure(graph); - if (graph_status != BT_GRAPH_STATUS_OK) { - BT_LIB_LOGW("Cannot configure graph after having added colander: " - "%![graph-]+g, status=%s", graph, - bt_graph_status_string(graph_status)); - goto error; - } - goto end; - -error: - if (iterator && iterator->graph && iterator->colander) { - int ret; - - /* Remove created colander component from graph if any */ - colander_comp = iterator->colander; - BT_OBJECT_PUT_REF_AND_RESET(iterator->colander); - - /* - * At this point the colander component's reference - * count is 0 because iterator->colander was the only - * owner. We also know that it is not connected because - * this is the last operation before this function - * succeeds. - * - * Since we honor the preconditions here, - * bt_graph_remove_unconnected_component() always - * succeeds. - */ - ret = bt_graph_remove_unconnected_component(iterator->graph, - (void *) colander_comp); - BT_ASSERT(ret == 0); - } - - BT_OBJECT_PUT_REF_AND_RESET(iterator); - -end: - bt_object_put_ref(colander_comp_cls); - return (void *) iterator; -} - -bt_bool bt_self_component_port_input_message_iterator_can_seek_ns_from_origin( +enum bt_message_iterator_can_seek_ns_from_origin_status +bt_self_component_port_input_message_iterator_can_seek_ns_from_origin( struct bt_self_component_port_input_message_iterator *iterator, - int64_t ns_from_origin) + int64_t ns_from_origin, bt_bool *can_seek) { - bt_bool can = BT_FALSE; + enum bt_message_iterator_can_seek_ns_from_origin_status status; + BT_ASSERT_PRE_NO_ERROR(); BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); + BT_ASSERT_PRE_NON_NULL(can_seek, "Result (output)"); BT_ASSERT_PRE_ITER_HAS_STATE_TO_SEEK(iterator); BT_ASSERT_PRE( bt_component_borrow_graph(iterator->upstream_component)->config_state != @@ -1139,29 +1020,69 @@ bt_bool bt_self_component_port_input_message_iterator_can_seek_ns_from_origin( bt_component_borrow_graph(iterator->upstream_component)); if (iterator->methods.can_seek_ns_from_origin) { - can = iterator->methods.can_seek_ns_from_origin(iterator, - ns_from_origin); - goto end; + /* + * Initialize to an invalid value, so we can post-assert that + * the method returned a valid value. + */ + *can_seek = -1; + + BT_LIB_LOGD("Calling user's \"can seek nanoseconds from origin\" method: %!+i", + iterator); + + status = (int) iterator->methods.can_seek_ns_from_origin(iterator, + ns_from_origin, can_seek); + + BT_ASSERT_POST_NO_ERROR_IF_NO_ERROR_STATUS(status); + + if (status != BT_FUNC_STATUS_OK) { + BT_LIB_LOGW_APPEND_CAUSE( + "Component input port message iterator's \"can seek nanoseconds from origin\" method failed: " + "%![iter-]+i, status=%s", + iterator, bt_common_func_status_string(status)); + goto end; + } + + BT_ASSERT_POST(*can_seek == BT_TRUE || *can_seek == BT_FALSE, + "Unexpected boolean value returned from user's \"can seek ns from origin\" method: val=%d, %![iter-]+i", + *can_seek, iterator); + + BT_LIB_LOGD( + "User's \"can seek nanoseconds from origin\" returned successfully: " + "%![iter-]+i, can-seek=%d", + iterator, *can_seek); + + if (*can_seek) { + goto end; + } } /* - * Automatic seeking fall back: if we can seek to the beginning, - * then we can automatically seek to any message. + * Automatic seeking fall back: if we can seek to the beginning and the + * iterator supports forward seeking then we can automatically seek to + * any timestamp. */ - if (iterator->methods.can_seek_beginning) { - can = iterator->methods.can_seek_beginning(iterator); + status = (int) bt_self_component_port_input_message_iterator_can_seek_beginning( + iterator, can_seek); + if (status != BT_FUNC_STATUS_OK) { + goto end; } + *can_seek = *can_seek && iterator->config.can_seek_forward; + end: - return can; + return status; } -bt_bool bt_self_component_port_input_message_iterator_can_seek_beginning( - struct bt_self_component_port_input_message_iterator *iterator) +enum bt_message_iterator_can_seek_beginning_status +bt_self_component_port_input_message_iterator_can_seek_beginning( + struct bt_self_component_port_input_message_iterator *iterator, + bt_bool *can_seek) { - bt_bool can = BT_FALSE; + enum bt_message_iterator_can_seek_beginning_status status; + BT_ASSERT_PRE_NO_ERROR(); BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); + BT_ASSERT_PRE_NON_NULL(can_seek, "Result (output)"); BT_ASSERT_PRE_ITER_HAS_STATE_TO_SEEK(iterator); BT_ASSERT_PRE( bt_component_borrow_graph(iterator->upstream_component)->config_state != @@ -1170,36 +1091,53 @@ bt_bool bt_self_component_port_input_message_iterator_can_seek_beginning( bt_component_borrow_graph(iterator->upstream_component)); if (iterator->methods.can_seek_beginning) { - can = iterator->methods.can_seek_beginning(iterator); + /* + * Initialize to an invalid value, so we can post-assert that + * the method returned a valid value. + */ + *can_seek = -1; + + status = (int) iterator->methods.can_seek_beginning(iterator, can_seek); + + BT_ASSERT_POST( + status != BT_FUNC_STATUS_OK || + *can_seek == BT_TRUE || + *can_seek == BT_FALSE, + "Unexpected boolean value returned from user's \"can seek beginning\" method: val=%d, %![iter-]+i", + *can_seek, iterator); + BT_ASSERT_POST_NO_ERROR_IF_NO_ERROR_STATUS(status); + } else { + *can_seek = BT_FALSE; + status = BT_FUNC_STATUS_OK; } - return can; + return status; } static inline void set_iterator_state_after_seeking( struct bt_self_component_port_input_message_iterator *iterator, - enum bt_message_iterator_status status) + int status) { enum bt_self_component_port_input_message_iterator_state new_state = 0; /* Set iterator's state depending on seeking status */ switch (status) { - case BT_MESSAGE_ITERATOR_STATUS_OK: + case BT_FUNC_STATUS_OK: new_state = BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE; break; - case BT_MESSAGE_ITERATOR_STATUS_AGAIN: + case BT_FUNC_STATUS_AGAIN: new_state = BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_LAST_SEEKING_RETURNED_AGAIN; break; - case BT_MESSAGE_ITERATOR_STATUS_ERROR: - case BT_MESSAGE_ITERATOR_STATUS_NOMEM: + case BT_FUNC_STATUS_ERROR: + case BT_FUNC_STATUS_MEMORY_ERROR: new_state = BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_LAST_SEEKING_RETURNED_ERROR; break; - case BT_MESSAGE_ITERATOR_STATUS_END: + case BT_FUNC_STATUS_END: new_state = BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ENDED; break; default: - abort(); + bt_common_abort(); } set_self_comp_port_input_msg_iterator_state(iterator, new_state); @@ -1213,12 +1151,29 @@ void reset_iterator_expectations( iterator->clock_expectation.type = CLOCK_EXPECTATION_UNSET; } -enum bt_message_iterator_status +static +bool message_iterator_can_seek_beginning( + struct bt_self_component_port_input_message_iterator *iterator) +{ + enum bt_message_iterator_can_seek_beginning_status status; + bt_bool can_seek; + + status = bt_self_component_port_input_message_iterator_can_seek_beginning( + iterator, &can_seek); + if (status != BT_FUNC_STATUS_OK) { + can_seek = BT_FALSE; + } + + return can_seek; +} + +enum bt_message_iterator_seek_beginning_status bt_self_component_port_input_message_iterator_seek_beginning( struct bt_self_component_port_input_message_iterator *iterator) { int status; + BT_ASSERT_PRE_NO_ERROR(); BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); BT_ASSERT_PRE_ITER_HAS_STATE_TO_SEEK(iterator); BT_ASSERT_PRE( @@ -1226,9 +1181,7 @@ bt_self_component_port_input_message_iterator_seek_beginning( BT_GRAPH_CONFIGURATION_STATE_CONFIGURING, "Graph is not configured: %!+g", bt_component_borrow_graph(iterator->upstream_component)); - BT_ASSERT_PRE( - bt_self_component_port_input_message_iterator_can_seek_beginning( - iterator), + BT_ASSERT_PRE(message_iterator_can_seek_beginning(iterator), "Message iterator cannot seek beginning: %!+i", iterator); /* @@ -1242,17 +1195,33 @@ bt_self_component_port_input_message_iterator_seek_beginning( BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_SEEKING); status = iterator->methods.seek_beginning(iterator); BT_LOGD("User method returned: status=%s", - bt_message_iterator_status_string(status)); - BT_ASSERT_POST(status == BT_MESSAGE_ITERATOR_STATUS_OK || - status == BT_MESSAGE_ITERATOR_STATUS_ERROR || - status == BT_MESSAGE_ITERATOR_STATUS_NOMEM || - status == BT_MESSAGE_ITERATOR_STATUS_AGAIN, + bt_common_func_status_string(status)); + BT_ASSERT_POST(status == BT_FUNC_STATUS_OK || + status == BT_FUNC_STATUS_ERROR || + status == BT_FUNC_STATUS_MEMORY_ERROR || + status == BT_FUNC_STATUS_AGAIN, "Unexpected status: %![iter-]+i, status=%s", - iterator, bt_common_self_message_iterator_status_string(status)); + iterator, bt_common_func_status_string(status)); + BT_ASSERT_POST_NO_ERROR_IF_NO_ERROR_STATUS(status); + if (status < 0) { + BT_LIB_LOGW_APPEND_CAUSE( + "Component input port message iterator's \"seek beginning\" method failed: " + "%![iter-]+i, status=%s", + iterator, bt_common_func_status_string(status)); + } + set_iterator_state_after_seeking(iterator, status); return status; } +bt_bool +bt_self_component_port_input_message_iterator_can_seek_forward( + bt_self_component_port_input_message_iterator *iterator) +{ + BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); + + return iterator->config.can_seek_forward; +} /* * Structure used to record the state of a given stream during the fast-forward @@ -1263,20 +1232,18 @@ struct auto_seek_stream_state { * Value representing which step of this timeline we are at. * * time ---> - * [SB] 1 [SAB] 2 [PB] 3 [PE] 2 [SAE] 1 [SE] + * [SB] 1 [PB] 2 [PE] 1 [SE] * * At each point in the timeline, the messages we need to replicate are: * * 1: Stream beginning - * 2: Stream beginning, stream activity beginning - * 3: Stream beginning, stream activity beginning, packet beginning + * 2: Stream beginning, packet beginning * * Before "Stream beginning" and after "Stream end", we don't need to * replicate anything as the stream doesn't exist. */ enum { AUTO_SEEK_STREAM_STATE_STREAM_BEGAN, - AUTO_SEEK_STREAM_STATE_STREAM_ACTIVITY_BEGAN, AUTO_SEEK_STREAM_STATE_PACKET_BEGAN, } state; @@ -1286,6 +1253,9 @@ struct auto_seek_stream_state { * alive by the time we use it. */ struct bt_packet *packet; + + /* Have we see a message with a clock snapshot yet? */ + bool seen_clock_snapshot; }; static @@ -1326,18 +1296,18 @@ void destroy_auto_seek_stream_states(GHashTable *stream_states) */ static inline -enum bt_message_iterator_status auto_seek_handle_message( +int auto_seek_handle_message( struct bt_self_component_port_input_message_iterator *iterator, int64_t ns_from_origin, const struct bt_message *msg, bool *got_first, GHashTable *stream_states) { - enum bt_message_iterator_status status = BT_MESSAGE_ITERATOR_STATUS_OK; + int status = BT_FUNC_STATUS_OK; int64_t msg_ns_from_origin; const struct bt_clock_snapshot *clk_snapshot = NULL; int ret; - BT_ASSERT(msg); - BT_ASSERT(got_first); + BT_ASSERT_DBG(msg); + BT_ASSERT_DBG(got_first); switch (msg->type) { case BT_MESSAGE_TYPE_EVENT: @@ -1346,7 +1316,7 @@ enum bt_message_iterator_status auto_seek_handle_message( (const void *) msg; clk_snapshot = event_msg->default_cs; - BT_ASSERT_POST(clk_snapshot, + BT_ASSERT_POST_DEV(clk_snapshot, "Event message has no default clock snapshot: %!+n", event_msg); break; @@ -1357,7 +1327,7 @@ enum bt_message_iterator_status auto_seek_handle_message( (const void *) msg; clk_snapshot = inactivity_msg->default_cs; - BT_ASSERT(clk_snapshot); + BT_ASSERT_DBG(clk_snapshot); break; } case BT_MESSAGE_TYPE_PACKET_BEGINNING: @@ -1367,7 +1337,7 @@ enum bt_message_iterator_status auto_seek_handle_message( (const void *) msg; clk_snapshot = packet_msg->default_cs; - BT_ASSERT_POST(clk_snapshot, + BT_ASSERT_POST_DEV(clk_snapshot, "Packet message has no default clock snapshot: %!+n", packet_msg); break; @@ -1378,7 +1348,7 @@ enum bt_message_iterator_status auto_seek_handle_message( struct bt_message_discarded_items *msg_disc_items = (void *) msg; - BT_ASSERT_POST(msg_disc_items->default_begin_cs && + BT_ASSERT_POST_DEV(msg_disc_items->default_begin_cs && msg_disc_items->default_end_cs, "Discarded events/packets message has no default clock snapshots: %!+n", msg_disc_items); @@ -1386,7 +1356,7 @@ enum bt_message_iterator_status auto_seek_handle_message( msg_disc_items->default_begin_cs, &msg_ns_from_origin); if (ret) { - status = BT_MESSAGE_ITERATOR_STATUS_ERROR; + status = BT_FUNC_STATUS_ERROR; goto end; } @@ -1399,7 +1369,7 @@ enum bt_message_iterator_status auto_seek_handle_message( msg_disc_items->default_end_cs, &msg_ns_from_origin); if (ret) { - status = BT_MESSAGE_ITERATOR_STATUS_ERROR; + status = BT_FUNC_STATUS_ERROR; goto end; } @@ -1413,13 +1383,13 @@ enum bt_message_iterator_status auto_seek_handle_message( * as we don't know if items were really * discarded within the new time range. */ - uint64_t new_begin_raw_value; + uint64_t new_begin_raw_value = 0; ret = bt_clock_class_clock_value_from_ns_from_origin( msg_disc_items->default_end_cs->clock_class, ns_from_origin, &new_begin_raw_value); if (ret) { - status = BT_MESSAGE_ITERATOR_STATUS_ERROR; + status = BT_FUNC_STATUS_ERROR; goto end; } @@ -1438,72 +1408,29 @@ enum bt_message_iterator_status auto_seek_handle_message( goto skip_msg; } } - case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING: - { - const struct bt_message_stream_activity *stream_act_msg = - (const void *) msg; - - switch (stream_act_msg->default_cs_state) { - case BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_UNKNOWN: - case BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_INFINITE: - /* - * -inf is always less than any requested time, - * and we can't assume any specific time for an - * unknown clock snapshot, so skip this. - */ - goto skip_msg; - case BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN: - clk_snapshot = stream_act_msg->default_cs; - BT_ASSERT(clk_snapshot); - break; - default: - abort(); - } - - break; - } - case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END: + case BT_MESSAGE_TYPE_STREAM_BEGINNING: + case BT_MESSAGE_TYPE_STREAM_END: { - const struct bt_message_stream_activity *stream_act_msg = - (const void *) msg; + struct bt_message_stream *stream_msg = + (struct bt_message_stream *) msg; - switch (stream_act_msg->default_cs_state) { - case BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_UNKNOWN: - /* - * We can't assume any specific time for an - * unknown clock snapshot, so skip this. - */ + if (stream_msg->default_cs_state != BT_MESSAGE_STREAM_CLOCK_SNAPSHOT_STATE_KNOWN) { + /* Ignore */ goto skip_msg; - case BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_INFINITE: - /* - * +inf is always greater than any requested - * time. - */ - *got_first = true; - goto push_msg; - case BT_MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN: - clk_snapshot = stream_act_msg->default_cs; - BT_ASSERT(clk_snapshot); - break; - default: - abort(); } + clk_snapshot = stream_msg->default_cs; break; } - case BT_MESSAGE_TYPE_STREAM_BEGINNING: - case BT_MESSAGE_TYPE_STREAM_END: - /* Ignore */ - goto skip_msg; default: - abort(); + bt_common_abort(); } - BT_ASSERT(clk_snapshot); + BT_ASSERT_DBG(clk_snapshot); ret = bt_clock_snapshot_get_ns_from_origin(clk_snapshot, &msg_ns_from_origin); if (ret) { - status = BT_MESSAGE_ITERATOR_STATUS_ERROR; + status = BT_FUNC_STATUS_ERROR; goto end; } @@ -1523,45 +1450,52 @@ skip_msg: /* Update stream's state: stream began. */ stream_state = create_auto_seek_stream_state(); if (!stream_state) { - status = BT_MESSAGE_ITERATOR_STATUS_NOMEM; + status = BT_FUNC_STATUS_MEMORY_ERROR; goto end; } stream_state->state = AUTO_SEEK_STREAM_STATE_STREAM_BEGAN; - BT_ASSERT(!bt_g_hash_table_contains(stream_states, stream_msg->stream)); + if (stream_msg->default_cs_state == BT_MESSAGE_STREAM_CLOCK_SNAPSHOT_STATE_KNOWN) { + stream_state->seen_clock_snapshot = true; + } + + BT_ASSERT_DBG(!bt_g_hash_table_contains(stream_states, stream_msg->stream)); g_hash_table_insert(stream_states, stream_msg->stream, stream_state); break; } - case BT_MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING: + case BT_MESSAGE_TYPE_PACKET_BEGINNING: { - const struct bt_message_stream_activity *stream_act_msg = + const struct bt_message_packet *packet_msg = (const void *) msg; struct auto_seek_stream_state *stream_state; - /* Update stream's state: stream activity began. */ - stream_state = g_hash_table_lookup(stream_states, stream_act_msg->stream); - BT_ASSERT(stream_state); + /* Update stream's state: packet began. */ + stream_state = g_hash_table_lookup(stream_states, packet_msg->packet->stream); + BT_ASSERT_DBG(stream_state); + BT_ASSERT_DBG(stream_state->state == AUTO_SEEK_STREAM_STATE_STREAM_BEGAN); + stream_state->state = AUTO_SEEK_STREAM_STATE_PACKET_BEGAN; + BT_ASSERT_DBG(!stream_state->packet); + stream_state->packet = packet_msg->packet; + + if (packet_msg->packet->stream->class->packets_have_beginning_default_clock_snapshot) { + stream_state->seen_clock_snapshot = true; + } - BT_ASSERT(stream_state->state == AUTO_SEEK_STREAM_STATE_STREAM_BEGAN); - stream_state->state = AUTO_SEEK_STREAM_STATE_STREAM_ACTIVITY_BEGAN; - BT_ASSERT(!stream_state->packet); break; } - case BT_MESSAGE_TYPE_PACKET_BEGINNING: + case BT_MESSAGE_TYPE_EVENT: { - const struct bt_message_packet *packet_msg = - (const void *) msg; + const struct bt_message_event *event_msg = (const void *) msg; struct auto_seek_stream_state *stream_state; - /* Update stream's state: packet began. */ - stream_state = g_hash_table_lookup(stream_states, packet_msg->packet->stream); - BT_ASSERT(stream_state); + stream_state = g_hash_table_lookup(stream_states, + event_msg->event->packet->stream); + BT_ASSERT_DBG(stream_state); + + // HELPME: are we sure that event messages have clock snapshots at this point? + stream_state->seen_clock_snapshot = true; - BT_ASSERT(stream_state->state == AUTO_SEEK_STREAM_STATE_STREAM_ACTIVITY_BEGAN); - stream_state->state = AUTO_SEEK_STREAM_STATE_PACKET_BEGAN; - BT_ASSERT(!stream_state->packet); - stream_state->packet = packet_msg->packet; break; } case BT_MESSAGE_TYPE_PACKET_END: @@ -1572,27 +1506,16 @@ skip_msg: /* Update stream's state: packet ended. */ stream_state = g_hash_table_lookup(stream_states, packet_msg->packet->stream); - BT_ASSERT(stream_state); - - BT_ASSERT(stream_state->state == AUTO_SEEK_STREAM_STATE_PACKET_BEGAN); - stream_state->state = AUTO_SEEK_STREAM_STATE_STREAM_ACTIVITY_BEGAN; - BT_ASSERT(stream_state->packet); + BT_ASSERT_DBG(stream_state); + BT_ASSERT_DBG(stream_state->state == AUTO_SEEK_STREAM_STATE_PACKET_BEGAN); + stream_state->state = AUTO_SEEK_STREAM_STATE_STREAM_BEGAN; + BT_ASSERT_DBG(stream_state->packet); stream_state->packet = NULL; - break; - } - case BT_MESSAGE_TYPE_STREAM_ACTIVITY_END: - { - const struct bt_message_stream_activity *stream_act_msg = - (const void *) msg; - struct auto_seek_stream_state *stream_state; - /* Update stream's state: stream activity ended. */ - stream_state = g_hash_table_lookup(stream_states, stream_act_msg->stream); - BT_ASSERT(stream_state); + if (packet_msg->packet->stream->class->packets_have_end_default_clock_snapshot) { + stream_state->seen_clock_snapshot = true; + } - BT_ASSERT(stream_state->state == AUTO_SEEK_STREAM_STATE_STREAM_ACTIVITY_BEGAN); - stream_state->state = AUTO_SEEK_STREAM_STATE_STREAM_BEGAN; - BT_ASSERT(!stream_state->packet); break; } case BT_MESSAGE_TYPE_STREAM_END: @@ -1601,19 +1524,36 @@ skip_msg: struct auto_seek_stream_state *stream_state; stream_state = g_hash_table_lookup(stream_states, stream_msg->stream); - BT_ASSERT(stream_state); - BT_ASSERT(stream_state->state == AUTO_SEEK_STREAM_STATE_STREAM_BEGAN); - BT_ASSERT(!stream_state->packet); + BT_ASSERT_DBG(stream_state); + BT_ASSERT_DBG(stream_state->state == AUTO_SEEK_STREAM_STATE_STREAM_BEGAN); + BT_ASSERT_DBG(!stream_state->packet); /* Update stream's state: this stream doesn't exist anymore. */ g_hash_table_remove(stream_states, stream_msg->stream); break; } + case BT_MESSAGE_TYPE_DISCARDED_EVENTS: + case BT_MESSAGE_TYPE_DISCARDED_PACKETS: + { + const struct bt_message_discarded_items *discarded_msg = + (const void *) msg; + struct auto_seek_stream_state *stream_state; + + stream_state = g_hash_table_lookup(stream_states, discarded_msg->stream); + BT_ASSERT_DBG(stream_state); + + if ((msg->type == BT_MESSAGE_TYPE_DISCARDED_EVENTS && discarded_msg->stream->class->discarded_events_have_default_clock_snapshots) || + (msg->type == BT_MESSAGE_TYPE_DISCARDED_PACKETS && discarded_msg->stream->class->discarded_packets_have_default_clock_snapshots)) { + stream_state->seen_clock_snapshot = true; + } + + break; + } default: break; } - bt_object_put_no_null_check(msg); + bt_object_put_ref_no_null_check(msg); msg = NULL; goto end; @@ -1622,16 +1562,16 @@ push_msg: msg = NULL; end: - BT_ASSERT(!msg || status != BT_MESSAGE_ITERATOR_STATUS_OK); + BT_ASSERT_DBG(!msg || status != BT_FUNC_STATUS_OK); return status; } static -enum bt_message_iterator_status find_message_ge_ns_from_origin( +int find_message_ge_ns_from_origin( struct bt_self_component_port_input_message_iterator *iterator, int64_t ns_from_origin, GHashTable *stream_states) { - int status; + int status = BT_FUNC_STATUS_OK; enum bt_self_component_port_input_message_iterator_state init_state = iterator->state; const struct bt_message *messages[MSG_BATCH_SIZE]; @@ -1639,7 +1579,7 @@ enum bt_message_iterator_status find_message_ge_ns_from_origin( uint64_t i; bool got_first = false; - BT_ASSERT(iterator); + BT_ASSERT_DBG(iterator); memset(&messages[0], 0, sizeof(messages[0]) * MSG_BATCH_SIZE); /* @@ -1649,7 +1589,7 @@ enum bt_message_iterator_status find_message_ge_ns_from_origin( set_self_comp_port_input_msg_iterator_state(iterator, BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE); - BT_ASSERT(iterator->methods.next); + BT_ASSERT_DBG(iterator->methods.next); while (!got_first) { /* @@ -1658,28 +1598,36 @@ enum bt_message_iterator_status find_message_ge_ns_from_origin( */ status = call_iterator_next_method(iterator, &messages[0], MSG_BATCH_SIZE, &user_count); + BT_LOGD("User method returned: status=%s", + bt_common_func_status_string(status)); + if (status < 0) { + BT_LIB_LOGW_APPEND_CAUSE( + "Component input port message iterator's \"next\" method failed: " + "%![iter-]+i, status=%s", + iterator, bt_common_func_status_string(status)); + } /* * The user's "next" method must not do any action which * would change the iterator's state. */ - BT_ASSERT(iterator->state == + BT_ASSERT_DBG(iterator->state == BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE); switch (status) { - case BT_MESSAGE_ITERATOR_STATUS_OK: - BT_ASSERT_POST(user_count <= MSG_BATCH_SIZE, + case BT_FUNC_STATUS_OK: + BT_ASSERT_POST_DEV(user_count <= MSG_BATCH_SIZE, "Invalid returned message count: greater than " "batch size: count=%" PRIu64 ", batch-size=%u", user_count, MSG_BATCH_SIZE); break; - case BT_MESSAGE_ITERATOR_STATUS_AGAIN: - case BT_MESSAGE_ITERATOR_STATUS_ERROR: - case BT_MESSAGE_ITERATOR_STATUS_NOMEM: - case BT_MESSAGE_ITERATOR_STATUS_END: + case BT_FUNC_STATUS_AGAIN: + case BT_FUNC_STATUS_ERROR: + case BT_FUNC_STATUS_MEMORY_ERROR: + case BT_FUNC_STATUS_END: goto end; default: - abort(); + bt_common_abort(); } for (i = 0; i < user_count; i++) { @@ -1693,7 +1641,7 @@ enum bt_message_iterator_status find_message_ge_ns_from_origin( status = auto_seek_handle_message(iterator, ns_from_origin, messages[i], &got_first, stream_states); - if (status == BT_MESSAGE_ITERATOR_STATUS_OK) { + if (status == BT_FUNC_STATUS_OK) { /* Message was either pushed or moved */ messages[i] = NULL; } else { @@ -1705,7 +1653,7 @@ enum bt_message_iterator_status find_message_ge_ns_from_origin( end: for (i = 0; i < user_count; i++) { if (messages[i]) { - bt_object_put_no_null_check(messages[i]); + bt_object_put_ref_no_null_check(messages[i]); } } @@ -1721,7 +1669,7 @@ end: */ static -enum bt_self_message_iterator_status post_auto_seek_next( +enum bt_message_iterator_class_next_method_status post_auto_seek_next( struct bt_self_component_port_input_message_iterator *iterator, bt_message_array_const msgs, uint64_t capacity, uint64_t *count) @@ -1748,7 +1696,7 @@ enum bt_self_message_iterator_status post_auto_seek_next( iterator->auto_seek.original_next_callback = NULL; } - return BT_SELF_MESSAGE_ITERATOR_STATUS_OK; + return BT_FUNC_STATUS_OK; } static inline @@ -1764,15 +1712,33 @@ int clock_raw_value_from_ns_from_origin(const bt_clock_class *clock_class, cc_offset_cycles, cc_freq, ns_from_origin, raw_value); } +static +bool message_iterator_can_seek_ns_from_origin( + struct bt_self_component_port_input_message_iterator *iterator, + int64_t ns_from_origin) +{ + enum bt_message_iterator_can_seek_ns_from_origin_status status; + bt_bool can_seek; + + status = bt_self_component_port_input_message_iterator_can_seek_ns_from_origin( + iterator, ns_from_origin, &can_seek); + if (status != BT_FUNC_STATUS_OK) { + can_seek = BT_FALSE; + } + + return can_seek; +} -enum bt_message_iterator_status +enum bt_message_iterator_seek_ns_from_origin_status bt_self_component_port_input_message_iterator_seek_ns_from_origin( struct bt_self_component_port_input_message_iterator *iterator, int64_t ns_from_origin) { int status; GHashTable *stream_states = NULL; + bt_bool can_seek_by_itself; + BT_ASSERT_PRE_NO_ERROR(); BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); BT_ASSERT_PRE_ITER_HAS_STATE_TO_SEEK(iterator); BT_ASSERT_PRE( @@ -1780,9 +1746,9 @@ bt_self_component_port_input_message_iterator_seek_ns_from_origin( BT_GRAPH_CONFIGURATION_STATE_CONFIGURING, "Graph is not configured: %!+g", bt_component_borrow_graph(iterator->upstream_component)); + /* The iterator must be able to seek ns from origin one way or another. */ BT_ASSERT_PRE( - bt_self_component_port_input_message_iterator_can_seek_ns_from_origin( - iterator, ns_from_origin), + message_iterator_can_seek_ns_from_origin(iterator, ns_from_origin), "Message iterator cannot seek nanoseconds from origin: %!+i, " "ns-from-origin=%" PRId64, iterator, ns_from_origin); set_self_comp_port_input_msg_iterator_state(iterator, @@ -1794,49 +1760,85 @@ bt_self_component_port_input_message_iterator_seek_ns_from_origin( */ reset_iterator_expectations(iterator); - if (iterator->methods.seek_ns_from_origin) { + /* Check if the iterator can seek by itself. If not we'll use autoseek. */ + if (iterator->methods.can_seek_ns_from_origin) { + bt_message_iterator_class_can_seek_ns_from_origin_method_status + can_seek_status; + + can_seek_status = + iterator->methods.can_seek_ns_from_origin( + iterator, ns_from_origin, &can_seek_by_itself); + if (can_seek_status != BT_FUNC_STATUS_OK) { + status = can_seek_status; + goto end; + } + } else { + can_seek_by_itself = false; + } + + if (can_seek_by_itself) { /* The iterator knows how to seek to a particular time, let it handle this. */ + BT_ASSERT(iterator->methods.seek_ns_from_origin); BT_LIB_LOGD("Calling user's \"seek nanoseconds from origin\" method: " "%![iter-]+i, ns=%" PRId64, iterator, ns_from_origin); status = iterator->methods.seek_ns_from_origin(iterator, ns_from_origin); BT_LOGD("User method returned: status=%s", - bt_message_iterator_status_string(status)); - BT_ASSERT_POST(status == BT_MESSAGE_ITERATOR_STATUS_OK || - status == BT_MESSAGE_ITERATOR_STATUS_ERROR || - status == BT_MESSAGE_ITERATOR_STATUS_NOMEM || - status == BT_MESSAGE_ITERATOR_STATUS_AGAIN, + bt_common_func_status_string(status)); + BT_ASSERT_POST(status == BT_FUNC_STATUS_OK || + status == BT_FUNC_STATUS_ERROR || + status == BT_FUNC_STATUS_MEMORY_ERROR || + status == BT_FUNC_STATUS_AGAIN, "Unexpected status: %![iter-]+i, status=%s", - iterator, - bt_common_self_message_iterator_status_string(status)); + iterator, bt_common_func_status_string(status)); + BT_ASSERT_POST_NO_ERROR_IF_NO_ERROR_STATUS(status); + if (status < 0) { + BT_LIB_LOGW_APPEND_CAUSE( + "Component input port message iterator's \"seek nanoseconds from origin\" method failed: " + "%![iter-]+i, status=%s", + iterator, bt_common_func_status_string(status)); + } } else { /* - * The iterator doesn't know how to seek to a particular time. We will - * seek to the beginning and fast forward to the right place. + * The iterator doesn't know how to seek by itself to a + * particular time. We will seek to the beginning and fast + * forward to the right place. */ - BT_ASSERT(iterator->methods.can_seek_beginning(iterator)); + enum bt_message_iterator_class_can_seek_beginning_method_status can_seek_status; + bt_bool can_seek_beginning; + + can_seek_status = iterator->methods.can_seek_beginning(iterator, + &can_seek_beginning); + BT_ASSERT(can_seek_status == BT_FUNC_STATUS_OK); + BT_ASSERT(can_seek_beginning); BT_ASSERT(iterator->methods.seek_beginning); BT_LIB_LOGD("Calling user's \"seek beginning\" method: %!+i", iterator); status = iterator->methods.seek_beginning(iterator); BT_LOGD("User method returned: status=%s", - bt_message_iterator_status_string(status)); - BT_ASSERT_POST(status == BT_MESSAGE_ITERATOR_STATUS_OK || - status == BT_MESSAGE_ITERATOR_STATUS_ERROR || - status == BT_MESSAGE_ITERATOR_STATUS_NOMEM || - status == BT_MESSAGE_ITERATOR_STATUS_AGAIN, + bt_common_func_status_string(status)); + BT_ASSERT_POST(status == BT_FUNC_STATUS_OK || + status == BT_FUNC_STATUS_ERROR || + status == BT_FUNC_STATUS_MEMORY_ERROR || + status == BT_FUNC_STATUS_AGAIN, "Unexpected status: %![iter-]+i, status=%s", - iterator, - bt_common_self_message_iterator_status_string(status)); + iterator, bt_common_func_status_string(status)); + if (status < 0) { + BT_LIB_LOGW_APPEND_CAUSE( + "Component input port message iterator's \"seek beginning\" method failed: " + "%![iter-]+i, status=%s", + iterator, bt_common_func_status_string(status)); + } + switch (status) { - case BT_MESSAGE_ITERATOR_STATUS_OK: + case BT_FUNC_STATUS_OK: break; - case BT_MESSAGE_ITERATOR_STATUS_ERROR: - case BT_MESSAGE_ITERATOR_STATUS_NOMEM: - case BT_MESSAGE_ITERATOR_STATUS_AGAIN: + case BT_FUNC_STATUS_ERROR: + case BT_FUNC_STATUS_MEMORY_ERROR: + case BT_FUNC_STATUS_AGAIN: goto end; default: - abort(); + bt_common_abort(); } /* @@ -1847,22 +1849,23 @@ bt_self_component_port_input_message_iterator_seek_ns_from_origin( * message queue. */ while (!g_queue_is_empty(iterator->auto_seek.msgs)) { - bt_object_put_no_null_check( + bt_object_put_ref_no_null_check( g_queue_pop_tail(iterator->auto_seek.msgs)); } stream_states = create_auto_seek_stream_states(); if (!stream_states) { - BT_LOGE_STR("Failed to allocate one GHashTable."); - status = BT_MESSAGE_ITERATOR_STATUS_NOMEM; + BT_LIB_LOGE_APPEND_CAUSE( + "Failed to allocate one GHashTable."); + status = BT_FUNC_STATUS_MEMORY_ERROR; goto end; } status = find_message_ge_ns_from_origin(iterator, ns_from_origin, stream_states); switch (status) { - case BT_MESSAGE_ITERATOR_STATUS_OK: - case BT_MESSAGE_ITERATOR_STATUS_END: + case BT_FUNC_STATUS_OK: + case BT_FUNC_STATUS_END: { GHashTableIter iter; gpointer key, value; @@ -1880,50 +1883,69 @@ bt_self_component_port_input_message_iterator_seek_ns_from_origin( bt_message *msg; const bt_clock_class *clock_class = bt_stream_class_borrow_default_clock_class_const( bt_stream_borrow_class_const(stream)); - uint64_t raw_value; - - if (clock_raw_value_from_ns_from_origin(clock_class, ns_from_origin, &raw_value) != 0) { - BT_LIB_LOGW("Could not convert nanoseconds from origin to clock value: ns-from-origin=%" PRId64 ", %![cc-]+K", - ns_from_origin, clock_class); - status = BT_MESSAGE_ITERATOR_STATUS_ERROR; - goto end; + /* Initialize to silence maybe-uninitialized warning. */ + uint64_t raw_value = 0; + + /* + * If we haven't seen a message with a clock snapshot, we don't know if our seek time is within + * the clock's range, so it wouldn't be safe to try to convert ns_from_origin to a clock value. + * + * Also, it would be a bit of a lie to generate a stream begin message with the seek time as its + * clock snapshot, because we don't really know if the stream existed at that time. If we have + * seen a message with a clock snapshot in our seeking, then we are sure that the + * seek time is not below the clock range, and we know the stream was active at that + * time (and that we cut it short). + */ + if (stream_state->seen_clock_snapshot) { + if (clock_raw_value_from_ns_from_origin(clock_class, ns_from_origin, &raw_value) != 0) { + BT_LIB_LOGW("Could not convert nanoseconds from origin to clock value: ns-from-origin=%" PRId64 ", %![cc-]+K", + ns_from_origin, clock_class); + status = BT_FUNC_STATUS_ERROR; + goto end; + } } switch (stream_state->state) { case AUTO_SEEK_STREAM_STATE_PACKET_BEGAN: BT_ASSERT(stream_state->packet); BT_LIB_LOGD("Creating packet message: %![packet-]+a", stream_state->packet); - msg = bt_message_packet_beginning_create_with_default_clock_snapshot( - (bt_self_message_iterator *) iterator, stream_state->packet, raw_value); - if (!msg) { - status = BT_MESSAGE_ITERATOR_STATUS_NOMEM; - goto end; + + if (stream->class->packets_have_beginning_default_clock_snapshot) { + /* + * If we are in the PACKET_BEGAN state, it means we have seen a "packet beginning" + * message. If "packet beginning" packets have clock snapshots, then we must have + * seen a clock snapshot. + */ + BT_ASSERT(stream_state->seen_clock_snapshot); + + msg = bt_message_packet_beginning_create_with_default_clock_snapshot( + (bt_self_message_iterator *) iterator, stream_state->packet, raw_value); + } else { + msg = bt_message_packet_beginning_create((bt_self_message_iterator *) iterator, + stream_state->packet); } - g_queue_push_head(iterator->auto_seek.msgs, msg); - msg = NULL; - /* fall-thru */ - case AUTO_SEEK_STREAM_STATE_STREAM_ACTIVITY_BEGAN: - msg = bt_message_stream_activity_beginning_create( - (bt_self_message_iterator *) iterator, stream); if (!msg) { - status = BT_MESSAGE_ITERATOR_STATUS_NOMEM; + status = BT_FUNC_STATUS_MEMORY_ERROR; goto end; } - bt_message_stream_activity_beginning_set_default_clock_snapshot(msg, raw_value); - g_queue_push_head(iterator->auto_seek.msgs, msg); msg = NULL; /* fall-thru */ + case AUTO_SEEK_STREAM_STATE_STREAM_BEGAN: msg = bt_message_stream_beginning_create( (bt_self_message_iterator *) iterator, stream); if (!msg) { - status = BT_MESSAGE_ITERATOR_STATUS_NOMEM; + status = BT_FUNC_STATUS_MEMORY_ERROR; goto end; } + if (stream_state->seen_clock_snapshot) { + bt_message_stream_beginning_set_default_clock_snapshot(msg, raw_value); + } + g_queue_push_head(iterator->auto_seek.msgs, msg); msg = NULL; break; @@ -1946,21 +1968,21 @@ bt_self_component_port_input_message_iterator_seek_ns_from_origin( } /* - * `BT_MESSAGE_ITERATOR_STATUS_END` becomes - * `BT_MESSAGE_ITERATOR_STATUS_OK`: the next + * `BT_FUNC_STATUS_END` becomes + * `BT_FUNC_STATUS_OK`: the next * time this iterator's "next" method is called, * it will return - * `BT_MESSAGE_ITERATOR_STATUS_END`. + * `BT_FUNC_STATUS_END`. */ - status = BT_MESSAGE_ITERATOR_STATUS_OK; + status = BT_FUNC_STATUS_OK; break; } - case BT_MESSAGE_ITERATOR_STATUS_ERROR: - case BT_MESSAGE_ITERATOR_STATUS_NOMEM: - case BT_MESSAGE_ITERATOR_STATUS_AGAIN: + case BT_FUNC_STATUS_ERROR: + case BT_FUNC_STATUS_MEMORY_ERROR: + case BT_FUNC_STATUS_AGAIN: goto end; default: - abort(); + bt_common_abort(); } } @@ -1975,72 +1997,19 @@ end: destroy_auto_seek_stream_states(stream_states); stream_states = NULL; } + set_iterator_state_after_seeking(iterator, status); return status; } -static inline -bt_self_component_port_input_message_iterator * -borrow_output_port_message_iterator_upstream_iterator( - struct bt_port_output_message_iterator *iterator) -{ - struct bt_component_class_sink_colander_priv_data *colander_data; - - BT_ASSERT(iterator); - colander_data = (void *) iterator->colander->parent.user_data; - BT_ASSERT(colander_data); - BT_ASSERT(colander_data->msg_iter); - return colander_data->msg_iter; -} - -bt_bool bt_port_output_message_iterator_can_seek_ns_from_origin( - struct bt_port_output_message_iterator *iterator, - int64_t ns_from_origin) +bt_bool bt_self_message_iterator_is_interrupted( + const struct bt_self_message_iterator *self_msg_iter) { - BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); - return bt_self_component_port_input_message_iterator_can_seek_ns_from_origin( - borrow_output_port_message_iterator_upstream_iterator( - iterator), ns_from_origin); -} + const struct bt_self_component_port_input_message_iterator *iterator = + (const void *) self_msg_iter; -bt_bool bt_port_output_message_iterator_can_seek_beginning( - struct bt_port_output_message_iterator *iterator) -{ BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); - return bt_self_component_port_input_message_iterator_can_seek_beginning( - borrow_output_port_message_iterator_upstream_iterator( - iterator)); -} - -enum bt_message_iterator_status bt_port_output_message_iterator_seek_ns_from_origin( - struct bt_port_output_message_iterator *iterator, - int64_t ns_from_origin) -{ - BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); - return bt_self_component_port_input_message_iterator_seek_ns_from_origin( - borrow_output_port_message_iterator_upstream_iterator(iterator), - ns_from_origin); -} - -enum bt_message_iterator_status bt_port_output_message_iterator_seek_beginning( - struct bt_port_output_message_iterator *iterator) -{ - BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); - return bt_self_component_port_input_message_iterator_seek_beginning( - borrow_output_port_message_iterator_upstream_iterator( - iterator)); -} - -void bt_port_output_message_iterator_get_ref( - const struct bt_port_output_message_iterator *iterator) -{ - bt_object_get_ref(iterator); -} - -void bt_port_output_message_iterator_put_ref( - const struct bt_port_output_message_iterator *iterator) -{ - bt_object_put_ref(iterator); + return (bt_bool) bt_graph_is_interrupted(iterator->graph); } void bt_self_component_port_input_message_iterator_get_ref(