Add Babeltrace 2 Python bindings
[babeltrace.git] / bindings / python / bt2 / ctf_writer.py
1 # The MIT License (MIT)
2 #
3 # Copyright (c) 2016-2017 Philippe Proulx <pproulx@efficios.com>
4 #
5 # Permission is hereby granted, free of charge, to any person obtaining a copy
6 # of this software and associated documentation files (the "Software"), to deal
7 # in the Software without restriction, including without limitation the rights
8 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 # copies of the Software, and to permit persons to whom the Software is
10 # furnished to do so, subject to the following conditions:
11 #
12 # The above copyright notice and this permission notice shall be included in
13 # all copies or substantial portions of the Software.
14 #
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21 # THE SOFTWARE.
22
23 from bt2 import native_bt, object, stream, utils
24 import uuid as uuidp
25 import bt2.event
26 import abc
27 import bt2
28
29
30 class CtfWriterClock(object._Object):
31 def __init__(self, name, description=None, frequency=None, precision=None,
32 offset=None, is_absolute=None, uuid=None):
33 utils._check_str(name)
34 ptr = native_bt.ctf_clock_create(name)
35
36 if ptr is None:
37 raise bt2.CreationError('cannot create CTF writer clock object')
38
39 super().__init__(ptr)
40
41 if description is not None:
42 self.description = description
43
44 if frequency is not None:
45 self.frequency = frequency
46
47 if precision is not None:
48 self.precision = precision
49
50 if offset is not None:
51 self.offset = offset
52
53 if is_absolute is not None:
54 self.is_absolute = is_absolute
55
56 if uuid is not None:
57 self.uuid = uuid
58
59 def __eq__(self, other):
60 if type(self) is not type(other):
61 # not comparing apples to apples
62 return False
63
64 if self.addr == other.addr:
65 return True
66
67 self_props = (
68 self.name,
69 self.description,
70 self.frequency,
71 self.precision,
72 self.offset,
73 self.is_absolute,
74 self.uuid
75 )
76 other_props = (
77 other.name,
78 other.description,
79 other.frequency,
80 other.precision,
81 other.offset,
82 other.is_absolute,
83 other.uuid
84 )
85 return self_props == other_props
86
87 def __copy__(self):
88 return CtfWriterClock(name=self.name, description=self.description,
89 frequency=self.frequency,
90 precision=self.precision, offset=self.offset,
91 is_absolute=self.is_absolute, uuid=self.uuid)
92
93 def __deepcopy__(self, memo):
94 cpy = self.__copy__()
95 memo[id(self)] = cpy
96 return cpy
97
98 @property
99 def name(self):
100 name = native_bt.ctf_clock_get_name(self._ptr)
101 utils._handle_ptr(name, "cannot get CTF writer clock object's name")
102 return name
103
104 @property
105 def description(self):
106 description = native_bt.ctf_clock_get_description(self._ptr)
107 return description
108
109 @description.setter
110 def description(self, description):
111 utils._check_str(description)
112 ret = native_bt.ctf_clock_set_description(self._ptr, description)
113 utils._handle_ret(ret, "cannot set CTF writer clock object's description")
114
115 @property
116 def frequency(self):
117 frequency = native_bt.ctf_clock_get_frequency(self._ptr)
118
119 if utils._is_m1ull(frequency):
120 raise bt2.Error("cannot get CTF writer clock object's frequency")
121
122 return frequency
123
124 @frequency.setter
125 def frequency(self, frequency):
126 utils._check_uint64(frequency)
127 ret = native_bt.ctf_clock_set_frequency(self._ptr, frequency)
128 utils._handle_ret(ret, "cannot set CTF writer clock object's frequency")
129
130 @property
131 def precision(self):
132 precision = native_bt.ctf_clock_get_precision(self._ptr)
133
134 if utils._is_m1ull(precision):
135 raise bt2.Error("cannot get CTF writer clock object's precision")
136
137 return precision
138
139 @precision.setter
140 def precision(self, precision):
141 utils._check_uint64(precision)
142 ret = native_bt.ctf_clock_set_precision(self._ptr, precision)
143 utils._handle_ret(ret, "cannot set CTF writer clock object's precision")
144
145 @property
146 def offset(self):
147 ret, offset_s = native_bt.ctf_clock_get_offset_s(self._ptr)
148 utils._handle_ret(ret, "cannot get CTF writer clock object's offset (seconds)")
149 ret, offset_cycles = native_bt.ctf_clock_get_offset(self._ptr)
150 utils._handle_ret(ret, "cannot get CTF writer clock object's offset (cycles)")
151 return bt2.ClockClassOffset(offset_s, offset_cycles)
152
153 @offset.setter
154 def offset(self, offset):
155 utils._check_type(offset, bt2.ClockClassOffset)
156 ret = native_bt.ctf_clock_set_offset_s(self._ptr, offset.seconds)
157 utils._handle_ret(ret, "cannot set CTF writer clock object's offset (seconds)")
158 ret = native_bt.ctf_clock_set_offset(self._ptr, offset.cycles)
159 utils._handle_ret(ret, "cannot set CTF writer clock object's offset (cycles)")
160
161 @property
162 def is_absolute(self):
163 is_absolute = native_bt.ctf_clock_get_is_absolute(self._ptr)
164 utils._handle_ret(is_absolute, "cannot get CTF writer clock object's absoluteness")
165 return is_absolute > 0
166
167 @is_absolute.setter
168 def is_absolute(self, is_absolute):
169 utils._check_bool(is_absolute)
170 ret = native_bt.ctf_clock_set_is_absolute(self._ptr, int(is_absolute))
171 utils._handle_ret(ret, "cannot set CTF writer clock object's absoluteness")
172
173 @property
174 def uuid(self):
175 uuid_bytes = native_bt.ctf_clock_get_uuid(self._ptr)
176
177 if uuid_bytes is None:
178 raise bt2.Error("cannot get CTF writer clock object's UUID")
179
180 return uuidp.UUID(bytes=uuid_bytes)
181
182 @uuid.setter
183 def uuid(self, uuid):
184 utils._check_type(uuid, uuidp.UUID)
185 ret = native_bt.ctf_clock_set_uuid(self._ptr, uuid.bytes)
186 utils._handle_ret(ret, "cannot set CTF writer clock object's UUID")
187
188 def _time(self, time):
189 utils._check_int64(time)
190 ret = native_bt.ctf_clock_set_time(self._ptr, time)
191
192 time = property(fset=_time)
193
194
195 class _CtfWriterStream(stream._StreamBase):
196 @property
197 def discarded_events_count(self):
198 ret, count = native_bt.ctf_stream_get_discarded_events_count(self._ptr)
199 utils._handle_ret(ret, "cannot get CTF writer stream object's discarded events count")
200 return count
201
202 def append_discarded_events(self, count):
203 utils._check_uint64(count)
204 native_bt.ctf_stream_append_discarded_events(self._ptr, count)
205
206 def append_event(self, event):
207 utils._check_type(event, bt2.event._Event)
208 ret = native_bt.ctf_stream_append_event(self._ptr, event._ptr)
209 utils._handle_ret(ret, 'cannot append event object to CTF writer stream object')
210
211 def flush(self):
212 ret = native_bt.ctf_stream_flush(self._ptr)
213 utils._handle_ret(ret, 'cannot cannot flush CTF writer stream object')
214
215 @property
216 def packet_header_field(self):
217 field_ptr = native_bt.ctf_stream_get_packet_header(self._ptr)
218
219 if field_ptr is None:
220 return
221
222 return fields._create_from_ptr(field_ptr)
223
224 @packet_header_field.setter
225 def packet_header_field(self, packet_header_field):
226 packet_header_field_ptr = None
227
228 if packet_header_field is not None:
229 utils._check_type(packet_header_field, fields._Field)
230 packet_header_field_ptr = packet_header_field._ptr
231
232 ret = native_bt.ctf_stream_set_packet_header(self._ptr,
233 packet_header_field_ptr)
234 utils._handle_ret(ret, "cannot set CTF writer stream object's packet header field")
235
236 @property
237 def packet_context_field(self):
238 field_ptr = native_bt.ctf_stream_get_packet_context(self._ptr)
239
240 if field_ptr is None:
241 return
242
243 return fields._create_from_ptr(field_ptr)
244
245 @packet_context_field.setter
246 def packet_context_field(self, packet_context_field):
247 packet_context_field_ptr = None
248
249 if packet_context_field is not None:
250 utils._check_type(packet_context_field, fields._Field)
251 packet_context_field_ptr = packet_context_field._ptr
252
253 ret = native_bt.ctf_stream_set_packet_context(self._ptr,
254 packet_context_field_ptr)
255 utils._handle_ret(ret, "cannot set CTF writer stream object's packet context field")
256
257 def __eq__(self, other):
258 if type(other) is not type(self):
259 return False
260
261 if self.addr == other.addr:
262 return True
263
264 if not _StreamBase.__eq__(self, other):
265 return False
266
267 self_props = (
268 self.discarded_events_count,
269 self.packet_header_field,
270 self.packet_context_field,
271 )
272 other_props = (
273 other.discarded_events_count,
274 other.packet_header_field,
275 other.packet_context_field,
276 )
277 return self_props == other_props
278
279 def _copy(self, copy_func):
280 cpy = self.stream_class(self.name)
281 cpy.append_discarded_events(self.discarded_events_count)
282 cpy.packet_header_field = copy_func(self.packet_header_field)
283 cpy.packet_context_field = copy_func(self.packet_context_field)
284 return cpy
285
286 def __copy__(self):
287 return self._copy(copy.copy)
288
289 def __deepcopy__(self, memo):
290 cpy = self._copy(copy.deepcopy)
291 memo[id(self)] = cpy
292 return cpy
293
294
295 class CtfWriter(object._Object):
296 def __init__(self, path):
297 utils._check_str(path)
298 ptr = native_bt.ctf_writer_create(path)
299
300 if ptr is None:
301 raise bt2.CreationError('cannot create CTF writer object')
302
303 super().__init__(ptr)
304
305 @property
306 def trace(self):
307 trace_ptr = native_bt.ctf_writer_get_trace(self._ptr)
308 utils._handle_ptr(name, "cannot get CTF writer object's trace class")
309 return bt2.Trace._create_from_ptr(trace_ptr)
310
311 @property
312 def metadata_string(self):
313 metadata_string = native_bt.ctf_writer_get_metadata_string(self._ptr)
314 utils._handle_ptr(metadata_string, "cannot get CTF writer object's metadata string")
315 return metadata_string
316
317 def flush_metadata(self):
318 native_bt.ctf_writer_flush_metadata(self._ptr)
319
320 def add_clock(self, clock):
321 utils._check_type(clock, CtfWriterClock)
322 ret = native_bt.ctf_writer_add_clock(self._ptr, clock._ptr)
323 utils._handle_ret(ret, 'cannot add CTF writer clock object to CTF writer object')
This page took 0.038411 seconds and 4 git commands to generate.