Add Python type hints
author Philippe Proulx <eeppeliteloop@gmail.com>
Tue, 11 Aug 2020 18:33:35 +0000 (14:33 -0400)
committer Philippe Proulx <eeppeliteloop@gmail.com>
Tue, 11 Aug 2020 18:33:35 +0000 (14:33 -0400)
This patch adds Python type hints to all the modules except
`codegen.py`, `gen.py`, and `tsdl182gen.py`, as it is likely that those
will change significantly in the future.

Mypy 0.782 reports no errors with this patch.

The few errors found while introducing the type hints are fixed as
part of this patch.

`typing.py` is a new module which contains public and private type
aliases, mostly derivatives of `int` that add semantics (index, count,
version number, and so on). The public ones are available from the
`barectf` package itself (`__init__.py`). A `barectf` API user doesn't
need them unless she has static type checking needs; if she does, then
she must use the `barectf` types explicitly, for example:

    import barectf

    clk_type = barectf.ClockType('my_clock',
                                 frequency=barectf.Count(100000))
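
For illustration, here is a minimal sketch of how such aliases could be
defined in the new `typing.py` module (its full contents are not shown
in this excerpt; `typing.NewType` is assumed here, as `config.py`
already uses it for `LogLevel`):

    import typing

    # hypothetical sketch; the actual definitions may differ
    Index = typing.NewType('Index', int)
    Count = typing.NewType('Count', int)
    VersionNumber = typing.NewType('VersionNumber', int)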

Signed-off-by: Philippe Proulx <eeppeliteloop@gmail.com>
barectf/__init__.py
barectf/argpar.py
barectf/cli.py
barectf/config.py
barectf/config_file.py
barectf/config_parse.py
barectf/config_parse_common.py
barectf/config_parse_v2.py
barectf/config_parse_v3.py
barectf/typing.py [new file with mode: 0644]

diff --git a/barectf/__init__.py b/barectf/__init__.py
index 0e2cfada50a6a472f36224f9126323591e0af0f0..0dcfe8f8e5e525754567591286aeaa25d04d545a 100644
@@ -26,6 +26,7 @@ import barectf.version as barectf_version
 import barectf.config as barectf_config
 import barectf.config_file as barectf_config_file
 import barectf.gen as barectf_gen
+import barectf.typing as barectf_typing
 
 
 # version API
@@ -35,6 +36,14 @@ __patch_version__ = barectf_version.__patch_version__
 __version__ = barectf_version.__version__
 
 
+# common typing API
+Index = barectf_typing.Index
+Count = barectf_typing.Count
+Id = barectf_typing.Id
+Alignment = barectf_typing.Alignment
+VersionNumber = barectf_typing.VersionNumber
+
+
 # configuration API
 _ArrayFieldType = barectf_config._ArrayFieldType
 _BitArrayFieldType = barectf_config._BitArrayFieldType
@@ -56,6 +65,7 @@ EnumerationFieldTypeMapping = barectf_config.EnumerationFieldTypeMapping
 EnumerationFieldTypeMappingRange = barectf_config.EnumerationFieldTypeMappingRange
 EnumerationFieldTypeMappings = barectf_config.EnumerationFieldTypeMappings
 EventType = barectf_config.EventType
+LogLevel = barectf_config.LogLevel
 RealFieldType = barectf_config.RealFieldType
 SignedEnumerationFieldType = barectf_config.SignedEnumerationFieldType
 SignedIntegerFieldType = barectf_config.SignedIntegerFieldType
@@ -92,3 +102,4 @@ del barectf_version
 del barectf_config
 del barectf_config_file
 del barectf_gen
+del barectf_typing
diff --git a/barectf/argpar.py b/barectf/argpar.py
index c9fba5e8d03d0488b17d589fcc84258c22872126..8bc7a056fa9988181d9cf59305c5c117dd06a7e1 100644
 # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 
 import re
-import collections
+import typing
+from typing import Optional, List, Iterable
+from barectf.typing import Index, _OptStr
 
 
-__all__ = ['OptDescr', '_OptItem', '_NonOptItem', '_Error', 'parse']
+__all__ = ['OptDescr', '_OptItem', '_NonOptItem', '_Error', 'parse', 'OrigArgs']
+
+
+# types
+OrigArgs = List[str]
 
 
 # Option descriptor.
@@ -36,22 +42,23 @@ class OptDescr:
     #
     # If `has_arg` is `True`, then it is expected that such an option
     # has an argument.
-    def __init__(self, short_name=None, long_name=None, has_arg=False):
+    def __init__(self, short_name: _OptStr = None, long_name: _OptStr = None,
+                 has_arg: bool = False):
         assert short_name is not None or long_name is not None
         self._short_name = short_name
         self._long_name = long_name
         self._has_arg = has_arg
 
     @property
-    def short_name(self):
+    def short_name(self) -> _OptStr:
         return self._short_name
 
     @property
-    def long_name(self):
+    def long_name(self) -> _OptStr:
         return self._long_name
 
     @property
-    def has_arg(self):
+    def has_arg(self) -> Optional[bool]:
         return self._has_arg
 
 
@@ -61,82 +68,85 @@ class _Item:
 
 # Parsed option argument item.
 class _OptItem(_Item):
-    def __init__(self, descr, arg_text=None):
+    def __init__(self, descr: OptDescr, arg_text: _OptStr = None):
         self._descr = descr
         self._arg_text = arg_text
 
     @property
-    def descr(self):
+    def descr(self) -> OptDescr:
         return self._descr
 
     @property
-    def arg_text(self):
+    def arg_text(self) -> _OptStr:
         return self._arg_text
 
 
 # Parsed non-option argument item.
 class _NonOptItem(_Item):
-    def __init__(self, text, orig_arg_index, non_opt_index):
+    def __init__(self, text: str, orig_arg_index: Index, non_opt_index: Index):
         self._text = text
         self._orig_arg_index = orig_arg_index
         self._non_opt_index = non_opt_index
 
     @property
-    def text(self):
+    def text(self) -> str:
         return self._text
 
     @property
-    def orig_arg_index(self):
+    def orig_arg_index(self) -> Index:
         return self._orig_arg_index
 
     @property
-    def non_opt_index(self):
+    def non_opt_index(self) -> Index:
         return self._non_opt_index
 
 
 # Results of parse().
 class _ParseRes:
-    def __init__(self, items, ingested_orig_args, remaining_orig_args):
+    def __init__(self, items: List[_Item], ingested_orig_args: OrigArgs,
+                 remaining_orig_args: OrigArgs):
         self._items = items
         self._ingested_orig_args = ingested_orig_args
         self._remaining_orig_args = remaining_orig_args
 
     @property
-    def items(self):
+    def items(self) -> List[_Item]:
         return self._items
 
     @property
-    def ingested_orig_args(self):
+    def ingested_orig_args(self) -> OrigArgs:
         return self._ingested_orig_args
 
     @property
-    def remaining_orig_args(self):
+    def remaining_orig_args(self) -> OrigArgs:
         return self._remaining_orig_args
 
 
 # Parsing error.
 class _Error(Exception):
-    def __init__(self, orig_arg_index, orig_arg, msg):
+    def __init__(self, orig_arg_index: Index, orig_arg: str, msg: str):
         super().__init__(msg)
         self._orig_arg_index = orig_arg_index
         self._orig_arg = orig_arg
         self._msg = msg
 
     @property
-    def orig_arg_index(self):
+    def orig_arg_index(self) -> Index:
         return self._orig_arg_index
 
     @property
-    def orig_arg(self):
+    def orig_arg(self) -> str:
         return self._orig_arg
 
     @property
-    def msg(self):
+    def msg(self) -> str:
         return self._msg
 
 
 # Results of parse_short_opts() and parse_long_opt(); internal.
-_OptParseRes = collections.namedtuple('_OptParseRes', ['items', 'orig_arg_index_incr'])
+class _OptParseRes(typing.NamedTuple):
+    items: List[_Item]
+    orig_arg_index_incr: int
 
 
 # Parses the original arguments `orig_args` (list of strings),
@@ -227,11 +237,13 @@ _OptParseRes = collections.namedtuple('_OptParseRes', ['items', 'orig_arg_index_
 # resulting option items.
 #
 # On failure, this function raises an `_Error` object.
-def parse(orig_args, opt_descrs, fail_on_unknown_opt=True):
+def parse(orig_args: OrigArgs, opt_descrs: Iterable[OptDescr],
+          fail_on_unknown_opt: bool = True) -> _ParseRes:
     # Finds and returns an option description amongst `opt_descrs`
     # having the short option name `short_name` OR the long option name
     # `long_name` (not both).
-    def find_opt_descr(short_name=None, long_name=None):
+    def find_opt_descr(short_name: _OptStr = None,
+                       long_name: _OptStr = None) -> Optional[OptDescr]:
         for opt_descr in opt_descrs:
             if short_name is not None and short_name == opt_descr.short_name:
                 return opt_descr
@@ -239,6 +251,8 @@ def parse(orig_args, opt_descrs, fail_on_unknown_opt=True):
             if long_name is not None and long_name == opt_descr.long_name:
                 return opt_descr
 
+        return None
+
     # Parses a short option original argument, returning an
     # `_OptParseRes` object.
     #
@@ -261,9 +275,9 @@ def parse(orig_args, opt_descrs, fail_on_unknown_opt=True):
     # If any of the short options of `orig_arg` is unknown, then this
     # function raises an error if `fail_on_unknown_opt` is `True`, or
     # returns `None` otherwise.
-    def parse_short_opts():
+    def parse_short_opts() -> Optional[_OptParseRes]:
         short_opts = orig_arg[1:]
-        items = []
+        items: List[_Item] = []
         done = False
         index = 0
         orig_arg_index_incr = 1
@@ -278,7 +292,7 @@ def parse(orig_args, opt_descrs, fail_on_unknown_opt=True):
                     raise _Error(orig_arg_index, orig_arg, f'Unknown short option `-{short_opt}`')
 
                 # discard collected arguments
-                return
+                return None
 
             opt_arg = None
 
@@ -324,7 +338,7 @@ def parse(orig_args, opt_descrs, fail_on_unknown_opt=True):
     #
     # If the long option is unknown, then this function raises an error
     # if `fail_on_unknown_opt` is `True`, or returns `None` otherwise.
-    def parse_long_opt():
+    def parse_long_opt() -> Optional[_OptParseRes]:
         long_opt = orig_arg[2:]
         m = re.match(r'--([^=]+)=(.*)', orig_arg)
 
@@ -340,7 +354,7 @@ def parse(orig_args, opt_descrs, fail_on_unknown_opt=True):
                 raise _Error(orig_arg_index, orig_arg, f'Unknown long option `--{long_opt}`')
 
             # discard
-            return
+            return None
 
         orig_arg_index_incr = 1
 
@@ -361,9 +375,9 @@ def parse(orig_args, opt_descrs, fail_on_unknown_opt=True):
         return _OptParseRes([item], orig_arg_index_incr)
 
     # parse original arguments
-    items = []
-    orig_arg_index = 0
-    non_opt_index = 0
+    items: List[_Item] = []
+    orig_arg_index = Index(0)
+    non_opt_index = Index(0)
 
     while orig_arg_index < len(orig_args):
         orig_arg = orig_args[orig_arg_index]
@@ -378,7 +392,7 @@ def parse(orig_args, opt_descrs, fail_on_unknown_opt=True):
             # option
             if orig_arg[1] == '-':
                 if orig_arg == '--':
-                    raise _Error(orig_arg_index, 'Invalid `--` argument')
+                    raise _Error(orig_arg_index, orig_arg, 'Invalid `--` argument')
 
                 # long option
                 res = parse_long_opt()
@@ -392,11 +406,11 @@ def parse(orig_args, opt_descrs, fail_on_unknown_opt=True):
                 return _ParseRes(items, orig_args[:orig_arg_index], orig_args[orig_arg_index:])
 
             items += res.items
-            orig_arg_index += res.orig_arg_index_incr
+            orig_arg_index = Index(orig_arg_index + res.orig_arg_index_incr)
         else:
             # non-option
             items.append(_NonOptItem(orig_arg, orig_arg_index, non_opt_index))
-            non_opt_index += 1
-            orig_arg_index += 1
+            non_opt_index = Index(non_opt_index + 1)
+            orig_arg_index = Index(orig_arg_index + 1)
 
     return _ParseRes(items, orig_args, [])
diff --git a/barectf/cli.py b/barectf/cli.py
index 3b21ec4df7f6584fb34587be9a9fafc0234bad5d..f8329873744e0b8ef5be9870fb678f624dbed837 100644
@@ -29,13 +29,16 @@ import os.path
 import barectf
 import barectf.config_parse_common as barectf_config_parse_common
 import barectf.argpar as barectf_argpar
+from typing import Any, List, Iterable, NoReturn
+import typing
+from barectf.typing import Index, Count
 import sys
 import os
 
 
 # Colors and prints the error message `msg` and exits with status code
 # 1.
-def _print_error(msg):
+def _print_error(msg: str) -> NoReturn:
     termcolor.cprint('Error: ', 'red', end='', file=sys.stderr)
     termcolor.cprint(msg, 'red', attrs=['bold'], file=sys.stderr)
     sys.exit(1)
@@ -43,7 +46,7 @@ def _print_error(msg):
 
 # Pretty-prints the barectf configuration error `exc` and exits with
 # status code 1.
-def _print_config_error(exc):
+def _print_config_error(exc: barectf._ConfigurationParseError) -> NoReturn:
     # reverse: most precise message comes last
     for ctx in reversed(exc.context):
         msg = ''
@@ -60,7 +63,7 @@ def _print_config_error(exc):
 
 
 # Pretty-prints the unknown exception `exc`.
-def _print_unknown_exc(exc):
+def _print_unknown_exc(exc: Exception) -> NoReturn:
     import traceback
 
     traceback.print_exc()
@@ -69,12 +72,16 @@ def _print_unknown_exc(exc):
 
 # Finds and returns all the option items in `items` having the long name
 # `long_name`.
-def _find_opt_items(items, long_name):
-    ret_items = []
+def _find_opt_items(items: Iterable[barectf_argpar._Item],
+                    long_name: str) -> List[barectf_argpar._OptItem]:
+    ret_items: List[barectf_argpar._OptItem] = []
 
     for item in items:
-        if type(item) is barectf_argpar._OptItem and item.descr.long_name == long_name:
-            ret_items.append(item)
+        if type(item) is barectf_argpar._OptItem:
+            item = typing.cast(barectf_argpar._OptItem, item)
+
+            if item.descr.long_name == long_name:
+                ret_items.append(item)
 
     return ret_items
 
@@ -91,7 +98,8 @@ def _find_opt_items(items, long_name):
 # `items`.
 #
 # Returns `default` if there's no such option item.
-def _opt_item_val(items, long_name, default=None):
+def _opt_item_val(items: Iterable[barectf_argpar._Item], long_name: str,
+                  default: Any = None) -> Any:
     opt_items = _find_opt_items(items, long_name)
 
     if len(opt_items) == 0:
@@ -109,7 +117,7 @@ class _CliError(Exception):
     pass
 
 
-def _cfg_file_path_from_parse_res(parse_res):
+def _cfg_file_path_from_parse_res(parse_res: barectf_argpar._ParseRes) -> str:
     cfg_file_path = None
 
     for item in parse_res.items:
@@ -117,7 +125,7 @@ def _cfg_file_path_from_parse_res(parse_res):
             if cfg_file_path is not None:
                 raise _CliError('Multiple configuration file paths provided')
 
-            cfg_file_path = item.text
+            cfg_file_path = typing.cast(barectf_argpar._NonOptItem, item).text
 
     if cfg_file_path is None:
         raise _CliError('Missing configuration file path')
@@ -130,12 +138,13 @@ def _cfg_file_path_from_parse_res(parse_res):
 
 # Returns a `_CfgCmdCfg` object from the command-line parsing results
 # `parse_res`.
-def _cfg_cmd_cfg_from_parse_res(parse_res):
+def _cfg_cmd_cfg_from_parse_res(parse_res: barectf_argpar._ParseRes) -> '_CfgCmdCfg':
     # check configuration file path
     cfg_file_path = _cfg_file_path_from_parse_res(parse_res)
 
-    # inclusion directories
-    inclusion_dirs = [item.arg_text for item in _find_opt_items(parse_res.items, 'include-dir')]
+    # inclusion directories (`--include-dir` option needs an argument)
+    inclusion_dirs = typing.cast(List[str],
+                                 [item.arg_text for item in _find_opt_items(parse_res.items, 'include-dir')])
 
     for dir in inclusion_dirs:
         if not os.path.isdir(dir):
@@ -175,7 +184,7 @@ Options:
 
 # Returns a source and metadata stream file generating command object
 # from the specific command-line arguments `orig_args`.
-def _gen_cmd_cfg_from_args(orig_args):
+def _gen_cmd_cfg_from_args(orig_args: barectf_argpar.OrigArgs) -> '_GenCmd':
     # parse original arguments
     opt_descrs = [
         barectf_argpar.OptDescr('h', 'help'),
@@ -237,7 +246,7 @@ Options:
 
 # Returns an effective configuration showing command object from the
 # specific command-line arguments `orig_args`.
-def _show_effective_cfg_cfg_from_args(orig_args):
+def _show_effective_cfg_cfg_from_args(orig_args: barectf_argpar.OrigArgs) -> '_ShowEffectiveCfgCmd':
     # parse original arguments
     opt_descrs = [
         barectf_argpar.OptDescr('h', 'help'),
@@ -270,7 +279,7 @@ def _show_effective_cfg_cfg_from_args(orig_args):
     return _ShowEffectiveCfgCmd(_ShowEffectiveCfgCmdCfg(cfg_cmd_cfg.cfg_file_path,
                                                         cfg_cmd_cfg.inclusion_dirs,
                                                         cfg_cmd_cfg.ignore_inclusion_file_not_found,
-                                                        indent_space_count))
+                                                        Count(indent_space_count)))
 
 
 def _show_cfg_version_cmd_usage():
@@ -285,7 +294,7 @@ Options:
 
 # Returns a configuration version showing command object from the
 # specific command-line arguments `orig_args`.
-def _show_cfg_version_cfg_from_args(orig_args):
+def _show_cfg_version_cfg_from_args(orig_args: barectf_argpar.OrigArgs) -> '_ShowCfgVersionCmd':
     # parse original arguments
     opt_descrs = [
         barectf_argpar.OptDescr('h', 'help'),
@@ -336,7 +345,7 @@ Run `barectf COMMAND --help` to show the help of COMMAND.''')
 # Returns a command object from the command-line arguments `orig_args`.
 #
 # All the `orig_args` elements are considered.
-def _cmd_from_args(orig_args):
+def _cmd_from_args(orig_args: barectf_argpar.OrigArgs) -> '_Cmd':
     # We use our `argpar` module here instead of Python's `argparse`
     # because we need to support the two following use cases:
     #
@@ -363,23 +372,24 @@ def _cmd_from_args(orig_args):
         'show-config-version': _show_cfg_version_cfg_from_args,
         'show-cfg-version': _show_cfg_version_cfg_from_args,
     }
-    general_opt_items = []
+    general_opt_items: List[barectf_argpar._OptItem] = []
     cmd_first_orig_arg_index = None
     cmd_from_args_func = None
 
     for item in res.items:
         if type(item) is barectf_argpar._NonOptItem:
+            item = typing.cast(barectf_argpar._NonOptItem, item)
             cmd_from_args_func = cmd_from_args_funcs.get(item.text)
 
             if cmd_from_args_func is None:
                 cmd_first_orig_arg_index = item.orig_arg_index
             else:
-                cmd_first_orig_arg_index = item.orig_arg_index + 1
+                cmd_first_orig_arg_index = Index(item.orig_arg_index + 1)
 
             break
         else:
             assert type(item) is barectf_argpar._OptItem
-            general_opt_items.append(item)
+            general_opt_items.append(typing.cast(barectf_argpar._OptItem, item))
 
     # general help?
     if len(_find_opt_items(general_opt_items, 'help')) > 0:
@@ -406,30 +416,31 @@ class _CmdCfg:
 
 
 class _CfgCmdCfg(_CmdCfg):
-    def __init__(self, cfg_file_path, inclusion_dirs, ignore_inclusion_file_not_found):
+    def __init__(self, cfg_file_path: str, inclusion_dirs: List[str],
+                 ignore_inclusion_file_not_found: bool):
         self._cfg_file_path = cfg_file_path
         self._inclusion_dirs = inclusion_dirs
         self._ignore_inclusion_file_not_found = ignore_inclusion_file_not_found
 
     @property
-    def cfg_file_path(self):
+    def cfg_file_path(self) -> str:
         return self._cfg_file_path
 
     @property
-    def inclusion_dirs(self):
+    def inclusion_dirs(self) -> List[str]:
         return self._inclusion_dirs
 
     @property
-    def ignore_inclusion_file_not_found(self):
+    def ignore_inclusion_file_not_found(self) -> bool:
         return self._ignore_inclusion_file_not_found
 
 
 class _Cmd:
-    def __init__(self, cfg):
+    def __init__(self, cfg: _CmdCfg):
         self._cfg = cfg
 
     @property
-    def cfg(self):
+    def cfg(self) -> _CmdCfg:
         return self._cfg
 
     def exec(self):
@@ -437,8 +448,9 @@ class _Cmd:
 
 
 class _GenCmdCfg(_CfgCmdCfg):
-    def __init__(self, cfg_file_path, c_source_dir, c_header_dir, metadata_stream_dir,
-                 inclusion_dirs, ignore_inclusion_file_not_found, dump_config, v2_prefix):
+    def __init__(self, cfg_file_path: str, c_source_dir: str, c_header_dir: str,
+                 metadata_stream_dir: str, inclusion_dirs: List[str],
+                 ignore_inclusion_file_not_found: bool, dump_config: bool, v2_prefix: str):
         super().__init__(cfg_file_path, inclusion_dirs, ignore_inclusion_file_not_found)
         self._c_source_dir = c_source_dir
         self._c_header_dir = c_header_dir
@@ -447,23 +459,23 @@ class _GenCmdCfg(_CfgCmdCfg):
         self._v2_prefix = v2_prefix
 
     @property
-    def c_source_dir(self):
+    def c_source_dir(self) -> str:
         return self._c_source_dir
 
     @property
-    def c_header_dir(self):
+    def c_header_dir(self) -> str:
         return self._c_header_dir
 
     @property
-    def metadata_stream_dir(self):
+    def metadata_stream_dir(self) -> str:
         return self._metadata_stream_dir
 
     @property
-    def dump_config(self):
+    def dump_config(self) -> bool:
         return self._dump_config
 
     @property
-    def v2_prefix(self):
+    def v2_prefix(self) -> str:
         return self._v2_prefix
 
 
@@ -531,13 +543,13 @@ class _GenCmd(_Cmd):
 
 
 class _ShowEffectiveCfgCmdCfg(_CfgCmdCfg):
-    def __init__(self, cfg_file_path, inclusion_dirs, ignore_inclusion_file_not_found,
-                 indent_space_count):
+    def __init__(self, cfg_file_path: str, inclusion_dirs: List[str],
+                 ignore_inclusion_file_not_found: bool, indent_space_count: Count):
         super().__init__(cfg_file_path, inclusion_dirs, ignore_inclusion_file_not_found)
         self._indent_space_count = indent_space_count
 
     @property
-    def indent_space_count(self):
+    def indent_space_count(self) -> Count:
         return self._indent_space_count
 
 
@@ -556,11 +568,11 @@ class _ShowEffectiveCfgCmd(_Cmd):
 
 
 class _ShowCfgVersionCmdCfg(_CmdCfg):
-    def __init__(self, cfg_file_path):
+    def __init__(self, cfg_file_path: str):
         self._cfg_file_path = cfg_file_path
 
     @property
-    def cfg_file_path(self):
+    def cfg_file_path(self) -> str:
         return self._cfg_file_path
 
 
diff --git a/barectf/config.py b/barectf/config.py
index 319d28c6e7cf7fcb95ac6f7809a4891c68ad4d94..edf1bfe217a421a5075ca2e143c0a635b5350533 100644
 # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 
 import barectf.version as barectf_version
+from typing import Optional, Any, FrozenSet, Mapping, Iterator, Set, Union
+import typing
+from barectf.typing import Count, Alignment, _OptStr, Id
 import collections.abc
 import collections
 import datetime
 import enum
+import uuid as uuidp
 
 
 @enum.unique
@@ -36,26 +40,27 @@ class ByteOrder(enum.Enum):
 
 class _FieldType:
     @property
-    def alignment(self):
+    def alignment(self) -> Alignment:
         raise NotImplementedError
 
 
 class _BitArrayFieldType(_FieldType):
-    def __init__(self, size, byte_order=None, alignment=1):
+    def __init__(self, size: Count, byte_order: Optional[ByteOrder] = None,
+                 alignment: Alignment = Alignment(1)):
         self._size = size
         self._byte_order = byte_order
         self._alignment = alignment
 
     @property
-    def size(self):
+    def size(self) -> Count:
         return self._size
 
     @property
-    def byte_order(self):
+    def byte_order(self) -> Optional[ByteOrder]:
         return self._byte_order
 
     @property
-    def alignment(self):
+    def alignment(self) -> Alignment:
         return self._alignment
 
 
@@ -67,18 +72,19 @@ class DisplayBase(enum.Enum):
 
 
 class _IntegerFieldType(_BitArrayFieldType):
-    def __init__(self, size, byte_order=None, alignment=None,
-                 preferred_display_base=DisplayBase.DECIMAL):
+    def __init__(self, size: Count, byte_order: Optional[ByteOrder] = None,
+                 alignment: Optional[Alignment] = None,
+                 preferred_display_base: DisplayBase = DisplayBase.DECIMAL):
         effective_alignment = 1
 
         if alignment is None and size % 8 == 0:
             effective_alignment = 8
 
-        super().__init__(size, byte_order, effective_alignment)
+        super().__init__(size, byte_order, Alignment(effective_alignment))
         self._preferred_display_base = preferred_display_base
 
     @property
-    def preferred_display_base(self):
+    def preferred_display_base(self) -> DisplayBase:
         return self._preferred_display_base
 
 
@@ -93,60 +99,65 @@ class SignedIntegerFieldType(_IntegerFieldType):
 
 
 class EnumerationFieldTypeMappingRange:
-    def __init__(self, lower, upper):
+    def __init__(self, lower: int, upper: int):
         self._lower = lower
         self._upper = upper
 
     @property
-    def lower(self):
+    def lower(self) -> int:
         return self._lower
 
     @property
-    def upper(self):
+    def upper(self) -> int:
         return self._upper
 
-    def __eq__(self, other):
+    def __eq__(self, other: Any) -> bool:
         if type(other) is not type(self):
             return False
 
         return (self._lower, self._upper) == (other._lower, other._upper)
 
-    def __hash__(self):
+    def __hash__(self) -> int:
         return hash((self._lower, self._upper))
 
-    def contains(self, value):
+    def contains(self, value: int) -> bool:
         return self._lower <= value <= self._upper
 
 
 class EnumerationFieldTypeMapping:
-    def __init__(self, ranges):
+    def __init__(self, ranges: Set[EnumerationFieldTypeMappingRange]):
         self._ranges = frozenset(ranges)
 
     @property
-    def ranges(self):
+    def ranges(self) -> FrozenSet[EnumerationFieldTypeMappingRange]:
         return self._ranges
 
-    def ranges_contain_value(self, value):
+    def ranges_contain_value(self, value: int) -> bool:
         return any([rg.contains(value) for rg in self._ranges])
 
 
+_EnumFtMappings = Mapping[str, EnumerationFieldTypeMapping]
+
+
 class EnumerationFieldTypeMappings(collections.abc.Mapping):
-    def __init__(self, mappings):
+    def __init__(self, mappings: _EnumFtMappings):
         self._mappings = {label: mapping for label, mapping in mappings.items()}
 
-    def __getitem__(self, key):
+    def __getitem__(self, key: str) -> EnumerationFieldTypeMapping:
         return self._mappings[key]
 
-    def __iter__(self):
+    def __iter__(self) -> Iterator[str]:
         return iter(self._mappings)
 
-    def __len__(self):
+    def __len__(self) -> int:
         return len(self._mappings)
 
 
 class _EnumerationFieldType(_IntegerFieldType):
-    def __init__(self, size, byte_order=None, alignment=None,
-                 preferred_display_base=DisplayBase.DECIMAL, mappings=None):
+    def __init__(self, size: Count, byte_order: Optional[ByteOrder] = None,
+                 alignment: Optional[Alignment] = None,
+                 preferred_display_base: DisplayBase = DisplayBase.DECIMAL,
+                 mappings: Optional[_EnumFtMappings] = None):
         super().__init__(size, byte_order, alignment, preferred_display_base)
         self._mappings = EnumerationFieldTypeMappings({})
 
@@ -154,10 +165,10 @@ class _EnumerationFieldType(_IntegerFieldType):
             self._mappings = EnumerationFieldTypeMappings(mappings)
 
     @property
-    def mappings(self):
+    def mappings(self) -> EnumerationFieldTypeMappings:
         return self._mappings
 
-    def labels_for_value(self, value):
+    def labels_for_value(self, value: int) -> Set[str]:
         labels = set()
 
         for label, mapping in self._mappings.items():
@@ -181,62 +192,66 @@ class RealFieldType(_BitArrayFieldType):
 
 class StringFieldType(_FieldType):
     @property
-    def alignment(self):
-        return 8
+    def alignment(self) -> Alignment:
+        return Alignment(8)
 
 
 class _ArrayFieldType(_FieldType):
-    def __init__(self, element_field_type):
+    def __init__(self, element_field_type: _FieldType):
         self._element_field_type = element_field_type
 
     @property
-    def element_field_type(self):
+    def element_field_type(self) -> _FieldType:
         return self._element_field_type
 
     @property
-    def alignment(self):
+    def alignment(self) -> Alignment:
         return self._element_field_type.alignment
 
 
 class StaticArrayFieldType(_ArrayFieldType):
-    def __init__(self, length, element_field_type):
+    def __init__(self, length: Count, element_field_type: _FieldType):
         super().__init__(element_field_type)
         self._length = length
 
     @property
-    def length(self):
+    def length(self) -> Count:
         return self._length
 
 
 class StructureFieldTypeMember:
-    def __init__(self, field_type):
+    def __init__(self, field_type: _FieldType):
         self._field_type = field_type
 
     @property
-    def field_type(self):
+    def field_type(self) -> _FieldType:
         return self._field_type
 
 
+_StructFtMembers = Mapping[str, StructureFieldTypeMember]
+
+
 class StructureFieldTypeMembers(collections.abc.Mapping):
-    def __init__(self, members):
+    def __init__(self, members: _StructFtMembers):
         self._members = collections.OrderedDict()
 
         for name, member in members.items():
             assert type(member) is StructureFieldTypeMember
             self._members[name] = member
 
-    def __getitem__(self, key):
+    def __getitem__(self, key: str) -> StructureFieldTypeMember:
         return self._members[key]
 
-    def __iter__(self):
+    def __iter__(self) -> Iterator[str]:
         return iter(self._members)
 
-    def __len__(self):
+    def __len__(self) -> int:
         return len(self._members)
 
 
 class StructureFieldType(_FieldType):
-    def __init__(self, minimum_alignment=1, members=None):
+    def __init__(self, minimum_alignment: Alignment = Alignment(1),
+                 members: Optional[_StructFtMembers] = None):
         self._minimum_alignment = minimum_alignment
         self._members = StructureFieldTypeMembers({})
 
@@ -246,87 +261,98 @@ class StructureFieldType(_FieldType):
         self._set_alignment()
 
     def _set_alignment(self):
-        self._alignment = self._minimum_alignment
+        self._alignment: Alignment = self._minimum_alignment
 
         for member in self._members.values():
             if member.field_type.alignment > self._alignment:
                 self._alignment = member.field_type.alignment
 
     @property
-    def minimum_alignment(self):
+    def minimum_alignment(self) -> Alignment:
         return self._minimum_alignment
 
     @property
-    def alignment(self):
+    def alignment(self) -> Alignment:
         return self._alignment
 
     @property
-    def members(self):
+    def members(self) -> StructureFieldTypeMembers:
         return self._members
 
 
 class _UniqueByName:
-    def __eq__(self, other):
+    _name: str
+
+    def __eq__(self, other: Any) -> bool:
         if type(other) is not type(self):
             return False
 
         return self._name == other._name
 
-    def __lt__(self, other):
+    def __lt__(self, other: '_UniqueByName'):
         assert type(self) is type(other)
         return self._name < other._name
 
-    def __hash__(self):
+    def __hash__(self) -> int:
         return hash(self._name)
 
 
+_OptFt = Optional[_FieldType]
+_OptStructFt = Optional[StructureFieldType]
+LogLevel = typing.NewType('LogLevel', int)
+
+
 class EventType(_UniqueByName):
-    def __init__(self, name, log_level=None, specific_context_field_type=None,
-                 payload_field_type=None):
-        self._id = None
+    def __init__(self, name: str, log_level: Optional[LogLevel] = None,
+                 specific_context_field_type: _OptStructFt = None, payload_field_type: _OptStructFt = None):
+        self._id: Optional[Id] = None
         self._name = name
         self._log_level = log_level
         self._specific_context_field_type = specific_context_field_type
         self._payload_field_type = payload_field_type
 
     @property
-    def id(self):
+    def id(self) -> Optional[Id]:
         return self._id
 
     @property
-    def name(self):
+    def name(self) -> str:
         return self._name
 
     @property
-    def log_level(self):
+    def log_level(self) -> Optional[LogLevel]:
         return self._log_level
 
     @property
-    def specific_context_field_type(self):
+    def specific_context_field_type(self) -> _OptStructFt:
         return self._specific_context_field_type
 
     @property
-    def payload_field_type(self):
+    def payload_field_type(self) -> _OptStructFt:
         return self._payload_field_type
 
 
 class ClockTypeOffset:
-    def __init__(self, seconds=0, cycles=0):
+    def __init__(self, seconds: int = 0, cycles: Count = Count(0)):
         self._seconds = seconds
         self._cycles = cycles
 
     @property
-    def seconds(self):
+    def seconds(self) -> int:
         return self._seconds
 
     @property
-    def cycles(self):
+    def cycles(self) -> Count:
         return self._cycles
 
 
+_OptUuid = Optional[uuidp.UUID]
+
+
 class ClockType(_UniqueByName):
-    def __init__(self, name, frequency=int(1e9), uuid=None, description=None, precision=0,
-                 offset=None, origin_is_unix_epoch=False):
+    def __init__(self, name: str, frequency: Count = Count(int(1e9)), uuid: _OptUuid = None,
+                 description: _OptStr = None, precision: Count = Count(0),
+                 offset: Optional[ClockTypeOffset] = None, origin_is_unix_epoch: bool = False):
         self._name = name
         self._frequency = frequency
         self._uuid = uuid
@@ -340,46 +366,51 @@ class ClockType(_UniqueByName):
         self._origin_is_unix_epoch = origin_is_unix_epoch
 
     @property
-    def name(self):
+    def name(self) -> str:
         return self._name
 
     @property
-    def frequency(self):
+    def frequency(self) -> Count:
         return self._frequency
 
     @property
-    def uuid(self):
+    def uuid(self) -> _OptUuid:
         return self._uuid
 
     @property
-    def description(self):
+    def description(self) -> _OptStr:
         return self._description
 
     @property
-    def precision(self):
+    def precision(self) -> Count:
         return self._precision
 
     @property
-    def offset(self):
+    def offset(self) -> ClockTypeOffset:
         return self._offset
 
     @property
-    def origin_is_unix_epoch(self):
+    def origin_is_unix_epoch(self) -> bool:
         return self._origin_is_unix_epoch
 
 
 DEFAULT_FIELD_TYPE = 'default'
+_DefaultableUIntFt = Union[str, UnsignedIntegerFieldType]
+_OptDefaultableUIntFt = Optional[_DefaultableUIntFt]
+_OptUIntFt = Optional[UnsignedIntegerFieldType]
 
 
 class StreamTypePacketFeatures:
-    def __init__(self, total_size_field_type=DEFAULT_FIELD_TYPE,
-                 content_size_field_type=DEFAULT_FIELD_TYPE, beginning_time_field_type=None,
-                 end_time_field_type=None, discarded_events_counter_field_type=None):
-        def get_ft(user_ft):
+    def __init__(self, total_size_field_type: _DefaultableUIntFt = DEFAULT_FIELD_TYPE,
+                 content_size_field_type: _DefaultableUIntFt = DEFAULT_FIELD_TYPE,
+                 beginning_time_field_type: _OptDefaultableUIntFt = None,
+                 end_time_field_type: _OptDefaultableUIntFt = None,
+                 discarded_events_counter_field_type: _OptDefaultableUIntFt = None):
+        def get_ft(user_ft: _OptDefaultableUIntFt) -> _OptUIntFt:
             if user_ft == DEFAULT_FIELD_TYPE:
                 return UnsignedIntegerFieldType(64)
 
-            return user_ft
+            return typing.cast(_OptUIntFt, user_ft)
 
         self._total_size_field_type = get_ft(total_size_field_type)
         self._content_size_field_type = get_ft(content_size_field_type)
@@ -388,48 +419,50 @@ class StreamTypePacketFeatures:
         self._discarded_events_counter_field_type = get_ft(discarded_events_counter_field_type)
 
     @property
-    def total_size_field_type(self):
+    def total_size_field_type(self) -> _OptUIntFt:
         return self._total_size_field_type
 
     @property
-    def content_size_field_type(self):
+    def content_size_field_type(self) -> _OptUIntFt:
         return self._content_size_field_type
 
     @property
-    def beginning_time_field_type(self):
+    def beginning_time_field_type(self) -> _OptUIntFt:
         return self._beginning_time_field_type
 
     @property
-    def end_time_field_type(self):
+    def end_time_field_type(self) -> _OptUIntFt:
         return self._end_time_field_type
 
     @property
-    def discarded_events_counter_field_type(self):
+    def discarded_events_counter_field_type(self) -> _OptUIntFt:
         return self._discarded_events_counter_field_type
 
 
 class StreamTypeEventFeatures:
-    def __init__(self, type_id_field_type=DEFAULT_FIELD_TYPE, time_field_type=None):
-        def get_ft(user_field_type):
-            if user_field_type == DEFAULT_FIELD_TYPE:
+    def __init__(self, type_id_field_type: _OptDefaultableUIntFt = DEFAULT_FIELD_TYPE,
+                 time_field_type: _OptDefaultableUIntFt = None):
+        def get_ft(user_ft: _OptDefaultableUIntFt) -> _OptUIntFt:
+            if user_ft == DEFAULT_FIELD_TYPE:
                 return UnsignedIntegerFieldType(64)
 
-            return user_field_type
+            return typing.cast(_OptUIntFt, user_ft)
 
         self._type_id_field_type = get_ft(type_id_field_type)
         self._time_field_type = get_ft(time_field_type)
 
     @property
-    def type_id_field_type(self):
+    def type_id_field_type(self) -> _OptUIntFt:
         return self._type_id_field_type
 
     @property
-    def time_field_type(self):
+    def time_field_type(self) -> _OptUIntFt:
         return self._time_field_type
 
 
 class StreamTypeFeatures:
-    def __init__(self, packet_features=None, event_features=None):
+    def __init__(self, packet_features: Optional[StreamTypePacketFeatures] = None,
+                 event_features: Optional[StreamTypeEventFeatures] = None):
         self._packet_features = StreamTypePacketFeatures()
 
         if packet_features is not None:
@@ -441,19 +474,21 @@ class StreamTypeFeatures:
             self._event_features = event_features
 
     @property
-    def packet_features(self):
+    def packet_features(self) -> StreamTypePacketFeatures:
         return self._packet_features
 
     @property
-    def event_features(self):
+    def event_features(self) -> StreamTypeEventFeatures:
         return self._event_features
 
 
 class StreamType(_UniqueByName):
-    def __init__(self, name, event_types, default_clock_type=None, features=None,
-                 packet_context_field_type_extra_members=None,
-                 event_common_context_field_type=None):
-        self._id = None
+    def __init__(self, name: str, event_types: Set[EventType],
+                 default_clock_type: Optional[ClockType] = None,
+                 features: Optional[StreamTypeFeatures] = None,
+                 packet_context_field_type_extra_members: Optional[_StructFtMembers] = None,
+                 event_common_context_field_type: _OptStructFt = None):
+        self._id: Optional[Id] = None
         self._name = name
         self._default_clock_type = default_clock_type
         self._event_common_context_field_type = event_common_context_field_type
@@ -462,7 +497,7 @@ class StreamType(_UniqueByName):
         # assign unique IDs
         for index, ev_type in enumerate(sorted(self._event_types, key=lambda evt: evt.name)):
             assert ev_type._id is None
-            ev_type._id = index
+            ev_type._id = Id(index)
 
         self._set_features(features)
         self._packet_context_field_type_extra_members = StructureFieldTypeMembers({})
@@ -473,10 +508,10 @@ class StreamType(_UniqueByName):
         self._set_pkt_ctx_ft()
         self._set_ev_header_ft()
 
-    def _set_features(self, features):
+    def _set_features(self, features: Optional[StreamTypeFeatures]):
         if features is not None:
             self._features = features
-            return
+            return None
 
         ev_time_ft = None
         pkt_beginning_time_ft = None
@@ -493,7 +528,7 @@ class StreamType(_UniqueByName):
                                                                      end_time_field_type=pkt_end_time_ft),
                                             StreamTypeEventFeatures(time_field_type=ev_time_ft))
 
-    def _set_ft_mapped_clk_type_name(self, ft):
+    def _set_ft_mapped_clk_type_name(self, ft: Optional[UnsignedIntegerFieldType]):
         if ft is None:
             return
 
@@ -502,12 +537,14 @@ class StreamType(_UniqueByName):
             ft._mapped_clk_type_name = self._default_clock_type.name
 
     def _set_pkt_ctx_ft(self):
-        def add_member_if_exists(name, ft, set_mapped_clk_type_name=False):
+        members = None
+
+        def add_member_if_exists(name: str, ft: _FieldType, set_mapped_clk_type_name: bool = False):
             nonlocal members
 
             if ft is not None:
                 if set_mapped_clk_type_name:
-                    self._set_ft_mapped_clk_type_name(ft)
+                    self._set_ft_mapped_clk_type_name(typing.cast(UnsignedIntegerFieldType, ft))
 
                 members[name] = StructureFieldTypeMember(ft)
 
@@ -550,78 +587,85 @@ class StreamType(_UniqueByName):
         self._ev_header_ft = StructureFieldType(8, members)
 
     @property
-    def id(self):
+    def id(self) -> Optional[Id]:
         return self._id
 
     @property
-    def name(self):
+    def name(self) -> str:
         return self._name
 
     @property
-    def default_clock_type(self):
+    def default_clock_type(self) -> Optional[ClockType]:
         return self._default_clock_type
 
     @property
-    def features(self):
+    def features(self) -> StreamTypeFeatures:
         return self._features
 
     @property
-    def packet_context_field_type_extra_members(self):
+    def packet_context_field_type_extra_members(self) -> StructureFieldTypeMembers:
         return self._packet_context_field_type_extra_members
 
     @property
-    def event_common_context_field_type(self):
+    def event_common_context_field_type(self) -> _OptStructFt:
         return self._event_common_context_field_type
 
     @property
-    def event_types(self):
+    def event_types(self) -> FrozenSet[EventType]:
         return self._event_types
 
 
+_OptUuidFt = Optional[Union[str, StaticArrayFieldType]]
+
+
 class TraceTypeFeatures:
-    def __init__(self, magic_field_type=DEFAULT_FIELD_TYPE, uuid_field_type=None,
-                 stream_type_id_field_type=DEFAULT_FIELD_TYPE):
-        def get_field_type(user_field_type, default_field_type):
-            if user_field_type == DEFAULT_FIELD_TYPE:
-                return default_field_type
+    def __init__(self, magic_field_type: _OptDefaultableUIntFt = DEFAULT_FIELD_TYPE,
+                 uuid_field_type: _OptUuidFt = None,
+                 stream_type_id_field_type: _OptDefaultableUIntFt = DEFAULT_FIELD_TYPE):
+        def get_field_type(user_ft: Optional[Union[str, _FieldType]], default_ft: _FieldType) -> _OptFt:
+            if user_ft == DEFAULT_FIELD_TYPE:
+                return default_ft
 
-            return user_field_type
+            return typing.cast(_OptFt, user_ft)
 
-        self._magic_field_type = get_field_type(magic_field_type, UnsignedIntegerFieldType(32))
-        self._uuid_field_type = get_field_type(uuid_field_type,
-                                               StaticArrayFieldType(16, UnsignedIntegerFieldType(8)))
-        self._stream_type_id_field_type = get_field_type(stream_type_id_field_type,
-                                                         UnsignedIntegerFieldType(64))
+        self._magic_field_type = typing.cast(_OptUIntFt, get_field_type(magic_field_type,
+                                                                        UnsignedIntegerFieldType(32)))
+        self._uuid_field_type = typing.cast(Optional[StaticArrayFieldType], get_field_type(uuid_field_type,
+                                                                                           StaticArrayFieldType(Count(16),
+                                                                                                                UnsignedIntegerFieldType(8))))
+        self._stream_type_id_field_type = typing.cast(_OptUIntFt, get_field_type(stream_type_id_field_type,
+                                                                                 UnsignedIntegerFieldType(64)))
 
     @property
-    def magic_field_type(self):
+    def magic_field_type(self) -> _OptUIntFt:
         return self._magic_field_type
 
     @property
-    def uuid_field_type(self):
+    def uuid_field_type(self) -> Optional[StaticArrayFieldType]:
         return self._uuid_field_type
 
     @property
-    def stream_type_id_field_type(self):
+    def stream_type_id_field_type(self) -> _OptUIntFt:
         return self._stream_type_id_field_type
 
 
 class TraceType:
-    def __init__(self, stream_types, default_byte_order, uuid=None, features=None):
+    def __init__(self, stream_types: Set[StreamType], default_byte_order: ByteOrder,
+                 uuid: _OptUuid = None, features: Optional[TraceTypeFeatures] = None):
         self._default_byte_order = default_byte_order
         self._stream_types = frozenset(stream_types)
 
         # assign unique IDs
         for index, stream_type in enumerate(sorted(self._stream_types, key=lambda st: st.name)):
             assert stream_type._id is None
-            stream_type._id = index
+            stream_type._id = Id(index)
 
         self._uuid = uuid
         self._set_features(features)
         self._set_pkt_header_ft()
         self._set_fts_effective_byte_order()
 
-    def _set_features(self, features):
+    def _set_features(self, features: Optional[TraceTypeFeatures]):
         if features is not None:
             self._features = features
             return
@@ -631,20 +675,21 @@ class TraceType:
         self._features = TraceTypeFeatures(uuid_field_type=uuid_ft)
 
     def _set_pkt_header_ft(self):
-        def add_member_if_exists(name, field_type):
+        members = collections.OrderedDict()
+
+        def add_member_if_exists(name: str, ft: _OptFt):
             nonlocal members
 
-            if field_type is not None:
-                members[name] = StructureFieldTypeMember(field_type)
+            if ft is not None:
+                members[name] = StructureFieldTypeMember(ft)
 
-        members = collections.OrderedDict()
         add_member_if_exists('magic', self._features.magic_field_type)
         add_member_if_exists('uuid', self._features.uuid_field_type)
         add_member_if_exists('stream_id', self._features.stream_type_id_field_type)
         self._pkt_header_ft = StructureFieldType(8, members)
 
     def _set_fts_effective_byte_order(self):
-        def set_ft_effective_byte_order(ft):
+        def set_ft_effective_byte_order(ft: _OptFt):
             if ft is None:
                 return
 
@@ -673,47 +718,53 @@ class TraceType:
                 set_ft_effective_byte_order(ev_type._payload_field_type)
 
     @property
-    def default_byte_order(self):
+    def default_byte_order(self) -> ByteOrder:
         return self._default_byte_order
 
     @property
-    def uuid(self):
+    def uuid(self) -> _OptUuid:
         return self._uuid
 
     @property
-    def stream_types(self):
+    def stream_types(self) -> FrozenSet[StreamType]:
         return self._stream_types
 
-    def stream_type(self, name):
+    def stream_type(self, name: str) -> Optional[StreamType]:
         for cand_stream_type in self._stream_types:
             if cand_stream_type.name == name:
                 return cand_stream_type
 
+        return None
+
     @property
-    def features(self):
+    def features(self) -> TraceTypeFeatures:
         return self._features
 
 
+_EnvEntry = Union[str, int]
+_EnvEntries = Mapping[str, _EnvEntry]
+
+
 class TraceEnvironment(collections.abc.Mapping):
-    def __init__(self, environment):
+    def __init__(self, environment: _EnvEntries):
         self._env = {name: value for name, value in environment.items()}
 
-    def __getitem__(self, key):
+    def __getitem__(self, key: str) -> _EnvEntry:
         return self._env[key]
 
-    def __iter__(self):
+    def __iter__(self) -> Iterator[str]:
         return iter(self._env)
 
-    def __len__(self):
+    def __len__(self) -> int:
         return len(self._env)
 
 
 class Trace:
-    def __init__(self, type, environment=None):
+    def __init__(self, type: TraceType, environment: Optional[_EnvEntries] = None):
         self._type = type
         self._set_env(environment)
 
-    def _set_env(self, environment):
+    def _set_env(self, environment: Optional[_EnvEntries]):
         init_env = collections.OrderedDict([
             ('domain', 'bare'),
             ('tracer_name', 'barectf'),
@@ -727,49 +778,54 @@ class Trace:
             environment = {}
 
         init_env.update(environment)
-        self._env = TraceEnvironment(init_env)
+        self._env = TraceEnvironment(typing.cast(_EnvEntries, init_env))
 
     @property
-    def type(self):
+    def type(self) -> TraceType:
         return self._type
 
     @property
-    def environment(self):
+    def environment(self) -> TraceEnvironment:
         return self._env
 
 
+_ClkTypeCTypes = Mapping[ClockType, str]
+
+
 class ClockTypeCTypes(collections.abc.Mapping):
-    def __init__(self, c_types):
+    def __init__(self, c_types: _ClkTypeCTypes):
         self._c_types = {clk_type: c_type for clk_type, c_type in c_types.items()}
 
-    def __getitem__(self, key):
+    def __getitem__(self, key: ClockType) -> str:
         return self._c_types[key]
 
-    def __iter__(self):
+    def __iter__(self) -> Iterator[ClockType]:
         return iter(self._c_types)
 
-    def __len__(self):
+    def __len__(self) -> int:
         return len(self._c_types)
 
 
 class ConfigurationCodeGenerationHeaderOptions:
-    def __init__(self, identifier_prefix_definition=False,
-                 default_stream_type_name_definition=False):
+    def __init__(self, identifier_prefix_definition: bool = False,
+                 default_stream_type_name_definition: bool = False):
         self._identifier_prefix_definition = identifier_prefix_definition
         self._default_stream_type_name_definition = default_stream_type_name_definition
 
     @property
-    def identifier_prefix_definition(self):
+    def identifier_prefix_definition(self) -> bool:
         return self._identifier_prefix_definition
 
     @property
-    def default_stream_type_name_definition(self):
+    def default_stream_type_name_definition(self) -> bool:
         return self._default_stream_type_name_definition
 
 
 class ConfigurationCodeGenerationOptions:
-    def __init__(self, identifier_prefix='barectf_', file_name_prefix='barectf',
-                 default_stream_type=None, header_options=None, clock_type_c_types=None):
+    def __init__(self, identifier_prefix: str = 'barectf_', file_name_prefix: str = 'barectf',
+                 default_stream_type: Optional[StreamType] = None,
+                 header_options: Optional[ConfigurationCodeGenerationHeaderOptions] = None,
+                 clock_type_c_types: Optional[_ClkTypeCTypes] = None):
         self._identifier_prefix = identifier_prefix
         self._file_name_prefix = file_name_prefix
         self._default_stream_type = default_stream_type
@@ -785,40 +841,41 @@ class ConfigurationCodeGenerationOptions:
             self._clock_type_c_types = ClockTypeCTypes(clock_type_c_types)
 
     @property
-    def identifier_prefix(self):
+    def identifier_prefix(self) -> str:
         return self._identifier_prefix
 
     @property
-    def file_name_prefix(self):
+    def file_name_prefix(self) -> str:
         return self._file_name_prefix
 
     @property
-    def default_stream_type(self):
+    def default_stream_type(self) -> Optional[StreamType]:
         return self._default_stream_type
 
     @property
-    def header_options(self):
+    def header_options(self) -> ConfigurationCodeGenerationHeaderOptions:
         return self._header_options
 
     @property
-    def clock_type_c_types(self):
+    def clock_type_c_types(self) -> ClockTypeCTypes:
         return self._clock_type_c_types
 
 
 class ConfigurationOptions:
-    def __init__(self, code_generation_options=None):
+    def __init__(self,
+                 code_generation_options: Optional[ConfigurationCodeGenerationOptions] = None):
         self._code_generation_options = ConfigurationCodeGenerationOptions()
 
         if code_generation_options is not None:
             self._code_generation_options = code_generation_options
 
     @property
-    def code_generation_options(self):
+    def code_generation_options(self) -> ConfigurationCodeGenerationOptions:
         return self._code_generation_options
 
 
 class Configuration:
-    def __init__(self, trace, options=None):
+    def __init__(self, trace: Trace, options: Optional[ConfigurationOptions] = None):
         self._trace = trace
         self._options = ConfigurationOptions()
 
@@ -837,9 +894,9 @@ class Configuration:
                 clk_type_c_types._c_types[def_clk_type] = 'uint32_t'
 
     @property
-    def trace(self):
+    def trace(self) -> Trace:
         return self._trace
 
     @property
-    def options(self):
+    def options(self) -> ConfigurationOptions:
         return self._options
diff --git a/barectf/config_file.py b/barectf/config_file.py
index 5090d59f96b87b1de18487c917d84dfc52342172..c5a04d7de8618636408b571b84668696435c4b69 100644
 # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 
 import barectf.config_parse as barectf_config_parse
+import barectf.config as barectf_config
+from barectf.typing import Count, VersionNumber
+from typing import Optional, List, TextIO
 
 
-def effective_configuration_file(file, with_package_inclusion_directory=True,
-                                 inclusion_directories=None, ignore_inclusion_not_found=False,
-                                 indent_space_count=2):
+def effective_configuration_file(file: TextIO, with_package_inclusion_directory: bool = True,
+                                 inclusion_directories: Optional[List[str]] = None,
+                                 ignore_inclusion_not_found: bool = False,
+                                 indent_space_count: Count = Count(2)) -> str:
     if inclusion_directories is None:
         inclusion_directories = []
 
@@ -36,8 +40,9 @@ def effective_configuration_file(file, with_package_inclusion_directory=True,
                                                        indent_space_count)
 
 
-def configuration_from_file(file, with_package_inclusion_directory=True, inclusion_directories=None,
-                            ignore_inclusion_not_found=False):
+def configuration_from_file(file: TextIO, with_package_inclusion_directory: bool = True,
+                            inclusion_directories: Optional[List[str]] = None,
+                            ignore_inclusion_not_found: bool = False) -> barectf_config.Configuration:
     if inclusion_directories is None:
         inclusion_directories = []
 
@@ -45,5 +50,5 @@ def configuration_from_file(file, with_package_inclusion_directory=True, inclusi
                                            inclusion_directories, ignore_inclusion_not_found)
 
 
-def configuration_file_major_version(file):
+def configuration_file_major_version(file: TextIO) -> VersionNumber:
     return barectf_config_parse._config_file_major_version(file)
diff --git a/barectf/config_parse.py b/barectf/config_parse.py
index 4b26baa9b192124e5bd2776bd475d7ac7d1705fd..1f7efdaf11404859e03d4a1454a8f585b2d46a9d 100644
 
 import barectf.config_parse_common as barectf_config_parse_common
 from barectf.config_parse_common import _ConfigurationParseError
+from barectf.config_parse_common import _MapNode
 import barectf.config_parse_v2 as barectf_config_parse_v2
 import barectf.config_parse_v3 as barectf_config_parse_v3
+import barectf.config as barectf_config
 import collections
+from barectf.typing import Count, VersionNumber
+from typing import Optional, List, TextIO
+import typing
 
 
 # Creates and returns a barectf 3 YAML configuration file parser to
 # parse the file-like object `file`.
 #
 # `file` can be a barectf 2 or 3 configuration file.
-def _create_v3_parser(file, with_pkg_include_dir, include_dirs, ignore_include_not_found):
+def _create_v3_parser(file: TextIO, with_pkg_include_dir: bool, include_dirs: Optional[List[str]],
+                      ignore_include_not_found: bool) -> barectf_config_parse_v3._Parser:
     try:
         root_node = barectf_config_parse_common._yaml_load(file)
 
         if type(root_node) is barectf_config_parse_common._ConfigNodeV3:
             # barectf 3 configuration file
-            return barectf_config_parse_v3._Parser(file, root_node, with_pkg_include_dir,
-                                                   include_dirs, ignore_include_not_found)
+            return barectf_config_parse_v3._Parser(file,
+                                                   typing.cast(barectf_config_parse_common._ConfigNodeV3,
+                                                               root_node),
+                                                   with_pkg_include_dir, include_dirs,
+                                                   ignore_include_not_found)
         elif type(root_node) is collections.OrderedDict:
             # barectf 2 configuration file
-            v2_parser = barectf_config_parse_v2._Parser(file, root_node, with_pkg_include_dir,
-                                                        include_dirs, ignore_include_not_found)
+            v2_parser = barectf_config_parse_v2._Parser(file, typing.cast(_MapNode, root_node),
+                                                        with_pkg_include_dir, include_dirs,
+                                                        ignore_include_not_found)
             return barectf_config_parse_v3._Parser(file, v2_parser.config_node,
                                                    with_pkg_include_dir, include_dirs,
                                                    ignore_include_not_found)
@@ -54,13 +64,18 @@ def _create_v3_parser(file, with_pkg_include_dir, include_dirs, ignore_include_n
         barectf_config_parse_common._append_error_ctx(exc, 'Configuration',
                                                       'Cannot create configuration from YAML file')
 
+        # satisfy static type checker (never reached)
+        raise
+
 
-def _from_file(file, with_pkg_include_dir, include_dirs, ignore_include_not_found):
+def _from_file(file: TextIO, with_pkg_include_dir: bool, include_dirs: Optional[List[str]],
+               ignore_include_not_found: bool) -> barectf_config.Configuration:
     return _create_v3_parser(file, with_pkg_include_dir, include_dirs, ignore_include_not_found).config
 
 
-def _effective_config_file(file, with_pkg_include_dir, include_dirs, ignore_include_not_found,
-                           indent_space_count):
+def _effective_config_file(file: TextIO, with_pkg_include_dir: bool,
+                           include_dirs: Optional[List[str]], ignore_include_not_found: bool,
+                           indent_space_count: Count) -> str:
     config_node = _create_v3_parser(file, with_pkg_include_dir, include_dirs,
                                     ignore_include_not_found).config_node
     return barectf_config_parse_common._yaml_dump(config_node, indent=indent_space_count,
@@ -68,15 +83,19 @@ def _effective_config_file(file, with_pkg_include_dir, include_dirs, ignore_incl
                                                   explicit_end=True)
 
 
-def _config_file_major_version(file):
+def _config_file_major_version(file: TextIO) -> VersionNumber:
     try:
         root_node = barectf_config_parse_common._yaml_load(file)
 
         if type(root_node) is barectf_config_parse_common._ConfigNodeV3:
             # barectf 3 configuration file
-            return 3
-        elif type(root_node) is collections.OrderedDict:
+            return VersionNumber(3)
+        else:
             # barectf 2 configuration file
-            return 2
+            assert type(root_node) is collections.OrderedDict
+            return VersionNumber(2)
     except _ConfigurationParseError as exc:
         barectf_config_parse_common._append_error_ctx(exc, 'Configuration', 'Cannot load YAML file')
+
+        # satisfy static type checker (never reached)
+        raise
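The bare `raise` statements added above exist only to satisfy Mypy: _append_error_ctx() always raises, but its signature does not say so. A minimal, self-contained sketch of the same pattern, with illustrative names:

    def _fail(msg: str) -> None:
        # This helper always raises, but its return type does not say so.
        raise RuntimeError(msg)

    def parse_size(text: str) -> int:
        try:
            return int(text)
        except ValueError:
            _fail(f'invalid size: {text}')

            # Never actually reached: without it, Mypy reports a missing
            # return statement because it cannot tell that _fail() never
            # returns normally.
            raise

Annotating such a helper's return type as `typing.NoReturn` would be another way to convey this to the type checker.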
index 06e0c4242a9cdf5a5a91be4774cc8bdf919d9774..e7906940599034ebbd90217a23bf012904d6771c 100644 (file)
 
 import pkg_resources
 import collections
-import jsonschema
+import jsonschema # type: ignore
 import os.path
 import yaml
 import copy
 import os
+from barectf.typing import VersionNumber, _OptStr
+from typing import Optional, List, Dict, Any, TextIO, MutableMapping, Union, Set, Iterable, Callable, Tuple
+import typing
 
 
 # The context of a configuration parsing error.
 #
 # Such a context object has a name and, optionally, a message.
 class _ConfigurationParseErrorContext:
-    def __init__(self, name, message=None):
+    def __init__(self, name: str, message: _OptStr = None):
         self._name = name
         self._msg = message
 
     @property
-    def name(self):
+    def name(self) -> str:
         return self._name
 
     @property
-    def message(self):
+    def message(self) -> _OptStr:
         return self._msg
 
 
-# Appends the context having the object name `obj_name` and the
-# (optional) message `message` to the `_ConfigurationParseError`
-# exception `exc` and then raises `exc` again.
-def _append_error_ctx(exc, obj_name, message=None):
-    exc._append_ctx(obj_name, message)
-    raise exc
-
-
 # A configuration parsing error.
 #
 # Such an error object contains a list of contexts (`context` property).
@@ -68,14 +63,14 @@ def _append_error_ctx(exc, obj_name, message=None):
 class _ConfigurationParseError(Exception):
     def __init__(self, init_ctx_obj_name, init_ctx_msg=None):
         super().__init__()
-        self._ctx = []
+        self._ctx: List[_ConfigurationParseErrorContext] = []
         self._append_ctx(init_ctx_obj_name, init_ctx_msg)
 
     @property
-    def context(self):
+    def context(self) -> List[_ConfigurationParseErrorContext]:
         return self._ctx
 
-    def _append_ctx(self, name, msg=None):
+    def _append_ctx(self, name: str, msg: _OptStr = None):
         self._ctx.append(_ConfigurationParseErrorContext(name, msg))
 
     def __str__(self):
@@ -92,11 +87,19 @@ class _ConfigurationParseError(Exception):
         return '\n'.join(lines)
 
 
+# Appends the context having the object name `obj_name` and the
+# (optional) message `message` to the `_ConfigurationParseError`
+# exception `exc` and then raises `exc` again.
+def _append_error_ctx(exc: _ConfigurationParseError, obj_name: str, message: _OptStr = None):
+    exc._append_ctx(obj_name, message)
+    raise exc
+
+
 _V3Prefixes = collections.namedtuple('_V3Prefixes', ['identifier', 'file_name'])
 
 
 # Converts a v2 prefix to v3 prefixes.
-def _v3_prefixes_from_v2_prefix(v2_prefix):
+def _v3_prefixes_from_v2_prefix(v2_prefix: str) -> _V3Prefixes:
     return _V3Prefixes(v2_prefix, v2_prefix.rstrip('_'))
 
 
@@ -106,10 +109,15 @@ def _v3_prefixes_from_v2_prefix(v2_prefix):
 # This must never happen in barectf because all our schemas are local;
 # it would mean a programming or schema error.
 class _RefResolver(jsonschema.RefResolver):
-    def resolve_remote(self, uri):
+    def resolve_remote(self, uri: str):
         raise RuntimeError(f'Missing local schema with URI `{uri}`')
 
 
+# Not all static type checkers support type recursion, so let's just use
+# `Any` as a map node's value's type.
+_MapNode = MutableMapping[str, Any]
+
+
 # Schema validator which considers all the schemas found in the
 # subdirectories `subdirs` (at build time) of the barectf package's
 # `schemas` directory.
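As a side note on the `_MapNode` alias introduced above, the fully recursive form it approximates would look roughly like the sketch below; Mypy 0.782 rejects such cyclic aliases, hence the `Any`-valued mapping:

    from typing import Any, MutableMapping

    # What `_MapNode` approximates, rejected by Mypy 0.782 as a cyclic alias:
    #
    #     _MapNode = MutableMapping[str, Union[str, int, float, bool, None,
    #                                          List['_MapNode'], '_MapNode']]
    #
    # Accepted approximation (values are simply `Any`):
    _MapNode = MutableMapping[str, Any]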
@@ -117,9 +125,9 @@ class _RefResolver(jsonschema.RefResolver):
 # The only public method is validate() which accepts an instance to
 # validate as well as a schema short ID.
 class _SchemaValidator:
-    def __init__(self, subdirs):
+    def __init__(self, subdirs: Iterable[str]):
         schemas_dir = pkg_resources.resource_filename(__name__, 'schemas')
-        self._store = {}
+        self._store: Dict[str, str] = {}
 
         for subdir in subdirs:
             dir = os.path.join(schemas_dir, subdir)
@@ -155,7 +163,7 @@ class _SchemaValidator:
 
         return dct
 
-    def _validate(self, instance, schema_short_id):
+    def _validate(self, instance: _MapNode, schema_short_id: str):
         # retrieve full schema ID from short ID
         schema_id = f'https://barectf.org/schemas/{schema_short_id}.json'
         assert schema_id in self._store
@@ -187,7 +195,7 @@ class _SchemaValidator:
     #
     # Raises a `_ConfigurationParseError` object, hiding any
     # `jsonschema` exception, on validation failure.
-    def validate(self, instance, schema_short_id):
+    def validate(self, instance: _MapNode, schema_short_id: str):
         try:
             self._validate(instance, schema_short_id)
         except jsonschema.ValidationError as exc:
@@ -234,11 +242,11 @@ class _SchemaValidator:
 
 # barectf 3 YAML configuration node.
 class _ConfigNodeV3:
-    def __init__(self, config_node):
+    def __init__(self, config_node: _MapNode):
         self._config_node = config_node
 
     @property
-    def config_node(self):
+    def config_node(self) -> _MapNode:
         return self._config_node
 
 
@@ -253,11 +261,11 @@ _CONFIG_V3_YAML_TAG = 'tag:barectf.org,2020/3/config'
 # `collections.OrderedDict` object.
 #
 # All YAML maps are loaded as `collections.OrderedDict` objects.
-def _yaml_load(file):
+def _yaml_load(file: TextIO) -> Union[_ConfigNodeV3, _MapNode]:
     class Loader(yaml.Loader):
         pass
 
-    def config_ctor(loader, node):
+    def config_ctor(loader, node) -> _ConfigNodeV3:
         if not isinstance(node, yaml.MappingNode):
             problem = f'Expecting a map for the tag `{node.tag}`'
             raise yaml.constructor.ConstructorError(problem=problem)
@@ -265,7 +273,7 @@ def _yaml_load(file):
         loader.flatten_mapping(node)
         return _ConfigNodeV3(collections.OrderedDict(loader.construct_pairs(node)))
 
-    def mapping_ctor(loader, node):
+    def mapping_ctor(loader, node) -> _MapNode:
         loader.flatten_mapping(node)
         return collections.OrderedDict(loader.construct_pairs(node))
 
@@ -279,7 +287,7 @@ def _yaml_load(file):
         raise _ConfigurationParseError('YAML loader', f'Cannot load file: {exc}')
 
 
-def _yaml_load_path(path):
+def _yaml_load_path(path: str) -> Union[_ConfigNodeV3, _MapNode]:
     with open(path) as f:
         return _yaml_load(f)
 
@@ -287,7 +295,7 @@ def _yaml_load_path(path):
 # Dumps the content of the Python object `obj`
 # (`collections.OrderedDict` or `_ConfigNodeV3`) as a YAML string and
 # returns it.
-def _yaml_dump(node, **kwds):
+def _yaml_dump(node: _MapNode, **kwds) -> str:
     class Dumper(yaml.Dumper):
         pass
 
@@ -311,16 +319,17 @@ def _yaml_dump(node, **kwds):
 # mostly contains helpers.
 class _Parser:
     # Builds a base barectf YAML configuration parser to process the
-    # configuration node `node` (already loaded from the file having the
-    # path `path`).
+    # configuration node `node` (already loaded from the file-like
+    # object `root_file`).
     #
     # For its _process_node_include() method, the parser considers the
     # package inclusion directory as well as `include_dirs`, and ignores
     # nonexistent inclusion files if `ignore_include_not_found` is
     # `True`.
-    def __init__(self, path, node, with_pkg_include_dir, include_dirs, ignore_include_not_found,
-                 major_version):
-        self._root_path = path
+    def __init__(self, root_file: TextIO, node: Union[_MapNode, _ConfigNodeV3],
+                 with_pkg_include_dir: bool, include_dirs: Optional[List[str]],
+                 ignore_include_not_found: bool, major_version: VersionNumber):
+        self._root_file = root_file
         self._root_node = node
         self._ft_prop_names = [
             # barectf 2.1+
@@ -335,35 +344,42 @@ class _Parser:
             'element-field-type',
         ]
 
+        if include_dirs is None:
+            include_dirs = []
+
         self._include_dirs = copy.copy(include_dirs)
 
         if with_pkg_include_dir:
             self._include_dirs.append(pkg_resources.resource_filename(__name__, f'include/{major_version}'))
 
         self._ignore_include_not_found = ignore_include_not_found
-        self._include_stack = []
-        self._resolved_ft_aliases = set()
+        self._include_stack: List[str] = []
+        self._resolved_ft_aliases: Set[str] = set()
         self._schema_validator = _SchemaValidator({'common/config', f'{major_version}/config'})
         self._major_version = major_version
 
     @property
-    def _struct_ft_node_members_prop_name(self):
+    def _struct_ft_node_members_prop_name(self) -> str:
         if self._major_version == 2:
             return 'fields'
         else:
             return 'members'
 
     # Returns the last included file name from the parser's inclusion
-    # file name stack.
-    def _get_last_include_file(self):
+    # file name stack, the root file's `name` property if the stack is
+    # empty, or `N/A` if the root file has no such property.
+    def _get_last_include_file(self) -> str:
         if self._include_stack:
             return self._include_stack[-1]
 
-        return self._root_path
+        if hasattr(self._root_file, 'name'):
+            return typing.cast(str, self._root_file.name)
+
+        return 'N/A'
 
     # Loads the inclusion file having the path `yaml_path` and returns
     # its content as a `collections.OrderedDict` object.
-    def _load_include(self, yaml_path):
+    def _load_include(self, yaml_path: str) -> Optional[_MapNode]:
         for inc_dir in self._include_dirs:
             # Current inclusion dir + file name path.
             #
@@ -389,38 +405,37 @@ class _Parser:
             self._include_stack.append(norm_path)
 
             # load raw content
-            return _yaml_load_path(norm_path)
+            return typing.cast(_MapNode, _yaml_load_path(norm_path))
 
         if not self._ignore_include_not_found:
             base_path = self._get_last_include_file()
             raise _ConfigurationParseError(f'File `{base_path}`',
                                            f'Cannot include file `{yaml_path}`: file not found in inclusion directories')
 
+        return None
+
     # Returns a list of all the inclusion file paths as found in the
     # inclusion node `include_node`.
-    def _get_include_paths(self, include_node):
+    def _get_include_paths(self, include_node: Optional[Union[str, List[str]]]) -> List[str]:
         if include_node is None:
             # none
             return []
 
         if type(include_node) is str:
             # wrap as array
-            return [include_node]
+            return [typing.cast(str, include_node)]
 
         # already an array
         assert type(include_node) is list
-        return include_node
+        return typing.cast(List[str], include_node)
 
     # Updates the node `base_node` with an overlay node `overlay_node`.
     #
     # Both the inclusion and field type node inheritance features use
     # this update mechanism.
-    def _update_node(self, base_node, overlay_node):
+    def _update_node(self, base_node: _MapNode, overlay_node: _MapNode):
         # see the comment about the `members` property below
-        def update_members_node(base_value, olay_value):
-            assert type(olay_value) is list
-            assert type(base_value) is list
-
+        def update_members_node(base_value: List[Any], olay_value: List[Any]):
             for olay_item in olay_value:
                 # assume we append `olay_item` to `base_value` initially
                 append_olay_item = True
@@ -567,9 +582,9 @@ class _Parser:
     # `last_overlay_node` and then patches the current base node with
     # its other properties before returning the result (always a deep
     # copy).
-    def _process_node_include(self, last_overlay_node,
-                              process_base_include_cb,
-                              process_children_include_cb=None):
+    def _process_node_include(self, last_overlay_node: _MapNode,
+                              process_base_include_cb: Callable[[_MapNode], _MapNode],
+                              process_children_include_cb: Optional[Callable[[_MapNode], None]] = None) -> _MapNode:
         # process children inclusions first
         if process_children_include_cb is not None:
             process_children_include_cb(last_overlay_node)
@@ -629,13 +644,16 @@ class _Parser:
     # Generates pairs of member node and field type node property name
     # (in the member node) for the structure field type node's members
     # node `node`.
-    def _struct_ft_member_fts_iter(self, node):
+    def _struct_ft_member_fts_iter(self,
+                                   node: Union[List[_MapNode], _MapNode]) -> Iterable[Tuple[_MapNode, str]]:
         if type(node) is list:
             # barectf 3
             assert self._major_version == 3
+            node = typing.cast(List[_MapNode], node)
 
             for member_node in node:
                 assert type(member_node) is collections.OrderedDict
+                member_node = typing.cast(_MapNode, member_node)
                 name, val = list(member_node.items())[0]
 
                 if type(val) is collections.OrderedDict:
@@ -647,6 +665,7 @@ class _Parser:
             # barectf 2
             assert self._major_version == 2
             assert type(node) is collections.OrderedDict
+            node = typing.cast(_MapNode, node)
 
             for name in node:
                 yield node, name
@@ -661,7 +680,8 @@ class _Parser:
     #
     # `ctx_obj_name` is the context's object name when this method
     # raises a `_ConfigurationParseError` exception.
-    def _resolve_ft_alias(self, ft_aliases_node, parent_node, key, ctx_obj_name, alias_set=None):
+    def _resolve_ft_alias(self, ft_aliases_node: _MapNode, parent_node: _MapNode, key: str,
+                          ctx_obj_name: str, alias_set: Optional[Set[str]] = None):
         if key not in parent_node:
             return
 
@@ -722,7 +742,7 @@ class _Parser:
 
     # Like _resolve_ft_alias(), but builds a context object name for any
     # `ctx_obj_name` exception.
-    def _resolve_ft_alias_from(self, ft_aliases_node, parent_node, key):
+    def _resolve_ft_alias_from(self, ft_aliases_node: _MapNode, parent_node: _MapNode, key: str):
         self._resolve_ft_alias(ft_aliases_node, parent_node, key, f'`{key}` property')
 
     # Applies field type node inheritance to the property `key` of
@@ -735,7 +755,7 @@ class _Parser:
     #
     # When this method returns, no field type node has an `$inherit` or
     # `inherit` property.
-    def _apply_ft_inheritance(self, parent_node, key):
+    def _apply_ft_inheritance(self, parent_node: _MapNode, key: str):
         if key not in parent_node:
             return
 
index 4faea4c0956e2739e7463fc32ed7bde0bb5c79ea..2fbf898addc0b1273557b7c2ea03db42295da494 100644 (file)
 from barectf.config_parse_common import _ConfigurationParseError
 from barectf.config_parse_common import _append_error_ctx
 import barectf.config_parse_common as config_parse_common
+from barectf.config_parse_common import _MapNode
 import collections
 import copy
+from barectf.typing import VersionNumber, _OptStr
+from typing import Optional, List, Dict, TextIO, Union, Callable
+import typing
 
 
-def _del_prop_if_exists(node, prop_name):
+def _del_prop_if_exists(node: _MapNode, prop_name: str):
     if prop_name in node:
         del node[prop_name]
 
 
-def _rename_prop(node, old_prop_name, new_prop_name):
+def _rename_prop(node: _MapNode, old_prop_name: str, new_prop_name: str):
     if old_prop_name in node:
         node[new_prop_name] = node[old_prop_name]
         del node[old_prop_name]
 
 
-def _copy_prop_if_exists(dst_node, src_node, src_prop_name, dst_prop_name=None):
+def _copy_prop_if_exists(dst_node: _MapNode, src_node: _MapNode, src_prop_name: str,
+                         dst_prop_name: _OptStr = None):
     if dst_prop_name is None:
         dst_prop_name = src_prop_name
 
@@ -59,10 +64,13 @@ def _copy_prop_if_exists(dst_node, src_node, src_prop_name, dst_prop_name=None):
 # parsing stages and general strategy.
 class _Parser(config_parse_common._Parser):
     # Builds a barectf 2 YAML configuration parser and parses the root
-    # configuration node `node` (already loaded from `path`).
-    def __init__(self, path, node, with_pkg_include_dir, include_dirs, ignore_include_not_found):
-        super().__init__(path, node, with_pkg_include_dir, include_dirs, ignore_include_not_found, 2)
-        self._ft_cls_name_to_conv_method = {
+    # configuration node `node` (already loaded from the file-like
+    # object `root_file`).
+    def __init__(self, root_file: TextIO, node: _MapNode, with_pkg_include_dir: bool,
+                 include_dirs: Optional[List[str]], ignore_include_not_found: bool):
+        super().__init__(root_file, node, with_pkg_include_dir, include_dirs,
+                         ignore_include_not_found, VersionNumber(2))
+        self._ft_cls_name_to_conv_method: Dict[str, Callable[[_MapNode], _MapNode]] = {
             'int': self._conv_int_ft_node,
             'integer': self._conv_int_ft_node,
             'enum': self._conv_enum_ft_node,
@@ -80,24 +88,24 @@ class _Parser(config_parse_common._Parser):
 
     # Converts a v2 field type node to a v3 field type node and returns
     # it.
-    def _conv_ft_node(self, v2_ft_node):
+    def _conv_ft_node(self, v2_ft_node: _MapNode) -> _MapNode:
         assert 'class' in v2_ft_node
         cls = v2_ft_node['class']
         assert cls in self._ft_cls_name_to_conv_method
         return self._ft_cls_name_to_conv_method[cls](v2_ft_node)
 
-    def _conv_ft_node_if_exists(self, v2_parent_node, key):
+    def _conv_ft_node_if_exists(self, v2_parent_node: Optional[_MapNode], key: str) -> Optional[_MapNode]:
         if v2_parent_node is None:
-            return
+            return None
 
         if key not in v2_parent_node:
-            return
+            return None
 
         return self._conv_ft_node(v2_parent_node[key])
 
     # Converts a v2 integer field type node to a v3 integer field type
     # node and returns it.
-    def _conv_int_ft_node(self, v2_ft_node):
+    def _conv_int_ft_node(self, v2_ft_node: _MapNode) -> _MapNode:
         # copy v2 integer field type node
         v3_ft_node = copy.deepcopy(v2_ft_node)
 
@@ -128,7 +136,7 @@ class _Parser(config_parse_common._Parser):
 
     # Converts a v2 enumeration field type node to a v3 enumeration
     # field type node and returns it.
-    def _conv_enum_ft_node(self, v2_ft_node):
+    def _conv_enum_ft_node(self, v2_ft_node: _MapNode) -> _MapNode:
         # An enumeration field type _is_ an integer field type, so use a
         # copy of the converted v2 value field type node.
         v3_ft_node = copy.deepcopy(self._conv_ft_node(v2_ft_node['value-type']))
@@ -147,10 +155,12 @@ class _Parser(config_parse_common._Parser):
         members_node = v2_ft_node.get(prop_name)
 
         if members_node is not None:
-            mappings_node = collections.OrderedDict()
+            mappings_node: _MapNode = collections.OrderedDict()
             cur = 0
 
             for member_node in members_node:
+                v3_value_node: Union[int, List[int]]
+
                 if type(member_node) is str:
                     label = member_node
                     v3_value_node = cur
@@ -180,7 +190,7 @@ class _Parser(config_parse_common._Parser):
 
     # Converts a v2 real field type node to a v3 real field type node
     # and returns it.
-    def _conv_real_ft_node(self, v2_ft_node):
+    def _conv_real_ft_node(self, v2_ft_node: _MapNode) -> _MapNode:
         # copy v2 real field type node
         v3_ft_node = copy.deepcopy(v2_ft_node)
 
@@ -198,7 +208,7 @@ class _Parser(config_parse_common._Parser):
 
     # Converts a v2 string field type node to a v3 string field type
     # node and returns it.
-    def _conv_string_ft_node(self, v2_ft_node):
+    def _conv_string_ft_node(self, v2_ft_node: _MapNode) -> _MapNode:
         # copy v2 string field type node
         v3_ft_node = copy.deepcopy(v2_ft_node)
 
@@ -209,9 +219,9 @@ class _Parser(config_parse_common._Parser):
 
     # Converts a v2 array field type node to a v3 (static) array field
     # type node and returns it.
-    def _conv_static_array_ft_node(self, v2_ft_node):
+    def _conv_static_array_ft_node(self, v2_ft_node: _MapNode) -> _MapNode:
         # class renamed to `static-array`
-        v3_ft_node = collections.OrderedDict({'class': 'static-array'})
+        v3_ft_node: _MapNode = collections.OrderedDict({'class': 'static-array'})
 
         # copy `length` property
         _copy_prop_if_exists(v3_ft_node, v2_ft_node, 'length')
@@ -223,7 +233,7 @@ class _Parser(config_parse_common._Parser):
 
     # Converts a v2 structure field type node to a v3 structure field
     # type node and returns it.
-    def _conv_struct_ft_node(self, v2_ft_node):
+    def _conv_struct_ft_node(self, v2_ft_node: _MapNode) -> _MapNode:
         # Create fresh v3 structure field type node, reusing the class
         # of `v2_ft_node`.
         v3_ft_node = collections.OrderedDict({'class': v2_ft_node['class']})
@@ -250,7 +260,7 @@ class _Parser(config_parse_common._Parser):
 
     # Converts a v2 clock type node to a v3 clock type node and returns
     # it.
-    def _conv_clk_type_node(self, v2_clk_type_node):
+    def _conv_clk_type_node(self, v2_clk_type_node: _MapNode) -> _MapNode:
         # copy v2 clock type node
         v3_clk_type_node = copy.deepcopy(v2_clk_type_node)
 
@@ -272,9 +282,9 @@ class _Parser(config_parse_common._Parser):
 
     # Converts a v2 event type node to a v3 event type node and returns
     # it.
-    def _conv_ev_type_node(self, v2_ev_type_node):
+    def _conv_ev_type_node(self, v2_ev_type_node: _MapNode) -> _MapNode:
         # create empty v3 event type node
-        v3_ev_type_node = collections.OrderedDict()
+        v3_ev_type_node: _MapNode = collections.OrderedDict()
 
         # copy `log-level` property
         _copy_prop_if_exists(v3_ev_type_node, v2_ev_type_node, 'log-level')
@@ -294,7 +304,8 @@ class _Parser(config_parse_common._Parser):
         return v3_ev_type_node
 
     @staticmethod
-    def _set_v3_feature_ft_if_exists(v3_features_node, key, node):
+    def _set_v3_feature_ft_if_exists(v3_features_node: _MapNode, key: str,
+                                     node: Union[Optional[_MapNode], bool]):
         val = node
 
         if val is None:
@@ -304,12 +315,12 @@ class _Parser(config_parse_common._Parser):
 
     # Converts a v2 stream type node to a v3 stream type node and
     # returns it.
-    def _conv_stream_type_node(self, v2_stream_type_node):
+    def _conv_stream_type_node(self, v2_stream_type_node: _MapNode) -> _MapNode:
         # This function creates a v3 stream type features node from the
         # packet context and event header field type nodes of a
         # v2 stream type node.
-        def v3_features_node_from_v2_ft_nodes(v2_pkt_ctx_ft_fields_node,
-                                              v2_ev_header_ft_fields_node):
+        def v3_features_node_from_v2_ft_nodes(v2_pkt_ctx_ft_fields_node: _MapNode,
+                                              v2_ev_header_ft_fields_node: Optional[_MapNode]) -> _MapNode:
             if v2_ev_header_ft_fields_node is None:
                 v2_ev_header_ft_fields_node = collections.OrderedDict()
 
@@ -324,9 +335,9 @@ class _Parser(config_parse_common._Parser):
             v3_ev_type_id_ft_node = self._conv_ft_node_if_exists(v2_ev_header_ft_fields_node, 'id')
             v3_ev_time_ft_node = self._conv_ft_node_if_exists(v2_ev_header_ft_fields_node,
                                                               'timestamp')
-            v3_features_node = collections.OrderedDict()
-            v3_pkt_node = collections.OrderedDict()
-            v3_ev_node = collections.OrderedDict()
+            v3_features_node: _MapNode = collections.OrderedDict()
+            v3_pkt_node: _MapNode = collections.OrderedDict()
+            v3_ev_node: _MapNode = collections.OrderedDict()
             v3_pkt_node['total-size-field-type'] = v3_pkt_total_size_ft_node
             v3_pkt_node['content-size-field-type'] = v3_pkt_content_size_ft_node
             self._set_v3_feature_ft_if_exists(v3_pkt_node, 'beginning-time-field-type',
@@ -342,9 +353,9 @@ class _Parser(config_parse_common._Parser):
             v3_features_node['event'] = v3_ev_node
             return v3_features_node
 
-        def clk_type_name_from_v2_int_ft_node(v2_int_ft_node):
+        def clk_type_name_from_v2_int_ft_node(v2_int_ft_node: Optional[_MapNode]) -> _OptStr:
             if v2_int_ft_node is None:
-                return
+                return None
 
             assert v2_int_ft_node['class'] in ('int', 'integer')
             prop_mappings_node = v2_int_ft_node.get('property-mappings')
@@ -352,8 +363,10 @@ class _Parser(config_parse_common._Parser):
             if prop_mappings_node is not None and len(prop_mappings_node) > 0:
                 return prop_mappings_node[0]['name']
 
+            return None
+
         # create empty v3 stream type node
-        v3_stream_type_node = collections.OrderedDict()
+        v3_stream_type_node: _MapNode = collections.OrderedDict()
 
         # rename `$default` property to `$is-default`
         _copy_prop_if_exists(v3_stream_type_node, v2_stream_type_node, '$default', '$is-default')
@@ -446,8 +459,8 @@ class _Parser(config_parse_common._Parser):
         return v3_stream_type_node
 
     # Converts a v2 metadata node to a v3 trace node and returns it.
-    def _conv_meta_node(self, v2_meta_node):
-        def v3_features_node_from_v2_ft_node(v2_pkt_header_ft_node):
+    def _conv_meta_node(self, v2_meta_node: _MapNode) -> _MapNode:
+        def v3_features_node_from_v2_ft_node(v2_pkt_header_ft_node: Optional[_MapNode]) -> _MapNode:
             def set_if_exists(key, node):
                 return self._set_v3_feature_ft_if_exists(v3_features_node, key, node)
 
@@ -460,14 +473,14 @@ class _Parser(config_parse_common._Parser):
             v3_uuid_ft_node = self._conv_ft_node_if_exists(v2_pkt_header_ft_fields_node, 'uuid')
             v3_stream_type_id_ft_node = self._conv_ft_node_if_exists(v2_pkt_header_ft_fields_node,
                                                                      'stream_id')
-            v3_features_node = collections.OrderedDict()
+            v3_features_node: _MapNode = collections.OrderedDict()
             set_if_exists('magic-field-type', v3_magic_ft_node)
             set_if_exists('uuid-field-type', v3_uuid_ft_node)
             set_if_exists('stream-type-id-field-type', v3_stream_type_id_ft_node)
             return v3_features_node
 
-        v3_trace_node = collections.OrderedDict()
-        v3_trace_type_node = collections.OrderedDict()
+        v3_trace_node: _MapNode = collections.OrderedDict()
+        v3_trace_type_node: _MapNode = collections.OrderedDict()
         v2_trace_node = v2_meta_node['trace']
 
         # rename `byte-order` property to `$default-byte-order`
@@ -663,7 +676,7 @@ class _Parser(config_parse_common._Parser):
 
     # Processes the inclusions of the event type node `ev_type_node`,
     # returning the effective node.
-    def _process_ev_type_node_include(self, ev_type_node):
+    def _process_ev_type_node_include(self, ev_type_node: _MapNode) -> _MapNode:
         # Make sure the event type node is valid for the inclusion
         # processing stage.
         self._schema_validator.validate(ev_type_node, '2/config/event-type-pre-include')
@@ -673,7 +686,7 @@ class _Parser(config_parse_common._Parser):
 
     # Processes the inclusions of the stream type node
     # `stream_type_node`, returning the effective node.
-    def _process_stream_type_node_include(self, stream_type_node):
+    def _process_stream_type_node_include(self, stream_type_node: _MapNode) -> _MapNode:
         def process_children_include(stream_type_node):
             prop_name = 'events'
 
@@ -693,7 +706,7 @@ class _Parser(config_parse_common._Parser):
 
     # Processes the inclusions of the trace type node `trace_type_node`,
     # returning the effective node.
-    def _process_trace_type_node_include(self, trace_type_node):
+    def _process_trace_type_node_include(self, trace_type_node: _MapNode) -> _MapNode:
         # Make sure the trace type node is valid for the inclusion
         # processing stage.
         self._schema_validator.validate(trace_type_node, '2/config/trace-type-pre-include')
@@ -703,7 +716,7 @@ class _Parser(config_parse_common._Parser):
 
     # Processes the inclusions of the clock type node `clk_type_node`,
     # returning the effective node.
-    def _process_clk_type_node_include(self, clk_type_node):
+    def _process_clk_type_node_include(self, clk_type_node: _MapNode) -> _MapNode:
         # Make sure the clock type node is valid for the inclusion
         # processing stage.
         self._schema_validator.validate(clk_type_node, '2/config/clock-type-pre-include')
@@ -713,8 +726,8 @@ class _Parser(config_parse_common._Parser):
 
     # Processes the inclusions of the metadata node `meta_node`,
     # returning the effective node.
-    def _process_meta_node_include(self, meta_node):
-        def process_children_include(meta_node):
+    def _process_meta_node_include(self, meta_node: _MapNode) -> _MapNode:
+        def process_children_include(meta_node: _MapNode):
             prop_name = 'trace'
 
             if prop_name in meta_node:
@@ -833,5 +846,5 @@ class _Parser(config_parse_common._Parser):
         self._transform_config_node()
 
     @property
-    def config_node(self):
-        return config_parse_common._ConfigNodeV3(self._root_node)
+    def config_node(self) -> config_parse_common._ConfigNodeV3:
+        return config_parse_common._ConfigNodeV3(typing.cast(_MapNode, self._root_node))
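The typing.cast() calls introduced throughout this patch, including the one just above, have no effect at run time; a minimal sketch with illustrative names:

    import typing
    from typing import Union

    def first_key(node: Union[dict, list]) -> str:
        # cast() neither converts nor checks anything at run time; it only
        # tells the static type checker to treat `node` as a `dict` here.
        map_node = typing.cast(dict, node)
        return next(iter(map_node))

    print(first_key({'class': 'int', 'size': 32}))   # class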
index fe5a1d0faecfbadd2ed0d584f431d48efd12bd29..fccf1a12237a6389ed09380a6f53317e7f8f9f6a 100644 (file)
 import barectf.config_parse_common as barectf_config_parse_common
 from barectf.config_parse_common import _ConfigurationParseError
 from barectf.config_parse_common import _append_error_ctx
+from barectf.config_parse_common import _MapNode
 import barectf.config as barectf_config
+from barectf.config import _OptFt, _OptStructFt
 import collections
 import uuid
+from barectf.typing import Count, Alignment, VersionNumber
+from typing import Optional, List, Dict, Any, TextIO, Set, Iterable, Callable, Tuple, Type
+import typing
 
 
 # A barectf 3 YAML configuration parser.
@@ -40,11 +45,14 @@ import uuid
 # parsing stages and general strategy.
 class _Parser(barectf_config_parse_common._Parser):
     # Builds a barectf 3 YAML configuration parser and parses the root
-    # configuration node `node` (already loaded from `path`).
-    def __init__(self, path, node, with_pkg_include_dir, inclusion_dirs, ignore_include_not_found):
-        super().__init__(path, node, with_pkg_include_dir, inclusion_dirs,
-                         ignore_include_not_found, 3)
-        self._ft_cls_name_to_create_method = {
+    # configuration node `node` (already loaded from the file-like
+    # object `root_file`).
+    def __init__(self, root_file: TextIO, node: barectf_config_parse_common._ConfigNodeV3,
+                 with_pkg_include_dir: bool, inclusion_dirs: Optional[List[str]],
+                 ignore_include_not_found: bool):
+        super().__init__(root_file, node, with_pkg_include_dir, inclusion_dirs,
+                         ignore_include_not_found, VersionNumber(3))
+        self._ft_cls_name_to_create_method: Dict[str, Callable[[_MapNode], barectf_config._FieldType]] = {
             'unsigned-integer': self._create_int_ft,
             'signed-integer': self._create_int_ft,
             'unsigned-enumeration': self._create_enum_ft,
@@ -60,7 +68,7 @@ class _Parser(barectf_config_parse_common._Parser):
     # `_ConfigurationParseError` exception using `ctx_obj_name` if it's
     # invalid.
     @staticmethod
-    def _validate_alignment(alignment, ctx_obj_name):
+    def _validate_alignment(alignment: Alignment, ctx_obj_name: str):
         assert alignment >= 1
 
         # check for power of two
@@ -72,7 +80,7 @@ class _Parser(barectf_config_parse_common._Parser):
     # `_ConfigurationParseError` exception using `ctx_obj_name` and
     # `prop` to format the message if it's invalid.
     @staticmethod
-    def _validate_iden(iden, ctx_obj_name, prop):
+    def _validate_iden(iden: str, ctx_obj_name: str, prop: str):
         assert type(iden) is str
         ctf_keywords = {
             'align',
@@ -97,20 +105,20 @@ class _Parser(barectf_config_parse_common._Parser):
             raise _ConfigurationParseError(ctx_obj_name, msg)
 
     @staticmethod
-    def _alignment_prop(ft_node, prop_name):
+    def _alignment_prop(ft_node: _MapNode, prop_name: str) -> Optional[Alignment]:
         alignment = ft_node.get(prop_name)
 
         if alignment is not None:
             _Parser._validate_alignment(alignment, '`prop_name` property')
 
-        return alignment
+        return Alignment(alignment)
 
     @property
-    def _trace_type_node(self):
-        return self._root_node.config_node['trace']['type']
+    def _trace_type_node(self) -> _MapNode:
+        return self.config_node['trace']['type']
 
     @staticmethod
-    def _byte_order_from_node(node):
+    def _byte_order_from_node(node: str) -> barectf_config.ByteOrder:
         return {
             'big-endian': barectf_config.ByteOrder.BIG_ENDIAN,
             'little-endian': barectf_config.ByteOrder.LITTLE_ENDIAN,
@@ -119,7 +127,10 @@ class _Parser(barectf_config_parse_common._Parser):
     # Creates a bit array field type having the type `ft_type` from the
     # bit array field type node `ft_node`, passing the additional
     # `*args` to ft_type.__init__().
-    def _create_common_bit_array_ft(self, ft_node, ft_type, default_alignment, *args):
+    def _create_common_bit_array_ft(self, ft_node: _MapNode,
+                                    ft_type: Type[barectf_config._BitArrayFieldType],
+                                    default_alignment: Optional[Alignment],
+                                    *args) -> barectf_config._BitArrayFieldType:
         byte_order = self._byte_order_from_node(ft_node['byte-order'])
         alignment = self._alignment_prop(ft_node, 'alignment')
 
@@ -131,18 +142,21 @@ class _Parser(barectf_config_parse_common._Parser):
     # Creates an integer field type having the type `ft_type` from the
     # integer field type node `ft_node`, passing the additional `*args`
     # to ft_type.__init__().
-    def _create_common_int_ft(self, ft_node, ft_type, *args):
+    def _create_common_int_ft(self, ft_node: _MapNode,
+                              ft_type: Type[barectf_config._IntegerFieldType], *args) -> barectf_config._IntegerFieldType:
         preferred_display_base = {
             'binary': barectf_config.DisplayBase.BINARY,
             'octal': barectf_config.DisplayBase.OCTAL,
             'decimal': barectf_config.DisplayBase.DECIMAL,
             'hexadecimal': barectf_config.DisplayBase.HEXADECIMAL,
         }[ft_node.get('preferred-display-base', 'decimal')]
-        return self._create_common_bit_array_ft(ft_node, ft_type, None, preferred_display_base, *args)
+        return typing.cast(barectf_config._IntegerFieldType,
+                           self._create_common_bit_array_ft(ft_node, ft_type, None,
+                                                            preferred_display_base, *args))
 
     # Creates an integer field type from the unsigned/signed integer
     # field type node `ft_node`.
-    def _create_int_ft(self, ft_node):
+    def _create_int_ft(self, ft_node: _MapNode) -> barectf_config._IntegerFieldType:
         ft_type = {
             'unsigned-integer': barectf_config.UnsignedIntegerFieldType,
             'signed-integer': barectf_config.SignedIntegerFieldType,
@@ -151,7 +165,7 @@ class _Parser(barectf_config_parse_common._Parser):
 
     # Creates an enumeration field type from the unsigned/signed
     # enumeration field type node `ft_node`.
-    def _create_enum_ft(self, ft_node):
+    def _create_enum_ft(self, ft_node: _MapNode) -> barectf_config._EnumerationFieldType:
         ft_type = {
             'unsigned-enumeration': barectf_config.UnsignedEnumerationFieldType,
             'signed-enumeration': barectf_config.SignedEnumerationFieldType,
@@ -172,21 +186,24 @@ class _Parser(barectf_config_parse_common._Parser):
 
             mappings[label] = barectf_config.EnumerationFieldTypeMapping(ranges)
 
-        return self._create_common_int_ft(ft_node, ft_type,
-                                          barectf_config.EnumerationFieldTypeMappings(mappings))
+        return typing.cast(barectf_config._EnumerationFieldType,
+                           self._create_common_int_ft(ft_node, ft_type,
+                                                      barectf_config.EnumerationFieldTypeMappings(mappings)))
 
     # Creates a real field type from the real field type node `ft_node`.
-    def _create_real_ft(self, ft_node):
-        return self._create_common_bit_array_ft(ft_node, barectf_config.RealFieldType, 8)
+    def _create_real_ft(self, ft_node: _MapNode) -> barectf_config.RealFieldType:
+        return typing.cast(barectf_config.RealFieldType,
+                           self._create_common_bit_array_ft(ft_node, barectf_config.RealFieldType,
+                                                            Alignment(8)))
 
     # Creates a string field type from the string field type node
     # `ft_node`.
-    def _create_string_ft(self, ft_node):
+    def _create_string_ft(self, ft_node: _MapNode) -> barectf_config.StringFieldType:
         return barectf_config.StringFieldType()
 
     # Creates a static array field type from the static array field type
     # node `ft_node`.
-    def _create_static_array_ft(self, ft_node):
+    def _create_static_array_ft(self, ft_node: _MapNode) -> barectf_config.StaticArrayFieldType:
         prop_name = 'element-field-type'
 
         try:
@@ -201,9 +218,9 @@ class _Parser(barectf_config_parse_common._Parser):
     #
     # `prop_name` is the name of the property of which `members_node` is
     # the value.
-    def _create_struct_ft_members(self, members_node, prop_name):
+    def _create_struct_ft_members(self, members_node: List[_MapNode], prop_name: str):
         members = collections.OrderedDict()
-        member_names = set()
+        member_names: Set[str] = set()
 
         for member_node in members_node:
             member_name, member_node = list(member_node.items())[0]
@@ -236,7 +253,7 @@ class _Parser(barectf_config_parse_common._Parser):
 
     # Creates a structure field type from the structure field type node
     # `ft_node`.
-    def _create_struct_ft(self, ft_node):
+    def _create_struct_ft(self, ft_node: _MapNode) -> barectf_config.StructureFieldType:
         minimum_alignment = self._alignment_prop(ft_node, 'minimum-alignment')
 
         if minimum_alignment is None:
@@ -252,33 +269,43 @@ class _Parser(barectf_config_parse_common._Parser):
         return barectf_config.StructureFieldType(minimum_alignment, members)
 
     # Creates a field type from the field type node `ft_node`.
-    def _create_ft(self, ft_node):
+    def _create_ft(self, ft_node: _MapNode) -> barectf_config._FieldType:
         return self._ft_cls_name_to_create_method[ft_node['class']](ft_node)
 
     # Creates a field type from the field type node `parent_node[key]`
     # if it exists.
-    def _try_create_ft(self, parent_node, key):
+    def _try_create_ft(self, parent_node: _MapNode, key: str) -> _OptFt:
         if key not in parent_node:
-            return
+            return None
 
         try:
             return self._create_ft(parent_node[key])
         except _ConfigurationParseError as exc:
             _append_error_ctx(exc, f'`{key}` property')
 
+            # satisfy static type checker (never reached)
+            raise
+
+    # Like _try_create_ft(), but casts the result's type to
+    # `barectf_config.StructureFieldType` to satisfy static type
+    # checkers.
+    def _try_create_struct_ft(self, parent_node: _MapNode, key: str) -> _OptStructFt:
+        return typing.cast(barectf_config.StructureFieldType,
+                           self._try_create_ft(parent_node, key))
+
     # Returns the total number of members in the structure field type
     # node `ft_node` if it exists, otherwise 0.
     @staticmethod
-    def _total_struct_ft_node_members(ft_node):
+    def _total_struct_ft_node_members(ft_node: Optional[_MapNode]) -> Count:
         if ft_node is None:
-            return 0
+            return Count(0)
 
         members_node = ft_node.get('members')
 
         if members_node is None:
-            return 0
+            return Count(0)
 
-        return len(members_node)
+        return Count(len(members_node))
 
     # Creates an event type from the event type node `ev_type_node`
     # named `name`.
@@ -288,26 +315,33 @@ class _Parser(barectf_config_parse_common._Parser):
     # stream type). For example, if the stream type has an event header
     # field type with `id` and `timestamp` members, then
     # `ev_member_count` is 2.
-    def _create_ev_type(self, name, ev_type_node, ev_member_count):
+    def _create_ev_type(self, name: str, ev_type_node: _MapNode, ev_member_count: Count) -> barectf_config.EventType:
         try:
             self._validate_iden(name, '`name` property', 'event type name')
 
             # make sure the event type is not empty
             spec_ctx_ft_prop_name = 'specific-context-field-type'
             payload_ft_prop_name = 'payload-field-type'
-            ev_member_count += self._total_struct_ft_node_members(ev_type_node.get(spec_ctx_ft_prop_name))
-            ev_member_count += self._total_struct_ft_node_members(ev_type_node.get(payload_ft_prop_name))
+            ev_member_count = Count(ev_member_count +
+                                    self._total_struct_ft_node_members(ev_type_node.get(spec_ctx_ft_prop_name)))
+            ev_member_count = Count(ev_member_count +
+                                    self._total_struct_ft_node_members(ev_type_node.get(payload_ft_prop_name)))
 
             if ev_member_count == 0:
                 raise _ConfigurationParseError('Event type', 'Event type is empty (no members).')
 
             # create event type
             return barectf_config.EventType(name, ev_type_node.get('log-level'),
-                                            self._try_create_ft(ev_type_node, spec_ctx_ft_prop_name),
-                                            self._try_create_ft(ev_type_node, payload_ft_prop_name))
+                                            self._try_create_struct_ft(ev_type_node,
+                                                                       spec_ctx_ft_prop_name),
+                                            self._try_create_struct_ft(ev_type_node,
+                                                                       payload_ft_prop_name))
         except _ConfigurationParseError as exc:
             _append_error_ctx(exc, f'Event type `{name}`')
 
+            # satisfy static type checker (never reached)
+            raise
+
     # Returns the effective feature field type for the field type
     # node `parent_node[key]`, if any.
     #
@@ -324,7 +358,7 @@ class _Parser(barectf_config_parse_common._Parser):
     #
     # Otherwise:
     #     A created field type.
-    def _feature_ft(self, parent_node, key, none=None):
+    def _feature_ft(self, parent_node: _MapNode, key: str, none: Any = None) -> Any:
         if key not in parent_node:
             # missing: default feature field type
             return none
@@ -343,7 +377,7 @@ class _Parser(barectf_config_parse_common._Parser):
         assert type(ft_node) is collections.OrderedDict
         return self._create_ft(ft_node)
 
-    def _create_stream_type(self, name, stream_type_node):
+    def _create_stream_type(self, name: str, stream_type_node: _MapNode) -> barectf_config.StreamType:
         try:
             # validate stream type's name
             self._validate_iden(name, '`name` property', 'stream type name')
@@ -416,9 +450,12 @@ class _Parser(barectf_config_parse_common._Parser):
                     raise _ConfigurationParseError(f'`{type_id_ft_prop_name}` property',
                                                    'Event type ID field type feature is required because stream type has more than one event type')
 
-                if isinstance(ev_type_id_ft, barectf_config._FieldType) and ev_type_count > (1 << ev_type_id_ft.size):
-                    raise _ConfigurationParseError(f'`{type_id_ft_prop_name}` property',
-                                                   f'Field type\'s size ({ev_type_id_ft.size} bits) is too small to accomodate {ev_type_count} event types')
+                if isinstance(ev_type_id_ft, barectf_config._IntegerFieldType):
+                    ev_type_id_int_ft = typing.cast(barectf_config._IntegerFieldType, ev_type_id_ft)
+
+                    if ev_type_count > (1 << ev_type_id_int_ft.size):
+                        raise _ConfigurationParseError(f'`{type_id_ft_prop_name}` property',
+                                                       f'Field type\'s size ({ev_type_id_int_ft.size} bits) is too small to accommodate {ev_type_count} event types')
             except _ConfigurationParseError as exc:
                 exc._append_ctx('`event` property')
                 _append_error_ctx(exc, '`$features` property')
@@ -456,17 +493,18 @@ class _Parser(barectf_config_parse_common._Parser):
                                                        f'Packet context field type member name `{member_name}` is reserved.')
 
             # create event types
-            ev_header_common_ctx_member_count = 0
+            ev_header_common_ctx_member_count = Count(0)
 
             if ev_features.type_id_field_type is not None:
-                ev_header_common_ctx_member_count += 1
+                ev_header_common_ctx_member_count = Count(ev_header_common_ctx_member_count + 1)
 
             if ev_features.time_field_type is not None:
-                ev_header_common_ctx_member_count += 1
+                ev_header_common_ctx_member_count = Count(ev_header_common_ctx_member_count + 1)
 
             ev_common_ctx_ft_prop_name = 'event-common-context-field-type'
             ev_common_ctx_ft_node = stream_type_node.get(ev_common_ctx_ft_prop_name)
-            ev_header_common_ctx_member_count += self._total_struct_ft_node_members(ev_common_ctx_ft_node)
+            ev_header_common_ctx_member_count = Count(ev_header_common_ctx_member_count +
+                                                      self._total_struct_ft_node_members(ev_common_ctx_ft_node))
             ev_types = set()
 
             for ev_name, ev_type_node in stream_type_node[ev_types_prop_name].items():
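The repeated `Count(...)` re-wrapping above is needed because arithmetic on such a value yields a plain `int` again; a minimal sketch, assuming `Count` is a `typing.NewType` derivative of `int` like the other `barectf.typing` aliases:

    from typing import NewType

    Count = NewType('Count', int)   # assumed to match `barectf.typing.Count`

    member_count = Count(0)

    # member_count = member_count + 1       # Mypy error: the sum is an `int`
    member_count = Count(member_count + 1)  # re-wrap to keep the `Count` type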
@@ -475,12 +513,15 @@ class _Parser(barectf_config_parse_common._Parser):
             # create stream type
             return barectf_config.StreamType(name, ev_types, def_clk_type, features,
                                              pkt_ctx_ft_extra_members,
-                                             self._try_create_ft(stream_type_node,
-                                                                 ev_common_ctx_ft_prop_name))
+                                             self._try_create_struct_ft(stream_type_node,
+                                                                        ev_common_ctx_ft_prop_name))
         except _ConfigurationParseError as exc:
             _append_error_ctx(exc, f'Stream type `{name}`')
 
-    def _clk_type(self, name, prop_name):
+            # satisfy static type checker (never reached)
+            raise
+
+    def _clk_type(self, name: str, prop_name: str) -> barectf_config.ClockType:
         clk_type = self._clk_types.get(name)
 
         if clk_type is None:
@@ -489,7 +530,7 @@ class _Parser(barectf_config_parse_common._Parser):
 
         return clk_type
 
-    def _create_clk_type(self, name, clk_type_node):
+    def _create_clk_type(self, name: str, clk_type_node: _MapNode) -> barectf_config.ClockType:
         self._validate_iden(name, '`name` property', 'clock type name')
         clk_type_uuid = None
         uuid_node = clk_type_node.get('uuid')
@@ -498,12 +539,12 @@ class _Parser(barectf_config_parse_common._Parser):
             clk_type_uuid = uuid.UUID(uuid_node)
 
         offset_seconds = 0
-        offset_cycles = 0
+        offset_cycles = Count(0)
         offset_node = clk_type_node.get('offset')
 
         if offset_node is not None:
             offset_seconds = offset_node.get('seconds', 0)
-            offset_cycles = offset_node.get('cycles', 0)
+            offset_cycles = offset_node.get('cycles', Count(0))
 
         return barectf_config.ClockType(name, clk_type_node.get('frequency', int(1e9)),
                                         clk_type_uuid, clk_type_node.get('description'),
@@ -583,7 +624,7 @@ class _Parser(barectf_config_parse_common._Parser):
     def _create_trace(self):
         try:
             trace_type = self._create_trace_type()
-            trace_node = self._root_node.config_node['trace']
+            trace_node = self.config_node['trace']
             env = None
             env_node = trace_node.get('environment')
 
@@ -641,7 +682,9 @@ class _Parser(barectf_config_parse_common._Parser):
         # create options
         iden_prefix_def = False
         def_stream_type_name_def = False
-        opts_node = self._root_node.config_node.get('options')
+        opts_node = self.config_node.get('options')
+        iden_prefix = 'barectf_'
+        file_name_prefix = 'barectf'
 
         if opts_node is not None:
             code_gen_opts_node = opts_node.get('code-generation')
@@ -686,14 +729,14 @@ class _Parser(barectf_config_parse_common._Parser):
     # * The `$field-type-aliases` property of the trace type node is
     #   removed.
     def _expand_ft_aliases(self):
-        def resolve_ft_alias_from(parent_node, key):
+        def resolve_ft_alias_from(parent_node: _MapNode, key: str):
             if key not in parent_node:
                 return
 
             if type(parent_node[key]) not in [collections.OrderedDict, str]:
                 return
 
-            return self._resolve_ft_alias_from(ft_aliases_node, parent_node, key)
+            self._resolve_ft_alias_from(ft_aliases_node, parent_node, key)
 
         ft_aliases_node = self._trace_type_node['$field-type-aliases']
 
@@ -779,14 +822,14 @@ class _Parser(barectf_config_parse_common._Parser):
     # When this method returns, no field type node has an `$inherit`
     # property.
     def _apply_fts_inheritance(self):
-        def apply_ft_inheritance(parent_node, key):
+        def apply_ft_inheritance(parent_node: _MapNode, key: str):
             if key not in parent_node:
                 return
 
             if type(parent_node[key]) is not collections.OrderedDict:
                 return
 
-            return self._apply_ft_inheritance(parent_node, key)
+            self._apply_ft_inheritance(parent_node, key)
 
         features_prop_name = '$features'
         features_node = self._trace_type_node.get(features_prop_name)
@@ -841,7 +884,7 @@ class _Parser(barectf_config_parse_common._Parser):
     #
     # This method normalizes form 1 to use form 2.
     def _normalize_struct_ft_member_nodes(self):
-        def normalize_members_node(members_node):
+        def normalize_members_node(members_node: List[_MapNode]):
             ft_prop_name = 'field-type'
 
             for member_node in members_node:
@@ -854,7 +897,7 @@ class _Parser(barectf_config_parse_common._Parser):
 
                 normalize_struct_ft_member_nodes(member_node[member_name], ft_prop_name)
 
-        def normalize_struct_ft_member_nodes(parent_node, key):
+        def normalize_struct_ft_member_nodes(parent_node: _MapNode, key: str):
             if type(parent_node) is not collections.OrderedDict:
                 return
 
@@ -863,6 +906,7 @@ class _Parser(barectf_config_parse_common._Parser):
             if type(ft_node) is not collections.OrderedDict:
                 return
 
+            ft_node = typing.cast(collections.OrderedDict, ft_node)
             members_nodes = ft_node.get('members')
 
             if members_nodes is not None:
@@ -919,7 +963,7 @@ class _Parser(barectf_config_parse_common._Parser):
     def _expand_fts(self):
         # Make sure that the current configuration node is valid
         # considering field types are not expanded yet.
-        self._schema_validator.validate(self._root_node.config_node,
+        self._schema_validator.validate(self.config_node,
                                         '3/config/config-pre-field-type-expansion')
 
         prop_name = '$field-type-aliases'
@@ -950,7 +994,7 @@ class _Parser(barectf_config_parse_common._Parser):
     def _sub_log_level_aliases(self):
         # Make sure that the current configuration node is valid
         # considering log level aliases are not substituted yet.
-        self._schema_validator.validate(self._root_node.config_node,
+        self._schema_validator.validate(self.config_node,
                                         '3/config/config-pre-log-level-alias-sub')
 
         log_level_aliases_prop_name = '$log-level-aliases'
@@ -990,7 +1034,7 @@ class _Parser(barectf_config_parse_common._Parser):
     #
     # It is safe to delete a yielded node during the iteration.
     @staticmethod
-    def _props(node):
+    def _props(node: Any) -> Iterable[Tuple[Any, str]]:
         if type(node) is collections.OrderedDict:
             for key in list(node):
                 yield from _Parser._props(node[key])
@@ -999,8 +1043,8 @@ class _Parser(barectf_config_parse_common._Parser):
             for item_node in node:
                 yield from _Parser._props(item_node)
 
-    def _trace_type_props(self):
-        yield from _Parser._props(self._root_node.config_node['trace']['type'])
+    def _trace_type_props(self) -> Iterable[Tuple[Any, str]]:
+        yield from _Parser._props(self.config_node['trace']['type'])
 
     # Normalize the properties of the configuration node.
     #
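
Annotating the `_props()` generator as returning `Iterable[Tuple[Any, str]]` is all mypy needs: a generator function may declare `Iterable` (or `Iterator`) as its return type when callers never send values into it. Below is an illustrative re-implementation of such a recursive (node, key) walk; the exact yield order of the real method is not visible in these hunks:

    import collections
    from typing import Any, Iterable, Tuple

    def props(node: Any) -> Iterable[Tuple[Any, str]]:
        if type(node) is collections.OrderedDict:
            for key in list(node):       # list() so the caller may delete keys
                yield from props(node[key])
                yield node, key
        elif type(node) is list:
            for item_node in node:
                yield from props(item_node)

    root = collections.OrderedDict(a=collections.OrderedDict(b=1),
                                   c=[collections.OrderedDict(d=2)])
    print([key for _, key in props(root)])  # ['b', 'a', 'd', 'c']
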
@@ -1018,7 +1062,7 @@ class _Parser(barectf_config_parse_common._Parser):
     # This method also applies 1. to the trace node's `environment`
     # property.
     def _normalize_props(self):
-        def normalize_byte_order_prop(parent_node, key):
+        def normalize_byte_order_prop(parent_node: _MapNode, key: str):
             node = parent_node[key]
 
             if node in ['be', 'big']:
@@ -1026,7 +1070,7 @@ class _Parser(barectf_config_parse_common._Parser):
             elif node in ['le', 'little']:
                 parent_node[key] = 'little-endian'
 
-        trace_node = self._root_node.config_node['trace']
+        trace_node = self.config_node['trace']
         trace_type_node = trace_node['type']
         prop_name = '$default-byte-order'
 
@@ -1089,7 +1133,7 @@ class _Parser(barectf_config_parse_common._Parser):
             'real',
         }
 
-        def set_ft_node_byte_order_prop(parent_node, key):
+        def set_ft_node_byte_order_prop(parent_node: _MapNode, key: str):
             if key not in parent_node:
                 return
 
@@ -1116,7 +1160,7 @@ class _Parser(barectf_config_parse_common._Parser):
 
             set_ft_node_byte_order_prop(ft_node, 'element-field-type')
 
-        def set_struct_ft_node_members_byte_order_prop(members_node):
+        def set_struct_ft_node_members_byte_order_prop(members_node: List[_MapNode]):
             for member_node in members_node:
                 member_name, member_node = list(member_node.items())[0]
 
@@ -1187,7 +1231,7 @@ class _Parser(barectf_config_parse_common._Parser):
 
     # Processes the inclusions of the event type node `ev_type_node`,
     # returning the effective node.
-    def _process_ev_type_node_include(self, ev_type_node):
+    def _process_ev_type_node_include(self, ev_type_node: _MapNode) -> _MapNode:
         # Make sure the event type node is valid for the inclusion
         # processing stage.
         self._schema_validator.validate(ev_type_node, '3/config/event-type-pre-include')
@@ -1197,8 +1241,8 @@ class _Parser(barectf_config_parse_common._Parser):
 
     # Processes the inclusions of the stream type node
     # `stream_type_node`, returning the effective node.
-    def _process_stream_type_node_include(self, stream_type_node):
-        def process_children_include(stream_type_node):
+    def _process_stream_type_node_include(self, stream_type_node: _MapNode) -> _MapNode:
+        def process_children_include(stream_type_node: _MapNode):
             prop_name = 'event-types'
 
             if prop_name in stream_type_node:
@@ -1217,7 +1261,7 @@ class _Parser(barectf_config_parse_common._Parser):
 
     # Processes the inclusions of the clock type node `clk_type_node`,
     # returning the effective node.
-    def _process_clk_type_node_include(self, clk_type_node):
+    def _process_clk_type_node_include(self, clk_type_node: _MapNode) -> _MapNode:
         # Make sure the clock type node is valid for the inclusion
         # processing stage.
         self._schema_validator.validate(clk_type_node, '3/config/clock-type-pre-include')
@@ -1227,8 +1271,8 @@ class _Parser(barectf_config_parse_common._Parser):
 
     # Processes the inclusions of the trace type node `trace_type_node`,
     # returning the effective node.
-    def _process_trace_type_node_include(self, trace_type_node):
-        def process_children_include(trace_type_node):
+    def _process_trace_type_node_include(self, trace_type_node: _MapNode) -> _MapNode:
+        def process_children_include(trace_type_node: _MapNode):
             prop_name = 'clock-types'
 
             if prop_name in trace_type_node:
@@ -1255,8 +1299,8 @@ class _Parser(barectf_config_parse_common._Parser):
 
     # Processes the inclusions of the trace node `trace_node`, returning
     # the effective node.
-    def _process_trace_node_include(self, trace_node):
-        def process_children_include(trace_node):
+    def _process_trace_node_include(self, trace_node: _MapNode) -> _MapNode:
+        def process_children_include(trace_node: _MapNode):
             prop_name = 'type'
             trace_node[prop_name] = self._process_trace_type_node_include(trace_node[prop_name])
 
@@ -1294,13 +1338,13 @@ class _Parser(barectf_config_parse_common._Parser):
         #
         # First, make sure the configuration node itself is valid for
         # the inclusion processing stage.
-        self._schema_validator.validate(self._root_node.config_node, '3/config/config-pre-include')
+        self._schema_validator.validate(self.config_node, '3/config/config-pre-include')
 
         # Process trace node inclusions.
         #
         # self._process_trace_node_include() returns a new (or the same)
         # trace node without any `$include` property in it, recursively.
-        self._root_node.config_node['trace'] = self._process_trace_node_include(self._root_node.config_node['trace'])
+        self.config_node['trace'] = self._process_trace_node_include(self.config_node['trace'])
 
     def _parse(self):
         # process configuration node inclusions
@@ -1339,7 +1383,7 @@ class _Parser(barectf_config_parse_common._Parser):
 
         # At this point, the configuration node must be valid as an
         # effective configuration node.
-        self._schema_validator.validate(self._root_node.config_node, '3/config/config')
+        self._schema_validator.validate(self.config_node, '3/config/config')
 
         # Normalize properties.
         #
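
The repeated `_schema_validator.validate()` calls check the configuration node against a different schema at each stage (pre-include, pre-field-type-expansion, pre-log-level-alias-substitution, and finally the effective `3/config/config` schema). The sketch below shows that staged-validation idea with the third-party `jsonschema` package and two deliberately tiny, made-up schemas; barectf's actual validator and schema files are internal details not reproduced here:

    import jsonschema

    # Hypothetical schemas for two stages of the transformation.
    PRE_EXPANSION_SCHEMA = {'type': 'object', 'required': ['trace']}
    EFFECTIVE_SCHEMA = {
        'type': 'object',
        'required': ['trace'],
        'properties': {'trace': {'type': 'object', 'required': ['type']}},
    }

    def validate_stage(config_node: dict, schema: dict):
        # Raises jsonschema.ValidationError if the node does not satisfy
        # the schema for this stage.
        jsonschema.validate(config_node, schema)

    node = {'trace': {'type': {}}}
    validate_stage(node, PRE_EXPANSION_SCHEMA)   # looser check, earlier stage
    validate_stage(node, EFFECTIVE_SCHEMA)       # stricter check, final stage
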
@@ -1367,9 +1411,9 @@ class _Parser(barectf_config_parse_common._Parser):
         self._create_config()
 
     @property
-    def config(self):
+    def config(self) -> barectf_config.Configuration:
         return self._config
 
     @property
-    def config_node(self):
-        return self._root_node
+    def config_node(self) -> _MapNode:
+        return typing.cast(barectf_config_parse_common._ConfigNodeV3, self._root_node).config_node
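
All of the `self._root_node.config_node` accesses earlier in this file were rewritten to go through the `config_node` property above, so the `typing.cast()` to `_ConfigNodeV3` now appears in exactly one place. A hypothetical, stripped-down version of that pattern (the class names below stand in for barectf's real ones):

    import typing

    class RootNodeV3:                       # stands in for _ConfigNodeV3
        def __init__(self, config_node: dict):
            self.config_node = config_node

    class BaseParser:
        _root_node: object                  # the base class only knows a loose type

    class ParserV3(BaseParser):
        def __init__(self, root_node: RootNodeV3):
            self._root_node = root_node

        @property
        def config_node(self) -> dict:
            # One cast here instead of one at every call site.
            return typing.cast(RootNodeV3, self._root_node).config_node

    parser = ParserV3(RootNodeV3({'trace': {}}))
    assert 'trace' in parser.config_node
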
diff --git a/barectf/typing.py b/barectf/typing.py
new file mode 100644 (file)
index 0000000..36445db
--- /dev/null
+++ b/barectf/typing.py
@@ -0,0 +1,31 @@
+# The MIT License (MIT)
+#
+# Copyright (c) 2020 Philippe Proulx <pproulx@efficios.com>
+#
+# Permission is hereby granted, free of charge, to any person obtaining
+# a copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish,
+# distribute, sublicense, and/or sell copies of the Software, and to
+# permit persons to whom the Software is furnished to do so, subject to
+# the following conditions:
+#
+# The above copyright notice and this permission notice shall be
+# included in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
+# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+import typing
+
+Index = typing.NewType('Index', int)
+Count = typing.NewType('Count', int)
+Id = typing.NewType('Id', int)
+Alignment = typing.NewType('Alignment', int)
+VersionNumber = typing.NewType('VersionNumber', int)
+_OptStr = typing.Optional[str]
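
The `NewType` aliases above are distinct types only in mypy's eyes: at run time each alias is an identity function, so constructing one produces the underlying `int` itself, with no wrapper object. The checker, however, refuses to mix them up, which is the extra meaning these aliases carry over a bare `int`. A small illustration independent of the barectf API:

    from typing import List, NewType

    Index = NewType('Index', int)
    Count = NewType('Count', int)

    def element_at(items: List[str], index: Index) -> str:
        return items[index]            # an Index is a plain int at run time

    i = Index(1)
    n = Count(1)

    assert element_at(['a', 'b', 'c'], i) == 'b'
    assert type(i) is int              # no wrapper object, zero overhead

    # element_at(['a', 'b', 'c'], n)   # would run, but mypy rejects it:
    #                                  # a Count is not an Index
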