1 # The MIT License (MIT)
3 # Copyright (c) 2017 Philippe Proulx <pproulx@efficios.com>
5 # Permission is hereby granted, free of charge, to any person obtaining a copy
6 # of this software and associated documentation files (the "Software"), to deal
7 # in the Software without restriction, including without limitation the rights
8 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 # copies of the Software, and to permit persons to whom the Software is
10 # furnished to do so, subject to the following conditions:
12 # The above copyright notice and this permission notice shall be included in
13 # all copies or substantial portions of the Software.
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23 from bt2
import native_bt
, object, utils
24 import bt2
.clock_class_priority_map
25 import bt2
.clock_value
34 def _create_from_ptr(ptr
):
35 notif_type
= native_bt
.notification_get_type(ptr
)
38 if notif_type
not in _NOTIF_TYPE_TO_CLS
:
39 raise bt2
.Error('unknown notification type: {}'.format(notif_type
))
41 return _NOTIF_TYPE_TO_CLS
[notif_type
]._create
_from
_ptr
(ptr
)
44 def _notif_types_from_notif_classes(notification_types
):
45 if notification_types
is None:
48 for notif_cls
in notification_types
:
49 if notif_cls
not in _NOTIF_TYPE_TO_CLS
.values():
50 raise ValueError("'{}' is not a notification class".format(notif_cls
))
52 notif_types
= [notif_cls
._TYPE
for notif_cls
in notification_types
]
57 class _Notification(object._Object
):
61 class _CopyableNotification(_Notification
):
63 return self
._copy
(lambda obj
: obj
)
65 def __deepcopy__(self
, memo
):
66 cpy
= self
._copy
(copy
.deepcopy
)
71 class EventNotification(_CopyableNotification
):
72 _TYPE
= native_bt
.MESSAGE_TYPE_EVENT
74 def __init__(self
, event
, cc_prio_map
=None):
75 utils
._check
_type
(event
, bt2
.event
._Event
)
77 if cc_prio_map
is not None:
78 utils
._check
_type
(cc_prio_map
, bt2
.clock_class_priority_map
.ClockClassPriorityMap
)
79 cc_prio_map_ptr
= cc_prio_map
._ptr
81 cc_prio_map_ptr
= None
83 ptr
= native_bt
.notification_event_create(event
._ptr
, cc_prio_map_ptr
)
86 raise bt2
.CreationError('cannot create event notification object')
92 event_ptr
= native_bt
.notification_event_get_event(self
._ptr
)
94 return bt2
.event
._create
_from
_ptr
(event_ptr
)
97 def clock_class_priority_map(self
):
98 cc_prio_map_ptr
= native_bt
.notification_event_get_clock_class_priority_map(self
._ptr
)
99 assert(cc_prio_map_ptr
)
100 return bt2
.clock_class_priority_map
.ClockClassPriorityMap
._create
_from
_ptr
(cc_prio_map_ptr
)
102 def __eq__(self
, other
):
103 if type(other
) is not type(self
):
106 if self
.addr
== other
.addr
:
111 self
.clock_class_priority_map
,
115 other
.clock_class_priority_map
,
117 return self_props
== other_props
119 def _copy(self
, copy_func
):
120 # We can always use references here because those properties are
121 # frozen anyway if they are part of a notification. Since the
122 # user cannot modify them after copying the notification, it's
123 # useless to copy/deep-copy them.
124 return EventNotification(self
.event
, self
.clock_class_priority_map
)
127 class PacketBeginningNotification(_CopyableNotification
):
128 _TYPE
= native_bt
.MESSAGE_TYPE_PACKET_BEGINNING
130 def __init__(self
, packet
):
131 utils
._check
_type
(packet
, bt2
.packet
._Packet
)
132 ptr
= native_bt
.notification_packet_begin_create(packet
._ptr
)
135 raise bt2
.CreationError('cannot create packet beginning notification object')
137 super().__init
__(ptr
)
141 packet_ptr
= native_bt
.notification_packet_begin_get_packet(self
._ptr
)
143 return bt2
.packet
._Packet
._create
_from
_ptr
(packet_ptr
)
145 def __eq__(self
, other
):
146 if type(other
) is not type(self
):
149 if self
.addr
== other
.addr
:
152 return self
.packet
== other
.packet
154 def _copy(self
, copy_func
):
155 # We can always use references here because those properties are
156 # frozen anyway if they are part of a notification. Since the
157 # user cannot modify them after copying the notification, it's
158 # useless to copy/deep-copy them.
159 return PacketBeginningNotification(self
.packet
)
162 class PacketEndNotification(_CopyableNotification
):
163 _TYPE
= native_bt
.MESSAGE_TYPE_PACKET_END
165 def __init__(self
, packet
):
166 utils
._check
_type
(packet
, bt2
.packet
._Packet
)
167 ptr
= native_bt
.notification_packet_end_create(packet
._ptr
)
170 raise bt2
.CreationError('cannot create packet end notification object')
172 super().__init
__(ptr
)
176 packet_ptr
= native_bt
.notification_packet_end_get_packet(self
._ptr
)
178 return bt2
.packet
._Packet
._create
_from
_ptr
(packet_ptr
)
180 def __eq__(self
, other
):
181 if type(other
) is not type(self
):
184 if self
.addr
== other
.addr
:
187 return self
.packet
== other
.packet
189 def _copy(self
, copy_func
):
190 # We can always use references here because those properties are
191 # frozen anyway if they are part of a notification. Since the
192 # user cannot modify them after copying the notification, it's
193 # useless to copy/deep-copy them.
194 return PacketEndNotification(self
.packet
)
197 class StreamBeginningNotification(_CopyableNotification
):
198 _TYPE
= native_bt
.MESSAGE_TYPE_STREAM_BEGINNING
200 def __init__(self
, stream
):
201 utils
._check
_type
(stream
, bt2
.stream
._Stream
)
202 ptr
= native_bt
.notification_stream_begin_create(stream
._ptr
)
205 raise bt2
.CreationError('cannot create stream beginning notification object')
207 super().__init
__(ptr
)
211 stream_ptr
= native_bt
.notification_stream_begin_get_stream(self
._ptr
)
213 return bt2
.stream
._create
_from
_ptr
(stream_ptr
)
215 def __eq__(self
, other
):
216 if type(other
) is not type(self
):
219 if self
.addr
== other
.addr
:
222 return self
.stream
== other
.stream
224 def _copy(self
, copy_func
):
225 # We can always use references here because those properties are
226 # frozen anyway if they are part of a notification. Since the
227 # user cannot modify them after copying the notification, it's
228 # useless to copy/deep-copy them.
229 return StreamBeginningNotification(self
.stream
)
232 class StreamEndNotification(_CopyableNotification
):
233 _TYPE
= native_bt
.MESSAGE_TYPE_STREAM_END
235 def __init__(self
, stream
):
236 utils
._check
_type
(stream
, bt2
.stream
._Stream
)
237 ptr
= native_bt
.notification_stream_end_create(stream
._ptr
)
240 raise bt2
.CreationError('cannot create stream end notification object')
242 super().__init
__(ptr
)
246 stream_ptr
= native_bt
.notification_stream_end_get_stream(self
._ptr
)
248 return bt2
.stream
._create
_from
_ptr
(stream_ptr
)
250 def __eq__(self
, other
):
251 if type(other
) is not type(self
):
254 if self
.addr
== other
.addr
:
257 return self
.stream
== other
.stream
259 def _copy(self
, copy_func
):
260 # We can always use references here because those properties are
261 # frozen anyway if they are part of a notification. Since the
262 # user cannot modify them after copying the notification, it's
263 # useless to copy/deep-copy them.
264 return StreamEndNotification(self
.stream
)
267 class _InactivityNotificationClockValuesIterator(collections
.abc
.Iterator
):
268 def __init__(self
, notif_clock_values
):
269 self
._notif
_clock
_values
= notif_clock_values
270 self
._clock
_classes
= list(notif_clock_values
._notif
.clock_class_priority_map
)
274 if self
._at
== len(self
._clock
_classes
):
278 return self
._clock
_classes
[at
]
281 class _InactivityNotificationClockValues(collections
.abc
.Mapping
):
282 def __init__(self
, notif
):
285 def __getitem__(self
, clock_class
):
286 utils
._check
_type
(clock_class
, bt2
.ClockClass
)
287 clock_value_ptr
= native_bt
.notification_inactivity_get_clock_value(self
._notif
._ptr
,
290 if clock_value_ptr
is None:
293 clock_value
= bt2
.clock_value
._create
_clock
_value
_from
_ptr
(clock_value_ptr
)
296 def add(self
, clock_value
):
297 utils
._check
_type
(clock_value
, bt2
.clock_value
._ClockValue
)
298 ret
= native_bt
.notification_inactivity_set_clock_value(self
._notif
._ptr
,
300 utils
._handle
_ret
(ret
, "cannot set inactivity notification object's clock value")
303 return len(self
._notif
.clock_class_priority_map
)
306 return _InactivityNotificationClockValuesIterator(self
)
309 class InactivityNotification(_CopyableNotification
):
310 _TYPE
= native_bt
.MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY
312 def __init__(self
, cc_prio_map
=None):
313 if cc_prio_map
is not None:
314 utils
._check
_type
(cc_prio_map
, bt2
.clock_class_priority_map
.ClockClassPriorityMap
)
315 cc_prio_map_ptr
= cc_prio_map
._ptr
317 cc_prio_map_ptr
= None
319 ptr
= native_bt
.notification_inactivity_create(cc_prio_map_ptr
)
322 raise bt2
.CreationError('cannot create inactivity notification object')
324 super().__init
__(ptr
)
327 def clock_class_priority_map(self
):
328 cc_prio_map_ptr
= native_bt
.notification_inactivity_get_clock_class_priority_map(self
._ptr
)
329 assert(cc_prio_map_ptr
)
330 return bt2
.clock_class_priority_map
.ClockClassPriorityMap
._create
_from
_ptr
(cc_prio_map_ptr
)
333 def clock_values(self
):
334 return _InactivityNotificationClockValues(self
)
336 def _get_clock_values(self
):
339 for clock_class
, clock_value
in self
.clock_values
.items():
340 if clock_value
is None:
343 clock_values
[clock_class
] = clock_value
347 def __eq__(self
, other
):
348 if type(other
) is not type(self
):
351 if self
.addr
== other
.addr
:
355 self
.clock_class_priority_map
,
356 self
._get
_clock
_values
(),
359 other
.clock_class_priority_map
,
360 other
._get
_clock
_values
(),
362 return self_props
== other_props
365 cpy
= InactivityNotification(self
.clock_class_priority_map
)
367 for clock_class
, clock_value
in self
.clock_values
.items():
368 if clock_value
is None:
371 cpy
.clock_values
.add(clock_value
)
375 def __deepcopy__(self
, memo
):
376 cc_prio_map_cpy
= copy
.deepcopy(self
.clock_class_priority_map
)
377 cpy
= InactivityNotification(cc_prio_map_cpy
)
380 for orig_clock_class
in self
.clock_class_priority_map
:
381 orig_clock_value
= self
.clock_value(orig_clock_class
)
383 if orig_clock_value
is None:
386 # find equivalent, copied clock class in CC priority map copy
387 for cpy_clock_class
in cc_prio_map_cpy
:
388 if cpy_clock_class
== orig_clock_class
:
391 # create copy of clock value from copied clock class
392 clock_value_cpy
= cpy_clock_class(orig_clock_value
.cycles
)
394 # set copied clock value in notification copy
395 cpy
.clock_values
.add(clock_value_cpy
)
401 class _DiscardedElementsNotification(_Notification
):
402 def __eq__(self
, other
):
403 if type(other
) is not type(self
):
406 if self
.addr
== other
.addr
:
412 self
.beginning_clock_value
,
413 self
.end_clock_value
,
418 other
.beginning_clock_value
,
419 other
.end_clock_value
,
421 return self_props
== other_props
424 class _DiscardedPacketsNotification(_DiscardedElementsNotification
):
425 _TYPE
= native_bt
.MESSAGE_TYPE_DISCARDED_PACKETS
429 count
= native_bt
.notification_discarded_packets_get_count(self
._ptr
)
435 stream_ptr
= native_bt
.notification_discarded_packets_get_stream(self
._ptr
)
437 return bt2
.stream
._create
_from
_ptr
(stream_ptr
)
440 def beginning_clock_value(self
):
441 clock_value_ptr
= native_bt
.notification_discarded_packets_get_begin_clock_value(self
._ptr
)
443 if clock_value_ptr
is None:
446 clock_value
= bt2
.clock_value
._create
_clock
_value
_from
_ptr
(clock_value_ptr
)
450 def end_clock_value(self
):
451 clock_value_ptr
= native_bt
.notification_discarded_packets_get_end_clock_value(self
._ptr
)
453 if clock_value_ptr
is None:
456 clock_value
= bt2
.clock_value
._create
_clock
_value
_from
_ptr
(clock_value_ptr
)
460 class _DiscardedEventsNotification(_DiscardedElementsNotification
):
461 _TYPE
= native_bt
.MESSAGE_TYPE_DISCARDED_EVENTS
465 count
= native_bt
.notification_discarded_events_get_count(self
._ptr
)
471 stream_ptr
= native_bt
.notification_discarded_events_get_stream(self
._ptr
)
473 return bt2
.stream
._create
_from
_ptr
(stream_ptr
)
476 def beginning_clock_value(self
):
477 clock_value_ptr
= native_bt
.notification_discarded_events_get_begin_clock_value(self
._ptr
)
479 if clock_value_ptr
is None:
482 clock_value
= bt2
.clock_value
._create
_clock
_value
_from
_ptr
(clock_value_ptr
)
486 def end_clock_value(self
):
487 clock_value_ptr
= native_bt
.notification_discarded_events_get_end_clock_value(self
._ptr
)
489 if clock_value_ptr
is None:
492 clock_value
= bt2
.clock_value
._create
_clock
_value
_from
_ptr
(clock_value_ptr
)
496 _NOTIF_TYPE_TO_CLS
= {
497 native_bt
.MESSAGE_TYPE_EVENT
: EventNotification
,
498 native_bt
.MESSAGE_TYPE_PACKET_BEGINNING
: PacketBeginningNotification
,
499 native_bt
.MESSAGE_TYPE_PACKET_END
: PacketEndNotification
,
500 native_bt
.MESSAGE_TYPE_STREAM_BEGINNING
: StreamBeginningNotification
,
501 native_bt
.MESSAGE_TYPE_STREAM_END
: StreamEndNotification
,
502 native_bt
.MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY
: InactivityNotification
,
503 native_bt
.MESSAGE_TYPE_DISCARDED_PACKETS
: _DiscardedPacketsNotification
,
504 native_bt
.MESSAGE_TYPE_DISCARDED_EVENTS
: _DiscardedEventsNotification
,