lib: remove output port message iterator
authorPhilippe Proulx <eeppeliteloop@gmail.com>
Sun, 4 Aug 2019 14:04:03 +0000 (10:04 -0400)
committerPhilippe Proulx <eeppeliteloop@gmail.com>
Mon, 5 Aug 2019 19:10:13 +0000 (15:10 -0400)
The output port message iterator concept was not a good idea in the
beginning: it breaks the graph model, making it possible to iterate a
disconnected output port (which is weird from the component's
perspective), blocking the graph (user cannot call bt_graph_run()), and
forcing the graph not to contain any sink component. This looks and
feels like a hack.

For the C part, a subsequent patch should implement an easy way to add a
simple sink component to a graph based on a simple callback and custom
user data instead of going through the inconvenience of creating a sink
component class, setting the methods manually, creating an input port
message iterator once the graph is configured, etc.

For the Python part, we'll focus on `TraceCollectionMessageIterator` to
replace `_OutputPortMessageIterator`. `TraceCollectionMessageIterator`
should cover most of the use cases and is easier to use: you don't need
to set up your graph, add your own `flt.utils.muxer`, etc.

For more advanced use cases in Python, it's always possible to create a
"proxy sink component", just like `TraceCollectionMessageIterator` does
internally, to get full control on the input port message iterator.

To adapt the current tests, `TestOutputPortMessageIterator` in
`utils.py` can be used like an output port message iterator. Such an
iterator cannot seek however, so `test_message_iterator.py` needed
special treatments to make the eventual input port message iterator
seek.

Signed-off-by: Philippe Proulx <eeppeliteloop@gmail.com>
Change-Id: I92d432fb33d35ae3c0262b723cdfeae82c6633c9
Reviewed-on: https://review.lttng.org/c/babeltrace/+/1821
Tested-by: jenkins <jenkins@lttng.org>
17 files changed:
include/Makefile.am
include/babeltrace2/babeltrace.h
include/babeltrace2/graph/port-output-message-iterator.h [deleted file]
src/bindings/python/bt2/bt2/graph.py
src/bindings/python/bt2/bt2/message_iterator.py
src/bindings/python/bt2/bt2/native_bt_message_iterator.i
src/bindings/python/bt2/bt2/native_bt_message_iterator.i.h
src/bindings/python/bt2/bt2/port.py
src/lib/error.c
src/lib/graph/iterator.c
src/lib/graph/message/iterator.h
src/lib/lib-logging.c
tests/bindings/python/bt2/test_clock_class.py
tests/bindings/python/bt2/test_event.py
tests/bindings/python/bt2/test_message.py
tests/bindings/python/bt2/test_message_iterator.py
tests/bindings/python/bt2/utils.py

index bddc8a5dba8f0d4b2dcc56d47f946b9c6c9dad34..f2ff631be4b1728daa3a0250cc34845c78677de8 100644 (file)
@@ -111,7 +111,6 @@ babeltrace2graphinclude_HEADERS = \
        babeltrace2/graph/port-const.h \
        babeltrace2/graph/port-input-const.h \
        babeltrace2/graph/port-output-const.h \
-       babeltrace2/graph/port-output-message-iterator.h \
        babeltrace2/graph/private-query-executor.h \
        babeltrace2/graph/query-executor-const.h \
        babeltrace2/graph/query-executor.h \
index edd45d76d3380c3c259156fe93d91475e1f28cd4..92599faacdb8f95006bbf02c19bfd28213c48cc7 100644 (file)
 
 /* Message iterator API */
 #include <babeltrace2/graph/message-iterator.h>
-#include <babeltrace2/graph/port-output-message-iterator.h>
 #include <babeltrace2/graph/self-component-port-input-message-iterator.h>
 #include <babeltrace2/graph/self-message-iterator.h>
 
diff --git a/include/babeltrace2/graph/port-output-message-iterator.h b/include/babeltrace2/graph/port-output-message-iterator.h
deleted file mode 100644 (file)
index 49d564a..0000000
+++ /dev/null
@@ -1,96 +0,0 @@
-#ifndef BABELTRACE2_GRAPH_PORT_OUTPUT_MESSAGE_ITERATOR_H
-#define BABELTRACE2_GRAPH_PORT_OUTPUT_MESSAGE_ITERATOR_H
-
-/*
- * Copyright (c) 2010-2019 EfficiOS Inc. and Linux Foundation
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in
- * all copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- * SOFTWARE.
- */
-
-#ifndef __BT_IN_BABELTRACE_H
-# error "Please include <babeltrace2/babeltrace.h> instead."
-#endif
-
-#include <stdint.h>
-
-#include <babeltrace2/graph/message-iterator.h>
-#include <babeltrace2/types.h>
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-static inline
-bt_message_iterator *
-bt_port_output_message_iterator_as_message_iterator(
-               bt_port_output_message_iterator *iterator)
-{
-       return __BT_UPCAST(bt_message_iterator, iterator);
-}
-
-extern bt_port_output_message_iterator *
-bt_port_output_message_iterator_create(
-               bt_graph *graph,
-               const bt_port_output *output_port);
-
-extern bt_message_iterator_next_status
-bt_port_output_message_iterator_next(
-               bt_port_output_message_iterator *iterator,
-               bt_message_array_const *msgs, uint64_t *count);
-
-extern bt_bool bt_port_output_message_iterator_can_seek_ns_from_origin(
-               bt_port_output_message_iterator *iterator,
-               int64_t ns_from_origin);
-
-extern bt_bool bt_port_output_message_iterator_can_seek_beginning(
-               bt_port_output_message_iterator *iterator);
-
-extern bt_message_iterator_seek_ns_from_origin_status
-bt_port_output_message_iterator_seek_ns_from_origin(
-               bt_port_output_message_iterator *iterator,
-               int64_t ns_from_origin);
-
-extern bt_message_iterator_seek_beginning_status
-bt_port_output_message_iterator_seek_beginning(
-               bt_port_output_message_iterator *iterator);
-
-extern void bt_port_output_message_iterator_get_ref(
-               const bt_port_output_message_iterator *port_output_message_iterator);
-
-extern void bt_port_output_message_iterator_put_ref(
-               const bt_port_output_message_iterator *port_output_message_iterator);
-
-#define BT_PORT_OUTPUT_MESSAGE_ITERATOR_PUT_REF_AND_RESET(_var)        \
-       do {                                                            \
-               bt_port_output_message_iterator_put_ref(_var);  \
-               (_var) = NULL;                                          \
-       } while (0)
-
-#define BT_PORT_OUTPUT_MESSAGE_ITERATOR_MOVE_REF(_var_dst, _var_src) \
-       do {                                                            \
-               bt_port_output_message_iterator_put_ref(_var_dst);      \
-               (_var_dst) = (_var_src);                                \
-               (_var_src) = NULL;                                      \
-       } while (0)
-
-#ifdef __cplusplus
-}
-#endif
-
-#endif /* BABELTRACE2_GRAPH_PORT_OUTPUT_MESSAGE_ITERATOR_H */
index 6c75d52b843d1de01911b07632bc7d2e849971bb..3812788f0b4bc05a9e2f08adbf8f82f76c8f2498 100644 (file)
@@ -24,7 +24,6 @@ from bt2 import native_bt, object, utils
 from bt2 import interrupter as bt2_interrupter
 from bt2 import connection as bt2_connection
 from bt2 import component as bt2_component
