self._stream_inter_port_to_range = {}
for src_comp_and_spec in self._src_comps_and_specs:
- try:
- inputs = src_comp_and_spec.spec.params['inputs']
- except KeyError as e:
- raise ValueError(
- 'all source components must be created with an "inputs" parameter in stream intersection mode'
- ) from e
-
- params = {'inputs': inputs}
-
# query the port's component for the `babeltrace.trace-info`
# object which contains the range for each stream, from which we can
# compute the intersection of the streams in each trace.
query_exec = bt2.QueryExecutor(
- src_comp_and_spec.spec.component_class, 'babeltrace.trace-info', params
+ src_comp_and_spec.spec.component_class,
+ 'babeltrace.trace-info',
+ src_comp_and_spec.spec.params,
)
trace_infos = query_exec.query()
hist = _count_msgs_by_type(msgs)
self.assertEqual(hist[bt2._EventMessage], 3)
- def test_iter_intersection_no_inputs_param(self):
+ def test_iter_intersection_params(self):
+ # Check that all params used to create the source component are passed
+ # to the `babeltrace.trace-info` query.
specs = [
bt2.ComponentSpec.from_named_plugin_and_component_class(
- 'text', 'dmesg', {'read-from-stdin': True}
+ 'ctf',
+ 'fs',
+ {
+ 'inputs': [_3EVENTS_INTERSECT_TRACE_PATH],
+ 'clock-class-offset-s': 1000,
+ },
)
]
- with self.assertRaises(ValueError):
- bt2.TraceCollectionMessageIterator(specs, stream_intersection_mode=True)
+ msg_iter = bt2.TraceCollectionMessageIterator(
+ specs, stream_intersection_mode=True
+ )
+
+ event_msgs = [x for x in msg_iter if type(x) is bt2._EventMessage]
+ self.assertEqual(len(event_msgs), 3)
+ self.assertEqual(
+ event_msgs[0].default_clock_snapshot.ns_from_origin, 13516309000000071
+ )
+ self.assertEqual(
+ event_msgs[1].default_clock_snapshot.ns_from_origin, 13516309000000072
+ )
+ self.assertEqual(
+ event_msgs[2].default_clock_snapshot.ns_from_origin, 13516309000000082
+ )
def test_iter_no_intersection_two_traces(self):
spec = bt2.ComponentSpec.from_named_plugin_and_component_class(