701d40947c1074a65542fefef812a1a768480c87
[babeltrace.git] / bindings / python / bt.py
1 # nativebt.i.in
2 #
3 # Babeltrace native interface Python module
4 #
5 # Copyright 2012-2015 EfficiOS Inc.
6 #
7 # Author: Danny Serres <danny.serres@efficios.com>
8 # Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
9 #
10 # Permission is hereby granted, free of charge, to any person obtaining a copy
11 # of this software and associated documentation files (the "Software"), to deal
12 # in the Software without restriction, including without limitation the rights
13 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 # copies of the Software, and to permit persons to whom the Software is
15 # furnished to do so, subject to the following conditions:
16 #
17 # The above copyright notice and this permission notice shall be included in
18 # all copies or substantial portions of the Software.
19 #
20 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
26 # SOFTWARE.
27
import collections
import collections.abc
import os
from datetime import datetime
from uuid import UUID

import babeltrace.nativebt as nbt
33
34
class TraceCollection:
    """
    A :class:`TraceCollection` is a collection of opened traces.

    In general, once a trace collection is created, you add one to many
    independent traces to it using :meth:`add_trace` or
    :meth:`add_traces_recursive`, and then iterate the ordered events
    of all traces merged together using :attr:`events`.

    You may use :meth:`remove_trace` to close and remove a specific
    trace from a trace collection, although all the traces of a given
    trace collection will be automatically removed when it is garbage
    collected.
    """

    def __init__(self):
        """
        Creates an empty trace collection.
        """

        self._tc = nbt._bt_context_create()

    def __del__(self):
        # An instance may exist without a native context (created with
        # ``__new__``, or ``__init__`` failed before ``_tc`` was set);
        # there is nothing to release in that case.
        tc = getattr(self, "_tc", None)

        if tc is not None:
            nbt._bt_context_put(tc)

    def add_trace(self, path, format_str):
        """
        Adds a trace to the trace collection.

        The trace is located at the file system path *path*. This
        function **does not** recurse directories to find the trace:
        *path* must point to the exact trace location (see
        :meth:`add_traces_recursive` for a recursive version of this
        function).

        *format_str* is a string indicating the Babeltrace type of the
        trace to add. ``ctf`` is the only currently supported trace
        format.

        Once added, the trace is opened.

        Returns the corresponding :class:`TraceHandle` instance for
        this opened trace on success, or ``None`` on error.
        """

        ret = nbt._bt_context_add_trace(self._tc, path, format_str,
                                        None, None, None)

        # A negative return value from the native call signals an error.
        if ret < 0:
            return None

        # TraceHandle.__init__ always raises, so bypass it.
        th = TraceHandle.__new__(TraceHandle)
        th._id = ret
        th._trace_collection = self

        return th

    def add_traces_recursive(self, path, format_str):
        """
        Adds traces to this trace collection by recursively searching
        in the *path* directory.

        A directory is considered a trace when it directly contains a
        file named ``metadata``.

        *format_str* is a string indicating the Babeltrace type of the
        traces to find and add. ``ctf`` is the only currently supported
        trace format.

        See also :meth:`add_trace`.

        Returns a :class:`dict` object mapping full paths to trace
        handles for each trace found, or ``None`` when no trace could
        be added and at least one candidate failed to open.
        """

        trace_handles = {}
        had_error = False

        for fullpath, _dirs, files in os.walk(path):
            if "metadata" not in files:
                continue

            trace_handle = self.add_trace(fullpath, format_str)

            if trace_handle is None:
                had_error = True
                continue

            trace_handles[fullpath] = trace_handle

        if had_error and not trace_handles:
            return None

        return trace_handles

    def remove_trace(self, trace_handle):
        """
        Removes a trace from the trace collection using its trace
        handle *trace_handle*.

        :class:`TraceHandle` objects are returned by :meth:`add_trace`
        and :meth:`add_traces_recursive`.

        The trace is closed before being removed.

        Raises :class:`TypeError` if *trace_handle* has no ``_id``
        attribute, i.e. is not a trace handle.
        """

        # Only the attribute lookup is guarded, so that an unrelated
        # AttributeError raised by the native call is not reported as a
        # bad argument.
        try:
            trace_id = trace_handle._id
        except AttributeError:
            raise TypeError("in remove_trace, argument 2 must be a TraceHandle instance")

        nbt._bt_context_remove_trace(self._tc, trace_id)

    @property
    def events(self):
        """
        Generates the ordered :class:`Event` objects of all the opened
        traces contained in this trace collection. Iterate this function
        to iterate actual events.

        Due to limitations of the native Babeltrace API, only one event
        may be "alive" at a given time, i.e. a user **should never**
        store a copy of the events returned by this function for
        later use. Users shall make sure to copy the information
        they need *from* an event before accessing the next one.

        Furthermore, :class:`Event` objects become invalid when the
        generator goes out of scope as the underlying iterator will be
        reclaimed. Using an event after the generator has gone out
        of scope may result in a crash or data corruption.
        """

        begin_pos_ptr = nbt._bt_iter_pos()
        end_pos_ptr = nbt._bt_iter_pos()
        begin_pos_ptr.type = nbt.SEEK_BEGIN
        end_pos_ptr.type = nbt.SEEK_LAST

        for event in self._events(begin_pos_ptr, end_pos_ptr):
            yield event

    def events_timestamps(self, timestamp_begin, timestamp_end):
        """
        Generates the ordered :class:`Event` objects of all the opened
        traces contained in this trace collection from *timestamp_begin*
        to *timestamp_end*.

        See :attr:`events` for notes and limitations.
        """

        begin_pos_ptr = nbt._bt_iter_pos()
        end_pos_ptr = nbt._bt_iter_pos()
        begin_pos_ptr.type = end_pos_ptr.type = nbt.SEEK_TIME
        begin_pos_ptr.u.seek_time = timestamp_begin
        end_pos_ptr.u.seek_time = timestamp_end

        for event in self._events(begin_pos_ptr, end_pos_ptr):
            yield event

    @property
    def timestamp_begin(self):
        """
        Trace collection's begin timestamp, or ``None`` if no event
        could be read at the beginning position.
        """

        pos_ptr = nbt._bt_iter_pos()
        pos_ptr.type = nbt.SEEK_BEGIN

        return self._timestamp_at_pos(pos_ptr)

    @property
    def timestamp_end(self):
        """
        Trace collection's end timestamp, or ``None`` if no event
        could be read at the last position.
        """

        pos_ptr = nbt._bt_iter_pos()
        pos_ptr.type = nbt.SEEK_LAST

        return self._timestamp_at_pos(pos_ptr)

    def _timestamp_at_pos(self, pos_ptr):
        """
        Returns the timestamp of the event located at *pos_ptr*, or
        ``None`` if no event can be read there.

        Raises :class:`NotImplementedError` if the native iterator
        cannot be created.
        """

        ctf_it_ptr = nbt._bt_ctf_iter_create(self._tc, pos_ptr, pos_ptr)

        if ctf_it_ptr is None:
            raise NotImplementedError("Creation of multiple iterators is unsupported.")

        try:
            ev_ptr = nbt._bt_ctf_iter_read_event(ctf_it_ptr)

            if ev_ptr is None:
                return None

            # Read the timestamp while the iterator is still alive: the
            # event is only valid as long as its iterator exists.
            return nbt._bt_ctf_get_timestamp(ev_ptr)
        finally:
            nbt._bt_ctf_iter_destroy(ctf_it_ptr)

    def _events(self, begin_pos_ptr, end_pos_ptr):
        """
        Generates the :class:`Event` objects found between
        *begin_pos_ptr* and *end_pos_ptr*.

        The native iterator is destroyed when the generator finishes,
        is closed, or is interrupted by an exception.
        """

        ctf_it_ptr = nbt._bt_ctf_iter_create(self._tc, begin_pos_ptr, end_pos_ptr)

        if ctf_it_ptr is None:
            raise NotImplementedError("Creation of multiple iterators is unsupported.")

        try:
            while True:
                ev_ptr = nbt._bt_ctf_iter_read_event(ctf_it_ptr)

                if ev_ptr is None:
                    break

                # Event.__init__ always raises, so bypass it.
                ev = Event.__new__(Event)
                ev._e = ev_ptr

                yield ev

                ret = nbt._bt_iter_next(nbt._bt_ctf_get_iter(ctf_it_ptr))

                if ret != 0:
                    break
        finally:
            # Also runs on GeneratorExit (generator closed or garbage
            # collected) and on any exception thrown into the generator.
            nbt._bt_ctf_iter_destroy(ctf_it_ptr)
245
246
247 # Based on enum bt_clock_type in clock-type.h
248 class _ClockType:
249 CLOCK_CYCLES = 0
250 CLOCK_REAL = 1
251
252
class TraceHandle:
    """
    A :class:`TraceHandle` is a handle allowing the user to manipulate
    a specific trace directly. It is a unique identifier representing a
    trace, and is not meant to be instantiated by the user.

    Instances carry two private attributes set by their creator:
    ``_id`` (numeric trace ID) and ``_trace_collection`` (the owning
    :class:`TraceCollection`).
    """

    def __init__(self):
        raise NotImplementedError("TraceHandle cannot be instantiated")

    def __repr__(self):
        template = "Babeltrace TraceHandle: trace_id('{0}')"

        return template.format(self._id)

    @property
    def id(self):
        """
        Numeric ID of this trace handle.
        """

        return self._id

    @property
    def path(self):
        """
        Path of the underlying trace.
        """

        context = self._trace_collection._tc

        return nbt._bt_trace_handle_get_path(context, self._id)

    @property
    def timestamp_begin(self):
        """
        Buffers creation timestamp (nanoseconds since Epoch) of the
        underlying trace.
        """

        context = self._trace_collection._tc

        return nbt._bt_trace_handle_get_timestamp_begin(context, self._id,
                                                        _ClockType.CLOCK_REAL)

    @property
    def timestamp_end(self):
        """
        Buffers destruction timestamp (nanoseconds since Epoch) of the
        underlying trace.
        """

        context = self._trace_collection._tc

        return nbt._bt_trace_handle_get_timestamp_end(context, self._id,
                                                      _ClockType.CLOCK_REAL)

    @property
    def events(self):
        """
        Generates all the :class:`EventDeclaration` objects of the
        underlying trace.

        Note that this doesn't generate actual trace *events*, but
        rather their declarations, i.e. their layouts and metadata.
        """

        result = nbt._bt_python_event_decl_listcaller(self.id,
                                                      self._trace_collection._tc)

        # Anything other than a [list pointer, count] list means there
        # is nothing to generate.
        if not isinstance(result, list):
            return

        decl_list_ptr, decl_count = result

        for index in range(decl_count):
            # EventDeclaration.__init__ always raises, so bypass it.
            event_decl = EventDeclaration.__new__(EventDeclaration)
            event_decl._ed = nbt._bt_python_decl_one_from_list(decl_list_ptr, index)
            yield event_decl
327
328
class CTFStringEncoding:
    """
    Numeric identifiers of the CTF string encodings.
    """

    #: No encoding
    NONE = 0

    #: UTF-8 encoding
    UTF8 = 1

    #: ASCII encoding
    ASCII = 2

    #: Unknown encoding
    UNKNOWN = 3
