return bt2.message._create_from_ptr(msg_ptr)
-class _PrivateConnectionMessageIterator(_GenericMessageIterator):
+class _UserComponentInputPortMessageIterator(_GenericMessageIterator):
+ _get_msg_range = staticmethod(native_bt.py3_self_component_port_input_get_msg_range)
+
@property
def component(self):
comp_ptr = native_bt.private_connection_message_iterator_get_component(self._ptr)
return bt2.message._StreamBeginningMessage(ptr)
+ def _create_stream_end_message(self, stream):
+ utils._check_type(stream, bt2.stream._Stream)
+
+ ptr = native_bt.message_stream_end_create(self._ptr, stream._ptr)
+ if ptr is None:
+ raise bt2.CreationError('cannot create stream end message object')
+
+ return bt2.message._StreamEndMessage(ptr)
+
def _create_packet_beginning_message(self, packet, default_clock_snapshot=None):
utils._check_type(packet, bt2.packet._Packet)
raise bt2.CreationError('cannot create packet beginning message object')
return bt2.message._PacketBeginningMessage(ptr)
+
+ def _create_packet_end_message(self, packet, default_clock_snapshot=None):
+ utils._check_type(packet, bt2.packet._Packet)
+ self._validate_default_clock_snapshot(packet.stream.stream_class, default_clock_snapshot)
+
+ if default_clock_snapshot is not None:
+ utils._check_uint64(default_clock_snapshot)
+ ptr = native_bt.message_packet_end_create_with_default_clock_snapshot(
+ self._ptr, packet._ptr, default_clock_snapshot)
+ else:
+ ptr = native_bt.message_packet_end_create(self._ptr, packet._ptr)
+
+ if ptr is None:
+ raise bt2.CreationError('cannot create packet end message object')
+
+ return bt2.message._PacketEndMessage(ptr)