67a713b3a8838ff6e690bfe41a0e0a27bc646364
[babeltrace.git] / src / bindings / python / bt2 / bt2 / trace_collection_message_iterator.py
1 # SPDX-License-Identifier: MIT
2 #
3 # Copyright (c) 2017 Philippe Proulx <pproulx@efficios.com>
4
5 from bt2 import native_bt
6 from bt2 import utils as bt2_utils
7 import bt2
8 import itertools
9 from bt2 import message_iterator as bt2_message_iterator
10 from bt2 import port as bt2_port
11 from bt2 import component as bt2_component
12 from bt2 import value as bt2_value
13 from bt2 import plugin as bt2_plugin
14 import datetime
15 from collections import namedtuple
16 import numbers
17
18
19 # a pair of component and ComponentSpec
20 _ComponentAndSpec = namedtuple("_ComponentAndSpec", ["comp", "spec"])
21
22
23 class _BaseComponentSpec:
24 # Base for any component spec that can be passed to
25 # TraceCollectionMessageIterator.
26 def __init__(self, params, obj, logging_level):
27 if logging_level is not None:
28 bt2_utils._check_log_level(logging_level)
29
30 self._params = bt2.create_value(params)
31 self._obj = obj
32 self._logging_level = logging_level
33
34 @property
35 def params(self):
36 return self._params
37
38 @property
39 def obj(self):
40 return self._obj
41
42 @property
43 def logging_level(self):
44 return self._logging_level
45
46
47 class ComponentSpec(_BaseComponentSpec):
48 # A component spec with a specific component class.
49 def __init__(
50 self,
51 component_class,
52 params=None,
53 obj=None,
54 logging_level=bt2.LoggingLevel.NONE,
55 ):
56 if type(params) is str:
57 params = {"inputs": [params]}
58
59 super().__init__(params, obj, logging_level)
60
61 is_cc_object = isinstance(
62 component_class,
63 (bt2._SourceComponentClassConst, bt2._FilterComponentClassConst),
64 )
65 is_user_cc_type = isinstance(
66 component_class, bt2_component._UserComponentType
67 ) and issubclass(
68 component_class, (bt2._UserSourceComponent, bt2._UserFilterComponent)
69 )
70
71 if not is_cc_object and not is_user_cc_type:
72 raise TypeError(
73 "'{}' is not a source or filter component class".format(
74 component_class.__class__.__name__
75 )
76 )
77
78 self._component_class = component_class
79
80 @property
81 def component_class(self):
82 return self._component_class
83
84 @classmethod
85 def from_named_plugin_and_component_class(
86 cls,
87 plugin_name,
88 component_class_name,
89 params=None,
90 obj=None,
91 logging_level=bt2.LoggingLevel.NONE,
92 ):
93 plugin = bt2.find_plugin(plugin_name)
94
95 if plugin is None:
96 raise ValueError("no such plugin: {}".format(plugin_name))
97
98 if component_class_name in plugin.source_component_classes:
99 comp_class = plugin.source_component_classes[component_class_name]
100 elif component_class_name in plugin.filter_component_classes:
101 comp_class = plugin.filter_component_classes[component_class_name]
102 else:
103 raise KeyError(
104 "source or filter component class `{}` not found in plugin `{}`".format(
105 component_class_name, plugin_name
106 )
107 )
108
109 return cls(comp_class, params, obj, logging_level)
110
111
112 class AutoSourceComponentSpec(_BaseComponentSpec):
113 # A component spec that does automatic source discovery.
114 _no_obj = object()
115
116 def __init__(self, input, params=None, obj=_no_obj, logging_level=None):
117 super().__init__(params, obj, logging_level)
118 self._input = input
119
120 @property
121 def input(self):
122 return self._input
123
124
125 def _auto_discover_source_component_specs(auto_source_comp_specs, plugin_set):
126 # Transform a list of `AutoSourceComponentSpec` in a list of `ComponentSpec`
127 # using the automatic source discovery mechanism.
128 inputs = bt2.ArrayValue([spec.input for spec in auto_source_comp_specs])
129
130 if plugin_set is None:
131 plugin_set = bt2.find_plugins()
132 else:
133 bt2_utils._check_type(plugin_set, bt2_plugin._PluginSet)
134
135 res_ptr = native_bt.bt2_auto_discover_source_components(
136 inputs._ptr, plugin_set._ptr
137 )
138
139 if res_ptr is None:
140 raise bt2._MemoryError("cannot auto discover source components")
141
142 res = bt2_value._create_from_ptr(res_ptr)
143
144 assert type(res) is bt2.MapValue
145 assert "status" in res
146
147 status = res["status"]
148 bt2_utils._handle_func_status(status, "cannot auto-discover source components")
149
150 comp_specs = []
151 comp_specs_raw = res["results"]
152 assert type(comp_specs_raw) is bt2.ArrayValue
153
154 used_input_indices = set()
155
156 for comp_spec_raw in comp_specs_raw:
157 assert type(comp_spec_raw) is bt2.ArrayValue
158 assert len(comp_spec_raw) == 4
159
160 plugin_name = comp_spec_raw[0]
161 assert type(plugin_name) is bt2.StringValue
162 plugin_name = str(plugin_name)
163
164 class_name = comp_spec_raw[1]
165 assert type(class_name) is bt2.StringValue
166 class_name = str(class_name)
167
168 comp_inputs = comp_spec_raw[2]
169 assert type(comp_inputs) is bt2.ArrayValue
170
171 comp_orig_indices = comp_spec_raw[3]
172 assert type(comp_orig_indices)
173
174 params = bt2.MapValue()
175 logging_level = bt2.LoggingLevel.NONE
176 obj = None
177
178 # Compute `params` for this component by piling up params given to all
179 # AutoSourceComponentSpec objects that contributed in the instantiation
180 # of this component.
181 #
182 # The effective log level for a component is the last one specified
183 # across the AutoSourceComponentSpec that contributed in its
184 # instantiation.
185 for idx in comp_orig_indices:
186 orig_spec = auto_source_comp_specs[idx]
187
188 if orig_spec.params is not None:
189 params.update(orig_spec.params)
190
191 if orig_spec.logging_level is not None:
192 logging_level = orig_spec.logging_level
193
194 if orig_spec.obj is not AutoSourceComponentSpec._no_obj:
195 obj = orig_spec.obj
196
197 used_input_indices.add(int(idx))
198
199 params["inputs"] = comp_inputs
200
201 comp_specs.append(
202 ComponentSpec.from_named_plugin_and_component_class(
203 plugin_name,
204 class_name,
205 params=params,
206 obj=obj,
207 logging_level=logging_level,
208 )
209 )
210
211 if len(used_input_indices) != len(inputs):
212 unused_input_indices = set(range(len(inputs))) - used_input_indices
213 unused_input_indices = sorted(unused_input_indices)
214 unused_inputs = [str(inputs[x]) for x in unused_input_indices]
215
216 msg = (
217 "Some auto source component specs did not produce any component: "
218 + ", ".join(unused_inputs)
219 )
220 raise RuntimeError(msg)
221
222 return comp_specs
223
224
225 # datetime.datetime or integral to nanoseconds
226 def _get_ns(obj):
227 if obj is None:
228 return
229
230 if isinstance(obj, numbers.Real):
231 # consider that it's already in seconds
232 s = obj
233 elif isinstance(obj, datetime.datetime):
234 # s -> ns
235 s = obj.timestamp()
236 else:
237 raise TypeError(
238 '"{}" is not an integral number or a datetime.datetime object'.format(obj)
239 )
240
241 return int(s * 1e9)
242
243
244 class _TraceCollectionMessageIteratorProxySink(bt2_component._UserSinkComponent):
245 def __init__(self, config, params, msg_list):
246 assert type(msg_list) is list
247 self._msg_list = msg_list
248 self._add_input_port("in")
249
250 def _user_graph_is_configured(self):
251 self._msg_iter = self._create_message_iterator(self._input_ports["in"])
252
253 def _user_consume(self):
254 assert self._msg_list[0] is None
255 self._msg_list[0] = next(self._msg_iter)
256
257
258 class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator):
259 def __init__(
260 self,
261 source_component_specs,
262 filter_component_specs=None,
263 stream_intersection_mode=False,
264 begin=None,
265 end=None,
266 plugin_set=None,
267 ):
268 bt2_utils._check_bool(stream_intersection_mode)
269 self._stream_intersection_mode = stream_intersection_mode
270 self._begin_ns = _get_ns(begin)
271 self._end_ns = _get_ns(end)
272 self._msg_list = [None]
273
274 # If a single item is provided, convert to a list.
275 if type(source_component_specs) in (
276 ComponentSpec,
277 AutoSourceComponentSpec,
278 str,
279 ):
280 source_component_specs = [source_component_specs]
281
282 # Convert any string to an AutoSourceComponentSpec.
283 def str_to_auto(item):
284 if type(item) is str:
285 item = AutoSourceComponentSpec(item)
286
287 return item
288
289 source_component_specs = [str_to_auto(s) for s in source_component_specs]
290
291 if type(filter_component_specs) is ComponentSpec:
292 filter_component_specs = [filter_component_specs]
293 elif filter_component_specs is None:
294 filter_component_specs = []
295
296 self._validate_source_component_specs(source_component_specs)
297 self._validate_filter_component_specs(filter_component_specs)
298
299 # Pass any `ComponentSpec` instance as-is.
300 self._src_comp_specs = [
301 spec for spec in source_component_specs if type(spec) is ComponentSpec
302 ]
303
304 # Convert any `AutoSourceComponentSpec` in concrete `ComponentSpec` instances.
305 auto_src_comp_specs = [
306 spec
307 for spec in source_component_specs
308 if type(spec) is AutoSourceComponentSpec
309 ]
310 self._src_comp_specs += _auto_discover_source_component_specs(
311 auto_src_comp_specs, plugin_set
312 )
313
314 self._flt_comp_specs = filter_component_specs
315 self._next_suffix = 1
316 self._connect_ports = False
317
318 # lists of _ComponentAndSpec
319 self._src_comps_and_specs = []
320 self._flt_comps_and_specs = []
321
322 self._build_graph()
323
324 def _compute_stream_intersections(self):
325 # Pre-compute the trimmer range to use for each port in the graph, when
326 # stream intersection mode is enabled.
327 self._stream_inter_port_to_range = {}
328
329 for src_comp_and_spec in self._src_comps_and_specs:
330 # Query the port's component for the `babeltrace.trace-infos`
331 # object which contains the range for each stream, from which we can
332 # compute the intersection of the streams in each trace.
333 query_exec = bt2.QueryExecutor(
334 src_comp_and_spec.spec.component_class,
335 "babeltrace.trace-infos",
336 src_comp_and_spec.spec.params,
337 )
338 trace_infos = query_exec.query()
339
340 for trace_info in trace_infos:
341 begin = max(
342 [
343 stream["range-ns"]["begin"]
344 for stream in trace_info["stream-infos"]
345 ]
346 )
347 end = min(
348 [stream["range-ns"]["end"] for stream in trace_info["stream-infos"]]
349 )
350
351 # Each port associated to this trace will have this computed
352 # range.
353 for stream in trace_info["stream-infos"]:
354 # A port name is unique within a component, but not
355 # necessarily across all components. Use a component
356 # and port name pair to make it unique across the graph.
357 port_name = str(stream["port-name"])
358 key = (src_comp_and_spec.comp.addr, port_name)
359 self._stream_inter_port_to_range[key] = (begin, end)
360
361 def _validate_source_component_specs(self, comp_specs):
362 for comp_spec in comp_specs:
363 if (
364 type(comp_spec) is not ComponentSpec
365 and type(comp_spec) is not AutoSourceComponentSpec
366 ):
367 raise TypeError(
368 '"{}" object is not a ComponentSpec or AutoSourceComponentSpec'.format(
369 type(comp_spec)
370 )
371 )
372
373 def _validate_filter_component_specs(self, comp_specs):
374 for comp_spec in comp_specs:
375 if type(comp_spec) is not ComponentSpec:
376 raise TypeError(
377 '"{}" object is not a ComponentSpec'.format(type(comp_spec))
378 )
379
380 def __next__(self):
381 assert self._msg_list[0] is None
382 self._graph.run_once()
383 msg = self._msg_list[0]
384 assert msg is not None
385 self._msg_list[0] = None
386 return msg
387
388 def _create_stream_intersection_trimmer(self, component, port):
389 key = (component.addr, port.name)
390 begin, end = self._stream_inter_port_to_range[key]
391 name = "trimmer-{}-{}".format(component.name, port.name)
392 return self._create_trimmer(begin, end, name)
393
394 def _create_muxer(self):
395 plugin = bt2.find_plugin("utils")
396
397 if plugin is None:
398 raise RuntimeError('cannot find "utils" plugin (needed for the muxer)')
399
400 if "muxer" not in plugin.filter_component_classes:
401 raise RuntimeError(
402 'cannot find "muxer" filter component class in "utils" plugin'
403 )
404
405 comp_cls = plugin.filter_component_classes["muxer"]
406 return self._graph.add_component(comp_cls, "muxer")
407
408 def _create_trimmer(self, begin_ns, end_ns, name):
409 plugin = bt2.find_plugin("utils")
410
411 if plugin is None:
412 raise RuntimeError('cannot find "utils" plugin (needed for the trimmer)')
413
414 if "trimmer" not in plugin.filter_component_classes:
415 raise RuntimeError(
416 'cannot find "trimmer" filter component class in "utils" plugin'
417 )
418
419 params = {}
420
421 def ns_to_string(ns):
422 s_part = ns // 1000000000
423 ns_part = ns % 1000000000
424 return "{}.{:09d}".format(s_part, ns_part)
425
426 if begin_ns is not None:
427 params["begin"] = ns_to_string(begin_ns)
428
429 if end_ns is not None:
430 params["end"] = ns_to_string(end_ns)
431
432 comp_cls = plugin.filter_component_classes["trimmer"]
433 return self._graph.add_component(comp_cls, name, params)
434
435 def _get_unique_comp_name(self, comp_cls):
436 name = comp_cls.name
437 comps_and_specs = itertools.chain(
438 self._src_comps_and_specs, self._flt_comps_and_specs
439 )
440
441 if name in [comp_and_spec.comp.name for comp_and_spec in comps_and_specs]:
442 name += "-{}".format(self._next_suffix)
443 self._next_suffix += 1
444
445 return name
446
447 def _create_comp(self, comp_spec):
448 comp_cls = comp_spec.component_class
449 name = self._get_unique_comp_name(comp_cls)
450 comp = self._graph.add_component(
451 comp_cls, name, comp_spec.params, comp_spec.obj, comp_spec.logging_level
452 )
453 return comp
454
455 def _get_free_muxer_input_port(self):
456 for port in self._muxer_comp.input_ports.values():
457 if not port.is_connected:
458 return port
459
460 def _connect_src_comp_port(self, component, port):
461 # if this trace collection iterator is in stream intersection
462 # mode, we need this connection:
463 #
464 # port -> trimmer -> muxer
465 #
466 # otherwise, simply:
467 #
468 # port -> muxer
469 if self._stream_intersection_mode:
470 trimmer_comp = self._create_stream_intersection_trimmer(component, port)
471 self._graph.connect_ports(port, trimmer_comp.input_ports["in"])
472 port_to_muxer = trimmer_comp.output_ports["out"]
473 else:
474 port_to_muxer = port
475
476 self._graph.connect_ports(port_to_muxer, self._get_free_muxer_input_port())
477
478 def _graph_port_added(self, component, port):
479 if not self._connect_ports:
480 return
481
482 if type(port) is bt2_port._InputPortConst:
483 return
484
485 if component not in [comp.comp for comp in self._src_comps_and_specs]:
486 # do not care about non-source components (muxer, trimmer, etc.)
487 return
488
489 self._connect_src_comp_port(component, port)
490
491 def _get_greatest_operative_mip_version(self):
492 def append_comp_specs_descriptors(descriptors, comp_specs):
493 for comp_spec in comp_specs:
494 descriptors.append(
495 bt2.ComponentDescriptor(
496 comp_spec.component_class, comp_spec.params, comp_spec.obj
497 )
498 )
499
500 descriptors = []
501 append_comp_specs_descriptors(descriptors, self._src_comp_specs)
502 append_comp_specs_descriptors(descriptors, self._flt_comp_specs)
503
504 if self._stream_intersection_mode:
505 # we also need at least one `flt.utils.trimmer` component
506 comp_spec = ComponentSpec.from_named_plugin_and_component_class(
507 "utils", "trimmer"
508 )
509 append_comp_specs_descriptors(descriptors, [comp_spec])
510
511 mip_version = bt2.get_greatest_operative_mip_version(descriptors)
512
513 if mip_version is None:
514 msg = "failed to find an operative message interchange protocol version (components are not interoperable)"
515 raise RuntimeError(msg)
516
517 return mip_version
518
519 def _build_graph(self):
520 self._graph = bt2.Graph(self._get_greatest_operative_mip_version())
521 self._graph.add_port_added_listener(self._graph_port_added)
522 self._muxer_comp = self._create_muxer()
523
524 if self._begin_ns is not None or self._end_ns is not None:
525 trimmer_comp = self._create_trimmer(self._begin_ns, self._end_ns, "trimmer")
526 self._graph.connect_ports(
527 self._muxer_comp.output_ports["out"], trimmer_comp.input_ports["in"]
528 )
529 last_flt_out_port = trimmer_comp.output_ports["out"]
530 else:
531 last_flt_out_port = self._muxer_comp.output_ports["out"]
532
533 # create extra filter components (chained)
534 for comp_spec in self._flt_comp_specs:
535 comp = self._create_comp(comp_spec)
536 self._flt_comps_and_specs.append(_ComponentAndSpec(comp, comp_spec))
537
538 # connect the extra filter chain
539 for comp_and_spec in self._flt_comps_and_specs:
540 in_port = list(comp_and_spec.comp.input_ports.values())[0]
541 out_port = list(comp_and_spec.comp.output_ports.values())[0]
542 self._graph.connect_ports(last_flt_out_port, in_port)
543 last_flt_out_port = out_port
544
545 # Here we create the components, self._graph_port_added() is
546 # called when they add ports, but the callback returns early
547 # because self._connect_ports is False. This is because the
548 # self._graph_port_added() could not find the associated source
549 # component specification in self._src_comps_and_specs because
550 # it does not exist yet (it needs the created component to
551 # exist).
552 for comp_spec in self._src_comp_specs:
553 comp = self._create_comp(comp_spec)
554 self._src_comps_and_specs.append(_ComponentAndSpec(comp, comp_spec))
555
556 if self._stream_intersection_mode:
557 self._compute_stream_intersections()
558
559 # Now we connect the ports which exist at this point. We allow
560 # self._graph_port_added() to automatically connect _new_ ports.
561 self._connect_ports = True
562
563 for comp_and_spec in self._src_comps_and_specs:
564 # Keep a separate list because comp_and_spec.output_ports
565 # could change during the connection of one of its ports.
566 # Any new port is handled by self._graph_port_added().
567 out_ports = [port for port in comp_and_spec.comp.output_ports.values()]
568
569 for out_port in out_ports:
570 if out_port.is_connected:
571 continue
572
573 self._connect_src_comp_port(comp_and_spec.comp, out_port)
574
575 # Add the proxy sink, passing our message list to share consumed
576 # messages with this trace collection message iterator.
577 sink = self._graph.add_component(
578 _TraceCollectionMessageIteratorProxySink, "proxy-sink", obj=self._msg_list
579 )
580 sink_in_port = sink.input_ports["in"]
581
582 # connect last filter to proxy sink
583 self._graph.connect_ports(last_flt_out_port, sink_in_port)
This page took 0.041108 seconds and 3 git commands to generate.