X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fbindings%2Fpython%2Fbt2%2Fbt2%2Ftrace_collection_message_iterator.py;h=4e7347a92a01128a6265f4b2328739540d3044c5;hb=30947af01a064c13fd12c8faf8f13e3d9fd8087f;hp=74417776d07d15170258b931f075cb1516779efc;hpb=ed47ebcd99fd2fb58770dc733d9318b57858de68;p=babeltrace.git diff --git a/src/bindings/python/bt2/bt2/trace_collection_message_iterator.py b/src/bindings/python/bt2/bt2/trace_collection_message_iterator.py index 74417776..4e7347a9 100644 --- a/src/bindings/python/bt2/bt2/trace_collection_message_iterator.py +++ b/src/bindings/python/bt2/bt2/trace_collection_message_iterator.py @@ -339,6 +339,47 @@ class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator): self._build_graph() + def _compute_stream_intersections(self): + # Pre-compute the trimmer range to use for each port in the graph, when + # stream intersection mode is enabled. + self._stream_inter_port_to_range = {} + + for src_comp_and_spec in self._src_comps_and_specs: + try: + inputs = src_comp_and_spec.spec.params['inputs'] + except KeyError as e: + raise ValueError( + 'all source components must be created with an "inputs" parameter in stream intersection mode' + ) from e + + params = {'inputs': inputs} + + # query the port's component for the `babeltrace.trace-info` + # object which contains the range for each stream, from which we can + # compute the intersection of the streams in each trace. + query_exec = bt2.QueryExecutor( + src_comp_and_spec.spec.component_class, 'babeltrace.trace-info', params + ) + trace_infos = query_exec.query() + + for trace_info in trace_infos: + begin = max( + [stream['range-ns']['begin'] for stream in trace_info['streams']] + ) + end = min( + [stream['range-ns']['end'] for stream in trace_info['streams']] + ) + + # Each port associated to this trace will have this computed + # range. + for stream in trace_info['streams']: + # A port name is unique within a component, but not + # necessarily across all components. Use a component + # and port name pair to make it unique across the graph. + port_name = str(stream['port-name']) + key = (src_comp_and_spec.comp.addr, port_name) + self._stream_inter_port_to_range[key] = (begin, end) + def _validate_source_component_specs(self, comp_specs): for comp_spec in comp_specs: if ( @@ -367,49 +408,9 @@ class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator): return msg def _create_stream_intersection_trimmer(self, component, port): - # find the original parameters specified by the user to create - # this port's component to get the `inputs` parameter - for src_comp_and_spec in self._src_comps_and_specs: - if component == src_comp_and_spec.comp: - break - - try: - inputs = src_comp_and_spec.spec.params['inputs'] - except Exception as e: - raise ValueError( - 'all source components must be created with an "inputs" parameter in stream intersection mode' - ) from e - - params = {'inputs': inputs} - - # query the port's component for the `babeltrace.trace-info` - # object which contains the stream intersection range for each - # exposed trace - query_exec = bt2.QueryExecutor( - src_comp_and_spec.comp.cls, 'babeltrace.trace-info', params - ) - trace_info_res = query_exec.query() - begin = None - end = None - - # find the trace info for this port's trace - try: - for trace_info in trace_info_res: - for stream in trace_info['streams']: - if stream['port-name'] == port.name: - range_ns = trace_info['intersection-range-ns'] - begin = range_ns['begin'] - end = range_ns['end'] - break - except Exception: - pass - - if begin is None or end is None: - raise RuntimeError( - 'cannot find stream intersection range for port "{}"'.format(port.name) - ) - - name = 'trimmer-{}-{}'.format(src_comp_and_spec.comp.name, port.name) + key = (component.addr, port.name) + begin, end = self._stream_inter_port_to_range[key] + name = 'trimmer-{}-{}'.format(component.name, port.name) return self._create_trimmer(begin, end, name) def _create_muxer(self): @@ -574,6 +575,9 @@ class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator): comp = self._create_comp(comp_spec) self._src_comps_and_specs.append(_ComponentAndSpec(comp, comp_spec)) + if self._stream_intersection_mode: + self._compute_stream_intersections() + # Now we connect the ports which exist at this point. We allow # self._graph_port_added() to automatically connect _new_ ports. self._connect_ports = True