cpp-common/bt2c/fmt.hpp: use `wise_enum::string_type` in `EnableIfIsWiseEnum` definition
[babeltrace.git] / src / bindings / python / bt2 / bt2 / trace_collection_message_iterator.py
1 # SPDX-License-Identifier: MIT
2 #
3 # Copyright (c) 2017 Philippe Proulx <pproulx@efficios.com>
4
5 import numbers
6 import datetime
7 import itertools
8 from collections import namedtuple
9
10 from bt2 import mip as bt2_mip
11 from bt2 import port as bt2_port
12 from bt2 import error as bt2_error
13 from bt2 import graph as bt2_graph
14 from bt2 import utils as bt2_utils
15 from bt2 import value as bt2_value
16 from bt2 import plugin as bt2_plugin
17 from bt2 import logging as bt2_logging
18 from bt2 import component as bt2_component
19 from bt2 import native_bt
20 from bt2 import query_executor as bt2_query_executor
21 from bt2 import message_iterator as bt2_message_iterator
22 from bt2 import component_descriptor as bt2_component_descriptor
23
# Pairs a component added to the graph (`comp`) with the `ComponentSpec`
# (`spec`) which was used to create it.
_ComponentAndSpec = namedtuple("_ComponentAndSpec", "comp spec")
26
27
class _BaseComponentSpec:
    """State common to every component spec accepted by
    TraceCollectionMessageIterator: initialization parameters, an
    arbitrary user object, and a logging level.
    """

    def __init__(self, params, obj, logging_level):
        # `None` means "no explicit logging level"; any other value
        # must be a valid level.
        if logging_level is not None:
            bt2_utils._check_log_level(logging_level)

        self._logging_level = logging_level
        self._obj = obj

        # Parameters are always stored as a bt2 value object.
        self._params = bt2_value.create_value(params)

    @property
    def params(self):
        """Parameters, as converted by `bt2_value.create_value()`."""
        return self._params

    @property
    def obj(self):
        """User object given at construction time, unchanged."""
        return self._obj

    @property
    def logging_level(self):
        """Logging level given at construction time (may be `None`)."""
        return self._logging_level
50
51
class ComponentSpec(_BaseComponentSpec):
    """Component spec which names an explicit source or filter
    component class.
    """

    def __init__(
        self,
        component_class,
        params=None,
        obj=None,
        logging_level=bt2_logging.LoggingLevel.NONE,
    ):
        # Shorthand: a plain string becomes a single-entry `inputs`
        # array parameter.
        if type(params) is str:
            params = {"inputs": [params]}

        super().__init__(params, obj, logging_level)

        const_cc_types = (
            bt2_component._SourceComponentClassConst,
            bt2_component._FilterComponentClassConst,
        )
        user_comp_bases = (
            bt2_component._UserSourceComponent,
            bt2_component._UserFilterComponent,
        )

        # Accept either a source/filter component class object, or a
        # user component type deriving from a user source/filter
        # component.
        if isinstance(component_class, const_cc_types):
            accepted = True
        elif isinstance(component_class, bt2_component._UserComponentType):
            accepted = issubclass(component_class, user_comp_bases)
        else:
            accepted = False

        if not accepted:
            raise TypeError(
                "'{}' is not a source or filter component class".format(
                    component_class.__class__.__name__
                )
            )

        self._component_class = component_class

    @property
    def component_class(self):
        """Component class given at construction time."""
        return self._component_class

    @classmethod
    def from_named_plugin_and_component_class(
        cls,
        plugin_name,
        component_class_name,
        params=None,
        obj=None,
        logging_level=bt2_logging.LoggingLevel.NONE,
    ):
        """Build a spec from a plugin name and a component class name.

        Source component classes of the plugin are searched first,
        then its filter component classes.

        Raises `ValueError` if the plugin is not found and `KeyError`
        if it has no source or filter component class with that name.
        """
        plugin = bt2_plugin.find_plugin(plugin_name)

        if plugin is None:
            raise ValueError("no such plugin: {}".format(plugin_name))

        src_classes = plugin.source_component_classes

        if component_class_name in src_classes:
            return cls(src_classes[component_class_name], params, obj, logging_level)

        flt_classes = plugin.filter_component_classes

        if component_class_name in flt_classes:
            return cls(flt_classes[component_class_name], params, obj, logging_level)

        raise KeyError(
            "source or filter component class `{}` not found in plugin `{}`".format(
                component_class_name, plugin_name
            )
        )
