2 # Copyright (C) 2019 EfficiOS Inc.
4 # This program is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU General Public License
6 # as published by the Free Software Foundation; only version 2
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program; if not, write to the Free Software
16 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
# Root directories of the test data; both environment variables must be
# set before this module is imported.
_BT_TESTS_DATADIR = os.environ['BT_TESTS_DATADIR']
_BT_CTF_TRACES_PATH = os.environ['BT_CTF_TRACES_PATH']

# CTF traces used by the message iterator tests.
_3EVENTS_INTERSECT_TRACE_PATH = os.path.join(
    _BT_CTF_TRACES_PATH, 'intersection', '3eventsintersect'
)
_NOINTERSECT_TRACE_PATH = os.path.join(
    _BT_CTF_TRACES_PATH, 'intersection', 'nointersect'
)
_SEQUENCE_TRACE_PATH = os.path.join(_BT_CTF_TRACES_PATH, 'succeed', 'sequence')

# Data directories used by the auto source discovery tests.
_AUTO_SOURCE_DISCOVERY_GROUPING_PATH = os.path.join(
    _BT_TESTS_DATADIR, 'auto-source-discovery', 'grouping'
)
_AUTO_SOURCE_DISCOVERY_PARAMS_LOG_LEVEL_PATH = os.path.join(
    _BT_TESTS_DATADIR, 'auto-source-discovery', 'params-log-level'
)
class ComponentSpecTestCase(unittest.TestCase):
    # Construction checks for bt2.ComponentSpec: valid argument
    # combinations must be accepted and wrongly typed arguments must
    # raise TypeError.

    def test_create_good_no_params(self):
        bt2.ComponentSpec('plugin', 'compcls')

    def test_create_good_with_params(self):
        params = {'salut': 23}
        bt2.ComponentSpec('plugin', 'compcls', params)

    def test_create_good_with_path_params(self):
        # A string passed as the third argument is expected to show up
        # as the only element of the 'inputs' parameter.
        path_spec = bt2.ComponentSpec('plugin', 'compcls', 'a path')
        inputs = path_spec.params['inputs']
        self.assertEqual(inputs, ['a path'])

    def test_create_wrong_plugin_name_type(self):
        self.assertRaises(TypeError, bt2.ComponentSpec, 23, 'compcls')

    def test_create_wrong_component_class_name_type(self):
        self.assertRaises(TypeError, bt2.ComponentSpec, 'plugin', 190)

    def test_create_wrong_params_type(self):
        # A datetime object is neither a parameter map nor a path.
        with self.assertRaises(TypeError):
            bt2.ComponentSpec('dwdw', 'compcls', datetime.datetime.now())
# Return a map, msg type -> number of messages of this type.
def _count_msgs_by_type(msgs):
    # `msgs` can be any iterable of messages (the tests pass both lists
    # and message iterators); it is traversed exactly once.
    counts = {}

    for msg in msgs:
        msg_type = type(msg)
        counts[msg_type] = counts.get(msg_type, 0) + 1

    return counts
