Commit | Line | Data |
---|---|---|
d34e69cf PP |
1 | # The MIT License (MIT) |
2 | # | |
3 | # Copyright (c) 2017 Philippe Proulx <pproulx@efficios.com> | |
4 | # | |
5 | # Permission is hereby granted, free of charge, to any person obtaining a copy | |
6 | # of this software and associated documentation files (the "Software"), to deal | |
7 | # in the Software without restriction, including without limitation the rights | |
8 | # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
9 | # copies of the Software, and to permit persons to whom the Software is | |
10 | # furnished to do so, subject to the following conditions: | |
11 | # | |
12 | # The above copyright notice and this permission notice shall be included in | |
13 | # all copies or substantial portions of the Software. | |
14 | # | |
15 | # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
16 | # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
17 | # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
18 | # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
19 | # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
20 | # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | |
21 | # THE SOFTWARE. | |
22 | ||
94e72386 | 23 | from bt2 import utils, native_bt |
d34e69cf | 24 | import bt2 |
88fdcc33 | 25 | import itertools |
c946c9de PP |
26 | from bt2 import message_iterator as bt2_message_iterator |
27 | from bt2 import logging as bt2_logging | |
28 | from bt2 import port as bt2_port | |
c535b26d | 29 | from bt2 import component as bt2_component |
94e72386 SM |
30 | from bt2 import value as bt2_value |
31 | from bt2 import plugin as bt2_plugin | |
d34e69cf | 32 | import datetime |
d34e69cf PP |
33 | from collections import namedtuple |
34 | import numbers | |
35 | ||
36 | ||
88fdcc33 PP |
37 | # a pair of component and ComponentSpec |
38 | _ComponentAndSpec = namedtuple('_ComponentAndSpec', ['comp', 'spec']) | |
d34e69cf PP |
39 | |
40 | ||
94e72386 SM |
class _BaseComponentSpec:
    """State shared by all component specifications.

    Holds the initialization parameters (converted with
    `bt2.create_value()`), an arbitrary user object, and a logging level.
    """

    def __init__(self, params, obj, logging_level):
        # A logging level of `None` is accepted as is; anything else must
        # pass the library's log level check.
        if logging_level is not None:
            utils._check_log_level(logging_level)

        converted_params = bt2.create_value(params)

        self._params = converted_params
        self._obj = obj
        self._logging_level = logging_level

    @property
    def params(self):
        """Parameters, as returned by `bt2.create_value()`."""
        return self._params

    @property
    def obj(self):
        """User object given at construction time."""
        return self._obj

    @property
    def logging_level(self):
        """Logging level given at construction time (can be `None`)."""
        return self._logging_level
61 | ||
62 | ||
class ComponentSpec(_BaseComponentSpec):
    """Specification of a component identified by plugin and class name.

    `plugin_name` and `class_name` must be strings. When `params` is a
    plain `str`, it is a shorthand for `{'inputs': [params]}`.
    """

    def __init__(
        self,
        plugin_name,
        class_name,
        params=None,
        obj=None,
        logging_level=bt2_logging.LoggingLevel.NONE,
    ):
        # Expand the single-string shorthand into an `inputs` map.
        effective_params = {'inputs': [params]} if type(params) is str else params

        super().__init__(effective_params, obj, logging_level)

        for name in (plugin_name, class_name):
            utils._check_str(name)

        self._plugin_name = plugin_name
        self._class_name = class_name

    @property
    def plugin_name(self):
        """Name of the plugin which provides the component class."""
        return self._plugin_name

    @property
    def class_name(self):
        """Name of the component class within the plugin."""
        return self._class_name
d34e69cf | 90 | |
cc81b5ab | 91 | |
94e72386 SM |
class AutoSourceComponentSpec(_BaseComponentSpec):
    """Specification of an input for which source components are discovered.

    `input` is stored as is. `obj` defaults to the private `_no_obj`
    sentinel so that "no object given" can be told apart from an explicit
    `None`.
    """

    # Sentinel meaning "the user did not provide any `obj`".
    _no_obj = object()

    def __init__(self, input, params=None, obj=_no_obj, logging_level=None):
        super().__init__(params=params, obj=obj, logging_level=logging_level)
        self._input = input

    @property
    def input(self):
        """Input given at construction time."""
        return self._input
