from bt2 import message_iterator as bt2_message_iterator
from bt2 import logging as bt2_logging
from bt2 import port as bt2_port
+from bt2 import component as bt2_component
import datetime
from collections import namedtuple
import numbers
FILTER = 1
+class _TraceCollectionMessageIteratorProxySink(bt2_component._UserSinkComponent):
+ def __init__(self, params, msg_list):
+ assert type(msg_list) is list
+ self._msg_list = msg_list
+ self._add_input_port('in')
+
+ def _user_graph_is_configured(self):
+ self._msg_iter = self._create_input_port_message_iterator(
+ self._input_ports['in']
+ )
+
+ def _user_consume(self):
+ assert self._msg_list[0] is None
+ self._msg_list[0] = next(self._msg_iter)
+
+
class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator):
def __init__(
self,
self._stream_intersection_mode = stream_intersection_mode
self._begin_ns = _get_ns(begin)
self._end_ns = _get_ns(end)
+ self._msg_list = [None]
if type(source_component_specs) is ComponentSpec:
source_component_specs = [source_component_specs]
)
def __next__(self):
- return next(self._msg_iter)
+ assert self._msg_list[0] is None
+ self._graph.run_once()
+ msg = self._msg_list[0]
+ assert msg is not None
+ self._msg_list[0] = None
+ return msg
def _create_stream_intersection_trimmer(self, component, port):
# find the original parameters specified by the user to create
self._graph.connect_ports(
self._muxer_comp.output_ports['out'], trimmer_comp.input_ports['in']
)
- msg_iter_port = trimmer_comp.output_ports['out']
+ last_flt_out_port = trimmer_comp.output_ports['out']
else:
- msg_iter_port = self._muxer_comp.output_ports['out']
+ last_flt_out_port = self._muxer_comp.output_ports['out']
# create extra filter components (chained)
for comp_spec in self._flt_comp_specs:
for comp_and_spec in self._flt_comps_and_specs:
in_port = list(comp_and_spec.comp.input_ports.values())[0]
out_port = list(comp_and_spec.comp.output_ports.values())[0]
- self._graph.connect_ports(msg_iter_port, in_port)
- msg_iter_port = out_port
+ self._graph.connect_ports(last_flt_out_port, in_port)
+ last_flt_out_port = out_port
# Here we create the components, self._graph_port_added() is
# called when they add ports, but the callback returns early
self._connect_src_comp_port(comp_and_spec.comp, out_port)
- # create this trace collection iterator's message iterator
- self._msg_iter = self._graph.create_output_port_message_iterator(msg_iter_port)
+ # Add the proxy sink, passing our message list to share consumed
+ # messages with this trace collection message iterator.
+ sink = self._graph.add_component(
+ _TraceCollectionMessageIteratorProxySink, 'proxy-sink', obj=self._msg_list
+ )
+ sink_in_port = sink.input_ports['in']
+
+ # connect last filter to proxy sink
+ self._graph.connect_ports(last_flt_out_port, sink_in_port)