345
346
347 # Based on the enum in ctf-writer/writer.h
class ByteOrder:
    """
    Numeric identifiers of the supported byte orders.
    """

    #: Byte order of the host
    BYTE_ORDER_NATIVE = 0

    #: Little-endian byte order
    BYTE_ORDER_LITTLE_ENDIAN = 1

    #: Big-endian byte order
    BYTE_ORDER_BIG_ENDIAN = 2

    #: Network byte order (big-endian)
    BYTE_ORDER_NETWORK = 3

    #: Unknown byte order (entry specific to the Python bindings)
    BYTE_ORDER_UNKNOWN = 4
367
368
369 # enum equivalent, accessible constants
370 # These are taken directly from ctf/events.h
371 # All changes to enums must also be made here
class CTFTypeId:
    """
    CTF numeric type identifiers.
    """

    #: Unknown type
    UNKNOWN = 0

    #: Integer
    INTEGER = 1

    #: Floating point number
    FLOAT = 2

    #: Enumeration
    ENUM = 3

    #: String
    STRING = 4

    #: Structure
    STRUCT = 5

    #: Untagged variant
    UNTAGGED_VARIANT = 6

    #: Variant
    VARIANT = 7

    #: Array
    ARRAY = 8

    #: Sequence
    SEQUENCE = 9

    #: Number of CTF type identifiers defined above
    NR_CTF_TYPES = 10

    # Declared static so that the function can be called on the class
    # (existing usage) as well as on an instance.
    @staticmethod
    def type_name(id):
        """
        Returns the name of the CTF numeric type identifier *id*.

        The name is the first non-callable, non-dunder attribute of
        this class (in :func:`dir` order) whose value equals *id*.
        ``"UNKNOWN_TYPE"`` is returned when no attribute matches.
        """

        for attr in dir(CTFTypeId):
            if attr.startswith("__"):
                continue

            candidate = getattr(CTFTypeId, attr)

            if callable(candidate):
                continue

            if candidate == id:
                return attr

        return "UNKNOWN_TYPE"
427
428
class CTFScope:
    """
    CTF scopes.
    """

    #: Packet header
    TRACE_PACKET_HEADER = 0

    #: Packet context
    STREAM_PACKET_CONTEXT = 1

    #: Event header
    STREAM_EVENT_HEADER = 2

    #: Stream event context
    STREAM_EVENT_CONTEXT = 3

    #: Event context
    EVENT_CONTEXT = 4

    #: Event fields
    EVENT_FIELDS = 5

    # Declared static so that the function can be called on the class
    # (existing usage) as well as on an instance.
    @staticmethod
    def scope_name(scope):
        """
        Returns the name of the CTF scope *scope*.

        The name is the first non-callable, non-dunder attribute of
        this class (in :func:`dir` order) whose value equals *scope*.
        ``"UNKNOWN_SCOPE"`` is returned when no attribute matches.
        """

        for attr in dir(CTFScope):
            if attr.startswith("__"):
                continue

            candidate = getattr(CTFScope, attr)

            if callable(candidate):
                continue

            if candidate == scope:
                return attr

        return "UNKNOWN_SCOPE"
470
471
472 # Priority of the scopes when searching for event fields
# Scopes listed from highest to lowest priority; a field lookup by name
# walks this list and stops at the first scope containing the field.
_scopes = [
    CTFScope.EVENT_FIELDS,
    CTFScope.EVENT_CONTEXT,
    CTFScope.STREAM_EVENT_CONTEXT,
    CTFScope.STREAM_EVENT_HEADER,
    CTFScope.STREAM_PACKET_CONTEXT,
    CTFScope.TRACE_PACKET_HEADER,
]
481
482
class Event(collections.abc.Mapping):
    """
    An :class:`Event` object represents a trace event. :class:`Event`
    objects are returned by :attr:`TraceCollection.events` and are
    not meant to be instantiated by the user.

    :class:`Event` has a :class:`dict`-like interface for accessing
    an event's field value by field name:

    .. code-block:: python

        event['my_field']

    If a field name exists in multiple scopes, the value of the first
    field found is returned. The scopes are searched in the following
    order:

    1. Event fields (:attr:`CTFScope.EVENT_FIELDS`)
    2. Event context (:attr:`CTFScope.EVENT_CONTEXT`)
    3. Stream event context (:attr:`CTFScope.STREAM_EVENT_CONTEXT`)
    4. Event header (:attr:`CTFScope.STREAM_EVENT_HEADER`)
    5. Packet context (:attr:`CTFScope.STREAM_PACKET_CONTEXT`)
    6. Packet header (:attr:`CTFScope.TRACE_PACKET_HEADER`)

    It is still possible to obtain a field's value from a specific
    scope using :meth:`field_with_scope`.

    Field values are returned as native Python types, that is:

    +-----------------------+----------------------------------+
    | Field type            | Python type                      |
    +=======================+==================================+
    | Integer               | :class:`int`                     |
    +-----------------------+----------------------------------+
    | Floating point number | :class:`float`                   |
    +-----------------------+----------------------------------+
    | Enumeration           | :class:`str` (enumeration label) |
    +-----------------------+----------------------------------+
    | String                | :class:`str`                     |
    +-----------------------+----------------------------------+
    | Array                 | :class:`list` of native Python   |
    |                       | objects                          |
    +-----------------------+----------------------------------+
    | Sequence              | :class:`list` of native Python   |
    |                       | objects                          |
    +-----------------------+----------------------------------+
    | Structure             | :class:`dict` mapping field      |
    |                       | names to native Python objects   |
    +-----------------------+----------------------------------+

    For example, printing the third element of a sequence named ``seq``
    in a structure named ``my_struct`` of the ``event``'s field named
    ``my_field`` is done this way:

    .. code-block:: python

        print(event['my_field']['my_struct']['seq'][2])
    """

    def __init__(self):
        raise NotImplementedError("Event cannot be instantiated")

    @property
    def name(self):
        """
        Event's name or ``None`` on error.
        """

        return nbt._bt_ctf_event_name(self._e)

    @property
    def cycles(self):
        """
        Event's timestamp in cycles or -1 on error.
        """

        return nbt._bt_ctf_get_cycles(self._e)

    @property
    def timestamp(self):
        """
        Event's timestamp (nanoseconds since Epoch) or -1 on error.
        """

        return nbt._bt_ctf_get_timestamp(self._e)

    @property
    def datetime(self):
        """
        Event's timestamp as a standard :class:`datetime.datetime`
        object.

        Note that the :class:`datetime.datetime` class' precision
        is limited to microseconds, whereas :attr:`timestamp` provides
        the event's timestamp with a nanosecond resolution.
        """

        # :attr:`timestamp` is in nanoseconds; fromtimestamp() expects
        # seconds.
        return datetime.fromtimestamp(self.timestamp / 1E9)

    def field_with_scope(self, field_name, scope):
        """
        Returns the value of a field named *field_name* within the
        scope *scope*, or ``None`` if the field cannot be found.

        *scope* must be one of :class:`CTFScope` constants, otherwise
        :class:`ValueError` is raised.
        """

        if scope not in _scopes:
            raise ValueError("Invalid scope provided")

        field = self._field_with_scope(field_name, scope)

        if field is not None:
            return field.value

    def field_list_with_scope(self, scope):
        """
        Returns a list of field names in the scope *scope*.

        *scope* must be one of :class:`CTFScope` constants, otherwise
        :class:`ValueError` is raised.
        """

        if scope not in _scopes:
            raise ValueError("Invalid scope provided")

        return [field.name for field in self._field_list_with_scope(scope)]

    @property
    def handle(self):
        """
        :class:`TraceHandle` object containing this event, or ``None``
        on error.
        """

        ret = nbt._bt_ctf_event_get_handle_id(self._e)

        if ret < 0:
            return None

        # TraceHandle.__init__ always raises, so bypass it.
        th = TraceHandle.__new__(TraceHandle)
        th._id = ret
        th._trace_collection = self.trace_collection

        return th

    @property
    def trace_collection(self):
        """
        :class:`TraceCollection` object containing this event, or
        ``None`` on error.
        """

        tc_ptr = nbt._bt_ctf_event_get_context(self._e)

        if tc_ptr is None:
            return None

        # Bypass TraceCollection.__init__: it would create a brand new
        # native context which would then be overwritten and never
        # released.
        # NOTE(review): the wrapper releases this context when it is
        # garbage collected; confirm that the native getter hands out a
        # reference the wrapper is allowed to release.
        trace_collection = TraceCollection.__new__(TraceCollection)
        trace_collection._tc = tc_ptr

        return trace_collection

    def __getitem__(self, field_name):
        field = self._field(field_name)

        if field is not None:
            return field.value

        raise KeyError(field_name)

    def __iter__(self):
        for key in self.keys():
            yield key

    def __len__(self):
        # NOTE: this is the sum of the per-scope field counts, so a name
        # present in several scopes is counted once per scope, whereas
        # keys() reports it only once.
        count = 0

        for scope in _scopes:
            scope_ptr = nbt._bt_ctf_get_top_level_scope(self._e, scope)
            ret = nbt._bt_python_field_listcaller(self._e, scope_ptr)

            # The count is the second item of the returned list; any
            # other return shape contributes nothing.
            if isinstance(ret, list):
                count += ret[1]

        return count

    def __contains__(self, field_name):
        return self._field(field_name) is not None

    def keys(self):
        """
        Returns the list of field names.

        Note: field names are unique within the returned list, although
        a field name could exist in multiple scopes. Use
        :meth:`field_list_with_scope` to obtain the list of field names
        of a given scope.
        """

        field_names = set()

        for scope in _scopes:
            field_names.update(self.field_list_with_scope(scope))

        return list(field_names)

    def get(self, field_name, default=None):
        """
        Returns the value of the field named *field_name*, or *default*
        when not found.

        See :class:`Event` note about how fields are retrieved by
        name when multiple fields share the same name in different
        scopes.
        """

        field = self._field(field_name)

        if field is None:
            return default

        return field.value

    def items(self):
        """
        Generates pairs of (field name, field value).

        This method iterates :meth:`keys` to find field names, which
        means some fields could be unavailable if other fields share
        their names in scopes with higher priorities.
        """

        for field in self.keys():
            yield (field, self[field])

    def _field_with_scope(self, field_name, scope):
        """
        Returns the definition wrapper of the field named *field_name*
        in *scope*, or ``None`` if the scope or the field is missing.
        """

        scope_ptr = nbt._bt_ctf_get_top_level_scope(self._e, scope)

        if scope_ptr is None:
            return None

        definition_ptr = nbt._bt_ctf_get_field(self._e, scope_ptr, field_name)

        if definition_ptr is None:
            return None

        return _Definition(definition_ptr, scope)

    def _field(self, field_name):
        """
        Returns the definition wrapper of the first field named
        *field_name* found when walking the scopes in priority order,
        or ``None`` if no scope contains it.
        """

        for scope in _scopes:
            field = self._field_with_scope(field_name, scope)

            if field is not None:
                return field

        return None

    def _field_list_with_scope(self, scope):
        """
        Returns the list of definition wrappers of all the fields
        found in *scope*.
        """

        fields = []
        scope_ptr = nbt._bt_ctf_get_top_level_scope(self._e, scope)

        # Returns a list [list_ptr, count]. If list_ptr is NULL, SWIG will only
        # provide the "count" return value
        count = 0
        list_ptr = None
        ret = nbt._bt_python_field_listcaller(self._e, scope_ptr)

        if isinstance(ret, list):
            list_ptr, count = ret

        for i in range(count):
            definition_ptr = nbt._bt_python_field_one_from_list(list_ptr, i)

            if definition_ptr is not None:
                fields.append(_Definition(definition_ptr, scope))

        return fields
