bt2c::Logger: remove unused cLevel() method
[babeltrace.git] / src / bindings / python / bt2 / bt2 / message.py
1 # SPDX-License-Identifier: MIT
2 #
3 # Copyright (c) 2017 Philippe Proulx <pproulx@efficios.com>
4
5 from bt2 import event as bt2_event
6 from bt2 import utils as bt2_utils
7 from bt2 import object as bt2_object
8 from bt2 import packet as bt2_packet
9 from bt2 import stream as bt2_stream
10 from bt2 import native_bt
11 from bt2 import clock_snapshot as bt2_clock_snapshot
12
13
def _create_from_ptr(ptr):
    """Wrap the native message pointer `ptr` in the matching message class.

    The native message type of `ptr` selects a class in
    `_MESSAGE_TYPE_TO_CLS`; that class's `_create_from_ptr()` builds the
    returned object.
    """
    native_type = native_bt.message_get_type(ptr)
    message_cls = _MESSAGE_TYPE_TO_CLS[native_type]
    return message_cls._create_from_ptr(ptr)
17
18
class _MessageConst(bt2_object._SharedObject):
    """Common base of the message classes defined in this module."""

    @staticmethod
    def _get_ref(ptr):
        # Forwards to the native `message_get_ref()`; its result is not
        # returned.
        native_bt.message_get_ref(ptr)

    @staticmethod
    def _put_ref(ptr):
        # Forwards to the native `message_put_ref()`; its result is not
        # returned.
        native_bt.message_put_ref(ptr)

    @staticmethod
    def _check_has_default_clock_class(clock_class):
        """Raise `ValueError` if `clock_class` is `None`."""
        if clock_class is not None:
            return

        raise ValueError(
            "cannot get default clock snapshot: stream class has no default clock class"
        )
34
35
class _Message(_MessageConst):
    """Base of the non-const message classes; adds nothing to `_MessageConst`."""
38
39
class _MessageWithDefaultClockSnapshot:
    """Mixin providing a helper to wrap a borrowed clock snapshot pointer.

    Relies on the host class for `_ptr`, `_get_ref` and `_put_ref`.
    """

    def _get_default_clock_snapshot(self, borrow_clock_snapshot_ptr):
        """Return a `_ClockSnapshotConst` for this message.

        `borrow_clock_snapshot_ptr` is called with `self._ptr` and must
        return the native clock snapshot pointer to wrap.
        """
        borrowed_ptr = borrow_clock_snapshot_ptr(self._ptr)
        snapshot_cls = bt2_clock_snapshot._ClockSnapshotConst

        # The message pointer and its ref functions are handed over along
        # with the borrowed snapshot pointer.
        return snapshot_cls._create_from_ptr_and_get_ref(
            borrowed_ptr, self._ptr, self._get_ref, self._put_ref
        )
47
48
class _EventMessageConst(_MessageConst, _MessageWithDefaultClockSnapshot):
    """Event message exposing its event and default clock snapshot."""

    _borrow_default_clock_snapshot = staticmethod(
        native_bt.message_event_borrow_default_clock_snapshot_const
    )
    _borrow_event = staticmethod(native_bt.message_event_borrow_event_const)

    # A property, so `bt2_event._EventConst` is looked up on access rather
    # than while this class body executes.
    _event_pycls = property(lambda _: bt2_event._EventConst)

    @property
    def event(self):
        """Event object wrapping the event borrowed from this message."""
        borrowed_event_ptr = self._borrow_event(self._ptr)
        assert borrowed_event_ptr is not None
        event_cls = self._event_pycls
        return event_cls._create_from_ptr_and_get_ref(
            borrowed_event_ptr, self._ptr, self._get_ref, self._put_ref
        )

    @property
    def default_clock_snapshot(self):
        """Default clock snapshot of this message.

        Raises `ValueError` when the class of the event's stream has no
        default clock class.
        """
        stream_cls = self.event.stream.cls
        self._check_has_default_clock_class(stream_cls.default_clock_class)
        return self._get_default_clock_snapshot(self._borrow_default_clock_snapshot)
68
69
class _EventMessage(_EventMessageConst, _Message):
    """Event message using the non-const native event borrow function."""

    # Overrides of the class-level hooks declared by `_EventMessageConst`.
    _event_pycls = property(lambda _: bt2_event._Event)
    _stream_pycls = property(lambda _: bt2_stream._Stream)
    _borrow_event = staticmethod(native_bt.message_event_borrow_event)
