Move to kernel style SPDX license identifiers
[babeltrace.git] / tests / bindings / python / bt2 / test_trace_collection_message_iterator.py
index 947c37b21acd412e58ed8465801c24f501942175..f2ddb6d03394976cd477c0510c0001d739133ec9 100644 (file)
@@ -1,20 +1,7 @@
+# SPDX-License-Identifier: GPL-2.0-only
 #
 # Copyright (C) 2019 EfficiOS Inc.
 #
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; only version 2
-# of the License.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
-#
 
 import unittest
 import datetime
@@ -40,28 +27,136 @@ _AUTO_SOURCE_DISCOVERY_PARAMS_LOG_LEVEL_PATH = os.path.join(
 )
 
 
+class _SomeSource(
+    bt2._UserSourceComponent, message_iterator_class=bt2._UserMessageIterator
+):
+    pass
+
+
+class _SomeFilter(
+    bt2._UserFilterComponent, message_iterator_class=bt2._UserMessageIterator
+):
+    pass
+
+
+class _SomeSink(bt2._UserSinkComponent):
+    def _user_consume(self):
+        pass
+
+
 class ComponentSpecTestCase(unittest.TestCase):
-    def test_create_good_no_params(self):
-        bt2.ComponentSpec('plugin', 'compcls')
+    def setUp(self):
+        # A source CC from a plugin.
+        self._dmesg_cc = bt2.find_plugin('text').source_component_classes['dmesg']
+        assert self._dmesg_cc is not None
+
+        # A filter CC from a plugin.
+        self._muxer_cc = bt2.find_plugin('utils').filter_component_classes['muxer']
+        assert self._muxer_cc is not None
+
+        # A sink CC from a plugin.
+        self._pretty_cc = bt2.find_plugin('text').sink_component_classes['pretty']
+        assert self._pretty_cc is not None
+
+    def test_create_source_from_name(self):
+        spec = bt2.ComponentSpec.from_named_plugin_and_component_class('text', 'dmesg')
+        self.assertEqual(spec.component_class.name, 'dmesg')
+
+    def test_create_source_from_plugin(self):
+        spec = bt2.ComponentSpec(self._dmesg_cc)
+        self.assertEqual(spec.component_class.name, 'dmesg')
+
+    def test_create_source_from_user(self):
+        spec = bt2.ComponentSpec(_SomeSource)
+        self.assertEqual(spec.component_class.name, '_SomeSource')
+
+    def test_create_filter_from_name(self):
+        spec = bt2.ComponentSpec.from_named_plugin_and_component_class('utils', 'muxer')
+        self.assertEqual(spec.component_class.name, 'muxer')
+
+    def test_create_filter_from_object(self):
+        spec = bt2.ComponentSpec(self._muxer_cc)
+        self.assertEqual(spec.component_class.name, 'muxer')
+
+    def test_create_sink_from_name(self):
+        with self.assertRaisesRegex(
+            KeyError,
+            'source or filter component class `pretty` not found in plugin `text`',
+        ):
+            bt2.ComponentSpec.from_named_plugin_and_component_class('text', 'pretty')
+
+    def test_create_sink_from_object(self):
+        with self.assertRaisesRegex(
+            TypeError,
+            "'_SinkComponentClassConst' is not a source or filter component class",
+        ):
+            bt2.ComponentSpec(self._pretty_cc)
+
+    def test_create_from_object_with_params(self):
+        spec = bt2.ComponentSpec(self._dmesg_cc, {'salut': 23})
+        self.assertEqual(spec.params['salut'], 23)
+
+    def test_create_from_name_with_params(self):
+        spec = bt2.ComponentSpec.from_named_plugin_and_component_class(
+            'text', 'dmesg', {'salut': 23}
+        )
+        self.assertEqual(spec.params['salut'], 23)
 
-    def test_create_good_with_params(self):
-        bt2.ComponentSpec('plugin', 'compcls', {'salut': 23})
+    def test_create_from_object_with_path_params(self):
+        spec = spec = bt2.ComponentSpec(self._dmesg_cc, 'a path')
+        self.assertEqual(spec.params['inputs'], ['a path'])
 
-    def test_create_good_with_path_params(self):
-        spec = bt2.ComponentSpec('plugin', 'compcls', 'a path')
+    def test_create_from_name_with_path_params(self):
+        spec = spec = bt2.ComponentSpec.from_named_plugin_and_component_class(
+            'text', 'dmesg', 'a path'
+        )
         self.assertEqual(spec.params['inputs'], ['a path'])
 