765
766
class FieldError(Exception):
    """
    Exception raised when the value of a field cannot be accessed.

    The object given at construction time is kept in the ``value``
    attribute, and its :func:`repr` is used as the error message.
    """

    def __init__(self, value):
        self.value = value

    def __str__(self):
        message = repr(self.value)

        return message
777
778
class EventDeclaration:
    """
    Declaration (layout and metadata) of an event. Do not instantiate.
    """

    # Value of the native event ID which is reported as -1 by :attr:`id`
    MAX_UINT64 = 0xFFFFFFFFFFFFFFFF

    def __init__(self):
        raise NotImplementedError("EventDeclaration cannot be instantiated")

    @property
    def name(self):
        """Name of the event, or None on error."""

        return nbt._bt_ctf_get_decl_event_name(self._ed)

    @property
    def id(self):
        """Numeric ID of the event, or -1 on error."""

        event_id = nbt._bt_ctf_get_decl_event_id(self._ed)

        return -1 if event_id == self.MAX_UINT64 else event_id

    @property
    def fields(self):
        """
        Generates every field declaration of this event, visiting the
        scopes in the following order:
        1) EVENT_FIELDS
        2) EVENT_CONTEXT
        3) STREAM_EVENT_CONTEXT
        4) STREAM_EVENT_HEADER
        5) STREAM_PACKET_CONTEXT
        6) TRACE_PACKET_HEADER
        """

        for scope in _scopes:
            for field_declaration in self.fields_scope(scope):
                yield field_declaration

    def fields_scope(self, scope):
        """
        Generates the field declarations of this event found in the
        scope *scope*.
        """

        # NOTE(review): the "_by_" prefix (not "_bt_") is reproduced
        # unchanged; presumably it mirrors the name exported by the
        # native module -- confirm before renaming.
        result = nbt._by_python_field_decl_listcaller(self._ed, scope)

        # Anything other than a [list pointer, count] list means there
        # is nothing to generate.
        if not isinstance(result, list):
            return

        decl_list_ptr, decl_count = result

        for index in range(decl_count):
            field_decl_ptr = nbt._bt_python_field_decl_one_from_list(decl_list_ptr,
                                                                     index)

            if field_decl_ptr is None:
                continue

            decl_ptr = nbt._bt_ctf_get_decl_from_field_decl(field_decl_ptr)
            field_name = nbt._bt_ctf_get_decl_field_name(field_decl_ptr)

            yield _create_field_declaration(decl_ptr, field_name, scope)
841
842
class FieldDeclaration:
    """
    Base class of all field declarations. Do not instantiate.

    Instances carry three private attributes set by their creator:
    ``_fd`` (native declaration pointer), ``_s`` (scope) and ``_name``
    (field name).
    """

    def __init__(self):
        raise NotImplementedError("FieldDeclaration cannot be instantiated")

    def __repr__(self):
        scope_name = CTFScope.scope_name(self.scope)
        type_name = CTFTypeId.type_name(self.type)

        return "({0}) {1} {2}".format(scope_name, type_name, self.name)

    @property
    def name(self):
        """Name of this field declaration, or None on error."""

        return self._name

    @property
    def type(self):
        """
        Type of this field declaration: one of the entries of class
        CTFTypeId.
        """

        return nbt._bt_ctf_field_type(self._fd)

    @property
    def scope(self):
        """
        Scope of this field declaration.
        """

        return self._s
876
877
class IntegerFieldDeclaration(FieldDeclaration):
    """Declaration of an integer field. Do not instantiate."""

    def __init__(self):
        raise NotImplementedError("IntegerFieldDeclaration cannot be instantiated")

    @property
    def signedness(self):
        """
        Signedness of the integer:
        0 if unsigned; 1 if signed; -1 on error.
        """

        return nbt._bt_ctf_get_int_signedness(self._fd)

    @property
    def base(self):
        """Base of the integer, or a negative value on error."""

        return nbt._bt_ctf_get_int_base(self._fd)

    @property
    def byte_order(self):
        """
        Byte order of the integer: one of class ByteOrder's entries.
        """

        native_order = nbt._bt_ctf_get_int_byte_order(self._fd)

        # The native call reports 1234 for little-endian and 4321 for
        # big-endian; anything else is reported as unknown.
        if native_order == 1234:
            return ByteOrder.BYTE_ORDER_LITTLE_ENDIAN

        if native_order == 4321:
            return ByteOrder.BYTE_ORDER_BIG_ENDIAN

        return ByteOrder.BYTE_ORDER_UNKNOWN

    @property
    def length(self):
        """
        Size of the integer, in bits, or a negative value on error.
        """

        return nbt._bt_ctf_get_int_len(self._fd)

    @property
    def encoding(self):
        """
        Encoding of the integer: one of class CTFStringEncoding's
        entries, or a negative value on error.
        """

        return nbt._bt_ctf_get_encoding(self._fd)
931
932
class EnumerationFieldDeclaration(FieldDeclaration):
    """Declaration of an enumeration field. Do not instantiate."""

    def __init__(self):
        message = "EnumerationFieldDeclaration cannot be instantiated"

        raise NotImplementedError(message)
938
939
class ArrayFieldDeclaration(FieldDeclaration):
    """Declaration of an array field. Do not instantiate."""

    def __init__(self):
        raise NotImplementedError("ArrayFieldDeclaration cannot be instantiated")

    @property
    def length(self):
        """
        Length of the array, or a negative value on error.
        """

        return nbt._bt_ctf_get_array_len(self._fd)

    @property
    def element_declaration(self):
        """
        Field declaration of the array's elements.

        The returned declaration has an empty name and shares the
        scope of this declaration.
        """

        element_decl_ptr = nbt._bt_python_get_array_element_declaration(self._fd)
        element_decl = _create_field_declaration(element_decl_ptr, "", self.scope)

        return element_decl
964
965
class SequenceFieldDeclaration(FieldDeclaration):
    """Declaration of a sequence field. Do not instantiate."""

    def __init__(self):
        raise NotImplementedError("SequenceFieldDeclaration cannot be instantiated")

    @property
    def element_declaration(self):
        """
        Field declaration of the sequence's elements.

        The returned declaration has an empty name and shares the
        scope of this declaration.
        """

        element_decl_ptr = nbt._bt_python_get_sequence_element_declaration(self._fd)
        element_decl = _create_field_declaration(element_decl_ptr, "", self.scope)

        return element_decl
981
982
class FloatFieldDeclaration(FieldDeclaration):
    """Declaration of a floating point number field. Do not instantiate."""

    def __init__(self):
        message = "FloatFieldDeclaration cannot be instantiated"

        raise NotImplementedError(message)
988
989
class StructureFieldDeclaration(FieldDeclaration):
    """Declaration of a structure field. Do not instantiate."""

    def __init__(self):
        message = "StructureFieldDeclaration cannot be instantiated"

        raise NotImplementedError(message)
995
996
class StringFieldDeclaration(FieldDeclaration):
    """Declaration of a string field. Do not instantiate."""

    def __init__(self):
        message = "StringFieldDeclaration cannot be instantiated"

        raise NotImplementedError(message)
1002
1003
class VariantFieldDeclaration(FieldDeclaration):
    """Declaration of a variant field. Do not instantiate."""

    def __init__(self):
        message = "VariantFieldDeclaration cannot be instantiated"

        raise NotImplementedError(message)
1009
1010
def field_error():
    """
    Return the code of the last error encountered while accessing a
    field, and reset the error flag.

    The returned value is 0 when no error occurred, and negative
    otherwise.
    """

    error_code = nbt._bt_ctf_field_get_error()

    return error_code
1019
1020
def _create_field_declaration(declaration_ptr, name, scope):
    """
    Private field declaration factory.

    Builds the FieldDeclaration subclass instance matching the native
    type of *declaration_ptr*, named *name* and bound to *scope*.

    Returns None when the native type has no matching subclass.
    Raises ValueError if *declaration_ptr* is None or if *scope* is
    not one of the known scopes.
    """

    if declaration_ptr is None:
        raise ValueError("declaration_ptr must be valid")

    if scope not in _scopes:
        raise ValueError("Invalid scope provided")

    # Native type identifier -> declaration class
    declaration_classes = {
        CTFTypeId.INTEGER: IntegerFieldDeclaration,
        CTFTypeId.ENUM: EnumerationFieldDeclaration,
        CTFTypeId.ARRAY: ArrayFieldDeclaration,
        CTFTypeId.SEQUENCE: SequenceFieldDeclaration,
        CTFTypeId.FLOAT: FloatFieldDeclaration,
        CTFTypeId.STRUCT: StructureFieldDeclaration,
        CTFTypeId.STRING: StringFieldDeclaration,
        CTFTypeId.VARIANT: VariantFieldDeclaration,
    }

    type_id = nbt._bt_ctf_field_type(declaration_ptr)
    declaration_class = declaration_classes.get(type_id)

    if declaration_class is None:
        return None

    # The declaration classes refuse direct instantiation, so bypass
    # their __init__.
    declaration = declaration_class.__new__(declaration_class)
    declaration._fd = declaration_ptr
    declaration._s = scope
    declaration._name = name

    return declaration
1058
1059
1060 class _Definition:
def __init__(self, definition_ptr, scope):
    """
    Wrap the native field definition *definition_ptr* found in the
    scope *scope*.

    Raises ValueError if *scope* is not one of the known scopes.
    """

    # The exception used to be constructed without being raised, so an
    # invalid scope was silently accepted.
    if scope not in _scopes:
        raise ValueError("Invalid scope provided")

    self._d = definition_ptr
    self._s = scope
1067
@property
def name(self):
    """Name of this field, or None on error."""

    field_name = nbt._bt_ctf_field_name(self._d)

    return field_name
1073
@property
def type(self):
    """Type of this field, or -1 if unknown."""

    # The type is a property of the declaration behind the definition.
    declaration_ptr = nbt._bt_ctf_get_decl_from_def(self._d)

    return nbt._bt_ctf_field_type(declaration_ptr)
1079
@property
def declaration(self):
    """
    Field declaration object associated with this definition, as
    built by _create_field_declaration().
    """

    declaration_ptr = nbt._bt_ctf_get_decl_from_def(self._d)

    return _create_field_declaration(declaration_ptr, self.name, self.scope)
1086
def _get_enum_str(self):
    """
    Return the label matching the current value of the enumeration,
    or None on error.
    """

    label = nbt._bt_ctf_get_enum_str(self._d)

    return label
1094
def _get_array_element_at(self, index):
    """
    Return the definition wrapper of the array's element located at
    position *index*, or None on error.
    """

    array_ptr = nbt._bt_python_get_array_from_def(self._d)

    if array_ptr is not None:
        element_ptr = nbt._bt_array_index(array_ptr, index)

        if element_ptr is not None:
            return _Definition(element_ptr, self.scope)

    return None
1112
def _get_sequence_len(self):
    """
    Return the length of the sequence, or a negative value on error.
    """

    seq = nbt._bt_python_get_sequence_from_def(self._d)

    # Same guard as _get_sequence_element_at(): never hand a missing
    # sequence to the native length accessor.
    if seq is None:
        return -1

    return nbt._bt_sequence_len(seq)
