from barectf.config_parse_common import _ConfigurationParseError
from barectf.config_parse_common import _append_error_ctx
import barectf.config_parse_common as config_parse_common
+from barectf.config_parse_common import _MapNode
import collections
import copy
+from barectf.typing import VersionNumber, _OptStr
+from typing import Optional, List, Dict, TextIO, Union, Callable
+import typing
-def _del_prop_if_exists(node, prop_name):
+def _del_prop_if_exists(node: _MapNode, prop_name: str):
if prop_name in node:
del node[prop_name]
-def _rename_prop(node, old_prop_name, new_prop_name):
+def _rename_prop(node: _MapNode, old_prop_name: str, new_prop_name: str):
if old_prop_name in node:
node[new_prop_name] = node[old_prop_name]
del node[old_prop_name]
-def _copy_prop_if_exists(dst_node, src_node, src_prop_name, dst_prop_name=None):
+def _copy_prop_if_exists(dst_node: _MapNode, src_node: _MapNode, src_prop_name: str,
+ dst_prop_name: _OptStr = None):
if dst_prop_name is None:
dst_prop_name = src_prop_name
# parsing stages and general strategy.
class _Parser(config_parse_common._Parser):
# Builds a barectf 2 YAML configuration parser and parses the root
- # configuration node `node` (already loaded from `path`).
- def __init__(self, path, node, with_pkg_include_dir, include_dirs, ignore_include_not_found):
- super().__init__(path, node, with_pkg_include_dir, include_dirs, ignore_include_not_found, 2)
- self._ft_cls_name_to_conv_method = {
+ # configuration node `node` (already loaded from the file-like
+ # object `root_file`).
+ def __init__(self, root_file: TextIO, node: _MapNode, with_pkg_include_dir: bool,
+ include_dirs: Optional[List[str]], ignore_include_not_found: bool):
+ super().__init__(root_file, node, with_pkg_include_dir, include_dirs,
+ ignore_include_not_found, VersionNumber(2))
+ self._ft_cls_name_to_conv_method: Dict[str, Callable[[_MapNode], _MapNode]] = {
'int': self._conv_int_ft_node,
'integer': self._conv_int_ft_node,
'enum': self._conv_enum_ft_node,
# Converts a v2 field type node to a v3 field type node and returns
# it.
- def _conv_ft_node(self, v2_ft_node):
+ def _conv_ft_node(self, v2_ft_node: _MapNode) -> _MapNode:
assert 'class' in v2_ft_node
cls = v2_ft_node['class']
assert cls in self._ft_cls_name_to_conv_method
return self._ft_cls_name_to_conv_method[cls](v2_ft_node)
- def _conv_ft_node_if_exists(self, v2_parent_node, key):
+ def _conv_ft_node_if_exists(self, v2_parent_node: Optional[_MapNode], key: str) -> Optional[_MapNode]:
if v2_parent_node is None:
- return
+ return None
if key not in v2_parent_node:
- return
+ return None
return self._conv_ft_node(v2_parent_node[key])
# Converts a v2 integer field type node to a v3 integer field type
# node and returns it.
- def _conv_int_ft_node(self, v2_ft_node):
+ def _conv_int_ft_node(self, v2_ft_node: _MapNode) -> _MapNode:
# copy v2 integer field type node
v3_ft_node = copy.deepcopy(v2_ft_node)
# remove `encoding` property
_del_prop_if_exists(v3_ft_node, 'encoding')
+ # remove `byte-order` property (always target BO in v3)
+ _del_prop_if_exists(v3_ft_node, 'byte-order')
+
# remove `property-mappings` property
_del_prop_if_exists(v3_ft_node, 'property-mappings')
# Converts a v2 enumeration field type node to a v3 enumeration
# field type node and returns it.
- def _conv_enum_ft_node(self, v2_ft_node):
+ def _conv_enum_ft_node(self, v2_ft_node: _MapNode) -> _MapNode:
# An enumeration field type _is_ an integer field type, so use a
# copy of the converted v2 value field type node.
v3_ft_node = copy.deepcopy(self._conv_ft_node(v2_ft_node['value-type']))
members_node = v2_ft_node.get(prop_name)
if members_node is not None:
- mappings_node = collections.OrderedDict()
+ mappings_node: _MapNode = collections.OrderedDict()
cur = 0
for member_node in members_node:
+ v3_value_node: Union[int, List[int]]
+
if type(member_node) is str:
label = member_node
v3_value_node = cur
# Converts a v2 real field type node to a v3 real field type node
# and returns it.
- def _conv_real_ft_node(self, v2_ft_node):
+ def _conv_real_ft_node(self, v2_ft_node: _MapNode) -> _MapNode:
# copy v2 real field type node
v3_ft_node = copy.deepcopy(v2_ft_node)
# Converts a v2 string field type node to a v3 string field type
# node and returns it.
- def _conv_string_ft_node(self, v2_ft_node):
+ def _conv_string_ft_node(self, v2_ft_node: _MapNode) -> _MapNode:
# copy v2 string field type node
v3_ft_node = copy.deepcopy(v2_ft_node)
# Converts a v2 array field type node to a v3 (static) array field
# type node and returns it.
- def _conv_static_array_ft_node(self, v2_ft_node):
+ def _conv_static_array_ft_node(self, v2_ft_node: _MapNode) -> _MapNode:
# class renamed to `static-array`
- v3_ft_node = collections.OrderedDict({'class': 'static-array'})
+ v3_ft_node: _MapNode = collections.OrderedDict({'class': 'static-array'})
# copy `length` property
_copy_prop_if_exists(v3_ft_node, v2_ft_node, 'length')
# Converts a v2 structure field type node to a v3 structure field
# type node and returns it.
- def _conv_struct_ft_node(self, v2_ft_node):
+ def _conv_struct_ft_node(self, v2_ft_node: _MapNode) -> _MapNode:
# Create fresh v3 structure field type node, reusing the class
# of `v2_ft_node`.
v3_ft_node = collections.OrderedDict({'class': v2_ft_node['class']})
# Converts a v2 clock type node to a v3 clock type node and returns
# it.
- def _conv_clk_type_node(self, v2_clk_type_node):
+ def _conv_clk_type_node(self, v2_clk_type_node: _MapNode) -> _MapNode:
# copy v2 clock type node
v3_clk_type_node = copy.deepcopy(v2_clk_type_node)
# Converts a v2 event type node to a v3 event type node and returns
# it.
- def _conv_ev_type_node(self, v2_ev_type_node):
+ def _conv_ev_type_node(self, v2_ev_type_node: _MapNode) -> _MapNode:
# create empty v3 event type node
- v3_ev_type_node = collections.OrderedDict()
+ v3_ev_type_node: _MapNode = collections.OrderedDict()
# copy `log-level` property
_copy_prop_if_exists(v3_ev_type_node, v2_ev_type_node, 'log-level')
return v3_ev_type_node
@staticmethod
- def _set_v3_feature_ft_if_exists(v3_features_node, key, node):
+ def _set_v3_feature_ft_if_exists(v3_features_node: _MapNode, key: str,
+ node: Union[Optional[_MapNode], bool]):
val = node
if val is None:
# Converts a v2 stream type node to a v3 stream type node and
# returns it.
- def _conv_stream_type_node(self, v2_stream_type_node):
+ def _conv_stream_type_node(self, v2_stream_type_node: _MapNode) -> _MapNode:
# This function creates a v3 stream type features node from the
# packet context and event header field type nodes of a
# v2 stream type node.
- def v3_features_node_from_v2_ft_nodes(v2_pkt_ctx_ft_fields_node,
- v2_ev_header_ft_fields_node):
+ def v3_features_node_from_v2_ft_nodes(v2_pkt_ctx_ft_fields_node: _MapNode,
+ v2_ev_header_ft_fields_node: Optional[_MapNode]) -> _MapNode:
if v2_ev_header_ft_fields_node is None:
v2_ev_header_ft_fields_node = collections.OrderedDict()
v3_ev_type_id_ft_node = self._conv_ft_node_if_exists(v2_ev_header_ft_fields_node, 'id')
v3_ev_time_ft_node = self._conv_ft_node_if_exists(v2_ev_header_ft_fields_node,
'timestamp')
- v3_features_node = collections.OrderedDict()
- v3_pkt_node = collections.OrderedDict()
- v3_ev_node = collections.OrderedDict()
+ v3_features_node: _MapNode = collections.OrderedDict()
+ v3_pkt_node: _MapNode = collections.OrderedDict()
+ v3_ev_node: _MapNode = collections.OrderedDict()
v3_pkt_node['total-size-field-type'] = v3_pkt_total_size_ft_node
v3_pkt_node['content-size-field-type'] = v3_pkt_content_size_ft_node
self._set_v3_feature_ft_if_exists(v3_pkt_node, 'beginning-time-field-type',
v3_features_node['event'] = v3_ev_node
return v3_features_node
- def clk_type_name_from_v2_int_ft_node(v2_int_ft_node):
+ def clk_type_name_from_v2_int_ft_node(v2_int_ft_node: Optional[_MapNode]) -> _OptStr:
if v2_int_ft_node is None:
- return
+ return None
assert v2_int_ft_node['class'] in ('int', 'integer')
prop_mappings_node = v2_int_ft_node.get('property-mappings')
if prop_mappings_node is not None and len(prop_mappings_node) > 0:
return prop_mappings_node[0]['name']
+ return None
+
# create empty v3 stream type node
- v3_stream_type_node = collections.OrderedDict()
+ v3_stream_type_node: _MapNode = collections.OrderedDict()
# rename `$default` property to `$is-default`
_copy_prop_if_exists(v3_stream_type_node, v2_stream_type_node, '$default', '$is-default')
return v3_stream_type_node
# Converts a v2 metadata node to a v3 trace node and returns it.
- def _conv_meta_node(self, v2_meta_node):
- def v3_features_node_from_v2_ft_node(v2_pkt_header_ft_node):
+ def _conv_meta_node(self, v2_meta_node: _MapNode) -> _MapNode:
+ def v3_features_node_from_v2_ft_node(v2_pkt_header_ft_node: Optional[_MapNode]) -> _MapNode:
def set_if_exists(key, node):
return self._set_v3_feature_ft_if_exists(v3_features_node, key, node)
v3_uuid_ft_node = self._conv_ft_node_if_exists(v2_pkt_header_ft_fields_node, 'uuid')
v3_stream_type_id_ft_node = self._conv_ft_node_if_exists(v2_pkt_header_ft_fields_node,
'stream_id')
- v3_features_node = collections.OrderedDict()
+ v3_features_node: _MapNode = collections.OrderedDict()
set_if_exists('magic-field-type', v3_magic_ft_node)
set_if_exists('uuid-field-type', v3_uuid_ft_node)
set_if_exists('stream-type-id-field-type', v3_stream_type_id_ft_node)
return v3_features_node
- v3_trace_node = collections.OrderedDict()
- v3_trace_type_node = collections.OrderedDict()
+ v3_trace_node: _MapNode = collections.OrderedDict()
+ v3_trace_type_node: _MapNode = collections.OrderedDict()
v2_trace_node = v2_meta_node['trace']
- # rename `byte-order` property to `$default-byte-order`
- _copy_prop_if_exists(v3_trace_type_node, v2_trace_node, 'byte-order', '$default-byte-order')
+ # Move `byte-order` property to root node's `target-byte-order`
+ # property.
+ typing.cast(_MapNode, self._root_node)['target-byte-order'] = v2_trace_node['byte-order']
# copy `uuid` property
_copy_prop_if_exists(v3_trace_type_node, v2_trace_node, 'uuid')
# Make sure that the current configuration node is valid
# considering field types are not expanded yet.
self._schema_validator.validate(self._root_node,
- '2/config/config-pre-field-type-expansion')
+ 'config/2/config-pre-field-type-expansion')
meta_node = self._root_node['metadata']
ft_aliases_node = meta_node.get('type-aliases')
# Processes the inclusions of the event type node `ev_type_node`,
# returning the effective node.
- def _process_ev_type_node_include(self, ev_type_node):
+ def _process_ev_type_node_include(self, ev_type_node: _MapNode) -> _MapNode:
# Make sure the event type node is valid for the inclusion
# processing stage.
- self._schema_validator.validate(ev_type_node, '2/config/event-type-pre-include')
+ self._schema_validator.validate(ev_type_node, 'config/2/event-type-pre-include')
# process inclusions
return self._process_node_include(ev_type_node, self._process_ev_type_node_include)
# Processes the inclusions of the stream type node
# `stream_type_node`, returning the effective node.
- def _process_stream_type_node_include(self, stream_type_node):
+ def _process_stream_type_node_include(self, stream_type_node: _MapNode) -> _MapNode:
def process_children_include(stream_type_node):
prop_name = 'events'
# Make sure the stream type node is valid for the inclusion
# processing stage.
- self._schema_validator.validate(stream_type_node, '2/config/stream-type-pre-include')
+ self._schema_validator.validate(stream_type_node, 'config/2/stream-type-pre-include')
# process inclusions
return self._process_node_include(stream_type_node, self._process_stream_type_node_include,
# Processes the inclusions of the trace type node `trace_type_node`,
# returning the effective node.
- def _process_trace_type_node_include(self, trace_type_node):
+ def _process_trace_type_node_include(self, trace_type_node: _MapNode) -> _MapNode:
# Make sure the trace type node is valid for the inclusion
# processing stage.
- self._schema_validator.validate(trace_type_node, '2/config/trace-type-pre-include')
+ self._schema_validator.validate(trace_type_node, 'config/2/trace-type-pre-include')
# process inclusions
return self._process_node_include(trace_type_node, self._process_trace_type_node_include)
# Processes the inclusions of the clock type node `clk_type_node`,
# returning the effective node.
- def _process_clk_type_node_include(self, clk_type_node):
+ def _process_clk_type_node_include(self, clk_type_node: _MapNode) -> _MapNode:
# Make sure the clock type node is valid for the inclusion
# processing stage.
- self._schema_validator.validate(clk_type_node, '2/config/clock-type-pre-include')
+ self._schema_validator.validate(clk_type_node, 'config/2/clock-type-pre-include')
# process inclusions
return self._process_node_include(clk_type_node, self._process_clk_type_node_include)
# Processes the inclusions of the metadata node `meta_node`,
# returning the effective node.
- def _process_meta_node_include(self, meta_node):
- def process_children_include(meta_node):
+ def _process_meta_node_include(self, meta_node: _MapNode) -> _MapNode:
+ def process_children_include(meta_node: _MapNode):
prop_name = 'trace'
if prop_name in meta_node:
# Make sure the metadata node is valid for the inclusion
# processing stage.
- self._schema_validator.validate(meta_node, '2/config/metadata-pre-include')
+ self._schema_validator.validate(meta_node, 'config/2/metadata-pre-include')
# process inclusions
return self._process_node_include(meta_node, self._process_meta_node_include,
# First, make sure the configuration node itself is valid for
# the inclusion processing stage.
self._schema_validator.validate(self._root_node,
- '2/config/config-pre-include')
+ 'config/2/config-pre-include')
# Process metadata node inclusions.
#
#
# * Process inclusions.
# * Expand field types (aliases and inheritance).
- self._schema_validator.validate(self._root_node, '2/config/config-min')
+ self._schema_validator.validate(self._root_node, 'config/2/config-min')
# process configuration node inclusions
self._process_config_includes()
# event type nodes can be log level aliases. Log level aliases
# are also a feature of a barectf 3 configuration node,
# therefore this is compatible.
- self._schema_validator.validate(self._root_node, '2/config/config')
+ self._schema_validator.validate(self._root_node, 'config/2/config')
# Transform the current configuration node into a valid v3
# configuration node.
self._transform_config_node()
@property
- def config_node(self):
- return config_parse_common._ConfigNodeV3(self._root_node)
+ def config_node(self) -> config_parse_common._ConfigNodeV3:
+ return config_parse_common._ConfigNodeV3(typing.cast(_MapNode, self._root_node))