from bt2 import native_bt
from bt2 import utils as bt2_utils
-import bt2
+from bt2 import logging as bt2_logging
import itertools
from bt2 import message_iterator as bt2_message_iterator
from bt2 import port as bt2_port
from bt2 import component as bt2_component
from bt2 import value as bt2_value
from bt2 import plugin as bt2_plugin
+from bt2 import error as bt2_error
+from bt2 import query_executor as bt2_query_executor
+from bt2 import component_descriptor as bt2_component_descriptor
+from bt2 import mip as bt2_mip
+from bt2 import graph as bt2_graph
import datetime
from collections import namedtuple
import numbers
if logging_level is not None:
bt2_utils._check_log_level(logging_level)
- self._params = bt2.create_value(params)
+ self._params = bt2_value.create_value(params)
self._obj = obj
self._logging_level = logging_level
component_class,
params=None,
obj=None,
- logging_level=bt2.LoggingLevel.NONE,
+ logging_level=bt2_logging.LoggingLevel.NONE,
):
if type(params) is str:
params = {"inputs": [params]}
is_cc_object = isinstance(
component_class,
- (bt2._SourceComponentClassConst, bt2._FilterComponentClassConst),
+ (
+ bt2_component._SourceComponentClassConst,
+ bt2_component._FilterComponentClassConst,
+ ),
)
is_user_cc_type = isinstance(
component_class, bt2_component._UserComponentType
) and issubclass(
- component_class, (bt2._UserSourceComponent, bt2._UserFilterComponent)
+ component_class,
+ (bt2_component._UserSourceComponent, bt2_component._UserFilterComponent),
)
if not is_cc_object and not is_user_cc_type:
component_class_name,
params=None,
obj=None,
- logging_level=bt2.LoggingLevel.NONE,
+ logging_level=bt2_logging.LoggingLevel.NONE,
):
- plugin = bt2.find_plugin(plugin_name)
+ plugin = bt2_plugin.find_plugin(plugin_name)
if plugin is None:
raise ValueError("no such plugin: {}".format(plugin_name))
def _auto_discover_source_component_specs(auto_source_comp_specs, plugin_set):
# Transform a list of `AutoSourceComponentSpec` in a list of `ComponentSpec`
# using the automatic source discovery mechanism.
- inputs = bt2.ArrayValue([spec.input for spec in auto_source_comp_specs])
+ inputs = bt2_value.ArrayValue([spec.input for spec in auto_source_comp_specs])
if plugin_set is None:
- plugin_set = bt2.find_plugins()
+ plugin_set = bt2_plugin.find_plugins()
else:
bt2_utils._check_type(plugin_set, bt2_plugin._PluginSet)
)
if res_ptr is None:
- raise bt2._MemoryError("cannot auto discover source components")
+ raise bt2_error._MemoryError("cannot auto discover source components")
res = bt2_value._create_from_ptr(res_ptr)
- assert type(res) is bt2.MapValue
+ assert type(res) is bt2_value.MapValue
assert "status" in res
status = res["status"]
comp_specs = []
comp_specs_raw = res["results"]
- assert type(comp_specs_raw) is bt2.ArrayValue
+ assert type(comp_specs_raw) is bt2_value.ArrayValue
used_input_indices = set()
for comp_spec_raw in comp_specs_raw:
- assert type(comp_spec_raw) is bt2.ArrayValue
+ assert type(comp_spec_raw) is bt2_value.ArrayValue
assert len(comp_spec_raw) == 4
plugin_name = comp_spec_raw[0]
- assert type(plugin_name) is bt2.StringValue
+ assert type(plugin_name) is bt2_value.StringValue
plugin_name = str(plugin_name)
class_name = comp_spec_raw[1]
- assert type(class_name) is bt2.StringValue
+ assert type(class_name) is bt2_value.StringValue
class_name = str(class_name)
comp_inputs = comp_spec_raw[2]
- assert type(comp_inputs) is bt2.ArrayValue
+ assert type(comp_inputs) is bt2_value.ArrayValue
comp_orig_indices = comp_spec_raw[3]
assert type(comp_orig_indices)
- params = bt2.MapValue()
- logging_level = bt2.LoggingLevel.NONE
+ params = bt2_value.MapValue()
+ logging_level = bt2_logging.LoggingLevel.NONE
obj = None
# Compute `params` for this component by piling up params given to all
# Query the port's component for the `babeltrace.trace-infos`
# object which contains the range for each stream, from which we can
# compute the intersection of the streams in each trace.
- query_exec = bt2.QueryExecutor(
+ query_exec = bt2_query_executor.QueryExecutor(
src_comp_and_spec.spec.component_class,
"babeltrace.trace-infos",
src_comp_and_spec.spec.params,
return self._create_trimmer(begin, end, name)
def _create_muxer(self):
- plugin = bt2.find_plugin("utils")
+ plugin = bt2_plugin.find_plugin("utils")
if plugin is None:
raise RuntimeError('cannot find "utils" plugin (needed for the muxer)')
return self._graph.add_component(comp_cls, "muxer")
def _create_trimmer(self, begin_ns, end_ns, name):
- plugin = bt2.find_plugin("utils")
+ plugin = bt2_plugin.find_plugin("utils")
if plugin is None:
raise RuntimeError('cannot find "utils" plugin (needed for the trimmer)')
def append_comp_specs_descriptors(descriptors, comp_specs):
for comp_spec in comp_specs:
descriptors.append(
- bt2.ComponentDescriptor(
+ bt2_component_descriptor.ComponentDescriptor(
comp_spec.component_class, comp_spec.params, comp_spec.obj
)
)
)
append_comp_specs_descriptors(descriptors, [comp_spec])
- mip_version = bt2.get_greatest_operative_mip_version(descriptors)
+ mip_version = bt2_mip.get_greatest_operative_mip_version(descriptors)
if mip_version is None:
msg = "failed to find an operative message interchange protocol version (components are not interoperable)"
return mip_version
def _build_graph(self):
- self._graph = bt2.Graph(self._get_greatest_operative_mip_version())
+ self._graph = bt2_graph.Graph(self._get_greatest_operative_mip_version())
self._graph.add_port_added_listener(self._graph_port_added)
self._muxer_comp = self._create_muxer()