import barectf.config as barectf_config
import barectf.config_file as barectf_config_file
import barectf.gen as barectf_gen
+import barectf.typing as barectf_typing
# version API
__version__ = barectf_version.__version__
+# common typing API
+Index = barectf_typing.Index
+Count = barectf_typing.Count
+Id = barectf_typing.Id
+Alignment = barectf_typing.Alignment
+VersionNumber = barectf_typing.VersionNumber
+
+
# configuration API
_ArrayFieldType = barectf_config._ArrayFieldType
_BitArrayFieldType = barectf_config._BitArrayFieldType
EnumerationFieldTypeMappingRange = barectf_config.EnumerationFieldTypeMappingRange
EnumerationFieldTypeMappings = barectf_config.EnumerationFieldTypeMappings
EventType = barectf_config.EventType
+LogLevel = barectf_config.LogLevel
RealFieldType = barectf_config.RealFieldType
SignedEnumerationFieldType = barectf_config.SignedEnumerationFieldType
SignedIntegerFieldType = barectf_config.SignedIntegerFieldType
del barectf_config
del barectf_config_file
del barectf_gen
+del barectf_typing
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import re
-import collections
+import typing
+from typing import Optional, List, Iterable
+from barectf.typing import Index, _OptStr
-__all__ = ['OptDescr', '_OptItem', '_NonOptItem', '_Error', 'parse']
+__all__ = ['OptDescr', '_OptItem', '_NonOptItem', '_Error', 'parse', 'OrigArgs']
+
+
+# types
+OrigArgs = List[str]
# Option descriptor.
#
# If `has_arg` is `True`, then it is expected that such an option
# has an argument.
- def __init__(self, short_name=None, long_name=None, has_arg=False):
+ def __init__(self, short_name: _OptStr = None, long_name: _OptStr = None,
+ has_arg: bool = False):
assert short_name is not None or long_name is not None
self._short_name = short_name
self._long_name = long_name
self._has_arg = has_arg
@property
- def short_name(self):
+ def short_name(self) -> _OptStr:
return self._short_name
@property
- def long_name(self):
+ def long_name(self) -> _OptStr:
return self._long_name
@property
- def has_arg(self):
+ def has_arg(self) -> Optional[bool]:
return self._has_arg
# Parsed option argument item.
class _OptItem(_Item):
- def __init__(self, descr, arg_text=None):
+ def __init__(self, descr: OptDescr, arg_text: _OptStr = None):
self._descr = descr
self._arg_text = arg_text
@property
- def descr(self):
+ def descr(self) -> OptDescr:
return self._descr
@property
- def arg_text(self):
+ def arg_text(self) -> _OptStr:
return self._arg_text
# Parsed non-option argument item.
class _NonOptItem(_Item):
- def __init__(self, text, orig_arg_index, non_opt_index):
+ def __init__(self, text: str, orig_arg_index: Index, non_opt_index: Index):
self._text = text
self._orig_arg_index = orig_arg_index
self._non_opt_index = non_opt_index
@property
- def text(self):
+ def text(self) -> str:
return self._text
@property
- def orig_arg_index(self):
+ def orig_arg_index(self) -> Index:
return self._orig_arg_index
@property
- def non_opt_index(self):
+ def non_opt_index(self) -> Index:
return self._non_opt_index
# Results of parse().
class _ParseRes:
- def __init__(self, items, ingested_orig_args, remaining_orig_args):
+ def __init__(self, items: List[_Item], ingested_orig_args: OrigArgs,
+ remaining_orig_args: OrigArgs):
self._items = items
self._ingested_orig_args = ingested_orig_args
self._remaining_orig_args = remaining_orig_args
@property
- def items(self):
+ def items(self) -> List[_Item]:
return self._items
@property
- def ingested_orig_args(self):
+ def ingested_orig_args(self) -> OrigArgs:
return self._ingested_orig_args
@property
- def remaining_orig_args(self):
+ def remaining_orig_args(self) -> OrigArgs:
return self._remaining_orig_args
# Parsing error.
class _Error(Exception):
- def __init__(self, orig_arg_index, orig_arg, msg):
+ def __init__(self, orig_arg_index: Index, orig_arg: str, msg: str):
super().__init__(msg)
self._orig_arg_index = orig_arg_index
self._orig_arg = orig_arg
self._msg = msg
@property
- def orig_arg_index(self):
+ def orig_arg_index(self) -> Index:
return self._orig_arg_index
@property
- def orig_arg(self):
+ def orig_arg(self) -> str:
return self._orig_arg
@property
- def msg(self):
+ def msg(self) -> str:
return self._msg
# Results of parse_short_opts() and parse_long_opt(); internal.
-_OptParseRes = collections.namedtuple('_OptParseRes', ['items', 'orig_arg_index_incr'])
+class _OptParseRes(typing.NamedTuple):
+ items: List[_Item]
+ orig_arg_index_incr: int
# Parses the original arguments `orig_args` (list of strings),
# producing the resulting option items.
#
# On failure, this function raises an `_Error` object.
-def parse(orig_args, opt_descrs, fail_on_unknown_opt=True):
+def parse(orig_args: OrigArgs, opt_descrs: Iterable[OptDescr],
+ fail_on_unknown_opt: bool = True) -> _ParseRes:
# Finds and returns an option description amongst `opt_descrs`
# having the short option name `short_name` OR the long option name
# `long_name` (not both).
- def find_opt_descr(short_name=None, long_name=None):
+ def find_opt_descr(short_name: _OptStr = None,
+ long_name: _OptStr = None) -> Optional[OptDescr]:
for opt_descr in opt_descrs:
if short_name is not None and short_name == opt_descr.short_name:
return opt_descr
if long_name is not None and long_name == opt_descr.long_name:
return opt_descr
+ return None
+
# Parses a short option original argument, returning an
# `_OptParseRes` object.
#
# If any of the short options of `orig_arg` is unknown, then this
# function raises an error if `fail_on_unknown_opt` is `True`, or
# returns `None` otherwise.
- def parse_short_opts():
+ def parse_short_opts() -> Optional[_OptParseRes]:
short_opts = orig_arg[1:]
- items = []
+ items: List[_Item] = []
done = False
index = 0
orig_arg_index_incr = 1
raise _Error(orig_arg_index, orig_arg, f'Unknown short option `-{short_opt}`')
# discard collected arguments
- return
+ return None
opt_arg = None
#
# If the long option is unknown, then this function raises an error
# if `fail_on_unknown_opt` is `True`, or returns `None` otherwise.
- def parse_long_opt():
+ def parse_long_opt() -> Optional[_OptParseRes]:
long_opt = orig_arg[2:]
m = re.match(r'--([^=]+)=(.*)', orig_arg)
raise _Error(orig_arg_index, orig_arg, f'Unknown long option `--{long_opt}`')
# discard
- return
+ return None
orig_arg_index_incr = 1
return _OptParseRes([item], orig_arg_index_incr)
# parse original arguments
- items = []
- orig_arg_index = 0
- non_opt_index = 0
+ items: List[_Item] = []
+ orig_arg_index = Index(0)
+ non_opt_index = Index(0)
while orig_arg_index < len(orig_args):
orig_arg = orig_args[orig_arg_index]
# option
if orig_arg[1] == '-':
if orig_arg == '--':
- raise _Error(orig_arg_index, 'Invalid `--` argument')
+ raise _Error(orig_arg_index, orig_arg, 'Invalid `--` argument')
# long option
res = parse_long_opt()
return _ParseRes(items, orig_args[:orig_arg_index], orig_args[orig_arg_index:])
items += res.items
- orig_arg_index += res.orig_arg_index_incr
+ orig_arg_index = Index(orig_arg_index + res.orig_arg_index_incr)
else:
# non-option
items.append(_NonOptItem(orig_arg, orig_arg_index, non_opt_index))
- non_opt_index += 1
- orig_arg_index += 1
+ non_opt_index = Index(non_opt_index + 1)
+ orig_arg_index = Index(orig_arg_index + 1)
return _ParseRes(items, orig_args, [])
import barectf
import barectf.config_parse_common as barectf_config_parse_common
import barectf.argpar as barectf_argpar
+from typing import Any, List, Iterable, NoReturn
+import typing
+from barectf.typing import Index, Count
import sys
import os
# Colors and prints the error message `msg` and exits with status code
# 1.
-def _print_error(msg):
+def _print_error(msg: str) -> NoReturn:
termcolor.cprint('Error: ', 'red', end='', file=sys.stderr)
termcolor.cprint(msg, 'red', attrs=['bold'], file=sys.stderr)
sys.exit(1)
# Pretty-prints the barectf configuration error `exc` and exits with
# status code 1.
-def _print_config_error(exc):
+def _print_config_error(exc: barectf._ConfigurationParseError) -> NoReturn:
# reverse: most precise message comes last
for ctx in reversed(exc.context):
msg = ''
# Pretty-prints the unknown exception `exc`.
-def _print_unknown_exc(exc):
+def _print_unknown_exc(exc: Exception) -> NoReturn:
import traceback
traceback.print_exc()
# Finds and returns all the option items in `items` having the long name
# `long_name`.
-def _find_opt_items(items, long_name):
- ret_items = []
+def _find_opt_items(items: Iterable[barectf_argpar._Item],
+ long_name: str) -> List[barectf_argpar._OptItem]:
+ ret_items: List[barectf_argpar._OptItem] = []
for item in items:
- if type(item) is barectf_argpar._OptItem and item.descr.long_name == long_name:
- ret_items.append(item)
+ if type(item) is barectf_argpar._OptItem:
+ item = typing.cast(barectf_argpar._OptItem, item)
+
+ if item.descr.long_name == long_name:
+ ret_items.append(item)
return ret_items
# `items`.
#
# Returns `default` if there's no such option item.
-def _opt_item_val(items, long_name, default=None):
+def _opt_item_val(items: Iterable[barectf_argpar._Item], long_name: str,
+ default: Any = None) -> Any:
opt_items = _find_opt_items(items, long_name)
if len(opt_items) == 0:
pass
-def _cfg_file_path_from_parse_res(parse_res):
+def _cfg_file_path_from_parse_res(parse_res: barectf_argpar._ParseRes) -> str:
cfg_file_path = None
for item in parse_res.items:
if cfg_file_path is not None:
raise _CliError('Multiple configuration file paths provided')
- cfg_file_path = item.text
+ cfg_file_path = typing.cast(barectf_argpar._NonOptItem, item).text
if cfg_file_path is None:
raise _CliError('Missing configuration file path')
# Returns a `_CfgCmdCfg` object from the command-line parsing results
# `parse_res`.
-def _cfg_cmd_cfg_from_parse_res(parse_res):
+def _cfg_cmd_cfg_from_parse_res(parse_res: barectf_argpar._ParseRes) -> '_CfgCmdCfg':
# check configuration file path
cfg_file_path = _cfg_file_path_from_parse_res(parse_res)
- # inclusion directories
- inclusion_dirs = [item.arg_text for item in _find_opt_items(parse_res.items, 'include-dir')]
+ # inclusion directories (`--include-dir` option needs an argument)
+ inclusion_dirs = typing.cast(List[str],
+ [item.arg_text for item in _find_opt_items(parse_res.items, 'include-dir')])
for dir in inclusion_dirs:
if not os.path.isdir(dir):
# Returns a source and metadata stream file generating command object
# from the specific command-line arguments `orig_args`.
-def _gen_cmd_cfg_from_args(orig_args):
+def _gen_cmd_cfg_from_args(orig_args: barectf_argpar.OrigArgs) -> '_GenCmd':
# parse original arguments
opt_descrs = [
barectf_argpar.OptDescr('h', 'help'),
# Returns an effective configuration showing command object from the
# specific command-line arguments `orig_args`.
-def _show_effective_cfg_cfg_from_args(orig_args):
+def _show_effective_cfg_cfg_from_args(orig_args: barectf_argpar.OrigArgs) -> '_ShowEffectiveCfgCmd':
# parse original arguments
opt_descrs = [
barectf_argpar.OptDescr('h', 'help'),
return _ShowEffectiveCfgCmd(_ShowEffectiveCfgCmdCfg(cfg_cmd_cfg.cfg_file_path,
cfg_cmd_cfg.inclusion_dirs,
cfg_cmd_cfg.ignore_inclusion_file_not_found,
- indent_space_count))
+ Count(indent_space_count)))
def _show_cfg_version_cmd_usage():
# Returns a configuration version showing command object from the
# specific command-line arguments `orig_args`.
-def _show_cfg_version_cfg_from_args(orig_args):
+def _show_cfg_version_cfg_from_args(orig_args: barectf_argpar.OrigArgs) -> '_ShowCfgVersionCmd':
# parse original arguments
opt_descrs = [
barectf_argpar.OptDescr('h', 'help'),
# Returns a command object from the command-line arguments `orig_args`.
#
# All the `orig_args` elements are considered.
-def _cmd_from_args(orig_args):
+def _cmd_from_args(orig_args: barectf_argpar.OrigArgs) -> '_Cmd':
# We use our `argpar` module here instead of Python's `argparse`
# because we need to support the two following use cases:
#
'show-config-version': _show_cfg_version_cfg_from_args,
'show-cfg-version': _show_cfg_version_cfg_from_args,
}
- general_opt_items = []
+ general_opt_items: List[barectf_argpar._OptItem] = []
cmd_first_orig_arg_index = None
cmd_from_args_func = None
for item in res.items:
if type(item) is barectf_argpar._NonOptItem:
+ item = typing.cast(barectf_argpar._NonOptItem, item)
cmd_from_args_func = cmd_from_args_funcs.get(item.text)
if cmd_from_args_func is None:
cmd_first_orig_arg_index = item.orig_arg_index
else:
- cmd_first_orig_arg_index = item.orig_arg_index + 1
+ cmd_first_orig_arg_index = Index(item.orig_arg_index + 1)
break
else:
assert type(item) is barectf_argpar._OptItem
- general_opt_items.append(item)
+ general_opt_items.append(typing.cast(barectf_argpar._OptItem, item))
# general help?
if len(_find_opt_items(general_opt_items, 'help')) > 0:
class _CfgCmdCfg(_CmdCfg):
- def __init__(self, cfg_file_path, inclusion_dirs, ignore_inclusion_file_not_found):
+ def __init__(self, cfg_file_path: str, inclusion_dirs: List[str],
+ ignore_inclusion_file_not_found: bool):
self._cfg_file_path = cfg_file_path
self._inclusion_dirs = inclusion_dirs
self._ignore_inclusion_file_not_found = ignore_inclusion_file_not_found
@property
- def cfg_file_path(self):
+ def cfg_file_path(self) -> str:
return self._cfg_file_path
@property
- def inclusion_dirs(self):
+ def inclusion_dirs(self) -> List[str]:
return self._inclusion_dirs
@property
- def ignore_inclusion_file_not_found(self):
+ def ignore_inclusion_file_not_found(self) -> bool:
return self._ignore_inclusion_file_not_found
class _Cmd:
- def __init__(self, cfg):
+ def __init__(self, cfg: _CmdCfg):
self._cfg = cfg
@property
- def cfg(self):
+ def cfg(self) -> _CmdCfg:
return self._cfg
def exec(self):
class _GenCmdCfg(_CfgCmdCfg):
- def __init__(self, cfg_file_path, c_source_dir, c_header_dir, metadata_stream_dir,
- inclusion_dirs, ignore_inclusion_file_not_found, dump_config, v2_prefix):
+ def __init__(self, cfg_file_path: str, c_source_dir: str, c_header_dir: str,
+ metadata_stream_dir: str, inclusion_dirs: List[str],
+ ignore_inclusion_file_not_found: bool, dump_config: bool, v2_prefix: str):
super().__init__(cfg_file_path, inclusion_dirs, ignore_inclusion_file_not_found)
self._c_source_dir = c_source_dir
self._c_header_dir = c_header_dir
self._v2_prefix = v2_prefix
@property
- def c_source_dir(self):
+ def c_source_dir(self) -> str:
return self._c_source_dir
@property
- def c_header_dir(self):
+ def c_header_dir(self) -> str:
return self._c_header_dir
@property
- def metadata_stream_dir(self):
+ def metadata_stream_dir(self) -> str:
return self._metadata_stream_dir
@property
- def dump_config(self):
+ def dump_config(self) -> bool:
return self._dump_config
@property
- def v2_prefix(self):
+ def v2_prefix(self) -> str:
return self._v2_prefix
class _ShowEffectiveCfgCmdCfg(_CfgCmdCfg):
- def __init__(self, cfg_file_path, inclusion_dirs, ignore_inclusion_file_not_found,
- indent_space_count):
+ def __init__(self, cfg_file_path: str, inclusion_dirs: List[str],
+ ignore_inclusion_file_not_found: bool, indent_space_count: Count):
super().__init__(cfg_file_path, inclusion_dirs, ignore_inclusion_file_not_found)
self._indent_space_count = indent_space_count
@property
- def indent_space_count(self):
+ def indent_space_count(self) -> Count:
return self._indent_space_count
class _ShowCfgVersionCmdCfg(_CmdCfg):
- def __init__(self, cfg_file_path):
+ def __init__(self, cfg_file_path: str):
self._cfg_file_path = cfg_file_path
@property
- def cfg_file_path(self):
+ def cfg_file_path(self) -> str:
return self._cfg_file_path
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import barectf.version as barectf_version
+from typing import Optional, Any, FrozenSet, Mapping, Iterator, Set, Union
+import typing
+from barectf.typing import Count, Alignment, _OptStr, Id
import collections.abc
import collections
import datetime
import enum
+import uuid as uuidp
@enum.unique
class _FieldType:
@property
- def alignment(self):
+ def alignment(self) -> Alignment:
raise NotImplementedError
class _BitArrayFieldType(_FieldType):
- def __init__(self, size, byte_order=None, alignment=1):
+ def __init__(self, size: Count, byte_order: Optional[ByteOrder] = None,
+ alignment: Alignment = Alignment(1)):
self._size = size
self._byte_order = byte_order
self._alignment = alignment
@property
- def size(self):
+ def size(self) -> Count:
return self._size
@property
- def byte_order(self):
+ def byte_order(self) -> Optional[ByteOrder]:
return self._byte_order
@property
- def alignment(self):
+ def alignment(self) -> Alignment:
return self._alignment
class _IntegerFieldType(_BitArrayFieldType):
- def __init__(self, size, byte_order=None, alignment=None,
- preferred_display_base=DisplayBase.DECIMAL):
+ def __init__(self, size: Count, byte_order: Optional[ByteOrder] = None,
+ alignment: Optional[Alignment] = None,
+ preferred_display_base: DisplayBase = DisplayBase.DECIMAL):
effective_alignment = 1
if alignment is None and size % 8 == 0:
effective_alignment = 8
- super().__init__(size, byte_order, effective_alignment)
+ super().__init__(size, byte_order, Alignment(effective_alignment))
self._preferred_display_base = preferred_display_base
@property
- def preferred_display_base(self):
+ def preferred_display_base(self) -> DisplayBase:
return self._preferred_display_base
class EnumerationFieldTypeMappingRange:
- def __init__(self, lower, upper):
+ def __init__(self, lower: int, upper: int):
self._lower = lower
self._upper = upper
@property
- def lower(self):
+ def lower(self) -> int:
return self._lower
@property
- def upper(self):
+ def upper(self) -> int:
return self._upper
- def __eq__(self, other):
+ def __eq__(self, other: Any) -> bool:
if type(other) is not type(self):
return False
return (self._lower, self._upper) == (other._lower, other._upper)
- def __hash__(self):
+ def __hash__(self) -> int:
return hash((self._lower, self._upper))
- def contains(self, value):
+ def contains(self, value: int) -> bool:
return self._lower <= value <= self._upper
class EnumerationFieldTypeMapping:
- def __init__(self, ranges):
+ def __init__(self, ranges: Set[EnumerationFieldTypeMappingRange]):
self._ranges = frozenset(ranges)
@property
- def ranges(self):
+ def ranges(self) -> FrozenSet[EnumerationFieldTypeMappingRange]:
return self._ranges
- def ranges_contain_value(self, value):
+ def ranges_contain_value(self, value: int) -> bool:
return any([rg.contains(value) for rg in self._ranges])
+_EnumFtMappings = Mapping[str, EnumerationFieldTypeMapping]
+
+
class EnumerationFieldTypeMappings(collections.abc.Mapping):
- def __init__(self, mappings):
+ def __init__(self, mappings: _EnumFtMappings):
self._mappings = {label: mapping for label, mapping in mappings.items()}
- def __getitem__(self, key):
+ def __getitem__(self, key: str) -> EnumerationFieldTypeMapping:
return self._mappings[key]
- def __iter__(self):
+ def __iter__(self) -> Iterator[str]:
return iter(self._mappings)
- def __len__(self):
+ def __len__(self) -> int:
return len(self._mappings)
class _EnumerationFieldType(_IntegerFieldType):
- def __init__(self, size, byte_order=None, alignment=None,
- preferred_display_base=DisplayBase.DECIMAL, mappings=None):
+ def __init__(self, size: Count, byte_order: Optional[ByteOrder] = None,
+ alignment: Optional[Alignment] = None,
+ preferred_display_base: DisplayBase = DisplayBase.DECIMAL,
+ mappings: Optional[_EnumFtMappings] = None):
super().__init__(size, byte_order, alignment, preferred_display_base)
self._mappings = EnumerationFieldTypeMappings({})
self._mappings = EnumerationFieldTypeMappings(mappings)
@property
- def mappings(self):
+ def mappings(self) -> EnumerationFieldTypeMappings:
return self._mappings
- def labels_for_value(self, value):
+ def labels_for_value(self, value: int) -> Set[str]:
labels = set()
for label, mapping in self._mappings.items():
class StringFieldType(_FieldType):
@property
- def alignment(self):
- return 8
+ def alignment(self) -> Alignment:
+ return Alignment(8)
class _ArrayFieldType(_FieldType):
- def __init__(self, element_field_type):
+ def __init__(self, element_field_type: _FieldType):
self._element_field_type = element_field_type
@property
- def element_field_type(self):
+ def element_field_type(self) -> _FieldType:
return self._element_field_type
@property
- def alignment(self):
+ def alignment(self) -> Alignment:
return self._element_field_type.alignment
class StaticArrayFieldType(_ArrayFieldType):
- def __init__(self, length, element_field_type):
+ def __init__(self, length: Count, element_field_type: _FieldType):
super().__init__(element_field_type)
self._length = length
@property
- def length(self):
+ def length(self) -> Count:
return self._length
class StructureFieldTypeMember:
- def __init__(self, field_type):
+ def __init__(self, field_type: _FieldType):
self._field_type = field_type
@property
- def field_type(self):
+ def field_type(self) -> _FieldType:
return self._field_type
+_StructFtMembers = Mapping[str, StructureFieldTypeMember]
+
+
class StructureFieldTypeMembers(collections.abc.Mapping):
- def __init__(self, members):
+ def __init__(self, members: _StructFtMembers):
self._members = collections.OrderedDict()
for name, member in members.items():
assert type(member) is StructureFieldTypeMember
self._members[name] = member
- def __getitem__(self, key):
+ def __getitem__(self, key: str) -> StructureFieldTypeMember:
return self._members[key]
- def __iter__(self):
+ def __iter__(self) -> Iterator[str]:
return iter(self._members)
- def __len__(self):
+ def __len__(self) -> int:
return len(self._members)
class StructureFieldType(_FieldType):
- def __init__(self, minimum_alignment=1, members=None):
+ def __init__(self, minimum_alignment: Alignment = Alignment(1),
+ members: Optional[_StructFtMembers] = None):
self._minimum_alignment = minimum_alignment
self._members = StructureFieldTypeMembers({})
self._set_alignment()
def _set_alignment(self):
- self._alignment = self._minimum_alignment
+ self._alignment: Alignment = self._minimum_alignment
for member in self._members.values():
if member.field_type.alignment > self._alignment:
self._alignment = member.field_type.alignment
@property
- def minimum_alignment(self):
+ def minimum_alignment(self) -> Alignment:
return self._minimum_alignment
@property
- def alignment(self):
+ def alignment(self) -> Alignment:
return self._alignment
@property
- def members(self):
+ def members(self) -> StructureFieldTypeMembers:
return self._members
class _UniqueByName:
- def __eq__(self, other):
+ _name: str
+
+ def __eq__(self, other: Any) -> bool:
if type(other) is not type(self):
return False
return self._name == other._name
- def __lt__(self, other):
+ def __lt__(self, other: '_UniqueByName'):
assert type(self) is type(other)
return self._name < other._name
- def __hash__(self):
+ def __hash__(self) -> int:
return hash(self._name)
+_OptFt = Optional[_FieldType]
+_OptStructFt = Optional[StructureFieldType]
+LogLevel = typing.NewType('LogLevel', int)
+
+
class EventType(_UniqueByName):
- def __init__(self, name, log_level=None, specific_context_field_type=None,
- payload_field_type=None):
- self._id = None
+ def __init__(self, name: str, log_level: Optional[LogLevel] = None,
+ specific_context_field_type: _OptStructFt = None, payload_field_type: _OptStructFt = None):
+ self._id: Optional[Id] = None
self._name = name
self._log_level = log_level
self._specific_context_field_type = specific_context_field_type
self._payload_field_type = payload_field_type
@property
- def id(self):
+ def id(self) -> Optional[Id]:
return self._id
@property
- def name(self):
+ def name(self) -> str:
return self._name
@property
- def log_level(self):
+ def log_level(self) -> Optional[LogLevel]:
return self._log_level
@property
- def specific_context_field_type(self):
+ def specific_context_field_type(self) -> _OptStructFt:
return self._specific_context_field_type
@property
- def payload_field_type(self):
+ def payload_field_type(self) -> _OptStructFt:
return self._payload_field_type
class ClockTypeOffset:
- def __init__(self, seconds=0, cycles=0):
+ def __init__(self, seconds: int = 0, cycles: Count = Count(0)):
self._seconds = seconds
self._cycles = cycles
@property
- def seconds(self):
+ def seconds(self) -> int:
return self._seconds
@property
- def cycles(self):
+ def cycles(self) -> Count:
return self._cycles
+_OptUuid = Optional[uuidp.UUID]
+
+
class ClockType(_UniqueByName):
- def __init__(self, name, frequency=int(1e9), uuid=None, description=None, precision=0,
- offset=None, origin_is_unix_epoch=False):
+ def __init__(self, name: str, frequency: Count = Count(int(1e9)), uuid: _OptUuid = None,
+ description: _OptStr = None, precision: Count = Count(0),
+ offset: Optional[ClockTypeOffset] = None, origin_is_unix_epoch: bool = False):
self._name = name
self._frequency = frequency
self._uuid = uuid
self._origin_is_unix_epoch = origin_is_unix_epoch
@property
- def name(self):
+ def name(self) -> str:
return self._name
@property
- def frequency(self):
+ def frequency(self) -> Count:
return self._frequency
@property
- def uuid(self):
+ def uuid(self) -> _OptUuid:
return self._uuid
@property
- def description(self):
+ def description(self) -> _OptStr:
return self._description
@property
- def precision(self):
+ def precision(self) -> Count:
return self._precision
@property
- def offset(self):
+ def offset(self) -> ClockTypeOffset:
return self._offset
@property
- def origin_is_unix_epoch(self):
+ def origin_is_unix_epoch(self) -> bool:
return self._origin_is_unix_epoch
DEFAULT_FIELD_TYPE = 'default'
+_DefaultableUIntFt = Union[str, UnsignedIntegerFieldType]
+_OptDefaultableUIntFt = Optional[_DefaultableUIntFt]
+_OptUIntFt = Optional[UnsignedIntegerFieldType]
class StreamTypePacketFeatures:
- def __init__(self, total_size_field_type=DEFAULT_FIELD_TYPE,
- content_size_field_type=DEFAULT_FIELD_TYPE, beginning_time_field_type=None,
- end_time_field_type=None, discarded_events_counter_field_type=None):
- def get_ft(user_ft):
+ def __init__(self, total_size_field_type: _DefaultableUIntFt = DEFAULT_FIELD_TYPE,
+ content_size_field_type: _DefaultableUIntFt = DEFAULT_FIELD_TYPE,
+ beginning_time_field_type: _OptDefaultableUIntFt = None,
+ end_time_field_type: _OptDefaultableUIntFt = None,
+ discarded_events_counter_field_type: _OptDefaultableUIntFt = None):
+ def get_ft(user_ft: _OptDefaultableUIntFt) -> _OptUIntFt:
if user_ft == DEFAULT_FIELD_TYPE:
return UnsignedIntegerFieldType(64)
- return user_ft
+ return typing.cast(_OptUIntFt, user_ft)
self._total_size_field_type = get_ft(total_size_field_type)
self._content_size_field_type = get_ft(content_size_field_type)
self._discarded_events_counter_field_type = get_ft(discarded_events_counter_field_type)
@property
- def total_size_field_type(self):
+ def total_size_field_type(self) -> _OptUIntFt:
return self._total_size_field_type
@property
- def content_size_field_type(self):
+ def content_size_field_type(self) -> _OptUIntFt:
return self._content_size_field_type
@property
- def beginning_time_field_type(self):
+ def beginning_time_field_type(self) -> _OptUIntFt:
return self._beginning_time_field_type
@property
- def end_time_field_type(self):
+ def end_time_field_type(self) -> _OptUIntFt:
return self._end_time_field_type
@property
- def discarded_events_counter_field_type(self):
+ def discarded_events_counter_field_type(self) -> _OptUIntFt:
return self._discarded_events_counter_field_type
class StreamTypeEventFeatures:
- def __init__(self, type_id_field_type=DEFAULT_FIELD_TYPE, time_field_type=None):
- def get_ft(user_field_type):
- if user_field_type == DEFAULT_FIELD_TYPE:
+ def __init__(self, type_id_field_type: _OptDefaultableUIntFt = DEFAULT_FIELD_TYPE,
+ time_field_type: _OptDefaultableUIntFt = None):
+ def get_ft(user_ft: _OptDefaultableUIntFt) -> _OptUIntFt:
+ if user_ft == DEFAULT_FIELD_TYPE:
return UnsignedIntegerFieldType(64)
- return user_field_type
+ return typing.cast(_OptUIntFt, user_ft)
self._type_id_field_type = get_ft(type_id_field_type)
self._time_field_type = get_ft(time_field_type)
@property
- def type_id_field_type(self):
+ def type_id_field_type(self) -> _OptUIntFt:
return self._type_id_field_type
@property
- def time_field_type(self):
+ def time_field_type(self) -> _OptUIntFt:
return self._time_field_type
class StreamTypeFeatures:
- def __init__(self, packet_features=None, event_features=None):
+ def __init__(self, packet_features: Optional[StreamTypePacketFeatures] = None,
+ event_features: Optional[StreamTypeEventFeatures] = None):
self._packet_features = StreamTypePacketFeatures()
if packet_features is not None:
self._event_features = event_features
@property
- def packet_features(self):
+ def packet_features(self) -> StreamTypePacketFeatures:
return self._packet_features
@property
- def event_features(self):
+ def event_features(self) -> StreamTypeEventFeatures:
return self._event_features
class StreamType(_UniqueByName):
- def __init__(self, name, event_types, default_clock_type=None, features=None,
- packet_context_field_type_extra_members=None,
- event_common_context_field_type=None):
- self._id = None
+ def __init__(self, name: str, event_types: Set[EventType],
+ default_clock_type: Optional[ClockType] = None,
+ features: Optional[StreamTypeFeatures] = None,
+ packet_context_field_type_extra_members: Optional[_StructFtMembers] = None,
+ event_common_context_field_type: _OptStructFt = None):
+ self._id: Optional[Id] = None
self._name = name
self._default_clock_type = default_clock_type
self._event_common_context_field_type = event_common_context_field_type
# assign unique IDs
for index, ev_type in enumerate(sorted(self._event_types, key=lambda evt: evt.name)):
assert ev_type._id is None
- ev_type._id = index
+ ev_type._id = Id(index)
self._set_features(features)
self._packet_context_field_type_extra_members = StructureFieldTypeMembers({})
self._set_pkt_ctx_ft()
self._set_ev_header_ft()
- def _set_features(self, features):
+ def _set_features(self, features: Optional[StreamTypeFeatures]):
if features is not None:
self._features = features
- return
+ return None
ev_time_ft = None
pkt_beginning_time_ft = None
end_time_field_type=pkt_end_time_ft),
StreamTypeEventFeatures(time_field_type=ev_time_ft))
- def _set_ft_mapped_clk_type_name(self, ft):
+ def _set_ft_mapped_clk_type_name(self, ft: Optional[UnsignedIntegerFieldType]):
if ft is None:
return
ft._mapped_clk_type_name = self._default_clock_type.name
def _set_pkt_ctx_ft(self):
- def add_member_if_exists(name, ft, set_mapped_clk_type_name=False):
+ members = None
+
+ def add_member_if_exists(name: str, ft: _FieldType, set_mapped_clk_type_name: bool = False):
nonlocal members
if ft is not None:
if set_mapped_clk_type_name:
- self._set_ft_mapped_clk_type_name(ft)
+ self._set_ft_mapped_clk_type_name(typing.cast(UnsignedIntegerFieldType, ft))
members[name] = StructureFieldTypeMember(ft)
self._ev_header_ft = StructureFieldType(8, members)
@property
- def id(self):
+ def id(self) -> Optional[Id]:
return self._id
@property
- def name(self):
+ def name(self) -> str:
return self._name
@property
- def default_clock_type(self):
+ def default_clock_type(self) -> Optional[ClockType]:
return self._default_clock_type
@property
- def features(self):
+ def features(self) -> StreamTypeFeatures:
return self._features
@property
- def packet_context_field_type_extra_members(self):
+ def packet_context_field_type_extra_members(self) -> StructureFieldTypeMembers:
return self._packet_context_field_type_extra_members
@property
- def event_common_context_field_type(self):
+ def event_common_context_field_type(self) -> _OptStructFt:
return self._event_common_context_field_type
@property
- def event_types(self):
+ def event_types(self) -> FrozenSet[EventType]:
return self._event_types
+_OptUuidFt = Optional[Union[str, StaticArrayFieldType]]
+
+
class TraceTypeFeatures:
    # Features of a trace type.
    #
    # Each feature is the field type of a packet header structure
    # member (`magic`, `uuid`, or `stream_id`), or `None` to disable
    # that member.
    def __init__(self, magic_field_type: _OptDefaultableUIntFt = DEFAULT_FIELD_TYPE,
                 uuid_field_type: _OptUuidFt = None,
                 stream_type_id_field_type: _OptDefaultableUIntFt = DEFAULT_FIELD_TYPE):
        # Returns `default_ft` when the user passed the
        # `DEFAULT_FIELD_TYPE` marker, otherwise the user's own field
        # type (possibly `None`).
        def resolve_ft(user_ft: Optional[Union[str, _FieldType]], default_ft: _FieldType) -> _OptFt:
            return default_ft if user_ft == DEFAULT_FIELD_TYPE else typing.cast(_OptFt, user_ft)

        self._magic_field_type = typing.cast(_OptUIntFt,
                                             resolve_ft(magic_field_type,
                                                        UnsignedIntegerFieldType(32)))
        self._uuid_field_type = typing.cast(Optional[StaticArrayFieldType],
                                            resolve_ft(uuid_field_type,
                                                       StaticArrayFieldType(Count(16),
                                                                            UnsignedIntegerFieldType(8))))
        self._stream_type_id_field_type = typing.cast(_OptUIntFt,
                                                      resolve_ft(stream_type_id_field_type,
                                                                 UnsignedIntegerFieldType(64)))

    @property
    def magic_field_type(self) -> _OptUIntFt:
        return self._magic_field_type

    @property
    def uuid_field_type(self) -> Optional[StaticArrayFieldType]:
        return self._uuid_field_type

    @property
    def stream_type_id_field_type(self) -> _OptUIntFt:
        return self._stream_type_id_field_type