74
75
class _PacketMessageConst(_MessageConst, _MessageWithDefaultClockSnapshot):
    """Shared logic of the packet beginning and packet end messages.

    Subclasses are expected to provide `_borrow_packet` and
    `_borrow_default_clock_snapshot_ptr`.
    """

    _packet_pycls = bt2_packet._PacketConst

    @property
    def packet(self):
        """Packet object wrapping the packet borrowed from this message."""
        borrowed_packet_ptr = self._borrow_packet(self._ptr)
        assert borrowed_packet_ptr is not None
        packet_cls = self._packet_pycls
        return packet_cls._create_from_ptr_and_get_ref(borrowed_packet_ptr)

    @property
    def default_clock_snapshot(self):
        """Default clock snapshot of this message.

        Raises `ValueError` when the class of the packet's stream has no
        default clock class.
        """
        stream_cls = self.packet.stream.cls
        self._check_has_default_clock_class(stream_cls.default_clock_class)
        return self._get_default_clock_snapshot(self._borrow_default_clock_snapshot_ptr)
89
90
class _PacketMessage(_PacketMessageConst, _Message):
    """Packet message whose `packet` property yields `bt2_packet._Packet` objects."""

    _packet_pycls = bt2_packet._Packet
93
94
class _PacketBeginningMessageConst(_PacketMessageConst):
    """Packet beginning message: binds the native borrow functions."""

    # Read by `_PacketMessageConst.default_clock_snapshot`.
    _borrow_default_clock_snapshot_ptr = staticmethod(
        native_bt.message_packet_beginning_borrow_default_clock_snapshot_const
    )

    # Read by `_PacketMessageConst.packet`.
    _borrow_packet = staticmethod(
        native_bt.message_packet_beginning_borrow_packet_const
    )
102
103
class _PacketBeginningMessage(_PacketMessage):
    """Packet beginning message using the non-const native packet borrow function."""

    _borrow_packet = staticmethod(native_bt.message_packet_beginning_borrow_packet)

    # The `default_clock_snapshot` property inherited from
    # `_PacketMessageConst` reads `self._borrow_default_clock_snapshot_ptr`.
    # None of this class's bases in this module define it, so without this
    # binding the property fails with `AttributeError`.
    _borrow_default_clock_snapshot_ptr = staticmethod(
        native_bt.message_packet_beginning_borrow_default_clock_snapshot_const
    )
106
107
class _PacketEndMessageConst(_PacketMessageConst):
    """Packet end message: binds the native borrow functions."""

    # Read by `_PacketMessageConst.default_clock_snapshot`.
    _borrow_default_clock_snapshot_ptr = staticmethod(
        native_bt.message_packet_end_borrow_default_clock_snapshot_const
    )

    # Read by `_PacketMessageConst.packet`.
    _borrow_packet = staticmethod(native_bt.message_packet_end_borrow_packet_const)
113
114
class _PacketEndMessage(_PacketMessage):
    """Packet end message using the non-const native packet borrow function."""

    _borrow_packet = staticmethod(native_bt.message_packet_end_borrow_packet)

    # The `default_clock_snapshot` property inherited from
    # `_PacketMessageConst` reads `self._borrow_default_clock_snapshot_ptr`.
    # None of this class's bases in this module define it, so without this
    # binding the property fails with `AttributeError`.
    _borrow_default_clock_snapshot_ptr = staticmethod(
        native_bt.message_packet_end_borrow_default_clock_snapshot_const
    )