class TraceCollectionMessageIteratorTestCase(unittest.TestCase):
    # Tests for bt2.TraceCollectionMessageIterator: validation of the
    # constructor arguments, then message counts obtained when iterating
    # over known CTF traces with various specs and options.

    def test_create_wrong_stream_intersection_mode_type(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]

        with self.assertRaises(TypeError):
            bt2.TraceCollectionMessageIterator(specs, stream_intersection_mode=23)

    def test_create_wrong_begin_type(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]

        with self.assertRaises(TypeError):
            bt2.TraceCollectionMessageIterator(specs, begin='hi')

    def test_create_wrong_end_type(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]

        # The wrongly typed value must be given as `end`: passing it as
        # `begin` would only repeat test_create_wrong_begin_type().
        with self.assertRaises(TypeError):
            bt2.TraceCollectionMessageIterator(specs, end='lel')

    def test_create_no_such_plugin(self):
        specs = [bt2.ComponentSpec('77', '101', _3EVENTS_INTERSECT_TRACE_PATH)]

        with self.assertRaises(ValueError):
            bt2.TraceCollectionMessageIterator(specs)

    def test_create_begin_s(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        bt2.TraceCollectionMessageIterator(specs, begin=19457.918232)

    def test_create_end_s(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        bt2.TraceCollectionMessageIterator(specs, end=123.12312)

    def test_create_begin_datetime(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        bt2.TraceCollectionMessageIterator(specs, begin=datetime.datetime.now())

    def test_create_end_datetime(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        bt2.TraceCollectionMessageIterator(specs, end=datetime.datetime.now())

    def test_iter_no_intersection(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        msg_iter = bt2.TraceCollectionMessageIterator(specs)
        msgs = list(msg_iter)
        self.assertEqual(len(msgs), 28)
        hist = _count_msgs_by_type(msgs)
        self.assertEqual(hist[bt2._EventMessage], 8)

    # Same as the above, but we pass a single spec instead of a spec list.
    def test_iter_specs_not_list(self):
        spec = bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)
        msg_iter = bt2.TraceCollectionMessageIterator(spec)
        msgs = list(msg_iter)
        self.assertEqual(len(msgs), 28)
        hist = _count_msgs_by_type(msgs)
        self.assertEqual(hist[bt2._EventMessage], 8)

    # A `utils.trimmer` spec is passed as the second positional argument
    # so that fewer event messages are expected than without it.
    def test_iter_custom_filter(self):
        src_spec = bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)
        flt_spec = bt2.ComponentSpec('utils', 'trimmer', {'end': '13515309.000000075'})
        msg_iter = bt2.TraceCollectionMessageIterator(src_spec, flt_spec)
        hist = _count_msgs_by_type(msg_iter)
        self.assertEqual(hist[bt2._EventMessage], 5)

    def test_iter_intersection(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        msg_iter = bt2.TraceCollectionMessageIterator(
            specs, stream_intersection_mode=True
        )
        msgs = list(msg_iter)
        self.assertEqual(len(msgs), 15)
        hist = _count_msgs_by_type(msgs)
        self.assertEqual(hist[bt2._EventMessage], 3)

    # The spec has no 'inputs' parameter: creating the iterator in
    # stream intersection mode is expected to fail.
    def test_iter_intersection_no_inputs_param(self):
        specs = [bt2.ComponentSpec('text', 'dmesg', {'read-from-stdin': True})]

        with self.assertRaises(ValueError):
            bt2.TraceCollectionMessageIterator(specs, stream_intersection_mode=True)

    # The same trace is given twice, so every count doubles compared to
    # test_iter_no_intersection().
    def test_iter_no_intersection_two_traces(self):
        spec = bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)
        specs = [spec, spec]
        msg_iter = bt2.TraceCollectionMessageIterator(specs)
        msgs = list(msg_iter)
        self.assertEqual(len(msgs), 56)
        hist = _count_msgs_by_type(msgs)
        self.assertEqual(hist[bt2._EventMessage], 16)

    def test_iter_no_intersection_begin(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        msg_iter = bt2.TraceCollectionMessageIterator(specs, begin=13515309.000000023)
        hist = _count_msgs_by_type(msg_iter)
        self.assertEqual(hist[bt2._EventMessage], 6)

    def test_iter_no_intersection_end(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        msg_iter = bt2.TraceCollectionMessageIterator(specs, end=13515309.000000075)
        hist = _count_msgs_by_type(msg_iter)
        self.assertEqual(hist[bt2._EventMessage], 5)

    def test_iter_auto_source_component_spec(self):
        specs = [bt2.AutoSourceComponentSpec(_3EVENTS_INTERSECT_TRACE_PATH)]
        msg_iter = bt2.TraceCollectionMessageIterator(specs)
        msgs = list(msg_iter)
        self.assertEqual(len(msgs), 28)
        hist = _count_msgs_by_type(msgs)
        self.assertEqual(hist[bt2._EventMessage], 8)

    # Plain strings are given where specs are expected.
    def test_iter_auto_source_component_spec_list_of_strings(self):
        msg_iter = bt2.TraceCollectionMessageIterator([_3EVENTS_INTERSECT_TRACE_PATH])
        msgs = list(msg_iter)
        self.assertEqual(len(msgs), 28)
        hist = _count_msgs_by_type(msgs)
        self.assertEqual(hist[bt2._EventMessage], 8)

    def test_iter_auto_source_component_spec_string(self):
        msg_iter = bt2.TraceCollectionMessageIterator(_3EVENTS_INTERSECT_TRACE_PATH)
        msgs = list(msg_iter)
        self.assertEqual(len(msgs), 28)
        hist = _count_msgs_by_type(msgs)
        self.assertEqual(hist[bt2._EventMessage], 8)

    # Mix a plain string, an auto source component spec and an explicit
    # component spec in the same list.
    def test_iter_mixed_inputs(self):
        msg_iter = bt2.TraceCollectionMessageIterator(
            [
                _3EVENTS_INTERSECT_TRACE_PATH,
                bt2.AutoSourceComponentSpec(_SEQUENCE_TRACE_PATH),
                bt2.ComponentSpec('ctf', 'fs', _NOINTERSECT_TRACE_PATH),
            ]
        )
        msgs = list(msg_iter)
        self.assertEqual(len(msgs), 76)
        hist = _count_msgs_by_type(msgs)
        self.assertEqual(hist[bt2._EventMessage], 24)

    def test_auto_source_component_non_existent(self):
        # NOTE(review): the expected exception class is assumed to be
        # RuntimeError -- confirm against what
        # bt2.TraceCollectionMessageIterator raises for this message.
        with self.assertRaisesRegex(
            RuntimeError,
            'Some auto source component specs did not produce any component',
        ):
            # Test with one path known to contain a trace and one path known
            # to not contain any trace.
            bt2.TraceCollectionMessageIterator(
                [_SEQUENCE_TRACE_PATH, '/this/path/better/not/exist']
            )
class _TestAutoDiscoverSourceComponentSpecs(unittest.TestCase):
    # Base class of the auto source discovery test cases.
    #
    # A subclass defines `_plugin_path`; that path is appended to the
    # `BABELTRACE_PLUGIN_PATH` environment variable for the duration of
    # each test, and the previous state of the variable is restored
    # afterwards.

    def setUp(self):
        # `None` means the variable was not set at all: remember this so
        # that tearDown() can restore exactly that state.
        saved = os.environ.get('BABELTRACE_PLUGIN_PATH')
        self._saved_babeltrace_plugin_path = saved

        if saved is None:
            os.environ['BABELTRACE_PLUGIN_PATH'] = self._plugin_path
        else:
            os.environ['BABELTRACE_PLUGIN_PATH'] = (
                saved + os.pathsep + self._plugin_path
            )

    def tearDown(self):
        saved = self._saved_babeltrace_plugin_path

        if saved is None:
            # The variable did not exist before setUp(): remove it again.
            os.environ.pop('BABELTRACE_PLUGIN_PATH', None)
        else:
            os.environ['BABELTRACE_PLUGIN_PATH'] = saved
class TestAutoDiscoverSourceComponentSpecsGrouping(
    _TestAutoDiscoverSourceComponentSpecs
):
    # Checks which streams are produced, and in which order, when auto
    # source discovery runs on an arbitrary string and on the `grouping`
    # data directory.
    _plugin_path = _AUTO_SOURCE_DISCOVERY_GROUPING_PATH

    def test_grouping(self):
        expected_stream_names = [
            'TestSourceABCDE: ABCDE',
            'TestSourceExt: aaa1, aaa2, aaa3',
            'TestSourceExt: bbb1, bbb2',
            'TestSourceExt: ccc1',
            'TestSourceExt: ccc2',
            'TestSourceExt: ccc3',
            'TestSourceExt: ccc4',
            'TestSourceSomeDir: some-dir',
        ]
        abcde_spec = bt2.AutoSourceComponentSpec('ABCDE')
        grouping_spec = bt2.AutoSourceComponentSpec(
            _AUTO_SOURCE_DISCOVERY_GROUPING_PATH
        )
        msg_iter = bt2.TraceCollectionMessageIterator([abcde_spec, grouping_spec])

        # Only the stream beginning messages matter: their stream names
        # are what this test compares.
        stream_beginnings = [
            msg for msg in msg_iter if type(msg) is bt2._StreamBeginningMessage
        ]

        self.assertEqual(len(stream_beginnings), 8)

        for index, expected_name in enumerate(expected_stream_names):
            self.assertEqual(stream_beginnings[index].stream.name, expected_name)
class TestAutoDiscoverSourceComponentSpecsParamsObjLogLevel(
    _TestAutoDiscoverSourceComponentSpecs
):
    # Checks how the parameters, the Python object and the logging level
    # of auto source component specs reach the two test source
    # components, judging by the names of the streams they create.
    _plugin_path = _AUTO_SOURCE_DISCOVERY_PARAMS_LOG_LEVEL_PATH

    _dir_a = os.path.join(_AUTO_SOURCE_DISCOVERY_PARAMS_LOG_LEVEL_PATH, 'dir-a')
    _dir_b = os.path.join(_AUTO_SOURCE_DISCOVERY_PARAMS_LOG_LEVEL_PATH, 'dir-b')
    _dir_ab = os.path.join(_AUTO_SOURCE_DISCOVERY_PARAMS_LOG_LEVEL_PATH, 'dir-ab')

    # Iterate over `specs`, keep the stream beginning messages, check
    # that there are exactly two of them and return them.
    def _two_stream_beginning_msgs(self, specs):
        msg_iter = bt2.TraceCollectionMessageIterator(specs)
        stream_beginnings = [
            msg for msg in msg_iter if type(msg) is bt2._StreamBeginningMessage
        ]

        self.assertEqual(len(stream_beginnings), 2)

        return stream_beginnings

    # Check the stream names of the first and second messages of `msgs`.
    def _assert_stream_names(self, msgs, first_name, second_name):
        self.assertEqual(msgs[0].stream.name, first_name)
        self.assertEqual(msgs[1].stream.name, second_name)

    # One spec on `dir-ab`.
    def _test_two_comps_from_one_spec(self, params, obj=None, logging_level=None):
        spec_ab = bt2.AutoSourceComponentSpec(
            self._dir_ab, params=params, obj=obj, logging_level=logging_level
        )

        return self._two_stream_beginning_msgs([spec_ab])

    def test_params_two_comps_from_one_spec(self):
        msgs = self._test_two_comps_from_one_spec(
            params={'test-allo': 'madame', 'what': 'test-params'}
        )

        self._assert_stream_names(
            msgs,
            "TestSourceA: ('test-allo', 'madame')",
            "TestSourceB: ('test-allo', 'madame')",
        )

    def test_obj_two_comps_from_one_spec(self):
        msgs = self._test_two_comps_from_one_spec(
            params={'what': 'python-obj'}, obj='deore'
        )

        self._assert_stream_names(msgs, "TestSourceA: deore", "TestSourceB: deore")

    def test_log_level_two_comps_from_one_spec(self):
        msgs = self._test_two_comps_from_one_spec(
            params={'what': 'log-level'}, logging_level=bt2.LoggingLevel.DEBUG
        )

        self._assert_stream_names(
            msgs,
            "TestSourceA: {}".format(bt2.LoggingLevel.DEBUG),
            "TestSourceB: {}".format(bt2.LoggingLevel.DEBUG),
        )

    # One spec on `dir-a` followed by one spec on `dir-b`.
    def _test_two_comps_from_two_specs(
        self,
        params_a,
        params_b,
        obj_a=None,
        obj_b=None,
        logging_level_a=None,
        logging_level_b=None,
    ):
        spec_a = bt2.AutoSourceComponentSpec(
            self._dir_a, params=params_a, obj=obj_a, logging_level=logging_level_a
        )
        spec_b = bt2.AutoSourceComponentSpec(
            self._dir_b, params=params_b, obj=obj_b, logging_level=logging_level_b
        )

        return self._two_stream_beginning_msgs([spec_a, spec_b])

    def test_params_two_comps_from_two_specs(self):
        msgs = self._test_two_comps_from_two_specs(
            params_a={'test-allo': 'madame', 'what': 'test-params'},
            params_b={'test-bonjour': 'monsieur', 'what': 'test-params'},
        )

        self._assert_stream_names(
            msgs,
            "TestSourceA: ('test-allo', 'madame')",
            "TestSourceB: ('test-bonjour', 'monsieur')",
        )

    def test_obj_two_comps_from_two_specs(self):
        msgs = self._test_two_comps_from_two_specs(
            params_a={'what': 'python-obj'},
            params_b={'what': 'python-obj'},
            obj_a='deore',
            obj_b='alivio',
        )

        self._assert_stream_names(msgs, "TestSourceA: deore", "TestSourceB: alivio")

    def test_log_level_two_comps_from_two_specs(self):
        msgs = self._test_two_comps_from_two_specs(
            params_a={'what': 'log-level'},
            params_b={'what': 'log-level'},
            logging_level_a=bt2.LoggingLevel.DEBUG,
            logging_level_b=bt2.LoggingLevel.TRACE,
        )

        self._assert_stream_names(
            msgs,
            "TestSourceA: {}".format(bt2.LoggingLevel.DEBUG),
            "TestSourceB: {}".format(bt2.LoggingLevel.TRACE),
        )

    # One spec on `dir-a` followed by one spec on `dir-ab`.
    def _test_one_comp_from_one_spec_one_comp_from_both_1(
        self,
        params_a,
        params_ab,
        obj_a=None,
        obj_ab=None,
        logging_level_a=None,
        logging_level_ab=None,
    ):
        spec_a = bt2.AutoSourceComponentSpec(
            self._dir_a, params=params_a, obj=obj_a, logging_level=logging_level_a
        )
        spec_ab = bt2.AutoSourceComponentSpec(
            self._dir_ab,
            params=params_ab,
            obj=obj_ab,
            logging_level=logging_level_ab,
        )

        return self._two_stream_beginning_msgs([spec_a, spec_ab])

    def test_params_one_comp_from_one_spec_one_comp_from_both_1(self):
        msgs = self._test_one_comp_from_one_spec_one_comp_from_both_1(
            params_a={'test-allo': 'madame', 'what': 'test-params'},
            params_ab={'test-bonjour': 'monsieur', 'what': 'test-params'},
        )

        self._assert_stream_names(
            msgs,
            "TestSourceA: ('test-allo', 'madame'), ('test-bonjour', 'monsieur')",
            "TestSourceB: ('test-bonjour', 'monsieur')",
        )

    def test_obj_one_comp_from_one_spec_one_comp_from_both_1(self):
        # The object of the later `dir-ab` spec is the one expected on
        # both streams.
        msgs = self._test_one_comp_from_one_spec_one_comp_from_both_1(
            params_a={'what': 'python-obj'},
            params_ab={'what': 'python-obj'},
            obj_a='deore',
            obj_ab='alivio',
        )

        self._assert_stream_names(msgs, "TestSourceA: alivio", "TestSourceB: alivio")

    def test_log_level_one_comp_from_one_spec_one_comp_from_both_1(self):
        msgs = self._test_one_comp_from_one_spec_one_comp_from_both_1(
            params_a={'what': 'log-level'},
            params_ab={'what': 'log-level'},
            logging_level_a=bt2.LoggingLevel.DEBUG,
            logging_level_ab=bt2.LoggingLevel.TRACE,
        )

        self._assert_stream_names(
            msgs,
            "TestSourceA: {}".format(bt2.LoggingLevel.TRACE),
            "TestSourceB: {}".format(bt2.LoggingLevel.TRACE),
        )

    # One spec on `dir-ab` followed by one spec on `dir-a`.
    def _test_one_comp_from_one_spec_one_comp_from_both_2(
        self,
        params_ab,
        params_a,
        obj_ab=None,
        obj_a=None,
        logging_level_ab=None,
        logging_level_a=None,
    ):
        spec_ab = bt2.AutoSourceComponentSpec(
            self._dir_ab,
            params=params_ab,
            obj=obj_ab,
            logging_level=logging_level_ab,
        )
        spec_a = bt2.AutoSourceComponentSpec(
            self._dir_a, params=params_a, obj=obj_a, logging_level=logging_level_a
        )

        return self._two_stream_beginning_msgs([spec_ab, spec_a])

    def test_params_one_comp_from_one_spec_one_comp_from_both_2(self):
        msgs = self._test_one_comp_from_one_spec_one_comp_from_both_2(
            params_ab={
                'test-bonjour': 'madame',
                'test-salut': 'les amis',
                'what': 'test-params',
            },
            params_a={'test-bonjour': 'monsieur', 'what': 'test-params'},
        )

        self._assert_stream_names(
            msgs,
            "TestSourceA: ('test-bonjour', 'monsieur'), ('test-salut', 'les amis')",
            "TestSourceB: ('test-bonjour', 'madame'), ('test-salut', 'les amis')",
        )

    def test_obj_one_comp_from_one_spec_one_comp_from_both_2(self):
        msgs = self._test_one_comp_from_one_spec_one_comp_from_both_2(
            params_ab={'what': 'python-obj'},
            params_a={'what': 'python-obj'},
            obj_ab='deore',
            obj_a='alivio',
        )

        self._assert_stream_names(msgs, "TestSourceA: alivio", "TestSourceB: deore")

    def test_log_level_one_comp_from_one_spec_one_comp_from_both_2(self):
        msgs = self._test_one_comp_from_one_spec_one_comp_from_both_2(
            params_ab={'what': 'log-level'},
            params_a={'what': 'log-level'},
            logging_level_ab=bt2.LoggingLevel.DEBUG,
            logging_level_a=bt2.LoggingLevel.TRACE,
        )

        self._assert_stream_names(
            msgs,
            "TestSourceA: {}".format(bt2.LoggingLevel.TRACE),
            "TestSourceB: {}".format(bt2.LoggingLevel.DEBUG),
        )

    # An explicit `obj=None` in the later spec is expected to replace
    # the object given by the earlier one.
    def test_obj_override_with_none(self):
        spec_ab = bt2.AutoSourceComponentSpec(
            self._dir_ab, params={'what': 'python-obj'}, obj='deore'
        )
        spec_a = bt2.AutoSourceComponentSpec(
            self._dir_a, params={'what': 'python-obj'}, obj=None
        )

        msgs = self._two_stream_beginning_msgs([spec_ab, spec_a])

        self._assert_stream_names(msgs, "TestSourceA: None", "TestSourceB: deore")

    # Without any `obj` argument in the later spec, the object given by
    # the earlier one is expected to remain.
    def test_obj_no_override_with_no_obj(self):
        spec_ab = bt2.AutoSourceComponentSpec(
            self._dir_ab, params={'what': 'python-obj'}, obj='deore'
        )
        spec_a = bt2.AutoSourceComponentSpec(self._dir_a, params={'what': 'python-obj'})

        msgs = self._two_stream_beginning_msgs([spec_ab, spec_a])

        self._assert_stream_names(msgs, "TestSourceA: deore", "TestSourceB: deore")
545 if __name__
== '__main__':