class TraceType:
- def __init__(self, stream_types, default_byte_order, uuid=None, features=None):
+ def __init__(self, stream_types: Set[StreamType], default_byte_order: ByteOrder,
+ uuid: _OptUuid = None, features: Optional[TraceTypeFeatures] = None):
self._default_byte_order = default_byte_order
self._stream_types = frozenset(stream_types)
# assign unique IDs
for index, stream_type in enumerate(sorted(self._stream_types, key=lambda st: st.name)):
assert stream_type._id is None
- stream_type._id = index
+ stream_type._id = Id(index)
self._uuid = uuid
self._set_features(features)
self._set_pkt_header_ft()
self._set_fts_effective_byte_order()
- def _set_features(self, features):
+ def _set_features(self, features: Optional[TraceTypeFeatures]):
if features is not None:
self._features = features
return
self._features = TraceTypeFeatures(uuid_field_type=uuid_ft)
def _set_pkt_header_ft(self):
- def add_member_if_exists(name, field_type):
+ members = collections.OrderedDict()
+
+ def add_member_if_exists(name: str, ft: _OptFt):
nonlocal members
- if field_type is not None:
- members[name] = StructureFieldTypeMember(field_type)
+ if ft is not None:
+ members[name] = StructureFieldTypeMember(ft)
- members = collections.OrderedDict()
add_member_if_exists('magic', self._features.magic_field_type)
add_member_if_exists('uuid', self._features.uuid_field_type)
add_member_if_exists('stream_id', self._features.stream_type_id_field_type)
self._pkt_header_ft = StructureFieldType(8, members)
def _set_fts_effective_byte_order(self):
- def set_ft_effective_byte_order(ft):
+ def set_ft_effective_byte_order(ft: _OptFt):
if ft is None:
return
set_ft_effective_byte_order(ev_type._payload_field_type)
@property
- def default_byte_order(self):
+ def default_byte_order(self) -> ByteOrder:
return self._default_byte_order
@property
- def uuid(self):
+ def uuid(self) -> _OptUuid:
return self._uuid
@property
- def stream_types(self):
+ def stream_types(self) -> FrozenSet[StreamType]:
return self._stream_types
- def stream_type(self, name):
+ def stream_type(self, name: str) -> Optional[StreamType]:
for cand_stream_type in self._stream_types:
if cand_stream_type.name == name:
return cand_stream_type
+ return None
+
@property
- def features(self):
+ def features(self) -> TraceTypeFeatures:
return self._features
_EnvEntry = Union[str, int]
_EnvEntries = Mapping[str, _EnvEntry]


