bt2: Mass field_types -> field_class rename
[babeltrace.git] / tests / bindings / python / bt2 / test_message.py
1 from bt2 import values
2 import collections
3 import unittest
4 import copy
5 import bt2
6
7
class _MessageTestCase(unittest.TestCase):
    """Common fixture shared by the per-message test cases.

    ``setUp()`` builds a trace holding one clock class, one stream class
    and one event class, then instantiates a stream, a packet and an
    event from them.  ``tearDown()`` deletes every attribute that
    ``setUp()`` assigned.
    """

    def setUp(self):
        self._trace = bt2.Trace()
        self._sc = bt2.StreamClass()
        self._ec = bt2.EventClass('salut')
        self._my_int_fc = bt2.IntegerFieldClass(32)

        # The event payload is a structure with a single member, 'my_int'.
        payload_members = collections.OrderedDict()
        payload_members['my_int'] = self._my_int_fc
        self._ec.payload_field_class = bt2.StructureFieldClass()
        self._ec.payload_field_class += payload_members
        self._sc.add_event_class(self._ec)

        self._clock_class = bt2.ClockClass('allo', 1000)
        self._trace.add_clock_class(self._clock_class)

        # The packet header is a structure with a single member, 'hello'.
        header_members = collections.OrderedDict()
        header_members['hello'] = self._my_int_fc
        self._trace.packet_header_field_class = bt2.StructureFieldClass()
        self._trace.packet_header_field_class += header_members
        self._trace.add_stream_class(self._sc)

        # Priority map with a single entry for the fixture clock class.
        self._cc_prio_map = bt2.ClockClassPriorityMap()
        self._cc_prio_map[self._clock_class] = 231

        # Concrete objects: stream -> packet -> event.
        self._stream = self._sc()
        self._packet = self._stream.create_packet()
        self._packet.header_field['hello'] = 19487
        self._event = self._ec()
        self._event.clock_values.add(self._clock_class(1772))
        self._event.payload_field['my_int'] = 23
        self._event.packet = self._packet

    def tearDown(self):
        # Delete the fixture attributes one by one, in a fixed order.
        fixture_attrs = (
            '_trace',
            '_sc',
            '_ec',
            '_my_int_fc',
            '_clock_class',
            '_cc_prio_map',
            '_stream',
            '_packet',
            '_event',
        )

        for attr_name in fixture_attrs:
            delattr(self, attr_name)
46
47
@unittest.skip("this is broken")
class EventMessageTestCase(_MessageTestCase):
    """Creation, comparison and copy tests for ``bt2.EventMessage``."""

    def test_create_no_cc_prio_map(self):
        # No priority map passed: the message's map is expected to be empty.
        message = bt2.EventMessage(self._event)
        self.assertEqual(message.event.addr, self._event.addr)
        self.assertEqual(len(message.clock_class_priority_map), 0)

    def test_create_with_cc_prio_map(self):
        # The fixture map has one entry (priority 231) for the fixture
        # clock class; the message must expose exactly that.
        message = bt2.EventMessage(self._event, self._cc_prio_map)
        self.assertEqual(message.event.addr, self._event.addr)
        self.assertEqual(len(message.clock_class_priority_map), 1)
        self.assertEqual(
            message.clock_class_priority_map.highest_priority_clock_class.addr,
            self._clock_class.addr,
        )
        self.assertEqual(
            message.clock_class_priority_map[self._clock_class],
            231,
        )

    def test_eq(self):
        # Messages built from copies of the same event and map are equal.
        reference = bt2.EventMessage(self._event, self._cc_prio_map)
        event_clone = copy.copy(self._event)
        event_clone.packet = self._packet
        prio_map_clone = copy.copy(self._cc_prio_map)
        other = bt2.EventMessage(event_clone, prio_map_clone)
        self.assertEqual(reference, other)

    def test_ne_event(self):
        # Same as test_eq, but the copied event carries another payload value.
        reference = bt2.EventMessage(self._event, self._cc_prio_map)
        event_clone = copy.copy(self._event)
        event_clone.payload_field['my_int'] = 17
        event_clone.packet = self._packet
        prio_map_clone = copy.copy(self._cc_prio_map)
        other = bt2.EventMessage(event_clone, prio_map_clone)
        self.assertNotEqual(reference, other)

    def test_ne_cc_prio_map(self):
        # One message has no priority map, the other has the fixture's.
        reference = bt2.EventMessage(self._event)
        event_clone = copy.copy(self._event)
        event_clone.packet = self._packet
        prio_map_clone = copy.copy(self._cc_prio_map)
        other = bt2.EventMessage(event_clone, prio_map_clone)
        self.assertNotEqual(reference, other)

    def test_eq_invalid(self):
        # Comparing with a non-message object must not report equality.
        message = bt2.EventMessage(self._event)
        self.assertNotEqual(message, 23)

    def test_copy(self):
        message = bt2.EventMessage(self._event, self._cc_prio_map)
        shallow = copy.copy(message)
        self.assertEqual(message, shallow)

    def test_deepcopy(self):
        message = bt2.EventMessage(self._event, self._cc_prio_map)
        deep = copy.deepcopy(message)
        self.assertEqual(message, deep)
