lib: run most of bt_self_component_port_input_message_iterator_try_finalize when...
[babeltrace.git] / src / lib / graph / iterator.c
index 511a1f981ba020c18c91285ba4e139d17b806f64..3ef5fd57d226a6e17d7a69157dedf79a1831fec7 100644 (file)
@@ -60,6 +60,7 @@
 #include "lib/assert-post.h"
 #include <stdint.h>
 #include <inttypes.h>
+#include <stdbool.h>
 #include <stdlib.h>
 
 #include "component-class.h"
@@ -95,7 +96,7 @@ void set_self_comp_port_input_msg_iterator_state(
                struct bt_self_component_port_input_message_iterator *iterator,
                enum bt_self_component_port_input_message_iterator_state state)
 {
-       BT_ASSERT(iterator);
+       BT_ASSERT_DBG(iterator);
        BT_LIB_LOGD("Updating message iterator's state: new-state=%s",
                bt_self_component_port_input_message_iterator_state_string(state));
        iterator->state = state;
@@ -137,7 +138,7 @@ void bt_self_component_port_input_message_iterator_destroy(struct bt_object *obj
 
        if (iterator->auto_seek.msgs) {
                while (!g_queue_is_empty(iterator->auto_seek.msgs)) {
-                       bt_object_put_no_null_check(
+                       bt_object_put_ref_no_null_check(
                                g_queue_pop_tail(iterator->auto_seek.msgs));
                }
 
@@ -169,18 +170,24 @@ void bt_self_component_port_input_message_iterator_try_finalize(
 {
        uint64_t i;
        typedef void (*method_t)(void *);
-
-       struct bt_component_class *comp_class = NULL;
-       method_t method = NULL;
+       bool call_user_finalize = true;
 
        BT_ASSERT(iterator);
 
        switch (iterator->state) {
        case BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_NON_INITIALIZED:
-               /* Skip user finalization if user initialization failed */
-               BT_LIB_LOGD("Not finalizing non-initialized message iterator: "
-                       "%!+i", iterator);
-               goto end;
+               /*
+                * If this function is called while the iterator is in the
+                * NON_INITIALIZED state, it means the user initialization
+                * method has either not been called, or has failed.  We
+                * therefore don't want to call the user finalization method.
+                * However, the initialization method might have created some
+                * upstream message iterators before failing, so we want to
+                * execute the rest of this function, which unlinks the related
+                * iterators.
+                */
+               call_user_finalize = false;
+               break;
        case BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZED:
                /* Already finalized */
                BT_LIB_LOGD("Not finalizing message iterator: already finalized: "
@@ -190,7 +197,7 @@ void bt_self_component_port_input_message_iterator_try_finalize(
                /* Finalizing */
                BT_LIB_LOGF("Message iterator is already being finalized: "
                        "%!+i", iterator);
-               abort();
+               bt_common_abort();
        default:
                break;
        }
@@ -199,35 +206,48 @@ void bt_self_component_port_input_message_iterator_try_finalize(
        set_self_comp_port_input_msg_iterator_state(iterator,
                BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_FINALIZING);
        BT_ASSERT(iterator->upstream_component);
-       comp_class = iterator->upstream_component->class;
 
        /* Call user-defined destroy method */
-       switch (comp_class->type) {
-       case BT_COMPONENT_CLASS_TYPE_SOURCE:
-       {
-               struct bt_component_class_source *src_comp_cls =
-                       (void *) comp_class;
+       if (call_user_finalize) {
+               method_t method = NULL;
+               struct bt_component_class *comp_class =
+                       iterator->upstream_component->class;
 
-               method = (method_t) src_comp_cls->methods.msg_iter_finalize;
-               break;
-       }
-       case BT_COMPONENT_CLASS_TYPE_FILTER:
-       {
-               struct bt_component_class_filter *flt_comp_cls =
-                       (void *) comp_class;
+               switch (comp_class->type) {
+               case BT_COMPONENT_CLASS_TYPE_SOURCE:
+               {
+                       struct bt_component_class_source *src_comp_cls =
+                               (void *) comp_class;
 
-               method = (method_t) flt_comp_cls->methods.msg_iter_finalize;
-               break;
-       }
-       default:
-               /* Unreachable */
-               abort();
-       }
+                       method = (method_t) src_comp_cls->methods.msg_iter_finalize;
+                       break;
+               }
+               case BT_COMPONENT_CLASS_TYPE_FILTER:
+               {
+                       struct bt_component_class_filter *flt_comp_cls =
+                               (void *) comp_class;
 
-       if (method) {
-               BT_LIB_LOGD("Calling user's finalization method: %!+i",
-                       iterator);
-               method(iterator);
+                       method = (method_t) flt_comp_cls->methods.msg_iter_finalize;
+                       break;
+               }
+               default:
+                       /* Unreachable */
+                       bt_common_abort();
+               }
+
+               if (method) {
+                       const bt_error *saved_error;
+
+                       saved_error = bt_current_thread_take_error();
+
+                       BT_LIB_LOGD("Calling user's finalization method: %!+i",
+                               iterator);
+                       method(iterator);
+
+                       if (saved_error) {
+                               BT_CURRENT_THREAD_MOVE_ERROR_AND_RESET(saved_error);
+                       }
+               }
        }
 
        /* Detach upstream message iterators */
@@ -427,7 +447,7 @@ int create_self_component_input_port_message_iterator(
                break;
        }
        default:
-               abort();
+               bt_common_abort();
        }
 
        if (iterator->methods.seek_ns_from_origin &&
@@ -465,7 +485,7 @@ int create_self_component_input_port_message_iterator(
        }
        default:
                /* Unreachable */
-               abort();
+               bt_common_abort();
        }
 
        if (init_method) {
@@ -476,6 +496,7 @@ int create_self_component_input_port_message_iterator(
                        upstream_port);
                BT_LOGD("User method returned: status=%s",
                        bt_common_func_status_string(iter_status));
+               BT_ASSERT_POST_NO_ERROR_IF_NO_ERROR_STATUS(iter_status);
                if (iter_status != BT_FUNC_STATUS_OK) {
                        BT_LIB_LOGW_APPEND_CAUSE(
                                "Component input port message iterator initialization method failed: "
@@ -525,6 +546,7 @@ bt_self_component_port_input_message_iterator_create_from_message_iterator(
                struct bt_self_component_port_input *input_port,
                struct bt_self_component_port_input_message_iterator **message_iterator)
 {
+       BT_ASSERT_PRE_NO_ERROR();
        BT_ASSERT_PRE_NON_NULL(self_msg_iter, "Message iterator");
        return create_self_component_input_port_message_iterator(self_msg_iter,
                input_port, message_iterator);
@@ -536,6 +558,7 @@ bt_self_component_port_input_message_iterator_create_from_sink_component(
                struct bt_self_component_port_input *input_port,
                struct bt_self_component_port_input_message_iterator **message_iterator)
 {
+       BT_ASSERT_PRE_NO_ERROR();
        BT_ASSERT_PRE_NON_NULL(self_comp, "Sink component");
        return create_self_component_input_port_message_iterator(NULL,
                input_port, message_iterator);
@@ -642,8 +665,15 @@ bool clock_snapshots_are_monotonic_one(
                goto end;
        }
 
-       clock_snapshot_status = bt_clock_snapshot_get_ns_from_origin(clock_snapshot, &ns_from_origin);
+       clock_snapshot_status = bt_clock_snapshot_get_ns_from_origin(
+               clock_snapshot, &ns_from_origin);
        if (clock_snapshot_status != BT_FUNC_STATUS_OK) {
+               /*
+                * bt_clock_snapshot_get_ns_from_origin can return
+                * OVERFLOW_ERROR.  We don't really want to report an error to
+                * our caller, so just clear it.
+                */
+               bt_current_thread_clear_error();
                goto end;
        }
 
@@ -848,7 +878,7 @@ call_iterator_next_method(
 {
        enum bt_component_class_message_iterator_next_method_status status;
 
-       BT_ASSERT(iterator->methods.next);
+       BT_ASSERT_DBG(iterator->methods.next);
        BT_LOGD_STR("Calling user's \"next\" method.");
        status = iterator->methods.next(iterator, msgs, capacity, user_count);
        BT_LOGD("User method returned: status=%s, msg-count=%" PRIu64,
@@ -861,6 +891,8 @@ call_iterator_next_method(
                        "Clock snapshots are not monotonic");
        }
 
+       BT_ASSERT_POST_DEV_NO_ERROR_IF_NO_ERROR_STATUS(status);
+
        return status;
 }
 
@@ -871,6 +903,7 @@ bt_self_component_port_input_message_iterator_next(
 {
        enum bt_message_iterator_next_status status = BT_FUNC_STATUS_OK;
 
+       BT_ASSERT_PRE_DEV_NO_ERROR();
        BT_ASSERT_PRE_DEV_NON_NULL(iterator, "Message iterator");
        BT_ASSERT_PRE_DEV_NON_NULL(msgs, "Message array (output)");
        BT_ASSERT_PRE_DEV_NON_NULL(user_count, "Message count (output)");
@@ -878,8 +911,8 @@ bt_self_component_port_input_message_iterator_next(
                BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE,
                "Message iterator's \"next\" called, but "
                "message iterator is in the wrong state: %!+i", iterator);
-       BT_ASSERT(iterator->upstream_component);
-       BT_ASSERT(iterator->upstream_component->class);
+       BT_ASSERT_DBG(iterator->upstream_component);
+       BT_ASSERT_DBG(iterator->upstream_component->class);
        BT_ASSERT_PRE_DEV(
                bt_component_borrow_graph(iterator->upstream_component)->config_state !=
                        BT_GRAPH_CONFIGURATION_STATE_CONFIGURING,
@@ -916,7 +949,7 @@ bt_self_component_port_input_message_iterator_next(
         * For the same reason, there is no way that this iterator could
         * have seeked (cannot seek a self message iterator).
         */
-       BT_ASSERT(iterator->state ==
+       BT_ASSERT_DBG(iterator->state ==
                BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE);
 
        switch (status) {
@@ -935,7 +968,7 @@ bt_self_component_port_input_message_iterator_next(
                goto end;
        default:
                /* Unknown non-error status */
-               abort();
+               bt_common_abort();
        }
 
 end:
@@ -950,14 +983,6 @@ bt_self_component_port_input_message_iterator_borrow_component(
        return iterator->upstream_component;
 }
 
-const struct bt_component *
-bt_self_component_port_input_message_iterator_borrow_component_const(
-               const struct bt_self_component_port_input_message_iterator *iterator)
-{
-       BT_ASSERT_PRE_DEV_NON_NULL(iterator, "Message iterator");
-       return iterator->upstream_component;
-}
-
 struct bt_self_component *bt_self_message_iterator_borrow_component(
                struct bt_self_message_iterator *self_iterator)
 {
@@ -985,6 +1010,7 @@ bt_self_component_port_input_message_iterator_can_seek_ns_from_origin(
 {
        enum bt_message_iterator_can_seek_ns_from_origin_status status;
 
+       BT_ASSERT_PRE_NO_ERROR();
        BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
        BT_ASSERT_PRE_NON_NULL(can_seek, "Result (output)");
        BT_ASSERT_PRE_ITER_HAS_STATE_TO_SEEK(iterator);
@@ -1007,6 +1033,8 @@ bt_self_component_port_input_message_iterator_can_seek_ns_from_origin(
                status = (int) iterator->methods.can_seek_ns_from_origin(iterator,
                        ns_from_origin, can_seek);
 
+               BT_ASSERT_POST_NO_ERROR_IF_NO_ERROR_STATUS(status);
+
                if (status != BT_FUNC_STATUS_OK) {
                        BT_LIB_LOGW_APPEND_CAUSE(
                                "Component input port message iterator's \"can seek nanoseconds from origin\" method failed: "
@@ -1053,6 +1081,7 @@ bt_self_component_port_input_message_iterator_can_seek_beginning(
 {
        enum bt_message_iterator_can_seek_beginning_status status;
 
+       BT_ASSERT_PRE_NO_ERROR();
        BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
        BT_ASSERT_PRE_NON_NULL(can_seek, "Result (output)");
        BT_ASSERT_PRE_ITER_HAS_STATE_TO_SEEK(iterator);
@@ -1077,6 +1106,7 @@ bt_self_component_port_input_message_iterator_can_seek_beginning(
                                *can_seek == BT_FALSE,
                        "Unexpected boolean value returned from user's \"can seek beginning\" method: val=%d, %![iter-]+i",
                        *can_seek, iterator);
+               BT_ASSERT_POST_NO_ERROR_IF_NO_ERROR_STATUS(status);
        } else {
                *can_seek = BT_FALSE;
                status = BT_FUNC_STATUS_OK;
@@ -1108,7 +1138,7 @@ void set_iterator_state_after_seeking(
                new_state = BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ENDED;
                break;
        default:
-               abort();
+               bt_common_abort();
        }
 
        set_self_comp_port_input_msg_iterator_state(iterator, new_state);
@@ -1144,6 +1174,7 @@ bt_self_component_port_input_message_iterator_seek_beginning(
 {
        int status;
 
+       BT_ASSERT_PRE_NO_ERROR();
        BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
        BT_ASSERT_PRE_ITER_HAS_STATE_TO_SEEK(iterator);
        BT_ASSERT_PRE(
@@ -1172,6 +1203,7 @@ bt_self_component_port_input_message_iterator_seek_beginning(
                status == BT_FUNC_STATUS_AGAIN,
                "Unexpected status: %![iter-]+i, status=%s",
                iterator, bt_common_func_status_string(status));
+       BT_ASSERT_POST_NO_ERROR_IF_NO_ERROR_STATUS(status);
        if (status < 0) {
                BT_LIB_LOGW_APPEND_CAUSE(
                        "Component input port message iterator's \"seek beginning\" method failed: "
@@ -1275,8 +1307,8 @@ int auto_seek_handle_message(
        const struct bt_clock_snapshot *clk_snapshot = NULL;
        int ret;
 
-       BT_ASSERT(msg);
-       BT_ASSERT(got_first);
+       BT_ASSERT_DBG(msg);
+       BT_ASSERT_DBG(got_first);
 
        switch (msg->type) {
        case BT_MESSAGE_TYPE_EVENT:
@@ -1296,7 +1328,7 @@ int auto_seek_handle_message(
                        (const void *) msg;
 
                clk_snapshot = inactivity_msg->default_cs;
-               BT_ASSERT(clk_snapshot);
+               BT_ASSERT_DBG(clk_snapshot);
                break;
        }
        case BT_MESSAGE_TYPE_PACKET_BEGINNING:
@@ -1392,10 +1424,10 @@ int auto_seek_handle_message(
                break;
        }
        default:
-               abort();
+               bt_common_abort();
        }
 
-       BT_ASSERT(clk_snapshot);
+       BT_ASSERT_DBG(clk_snapshot);
        ret = bt_clock_snapshot_get_ns_from_origin(clk_snapshot,
                &msg_ns_from_origin);
        if (ret) {
@@ -1429,7 +1461,7 @@ skip_msg:
                        stream_state->seen_clock_snapshot = true;
                }
 
-               BT_ASSERT(!bt_g_hash_table_contains(stream_states, stream_msg->stream));
+               BT_ASSERT_DBG(!bt_g_hash_table_contains(stream_states, stream_msg->stream));
                g_hash_table_insert(stream_states, stream_msg->stream, stream_state);
                break;
        }
@@ -1441,11 +1473,10 @@ skip_msg:
 
                /* Update stream's state: packet began. */
                stream_state = g_hash_table_lookup(stream_states, packet_msg->packet->stream);
-               BT_ASSERT(stream_state);
-
-               BT_ASSERT(stream_state->state == AUTO_SEEK_STREAM_STATE_STREAM_BEGAN);
+               BT_ASSERT_DBG(stream_state);
+               BT_ASSERT_DBG(stream_state->state == AUTO_SEEK_STREAM_STATE_STREAM_BEGAN);
                stream_state->state = AUTO_SEEK_STREAM_STATE_PACKET_BEGAN;
-               BT_ASSERT(!stream_state->packet);
+               BT_ASSERT_DBG(!stream_state->packet);
                stream_state->packet = packet_msg->packet;
 
                if (packet_msg->packet->stream->class->packets_have_beginning_default_clock_snapshot) {
@@ -1461,7 +1492,7 @@ skip_msg:
 
                stream_state = g_hash_table_lookup(stream_states,
                        event_msg->event->packet->stream);
-               BT_ASSERT(stream_state);
+               BT_ASSERT_DBG(stream_state);
 
                // HELPME: are we sure that event messages have clock snapshots at this point?
                stream_state->seen_clock_snapshot = true;
@@ -1476,11 +1507,10 @@ skip_msg:
 
                /* Update stream's state: packet ended. */
                stream_state = g_hash_table_lookup(stream_states, packet_msg->packet->stream);
-               BT_ASSERT(stream_state);
-
-               BT_ASSERT(stream_state->state == AUTO_SEEK_STREAM_STATE_PACKET_BEGAN);
+               BT_ASSERT_DBG(stream_state);
+               BT_ASSERT_DBG(stream_state->state == AUTO_SEEK_STREAM_STATE_PACKET_BEGAN);
                stream_state->state = AUTO_SEEK_STREAM_STATE_STREAM_BEGAN;
-               BT_ASSERT(stream_state->packet);
+               BT_ASSERT_DBG(stream_state->packet);
                stream_state->packet = NULL;
 
                if (packet_msg->packet->stream->class->packets_have_end_default_clock_snapshot) {
@@ -1495,9 +1525,9 @@ skip_msg:
                struct auto_seek_stream_state *stream_state;
 
                stream_state = g_hash_table_lookup(stream_states, stream_msg->stream);
-               BT_ASSERT(stream_state);
-               BT_ASSERT(stream_state->state == AUTO_SEEK_STREAM_STATE_STREAM_BEGAN);
-               BT_ASSERT(!stream_state->packet);
+               BT_ASSERT_DBG(stream_state);
+               BT_ASSERT_DBG(stream_state->state == AUTO_SEEK_STREAM_STATE_STREAM_BEGAN);
+               BT_ASSERT_DBG(!stream_state->packet);
 
                /* Update stream's state: this stream doesn't exist anymore. */
                g_hash_table_remove(stream_states, stream_msg->stream);
@@ -1511,7 +1541,7 @@ skip_msg:
                struct auto_seek_stream_state *stream_state;
 
                stream_state = g_hash_table_lookup(stream_states, discarded_msg->stream);
-               BT_ASSERT(stream_state);
+               BT_ASSERT_DBG(stream_state);
 
                if ((msg->type == BT_MESSAGE_TYPE_DISCARDED_EVENTS && discarded_msg->stream->class->discarded_events_have_default_clock_snapshots) ||
                        (msg->type == BT_MESSAGE_TYPE_DISCARDED_PACKETS && discarded_msg->stream->class->discarded_packets_have_default_clock_snapshots)) {
@@ -1524,7 +1554,7 @@ skip_msg:
                break;
        }
 
-       bt_object_put_no_null_check(msg);
+       bt_object_put_ref_no_null_check(msg);
        msg = NULL;
        goto end;
 
@@ -1533,7 +1563,7 @@ push_msg:
        msg = NULL;
 
 end:
-       BT_ASSERT(!msg || status != BT_FUNC_STATUS_OK);
+       BT_ASSERT_DBG(!msg || status != BT_FUNC_STATUS_OK);
        return status;
 }
 
@@ -1542,7 +1572,7 @@ int find_message_ge_ns_from_origin(
                struct bt_self_component_port_input_message_iterator *iterator,
                int64_t ns_from_origin, GHashTable *stream_states)
 {
-       int status;
+       int status = BT_FUNC_STATUS_OK;
        enum bt_self_component_port_input_message_iterator_state init_state =
                iterator->state;
        const struct bt_message *messages[MSG_BATCH_SIZE];
@@ -1550,7 +1580,7 @@ int find_message_ge_ns_from_origin(
        uint64_t i;
        bool got_first = false;
 
-       BT_ASSERT(iterator);
+       BT_ASSERT_DBG(iterator);
        memset(&messages[0], 0, sizeof(messages[0]) * MSG_BATCH_SIZE);
 
        /*
@@ -1560,7 +1590,7 @@ int find_message_ge_ns_from_origin(
        set_self_comp_port_input_msg_iterator_state(iterator,
                BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE);
 
-       BT_ASSERT(iterator->methods.next);
+       BT_ASSERT_DBG(iterator->methods.next);
 
        while (!got_first) {
                /*
@@ -1582,7 +1612,7 @@ int find_message_ge_ns_from_origin(
                 * The user's "next" method must not do any action which
                 * would change the iterator's state.
                 */
-               BT_ASSERT(iterator->state ==
+               BT_ASSERT_DBG(iterator->state ==
                        BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_ACTIVE);
 
                switch (status) {
@@ -1598,7 +1628,7 @@ int find_message_ge_ns_from_origin(
                case BT_FUNC_STATUS_END:
                        goto end;
                default:
-                       abort();
+                       bt_common_abort();
                }
 
                for (i = 0; i < user_count; i++) {
@@ -1624,7 +1654,7 @@ int find_message_ge_ns_from_origin(
 end:
        for (i = 0; i < user_count; i++) {
                if (messages[i]) {
-                       bt_object_put_no_null_check(messages[i]);
+                       bt_object_put_ref_no_null_check(messages[i]);
                }
        }
 
@@ -1709,6 +1739,7 @@ bt_self_component_port_input_message_iterator_seek_ns_from_origin(
        GHashTable *stream_states = NULL;
        bt_bool can_seek_by_itself;
 
+       BT_ASSERT_PRE_NO_ERROR();
        BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
        BT_ASSERT_PRE_ITER_HAS_STATE_TO_SEEK(iterator);
        BT_ASSERT_PRE(
@@ -1761,6 +1792,7 @@ bt_self_component_port_input_message_iterator_seek_ns_from_origin(
                        status == BT_FUNC_STATUS_AGAIN,
                        "Unexpected status: %![iter-]+i, status=%s",
                        iterator, bt_common_func_status_string(status));
+               BT_ASSERT_POST_NO_ERROR_IF_NO_ERROR_STATUS(status);
                if (status < 0) {
                        BT_LIB_LOGW_APPEND_CAUSE(
                                "Component input port message iterator's \"seek nanoseconds from origin\" method failed: "
@@ -1781,7 +1813,6 @@ bt_self_component_port_input_message_iterator_seek_ns_from_origin(
                        &can_seek_beginning);
                BT_ASSERT(can_seek_status == BT_FUNC_STATUS_OK);
                BT_ASSERT(can_seek_beginning);
-
                BT_ASSERT(iterator->methods.seek_beginning);
                BT_LIB_LOGD("Calling user's \"seek beginning\" method: %!+i",
                        iterator);
@@ -1809,7 +1840,7 @@ bt_self_component_port_input_message_iterator_seek_ns_from_origin(
                case BT_FUNC_STATUS_AGAIN:
                        goto end;
                default:
-                       abort();
+                       bt_common_abort();
                }
 
                /*
@@ -1820,7 +1851,7 @@ bt_self_component_port_input_message_iterator_seek_ns_from_origin(
                 * message queue.
                 */
                while (!g_queue_is_empty(iterator->auto_seek.msgs)) {
-                       bt_object_put_no_null_check(
+                       bt_object_put_ref_no_null_check(
                                g_queue_pop_tail(iterator->auto_seek.msgs));
                }
 
@@ -1953,7 +1984,7 @@ bt_self_component_port_input_message_iterator_seek_ns_from_origin(
                case BT_FUNC_STATUS_AGAIN:
                        goto end;
                default:
-                       abort();
+                       bt_common_abort();
                }
        }
 
This page took 0.030719 seconds and 4 git commands to generate.