@staticmethod
def _check_has_default_clock_class(clock_class):
if clock_class is None:
- raise bt2.NonexistentClockSnapshot('cannot get default clock snapshot: stream class has no default clock class')
+ raise bt2.NonexistentClockSnapshot(
+ 'cannot get default clock snapshot: stream class has no default clock class'
+ )
class _MessageWithDefaultClockSnapshot:
snapshot_ptr = borrow_clock_snapshot_ptr(self._ptr)
return bt2.clock_snapshot._ClockSnapshot._create_from_ptr_and_get_ref(
- snapshot_ptr, self._ptr, self._get_ref, self._put_ref)
+ snapshot_ptr, self._ptr, self._get_ref, self._put_ref
+ )
class _EventMessage(_Message, _MessageWithDefaultClockSnapshot):
- _borrow_default_clock_snapshot_ptr = staticmethod(native_bt.message_event_borrow_default_clock_snapshot_const)
+ _borrow_default_clock_snapshot_ptr = staticmethod(
+ native_bt.message_event_borrow_default_clock_snapshot_const
+ )
@property
def default_clock_snapshot(self):
- self._check_has_default_clock_class(self.event.packet.stream.cls.default_clock_class)
+ self._check_has_default_clock_class(self.event.stream.cls.default_clock_class)
return self._get_default_clock_snapshot(self._borrow_default_clock_snapshot_ptr)
@property
event_ptr = native_bt.message_event_borrow_event(self._ptr)
assert event_ptr is not None
return bt2.event._Event._create_from_ptr_and_get_ref(
- event_ptr, self._ptr, self._get_ref, self._put_ref)
+ event_ptr, self._ptr, self._get_ref, self._put_ref
+ )
class _PacketMessage(_Message, _MessageWithDefaultClockSnapshot):
class _PacketBeginningMessage(_PacketMessage):
_borrow_packet_ptr = staticmethod(native_bt.message_packet_beginning_borrow_packet)
- _borrow_default_clock_snapshot_ptr = staticmethod(native_bt.message_packet_beginning_borrow_default_clock_snapshot_const)
+ _borrow_default_clock_snapshot_ptr = staticmethod(
+ native_bt.message_packet_beginning_borrow_default_clock_snapshot_const
+ )
class _PacketEndMessage(_PacketMessage):
_borrow_packet_ptr = staticmethod(native_bt.message_packet_end_borrow_packet)
- _borrow_default_clock_snapshot_ptr = staticmethod(native_bt.message_packet_end_borrow_default_clock_snapshot_const)
+ _borrow_default_clock_snapshot_ptr = staticmethod(
+ native_bt.message_packet_end_borrow_default_clock_snapshot_const
+ )
-class _StreamMessage(_Message):
+class _StreamMessage(_Message, _MessageWithDefaultClockSnapshot):
@property
def stream(self):
stream_ptr = self._borrow_stream_ptr(self._ptr)
assert stream_ptr
return bt2.stream._Stream._create_from_ptr_and_get_ref(stream_ptr)
-
-class _StreamBeginningMessage(_StreamMessage):
- _borrow_stream_ptr = staticmethod(native_bt.message_stream_beginning_borrow_stream)
-
-
-class _StreamEndMessage(_StreamMessage):
- _borrow_stream_ptr = staticmethod(native_bt.message_stream_end_borrow_stream)
-
-
-# Specific type to pass an unknown clock snapshot when creating a stream
-# beginning/end message.
-class _StreamActivityMessageUnknownClockSnapshot:
- pass
-
-
-# Specific type to pass an infinite clock snapshot when creating a
-# stream beginning/end message.
-class _StreamActivityMessageInfiniteClockSnapshot:
- pass
-
-
-class _StreamActivityMessage(_Message):
@property
def default_clock_snapshot(self):
+ self._check_has_default_clock_class(self.stream.cls.default_clock_class)
+
status, snapshot_ptr = self._borrow_default_clock_snapshot_ptr(self._ptr)
- if status == native_bt.MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN:
- cs_type = bt2.clock_snapshot._ClockSnapshot
- assert snapshot_ptr is not None
- return cs_type._create_from_ptr_and_get_ref(snapshot_ptr, self._ptr,
- self._get_ref, self._put_ref)
- elif status == native_bt.MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_UNKNOWN:
+ if status == native_bt.MESSAGE_STREAM_CLOCK_SNAPSHOT_STATE_UNKNOWN:
return bt2.clock_snapshot._UnknownClockSnapshot()
- elif status == native_bt.MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_INFINITE:
- return bt2.clock_snapshot._InfiniteClockSnapshot()
- else:
- raise bt2.Error('cannot borrow default clock snapshot from message')
-
- def _default_clock_snapshot(self, value):
- if type(value) is _StreamActivityMessageUnknownClockSnapshot:
- self._set_default_clock_snapshot_state(self._ptr, native_bt.MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_UNKNOWN)
- elif type(value) is _StreamActivityMessageInfiniteClockSnapshot:
- self._set_default_clock_snapshot_state(self._ptr, native_bt.MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_INFINITE)
- else:
- assert utils._is_uint64(value)
- self._set_default_clock_snapshot_ptr(self._ptr, value)
- _default_clock_snapshot = property(fset=_default_clock_snapshot)
+ return bt2.clock_snapshot._ClockSnapshot._create_from_ptr_and_get_ref(
+ snapshot_ptr, self._ptr, self._get_ref, self._put_ref
+ )
- @property
- def stream(self):
- stream_ptr = self._borrow_stream_ptr(self._ptr)
- assert stream_ptr
- return bt2.stream._Stream._create_from_ptr_and_get_ref(stream_ptr)
+ def _default_clock_snapshot(self, raw_value):
+ utils._check_uint64(raw_value)
+ self._set_default_clock_snapshot(self._ptr, raw_value)
+
+ _default_clock_snapshot = property(fset=_default_clock_snapshot)
-class _StreamActivityBeginningMessage(_StreamActivityMessage):
- _borrow_default_clock_snapshot_ptr = staticmethod(native_bt.message_stream_activity_beginning_borrow_default_clock_snapshot_const)
- _set_default_clock_snapshot_ptr = staticmethod(native_bt.message_stream_activity_beginning_set_default_clock_snapshot)
- _set_default_clock_snapshot_state = staticmethod(native_bt.message_stream_activity_beginning_set_default_clock_snapshot_state)
- _borrow_stream_ptr = staticmethod(native_bt.message_stream_activity_beginning_borrow_stream)
+class _StreamBeginningMessage(_StreamMessage):
+ _borrow_stream_ptr = staticmethod(native_bt.message_stream_beginning_borrow_stream)
+ _borrow_default_clock_snapshot_ptr = staticmethod(
+ native_bt.message_stream_beginning_borrow_default_clock_snapshot_const
+ )
+ _set_default_clock_snapshot = staticmethod(
+ native_bt.message_stream_beginning_set_default_clock_snapshot
+ )
-class _StreamActivityEndMessage(_StreamActivityMessage):
- _borrow_default_clock_snapshot_ptr = staticmethod(native_bt.message_stream_activity_end_borrow_default_clock_snapshot_const)
- _set_default_clock_snapshot_ptr = staticmethod(native_bt.message_stream_activity_end_set_default_clock_snapshot)
- _set_default_clock_snapshot_state = staticmethod(native_bt.message_stream_activity_end_set_default_clock_snapshot_state)
- _borrow_stream_ptr = staticmethod(native_bt.message_stream_activity_end_borrow_stream)
+class _StreamEndMessage(_StreamMessage):
+ _borrow_stream_ptr = staticmethod(native_bt.message_stream_end_borrow_stream)
+ _borrow_default_clock_snapshot_ptr = staticmethod(
+ native_bt.message_stream_end_borrow_default_clock_snapshot_const
+ )
+ _set_default_clock_snapshot = staticmethod(
+ native_bt.message_stream_end_set_default_clock_snapshot
+ )
class _MessageIteratorInactivityMessage(_Message, _MessageWithDefaultClockSnapshot):
- _borrow_default_clock_snapshot_ptr = staticmethod(native_bt.message_message_iterator_inactivity_borrow_default_clock_snapshot_const)
+ _borrow_default_clock_snapshot_ptr = staticmethod(
+ native_bt.message_message_iterator_inactivity_borrow_default_clock_snapshot_const
+ )
@property
def default_clock_snapshot(self):
def _check_has_default_clock_snapshots(self):
if not self._has_default_clock_snapshots:
- raise bt2.NonexistentClockSnapshot('cannot get default clock snapshot: such a message has no clock snapshots for this stream class')
+ raise bt2.NonexistentClockSnapshot(
+ 'cannot get default clock snapshot: such a message has no clock snapshots for this stream class'
+ )
@property
def beginning_default_clock_snapshot(self):
self._check_has_default_clock_snapshots()
- return self._get_default_clock_snapshot(self._borrow_beginning_clock_snapshot_ptr)
+ return self._get_default_clock_snapshot(
+ self._borrow_beginning_clock_snapshot_ptr
+ )
@property
def end_default_clock_snapshot(self):
class _DiscardedEventsMessage(_DiscardedMessage):
- _borrow_stream_ptr = staticmethod(native_bt.message_discarded_events_borrow_stream_const)
+ _borrow_stream_ptr = staticmethod(
+ native_bt.message_discarded_events_borrow_stream_const
+ )
_get_count = staticmethod(native_bt.message_discarded_events_get_count)
_set_count = staticmethod(native_bt.message_discarded_events_set_count)
- _borrow_beginning_clock_snapshot_ptr = staticmethod(native_bt.message_discarded_events_borrow_beginning_default_clock_snapshot_const)
- _borrow_end_clock_snapshot_ptr = staticmethod(native_bt.message_discarded_events_borrow_end_default_clock_snapshot_const)
+ _borrow_beginning_clock_snapshot_ptr = staticmethod(
+ native_bt.message_discarded_events_borrow_beginning_default_clock_snapshot_const
+ )
+ _borrow_end_clock_snapshot_ptr = staticmethod(
+ native_bt.message_discarded_events_borrow_end_default_clock_snapshot_const
+ )
@property
def _has_default_clock_snapshots(self):
class _DiscardedPacketsMessage(_DiscardedMessage):
- _borrow_stream_ptr = staticmethod(native_bt.message_discarded_packets_borrow_stream_const)
+ _borrow_stream_ptr = staticmethod(
+ native_bt.message_discarded_packets_borrow_stream_const
+ )
_get_count = staticmethod(native_bt.message_discarded_packets_get_count)
_set_count = staticmethod(native_bt.message_discarded_packets_set_count)
- _borrow_beginning_clock_snapshot_ptr = staticmethod(native_bt.message_discarded_packets_borrow_beginning_default_clock_snapshot_const)
- _borrow_end_clock_snapshot_ptr = staticmethod(native_bt.message_discarded_packets_borrow_end_default_clock_snapshot_const)
+ _borrow_beginning_clock_snapshot_ptr = staticmethod(
+ native_bt.message_discarded_packets_borrow_beginning_default_clock_snapshot_const
+ )
+ _borrow_end_clock_snapshot_ptr = staticmethod(
+ native_bt.message_discarded_packets_borrow_end_default_clock_snapshot_const
+ )
@property
def _has_default_clock_snapshots(self):
native_bt.MESSAGE_TYPE_STREAM_END: _StreamEndMessage,
native_bt.MESSAGE_TYPE_PACKET_BEGINNING: _PacketBeginningMessage,
native_bt.MESSAGE_TYPE_PACKET_END: _PacketEndMessage,
- native_bt.MESSAGE_TYPE_STREAM_ACTIVITY_BEGINNING: _StreamActivityBeginningMessage,
- native_bt.MESSAGE_TYPE_STREAM_ACTIVITY_END: _StreamActivityEndMessage,
native_bt.MESSAGE_TYPE_DISCARDED_EVENTS: _DiscardedEventsMessage,
native_bt.MESSAGE_TYPE_DISCARDED_PACKETS: _DiscardedPacketsMessage,
}