bt2: make `TraceCollectionMessageIterator` not use an output port msg iter
authorPhilippe Proulx <eeppeliteloop@gmail.com>
Sun, 4 Aug 2019 05:13:15 +0000 (01:13 -0400)
committerPhilippe Proulx <eeppeliteloop@gmail.com>
Mon, 5 Aug 2019 19:10:13 +0000 (15:10 -0400)
Because there's a plan to drop the output port message iterator concept
altogether, make `TraceCollectionMessageIterator` not use any output
port message iterator.

Instead, a `TraceCollectionMessageIterator` instance adds a proxy sink
component (`_TraceCollectionMessageIteratorProxySink`) to its graph,
connecting it to the last filter component in the chain, and sharing
with it a list having a single item.

When the sink consumes a message from its upstream message iterator, it
places it in the shared list as the first item.
TraceCollectionMessageIterator.__next__() is changed so that it calls
Graph.run_once() to make the proxy sink consume, and then reads the
consumed message from the shared list.

Signed-off-by: Philippe Proulx <eeppeliteloop@gmail.com>
Change-Id: I4b561837a9c23d3d758ea089193cfdabf99fc27e
Reviewed-on: https://review.lttng.org/c/babeltrace/+/1820
Tested-by: jenkins <jenkins@lttng.org>
src/bindings/python/bt2/bt2/trace_collection_message_iterator.py

index 4e7361e7ee877e4df02b1e5e005f20c74df3cc25..9c931fe2d4be568a5362cf1c020be87f8e76e00b 100644 (file)
@@ -26,6 +26,7 @@ import itertools
 from bt2 import message_iterator as bt2_message_iterator
 from bt2 import logging as bt2_logging
 from bt2 import port as bt2_port
+from bt2 import component as bt2_component
 import datetime
 from collections import namedtuple
 import numbers
@@ -102,6 +103,22 @@ class _CompClsType:
     FILTER = 1
 
 
+class _TraceCollectionMessageIteratorProxySink(bt2_component._UserSinkComponent):
+    def __init__(self, params, msg_list):
+        assert type(msg_list) is list
+        self._msg_list = msg_list
+        self._add_input_port('in')
+
+    def _user_graph_is_configured(self):
+        self._msg_iter = self._create_input_port_message_iterator(
+            self._input_ports['in']
+        )
+
+    def _user_consume(self):
+        assert self._msg_list[0] is None
+        self._msg_list[0] = next(self._msg_iter)
+
+
 class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator):
     def __init__(
         self,
@@ -115,6 +132,7 @@ class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator):
         self._stream_intersection_mode = stream_intersection_mode
         self._begin_ns = _get_ns(begin)
         self._end_ns = _get_ns(end)
+        self._msg_list = [None]
 
         if type(source_component_specs) is ComponentSpec:
             source_component_specs = [source_component_specs]
@@ -145,7 +163,12 @@ class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator):
                 )
 
     def __next__(self):
-        return next(self._msg_iter)
+        assert self._msg_list[0] is None
+        self._graph.run_once()
+        msg = self._msg_list[0]
+        assert msg is not None
+        self._msg_list[0] = None
+        return msg
 
     def _create_stream_intersection_trimmer(self, component, port):
         # find the original parameters specified by the user to create
@@ -318,9 +341,9 @@ class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator):
             self._graph.connect_ports(
                 self._muxer_comp.output_ports['out'], trimmer_comp.input_ports['in']
             )
-            msg_iter_port = trimmer_comp.output_ports['out']
+            last_flt_out_port = trimmer_comp.output_ports['out']
         else:
-            msg_iter_port = self._muxer_comp.output_ports['out']
+            last_flt_out_port = self._muxer_comp.output_ports['out']
 
         # create extra filter components (chained)
         for comp_spec in self._flt_comp_specs:
@@ -331,8 +354,8 @@ class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator):
         for comp_and_spec in self._flt_comps_and_specs:
             in_port = list(comp_and_spec.comp.input_ports.values())[0]
             out_port = list(comp_and_spec.comp.output_ports.values())[0]
-            self._graph.connect_ports(msg_iter_port, in_port)
-            msg_iter_port = out_port
+            self._graph.connect_ports(last_flt_out_port, in_port)
+            last_flt_out_port = out_port
 
         # Here we create the components, self._graph_port_added() is
         # called when they add ports, but the callback returns early
@@ -361,5 +384,12 @@ class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator):
 
                 self._connect_src_comp_port(comp_and_spec.comp, out_port)
 
-        # create this trace collection iterator's message iterator
-        self._msg_iter = self._graph.create_output_port_message_iterator(msg_iter_port)
+        # Add the proxy sink, passing our message list to share consumed
+        # messages with this trace collection message iterator.
+        sink = self._graph.add_component(
+            _TraceCollectionMessageIteratorProxySink, 'proxy-sink', obj=self._msg_list
+        )
+        sink_in_port = sink.input_ports['in']
+
+        # connect last filter to proxy sink
+        self._graph.connect_ports(last_flt_out_port, sink_in_port)
This page took 0.027286 seconds and 4 git commands to generate.