Commit | Line | Data |
---|---|---|
0235b0db | 1 | # SPDX-License-Identifier: MIT |
85dcce24 PP |
2 | # |
3 | # Copyright (c) 2017 Philippe Proulx <pproulx@efficios.com> | |
85dcce24 | 4 | |
5995b304 SM |
5 | import numbers |
6 | import datetime | |
3d60267b | 7 | import itertools |
5995b304 SM |
8 | from collections import namedtuple |
9 | ||
10 | from bt2 import mip as bt2_mip | |
3fb99a22 | 11 | from bt2 import port as bt2_port |
5995b304 SM |
12 | from bt2 import error as bt2_error |
13 | from bt2 import graph as bt2_graph | |
14 | from bt2 import utils as bt2_utils | |
f3c9a159 SM |
15 | from bt2 import value as bt2_value |
16 | from bt2 import plugin as bt2_plugin | |
5995b304 SM |
17 | from bt2 import logging as bt2_logging |
18 | from bt2 import component as bt2_component | |
19 | from bt2 import native_bt | |
c345b078 | 20 | from bt2 import query_executor as bt2_query_executor |
5995b304 | 21 | from bt2 import message_iterator as bt2_message_iterator |
c345b078 | 22 | from bt2 import component_descriptor as bt2_component_descriptor |
85dcce24 | 23 | |
# Pairs a created component (`comp`) with the component spec (`spec`)
# from which it was created.
_ComponentAndSpec = namedtuple("_ComponentAndSpec", ("comp", "spec"))
85dcce24 PP |
26 | |
27 | ||
class _BaseComponentSpec:
    # Common state of every component spec which can be passed to
    # TraceCollectionMessageIterator: initialization parameters, an
    # arbitrary Python object, and an optional logging level.
    def __init__(self, params, obj, logging_level):
        # `None` means that no logging level is specified; any other
        # value must pass the bt2 logging level check.
        if logging_level is not None:
            bt2_utils._check_log_level(logging_level)

        converted_params = bt2_value.create_value(params)
        self._params = converted_params
        self._obj = obj
        self._logging_level = logging_level

    @property
    def params(self):
        # Parameters, as converted by bt2_value.create_value().
        return self._params

    @property
    def obj(self):
        # Python object given at construction time, stored as is.
        return self._obj

    @property
    def logging_level(self):
        # Logging level given at construction time (possibly `None`).
        return self._logging_level
50 | ||
51 | ||
class ComponentSpec(_BaseComponentSpec):
    # A component spec bound to a specific source or filter component
    # class.
    def __init__(
        self,
        component_class,
        params=None,
        obj=None,
        logging_level=bt2_logging.LoggingLevel.NONE,
    ):
        # A plain string is shorthand for a single-item `inputs` array.
        if type(params) is str:
            params = {"inputs": [params]}

        super().__init__(params, obj, logging_level)

        # Accepted: a source/filter component class object, or a user
        # component type deriving from the user source/filter component
        # base.
        cc_object_types = (
            bt2_component._SourceComponentClassConst,
            bt2_component._FilterComponentClassConst,
        )
        user_cc_bases = (
            bt2_component._UserSourceComponent,
            bt2_component._UserFilterComponent,
        )
        is_accepted = isinstance(component_class, cc_object_types) or (
            isinstance(component_class, bt2_component._UserComponentType)
            and issubclass(component_class, user_cc_bases)
        )

        if not is_accepted:
            type_name = component_class.__class__.__name__
            raise TypeError(
                "'{}' is not a source or filter component class".format(type_name)
            )

        self._component_class = component_class

    @property
    def component_class(self):
        # Component class given at construction time.
        return self._component_class

    @classmethod
    def from_named_plugin_and_component_class(
        cls,
        plugin_name,
        component_class_name,
        params=None,
        obj=None,
        logging_level=bt2_logging.LoggingLevel.NONE,
    ):
        # Alternate constructor: finds the plugin named `plugin_name`,
        # then looks for `component_class_name` among its source
        # component classes first and its filter component classes
        # second.
        #
        # Raises `ValueError` if the plugin is not found and `KeyError`
        # if it has no such source or filter component class.
        plugin = bt2_plugin.find_plugin(plugin_name)

        if plugin is None:
            raise ValueError("no such plugin: {}".format(plugin_name))

        src_classes = plugin.source_component_classes

        if component_class_name in src_classes:
            return cls(src_classes[component_class_name], params, obj, logging_level)

        flt_classes = plugin.filter_component_classes

        if component_class_name in flt_classes:
            return cls(flt_classes[component_class_name], params, obj, logging_level)

        raise KeyError(
            "source or filter component class `{}` not found in plugin `{}`".format(
                component_class_name, plugin_name
            )
        )
