bt2/native_bt_version.i \
bt2/clock_class_priority_map.py \
bt2/clock_class.py \
- bt2/clock_value.py \
+ bt2/clock_snapshot.py \
bt2/component.py \
bt2/connection.py \
bt2/ctf_writer.py \
from bt2.clock_class import *
from bt2.clock_class_priority_map import *
-from bt2.clock_value import *
+from bt2.clock_snapshot import *
from bt2.component import *
from bt2.component import _FilterComponent
from bt2.component import _GenericFilterComponentClass
import uuid as uuidp
import numbers
import bt2
-import bt2.clock_value as clock_value
+import bt2.clock_snapshot as clock_snapshot
class ClockClassOffset:
utils._handle_ret(ret, "cannot set clock class object's UUID")
def __call__(self, cycles):
- return clock_value._ClockValue(self._ptr, cycles)
+ return clock_snapshot._ClockSnapshot(self._ptr, cycles)
--- /dev/null
+# The MIT License (MIT)
+#
+# Copyright (c) 2017 Philippe Proulx <pproulx@efficios.com>
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+from bt2 import native_bt, object, utils
+import uuid as uuidp
+import numbers
+import bt2
+
+
+def _create_clock_snapshot_from_ptr(ptr):
+ clock_snapshot = _ClockSnapshot._create_from_ptr(ptr)
+ return clock_snapshot
+
+
+class _ClockSnapshot(object._Object):
+ def __init__(self, clock_class_ptr, cycles):
+ utils._check_uint64(cycles)
+ ptr = native_bt.clock_snapshot_create(clock_class_ptr, cycles)
+
+ if ptr is None:
+ raise bt2.CreationError('cannot create clock value object')
+
+ super().__init__(ptr)
+
+ @property
+ def clock_class(self):
+ ptr = native_bt.clock_snapshot_get_class(self._ptr)
+ assert(ptr)
+ return bt2.ClockClass._create_from_ptr(ptr)
+
+ @property
+ def cycles(self):
+ ret, cycles = native_bt.clock_snapshot_get_value(self._ptr)
+ assert(ret == 0)
+ return cycles
+
+ @property
+ def ns_from_epoch(self):
+ ret, ns = native_bt.clock_snapshot_get_value_ns_from_epoch(self._ptr)
+ utils._handle_ret(ret, "cannot get clock value object's nanoseconds from Epoch")
+ return ns
+
+ def __eq__(self, other):
+ if isinstance(other, numbers.Integral):
+ return int(other) == self.cycles
+
+ if not isinstance(other, self.__class__):
+ # not comparing apples to apples
+ return False
+
+ if self.addr == other.addr:
+ return True
+
+ self_props = self.clock_class, self.cycles
+ other_props = other.clock_class, other.cycles
+ return self_props == other_props
+
+ def __copy__(self):
+ return self.clock_class(self.cycles)
+
+ def __deepcopy__(self, memo):
+ cpy = self.__copy__()
+ memo[id(self)] = cpy
+ return cpy
+++ /dev/null
-# The MIT License (MIT)
-#
-# Copyright (c) 2017 Philippe Proulx <pproulx@efficios.com>
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-
-from bt2 import native_bt, object, utils
-import uuid as uuidp
-import numbers
-import bt2
-
-
-def _create_clock_value_from_ptr(ptr):
- clock_value = _ClockValue._create_from_ptr(ptr)
- return clock_value
-
-
-class _ClockValue(object._Object):
- def __init__(self, clock_class_ptr, cycles):
- utils._check_uint64(cycles)
- ptr = native_bt.clock_value_create(clock_class_ptr, cycles)
-
- if ptr is None:
- raise bt2.CreationError('cannot create clock value object')
-
- super().__init__(ptr)
-
- @property
- def clock_class(self):
- ptr = native_bt.clock_value_get_class(self._ptr)
- assert(ptr)
- return bt2.ClockClass._create_from_ptr(ptr)
-
- @property
- def cycles(self):
- ret, cycles = native_bt.clock_value_get_value(self._ptr)
- assert(ret == 0)
- return cycles
-
- @property
- def ns_from_epoch(self):
- ret, ns = native_bt.clock_value_get_value_ns_from_epoch(self._ptr)
- utils._handle_ret(ret, "cannot get clock value object's nanoseconds from Epoch")
- return ns
-
- def __eq__(self, other):
- if isinstance(other, numbers.Integral):
- return int(other) == self.cycles
-
- if not isinstance(other, self.__class__):
- # not comparing apples to apples
- return False
-
- if self.addr == other.addr:
- return True
-
- self_props = self.clock_class, self.cycles
- other_props = other.clock_class, other.cycles
- return self_props == other_props
-
- def __copy__(self):
- return self.clock_class(self.cycles)
-
- def __deepcopy__(self, memo):
- cpy = self.__copy__()
- memo[id(self)] = cpy
- return cpy
import bt2.packet
import bt2.stream
import bt2.fields
-import bt2.clock_value
+import bt2.clock_snapshot
import collections
import numbers
import copy
return event
-class _EventClockValuesIterator(collections.abc.Iterator):
- def __init__(self, event_clock_values):
- self._event_clock_values = event_clock_values
- self._clock_classes = event_clock_values._event._clock_classes
+class _EventClockSnapshotsIterator(collections.abc.Iterator):
+ def __init__(self, event_clock_snapshots):
+ self._event_clock_snapshots = event_clock_snapshots
+ self._clock_classes = event_clock_snapshots._event._clock_classes
self._at = 0
def __next__(self):
return self._clock_classes[at]
-class _EventClockValues(collections.abc.Mapping):
+class _EventClockSnapshots(collections.abc.Mapping):
def __init__(self, event):
self._event = event
def __getitem__(self, clock_class):
utils._check_type(clock_class, bt2.ClockClass)
- clock_value_ptr = native_bt.event_get_clock_value(self._event._ptr,
+ clock_snapshot_ptr = native_bt.event_get_clock_snapshot(self._event._ptr,
clock_class._ptr)
- if clock_value_ptr is None:
+ if clock_snapshot_ptr is None:
return
- clock_value = bt2.clock_value._create_clock_value_from_ptr(clock_value_ptr)
- return clock_value
+ clock_snapshot = bt2.clock_snapshot._create_clock_snapshot_from_ptr(clock_snapshot_ptr)
+ return clock_snapshot
- def add(self, clock_value):
- utils._check_type(clock_value, bt2.clock_value._ClockValue)
- ret = native_bt.event_set_clock_value(self._ptr,
- clock_value._ptr)
+ def add(self, clock_snapshot):
+ utils._check_type(clock_snapshot, bt2.clock_snapshot._ClockSnapshot)
+ ret = native_bt.event_set_clock_snapshot(self._ptr,
+ clock_snapshot._ptr)
utils._handle_ret(ret, "cannot set event object's clock value")
def __len__(self):
return count
def __iter__(self):
- return _EventClockValuesIterator(self)
+ return _EventClockSnapshotsIterator(self)
class _Event(object._Object):
ret = native_bt.event_set_event_payload(self._ptr, payload_ptr)
utils._handle_ret(ret, "cannot set event object's payload field")
- def _get_clock_value_cycles(self, clock_class_ptr):
- clock_value_ptr = native_bt.event_get_clock_value(self._ptr,
+ def _get_clock_snapshot_cycles(self, clock_class_ptr):
+ clock_snapshot_ptr = native_bt.event_get_clock_snapshot(self._ptr,
clock_class_ptr)
- if clock_value_ptr is None:
+ if clock_snapshot_ptr is None:
return
- ret, cycles = native_bt.clock_value_get_value(clock_value_ptr)
- native_bt.put(clock_value_ptr)
+ ret, cycles = native_bt.clock_snapshot_get_value(clock_snapshot_ptr)
+ native_bt.put(clock_snapshot_ptr)
utils._handle_ret(ret, "cannot get clock value object's cycles")
return cycles
@property
- def clock_values(self):
- return _EventClockValues(self)
+ def clock_snapshots(self):
+ return _EventClockSnapshots(self)
def __getitem__(self, key):
utils._check_str(key)
if self.addr == other.addr:
return True
- self_clock_values = {}
- other_clock_values = {}
+ self_clock_snapshots = {}
+ other_clock_snapshots = {}
for clock_class_ptr in self._clock_class_ptrs:
- self_clock_values[int(clock_class_ptr)] = self._get_clock_value_cycles(clock_class_ptr)
+ self_clock_snapshots[int(clock_class_ptr)] = self._get_clock_snapshot_cycles(clock_class_ptr)
for clock_class_ptr in other._clock_class_ptrs:
- other_clock_values[int(clock_class_ptr)] = self._get_clock_value_cycles(clock_class_ptr)
+ other_clock_snapshots[int(clock_class_ptr)] = self._get_clock_snapshot_cycles(clock_class_ptr)
self_props = (
self.header_field,
self.stream_event_context_field,
self.context_field,
self.payload_field,
- self_clock_values,
+ self_clock_snapshots,
)
other_props = (
other.header_field,
other.stream_event_context_field,
other.context_field,
other.payload_field,
- other_clock_values,
+ other_clock_snapshots,
)
return self_props == other_props
# Thus even if we copy the clock class, the user cannot modify
# it, therefore it's useless to copy it.
for clock_class in self._clock_classes:
- clock_value = self.clock_values[clock_class]
+ clock_snapshot = self.clock_snapshots[clock_class]
- if clock_value is not None:
- cpy.clock_values.add(clock_value)
+ if clock_snapshot is not None:
+ cpy.clock_snapshots.add(clock_snapshot)
return cpy
from bt2 import native_bt, object, utils
import bt2.clock_class_priority_map
-import bt2.clock_value
+import bt2.clock_snapshot
import collections
import bt2.packet
import bt2.stream
return StreamEndMessage(self.stream)
-class _InactivityMessageClockValuesIterator(collections.abc.Iterator):
- def __init__(self, msg_clock_values):
- self._msg_clock_values = msg_clock_values
- self._clock_classes = list(msg_clock_values._msg.clock_class_priority_map)
+class _InactivityMessageClockSnapshotsIterator(collections.abc.Iterator):
+ def __init__(self, msg_clock_snapshots):
+ self._msg_clock_snapshots = msg_clock_snapshots
+ self._clock_classes = list(msg_clock_snapshots._msg.clock_class_priority_map)
self._at = 0
def __next__(self):
return self._clock_classes[at]
-class _InactivityMessageClockValues(collections.abc.Mapping):
+class _InactivityMessageClockSnapshots(collections.abc.Mapping):
def __init__(self, msg):
self._msg = msg
def __getitem__(self, clock_class):
utils._check_type(clock_class, bt2.ClockClass)
- clock_value_ptr = native_bt.message_inactivity_get_clock_value(self._msg._ptr,
+ clock_snapshot_ptr = native_bt.message_inactivity_get_clock_snapshot(self._msg._ptr,
clock_class._ptr)
- if clock_value_ptr is None:
+ if clock_snapshot_ptr is None:
return
- clock_value = bt2.clock_value._create_clock_value_from_ptr(clock_value_ptr)
- return clock_value
+ clock_snapshot = bt2.clock_snapshot._create_clock_snapshot_from_ptr(clock_snapshot_ptr)
+ return clock_snapshot
- def add(self, clock_value):
- utils._check_type(clock_value, bt2.clock_value._ClockValue)
- ret = native_bt.message_inactivity_set_clock_value(self._msg._ptr,
- clock_value._ptr)
+ def add(self, clock_snapshot):
+ utils._check_type(clock_snapshot, bt2.clock_snapshot._ClockSnapshot)
+ ret = native_bt.message_inactivity_set_clock_snapshot(self._msg._ptr,
+ clock_snapshot._ptr)
utils._handle_ret(ret, "cannot set inactivity message object's clock value")
def __len__(self):
return len(self._msg.clock_class_priority_map)
def __iter__(self):
- return _InactivityMessageClockValuesIterator(self)
+ return _InactivityMessageClockSnapshotsIterator(self)
class InactivityMessage(_CopyableMessage):
return bt2.clock_class_priority_map.ClockClassPriorityMap._create_from_ptr(cc_prio_map_ptr)
@property
- def clock_values(self):
- return _InactivityMessageClockValues(self)
+ def clock_snapshots(self):
+ return _InactivityMessageClockSnapshots(self)
- def _get_clock_values(self):
- clock_values = {}
+ def _get_clock_snapshots(self):
+ clock_snapshots = {}
- for clock_class, clock_value in self.clock_values.items():
- if clock_value is None:
+ for clock_class, clock_snapshot in self.clock_snapshots.items():
+ if clock_snapshot is None:
continue
- clock_values[clock_class] = clock_value
+ clock_snapshots[clock_class] = clock_snapshot
- return clock_values
+ return clock_snapshots
def __eq__(self, other):
if type(other) is not type(self):
self_props = (
self.clock_class_priority_map,
- self._get_clock_values(),
+ self._get_clock_snapshots(),
)
other_props = (
other.clock_class_priority_map,
- other._get_clock_values(),
+ other._get_clock_snapshots(),
)
return self_props == other_props
def __copy__(self):
cpy = InactivityMessage(self.clock_class_priority_map)
- for clock_class, clock_value in self.clock_values.items():
- if clock_value is None:
+ for clock_class, clock_snapshot in self.clock_snapshots.items():
+ if clock_snapshot is None:
continue
- cpy.clock_values.add(clock_value)
+ cpy.clock_snapshots.add(clock_snapshot)
return cpy
# copy clock values
for orig_clock_class in self.clock_class_priority_map:
- orig_clock_value = self.clock_value(orig_clock_class)
+ orig_clock_snapshot = self.clock_snapshot(orig_clock_class)
- if orig_clock_value is None:
+ if orig_clock_snapshot is None:
continue
# find equivalent, copied clock class in CC priority map copy
break
# create copy of clock value from copied clock class
- clock_value_cpy = cpy_clock_class(orig_clock_value.cycles)
+ clock_snapshot_cpy = cpy_clock_class(orig_clock_snapshot.cycles)
# set copied clock value in message copy
- cpy.clock_values.add(clock_value_cpy)
+ cpy.clock_snapshots.add(clock_snapshot_cpy)
memo[id(self)] = cpy
return cpy
self_props = (
self.count,
self.stream,
- self.beginning_clock_value,
- self.end_clock_value,
+ self.beginning_clock_snapshot,
+ self.end_clock_snapshot,
)
other_props = (
other.count,
other.stream,
- other.beginning_clock_value,
- other.end_clock_value,
+ other.beginning_clock_snapshot,
+ other.end_clock_snapshot,
)
return self_props == other_props
return bt2.stream._create_from_ptr(stream_ptr)
@property
- def beginning_clock_value(self):
- clock_value_ptr = native_bt.message_discarded_packets_get_begin_clock_value(self._ptr)
+ def beginning_clock_snapshot(self):
+ clock_snapshot_ptr = native_bt.message_discarded_packets_get_begin_clock_snapshot(self._ptr)
- if clock_value_ptr is None:
+ if clock_snapshot_ptr is None:
return
- clock_value = bt2.clock_value._create_clock_value_from_ptr(clock_value_ptr)
- return clock_value
+ clock_snapshot = bt2.clock_snapshot._create_clock_snapshot_from_ptr(clock_snapshot_ptr)
+ return clock_snapshot
@property
- def end_clock_value(self):
- clock_value_ptr = native_bt.message_discarded_packets_get_end_clock_value(self._ptr)
+ def end_clock_snapshot(self):
+ clock_snapshot_ptr = native_bt.message_discarded_packets_get_end_clock_snapshot(self._ptr)
- if clock_value_ptr is None:
+ if clock_snapshot_ptr is None:
return
- clock_value = bt2.clock_value._create_clock_value_from_ptr(clock_value_ptr)
- return clock_value
+ clock_snapshot = bt2.clock_snapshot._create_clock_snapshot_from_ptr(clock_snapshot_ptr)
+ return clock_snapshot
class _DiscardedEventsMessage(_DiscardedElementsMessage):
return bt2.stream._create_from_ptr(stream_ptr)
@property
- def beginning_clock_value(self):
- clock_value_ptr = native_bt.message_discarded_events_get_begin_clock_value(self._ptr)
+ def beginning_clock_snapshot(self):
+ clock_snapshot_ptr = native_bt.message_discarded_events_get_begin_clock_snapshot(self._ptr)
- if clock_value_ptr is None:
+ if clock_snapshot_ptr is None:
return
- clock_value = bt2.clock_value._create_clock_value_from_ptr(clock_value_ptr)
- return clock_value
+ clock_snapshot = bt2.clock_snapshot._create_clock_snapshot_from_ptr(clock_snapshot_ptr)
+ return clock_snapshot
@property
- def end_clock_value(self):
- clock_value_ptr = native_bt.message_discarded_events_get_end_clock_value(self._ptr)
+ def end_clock_snapshot(self):
+ clock_snapshot_ptr = native_bt.message_discarded_events_get_end_clock_snapshot(self._ptr)
- if clock_value_ptr is None:
+ if clock_snapshot_ptr is None:
return
- clock_value = bt2.clock_value._create_clock_value_from_ptr(clock_value_ptr)
- return clock_value
+ clock_snapshot = bt2.clock_snapshot._create_clock_snapshot_from_ptr(clock_snapshot_ptr)
+ return clock_snapshot
_MESSAGE_TYPE_TO_CLS = {
with self.assertRaises(TypeError):
self._cc.uuid = object()
- def test_create_clock_value(self):
- cv = self._cc(756)
- self.assertEqual(cv.clock_class.addr, self._cc.addr)
+ def test_create_clock_snapshot(self):
+ cs = self._cc(756)
+ self.assertEqual(cs.clock_class.addr, self._cc.addr)
def _test_copy(self, cpy):
self.assertIsNot(cpy, self._cc)
@unittest.skip("this is broken")
-class ClockValueTestCase(unittest.TestCase):
+class ClockSnapshotTestCase(unittest.TestCase):
def setUp(self):
self._cc = bt2.ClockClass('salut', 1000,
offset=bt2.ClockClassOffset(45, 354))
- self._cv = self._cc(123)
+ self._cs = self._cc(123)
def tearDown(self):
del self._cc
- del self._cv
+ del self._cs
def test_create_default(self):
- self.assertEqual(self._cv.clock_class.addr, self._cc.addr)
- self.assertEqual(self._cv.cycles, 123)
+ self.assertEqual(self._cs.clock_class.addr, self._cc.addr)
+ self.assertEqual(self._cs.cycles, 123)
def test_create_invalid_cycles_type(self):
with self.assertRaises(TypeError):
def test_ns_from_epoch(self):
s_from_epoch = 45 + ((354 + 123) / 1000)
ns_from_epoch = int(s_from_epoch * 1e9)
- self.assertEqual(self._cv.ns_from_epoch, ns_from_epoch)
+ self.assertEqual(self._cs.ns_from_epoch, ns_from_epoch)
def test_eq(self):
- cv1 = self._cc(123)
- cv2 = self._cc(123)
- self.assertEqual(cv1, cv2)
+ cs1 = self._cc(123)
+ cs2 = self._cc(123)
+ self.assertEqual(cs1, cs2)
def test_eq_int(self):
- cv1 = self._cc(123)
- self.assertEqual(cv1, 123)
+ cs1 = self._cc(123)
+ self.assertEqual(cs1, 123)
def test_ne_clock_class(self):
cc1 = bt2.ClockClass('yes', 1500)
cc2 = bt2.ClockClass('yes', 1501)
- cv1 = cc1(123)
- cv2 = cc2(123)
- self.assertNotEqual(cv1, cv2)
+ cs1 = cc1(123)
+ cs2 = cc2(123)
+ self.assertNotEqual(cs1, cs2)
def test_ne_cycles(self):
- cv1 = self._cc(123)
- cv2 = self._cc(125)
- self.assertNotEqual(cv1, cv2)
+ cs1 = self._cc(123)
+ cs2 = self._cc(125)
+ self.assertNotEqual(cs1, cs2)
def test_eq_invalid(self):
- self.assertFalse(self._cv == 23)
+ self.assertFalse(self._cs == 23)
def _test_copy(self, cpy):
- self.assertIsNot(cpy, self._cv)
- self.assertNotEqual(cpy.addr, self._cv.addr)
- self.assertEqual(cpy, self._cv)
+ self.assertIsNot(cpy, self._cs)
+ self.assertNotEqual(cpy.addr, self._cs.addr)
+ self.assertEqual(cpy, self._cs)
def test_copy(self):
- cpy = copy.copy(self._cv)
+ cpy = copy.copy(self._cs)
self._test_copy(cpy)
def test_deepcopy(self):
- cpy = copy.deepcopy(self._cv)
+ cpy = copy.deepcopy(self._cs)
self._test_copy(cpy)
self.assertEqual(ev.payload_field['gnu'], 124)
self.assertEqual(ev.payload_field['mosquito'], 17)
- def test_clock_value(self):
+ def test_clock_snapshot(self):
tc = bt2.Trace()
tc.add_stream_class(self._ec.stream_class)
cc = bt2.ClockClass('hi', 1000)
tc.add_clock_class(cc)
ev = self._ec()
- ev.clock_values.add(cc(177))
- self.assertEqual(ev.clock_values[cc].cycles, 177)
+ ev.clock_snapshots.add(cc(177))
+ self.assertEqual(ev.clock_snapshots[cc].cycles, 177)
- def test_no_clock_value(self):
+ def test_no_clock_snapshot(self):
tc = bt2.Trace()
tc.add_stream_class(self._ec.stream_class)
cc = bt2.ClockClass('hi', 1000)
tc.add_clock_class(cc)
ev = self._ec()
- self.assertIsNone(ev.clock_values[cc])
+ self.assertIsNone(ev.clock_snapshots[cc])
def test_no_packet(self):
ev = self._ec()
tc.add_clock_class(cc)
ev = self._ec()
self._fill_ev(ev)
- ev.clock_values.add(cc(234))
+ ev.clock_snapshots.add(cc(234))
return ev
def test_getitem(self):
tc.add_clock_class(cc)
ev1 = self._ec()
self._fill_ev(ev1)
- ev1.clock_values.add(cc(234))
+ ev1.clock_snapshots.add(cc(234))
ev2 = self._ec()
self._fill_ev(ev2)
- ev2.clock_values.add(cc(234))
+ ev2.clock_snapshots.add(cc(234))
self.assertEqual(ev1, ev2)
def test_ne_header_field(self):
ev1 = self._ec()
self._fill_ev(ev1)
ev1.header_field['id'] = 19
- ev1.clock_values.add(cc(234))
+ ev1.clock_snapshots.add(cc(234))
ev2 = self._ec()
self._fill_ev(ev2)
- ev2.clock_values.add(cc(234))
+ ev2.clock_snapshots.add(cc(234))
self.assertNotEqual(ev1, ev2)
def test_ne_stream_event_context_field(self):
ev1 = self._ec()
self._fill_ev(ev1)
ev1.stream_event_context_field['cpu_id'] = 3
- ev1.clock_values.add(cc(234))
+ ev1.clock_snapshots.add(cc(234))
ev2 = self._ec()
self._fill_ev(ev2)
- ev2.clock_values.add(cc(234))
+ ev2.clock_snapshots.add(cc(234))
self.assertNotEqual(ev1, ev2)
def test_ne_context_field(self):
ev1 = self._ec()
self._fill_ev(ev1)
ev1.context_field['ant'] = -3
- ev1.clock_values.add(cc(234))
+ ev1.clock_snapshots.add(cc(234))
ev2 = self._ec()
self._fill_ev(ev2)
- ev2.clock_values.add(cc(234))
+ ev2.clock_snapshots.add(cc(234))
self.assertNotEqual(ev1, ev2)
def test_ne_payload_field(self):
ev1 = self._ec()
self._fill_ev(ev1)
ev1.payload_field['mosquito'] = 98
- ev1.clock_values.add(cc(234))
+ ev1.clock_snapshots.add(cc(234))
ev2 = self._ec()
self._fill_ev(ev2)
- ev2.clock_values.add(cc(234))
+ ev2.clock_snapshots.add(cc(234))
self.assertNotEqual(ev1, ev2)
def test_eq_invalid(self):
tc.add_clock_class(cc)
ev = self._ec()
self._fill_ev(ev)
- ev.clock_values.add(cc(234))
+ ev.clock_snapshots.add(cc(234))
cpy = func(ev)
self.assertIsNot(ev, cpy)
self.assertNotEqual(ev.addr, cpy.addr)
self._packet = self._stream.create_packet()
self._packet.header_field['hello'] = 19487
self._event = self._ec()
- self._event.clock_values.add(self._clock_class(1772))
+ self._event.clock_snapshots.add(self._clock_class(1772))
self._event.payload_field['my_int'] = 23
self._event.packet = self._packet
def test_create_with_cc_prio_map(self):
msg = bt2.InactivityMessage(self._cc_prio_map)
- msg.clock_values.add(self._cc1(123))
- msg.clock_values.add(self._cc2(19487))
+ msg.clock_snapshots.add(self._cc1(123))
+ msg.clock_snapshots.add(self._cc2(19487))
self.assertEqual(len(msg.clock_class_priority_map), 2)
self.assertEqual(msg.clock_class_priority_map, self._cc_prio_map)
- self.assertEqual(msg.clock_values[self._cc1], 123)
- self.assertEqual(msg.clock_values[self._cc2], 19487)
+ self.assertEqual(msg.clock_snapshots[self._cc1], 123)
+ self.assertEqual(msg.clock_snapshots[self._cc2], 19487)
def test_eq(self):
msg = bt2.InactivityMessage(self._cc_prio_map)
- msg.clock_values.add(self._cc1(123))
- msg.clock_values.add(self._cc2(19487))
+ msg.clock_snapshots.add(self._cc1(123))
+ msg.clock_snapshots.add(self._cc2(19487))
cc_prio_map_copy = copy.copy(self._cc_prio_map)
msg2 = bt2.InactivityMessage(cc_prio_map_copy)
- msg2.clock_values.add(self._cc1(123))
- msg2.clock_values.add(self._cc2(19487))
+ msg2.clock_snapshots.add(self._cc1(123))
+ msg2.clock_snapshots.add(self._cc2(19487))
self.assertEqual(msg, msg2)
def test_ne_cc_prio_map(self):
msg = bt2.InactivityMessage(self._cc_prio_map)
- msg.clock_values.add(self._cc1(123))
- msg.clock_values.add(self._cc2(19487))
+ msg.clock_snapshots.add(self._cc1(123))
+ msg.clock_snapshots.add(self._cc2(19487))
cc_prio_map_copy = copy.copy(self._cc_prio_map)
cc_prio_map_copy[self._cc2] = 23
msg2 = bt2.InactivityMessage(cc_prio_map_copy)
self.assertNotEqual(msg, msg2)
- def test_ne_clock_value(self):
+ def test_ne_clock_snapshot(self):
msg = bt2.InactivityMessage(self._cc_prio_map)
- msg.clock_values.add(self._cc1(123))
- msg.clock_values.add(self._cc2(19487))
+ msg.clock_snapshots.add(self._cc1(123))
+ msg.clock_snapshots.add(self._cc2(19487))
msg2 = bt2.InactivityMessage(self._cc_prio_map)
- msg.clock_values.add(self._cc1(123))
- msg.clock_values.add(self._cc2(1847))
+ msg.clock_snapshots.add(self._cc1(123))
+ msg.clock_snapshots.add(self._cc2(1847))
self.assertNotEqual(msg, msg2)
def test_eq_invalid(self):
def test_copy(self):
msg = bt2.InactivityMessage(self._cc_prio_map)
- msg.clock_values.add(self._cc1(123))
- msg.clock_values.add(self._cc2(19487))
+ msg.clock_snapshots.add(self._cc1(123))
+ msg.clock_snapshots.add(self._cc2(19487))
msg_copy = copy.copy(msg)
self.assertEqual(msg, msg_copy)
self.assertNotEqual(msg.addr, msg_copy.addr)
self.assertEqual(msg.clock_class_priority_map.addr,
msg_copy.clock_class_priority_map.addr)
- self.assertEqual(msg_copy.clock_values[self._cc1], 123)
- self.assertEqual(msg_copy.clock_values[self._cc2], 19487)
+ self.assertEqual(msg_copy.clock_snapshots[self._cc1], 123)
+ self.assertEqual(msg_copy.clock_snapshots[self._cc2], 19487)
def test_deepcopy(self):
msg = bt2.InactivityMessage(self._cc_prio_map)
- msg.clock_values.add(self._cc1(123))
- msg.clock_values.add(self._cc2(19487))
+ msg.clock_snapshots.add(self._cc1(123))
+ msg.clock_snapshots.add(self._cc2(19487))
msg_copy = copy.deepcopy(msg)
self.assertEqual(msg, msg_copy)
self.assertNotEqual(msg.addr, msg_copy.addr)
msg_copy.clock_class_priority_map)
self.assertNotEqual(list(msg.clock_class_priority_map)[0].addr,
list(msg_copy.clock_class_priority_map)[0].addr)
- self.assertIsNone(msg_copy.clock_values[self._cc1])
- self.assertIsNone(msg_copy.clock_values[self._cc2])
- self.assertEqual(msg_copy.clock_values[list(msg_copy.clock_class_priority_map)[0]], 123)
- self.assertEqual(msg_copy.clock_values[list(msg_copy.clock_class_priority_map)[1]], 19487)
+ self.assertIsNone(msg_copy.clock_snapshots[self._cc1])
+ self.assertIsNone(msg_copy.clock_snapshots[self._cc2])
+ self.assertEqual(msg_copy.clock_snapshots[list(msg_copy.clock_class_priority_map)[0]], 123)
+ self.assertEqual(msg_copy.clock_snapshots[list(msg_copy.clock_class_priority_map)[1]], 19487)
@unittest.skip("this is broken")
def test_stream(self):
self.assertEqual(self._get_msg().stream.addr, self._stream.addr)
- def test_beginning_clock_value(self):
+ def test_beginning_clock_snapshot(self):
msg = self._get_msg()
- beginning_clock_value = msg.beginning_clock_value
- self.assertEqual(beginning_clock_value.clock_class, self._clock_class)
- self.assertEqual(beginning_clock_value, 6)
+ beginning_clock_snapshot = msg.beginning_clock_snapshot
+ self.assertEqual(beginning_clock_snapshot.clock_class, self._clock_class)
+ self.assertEqual(beginning_clock_snapshot, 6)
- def test_end_clock_value(self):
+ def test_end_clock_snapshot(self):
msg = self._get_msg()
- end_clock_value = msg.end_clock_value
- self.assertEqual(end_clock_value.clock_class, self._clock_class)
- self.assertEqual(end_clock_value, 7)
+ end_clock_snapshot = msg.end_clock_snapshot
+ self.assertEqual(end_clock_snapshot.clock_class, self._clock_class)
+ self.assertEqual(end_clock_snapshot, 7)
def test_eq(self):
msg1 = self._get_msg()
def test_stream(self):
self.assertEqual(self._get_msg().stream.addr, self._stream.addr)
- def test_beginning_clock_value(self):
+ def test_beginning_clock_snapshot(self):
msg = self._get_msg()
- beginning_clock_value = msg.beginning_clock_value
- self.assertEqual(beginning_clock_value.clock_class, self._clock_class)
- self.assertEqual(beginning_clock_value, 6)
+ beginning_clock_snapshot = msg.beginning_clock_snapshot
+ self.assertEqual(beginning_clock_snapshot.clock_class, self._clock_class)
+ self.assertEqual(beginning_clock_snapshot, 6)
- def test_end_clock_value(self):
+ def test_end_clock_snapshot(self):
msg = self._get_msg()
- end_clock_value = msg.end_clock_value
- self.assertEqual(end_clock_value.clock_class, self._clock_class)
- self.assertEqual(end_clock_value, 10)
+ end_clock_snapshot = msg.end_clock_snapshot
+ self.assertEqual(end_clock_snapshot.clock_class, self._clock_class)
+ self.assertEqual(end_clock_snapshot, 10)
def test_eq(self):
msg1 = self._get_msg()