python: standardize intra-bt2 imports
[babeltrace.git] / src / bindings / python / bt2 / bt2 / trace_collection_message_iterator.py
CommitLineData
0235b0db 1# SPDX-License-Identifier: MIT
85dcce24
PP
2#
3# Copyright (c) 2017 Philippe Proulx <pproulx@efficios.com>
85dcce24 4
e5914347
SM
5from bt2 import native_bt
6from bt2 import utils as bt2_utils
85dcce24 7import bt2
3d60267b 8import itertools
3fb99a22 9from bt2 import message_iterator as bt2_message_iterator
3fb99a22 10from bt2 import port as bt2_port
c1859f69 11from bt2 import component as bt2_component
f3c9a159
SM
12from bt2 import value as bt2_value
13from bt2 import plugin as bt2_plugin
85dcce24 14import datetime
85dcce24
PP
15from collections import namedtuple
16import numbers
17
18
# Pairs a component (`comp`) with the `ComponentSpec` (`spec`) it was
# created from.
_ComponentAndSpec = namedtuple("_ComponentAndSpec", ["comp", "spec"])
85dcce24
PP
21
22
class _BaseComponentSpec:
    # Common base of the component specs which
    # TraceCollectionMessageIterator accepts: holds the initialization
    # parameters, an arbitrary Python object, and a logging level.
    def __init__(self, params, obj, logging_level):
        # `None` means that no logging level is specified: there's
        # nothing to validate in that case.
        if logging_level is not None:
            bt2_utils._check_log_level(logging_level)

        # Store the parameters as converted by bt2.create_value().
        self._params = bt2.create_value(params)
        self._obj, self._logging_level = obj, logging_level

    @property
    def params(self):
        # Parameters, as returned by bt2.create_value().
        return self._params

    @property
    def obj(self):
        # Python object given at construction time, untouched.
        return self._obj

    @property
    def logging_level(self):
        # Logging level given at construction time (may be `None`).
        return self._logging_level
45
46
class ComponentSpec(_BaseComponentSpec):
    # Spec of a component to create from an explicit source or filter
    # component class.
    def __init__(
        self,
        component_class,
        params=None,
        obj=None,
        logging_level=bt2.LoggingLevel.NONE,
    ):
        # A plain string is a shorthand for a single-element `inputs`
        # parameter.
        if type(params) is str:
            params = {"inputs": [params]}

        super().__init__(params, obj, logging_level)

        # Accept a source/filter component class object...
        acceptable = isinstance(
            component_class,
            (bt2._SourceComponentClassConst, bt2._FilterComponentClassConst),
        )

        # ... or a user (Python) source/filter component class.
        if not acceptable:
            acceptable = isinstance(
                component_class, bt2_component._UserComponentType
            ) and issubclass(
                component_class, (bt2._UserSourceComponent, bt2._UserFilterComponent)
            )

        if not acceptable:
            raise TypeError(
                "'{}' is not a source or filter component class".format(
                    component_class.__class__.__name__
                )
            )

        self._component_class = component_class

    @property
    def component_class(self):
        # Component class given at construction time.
        return self._component_class

    @classmethod
    def from_named_plugin_and_component_class(
        cls,
        plugin_name,
        component_class_name,
        params=None,
        obj=None,
        logging_level=bt2.LoggingLevel.NONE,
    ):
        # Alternate constructor: finds the plugin named `plugin_name`,
        # then looks for `component_class_name` among its source
        # component classes first, and among its filter component
        # classes second.
        #
        # Raises `ValueError` if the plugin isn't found and `KeyError`
        # if it has no such source or filter component class.
        plugin = bt2.find_plugin(plugin_name)

        if plugin is None:
            raise ValueError("no such plugin: {}".format(plugin_name))

        src_classes = plugin.source_component_classes

        if component_class_name in src_classes:
            return cls(src_classes[component_class_name], params, obj, logging_level)

        flt_classes = plugin.filter_component_classes

        if component_class_name in flt_classes:
            return cls(flt_classes[component_class_name], params, obj, logging_level)

        raise KeyError(
            "source or filter component class `{}` not found in plugin `{}`".format(
                component_class_name, plugin_name
            )
        )
