Commit | Line | Data |
---|---|---|
704c2307 PP |
1 | import unittest |
2 | import datetime | |
3 | import copy | |
4 | import uuid | |
5 | import bt2 | |
6 | import os | |
7 | import os.path | |
8 | ||
9 | ||
10 | _TEST_CTF_TRACES_PATH = os.environ['TEST_CTF_TRACES_PATH'] | |
11 | _3EVENTS_INTERSECT_TRACE_PATH = os.path.join(_TEST_CTF_TRACES_PATH, | |
12 | 'intersection', | |
13 | '3eventsintersect') | |
14 | ||
15 | ||
3d60267b | 16 | class ComponentSpecTestCase(unittest.TestCase): |
704c2307 | 17 | def test_create_good_no_params(self): |
3d60267b | 18 | spec = bt2.ComponentSpec('plugin', 'compcls') |
704c2307 PP |
19 | |
20 | def test_create_good_with_params(self): | |
3d60267b | 21 | spec = bt2.ComponentSpec('plugin', 'compcls', {'salut': 23}) |
704c2307 PP |
22 | |
23 | def test_create_good_with_path_params(self): | |
3d60267b | 24 | spec = bt2.ComponentSpec('plugin', 'compcls', 'a path') |
704c2307 PP |
25 | self.assertEqual(spec.params['path'], 'a path') |
26 | ||
27 | def test_create_wrong_plugin_name_type(self): | |
28 | with self.assertRaises(TypeError): | |
3d60267b | 29 | spec = bt2.ComponentSpec(23, 'compcls') |
704c2307 PP |
30 | |
31 | def test_create_wrong_component_class_name_type(self): | |
32 | with self.assertRaises(TypeError): | |
3d60267b | 33 | spec = bt2.ComponentSpec('plugin', 190) |
704c2307 PP |
34 | |
35 | def test_create_wrong_params_type(self): | |
36 | with self.assertRaises(TypeError): | |
3d60267b | 37 | spec = bt2.ComponentSpec('dwdw', 'compcls', datetime.datetime.now()) |
704c2307 PP |
38 | |
39 | ||
40 | class TraceCollectionNotificationIteratorTestCase(unittest.TestCase): | |
41 | def test_create_wrong_stream_intersection_mode_type(self): | |
3d60267b | 42 | specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] |
704c2307 PP |
43 | |
44 | with self.assertRaises(TypeError): | |
45 | notif_iter = bt2.TraceCollectionNotificationIterator(specs, stream_intersection_mode=23) | |
46 | ||
47 | def test_create_wrong_begin_type(self): | |
3d60267b | 48 | specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] |
704c2307 PP |
49 | |
50 | with self.assertRaises(TypeError): | |
51 | notif_iter = bt2.TraceCollectionNotificationIterator(specs, begin='hi') | |
52 | ||
53 | def test_create_wrong_end_type(self): | |
3d60267b | 54 | specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] |
704c2307 PP |
55 | |
56 | with self.assertRaises(TypeError): | |
57 | notif_iter = bt2.TraceCollectionNotificationIterator(specs, end='lel') | |
58 | ||
59 | def test_create_no_such_plugin(self): | |
3d60267b | 60 | specs = [bt2.ComponentSpec('77', '101', _3EVENTS_INTERSECT_TRACE_PATH)] |
704c2307 PP |
61 | |
62 | with self.assertRaises(bt2.Error): | |
63 | notif_iter = bt2.TraceCollectionNotificationIterator(specs) | |
64 | ||
65 | def test_create_begin_s(self): | |
3d60267b | 66 | specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] |
704c2307 PP |
67 | notif_iter = bt2.TraceCollectionNotificationIterator(specs, begin=19457.918232) |
68 | ||
69 | def test_create_end_s(self): | |
3d60267b | 70 | specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] |
704c2307 PP |
71 | notif_iter = bt2.TraceCollectionNotificationIterator(specs, end=123.12312) |
72 | ||
73 | def test_create_begin_datetime(self): | |
3d60267b | 74 | specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] |
704c2307 PP |
75 | notif_iter = bt2.TraceCollectionNotificationIterator(specs, begin=datetime.datetime.now()) |
76 | ||
77 | def test_create_end_datetime(self): | |
3d60267b | 78 | specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] |
704c2307 PP |
79 | notif_iter = bt2.TraceCollectionNotificationIterator(specs, end=datetime.datetime.now()) |
80 | ||
81 | def test_iter_no_intersection(self): | |
3d60267b | 82 | specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] |
704c2307 PP |
83 | notif_iter = bt2.TraceCollectionNotificationIterator(specs) |
84 | self.assertEqual(len(list(notif_iter)), 28) | |
85 | ||
86 | def test_iter_no_intersection_subscribe(self): | |
3d60267b | 87 | specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] |
704c2307 PP |
88 | notif_iter = bt2.TraceCollectionNotificationIterator(specs, |
89 | notification_types=[bt2.EventNotification]) | |
90 | self.assertEqual(len(list(notif_iter)), 8) | |
91 | ||
3d60267b PP |
92 | def test_iter_specs_not_list(self): |
93 | spec = bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH) | |
94 | notif_iter = bt2.TraceCollectionNotificationIterator(spec, | |
95 | notification_types=[bt2.EventNotification]) | |
96 | self.assertEqual(len(list(notif_iter)), 8) | |
97 | ||
98 | def test_iter_custom_filter(self): | |
99 | src_spec = bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH) | |
100 | flt_spec = bt2.ComponentSpec('utils', 'trimmer', { | |
101 | 'end': 13515309000000075, | |
102 | }) | |
103 | notif_iter = bt2.TraceCollectionNotificationIterator(src_spec, flt_spec, | |
104 | notification_types=[bt2.EventNotification]) | |
105 | self.assertEqual(len(list(notif_iter)), 5) | |
106 | ||
704c2307 | 107 | def test_iter_intersection(self): |
3d60267b | 108 | specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] |
704c2307 PP |
109 | notif_iter = bt2.TraceCollectionNotificationIterator(specs, stream_intersection_mode=True) |
110 | self.assertEqual(len(list(notif_iter)), 15) | |
111 | ||
112 | def test_iter_intersection_subscribe(self): | |
3d60267b | 113 | specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] |
704c2307 PP |
114 | notif_iter = bt2.TraceCollectionNotificationIterator(specs, stream_intersection_mode=True, |
115 | notification_types=[bt2.EventNotification]) | |
116 | self.assertEqual(len(list(notif_iter)), 3) | |
117 | ||
118 | def test_iter_intersection_no_path_param(self): | |
3d60267b | 119 | specs = [bt2.ComponentSpec('text', 'dmesg', {'read-from-stdin': True})] |
704c2307 PP |
120 | |
121 | with self.assertRaises(bt2.Error): | |
122 | notif_iter = bt2.TraceCollectionNotificationIterator(specs, stream_intersection_mode=True, | |
123 | notification_types=[bt2.EventNotification]) | |
124 | ||
125 | def test_iter_no_intersection_two_traces(self): | |
3d60267b | 126 | spec = bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH) |
704c2307 PP |
127 | specs = [spec, spec] |
128 | notif_iter = bt2.TraceCollectionNotificationIterator(specs) | |
129 | self.assertEqual(len(list(notif_iter)), 56) | |
130 | ||
131 | def test_iter_no_intersection_begin(self): | |
3d60267b | 132 | specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] |
704c2307 PP |
133 | notif_iter = bt2.TraceCollectionNotificationIterator(specs, |
134 | notification_types=[bt2.EventNotification], | |
135 | begin=13515309.000000023) | |
136 | self.assertEqual(len(list(notif_iter)), 6) | |
137 | ||
138 | def test_iter_no_intersection_end(self): | |
3d60267b | 139 | specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)] |
704c2307 PP |
140 | notif_iter = bt2.TraceCollectionNotificationIterator(specs, |
141 | notification_types=[bt2.EventNotification], | |
142 | end=13515309.000000075) | |
143 | self.assertEqual(len(list(notif_iter)), 5) |