117
118
class _StreamMessageConst(_MessageConst, _MessageWithDefaultClockSnapshot):
    """Shared logic of the stream beginning and stream end messages.

    Subclasses are expected to provide `_borrow_stream_ptr` and
    `_borrow_default_clock_snapshot_ptr`.
    """

    # A property, so `bt2_stream._StreamConst` is looked up on access.
    _stream_pycls = property(lambda _: bt2_stream._StreamConst)

    @property
    def stream(self):
        """Stream object wrapping the stream borrowed from this message."""
        borrowed_stream_ptr = self._borrow_stream_ptr(self._ptr)
        assert borrowed_stream_ptr
        stream_cls = self._stream_pycls
        return stream_cls._create_from_ptr_and_get_ref(borrowed_stream_ptr)

    @property
    def default_clock_snapshot(self):
        """Default clock snapshot of this message.

        Returns a `bt2_clock_snapshot._UnknownClockSnapshot` when the
        native call reports the unknown state. Raises `ValueError` when
        the stream's class has no default clock class.
        """
        self._check_has_default_clock_class(self.stream.cls.default_clock_class)

        # The native borrow function yields a (state, pointer) pair.
        state, borrowed_ptr = self._borrow_default_clock_snapshot_ptr(self._ptr)

        if state != native_bt.MESSAGE_STREAM_CLOCK_SNAPSHOT_STATE_UNKNOWN:
            snapshot_cls = bt2_clock_snapshot._ClockSnapshotConst
            return snapshot_cls._create_from_ptr_and_get_ref(
                borrowed_ptr, self._ptr, self._get_ref, self._put_ref
            )

        return bt2_clock_snapshot._UnknownClockSnapshot()
140
141
class _StreamMessage(_StreamMessageConst, _Message):
    """Stream message whose default clock snapshot can be set.

    Subclasses are expected to provide `_set_default_clock_snapshot`.
    """

    _stream_pycls = property(lambda _: bt2_stream._Stream)

    def _default_clock_snapshot(self, raw_value):
        # Setter half of the `_default_clock_snapshot` property below:
        # `raw_value` is checked as an unsigned 64-bit integer before being
        # handed to the native setter.
        bt2_utils._check_uint64(raw_value)
        self._set_default_clock_snapshot(self._ptr, raw_value)

    # Rebinds the name: from here on `_default_clock_snapshot` is a property
    # that reads like `default_clock_snapshot` and writes through the
    # function defined just above.
    _default_clock_snapshot = property(
        fset=_default_clock_snapshot,
        fget=_StreamMessageConst.default_clock_snapshot.fget,
    )
152
153
class _StreamBeginningMessageConst(_StreamMessageConst):
    """Stream beginning message: binds the native borrow functions."""

    # Read by `_StreamMessageConst.default_clock_snapshot`.
    _borrow_default_clock_snapshot_ptr = staticmethod(
        native_bt.message_stream_beginning_borrow_default_clock_snapshot_const
    )

    # Read by `_StreamMessageConst.stream`.
    _borrow_stream_ptr = staticmethod(
        native_bt.message_stream_beginning_borrow_stream_const
    )
161
162
class _StreamBeginningMessage(_StreamMessage):
    """Stream beginning message using the non-const native stream borrow function."""

    _borrow_stream_ptr = staticmethod(native_bt.message_stream_beginning_borrow_stream)
    _set_default_clock_snapshot = staticmethod(
        native_bt.message_stream_beginning_set_default_clock_snapshot
    )

    # The getter of `default_clock_snapshot` (also reused as the getter of
    # `_StreamMessage._default_clock_snapshot`) calls
    # `self._borrow_default_clock_snapshot_ptr`. None of this class's bases
    # in this module define it, so without this binding reading either
    # property fails with `AttributeError`.
    _borrow_default_clock_snapshot_ptr = staticmethod(
        native_bt.message_stream_beginning_borrow_default_clock_snapshot_const
    )
168
169
class _StreamEndMessageConst(_StreamMessageConst):
    """Stream end message: binds the native borrow functions."""

    # Read by `_StreamMessageConst.default_clock_snapshot`.
    _borrow_default_clock_snapshot_ptr = staticmethod(
        native_bt.message_stream_end_borrow_default_clock_snapshot_const
    )

    # Read by `_StreamMessageConst.stream`.
    _borrow_stream_ptr = staticmethod(native_bt.message_stream_end_borrow_stream_const)
175
176
class _StreamEndMessage(_StreamMessage):
    """Stream end message using the non-const native stream borrow function."""

    _borrow_stream_ptr = staticmethod(native_bt.message_stream_end_borrow_stream)
    _set_default_clock_snapshot = staticmethod(
        native_bt.message_stream_end_set_default_clock_snapshot
    )

    # The getter of `default_clock_snapshot` (also reused as the getter of
    # `_StreamMessage._default_clock_snapshot`) calls
    # `self._borrow_default_clock_snapshot_ptr`. None of this class's bases
    # in this module define it, so without this binding reading either
    # property fails with `AttributeError`.
    _borrow_default_clock_snapshot_ptr = staticmethod(
        native_bt.message_stream_end_borrow_default_clock_snapshot_const
    )
