_borrow_stream_ptr = staticmethod(native_bt.message_stream_end_borrow_stream)
+# Specific type to pass an unknown clock snapshot when creating a stream
+# beginning/end message.
+class _StreamActivityMessageUnknownClockSnapshot:
+ pass
+
+
+# Specific type to pass an infinite clock snapshot when creating a
+# stream beginning/end message.
+class _StreamActivityMessageInfiniteClockSnapshot:
+ pass
+
+
class _StreamActivityMessage(_Message):
@property
def default_clock_snapshot(self):
- self._check_has_default_clock_class(self.stream.cls.default_clock_class)
status, snapshot_ptr = self._borrow_default_clock_snapshot_ptr(self._ptr)
if status == native_bt.MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN:
- snapshot_type = bt2.clock_snapshot._ClockSnapshot
+ cs_type = bt2.clock_snapshot._ClockSnapshot
+ assert snapshot_ptr is not None
+ return cs_type._create_from_ptr_and_get_ref(snapshot_ptr, self._ptr,
+ self._get_ref, self._put_ref)
elif status == native_bt.MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_UNKNOWN:
- snapshot_type = bt2.clock_snapshot._UnknownClockSnapshot
+ return bt2.clock_snapshot._UnknownClockSnapshot()
elif status == native_bt.MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_INFINITE:
- snapshot_type = bt2.clock_snapshot._InfiniteClockSnapshot
+ return bt2.clock_snapshot._InfiniteClockSnapshot()
else:
raise bt2.Error('cannot borrow default clock snapshot from message')
- assert snapshot_ptr is not None
-
- return snapshot_type._create_from_ptr_and_get_ref(
- snapshot_ptr, self._ptr, self._get_ref, self._put_ref)
-
def _default_clock_snapshot(self, value):
- self._set_default_clock_snapshot_ptr(self._ptr, value)
+ if type(value) is _StreamActivityMessageUnknownClockSnapshot:
+ self._set_default_clock_snapshot_state(self._ptr, native_bt.MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_UNKNOWN)
+ elif type(value) is _StreamActivityMessageInfiniteClockSnapshot:
+ self._set_default_clock_snapshot_state(self._ptr, native_bt.MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_INFINITE)
+ else:
+ assert utils._is_uint64(value)
+ self._set_default_clock_snapshot_ptr(self._ptr, value)
_default_clock_snapshot = property(fset=_default_clock_snapshot)
class _StreamActivityBeginningMessage(_StreamActivityMessage):
_borrow_default_clock_snapshot_ptr = staticmethod(native_bt.message_stream_activity_beginning_borrow_default_clock_snapshot_const)
_set_default_clock_snapshot_ptr = staticmethod(native_bt.message_stream_activity_beginning_set_default_clock_snapshot)
+ _set_default_clock_snapshot_state = staticmethod(native_bt.message_stream_activity_beginning_set_default_clock_snapshot_state)
_borrow_stream_ptr = staticmethod(native_bt.message_stream_activity_beginning_borrow_stream)
class _StreamActivityEndMessage(_StreamActivityMessage):
_borrow_default_clock_snapshot_ptr = staticmethod(native_bt.message_stream_activity_end_borrow_default_clock_snapshot_const)
_set_default_clock_snapshot_ptr = staticmethod(native_bt.message_stream_activity_end_set_default_clock_snapshot)
+ _set_default_clock_snapshot_state = staticmethod(native_bt.message_stream_activity_end_set_default_clock_snapshot_state)
_borrow_stream_ptr = staticmethod(native_bt.message_stream_activity_end_borrow_stream)