| 1 | from collections import OrderedDict |
| 2 | from bt2 import values |
| 3 | import unittest |
| 4 | import copy |
| 5 | import bt2 |
| 6 | |
| 7 | |
@unittest.skip("this is broken")
class StreamTestCase(unittest.TestCase):
    """Checks on the stream object returned by instantiating a stream class.

    The whole case is currently disabled through ``unittest.skip``.
    Each test works on streams produced by ``_create_stream()``, which
    assembles a fresh stream class (attached to a fresh trace) every
    time it is called.
    """

    def setUp(self):
        """Create the default fixture stream (default name, ID 23)."""
        self._stream = self._create_stream(stream_id=23)

    def tearDown(self):
        """Drop the reference to the fixture stream."""
        del self._stream

    def _create_stream(self, name='my_stream', stream_id=None):
        """Build a stream class, attach it to a trace, and instantiate it.

        :param name: name handed to the stream class call.
        :param stream_id: value handed as ``id`` to the stream class call.
        :returns: whatever calling the stream class with ``name`` and
            ``id`` yields (the stream under test).
        """
        # event header
        event_header_ft = bt2.StructureFieldType()
        event_header_ft += OrderedDict([
            ('id', bt2.IntegerFieldType(8)),
            ('ts', bt2.IntegerFieldType(32)),
        ])

        # stream event context
        stream_event_ctx_ft = bt2.StructureFieldType()
        stream_event_ctx_ft += OrderedDict([
            ('cpu_id', bt2.IntegerFieldType(8)),
            ('stuff', bt2.FloatingPointNumberFieldType()),
        ])

        # packet context
        packet_ctx_ft = bt2.StructureFieldType()
        packet_ctx_ft += OrderedDict([
            ('something', bt2.IntegerFieldType(8)),
            ('something_else', bt2.FloatingPointNumberFieldType()),
        ])

        # stream class
        stream_class = bt2.StreamClass()
        stream_class.packet_context_field_type = packet_ctx_ft
        stream_class.event_header_field_type = event_header_ft
        stream_class.event_context_field_type = stream_event_ctx_ft

        # event context
        event_ctx_ft = bt2.StructureFieldType()
        event_ctx_ft += OrderedDict([
            ('ant', bt2.IntegerFieldType(16, is_signed=True)),
            ('msg', bt2.StringFieldType()),
        ])

        # event payload
        event_payload_ft = bt2.StructureFieldType()
        event_payload_ft += OrderedDict([
            ('giraffe', bt2.IntegerFieldType(32)),
            ('gnu', bt2.IntegerFieldType(8)),
            ('mosquito', bt2.IntegerFieldType(8)),
        ])

        # event class
        event_class = bt2.EventClass('ec')
        event_class.context_field_type = event_ctx_ft
        event_class.payload_field_type = event_payload_ft
        stream_class.add_event_class(event_class)

        # packet header
        packet_header_ft = bt2.StructureFieldType()
        packet_header_ft += OrderedDict([
            ('magic', bt2.IntegerFieldType(32)),
            ('stream_id', bt2.IntegerFieldType(16)),
        ])

        # trace class
        trace = bt2.Trace()
        trace.packet_header_field_type = packet_header_ft
        trace.add_stream_class(stream_class)

        # stream
        return stream_class(name=name, id=stream_id)

    def test_attr_stream_class(self):
        """The fixture stream exposes a non-None ``stream_class``."""
        self.assertIsNotNone(self._stream.stream_class)

    def test_attr_name(self):
        """The fixture stream reports the default name."""
        self.assertEqual(self._stream.name, 'my_stream')

    def test_eq(self):
        """Two streams built with identical arguments compare equal."""
        left = self._create_stream(stream_id=17)
        right = self._create_stream(stream_id=17)
        self.assertEqual(left, right)

    def test_ne_name(self):
        """Streams that differ only by name compare unequal."""
        left = self._create_stream(stream_id=17)
        right = self._create_stream(name='lel', stream_id=17)
        self.assertNotEqual(left, right)

    def test_ne_id(self):
        """Streams that differ only by ID compare unequal."""
        left = self._create_stream(stream_id=17)
        right = self._create_stream(stream_id=23)
        self.assertNotEqual(left, right)

    def test_eq_invalid(self):
        """Comparing a stream with a plain integer using ``==`` is falsy."""
        # Deliberately spelled with ``==`` so the equality operator itself
        # is what gets exercised.
        equal_to_int = self._stream == 23
        self.assertFalse(equal_to_int)

    def _test_copy(self, func):
        """Copy a new stream with ``func`` and check the result.

        The copy must be a different Python object, report a different
        ``addr``, and still compare equal to the stream it came from.

        :param func: one-argument callable returning a copy of the stream.
        """
        original = self._create_stream()
        duplicate = func(original)
        self.assertIsNot(original, duplicate)
        self.assertNotEqual(original.addr, duplicate.addr)
        self.assertEqual(original, duplicate)

    def test_copy(self):
        """Run the copy checks with ``copy.copy``."""
        self._test_copy(copy.copy)

    def test_deepcopy(self):
        """Run the copy checks with ``copy.deepcopy``."""
        self._test_copy(copy.deepcopy)