Remove `skip-string-normalization` in Python formatter config
[babeltrace.git] / src / bindings / python / bt2 / bt2 / message.py
... / ...
CommitLineData
1# SPDX-License-Identifier: MIT
2#
3# Copyright (c) 2017 Philippe Proulx <pproulx@efficios.com>
4
5from bt2 import native_bt, object, utils
6from bt2 import clock_snapshot as bt2_clock_snapshot
7from bt2 import packet as bt2_packet
8from bt2 import stream as bt2_stream
9from bt2 import event as bt2_event
10
11
def _create_from_ptr(ptr):
    """Wrap the native message pointer `ptr` in a Python message object.

    The native message type of `ptr` selects the wrapper class from
    `_MESSAGE_TYPE_TO_CLS`; that class's own `_create_from_ptr()` then
    builds the object.

    Raises `KeyError` if the native type has no entry in
    `_MESSAGE_TYPE_TO_CLS`.
    """
    msg_cls = _MESSAGE_TYPE_TO_CLS[native_bt.message_get_type(ptr)]
    return msg_cls._create_from_ptr(ptr)
15
16
class _MessageConst(object._SharedObject):
    """Base class of all the message wrappers of this module."""

    # Native functions used to acquire/release a message reference.
    _put_ref = staticmethod(native_bt.message_put_ref)
    _get_ref = staticmethod(native_bt.message_get_ref)

    @staticmethod
    def _check_has_default_clock_class(clock_class):
        """Raise `ValueError` if `clock_class` is `None`.

        Callers pass the default clock class of the message's stream
        class before they borrow a default clock snapshot.
        """
        if clock_class is not None:
            return

        raise ValueError(
            "cannot get default clock snapshot: stream class has no default clock class"
        )
27
28
class _Message(_MessageConst):
    """Non-const counterpart of `_MessageConst`; adds nothing by itself."""
31
32
class _MessageWithDefaultClockSnapshot:
    """Mixin for message classes which expose a default clock snapshot.

    It reads `self._ptr`, `self._get_ref` and `self._put_ref`, which the
    class it is mixed into must provide.
    """

    def _get_default_clock_snapshot(self, borrow_clock_snapshot_ptr):
        """Return a `_ClockSnapshotConst` wrapping this message's snapshot.

        `borrow_clock_snapshot_ptr` is called with the message pointer
        and must return the native clock snapshot pointer.
        """
        borrowed_ptr = borrow_clock_snapshot_ptr(self._ptr)
        snapshot_cls = bt2_clock_snapshot._ClockSnapshotConst

        # The message pointer and its reference functions are handed
        # over together with the borrowed snapshot pointer (presumably
        # so that the snapshot keeps its owning message alive; confirm
        # in `bt2.object`).
        return snapshot_cls._create_from_ptr_and_get_ref(
            borrowed_ptr, self._ptr, self._get_ref, self._put_ref
        )
40
41
class _EventMessageConst(_MessageConst, _MessageWithDefaultClockSnapshot):
    """Read-only event message."""

    _borrow_event = staticmethod(native_bt.message_event_borrow_event_const)
    _borrow_default_clock_snapshot = staticmethod(
        native_bt.message_event_borrow_default_clock_snapshot_const
    )

    @property
    def _event_pycls(self):
        # Resolved on access rather than when this class is created.
        return bt2_event._EventConst

    @property
    def event(self):
        """Event contained in this message, wrapped with `_event_pycls`."""
        borrowed_ptr = self._borrow_event(self._ptr)
        assert borrowed_ptr is not None
        event_cls = self._event_pycls
        return event_cls._create_from_ptr_and_get_ref(
            borrowed_ptr, self._ptr, self._get_ref, self._put_ref
        )

    @property
    def default_clock_snapshot(self):
        """Default clock snapshot of this message.

        Raises `ValueError` if the stream class of the event's stream
        has no default clock class.
        """
        stream_cls = self.event.stream.cls
        self._check_has_default_clock_class(stream_cls.default_clock_class)
        borrow_func = self._borrow_default_clock_snapshot
        return self._get_default_clock_snapshot(borrow_func)