85dcce24 110
e874da19 111
class AutoSourceComponentSpec(_BaseComponentSpec):
    # Spec of an input for which the source component(s) to create are
    # found by automatic source discovery.

    # Sentinel default for `obj`: makes it possible to tell "no object
    # given" apart from an explicit `None`.
    _no_obj = object()

    def __init__(self, input, params=None, obj=_no_obj, logging_level=None):
        self._input = input
        super().__init__(params, obj, logging_level)

    @property
    def input(self):
        # Input given at construction time.
        return self._input
123
124
def _auto_discover_source_component_specs(auto_source_comp_specs, plugin_set):
    # Transform a list of `AutoSourceComponentSpec` into a list of
    # `ComponentSpec` using the automatic source discovery mechanism.
    #
    # `plugin_set` is the plugin set to search; when it's `None`, the
    # result of bt2.find_plugins() is used instead.
    #
    # Raises:
    #
    # * `bt2._MemoryError` if the native discovery function returns no
    #   result.
    # * `RuntimeError` if at least one of the inputs didn't contribute
    #   to any discovered component.
    # * Whatever bt2_utils._check_type() and
    #   bt2_utils._handle_func_status() raise.
    inputs = bt2.ArrayValue([spec.input for spec in auto_source_comp_specs])

    if plugin_set is None:
        plugin_set = bt2.find_plugins()
    else:
        bt2_utils._check_type(plugin_set, bt2_plugin._PluginSet)

    res_ptr = native_bt.bt2_auto_discover_source_components(
        inputs._ptr, plugin_set._ptr
    )

    if res_ptr is None:
        raise bt2._MemoryError("cannot auto discover source components")

    res = bt2_value._create_from_ptr(res_ptr)

    assert type(res) is bt2.MapValue
    assert "status" in res

    status = res["status"]
    bt2_utils._handle_func_status(status, "cannot auto-discover source components")

    comp_specs = []
    comp_specs_raw = res["results"]
    assert type(comp_specs_raw) is bt2.ArrayValue

    # Indices (within `inputs`) of the inputs which contributed to at
    # least one discovered component.
    used_input_indices = set()

    for comp_spec_raw in comp_specs_raw:
        # Each result is a 4-element array:
        #
        #     [plugin name, class name, inputs, original input indices]
        assert type(comp_spec_raw) is bt2.ArrayValue
        assert len(comp_spec_raw) == 4

        plugin_name = comp_spec_raw[0]
        assert type(plugin_name) is bt2.StringValue
        plugin_name = str(plugin_name)

        class_name = comp_spec_raw[1]
        assert type(class_name) is bt2.StringValue
        class_name = str(class_name)

        comp_inputs = comp_spec_raw[2]
        assert type(comp_inputs) is bt2.ArrayValue

        # The original assertion had no comparison (`assert type(...)`)
        # and therefore could never fail: check the actual type like
        # for the other elements.
        comp_orig_indices = comp_spec_raw[3]
        assert type(comp_orig_indices) is bt2.ArrayValue

        params = bt2.MapValue()
        logging_level = bt2.LoggingLevel.NONE
        obj = None

        # Compute `params` for this component by piling up params given to all
        # AutoSourceComponentSpec objects that contributed in the instantiation
        # of this component.
        #
        # The effective log level for a component is the last one specified
        # across the AutoSourceComponentSpec that contributed in its
        # instantiation. The same "last one wins" rule applies to `obj`.
        for idx in comp_orig_indices:
            orig_spec = auto_source_comp_specs[idx]

            if orig_spec.params is not None:
                params.update(orig_spec.params)

            if orig_spec.logging_level is not None:
                logging_level = orig_spec.logging_level

            if orig_spec.obj is not AutoSourceComponentSpec._no_obj:
                obj = orig_spec.obj

            used_input_indices.add(int(idx))

        # The discovered inputs always override any user-provided
        # `inputs` parameter.
        params["inputs"] = comp_inputs

        comp_specs.append(
            ComponentSpec.from_named_plugin_and_component_class(
                plugin_name,
                class_name,
                params=params,
                obj=obj,
                logging_level=logging_level,
            )
        )

    # Every input must have been used by at least one component.
    if len(used_input_indices) != len(inputs):
        unused_input_indices = set(range(len(inputs))) - used_input_indices
        unused_input_indices = sorted(unused_input_indices)
        unused_inputs = [str(inputs[x]) for x in unused_input_indices]

        msg = (
            "Some auto source component specs did not produce any component: "
            + ", ".join(unused_inputs)
        )
        raise RuntimeError(msg)

    return comp_specs
