X-Git-Url: http://git.efficios.com/?p=babeltrace.git;a=blobdiff_plain;f=tests%2Fbindings%2Fpython%2Fbt2%2Ftest_trace_collection_message_iterator.py;h=954b149c33645fc47c625b60a7681f1c130f08b1;hp=e30cad4332ca3b4dda88d83985480e95eaf36f0b;hb=827e42e017fc5f525aa39a3851bf2e7e50e887aa;hpb=907f2b70dce96396c5b5b0e2111539664d1a2c44 diff --git a/tests/bindings/python/bt2/test_trace_collection_message_iterator.py b/tests/bindings/python/bt2/test_trace_collection_message_iterator.py index e30cad43..954b149c 100644 --- a/tests/bindings/python/bt2/test_trace_collection_message_iterator.py +++ b/tests/bindings/python/bt2/test_trace_collection_message_iterator.py @@ -1,3 +1,8 @@ +# SPDX-License-Identifier: GPL-2.0-only +# +# Copyright (C) 2019 EfficiOS Inc. +# + import unittest import datetime import bt2 @@ -5,38 +10,162 @@ import os import os.path -_TEST_CTF_TRACES_PATH = os.environ['TEST_CTF_TRACES_PATH'] -_3EVENTS_INTERSECT_TRACE_PATH = os.path.join(_TEST_CTF_TRACES_PATH, - 'intersection', - '3eventsintersect') +_BT_TESTS_DATADIR = os.environ["BT_TESTS_DATADIR"] +_BT_CTF_TRACES_PATH = os.environ["BT_CTF_TRACES_PATH"] +_3EVENTS_INTERSECT_TRACE_PATH = os.path.join( + _BT_CTF_TRACES_PATH, "intersection", "3eventsintersect" +) +_NOINTERSECT_TRACE_PATH = os.path.join( + _BT_CTF_TRACES_PATH, "intersection", "nointersect" +) +_SEQUENCE_TRACE_PATH = os.path.join(_BT_CTF_TRACES_PATH, "succeed", "sequence") +_AUTO_SOURCE_DISCOVERY_GROUPING_PATH = os.path.join( + _BT_TESTS_DATADIR, "auto-source-discovery", "grouping" +) +_AUTO_SOURCE_DISCOVERY_PARAMS_LOG_LEVEL_PATH = os.path.join( + _BT_TESTS_DATADIR, "auto-source-discovery", "params-log-level" +) +_METADATA_SYNTAX_ERROR_TRACE_PATH = os.path.join( + _BT_CTF_TRACES_PATH, "fail", "metadata-syntax-error" +) -class ComponentSpecTestCase(unittest.TestCase): - def test_create_good_no_params(self): - bt2.ComponentSpec('plugin', 'compcls') - def test_create_good_with_params(self): - bt2.ComponentSpec('plugin', 'compcls', {'salut': 23}) +class _SomeSource( + bt2._UserSourceComponent, message_iterator_class=bt2._UserMessageIterator +): + pass - def test_create_good_with_path_params(self): - spec = bt2.ComponentSpec('plugin', 'compcls', 'a path') - self.assertEqual(spec.params['paths'], ['a path']) - def test_create_wrong_plugin_name_type(self): - with self.assertRaises(TypeError): - bt2.ComponentSpec(23, 'compcls') +class _SomeFilter( + bt2._UserFilterComponent, message_iterator_class=bt2._UserMessageIterator +): + pass - def test_create_wrong_component_class_name_type(self): - with self.assertRaises(TypeError): - bt2.ComponentSpec('plugin', 190) + +class _SomeSink(bt2._UserSinkComponent): + def _user_consume(self): + pass + + +class ComponentSpecTestCase(unittest.TestCase): + def setUp(self): + # A source CC from a plugin. + self._dmesg_cc = bt2.find_plugin("text").source_component_classes["dmesg"] + assert self._dmesg_cc is not None + + # A filter CC from a plugin. + self._muxer_cc = bt2.find_plugin("utils").filter_component_classes["muxer"] + assert self._muxer_cc is not None + + # A sink CC from a plugin. + self._pretty_cc = bt2.find_plugin("text").sink_component_classes["pretty"] + assert self._pretty_cc is not None + + def test_create_source_from_name(self): + spec = bt2.ComponentSpec.from_named_plugin_and_component_class("text", "dmesg") + self.assertEqual(spec.component_class.name, "dmesg") + + def test_create_source_from_plugin(self): + spec = bt2.ComponentSpec(self._dmesg_cc) + self.assertEqual(spec.component_class.name, "dmesg") + + def test_create_source_from_user(self): + spec = bt2.ComponentSpec(_SomeSource) + self.assertEqual(spec.component_class.name, "_SomeSource") + + def test_create_filter_from_name(self): + spec = bt2.ComponentSpec.from_named_plugin_and_component_class("utils", "muxer") + self.assertEqual(spec.component_class.name, "muxer") + + def test_create_filter_from_object(self): + spec = bt2.ComponentSpec(self._muxer_cc) + self.assertEqual(spec.component_class.name, "muxer") + + def test_create_sink_from_name(self): + with self.assertRaisesRegex( + KeyError, + "source or filter component class `pretty` not found in plugin `text`", + ): + bt2.ComponentSpec.from_named_plugin_and_component_class("text", "pretty") + + def test_create_sink_from_object(self): + with self.assertRaisesRegex( + TypeError, + "'_SinkComponentClassConst' is not a source or filter component class", + ): + bt2.ComponentSpec(self._pretty_cc) + + def test_create_from_object_with_params(self): + spec = bt2.ComponentSpec(self._dmesg_cc, {"salut": 23}) + self.assertEqual(spec.params["salut"], 23) + + def test_create_from_name_with_params(self): + spec = bt2.ComponentSpec.from_named_plugin_and_component_class( + "text", "dmesg", {"salut": 23} + ) + self.assertEqual(spec.params["salut"], 23) + + def test_create_from_object_with_path_params(self): + spec = spec = bt2.ComponentSpec(self._dmesg_cc, "a path") + self.assertEqual(spec.params["inputs"], ["a path"]) + + def test_create_from_name_with_path_params(self): + spec = spec = bt2.ComponentSpec.from_named_plugin_and_component_class( + "text", "dmesg", "a path" + ) + self.assertEqual(spec.params["inputs"], ["a path"]) + + def test_create_wrong_comp_class_type(self): + with self.assertRaisesRegex( + TypeError, "'int' is not a source or filter component class" + ): + bt2.ComponentSpec(18) + + def test_create_from_name_wrong_plugin_name_type(self): + with self.assertRaisesRegex(TypeError, "'int' is not a 'str' object"): + bt2.ComponentSpec.from_named_plugin_and_component_class(23, "compcls") + + def test_create_from_name_non_existent_plugin(self): + with self.assertRaisesRegex( + ValueError, "no such plugin: this_plugin_does_not_exist" + ): + bt2.ComponentSpec.from_named_plugin_and_component_class( + "this_plugin_does_not_exist", "compcls" + ) + + def test_create_from_name_wrong_component_class_name_type(self): + with self.assertRaisesRegex(TypeError, "'int' is not a 'str' object"): + bt2.ComponentSpec.from_named_plugin_and_component_class("utils", 190) def test_create_wrong_params_type(self): - with self.assertRaises(TypeError): - bt2.ComponentSpec('dwdw', 'compcls', datetime.datetime.now()) + with self.assertRaisesRegex( + TypeError, "cannot create value object from 'datetime' object" + ): + bt2.ComponentSpec(self._dmesg_cc, params=datetime.datetime.now()) + + def test_create_from_name_wrong_params_type(self): + with self.assertRaisesRegex( + TypeError, "cannot create value object from 'datetime' object" + ): + bt2.ComponentSpec.from_named_plugin_and_component_class( + "text", "dmesg", datetime.datetime.now() + ) + + def test_create_wrong_log_level_type(self): + with self.assertRaisesRegex(TypeError, "'str' is not an 'int' object"): + bt2.ComponentSpec(self._dmesg_cc, logging_level="banane") + + def test_create_from_name_wrong_log_level_type(self): + with self.assertRaisesRegex(TypeError, "'str' is not an 'int' object"): + bt2.ComponentSpec.from_named_plugin_and_component_class( + "text", "dmesg", logging_level="banane" + ) # Return a map, msg type -> number of messages of this type. + def _count_msgs_by_type(msgs): res = {} @@ -50,102 +179,549 @@ def _count_msgs_by_type(msgs): class TraceCollectionMessageIteratorTestCase(unittest.TestCase): def test_create_wrong_stream_intersection_mode_type(self): - specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] + specs = [ + bt2.ComponentSpec.from_named_plugin_and_component_class( + "ctf", "fs", _3EVENTS_INTERSECT_TRACE_PATH + ) + ] with self.assertRaises(TypeError): bt2.TraceCollectionMessageIterator(specs, stream_intersection_mode=23) def test_create_wrong_begin_type(self): - specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] + specs = [ + bt2.ComponentSpec.from_named_plugin_and_component_class( + "ctf", "fs", _3EVENTS_INTERSECT_TRACE_PATH + ) + ] with self.assertRaises(TypeError): - bt2.TraceCollectionMessageIterator(specs, begin='hi') + bt2.TraceCollectionMessageIterator(specs, begin="hi") def test_create_wrong_end_type(self): - specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] + specs = [ + bt2.ComponentSpec.from_named_plugin_and_component_class( + "ctf", "fs", _3EVENTS_INTERSECT_TRACE_PATH + ) + ] with self.assertRaises(TypeError): - bt2.TraceCollectionMessageIterator(specs, begin='lel') - - def test_create_no_such_plugin(self): - specs = [bt2.ComponentSpec('77', '101', _3EVENTS_INTERSECT_TRACE_PATH)] - - with self.assertRaises(bt2.Error): - bt2.TraceCollectionMessageIterator(specs) + bt2.TraceCollectionMessageIterator(specs, begin="lel") def test_create_begin_s(self): - specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] + specs = [ + bt2.ComponentSpec.from_named_plugin_and_component_class( + "ctf", "fs", _3EVENTS_INTERSECT_TRACE_PATH + ) + ] bt2.TraceCollectionMessageIterator(specs, begin=19457.918232) def test_create_end_s(self): - specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] + specs = [ + bt2.ComponentSpec.from_named_plugin_and_component_class( + "ctf", "fs", _3EVENTS_INTERSECT_TRACE_PATH + ) + ] bt2.TraceCollectionMessageIterator(specs, end=123.12312) def test_create_begin_datetime(self): - specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] + specs = [ + bt2.ComponentSpec.from_named_plugin_and_component_class( + "ctf", "fs", _3EVENTS_INTERSECT_TRACE_PATH + ) + ] bt2.TraceCollectionMessageIterator(specs, begin=datetime.datetime.now()) def test_create_end_datetime(self): - specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] + specs = [ + bt2.ComponentSpec.from_named_plugin_and_component_class( + "ctf", "fs", _3EVENTS_INTERSECT_TRACE_PATH + ) + ] bt2.TraceCollectionMessageIterator(specs, end=datetime.datetime.now()) def test_iter_no_intersection(self): - specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] + specs = [ + bt2.ComponentSpec.from_named_plugin_and_component_class( + "ctf", "fs", _3EVENTS_INTERSECT_TRACE_PATH + ) + ] msg_iter = bt2.TraceCollectionMessageIterator(specs) msgs = list(msg_iter) - self.assertEqual(len(msgs), 32) + self.assertEqual(len(msgs), 28) hist = _count_msgs_by_type(msgs) - self.assertEqual(hist[bt2.message._EventMessage], 8) + self.assertEqual(hist[bt2._EventMessageConst], 8) # Same as the above, but we pass a single spec instead of a spec list. def test_iter_specs_not_list(self): - spec = bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH) + spec = bt2.ComponentSpec.from_named_plugin_and_component_class( + "ctf", "fs", _3EVENTS_INTERSECT_TRACE_PATH + ) msg_iter = bt2.TraceCollectionMessageIterator(spec) msgs = list(msg_iter) - self.assertEqual(len(msgs), 32) + self.assertEqual(len(msgs), 28) hist = _count_msgs_by_type(msgs) - self.assertEqual(hist[bt2.message._EventMessage], 8) + self.assertEqual(hist[bt2._EventMessageConst], 8) def test_iter_custom_filter(self): - src_spec = bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH) - flt_spec = bt2.ComponentSpec('utils', 'trimmer', { - 'end': '13515309.000000075', - }) + src_spec = bt2.ComponentSpec.from_named_plugin_and_component_class( + "ctf", "fs", _3EVENTS_INTERSECT_TRACE_PATH + ) + flt_spec = bt2.ComponentSpec.from_named_plugin_and_component_class( + "utils", "trimmer", {"end": "13515309.000000075"} + ) msg_iter = bt2.TraceCollectionMessageIterator(src_spec, flt_spec) hist = _count_msgs_by_type(msg_iter) - self.assertEqual(hist[bt2.message._EventMessage], 5) + self.assertEqual(hist[bt2._EventMessageConst], 5) def test_iter_intersection(self): - specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] - msg_iter = bt2.TraceCollectionMessageIterator(specs, stream_intersection_mode=True) + specs = [ + bt2.ComponentSpec.from_named_plugin_and_component_class( + "ctf", "fs", _3EVENTS_INTERSECT_TRACE_PATH + ) + ] + msg_iter = bt2.TraceCollectionMessageIterator( + specs, stream_intersection_mode=True + ) msgs = list(msg_iter) - self.assertEqual(len(msgs), 19) + self.assertEqual(len(msgs), 15) hist = _count_msgs_by_type(msgs) - self.assertEqual(hist[bt2.message._EventMessage], 3) - - def test_iter_intersection_no_path_param(self): - specs = [bt2.ComponentSpec('text', 'dmesg', {'read-from-stdin': True})] - - with self.assertRaises(bt2.Error): - bt2.TraceCollectionMessageIterator(specs, stream_intersection_mode=True) + self.assertEqual(hist[bt2._EventMessageConst], 3) + + def test_iter_intersection_params(self): + # Check that all params used to create the source component are passed + # to the `babeltrace.trace-infos` query. + specs = [ + bt2.ComponentSpec.from_named_plugin_and_component_class( + "ctf", + "fs", + { + "inputs": [_3EVENTS_INTERSECT_TRACE_PATH], + "clock-class-offset-s": 1000, + }, + ) + ] + + msg_iter = bt2.TraceCollectionMessageIterator( + specs, stream_intersection_mode=True + ) + + event_msgs = [x for x in msg_iter if type(x) is bt2._EventMessageConst] + self.assertEqual(len(event_msgs), 3) + self.assertEqual( + event_msgs[0].default_clock_snapshot.ns_from_origin, 13516309000000071 + ) + self.assertEqual( + event_msgs[1].default_clock_snapshot.ns_from_origin, 13516309000000072 + ) + self.assertEqual( + event_msgs[2].default_clock_snapshot.ns_from_origin, 13516309000000082 + ) def test_iter_no_intersection_two_traces(self): - spec = bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH) + spec = bt2.ComponentSpec.from_named_plugin_and_component_class( + "ctf", "fs", _3EVENTS_INTERSECT_TRACE_PATH + ) specs = [spec, spec] msg_iter = bt2.TraceCollectionMessageIterator(specs) msgs = list(msg_iter) - self.assertEqual(len(msgs), 64) + self.assertEqual(len(msgs), 56) hist = _count_msgs_by_type(msgs) - self.assertEqual(hist[bt2.message._EventMessage], 16) + self.assertEqual(hist[bt2._EventMessageConst], 16) def test_iter_no_intersection_begin(self): - specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] + specs = [ + bt2.ComponentSpec.from_named_plugin_and_component_class( + "ctf", "fs", _3EVENTS_INTERSECT_TRACE_PATH + ) + ] msg_iter = bt2.TraceCollectionMessageIterator(specs, begin=13515309.000000023) hist = _count_msgs_by_type(msg_iter) - self.assertEqual(hist[bt2.message._EventMessage], 6) + self.assertEqual(hist[bt2._EventMessageConst], 6) def test_iter_no_intersection_end(self): - specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] + specs = [ + bt2.ComponentSpec.from_named_plugin_and_component_class( + "ctf", "fs", _3EVENTS_INTERSECT_TRACE_PATH + ) + ] msg_iter = bt2.TraceCollectionMessageIterator(specs, end=13515309.000000075) hist = _count_msgs_by_type(msg_iter) - self.assertEqual(hist[bt2.message._EventMessage], 5) + self.assertEqual(hist[bt2._EventMessageConst], 5) + + def test_iter_auto_source_component_spec(self): + specs = [bt2.AutoSourceComponentSpec(_3EVENTS_INTERSECT_TRACE_PATH)] + msg_iter = bt2.TraceCollectionMessageIterator(specs) + msgs = list(msg_iter) + self.assertEqual(len(msgs), 28) + hist = _count_msgs_by_type(msgs) + self.assertEqual(hist[bt2._EventMessageConst], 8) + + def test_iter_auto_source_component_spec_list_of_strings(self): + msg_iter = bt2.TraceCollectionMessageIterator([_3EVENTS_INTERSECT_TRACE_PATH]) + msgs = list(msg_iter) + self.assertEqual(len(msgs), 28) + hist = _count_msgs_by_type(msgs) + self.assertEqual(hist[bt2._EventMessageConst], 8) + + def test_iter_auto_source_component_spec_string(self): + msg_iter = bt2.TraceCollectionMessageIterator(_3EVENTS_INTERSECT_TRACE_PATH) + msgs = list(msg_iter) + self.assertEqual(len(msgs), 28) + hist = _count_msgs_by_type(msgs) + self.assertEqual(hist[bt2._EventMessageConst], 8) + + def test_iter_mixed_inputs(self): + msg_iter = bt2.TraceCollectionMessageIterator( + [ + _3EVENTS_INTERSECT_TRACE_PATH, + bt2.AutoSourceComponentSpec(_SEQUENCE_TRACE_PATH), + bt2.ComponentSpec.from_named_plugin_and_component_class( + "ctf", "fs", _NOINTERSECT_TRACE_PATH + ), + ] + ) + msgs = list(msg_iter) + self.assertEqual(len(msgs), 76) + hist = _count_msgs_by_type(msgs) + self.assertEqual(hist[bt2._EventMessageConst], 24) + + def test_auto_source_component_non_existent(self): + with self.assertRaisesRegex( + RuntimeError, + "Some auto source component specs did not produce any component", + ): + # Test with one path known to contain a trace and one path known + # to not contain any trace. + bt2.TraceCollectionMessageIterator( + [_SEQUENCE_TRACE_PATH, "/this/path/better/not/exist"] + ) + + +class _TestAutoDiscoverSourceComponentSpecs(unittest.TestCase): + def setUp(self): + self._saved_babeltrace_plugin_path = os.environ["BABELTRACE_PLUGIN_PATH"] + os.environ["BABELTRACE_PLUGIN_PATH"] += os.pathsep + self._plugin_path + + def tearDown(self): + os.environ["BABELTRACE_PLUGIN_PATH"] = self._saved_babeltrace_plugin_path + + +class TestAutoDiscoverSourceComponentSpecsGrouping( + _TestAutoDiscoverSourceComponentSpecs +): + _plugin_path = _AUTO_SOURCE_DISCOVERY_GROUPING_PATH + + def test_grouping(self): + specs = [ + bt2.AutoSourceComponentSpec("ABCDE"), + bt2.AutoSourceComponentSpec(_AUTO_SOURCE_DISCOVERY_GROUPING_PATH), + ] + it = bt2.TraceCollectionMessageIterator(specs) + msgs = [x for x in it if type(x) is bt2._StreamBeginningMessageConst] + + self.assertEqual(len(msgs), 8) + + self.assertEqual(msgs[0].stream.name, "TestSourceABCDE: ABCDE") + self.assertEqual(msgs[1].stream.name, "TestSourceExt: aaa1, aaa2, aaa3") + self.assertEqual(msgs[2].stream.name, "TestSourceExt: bbb1, bbb2") + self.assertEqual(msgs[3].stream.name, "TestSourceExt: ccc1") + self.assertEqual(msgs[4].stream.name, "TestSourceExt: ccc2") + self.assertEqual(msgs[5].stream.name, "TestSourceExt: ccc3") + self.assertEqual(msgs[6].stream.name, "TestSourceExt: ccc4") + self.assertEqual(msgs[7].stream.name, "TestSourceSomeDir: some-dir") + + +class TestAutoDiscoverSourceComponentSpecsParamsObjLogLevel( + _TestAutoDiscoverSourceComponentSpecs +): + _plugin_path = _AUTO_SOURCE_DISCOVERY_PARAMS_LOG_LEVEL_PATH + + _dir_a = os.path.join(_AUTO_SOURCE_DISCOVERY_PARAMS_LOG_LEVEL_PATH, "dir-a") + _dir_b = os.path.join(_AUTO_SOURCE_DISCOVERY_PARAMS_LOG_LEVEL_PATH, "dir-b") + _dir_ab = os.path.join(_AUTO_SOURCE_DISCOVERY_PARAMS_LOG_LEVEL_PATH, "dir-ab") + + def _test_two_comps_from_one_spec(self, params, obj=None, logging_level=None): + specs = [ + bt2.AutoSourceComponentSpec( + self._dir_ab, params=params, obj=obj, logging_level=logging_level + ) + ] + it = bt2.TraceCollectionMessageIterator(specs) + msgs = [x for x in it if type(x) is bt2._StreamBeginningMessageConst] + + self.assertEqual(len(msgs), 2) + + return msgs + + def test_params_two_comps_from_one_spec(self): + msgs = self._test_two_comps_from_one_spec( + params={"test-allo": "madame", "what": "test-params"} + ) + + self.assertEqual(msgs[0].stream.name, "TestSourceA: ('test-allo', 'madame')") + self.assertEqual(msgs[1].stream.name, "TestSourceB: ('test-allo', 'madame')") + + def test_obj_two_comps_from_one_spec(self): + msgs = self._test_two_comps_from_one_spec( + params={"what": "python-obj"}, obj="deore" + ) + + self.assertEqual(msgs[0].stream.name, "TestSourceA: deore") + self.assertEqual(msgs[1].stream.name, "TestSourceB: deore") + + def test_log_level_two_comps_from_one_spec(self): + msgs = self._test_two_comps_from_one_spec( + params={"what": "log-level"}, logging_level=bt2.LoggingLevel.DEBUG + ) + + self.assertEqual( + msgs[0].stream.name, "TestSourceA: {}".format(bt2.LoggingLevel.DEBUG) + ) + self.assertEqual( + msgs[1].stream.name, "TestSourceB: {}".format(bt2.LoggingLevel.DEBUG) + ) + + def _test_two_comps_from_two_specs( + self, + params_a=None, + params_b=None, + obj_a=None, + obj_b=None, + logging_level_a=None, + logging_level_b=None, + ): + specs = [ + bt2.AutoSourceComponentSpec( + self._dir_a, params=params_a, obj=obj_a, logging_level=logging_level_a + ), + bt2.AutoSourceComponentSpec( + self._dir_b, params=params_b, obj=obj_b, logging_level=logging_level_b + ), + ] + it = bt2.TraceCollectionMessageIterator(specs) + msgs = [x for x in it if type(x) is bt2._StreamBeginningMessageConst] + + self.assertEqual(len(msgs), 2) + + return msgs + + def test_params_two_comps_from_two_specs(self): + msgs = self._test_two_comps_from_two_specs( + params_a={"test-allo": "madame", "what": "test-params"}, + params_b={"test-bonjour": "monsieur", "what": "test-params"}, + ) + + self.assertEqual(msgs[0].stream.name, "TestSourceA: ('test-allo', 'madame')") + self.assertEqual( + msgs[1].stream.name, "TestSourceB: ('test-bonjour', 'monsieur')" + ) + + def test_obj_two_comps_from_two_specs(self): + msgs = self._test_two_comps_from_two_specs( + params_a={"what": "python-obj"}, + params_b={"what": "python-obj"}, + obj_a="deore", + obj_b="alivio", + ) + + self.assertEqual(msgs[0].stream.name, "TestSourceA: deore") + self.assertEqual(msgs[1].stream.name, "TestSourceB: alivio") + + def test_log_level_two_comps_from_two_specs(self): + msgs = self._test_two_comps_from_two_specs( + params_a={"what": "log-level"}, + params_b={"what": "log-level"}, + logging_level_a=bt2.LoggingLevel.DEBUG, + logging_level_b=bt2.LoggingLevel.TRACE, + ) + + self.assertEqual( + msgs[0].stream.name, "TestSourceA: {}".format(bt2.LoggingLevel.DEBUG) + ) + self.assertEqual( + msgs[1].stream.name, "TestSourceB: {}".format(bt2.LoggingLevel.TRACE) + ) + + def _test_one_comp_from_one_spec_one_comp_from_both_1( + self, + params_a=None, + params_ab=None, + obj_a=None, + obj_ab=None, + logging_level_a=None, + logging_level_ab=None, + ): + specs = [ + bt2.AutoSourceComponentSpec( + self._dir_a, params=params_a, obj=obj_a, logging_level=logging_level_a + ), + bt2.AutoSourceComponentSpec( + self._dir_ab, + params=params_ab, + obj=obj_ab, + logging_level=logging_level_ab, + ), + ] + it = bt2.TraceCollectionMessageIterator(specs) + msgs = [x for x in it if type(x) is bt2._StreamBeginningMessageConst] + + self.assertEqual(len(msgs), 2) + + return msgs + + def test_params_one_comp_from_one_spec_one_comp_from_both_1(self): + msgs = self._test_one_comp_from_one_spec_one_comp_from_both_1( + params_a={"test-allo": "madame", "what": "test-params"}, + params_ab={"test-bonjour": "monsieur", "what": "test-params"}, + ) + + self.assertEqual( + msgs[0].stream.name, + "TestSourceA: ('test-allo', 'madame'), ('test-bonjour', 'monsieur')", + ) + self.assertEqual( + msgs[1].stream.name, "TestSourceB: ('test-bonjour', 'monsieur')" + ) + + def test_obj_one_comp_from_one_spec_one_comp_from_both_1(self): + msgs = self._test_one_comp_from_one_spec_one_comp_from_both_1( + params_a={"what": "python-obj"}, + params_ab={"what": "python-obj"}, + obj_a="deore", + obj_ab="alivio", + ) + + self.assertEqual(msgs[0].stream.name, "TestSourceA: alivio") + self.assertEqual(msgs[1].stream.name, "TestSourceB: alivio") + + def test_log_level_one_comp_from_one_spec_one_comp_from_both_1(self): + msgs = self._test_one_comp_from_one_spec_one_comp_from_both_1( + params_a={"what": "log-level"}, + params_ab={"what": "log-level"}, + logging_level_a=bt2.LoggingLevel.DEBUG, + logging_level_ab=bt2.LoggingLevel.TRACE, + ) + + self.assertEqual( + msgs[0].stream.name, "TestSourceA: {}".format(bt2.LoggingLevel.TRACE) + ) + self.assertEqual( + msgs[1].stream.name, "TestSourceB: {}".format(bt2.LoggingLevel.TRACE) + ) + + def _test_one_comp_from_one_spec_one_comp_from_both_2( + self, + params_ab=None, + params_a=None, + obj_ab=None, + obj_a=None, + logging_level_ab=None, + logging_level_a=None, + ): + specs = [ + bt2.AutoSourceComponentSpec( + self._dir_ab, + params=params_ab, + obj=obj_ab, + logging_level=logging_level_ab, + ), + bt2.AutoSourceComponentSpec( + self._dir_a, params=params_a, obj=obj_a, logging_level=logging_level_a + ), + ] + it = bt2.TraceCollectionMessageIterator(specs) + msgs = [x for x in it if type(x) is bt2._StreamBeginningMessageConst] + + self.assertEqual(len(msgs), 2) + + return msgs + + def test_params_one_comp_from_one_spec_one_comp_from_both_2(self): + msgs = self._test_one_comp_from_one_spec_one_comp_from_both_2( + params_ab={ + "test-bonjour": "madame", + "test-salut": "les amis", + "what": "test-params", + }, + params_a={"test-bonjour": "monsieur", "what": "test-params"}, + ) + + self.assertEqual( + msgs[0].stream.name, + "TestSourceA: ('test-bonjour', 'monsieur'), ('test-salut', 'les amis')", + ) + self.assertEqual( + msgs[1].stream.name, + "TestSourceB: ('test-bonjour', 'madame'), ('test-salut', 'les amis')", + ) + + def test_obj_one_comp_from_one_spec_one_comp_from_both_2(self): + msgs = self._test_one_comp_from_one_spec_one_comp_from_both_2( + params_ab={"what": "python-obj"}, + params_a={"what": "python-obj"}, + obj_ab="deore", + obj_a="alivio", + ) + + self.assertEqual(msgs[0].stream.name, "TestSourceA: alivio") + self.assertEqual(msgs[1].stream.name, "TestSourceB: deore") + + def test_log_level_one_comp_from_one_spec_one_comp_from_both_2(self): + msgs = self._test_one_comp_from_one_spec_one_comp_from_both_2( + params_ab={"what": "log-level"}, + params_a={"what": "log-level"}, + logging_level_ab=bt2.LoggingLevel.DEBUG, + logging_level_a=bt2.LoggingLevel.TRACE, + ) + + self.assertEqual( + msgs[0].stream.name, "TestSourceA: {}".format(bt2.LoggingLevel.TRACE) + ) + self.assertEqual( + msgs[1].stream.name, "TestSourceB: {}".format(bt2.LoggingLevel.DEBUG) + ) + + def test_obj_override_with_none(self): + specs = [ + bt2.AutoSourceComponentSpec( + self._dir_ab, params={"what": "python-obj"}, obj="deore" + ), + bt2.AutoSourceComponentSpec( + self._dir_a, params={"what": "python-obj"}, obj=None + ), + ] + it = bt2.TraceCollectionMessageIterator(specs) + msgs = [x for x in it if type(x) is bt2._StreamBeginningMessageConst] + + self.assertEqual(len(msgs), 2) + self.assertEqual(msgs[0].stream.name, "TestSourceA: None") + self.assertEqual(msgs[1].stream.name, "TestSourceB: deore") + + def test_obj_no_override_with_no_obj(self): + specs = [ + bt2.AutoSourceComponentSpec( + self._dir_ab, params={"what": "python-obj"}, obj="deore" + ), + bt2.AutoSourceComponentSpec(self._dir_a, params={"what": "python-obj"}), + ] + it = bt2.TraceCollectionMessageIterator(specs) + msgs = [x for x in it if type(x) is bt2._StreamBeginningMessageConst] + + self.assertEqual(len(msgs), 2) + self.assertEqual(msgs[0].stream.name, "TestSourceA: deore") + self.assertEqual(msgs[1].stream.name, "TestSourceB: deore") + + +class TestAutoDiscoverFailures(unittest.TestCase): + def test_metadata_syntax_error(self): + with self.assertRaisesRegex( + bt2._Error, + 'At line 3 in metadata stream: syntax error, unexpected CTF_RSBRAC: token="]"', + ): + specs = [bt2.AutoSourceComponentSpec(_METADATA_SYNTAX_ERROR_TRACE_PATH)] + bt2.TraceCollectionMessageIterator(specs) + + +if __name__ == "__main__": + unittest.main()