7a571797ff9a74c77fbb65967192e928581af35d
[babeltrace.git] / tests / bindings / python / bt2 / test_message_iterator.py
1 from bt2 import value
2 import collections
3 import unittest
4 import copy
5 import bt2
6
7
8 class UserMessageIteratorTestCase(unittest.TestCase):
9 @staticmethod
10 def _create_graph(src_comp_cls):
11 class MySink(bt2._UserSinkComponent):
12 def __init__(self, params):
13 self._add_input_port('in')
14
15 def _consume(self):
16 next(self._msg_iter)
17
18 def _graph_is_configured(self):
19 self._msg_iter = self._input_ports['in'].create_message_iterator()
20
21 graph = bt2.Graph()
22 src_comp = graph.add_component(src_comp_cls, 'src')
23 sink_comp = graph.add_component(MySink, 'sink')
24 graph.connect_ports(src_comp.output_ports['out'],
25 sink_comp.input_ports['in'])
26 return graph
27
28 def test_init(self):
29 the_output_port_from_source = None
30 the_output_port_from_iter = None
31
32 class MyIter(bt2._UserMessageIterator):
33 def __init__(self, self_port_output):
34 nonlocal initialized
35 nonlocal the_output_port_from_iter
36 initialized = True
37 the_output_port_from_iter = self_port_output
38
39 class MySource(bt2._UserSourceComponent,
40 message_iterator_class=MyIter):
41 def __init__(self, params):
42 nonlocal the_output_port_from_source
43 the_output_port_from_source = self._add_output_port('out', 'user data')
44
45 initialized = False
46 graph = self._create_graph(MySource)
47 graph.run()
48 self.assertTrue(initialized)
49 self.assertEqual(the_output_port_from_source.addr, the_output_port_from_iter.addr)
50 self.assertEqual(the_output_port_from_iter.user_data, 'user data')
51
52 def test_finalize(self):
53 class MyIter(bt2._UserMessageIterator):
54 def _finalize(self):
55 nonlocal finalized
56 finalized = True
57
58 class MySource(bt2._UserSourceComponent,
59 message_iterator_class=MyIter):
60 def __init__(self, params):
61 self._add_output_port('out')
62
63 finalized = False
64 graph = self._create_graph(MySource)
65 graph.run()
66 del graph
67 self.assertTrue(finalized)
68
69 def test_component(self):
70 class MyIter(bt2._UserMessageIterator):
71 def __init__(self, self_port_output):
72 nonlocal salut
73 salut = self._component._salut
74
75 class MySource(bt2._UserSourceComponent,
76 message_iterator_class=MyIter):
77 def __init__(self, params):
78 self._add_output_port('out')
79 self._salut = 23
80
81 salut = None
82 graph = self._create_graph(MySource)
83 graph.run()
84 self.assertEqual(salut, 23)
85
86 def test_addr(self):
87 class MyIter(bt2._UserMessageIterator):
88 def __init__(self, self_port_output):
89 nonlocal addr
90 addr = self.addr
91
92 class MySource(bt2._UserSourceComponent,
93 message_iterator_class=MyIter):
94 def __init__(self, params):
95 self._add_output_port('out')
96
97 addr = None
98 graph = self._create_graph(MySource)
99 graph.run()
100 self.assertIsNotNone(addr)
101 self.assertNotEqual(addr, 0)
102
103
104 class OutputPortMessageIteratorTestCase(unittest.TestCase):
105 def test_component(self):
106 class MyIter(bt2._UserMessageIterator):
107 def __init__(self, self_port_output):
108 self._at = 0
109
110 def __next__(self):
111 if self._at == 7:
112 raise bt2.Stop
113
114 if self._at == 0:
115 msg = self._create_stream_beginning_message(test_obj._stream)
116 elif self._at == 1:
117 msg = self._create_packet_beginning_message(test_obj._packet)
118 elif self._at == 5:
119 msg = self._create_packet_end_message(test_obj._packet)
120 elif self._at == 6:
121 msg = self._create_stream_end_message(test_obj._stream)
122 else:
123 msg = self._create_event_message(test_obj._event_class, test_obj._packet)
124 msg.event.payload_field['my_int'] = self._at * 3
125
126 self._at += 1
127 return msg
128
129 class MySource(bt2._UserSourceComponent,
130 message_iterator_class=MyIter):
131 def __init__(self, params):
132 self._add_output_port('out')
133
134 trace_class = self._create_trace_class()
135 stream_class = trace_class.create_stream_class()
136
137 # Create payload field class
138 my_int_ft = trace_class.create_signed_integer_field_class(32)
139 payload_ft = trace_class.create_structure_field_class()
140 payload_ft += collections.OrderedDict([
141 ('my_int', my_int_ft),
142 ])
143
144 event_class = stream_class.create_event_class(name='salut', payload_field_class=payload_ft)
145
146 trace = trace_class()
147 stream = trace.create_stream(stream_class)
148 packet = stream.create_packet()
149
150 test_obj._event_class = event_class
151 test_obj._stream = stream
152 test_obj._packet = packet
153
154 test_obj = self
155 graph = bt2.Graph()
156 src = graph.add_component(MySource, 'src')
157 msg_iter = graph.create_output_port_message_iterator(src.output_ports['out'])
158
159 for at, msg in enumerate(msg_iter):
160 if at == 0:
161 self.assertIsInstance(msg, bt2.message._StreamBeginningMessage)
162 elif at == 1:
163 self.assertIsInstance(msg, bt2.message._PacketBeginningMessage)
164 elif at == 5:
165 self.assertIsInstance(msg, bt2.message._PacketEndMessage)
166 elif at == 6:
167 self.assertIsInstance(msg, bt2.message._StreamEndMessage)
168 else:
169 self.assertIsInstance(msg, bt2.message._EventMessage)
170 self.assertEqual(msg.event.cls.name, 'salut')
171 field = msg.event.payload_field['my_int']
172 self.assertEqual(field, at * 3)
This page took 0.035888 seconds and 3 git commands to generate.