1 # The MIT License (MIT)
3 # Copyright (c) 2016-2017 Philippe Proulx <pproulx@efficios.com>
5 # Permission is hereby granted, free of charge, to any person obtaining a copy
6 # of this software and associated documentation files (the "Software"), to deal
7 # in the Software without restriction, including without limitation the rights
8 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 # copies of the Software, and to permit persons to whom the Software is
10 # furnished to do so, subject to the following conditions:
12 # The above copyright notice and this permission notice shall be included in
13 # all copies or substantial portions of the Software.
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23 from bt2
import native_bt
, object, stream
, utils
30 class CtfWriterClock(object._Object
):
31 def __init__(self
, name
, description
=None, frequency
=None, precision
=None,
32 offset
=None, is_absolute
=None, uuid
=None):
33 utils
._check
_str
(name
)
34 ptr
= native_bt
.ctf_clock_create(name
)
37 raise bt2
.CreationError('cannot create CTF writer clock object')
41 if description
is not None:
42 self
.description
= description
44 if frequency
is not None:
45 self
.frequency
= frequency
47 if precision
is not None:
48 self
.precision
= precision
50 if offset
is not None:
53 if is_absolute
is not None:
54 self
.is_absolute
= is_absolute
59 def __eq__(self
, other
):
60 if type(self
) is not type(other
):
61 # not comparing apples to apples
64 if self
.addr
== other
.addr
:
85 return self_props
== other_props
88 return CtfWriterClock(name
=self
.name
, description
=self
.description
,
89 frequency
=self
.frequency
,
90 precision
=self
.precision
, offset
=self
.offset
,
91 is_absolute
=self
.is_absolute
, uuid
=self
.uuid
)
93 def __deepcopy__(self
, memo
):
100 name
= native_bt
.ctf_clock_get_name(self
._ptr
)
101 utils
._handle
_ptr
(name
, "cannot get CTF writer clock object's name")
105 def description(self
):
106 description
= native_bt
.ctf_clock_get_description(self
._ptr
)
110 def description(self
, description
):
111 utils
._check
_str
(description
)
112 ret
= native_bt
.ctf_clock_set_description(self
._ptr
, description
)
113 utils
._handle
_ret
(ret
, "cannot set CTF writer clock object's description")
117 frequency
= native_bt
.ctf_clock_get_frequency(self
._ptr
)
119 if utils
._is
_m
1ull(frequency
):
120 raise bt2
.Error("cannot get CTF writer clock object's frequency")
125 def frequency(self
, frequency
):
126 utils
._check
_uint
64(frequency
)
127 ret
= native_bt
.ctf_clock_set_frequency(self
._ptr
, frequency
)
128 utils
._handle
_ret
(ret
, "cannot set CTF writer clock object's frequency")
132 precision
= native_bt
.ctf_clock_get_precision(self
._ptr
)
134 if utils
._is
_m
1ull(precision
):
135 raise bt2
.Error("cannot get CTF writer clock object's precision")
140 def precision(self
, precision
):
141 utils
._check
_uint
64(precision
)
142 ret
= native_bt
.ctf_clock_set_precision(self
._ptr
, precision
)
143 utils
._handle
_ret
(ret
, "cannot set CTF writer clock object's precision")
147 ret
, offset_s
= native_bt
.ctf_clock_get_offset_s(self
._ptr
)
148 utils
._handle
_ret
(ret
, "cannot get CTF writer clock object's offset (seconds)")
149 ret
, offset_cycles
= native_bt
.ctf_clock_get_offset(self
._ptr
)
150 utils
._handle
_ret
(ret
, "cannot get CTF writer clock object's offset (cycles)")
151 return bt2
.ClockClassOffset(offset_s
, offset_cycles
)
154 def offset(self
, offset
):
155 utils
._check
_type
(offset
, bt2
.ClockClassOffset
)
156 ret
= native_bt
.ctf_clock_set_offset_s(self
._ptr
, offset
.seconds
)
157 utils
._handle
_ret
(ret
, "cannot set CTF writer clock object's offset (seconds)")
158 ret
= native_bt
.ctf_clock_set_offset(self
._ptr
, offset
.cycles
)
159 utils
._handle
_ret
(ret
, "cannot set CTF writer clock object's offset (cycles)")
162 def is_absolute(self
):
163 is_absolute
= native_bt
.ctf_clock_get_is_absolute(self
._ptr
)
164 utils
._handle
_ret
(is_absolute
, "cannot get CTF writer clock object's absoluteness")
165 return is_absolute
> 0
168 def is_absolute(self
, is_absolute
):
169 utils
._check
_bool
(is_absolute
)
170 ret
= native_bt
.ctf_clock_set_is_absolute(self
._ptr
, int(is_absolute
))
171 utils
._handle
_ret
(ret
, "cannot set CTF writer clock object's absoluteness")
175 uuid_bytes
= native_bt
.ctf_clock_get_uuid(self
._ptr
)
177 if uuid_bytes
is None:
178 raise bt2
.Error("cannot get CTF writer clock object's UUID")
180 return uuidp
.UUID(bytes
=uuid_bytes
)
183 def uuid(self
, uuid
):
184 utils
._check
_type
(uuid
, uuidp
.UUID
)
185 ret
= native_bt
.ctf_clock_set_uuid(self
._ptr
, uuid
.bytes
)
186 utils
._handle
_ret
(ret
, "cannot set CTF writer clock object's UUID")
188 def _time(self
, time
):
189 utils
._check
_int
64(time
)
190 ret
= native_bt
.ctf_clock_set_time(self
._ptr
, time
)
192 time
= property(fset
=_time
)
195 class _CtfWriterStream(stream
._StreamBase
):
197 def discarded_events_count(self
):
198 ret
, count
= native_bt
.ctf_stream_get_discarded_events_count(self
._ptr
)
199 utils
._handle
_ret
(ret
, "cannot get CTF writer stream object's discarded events count")
202 def append_discarded_events(self
, count
):
203 utils
._check
_uint
64(count
)
204 native_bt
.ctf_stream_append_discarded_events(self
._ptr
, count
)
206 def append_event(self
, event
):
207 utils
._check
_type
(event
, bt2
.event
._Event
)
208 ret
= native_bt
.ctf_stream_append_event(self
._ptr
, event
._ptr
)
209 utils
._handle
_ret
(ret
, 'cannot append event object to CTF writer stream object')
212 ret
= native_bt
.ctf_stream_flush(self
._ptr
)
213 utils
._handle
_ret
(ret
, 'cannot cannot flush CTF writer stream object')
216 def packet_header_field(self
):
217 field_ptr
= native_bt
.ctf_stream_get_packet_header(self
._ptr
)
219 if field_ptr
is None:
222 return fields
._create
_from
_ptr
(field_ptr
)
224 @packet_header_field.setter
225 def packet_header_field(self
, packet_header_field
):
226 packet_header_field_ptr
= None
228 if packet_header_field
is not None:
229 utils
._check
_type
(packet_header_field
, fields
._Field
)
230 packet_header_field_ptr
= packet_header_field
._ptr
232 ret
= native_bt
.ctf_stream_set_packet_header(self
._ptr
,
233 packet_header_field_ptr
)
234 utils
._handle
_ret
(ret
, "cannot set CTF writer stream object's packet header field")
237 def packet_context_field(self
):
238 field_ptr
= native_bt
.ctf_stream_get_packet_context(self
._ptr
)
240 if field_ptr
is None:
243 return fields
._create
_from
_ptr
(field_ptr
)
245 @packet_context_field.setter
246 def packet_context_field(self
, packet_context_field
):
247 packet_context_field_ptr
= None
249 if packet_context_field
is not None:
250 utils
._check
_type
(packet_context_field
, fields
._Field
)
251 packet_context_field_ptr
= packet_context_field
._ptr
253 ret
= native_bt
.ctf_stream_set_packet_context(self
._ptr
,
254 packet_context_field_ptr
)
255 utils
._handle
_ret
(ret
, "cannot set CTF writer stream object's packet context field")
257 def __eq__(self
, other
):
258 if type(other
) is not type(self
):
261 if self
.addr
== other
.addr
:
264 if not _StreamBase
.__eq
__(self
, other
):
268 self
.discarded_events_count
,
269 self
.packet_header_field
,
270 self
.packet_context_field
,
273 other
.discarded_events_count
,
274 other
.packet_header_field
,
275 other
.packet_context_field
,
277 return self_props
== other_props
279 def _copy(self
, copy_func
):
280 cpy
= self
.stream_class(self
.name
)
281 cpy
.append_discarded_events(self
.discarded_events_count
)
282 cpy
.packet_header_field
= copy_func(self
.packet_header_field
)
283 cpy
.packet_context_field
= copy_func(self
.packet_context_field
)
287 return self
._copy
(copy
.copy
)
289 def __deepcopy__(self
, memo
):
290 cpy
= self
._copy
(copy
.deepcopy
)
295 class CtfWriter(object._Object
):
296 def __init__(self
, path
):
297 utils
._check
_str
(path
)
298 ptr
= native_bt
.ctf_writer_create(path
)
301 raise bt2
.CreationError('cannot create CTF writer object')
303 super().__init
__(ptr
)
307 trace_ptr
= native_bt
.ctf_writer_get_trace(self
._ptr
)
308 utils
._handle
_ptr
(name
, "cannot get CTF writer object's trace class")
309 return bt2
.Trace
._create
_from
_ptr
(trace_ptr
)
312 def metadata_string(self
):
313 metadata_string
= native_bt
.ctf_writer_get_metadata_string(self
._ptr
)
314 utils
._handle
_ptr
(metadata_string
, "cannot get CTF writer object's metadata string")
315 return metadata_string
317 def flush_metadata(self
):
318 native_bt
.ctf_writer_flush_metadata(self
._ptr
)
320 def add_clock(self
, clock
):
321 utils
._check
_type
(clock
, CtfWriterClock
)
322 ret
= native_bt
.ctf_writer_add_clock(self
._ptr
, clock
._ptr
)
323 utils
._handle
_ret
(ret
, 'cannot add CTF writer clock object to CTF writer object')
This page took 0.05309 seconds and 4 git commands to generate.