101
102
@unittest.skip("this is broken")
class PacketBeginningMessageTestCase(_MessageTestCase):
    """Creation, comparison and copy tests for ``bt2.PacketBeginningMessage``."""

    def test_create(self):
        # The message must refer to the packet it was created from.
        message = bt2.PacketBeginningMessage(self._packet)
        self.assertEqual(message.packet.addr, self._packet.addr)

    def test_eq(self):
        # A message built from a copy of the packet compares equal.
        reference = bt2.PacketBeginningMessage(self._packet)
        packet_clone = copy.copy(self._packet)
        other = bt2.PacketBeginningMessage(packet_clone)
        self.assertEqual(reference, other)

    def test_ne_packet(self):
        # The copied packet gets a different 'hello' header value.
        reference = bt2.PacketBeginningMessage(self._packet)
        packet_clone = copy.copy(self._packet)
        packet_clone.header_field['hello'] = 1847
        other = bt2.PacketBeginningMessage(packet_clone)
        self.assertNotEqual(reference, other)

    def test_eq_invalid(self):
        # Comparing with a non-message object must not report equality.
        message = bt2.PacketBeginningMessage(self._packet)
        self.assertNotEqual(message, 23)

    def test_copy(self):
        message = bt2.PacketBeginningMessage(self._packet)
        shallow = copy.copy(message)
        self.assertEqual(message, shallow)

    def test_deepcopy(self):
        message = bt2.PacketBeginningMessage(self._packet)
        deep = copy.deepcopy(message)
        self.assertEqual(message, deep)
135
136
@unittest.skip("this is broken")
class PacketEndMessageTestCase(_MessageTestCase):
    """Creation, comparison and copy tests for ``bt2.PacketEndMessage``."""

    def test_create(self):
        # The message must refer to the packet it was created from.
        message = bt2.PacketEndMessage(self._packet)
        self.assertEqual(message.packet.addr, self._packet.addr)

    def test_eq(self):
        # A message built from a copy of the packet compares equal.
        reference = bt2.PacketEndMessage(self._packet)
        packet_clone = copy.copy(self._packet)
        other = bt2.PacketEndMessage(packet_clone)
        self.assertEqual(reference, other)

    def test_ne_packet(self):
        # The copied packet gets a different 'hello' header value.
        reference = bt2.PacketEndMessage(self._packet)
        packet_clone = copy.copy(self._packet)
        packet_clone.header_field['hello'] = 1847
        other = bt2.PacketEndMessage(packet_clone)
        self.assertNotEqual(reference, other)

    def test_eq_invalid(self):
        # Comparing with a non-message object must not report equality.
        message = bt2.PacketEndMessage(self._packet)
        self.assertNotEqual(message, 23)

    def test_copy(self):
        message = bt2.PacketEndMessage(self._packet)
        shallow = copy.copy(message)
        self.assertEqual(message, shallow)

    def test_deepcopy(self):
        message = bt2.PacketEndMessage(self._packet)
        deep = copy.deepcopy(message)
        self.assertEqual(message, deep)
