Commit | Line | Data |
---|---|---|
d34e69cf PP |
1 | # The MIT License (MIT) |
2 | # | |
3 | # Copyright (c) 2017 Philippe Proulx <pproulx@efficios.com> | |
4 | # | |
5 | # Permission is hereby granted, free of charge, to any person obtaining a copy | |
6 | # of this software and associated documentation files (the "Software"), to deal | |
7 | # in the Software without restriction, including without limitation the rights | |
8 | # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
9 | # copies of the Software, and to permit persons to whom the Software is | |
10 | # furnished to do so, subject to the following conditions: | |
11 | # | |
12 | # The above copyright notice and this permission notice shall be included in | |
13 | # all copies or substantial portions of the Software. | |
14 | # | |
15 | # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
16 | # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
17 | # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
18 | # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
19 | # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
20 | # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | |
21 | # THE SOFTWARE. | |
22 | ||
23 | from bt2 import utils | |
24 | import bt2 | |
88fdcc33 | 25 | import itertools |
c946c9de PP |
26 | from bt2 import message_iterator as bt2_message_iterator |
27 | from bt2 import logging as bt2_logging | |
28 | from bt2 import port as bt2_port | |
d34e69cf | 29 | import datetime |
d34e69cf PP |
30 | from collections import namedtuple |
31 | import numbers | |
32 | ||
33 | ||
88fdcc33 PP |
# Associates a component which was added to the graph (`comp`) with the
# ComponentSpec object which was used to create it (`spec`).
_ComponentAndSpec = namedtuple('_ComponentAndSpec', 'comp spec')
d34e69cf PP |
36 | |
37 | ||
class ComponentSpec:
    """Specification of a component to create.

    Holds the plugin name and component class name used to find the
    component class, along with the initialization parameters, an
    optional Python object, and the logging level which are passed when
    the component is added to a graph.
    """

    def __init__(
        self,
        plugin_name,
        class_name,
        params=None,
        obj=None,
        logging_level=bt2_logging.LoggingLevel.NONE,
    ):
        # validate everything that can be validated before keeping
        # anything
        utils._check_str(plugin_name)
        utils._check_str(class_name)
        utils._check_log_level(logging_level)

        self._plugin_name = plugin_name
        self._class_name = class_name
        self._logging_level = logging_level
        self._obj = obj

        # a plain string is a shorthand for a single-element `inputs`
        # array parameter
        raw_params = {'inputs': [params]} if type(params) is str else params
        self._params = bt2.create_value(raw_params)

    @property
    def plugin_name(self):
        """Name of the plugin which provides the component class."""
        return self._plugin_name

    @property
    def class_name(self):
        """Name of the component class within the plugin."""
        return self._class_name

    @property
    def logging_level(self):
        """Logging level to use when creating the component."""
        return self._logging_level

    @property
    def params(self):
        """Initialization parameters, as returned by bt2.create_value()."""
        return self._params

    @property
    def obj(self):
        """Python object given at construction time (can be None)."""
        return self._obj
79 | ||
d34e69cf PP |
80 | |
# datetime.datetime or number of seconds to nanoseconds
def _get_ns(obj):
    """Convert `obj` to an integral number of nanoseconds.

    `obj` can be:

    * None: None is returned as is (no bound).
    * A real number: considered to be a number of seconds.
    * A datetime.datetime object: converted with its timestamp() method.

    Raises TypeError for any other type.
    """
    if obj is None:
        return None

    if isinstance(obj, numbers.Integral):
        # Integral number of seconds: stay in integer arithmetic so
        # that large values are not rounded by a trip through float.
        return int(obj) * 1000000000

    if isinstance(obj, numbers.Real):
        # consider that it's already in seconds
        s = obj
    elif isinstance(obj, datetime.datetime):
        # seconds since the Unix epoch (float)
        s = obj.timestamp()
    else:
        raise TypeError(
            '"{}" is not an integral number or a datetime.datetime object'.format(obj)
        )

    # s -> ns
    return int(s * 1e9)
98 | ||
99 | ||
88fdcc33 PP |
class _CompClsType:
    """Kinds of component classes which _create_comp() can look up."""

    # plain integer tags: SOURCE is 0, FILTER is 1
    SOURCE, FILTER = range(2)