class TraceEnvironment(collections.abc.Mapping):
    # Read-only mapping of trace environment entry names to their
    # values (strings or integers).
    def __init__(self, environment: _EnvEntries):
        # copy the entries so that later changes to `environment` do
        # not affect this object
        self._env = dict(environment.items())

    def __getitem__(self, key: str) -> _EnvEntry:
        return self._env[key]

    def __iter__(self) -> Iterator[str]:
        return iter(self._env)

    def __len__(self) -> int:
        return len(self._env)
class Trace:
- def __init__(self, type, environment=None):
+ def __init__(self, type: TraceType, environment: Optional[_EnvEntries] = None):
self._type = type
self._set_env(environment)
- def _set_env(self, environment):
+ def _set_env(self, environment: Optional[_EnvEntries]):
init_env = collections.OrderedDict([
('domain', 'bare'),
('tracer_name', 'barectf'),
environment = {}
init_env.update(environment)
- self._env = TraceEnvironment(init_env)
+ self._env = TraceEnvironment(typing.cast(_EnvEntries, init_env))
@property
- def type(self):
+ def type(self) -> TraceType:
return self._type
@property
- def environment(self):
+ def environment(self) -> TraceEnvironment:
return self._env
_ClkTypeCTypes = Mapping[ClockType, str]


class ClockTypeCTypes(collections.abc.Mapping):
    # Read-only mapping of clock types to the names of their C types.
    def __init__(self, c_types: _ClkTypeCTypes):
        # copy the entries so that later changes to `c_types` do not
        # affect this object
        self._c_types = dict(c_types.items())

    def __getitem__(self, key: ClockType) -> str:
        return self._c_types[key]

    def __iter__(self) -> Iterator[ClockType]:
        return iter(self._c_types)

    def __len__(self) -> int:
        return len(self._c_types)
class ConfigurationCodeGenerationHeaderOptions:
    # Options which control what definitions a generated C header
    # contains.
    def __init__(self, identifier_prefix_definition: bool = False,
                 default_stream_type_name_definition: bool = False):
        self._default_stream_type_name_definition = default_stream_type_name_definition
        self._identifier_prefix_definition = identifier_prefix_definition

    # `True` to generate the identifier prefix definition.
    @property
    def identifier_prefix_definition(self) -> bool:
        return self._identifier_prefix_definition

    # `True` to generate the default stream type name definition.
    @property
    def default_stream_type_name_definition(self) -> bool:
        return self._default_stream_type_name_definition
class ConfigurationCodeGenerationOptions:
- def __init__(self, identifier_prefix='barectf_', file_name_prefix='barectf',
- default_stream_type=None, header_options=None, clock_type_c_types=None):
+ def __init__(self, identifier_prefix: str = 'barectf_', file_name_prefix: str = 'barectf',
+ default_stream_type: Optional[StreamType] = None,
+ header_options: Optional[ConfigurationCodeGenerationHeaderOptions] = None,
+ clock_type_c_types: Optional[_ClkTypeCTypes] = None):
self._identifier_prefix = identifier_prefix
self._file_name_prefix = file_name_prefix
self._default_stream_type = default_stream_type
self._clock_type_c_types = ClockTypeCTypes(clock_type_c_types)
@property
- def identifier_prefix(self):
+ def identifier_prefix(self) -> str:
return self._identifier_prefix
@property
- def file_name_prefix(self):
+ def file_name_prefix(self) -> str:
return self._file_name_prefix
@property
- def default_stream_type(self):
+ def default_stream_type(self) -> Optional[StreamType]:
return self._default_stream_type
@property
- def header_options(self):
+ def header_options(self) -> ConfigurationCodeGenerationHeaderOptions:
return self._header_options
@property
- def clock_type_c_types(self):
+ def clock_type_c_types(self) -> ClockTypeCTypes:
return self._clock_type_c_types
class ConfigurationOptions:
    # Configuration options.
    #
    # `code_generation_options`:
    #     Code generation options, or `None` to use default code
    #     generation options.
    def __init__(self,
                 code_generation_options: Optional[ConfigurationCodeGenerationOptions] = None):
        # Only build the default options object when the caller did not
        # provide one; the previous version built one unconditionally
        # and immediately discarded it when `code_generation_options`
        # was set.
        if code_generation_options is None:
            code_generation_options = ConfigurationCodeGenerationOptions()

        self._code_generation_options = code_generation_options

    @property
    def code_generation_options(self) -> ConfigurationCodeGenerationOptions:
        return self._code_generation_options
class Configuration:
- def __init__(self, trace, options=None):
+ def __init__(self, trace: Trace, options: Optional[ConfigurationOptions] = None):
self._trace = trace
self._options = ConfigurationOptions()
clk_type_c_types._c_types[def_clk_type] = 'uint32_t'
@property
- def trace(self):
+ def trace(self) -> Trace:
return self._trace
@property
- def options(self):
+ def options(self) -> ConfigurationOptions:
return self._options
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import barectf.config_parse as barectf_config_parse
+import barectf.config as barectf_config
+from barectf.typing import Count, VersionNumber
+from typing import Optional, List, TextIO
-def effective_configuration_file(file, with_package_inclusion_directory=True,
- inclusion_directories=None, ignore_inclusion_not_found=False,
- indent_space_count=2):
+def effective_configuration_file(file: TextIO, with_package_inclusion_directory: bool = True,
+ inclusion_directories: Optional[List[str]] = None,
+ ignore_inclusion_not_found: bool = False,
+ indent_space_count: Count = Count(2)) -> str:
if inclusion_directories is None:
inclusion_directories = []
indent_space_count)
-def configuration_from_file(file, with_package_inclusion_directory=True, inclusion_directories=None,
- ignore_inclusion_not_found=False):
+def configuration_from_file(file: TextIO, with_package_inclusion_directory: bool = True,
+ inclusion_directories: Optional[List[str]] = None,
+ ignore_inclusion_not_found: bool = False) -> barectf_config.Configuration:
if inclusion_directories is None:
inclusion_directories = []
inclusion_directories, ignore_inclusion_not_found)
# Returns the major version (2 or 3) of the YAML configuration
# file-like object `file`.
def configuration_file_major_version(file: TextIO) -> VersionNumber:
    return barectf_config_parse._config_file_major_version(file)
