6dd774aeb5e5b812b8889291574ccb47cf37a074
[babeltrace.git] / tests / bindings / python / bt2 / test_trace_collection_notification_iterator.py
1 import unittest
2 import datetime
3 import copy
4 import uuid
5 import bt2
6 import os
7 import os.path
8
9
# Root directory of the CTF test traces. It is read from the environment at
# import time, so importing this module raises KeyError when
# TEST_CTF_TRACES_PATH is not set.
_TEST_CTF_TRACES_PATH = os.environ['TEST_CTF_TRACES_PATH']

# Trace opened by the trace collection notification iterator tests.
_3EVENTS_INTERSECT_TRACE_PATH = os.path.join(
    _TEST_CTF_TRACES_PATH, 'intersection', '3eventsintersect')
14
15
class ComponentSpecTestCase(unittest.TestCase):
    """Construction tests for ``bt2.ComponentSpec``.

    The first two positional arguments are a plugin name and a component
    class name; the optional third argument carries the parameters.
    """

    def test_create_good_no_params(self):
        # Creation without parameters must simply not raise.
        comp_spec = bt2.ComponentSpec('plugin', 'compcls')

    def test_create_good_with_params(self):
        # A dict is accepted as the parameters argument.
        comp_spec = bt2.ComponentSpec('plugin', 'compcls', {'salut': 23})

    def test_create_good_with_path_params(self):
        # A plain string given as parameters is exposed as params['path'].
        comp_spec = bt2.ComponentSpec('plugin', 'compcls', 'a path')
        params = comp_spec.params
        self.assertEqual(params['path'], 'a path')

    def test_create_wrong_plugin_name_type(self):
        # A non-string plugin name is rejected.
        self.assertRaises(TypeError, bt2.ComponentSpec, 23, 'compcls')

    def test_create_wrong_component_class_name_type(self):
        # A non-string component class name is rejected.
        self.assertRaises(TypeError, bt2.ComponentSpec, 'plugin', 190)

    def test_create_wrong_params_type(self):
        # A datetime object is not a valid parameters argument.
        self.assertRaises(TypeError, bt2.ComponentSpec,
                          'dwdw', 'compcls', datetime.datetime.now())
38
39
class TraceCollectionNotificationIteratorTestCase(unittest.TestCase):
    """Tests for ``bt2.TraceCollectionNotificationIterator``.

    Most tests open the CTF trace at ``_3EVENTS_INTERSECT_TRACE_PATH``
    through a ``ctf``/``fs`` component spec. The notification counts
    asserted below are specific to that trace.
    """

    # --- Argument validation -------------------------------------------

    def test_create_wrong_stream_intersection_mode_type(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]

        with self.assertRaises(TypeError):
            bt2.TraceCollectionNotificationIterator(specs, stream_intersection_mode=23)

    def test_create_wrong_begin_type(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]

        with self.assertRaises(TypeError):
            bt2.TraceCollectionNotificationIterator(specs, begin='hi')

    def test_create_wrong_end_type(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]

        # Pass the bad value through `end`: this test previously passed
        # `begin`, which only duplicated test_create_wrong_begin_type and
        # left the `end` type check uncovered.
        with self.assertRaises(TypeError):
            bt2.TraceCollectionNotificationIterator(specs, end='lel')

    def test_create_no_such_plugin(self):
        specs = [bt2.ComponentSpec('77', '101', _3EVENTS_INTERSECT_TRACE_PATH)]

        with self.assertRaises(bt2.Error):
            bt2.TraceCollectionNotificationIterator(specs)

    # --- Creation with valid begin/end values (must not raise) ---------

    def test_create_begin_s(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        bt2.TraceCollectionNotificationIterator(specs, begin=19457.918232)

    def test_create_end_s(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        bt2.TraceCollectionNotificationIterator(specs, end=123.12312)

    def test_create_begin_datetime(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        bt2.TraceCollectionNotificationIterator(specs, begin=datetime.datetime.now())

    def test_create_end_datetime(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        bt2.TraceCollectionNotificationIterator(specs, end=datetime.datetime.now())

    # --- Iteration -----------------------------------------------------

    def test_iter_no_intersection(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        notif_iter = bt2.TraceCollectionNotificationIterator(specs)
        self.assertEqual(len(list(notif_iter)), 28)

    def test_iter_no_intersection_subscribe(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        notif_iter = bt2.TraceCollectionNotificationIterator(
            specs, notification_types=[bt2.EventNotification])
        self.assertEqual(len(list(notif_iter)), 8)

    def test_iter_specs_not_list(self):
        # A single ComponentSpec, not wrapped in a list, is passed directly.
        spec = bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)
        notif_iter = bt2.TraceCollectionNotificationIterator(
            spec, notification_types=[bt2.EventNotification])
        self.assertEqual(len(list(notif_iter)), 8)

    def test_iter_custom_filter(self):
        # The utils/trimmer spec is passed as the second positional argument.
        src_spec = bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)
        flt_spec = bt2.ComponentSpec('utils', 'trimmer', {
            'end': 13515309000000075,
        })
        notif_iter = bt2.TraceCollectionNotificationIterator(
            src_spec, flt_spec, notification_types=[bt2.EventNotification])
        self.assertEqual(len(list(notif_iter)), 5)

    def test_iter_intersection(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        notif_iter = bt2.TraceCollectionNotificationIterator(
            specs, stream_intersection_mode=True)
        self.assertEqual(len(list(notif_iter)), 15)

    def test_iter_intersection_subscribe(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        notif_iter = bt2.TraceCollectionNotificationIterator(
            specs, stream_intersection_mode=True,
            notification_types=[bt2.EventNotification])
        self.assertEqual(len(list(notif_iter)), 3)

    def test_iter_intersection_no_path_param(self):
        # The spec's parameters contain no 'path' entry; creating the
        # iterator in stream intersection mode is expected to fail.
        specs = [bt2.ComponentSpec('text', 'dmesg', {'read-from-stdin': True})]

        with self.assertRaises(bt2.Error):
            bt2.TraceCollectionNotificationIterator(
                specs, stream_intersection_mode=True,
                notification_types=[bt2.EventNotification])

    def test_iter_no_intersection_two_traces(self):
        # The same spec listed twice yields twice the single-trace count.
        spec = bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)
        specs = [spec, spec]
        notif_iter = bt2.TraceCollectionNotificationIterator(specs)
        self.assertEqual(len(list(notif_iter)), 56)

    def test_iter_no_intersection_begin(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        notif_iter = bt2.TraceCollectionNotificationIterator(
            specs, notification_types=[bt2.EventNotification],
            begin=13515309.000000023)
        self.assertEqual(len(list(notif_iter)), 6)

    def test_iter_no_intersection_end(self):
        specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
        notif_iter = bt2.TraceCollectionNotificationIterator(
            specs, notification_types=[bt2.EventNotification],
            end=13515309.000000075)
        self.assertEqual(len(list(notif_iter)), 5)
This page took 0.032577 seconds and 3 git commands to generate.