def _create_packet_beginning_message(self, packet, default_clock_snapshot=None):
utils._check_type(packet, bt2.packet._Packet)
- if packet.stream.stream_class.packets_have_default_beginning_clock_snapshot:
+ if packet.stream.stream_class.packets_have_beginning_default_clock_snapshot:
if default_clock_snapshot is None:
raise ValueError("packet beginning messages in this stream must have a default clock snapshots")
def _create_packet_end_message(self, packet, default_clock_snapshot=None):
utils._check_type(packet, bt2.packet._Packet)
- if packet.stream.stream_class.packets_have_default_end_clock_snapshot:
+ if packet.stream.stream_class.packets_have_end_default_clock_snapshot:
if default_clock_snapshot is None:
raise ValueError("packet end messages in this stream must have a default clock snapshots")
end_clock_snapshot=None):
utils._check_type(stream, bt2.stream._Stream)
- if beg_clock_snapshot is None and end_clock_snapshot is None:
- ptr = native_bt.message_discarded_events_create(self._ptr, stream._ptr)
- elif beg_clock_snapshot is not None and end_clock_snapshot is not None:
+ if not stream.stream_class.supports_discarded_events:
+ raise ValueError('stream class does not support discarded events')
+
+ if stream.stream_class.discarded_events_have_default_clock_snapshots:
+ if beg_clock_snapshot is None or end_clock_snapshot is None:
+ raise ValueError('discarded events have default clock snapshots for this stream class')
+
utils._check_uint64(beg_clock_snapshot)
utils._check_uint64(end_clock_snapshot)
ptr = native_bt.message_discarded_events_create_with_default_clock_snapshots(
self._ptr, stream._ptr, beg_clock_snapshot, end_clock_snapshot)
else:
- raise ValueError('begin and end clock snapshots must be both provided or both omitted')
+ if beg_clock_snapshot is not None or end_clock_snapshot is not None:
+ raise ValueError('discarded events have no default clock snapshots for this stream class')
+
+ ptr = native_bt.message_discarded_events_create(
+ self._ptr, stream._ptr)
if ptr is None:
raise bt2.CreationError('cannot discarded events message object')
def _create_discarded_packets_message(self, stream, count=None, beg_clock_snapshot=None, end_clock_snapshot=None):
utils._check_type(stream, bt2.stream._Stream)
- if beg_clock_snapshot is None and end_clock_snapshot is None:
- ptr = native_bt.message_discarded_packets_create(self._ptr, stream._ptr)
- elif beg_clock_snapshot is not None and end_clock_snapshot is not None:
+ if not stream.stream_class.supports_discarded_packets:
+ raise ValueError('stream class does not support discarded packets')
+
+ if stream.stream_class.discarded_packets_have_default_clock_snapshots:
+ if beg_clock_snapshot is None or end_clock_snapshot is None:
+ raise ValueError('discarded packets have default clock snapshots for this stream class')
+
utils._check_uint64(beg_clock_snapshot)
utils._check_uint64(end_clock_snapshot)
ptr = native_bt.message_discarded_packets_create_with_default_clock_snapshots(
self._ptr, stream._ptr, beg_clock_snapshot, end_clock_snapshot)
else:
- raise ValueError('begin and end clock snapshots must be both provided or both omitted')
+ if beg_clock_snapshot is not None or end_clock_snapshot is not None:
+ raise ValueError('discarded packets have no default clock snapshots for this stream class')
+
+ ptr = native_bt.message_discarded_packets_create(
+ self._ptr, stream._ptr)
if ptr is None:
raise bt2.CreationError('cannot discarded packets message object')