bt2: make `TraceCollectionMessageIterator` not use an output port msg iter
[babeltrace.git] / src / bindings / python / bt2 / bt2 / trace_collection_message_iterator.py
index 4e7361e7ee877e4df02b1e5e005f20c74df3cc25..9c931fe2d4be568a5362cf1c020be87f8e76e00b 100644 (file)
@@ -26,6 +26,7 @@ import itertools
 from bt2 import message_iterator as bt2_message_iterator
 from bt2 import logging as bt2_logging
 from bt2 import port as bt2_port
+from bt2 import component as bt2_component
 import datetime
 from collections import namedtuple
 import numbers
@@ -102,6 +103,22 @@ class _CompClsType:
     FILTER = 1
 
 
+class _TraceCollectionMessageIteratorProxySink(bt2_component._UserSinkComponent):
+    def __init__(self, params, msg_list):
+        assert type(msg_list) is list
+        self._msg_list = msg_list
+        self._add_input_port('in')
+
+    def _user_graph_is_configured(self):
+        self._msg_iter = self._create_input_port_message_iterator(
+            self._input_ports['in']
+        )
+
+    def _user_consume(self):
+        assert self._msg_list[0] is None
+        self._msg_list[0] = next(self._msg_iter)
+
+
 class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator):
     def __init__(
         self,
@@ -115,6 +132,7 @@ class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator):
         self._stream_intersection_mode = stream_intersection_mode
         self._begin_ns = _get_ns(begin)
         self._end_ns = _get_ns(end)
+        self._msg_list = [None]
 
         if type(source_component_specs) is ComponentSpec:
             source_component_specs = [source_component_specs]
@@ -145,7 +163,12 @@ class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator):
                 )
 
     def __next__(self):
-        return next(self._msg_iter)
+        assert self._msg_list[0] is None
+        self._graph.run_once()
+        msg = self._msg_list[0]
+        assert msg is not None
+        self._msg_list[0] = None
+        return msg
 
     def _create_stream_intersection_trimmer(self, component, port):
         # find the original parameters specified by the user to create
@@ -318,9 +341,9 @@ class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator):
             self._graph.connect_ports(
                 self._muxer_comp.output_ports['out'], trimmer_comp.input_ports['in']
             )
-            msg_iter_port = trimmer_comp.output_ports['out']
+            last_flt_out_port = trimmer_comp.output_ports['out']
         else:
-            msg_iter_port = self._muxer_comp.output_ports['out']
+            last_flt_out_port = self._muxer_comp.output_ports['out']
 
         # create extra filter components (chained)
         for comp_spec in self._flt_comp_specs:
@@ -331,8 +354,8 @@ class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator):
         for comp_and_spec in self._flt_comps_and_specs:
             in_port = list(comp_and_spec.comp.input_ports.values())[0]
             out_port = list(comp_and_spec.comp.output_ports.values())[0]
-            self._graph.connect_ports(msg_iter_port, in_port)
-            msg_iter_port = out_port
+            self._graph.connect_ports(last_flt_out_port, in_port)
+            last_flt_out_port = out_port
 
         # Here we create the components, self._graph_port_added() is
         # called when they add ports, but the callback returns early
@@ -361,5 +384,12 @@ class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator):
 
                 self._connect_src_comp_port(comp_and_spec.comp, out_port)
 
-        # create this trace collection iterator's message iterator
-        self._msg_iter = self._graph.create_output_port_message_iterator(msg_iter_port)
+        # Add the proxy sink, passing our message list to share consumed
+        # messages with this trace collection message iterator.
+        sink = self._graph.add_component(
+            _TraceCollectionMessageIteratorProxySink, 'proxy-sink', obj=self._msg_list
+        )
+        sink_in_port = sink.input_ports['in']
+
+        # connect last filter to proxy sink
+        self._graph.connect_ports(last_flt_out_port, sink_in_port)
This page took 0.025084 seconds and 4 git commands to generate.