119
120
class AutoSourceComponentSpec(_BaseComponentSpec):
    """Component spec resolved through automatic source discovery:
    only an input is given, not a component class.
    """

    # Sentinel default for `obj`, so that "no object given" can be told
    # apart from an explicit `None`.
    _no_obj = object()

    def __init__(self, input, params=None, obj=_no_obj, logging_level=None):
        super().__init__(params=params, obj=obj, logging_level=logging_level)
        self._input = input

    @property
    def input(self):
        """Input given at construction time."""
        return self._input
132
133
def _auto_discover_source_component_specs(auto_source_comp_specs, plugin_set):
    """Transform a list of `AutoSourceComponentSpec` into a list of
    `ComponentSpec` using the automatic source discovery mechanism.

    auto_source_comp_specs: list of `AutoSourceComponentSpec` objects.
    plugin_set: `bt2_plugin._PluginSet` to search, or `None` to use the
        result of `bt2_plugin.find_plugins()`.

    Returns a list of `ComponentSpec`, one per discovered component.

    Raises `bt2_error._MemoryError` if the native discovery function
    returns nothing, whatever `bt2_utils._handle_func_status()` raises
    for a failing status, and `RuntimeError` if at least one input did
    not contribute to any discovered component.
    """
    inputs = bt2_value.ArrayValue([spec.input for spec in auto_source_comp_specs])

    if plugin_set is None:
        plugin_set = bt2_plugin.find_plugins()
    else:
        bt2_utils._check_type(plugin_set, bt2_plugin._PluginSet)

    res_ptr = native_bt.bt2_auto_discover_source_components(
        inputs._ptr, plugin_set._ptr
    )

    if res_ptr is None:
        raise bt2_error._MemoryError("cannot auto discover source components")

    res = bt2_value._create_from_ptr(res_ptr)

    assert type(res) is bt2_value.MapValue
    assert "status" in res

    status = res["status"]
    bt2_utils._handle_func_status(status, "cannot auto-discover source components")

    comp_specs = []
    comp_specs_raw = res["results"]
    assert type(comp_specs_raw) is bt2_value.ArrayValue

    used_input_indices = set()

    for comp_spec_raw in comp_specs_raw:
        # Each result is a 4-element array: plugin name, component
        # class name, inputs for the component, and indices (within
        # `auto_source_comp_specs`) of the specs which contributed.
        assert type(comp_spec_raw) is bt2_value.ArrayValue
        assert len(comp_spec_raw) == 4

        plugin_name = comp_spec_raw[0]
        assert type(plugin_name) is bt2_value.StringValue
        plugin_name = str(plugin_name)

        class_name = comp_spec_raw[1]
        assert type(class_name) is bt2_value.StringValue
        class_name = str(class_name)

        comp_inputs = comp_spec_raw[2]
        assert type(comp_inputs) is bt2_value.ArrayValue

        comp_orig_indices = comp_spec_raw[3]

        # Compare against the expected type: a bare `assert type(...)`
        # is always true and checks nothing.
        assert type(comp_orig_indices) is bt2_value.ArrayValue

        params = bt2_value.MapValue()
        logging_level = bt2_logging.LoggingLevel.NONE
        obj = None

        # Compute `params` for this component by piling up params given to all
        # AutoSourceComponentSpec objects that contributed in the instantiation
        # of this component.
        #
        # The effective log level for a component is the last one specified
        # across the AutoSourceComponentSpec that contributed in its
        # instantiation. The same "last one wins" rule applies to the
        # user object.
        for idx in comp_orig_indices:
            orig_spec = auto_source_comp_specs[idx]

            if orig_spec.params is not None:
                params.update(orig_spec.params)

            if orig_spec.logging_level is not None:
                logging_level = orig_spec.logging_level

            if orig_spec.obj is not AutoSourceComponentSpec._no_obj:
                obj = orig_spec.obj

            used_input_indices.add(int(idx))

        # The discovered inputs always override any user-provided
        # `inputs` parameter.
        params["inputs"] = comp_inputs

        comp_specs.append(
            ComponentSpec.from_named_plugin_and_component_class(
                plugin_name,
                class_name,
                params=params,
                obj=obj,
                logging_level=logging_level,
            )
        )

    # Every input must have contributed to at least one component.
    if len(used_input_indices) != len(inputs):
        unused_input_indices = set(range(len(inputs))) - used_input_indices
        unused_input_indices = sorted(unused_input_indices)
        unused_inputs = [str(inputs[x]) for x in unused_input_indices]

        msg = (
            "Some auto source component specs did not produce any component: "
            + ", ".join(unused_inputs)
        )
        raise RuntimeError(msg)

    return comp_specs