-    def test_create_wrong_plugin_name_type(self):
-        with self.assertRaises(TypeError):
-            bt2.ComponentSpec(23, 'compcls')
+    def test_create_wrong_comp_class_type(self):
+        with self.assertRaisesRegex(
+            TypeError, "'int' is not a source or filter component class"
+        ):
+            bt2.ComponentSpec(18)
+
+    def test_create_from_name_wrong_plugin_name_type(self):
+        with self.assertRaisesRegex(TypeError, "'int' is not a 'str' object"):
+            bt2.ComponentSpec.from_named_plugin_and_component_class(23, 'compcls')
+
+    def test_create_from_name_non_existent_plugin(self):
+        with self.assertRaisesRegex(
+            ValueError, "no such plugin: this_plugin_does_not_exist"
+        ):
+            bt2.ComponentSpec.from_named_plugin_and_component_class(
+                'this_plugin_does_not_exist', 'compcls'
+            )
 
-    def test_create_wrong_component_class_name_type(self):
-        with self.assertRaises(TypeError):
-            bt2.ComponentSpec('plugin', 190)
+    def test_create_from_name_wrong_component_class_name_type(self):
+        with self.assertRaisesRegex(TypeError, "'int' is not a 'str' object"):
+            bt2.ComponentSpec.from_named_plugin_and_component_class('utils', 190)
 
     def test_create_wrong_params_type(self):
-        with self.assertRaises(TypeError):
-            bt2.ComponentSpec('dwdw', 'compcls', datetime.datetime.now())
+        with self.assertRaisesRegex(
+            TypeError, "cannot create value object from 'datetime' object"
+        ):
+            bt2.ComponentSpec(self._dmesg_cc, params=datetime.datetime.now())
+
+    def test_create_from_name_wrong_params_type(self):
+        with self.assertRaisesRegex(
+            TypeError, "cannot create value object from 'datetime' object"
+        ):
+            bt2.ComponentSpec.from_named_plugin_and_component_class(
+                'text', 'dmesg', datetime.datetime.now()
+            )
+
+    def test_create_wrong_log_level_type(self):
+        with self.assertRaisesRegex(TypeError, "'str' is not an 'int' object"):
+            bt2.ComponentSpec(self._dmesg_cc, logging_level='banane')
+
+    def test_create_from_name_wrong_log_level_type(self):
+        with self.assertRaisesRegex(TypeError, "'str' is not an 'int' object"):
+            bt2.ComponentSpec.from_named_plugin_and_component_class(
+                'text', 'dmesg', logging_level='banane'
+            )
 
 
 # Return a map, msg type -> number of messages of this type.
@@ -80,105 +175,175 @@ def _count_msgs_by_type(msgs):
 
 class TraceCollectionMessageIteratorTestCase(unittest.TestCase):
     def test_create_wrong_stream_intersection_mode_type(self):
-        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
+        specs = [
+            bt2.ComponentSpec.from_named_plugin_and_component_class(
+                'ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH
+            )
+        ]
 
         with self.assertRaises(TypeError):
             bt2.TraceCollectionMessageIterator(specs, stream_intersection_mode=23)
 
     def test_create_wrong_begin_type(self):
-        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
+        specs = [
+            bt2.ComponentSpec.from_named_plugin_and_component_class(
+                'ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH
+            )
+        ]
 
         with self.assertRaises(TypeError):
             bt2.TraceCollectionMessageIterator(specs, begin='hi')
 
     def test_create_wrong_end_type(self):
-        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
+        specs = [
+            bt2.ComponentSpec.from_named_plugin_and_component_class(
+                'ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH
+            )
+        ]
 
         with self.assertRaises(TypeError):
             bt2.TraceCollectionMessageIterator(specs, begin='lel')
 
-    def test_create_no_such_plugin(self):
-        specs = [bt2.ComponentSpec('77', '101', _3EVENTS_INTERSECT_TRACE_PATH)]
-
-        with self.assertRaises(ValueError):
-            bt2.TraceCollectionMessageIterator(specs)
-
     def test_create_begin_s(self):
-        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
+        specs = [
+            bt2.ComponentSpec.from_named_plugin_and_component_class(
+                'ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH
+            )
+        ]
         bt2.TraceCollectionMessageIterator(specs, begin=19457.918232)
 
     def test_create_end_s(self):
