e2efa732baba9729f2635456341d11121717b009
[babeltrace.git] / src / bindings / python / bt2 / bt2 / message_iterator.py
1 # SPDX-License-Identifier: MIT
2 #
3 # Copyright (c) 2017 Philippe Proulx <pproulx@efficios.com>
4
5 from bt2 import native_bt, object, utils
6 from bt2 import message as bt2_message
7 import collections.abc
8 from bt2 import stream as bt2_stream
9 from bt2 import event_class as bt2_event_class
10 from bt2 import packet as bt2_packet
11 from bt2 import port as bt2_port
12 from bt2 import clock_class as bt2_clock_class
13 import bt2
14
15
16 class _MessageIterator(collections.abc.Iterator):
17 def __next__(self):
18 raise NotImplementedError
19
20
class _UserComponentInputPortMessageIterator(object._SharedObject, _MessageIterator):
    # Message iterator created on a connected input port (this is what
    # _UserMessageIterator._create_message_iterator() returns).
    #
    # Messages are fetched from the native side in batches
    # (self._current_msgs) and handed out one at a time; self._at is the
    # index of the next message of the current batch to return.
    _get_ref = staticmethod(native_bt.message_iterator_get_ref)
    _put_ref = staticmethod(native_bt.message_iterator_put_ref)

    def __init__(self, ptr):
        # Set up the (empty) batch state before delegating `ptr` to the
        # shared object base class.
        self._current_msgs = []
        self._at = 0
        super().__init__(ptr)

    def __next__(self):
        # Return the next message, asking the native side for a new
        # batch when the current one is exhausted.
        #
        # Any non-success status is delegated to
        # utils._handle_func_status() (presumably raising the matching
        # bt2 exception -- confirm in utils).
        if len(self._current_msgs) == self._at:
            status, msgs = native_bt.bt2_self_component_port_input_get_msg_range(
                self._ptr
            )
            utils._handle_func_status(
                status, "unexpected error: cannot advance the message iterator"
            )
            self._current_msgs = msgs
            self._at = 0

        msg_ptr = self._current_msgs[self._at]
        self._at += 1

        return bt2_message._create_from_ptr(msg_ptr)

    def can_seek_beginning(self):
        # Return True if the native side reports that this iterator can
        # seek its beginning.
        (status, res) = native_bt.message_iterator_can_seek_beginning(self._ptr)
        utils._handle_func_status(
            status,
            "cannot check whether or not message iterator can seek its beginning",
        )
        return res != 0

    def seek_beginning(self):
        # Forget about buffered messages, they won't be valid after seeking.
        #
        # NOTE(review): the dropped entries are raw message pointers;
        # confirm whether their references need to be put here.
        self._current_msgs.clear()
        self._at = 0

        status = native_bt.message_iterator_seek_beginning(self._ptr)
        utils._handle_func_status(status, "cannot seek message iterator beginning")

    def can_seek_ns_from_origin(self, ns_from_origin):
        # Return True if the native side reports that this iterator can
        # seek to `ns_from_origin` (validated as a signed 64-bit
        # integer).
        utils._check_int64(ns_from_origin)
        (status, res) = native_bt.message_iterator_can_seek_ns_from_origin(
            self._ptr, ns_from_origin
        )
        utils._handle_func_status(
            status,
            "cannot check whether or not message iterator can seek given ns from origin",
        )
        return res != 0

    def seek_ns_from_origin(self, ns_from_origin):
        # Seek to `ns_from_origin` (validated as a signed 64-bit
        # integer).
        utils._check_int64(ns_from_origin)

        # Forget about buffered messages, they won't be valid after seeking.
        #
        # NOTE(review): the dropped entries are raw message pointers;
        # confirm whether their references need to be put here.
        self._current_msgs.clear()
        self._at = 0

        status = native_bt.message_iterator_seek_ns_from_origin(
            self._ptr, ns_from_origin
        )
        utils._handle_func_status(
            status, "message iterator cannot seek given ns from origin"
        )

    @property
    def can_seek_forward(self):
        # Normalize the native value to a Python `bool`, like the
        # results of can_seek_beginning() and can_seek_ns_from_origin().
        return bool(native_bt.message_iterator_can_seek_forward(self._ptr))
