_borrow_stream_ptr = staticmethod(native_bt.message_stream_end_borrow_stream)
+# Specific type to pass an unknown clock snapshot when creating a stream
+# beginning/end message.
+class _StreamActivityMessageUnknownClockSnapshot:
+ pass
+
+
+# Specific type to pass an infinite clock snapshot when creating a
+# stream beginning/end message.
+class _StreamActivityMessageInfiniteClockSnapshot:
+ pass
+
+
class _StreamActivityMessage(_Message):
@property
def default_clock_snapshot(self):
- self._check_has_default_clock_class(self.stream.cls.default_clock_class)
status, snapshot_ptr = self._borrow_default_clock_snapshot_ptr(self._ptr)
if status == native_bt.MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_KNOWN:
- snapshot_type = bt2.clock_snapshot._ClockSnapshot
+ cs_type = bt2.clock_snapshot._ClockSnapshot
+ assert snapshot_ptr is not None
+ return cs_type._create_from_ptr_and_get_ref(snapshot_ptr, self._ptr,
+ self._get_ref, self._put_ref)
elif status == native_bt.MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_UNKNOWN:
- snapshot_type = bt2.clock_snapshot._UnknownClockSnapshot
+ return bt2.clock_snapshot._UnknownClockSnapshot()
elif status == native_bt.MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_INFINITE:
- snapshot_type = bt2.clock_snapshot._InfiniteClockSnapshot
+ return bt2.clock_snapshot._InfiniteClockSnapshot()
else:
raise bt2.Error('cannot borrow default clock snapshot from message')
- assert snapshot_ptr is not None
-
- return snapshot_type._create_from_ptr_and_get_ref(
- snapshot_ptr, self._ptr, self._get_ref, self._put_ref)
-
def _default_clock_snapshot(self, value):
- self._set_default_clock_snapshot_ptr(self._ptr, value)
+ if type(value) is _StreamActivityMessageUnknownClockSnapshot:
+ self._set_default_clock_snapshot_state(self._ptr, native_bt.MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_UNKNOWN)
+ elif type(value) is _StreamActivityMessageInfiniteClockSnapshot:
+ self._set_default_clock_snapshot_state(self._ptr, native_bt.MESSAGE_STREAM_ACTIVITY_CLOCK_SNAPSHOT_STATE_INFINITE)
+ else:
+ assert utils._is_uint64(value)
+ self._set_default_clock_snapshot_ptr(self._ptr, value)
_default_clock_snapshot = property(fset=_default_clock_snapshot)
class _StreamActivityBeginningMessage(_StreamActivityMessage):
_borrow_default_clock_snapshot_ptr = staticmethod(native_bt.message_stream_activity_beginning_borrow_default_clock_snapshot_const)
_set_default_clock_snapshot_ptr = staticmethod(native_bt.message_stream_activity_beginning_set_default_clock_snapshot)
+ _set_default_clock_snapshot_state = staticmethod(native_bt.message_stream_activity_beginning_set_default_clock_snapshot_state)
_borrow_stream_ptr = staticmethod(native_bt.message_stream_activity_beginning_borrow_stream)
class _StreamActivityEndMessage(_StreamActivityMessage):
_borrow_default_clock_snapshot_ptr = staticmethod(native_bt.message_stream_activity_end_borrow_default_clock_snapshot_const)
_set_default_clock_snapshot_ptr = staticmethod(native_bt.message_stream_activity_end_set_default_clock_snapshot)
+ _set_default_clock_snapshot_state = staticmethod(native_bt.message_stream_activity_end_set_default_clock_snapshot_state)
_borrow_stream_ptr = staticmethod(native_bt.message_stream_activity_end_borrow_stream)
ptr = msg._release()
return int(ptr)
- # Validate that the presence or lack of presence of a
- # `default_clock_snapshot` value is valid in the context of `stream_class`.
- @staticmethod
- def _validate_default_clock_snapshot(stream_class, default_clock_snapshot):
- stream_class_has_default_clock_class = stream_class.default_clock_class is not None
-
- if stream_class_has_default_clock_class and default_clock_snapshot is None:
- raise bt2.Error(
- 'stream class has a default clock class, default_clock_snapshot should not be None')
-
- if not stream_class_has_default_clock_class and default_clock_snapshot is not None:
- raise bt2.Error(
- 'stream class has no default clock class, default_clock_snapshot should be None')
-
def _create_event_message(self, event_class, packet, default_clock_snapshot=None):
utils._check_type(event_class, bt2.event_class._EventClass)
utils._check_type(packet, bt2.packet._Packet)
- self._validate_default_clock_snapshot(packet.stream.cls, default_clock_snapshot)
if default_clock_snapshot is not None:
+ if event_class.stream_class.default_clock_class is None:
+ raise ValueError('event messages in this stream must not have a default clock snapshot')
+
utils._check_uint64(default_clock_snapshot)
ptr = native_bt.message_event_create_with_default_clock_snapshot(
self._ptr, event_class._ptr, packet._ptr, default_clock_snapshot)
else:
+ if event_class.stream_class.default_clock_class is not None:
+ raise ValueError('event messages in this stream must have a default clock snapshot')
+
ptr = native_bt.message_event_create(
self._ptr, event_class._ptr, packet._ptr)
return bt2.message._MessageIteratorInactivityMessage(ptr)
+ _unknown_clock_snapshot = bt2.message._StreamActivityMessageUnknownClockSnapshot()
+ _infinite_clock_snapshot = bt2.message._StreamActivityMessageInfiniteClockSnapshot()
+
+ @staticmethod
+ def _validate_stream_activity_message_default_clock_snapshot(stream, default_cs):
+ isinst_infinite = isinstance(default_cs, bt2.message._StreamActivityMessageInfiniteClockSnapshot)
+ isinst_unknown = isinstance(default_cs, bt2.message._StreamActivityMessageUnknownClockSnapshot)
+
+ if utils._is_uint64(default_cs):
+ pass
+ elif isinst_infinite or isinst_unknown:
+ if default_cs is not _UserMessageIterator._unknown_clock_snapshot and default_cs is not _UserMessageIterator._infinite_clock_snapshot:
+ raise ValueError('unexpected value for default clock snapshot')
+ else:
+ raise TypeError("unexpected type '{}' for default clock snapshot".format(default_cs.__class__.__name__))
+
+ if stream.cls.default_clock_class is None:
+ if utils._is_uint64(default_cs):
+ raise ValueError('stream activity messages in this stream cannot have a known default clock snapshot')
+
def _create_stream_beginning_message(self, stream):
utils._check_type(stream, bt2.stream._Stream)
return bt2.message._StreamBeginningMessage(ptr)
- def _create_stream_activity_beginning_message(self, stream, default_clock_snapshot=None):
+ def _create_stream_activity_beginning_message(self, stream,
+ default_clock_snapshot=_unknown_clock_snapshot):
utils._check_type(stream, bt2.stream._Stream)
- self._validate_default_clock_snapshot(stream.cls, default_clock_snapshot)
-
+ self._validate_stream_activity_message_default_clock_snapshot(stream, default_clock_snapshot)
ptr = native_bt.message_stream_activity_beginning_create(self._ptr, stream._ptr)
if ptr is None:
'cannot create stream activity beginning message object')
msg = bt2.message._StreamActivityBeginningMessage(ptr)
-
- if default_clock_snapshot is not None:
- msg._default_clock_snapshot = default_clock_snapshot
-
+ msg._default_clock_snapshot = default_clock_snapshot
return msg
- def _create_stream_activity_end_message(self, stream, default_clock_snapshot=None):
+ def _create_stream_activity_end_message(self, stream,
+ default_clock_snapshot=_unknown_clock_snapshot):
utils._check_type(stream, bt2.stream._Stream)
- self._validate_default_clock_snapshot(stream.cls, default_clock_snapshot)
-
+ self._validate_stream_activity_message_default_clock_snapshot(stream, default_clock_snapshot)
ptr = native_bt.message_stream_activity_end_create(self._ptr, stream._ptr)
if ptr is None:
'cannot create stream activity end message object')
msg = bt2.message._StreamActivityEndMessage(ptr)
-
- if default_clock_snapshot is not None:
- msg._default_clock_snapshot = default_clock_snapshot
-
+ msg._default_clock_snapshot = default_clock_snapshot
return msg
def _create_stream_end_message(self, stream):
self.assertEqual(msg.stream.addr, self._stream.addr)
elif i == 1:
self.assertIsInstance(msg, bt2.message._StreamActivityBeginningMessage)
- with self.assertRaises(bt2.NonexistentClockSnapshot):
- msg.default_clock_snapshot
+ self.assertIsInstance(msg.default_clock_snapshot,
+ bt2._UnknownClockSnapshot)
elif i == 2:
self.assertIsInstance(msg, bt2.message._PacketBeginningMessage)
self.assertEqual(msg.packet.addr, self._packet.addr)
elif i == 7:
self.assertIsInstance(msg, bt2.message._StreamActivityEndMessage)
self.assertEqual(msg.stream.addr, self._stream.addr)
- with self.assertRaises(bt2.NonexistentClockSnapshot):
- msg.default_clock_snapshot
+ self.assertIsInstance(msg.default_clock_snapshot,
+ bt2._UnknownClockSnapshot)
elif i == 8:
self.assertIsInstance(msg, bt2.message._StreamEndMessage)
self.assertEqual(msg.stream.addr, self._stream.addr)
else:
raise Exception
+
+class StreamActivityMessagesTestCase(unittest.TestCase):
+ def _test_create_msg(self, with_cc, test_create_beginning_func, test_create_end_func):
+ class MyIter(bt2._UserMessageIterator):
+ def __init__(self, self_port_output):
+ self._at = 0
+
+ def __next__(self):
+ if self._at == 0:
+ msg = self._create_stream_beginning_message(self._component._stream)
+ elif self._at == 1:
+ msg = test_create_beginning_func(self, self._component._stream)
+ elif self._at == 2:
+ msg = test_create_end_func(self, self._component._stream)
+ elif self._at == 3:
+ msg = self._create_stream_end_message(self._component._stream)
+ elif self._at >= 4:
+ raise bt2.Stop
+
+ self._at += 1
+ return msg
+
+ class MySrc(bt2._UserSourceComponent, message_iterator_class=MyIter):
+ def __init__(self, params):
+ self._add_output_port('out')
+ tc = self._create_trace_class()
+
+ if with_cc:
+ cc = self._create_clock_class()
+ sc = tc.create_stream_class(default_clock_class=cc)
+ else:
+ sc = tc.create_stream_class()
+
+ # Create payload field class
+ trace = tc()
+ self._stream = trace.create_stream(sc)
+
+ graph = bt2.Graph()
+ src_comp = graph.add_component(MySrc, 'src')
+ msg_iter = graph.create_output_port_message_iterator(src_comp.output_ports['out'])
+
+ for msg in msg_iter:
+ pass
+
+ def test_create_beginning_with_cc_with_known_default_cs(self):
+ def create_beginning(msg_iter, stream):
+ msg = msg_iter._create_stream_activity_beginning_message(stream, 172)
+ self.assertEqual(msg.default_clock_snapshot.value, 172)
+ return msg
+
+ def create_end(msg_iter, stream):
+ return msg_iter._create_stream_activity_end_message(stream, 199)
+
+ self._test_create_msg(True, create_beginning, create_end)
+
+ def test_create_end_with_cc_with_known_default_cs(self):
+ def create_beginning(msg_iter, stream):
+ return msg_iter._create_stream_activity_beginning_message(stream, 172)
+
+ def create_end(msg_iter, stream):
+ msg = msg_iter._create_stream_activity_end_message(stream, 199)
+ self.assertEqual(msg.default_clock_snapshot.value, 199)
+ return msg
+
+ self._test_create_msg(True, create_beginning, create_end)
+
+ def test_create_beginning_with_cc_with_unknown_default_cs(self):
+ def create_beginning(msg_iter, stream):
+ msg = msg_iter._create_stream_activity_beginning_message(stream,
+ msg_iter._unknown_clock_snapshot)
+ self.assertIsInstance(msg.default_clock_snapshot,
+ bt2._UnknownClockSnapshot)
+ return msg
+
+ def create_end(msg_iter, stream):
+ return msg_iter._create_stream_activity_end_message(stream, 199)
+
+ self._test_create_msg(True, create_beginning, create_end)
+
+ def test_create_end_with_cc_with_unknown_default_cs(self):
+ def create_beginning(msg_iter, stream):
+ return msg_iter._create_stream_activity_beginning_message(stream, 172)
+
+ def create_end(msg_iter, stream):
+ msg = msg_iter._create_stream_activity_end_message(stream,
+ msg_iter._unknown_clock_snapshot)
+ self.assertIsInstance(msg.default_clock_snapshot,
+ bt2._UnknownClockSnapshot)
+ return msg
+
+ self._test_create_msg(True, create_beginning, create_end)
+
+ def test_create_beginning_with_cc_with_infinite_default_cs(self):
+ def create_beginning(msg_iter, stream):
+ msg = msg_iter._create_stream_activity_beginning_message(stream,
+ msg_iter._infinite_clock_snapshot)
+ self.assertIsInstance(msg.default_clock_snapshot,
+ bt2._InfiniteClockSnapshot)
+ return msg
+
+ def create_end(msg_iter, stream):
+ return msg_iter._create_stream_activity_end_message(stream, 199)
+
+ self._test_create_msg(True, create_beginning, create_end)
+
+ def test_create_end_with_cc_with_infinite_default_cs(self):
+ def create_beginning(msg_iter, stream):
+ return msg_iter._create_stream_activity_beginning_message(stream, 172)
+
+ def create_end(msg_iter, stream):
+ msg = msg_iter._create_stream_activity_end_message(stream,
+ msg_iter._infinite_clock_snapshot)
+ self.assertIsInstance(msg.default_clock_snapshot,
+ bt2._InfiniteClockSnapshot)
+ return msg
+
+ self._test_create_msg(True, create_beginning, create_end)
+
+ def test_create_beginning_without_cc_with_known_default_cs(self):
+ def create_beginning(msg_iter, stream):
+ with self.assertRaises(ValueError):
+ msg_iter._create_stream_activity_beginning_message(stream, 172)
+
+ return msg_iter._create_stream_activity_beginning_message(stream)
+
+ def create_end(msg_iter, stream):
+ return msg_iter._create_stream_activity_end_message(stream)
+
+ self._test_create_msg(False, create_beginning, create_end)
+
+ def test_create_end_without_cc_with_known_default_cs(self):
+ def create_beginning(msg_iter, stream):
+ return msg_iter._create_stream_activity_beginning_message(stream)
+
+ def create_end(msg_iter, stream):
+ with self.assertRaises(ValueError):
+ msg_iter._create_stream_activity_end_message(stream, 199)
+
+ return msg_iter._create_stream_activity_end_message(stream)
+
+ self._test_create_msg(False, create_beginning, create_end)
+
+ def test_create_beginning_without_cc_with_unknown_default_cs(self):
+ def create_beginning(msg_iter, stream):
+ msg = msg_iter._create_stream_activity_beginning_message(stream,
+ msg_iter._unknown_clock_snapshot)
+ self.assertIsInstance(msg.default_clock_snapshot,
+ bt2._UnknownClockSnapshot)
+ return msg
+
+ def create_end(msg_iter, stream):
+ return msg_iter._create_stream_activity_end_message(stream)
+
+ self._test_create_msg(False, create_beginning, create_end)
+
+ def test_create_end_without_cc_with_unknown_default_cs(self):
+ def create_beginning(msg_iter, stream):
+ return msg_iter._create_stream_activity_beginning_message(stream)
+
+ def create_end(msg_iter, stream):
+ msg = msg_iter._create_stream_activity_end_message(stream,
+ msg_iter._unknown_clock_snapshot)
+ self.assertIsInstance(msg.default_clock_snapshot,
+ bt2._UnknownClockSnapshot)
+ return msg
+
+ self._test_create_msg(False, create_beginning, create_end)
+
+ def test_create_beginning_without_cc_with_infinite_default_cs(self):
+ def create_beginning(msg_iter, stream):
+ msg = msg_iter._create_stream_activity_beginning_message(stream,
+ msg_iter._infinite_clock_snapshot)
+ self.assertIsInstance(msg.default_clock_snapshot,
+ bt2._InfiniteClockSnapshot)
+ return msg
+
+ def create_end(msg_iter, stream):
+ return msg_iter._create_stream_activity_end_message(stream)
+
+ self._test_create_msg(False, create_beginning, create_end)
+
+ def test_create_end_without_cc_with_infinite_default_cs(self):
+ def create_beginning(msg_iter, stream):
+ return msg_iter._create_stream_activity_beginning_message(stream)
+
+ def create_end(msg_iter, stream):
+ msg = msg_iter._create_stream_activity_end_message(stream,
+ msg_iter._infinite_clock_snapshot)
+ self.assertIsInstance(msg.default_clock_snapshot,
+ bt2._InfiniteClockSnapshot)
+ return msg
+
+ self._test_create_msg(False, create_beginning, create_end)
+
+ def test_create_beginning_default_cs_wrong_type(self):
+ def create_beginning(msg_iter, stream):
+ with self.assertRaises(TypeError):
+ msg_iter._create_stream_activity_beginning_message(stream, 'infinite')
+
+ return msg_iter._create_stream_activity_beginning_message(stream)
+
+ def create_end(msg_iter, stream):
+ return msg_iter._create_stream_activity_end_message(stream)
+
+ self._test_create_msg(False, create_beginning, create_end)
+
+ def test_create_end_without_default_cs_wrong_type(self):
+ def create_beginning(msg_iter, stream):
+ return msg_iter._create_stream_activity_beginning_message(stream)
+
+ def create_end(msg_iter, stream):
+ with self.assertRaises(TypeError):
+ msg_iter._create_stream_activity_end_message(stream, 'unknown')
+
+ return msg_iter._create_stream_activity_end_message(stream)
+
+ self._test_create_msg(False, create_beginning, create_end)