bindings/python/bt2: Make the bt2 package importable
[babeltrace.git] / tests / bindings / python / bt2 / test_trace_collection_notification_iterator.py
CommitLineData
cc261e29
PP
1import unittest
2import datetime
3import copy
4import uuid
5import bt2
6import os
7import os.path
8
9
# Root directory of the CTF test traces, read from the environment.
# The lookup is a plain subscript, so importing this module raises
# KeyError when TEST_CTF_TRACES_PATH is not set.
_TEST_CTF_TRACES_PATH = os.environ['TEST_CTF_TRACES_PATH']

# Location of the '3eventsintersect' trace inside the 'intersection'
# directory of the test traces root.
_3EVENTS_INTERSECT_TRACE_PATH = os.path.join(
    _TEST_CTF_TRACES_PATH,
    'intersection',
    '3eventsintersect',
)
14
15
class ComponentSpecTestCase(unittest.TestCase):
    """Construction tests for bt2.ComponentSpec.

    The "good" tests only check that construction does not raise; the
    "wrong" tests check that an argument of an unsupported type is
    rejected with TypeError.
    """

    def test_create_good_no_params(self):
        # Plugin name and component class name only.
        bt2.ComponentSpec('plugin', 'compcls')

    def test_create_good_with_params(self):
        # Parameters given as a dictionary.
        bt2.ComponentSpec('plugin', 'compcls', {'salut': 23})

    def test_create_good_with_path_params(self):
        # A string given as the parameters must be readable back under
        # the 'path' key of the spec's parameters.
        path_spec = bt2.ComponentSpec('plugin', 'compcls', 'a path')
        self.assertEqual(path_spec.params['path'], 'a path')

    def test_create_wrong_plugin_name_type(self):
        self.assertRaises(TypeError, bt2.ComponentSpec, 23, 'compcls')

    def test_create_wrong_component_class_name_type(self):
        self.assertRaises(TypeError, bt2.ComponentSpec, 'plugin', 190)

    def test_create_wrong_params_type(self):
        # A datetime object is not an accepted parameters value.
        self.assertRaises(TypeError, bt2.ComponentSpec,
                          'dwdw', 'compcls', datetime.datetime.now())
cc261e29
PP
38
39
class TraceCollectionNotificationIteratorTestCase(unittest.TestCase):
    """Tests for bt2.TraceCollectionNotificationIterator.

    The first group checks constructor argument handling (wrong types,
    unknown plugin, accepted `begin`/`end` forms).  The second group
    iterates the '3eventsintersect' test trace and checks how many
    notifications are produced for each combination of options.
    """

    # -- Constructor argument validation ------------------------------

    def test_create_wrong_stream_intersection_mode_type(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]

        with self.assertRaises(TypeError):
            bt2.TraceCollectionNotificationIterator(specs, stream_intersection_mode=23)

    def test_create_wrong_begin_type(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]

        with self.assertRaises(TypeError):
            bt2.TraceCollectionNotificationIterator(specs, begin='hi')

    def test_create_wrong_end_type(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]

        # The invalid value must go through `end`: `begin` is already
        # covered by test_create_wrong_begin_type, and passing it here
        # would leave the `end` type check untested.
        with self.assertRaises(TypeError):
            bt2.TraceCollectionNotificationIterator(specs, end='lel')

    def test_create_no_such_plugin(self):
        # '77' / '101' are not the names of an existing plugin and
        # component class, so creation is expected to fail.
        specs = [bt2.ComponentSpec('77', '101', _3EVENTS_INTERSECT_TRACE_PATH)]

        with self.assertRaises(bt2.Error):
            bt2.TraceCollectionNotificationIterator(specs)

    # The four tests below only check that construction succeeds with a
    # real number or a datetime object as the `begin`/`end` bound.
    # NOTE(review): the `_s` suffix suggests the numbers are seconds --
    # confirm against the bt2 implementation.

    def test_create_begin_s(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        bt2.TraceCollectionNotificationIterator(specs, begin=19457.918232)

    def test_create_end_s(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        bt2.TraceCollectionNotificationIterator(specs, end=123.12312)

    def test_create_begin_datetime(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        bt2.TraceCollectionNotificationIterator(specs, begin=datetime.datetime.now())

    def test_create_end_datetime(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        bt2.TraceCollectionNotificationIterator(specs, end=datetime.datetime.now())

    # -- Iteration results --------------------------------------------

    def test_iter_no_intersection(self):
        # No filtering at all: every notification of the trace.
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        notif_iter = bt2.TraceCollectionNotificationIterator(specs)
        self.assertEqual(len(list(notif_iter)), 28)

    def test_iter_no_intersection_subscribe(self):
        # Restricting the notification types to event notifications.
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        notif_iter = bt2.TraceCollectionNotificationIterator(
            specs, notification_types=[bt2.EventNotification])
        self.assertEqual(len(list(notif_iter)), 8)

    def test_iter_specs_not_list(self):
        # A single spec (not wrapped in a list) must give the same
        # result as the one-element list used in the previous test.
        spec = bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)
        notif_iter = bt2.TraceCollectionNotificationIterator(
            spec, notification_types=[bt2.EventNotification])
        self.assertEqual(len(list(notif_iter)), 8)

    def test_iter_custom_filter(self):
        # The second positional argument is a filter component spec;
        # here a trimmer with an `end` bound.
        # NOTE(review): the bound looks like a nanosecond timestamp
        # matching `end=13515309.000000075` used in
        # test_iter_no_intersection_end -- confirm the unit.
        src_spec = bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)
        flt_spec = bt2.ComponentSpec('utils', 'trimmer', {
            'end': 13515309000000075,
        })
        notif_iter = bt2.TraceCollectionNotificationIterator(
            src_spec, flt_spec, notification_types=[bt2.EventNotification])
        self.assertEqual(len(list(notif_iter)), 5)

    def test_iter_intersection(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        notif_iter = bt2.TraceCollectionNotificationIterator(
            specs, stream_intersection_mode=True)
        self.assertEqual(len(list(notif_iter)), 15)

    def test_iter_intersection_subscribe(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        notif_iter = bt2.TraceCollectionNotificationIterator(
            specs, stream_intersection_mode=True,
            notification_types=[bt2.EventNotification])
        self.assertEqual(len(list(notif_iter)), 3)

    def test_iter_intersection_no_path_param(self):
        # A source spec without a 'path' parameter is expected to be
        # rejected when stream intersection mode is requested.
        specs = [bt2.ComponentSpec('text', 'dmesg', {'read-from-stdin': True})]

        with self.assertRaises(bt2.Error):
            bt2.TraceCollectionNotificationIterator(
                specs, stream_intersection_mode=True,
                notification_types=[bt2.EventNotification])

    def test_iter_no_intersection_two_traces(self):
        # The same spec given twice: twice the 28 notifications of
        # test_iter_no_intersection.
        spec = bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)
        specs = [spec, spec]
        notif_iter = bt2.TraceCollectionNotificationIterator(specs)
        self.assertEqual(len(list(notif_iter)), 56)

    def test_iter_no_intersection_begin(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        notif_iter = bt2.TraceCollectionNotificationIterator(
            specs, notification_types=[bt2.EventNotification],
            begin=13515309.000000023)
        self.assertEqual(len(list(notif_iter)), 6)

    def test_iter_no_intersection_end(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        notif_iter = bt2.TraceCollectionNotificationIterator(
            specs, notification_types=[bt2.EventNotification],
            end=13515309.000000075)
        self.assertEqual(len(list(notif_iter)), 5)
This page took 0.032217 seconds and 4 git commands to generate.