Commit | Line | Data |
---|---|---|
0235b0db | 1 | # SPDX-License-Identifier: MIT |
85dcce24 PP |
2 | # |
3 | # Copyright (c) 2017 Philippe Proulx <pproulx@efficios.com> | |
85dcce24 | 4 | |
f3c9a159 | 5 | from bt2 import utils, native_bt |
85dcce24 | 6 | import bt2 |
3d60267b | 7 | import itertools |
3fb99a22 | 8 | from bt2 import message_iterator as bt2_message_iterator |
3fb99a22 | 9 | from bt2 import port as bt2_port |
c1859f69 | 10 | from bt2 import component as bt2_component |
f3c9a159 SM |
11 | from bt2 import value as bt2_value |
12 | from bt2 import plugin as bt2_plugin | |
85dcce24 | 13 | import datetime |
85dcce24 PP |
14 | from collections import namedtuple |
15 | import numbers | |
16 | ||
17 | ||
3d60267b PP |
# Associates a created component (`comp`) with the ComponentSpec
# (`spec`) from which it was created.
_ComponentAndSpec = namedtuple('_ComponentAndSpec', ('comp', 'spec'))
85dcce24 PP |
20 | |
21 | ||
class _BaseComponentSpec:
    # Common base of the component specs which can be passed to
    # TraceCollectionMessageIterator.
    #
    # Holds what every spec carries: initialization parameters
    # (converted with bt2.create_value()), an arbitrary Python object,
    # and an optional logging level.
    def __init__(self, params, obj, logging_level):
        # `None` means that no logging level is specified: only an
        # explicit level is validated.
        if logging_level is not None:
            utils._check_log_level(logging_level)

        self._logging_level = logging_level
        self._obj = obj
        self._params = bt2.create_value(params)

    @property
    def logging_level(self):
        # Logging level as given at construction time (can be `None`).
        return self._logging_level

    @property
    def obj(self):
        # Python object as given at construction time.
        return self._obj

    @property
    def params(self):
        # Parameters as returned by bt2.create_value().
        return self._params
44 | ||
45 | ||
class ComponentSpec(_BaseComponentSpec):
    # Spec of a component to create from an explicit source or filter
    # component class.
    def __init__(
        self,
        component_class,
        params=None,
        obj=None,
        logging_level=bt2.LoggingLevel.NONE,
    ):
        # Shorthand: a plain string `params` becomes the only element
        # of an `inputs` array parameter.
        if type(params) is str:
            params = {'inputs': [params]}

        super().__init__(params, obj, logging_level)

        # Accept a source/filter component class object...
        is_valid = isinstance(
            component_class,
            (bt2._SourceComponentClassConst, bt2._FilterComponentClassConst),
        )

        # ...or a user source/filter component class (Python type).
        # issubclass() is only reached when `component_class` is known
        # to be a user component type, as it requires a class.
        if not is_valid and isinstance(
            component_class, bt2_component._UserComponentType
        ):
            is_valid = issubclass(
                component_class, (bt2._UserSourceComponent, bt2._UserFilterComponent)
            )

        if not is_valid:
            raise TypeError(
                "'{}' is not a source or filter component class".format(
                    component_class.__class__.__name__
                )
            )

        self._component_class = component_class

    @property
    def component_class(self):
        # Component class as given at construction time.
        return self._component_class

    @classmethod
    def from_named_plugin_and_component_class(
        cls,
        plugin_name,
        component_class_name,
        params=None,
        obj=None,
        logging_level=bt2.LoggingLevel.NONE,
    ):
        # Alternate constructor: finds the plugin named `plugin_name`,
        # then the source (tried first) or filter component class named
        # `component_class_name` within it.
        #
        # Raises `ValueError` if the plugin is not found and `KeyError`
        # if the plugin has no such source or filter component class.
        plugin = bt2.find_plugin(plugin_name)

        if plugin is None:
            raise ValueError('no such plugin: {}'.format(plugin_name))

        src_comp_classes = plugin.source_component_classes

        if component_class_name in src_comp_classes:
            comp_class = src_comp_classes[component_class_name]
        else:
            flt_comp_classes = plugin.filter_component_classes

            if component_class_name not in flt_comp_classes:
                raise KeyError(
                    'source or filter component class `{}` not found in plugin `{}`'.format(
                        component_class_name, plugin_name
                    )
                )

            comp_class = flt_comp_classes[component_class_name]

        return cls(comp_class, params, obj, logging_level)