-from bt2 import message_iterator as bt2_message_iterator
 import functools
 from bt2 import port as bt2_port
 from bt2 import logging as bt2_logging
@@ -196,14 +195,3 @@ class Graph(object._SharedObject):
 
     def interrupt(self):
         native_bt.graph_interrupt(self._ptr)
-
-    def create_output_port_message_iterator(self, output_port):
-        utils._check_type(output_port, bt2_port._OutputPort)
-        msg_iter_ptr = native_bt.port_output_message_iterator_create(
-            self._ptr, output_port._ptr
-        )
-
-        if msg_iter_ptr is None:
-            raise bt2._MemoryError('cannot create output port message iterator')
-
-        return bt2_message_iterator._OutputPortMessageIterator(msg_iter_ptr)
index c6c23dfd8b8848ea004f026036e86a3b48913925..1a48b8117b2bc04a4d33e751a5d8c7ff7551fee2 100644 (file)
@@ -88,20 +88,6 @@ class _UserComponentInputPortMessageIterator(_GenericMessageIterator):
     )
 
 
-# This is created when the user wants to iterate on a component's output port,
-# from outside the graph.
-class _OutputPortMessageIterator(_GenericMessageIterator):
-    _get_msg_range = staticmethod(native_bt.bt2_port_output_get_msg_range)
-    _get_ref = staticmethod(native_bt.port_output_message_iterator_get_ref)
-    _put_ref = staticmethod(native_bt.port_output_message_iterator_put_ref)
-    _can_seek_beginning = staticmethod(
-        native_bt.port_output_message_iterator_can_seek_beginning
-    )
-    _seek_beginning = staticmethod(
-        native_bt.port_output_message_iterator_seek_beginning
-    )
-
-
 # This is extended by the user to implement component classes in Python.  It
 # is created for a given output port when an input port message iterator is
 # created on the input port on the other side of the connection.  It is also
index 7ebd783668cf662b87165b8d8db1a455e14f1c70..6336a2143e1d1f54bad7bf09cf3f61f2bf3f70f6 100644 (file)
@@ -23,7 +23,6 @@
  */
 
 %include <babeltrace2/graph/message-iterator.h>
-%include <babeltrace2/graph/port-output-message-iterator.h>
 %include <babeltrace2/graph/self-component-port-input-message-iterator.h>
 %include <babeltrace2/graph/self-message-iterator.h>
 
@@ -36,5 +35,3 @@ PyObject *bt_bt2_get_user_component_from_user_msg_iter(
                bt_self_message_iterator *self_message_iterator);
 PyObject *bt_bt2_self_component_port_input_get_msg_range(
                bt_self_component_port_input_message_iterator *iter);
-PyObject *bt_bt2_port_output_get_msg_range(
-               bt_port_output_message_iterator *iter);
index 2ce15de3ee5be81403d36a2d4e6690c0d22ed41a..f687dc9a480c01ff9eed498a432a812ba17d8420 100644 (file)
@@ -94,15 +94,3 @@ static PyObject *bt_bt2_self_component_port_input_get_msg_range(
                &messages, &message_count);
        return get_msg_range_common(status, messages, message_count);
 }
-
-static PyObject *bt_bt2_port_output_get_msg_range(
-               bt_port_output_message_iterator *iter)
-{
-       bt_message_array_const messages;
-       uint64_t message_count = 0;
-       bt_message_iterator_next_status status;
-
-       status = bt_port_output_message_iterator_next(iter, &messages,
-               &message_count);
-       return get_msg_range_common(status, messages, message_count);
-}
index bf11aa9430def3529db5df0614e0e122e8eefc42..418f215b8952162cc6cc98bda6aedc3aa6f33f60 100644 (file)
@@ -23,7 +23,6 @@
 from bt2 import native_bt, object
 from bt2 import component as bt2_component
 from bt2 import connection as bt2_connection
-from bt2 import message_iterator as bt2_message_iterator
 from bt2 import message as bt2_message
 import bt2
 