103 | ||
104 | ||
class TraceCollectionMessageIterator(bt2_message_iterator._MessageIterator):
    """Message iterator over a graph built from component specifications.

    The constructor builds the following graph and creates an output
    port message iterator on its last output port:

        source output ports -> [trimmer] -> muxer -> [trimmer] -> [filters]

    The per-port trimmers only exist in stream intersection mode, the
    trimmer after the muxer only exists when `begin` or `end` is given,
    and the filters are the user's filter components, chained in order.
    """

    def __init__(
        self,
        source_component_specs,
        filter_component_specs=None,
        stream_intersection_mode=False,
        begin=None,
        end=None,
    ):
        """Build the graph and create the message iterator.

        `source_component_specs` and `filter_component_specs` are each
        either a single ComponentSpec object or an iterable of
        ComponentSpec objects (`filter_component_specs` can also be
        None for no extra filters).

        `begin` and `end` are converted to nanoseconds with _get_ns();
        None means that there's no bound.

        Raises TypeError if an element of the specifications is not a
        ComponentSpec object.
        """
        utils._check_bool(stream_intersection_mode)
        self._stream_intersection_mode = stream_intersection_mode
        self._begin_ns = _get_ns(begin)
        self._end_ns = _get_ns(end)

        if isinstance(source_component_specs, ComponentSpec):
            source_component_specs = [source_component_specs]

        if isinstance(filter_component_specs, ComponentSpec):
            filter_component_specs = [filter_component_specs]
        elif filter_component_specs is None:
            filter_component_specs = []

        # The specifications are iterated more than once (validation,
        # then graph building): keep our own lists so that one-shot
        # iterables (generators, for example) are not exhausted by the
        # validation.
        source_component_specs = list(source_component_specs)
        filter_component_specs = list(filter_component_specs)

        self._src_comp_specs = source_component_specs
        self._flt_comp_specs = filter_component_specs
        self._next_suffix = 1
        self._connect_ports = False

        # lists of _ComponentAndSpec
        self._src_comps_and_specs = []
        self._flt_comps_and_specs = []

        self._validate_component_specs(source_component_specs)
        self._validate_component_specs(filter_component_specs)
        self._build_graph()

    def _validate_component_specs(self, comp_specs):
        """Raise TypeError if an element of `comp_specs` is not a ComponentSpec."""
        for comp_spec in comp_specs:
            if not isinstance(comp_spec, ComponentSpec):
                raise TypeError(
                    '"{}" object is not a ComponentSpec'.format(type(comp_spec))
                )

    def __next__(self):
        return next(self._msg_iter)

    @staticmethod
    def _find_intersection_range(trace_info_res, port_name):
        """Return the (begin, end) stream intersection range, in
        nanoseconds, of the trace having a stream whose port is named
        `port_name`, or (None, None) if there's no such trace.
        """
        for trace_info in trace_info_res:
            for stream in trace_info['streams']:
                if stream['port-name'] == port_name:
                    range_ns = trace_info['intersection-range-ns']
                    return range_ns['begin'], range_ns['end']

        return None, None

    def _create_stream_intersection_trimmer(self, component, port):
        """Create the trimmer component which limits the messages of
        `port` (an output port of the source component `component`) to
        the stream intersection range of its trace.

        Raises ValueError if the source component was not created with
        an `inputs` parameter, and RuntimeError if the intersection
        range cannot be found.
        """
        # find the original parameters specified by the user to create
        # this port's component to get the `inputs` parameter
        for src_comp_and_spec in self._src_comps_and_specs:
            if component == src_comp_and_spec.comp:
                break
        else:
            # without this, the last known source component (or an
            # unbound variable) would silently be used below
            raise RuntimeError(
                'cannot find the source component of port "{}"'.format(port.name)
            )

        try:
            inputs = src_comp_and_spec.spec.params['inputs']
        except Exception as e:
            raise ValueError(
                'all source components must be created with an "inputs" parameter in stream intersection mode'
            ) from e

        params = {'inputs': inputs}

        # query the port's component for the `babeltrace.trace-info`
        # object which contains the stream intersection range for each
        # exposed trace
        query_exec = bt2.QueryExecutor(
            src_comp_and_spec.comp.cls, 'babeltrace.trace-info', params
        )
        trace_info_res = query_exec.query()
        begin = None
        end = None
        lookup_exc = None

        # find the trace info for this port's trace
        try:
            begin, end = self._find_intersection_range(trace_info_res, port.name)
        except Exception as e:
            # An unexpected query result is reported below as a missing
            # range; keep the cause for the exception chain.
            lookup_exc = e

        if begin is None or end is None:
            raise RuntimeError(
                'cannot find stream intersection range for port "{}"'.format(port.name)
            ) from lookup_exc

        name = 'trimmer-{}-{}'.format(src_comp_and_spec.comp.name, port.name)
        return self._create_trimmer(begin, end, name)

    @staticmethod
    def _find_utils_filter_comp_cls(comp_cls_name):
        """Return the filter component class named `comp_cls_name` of
        the "utils" plugin, raising RuntimeError if it cannot be found.
        """
        plugin = bt2.find_plugin('utils')

        if plugin is None:
            raise RuntimeError(
                'cannot find "utils" plugin (needed for the {})'.format(comp_cls_name)
            )

        if comp_cls_name not in plugin.filter_component_classes:
            raise RuntimeError(
                'cannot find "{}" filter component class in "utils" plugin'.format(
                    comp_cls_name
                )
            )

        return plugin.filter_component_classes[comp_cls_name]

    def _create_muxer(self):
        """Add a muxer component named `muxer` to the graph and return it."""
        comp_cls = self._find_utils_filter_comp_cls('muxer')
        return self._graph.add_component(comp_cls, 'muxer')

    def _create_trimmer(self, begin_ns, end_ns, name):
        """Add a trimmer component named `name` to the graph and return it.

        `begin_ns` and `end_ns` are the bounds in nanoseconds; a bound
        which is None is not passed to the trimmer.
        """
        comp_cls = self._find_utils_filter_comp_cls('trimmer')

        def ns_to_string(ns):
            # NOTE(review): for a negative `ns`, floor division makes
            # the seconds part negative and the nanoseconds part
            # positive; confirm that this is what the trimmer expects.
            s_part = ns // 1000000000
            ns_part = ns % 1000000000
            return '{}.{:09d}'.format(s_part, ns_part)

        params = {}

        if begin_ns is not None:
            params['begin'] = ns_to_string(begin_ns)

        if end_ns is not None:
            params['end'] = ns_to_string(end_ns)

        return self._graph.add_component(comp_cls, name, params)

    def _get_unique_comp_name(self, comp_spec):
        """Return a name for the component to create from `comp_spec`
        which is not used by any source or filter component created so
        far: `PLUGIN-CLASS`, followed by `-N` if needed.
        """
        base_name = '{}-{}'.format(comp_spec.plugin_name, comp_spec.class_name)
        comps_and_specs = itertools.chain(
            self._src_comps_and_specs, self._flt_comps_and_specs
        )
        used_names = {comp_and_spec.comp.name for comp_and_spec in comps_and_specs}
        name = base_name

        # keep trying suffixes until the name is really unique
        while name in used_names:
            name = '{}-{}'.format(base_name, self._next_suffix)
            self._next_suffix += 1

        return name

    def _create_comp(self, comp_spec, comp_cls_type):
        """Find the component class which `comp_spec` designates (a
        source or a filter component class, depending on
        `comp_cls_type`), add a component of this class to the graph,
        and return it.

        Raises ValueError if the plugin or the component class cannot
        be found.
        """
        plugin = bt2.find_plugin(comp_spec.plugin_name)

        if plugin is None:
            raise ValueError('no such plugin: {}'.format(comp_spec.plugin_name))

        if comp_cls_type == _CompClsType.SOURCE:
            comp_classes = plugin.source_component_classes
        else:
            comp_classes = plugin.filter_component_classes

        if comp_spec.class_name not in comp_classes:
            cc_type = 'source' if comp_cls_type == _CompClsType.SOURCE else 'filter'
            raise ValueError(
                'no such {} component class in "{}" plugin: {}'.format(
                    cc_type, comp_spec.plugin_name, comp_spec.class_name
                )
            )

        comp_cls = comp_classes[comp_spec.class_name]
        name = self._get_unique_comp_name(comp_spec)
        return self._graph.add_component(
            comp_cls, name, comp_spec.params, comp_spec.obj, comp_spec.logging_level
        )

    def _get_free_muxer_input_port(self):
        """Return the first unconnected input port of the muxer, or
        None if all of them are connected.
        """
        for port in self._muxer_comp.input_ports.values():
            if not port.is_connected:
                return port

        return None

    def _connect_src_comp_port(self, component, port):
        """Connect the output port `port` of the source component
        `component` to the muxer.
        """
        # if this trace collection iterator is in stream intersection
        # mode, we need this connection:
        #
        #     port -> trimmer -> muxer
        #
        # otherwise, simply:
        #
        #     port -> muxer
        if self._stream_intersection_mode:
            trimmer_comp = self._create_stream_intersection_trimmer(component, port)
            self._graph.connect_ports(port, trimmer_comp.input_ports['in'])
            port_to_muxer = trimmer_comp.output_ports['out']
        else:
            port_to_muxer = port

        self._graph.connect_ports(port_to_muxer, self._get_free_muxer_input_port())

    def _graph_port_added(self, component, port):
        """Graph's "port added" listener: connect the new output ports
        of the source components once automatic connection is enabled.
        """
        if not self._connect_ports:
            return

        if type(port) is bt2_port._InputPort:
            return

        if component not in [comp.comp for comp in self._src_comps_and_specs]:
            # do not care about non-source components (muxer, trimmer, etc.)
            return

        self._connect_src_comp_port(component, port)

    def _build_graph(self):
        """Create the graph, its components, their connections, and
        this trace collection iterator's message iterator.
        """
        self._graph = bt2.Graph()
        self._graph.add_port_added_listener(self._graph_port_added)
        self._muxer_comp = self._create_muxer()

        if self._begin_ns is not None or self._end_ns is not None:
            trimmer_comp = self._create_trimmer(self._begin_ns, self._end_ns, 'trimmer')
            self._graph.connect_ports(
                self._muxer_comp.output_ports['out'], trimmer_comp.input_ports['in']
            )
            msg_iter_port = trimmer_comp.output_ports['out']
        else:
            msg_iter_port = self._muxer_comp.output_ports['out']

        # create extra filter components (chained)
        for comp_spec in self._flt_comp_specs:
            comp = self._create_comp(comp_spec, _CompClsType.FILTER)
            self._flt_comps_and_specs.append(_ComponentAndSpec(comp, comp_spec))

        # connect the extra filter chain: only the first input port and
        # the first output port of each filter component are used
        for comp_and_spec in self._flt_comps_and_specs:
            in_port = list(comp_and_spec.comp.input_ports.values())[0]
            out_port = list(comp_and_spec.comp.output_ports.values())[0]
            self._graph.connect_ports(msg_iter_port, in_port)
            msg_iter_port = out_port

        # Here we create the components, self._graph_port_added() is
        # called when they add ports, but the callback returns early
        # because self._connect_ports is False. This is because the
        # self._graph_port_added() could not find the associated source
        # component specification in self._src_comps_and_specs because
        # it does not exist yet (it needs the created component to
        # exist).
        for comp_spec in self._src_comp_specs:
            comp = self._create_comp(comp_spec, _CompClsType.SOURCE)
            self._src_comps_and_specs.append(_ComponentAndSpec(comp, comp_spec))

        # Now we connect the ports which exist at this point. We allow
        # self._graph_port_added() to automatically connect _new_ ports.
        self._connect_ports = True

        for comp_and_spec in self._src_comps_and_specs:
            # Keep a separate list because comp_and_spec.output_ports
            # could change during the connection of one of its ports.
            # Any new port is handled by self._graph_port_added().
            out_ports = list(comp_and_spec.comp.output_ports.values())

            for out_port in out_ports:
                if out_port.is_connected:
                    continue

                self._connect_src_comp_port(comp_and_spec.comp, out_port)

        # create this trace collection iterator's message iterator
        self._msg_iter = self._graph.create_output_port_message_iterator(msg_iter_port)