| 1 | from collections import OrderedDict |
| 2 | from bt2 import values |
| 3 | import unittest |
| 4 | import copy |
| 5 | import bt2 |
| 6 | |
| 7 | |
class StreamTestCase(unittest.TestCase):
    """Checks the attributes, equality and copy behaviour of a bt2 stream.

    Each test works on streams produced by ``_create_stream()``, which
    builds a fresh stream class (attached to a fresh trace) for every call.
    """

    @staticmethod
    def _struct_ft(*members):
        """Return a new structure field type filled with ``members``.

        Each member is a ``(name, field_type)`` pair; the pairs are added
        in the order given.
        """
        struct_ft = bt2.StructureFieldType()
        struct_ft += OrderedDict(members)
        return struct_ft

    def setUp(self):
        # Fixture stream used by the attribute tests; the name is spelled
        # out because test_attr_name() compares against it.
        self._stream = self._create_stream(name='my_stream', stream_id=23)

    def tearDown(self):
        # Drop the fixture's reference to the stream after each test.
        del self._stream

    def _create_stream(self, name='my_stream', stream_id=None):
        """Create and return a stream named ``name`` with ID ``stream_id``.

        A new stream class (with one event class) and a new trace are built
        on every call; the stream is instantiated from that stream class
        once it has been added to the trace.
        """
        # field types, one structure per scope
        event_header_ft = self._struct_ft(
            ('id', bt2.IntegerFieldType(8)),
            ('ts', bt2.IntegerFieldType(32)),
        )
        stream_event_context_ft = self._struct_ft(
            ('cpu_id', bt2.IntegerFieldType(8)),
            ('stuff', bt2.FloatingPointNumberFieldType()),
        )
        packet_context_ft = self._struct_ft(
            ('something', bt2.IntegerFieldType(8)),
            ('something_else', bt2.FloatingPointNumberFieldType()),
        )
        event_context_ft = self._struct_ft(
            ('ant', bt2.IntegerFieldType(16, is_signed=True)),
            ('msg', bt2.StringFieldType()),
        )
        event_payload_ft = self._struct_ft(
            ('giraffe', bt2.IntegerFieldType(32)),
            ('gnu', bt2.IntegerFieldType(8)),
            ('mosquito', bt2.IntegerFieldType(8)),
        )
        packet_header_ft = self._struct_ft(
            ('magic', bt2.IntegerFieldType(32)),
            ('stream_id', bt2.IntegerFieldType(16)),
        )

        # stream class
        stream_class = bt2.StreamClass()
        stream_class.packet_context_field_type = packet_context_ft
        stream_class.event_header_field_type = event_header_ft
        stream_class.event_context_field_type = stream_event_context_ft

        # event class, registered with the stream class
        event_class = bt2.EventClass('ec')
        event_class.context_field_type = event_context_ft
        event_class.payload_field_type = event_payload_ft
        stream_class.add_event_class(event_class)

        # trace class, which takes the stream class
        trace = bt2.Trace()
        trace.packet_header_field_type = packet_header_ft
        trace.add_stream_class(stream_class)

        # stream: calling the stream class instantiates it
        return stream_class(name=name, id=stream_id)

    def test_attr_stream_class(self):
        # The stream must expose a stream class.
        stream_class = self._stream.stream_class
        self.assertIsNotNone(stream_class)

    def test_attr_name(self):
        # The name given at creation time is readable back.
        self.assertEqual(self._stream.name, 'my_stream')

    def test_eq(self):
        # Two independently built streams with the same name and ID
        # compare equal.
        lhs = self._create_stream(stream_id=17)
        rhs = self._create_stream(stream_id=17)
        self.assertEqual(lhs, rhs)

    def test_ne_name(self):
        # Same ID, different names: not equal.
        lhs = self._create_stream(stream_id=17)
        rhs = self._create_stream(name='lel', stream_id=17)
        self.assertNotEqual(lhs, rhs)

    def test_ne_id(self):
        # Same name, different IDs: not equal.
        lhs = self._create_stream(stream_id=17)
        rhs = self._create_stream(stream_id=23)
        self.assertNotEqual(lhs, rhs)

    def test_eq_invalid(self):
        # Comparing with a non-stream object goes through `==` on purpose
        # and must yield a falsy result.
        is_equal = self._stream == 23
        self.assertFalse(is_equal)

    def _test_copy(self, func):
        """Copy a fresh stream with ``func`` and check the result.

        The copy must be a distinct Python object with a distinct ``addr``,
        yet compare equal to the source stream.
        """
        source = self._create_stream()
        duplicate = func(source)
        self.assertIsNot(source, duplicate)
        self.assertNotEqual(source.addr, duplicate.addr)
        self.assertEqual(source, duplicate)

    def test_copy(self):
        self._test_copy(copy.copy)

    def test_deepcopy(self):
        self._test_copy(copy.deepcopy)