Python: bt.py: PEP 8 + improve readability
[babeltrace.git] / bindings / python / bt.py
1 # nativebt.i.in
2 #
3 # Babeltrace native interface Python module
4 #
5 # Copyright 2012-2015 EfficiOS Inc.
6 #
7 # Author: Danny Serres <danny.serres@efficios.com>
8 # Author: Jérémie Galarneau <jeremie.galarneau@efficios.com>
9 #
10 # Permission is hereby granted, free of charge, to any person obtaining a copy
11 # of this software and associated documentation files (the "Software"), to deal
12 # in the Software without restriction, including without limitation the rights
13 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 # copies of the Software, and to permit persons to whom the Software is
15 # furnished to do so, subject to the following conditions:
16 #
17 # The above copyright notice and this permission notice shall be included in
18 # all copies or substantial portions of the Software.
19 #
20 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
26 # SOFTWARE.
27
28 import babeltrace.nativebt as nbt
29 import collections
30 import os
31 from datetime import datetime
32 from uuid import UUID
33
34
35 class TraceCollection:
36 """
37 The TraceCollection is the object that contains all currently opened traces.
38 """
39
40 def __init__(self):
41 self._tc = nbt._bt_context_create()
42
43 def __del__(self):
44 nbt._bt_context_put(self._tc)
45
46 def add_trace(self, path, format_str):
47 """
48 Add a trace by path to the TraceCollection.
49
50 Open a trace.
51
52 path is the path to the trace, it is not recursive.
53 If "path" is None, stream_list is used instead as a list
54 of mmap streams to open for the trace.
55
56 format is a string containing the format name in which the trace was
57 produced.
58
59 Return: the corresponding TraceHandle on success or None on error.
60 """
61
62 ret = nbt._bt_context_add_trace(self._tc, path, format_str,
63 None, None, None)
64
65 if ret < 0:
66 return None
67
68 th = TraceHandle.__new__(TraceHandle)
69 th._id = ret
70 th._trace_collection = self
71
72 return th
73
74 def add_traces_recursive(self, path, format_str):
75 """
76 Open a trace recursively.
77
78 Find each trace present in the subdirectory starting from the given
79 path, and add them to the TraceCollection.
80
81 Return a dict of TraceHandle instances (the full path is the key).
82 Return None on error.
83 """
84
85 trace_handles = {}
86 noTrace = True
87 error = False
88
89 for fullpath, dirs, files in os.walk(path):
90 if "metadata" in files:
91 trace_handle = self.add_trace(fullpath, format_str)
92
93 if trace_handle is None:
94 error = True
95 continue
96
97 trace_handles[fullpath] = trace_handle
98 noTrace = False
99
100 if noTrace and error:
101 return None
102
103 return trace_handles
104
105 def remove_trace(self, trace_handle):
106 """
107 Remove a trace from the TraceCollection.
108 Effectively closing the trace.
109 """
110
111 try:
112 nbt._bt_context_remove_trace(self._tc, trace_handle._id)
113 except AttributeError:
114 raise TypeError("in remove_trace, argument 2 must be a TraceHandle instance")
115
116 @property
117 def events(self):
118 """
119 Generator function to iterate over the events of open in the current
120 TraceCollection.
121
122 Due to limitations of the native Babeltrace API, only one event
123 may be "alive" at a time (i.e. a user should never store a copy
124 of the events returned by this function for ulterior use). Users
125 shall make sure to copy the information they need from an event
126 before accessing the next one.
127
128 Furthermore, event objects become invalid when the generator goes
129 out of scope as the underlying iterator will be reclaimed. Using an
130 event after the the generator has gone out of scope may result in a
131 crash or data corruption.
132 """
133
134 begin_pos_ptr = nbt._bt_iter_pos()
135 end_pos_ptr = nbt._bt_iter_pos()
136 begin_pos_ptr.type = nbt.SEEK_BEGIN
137 end_pos_ptr.type = nbt.SEEK_LAST
138
139 for event in self._events(begin_pos_ptr, end_pos_ptr):
140 yield event
141
142 def events_timestamps(self, timestamp_begin, timestamp_end):
143 """
144 Generator function to iterate over the events of open in the current
145 TraceCollection from timestamp_begin to timestamp_end.
146 """
147
148 begin_pos_ptr = nbt._bt_iter_pos()
149 end_pos_ptr = nbt._bt_iter_pos()
150 begin_pos_ptr.type = end_pos_ptr.type = nbt.SEEK_TIME
151 begin_pos_ptr.u.seek_time = timestamp_begin
152 end_pos_ptr.u.seek_time = timestamp_end
153
154 for event in self._events(begin_pos_ptr, end_pos_ptr):
155 yield event
156
157 @property
158 def timestamp_begin(self):
159 pos_ptr = nbt._bt_iter_pos()
160 pos_ptr.type = nbt.SEEK_BEGIN
161
162 return self._timestamp_at_pos(pos_ptr)
163
164 @property
165 def timestamp_end(self):
166 pos_ptr = nbt._bt_iter_pos()
167 pos_ptr.type = nbt.SEEK_LAST
168
169 return self._timestamp_at_pos(pos_ptr)
170
171 def _timestamp_at_pos(self, pos_ptr):
172 ctf_it_ptr = nbt._bt_ctf_iter_create(self._tc, pos_ptr, pos_ptr)
173
174 if ctf_it_ptr is None:
175 raise NotImplementedError("Creation of multiple iterators is unsupported.")
176
177 ev_ptr = nbt._bt_ctf_iter_read_event(ctf_it_ptr)
178 nbt._bt_ctf_iter_destroy(ctf_it_ptr)
179
180 if ev_ptr is None:
181 return None
182
183 def _events(self, begin_pos_ptr, end_pos_ptr):
184 ctf_it_ptr = nbt._bt_ctf_iter_create(self._tc, begin_pos_ptr, end_pos_ptr)
185
186 if ctf_it_ptr is None:
187 raise NotImplementedError("Creation of multiple iterators is unsupported.")
188
189 while True:
190 ev_ptr = nbt._bt_ctf_iter_read_event(ctf_it_ptr)
191
192 if ev_ptr is None:
193 break
194
195 ev = Event.__new__(Event)
196 ev._e = ev_ptr
197
198 try:
199 yield ev
200 except GeneratorExit:
201 break
202
203 ret = nbt._bt_iter_next(nbt._bt_ctf_get_iter(ctf_it_ptr))
204
205 if ret != 0:
206 break
207
208 nbt._bt_ctf_iter_destroy(ctf_it_ptr)
209
210
211 def print_format_list(babeltrace_file):
212 """
213 Print a list of available formats to file.
214
215 babeltrace_file must be a File instance opened in write mode.
216 """
217
218 try:
219 if babeltrace_file._file is not None:
220 nbt._bt_print_format_list(babeltrace_file._file)
221 except AttributeError:
222 raise TypeError("in print_format_list, argument 1 must be a File instance")
223
224
225 # Based on enum bt_clock_type in clock-type.h
226 class ClockType:
227 CLOCK_CYCLES = 0
228 CLOCK_REAL = 1
229
230
231 class TraceHandle:
232 """
233 The TraceHandle allows the user to manipulate a trace file directly.
234 It is a unique identifier representing a trace file.
235 Do not instantiate.
236 """
237
238 def __init__(self):
239 raise NotImplementedError("TraceHandle cannot be instantiated")
240
241 def __repr__(self):
242 return "Babeltrace TraceHandle: trace_id('{0}')".format(self._id)
243
244 @property
245 def id(self):
246 """Return the TraceHandle id."""
247
248 return self._id
249
250 @property
251 def path(self):
252 """Return the path of a TraceHandle."""
253
254 return nbt._bt_trace_handle_get_path(self._trace_collection._tc,
255 self._id)
256
257 @property
258 def timestamp_begin(self):
259 """Return the creation time of the buffers of a trace."""
260
261 return nbt._bt_trace_handle_get_timestamp_begin(self._trace_collection._tc,
262 self._id,
263 ClockType.CLOCK_REAL)
264
265 @property
266 def timestamp_end(self):
267 """Return the destruction timestamp of the buffers of a trace."""
268
269 return nbt._bt_trace_handle_get_timestamp_end(self._trace_collection._tc,
270 self._id,
271 ClockType.CLOCK_REAL)
272
273 @property
274 def events(self):
275 """
276 Generator returning all events (EventDeclaration) in a trace.
277 """
278
279 ret = nbt._bt_python_event_decl_listcaller(self.id,
280 self._trace_collection._tc)
281
282 if not isinstance(ret, list):
283 return
284
285 ptr_list, count = ret
286
287 for i in range(count):
288 tmp = EventDeclaration.__new__(EventDeclaration)
289 tmp._ed = nbt._bt_python_decl_one_from_list(ptr_list, i)
290 yield tmp
291
292
293 class CTFStringEncoding:
294 NONE = 0
295 UTF8 = 1
296 ASCII = 2
297 UNKNOWN = 3
298
299
300 # Based on the enum in ctf-writer/writer.h
301 class ByteOrder:
302 BYTE_ORDER_NATIVE = 0
303 BYTE_ORDER_LITTLE_ENDIAN = 1
304 BYTE_ORDER_BIG_ENDIAN = 2
305 BYTE_ORDER_NETWORK = 3
306 BYTE_ORDER_UNKNOWN = 4 # Python-specific entry
307
308
309 # enum equivalent, accessible constants
310 # These are taken directly from ctf/events.h
311 # All changes to enums must also be made here
312 class CTFTypeId:
313 UNKNOWN = 0
314 INTEGER = 1
315 FLOAT = 2
316 ENUM = 3
317 STRING = 4
318 STRUCT = 5
319 UNTAGGED_VARIANT = 6
320 VARIANT = 7
321 ARRAY = 8
322 SEQUENCE = 9
323 NR_CTF_TYPES = 10
324
325 def type_name(id):
326 name = "UNKNOWN_TYPE"
327 constants = [
328 attr for attr in dir(CTFTypeId) if not callable(
329 getattr(
330 CTFTypeId,
331 attr)) and not attr.startswith("__")]
332
333 for attr in constants:
334 if getattr(CTFTypeId, attr) == id:
335 name = attr
336 break
337
338 return name
339
340
341 class CTFScope:
342 TRACE_PACKET_HEADER = 0
343 STREAM_PACKET_CONTEXT = 1
344 STREAM_EVENT_HEADER = 2
345 STREAM_EVENT_CONTEXT = 3
346 EVENT_CONTEXT = 4
347 EVENT_FIELDS = 5
348
349 def scope_name(scope):
350 name = "UNKNOWN_SCOPE"
351 constants = [
352 attr for attr in dir(CTFScope) if not callable(
353 getattr(
354 CTFScope,
355 attr)) and not attr.startswith("__")]
356
357 for attr in constants:
358 if getattr(CTFScope, attr) == scope:
359 name = attr
360 break
361
362 return name
363
364
365 # Priority of the scopes when searching for event fields
366 _scopes = [
367 CTFScope.EVENT_FIELDS,
368 CTFScope.EVENT_CONTEXT,
369 CTFScope.STREAM_EVENT_CONTEXT,
370 CTFScope.STREAM_EVENT_HEADER,
371 CTFScope.STREAM_PACKET_CONTEXT,
372 CTFScope.TRACE_PACKET_HEADER
373 ]
374
375
376 class Event(collections.Mapping):
377 """
378 This class represents an event from the trace.
379 It is obtained using the TraceCollection generator functions.
380 Do not instantiate.
381 """
382
383 def __init__(self):
384 raise NotImplementedError("Event cannot be instantiated")
385
386 @property
387 def name(self):
388 """Return the name of the event or None on error."""
389
390 return nbt._bt_ctf_event_name(self._e)
391
392 @property
393 def cycles(self):
394 """
395 Return the timestamp of the event as written in
396 the packet (in cycles) or -1ULL on error.
397 """
398
399 return nbt._bt_ctf_get_cycles(self._e)
400
401 @property
402 def timestamp(self):
403 """
404 Return the timestamp of the event offset with the
405 system clock source or -1ULL on error.
406 """
407
408 return nbt._bt_ctf_get_timestamp(self._e)
409
410 @property
411 def datetime(self):
412 """
413 Return a datetime object based on the event's
414 timestamp. Note that the datetime class' precision
415 is limited to microseconds.
416 """
417
418 return datetime.fromtimestamp(self.timestamp / 1E9)
419
420 def field_with_scope(self, field_name, scope):
421 """
422 Get field_name's value in scope.
423 None is returned if no field matches field_name.
424 """
425
426 if scope not in _scopes:
427 raise ValueError("Invalid scope provided")
428
429 field = self._field_with_scope(field_name, scope)
430
431 if field is not None:
432 return field.value
433
434 return None
435
436 def field_list_with_scope(self, scope):
437 """Return a list of field names in scope."""
438
439 if scope not in _scopes:
440 raise ValueError("Invalid scope provided")
441
442 field_names = []
443
444 for field in self._field_list_with_scope(scope):
445 field_names.append(field.name)
446
447 return field_names
448
449 @property
450 def handle(self):
451 """
452 Get the TraceHandle associated with this event
453 Return None on error
454 """
455
456 ret = nbt._bt_ctf_event_get_handle_id(self._e)
457
458 if ret < 0:
459 return None
460
461 th = TraceHandle.__new__(TraceHandle)
462 th._id = ret
463 th._trace_collection = self.get_trace_collection()
464
465 return th
466
467 @property
468 def trace_collection(self):
469 """
470 Get the TraceCollection associated with this event.
471 Return None on error.
472 """
473
474 trace_collection = TraceCollection()
475 trace_collection._tc = nbt._bt_ctf_event_get_context(self._e)
476
477 if trace_collection._tc is None:
478 return None
479 else:
480 return trace_collection
481
482 def __getitem__(self, field_name):
483 """
484 Get field_name's value. If the field_name exists in multiple
485 scopes, the first field found is returned. The scopes are searched
486 in the following order:
487 1) EVENT_FIELDS
488 2) EVENT_CONTEXT
489 3) STREAM_EVENT_CONTEXT
490 4) STREAM_EVENT_HEADER
491 5) STREAM_PACKET_CONTEXT
492 6) TRACE_PACKET_HEADER
493 None is returned if no field matches field_name.
494
495 Use field_with_scope() to explicitly access fields in a given
496 scope.
497 """
498
499 field = self._field(field_name)
500
501 if field is not None:
502 return field.value
503
504 raise KeyError(field_name)
505
506 def __iter__(self):
507 for key in self.keys():
508 yield key
509
510 def __len__(self):
511 count = 0
512
513 for scope in _scopes:
514 scope_ptr = nbt._bt_ctf_get_top_level_scope(self._e, scope)
515 ret = nbt._bt_python_field_listcaller(self._e, scope_ptr)
516
517 if isinstance(ret, list):
518 count += ret[1]
519
520 return count
521
522 def __contains__(self, field_name):
523 return self._field(field_name) is not None
524
525 def keys(self):
526 """Return a list of field names."""
527
528 field_names = set()
529
530 for scope in _scopes:
531 for name in self.field_list_with_scope(scope):
532 field_names.add(name)
533
534 return list(field_names)
535
536 def get(self, field_name, default=None):
537 field = self._field(field_name)
538
539 if field is None:
540 return default
541
542 return field.value
543
544 def items(self):
545 for field in self.keys():
546 yield (field, self[field])
547
548 def _field_with_scope(self, field_name, scope):
549 scope_ptr = nbt._bt_ctf_get_top_level_scope(self._e, scope)
550
551 if scope_ptr is None:
552 return None
553
554 definition_ptr = nbt._bt_ctf_get_field(self._e, scope_ptr, field_name)
555
556 if definition_ptr is None:
557 return None
558
559 field = _Definition(definition_ptr, scope)
560
561 return field
562
563 def _field(self, field_name):
564 field = None
565
566 for scope in _scopes:
567 field = self._field_with_scope(field_name, scope)
568
569 if field is not None:
570 break
571
572 return field
573
574 def _field_list_with_scope(self, scope):
575 fields = []
576 scope_ptr = nbt._bt_ctf_get_top_level_scope(self._e, scope)
577
578 # Returns a list [list_ptr, count]. If list_ptr is NULL, SWIG will only
579 # provide the "count" return value
580 count = 0
581 list_ptr = None
582 ret = nbt._bt_python_field_listcaller(self._e, scope_ptr)
583
584 if isinstance(ret, list):
585 list_ptr, count = ret
586
587 for i in range(count):
588 definition_ptr = nbt._bt_python_field_one_from_list(list_ptr, i)
589
590 if definition_ptr is not None:
591 definition = _Definition(definition_ptr, scope)
592 fields.append(definition)
593
594 return fields
595
596
597 class FieldError(Exception):
598 def __init__(self, value):
599 self.value = value
600
601 def __str__(self):
602 return repr(self.value)
603
604
605 class EventDeclaration:
606 """Event declaration class. Do not instantiate."""
607
608 MAX_UINT64 = 0xFFFFFFFFFFFFFFFF
609
610 def __init__(self):
611 raise NotImplementedError("EventDeclaration cannot be instantiated")
612
613 @property
614 def name(self):
615 """Return the name of the event or None on error"""
616
617 return nbt._bt_ctf_get_decl_event_name(self._ed)
618
619 @property
620 def id(self):
621 """Return the event-ID of the event or -1 on error"""
622
623 id = nbt._bt_ctf_get_decl_event_id(self._ed)
624
625 if id == self.MAX_UINT64:
626 id = -1
627
628 return id
629
630 @property
631 def fields(self):
632 """
633 Generator returning all FieldDeclarations of an event, going through
634 each scope in the following order:
635 1) EVENT_FIELDS
636 2) EVENT_CONTEXT
637 3) STREAM_EVENT_CONTEXT
638 4) STREAM_EVENT_HEADER
639 5) STREAM_PACKET_CONTEXT
640 6) TRACE_PACKET_HEADER
641 """
642
643 for scope in _scopes:
644 for declaration in self.fields_scope(scope):
645 yield declaration
646
647 def fields_scope(self, scope):
648 """
649 Generator returning FieldDeclarations of the current event in scope.
650 """
651 ret = nbt._by_python_field_decl_listcaller(self._ed, scope)
652
653 if not isinstance(ret, list):
654 return
655
656 list_ptr, count = ret
657
658 for i in range(count):
659 field_decl_ptr = nbt._bt_python_field_decl_one_from_list(list_ptr, i)
660
661 if field_decl_ptr is not None:
662 decl_ptr = nbt._bt_ctf_get_decl_from_field_decl(field_decl_ptr)
663 name = nbt._bt_ctf_get_decl_field_name(field_decl_ptr)
664 field_declaration = _create_field_declaration(decl_ptr, name,
665 scope)
666 yield field_declaration
667
668
669 class FieldDeclaration:
670 """Field declaration class. Do not instantiate."""
671
672 def __init__(self):
673 raise NotImplementedError("FieldDeclaration cannot be instantiated")
674
675 def __repr__(self):
676 return "({0}) {1} {2}".format(CTFScope.scope_name(self.scope),
677 CTFTypeId.type_name(self.type),
678 self.name)
679
680 @property
681 def name(self):
682 """Return the name of a FieldDeclaration or None on error."""
683
684 return self._name
685
686 @property
687 def type(self):
688 """
689 Return the FieldDeclaration's type. One of the entries in class
690 CTFTypeId.
691 """
692
693 return nbt._bt_ctf_field_type(self._fd)
694
695 @property
696 def scope(self):
697 """
698 Return the FieldDeclaration's scope.
699 """
700
701 return self._s
702
703
704 class IntegerFieldDeclaration(FieldDeclaration):
705 """Do not instantiate."""
706
707 def __init__(self):
708 raise NotImplementedError("IntegerFieldDeclaration cannot be instantiated")
709
710 @property
711 def signedness(self):
712 """
713 Return the signedness of an integer:
714 0 if unsigned; 1 if signed; -1 on error.
715 """
716
717 return nbt._bt_ctf_get_int_signedness(self._fd)
718
719 @property
720 def base(self):
721 """Return the base of an int or a negative value on error."""
722
723 return nbt._bt_ctf_get_int_base(self._fd)
724
725 @property
726 def byte_order(self):
727 """
728 Return the byte order. One of class ByteOrder's entries.
729 """
730
731 ret = nbt._bt_ctf_get_int_byte_order(self._fd)
732
733 if ret == 1234:
734 return ByteOrder.BYTE_ORDER_LITTLE_ENDIAN
735 elif ret == 4321:
736 return ByteOrder.BYTE_ORDER_BIG_ENDIAN
737 else:
738 return ByteOrder.BYTE_ORDER_UNKNOWN
739
740 @property
741 def length(self):
742 """
743 Return the size, in bits, of an int or a negative
744 value on error.
745 """
746
747 return nbt._bt_ctf_get_int_len(self._fd)
748
749 @property
750 def encoding(self):
751 """
752 Return the encoding. One of class CTFStringEncoding's entries.
753 Return a negative value on error.
754 """
755
756 return nbt._bt_ctf_get_encoding(self._fd)
757
758
759 class EnumerationFieldDeclaration(FieldDeclaration):
760 """Do not instantiate."""
761
762 def __init__(self):
763 raise NotImplementedError("EnumerationFieldDeclaration cannot be instantiated")
764
765
766 class ArrayFieldDeclaration(FieldDeclaration):
767 """Do not instantiate."""
768
769 def __init__(self):
770 raise NotImplementedError("ArrayFieldDeclaration cannot be instantiated")
771
772 @property
773 def length(self):
774 """
775 Return the length of an array or a negative
776 value on error.
777 """
778
779 return nbt._bt_ctf_get_array_len(self._fd)
780
781 @property
782 def element_declaration(self):
783 """
784 Return element declaration.
785 """
786
787 field_decl_ptr = nbt._bt_python_get_array_element_declaration(self._fd)
788
789 return _create_field_declaration(field_decl_ptr, "", self.scope)
790
791
792 class SequenceFieldDeclaration(FieldDeclaration):
793 """Do not instantiate."""
794
795 def __init__(self):
796 raise NotImplementedError("SequenceFieldDeclaration cannot be instantiated")
797
798 @property
799 def element_declaration(self):
800 """
801 Return element declaration.
802 """
803
804 field_decl_ptr = nbt._bt_python_get_sequence_element_declaration(self._fd)
805
806 return _create_field_declaration(field_decl_ptr, "", self.scope)
807
808
809 class FloatFieldDeclaration(FieldDeclaration):
810 """Do not instantiate."""
811
812 def __init__(self):
813 raise NotImplementedError("FloatFieldDeclaration cannot be instantiated")
814
815
816 class StructureFieldDeclaration(FieldDeclaration):
817 """Do not instantiate."""
818
819 def __init__(self):
820 raise NotImplementedError("StructureFieldDeclaration cannot be instantiated")
821
822
823 class StringFieldDeclaration(FieldDeclaration):
824 """Do not instantiate."""
825
826 def __init__(self):
827 raise NotImplementedError("StringFieldDeclaration cannot be instantiated")
828
829
830 class VariantFieldDeclaration(FieldDeclaration):
831 """Do not instantiate."""
832
833 def __init__(self):
834 raise NotImplementedError("VariantFieldDeclaration cannot be instantiated")
835
836
837 def field_error():
838 """
839 Return the last error code encountered while
840 accessing a field and reset the error flag.
841 Return 0 if no error, a negative value otherwise.
842 """
843
844 return nbt._bt_ctf_field_get_error()
845
846
847 def _create_field_declaration(declaration_ptr, name, scope):
848 """
849 Private field declaration factory.
850 """
851
852 if declaration_ptr is None:
853 raise ValueError("declaration_ptr must be valid")
854 if scope not in _scopes:
855 raise ValueError("Invalid scope provided")
856
857 type = nbt._bt_ctf_field_type(declaration_ptr)
858 declaration = None
859
860 if type == CTFTypeId.INTEGER:
861 declaration = IntegerFieldDeclaration.__new__(IntegerFieldDeclaration)
862 elif type == CTFTypeId.ENUM:
863 declaration = EnumerationFieldDeclaration.__new__(EnumerationFieldDeclaration)
864 elif type == CTFTypeId.ARRAY:
865 declaration = ArrayFieldDeclaration.__new__(ArrayFieldDeclaration)
866 elif type == CTFTypeId.SEQUENCE:
867 declaration = SequenceFieldDeclaration.__new__(SequenceFieldDeclaration)
868 elif type == CTFTypeId.FLOAT:
869 declaration = FloatFieldDeclaration.__new__(FloatFieldDeclaration)
870 elif type == CTFTypeId.STRUCT:
871 declaration = StructureFieldDeclaration.__new__(StructureFieldDeclaration)
872 elif type == CTFTypeId.STRING:
873 declaration = StringFieldDeclaration.__new__(StringFieldDeclaration)
874 elif type == CTFTypeId.VARIANT:
875 declaration = VariantFieldDeclaration.__new__(VariantFieldDeclaration)
876 else:
877 return declaration
878
879 declaration._fd = declaration_ptr
880 declaration._s = scope
881 declaration._name = name
882
883 return declaration
884
885
886 class _Definition:
887 def __init__(self, definition_ptr, scope):
888 self._d = definition_ptr
889 self._s = scope
890
891 if scope not in _scopes:
892 ValueError("Invalid scope provided")
893
894 @property
895 def name(self):
896 """Return the name of a field or None on error."""
897
898 return nbt._bt_ctf_field_name(self._d)
899
900 @property
901 def type(self):
902 """Return the type of a field or -1 if unknown."""
903
904 return nbt._bt_ctf_field_type(nbt._bt_ctf_get_decl_from_def(self._d))
905
906 @property
907 def declaration(self):
908 """Return the associated Definition object."""
909
910 return _create_field_declaration(
911 nbt._bt_ctf_get_decl_from_def(self._d), self.name, self.scope)
912
913 def _get_enum_str(self):
914 """
915 Return the string matching the current enumeration.
916 Return None on error.
917 """
918
919 return nbt._bt_ctf_get_enum_str(self._d)
920
921 def _get_array_element_at(self, index):
922 """
923 Return the array's element at position index.
924 Return None on error
925 """
926
927 array_ptr = nbt._bt_python_get_array_from_def(self._d)
928
929 if array_ptr is None:
930 return None
931
932 definition_ptr = nbt._bt_array_index(array_ptr, index)
933
934 if definition_ptr is None:
935 return None
936
937 return _Definition(definition_ptr, self.scope)
938
939 def _get_sequence_len(self):
940 """
941 Return the len of a sequence or a negative
942 value on error.
943 """
944
945 seq = nbt._bt_python_get_sequence_from_def(self._d)
946
947 return nbt._bt_sequence_len(seq)
948
949 def _get_sequence_element_at(self, index):
950 """
951 Return the sequence's element at position index,
952 otherwise return None
953 """
954
955 seq = nbt._bt_python_get_sequence_from_def(self._d)
956
957 if seq is not None:
958 definition_ptr = nbt._bt_sequence_index(seq, index)
959
960 if definition_ptr is not None:
961 return _Definition(definition_ptr, self.scope)
962
963 return None
964
965 def _get_uint64(self):
966 """
967 Return the value associated with the field.
968 If the field does not exist or is not of the type requested,
969 the value returned is undefined. To check if an error occured,
970 use the field_error() function after accessing a field.
971 """
972
973 return nbt._bt_ctf_get_uint64(self._d)
974
975 def _get_int64(self):
976 """
977 Return the value associated with the field.
978 If the field does not exist or is not of the type requested,
979 the value returned is undefined. To check if an error occured,
980 use the field_error() function after accessing a field.
981 """
982
983 return nbt._bt_ctf_get_int64(self._d)
984
985 def _get_char_array(self):
986 """
987 Return the value associated with the field.
988 If the field does not exist or is not of the type requested,
989 the value returned is undefined. To check if an error occurred,
990 use the field_error() function after accessing a field.
991 """
992
993 return nbt._bt_ctf_get_char_array(self._d)
994
995 def _get_str(self):
996 """
997 Return the value associated with the field.
998 If the field does not exist or is not of the type requested,
999 the value returned is undefined. To check if an error occurred,
1000 use the field_error() function after accessing a field.
1001 """
1002
1003 return nbt._bt_ctf_get_string(self._d)
1004
1005 def _get_float(self):
1006 """
1007 Return the value associated with the field.
1008 If the field does not exist or is not of the type requested,
1009 the value returned is undefined. To check if an error occurred,
1010 use the field_error() function after accessing a field.
1011 """
1012
1013 return nbt._bt_ctf_get_float(self._d)
1014
1015 def _get_variant(self):
1016 """
1017 Return the variant's selected field.
1018 If the field does not exist or is not of the type requested,
1019 the value returned is undefined. To check if an error occurred,
1020 use the field_error() function after accessing a field.
1021 """
1022
1023 return nbt._bt_ctf_get_variant(self._d)
1024
1025 def _get_struct_field_count(self):
1026 """
1027 Return the number of fields contained in the structure.
1028 If the field does not exist or is not of the type requested,
1029 the value returned is undefined.
1030 """
1031
1032 return nbt._bt_ctf_get_struct_field_count(self._d)
1033
1034 def _get_struct_field_at(self, i):
1035 """
1036 Return the structure's field at position i.
1037 If the field does not exist or is not of the type requested,
1038 the value returned is undefined. To check if an error occurred,
1039 use the field_error() function after accessing a field.
1040 """
1041
1042 return nbt._bt_ctf_get_struct_field_index(self._d, i)
1043
1044 @property
1045 def value(self):
1046 """
1047 Return the value associated with the field according to its type.
1048 Return None on error.
1049 """
1050
1051 id = self.type
1052 value = None
1053
1054 if id == CTFTypeId.STRING:
1055 value = self._get_str()
1056 elif id == CTFTypeId.ARRAY:
1057 element_decl = self.declaration.element_declaration
1058
1059 if ((element_decl.type == CTFTypeId.INTEGER
1060 and element_decl.length == 8)
1061 and (element_decl.encoding == CTFStringEncoding.ASCII or element_decl.encoding == CTFStringEncoding.UTF8)):
1062 value = nbt._bt_python_get_array_string(self._d)
1063 else:
1064 value = []
1065
1066 for i in range(self.declaration.length):
1067 element = self._get_array_element_at(i)
1068 value.append(element.value)
1069 elif id == CTFTypeId.INTEGER:
1070 if self.declaration.signedness == 0:
1071 value = self._get_uint64()
1072 else:
1073 value = self._get_int64()
1074 elif id == CTFTypeId.ENUM:
1075 value = self._get_enum_str()
1076 elif id == CTFTypeId.SEQUENCE:
1077 element_decl = self.declaration.element_declaration
1078
1079 if ((element_decl.type == CTFTypeId.INTEGER
1080 and element_decl.length == 8)
1081 and (element_decl.encoding == CTFStringEncoding.ASCII or element_decl.encoding == CTFStringEncoding.UTF8)):
1082 value = nbt._bt_python_get_sequence_string(self._d)
1083 else:
1084 seq_len = self._get_sequence_len()
1085 value = []
1086
1087 for i in range(seq_len):
1088 evDef = self._get_sequence_element_at(i)
1089 value.append(evDef.value)
1090 elif id == CTFTypeId.FLOAT:
1091 value = self._get_float()
1092 elif id == CTFTypeId.VARIANT:
1093 variant = _Definition.__new__(_Definition)
1094 variant._d = self._get_variant()
1095 value = variant.value
1096 elif id == CTFTypeId.STRUCT:
1097 value = {}
1098
1099 for i in range(self._get_struct_field_count()):
1100 member = _Definition(self._get_struct_field_at(i), self.scope)
1101 value[member.name] = member.value
1102
1103 if field_error():
1104 raise FieldError(
1105 "Error occurred while accessing field {} of type {}".format(
1106 self.name,
1107 CTFTypeId.type_name(id)))
1108
1109 return value
1110
1111 @property
1112 def scope(self):
1113 """Return the scope of a field or None on error."""
1114
1115 return self._s
1116
1117
1118 class CTFWriter:
1119 # Used to compare to -1ULL in error checks
1120 _MAX_UINT64 = 0xFFFFFFFFFFFFFFFF
1121
1122 class EnumerationMapping:
1123 """
1124 Enumeration mapping class. start and end values are inclusive.
1125 """
1126
1127 def __init__(self, name, start, end):
1128 self.name = name
1129 self.start = start
1130 self.end = end
1131
1132 class Clock:
1133 def __init__(self, name):
1134 self._c = nbt._bt_ctf_clock_create(name)
1135
1136 if self._c is None:
1137 raise ValueError("Invalid clock name.")
1138
1139 def __del__(self):
1140 nbt._bt_ctf_clock_put(self._c)
1141
1142 @property
1143 def name(self):
1144 """
1145 Get the clock's name.
1146 """
1147
1148 name = nbt._bt_ctf_clock_get_name(self._c)
1149
1150 if name is None:
1151 raise ValueError("Invalid clock instance.")
1152
1153 return name
1154
1155 @property
1156 def description(self):
1157 """
1158 Get the clock's description. None if unset.
1159 """
1160
1161 return nbt._bt_ctf_clock_get_description(self._c)
1162
1163 @description.setter
1164 def description(self, desc):
1165 """
1166 Set the clock's description. The description appears in the clock's TSDL
1167 meta-data.
1168 """
1169
1170 ret = nbt._bt_ctf_clock_set_description(self._c, str(desc))
1171
1172 if ret < 0:
1173 raise ValueError("Invalid clock description.")
1174
1175 @property
1176 def frequency(self):
1177 """
1178 Get the clock's frequency (Hz).
1179 """
1180
1181 freq = nbt._bt_ctf_clock_get_frequency(self._c)
1182
1183 if freq == CTFWriter._MAX_UINT64:
1184 raise ValueError("Invalid clock instance")
1185
1186 return freq
1187
1188 @frequency.setter
1189 def frequency(self, freq):
1190 """
1191 Set the clock's frequency (Hz).
1192 """
1193
1194 ret = nbt._bt_ctf_clock_set_frequency(self._c, freq)
1195
1196 if ret < 0:
1197 raise ValueError("Invalid frequency value.")
1198
1199 @property
1200 def precision(self):
1201 """
1202 Get the clock's precision (in clock ticks).
1203 """
1204
1205 precision = nbt._bt_ctf_clock_get_precision(self._c)
1206
1207 if precision == CTFWriter._MAX_UINT64:
1208 raise ValueError("Invalid clock instance")
1209
1210 return precision
1211
1212 @precision.setter
1213 def precision(self, precision):
1214 """
1215 Set the clock's precision (in clock ticks).
1216 """
1217
1218 ret = nbt._bt_ctf_clock_set_precision(self._c, precision)
1219
1220 @property
1221 def offset_seconds(self):
1222 """
1223 Get the clock's offset in seconds from POSIX.1 Epoch.
1224 """
1225
1226 offset_s = nbt._bt_ctf_clock_get_offset_s(self._c)
1227
1228 if offset_s == CTFWriter._MAX_UINT64:
1229 raise ValueError("Invalid clock instance")
1230
1231 return offset_s
1232
1233 @offset_seconds.setter
1234 def offset_seconds(self, offset_s):
1235 """
1236 Set the clock's offset in seconds from POSIX.1 Epoch.
1237 """
1238
1239 ret = nbt._bt_ctf_clock_set_offset_s(self._c, offset_s)
1240
1241 if ret < 0:
1242 raise ValueError("Invalid offset value.")
1243
1244 @property
1245 def offset(self):
1246 """
1247 Get the clock's offset in ticks from POSIX.1 Epoch + offset in seconds.
1248 """
1249
1250 offset = nbt._bt_ctf_clock_get_offset(self._c)
1251
1252 if offset == CTFWriter._MAX_UINT64:
1253 raise ValueError("Invalid clock instance")
1254
1255 return offset
1256
1257 @offset.setter
1258 def offset(self, offset):
1259 """
1260 Set the clock's offset in ticks from POSIX.1 Epoch + offset in seconds.
1261 """
1262
1263 ret = nbt._bt_ctf_clock_set_offset(self._c, offset)
1264
1265 if ret < 0:
1266 raise ValueError("Invalid offset value.")
1267
1268 @property
1269 def absolute(self):
1270 """
1271 Get a clock's absolute attribute. A clock is absolute if the clock
1272 is a global reference across the trace's other clocks.
1273 """
1274
1275 is_absolute = nbt._bt_ctf_clock_get_is_absolute(self._c)
1276
1277 if is_absolute == -1:
1278 raise ValueError("Invalid clock instance")
1279
1280 return False if is_absolute == 0 else True
1281
1282 @absolute.setter
1283 def absolute(self, is_absolute):
1284 """
1285 Set a clock's absolute attribute. A clock is absolute if the clock
1286 is a global reference across the trace's other clocks.
1287 """
1288
1289 ret = nbt._bt_ctf_clock_set_is_absolute(self._c, int(is_absolute))
1290
1291 if ret < 0:
1292 raise ValueError("Could not set the clock's absolute attribute.")
1293
1294 @property
1295 def uuid(self):
1296 """
1297 Get a clock's UUID (an object of type UUID).
1298 """
1299
1300 uuid_list = []
1301
1302 for i in range(16):
1303 ret, value = nbt._bt_python_ctf_clock_get_uuid_index(self._c, i)
1304
1305 if ret < 0:
1306 raise ValueError("Invalid clock instance")
1307
1308 uuid_list.append(value)
1309
1310 return UUID(bytes=bytes(uuid_list))
1311
1312 @uuid.setter
1313 def uuid(self, uuid):
1314 """
1315 Set a clock's UUID (an object of type UUID).
1316 """
1317
1318 uuid_bytes = uuid.bytes
1319
1320 if len(uuid_bytes) != 16:
1321 raise ValueError("Invalid UUID provided. UUID length must be 16 bytes")
1322
1323 for i in range(len(uuid_bytes)):
1324 ret = nbt._bt_python_ctf_clock_set_uuid_index(self._c, i,
1325 uuid_bytes[i])
1326
1327 if ret < 0:
1328 raise ValueError("Invalid clock instance")
1329
1330 @property
1331 def time(self):
1332 """
1333 Get the current time in nanoseconds since the clock's origin (offset and
1334 offset_s attributes).
1335 """
1336
1337 time = nbt._bt_ctf_clock_get_time(self._c)
1338
1339 if time == CTFWriter._MAX_UINT64:
1340 raise ValueError("Invalid clock instance")
1341
1342 return time
1343
1344 @time.setter
1345 def time(self, time):
1346 """
1347 Set the current time in nanoseconds since the clock's origin (offset and
1348 offset_s attributes). The clock's value will be sampled as events are
1349 appended to a stream.
1350 """
1351
1352 ret = nbt._bt_ctf_clock_set_time(self._c, time)
1353
1354 if ret < 0:
1355 raise ValueError("Invalid time value.")
1356
1357 class FieldDeclaration:
1358 """
1359 FieldDeclaration should not be instantiated directly. Instantiate
1360 one of the concrete FieldDeclaration classes.
1361 """
1362
1363 class IntegerBase:
1364 # These values are based on the bt_ctf_integer_base enum
1365 # declared in event-types.h.
1366 INTEGER_BASE_UNKNOWN = -1
1367 INTEGER_BASE_BINARY = 2
1368 INTEGER_BASE_OCTAL = 8
1369 INTEGER_BASE_DECIMAL = 10
1370 INTEGER_BASE_HEXADECIMAL = 16
1371
1372 def __init__(self):
1373 if self._ft is None:
1374 raise ValueError("FieldDeclaration creation failed.")
1375
1376 def __del__(self):
1377 nbt._bt_ctf_field_type_put(self._ft)
1378
1379 @staticmethod
1380 def _create_field_declaration_from_native_instance(
1381 native_field_declaration):
1382 type_dict = {
1383 CTFTypeId.INTEGER: CTFWriter.IntegerFieldDeclaration,
1384 CTFTypeId.FLOAT: CTFWriter.FloatFieldDeclaration,
1385 CTFTypeId.ENUM: CTFWriter.EnumerationFieldDeclaration,
1386 CTFTypeId.STRING: CTFWriter.StringFieldDeclaration,
1387 CTFTypeId.STRUCT: CTFWriter.StructureFieldDeclaration,
1388 CTFTypeId.VARIANT: CTFWriter.VariantFieldDeclaration,
1389 CTFTypeId.ARRAY: CTFWriter.ArrayFieldDeclaration,
1390 CTFTypeId.SEQUENCE: CTFWriter.SequenceFieldDeclaration
1391 }
1392
1393 field_type_id = nbt._bt_ctf_field_type_get_type_id(native_field_declaration)
1394
1395 if field_type_id == CTFTypeId.UNKNOWN:
1396 raise TypeError("Invalid field instance")
1397
1398 declaration = CTFWriter.Field.__new__(CTFWriter.Field)
1399 declaration._ft = native_field_declaration
1400 declaration.__class__ = type_dict[field_type_id]
1401
1402 return declaration
1403
1404 @property
1405 def alignment(self):
1406 """
1407 Get the field declaration's alignment. Returns -1 on error.
1408 """
1409
1410 return nbt._bt_ctf_field_type_get_alignment(self._ft)
1411
1412 @alignment.setter
1413 def alignment(self, alignment):
1414 """
1415 Set the field declaration's alignment. Defaults to 1 (bit-aligned). However,
1416 some types, such as structures and string, may impose other alignment
1417 constraints.
1418 """
1419
1420 ret = nbt._bt_ctf_field_type_set_alignment(self._ft, alignment)
1421
1422 if ret < 0:
1423 raise ValueError("Invalid alignment value.")
1424
1425 @property
1426 def byte_order(self):
1427 """
1428 Get the field declaration's byte order. One of the ByteOrder's constant.
1429 """
1430
1431 return nbt._bt_ctf_field_type_get_byte_order(self._ft)
1432
1433 @byte_order.setter
1434 def byte_order(self, byte_order):
1435 """
1436 Set the field declaration's byte order. Use constants defined in the ByteOrder
1437 class.
1438 """
1439
1440 ret = nbt._bt_ctf_field_type_set_byte_order(self._ft, byte_order)
1441
1442 if ret < 0:
1443 raise ValueError("Could not set byte order value.")
1444
1445 class IntegerFieldDeclaration(FieldDeclaration):
1446 def __init__(self, size):
1447 """
1448 Create a new integer field declaration of the given size.
1449 """
1450 self._ft = nbt._bt_ctf_field_type_integer_create(size)
1451 super().__init__()
1452
1453 @property
1454 def size(self):
1455 """
1456 Get an integer's size.
1457 """
1458
1459 ret = nbt._bt_ctf_field_type_integer_get_size(self._ft)
1460
1461 if ret < 0:
1462 raise ValueError("Could not get Integer's size attribute.")
1463 else:
1464 return ret
1465
1466 @property
1467 def signed(self):
1468 """
1469 Get an integer's signedness attribute.
1470 """
1471
1472 ret = nbt._bt_ctf_field_type_integer_get_signed(self._ft)
1473
1474 if ret < 0:
1475 raise ValueError("Could not get Integer's signed attribute.")
1476 elif ret > 0:
1477 return True
1478 else:
1479 return False
1480
1481 @signed.setter
1482 def signed(self, signed):
1483 """
1484 Set an integer's signedness attribute.
1485 """
1486
1487 ret = nbt._bt_ctf_field_type_integer_set_signed(self._ft, signed)
1488
1489 if ret < 0:
1490 raise ValueError("Could not set Integer's signed attribute.")
1491
1492 @property
1493 def base(self):
1494 """
1495 Get the integer's base used to pretty-print the resulting trace.
1496 Returns a constant from the FieldDeclaration.IntegerBase class.
1497 """
1498
1499 return nbt._bt_ctf_field_type_integer_get_base(self._ft)
1500
1501 @base.setter
1502 def base(self, base):
1503 """
1504 Set the integer's base used to pretty-print the resulting trace.
1505 The base must be a constant of the FieldDeclarationIntegerBase class.
1506 """
1507
1508 ret = nbt._bt_ctf_field_type_integer_set_base(self._ft, base)
1509
1510 if ret < 0:
1511 raise ValueError("Could not set Integer's base.")
1512
1513 @property
1514 def encoding(self):
1515 """
1516 Get the integer's encoding (one of the constants of the
1517 CTFStringEncoding class).
1518 Returns a constant from the CTFStringEncoding class.
1519 """
1520
1521 return nbt._bt_ctf_field_type_integer_get_encoding(self._ft)
1522
1523 @encoding.setter
1524 def encoding(self, encoding):
1525 """
1526 An integer encoding may be set to signal that the integer must be printed
1527 as a text character. Must be a constant from the CTFStringEncoding class.
1528 """
1529
1530 ret = nbt._bt_ctf_field_type_integer_set_encoding(self._ft, encoding)
1531
1532 if ret < 0:
1533 raise ValueError("Could not set Integer's encoding.")
1534
1535 class EnumerationFieldDeclaration(FieldDeclaration):
1536 def __init__(self, integer_type):
1537 """
1538 Create a new enumeration field declaration with the given underlying container type.
1539 """
1540 isinst = isinstance(integer_type, CTFWriter.IntegerFieldDeclaration)
1541
1542 if integer_type is None or not isinst:
1543 raise TypeError("Invalid integer container.")
1544
1545 self._ft = nbt._bt_ctf_field_type_enumeration_create(integer_type._ft)
1546 super().__init__()
1547
1548 @property
1549 def container(self):
1550 """
1551 Get the enumeration's underlying container type.
1552 """
1553
1554 ret = nbt._bt_ctf_field_type_enumeration_get_container_type(self._ft)
1555
1556 if ret is None:
1557 raise TypeError("Invalid enumeration declaration")
1558
1559 return CTFWriter.FieldDeclaration._create_field_declaration_from_native_instance(ret)
1560
1561 def add_mapping(self, name, range_start, range_end):
1562 """
1563 Add a mapping to the enumeration. The range's values are inclusive.
1564 """
1565
1566 if range_start < 0 or range_end < 0:
1567 ret = nbt._bt_ctf_field_type_enumeration_add_mapping(self._ft,
1568 str(name),
1569 range_start,
1570 range_end)
1571 else:
1572 ret = nbt._bt_ctf_field_type_enumeration_add_mapping_unsigned(self._ft,
1573 str(name),
1574 range_start,
1575 range_end)
1576
1577 if ret < 0:
1578 raise ValueError("Could not add mapping to enumeration declaration.")
1579
1580 @property
1581 def mappings(self):
1582 """
1583 Generator returning instances of EnumerationMapping.
1584 """
1585
1586 signed = self.container.signed
1587
1588 count = nbt._bt_ctf_field_type_enumeration_get_mapping_count(self._ft)
1589
1590 for i in range(count):
1591 if signed:
1592 ret = nbt._bt_python_ctf_field_type_enumeration_get_mapping(self._ft, i)
1593 else:
1594 ret = nbt._bt_python_ctf_field_type_enumeration_get_mapping_unsigned(self._ft, i)
1595
1596 if len(ret) != 3:
1597 msg = "Could not get Enumeration mapping at index {}".format(i)
1598 raise TypeError(msg)
1599
1600 name, range_start, range_end = ret
1601 yield CTFWriter.EnumerationMapping(name, range_start, range_end)
1602
1603 def get_mapping_by_name(self, name):
1604 """
1605 Get a mapping by name (EnumerationMapping).
1606 """
1607
1608 index = nbt._bt_ctf_field_type_enumeration_get_mapping_index_by_name(self._ft, name)
1609
1610 if index < 0:
1611 return None
1612
1613 if self.container.signed:
1614 ret = nbt._bt_python_ctf_field_type_enumeration_get_mapping(self._ft, index)
1615 else:
1616 ret = nbt._bt_python_ctf_field_type_enumeration_get_mapping_unsigned(self._ft, index)
1617
1618 if len(ret) != 3:
1619 msg = "Could not get Enumeration mapping at index {}".format(i)
1620 raise TypeError(msg)
1621
1622 name, range_start, range_end = ret
1623
1624 return CTFWriter.EnumerationMapping(name, range_start, range_end)
1625
1626 def get_mapping_by_value(self, value):
1627 """
1628 Get a mapping by value (EnumerationMapping).
1629 """
1630
1631 if value < 0:
1632 index = nbt._bt_ctf_field_type_enumeration_get_mapping_index_by_value(self._ft, value)
1633 else:
1634 index = nbt._bt_ctf_field_type_enumeration_get_mapping_index_by_unsigned_value(self._ft, value)
1635
1636 if index < 0:
1637 return None
1638
1639 if self.container.signed:
1640 ret = nbt._bt_python_ctf_field_type_enumeration_get_mapping(self._ft, index)
1641 else:
1642 ret = nbt._bt_python_ctf_field_type_enumeration_get_mapping_unsigned(self._ft, index)
1643
1644 if len(ret) != 3:
1645 msg = "Could not get Enumeration mapping at index {}".format(i)
1646 raise TypeError(msg)
1647
1648 name, range_start, range_end = ret
1649
1650 return CTFWriter.EnumerationMapping(name, range_start, range_end)
1651
1652 class FloatFieldDeclaration(FieldDeclaration):
1653 FLT_EXP_DIG = 8
1654 DBL_EXP_DIG = 11
1655 FLT_MANT_DIG = 24
1656 DBL_MANT_DIG = 53
1657
1658 def __init__(self):
1659 """
1660 Create a new floating point field declaration.
1661 """
1662
1663 self._ft = nbt._bt_ctf_field_type_floating_point_create()
1664 super().__init__()
1665
1666 @property
1667 def exponent_digits(self):
1668 """
1669 Get the number of exponent digits used to store the floating point field.
1670 """
1671
1672 ret = nbt._bt_ctf_field_type_floating_point_get_exponent_digits(self._ft)
1673
1674 if ret < 0:
1675 raise TypeError(
1676 "Could not get Floating point exponent digit count")
1677
1678 return ret
1679
1680 @exponent_digits.setter
1681 def exponent_digits(self, exponent_digits):
1682 """
1683 Set the number of exponent digits to use to store the floating point field.
1684 The only values currently supported are FLT_EXP_DIG and DBL_EXP_DIG which
1685 are defined as constants of this class.
1686 """
1687
1688 ret = nbt._bt_ctf_field_type_floating_point_set_exponent_digits(self._ft,
1689 exponent_digits)
1690
1691 if ret < 0:
1692 raise ValueError("Could not set exponent digit count.")
1693
1694 @property
1695 def mantissa_digits(self):
1696 """
1697 Get the number of mantissa digits used to store the floating point field.
1698 """
1699
1700 ret = nbt._bt_ctf_field_type_floating_point_get_mantissa_digits(self._ft)
1701
1702 if ret < 0:
1703 raise TypeError("Could not get Floating point mantissa digit count")
1704
1705 return ret
1706
1707 @mantissa_digits.setter
1708 def mantissa_digits(self, mantissa_digits):
1709 """
1710 Set the number of mantissa digits to use to store the floating point field.
1711 The only values currently supported are FLT_MANT_DIG and DBL_MANT_DIG which
1712 are defined as constants of this class.
1713 """
1714
1715 ret = nbt._bt_ctf_field_type_floating_point_set_mantissa_digits(self._ft,
1716 mantissa_digits)
1717
1718 if ret < 0:
1719 raise ValueError("Could not set mantissa digit count.")
1720
1721 class StructureFieldDeclaration(FieldDeclaration):
1722 def __init__(self):
1723 """
1724 Create a new structure field declaration.
1725 """
1726
1727 self._ft = nbt._bt_ctf_field_type_structure_create()
1728 super().__init__()
1729
1730 def add_field(self, field_type, field_name):
1731 """
1732 Add a field of type "field_type" to the structure.
1733 """
1734
1735 ret = nbt._bt_ctf_field_type_structure_add_field(self._ft,
1736 field_type._ft,
1737 str(field_name))
1738
1739 if ret < 0:
1740 raise ValueError("Could not add field to structure.")
1741
1742 @property
1743 def fields(self):
1744 """
1745 Generator returning the structure's field as tuples of (field name, field declaration).
1746 """
1747
1748 count = nbt._bt_ctf_field_type_structure_get_field_count(self._ft)
1749
1750 if count < 0:
1751 raise TypeError("Could not get Structure field count")
1752
1753 for i in range(count):
1754 field_name = nbt._bt_python_ctf_field_type_structure_get_field_name(self._ft, i)
1755
1756 if field_name is None:
1757 msg = "Could not get Structure field name at index {}".format(i)
1758 raise TypeError(msg)
1759
1760 field_type_native = nbt._bt_python_ctf_field_type_structure_get_field_type(self._ft, i)
1761
1762 if field_type_native is None:
1763 msg = "Could not get Structure field type at index {}".format(i)
1764 raise TypeError(msg)
1765
1766 field_type = CTFWriter.FieldDeclaration._create_field_declaration_from_native_instance(field_type_native)
1767 yield (field_name, field_type)
1768
1769 def get_field_by_name(self, name):
1770 """
1771 Get a field declaration by name (FieldDeclaration).
1772 """
1773
1774 field_type_native = nbt._bt_ctf_field_type_structure_get_field_type_by_name(self._ft, name)
1775
1776 if field_type_native is None:
1777 msg = "Could not find Structure field with name {}".format(name)
1778 raise TypeError(msg)
1779
1780 return CTFWriter.FieldDeclaration._create_field_declaration_from_native_instance(field_type_native)
1781
1782 class VariantFieldDeclaration(FieldDeclaration):
1783 def __init__(self, enum_tag, tag_name):
1784 """
1785 Create a new variant field declaration.
1786 """
1787
1788 isinst = isinstance(enum_tag, CTFWriter.EnumerationFieldDeclaration)
1789 if enum_tag is None or not isinst:
1790 raise TypeError("Invalid tag type; must be of type EnumerationFieldDeclaration.")
1791
1792 self._ft = nbt._bt_ctf_field_type_variant_create(enum_tag._ft,
1793 str(tag_name))
1794 super().__init__()
1795
1796 @property
1797 def tag_name(self):
1798 """
1799 Get the variant's tag name.
1800 """
1801
1802 ret = nbt._bt_ctf_field_type_variant_get_tag_name(self._ft)
1803
1804 if ret is None:
1805 raise TypeError("Could not get Variant tag name")
1806
1807 return ret
1808
1809 @property
1810 def tag_type(self):
1811 """
1812 Get the variant's tag type.
1813 """
1814
1815 ret = nbt._bt_ctf_field_type_variant_get_tag_type(self._ft)
1816
1817 if ret is None:
1818 raise TypeError("Could not get Variant tag type")
1819
1820 return CTFWriter.FieldDeclaration._create_field_declaration_from_native_instance(ret)
1821
1822 def add_field(self, field_type, field_name):
1823 """
1824 Add a field of type "field_type" to the variant.
1825 """
1826
1827 ret = nbt._bt_ctf_field_type_variant_add_field(self._ft,
1828 field_type._ft,
1829 str(field_name))
1830
1831 if ret < 0:
1832 raise ValueError("Could not add field to variant.")
1833
1834 @property
1835 def fields(self):
1836 """
1837 Generator returning the variant's field as tuples of (field name, field declaration).
1838 """
1839
1840 count = nbt._bt_ctf_field_type_variant_get_field_count(self._ft)
1841
1842 if count < 0:
1843 raise TypeError("Could not get Variant field count")
1844
1845 for i in range(count):
1846 field_name = nbt._bt_python_ctf_field_type_variant_get_field_name(self._ft, i)
1847
1848 if field_name is None:
1849 msg = "Could not get Variant field name at index {}".format(i)
1850 raise TypeError(msg)
1851
1852 field_type_native = nbt._bt_python_ctf_field_type_variant_get_field_type(self._ft, i)
1853
1854 if field_type_native is None:
1855 msg = "Could not get Variant field type at index {}".format(i)
1856 raise TypeError(msg)
1857
1858 field_type = CTFWriter.FieldDeclaration._create_field_declaration_from_native_instance(field_type_native)
1859 yield (field_name, field_type)
1860
1861 def get_field_by_name(self, name):
1862 """
1863 Get a field declaration by name (FieldDeclaration).
1864 """
1865
1866 field_type_native = nbt._bt_ctf_field_type_variant_get_field_type_by_name(self._ft,
1867 name)
1868
1869 if field_type_native is None:
1870 msg = "Could not find Variant field with name {}".format(name)
1871 raise TypeError(msg)
1872
1873 return CTFWriter.FieldDeclaration._create_field_declaration_from_native_instance(field_type_native)
1874
1875 def get_field_from_tag(self, tag):
1876 """
1877 Get a field declaration from tag (EnumerationField).
1878 """
1879
1880 field_type_native = nbt._bt_ctf_field_type_variant_get_field_type_from_tag(self._ft, tag._f)
1881
1882 if field_type_native is None:
1883 msg = "Could not find Variant field with tag value {}".format(tag.value)
1884 raise TypeError(msg)
1885
1886 return CTFWriter.FieldDeclaration._create_field_declaration_from_native_instance(field_type_native)
1887
1888 class ArrayFieldDeclaration(FieldDeclaration):
1889 def __init__(self, element_type, length):
1890 """
1891 Create a new array field declaration.
1892 """
1893
1894 self._ft = nbt._bt_ctf_field_type_array_create(element_type._ft,
1895 length)
1896 super().__init__()
1897
1898 @property
1899 def element_type(self):
1900 """
1901 Get the array's element type.
1902 """
1903
1904 ret = nbt._bt_ctf_field_type_array_get_element_type(self._ft)
1905
1906 if ret is None:
1907 raise TypeError("Could not get Array element type")
1908
1909 return CTFWriter.FieldDeclaration._create_field_declaration_from_native_instance(ret)
1910
1911 @property
1912 def length(self):
1913 """
1914 Get the array's length.
1915 """
1916
1917 ret = nbt._bt_ctf_field_type_array_get_length(self._ft)
1918
1919 if ret < 0:
1920 raise TypeError("Could not get Array length")
1921
1922 return ret
1923
1924 class SequenceFieldDeclaration(FieldDeclaration):
1925 def __init__(self, element_type, length_field_name):
1926 """
1927 Create a new sequence field declaration.
1928 """
1929
1930 self._ft = nbt._bt_ctf_field_type_sequence_create(element_type._ft,
1931 str(length_field_name))
1932 super().__init__()
1933
1934 @property
1935 def element_type(self):
1936 """
1937 Get the sequence's element type.
1938 """
1939
1940 ret = nbt._bt_ctf_field_type_sequence_get_element_type(self._ft)
1941
1942 if ret is None:
1943 raise TypeError("Could not get Sequence element type")
1944
1945 return CTFWriter.FieldDeclaration._create_field_declaration_from_native_instance(ret)
1946
1947 @property
1948 def length_field_name(self):
1949 """
1950 Get the sequence's length field name.
1951 """
1952
1953 ret = nbt._bt_ctf_field_type_sequence_get_length_field_name(self._ft)
1954
1955 if ret is None:
1956 raise TypeError("Could not get Sequence length field name")
1957
1958 return ret
1959
1960 class StringFieldDeclaration(FieldDeclaration):
1961 def __init__(self):
1962 """
1963 Create a new string field declaration.
1964 """
1965
1966 self._ft = nbt._bt_ctf_field_type_string_create()
1967 super().__init__()
1968
1969 @property
1970 def encoding(self):
1971 """
1972 Get a string declaration's encoding (a constant from the CTFStringEncoding class).
1973 """
1974
1975 return nbt._bt_ctf_field_type_string_get_encoding(self._ft)
1976
1977 @encoding.setter
1978 def encoding(self, encoding):
1979 """
1980 Set a string declaration's encoding. Must be a constant from the CTFStringEncoding class.
1981 """
1982
1983 ret = nbt._bt_ctf_field_type_string_set_encoding(self._ft, encoding)
1984 if ret < 0:
1985 raise ValueError("Could not set string encoding.")
1986
1987 @staticmethod
1988 def create_field(field_type):
1989 """
1990 Create an instance of a field.
1991 """
1992 isinst = isinstance(field_type, CTFWriter.FieldDeclaration)
1993
1994 if field_type is None or not isinst:
1995 raise TypeError("Invalid field_type. Type must be a FieldDeclaration-derived class.")
1996
1997 if isinstance(field_type, CTFWriter.IntegerFieldDeclaration):
1998 return CTFWriter.IntegerField(field_type)
1999 elif isinstance(field_type, CTFWriter.EnumerationFieldDeclaration):
2000 return CTFWriter.EnumerationField(field_type)
2001 elif isinstance(field_type, CTFWriter.FloatFieldDeclaration):
2002 return CTFWriter.FloatFieldingPoint(field_type)
2003 elif isinstance(field_type, CTFWriter.StructureFieldDeclaration):
2004 return CTFWriter.StructureField(field_type)
2005 elif isinstance(field_type, CTFWriter.VariantFieldDeclaration):
2006 return CTFWriter.VariantField(field_type)
2007 elif isinstance(field_type, CTFWriter.ArrayFieldDeclaration):
2008 return CTFWriter.ArrayField(field_type)
2009 elif isinstance(field_type, CTFWriter.SequenceFieldDeclaration):
2010 return CTFWriter.SequenceField(field_type)
2011 elif isinstance(field_type, CTFWriter.StringFieldDeclaration):
2012 return CTFWriter.StringField(field_type)
2013
2014 class Field:
2015 """
2016 Base class, do not instantiate.
2017 """
2018
2019 def __init__(self, field_type):
2020 if not isinstance(field_type, CTFWriter.FieldDeclaration):
2021 raise TypeError("Invalid field_type argument.")
2022
2023 self._f = nbt._bt_ctf_field_create(field_type._ft)
2024
2025 if self._f is None:
2026 raise ValueError("Field creation failed.")
2027
2028 def __del__(self):
2029 nbt._bt_ctf_field_put(self._f)
2030
2031 @staticmethod
2032 def _create_field_from_native_instance(native_field_instance):
2033 type_dict = {
2034 CTFTypeId.INTEGER: CTFWriter.IntegerField,
2035 CTFTypeId.FLOAT: CTFWriter.FloatFieldingPoint,
2036 CTFTypeId.ENUM: CTFWriter.EnumerationField,
2037 CTFTypeId.STRING: CTFWriter.StringField,
2038 CTFTypeId.STRUCT: CTFWriter.StructureField,
2039 CTFTypeId.VARIANT: CTFWriter.VariantField,
2040 CTFTypeId.ARRAY: CTFWriter.ArrayField,
2041 CTFTypeId.SEQUENCE: CTFWriter.SequenceField
2042 }
2043
2044 field_type = nbt._bt_python_get_field_type(native_field_instance)
2045
2046 if field_type == CTFTypeId.UNKNOWN:
2047 raise TypeError("Invalid field instance")
2048
2049 field = CTFWriter.Field.__new__(CTFWriter.Field)
2050 field._f = native_field_instance
2051 field.__class__ = type_dict[field_type]
2052
2053 return field
2054
2055 @property
2056 def declaration(self):
2057 native_field_type = nbt._bt_ctf_field_get_type(self._f)
2058
2059 if native_field_type is None:
2060 raise TypeError("Invalid field instance")
2061 return CTFWriter.FieldDeclaration._create_field_declaration_from_native_instance(
2062 native_field_type)
2063
2064 class IntegerField(Field):
2065 @property
2066 def value(self):
2067 """
2068 Get an integer field's value.
2069 """
2070
2071 signedness = nbt._bt_python_field_integer_get_signedness(self._f)
2072
2073 if signedness < 0:
2074 raise TypeError("Invalid integer instance.")
2075
2076 if signedness == 0:
2077 ret, value = nbt._bt_ctf_field_unsigned_integer_get_value(self._f)
2078 else:
2079 ret, value = nbt._bt_ctf_field_signed_integer_get_value(self._f)
2080
2081 if ret < 0:
2082 raise ValueError("Could not get integer field value.")
2083
2084 return value
2085
2086 @value.setter
2087 def value(self, value):
2088 """
2089 Set an integer field's value.
2090 """
2091
2092 if not isinstance(value, int):
2093 raise TypeError("IntegerField's value must be an int")
2094
2095 signedness = nbt._bt_python_field_integer_get_signedness(self._f)
2096 if signedness < 0:
2097 raise TypeError("Invalid integer instance.")
2098
2099 if signedness == 0:
2100 ret = nbt._bt_ctf_field_unsigned_integer_set_value(self._f, value)
2101 else:
2102 ret = nbt._bt_ctf_field_signed_integer_set_value(self._f, value)
2103
2104 if ret < 0:
2105 raise ValueError("Could not set integer field value.")
2106
2107 class EnumerationField(Field):
2108 @property
2109 def container(self):
2110 """
2111 Return the enumeration's underlying container field (an integer field).
2112 """
2113
2114 container = CTFWriter.IntegerField.__new__(CTFWriter.IntegerField)
2115 container._f = nbt._bt_ctf_field_enumeration_get_container(self._f)
2116
2117 if container._f is None:
2118 raise TypeError("Invalid enumeration field type.")
2119
2120 return container
2121
2122 @property
2123 def value(self):
2124 """
2125 Get the enumeration field's mapping name.
2126 """
2127
2128 value = nbt._bt_ctf_field_enumeration_get_mapping_name(self._f)
2129
2130 if value is None:
2131 raise ValueError("Could not get enumeration's mapping name.")
2132
2133 return value
2134
2135 @value.setter
2136 def value(self, value):
2137 """
2138 Set the enumeration field's value. Must be an integer as mapping names
2139 may be ambiguous.
2140 """
2141
2142 if not isinstance(value, int):
2143 raise TypeError("EnumerationField value must be an int")
2144
2145 self.container.value = value
2146
2147 class FloatFieldingPoint(Field):
2148 @property
2149 def value(self):
2150 """
2151 Get a floating point field's value.
2152 """
2153
2154 ret, value = nbt._bt_ctf_field_floating_point_get_value(self._f)
2155
2156 if ret < 0:
2157 raise ValueError("Could not get floating point field value.")
2158
2159 return value
2160
2161 @value.setter
2162 def value(self, value):
2163 """
2164 Set a floating point field's value.
2165 """
2166
2167 if not isinstance(value, int) and not isinstance(value, float):
2168 raise TypeError("Value must be either a float or an int")
2169
2170 ret = nbt._bt_ctf_field_floating_point_set_value(self._f, float(value))
2171
2172 if ret < 0:
2173 raise ValueError("Could not set floating point field value.")
2174
2175 class StructureField(Field):
2176 def field(self, field_name):
2177 """
2178 Get the structure's field corresponding to the provided field name.
2179 """
2180
2181 native_instance = nbt._bt_ctf_field_structure_get_field(self._f,
2182 str(field_name))
2183
2184 if native_instance is None:
2185 raise ValueError("Invalid field_name provided.")
2186
2187 return CTFWriter.Field._create_field_from_native_instance(native_instance)
2188
2189 class VariantField(Field):
2190 def field(self, tag):
2191 """
2192 Return the variant's selected field. The "tag" field is the selector enum field.
2193 """
2194
2195 native_instance = nbt._bt_ctf_field_variant_get_field(self._f, tag._f)
2196
2197 if native_instance is None:
2198 raise ValueError("Invalid tag provided.")
2199
2200 return CTFWriter.Field._create_field_from_native_instance(native_instance)
2201
2202 class ArrayField(Field):
2203 def field(self, index):
2204 """
2205 Return the array's field at position "index".
2206 """
2207
2208 native_instance = nbt._bt_ctf_field_array_get_field(self._f, index)
2209
2210 if native_instance is None:
2211 raise IndexError("Invalid index provided.")
2212
2213 return CTFWriter.Field._create_field_from_native_instance(native_instance)
2214
2215 class SequenceField(Field):
2216 @property
2217 def length(self):
2218 """
2219 Get the sequence's length field (IntegerField).
2220 """
2221
2222 native_instance = nbt._bt_ctf_field_sequence_get_length(self._f)
2223
2224 if native_instance is None:
2225 length = -1
2226
2227 return CTFWriter.Field._create_field_from_native_instance(native_instance)
2228
2229 @length.setter
2230 def length(self, length_field):
2231 """
2232 Set the sequence's length field (IntegerField).
2233 """
2234
2235 if not isinstance(length_field, CTFWriter.IntegerField):
2236 raise TypeError("Invalid length field.")
2237
2238 if length_field.declaration.signed:
2239 raise TypeError("Sequence field length must be unsigned")
2240
2241 ret = nbt._bt_ctf_field_sequence_set_length(self._f, length_field._f)
2242
2243 if ret < 0:
2244 raise ValueError("Could not set sequence length.")
2245
2246 def field(self, index):
2247 """
2248 Return the sequence's field at position "index".
2249 """
2250
2251 native_instance = nbt._bt_ctf_field_sequence_get_field(self._f, index)
2252
2253 if native_instance is None:
2254 raise ValueError("Could not get sequence element at index.")
2255
2256 return CTFWriter.Field._create_field_from_native_instance(native_instance)
2257
2258 class StringField(Field):
2259 @property
2260 def value(self):
2261 """
2262 Get a string field's value.
2263 """
2264
2265 return nbt._bt_ctf_field_string_get_value(self._f)
2266
2267 @value.setter
2268 def value(self, value):
2269 """
2270 Set a string field's value.
2271 """
2272
2273 ret = nbt._bt_ctf_field_string_set_value(self._f, str(value))
2274
2275 if ret < 0:
2276 raise ValueError("Could not set string field value.")
2277
2278 class EventClass:
2279 def __init__(self, name):
2280 """
2281 Create a new event class of the given name.
2282 """
2283
2284 self._ec = nbt._bt_ctf_event_class_create(name)
2285
2286 if self._ec is None:
2287 raise ValueError("Event class creation failed.")
2288
2289 def __del__(self):
2290 nbt._bt_ctf_event_class_put(self._ec)
2291
2292 def add_field(self, field_type, field_name):
2293 """
2294 Add a field of type "field_type" to the event class.
2295 """
2296
2297 ret = nbt._bt_ctf_event_class_add_field(self._ec, field_type._ft,
2298 str(field_name))
2299
2300 if ret < 0:
2301 raise ValueError("Could not add field to event class.")
2302
2303 @property
2304 def name(self):
2305 """
2306 Get the event class' name.
2307 """
2308
2309 name = nbt._bt_ctf_event_class_get_name(self._ec)
2310
2311 if name is None:
2312 raise TypeError("Could not get EventClass name")
2313
2314 return name
2315
2316 @property
2317 def id(self):
2318 """
2319 Get the event class' id. Returns a negative value if unset.
2320 """
2321
2322 id = nbt._bt_ctf_event_class_get_id(self._ec)
2323
2324 if id < 0:
2325 raise TypeError("Could not get EventClass id")
2326
2327 return id
2328
2329 @id.setter
2330 def id(self, id):
2331 """
2332 Set the event class' id. Throws a TypeError if the event class
2333 is already registered to a stream class.
2334 """
2335
2336 ret = nbt._bt_ctf_event_class_set_id(self._ec, id)
2337
2338 if ret < 0:
2339 raise TypeError("Can't change an Event Class's id after it has been assigned to a stream class")
2340
2341 @property
2342 def stream_class(self):
2343 """
2344 Get the event class' stream class. Returns None if unset.
2345 """
2346 stream_class_native = nbt._bt_ctf_event_class_get_stream_class(self._ec)
2347
2348 if stream_class_native is None:
2349 return None
2350
2351 stream_class = CTFWriter.StreamClass.__new__(CTFWriter.StreamClass)
2352 stream_class._sc = stream_class_native
2353
2354 return stream_class
2355
2356 @property
2357 def fields(self):
2358 """
2359 Generator returning the event class' fields as tuples of (field name, field declaration).
2360 """
2361
2362 count = nbt._bt_ctf_event_class_get_field_count(self._ec)
2363
2364 if count < 0:
2365 raise TypeError("Could not get EventClass' field count")
2366
2367 for i in range(count):
2368 field_name = nbt._bt_python_ctf_event_class_get_field_name(self._ec, i)
2369
2370 if field_name is None:
2371 msg = "Could not get EventClass' field name at index {}".format(i)
2372 raise TypeError(msg)
2373
2374 field_type_native = nbt._bt_python_ctf_event_class_get_field_type(self._ec, i)
2375
2376 if field_type_native is None:
2377 msg = "Could not get EventClass' field type at index {}".format(i)
2378 raise TypeError(msg)
2379
2380 field_type = CTFWriter.FieldDeclaration._create_field_declaration_from_native_instance(field_type_native)
2381 yield (field_name, field_type)
2382
2383 def get_field_by_name(self, name):
2384 """
2385 Get a field declaration by name (FieldDeclaration).
2386 """
2387
2388 field_type_native = nbt._bt_ctf_event_class_get_field_by_name(self._ec, name)
2389
2390 if field_type_native is None:
2391 msg = "Could not find EventClass field with name {}".format(name)
2392 raise TypeError(msg)
2393
2394 return CTFWriter.FieldDeclaration._create_field_declaration_from_native_instance(field_type_native)
2395
2396 class Event:
2397 def __init__(self, event_class):
2398 """
2399 Create a new event of the given event class.
2400 """
2401
2402 if not isinstance(event_class, CTFWriter.EventClass):
2403 raise TypeError("Invalid event_class argument.")
2404
2405 self._e = nbt._bt_ctf_event_create(event_class._ec)
2406
2407 if self._e is None:
2408 raise ValueError("Event creation failed.")
2409
2410 def __del__(self):
2411 nbt._bt_ctf_event_put(self._e)
2412
2413 @property
2414 def event_class(self):
2415 """
2416 Get the event's class.
2417 """
2418
2419 event_class_native = nbt._bt_ctf_event_get_class(self._e)
2420
2421 if event_class_native is None:
2422 return None
2423
2424 event_class = CTFWriter.EventClass.__new__(CTFWriter.EventClass)
2425 event_class._ec = event_class_native
2426
2427 return event_class
2428
2429 def clock(self):
2430 """
2431 Get a clock from event. Returns None if the event's class
2432 is not registered to a stream class.
2433 """
2434
2435 clock_instance = nbt._bt_ctf_event_get_clock(self._e)
2436
2437 if clock_instance is None:
2438 return None
2439
2440 clock = CTFWriter.Clock.__new__(CTFWriter.Clock)
2441 clock._c = clock_instance
2442
2443 return clock
2444
2445 def payload(self, field_name):
2446 """
2447 Get a field from event.
2448 """
2449
2450 native_instance = nbt._bt_ctf_event_get_payload(self._e,
2451 str(field_name))
2452
2453 if native_instance is None:
2454 raise ValueError("Could not get event payload.")
2455
2456 return CTFWriter.Field._create_field_from_native_instance(native_instance)
2457
2458 def set_payload(self, field_name, value_field):
2459 """
2460 Set a manually created field as an event's payload.
2461 """
2462
2463 if not isinstance(value, CTFWriter.Field):
2464 raise TypeError("Invalid value type.")
2465
2466 ret = nbt._bt_ctf_event_set_payload(self._e, str(field_name),
2467 value_field._f)
2468
2469 if ret < 0:
2470 raise ValueError("Could not set event field payload.")
2471
2472 class StreamClass:
2473 def __init__(self, name):
2474 """
2475 Create a new stream class of the given name.
2476 """
2477
2478 self._sc = nbt._bt_ctf_stream_class_create(name)
2479
2480 if self._sc is None:
2481 raise ValueError("Stream class creation failed.")
2482
2483 def __del__(self):
2484 nbt._bt_ctf_stream_class_put(self._sc)
2485
2486 @property
2487 def name(self):
2488 """
2489 Get a stream class' name.
2490 """
2491
2492 name = nbt._bt_ctf_stream_class_get_name(self._sc)
2493
2494 if name is None:
2495 raise TypeError("Could not get StreamClass name")
2496
2497 return name
2498
2499 @property
2500 def clock(self):
2501 """
2502 Get a stream class' clock.
2503 """
2504
2505 clock_instance = nbt._bt_ctf_stream_class_get_clock(self._sc)
2506
2507 if clock_instance is None:
2508 return None
2509
2510 clock = CTFWriter.Clock.__new__(CTFWriter.Clock)
2511 clock._c = clock_instance
2512
2513 return clock
2514
2515 @clock.setter
2516 def clock(self, clock):
2517 """
2518 Assign a clock to a stream class.
2519 """
2520
2521 if not isinstance(clock, CTFWriter.Clock):
2522 raise TypeError("Invalid clock type.")
2523
2524 ret = nbt._bt_ctf_stream_class_set_clock(self._sc, clock._c)
2525
2526 if ret < 0:
2527 raise ValueError("Could not set stream class clock.")
2528
2529 @property
2530 def id(self):
2531 """
2532 Get a stream class' id.
2533 """
2534
2535 ret = nbt._bt_ctf_stream_class_get_id(self._sc)
2536
2537 if ret < 0:
2538 raise TypeError("Could not get StreamClass id")
2539
2540 return ret
2541
2542 @id.setter
2543 def id(self, id):
2544 """
2545 Assign an id to a stream class.
2546 """
2547
2548 ret = nbt._bt_ctf_stream_class_set_id(self._sc, id)
2549
2550 if ret < 0:
2551 raise TypeError("Could not set stream class id.")
2552
2553 @property
2554 def event_classes(self):
2555 """
2556 Generator returning the stream class' event classes.
2557 """
2558
2559 count = nbt._bt_ctf_stream_class_get_event_class_count(self._sc)
2560
2561 if count < 0:
2562 raise TypeError("Could not get StreamClass' event class count")
2563
2564 for i in range(count):
2565 event_class_native = nbt._bt_ctf_stream_class_get_event_class(self._sc, i)
2566
2567 if event_class_native is None:
2568 msg = "Could not get StreamClass' event class at index {}".format(i)
2569 raise TypeError(msg)
2570
2571 event_class = CTFWriter.EventClass.__new__(CTFWriter.EventClass)
2572 event_class._ec = event_class_native
2573 yield event_class
2574
2575 def add_event_class(self, event_class):
2576 """
2577 Add an event class to a stream class. New events can be added even after a
2578 stream has been instantiated and events have been appended. However, a stream
2579 will not accept events of a class that has not been added to the stream
2580 class beforehand.
2581 """
2582
2583 if not isinstance(event_class, CTFWriter.EventClass):
2584 raise TypeError("Invalid event_class type.")
2585
2586 ret = nbt._bt_ctf_stream_class_add_event_class(self._sc,
2587 event_class._ec)
2588
2589 if ret < 0:
2590 raise ValueError("Could not add event class.")
2591
2592 @property
2593 def packet_context_type(self):
2594 """
2595 Get the StreamClass' packet context type (StructureFieldDeclaration)
2596 """
2597
2598 field_type_native = nbt._bt_ctf_stream_class_get_packet_context_type(self._sc)
2599
2600 if field_type_native is None:
2601 raise ValueError("Invalid StreamClass")
2602
2603 field_type = CTFWriter.FieldDeclaration._create_field_declaration_from_native_instance(field_type_native)
2604
2605 return field_type
2606
2607 @packet_context_type.setter
2608 def packet_context_type(self, field_type):
2609 """
2610 Set a StreamClass' packet context type. Must be of type
2611 StructureFieldDeclaration.
2612 """
2613
2614 if not isinstance(field_type, CTFWriter.StructureFieldDeclaration):
2615 raise TypeError("field_type argument must be of type StructureFieldDeclaration.")
2616
2617 ret = nbt._bt_ctf_stream_class_set_packet_context_type(self._sc,
2618 field_type._ft)
2619
2620 if ret < 0:
2621 raise ValueError("Failed to set packet context type.")
2622
2623 class Stream:
2624 def __init__(self, stream_class):
2625 """
2626 Create a stream of the given class.
2627 """
2628
2629 if not isinstance(stream_class, CTFWriter.StreamClass):
2630 raise TypeError("Invalid stream_class argument must be of type StreamClass.")
2631
2632 self._s = nbt._bt_ctf_stream_create(stream_class._sc)
2633
2634 if self._s is None:
2635 raise ValueError("Stream creation failed.")
2636
2637 def __del__(self):
2638 nbt._bt_ctf_stream_put(self._s)
2639
2640 @property
2641 def discarded_events(self):
2642 """
2643 Get a stream's discarded event count.
2644 """
2645
2646 ret, count = nbt._bt_ctf_stream_get_discarded_events_count(self._s)
2647
2648 if ret < 0:
2649 raise ValueError("Could not get the stream's discarded events count")
2650
2651 return count
2652
2653 def append_discarded_events(self, event_count):
2654 """
2655 Increase the current packet's discarded event count.
2656 """
2657
2658 nbt._bt_ctf_stream_append_discarded_events(self._s, event_count)
2659
2660 def append_event(self, event):
2661 """
2662 Append "event" to the stream's current packet. The stream's associated clock
2663 will be sampled during this call. The event shall not be modified after
2664 being appended to a stream.
2665 """
2666
2667 ret = nbt._bt_ctf_stream_append_event(self._s, event._e)
2668
2669 if ret < 0:
2670 raise ValueError("Could not append event to stream.")
2671
2672 @property
2673 def packet_context(self):
2674 """
2675 Get a Stream's packet context field (a StructureField).
2676 """
2677
2678 native_field = nbt._bt_ctf_stream_get_packet_context(self._s)
2679
2680 if native_field is None:
2681 raise ValueError("Invalid Stream.")
2682
2683 return CTFWriter.Field._create_field_from_native_instance(native_field)
2684
2685 @packet_context.setter
2686 def packet_context(self, field):
2687 """
2688 Set a Stream's packet context field (must be a StructureField).
2689 """
2690
2691 if not isinstance(field, CTFWriter.StructureField):
2692 raise TypeError("Argument field must be of type StructureField")
2693
2694 ret = nbt._bt_ctf_stream_set_packet_context(self._s, field._f)
2695
2696 if ret < 0:
2697 raise ValueError("Invalid packet context field.")
2698
2699 def flush(self):
2700 """
2701 The stream's current packet's events will be flushed to disk. Events
2702 subsequently appended to the stream will be added to a new packet.
2703 """
2704
2705 ret = nbt._bt_ctf_stream_flush(self._s)
2706
2707 if ret < 0:
2708 raise ValueError("Could not flush stream.")
2709
2710 class Writer:
2711 def __init__(self, path):
2712 """
2713 Create a new writer that will produce a trace in the given path.
2714 """
2715
2716 self._w = nbt._bt_ctf_writer_create(path)
2717
2718 if self._w is None:
2719 raise ValueError("Writer creation failed.")
2720
2721 def __del__(self):
2722 nbt._bt_ctf_writer_put(self._w)
2723
2724 def create_stream(self, stream_class):
2725 """
2726 Create a new stream instance and register it to the writer.
2727 """
2728
2729 if not isinstance(stream_class, CTFWriter.StreamClass):
2730 raise TypeError("Invalid stream_class type.")
2731
2732 stream = CTFWriter.Stream.__new__(CTFWriter.Stream)
2733 stream._s = nbt._bt_ctf_writer_create_stream(self._w, stream_class._sc)
2734
2735 return stream
2736
2737 def add_environment_field(self, name, value):
2738 """
2739 Add an environment field to the trace.
2740 """
2741
2742 ret = nbt._bt_ctf_writer_add_environment_field(self._w, str(name),
2743 str(value))
2744
2745 if ret < 0:
2746 raise ValueError("Could not add environment field to trace.")
2747
2748 def add_clock(self, clock):
2749 """
2750 Add a clock to the trace. Clocks assigned to stream classes must be
2751 registered to the writer.
2752 """
2753
2754 ret = nbt._bt_ctf_writer_add_clock(self._w, clock._c)
2755
2756 if ret < 0:
2757 raise ValueError("Could not add clock to Writer.")
2758
2759 @property
2760 def metadata(self):
2761 """
2762 Get the trace's TSDL meta-data.
2763 """
2764
2765 return nbt._bt_ctf_writer_get_metadata_string(self._w)
2766
2767 def flush_metadata(self):
2768 """
2769 Flush the trace's metadata to the metadata file.
2770 """
2771
2772 nbt._bt_ctf_writer_flush_metadata(self._w)
2773
2774 @property
2775 def byte_order(self):
2776 """
2777 Get the trace's byte order. Must be a constant from the ByteOrder
2778 class.
2779 """
2780
2781 raise NotImplementedError("Getter not implemented.")
2782
2783 @byte_order.setter
2784 def byte_order(self, byte_order):
2785 """
2786 Set the trace's byte order. Must be a constant from the ByteOrder
2787 class. Defaults to the host machine's endianness
2788 """
2789
2790 ret = nbt._bt_ctf_writer_set_byte_order(self._w, byte_order)
2791
2792 if ret < 0:
2793 raise ValueError("Could not set trace's byte order.")
This page took 0.133079 seconds and 5 git commands to generate.