61
62
class _EventMessage(_EventMessageConst, _Message):
    """Mutable event message: same API, but wraps non-const objects."""

    _borrow_event = staticmethod(native_bt.message_event_borrow_event)

    @property
    def _event_pycls(self):
        # Resolved on access rather than when this class is created.
        return bt2_event._Event

    @property
    def _stream_pycls(self):
        # Resolved on access rather than when this class is created.
        return bt2_stream._Stream
67
68
class _PacketMessageConst(_MessageConst, _MessageWithDefaultClockSnapshot):
    """Behaviour shared by the read-only packet beginning/end messages.

    Concrete subclasses provide `_borrow_packet` and
    `_borrow_default_clock_snapshot_ptr`.
    """

    _packet_pycls = bt2_packet._PacketConst

    @property
    def packet(self):
        """Packet of this message, wrapped with `_packet_pycls`."""
        borrowed_ptr = self._borrow_packet(self._ptr)
        assert borrowed_ptr is not None
        packet_cls = self._packet_pycls
        return packet_cls._create_from_ptr_and_get_ref(borrowed_ptr)

    @property
    def default_clock_snapshot(self):
        """Default clock snapshot of this message.

        Raises `ValueError` if the stream class of the packet's stream
        has no default clock class.
        """
        stream_cls = self.packet.stream.cls
        self._check_has_default_clock_class(stream_cls.default_clock_class)
        borrow_func = self._borrow_default_clock_snapshot_ptr
        return self._get_default_clock_snapshot(borrow_func)
82
83
class _PacketMessage(_PacketMessageConst, _Message):
    """Mutable packet message: `packet` is wrapped as a `_Packet`."""

    _packet_pycls = bt2_packet._Packet
86
87
class _PacketBeginningMessageConst(_PacketMessageConst):
    """Read-only packet beginning message."""

    # Native accessors used by the inherited `default_clock_snapshot`
    # and `packet` properties.
    _borrow_default_clock_snapshot_ptr = staticmethod(
        native_bt.message_packet_beginning_borrow_default_clock_snapshot_const
    )
    _borrow_packet = staticmethod(
        native_bt.message_packet_beginning_borrow_packet_const
    )
95
96
class _PacketBeginningMessage(_PacketMessage):
    """Mutable packet beginning message."""

    _borrow_packet = staticmethod(native_bt.message_packet_beginning_borrow_packet)

    # This class does not derive from `_PacketBeginningMessageConst`, so
    # it must provide the borrowing function itself: without it, the
    # inherited `default_clock_snapshot` property fails with
    # `AttributeError`.
    _borrow_default_clock_snapshot_ptr = staticmethod(
        native_bt.message_packet_beginning_borrow_default_clock_snapshot_const
    )
99
100
class _PacketEndMessageConst(_PacketMessageConst):
    """Read-only packet end message."""

    # Native accessors used by the inherited `default_clock_snapshot`
    # and `packet` properties.
    _borrow_default_clock_snapshot_ptr = staticmethod(
        native_bt.message_packet_end_borrow_default_clock_snapshot_const
    )
    _borrow_packet = staticmethod(native_bt.message_packet_end_borrow_packet_const)
106
107
class _PacketEndMessage(_PacketMessage):
    """Mutable packet end message."""

    _borrow_packet = staticmethod(native_bt.message_packet_end_borrow_packet)

    # This class does not derive from `_PacketEndMessageConst`, so it
    # must provide the borrowing function itself: without it, the
    # inherited `default_clock_snapshot` property fails with
    # `AttributeError`.
    _borrow_default_clock_snapshot_ptr = staticmethod(
        native_bt.message_packet_end_borrow_default_clock_snapshot_const
    )
