| 1 | from collections import OrderedDict |
| 2 | from bt2 import values |
| 3 | import unittest |
| 4 | import copy |
| 5 | import bt2 |
| 6 | |
| 7 | |
class StreamTestCase(unittest.TestCase):
    # Tests for the attributes, equality and copy behaviour of a stream
    # object obtained by calling a stream class.

    def setUp(self):
        # Stream with the default name, shared by the attribute tests.
        self._stream = self._create_stream()

    @staticmethod
    def _struct_ft(*fields):
        """Return a new structure field type holding *fields*.

        Each item of *fields* is a ``(name, field_type)`` pair; the
        pairs are added through an ``OrderedDict`` so that the given
        order is the one handed to the structure field type.
        """
        struct_ft = bt2.StructureFieldType()
        struct_ft += OrderedDict(fields)
        return struct_ft

    def _create_stream(self, name='my_stream'):
        """Build a trace, stream class and event class; return a stream.

        The stream is created by calling the stream class with
        ``name=name``.
        """
        # event header
        event_header_ft = self._struct_ft(
            ('id', bt2.IntegerFieldType(8)),
            ('ts', bt2.IntegerFieldType(32)),
        )

        # stream event context
        stream_event_context_ft = self._struct_ft(
            ('cpu_id', bt2.IntegerFieldType(8)),
            ('stuff', bt2.FloatingPointNumberFieldType()),
        )

        # packet context
        packet_context_ft = self._struct_ft(
            ('something', bt2.IntegerFieldType(8)),
            ('something_else', bt2.FloatingPointNumberFieldType()),
        )

        # stream class
        stream_class = bt2.StreamClass()
        stream_class.packet_context_field_type = packet_context_ft
        stream_class.event_header_field_type = event_header_ft
        stream_class.event_context_field_type = stream_event_context_ft

        # event context
        event_context_ft = self._struct_ft(
            ('ant', bt2.IntegerFieldType(16, is_signed=True)),
            ('msg', bt2.StringFieldType()),
        )

        # event payload
        payload_ft = self._struct_ft(
            ('giraffe', bt2.IntegerFieldType(32)),
            ('gnu', bt2.IntegerFieldType(8)),
            ('mosquito', bt2.IntegerFieldType(8)),
        )

        # event class, registered with the stream class
        event_class = bt2.EventClass('ec')
        event_class.context_field_type = event_context_ft
        event_class.payload_field_type = payload_ft
        stream_class.add_event_class(event_class)

        # packet header
        packet_header_ft = self._struct_ft(
            ('magic', bt2.IntegerFieldType(32)),
            ('stream_id', bt2.IntegerFieldType(16)),
        )

        # trace: the stream class is added to it before the stream is
        # created
        trace = bt2.Trace()
        trace.packet_header_field_type = packet_header_ft
        trace.add_stream_class(stream_class)

        # stream
        return stream_class(name=name)

    def test_attr_stream_class(self):
        # The stream exposes a stream class that is not None.
        stream_class = self._stream.stream_class
        self.assertIsNotNone(stream_class)

    def test_attr_name(self):
        # The name is the default one passed by _create_stream().
        stream_name = self._stream.name
        self.assertEqual(stream_name, 'my_stream')

    def test_eq(self):
        # Two independently built streams with the same name are equal.
        first = self._create_stream()
        second = self._create_stream()
        self.assertEqual(first, second)

    def test_ne_name(self):
        # Streams which differ only by the requested name are not equal.
        default_named = self._create_stream()
        other_named = self._create_stream('lel')
        self.assertNotEqual(default_named, other_named)

    def test_eq_invalid(self):
        # Comparing with `==` against a non-stream value yields a falsy
        # result.
        is_equal = self._stream == 23
        self.assertFalse(is_equal)

    def _test_copy(self, func):
        """Check that ``func(stream)`` gives a distinct but equal stream.

        *func* is the copy function under test; its result must be
        another object, with another ``addr``, which still compares
        equal to the original stream.
        """
        original = self._create_stream()
        duplicate = func(original)
        self.assertIsNot(original, duplicate)
        self.assertNotEqual(original.addr, duplicate.addr)
        self.assertEqual(original, duplicate)

    def test_copy(self):
        # Shallow copy.
        self._test_copy(copy.copy)

    def test_deepcopy(self):
        # Deep copy.
        self._test_copy(copy.deepcopy)