182
183
class _MessageIteratorInactivityMessageConst(
    _MessageConst, _MessageWithDefaultClockSnapshot
):
    """Message iterator inactivity message exposing its clock snapshot."""

    _borrow_clock_snapshot_ptr = staticmethod(
        native_bt.message_message_iterator_inactivity_borrow_clock_snapshot_const
    )

    @property
    def clock_snapshot(self):
        """Clock snapshot of this message."""
        # This kind of message always has a clock class: no
        # need to call self._check_has_default_clock_class() here.
        borrow_func = self._borrow_clock_snapshot_ptr
        return self._get_default_clock_snapshot(borrow_func)
196
197
class _MessageIteratorInactivityMessage(
    _MessageIteratorInactivityMessageConst, _Message
):
    """Non-const message iterator inactivity message; adds nothing of its own."""
202
203
class _DiscardedMessageConst(_MessageConst, _MessageWithDefaultClockSnapshot):
    """Shared logic of the discarded events and discarded packets messages.

    Subclasses are expected to provide `_borrow_stream_ptr`, `_get_count`,
    `_borrow_beginning_clock_snapshot_ptr`,
    `_borrow_end_clock_snapshot_ptr` and `_has_default_clock_snapshots`.
    """

    # A property, so `bt2_stream._StreamConst` is looked up on access.
    _stream_pycls = property(lambda _: bt2_stream._StreamConst)

    @property
    def stream(self):
        """Stream object wrapping the stream borrowed from this message."""
        stream_ptr = self._borrow_stream_ptr(self._ptr)
        assert stream_ptr
        return self._stream_pycls._create_from_ptr_and_get_ref(stream_ptr)

    @property
    def count(self):
        """Discarded item count, or `None` when it is not available."""
        avail, count = self._get_count(self._ptr)

        # Compare by value: `is` on integers only works by accident of
        # object identity and must not be relied on.
        if avail == native_bt.PROPERTY_AVAILABILITY_AVAILABLE:
            return count

        return None

    def _check_has_default_clock_snapshots(self):
        """Raise `ValueError` if `self._has_default_clock_snapshots` is false."""
        if not self._has_default_clock_snapshots:
            raise ValueError(
                "cannot get default clock snapshot: such a message has no clock snapshots for this stream class"
            )

    @property
    def beginning_default_clock_snapshot(self):
        """Beginning default clock snapshot of this message.

        Raises `ValueError` if this message has no default clock snapshots.
        """
        self._check_has_default_clock_snapshots()
        return self._get_default_clock_snapshot(
            self._borrow_beginning_clock_snapshot_ptr
        )

    @property
    def end_default_clock_snapshot(self):
        """End default clock snapshot of this message.

        Raises `ValueError` if this message has no default clock snapshots.
        """
        self._check_has_default_clock_snapshots()
        return self._get_default_clock_snapshot(self._borrow_end_clock_snapshot_ptr)
236
237
class _DiscardedMessage(_DiscardedMessageConst, _Message):
    """Discarded items message whose count can be set.

    Subclasses are expected to provide `_item_name` and to bind the native
    count setter under the name `_set_count`.
    """

    _stream_pycls = property(lambda _: bt2_stream._Stream)

    def _set_count(self, count):
        # Setter half of the `_count` property below. Raises `ValueError`
        # for a count of 0.
        bt2_utils._check_uint64(count)

        if count != 0:
            # Looked up on the instance: `_DiscardedEventsMessage` and
            # `_DiscardedPacketsMessage` shadow `_set_count` with the native
            # setter, while the property below still points at this function.
            self._set_count(self._ptr, count)
            return

        raise ValueError("discarded {} count is 0".format(self._item_name))

    _count = property(fset=_set_count, fget=_DiscardedMessageConst.count.fget)
250
251
class _DiscardedEventsMessageConst(_DiscardedMessageConst):
    """Discarded events message: binds the native accessor functions."""

    _get_count = staticmethod(native_bt.message_discarded_events_get_count)
    _borrow_stream_ptr = staticmethod(
        native_bt.message_discarded_events_borrow_stream_const
    )
    _borrow_end_clock_snapshot_ptr = staticmethod(
        native_bt.message_discarded_events_borrow_end_default_clock_snapshot_const
    )
    _borrow_beginning_clock_snapshot_ptr = staticmethod(
        native_bt.message_discarded_events_borrow_beginning_default_clock_snapshot_const
    )

    @property
    def _has_default_clock_snapshots(self):
        # Read from the class of this message's stream.
        stream_cls = self.stream.cls
        return stream_cls.discarded_events_have_default_clock_snapshots
