import itertools
import bt2.message_iterator
import datetime
-import collections.abc
from collections import namedtuple
import numbers
self._component_class_name = component_class_name
if type(params) is str:
- self._params = bt2.create_value({'path': params})
+ self._params = bt2.create_value({'paths': [params]})
else:
self._params = bt2.create_value(params)
class TraceCollectionMessageIterator(bt2.message_iterator._MessageIterator):
def __init__(self, source_component_specs, filter_component_specs=None,
- message_types=None, stream_intersection_mode=False,
- begin=None, end=None):
+ stream_intersection_mode=False, begin=None, end=None):
utils._check_bool(stream_intersection_mode)
self._stream_intersection_mode = stream_intersection_mode
self._begin_ns = _get_ns(begin)
self._end_ns = _get_ns(end)
- self._message_types = message_types
if type(source_component_specs) is ComponentSpec:
source_component_specs = [source_component_specs]
def __next__(self):
return next(self._msg_iter)
- def _create_stream_intersection_trimmer(self, port):
+ def _create_stream_intersection_trimmer(self, component, port):
# find the original parameters specified by the user to create
# this port's component to get the `path` parameter
for src_comp_and_spec in self._src_comps_and_specs:
- if port.component == src_comp_and_spec.comp:
- params = src_comp_and_spec.spec.params
+ if component == src_comp_and_spec.comp:
break
try:
- path = params['path']
- except:
- raise bt2.Error('all source components must be created with a "path" parameter in stream intersection mode')
+ paths = src_comp_and_spec.spec.params['paths']
+ except Exception as e:
+ raise bt2.Error('all source components must be created with a "paths" parameter in stream intersection mode') from e
- params = {'path': str(path)}
+ params = {'paths': paths}
# query the port's component for the `trace-info` object which
# contains the stream intersection range for each exposed
# trace
query_exec = bt2.QueryExecutor()
- trace_info_res = query_exec.query(port.component.component_class,
+ trace_info_res = query_exec.query(src_comp_and_spec.comp.component_class,
'trace-info', params)
begin = None
end = None
begin = range_ns['begin']
end = range_ns['end']
break
- except:
+ except Exception:
pass
if begin is None or end is None:
raise bt2.Error('cannot find stream intersection range for port "{}"'.format(port.name))
- name = 'trimmer-{}-{}'.format(port.component.name, port.name)
+ name = 'trimmer-{}-{}'.format(src_comp_and_spec.comp.name, port.name)
return self._create_trimmer(begin, end, name)
def _create_muxer(self):
comp_cls = plugin.filter_component_classes['muxer']
return self._graph.add_component(comp_cls, 'muxer')
- def _create_trimmer(self, begin, end, name):
+ def _create_trimmer(self, begin_ns, end_ns, name):
plugin = bt2.find_plugin('utils')
if plugin is None:
params = {}
- if begin is not None:
- params['begin'] = begin
+ def ns_to_string(ns):
+ s_part = ns // 1000000000
+ ns_part = ns % 1000000000
+ return '{}.{:09d}'.format(s_part, ns_part)
- if end is not None:
- params['end'] = end
+ if begin_ns is not None:
+ params['begin'] = ns_to_string(begin_ns)
+
+ if end_ns is not None:
+ params['end'] = ns_to_string(end_ns)
comp_cls = plugin.filter_component_classes['trimmer']
return self._graph.add_component(comp_cls, name, params)
if not port.is_connected:
return port
- def _connect_src_comp_port(self, port):
+ def _connect_src_comp_port(self, component, port):
# if this trace collection iterator is in stream intersection
# mode, we need this connection:
#
#
# port -> muxer
if self._stream_intersection_mode:
- trimmer_comp = self._create_stream_intersection_trimmer(port)
+ trimmer_comp = self._create_stream_intersection_trimmer(component, port)
self._graph.connect_ports(port, trimmer_comp.input_ports['in'])
port_to_muxer = trimmer_comp.output_ports['out']
else:
self._graph.connect_ports(port_to_muxer, self._get_free_muxer_input_port())
- def _graph_port_added(self, port):
+ def _graph_port_added(self, component, port):
if not self._connect_ports:
return
- if type(port) is bt2._InputPort:
+ if type(port) is bt2.port._InputPort:
return
- if port.component not in [comp.comp for comp in self._src_comps_and_specs]:
+ if component not in [comp.comp for comp in self._src_comps_and_specs]:
# do not care about non-source components (muxer, trimmer, etc.)
return
- self._connect_src_comp_port(port)
+ self._connect_src_comp_port(component, port)
def _build_graph(self):
self._graph = bt2.Graph()
- self._graph.add_listener(bt2.GraphListenerType.PORT_ADDED,
- self._graph_port_added)
+ self._graph.add_port_added_listener(self._graph_port_added)
self._muxer_comp = self._create_muxer()
if self._begin_ns is not None or self._end_ns is not None:
out_ports = [port for port in comp_and_spec.comp.output_ports.values()]
for out_port in out_ports:
- if not out_port.component or out_port.is_connected:
+ if out_port.is_connected:
continue
- self._connect_src_comp_port(out_port)
+ self._connect_src_comp_port(comp_and_spec.comp, out_port)
# create this trace collection iterator's message iterator
- self._msg_iter = msg_iter_port.create_message_iterator(self._message_types)
+ self._msg_iter = self._graph.create_output_port_message_iterator(msg_iter_port)
import unittest
import datetime
-import copy
-import uuid
import bt2
import os
import os.path
'3eventsintersect')
-@unittest.skip("this is broken")
class ComponentSpecTestCase(unittest.TestCase):
def test_create_good_no_params(self):
- spec = bt2.ComponentSpec('plugin', 'compcls')
+ bt2.ComponentSpec('plugin', 'compcls')
def test_create_good_with_params(self):
- spec = bt2.ComponentSpec('plugin', 'compcls', {'salut': 23})
+ bt2.ComponentSpec('plugin', 'compcls', {'salut': 23})
def test_create_good_with_path_params(self):
spec = bt2.ComponentSpec('plugin', 'compcls', 'a path')
- self.assertEqual(spec.params['path'], 'a path')
+ self.assertEqual(spec.params['paths'], ['a path'])
def test_create_wrong_plugin_name_type(self):
with self.assertRaises(TypeError):
- spec = bt2.ComponentSpec(23, 'compcls')
+ bt2.ComponentSpec(23, 'compcls')
def test_create_wrong_component_class_name_type(self):
with self.assertRaises(TypeError):
- spec = bt2.ComponentSpec('plugin', 190)
+ bt2.ComponentSpec('plugin', 190)
def test_create_wrong_params_type(self):
with self.assertRaises(TypeError):
- spec = bt2.ComponentSpec('dwdw', 'compcls', datetime.datetime.now())
+ bt2.ComponentSpec('dwdw', 'compcls', datetime.datetime.now())
+
+
+# Return a map, msg type -> number of messages of this type.
+
+def _count_msgs_by_type(msgs):
+ res = {}
+
+ for msg in msgs:
+ t = type(msg)
+ n = res.get(t, 0)
+ res[t] = n + 1
+
+ return res
-@unittest.skip("this is broken")
class TraceCollectionMessageIteratorTestCase(unittest.TestCase):
def test_create_wrong_stream_intersection_mode_type(self):
specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
with self.assertRaises(TypeError):
- msg_iter = bt2.TraceCollectionMessageIterator(specs, stream_intersection_mode=23)
+ bt2.TraceCollectionMessageIterator(specs, stream_intersection_mode=23)
def test_create_wrong_begin_type(self):
specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
with self.assertRaises(TypeError):
- msg_iter = bt2.TraceCollectionMessageIterator(specs, begin='hi')
+ bt2.TraceCollectionMessageIterator(specs, begin='hi')
def test_create_wrong_end_type(self):
specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
with self.assertRaises(TypeError):
- msg_iter = bt2.TraceCollectionMessageIterator(specs, begin='lel')
+ bt2.TraceCollectionMessageIterator(specs, begin='lel')
def test_create_no_such_plugin(self):
specs = [bt2.ComponentSpec('77', '101', _3EVENTS_INTERSECT_TRACE_PATH)]
with self.assertRaises(bt2.Error):
- msg_iter = bt2.TraceCollectionMessageIterator(specs)
+ bt2.TraceCollectionMessageIterator(specs)
def test_create_begin_s(self):
specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
- msg_iter = bt2.TraceCollectionMessageIterator(specs, begin=19457.918232)
+ bt2.TraceCollectionMessageIterator(specs, begin=19457.918232)
def test_create_end_s(self):
specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
- msg_iter = bt2.TraceCollectionMessageIterator(specs, end=123.12312)
+ bt2.TraceCollectionMessageIterator(specs, end=123.12312)
def test_create_begin_datetime(self):
specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
- msg_iter = bt2.TraceCollectionMessageIterator(specs, begin=datetime.datetime.now())
+ bt2.TraceCollectionMessageIterator(specs, begin=datetime.datetime.now())
def test_create_end_datetime(self):
specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
- msg_iter = bt2.TraceCollectionMessageIterator(specs, end=datetime.datetime.now())
+ bt2.TraceCollectionMessageIterator(specs, end=datetime.datetime.now())
def test_iter_no_intersection(self):
specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
msg_iter = bt2.TraceCollectionMessageIterator(specs)
- self.assertEqual(len(list(msg_iter)), 28)
-
- def test_iter_no_intersection_subscribe(self):
- specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
- msg_iter = bt2.TraceCollectionMessageIterator(specs,
- message_types=[bt2.EventMessage])
- self.assertEqual(len(list(msg_iter)), 8)
+ msgs = list(msg_iter)
+ self.assertEqual(len(msgs), 32)
+ hist = _count_msgs_by_type(msgs)
+ self.assertEqual(hist[bt2.message._EventMessage], 8)
+ # Same as the above, but we pass a single spec instead of a spec list.
def test_iter_specs_not_list(self):
spec = bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)
- msg_iter = bt2.TraceCollectionMessageIterator(spec,
- message_types=[bt2.EventMessage])
- self.assertEqual(len(list(msg_iter)), 8)
+ msg_iter = bt2.TraceCollectionMessageIterator(spec)
+ msgs = list(msg_iter)
+ self.assertEqual(len(msgs), 32)
+ hist = _count_msgs_by_type(msgs)
+ self.assertEqual(hist[bt2.message._EventMessage], 8)
def test_iter_custom_filter(self):
src_spec = bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)
flt_spec = bt2.ComponentSpec('utils', 'trimmer', {
- 'end': 13515309000000075,
+ 'end': '13515309.000000075',
})
- msg_iter = bt2.TraceCollectionMessageIterator(src_spec, flt_spec,
- message_types=[bt2.EventMessage])
- self.assertEqual(len(list(msg_iter)), 5)
+ msg_iter = bt2.TraceCollectionMessageIterator(src_spec, flt_spec)
+ hist = _count_msgs_by_type(msg_iter)
+ self.assertEqual(hist[bt2.message._EventMessage], 5)
def test_iter_intersection(self):
specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
msg_iter = bt2.TraceCollectionMessageIterator(specs, stream_intersection_mode=True)
- self.assertEqual(len(list(msg_iter)), 15)
-
- def test_iter_intersection_subscribe(self):
- specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
- msg_iter = bt2.TraceCollectionMessageIterator(specs, stream_intersection_mode=True,
- message_types=[bt2.EventMessage])
- self.assertEqual(len(list(msg_iter)), 3)
+ msgs = list(msg_iter)
+ self.assertEqual(len(msgs), 19)
+ hist = _count_msgs_by_type(msgs)
+ self.assertEqual(hist[bt2.message._EventMessage], 3)
def test_iter_intersection_no_path_param(self):
specs = [bt2.ComponentSpec('text', 'dmesg', {'read-from-stdin': True})]
with self.assertRaises(bt2.Error):
- msg_iter = bt2.TraceCollectionMessageIterator(specs, stream_intersection_mode=True,
- message_types=[bt2.EventMessage])
+ bt2.TraceCollectionMessageIterator(specs, stream_intersection_mode=True)
def test_iter_no_intersection_two_traces(self):
spec = bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)
specs = [spec, spec]
msg_iter = bt2.TraceCollectionMessageIterator(specs)
- self.assertEqual(len(list(msg_iter)), 56)
+ msgs = list(msg_iter)
+ self.assertEqual(len(msgs), 64)
+ hist = _count_msgs_by_type(msgs)
+ self.assertEqual(hist[bt2.message._EventMessage], 16)
def test_iter_no_intersection_begin(self):
specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
- msg_iter = bt2.TraceCollectionMessageIterator(specs,
- message_types=[bt2.EventMessage],
- begin=13515309.000000023)
- self.assertEqual(len(list(msg_iter)), 6)
+ msg_iter = bt2.TraceCollectionMessageIterator(specs, begin=13515309.000000023)
+ hist = _count_msgs_by_type(msg_iter)
+ self.assertEqual(hist[bt2.message._EventMessage], 6)
def test_iter_no_intersection_end(self):
specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
- msg_iter = bt2.TraceCollectionMessageIterator(specs,
- message_types=[bt2.EventMessage],
- end=13515309.000000075)
- self.assertEqual(len(list(msg_iter)), 5)
+ msg_iter = bt2.TraceCollectionMessageIterator(specs, end=13515309.000000075)
+ hist = _count_msgs_by_type(msg_iter)
+ self.assertEqual(hist[bt2.message._EventMessage], 5)