index f54b65f342f443517f4bb6d071dd9ede089512b0..3ae2db988865b5416abcaabaeb648bed81d95528 100644 (file)
@@ -378,8 +378,6 @@ create_error_cause_message_iterator_actor(struct bt_message_iterator *iter,
         * message iterator, which is a self component port input
         * message iterator.
         */
-       BT_ASSERT(iter->type ==
-               BT_MESSAGE_ITERATOR_TYPE_SELF_COMPONENT_PORT_INPUT);
        input_port_iter = (void *) iter;
        cause = g_new0(struct bt_error_cause_message_iterator_actor, 1);
        if (!cause) {
index b5033083fdaeb25d44121cfbbd8852003fc86c33..62e1f91e768719131929aa94aa0b94fdcca46494 100644 (file)
@@ -43,7 +43,6 @@
 #include <babeltrace2/graph/message-const.h>
 #include <babeltrace2/graph/message-iterator.h>
 #include <babeltrace2/graph/self-component-port-input-message-iterator.h>
-#include <babeltrace2/graph/port-output-message-iterator.h>
 #include <babeltrace2/graph/message-event-const.h>
 #include <babeltrace2/graph/message-message-iterator-inactivity-const.h>
 #include <babeltrace2/graph/message-packet-beginning.h>
@@ -103,21 +102,6 @@ void set_self_comp_port_input_msg_iterator_state(
        iterator->state = state;
 }
 
-static
-void destroy_base_message_iterator(struct bt_object *obj)
-{
-       struct bt_message_iterator *iterator = (void *) obj;
-
-       BT_ASSERT(iterator);
-
-       if (iterator->msgs) {
-               g_ptr_array_free(iterator->msgs, TRUE);
-               iterator->msgs = NULL;
-       }
-
-       g_free(iterator);
-}
-
 static
 void bt_self_component_port_input_message_iterator_destroy(struct bt_object *obj)
 {
@@ -172,7 +156,12 @@ void bt_self_component_port_input_message_iterator_destroy(struct bt_object *obj
                iterator->upstream_msg_iters = NULL;
        }
 
-       destroy_base_message_iterator(obj);
+       if (iterator->msgs) {
+               g_ptr_array_free(iterator->msgs, TRUE);
+               iterator->msgs = NULL;
+       }
+
+       g_free(iterator);
 }
 
 BT_HIDDEN
@@ -284,28 +273,6 @@ void bt_self_component_port_input_message_iterator_set_connection(
                "%![iter-]+i, %![conn-]+x", iterator, connection);
 }
 
-static
-int init_message_iterator(struct bt_message_iterator *iterator,
-               enum bt_message_iterator_type type,
-               bt_object_release_func destroy)
-{
-       int ret = 0;
-
-       bt_object_init_shared(&iterator->base, destroy);
-       iterator->type = type;
-       iterator->msgs = g_ptr_array_new();
-       if (!iterator->msgs) {
-               BT_LIB_LOGE_APPEND_CAUSE("Failed to allocate a GPtrArray.");
-               ret = -1;
-               goto end;
-       }
-
-       g_ptr_array_set_size(iterator->msgs, MSG_BATCH_SIZE);
-
-end:
-       return ret;
-}
-
 static
 bt_bool can_seek_ns_from_origin_true(
                struct bt_self_component_port_input_message_iterator *iterator,
@@ -330,7 +297,6 @@ create_self_component_input_port_message_iterator(
        typedef enum bt_component_class_message_iterator_init_method_status (*init_method_t)(
                        void *, void *, void *);
 
-       int ret;
        init_method_t init_method = NULL;
        struct bt_self_component_port_input_message_iterator *iterator =
                NULL;
@@ -376,14 +342,15 @@ create_self_component_input_port_message_iterator(
                goto error;
        }
 
-       ret = init_message_iterator((void *) iterator,
-               BT_MESSAGE_ITERATOR_TYPE_SELF_COMPONENT_PORT_INPUT,
+       bt_object_init_shared(&iterator->base,
                bt_self_component_port_input_message_iterator_destroy);
-       if (ret) {
-               /* init_message_iterator() logs errors */
+       iterator->msgs = g_ptr_array_new();
+       if (!iterator->msgs) {
+               BT_LIB_LOGE_APPEND_CAUSE("Failed to allocate a GPtrArray.");
                goto error;
        }
 
+       g_ptr_array_set_size(iterator->msgs, MSG_BATCH_SIZE);
        iterator->last_ns_from_origin = INT64_MIN;
        iterator->auto_seek.msgs = g_queue_new();
        if (!iterator->auto_seek.msgs) {
@@ -900,7 +867,7 @@ bt_self_component_port_input_message_iterator_next(
         */
        *user_count = 0;
        status = (int) call_iterator_next_method(iterator,
-               (void *) iterator->base.msgs->pdata, MSG_BATCH_SIZE,
+               (void *) iterator->msgs->pdata, MSG_BATCH_SIZE,
                user_count);
        BT_LOGD("User method returned: status=%s, msg-count=%" PRIu64,
                bt_common_func_status_string(status), *user_count);
@@ -930,7 +897,7 @@ bt_self_component_port_input_message_iterator_next(
                        "Invalid returned message count: greater than "
                        "batch size: count=%" PRIu64 ", batch-size=%u",
                        *user_count, MSG_BATCH_SIZE);
-               *msgs = (void *) iterator->base.msgs->pdata;
+               *msgs = (void *) iterator->msgs->pdata;
                break;
        case BT_FUNC_STATUS_AGAIN:
                goto end;
@@ -947,46 +914,6 @@ end:
        return status;
 }
 
-enum bt_message_iterator_next_status bt_port_output_message_iterator_next(
-               struct bt_port_output_message_iterator *iterator,
-               bt_message_array_const *msgs_to_user,
-               uint64_t *count_to_user)
-{
-       enum bt_message_iterator_next_status status;
-       int graph_status;
-
-       BT_ASSERT_PRE_DEV_NON_NULL(iterator, "Message iterator");
-       BT_ASSERT_PRE_DEV_NON_NULL(msgs_to_user, "Message array (output)");
-       BT_ASSERT_PRE_DEV_NON_NULL(count_to_user, "Message count (output)");
-       BT_LIB_LOGD("Getting next output port message iterator's messages: "
-               "%!+i", iterator);
-       graph_status = bt_graph_consume_sink_no_check(iterator->graph,
-               iterator->colander);
-       switch (graph_status) {
-       case BT_FUNC_STATUS_AGAIN:
-       case BT_FUNC_STATUS_END:
-       case BT_FUNC_STATUS_MEMORY_ERROR:
-               status = (int) graph_status;
-               break;
-       case BT_FUNC_STATUS_OK:
-               status = BT_FUNC_STATUS_OK;
-
-               /*
-                * On success, the colander sink moves the messages
-                * to this iterator's array and sets this iterator's
-                * message count: move them to the user.
-                */
-               *msgs_to_user = (void *) iterator->base.msgs->pdata;
-               *count_to_user = iterator->count;
-               break;
-       default:
-               /* Other errors */
-               status = BT_FUNC_STATUS_ERROR;
-       }
-
-       return status;
-}
-
 struct bt_component *
 bt_self_component_port_input_message_iterator_borrow_component(
                struct bt_self_component_port_input_message_iterator *iterator)
@@ -1023,172 +950,6 @@ struct bt_self_port_output *bt_self_message_iterator_borrow_port(
        return (void *) iterator->upstream_port;
 }
 
-static
-void bt_port_output_message_iterator_destroy(struct bt_object *obj)
-{
-       struct bt_port_output_message_iterator *iterator = (void *) obj;
-
-       BT_LIB_LOGI("Destroying output port message iterator object: %!+i",
-               iterator);
-       BT_LOGD_STR("Putting graph.");
-       BT_OBJECT_PUT_REF_AND_RESET(iterator->graph);
-       BT_LOGD_STR("Putting colander sink component.");
-       BT_OBJECT_PUT_REF_AND_RESET(iterator->colander);
-       destroy_base_message_iterator(obj);
-}
-
-struct bt_port_output_message_iterator *
-bt_port_output_message_iterator_create(struct bt_graph *graph,
-               const struct bt_port_output *output_port)
-{
-       struct bt_port_output_message_iterator *iterator = NULL;
-       struct bt_component_class_sink *colander_comp_cls = NULL;
-       struct bt_component *output_port_comp = NULL;
-       struct bt_component_sink *colander_comp;
-       int graph_status;
-       struct bt_port_input *colander_in_port = NULL;
-       struct bt_component_class_sink_colander_data colander_data;
-       int ret;
-
-       BT_ASSERT_PRE_NON_NULL(graph, "Graph");
-       BT_ASSERT_PRE_NON_NULL(output_port, "Output port");
-       output_port_comp = bt_port_borrow_component_inline(
-               (const void *) output_port);
-       BT_ASSERT_PRE(output_port_comp,
-               "Output port has no component: %!+p", output_port);
-       BT_ASSERT_PRE(bt_component_borrow_graph(output_port_comp) ==
-               (void *) graph,
-               "Output port is not part of graph: %![graph-]+g, %![port-]+p",
-               graph, output_port);
-       BT_ASSERT_PRE(!graph->has_sink,
-               "Graph already has a sink component: %![graph-]+g");
-
-       /* Create message iterator */
-       BT_LIB_LOGI("Creating message iterator on output port: "
-               "%![port-]+p, %![comp-]+c", output_port, output_port_comp);
-       iterator = g_new0(struct bt_port_output_message_iterator, 1);
-       if (!iterator) {
-               BT_LIB_LOGE_APPEND_CAUSE(
-                       "Failed to allocate one output port message iterator.");
-               goto error;
-       }
-
-       ret = init_message_iterator((void *) iterator,
-               BT_MESSAGE_ITERATOR_TYPE_PORT_OUTPUT,
-               bt_port_output_message_iterator_destroy);
-       if (ret) {
-               /* init_message_iterator() logs errors */
-               BT_OBJECT_PUT_REF_AND_RESET(iterator);
-               goto end;
-       }
-
-       /* Create colander component */
-       colander_comp_cls = bt_component_class_sink_colander_get();
-       if (!colander_comp_cls) {
-               /* bt_component_class_sink_colander_get() logs errors */
-               BT_LIB_LOGE_APPEND_CAUSE(
-                       "Cannot get colander sink component class.");
-               goto error;
-       }
-
-       iterator->graph = graph;
-       bt_object_get_no_null_check(iterator->graph);
-       colander_data.msgs = (void *) iterator->base.msgs->pdata;
-       colander_data.count_addr = &iterator->count;
-
-       /*
-        * Hope that nobody uses this very unique name.
-        *
-        * We pass `BT_LOGGING_LEVEL_NONE` but the colander component
-        * class module does not use this level anyway since it belongs
-        * to the library.
-        */
-       graph_status = bt_graph_add_sink_component_with_init_method_data(
-                       (void *) graph, colander_comp_cls,
-                       "colander-36ac3409-b1a8-4d60-ab1f-4fdf341a8fb1",
-                       NULL, &colander_data, BT_LOGGING_LEVEL_NONE,
-                       (void *) &iterator->colander);
-       if (graph_status != BT_FUNC_STATUS_OK) {
-               BT_LIB_LOGE_APPEND_CAUSE(
-                       "Cannot add colander sink component to graph: "
-                       "%![graph-]+g, status=%s", graph,
-                       bt_common_func_status_string(graph_status));
-               goto error;
-       }
-
-       /*
-        * Connect provided output port to the colander component's
-        * input port.
-        */
-       colander_in_port =
-               (void *) bt_component_sink_borrow_input_port_by_index_const(
-                       (void *) iterator->colander, 0);
-       BT_ASSERT(colander_in_port);
-       graph_status = bt_graph_connect_ports(graph,
-               output_port, colander_in_port, NULL);
-       if (graph_status != BT_FUNC_STATUS_OK) {
-               BT_LIB_LOGW_APPEND_CAUSE(
-                       "Cannot connect colander sink's port: "
-                       "%![graph-]+g, %![comp-]+c, status=%s", graph,
-                       iterator->colander,
-                       bt_common_func_status_string(graph_status));
-               goto error;
-       }
-
-       /*
-        * At this point everything went fine. Make the graph
-        * nonconsumable forever so that only this message iterator
-        * can consume (thanks to bt_graph_consume_sink_no_check()).
-        * This avoids leaking the message created by the colander
-        * sink and moved to the message iterator's message
-        * member.
-        */
-       bt_graph_set_can_consume(iterator->graph, false);
-
-       /* Also set the graph as being configured. */
-       graph_status = bt_graph_configure(graph);
-       if (graph_status != BT_FUNC_STATUS_OK) {
-               BT_LIB_LOGW_APPEND_CAUSE(
-                       "Cannot configure graph after having "
-                       "added and connected colander sink: "
-                       "%![graph-]+g, %![comp-]+c, status=%s", graph,
-                       iterator->colander,
-                       bt_common_func_status_string(graph_status));
-               goto error;
-       }
-       goto end;
-
-error:
-       if (iterator && iterator->graph && iterator->colander) {
-               int ret;
-
-               /* Remove created colander component from graph if any */
-               colander_comp = iterator->colander;
-               BT_OBJECT_PUT_REF_AND_RESET(iterator->colander);
-
-               /*
-                * At this point the colander component's reference
-                * count is 0 because iterator->colander was the only
-                * owner. We also know that it is not connected because
-                * this is the last operation before this function
-                * succeeds.
-                *
-                * Since we honor the preconditions here,
-                * bt_graph_remove_unconnected_component() always
-                * succeeds.
-                */
-               ret = bt_graph_remove_unconnected_component(iterator->graph,
-                       (void *) colander_comp);
-               BT_ASSERT(ret == 0);
-       }
-
-       BT_OBJECT_PUT_REF_AND_RESET(iterator);
-
-end:
-       bt_object_put_ref(colander_comp_cls);
-       return (void *) iterator;
-}
-
 bt_bool bt_self_component_port_input_message_iterator_can_seek_ns_from_origin(
                struct bt_self_component_port_input_message_iterator *iterator,
                int64_t ns_from_origin)
@@ -2063,60 +1824,6 @@ end:
        return status;
 }
 
-static inline
-bt_self_component_port_input_message_iterator *
-borrow_output_port_message_iterator_upstream_iterator(
-               struct bt_port_output_message_iterator *iterator)
-{
-       struct bt_component_class_sink_colander_priv_data *colander_data;
-
-       BT_ASSERT(iterator);
-       colander_data = (void *) iterator->colander->parent.user_data;
-       BT_ASSERT(colander_data);
-       BT_ASSERT(colander_data->msg_iter);
-       return colander_data->msg_iter;
-}
-
-bt_bool bt_port_output_message_iterator_can_seek_ns_from_origin(
-               struct bt_port_output_message_iterator *iterator,
-               int64_t ns_from_origin)
-{
-       BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
-       return bt_self_component_port_input_message_iterator_can_seek_ns_from_origin(
-               borrow_output_port_message_iterator_upstream_iterator(
-                       iterator), ns_from_origin);
-}
-
-bt_bool bt_port_output_message_iterator_can_seek_beginning(
-               struct bt_port_output_message_iterator *iterator)
-{
-       BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
-       return bt_self_component_port_input_message_iterator_can_seek_beginning(
-               borrow_output_port_message_iterator_upstream_iterator(
-                       iterator));
-}
-
-enum bt_message_iterator_seek_ns_from_origin_status
-bt_port_output_message_iterator_seek_ns_from_origin(
-               struct bt_port_output_message_iterator *iterator,
-               int64_t ns_from_origin)
-{
-       BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
-       return bt_self_component_port_input_message_iterator_seek_ns_from_origin(
-               borrow_output_port_message_iterator_upstream_iterator(iterator),
-               ns_from_origin);
-}
-
-enum bt_message_iterator_seek_beginning_status
-bt_port_output_message_iterator_seek_beginning(
-               struct bt_port_output_message_iterator *iterator)
-{
-       BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
-       return bt_self_component_port_input_message_iterator_seek_beginning(
-               borrow_output_port_message_iterator_upstream_iterator(
-                       iterator));
-}
-
 bt_bool bt_self_message_iterator_is_interrupted(
                const struct bt_self_message_iterator *self_msg_iter)
 {
@@ -2127,18 +1834,6 @@ bt_bool bt_self_message_iterator_is_interrupted(
        return (bt_bool) bt_graph_is_interrupted(iterator->graph);
 }
 
-void bt_port_output_message_iterator_get_ref(
-               const struct bt_port_output_message_iterator *iterator)
-{
-       bt_object_get_ref(iterator);
-}
-
-void bt_port_output_message_iterator_put_ref(
-               const struct bt_port_output_message_iterator *iterator)
-{
-       bt_object_put_ref(iterator);
-}
-
 void bt_self_component_port_input_message_iterator_get_ref(
                const struct bt_self_component_port_input_message_iterator *iterator)
 {
index f48872ece5dde7d748f895ad430bf2dff8754280..82689f61da6674a5975c22ea8bc217527e30ddda 100644 (file)
 struct bt_port;
 struct bt_graph;
 
-enum bt_message_iterator_type {
-       BT_MESSAGE_ITERATOR_TYPE_SELF_COMPONENT_PORT_INPUT,
-       BT_MESSAGE_ITERATOR_TYPE_PORT_OUTPUT,
-};
-
 enum bt_self_component_port_input_message_iterator_state {
        /* Iterator is not initialized */
        BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_NON_INITIALIZED,
@@ -70,12 +65,6 @@ enum bt_self_component_port_input_message_iterator_state {
        BT_SELF_COMPONENT_PORT_INPUT_MESSAGE_ITERATOR_STATE_LAST_SEEKING_RETURNED_ERROR,
 };
 
-struct bt_message_iterator {
-       struct bt_object base;
-       enum bt_message_iterator_type type;
-       GPtrArray *msgs;
-};
-
 typedef enum bt_component_class_message_iterator_next_method_status
 (*bt_self_component_port_input_message_iterator_next_method)(
                void *, bt_message_array_const, uint64_t, uint64_t *);
@@ -97,7 +86,8 @@ typedef bt_bool
                void *);
 
 struct bt_self_component_port_input_message_iterator {
-       struct bt_message_iterator base;
+       struct bt_object base;
+       GPtrArray *msgs;
        struct bt_component *upstream_component; /* Weak */
        struct bt_port *upstream_port; /* Weak */
        struct bt_connection *connection; /* Weak */
@@ -199,18 +189,6 @@ struct bt_self_component_port_input_message_iterator {
        void *user_data;
 };
 
-struct bt_port_output_message_iterator {
-       struct bt_message_iterator base;
-       struct bt_graph *graph; /* Owned by this */
-       struct bt_component_sink *colander; /* Owned by this */
-
-       /*
-        * Only used temporarily as a bridge between a colander sink and
-        * the user.
-        */
-       uint64_t count;
-};
-
 BT_HIDDEN
 void bt_self_component_port_input_message_iterator_try_finalize(
                struct bt_self_component_port_input_message_iterator *iterator);
index e9afb80656da61d7bb1cdbbc5dbdd28183e540d4..ce261a1a89ab0d19f5cf87f9a33b086b515e4b57 100644 (file)
@@ -1153,66 +1153,34 @@ static inline void format_message_iterator(char **buf_ch,
                bool extended, const char *prefix,
                const struct bt_message_iterator *iterator)
 {
-       const char *type;
        char tmp_prefix[TMP_PREFIX_LEN];
+       const struct bt_self_component_port_input_message_iterator *
+               port_in_iter = (const void *) iterator;
 
-       if (iterator->type == BT_MESSAGE_ITERATOR_TYPE_SELF_COMPONENT_PORT_INPUT) {
-               type = "BT_MESSAGE_ITERATOR_TYPE_SELF_COMPONENT_PORT_INPUT";
-       } else if (iterator->type == BT_MESSAGE_ITERATOR_TYPE_PORT_OUTPUT) {
-               type = "BT_MESSAGE_ITERATOR_TYPE_PORT_OUTPUT";
-       } else {
-               type = "(unknown)";
+       if (port_in_iter->upstream_component) {
+               SET_TMP_PREFIX("upstream-comp-");
+               format_component(buf_ch, false, tmp_prefix,
+                       port_in_iter->upstream_component);
        }
 
-       BUF_APPEND(", %stype=%s", PRFIELD(type));
-
-       switch (iterator->type) {
-       case BT_MESSAGE_ITERATOR_TYPE_SELF_COMPONENT_PORT_INPUT:
-       {
-               const struct bt_self_component_port_input_message_iterator *
-                       port_in_iter = (const void *) iterator;
-
-               if (port_in_iter->upstream_component) {
-                       SET_TMP_PREFIX("upstream-comp-");
-                       format_component(buf_ch, false, tmp_prefix,
-                               port_in_iter->upstream_component);
-               }
-
-               if (port_in_iter->upstream_port) {
-                       SET_TMP_PREFIX("upstream-port-");
-                       format_port(buf_ch, false, tmp_prefix,
-                               port_in_iter->upstream_port);
-               }
-
-               if (port_in_iter->connection) {
-                       SET_TMP_PREFIX("upstream-conn-");
-                       format_connection(buf_ch, false, tmp_prefix,
-                               port_in_iter->connection);
-               }
-               break;
+       if (!extended) {
+               goto end;
        }
-       case BT_MESSAGE_ITERATOR_TYPE_PORT_OUTPUT:
-       {
-               const struct bt_port_output_message_iterator *port_out_iter =
-                       (const void *) iterator;
 
-               if (port_out_iter->graph) {
-                       SET_TMP_PREFIX("graph-");
-                       format_graph(buf_ch, false, tmp_prefix,
-                               port_out_iter->graph);
-               }
-
-               if (port_out_iter->colander) {
-                       SET_TMP_PREFIX("colander-comp-");
-                       format_component(buf_ch, false, tmp_prefix,
-                               (void *) port_out_iter->colander);
-               }
-
-               break;
+       if (port_in_iter->upstream_port) {
+               SET_TMP_PREFIX("upstream-port-");
+               format_port(buf_ch, false, tmp_prefix,
+                       port_in_iter->upstream_port);
        }
-       default:
-               break;
+
+       if (port_in_iter->connection) {
+               SET_TMP_PREFIX("upstream-conn-");
+               format_connection(buf_ch, false, tmp_prefix,
+                       port_in_iter->connection);
        }
+
+end:
+       return;
 }
 
 static inline void format_plugin(char **buf_ch, bool extended,
index 48ae9602fb7300858253605c92d65933254dcfc6..3805c0d56168f87a18d94e935fe1f855f54c2f2b 100644 (file)
@@ -20,7 +20,7 @@ import unittest
 import uuid
 import copy
 import bt2
-from utils import run_in_component_init
+from utils import run_in_component_init, TestOutputPortMessageIterator
 
 
 class ClockClassOffsetTestCase(unittest.TestCase):
@@ -244,8 +244,8 @@ class ClockSnapshotTestCase(unittest.TestCase):
 
         self._graph = bt2.Graph()
         self._src_comp = self._graph.add_component(MySrc, 'my_source')
-        self._msg_iter = self._graph.create_output_port_message_iterator(
-            self._src_comp.output_ports['out']
+        self._msg_iter = TestOutputPortMessageIterator(
+            self._graph, self._src_comp.output_ports['out']
         )
 
         for i, msg in enumerate(self._msg_iter):
index ea65b5909dfcbd8095628486ff6b5075adaa19c9..b146f2615f7146f5444e60ab5780821b34cc92b7 100644 (file)
@@ -19,6 +19,7 @@
 from collections import OrderedDict
 import unittest
 import bt2
+from utils import TestOutputPortMessageIterator
 
 
 class EventTestCase(unittest.TestCase):
@@ -153,8 +154,8 @@ class EventTestCase(unittest.TestCase):
         test_obj = self
         self._graph = bt2.Graph()
         self._src_comp = self._graph.add_component(MySrc, 'my_source')
-        self._msg_iter = self._graph.create_output_port_message_iterator(
-            self._src_comp.output_ports['out']
+        self._msg_iter = TestOutputPortMessageIterator(
+            self._graph, self._src_comp.output_ports['out']
         )
 
         for msg in self._msg_iter:
index 78e8dec066592aae1ad15ffa293d28ba9e240203..ef8e6b800a270eef25ab1c15ec4d09532473fcfe 100644 (file)
@@ -19,6 +19,7 @@
 import collections
 import unittest
 import bt2
+from utils import TestOutputPortMessageIterator
 
 
 class AllMessagesTestCase(unittest.TestCase):
@@ -148,8 +149,8 @@ class AllMessagesTestCase(unittest.TestCase):
     def test_all_msg_with_cc(self):
         params = {'with_cc': True}
         self._src_comp = self._graph.add_component(self._src, 'my_source', params)
-        self._msg_iter = self._graph.create_output_port_message_iterator(
-            self._src_comp.output_ports['out']
+        self._msg_iter = TestOutputPortMessageIterator(
+            self._graph, self._src_comp.output_ports['out']
         )
 
         for i, msg in enumerate(self._msg_iter):
@@ -204,8 +205,8 @@ class AllMessagesTestCase(unittest.TestCase):
     def test_all_msg_without_cc(self):
         params = {'with_cc': False}
         self._src_comp = self._graph.add_component(self._src, 'my_source', params)
-        self._msg_iter = self._graph.create_output_port_message_iterator(
-            self._src_comp.output_ports['out']
+        self._msg_iter = TestOutputPortMessageIterator(
+            self._graph, self._src_comp.output_ports['out']
         )
 
         for i, msg in enumerate(self._msg_iter):
@@ -273,8 +274,8 @@ class AllMessagesTestCase(unittest.TestCase):
         params = {'with_cc': True, 'with_stream_msgs_clock_snapshots': True}
 
         self._src_comp = self._graph.add_component(self._src, 'my_source', params)
-        self._msg_iter = self._graph.create_output_port_message_iterator(
-            self._src_comp.output_ports['out']
+        self._msg_iter = TestOutputPortMessageIterator(
+            self._graph, self._src_comp.output_ports['out']
         )
         msgs = list(self._msg_iter)
 
index 43af1e1f4d8457807f70f5bb10c7615499c3a6f5..3cd27e5b77325536a7c8735756edd539bf76ec01 100644 (file)
@@ -21,6 +21,7 @@ import collections
 import unittest
 import copy
 import bt2
+from utils import TestOutputPortMessageIterator
 
 
 class UserMessageIteratorTestCase(unittest.TestCase):
@@ -196,7 +197,7 @@ class UserMessageIteratorTestCase(unittest.TestCase):
 
         graph = bt2.Graph()
         src = graph.add_component(MySource, 'src')
-        it = graph.create_output_port_message_iterator(src.output_ports['out'])
+        it = TestOutputPortMessageIterator(graph, src.output_ports['out'])
 
         # Skip beginning messages.
         msg = next(it)
@@ -212,7 +213,7 @@ class UserMessageIteratorTestCase(unittest.TestCase):
         self.assertEqual(msg_ev1.addr, msg_ev2.addr)
 
     @staticmethod
-    def _setup_seek_beginning_test():
+    def _setup_seek_beginning_test(sink_cls):
         # Use a source, a filter and an output port iterator.  This allows us
         # to test calling `seek_beginning` on both a _OutputPortMessageIterator
         # and a _UserComponentInputPortMessageIterator, on top of checking that
@@ -279,25 +280,42 @@ class UserMessageIteratorTestCase(unittest.TestCase):
         graph = bt2.Graph()
         src = graph.add_component(MySource, 'src')
         flt = graph.add_component(MyFilter, 'flt')
+        sink = graph.add_component(sink_cls, 'sink')
         graph.connect_ports(src.output_ports['out'], flt.input_ports['in'])
-        it = graph.create_output_port_message_iterator(flt.output_ports['out'])
-
-        return it, MySourceIter
+        graph.connect_ports(flt.output_ports['out'], sink.input_ports['in'])
+        return MySourceIter, graph
 
     def test_can_seek_beginning(self):
-        it, MySourceIter = self._setup_seek_beginning_test()
+        class MySink(bt2._UserSinkComponent):
+            def __init__(self, params, obj):
+                self._add_input_port('in')
+
+            def _user_graph_is_configured(self):
+                self._msg_iter = self._create_input_port_message_iterator(
+                    self._input_ports['in']
+                )
+
+            def _user_consume(self):
+                nonlocal can_seek_beginning
+                can_seek_beginning = self._msg_iter.can_seek_beginning
+
+        MySourceIter, graph = self._setup_seek_beginning_test(MySink)
 
         def _user_can_seek_beginning(self):
-            nonlocal can_seek_beginning
-            return can_seek_beginning
+            nonlocal input_port_iter_can_seek_beginning
+            return input_port_iter_can_seek_beginning
 
         MySourceIter._user_can_seek_beginning = property(_user_can_seek_beginning)
 
-        can_seek_beginning = True
-        self.assertTrue(it.can_seek_beginning)
+        input_port_iter_can_seek_beginning = True
+        can_seek_beginning = None
+        graph.run_once()
+        self.assertTrue(can_seek_beginning)
 
-        can_seek_beginning = False
-        self.assertFalse(it.can_seek_beginning)
+        input_port_iter_can_seek_beginning = False
+        can_seek_beginning = None
+        graph.run_once()
+        self.assertFalse(can_seek_beginning)
 
         # Once can_seek_beginning returns an error, verify that it raises when
         # _can_seek_beginning has/returns the wrong type.
@@ -306,42 +324,62 @@ class UserMessageIteratorTestCase(unittest.TestCase):
         # a _seek_beginning method to know whether the iterator can seek to
         # beginning or not.
         del MySourceIter._user_can_seek_beginning
-        self.assertTrue(it.can_seek_beginning)
+        can_seek_beginning = None
+        graph.run_once()
+        self.assertTrue(can_seek_beginning)
 
         del MySourceIter._user_seek_beginning
-        self.assertFalse(it.can_seek_beginning)
+        can_seek_beginning = None
+        graph.run_once()
+        self.assertFalse(can_seek_beginning)
 
     def test_seek_beginning(self):
-        it, MySourceIter = self._setup_seek_beginning_test()
+        class MySink(bt2._UserSinkComponent):
+            def __init__(self, params, obj):
+                self._add_input_port('in')
 
-        msg = next(it)
+            def _user_graph_is_configured(self):
+                self._msg_iter = self._create_input_port_message_iterator(
+                    self._input_ports['in']
+                )
+
+            def _user_consume(self):
+                nonlocal do_seek_beginning
+                nonlocal msg
+
+                if do_seek_beginning:
+                    self._msg_iter.seek_beginning()
+                    return
+
+                msg = next(self._msg_iter)
+
+        do_seek_beginning = False
+        msg = None
+        MySourceIter, graph = self._setup_seek_beginning_test(MySink)
+        graph.run_once()
         self.assertIsInstance(msg, bt2._StreamBeginningMessage)
-        msg = next(it)
+        graph.run_once()
         self.assertIsInstance(msg, bt2._PacketBeginningMessage)
+        do_seek_beginning = True
+        graph.run_once()
+        do_seek_beginning = False
+        graph.run_once()
+        self.assertIsInstance(msg, bt2._StreamBeginningMessage)
 
-        it.seek_beginning()
+    def test_seek_beginning_user_error(self):
+        class MySink(bt2._UserSinkComponent):
+            def __init__(self, params, obj):
+                self._add_input_port('in')
 
-        msg = next(it)
-        self.assertIsInstance(msg, bt2._StreamBeginningMessage)
+            def _user_graph_is_configured(self):
+                self._msg_iter = self._create_input_port_message_iterator(
+                    self._input_ports['in']
+                )
 
-        # Verify that we can seek beginning after having reached the end.
-        #
-        # It currently does not work to seek an output port message iterator
-        # once it's ended, but we should eventually make it work and uncomment
-        # the following snippet.
-        #
-        # try:
-        #    while True:
-        #        next(it)
-        # except bt2.Stop:
-        #    pass
-        #
-        # it.seek_beginning()
-        # msg = next(it)
-        # self.assertIsInstance(msg, bt2._StreamBeginningMessage)
+            def _user_consume(self):
+                self._msg_iter.seek_beginning()
 
-    def test_seek_beginning_user_error(self):
-        it, MySourceIter = self._setup_seek_beginning_test()
+        MySourceIter, graph = self._setup_seek_beginning_test(MySink)
 
         def _user_seek_beginning_error(self):
             raise ValueError('ouch')
@@ -349,7 +387,7 @@ class UserMessageIteratorTestCase(unittest.TestCase):
         MySourceIter._user_seek_beginning = _user_seek_beginning_error
 
         with self.assertRaises(bt2._Error):
-            it.seek_beginning()
+            graph.run_once()
 
     # Try consuming many times from an iterator that always returns TryAgain.
     # This verifies that we are not missing an incref of Py_None, making the
@@ -365,7 +403,7 @@ class UserMessageIteratorTestCase(unittest.TestCase):
 
         graph = bt2.Graph()
         src = graph.add_component(MySource, 'src')
-        it = graph.create_output_port_message_iterator(src.output_ports['out'])
+        it = TestOutputPortMessageIterator(graph, src.output_ports['out'])
 
         # The initial refcount of Py_None was in the 7000, so 100000 iterations
         # should be enough to catch the bug even if there are small differences
@@ -375,77 +413,5 @@ class UserMessageIteratorTestCase(unittest.TestCase):
                 next(it)
 
 
-class OutputPortMessageIteratorTestCase(unittest.TestCase):
-    def test_component(self):
-        class MyIter(bt2._UserMessageIterator):
-            def __init__(self, self_port_output):
-                self._at = 0
-
-            def __next__(self):
-                if self._at == 7:
-                    raise bt2.Stop
-
-                if self._at == 0:
-                    msg = self._create_stream_beginning_message(test_obj._stream)
-                elif self._at == 1:
-                    msg = self._create_packet_beginning_message(test_obj._packet)
-                elif self._at == 5:
-                    msg = self._create_packet_end_message(test_obj._packet)
-                elif self._at == 6:
-                    msg = self._create_stream_end_message(test_obj._stream)
-                else:
-                    msg = self._create_event_message(
-                        test_obj._event_class, test_obj._packet
-                    )
-                    msg.event.payload_field['my_int'] = self._at * 3
-
-                self._at += 1
-                return msg
-
-        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
-            def __init__(self, params, obj):
-                self._add_output_port('out')
-
-                trace_class = self._create_trace_class()
-                stream_class = trace_class.create_stream_class(supports_packets=True)
-
-                # Create payload field class
-                my_int_ft = trace_class.create_signed_integer_field_class(32)
-                payload_ft = trace_class.create_structure_field_class()
-                payload_ft += [('my_int', my_int_ft)]
-
-                event_class = stream_class.create_event_class(
-                    name='salut', payload_field_class=payload_ft
-                )
-
-                trace = trace_class()
-                stream = trace.create_stream(stream_class)
-                packet = stream.create_packet()
-
-                test_obj._event_class = event_class
-                test_obj._stream = stream
-                test_obj._packet = packet
-
-        test_obj = self
-        graph = bt2.Graph()
-        src = graph.add_component(MySource, 'src')
-        msg_iter = graph.create_output_port_message_iterator(src.output_ports['out'])
-
-        for at, msg in enumerate(msg_iter):
-            if at == 0:
-                self.assertIsInstance(msg, bt2._StreamBeginningMessage)
-            elif at == 1:
-                self.assertIsInstance(msg, bt2._PacketBeginningMessage)
-            elif at == 5:
-                self.assertIsInstance(msg, bt2._PacketEndMessage)
-            elif at == 6:
-                self.assertIsInstance(msg, bt2._StreamEndMessage)
-            else:
-                self.assertIsInstance(msg, bt2._EventMessage)
-                self.assertEqual(msg.event.cls.name, 'salut')
-                field = msg.event.payload_field['my_int']
-                self.assertEqual(field, at * 3)
-
-
 if __name__ == '__main__':
     unittest.main()
index 1f9a6c2768ae2e0635d55c136425b5250ac8e030..7707a531d1a7576c9d32328dda43cf2ed831eea1 100644 (file)
 #
 
 import bt2
+import collections.abc
 
 # Run callable `func` in the context of a component's __init__ method.  The
 # callable is passed the Component being instantiated.
 #
 # The value returned by the callable is returned by run_in_component_init.
-
-
 def run_in_component_init(func):
     class MySink(bt2._UserSinkComponent):
         def __init__(self, params, obj):
@@ -52,10 +51,56 @@ def run_in_component_init(func):
 
 
 # Create an empty trace class with default values.
-
-
 def get_default_trace_class():
     def f(comp_self):
         return comp_self._create_trace_class()
 
     return run_in_component_init(f)
+
+
+# Proxy sink component class.
+#
+# This sink accepts a list of a single item as its initialization
+# object. This sink creates a single input port `in`. When it consumes
+# from this port, it puts the returned message in the initialization
+# list as the first item.
+class TestProxySink(bt2._UserSinkComponent):
+    def __init__(self, params, msg_list):
+        assert msg_list is not None
+        self._msg_list = msg_list
+        self._add_input_port('in')
+
+    def _user_graph_is_configured(self):
+        self._msg_iter = self._create_input_port_message_iterator(
+            self._input_ports['in']
+        )
+
+    def _user_consume(self):
+        assert self._msg_list[0] is None
+        self._msg_list[0] = next(self._msg_iter)
+
+
+# This is a helper message iterator for tests.
+#
+# The constructor accepts a graph and an output port.
+#
+# Internally, it adds a proxy sink to the graph and connects the
+# received output port to the proxy sink's input port. Its __next__()
+# method then uses the proxy sink to transfer the consumed message to
+# the output port message iterator's user.
+#
+# This message iterator cannot seek.
+class TestOutputPortMessageIterator(collections.abc.Iterator):
+    def __init__(self, graph, output_port):
+        self._graph = graph
+        self._msg_list = [None]
+        sink = graph.add_component(TestProxySink, 'test-proxy-sink', obj=self._msg_list)
+        graph.connect_ports(output_port, sink.input_ports['in'])
+
+    def __next__(self):
+        assert self._msg_list[0] is None
+        self._graph.run_once()
+        msg = self._msg_list[0]
+        assert msg is not None
+        self._msg_list[0] = None
+        return msg
This page took 0.041301 seconds and 4 git commands to generate.