Commit | Line | Data |
---|---|---|
d34e69cf PP |
1 | # The MIT License (MIT) |
2 | # | |
3 | # Copyright (c) 2017 Philippe Proulx <pproulx@efficios.com> | |
4 | # | |
5 | # Permission is hereby granted, free of charge, to any person obtaining a copy | |
6 | # of this software and associated documentation files (the "Software"), to deal | |
7 | # in the Software without restriction, including without limitation the rights | |
8 | # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
9 | # copies of the Software, and to permit persons to whom the Software is | |
10 | # furnished to do so, subject to the following conditions: | |
11 | # | |
12 | # The above copyright notice and this permission notice shall be included in | |
13 | # all copies or substantial portions of the Software. | |
14 | # | |
15 | # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
16 | # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
17 | # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
18 | # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
19 | # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
20 | # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | |
21 | # THE SOFTWARE. | |
22 | ||
23 | from bt2 import utils | |
24 | import bt2 | |
25 | import bt2.notification_iterator | |
26 | import datetime | |
27 | import collections.abc | |
28 | from collections import namedtuple | |
29 | import numbers | |
30 | ||
31 | ||
32 | # a pair of source component and _SourceComponentSpec | |
33 | _SourceComponentAndSpec = namedtuple('_SourceComponentAndSpec', | |
34 | ['comp', 'spec']) | |
35 | ||
36 | ||
37 | class SourceComponentSpec: | |
38 | def __init__(self, plugin_name, component_class_name, params=None): | |
39 | utils._check_str(plugin_name) | |
40 | utils._check_str(component_class_name) | |
41 | self._plugin_name = plugin_name | |
42 | self._component_class_name = component_class_name | |
43 | ||
44 | if type(params) is str: | |
45 | self._params = bt2.create_value({'path': params}) | |
46 | else: | |
47 | self._params = bt2.create_value(params) | |
48 | ||
49 | @property | |
50 | def plugin_name(self): | |
51 | return self._plugin_name | |
52 | ||
53 | @property | |
54 | def component_class_name(self): | |
55 | return self._component_class_name | |
56 | ||
57 | @property | |
58 | def params(self): | |
59 | return self._params | |
60 | ||
61 | ||
62 | # datetime.datetime or integral to nanoseconds | |
63 | def _get_ns(obj): | |
64 | if obj is None: | |
65 | return | |
66 | ||
67 | if isinstance(obj, numbers.Real): | |
68 | # consider that it's already in seconds | |
69 | s = obj | |
70 | elif isinstance(obj, datetime.datetime): | |
71 | # s -> ns | |
72 | s = obj.timestamp() | |
73 | else: | |
74 | raise TypeError('"{}" is not an integral number or a datetime.datetime object'.format(obj)) | |
75 | ||
76 | return int(s * 1e9) | |
77 | ||
78 | ||
79 | class TraceCollectionNotificationIterator(bt2.notification_iterator._NotificationIterator): | |
80 | def __init__(self, source_component_specs, notification_types=None, | |
81 | stream_intersection_mode=False, begin=None, | |
82 | end=None): | |
83 | utils._check_bool(stream_intersection_mode) | |
84 | self._stream_intersection_mode = stream_intersection_mode | |
85 | self._begin_ns = _get_ns(begin) | |
86 | self._end_ns = _get_ns(end) | |
87 | self._notification_types = notification_types | |
88 | self._src_comp_specs = source_component_specs | |
89 | self._next_suffix = 1 | |
90 | self._connect_ports = False | |
91 | ||
92 | # set of _SourceComponentAndSpec | |
93 | self._src_comps_and_specs = [] | |
94 | ||
95 | self._validate_source_component_specs() | |
96 | self._build_graph() | |
97 | ||
98 | def _validate_source_component_specs(self): | |
99 | for source_comp_spec in self._src_comp_specs: | |
100 | if type(source_comp_spec) is not SourceComponentSpec: | |
101 | raise TypeError('"{}" object is not a SourceComponentSpec'.format(type(source_comp_spec))) | |
102 | ||
103 | def __next__(self): | |
104 | return next(self._notif_iter) | |
105 | ||
106 | def _create_stream_intersection_trimmer(self, port): | |
107 | # find the original parameters specified by the user to create | |
108 | # this port's component to get the `path` parameter | |
109 | for src_comp_and_spec in self._src_comps_and_specs: | |
110 | if port.component == src_comp_and_spec.comp: | |
111 | params = src_comp_and_spec.spec.params | |
112 | break | |
113 | ||
114 | try: | |
115 | path = params['path'] | |
116 | except: | |
117 | raise bt2.Error('all source components must be created with a "path" parameter in stream intersection mode') | |
118 | ||
119 | params = {'path': str(path)} | |
120 | ||
121 | # query the port's component for the `trace-info` object which | |
122 | # contains the stream intersection range for each exposed | |
123 | # trace | |
124 | query_exec = bt2.QueryExecutor() | |
125 | trace_info_res = query_exec.query(port.component.component_class, | |
126 | 'trace-info', params) | |
127 | begin = None | |
128 | end = None | |
129 | ||
130 | # find the trace info for this port's trace by name's prefix | |
131 | for trace_info in trace_info_res: | |
132 | if port.name.startswith(str(trace_info['path'])): | |
133 | range_ns = trace_info['intersection-range-ns'] | |
134 | begin = range_ns['begin'] | |
135 | end = range_ns['end'] | |
136 | break | |
137 | ||
138 | if begin is None or end is None: | |
139 | raise bt2.Error('cannot find stream intersection range for port "{}"'.format(port.name)) | |
140 | ||
141 | name = 'trimmer-{}-{}'.format(port.component.name, port.name) | |
142 | return self._create_trimmer(begin, end, name) | |
143 | ||
144 | def _create_muxer(self): | |
145 | plugin = bt2.find_plugin('utils') | |
146 | ||
147 | if plugin is None: | |
148 | raise bt2.Error('cannot find "utils" plugin (needed for the muxer)') | |
149 | ||
150 | if 'muxer' not in plugin.filter_component_classes: | |
151 | raise bt2.Error('cannot find "muxer" filter component class in "utils" plugin') | |
152 | ||
153 | comp_cls = plugin.filter_component_classes['muxer'] | |
154 | return self._graph.add_component(comp_cls, 'muxer') | |
155 | ||
156 | def _create_trimmer(self, begin, end, name): | |
157 | plugin = bt2.find_plugin('utils') | |
158 | ||
159 | if plugin is None: | |
160 | raise bt2.Error('cannot find "utils" plugin (needed for the trimmer)') | |
161 | ||
162 | if 'trimmer' not in plugin.filter_component_classes: | |
163 | raise bt2.Error('cannot find "trimmer" filter component class in "utils" plugin') | |
164 | ||
165 | params = {} | |
166 | ||
167 | if begin is not None: | |
168 | params['begin'] = begin | |
169 | ||
170 | if end is not None: | |
171 | params['end'] = end | |
172 | ||
173 | comp_cls = plugin.filter_component_classes['trimmer'] | |
174 | return self._graph.add_component(comp_cls, name, params) | |
175 | ||
176 | def _get_unique_src_comp_name(self, comp_spec): | |
177 | name = '{}-{}'.format(comp_spec.plugin_name, | |
178 | comp_spec.component_class_name) | |
179 | ||
180 | if name in [comp_and_spec.comp.name for comp_and_spec in self._src_comps_and_specs]: | |
181 | name += '-{}'.format(self._next_suffix) | |
182 | self._next_suffix += 1 | |
183 | ||
184 | return name | |
185 | ||
186 | def _create_src_comp(self, comp_spec): | |
187 | plugin = bt2.find_plugin(comp_spec.plugin_name) | |
188 | ||
189 | if plugin is None: | |
190 | raise bt2.Error('no such plugin: {}'.format(comp_spec.plugin_name)) | |
191 | ||
192 | if comp_spec.component_class_name not in plugin.source_component_classes: | |
193 | raise bt2.Error('no such source component class in "{}" plugin: {}'.format(comp_spec.plugin_name, | |
194 | comp_spec.component_class_name)) | |
195 | ||
196 | comp_cls = plugin.source_component_classes[comp_spec.component_class_name] | |
197 | name = self._get_unique_src_comp_name(comp_spec) | |
198 | comp = self._graph.add_component(comp_cls, name, comp_spec.params) | |
199 | return comp | |
200 | ||
201 | def _get_free_muxer_input_port(self): | |
202 | for port in self._muxer_comp.input_ports.values(): | |
203 | if not port.is_connected: | |
204 | return port | |
205 | ||
206 | def _connect_src_comp_port(self, port): | |
207 | # if this trace collection iterator is in stream intersection | |
208 | # mode, we need this connection: | |
209 | # | |
210 | # port -> trimmer -> muxer | |
211 | # | |
212 | # otherwise, simply: | |
213 | # | |
214 | # port -> muxer | |
215 | if self._stream_intersection_mode: | |
216 | trimmer_comp = self._create_stream_intersection_trimmer(port) | |
217 | self._graph.connect_ports(port, trimmer_comp.input_ports['in']) | |
218 | port_to_muxer = trimmer_comp.output_ports['out'] | |
219 | else: | |
220 | port_to_muxer = port | |
221 | ||
222 | self._graph.connect_ports(port_to_muxer, self._get_free_muxer_input_port()) | |
223 | ||
224 | def _graph_port_added(self, port): | |
225 | if not self._connect_ports: | |
226 | return | |
227 | ||
228 | if type(port) is bt2._InputPort: | |
229 | return | |
230 | ||
231 | if port.component not in [comp.comp for comp in self._src_comps_and_specs]: | |
232 | # do not care about non-source components (muxer, trimmer, etc.) | |
233 | return | |
234 | ||
235 | self._connect_src_comp_port(port) | |
236 | ||
237 | def _build_graph(self): | |
238 | self._graph = bt2.Graph() | |
239 | self._graph.add_listener(bt2.GraphListenerType.PORT_ADDED, | |
240 | self._graph_port_added) | |
241 | self._muxer_comp = self._create_muxer() | |
242 | ||
243 | if self._begin_ns is not None or self._end_ns is not None: | |
244 | trimmer_comp = self._create_trimmer(self._begin_ns, | |
245 | self._end_ns, 'trimmer') | |
246 | self._graph.connect_ports(self._muxer_comp.output_ports['out'], | |
247 | trimmer_comp.input_ports['in']) | |
248 | notif_iter_port = trimmer_comp.output_ports['out'] | |
249 | else: | |
250 | notif_iter_port = self._muxer_comp.output_ports['out'] | |
251 | ||
252 | # Here we create the components, self._graph_port_added() is | |
253 | # called when they add ports, but the callback returns early | |
254 | # because self._connect_ports is False. This is because the | |
255 | # self._graph_port_added() could not find the associated source | |
256 | # component specification in self._src_comps_and_specs because | |
257 | # it does not exist yet (it needs the created component to | |
258 | # exist). | |
259 | for comp_spec in self._src_comp_specs: | |
260 | comp = self._create_src_comp(comp_spec) | |
261 | self._src_comps_and_specs.append(_SourceComponentAndSpec(comp, comp_spec)) | |
262 | ||
263 | # Now we connect the ports which exist at this point. We allow | |
264 | # self._graph_port_added() to automatically connect _new_ ports. | |
265 | self._connect_ports = True | |
266 | ||
267 | for comp_and_spec in self._src_comps_and_specs: | |
268 | # Keep a separate list because comp_and_spec.output_ports | |
269 | # could change during the connection of one of its ports. | |
270 | # Any new port is handled by self._graph_port_added(). | |
271 | out_ports = [port for port in comp_and_spec.comp.output_ports.values()] | |
272 | ||
273 | for out_port in out_ports: | |
274 | if not out_port.component or out_port.is_connected: | |
275 | continue | |
276 | ||
277 | self._connect_src_comp_port(out_port) | |
278 | ||
279 | # create this trace collection iterator's notification iterator | |
280 | self._notif_iter = notif_iter_port.create_notification_iterator(self._notification_types) |