85dcce24 | 109 | |
e874da19 | 110 | |
class AutoSourceComponentSpec(_BaseComponentSpec):
    # Spec of one input for which the source component(s) to create
    # are found with the automatic source discovery mechanism.

    # Sentinel default for `obj`, distinct from any user value
    # (including `None`), so that "no object given" can be detected.
    _no_obj = object()

    def __init__(self, input, params=None, obj=_no_obj, logging_level=None):
        super().__init__(params, obj, logging_level)

        # Kept as given.
        self._input = input

    @property
    def input(self):
        # Input as given at construction time.
        return self._input
122 | ||
123 | ||
def _auto_discover_source_component_specs(auto_source_comp_specs, plugin_set):
    # Transform a list of `AutoSourceComponentSpec` into a list of
    # `ComponentSpec` using the automatic source discovery mechanism.
    #
    # `plugin_set` is the set of plugins in which to look for source
    # component classes; when it's `None`, the result of
    # bt2.find_plugins() is used.
    #
    # Raises `RuntimeError` if at least one input of
    # `auto_source_comp_specs` did not contribute to any discovered
    # component.
    inputs = bt2.ArrayValue([spec.input for spec in auto_source_comp_specs])

    if plugin_set is None:
        plugin_set = bt2.find_plugins()
    else:
        utils._check_type(plugin_set, bt2_plugin._PluginSet)

    res_ptr = native_bt.bt2_auto_discover_source_components(
        inputs._ptr, plugin_set._ptr
    )

    if res_ptr is None:
        raise bt2._MemoryError('cannot auto discover source components')

    res = bt2_value._create_from_ptr(res_ptr)

    # The native result is a map with a `status` entry and a `results`
    # entry.
    assert type(res) is bt2.MapValue
    assert 'status' in res

    status = res['status']
    utils._handle_func_status(status, 'cannot auto-discover source components')

    comp_specs = []
    comp_specs_raw = res['results']
    assert type(comp_specs_raw) is bt2.ArrayValue

    # Indices (within `inputs`) of the inputs which contributed to at
    # least one discovered component.
    used_input_indices = set()

    for comp_spec_raw in comp_specs_raw:
        # Each raw result is a 4-element array:
        #
        #     [plugin name, component class name, inputs, original indices]
        assert type(comp_spec_raw) is bt2.ArrayValue
        assert len(comp_spec_raw) == 4

        plugin_name = comp_spec_raw[0]
        assert type(plugin_name) is bt2.StringValue
        plugin_name = str(plugin_name)

        class_name = comp_spec_raw[1]
        assert type(class_name) is bt2.StringValue
        class_name = str(class_name)

        comp_inputs = comp_spec_raw[2]
        assert type(comp_inputs) is bt2.ArrayValue

        # This assertion used to have no comparison, which made it
        # always true: actually check the type.
        comp_orig_indices = comp_spec_raw[3]
        assert type(comp_orig_indices) is bt2.ArrayValue

        params = bt2.MapValue()
        logging_level = bt2.LoggingLevel.NONE
        obj = None

        # Compute `params` for this component by piling up params given to all
        # AutoSourceComponentSpec objects that contributed in the instantiation
        # of this component.
        #
        # The effective log level for a component is the last one specified
        # across the AutoSourceComponentSpec that contributed in its
        # instantiation. The same rule applies to the Python object.
        for idx in comp_orig_indices:
            orig_spec = auto_source_comp_specs[idx]

            if orig_spec.params is not None:
                params.update(orig_spec.params)

            if orig_spec.logging_level is not None:
                logging_level = orig_spec.logging_level

            if orig_spec.obj is not AutoSourceComponentSpec._no_obj:
                obj = orig_spec.obj

            used_input_indices.add(int(idx))

        # The discovered inputs always win over any user `inputs`
        # parameter because they are set last.
        params['inputs'] = comp_inputs

        comp_specs.append(
            ComponentSpec.from_named_plugin_and_component_class(
                plugin_name,
                class_name,
                params=params,
                obj=obj,
                logging_level=logging_level,
            )
        )

    if len(used_input_indices) != len(inputs):
        # Report, in their original order, the inputs for which no
        # component was discovered.
        unused_input_indices = set(range(len(inputs))) - used_input_indices
        unused_input_indices = sorted(unused_input_indices)
        unused_inputs = [str(inputs[x]) for x in unused_input_indices]

        msg = (
            'Some auto source component specs did not produce any component: '
            + ', '.join(unused_inputs)
        )
        raise RuntimeError(msg)

    return comp_specs