1122
def _get_sequence_element_at(self, index):
    """
    Return the definition wrapper of the sequence's element located
    at position *index*, or None if it cannot be obtained.
    """

    seq = nbt._bt_python_get_sequence_from_def(self._d)

    if seq is None:
        return None

    element_ptr = nbt._bt_sequence_index(seq, index)

    if element_ptr is None:
        return None

    return _Definition(element_ptr, self.scope)
1136
def _get_uint64(self):
    """
    Return the field's value read as an unsigned 64-bit integer.

    The returned value is undefined if the field does not exist or
    is not of the requested type. Call field_error() after accessing
    the field to check whether an error occurred.
    """

    unsigned_value = nbt._bt_ctf_get_uint64(self._d)

    return unsigned_value
1146
def _get_int64(self):
    """
    Return the field's value read as a signed 64-bit integer.

    The returned value is undefined if the field does not exist or
    is not of the requested type. Call field_error() after accessing
    the field to check whether an error occurred.
    """

    signed_value = nbt._bt_ctf_get_int64(self._d)

    return signed_value
1156
def _get_char_array(self):
    """
    Return the field's value read as a character array.

    The returned value is undefined if the field does not exist or
    is not of the requested type. Call field_error() after accessing
    the field to check whether an error occurred.
    """

    char_array = nbt._bt_ctf_get_char_array(self._d)

    return char_array
1166
def _get_str(self):
    """
    Return the field's value read as a string.

    The returned value is undefined if the field does not exist or
    is not of the requested type. Call field_error() after accessing
    the field to check whether an error occurred.
    """

    string_value = nbt._bt_ctf_get_string(self._d)

    return string_value
1176
def _get_float(self):
    """
    Return the field's value read as a floating point number.

    The returned value is undefined if the field does not exist or
    is not of the requested type. Call field_error() after accessing
    the field to check whether an error occurred.
    """

    float_value = nbt._bt_ctf_get_float(self._d)

    return float_value
1186
def _get_variant(self):
    """
    Return the field currently selected by the variant.

    The returned value is undefined if the field does not exist or
    is not of the requested type. Call field_error() after accessing
    the field to check whether an error occurred.
    """

    selected_field = nbt._bt_ctf_get_variant(self._d)

    return selected_field
1196
def _get_struct_field_count(self):
    """
    Return the number of fields contained in the structure.

    The returned value is undefined if the field does not exist or
    is not of the requested type.
    """

    field_count = nbt._bt_ctf_get_struct_field_count(self._d)

    return field_count
1205
def _get_struct_field_at(self, i):
    """
    Fetch the structure member located at position i.

    The result is undefined when the field does not exist or is not
    of the requested type. Call field_error() after the access to
    find out whether an error occurred.
    """

    definition = self._d
    return nbt._bt_ctf_get_struct_field_index(definition, i)
1215
@property
def value(self):
    """
    Return the value associated with the field according to its type.

    * Strings, integers, enumerations and floats are read through the
      matching private getter.
    * Arrays and sequences whose elements are 8-bit integers with an
      ASCII or UTF8 encoding are fetched as one string by the native
      helper; any other array or sequence becomes a list of element
      values.
    * A variant yields the value of its selected field.
    * A structure yields a dict mapping member names to member values.

    Returns None when the field's type id matches none of the above.
    Raises FieldError when field_error() reports a failure after the
    access.
    """

    def holds_text(element_decl):
        # 8-bit integers flagged with a text encoding are characters.
        return (element_decl.type == CTFTypeId.INTEGER
                and element_decl.length == 8
                and element_decl.encoding in (CTFStringEncoding.ASCII,
                                              CTFStringEncoding.UTF8))

    type_id = self.type
    value = None

    if type_id == CTFTypeId.STRING:
        value = self._get_str()
    elif type_id == CTFTypeId.ARRAY:
        declaration = self.declaration

        if holds_text(declaration.element_declaration):
            value = nbt._bt_python_get_array_string(self._d)
        else:
            value = [self._get_array_element_at(i).value
                     for i in range(declaration.length)]
    elif type_id == CTFTypeId.INTEGER:
        if self.declaration.signedness == 0:
            value = self._get_uint64()
        else:
            value = self._get_int64()
    elif type_id == CTFTypeId.ENUM:
        value = self._get_enum_str()
    elif type_id == CTFTypeId.SEQUENCE:
        if holds_text(self.declaration.element_declaration):
            value = nbt._bt_python_get_sequence_string(self._d)
        else:
            seq_len = self._get_sequence_len()
            value = [self._get_sequence_element_at(i).value
                     for i in range(seq_len)]
    elif type_id == CTFTypeId.FLOAT:
        value = self._get_float()
    elif type_id == CTFTypeId.VARIANT:
        variant = _Definition.__new__(_Definition)
        variant._d = self._get_variant()
        # Propagate the scope: the STRUCT branch below reads self.scope,
        # so a variant selecting a structure would otherwise fail with
        # AttributeError on the missing _s attribute.
        variant._s = self.scope
        value = variant.value
    elif type_id == CTFTypeId.STRUCT:
        value = {}

        for i in range(self._get_struct_field_count()):
            member = _Definition(self._get_struct_field_at(i), self.scope)
            value[member.name] = member.value

    if field_error():
        raise FieldError(
            "Error occurred while accessing field {} of type {}".format(
                self.name,
                CTFTypeId.type_name(type_id)))

    return value
1282
@property
def scope(self):
    """Scope stored on this definition (the _s attribute)."""

    stored_scope = self._s
    return stored_scope
1288
1289
1290 class CTFWriter:
1291 # Used to compare to -1ULL in error checks
1292 _MAX_UINT64 = 0xFFFFFFFFFFFFFFFF
1293
class EnumerationMapping:
    """
    One enumeration mapping: a label and the inclusive [start, end]
    range of values it covers.
    """

    def __init__(self, name, start, end):
        self.name, self.start, self.end = name, start, end
1303
class Clock:
    """
    Python wrapper around a native CTF writer clock handle.

    The handle comes from nbt._bt_ctf_clock_create() and is passed to
    nbt._bt_ctf_clock_put() when the wrapper is deleted. Getters raise
    ValueError when the native layer returns its error value; setters
    raise ValueError when the native call returns a negative status.
    """

    def __init__(self, name):
        """
        Create a clock named *name*.

        Raises ValueError if the native layer does not return a clock.
        """

        self._c = nbt._bt_ctf_clock_create(name)

        if self._c is None:
            raise ValueError("Invalid clock name.")

    def __del__(self):
        nbt._bt_ctf_clock_put(self._c)

    @property
    def name(self):
        """
        Get the clock's name.
        """

        name = nbt._bt_ctf_clock_get_name(self._c)

        if name is None:
            raise ValueError("Invalid clock instance.")

        return name

    @property
    def description(self):
        """
        Get the clock's description. None if unset.
        """

        return nbt._bt_ctf_clock_get_description(self._c)

    @description.setter
    def description(self, desc):
        """
        Set the clock's description. The description appears in the
        clock's TSDL meta-data. *desc* is converted with str().
        """

        ret = nbt._bt_ctf_clock_set_description(self._c, str(desc))

        if ret < 0:
            raise ValueError("Invalid clock description.")

    @property
    def frequency(self):
        """
        Get the clock's frequency (Hz).
        """

        freq = nbt._bt_ctf_clock_get_frequency(self._c)

        # The native getter reports failure as -1ULL.
        if freq == CTFWriter._MAX_UINT64:
            raise ValueError("Invalid clock instance")

        return freq

    @frequency.setter
    def frequency(self, freq):
        """
        Set the clock's frequency (Hz).
        """

        ret = nbt._bt_ctf_clock_set_frequency(self._c, freq)

        if ret < 0:
            raise ValueError("Invalid frequency value.")

    @property
    def precision(self):
        """
        Get the clock's precision (in clock ticks).
        """

        precision = nbt._bt_ctf_clock_get_precision(self._c)

        if precision == CTFWriter._MAX_UINT64:
            raise ValueError("Invalid clock instance")

        return precision

    @precision.setter
    def precision(self, precision):
        """
        Set the clock's precision (in clock ticks).

        Raises ValueError when the native call reports a failure, like
        every other setter of this class.
        """

        ret = nbt._bt_ctf_clock_set_precision(self._c, precision)

        if ret < 0:
            raise ValueError("Invalid precision value.")

    @property
    def offset_seconds(self):
        """
        Get the clock's offset in seconds from POSIX.1 Epoch.
        """

        offset_s = nbt._bt_ctf_clock_get_offset_s(self._c)

        if offset_s == CTFWriter._MAX_UINT64:
            raise ValueError("Invalid clock instance")

        return offset_s

    @offset_seconds.setter
    def offset_seconds(self, offset_s):
        """
        Set the clock's offset in seconds from POSIX.1 Epoch.
        """

        ret = nbt._bt_ctf_clock_set_offset_s(self._c, offset_s)

        if ret < 0:
            raise ValueError("Invalid offset value.")

    @property
    def offset(self):
        """
        Get the clock's offset in ticks from POSIX.1 Epoch + offset in
        seconds.
        """

        offset = nbt._bt_ctf_clock_get_offset(self._c)

        if offset == CTFWriter._MAX_UINT64:
            raise ValueError("Invalid clock instance")

        return offset

    @offset.setter
    def offset(self, offset):
        """
        Set the clock's offset in ticks from POSIX.1 Epoch + offset in
        seconds.
        """

        ret = nbt._bt_ctf_clock_set_offset(self._c, offset)

        if ret < 0:
            raise ValueError("Invalid offset value.")

    @property
    def absolute(self):
        """
        Get a clock's absolute attribute. A clock is absolute if the
        clock is a global reference across the trace's other clocks.
        """

        is_absolute = nbt._bt_ctf_clock_get_is_absolute(self._c)

        if is_absolute == -1:
            raise ValueError("Invalid clock instance")

        return is_absolute != 0

    @absolute.setter
    def absolute(self, is_absolute):
        """
        Set a clock's absolute attribute. A clock is absolute if the
        clock is a global reference across the trace's other clocks.
        *is_absolute* is converted with int() before the native call.
        """

        ret = nbt._bt_ctf_clock_set_is_absolute(self._c, int(is_absolute))

        if ret < 0:
            raise ValueError("Could not set the clock's absolute attribute.")

    @property
    def uuid(self):
        """
        Get a clock's UUID (an object of type UUID).

        The 16 bytes are read one at a time from the native layer.
        """

        uuid_list = []

        for i in range(16):
            ret, value = nbt._bt_python_ctf_clock_get_uuid_index(self._c, i)

            if ret < 0:
                raise ValueError("Invalid clock instance")

            uuid_list.append(value)

        return UUID(bytes=bytes(uuid_list))

    @uuid.setter
    def uuid(self, uuid):
        """
        Set a clock's UUID (an object of type UUID).

        Raises ValueError if uuid.bytes is not 16 bytes long or if the
        native layer rejects one of the bytes.
        """

        uuid_bytes = uuid.bytes

        if len(uuid_bytes) != 16:
            raise ValueError("Invalid UUID provided. UUID length must be 16 bytes")

        for i, byte in enumerate(uuid_bytes):
            ret = nbt._bt_python_ctf_clock_set_uuid_index(self._c, i, byte)

            if ret < 0:
                raise ValueError("Invalid clock instance")

    @property
    def time(self):
        """
        Get the current time in nanoseconds since the clock's origin
        (offset and offset_s attributes).
        """

        time = nbt._bt_ctf_clock_get_time(self._c)

        if time == CTFWriter._MAX_UINT64:
            raise ValueError("Invalid clock instance")

        return time

    @time.setter
    def time(self, time):
        """
        Set the current time in nanoseconds since the clock's origin
        (offset and offset_s attributes). The clock's value will be
        sampled as events are appended to a stream.
        """

        ret = nbt._bt_ctf_clock_set_time(self._c, time)

        if ret < 0:
            raise ValueError("Invalid time value.")
