-black == 20.8b1
+black ~= 22.0
flake8 >= 3.8
@property
def _upper_bound(self):
- return (2 ** self.cls.field_value_range) - 1
+ return (2**self.cls.field_value_range) - 1
class _SignedIntegerFieldConst(_IntegerFieldConst, _FieldConst):
def _is_in_int64_range(v):
assert isinstance(v, int)
- return v >= -(2 ** 63) and v <= (2 ** 63 - 1)
+ return v >= -(2**63) and v <= (2**63 - 1)
def _is_int64(v):
def _is_in_uint64_range(v):
assert isinstance(v, int)
- return v >= 0 and v <= (2 ** 64 - 1)
+ return v >= 0 and v <= (2**64 - 1)
def _is_uint64(v):
def test_cycles_to_ns_from_origin(self):
def f(comp_self):
return comp_self._create_clock_class(
- frequency=10 ** 8, origin_is_unix_epoch=True
+ frequency=10**8, origin_is_unix_epoch=True
)
cc = run_in_component_init(f)
cc = run_in_component_init(f)
with self.assertRaises(bt2._OverflowError):
- cc.cycles_to_ns_from_origin(2 ** 63)
+ cc.cycles_to_ns_from_origin(2**63)
def test_create_uuid(self):
def f(comp_self):
elif self._at == 1:
notif = self._create_event_message(_ec, _stream, 123)
elif self._at == 2:
- notif = self._create_event_message(_ec, _stream, 2 ** 63)
+ notif = self._create_event_message(_ec, _stream, 2**63)
elif self._at == 3:
notif = self._create_stream_end_message(_stream)
else:
field = _create_field(self._tc, uint_fc)
# Larger than the IEEE 754 double-precision exact representation of
# integers.
- raw = (2 ** 53) + 1
- field.value = (2 ** 53) + 1
+ raw = (2**53) + 1
+ field.value = (2**53) + 1
self.assertEqual(field, raw)
def test_assign_uint_out_of_range(self):
fc.add_mapping('speaker', bt2.SignedIntegerRangeSet([(12, 16)]))
fc.add_mapping('can', bt2.SignedIntegerRangeSet([(18, 2540)]))
fc.add_mapping(
- 'whole range', bt2.SignedIntegerRangeSet([(-(2 ** 31), (2 ** 31) - 1)])
+ 'whole range', bt2.SignedIntegerRangeSet([(-(2**31), (2**31) - 1)])
)
fc.add_mapping('zip', bt2.SignedIntegerRangeSet([(-45, 1001)]))
return fc
def test_create_pos_too_big(self):
with self._assert_expecting_int64():
- self._CLS(2 ** 63)
+ self._CLS(2**63)
def test_create_neg_too_big(self):
with self._assert_expecting_int64():
- self._CLS(-(2 ** 63) - 1)
+ self._CLS(-(2**63) - 1)
def test_assign_neg_int(self):
raw = -13
def test_compare_big_int(self):
# Larger than the IEEE 754 double-precision exact representation of
# integers.
- raw = (2 ** 53) + 1
+ raw = (2**53) + 1
v = bt2.create_value(raw)
self.assertEqual(v, raw)
def test_create_pos_too_big(self):
with self._assert_expecting_uint64():
- self._CLS(2 ** 64)
+ self._CLS(2**64)
def test_create_neg(self):
with self._assert_expecting_uint64():
It is an alternative to TestCase to collect TAP results.
"""
+
failureException = AssertionError
def __init__(self, filename, line):
r"""^SKIP\S*
(?P<whitespace>\s*) # Optional whitespace.
(?P<reason>.*) # Slurp up the rest.""",
- re.IGNORECASE | re.VERBOSE)
+ re.IGNORECASE | re.VERBOSE,
+ )
todo_pattern = re.compile(
r"""^TODO\b # The directive name
(?P<whitespace>\s*) # Immediately following must be whitespace.
(?P<reason>.*) # Slurp up the rest.""",
- re.IGNORECASE | re.VERBOSE)
+ re.IGNORECASE | re.VERBOSE,
+ )
def __init__(self, text):
"""Initialize the directive by parsing the text.
TAP is a line based protocol. Thus, the most primitive type is a line.
"""
+
@property
def category(self):
raise NotImplementedError
"""Information about an individual test line."""
def __init__(
- self, ok, number=None, description='', directive=None,
- diagnostics=None):
+ self, ok, number=None, description='', directive=None, diagnostics=None
+ ):
self._ok = ok
if number:
self._number = int(number)
if self.diagnostics is not None:
diagnostics = '\n' + self.diagnostics.rstrip()
return "{0}ok {1} - {2}{3}{4}".format(
- is_not, self.number, self.description, directive, diagnostics)
+ is_not, self.number, self.description, directive, diagnostics
+ )
class Plan(Line):
This exists for the purpose of a Null Object pattern.
"""
+
@property
def category(self):
""":returns: ``unknown``"""
description = _('A TAP consumer for Python')
epilog = _(
'When no files are given or a dash (-) is used for the file name, '
- 'tappy will read a TAP stream from STDIN.')
+ 'tappy will read a TAP stream from STDIN.'
+ )
parser = argparse.ArgumentParser(description=description, epilog=epilog)
parser.add_argument(
- 'files', metavar='FILE', nargs='*', help=_(
+ 'files',
+ metavar='FILE',
+ nargs='*',
+ help=_(
'A file containing TAP output. Any directories listed will be '
- 'scanned for files to include as TAP files.'))
+ 'scanned for files to include as TAP files.'
+ ),
+ )
parser.add_argument(
- '-v', '--verbose', action='store_const', default=1, const=2,
- help=_('use verbose messages'))
+ '-v',
+ '--verbose',
+ action='store_const',
+ default=1,
+ const=2,
+ help=_('use verbose messages'),
+ )
# argparse expects the executable to be removed from argv.
args = parser.parse_args(argv[1:])
"""
ok = re.compile(r'^ok' + result_base, re.VERBOSE)
not_ok = re.compile(r'^not\ ok' + result_base, re.VERBOSE)
- plan = re.compile(r"""
+ plan = re.compile(
+ r"""
^1..(?P<expected>\d+) # Match the plan details.
[^#]* # Consume any non-hash character to confirm only
# directives appear with the plan details.
\#? # Optional directive marker.
\s* # Optional whitespace.
(?P<directive>.*) # Optional directive text.
- """, re.VERBOSE)
+ """,
+ re.VERBOSE,
+ )
diagnostic = re.compile(r'^#')
- bail = re.compile(r"""
+ bail = re.compile(
+ r"""
^Bail\ out!
\s* # Optional whitespace.
(?P<reason>.*) # Optional reason.
- """, re.VERBOSE)
+ """,
+ re.VERBOSE,
+ )
version = re.compile(r'^TAP version (?P<version>\d+)$')
TAP_MINIMUM_DECLARED_VERSION = 13
def _parse_result(self, ok, match):
"""Parse a matching result line into a result instance."""
return Result(
- ok, match.group('number'), match.group('description').strip(),
- Directive(match.group('directive')))
+ ok,
+ match.group('number'),
+ match.group('description').strip(),
+ Directive(match.group('directive')),
+ )
def _parse_version(self, match):
version = int(match.group('version'))
if version < self.TAP_MINIMUM_DECLARED_VERSION:
- raise ValueError(_('It is an error to explicitly specify '
- 'any version lower than 13.'))
+ raise ValueError(
+ _('It is an error to explicitly specify ' 'any version lower than 13.')
+ )
return Version(version)
class Rules(object):
-
def __init__(self, filename, suite):
self._filename = filename
self._suite = suite
plan, at_line = self._lines_seen['plan'][0]
if not self._plan_on_valid_line(at_line, final_line_count):
self._add_error(
- _('A plan must appear at the beginning or end of the file.'))
+ _('A plan must appear at the beginning or end of the file.')
+ )
return
if plan.expected_tests != self._lines_seen['test']:
- self._add_error(_(
- 'Expected {expected_count} tests '
- 'but only {seen_count} ran.').format(
+ self._add_error(
+ _(
+ 'Expected {expected_count} tests ' 'but only {seen_count} ran.'
+ ).format(
expected_count=plan.expected_tests,
- seen_count=self._lines_seen['test']))
+ seen_count=self._lines_seen['test'],
+ )
+ )
def _plan_on_valid_line(self, at_line, final_line_count):
"""Check if a plan is on a valid line."""
# The plan may only appear on line 2 if the version is at line 1.
after_version = (
- self._lines_seen['version'] and
- self._lines_seen['version'][0] == 1 and
- at_line == 2)
+ self._lines_seen['version']
+ and self._lines_seen['version'][0] == 1
+ and at_line == 2
+ )
if after_version:
return True
def handle_file_does_not_exist(self):
"""Handle a test file that does not exist."""
- self._add_error(_('{filename} does not exist.').format(
- filename=self._filename))
+ self._add_error(_('{filename} does not exist.').format(filename=self._filename))
def handle_skipping_plan(self, skip_plan):
"""Handle a plan that contains a SKIP directive."""
- skip_line = Result(
- True, None, skip_plan.directive.text, Directive('SKIP'))
+ skip_line = Result(True, None, skip_plan.directive.text, Directive('SKIP'))
self._suite.addTest(Adapter(self._filename, skip_line))
def saw_plan(self, plan, at_line):
super(TAPTestResult, self).addError(test, err)
diagnostics = formatter.format_exception(err)
self.tracker.add_not_ok(
- self._cls_name(test), self._description(test),
- diagnostics=diagnostics)
+ self._cls_name(test), self._description(test), diagnostics=diagnostics
+ )
def addFailure(self, test, err):
super(TAPTestResult, self).addFailure(test, err)
diagnostics = formatter.format_exception(err)
self.tracker.add_not_ok(
- self._cls_name(test), self._description(test),
- diagnostics=diagnostics)
+ self._cls_name(test), self._description(test), diagnostics=diagnostics
+ )
def addSuccess(self, test):
super(TAPTestResult, self).addSuccess(test)
def addSkip(self, test, reason):
super(TAPTestResult, self).addSkip(test, reason)
- self.tracker.add_skip(
- self._cls_name(test), self._description(test), reason)
+ self.tracker.add_skip(self._cls_name(test), self._description(test), reason)
def addExpectedFailure(self, test, err):
super(TAPTestResult, self).addExpectedFailure(test, err)
diagnostics = formatter.format_exception(err)
self.tracker.add_not_ok(
- self._cls_name(test), self._description(test),
- _('(expected failure)'), diagnostics=diagnostics)
+ self._cls_name(test),
+ self._description(test),
+ _('(expected failure)'),
+ diagnostics=diagnostics,
+ )
def addUnexpectedSuccess(self, test):
super(TAPTestResult, self).addUnexpectedSuccess(test)
- self.tracker.add_ok(self._cls_name(test), self._description(test),
- _('(unexpected success)'))
+ self.tracker.add_ok(
+ self._cls_name(test), self._description(test), _('(unexpected success)')
+ )
def _cls_name(self, test):
return test.__class__.__name__
try:
return self.FORMAT.format(
method_name=str(test),
- short_description=test.shortDescription() or '')
+ short_description=test.shortDescription() or '',
+ )
except KeyError:
- sys.exit(_(
- 'Bad format string: {format}\n'
- 'Replacement options are: {{short_description}} and '
- '{{method_name}}').format(format=self.FORMAT))
+ sys.exit(
+ _(
+ 'Bad format string: {format}\n'
+ 'Replacement options are: {{short_description}} and '
+ '{{method_name}}'
+ ).format(format=self.FORMAT)
+ )
return test.shortDescription() or str(test)
_tracker.stream = sys.stdout
def _makeResult(self):
- result = self.resultclass(
- self.stream, self.descriptions, self.verbosity)
+ result = self.resultclass(self.stream, self.descriptions, self.verbosity)
result.tracker = _tracker
return result
class Tracker(object):
-
def __init__(
- self, outdir=None, combined=False, streaming=False, stream=None,
- header=True):
+ self, outdir=None, combined=False, streaming=False, stream=None, header=True
+ ):
self.outdir = outdir
# Combine all the test results into one file.
def add_ok(self, class_name, description, directive=''):
result = Result(
- ok=True, number=self._get_next_line_number(class_name),
- description=description, directive=Directive(directive))
+ ok=True,
+ number=self._get_next_line_number(class_name),
+ description=description,
+ directive=Directive(directive),
+ )
self._add_line(class_name, result)
- def add_not_ok(
- self, class_name, description, directive='', diagnostics=None):
+ def add_not_ok(self, class_name, description, directive='', diagnostics=None):
result = Result(
- ok=False, number=self._get_next_line_number(class_name),
- description=description, diagnostics=diagnostics,
- directive=Directive(directive))
+ ok=False,
+ number=self._get_next_line_number(class_name),
+ description=description,
+ diagnostics=diagnostics,
+ directive=Directive(directive),
+ )
self._add_line(class_name, result)
def add_skip(self, class_name, description, reason):
directive = 'SKIP {0}'.format(reason)
result = Result(
- ok=True, number=self._get_next_line_number(class_name),
- description=description, directive=Directive(directive))
+ ok=True,
+ number=self._get_next_line_number(class_name),
+ description=description,
+ directive=Directive(directive),
+ )
self._add_line(class_name, result)
def _add_line(self, class_name, result):
with open(combined_file, 'w') as out_file:
for test_case in self.combined_test_cases_seen:
self.generate_tap_report(
- test_case, self._test_cases[test_case], out_file)
- print(
- '1..{0}'.format(self.combined_line_number), file=out_file)
+ test_case, self._test_cases[test_case], out_file
+ )
+ print('1..{0}'.format(self.combined_line_number), file=out_file)
else:
for test_case, tap_lines in self._test_cases.items():
with open(self._get_tap_file_path(test_case), 'w') as out_file:
print('1..{0}'.format(len(tap_lines)), file=out_file)
def _write_test_case_header(self, test_case, stream):
- print(_('# TAP results for {test_case}').format(
- test_case=test_case), file=stream)
+ print(
+ _('# TAP results for {test_case}').format(test_case=test_case), file=stream
+ )
def _get_tap_file_path(self, test_case):
"""Get the TAP output file path for the test case."""