85dcce24 | 119 | |
e874da19 | 120 | |
class AutoSourceComponentSpec(_BaseComponentSpec):
    # A component spec which relies on automatic source discovery: it
    # only carries an input, not a component class.

    # Sentinel default for `obj`: lets the code which reads `obj` tell
    # "no object given" apart from an explicit `None`.
    _no_obj = object()

    def __init__(self, input, params=None, obj=_no_obj, logging_level=None):
        super().__init__(params, obj, logging_level)

        # Kept as is; converted to a bt2 value at discovery time.
        self._input = input

    @property
    def input(self):
        # Input given at construction time.
        return self._input
132 | ||
133 | ||
def _auto_discover_source_component_specs(auto_source_comp_specs, plugin_set):
    # Transform a list of `AutoSourceComponentSpec` in a list of `ComponentSpec`
    # using the automatic source discovery mechanism.
    #
    # `plugin_set` is the set of plugins to consider; when it's `None`,
    # the set returned by bt2_plugin.find_plugins() is used.
    #
    # Raises `RuntimeError` if at least one input did not contribute to
    # any discovered component. Other errors come from the native
    # discovery status (through bt2_utils._handle_func_status()) and
    # from the creation of each `ComponentSpec`.
    inputs = bt2_value.ArrayValue([spec.input for spec in auto_source_comp_specs])

    if plugin_set is None:
        plugin_set = bt2_plugin.find_plugins()
    else:
        bt2_utils._check_type(plugin_set, bt2_plugin._PluginSet)

    res_ptr = native_bt.bt2_auto_discover_source_components(
        inputs._ptr, plugin_set._ptr
    )

    if res_ptr is None:
        raise bt2_error._MemoryError("cannot auto discover source components")

    res = bt2_value._create_from_ptr(res_ptr)

    # The result is a map having a `status` entry and, once the status
    # is handled, a `results` array.
    assert type(res) is bt2_value.MapValue
    assert "status" in res

    status = res["status"]
    bt2_utils._handle_func_status(status, "cannot auto-discover source components")

    comp_specs = []
    comp_specs_raw = res["results"]
    assert type(comp_specs_raw) is bt2_value.ArrayValue

    # Indices (within `auto_source_comp_specs`) of the specs which
    # contributed to at least one component.
    used_input_indices = set()

    for comp_spec_raw in comp_specs_raw:
        # Each raw result is a 4-element array: plugin name, component
        # class name, inputs for the component, and indices of the
        # original specs which led to this component.
        assert type(comp_spec_raw) is bt2_value.ArrayValue
        assert len(comp_spec_raw) == 4

        plugin_name = comp_spec_raw[0]
        assert type(plugin_name) is bt2_value.StringValue
        plugin_name = str(plugin_name)

        class_name = comp_spec_raw[1]
        assert type(class_name) is bt2_value.StringValue
        class_name = str(class_name)

        comp_inputs = comp_spec_raw[2]
        assert type(comp_inputs) is bt2_value.ArrayValue

        comp_orig_indices = comp_spec_raw[3]

        # Actually check the type: a bare `assert type(...)` is always
        # true because a type object is truthy.
        assert type(comp_orig_indices) is bt2_value.ArrayValue

        params = bt2_value.MapValue()
        logging_level = bt2_logging.LoggingLevel.NONE
        obj = None

        # Compute `params` for this component by piling up params given to all
        # AutoSourceComponentSpec objects that contributed in the instantiation
        # of this component.
        #
        # The effective log level for a component is the last one specified
        # across the AutoSourceComponentSpec that contributed in its
        # instantiation. The same "last one wins" rule applies to `obj`.
        for idx in comp_orig_indices:
            orig_spec = auto_source_comp_specs[idx]

            if orig_spec.params is not None:
                params.update(orig_spec.params)

            if orig_spec.logging_level is not None:
                logging_level = orig_spec.logging_level

            if orig_spec.obj is not AutoSourceComponentSpec._no_obj:
                obj = orig_spec.obj

            used_input_indices.add(int(idx))

        # Set last so that the discovered inputs override any `inputs`
        # entry coming from the original specs' params.
        params["inputs"] = comp_inputs

        comp_specs.append(
            ComponentSpec.from_named_plugin_and_component_class(
                plugin_name,
                class_name,
                params=params,
                obj=obj,
                logging_level=logging_level,
            )
        )

    # Every input must have contributed to at least one component.
    if len(used_input_indices) != len(inputs):
        unused_input_indices = set(range(len(inputs))) - used_input_indices
        unused_input_indices = sorted(unused_input_indices)
        unused_inputs = [str(inputs[x]) for x in unused_input_indices]

        msg = (
            "Some auto source component specs did not produce any component: "
            + ", ".join(unused_inputs)
        )
        raise RuntimeError(msg)

    return comp_specs