169
170
@unittest.skip("this is broken")
class StreamBeginningMessageTestCase(_MessageTestCase):
    """Creation, comparison and copy tests for ``bt2.StreamBeginningMessage``."""

    def test_create(self):
        # The message must refer to the stream it was created from.
        message = bt2.StreamBeginningMessage(self._stream)
        self.assertEqual(message.stream.addr, self._stream.addr)

    def test_eq(self):
        # A message built from a copy of the stream compares equal.
        reference = bt2.StreamBeginningMessage(self._stream)
        stream_clone = copy.copy(self._stream)
        other = bt2.StreamBeginningMessage(stream_clone)
        self.assertEqual(reference, other)

    def test_ne_stream(self):
        # The second stream is a fresh instance created with name='salut'.
        reference = bt2.StreamBeginningMessage(self._stream)
        named_stream = self._sc(name='salut')
        other = bt2.StreamBeginningMessage(named_stream)
        self.assertNotEqual(reference, other)

    def test_eq_invalid(self):
        # Comparing with a non-message object must not report equality.
        message = bt2.StreamBeginningMessage(self._stream)
        self.assertNotEqual(message, 23)

    def test_copy(self):
        message = bt2.StreamBeginningMessage(self._stream)
        shallow = copy.copy(message)
        self.assertEqual(message, shallow)

    def test_deepcopy(self):
        message = bt2.StreamBeginningMessage(self._stream)
        deep = copy.deepcopy(message)
        self.assertEqual(message, deep)
202
203
@unittest.skip("this is broken")
class StreamEndMessageTestCase(_MessageTestCase):
    """Creation, comparison and copy tests for ``bt2.StreamEndMessage``."""

    def test_create(self):
        # The message must refer to the stream it was created from.
        message = bt2.StreamEndMessage(self._stream)
        self.assertEqual(message.stream.addr, self._stream.addr)

    def test_eq(self):
        # A message built from a copy of the stream compares equal.
        reference = bt2.StreamEndMessage(self._stream)
        stream_clone = copy.copy(self._stream)
        other = bt2.StreamEndMessage(stream_clone)
        self.assertEqual(reference, other)

    def test_ne_stream(self):
        # The second stream is a fresh instance created with name='salut'.
        reference = bt2.StreamEndMessage(self._stream)
        named_stream = self._sc(name='salut')
        other = bt2.StreamEndMessage(named_stream)
        self.assertNotEqual(reference, other)

    def test_eq_invalid(self):
        # Comparing with a non-message object must not report equality.
        message = bt2.StreamEndMessage(self._stream)
        self.assertNotEqual(message, 23)

    def test_copy(self):
        message = bt2.StreamEndMessage(self._stream)
        shallow = copy.copy(message)
        self.assertEqual(message, shallow)

    def test_deepcopy(self):
        message = bt2.StreamEndMessage(self._stream)
        deep = copy.deepcopy(message)
        self.assertEqual(message, deep)