267
268
class _DiscardedEventsMessage(_DiscardedEventsMessageConst, _DiscardedMessage):
    """Discarded events message whose count can be set."""

    # Used in the error message raised by `_DiscardedMessage._set_count`.
    _item_name = "event"

    # Native setter that `_DiscardedMessage._set_count` calls through `self`.
    _set_count = staticmethod(native_bt.message_discarded_events_set_count)
    _borrow_stream_ptr = staticmethod(native_bt.message_discarded_events_borrow_stream)
273
274
class _DiscardedPacketsMessageConst(_DiscardedMessageConst):
    """Discarded packets message: binds the native accessor functions."""

    _get_count = staticmethod(native_bt.message_discarded_packets_get_count)
    _borrow_stream_ptr = staticmethod(
        native_bt.message_discarded_packets_borrow_stream_const
    )
    _borrow_end_clock_snapshot_ptr = staticmethod(
        native_bt.message_discarded_packets_borrow_end_default_clock_snapshot_const
    )
    _borrow_beginning_clock_snapshot_ptr = staticmethod(
        native_bt.message_discarded_packets_borrow_beginning_default_clock_snapshot_const
    )

    @property
    def _has_default_clock_snapshots(self):
        # Read from the class of this message's stream.
        stream_cls = self.stream.cls
        return stream_cls.discarded_packets_have_default_clock_snapshots
290
291
class _DiscardedPacketsMessage(_DiscardedPacketsMessageConst, _DiscardedMessage):
    """Discarded packets message whose count can be set."""

    # Used in the error message raised by `_DiscardedMessage._set_count`.
    _item_name = "packet"

    # Native setter that `_DiscardedMessage._set_count` calls through `self`.
    _set_count = staticmethod(native_bt.message_discarded_packets_set_count)
    _borrow_stream_ptr = staticmethod(native_bt.message_discarded_packets_borrow_stream)
296
297
# Native message type -> non-const message class.
#
# Bound to its own name: if it were assigned to `_MESSAGE_TYPE_TO_CLS`, the
# assignment below would overwrite it immediately and the mapping would be
# a dead store.
_MESSAGE_TYPE_TO_MUTABLE_CLS = {
    native_bt.MESSAGE_TYPE_EVENT: _EventMessage,
    native_bt.MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY: _MessageIteratorInactivityMessage,
    native_bt.MESSAGE_TYPE_STREAM_BEGINNING: _StreamBeginningMessage,
    native_bt.MESSAGE_TYPE_STREAM_END: _StreamEndMessage,
    native_bt.MESSAGE_TYPE_PACKET_BEGINNING: _PacketBeginningMessage,
    native_bt.MESSAGE_TYPE_PACKET_END: _PacketEndMessage,
    native_bt.MESSAGE_TYPE_DISCARDED_EVENTS: _DiscardedEventsMessage,
    native_bt.MESSAGE_TYPE_DISCARDED_PACKETS: _DiscardedPacketsMessage,
}

# Native message type -> const message class. This is the mapping that
# `_create_from_ptr()` indexes.
_MESSAGE_TYPE_TO_CLS = {
    native_bt.MESSAGE_TYPE_EVENT: _EventMessageConst,
    native_bt.MESSAGE_TYPE_MESSAGE_ITERATOR_INACTIVITY: _MessageIteratorInactivityMessageConst,
    native_bt.MESSAGE_TYPE_STREAM_BEGINNING: _StreamBeginningMessageConst,
    native_bt.MESSAGE_TYPE_STREAM_END: _StreamEndMessageConst,
    native_bt.MESSAGE_TYPE_PACKET_BEGINNING: _PacketBeginningMessageConst,
    native_bt.MESSAGE_TYPE_PACKET_END: _PacketEndMessageConst,
    native_bt.MESSAGE_TYPE_DISCARDED_EVENTS: _DiscardedEventsMessageConst,
    native_bt.MESSAGE_TYPE_DISCARDED_PACKETS: _DiscardedPacketsMessageConst,
}
This page took 0.035611 seconds and 5 git commands to generate.