110
111
class _StreamMessageConst(_MessageConst, _MessageWithDefaultClockSnapshot):
    """Behaviour shared by the read-only stream beginning/end messages.

    Concrete subclasses provide `_borrow_stream_ptr` and
    `_borrow_default_clock_snapshot_ptr`.
    """

    @property
    def _stream_pycls(self):
        # Resolved on access rather than when this class is created.
        return bt2_stream._StreamConst

    @property
    def stream(self):
        """Stream of this message, wrapped with `_stream_pycls`."""
        borrowed_ptr = self._borrow_stream_ptr(self._ptr)
        assert borrowed_ptr
        stream_cls = self._stream_pycls
        return stream_cls._create_from_ptr_and_get_ref(borrowed_ptr)

    @property
    def default_clock_snapshot(self):
        """Default clock snapshot of this message.

        Returns a `_UnknownClockSnapshot` when the native state is
        "unknown", otherwise a `_ClockSnapshotConst`.

        Raises `ValueError` if the stream class has no default clock
        class.
        """
        self._check_has_default_clock_class(self.stream.cls.default_clock_class)

        # Unlike the other messages, the native accessor returns a
        # (state, pointer) pair here.
        state, borrowed_ptr = self._borrow_default_clock_snapshot_ptr(self._ptr)

        if state != native_bt.MESSAGE_STREAM_CLOCK_SNAPSHOT_STATE_UNKNOWN:
            snapshot_cls = bt2_clock_snapshot._ClockSnapshotConst
            return snapshot_cls._create_from_ptr_and_get_ref(
                borrowed_ptr, self._ptr, self._get_ref, self._put_ref
            )

        return bt2_clock_snapshot._UnknownClockSnapshot()
133
134
class _StreamMessage(_StreamMessageConst, _Message):
    """Behaviour shared by the mutable stream beginning/end messages.

    Concrete subclasses provide `_set_default_clock_snapshot`.
    """

    # `_default_clock_snapshot` is a property: reading it runs the
    # getter of `_StreamMessageConst.default_clock_snapshot`, assigning
    # to it runs the function below.
    @_StreamMessageConst.default_clock_snapshot.setter
    def _default_clock_snapshot(self, raw_value):
        # `raw_value` must pass the unsigned 64-bit integer check.
        utils._check_uint64(raw_value)
        self._set_default_clock_snapshot(self._ptr, raw_value)

    @property
    def _stream_pycls(self):
        # Resolved on access rather than when this class is created.
        return bt2_stream._Stream
145
146
class _StreamBeginningMessageConst(_StreamMessageConst):
    """Read-only stream beginning message."""

    # Native accessors used by the inherited `default_clock_snapshot`
    # and `stream` properties.
    _borrow_default_clock_snapshot_ptr = staticmethod(
        native_bt.message_stream_beginning_borrow_default_clock_snapshot_const
    )
    _borrow_stream_ptr = staticmethod(
        native_bt.message_stream_beginning_borrow_stream_const
    )
154
155
class _StreamBeginningMessage(_StreamMessage):
    """Mutable stream beginning message."""

    _borrow_stream_ptr = staticmethod(native_bt.message_stream_beginning_borrow_stream)
    _set_default_clock_snapshot = staticmethod(
        native_bt.message_stream_beginning_set_default_clock_snapshot
    )

    # This class does not derive from `_StreamBeginningMessageConst`, so
    # it must provide the borrowing function itself: without it, reading
    # `default_clock_snapshot` (or `_default_clock_snapshot`) fails with
    # `AttributeError`.
    _borrow_default_clock_snapshot_ptr = staticmethod(
        native_bt.message_stream_beginning_borrow_default_clock_snapshot_const
    )
161
162
class _StreamEndMessageConst(_StreamMessageConst):
    """Read-only stream end message."""

    # Native accessors used by the inherited `default_clock_snapshot`
    # and `stream` properties.
    _borrow_default_clock_snapshot_ptr = staticmethod(
        native_bt.message_stream_end_borrow_default_clock_snapshot_const
    )
    _borrow_stream_ptr = staticmethod(native_bt.message_stream_end_borrow_stream_const)
168
169
class _StreamEndMessage(_StreamMessage):
    """Mutable stream end message."""

    _borrow_stream_ptr = staticmethod(native_bt.message_stream_end_borrow_stream)
    _set_default_clock_snapshot = staticmethod(
        native_bt.message_stream_end_set_default_clock_snapshot
    )

    # This class does not derive from `_StreamEndMessageConst`, so it
    # must provide the borrowing function itself: without it, reading
    # `default_clock_snapshot` (or `_default_clock_snapshot`) fails with
    # `AttributeError`.
    _borrow_default_clock_snapshot_ptr = staticmethod(
        native_bt.message_stream_end_borrow_default_clock_snapshot_const
    )
