- def stream(self):
- stream_ptr = native_bt.message_stream_end_get_stream(self._ptr)
- assert(stream_ptr)
- return bt2.stream._create_from_ptr(stream_ptr)
-
- def __eq__(self, other):
- if type(other) is not type(self):
- return False
-
- if self.addr == other.addr:
- return True
-
- return self.stream == other.stream
-
- def _copy(self, copy_func):
- # We can always use references here because those properties are
- # frozen anyway if they are part of a message. Since the
- # user cannot modify them after copying the message, it's
- # useless to copy/deep-copy them.
- return StreamEndMessage(self.stream)
-
-
-class _InactivityMessageClockSnapshotsIterator(collections.abc.Iterator):
- def __init__(self, msg_clock_snapshots):
- self._msg_clock_snapshots = msg_clock_snapshots
- self._clock_classes = list(msg_clock_snapshots._msg.clock_class_priority_map)
- self._at = 0
-
- def __next__(self):
- if self._at == len(self._clock_classes):
- raise StopIteration
-
- self._at += 1
- return self._clock_classes[at]
-
-
-class _InactivityMessageClockSnapshots(collections.abc.Mapping):
- def __init__(self, msg):
- self._msg = msg
-
- def __getitem__(self, clock_class):
- utils._check_type(clock_class, bt2.ClockClass)
- clock_snapshot_ptr = native_bt.message_inactivity_get_clock_snapshot(self._msg._ptr,
- clock_class._ptr)
-
- if clock_snapshot_ptr is None:
- return
-
- clock_snapshot = bt2.clock_snapshot._create_clock_snapshot_from_ptr(clock_snapshot_ptr)
- return clock_snapshot
-
- def add(self, clock_snapshot):
- utils._check_type(clock_snapshot, bt2.clock_snapshot._ClockSnapshot)
- ret = native_bt.message_inactivity_set_clock_snapshot(self._msg._ptr,
- clock_snapshot._ptr)
- utils._handle_ret(ret, "cannot set inactivity message object's clock value")
-
- def __len__(self):
- return len(self._msg.clock_class_priority_map)
-
- def __iter__(self):
- return _InactivityMessageClockSnapshotsIterator(self)
-