From c1859f693ac2a28d96e7ad492f8d66f74f33686f Mon Sep 17 00:00:00 2001 From: Philippe Proulx Date: Sun, 4 Aug 2019 01:13:15 -0400 Subject: [PATCH] bt2: make `TraceCollectionMessageIterator` not use an output port msg iter Because there's a plan to drop the output port message iterator concept altogether, make `TraceCollectionMessageIterator` not use any output port message iterator. Instead, a `TraceCollectionMessageIterator` instance adds a proxy sink component (`_TraceCollectionMessageIteratorProxySink`) to its graph, connecting it to the last filter component in the chain, and sharing with it a list having a single item. When the sink consumes a message from its upstream message iterator, it places it in the shared list as the first item. TraceCollectionMessageIterator.__next__() is changed so that it calls Graph.run_once() to make the proxy sink consume, and then reads the consumed message from the shared list. Signed-off-by: Philippe Proulx Change-Id: I4b561837a9c23d3d758ea089193cfdabf99fc27e Reviewed-on: https://review.lttng.org/c/babeltrace/+/1820 Tested-by: jenkins --- .../bt2/trace_collection_message_iterator.py | 44 ++++++++++++++++--- 1 file changed, 37 insertions(+), 7 deletions(-) diff --git a/src/bindings/python/bt2/bt2/trace_collection_message_iterator.py b/src/bindings/python/bt2/bt2/trace_collection_message_iterator.py index 4e7361e7..9c931fe2 100644 --- a/src/bindings/python/bt2/bt2/trace_collection_message_iterator.py +++ b/src/bindings/python/bt2/bt2/trace_collection_message_iterator.py @@ -26,6 +26,7 @@ import itertools from bt2 import message_iterator as bt2_message_iterator from bt2 import logging as bt2_logging from bt2 import port as bt2_port +from bt2 import component as bt2_component import datetime from collections import namedtuple import numbers @@ -102,6 +103,22 @@ class _CompClsType: FILTER = 1 +class _TraceCollectionMessageIteratorProxySink(bt2_component._UserSinkComponent): + def __init__(self, params, msg_list): + assert type(msg_list) is list + self._msg_list = msg_list + self._add_input_port('in') + + def _user_graph_is_configured(self): + self._msg_iter = self._create_input_port_message_iterator( + self._input_ports['in'] + ) + + def _user_consume(self): + assert self._msg_list[0] is None + self._msg_list[0] = next(self._msg_iter) + + class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator): def __init__( self, @@ -115,6 +132,7 @@ class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator): self._stream_intersection_mode = stream_intersection_mode self._begin_ns = _get_ns(begin) self._end_ns = _get_ns(end) + self._msg_list = [None] if type(source_component_specs) is ComponentSpec: source_component_specs = [source_component_specs] @@ -145,7 +163,12 @@ class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator): ) def __next__(self): - return next(self._msg_iter) + assert self._msg_list[0] is None + self._graph.run_once() + msg = self._msg_list[0] + assert msg is not None + self._msg_list[0] = None + return msg def _create_stream_intersection_trimmer(self, component, port): # find the original parameters specified by the user to create @@ -318,9 +341,9 @@ class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator): self._graph.connect_ports( self._muxer_comp.output_ports['out'], trimmer_comp.input_ports['in'] ) - msg_iter_port = trimmer_comp.output_ports['out'] + last_flt_out_port = trimmer_comp.output_ports['out'] else: - msg_iter_port = self._muxer_comp.output_ports['out'] + last_flt_out_port = self._muxer_comp.output_ports['out'] # create extra filter components (chained) for comp_spec in self._flt_comp_specs: @@ -331,8 +354,8 @@ class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator): for comp_and_spec in self._flt_comps_and_specs: in_port = list(comp_and_spec.comp.input_ports.values())[0] out_port = list(comp_and_spec.comp.output_ports.values())[0] - self._graph.connect_ports(msg_iter_port, in_port) - msg_iter_port = out_port + self._graph.connect_ports(last_flt_out_port, in_port) + last_flt_out_port = out_port # Here we create the components, self._graph_port_added() is # called when they add ports, but the callback returns early @@ -361,5 +384,12 @@ class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator): self._connect_src_comp_port(comp_and_spec.comp, out_port) - # create this trace collection iterator's message iterator - self._msg_iter = self._graph.create_output_port_message_iterator(msg_iter_port) + # Add the proxy sink, passing our message list to share consumed + # messages with this trace collection message iterator. + sink = self._graph.add_component( + _TraceCollectionMessageIteratorProxySink, 'proxy-sink', obj=self._msg_list + ) + sink_in_port = sink.input_ports['in'] + + # connect last filter to proxy sink + self._graph.connect_ports(last_flt_out_port, sink_in_port) -- 2.34.1