90
91
92 class _MessageIteratorConfiguration:
93 def __init__(self, ptr):
94 self._ptr = ptr
95
96 def can_seek_forward(self, value):
97 utils._check_bool(value)
98 native_bt.self_message_iterator_configuration_set_can_seek_forward(
99 self._ptr, value
100 )
101
102 can_seek_forward = property(fset=can_seek_forward)
103
104
105 # This is extended by the user to implement component classes in Python. It
106 # is created for a given output port when an input port message iterator is
107 # created on the input port on the other side of the connection.
108 #
109 # Its purpose is to feed the messages that should go out through this output
110 # port.
class _UserMessageIterator(_MessageIterator):
    def __new__(cls, ptr):
        # User iterator objects are always created by the native side,
        # that is, never instantiated directly by Python code.
        #
        # The native code calls this, then manually calls
        # self.__init__() without the `ptr` argument. The user has
        # access to self._component during this call, thanks to the
        # self._bt_ptr attribute being set here.
        #
        # self._bt_ptr is NOT owned by this object here, so there's nothing
        # to do in __del__().
        self = super().__new__(cls)
        self._bt_ptr = ptr
        return self

    def _bt_init_from_native(self, config_ptr, self_output_port_ptr):
        # Wrap the two raw pointers into Python objects, then run the
        # (possibly user-overridden) __init__() with them.
        self_output_port = bt2_port._create_self_from_ptr_and_get_ref(
            self_output_port_ptr, native_bt.PORT_TYPE_OUTPUT
        )
        config = _MessageIteratorConfiguration(config_ptr)
        self.__init__(config, self_output_port)

    def __init__(self, config, self_output_port):
        # Default initialization: nothing to do. Subclasses override
        # this; `config` is a _MessageIteratorConfiguration and
        # `self_output_port` is the output port wrapper built by
        # _bt_init_from_native().
        pass

    @property
    def _component(self):
        # User component object associated with this message iterator,
        # as looked up by the native side from self._bt_ptr.
        return native_bt.bt2_get_user_component_from_user_msg_iter(self._bt_ptr)

    @property
    def _port(self):
        # Output port of this message iterator, wrapped with a new
        # reference (the native call only borrows the pointer).
        port_ptr = native_bt.self_message_iterator_borrow_port(self._bt_ptr)
        assert port_ptr is not None
        return bt2_port._create_self_from_ptr_and_get_ref(
            port_ptr, native_bt.PORT_TYPE_OUTPUT
        )

    @property
    def addr(self):
        # Integer value of the native message iterator pointer.
        return int(self._bt_ptr)

    @property
    def _is_interrupted(self):
        return bool(native_bt.self_message_iterator_is_interrupted(self._bt_ptr))

    def _user_finalize(self):
        # Finalization hook for subclasses; does nothing by default.
        pass

    def __next__(self):
        # Default implementation: no message to provide. Subclasses
        # override this to return the next message.
        raise bt2.Stop

    def _bt_next_from_native(self):
        # Get the next message from the user's __next__() and return
        # the address of its native object as an `int`.
        #
        # This can raise anything: it's caught by the native part.
        try:
            msg = next(self)
        except StopIteration:
            # Report a plain StopIteration as the bt2 "stop" condition.
            raise bt2.Stop

        utils._check_type(msg, bt2_message._MessageConst)

        # The reference we return will be given to the message array.
        # However, the `msg` Python object may stay alive, if the user has kept
        # a reference to it. Acquire a new reference to account for that.
        msg._get_ref(msg._ptr)
        return int(msg._ptr)

    def _bt_can_seek_beginning_from_native(self):
        # Here, we mimic the behavior of the C API:
        #
        # - If the iterator has a _user_can_seek_beginning method,
        #   call it and use its return value.
        # - Otherwise, the presence or absence of a `_user_seek_beginning`
        #   method indicates whether the iterator can seek beginning.
        if hasattr(self, "_user_can_seek_beginning"):
            can_seek_beginning = self._user_can_seek_beginning()
            utils._check_bool(can_seek_beginning)
            return can_seek_beginning
        else:
            return hasattr(self, "_user_seek_beginning")

    def _bt_seek_beginning_from_native(self):
        # Delegate to the user-provided seeking method.
        self._user_seek_beginning()

    def _bt_can_seek_ns_from_origin_from_native(self, ns_from_origin):
        # Return whether the iterator can seek ns from origin using the
        # user-implemented seek_ns_from_origin method. We mimic the behavior
        # of the C API:
        #
        # - If the iterator has a _user_can_seek_ns_from_origin method,
        #   call it and use its return value.
        # - Otherwise, if there is a `_user_seek_ns_from_origin` method,
        #   we presume it's possible.
        if hasattr(self, "_user_can_seek_ns_from_origin"):
            can_seek_ns_from_origin = self._user_can_seek_ns_from_origin(ns_from_origin)
            utils._check_bool(can_seek_ns_from_origin)
            return can_seek_ns_from_origin
        else:
            return hasattr(self, "_user_seek_ns_from_origin")

    def _bt_seek_ns_from_origin_from_native(self, ns_from_origin):
        # Delegate to the user-provided seeking method.
        self._user_seek_ns_from_origin(ns_from_origin)

    def _create_message_iterator(self, input_port):
        # Create a message iterator on `input_port`, which must be a
        # connected bt2_port._UserComponentInputPort.
        #
        # Raises ValueError if the port is not connected.
        utils._check_type(input_port, bt2_port._UserComponentInputPort)

        if not input_port.is_connected:
            raise ValueError("input port is not connected")

        (
            status,
            msg_iter_ptr,
        ) = native_bt.bt2_message_iterator_create_from_message_iterator(
            self._bt_ptr, input_port._ptr
        )
        utils._handle_func_status(status, "cannot create message iterator object")
        assert msg_iter_ptr is not None

        return _UserComponentInputPortMessageIterator(msg_iter_ptr)

    def _create_event_message(self, event_class, parent, default_clock_snapshot=None):
        # Create an event message for `event_class`.
        #
        # `parent` must be a packet when the stream class of
        # `event_class` supports packets, and a stream otherwise.
        #
        # `default_clock_snapshot` (unsigned 64-bit integer) must be
        # given if, and only if, the stream class has a default clock
        # class; ValueError is raised otherwise.
        #
        # Raises bt2._MemoryError if the native object cannot be
        # created.
        utils._check_type(event_class, bt2_event_class._EventClass)

        # Look the stream class up once: it drives every choice below.
        stream_class = event_class.stream_class
        supports_packets = stream_class.supports_packets

        if supports_packets:
            utils._check_type(parent, bt2_packet._Packet)
        else:
            utils._check_type(parent, bt2_stream._Stream)

        if default_clock_snapshot is not None:
            if stream_class.default_clock_class is None:
                raise ValueError(
                    "event messages in this stream must not have a default clock snapshot"
                )

            utils._check_uint64(default_clock_snapshot)

            if supports_packets:
                ptr = native_bt.message_event_create_with_packet_and_default_clock_snapshot(
                    self._bt_ptr, event_class._ptr, parent._ptr, default_clock_snapshot
                )
            else:
                ptr = native_bt.message_event_create_with_default_clock_snapshot(
                    self._bt_ptr, event_class._ptr, parent._ptr, default_clock_snapshot
                )
        else:
            if stream_class.default_clock_class is not None:
                raise ValueError(
                    "event messages in this stream must have a default clock snapshot"
                )

            if supports_packets:
                ptr = native_bt.message_event_create_with_packet(
                    self._bt_ptr, event_class._ptr, parent._ptr
                )
            else:
                ptr = native_bt.message_event_create(
                    self._bt_ptr, event_class._ptr, parent._ptr
                )

        if ptr is None:
            raise bt2._MemoryError("cannot create event message object")

        return bt2_message._EventMessage(ptr)

    def _create_message_iterator_inactivity_message(self, clock_class, clock_snapshot):
        # Create a message iterator inactivity message for
        # `clock_class` with the clock snapshot value `clock_snapshot`.
        #
        # Raises bt2._MemoryError if the native object cannot be
        # created.
        utils._check_type(clock_class, bt2_clock_class._ClockClass)

        # Validate the raw value like the other message creation
        # methods do before handing it to the native side.
        utils._check_uint64(clock_snapshot)

        ptr = native_bt.message_message_iterator_inactivity_create(
            self._bt_ptr, clock_class._ptr, clock_snapshot
        )

        if ptr is None:
            raise bt2._MemoryError("cannot create inactivity message object")

        return bt2_message._MessageIteratorInactivityMessage(ptr)

    def _create_stream_beginning_message(self, stream, default_clock_snapshot=None):
        # Create a stream beginning message for `stream`, optionally
        # setting its default clock snapshot.
        #
        # Raises bt2._MemoryError if the native object cannot be
        # created.
        utils._check_type(stream, bt2_stream._Stream)

        ptr = native_bt.message_stream_beginning_create(self._bt_ptr, stream._ptr)
        if ptr is None:
            raise bt2._MemoryError("cannot create stream beginning message object")

        msg = bt2_message._StreamBeginningMessage(ptr)

        if default_clock_snapshot is not None:
            # The value is applied through the message's own setter.
            msg._default_clock_snapshot = default_clock_snapshot

        return msg

    def _create_stream_end_message(self, stream, default_clock_snapshot=None):
        # Create a stream end message for `stream`, optionally setting
        # its default clock snapshot.
        #
        # Raises bt2._MemoryError if the native object cannot be
        # created.
        utils._check_type(stream, bt2_stream._Stream)

        ptr = native_bt.message_stream_end_create(self._bt_ptr, stream._ptr)
        if ptr is None:
            raise bt2._MemoryError("cannot create stream end message object")

        msg = bt2_message._StreamEndMessage(ptr)

        if default_clock_snapshot is not None:
            # The value is applied through the message's own setter.
            msg._default_clock_snapshot = default_clock_snapshot

        return msg

    def _create_packet_beginning_message(self, packet, default_clock_snapshot=None):
        # Create a packet beginning message for `packet`.
        #
        # `default_clock_snapshot` (unsigned 64-bit integer) must be
        # given if, and only if, the stream class of the packet says
        # that packets have a beginning default clock snapshot;
        # ValueError is raised otherwise.
        #
        # Raises bt2._MemoryError if the native object cannot be
        # created.
        utils._check_type(packet, bt2_packet._Packet)

        if packet.stream.cls.packets_have_beginning_default_clock_snapshot:
            if default_clock_snapshot is None:
                raise ValueError(
                    "packet beginning messages in this stream must have a default clock snapshot"
                )

            utils._check_uint64(default_clock_snapshot)
            ptr = native_bt.message_packet_beginning_create_with_default_clock_snapshot(
                self._bt_ptr, packet._ptr, default_clock_snapshot
            )
        else:
            if default_clock_snapshot is not None:
                raise ValueError(
                    "packet beginning messages in this stream must not have a default clock snapshot"
                )

            ptr = native_bt.message_packet_beginning_create(self._bt_ptr, packet._ptr)

        if ptr is None:
            raise bt2._MemoryError("cannot create packet beginning message object")

        return bt2_message._PacketBeginningMessage(ptr)

    def _create_packet_end_message(self, packet, default_clock_snapshot=None):
        # Create a packet end message for `packet`.
        #
        # `default_clock_snapshot` (unsigned 64-bit integer) must be
        # given if, and only if, the stream class of the packet says
        # that packets have an end default clock snapshot; ValueError
        # is raised otherwise.
        #
        # Raises bt2._MemoryError if the native object cannot be
        # created.
        utils._check_type(packet, bt2_packet._Packet)

        if packet.stream.cls.packets_have_end_default_clock_snapshot:
            if default_clock_snapshot is None:
                raise ValueError(
                    "packet end messages in this stream must have a default clock snapshot"
                )

            utils._check_uint64(default_clock_snapshot)
            ptr = native_bt.message_packet_end_create_with_default_clock_snapshot(
                self._bt_ptr, packet._ptr, default_clock_snapshot
            )
        else:
            if default_clock_snapshot is not None:
                raise ValueError(
                    "packet end messages in this stream must not have a default clock snapshot"
                )

            ptr = native_bt.message_packet_end_create(self._bt_ptr, packet._ptr)

        if ptr is None:
            raise bt2._MemoryError("cannot create packet end message object")

        return bt2_message._PacketEndMessage(ptr)

    def _create_discarded_events_message(
        self, stream, count=None, beg_clock_snapshot=None, end_clock_snapshot=None
    ):
        # Create a discarded events message for `stream`.
        #
        # The stream class must support discarded events. Both clock
        # snapshots (unsigned 64-bit integers, beginning <= end) must be
        # given if, and only if, the stream class says that discarded
        # events have default clock snapshots. ValueError is raised when
        # one of those conditions is not satisfied.
        #
        # `count`, when given, is set on the created message.
        #
        # Raises bt2._MemoryError if the native object cannot be
        # created.
        utils._check_type(stream, bt2_stream._Stream)

        if not stream.cls.supports_discarded_events:
            raise ValueError("stream class does not support discarded events")

        if stream.cls.discarded_events_have_default_clock_snapshots:
            if beg_clock_snapshot is None or end_clock_snapshot is None:
                raise ValueError(
                    "discarded events have default clock snapshots for this stream class"
                )

            utils._check_uint64(beg_clock_snapshot)
            utils._check_uint64(end_clock_snapshot)

            if beg_clock_snapshot > end_clock_snapshot:
                raise ValueError(
                    "beginning default clock snapshot value ({}) is greater than end default clock snapshot value ({})".format(
                        beg_clock_snapshot, end_clock_snapshot
                    )
                )

            ptr = (
                native_bt.message_discarded_events_create_with_default_clock_snapshots(
                    self._bt_ptr, stream._ptr, beg_clock_snapshot, end_clock_snapshot
                )
            )
        else:
            if beg_clock_snapshot is not None or end_clock_snapshot is not None:
                raise ValueError(
                    "discarded events have no default clock snapshots for this stream class"
                )

            ptr = native_bt.message_discarded_events_create(self._bt_ptr, stream._ptr)

        if ptr is None:
            raise bt2._MemoryError("cannot create discarded events message object")

        msg = bt2_message._DiscardedEventsMessage(ptr)

        if count is not None:
            # The value is applied through the message's own setter.
            msg._count = count

        return msg

    def _create_discarded_packets_message(
        self, stream, count=None, beg_clock_snapshot=None, end_clock_snapshot=None
    ):
        # Create a discarded packets message for `stream`.
        #
        # The stream class must support discarded packets. Both clock
        # snapshots (unsigned 64-bit integers, beginning <= end) must be
        # given if, and only if, the stream class says that discarded
        # packets have default clock snapshots. ValueError is raised
        # when one of those conditions is not satisfied.
        #
        # `count`, when given, is set on the created message.
        #
        # Raises bt2._MemoryError if the native object cannot be
        # created.
        utils._check_type(stream, bt2_stream._Stream)

        if not stream.cls.supports_discarded_packets:
            raise ValueError("stream class does not support discarded packets")

        if stream.cls.discarded_packets_have_default_clock_snapshots:
            if beg_clock_snapshot is None or end_clock_snapshot is None:
                raise ValueError(
                    "discarded packets have default clock snapshots for this stream class"
                )

            utils._check_uint64(beg_clock_snapshot)
            utils._check_uint64(end_clock_snapshot)

            if beg_clock_snapshot > end_clock_snapshot:
                raise ValueError(
                    "beginning default clock snapshot value ({}) is greater than end default clock snapshot value ({})".format(
                        beg_clock_snapshot, end_clock_snapshot
                    )
                )

            ptr = (
                native_bt.message_discarded_packets_create_with_default_clock_snapshots(
                    self._bt_ptr, stream._ptr, beg_clock_snapshot, end_clock_snapshot
                )
            )
        else:
            if beg_clock_snapshot is not None or end_clock_snapshot is not None:
                raise ValueError(
                    "discarded packets have no default clock snapshots for this stream class"
                )

            ptr = native_bt.message_discarded_packets_create(self._bt_ptr, stream._ptr)

        if ptr is None:
            raise bt2._MemoryError("cannot create discarded packets message object")

        msg = bt2_message._DiscardedPacketsMessage(ptr)

        if count is not None:
            # The value is applied through the message's own setter.
            msg._count = count

        return msg
This page took 0.049956 seconds and 4 git commands to generate.