Commit | Line | Data |
---|---|---|
704c2307 PP |
1 | import unittest |
2 | import datetime | |
3 | import copy | |
4 | import uuid | |
5 | import bt2 | |
6 | import os | |
7 | import os.path | |
8 | ||
9 | ||
# Root directory of the CTF test traces. Read with a hard lookup, so importing
# this module raises KeyError when TEST_CTF_TRACES_PATH is not set.
_TEST_CTF_TRACES_PATH = os.environ['TEST_CTF_TRACES_PATH']

# Trace used by the iterator tests below.
# NOTE(review): judging by its name this trace holds streams that only
# partially overlap in time -- confirm against the test data.
_3EVENTS_INTERSECT_TRACE_PATH = os.path.join(
    _TEST_CTF_TRACES_PATH,
    'intersection',
    '3eventsintersect',
)
14 | ||
15 | ||
@unittest.skip("this is broken")
class ComponentSpecTestCase(unittest.TestCase):
    """Construction tests for ``bt2.ComponentSpec``.

    The whole case is currently disabled through the ``unittest.skip``
    decorator above.
    """

    def test_create_good_no_params(self):
        # Smoke test: building a spec without parameters must not raise.
        bt2.ComponentSpec('plugin', 'compcls')

    def test_create_good_with_params(self):
        # Smoke test: a dict is accepted as the parameters argument.
        bt2.ComponentSpec('plugin', 'compcls', {'salut': 23})

    def test_create_good_with_path_params(self):
        # A plain string passed as parameters must be readable back under
        # the 'path' key of the spec's params.
        spec = bt2.ComponentSpec('plugin', 'compcls', 'a path')
        self.assertEqual(spec.params['path'], 'a path')

    def test_create_wrong_plugin_name_type(self):
        # An integer plugin name must be rejected.
        with self.assertRaises(TypeError):
            bt2.ComponentSpec(23, 'compcls')

    def test_create_wrong_component_class_name_type(self):
        # An integer component class name must be rejected.
        with self.assertRaises(TypeError):
            bt2.ComponentSpec('plugin', 190)

    def test_create_wrong_params_type(self):
        # A datetime object is not an acceptable parameters value.
        with self.assertRaises(TypeError):
            bt2.ComponentSpec('dwdw', 'compcls', datetime.datetime.now())