66964f3f 223
85dcce24
PP
224
def _get_ns(obj):
    # Converts `obj` to an integral number of nanoseconds.
    #
    # `obj` can be:
    #
    # `None`:
    #     Returns `None`.
    #
    # A real number:
    #     Considered to be a number of seconds.
    #
    # A `datetime.datetime` object:
    #     Its timestamp() value (seconds) is used.
    #
    # Raises `TypeError` for any other type.
    if obj is None:
        return None

    if isinstance(obj, datetime.datetime):
        seconds = obj.timestamp()
    elif isinstance(obj, numbers.Real):
        seconds = obj
    else:
        raise TypeError(
            '"{}" is not an integral number or a datetime.datetime object'.format(obj)
        )

    # s -> ns
    return int(seconds * 1e9)
242
243
class _TraceCollectionMessageIteratorProxySink(bt2_component._UserSinkComponent):
    # Sink component which hands each message it consumes over through
    # a shared one-slot list (`msg_list`): consuming stores the next
    # message in slot 0, which must be empty (`None`) at that point.
    def __init__(self, config, params, msg_list):
        assert type(msg_list) is list
        self._msg_list = msg_list
        self._add_input_port("in")

    def _user_graph_is_configured(self):
        # Create the message iterator on the single `in` input port.
        in_port = self._input_ports["in"]
        self._msg_iter = self._create_message_iterator(in_port)

    def _user_consume(self):
        # The previous message must have been taken out of the slot.
        assert self._msg_list[0] is None
        msg = next(self._msg_iter)
        self._msg_list[0] = msg
