8 class _MessageTestCase(unittest
.TestCase
):
10 self
._trace
= bt2
.Trace()
11 self
._sc
= bt2
.StreamClass()
12 self
._ec
= bt2
.EventClass('salut')
13 self
._my
_int
_fc
= bt2
.IntegerFieldClass(32)
14 self
._ec
.payload_field_class
= bt2
.StructureFieldClass()
15 self
._ec
.payload_field_class
+= collections
.OrderedDict([
16 ('my_int', self
._my
_int
_fc
),
18 self
._sc
.add_event_class(self
._ec
)
19 self
._clock
_class
= bt2
.ClockClass('allo', 1000)
20 self
._trace
.add_clock_class(self
._clock
_class
)
21 self
._trace
.packet_header_field_class
= bt2
.StructureFieldClass()
22 self
._trace
.packet_header_field_class
+= collections
.OrderedDict([
23 ('hello', self
._my
_int
_fc
),
25 self
._trace
.add_stream_class(self
._sc
)
26 self
._cc
_prio
_map
= bt2
.ClockClassPriorityMap()
27 self
._cc
_prio
_map
[self
._clock
_class
] = 231
28 self
._stream
= self
._sc
()
29 self
._packet
= self
._stream
.create_packet()
30 self
._packet
.header_field
['hello'] = 19487
31 self
._event
= self
._ec
()
32 self
._event
.clock_snapshots
.add(self
._clock
_class
(1772))
33 self
._event
.payload_field
['my_int'] = 23
34 self
._event
.packet
= self
._packet
48 @unittest.skip("this is broken")
49 class EventMessageTestCase(_MessageTestCase
):
50 def test_create_no_cc_prio_map(self
):
51 msg
= bt2
.EventMessage(self
._event
)
52 self
.assertEqual(msg
.event
.addr
, self
._event
.addr
)
53 self
.assertEqual(len(msg
.clock_class_priority_map
), 0)
55 def test_create_with_cc_prio_map(self
):
56 msg
= bt2
.EventMessage(self
._event
, self
._cc
_prio
_map
)
57 self
.assertEqual(msg
.event
.addr
, self
._event
.addr
)
58 self
.assertEqual(len(msg
.clock_class_priority_map
), 1)
59 self
.assertEqual(msg
.clock_class_priority_map
.highest_priority_clock_class
.addr
,
60 self
._clock
_class
.addr
)
61 self
.assertEqual(msg
.clock_class_priority_map
[self
._clock
_class
], 231)
64 msg
= bt2
.EventMessage(self
._event
, self
._cc
_prio
_map
)
65 event_copy
= copy
.copy(self
._event
)
66 event_copy
.packet
= self
._packet
67 cc_prio_map_copy
= copy
.copy(self
._cc
_prio
_map
)
68 msg2
= bt2
.EventMessage(event_copy
, cc_prio_map_copy
)
69 self
.assertEqual(msg
, msg2
)
71 def test_ne_event(self
):
72 msg
= bt2
.EventMessage(self
._event
, self
._cc
_prio
_map
)
73 event_copy
= copy
.copy(self
._event
)
74 event_copy
.payload_field
['my_int'] = 17
75 event_copy
.packet
= self
._packet
76 cc_prio_map_copy
= copy
.copy(self
._cc
_prio
_map
)
77 msg2
= bt2
.EventMessage(event_copy
, cc_prio_map_copy
)
78 self
.assertNotEqual(msg
, msg2
)
80 def test_ne_cc_prio_map(self
):
81 msg
= bt2
.EventMessage(self
._event
)
82 event_copy
= copy
.copy(self
._event
)
83 event_copy
.packet
= self
._packet
84 cc_prio_map_copy
= copy
.copy(self
._cc
_prio
_map
)
85 msg2
= bt2
.EventMessage(event_copy
, cc_prio_map_copy
)
86 self
.assertNotEqual(msg
, msg2
)
88 def test_eq_invalid(self
):
89 msg
= bt2
.EventMessage(self
._event
)
90 self
.assertNotEqual(msg
, 23)
93 msg
= bt2
.EventMessage(self
._event
, self
._cc
_prio
_map
)
95 self
.assertEqual(msg
, msg2
)
97 def test_deepcopy(self
):
98 msg
= bt2
.EventMessage(self
._event
, self
._cc
_prio
_map
)
99 msg2
= copy
.deepcopy(msg
)
100 self
.assertEqual(msg
, msg2
)
103 @unittest.skip("this is broken")
104 class PacketBeginningMessageTestCase(_MessageTestCase
):
105 def test_create(self
):
106 msg
= bt2
.PacketBeginningMessage(self
._packet
)
107 self
.assertEqual(msg
.packet
.addr
, self
._packet
.addr
)
110 msg
= bt2
.PacketBeginningMessage(self
._packet
)
111 packet_copy
= copy
.copy(self
._packet
)
112 msg2
= bt2
.PacketBeginningMessage(packet_copy
)
113 self
.assertEqual(msg
, msg2
)
115 def test_ne_packet(self
):
116 msg
= bt2
.PacketBeginningMessage(self
._packet
)
117 packet_copy
= copy
.copy(self
._packet
)
118 packet_copy
.header_field
['hello'] = 1847
119 msg2
= bt2
.PacketBeginningMessage(packet_copy
)
120 self
.assertNotEqual(msg
, msg2
)
122 def test_eq_invalid(self
):
123 msg
= bt2
.PacketBeginningMessage(self
._packet
)
124 self
.assertNotEqual(msg
, 23)
127 msg
= bt2
.PacketBeginningMessage(self
._packet
)
128 msg2
= copy
.copy(msg
)
129 self
.assertEqual(msg
, msg2
)
131 def test_deepcopy(self
):
132 msg
= bt2
.PacketBeginningMessage(self
._packet
)
133 msg2
= copy
.deepcopy(msg
)
134 self
.assertEqual(msg
, msg2
)
137 @unittest.skip("this is broken")
138 class PacketEndMessageTestCase(_MessageTestCase
):
139 def test_create(self
):
140 msg
= bt2
.PacketEndMessage(self
._packet
)
141 self
.assertEqual(msg
.packet
.addr
, self
._packet
.addr
)
144 msg
= bt2
.PacketEndMessage(self
._packet
)
145 packet_copy
= copy
.copy(self
._packet
)
146 msg2
= bt2
.PacketEndMessage(packet_copy
)
147 self
.assertEqual(msg
, msg2
)
149 def test_ne_packet(self
):
150 msg
= bt2
.PacketEndMessage(self
._packet
)
151 packet_copy
= copy
.copy(self
._packet
)
152 packet_copy
.header_field
['hello'] = 1847
153 msg2
= bt2
.PacketEndMessage(packet_copy
)
154 self
.assertNotEqual(msg
, msg2
)
156 def test_eq_invalid(self
):
157 msg
= bt2
.PacketEndMessage(self
._packet
)
158 self
.assertNotEqual(msg
, 23)
161 msg
= bt2
.PacketEndMessage(self
._packet
)
162 msg2
= copy
.copy(msg
)
163 self
.assertEqual(msg
, msg2
)
165 def test_deepcopy(self
):
166 msg
= bt2
.PacketEndMessage(self
._packet
)
167 msg2
= copy
.deepcopy(msg
)
168 self
.assertEqual(msg
, msg2
)
171 @unittest.skip("this is broken")
172 class StreamBeginningMessageTestCase(_MessageTestCase
):
173 def test_create(self
):
174 msg
= bt2
.StreamBeginningMessage(self
._stream
)
175 self
.assertEqual(msg
.stream
.addr
, self
._stream
.addr
)
178 msg
= bt2
.StreamBeginningMessage(self
._stream
)
179 stream_copy
= copy
.copy(self
._stream
)
180 msg2
= bt2
.StreamBeginningMessage(stream_copy
)
181 self
.assertEqual(msg
, msg2
)
183 def test_ne_stream(self
):
184 msg
= bt2
.StreamBeginningMessage(self
._stream
)
185 stream_copy
= self
._sc
(name
='salut')
186 msg2
= bt2
.StreamBeginningMessage(stream_copy
)
187 self
.assertNotEqual(msg
, msg2
)
189 def test_eq_invalid(self
):
190 msg
= bt2
.StreamBeginningMessage(self
._stream
)
191 self
.assertNotEqual(msg
, 23)
194 msg
= bt2
.StreamBeginningMessage(self
._stream
)
195 msg2
= copy
.copy(msg
)
196 self
.assertEqual(msg
, msg2
)
198 def test_deepcopy(self
):
199 msg
= bt2
.StreamBeginningMessage(self
._stream
)
200 msg2
= copy
.deepcopy(msg
)
201 self
.assertEqual(msg
, msg2
)
204 @unittest.skip("this is broken")
205 class StreamEndMessageTestCase(_MessageTestCase
):
206 def test_create(self
):
207 msg
= bt2
.StreamEndMessage(self
._stream
)
208 self
.assertEqual(msg
.stream
.addr
, self
._stream
.addr
)
211 msg
= bt2
.StreamEndMessage(self
._stream
)
212 stream_copy
= copy
.copy(self
._stream
)
213 msg2
= bt2
.StreamEndMessage(stream_copy
)
214 self
.assertEqual(msg
, msg2
)
216 def test_ne_stream(self
):
217 msg
= bt2
.StreamEndMessage(self
._stream
)
218 stream_copy
= self
._sc
(name
='salut')
219 msg2
= bt2
.StreamEndMessage(stream_copy
)
220 self
.assertNotEqual(msg
, msg2
)
222 def test_eq_invalid(self
):
223 msg
= bt2
.StreamEndMessage(self
._stream
)
224 self
.assertNotEqual(msg
, 23)
227 msg
= bt2
.StreamEndMessage(self
._stream
)
228 msg2
= copy
.copy(msg
)
229 self
.assertEqual(msg
, msg2
)
231 def test_deepcopy(self
):
232 msg
= bt2
.StreamEndMessage(self
._stream
)
233 msg2
= copy
.deepcopy(msg
)
234 self
.assertEqual(msg
, msg2
)
237 @unittest.skip("this is broken")
238 class InactivityMessageTestCase(unittest
.TestCase
):
240 self
._cc
1 = bt2
.ClockClass('cc1', 1000)
241 self
._cc
2 = bt2
.ClockClass('cc2', 2000)
242 self
._cc
_prio
_map
= bt2
.ClockClassPriorityMap()
243 self
._cc
_prio
_map
[self
._cc
1] = 25
244 self
._cc
_prio
_map
[self
._cc
2] = 50
249 del self
._cc
_prio
_map
251 def test_create_no_cc_prio_map(self
):
252 msg
= bt2
.InactivityMessage()
253 self
.assertEqual(len(msg
.clock_class_priority_map
), 0)
255 def test_create_with_cc_prio_map(self
):
256 msg
= bt2
.InactivityMessage(self
._cc
_prio
_map
)
257 msg
.clock_snapshots
.add(self
._cc
1(123))
258 msg
.clock_snapshots
.add(self
._cc
2(19487))
259 self
.assertEqual(len(msg
.clock_class_priority_map
), 2)
260 self
.assertEqual(msg
.clock_class_priority_map
, self
._cc
_prio
_map
)
261 self
.assertEqual(msg
.clock_snapshots
[self
._cc
1], 123)
262 self
.assertEqual(msg
.clock_snapshots
[self
._cc
2], 19487)
265 msg
= bt2
.InactivityMessage(self
._cc
_prio
_map
)
266 msg
.clock_snapshots
.add(self
._cc
1(123))
267 msg
.clock_snapshots
.add(self
._cc
2(19487))
268 cc_prio_map_copy
= copy
.copy(self
._cc
_prio
_map
)
269 msg2
= bt2
.InactivityMessage(cc_prio_map_copy
)
270 msg2
.clock_snapshots
.add(self
._cc
1(123))
271 msg2
.clock_snapshots
.add(self
._cc
2(19487))
272 self
.assertEqual(msg
, msg2
)
274 def test_ne_cc_prio_map(self
):
275 msg
= bt2
.InactivityMessage(self
._cc
_prio
_map
)
276 msg
.clock_snapshots
.add(self
._cc
1(123))
277 msg
.clock_snapshots
.add(self
._cc
2(19487))
278 cc_prio_map_copy
= copy
.copy(self
._cc
_prio
_map
)
279 cc_prio_map_copy
[self
._cc
2] = 23
280 msg2
= bt2
.InactivityMessage(cc_prio_map_copy
)
281 self
.assertNotEqual(msg
, msg2
)
283 def test_ne_clock_snapshot(self
):
284 msg
= bt2
.InactivityMessage(self
._cc
_prio
_map
)
285 msg
.clock_snapshots
.add(self
._cc
1(123))
286 msg
.clock_snapshots
.add(self
._cc
2(19487))
287 msg2
= bt2
.InactivityMessage(self
._cc
_prio
_map
)
288 msg
.clock_snapshots
.add(self
._cc
1(123))
289 msg
.clock_snapshots
.add(self
._cc
2(1847))
290 self
.assertNotEqual(msg
, msg2
)
292 def test_eq_invalid(self
):
293 msg
= bt2
.InactivityMessage(self
._cc
_prio
_map
)
294 self
.assertNotEqual(msg
, 23)
297 msg
= bt2
.InactivityMessage(self
._cc
_prio
_map
)
298 msg
.clock_snapshots
.add(self
._cc
1(123))
299 msg
.clock_snapshots
.add(self
._cc
2(19487))
300 msg_copy
= copy
.copy(msg
)
301 self
.assertEqual(msg
, msg_copy
)
302 self
.assertNotEqual(msg
.addr
, msg_copy
.addr
)
303 self
.assertEqual(msg
.clock_class_priority_map
.addr
,
304 msg_copy
.clock_class_priority_map
.addr
)
305 self
.assertEqual(msg_copy
.clock_snapshots
[self
._cc
1], 123)
306 self
.assertEqual(msg_copy
.clock_snapshots
[self
._cc
2], 19487)
308 def test_deepcopy(self
):
309 msg
= bt2
.InactivityMessage(self
._cc
_prio
_map
)
310 msg
.clock_snapshots
.add(self
._cc
1(123))
311 msg
.clock_snapshots
.add(self
._cc
2(19487))
312 msg_copy
= copy
.deepcopy(msg
)
313 self
.assertEqual(msg
, msg_copy
)
314 self
.assertNotEqual(msg
.addr
, msg_copy
.addr
)
315 self
.assertNotEqual(msg
.clock_class_priority_map
.addr
,
316 msg_copy
.clock_class_priority_map
.addr
)
317 self
.assertEqual(msg
.clock_class_priority_map
,
318 msg_copy
.clock_class_priority_map
)
319 self
.assertNotEqual(list(msg
.clock_class_priority_map
)[0].addr
,
320 list(msg_copy
.clock_class_priority_map
)[0].addr
)
321 self
.assertIsNone(msg_copy
.clock_snapshots
[self
._cc
1])
322 self
.assertIsNone(msg_copy
.clock_snapshots
[self
._cc
2])
323 self
.assertEqual(msg_copy
.clock_snapshots
[list(msg_copy
.clock_class_priority_map
)[0]], 123)
324 self
.assertEqual(msg_copy
.clock_snapshots
[list(msg_copy
.clock_class_priority_map
)[1]], 19487)
327 @unittest.skip("this is broken")
328 class DiscardedPacketsMessageTestCase(unittest
.TestCase
):
330 self
._trace
= bt2
.Trace()
331 self
._sc
= bt2
.StreamClass()
332 self
._ec
= bt2
.EventClass('salut')
333 self
._clock
_class
= bt2
.ClockClass('yo', 1000)
334 self
._uint
64_int
_fc
= bt2
.IntegerFieldClass(64, mapped_clock_class
=self
._clock
_class
)
335 self
._my
_int
_fc
= bt2
.IntegerFieldClass(32)
336 self
._ec
.payload_field_class
= bt2
.StructureFieldClass()
337 self
._ec
.payload_field_class
+= collections
.OrderedDict([
338 ('my_int', self
._my
_int
_fc
),
340 self
._sc
.add_event_class(self
._ec
)
341 self
._sc
.packet_context_field_class
= bt2
.StructureFieldClass()
342 self
._sc
.packet_context_field_class
+= collections
.OrderedDict([
343 ('packet_seq_num', self
._my
_int
_fc
),
344 ('timestamp_begin', self
._uint
64_int
_fc
),
345 ('timestamp_end', self
._uint
64_int
_fc
),
347 self
._trace
.add_clock_class(self
._clock
_class
)
348 self
._trace
.add_stream_class(self
._sc
)
349 self
._stream
= self
._sc
()
355 del self
._clock
_class
356 del self
._uint
64_int
_fc
360 def _create_event(self
, packet
):
362 event
.payload_field
['my_int'] = 23
363 event
.packet
= packet
367 class MyIter(bt2
._UserMessageIterator
):
368 def __init__(iter_self
):
369 packet1
= self
._stream
.create_packet()
370 packet1
.context_field
['packet_seq_num'] = 0
371 packet1
.context_field
['timestamp_begin'] = 3
372 packet1
.context_field
['timestamp_end'] = 6
373 packet2
= self
._stream
.create_packet()
374 packet2
.context_field
['packet_seq_num'] = 5
375 packet2
.context_field
['timestamp_begin'] = 7
376 packet2
.context_field
['timestamp_end'] = 10
377 iter_self
._ev
1 = self
._create
_event
(packet1
)
378 iter_self
._ev
2 = self
._create
_event
(packet2
)
383 msg
= bt2
.EventMessage(self
._ev
1)
385 msg
= bt2
.EventMessage(self
._ev
2)
392 class MySource(bt2
._UserSourceComponent
,
393 message_iterator_class
=MyIter
):
394 def __init__(self
, params
):
395 self
._add
_output
_port
('out')
397 class MySink(bt2
._UserSinkComponent
):
398 def __init__(self
, params
):
399 self
._add
_input
_port
('in')
401 def _consume(comp_self
):
403 msg
= next(comp_self
._msg
_iter
)
405 if type(msg
) is bt2
._DiscardedPacketsMessage
:
409 def _port_connected(self
, port
, other_port
):
410 self
._msg
_iter
= port
.connection
.create_message_iterator()
414 src
= graph
.add_component(MySource
, 'src')
415 sink
= graph
.add_component(MySink
, 'sink')
416 conn
= graph
.connect_ports(src
.output_ports
['out'],
417 sink
.input_ports
['in'])
421 def test_create(self
):
422 self
.assertIsInstance(self
._get
_msg
(), bt2
._DiscardedPacketsMessage
)
424 def test_count(self
):
425 self
.assertEqual(self
._get
_msg
().count
, 4)
427 def test_stream(self
):
428 self
.assertEqual(self
._get
_msg
().stream
.addr
, self
._stream
.addr
)
430 def test_beginning_clock_snapshot(self
):
431 msg
= self
._get
_msg
()
432 beginning_clock_snapshot
= msg
.beginning_clock_snapshot
433 self
.assertEqual(beginning_clock_snapshot
.clock_class
, self
._clock
_class
)
434 self
.assertEqual(beginning_clock_snapshot
, 6)
436 def test_end_clock_snapshot(self
):
437 msg
= self
._get
_msg
()
438 end_clock_snapshot
= msg
.end_clock_snapshot
439 self
.assertEqual(end_clock_snapshot
.clock_class
, self
._clock
_class
)
440 self
.assertEqual(end_clock_snapshot
, 7)
443 msg1
= self
._get
_msg
()
444 msg2
= self
._get
_msg
()
445 self
.assertEqual(msg1
, msg2
)
447 def test_eq_invalid(self
):
448 msg1
= self
._get
_msg
()
449 self
.assertNotEqual(msg1
, 23)
452 @unittest.skip("this is broken")
453 class DiscardedEventsMessageTestCase(unittest
.TestCase
):
455 self
._trace
= bt2
.Trace()
456 self
._sc
= bt2
.StreamClass()
457 self
._ec
= bt2
.EventClass('salut')
458 self
._clock
_class
= bt2
.ClockClass('yo', 1000)
459 self
._uint
64_int
_fc
= bt2
.IntegerFieldClass(64, mapped_clock_class
=self
._clock
_class
)
460 self
._my
_int
_fc
= bt2
.IntegerFieldClass(32)
461 self
._ec
.payload_field_class
= bt2
.StructureFieldClass()
462 self
._ec
.payload_field_class
+= collections
.OrderedDict([
463 ('my_int', self
._my
_int
_fc
),
465 self
._sc
.add_event_class(self
._ec
)
466 self
._sc
.packet_context_field_class
= bt2
.StructureFieldClass()
467 self
._sc
.packet_context_field_class
+= collections
.OrderedDict([
468 ('events_discarded', self
._my
_int
_fc
),
469 ('timestamp_begin', self
._uint
64_int
_fc
),
470 ('timestamp_end', self
._uint
64_int
_fc
),
472 self
._trace
.add_clock_class(self
._clock
_class
)
473 self
._trace
.add_stream_class(self
._sc
)
474 self
._stream
= self
._sc
()
480 del self
._clock
_class
481 del self
._uint
64_int
_fc
485 def _create_event(self
, packet
):
487 event
.payload_field
['my_int'] = 23
488 event
.packet
= packet
492 class MyIter(bt2
._UserMessageIterator
):
493 def __init__(iter_self
):
494 packet1
= self
._stream
.create_packet()
495 packet1
.context_field
['events_discarded'] = 0
496 packet1
.context_field
['timestamp_begin'] = 3
497 packet1
.context_field
['timestamp_end'] = 6
498 packet2
= self
._stream
.create_packet()
499 packet2
.context_field
['events_discarded'] = 10
500 packet2
.context_field
['timestamp_begin'] = 7
501 packet2
.context_field
['timestamp_end'] = 10
502 iter_self
._ev
1 = self
._create
_event
(packet1
)
503 iter_self
._ev
2 = self
._create
_event
(packet2
)
508 msg
= bt2
.EventMessage(self
._ev
1)
510 msg
= bt2
.EventMessage(self
._ev
2)
517 class MySource(bt2
._UserSourceComponent
,
518 message_iterator_class
=MyIter
):
519 def __init__(self
, params
):
520 self
._add
_output
_port
('out')
522 class MySink(bt2
._UserSinkComponent
):
523 def __init__(self
, params
):
524 self
._add
_input
_port
('in')
526 def _consume(comp_self
):
528 msg
= next(comp_self
._msg
_iter
)
530 if type(msg
) is bt2
._DiscardedEventsMessage
:
534 def _port_connected(self
, port
, other_port
):
535 self
._msg
_iter
= port
.connection
.create_message_iterator()
539 src
= graph
.add_component(MySource
, 'src')
540 sink
= graph
.add_component(MySink
, 'sink')
541 conn
= graph
.connect_ports(src
.output_ports
['out'],
542 sink
.input_ports
['in'])
546 def test_create(self
):
547 self
.assertIsInstance(self
._get
_msg
(), bt2
._DiscardedEventsMessage
)
549 def test_count(self
):
550 self
.assertEqual(self
._get
_msg
().count
, 10)
552 def test_stream(self
):
553 self
.assertEqual(self
._get
_msg
().stream
.addr
, self
._stream
.addr
)
555 def test_beginning_clock_snapshot(self
):
556 msg
= self
._get
_msg
()
557 beginning_clock_snapshot
= msg
.beginning_clock_snapshot
558 self
.assertEqual(beginning_clock_snapshot
.clock_class
, self
._clock
_class
)
559 self
.assertEqual(beginning_clock_snapshot
, 6)
561 def test_end_clock_snapshot(self
):
562 msg
= self
._get
_msg
()
563 end_clock_snapshot
= msg
.end_clock_snapshot
564 self
.assertEqual(end_clock_snapshot
.clock_class
, self
._clock
_class
)
565 self
.assertEqual(end_clock_snapshot
, 10)
568 msg1
= self
._get
_msg
()
569 msg2
= self
._get
_msg
()
570 self
.assertEqual(msg1
, msg2
)
572 def test_eq_invalid(self
):
573 msg1
= self
._get
_msg
()
574 self
.assertNotEqual(msg1
, 23)