bt2: let components attach "user data" to ports
[babeltrace.git] / tests / bindings / python / bt2 / test_trace_collection_message_iterator.py
CommitLineData
704c2307
PP
1import unittest
2import datetime
3import copy
4import uuid
5import bt2
6import os
7import os.path
8
9
10_TEST_CTF_TRACES_PATH = os.environ['TEST_CTF_TRACES_PATH']
11_3EVENTS_INTERSECT_TRACE_PATH = os.path.join(_TEST_CTF_TRACES_PATH,
12 'intersection',
13 '3eventsintersect')
14
15
976c241d 16@unittest.skip("this is broken")
3d60267b 17class ComponentSpecTestCase(unittest.TestCase):
704c2307 18 def test_create_good_no_params(self):
3d60267b 19 spec = bt2.ComponentSpec('plugin', 'compcls')
704c2307
PP
20
21 def test_create_good_with_params(self):
3d60267b 22 spec = bt2.ComponentSpec('plugin', 'compcls', {'salut': 23})
704c2307
PP
23
24 def test_create_good_with_path_params(self):
3d60267b 25 spec = bt2.ComponentSpec('plugin', 'compcls', 'a path')
704c2307
PP
26 self.assertEqual(spec.params['path'], 'a path')
27
28 def test_create_wrong_plugin_name_type(self):
29 with self.assertRaises(TypeError):
3d60267b 30 spec = bt2.ComponentSpec(23, 'compcls')
704c2307
PP
31
32 def test_create_wrong_component_class_name_type(self):
33 with self.assertRaises(TypeError):
3d60267b 34 spec = bt2.ComponentSpec('plugin', 190)
704c2307
PP
35
36 def test_create_wrong_params_type(self):
37 with self.assertRaises(TypeError):
3d60267b 38 spec = bt2.ComponentSpec('dwdw', 'compcls', datetime.datetime.now())
704c2307
PP
39
40
976c241d 41@unittest.skip("this is broken")
5602ef81 42class TraceCollectionMessageIteratorTestCase(unittest.TestCase):
704c2307 43 def test_create_wrong_stream_intersection_mode_type(self):
3d60267b 44 specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
704c2307
PP
45
46 with self.assertRaises(TypeError):
5602ef81 47 msg_iter = bt2.TraceCollectionMessageIterator(specs, stream_intersection_mode=23)
704c2307
PP
48
49 def test_create_wrong_begin_type(self):
3d60267b 50 specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
704c2307
PP
51
52 with self.assertRaises(TypeError):
5602ef81 53 msg_iter = bt2.TraceCollectionMessageIterator(specs, begin='hi')
704c2307
PP
54
55 def test_create_wrong_end_type(self):
3d60267b 56 specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
704c2307
PP
57
58 with self.assertRaises(TypeError):
5602ef81 59 msg_iter = bt2.TraceCollectionMessageIterator(specs, begin='lel')
704c2307
PP
60
61 def test_create_no_such_plugin(self):
3d60267b 62 specs = [bt2.ComponentSpec('77', '101', _3EVENTS_INTERSECT_TRACE_PATH)]
704c2307
PP
63
64 with self.assertRaises(bt2.Error):
5602ef81 65 msg_iter = bt2.TraceCollectionMessageIterator(specs)
704c2307
PP
66
67 def test_create_begin_s(self):
3d60267b 68 specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
5602ef81 69 msg_iter = bt2.TraceCollectionMessageIterator(specs, begin=19457.918232)
704c2307
PP
70
71 def test_create_end_s(self):
3d60267b 72 specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
5602ef81 73 msg_iter = bt2.TraceCollectionMessageIterator(specs, end=123.12312)
704c2307
PP
74
75 def test_create_begin_datetime(self):
3d60267b 76 specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
5602ef81 77 msg_iter = bt2.TraceCollectionMessageIterator(specs, begin=datetime.datetime.now())
704c2307
PP
78
79 def test_create_end_datetime(self):
3d60267b 80 specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
5602ef81 81 msg_iter = bt2.TraceCollectionMessageIterator(specs, end=datetime.datetime.now())
704c2307
PP
82
83 def test_iter_no_intersection(self):
3d60267b 84 specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
5602ef81
SM
85 msg_iter = bt2.TraceCollectionMessageIterator(specs)
86 self.assertEqual(len(list(msg_iter)), 28)
704c2307
PP
87
88 def test_iter_no_intersection_subscribe(self):
3d60267b 89 specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
5602ef81
SM
90 msg_iter = bt2.TraceCollectionMessageIterator(specs,
91 message_types=[bt2.EventMessage])
92 self.assertEqual(len(list(msg_iter)), 8)
704c2307 93
3d60267b
PP
94 def test_iter_specs_not_list(self):
95 spec = bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)
5602ef81
SM
96 msg_iter = bt2.TraceCollectionMessageIterator(spec,
97 message_types=[bt2.EventMessage])
98 self.assertEqual(len(list(msg_iter)), 8)
3d60267b
PP
99
100 def test_iter_custom_filter(self):
101 src_spec = bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)
102 flt_spec = bt2.ComponentSpec('utils', 'trimmer', {
103 'end': 13515309000000075,
104 })
5602ef81
SM
105 msg_iter = bt2.TraceCollectionMessageIterator(src_spec, flt_spec,
106 message_types=[bt2.EventMessage])
107 self.assertEqual(len(list(msg_iter)), 5)
3d60267b 108
704c2307 109 def test_iter_intersection(self):
3d60267b 110 specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
5602ef81
SM
111 msg_iter = bt2.TraceCollectionMessageIterator(specs, stream_intersection_mode=True)
112 self.assertEqual(len(list(msg_iter)), 15)
704c2307
PP
113
114 def test_iter_intersection_subscribe(self):
3d60267b 115 specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
5602ef81
SM
116 msg_iter = bt2.TraceCollectionMessageIterator(specs, stream_intersection_mode=True,
117 message_types=[bt2.EventMessage])
118 self.assertEqual(len(list(msg_iter)), 3)
704c2307
PP
119
120 def test_iter_intersection_no_path_param(self):
3d60267b 121 specs = [bt2.ComponentSpec('text', 'dmesg', {'read-from-stdin': True})]
704c2307
PP
122
123 with self.assertRaises(bt2.Error):
5602ef81
SM
124 msg_iter = bt2.TraceCollectionMessageIterator(specs, stream_intersection_mode=True,
125 message_types=[bt2.EventMessage])
704c2307
PP
126
127 def test_iter_no_intersection_two_traces(self):
3d60267b 128 spec = bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)
704c2307 129 specs = [spec, spec]
5602ef81
SM
130 msg_iter = bt2.TraceCollectionMessageIterator(specs)
131 self.assertEqual(len(list(msg_iter)), 56)
704c2307
PP
132
133 def test_iter_no_intersection_begin(self):
3d60267b 134 specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
5602ef81
SM
135 msg_iter = bt2.TraceCollectionMessageIterator(specs,
136 message_types=[bt2.EventMessage],
704c2307 137 begin=13515309.000000023)
5602ef81 138 self.assertEqual(len(list(msg_iter)), 6)
704c2307
PP
139
140 def test_iter_no_intersection_end(self):
3d60267b 141 specs = [bt2.ComponentSpec('ctf', 'fs', _3EVENTS_INTERSECT_TRACE_PATH)]
5602ef81
SM
142 msg_iter = bt2.TraceCollectionMessageIterator(specs,
143 message_types=[bt2.EventMessage],
704c2307 144 end=13515309.000000075)
5602ef81 145 self.assertEqual(len(list(msg_iter)), 5)
This page took 0.036157 seconds and 4 git commands to generate.