1528
class FieldDeclaration:
    """
    FieldDeclaration should not be instantiated directly. Instantiate
    one of the concrete FieldDeclaration classes.

    Subclasses store their native field type handle in self._ft before
    calling this class's __init__(), which only validates it. The
    handle is passed to nbt._bt_ctf_field_type_put() on deletion.
    """

    class IntegerBase:
        # These values are based on the bt_ctf_integer_base enum
        # declared in event-types.h.
        INTEGER_BASE_UNKNOWN = -1
        INTEGER_BASE_BINARY = 2
        INTEGER_BASE_OCTAL = 8
        INTEGER_BASE_DECIMAL = 10
        INTEGER_BASE_HEXADECIMAL = 16

    def __init__(self):
        """Raise ValueError if the subclass failed to obtain self._ft."""

        if self._ft is None:
            raise ValueError("FieldDeclaration creation failed.")

    def __del__(self):
        nbt._bt_ctf_field_type_put(self._ft)

    @staticmethod
    def _create_field_declaration_from_native_instance(
            native_field_declaration):
        """
        Wrap a native field type handle in the matching Python
        declaration class, without running any __init__().

        Raises TypeError if the native type id is CTFTypeId.UNKNOWN.
        """

        type_dict = {
            CTFTypeId.INTEGER: CTFWriter.IntegerFieldDeclaration,
            CTFTypeId.FLOAT: CTFWriter.FloatFieldDeclaration,
            CTFTypeId.ENUM: CTFWriter.EnumerationFieldDeclaration,
            CTFTypeId.STRING: CTFWriter.StringFieldDeclaration,
            CTFTypeId.STRUCT: CTFWriter.StructureFieldDeclaration,
            CTFTypeId.VARIANT: CTFWriter.VariantFieldDeclaration,
            CTFTypeId.ARRAY: CTFWriter.ArrayFieldDeclaration,
            CTFTypeId.SEQUENCE: CTFWriter.SequenceFieldDeclaration
        }

        field_type_id = nbt._bt_ctf_field_type_get_type_id(native_field_declaration)

        if field_type_id == CTFTypeId.UNKNOWN:
            raise TypeError("Invalid field instance")

        # Start from the declaration base class (not CTFWriter.Field) so
        # that, should the class lookup below fail, the placeholder is
        # finalized by this class's __del__ and not by Field.__del__,
        # which expects an _f attribute this object never has.
        declaration = CTFWriter.FieldDeclaration.__new__(CTFWriter.FieldDeclaration)
        declaration._ft = native_field_declaration
        declaration.__class__ = type_dict[field_type_id]

        return declaration

    @property
    def alignment(self):
        """
        Get the field declaration's alignment. Returns -1 on error.
        """

        return nbt._bt_ctf_field_type_get_alignment(self._ft)

    @alignment.setter
    def alignment(self, alignment):
        """
        Set the field declaration's alignment. Defaults to 1
        (bit-aligned). However, some types, such as structures and
        string, may impose other alignment constraints.
        """

        ret = nbt._bt_ctf_field_type_set_alignment(self._ft, alignment)

        if ret < 0:
            raise ValueError("Invalid alignment value.")

    @property
    def byte_order(self):
        """
        Get the field declaration's byte order. One of the ByteOrder's
        constant.
        """

        return nbt._bt_ctf_field_type_get_byte_order(self._ft)

    @byte_order.setter
    def byte_order(self, byte_order):
        """
        Set the field declaration's byte order. Use constants defined
        in the ByteOrder class.
        """

        ret = nbt._bt_ctf_field_type_set_byte_order(self._ft, byte_order)

        if ret < 0:
            raise ValueError("Could not set byte order value.")
1616
class IntegerFieldDeclaration(FieldDeclaration):
    """Declaration of an integer field."""

    def __init__(self, size):
        """
        Create a new integer field declaration of the given size.
        """
        native_type = nbt._bt_ctf_field_type_integer_create(size)
        self._ft = native_type
        super().__init__()

    @property
    def size(self):
        """
        Get an integer's size.
        """

        native_size = nbt._bt_ctf_field_type_integer_get_size(self._ft)

        if native_size < 0:
            raise ValueError("Could not get Integer's size attribute.")

        return native_size

    @property
    def signed(self):
        """
        Get an integer's signedness attribute.
        """

        status = nbt._bt_ctf_field_type_integer_get_signed(self._ft)

        if status < 0:
            raise ValueError("Could not get Integer's signed attribute.")

        return status > 0

    @signed.setter
    def signed(self, signed):
        """
        Set an integer's signedness attribute.
        """

        status = nbt._bt_ctf_field_type_integer_set_signed(self._ft, signed)

        if status < 0:
            raise ValueError("Could not set Integer's signed attribute.")

    @property
    def base(self):
        """
        Get the integer's base used to pretty-print the resulting trace.
        Returns a constant from the FieldDeclaration.IntegerBase class.
        """

        native_type = self._ft
        return nbt._bt_ctf_field_type_integer_get_base(native_type)

    @base.setter
    def base(self, base):
        """
        Set the integer's base used to pretty-print the resulting trace.
        The base must be a constant of the FieldDeclaration.IntegerBase
        class.
        """

        status = nbt._bt_ctf_field_type_integer_set_base(self._ft, base)

        if status < 0:
            raise ValueError("Could not set Integer's base.")

    @property
    def encoding(self):
        """
        Get the integer's encoding.
        Returns a constant from the CTFStringEncoding class.
        """

        native_type = self._ft
        return nbt._bt_ctf_field_type_integer_get_encoding(native_type)

    @encoding.setter
    def encoding(self, encoding):
        """
        An integer encoding may be set to signal that the integer must
        be printed as a text character. Must be a constant from the
        CTFStringEncoding class.
        """

        status = nbt._bt_ctf_field_type_integer_set_encoding(self._ft, encoding)

        if status < 0:
            raise ValueError("Could not set Integer's encoding.")
1706
class EnumerationFieldDeclaration(FieldDeclaration):
    """Declaration of an enumeration field backed by an integer type."""

    def __init__(self, integer_type):
        """
        Create a new enumeration field declaration with the given
        underlying container type.

        Raises TypeError if integer_type is not an
        IntegerFieldDeclaration.
        """

        if not isinstance(integer_type, CTFWriter.IntegerFieldDeclaration):
            raise TypeError("Invalid integer container.")

        self._ft = nbt._bt_ctf_field_type_enumeration_create(integer_type._ft)
        super().__init__()

    @property
    def container(self):
        """
        Get the enumeration's underlying container type.
        """

        ret = nbt._bt_ctf_field_type_enumeration_get_container_type(self._ft)

        if ret is None:
            raise TypeError("Invalid enumeration declaration")

        return CTFWriter.FieldDeclaration._create_field_declaration_from_native_instance(ret)

    def add_mapping(self, name, range_start, range_end):
        """
        Add a mapping to the enumeration. The range's values are
        inclusive.

        The signed native call is used as soon as either bound is
        negative; the unsigned one otherwise.
        """

        if range_start < 0 or range_end < 0:
            ret = nbt._bt_ctf_field_type_enumeration_add_mapping(self._ft,
                                                                 str(name),
                                                                 range_start,
                                                                 range_end)
        else:
            ret = nbt._bt_ctf_field_type_enumeration_add_mapping_unsigned(self._ft,
                                                                          str(name),
                                                                          range_start,
                                                                          range_end)

        if ret < 0:
            raise ValueError("Could not add mapping to enumeration declaration.")

    def _mapping_at(self, index, signed):
        """
        Fetch the mapping stored at *index* as an EnumerationMapping,
        using the signed or unsigned native accessor.

        Raises TypeError if the native layer does not return a
        (name, start, end) triple.
        """

        if signed:
            ret = nbt._bt_python_ctf_field_type_enumeration_get_mapping(self._ft, index)
        else:
            ret = nbt._bt_python_ctf_field_type_enumeration_get_mapping_unsigned(self._ft, index)

        if len(ret) != 3:
            msg = "Could not get Enumeration mapping at index {}".format(index)
            raise TypeError(msg)

        name, range_start, range_end = ret

        return CTFWriter.EnumerationMapping(name, range_start, range_end)

    @property
    def mappings(self):
        """
        Generator returning instances of EnumerationMapping.
        """

        signed = self.container.signed

        count = nbt._bt_ctf_field_type_enumeration_get_mapping_count(self._ft)

        for i in range(count):
            yield self._mapping_at(i, signed)

    def get_mapping_by_name(self, name):
        """
        Get a mapping by name (EnumerationMapping).

        Returns None if the native lookup yields a negative index.
        """

        index = nbt._bt_ctf_field_type_enumeration_get_mapping_index_by_name(self._ft, name)

        if index < 0:
            return None

        return self._mapping_at(index, self.container.signed)

    def get_mapping_by_value(self, value):
        """
        Get a mapping by value (EnumerationMapping).

        Negative values are looked up through the signed native call,
        others through the unsigned one. Returns None if the lookup
        yields a negative index.
        """

        if value < 0:
            index = nbt._bt_ctf_field_type_enumeration_get_mapping_index_by_value(self._ft, value)
        else:
            index = nbt._bt_ctf_field_type_enumeration_get_mapping_index_by_unsigned_value(self._ft, value)

        if index < 0:
            return None

        return self._mapping_at(index, self.container.signed)
1823
class FloatFieldDeclaration(FieldDeclaration):
    """Declaration of a floating point field."""

    FLT_EXP_DIG = 8
    DBL_EXP_DIG = 11
    FLT_MANT_DIG = 24
    DBL_MANT_DIG = 53

    def __init__(self):
        """
        Create a new floating point field declaration.
        """

        native_type = nbt._bt_ctf_field_type_floating_point_create()
        self._ft = native_type
        super().__init__()

    @property
    def exponent_digits(self):
        """
        Get the number of exponent digits used to store the floating
        point field.
        """

        digits = nbt._bt_ctf_field_type_floating_point_get_exponent_digits(self._ft)

        if digits < 0:
            raise TypeError(
                "Could not get Floating point exponent digit count")

        return digits

    @exponent_digits.setter
    def exponent_digits(self, exponent_digits):
        """
        Set the number of exponent digits to use to store the floating
        point field. The only values currently supported are
        FLT_EXP_DIG and DBL_EXP_DIG which are defined as constants of
        this class.
        """

        status = nbt._bt_ctf_field_type_floating_point_set_exponent_digits(
            self._ft, exponent_digits)

        if status < 0:
            raise ValueError("Could not set exponent digit count.")

    @property
    def mantissa_digits(self):
        """
        Get the number of mantissa digits used to store the floating
        point field.
        """

        digits = nbt._bt_ctf_field_type_floating_point_get_mantissa_digits(self._ft)

        if digits < 0:
            raise TypeError("Could not get Floating point mantissa digit count")

        return digits

    @mantissa_digits.setter
    def mantissa_digits(self, mantissa_digits):
        """
        Set the number of mantissa digits to use to store the floating
        point field. The only values currently supported are
        FLT_MANT_DIG and DBL_MANT_DIG which are defined as constants of
        this class.
        """

        status = nbt._bt_ctf_field_type_floating_point_set_mantissa_digits(
            self._ft, mantissa_digits)

        if status < 0:
            raise ValueError("Could not set mantissa digit count.")