232
233
# datetime.datetime or number of seconds to nanoseconds
def _get_ns(obj):
    """Convert `obj` to an integral number of nanoseconds.

    obj: `None`, a real number of seconds, or a `datetime.datetime`
        object (converted with its `timestamp()` method).

    Returns `None` when `obj` is `None`, otherwise an `int`.

    Raises `TypeError` for any other type.
    """
    if obj is None:
        return None

    if isinstance(obj, numbers.Integral):
        # Exact integer arithmetic: going through a float product
        # loses precision for large values.
        return int(obj) * 1000000000

    if isinstance(obj, numbers.Real):
        # consider that it's already in seconds
        s = obj
    elif isinstance(obj, datetime.datetime):
        # seconds since the epoch, as a float
        s = obj.timestamp()
    else:
        raise TypeError(
            '"{}" is not an integral number or a datetime.datetime object'.format(obj)
        )

    # s -> ns
    return int(s * 1e9)
251
252
class _TraceCollectionMessageIteratorProxySink(bt2_component._UserSinkComponent):
    # Sink which hands each consumed message over through the first
    # slot of a shared one-element list.

    def __init__(self, config, params, msg_list):
        # The list is shared with whoever created this sink.
        assert type(msg_list) is list
        self._msg_list = msg_list
        self._add_input_port("in")

    def _user_graph_is_configured(self):
        # Create the message iterator on the single input port.
        in_port = self._input_ports["in"]
        self._msg_iter = self._create_message_iterator(in_port)

    def _user_consume(self):
        # The slot must have been emptied before consuming again.
        slot = self._msg_list
        assert slot[0] is None
        slot[0] = next(self._msg_iter)