66964f3f | 222 | |
85dcce24 PP |
223 | |
# datetime.datetime or real number (seconds) to nanoseconds
def _get_ns(obj):
    # Converts `obj` to an integral number of nanoseconds.
    #
    # `obj` can be:
    #
    # * `None`: `None` is returned.
    # * A real number: considered to be a number of seconds.
    # * A `datetime.datetime` object: its timestamp() is the number of
    #   seconds.
    #
    # Raises `TypeError` for any other type.
    if obj is None:
        return None

    if isinstance(obj, numbers.Integral):
        # Stay in integer arithmetic: multiplying by the float `1e9`
        # silently rounds large values because a float only has 53
        # bits of precision.
        return int(obj) * 1000000000

    if isinstance(obj, numbers.Real):
        # consider that it's already in seconds
        s = obj
    elif isinstance(obj, datetime.datetime):
        s = obj.timestamp()
    else:
        raise TypeError(
            '"{}" is not an integral number or a datetime.datetime object'.format(obj)
        )

    # s -> ns (truncated toward zero)
    return int(s * 1e9)
241 | ||
242 | ||
class _TraceCollectionMessageIteratorProxySink(bt2_component._UserSinkComponent):
    # Sink component which hands each consumed message over to its
    # creator through a shared one-slot list (`msg_list`): slot 0
    # receives the message and must be `None` before each consumption.
    def __init__(self, config, params, msg_list):
        assert type(msg_list) is list
        self._msg_list = msg_list
        self._add_input_port('in')

    def _user_graph_is_configured(self):
        # Create the message iterator on the only input port (`in`).
        in_port = self._input_ports['in']
        self._msg_iter = self._create_message_iterator(in_port)

    def _user_consume(self):
        # Consume exactly one message: the previous one must have been
        # taken out of the slot already.
        slot = self._msg_list
        assert slot[0] is None
        slot[0] = next(self._msg_iter)