1892
class FloatingPointFieldDeclaration(FloatFieldDeclaration):
    """Alias of FloatFieldDeclaration; adds nothing of its own."""
1895
class StructureFieldDeclaration(FieldDeclaration):
    """Declaration of a structure field."""

    def __init__(self):
        """
        Create a new structure field declaration.
        """

        native_type = nbt._bt_ctf_field_type_structure_create()
        self._ft = native_type
        super().__init__()

    def add_field(self, field_type, field_name):
        """
        Add a field of type "field_type" to the structure.
        """

        status = nbt._bt_ctf_field_type_structure_add_field(
            self._ft, field_type._ft, str(field_name))

        if status < 0:
            raise ValueError("Could not add field to structure.")

    @property
    def fields(self):
        """
        Generator returning the structure's field as tuples of
        (field name, field declaration).
        """

        wrap = CTFWriter.FieldDeclaration._create_field_declaration_from_native_instance
        total = nbt._bt_ctf_field_type_structure_get_field_count(self._ft)

        if total < 0:
            raise TypeError("Could not get Structure field count")

        for position in range(total):
            member_name = nbt._bt_python_ctf_field_type_structure_get_field_name(
                self._ft, position)

            if member_name is None:
                raise TypeError(
                    "Could not get Structure field name at index {}".format(position))

            native_member = nbt._bt_python_ctf_field_type_structure_get_field_type(
                self._ft, position)

            if native_member is None:
                raise TypeError(
                    "Could not get Structure field type at index {}".format(position))

            yield (member_name, wrap(native_member))

    def get_field_by_name(self, name):
        """
        Get a field declaration by name (FieldDeclaration).
        """

        native_member = nbt._bt_ctf_field_type_structure_get_field_type_by_name(
            self._ft, name)

        if native_member is None:
            raise TypeError(
                "Could not find Structure field with name {}".format(name))

        return CTFWriter.FieldDeclaration._create_field_declaration_from_native_instance(
            native_member)
1956
class VariantFieldDeclaration(FieldDeclaration):
    """Declaration of a variant field selected by an enumeration tag."""

    def __init__(self, enum_tag, tag_name):
        """
        Create a new variant field declaration.
        """

        if not isinstance(enum_tag, CTFWriter.EnumerationFieldDeclaration):
            raise TypeError("Invalid tag type; must be of type EnumerationFieldDeclaration.")

        native_type = nbt._bt_ctf_field_type_variant_create(enum_tag._ft,
                                                            str(tag_name))
        self._ft = native_type
        super().__init__()

    @property
    def tag_name(self):
        """
        Get the variant's tag name.
        """

        native_name = nbt._bt_ctf_field_type_variant_get_tag_name(self._ft)

        if native_name is None:
            raise TypeError("Could not get Variant tag name")

        return native_name

    @property
    def tag_type(self):
        """
        Get the variant's tag type.
        """

        native_tag = nbt._bt_ctf_field_type_variant_get_tag_type(self._ft)

        if native_tag is None:
            raise TypeError("Could not get Variant tag type")

        return CTFWriter.FieldDeclaration._create_field_declaration_from_native_instance(
            native_tag)

    def add_field(self, field_type, field_name):
        """
        Add a field of type "field_type" to the variant.
        """

        status = nbt._bt_ctf_field_type_variant_add_field(
            self._ft, field_type._ft, str(field_name))

        if status < 0:
            raise ValueError("Could not add field to variant.")

    @property
    def fields(self):
        """
        Generator returning the variant's field as tuples of
        (field name, field declaration).
        """

        wrap = CTFWriter.FieldDeclaration._create_field_declaration_from_native_instance
        total = nbt._bt_ctf_field_type_variant_get_field_count(self._ft)

        if total < 0:
            raise TypeError("Could not get Variant field count")

        for position in range(total):
            option_name = nbt._bt_python_ctf_field_type_variant_get_field_name(
                self._ft, position)

            if option_name is None:
                raise TypeError(
                    "Could not get Variant field name at index {}".format(position))

            native_option = nbt._bt_python_ctf_field_type_variant_get_field_type(
                self._ft, position)

            if native_option is None:
                raise TypeError(
                    "Could not get Variant field type at index {}".format(position))

            yield (option_name, wrap(native_option))

    def get_field_by_name(self, name):
        """
        Get a field declaration by name (FieldDeclaration).
        """

        native_option = nbt._bt_ctf_field_type_variant_get_field_type_by_name(
            self._ft, name)

        if native_option is None:
            raise TypeError(
                "Could not find Variant field with name {}".format(name))

        return CTFWriter.FieldDeclaration._create_field_declaration_from_native_instance(
            native_option)

    def get_field_from_tag(self, tag):
        """
        Get a field declaration from tag (EnumerationField).
        """

        native_option = nbt._bt_ctf_field_type_variant_get_field_type_from_tag(
            self._ft, tag._f)

        if native_option is None:
            raise TypeError(
                "Could not find Variant field with tag value {}".format(tag.value))

        return CTFWriter.FieldDeclaration._create_field_declaration_from_native_instance(
            native_option)
2062
class ArrayFieldDeclaration(FieldDeclaration):
    """Declaration of an array field."""

    def __init__(self, element_type, length):
        """
        Create a new array field declaration.
        """

        native_type = nbt._bt_ctf_field_type_array_create(element_type._ft,
                                                          length)
        self._ft = native_type
        super().__init__()

    @property
    def element_type(self):
        """
        Get the array's element type.
        """

        native_element = nbt._bt_ctf_field_type_array_get_element_type(self._ft)

        if native_element is None:
            raise TypeError("Could not get Array element type")

        return CTFWriter.FieldDeclaration._create_field_declaration_from_native_instance(
            native_element)

    @property
    def length(self):
        """
        Get the array's length.
        """

        native_length = nbt._bt_ctf_field_type_array_get_length(self._ft)

        if native_length < 0:
            raise TypeError("Could not get Array length")

        return native_length
2098
class SequenceFieldDeclaration(FieldDeclaration):
    """Declaration of a sequence field."""

    def __init__(self, element_type, length_field_name):
        """
        Create a new sequence field declaration.
        """

        native_type = nbt._bt_ctf_field_type_sequence_create(
            element_type._ft, str(length_field_name))
        self._ft = native_type
        super().__init__()

    @property
    def element_type(self):
        """
        Get the sequence's element type.
        """

        native_element = nbt._bt_ctf_field_type_sequence_get_element_type(self._ft)

        if native_element is None:
            raise TypeError("Could not get Sequence element type")

        return CTFWriter.FieldDeclaration._create_field_declaration_from_native_instance(
            native_element)

    @property
    def length_field_name(self):
        """
        Get the sequence's length field name.
        """

        native_name = nbt._bt_ctf_field_type_sequence_get_length_field_name(self._ft)

        if native_name is None:
            raise TypeError("Could not get Sequence length field name")

        return native_name
2134
class StringFieldDeclaration(FieldDeclaration):
    """Declaration of a string field."""

    def __init__(self):
        """
        Create a new string field declaration.
        """

        native_type = nbt._bt_ctf_field_type_string_create()
        self._ft = native_type
        super().__init__()

    @property
    def encoding(self):
        """
        Get a string declaration's encoding (a constant from the
        CTFStringEncoding class).
        """

        native_type = self._ft
        return nbt._bt_ctf_field_type_string_get_encoding(native_type)

    @encoding.setter
    def encoding(self, encoding):
        """
        Set a string declaration's encoding. Must be a constant from
        the CTFStringEncoding class.
        """

        status = nbt._bt_ctf_field_type_string_set_encoding(self._ft, encoding)

        if status < 0:
            raise ValueError("Could not set string encoding.")
2161
@staticmethod
def create_field(field_type):
    """
    Create an instance of a field.

    The declaration is matched against the known declaration classes
    in a fixed order and the first match decides which field class is
    instantiated. Raises TypeError if field_type is not a
    FieldDeclaration; returns None if no known class matches.
    """
    if not isinstance(field_type, CTFWriter.FieldDeclaration):
        raise TypeError("Invalid field_type. Type must be a FieldDeclaration-derived class.")

    # (declaration class, field class) pairs, tested in this order.
    dispatch = (
        (CTFWriter.IntegerFieldDeclaration, CTFWriter.IntegerField),
        (CTFWriter.EnumerationFieldDeclaration, CTFWriter.EnumerationField),
        (CTFWriter.FloatFieldDeclaration, CTFWriter.FloatingPointField),
        (CTFWriter.StructureFieldDeclaration, CTFWriter.StructureField),
        (CTFWriter.VariantFieldDeclaration, CTFWriter.VariantField),
        (CTFWriter.ArrayFieldDeclaration, CTFWriter.ArrayField),
        (CTFWriter.SequenceFieldDeclaration, CTFWriter.SequenceField),
        (CTFWriter.StringFieldDeclaration, CTFWriter.StringField),
    )

    for declaration_cls, field_cls in dispatch:
        if isinstance(field_type, declaration_cls):
            return field_cls(field_type)

    return None
2188
class Field:
    """
    Base class, do not instantiate.

    Holds the native field handle in self._f; the handle is passed to
    nbt._bt_ctf_field_put() when the wrapper is deleted.
    """

    def __init__(self, field_type):
        """
        Create the native field described by field_type.

        Raises TypeError if field_type is not a FieldDeclaration and
        ValueError if the native layer returns no field.
        """
        if not isinstance(field_type, CTFWriter.FieldDeclaration):
            raise TypeError("Invalid field_type argument.")

        native_field = nbt._bt_ctf_field_create(field_type._ft)
        self._f = native_field

        if native_field is None:
            raise ValueError("Field creation failed.")

    def __del__(self):
        nbt._bt_ctf_field_put(self._f)

    @staticmethod
    def _create_field_from_native_instance(native_field_instance):
        """
        Wrap a native field handle in the matching Python field class,
        without running any __init__().

        Raises TypeError if the native type id is CTFTypeId.UNKNOWN.
        """
        wrapper_by_type_id = {
            CTFTypeId.INTEGER: CTFWriter.IntegerField,
            CTFTypeId.FLOAT: CTFWriter.FloatingPointField,
            CTFTypeId.ENUM: CTFWriter.EnumerationField,
            CTFTypeId.STRING: CTFWriter.StringField,
            CTFTypeId.STRUCT: CTFWriter.StructureField,
            CTFTypeId.VARIANT: CTFWriter.VariantField,
            CTFTypeId.ARRAY: CTFWriter.ArrayField,
            CTFTypeId.SEQUENCE: CTFWriter.SequenceField
        }

        type_id = nbt._bt_python_get_field_type(native_field_instance)

        if type_id == CTFTypeId.UNKNOWN:
            raise TypeError("Invalid field instance")

        # Build a bare base instance first, then retarget its class.
        wrapper = CTFWriter.Field.__new__(CTFWriter.Field)
        wrapper._f = native_field_instance
        wrapper.__class__ = wrapper_by_type_id[type_id]

        return wrapper

    @property
    def declaration(self):
        """Declaration wrapper for this field's native field type."""
        native_type = nbt._bt_ctf_field_get_type(self._f)

        if native_type is None:
            raise TypeError("Invalid field instance")

        make_declaration = CTFWriter.FieldDeclaration._create_field_declaration_from_native_instance
        return make_declaration(native_type)