235
236
@unittest.skip("this is broken")
class InactivityMessageTestCase(unittest.TestCase):
    """Creation, comparison and copy tests for ``bt2.InactivityMessage``.

    The fixture consists of two clock classes (``cc1`` and ``cc2``) and a
    clock class priority map which maps them to priorities 25 and 50.
    """

    def setUp(self):
        self._cc1 = bt2.ClockClass('cc1', 1000)
        self._cc2 = bt2.ClockClass('cc2', 2000)
        self._cc_prio_map = bt2.ClockClassPriorityMap()
        self._cc_prio_map[self._cc1] = 25
        self._cc_prio_map[self._cc2] = 50

    def tearDown(self):
        del self._cc1
        del self._cc2
        del self._cc_prio_map

    def test_create_no_cc_prio_map(self):
        # No priority map passed: the message's map is expected to be empty.
        msg = bt2.InactivityMessage()
        self.assertEqual(len(msg.clock_class_priority_map), 0)

    def test_create_with_cc_prio_map(self):
        # The message must expose the map it was given and the clock
        # values added afterwards, indexed by clock class.
        msg = bt2.InactivityMessage(self._cc_prio_map)
        msg.clock_values.add(self._cc1(123))
        msg.clock_values.add(self._cc2(19487))
        self.assertEqual(len(msg.clock_class_priority_map), 2)
        self.assertEqual(msg.clock_class_priority_map, self._cc_prio_map)
        self.assertEqual(msg.clock_values[self._cc1], 123)
        self.assertEqual(msg.clock_values[self._cc2], 19487)

    def test_eq(self):
        # Same clock values, and a copy of the same priority map.
        msg = bt2.InactivityMessage(self._cc_prio_map)
        msg.clock_values.add(self._cc1(123))
        msg.clock_values.add(self._cc2(19487))
        cc_prio_map_copy = copy.copy(self._cc_prio_map)
        msg2 = bt2.InactivityMessage(cc_prio_map_copy)
        msg2.clock_values.add(self._cc1(123))
        msg2.clock_values.add(self._cc2(19487))
        self.assertEqual(msg, msg2)

    def test_ne_cc_prio_map(self):
        # The second message's map has a different priority for cc2.
        msg = bt2.InactivityMessage(self._cc_prio_map)
        msg.clock_values.add(self._cc1(123))
        msg.clock_values.add(self._cc2(19487))
        cc_prio_map_copy = copy.copy(self._cc_prio_map)
        cc_prio_map_copy[self._cc2] = 23
        msg2 = bt2.InactivityMessage(cc_prio_map_copy)
        self.assertNotEqual(msg, msg2)

    def test_ne_clock_value(self):
        # Both messages share the priority map; only the cc2 clock value
        # differs (19487 vs. 1847).
        msg = bt2.InactivityMessage(self._cc_prio_map)
        msg.clock_values.add(self._cc1(123))
        msg.clock_values.add(self._cc2(19487))
        msg2 = bt2.InactivityMessage(self._cc_prio_map)
        # The second set of clock values belongs to msg2.  Adding it to
        # msg again would leave msg2 without any clock value, so the
        # differing value would never take part in the comparison.
        msg2.clock_values.add(self._cc1(123))
        msg2.clock_values.add(self._cc2(1847))
        self.assertNotEqual(msg, msg2)

    def test_eq_invalid(self):
        # Comparing with a non-message object must not report equality.
        msg = bt2.InactivityMessage(self._cc_prio_map)
        self.assertNotEqual(msg, 23)

    def test_copy(self):
        # Expected shallow copy: a distinct message object (different
        # address) which shares the same priority map (same address) and
        # yields the same clock values for the original clock classes.
        msg = bt2.InactivityMessage(self._cc_prio_map)
        msg.clock_values.add(self._cc1(123))
        msg.clock_values.add(self._cc2(19487))
        msg_copy = copy.copy(msg)
        self.assertEqual(msg, msg_copy)
        self.assertNotEqual(msg.addr, msg_copy.addr)
        self.assertEqual(msg.clock_class_priority_map.addr,
                         msg_copy.clock_class_priority_map.addr)
        self.assertEqual(msg_copy.clock_values[self._cc1], 123)
        self.assertEqual(msg_copy.clock_values[self._cc2], 19487)

    def test_deepcopy(self):
        # Expected deep copy: a distinct message with its own, equal
        # priority map whose clock classes are themselves distinct
        # objects.  Looking up the copy's clock values with the original
        # clock classes is therefore expected to give None, while the
        # copy's own clock classes give 123 and 19487.
        msg = bt2.InactivityMessage(self._cc_prio_map)
        msg.clock_values.add(self._cc1(123))
        msg.clock_values.add(self._cc2(19487))
        msg_copy = copy.deepcopy(msg)
        self.assertEqual(msg, msg_copy)
        self.assertNotEqual(msg.addr, msg_copy.addr)
        self.assertNotEqual(msg.clock_class_priority_map.addr,
                            msg_copy.clock_class_priority_map.addr)
        self.assertEqual(msg.clock_class_priority_map,
                         msg_copy.clock_class_priority_map)
        self.assertNotEqual(list(msg.clock_class_priority_map)[0].addr,
                            list(msg_copy.clock_class_priority_map)[0].addr)
        self.assertIsNone(msg_copy.clock_values[self._cc1])
        self.assertIsNone(msg_copy.clock_values[self._cc2])
        self.assertEqual(msg_copy.clock_values[list(msg_copy.clock_class_priority_map)[0]], 123)
        self.assertEqual(msg_copy.clock_values[list(msg_copy.clock_class_priority_map)[1]], 19487)
