config_parse_v2.py: fix typing issue
[deliverable/barectf.git] / barectf / config_parse_v2.py
index 4faea4c0956e2739e7463fc32ed7bde0bb5c79ea..59f9733e48be0d743a8d97272213fb6908e8bd41 100644 (file)
 from barectf.config_parse_common import _ConfigurationParseError
 from barectf.config_parse_common import _append_error_ctx
 import barectf.config_parse_common as config_parse_common
+from barectf.config_parse_common import _MapNode
 import collections
 import copy
+from barectf.typing import VersionNumber, _OptStr
+from typing import Optional, List, Dict, TextIO, Union, Callable
+import typing
 
 
-def _del_prop_if_exists(node, prop_name):
+def _del_prop_if_exists(node: _MapNode, prop_name: str):
     if prop_name in node:
         del node[prop_name]
 
 
-def _rename_prop(node, old_prop_name, new_prop_name):
+def _rename_prop(node: _MapNode, old_prop_name: str, new_prop_name: str):
     if old_prop_name in node:
         node[new_prop_name] = node[old_prop_name]
         del node[old_prop_name]
 
 
-def _copy_prop_if_exists(dst_node, src_node, src_prop_name, dst_prop_name=None):
+def _copy_prop_if_exists(dst_node: _MapNode, src_node: _MapNode, src_prop_name: str,
+                         dst_prop_name: _OptStr = None):
     if dst_prop_name is None:
         dst_prop_name = src_prop_name
 
@@ -59,10 +64,13 @@ def _copy_prop_if_exists(dst_node, src_node, src_prop_name, dst_prop_name=None):
 # parsing stages and general strategy.
 class _Parser(config_parse_common._Parser):
     # Builds a barectf 2 YAML configuration parser and parses the root