import barectf.config_parse_common as barectf_config_parse_common
from barectf.config_parse_common import _ConfigurationParseError
+from barectf.config_parse_common import _MapNode
import barectf.config_parse_v2 as barectf_config_parse_v2
import barectf.config_parse_v3 as barectf_config_parse_v3
+import barectf.config as barectf_config
import collections
+from barectf.typing import Count, VersionNumber
+from typing import Optional, List, TextIO
+import typing
# Creates and returns a barectf 3 YAML configuration file parser to
# parse the file-like object `file`.
#
# `file` can be a barectf 2 or 3 configuration file.
-def _create_v3_parser(file, with_pkg_include_dir, include_dirs, ignore_include_not_found):
+def _create_v3_parser(file: TextIO, with_pkg_include_dir: bool, include_dirs: Optional[List[str]],
+ ignore_include_not_found: bool) -> barectf_config_parse_v3._Parser:
try:
root_node = barectf_config_parse_common._yaml_load(file)
if type(root_node) is barectf_config_parse_common._ConfigNodeV3:
# barectf 3 configuration file
- return barectf_config_parse_v3._Parser(file, root_node, with_pkg_include_dir,
- include_dirs, ignore_include_not_found)
+ return barectf_config_parse_v3._Parser(file,
+ typing.cast(barectf_config_parse_common._ConfigNodeV3,
+ root_node),
+ with_pkg_include_dir, include_dirs,
+ ignore_include_not_found)
elif type(root_node) is collections.OrderedDict:
# barectf 2 configuration file
- v2_parser = barectf_config_parse_v2._Parser(file, root_node, with_pkg_include_dir,
- include_dirs, ignore_include_not_found)
+ v2_parser = barectf_config_parse_v2._Parser(file, typing.cast(_MapNode, root_node),
+ with_pkg_include_dir, include_dirs,
+ ignore_include_not_found)
return barectf_config_parse_v3._Parser(file, v2_parser.config_node,
with_pkg_include_dir, include_dirs,
ignore_include_not_found)
barectf_config_parse_common._append_error_ctx(exc, 'Configuration',
'Cannot create configuration from YAML file')
+ # satisfy static type checker (never reached)
+ raise
+
# Creates and returns a configuration object from the YAML
# configuration file-like object `file`, delegating the parsing to a
# barectf 3 parser.
def _from_file(file: TextIO, with_pkg_include_dir: bool, include_dirs: Optional[List[str]],
               ignore_include_not_found: bool) -> barectf_config.Configuration:
    parser = _create_v3_parser(file, with_pkg_include_dir, include_dirs,
                               ignore_include_not_found)
    return parser.config
-def _effective_config_file(file, with_pkg_include_dir, include_dirs, ignore_include_not_found,
- indent_space_count):
+def _effective_config_file(file: TextIO, with_pkg_include_dir: bool,
+ include_dirs: Optional[List[str]], ignore_include_not_found: bool,
+ indent_space_count: Count) -> str:
config_node = _create_v3_parser(file, with_pkg_include_dir, include_dirs,
ignore_include_not_found).config_node
return barectf_config_parse_common._yaml_dump(config_node, indent=indent_space_count,
explicit_end=True)
# Returns the major version (2 or 3) of the YAML configuration
# file-like object `file`.
#
# Raises `_ConfigurationParseError` when the file cannot be loaded as
# YAML.
def _config_file_major_version(file: TextIO) -> VersionNumber:
    # Only the YAML loading step can raise `_ConfigurationParseError`,
    # so the `try` block covers it alone.
    try:
        root_node = barectf_config_parse_common._yaml_load(file)
    except _ConfigurationParseError as exc:
        barectf_config_parse_common._append_error_ctx(exc, 'Configuration', 'Cannot load YAML file')

        # satisfy static type checker (never reached)
        raise

    if type(root_node) is barectf_config_parse_common._ConfigNodeV3:
        # barectf 3 configuration file
        return VersionNumber(3)

    # barectf 2 configuration file
    assert type(root_node) is collections.OrderedDict
    return VersionNumber(2)
