from bt2 import native_bt, object, utils
import bt2.clock_snapshot
-import collections
import bt2.packet
import bt2.stream
import bt2.event
-import copy
import bt2
def _create_from_ptr(ptr):
msg_type = native_bt.message_get_type(ptr)
- cls = None
if msg_type not in _MESSAGE_TYPE_TO_CLS:
raise bt2.Error('unknown message type: {}'.format(msg_type))
return _MESSAGE_TYPE_TO_CLS[msg_type]._create_from_ptr(ptr)
-def _msg_types_from_msg_classes(message_types):
- if message_types is None:
- msg_types = None
- else:
- for msg_cls in message_types:
- if msg_cls not in _MESSAGE_TYPE_TO_CLS.values():
- raise ValueError("'{}' is not a message class".format(msg_cls))
-
- msg_types = [msg_cls._TYPE for msg_cls in message_types]
-
- return msg_types
-
-
class _Message(object._SharedObject):
_get_ref = staticmethod(native_bt.message_get_ref)
_put_ref = staticmethod(native_bt.message_put_ref)
-class _CopyableMessage(_Message):
- def __copy__(self):
- return self._copy(lambda obj: obj)
+class _MessageWithDefaultClockSnapshot:
+ def _get_default_clock_snapshot(self, borrow_clock_snapshot_ptr):
+ if not self._has_default_clock_class:
+ raise bt2.NoDefaultClockClass('cannot get default clock snapshot, stream class has no default clock class')
- def __deepcopy__(self, memo):
- cpy = self._copy(copy.deepcopy)
- memo[id(self)] = cpy
- return cpy
+ snapshot_ptr = borrow_clock_snapshot_ptr(self._ptr)
+ return bt2.clock_snapshot._ClockSnapshot._create_from_ptr_and_get_ref(
+ snapshot_ptr, self._ptr, self._get_ref, self._put_ref)
-class _EventMessage(_CopyableMessage):
- _TYPE = native_bt.MESSAGE_TYPE_EVENT
+
+class _EventMessage(_Message, _MessageWithDefaultClockSnapshot):
+ _borrow_default_clock_snapshot_ptr = staticmethod(native_bt.message_event_borrow_default_clock_snapshot_const)
+
+ @property
+ def _has_default_clock_class(self):
+ return self.event.packet.stream.stream_class.default_clock_class is not None
+
+ @property
+ def default_clock_snapshot(self):
+ return self._get_default_clock_snapshot(self._borrow_default_clock_snapshot_ptr)
@property
def event(self):
return bt2.event._Event._create_from_ptr_and_get_ref(
event_ptr, self._ptr, self._get_ref, self._put_ref)
- @property
- def default_clock_snapshot(self):
- if self.event.event_class.stream_class.default_clock_class is None:
- return None
-
- snapshot_ptr = native_bt.message_event_borrow_default_clock_snapshot_const(self._ptr)
-
- return bt2.clock_snapshot._ClockSnapshot._create_from_ptr_and_get_ref(
- snapshot_ptr, self._ptr, self._get_ref, self._put_ref)
+class _PacketMessage(_Message, _MessageWithDefaultClockSnapshot):
@property
- def clock_class_priority_map(self):
- cc_prio_map_ptr = native_bt.message_event_get_clock_class_priority_map(self._ptr)
- assert(cc_prio_map_ptr)
- return bt2.clock_class_priority_map.ClockClassPriorityMap._create_from_ptr(cc_prio_map_ptr)
-
- def __eq__(self, other):
- if type(other) is not type(self):
- return False
-
- if self.addr == other.addr:
- return True
-
- self_props = (
- self.event,
- self.clock_class_priority_map,
- )
- other_props = (
- other.event,
- other.clock_class_priority_map,
- )
- return self_props == other_props
-
- def _copy(self, copy_func):
- # We can always use references here because those properties are
- # frozen anyway if they are part of a message. Since the
- # user cannot modify them after copying the message, it's
- # useless to copy/deep-copy them.
- return EventMessage(self.event, self.clock_class_priority_map)
-
-
-class _PacketBeginningMessage(_CopyableMessage):
- _TYPE = native_bt.MESSAGE_TYPE_PACKET_BEGINNING
+ def _has_default_clock_class(self):
+ return self.packet.stream.stream_class.default_clock_class is not None
@property
- def packet(self):
- packet_ptr = native_bt.message_packet_begin_get_packet(self._ptr)
- assert(packet_ptr)
- return bt2.packet._Packet._create_from_ptr(packet_ptr)
-
- def __eq__(self, other):
- if type(other) is not type(self):
- return False
-
- if self.addr == other.addr:
- return True
-
- return self.packet == other.packet
-
- def _copy(self, copy_func):
- # We can always use references here because those properties are
- # frozen anyway if they are part of a message. Since the
- # user cannot modify them after copying the message, it's
- # useless to copy/deep-copy them.
- return PacketBeginningMessage(self.packet)
-
-
-class _PacketEndMessage(_CopyableMessage):
- _TYPE = native_bt.MESSAGE_TYPE_PACKET_END
+ def default_clock_snapshot(self):
+ return self._get_default_clock_snapshot(self._borrow_default_clock_snapshot_ptr)
@property
def packet(self):
- packet_ptr = native_bt.message_packet_end_get_packet(self._ptr)
- assert(packet_ptr)
- return bt2.packet._Packet._create_from_ptr(packet_ptr)
-
- def __eq__(self, other):
- if type(other) is not type(self):
- return False
+ packet_ptr = self._borrow_packet_ptr(self._ptr)
+ assert packet_ptr is not None
+ return bt2.packet._Packet._create_from_ptr_and_get_ref(packet_ptr)
- if self.addr == other.addr:
- return True
- return self.packet == other.packet
+class _PacketBeginningMessage(_PacketMessage):
+ _borrow_packet_ptr = staticmethod(native_bt.message_packet_beginning_borrow_packet)
+ _borrow_default_clock_snapshot_ptr = staticmethod(native_bt.message_packet_beginning_borrow_default_clock_snapshot_const)
- def _copy(self, copy_func):
- # We can always use references here because those properties are
- # frozen anyway if they are part of a message. Since the
- # user cannot modify them after copying the message, it's
- # useless to copy/deep-copy them.
- return PacketEndMessage(self.packet)
+class _PacketEndMessage(_PacketMessage):
+ _borrow_packet_ptr = staticmethod(native_bt.message_packet_end_borrow_packet)
+ _borrow_default_clock_snapshot_ptr = staticmethod(native_bt.message_packet_end_borrow_default_clock_snapshot_const)
-class _StreamBeginningMessage(_CopyableMessage):
- _TYPE = native_bt.MESSAGE_TYPE_STREAM_BEGINNING
+class _StreamMessage(_Message):
@property
def stream(self):
- stream_ptr = native_bt.message_stream_begin_get_stream(self._ptr)
- assert(stream_ptr)
- return bt2.stream._create_from_ptr(stream_ptr)
-
- def __eq__(self, other):
- if type(other) is not type(self):
- return False
+ stream_ptr = self._borrow_stream_ptr(self._ptr)
+ assert stream_ptr
+ return bt2.stream._Stream._create_from_ptr_and_get_ref(stream_ptr)
- if self.addr == other.addr:
- return True
- return self.stream == other.stream
+class _StreamBeginningMessage(_StreamMessage):
+ _borrow_stream_ptr = staticmethod(native_bt.message_stream_beginning_borrow_stream)
- def _copy(self, copy_func):
- # We can always use references here because those properties are
- # frozen anyway if they are part of a message. Since the
- # user cannot modify them after copying the message, it's
- # useless to copy/deep-copy them.
- return StreamBeginningMessage(self.stream)
+class _StreamEndMessage(_StreamMessage):
+ _borrow_stream_ptr = staticmethod(native_bt.message_stream_end_borrow_stream)
-class _StreamEndMessage(_CopyableMessage):
- _TYPE = native_bt.MESSAGE_TYPE_STREAM_END
+class _StreamActivityMessage(_Message):
@property
- def stream(self):
- stream_ptr = native_bt.message_stream_end_get_stream(self._ptr)
- assert(stream_ptr)
- return bt2.stream._create_from_ptr(stream_ptr)
-
- def __eq__(self, other):
- if type(other) is not type(self):
- return False
-
- if self.addr == other.addr:
- return True
-
- return self.stream == other.stream
-
- def _copy(self, copy_func):
- # We can always use references here because those properties are
- # frozen anyway if they are part of a message. Since the
- # user cannot modify them after copying the message, it's
- # useless to copy/deep-copy them.
- return StreamEndMessage(self.stream)
-
-
-class _InactivityMessageClockSnapshotsIterator(collections.abc.Iterator):
- def __init__(self, msg_clock_snapshots):
- self._msg_clock_snapshots = msg_clock_snapshots
- self._clock_classes = list(msg_clock_snapshots._msg.clock_class_priority_map)
- self._at = 0
-
- def __next__(self):
- if self._at == len(self._clock_classes):
- raise StopIteration
-
- self._at += 1
- return self._clock_classes[at]
-
-
-class _InactivityMessageClockSnapshots(collections.abc.Mapping):
- def __init__(self, msg):
- self._msg = msg
-
- def __getitem__(self, clock_class):
- utils._check_type(clock_class, bt2.ClockClass)
- clock_snapshot_ptr = native_bt.message_inactivity_get_clock_snapshot(self._msg._ptr,
- clock_class._ptr)
-
- if clock_snapshot_ptr is None:
- return
-
- clock_snapshot = bt2.clock_snapshot._create_clock_snapshot_from_ptr(clock_snapshot_ptr)
- return clock_snapshot
-
- def add(self, clock_snapshot):
- utils._check_type(clock_snapshot, bt2.clock_snapshot._ClockSnapshot)
- ret = native_bt.message_inactivity_set_clock_snapshot(self._msg._ptr,
- clock_snapshot._ptr)
- utils._handle_ret(ret, "cannot set inactivity message object's clock value")
-
- def __len__(self):
- return len(self._msg.clock_class_priority_map)
-
- def __iter__(self):
- return _InactivityMessageClockSnapshotsIterator(self)
-
+ def default_clock_snapshot(self):
+ if self.stream.stream_class.default_clock_class is None:
+ raise bt2.NoDefaultClockClass('cannot get default clock snapshot, stream class has no default clock class')
-class InactivityMessage(_CopyableMessage):
- _TYPE = native_bt.MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY
+ status, snapshot_ptr = self._borrow_default_clock_snapshot_ptr(self._ptr)
- def __init__(self, cc_prio_map=None):
- if cc_prio_map is not None:
- utils._check_type(cc_prio_map, bt2.clock_class_priority_map.ClockClassPriorityMap)
- cc_prio_map_ptr = cc_prio_map._ptr
+ if status == native_bt.MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN:
+ snapshot_type = bt2.clock_snapshot._ClockSnapshot
+ elif status == native_bt.MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_UNKNOWN:
+ snapshot_type = bt2.clock_snapshot._UnknownClockSnapshot
+ elif status == native_bt.MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_INFINITE:
+ snapshot_type = bt2.clock_snapshot._InfiniteClockSnapshot
else:
- cc_prio_map_ptr = None
+ raise bt2.Error('cannot borrow default clock snapshot from message')
- ptr = native_bt.message_inactivity_create(cc_prio_map_ptr)
+ assert snapshot_ptr is not None
- if ptr is None:
- raise bt2.CreationError('cannot create inactivity message object')
+ return snapshot_type._create_from_ptr_and_get_ref(
+ snapshot_ptr, self._ptr, self._get_ref, self._put_ref)
- super().__init__(ptr)
+ def _default_clock_snapshot(self, value):
+ self._set_default_clock_snapshot_ptr(self._ptr, value)
- @property
- def clock_class_priority_map(self):
- cc_prio_map_ptr = native_bt.message_inactivity_get_clock_class_priority_map(self._ptr)
- assert(cc_prio_map_ptr)
- return bt2.clock_class_priority_map.ClockClassPriorityMap._create_from_ptr(cc_prio_map_ptr)
+ _default_clock_snapshot = property(fset=_default_clock_snapshot)
@property
- def clock_snapshots(self):
- return _InactivityMessageClockSnapshots(self)
-
- def _get_clock_snapshots(self):
- clock_snapshots = {}
-
- for clock_class, clock_snapshot in self.clock_snapshots.items():
- if clock_snapshot is None:
- continue
-
- clock_snapshots[clock_class] = clock_snapshot
-
- return clock_snapshots
-
- def __eq__(self, other):
- if type(other) is not type(self):
- return False
-
- if self.addr == other.addr:
- return True
-
- self_props = (
- self.clock_class_priority_map,
- self._get_clock_snapshots(),
- )
- other_props = (
- other.clock_class_priority_map,
- other._get_clock_snapshots(),
- )
- return self_props == other_props
-
- def __copy__(self):
- cpy = InactivityMessage(self.clock_class_priority_map)
-
- for clock_class, clock_snapshot in self.clock_snapshots.items():
- if clock_snapshot is None:
- continue
-
- cpy.clock_snapshots.add(clock_snapshot)
-
- return cpy
-
- def __deepcopy__(self, memo):
- cc_prio_map_cpy = copy.deepcopy(self.clock_class_priority_map)
- cpy = InactivityMessage(cc_prio_map_cpy)
-
- # copy clock values
- for orig_clock_class in self.clock_class_priority_map:
- orig_clock_snapshot = self.clock_snapshot(orig_clock_class)
-
- if orig_clock_snapshot is None:
- continue
-
- # find equivalent, copied clock class in CC priority map copy
- for cpy_clock_class in cc_prio_map_cpy:
- if cpy_clock_class == orig_clock_class:
- break
-
- # create copy of clock value from copied clock class
- clock_snapshot_cpy = cpy_clock_class(orig_clock_snapshot.cycles)
-
- # set copied clock value in message copy
- cpy.clock_snapshots.add(clock_snapshot_cpy)
-
- memo[id(self)] = cpy
- return cpy
+ def stream(self):
+ stream_ptr = self._borrow_stream_ptr(self._ptr)
+ assert stream_ptr
+ return bt2.stream._Stream._create_from_ptr_and_get_ref(stream_ptr)
-class _DiscardedElementsMessage(_Message):
- def __eq__(self, other):
- if type(other) is not type(self):
- return False
+class _StreamActivityBeginningMessage(_StreamActivityMessage):
+ _borrow_default_clock_snapshot_ptr = staticmethod(native_bt.message_stream_activity_beginning_borrow_default_clock_snapshot_const)
+ _set_default_clock_snapshot_ptr = staticmethod(native_bt.message_stream_activity_beginning_set_default_clock_snapshot)
+ _borrow_stream_ptr = staticmethod(native_bt.message_stream_activity_beginning_borrow_stream)
- if self.addr == other.addr:
- return True
- self_props = (
- self.count,
- self.stream,
- self.beginning_clock_snapshot,
- self.end_clock_snapshot,
- )
- other_props = (
- other.count,
- other.stream,
- other.beginning_clock_snapshot,
- other.end_clock_snapshot,
- )
- return self_props == other_props
+class _StreamActivityEndMessage(_StreamActivityMessage):
+ _borrow_default_clock_snapshot_ptr = staticmethod(native_bt.message_stream_activity_end_borrow_default_clock_snapshot_const)
+ _set_default_clock_snapshot_ptr = staticmethod(native_bt.message_stream_activity_end_set_default_clock_snapshot)
+ _borrow_stream_ptr = staticmethod(native_bt.message_stream_activity_end_borrow_stream)
-class _DiscardedPacketsMessage(_DiscardedElementsMessage):
- _TYPE = native_bt.MESSAGE_TYPE_DISCARDED_PACKETS
+class _MessageIteratorInactivityMessage(_Message, _MessageWithDefaultClockSnapshot):
+ # This kind of message always has a default clock class.
+ _has_default_clock_class = True
+ _borrow_default_clock_snapshot_ptr = staticmethod(native_bt.message_message_iterator_inactivity_borrow_default_clock_snapshot_const)
@property
- def count(self):
- count = native_bt.message_discarded_packets_get_count(self._ptr)
- assert(count >= 0)
- return count
+ def default_clock_snapshot(self):
+ return self._get_default_clock_snapshot(self._borrow_default_clock_snapshot_ptr)
+
+class _DiscardedMessage(_Message, _MessageWithDefaultClockSnapshot):
@property
def stream(self):
- stream_ptr = native_bt.message_discarded_packets_get_stream(self._ptr)
- assert(stream_ptr)
- return bt2.stream._create_from_ptr(stream_ptr)
+ stream_ptr = self._borrow_stream_ptr(self._ptr)
+ assert stream_ptr
+ return bt2.stream._Stream._create_from_ptr_and_get_ref(stream_ptr)
@property
- def beginning_clock_snapshot(self):
- clock_snapshot_ptr = native_bt.message_discarded_packets_get_begin_clock_snapshot(self._ptr)
-
- if clock_snapshot_ptr is None:
- return
-
- clock_snapshot = bt2.clock_snapshot._create_clock_snapshot_from_ptr(clock_snapshot_ptr)
- return clock_snapshot
+ def _has_default_clock_class(self):
+ return self.default_clock_class is not None
@property
- def end_clock_snapshot(self):
- clock_snapshot_ptr = native_bt.message_discarded_packets_get_end_clock_snapshot(self._ptr)
-
- if clock_snapshot_ptr is None:
- return
-
- clock_snapshot = bt2.clock_snapshot._create_clock_snapshot_from_ptr(clock_snapshot_ptr)
- return clock_snapshot
-
-
-class _DiscardedEventsMessage(_DiscardedElementsMessage):
- _TYPE = native_bt.MESSAGE_TYPE_DISCARDED_EVENTS
+ def default_clock_class(self):
+ cc_ptr = self._borrow_clock_class_ptr(self._ptr)
+ if cc_ptr is not None:
+ return bt2.clock_class._ClockClass._create_from_ptr_and_get_ref(cc_ptr)
@property
def count(self):
- count = native_bt.message_discarded_events_get_count(self._ptr)
- assert(count >= 0)
- return count
+ avail, count = self._get_count(self._ptr)
+ if avail is native_bt.PROPERTY_AVAILABILITY_AVAILABLE:
+ return count
- @property
- def stream(self):
- stream_ptr = native_bt.message_discarded_events_get_stream(self._ptr)
- assert(stream_ptr)
- return bt2.stream._create_from_ptr(stream_ptr)
+ def _set_count(self, count):
+ utils._check_uint64(count)
+ self._set_count(self._ptr, count)
+
+ _count = property(fset=_set_count)
@property
- def beginning_clock_snapshot(self):
- clock_snapshot_ptr = native_bt.message_discarded_events_get_begin_clock_snapshot(self._ptr)
+ def beginning_default_clock_snapshot(self):
+ return self._get_default_clock_snapshot(self._borrow_beginning_clock_snapshot_ptr)
- if clock_snapshot_ptr is None:
- return
+ @property
+ def end_default_clock_snapshot(self):
+ return self._get_default_clock_snapshot(self._borrow_end_clock_snapshot_ptr)
- clock_snapshot = bt2.clock_snapshot._create_clock_snapshot_from_ptr(clock_snapshot_ptr)
- return clock_snapshot
- @property
- def end_clock_snapshot(self):
- clock_snapshot_ptr = native_bt.message_discarded_events_get_end_clock_snapshot(self._ptr)
+class _DiscardedEventsMessage(_DiscardedMessage):
+ _borrow_stream_ptr = staticmethod(native_bt.message_discarded_events_borrow_stream_const)
+ _get_count = staticmethod(native_bt.message_discarded_events_get_count)
+ _set_count = staticmethod(native_bt.message_discarded_events_set_count)
+ _borrow_clock_class_ptr = staticmethod(native_bt.message_discarded_events_borrow_stream_class_default_clock_class_const)
+ _borrow_beginning_clock_snapshot_ptr = staticmethod(native_bt.message_discarded_events_borrow_default_beginning_clock_snapshot_const)
+ _borrow_end_clock_snapshot_ptr = staticmethod(native_bt.message_discarded_events_borrow_default_end_clock_snapshot_const)
- if clock_snapshot_ptr is None:
- return
- clock_snapshot = bt2.clock_snapshot._create_clock_snapshot_from_ptr(clock_snapshot_ptr)
- return clock_snapshot
+class _DiscardedPacketsMessage(_DiscardedMessage):
+ _borrow_stream_ptr = staticmethod(native_bt.message_discarded_packets_borrow_stream_const)
+ _get_count = staticmethod(native_bt.message_discarded_packets_get_count)
+ _set_count = staticmethod(native_bt.message_discarded_packets_set_count)
+ _borrow_clock_class_ptr = staticmethod(native_bt.message_discarded_packets_borrow_stream_class_default_clock_class_const)
+ _borrow_beginning_clock_snapshot_ptr = staticmethod(native_bt.message_discarded_packets_borrow_default_beginning_clock_snapshot_const)
+ _borrow_end_clock_snapshot_ptr = staticmethod(native_bt.message_discarded_packets_borrow_default_end_clock_snapshot_const)
_MESSAGE_TYPE_TO_CLS = {
native_bt.MESSAGE_TYPE_EVENT: _EventMessage,
- native_bt.MESSAGE_TYPE_PACKET_BEGINNING: _PacketBeginningMessage,
- native_bt.MESSAGE_TYPE_PACKET_END: _PacketEndMessage,
+ native_bt.MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY: _MessageIteratorInactivityMessage,
native_bt.MESSAGE_TYPE_STREAM_BEGINNING: _StreamBeginningMessage,
native_bt.MESSAGE_TYPE_STREAM_END: _StreamEndMessage,
- native_bt.MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY: InactivityMessage,
- native_bt.MESSAGE_TYPE_DISCARDED_PACKETS: _DiscardedPacketsMessage,
+ native_bt.MESSAGE_TYPE_PACKET_BEGINNING: _PacketBeginningMessage,
+ native_bt.MESSAGE_TYPE_PACKET_END: _PacketEndMessage,
+ native_bt.MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING: _StreamActivityBeginningMessage,
+ native_bt.MESSAGE_TYPE_STREAM_ACTIVITY_END: _StreamActivityEndMessage,
native_bt.MESSAGE_TYPE_DISCARDED_EVENTS: _DiscardedEventsMessage,
+ native_bt.MESSAGE_TYPE_DISCARDED_PACKETS: _DiscardedPacketsMessage,
}
-from bt2 import value
import collections
import unittest
-import copy
import bt2
-class _MessageTestCase(unittest.TestCase):
+class AllMessagesTestCase(unittest.TestCase):
def setUp(self):
- self._trace = bt2.Trace()
- self._sc = bt2.StreamClass()
- self._ec = bt2.EventClass('salut')
- self._my_int_fc = bt2.IntegerFieldClass(32)
- self._ec.payload_field_class = bt2.StructureFieldClass()
- self._ec.payload_field_class += collections.OrderedDict([
- ('my_int', self._my_int_fc),
- ])
- self._sc.add_event_class(self._ec)
- self._clock_class = bt2.ClockClass('allo', 1000)
- self._trace.add_clock_class(self._clock_class)
- self._trace.packet_header_field_class = bt2.StructureFieldClass()
- self._trace.packet_header_field_class += collections.OrderedDict([
- ('hello', self._my_int_fc),
- ])
- self._trace.add_stream_class(self._sc)
- self._cc_prio_map = bt2.ClockClassPriorityMap()
- self._cc_prio_map[self._clock_class] = 231
- self._stream = self._sc()
- self._packet = self._stream.create_packet()
- self._packet.header_field['hello'] = 19487
- self._event = self._ec()
- self._event.clock_snapshots.add(self._clock_class(1772))
- self._event.payload_field['my_int'] = 23
- self._event.packet = self._packet
- def tearDown(self):
- del self._trace
- del self._sc
- del self._ec
- del self._my_int_fc
- del self._clock_class
- del self._cc_prio_map
- del self._stream
- del self._packet
- del self._event
-
-
-@unittest.skip("this is broken")
-class EventMessageTestCase(_MessageTestCase):
- def test_create_no_cc_prio_map(self):
- msg = bt2.EventMessage(self._event)
- self.assertEqual(msg.event.addr, self._event.addr)
- self.assertEqual(len(msg.clock_class_priority_map), 0)
-
- def test_create_with_cc_prio_map(self):
- msg = bt2.EventMessage(self._event, self._cc_prio_map)
- self.assertEqual(msg.event.addr, self._event.addr)
- self.assertEqual(len(msg.clock_class_priority_map), 1)
- self.assertEqual(msg.clock_class_priority_map.highest_priority_clock_class.addr,
- self._clock_class.addr)
- self.assertEqual(msg.clock_class_priority_map[self._clock_class], 231)
-
- def test_eq(self):
- msg = bt2.EventMessage(self._event, self._cc_prio_map)
- event_copy = copy.copy(self._event)
- event_copy.packet = self._packet
- cc_prio_map_copy = copy.copy(self._cc_prio_map)
- msg2 = bt2.EventMessage(event_copy, cc_prio_map_copy)
- self.assertEqual(msg, msg2)
-
- def test_ne_event(self):
- msg = bt2.EventMessage(self._event, self._cc_prio_map)
- event_copy = copy.copy(self._event)
- event_copy.payload_field['my_int'] = 17
- event_copy.packet = self._packet
- cc_prio_map_copy = copy.copy(self._cc_prio_map)
- msg2 = bt2.EventMessage(event_copy, cc_prio_map_copy)
- self.assertNotEqual(msg, msg2)
-
- def test_ne_cc_prio_map(self):
- msg = bt2.EventMessage(self._event)
- event_copy = copy.copy(self._event)
- event_copy.packet = self._packet
- cc_prio_map_copy = copy.copy(self._cc_prio_map)
- msg2 = bt2.EventMessage(event_copy, cc_prio_map_copy)
- self.assertNotEqual(msg, msg2)
-
- def test_eq_invalid(self):
- msg = bt2.EventMessage(self._event)
- self.assertNotEqual(msg, 23)
-
- def test_copy(self):
- msg = bt2.EventMessage(self._event, self._cc_prio_map)
- msg2 = copy.copy(msg)
- self.assertEqual(msg, msg2)
-
- def test_deepcopy(self):
- msg = bt2.EventMessage(self._event, self._cc_prio_map)
- msg2 = copy.deepcopy(msg)
- self.assertEqual(msg, msg2)
-
-
-@unittest.skip("this is broken")
-class PacketBeginningMessageTestCase(_MessageTestCase):
- def test_create(self):
- msg = bt2.PacketBeginningMessage(self._packet)
- self.assertEqual(msg.packet.addr, self._packet.addr)
-
- def test_eq(self):
- msg = bt2.PacketBeginningMessage(self._packet)
- packet_copy = copy.copy(self._packet)
- msg2 = bt2.PacketBeginningMessage(packet_copy)
- self.assertEqual(msg, msg2)
-
- def test_ne_packet(self):
- msg = bt2.PacketBeginningMessage(self._packet)
- packet_copy = copy.copy(self._packet)
- packet_copy.header_field['hello'] = 1847
- msg2 = bt2.PacketBeginningMessage(packet_copy)
- self.assertNotEqual(msg, msg2)
-
- def test_eq_invalid(self):
- msg = bt2.PacketBeginningMessage(self._packet)
- self.assertNotEqual(msg, 23)
-
- def test_copy(self):
- msg = bt2.PacketBeginningMessage(self._packet)
- msg2 = copy.copy(msg)
- self.assertEqual(msg, msg2)
-
- def test_deepcopy(self):
- msg = bt2.PacketBeginningMessage(self._packet)
- msg2 = copy.deepcopy(msg)
- self.assertEqual(msg, msg2)
-
-
-@unittest.skip("this is broken")
-class PacketEndMessageTestCase(_MessageTestCase):
- def test_create(self):
- msg = bt2.PacketEndMessage(self._packet)
- self.assertEqual(msg.packet.addr, self._packet.addr)
-
- def test_eq(self):
- msg = bt2.PacketEndMessage(self._packet)
- packet_copy = copy.copy(self._packet)
- msg2 = bt2.PacketEndMessage(packet_copy)
- self.assertEqual(msg, msg2)
-
- def test_ne_packet(self):
- msg = bt2.PacketEndMessage(self._packet)
- packet_copy = copy.copy(self._packet)
- packet_copy.header_field['hello'] = 1847
- msg2 = bt2.PacketEndMessage(packet_copy)
- self.assertNotEqual(msg, msg2)
-
- def test_eq_invalid(self):
- msg = bt2.PacketEndMessage(self._packet)
- self.assertNotEqual(msg, 23)
-
- def test_copy(self):
- msg = bt2.PacketEndMessage(self._packet)
- msg2 = copy.copy(msg)
- self.assertEqual(msg, msg2)
-
- def test_deepcopy(self):
- msg = bt2.PacketEndMessage(self._packet)
- msg2 = copy.deepcopy(msg)
- self.assertEqual(msg, msg2)
-
-
-@unittest.skip("this is broken")
-class StreamBeginningMessageTestCase(_MessageTestCase):
- def test_create(self):
- msg = bt2.StreamBeginningMessage(self._stream)
- self.assertEqual(msg.stream.addr, self._stream.addr)
-
- def test_eq(self):
- msg = bt2.StreamBeginningMessage(self._stream)
- stream_copy = copy.copy(self._stream)
- msg2 = bt2.StreamBeginningMessage(stream_copy)
- self.assertEqual(msg, msg2)
-
- def test_ne_stream(self):
- msg = bt2.StreamBeginningMessage(self._stream)
- stream_copy = self._sc(name='salut')
- msg2 = bt2.StreamBeginningMessage(stream_copy)
- self.assertNotEqual(msg, msg2)
-
- def test_eq_invalid(self):
- msg = bt2.StreamBeginningMessage(self._stream)
- self.assertNotEqual(msg, 23)
-
- def test_copy(self):
- msg = bt2.StreamBeginningMessage(self._stream)
- msg2 = copy.copy(msg)
- self.assertEqual(msg, msg2)
-
- def test_deepcopy(self):
- msg = bt2.StreamBeginningMessage(self._stream)
- msg2 = copy.deepcopy(msg)
- self.assertEqual(msg, msg2)
-
-
-@unittest.skip("this is broken")
-class StreamEndMessageTestCase(_MessageTestCase):
- def test_create(self):
- msg = bt2.StreamEndMessage(self._stream)
- self.assertEqual(msg.stream.addr, self._stream.addr)
-
- def test_eq(self):
- msg = bt2.StreamEndMessage(self._stream)
- stream_copy = copy.copy(self._stream)
- msg2 = bt2.StreamEndMessage(stream_copy)
- self.assertEqual(msg, msg2)
-
- def test_ne_stream(self):
- msg = bt2.StreamEndMessage(self._stream)
- stream_copy = self._sc(name='salut')
- msg2 = bt2.StreamEndMessage(stream_copy)
- self.assertNotEqual(msg, msg2)
-
- def test_eq_invalid(self):
- msg = bt2.StreamEndMessage(self._stream)
- self.assertNotEqual(msg, 23)
-
- def test_copy(self):
- msg = bt2.StreamEndMessage(self._stream)
- msg2 = copy.copy(msg)
- self.assertEqual(msg, msg2)
-
- def test_deepcopy(self):
- msg = bt2.StreamEndMessage(self._stream)
- msg2 = copy.deepcopy(msg)
- self.assertEqual(msg, msg2)
-
-
-@unittest.skip("this is broken")
-class InactivityMessageTestCase(unittest.TestCase):
- def setUp(self):
- self._cc1 = bt2.ClockClass('cc1', 1000)
- self._cc2 = bt2.ClockClass('cc2', 2000)
- self._cc_prio_map = bt2.ClockClassPriorityMap()
- self._cc_prio_map[self._cc1] = 25
- self._cc_prio_map[self._cc2] = 50
-
- def tearDown(self):
- del self._cc1
- del self._cc2
- del self._cc_prio_map
-
- def test_create_no_cc_prio_map(self):
- msg = bt2.InactivityMessage()
- self.assertEqual(len(msg.clock_class_priority_map), 0)
-
- def test_create_with_cc_prio_map(self):
- msg = bt2.InactivityMessage(self._cc_prio_map)
- msg.clock_snapshots.add(self._cc1(123))
- msg.clock_snapshots.add(self._cc2(19487))
- self.assertEqual(len(msg.clock_class_priority_map), 2)
- self.assertEqual(msg.clock_class_priority_map, self._cc_prio_map)
- self.assertEqual(msg.clock_snapshots[self._cc1], 123)
- self.assertEqual(msg.clock_snapshots[self._cc2], 19487)
-
- def test_eq(self):
- msg = bt2.InactivityMessage(self._cc_prio_map)
- msg.clock_snapshots.add(self._cc1(123))
- msg.clock_snapshots.add(self._cc2(19487))
- cc_prio_map_copy = copy.copy(self._cc_prio_map)
- msg2 = bt2.InactivityMessage(cc_prio_map_copy)
- msg2.clock_snapshots.add(self._cc1(123))
- msg2.clock_snapshots.add(self._cc2(19487))
- self.assertEqual(msg, msg2)
-
- def test_ne_cc_prio_map(self):
- msg = bt2.InactivityMessage(self._cc_prio_map)
- msg.clock_snapshots.add(self._cc1(123))
- msg.clock_snapshots.add(self._cc2(19487))
- cc_prio_map_copy = copy.copy(self._cc_prio_map)
- cc_prio_map_copy[self._cc2] = 23
- msg2 = bt2.InactivityMessage(cc_prio_map_copy)
- self.assertNotEqual(msg, msg2)
-
- def test_ne_clock_snapshot(self):
- msg = bt2.InactivityMessage(self._cc_prio_map)
- msg.clock_snapshots.add(self._cc1(123))
- msg.clock_snapshots.add(self._cc2(19487))
- msg2 = bt2.InactivityMessage(self._cc_prio_map)
- msg.clock_snapshots.add(self._cc1(123))
- msg.clock_snapshots.add(self._cc2(1847))
- self.assertNotEqual(msg, msg2)
-
- def test_eq_invalid(self):
- msg = bt2.InactivityMessage(self._cc_prio_map)
- self.assertNotEqual(msg, 23)
-
- def test_copy(self):
- msg = bt2.InactivityMessage(self._cc_prio_map)
- msg.clock_snapshots.add(self._cc1(123))
- msg.clock_snapshots.add(self._cc2(19487))
- msg_copy = copy.copy(msg)
- self.assertEqual(msg, msg_copy)
- self.assertNotEqual(msg.addr, msg_copy.addr)
- self.assertEqual(msg.clock_class_priority_map.addr,
- msg_copy.clock_class_priority_map.addr)
- self.assertEqual(msg_copy.clock_snapshots[self._cc1], 123)
- self.assertEqual(msg_copy.clock_snapshots[self._cc2], 19487)
-
- def test_deepcopy(self):
- msg = bt2.InactivityMessage(self._cc_prio_map)
- msg.clock_snapshots.add(self._cc1(123))
- msg.clock_snapshots.add(self._cc2(19487))
- msg_copy = copy.deepcopy(msg)
- self.assertEqual(msg, msg_copy)
- self.assertNotEqual(msg.addr, msg_copy.addr)
- self.assertNotEqual(msg.clock_class_priority_map.addr,
- msg_copy.clock_class_priority_map.addr)
- self.assertEqual(msg.clock_class_priority_map,
- msg_copy.clock_class_priority_map)
- self.assertNotEqual(list(msg.clock_class_priority_map)[0].addr,
- list(msg_copy.clock_class_priority_map)[0].addr)
- self.assertIsNone(msg_copy.clock_snapshots[self._cc1])
- self.assertIsNone(msg_copy.clock_snapshots[self._cc2])
- self.assertEqual(msg_copy.clock_snapshots[list(msg_copy.clock_class_priority_map)[0]], 123)
- self.assertEqual(msg_copy.clock_snapshots[list(msg_copy.clock_class_priority_map)[1]], 19487)
-
-
-@unittest.skip("this is broken")
-class DiscardedPacketsMessageTestCase(unittest.TestCase):
- def setUp(self):
- self._trace = bt2.Trace()
- self._sc = bt2.StreamClass()
- self._ec = bt2.EventClass('salut')
- self._clock_class = bt2.ClockClass('yo', 1000)
- self._uint64_int_fc = bt2.IntegerFieldClass(64, mapped_clock_class=self._clock_class)
- self._my_int_fc = bt2.IntegerFieldClass(32)
- self._ec.payload_field_class = bt2.StructureFieldClass()
- self._ec.payload_field_class += collections.OrderedDict([
- ('my_int', self._my_int_fc),
- ])
- self._sc.add_event_class(self._ec)
- self._sc.packet_context_field_class = bt2.StructureFieldClass()
- self._sc.packet_context_field_class += collections.OrderedDict([
- ('packet_seq_num', self._my_int_fc),
- ('timestamp_begin', self._uint64_int_fc),
- ('timestamp_end', self._uint64_int_fc),
- ])
- self._trace.add_clock_class(self._clock_class)
- self._trace.add_stream_class(self._sc)
- self._stream = self._sc()
-
- def tearDown(self):
- del self._trace
- del self._sc
- del self._ec
- del self._clock_class
- del self._uint64_int_fc
- del self._my_int_fc
- del self._stream
-
- def _create_event(self, packet):
- event = self._ec()
- event.payload_field['my_int'] = 23
- event.packet = packet
- return event
-
- def _get_msg(self):
class MyIter(bt2._UserMessageIterator):
- def __init__(iter_self):
- packet1 = self._stream.create_packet()
- packet1.context_field['packet_seq_num'] = 0
- packet1.context_field['timestamp_begin'] = 3
- packet1.context_field['timestamp_end'] = 6
- packet2 = self._stream.create_packet()
- packet2.context_field['packet_seq_num'] = 5
- packet2.context_field['timestamp_begin'] = 7
- packet2.context_field['timestamp_end'] = 10
- iter_self._ev1 = self._create_event(packet1)
- iter_self._ev2 = self._create_event(packet2)
- iter_self._at = 0
+ def __init__(self):
+ self._at = 0
def __next__(self):
- if self._at == 0:
- msg = bt2.EventMessage(self._ev1)
- elif self._at == 1:
- msg = bt2.EventMessage(self._ev2)
+ if test_obj._clock_class:
+ if self._at == 0:
+ msg = self._create_stream_beginning_message(test_obj._stream)
+ elif self._at == 1:
+ msg = self._create_stream_activity_beginning_message(test_obj._stream, default_clock_snapshot=self._at)
+ elif self._at == 2:
+ msg = self._create_packet_beginning_message(test_obj._packet, self._at)
+ elif self._at == 3:
+ msg = self._create_event_message(test_obj._event_class, test_obj._packet, self._at)
+ elif self._at == 4:
+ msg = self._create_message_iterator_inactivity_message(test_obj._clock_class, self._at)
+ elif self._at == 5:
+ msg = self._create_discarded_events_message(test_obj._stream, 890, self._at, self._at)
+ elif self._at == 6:
+ msg = self._create_packet_end_message(test_obj._packet, self._at)
+ elif self._at == 7:
+ msg = self._create_discarded_packets_message(test_obj._stream, 678, self._at, self._at)
+ elif self._at == 8:
+ msg = self._create_stream_activity_end_message(test_obj._stream, default_clock_snapshot=self._at)
+ elif self._at == 9:
+ msg = self._create_stream_end_message(test_obj._stream)
+ elif self._at >= 10:
+ raise bt2.Stop
else:
- raise bt2.Stop
+ if self._at == 0:
+ msg = self._create_stream_beginning_message(test_obj._stream)
+ elif self._at == 1:
+ msg = self._create_stream_activity_beginning_message(test_obj._stream)
+ elif self._at == 2:
+ msg = self._create_packet_beginning_message(test_obj._packet)
+ elif self._at == 3:
+ msg = self._create_event_message(test_obj._event_class, test_obj._packet)
+ elif self._at == 4:
+ msg = self._create_discarded_events_message(test_obj._stream, 890)
+ elif self._at == 5:
+ msg = self._create_packet_end_message(test_obj._packet)
+ elif self._at == 6:
+ msg = self._create_discarded_packets_message(test_obj._stream, 678)
+ elif self._at == 7:
+ msg = self._create_stream_activity_end_message(test_obj._stream)
+ elif self._at == 8:
+ msg = self._create_stream_end_message(test_obj._stream)
+ elif self._at >= 9:
+ raise bt2.Stop
self._at += 1
return msg
- class MySource(bt2._UserSourceComponent,
- message_iterator_class=MyIter):
+ class MySrc(bt2._UserSourceComponent, message_iterator_class=MyIter):
def __init__(self, params):
self._add_output_port('out')
- class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
- self._add_input_port('in')
-
- def _consume(comp_self):
- nonlocal the_msg
- msg = next(comp_self._msg_iter)
-
- if type(msg) is bt2._DiscardedPacketsMessage:
- the_msg = msg
- raise bt2.Stop
-
- def _port_connected(self, port, other_port):
- self._msg_iter = port.connection.create_message_iterator()
-
- the_msg = None
- graph = bt2.Graph()
- src = graph.add_component(MySource, 'src')
- sink = graph.add_component(MySink, 'sink')
- conn = graph.connect_ports(src.output_ports['out'],
- sink.input_ports['in'])
- graph.run()
- return the_msg
-
- def test_create(self):
- self.assertIsInstance(self._get_msg(), bt2._DiscardedPacketsMessage)
-
- def test_count(self):
- self.assertEqual(self._get_msg().count, 4)
-
- def test_stream(self):
- self.assertEqual(self._get_msg().stream.addr, self._stream.addr)
-
- def test_beginning_clock_snapshot(self):
- msg = self._get_msg()
- beginning_clock_snapshot = msg.beginning_clock_snapshot
- self.assertEqual(beginning_clock_snapshot.clock_class, self._clock_class)
- self.assertEqual(beginning_clock_snapshot, 6)
-
- def test_end_clock_snapshot(self):
- msg = self._get_msg()
- end_clock_snapshot = msg.end_clock_snapshot
- self.assertEqual(end_clock_snapshot.clock_class, self._clock_class)
- self.assertEqual(end_clock_snapshot, 7)
-
- def test_eq(self):
- msg1 = self._get_msg()
- msg2 = self._get_msg()
- self.assertEqual(msg1, msg2)
-
- def test_eq_invalid(self):
- msg1 = self._get_msg()
- self.assertNotEqual(msg1, 23)
-
-
-@unittest.skip("this is broken")
-class DiscardedEventsMessageTestCase(unittest.TestCase):
- def setUp(self):
- self._trace = bt2.Trace()
- self._sc = bt2.StreamClass()
- self._ec = bt2.EventClass('salut')
- self._clock_class = bt2.ClockClass('yo', 1000)
- self._uint64_int_fc = bt2.IntegerFieldClass(64, mapped_clock_class=self._clock_class)
- self._my_int_fc = bt2.IntegerFieldClass(32)
- self._ec.payload_field_class = bt2.StructureFieldClass()
- self._ec.payload_field_class += collections.OrderedDict([
- ('my_int', self._my_int_fc),
- ])
- self._sc.add_event_class(self._ec)
- self._sc.packet_context_field_class = bt2.StructureFieldClass()
- self._sc.packet_context_field_class += collections.OrderedDict([
- ('events_discarded', self._my_int_fc),
- ('timestamp_begin', self._uint64_int_fc),
- ('timestamp_end', self._uint64_int_fc),
- ])
- self._trace.add_clock_class(self._clock_class)
- self._trace.add_stream_class(self._sc)
- self._stream = self._sc()
-
- def tearDown(self):
- del self._trace
- del self._sc
- del self._ec
- del self._clock_class
- del self._uint64_int_fc
- del self._my_int_fc
- del self._stream
-
- def _create_event(self, packet):
- event = self._ec()
- event.payload_field['my_int'] = 23
- event.packet = packet
- return event
-
- def _get_msg(self):
- class MyIter(bt2._UserMessageIterator):
- def __init__(iter_self):
- packet1 = self._stream.create_packet()
- packet1.context_field['events_discarded'] = 0
- packet1.context_field['timestamp_begin'] = 3
- packet1.context_field['timestamp_end'] = 6
- packet2 = self._stream.create_packet()
- packet2.context_field['events_discarded'] = 10
- packet2.context_field['timestamp_begin'] = 7
- packet2.context_field['timestamp_end'] = 10
- iter_self._ev1 = self._create_event(packet1)
- iter_self._ev2 = self._create_event(packet2)
- iter_self._at = 0
-
- def __next__(self):
- if self._at == 0:
- msg = bt2.EventMessage(self._ev1)
- elif self._at == 1:
- msg = bt2.EventMessage(self._ev2)
+ with_cc = params['with_cc']
+ tc = self._create_trace_class()
+ if with_cc:
+ cc = self._create_clock_class()
+ packets_have_clock_snapshots = True
else:
- raise bt2.Stop
-
- self._at += 1
- return msg
-
- class MySource(bt2._UserSourceComponent,
- message_iterator_class=MyIter):
- def __init__(self, params):
- self._add_output_port('out')
-
- class MySink(bt2._UserSinkComponent):
- def __init__(self, params):
- self._add_input_port('in')
-
- def _consume(comp_self):
- nonlocal the_msg
- msg = next(comp_self._msg_iter)
-
- if type(msg) is bt2._DiscardedEventsMessage:
- the_msg = msg
- raise bt2.Stop
-
- def _port_connected(self, port, other_port):
- self._msg_iter = port.connection.create_message_iterator()
-
- the_msg = None
- graph = bt2.Graph()
- src = graph.add_component(MySource, 'src')
- sink = graph.add_component(MySink, 'sink')
- conn = graph.connect_ports(src.output_ports['out'],
- sink.input_ports['in'])
- graph.run()
- return the_msg
-
- def test_create(self):
- self.assertIsInstance(self._get_msg(), bt2._DiscardedEventsMessage)
-
- def test_count(self):
- self.assertEqual(self._get_msg().count, 10)
-
- def test_stream(self):
- self.assertEqual(self._get_msg().stream.addr, self._stream.addr)
-
- def test_beginning_clock_snapshot(self):
- msg = self._get_msg()
- beginning_clock_snapshot = msg.beginning_clock_snapshot
- self.assertEqual(beginning_clock_snapshot.clock_class, self._clock_class)
- self.assertEqual(beginning_clock_snapshot, 6)
-
- def test_end_clock_snapshot(self):
- msg = self._get_msg()
- end_clock_snapshot = msg.end_clock_snapshot
- self.assertEqual(end_clock_snapshot.clock_class, self._clock_class)
- self.assertEqual(end_clock_snapshot, 10)
-
- def test_eq(self):
- msg1 = self._get_msg()
- msg2 = self._get_msg()
- self.assertEqual(msg1, msg2)
+ cc = None
+ packets_have_clock_snapshots = False
+
+ sc = tc.create_stream_class(default_clock_class=cc,
+ packets_have_default_beginning_clock_snapshot=packets_have_clock_snapshots,
+ packets_have_default_end_clock_snapshot=packets_have_clock_snapshots)
+
+ # Create payload field class
+ my_int_fc = tc.create_signed_integer_field_class(32)
+ payload_fc = tc.create_structure_field_class()
+ payload_fc += collections.OrderedDict([
+ ('my_int', my_int_fc),
+ ])
+
+ ec = sc.create_event_class(name='salut', payload_field_class=payload_fc)
+
+ trace = tc()
+ stream = trace.create_stream(sc)
+ packet = stream.create_packet()
+
+ test_obj._trace = trace
+ test_obj._stream = stream
+ test_obj._packet = packet
+ test_obj._event_class = ec
+ test_obj._clock_class = cc
+
+ test_obj = self
+ self._graph = bt2.Graph()
+ self._src = MySrc
+ self._iter = MyIter
+
+ def test_all_msg_with_cc(self):
+ params = {'with_cc': True}
+ self._src_comp = self._graph.add_component(self._src, 'my_source', params)
+ self._msg_iter = self._graph.create_output_port_message_iterator(self._src_comp.output_ports['out'])
+
+ for i, msg in enumerate(self._msg_iter):
+ if i == 0:
+ self.assertIsInstance(msg, bt2.message._StreamBeginningMessage)
+ self.assertEqual(msg.stream.addr, self._stream.addr)
+ elif i == 1:
+ self.assertIsInstance(msg, bt2.message._StreamActivityBeginningMessage)
+ self.assertEqual(msg.default_clock_snapshot.value, i)
+ elif i == 2:
+ self.assertIsInstance(msg, bt2.message._PacketBeginningMessage)
+ self.assertEqual(msg.packet.addr, self._packet.addr)
+ self.assertEqual(msg.default_clock_snapshot.value, i)
+ elif i == 3:
+ self.assertIsInstance(msg, bt2.message._EventMessage)
+ self.assertEqual(msg.event.event_class.addr, self._event_class.addr)
+ self.assertEqual(msg.default_clock_snapshot.value, i)
+ elif i == 4:
+ self.assertIsInstance(msg, bt2.message._MessageIteratorInactivityMessage)
+ self.assertEqual(msg.default_clock_snapshot.value, i)
+ elif i == 5:
+ self.assertIsInstance(msg, bt2.message._DiscardedEventsMessage)
+ self.assertEqual(msg.stream.addr, self._stream.addr)
+ self.assertEqual(msg.count, 890)
+ self.assertEqual(msg.default_clock_class.addr, self._clock_class.addr)
+ self.assertEqual(msg.beginning_default_clock_snapshot.value, i)
+ self.assertEqual(msg.end_default_clock_snapshot.value, i)
+ elif i == 6:
+ self.assertIsInstance(msg, bt2.message._PacketEndMessage)
+ self.assertEqual(msg.packet.addr, self._packet.addr)
+ self.assertEqual(msg.default_clock_snapshot.value, i)
+ elif i == 7:
+ self.assertIsInstance(msg, bt2.message._DiscardedPacketsMessage)
+ self.assertEqual(msg.stream.addr, self._stream.addr)
+ self.assertEqual(msg.count, 678)
+ self.assertEqual(msg.default_clock_class.addr, self._clock_class.addr)
+ self.assertEqual(msg.beginning_default_clock_snapshot.value, i)
+ self.assertEqual(msg.end_default_clock_snapshot.value, i)
+ elif i == 8:
+ self.assertIsInstance(msg, bt2.message._StreamActivityEndMessage)
+ self.assertEqual(msg.stream.addr, self._stream.addr)
+ self.assertEqual(msg.default_clock_snapshot.value, i)
+ elif i == 9:
+ self.assertIsInstance(msg, bt2.message._StreamEndMessage)
+ self.assertEqual(msg.stream.addr, self._stream.addr)
+ else:
+ raise Exception
+
+ def test_all_msg_without_cc(self):
+ params = {'with_cc': False}
+ self._src_comp = self._graph.add_component(self._src, 'my_source', params)
+ self._msg_iter = self._graph.create_output_port_message_iterator(self._src_comp.output_ports['out'])
+
+ for i, msg in enumerate(self._msg_iter):
+ if i == 0:
+ self.assertIsInstance(msg, bt2.message._StreamBeginningMessage)
+ self.assertEqual(msg.stream.addr, self._stream.addr)
+ elif i == 1:
+ self.assertIsInstance(msg, bt2.message._StreamActivityBeginningMessage)
+ with self.assertRaises(bt2.NoDefaultClockClass):
+ msg.default_clock_snapshot
+ elif i == 2:
+ self.assertIsInstance(msg, bt2.message._PacketBeginningMessage)
+ self.assertEqual(msg.packet.addr, self._packet.addr)
+ elif i == 3:
+ self.assertIsInstance(msg, bt2.message._EventMessage)
+ self.assertEqual(msg.event.event_class.addr, self._event_class.addr)
+ with self.assertRaises(bt2.NoDefaultClockClass):
+ msg.default_clock_snapshot
+ elif i == 4:
+ self.assertIsInstance(msg, bt2.message._DiscardedEventsMessage)
+ self.assertEqual(msg.stream.addr, self._stream.addr)
+ self.assertEqual(msg.count, 890)
+ self.assertIsNone(msg.default_clock_class)
+ with self.assertRaises(bt2.NoDefaultClockClass):
+ msg.beginning_default_clock_snapshot
+ with self.assertRaises(bt2.NoDefaultClockClass):
+ msg.end_default_clock_snapshot
+ elif i == 5:
+ self.assertIsInstance(msg, bt2.message._PacketEndMessage)
+ self.assertEqual(msg.packet.addr, self._packet.addr)
+ elif i == 6:
+ self.assertIsInstance(msg, bt2.message._DiscardedPacketsMessage)
+ self.assertEqual(msg.stream.addr, self._stream.addr)
+ self.assertEqual(msg.count, 678)
+ self.assertIsNone(msg.default_clock_class)
+ with self.assertRaises(bt2.NoDefaultClockClass):
+ msg.beginning_default_clock_snapshot
+ with self.assertRaises(bt2.NoDefaultClockClass):
+ msg.end_default_clock_snapshot
+ elif i == 7:
+ self.assertIsInstance(msg, bt2.message._StreamActivityEndMessage)
+ self.assertEqual(msg.stream.addr, self._stream.addr)
+ with self.assertRaises(bt2.NoDefaultClockClass):
+ msg.default_clock_snapshot
+ elif i == 8:
+ self.assertIsInstance(msg, bt2.message._StreamEndMessage)
+ self.assertEqual(msg.stream.addr, self._stream.addr)
+ else:
+ raise Exception
- def test_eq_invalid(self):
- msg1 = self._get_msg()
- self.assertNotEqual(msg1, 23)