# The native code calls this, then manually calls
# self.__init__() without the `ptr` argument. The user has
# access to self.component during this call, thanks to this
# The native code calls this, then manually calls
# self.__init__() without the `ptr` argument. The user has
# access to self.component during this call, thanks to this
self_output_port = bt2.port._create_self_from_ptr_and_get_ref(
self_output_port_ptr, native_bt.PORT_TYPE_OUTPUT)
self.__init__(self_output_port)
self_output_port = bt2.port._create_self_from_ptr_and_get_ref(
self_output_port_ptr, native_bt.PORT_TYPE_OUTPUT)
self.__init__(self_output_port)
# Here, we mimic the behavior of the C API:
#
# - If the iterator has a _can_seek_beginning attribute, read it and use
# Here, we mimic the behavior of the C API:
#
# - If the iterator has a _can_seek_beginning attribute, read it and use
self._seek_beginning()
def _create_event_message(self, event_class, packet, default_clock_snapshot=None):
self._seek_beginning()
def _create_event_message(self, event_class, packet, default_clock_snapshot=None):
utils._check_uint64(default_clock_snapshot)
ptr = native_bt.message_event_create_with_default_clock_snapshot(
utils._check_uint64(default_clock_snapshot)
ptr = native_bt.message_event_create_with_default_clock_snapshot(
- self._ptr, event_class._ptr, packet._ptr, default_clock_snapshot)
+ self._bt_ptr, event_class._ptr, packet._ptr, default_clock_snapshot)
else:
if event_class.stream_class.default_clock_class is not None:
raise ValueError('event messages in this stream must have a default clock snapshot')
ptr = native_bt.message_event_create(
else:
if event_class.stream_class.default_clock_class is not None:
raise ValueError('event messages in this stream must have a default clock snapshot')
ptr = native_bt.message_event_create(
def _create_message_iterator_inactivity_message(self, clock_class, clock_snapshot):
utils._check_type(clock_class, bt2.clock_class._ClockClass)
ptr = native_bt.message_message_iterator_inactivity_create(
def _create_message_iterator_inactivity_message(self, clock_class, clock_snapshot):
utils._check_type(clock_class, bt2.clock_class._ClockClass)
ptr = native_bt.message_message_iterator_inactivity_create(
isinst_infinite = isinstance(default_cs, bt2.message._StreamActivityMessageInfiniteClockSnapshot)
isinst_unknown = isinstance(default_cs, bt2.message._StreamActivityMessageUnknownClockSnapshot)
isinst_infinite = isinstance(default_cs, bt2.message._StreamActivityMessageInfiniteClockSnapshot)
isinst_unknown = isinstance(default_cs, bt2.message._StreamActivityMessageUnknownClockSnapshot)
def _create_stream_beginning_message(self, stream):
utils._check_type(stream, bt2.stream._Stream)
def _create_stream_beginning_message(self, stream):
utils._check_type(stream, bt2.stream._Stream)
def _create_stream_activity_beginning_message(self, stream,
default_clock_snapshot=_unknown_clock_snapshot):
utils._check_type(stream, bt2.stream._Stream)
def _create_stream_activity_beginning_message(self, stream,
default_clock_snapshot=_unknown_clock_snapshot):
utils._check_type(stream, bt2.stream._Stream)
- self._validate_stream_activity_message_default_clock_snapshot(stream, default_clock_snapshot)
- ptr = native_bt.message_stream_activity_beginning_create(self._ptr, stream._ptr)
+ self._bt_validate_stream_activity_message_default_clock_snapshot(stream, default_clock_snapshot)
+ ptr = native_bt.message_stream_activity_beginning_create(self._bt_ptr, stream._ptr)
def _create_stream_activity_end_message(self, stream,
default_clock_snapshot=_unknown_clock_snapshot):
utils._check_type(stream, bt2.stream._Stream)
def _create_stream_activity_end_message(self, stream,
default_clock_snapshot=_unknown_clock_snapshot):
utils._check_type(stream, bt2.stream._Stream)
- self._validate_stream_activity_message_default_clock_snapshot(stream, default_clock_snapshot)
- ptr = native_bt.message_stream_activity_end_create(self._ptr, stream._ptr)
+ self._bt_validate_stream_activity_message_default_clock_snapshot(stream, default_clock_snapshot)
+ ptr = native_bt.message_stream_activity_end_create(self._bt_ptr, stream._ptr)
def _create_stream_end_message(self, stream):
utils._check_type(stream, bt2.stream._Stream)
def _create_stream_end_message(self, stream):
utils._check_type(stream, bt2.stream._Stream)
utils._check_uint64(default_clock_snapshot)
ptr = native_bt.message_packet_beginning_create_with_default_clock_snapshot(
utils._check_uint64(default_clock_snapshot)
ptr = native_bt.message_packet_beginning_create_with_default_clock_snapshot(
else:
if default_clock_snapshot is not None:
raise ValueError("packet beginning messages in this stream must not have a default clock snapshot")
else:
if default_clock_snapshot is not None:
raise ValueError("packet beginning messages in this stream must not have a default clock snapshot")
utils._check_uint64(default_clock_snapshot)
ptr = native_bt.message_packet_end_create_with_default_clock_snapshot(
utils._check_uint64(default_clock_snapshot)
ptr = native_bt.message_packet_end_create_with_default_clock_snapshot(
else:
if default_clock_snapshot is not None:
raise ValueError("packet end messages in this stream must not have a default clock snapshot")
else:
if default_clock_snapshot is not None:
raise ValueError("packet end messages in this stream must not have a default clock snapshot")
utils._check_uint64(beg_clock_snapshot)
utils._check_uint64(end_clock_snapshot)
ptr = native_bt.message_discarded_events_create_with_default_clock_snapshots(
utils._check_uint64(beg_clock_snapshot)
utils._check_uint64(end_clock_snapshot)
ptr = native_bt.message_discarded_events_create_with_default_clock_snapshots(
- self._ptr, stream._ptr, beg_clock_snapshot, end_clock_snapshot)
+ self._bt_ptr, stream._ptr, beg_clock_snapshot, end_clock_snapshot)
else:
if beg_clock_snapshot is not None or end_clock_snapshot is not None:
raise ValueError('discarded events have no default clock snapshots for this stream class')
ptr = native_bt.message_discarded_events_create(
else:
if beg_clock_snapshot is not None or end_clock_snapshot is not None:
raise ValueError('discarded events have no default clock snapshots for this stream class')
ptr = native_bt.message_discarded_events_create(
utils._check_uint64(beg_clock_snapshot)
utils._check_uint64(end_clock_snapshot)
ptr = native_bt.message_discarded_packets_create_with_default_clock_snapshots(
utils._check_uint64(beg_clock_snapshot)
utils._check_uint64(end_clock_snapshot)
ptr = native_bt.message_discarded_packets_create_with_default_clock_snapshots(
- self._ptr, stream._ptr, beg_clock_snapshot, end_clock_snapshot)
+ self._bt_ptr, stream._ptr, beg_clock_snapshot, end_clock_snapshot)
else:
if beg_clock_snapshot is not None or end_clock_snapshot is not None:
raise ValueError('discarded packets have no default clock snapshots for this stream class')
ptr = native_bt.message_discarded_packets_create(
else:
if beg_clock_snapshot is not None or end_clock_snapshot is not None:
raise ValueError('discarded packets have no default clock snapshots for this stream class')
ptr = native_bt.message_discarded_packets_create(