import pkg_resources
import collections
-import jsonschema
+import jsonschema # type: ignore
import os.path
import yaml
import copy
import os
+from barectf.typing import VersionNumber, _OptStr
+from typing import Optional, List, Dict, Any, TextIO, MutableMapping, Union, Set, Iterable, Callable, Tuple
+import typing
# The context of a configuration parsing error.
#
# Such a context object has a name and, optionally, a message.
class _ConfigurationParseErrorContext:
    # Single context of a configuration parsing error: an object name
    # and an optional message.
    def __init__(self, name: str, message: _OptStr = None):
        self._msg = message
        self._name = name

    @property
    def name(self) -> str:
        return self._name

    @property
    def message(self) -> _OptStr:
        return self._msg
-# Appends the context having the object name `obj_name` and the
-# (optional) message `message` to the `_ConfigurationParseError`
-# exception `exc` and then raises `exc` again.
-def _append_error_ctx(exc, obj_name, message=None):
- exc._append_ctx(obj_name, message)
- raise exc
-
-
# A configuration parsing error.
#
# Such an error object contains a list of contexts (`context` property).
class _ConfigurationParseError(Exception):
def __init__(self, init_ctx_obj_name, init_ctx_msg=None):
super().__init__()
- self._ctx = []
+ self._ctx: List[_ConfigurationParseErrorContext] = []
self._append_ctx(init_ctx_obj_name, init_ctx_msg)
@property
- def context(self):
+ def context(self) -> List[_ConfigurationParseErrorContext]:
return self._ctx
- def _append_ctx(self, name, msg=None):
+ def _append_ctx(self, name: str, msg: _OptStr = None):
self._ctx.append(_ConfigurationParseErrorContext(name, msg))
def __str__(self):
return '\n'.join(lines)
# Appends a new context, made of the object name `obj_name` and the
# optional message `message`, to the `_ConfigurationParseError`
# exception `exc`, and then re-raises `exc`.
def _append_error_ctx(exc: _ConfigurationParseError, obj_name: str, message: _OptStr = None):
    exc._append_ctx(obj_name, message)
    raise exc


_V3Prefixes = collections.namedtuple('_V3Prefixes', ['identifier', 'file_name'])
# Convers a v2 prefix to v3 prefixes.
-def _v3_prefixes_from_v2_prefix(v2_prefix):
+def _v3_prefixes_from_v2_prefix(v2_prefix: str) -> _V3Prefixes:
return _V3Prefixes(v2_prefix, v2_prefix.rstrip('_'))
# This must never happen in barectf because all our schemas are local;
# it would mean a programming or schema error.
class _RefResolver(jsonschema.RefResolver):
- def resolve_remote(self, uri):
+ def resolve_remote(self, uri: str):
raise RuntimeError(f'Missing local schema with URI `{uri}`')
+# Not all static type checkers support type recursion, so let's just use
+# `Any` as a map node's value's type.
+_MapNode = MutableMapping[str, Any]
+
+
# Schema validator which considers all the schemas found in the
# subdirectories `subdirs` (at build time) of the barectf package's
# `schemas` directory.
# The only public method is validate() which accepts an instance to
# validate as well as a schema short ID.
class _SchemaValidator:
- def __init__(self, subdirs):
+ def __init__(self, subdirs: Iterable[str]):
schemas_dir = pkg_resources.resource_filename(__name__, 'schemas')
- self._store = {}
+ self._store: Dict[str, str] = {}
for subdir in subdirs:
dir = os.path.join(schemas_dir, subdir)
return dct
- def _validate(self, instance, schema_short_id):
+ def _validate(self, instance: _MapNode, schema_short_id: str):
# retrieve full schema ID from short ID
schema_id = f'https://barectf.org/schemas/{schema_short_id}.json'
assert schema_id in self._store
#
# Raises a `_ConfigurationParseError` object, hiding any
# `jsonschema` exception, on validation failure.
- def validate(self, instance, schema_short_id):
+ def validate(self, instance: _MapNode, schema_short_id: str):
try:
self._validate(instance, schema_short_id)
except jsonschema.ValidationError as exc:
# barectf 3 YAML configuration node.
class _ConfigNodeV3:
    # Wraps the root map node of a barectf 3 YAML configuration.
    def __init__(self, config_node: _MapNode):
        self._config_node = config_node

    # Wrapped root configuration map node.
    @property
    def config_node(self) -> _MapNode:
        return self._config_node
# `collections.OrderedDict` object.
#
# All YAML maps are loaded as `collections.OrderedDict` objects.
-def _yaml_load(file):
+def _yaml_load(file: TextIO) -> Union[_ConfigNodeV3, _MapNode]:
class Loader(yaml.Loader):
pass
- def config_ctor(loader, node):
+ def config_ctor(loader, node) -> _ConfigNodeV3:
if not isinstance(node, yaml.MappingNode):
problem = f'Expecting a map for the tag `{node.tag}`'
raise yaml.constructor.ConstructorError(problem=problem)
loader.flatten_mapping(node)
return _ConfigNodeV3(collections.OrderedDict(loader.construct_pairs(node)))
- def mapping_ctor(loader, node):
+ def mapping_ctor(loader, node) -> _MapNode:
loader.flatten_mapping(node)
return collections.OrderedDict(loader.construct_pairs(node))
raise _ConfigurationParseError('YAML loader', f'Cannot load file: {exc}')
-def _yaml_load_path(path):
+def _yaml_load_path(path: str) -> Union[_ConfigNodeV3, _MapNode]:
with open(path) as f:
return _yaml_load(f)
# Dumps the content of the Python object `obj`
# (`collections.OrderedDict` or `_ConfigNodeV3`) as a YAML string and
# returns it.
-def _yaml_dump(node, **kwds):
+def _yaml_dump(node: _MapNode, **kwds) -> str:
class Dumper(yaml.Dumper):
pass
# mostly contains helpers.
class _Parser:
# Builds a base barectf YAML configuration parser to process the
- # configuration node `node` (already loaded from the file having the
- # path `path`).
+ # configuration node `node` (already loaded from the file-like
+ # object `root_file`).
#
# For its _process_node_include() method, the parser considers the
# package inclusion directory as well as `include_dirs`, and ignores
# nonexistent inclusion files if `ignore_include_not_found` is
# `True`.
- def __init__(self, path, node, with_pkg_include_dir, include_dirs, ignore_include_not_found,
- major_version):
- self._root_path = path
+ def __init__(self, root_file: TextIO, node: Union[_MapNode, _ConfigNodeV3],
+ with_pkg_include_dir: bool, include_dirs: Optional[List[str]],
+ ignore_include_not_found: bool, major_version: VersionNumber):
+ self._root_file = root_file
self._root_node = node
self._ft_prop_names = [
# barectf 2.1+
'element-field-type',
]
+ if include_dirs is None:
+ include_dirs = []
+
self._include_dirs = copy.copy(include_dirs)
if with_pkg_include_dir:
self._include_dirs.append(pkg_resources.resource_filename(__name__, f'include/{major_version}'))
self._ignore_include_not_found = ignore_include_not_found
- self._include_stack = []
- self._resolved_ft_aliases = set()
+ self._include_stack: List[str] = []
+ self._resolved_ft_aliases: Set[str] = set()
self._schema_validator = _SchemaValidator({'common/config', f'{major_version}/config'})
self._major_version = major_version
@property
- def _struct_ft_node_members_prop_name(self):
+ def _struct_ft_node_members_prop_name(self) -> str:
if self._major_version == 2:
return 'fields'
else:
return 'members'
# Returns the last included file name from the parser's inclusion
- # file name stack.
- def _get_last_include_file(self):
+ # file name stack, or `N/A` if the root file does not have an
+ # associated path under the `name` property.
+ def _get_last_include_file(self) -> str:
if self._include_stack:
return self._include_stack[-1]
- return self._root_path
+ if hasattr(self._root_file, 'name'):
+ return typing.cast(str, self._root_file.name)
+
+ return 'N/A'
# Loads the inclusion file having the path `yaml_path` and returns
# its content as a `collections.OrderedDict` object.
- def _load_include(self, yaml_path):
+ def _load_include(self, yaml_path) -> Optional[_MapNode]:
for inc_dir in self._include_dirs:
# Current inclusion dir + file name path.
#
self._include_stack.append(norm_path)
# load raw content
- return _yaml_load_path(norm_path)
+ return typing.cast(_MapNode, _yaml_load_path(norm_path))
if not self._ignore_include_not_found:
base_path = self._get_last_include_file()
raise _ConfigurationParseError(f'File `{base_path}`',
f'Cannot include file `{yaml_path}`: file not found in inclusion directories')
+ return None
+
# Returns a list of all the inclusion file paths as found in the
# inclusion node `include_node`.
- def _get_include_paths(self, include_node):
+ def _get_include_paths(self, include_node: _MapNode) -> List[str]:
if include_node is None:
# none
return []
if type(include_node) is str:
# wrap as array
- return [include_node]
+ return [typing.cast(str, include_node)]
# already an array
assert type(include_node) is list
- return include_node
+ return typing.cast(List[str], include_node)
# Updates the node `base_node` with an overlay node `overlay_node`.
#
# Both the inclusion and field type node inheritance features use
# this update mechanism.
- def _update_node(self, base_node, overlay_node):
+ def _update_node(self, base_node: _MapNode, overlay_node: _MapNode):
# see the comment about the `members` property below
- def update_members_node(base_value, olay_value):
- assert type(olay_value) is list
- assert type(base_value) is list
-
+ def update_members_node(base_value: List[Any], olay_value: List[Any]):
for olay_item in olay_value:
# assume we append `olay_item` to `base_value` initially
append_olay_item = True
# `last_overlay_node` and then patches the current base node with
# its other properties before returning the result (always a deep
# copy).
- def _process_node_include(self, last_overlay_node,
- process_base_include_cb,
- process_children_include_cb=None):
+ def _process_node_include(self, last_overlay_node: _MapNode,
+ process_base_include_cb: Callable[[_MapNode], _MapNode],
+ process_children_include_cb: Optional[Callable[[_MapNode], None]] = None) -> _MapNode:
# process children inclusions first
if process_children_include_cb is not None:
process_children_include_cb(last_overlay_node)
# Generates pairs of member node and field type node property name
# (in the member node) for the structure field type node's members
# node `node`.
- def _struct_ft_member_fts_iter(self, node):
+ def _struct_ft_member_fts_iter(self,
+ node: Union[List[_MapNode], _MapNode]) -> Iterable[Tuple[_MapNode, str]]:
if type(node) is list:
# barectf 3
assert self._major_version == 3
+ node = typing.cast(List[_MapNode], node)
for member_node in node:
assert type(member_node) is collections.OrderedDict
+ member_node = typing.cast(_MapNode, member_node)
name, val = list(member_node.items())[0]
if type(val) is collections.OrderedDict:
# barectf 2
assert self._major_version == 2
assert type(node) is collections.OrderedDict
+ node = typing.cast(_MapNode, node)
for name in node:
yield node, name
#
# `ctx_obj_name` is the context's object name when this method
# raises a `_ConfigurationParseError` exception.
- def _resolve_ft_alias(self, ft_aliases_node, parent_node, key, ctx_obj_name, alias_set=None):
+ def _resolve_ft_alias(self, ft_aliases_node: _MapNode, parent_node: _MapNode, key: str,
+ ctx_obj_name: str, alias_set: Optional[Set[str]] = None):
if key not in parent_node:
return
# Like _resolve_ft_alias(), but builds a context object name for any
# `ctx_obj_name` exception.
- def _resolve_ft_alias_from(self, ft_aliases_node, parent_node, key):
+ def _resolve_ft_alias_from(self, ft_aliases_node: _MapNode, parent_node: _MapNode, key: str):
self._resolve_ft_alias(ft_aliases_node, parent_node, key, f'`{key}` property')
# Applies field type node inheritance to the property `key` of
#
# When this method returns, no field type node has an `$inherit` or
# `inherit` property.
- def _apply_ft_inheritance(self, parent_node, key):
+ def _apply_ft_inheritance(self, parent_node: _MapNode, key: str):
if key not in parent_node:
return
from barectf.config_parse_common import _ConfigurationParseError
from barectf.config_parse_common import _append_error_ctx
import barectf.config_parse_common as config_parse_common
+from barectf.config_parse_common import _MapNode
import collections
import copy
+from barectf.typing import VersionNumber, _OptStr
+from typing import Optional, List, Dict, TextIO, Union, Callable
+import typing
# Removes the property `prop_name` from the map node `node`, if it
# exists.
def _del_prop_if_exists(node: _MapNode, prop_name: str):
    # `pop()` with a default is a no-op when the key is absent
    node.pop(prop_name, None)