-        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
+        specs = [
+            bt2.ComponentSpec.from_named_plugin_and_component_class(
+                'ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH
+            )
+        ]
         bt2.TraceCollectionMessageIterator(specs, end=123.12312)
 
     def test_create_begin_datetime(self):
-        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
+        specs = [
+            bt2.ComponentSpec.from_named_plugin_and_component_class(
+                'ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH
+            )
+        ]
         bt2.TraceCollectionMessageIterator(specs, begin=datetime.datetime.now())
 
     def test_create_end_datetime(self):
-        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
+        specs = [
+            bt2.ComponentSpec.from_named_plugin_and_component_class(
+                'ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH
+            )
+        ]
         bt2.TraceCollectionMessageIterator(specs, end=datetime.datetime.now())
 
     def test_iter_no_intersection(self):
-        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
+        specs = [
+            bt2.ComponentSpec.from_named_plugin_and_component_class(
+                'ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH
+            )
+        ]
         msg_iter = bt2.TraceCollectionMessageIterator(specs)
         msgs = list(msg_iter)
         self.assertEqual(len(msgs), 28)
         hist = _count_msgs_by_type(msgs)
-        self.assertEqual(hist[bt2._EventMessage], 8)
+        self.assertEqual(hist[bt2._EventMessageConst], 8)
 
     # Same as the above, but we pass a single spec instead of a spec list.
     def test_iter_specs_not_list(self):
-        spec = bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)
+        spec = bt2.ComponentSpec.from_named_plugin_and_component_class(
+            'ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH
+        )
         msg_iter = bt2.TraceCollectionMessageIterator(spec)
         msgs = list(msg_iter)
         self.assertEqual(len(msgs), 28)
         hist = _count_msgs_by_type(msgs)
-        self.assertEqual(hist[bt2._EventMessage], 8)
+        self.assertEqual(hist[bt2._EventMessageConst], 8)
 
     def test_iter_custom_filter(self):
-        src_spec = bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)
-        flt_spec = bt2.ComponentSpec('utils', 'trimmer', {'end': '13515309.000000075'})
+        src_spec = bt2.ComponentSpec.from_named_plugin_and_component_class(
+            'ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH
+        )
+        flt_spec = bt2.ComponentSpec.from_named_plugin_and_component_class(
+            'utils', 'trimmer', {'end': '13515309.000000075'}
+        )
         msg_iter = bt2.TraceCollectionMessageIterator(src_spec, flt_spec)
         hist = _count_msgs_by_type(msg_iter)
-        self.assertEqual(hist[bt2._EventMessage], 5)
+        self.assertEqual(hist[bt2._EventMessageConst], 5)
 
     def test_iter_intersection(self):
-        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
+        specs = [
+            bt2.ComponentSpec.from_named_plugin_and_component_class(
+                'ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH
+            )
+        ]
         msg_iter = bt2.TraceCollectionMessageIterator(
             specs, stream_intersection_mode=True
         )
         msgs = list(msg_iter)
         self.assertEqual(len(msgs), 15)
         hist = _count_msgs_by_type(msgs)
-        self.assertEqual(hist[bt2._EventMessage], 3)
+        self.assertEqual(hist[bt2._EventMessageConst], 3)
+
+    def test_iter_intersection_params(self):
+        # Check that all params used to create the source component are passed
+        # to the `babeltrace.trace-infos` query.
+        specs = [
+            bt2.ComponentSpec.from_named_plugin_and_component_class(
+                'ctf',
+                'fs',
+                {
+                    'inputs': [_3EVENTS_INTERSECT_TRACE_PATH],
+                    'clock-class-offset-s': 1000,
+                },
+            )
+        ]
 
-    def test_iter_intersection_no_inputs_param(self):
-        specs = [bt2.ComponentSpec('text', 'dmesg', {'read-from-stdin': True})]
+        msg_iter = bt2.TraceCollectionMessageIterator(
+            specs, stream_intersection_mode=True
+        )
 
-        with self.assertRaises(ValueError):
-            bt2.TraceCollectionMessageIterator(specs, stream_intersection_mode=True)
+        event_msgs = [x for x in msg_iter if type(x) is bt2._EventMessageConst]
+        self.assertEqual(len(event_msgs), 3)
+        self.assertEqual(
+            event_msgs[0].default_clock_snapshot.ns_from_origin, 13516309000000071
+        )
+        self.assertEqual(
+            event_msgs[1].default_clock_snapshot.ns_from_origin, 13516309000000072
+        )
+        self.assertEqual(
+            event_msgs[2].default_clock_snapshot.ns_from_origin, 13516309000000082
+        )
 
     def test_iter_no_intersection_two_traces(self):
