# A remote schema resolution request should never occur in practice:
# it would mean a programming or schema error.
class _RefResolver(jsonschema.RefResolver):
def resolve_remote(self, uri):
- raise RuntimeError('Missing local schema with URI `{}`'.format(uri))
+ raise RuntimeError(f'Missing local schema with URI `{uri}`')
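# For illustration (not part of the patch): `jsonschema.RefResolver`
# accepts a prefilled `store` mapping URIs to schemas, so every `$ref`
# resolves locally and resolve_remote() is never reached. A minimal
# sketch, with a hypothetical `config_schema` dict:
#
#     store = {'https://barectf.org/schemas/config.json': config_schema}
#     resolver = _RefResolver('https://barectf.org/schemas/',
#                             config_schema, store=store)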
# Schema validator which considers all the schemas found in the barectf package.
def _validate(self, instance, schema_short_id):
# retrieve full schema ID from short ID
- schema_id = 'https://barectf.org/schemas/{}.json'.format(schema_short_id)
+ schema_id = f'https://barectf.org/schemas/{schema_short_id}.json'
assert schema_id in self._store
# retrieve full schema
# property's name).
for elem in exc.absolute_path:
if type(elem) is int:
- ctx = 'Element {}'.format(elem)
+ ctx = f'Element {elem}'
else:
- ctx = '`{}` property'.format(elem)
+ ctx = f'`{elem}` property'
contexts.append(ctx)
# Join each message with `; ` and append this to our
# configuration parsing error's message.
msgs = '; '.join([e.message for e in exc.context])
- schema_ctx = ': {}'.format(msgs)
+ schema_ctx = f': {msgs}'
new_exc = _ConfigParseError(contexts.pop(),
- '{}{} (from schema `{}`)'.format(exc.message,
- schema_ctx,
- schema_short_id))
+ f'{exc.message}{schema_ctx} (from schema `{schema_short_id}`)')
for ctx in reversed(contexts):
new_exc.append_ctx(ctx)
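# Net effect (explanatory note; the property names are hypothetical):
# for an error under the `trace` property's `type` property,
# `contexts.pop()` makes the innermost context (`\`type\` property`)
# the new error's own context, and the reversed loop above re-appends
# the enclosing contexts (`\`trace\` property`, ...) outward, so the
# final error chain reads from innermost to outermost.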
}
if iden in ctf_keywords:
- fmt = 'Invalid {} (not a valid identifier): `{}`'
- raise _ConfigParseError(ctx_obj_name, fmt.format(prop, iden))
+ msg = f'Invalid {prop} (not a valid identifier): `{iden}`'
+ raise _ConfigParseError(ctx_obj_name, msg)
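# For example (illustrative): a name such as `stream` or `typedef`
# collides with a CTF keyword and is rejected here, as is any name
# which is not a valid C identifier.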
# Validates the alignment `align`, raising a `_ConfigParseError`
if (align & (align - 1)) != 0:
raise _ConfigParseError(ctx_obj_name,
- 'Invalid alignment (not a power of two): {}'.format(align))
+ f'Invalid alignment (not a power of two): {align}')
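# Why the bit trick above works (illustration): a power of two has
# exactly one set bit, and subtracting 1 flips that bit and sets all
# lower ones, so for a nonzero `align` the AND is 0 exactly for
# powers of two:
#
#     assert 8 & 7 == 0       # 0b1000 & 0b0111 -> power of two
#     assert 12 & 11 != 0     # 0b1100 & 0b1011 -> not a power of two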
# Entities.
self._validate_type(field_type, False)
except _ConfigParseError as exc:
_append_error_ctx(exc,
- 'Structure field type\'s field `{}`'.format(field_name))
+ f'Structure field type\'s field `{field_name}`')
def _validate_array_type(self, t, entity_root):
raise _ConfigParseError('Array field type',
# make sure root field type has a real alignment of at least 8
if t.real_align < 8:
raise _ConfigParseError('Root field type',
- 'Effective alignment must be at least 8 (got {})'.format(t.real_align))
+ f'Effective alignment must be at least 8 (got {t.real_align})')
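# A CTF packet is a sequence of bytes, so a root field type must start
# on a byte boundary: hence the minimum effective alignment of 8 bits
# (an explanatory note, not part of the patch).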
assert type(t) is _Struct
if stream.is_event_empty(ev):
raise _ConfigParseError('Event type', 'Empty')
except _ConfigParseError as exc:
- _append_error_ctx(exc, 'Event type `{}`'.format(ev.name))
+ _append_error_ctx(exc, f'Event type `{ev.name}`')
def _validate_stream_entities_and_names(self, stream):
try:
for ev in stream.events.values():
self._validate_event_entities_and_names(stream, ev)
except _ConfigParseError as exc:
- _append_error_ctx(exc, 'Stream type `{}`'.format(stream.name))
+ _append_error_ctx(exc, f'Stream type `{stream.name}`')
def _validate_entities_and_names(self, meta):
self._cur_entity = _Entity.TRACE_PACKET_HEADER
def _validate_default_stream(self, meta):
if meta.default_stream_name is not None:
if meta.default_stream_name not in meta.streams.keys():
- fmt = 'Default stream type name (`{}`) does not name an existing stream type'
- raise _ConfigParseError('Metadata',
- fmt.format(meta.default_stream_name))
+ msg = f'Default stream type name (`{meta.default_stream_name}`) does not name an existing stream type'
+ raise _ConfigParseError('Metadata', msg)
def validate(self, meta):
try:
elif field_name == 'stream_id':
if len(self._meta.streams) > (1 << field_type.size):
raise _ConfigParseError(ctx_obj_name,
- '`stream_id` field\'s size is too small to accomodate {} stream types'.format(len(self._meta.streams)))
+ f'`stream_id` field\'s size is too small to accommodate {len(self._meta.streams)} stream types')
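# Worked example (illustration): a 2-bit `stream_id` field can hold
# the IDs 0 through 3, so `1 << 2 == 4` stream types at most; a fifth
# stream type triggers the error above.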
# Validates the trace type of the metadata object `meta`.
def _validate_trace(self, meta):
if eid is not None:
if len(stream.events) > (1 << eid.size):
raise _ConfigParseError(ctx_obj_name,
- '`id` field\'s size is too small to accomodate {} event types'.format(len(stream.events)))
+ f'`id` field\'s size is too small to accommodate {len(stream.events)} event types')
# Validates the stream type `stream`.
def _validate_stream(self, stream):
try:
self._validate_stream(stream)
except _ConfigParseError as exc:
- _append_error_ctx(exc, 'Stream type `{}`'.format(stream.name))
+ _append_error_ctx(exc, f'Stream type `{stream.name}`')
except _ConfigParseError as exc:
_append_error_ctx(exc, 'Metadata')
if clock is None:
exc = _ConfigParseError('`property-mappings` property',
- 'Clock type `{}` does not exist'.format(clock_name))
+ f'Clock type `{clock_name}` does not exist')
exc.append_ctx('Integer field type')
raise exc
if mn > mx:
exc = _ConfigParseError(ctx_obj_name)
- exc.append_ctx('Member `{}`'.format(label),
- 'Invalid integral range ({} > {})'.format(mn, mx))
+ exc.append_ctx(f'Member `{label}`',
+ f'Invalid integral range ({mn} > {mx})')
raise exc
value = (mn, mx)
# Make sure that all the integral values of the range
# fit within the enumeration field type's integer value
# field type, given its size (bits).
- member_obj_name = 'Member `{}`'.format(label)
- msg_fmt = 'Value {} is outside the value type range [{}, {}]'
- msg = msg_fmt.format(value[0], value_min, value_max)
+ member_obj_name = f'Member `{label}`'
+ msg = f'Value {value[0]} is outside the value type range [{value_min}, {value_max}]'
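# Sketch of how such bounds are typically derived from the value field
# type's size and signedness (`size` and `signed` are assumptions, not
# the module's actual names):
#
#     if signed:
#         value_min = -(1 << (size - 1))
#         value_max = (1 << (size - 1)) - 1
#     else:
#         value_min = 0
#         value_max = (1 << size) - 1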
try:
if value[0] < value_min or value[0] > value_max:
obj.fields[field_name] = self._create_type(field_node)
except _ConfigParseError as exc:
_append_error_ctx(exc, ctx_obj_name,
- 'Cannot create field `{}`'.format(field_name))
+ f'Cannot create field `{field_name}`')
return obj
clock.uuid = uuid.UUID(uuid_node)
except ValueError as exc:
raise _ConfigParseError('Clock type',
- 'Malformed UUID `{}`: {}'.format(uuid_node, exc))
+ f'Malformed UUID `{uuid_node}`: {exc}')
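# `uuid.UUID()` is strict about its argument (illustration):
#
#     uuid.UUID('5f61b4a1-7a70-4dfb-8a56-d0e1f6a8c3b2')   # OK
#     uuid.UUID('not-a-uuid')                             # ValueError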
descr_node = node.get('description')
clock = self._create_clock(clock_node)
except _ConfigParseError as exc:
_append_error_ctx(exc, 'Metadata',
- 'Cannot create clock type `{}`'.format(clock_name))
+ f'Cannot create clock type `{clock_name}`')
clock.name = clock_name
self._clocks[clock_name] = clock
trace.uuid = uuid.UUID(uuid_node)
except ValueError as exc:
raise _ConfigParseError(ctx_obj_name,
- 'Malformed UUID `{}`: {}'.format(uuid_node, exc))
+ f'Malformed UUID `{uuid_node}`: {exc}')
pht_node = trace_node.get('packet-header-type')
ev = self._create_event(ev_node)
except _ConfigParseError as exc:
_append_error_ctx(exc, ctx_obj_name,
- 'Cannot create event type `{}`'.format(ev_name))
+ f'Cannot create event type `{ev_name}`')
ev.id = cur_id
ev.name = ev_name
if default_node is not None:
if self._meta.default_stream_name is not None and self._meta.default_stream_name != stream_name:
- fmt = 'Cannot specify more than one default stream type (default stream type already set to `{}`)'
- raise _ConfigParseError('Stream type',
- fmt.format(self._meta.default_stream_name))
+ msg = f'Cannot specify more than one default stream type (default stream type already set to `{self._meta.default_stream_name}`)'
+ raise _ConfigParseError('Stream type', msg)
self._meta.default_stream_name = stream_name
stream = self._create_stream(stream_name, stream_node)
except _ConfigParseError as exc:
_append_error_ctx(exc, 'Metadata',
- 'Cannot create stream type `{}`'.format(stream_name))
+ f'Cannot create stream type `{stream_name}`')
stream.id = cur_id
stream.name = stream_name
if norm_path in self._include_stack:
base_path = self._get_last_include_file()
- raise _ConfigParseError('File `{}`'.format(base_path),
- 'Cannot recursively include file `{}`'.format(norm_path))
+ raise _ConfigParseError(f'File `{base_path}`',
+ f'Cannot recursively include file `{norm_path}`')
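# Stack discipline (explanatory note): each file is pushed onto
# `_include_stack` before processing and popped once done, so finding
# `norm_path` already on the stack means the file includes itself,
# directly or through a chain of other inclusions.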
self._include_stack.append(norm_path)
if not self._ignore_include_not_found:
base_path = self._get_last_include_file()
- raise _ConfigParseError('File `{}`'.format(base_path),
- 'Cannot include file `{}`: file not found in inclusion directories'.format(yaml_path))
+ raise _ConfigParseError(f'File `{base_path}`',
+ f'Cannot include file `{yaml_path}`: file not found in inclusion directories')
# Returns a list of all the inclusion file paths as found in the
# inclusion node `include_node`.
try:
overlay_node = process_base_include_cb(overlay_node)
except _ConfigParseError as exc:
- _append_error_ctx(exc, 'File `{}`'.format(cur_base_path))
+ _append_error_ctx(exc, f'File `{cur_base_path}`')
# pop inclusion stack now that we're done including
del self._include_stack[-1]
# didn't resolve the alias yet, as a given node can
# refer to the same field type alias more than once.
if alias in alias_set:
- fmt = 'Cycle detected during the `{}` field type alias resolution'
- raise _ConfigParseError(from_descr, fmt.format(alias))
+ msg = f'Cycle detected during the `{alias}` field type alias resolution'
+ raise _ConfigParseError(from_descr, msg)
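# Illustration (hypothetical `type-aliases` node): two aliases naming
# each other form the smallest such cycle:
#
#     type-aliases:
#       a: b
#       b: a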
# try to load field type alias node named `alias`
if alias not in type_aliases_node:
raise _ConfigParseError(from_descr,
- 'Field type alias `{}` does not exist'.format(alias))
+ f'Field type alias `{alias}` does not exist')
# resolve it
alias_set.add(alias)
def resolve_field_type_aliases_from(parent_node, key):
resolve_field_type_aliases(parent_node, key,
- '`{}` property'.format(key))
+ f'`{key}` property')
# set of resolved field type aliases
resolved_aliases = set()
resolve_field_type_aliases_from(event, 'context-type')
resolve_field_type_aliases_from(event, 'payload-type')
except _ConfigParseError as exc:
- _append_error_ctx(exc,
- 'Event type `{}`'.format(event_name))
+ _append_error_ctx(exc, f'Event type `{event_name}`')
except _ConfigParseError as exc:
- _append_error_ctx(exc, 'Stream type `{}`'.format(stream_name))
+ _append_error_ctx(exc, f'Stream type `{stream_name}`')
# remove the (now unneeded) `type-aliases` node
del metadata_node['type-aliases']
if type(ll_node) is str:
if ll_node not in log_levels_node:
exc = _ConfigParseError('`log-level` property',
- 'Log level alias `{}` does not exist'.format(ll_node))
- exc.append_ctx('Event type `{}`'.format(event_name))
+ f'Log level alias `{ll_node}` does not exist')
+ exc.append_ctx(f'Event type `{event_name}`')
raise exc
event[prop_name] = log_levels_node[ll_node]
except _ConfigParseError as exc:
- _append_error_ctx(exc, 'Stream type `{}`'.format(stream_name))
+ _append_error_ctx(exc, f'Stream type `{stream_name}`')
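# Illustration (hypothetical node contents): with a log level alias
# node such as `{'warning': 4}`, an event's `log-level: warning` is
# rewritten here to the numeric value 4.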
# Dumps the node `node` as YAML, passing `kwds` to yaml.dump().
def _yaml_ordered_dump(self, node, **kwds):
with open(yaml_path, 'r') as f:
node = yaml.load(f, OLoader)
except (OSError, IOError) as exc:
- raise _ConfigParseError('File `{}`'.format(yaml_path),
- 'Cannot open file: {}'.format(exc))
+ raise _ConfigParseError(f'File `{yaml_path}`',
+ f'Cannot open file: {exc}')
assert type(node) is collections.OrderedDict
return node
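# A common recipe for such an ordered YAML loader (a sketch; the
# module's actual `OLoader` definition lives elsewhere):
#
#     class OLoader(yaml.SafeLoader):
#         pass
#
#     def _construct_mapping(loader, node):
#         loader.flatten_mapping(node)
#         return collections.OrderedDict(loader.construct_pairs(node))
#
#     OLoader.add_constructor(
#         yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG,
#         _construct_mapping)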
config_node = self._yaml_ordered_load(self._root_path)
except _ConfigParseError as exc:
_append_error_ctx(exc, 'Configuration',
- 'Cannot parse YAML file `{}`'.format(self._root_path))
+ f'Cannot parse YAML file `{self._root_path}`')
# Make sure the configuration object is minimally valid, that
# is, it contains a valid `version` property.
dump_config).config
except _ConfigParseError as exc:
_append_error_ctx(exc, 'Configuration',
- 'Cannot create configuration from YAML file `{}`'.format(path))
+ f'Cannot create configuration from YAML file `{path}`')