704c2307 PP |
39 | |
40 | ||
@unittest.skip("this is broken")
class TraceCollectionNotificationIteratorTestCase(unittest.TestCase):
    """Tests for ``bt2.TraceCollectionNotificationIterator``.

    Unless a test builds its own spec, the iterator is fed a ``ctf``/``fs``
    component spec pointing at ``_3EVENTS_INTERSECT_TRACE_PATH``.  The whole
    case is currently disabled through the ``unittest.skip`` decorator above.
    """

    @staticmethod
    def _ctf_fs_spec():
        # Single place that builds the source spec shared by most tests.
        return bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)

    # --- argument validation -------------------------------------------

    def test_create_wrong_stream_intersection_mode_type(self):
        # An integer is not accepted for stream_intersection_mode.
        specs = [self._ctf_fs_spec()]

        with self.assertRaises(TypeError):
            bt2.TraceCollectionNotificationIterator(
                specs, stream_intersection_mode=23)

    def test_create_wrong_begin_type(self):
        # A string is not accepted for begin.
        specs = [self._ctf_fs_spec()]

        with self.assertRaises(TypeError):
            bt2.TraceCollectionNotificationIterator(specs, begin='hi')

    def test_create_wrong_end_type(self):
        # A string is not accepted for end.  This used to pass ``begin``,
        # which only repeated test_create_wrong_begin_type and left the
        # ``end`` argument untested.
        specs = [self._ctf_fs_spec()]

        with self.assertRaises(TypeError):
            bt2.TraceCollectionNotificationIterator(specs, end='lel')

    def test_create_no_such_plugin(self):
        # Plugin '77' / component class '101' is expected not to resolve.
        specs = [bt2.ComponentSpec('77', '101', _3EVENTS_INTERSECT_TRACE_PATH)]

        with self.assertRaises(bt2.Error):
            bt2.TraceCollectionNotificationIterator(specs)

    # --- construction smoke tests (must simply not raise) ---------------

    def test_create_begin_s(self):
        specs = [self._ctf_fs_spec()]
        bt2.TraceCollectionNotificationIterator(specs, begin=19457.918232)

    def test_create_end_s(self):
        specs = [self._ctf_fs_spec()]
        bt2.TraceCollectionNotificationIterator(specs, end=123.12312)

    def test_create_begin_datetime(self):
        specs = [self._ctf_fs_spec()]
        bt2.TraceCollectionNotificationIterator(
            specs, begin=datetime.datetime.now())

    def test_create_end_datetime(self):
        specs = [self._ctf_fs_spec()]
        bt2.TraceCollectionNotificationIterator(
            specs, end=datetime.datetime.now())

    # --- iteration -------------------------------------------------------

    def test_iter_no_intersection(self):
        # Without any filtering the iterator yields 28 notifications.
        specs = [self._ctf_fs_spec()]
        notif_iter = bt2.TraceCollectionNotificationIterator(specs)
        self.assertEqual(len(list(notif_iter)), 28)

    def test_iter_no_intersection_subscribe(self):
        # Restricting to event notifications leaves 8 of them.
        specs = [self._ctf_fs_spec()]
        notif_iter = bt2.TraceCollectionNotificationIterator(
            specs, notification_types=[bt2.EventNotification])
        self.assertEqual(len(list(notif_iter)), 8)

    def test_iter_specs_not_list(self):
        # A bare spec (not wrapped in a list) is accepted as well.
        spec = self._ctf_fs_spec()
        notif_iter = bt2.TraceCollectionNotificationIterator(
            spec, notification_types=[bt2.EventNotification])
        self.assertEqual(len(list(notif_iter)), 8)

    def test_iter_custom_filter(self):
        # Second positional argument: a 'utils'/'trimmer' spec with an 'end'
        # parameter; 5 event notifications are expected to remain.
        src_spec = self._ctf_fs_spec()
        flt_spec = bt2.ComponentSpec('utils', 'trimmer', {
            'end': 13515309000000075,
        })
        notif_iter = bt2.TraceCollectionNotificationIterator(
            src_spec, flt_spec, notification_types=[bt2.EventNotification])
        self.assertEqual(len(list(notif_iter)), 5)

    def test_iter_intersection(self):
        # Stream intersection mode yields 15 notifications.
        specs = [self._ctf_fs_spec()]
        notif_iter = bt2.TraceCollectionNotificationIterator(
            specs, stream_intersection_mode=True)
        self.assertEqual(len(list(notif_iter)), 15)

    def test_iter_intersection_subscribe(self):
        # Stream intersection mode, event notifications only: 3 expected.
        specs = [self._ctf_fs_spec()]
        notif_iter = bt2.TraceCollectionNotificationIterator(
            specs, stream_intersection_mode=True,
            notification_types=[bt2.EventNotification])
        self.assertEqual(len(list(notif_iter)), 3)

    def test_iter_intersection_no_path_param(self):
        # Stream intersection mode with a spec that carries no 'path'
        # parameter must fail with bt2.Error.
        specs = [bt2.ComponentSpec('text', 'dmesg', {'read-from-stdin': True})]

        with self.assertRaises(bt2.Error):
            bt2.TraceCollectionNotificationIterator(
                specs, stream_intersection_mode=True,
                notification_types=[bt2.EventNotification])

    def test_iter_no_intersection_two_traces(self):
        # The very same spec object listed twice doubles the total (2 * 28).
        spec = self._ctf_fs_spec()
        specs = [spec, spec]
        notif_iter = bt2.TraceCollectionNotificationIterator(specs)
        self.assertEqual(len(list(notif_iter)), 56)

    def test_iter_no_intersection_begin(self):
        # With a begin bound, 6 event notifications are expected.
        specs = [self._ctf_fs_spec()]
        notif_iter = bt2.TraceCollectionNotificationIterator(
            specs,
            notification_types=[bt2.EventNotification],
            begin=13515309.000000023)
        self.assertEqual(len(list(notif_iter)), 6)

    def test_iter_no_intersection_end(self):
        # With an end bound, 5 event notifications are expected.
        specs = [self._ctf_fs_spec()]
        notif_iter = bt2.TraceCollectionNotificationIterator(
            specs,
            notification_types=[bt2.EventNotification],
            end=13515309.000000075)
        self.assertEqual(len(list(notif_iter)), 5)