From: Philippe Proulx Date: Thu, 2 Nov 2017 17:14:54 +0000 (-0400) Subject: bt2: TraceCollectionNotificationIterator: support custom filter CCs X-Git-Url: http://git.efficios.com/?p=babeltrace.git;a=commitdiff_plain;h=3d60267b222697603cdcbb942d79a12fd7c9ce44 bt2: TraceCollectionNotificationIterator: support custom filter CCs This patch adds an optional `filter_component_specs` parameter to TraceCollectionNotificationIterator's constructor so that the trace collection notification iterator supports a chain of custom filter components. The filter chain is connected to the implicit muxer's output port or to the implicit trimmer's output port if it exists. This is useful to add debugging information, for example: src = bt2.ComponentSpec('ctf', 'fs', trace_path) flt = bt2.ComponentSpec('lttng-utils', 'debug-info') it = bt2.TraceCollectionNotificationIterator(src, flt) for notif in it: ... Signed-off-by: Philippe Proulx Signed-off-by: Jérémie Galarneau --- diff --git a/bindings/python/babeltrace/babeltrace/reader_trace_collection.py b/bindings/python/babeltrace/babeltrace/reader_trace_collection.py index d888f8d8..0ca484d7 100644 --- a/bindings/python/babeltrace/babeltrace/reader_trace_collection.py +++ b/bindings/python/babeltrace/babeltrace/reader_trace_collection.py @@ -166,7 +166,7 @@ class TraceCollection: return self._gen_events(timestamp_begin / 1e9, timestamp_end / 1e9) def _gen_events(self, begin_s=None, end_s=None): - specs = [bt2.SourceComponentSpec('ctf', 'fs', th.path) for th in self._trace_handles] + specs = [bt2.ComponentSpec('ctf', 'fs', th.path) for th in self._trace_handles] try: iter_cls = bt2.TraceCollectionNotificationIterator diff --git a/bindings/python/babeltrace/babeltrace/reader_trace_handle.py b/bindings/python/babeltrace/babeltrace/reader_trace_handle.py index 07c36bb2..4d682300 100644 --- a/bindings/python/babeltrace/babeltrace/reader_trace_handle.py +++ b/bindings/python/babeltrace/babeltrace/reader_trace_handle.py @@ -113,7 +113,7 @@ class TraceHandle: def _get_event_declarations(self): notif_iter = bt2.TraceCollectionNotificationIterator([ - bt2.SourceComponentSpec('ctf', 'fs', self._path) + bt2.ComponentSpec('ctf', 'fs', self._path) ]) # raises if the trace contains no streams diff --git a/bindings/python/bt2/bt2/trace_collection_notification_iterator.py b/bindings/python/bt2/bt2/trace_collection_notification_iterator.py index d7f1ae5e..a2343673 100644 --- a/bindings/python/bt2/bt2/trace_collection_notification_iterator.py +++ b/bindings/python/bt2/bt2/trace_collection_notification_iterator.py @@ -22,6 +22,7 @@ from bt2 import utils import bt2 +import itertools import bt2.notification_iterator import datetime import collections.abc @@ -29,12 +30,11 @@ from collections import namedtuple import numbers -# a pair of source component and _SourceComponentSpec -_SourceComponentAndSpec = namedtuple('_SourceComponentAndSpec', - ['comp', 'spec']) +# a pair of component and ComponentSpec +_ComponentAndSpec = namedtuple('_ComponentAndSpec', ['comp', 'spec']) -class SourceComponentSpec: +class ComponentSpec: def __init__(self, plugin_name, component_class_name, params=None): utils._check_str(plugin_name) utils._check_str(component_class_name) @@ -76,29 +76,46 @@ def _get_ns(obj): return int(s * 1e9) +class _CompClsType: + SOURCE = 0 + FILTER = 1 + + class TraceCollectionNotificationIterator(bt2.notification_iterator._NotificationIterator): - def __init__(self, source_component_specs, notification_types=None, - stream_intersection_mode=False, begin=None, - end=None): + def __init__(self, source_component_specs, filter_component_specs=None, + notification_types=None, stream_intersection_mode=False, + begin=None, end=None): utils._check_bool(stream_intersection_mode) self._stream_intersection_mode = stream_intersection_mode self._begin_ns = _get_ns(begin) self._end_ns = _get_ns(end) self._notification_types = notification_types + + if type(source_component_specs) is ComponentSpec: + source_component_specs = [source_component_specs] + + if type(filter_component_specs) is ComponentSpec: + filter_component_specs = [filter_component_specs] + elif filter_component_specs is None: + filter_component_specs = [] + self._src_comp_specs = source_component_specs + self._flt_comp_specs = filter_component_specs self._next_suffix = 1 self._connect_ports = False - # set of _SourceComponentAndSpec + # lists of _ComponentAndSpec self._src_comps_and_specs = [] + self._flt_comps_and_specs = [] - self._validate_source_component_specs() + self._validate_component_specs(source_component_specs) + self._validate_component_specs(filter_component_specs) self._build_graph() - def _validate_source_component_specs(self): - for source_comp_spec in self._src_comp_specs: - if type(source_comp_spec) is not SourceComponentSpec: - raise TypeError('"{}" object is not a SourceComponentSpec'.format(type(source_comp_spec))) + def _validate_component_specs(self, comp_specs): + for comp_spec in comp_specs: + if type(comp_spec) is not ComponentSpec: + raise TypeError('"{}" object is not a ComponentSpec'.format(type(comp_spec))) def __next__(self): return next(self._notif_iter) @@ -176,28 +193,37 @@ class TraceCollectionNotificationIterator(bt2.notification_iterator._Notificatio comp_cls = plugin.filter_component_classes['trimmer'] return self._graph.add_component(comp_cls, name, params) - def _get_unique_src_comp_name(self, comp_spec): + def _get_unique_comp_name(self, comp_spec): name = '{}-{}'.format(comp_spec.plugin_name, comp_spec.component_class_name) + comps_and_specs = itertools.chain(self._src_comps_and_specs, + self._flt_comps_and_specs) - if name in [comp_and_spec.comp.name for comp_and_spec in self._src_comps_and_specs]: + if name in [comp_and_spec.comp.name for comp_and_spec in comps_and_specs]: name += '-{}'.format(self._next_suffix) self._next_suffix += 1 return name - def _create_src_comp(self, comp_spec): + def _create_comp(self, comp_spec, comp_cls_type): plugin = bt2.find_plugin(comp_spec.plugin_name) if plugin is None: raise bt2.Error('no such plugin: {}'.format(comp_spec.plugin_name)) - if comp_spec.component_class_name not in plugin.source_component_classes: - raise bt2.Error('no such source component class in "{}" plugin: {}'.format(comp_spec.plugin_name, - comp_spec.component_class_name)) + if comp_cls_type == _CompClsType.SOURCE: + comp_classes = plugin.source_component_classes + else: + comp_classes = plugin.filter_component_classes + + if comp_spec.component_class_name not in comp_classes: + cc_type = 'source' if comp_cls_type == _CompClsType.SOURCE else 'filter' + raise bt2.Error('no such {} component class in "{}" plugin: {}'.format(cc_type, + comp_spec.plugin_name, + comp_spec.component_class_name)) - comp_cls = plugin.source_component_classes[comp_spec.component_class_name] - name = self._get_unique_src_comp_name(comp_spec) + comp_cls = comp_classes[comp_spec.component_class_name] + name = self._get_unique_comp_name(comp_spec) comp = self._graph.add_component(comp_cls, name, comp_spec.params) return comp @@ -252,6 +278,18 @@ class TraceCollectionNotificationIterator(bt2.notification_iterator._Notificatio else: notif_iter_port = self._muxer_comp.output_ports['out'] + # create extra filter components (chained) + for comp_spec in self._flt_comp_specs: + comp = self._create_comp(comp_spec, _CompClsType.FILTER) + self._flt_comps_and_specs.append(_ComponentAndSpec(comp, comp_spec)) + + # connect the extra filter chain + for comp_and_spec in self._flt_comps_and_specs: + in_port = list(comp_and_spec.comp.input_ports.values())[0] + out_port = list(comp_and_spec.comp.output_ports.values())[0] + self._graph.connect_ports(notif_iter_port, in_port) + notif_iter_port = out_port + # Here we create the components, self._graph_port_added() is # called when they add ports, but the callback returns early # because self._connect_ports is False. This is because the @@ -260,8 +298,8 @@ class TraceCollectionNotificationIterator(bt2.notification_iterator._Notificatio # it does not exist yet (it needs the created component to # exist). for comp_spec in self._src_comp_specs: - comp = self._create_src_comp(comp_spec) - self._src_comps_and_specs.append(_SourceComponentAndSpec(comp, comp_spec)) + comp = self._create_comp(comp_spec, _CompClsType.SOURCE) + self._src_comps_and_specs.append(_ComponentAndSpec(comp, comp_spec)) # Now we connect the ports which exist at this point. We allow # self._graph_port_added() to automatically connect _new_ ports. diff --git a/tests/bindings/python/bt2/test_trace_collection_notification_iterator.py b/tests/bindings/python/bt2/test_trace_collection_notification_iterator.py index 459607ea..6dd774ae 100644 --- a/tests/bindings/python/bt2/test_trace_collection_notification_iterator.py +++ b/tests/bindings/python/bt2/test_trace_collection_notification_iterator.py @@ -13,115 +13,130 @@ _3EVENTS_INTERSECT_TRACE_PATH = os.path.join(_TEST_CTF_TRACES_PATH, '3eventsintersect') -class SourceComponentSpecTestCase(unittest.TestCase): +class ComponentSpecTestCase(unittest.TestCase): def test_create_good_no_params(self): - spec = bt2.SourceComponentSpec('plugin', 'compcls') + spec = bt2.ComponentSpec('plugin', 'compcls') def test_create_good_with_params(self): - spec = bt2.SourceComponentSpec('plugin', 'compcls', {'salut': 23}) + spec = bt2.ComponentSpec('plugin', 'compcls', {'salut': 23}) def test_create_good_with_path_params(self): - spec = bt2.SourceComponentSpec('plugin', 'compcls', 'a path') + spec = bt2.ComponentSpec('plugin', 'compcls', 'a path') self.assertEqual(spec.params['path'], 'a path') def test_create_wrong_plugin_name_type(self): with self.assertRaises(TypeError): - spec = bt2.SourceComponentSpec(23, 'compcls') + spec = bt2.ComponentSpec(23, 'compcls') def test_create_wrong_component_class_name_type(self): with self.assertRaises(TypeError): - spec = bt2.SourceComponentSpec('plugin', 190) + spec = bt2.ComponentSpec('plugin', 190) def test_create_wrong_params_type(self): with self.assertRaises(TypeError): - spec = bt2.SourceComponentSpec('dwdw', 'compcls', datetime.datetime.now()) + spec = bt2.ComponentSpec('dwdw', 'compcls', datetime.datetime.now()) class TraceCollectionNotificationIteratorTestCase(unittest.TestCase): def test_create_wrong_stream_intersection_mode_type(self): - specs = [bt2.SourceComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] + specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] with self.assertRaises(TypeError): notif_iter = bt2.TraceCollectionNotificationIterator(specs, stream_intersection_mode=23) def test_create_wrong_begin_type(self): - specs = [bt2.SourceComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] + specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] with self.assertRaises(TypeError): notif_iter = bt2.TraceCollectionNotificationIterator(specs, begin='hi') def test_create_wrong_end_type(self): - specs = [bt2.SourceComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] + specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] with self.assertRaises(TypeError): notif_iter = bt2.TraceCollectionNotificationIterator(specs, begin='lel') def test_create_no_such_plugin(self): - specs = [bt2.SourceComponentSpec('77', '101', _3EVENTS_INTERSECT_TRACE_PATH)] + specs = [bt2.ComponentSpec('77', '101', _3EVENTS_INTERSECT_TRACE_PATH)] with self.assertRaises(bt2.Error): notif_iter = bt2.TraceCollectionNotificationIterator(specs) def test_create_begin_s(self): - specs = [bt2.SourceComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] + specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] notif_iter = bt2.TraceCollectionNotificationIterator(specs, begin=19457.918232) def test_create_end_s(self): - specs = [bt2.SourceComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] + specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] notif_iter = bt2.TraceCollectionNotificationIterator(specs, end=123.12312) def test_create_begin_datetime(self): - specs = [bt2.SourceComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] + specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] notif_iter = bt2.TraceCollectionNotificationIterator(specs, begin=datetime.datetime.now()) def test_create_end_datetime(self): - specs = [bt2.SourceComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] + specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] notif_iter = bt2.TraceCollectionNotificationIterator(specs, end=datetime.datetime.now()) def test_iter_no_intersection(self): - specs = [bt2.SourceComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] + specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] notif_iter = bt2.TraceCollectionNotificationIterator(specs) self.assertEqual(len(list(notif_iter)), 28) def test_iter_no_intersection_subscribe(self): - specs = [bt2.SourceComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] + specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] notif_iter = bt2.TraceCollectionNotificationIterator(specs, notification_types=[bt2.EventNotification]) self.assertEqual(len(list(notif_iter)), 8) + def test_iter_specs_not_list(self): + spec = bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH) + notif_iter = bt2.TraceCollectionNotificationIterator(spec, + notification_types=[bt2.EventNotification]) + self.assertEqual(len(list(notif_iter)), 8) + + def test_iter_custom_filter(self): + src_spec = bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH) + flt_spec = bt2.ComponentSpec('utils', 'trimmer', { + 'end': 13515309000000075, + }) + notif_iter = bt2.TraceCollectionNotificationIterator(src_spec, flt_spec, + notification_types=[bt2.EventNotification]) + self.assertEqual(len(list(notif_iter)), 5) + def test_iter_intersection(self): - specs = [bt2.SourceComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] + specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] notif_iter = bt2.TraceCollectionNotificationIterator(specs, stream_intersection_mode=True) self.assertEqual(len(list(notif_iter)), 15) def test_iter_intersection_subscribe(self): - specs = [bt2.SourceComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] + specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] notif_iter = bt2.TraceCollectionNotificationIterator(specs, stream_intersection_mode=True, notification_types=[bt2.EventNotification]) self.assertEqual(len(list(notif_iter)), 3) def test_iter_intersection_no_path_param(self): - specs = [bt2.SourceComponentSpec('text', 'dmesg', {'read-from-stdin': True})] + specs = [bt2.ComponentSpec('text', 'dmesg', {'read-from-stdin': True})] with self.assertRaises(bt2.Error): notif_iter = bt2.TraceCollectionNotificationIterator(specs, stream_intersection_mode=True, notification_types=[bt2.EventNotification]) def test_iter_no_intersection_two_traces(self): - spec = bt2.SourceComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH) + spec = bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH) specs = [spec, spec] notif_iter = bt2.TraceCollectionNotificationIterator(specs) self.assertEqual(len(list(notif_iter)), 56) def test_iter_no_intersection_begin(self): - specs = [bt2.SourceComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] + specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] notif_iter = bt2.TraceCollectionNotificationIterator(specs, notification_types=[bt2.EventNotification], begin=13515309.000000023) self.assertEqual(len(list(notif_iter)), 6) def test_iter_no_intersection_end(self): - specs = [bt2.SourceComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] + specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] notif_iter = bt2.TraceCollectionNotificationIterator(specs, notification_types=[bt2.EventNotification], end=13515309.000000075)