256
257
class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator):
    # Message iterator which builds and runs a graph made of:
    #
    #     sources -> [per-port trimmers] -> muxer -> [trimmer] ->
    #     [extra filters] -> proxy sink
    #
    # Each call to __next__() runs the graph once and returns the
    # message which the proxy sink consumed.
    def __init__(
        self,
        source_component_specs,
        filter_component_specs=None,
        stream_intersection_mode=False,
        begin=None,
        end=None,
        plugin_set=None,
    ):
        # Parameters:
        #
        # `source_component_specs`:
        #     A single `ComponentSpec`, `AutoSourceComponentSpec`, or
        #     string, or an iterable of those. A string is converted to
        #     an `AutoSourceComponentSpec`.
        #
        # `filter_component_specs`:
        #     `None`, a single `ComponentSpec`, or an iterable of
        #     `ComponentSpec`: extra filters, chained in order.
        #
        # `stream_intersection_mode`:
        #     Boolean: when true, a trimmer is inserted after each
        #     source output port.
        #
        # `begin`, `end`:
        #     `None`, a number of seconds, or a `datetime.datetime`
        #     object (see _get_ns()): when at least one is set, a
        #     trimmer is inserted after the muxer.
        #
        # `plugin_set`:
        #     Plugin set passed to the automatic source discovery
        #     (`None` to let it find the plugins).
        #
        # Raises `TypeError` if a source or filter spec has an
        # unexpected type.
        bt2_utils._check_bool(stream_intersection_mode)
        self._stream_intersection_mode = stream_intersection_mode
        self._begin_ns = _get_ns(begin)
        self._end_ns = _get_ns(end)

        # One-slot list shared with the proxy sink.
        self._msg_list = [None]

        # If a single item is provided, convert to a list.
        if type(source_component_specs) in (
            ComponentSpec,
            AutoSourceComponentSpec,
            str,
        ):
            source_component_specs = [source_component_specs]

        # Convert any string to an AutoSourceComponentSpec.
        def str_to_auto(item):
            if type(item) is str:
                item = AutoSourceComponentSpec(item)

            return item

        source_component_specs = [str_to_auto(s) for s in source_component_specs]

        if type(filter_component_specs) is ComponentSpec:
            filter_component_specs = [filter_component_specs]
        elif filter_component_specs is None:
            filter_component_specs = []
        else:
            # Materialize the filter specs, like the source specs
            # above: a one-shot iterable would otherwise be exhausted
            # by the validation below and appear empty when building
            # the graph. This also avoids keeping a reference to the
            # caller's own list.
            filter_component_specs = list(filter_component_specs)

        self._validate_source_component_specs(source_component_specs)
        self._validate_filter_component_specs(filter_component_specs)

        # Pass any `ComponentSpec` instance as-is.
        self._src_comp_specs = [
            spec for spec in source_component_specs if type(spec) is ComponentSpec
        ]

        # Convert any `AutoSourceComponentSpec` in concrete `ComponentSpec` instances.
        auto_src_comp_specs = [
            spec
            for spec in source_component_specs
            if type(spec) is AutoSourceComponentSpec
        ]
        self._src_comp_specs += _auto_discover_source_component_specs(
            auto_src_comp_specs, plugin_set
        )

        self._flt_comp_specs = filter_component_specs

        # Next numeric suffix to use to make a component name unique.
        self._next_suffix = 1

        # Whether or not _graph_port_added() connects new source ports.
        self._connect_ports = False

        # lists of _ComponentAndSpec
        self._src_comps_and_specs = []
        self._flt_comps_and_specs = []

        self._build_graph()

    def _compute_stream_intersections(self):
        # Pre-compute the trimmer range to use for each port in the graph, when
        # stream intersection mode is enabled.
        self._stream_inter_port_to_range = {}

        for src_comp_and_spec in self._src_comps_and_specs:
            # Query the port's component for the `babeltrace.trace-infos`
            # object which contains the range for each stream, from which we can
            # compute the intersection of the streams in each trace.
            query_exec = bt2.QueryExecutor(
                src_comp_and_spec.spec.component_class,
                "babeltrace.trace-infos",
                src_comp_and_spec.spec.params,
            )
            trace_infos = query_exec.query()

            for trace_info in trace_infos:
                # The intersection starts at the latest beginning and
                # ends at the earliest end.
                begin = max(
                    [
                        stream["range-ns"]["begin"]
                        for stream in trace_info["stream-infos"]
                    ]
                )
                end = min(
                    [stream["range-ns"]["end"] for stream in trace_info["stream-infos"]]
                )

                # Each port associated to this trace will have this computed
                # range.
                for stream in trace_info["stream-infos"]:
                    # A port name is unique within a component, but not
                    # necessarily across all components. Use a component
                    # and port name pair to make it unique across the graph.
                    port_name = str(stream["port-name"])
                    key = (src_comp_and_spec.comp.addr, port_name)
                    self._stream_inter_port_to_range[key] = (begin, end)

    def _validate_source_component_specs(self, comp_specs):
        # Raises `TypeError` if any item of `comp_specs` is not exactly
        # a `ComponentSpec` or an `AutoSourceComponentSpec`.
        for comp_spec in comp_specs:
            if (
                type(comp_spec) is not ComponentSpec
                and type(comp_spec) is not AutoSourceComponentSpec
            ):
                raise TypeError(
                    '"{}" object is not a ComponentSpec or AutoSourceComponentSpec'.format(
                        type(comp_spec)
                    )
                )

    def _validate_filter_component_specs(self, comp_specs):
        # Raises `TypeError` if any item of `comp_specs` is not exactly
        # a `ComponentSpec`.
        for comp_spec in comp_specs:
            if type(comp_spec) is not ComponentSpec:
                raise TypeError(
                    '"{}" object is not a ComponentSpec'.format(type(comp_spec))
                )

    def __next__(self):
        # Runs the graph once: the proxy sink stores the message it
        # consumes in the shared slot, which this method empties and
        # returns.
        assert self._msg_list[0] is None
        self._graph.run_once()
        msg = self._msg_list[0]
        assert msg is not None
        self._msg_list[0] = None
        return msg

    def _create_stream_intersection_trimmer(self, component, port):
        # Creates the trimmer for the output port `port` of the source
        # component `component`, using the range which
        # _compute_stream_intersections() computed for it.
        key = (component.addr, port.name)
        begin, end = self._stream_inter_port_to_range[key]
        name = "trimmer-{}-{}".format(component.name, port.name)
        return self._create_trimmer(begin, end, name)

    def _find_utils_filter_comp_cls(self, comp_cls_name):
        # Returns the filter component class named `comp_cls_name` of
        # the `utils` plugin.
        #
        # Raises `RuntimeError` if the plugin or the component class
        # cannot be found.
        plugin = bt2.find_plugin("utils")

        if plugin is None:
            raise RuntimeError(
                'cannot find "utils" plugin (needed for the {})'.format(comp_cls_name)
            )

        if comp_cls_name not in plugin.filter_component_classes:
            raise RuntimeError(
                'cannot find "{}" filter component class in "utils" plugin'.format(
                    comp_cls_name
                )
            )

        return plugin.filter_component_classes[comp_cls_name]

    def _create_muxer(self):
        # Adds a `muxer` filter component (from the `utils` plugin),
        # named `muxer`, to the graph and returns it.
        comp_cls = self._find_utils_filter_comp_cls("muxer")
        return self._graph.add_component(comp_cls, "muxer")

    def _create_trimmer(self, begin_ns, end_ns, name):
        # Adds a `trimmer` filter component (from the `utils` plugin),
        # named `name`, to the graph and returns it.
        #
        # `begin_ns` and `end_ns` are nanosecond counts; each one is
        # only passed to the trimmer (as a `SEC.NSEC` string) when it's
        # not `None`.
        comp_cls = self._find_utils_filter_comp_cls("trimmer")
        params = {}

        def ns_to_string(ns):
            s_part = ns // 1000000000
            ns_part = ns % 1000000000
            return "{}.{:09d}".format(s_part, ns_part)

        if begin_ns is not None:
            params["begin"] = ns_to_string(begin_ns)

        if end_ns is not None:
            params["end"] = ns_to_string(end_ns)

        return self._graph.add_component(comp_cls, name, params)

    def _get_unique_comp_name(self, comp_cls):
        # Returns a name, based on the name of `comp_cls`, which no
        # source or extra filter component created so far has.
        #
        # The class name is used as is when it's free; otherwise
        # `-N` suffixes are tried until the resulting name is free
        # (a single suffix, as before, whenever that's enough).
        base_name = comp_cls.name
        used_names = {
            comp_and_spec.comp.name
            for comp_and_spec in itertools.chain(
                self._src_comps_and_specs, self._flt_comps_and_specs
            )
        }
        name = base_name

        while name in used_names:
            name = "{}-{}".format(base_name, self._next_suffix)
            self._next_suffix += 1

        return name

    def _create_comp(self, comp_spec):
        # Adds a component to the graph from `comp_spec`, under a
        # unique name, and returns it.
        comp_cls = comp_spec.component_class
        name = self._get_unique_comp_name(comp_cls)
        comp = self._graph.add_component(
            comp_cls, name, comp_spec.params, comp_spec.obj, comp_spec.logging_level
        )
        return comp

    def _get_free_muxer_input_port(self):
        # Returns the first muxer input port which is not connected,
        # or `None` if there's none.
        for port in self._muxer_comp.input_ports.values():
            if not port.is_connected:
                return port

        return None

    def _connect_src_comp_port(self, component, port):
        # if this trace collection iterator is in stream intersection
        # mode, we need this connection:
        #
        #     port -> trimmer -> muxer
        #
        # otherwise, simply:
        #
        #     port -> muxer
        if self._stream_intersection_mode:
            trimmer_comp = self._create_stream_intersection_trimmer(component, port)
            self._graph.connect_ports(port, trimmer_comp.input_ports["in"])
            port_to_muxer = trimmer_comp.output_ports["out"]
        else:
            port_to_muxer = port

        self._graph.connect_ports(port_to_muxer, self._get_free_muxer_input_port())

    def _graph_port_added(self, component, port):
        # Graph "port added" listener: connects any new output port of
        # a source component once automatic connection is enabled.
        if not self._connect_ports:
            return

        if type(port) is bt2_port._InputPortConst:
            return

        if component not in [comp.comp for comp in self._src_comps_and_specs]:
            # do not care about non-source components (muxer, trimmer, etc.)
            return

        self._connect_src_comp_port(component, port)

    def _get_greatest_operative_mip_version(self):
        # Returns the greatest operative MIP version for all the
        # components this iterator is going to create.
        #
        # Raises `RuntimeError` if there's no such version.
        def append_comp_specs_descriptors(descriptors, comp_specs):
            for comp_spec in comp_specs:
                descriptors.append(
                    bt2.ComponentDescriptor(
                        comp_spec.component_class, comp_spec.params, comp_spec.obj
                    )
                )

        descriptors = []
        append_comp_specs_descriptors(descriptors, self._src_comp_specs)
        append_comp_specs_descriptors(descriptors, self._flt_comp_specs)

        if self._stream_intersection_mode:
            # we also need at least one `flt.utils.trimmer` component
            comp_spec = ComponentSpec.from_named_plugin_and_component_class(
                "utils", "trimmer"
            )
            append_comp_specs_descriptors(descriptors, [comp_spec])

        mip_version = bt2.get_greatest_operative_mip_version(descriptors)

        if mip_version is None:
            msg = "failed to find an operative message interchange protocol version (components are not interoperable)"
            raise RuntimeError(msg)

        return mip_version

    def _build_graph(self):
        # Creates the graph and all its components, and connects them.
        self._graph = bt2.Graph(self._get_greatest_operative_mip_version())
        self._graph.add_port_added_listener(self._graph_port_added)
        self._muxer_comp = self._create_muxer()

        # Optional global trimmer, right after the muxer.
        if self._begin_ns is not None or self._end_ns is not None:
            trimmer_comp = self._create_trimmer(self._begin_ns, self._end_ns, "trimmer")
            self._graph.connect_ports(
                self._muxer_comp.output_ports["out"], trimmer_comp.input_ports["in"]
            )
            last_flt_out_port = trimmer_comp.output_ports["out"]
        else:
            last_flt_out_port = self._muxer_comp.output_ports["out"]

        # create extra filter components (chained)
        for comp_spec in self._flt_comp_specs:
            comp = self._create_comp(comp_spec)
            self._flt_comps_and_specs.append(_ComponentAndSpec(comp, comp_spec))

        # connect the extra filter chain (first input port and first
        # output port of each filter)
        for comp_and_spec in self._flt_comps_and_specs:
            in_port = list(comp_and_spec.comp.input_ports.values())[0]
            out_port = list(comp_and_spec.comp.output_ports.values())[0]
            self._graph.connect_ports(last_flt_out_port, in_port)
            last_flt_out_port = out_port

        # Here we create the components, self._graph_port_added() is
        # called when they add ports, but the callback returns early
        # because self._connect_ports is False. This is because the
        # self._graph_port_added() could not find the associated source
        # component specification in self._src_comps_and_specs because
        # it does not exist yet (it needs the created component to
        # exist).
        for comp_spec in self._src_comp_specs:
            comp = self._create_comp(comp_spec)
            self._src_comps_and_specs.append(_ComponentAndSpec(comp, comp_spec))

        if self._stream_intersection_mode:
            self._compute_stream_intersections()

        # Now we connect the ports which exist at this point. We allow
        # self._graph_port_added() to automatically connect _new_ ports.
        self._connect_ports = True

        for comp_and_spec in self._src_comps_and_specs:
            # Keep a separate list because comp_and_spec.output_ports
            # could change during the connection of one of its ports.
            # Any new port is handled by self._graph_port_added().
            out_ports = list(comp_and_spec.comp.output_ports.values())

            for out_port in out_ports:
                if out_port.is_connected:
                    continue

                self._connect_src_comp_port(comp_and_spec.comp, out_port)

        # Add the proxy sink, passing our message list to share consumed
        # messages with this trace collection message iterator.
        sink = self._graph.add_component(
            _TraceCollectionMessageIteratorProxySink, "proxy-sink", obj=self._msg_list
        )
        sink_in_port = sink.input_ports["in"]

        # connect last filter to proxy sink
        self._graph.connect_ports(last_flt_out_port, sink_in_port)
This page took 0.097817 seconds and 4 git commands to generate.