import bt2.stream
import bt2.fields
import bt2.clock_value
+import collections
import numbers
import copy
import abc
return event
+class _EventClockValuesIterator(collections.abc.Iterator):
+ def __init__(self, event_clock_values):
+ self._event_clock_values = event_clock_values
+ self._clock_classes = event_clock_values._event._clock_classes
+ self._at = 0
+
+ def __next__(self):
+ if self._at == len(self._clock_classes):
+ raise StopIteration
+
+ self._at += 1
+ return self._clock_classes[at]
+
+
+class _EventClockValues(collections.abc.Mapping):
+ def __init__(self, event):
+ self._event = event
+
+ def __getitem__(self, clock_class):
+ utils._check_type(clock_class, bt2.ClockClass)
+ clock_value_ptr = native_bt.event_get_clock_value(self._event._ptr,
+ clock_class._ptr)
+
+ if clock_value_ptr is None:
+ return
+
+ clock_value = bt2.clock_value._create_clock_value_from_ptr(clock_value_ptr)
+ return clock_value
+
+ def add(self, clock_value):
+ utils._check_type(clock_value, bt2.clock_value._ClockValue)
+ ret = native_bt.event_set_clock_value(self._ptr,
+ clock_value._ptr)
+ utils._handle_ret(ret, "cannot set event object's clock value")
+
+ def __len__(self):
+ count = len(self._event._clock_classes)
+ assert(count >= 0)
+ return count
+
+ def __iter__(self):
+ return _EventClockValuesIterator(self)
+
+
class _Event(object._Object):
@property
def event_class(self):
utils._handle_ret(ret, "cannot get clock value object's cycles")
return cycles
- def clock_value(self, clock_class):
- utils._check_type(clock_class, bt2.ClockClass)
- clock_value_ptr = native_bt.event_get_clock_value(self._ptr,
- clock_class._ptr)
-
- if clock_value_ptr is None:
- return
-
- clock_value = bt2.clock_value._create_clock_value_from_ptr(clock_value_ptr)
- return clock_value
-
- def add_clock_value(self, clock_value):
- utils._check_type(clock_value, bt2.clock_value._ClockValue)
- ret = native_bt.event_set_clock_value(self._ptr,
- clock_value._ptr)
- utils._handle_ret(ret, "cannot set event object's clock value")
+ @property
+ def clock_values(self):
+ return _EventClockValues(self)
def __getitem__(self, key):
utils._check_str(key)
# Thus even if we copy the clock class, the user cannot modify
# it, therefore it's useless to copy it.
for clock_class in self._clock_classes:
- clock_value = self.clock_value(clock_class)
+ clock_value = self.clock_values[clock_class]
if clock_value is not None:
- cpy.add_clock_value(clock_value)
+ cpy.clock_values.add(clock_value)
return cpy
from bt2 import native_bt, object, utils
import bt2.clock_class_priority_map
import bt2.clock_value
+import collections
import bt2.packet
import bt2.stream
import bt2.event
return StreamEndNotification(self.stream)
+class _InactivityNotificationClockValuesIterator(collections.abc.Iterator):
+ def __init__(self, notif_clock_values):
+ self._notif_clock_values = notif_clock_values
+ self._clock_classes = list(notif_clock_values._notif.clock_class_priority_map)
+ self._at = 0
+
+ def __next__(self):
+ if self._at == len(self._clock_classes):
+ raise StopIteration
+
+ self._at += 1
+ return self._clock_classes[at]
+
+
+class _InactivityNotificationClockValues(collections.abc.Mapping):
+ def __init__(self, notif):
+ self._notif = notif
+
+ def __getitem__(self, clock_class):
+ utils._check_type(clock_class, bt2.ClockClass)
+ clock_value_ptr = native_bt.notification_inactivity_get_clock_value(self._notif._ptr,
+ clock_class._ptr)
+
+ if clock_value_ptr is None:
+ return
+
+ clock_value = bt2.clock_value._create_clock_value_from_ptr(clock_value_ptr)
+ return clock_value
+
+ def add(self, clock_value):
+ utils._check_type(clock_value, bt2.clock_value._ClockValue)
+ ret = native_bt.notification_inactivity_set_clock_value(self._notif._ptr,
+ clock_value._ptr)
+ utils._handle_ret(ret, "cannot set inactivity notification object's clock value")
+
+ def __len__(self):
+ return len(self._notif.clock_class_priority_map)
+
+ def __iter__(self):
+ return _InactivityNotificationClockValuesIterator(self)
+
+
class InactivityNotification(_CopyableNotification):
_TYPE = native_bt.NOTIFICATION_TYPE_INACTIVITY
assert(cc_prio_map_ptr)
return bt2.clock_class_priority_map.ClockClassPriorityMap._create_from_ptr(cc_prio_map_ptr)
- def clock_value(self, clock_class):
- utils._check_type(clock_class, bt2.ClockClass)
- clock_value_ptr = native_bt.notification_inactivity_get_clock_value(self._ptr,
- clock_class._ptr)
-
- if clock_value_ptr is None:
- return
-
- clock_value = bt2.clock_value._create_clock_value_from_ptr(clock_value_ptr)
- return clock_value
-
- def add_clock_value(self, clock_value):
- utils._check_type(clock_value, bt2.clock_value._ClockValue)
- ret = native_bt.notification_inactivity_set_clock_value(self._ptr,
- clock_value._ptr)
- utils._handle_ret(ret, "cannot set inactivity notification object's clock value")
+ @property
+ def clock_values(self):
+ return _InactivityNotificationClockValues(self)
def _get_clock_values(self):
clock_values = {}
- for clock_class in self.clock_class_priority_map:
- clock_value = self.clock_value(clock_class)
-
+ for clock_class, clock_value in self.clock_values.items():
if clock_value is None:
continue
def __copy__(self):
cpy = InactivityNotification(self.clock_class_priority_map)
- for clock_class in self.clock_class_priority_map:
- clock_value = self.clock_value(clock_class)
-
+ for clock_class, clock_value in self.clock_values.items():
if clock_value is None:
continue
- cpy.add_clock_value(clock_value)
+ cpy.clock_values.add(clock_value)
return cpy
clock_value_cpy = cpy_clock_class(orig_clock_value.cycles)
# set copied clock value in notification copy
- cpy.add_clock_value(clock_value_cpy)
+ cpy.clock_values.add(clock_value_cpy)
memo[id(self)] = cpy
return cpy
cli/test_trimmer
TESTS_LIB = \
- lib/test_bitfield \
lib/test_ctf_writer_complete \
lib/test_bt_values \
lib/test_ctf_ir_ref \
self._packet.context_field['spc_field'] = self._values['spc_field']
self._event = self._ec()
- self._event.add_clock_value(self._clock_class(1772))
+ self._event.clock_values.add(self._clock_class(1772))
self._event.header_field['seh_field'] = self._values['seh_field']
self._event.stream_event_context_field['sec_field'] = self._values[
'sec_field']
def test_attr_datetime(self):
event = self._get_event()
clock_class = self._cc_prio_map.highest_priority_clock_class
- ns = self._event.clock_value(clock_class).ns_from_epoch
+ ns = self._event.clock_values[clock_class].ns_from_epoch
self.assertEqual(datetime.date.fromtimestamp(ns / 1E9), event.datetime)
def test_getitem(self):
cc = bt2.ClockClass('hi', 1000)
tc.add_clock_class(cc)
ev = self._ec()
- ev.add_clock_value(cc(177))
- self.assertEqual(ev.clock_value(cc).cycles, 177)
+ ev.clock_values.add(cc(177))
+ self.assertEqual(ev.clock_values[cc].cycles, 177)
def test_no_clock_value(self):
tc = bt2.Trace()
cc = bt2.ClockClass('hi', 1000)
tc.add_clock_class(cc)
ev = self._ec()
- self.assertIsNone(ev.clock_value(cc))
+ self.assertIsNone(ev.clock_values[cc])
def test_no_packet(self):
ev = self._ec()
tc.add_clock_class(cc)
ev = self._ec()
self._fill_ev(ev)
- ev.add_clock_value(cc(234))
+ ev.clock_values.add(cc(234))
return ev
def test_getitem(self):
tc.add_clock_class(cc)
ev1 = self._ec()
self._fill_ev(ev1)
- ev1.add_clock_value(cc(234))
+ ev1.clock_values.add(cc(234))
ev2 = self._ec()
self._fill_ev(ev2)
- ev2.add_clock_value(cc(234))
+ ev2.clock_values.add(cc(234))
self.assertEqual(ev1, ev2)
def test_ne_header_field(self):
ev1 = self._ec()
self._fill_ev(ev1)
ev1.header_field['id'] = 19
- ev1.add_clock_value(cc(234))
+ ev1.clock_values.add(cc(234))
ev2 = self._ec()
self._fill_ev(ev2)
- ev2.add_clock_value(cc(234))
+ ev2.clock_values.add(cc(234))
self.assertNotEqual(ev1, ev2)
def test_ne_stream_event_context_field(self):
ev1 = self._ec()
self._fill_ev(ev1)
ev1.stream_event_context_field['cpu_id'] = 3
- ev1.add_clock_value(cc(234))
+ ev1.clock_values.add(cc(234))
ev2 = self._ec()
self._fill_ev(ev2)
- ev2.add_clock_value(cc(234))
+ ev2.clock_values.add(cc(234))
self.assertNotEqual(ev1, ev2)
def test_ne_context_field(self):
ev1 = self._ec()
self._fill_ev(ev1)
ev1.context_field['ant'] = -3
- ev1.add_clock_value(cc(234))
+ ev1.clock_values.add(cc(234))
ev2 = self._ec()
self._fill_ev(ev2)
- ev2.add_clock_value(cc(234))
+ ev2.clock_values.add(cc(234))
self.assertNotEqual(ev1, ev2)
def test_ne_payload_field(self):
ev1 = self._ec()
self._fill_ev(ev1)
ev1.payload_field['mosquito'] = 98
- ev1.add_clock_value(cc(234))
+ ev1.clock_values.add(cc(234))
ev2 = self._ec()
self._fill_ev(ev2)
- ev2.add_clock_value(cc(234))
+ ev2.clock_values.add(cc(234))
self.assertNotEqual(ev1, ev2)
def test_eq_invalid(self):
tc.add_clock_class(cc)
ev = self._ec()
self._fill_ev(ev)
- ev.add_clock_value(cc(234))
+ ev.clock_values.add(cc(234))
cpy = func(ev)
self.assertIsNot(ev, cpy)
self.assertNotEqual(ev.addr, cpy.addr)
self._packet = self._stream.create_packet()
self._packet.header_field['hello'] = 19487
self._event = self._ec()
- self._event.add_clock_value(self._clock_class(1772))
+ self._event.clock_values.add(self._clock_class(1772))
self._event.payload_field['my_int'] = 23
self._event.packet = self._packet
def test_create_with_cc_prio_map(self):
notif = bt2.InactivityNotification(self._cc_prio_map)
- notif.add_clock_value(self._cc1(123))
- notif.add_clock_value(self._cc2(19487))
+ notif.clock_values.add(self._cc1(123))
+ notif.clock_values.add(self._cc2(19487))
self.assertEqual(len(notif.clock_class_priority_map), 2)
self.assertEqual(notif.clock_class_priority_map, self._cc_prio_map)
- self.assertEqual(notif.clock_value(self._cc1), 123)
- self.assertEqual(notif.clock_value(self._cc2), 19487)
+ self.assertEqual(notif.clock_values[self._cc1], 123)
+ self.assertEqual(notif.clock_values[self._cc2], 19487)
def test_eq(self):
notif = bt2.InactivityNotification(self._cc_prio_map)
- notif.add_clock_value(self._cc1(123))
- notif.add_clock_value(self._cc2(19487))
+ notif.clock_values.add(self._cc1(123))
+ notif.clock_values.add(self._cc2(19487))
cc_prio_map_copy = copy.copy(self._cc_prio_map)
notif2 = bt2.InactivityNotification(cc_prio_map_copy)
- notif2.add_clock_value(self._cc1(123))
- notif2.add_clock_value(self._cc2(19487))
+ notif2.clock_values.add(self._cc1(123))
+ notif2.clock_values.add(self._cc2(19487))
self.assertEqual(notif, notif2)
def test_ne_cc_prio_map(self):
notif = bt2.InactivityNotification(self._cc_prio_map)
- notif.add_clock_value(self._cc1(123))
- notif.add_clock_value(self._cc2(19487))
+ notif.clock_values.add(self._cc1(123))
+ notif.clock_values.add(self._cc2(19487))
cc_prio_map_copy = copy.copy(self._cc_prio_map)
cc_prio_map_copy[self._cc2] = 23
notif2 = bt2.InactivityNotification(cc_prio_map_copy)
def test_ne_clock_value(self):
notif = bt2.InactivityNotification(self._cc_prio_map)
- notif.add_clock_value(self._cc1(123))
- notif.add_clock_value(self._cc2(19487))
+ notif.clock_values.add(self._cc1(123))
+ notif.clock_values.add(self._cc2(19487))
notif2 = bt2.InactivityNotification(self._cc_prio_map)
- notif.add_clock_value(self._cc1(123))
- notif.add_clock_value(self._cc2(1847))
+ notif.clock_values.add(self._cc1(123))
+ notif.clock_values.add(self._cc2(1847))
self.assertNotEqual(notif, notif2)
def test_eq_invalid(self):
def test_copy(self):
notif = bt2.InactivityNotification(self._cc_prio_map)
- notif.add_clock_value(self._cc1(123))
- notif.add_clock_value(self._cc2(19487))
+ notif.clock_values.add(self._cc1(123))
+ notif.clock_values.add(self._cc2(19487))
notif_copy = copy.copy(notif)
self.assertEqual(notif, notif_copy)
self.assertNotEqual(notif.addr, notif_copy.addr)
self.assertEqual(notif.clock_class_priority_map.addr,
notif_copy.clock_class_priority_map.addr)
- self.assertEqual(notif_copy.clock_value(self._cc1), 123)
- self.assertEqual(notif_copy.clock_value(self._cc2), 19487)
+ self.assertEqual(notif_copy.clock_values[self._cc1], 123)
+ self.assertEqual(notif_copy.clock_values[self._cc2], 19487)
def test_deepcopy(self):
notif = bt2.InactivityNotification(self._cc_prio_map)
- notif.add_clock_value(self._cc1(123))
- notif.add_clock_value(self._cc2(19487))
+ notif.clock_values.add(self._cc1(123))
+ notif.clock_values.add(self._cc2(19487))
notif_copy = copy.deepcopy(notif)
self.assertEqual(notif, notif_copy)
self.assertNotEqual(notif.addr, notif_copy.addr)
notif_copy.clock_class_priority_map)
self.assertNotEqual(list(notif.clock_class_priority_map)[0].addr,
list(notif_copy.clock_class_priority_map)[0].addr)
- self.assertIsNone(notif_copy.clock_value(self._cc1))
- self.assertIsNone(notif_copy.clock_value(self._cc2))
- self.assertEqual(notif_copy.clock_value(list(notif_copy.clock_class_priority_map)[0]), 123)
- self.assertEqual(notif_copy.clock_value(list(notif_copy.clock_class_priority_map)[1]), 19487)
+ self.assertIsNone(notif_copy.clock_values[self._cc1])
+ self.assertIsNone(notif_copy.clock_values[self._cc2])
+ self.assertEqual(notif_copy.clock_values[list(notif_copy.clock_class_priority_map)[0]], 123)
+ self.assertEqual(notif_copy.clock_values[list(notif_copy.clock_class_priority_map)[1]], 19487)
class DiscardedPacketsNotificationTestCase(unittest.TestCase):