X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fbindings%2Fpython%2Fbt2%2Fbt2%2Ftrace_collection_message_iterator.py;h=ed474cd09c92a04475e1b348f681ef719605badd;hb=f5567ea88d172767b34373bc6e402da8bfd85ef8;hp=56b48da9ff8061fe4a96e70870495ec711c313a2;hpb=f3c9a159782f70dbd0e5dedb37e4a1ef8a6d304e;p=babeltrace.git diff --git a/src/bindings/python/bt2/bt2/trace_collection_message_iterator.py b/src/bindings/python/bt2/bt2/trace_collection_message_iterator.py index 56b48da9..ed474cd0 100644 --- a/src/bindings/python/bt2/bt2/trace_collection_message_iterator.py +++ b/src/bindings/python/bt2/bt2/trace_collection_message_iterator.py @@ -1,30 +1,11 @@ -# The MIT License (MIT) +# SPDX-License-Identifier: MIT # # Copyright (c) 2017 Philippe Proulx -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. from bt2 import utils, native_bt import bt2 import itertools from bt2 import message_iterator as bt2_message_iterator -from bt2 import logging as bt2_logging from bt2 import port as bt2_port from bt2 import component as bt2_component from bt2 import value as bt2_value @@ -35,10 +16,12 @@ import numbers # a pair of component and ComponentSpec -_ComponentAndSpec = namedtuple('_ComponentAndSpec', ['comp', 'spec']) +_ComponentAndSpec = namedtuple("_ComponentAndSpec", ["comp", "spec"]) class _BaseComponentSpec: + # Base for any component spec that can be passed to + # TraceCollectionMessageIterator. def __init__(self, params, obj, logging_level): if logging_level is not None: utils._check_log_level(logging_level) @@ -61,35 +44,72 @@ class _BaseComponentSpec: class ComponentSpec(_BaseComponentSpec): + # A component spec with a specific component class. def __init__( self, - plugin_name, - class_name, + component_class, params=None, obj=None, - logging_level=bt2_logging.LoggingLevel.NONE, + logging_level=bt2.LoggingLevel.NONE, ): if type(params) is str: - params = {'inputs': [params]} + params = {"inputs": [params]} super().__init__(params, obj, logging_level) - utils._check_str(plugin_name) - utils._check_str(class_name) + is_cc_object = isinstance( + component_class, + (bt2._SourceComponentClassConst, bt2._FilterComponentClassConst), + ) + is_user_cc_type = isinstance( + component_class, bt2_component._UserComponentType + ) and issubclass( + component_class, (bt2._UserSourceComponent, bt2._UserFilterComponent) + ) + + if not is_cc_object and not is_user_cc_type: + raise TypeError( + "'{}' is not a source or filter component class".format( + component_class.__class__.__name__ + ) + ) - self._plugin_name = plugin_name - self._class_name = class_name + self._component_class = component_class @property - def plugin_name(self): - return self._plugin_name + def component_class(self): + return self._component_class - @property - def class_name(self): - return self._class_name + @classmethod + def from_named_plugin_and_component_class( + cls, + plugin_name, + component_class_name, + params=None, + obj=None, + logging_level=bt2.LoggingLevel.NONE, + ): + plugin = bt2.find_plugin(plugin_name) + + if plugin is None: + raise ValueError("no such plugin: {}".format(plugin_name)) + + if component_class_name in plugin.source_component_classes: + comp_class = plugin.source_component_classes[component_class_name] + elif component_class_name in plugin.filter_component_classes: + comp_class = plugin.filter_component_classes[component_class_name] + else: + raise KeyError( + "source or filter component class `{}` not found in plugin `{}`".format( + component_class_name, plugin_name + ) + ) + + return cls(comp_class, params, obj, logging_level) class AutoSourceComponentSpec(_BaseComponentSpec): + # A component spec that does automatic source discovery. _no_obj = object() def __init__(self, input, params=None, obj=_no_obj, logging_level=None): @@ -116,20 +136,22 @@ def _auto_discover_source_component_specs(auto_source_comp_specs, plugin_set): ) if res_ptr is None: - raise bt2._MemoryError('cannot auto discover source components') + raise bt2._MemoryError("cannot auto discover source components") res = bt2_value._create_from_ptr(res_ptr) assert type(res) == bt2.MapValue - assert 'status' in res + assert "status" in res - status = res['status'] - utils._handle_func_status(status, 'cannot auto-discover source components') + status = res["status"] + utils._handle_func_status(status, "cannot auto-discover source components") comp_specs = [] - comp_specs_raw = res['results'] + comp_specs_raw = res["results"] assert type(comp_specs_raw) == bt2.ArrayValue + used_input_indices = set() + for comp_spec_raw in comp_specs_raw: assert type(comp_spec_raw) == bt2.ArrayValue assert len(comp_spec_raw) == 4 @@ -171,10 +193,12 @@ def _auto_discover_source_component_specs(auto_source_comp_specs, plugin_set): if orig_spec.obj is not AutoSourceComponentSpec._no_obj: obj = orig_spec.obj - params['inputs'] = comp_inputs + used_input_indices.add(int(idx)) + + params["inputs"] = comp_inputs comp_specs.append( - ComponentSpec( + ComponentSpec.from_named_plugin_and_component_class( plugin_name, class_name, params=params, @@ -183,6 +207,17 @@ def _auto_discover_source_component_specs(auto_source_comp_specs, plugin_set): ) ) + if len(used_input_indices) != len(inputs): + unused_input_indices = set(range(len(inputs))) - used_input_indices + unused_input_indices = sorted(unused_input_indices) + unused_inputs = [str(inputs[x]) for x in unused_input_indices] + + msg = ( + "Some auto source component specs did not produce any component: " + + ", ".join(unused_inputs) + ) + raise RuntimeError(msg) + return comp_specs @@ -205,21 +240,14 @@ def _get_ns(obj): return int(s * 1e9) -class _CompClsType: - SOURCE = 0 - FILTER = 1 - - class _TraceCollectionMessageIteratorProxySink(bt2_component._UserSinkComponent): - def __init__(self, params, msg_list): + def __init__(self, config, params, msg_list): assert type(msg_list) is list self._msg_list = msg_list - self._add_input_port('in') + self._add_input_port("in") def _user_graph_is_configured(self): - self._msg_iter = self._create_input_port_message_iterator( - self._input_ports['in'] - ) + self._msg_iter = self._create_message_iterator(self._input_ports["in"]) def _user_consume(self): assert self._msg_list[0] is None @@ -292,6 +320,43 @@ class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator): self._build_graph() + def _compute_stream_intersections(self): + # Pre-compute the trimmer range to use for each port in the graph, when + # stream intersection mode is enabled. + self._stream_inter_port_to_range = {} + + for src_comp_and_spec in self._src_comps_and_specs: + # Query the port's component for the `babeltrace.trace-infos` + # object which contains the range for each stream, from which we can + # compute the intersection of the streams in each trace. + query_exec = bt2.QueryExecutor( + src_comp_and_spec.spec.component_class, + "babeltrace.trace-infos", + src_comp_and_spec.spec.params, + ) + trace_infos = query_exec.query() + + for trace_info in trace_infos: + begin = max( + [ + stream["range-ns"]["begin"] + for stream in trace_info["stream-infos"] + ] + ) + end = min( + [stream["range-ns"]["end"] for stream in trace_info["stream-infos"]] + ) + + # Each port associated to this trace will have this computed + # range. + for stream in trace_info["stream-infos"]: + # A port name is unique within a component, but not + # necessarily across all components. Use a component + # and port name pair to make it unique across the graph. + port_name = str(stream["port-name"]) + key = (src_comp_and_spec.comp.addr, port_name) + self._stream_inter_port_to_range[key] = (begin, end) + def _validate_source_component_specs(self, comp_specs): for comp_spec in comp_specs: if ( @@ -320,72 +385,32 @@ class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator): return msg def _create_stream_intersection_trimmer(self, component, port): - # find the original parameters specified by the user to create - # this port's component to get the `inputs` parameter - for src_comp_and_spec in self._src_comps_and_specs: - if component == src_comp_and_spec.comp: - break - - try: - inputs = src_comp_and_spec.spec.params['inputs'] - except Exception as e: - raise ValueError( - 'all source components must be created with an "inputs" parameter in stream intersection mode' - ) from e - - params = {'inputs': inputs} - - # query the port's component for the `babeltrace.trace-info` - # object which contains the stream intersection range for each - # exposed trace - query_exec = bt2.QueryExecutor( - src_comp_and_spec.comp.cls, 'babeltrace.trace-info', params - ) - trace_info_res = query_exec.query() - begin = None - end = None - - # find the trace info for this port's trace - try: - for trace_info in trace_info_res: - for stream in trace_info['streams']: - if stream['port-name'] == port.name: - range_ns = trace_info['intersection-range-ns'] - begin = range_ns['begin'] - end = range_ns['end'] - break - except Exception: - pass - - if begin is None or end is None: - raise RuntimeError( - 'cannot find stream intersection range for port "{}"'.format(port.name) - ) - - name = 'trimmer-{}-{}'.format(src_comp_and_spec.comp.name, port.name) + key = (component.addr, port.name) + begin, end = self._stream_inter_port_to_range[key] + name = "trimmer-{}-{}".format(component.name, port.name) return self._create_trimmer(begin, end, name) def _create_muxer(self): - plugin = bt2.find_plugin('utils') + plugin = bt2.find_plugin("utils") if plugin is None: raise RuntimeError('cannot find "utils" plugin (needed for the muxer)') - if 'muxer' not in plugin.filter_component_classes: + if "muxer" not in plugin.filter_component_classes: raise RuntimeError( 'cannot find "muxer" filter component class in "utils" plugin' ) - comp_cls = plugin.filter_component_classes['muxer'] - return self._graph.add_component(comp_cls, 'muxer') + comp_cls = plugin.filter_component_classes["muxer"] + return self._graph.add_component(comp_cls, "muxer") def _create_trimmer(self, begin_ns, end_ns, name): - plugin = bt2.find_plugin('utils') + plugin = bt2.find_plugin("utils") if plugin is None: raise RuntimeError('cannot find "utils" plugin (needed for the trimmer)') - if 'trimmer' not in plugin.filter_component_classes: + if "trimmer" not in plugin.filter_component_classes: raise RuntimeError( 'cannot find "trimmer" filter component class in "utils" plugin' ) @@ -395,50 +420,32 @@ class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator): def ns_to_string(ns): s_part = ns // 1000000000 ns_part = ns % 1000000000 - return '{}.{:09d}'.format(s_part, ns_part) + return "{}.{:09d}".format(s_part, ns_part) if begin_ns is not None: - params['begin'] = ns_to_string(begin_ns) + params["begin"] = ns_to_string(begin_ns) if end_ns is not None: - params['end'] = ns_to_string(end_ns) + params["end"] = ns_to_string(end_ns) - comp_cls = plugin.filter_component_classes['trimmer'] + comp_cls = plugin.filter_component_classes["trimmer"] return self._graph.add_component(comp_cls, name, params) - def _get_unique_comp_name(self, comp_spec): - name = '{}-{}'.format(comp_spec.plugin_name, comp_spec.class_name) + def _get_unique_comp_name(self, comp_cls): + name = comp_cls.name comps_and_specs = itertools.chain( self._src_comps_and_specs, self._flt_comps_and_specs ) if name in [comp_and_spec.comp.name for comp_and_spec in comps_and_specs]: - name += '-{}'.format(self._next_suffix) + name += "-{}".format(self._next_suffix) self._next_suffix += 1 return name - def _create_comp(self, comp_spec, comp_cls_type): - plugin = bt2.find_plugin(comp_spec.plugin_name) - - if plugin is None: - raise ValueError('no such plugin: {}'.format(comp_spec.plugin_name)) - - if comp_cls_type == _CompClsType.SOURCE: - comp_classes = plugin.source_component_classes - else: - comp_classes = plugin.filter_component_classes - - if comp_spec.class_name not in comp_classes: - cc_type = 'source' if comp_cls_type == _CompClsType.SOURCE else 'filter' - raise ValueError( - 'no such {} component class in "{}" plugin: {}'.format( - cc_type, comp_spec.plugin_name, comp_spec.class_name - ) - ) - - comp_cls = comp_classes[comp_spec.class_name] - name = self._get_unique_comp_name(comp_spec) + def _create_comp(self, comp_spec): + comp_cls = comp_spec.component_class + name = self._get_unique_comp_name(comp_cls) comp = self._graph.add_component( comp_cls, name, comp_spec.params, comp_spec.obj, comp_spec.logging_level ) @@ -460,8 +467,8 @@ class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator): # port -> muxer if self._stream_intersection_mode: trimmer_comp = self._create_stream_intersection_trimmer(component, port) - self._graph.connect_ports(port, trimmer_comp.input_ports['in']) - port_to_muxer = trimmer_comp.output_ports['out'] + self._graph.connect_ports(port, trimmer_comp.input_ports["in"]) + port_to_muxer = trimmer_comp.output_ports["out"] else: port_to_muxer = port @@ -471,7 +478,7 @@ class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator): if not self._connect_ports: return - if type(port) is bt2_port._InputPort: + if type(port) is bt2_port._InputPortConst: return if component not in [comp.comp for comp in self._src_comps_and_specs]: @@ -480,23 +487,51 @@ class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator): self._connect_src_comp_port(component, port) + def _get_greatest_operative_mip_version(self): + def append_comp_specs_descriptors(descriptors, comp_specs): + for comp_spec in comp_specs: + descriptors.append( + bt2.ComponentDescriptor( + comp_spec.component_class, comp_spec.params, comp_spec.obj + ) + ) + + descriptors = [] + append_comp_specs_descriptors(descriptors, self._src_comp_specs) + append_comp_specs_descriptors(descriptors, self._flt_comp_specs) + + if self._stream_intersection_mode: + # we also need at least one `flt.utils.trimmer` component + comp_spec = ComponentSpec.from_named_plugin_and_component_class( + "utils", "trimmer" + ) + append_comp_specs_descriptors(descriptors, [comp_spec]) + + mip_version = bt2.get_greatest_operative_mip_version(descriptors) + + if mip_version is None: + msg = "failed to find an operative message interchange protocol version (components are not interoperable)" + raise RuntimeError(msg) + + return mip_version + def _build_graph(self): - self._graph = bt2.Graph() + self._graph = bt2.Graph(self._get_greatest_operative_mip_version()) self._graph.add_port_added_listener(self._graph_port_added) self._muxer_comp = self._create_muxer() if self._begin_ns is not None or self._end_ns is not None: - trimmer_comp = self._create_trimmer(self._begin_ns, self._end_ns, 'trimmer') + trimmer_comp = self._create_trimmer(self._begin_ns, self._end_ns, "trimmer") self._graph.connect_ports( - self._muxer_comp.output_ports['out'], trimmer_comp.input_ports['in'] + self._muxer_comp.output_ports["out"], trimmer_comp.input_ports["in"] ) - last_flt_out_port = trimmer_comp.output_ports['out'] + last_flt_out_port = trimmer_comp.output_ports["out"] else: - last_flt_out_port = self._muxer_comp.output_ports['out'] + last_flt_out_port = self._muxer_comp.output_ports["out"] # create extra filter components (chained) for comp_spec in self._flt_comp_specs: - comp = self._create_comp(comp_spec, _CompClsType.FILTER) + comp = self._create_comp(comp_spec) self._flt_comps_and_specs.append(_ComponentAndSpec(comp, comp_spec)) # connect the extra filter chain @@ -514,9 +549,12 @@ class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator): # it does not exist yet (it needs the created component to # exist). for comp_spec in self._src_comp_specs: - comp = self._create_comp(comp_spec, _CompClsType.SOURCE) + comp = self._create_comp(comp_spec) self._src_comps_and_specs.append(_ComponentAndSpec(comp, comp_spec)) + if self._stream_intersection_mode: + self._compute_stream_intersections() + # Now we connect the ports which exist at this point. We allow # self._graph_port_added() to automatically connect _new_ ports. self._connect_ports = True @@ -536,9 +574,9 @@ class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator): # Add the proxy sink, passing our message list to share consumed # messages with this trace collection message iterator. sink = self._graph.add_component( - _TraceCollectionMessageIteratorProxySink, 'proxy-sink', obj=self._msg_list + _TraceCollectionMessageIteratorProxySink, "proxy-sink", obj=self._msg_list ) - sink_in_port = sink.input_ports['in'] + sink_in_port = sink.input_ports["in"] # connect last filter to proxy sink self._graph.connect_ports(last_flt_out_port, sink_in_port)