175
176
class _MessageIteratorInactivityMessageConst(
    _MessageConst, _MessageWithDefaultClockSnapshot
):
    """Read-only message iterator inactivity message."""

    _borrow_clock_snapshot_ptr = staticmethod(
        native_bt.message_message_iterator_inactivity_borrow_clock_snapshot_const
    )

    @property
    def clock_snapshot(self):
        """Clock snapshot of this message."""
        # No self._check_has_default_clock_class() call is needed
        # here: this kind of message always has a clock class.
        borrow_func = self._borrow_clock_snapshot_ptr
        return self._get_default_clock_snapshot(borrow_func)
189
190
class _MessageIteratorInactivityMessage(
    _MessageIteratorInactivityMessageConst, _Message
):
    """Mutable message iterator inactivity message; adds nothing by itself."""
195
196
class _DiscardedMessageConst(_MessageConst, _MessageWithDefaultClockSnapshot):
    """Behaviour shared by the read-only discarded events/packets messages.

    Concrete subclasses provide `_borrow_stream_ptr`, `_get_count`,
    `_borrow_beginning_clock_snapshot_ptr`,
    `_borrow_end_clock_snapshot_ptr` and `_has_default_clock_snapshots`.
    """

    _stream_pycls = property(lambda _: bt2_stream._StreamConst)

    @property
    def stream(self):
        """Stream of this message, wrapped with `_stream_pycls`."""
        stream_ptr = self._borrow_stream_ptr(self._ptr)
        assert stream_ptr
        return self._stream_pycls._create_from_ptr_and_get_ref(stream_ptr)

    @property
    def count(self):
        """Number of discarded items, or `None` if it is not available."""
        avail, count = self._get_count(self._ptr)

        # Compare by value: an identity test (`is`) on integers only
        # works when the interpreter happens to share the two objects.
        if avail == native_bt.PROPERTY_AVAILABILITY_AVAILABLE:
            return count

        return None

    def _check_has_default_clock_snapshots(self):
        """Raise `ValueError` if `_has_default_clock_snapshots` is false."""
        if not self._has_default_clock_snapshots:
            raise ValueError(
                "cannot get default clock snapshot: such a message has no clock snapshots for this stream class"
            )

    @property
    def beginning_default_clock_snapshot(self):
        """Beginning default clock snapshot of this message.

        Raises `ValueError` if `_has_default_clock_snapshots` is false.
        """
        self._check_has_default_clock_snapshots()
        return self._get_default_clock_snapshot(
            self._borrow_beginning_clock_snapshot_ptr
        )

    @property
    def end_default_clock_snapshot(self):
        """End default clock snapshot of this message.

        Raises `ValueError` if `_has_default_clock_snapshots` is false.
        """
        self._check_has_default_clock_snapshots()
        return self._get_default_clock_snapshot(self._borrow_end_clock_snapshot_ptr)
229
230
class _DiscardedMessage(_DiscardedMessageConst, _Message):
    """Behaviour shared by the mutable discarded events/packets messages.

    Concrete subclasses provide `_item_name` and override `_set_count`
    with the native setter.
    """

    @property
    def _stream_pycls(self):
        # Resolved on access rather than when this class is created.
        return bt2_stream._Stream

    def _set_count(self, count):
        # Validating setter behind the `_count` property. The
        # `self._set_count` looked up below is expected to be the native
        # setter which a concrete subclass binds under the same name.
        utils._check_uint64(count)

        if count != 0:
            self._set_count(self._ptr, count)
            return

        raise ValueError("discarded {} count is 0".format(self._item_name))

    # Same getter as the read-only `count` property, plus the validating
    # setter above (the property captures the function itself, so
    # subclass overrides of `_set_count` do not replace it).
    _count = _DiscardedMessageConst.count.setter(_set_count)
243
244
class _DiscardedEventsMessageConst(_DiscardedMessageConst):
    """Read-only discarded events message."""

    # Native accessors used by the inherited properties.
    _get_count = staticmethod(native_bt.message_discarded_events_get_count)
    _borrow_stream_ptr = staticmethod(
        native_bt.message_discarded_events_borrow_stream_const
    )
    _borrow_end_clock_snapshot_ptr = staticmethod(
        native_bt.message_discarded_events_borrow_end_default_clock_snapshot_const
    )
    _borrow_beginning_clock_snapshot_ptr = staticmethod(
        native_bt.message_discarded_events_borrow_beginning_default_clock_snapshot_const
    )

    @property
    def _has_default_clock_snapshots(self):
        # The stream class tells whether discarded events messages
        # carry default clock snapshots.
        stream_cls = self.stream.cls
        return stream_cls.discarded_events_have_default_clock_snapshots
