_get_ref = staticmethod(native_bt.message_get_ref)
_put_ref = staticmethod(native_bt.message_put_ref)
+ @staticmethod
+ def _check_has_default_clock_class(clock_class):
+ if clock_class is None:
+ raise bt2.NonexistentClockSnapshot('cannot get default clock snapshot: stream class has no default clock class')
+
class _MessageWithDefaultClockSnapshot:
def _get_default_clock_snapshot(self, borrow_clock_snapshot_ptr):
- if not self._has_default_clock_class:
- raise bt2.NoDefaultClockClass('cannot get default clock snapshot, stream class has no default clock class')
-
snapshot_ptr = borrow_clock_snapshot_ptr(self._ptr)
return bt2.clock_snapshot._ClockSnapshot._create_from_ptr_and_get_ref(
class _EventMessage(_Message, _MessageWithDefaultClockSnapshot):
_borrow_default_clock_snapshot_ptr = staticmethod(native_bt.message_event_borrow_default_clock_snapshot_const)
- @property
- def _has_default_clock_class(self):
- return self.event.packet.stream.stream_class.default_clock_class is not None
-
@property
def default_clock_snapshot(self):
+ self._check_has_default_clock_class(self.event.packet.stream.stream_class.default_clock_class)
return self._get_default_clock_snapshot(self._borrow_default_clock_snapshot_ptr)
@property
class _PacketMessage(_Message, _MessageWithDefaultClockSnapshot):
- @property
- def _has_default_clock_class(self):
- return self.packet.stream.stream_class.default_clock_class is not None
-
@property
def default_clock_snapshot(self):
+ self._check_has_default_clock_class(self.packet.stream.stream_class.default_clock_class)
return self._get_default_clock_snapshot(self._borrow_default_clock_snapshot_ptr)
@property
class _StreamActivityMessage(_Message):
@property
def default_clock_snapshot(self):
- if self.stream.stream_class.default_clock_class is None:
- raise bt2.NoDefaultClockClass('cannot get default clock snapshot, stream class has no default clock class')
-
+ self._check_has_default_clock_class(self.stream.stream_class.default_clock_class)
status, snapshot_ptr = self._borrow_default_clock_snapshot_ptr(self._ptr)
if status == native_bt.MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN:
class _MessageIteratorInactivityMessage(_Message, _MessageWithDefaultClockSnapshot):
- # This kind of message always has a default clock class.
- _has_default_clock_class = True
_borrow_default_clock_snapshot_ptr = staticmethod(native_bt.message_message_iterator_inactivity_borrow_default_clock_snapshot_const)
@property
def default_clock_snapshot(self):
+ # This kind of message always has a default clock class: no
+ # need to call self._check_has_default_clock_class() here.
return self._get_default_clock_snapshot(self._borrow_default_clock_snapshot_ptr)
assert stream_ptr
return bt2.stream._Stream._create_from_ptr_and_get_ref(stream_ptr)
- @property
- def _has_default_clock_class(self):
- return self.default_clock_class is not None
-
- @property
- def default_clock_class(self):
- cc_ptr = self._borrow_clock_class_ptr(self._ptr)
- if cc_ptr is not None:
- return bt2.clock_class._ClockClass._create_from_ptr_and_get_ref(cc_ptr)
-
@property
def count(self):
avail, count = self._get_count(self._ptr)
_count = property(fset=_set_count)
+ def _check_has_default_clock_snapshots(self):
+ if not self._has_default_clock_snapshots:
+ raise bt2.NonexistentClockSnapshot('cannot get default clock snapshot: such a message has no clock snapshots for this stream class')
+
@property
def beginning_default_clock_snapshot(self):
+ self._check_has_default_clock_snapshots()
return self._get_default_clock_snapshot(self._borrow_beginning_clock_snapshot_ptr)
@property
def end_default_clock_snapshot(self):
+ self._check_has_default_clock_snapshots()
return self._get_default_clock_snapshot(self._borrow_end_clock_snapshot_ptr)
_borrow_stream_ptr = staticmethod(native_bt.message_discarded_events_borrow_stream_const)
_get_count = staticmethod(native_bt.message_discarded_events_get_count)
_set_count = staticmethod(native_bt.message_discarded_events_set_count)
- _borrow_clock_class_ptr = staticmethod(native_bt.message_discarded_events_borrow_stream_class_default_clock_class_const)
- _borrow_beginning_clock_snapshot_ptr = staticmethod(native_bt.message_discarded_events_borrow_default_beginning_clock_snapshot_const)
- _borrow_end_clock_snapshot_ptr = staticmethod(native_bt.message_discarded_events_borrow_default_end_clock_snapshot_const)
+ _borrow_beginning_clock_snapshot_ptr = staticmethod(native_bt.message_discarded_events_borrow_beginning_default_clock_snapshot_const)
+ _borrow_end_clock_snapshot_ptr = staticmethod(native_bt.message_discarded_events_borrow_end_default_clock_snapshot_const)
+
+ @property
+ def _has_default_clock_snapshots(self):
+ return self.stream.stream_class.discarded_events_have_default_clock_snapshots
class _DiscardedPacketsMessage(_DiscardedMessage):
_borrow_stream_ptr = staticmethod(native_bt.message_discarded_packets_borrow_stream_const)
_get_count = staticmethod(native_bt.message_discarded_packets_get_count)
_set_count = staticmethod(native_bt.message_discarded_packets_set_count)
- _borrow_clock_class_ptr = staticmethod(native_bt.message_discarded_packets_borrow_stream_class_default_clock_class_const)
- _borrow_beginning_clock_snapshot_ptr = staticmethod(native_bt.message_discarded_packets_borrow_default_beginning_clock_snapshot_const)
- _borrow_end_clock_snapshot_ptr = staticmethod(native_bt.message_discarded_packets_borrow_default_end_clock_snapshot_const)
+ _borrow_beginning_clock_snapshot_ptr = staticmethod(native_bt.message_discarded_packets_borrow_beginning_default_clock_snapshot_const)
+ _borrow_end_clock_snapshot_ptr = staticmethod(native_bt.message_discarded_packets_borrow_end_default_clock_snapshot_const)
+
+ @property
+ def _has_default_clock_snapshots(self):
+ return self.stream.stream_class.discarded_packets_have_default_clock_snapshots
_MESSAGE_TYPE_TO_CLS = {