325
326
@unittest.skip("this is broken")
class DiscardedPacketsMessageTestCase(unittest.TestCase):
    """Tests for the ``bt2._DiscardedPacketsMessage`` seen by a sink.

    The message is never created directly: ``_get_msg()`` runs a small
    source -> sink graph and returns the message the sink captured.
    """

    def setUp(self):
        self._trace = bt2.Trace()
        self._sc = bt2.StreamClass()
        self._ec = bt2.EventClass('salut')
        self._clock_class = bt2.ClockClass('yo', 1000)
        self._uint64_int_fc = bt2.IntegerFieldClass(
            64, mapped_clock_class=self._clock_class)
        self._my_int_fc = bt2.IntegerFieldClass(32)

        # The event payload is a structure with a single member, 'my_int'.
        payload_members = collections.OrderedDict()
        payload_members['my_int'] = self._my_int_fc
        self._ec.payload_field_class = bt2.StructureFieldClass()
        self._ec.payload_field_class += payload_members
        self._sc.add_event_class(self._ec)

        # The packet context has a sequence number and two timestamps
        # which use the field class mapped to the fixture clock class.
        context_members = collections.OrderedDict()
        context_members['packet_seq_num'] = self._my_int_fc
        context_members['timestamp_begin'] = self._uint64_int_fc
        context_members['timestamp_end'] = self._uint64_int_fc
        self._sc.packet_context_field_class = bt2.StructureFieldClass()
        self._sc.packet_context_field_class += context_members

        self._trace.add_clock_class(self._clock_class)
        self._trace.add_stream_class(self._sc)
        self._stream = self._sc()

    def tearDown(self):
        # Delete the fixture attributes one by one, in a fixed order.
        fixture_attrs = (
            '_trace',
            '_sc',
            '_ec',
            '_clock_class',
            '_uint64_int_fc',
            '_my_int_fc',
            '_stream',
        )

        for attr_name in fixture_attrs:
            delattr(self, attr_name)

    def _create_event(self, packet):
        """Return a new fixture-class event (``my_int`` = 23) set to *packet*."""
        new_event = self._ec()
        new_event.payload_field['my_int'] = 23
        new_event.packet = packet
        return new_event

    def _get_msg(self):
        """Run a source -> sink graph and return the captured message.

        The source's iterator yields two event messages whose packets
        have ``packet_seq_num`` 0 and 5.  The sink keeps the first
        message whose exact type is ``bt2._DiscardedPacketsMessage`` and
        then raises ``bt2.Stop``.  The returned value stays ``None``
        unless the sink captured such a message.
        """
        test_case = self

        def new_packet(seq_num, ts_begin, ts_end):
            # Create a packet on the fixture stream and fill its context.
            packet = test_case._stream.create_packet()
            packet.context_field['packet_seq_num'] = seq_num
            packet.context_field['timestamp_begin'] = ts_begin
            packet.context_field['timestamp_end'] = ts_end
            return packet

        class MyIter(bt2._UserMessageIterator):
            def __init__(self):
                first_packet = new_packet(0, 3, 6)
                second_packet = new_packet(5, 7, 10)
                self._ev1 = test_case._create_event(first_packet)
                self._ev2 = test_case._create_event(second_packet)
                self._at = 0

            def __next__(self):
                # Only positions 0 and 1 have an event to deliver.
                if self._at not in (0, 1):
                    raise bt2.Stop

                event = self._ev1 if self._at == 0 else self._ev2
                msg = bt2.EventMessage(event)
                self._at += 1
                return msg

        class MySource(bt2._UserSourceComponent,
                       message_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                self._add_input_port('in')

            def _consume(self):
                nonlocal the_msg
                received = next(self._msg_iter)

                if type(received) is bt2._DiscardedPacketsMessage:
                    the_msg = received
                    raise bt2.Stop

            def _port_connected(self, port, other_port):
                self._msg_iter = port.connection.create_message_iterator()

        the_msg = None
        graph = bt2.Graph()
        src = graph.add_component(MySource, 'src')
        sink = graph.add_component(MySink, 'sink')
        # The returned connection object is not used afterwards.
        conn = graph.connect_ports(src.output_ports['out'],
                                   sink.input_ports['in'])
        graph.run()
        return the_msg

    def test_create(self):
        captured = self._get_msg()
        self.assertIsInstance(captured, bt2._DiscardedPacketsMessage)

    def test_count(self):
        # Sequence numbers 0 and 5: four packets are expected as discarded.
        captured = self._get_msg()
        self.assertEqual(captured.count, 4)

    def test_stream(self):
        captured = self._get_msg()
        self.assertEqual(captured.stream.addr, self._stream.addr)

    def test_beginning_clock_value(self):
        # Expected: the first packet's 'timestamp_end' (6).
        captured = self._get_msg()
        clock_value = captured.beginning_clock_value
        self.assertEqual(clock_value.clock_class, self._clock_class)
        self.assertEqual(clock_value, 6)

    def test_end_clock_value(self):
        # Expected: the second packet's 'timestamp_begin' (7).
        captured = self._get_msg()
        clock_value = captured.end_clock_value
        self.assertEqual(clock_value.clock_class, self._clock_class)
        self.assertEqual(clock_value, 7)

    def test_eq(self):
        # Two independent graph runs must yield equal messages.
        first = self._get_msg()
        second = self._get_msg()
        self.assertEqual(first, second)

    def test_eq_invalid(self):
        # Comparing with a non-message object must not report equality.
        captured = self._get_msg()
        self.assertNotEqual(captured, 23)
450
451
@unittest.skip("this is broken")
class DiscardedEventsMessageTestCase(unittest.TestCase):
    """Tests for the ``bt2._DiscardedEventsMessage`` seen by a sink.

    The message is never created directly: ``_get_msg()`` runs a small
    source -> sink graph and returns the message the sink captured.
    """

    def setUp(self):
        self._trace = bt2.Trace()
        self._sc = bt2.StreamClass()
        self._ec = bt2.EventClass('salut')
        self._clock_class = bt2.ClockClass('yo', 1000)
        self._uint64_int_fc = bt2.IntegerFieldClass(
            64, mapped_clock_class=self._clock_class)
        self._my_int_fc = bt2.IntegerFieldClass(32)

        # The event payload is a structure with a single member, 'my_int'.
        payload_members = collections.OrderedDict()
        payload_members['my_int'] = self._my_int_fc
        self._ec.payload_field_class = bt2.StructureFieldClass()
        self._ec.payload_field_class += payload_members
        self._sc.add_event_class(self._ec)

        # The packet context has a discarded-events counter and two
        # timestamps which use the field class mapped to the fixture
        # clock class.
        context_members = collections.OrderedDict()
        context_members['events_discarded'] = self._my_int_fc
        context_members['timestamp_begin'] = self._uint64_int_fc
        context_members['timestamp_end'] = self._uint64_int_fc
        self._sc.packet_context_field_class = bt2.StructureFieldClass()
        self._sc.packet_context_field_class += context_members

        self._trace.add_clock_class(self._clock_class)
        self._trace.add_stream_class(self._sc)
        self._stream = self._sc()

    def tearDown(self):
        # Delete the fixture attributes one by one, in a fixed order.
        fixture_attrs = (
            '_trace',
            '_sc',
            '_ec',
            '_clock_class',
            '_uint64_int_fc',
            '_my_int_fc',
            '_stream',
        )

        for attr_name in fixture_attrs:
            delattr(self, attr_name)

    def _create_event(self, packet):
        """Return a new fixture-class event (``my_int`` = 23) set to *packet*."""
        new_event = self._ec()
        new_event.payload_field['my_int'] = 23
        new_event.packet = packet
        return new_event

    def _get_msg(self):
        """Run a source -> sink graph and return the captured message.

        The source's iterator yields two event messages whose packets
        have ``events_discarded`` 0 and 10.  The sink keeps the first
        message whose exact type is ``bt2._DiscardedEventsMessage`` and
        then raises ``bt2.Stop``.  The returned value stays ``None``
        unless the sink captured such a message.
        """
        test_case = self

        def new_packet(discarded, ts_begin, ts_end):
            # Create a packet on the fixture stream and fill its context.
            packet = test_case._stream.create_packet()
            packet.context_field['events_discarded'] = discarded
            packet.context_field['timestamp_begin'] = ts_begin
            packet.context_field['timestamp_end'] = ts_end
            return packet

        class MyIter(bt2._UserMessageIterator):
            def __init__(self):
                first_packet = new_packet(0, 3, 6)
                second_packet = new_packet(10, 7, 10)
                self._ev1 = test_case._create_event(first_packet)
                self._ev2 = test_case._create_event(second_packet)
                self._at = 0

            def __next__(self):
                # Only positions 0 and 1 have an event to deliver.
                if self._at not in (0, 1):
                    raise bt2.Stop

                event = self._ev1 if self._at == 0 else self._ev2
                msg = bt2.EventMessage(event)
                self._at += 1
                return msg

        class MySource(bt2._UserSourceComponent,
                       message_iterator_class=MyIter):
            def __init__(self, params):
                self._add_output_port('out')

        class MySink(bt2._UserSinkComponent):
            def __init__(self, params):
                self._add_input_port('in')

            def _consume(self):
                nonlocal the_msg
                received = next(self._msg_iter)

                if type(received) is bt2._DiscardedEventsMessage:
                    the_msg = received
                    raise bt2.Stop

            def _port_connected(self, port, other_port):
                self._msg_iter = port.connection.create_message_iterator()

        the_msg = None
        graph = bt2.Graph()
        src = graph.add_component(MySource, 'src')
        sink = graph.add_component(MySink, 'sink')
        # The returned connection object is not used afterwards.
        conn = graph.connect_ports(src.output_ports['out'],
                                   sink.input_ports['in'])
        graph.run()
        return the_msg

    def test_create(self):
        captured = self._get_msg()
        self.assertIsInstance(captured, bt2._DiscardedEventsMessage)

    def test_count(self):
        # The counter goes from 0 to 10: ten events are expected as discarded.
        captured = self._get_msg()
        self.assertEqual(captured.count, 10)

    def test_stream(self):
        captured = self._get_msg()
        self.assertEqual(captured.stream.addr, self._stream.addr)

    def test_beginning_clock_value(self):
        # Expected: the first packet's 'timestamp_end' (6).
        captured = self._get_msg()
        clock_value = captured.beginning_clock_value
        self.assertEqual(clock_value.clock_class, self._clock_class)
        self.assertEqual(clock_value, 6)

    def test_end_clock_value(self):
        # Expected value is 10, which is the second packet's 'timestamp_end'.
        captured = self._get_msg()
        clock_value = captured.end_clock_value
        self.assertEqual(clock_value.clock_class, self._clock_class)
        self.assertEqual(clock_value, 10)

    def test_eq(self):
        # Two independent graph runs must yield equal messages.
        first = self._get_msg()
        second = self._get_msg()
        self.assertEqual(first, second)

    def test_eq_invalid(self):
        # Comparing with a non-message object must not report equality.
        captured = self._get_msg()
        self.assertNotEqual(captured, 23)
This page took 0.044144 seconds and 5 git commands to generate.