from bt2 import utils
import bt2
import itertools
-import bt2.message_iterator
+from bt2 import message_iterator as bt2_message_iterator
+from bt2 import logging as bt2_logging
+from bt2 import port as bt2_port
import datetime
from collections import namedtuple
import numbers
class ComponentSpec:
- def __init__(self, plugin_name, class_name, params=None):
+ def __init__(
+ self,
+ plugin_name,
+ class_name,
+ params=None,
+ logging_level=bt2_logging.LoggingLevel.NONE,
+ ):
utils._check_str(plugin_name)
utils._check_str(class_name)
+ utils._check_log_level(logging_level)
self._plugin_name = plugin_name
self._class_name = class_name
+ self._logging_level = logging_level
if type(params) is str:
- self._params = bt2.create_value({'paths': [params]})
+ self._params = bt2.create_value({'inputs': [params]})
else:
self._params = bt2.create_value(params)
def class_name(self):
return self._class_name
+ @property
+ def logging_level(self):
+ return self._logging_level
+
@property
def params(self):
return self._params
# s -> ns
s = obj.timestamp()
else:
- raise TypeError('"{}" is not an integral number or a datetime.datetime object'.format(obj))
+ raise TypeError(
+ '"{}" is not an integral number or a datetime.datetime object'.format(obj)
+ )
return int(s * 1e9)
FILTER = 1
-class TraceCollectionMessageIterator(bt2.message_iterator._MessageIterator):
- def __init__(self, source_component_specs, filter_component_specs=None,
- stream_intersection_mode=False, begin=None, end=None):
+class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator):
+ def __init__(
+ self,
+ source_component_specs,
+ filter_component_specs=None,
+ stream_intersection_mode=False,
+ begin=None,
+ end=None,
+ ):
utils._check_bool(stream_intersection_mode)
self._stream_intersection_mode = stream_intersection_mode
self._begin_ns = _get_ns(begin)
def _validate_component_specs(self, comp_specs):
for comp_spec in comp_specs:
if type(comp_spec) is not ComponentSpec:
- raise TypeError('"{}" object is not a ComponentSpec'.format(type(comp_spec)))
+ raise TypeError(
+ '"{}" object is not a ComponentSpec'.format(type(comp_spec))
+ )
def __next__(self):
return next(self._msg_iter)
def _create_stream_intersection_trimmer(self, component, port):
# find the original parameters specified by the user to create
- # this port's component to get the `path` parameter
+ # this port's component to get the `inputs` parameter
for src_comp_and_spec in self._src_comps_and_specs:
if component == src_comp_and_spec.comp:
break
try:
- paths = src_comp_and_spec.spec.params['paths']
+ inputs = src_comp_and_spec.spec.params['inputs']
except Exception as e:
- raise bt2.Error('all source components must be created with a "paths" parameter in stream intersection mode') from e
-
- params = {'paths': paths}
-
- # query the port's component for the `trace-info` object which
- # contains the stream intersection range for each exposed
- # trace
- query_exec = bt2.QueryExecutor()
- trace_info_res = query_exec.query(src_comp_and_spec.comp.cls,
- 'trace-info', params)
+ raise ValueError(
+ 'all source components must be created with an "inputs" parameter in stream intersection mode'
+ ) from e
+
+ params = {'inputs': inputs}
+
+ # query the port's component for the `babeltrace.trace-info`
+ # object which contains the stream intersection range for each
+ # exposed trace
+ query_exec = bt2.QueryExecutor(
+ src_comp_and_spec.comp.cls, 'babeltrace.trace-info', params
+ )
+ trace_info_res = query_exec.query()
begin = None
end = None
pass
if begin is None or end is None:
- raise bt2.Error('cannot find stream intersection range for port "{}"'.format(port.name))
+ raise RuntimeError(
+ 'cannot find stream intersection range for port "{}"'.format(port.name)
+ )
name = 'trimmer-{}-{}'.format(src_comp_and_spec.comp.name, port.name)
return self._create_trimmer(begin, end, name)
plugin = bt2.find_plugin('utils')
if plugin is None:
- raise bt2.Error('cannot find "utils" plugin (needed for the muxer)')
+ raise RuntimeError('cannot find "utils" plugin (needed for the muxer)')
if 'muxer' not in plugin.filter_component_classes:
- raise bt2.Error('cannot find "muxer" filter component class in "utils" plugin')
+ raise RuntimeError(
+ 'cannot find "muxer" filter component class in "utils" plugin'
+ )
comp_cls = plugin.filter_component_classes['muxer']
return self._graph.add_component(comp_cls, 'muxer')
plugin = bt2.find_plugin('utils')
if plugin is None:
- raise bt2.Error('cannot find "utils" plugin (needed for the trimmer)')
+ raise RuntimeError('cannot find "utils" plugin (needed for the trimmer)')
if 'trimmer' not in plugin.filter_component_classes:
- raise bt2.Error('cannot find "trimmer" filter component class in "utils" plugin')
+ raise RuntimeError(
+ 'cannot find "trimmer" filter component class in "utils" plugin'
+ )
params = {}
return self._graph.add_component(comp_cls, name, params)
def _get_unique_comp_name(self, comp_spec):
- name = '{}-{}'.format(comp_spec.plugin_name,
- comp_spec.class_name)
- comps_and_specs = itertools.chain(self._src_comps_and_specs,
- self._flt_comps_and_specs)
+ name = '{}-{}'.format(comp_spec.plugin_name, comp_spec.class_name)
+ comps_and_specs = itertools.chain(
+ self._src_comps_and_specs, self._flt_comps_and_specs
+ )
if name in [comp_and_spec.comp.name for comp_and_spec in comps_and_specs]:
name += '-{}'.format(self._next_suffix)
plugin = bt2.find_plugin(comp_spec.plugin_name)
if plugin is None:
- raise bt2.Error('no such plugin: {}'.format(comp_spec.plugin_name))
+ raise ValueError('no such plugin: {}'.format(comp_spec.plugin_name))
if comp_cls_type == _CompClsType.SOURCE:
comp_classes = plugin.source_component_classes
if comp_spec.class_name not in comp_classes:
cc_type = 'source' if comp_cls_type == _CompClsType.SOURCE else 'filter'
- raise bt2.Error('no such {} component class in "{}" plugin: {}'.format(cc_type,
- comp_spec.plugin_name,
- comp_spec.class_name))
+ raise ValueError(
+ 'no such {} component class in "{}" plugin: {}'.format(
+ cc_type, comp_spec.plugin_name, comp_spec.class_name
+ )
+ )
comp_cls = comp_classes[comp_spec.class_name]
name = self._get_unique_comp_name(comp_spec)
- comp = self._graph.add_component(comp_cls, name, comp_spec.params)
+ comp = self._graph.add_component(
+ comp_cls, name, comp_spec.params, comp_spec.logging_level
+ )
return comp
def _get_free_muxer_input_port(self):
if not self._connect_ports:
return
- if type(port) is bt2.port._InputPort:
+ if type(port) is bt2_port._InputPort:
return
if component not in [comp.comp for comp in self._src_comps_and_specs]:
self._muxer_comp = self._create_muxer()
if self._begin_ns is not None or self._end_ns is not None:
- trimmer_comp = self._create_trimmer(self._begin_ns,
- self._end_ns, 'trimmer')
- self._graph.connect_ports(self._muxer_comp.output_ports['out'],
- trimmer_comp.input_ports['in'])
+ trimmer_comp = self._create_trimmer(self._begin_ns, self._end_ns, 'trimmer')
+ self._graph.connect_ports(
+ self._muxer_comp.output_ports['out'], trimmer_comp.input_ports['in']
+ )
msg_iter_port = trimmer_comp.output_ports['out']
else:
msg_iter_port = self._muxer_comp.output_ports['out']