class ComponentSpec:
- def __init__(self, plugin_name, class_name, params=None,
- logging_level=bt2.logging.LoggingLevel.NONE):
+ def __init__(
+ self,
+ plugin_name,
+ class_name,
+ params=None,
+ logging_level=bt2.logging.LoggingLevel.NONE,
+ ):
utils._check_str(plugin_name)
utils._check_str(class_name)
utils._check_log_level(logging_level)
# s -> ns
s = obj.timestamp()
else:
- raise TypeError('"{}" is not an integral number or a datetime.datetime object'.format(obj))
+ raise TypeError(
+ '"{}" is not an integral number or a datetime.datetime object'.format(obj)
+ )
return int(s * 1e9)
class TraceCollectionMessageIterator(bt2.message_iterator._MessageIterator):
- def __init__(self, source_component_specs, filter_component_specs=None,
- stream_intersection_mode=False, begin=None, end=None):
+ def __init__(
+ self,
+ source_component_specs,
+ filter_component_specs=None,
+ stream_intersection_mode=False,
+ begin=None,
+ end=None,
+ ):
utils._check_bool(stream_intersection_mode)
self._stream_intersection_mode = stream_intersection_mode
self._begin_ns = _get_ns(begin)
def _validate_component_specs(self, comp_specs):
for comp_spec in comp_specs:
if type(comp_spec) is not ComponentSpec:
- raise TypeError('"{}" object is not a ComponentSpec'.format(type(comp_spec)))
+ raise TypeError(
+ '"{}" object is not a ComponentSpec'.format(type(comp_spec))
+ )
def __next__(self):
return next(self._msg_iter)
try:
paths = src_comp_and_spec.spec.params['paths']
except Exception as e:
- raise bt2.Error('all source components must be created with a "paths" parameter in stream intersection mode') from e
+ raise bt2.Error(
+ 'all source components must be created with a "paths" parameter in stream intersection mode'
+ ) from e
params = {'paths': paths}
# contains the stream intersection range for each exposed
# trace
query_exec = bt2.QueryExecutor()
- trace_info_res = query_exec.query(src_comp_and_spec.comp.cls,
- 'trace-info', params)
+ trace_info_res = query_exec.query(
+ src_comp_and_spec.comp.cls, 'trace-info', params
+ )
begin = None
end = None
pass
if begin is None or end is None:
- raise bt2.Error('cannot find stream intersection range for port "{}"'.format(port.name))
+ raise bt2.Error(
+ 'cannot find stream intersection range for port "{}"'.format(port.name)
+ )
name = 'trimmer-{}-{}'.format(src_comp_and_spec.comp.name, port.name)
return self._create_trimmer(begin, end, name)
raise bt2.Error('cannot find "utils" plugin (needed for the muxer)')
if 'muxer' not in plugin.filter_component_classes:
- raise bt2.Error('cannot find "muxer" filter component class in "utils" plugin')
+ raise bt2.Error(
+ 'cannot find "muxer" filter component class in "utils" plugin'
+ )
comp_cls = plugin.filter_component_classes['muxer']
return self._graph.add_component(comp_cls, 'muxer')
raise bt2.Error('cannot find "utils" plugin (needed for the trimmer)')
if 'trimmer' not in plugin.filter_component_classes:
- raise bt2.Error('cannot find "trimmer" filter component class in "utils" plugin')
+ raise bt2.Error(
+ 'cannot find "trimmer" filter component class in "utils" plugin'
+ )
params = {}
return self._graph.add_component(comp_cls, name, params)
def _get_unique_comp_name(self, comp_spec):
- name = '{}-{}'.format(comp_spec.plugin_name,
- comp_spec.class_name)
- comps_and_specs = itertools.chain(self._src_comps_and_specs,
- self._flt_comps_and_specs)
+ name = '{}-{}'.format(comp_spec.plugin_name, comp_spec.class_name)
+ comps_and_specs = itertools.chain(
+ self._src_comps_and_specs, self._flt_comps_and_specs
+ )
if name in [comp_and_spec.comp.name for comp_and_spec in comps_and_specs]:
name += '-{}'.format(self._next_suffix)
if comp_spec.class_name not in comp_classes:
cc_type = 'source' if comp_cls_type == _CompClsType.SOURCE else 'filter'
- raise bt2.Error('no such {} component class in "{}" plugin: {}'.format(cc_type,
- comp_spec.plugin_name,
- comp_spec.class_name))
+ raise bt2.Error(
+ 'no such {} component class in "{}" plugin: {}'.format(
+ cc_type, comp_spec.plugin_name, comp_spec.class_name
+ )
+ )
comp_cls = comp_classes[comp_spec.class_name]
name = self._get_unique_comp_name(comp_spec)
- comp = self._graph.add_component(comp_cls, name, comp_spec.params,
- comp_spec.logging_level)
+ comp = self._graph.add_component(
+ comp_cls, name, comp_spec.params, comp_spec.logging_level
+ )
return comp
def _get_free_muxer_input_port(self):
self._muxer_comp = self._create_muxer()
if self._begin_ns is not None or self._end_ns is not None:
- trimmer_comp = self._create_trimmer(self._begin_ns,
- self._end_ns, 'trimmer')
- self._graph.connect_ports(self._muxer_comp.output_ports['out'],
- trimmer_comp.input_ports['in'])
+ trimmer_comp = self._create_trimmer(self._begin_ns, self._end_ns, 'trimmer')
+ self._graph.connect_ports(
+ self._muxer_comp.output_ports['out'], trimmer_comp.input_ports['in']
+ )
msg_iter_port = trimmer_comp.output_ports['out']
else:
msg_iter_port = self._muxer_comp.output_ports['out']