-    # configuration node `node` (already loaded from `path`).
-    def __init__(self, path, node, with_pkg_include_dir, include_dirs, ignore_include_not_found):
-        super().__init__(path, node, with_pkg_include_dir, include_dirs, ignore_include_not_found, 2)
-        self._ft_cls_name_to_conv_method = {
+    # configuration node `node` (already loaded from the file-like
+    # object `root_file`).
+    def __init__(self, root_file: TextIO, node: _MapNode, with_pkg_include_dir: bool,
+                 include_dirs: Optional[List[str]], ignore_include_not_found: bool):
+        super().__init__(root_file, node, with_pkg_include_dir, include_dirs,
+                         ignore_include_not_found, VersionNumber(2))
+        self._ft_cls_name_to_conv_method: Dict[str, Callable[[_MapNode], _MapNode]] = {
             'int': self._conv_int_ft_node,
             'integer': self._conv_int_ft_node,
             'enum': self._conv_enum_ft_node,
@@ -80,24 +88,24 @@ class _Parser(config_parse_common._Parser):
 
     # Converts a v2 field type node to a v3 field type node and returns
     # it.
-    def _conv_ft_node(self, v2_ft_node):
+    def _conv_ft_node(self, v2_ft_node: _MapNode) -> _MapNode:
         assert 'class' in v2_ft_node
         cls = v2_ft_node['class']
         assert cls in self._ft_cls_name_to_conv_method
         return self._ft_cls_name_to_conv_method[cls](v2_ft_node)
 
-    def _conv_ft_node_if_exists(self, v2_parent_node, key):
+    def _conv_ft_node_if_exists(self, v2_parent_node: Optional[_MapNode], key: str) -> Optional[_MapNode]:
         if v2_parent_node is None:
-            return
+            return None
 
         if key not in v2_parent_node:
-            return
+            return None
 
         return self._conv_ft_node(v2_parent_node[key])
 
     # Converts a v2 integer field type node to a v3 integer field type
     # node and returns it.
-    def _conv_int_ft_node(self, v2_ft_node):
+    def _conv_int_ft_node(self, v2_ft_node: _MapNode) -> _MapNode:
         # copy v2 integer field type node
         v3_ft_node = copy.deepcopy(v2_ft_node)
 
@@ -121,6 +129,9 @@ class _Parser(config_parse_common._Parser):
         # remove `encoding` property
         _del_prop_if_exists(v3_ft_node, 'encoding')
 
+        # remove `byte-order` property (always target BO in v3)
+        _del_prop_if_exists(v3_ft_node, 'byte-order')
+
         # remove `property-mappings` property
         _del_prop_if_exists(v3_ft_node, 'property-mappings')
 
@@ -128,7 +139,7 @@ class _Parser(config_parse_common._Parser):
 
     # Converts a v2 enumeration field type node to a v3 enumeration
     # field type node and returns it.
-    def _conv_enum_ft_node(self, v2_ft_node):
+    def _conv_enum_ft_node(self, v2_ft_node: _MapNode) -> _MapNode:
         # An enumeration field type _is_ an integer field type, so use a
         # copy of the converted v2 value field type node.
         v3_ft_node = copy.deepcopy(self._conv_ft_node(v2_ft_node['value-type']))
@@ -147,10 +158,12 @@ class _Parser(config_parse_common._Parser):
         members_node = v2_ft_node.get(prop_name)
 
         if members_node is not None:
-            mappings_node = collections.OrderedDict()
+            mappings_node: _MapNode = collections.OrderedDict()
             cur = 0
 
             for member_node in members_node:
+                v3_value_node: Union[int, List[int]]
+
                 if type(member_node) is str:
                     label = member_node
                     v3_value_node = cur
@@ -180,7 +193,7 @@ class _Parser(config_parse_common._Parser):
 
     # Converts a v2 real field type node to a v3 real field type node
     # and returns it.
-    def _conv_real_ft_node(self, v2_ft_node):
+    def _conv_real_ft_node(self, v2_ft_node: _MapNode) -> _MapNode:
         # copy v2 real field type node
         v3_ft_node = copy.deepcopy(v2_ft_node)
 
@@ -198,7 +211,7 @@ class _Parser(config_parse_common._Parser):
 
     # Converts a v2 string field type node to a v3 string field type
     # node and returns it.
-    def _conv_string_ft_node(self, v2_ft_node):
+    def _conv_string_ft_node(self, v2_ft_node: _MapNode) -> _MapNode:
         # copy v2 string field type node
         v3_ft_node = copy.deepcopy(v2_ft_node)
 
@@ -209,9 +222,9 @@ class _Parser(config_parse_common._Parser):
 
     # Converts a v2 array field type node to a v3 (static) array field
     # type node and returns it.
-    def _conv_static_array_ft_node(self, v2_ft_node):
+    def _conv_static_array_ft_node(self, v2_ft_node: _MapNode) -> _MapNode:
         # class renamed to `static-array`
-        v3_ft_node = collections.OrderedDict({'class': 'static-array'})
+        v3_ft_node: _MapNode = collections.OrderedDict({'class': 'static-array'})
 
         # copy `length` property
         _copy_prop_if_exists(v3_ft_node, v2_ft_node, 'length')
@@ -223,7 +236,7 @@ class _Parser(config_parse_common._Parser):
 
     # Converts a v2 structure field type node to a v3 structure field
     # type node and returns it.
-    def _conv_struct_ft_node(self, v2_ft_node):
+    def _conv_struct_ft_node(self, v2_ft_node: _MapNode) -> _MapNode:
         # Create fresh v3 structure field type node, reusing the class
         # of `v2_ft_node`.
         v3_ft_node = collections.OrderedDict({'class': v2_ft_node['class']})
@@ -250,7 +263,7 @@ class _Parser(config_parse_common._Parser):
 
     # Converts a v2 clock type node to a v3 clock type node and returns
     # it.
-    def _conv_clk_type_node(self, v2_clk_type_node):
+    def _conv_clk_type_node(self, v2_clk_type_node: _MapNode) -> _MapNode:
         # copy v2 clock type node
         v3_clk_type_node = copy.deepcopy(v2_clk_type_node)
 
@@ -272,9 +285,9 @@ class _Parser(config_parse_common._Parser):
 
     # Converts a v2 event type node to a v3 event type node and returns
     # it.
-    def _conv_ev_type_node(self, v2_ev_type_node):
+    def _conv_ev_type_node(self, v2_ev_type_node: _MapNode) -> _MapNode:
         # create empty v3 event type node
-        v3_ev_type_node = collections.OrderedDict()
+        v3_ev_type_node: _MapNode = collections.OrderedDict()
 
         # copy `log-level` property
         _copy_prop_if_exists(v3_ev_type_node, v2_ev_type_node, 'log-level')
@@ -294,7 +307,8 @@ class _Parser(config_parse_common._Parser):
         return v3_ev_type_node
 
     @staticmethod
-    def _set_v3_feature_ft_if_exists(v3_features_node, key, node):
+    def _set_v3_feature_ft_if_exists(v3_features_node: _MapNode, key: str,
+                                     node: Union[Optional[_MapNode], bool]):
         val = node
 
         if val is None:
@@ -304,12 +318,12 @@ class _Parser(config_parse_common._Parser):
 
     # Converts a v2 stream type node to a v3 stream type node and
     # returns it.
-    def _conv_stream_type_node(self, v2_stream_type_node):
+    def _conv_stream_type_node(self, v2_stream_type_node: _MapNode) -> _MapNode:
         # This function creates a v3 stream type features node from the
         # packet context and event header field type nodes of a
         # v2 stream type node.
-        def v3_features_node_from_v2_ft_nodes(v2_pkt_ctx_ft_fields_node,
-                                              v2_ev_header_ft_fields_node):
+        def v3_features_node_from_v2_ft_nodes(v2_pkt_ctx_ft_fields_node: _MapNode,
+                                              v2_ev_header_ft_fields_node: Optional[_MapNode]) -> _MapNode:
             if v2_ev_header_ft_fields_node is None:
                 v2_ev_header_ft_fields_node = collections.OrderedDict()
 
@@ -324,9 +338,9 @@ class _Parser(config_parse_common._Parser):
             v3_ev_type_id_ft_node = self._conv_ft_node_if_exists(v2_ev_header_ft_fields_node, 'id')
             v3_ev_time_ft_node = self._conv_ft_node_if_exists(v2_ev_header_ft_fields_node,
                                                               'timestamp')
-            v3_features_node = collections.OrderedDict()
-            v3_pkt_node = collections.OrderedDict()
-            v3_ev_node = collections.OrderedDict()
+            v3_features_node: _MapNode = collections.OrderedDict()
+            v3_pkt_node: _MapNode = collections.OrderedDict()
+            v3_ev_node: _MapNode = collections.OrderedDict()
             v3_pkt_node['total-size-field-type'] = v3_pkt_total_size_ft_node
             v3_pkt_node['content-size-field-type'] = v3_pkt_content_size_ft_node
             self._set_v3_feature_ft_if_exists(v3_pkt_node, 'beginning-time-field-type',
@@ -342,9 +356,9 @@ class _Parser(config_parse_common._Parser):
             v3_features_node['event'] = v3_ev_node
             return v3_features_node
 
-        def clk_type_name_from_v2_int_ft_node(v2_int_ft_node):
+        def clk_type_name_from_v2_int_ft_node(v2_int_ft_node: Optional[_MapNode]) -> _OptStr:
             if v2_int_ft_node is None:
-                return
+                return None
 
             assert v2_int_ft_node['class'] in ('int', 'integer')
             prop_mappings_node = v2_int_ft_node.get('property-mappings')
@@ -352,8 +366,10 @@ class _Parser(config_parse_common._Parser):
             if prop_mappings_node is not None and len(prop_mappings_node) > 0:
                 return prop_mappings_node[0]['name']
 
+            return None
+
         # create empty v3 stream type node
-        v3_stream_type_node = collections.OrderedDict()
+        v3_stream_type_node: _MapNode = collections.OrderedDict()
 
         # rename `$default` property to `$is-default`
         _copy_prop_if_exists(v3_stream_type_node, v2_stream_type_node, '$default', '$is-default')
@@ -446,8 +462,8 @@ class _Parser(config_parse_common._Parser):
         return v3_stream_type_node
 
     # Converts a v2 metadata node to a v3 trace node and returns it.
-    def _conv_meta_node(self, v2_meta_node):
-        def v3_features_node_from_v2_ft_node(v2_pkt_header_ft_node):
+    def _conv_meta_node(self, v2_meta_node: _MapNode) -> _MapNode:
+        def v3_features_node_from_v2_ft_node(v2_pkt_header_ft_node: Optional[_MapNode]) -> _MapNode:
             def set_if_exists(key, node):
                 return self._set_v3_feature_ft_if_exists(v3_features_node, key, node)
 
@@ -460,18 +476,19 @@ class _Parser(config_parse_common._Parser):
             v3_uuid_ft_node = self._conv_ft_node_if_exists(v2_pkt_header_ft_fields_node, 'uuid')
             v3_stream_type_id_ft_node = self._conv_ft_node_if_exists(v2_pkt_header_ft_fields_node,
                                                                      'stream_id')
-            v3_features_node = collections.OrderedDict()
+            v3_features_node: _MapNode = collections.OrderedDict()
             set_if_exists('magic-field-type', v3_magic_ft_node)
             set_if_exists('uuid-field-type', v3_uuid_ft_node)
             set_if_exists('stream-type-id-field-type', v3_stream_type_id_ft_node)
             return v3_features_node
 
-        v3_trace_node = collections.OrderedDict()
-        v3_trace_type_node = collections.OrderedDict()
+        v3_trace_node: _MapNode = collections.OrderedDict()
+        v3_trace_type_node: _MapNode = collections.OrderedDict()
         v2_trace_node = v2_meta_node['trace']
 
-        # rename `byte-order` property to `$default-byte-order`
-        _copy_prop_if_exists(v3_trace_type_node, v2_trace_node, 'byte-order', '$default-byte-order')
+        # Move `byte-order` property to root node's `target-byte-order`
+        # property.
+        typing.cast(_MapNode, self._root_node)['target-byte-order'] = v2_trace_node['byte-order']
 
         # copy `uuid` property
         _copy_prop_if_exists(v3_trace_type_node, v2_trace_node, 'uuid')
@@ -645,7 +662,7 @@ class _Parser(config_parse_common._Parser):
         # Make sure that the current configuration node is valid
         # considering field types are not expanded yet.
         self._schema_validator.validate(self._root_node,
-                                        '2/config/config-pre-field-type-expansion')
+                                        'config/2/config-pre-field-type-expansion')
 
         meta_node = self._root_node['metadata']
         ft_aliases_node = meta_node.get('type-aliases')
@@ -663,17 +680,17 @@ class _Parser(config_parse_common._Parser):
 
     # Processes the inclusions of the event type node `ev_type_node`,
     # returning the effective node.
-    def _process_ev_type_node_include(self, ev_type_node):
+    def _process_ev_type_node_include(self, ev_type_node: _MapNode) -> _MapNode:
         # Make sure the event type node is valid for the inclusion
         # processing stage.
-        self._schema_validator.validate(ev_type_node, '2/config/event-type-pre-include')
+        self._schema_validator.validate(ev_type_node, 'config/2/event-type-pre-include')
 
         # process inclusions
         return self._process_node_include(ev_type_node, self._process_ev_type_node_include)
 
     # Processes the inclusions of the stream type node
     # `stream_type_node`, returning the effective node.
-    def _process_stream_type_node_include(self, stream_type_node):
+    def _process_stream_type_node_include(self, stream_type_node: _MapNode) -> _MapNode:
         def process_children_include(stream_type_node):
             prop_name = 'events'
 
@@ -685,7 +702,7 @@ class _Parser(config_parse_common._Parser):
 
         # Make sure the stream type node is valid for the inclusion
         # processing stage.
-        self._schema_validator.validate(stream_type_node, '2/config/stream-type-pre-include')
+        self._schema_validator.validate(stream_type_node, 'config/2/stream-type-pre-include')
 
         # process inclusions
         return self._process_node_include(stream_type_node, self._process_stream_type_node_include,
@@ -693,28 +710,28 @@ class _Parser(config_parse_common._Parser):
 
     # Processes the inclusions of the trace type node `trace_type_node`,
     # returning the effective node.
-    def _process_trace_type_node_include(self, trace_type_node):
+    def _process_trace_type_node_include(self, trace_type_node: _MapNode) -> _MapNode:
         # Make sure the trace type node is valid for the inclusion
         # processing stage.
-        self._schema_validator.validate(trace_type_node, '2/config/trace-type-pre-include')
+        self._schema_validator.validate(trace_type_node, 'config/2/trace-type-pre-include')
 
         # process inclusions
         return self._process_node_include(trace_type_node, self._process_trace_type_node_include)
 
     # Processes the inclusions of the clock type node `clk_type_node`,
     # returning the effective node.
-    def _process_clk_type_node_include(self, clk_type_node):
+    def _process_clk_type_node_include(self, clk_type_node: _MapNode) -> _MapNode:
         # Make sure the clock type node is valid for the inclusion
         # processing stage.
-        self._schema_validator.validate(clk_type_node, '2/config/clock-type-pre-include')
+        self._schema_validator.validate(clk_type_node, 'config/2/clock-type-pre-include')
 
         # process inclusions
         return self._process_node_include(clk_type_node, self._process_clk_type_node_include)
 
     # Processes the inclusions of the metadata node `meta_node`,
     # returning the effective node.
-    def _process_meta_node_include(self, meta_node):
-        def process_children_include(meta_node):
+    def _process_meta_node_include(self, meta_node: _MapNode) -> _MapNode:
+        def process_children_include(meta_node: _MapNode):
             prop_name = 'trace'
 
             if prop_name in meta_node:
@@ -738,7 +755,7 @@ class _Parser(config_parse_common._Parser):
 
         # Make sure the metadata node is valid for the inclusion
         # processing stage.
-        self._schema_validator.validate(meta_node, '2/config/metadata-pre-include')
+        self._schema_validator.validate(meta_node, 'config/2/metadata-pre-include')
 
         # process inclusions
         return self._process_node_include(meta_node, self._process_meta_node_include,
@@ -766,7 +783,7 @@ class _Parser(config_parse_common._Parser):
         # First, make sure the configuration node itself is valid for
         # the inclusion processing stage.
         self._schema_validator.validate(self._root_node,
-                                        '2/config/config-pre-include')
+                                        'config/2/config-pre-include')
 
         # Process metadata node inclusions.
         #
@@ -786,7 +803,7 @@ class _Parser(config_parse_common._Parser):
         #
         # * Process inclusions.
         # * Expand field types (aliases and inheritance).
-        self._schema_validator.validate(self._root_node, '2/config/config-min')
+        self._schema_validator.validate(self._root_node, 'config/2/config-min')
 
         # process configuration node inclusions
         self._process_config_includes()
@@ -826,12 +843,12 @@ class _Parser(config_parse_common._Parser):
         # event type nodes can be log level aliases. Log level aliases
         # are also a feature of a barectf 3 configuration node,
         # therefore this is compatible.
-        self._schema_validator.validate(self._root_node, '2/config/config')
+        self._schema_validator.validate(self._root_node, 'config/2/config')
 
         # Transform the current configuration node into a valid v3
         # configuration node.
         self._transform_config_node()
 
     @property
-    def config_node(self):
-        return config_parse_common._ConfigNodeV3(self._root_node)
+    def config_node(self) -> config_parse_common._ConfigNodeV3:
+        return config_parse_common._ConfigNodeV3(typing.cast(_MapNode, self._root_node))
This page took 0.029194 seconds and 4 git commands to generate.