2238
class IntegerField(Field):
    """Integer field; reads and writes through the signed or unsigned
    native accessor depending on the field's signedness."""

    @property
    def value(self):
        """
        Get an integer field's value.
        """

        signedness = nbt._bt_python_field_integer_get_signedness(self._f)

        if signedness < 0:
            raise TypeError("Invalid integer instance.")

        getter = (nbt._bt_ctf_field_unsigned_integer_get_value
                  if signedness == 0
                  else nbt._bt_ctf_field_signed_integer_get_value)
        status, result = getter(self._f)

        if status < 0:
            raise ValueError("Could not get integer field value.")

        return result

    @value.setter
    def value(self, value):
        """
        Set an integer field's value.
        """

        if not isinstance(value, int):
            raise TypeError("IntegerField's value must be an int")

        signedness = nbt._bt_python_field_integer_get_signedness(self._f)

        if signedness < 0:
            raise TypeError("Invalid integer instance.")

        setter = (nbt._bt_ctf_field_unsigned_integer_set_value
                  if signedness == 0
                  else nbt._bt_ctf_field_signed_integer_set_value)
        status = setter(self._f, value)

        if status < 0:
            raise ValueError("Could not set integer field value.")
2281
class EnumerationField(Field):
    """Enumeration field: read as a mapping name, written as an int."""

    @property
    def container(self):
        """
        Return the enumeration's underlying container field (an
        integer field).
        """

        integer_field = CTFWriter.IntegerField.__new__(CTFWriter.IntegerField)
        integer_field._f = nbt._bt_ctf_field_enumeration_get_container(self._f)

        if integer_field._f is not None:
            return integer_field

        raise TypeError("Invalid enumeration field type.")

    @property
    def value(self):
        """
        Get the enumeration field's mapping name.
        """

        mapping_name = nbt._bt_ctf_field_enumeration_get_mapping_name(self._f)

        if mapping_name is None:
            raise ValueError("Could not get enumeration's mapping name.")

        return mapping_name

    @value.setter
    def value(self, value):
        """
        Set the enumeration field's value. Must be an integer as
        mapping names may be ambiguous. The value is stored through
        the container field.
        """

        if not isinstance(value, int):
            raise TypeError("EnumerationField value must be an int")

        integer_field = self.container
        integer_field.value = value
2321
class FloatingPointField(Field):
    @property
    def value(self):
        """
        Get a floating point field's value.

        Raises ValueError if the native value cannot be read.
        """

        status, result = nbt._bt_ctf_field_floating_point_get_value(self._f)

        if status >= 0:
            return result

        raise ValueError("Could not get floating point field value.")

    @value.setter
    def value(self, value):
        """
        Set a floating point field's value.

        Accepts an int or a float; the value is converted to float before
        being handed to the native setter. Raises TypeError for any other
        type, ValueError if the native setter fails.
        """

        if not isinstance(value, (int, float)):
            raise TypeError("Value must be either a float or an int")

        as_float = float(value)
        status = nbt._bt_ctf_field_floating_point_set_value(self._f, as_float)

        if status < 0:
            raise ValueError("Could not set floating point field value.")
2349
# This misspelled class name is kept only to ensure backward-compatibility,
# since a stable release publicly exposed it.
class FloatFieldingPoint(FloatingPointField):
    """
    Backward-compatibility alias of FloatingPointField; adds no behavior.
    """
    pass
2354
class StructureField(Field):
    def field(self, field_name):
        """
        Get the structure's field corresponding to the provided field name.

        "field_name" is converted to str before the lookup. Raises
        ValueError if the native layer returns no field for that name.
        """

        key = str(field_name)
        native_field = nbt._bt_ctf_field_structure_get_field(self._f, key)

        if native_field is not None:
            return CTFWriter.Field._create_field_from_native_instance(native_field)

        raise ValueError("Invalid field_name provided.")
2368
class VariantField(Field):
    def field(self, tag):
        """
        Return the variant's selected field. The "tag" field is the selector enum field.

        Raises ValueError if the native layer returns no field for the tag.
        """

        selected = nbt._bt_ctf_field_variant_get_field(self._f, tag._f)

        if selected is not None:
            return CTFWriter.Field._create_field_from_native_instance(selected)

        raise ValueError("Invalid tag provided.")
2381
class ArrayField(Field):
    def field(self, index):
        """
        Return the array's field at position "index".

        Raises IndexError if the native layer returns no field for that
        position.
        """

        element = nbt._bt_ctf_field_array_get_field(self._f, index)

        if element is not None:
            return CTFWriter.Field._create_field_from_native_instance(element)

        raise IndexError("Invalid index provided.")
2394
class SequenceField(Field):
    @property
    def length(self):
        """
        Get the sequence's length field (IntegerField).

        Raises TypeError if the native layer returns no length field
        (for instance when no length has been set yet).
        """

        native_instance = nbt._bt_ctf_field_sequence_get_length(self._f)

        # Fail explicitly instead of handing None to the field factory:
        # the previous code only assigned an unused local here and then
        # passed the missing handle on to the native layer.
        if native_instance is None:
            raise TypeError("Could not get sequence length.")

        return CTFWriter.Field._create_field_from_native_instance(native_instance)

    @length.setter
    def length(self, length_field):
        """
        Set the sequence's length field (IntegerField).

        Raises TypeError if "length_field" is not an IntegerField or if
        its declaration is signed, ValueError if the native setter fails.
        """

        if not isinstance(length_field, CTFWriter.IntegerField):
            raise TypeError("Invalid length field.")

        if length_field.declaration.signed:
            raise TypeError("Sequence field length must be unsigned")

        ret = nbt._bt_ctf_field_sequence_set_length(self._f, length_field._f)

        if ret < 0:
            raise ValueError("Could not set sequence length.")

    def field(self, index):
        """
        Return the sequence's field at position "index".

        Raises ValueError if the native layer returns no field for that
        position.
        """

        native_instance = nbt._bt_ctf_field_sequence_get_field(self._f, index)

        if native_instance is None:
            raise ValueError("Could not get sequence element at index.")

        return CTFWriter.Field._create_field_from_native_instance(native_instance)
2437
class StringField(Field):
    @property
    def value(self):
        """
        Get a string field's value.

        The result of the native getter is returned as is.
        """

        current = nbt._bt_ctf_field_string_get_value(self._f)
        return current

    @value.setter
    def value(self, value):
        """
        Set a string field's value.

        "value" is converted to str first. Raises ValueError if the
        native setter fails.
        """

        text = str(value)
        status = nbt._bt_ctf_field_string_set_value(self._f, text)

        if status < 0:
            raise ValueError("Could not set string field value.")
2457
class EventClass:
    def __init__(self, name):
        """
        Create a new event class of the given name.

        Raises ValueError if the native event class cannot be created.
        """

        native_ec = nbt._bt_ctf_event_class_create(name)
        self._ec = native_ec

        if native_ec is None:
            raise ValueError("Event class creation failed.")

    def __del__(self):
        # Hand the native event class handle back to the native layer.
        nbt._bt_ctf_event_class_put(self._ec)

    def add_field(self, field_type, field_name):
        """
        Add a field of type "field_type" to the event class.

        "field_name" is converted to str. Raises ValueError if the native
        layer rejects the field.
        """

        status = nbt._bt_ctf_event_class_add_field(self._ec,
                                                   field_type._ft,
                                                   str(field_name))

        if status < 0:
            raise ValueError("Could not add field to event class.")

    @property
    def name(self):
        """
        Get the event class' name.

        Raises TypeError if the name cannot be obtained.
        """

        ec_name = nbt._bt_ctf_event_class_get_name(self._ec)

        if ec_name is not None:
            return ec_name

        raise TypeError("Could not get EventClass name")

    @property
    def id(self):
        """
        Get the event class' id.

        Raises TypeError if the native layer reports a negative id.
        """

        ec_id = nbt._bt_ctf_event_class_get_id(self._ec)

        if ec_id >= 0:
            return ec_id

        raise TypeError("Could not get EventClass id")

    @id.setter
    def id(self, id):
        """
        Set the event class' id. Throws a TypeError if the event class
        is already registered to a stream class.
        """

        status = nbt._bt_ctf_event_class_set_id(self._ec, id)

        if status < 0:
            raise TypeError("Can't change an Event Class's id after it has been assigned to a stream class")

    @property
    def stream_class(self):
        """
        Get the event class' stream class. Returns None if unset.
        """

        native_sc = nbt._bt_ctf_event_class_get_stream_class(self._ec)

        if native_sc is None:
            return None

        # Wrap the native handle without running StreamClass.__init__,
        # which would create a brand new native stream class.
        wrapper = CTFWriter.StreamClass.__new__(CTFWriter.StreamClass)
        wrapper._sc = native_sc

        return wrapper

    @property
    def fields(self):
        """
        Generator returning the event class' fields as tuples of (field name, field declaration).

        Raises TypeError, once iteration starts, if the field count, a
        field name or a field type cannot be obtained.
        """

        total = nbt._bt_ctf_event_class_get_field_count(self._ec)

        if total < 0:
            raise TypeError("Could not get EventClass' field count")

        for index in range(total):
            name = nbt._bt_python_ctf_event_class_get_field_name(self._ec, index)

            if name is None:
                raise TypeError("Could not get EventClass' field name at index {}".format(index))

            native_type = nbt._bt_python_ctf_event_class_get_field_type(self._ec, index)

            if native_type is None:
                raise TypeError("Could not get EventClass' field type at index {}".format(index))

            declaration = CTFWriter.FieldDeclaration._create_field_declaration_from_native_instance(native_type)
            yield (name, declaration)

    def get_field_by_name(self, name):
        """
        Get a field declaration by name (FieldDeclaration).

        Raises TypeError if no field of that name is found.
        """

        native_type = nbt._bt_ctf_event_class_get_field_by_name(self._ec, name)

        if native_type is not None:
            return CTFWriter.FieldDeclaration._create_field_declaration_from_native_instance(native_type)

        raise TypeError("Could not find EventClass field with name {}".format(name))