66964f3f | 232 | |
85dcce24 PP |
233 | |
# datetime.datetime or real number (seconds) to integral nanoseconds
def _get_ns(obj):
    # Returns `None` when `obj` is `None`, otherwise an `int` number of
    # nanoseconds. Raises `TypeError` for any unsupported type.
    if obj is None:
        return

    if isinstance(obj, numbers.Integral):
        # Integral seconds: stay in integer arithmetic so that the
        # result is exact whatever the magnitude (going through a float
        # loses the low-order digits of large values).
        return int(obj) * 1000000000

    if isinstance(obj, numbers.Real):
        # consider that it's already in seconds
        return int(obj * 1e9)

    if isinstance(obj, datetime.datetime):
        # s -> ns
        #
        # Convert the whole seconds and the microseconds separately: a
        # single float timestamp cannot hold nanosecond-exact values for
        # current dates. Without microseconds, timestamp() is an
        # integral value which converts exactly.
        whole_s = int(obj.replace(microsecond=0).timestamp())
        return whole_s * 1000000000 + obj.microsecond * 1000

    raise TypeError(
        '"{}" is not an integral number or a datetime.datetime object'.format(obj)
    )
251 | ||
252 | ||
class _TraceCollectionMessageIteratorProxySink(bt2_component._UserSinkComponent):
    # Sink which hands each consumed message over to its creator through
    # the first slot of a shared list (`msg_list`, received as the
    # component's Python object).
    def __init__(self, config, params, msg_list):
        assert type(msg_list) is list
        self._msg_list = msg_list
        self._add_input_port("in")

    def _user_graph_is_configured(self):
        # Create the message iterator on the single `in` port.
        in_port = self._input_ports["in"]
        self._msg_iter = self._create_message_iterator(in_port)

    def _user_consume(self):
        shared = self._msg_list

        # The previous message must have been taken out of the slot.
        assert shared[0] is None
        shared[0] = next(self._msg_iter)
