-
- @property
- def _clock_classes(self):
- stream_class = self.event_class.stream_class
-
- if stream_class is None:
- return []
-
- trace = stream_class.trace
-
- if trace is None:
- return []
-
- clock_classes = []
-
- for clock_class in trace.clock_classes.values():
- clock_classes.append(clock_class)
-
- return clock_classes
-
- @property
- def _clock_class_ptrs(self):
- return [cc._ptr for cc in self._clock_classes]
-
- def __eq__(self, other):
- if type(other) is not type(self):
- return False
-
- if self.addr == other.addr:
- return True
-
- self_clock_snapshots = {}
- other_clock_snapshots = {}
-
- for clock_class_ptr in self._clock_class_ptrs:
- self_clock_snapshots[int(clock_class_ptr)] = self._get_clock_snapshot_cycles(clock_class_ptr)
-
- for clock_class_ptr in other._clock_class_ptrs:
- other_clock_snapshots[int(clock_class_ptr)] = self._get_clock_snapshot_cycles(clock_class_ptr)
-
- self_props = (
- self.header_field,
- self.stream_event_context_field,
- self.context_field,
- self.payload_field,
- self_clock_snapshots,
- )
- other_props = (
- other.header_field,
- other.stream_event_context_field,
- other.context_field,
- other.payload_field,
- other_clock_snapshots,
- )
- return self_props == other_props
-
- def _copy(self, copy_func):
- cpy = self.event_class()
-
- # copy fields
- cpy.header_field = copy_func(self.header_field)
- cpy.stream_event_context_field = copy_func(self.stream_event_context_field)
- cpy.context_field = copy_func(self.context_field)
- cpy.payload_field = copy_func(self.payload_field)
-
- # Copy known clock value references. It's not necessary to copy
- # clock class or clock value objects because once a clock value
- # is created from a clock class, the clock class is frozen.
- # Thus even if we copy the clock class, the user cannot modify
- # it, therefore it's useless to copy it.
- for clock_class in self._clock_classes:
- clock_snapshot = self.clock_snapshots[clock_class]
-
- if clock_snapshot is not None:
- cpy.clock_snapshots.add(clock_snapshot)
-
- return cpy
-
- def __copy__(self):
- return self._copy(copy.copy)
-
- def __deepcopy__(self, memo):
- cpy = self._copy(copy.deepcopy)
- memo[id(self)] = cpy
- return cpy