# Upstream repository: <https://github.com/efficios/normand>.
__author__ = "Philippe Proulx"
-__version__ = "0.2.1"
+__version__ = "0.3.0"
__all__ = [
"ByteOrder",
"parse",
import sys
import enum
import struct
-from typing import Any, Dict, List, Union, Pattern, Callable, NoReturn, Optional
+from typing import (
+ Any,
+ Set,
+ Dict,
+ List,
+ Tuple,
+ Union,
+ Pattern,
+ Callable,
+ NoReturn,
+ Optional,
+)
# Text location (line and column numbers).
def col_no(self):
return self._col_no
+ def __repr__(self):
+ return "TextLoc({}, {})".format(self._line_no, self._col_no)
+
# Any item.
class _Item:
def text_loc(self):
return self._text_loc
+
+# Scalar item.
+class _ScalarItem(_Item):
# Returns the size, in bytes, of this item.
@property
@abc.abstractmethod
# A repeatable item.
-class _RepableItem(_Item):
+class _RepableItem:
pass
# Single byte.
-class _Byte(_RepableItem):
+class _Byte(_ScalarItem, _RepableItem):
def __init__(self, val: int, text_loc: TextLoc):
super().__init__(text_loc)
self._val = val
# String.
-class _Str(_RepableItem):
+class _Str(_ScalarItem, _RepableItem):
def __init__(self, data: bytes, text_loc: TextLoc):
super().__init__(text_loc)
self._data = data
LE = "le"
-# Byte order.
-class _Bo(_Item):
+# Byte order setting.
+class _SetBo(_Item):
def __init__(self, bo: ByteOrder, text_loc: TextLoc):
super().__init__(text_loc)
self._bo = bo
def bo(self):
return self._bo
- @property
- def size(self):
- return 0
+ def __repr__(self):
+ return "_SetBo({}, {})".format(repr(self._bo), self._text_loc)
# Label.
def name(self):
return self._name
- @property
- def size(self):
- return 0
-
def __repr__(self):
return "_Label({}, {})".format(repr(self._name), self._text_loc)
-# Offset.
-class _Offset(_Item):
+# Offset setting.
+class _SetOffset(_Item):
def __init__(self, val: int, text_loc: TextLoc):
super().__init__(text_loc)
self._val = val
def val(self):
return self._val
- @property
- def size(self):
- return 0
-
def __repr__(self):
- return "_Offset({}, {})".format(repr(self._val), self._text_loc)
+ return "_SetOffset({}, {})".format(repr(self._val), self._text_loc)
# Mixin of containing an AST expression and its string.
return self._expr
-# Variable.
-class _Var(_Item, _ExprMixin):
+# Variable assignment.
+class _VarAssign(_Item, _ExprMixin):
def __init__(
self, name: str, expr_str: str, expr: ast.Expression, text_loc: TextLoc
):
def name(self):
return self._name
- @property
- def size(self):
- return 0
-
def __repr__(self):
- return "_Var({}, {}, {}, {})".format(
+ return "_VarAssign({}, {}, {}, {})".format(
repr(self._name), repr(self._expr_str), repr(self._expr), self._text_loc
)
# Value, possibly needing more than one byte.
-class _Val(_RepableItem, _ExprMixin):
+class _Val(_ScalarItem, _RepableItem, _ExprMixin):
def __init__(
self, expr_str: str, expr: ast.Expression, len: int, text_loc: TextLoc
):
)
-# Expression item type.
-_ExprItemT = Union[_Val, _Var]
-
-
# Group of items.
-class _Group(_RepableItem):
+class _Group(_Item, _RepableItem):
def __init__(self, items: List[_Item], text_loc: TextLoc):
super().__init__(text_loc)
self._items = items
- self._size = sum([item.size for item in self._items])
# Contained items.
@property
def items(self):
return self._items
- @property
- def size(self):
- return self._size
-
def __repr__(self):
return "_Group({}, {})".format(repr(self._items), self._text_loc)
# Repetition item.
-class _Rep(_Item):
- def __init__(self, item: _RepableItem, mul: int, text_loc: TextLoc):
+class _Rep(_Item, _ExprMixin):
+ def __init__(
+ self, item: _Item, expr_str: str, expr: ast.Expression, text_loc: TextLoc
+ ):
super().__init__(text_loc)
+ _ExprMixin.__init__(self, expr_str, expr)
self._item = item
- self._mul = mul
# Item to repeat.
@property
def item(self):
return self._item
- # Repetition multiplier.
- @property
- def mul(self):
- return self._mul
-
- @property
- def size(self):
- return self._item.size * self._mul
-
def __repr__(self):
- return "_Rep({}, {}, {})".format(
- repr(self._item), repr(self._mul), self._text_loc
+ return "_Rep({}, {}, {}, {})".format(
+ repr(self._item), repr(self._expr_str), repr(self._expr), self._text_loc
)
+# Expression item type.
+_ExprItemT = Union[_Val, _VarAssign, _Rep]
+
+
# A parsing error containing a message and a text location.
class ParseError(RuntimeError):
@classmethod
raise ParseError._create(msg, text_loc) # pyright: ignore[reportPrivateUsage]
-# Variable (and label) dictionary type.
+# Variable/label dictionary type.
VarsT = Dict[str, int]
)
# Patterns for _try_parse_val_and_len()
- _var_pat = re.compile(
+ _var_assign_pat = re.compile(
r"(?P<name>{})\s*=\s*(?P<expr>[^}}]+)".format(_py_name_pat.pattern)
)
- # Tries to parse a variable, returning a variable item on success.
- def _try_parse_var(self):
+ # Tries to parse a variable assignment, returning a variable
+ # assignment item on success.
+ def _try_parse_var_assign(self):
begin_text_loc = self._text_loc
# Match
- m = self._try_parse_pat(self._var_pat)
+ m = self._try_parse_pat(self._var_assign_pat)
if m is None:
# No match
expr_str, expr = self._ast_expr_from_str(m.group("expr"), begin_text_loc)
# Return item
- return _Var(
+ return _VarAssign(
name,
expr_str,
expr,
begin_text_loc,
)
- # Pattern for _try_parse_bo_name()
+ # Pattern for _try_parse_set_bo()
_bo_pat = re.compile(r"[bl]e")
- # Tries to parse a byte order name, returning a byte order item on
- # success.
- def _try_parse_bo_name(self):
+ # Tries to parse a byte order name, returning a byte order setting
+ # item on success.
+ def _try_parse_set_bo(self):
begin_text_loc = self._text_loc
# Match
# Return corresponding item
if m.group(0) == "be":
- return _Bo(ByteOrder.BE, begin_text_loc)
+ return _SetBo(ByteOrder.BE, begin_text_loc)
else:
assert m.group(0) == "le"
- return _Bo(ByteOrder.LE, begin_text_loc)
+ return _SetBo(ByteOrder.LE, begin_text_loc)
# Patterns for _try_parse_val_or_bo()
- _val_var_bo_prefix_pat = re.compile(r"\{\s*")
- _val_var_bo_suffix_pat = re.compile(r"\s*}")
+ _val_var_assign_set_bo_prefix_pat = re.compile(r"\{\s*")
+ _val_var_assign_set_bo_suffix_pat = re.compile(r"\s*}")
- # Tries to parse a value, a variable, or a byte order, returning an
- # item on success.
- def _try_parse_val_or_var_or_bo(self):
+ # Tries to parse a value, a variable assignment, or a byte order
+ # setting, returning an item on success.
+ def _try_parse_val_or_var_assign_or_set_bo(self):
# Match prefix
- if self._try_parse_pat(self._val_var_bo_prefix_pat) is None:
+ if self._try_parse_pat(self._val_var_assign_set_bo_prefix_pat) is None:
# No match
return
- # Variable item?
- item = self._try_parse_var()
+ # Variable assignment item?
+ item = self._try_parse_var_assign()
if item is None:
# Value item?
item = self._try_parse_val_and_len()
if item is None:
- # Byte order item?
- item = self._try_parse_bo_name()
+ # Byte order setting item?
+ item = self._try_parse_set_bo()
if item is None:
# At this point it's invalid
- self._raise_error("Expecting a value, a variable, or a byte order")
+ self._raise_error(
+ "Expecting a value, a variable assignment, or a byte order setting"
+ )
# Expect suffix
- self._expect_pat(self._val_var_bo_suffix_pat, "Expecting `}`")
+ self._expect_pat(self._val_var_assign_set_bo_suffix_pat, "Expecting `}`")
return item
- # Pattern for _try_parse_offset_val() and _try_parse_rep()
+ # Pattern for _try_parse_set_offset_val() and _try_parse_rep()
_pos_const_int_pat = re.compile(r"0[Xx][A-Fa-f0-9]+|\d+")
- # Tries to parse an offset value (after the initial `<`), returning
- # an offset item on success.
- def _try_parse_offset_val(self):
+ # Tries to parse an offset setting value (after the initial `<`),
+ # returning an offset item on success.
+ def _try_parse_set_offset_val(self):
begin_text_loc = self._text_loc
# Match
return
# Return item
- return _Offset(int(m.group(0), 0), begin_text_loc)
+ return _SetOffset(int(m.group(0), 0), begin_text_loc)
# Tries to parse a label name (after the initial `<`), returning a
# label item on success.
# Return item
return _Label(name, begin_text_loc)
- # Patterns for _try_parse_label_or_offset()
- _label_offset_prefix_pat = re.compile(r"<\s*")
- _label_offset_suffix_pat = re.compile(r"\s*>")
+ # Patterns for _try_parse_label_or_set_offset()
+ _label_set_offset_prefix_pat = re.compile(r"<\s*")
+ _label_set_offset_suffix_pat = re.compile(r"\s*>")
- # Tries to parse a label or an offset, returning an item on success.
- def _try_parse_label_or_offset(self):
+ # Tries to parse a label or an offset setting, returning an item on
+ # success.
+ def _try_parse_label_or_set_offset(self):
# Match prefix
- if self._try_parse_pat(self._label_offset_prefix_pat) is None:
+ if self._try_parse_pat(self._label_set_offset_prefix_pat) is None:
# No match
return
- # Offset item?
- item = self._try_parse_offset_val()
+ # Offset setting item?
+ item = self._try_parse_set_offset_val()
if item is None:
# Label item?
if item is None:
# At this point it's invalid
- self._raise_error("Expecting a label name or an offset value")
+ self._raise_error("Expecting a label name or an offset setting value")
# Expect suffix
- self._expect_pat(self._label_offset_suffix_pat, "Expecting `>`")
+ self._expect_pat(self._label_set_offset_suffix_pat, "Expecting `>`")
return item
# Tries to parse a base item (anything except a repetition),
if item is not None:
return item
- # Value, variable, or byte order item?
- item = self._try_parse_val_or_var_or_bo()
+ # Value, variable assignment, or byte order setting item?
+ item = self._try_parse_val_or_var_assign_or_set_bo()
if item is not None:
return item
- # Label or offset item?
- item = self._try_parse_label_or_offset()
+ # Label or offset setting item?
+ item = self._try_parse_label_or_set_offset()
if item is not None:
return item
# Pattern for _try_parse_rep()
_rep_prefix_pat = re.compile(r"\*\s*")
+ _rep_expr_prefix_pat = re.compile(r"\{")
+ _rep_expr_pat = re.compile(r"[^}p]+")
+ _rep_expr_suffix_pat = re.compile(r"\}")
- # Tries to parse a repetition, returning the multiplier on success,
- # or 1 otherwise.
+ # Tries to parse a repetition, returning the expression string and
+ # AST expression node on success.
def _try_parse_rep(self):
# Match prefix
if self._try_parse_pat(self._rep_prefix_pat) is None:
# No match
- return 1
+ return
# Expect and return a decimal multiplier
self._skip_ws_and_comments()
- m = self._expect_pat(
- self._pos_const_int_pat, "Expecting a positive integral multiplier"
- )
- return int(m.group(0), 0)
+
+ # Integer?
+ m = self._try_parse_pat(self._pos_const_int_pat)
+
+ if m is None:
+ # Expression?
+ if self._try_parse_pat(self._rep_expr_prefix_pat) is None:
+ # At this point it's invalid
+ self._raise_error("Expecting a positive integral multiplier or `{`")
+
+ # Expect an expression
+ expr_str_loc = self._text_loc
+ m = self._expect_pat(self._rep_expr_pat, "Expecting an expression")
+ expr_str = self._ast_expr_from_str(m.group(0), expr_str_loc)
+
+ # Expect `}`
+ self._expect_pat(self._rep_expr_suffix_pat, "Expecting `}`")
+ expr_str = m.group(0)
+ else:
+ expr_str_loc = self._text_loc
+ expr_str = m.group(0)
+
+ return self._ast_expr_from_str(expr_str, expr_str_loc)
# Tries to parse an item, possibly followed by a repetition,
# returning `True` on success.
if isinstance(item, _RepableItem):
self._skip_ws_and_comments()
rep_text_loc = self._text_loc
- rep = self._try_parse_rep()
+ rep_ret = self._try_parse_rep()
- if rep == 0:
- # No item, but that's okay
- return True
- elif rep > 1:
- # Convert to repetition item
- item = _Rep(item, rep, rep_text_loc)
+ if rep_ret is not None:
+ item = _Rep(item, rep_ret[0], rep_ret[1], rep_text_loc)
items.append(item)
return True
_icitte_name = "ICITTE"
-# Value expression validator.
-class _ExprValidator(ast.NodeVisitor):
- def __init__(self, item: _ExprItemT, syms: VarsT):
- self._item = item
- self._syms = syms
+# Base node visitor.
+#
+# Calls the _visit_name() method for each name node which isn't the name
+# of a call.
+class _NodeVisitor(ast.NodeVisitor):
+ def __init__(self):
self._parent_is_call = False
def generic_visit(self, node: ast.AST):
if type(node) is ast.Call:
self._parent_is_call = True
elif type(node) is ast.Name and not self._parent_is_call:
- # Make sure the name refers to a known label name
- if node.id != _icitte_name and node.id not in self._syms:
- _raise_error(
- "Unknown variable/label name `{}` in expression `{}`".format(
- node.id, self._item.expr_str
- ),
- self._item.text_loc,
- )
-
- # TODO: Restrict the set of allowed node types
+ self._visit_name(node.id)
super().generic_visit(node)
self._parent_is_call = False
+ @abc.abstractmethod
+ def _visit_name(self, name: str):
+ ...
+
-# Keeper of labels for a given group instance.
-#
-# A group instance is one iteration of a given group.
-class _GroupInstanceLabels:
+# Expression validator: validates that all the names within the
+# expression are allowed.
+class _ExprValidator(_NodeVisitor):
+ def __init__(self, item: _ExprItemT, allowed_names: Set[str], icitte_allowed: bool):
+ super().__init__()
+ self._item = item
+ self._allowed_names = allowed_names
+ self._icitte_allowed = icitte_allowed
+
+ def _visit_name(self, name: str):
+ # Make sure the name refers to a known and reachable
+ # variable/label name.
+ if name == _icitte_name and not self._icitte_allowed:
+ _raise_error(
+ "Illegal reserved name `{}` in expression `{}`".format(
+ _icitte_name, self._item.expr_str
+ ),
+ self._item.text_loc,
+ )
+ elif name != _icitte_name and name not in self._allowed_names:
+ msg = "Illegal (unknown or unreachable) variable/label name `{}` in expression `{}`".format(
+ name, self._item.expr_str
+ )
+
+ if len(self._allowed_names) > 0:
+ allowed_names = self._allowed_names.copy()
+
+ if self._icitte_allowed:
+ allowed_names.add(_icitte_name)
+
+ allowed_names_str = ", ".join(
+ sorted(["`{}`".format(name) for name in allowed_names])
+ )
+ msg += "; the legal names are {{{}}}".format(allowed_names_str)
+
+ _raise_error(
+ msg,
+ self._item.text_loc,
+ )
+
+
+# Expression visitor getting all the contained names.
+class _ExprNamesVisitor(_NodeVisitor):
def __init__(self):
- self._instance_labels = {} # type: Dict[_Group, Dict[int, VarsT]]
+ self._parent_is_call = False
+ self._names = set() # type: Set[str]
+
+ @property
+ def names(self):
+ return self._names
- # Assigns the labels `labels` to a new instance of `group`.
- def add(self, group: _Group, labels: VarsT):
- if group not in self._instance_labels:
- self._instance_labels[group] = {}
+ def _visit_name(self, name: str):
+ self._names.add(name)
- spec_instance_labels = self._instance_labels[group]
- spec_instance_labels[len(spec_instance_labels)] = labels.copy()
- # Returns the labels (not a copy) of the instance `instance_index`
- # of the group `group`.
- def labels(self, group: _Group, instance_index: int):
- return self._instance_labels[group][instance_index]
+# Generator state.
+class _GenState:
+ def __init__(
+ self, variables: VarsT, labels: VarsT, offset: int, bo: Optional[ByteOrder]
+ ):
+ self.variables = variables.copy()
+ self.labels = labels.copy()
+ self.offset = offset
+ self.bo = bo
-# Generator of data and labels from a group item.
+# Generator of data and final state from a group item.
#
# Generation happens in memory at construction time. After building, use
# the `data`, `variables`, `labels`, `offset`, and `bo` properties to
# get the resulting context.
+#
+# The steps of generation are:
+#
+# 1. Validate that each repetition expression uses only reachable names
+# and not `ICITTE`.
+#
+# 2. Compute and keep the effective repetition count for each repetition
+# instance.
+#
+# 3. Generate bytes, updating the initial state as it goes which becomes
+# the final state after the operation.
+#
+# During the generation, when handling a `_Rep` item, we already have
+# the effective repetition count of the instance.
+#
+# When handling a `_Group` item, first update the current labels with
+# all the immediate (not nested) labels, and then handle each
+# contained item. This gives contained item access to "future" outer
+# labels. Then remove the immediate labels from the state so that
+# outer items don't have access to inner labels.
class _Gen:
def __init__(
self,
offset: int,
bo: Optional[ByteOrder],
):
- self._group_instance_labels = _GroupInstanceLabels()
- self._resolve_labels(group, offset, labels.copy())
- self._vars = variables.copy()
- self._offset = offset
- self._bo = bo
- self._main_group = group
- self._gen()
+ self._validate_rep_exprs(group, set(variables.keys()), set(labels.keys()))
+ self._rep_instance_vals = self._compute_rep_instance_vals(
+ group, _GenState(variables, labels, offset, bo)
+ )
+ self._gen(group, _GenState(variables, labels, offset, bo))
# Generated bytes.
@property
# Updated variables.
@property
def variables(self):
- return self._vars
+ return self._final_state.variables
# Updated main group labels.
@property
def labels(self):
- return self._group_instance_labels.labels(self._main_group, 0)
+ return self._final_state.labels
# Updated offset.
@property
def offset(self):
- return self._offset
+ return self._final_state.offset
# Updated byte order.
@property
def bo(self):
- return self._bo
-
- # Fills `self._group_instance_labels` with the labels for each group
- # instance in `item`, starting at current offset `offset` with the
- # current labels `labels`.
+ return self._final_state.bo
+
+ # Returns the set of used, non-called names within the AST
+ # expression `expr`.
+ @staticmethod
+ def _names_of_expr(expr: ast.Expression):
+ visitor = _ExprNamesVisitor()
+ visitor.visit(expr)
+ return visitor.names
+
+ # Validates that all the repetition expressions within `group` don't
+ # refer, directly or indirectly, to subsequent labels.
#
- # Returns the new current offset.
- def _resolve_labels(self, item: _Item, offset: int, labels: VarsT) -> int:
- if type(item) is _Group:
- # First pass: compute immediate labels of this instance
- group_labels = labels.copy()
- group_offset = offset
-
- for subitem in item.items:
- if type(subitem) is _Offset:
- group_offset = subitem.val
- elif type(subitem) is _Label:
- assert subitem.name not in group_labels
- group_labels[subitem.name] = group_offset
- else:
- group_offset += subitem.size
-
- # Add to group instance labels
- self._group_instance_labels.add(item, group_labels)
-
- # Second pass: handle each item
- for subitem in item.items:
- offset = self._resolve_labels(subitem, offset, group_labels)
+ # The strategy here is to keep a set of allowed label names, per
+ # group, initialized to `allowed_label_names`, and a set of allowed
+ # variable names initialized to `allowed_variable_names`.
+ #
+ # Then, depending on the type of `item`:
+ #
+ # `_Label`:
+ # Add its name to the local allowed label names: a label
+ # occurring before a repetition, and not within a nested group,
+ # is always reachable.
+ #
+ # `_VarAssign`:
+ # If all the names within its expression are allowed, then add
+ # its name to the allowed variable names.
+ #
+ # Otherwise, remove its name from the allowed variable names (if
+ # it's in there): a variable which refers to an unreachable name
+ # is unreachable itself.
+ #
+ # `_Rep`:
+ # Make sure all the names within its expression are allowed.
+ #
+ # `_Group`:
+ # Call this function for each contained item with a _copy_ of
+ # the current allowed label names and the same current allowed
+ # variable names.
+ @staticmethod
+ def _validate_rep_exprs(
+ item: _Item, allowed_variable_names: Set[str], allowed_label_names: Set[str]
+ ):
+ if type(item) is _Label:
+ allowed_label_names.add(item.name)
+ elif type(item) is _VarAssign:
+ # Check if this variable name is allowed
+ allowed = True
+
+ for name in _Gen._names_of_expr(item.expr):
+ if name not in (
+ allowed_label_names | allowed_variable_names | {_icitte_name}
+ ):
+ # Not allowed
+ allowed = False
+ break
+
+ if allowed:
+ allowed_variable_names.add(item.name)
+ elif item.name in allowed_variable_names:
+ allowed_variable_names.remove(item.name)
elif type(item) is _Rep:
- for _ in range(item.mul):
- offset = self._resolve_labels(item.item, offset, labels)
- elif type(item) is _Offset:
- offset = item.val
- else:
- offset += item.size
-
- return offset
-
- def _handle_byte_item(self, item: _Byte):
- self._data.append(item.val)
- self._offset += item.size
-
- def _handle_str_item(self, item: _Str):
- self._data += item.data
- self._offset += item.size
+ # Validate the expression first
+ _ExprValidator(
+ item, allowed_label_names | allowed_variable_names, False
+ ).visit(item.expr)
+
+ # Validate inner item
+ _Gen._validate_rep_exprs(
+ item.item, allowed_variable_names, allowed_label_names
+ )
+ elif type(item) is _Group:
+ # Copy `allowed_label_names` so that this frame cannot
+ # access the nested label names.
+ group_allowed_label_names = allowed_label_names.copy()
- def _handle_bo_item(self, item: _Bo):
- self._bo = item.bo
+ for subitem in item.items:
+ _Gen._validate_rep_exprs(
+ subitem, allowed_variable_names, group_allowed_label_names
+ )
- def _eval_expr(self, item: _ExprItemT):
- # Get the labels of the current group instance as the initial
- # symbols (copied because we're adding stuff).
- assert self._cur_group is not None
- syms = self._group_instance_labels.labels(
- self._cur_group, self._group_instance_indexes[self._cur_group]
- ).copy()
+ # Evaluates the expression of `item` considering the current
+ # generation state `state`.
+ #
+ # If `allow_icitte` is `True`, then the `ICITTE` name is available
+ # for the expression to evaluate.
+ @staticmethod
+ def _eval_item_expr(item: _ExprItemT, state: _GenState, allow_icitte: bool):
+ syms = state.labels.copy()
- # Set the `ICITTE` name to the current offset (before encoding)
- syms[_icitte_name] = self._offset
+ # Set the `ICITTE` name to the current offset, if any
+ if allow_icitte:
+ syms[_icitte_name] = state.offset
# Add the current variables
- syms.update(self._vars)
+ syms.update(state.variables)
# Validate the node and its children
- _ExprValidator(item, syms).visit(item.expr)
+ _ExprValidator(item, set(syms.keys()), True).visit(item.expr)
# Compile and evaluate expression node
try:
# Validate result
if type(val) is not int:
_raise_error_for_item(
- "Invalid expression `{}`: unexpected result type `{}`".format(
+ "Invalid expression `{}`: expecting result type `int`, not `{}`".format(
item.expr_str, type(val).__name__
),
item,
return val
- def _handle_var_item(self, item: _Var):
+ # Computes the effective value (multiplier) for each repetition
+ # instance, filling `instance_vals` (if not `None`) and returning
+ # `instance_vals`.
+ #
+ # At this point it must be known that, for a given repetition, its
+ # expression only contains reachable names.
+ #
+ # When handling a `_Rep` item, this function appends its effective
+ # multiplier to `instance_vals` _before_ handling its repeated item.
+ #
+ # When handling a `_VarAssign` item, this function only evaluates it if
+ # all its names are reachable.
+ @staticmethod
+ def _compute_rep_instance_vals(
+ item: _Item, state: _GenState, instance_vals: Optional[List[int]] = None
+ ):
+ if instance_vals is None:
+ instance_vals = []
+
+ if isinstance(item, _ScalarItem):
+ state.offset += item.size
+ elif type(item) is _Label:
+ state.labels[item.name] = state.offset
+ elif type(item) is _VarAssign:
+ # Check if all the names are reachable
+ do_eval = True
+
+ for name in _Gen._names_of_expr(item.expr):
+ if (
+ name != _icitte_name
+ and name not in state.variables
+ and name not in state.labels
+ ):
+ # A name is unknown: cannot evaluate
+ do_eval = False
+ break
+
+ if do_eval:
+ # Evaluate the expression and keep the result
+ state.variables[item.name] = _Gen._eval_item_expr(item, state, True)
+ elif type(item) is _SetOffset:
+ state.offset = item.val
+ elif type(item) is _Rep:
+ # Evaluate the expression and keep the result
+ val = _Gen._eval_item_expr(item, state, False)
+
+ # Validate result
+ if val < 0:
+ _raise_error_for_item(
+ "Invalid expression `{}`: unexpected negative result {:,}".format(
+ item.expr_str, val
+ ),
+ item,
+ )
+
+ # Add to repetition instance values
+ instance_vals.append(val)
+
+ # Process the repeated item `val` times
+ for _ in range(val):
+ _Gen._compute_rep_instance_vals(item.item, state, instance_vals)
+ elif type(item) is _Group:
+ prev_labels = state.labels.copy()
+
+ # Process each item
+ for subitem in item.items:
+ _Gen._compute_rep_instance_vals(subitem, state, instance_vals)
+
+ state.labels = prev_labels
+
+ return instance_vals
+
+ def _zero_item_size(self, item: _Item, next_rep_instance: int):
+ return 0, next_rep_instance
+
+ def _scalar_item_size(self, item: _ScalarItem, next_rep_instance: int):
+ return item.size, next_rep_instance
+
+ def _group_item_size(self, item: _Group, next_rep_instance: int):
+ size = 0
+
+ for subitem in item.items:
+ subitem_size, next_rep_instance = self._item_size(
+ subitem, next_rep_instance
+ )
+ size += subitem_size
+
+ return size, next_rep_instance
+
+ def _rep_item_size(self, item: _Rep, next_rep_instance: int):
+ # Get the value from `self._rep_instance_vals` _before_
+ # incrementing `next_rep_instance` to honor the order of
+ # _compute_rep_instance_vals().
+ mul = self._rep_instance_vals[next_rep_instance]
+ next_rep_instance += 1
+ size = 0
+
+ for _ in range(mul):
+ iter_size, next_rep_instance = self._item_size(item.item, next_rep_instance)
+ size += iter_size
+
+ return size, next_rep_instance
+
+ # Returns the size of `item` and the new next repetition instance.
+ def _item_size(self, item: _Item, next_rep_instance: int):
+ return self._item_size_funcs[type(item)](item, next_rep_instance)
+
+ # Handles the byte item `item`.
+ def _handle_byte_item(self, item: _Byte, state: _GenState, next_rep_instance: int):
+ self._data.append(item.val)
+ state.offset += item.size
+ return next_rep_instance
+
+ # Handles the string item `item`.
+ def _handle_str_item(self, item: _Str, state: _GenState, next_rep_instance: int):
+ self._data += item.data
+ state.offset += item.size
+ return next_rep_instance
+
+ # Handles the byte order setting item `item`.
+ def _handle_set_bo_item(
+ self, item: _SetBo, state: _GenState, next_rep_instance: int
+ ):
+ # Update current byte order
+ state.bo = item.bo
+ return next_rep_instance
+
+ # Handles the variable assignment item `item`.
+ def _handle_var_assign_item(
+ self, item: _VarAssign, state: _GenState, next_rep_instance: int
+ ):
# Update variable
- self._vars[item.name] = self._eval_expr(item)
+ state.variables[item.name] = self._eval_item_expr(item, state, True)
+ return next_rep_instance
- def _handle_val_item(self, item: _Val):
+ # Handles the value item `item`.
+ def _handle_val_item(self, item: _Val, state: _GenState, next_rep_instance: int):
# Compute value
- val = self._eval_expr(item)
+ val = self._eval_item_expr(item, state, True)
# Validate range
if val < -(2 ** (item.len - 1)) or val > 2**item.len - 1:
_raise_error_for_item(
"Value {:,} is outside the {}-bit range when evaluating expression `{}` at byte offset {:,}".format(
- val, item.len, item.expr_str, self._offset
+ val, item.len, item.expr_str, state.offset
),
item,
)
# Encode result on 64 bits (to extend the sign bit whatever the
# value of `item.len`).
- if self._bo is None and item.len > 8:
+ if state.bo is None and item.len > 8:
_raise_error_for_item(
"Current byte order isn't defined at first value (`{}`) to encode on more than 8 bits".format(
item.expr_str
data = struct.pack(
"{}{}".format(
- ">" if self._bo in (None, ByteOrder.BE) else "<",
+ ">" if state.bo in (None, ByteOrder.BE) else "<",
"Q" if val >= 0 else "q",
),
val,
# Keep only the requested length
len_bytes = item.len // 8
- if self._bo in (None, ByteOrder.BE):
+ if state.bo in (None, ByteOrder.BE):
# Big endian: keep last bytes
data = data[-len_bytes:]
else:
# Little endian: keep first bytes
- assert self._bo == ByteOrder.LE
+ assert state.bo == ByteOrder.LE
data = data[:len_bytes]
# Append to current bytes and update offset
self._data += data
- self._offset += len(data)
+ state.offset += len(data)
+ return next_rep_instance
- def _handle_group_item(self, item: _Group):
- # Update the instance index of `item`
- if item not in self._group_instance_indexes:
- self._group_instance_indexes[item] = 0
- else:
- self._group_instance_indexes[item] += 1
+ # Handles the group item `item`, only removing the immediate labels
+ # from `state.labels` if `remove_immediate_labels` is `True`.
+ def _handle_group_item(
+ self,
+ item: _Group,
+ state: _GenState,
+ next_rep_instance: int,
+ remove_immediate_labels: bool = True,
+ ):
+ # Compute the values of the immediate (not nested) labels. Those
+ # labels are reachable by any expression within the group.
+ offset = state.offset
+ immediate_label_names = set() # type: Set[str]
+ tmp_next_rep_instance = next_rep_instance
- # Changed current group
- old_cur_group = self._cur_group
- self._cur_group = item
+ for subitem in item.items:
+ if type(subitem) is _SetOffset:
+ # Update offset
+ offset = subitem.val
+ elif type(subitem) is _Label:
+ # New immediate label
+ state.labels[subitem.name] = offset
+ immediate_label_names.add(subitem.name)
+
+ subitem_size, tmp_next_rep_instance = self._item_size(
+ subitem, tmp_next_rep_instance
+ )
+ offset += subitem_size
- # Handle each item
+ # Handle each item now with the actual state
for subitem in item.items:
- self._handle_item(subitem)
+ next_rep_instance = self._handle_item(subitem, state, next_rep_instance)
+
+ # Remove immediate labels if required so that outer items won't
+ # reach inner labels.
+ if remove_immediate_labels:
+ for name in immediate_label_names:
+ del state.labels[name]
- # Restore current group
- self._cur_group = old_cur_group
+ return next_rep_instance
- def _handle_rep_item(self, item: _Rep):
- for _ in range(item.mul):
- self._handle_item(item.item)
+ # Handles the repetition item `item`.
+ def _handle_rep_item(self, item: _Rep, state: _GenState, next_rep_instance: int):
+ mul = self._rep_instance_vals[next_rep_instance]
+ next_rep_instance += 1
- def _handle_offset_item(self, item: _Offset):
- self._offset = item.val
+ for _ in range(mul):
+ next_rep_instance = self._handle_item(item.item, state, next_rep_instance)
- def _handle_item(self, item: _Item):
- if type(item) in self._item_handlers:
- self._item_handlers[type(item)](item)
+ return next_rep_instance
- def _gen(self):
+ # Handles the offset setting item `item`.
+ def _handle_set_offset_item(
+ self, item: _SetOffset, state: _GenState, next_rep_instance: int
+ ):
+ state.offset = item.val
+ return next_rep_instance
+
+ # Handles the label item `item`.
+ def _handle_label_item(
+ self, item: _Label, state: _GenState, next_rep_instance: int
+ ):
+ return next_rep_instance
+
+ # Handles the item `item`, returning the updated next repetition
+ # instance.
+ def _handle_item(self, item: _Item, state: _GenState, next_rep_instance: int):
+ return self._item_handlers[type(item)](item, state, next_rep_instance)
+
+ # Generates the data (`self._data`) and final state
+ # (`self._final_state`) from `group` and the initial state `state`.
+ def _gen(self, group: _Group, state: _GenState):
# Initial state
self._data = bytearray()
- self._group_instance_indexes = {} # type: Dict[_Group, int]
- self._cur_group = None
# Item handlers
self._item_handlers = {
_Byte: self._handle_byte_item,
- _Str: self._handle_str_item,
- _Bo: self._handle_bo_item,
- _Val: self._handle_val_item,
- _Var: self._handle_var_item,
_Group: self._handle_group_item,
+ _Label: self._handle_label_item,
_Rep: self._handle_rep_item,
- _Offset: self._handle_offset_item,
- } # type: Dict[type, Callable[[Any], None]]
-
- # Handle the group item
- self._handle_item(self._main_group)
+ _SetBo: self._handle_set_bo_item,
+ _SetOffset: self._handle_set_offset_item,
+ _Str: self._handle_str_item,
+ _Val: self._handle_val_item,
+ _VarAssign: self._handle_var_assign_item,
+ } # type: Dict[type, Callable[[Any, _GenState, int], int]]
+
+ # Item size getters
+ self._item_size_funcs = {
+ _Byte: self._scalar_item_size,
+ _Group: self._group_item_size,
+ _Label: self._zero_item_size,
+ _Rep: self._rep_item_size,
+ _SetBo: self._zero_item_size,
+ _SetOffset: self._zero_item_size,
+ _Str: self._scalar_item_size,
+ _Val: self._scalar_item_size,
+ _VarAssign: self._zero_item_size,
+ } # type: Dict[type, Callable[[Any, int], Tuple[int, int]]]
+
+ # Handle the group item, _not_ removing the immediate labels
+ # because the `labels` property offers them.
+ self._handle_group_item(group, state, 0, False)
+
+ # This is actually the final state
+ self._final_state = state
# Returns a `ParseResult` instance containing the bytes encoded by the