265 | ||
266 | ||
3fb99a22 | 267 | class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator): |
cfbd7cf3 FD |
268 | def __init__( |
269 | self, | |
270 | source_component_specs, | |
271 | filter_component_specs=None, | |
272 | stream_intersection_mode=False, | |
273 | begin=None, | |
274 | end=None, | |
f3c9a159 | 275 | plugin_set=None, |
cfbd7cf3 | 276 | ): |
e5914347 | 277 | bt2_utils._check_bool(stream_intersection_mode) |
85dcce24 PP |
278 | self._stream_intersection_mode = stream_intersection_mode |
279 | self._begin_ns = _get_ns(begin) | |
280 | self._end_ns = _get_ns(end) | |
c1859f69 | 281 | self._msg_list = [None] |
3d60267b | 282 | |
f3c9a159 SM |
283 | # If a single item is provided, convert to a list. |
284 | if type(source_component_specs) in ( | |
285 | ComponentSpec, | |
286 | AutoSourceComponentSpec, | |
287 | str, | |
288 | ): | |
3d60267b PP |
289 | source_component_specs = [source_component_specs] |
290 | ||
f3c9a159 SM |
291 | # Convert any string to an AutoSourceComponentSpec. |
292 | def str_to_auto(item): | |
293 | if type(item) is str: | |
294 | item = AutoSourceComponentSpec(item) | |
295 | ||
296 | return item | |
297 | ||
298 | source_component_specs = [str_to_auto(s) for s in source_component_specs] | |
299 | ||
3d60267b PP |
300 | if type(filter_component_specs) is ComponentSpec: |
301 | filter_component_specs = [filter_component_specs] | |
302 | elif filter_component_specs is None: | |
303 | filter_component_specs = [] | |
304 | ||
f3c9a159 SM |
305 | self._validate_source_component_specs(source_component_specs) |
306 | self._validate_filter_component_specs(filter_component_specs) | |
307 | ||
308 | # Pass any `ComponentSpec` instance as-is. | |
309 | self._src_comp_specs = [ | |
310 | spec for spec in source_component_specs if type(spec) is ComponentSpec | |
311 | ] | |
312 | ||
313 | # Convert any `AutoSourceComponentSpec` in concrete `ComponentSpec` instances. | |
314 | auto_src_comp_specs = [ | |
315 | spec | |
316 | for spec in source_component_specs | |
317 | if type(spec) is AutoSourceComponentSpec | |
318 | ] | |
319 | self._src_comp_specs += _auto_discover_source_component_specs( | |
320 | auto_src_comp_specs, plugin_set | |
321 | ) | |
322 | ||
3d60267b | 323 | self._flt_comp_specs = filter_component_specs |
85dcce24 PP |
324 | self._next_suffix = 1 |
325 | self._connect_ports = False | |
326 | ||
3d60267b | 327 | # lists of _ComponentAndSpec |
85dcce24 | 328 | self._src_comps_and_specs = [] |
3d60267b | 329 | self._flt_comps_and_specs = [] |
85dcce24 | 330 | |
85dcce24 PP |
331 | self._build_graph() |
332 | ||
30947af0 SM |
333 | def _compute_stream_intersections(self): |
334 | # Pre-compute the trimmer range to use for each port in the graph, when | |
335 | # stream intersection mode is enabled. | |
336 | self._stream_inter_port_to_range = {} | |
337 | ||
338 | for src_comp_and_spec in self._src_comps_and_specs: | |
5f2a1585 | 339 | # Query the port's component for the `babeltrace.trace-infos` |
30947af0 SM |
340 | # object which contains the range for each stream, from which we can |
341 | # compute the intersection of the streams in each trace. | |
c345b078 | 342 | query_exec = bt2_query_executor.QueryExecutor( |
3f3d89b4 | 343 | src_comp_and_spec.spec.component_class, |
f5567ea8 | 344 | "babeltrace.trace-infos", |
3f3d89b4 | 345 | src_comp_and_spec.spec.params, |
30947af0 SM |
346 | ) |
347 | trace_infos = query_exec.query() | |
348 | ||
349 | for trace_info in trace_infos: | |
350 | begin = max( | |
5f2a1585 | 351 | [ |
f5567ea8 FD |
352 | stream["range-ns"]["begin"] |
353 | for stream in trace_info["stream-infos"] | |
5f2a1585 | 354 | ] |
30947af0 SM |
355 | ) |
356 | end = min( | |
f5567ea8 | 357 | [stream["range-ns"]["end"] for stream in trace_info["stream-infos"]] |
30947af0 SM |
358 | ) |
359 | ||
360 | # Each port associated to this trace will have this computed | |
361 | # range. | |
f5567ea8 | 362 | for stream in trace_info["stream-infos"]: |
30947af0 SM |
363 | # A port name is unique within a component, but not |
364 | # necessarily across all components. Use a component | |
365 | # and port name pair to make it unique across the graph. | |
f5567ea8 | 366 | port_name = str(stream["port-name"]) |
30947af0 SM |
367 | key = (src_comp_and_spec.comp.addr, port_name) |
368 | self._stream_inter_port_to_range[key] = (begin, end) | |
369 | ||
f3c9a159 SM |
370 | def _validate_source_component_specs(self, comp_specs): |
371 | for comp_spec in comp_specs: | |
372 | if ( | |
373 | type(comp_spec) is not ComponentSpec | |
374 | and type(comp_spec) is not AutoSourceComponentSpec | |
375 | ): | |
376 | raise TypeError( | |
377 | '"{}" object is not a ComponentSpec or AutoSourceComponentSpec'.format( | |
378 | type(comp_spec) | |
379 | ) | |
380 | ) | |
381 | ||
382 | def _validate_filter_component_specs(self, comp_specs): | |
3d60267b PP |
383 | for comp_spec in comp_specs: |
384 | if type(comp_spec) is not ComponentSpec: | |
cfbd7cf3 FD |
385 | raise TypeError( |
386 | '"{}" object is not a ComponentSpec'.format(type(comp_spec)) | |
387 | ) | |
85dcce24 PP |
388 | |
389 | def __next__(self): | |
c1859f69 PP |
390 | assert self._msg_list[0] is None |
391 | self._graph.run_once() | |
392 | msg = self._msg_list[0] | |
393 | assert msg is not None | |
394 | self._msg_list[0] = None | |
395 | return msg | |
85dcce24 | 396 | |
907f2b70 | 397 | def _create_stream_intersection_trimmer(self, component, port): |
30947af0 SM |
398 | key = (component.addr, port.name) |
399 | begin, end = self._stream_inter_port_to_range[key] | |
f5567ea8 | 400 | name = "trimmer-{}-{}".format(component.name, port.name) |
85dcce24 PP |
401 | return self._create_trimmer(begin, end, name) |
402 | ||
403 | def _create_muxer(self): | |
c345b078 | 404 | plugin = bt2_plugin.find_plugin("utils") |
85dcce24 PP |
405 | |
406 | if plugin is None: | |
ce4923b0 | 407 | raise RuntimeError('cannot find "utils" plugin (needed for the muxer)') |
85dcce24 | 408 | |
f5567ea8 | 409 | if "muxer" not in plugin.filter_component_classes: |
ce4923b0 | 410 | raise RuntimeError( |
cfbd7cf3 FD |
411 | 'cannot find "muxer" filter component class in "utils" plugin' |
412 | ) | |
85dcce24 | 413 | |
f5567ea8 FD |
414 | comp_cls = plugin.filter_component_classes["muxer"] |
415 | return self._graph.add_component(comp_cls, "muxer") | |
85dcce24 | 416 | |
907f2b70 | 417 | def _create_trimmer(self, begin_ns, end_ns, name): |
c345b078 | 418 | plugin = bt2_plugin.find_plugin("utils") |
85dcce24 PP |
419 | |
420 | if plugin is None: | |
ce4923b0 | 421 | raise RuntimeError('cannot find "utils" plugin (needed for the trimmer)') |
85dcce24 | 422 | |
f5567ea8 | 423 | if "trimmer" not in plugin.filter_component_classes: |
ce4923b0 | 424 | raise RuntimeError( |
cfbd7cf3 FD |
425 | 'cannot find "trimmer" filter component class in "utils" plugin' |
426 | ) | |
85dcce24 PP |
427 | |
428 | params = {} | |
429 | ||
907f2b70 SM |
430 | def ns_to_string(ns): |
431 | s_part = ns // 1000000000 | |
432 | ns_part = ns % 1000000000 | |
f5567ea8 | 433 | return "{}.{:09d}".format(s_part, ns_part) |
85dcce24 | 434 | |
907f2b70 | 435 | if begin_ns is not None: |
f5567ea8 | 436 | params["begin"] = ns_to_string(begin_ns) |
907f2b70 SM |
437 | |
438 | if end_ns is not None: | |
f5567ea8 | 439 | params["end"] = ns_to_string(end_ns) |
85dcce24 | 440 | |
f5567ea8 | 441 | comp_cls = plugin.filter_component_classes["trimmer"] |
85dcce24 PP |
442 | return self._graph.add_component(comp_cls, name, params) |
443 | ||
c87f23fa SM |
444 | def _get_unique_comp_name(self, comp_cls): |
445 | name = comp_cls.name | |
cfbd7cf3 FD |
446 | comps_and_specs = itertools.chain( |
447 | self._src_comps_and_specs, self._flt_comps_and_specs | |
448 | ) | |
85dcce24 | 449 | |
3d60267b | 450 | if name in [comp_and_spec.comp.name for comp_and_spec in comps_and_specs]: |
f5567ea8 | 451 | name += "-{}".format(self._next_suffix) |
85dcce24 PP |
452 | self._next_suffix += 1 |
453 | ||
454 | return name | |
455 | ||
c87f23fa SM |
456 | def _create_comp(self, comp_spec): |
457 | comp_cls = comp_spec.component_class | |
458 | name = self._get_unique_comp_name(comp_cls) | |
cfbd7cf3 | 459 | comp = self._graph.add_component( |
66964f3f | 460 | comp_cls, name, comp_spec.params, comp_spec.obj, comp_spec.logging_level |
cfbd7cf3 | 461 | ) |
85dcce24 PP |
462 | return comp |
463 | ||
464 | def _get_free_muxer_input_port(self): | |
465 | for port in self._muxer_comp.input_ports.values(): | |
466 | if not port.is_connected: | |
467 | return port | |
468 | ||
907f2b70 | 469 | def _connect_src_comp_port(self, component, port): |
85dcce24 PP |
470 | # if this trace collection iterator is in stream intersection |
471 | # mode, we need this connection: | |
472 | # | |
473 | # port -> trimmer -> muxer | |
474 | # | |
475 | # otherwise, simply: | |
476 | # | |
477 | # port -> muxer | |
478 | if self._stream_intersection_mode: | |
907f2b70 | 479 | trimmer_comp = self._create_stream_intersection_trimmer(component, port) |
f5567ea8 FD |
480 | self._graph.connect_ports(port, trimmer_comp.input_ports["in"]) |
481 | port_to_muxer = trimmer_comp.output_ports["out"] | |
85dcce24 PP |
482 | else: |
483 | port_to_muxer = port | |
484 | ||
485 | self._graph.connect_ports(port_to_muxer, self._get_free_muxer_input_port()) | |
486 | ||
907f2b70 | 487 | def _graph_port_added(self, component, port): |
85dcce24 PP |
488 | if not self._connect_ports: |
489 | return | |
490 | ||
5813b3a3 | 491 | if type(port) is bt2_port._InputPortConst: |
85dcce24 PP |
492 | return |
493 | ||
907f2b70 | 494 | if component not in [comp.comp for comp in self._src_comps_and_specs]: |
85dcce24 PP |
495 | # do not care about non-source components (muxer, trimmer, etc.) |
496 | return | |
497 | ||
907f2b70 | 498 | self._connect_src_comp_port(component, port) |
85dcce24 | 499 | |
2080bf80 | 500 | def _get_greatest_operative_mip_version(self): |
c87f23fa | 501 | def append_comp_specs_descriptors(descriptors, comp_specs): |
2080bf80 | 502 | for comp_spec in comp_specs: |
2080bf80 | 503 | descriptors.append( |
c345b078 | 504 | bt2_component_descriptor.ComponentDescriptor( |
c87f23fa SM |
505 | comp_spec.component_class, comp_spec.params, comp_spec.obj |
506 | ) | |
2080bf80 PP |
507 | ) |
508 | ||
509 | descriptors = [] | |
c87f23fa SM |
510 | append_comp_specs_descriptors(descriptors, self._src_comp_specs) |
511 | append_comp_specs_descriptors(descriptors, self._flt_comp_specs) | |
2080bf80 PP |
512 | |
513 | if self._stream_intersection_mode: | |
514 | # we also need at least one `flt.utils.trimmer` component | |
c87f23fa | 515 | comp_spec = ComponentSpec.from_named_plugin_and_component_class( |
f5567ea8 | 516 | "utils", "trimmer" |
c87f23fa SM |
517 | ) |
518 | append_comp_specs_descriptors(descriptors, [comp_spec]) | |
2080bf80 | 519 | |
c345b078 | 520 | mip_version = bt2_mip.get_greatest_operative_mip_version(descriptors) |
2080bf80 PP |
521 | |
522 | if mip_version is None: | |
f5567ea8 | 523 | msg = "failed to find an operative message interchange protocol version (components are not interoperable)" |
2080bf80 PP |
524 | raise RuntimeError(msg) |
525 | ||
526 | return mip_version | |
527 | ||
85dcce24 | 528 | def _build_graph(self): |
c345b078 | 529 | self._graph = bt2_graph.Graph(self._get_greatest_operative_mip_version()) |
907f2b70 | 530 | self._graph.add_port_added_listener(self._graph_port_added) |
85dcce24 PP |
531 | self._muxer_comp = self._create_muxer() |
532 | ||
533 | if self._begin_ns is not None or self._end_ns is not None: | |
f5567ea8 | 534 | trimmer_comp = self._create_trimmer(self._begin_ns, self._end_ns, "trimmer") |
cfbd7cf3 | 535 | self._graph.connect_ports( |
f5567ea8 | 536 | self._muxer_comp.output_ports["out"], trimmer_comp.input_ports["in"] |
cfbd7cf3 | 537 | ) |
f5567ea8 | 538 | last_flt_out_port = trimmer_comp.output_ports["out"] |
85dcce24 | 539 | else: |
f5567ea8 | 540 | last_flt_out_port = self._muxer_comp.output_ports["out"] |
85dcce24 | 541 | |
3d60267b PP |
542 | # create extra filter components (chained) |
543 | for comp_spec in self._flt_comp_specs: | |
c87f23fa | 544 | comp = self._create_comp(comp_spec) |
3d60267b PP |
545 | self._flt_comps_and_specs.append(_ComponentAndSpec(comp, comp_spec)) |
546 | ||
547 | # connect the extra filter chain | |
548 | for comp_and_spec in self._flt_comps_and_specs: | |
549 | in_port = list(comp_and_spec.comp.input_ports.values())[0] | |
550 | out_port = list(comp_and_spec.comp.output_ports.values())[0] | |
c1859f69 PP |
551 | self._graph.connect_ports(last_flt_out_port, in_port) |
552 | last_flt_out_port = out_port | |
3d60267b | 553 | |
85dcce24 PP |
554 | # Here we create the components, self._graph_port_added() is |
555 | # called when they add ports, but the callback returns early | |
556 | # because self._connect_ports is False. This is because the | |
557 | # self._graph_port_added() could not find the associated source | |
558 | # component specification in self._src_comps_and_specs because | |
559 | # it does not exist yet (it needs the created component to | |
560 | # exist). | |
561 | for comp_spec in self._src_comp_specs: | |
c87f23fa | 562 | comp = self._create_comp(comp_spec) |
3d60267b | 563 | self._src_comps_and_specs.append(_ComponentAndSpec(comp, comp_spec)) |
85dcce24 | 564 | |
30947af0 SM |
565 | if self._stream_intersection_mode: |
566 | self._compute_stream_intersections() | |
567 | ||
85dcce24 PP |
568 | # Now we connect the ports which exist at this point. We allow |
569 | # self._graph_port_added() to automatically connect _new_ ports. | |
570 | self._connect_ports = True | |
571 | ||
572 | for comp_and_spec in self._src_comps_and_specs: | |
573 | # Keep a separate list because comp_and_spec.output_ports | |
574 | # could change during the connection of one of its ports. | |
575 | # Any new port is handled by self._graph_port_added(). | |
576 | out_ports = [port for port in comp_and_spec.comp.output_ports.values()] | |
577 | ||
578 | for out_port in out_ports: | |
907f2b70 | 579 | if out_port.is_connected: |
85dcce24 PP |
580 | continue |
581 | ||
907f2b70 | 582 | self._connect_src_comp_port(comp_and_spec.comp, out_port) |
85dcce24 | 583 | |
c1859f69 PP |
584 | # Add the proxy sink, passing our message list to share consumed |
585 | # messages with this trace collection message iterator. | |
586 | sink = self._graph.add_component( | |
f5567ea8 | 587 | _TraceCollectionMessageIteratorProxySink, "proxy-sink", obj=self._msg_list |
c1859f69 | 588 | ) |
f5567ea8 | 589 | sink_in_port = sink.input_ports["in"] |
c1859f69 PP |
590 | |
591 | # connect last filter to proxy sink | |
592 | self._graph.connect_ports(last_flt_out_port, sink_in_port) |