# The MIT License (MIT)
#
-# Copyright (c) 2015-2016 Philippe Proulx <pproulx@efficios.com>
+# Copyright (c) 2015-2020 Philippe Proulx <pproulx@efficios.com>
#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
+# Permission is hereby granted, free of charge, to any person obtaining
+# a copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish,
+# distribute, sublicense, and/or sell copies of the Software, and to
+# permit persons to whom the Software is furnished to do so, subject to
+# the following conditions:
#
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
+# The above copyright notice and this permission notice shall be
+# included in all copies or substantial portions of the Software.
#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-# THE SOFTWARE.
-
-from barectf import metadata
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
+# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+import barectf.version as barectf_version
+from typing import Optional, Any, FrozenSet, Mapping, Iterator, Set, Union
+import typing
+from barectf.typing import Count, Alignment, _OptStr, Id
+import collections.abc
import collections
import datetime
-import barectf
import enum
-import yaml
-import uuid
-import copy
-import re
-import os
+import uuid as uuidp
-class ConfigError(RuntimeError):
- def __init__(self, msg, prev=None):
- super().__init__(msg)
- self._prev = prev
-
- @property
- def prev(self):
- return self._prev
-
-
-class Config:
- def __init__(self, version, prefix, metadata):
- self.prefix = prefix
- self.version = version
- self.metadata = metadata
+@enum.unique
+class ByteOrder(enum.Enum):
+ LITTLE_ENDIAN = 'le'
+ BIG_ENDIAN = 'be'
- def _validate_metadata(self, meta):
- try:
- validator = _MetadataTypesHistologyValidator()
- validator.validate(meta)
- validator = _MetadataDynamicTypesValidator()
- validator.validate(meta)
- validator = _MetadataSpecialFieldsValidator()
- validator.validate(meta)
- except Exception as e:
- raise ConfigError('metadata error', e)
- try:
- validator = _BarectfMetadataValidator()
- validator.validate(meta)
- except Exception as e:
- raise ConfigError('barectf metadata error', e)
+class _FieldType:
+ @property
+ def alignment(self) -> Alignment:
+ raise NotImplementedError
- def _augment_metadata_env(self, meta):
- env = meta.env
- env['domain'] = 'bare'
- env['tracer_name'] = 'barectf'
- version_tuple = barectf.get_version_tuple()
- env['tracer_major'] = version_tuple[0]
- env['tracer_minor'] = version_tuple[1]
- env['tracer_patch'] = version_tuple[2]
- env['barectf_gen_date'] = str(datetime.datetime.now().isoformat())
+class _BitArrayFieldType(_FieldType):
+ def __init__(self, size: Count, byte_order: Optional[ByteOrder] = None,
+ alignment: Alignment = Alignment(1)):
+ self._size = size
+ self._byte_order = byte_order
+ self._alignment = alignment
@property
- def version(self):
- return self._version
-
- @version.setter
- def version(self, value):
- self._version = value
+ def size(self) -> Count:
+ return self._size
@property
- def metadata(self):
- return self._metadata
-
- @metadata.setter
- def metadata(self, value):
- self._validate_metadata(value)
- self._augment_metadata_env(value)
- self._metadata = value
+ def byte_order(self) -> Optional[ByteOrder]:
+ return self._byte_order
@property
- def prefix(self):
- return self._prefix
-
- @prefix.setter
- def prefix(self, value):
- if not is_valid_identifier(value):
- raise ConfigError('prefix must be a valid C identifier')
-
- self._prefix = value
-
-
-def _is_assoc_array_prop(node):
- return isinstance(node, dict)
-
-
-def _is_array_prop(node):
- return isinstance(node, list)
-
-
-def _is_int_prop(node):
- return type(node) is int
-
-
-def _is_str_prop(node):
- return type(node) is str
-
-
-def _is_bool_prop(node):
- return type(node) is bool
-
-
-def _is_valid_alignment(align):
- return ((align & (align - 1)) == 0) and align > 0
-
-
-def _byte_order_str_to_bo(bo_str):
- bo_str = bo_str.lower()
+ def alignment(self) -> Alignment:
+ return self._alignment
- if bo_str == 'le':
- return metadata.ByteOrder.LE
- elif bo_str == 'be':
- return metadata.ByteOrder.BE
+class DisplayBase(enum.Enum):
+ BINARY = 2
+ OCTAL = 8
+ DECIMAL = 10
+ HEXADECIMAL = 16
-def _encoding_str_to_encoding(encoding_str):
- encoding_str = encoding_str.lower()
- if encoding_str == 'utf-8' or encoding_str == 'utf8':
- return metadata.Encoding.UTF8
- elif encoding_str == 'ascii':
- return metadata.Encoding.ASCII
- elif encoding_str == 'none':
- return metadata.Encoding.NONE
+class _IntegerFieldType(_BitArrayFieldType):
+ def __init__(self, size: Count, byte_order: Optional[ByteOrder] = None,
+ alignment: Optional[Alignment] = None,
+ preferred_display_base: DisplayBase = DisplayBase.DECIMAL):
+ effective_alignment = 1
+ if alignment is None and size % 8 == 0:
+ effective_alignment = 8
-_re_iden = re.compile(r'^[a-zA-Z][a-zA-Z0-9_]*$')
-_ctf_keywords = set([
- 'align',
- 'callsite',
- 'clock',
- 'enum',
- 'env',
- 'event',
- 'floating_point',
- 'integer',
- 'stream',
- 'string',
- 'struct',
- 'trace',
- 'typealias',
- 'typedef',
- 'variant',
-])
+ super().__init__(size, byte_order, Alignment(effective_alignment))
+ self._preferred_display_base = preferred_display_base
+ @property
+ def preferred_display_base(self) -> DisplayBase:
+ return self._preferred_display_base
-def is_valid_identifier(iden):
- if not _re_iden.match(iden):
- return False
-
- if _re_iden in _ctf_keywords:
- return False
-
- return True
-
-
-def _get_first_unknown_prop(node, known_props):
- for prop_name in node:
- if prop_name in known_props:
- continue
-
- return prop_name
-
-
-# This validator validates the configured metadata for barectf specific
-# needs.
-#
-# barectf needs:
-#
-# * all header/contexts are at least byte-aligned
-# * all integer and floating point number sizes to be <= 64
-# * no inner structures, arrays, or variants
-class _BarectfMetadataValidator:
- def __init__(self):
- self._type_to_validate_type_func = {
- metadata.Integer: self._validate_int_type,
- metadata.FloatingPoint: self._validate_float_type,
- metadata.Enum: self._validate_enum_type,
- metadata.String: self._validate_string_type,
- metadata.Struct: self._validate_struct_type,
- metadata.Array: self._validate_array_type,
- metadata.Variant: self._validate_variant_type,
- }
-
- def _validate_int_type(self, t, entity_root):
- if t.size > 64:
- raise ConfigError('integer type\'s size must be lesser than or equal to 64 bits')
-
- def _validate_float_type(self, t, entity_root):
- if t.size > 64:
- raise ConfigError('floating point number type\'s size must be lesser than or equal to 64 bits')
-
- def _validate_enum_type(self, t, entity_root):
- if t.value_type.size > 64:
- raise ConfigError('enumeration type\'s integer type\'s size must be lesser than or equal to 64 bits')
-
- def _validate_string_type(self, t, entity_root):
- pass
-
- def _validate_struct_type(self, t, entity_root):
- if not entity_root:
- raise ConfigError('inner structure types are not supported as of this version')
-
- for field_name, field_type in t.fields.items():
- if entity_root and self._cur_entity is _Entity.TRACE_PACKET_HEADER:
- if field_name == 'uuid':
- # allow
- continue
-
- try:
- self._validate_type(field_type, False)
- except Exception as e:
- raise ConfigError('in structure type\'s field "{}"'.format(field_name), e)
-
- def _validate_array_type(self, t, entity_root):
- raise ConfigError('array types are not supported as of this version')
-
- def _validate_variant_type(self, t, entity_root):
- raise ConfigError('variant types are not supported as of this version')
-
- def _validate_type(self, t, entity_root):
- self._type_to_validate_type_func[type(t)](t, entity_root)
-
- def _validate_entity(self, t):
- if t is None:
- return
-
- # make sure entity is byte-aligned
- if t.align < 8:
- raise ConfigError('type\'s alignment must be at least byte-aligned')
-
- # make sure entity is a structure
- if type(t) is not metadata.Struct:
- raise ConfigError('expecting a structure type')
- # validate types
- self._validate_type(t, True)
+class UnsignedIntegerFieldType(_IntegerFieldType):
+ def __init__(self, *args):
+ super().__init__(*args)
+ self._mapped_clk_type_name = None
- def _validate_entities_and_names(self, meta):
- self._cur_entity = _Entity.TRACE_PACKET_HEADER
- try:
- self._validate_entity(meta.trace.packet_header_type)
- except Exception as e:
- raise ConfigError('invalid trace packet header type', e)
+class SignedIntegerFieldType(_IntegerFieldType):
+ pass
- for stream_name, stream in meta.streams.items():
- if not is_valid_identifier(stream_name):
- raise ConfigError('stream name "{}" is not a valid C identifier'.format(stream_name))
- self._cur_entity = _Entity.STREAM_PACKET_CONTEXT
+class EnumerationFieldTypeMappingRange:
+ def __init__(self, lower: int, upper: int):
+ self._lower = lower
+ self._upper = upper
- try:
- self._validate_entity(stream.packet_context_type)
- except Exception as e:
- raise ConfigError('invalid packet context type in stream "{}"'.format(stream_name), e)
+ @property
+ def lower(self) -> int:
+ return self._lower
- self._cur_entity = _Entity.STREAM_EVENT_HEADER
+ @property
+ def upper(self) -> int:
+ return self._upper
- try:
- self._validate_entity(stream.event_header_type)
- except Exception as e:
- raise ConfigError('invalid event header type in stream "{}"'.format(stream_name), e)
+ def __eq__(self, other: Any) -> bool:
+ if type(other) is not type(self):
+ return False
- self._cur_entity = _Entity.STREAM_EVENT_CONTEXT
+ return (self._lower, self._upper) == (other._lower, other._upper)
- try:
- self._validate_entity(stream.event_context_type)
- except Exception as e:
- raise ConfigError('invalid event context type in stream "{}"'.format(stream_name), e)
+ def __hash__(self) -> int:
+ return hash((self._lower, self._upper))
- try:
- for ev_name, ev in stream.events.items():
- if not is_valid_identifier(ev_name):
- raise ConfigError('event name "{}" is not a valid C identifier'.format(ev_name))
+ def contains(self, value: int) -> bool:
+ return self._lower <= value <= self._upper
- self._cur_entity = _Entity.EVENT_CONTEXT
- try:
- self._validate_entity(ev.context_type)
- except Exception as e:
- raise ConfigError('invalid context type in event "{}"'.format(ev_name), e)
+class EnumerationFieldTypeMapping:
+ def __init__(self, ranges: Set[EnumerationFieldTypeMappingRange]):
+ self._ranges = frozenset(ranges)
- self._cur_entity = _Entity.EVENT_PAYLOAD
+ @property
+ def ranges(self) -> FrozenSet[EnumerationFieldTypeMappingRange]:
+ return self._ranges
- if ev.payload_type is None:
- raise ConfigError('missing payload type in event "{}"'.format(ev_name), e)
+ def ranges_contain_value(self, value: int) -> bool:
+ return any([rg.contains(value) for rg in self._ranges])
- try:
- self._validate_entity(ev.payload_type)
- except Exception as e:
- raise ConfigError('invalid payload type in event "{}"'.format(ev_name), e)
- if not ev.payload_type.fields:
- raise ConfigError('empty payload type in event "{}"'.format(ev_name), e)
- except Exception as e:
- raise ConfigError('invalid stream "{}"'.format(stream_name), e)
+_EnumFtMappings = Mapping[str, EnumerationFieldTypeMapping]
- def validate(self, meta):
- self._validate_entities_and_names(meta)
+class EnumerationFieldTypeMappings(collections.abc.Mapping):
+ def __init__(self, mappings: _EnumFtMappings):
+ self._mappings = {label: mapping for label, mapping in mappings.items()}
-# This validator validates special fields of trace, stream, and event
-# types. For example, if checks that the "stream_id" field exists in the
-# trace packet header if there's more than one stream, and much more.
-class _MetadataSpecialFieldsValidator:
- def _validate_trace_packet_header_type(self, t):
- # needs "stream_id" field?
- if len(self._meta.streams) > 1:
- # yes
- if t is None:
- raise ConfigError('need "stream_id" field in trace packet header type, but trace packet header type is missing')
+ def __getitem__(self, key: str) -> EnumerationFieldTypeMapping:
+ return self._mappings[key]
- if type(t) is not metadata.Struct:
- raise ConfigError('need "stream_id" field in trace packet header type, but trace packet header type is not a structure type')
+ def __iter__(self) -> Iterator[str]:
+ return iter(self._mappings)
- if 'stream_id' not in t.fields:
- raise ConfigError('need "stream_id" field in trace packet header type')
+ def __len__(self) -> int:
+ return len(self._mappings)
- # validate "magic" and "stream_id" types
- if type(t) is not metadata.Struct:
- return
- for i, (field_name, field_type) in enumerate(t.fields.items()):
- if field_name == 'magic':
- if type(field_type) is not metadata.Integer:
- raise ConfigError('"magic" field in trace packet header type must be an integer type')
+class _EnumerationFieldType(_IntegerFieldType):
+ def __init__(self, size: Count, byte_order: Optional[ByteOrder] = None,
+ alignment: Optional[Alignment] = None,
+ preferred_display_base: DisplayBase = DisplayBase.DECIMAL,
+ mappings: Optional[_EnumFtMappings] = None):
+ super().__init__(size, byte_order, alignment, preferred_display_base)
+ self._mappings = EnumerationFieldTypeMappings({})
- if field_type.signed or field_type.size != 32:
- raise ConfigError('"magic" field in trace packet header type must be a 32-bit unsigned integer type')
+ if mappings is not None:
+ self._mappings = EnumerationFieldTypeMappings(mappings)
- if i != 0:
- raise ConfigError('"magic" field must be the first trace packet header type\'s field')
- elif field_name == 'stream_id':
- if type(field_type) is not metadata.Integer:
- raise ConfigError('"stream_id" field in trace packet header type must be an integer type')
+ @property
+ def mappings(self) -> EnumerationFieldTypeMappings:
+ return self._mappings
- if field_type.signed:
- raise ConfigError('"stream_id" field in trace packet header type must be an unsigned integer type')
+ def labels_for_value(self, value: int) -> Set[str]:
+ labels = set()
- # "id" size can fit all event IDs
- if len(self._meta.streams) > (1 << field_type.size):
- raise ConfigError('"stream_id" field\' size in trace packet header type is too small for the number of trace streams')
- elif field_name == 'uuid':
- if self._meta.trace.uuid is None:
- raise ConfigError('"uuid" field in trace packet header type specified, but no trace UUID provided')
+ for label, mapping in self._mappings.items():
+ if mapping.ranges_contain_value(value):
+ labels.add(label)
- if type(field_type) is not metadata.Array:
- raise ConfigError('"uuid" field in trace packet header type must be an array')
+ return labels
- if field_type.length != 16:
- raise ConfigError('"uuid" field in trace packet header type must be an array of 16 bytes')
- element_type = field_type.element_type
+class UnsignedEnumerationFieldType(_EnumerationFieldType, UnsignedIntegerFieldType):
+ pass
- if type(element_type) is not metadata.Integer:
- raise ConfigError('"uuid" field in trace packet header type must be an array of 16 unsigned bytes')
- if element_type.size != 8:
- raise ConfigError('"uuid" field in trace packet header type must be an array of 16 unsigned bytes')
+class SignedEnumerationFieldType(_EnumerationFieldType, SignedIntegerFieldType):
+ pass
- if element_type.signed:
- raise ConfigError('"uuid" field in trace packet header type must be an array of 16 unsigned bytes')
- if element_type.align != 8:
- raise ConfigError('"uuid" field in trace packet header type must be an array of 16 unsigned, byte-aligned bytes')
+class RealFieldType(_BitArrayFieldType):
+ pass
- def _validate_trace(self, meta):
- self._validate_trace_packet_header_type(meta.trace.packet_header_type)
- def _validate_stream_packet_context(self, stream):
- t = stream.packet_context_type
+class StringFieldType(_FieldType):
+ @property
+ def alignment(self) -> Alignment:
+ return Alignment(8)
- if type(t) is None:
- raise ConfigError('missing "packet-context-type" property in stream object')
- if type(t) is not metadata.Struct:
- raise ConfigError('"packet-context-type": expecting a structure type')
+class _ArrayFieldType(_FieldType):
+ def __init__(self, element_field_type: _FieldType):
+ self._element_field_type = element_field_type
- # "timestamp_begin", if exists, is an unsigned integer type,
- # mapped to a clock
- ts_begin = None
+ @property
+ def element_field_type(self) -> _FieldType:
+ return self._element_field_type
- if 'timestamp_begin' in t.fields:
- ts_begin = t.fields['timestamp_begin']
+ @property
+ def alignment(self) -> Alignment:
+ return self._element_field_type.alignment
- if type(ts_begin) is not metadata.Integer:
- raise ConfigError('"timestamp_begin" field in stream packet context type must be an integer type')
- if ts_begin.signed:
- raise ConfigError('"timestamp_begin" field in stream packet context type must be an unsigned integer type')
+class StaticArrayFieldType(_ArrayFieldType):
+ def __init__(self, length: Count, element_field_type: _FieldType):
+ super().__init__(element_field_type)
+ self._length = length
- if not ts_begin.property_mappings:
- raise ConfigError('"timestamp_begin" field in stream packet context type must be mapped to a clock')
+ @property
+ def length(self) -> Count:
+ return self._length
- # "timestamp_end", if exists, is an unsigned integer type,
- # mapped to a clock
- ts_end = None
- if 'timestamp_end' in t.fields:
- ts_end = t.fields['timestamp_end']
+class StructureFieldTypeMember:
+ def __init__(self, field_type: _FieldType):
+ self._field_type = field_type
- if type(ts_end) is not metadata.Integer:
- raise ConfigError('"timestamp_end" field in stream packet context type must be an integer type')
+ @property
+ def field_type(self) -> _FieldType:
+ return self._field_type
- if ts_end.signed:
- raise ConfigError('"timestamp_end" field in stream packet context type must be an unsigned integer type')
- if not ts_end.property_mappings:
- raise ConfigError('"timestamp_end" field in stream packet context type must be mapped to a clock')
+_StructFtMembers = Mapping[str, StructureFieldTypeMember]
- # "timestamp_begin" and "timestamp_end" exist together
- if (('timestamp_begin' in t.fields) ^ ('timestamp_end' in t.fields)):
- raise ConfigError('"timestamp_begin" and "timestamp_end" fields must be defined together in stream packet context type')
- # "timestamp_begin" and "timestamp_end" are mapped to the same clock
- if ts_begin is not None and ts_end is not None:
- if ts_begin.property_mappings[0].object.name != ts_end.property_mappings[0].object.name:
- raise ConfigError('"timestamp_begin" and "timestamp_end" fields must be mapped to the same clock object in stream packet context type')
+class StructureFieldTypeMembers(collections.abc.Mapping):
+ def __init__(self, members: _StructFtMembers):
+ self._members = collections.OrderedDict()
- # "events_discarded", if exists, is an unsigned integer type
- if 'events_discarded' in t.fields:
- events_discarded = t.fields['events_discarded']
+ for name, member in members.items():
+ assert type(member) is StructureFieldTypeMember
+ self._members[name] = member
- if type(events_discarded) is not metadata.Integer:
- raise ConfigError('"events_discarded" field in stream packet context type must be an integer type')
+ def __getitem__(self, key: str) -> StructureFieldTypeMember:
+ return self._members[key]
- if events_discarded.signed:
- raise ConfigError('"events_discarded" field in stream packet context type must be an unsigned integer type')
+ def __iter__(self) -> Iterator[str]:
+ return iter(self._members)
- # "packet_size" and "content_size" must exist
- if 'packet_size' not in t.fields:
- raise ConfigError('missing "packet_size" field in stream packet context type')
+ def __len__(self) -> int:
+ return len(self._members)
- packet_size = t.fields['packet_size']
- # "content_size" and "content_size" must exist
- if 'content_size' not in t.fields:
- raise ConfigError('missing "content_size" field in stream packet context type')
+class StructureFieldType(_FieldType):
+ def __init__(self, minimum_alignment: Alignment = Alignment(1),
+ members: Optional[_StructFtMembers] = None):
+ self._minimum_alignment = minimum_alignment
+ self._members = StructureFieldTypeMembers({})
- content_size = t.fields['content_size']
+ if members is not None:
+ self._members = StructureFieldTypeMembers(members)
- # "packet_size" is an unsigned integer type
- if type(packet_size) is not metadata.Integer:
- raise ConfigError('"packet_size" field in stream packet context type must be an integer type')
+ self._set_alignment()
- if packet_size.signed:
- raise ConfigError('"packet_size" field in stream packet context type must be an unsigned integer type')
+ def _set_alignment(self):
+ self._alignment: Alignment = self._minimum_alignment
- # "content_size" is an unsigned integer type
- if type(content_size) is not metadata.Integer:
- raise ConfigError('"content_size" field in stream packet context type must be an integer type')
+ for member in self._members.values():
+ if member.field_type.alignment > self._alignment:
+ self._alignment = member.field_type.alignment
- if content_size.signed:
- raise ConfigError('"content_size" field in stream packet context type must be an unsigned integer type')
+ @property
+ def minimum_alignment(self) -> Alignment:
+ return self._minimum_alignment
- # "packet_size" size should be greater than or equal to "content_size" size
- if content_size.size > packet_size.size:
- raise ConfigError('"content_size" field size must be lesser than or equal to "packet_size" field size')
+ @property
+ def alignment(self) -> Alignment:
+ return self._alignment
- def _validate_stream_event_header(self, stream):
- t = stream.event_header_type
+ @property
+ def members(self) -> StructureFieldTypeMembers:
+ return self._members
- # needs "id" field?
- if len(stream.events) > 1:
- # yes
- if t is None:
- raise ConfigError('need "id" field in stream event header type, but stream event header type is missing')
- if type(t) is not metadata.Struct:
- raise ConfigError('need "id" field in stream event header type, but stream event header type is not a structure type')
+class _UniqueByName:
+ _name: str
- if 'id' not in t.fields:
- raise ConfigError('need "id" field in stream event header type')
+ def __eq__(self, other: Any) -> bool:
+ if type(other) is not type(self):
+ return False
- # validate "id" and "timestamp" types
- if type(t) is not metadata.Struct:
- return
+ return self._name == other._name
- # "timestamp", if exists, is an unsigned integer type,
- # mapped to a clock
- if 'timestamp' in t.fields:
- ts = t.fields['timestamp']
+ def __lt__(self, other: '_UniqueByName'):
+ assert type(self) is type(other)
+ return self._name < other._name
- if type(ts) is not metadata.Integer:
- raise ConfigError('"timestamp" field in stream event header type must be an integer type')
+ def __hash__(self) -> int:
+ return hash(self._name)
- if ts.signed:
- raise ConfigError('"timestamp" field in stream event header type must be an unsigned integer type')
- if not ts.property_mappings:
- raise ConfigError('"timestamp" field in stream event header type must be mapped to a clock')
+_OptFt = Optional[_FieldType]
+_OptStructFt = Optional[StructureFieldType]
+LogLevel = typing.NewType('LogLevel', int)
- if 'id' in t.fields:
- eid = t.fields['id']
- # "id" is an unsigned integer type
- if type(eid) is not metadata.Integer:
- raise ConfigError('"id" field in stream event header type must be an integer type')
+class EventType(_UniqueByName):
+ def __init__(self, name: str, log_level: Optional[LogLevel] = None,
+ specific_context_field_type: _OptStructFt = None, payload_field_type: _OptStructFt = None):
+ self._id: Optional[Id] = None
+ self._name = name
+ self._log_level = log_level
+ self._specific_context_field_type = specific_context_field_type
+ self._payload_field_type = payload_field_type
- if eid.signed:
- raise ConfigError('"id" field in stream event header type must be an unsigned integer type')
+ @property
+ def id(self) -> Optional[Id]:
+ return self._id
- # "id" size can fit all event IDs
- if len(stream.events) > (1 << eid.size):
- raise ConfigError('"id" field\' size in stream event header type is too small for the number of stream events')
+ @property
+ def name(self) -> str:
+ return self._name
- def _validate_stream(self, stream):
- self._validate_stream_packet_context(stream)
- self._validate_stream_event_header(stream)
+ @property
+ def log_level(self) -> Optional[LogLevel]:
+ return self._log_level
- def validate(self, meta):
- self._meta = meta
- self._validate_trace(meta)
+ @property
+ def specific_context_field_type(self) -> _OptStructFt:
+ return self._specific_context_field_type
- for stream in meta.streams.values():
- try:
- self._validate_stream(stream)
- except Exception as e:
- raise ConfigError('invalid stream "{}"'.format(stream.name), e)
+ @property
+ def payload_field_type(self) -> _OptStructFt:
+ return self._payload_field_type
-class _MetadataDynamicTypesValidatorStackEntry:
- def __init__(self, base_t):
- self._base_t = base_t
- self._index = 0
+class ClockTypeOffset:
+ def __init__(self, seconds: int = 0, cycles: Count = Count(0)):
+ self._seconds = seconds
+ self._cycles = cycles
@property
- def index(self):
- return self._index
-
- @index.setter
- def index(self, value):
- self._index = value
+ def seconds(self) -> int:
+ return self._seconds
@property
- def base_t(self):
- return self._base_t
+ def cycles(self) -> Count:
+ return self._cycles
- @base_t.setter
- def base_t(self, value):
- self._base_t = value
-
-
-# Entities. Order of values is important here.
-@enum.unique
-class _Entity(enum.IntEnum):
- TRACE_PACKET_HEADER = 0
- STREAM_PACKET_CONTEXT = 1
- STREAM_EVENT_HEADER = 2
- STREAM_EVENT_CONTEXT = 3
- EVENT_CONTEXT = 4
- EVENT_PAYLOAD = 5
-
-
-# This validator validates dynamic metadata types, that is, it ensures
-# variable-length array lengths and variant tags actually point to
-# something that exists. It also checks that variable-length array
-# lengths point to integer types and variant tags to enumeration types.
-class _MetadataDynamicTypesValidator:
- def __init__(self):
- self._type_to_visit_type_func = {
- metadata.Integer: None,
- metadata.FloatingPoint: None,
- metadata.Enum: None,
- metadata.String: None,
- metadata.Struct: self._visit_struct_type,
- metadata.Array: self._visit_array_type,
- metadata.Variant: self._visit_variant_type,
- }
-
- self._cur_trace = None
- self._cur_stream = None
- self._cur_event = None
-
- def _lookup_path_from_base(self, path, parts, base, start_index,
- base_is_current, from_t):
- index = start_index
- cur_t = base
- found_path = []
-
- while index < len(parts):
- part = parts[index]
- next_t = None
-
- if type(cur_t) is metadata.Struct:
- enumerated_items = enumerate(cur_t.fields.items())
-
- # lookup each field
- for i, (field_name, field_type) in enumerated_items:
- if field_name == part:
- next_t = field_type
- found_path.append((i, field_type))
-
- if next_t is None:
- raise ConfigError('invalid path "{}": cannot find field "{}" in structure type'.format(path, part))
- elif type(cur_t) is metadata.Variant:
- enumerated_items = enumerate(cur_t.types.items())
-
- # lookup each type
- for i, (type_name, type_type) in enumerated_items:
- if type_name == part:
- next_t = type_type
- found_path.append((i, type_type))
-
- if next_t is None:
- raise ConfigError('invalid path "{}": cannot find type "{}" in variant type'.format(path, part))
- else:
- raise ConfigError('invalid path "{}": requesting "{}" in a non-variant, non-structure type'.format(path, part))
-
- cur_t = next_t
- index += 1
-
- # make sure that the pointed type is not the pointing type
- if from_t is cur_t:
- raise ConfigError('invalid path "{}": pointing to self'.format(path))
-
- # if we're here, we found the type; however, it could be located
- # _after_ the variant/VLA looking for it, if the pointing
- # and pointed types are in the same entity, so compare the
- # current stack entries indexes to our index path in that case
- if not base_is_current:
- return cur_t
-
- for index, entry in enumerate(self._stack):
- if index == len(found_path):
- # end of index path; valid so far
- break
-
- if found_path[index][0] > entry.index:
- raise ConfigError('invalid path "{}": pointed type is after pointing type'.format(path))
-
- # also make sure that both pointed and pointing types share
- # a common structure ancestor
- for index, entry in enumerate(self._stack):
- if index == len(found_path):
- break
-
- if entry.base_t is not found_path[index][1]:
- # found common ancestor
- if type(entry.base_t) is metadata.Variant:
- raise ConfigError('invalid path "{}": type cannot be reached because pointed and pointing types are in the same variant type'.format(path))
-
- return cur_t
-
- def _lookup_path_from_top(self, path, parts):
- if len(parts) != 1:
- raise ConfigError('invalid path "{}": multipart relative path not supported'.format(path))
-
- find_name = parts[0]
- index = len(self._stack) - 1
- got_struct = False
-
- # check stack entries in reversed order
- for entry in reversed(self._stack):
- # structure base type
- if type(entry.base_t) is metadata.Struct:
- got_struct = True
- enumerated_items = enumerate(entry.base_t.fields.items())
-
- # lookup each field, until the current visiting index is met
- for i, (field_name, field_type) in enumerated_items:
- if i == entry.index:
- break
-
- if field_name == find_name:
- return field_type
-
- # variant base type
- elif type(entry.base_t) is metadata.Variant:
- enumerated_items = enumerate(entry.base_t.types.items())
-
- # lookup each type, until the current visiting index is met
- for i, (type_name, type_type) in enumerated_items:
- if i == entry.index:
- break
-
- if type_name == find_name:
- if not got_struct:
- raise ConfigError('invalid path "{}": type cannot be reached because pointed and pointing types are in the same variant type'.format(path))
-
- return type_type
-
- # nothing returned here: cannot find type
- raise ConfigError('invalid path "{}": cannot find type in current context'.format(path))
-
- def _lookup_path(self, path, from_t):
- parts = path.lower().split('.')
- base = None
- base_is_current = False
-
- if len(parts) >= 3:
- if parts[0] == 'trace':
- if parts[1] == 'packet' and parts[2] == 'header':
- # make sure packet header exists
- if self._cur_trace.packet_header_type is None:
- raise ConfigError('invalid path "{}": no defined trace packet header type'.format(path))
-
- base = self._cur_trace.packet_header_type
-
- if self._cur_entity == _Entity.TRACE_PACKET_HEADER:
- base_is_current = True
- else:
- raise ConfigError('invalid path "{}": unknown names after "trace"'.format(path))
- elif parts[0] == 'stream':
- if parts[1] == 'packet' and parts[2] == 'context':
- if self._cur_entity < _Entity.STREAM_PACKET_CONTEXT:
- raise ConfigError('invalid path "{}": cannot access stream packet context here'.format(path))
- if self._cur_stream.packet_context_type is None:
- raise ConfigError('invalid path "{}": no defined stream packet context type'.format(path))
-
- base = self._cur_stream.packet_context_type
-
- if self._cur_entity == _Entity.STREAM_PACKET_CONTEXT:
- base_is_current = True
- elif parts[1] == 'event':
- if parts[2] == 'header':
- if self._cur_entity < _Entity.STREAM_EVENT_HEADER:
- raise ConfigError('invalid path "{}": cannot access stream event header here'.format(path))
-
- if self._cur_stream.event_header_type is None:
- raise ConfigError('invalid path "{}": no defined stream event header type'.format(path))
+_OptUuid = Optional[uuidp.UUID]
- base = self._cur_stream.event_header_type
- if self._cur_entity == _Entity.STREAM_EVENT_HEADER:
- base_is_current = True
- elif parts[2] == 'context':
- if self._cur_entity < _Entity.STREAM_EVENT_CONTEXT:
- raise ConfigError('invalid path "{}": cannot access stream event context here'.format(path))
-
- if self._cur_stream.event_context_type is None:
- raise ConfigError('invalid path "{}": no defined stream event context type'.format(path))
-
- base = self._cur_stream.event_context_type
+class ClockType(_UniqueByName):
+ def __init__(self, name: str, frequency: Count = Count(int(1e9)), uuid: _OptUuid = None,
+ description: _OptStr = None, precision: Count = Count(0),
+ offset: Optional[ClockTypeOffset] = None, origin_is_unix_epoch: bool = False):
+ self._name = name
+ self._frequency = frequency
+ self._uuid = uuid
+ self._description = description
+ self._precision = precision
+ self._offset = ClockTypeOffset()
- if self._cur_entity == _Entity.STREAM_EVENT_CONTEXT:
- base_is_current = True
- else:
- raise ConfigError('invalid path "{}": unknown names after "stream.event"'.format(path))
- else:
- raise ConfigError('invalid path "{}": unknown names after "stream"'.format(path))
+ if offset is not None:
+ self._offset = offset
- if base is not None:
- start_index = 3
-
- if len(parts) >= 2 and base is None:
- if parts[0] == 'event':
- if parts[1] == 'context':
- if self._cur_entity < _Entity.EVENT_CONTEXT:
- raise ConfigError('invalid path "{}": cannot access event context here'.format(path))
+ self._origin_is_unix_epoch = origin_is_unix_epoch
- if self._cur_event.context_type is None:
- raise ConfigError('invalid path "{}": no defined event context type'.format(path))
+ @property
+ def name(self) -> str:
+ return self._name
- base = self._cur_event.context_type
+ @property
+ def frequency(self) -> Count:
+ return self._frequency
- if self._cur_entity == _Entity.EVENT_CONTEXT:
- base_is_current = True
- elif parts[1] == 'payload' or parts[1] == 'fields':
- if self._cur_entity < _Entity.EVENT_PAYLOAD:
- raise ConfigError('invalid path "{}": cannot access event payload here'.format(path))
+ @property
+ def uuid(self) -> _OptUuid:
+ return self._uuid
- if self._cur_event.payload_type is None:
- raise ConfigError('invalid path "{}": no defined event payload type'.format(path))
+ @property
+ def description(self) -> _OptStr:
+ return self._description
- base = self._cur_event.payload_type
+ @property
+ def precision(self) -> Count:
+ return self._precision
- if self._cur_entity == _Entity.EVENT_PAYLOAD:
- base_is_current = True
- else:
- raise ConfigError('invalid path "{}": unknown names after "event"'.format(path))
+ @property
+ def offset(self) -> ClockTypeOffset:
+ return self._offset
- if base is not None:
- start_index = 2
+ @property
+ def origin_is_unix_epoch(self) -> bool:
+ return self._origin_is_unix_epoch
- if base is not None:
- return self._lookup_path_from_base(path, parts, base, start_index,
- base_is_current, from_t)
- else:
- return self._lookup_path_from_top(path, parts)
- def _stack_reset(self):
- self._stack = []
+DEFAULT_FIELD_TYPE = 'default'
+_DefaultableUIntFt = Union[str, UnsignedIntegerFieldType]
+_OptDefaultableUIntFt = Optional[_DefaultableUIntFt]
+_OptUIntFt = Optional[UnsignedIntegerFieldType]
- def _stack_push(self, base_t):
- entry = _MetadataDynamicTypesValidatorStackEntry(base_t)
- self._stack.append(entry)
- def _stack_pop(self):
- self._stack.pop()
+class StreamTypePacketFeatures:
+ def __init__(self, total_size_field_type: _DefaultableUIntFt = DEFAULT_FIELD_TYPE,
+ content_size_field_type: _DefaultableUIntFt = DEFAULT_FIELD_TYPE,
+ beginning_time_field_type: _OptDefaultableUIntFt = None,
+ end_time_field_type: _OptDefaultableUIntFt = None,
+ discarded_events_counter_field_type: _OptDefaultableUIntFt = None):
+ def get_ft(user_ft: _OptDefaultableUIntFt) -> _OptUIntFt:
+ if user_ft == DEFAULT_FIELD_TYPE:
+ return UnsignedIntegerFieldType(64)
- def _stack_incr_index(self):
- self._stack[-1].index += 1
+ return typing.cast(_OptUIntFt, user_ft)
- def _visit_struct_type(self, t):
- self._stack_push(t)
+ self._total_size_field_type = get_ft(total_size_field_type)
+ self._content_size_field_type = get_ft(content_size_field_type)
+ self._beginning_time_field_type = get_ft(beginning_time_field_type)
+ self._end_time_field_type = get_ft(end_time_field_type)
+ self._discarded_events_counter_field_type = get_ft(discarded_events_counter_field_type)
- for field_name, field_type in t.fields.items():
- try:
- self._visit_type(field_type)
- except Exception as e:
- raise ConfigError('in structure type\'s field "{}"'.format(field_name), e)
+ @property
+ def total_size_field_type(self) -> _OptUIntFt:
+ return self._total_size_field_type
- self._stack_incr_index()
+ @property
+ def content_size_field_type(self) -> _OptUIntFt:
+ return self._content_size_field_type
- self._stack_pop()
+ @property
+ def beginning_time_field_type(self) -> _OptUIntFt:
+ return self._beginning_time_field_type
- def _visit_array_type(self, t):
- if t.is_variable_length:
- # find length type
- try:
- length_type = self._lookup_path(t.length, t)
- except Exception as e:
- raise ConfigError('invalid array type\'s length', e)
+ @property
+ def end_time_field_type(self) -> _OptUIntFt:
+ return self._end_time_field_type
- # make sure length type an unsigned integer
- if type(length_type) is not metadata.Integer:
- raise ConfigError('array type\'s length does not point to an integer type')
+ @property
+ def discarded_events_counter_field_type(self) -> _OptUIntFt:
+ return self._discarded_events_counter_field_type
- if length_type.signed:
- raise ConfigError('array type\'s length does not point to an unsigned integer type')
- self._visit_type(t.element_type)
+class StreamTypeEventFeatures:
+ def __init__(self, type_id_field_type: _OptDefaultableUIntFt = DEFAULT_FIELD_TYPE,
+ time_field_type: _OptDefaultableUIntFt = None):
+ def get_ft(user_ft: _OptDefaultableUIntFt) -> _OptUIntFt:
+ if user_ft == DEFAULT_FIELD_TYPE:
+ return UnsignedIntegerFieldType(64)
- def _visit_variant_type(self, t):
- # find tag type
- try:
- tag_type = self._lookup_path(t.tag, t)
- except Exception as e:
- raise ConfigError('invalid variant type\'s tag', e)
+ return typing.cast(_OptUIntFt, user_ft)
- # make sure tag type is an enumeration
- if type(tag_type) is not metadata.Enum:
- raise ConfigError('variant type\'s tag does not point to an enumeration type')
+ self._type_id_field_type = get_ft(type_id_field_type)
+ self._time_field_type = get_ft(time_field_type)
- # verify that each variant type's type exists as an enumeration member
- for tag_name in t.types.keys():
- if tag_name not in tag_type.members:
- raise ConfigError('cannot find variant type\'s type "{}" in pointed tag type'.format(tag_name))
+ @property
+ def type_id_field_type(self) -> _OptUIntFt:
+ return self._type_id_field_type
- self._stack_push(t)
+ @property
+ def time_field_type(self) -> _OptUIntFt:
+ return self._time_field_type
- for type_name, type_type in t.types.items():
- try:
- self._visit_type(type_type)
- except Exception as e:
- raise ConfigError('in variant type\'s type "{}"'.format(type_name), e)
- self._stack_incr_index()
+class StreamTypeFeatures:
+ def __init__(self, packet_features: Optional[StreamTypePacketFeatures] = None,
+ event_features: Optional[StreamTypeEventFeatures] = None):
+ self._packet_features = StreamTypePacketFeatures()
- self._stack_pop()
+ if packet_features is not None:
+ self._packet_features = packet_features
- def _visit_type(self, t):
- if t is None:
- return
+ self._event_features = StreamTypeEventFeatures()
- if type(t) in self._type_to_visit_type_func:
- func = self._type_to_visit_type_func[type(t)]
-
- if func is not None:
- func(t)
-
- def _visit_event(self, ev):
- ev_name = ev.name
-
- # set current event
- self._cur_event = ev
-
- # visit event context type
- self._stack_reset()
- self._cur_entity = _Entity.EVENT_CONTEXT
-
- try:
- self._visit_type(ev.context_type)
- except Exception as e:
- raise ConfigError('invalid context type in event "{}"'.format(ev_name), e)
-
- # visit event payload type
- self._stack_reset()
- self._cur_entity = _Entity.EVENT_PAYLOAD
-
- try:
- self._visit_type(ev.payload_type)
- except Exception as e:
- raise ConfigError('invalid payload type in event "{}"'.format(ev_name), e)
-
- def _visit_stream(self, stream):
- stream_name = stream.name
-
- # set current stream
- self._cur_stream = stream
-
- # reset current event
- self._cur_event = None
-
- # visit stream packet context type
- self._stack_reset()
- self._cur_entity = _Entity.STREAM_PACKET_CONTEXT
-
- try:
- self._visit_type(stream.packet_context_type)
- except Exception as e:
- raise ConfigError('invalid packet context type in stream "{}"'.format(stream_name), e)
-
- # visit stream event header type
- self._stack_reset()
- self._cur_entity = _Entity.STREAM_EVENT_HEADER
-
- try:
- self._visit_type(stream.event_header_type)
- except Exception as e:
- raise ConfigError('invalid event header type in stream "{}"'.format(stream_name), e)
-
- # visit stream event context type
- self._stack_reset()
- self._cur_entity = _Entity.STREAM_EVENT_CONTEXT
-
- try:
- self._visit_type(stream.event_context_type)
- except Exception as e:
- raise ConfigError('invalid event context type in stream "{}"'.format(stream_name), e)
-
- # visit events
- for ev in stream.events.values():
- try:
- self._visit_event(ev)
- except Exception as e:
- raise ConfigError('invalid stream "{}"'.format(stream_name))
-
- def validate(self, meta):
- # set current trace
- self._cur_trace = meta.trace
-
- # visit trace packet header type
- self._stack_reset()
- self._cur_entity = _Entity.TRACE_PACKET_HEADER
-
- try:
- self._visit_type(meta.trace.packet_header_type)
- except Exception as e:
- raise ConfigError('invalid packet header type in trace', e)
-
- # visit streams
- for stream in meta.streams.values():
- self._visit_stream(stream)
-
-
-# Since type inheritance allows types to be only partially defined at
-# any place in the configuration, this validator validates that actual
-# trace, stream, and event types are all complete and valid. Therefore
-# an invalid, but unusued type alias is accepted.
-class _MetadataTypesHistologyValidator:
- def __init__(self):
- self._type_to_validate_type_histology_func = {
- metadata.Integer: self._validate_integer_histology,
- metadata.FloatingPoint: self._validate_float_histology,
- metadata.Enum: self._validate_enum_histology,
- metadata.String: self._validate_string_histology,
- metadata.Struct: self._validate_struct_histology,
- metadata.Array: self._validate_array_histology,
- metadata.Variant: self._validate_variant_histology,
- }
-
- def _validate_integer_histology(self, t):
- # size is set
- if t.size is None:
- raise ConfigError('missing integer type\'s size')
-
- def _validate_float_histology(self, t):
- # exponent digits is set
- if t.exp_size is None:
- raise ConfigError('missing floating point number type\'s exponent size')
-
- # mantissa digits is set
- if t.mant_size is None:
- raise ConfigError('missing floating point number type\'s mantissa size')
-
- # exponent and mantissa sum is a multiple of 8
- if (t.exp_size + t.mant_size) % 8 != 0:
- raise ConfigError('floating point number type\'s mantissa and exponent sizes sum must be a multiple of 8')
-
- def _validate_enum_histology(self, t):
- # integer type is set
- if t.value_type is None:
- raise ConfigError('missing enumeration type\'s value type')
-
- # there's at least one member
- if not t.members:
- raise ConfigError('enumeration type needs at least one member')
-
- # no overlapping values and all values are valid considering
- # the value type
- ranges = []
-
- if t.value_type.signed:
- value_min = -(1 << t.value_type.size - 1)
- value_max = (1 << (t.value_type.size - 1)) - 1
- else:
- value_min = 0
- value_max = (1 << t.value_type.size) - 1
-
- for label, value in t.members.items():
- for rg in ranges:
- if value[0] <= rg[1] and rg[0] <= value[1]:
- raise ConfigError('enumeration type\'s member "{}" overlaps another member'.format(label))
-
- fmt = 'enumeration type\'s member "{}": value {} is outside the value type range [{}, {}]'
-
- if value[0] < value_min or value[0] > value_max:
- raise ConfigError(fmt.format(label, value[0], value_min, value_max))
-
- if value[1] < value_min or value[1] > value_max:
- raise ConfigError(fmt.format(label, value[1], value_min, value_max))
-
- ranges.append(value)
-
- def _validate_string_histology(self, t):
- # always valid
- pass
-
- def _validate_struct_histology(self, t):
- # all fields are valid
- for field_name, field_type in t.fields.items():
- try:
- self._validate_type_histology(field_type)
- except Exception as e:
- raise ConfigError('invalid structure type\'s field "{}"'.format(field_name), e)
-
- def _validate_array_histology(self, t):
- # length is set
- if t.length is None:
- raise ConfigError('missing array type\'s length')
-
- # element type is set
- if t.element_type is None:
- raise ConfigError('missing array type\'s element type')
-
- # element type is valid
- try:
- self._validate_type_histology(t.element_type)
- except Exception as e:
- raise ConfigError('invalid array type\'s element type', e)
-
- def _validate_variant_histology(self, t):
- # tag is set
- if t.tag is None:
- raise ConfigError('missing variant type\'s tag')
-
- # there's at least one type
- if not t.types:
- raise ConfigError('variant type needs at least one type')
-
- # all types are valid
- for type_name, type_t in t.types.items():
- try:
- self._validate_type_histology(type_t)
- except Exception as e:
- raise ConfigError('invalid variant type\'s type "{}"'.format(type_name), e)
-
- def _validate_type_histology(self, t):
- if t is None:
- return
+ if event_features is not None:
+ self._event_features = event_features
- self._type_to_validate_type_histology_func[type(t)](t)
+ @property
+ def packet_features(self) -> StreamTypePacketFeatures:
+ return self._packet_features
- def _validate_entity_type_histology(self, t):
- if t is None:
+ @property
+ def event_features(self) -> StreamTypeEventFeatures:
+ return self._event_features
+
+
+class StreamType(_UniqueByName):
+ def __init__(self, name: str, event_types: Set[EventType],
+ default_clock_type: Optional[ClockType] = None,
+ features: Optional[StreamTypeFeatures] = None,
+ packet_context_field_type_extra_members: Optional[_StructFtMembers] = None,
+ event_common_context_field_type: _OptStructFt = None):
+ self._id: Optional[Id] = None
+ self._name = name
+ self._default_clock_type = default_clock_type
+ self._event_common_context_field_type = event_common_context_field_type
+ self._event_types = frozenset(event_types)
+
+ # assign unique IDs
+ for index, ev_type in enumerate(sorted(self._event_types, key=lambda evt: evt.name)):
+ assert ev_type._id is None
+ ev_type._id = Id(index)
+
+ self._set_features(features)
+ self._packet_context_field_type_extra_members = StructureFieldTypeMembers({})
+
+ if packet_context_field_type_extra_members is not None:
+ self._packet_context_field_type_extra_members = StructureFieldTypeMembers(packet_context_field_type_extra_members)
+
+ self._set_pkt_ctx_ft()
+ self._set_ev_header_ft()
+
+ def _set_features(self, features: Optional[StreamTypeFeatures]):
+ if features is not None:
+ self._features = features
+ return None
+
+ ev_time_ft = None
+ pkt_beginning_time_ft = None
+ pkt_end_time_ft = None
+
+ if self._default_clock_type is not None:
+ # Automatic time field types because the stream type has a
+ # default clock type.
+ ev_time_ft = DEFAULT_FIELD_TYPE
+ pkt_beginning_time_ft = DEFAULT_FIELD_TYPE
+ pkt_end_time_ft = DEFAULT_FIELD_TYPE
+
+ self._features = StreamTypeFeatures(StreamTypePacketFeatures(beginning_time_field_type=pkt_beginning_time_ft,
+ end_time_field_type=pkt_end_time_ft),
+ StreamTypeEventFeatures(time_field_type=ev_time_ft))
+
+ def _set_ft_mapped_clk_type_name(self, ft: Optional[UnsignedIntegerFieldType]):
+ if ft is None:
return
- if type(t) is not metadata.Struct:
- raise ConfigError('expecting a structure type')
-
- self._validate_type_histology(t)
-
- def _validate_event_types_histology(self, ev):
- ev_name = ev.name
-
- # validate event context type
- try:
- self._validate_entity_type_histology(ev.context_type)
- except Exception as e:
- raise ConfigError('invalid event context type for event "{}"'.format(ev_name), e)
-
- # validate event payload type
- if ev.payload_type is None:
- raise ConfigError('event payload type must exist in event "{}"'.format(ev_name))
-
- # TODO: also check arrays, sequences, and variants
- if type(ev.payload_type) is metadata.Struct:
- if not ev.payload_type.fields:
- raise ConfigError('event payload type must have at least one field for event "{}"'.format(ev_name))
-
- try:
- self._validate_entity_type_histology(ev.payload_type)
- except Exception as e:
- raise ConfigError('invalid event payload type for event "{}"'.format(ev_name), e)
-
- def _validate_stream_types_histology(self, stream):
- stream_name = stream.name
-
- # validate stream packet context type
- try:
- self._validate_entity_type_histology(stream.packet_context_type)
- except Exception as e:
- raise ConfigError('invalid stream packet context type for stream "{}"'.format(stream_name), e)
-
- # validate stream event header type
- try:
- self._validate_entity_type_histology(stream.event_header_type)
- except Exception as e:
- raise ConfigError('invalid stream event header type for stream "{}"'.format(stream_name), e)
-
- # validate stream event context type
- try:
- self._validate_entity_type_histology(stream.event_context_type)
- except Exception as e:
- raise ConfigError('invalid stream event context type for stream "{}"'.format(stream_name), e)
-
- # validate events
- for ev in stream.events.values():
- try:
- self._validate_event_types_histology(ev)
- except Exception as e:
- raise ConfigError('invalid event in stream "{}"'.format(stream_name), e)
-
- def validate(self, meta):
- # validate trace packet header type
- try:
- self._validate_entity_type_histology(meta.trace.packet_header_type)
- except Exception as e:
- raise ConfigError('invalid trace packet header type', e)
-
- # validate streams
- for stream in meta.streams.values():
- self._validate_stream_types_histology(stream)
-
-
-class _YamlConfigParser:
- def __init__(self, include_dirs, ignore_include_not_found, dump_config):
- self._class_name_to_create_type_func = {
- 'int': self._create_integer,
- 'integer': self._create_integer,
- 'flt': self._create_float,
- 'float': self._create_float,
- 'floating-point': self._create_float,
- 'enum': self._create_enum,
- 'enumeration': self._create_enum,
- 'str': self._create_string,
- 'string': self._create_string,
- 'struct': self._create_struct,
- 'structure': self._create_struct,
- 'array': self._create_array,
- 'var': self._create_variant,
- 'variant': self._create_variant,
- }
- self._type_to_create_type_func = {
- metadata.Integer: self._create_integer,
- metadata.FloatingPoint: self._create_float,
- metadata.Enum: self._create_enum,
- metadata.String: self._create_string,
- metadata.Struct: self._create_struct,
- metadata.Array: self._create_array,
- metadata.Variant: self._create_variant,
- }
- self._include_dirs = include_dirs
- self._ignore_include_not_found = ignore_include_not_found
- self._dump_config = dump_config
-
- def _set_byte_order(self, metadata_node):
- if 'trace' not in metadata_node:
- raise ConfigError('missing "trace" property (metadata)')
-
- trace_node = metadata_node['trace']
-
- if not _is_assoc_array_prop(trace_node):
- raise ConfigError('"trace" property (metadata) must be an associative array')
-
- if 'byte-order' not in trace_node:
- raise ConfigError('missing "byte-order" property (trace)')
-
- bo_node = trace_node['byte-order']
-
- if not _is_str_prop(bo_node):
- raise ConfigError('"byte-order" property of trace object must be a string ("le" or "be")')
-
- self._bo = _byte_order_str_to_bo(bo_node)
-
- if self._bo is None:
- raise ConfigError('invalid "byte-order" property (trace): must be "le" or "be"')
-
- def _lookup_type_alias(self, name):
- if name in self._tas:
- return copy.deepcopy(self._tas[name])
-
- def _set_int_clock_prop_mapping(self, int_obj, prop_mapping_node):
- unk_prop = _get_first_unknown_prop(prop_mapping_node, ['type', 'name', 'property'])
-
- if unk_prop:
- raise ConfigError('unknown property in integer type object\'s clock property mapping: "{}"'.format(unk_prop))
-
- if 'name' not in prop_mapping_node:
- raise ConfigError('missing "name" property in integer type object\'s clock property mapping')
-
- if 'property' not in prop_mapping_node:
- raise ConfigError('missing "property" property in integer type object\'s clock property mapping')
-
- clock_name = prop_mapping_node['name']
- prop = prop_mapping_node['property']
-
- if not _is_str_prop(clock_name):
- raise ConfigError('"name" property of integer type object\'s clock property mapping must be a string')
-
- if not _is_str_prop(prop):
- raise ConfigError('"property" property of integer type object\'s clock property mapping must be a string')
-
- if clock_name not in self._clocks:
- raise ConfigError('invalid clock name "{}" in integer type object\'s clock property mapping'.format(clock_name))
-
- if prop != 'value':
- raise ConfigError('invalid "property" property in integer type object\'s clock property mapping: "{}"'.format(prop))
-
- mapped_clock = self._clocks[clock_name]
- int_obj.property_mappings.append(metadata.PropertyMapping(mapped_clock, prop))
-
- def _get_first_unknown_type_prop(self, type_node, known_props):
- kp = known_props + ['inherit', 'class']
-
- if self._version >= 201:
- kp.append('$inherit')
-
- return _get_first_unknown_prop(type_node, kp)
-
- def _create_integer(self, obj, node):
- if obj is None:
- # create integer object
- obj = metadata.Integer()
-
- unk_prop = self._get_first_unknown_type_prop(node, [
- 'size',
- 'align',
- 'signed',
- 'byte-order',
- 'base',
- 'encoding',
- 'property-mappings',
- ])
-
- if unk_prop:
- raise ConfigError('unknown integer type object property: "{}"'.format(unk_prop))
-
- # size
- if 'size' in node:
- size = node['size']
-
- if not _is_int_prop(size):
- raise ConfigError('"size" property of integer type object must be an integer')
-
- if size < 1:
- raise ConfigError('invalid integer size: {}'.format(size))
-
- obj.size = size
-
- # align
- if 'align' in node:
- align = node['align']
-
- if align is None:
- obj.set_default_align()
- else:
- if not _is_int_prop(align):
- raise ConfigError('"align" property of integer type object must be an integer')
-
- if not _is_valid_alignment(align):
- raise ConfigError('invalid alignment: {}'.format(align))
-
- obj.align = align
-
- # signed
- if 'signed' in node:
- signed = node['signed']
-
- if signed is None:
- obj.set_default_signed()
- else:
- if not _is_bool_prop(signed):
- raise ConfigError('"signed" property of integer type object must be a boolean')
-
- obj.signed = signed
-
- # byte order
- if 'byte-order' in node:
- byte_order = node['byte-order']
-
- if byte_order is None:
- obj.byte_order = self._bo
- else:
- if not _is_str_prop(byte_order):
- raise ConfigError('"byte-order" property of integer type object must be a string ("le" or "be")')
-
- byte_order = _byte_order_str_to_bo(byte_order)
-
- if byte_order is None:
- raise ConfigError('invalid "byte-order" property in integer type object')
-
- obj.byte_order = byte_order
- else:
- obj.byte_order = self._bo
-
- # base
- if 'base' in node:
- base = node['base']
-
- if base is None:
- obj.set_default_base()
- else:
- if not _is_str_prop(base):
- raise ConfigError('"base" property of integer type object must be a string ("bin", "oct", "dec", or "hex")')
-
- if base == 'bin':
- base = 2
- elif base == 'oct':
- base = 8
- elif base == 'dec':
- base = 10
- elif base == 'hex':
- base = 16
- else:
- raise ConfigError('unknown "base" property value: "{}" ("bin", "oct", "dec", and "hex" are accepted)'.format(base))
-
- obj.base = base
-
- # encoding
- if 'encoding' in node:
- encoding = node['encoding']
-
- if encoding is None:
- obj.set_default_encoding()
- else:
- if not _is_str_prop(encoding):
- raise ConfigError('"encoding" property of integer type object must be a string ("none", "ascii", or "utf-8")')
-
- encoding = _encoding_str_to_encoding(encoding)
-
- if encoding is None:
- raise ConfigError('invalid "encoding" property in integer type object')
-
- obj.encoding = encoding
-
- # property mappings
- if 'property-mappings' in node:
- prop_mappings = node['property-mappings']
-
- if prop_mappings is None:
- obj.set_default_property_mappings()
- else:
- if not _is_array_prop(prop_mappings):
- raise ConfigError('"property-mappings" property of integer type object must be an array')
-
- if len(prop_mappings) > 1:
- raise ConfigError('length of "property-mappings" array in integer type object must be 1')
-
- for index, prop_mapping in enumerate(prop_mappings):
- if not _is_assoc_array_prop(prop_mapping):
- raise ConfigError('elements of "property-mappings" property of integer type object must be associative arrays')
-
- if 'type' not in prop_mapping:
- raise ConfigError('missing "type" property in integer type object\'s "property-mappings" array\'s element #{}'.format(index))
-
- prop_type = prop_mapping['type']
-
- if not _is_str_prop(prop_type):
- raise ConfigError('"type" property of integer type object\'s "property-mappings" array\'s element #{} must be a string'.format(index))
-
- if prop_type == 'clock':
- self._set_int_clock_prop_mapping(obj, prop_mapping)
- else:
- raise ConfigError('unknown property mapping type "{}" in integer type object\'s "property-mappings" array\'s element #{}'.format(prop_type, index))
-
- return obj
-
- def _create_float(self, obj, node):
- if obj is None:
- # create floating point number object
- obj = metadata.FloatingPoint()
-
- unk_prop = self._get_first_unknown_type_prop(node, [
- 'size',
- 'align',
- 'byte-order',
- ])
-
- if unk_prop:
- raise ConfigError('unknown floating point number type object property: "{}"'.format(unk_prop))
-
- # size
- if 'size' in node:
- size = node['size']
-
- if not _is_assoc_array_prop(size):
- raise ConfigError('"size" property of floating point number type object must be an associative array')
-
- unk_prop = _get_first_unknown_prop(size, ['exp', 'mant'])
-
- if unk_prop:
- raise ConfigError('unknown floating point number type object\'s "size" property: "{}"'.format(unk_prop))
-
- if 'exp' in size:
- exp = size['exp']
-
- if not _is_int_prop(exp):
- raise ConfigError('"exp" property of floating point number type object\'s "size" property must be an integer')
-
- if exp < 1:
- raise ConfigError('invalid floating point number exponent size: {}')
-
- obj.exp_size = exp
-
- if 'mant' in size:
- mant = size['mant']
-
- if not _is_int_prop(mant):
- raise ConfigError('"mant" property of floating point number type object\'s "size" property must be an integer')
-
- if mant < 1:
- raise ConfigError('invalid floating point number mantissa size: {}')
-
- obj.mant_size = mant
-
- # align
- if 'align' in node:
- align = node['align']
-
- if align is None:
- obj.set_default_align()
- else:
- if not _is_int_prop(align):
- raise ConfigError('"align" property of floating point number type object must be an integer')
-
- if not _is_valid_alignment(align):
- raise ConfigError('invalid alignment: {}'.format(align))
-
- obj.align = align
-
- # byte order
- if 'byte-order' in node:
- byte_order = node['byte-order']
-
- if byte_order is None:
- obj.byte_order = self._bo
- else:
- if not _is_str_prop(byte_order):
- raise ConfigError('"byte-order" property of floating point number type object must be a string ("le" or "be")')
-
- byte_order = _byte_order_str_to_bo(byte_order)
-
- if byte_order is None:
- raise ConfigError('invalid "byte-order" property in floating point number type object')
- else:
- obj.byte_order = self._bo
-
- return obj
-
- def _create_enum(self, obj, node):
- if obj is None:
- # create enumeration object
- obj = metadata.Enum()
-
- unk_prop = self._get_first_unknown_type_prop(node, [
- 'value-type',
- 'members',
- ])
-
- if unk_prop:
- raise ConfigError('unknown enumeration type object property: "{}"'.format(unk_prop))
-
- # value type
- if 'value-type' in node:
- value_type_node = node['value-type']
-
- try:
- obj.value_type = self._create_type(value_type_node)
- except Exception as e:
- raise ConfigError('cannot create enumeration type\'s integer type', e)
-
- # members
- if 'members' in node:
- members_node = node['members']
-
- if not _is_array_prop(members_node):
- raise ConfigError('"members" property of enumeration type object must be an array')
-
- cur = 0
- last_value = obj.last_value
-
- if last_value is None:
- cur = 0
- else:
- cur = last_value + 1
-
- for index, m_node in enumerate(members_node):
- if not _is_str_prop(m_node) and not _is_assoc_array_prop(m_node):
- raise ConfigError('invalid enumeration member #{}: expecting a string or an associative array'.format(index))
-
- if _is_str_prop(m_node):
- label = m_node
- value = (cur, cur)
- cur += 1
- else:
- unk_prop = _get_first_unknown_prop(m_node, [
- 'label',
- 'value',
- ])
-
- if unk_prop:
- raise ConfigError('unknown enumeration type member object property: "{}"'.format(unk_prop))
-
- if 'label' not in m_node:
- raise ConfigError('missing "label" property in enumeration member #{}'.format(index))
-
- label = m_node['label']
-
- if not _is_str_prop(label):
- raise ConfigError('"label" property of enumeration member #{} must be a string'.format(index))
-
- if 'value' not in m_node:
- raise ConfigError('missing "value" property in enumeration member ("{}")'.format(label))
-
- value = m_node['value']
-
- if not _is_int_prop(value) and not _is_array_prop(value):
- raise ConfigError('invalid enumeration member ("{}"): expecting an integer or an array'.format(label))
-
- if _is_int_prop(value):
- cur = value + 1
- value = (value, value)
- else:
- if len(value) != 2:
- raise ConfigError('invalid enumeration member ("{}"): range must have exactly two items'.format(label))
-
- mn = value[0]
- mx = value[1]
-
- if mn > mx:
- raise ConfigError('invalid enumeration member ("{}"): invalid range ({} > {})'.format(label, mn, mx))
-
- value = (mn, mx)
- cur = mx + 1
-
- obj.members[label] = value
-
- return obj
-
- def _create_string(self, obj, node):
- if obj is None:
- # create string object
- obj = metadata.String()
-
- unk_prop = self._get_first_unknown_type_prop(node, [
- 'encoding',
- ])
-
- if unk_prop:
- raise ConfigError('unknown string type object property: "{}"'.format(unk_prop))
-
- # encoding
- if 'encoding' in node:
- encoding = node['encoding']
-
- if encoding is None:
- obj.set_default_encoding()
- else:
- if not _is_str_prop(encoding):
- raise ConfigError('"encoding" property of string type object must be a string ("none", "ascii", or "utf-8")')
-
- encoding = _encoding_str_to_encoding(encoding)
-
- if encoding is None:
- raise ConfigError('invalid "encoding" property in string type object')
-
- obj.encoding = encoding
-
- return obj
-
- def _create_struct(self, obj, node):
- if obj is None:
- # create structure object
- obj = metadata.Struct()
-
- unk_prop = self._get_first_unknown_type_prop(node, [
- 'min-align',
- 'fields',
- ])
-
- if unk_prop:
- raise ConfigError('unknown string type object property: "{}"'.format(unk_prop))
-
- # minimum alignment
- if 'min-align' in node:
- min_align = node['min-align']
-
- if min_align is None:
- obj.set_default_min_align()
- else:
- if not _is_int_prop(min_align):
- raise ConfigError('"min-align" property of structure type object must be an integer')
-
- if not _is_valid_alignment(min_align):
- raise ConfigError('invalid minimum alignment: {}'.format(min_align))
-
- obj.min_align = min_align
-
- # fields
- if 'fields' in node:
- fields = node['fields']
-
- if fields is None:
- obj.set_default_fields()
- else:
- if not _is_assoc_array_prop(fields):
- raise ConfigError('"fields" property of structure type object must be an associative array')
-
- for field_name, field_node in fields.items():
- if not is_valid_identifier(field_name):
- raise ConfigError('"{}" is not a valid field name for structure type'.format(field_name))
-
- try:
- obj.fields[field_name] = self._create_type(field_node)
- except Exception as e:
- raise ConfigError('cannot create structure type\'s field "{}"'.format(field_name), e)
-
- return obj
-
- def _create_array(self, obj, node):
- if obj is None:
- # create array object
- obj = metadata.Array()
-
- unk_prop = self._get_first_unknown_type_prop(node, [
- 'length',
- 'element-type',
- ])
-
- if unk_prop:
- raise ConfigError('unknown array type object property: "{}"'.format(unk_prop))
-
- # length
- if 'length' in node:
- length = node['length']
+ if self._default_clock_type is not None:
+ assert isinstance(ft, UnsignedIntegerFieldType)
+ ft._mapped_clk_type_name = self._default_clock_type.name
- if not _is_int_prop(length) and not _is_str_prop(length):
- raise ConfigError('"length" property of array type object must be an integer or a string')
+ def _set_pkt_ctx_ft(self):
+ members = None
- if type(length) is int and length < 0:
- raise ConfigError('invalid static array length: {}'.format(length))
+ def add_member_if_exists(name: str, ft: _FieldType, set_mapped_clk_type_name: bool = False):
+ nonlocal members
- obj.length = length
+ if ft is not None:
+ if set_mapped_clk_type_name:
+ self._set_ft_mapped_clk_type_name(typing.cast(UnsignedIntegerFieldType, ft))
- # element type
- if 'element-type' in node:
- element_type_node = node['element-type']
+ members[name] = StructureFieldTypeMember(ft)
- try:
- obj.element_type = self._create_type(node['element-type'])
- except Exception as e:
- raise ConfigError('cannot create array type\'s element type', e)
-
- return obj
-
- def _create_variant(self, obj, node):
- if obj is None:
- # create variant object
- obj = metadata.Variant()
-
- unk_prop = self._get_first_unknown_type_prop(node, [
- 'tag',
- 'types',
+ members = collections.OrderedDict([
+ (
+ 'packet_size',
+ StructureFieldTypeMember(self._features.packet_features.total_size_field_type)
+ ),
+ (
+ 'content_size',
+ StructureFieldTypeMember(self._features.packet_features.content_size_field_type)
+ )
])
- if unk_prop:
- raise ConfigError('unknown variant type object property: "{}"'.format(unk_prop))
-
- # tag
- if 'tag' in node:
- tag = node['tag']
-
- if not _is_str_prop(tag):
- raise ConfigError('"tag" property of variant type object must be a string')
-
- # do not validate variant tag for the moment; will be done in a
- # second phase
- obj.tag = tag
-
- # element type
- if 'types' in node:
- types = node['types']
-
- if not _is_assoc_array_prop(types):
- raise ConfigError('"types" property of variant type object must be an associative array')
-
- # do not validate type names for the moment; will be done in a
- # second phase
- for type_name, type_node in types.items():
- if not is_valid_identifier(type_name):
- raise ConfigError('"{}" is not a valid type name for variant type'.format(type_name))
-
- try:
- obj.types[type_name] = self._create_type(type_node)
- except Exception as e:
- raise ConfigError('cannot create variant type\'s type "{}"'.format(type_name), e)
-
- return obj
-
- def _create_type(self, type_node):
- if type(type_node) is str:
- t = self._lookup_type_alias(type_node)
-
- if t is None:
- raise ConfigError('unknown type alias "{}"'.format(type_node))
-
- return t
-
- if not _is_assoc_array_prop(type_node):
- raise ConfigError('type objects must be associative arrays or strings (type alias name)')
-
- # inherit:
- # v2.0: "inherit"
- # v2.1+: "$inherit"
- inherit_node = None
-
- if self._version >= 200:
- if 'inherit' in type_node:
- inherit_prop = 'inherit'
- inherit_node = type_node[inherit_prop]
-
- if self._version >= 201:
- if '$inherit' in type_node:
- if inherit_node is not None:
- raise ConfigError('cannot specify both "inherit" and "$inherit" properties of type object: prefer "$inherit"')
-
- inherit_prop = '$inherit'
- inherit_node = type_node[inherit_prop]
-
- if inherit_node is not None and 'class' in type_node:
- raise ConfigError('cannot specify both "{}" and "class" properties in type object'.format(inherit_prop))
-
- if inherit_node is not None:
- if not _is_str_prop(inherit_node):
- raise ConfigError('"{}" property of type object must be a string'.format(inherit_prop))
-
- base = self._lookup_type_alias(inherit_node)
-
- if base is None:
- raise ConfigError('cannot inherit from type alias "{}": type alias does not exist at this point'.format(inherit_node))
-
- func = self._type_to_create_type_func[type(base)]
- else:
- if 'class' not in type_node:
- raise ConfigError('type objects which do not inherit must have a "class" property')
-
- class_name = type_node['class']
-
- if type(class_name) is not str:
- raise ConfigError('type objects\' "class" property must be a string')
-
- if class_name not in self._class_name_to_create_type_func:
- raise ConfigError('unknown type class "{}"'.format(class_name))
-
- base = None
- func = self._class_name_to_create_type_func[class_name]
-
- return func(base, type_node)
-
- def _register_type_aliases(self, metadata_node):
- self._tas = dict()
-
- if 'type-aliases' not in metadata_node:
- return
-
- ta_node = metadata_node['type-aliases']
-
- if ta_node is None:
- return
-
- if not _is_assoc_array_prop(ta_node):
- raise ConfigError('"type-aliases" property (metadata) must be an associative array')
-
- for ta_name, ta_type in ta_node.items():
- if ta_name in self._tas:
- raise ConfigError('duplicate type alias "{}"'.format(ta_name))
-
- try:
- t = self._create_type(ta_type)
- except Exception as e:
- raise ConfigError('cannot create type alias "{}"'.format(ta_name), e)
-
- self._tas[ta_name] = t
-
- def _create_clock(self, node):
- # create clock object
- clock = metadata.Clock()
-
- if not _is_assoc_array_prop(node):
- raise ConfigError('clock objects must be associative arrays')
-
- known_props = [
- 'uuid',
- 'description',
- 'freq',
- 'error-cycles',
- 'offset',
- 'absolute',
- 'return-ctype',
- ]
-
- if self._version >= 201:
- known_props.append('$return-ctype')
-
- unk_prop = _get_first_unknown_prop(node, known_props)
-
- if unk_prop:
- raise ConfigError('unknown clock object property: "{}"'.format(unk_prop))
-
- # UUID
- if 'uuid' in node:
- uuidp = node['uuid']
-
- if uuidp is None:
- clock.set_default_uuid()
- else:
- if not _is_str_prop(uuidp):
- raise ConfigError('"uuid" property of clock object must be a string')
-
- try:
- uuidp = uuid.UUID(uuidp)
- except:
- raise ConfigError('malformed UUID (clock object): "{}"'.format(uuidp))
+ add_member_if_exists('timestamp_begin',
+ self._features.packet_features.beginning_time_field_type, True)
+ add_member_if_exists('timestamp_end', self._features.packet_features.end_time_field_type,
+ True)
+ add_member_if_exists('events_discarded',
+ self._features.packet_features.discarded_events_counter_field_type)
- clock.uuid = uuidp
+ if self._packet_context_field_type_extra_members is not None:
+ for name, field_type in self._packet_context_field_type_extra_members.items():
+ assert name not in members
+ members[name] = field_type
- # description
- if 'description' in node:
- desc = node['description']
+ self._pkt_ctx_ft = StructureFieldType(8, members)
- if desc is None:
- clock.set_default_description()
- else:
- if not _is_str_prop(desc):
- raise ConfigError('"description" property of clock object must be a string')
+ def _set_ev_header_ft(self):
+ members = collections.OrderedDict()
- clock.description = desc
+ if self._features.event_features.type_id_field_type is not None:
+ members['id'] = StructureFieldTypeMember(self._features.event_features.type_id_field_type)
- # frequency
- if 'freq' in node:
- freq = node['freq']
+ if self._features.event_features.time_field_type is not None:
+ ft = self._features.event_features.time_field_type
+ self._set_ft_mapped_clk_type_name(ft)
+ members['timestamp'] = StructureFieldTypeMember(ft)
- if freq is None:
- clock.set_default_freq()
- else:
- if not _is_int_prop(freq):
- raise ConfigError('"freq" property of clock object must be an integer')
+ self._ev_header_ft = StructureFieldType(8, members)
- if freq < 1:
- raise ConfigError('invalid clock frequency: {}'.format(freq))
-
- clock.freq = freq
-
- # error cycles
- if 'error-cycles' in node:
- error_cycles = node['error-cycles']
-
- if error_cycles is None:
- clock.set_default_error_cycles()
- else:
- if not _is_int_prop(error_cycles):
- raise ConfigError('"error-cycles" property of clock object must be an integer')
-
- if error_cycles < 0:
- raise ConfigError('invalid clock error cycles: {}'.format(error_cycles))
-
- clock.error_cycles = error_cycles
-
- # offset
- if 'offset' in node:
- offset = node['offset']
-
- if offset is None:
- self.set_default_offset_seconds()
- self.set_default_offset_cycles()
- else:
- if not _is_assoc_array_prop(offset):
- raise ConfigError('"offset" property of clock object must be an associative array')
-
- unk_prop = _get_first_unknown_prop(offset, ['cycles', 'seconds'])
-
- if unk_prop:
- raise ConfigError('unknown clock object\'s offset property: "{}"'.format(unk_prop))
-
- # cycles
- if 'cycles' in offset:
- offset_cycles = offset['cycles']
-
- if not _is_int_prop(offset_cycles):
- raise ConfigError('"cycles" property of clock object\'s offset property must be an integer')
-
- if offset_cycles < 0:
- raise ConfigError('invalid clock offset cycles: {}'.format(offset_cycles))
-
- clock.offset_cycles = offset_cycles
-
- # seconds
- if 'seconds' in offset:
- offset_seconds = offset['seconds']
-
- if not _is_int_prop(offset_seconds):
- raise ConfigError('"seconds" property of clock object\'s offset property must be an integer')
-
- if offset_seconds < 0:
- raise ConfigError('invalid clock offset seconds: {}'.format(offset_seconds))
-
- clock.offset_seconds = offset_seconds
-
- # absolute
- if 'absolute' in node:
- absolute = node['absolute']
-
- if absolute is None:
- clock.set_default_absolute()
- else:
- if not _is_bool_prop(absolute):
- raise ConfigError('"absolute" property of clock object must be a boolean')
-
- clock.absolute = absolute
-
- # return C type:
- # v2.0: "return-ctype"
- # v2.1+: "$return-ctype"
- return_ctype_node = None
-
- if self._version >= 200:
- if 'return-ctype' in node:
- return_ctype_prop = 'return-ctype'
- return_ctype_node = node[return_ctype_prop]
-
- if self._version >= 201:
- if '$return-ctype' in node:
- if return_ctype_node is not None:
- raise ConfigError('cannot specify both "return-ctype" and "$return-ctype" properties of clock object: prefer "$return-ctype"')
-
- return_ctype_prop = '$return-ctype'
- return_ctype_node = node[return_ctype_prop]
-
- if return_ctype_node is not None:
- if return_ctype_node is None:
- clock.set_default_return_ctype()
- else:
- if not _is_str_prop(return_ctype_node):
- raise ConfigError('"{}" property of clock object must be a string'.format(return_ctype_prop))
-
- clock.return_ctype = return_ctype_node
-
- return clock
-
- def _register_clocks(self, metadata_node):
- self._clocks = collections.OrderedDict()
-
- if 'clocks' not in metadata_node:
- return
-
- clocks_node = metadata_node['clocks']
-
- if clocks_node is None:
- return
-
- if not _is_assoc_array_prop(clocks_node):
- raise ConfigError('"clocks" property (metadata) must be an associative array')
+ @property
+ def id(self) -> Optional[Id]:
+ return self._id
- for clock_name, clock_node in clocks_node.items():
- if not is_valid_identifier(clock_name):
- raise ConfigError('invalid clock name: "{}"'.format(clock_name))
+ @property
+ def name(self) -> str:
+ return self._name
- if clock_name in self._clocks:
- raise ConfigError('duplicate clock "{}"'.format(clock_name))
+ @property
+ def default_clock_type(self) -> Optional[ClockType]:
+ return self._default_clock_type
- try:
- clock = self._create_clock(clock_node)
- except Exception as e:
- raise ConfigError('cannot create clock "{}"'.format(clock_name), e)
+ @property
+ def features(self) -> StreamTypeFeatures:
+ return self._features
- clock.name = clock_name
- self._clocks[clock_name] = clock
+ @property
+ def packet_context_field_type_extra_members(self) -> StructureFieldTypeMembers:
+ return self._packet_context_field_type_extra_members
- def _create_env(self, metadata_node):
- env = collections.OrderedDict()
+ @property
+ def event_common_context_field_type(self) -> _OptStructFt:
+ return self._event_common_context_field_type
- if 'env' not in metadata_node:
- return env
+ @property
+ def event_types(self) -> FrozenSet[EventType]:
+ return self._event_types
- env_node = metadata_node['env']
- if env_node is None:
- return env
+_OptUuidFt = Optional[Union[str, StaticArrayFieldType]]
- if not _is_assoc_array_prop(env_node):
- raise ConfigError('"env" property (metadata) must be an associative array')
- for env_name, env_value in env_node.items():
- if env_name in env:
- raise ConfigError('duplicate environment variable "{}"'.format(env_name))
+class TraceTypeFeatures:
+ def __init__(self, magic_field_type: _OptDefaultableUIntFt = DEFAULT_FIELD_TYPE,
+ uuid_field_type: _OptUuidFt = None,
+ stream_type_id_field_type: _OptDefaultableUIntFt = DEFAULT_FIELD_TYPE):
+ def get_field_type(user_ft: Optional[Union[str, _FieldType]], default_ft: _FieldType) -> _OptFt:
+ if user_ft == DEFAULT_FIELD_TYPE:
+ return default_ft
- if not is_valid_identifier(env_name):
- raise ConfigError('invalid environment variable name: "{}"'.format(env_name))
+ return typing.cast(_OptFt, user_ft)
- if not _is_int_prop(env_value) and not _is_str_prop(env_value):
- raise ConfigError('invalid environment variable value ("{}"): expecting integer or string'.format(env_name))
+ self._magic_field_type = typing.cast(_OptUIntFt, get_field_type(magic_field_type,
+ UnsignedIntegerFieldType(32)))
+ self._uuid_field_type = typing.cast(Optional[StaticArrayFieldType], get_field_type(uuid_field_type,
+ StaticArrayFieldType(Count(16),
+ UnsignedIntegerFieldType(8))))
+ self._stream_type_id_field_type = typing.cast(_OptUIntFt, get_field_type(stream_type_id_field_type,
+ UnsignedIntegerFieldType(64)))
- env[env_name] = env_value
+ @property
+ def magic_field_type(self) -> _OptUIntFt:
+ return self._magic_field_type
- return env
+ @property
+ def uuid_field_type(self) -> Optional[StaticArrayFieldType]:
+ return self._uuid_field_type
- def _register_log_levels(self, metadata_node):
- self._log_levels = dict()
+ @property
+ def stream_type_id_field_type(self) -> _OptUIntFt:
+ return self._stream_type_id_field_type
- # log levels:
- # v2.0: "log-levels"
- # v2.1+: "$log-levels"
- log_levels_node = None
- if self._version >= 200:
- if 'log-levels' in metadata_node:
- log_levels_prop = 'log-levels'
- log_levels_node = metadata_node[log_levels_prop]
+class TraceType:
+ def __init__(self, stream_types: Set[StreamType], default_byte_order: ByteOrder,
+ uuid: _OptUuid = None, features: Optional[TraceTypeFeatures] = None):
+ self._default_byte_order = default_byte_order
+ self._stream_types = frozenset(stream_types)
- if self._version >= 201:
- if '$log-levels' in metadata_node:
- if log_levels_node is not None:
- raise ConfigError('cannot specify both "log-levels" and "$log-levels" properties of metadata object: prefer "$log-levels"')
+ # assign unique IDs
+ for index, stream_type in enumerate(sorted(self._stream_types, key=lambda st: st.name)):
+ assert stream_type._id is None
+ stream_type._id = Id(index)
- log_levels_prop = '$log-levels'
- log_levels_node = metadata_node[log_levels_prop]
+ self._uuid = uuid
+ self._set_features(features)
+ self._set_pkt_header_ft()
+ self._set_fts_effective_byte_order()
- if log_levels_node is None:
+ def _set_features(self, features: Optional[TraceTypeFeatures]):
+ if features is not None:
+ self._features = features
return
- if not _is_assoc_array_prop(log_levels_node):
- raise ConfigError('"{}" property (metadata) must be an associative array'.format(log_levels_prop))
-
- for ll_name, ll_value in log_levels_node.items():
- if ll_name in self._log_levels:
- raise ConfigError('duplicate log level entry "{}"'.format(ll_name))
-
- if not _is_int_prop(ll_value):
- raise ConfigError('invalid log level entry ("{}"): expecting an integer'.format(ll_name))
-
- if ll_value < 0:
- raise ConfigError('invalid log level entry ("{}"): log level value must be positive'.format(ll_name))
-
- self._log_levels[ll_name] = ll_value
+ # automatic UUID field type because the trace type has a UUID
+ uuid_ft = None if self._uuid is None else DEFAULT_FIELD_TYPE
+ self._features = TraceTypeFeatures(uuid_field_type=uuid_ft)
+
+ def _set_pkt_header_ft(self):
+ members = collections.OrderedDict()
+
+ def add_member_if_exists(name: str, ft: _OptFt):
+ nonlocal members
+
+ if ft is not None:
+ members[name] = StructureFieldTypeMember(ft)
+
+ add_member_if_exists('magic', self._features.magic_field_type)
+ add_member_if_exists('uuid', self._features.uuid_field_type)
+ add_member_if_exists('stream_id', self._features.stream_type_id_field_type)
+ self._pkt_header_ft = StructureFieldType(8, members)
+
+ def _set_fts_effective_byte_order(self):
+ def set_ft_effective_byte_order(ft: _OptFt):
+ if ft is None:
+ return
+
+ if isinstance(ft, _BitArrayFieldType):
+ if ft._byte_order is None:
+ assert self._default_byte_order is not None
+ ft._byte_order = self._default_byte_order
+ elif isinstance(ft, StaticArrayFieldType):
+ set_ft_effective_byte_order(ft.element_field_type)
+ elif isinstance(ft, StructureFieldType):
+ for member in ft.members.values():
+ set_ft_effective_byte_order(member.field_type)
+
+ # packet header field type
+ set_ft_effective_byte_order(self._pkt_header_ft)
+
+ # stream type field types
+ for stream_type in self._stream_types:
+ set_ft_effective_byte_order(stream_type._pkt_ctx_ft)
+ set_ft_effective_byte_order(stream_type._ev_header_ft)
+ set_ft_effective_byte_order(stream_type._event_common_context_field_type)
+
+ # event type field types
+ for ev_type in stream_type.event_types:
+ set_ft_effective_byte_order(ev_type._specific_context_field_type)
+ set_ft_effective_byte_order(ev_type._payload_field_type)
- def _create_trace(self, metadata_node):
- # create trace object
- trace = metadata.Trace()
-
- if 'trace' not in metadata_node:
- raise ConfigError('missing "trace" property (metadata)')
-
- trace_node = metadata_node['trace']
-
- if not _is_assoc_array_prop(trace_node):
- raise ConfigError('"trace" property (metadata) must be an associative array')
-
- unk_prop = _get_first_unknown_prop(trace_node, [
- 'byte-order',
- 'uuid',
- 'packet-header-type',
- ])
-
- if unk_prop:
- raise ConfigError('unknown trace object property: "{}"'.format(unk_prop))
-
- # set byte order (already parsed)
- trace.byte_order = self._bo
-
- # UUID
- if 'uuid' in trace_node and trace_node['uuid'] is not None:
- uuidp = trace_node['uuid']
-
- if not _is_str_prop(uuidp):
- raise ConfigError('"uuid" property of trace object must be a string')
-
- if uuidp == 'auto':
- uuidp = uuid.uuid1()
- else:
- try:
- uuidp = uuid.UUID(uuidp)
- except:
- raise ConfigError('malformed UUID (trace object): "{}"'.format(uuidp))
-
- trace.uuid = uuidp
-
- # packet header type
- if 'packet-header-type' in trace_node and trace_node['packet-header-type'] is not None:
- try:
- ph_type = self._create_type(trace_node['packet-header-type'])
- except Exception as e:
- raise ConfigError('cannot create packet header type (trace)', e)
-
- trace.packet_header_type = ph_type
-
- return trace
-
- def _lookup_log_level(self, ll):
- if _is_int_prop(ll):
- return ll
- elif _is_str_prop(ll) and ll in self._log_levels:
- return self._log_levels[ll]
-
- def _create_event(self, event_node):
- event = metadata.Event()
-
- if not _is_assoc_array_prop(event_node):
- raise ConfigError('event objects must be associative arrays')
-
- unk_prop = _get_first_unknown_prop(event_node, [
- 'log-level',
- 'context-type',
- 'payload-type',
- ])
-
- if unk_prop:
- raise ConfigError('unknown event object property: "{}"'.format(unk_prop))
-
- if 'log-level' in event_node and event_node['log-level'] is not None:
- ll_node = event_node['log-level']
+ @property
+ def default_byte_order(self) -> ByteOrder:
+ return self._default_byte_order
- if _is_str_prop(ll_node):
- ll_value = self._lookup_log_level(event_node['log-level'])
+ @property
+ def uuid(self) -> _OptUuid:
+ return self._uuid
- if ll_value is None:
- raise ConfigError('cannot find log level "{}"'.format(ll_node))
+ @property
+ def stream_types(self) -> FrozenSet[StreamType]:
+ return self._stream_types
- ll = metadata.LogLevel(event_node['log-level'], ll_value)
- elif _is_int_prop(ll_node):
- if ll_node < 0:
- raise ConfigError('invalid log level value {}: value must be positive'.format(ll_node))
+ def stream_type(self, name: str) -> Optional[StreamType]:
+ for cand_stream_type in self._stream_types:
+ if cand_stream_type.name == name:
+ return cand_stream_type
- ll = metadata.LogLevel(None, ll_node)
- else:
- raise ConfigError('"log-level" property must be either a string or an integer')
+ return None
- event.log_level = ll
+ @property
+ def features(self) -> TraceTypeFeatures:
+ return self._features
- if 'context-type' in event_node and event_node['context-type'] is not None:
- ctx_type_node = event_node['context-type']
- try:
- t = self._create_type(event_node['context-type'])
- except Exception as e:
- raise ConfigError('cannot create event\'s context type object', e)
+_EnvEntry = Union[str, int]
+_EnvEntries = Mapping[str, _EnvEntry]
- event.context_type = t
- if 'payload-type' not in event_node:
- raise ConfigError('missing "payload-type" property in event object')
+class TraceEnvironment(collections.abc.Mapping):
+ def __init__(self, environment: _EnvEntries):
+ self._env = {name: value for name, value in environment.items()}
- if event_node['payload-type'] is not None:
- try:
- t = self._create_type(event_node['payload-type'])
- except Exception as e:
- raise ConfigError('cannot create event\'s payload type object', e)
+ def __getitem__(self, key: str) -> _EnvEntry:
+ return self._env[key]
- event.payload_type = t
+ def __iter__(self) -> Iterator[str]:
+ return iter(self._env)
- return event
+ def __len__(self) -> int:
+ return len(self._env)
- def _create_stream(self, stream_node):
- stream = metadata.Stream()
- if not _is_assoc_array_prop(stream_node):
- raise ConfigError('stream objects must be associative arrays')
+class Trace:
+ def __init__(self, type: TraceType, environment: Optional[_EnvEntries] = None):
+ self._type = type
+ self._set_env(environment)
- unk_prop = _get_first_unknown_prop(stream_node, [
- 'packet-context-type',
- 'event-header-type',
- 'event-context-type',
- 'events',
+ def _set_env(self, environment: Optional[_EnvEntries]):
+ init_env = collections.OrderedDict([
+ ('domain', 'bare'),
+ ('tracer_name', 'barectf'),
+ ('tracer_major', barectf_version.__major_version__),
+ ('tracer_minor', barectf_version.__minor_version__),
+ ('tracer_patch', barectf_version.__patch_version__),
+ ('barectf_gen_date', str(datetime.datetime.now().isoformat())),
])
- if unk_prop:
- raise ConfigError('unknown stream object property: "{}"'.format(unk_prop))
-
- if 'packet-context-type' in stream_node and stream_node['packet-context-type'] is not None:
- try:
- t = self._create_type(stream_node['packet-context-type'])
- except Exception as e:
- raise ConfigError('cannot create stream\'s packet context type object', e)
-
- stream.packet_context_type = t
-
- if 'event-header-type' in stream_node and stream_node['event-header-type'] is not None:
- try:
- t = self._create_type(stream_node['event-header-type'])
- except Exception as e:
- raise ConfigError('cannot create stream\'s event header type object', e)
-
- stream.event_header_type = t
-
- if 'event-context-type' in stream_node and stream_node['event-context-type'] is not None:
- try:
- t = self._create_type(stream_node['event-context-type'])
- except Exception as e:
- raise ConfigError('cannot create stream\'s event context type object', e)
-
- stream.event_context_type = t
-
- if 'events' not in stream_node:
- raise ConfigError('missing "events" property in stream object')
-
- events = stream_node['events']
-
- if events is not None:
- if not _is_assoc_array_prop(events):
- raise ConfigError('"events" property of stream object must be an associative array')
-
- if not events:
- raise ConfigError('at least one event is needed within a stream object')
-
- cur_id = 0
-
- for ev_name, ev_node in events.items():
- try:
- ev = self._create_event(ev_node)
- except Exception as e:
- raise ConfigError('cannot create event "{}"'.format(ev_name), e)
-
- ev.id = cur_id
- ev.name = ev_name
- stream.events[ev_name] = ev
- cur_id += 1
-
- return stream
-
- def _create_streams(self, metadata_node):
- streams = collections.OrderedDict()
-
- if 'streams' not in metadata_node:
- raise ConfigError('missing "streams" property (metadata)')
-
- streams_node = metadata_node['streams']
-
- if not _is_assoc_array_prop(streams_node):
- raise ConfigError('"streams" property (metadata) must be an associative array')
-
- if not streams_node:
- raise ConfigError('at least one stream is needed (metadata)')
-
- cur_id = 0
-
- for stream_name, stream_node in streams_node.items():
- try:
- stream = self._create_stream(stream_node)
- except Exception as e:
- raise ConfigError('cannot create stream "{}"'.format(stream_name), e)
-
- stream.id = cur_id
- stream.name = str(stream_name)
- streams[stream_name] = stream
- cur_id += 1
+ if environment is None:
+ environment = {}
- return streams
+ init_env.update(environment)
+ self._env = TraceEnvironment(typing.cast(_EnvEntries, init_env))
- def _create_metadata(self, root):
- meta = metadata.Metadata()
-
- if 'metadata' not in root:
- raise ConfigError('missing "metadata" property (configuration)')
-
- metadata_node = root['metadata']
-
- if not _is_assoc_array_prop(metadata_node):
- raise ConfigError('"metadata" property (configuration) must be an associative array')
-
- known_props = [
- 'type-aliases',
- 'log-levels',
- 'trace',
- 'env',
- 'clocks',
- 'streams',
- ]
+ @property
+ def type(self) -> TraceType:
+ return self._type
- if self._version >= 201:
- known_props.append('$log-levels')
+ @property
+ def environment(self) -> TraceEnvironment:
+ return self._env
- unk_prop = _get_first_unknown_prop(metadata_node, known_props)
- if unk_prop:
- add = ''
+_ClkTypeCTypes = Mapping[ClockType, str]
- if unk_prop == '$include':
- add = ' (use version 2.1 or greater)'
- raise ConfigError('unknown metadata property{}: "{}"'.format(add, unk_prop))
+class ClockTypeCTypes(collections.abc.Mapping):
+ def __init__(self, c_types: _ClkTypeCTypes):
+ self._c_types = {clk_type: c_type for clk_type, c_type in c_types.items()}
- self._set_byte_order(metadata_node)
- self._register_clocks(metadata_node)
- meta.clocks = self._clocks
- self._register_type_aliases(metadata_node)
- meta.env = self._create_env(metadata_node)
- meta.trace = self._create_trace(metadata_node)
- self._register_log_levels(metadata_node)
- meta.streams = self._create_streams(metadata_node)
+ def __getitem__(self, key: ClockType) -> str:
+ return self._c_types[key]
- return meta
+ def __iter__(self) -> Iterator[ClockType]:
+ return iter(self._c_types)
- def _get_version(self, root):
- if 'version' not in root:
- raise ConfigError('missing "version" property (configuration)')
+ def __len__(self) -> int:
+ return len(self._c_types)
- version_node = root['version']
- if not _is_str_prop(version_node):
- raise ConfigError('"version" property (configuration) must be a string')
+class ConfigurationCodeGenerationHeaderOptions:
+ def __init__(self, identifier_prefix_definition: bool = False,
+ default_stream_type_name_definition: bool = False):
+ self._identifier_prefix_definition = identifier_prefix_definition
+ self._default_stream_type_name_definition = default_stream_type_name_definition
- version_node = version_node.strip()
+ @property
+ def identifier_prefix_definition(self) -> bool:
+ return self._identifier_prefix_definition
- if version_node not in ['2.0', '2.1']:
- raise ConfigError('unsupported version ({}): versions 2.0 and 2.1 are supported'.format(version_node))
+ @property
+ def default_stream_type_name_definition(self) -> bool:
+ return self._default_stream_type_name_definition
- # convert version string to comparable version integer
- parts = version_node.split('.')
- version = int(parts[0]) * 100 + int(parts[1])
- return version
+class ConfigurationCodeGenerationOptions:
+ def __init__(self, identifier_prefix: str = 'barectf_', file_name_prefix: str = 'barectf',
+ default_stream_type: Optional[StreamType] = None,
+ header_options: Optional[ConfigurationCodeGenerationHeaderOptions] = None,
+ clock_type_c_types: Optional[_ClkTypeCTypes] = None):
+ self._identifier_prefix = identifier_prefix
+ self._file_name_prefix = file_name_prefix
+ self._default_stream_type = default_stream_type
- def _get_prefix(self, root):
- def_prefix = 'barectf_'
+ self._header_options = ConfigurationCodeGenerationHeaderOptions()
- if 'prefix' not in root:
- return def_prefix
+ if header_options is not None:
+ self._header_options = header_options
- prefix_node = root['prefix']
+ self._clock_type_c_types = ClockTypeCTypes({})
- if prefix_node is None:
- return def_prefix
+ if clock_type_c_types is not None:
+ self._clock_type_c_types = ClockTypeCTypes(clock_type_c_types)
- if not _is_str_prop(prefix_node):
- raise ConfigError('"prefix" property (configuration) must be a string')
+ @property
+ def identifier_prefix(self) -> str:
+ return self._identifier_prefix
- if not is_valid_identifier(prefix_node):
- raise ConfigError('"prefix" property (configuration) must be a valid C identifier')
+ @property
+ def file_name_prefix(self) -> str:
+ return self._file_name_prefix
- return prefix_node
+ @property
+ def default_stream_type(self) -> Optional[StreamType]:
+ return self._default_stream_type
- def _get_last_include_file(self):
- if self._include_stack:
- return self._include_stack[-1]
+ @property
+ def header_options(self) -> ConfigurationCodeGenerationHeaderOptions:
+ return self._header_options
- return self._root_yaml_path
+ @property
+ def clock_type_c_types(self) -> ClockTypeCTypes:
+ return self._clock_type_c_types
- def _load_include(self, yaml_path):
- for inc_dir in self._include_dirs:
- # current include dir + file name path
- # note: os.path.join() only takes the last arg if it's absolute
- inc_path = os.path.join(inc_dir, yaml_path)
- # real path (symbolic links resolved)
- real_path = os.path.realpath(inc_path)
+class ConfigurationOptions:
+ def __init__(self,
+ code_generation_options: Optional[ConfigurationCodeGenerationOptions] = None):
+ self._code_generation_options = ConfigurationCodeGenerationOptions()
- # normalized path (weird stuff removed!)
- norm_path = os.path.normpath(real_path)
+ if code_generation_options is not None:
+ self._code_generation_options = code_generation_options
- if not os.path.isfile(norm_path):
- # file does not exist: skip
- continue
+ @property
+ def code_generation_options(self) -> ConfigurationCodeGenerationOptions:
+ return self._code_generation_options
- if norm_path in self._include_stack:
- base_path = self._get_last_include_file()
- raise ConfigError('in "{}": cannot recursively include file "{}"'.format(base_path, norm_path))
- self._include_stack.append(norm_path)
+class Configuration:
+ def __init__(self, trace: Trace, options: Optional[ConfigurationOptions] = None):
+ self._trace = trace
+ self._options = ConfigurationOptions()
- # load raw content
- return self._yaml_ordered_load(norm_path)
+ if options is not None:
+ self._options = options
- if not self._ignore_include_not_found:
- base_path = self._get_last_include_file()
- raise ConfigError('in "{}": cannot include file "{}": file not found in include directories'.format(base_path, yaml_path))
+ clk_type_c_types = self._options.code_generation_options.clock_type_c_types
- return None
+ for stream_type in trace.type.stream_types:
+ def_clk_type = stream_type.default_clock_type
- def _get_include_paths(self, include_node):
- if _is_str_prop(include_node):
- return [include_node]
- elif _is_array_prop(include_node):
- for include_path in include_node:
- if not _is_str_prop(include_path):
- raise ConfigError('invalid include property: expecting array of strings')
-
- return include_node
-
- raise ConfigError('invalid include property: expecting string or array of strings')
-
- def _update_node(self, base_node, overlay_node):
- for olay_key, olay_value in overlay_node.items():
- if olay_key in base_node:
- base_value = base_node[olay_key]
-
- if _is_assoc_array_prop(olay_value) and _is_assoc_array_prop(base_value):
- # merge dictionaries
- self._update_node(base_value, olay_value)
- elif _is_array_prop(olay_value) and _is_array_prop(base_value):
- # append extension array items to base items
- base_value += olay_value
- else:
- # fall back to replacing
- base_node[olay_key] = olay_value
- else:
- base_node[olay_key] = olay_value
-
- def _process_node_include(self, last_overlay_node, name,
- process_base_include_cb,
- process_children_include_cb=None):
- if not _is_assoc_array_prop(last_overlay_node):
- raise ConfigError('{} objects must be associative arrays'.format(name))
-
- # process children inclusions first
- if process_children_include_cb:
- process_children_include_cb(last_overlay_node)
-
- if '$include' in last_overlay_node:
- include_node = last_overlay_node['$include']
- else:
- # no includes!
- return last_overlay_node
-
- include_paths = self._get_include_paths(include_node)
- cur_base_path = self._get_last_include_file()
- base_node = None
-
- # keep the include paths and remove the include property
- include_paths = copy.deepcopy(include_paths)
- del last_overlay_node['$include']
-
- for include_path in include_paths:
- # load raw YAML from included file
- overlay_node = self._load_include(include_path)
-
- if overlay_node is None:
- # cannot find include file, but we're ignoring those
- # errors, otherwise _load_include() itself raises
- # a config error
+ if def_clk_type is None:
continue
- # recursively process includes
- try:
- overlay_node = process_base_include_cb(overlay_node)
- except Exception as e:
- raise ConfigError('in "{}"'.format(cur_base_path), e)
-
- # pop include stack now that we're done including
- del self._include_stack[-1]
-
- # at this point, base_node is fully resolved (does not
- # contain any include property)
- if base_node is None:
- base_node = overlay_node
- else:
- self._update_node(base_node, overlay_node)
-
- # finally, we update the latest base node with our last overlay
- # node
- if base_node is None:
- # nothing was included, which is possible when we're
- # ignoring include errors
- return last_overlay_node
-
- self._update_node(base_node, last_overlay_node)
-
- return base_node
-
- def _process_event_include(self, event_node):
- return self._process_node_include(event_node, 'event',
- self._process_event_include)
-
- def _process_stream_include(self, stream_node):
- def process_children_include(stream_node):
- if 'events' in stream_node:
- events_node = stream_node['events']
-
- if not _is_assoc_array_prop(events_node):
- raise ConfigError('"events" property must be an associative array')
-
- events_node_keys = list(events_node.keys())
-
- for key in events_node_keys:
- event_node = events_node[key]
-
- try:
- events_node[key] = self._process_event_include(event_node)
- except Exception as e:
- raise ConfigError('cannot process includes of event object "{}"'.format(key), e)
-
- return self._process_node_include(stream_node, 'stream',
- self._process_stream_include,
- process_children_include)
-
- def _process_trace_include(self, trace_node):
- return self._process_node_include(trace_node, 'trace',
- self._process_trace_include)
-
- def _process_clock_include(self, clock_node):
- return self._process_node_include(clock_node, 'clock',
- self._process_clock_include)
+ if def_clk_type not in clk_type_c_types:
+ clk_type_c_types._c_types[def_clk_type] = 'uint32_t'
- def _process_metadata_include(self, metadata_node):
- def process_children_include(metadata_node):
- if 'trace' in metadata_node:
- metadata_node['trace'] = self._process_trace_include(metadata_node['trace'])
-
- if 'clocks' in metadata_node:
- clocks_node = metadata_node['clocks']
-
- if not _is_assoc_array_prop(clocks_node):
- raise ConfigError('"clocks" property (metadata) must be an associative array')
-
- clocks_node_keys = list(clocks_node.keys())
-
- for key in clocks_node_keys:
- clock_node = clocks_node[key]
-
- try:
- clocks_node[key] = self._process_clock_include(clock_node)
- except Exception as e:
- raise ConfigError('cannot process includes of clock object "{}"'.format(key), e)
-
- if 'streams' in metadata_node:
- streams_node = metadata_node['streams']
-
- if not _is_assoc_array_prop(streams_node):
- raise ConfigError('"streams" property (metadata) must be an associative array')
-
- streams_node_keys = list(streams_node.keys())
-
- for key in streams_node_keys:
- stream_node = streams_node[key]
-
- try:
- streams_node[key] = self._process_stream_include(stream_node)
- except Exception as e:
- raise ConfigError('cannot process includes of stream object "{}"'.format(key), e)
-
- return self._process_node_include(metadata_node, 'metadata',
- self._process_metadata_include,
- process_children_include)
-
- def _process_root_includes(self, root):
- # The following config objects support includes:
- #
- # * Metadata object
- # * Trace object
- # * Stream object
- # * Event object
- #
- # We need to process the event includes first, then the stream
- # includes, then the trace includes, and finally the metadata
- # includes.
- #
- # In each object, only one of the $include and $include-replace
- # special properties is allowed.
- #
- # We keep a stack of absolute paths to included files to detect
- # recursion.
- if 'metadata' in root:
- root['metadata'] = self._process_metadata_include(root['metadata'])
-
- return root
-
- def _yaml_ordered_dump(self, node, **kwds):
- class ODumper(yaml.Dumper):
- pass
-
- def dict_representer(dumper, node):
- return dumper.represent_mapping(yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG,
- node.items())
-
- ODumper.add_representer(collections.OrderedDict, dict_representer)
-
- return yaml.dump(node, Dumper=ODumper, **kwds)
-
- def _yaml_ordered_load(self, yaml_path):
- class OLoader(yaml.Loader):
- pass
-
- def construct_mapping(loader, node):
- loader.flatten_mapping(node)
-
- return collections.OrderedDict(loader.construct_pairs(node))
-
- OLoader.add_constructor(yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG,
- construct_mapping)
-
- # YAML -> Python
- try:
- with open(yaml_path, 'r') as f:
- node = yaml.load(f, OLoader)
- except (OSError, IOError) as e:
- raise ConfigError('cannot open file "{}"'.format(yaml_path))
- except Exception as e:
- raise ConfigError('unknown error while trying to load file "{}"'.format(yaml_path), e)
-
- # loaded node must be an associate array
- if not _is_assoc_array_prop(node):
- raise ConfigError('root of YAML file "{}" must be an associative array'.format(yaml_path))
-
- return node
-
- def _reset(self):
- self._version = None
- self._include_stack = []
-
- def parse(self, yaml_path):
- self._reset()
- self._root_yaml_path = yaml_path
-
- try:
- root = self._yaml_ordered_load(yaml_path)
- except Exception as e:
- raise ConfigError('cannot parse YAML file "{}"'.format(yaml_path), e)
-
- if not _is_assoc_array_prop(root):
- raise ConfigError('configuration must be an associative array')
-
- unk_prop = _get_first_unknown_prop(root, [
- 'version',
- 'prefix',
- 'metadata',
- ])
-
- if unk_prop:
- raise ConfigError('unknown configuration property: "{}"'.format(unk_prop))
-
- # get the config version
- self._version = self._get_version(root)
-
- # process includes if supported
- if self._version >= 201:
- root = self._process_root_includes(root)
-
- # dump config if required
- if self._dump_config:
- print(self._yaml_ordered_dump(root, indent=2,
- default_flow_style=False))
-
- # get prefix and metadata
- prefix = self._get_prefix(root)
- meta = self._create_metadata(root)
-
- return Config(self._version, prefix, meta)
-
-
-def from_yaml_file(path, include_dirs, ignore_include_not_found, dump_config):
- try:
- parser = _YamlConfigParser(include_dirs, ignore_include_not_found,
- dump_config)
- cfg = parser.parse(path)
+ @property
+ def trace(self) -> Trace:
+ return self._trace
- return cfg
- except Exception as e:
- raise ConfigError('cannot create configuration from YAML file "{}"'.format(path), e)
+ @property
+ def options(self) -> ConfigurationOptions:
+ return self._options