Commit | Line | Data |
---|---|---|
d2d857a8 MJ |
1 | # |
2 | # Copyright (C) 2019 EfficiOS Inc. | |
3 | # | |
4 | # This program is free software; you can redistribute it and/or | |
5 | # modify it under the terms of the GNU General Public License | |
6 | # as published by the Free Software Foundation; only version 2 | |
7 | # of the License. | |
8 | # | |
9 | # This program is distributed in the hope that it will be useful, | |
10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
12 | # GNU General Public License for more details. | |
13 | # | |
14 | # You should have received a copy of the GNU General Public License | |
15 | # along with this program; if not, write to the Free Software | |
16 | # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. | |
17 | # | |
18 | ||
704c2307 PP |
19 | import unittest |
20 | import datetime | |
704c2307 PP |
21 | import bt2 |
22 | import os | |
23 | import os.path | |
24 | ||
25 | ||
f3c9a159 | 26 | _BT_TESTS_DATADIR = os.environ['BT_TESTS_DATADIR'] |
bbff0ab4 | 27 | _BT_CTF_TRACES_PATH = os.environ['BT_CTF_TRACES_PATH'] |
cfbd7cf3 FD |
28 | _3EVENTS_INTERSECT_TRACE_PATH = os.path.join( |
29 | _BT_CTF_TRACES_PATH, 'intersection', '3eventsintersect' | |
30 | ) | |
f3c9a159 SM |
31 | _NOINTERSECT_TRACE_PATH = os.path.join( |
32 | _BT_CTF_TRACES_PATH, 'intersection', 'nointersect' | |
33 | ) | |
34 | _SEQUENCE_TRACE_PATH = os.path.join(_BT_CTF_TRACES_PATH, 'succeed', 'sequence') | |
35 | _AUTO_SOURCE_DISCOVERY_GROUPING_PATH = os.path.join( | |
36 | _BT_TESTS_DATADIR, 'auto-source-discovery', 'grouping' | |
37 | ) | |
38 | _AUTO_SOURCE_DISCOVERY_PARAMS_LOG_LEVEL_PATH = os.path.join( | |
39 | _BT_TESTS_DATADIR, 'auto-source-discovery', 'params-log-level' | |
40 | ) | |
704c2307 PP |
41 | |
42 | ||
3d60267b | 43 | class ComponentSpecTestCase(unittest.TestCase): |
704c2307 | 44 | def test_create_good_no_params(self): |
907f2b70 | 45 | bt2.ComponentSpec('plugin', 'compcls') |
704c2307 PP |
46 | |
47 | def test_create_good_with_params(self): | |
907f2b70 | 48 | bt2.ComponentSpec('plugin', 'compcls', {'salut': 23}) |
704c2307 PP |
49 | |
50 | def test_create_good_with_path_params(self): | |
3d60267b | 51 | spec = bt2.ComponentSpec('plugin', 'compcls', 'a path') |
73760435 | 52 | self.assertEqual(spec.params['inputs'], ['a path']) |
704c2307 PP |
53 | |
54 | def test_create_wrong_plugin_name_type(self): | |
55 | with self.assertRaises(TypeError): | |
907f2b70 | 56 | bt2.ComponentSpec(23, 'compcls') |
704c2307 PP |
57 | |
58 | def test_create_wrong_component_class_name_type(self): | |
59 | with self.assertRaises(TypeError): | |
907f2b70 | 60 | bt2.ComponentSpec('plugin', 190) |
704c2307 PP |
61 | |
62 | def test_create_wrong_params_type(self): | |
63 | with self.assertRaises(TypeError): | |
907f2b70 SM |
64 | bt2.ComponentSpec('dwdw', 'compcls', datetime.datetime.now()) |
65 | ||
66 | ||
67 | # Return a map, msg type -> number of messages of this type. | |
68 | ||
cfbd7cf3 | 69 | |
907f2b70 SM |
70 | def _count_msgs_by_type(msgs): |
71 | res = {} | |
72 | ||
73 | for msg in msgs: | |
74 | t = type(msg) | |
75 | n = res.get(t, 0) | |
76 | res[t] = n + 1 | |
77 | ||
78 | return res | |
704c2307 PP |
79 | |
80 | ||
5602ef81 | 81 | class TraceCollectionMessageIteratorTestCase(unittest.TestCase): |
704c2307 | 82 | def test_create_wrong_stream_intersection_mode_type(self): |
3d60267b | 83 | specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] |
704c2307 PP |
84 | |
85 | with self.assertRaises(TypeError): | |
907f2b70 | 86 | bt2.TraceCollectionMessageIterator(specs, stream_intersection_mode=23) |
704c2307 PP |
87 | |
88 | def test_create_wrong_begin_type(self): | |
3d60267b | 89 | specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] |
704c2307 PP |
90 | |
91 | with self.assertRaises(TypeError): | |
907f2b70 | 92 | bt2.TraceCollectionMessageIterator(specs, begin='hi') |
704c2307 PP |
93 | |
94 | def test_create_wrong_end_type(self): | |
3d60267b | 95 | specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] |
704c2307 PP |
96 | |
97 | with self.assertRaises(TypeError): | |
907f2b70 | 98 | bt2.TraceCollectionMessageIterator(specs, begin='lel') |
704c2307 PP |
99 | |
100 | def test_create_no_such_plugin(self): | |
3d60267b | 101 | specs = [bt2.ComponentSpec('77', '101', _3EVENTS_INTERSECT_TRACE_PATH)] |
704c2307 | 102 | |
ce4923b0 | 103 | with self.assertRaises(ValueError): |
907f2b70 | 104 | bt2.TraceCollectionMessageIterator(specs) |
704c2307 PP |
105 | |
106 | def test_create_begin_s(self): | |
3d60267b | 107 | specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] |
907f2b70 | 108 | bt2.TraceCollectionMessageIterator(specs, begin=19457.918232) |
704c2307 PP |
109 | |
110 | def test_create_end_s(self): | |
3d60267b | 111 | specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] |
907f2b70 | 112 | bt2.TraceCollectionMessageIterator(specs, end=123.12312) |
704c2307 PP |
113 | |
114 | def test_create_begin_datetime(self): | |
3d60267b | 115 | specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] |
907f2b70 | 116 | bt2.TraceCollectionMessageIterator(specs, begin=datetime.datetime.now()) |
704c2307 PP |
117 | |
118 | def test_create_end_datetime(self): | |
3d60267b | 119 | specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] |
907f2b70 | 120 | bt2.TraceCollectionMessageIterator(specs, end=datetime.datetime.now()) |
704c2307 PP |
121 | |
122 | def test_iter_no_intersection(self): | |
3d60267b | 123 | specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] |
5602ef81 | 124 | msg_iter = bt2.TraceCollectionMessageIterator(specs) |
907f2b70 | 125 | msgs = list(msg_iter) |
188edac1 | 126 | self.assertEqual(len(msgs), 28) |
907f2b70 | 127 | hist = _count_msgs_by_type(msgs) |
3fb99a22 | 128 | self.assertEqual(hist[bt2._EventMessage], 8) |
704c2307 | 129 | |
907f2b70 | 130 | # Same as the above, but we pass a single spec instead of a spec list. |
3d60267b PP |
131 | def test_iter_specs_not_list(self): |
132 | spec = bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH) | |
907f2b70 SM |
133 | msg_iter = bt2.TraceCollectionMessageIterator(spec) |
134 | msgs = list(msg_iter) | |
188edac1 | 135 | self.assertEqual(len(msgs), 28) |
907f2b70 | 136 | hist = _count_msgs_by_type(msgs) |
3fb99a22 | 137 | self.assertEqual(hist[bt2._EventMessage], 8) |
3d60267b PP |
138 | |
139 | def test_iter_custom_filter(self): | |
140 | src_spec = bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH) | |
cfbd7cf3 | 141 | flt_spec = bt2.ComponentSpec('utils', 'trimmer', {'end': '13515309.000000075'}) |
907f2b70 SM |
142 | msg_iter = bt2.TraceCollectionMessageIterator(src_spec, flt_spec) |
143 | hist = _count_msgs_by_type(msg_iter) | |
3fb99a22 | 144 | self.assertEqual(hist[bt2._EventMessage], 5) |
3d60267b | 145 | |
704c2307 | 146 | def test_iter_intersection(self): |
3d60267b | 147 | specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] |
cfbd7cf3 FD |
148 | msg_iter = bt2.TraceCollectionMessageIterator( |
149 | specs, stream_intersection_mode=True | |
150 | ) | |
907f2b70 | 151 | msgs = list(msg_iter) |
188edac1 | 152 | self.assertEqual(len(msgs), 15) |
907f2b70 | 153 | hist = _count_msgs_by_type(msgs) |
3fb99a22 | 154 | self.assertEqual(hist[bt2._EventMessage], 3) |
704c2307 | 155 | |
73760435 | 156 | def test_iter_intersection_no_inputs_param(self): |
3d60267b | 157 | specs = [bt2.ComponentSpec('text', 'dmesg', {'read-from-stdin': True})] |
704c2307 | 158 | |
ce4923b0 | 159 | with self.assertRaises(ValueError): |
907f2b70 | 160 | bt2.TraceCollectionMessageIterator(specs, stream_intersection_mode=True) |
704c2307 PP |
161 | |
162 | def test_iter_no_intersection_two_traces(self): | |
3d60267b | 163 | spec = bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH) |
704c2307 | 164 | specs = [spec, spec] |
5602ef81 | 165 | msg_iter = bt2.TraceCollectionMessageIterator(specs) |
907f2b70 | 166 | msgs = list(msg_iter) |
188edac1 | 167 | self.assertEqual(len(msgs), 56) |
907f2b70 | 168 | hist = _count_msgs_by_type(msgs) |
3fb99a22 | 169 | self.assertEqual(hist[bt2._EventMessage], 16) |
704c2307 PP |
170 | |
171 | def test_iter_no_intersection_begin(self): | |
3d60267b | 172 | specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] |
907f2b70 SM |
173 | msg_iter = bt2.TraceCollectionMessageIterator(specs, begin=13515309.000000023) |
174 | hist = _count_msgs_by_type(msg_iter) | |
3fb99a22 | 175 | self.assertEqual(hist[bt2._EventMessage], 6) |
704c2307 PP |
176 | |
177 | def test_iter_no_intersection_end(self): | |
3d60267b | 178 | specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] |
907f2b70 SM |
179 | msg_iter = bt2.TraceCollectionMessageIterator(specs, end=13515309.000000075) |
180 | hist = _count_msgs_by_type(msg_iter) | |
3fb99a22 | 181 | self.assertEqual(hist[bt2._EventMessage], 5) |
f3c9a159 SM |
182 | |
183 | def test_iter_auto_source_component_spec(self): | |
184 | specs = [bt2.AutoSourceComponentSpec(_3EVENTS_INTERSECT_TRACE_PATH)] | |
185 | msg_iter = bt2.TraceCollectionMessageIterator(specs) | |
186 | msgs = list(msg_iter) | |
187 | self.assertEqual(len(msgs), 28) | |
188 | hist = _count_msgs_by_type(msgs) | |
189 | self.assertEqual(hist[bt2._EventMessage], 8) | |
190 | ||
191 | def test_iter_auto_source_component_spec_list_of_strings(self): | |
192 | msg_iter = bt2.TraceCollectionMessageIterator([_3EVENTS_INTERSECT_TRACE_PATH]) | |
193 | msgs = list(msg_iter) | |
194 | self.assertEqual(len(msgs), 28) | |
195 | hist = _count_msgs_by_type(msgs) | |
196 | self.assertEqual(hist[bt2._EventMessage], 8) | |
197 | ||
198 | def test_iter_auto_source_component_spec_string(self): | |
199 | msg_iter = bt2.TraceCollectionMessageIterator(_3EVENTS_INTERSECT_TRACE_PATH) | |
200 | msgs = list(msg_iter) | |
201 | self.assertEqual(len(msgs), 28) | |
202 | hist = _count_msgs_by_type(msgs) | |
203 | self.assertEqual(hist[bt2._EventMessage], 8) | |
204 | ||
205 | def test_iter_mixed_inputs(self): | |
206 | msg_iter = bt2.TraceCollectionMessageIterator( | |
207 | [ | |
208 | _3EVENTS_INTERSECT_TRACE_PATH, | |
209 | bt2.AutoSourceComponentSpec(_SEQUENCE_TRACE_PATH), | |
210 | bt2.ComponentSpec('ctf', 'fs', _NOINTERSECT_TRACE_PATH), | |
211 | ] | |
212 | ) | |
213 | msgs = list(msg_iter) | |
214 | self.assertEqual(len(msgs), 76) | |
215 | hist = _count_msgs_by_type(msgs) | |
216 | self.assertEqual(hist[bt2._EventMessage], 24) | |
217 | ||
39b351f9 SM |
218 | def test_auto_source_component_non_existent(self): |
219 | with self.assertRaisesRegex( | |
220 | RuntimeError, | |
221 | 'Some auto source component specs did not produce any component', | |
222 | ): | |
223 | # Test with one path known to contain a trace and one path known | |
224 | # to not contain any trace. | |
225 | bt2.TraceCollectionMessageIterator( | |
226 | [_SEQUENCE_TRACE_PATH, '/this/path/better/not/exist'] | |
227 | ) | |
228 | ||
f3c9a159 SM |
229 | |
230 | class _TestAutoDiscoverSourceComponentSpecs(unittest.TestCase): | |
231 | def setUp(self): | |
232 | self._saved_babeltrace_plugin_path = os.environ['BABELTRACE_PLUGIN_PATH'] | |
53487b4e | 233 | os.environ['BABELTRACE_PLUGIN_PATH'] += os.pathsep + self._plugin_path |
f3c9a159 SM |
234 | |
235 | def tearDown(self): | |
236 | os.environ['BABELTRACE_PLUGIN_PATH'] = self._saved_babeltrace_plugin_path | |
237 | ||
238 | ||
239 | class TestAutoDiscoverSourceComponentSpecsGrouping( | |
240 | _TestAutoDiscoverSourceComponentSpecs | |
241 | ): | |
242 | _plugin_path = _AUTO_SOURCE_DISCOVERY_GROUPING_PATH | |
243 | ||
244 | def test_grouping(self): | |
245 | specs = [ | |
246 | bt2.AutoSourceComponentSpec('ABCDE'), | |
247 | bt2.AutoSourceComponentSpec(_AUTO_SOURCE_DISCOVERY_GROUPING_PATH), | |
f3c9a159 SM |
248 | ] |
249 | it = bt2.TraceCollectionMessageIterator(specs) | |
250 | msgs = [x for x in it if type(x) is bt2._StreamBeginningMessage] | |
251 | ||
252 | self.assertEqual(len(msgs), 8) | |
253 | ||
254 | self.assertEqual(msgs[0].stream.name, 'TestSourceABCDE: ABCDE') | |
255 | self.assertEqual(msgs[1].stream.name, 'TestSourceExt: aaa1, aaa2, aaa3') | |
256 | self.assertEqual(msgs[2].stream.name, 'TestSourceExt: bbb1, bbb2') | |
257 | self.assertEqual(msgs[3].stream.name, 'TestSourceExt: ccc1') | |
258 | self.assertEqual(msgs[4].stream.name, 'TestSourceExt: ccc2') | |
259 | self.assertEqual(msgs[5].stream.name, 'TestSourceExt: ccc3') | |
260 | self.assertEqual(msgs[6].stream.name, 'TestSourceExt: ccc4') | |
261 | self.assertEqual(msgs[7].stream.name, 'TestSourceSomeDir: some-dir') | |
262 | ||
263 | ||
264 | class TestAutoDiscoverSourceComponentSpecsParamsObjLogLevel( | |
265 | _TestAutoDiscoverSourceComponentSpecs | |
266 | ): | |
267 | _plugin_path = _AUTO_SOURCE_DISCOVERY_PARAMS_LOG_LEVEL_PATH | |
268 | ||
269 | _dir_a = os.path.join(_AUTO_SOURCE_DISCOVERY_PARAMS_LOG_LEVEL_PATH, 'dir-a') | |
270 | _dir_b = os.path.join(_AUTO_SOURCE_DISCOVERY_PARAMS_LOG_LEVEL_PATH, 'dir-b') | |
271 | _dir_ab = os.path.join(_AUTO_SOURCE_DISCOVERY_PARAMS_LOG_LEVEL_PATH, 'dir-ab') | |
272 | ||
273 | def _test_two_comps_from_one_spec(self, params, obj=None, logging_level=None): | |
274 | specs = [ | |
275 | bt2.AutoSourceComponentSpec( | |
276 | self._dir_ab, params=params, obj=obj, logging_level=logging_level | |
277 | ) | |
278 | ] | |
279 | it = bt2.TraceCollectionMessageIterator(specs) | |
280 | msgs = [x for x in it if type(x) is bt2._StreamBeginningMessage] | |
281 | ||
282 | self.assertEqual(len(msgs), 2) | |
283 | ||
284 | return msgs | |
285 | ||
286 | def test_params_two_comps_from_one_spec(self): | |
287 | msgs = self._test_two_comps_from_one_spec( | |
288 | params={'test-allo': 'madame', 'what': 'test-params'} | |
289 | ) | |
290 | ||
291 | self.assertEqual(msgs[0].stream.name, "TestSourceA: ('test-allo', 'madame')") | |
292 | self.assertEqual(msgs[1].stream.name, "TestSourceB: ('test-allo', 'madame')") | |
293 | ||
294 | def test_obj_two_comps_from_one_spec(self): | |
295 | msgs = self._test_two_comps_from_one_spec( | |
296 | params={'what': 'python-obj'}, obj='deore' | |
297 | ) | |
298 | ||
299 | self.assertEqual(msgs[0].stream.name, "TestSourceA: deore") | |
300 | self.assertEqual(msgs[1].stream.name, "TestSourceB: deore") | |
301 | ||
302 | def test_log_level_two_comps_from_one_spec(self): | |
303 | msgs = self._test_two_comps_from_one_spec( | |
304 | params={'what': 'log-level'}, logging_level=bt2.LoggingLevel.DEBUG | |
305 | ) | |
306 | ||
307 | self.assertEqual( | |
308 | msgs[0].stream.name, "TestSourceA: {}".format(bt2.LoggingLevel.DEBUG) | |
309 | ) | |
310 | self.assertEqual( | |
311 | msgs[1].stream.name, "TestSourceB: {}".format(bt2.LoggingLevel.DEBUG) | |
312 | ) | |
313 | ||
314 | def _test_two_comps_from_two_specs( | |
315 | self, | |
316 | params_a=None, | |
317 | params_b=None, | |
318 | obj_a=None, | |
319 | obj_b=None, | |
320 | logging_level_a=None, | |
321 | logging_level_b=None, | |
322 | ): | |
323 | specs = [ | |
324 | bt2.AutoSourceComponentSpec( | |
325 | self._dir_a, params=params_a, obj=obj_a, logging_level=logging_level_a | |
326 | ), | |
327 | bt2.AutoSourceComponentSpec( | |
328 | self._dir_b, params=params_b, obj=obj_b, logging_level=logging_level_b | |
329 | ), | |
330 | ] | |
331 | it = bt2.TraceCollectionMessageIterator(specs) | |
332 | msgs = [x for x in it if type(x) is bt2._StreamBeginningMessage] | |
333 | ||
334 | self.assertEqual(len(msgs), 2) | |
335 | ||
336 | return msgs | |
337 | ||
338 | def test_params_two_comps_from_two_specs(self): | |
339 | msgs = self._test_two_comps_from_two_specs( | |
340 | params_a={'test-allo': 'madame', 'what': 'test-params'}, | |
341 | params_b={'test-bonjour': 'monsieur', 'what': 'test-params'}, | |
342 | ) | |
343 | ||
344 | self.assertEqual(msgs[0].stream.name, "TestSourceA: ('test-allo', 'madame')") | |
345 | self.assertEqual( | |
346 | msgs[1].stream.name, "TestSourceB: ('test-bonjour', 'monsieur')" | |
347 | ) | |
348 | ||
349 | def test_obj_two_comps_from_two_specs(self): | |
350 | msgs = self._test_two_comps_from_two_specs( | |
351 | params_a={'what': 'python-obj'}, | |
352 | params_b={'what': 'python-obj'}, | |
353 | obj_a='deore', | |
354 | obj_b='alivio', | |
355 | ) | |
356 | ||
357 | self.assertEqual(msgs[0].stream.name, "TestSourceA: deore") | |
358 | self.assertEqual(msgs[1].stream.name, "TestSourceB: alivio") | |
359 | ||
360 | def test_log_level_two_comps_from_two_specs(self): | |
361 | msgs = self._test_two_comps_from_two_specs( | |
362 | params_a={'what': 'log-level'}, | |
363 | params_b={'what': 'log-level'}, | |
364 | logging_level_a=bt2.LoggingLevel.DEBUG, | |
365 | logging_level_b=bt2.LoggingLevel.TRACE, | |
366 | ) | |
367 | ||
368 | self.assertEqual( | |
369 | msgs[0].stream.name, "TestSourceA: {}".format(bt2.LoggingLevel.DEBUG) | |
370 | ) | |
371 | self.assertEqual( | |
372 | msgs[1].stream.name, "TestSourceB: {}".format(bt2.LoggingLevel.TRACE) | |
373 | ) | |
374 | ||
375 | def _test_one_comp_from_one_spec_one_comp_from_both_1( | |
376 | self, | |
377 | params_a=None, | |
378 | params_ab=None, | |
379 | obj_a=None, | |
380 | obj_ab=None, | |
381 | logging_level_a=None, | |
382 | logging_level_ab=None, | |
383 | ): | |
384 | specs = [ | |
385 | bt2.AutoSourceComponentSpec( | |
386 | self._dir_a, params=params_a, obj=obj_a, logging_level=logging_level_a | |
387 | ), | |
388 | bt2.AutoSourceComponentSpec( | |
389 | self._dir_ab, | |
390 | params=params_ab, | |
391 | obj=obj_ab, | |
392 | logging_level=logging_level_ab, | |
393 | ), | |
394 | ] | |
395 | it = bt2.TraceCollectionMessageIterator(specs) | |
396 | msgs = [x for x in it if type(x) is bt2._StreamBeginningMessage] | |
397 | ||
398 | self.assertEqual(len(msgs), 2) | |
399 | ||
400 | return msgs | |
401 | ||
402 | def test_params_one_comp_from_one_spec_one_comp_from_both_1(self): | |
403 | msgs = self._test_one_comp_from_one_spec_one_comp_from_both_1( | |
404 | params_a={'test-allo': 'madame', 'what': 'test-params'}, | |
405 | params_ab={'test-bonjour': 'monsieur', 'what': 'test-params'}, | |
406 | ) | |
407 | ||
408 | self.assertEqual( | |
409 | msgs[0].stream.name, | |
410 | "TestSourceA: ('test-allo', 'madame'), ('test-bonjour', 'monsieur')", | |
411 | ) | |
412 | self.assertEqual( | |
413 | msgs[1].stream.name, "TestSourceB: ('test-bonjour', 'monsieur')" | |
414 | ) | |
415 | ||
416 | def test_obj_one_comp_from_one_spec_one_comp_from_both_1(self): | |
417 | msgs = self._test_one_comp_from_one_spec_one_comp_from_both_1( | |
418 | params_a={'what': 'python-obj'}, | |
419 | params_ab={'what': 'python-obj'}, | |
420 | obj_a='deore', | |
421 | obj_ab='alivio', | |
422 | ) | |
423 | ||
424 | self.assertEqual(msgs[0].stream.name, "TestSourceA: alivio") | |
425 | self.assertEqual(msgs[1].stream.name, "TestSourceB: alivio") | |
426 | ||
427 | def test_log_level_one_comp_from_one_spec_one_comp_from_both_1(self): | |
428 | msgs = self._test_one_comp_from_one_spec_one_comp_from_both_1( | |
429 | params_a={'what': 'log-level'}, | |
430 | params_ab={'what': 'log-level'}, | |
431 | logging_level_a=bt2.LoggingLevel.DEBUG, | |
432 | logging_level_ab=bt2.LoggingLevel.TRACE, | |
433 | ) | |
434 | ||
435 | self.assertEqual( | |
436 | msgs[0].stream.name, "TestSourceA: {}".format(bt2.LoggingLevel.TRACE) | |
437 | ) | |
438 | self.assertEqual( | |
439 | msgs[1].stream.name, "TestSourceB: {}".format(bt2.LoggingLevel.TRACE) | |
440 | ) | |
441 | ||
442 | def _test_one_comp_from_one_spec_one_comp_from_both_2( | |
443 | self, | |
444 | params_ab=None, | |
445 | params_a=None, | |
446 | obj_ab=None, | |
447 | obj_a=None, | |
448 | logging_level_ab=None, | |
449 | logging_level_a=None, | |
450 | ): | |
451 | specs = [ | |
452 | bt2.AutoSourceComponentSpec( | |
453 | self._dir_ab, | |
454 | params=params_ab, | |
455 | obj=obj_ab, | |
456 | logging_level=logging_level_ab, | |
457 | ), | |
458 | bt2.AutoSourceComponentSpec( | |
459 | self._dir_a, params=params_a, obj=obj_a, logging_level=logging_level_a | |
460 | ), | |
461 | ] | |
462 | it = bt2.TraceCollectionMessageIterator(specs) | |
463 | msgs = [x for x in it if type(x) is bt2._StreamBeginningMessage] | |
464 | ||
465 | self.assertEqual(len(msgs), 2) | |
466 | ||
467 | return msgs | |
468 | ||
469 | def test_params_one_comp_from_one_spec_one_comp_from_both_2(self): | |
470 | msgs = self._test_one_comp_from_one_spec_one_comp_from_both_2( | |
471 | params_ab={ | |
472 | 'test-bonjour': 'madame', | |
473 | 'test-salut': 'les amis', | |
474 | 'what': 'test-params', | |
475 | }, | |
476 | params_a={'test-bonjour': 'monsieur', 'what': 'test-params'}, | |
477 | ) | |
478 | ||
479 | self.assertEqual( | |
480 | msgs[0].stream.name, | |
481 | "TestSourceA: ('test-bonjour', 'monsieur'), ('test-salut', 'les amis')", | |
482 | ) | |
483 | self.assertEqual( | |
484 | msgs[1].stream.name, | |
485 | "TestSourceB: ('test-bonjour', 'madame'), ('test-salut', 'les amis')", | |
486 | ) | |
487 | ||
488 | def test_obj_one_comp_from_one_spec_one_comp_from_both_2(self): | |
489 | msgs = self._test_one_comp_from_one_spec_one_comp_from_both_2( | |
490 | params_ab={'what': 'python-obj'}, | |
491 | params_a={'what': 'python-obj'}, | |
492 | obj_ab='deore', | |
493 | obj_a='alivio', | |
494 | ) | |
495 | ||
496 | self.assertEqual(msgs[0].stream.name, "TestSourceA: alivio") | |
497 | self.assertEqual(msgs[1].stream.name, "TestSourceB: deore") | |
498 | ||
499 | def test_log_level_one_comp_from_one_spec_one_comp_from_both_2(self): | |
500 | msgs = self._test_one_comp_from_one_spec_one_comp_from_both_2( | |
501 | params_ab={'what': 'log-level'}, | |
502 | params_a={'what': 'log-level'}, | |
503 | logging_level_ab=bt2.LoggingLevel.DEBUG, | |
504 | logging_level_a=bt2.LoggingLevel.TRACE, | |
505 | ) | |
506 | ||
507 | self.assertEqual( | |
508 | msgs[0].stream.name, "TestSourceA: {}".format(bt2.LoggingLevel.TRACE) | |
509 | ) | |
510 | self.assertEqual( | |
511 | msgs[1].stream.name, "TestSourceB: {}".format(bt2.LoggingLevel.DEBUG) | |
512 | ) | |
513 | ||
514 | def test_obj_override_with_none(self): | |
515 | specs = [ | |
516 | bt2.AutoSourceComponentSpec( | |
517 | self._dir_ab, params={'what': 'python-obj'}, obj='deore' | |
518 | ), | |
519 | bt2.AutoSourceComponentSpec( | |
520 | self._dir_a, params={'what': 'python-obj'}, obj=None | |
521 | ), | |
522 | ] | |
523 | it = bt2.TraceCollectionMessageIterator(specs) | |
524 | msgs = [x for x in it if type(x) is bt2._StreamBeginningMessage] | |
525 | ||
526 | self.assertEqual(len(msgs), 2) | |
527 | self.assertEqual(msgs[0].stream.name, "TestSourceA: None") | |
528 | self.assertEqual(msgs[1].stream.name, "TestSourceB: deore") | |
529 | ||
530 | def test_obj_no_override_with_no_obj(self): | |
531 | specs = [ | |
532 | bt2.AutoSourceComponentSpec( | |
533 | self._dir_ab, params={'what': 'python-obj'}, obj='deore' | |
534 | ), | |
535 | bt2.AutoSourceComponentSpec(self._dir_a, params={'what': 'python-obj'}), | |
536 | ] | |
537 | it = bt2.TraceCollectionMessageIterator(specs) | |
538 | msgs = [x for x in it if type(x) is bt2._StreamBeginningMessage] | |
539 | ||
540 | self.assertEqual(len(msgs), 2) | |
541 | self.assertEqual(msgs[0].stream.name, "TestSourceA: deore") | |
542 | self.assertEqual(msgs[1].stream.name, "TestSourceB: deore") | |
543 | ||
544 | ||
545 | if __name__ == '__main__': | |
546 | unittest.main() |