255 | ||
256 | ||
class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator):
    # Message iterator which builds and runs its own graph:
    #
    #     sources -> [per-port trimmers] -> muxer -> [trimmer] ->
    #         [extra filters] -> proxy sink
    #
    # The per-port trimmers only exist in stream intersection mode, and
    # the trimmer after the muxer only exists when `begin` or `end` is
    # specified. Each call to __next__() runs the graph once and returns
    # the message which the proxy sink consumed.
    def __init__(
        self,
        source_component_specs,
        filter_component_specs=None,
        stream_intersection_mode=False,
        begin=None,
        end=None,
        plugin_set=None,
    ):
        # Parameters:
        #
        # `source_component_specs`:
        #     A `ComponentSpec`, an `AutoSourceComponentSpec`, a string
        #     (converted to an `AutoSourceComponentSpec`), or an
        #     iterable of those.
        #
        # `filter_component_specs`:
        #     A `ComponentSpec`, an iterable of `ComponentSpec`, or
        #     `None` (no extra filter).
        #
        # `stream_intersection_mode`:
        #     Boolean (checked with utils._check_bool()).
        #
        # `begin`, `end`:
        #     Anything which _get_ns() accepts; `None` means no bound.
        #
        # `plugin_set`:
        #     Passed as is to _auto_discover_source_component_specs().
        #
        # Raises `TypeError` if any spec has an unexpected type. The
        # graph is built before this method returns.
        utils._check_bool(stream_intersection_mode)
        self._stream_intersection_mode = stream_intersection_mode
        self._begin_ns = _get_ns(begin)
        self._end_ns = _get_ns(end)

        # One-slot list shared with the proxy sink (passed as its `obj`
        # in _build_graph()) to receive each consumed message.
        self._msg_list = [None]

        # If a single item is provided, convert to a list.
        if type(source_component_specs) in (
            ComponentSpec,
            AutoSourceComponentSpec,
            str,
        ):
            source_component_specs = [source_component_specs]

        # Convert any string to an AutoSourceComponentSpec.
        def str_to_auto(item):
            if type(item) is str:
                item = AutoSourceComponentSpec(item)

            return item

        source_component_specs = [str_to_auto(s) for s in source_component_specs]

        if type(filter_component_specs) is ComponentSpec:
            filter_component_specs = [filter_component_specs]
        elif filter_component_specs is None:
            filter_component_specs = []

        self._validate_source_component_specs(source_component_specs)
        self._validate_filter_component_specs(filter_component_specs)

        # Pass any `ComponentSpec` instance as-is.
        self._src_comp_specs = [
            spec for spec in source_component_specs if type(spec) is ComponentSpec
        ]

        # Convert any `AutoSourceComponentSpec` into concrete `ComponentSpec`
        # instances; they are appended after the explicit ones.
        auto_src_comp_specs = [
            spec
            for spec in source_component_specs
            if type(spec) is AutoSourceComponentSpec
        ]
        self._src_comp_specs += _auto_discover_source_component_specs(
            auto_src_comp_specs, plugin_set
        )

        self._flt_comp_specs = filter_component_specs

        # Next suffix which _get_unique_comp_name() appends to a
        # component name which is already used.
        self._next_suffix = 1

        # While false, _graph_port_added() ignores new ports; set to
        # true by _build_graph() once the source components exist.
        self._connect_ports = False

        # lists of _ComponentAndSpec
        self._src_comps_and_specs = []
        self._flt_comps_and_specs = []

        self._build_graph()

    def _compute_stream_intersections(self):
        # Pre-compute the trimmer range to use for each port in the graph, when
        # stream intersection mode is enabled.
        #
        # Fills `self._stream_inter_port_to_range`, which maps a
        # (component address, port name) pair to a (begin, end) pair.
        self._stream_inter_port_to_range = {}

        for src_comp_and_spec in self._src_comps_and_specs:
            # Query the port's component for the `babeltrace.trace-infos`
            # object which contains the range for each stream, from which we can
            # compute the intersection of the streams in each trace.
            query_exec = bt2.QueryExecutor(
                src_comp_and_spec.spec.component_class,
                'babeltrace.trace-infos',
                src_comp_and_spec.spec.params,
            )
            trace_infos = query_exec.query()

            for trace_info in trace_infos:
                # The intersection begins where the last stream begins
                # and ends where the first stream ends.
                #
                # NOTE(review): max() and min() raise `ValueError` if
                # `stream-infos` is empty, and nothing here checks that
                # `begin` <= `end` -- confirm that the queried component
                # classes guarantee both.
                begin = max(
                    [
                        stream['range-ns']['begin']
                        for stream in trace_info['stream-infos']
                    ]
                )
                end = min(
                    [stream['range-ns']['end'] for stream in trace_info['stream-infos']]
                )

                # Each port associated to this trace will have this computed
                # range.
                for stream in trace_info['stream-infos']:
                    # A port name is unique within a component, but not
                    # necessarily across all components. Use a component
                    # and port name pair to make it unique across the graph.
                    port_name = str(stream['port-name'])
                    key = (src_comp_and_spec.comp.addr, port_name)
                    self._stream_inter_port_to_range[key] = (begin, end)

    def _validate_source_component_specs(self, comp_specs):
        # Raises `TypeError` unless the exact type of each item of
        # `comp_specs` is `ComponentSpec` or `AutoSourceComponentSpec`
        # (subclasses are rejected).
        for comp_spec in comp_specs:
            if (
                type(comp_spec) is not ComponentSpec
                and type(comp_spec) is not AutoSourceComponentSpec
            ):
                raise TypeError(
                    '"{}" object is not a ComponentSpec or AutoSourceComponentSpec'.format(
                        type(comp_spec)
                    )
                )

    def _validate_filter_component_specs(self, comp_specs):
        # Raises `TypeError` unless the exact type of each item of
        # `comp_specs` is `ComponentSpec` (subclasses are rejected).
        for comp_spec in comp_specs:
            if type(comp_spec) is not ComponentSpec:
                raise TypeError(
                    '"{}" object is not a ComponentSpec'.format(type(comp_spec))
                )

    def __next__(self):
        # Runs the graph once, which is expected to make the proxy sink
        # put exactly one message in the shared slot, then empties the
        # slot and returns this message.
        #
        # Whatever self._graph.run_once() raises is not handled here.
        # NOTE(review): presumably this is how the end of the iteration
        # reaches the caller -- confirm against bt2.Graph.run_once().
        assert self._msg_list[0] is None
        self._graph.run_once()
        msg = self._msg_list[0]
        assert msg is not None
        self._msg_list[0] = None
        return msg

    def _create_stream_intersection_trimmer(self, component, port):
        # Creates the trimmer component for the output port `port` of
        # the source component `component`, using the range which
        # _compute_stream_intersections() computed for this port.
        #
        # Raises `KeyError` if there's no range for this port.
        key = (component.addr, port.name)
        begin, end = self._stream_inter_port_to_range[key]
        name = 'trimmer-{}-{}'.format(component.name, port.name)
        return self._create_trimmer(begin, end, name)

    def _create_muxer(self):
        # Adds a `muxer` filter component (from the `utils` plugin),
        # named `muxer`, to the graph and returns it.
        #
        # Raises `RuntimeError` if the plugin or the component class is
        # not found.
        plugin = bt2.find_plugin('utils')

        if plugin is None:
            raise RuntimeError('cannot find "utils" plugin (needed for the muxer)')

        if 'muxer' not in plugin.filter_component_classes:
            raise RuntimeError(
                'cannot find "muxer" filter component class in "utils" plugin'
            )

        comp_cls = plugin.filter_component_classes['muxer']
        return self._graph.add_component(comp_cls, 'muxer')

    def _create_trimmer(self, begin_ns, end_ns, name):
        # Adds a `trimmer` filter component (from the `utils` plugin),
        # named `name`, to the graph and returns it.
        #
        # `begin_ns` and `end_ns` are nanosecond bounds; a `None` bound
        # is not passed to the component.
        #
        # Raises `RuntimeError` if the plugin or the component class is
        # not found.
        plugin = bt2.find_plugin('utils')

        if plugin is None:
            raise RuntimeError('cannot find "utils" plugin (needed for the trimmer)')

        if 'trimmer' not in plugin.filter_component_classes:
            raise RuntimeError(
                'cannot find "trimmer" filter component class in "utils" plugin'
            )

        params = {}

        def ns_to_string(ns):
            # Formats `ns` as `SECONDS.NANOSECONDS`, the nanosecond part
            # being zero-padded to nine digits.
            #
            # NOTE(review): with a negative `ns`, floor division and
            # modulo give, for example, `-1.999999999` for -1 ns --
            # confirm how the trimmer interprets such a value.
            s_part = ns // 1000000000
            ns_part = ns % 1000000000
            return '{}.{:09d}'.format(s_part, ns_part)

        if begin_ns is not None:
            params['begin'] = ns_to_string(begin_ns)

        if end_ns is not None:
            params['end'] = ns_to_string(end_ns)

        comp_cls = plugin.filter_component_classes['trimmer']
        return self._graph.add_component(comp_cls, name, params)

    def _get_unique_comp_name(self, comp_cls):
        # Returns the name to give to a new component of class
        # `comp_cls`: the name of the class, with a `-N` suffix if an
        # already created source or extra filter component has it.
        #
        # NOTE(review): the names of the internal components (`muxer`,
        # `trimmer`, `trimmer-*`, `proxy-sink`) are not considered, and
        # the suffixed name is not checked again -- confirm that no
        # supported use can lead to a duplicate name.
        name = comp_cls.name
        comps_and_specs = itertools.chain(
            self._src_comps_and_specs, self._flt_comps_and_specs
        )

        if name in [comp_and_spec.comp.name for comp_and_spec in comps_and_specs]:
            name += '-{}'.format(self._next_suffix)
            self._next_suffix += 1

        return name

    def _create_comp(self, comp_spec):
        # Adds to the graph, under a name from _get_unique_comp_name(),
        # a component created from `comp_spec` (class, parameters,
        # Python object, and logging level), and returns it.
        comp_cls = comp_spec.component_class
        name = self._get_unique_comp_name(comp_cls)
        comp = self._graph.add_component(
            comp_cls, name, comp_spec.params, comp_spec.obj, comp_spec.logging_level
        )
        return comp

    def _get_free_muxer_input_port(self):
        # Returns the first muxer input port which is not connected, or
        # `None` (implicitly) if they are all connected.
        for port in self._muxer_comp.input_ports.values():
            if not port.is_connected:
                return port

    def _connect_src_comp_port(self, component, port):
        # Connects the output port `port` of the source component
        # `component` to a free input port of the muxer.
        #
        # if this trace collection iterator is in stream intersection
        # mode, we need this connection:
        #
        #     port -> trimmer -> muxer
        #
        # otherwise, simply:
        #
        #     port -> muxer
        if self._stream_intersection_mode:
            trimmer_comp = self._create_stream_intersection_trimmer(component, port)
            self._graph.connect_ports(port, trimmer_comp.input_ports['in'])
            port_to_muxer = trimmer_comp.output_ports['out']
        else:
            port_to_muxer = port

        self._graph.connect_ports(port_to_muxer, self._get_free_muxer_input_port())

    def _graph_port_added(self, component, port):
        # Graph "port added" listener: connects any output port which a
        # source component adds once automatic connection is enabled
        # (see _build_graph()).
        if not self._connect_ports:
            return

        if type(port) is bt2_port._InputPortConst:
            return

        if component not in [comp.comp for comp in self._src_comps_and_specs]:
            # do not care about non-source components (muxer, trimmer, etc.)
            return

        self._connect_src_comp_port(component, port)

    def _get_greatest_operative_mip_version(self):
        # Returns what bt2.get_greatest_operative_mip_version() returns
        # for the descriptors of the source and extra filter component
        # specs, plus one `trimmer` descriptor in stream intersection
        # mode.
        #
        # The muxer and the `begin`/`end` trimmer are not part of the
        # descriptors.
        #
        # Raises `RuntimeError` if there's no such version.
        def append_comp_specs_descriptors(descriptors, comp_specs):
            # Appends to `descriptors` one component descriptor per
            # item of `comp_specs`.
            for comp_spec in comp_specs:
                descriptors.append(
                    bt2.ComponentDescriptor(
                        comp_spec.component_class, comp_spec.params, comp_spec.obj
                    )
                )

        descriptors = []
        append_comp_specs_descriptors(descriptors, self._src_comp_specs)
        append_comp_specs_descriptors(descriptors, self._flt_comp_specs)

        if self._stream_intersection_mode:
            # we also need at least one `flt.utils.trimmer` component
            comp_spec = ComponentSpec.from_named_plugin_and_component_class(
                'utils', 'trimmer'
            )
            append_comp_specs_descriptors(descriptors, [comp_spec])

        mip_version = bt2.get_greatest_operative_mip_version(descriptors)

        if mip_version is None:
            msg = 'failed to find an operative message interchange protocol version (components are not interoperable)'
            raise RuntimeError(msg)

        return mip_version

    def _build_graph(self):
        # Builds the whole graph, from the muxer down to the proxy
        # sink first, then the source components and their connections
        # to the muxer. The order of the steps below matters because
        # adding components triggers self._graph_port_added().
        self._graph = bt2.Graph(self._get_greatest_operative_mip_version())
        self._graph.add_port_added_listener(self._graph_port_added)
        self._muxer_comp = self._create_muxer()

        # A single trimmer follows the muxer when there's a `begin`
        # and/or `end` bound. `last_flt_out_port` always is the output
        # port to which the next downstream component must connect.
        if self._begin_ns is not None or self._end_ns is not None:
            trimmer_comp = self._create_trimmer(self._begin_ns, self._end_ns, 'trimmer')
            self._graph.connect_ports(
                self._muxer_comp.output_ports['out'], trimmer_comp.input_ports['in']
            )
            last_flt_out_port = trimmer_comp.output_ports['out']
        else:
            last_flt_out_port = self._muxer_comp.output_ports['out']

        # create extra filter components (chained)
        for comp_spec in self._flt_comp_specs:
            comp = self._create_comp(comp_spec)
            self._flt_comps_and_specs.append(_ComponentAndSpec(comp, comp_spec))

        # connect the extra filter chain (only the first input port and
        # the first output port of each extra filter are used)
        for comp_and_spec in self._flt_comps_and_specs:
            in_port = list(comp_and_spec.comp.input_ports.values())[0]
            out_port = list(comp_and_spec.comp.output_ports.values())[0]
            self._graph.connect_ports(last_flt_out_port, in_port)
            last_flt_out_port = out_port

        # Here we create the components, self._graph_port_added() is
        # called when they add ports, but the callback returns early
        # because self._connect_ports is False. This is because the
        # self._graph_port_added() could not find the associated source
        # component specification in self._src_comps_and_specs because
        # it does not exist yet (it needs the created component to
        # exist).
        for comp_spec in self._src_comp_specs:
            comp = self._create_comp(comp_spec)
            self._src_comps_and_specs.append(_ComponentAndSpec(comp, comp_spec))

        # The per-port ranges must exist before any source port is
        # connected: _connect_src_comp_port() looks them up.
        if self._stream_intersection_mode:
            self._compute_stream_intersections()

        # Now we connect the ports which exist at this point. We allow
        # self._graph_port_added() to automatically connect _new_ ports.
        self._connect_ports = True

        for comp_and_spec in self._src_comps_and_specs:
            # Keep a separate list because comp_and_spec.output_ports
            # could change during the connection of one of its ports.
            # Any new port is handled by self._graph_port_added().
            out_ports = [port for port in comp_and_spec.comp.output_ports.values()]

            for out_port in out_ports:
                if out_port.is_connected:
                    continue

                self._connect_src_comp_port(comp_and_spec.comp, out_port)

        # Add the proxy sink, passing our message list to share consumed
        # messages with this trace collection message iterator.
        sink = self._graph.add_component(
            _TraceCollectionMessageIteratorProxySink, 'proxy-sink', obj=self._msg_list
        )
        sink_in_port = sink.input_ports['in']

        # connect last filter to proxy sink
        self._graph.connect_ports(last_flt_out_port, sink_in_port)