bindings/python/bt2: Make the bt2 package importable
[babeltrace.git] / tests / bindings / python / bt2 / test_trace_collection_notification_iterator.py
CommitLineData
704c2307
PP
1import unittest
2import datetime
3import copy
4import uuid
5import bt2
6import os
7import os.path
8
9
# Root directory of the CTF test traces.  It is read from the environment
# at import time, so a missing TEST_CTF_TRACES_PATH variable raises
# KeyError before any test runs.
_TEST_CTF_TRACES_PATH = os.environ['TEST_CTF_TRACES_PATH']

# Trace fixture located at <root>/intersection/3eventsintersect.
_3EVENTS_INTERSECT_TRACE_PATH = os.path.join(
    _TEST_CTF_TRACES_PATH, 'intersection', '3eventsintersect')
14
15
class ComponentSpecTestCase(unittest.TestCase):
    """Construction checks for ``bt2.ComponentSpec``."""

    def test_create_good_no_params(self):
        # Plugin name and component class name only; must not raise.
        bt2.ComponentSpec('plugin', 'compcls')

    def test_create_good_with_params(self):
        # Same as above, with a parameter mapping; must not raise.
        params = {'salut': 23}
        bt2.ComponentSpec('plugin', 'compcls', params)

    def test_create_good_with_path_params(self):
        # A plain string passed as the parameters is expected to be
        # readable back under the 'path' key.
        path = 'a path'
        spec = bt2.ComponentSpec('plugin', 'compcls', path)
        self.assertEqual(spec.params['path'], 'a path')

    def test_create_wrong_plugin_name_type(self):
        # Integer instead of a plugin name.
        self.assertRaises(TypeError, bt2.ComponentSpec, 23, 'compcls')

    def test_create_wrong_component_class_name_type(self):
        # Integer instead of a component class name.
        self.assertRaises(TypeError, bt2.ComponentSpec, 'plugin', 190)

    def test_create_wrong_params_type(self):
        # A datetime object is not an accepted parameters value.
        bad_params = datetime.datetime.now()
        self.assertRaises(TypeError, bt2.ComponentSpec,
                          'dwdw', 'compcls', bad_params)
704c2307
PP
38
39
class TraceCollectionNotificationIteratorTestCase(unittest.TestCase):
    """Tests for ``bt2.TraceCollectionNotificationIterator``.

    Most tests read the CTF trace located at
    ``_3EVENTS_INTERSECT_TRACE_PATH`` through the ``ctf``/``fs`` source
    component class.  The expected notification counts asserted below
    are tied to that trace's content.
    """

    def test_create_wrong_stream_intersection_mode_type(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]

        # An integer intersection mode must be rejected.
        with self.assertRaises(TypeError):
            bt2.TraceCollectionNotificationIterator(
                specs, stream_intersection_mode=23)

    def test_create_wrong_begin_type(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]

        # A string is not an accepted `begin` value.
        with self.assertRaises(TypeError):
            bt2.TraceCollectionNotificationIterator(specs, begin='hi')

    def test_create_wrong_end_type(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]

        # This test used to pass `begin=` (copy/paste of the test above),
        # so the `end` argument was never exercised.
        with self.assertRaises(TypeError):
            bt2.TraceCollectionNotificationIterator(specs, end='lel')

    def test_create_no_such_plugin(self):
        # '77' / '101' are not the names of an existing plugin and
        # component class.
        specs = [bt2.ComponentSpec('77', '101', _3EVENTS_INTERSECT_TRACE_PATH)]

        with self.assertRaises(bt2.Error):
            bt2.TraceCollectionNotificationIterator(specs)

    # The four tests below only check that construction does not raise
    # for the given `begin`/`end` value types (float and datetime).

    def test_create_begin_s(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        bt2.TraceCollectionNotificationIterator(specs, begin=19457.918232)

    def test_create_end_s(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        bt2.TraceCollectionNotificationIterator(specs, end=123.12312)

    def test_create_begin_datetime(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        bt2.TraceCollectionNotificationIterator(
            specs, begin=datetime.datetime.now())

    def test_create_end_datetime(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        bt2.TraceCollectionNotificationIterator(
            specs, end=datetime.datetime.now())

    def test_iter_no_intersection(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        notif_iter = bt2.TraceCollectionNotificationIterator(specs)
        self.assertEqual(len(list(notif_iter)), 28)

    def test_iter_no_intersection_subscribe(self):
        # Restrict the iteration to event notifications.
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        notif_iter = bt2.TraceCollectionNotificationIterator(
            specs, notification_types=[bt2.EventNotification])
        self.assertEqual(len(list(notif_iter)), 8)

    def test_iter_specs_not_list(self):
        # A single spec is passed directly instead of a list of specs.
        spec = bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)
        notif_iter = bt2.TraceCollectionNotificationIterator(
            spec, notification_types=[bt2.EventNotification])
        self.assertEqual(len(list(notif_iter)), 8)

    def test_iter_custom_filter(self):
        # The second positional argument is a filter component spec,
        # here a `utils`/`trimmer` with an `end` parameter.
        src_spec = bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)
        flt_spec = bt2.ComponentSpec('utils', 'trimmer', {
            'end': 13515309000000075,
        })
        notif_iter = bt2.TraceCollectionNotificationIterator(
            src_spec, flt_spec, notification_types=[bt2.EventNotification])
        self.assertEqual(len(list(notif_iter)), 5)

    def test_iter_intersection(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        notif_iter = bt2.TraceCollectionNotificationIterator(
            specs, stream_intersection_mode=True)
        self.assertEqual(len(list(notif_iter)), 15)

    def test_iter_intersection_subscribe(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        notif_iter = bt2.TraceCollectionNotificationIterator(
            specs, stream_intersection_mode=True,
            notification_types=[bt2.EventNotification])
        self.assertEqual(len(list(notif_iter)), 3)

    def test_iter_intersection_no_path_param(self):
        # The source spec has no 'path' parameter; creating the iterator
        # in stream intersection mode is expected to fail with bt2.Error.
        specs = [bt2.ComponentSpec('text', 'dmesg', {'read-from-stdin': True})]

        with self.assertRaises(bt2.Error):
            bt2.TraceCollectionNotificationIterator(
                specs, stream_intersection_mode=True,
                notification_types=[bt2.EventNotification])

    def test_iter_no_intersection_two_traces(self):
        # The same spec twice: twice the single-trace count (28).
        spec = bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)
        specs = [spec, spec]
        notif_iter = bt2.TraceCollectionNotificationIterator(specs)
        self.assertEqual(len(list(notif_iter)), 56)

    def test_iter_no_intersection_begin(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        notif_iter = bt2.TraceCollectionNotificationIterator(
            specs, notification_types=[bt2.EventNotification],
            begin=13515309.000000023)
        self.assertEqual(len(list(notif_iter)), 6)

    def test_iter_no_intersection_end(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        notif_iter = bt2.TraceCollectionNotificationIterator(
            specs, notification_types=[bt2.EventNotification],
            end=13515309.000000075)
        self.assertEqual(len(list(notif_iter)), 5)
This page took 0.032413 seconds and 4 git commands to generate.