-        spec = bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)
+        spec = bt2.ComponentSpec.from_named_plugin_and_component_class(
+            'ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH
+        )
         specs = [spec, spec]
         msg_iter = bt2.TraceCollectionMessageIterator(specs)
         msgs = list(msg_iter)
         self.assertEqual(len(msgs), 56)
         hist = _count_msgs_by_type(msgs)
-        self.assertEqual(hist[bt2._EventMessage], 16)
+        self.assertEqual(hist[bt2._EventMessageConst], 16)
 
     def test_iter_no_intersection_begin(self):
-        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
+        specs = [
+            bt2.ComponentSpec.from_named_plugin_and_component_class(
+                'ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH
+            )
+        ]
         msg_iter = bt2.TraceCollectionMessageIterator(specs, begin=13515309.000000023)
         hist = _count_msgs_by_type(msg_iter)
-        self.assertEqual(hist[bt2._EventMessage], 6)
+        self.assertEqual(hist[bt2._EventMessageConst], 6)
 
     def test_iter_no_intersection_end(self):
-        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
+        specs = [
+            bt2.ComponentSpec.from_named_plugin_and_component_class(
+                'ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH
+            )
+        ]
         msg_iter = bt2.TraceCollectionMessageIterator(specs, end=13515309.000000075)
         hist = _count_msgs_by_type(msg_iter)
-        self.assertEqual(hist[bt2._EventMessage], 5)
+        self.assertEqual(hist[bt2._EventMessageConst], 5)
 
     def test_iter_auto_source_component_spec(self):
         specs = [bt2.AutoSourceComponentSpec(_3EVENTS_INTERSECT_TRACE_PATH)]
@@ -186,40 +351,53 @@ class TraceCollectionMessageIteratorTestCase(unittest.TestCase):
         msgs = list(msg_iter)
         self.assertEqual(len(msgs), 28)
         hist = _count_msgs_by_type(msgs)
-        self.assertEqual(hist[bt2._EventMessage], 8)
+        self.assertEqual(hist[bt2._EventMessageConst], 8)
 
     def test_iter_auto_source_component_spec_list_of_strings(self):
         msg_iter = bt2.TraceCollectionMessageIterator([_3EVENTS_INTERSECT_TRACE_PATH])
         msgs = list(msg_iter)
         self.assertEqual(len(msgs), 28)
         hist = _count_msgs_by_type(msgs)
-        self.assertEqual(hist[bt2._EventMessage], 8)
+        self.assertEqual(hist[bt2._EventMessageConst], 8)
 
     def test_iter_auto_source_component_spec_string(self):
         msg_iter = bt2.TraceCollectionMessageIterator(_3EVENTS_INTERSECT_TRACE_PATH)
         msgs = list(msg_iter)
         self.assertEqual(len(msgs), 28)
         hist = _count_msgs_by_type(msgs)
-        self.assertEqual(hist[bt2._EventMessage], 8)
+        self.assertEqual(hist[bt2._EventMessageConst], 8)
 
     def test_iter_mixed_inputs(self):
         msg_iter = bt2.TraceCollectionMessageIterator(
             [
                 _3EVENTS_INTERSECT_TRACE_PATH,
                 bt2.AutoSourceComponentSpec(_SEQUENCE_TRACE_PATH),
-                bt2.ComponentSpec('ctf', 'fs', _NOINTERSECT_TRACE_PATH),
+                bt2.ComponentSpec.from_named_plugin_and_component_class(
+                    'ctf', 'fs', _NOINTERSECT_TRACE_PATH
+                ),
             ]
         )
         msgs = list(msg_iter)
         self.assertEqual(len(msgs), 76)
         hist = _count_msgs_by_type(msgs)