102 | ||
103 | ||
def _auto_discover_source_component_specs(auto_source_comp_specs, plugin_set):
    """Transform a list of `AutoSourceComponentSpec` into a list of
    `ComponentSpec` using the automatic source discovery mechanism.

    `auto_source_comp_specs` is the list of `AutoSourceComponentSpec`
    objects to resolve. `plugin_set` is the plugin set to search; when it
    is `None`, the result of `bt2.find_plugins()` is used, otherwise it
    must be a `bt2_plugin._PluginSet`.

    Returns a list of `ComponentSpec` objects, one per discovered
    component. Raises `bt2._MemoryError` if the native discovery function
    returns no result object; any non-OK status reported in the result is
    handled by `utils._handle_func_status()`.
    """
    inputs = bt2.ArrayValue([spec.input for spec in auto_source_comp_specs])

    if plugin_set is None:
        plugin_set = bt2.find_plugins()
    else:
        utils._check_type(plugin_set, bt2_plugin._PluginSet)

    res_ptr = native_bt.bt2_auto_discover_source_components(
        inputs._ptr, plugin_set._ptr
    )

    if res_ptr is None:
        raise bt2._MemoryError('cannot auto discover source components')

    res = bt2_value._create_from_ptr(res_ptr)

    assert type(res) == bt2.MapValue
    assert 'status' in res

    status = res['status']
    utils._handle_func_status(status, 'cannot auto-discover source components')

    comp_specs = []
    comp_specs_raw = res['results']
    assert type(comp_specs_raw) == bt2.ArrayValue

    for comp_spec_raw in comp_specs_raw:
        # Each raw entry is a 4-element array:
        # plugin name, class name, inputs, original spec indices.
        assert type(comp_spec_raw) == bt2.ArrayValue
        assert len(comp_spec_raw) == 4

        plugin_name = comp_spec_raw[0]
        assert type(plugin_name) == bt2.StringValue
        plugin_name = str(plugin_name)

        class_name = comp_spec_raw[1]
        assert type(class_name) == bt2.StringValue
        class_name = str(class_name)

        comp_inputs = comp_spec_raw[2]
        assert type(comp_inputs) == bt2.ArrayValue

        # Indices into `auto_source_comp_specs` of the specs which
        # contributed to this component. The original assertion lacked the
        # comparison and was therefore always true.
        comp_orig_indices = comp_spec_raw[3]
        assert type(comp_orig_indices) == bt2.ArrayValue

        params = bt2.MapValue()
        logging_level = bt2.LoggingLevel.NONE
        obj = None

        # Compute `params` for this component by piling up params given to all
        # AutoSourceComponentSpec objects that contributed in the instantiation
        # of this component.
        #
        # The effective log level for a component is the last one specified
        # across the AutoSourceComponentSpec that contributed in its
        # instantiation. The same "last one wins" rule applies to `obj`.
        for idx in comp_orig_indices:
            orig_spec = auto_source_comp_specs[idx]

            if orig_spec.params is not None:
                params.update(orig_spec.params)

            if orig_spec.logging_level is not None:
                logging_level = orig_spec.logging_level

            if orig_spec.obj is not AutoSourceComponentSpec._no_obj:
                obj = orig_spec.obj

        # The discovered inputs always override any user-provided `inputs`.
        params['inputs'] = comp_inputs

        comp_specs.append(
            ComponentSpec(
                plugin_name,
                class_name,
                params=params,
                obj=obj,
                logging_level=logging_level,
            )
        )

    return comp_specs
b20382e2 | 187 | |
d34e69cf PP |
188 | |
189 | # datetime.datetime or integral to nanoseconds | |
190 | def _get_ns(obj): | |
191 | if obj is None: | |
192 | return | |
193 | ||
194 | if isinstance(obj, numbers.Real): | |
195 | # consider that it's already in seconds | |
196 | s = obj | |
197 | elif isinstance(obj, datetime.datetime): | |
198 | # s -> ns | |
199 | s = obj.timestamp() | |
200 | else: | |
61d96b89 FD |
201 | raise TypeError( |
202 | '"{}" is not an integral number or a datetime.datetime object'.format(obj) | |
203 | ) | |
d34e69cf PP |
204 | |
205 | return int(s * 1e9) | |
206 | ||
207 | ||
88fdcc33 PP |
208 | class _CompClsType: |
209 | SOURCE = 0 | |
210 | FILTER = 1 | |
211 | ||
212 | ||
c535b26d PP |
class _TraceCollectionMessageIteratorProxySink(bt2_component._UserSinkComponent):
    """Sink which stores each consumed message in a caller-provided list.

    `msg_list` must be a `list`; its first slot receives the next message
    on each consumption and must be `None` at that point.
    """

    def __init__(self, params, msg_list):
        assert type(msg_list) is list
        self._msg_list = msg_list
        self._add_input_port('in')

    def _user_graph_is_configured(self):
        # Create the iterator on our single input port.
        in_port = self._input_ports['in']
        self._msg_iter = self._create_input_port_message_iterator(in_port)

    def _user_consume(self):
        slot = self._msg_list

        # The previous message must have been taken out of the slot.
        assert slot[0] is None
        slot[0] = next(self._msg_iter)