265
266
class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator):
    """Message iterator over the messages of a collection of sources.

    Internally builds and runs a graph shaped like this:

        source ports -> [per-port trimmer] -> muxer -> [trimmer]
            -> [extra filters, chained] -> proxy sink

    The per-port trimmers only exist in stream intersection mode, and
    the trimmer after the muxer only exists when `begin` or `end` is
    given.
    """

    def __init__(
        self,
        source_component_specs,
        filter_component_specs=None,
        stream_intersection_mode=False,
        begin=None,
        end=None,
        plugin_set=None,
    ):
        """Validate the specs, then build the graph.

        source_component_specs: a `ComponentSpec`, an
            `AutoSourceComponentSpec`, a string (converted to an
            `AutoSourceComponentSpec`), or an iterable of those.
        filter_component_specs: `None`, a `ComponentSpec`, or an
            iterable of `ComponentSpec`.
        stream_intersection_mode: boolean; when true, each source port
            is trimmed to the intersection of the streams of its trace.
        begin, end: `None`, a number of seconds, or a
            `datetime.datetime` object (see `_get_ns()`).
        plugin_set: plugin set used for automatic source discovery
            (`None` to use the default one).

        Raises `TypeError` if a spec has an unexpected type.
        """
        bt2_utils._check_bool(stream_intersection_mode)
        self._stream_intersection_mode = stream_intersection_mode
        self._begin_ns = _get_ns(begin)
        self._end_ns = _get_ns(end)

        # One-element list shared with the proxy sink: the sink stores
        # each consumed message in the first slot.
        self._msg_list = [None]

        # If a single item is provided, convert to a list.
        if type(source_component_specs) in (
            ComponentSpec,
            AutoSourceComponentSpec,
            str,
        ):
            source_component_specs = [source_component_specs]

        # Convert any string to an AutoSourceComponentSpec.
        def str_to_auto(item):
            if type(item) is str:
                item = AutoSourceComponentSpec(item)

            return item

        source_component_specs = [str_to_auto(s) for s in source_component_specs]

        if type(filter_component_specs) is ComponentSpec:
            filter_component_specs = [filter_component_specs]
        elif filter_component_specs is None:
            filter_component_specs = []

        self._validate_source_component_specs(source_component_specs)
        self._validate_filter_component_specs(filter_component_specs)

        # Pass any `ComponentSpec` instance as-is.
        self._src_comp_specs = [
            spec for spec in source_component_specs if type(spec) is ComponentSpec
        ]

        # Convert any `AutoSourceComponentSpec` in concrete `ComponentSpec` instances.
        auto_src_comp_specs = [
            spec
            for spec in source_component_specs
            if type(spec) is AutoSourceComponentSpec
        ]
        self._src_comp_specs += _auto_discover_source_component_specs(
            auto_src_comp_specs, plugin_set
        )

        self._flt_comp_specs = filter_component_specs
        self._next_suffix = 1

        # The port added listener does nothing until this is true.
        self._connect_ports = False

        # lists of _ComponentAndSpec
        self._src_comps_and_specs = []
        self._flt_comps_and_specs = []

        self._build_graph()

    def _compute_stream_intersections(self):
        """Fill `self._stream_inter_port_to_range`, a mapping of
        (component address, port name) to a (begin, end) range.
        """
        # Pre-compute the trimmer range to use for each port in the graph, when
        # stream intersection mode is enabled.
        self._stream_inter_port_to_range = {}

        for src_comp_and_spec in self._src_comps_and_specs:
            # Query the port's component for the `babeltrace.trace-infos`
            # object which contains the range for each stream, from which we can
            # compute the intersection of the streams in each trace.
            query_exec = bt2_query_executor.QueryExecutor(
                src_comp_and_spec.spec.component_class,
                "babeltrace.trace-infos",
                src_comp_and_spec.spec.params,
            )
            trace_infos = query_exec.query()

            for trace_info in trace_infos:
                # Intersection: latest beginning to earliest end.
                begin = max(
                    [
                        stream["range-ns"]["begin"]
                        for stream in trace_info["stream-infos"]
                    ]
                )
                end = min(
                    [stream["range-ns"]["end"] for stream in trace_info["stream-infos"]]
                )

                # Each port associated to this trace will have this computed
                # range.
                for stream in trace_info["stream-infos"]:
                    # A port name is unique within a component, but not
                    # necessarily across all components. Use a component
                    # and port name pair to make it unique across the graph.
                    port_name = str(stream["port-name"])
                    key = (src_comp_and_spec.comp.addr, port_name)
                    self._stream_inter_port_to_range[key] = (begin, end)

    def _validate_source_component_specs(self, comp_specs):
        """Raise `TypeError` unless each item is exactly a
        `ComponentSpec` or an `AutoSourceComponentSpec`.
        """
        for comp_spec in comp_specs:
            if (
                type(comp_spec) is not ComponentSpec
                and type(comp_spec) is not AutoSourceComponentSpec
            ):
                raise TypeError(
                    '"{}" object is not a ComponentSpec or AutoSourceComponentSpec'.format(
                        type(comp_spec)
                    )
                )

    def _validate_filter_component_specs(self, comp_specs):
        """Raise `TypeError` unless each item is exactly a `ComponentSpec`."""
        for comp_spec in comp_specs:
            if type(comp_spec) is not ComponentSpec:
                raise TypeError(
                    '"{}" object is not a ComponentSpec'.format(type(comp_spec))
                )

    def __next__(self):
        """Run the graph once and return the message which the proxy
        sink stored in the shared list.
        """
        assert self._msg_list[0] is None
        self._graph.run_once()
        msg = self._msg_list[0]
        assert msg is not None

        # Empty the slot for the next call.
        self._msg_list[0] = None
        return msg

    def _create_stream_intersection_trimmer(self, component, port):
        """Create the trimmer applying the pre-computed stream
        intersection range of `port` (of `component`).
        """
        key = (component.addr, port.name)
        begin, end = self._stream_inter_port_to_range[key]
        name = "trimmer-{}-{}".format(component.name, port.name)
        return self._create_trimmer(begin, end, name)

    def _create_muxer(self):
        """Add a `muxer` filter component (`utils` plugin), named
        "muxer", to the graph and return it.

        Raises `RuntimeError` if the plugin or the component class
        cannot be found.
        """
        plugin = bt2_plugin.find_plugin("utils")

        if plugin is None:
            raise RuntimeError('cannot find "utils" plugin (needed for the muxer)')

        if "muxer" not in plugin.filter_component_classes:
            raise RuntimeError(
                'cannot find "muxer" filter component class in "utils" plugin'
            )

        comp_cls = plugin.filter_component_classes["muxer"]
        return self._graph.add_component(comp_cls, "muxer")

    def _create_trimmer(self, begin_ns, end_ns, name):
        """Add a `trimmer` filter component (`utils` plugin), named
        `name`, to the graph and return it.

        begin_ns, end_ns: bounds in nanoseconds; a `None` bound is not
            passed to the component.

        Raises `RuntimeError` if the plugin or the component class
        cannot be found.
        """
        plugin = bt2_plugin.find_plugin("utils")

        if plugin is None:
            raise RuntimeError('cannot find "utils" plugin (needed for the trimmer)')

        if "trimmer" not in plugin.filter_component_classes:
            raise RuntimeError(
                'cannot find "trimmer" filter component class in "utils" plugin'
            )

        params = {}

        def ns_to_string(ns):
            # Format as `[-]SEC.NANO`. Split the magnitude and add the
            # sign separately: floor division and modulo on a negative
            # value would turn -1 ns into "-1.999999999".
            ns = int(ns)
            sign = "-" if ns < 0 else ""
            s_part, ns_part = divmod(abs(ns), 1000000000)
            return "{}{}.{:09d}".format(sign, s_part, ns_part)

        if begin_ns is not None:
            params["begin"] = ns_to_string(begin_ns)

        if end_ns is not None:
            params["end"] = ns_to_string(end_ns)

        comp_cls = plugin.filter_component_classes["trimmer"]
        return self._graph.add_component(comp_cls, name, params)

    def _get_unique_comp_name(self, comp_cls):
        """Return a component name based on the name of `comp_cls`,
        with a numeric suffix if an existing source or extra filter
        component already uses that name.
        """
        name = comp_cls.name
        comps_and_specs = itertools.chain(
            self._src_comps_and_specs, self._flt_comps_and_specs
        )

        if name in [comp_and_spec.comp.name for comp_and_spec in comps_and_specs]:
            name += "-{}".format(self._next_suffix)
            self._next_suffix += 1

        return name

    def _create_comp(self, comp_spec):
        """Add the component described by `comp_spec` to the graph and
        return it.
        """
        comp_cls = comp_spec.component_class
        name = self._get_unique_comp_name(comp_cls)
        comp = self._graph.add_component(
            comp_cls, name, comp_spec.params, comp_spec.obj, comp_spec.logging_level
        )
        return comp

    def _get_free_muxer_input_port(self):
        """Return the first unconnected input port of the muxer, or
        `None` if there is none.
        """
        for port in self._muxer_comp.input_ports.values():
            if not port.is_connected:
                return port

    def _connect_src_comp_port(self, component, port):
        """Connect the output port `port` of the source component
        `component` towards the muxer.
        """
        # if this trace collection iterator is in stream intersection
        # mode, we need this connection:
        #
        #     port -> trimmer -> muxer
        #
        # otherwise, simply:
        #
        #     port -> muxer
        if self._stream_intersection_mode:
            trimmer_comp = self._create_stream_intersection_trimmer(component, port)
            self._graph.connect_ports(port, trimmer_comp.input_ports["in"])
            port_to_muxer = trimmer_comp.output_ports["out"]
        else:
            port_to_muxer = port

        self._graph.connect_ports(port_to_muxer, self._get_free_muxer_input_port())

    def _graph_port_added(self, component, port):
        """Port added listener: connect any new output port of a
        source component once automatic connection is enabled.
        """
        if not self._connect_ports:
            return

        if type(port) is bt2_port._InputPortConst:
            return

        if component not in [comp.comp for comp in self._src_comps_and_specs]:
            # do not care about non-source components (muxer, trimmer, etc.)
            return

        self._connect_src_comp_port(component, port)

    def _get_greatest_operative_mip_version(self):
        """Return the greatest operative MIP version for the source
        and extra filter component specs (plus a trimmer in stream
        intersection mode).

        Raises `RuntimeError` if there is no such version.
        """

        def append_comp_specs_descriptors(descriptors, comp_specs):
            for comp_spec in comp_specs:
                descriptors.append(
                    bt2_component_descriptor.ComponentDescriptor(
                        comp_spec.component_class, comp_spec.params, comp_spec.obj
                    )
                )

        descriptors = []
        append_comp_specs_descriptors(descriptors, self._src_comp_specs)
        append_comp_specs_descriptors(descriptors, self._flt_comp_specs)

        if self._stream_intersection_mode:
            # we also need at least one `flt.utils.trimmer` component
            comp_spec = ComponentSpec.from_named_plugin_and_component_class(
                "utils", "trimmer"
            )
            append_comp_specs_descriptors(descriptors, [comp_spec])

        mip_version = bt2_mip.get_greatest_operative_mip_version(descriptors)

        if mip_version is None:
            msg = "failed to find an operative message interchange protocol version (components are not interoperable)"
            raise RuntimeError(msg)

        return mip_version

    def _build_graph(self):
        """Create the graph, add all the components, and connect them."""
        self._graph = bt2_graph.Graph(self._get_greatest_operative_mip_version())
        self._graph.add_port_added_listener(self._graph_port_added)
        self._muxer_comp = self._create_muxer()

        # Optional trimmer right after the muxer for the user's
        # begin/end bounds.
        if self._begin_ns is not None or self._end_ns is not None:
            trimmer_comp = self._create_trimmer(self._begin_ns, self._end_ns, "trimmer")
            self._graph.connect_ports(
                self._muxer_comp.output_ports["out"], trimmer_comp.input_ports["in"]
            )
            last_flt_out_port = trimmer_comp.output_ports["out"]
        else:
            last_flt_out_port = self._muxer_comp.output_ports["out"]

        # create extra filter components (chained)
        for comp_spec in self._flt_comp_specs:
            comp = self._create_comp(comp_spec)
            self._flt_comps_and_specs.append(_ComponentAndSpec(comp, comp_spec))

        # connect the extra filter chain, using the first input port
        # and the first output port of each filter
        for comp_and_spec in self._flt_comps_and_specs:
            in_port = list(comp_and_spec.comp.input_ports.values())[0]
            out_port = list(comp_and_spec.comp.output_ports.values())[0]
            self._graph.connect_ports(last_flt_out_port, in_port)
            last_flt_out_port = out_port

        # Here we create the components, self._graph_port_added() is
        # called when they add ports, but the callback returns early
        # because self._connect_ports is False. This is because the
        # self._graph_port_added() could not find the associated source
        # component specification in self._src_comps_and_specs because
        # it does not exist yet (it needs the created component to
        # exist).
        for comp_spec in self._src_comp_specs:
            comp = self._create_comp(comp_spec)
            self._src_comps_and_specs.append(_ComponentAndSpec(comp, comp_spec))

        if self._stream_intersection_mode:
            self._compute_stream_intersections()

        # Now we connect the ports which exist at this point. We allow
        # self._graph_port_added() to automatically connect _new_ ports.
        self._connect_ports = True

        for comp_and_spec in self._src_comps_and_specs:
            # Keep a separate list because comp_and_spec.output_ports
            # could change during the connection of one of its ports.
            # Any new port is handled by self._graph_port_added().
            out_ports = list(comp_and_spec.comp.output_ports.values())

            for out_port in out_ports:
                if out_port.is_connected:
                    continue

                self._connect_src_comp_port(comp_and_spec.comp, out_port)

        # Add the proxy sink, passing our message list to share consumed
        # messages with this trace collection message iterator.
        sink = self._graph.add_component(
            _TraceCollectionMessageIteratorProxySink, "proxy-sink", obj=self._msg_list
        )
        sink_in_port = sink.input_ports["in"]

        # connect last filter to proxy sink
        self._graph.connect_ports(last_flt_out_port, sink_in_port)
This page took 0.040967 seconds and 4 git commands to generate.