Fix: debug-info: remove `_` prefix when finding fields by name
[babeltrace.git] / tests / bindings / python / bt2 / test_trace_collection_notification_iterator.py
CommitLineData
704c2307
PP
1import unittest
2import datetime
3import copy
4import uuid
5import bt2
6import os
7import os.path
8
9
# Root directory of the CTF test traces, read from the environment; a
# missing TEST_CTF_TRACES_PATH variable raises KeyError at import time.
_TEST_CTF_TRACES_PATH = os.environ['TEST_CTF_TRACES_PATH']

# Trace used by every iterator test below:
# <root>/intersection/3eventsintersect.
_3EVENTS_INTERSECT_TRACE_PATH = os.path.join(
    _TEST_CTF_TRACES_PATH, 'intersection', '3eventsintersect')
14
15
class SourceComponentSpecTestCase(unittest.TestCase):
    """Construction checks for ``bt2.SourceComponentSpec``."""

    def test_create_good_no_params(self):
        # Plugin and component class names alone must be accepted.
        bt2.SourceComponentSpec('plugin', 'compcls')

    def test_create_good_with_params(self):
        # A dict must be accepted as the params argument.
        params = {'salut': 23}
        bt2.SourceComponentSpec('plugin', 'compcls', params)

    def test_create_good_with_path_params(self):
        # A bare string given as params is exposed under the 'path' key.
        path = 'a path'
        spec = bt2.SourceComponentSpec('plugin', 'compcls', path)
        self.assertEqual(spec.params['path'], path)

    def test_create_wrong_plugin_name_type(self):
        # A non-string plugin name is rejected.
        self.assertRaises(TypeError, bt2.SourceComponentSpec, 23, 'compcls')

    def test_create_wrong_component_class_name_type(self):
        # A non-string component class name is rejected.
        self.assertRaises(TypeError, bt2.SourceComponentSpec, 'plugin', 190)

    def test_create_wrong_params_type(self):
        # Params that are neither a dict nor a path string are rejected.
        self.assertRaises(TypeError, bt2.SourceComponentSpec,
                          'dwdw', 'compcls', datetime.datetime.now())
38
39
class TraceCollectionNotificationIteratorTestCase(unittest.TestCase):
    """Tests for ``bt2.TraceCollectionNotificationIterator``.

    All iteration tests read the '3eventsintersect' CTF trace located at
    ``_3EVENTS_INTERSECT_TRACE_PATH``; the expected notification counts
    asserted below are specific to that trace.
    """

    @staticmethod
    def _specs():
        # Single 'ctf.fs' source spec pointing at the test trace.
        return [bt2.SourceComponentSpec('ctf', 'fs',
                                        _3EVENTS_INTERSECT_TRACE_PATH)]

    # -- constructor argument validation ---------------------------------

    def test_create_wrong_stream_intersection_mode_type(self):
        """A non-bool ``stream_intersection_mode`` raises TypeError."""
        with self.assertRaises(TypeError):
            bt2.TraceCollectionNotificationIterator(
                self._specs(), stream_intersection_mode=23)

    def test_create_wrong_begin_type(self):
        """A string ``begin`` raises TypeError."""
        with self.assertRaises(TypeError):
            bt2.TraceCollectionNotificationIterator(self._specs(), begin='hi')

    def test_create_wrong_end_type(self):
        """A string ``end`` raises TypeError."""
        # The original passed begin='lel' here, which only duplicated
        # test_create_wrong_begin_type and never exercised `end`.
        with self.assertRaises(TypeError):
            bt2.TraceCollectionNotificationIterator(self._specs(), end='lel')

    def test_create_no_such_plugin(self):
        """An unknown plugin/component class raises bt2.Error."""
        specs = [bt2.SourceComponentSpec('77', '101',
                                         _3EVENTS_INTERSECT_TRACE_PATH)]

        with self.assertRaises(bt2.Error):
            bt2.TraceCollectionNotificationIterator(specs)

    # -- accepted begin/end forms ----------------------------------------

    def test_create_begin_s(self):
        """``begin`` may be a float (seconds, going by the test name)."""
        bt2.TraceCollectionNotificationIterator(self._specs(),
                                                begin=19457.918232)

    def test_create_end_s(self):
        """``end`` may be a float (seconds, going by the test name)."""
        bt2.TraceCollectionNotificationIterator(self._specs(), end=123.12312)

    def test_create_begin_datetime(self):
        """``begin`` may be a ``datetime.datetime`` object."""
        bt2.TraceCollectionNotificationIterator(
            self._specs(), begin=datetime.datetime.now())

    def test_create_end_datetime(self):
        """``end`` may be a ``datetime.datetime`` object."""
        bt2.TraceCollectionNotificationIterator(
            self._specs(), end=datetime.datetime.now())

    # -- iteration ---------------------------------------------------------

    def test_iter_no_intersection(self):
        """Without intersection mode, 28 notifications are produced."""
        notif_iter = bt2.TraceCollectionNotificationIterator(self._specs())
        self.assertEqual(len(list(notif_iter)), 28)

    def test_iter_no_intersection_subscribe(self):
        """Subscribing to event notifications only yields 8 of them."""
        notif_iter = bt2.TraceCollectionNotificationIterator(
            self._specs(), notification_types=[bt2.EventNotification])
        self.assertEqual(len(list(notif_iter)), 8)

    def test_iter_intersection(self):
        """Stream intersection mode reduces the output to 15 notifications."""
        notif_iter = bt2.TraceCollectionNotificationIterator(
            self._specs(), stream_intersection_mode=True)
        self.assertEqual(len(list(notif_iter)), 15)

    def test_iter_intersection_subscribe(self):
        """Intersection mode plus event subscription yields 3 events."""
        notif_iter = bt2.TraceCollectionNotificationIterator(
            self._specs(), stream_intersection_mode=True,
            notification_types=[bt2.EventNotification])
        self.assertEqual(len(list(notif_iter)), 3)

    def test_iter_intersection_no_path_param(self):
        """Intersection mode with a spec lacking 'path' raises bt2.Error."""
        # This spec carries only 'read-from-stdin'; there is no 'path' key.
        specs = [bt2.SourceComponentSpec('text', 'dmesg',
                                         {'read-from-stdin': True})]

        with self.assertRaises(bt2.Error):
            bt2.TraceCollectionNotificationIterator(
                specs, stream_intersection_mode=True,
                notification_types=[bt2.EventNotification])

    def test_iter_no_intersection_two_traces(self):
        """The same spec listed twice doubles the output (2 * 28 = 56)."""
        spec = bt2.SourceComponentSpec('ctf', 'fs',
                                       _3EVENTS_INTERSECT_TRACE_PATH)
        notif_iter = bt2.TraceCollectionNotificationIterator([spec, spec])
        self.assertEqual(len(list(notif_iter)), 56)

    def test_iter_no_intersection_begin(self):
        """With this ``begin`` bound, 6 event notifications remain."""
        notif_iter = bt2.TraceCollectionNotificationIterator(
            self._specs(), notification_types=[bt2.EventNotification],
            begin=13515309.000000023)
        self.assertEqual(len(list(notif_iter)), 6)

    def test_iter_no_intersection_end(self):
        """With this ``end`` bound, 5 event notifications remain."""
        notif_iter = bt2.TraceCollectionNotificationIterator(
            self._specs(), notification_types=[bt2.EventNotification],
            end=13515309.000000075)
        self.assertEqual(len(list(notif_iter)), 5)
This page took 0.02683 seconds and 4 git commands to generate.