bt2: TraceCollectionNotificationIterator: support custom filter CCs
authorPhilippe Proulx <eeppeliteloop@gmail.com>
Thu, 2 Nov 2017 17:14:54 +0000 (13:14 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Thu, 2 Nov 2017 19:56:35 +0000 (15:56 -0400)
This patch adds an optional `filter_component_specs` parameter to
TraceCollectionNotificationIterator's constructor so that the trace
collection notification iterator supports a chain of custom filter
components. The filter chain is connected to the implicit muxer's output
port or to the implicit trimmer's output port if it exists.

This is useful to add debugging information, for example:

    src = bt2.ComponentSpec('ctf', 'fs', trace_path)
    flt = bt2.ComponentSpec('lttng-utils', 'debug-info')
    it = bt2.TraceCollectionNotificationIterator(src, flt)

    for notif in it:
        ...

Signed-off-by: Philippe Proulx <eeppeliteloop@gmail.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
bindings/python/babeltrace/babeltrace/reader_trace_collection.py
bindings/python/babeltrace/babeltrace/reader_trace_handle.py
bindings/python/bt2/bt2/trace_collection_notification_iterator.py
tests/bindings/python/bt2/test_trace_collection_notification_iterator.py

index d888f8d811ffb7c4e80b0bc2b1fbae82d0fd8619..0ca484d70082b2c5dc16f7f935bbc8538424689d 100644 (file)
@@ -166,7 +166,7 @@ class TraceCollection:
         return self._gen_events(timestamp_begin / 1e9, timestamp_end / 1e9)
 
     def _gen_events(self, begin_s=None, end_s=None):
-        specs = [bt2.SourceComponentSpec('ctf', 'fs', th.path) for th in self._trace_handles]
+        specs = [bt2.ComponentSpec('ctf', 'fs', th.path) for th in self._trace_handles]
 
         try:
             iter_cls = bt2.TraceCollectionNotificationIterator
index 07c36bb22cce61a72fb8b7f52350b692f3dea06f..4d6823008ac4fa625a2dbd6d8e8ee05b1fcbb047 100644 (file)
@@ -113,7 +113,7 @@ class TraceHandle:
 
     def _get_event_declarations(self):
         notif_iter = bt2.TraceCollectionNotificationIterator([
-            bt2.SourceComponentSpec('ctf', 'fs', self._path)
+            bt2.ComponentSpec('ctf', 'fs', self._path)
         ])
 
         # raises if the trace contains no streams
index d7f1ae5eb4a6d342ae25e81eda0388cfeb122b83..a23436736d687c6796fae3c08b95a205148487ed 100644 (file)
@@ -22,6 +22,7 @@
 
 from bt2 import utils
 import bt2
+import itertools
 import bt2.notification_iterator
 import datetime
 import collections.abc
@@ -29,12 +30,11 @@ from collections import namedtuple
 import numbers
 
 
-# a pair of source component and _SourceComponentSpec
-_SourceComponentAndSpec = namedtuple('_SourceComponentAndSpec',
-                                     ['comp', 'spec'])
+# a pair of component and ComponentSpec
+_ComponentAndSpec = namedtuple('_ComponentAndSpec', ['comp', 'spec'])
 
 
-class SourceComponentSpec:
+class ComponentSpec:
     def __init__(self, plugin_name, component_class_name, params=None):
         utils._check_str(plugin_name)
         utils._check_str(component_class_name)
@@ -76,29 +76,46 @@ def _get_ns(obj):
     return int(s * 1e9)
 
 
+class _CompClsType:
+    SOURCE = 0
+    FILTER = 1
+
+
 class TraceCollectionNotificationIterator(bt2.notification_iterator._NotificationIterator):
-    def __init__(self, source_component_specs, notification_types=None,
-                 stream_intersection_mode=False, begin=None,
-                 end=None):
+    def __init__(self, source_component_specs, filter_component_specs=None,
+                 notification_types=None, stream_intersection_mode=False,
+                 begin=None, end=None):
         utils._check_bool(stream_intersection_mode)
         self._stream_intersection_mode = stream_intersection_mode
         self._begin_ns = _get_ns(begin)
         self._end_ns = _get_ns(end)
         self._notification_types = notification_types
+
+        if type(source_component_specs) is ComponentSpec:
+            source_component_specs = [source_component_specs]
+
+        if type(filter_component_specs) is ComponentSpec:
+            filter_component_specs = [filter_component_specs]
+        elif filter_component_specs is None:
+            filter_component_specs = []
+
         self._src_comp_specs = source_component_specs
+        self._flt_comp_specs = filter_component_specs
         self._next_suffix = 1
         self._connect_ports = False
 
-        # set of _SourceComponentAndSpec
+        # lists of _ComponentAndSpec
         self._src_comps_and_specs = []
+        self._flt_comps_and_specs = []
 
-        self._validate_source_component_specs()
+        self._validate_component_specs(source_component_specs)
+        self._validate_component_specs(filter_component_specs)
         self._build_graph()
 
-    def _validate_source_component_specs(self):
-        for source_comp_spec in self._src_comp_specs:
-            if type(source_comp_spec) is not SourceComponentSpec:
-                raise TypeError('"{}" object is not a SourceComponentSpec'.format(type(source_comp_spec)))
+    def _validate_component_specs(self, comp_specs):
+        for comp_spec in comp_specs:
+            if type(comp_spec) is not ComponentSpec:
+                raise TypeError('"{}" object is not a ComponentSpec'.format(type(comp_spec)))
 
     def __next__(self):
         return next(self._notif_iter)
@@ -176,28 +193,37 @@ class TraceCollectionNotificationIterator(bt2.notification_iterator._Notificatio
         comp_cls = plugin.filter_component_classes['trimmer']
         return self._graph.add_component(comp_cls, name, params)
 
-    def _get_unique_src_comp_name(self, comp_spec):
+    def _get_unique_comp_name(self, comp_spec):
         name = '{}-{}'.format(comp_spec.plugin_name,
                               comp_spec.component_class_name)
+        comps_and_specs = itertools.chain(self._src_comps_and_specs,
+                                          self._flt_comps_and_specs)
 
-        if name in [comp_and_spec.comp.name for comp_and_spec in self._src_comps_and_specs]:
+        if name in [comp_and_spec.comp.name for comp_and_spec in comps_and_specs]:
             name += '-{}'.format(self._next_suffix)
             self._next_suffix += 1
 
         return name
 
-    def _create_src_comp(self, comp_spec):
+    def _create_comp(self, comp_spec, comp_cls_type):
         plugin = bt2.find_plugin(comp_spec.plugin_name)
 
         if plugin is None:
             raise bt2.Error('no such plugin: {}'.format(comp_spec.plugin_name))
 
-        if comp_spec.component_class_name not in plugin.source_component_classes:
-            raise bt2.Error('no such source component class in "{}" plugin: {}'.format(comp_spec.plugin_name,
-                                                                                       comp_spec.component_class_name))
+        if comp_cls_type == _CompClsType.SOURCE:
+            comp_classes = plugin.source_component_classes
+        else:
+            comp_classes = plugin.filter_component_classes
+
+        if comp_spec.component_class_name not in comp_classes:
+            cc_type = 'source' if comp_cls_type == _CompClsType.SOURCE else 'filter'
+            raise bt2.Error('no such {} component class in "{}" plugin: {}'.format(cc_type,
+                                                                                   comp_spec.plugin_name,
+                                                                                   comp_spec.component_class_name))
 
-        comp_cls = plugin.source_component_classes[comp_spec.component_class_name]
-        name = self._get_unique_src_comp_name(comp_spec)
+        comp_cls = comp_classes[comp_spec.component_class_name]
+        name = self._get_unique_comp_name(comp_spec)
         comp = self._graph.add_component(comp_cls, name, comp_spec.params)
         return comp
 
@@ -252,6 +278,18 @@ class TraceCollectionNotificationIterator(bt2.notification_iterator._Notificatio
         else:
             notif_iter_port = self._muxer_comp.output_ports['out']
 
+        # create extra filter components (chained)
+        for comp_spec in self._flt_comp_specs:
+            comp = self._create_comp(comp_spec, _CompClsType.FILTER)
+            self._flt_comps_and_specs.append(_ComponentAndSpec(comp, comp_spec))
+
+        # connect the extra filter chain
+        for comp_and_spec in self._flt_comps_and_specs:
+            in_port = list(comp_and_spec.comp.input_ports.values())[0]
+            out_port = list(comp_and_spec.comp.output_ports.values())[0]
+            self._graph.connect_ports(notif_iter_port, in_port)
+            notif_iter_port = out_port
+
         # Here we create the components, self._graph_port_added() is
         # called when they add ports, but the callback returns early
         # because self._connect_ports is False. This is because the
@@ -260,8 +298,8 @@ class TraceCollectionNotificationIterator(bt2.notification_iterator._Notificatio
         # it does not exist yet (it needs the created component to
         # exist).
         for comp_spec in self._src_comp_specs:
-            comp = self._create_src_comp(comp_spec)
-            self._src_comps_and_specs.append(_SourceComponentAndSpec(comp, comp_spec))
+            comp = self._create_comp(comp_spec, _CompClsType.SOURCE)
+            self._src_comps_and_specs.append(_ComponentAndSpec(comp, comp_spec))
 
         # Now we connect the ports which exist at this point. We allow
         # self._graph_port_added() to automatically connect _new_ ports.
index 459607ea451821ccd34b6d2da50f8342810fcd07..6dd774aeb5e5b812b8889291574ccb47cf37a074 100644 (file)
@@ -13,115 +13,130 @@ _3EVENTS_INTERSECT_TRACE_PATH = os.path.join(_TEST_CTF_TRACES_PATH,
                                              '3eventsintersect')
 
 
-class SourceComponentSpecTestCase(unittest.TestCase):
+class ComponentSpecTestCase(unittest.TestCase):
     def test_create_good_no_params(self):
-        spec = bt2.SourceComponentSpec('plugin', 'compcls')
+        spec = bt2.ComponentSpec('plugin', 'compcls')
 
     def test_create_good_with_params(self):
-        spec = bt2.SourceComponentSpec('plugin', 'compcls', {'salut': 23})
+        spec = bt2.ComponentSpec('plugin', 'compcls', {'salut': 23})
 
     def test_create_good_with_path_params(self):
-        spec = bt2.SourceComponentSpec('plugin', 'compcls', 'a path')
+        spec = bt2.ComponentSpec('plugin', 'compcls', 'a path')
         self.assertEqual(spec.params['path'], 'a path')
 
     def test_create_wrong_plugin_name_type(self):
         with self.assertRaises(TypeError):
-            spec = bt2.SourceComponentSpec(23, 'compcls')
+            spec = bt2.ComponentSpec(23, 'compcls')
 
     def test_create_wrong_component_class_name_type(self):
         with self.assertRaises(TypeError):
-            spec = bt2.SourceComponentSpec('plugin', 190)
+            spec = bt2.ComponentSpec('plugin', 190)
 
     def test_create_wrong_params_type(self):
         with self.assertRaises(TypeError):
-            spec = bt2.SourceComponentSpec('dwdw', 'compcls', datetime.datetime.now())
+            spec = bt2.ComponentSpec('dwdw', 'compcls', datetime.datetime.now())
 
 
 class TraceCollectionNotificationIteratorTestCase(unittest.TestCase):
     def test_create_wrong_stream_intersection_mode_type(self):
-        specs = [bt2.SourceComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
+        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
 
         with self.assertRaises(TypeError):
             notif_iter = bt2.TraceCollectionNotificationIterator(specs, stream_intersection_mode=23)
 
     def test_create_wrong_begin_type(self):
-        specs = [bt2.SourceComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
+        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
 
         with self.assertRaises(TypeError):
             notif_iter = bt2.TraceCollectionNotificationIterator(specs, begin='hi')
 
     def test_create_wrong_end_type(self):
-        specs = [bt2.SourceComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
+        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
 
         with self.assertRaises(TypeError):
             notif_iter = bt2.TraceCollectionNotificationIterator(specs, begin='lel')
 
     def test_create_no_such_plugin(self):
-        specs = [bt2.SourceComponentSpec('77', '101', _3EVENTS_INTERSECT_TRACE_PATH)]
+        specs = [bt2.ComponentSpec('77', '101', _3EVENTS_INTERSECT_TRACE_PATH)]
 
         with self.assertRaises(bt2.Error):
             notif_iter = bt2.TraceCollectionNotificationIterator(specs)
 
     def test_create_begin_s(self):
-        specs = [bt2.SourceComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
+        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
         notif_iter = bt2.TraceCollectionNotificationIterator(specs, begin=19457.918232)
 
     def test_create_end_s(self):
-        specs = [bt2.SourceComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
+        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
         notif_iter = bt2.TraceCollectionNotificationIterator(specs, end=123.12312)
 
     def test_create_begin_datetime(self):
-        specs = [bt2.SourceComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
+        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
         notif_iter = bt2.TraceCollectionNotificationIterator(specs, begin=datetime.datetime.now())
 
     def test_create_end_datetime(self):
-        specs = [bt2.SourceComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
+        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
         notif_iter = bt2.TraceCollectionNotificationIterator(specs, end=datetime.datetime.now())
 
     def test_iter_no_intersection(self):
-        specs = [bt2.SourceComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
+        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
         notif_iter = bt2.TraceCollectionNotificationIterator(specs)
         self.assertEqual(len(list(notif_iter)), 28)
 
     def test_iter_no_intersection_subscribe(self):
-        specs = [bt2.SourceComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
+        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
         notif_iter = bt2.TraceCollectionNotificationIterator(specs,
                                                              notification_types=[bt2.EventNotification])
         self.assertEqual(len(list(notif_iter)), 8)
 
+    def test_iter_specs_not_list(self):
+        spec = bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)
+        notif_iter = bt2.TraceCollectionNotificationIterator(spec,
+                                                             notification_types=[bt2.EventNotification])
+        self.assertEqual(len(list(notif_iter)), 8)
+
+    def test_iter_custom_filter(self):
+        src_spec = bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)
+        flt_spec = bt2.ComponentSpec('utils', 'trimmer', {
+            'end': 13515309000000075,
+        })
+        notif_iter = bt2.TraceCollectionNotificationIterator(src_spec, flt_spec,
+                                                             notification_types=[bt2.EventNotification])
+        self.assertEqual(len(list(notif_iter)), 5)
+
     def test_iter_intersection(self):
-        specs = [bt2.SourceComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
+        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
         notif_iter = bt2.TraceCollectionNotificationIterator(specs, stream_intersection_mode=True)
         self.assertEqual(len(list(notif_iter)), 15)
 
     def test_iter_intersection_subscribe(self):
-        specs = [bt2.SourceComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
+        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
         notif_iter = bt2.TraceCollectionNotificationIterator(specs, stream_intersection_mode=True,
                                                              notification_types=[bt2.EventNotification])
         self.assertEqual(len(list(notif_iter)), 3)
 
     def test_iter_intersection_no_path_param(self):
-        specs = [bt2.SourceComponentSpec('text', 'dmesg', {'read-from-stdin': True})]
+        specs = [bt2.ComponentSpec('text', 'dmesg', {'read-from-stdin': True})]
 
         with self.assertRaises(bt2.Error):
             notif_iter = bt2.TraceCollectionNotificationIterator(specs, stream_intersection_mode=True,
                                                                  notification_types=[bt2.EventNotification])
 
     def test_iter_no_intersection_two_traces(self):
-        spec = bt2.SourceComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)
+        spec = bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)
         specs = [spec, spec]
         notif_iter = bt2.TraceCollectionNotificationIterator(specs)
         self.assertEqual(len(list(notif_iter)), 56)
 
     def test_iter_no_intersection_begin(self):
-        specs = [bt2.SourceComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
+        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
         notif_iter = bt2.TraceCollectionNotificationIterator(specs,
                                                              notification_types=[bt2.EventNotification],
                                                              begin=13515309.000000023)
         self.assertEqual(len(list(notif_iter)), 6)
 
     def test_iter_no_intersection_end(self):
-        specs = [bt2.SourceComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
+        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
         notif_iter = bt2.TraceCollectionNotificationIterator(specs,
                                                              notification_types=[bt2.EventNotification],
                                                              end=13515309.000000075)
This page took 0.030726 seconds and 4 git commands to generate.