from bt2 import native_bt, object, utils
import bt2.clock_class_priority_map
+import bt2.clock_value
+import collections
import bt2.packet
import bt2.stream
import bt2.event
class EventNotification(_CopyableNotification):
- _TYPE = native_bt.NOTIFICATION_TYPE_EVENT
+ _TYPE = native_bt.MESSAGE_TYPE_EVENT
def __init__(self, event, cc_prio_map=None):
utils._check_type(event, bt2.event._Event)
class PacketBeginningNotification(_CopyableNotification):
- _TYPE = native_bt.NOTIFICATION_TYPE_PACKET_BEGIN
+ _TYPE = native_bt.MESSAGE_TYPE_PACKET_BEGINNING
def __init__(self, packet):
utils._check_type(packet, bt2.packet._Packet)
class PacketEndNotification(_CopyableNotification):
- _TYPE = native_bt.NOTIFICATION_TYPE_PACKET_END
+ _TYPE = native_bt.MESSAGE_TYPE_PACKET_END
def __init__(self, packet):
utils._check_type(packet, bt2.packet._Packet)
class StreamBeginningNotification(_CopyableNotification):
- _TYPE = native_bt.NOTIFICATION_TYPE_STREAM_BEGIN
+ _TYPE = native_bt.MESSAGE_TYPE_STREAM_BEGINNING
def __init__(self, stream):
utils._check_type(stream, bt2.stream._Stream)
class StreamEndNotification(_CopyableNotification):
- _TYPE = native_bt.NOTIFICATION_TYPE_STREAM_END
+ _TYPE = native_bt.MESSAGE_TYPE_STREAM_END
def __init__(self, stream):
utils._check_type(stream, bt2.stream._Stream)
return StreamEndNotification(self.stream)
+class _InactivityNotificationClockValuesIterator(collections.abc.Iterator):
+ def __init__(self, notif_clock_values):
+ self._notif_clock_values = notif_clock_values
+ self._clock_classes = list(notif_clock_values._notif.clock_class_priority_map)
+ self._at = 0
+
+ def __next__(self):
+ if self._at == len(self._clock_classes):
+ raise StopIteration
+
+ self._at += 1
+ return self._clock_classes[at]
+
+
+class _InactivityNotificationClockValues(collections.abc.Mapping):
+ def __init__(self, notif):
+ self._notif = notif
+
+ def __getitem__(self, clock_class):
+ utils._check_type(clock_class, bt2.ClockClass)
+ clock_value_ptr = native_bt.notification_inactivity_get_clock_value(self._notif._ptr,
+ clock_class._ptr)
+
+ if clock_value_ptr is None:
+ return
+
+ clock_value = bt2.clock_value._create_clock_value_from_ptr(clock_value_ptr)
+ return clock_value
+
+ def add(self, clock_value):
+ utils._check_type(clock_value, bt2.clock_value._ClockValue)
+ ret = native_bt.notification_inactivity_set_clock_value(self._notif._ptr,
+ clock_value._ptr)
+ utils._handle_ret(ret, "cannot set inactivity notification object's clock value")
+
+ def __len__(self):
+ return len(self._notif.clock_class_priority_map)
+
+ def __iter__(self):
+ return _InactivityNotificationClockValuesIterator(self)
+
+
class InactivityNotification(_CopyableNotification):
- _TYPE = native_bt.NOTIFICATION_TYPE_INACTIVITY
+ _TYPE = native_bt.MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY
def __init__(self, cc_prio_map=None):
if cc_prio_map is not None:
assert(cc_prio_map_ptr)
return bt2.clock_class_priority_map.ClockClassPriorityMap._create_from_ptr(cc_prio_map_ptr)
- def clock_value(self, clock_class):
- utils._check_type(clock_class, bt2.ClockClass)
- clock_value_ptr = native_bt.notification_inactivity_get_clock_value(self._ptr,
- clock_class._ptr)
-
- if clock_value_ptr is None:
- return
-
- clock_value = bt2.clock_class._create_clock_value_from_ptr(clock_value_ptr)
- return clock_value
-
- def add_clock_value(self, clock_value):
- utils._check_type(clock_value, bt2.clock_class._ClockValue)
- ret = native_bt.notification_inactivity_set_clock_value(self._ptr,
- clock_value._ptr)
- utils._handle_ret(ret, "cannot set inactivity notification object's clock value")
+ @property
+ def clock_values(self):
+ return _InactivityNotificationClockValues(self)
def _get_clock_values(self):
clock_values = {}
- for clock_class in self.clock_class_priority_map:
- clock_value = self.clock_value(clock_class)
-
+ for clock_class, clock_value in self.clock_values.items():
if clock_value is None:
continue
def __copy__(self):
cpy = InactivityNotification(self.clock_class_priority_map)
- for clock_class in self.clock_class_priority_map:
- clock_value = self.clock_value(clock_class)
-
+ for clock_class, clock_value in self.clock_values.items():
if clock_value is None:
continue
- cpy.add_clock_value(clock_value)
+ cpy.clock_values.add(clock_value)
return cpy
clock_value_cpy = cpy_clock_class(orig_clock_value.cycles)
# set copied clock value in notification copy
- cpy.add_clock_value(clock_value_cpy)
+ cpy.clock_values.add(clock_value_cpy)
memo[id(self)] = cpy
return cpy
class _DiscardedPacketsNotification(_DiscardedElementsNotification):
- _TYPE = native_bt.NOTIFICATION_TYPE_DISCARDED_PACKETS
+ _TYPE = native_bt.MESSAGE_TYPE_DISCARDED_PACKETS
@property
def count(self):
if clock_value_ptr is None:
return
- clock_value = bt2.clock_class._create_clock_value_from_ptr(clock_value_ptr)
+ clock_value = bt2.clock_value._create_clock_value_from_ptr(clock_value_ptr)
return clock_value
@property
if clock_value_ptr is None:
return
- clock_value = bt2.clock_class._create_clock_value_from_ptr(clock_value_ptr)
+ clock_value = bt2.clock_value._create_clock_value_from_ptr(clock_value_ptr)
return clock_value
class _DiscardedEventsNotification(_DiscardedElementsNotification):
- _TYPE = native_bt.NOTIFICATION_TYPE_DISCARDED_EVENTS
+ _TYPE = native_bt.MESSAGE_TYPE_DISCARDED_EVENTS
@property
def count(self):
if clock_value_ptr is None:
return
- clock_value = bt2.clock_class._create_clock_value_from_ptr(clock_value_ptr)
+ clock_value = bt2.clock_value._create_clock_value_from_ptr(clock_value_ptr)
return clock_value
@property
if clock_value_ptr is None:
return
- clock_value = bt2.clock_class._create_clock_value_from_ptr(clock_value_ptr)
+ clock_value = bt2.clock_value._create_clock_value_from_ptr(clock_value_ptr)
return clock_value
_NOTIF_TYPE_TO_CLS = {
- native_bt.NOTIFICATION_TYPE_EVENT: EventNotification,
- native_bt.NOTIFICATION_TYPE_PACKET_BEGIN: PacketBeginningNotification,
- native_bt.NOTIFICATION_TYPE_PACKET_END: PacketEndNotification,
- native_bt.NOTIFICATION_TYPE_STREAM_BEGIN: StreamBeginningNotification,
- native_bt.NOTIFICATION_TYPE_STREAM_END: StreamEndNotification,
- native_bt.NOTIFICATION_TYPE_INACTIVITY: InactivityNotification,
- native_bt.NOTIFICATION_TYPE_DISCARDED_PACKETS: _DiscardedPacketsNotification,
- native_bt.NOTIFICATION_TYPE_DISCARDED_EVENTS: _DiscardedEventsNotification,
+ native_bt.MESSAGE_TYPE_EVENT: EventNotification,
+ native_bt.MESSAGE_TYPE_PACKET_BEGINNING: PacketBeginningNotification,
+ native_bt.MESSAGE_TYPE_PACKET_END: PacketEndNotification,
+ native_bt.MESSAGE_TYPE_STREAM_BEGINNING: StreamBeginningNotification,
+ native_bt.MESSAGE_TYPE_STREAM_END: StreamEndNotification,
+ native_bt.MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY: InactivityNotification,
+ native_bt.MESSAGE_TYPE_DISCARDED_PACKETS: _DiscardedPacketsNotification,
+ native_bt.MESSAGE_TYPE_DISCARDED_EVENTS: _DiscardedEventsNotification,
}