X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fbindings%2Fpython%2Fbt2%2Fbt2%2Ftrace_collection_message_iterator.py;h=3a6b95212137980f6a98b1f0d3cf336a7f87f872;hb=HEAD;hp=6c1991f93f3ae3e839f55a7a7937659957e7d4d2;hpb=59225a3e0e13a9c674234755e55055d9ff68d635;p=babeltrace.git diff --git a/src/bindings/python/bt2/bt2/trace_collection_message_iterator.py b/src/bindings/python/bt2/bt2/trace_collection_message_iterator.py index 6c1991f9..3a6b9521 100644 --- a/src/bindings/python/bt2/bt2/trace_collection_message_iterator.py +++ b/src/bindings/python/bt2/bt2/trace_collection_message_iterator.py @@ -1,40 +1,28 @@ -# The MIT License (MIT) +# SPDX-License-Identifier: MIT # # Copyright (c) 2017 Philippe Proulx -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. - -from bt2 import utils, native_bt -import bt2 + +import numbers +import datetime import itertools -from bt2 import message_iterator as bt2_message_iterator +from collections import namedtuple + +from bt2 import mip as bt2_mip from bt2 import port as bt2_port -from bt2 import component as bt2_component +from bt2 import error as bt2_error +from bt2 import graph as bt2_graph +from bt2 import utils as bt2_utils from bt2 import value as bt2_value from bt2 import plugin as bt2_plugin -import datetime -from collections import namedtuple -import numbers - +from bt2 import logging as bt2_logging +from bt2 import component as bt2_component +from bt2 import native_bt +from bt2 import query_executor as bt2_query_executor +from bt2 import message_iterator as bt2_message_iterator +from bt2 import component_descriptor as bt2_component_descriptor # a pair of component and ComponentSpec -_ComponentAndSpec = namedtuple('_ComponentAndSpec', ['comp', 'spec']) +_ComponentAndSpec = namedtuple("_ComponentAndSpec", ["comp", "spec"]) class _BaseComponentSpec: @@ -42,9 +30,9 @@ class _BaseComponentSpec: # TraceCollectionMessageIterator. def __init__(self, params, obj, logging_level): if logging_level is not None: - utils._check_log_level(logging_level) + bt2_utils._check_log_level(logging_level) - self._params = bt2.create_value(params) + self._params = bt2_value.create_value(params) self._obj = obj self._logging_level = logging_level @@ -68,21 +56,25 @@ class ComponentSpec(_BaseComponentSpec): component_class, params=None, obj=None, - logging_level=bt2.LoggingLevel.NONE, + logging_level=bt2_logging.LoggingLevel.NONE, ): if type(params) is str: - params = {'inputs': [params]} + params = {"inputs": [params]} super().__init__(params, obj, logging_level) is_cc_object = isinstance( component_class, - (bt2._SourceComponentClassConst, bt2._FilterComponentClassConst), + ( + bt2_component._SourceComponentClassConst, + bt2_component._FilterComponentClassConst, + ), ) is_user_cc_type = isinstance( component_class, bt2_component._UserComponentType ) and issubclass( - component_class, (bt2._UserSourceComponent, bt2._UserFilterComponent) + component_class, + (bt2_component._UserSourceComponent, bt2_component._UserFilterComponent), ) if not is_cc_object and not is_user_cc_type: @@ -105,12 +97,12 @@ class ComponentSpec(_BaseComponentSpec): component_class_name, params=None, obj=None, - logging_level=bt2.LoggingLevel.NONE, + logging_level=bt2_logging.LoggingLevel.NONE, ): - plugin = bt2.find_plugin(plugin_name) + plugin = bt2_plugin.find_plugin(plugin_name) if plugin is None: - raise ValueError('no such plugin: {}'.format(plugin_name)) + raise ValueError("no such plugin: {}".format(plugin_name)) if component_class_name in plugin.source_component_classes: comp_class = plugin.source_component_classes[component_class_name] @@ -118,7 +110,7 @@ class ComponentSpec(_BaseComponentSpec): comp_class = plugin.filter_component_classes[component_class_name] else: raise KeyError( - 'source or filter component class `{}` not found in plugin `{}`'.format( + "source or filter component class `{}` not found in plugin `{}`".format( component_class_name, plugin_name ) ) @@ -142,54 +134,54 @@ class AutoSourceComponentSpec(_BaseComponentSpec): def _auto_discover_source_component_specs(auto_source_comp_specs, plugin_set): # Transform a list of `AutoSourceComponentSpec` in a list of `ComponentSpec` # using the automatic source discovery mechanism. - inputs = bt2.ArrayValue([spec.input for spec in auto_source_comp_specs]) + inputs = bt2_value.ArrayValue([spec.input for spec in auto_source_comp_specs]) if plugin_set is None: - plugin_set = bt2.find_plugins() + plugin_set = bt2_plugin.find_plugins() else: - utils._check_type(plugin_set, bt2_plugin._PluginSet) + bt2_utils._check_type(plugin_set, bt2_plugin._PluginSet) res_ptr = native_bt.bt2_auto_discover_source_components( inputs._ptr, plugin_set._ptr ) if res_ptr is None: - raise bt2._MemoryError('cannot auto discover source components') + raise bt2_error._MemoryError("cannot auto discover source components") res = bt2_value._create_from_ptr(res_ptr) - assert type(res) == bt2.MapValue - assert 'status' in res + assert type(res) is bt2_value.MapValue + assert "status" in res - status = res['status'] - utils._handle_func_status(status, 'cannot auto-discover source components') + status = res["status"] + bt2_utils._handle_func_status(status, "cannot auto-discover source components") comp_specs = [] - comp_specs_raw = res['results'] - assert type(comp_specs_raw) == bt2.ArrayValue + comp_specs_raw = res["results"] + assert type(comp_specs_raw) is bt2_value.ArrayValue used_input_indices = set() for comp_spec_raw in comp_specs_raw: - assert type(comp_spec_raw) == bt2.ArrayValue + assert type(comp_spec_raw) is bt2_value.ArrayValue assert len(comp_spec_raw) == 4 plugin_name = comp_spec_raw[0] - assert type(plugin_name) == bt2.StringValue + assert type(plugin_name) is bt2_value.StringValue plugin_name = str(plugin_name) class_name = comp_spec_raw[1] - assert type(class_name) == bt2.StringValue + assert type(class_name) is bt2_value.StringValue class_name = str(class_name) comp_inputs = comp_spec_raw[2] - assert type(comp_inputs) == bt2.ArrayValue + assert type(comp_inputs) is bt2_value.ArrayValue comp_orig_indices = comp_spec_raw[3] assert type(comp_orig_indices) - params = bt2.MapValue() - logging_level = bt2.LoggingLevel.NONE + params = bt2_value.MapValue() + logging_level = bt2_logging.LoggingLevel.NONE obj = None # Compute `params` for this component by piling up params given to all @@ -213,7 +205,7 @@ def _auto_discover_source_component_specs(auto_source_comp_specs, plugin_set): used_input_indices.add(int(idx)) - params['inputs'] = comp_inputs + params["inputs"] = comp_inputs comp_specs.append( ComponentSpec.from_named_plugin_and_component_class( @@ -231,8 +223,8 @@ def _auto_discover_source_component_specs(auto_source_comp_specs, plugin_set): unused_inputs = [str(inputs[x]) for x in unused_input_indices] msg = ( - 'Some auto source component specs did not produce any component: ' - + ', '.join(unused_inputs) + "Some auto source component specs did not produce any component: " + + ", ".join(unused_inputs) ) raise RuntimeError(msg) @@ -262,12 +254,10 @@ class _TraceCollectionMessageIteratorProxySink(bt2_component._UserSinkComponent) def __init__(self, config, params, msg_list): assert type(msg_list) is list self._msg_list = msg_list - self._add_input_port('in') + self._add_input_port("in") def _user_graph_is_configured(self): - self._msg_iter = self._create_input_port_message_iterator( - self._input_ports['in'] - ) + self._msg_iter = self._create_message_iterator(self._input_ports["in"]) def _user_consume(self): assert self._msg_list[0] is None @@ -284,7 +274,7 @@ class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator): end=None, plugin_set=None, ): - utils._check_bool(stream_intersection_mode) + bt2_utils._check_bool(stream_intersection_mode) self._stream_intersection_mode = stream_intersection_mode self._begin_ns = _get_ns(begin) self._end_ns = _get_ns(end) @@ -349,9 +339,9 @@ class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator): # Query the port's component for the `babeltrace.trace-infos` # object which contains the range for each stream, from which we can # compute the intersection of the streams in each trace. - query_exec = bt2.QueryExecutor( + query_exec = bt2_query_executor.QueryExecutor( src_comp_and_spec.spec.component_class, - 'babeltrace.trace-infos', + "babeltrace.trace-infos", src_comp_and_spec.spec.params, ) trace_infos = query_exec.query() @@ -359,21 +349,21 @@ class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator): for trace_info in trace_infos: begin = max( [ - stream['range-ns']['begin'] - for stream in trace_info['stream-infos'] + stream["range-ns"]["begin"] + for stream in trace_info["stream-infos"] ] ) end = min( - [stream['range-ns']['end'] for stream in trace_info['stream-infos']] + [stream["range-ns"]["end"] for stream in trace_info["stream-infos"]] ) # Each port associated to this trace will have this computed # range. - for stream in trace_info['stream-infos']: + for stream in trace_info["stream-infos"]: # A port name is unique within a component, but not # necessarily across all components. Use a component # and port name pair to make it unique across the graph. - port_name = str(stream['port-name']) + port_name = str(stream["port-name"]) key = (src_comp_and_spec.comp.addr, port_name) self._stream_inter_port_to_range[key] = (begin, end) @@ -407,30 +397,30 @@ class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator): def _create_stream_intersection_trimmer(self, component, port): key = (component.addr, port.name) begin, end = self._stream_inter_port_to_range[key] - name = 'trimmer-{}-{}'.format(component.name, port.name) + name = "trimmer-{}-{}".format(component.name, port.name) return self._create_trimmer(begin, end, name) def _create_muxer(self): - plugin = bt2.find_plugin('utils') + plugin = bt2_plugin.find_plugin("utils") if plugin is None: raise RuntimeError('cannot find "utils" plugin (needed for the muxer)') - if 'muxer' not in plugin.filter_component_classes: + if "muxer" not in plugin.filter_component_classes: raise RuntimeError( 'cannot find "muxer" filter component class in "utils" plugin' ) - comp_cls = plugin.filter_component_classes['muxer'] - return self._graph.add_component(comp_cls, 'muxer') + comp_cls = plugin.filter_component_classes["muxer"] + return self._graph.add_component(comp_cls, "muxer") def _create_trimmer(self, begin_ns, end_ns, name): - plugin = bt2.find_plugin('utils') + plugin = bt2_plugin.find_plugin("utils") if plugin is None: raise RuntimeError('cannot find "utils" plugin (needed for the trimmer)') - if 'trimmer' not in plugin.filter_component_classes: + if "trimmer" not in plugin.filter_component_classes: raise RuntimeError( 'cannot find "trimmer" filter component class in "utils" plugin' ) @@ -440,15 +430,15 @@ class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator): def ns_to_string(ns): s_part = ns // 1000000000 ns_part = ns % 1000000000 - return '{}.{:09d}'.format(s_part, ns_part) + return "{}.{:09d}".format(s_part, ns_part) if begin_ns is not None: - params['begin'] = ns_to_string(begin_ns) + params["begin"] = ns_to_string(begin_ns) if end_ns is not None: - params['end'] = ns_to_string(end_ns) + params["end"] = ns_to_string(end_ns) - comp_cls = plugin.filter_component_classes['trimmer'] + comp_cls = plugin.filter_component_classes["trimmer"] return self._graph.add_component(comp_cls, name, params) def _get_unique_comp_name(self, comp_cls): @@ -458,7 +448,7 @@ class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator): ) if name in [comp_and_spec.comp.name for comp_and_spec in comps_and_specs]: - name += '-{}'.format(self._next_suffix) + name += "-{}".format(self._next_suffix) self._next_suffix += 1 return name @@ -487,8 +477,8 @@ class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator): # port -> muxer if self._stream_intersection_mode: trimmer_comp = self._create_stream_intersection_trimmer(component, port) - self._graph.connect_ports(port, trimmer_comp.input_ports['in']) - port_to_muxer = trimmer_comp.output_ports['out'] + self._graph.connect_ports(port, trimmer_comp.input_ports["in"]) + port_to_muxer = trimmer_comp.output_ports["out"] else: port_to_muxer = port @@ -511,7 +501,7 @@ class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator): def append_comp_specs_descriptors(descriptors, comp_specs): for comp_spec in comp_specs: descriptors.append( - bt2.ComponentDescriptor( + bt2_component_descriptor.ComponentDescriptor( comp_spec.component_class, comp_spec.params, comp_spec.obj ) ) @@ -523,31 +513,31 @@ class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator): if self._stream_intersection_mode: # we also need at least one `flt.utils.trimmer` component comp_spec = ComponentSpec.from_named_plugin_and_component_class( - 'utils', 'trimmer' + "utils", "trimmer" ) append_comp_specs_descriptors(descriptors, [comp_spec]) - mip_version = bt2.get_greatest_operative_mip_version(descriptors) + mip_version = bt2_mip.get_greatest_operative_mip_version(descriptors) if mip_version is None: - msg = 'failed to find an operative message interchange protocol version (components are not interoperable)' + msg = "failed to find an operative message interchange protocol version (components are not interoperable)" raise RuntimeError(msg) return mip_version def _build_graph(self): - self._graph = bt2.Graph(self._get_greatest_operative_mip_version()) + self._graph = bt2_graph.Graph(self._get_greatest_operative_mip_version()) self._graph.add_port_added_listener(self._graph_port_added) self._muxer_comp = self._create_muxer() if self._begin_ns is not None or self._end_ns is not None: - trimmer_comp = self._create_trimmer(self._begin_ns, self._end_ns, 'trimmer') + trimmer_comp = self._create_trimmer(self._begin_ns, self._end_ns, "trimmer") self._graph.connect_ports( - self._muxer_comp.output_ports['out'], trimmer_comp.input_ports['in'] + self._muxer_comp.output_ports["out"], trimmer_comp.input_ports["in"] ) - last_flt_out_port = trimmer_comp.output_ports['out'] + last_flt_out_port = trimmer_comp.output_ports["out"] else: - last_flt_out_port = self._muxer_comp.output_ports['out'] + last_flt_out_port = self._muxer_comp.output_ports["out"] # create extra filter components (chained) for comp_spec in self._flt_comp_specs: @@ -594,9 +584,9 @@ class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator): # Add the proxy sink, passing our message list to share consumed # messages with this trace collection message iterator. sink = self._graph.add_component( - _TraceCollectionMessageIteratorProxySink, 'proxy-sink', obj=self._msg_list + _TraceCollectionMessageIteratorProxySink, "proxy-sink", obj=self._msg_list ) - sink_in_port = sink.input_ports['in'] + sink_in_port = sink.input_ports["in"] # connect last filter to proxy sink self._graph.connect_ports(last_flt_out_port, sink_in_port)