# Renames the property `old_prop_name` of the map node `node` to
# `new_prop_name`, if the old property exists.
def _rename_prop(node: _MapNode, old_prop_name: str, new_prop_name: str):
    if old_prop_name not in node:
        return

    node[new_prop_name] = node[old_prop_name]
    del node[old_prop_name]
-def _copy_prop_if_exists(dst_node, src_node, src_prop_name, dst_prop_name=None):
+def _copy_prop_if_exists(dst_node: _MapNode, src_node: _MapNode, src_prop_name: str,
+ dst_prop_name: _OptStr = None):
if dst_prop_name is None:
dst_prop_name = src_prop_name
# parsing stages and general strategy.
class _Parser(config_parse_common._Parser):
# Builds a barectf 2 YAML configuration parser and parses the root
- # configuration node `node` (already loaded from `path`).
- def __init__(self, path, node, with_pkg_include_dir, include_dirs, ignore_include_not_found):
- super().__init__(path, node, with_pkg_include_dir, include_dirs, ignore_include_not_found, 2)
- self._ft_cls_name_to_conv_method = {
+ # configuration node `node` (already loaded from the file-like
+ # object `root_file`).
+ def __init__(self, root_file: TextIO, node: _MapNode, with_pkg_include_dir: bool,
+ include_dirs: Optional[List[str]], ignore_include_not_found: bool):
+ super().__init__(root_file, node, with_pkg_include_dir, include_dirs,
+ ignore_include_not_found, VersionNumber(2))
+ self._ft_cls_name_to_conv_method: Dict[str, Callable[[_MapNode], _MapNode]] = {
'int': self._conv_int_ft_node,
'integer': self._conv_int_ft_node,
'enum': self._conv_enum_ft_node,
# Converts a v2 field type node to a v3 field type node and returns
# it.
- def _conv_ft_node(self, v2_ft_node):
+ def _conv_ft_node(self, v2_ft_node: _MapNode) -> _MapNode:
assert 'class' in v2_ft_node
cls = v2_ft_node['class']
assert cls in self._ft_cls_name_to_conv_method
return self._ft_cls_name_to_conv_method[cls](v2_ft_node)
- def _conv_ft_node_if_exists(self, v2_parent_node, key):
+ def _conv_ft_node_if_exists(self, v2_parent_node: Optional[_MapNode], key: str) -> Optional[_MapNode]:
if v2_parent_node is None:
- return
+ return None
if key not in v2_parent_node:
- return
+ return None
return self._conv_ft_node(v2_parent_node[key])
# Converts a v2 integer field type node to a v3 integer field type
# node and returns it.
- def _conv_int_ft_node(self, v2_ft_node):
+ def _conv_int_ft_node(self, v2_ft_node: _MapNode) -> _MapNode:
# copy v2 integer field type node
v3_ft_node = copy.deepcopy(v2_ft_node)
# Converts a v2 enumeration field type node to a v3 enumeration
# field type node and returns it.
- def _conv_enum_ft_node(self, v2_ft_node):
+ def _conv_enum_ft_node(self, v2_ft_node: _MapNode) -> _MapNode:
# An enumeration field type _is_ an integer field type, so use a
# copy of the converted v2 value field type node.
v3_ft_node = copy.deepcopy(self._conv_ft_node(v2_ft_node['value-type']))
members_node = v2_ft_node.get(prop_name)
if members_node is not None:
- mappings_node = collections.OrderedDict()
+ mappings_node: _MapNode = collections.OrderedDict()
cur = 0
for member_node in members_node:
+ v3_value_node: Union[int, List[int]]
+
if type(member_node) is str:
label = member_node
v3_value_node = cur
# Converts a v2 real field type node to a v3 real field type node
# and returns it.
- def _conv_real_ft_node(self, v2_ft_node):
+ def _conv_real_ft_node(self, v2_ft_node: _MapNode) -> _MapNode:
# copy v2 real field type node
v3_ft_node = copy.deepcopy(v2_ft_node)
# Converts a v2 string field type node to a v3 string field type
# node and returns it.
- def _conv_string_ft_node(self, v2_ft_node):
+ def _conv_string_ft_node(self, v2_ft_node: _MapNode) -> _MapNode:
# copy v2 string field type node
v3_ft_node = copy.deepcopy(v2_ft_node)
# Converts a v2 array field type node to a v3 (static) array field
# type node and returns it.
- def _conv_static_array_ft_node(self, v2_ft_node):
+ def _conv_static_array_ft_node(self, v2_ft_node: _MapNode) -> _MapNode:
# class renamed to `static-array`
- v3_ft_node = collections.OrderedDict({'class': 'static-array'})
+ v3_ft_node: _MapNode = collections.OrderedDict({'class': 'static-array'})
# copy `length` property
_copy_prop_if_exists(v3_ft_node, v2_ft_node, 'length')
# Converts a v2 structure field type node to a v3 structure field
# type node and returns it.
- def _conv_struct_ft_node(self, v2_ft_node):
+ def _conv_struct_ft_node(self, v2_ft_node: _MapNode) -> _MapNode:
# Create fresh v3 structure field type node, reusing the class
# of `v2_ft_node`.
v3_ft_node = collections.OrderedDict({'class': v2_ft_node['class']})
# Converts a v2 clock type node to a v3 clock type node and returns
# it.
- def _conv_clk_type_node(self, v2_clk_type_node):
+ def _conv_clk_type_node(self, v2_clk_type_node: _MapNode) -> _MapNode:
# copy v2 clock type node
v3_clk_type_node = copy.deepcopy(v2_clk_type_node)
# Converts a v2 event type node to a v3 event type node and returns
# it.
- def _conv_ev_type_node(self, v2_ev_type_node):
+ def _conv_ev_type_node(self, v2_ev_type_node: _MapNode) -> _MapNode:
# create empty v3 event type node
- v3_ev_type_node = collections.OrderedDict()
+ v3_ev_type_node: _MapNode = collections.OrderedDict()
# copy `log-level` property
_copy_prop_if_exists(v3_ev_type_node, v2_ev_type_node, 'log-level')
return v3_ev_type_node
@staticmethod
- def _set_v3_feature_ft_if_exists(v3_features_node, key, node):
+ def _set_v3_feature_ft_if_exists(v3_features_node: _MapNode, key: str,
+ node: Union[Optional[_MapNode], bool]):
val = node
if val is None:
# Converts a v2 stream type node to a v3 stream type node and
# returns it.
- def _conv_stream_type_node(self, v2_stream_type_node):
+ def _conv_stream_type_node(self, v2_stream_type_node: _MapNode) -> _MapNode:
# This function creates a v3 stream type features node from the
# packet context and event header field type nodes of a
# v2 stream type node.
- def v3_features_node_from_v2_ft_nodes(v2_pkt_ctx_ft_fields_node,
- v2_ev_header_ft_fields_node):
+ def v3_features_node_from_v2_ft_nodes(v2_pkt_ctx_ft_fields_node: _MapNode,
+ v2_ev_header_ft_fields_node: Optional[_MapNode]) -> _MapNode:
if v2_ev_header_ft_fields_node is None:
v2_ev_header_ft_fields_node = collections.OrderedDict()
v3_ev_type_id_ft_node = self._conv_ft_node_if_exists(v2_ev_header_ft_fields_node, 'id')
v3_ev_time_ft_node = self._conv_ft_node_if_exists(v2_ev_header_ft_fields_node,
'timestamp')
- v3_features_node = collections.OrderedDict()
- v3_pkt_node = collections.OrderedDict()
- v3_ev_node = collections.OrderedDict()
+ v3_features_node: _MapNode = collections.OrderedDict()
+ v3_pkt_node: _MapNode = collections.OrderedDict()
+ v3_ev_node: _MapNode = collections.OrderedDict()
v3_pkt_node['total-size-field-type'] = v3_pkt_total_size_ft_node
v3_pkt_node['content-size-field-type'] = v3_pkt_content_size_ft_node
self._set_v3_feature_ft_if_exists(v3_pkt_node, 'beginning-time-field-type',
v3_features_node['event'] = v3_ev_node
return v3_features_node
- def clk_type_name_from_v2_int_ft_node(v2_int_ft_node):
+ def clk_type_name_from_v2_int_ft_node(v2_int_ft_node: Optional[_MapNode]) -> _OptStr:
if v2_int_ft_node is None:
- return
+ return None
assert v2_int_ft_node['class'] in ('int', 'integer')
prop_mappings_node = v2_int_ft_node.get('property-mappings')
if prop_mappings_node is not None and len(prop_mappings_node) > 0:
return prop_mappings_node[0]['name']
+ return None
+
# create empty v3 stream type node
- v3_stream_type_node = collections.OrderedDict()
+ v3_stream_type_node: _MapNode = collections.OrderedDict()
# rename `$default` property to `$is-default`
_copy_prop_if_exists(v3_stream_type_node, v2_stream_type_node, '$default', '$is-default')
return v3_stream_type_node
# Converts a v2 metadata node to a v3 trace node and returns it.
- def _conv_meta_node(self, v2_meta_node):
- def v3_features_node_from_v2_ft_node(v2_pkt_header_ft_node):
+ def _conv_meta_node(self, v2_meta_node: _MapNode) -> _MapNode:
+ def v3_features_node_from_v2_ft_node(v2_pkt_header_ft_node: Optional[_MapNode]) -> _MapNode:
def set_if_exists(key, node):
return self._set_v3_feature_ft_if_exists(v3_features_node, key, node)
v3_uuid_ft_node = self._conv_ft_node_if_exists(v2_pkt_header_ft_fields_node, 'uuid')
v3_stream_type_id_ft_node = self._conv_ft_node_if_exists(v2_pkt_header_ft_fields_node,
'stream_id')
- v3_features_node = collections.OrderedDict()
+ v3_features_node: _MapNode = collections.OrderedDict()
set_if_exists('magic-field-type', v3_magic_ft_node)
set_if_exists('uuid-field-type', v3_uuid_ft_node)
set_if_exists('stream-type-id-field-type', v3_stream_type_id_ft_node)
return v3_features_node
- v3_trace_node = collections.OrderedDict()
- v3_trace_type_node = collections.OrderedDict()
+ v3_trace_node: _MapNode = collections.OrderedDict()
+ v3_trace_type_node: _MapNode = collections.OrderedDict()
v2_trace_node = v2_meta_node['trace']
# rename `byte-order` property to `$default-byte-order`
# Processes the inclusions of the event type node `ev_type_node`,
# returning the effective node.
- def _process_ev_type_node_include(self, ev_type_node):
+ def _process_ev_type_node_include(self, ev_type_node: _MapNode) -> _MapNode:
# Make sure the event type node is valid for the inclusion
# processing stage.
self._schema_validator.validate(ev_type_node, '2/config/event-type-pre-include')
# Processes the inclusions of the stream type node
# `stream_type_node`, returning the effective node.
- def _process_stream_type_node_include(self, stream_type_node):
+ def _process_stream_type_node_include(self, stream_type_node: _MapNode) -> _MapNode:
def process_children_include(stream_type_node):
prop_name = 'events'
# Processes the inclusions of the trace type node `trace_type_node`,
# returning the effective node.
- def _process_trace_type_node_include(self, trace_type_node):
+ def _process_trace_type_node_include(self, trace_type_node: _MapNode) -> _MapNode:
# Make sure the trace type node is valid for the inclusion
# processing stage.
self._schema_validator.validate(trace_type_node, '2/config/trace-type-pre-include')
# Processes the inclusions of the clock type node `clk_type_node`,
# returning the effective node.
- def _process_clk_type_node_include(self, clk_type_node):
+ def _process_clk_type_node_include(self, clk_type_node: _MapNode) -> _MapNode:
# Make sure the clock type node is valid for the inclusion
# processing stage.
self._schema_validator.validate(clk_type_node, '2/config/clock-type-pre-include')
# Processes the inclusions of the metadata node `meta_node`,
# returning the effective node.
- def _process_meta_node_include(self, meta_node):
- def process_children_include(meta_node):
+ def _process_meta_node_include(self, meta_node: _MapNode) -> _MapNode:
+ def process_children_include(meta_node: _MapNode):
prop_name = 'trace'
if prop_name in meta_node:
self._transform_config_node()
@property
- def config_node(self):
- return config_parse_common._ConfigNodeV3(self._root_node)
+ def config_node(self) -> config_parse_common._ConfigNodeV3:
+ return config_parse_common._ConfigNodeV3(typing.cast(_MapNode, self._root_node))
import barectf.config_parse_common as barectf_config_parse_common
from barectf.config_parse_common import _ConfigurationParseError
from barectf.config_parse_common import _append_error_ctx
+from barectf.config_parse_common import _MapNode
import barectf.config as barectf_config
+from barectf.config import _OptFt, _OptStructFt
import collections
import uuid
+from barectf.typing import Count, Alignment, VersionNumber
+from typing import Optional, List, Dict, Any, TextIO, Set, Iterable, Callable, Tuple, Type
+import typing
# A barectf 3 YAML configuration parser.
# parsing stages and general strategy.
class _Parser(barectf_config_parse_common._Parser):
# Builds a barectf 3 YAML configuration parser and parses the root
- # configuration node `node` (already loaded from `path`).
- def __init__(self, path, node, with_pkg_include_dir, inclusion_dirs, ignore_include_not_found):
- super().__init__(path, node, with_pkg_include_dir, inclusion_dirs,
- ignore_include_not_found, 3)
- self._ft_cls_name_to_create_method = {
+ # configuration node `node` (already loaded from the file-like
+ # object `root_file`).
+ def __init__(self, root_file: TextIO, node: barectf_config_parse_common._ConfigNodeV3,
+ with_pkg_include_dir: bool, inclusion_dirs: Optional[List[str]],
+ ignore_include_not_found: bool):
+ super().__init__(root_file, node, with_pkg_include_dir, inclusion_dirs,
+ ignore_include_not_found, VersionNumber(3))
+ self._ft_cls_name_to_create_method: Dict[str, Callable[[_MapNode], barectf_config._FieldType]] = {
'unsigned-integer': self._create_int_ft,
'signed-integer': self._create_int_ft,
'unsigned-enumeration': self._create_enum_ft,
# `_ConfigurationParseError` exception using `ctx_obj_name` if it's
# invalid.
@staticmethod
- def _validate_alignment(alignment, ctx_obj_name):
+ def _validate_alignment(alignment: Alignment, ctx_obj_name: str):
assert alignment >= 1
# check for power of two
# `_ConfigurationParseError` exception using `ctx_obj_name` and
# `prop` to format the message if it's invalid.
@staticmethod
- def _validate_iden(iden, ctx_obj_name, prop):
+ def _validate_iden(iden: str, ctx_obj_name: str, prop: str):
assert type(iden) is str
ctf_keywords = {
'align',
raise _ConfigurationParseError(ctx_obj_name, msg)
@staticmethod
- def _alignment_prop(ft_node, prop_name):
+ def _alignment_prop(ft_node: _MapNode, prop_name: str) -> Alignment:
alignment = ft_node.get(prop_name)
if alignment is not None:
_Parser._validate_alignment(alignment, '`prop_name` property')
- return alignment
+ return Alignment(alignment)
@property
- def _trace_type_node(self):
- return self._root_node.config_node['trace']['type']
+ def _trace_type_node(self) -> _MapNode:
+ return self.config_node['trace']['type']
@staticmethod
- def _byte_order_from_node(node):
+ def _byte_order_from_node(node: str) -> barectf_config.ByteOrder:
return {
'big-endian': barectf_config.ByteOrder.BIG_ENDIAN,
'little-endian': barectf_config.ByteOrder.LITTLE_ENDIAN,
# Creates a bit array field type having the type `ft_type` from the
# bit array field type node `ft_node`, passing the additional
# `*args` to ft_type.__init__().
- def _create_common_bit_array_ft(self, ft_node, ft_type, default_alignment, *args):
+ def _create_common_bit_array_ft(self, ft_node: _MapNode,
+ ft_type: Type[barectf_config._BitArrayFieldType],
+ default_alignment: Optional[Alignment],
+ *args) -> barectf_config._BitArrayFieldType:
byte_order = self._byte_order_from_node(ft_node['byte-order'])
alignment = self._alignment_prop(ft_node, 'alignment')
# Creates an integer field type having the type `ft_type` from the
# integer field type node `ft_node`, passing the additional `*args`
# to ft_type.__init__().
- def _create_common_int_ft(self, ft_node, ft_type, *args):
+ def _create_common_int_ft(self, ft_node: _MapNode,
+ ft_type: Type[barectf_config._IntegerFieldType], *args) -> barectf_config._IntegerFieldType:
preferred_display_base = {
'binary': barectf_config.DisplayBase.BINARY,
'octal': barectf_config.DisplayBase.OCTAL,
'decimal': barectf_config.DisplayBase.DECIMAL,
'hexadecimal': barectf_config.DisplayBase.HEXADECIMAL,
}[ft_node.get('preferred-display-base', 'decimal')]
- return self._create_common_bit_array_ft(ft_node, ft_type, None, preferred_display_base, *args)
+ return typing.cast(barectf_config._IntegerFieldType,
+ self._create_common_bit_array_ft(ft_node, ft_type, None,
+ preferred_display_base, *args))
# Creates an integer field type from the unsigned/signed integer
# field type node `ft_node`.
- def _create_int_ft(self, ft_node):
+ def _create_int_ft(self, ft_node: _MapNode) -> barectf_config._IntegerFieldType:
ft_type = {
'unsigned-integer': barectf_config.UnsignedIntegerFieldType,
'signed-integer': barectf_config.SignedIntegerFieldType,
# Creates an enumeration field type from the unsigned/signed
# enumeration field type node `ft_node`.
- def _create_enum_ft(self, ft_node):
+ def _create_enum_ft(self, ft_node: _MapNode) -> barectf_config._EnumerationFieldType:
ft_type = {
'unsigned-enumeration': barectf_config.UnsignedEnumerationFieldType,
'signed-enumeration': barectf_config.SignedEnumerationFieldType,
mappings[label] = barectf_config.EnumerationFieldTypeMapping(ranges)
- return self._create_common_int_ft(ft_node, ft_type,
- barectf_config.EnumerationFieldTypeMappings(mappings))
+ return typing.cast(barectf_config._EnumerationFieldType,
+ self._create_common_int_ft(ft_node, ft_type,
+ barectf_config.EnumerationFieldTypeMappings(mappings)))
# Creates a real field type from the real field type node `ft_node`.
- def _create_real_ft(self, ft_node):
- return self._create_common_bit_array_ft(ft_node, barectf_config.RealFieldType, 8)
+ def _create_real_ft(self, ft_node: _MapNode) -> barectf_config.RealFieldType:
+ return typing.cast(barectf_config.RealFieldType,
+ self._create_common_bit_array_ft(ft_node, barectf_config.RealFieldType,
+ Alignment(8)))
# Creates a string field type from the string field type node
# `ft_node`.
- def _create_string_ft(self, ft_node):
+ def _create_string_ft(self, ft_node: _MapNode) -> barectf_config.StringFieldType:
return barectf_config.StringFieldType()
# Creates a static array field type from the static array field type
# node `ft_node`.
- def _create_static_array_ft(self, ft_node):
+ def _create_static_array_ft(self, ft_node: _MapNode) -> barectf_config.StaticArrayFieldType:
prop_name = 'element-field-type'
try:
#
# `prop_name` is the name of the property of which `members_node` is
# the value.
- def _create_struct_ft_members(self, members_node, prop_name):
+ def _create_struct_ft_members(self, members_node: List[_MapNode], prop_name: str):
members = collections.OrderedDict()
- member_names = set()
+ member_names: Set[str] = set()
for member_node in members_node:
member_name, member_node = list(member_node.items())[0]
# Creates a structure field type from the structure field type node
# `ft_node`.
- def _create_struct_ft(self, ft_node):
+ def _create_struct_ft(self, ft_node: _MapNode) -> barectf_config.StructureFieldType:
minimum_alignment = self._alignment_prop(ft_node, 'minimum-alignment')
if minimum_alignment is None:
return barectf_config.StructureFieldType(minimum_alignment, members)
# Creates a field type from the field type node `ft_node`.
- def _create_ft(self, ft_node):
+ def _create_ft(self, ft_node: _MapNode) -> barectf_config._FieldType:
return self._ft_cls_name_to_create_method[ft_node['class']](ft_node)
# Creates a field type from the field type node `parent_node[key]`
# if it exists.
- def _try_create_ft(self, parent_node, key):
+ def _try_create_ft(self, parent_node: _MapNode, key: str) -> _OptFt:
if key not in parent_node:
- return
+ return None
try:
return self._create_ft(parent_node[key])
except _ConfigurationParseError as exc:
_append_error_ctx(exc, f'`{key}` property')
+ # satisfy static type checker (never reached)
+ raise
+
+ # Like _try_create_ft(), but casts the result's type to
+ # `barectf_config.StructureFieldType` to satisfy static type
+ # checkers.
+ def _try_create_struct_ft(self, parent_node: _MapNode, key: str) -> _OptStructFt:
+ return typing.cast(barectf_config.StructureFieldType,
+ self._try_create_ft(parent_node, key))
+
# Returns the total number of members in the structure field type
# node `ft_node` if it exists, otherwise 0.
@staticmethod
- def _total_struct_ft_node_members(ft_node):
+ def _total_struct_ft_node_members(ft_node: Optional[_MapNode]) -> Count:
if ft_node is None:
- return 0
+ return Count(0)
members_node = ft_node.get('members')
if members_node is None:
- return 0
+ return Count(0)
- return len(members_node)
+ return Count(len(members_node))
# Creates an event type from the event type node `ev_type_node`
# named `name`.
    # stream type). For example, if the stream type has an event header
# field type with `id` and `timestamp` members, then
# `ev_member_count` is 2.
- def _create_ev_type(self, name, ev_type_node, ev_member_count):
+ def _create_ev_type(self, name: str, ev_type_node: _MapNode, ev_member_count: Count) -> barectf_config.EventType:
try:
self._validate_iden(name, '`name` property', 'event type name')
# make sure the event type is not empty
spec_ctx_ft_prop_name = 'specific-context-field-type'
payload_ft_prop_name = 'payload-field-type'
- ev_member_count += self._total_struct_ft_node_members(ev_type_node.get(spec_ctx_ft_prop_name))
- ev_member_count += self._total_struct_ft_node_members(ev_type_node.get(payload_ft_prop_name))
+ ev_member_count = Count(ev_member_count +
+ self._total_struct_ft_node_members(ev_type_node.get(spec_ctx_ft_prop_name)))
+ ev_member_count = Count(ev_member_count +
+ self._total_struct_ft_node_members(ev_type_node.get(payload_ft_prop_name)))
if ev_member_count == 0:
raise _ConfigurationParseError('Event type', 'Event type is empty (no members).')
# create event type
return barectf_config.EventType(name, ev_type_node.get('log-level'),
- self._try_create_ft(ev_type_node, spec_ctx_ft_prop_name),
- self._try_create_ft(ev_type_node, payload_ft_prop_name))
+ self._try_create_struct_ft(ev_type_node,
+ spec_ctx_ft_prop_name),
+ self._try_create_struct_ft(ev_type_node,
+ payload_ft_prop_name))
except _ConfigurationParseError as exc:
_append_error_ctx(exc, f'Event type `{name}`')
+ # satisfy static type checker (never reached)
+ raise
+
# Returns the effective feature field type for the field type
# node `parent_node[key]`, if any.
#
#
# Otherwise:
# A created field type.
- def _feature_ft(self, parent_node, key, none=None):
+ def _feature_ft(self, parent_node: _MapNode, key: str, none: Any = None) -> Any:
if key not in parent_node:
# missing: default feature field type
return none
assert type(ft_node) is collections.OrderedDict
return self._create_ft(ft_node)
- def _create_stream_type(self, name, stream_type_node):
+ def _create_stream_type(self, name: str, stream_type_node: _MapNode) -> barectf_config.StreamType:
try:
# validate stream type's name
self._validate_iden(name, '`name` property', 'stream type name')
raise _ConfigurationParseError(f'`{type_id_ft_prop_name}` property',
'Event type ID field type feature is required because stream type has more than one event type')
- if isinstance(ev_type_id_ft, barectf_config._FieldType) and ev_type_count > (1 << ev_type_id_ft.size):
- raise _ConfigurationParseError(f'`{type_id_ft_prop_name}` property',
- f'Field type\'s size ({ev_type_id_ft.size} bits) is too small to accomodate {ev_type_count} event types')
+ if isinstance(ev_type_id_ft, barectf_config._IntegerFieldType):
+ ev_type_id_int_ft = typing.cast(barectf_config._IntegerFieldType, ev_type_id_ft)
+
+ if ev_type_count > (1 << ev_type_id_int_ft.size):
+ raise _ConfigurationParseError(f'`{type_id_ft_prop_name}` property',
+ f'Field type\'s size ({ev_type_id_int_ft.size} bits) is too small to accomodate {ev_type_count} event types')
except _ConfigurationParseError as exc:
exc._append_ctx('`event` property')
_append_error_ctx(exc, '`$features` property')
f'Packet context field type member name `{member_name}` is reserved.')
# create event types
- ev_header_common_ctx_member_count = 0
+ ev_header_common_ctx_member_count = Count(0)
if ev_features.type_id_field_type is not None:
- ev_header_common_ctx_member_count += 1
+ ev_header_common_ctx_member_count = Count(ev_header_common_ctx_member_count + 1)
if ev_features.time_field_type is not None:
- ev_header_common_ctx_member_count += 1
+ ev_header_common_ctx_member_count = Count(ev_header_common_ctx_member_count + 1)
ev_common_ctx_ft_prop_name = 'event-common-context-field-type'
ev_common_ctx_ft_node = stream_type_node.get(ev_common_ctx_ft_prop_name)
- ev_header_common_ctx_member_count += self._total_struct_ft_node_members(ev_common_ctx_ft_node)
+ ev_header_common_ctx_member_count = Count(ev_header_common_ctx_member_count +
+ self._total_struct_ft_node_members(ev_common_ctx_ft_node))
ev_types = set()
for ev_name, ev_type_node in stream_type_node[ev_types_prop_name].items():
# create stream type
return barectf_config.StreamType(name, ev_types, def_clk_type, features,
pkt_ctx_ft_extra_members,
- self._try_create_ft(stream_type_node,
- ev_common_ctx_ft_prop_name))
+ self._try_create_struct_ft(stream_type_node,
+ ev_common_ctx_ft_prop_name))
except _ConfigurationParseError as exc:
_append_error_ctx(exc, f'Stream type `{name}`')
- def _clk_type(self, name, prop_name):
+ # satisfy static type checker (never reached)
+ raise
+
+ def _clk_type(self, name: str, prop_name: str) -> barectf_config.ClockType:
clk_type = self._clk_types.get(name)
if clk_type is None:
return clk_type
- def _create_clk_type(self, name, clk_type_node):
+ def _create_clk_type(self, name: str, clk_type_node: _MapNode) -> barectf_config.ClockType:
self._validate_iden(name, '`name` property', 'clock type name')
clk_type_uuid = None
uuid_node = clk_type_node.get('uuid')
clk_type_uuid = uuid.UUID(uuid_node)
offset_seconds = 0
- offset_cycles = 0
+ offset_cycles = Count(0)
offset_node = clk_type_node.get('offset')
if offset_node is not None:
offset_seconds = offset_node.get('seconds', 0)
- offset_cycles = offset_node.get('cycles', 0)
+ offset_cycles = offset_node.get('cycles', Count(0))
return barectf_config.ClockType(name, clk_type_node.get('frequency', int(1e9)),
clk_type_uuid, clk_type_node.get('description'),
def _create_trace(self):
try:
trace_type = self._create_trace_type()
- trace_node = self._root_node.config_node['trace']
+ trace_node = self.config_node['trace']
env = None
env_node = trace_node.get('environment')
# create options
iden_prefix_def = False
def_stream_type_name_def = False
- opts_node = self._root_node.config_node.get('options')
+ opts_node = self.config_node.get('options')
+ iden_prefix = 'barectf_'
+ file_name_prefix = 'barectf'
if opts_node is not None:
code_gen_opts_node = opts_node.get('code-generation')
# * The `$field-type-aliases` property of the trace type node is
# removed.
def _expand_ft_aliases(self):
- def resolve_ft_alias_from(parent_node, key):
+ def resolve_ft_alias_from(parent_node: _MapNode, key: str):
if key not in parent_node:
return
if type(parent_node[key]) not in [collections.OrderedDict, str]:
return
- return self._resolve_ft_alias_from(ft_aliases_node, parent_node, key)
+ self._resolve_ft_alias_from(ft_aliases_node, parent_node, key)
ft_aliases_node = self._trace_type_node['$field-type-aliases']
# When this method returns, no field type node has an `$inherit`
# property.
def _apply_fts_inheritance(self):
- def apply_ft_inheritance(parent_node, key):
+ def apply_ft_inheritance(parent_node: _MapNode, key: str):
if key not in parent_node:
return
if type(parent_node[key]) is not collections.OrderedDict:
return
- return self._apply_ft_inheritance(parent_node, key)
+ self._apply_ft_inheritance(parent_node, key)
features_prop_name = '$features'
features_node = self._trace_type_node.get(features_prop_name)
#
# This method normalizes form 1 to use form 2.
def _normalize_struct_ft_member_nodes(self):
- def normalize_members_node(members_node):
+ def normalize_members_node(members_node: List[_MapNode]):
ft_prop_name = 'field-type'
for member_node in members_node:
normalize_struct_ft_member_nodes(member_node[member_name], ft_prop_name)
- def normalize_struct_ft_member_nodes(parent_node, key):
+ def normalize_struct_ft_member_nodes(parent_node: _MapNode, key: str):
if type(parent_node) is not collections.OrderedDict:
return
if type(ft_node) is not collections.OrderedDict:
return
+ ft_node = typing.cast(collections.OrderedDict, ft_node)
members_nodes = ft_node.get('members')
if members_nodes is not None:
def _expand_fts(self):
# Make sure that the current configuration node is valid
# considering field types are not expanded yet.
- self._schema_validator.validate(self._root_node.config_node,
+ self._schema_validator.validate(self.config_node,
'3/config/config-pre-field-type-expansion')
prop_name = '$field-type-aliases'
def _sub_log_level_aliases(self):
# Make sure that the current configuration node is valid
# considering log level aliases are not substituted yet.
- self._schema_validator.validate(self._root_node.config_node,
+ self._schema_validator.validate(self.config_node,
'3/config/config-pre-log-level-alias-sub')
log_level_aliases_prop_name = '$log-level-aliases'
#
# It is safe to delete a yielded node during the iteration.
@staticmethod
- def _props(node):
+ def _props(node: Any) -> Iterable[Tuple[Any, str]]:
if type(node) is collections.OrderedDict:
for key in list(node):
yield from _Parser._props(node[key])
for item_node in node:
yield from _Parser._props(item_node)
- def _trace_type_props(self):
- yield from _Parser._props(self._root_node.config_node['trace']['type'])
+ def _trace_type_props(self) -> Iterable[Tuple[Any, str]]:
+ yield from _Parser._props(self.config_node['trace']['type'])
# Normalize the properties of the configuration node.
#
# This method also applies 1. to the trace node's `environment`
# property.
def _normalize_props(self):
- def normalize_byte_order_prop(parent_node, key):
+ def normalize_byte_order_prop(parent_node: _MapNode, key: str):
node = parent_node[key]
if node in ['be', 'big']:
elif node in ['le', 'little']:
parent_node[key] = 'little-endian'
- trace_node = self._root_node.config_node['trace']
+ trace_node = self.config_node['trace']
trace_type_node = trace_node['type']
prop_name = '$default-byte-order'
'real',
}
- def set_ft_node_byte_order_prop(parent_node, key):
+ def set_ft_node_byte_order_prop(parent_node: _MapNode, key: str):
if key not in parent_node:
return
set_ft_node_byte_order_prop(ft_node, 'element-field-type')
- def set_struct_ft_node_members_byte_order_prop(members_node):
+ def set_struct_ft_node_members_byte_order_prop(members_node: List[_MapNode]):
for member_node in members_node:
member_name, member_node = list(member_node.items())[0]
# Processes the inclusions of the event type node `ev_type_node`,
# returning the effective node.
- def _process_ev_type_node_include(self, ev_type_node):
+ def _process_ev_type_node_include(self, ev_type_node: _MapNode) -> _MapNode:
# Make sure the event type node is valid for the inclusion
# processing stage.
self._schema_validator.validate(ev_type_node, '3/config/event-type-pre-include')
# Processes the inclusions of the stream type node
# `stream_type_node`, returning the effective node.
- def _process_stream_type_node_include(self, stream_type_node):
- def process_children_include(stream_type_node):
+ def _process_stream_type_node_include(self, stream_type_node: _MapNode) -> _MapNode:
+ def process_children_include(stream_type_node: _MapNode):
prop_name = 'event-types'
if prop_name in stream_type_node:
# Processes the inclusions of the clock type node `clk_type_node`,
# returning the effective node.
- def _process_clk_type_node_include(self, clk_type_node):
+ def _process_clk_type_node_include(self, clk_type_node: _MapNode) -> _MapNode:
# Make sure the clock type node is valid for the inclusion
# processing stage.
self._schema_validator.validate(clk_type_node, '3/config/clock-type-pre-include')
# Processes the inclusions of the trace type node `trace_type_node`,
# returning the effective node.
- def _process_trace_type_node_include(self, trace_type_node):
- def process_children_include(trace_type_node):
+ def _process_trace_type_node_include(self, trace_type_node: _MapNode) -> _MapNode:
+ def process_children_include(trace_type_node: _MapNode):
prop_name = 'clock-types'
if prop_name in trace_type_node:
# Processes the inclusions of the trace node `trace_node`, returning
# the effective node.
- def _process_trace_node_include(self, trace_node):
- def process_children_include(trace_node):
+ def _process_trace_node_include(self, trace_node: _MapNode) -> _MapNode:
+ def process_children_include(trace_node: _MapNode):
prop_name = 'type'
trace_node[prop_name] = self._process_trace_type_node_include(trace_node[prop_name])
#
# First, make sure the configuration node itself is valid for
# the inclusion processing stage.
- self._schema_validator.validate(self._root_node.config_node, '3/config/config-pre-include')
+ self._schema_validator.validate(self.config_node, '3/config/config-pre-include')
# Process trace node inclusions.
#
# self._process_trace_node_include() returns a new (or the same)
# trace node without any `$include` property in it, recursively.
- self._root_node.config_node['trace'] = self._process_trace_node_include(self._root_node.config_node['trace'])
+ self.config_node['trace'] = self._process_trace_node_include(self.config_node['trace'])
def _parse(self):
# process configuration node inclusions
# At this point, the configuration node must be valid as an
# effective configuration node.
- self._schema_validator.validate(self._root_node.config_node, '3/config/config')
+ self._schema_validator.validate(self.config_node, '3/config/config')
# Normalize properties.
#
self._create_config()
@property
- def config(self):
+ def config(self) -> barectf_config.Configuration:
return self._config
@property
- def config_node(self):
- return self._root_node
+ def config_node(self) -> _MapNode:
+ return typing.cast(barectf_config_parse_common._ConfigNodeV3, self._root_node).config_node
--- /dev/null
+# The MIT License (MIT)
+#
+# Copyright (c) 2020 Philippe Proulx <pproulx@efficios.com>
+#
+# Permission is hereby granted, free of charge, to any person obtaining
+# a copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish,
+# distribute, sublicense, and/or sell copies of the Software, and to
+# permit persons to whom the Software is furnished to do so, subject to
+# the following conditions:
+#
+# The above copyright notice and this permission notice shall be
+# included in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
+# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+import typing
+
+# Semantic aliases for `int` used throughout barectf's APIs.
+#
+# `typing.NewType` makes these distinct types for static type checkers
+# (so an `Index` cannot be passed where a `Count` is expected without a
+# cast), while at runtime each one is a plain `int` with zero overhead.
+Index = typing.NewType('Index', int)
+Count = typing.NewType('Count', int)
+Id = typing.NewType('Id', int)
+Alignment = typing.NewType('Alignment', int)
+VersionNumber = typing.NewType('VersionNumber', int)
+
+# Shorthand for an optional string (internal convenience alias).
+_OptStr = typing.Optional[str]