Commit | Line | Data |
---|---|---|
85dcce24 PP |
1 | # The MIT License (MIT) |
2 | # | |
3 | # Copyright (c) 2017 Philippe Proulx <pproulx@efficios.com> | |
4 | # | |
5 | # Permission is hereby granted, free of charge, to any person obtaining a copy | |
6 | # of this software and associated documentation files (the "Software"), to deal | |
7 | # in the Software without restriction, including without limitation the rights | |
8 | # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
9 | # copies of the Software, and to permit persons to whom the Software is | |
10 | # furnished to do so, subject to the following conditions: | |
11 | # | |
12 | # The above copyright notice and this permission notice shall be included in | |
13 | # all copies or substantial portions of the Software. | |
14 | # | |
15 | # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
16 | # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
17 | # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
18 | # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
19 | # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
20 | # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | |
21 | # THE SOFTWARE. | |
22 | ||
f3c9a159 | 23 | from bt2 import utils, native_bt |
85dcce24 | 24 | import bt2 |
3d60267b | 25 | import itertools |
3fb99a22 PP |
26 | from bt2 import message_iterator as bt2_message_iterator |
27 | from bt2 import logging as bt2_logging | |
28 | from bt2 import port as bt2_port | |
c1859f69 | 29 | from bt2 import component as bt2_component |
f3c9a159 SM |
30 | from bt2 import value as bt2_value |
31 | from bt2 import plugin as bt2_plugin | |
85dcce24 | 32 | import datetime |
85dcce24 PP |
33 | from collections import namedtuple |
34 | import numbers | |
35 | ||
36 | ||
3d60267b PP |
37 | # a pair of component and ComponentSpec |
38 | _ComponentAndSpec = namedtuple('_ComponentAndSpec', ['comp', 'spec']) | |
85dcce24 PP |
39 | |
40 | ||
f3c9a159 | 41 | class _BaseComponentSpec: |
c87f23fa SM |
42 | # Base for any component spec that can be passed to |
43 | # TraceCollectionMessageIterator. | |
f3c9a159 SM |
44 | def __init__(self, params, obj, logging_level): |
45 | if logging_level is not None: | |
46 | utils._check_log_level(logging_level) | |
47 | ||
48 | self._params = bt2.create_value(params) | |
49 | self._obj = obj | |
50 | self._logging_level = logging_level | |
51 | ||
52 | @property | |
53 | def params(self): | |
54 | return self._params | |
55 | ||
56 | @property | |
57 | def obj(self): | |
58 | return self._obj | |
59 | ||
60 | @property | |
61 | def logging_level(self): | |
62 | return self._logging_level | |
63 | ||
64 | ||
65 | class ComponentSpec(_BaseComponentSpec): | |
c87f23fa | 66 | # A component spec with a specific component class. |
cfbd7cf3 FD |
67 | def __init__( |
68 | self, | |
c87f23fa | 69 | component_class, |
cfbd7cf3 | 70 | params=None, |
66964f3f | 71 | obj=None, |
c87f23fa | 72 | logging_level=bt2.LoggingLevel.NONE, |
cfbd7cf3 | 73 | ): |
f3c9a159 SM |
74 | if type(params) is str: |
75 | params = {'inputs': [params]} | |
76 | ||
77 | super().__init__(params, obj, logging_level) | |
78 | ||
c87f23fa SM |
79 | is_cc_object = isinstance( |
80 | component_class, (bt2._SourceComponentClass, bt2._FilterComponentClass) | |
81 | ) | |
82 | is_user_cc_type = isinstance( | |
83 | component_class, bt2_component._UserComponentType | |
84 | ) and issubclass( | |
85 | component_class, (bt2._UserSourceComponent, bt2._UserFilterComponent) | |
86 | ) | |
f3c9a159 | 87 | |
c87f23fa SM |
88 | if not is_cc_object and not is_user_cc_type: |
89 | raise TypeError( | |
90 | "'{}' is not a source or filter component class".format( | |
91 | component_class.__class__.__name__ | |
92 | ) | |
93 | ) | |
85dcce24 | 94 | |
c87f23fa | 95 | self._component_class = component_class |
85dcce24 PP |
96 | |
97 | @property | |
c87f23fa SM |
98 | def component_class(self): |
99 | return self._component_class | |
100 | ||
101 | @classmethod | |
102 | def from_named_plugin_and_component_class( | |
103 | cls, | |
104 | plugin_name, | |
105 | component_class_name, | |
106 | params=None, | |
107 | obj=None, | |
108 | logging_level=bt2.LoggingLevel.NONE, | |
109 | ): | |
110 | plugin = bt2.find_plugin(plugin_name) | |
111 | ||
112 | if plugin is None: | |
113 | raise ValueError('no such plugin: {}'.format(plugin_name)) | |
114 | ||
115 | if component_class_name in plugin.source_component_classes: | |
116 | comp_class = plugin.source_component_classes[component_class_name] | |
117 | elif component_class_name in plugin.filter_component_classes: | |
118 | comp_class = plugin.filter_component_classes[component_class_name] | |
119 | else: | |
120 | raise KeyError( | |
121 | 'source or filter component class `{}` not found in plugin `{}`'.format( | |
122 | component_class_name, plugin_name | |
123 | ) | |
124 | ) | |
125 | ||
126 | return cls(comp_class, params, obj, logging_level) | |
85dcce24 | 127 | |
e874da19 | 128 | |
f3c9a159 | 129 | class AutoSourceComponentSpec(_BaseComponentSpec): |
c87f23fa | 130 | # A component spec that does automatic source discovery. |
f3c9a159 SM |
131 | _no_obj = object() |
132 | ||
133 | def __init__(self, input, params=None, obj=_no_obj, logging_level=None): | |
134 | super().__init__(params, obj, logging_level) | |
135 | self._input = input | |
85dcce24 | 136 | |
66964f3f | 137 | @property |
f3c9a159 SM |
138 | def input(self): |
139 | return self._input | |
140 | ||
141 | ||
142 | def _auto_discover_source_component_specs(auto_source_comp_specs, plugin_set): | |
143 | # Transform a list of `AutoSourceComponentSpec` in a list of `ComponentSpec` | |
144 | # using the automatic source discovery mechanism. | |
145 | inputs = bt2.ArrayValue([spec.input for spec in auto_source_comp_specs]) | |
146 | ||
147 | if plugin_set is None: | |
148 | plugin_set = bt2.find_plugins() | |
149 | else: | |
150 | utils._check_type(plugin_set, bt2_plugin._PluginSet) | |
151 | ||
152 | res_ptr = native_bt.bt2_auto_discover_source_components( | |
153 | inputs._ptr, plugin_set._ptr | |
154 | ) | |
155 | ||
156 | if res_ptr is None: | |
157 | raise bt2._MemoryError('cannot auto discover source components') | |
158 | ||
159 | res = bt2_value._create_from_ptr(res_ptr) | |
160 | ||
161 | assert type(res) == bt2.MapValue | |
162 | assert 'status' in res | |
163 | ||
164 | status = res['status'] | |
165 | utils._handle_func_status(status, 'cannot auto-discover source components') | |
166 | ||
167 | comp_specs = [] | |
168 | comp_specs_raw = res['results'] | |
169 | assert type(comp_specs_raw) == bt2.ArrayValue | |
170 | ||
39b351f9 SM |
171 | used_input_indices = set() |
172 | ||
f3c9a159 SM |
173 | for comp_spec_raw in comp_specs_raw: |
174 | assert type(comp_spec_raw) == bt2.ArrayValue | |
175 | assert len(comp_spec_raw) == 4 | |
176 | ||
177 | plugin_name = comp_spec_raw[0] | |
178 | assert type(plugin_name) == bt2.StringValue | |
179 | plugin_name = str(plugin_name) | |
180 | ||
181 | class_name = comp_spec_raw[1] | |
182 | assert type(class_name) == bt2.StringValue | |
183 | class_name = str(class_name) | |
184 | ||
185 | comp_inputs = comp_spec_raw[2] | |
186 | assert type(comp_inputs) == bt2.ArrayValue | |
187 | ||
188 | comp_orig_indices = comp_spec_raw[3] | |
189 | assert type(comp_orig_indices) | |
190 | ||
191 | params = bt2.MapValue() | |
192 | logging_level = bt2.LoggingLevel.NONE | |
193 | obj = None | |
194 | ||
195 | # Compute `params` for this component by piling up params given to all | |
196 | # AutoSourceComponentSpec objects that contributed in the instantiation | |
197 | # of this component. | |
198 | # | |
199 | # The effective log level for a component is the last one specified | |
200 | # across the AutoSourceComponentSpec that contributed in its | |
201 | # instantiation. | |
202 | for idx in comp_orig_indices: | |
203 | orig_spec = auto_source_comp_specs[idx] | |
204 | ||
205 | if orig_spec.params is not None: | |
206 | params.update(orig_spec.params) | |
207 | ||
208 | if orig_spec.logging_level is not None: | |
209 | logging_level = orig_spec.logging_level | |
210 | ||
211 | if orig_spec.obj is not AutoSourceComponentSpec._no_obj: | |
212 | obj = orig_spec.obj | |
213 | ||
39b351f9 SM |
214 | used_input_indices.add(int(idx)) |
215 | ||
f3c9a159 SM |
216 | params['inputs'] = comp_inputs |
217 | ||
218 | comp_specs.append( | |
c87f23fa | 219 | ComponentSpec.from_named_plugin_and_component_class( |
f3c9a159 SM |
220 | plugin_name, |
221 | class_name, | |
222 | params=params, | |
223 | obj=obj, | |
224 | logging_level=logging_level, | |
225 | ) | |
226 | ) | |
227 | ||
39b351f9 SM |
228 | if len(used_input_indices) != len(inputs): |
229 | unused_input_indices = set(range(len(inputs))) - used_input_indices | |
230 | unused_input_indices = sorted(unused_input_indices) | |
231 | unused_inputs = [str(inputs[x]) for x in unused_input_indices] | |
232 | ||
233 | msg = ( | |
234 | 'Some auto source component specs did not produce any component: ' | |
235 | + ', '.join(unused_inputs) | |
236 | ) | |
237 | raise RuntimeError(msg) | |
238 | ||
f3c9a159 | 239 | return comp_specs |
66964f3f | 240 | |
85dcce24 PP |
241 | |
242 | # datetime.datetime or integral to nanoseconds | |
243 | def _get_ns(obj): | |
244 | if obj is None: | |
245 | return | |
246 | ||
247 | if isinstance(obj, numbers.Real): | |
248 | # consider that it's already in seconds | |
249 | s = obj | |
250 | elif isinstance(obj, datetime.datetime): | |
251 | # s -> ns | |
252 | s = obj.timestamp() | |
253 | else: | |
cfbd7cf3 FD |
254 | raise TypeError( |
255 | '"{}" is not an integral number or a datetime.datetime object'.format(obj) | |
256 | ) | |
85dcce24 PP |
257 | |
258 | return int(s * 1e9) | |
259 | ||
260 | ||
c1859f69 PP |
261 | class _TraceCollectionMessageIteratorProxySink(bt2_component._UserSinkComponent): |
262 | def __init__(self, params, msg_list): | |
263 | assert type(msg_list) is list | |
264 | self._msg_list = msg_list | |
265 | self._add_input_port('in') | |
266 | ||
267 | def _user_graph_is_configured(self): | |
268 | self._msg_iter = self._create_input_port_message_iterator( | |
269 | self._input_ports['in'] | |
270 | ) | |
271 | ||
272 | def _user_consume(self): | |
273 | assert self._msg_list[0] is None | |
274 | self._msg_list[0] = next(self._msg_iter) | |
275 | ||
276 | ||
3fb99a22 | 277 | class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator): |
cfbd7cf3 FD |
278 | def __init__( |
279 | self, | |
280 | source_component_specs, | |
281 | filter_component_specs=None, | |
282 | stream_intersection_mode=False, | |
283 | begin=None, | |
284 | end=None, | |
f3c9a159 | 285 | plugin_set=None, |
cfbd7cf3 | 286 | ): |
85dcce24 PP |
287 | utils._check_bool(stream_intersection_mode) |
288 | self._stream_intersection_mode = stream_intersection_mode | |
289 | self._begin_ns = _get_ns(begin) | |
290 | self._end_ns = _get_ns(end) | |
c1859f69 | 291 | self._msg_list = [None] |
3d60267b | 292 | |
f3c9a159 SM |
293 | # If a single item is provided, convert to a list. |
294 | if type(source_component_specs) in ( | |
295 | ComponentSpec, | |
296 | AutoSourceComponentSpec, | |
297 | str, | |
298 | ): | |
3d60267b PP |
299 | source_component_specs = [source_component_specs] |
300 | ||
f3c9a159 SM |
301 | # Convert any string to an AutoSourceComponentSpec. |
302 | def str_to_auto(item): | |
303 | if type(item) is str: | |
304 | item = AutoSourceComponentSpec(item) | |
305 | ||
306 | return item | |
307 | ||
308 | source_component_specs = [str_to_auto(s) for s in source_component_specs] | |
309 | ||
3d60267b PP |
310 | if type(filter_component_specs) is ComponentSpec: |
311 | filter_component_specs = [filter_component_specs] | |
312 | elif filter_component_specs is None: | |
313 | filter_component_specs = [] | |
314 | ||
f3c9a159 SM |
315 | self._validate_source_component_specs(source_component_specs) |
316 | self._validate_filter_component_specs(filter_component_specs) | |
317 | ||
318 | # Pass any `ComponentSpec` instance as-is. | |
319 | self._src_comp_specs = [ | |
320 | spec for spec in source_component_specs if type(spec) is ComponentSpec | |
321 | ] | |
322 | ||
323 | # Convert any `AutoSourceComponentSpec` in concrete `ComponentSpec` instances. | |
324 | auto_src_comp_specs = [ | |
325 | spec | |
326 | for spec in source_component_specs | |
327 | if type(spec) is AutoSourceComponentSpec | |
328 | ] | |
329 | self._src_comp_specs += _auto_discover_source_component_specs( | |
330 | auto_src_comp_specs, plugin_set | |
331 | ) | |
332 | ||
3d60267b | 333 | self._flt_comp_specs = filter_component_specs |
85dcce24 PP |
334 | self._next_suffix = 1 |
335 | self._connect_ports = False | |
336 | ||
3d60267b | 337 | # lists of _ComponentAndSpec |
85dcce24 | 338 | self._src_comps_and_specs = [] |
3d60267b | 339 | self._flt_comps_and_specs = [] |
85dcce24 | 340 | |
85dcce24 PP |
341 | self._build_graph() |
342 | ||
f3c9a159 SM |
343 | def _validate_source_component_specs(self, comp_specs): |
344 | for comp_spec in comp_specs: | |
345 | if ( | |
346 | type(comp_spec) is not ComponentSpec | |
347 | and type(comp_spec) is not AutoSourceComponentSpec | |
348 | ): | |
349 | raise TypeError( | |
350 | '"{}" object is not a ComponentSpec or AutoSourceComponentSpec'.format( | |
351 | type(comp_spec) | |
352 | ) | |
353 | ) | |
354 | ||
355 | def _validate_filter_component_specs(self, comp_specs): | |
3d60267b PP |
356 | for comp_spec in comp_specs: |
357 | if type(comp_spec) is not ComponentSpec: | |
cfbd7cf3 FD |
358 | raise TypeError( |
359 | '"{}" object is not a ComponentSpec'.format(type(comp_spec)) | |
360 | ) | |
85dcce24 PP |
361 | |
362 | def __next__(self): | |
c1859f69 PP |
363 | assert self._msg_list[0] is None |
364 | self._graph.run_once() | |
365 | msg = self._msg_list[0] | |
366 | assert msg is not None | |
367 | self._msg_list[0] = None | |
368 | return msg | |
85dcce24 | 369 | |
907f2b70 | 370 | def _create_stream_intersection_trimmer(self, component, port): |
85dcce24 | 371 | # find the original parameters specified by the user to create |
73760435 | 372 | # this port's component to get the `inputs` parameter |
85dcce24 | 373 | for src_comp_and_spec in self._src_comps_and_specs: |
907f2b70 | 374 | if component == src_comp_and_spec.comp: |
85dcce24 PP |
375 | break |
376 | ||
377 | try: | |
73760435 | 378 | inputs = src_comp_and_spec.spec.params['inputs'] |
907f2b70 | 379 | except Exception as e: |
ce4923b0 | 380 | raise ValueError( |
73760435 | 381 | 'all source components must be created with an "inputs" parameter in stream intersection mode' |
cfbd7cf3 | 382 | ) from e |
85dcce24 | 383 | |
73760435 | 384 | params = {'inputs': inputs} |
85dcce24 | 385 | |
1a29b831 PP |
386 | # query the port's component for the `babeltrace.trace-info` |
387 | # object which contains the stream intersection range for each | |
388 | # exposed trace | |
3c729b9a | 389 | query_exec = bt2.QueryExecutor( |
1a29b831 | 390 | src_comp_and_spec.comp.cls, 'babeltrace.trace-info', params |
cfbd7cf3 | 391 | ) |
3c729b9a | 392 | trace_info_res = query_exec.query() |
85dcce24 PP |
393 | begin = None |
394 | end = None | |
395 | ||
a38d7650 | 396 | # find the trace info for this port's trace |
87b768aa PP |
397 | try: |
398 | for trace_info in trace_info_res: | |
a38d7650 SM |
399 | for stream in trace_info['streams']: |
400 | if stream['port-name'] == port.name: | |
401 | range_ns = trace_info['intersection-range-ns'] | |
402 | begin = range_ns['begin'] | |
403 | end = range_ns['end'] | |
404 | break | |
907f2b70 | 405 | except Exception: |
87b768aa | 406 | pass |
85dcce24 PP |
407 | |
408 | if begin is None or end is None: | |
ce4923b0 | 409 | raise RuntimeError( |
cfbd7cf3 FD |
410 | 'cannot find stream intersection range for port "{}"'.format(port.name) |
411 | ) | |
85dcce24 | 412 | |
907f2b70 | 413 | name = 'trimmer-{}-{}'.format(src_comp_and_spec.comp.name, port.name) |
85dcce24 PP |
414 | return self._create_trimmer(begin, end, name) |
415 | ||
416 | def _create_muxer(self): | |
417 | plugin = bt2.find_plugin('utils') | |
418 | ||
419 | if plugin is None: | |
ce4923b0 | 420 | raise RuntimeError('cannot find "utils" plugin (needed for the muxer)') |
85dcce24 PP |
421 | |
422 | if 'muxer' not in plugin.filter_component_classes: | |
ce4923b0 | 423 | raise RuntimeError( |
cfbd7cf3 FD |
424 | 'cannot find "muxer" filter component class in "utils" plugin' |
425 | ) | |
85dcce24 PP |
426 | |
427 | comp_cls = plugin.filter_component_classes['muxer'] | |
428 | return self._graph.add_component(comp_cls, 'muxer') | |
429 | ||
907f2b70 | 430 | def _create_trimmer(self, begin_ns, end_ns, name): |
85dcce24 PP |
431 | plugin = bt2.find_plugin('utils') |
432 | ||
433 | if plugin is None: | |
ce4923b0 | 434 | raise RuntimeError('cannot find "utils" plugin (needed for the trimmer)') |
85dcce24 PP |
435 | |
436 | if 'trimmer' not in plugin.filter_component_classes: | |
ce4923b0 | 437 | raise RuntimeError( |
cfbd7cf3 FD |
438 | 'cannot find "trimmer" filter component class in "utils" plugin' |
439 | ) | |
85dcce24 PP |
440 | |
441 | params = {} | |
442 | ||
907f2b70 SM |
443 | def ns_to_string(ns): |
444 | s_part = ns // 1000000000 | |
445 | ns_part = ns % 1000000000 | |
446 | return '{}.{:09d}'.format(s_part, ns_part) | |
85dcce24 | 447 | |
907f2b70 SM |
448 | if begin_ns is not None: |
449 | params['begin'] = ns_to_string(begin_ns) | |
450 | ||
451 | if end_ns is not None: | |
452 | params['end'] = ns_to_string(end_ns) | |
85dcce24 PP |
453 | |
454 | comp_cls = plugin.filter_component_classes['trimmer'] | |
455 | return self._graph.add_component(comp_cls, name, params) | |
456 | ||
c87f23fa SM |
457 | def _get_unique_comp_name(self, comp_cls): |
458 | name = comp_cls.name | |
cfbd7cf3 FD |
459 | comps_and_specs = itertools.chain( |
460 | self._src_comps_and_specs, self._flt_comps_and_specs | |
461 | ) | |
85dcce24 | 462 | |
3d60267b | 463 | if name in [comp_and_spec.comp.name for comp_and_spec in comps_and_specs]: |
85dcce24 PP |
464 | name += '-{}'.format(self._next_suffix) |
465 | self._next_suffix += 1 | |
466 | ||
467 | return name | |
468 | ||
c87f23fa SM |
469 | def _create_comp(self, comp_spec): |
470 | comp_cls = comp_spec.component_class | |
471 | name = self._get_unique_comp_name(comp_cls) | |
cfbd7cf3 | 472 | comp = self._graph.add_component( |
66964f3f | 473 | comp_cls, name, comp_spec.params, comp_spec.obj, comp_spec.logging_level |
cfbd7cf3 | 474 | ) |
85dcce24 PP |
475 | return comp |
476 | ||
477 | def _get_free_muxer_input_port(self): | |
478 | for port in self._muxer_comp.input_ports.values(): | |
479 | if not port.is_connected: | |
480 | return port | |
481 | ||
907f2b70 | 482 | def _connect_src_comp_port(self, component, port): |
85dcce24 PP |
483 | # if this trace collection iterator is in stream intersection |
484 | # mode, we need this connection: | |
485 | # | |
486 | # port -> trimmer -> muxer | |
487 | # | |
488 | # otherwise, simply: | |
489 | # | |
490 | # port -> muxer | |
491 | if self._stream_intersection_mode: | |
907f2b70 | 492 | trimmer_comp = self._create_stream_intersection_trimmer(component, port) |
85dcce24 PP |
493 | self._graph.connect_ports(port, trimmer_comp.input_ports['in']) |
494 | port_to_muxer = trimmer_comp.output_ports['out'] | |
495 | else: | |
496 | port_to_muxer = port | |
497 | ||
498 | self._graph.connect_ports(port_to_muxer, self._get_free_muxer_input_port()) | |
499 | ||
907f2b70 | 500 | def _graph_port_added(self, component, port): |
85dcce24 PP |
501 | if not self._connect_ports: |
502 | return | |
503 | ||
3fb99a22 | 504 | if type(port) is bt2_port._InputPort: |
85dcce24 PP |
505 | return |
506 | ||
907f2b70 | 507 | if component not in [comp.comp for comp in self._src_comps_and_specs]: |
85dcce24 PP |
508 | # do not care about non-source components (muxer, trimmer, etc.) |
509 | return | |
510 | ||
907f2b70 | 511 | self._connect_src_comp_port(component, port) |
85dcce24 | 512 | |
2080bf80 | 513 | def _get_greatest_operative_mip_version(self): |
c87f23fa | 514 | def append_comp_specs_descriptors(descriptors, comp_specs): |
2080bf80 | 515 | for comp_spec in comp_specs: |
2080bf80 | 516 | descriptors.append( |
c87f23fa SM |
517 | bt2.ComponentDescriptor( |
518 | comp_spec.component_class, comp_spec.params, comp_spec.obj | |
519 | ) | |
2080bf80 PP |
520 | ) |
521 | ||
522 | descriptors = [] | |
c87f23fa SM |
523 | append_comp_specs_descriptors(descriptors, self._src_comp_specs) |
524 | append_comp_specs_descriptors(descriptors, self._flt_comp_specs) | |
2080bf80 PP |
525 | |
526 | if self._stream_intersection_mode: | |
527 | # we also need at least one `flt.utils.trimmer` component | |
c87f23fa SM |
528 | comp_spec = ComponentSpec.from_named_plugin_and_component_class( |
529 | 'utils', 'trimmer' | |
530 | ) | |
531 | append_comp_specs_descriptors(descriptors, [comp_spec]) | |
2080bf80 PP |
532 | |
533 | mip_version = bt2.get_greatest_operative_mip_version(descriptors) | |
534 | ||
535 | if mip_version is None: | |
536 | msg = 'failed to find an operative message interchange protocol version (components are not interoperable)' | |
537 | raise RuntimeError(msg) | |
538 | ||
539 | return mip_version | |
540 | ||
85dcce24 | 541 | def _build_graph(self): |
2080bf80 | 542 | self._graph = bt2.Graph(self._get_greatest_operative_mip_version()) |
907f2b70 | 543 | self._graph.add_port_added_listener(self._graph_port_added) |
85dcce24 PP |
544 | self._muxer_comp = self._create_muxer() |
545 | ||
546 | if self._begin_ns is not None or self._end_ns is not None: | |
cfbd7cf3 FD |
547 | trimmer_comp = self._create_trimmer(self._begin_ns, self._end_ns, 'trimmer') |
548 | self._graph.connect_ports( | |
549 | self._muxer_comp.output_ports['out'], trimmer_comp.input_ports['in'] | |
550 | ) | |
c1859f69 | 551 | last_flt_out_port = trimmer_comp.output_ports['out'] |
85dcce24 | 552 | else: |
c1859f69 | 553 | last_flt_out_port = self._muxer_comp.output_ports['out'] |
85dcce24 | 554 | |
3d60267b PP |
555 | # create extra filter components (chained) |
556 | for comp_spec in self._flt_comp_specs: | |
c87f23fa | 557 | comp = self._create_comp(comp_spec) |
3d60267b PP |
558 | self._flt_comps_and_specs.append(_ComponentAndSpec(comp, comp_spec)) |
559 | ||
560 | # connect the extra filter chain | |
561 | for comp_and_spec in self._flt_comps_and_specs: | |
562 | in_port = list(comp_and_spec.comp.input_ports.values())[0] | |
563 | out_port = list(comp_and_spec.comp.output_ports.values())[0] | |
c1859f69 PP |
564 | self._graph.connect_ports(last_flt_out_port, in_port) |
565 | last_flt_out_port = out_port | |
3d60267b | 566 | |
85dcce24 PP |
567 | # Here we create the components, self._graph_port_added() is |
568 | # called when they add ports, but the callback returns early | |
569 | # because self._connect_ports is False. This is because the | |
570 | # self._graph_port_added() could not find the associated source | |
571 | # component specification in self._src_comps_and_specs because | |
572 | # it does not exist yet (it needs the created component to | |
573 | # exist). | |
574 | for comp_spec in self._src_comp_specs: | |
c87f23fa | 575 | comp = self._create_comp(comp_spec) |
3d60267b | 576 | self._src_comps_and_specs.append(_ComponentAndSpec(comp, comp_spec)) |
85dcce24 PP |
577 | |
578 | # Now we connect the ports which exist at this point. We allow | |
579 | # self._graph_port_added() to automatically connect _new_ ports. | |
580 | self._connect_ports = True | |
581 | ||
582 | for comp_and_spec in self._src_comps_and_specs: | |
583 | # Keep a separate list because comp_and_spec.output_ports | |
584 | # could change during the connection of one of its ports. | |
585 | # Any new port is handled by self._graph_port_added(). | |
586 | out_ports = [port for port in comp_and_spec.comp.output_ports.values()] | |
587 | ||
588 | for out_port in out_ports: | |
907f2b70 | 589 | if out_port.is_connected: |
85dcce24 PP |
590 | continue |
591 | ||
907f2b70 | 592 | self._connect_src_comp_port(comp_and_spec.comp, out_port) |
85dcce24 | 593 | |
c1859f69 PP |
594 | # Add the proxy sink, passing our message list to share consumed |
595 | # messages with this trace collection message iterator. | |
596 | sink = self._graph.add_component( | |
597 | _TraceCollectionMessageIteratorProxySink, 'proxy-sink', obj=self._msg_list | |
598 | ) | |
599 | sink_in_port = sink.input_ports['in'] | |
600 | ||
601 | # connect last filter to proxy sink | |
602 | self._graph.connect_ports(last_flt_out_port, sink_in_port) |