227 | ||
228 | ||
c946c9de | 229 | class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator): |
61d96b89 FD |
230 | def __init__( |
231 | self, | |
232 | source_component_specs, | |
233 | filter_component_specs=None, | |
234 | stream_intersection_mode=False, | |
235 | begin=None, | |
236 | end=None, | |
94e72386 | 237 | plugin_set=None, |
61d96b89 | 238 | ): |
d34e69cf PP |
239 | utils._check_bool(stream_intersection_mode) |
240 | self._stream_intersection_mode = stream_intersection_mode | |
241 | self._begin_ns = _get_ns(begin) | |
242 | self._end_ns = _get_ns(end) | |
c535b26d | 243 | self._msg_list = [None] |
88fdcc33 | 244 | |
94e72386 SM |
245 | # If a single item is provided, convert to a list. |
246 | if type(source_component_specs) in ( | |
247 | ComponentSpec, | |
248 | AutoSourceComponentSpec, | |
249 | str, | |
250 | ): | |
88fdcc33 PP |
251 | source_component_specs = [source_component_specs] |
252 | ||
94e72386 SM |
253 | # Convert any string to an AutoSourceComponentSpec. |
254 | def str_to_auto(item): | |
255 | if type(item) is str: | |
256 | item = AutoSourceComponentSpec(item) | |
257 | ||
258 | return item | |
259 | ||
260 | source_component_specs = [str_to_auto(s) for s in source_component_specs] | |
261 | ||
88fdcc33 PP |
262 | if type(filter_component_specs) is ComponentSpec: |
263 | filter_component_specs = [filter_component_specs] | |
264 | elif filter_component_specs is None: | |
265 | filter_component_specs = [] | |
266 | ||
94e72386 SM |
267 | self._validate_source_component_specs(source_component_specs) |
268 | self._validate_filter_component_specs(filter_component_specs) | |
269 | ||
270 | # Pass any `ComponentSpec` instance as-is. | |
271 | self._src_comp_specs = [ | |
272 | spec for spec in source_component_specs if type(spec) is ComponentSpec | |
273 | ] | |
274 | ||
275 | # Convert any `AutoSourceComponentSpec` in concrete `ComponentSpec` instances. | |
276 | auto_src_comp_specs = [ | |
277 | spec | |
278 | for spec in source_component_specs | |
279 | if type(spec) is AutoSourceComponentSpec | |
280 | ] | |
281 | self._src_comp_specs += _auto_discover_source_component_specs( | |
282 | auto_src_comp_specs, plugin_set | |
283 | ) | |
284 | ||
88fdcc33 | 285 | self._flt_comp_specs = filter_component_specs |
d34e69cf PP |
286 | self._next_suffix = 1 |
287 | self._connect_ports = False | |
288 | ||
88fdcc33 | 289 | # lists of _ComponentAndSpec |
d34e69cf | 290 | self._src_comps_and_specs = [] |
88fdcc33 | 291 | self._flt_comps_and_specs = [] |
d34e69cf | 292 | |
d34e69cf PP |
293 | self._build_graph() |
294 | ||
94e72386 SM |
295 | def _validate_source_component_specs(self, comp_specs): |
296 | for comp_spec in comp_specs: | |
297 | if ( | |
298 | type(comp_spec) is not ComponentSpec | |
299 | and type(comp_spec) is not AutoSourceComponentSpec | |
300 | ): | |
301 | raise TypeError( | |
302 | '"{}" object is not a ComponentSpec or AutoSourceComponentSpec'.format( | |
303 | type(comp_spec) | |
304 | ) | |
305 | ) | |
306 | ||
307 | def _validate_filter_component_specs(self, comp_specs): | |
88fdcc33 PP |
308 | for comp_spec in comp_specs: |
309 | if type(comp_spec) is not ComponentSpec: | |
61d96b89 FD |
310 | raise TypeError( |
311 | '"{}" object is not a ComponentSpec'.format(type(comp_spec)) | |
312 | ) | |
d34e69cf PP |
313 | |
314 | def __next__(self): | |
c535b26d PP |
315 | assert self._msg_list[0] is None |
316 | self._graph.run_once() | |
317 | msg = self._msg_list[0] | |
318 | assert msg is not None | |
319 | self._msg_list[0] = None | |
320 | return msg | |
d34e69cf | 321 | |
da35796c | 322 | def _create_stream_intersection_trimmer(self, component, port): |
d34e69cf | 323 | # find the original parameters specified by the user to create |
a1040187 | 324 | # this port's component to get the `inputs` parameter |
d34e69cf | 325 | for src_comp_and_spec in self._src_comps_and_specs: |
da35796c | 326 | if component == src_comp_and_spec.comp: |
d34e69cf PP |
327 | break |
328 | ||
329 | try: | |
a1040187 | 330 | inputs = src_comp_and_spec.spec.params['inputs'] |
da35796c | 331 | except Exception as e: |
3b2be708 | 332 | raise ValueError( |
a1040187 | 333 | 'all source components must be created with an "inputs" parameter in stream intersection mode' |
61d96b89 | 334 | ) from e |
d34e69cf | 335 | |
a1040187 | 336 | params = {'inputs': inputs} |
d34e69cf | 337 | |
9e534aae PP |
338 | # query the port's component for the `babeltrace.trace-info` |
339 | # object which contains the stream intersection range for each | |
340 | # exposed trace | |
bf403eb2 | 341 | query_exec = bt2.QueryExecutor( |
9e534aae | 342 | src_comp_and_spec.comp.cls, 'babeltrace.trace-info', params |
61d96b89 | 343 | ) |
bf403eb2 | 344 | trace_info_res = query_exec.query() |
d34e69cf PP |
345 | begin = None |
346 | end = None | |
347 | ||
ddf49b27 | 348 | # find the trace info for this port's trace |
f4811b4f PP |
349 | try: |
350 | for trace_info in trace_info_res: | |
ddf49b27 SM |
351 | for stream in trace_info['streams']: |
352 | if stream['port-name'] == port.name: | |
353 | range_ns = trace_info['intersection-range-ns'] | |
354 | begin = range_ns['begin'] | |
355 | end = range_ns['end'] | |
356 | break | |
da35796c | 357 | except Exception: |
f4811b4f | 358 | pass |
d34e69cf PP |
359 | |
360 | if begin is None or end is None: | |
3b2be708 | 361 | raise RuntimeError( |
61d96b89 FD |
362 | 'cannot find stream intersection range for port "{}"'.format(port.name) |
363 | ) | |
d34e69cf | 364 | |
da35796c | 365 | name = 'trimmer-{}-{}'.format(src_comp_and_spec.comp.name, port.name) |
d34e69cf PP |
366 | return self._create_trimmer(begin, end, name) |
367 | ||
368 | def _create_muxer(self): | |
369 | plugin = bt2.find_plugin('utils') | |
370 | ||
371 | if plugin is None: | |
3b2be708 | 372 | raise RuntimeError('cannot find "utils" plugin (needed for the muxer)') |
d34e69cf PP |
373 | |
374 | if 'muxer' not in plugin.filter_component_classes: | |
3b2be708 | 375 | raise RuntimeError( |
61d96b89 FD |
376 | 'cannot find "muxer" filter component class in "utils" plugin' |
377 | ) | |
d34e69cf PP |
378 | |
379 | comp_cls = plugin.filter_component_classes['muxer'] | |
380 | return self._graph.add_component(comp_cls, 'muxer') | |
381 | ||
da35796c | 382 | def _create_trimmer(self, begin_ns, end_ns, name): |
d34e69cf PP |
383 | plugin = bt2.find_plugin('utils') |
384 | ||
385 | if plugin is None: | |
3b2be708 | 386 | raise RuntimeError('cannot find "utils" plugin (needed for the trimmer)') |
d34e69cf PP |
387 | |
388 | if 'trimmer' not in plugin.filter_component_classes: | |
3b2be708 | 389 | raise RuntimeError( |
61d96b89 FD |
390 | 'cannot find "trimmer" filter component class in "utils" plugin' |
391 | ) | |
d34e69cf PP |
392 | |
393 | params = {} | |
394 | ||
da35796c SM |
395 | def ns_to_string(ns): |
396 | s_part = ns // 1000000000 | |
397 | ns_part = ns % 1000000000 | |
398 | return '{}.{:09d}'.format(s_part, ns_part) | |
d34e69cf | 399 | |
da35796c SM |
400 | if begin_ns is not None: |
401 | params['begin'] = ns_to_string(begin_ns) | |
402 | ||
403 | if end_ns is not None: | |
404 | params['end'] = ns_to_string(end_ns) | |
d34e69cf PP |
405 | |
406 | comp_cls = plugin.filter_component_classes['trimmer'] | |
407 | return self._graph.add_component(comp_cls, name, params) | |
408 | ||
88fdcc33 | 409 | def _get_unique_comp_name(self, comp_spec): |
61d96b89 FD |
410 | name = '{}-{}'.format(comp_spec.plugin_name, comp_spec.class_name) |
411 | comps_and_specs = itertools.chain( | |
412 | self._src_comps_and_specs, self._flt_comps_and_specs | |
413 | ) | |
d34e69cf | 414 | |
88fdcc33 | 415 | if name in [comp_and_spec.comp.name for comp_and_spec in comps_and_specs]: |
d34e69cf PP |
416 | name += '-{}'.format(self._next_suffix) |
417 | self._next_suffix += 1 | |
418 | ||
419 | return name | |
420 | ||
88fdcc33 | 421 | def _create_comp(self, comp_spec, comp_cls_type): |
d34e69cf PP |
422 | plugin = bt2.find_plugin(comp_spec.plugin_name) |
423 | ||
424 | if plugin is None: | |
3b2be708 | 425 | raise ValueError('no such plugin: {}'.format(comp_spec.plugin_name)) |
d34e69cf | 426 | |
88fdcc33 PP |
427 | if comp_cls_type == _CompClsType.SOURCE: |
428 | comp_classes = plugin.source_component_classes | |
429 | else: | |
430 | comp_classes = plugin.filter_component_classes | |
431 | ||
c88be1c8 | 432 | if comp_spec.class_name not in comp_classes: |
88fdcc33 | 433 | cc_type = 'source' if comp_cls_type == _CompClsType.SOURCE else 'filter' |
3b2be708 | 434 | raise ValueError( |
61d96b89 FD |
435 | 'no such {} component class in "{}" plugin: {}'.format( |
436 | cc_type, comp_spec.plugin_name, comp_spec.class_name | |
437 | ) | |
438 | ) | |
d34e69cf | 439 | |
c88be1c8 | 440 | comp_cls = comp_classes[comp_spec.class_name] |
88fdcc33 | 441 | name = self._get_unique_comp_name(comp_spec) |
61d96b89 | 442 | comp = self._graph.add_component( |
b20382e2 | 443 | comp_cls, name, comp_spec.params, comp_spec.obj, comp_spec.logging_level |
61d96b89 | 444 | ) |
d34e69cf PP |
445 | return comp |
446 | ||
447 | def _get_free_muxer_input_port(self): | |
448 | for port in self._muxer_comp.input_ports.values(): | |
449 | if not port.is_connected: | |
450 | return port | |
451 | ||
da35796c | 452 | def _connect_src_comp_port(self, component, port): |
d34e69cf PP |
453 | # if this trace collection iterator is in stream intersection |
454 | # mode, we need this connection: | |
455 | # | |
456 | # port -> trimmer -> muxer | |
457 | # | |
458 | # otherwise, simply: | |
459 | # | |
460 | # port -> muxer | |
461 | if self._stream_intersection_mode: | |
da35796c | 462 | trimmer_comp = self._create_stream_intersection_trimmer(component, port) |
d34e69cf PP |
463 | self._graph.connect_ports(port, trimmer_comp.input_ports['in']) |
464 | port_to_muxer = trimmer_comp.output_ports['out'] | |
465 | else: | |
466 | port_to_muxer = port | |
467 | ||
468 | self._graph.connect_ports(port_to_muxer, self._get_free_muxer_input_port()) | |
469 | ||
da35796c | 470 | def _graph_port_added(self, component, port): |
d34e69cf PP |
471 | if not self._connect_ports: |
472 | return | |
473 | ||
c946c9de | 474 | if type(port) is bt2_port._InputPort: |
d34e69cf PP |
475 | return |
476 | ||
da35796c | 477 | if component not in [comp.comp for comp in self._src_comps_and_specs]: |
d34e69cf PP |
478 | # do not care about non-source components (muxer, trimmer, etc.) |
479 | return | |
480 | ||
da35796c | 481 | self._connect_src_comp_port(component, port) |
d34e69cf PP |
482 | |
483 | def _build_graph(self): | |
484 | self._graph = bt2.Graph() | |
da35796c | 485 | self._graph.add_port_added_listener(self._graph_port_added) |
d34e69cf PP |
486 | self._muxer_comp = self._create_muxer() |
487 | ||
488 | if self._begin_ns is not None or self._end_ns is not None: | |
61d96b89 FD |
489 | trimmer_comp = self._create_trimmer(self._begin_ns, self._end_ns, 'trimmer') |
490 | self._graph.connect_ports( | |
491 | self._muxer_comp.output_ports['out'], trimmer_comp.input_ports['in'] | |
492 | ) | |
c535b26d | 493 | last_flt_out_port = trimmer_comp.output_ports['out'] |
d34e69cf | 494 | else: |
c535b26d | 495 | last_flt_out_port = self._muxer_comp.output_ports['out'] |
d34e69cf | 496 | |
88fdcc33 PP |
497 | # create extra filter components (chained) |
498 | for comp_spec in self._flt_comp_specs: | |
499 | comp = self._create_comp(comp_spec, _CompClsType.FILTER) | |
500 | self._flt_comps_and_specs.append(_ComponentAndSpec(comp, comp_spec)) | |
501 | ||
502 | # connect the extra filter chain | |
503 | for comp_and_spec in self._flt_comps_and_specs: | |
504 | in_port = list(comp_and_spec.comp.input_ports.values())[0] | |
505 | out_port = list(comp_and_spec.comp.output_ports.values())[0] | |
c535b26d PP |
506 | self._graph.connect_ports(last_flt_out_port, in_port) |
507 | last_flt_out_port = out_port | |
88fdcc33 | 508 | |
d34e69cf PP |
509 | # Here we create the components, self._graph_port_added() is |
510 | # called when they add ports, but the callback returns early | |
511 | # because self._connect_ports is False. This is because the | |
512 | # self._graph_port_added() could not find the associated source | |
513 | # component specification in self._src_comps_and_specs because | |
514 | # it does not exist yet (it needs the created component to | |
515 | # exist). | |
516 | for comp_spec in self._src_comp_specs: | |
88fdcc33 PP |
517 | comp = self._create_comp(comp_spec, _CompClsType.SOURCE) |
518 | self._src_comps_and_specs.append(_ComponentAndSpec(comp, comp_spec)) | |
d34e69cf PP |
519 | |
520 | # Now we connect the ports which exist at this point. We allow | |
521 | # self._graph_port_added() to automatically connect _new_ ports. | |
522 | self._connect_ports = True | |
523 | ||
524 | for comp_and_spec in self._src_comps_and_specs: | |
525 | # Keep a separate list because comp_and_spec.output_ports | |
526 | # could change during the connection of one of its ports. | |
527 | # Any new port is handled by self._graph_port_added(). | |
528 | out_ports = [port for port in comp_and_spec.comp.output_ports.values()] | |
529 | ||
530 | for out_port in out_ports: | |
da35796c | 531 | if out_port.is_connected: |
d34e69cf PP |
532 | continue |
533 | ||
da35796c | 534 | self._connect_src_comp_port(comp_and_spec.comp, out_port) |
d34e69cf | 535 | |
c535b26d PP |
536 | # Add the proxy sink, passing our message list to share consumed |
537 | # messages with this trace collection message iterator. | |
538 | sink = self._graph.add_component( | |
539 | _TraceCollectionMessageIteratorProxySink, 'proxy-sink', obj=self._msg_list | |
540 | ) | |
541 | sink_in_port = sink.input_ports['in'] | |
542 | ||
543 | # connect last filter to proxy sink | |
544 | self._graph.connect_ports(last_flt_out_port, sink_in_port) |