-        self.assertEqual(hist[bt2._EventMessage], 24)
+        self.assertEqual(hist[bt2._EventMessageConst], 24)
+
+    def test_auto_source_component_non_existent(self):
+        with self.assertRaisesRegex(
+            RuntimeError,
+            'Some auto source component specs did not produce any component',
+        ):
+            # Test with one path known to contain a trace and one path known
+            # to not contain any trace.
+            bt2.TraceCollectionMessageIterator(
+                [_SEQUENCE_TRACE_PATH, '/this/path/better/not/exist']
+            )
 
 
 class _TestAutoDiscoverSourceComponentSpecs(unittest.TestCase):
     def setUp(self):
         self._saved_babeltrace_plugin_path = os.environ['BABELTRACE_PLUGIN_PATH']
-        os.environ['BABELTRACE_PLUGIN_PATH'] += ':' + self._plugin_path
+        os.environ['BABELTRACE_PLUGIN_PATH'] += os.pathsep + self._plugin_path
 
     def tearDown(self):
         os.environ['BABELTRACE_PLUGIN_PATH'] = self._saved_babeltrace_plugin_path
@@ -234,10 +412,9 @@ class TestAutoDiscoverSourceComponentSpecsGrouping(
         specs = [
             bt2.AutoSourceComponentSpec('ABCDE'),
             bt2.AutoSourceComponentSpec(_AUTO_SOURCE_DISCOVERY_GROUPING_PATH),
-            bt2.AutoSourceComponentSpec('does-not-exist'),
         ]
         it = bt2.TraceCollectionMessageIterator(specs)
-        msgs = [x for x in it if type(x) is bt2._StreamBeginningMessage]
+        msgs = [x for x in it if type(x) is bt2._StreamBeginningMessageConst]
 
         self.assertEqual(len(msgs), 8)
 
@@ -267,7 +444,7 @@ class TestAutoDiscoverSourceComponentSpecsParamsObjLogLevel(
             )
         ]
         it = bt2.TraceCollectionMessageIterator(specs)
-        msgs = [x for x in it if type(x) is bt2._StreamBeginningMessage]
+        msgs = [x for x in it if type(x) is bt2._StreamBeginningMessageConst]
 
         self.assertEqual(len(msgs), 2)
 
@@ -319,7 +496,7 @@ class TestAutoDiscoverSourceComponentSpecsParamsObjLogLevel(
             ),
         ]
         it = bt2.TraceCollectionMessageIterator(specs)
-        msgs = [x for x in it if type(x) is bt2._StreamBeginningMessage]
+        msgs = [x for x in it if type(x) is bt2._StreamBeginningMessageConst]
 
         self.assertEqual(len(msgs), 2)
 
@@ -383,7 +560,7 @@ class TestAutoDiscoverSourceComponentSpecsParamsObjLogLevel(
             ),
         ]
         it = bt2.TraceCollectionMessageIterator(specs)
-        msgs = [x for x in it if type(x) is bt2._StreamBeginningMessage]
+        msgs = [x for x in it if type(x) is bt2._StreamBeginningMessageConst]
 
         self.assertEqual(len(msgs), 2)
 
@@ -450,7 +627,7 @@ class TestAutoDiscoverSourceComponentSpecsParamsObjLogLevel(
             ),
         ]
         it = bt2.TraceCollectionMessageIterator(specs)
-        msgs = [x for x in it if type(x) is bt2._StreamBeginningMessage]
+        msgs = [x for x in it if type(x) is bt2._StreamBeginningMessageConst]
 
         self.assertEqual(len(msgs), 2)
 
@@ -511,7 +688,7 @@ class TestAutoDiscoverSourceComponentSpecsParamsObjLogLevel(
             ),
         ]
         it = bt2.TraceCollectionMessageIterator(specs)
-        msgs = [x for x in it if type(x) is bt2._StreamBeginningMessage]
+        msgs = [x for x in it if type(x) is bt2._StreamBeginningMessageConst]
 
         self.assertEqual(len(msgs), 2)
         self.assertEqual(msgs[0].stream.name, "TestSourceA: None")
@@ -525,7 +702,7 @@ class TestAutoDiscoverSourceComponentSpecsParamsObjLogLevel(
             bt2.AutoSourceComponentSpec(self._dir_a, params={'what': 'python-obj'}),
         ]
         it = bt2.TraceCollectionMessageIterator(specs)
-        msgs = [x for x in it if type(x) is bt2._StreamBeginningMessage]
+        msgs = [x for x in it if type(x) is bt2._StreamBeginningMessageConst]
 
         self.assertEqual(len(msgs), 2)
         self.assertEqual(msgs[0].stream.name, "TestSourceA: deore")
This page took 0.028712 seconds and 4 git commands to generate.