260
261
class _DiscardedEventsMessage(_DiscardedEventsMessageConst, _DiscardedMessage):
    """Mutable discarded events message."""

    # Inserted in the "count is 0" error message of `_DiscardedMessage`.
    _item_name = "event"

    _set_count = staticmethod(native_bt.message_discarded_events_set_count)
    _borrow_stream_ptr = staticmethod(native_bt.message_discarded_events_borrow_stream)
266
267
class _DiscardedPacketsMessageConst(_DiscardedMessageConst):
    """Read-only discarded packets message."""

    # Native accessors used by the inherited properties.
    _get_count = staticmethod(native_bt.message_discarded_packets_get_count)
    _borrow_stream_ptr = staticmethod(
        native_bt.message_discarded_packets_borrow_stream_const
    )
    _borrow_end_clock_snapshot_ptr = staticmethod(
        native_bt.message_discarded_packets_borrow_end_default_clock_snapshot_const
    )
    _borrow_beginning_clock_snapshot_ptr = staticmethod(
        native_bt.message_discarded_packets_borrow_beginning_default_clock_snapshot_const
    )

    @property
    def _has_default_clock_snapshots(self):
        # The stream class tells whether discarded packets messages
        # carry default clock snapshots.
        stream_cls = self.stream.cls
        return stream_cls.discarded_packets_have_default_clock_snapshots
283
284
class _DiscardedPacketsMessage(_DiscardedPacketsMessageConst, _DiscardedMessage):
    """Mutable discarded packets message."""

    # Inserted in the "count is 0" error message of `_DiscardedMessage`.
    _item_name = "packet"

    _set_count = staticmethod(native_bt.message_discarded_packets_set_count)
    _borrow_stream_ptr = staticmethod(native_bt.message_discarded_packets_borrow_stream)
289
290
# Native message type -> mutable (non-const) message class.
#
# This table has its own name so that it is not silently overwritten by
# the const table below, which is also bound at module level.
_MESSAGE_TYPE_TO_NON_CONST_CLS = {
    native_bt.MESSAGE_TYPE_EVENT: _EventMessage,
    native_bt.MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY: _MessageIteratorInactivityMessage,
    native_bt.MESSAGE_TYPE_STREAM_BEGINNING: _StreamBeginningMessage,
    native_bt.MESSAGE_TYPE_STREAM_END: _StreamEndMessage,
    native_bt.MESSAGE_TYPE_PACKET_BEGINNING: _PacketBeginningMessage,
    native_bt.MESSAGE_TYPE_PACKET_END: _PacketEndMessage,
    native_bt.MESSAGE_TYPE_DISCARDED_EVENTS: _DiscardedEventsMessage,
    native_bt.MESSAGE_TYPE_DISCARDED_PACKETS: _DiscardedPacketsMessage,
}

# Native message type -> const message class.
#
# This is the table which the module-level _create_from_ptr() reads.
_MESSAGE_TYPE_TO_CLS = {
    native_bt.MESSAGE_TYPE_EVENT: _EventMessageConst,
    native_bt.MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY: _MessageIteratorInactivityMessageConst,
    native_bt.MESSAGE_TYPE_STREAM_BEGINNING: _StreamBeginningMessageConst,
    native_bt.MESSAGE_TYPE_STREAM_END: _StreamEndMessageConst,
    native_bt.MESSAGE_TYPE_PACKET_BEGINNING: _PacketBeginningMessageConst,
    native_bt.MESSAGE_TYPE_PACKET_END: _PacketEndMessageConst,
    native_bt.MESSAGE_TYPE_DISCARDED_EVENTS: _DiscardedEventsMessageConst,
    native_bt.MESSAGE_TYPE_DISCARDED_PACKETS: _DiscardedPacketsMessageConst,
}
This page took 0.023743 seconds and 4 git commands to generate.