2575
class Event:
    def __init__(self, event_class):
        """
        Create a new event of the given event class.

        Raises TypeError if "event_class" is not an EventClass, ValueError
        if the native event cannot be created.
        """

        if not isinstance(event_class, CTFWriter.EventClass):
            raise TypeError("Invalid event_class argument.")

        self._e = nbt._bt_ctf_event_create(event_class._ec)

        if self._e is None:
            raise ValueError("Event creation failed.")

    def __del__(self):
        # Hand the native event handle back to the native layer.
        nbt._bt_ctf_event_put(self._e)

    @property
    def event_class(self):
        """
        Get the event's class. Returns None if the native layer reports
        no class for this event.
        """

        event_class_native = nbt._bt_ctf_event_get_class(self._e)

        if event_class_native is None:
            return None

        # Wrap the native handle without running EventClass.__init__,
        # which would create a brand new native event class.
        event_class = CTFWriter.EventClass.__new__(CTFWriter.EventClass)
        event_class._ec = event_class_native

        return event_class

    def clock(self):
        """
        Get a clock from event. Returns None if the event's class
        is not registered to a stream class.
        """

        clock_instance = nbt._bt_ctf_event_get_clock(self._e)

        if clock_instance is None:
            return None

        clock = CTFWriter.Clock.__new__(CTFWriter.Clock)
        clock._c = clock_instance

        return clock

    def payload(self, field_name):
        """
        Get a field from event.

        "field_name" is converted to str. Raises ValueError if the native
        layer returns no field for that name.
        """

        native_instance = nbt._bt_ctf_event_get_payload(self._e,
                                                        str(field_name))

        if native_instance is None:
            raise ValueError("Could not get event payload.")

        return CTFWriter.Field._create_field_from_native_instance(native_instance)

    def set_payload(self, field_name, value_field):
        """
        Set a manually created field as an event's payload.

        "field_name" is converted to str; "value_field" must be a Field.
        Raises TypeError if "value_field" is not a Field, ValueError if
        the native layer rejects the payload.
        """

        # The check must test the "value_field" parameter; testing an
        # undefined name here made every call fail with NameError.
        if not isinstance(value_field, CTFWriter.Field):
            raise TypeError("Invalid value type.")

        ret = nbt._bt_ctf_event_set_payload(self._e, str(field_name),
                                            value_field._f)

        if ret < 0:
            raise ValueError("Could not set event field payload.")
2651
class StreamClass:
    def __init__(self, name):
        """
        Create a new stream class of the given name.

        Raises ValueError if the native stream class cannot be created.
        """

        native_sc = nbt._bt_ctf_stream_class_create(name)
        self._sc = native_sc

        if native_sc is None:
            raise ValueError("Stream class creation failed.")

    def __del__(self):
        # Hand the native stream class handle back to the native layer.
        nbt._bt_ctf_stream_class_put(self._sc)

    @property
    def name(self):
        """
        Get a stream class' name.

        Raises TypeError if the name cannot be obtained.
        """

        sc_name = nbt._bt_ctf_stream_class_get_name(self._sc)

        if sc_name is not None:
            return sc_name

        raise TypeError("Could not get StreamClass name")

    @property
    def clock(self):
        """
        Get a stream class' clock. Returns None if the native layer
        reports no clock.
        """

        native_clock = nbt._bt_ctf_stream_class_get_clock(self._sc)

        if native_clock is None:
            return None

        # Wrap the native handle without running Clock.__init__.
        wrapper = CTFWriter.Clock.__new__(CTFWriter.Clock)
        wrapper._c = native_clock

        return wrapper

    @clock.setter
    def clock(self, clock):
        """
        Assign a clock to a stream class.

        Raises TypeError if "clock" is not a Clock, ValueError if the
        native layer rejects it.
        """

        if not isinstance(clock, CTFWriter.Clock):
            raise TypeError("Invalid clock type.")

        status = nbt._bt_ctf_stream_class_set_clock(self._sc, clock._c)

        if status < 0:
            raise ValueError("Could not set stream class clock.")

    @property
    def id(self):
        """
        Get a stream class' id.

        Raises TypeError if the native layer reports a negative id.
        """

        sc_id = nbt._bt_ctf_stream_class_get_id(self._sc)

        if sc_id >= 0:
            return sc_id

        raise TypeError("Could not get StreamClass id")

    @id.setter
    def id(self, id):
        """
        Assign an id to a stream class.

        Raises TypeError if the native layer rejects the id.
        """

        status = nbt._bt_ctf_stream_class_set_id(self._sc, id)

        if status < 0:
            raise TypeError("Could not set stream class id.")

    @property
    def event_classes(self):
        """
        Generator returning the stream class' event classes.

        Raises TypeError, once iteration starts, if the event class count
        or an individual event class cannot be obtained.
        """

        total = nbt._bt_ctf_stream_class_get_event_class_count(self._sc)

        if total < 0:
            raise TypeError("Could not get StreamClass' event class count")

        for index in range(total):
            native_ec = nbt._bt_ctf_stream_class_get_event_class(self._sc, index)

            if native_ec is None:
                raise TypeError("Could not get StreamClass' event class at index {}".format(index))

            # Wrap the native handle without running EventClass.__init__.
            wrapper = CTFWriter.EventClass.__new__(CTFWriter.EventClass)
            wrapper._ec = native_ec
            yield wrapper

    def add_event_class(self, event_class):
        """
        Add an event class to a stream class. New events can be added even after a
        stream has been instantiated and events have been appended. However, a stream
        will not accept events of a class that has not been added to the stream
        class beforehand.

        Raises TypeError if "event_class" is not an EventClass, ValueError
        if the native layer rejects it.
        """

        if not isinstance(event_class, CTFWriter.EventClass):
            raise TypeError("Invalid event_class type.")

        status = nbt._bt_ctf_stream_class_add_event_class(self._sc,
                                                          event_class._ec)

        if status < 0:
            raise ValueError("Could not add event class.")

    @property
    def packet_context_type(self):
        """
        Get the StreamClass' packet context type (StructureFieldDeclaration)

        Raises ValueError if the native layer returns no type.
        """

        native_type = nbt._bt_ctf_stream_class_get_packet_context_type(self._sc)

        if native_type is None:
            raise ValueError("Invalid StreamClass")

        return CTFWriter.FieldDeclaration._create_field_declaration_from_native_instance(native_type)

    @packet_context_type.setter
    def packet_context_type(self, field_type):
        """
        Set a StreamClass' packet context type. Must be of type
        StructureFieldDeclaration.

        Raises TypeError for any other type, ValueError if the native
        layer rejects the declaration.
        """

        if not isinstance(field_type, CTFWriter.StructureFieldDeclaration):
            raise TypeError("field_type argument must be of type StructureFieldDeclaration.")

        status = nbt._bt_ctf_stream_class_set_packet_context_type(self._sc,
                                                                  field_type._ft)

        if status < 0:
            raise ValueError("Failed to set packet context type.")
2802
class Stream:
    def __init__(self):
        # Direct construction is refused; the message points callers to
        # Writer.create_stream() instead.
        raise NotImplementedError("Stream cannot be instantiated; use Writer.create_stream()")

    def __del__(self):
        # Hand the native stream handle back to the native layer.
        nbt._bt_ctf_stream_put(self._s)

    @property
    def discarded_events(self):
        """
        Get a stream's discarded event count.

        Raises ValueError if the count cannot be obtained.
        """

        status, discarded = nbt._bt_ctf_stream_get_discarded_events_count(self._s)

        if status >= 0:
            return discarded

        raise ValueError("Could not get the stream's discarded events count")

    def append_discarded_events(self, event_count):
        """
        Increase the current packet's discarded event count.
        """

        nbt._bt_ctf_stream_append_discarded_events(self._s, event_count)

    def append_event(self, event):
        """
        Append "event" to the stream's current packet. The stream's associated clock
        will be sampled during this call. The event shall not be modified after
        being appended to a stream.

        Raises ValueError if the native layer rejects the event.
        """

        status = nbt._bt_ctf_stream_append_event(self._s, event._e)

        if status < 0:
            raise ValueError("Could not append event to stream.")

    @property
    def packet_context(self):
        """
        Get a Stream's packet context field (a StructureField).

        Raises ValueError if the native layer returns no field.
        """

        native_context = nbt._bt_ctf_stream_get_packet_context(self._s)

        if native_context is not None:
            return CTFWriter.Field._create_field_from_native_instance(native_context)

        raise ValueError("Invalid Stream.")

    @packet_context.setter
    def packet_context(self, field):
        """
        Set a Stream's packet context field (must be a StructureField).

        Raises TypeError for any other type, ValueError if the native
        layer rejects the field.
        """

        if not isinstance(field, CTFWriter.StructureField):
            raise TypeError("Argument field must be of type StructureField")

        status = nbt._bt_ctf_stream_set_packet_context(self._s, field._f)

        if status < 0:
            raise ValueError("Invalid packet context field.")

    def flush(self):
        """
        The stream's current packet's events will be flushed to disk. Events
        subsequently appended to the stream will be added to a new packet.

        Raises ValueError if the flush fails.
        """

        status = nbt._bt_ctf_stream_flush(self._s)

        if status < 0:
            raise ValueError("Could not flush stream.")
2879
class Writer:
    def __init__(self, path):
        """
        Create a new writer that will produce a trace in the given path.

        Raises ValueError if the native writer cannot be created.
        """

        self._w = nbt._bt_ctf_writer_create(path)

        if self._w is None:
            raise ValueError("Writer creation failed.")

    def __del__(self):
        # Hand the native writer handle back to the native layer.
        nbt._bt_ctf_writer_put(self._w)

    def create_stream(self, stream_class):
        """
        Create a new stream instance and register it to the writer.

        Raises TypeError if "stream_class" is not a StreamClass, ValueError
        if the native layer fails to create the stream.
        """

        if not isinstance(stream_class, CTFWriter.StreamClass):
            raise TypeError("Invalid stream_class type.")

        native_stream = nbt._bt_ctf_writer_create_stream(self._w,
                                                         stream_class._sc)

        # Check the native handle before wrapping it: otherwise a failed
        # creation would hand the caller a Stream bound to None, whose
        # every method (and finalizer) would pass None to the native layer.
        if native_stream is None:
            raise ValueError("Stream creation failed.")

        # Stream.__init__ always raises, so the wrapper is built with
        # __new__ and bound to the native handle by hand.
        stream = CTFWriter.Stream.__new__(CTFWriter.Stream)
        stream._s = native_stream

        return stream

    def add_environment_field(self, name, value):
        """
        Add an environment field to the trace.

        Both "name" and "value" are converted to str. Raises ValueError
        if the native layer rejects the field.
        """

        ret = nbt._bt_ctf_writer_add_environment_field(self._w, str(name),
                                                       str(value))

        if ret < 0:
            raise ValueError("Could not add environment field to trace.")

    def add_clock(self, clock):
        """
        Add a clock to the trace. Clocks assigned to stream classes must be
        registered to the writer.

        Raises ValueError if the native layer rejects the clock.
        """

        ret = nbt._bt_ctf_writer_add_clock(self._w, clock._c)

        if ret < 0:
            raise ValueError("Could not add clock to Writer.")

    @property
    def metadata(self):
        """
        Get the trace's TSDL meta-data.
        """

        return nbt._bt_ctf_writer_get_metadata_string(self._w)

    def flush_metadata(self):
        """
        Flush the trace's metadata to the metadata file.
        """

        nbt._bt_ctf_writer_flush_metadata(self._w)

    @property
    def byte_order(self):
        """
        Get the trace's byte order. Must be a constant from the ByteOrder
        class.

        Not implemented: always raises NotImplementedError.
        """

        raise NotImplementedError("Getter not implemented.")

    @byte_order.setter
    def byte_order(self, byte_order):
        """
        Set the trace's byte order. Must be a constant from the ByteOrder
        class. Defaults to the host machine's endianness

        Raises ValueError if the native layer rejects the byte order.
        """

        ret = nbt._bt_ctf_writer_set_byte_order(self._w, byte_order)

        if ret < 0:
            raise ValueError("Could not set trace's byte order.")
This page took 0.088078 seconds and 3 git commands to generate.