From be5a4e673f4603dd4867e945ee6054de41be78c2 Mon Sep 17 00:00:00 2001 From: Philippe Proulx Date: Tue, 9 Dec 2014 18:01:07 -0500 Subject: [PATCH] Python: split API in reader/writer modules MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit babeltrace package's modules are now: common: class CTFStringEncoding class ByteOrder class CTFTypeId class CTFScope reader: everything related to reading writer: CTF writer API Backward compatibility is ensured by importing the appropriate modules into the package itself. Signed-off-by: Philippe Proulx Signed-off-by: Jérémie Galarneau --- bindings/python/Makefile.am | 4 +- bindings/python/__init__.py.in | 22 +- bindings/python/bt.py | 3048 -------------------------------- bindings/python/common.py | 168 ++ bindings/python/reader.py | 1223 +++++++++++++ bindings/python/writer.py | 1732 ++++++++++++++++++ 6 files changed, 3141 insertions(+), 3056 deletions(-) delete mode 100644 bindings/python/bt.py create mode 100644 bindings/python/common.py create mode 100644 bindings/python/reader.py create mode 100644 bindings/python/writer.py diff --git a/bindings/python/Makefile.am b/bindings/python/Makefile.am index 0f8e0bd7..ffea7a1d 100644 --- a/bindings/python/Makefile.am +++ b/bindings/python/Makefile.am @@ -5,8 +5,8 @@ __init__.py: __init__.py.in AM_CFLAGS = $(PYTHON_INCLUDE) -I$(top_srcdir)/include/ -EXTRA_DIST = __init__.py.in nativebt.i bt.py -nodist_btpackage_PYTHON = __init__.py nativebt.py bt.py +EXTRA_DIST = __init__.py.in nativebt.i common.py reader.py writer.py +nodist_btpackage_PYTHON = __init__.py nativebt.py common.py reader.py writer.py nativebtlib_LTLIBRARIES = _nativebt.la btpackagedir = $(pythondir)/babeltrace diff --git a/bindings/python/__init__.py.in b/bindings/python/__init__.py.in index 2c4c567a..a1d4d17d 100644 --- a/bindings/python/__init__.py.in +++ b/bindings/python/__init__.py.in @@ -1,10 +1,15 @@ -from .bt import \ - TraceCollection, \ - TraceHandle, \ +# backward compatibility with old `babeltrace` module: import common members +from .common import \ CTFStringEncoding, \ ByteOrder, \ CTFTypeId, \ - CTFScope, \ + CTFScope + + +# backward compatibility with old `babeltrace` module: import reader API members +from .reader import \ + TraceCollection, \ + TraceHandle, \ Event, \ FieldError, \ EventDeclaration, \ @@ -16,8 +21,13 @@ from .bt import \ FloatFieldDeclaration, \ StructureFieldDeclaration, \ StringFieldDeclaration, \ - VariantFieldDeclaration, \ - CTFWriter + VariantFieldDeclaration + + +# backward compatibility with old `babeltrace` module: import CTF writer API +# module as `CTFWriter`, since `CTFWriter` used to be a class in the +# `babeltrace` module +import babeltrace.writer as CTFWriter __version__ = 'BABELTRACE_VERSION_STR' diff --git a/bindings/python/bt.py b/bindings/python/bt.py deleted file mode 100644 index 09cc9d2f..00000000 --- a/bindings/python/bt.py +++ /dev/null @@ -1,3048 +0,0 @@ -# nativebt.i.in -# -# Babeltrace native interface Python module -# -# Copyright 2012-2015 EfficiOS Inc. -# -# Author: Danny Serres -# Author: Jérémie Galarneau -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. - -import babeltrace.nativebt as nbt -import collections -import os -from datetime import datetime -from uuid import UUID - - -class TraceCollection: - """ - A :class:`TraceCollection` is a collection of opened traces. - - In general, once a trace collection is created, you add one to many - independent traces to it using :meth:`add_trace` or - :meth:`add_traces_recursive`, and then iterate the ordered events - of all traces merged together using :attr:`events`. - - You may use :meth:`remove_trace` to close and remove a specific - trace from a trace collection, although all the traces of a given - trace collection will be automatically removed when it is garbage - collected. - """ - - def __init__(self): - """ - Creates an empty trace collection. - """ - - self._tc = nbt._bt_context_create() - - def __del__(self): - nbt._bt_context_put(self._tc) - - def add_trace(self, path, format_str): - """ - Adds a trace to the trace collection. - - The trace is located at the file system path *path*. This - function **does not** recurse directories to find the trace: - *path* must point to the exact trace location (see - :meth:`add_traces_recursive` for a recursive version of this - function). - - *format_str* is a string indicating the Babeltrace type of the - trace to add. ``ctf`` is the only currently supported trace - format. - - Once added, the trace is opened. - - Returns the corresponding :class:`TraceHandle` instance for - this opened trace on success, or ``None`` on error. - """ - - ret = nbt._bt_context_add_trace(self._tc, path, format_str, - None, None, None) - - if ret < 0: - return None - - th = TraceHandle.__new__(TraceHandle) - th._id = ret - th._trace_collection = self - - return th - - def add_traces_recursive(self, path, format_str): - """ - Adds traces to this trace collection by recursively searching - in the *path* directory. - - *format_str* is a string indicating the Babeltrace type of the - traces to find and add. ``ctf`` is the only currently supported - trace format. - - See also :meth:`add_trace`. - - Returns a :class:`dict` object mapping full paths to trace - handles for each trace found, or ``None`` on error. - """ - - trace_handles = {} - noTrace = True - error = False - - for fullpath, dirs, files in os.walk(path): - if "metadata" in files: - trace_handle = self.add_trace(fullpath, format_str) - - if trace_handle is None: - error = True - continue - - trace_handles[fullpath] = trace_handle - noTrace = False - - if noTrace and error: - return None - - return trace_handles - - def remove_trace(self, trace_handle): - """ - Removes a trace from the trace collection using its trace - handle *trace_handle*. - - :class:`TraceHandle` objects are returned by :meth:`add_trace` - and :meth:`add_traces_recursive`. - - The trace is closed before being removed. - """ - - try: - nbt._bt_context_remove_trace(self._tc, trace_handle._id) - except AttributeError: - raise TypeError("in remove_trace, argument 2 must be a TraceHandle instance") - - @property - def events(self): - """ - Generates the ordered :class:`Event` objects of all the opened - traces contained in this trace collection. Iterate this function - to iterate actual events. - - Due to limitations of the native Babeltrace API, only one event - may be "alive" at a given time, i.e. a user **should never** - store a copy of the events returned by this function for - ulterior use. Users shall make sure to copy the information - they need *from* an event before accessing the next one. - - Furthermore, :class:`Event` objects become invalid when the - generator goes out of scope as the underlying iterator will be - reclaimed. Using an event after the the generator has gone out - of scope may result in a crash or data corruption. - """ - - begin_pos_ptr = nbt._bt_iter_pos() - end_pos_ptr = nbt._bt_iter_pos() - begin_pos_ptr.type = nbt.SEEK_BEGIN - end_pos_ptr.type = nbt.SEEK_LAST - - for event in self._events(begin_pos_ptr, end_pos_ptr): - yield event - - def events_timestamps(self, timestamp_begin, timestamp_end): - """ - Generates the ordered :class:`Event` objects of all the opened - traces contained in this trace collection from *timestamp_begin* - to *timestamp_end*. - - See :attr:`events` for notes and limitations. - """ - - begin_pos_ptr = nbt._bt_iter_pos() - end_pos_ptr = nbt._bt_iter_pos() - begin_pos_ptr.type = end_pos_ptr.type = nbt.SEEK_TIME - begin_pos_ptr.u.seek_time = timestamp_begin - end_pos_ptr.u.seek_time = timestamp_end - - for event in self._events(begin_pos_ptr, end_pos_ptr): - yield event - - @property - def timestamp_begin(self): - """ - Trace collection's begin timestamp. - """ - - pos_ptr = nbt._bt_iter_pos() - pos_ptr.type = nbt.SEEK_BEGIN - - return self._timestamp_at_pos(pos_ptr) - - @property - def timestamp_end(self): - """ - Trace collection's end timestamp. - """ - - pos_ptr = nbt._bt_iter_pos() - pos_ptr.type = nbt.SEEK_LAST - - return self._timestamp_at_pos(pos_ptr) - - def _timestamp_at_pos(self, pos_ptr): - ctf_it_ptr = nbt._bt_ctf_iter_create(self._tc, pos_ptr, pos_ptr) - - if ctf_it_ptr is None: - raise NotImplementedError("Creation of multiple iterators is unsupported.") - - ev_ptr = nbt._bt_ctf_iter_read_event(ctf_it_ptr) - nbt._bt_ctf_iter_destroy(ctf_it_ptr) - - def _events(self, begin_pos_ptr, end_pos_ptr): - ctf_it_ptr = nbt._bt_ctf_iter_create(self._tc, begin_pos_ptr, end_pos_ptr) - - if ctf_it_ptr is None: - raise NotImplementedError("Creation of multiple iterators is unsupported.") - - while True: - ev_ptr = nbt._bt_ctf_iter_read_event(ctf_it_ptr) - - if ev_ptr is None: - break - - ev = Event.__new__(Event) - ev._e = ev_ptr - - try: - yield ev - except GeneratorExit: - break - - ret = nbt._bt_iter_next(nbt._bt_ctf_get_iter(ctf_it_ptr)) - - if ret != 0: - break - - nbt._bt_ctf_iter_destroy(ctf_it_ptr) - - -# Based on enum bt_clock_type in clock-type.h -class _ClockType: - CLOCK_CYCLES = 0 - CLOCK_REAL = 1 - - -class TraceHandle: - """ - A :class:`TraceHandle` is a handle allowing the user to manipulate - a specific trace directly. It is a unique identifier representing a - trace, and is not meant to be instantiated by the user. - """ - - def __init__(self): - raise NotImplementedError("TraceHandle cannot be instantiated") - - def __repr__(self): - return "Babeltrace TraceHandle: trace_id('{0}')".format(self._id) - - @property - def id(self): - """ - Trace handle's numeric ID. - """ - - return self._id - - @property - def path(self): - """ - Path of the underlying trace. - """ - - return nbt._bt_trace_handle_get_path(self._trace_collection._tc, - self._id) - - @property - def timestamp_begin(self): - """ - Buffers creation timestamp (nanoseconds since Epoch) of the - underlying trace. - """ - - return nbt._bt_trace_handle_get_timestamp_begin(self._trace_collection._tc, - self._id, - _ClockType.CLOCK_REAL) - - @property - def timestamp_end(self): - """ - Buffers destruction timestamp (nanoseconds since Epoch) of the - underlying trace. - """ - - return nbt._bt_trace_handle_get_timestamp_end(self._trace_collection._tc, - self._id, - _ClockType.CLOCK_REAL) - - @property - def events(self): - """ - Generates all the :class:`EventDeclaration` objects of the - underlying trace. - - Note that this doesn't generate actual trace *events*, but - rather their declarations, i.e. their layouts and metadata. - """ - - ret = nbt._bt_python_event_decl_listcaller(self.id, - self._trace_collection._tc) - - if not isinstance(ret, list): - return - - ptr_list, count = ret - - for i in range(count): - tmp = EventDeclaration.__new__(EventDeclaration) - tmp._ed = nbt._bt_python_decl_one_from_list(ptr_list, i) - yield tmp - - -class CTFStringEncoding: - """ - CTF string encodings. - """ - - #: None - NONE = 0 - - #: UTF-8 - UTF8 = 1 - - #: ASCII - ASCII = 2 - - #: Unknown - UNKNOWN = 3 - - -# Based on the enum in ctf-writer/writer.h -class ByteOrder: - """ - Byte orders. - """ - - #: Native byte order - BYTE_ORDER_NATIVE = 0 - - #: Little-endian - BYTE_ORDER_LITTLE_ENDIAN = 1 - - #: Big-endian - BYTE_ORDER_BIG_ENDIAN = 2 - - #: Network byte order (big-endian) - BYTE_ORDER_NETWORK = 3 - - #: Unknown byte order - BYTE_ORDER_UNKNOWN = 4 # Python-specific entry - - -# enum equivalent, accessible constants -# These are taken directly from ctf/events.h -# All changes to enums must also be made here -class CTFTypeId: - """ - CTF numeric type identifiers. - """ - - #: Unknown type - UNKNOWN = 0 - - #: Integer - INTEGER = 1 - - #: Floating point number - FLOAT = 2 - - #: Enumeration - ENUM = 3 - - #: String - STRING = 4 - - #: Structure - STRUCT = 5 - - #: Untagged variant - UNTAGGED_VARIANT = 6 - - #: Variant - VARIANT = 7 - - #: Array - ARRAY = 8 - - #: Sequence - SEQUENCE = 9 - - NR_CTF_TYPES = 10 - - def type_name(id): - """ - Returns the name of the CTF numeric type identifier *id*. - """ - - name = "UNKNOWN_TYPE" - constants = [ - attr for attr in dir(CTFTypeId) if not callable( - getattr( - CTFTypeId, - attr)) and not attr.startswith("__")] - - for attr in constants: - if getattr(CTFTypeId, attr) == id: - name = attr - break - - return name - - -class CTFScope: - """ - CTF scopes. - """ - - #: Packet header - TRACE_PACKET_HEADER = 0 - - #: Packet context - STREAM_PACKET_CONTEXT = 1 - - #: Event header - STREAM_EVENT_HEADER = 2 - - #: Stream event context - STREAM_EVENT_CONTEXT = 3 - - #: Event context - EVENT_CONTEXT = 4 - - #: Event fields - EVENT_FIELDS = 5 - - def scope_name(scope): - """ - Returns the name of the CTF scope *scope*. - """ - - name = "UNKNOWN_SCOPE" - constants = [ - attr for attr in dir(CTFScope) if not callable( - getattr( - CTFScope, - attr)) and not attr.startswith("__")] - - for attr in constants: - if getattr(CTFScope, attr) == scope: - name = attr - break - - return name - - -# Priority of the scopes when searching for event fields -_scopes = [ - CTFScope.EVENT_FIELDS, - CTFScope.EVENT_CONTEXT, - CTFScope.STREAM_EVENT_CONTEXT, - CTFScope.STREAM_EVENT_HEADER, - CTFScope.STREAM_PACKET_CONTEXT, - CTFScope.TRACE_PACKET_HEADER -] - - -class Event(collections.Mapping): - """ - An :class:`Event` object represents a trace event. :class:`Event` - objects are returned by :attr:`TraceCollection.events` and are - not meant to be instantiated by the user. - - :class:`Event` has a :class:`dict`-like interface for accessing - an event's field value by field name: - - .. code-block:: python - - event['my_field'] - - If a field name exists in multiple scopes, the value of the first - field found is returned. The scopes are searched in the following - order: - - 1. Event fields (:attr:`CTFScope.EVENT_FIELDS`) - 2. Event context (:attr:`CTFScope.EVENT_CONTEXT`) - 3. Stream event context (:attr:`CTFScope.STREAM_EVENT_CONTEXT`) - 4. Event header (:attr:`CTFScope.STREAM_EVENT_HEADER`) - 5. Packet context (:attr:`CTFScope.STREAM_PACKET_CONTEXT`) - 6. Packet header (:attr:`CTFScope.TRACE_PACKET_HEADER`) - - It is still possible to obtain a field's value from a specific - scope using :meth:`field_with_scope`. - - Field values are returned as native Python types, that is: - - +-----------------------+----------------------------------+ - | Field type | Python type | - +=======================+==================================+ - | Integer | :class:`int` | - +-----------------------+----------------------------------+ - | Floating point number | :class:`float` | - +-----------------------+----------------------------------+ - | Enumeration | :class:`str` (enumeration label) | - +-----------------------+----------------------------------+ - | String | :class:`str` | - +-----------------------+----------------------------------+ - | Array | :class:`list` of native Python | - | | objects | - +-----------------------+----------------------------------+ - | Sequence | :class:`list` of native Python | - | | objects | - +-----------------------+----------------------------------+ - | Structure | :class:`dict` mapping field | - | | names to native Python objects | - +-----------------------+----------------------------------+ - - For example, printing the third element of a sequence named ``seq`` - in a structure named ``my_struct`` of the ``event``'s field named - ``my_field`` is done this way: - - .. code-block:: python - - print(event['my_field']['my_struct']['seq'][2]) - """ - - def __init__(self): - raise NotImplementedError("Event cannot be instantiated") - - @property - def name(self): - """ - Event's name or ``None`` on error. - """ - - return nbt._bt_ctf_event_name(self._e) - - @property - def cycles(self): - """ - Event's timestamp in cycles or -1 on error. - """ - - return nbt._bt_ctf_get_cycles(self._e) - - @property - def timestamp(self): - """ - Event's timestamp (nanoseconds since Epoch) or -1 on error. - """ - - return nbt._bt_ctf_get_timestamp(self._e) - - @property - def datetime(self): - """ - Event's timestamp as a standard :class:`datetime.datetime` - object. - - Note that the :class:`datetime.datetime` class' precision - is limited to microseconds, whereas :attr:`timestamp` provides - the event's timestamp with a nanosecond resolution. - """ - - return datetime.fromtimestamp(self.timestamp / 1E9) - - def field_with_scope(self, field_name, scope): - """ - Returns the value of a field named *field_name* within the - scope *scope*, or ``None`` if the field cannot be found. - - *scope* must be one of :class:`CTFScope` constants. - """ - - if scope not in _scopes: - raise ValueError("Invalid scope provided") - - field = self._field_with_scope(field_name, scope) - - if field is not None: - return field.value - - def field_list_with_scope(self, scope): - """ - Returns a list of field names in the scope *scope*. - """ - - if scope not in _scopes: - raise ValueError("Invalid scope provided") - - field_names = [] - - for field in self._field_list_with_scope(scope): - field_names.append(field.name) - - return field_names - - @property - def handle(self): - """ - :class:`TraceHandle` object containing this event, or ``None`` - on error. - """ - - ret = nbt._bt_ctf_event_get_handle_id(self._e) - - if ret < 0: - return None - - th = TraceHandle.__new__(TraceHandle) - th._id = ret - th._trace_collection = self.get_trace_collection() - - return th - - @property - def trace_collection(self): - """ - :class:`TraceCollection` object containing this event, or - ``None`` on error. - """ - - trace_collection = TraceCollection() - trace_collection._tc = nbt._bt_ctf_event_get_context(self._e) - - if trace_collection._tc is not None: - return trace_collection - - def __getitem__(self, field_name): - field = self._field(field_name) - - if field is not None: - return field.value - - raise KeyError(field_name) - - def __iter__(self): - for key in self.keys(): - yield key - - def __len__(self): - count = 0 - - for scope in _scopes: - scope_ptr = nbt._bt_ctf_get_top_level_scope(self._e, scope) - ret = nbt._bt_python_field_listcaller(self._e, scope_ptr) - - if isinstance(ret, list): - count += ret[1] - - return count - - def __contains__(self, field_name): - return self._field(field_name) is not None - - def keys(self): - """ - Returns the list of field names. - - Note: field names are unique within the returned list, although - a field name could exist in multiple scopes. Use - :meth:`field_list_with_scope` to obtain the list of field names - of a given scope. - """ - - field_names = set() - - for scope in _scopes: - for name in self.field_list_with_scope(scope): - field_names.add(name) - - return list(field_names) - - def get(self, field_name, default=None): - """ - Returns the value of the field named *field_name*, or *default* - when not found. - - See :class:`Event` note about how fields are retrieved by - name when multiple fields share the same name in different - scopes. - """ - - field = self._field(field_name) - - if field is None: - return default - - return field.value - - def items(self): - """ - Generates pairs of (field name, field value). - - This method iterates :meth:`keys` to find field names, which - means some fields could be unavailable if other fields share - their names in scopes with higher priorities. - """ - - for field in self.keys(): - yield (field, self[field]) - - def _field_with_scope(self, field_name, scope): - scope_ptr = nbt._bt_ctf_get_top_level_scope(self._e, scope) - - if scope_ptr is None: - return None - - definition_ptr = nbt._bt_ctf_get_field(self._e, scope_ptr, field_name) - - if definition_ptr is None: - return None - - field = _Definition(definition_ptr, scope) - - return field - - def _field(self, field_name): - field = None - - for scope in _scopes: - field = self._field_with_scope(field_name, scope) - - if field is not None: - break - - return field - - def _field_list_with_scope(self, scope): - fields = [] - scope_ptr = nbt._bt_ctf_get_top_level_scope(self._e, scope) - - # Returns a list [list_ptr, count]. If list_ptr is NULL, SWIG will only - # provide the "count" return value - count = 0 - list_ptr = None - ret = nbt._bt_python_field_listcaller(self._e, scope_ptr) - - if isinstance(ret, list): - list_ptr, count = ret - - for i in range(count): - definition_ptr = nbt._bt_python_field_one_from_list(list_ptr, i) - - if definition_ptr is not None: - definition = _Definition(definition_ptr, scope) - fields.append(definition) - - return fields - - -class FieldError(Exception): - """ - Field error, raised when a field's value cannot be accessed. - """ - - def __init__(self, value): - self.value = value - - def __str__(self): - return repr(self.value) - - -class EventDeclaration: - """ - An event declaration contains the properties of a class of events, - that is, the common properties and fields layout of all the actual - recorded events associated with this declaration. - - This class is not meant to be instantiated by the user. It is - returned by :attr:`TraceHandle.events`. - """ - - MAX_UINT64 = 0xFFFFFFFFFFFFFFFF - - def __init__(self): - raise NotImplementedError("EventDeclaration cannot be instantiated") - - @property - def name(self): - """ - Event's name, or ``None`` on error. - """ - - return nbt._bt_ctf_get_decl_event_name(self._ed) - - @property - def id(self): - """ - Event's numeric ID, or -1 on error. - """ - - id = nbt._bt_ctf_get_decl_event_id(self._ed) - - if id == self.MAX_UINT64: - id = -1 - - return id - - @property - def fields(self): - """ - Generates all the event's field declarations, going through - each scope in the following order: - - 1. Event fields (:attr:`CTFScope.EVENT_FIELDS`) - 2. Event context (:attr:`CTFScope.EVENT_CONTEXT`) - 3. Stream event context (:attr:`CTFScope.STREAM_EVENT_CONTEXT`) - 4. Event header (:attr:`CTFScope.STREAM_EVENT_HEADER`) - 5. Packet context (:attr:`CTFScope.STREAM_PACKET_CONTEXT`) - 6. Packet header (:attr:`CTFScope.TRACE_PACKET_HEADER`) - - All the generated field declarations inherit - :class:`FieldDeclaration`, and are among: - - * :class:`IntegerFieldDeclaration` - * :class:`FloatFieldDeclaration` - * :class:`EnumerationFieldDeclaration` - * :class:`StringFieldDeclaration` - * :class:`ArrayFieldDeclaration` - * :class:`SequenceFieldDeclaration` - * :class:`StructureFieldDeclaration` - * :class:`VariantFieldDeclaration` - """ - - for scope in _scopes: - for declaration in self.fields_scope(scope): - yield declaration - - def fields_scope(self, scope): - """ - Generates all the field declarations of the event's scope - *scope*. - - *scope* must be one of :class:`CTFScope` constants. - - All the generated field declarations inherit - :class:`FieldDeclaration`, and are among: - - * :class:`IntegerFieldDeclaration` - * :class:`FloatFieldDeclaration` - * :class:`EnumerationFieldDeclaration` - * :class:`StringFieldDeclaration` - * :class:`ArrayFieldDeclaration` - * :class:`SequenceFieldDeclaration` - * :class:`StructureFieldDeclaration` - * :class:`VariantFieldDeclaration` - """ - ret = nbt._by_python_field_decl_listcaller(self._ed, scope) - - if not isinstance(ret, list): - return - - list_ptr, count = ret - - for i in range(count): - field_decl_ptr = nbt._bt_python_field_decl_one_from_list(list_ptr, i) - - if field_decl_ptr is not None: - decl_ptr = nbt._bt_ctf_get_decl_from_field_decl(field_decl_ptr) - name = nbt._bt_ctf_get_decl_field_name(field_decl_ptr) - field_declaration = _create_field_declaration(decl_ptr, name, - scope) - yield field_declaration - - -class FieldDeclaration: - """ - Base class for concrete field declarations. - - This class is not meant to be instantiated by the user. - """ - - def __init__(self): - raise NotImplementedError("FieldDeclaration cannot be instantiated") - - def __repr__(self): - return "({0}) {1} {2}".format(CTFScope.scope_name(self.scope), - CTFTypeId.type_name(self.type), - self.name) - - @property - def name(self): - """ - Field's name, or ``None`` on error. - """ - - return self._name - - @property - def type(self): - """ - Field's type (one of :class:`CTFTypeId` constants). - """ - - return nbt._bt_ctf_field_type(self._fd) - - @property - def scope(self): - """ - Field's scope (one of :class:`CTFScope` constants). - """ - - return self._s - - -class IntegerFieldDeclaration(FieldDeclaration): - """ - Integer field declaration. - """ - - def __init__(self): - raise NotImplementedError("IntegerFieldDeclaration cannot be instantiated") - - @property - def signedness(self): - """ - 0 if this integer is unsigned, 1 if signed, or -1 on error. - """ - - return nbt._bt_ctf_get_int_signedness(self._fd) - - @property - def base(self): - """ - Integer's base (``int``), or a negative value on error. - """ - - return nbt._bt_ctf_get_int_base(self._fd) - - @property - def byte_order(self): - """ - Integer's byte order (one of :class:`ByteOrder` constants). - """ - - ret = nbt._bt_ctf_get_int_byte_order(self._fd) - - if ret == 1234: - return ByteOrder.BYTE_ORDER_LITTLE_ENDIAN - elif ret == 4321: - return ByteOrder.BYTE_ORDER_BIG_ENDIAN - else: - return ByteOrder.BYTE_ORDER_UNKNOWN - - @property - def length(self): - """ - Integer's length in bits, or a negative value on error. - """ - - return nbt._bt_ctf_get_int_len(self._fd) - - @property - def encoding(self): - """ - Integer's encoding (one of :class:`CTFStringEncoding` - constants). - """ - - return nbt._bt_ctf_get_encoding(self._fd) - - -class EnumerationFieldDeclaration(FieldDeclaration): - """ - Enumeration field declaration. - - .. note:: - - As of this version, this class is missing some properties. - """ - - def __init__(self): - raise NotImplementedError("EnumerationFieldDeclaration cannot be instantiated") - - -class ArrayFieldDeclaration(FieldDeclaration): - """ - Static array field declaration. - """ - - def __init__(self): - raise NotImplementedError("ArrayFieldDeclaration cannot be instantiated") - - @property - def length(self): - """ - Static array's fixed length (number of contained elements), or - a negative value on error. - """ - - return nbt._bt_ctf_get_array_len(self._fd) - - @property - def element_declaration(self): - """ - Underlying element's field declaration. - """ - - field_decl_ptr = nbt._bt_python_get_array_element_declaration(self._fd) - - return _create_field_declaration(field_decl_ptr, "", self.scope) - - -class SequenceFieldDeclaration(FieldDeclaration): - """ - Sequence (dynamic array) field declaration. - - .. note:: - - As of this version, this class is missing some properties. - """ - - def __init__(self): - raise NotImplementedError("SequenceFieldDeclaration cannot be instantiated") - - @property - def element_declaration(self): - """ - Underlying element's field declaration. - """ - - field_decl_ptr = nbt._bt_python_get_sequence_element_declaration(self._fd) - - return _create_field_declaration(field_decl_ptr, "", self.scope) - - -class FloatFieldDeclaration(FieldDeclaration): - """ - Floating point number field declaration. - - .. note:: - - As of this version, this class is missing some properties. - """ - - def __init__(self): - raise NotImplementedError("FloatFieldDeclaration cannot be instantiated") - - -class StructureFieldDeclaration(FieldDeclaration): - """ - Structure (ordered map of field names to field declarations) field - declaration. - - .. note:: - - As of this version, this class is missing some properties. - """ - - def __init__(self): - raise NotImplementedError("StructureFieldDeclaration cannot be instantiated") - - -class StringFieldDeclaration(FieldDeclaration): - """ - String (NULL-terminated array of bytes) field declaration. - - .. note:: - - As of this version, this class is missing some properties. - """ - - def __init__(self): - raise NotImplementedError("StringFieldDeclaration cannot be instantiated") - - -class VariantFieldDeclaration(FieldDeclaration): - """ - Variant (dynamic selection between different types) field declaration. - - .. note:: - - As of this version, this class is missing some properties. - """ - - def __init__(self): - raise NotImplementedError("VariantFieldDeclaration cannot be instantiated") - - -def field_error(): - """ - Return the last error code encountered while - accessing a field and reset the error flag. - Return 0 if no error, a negative value otherwise. - """ - - return nbt._bt_ctf_field_get_error() - - -def _create_field_declaration(declaration_ptr, name, scope): - """ - Private field declaration factory. - """ - - if declaration_ptr is None: - raise ValueError("declaration_ptr must be valid") - if scope not in _scopes: - raise ValueError("Invalid scope provided") - - type = nbt._bt_ctf_field_type(declaration_ptr) - declaration = None - - if type == CTFTypeId.INTEGER: - declaration = IntegerFieldDeclaration.__new__(IntegerFieldDeclaration) - elif type == CTFTypeId.ENUM: - declaration = EnumerationFieldDeclaration.__new__(EnumerationFieldDeclaration) - elif type == CTFTypeId.ARRAY: - declaration = ArrayFieldDeclaration.__new__(ArrayFieldDeclaration) - elif type == CTFTypeId.SEQUENCE: - declaration = SequenceFieldDeclaration.__new__(SequenceFieldDeclaration) - elif type == CTFTypeId.FLOAT: - declaration = FloatFieldDeclaration.__new__(FloatFieldDeclaration) - elif type == CTFTypeId.STRUCT: - declaration = StructureFieldDeclaration.__new__(StructureFieldDeclaration) - elif type == CTFTypeId.STRING: - declaration = StringFieldDeclaration.__new__(StringFieldDeclaration) - elif type == CTFTypeId.VARIANT: - declaration = VariantFieldDeclaration.__new__(VariantFieldDeclaration) - else: - return declaration - - declaration._fd = declaration_ptr - declaration._s = scope - declaration._name = name - - return declaration - - -class _Definition: - def __init__(self, definition_ptr, scope): - self._d = definition_ptr - self._s = scope - - if scope not in _scopes: - ValueError("Invalid scope provided") - - @property - def name(self): - """Return the name of a field or None on error.""" - - return nbt._bt_ctf_field_name(self._d) - - @property - def type(self): - """Return the type of a field or -1 if unknown.""" - - return nbt._bt_ctf_field_type(nbt._bt_ctf_get_decl_from_def(self._d)) - - @property - def declaration(self): - """Return the associated Definition object.""" - - return _create_field_declaration( - nbt._bt_ctf_get_decl_from_def(self._d), self.name, self.scope) - - def _get_enum_str(self): - """ - Return the string matching the current enumeration. - Return None on error. - """ - - return nbt._bt_ctf_get_enum_str(self._d) - - def _get_array_element_at(self, index): - """ - Return the array's element at position index. - Return None on error - """ - - array_ptr = nbt._bt_python_get_array_from_def(self._d) - - if array_ptr is None: - return None - - definition_ptr = nbt._bt_array_index(array_ptr, index) - - if definition_ptr is None: - return None - - return _Definition(definition_ptr, self.scope) - - def _get_sequence_len(self): - """ - Return the len of a sequence or a negative - value on error. - """ - - seq = nbt._bt_python_get_sequence_from_def(self._d) - - return nbt._bt_sequence_len(seq) - - def _get_sequence_element_at(self, index): - """ - Return the sequence's element at position index, - otherwise return None - """ - - seq = nbt._bt_python_get_sequence_from_def(self._d) - - if seq is not None: - definition_ptr = nbt._bt_sequence_index(seq, index) - - if definition_ptr is not None: - return _Definition(definition_ptr, self.scope) - - def _get_uint64(self): - """ - Return the value associated with the field. - If the field does not exist or is not of the type requested, - the value returned is undefined. To check if an error occured, - use the field_error() function after accessing a field. - """ - - return nbt._bt_ctf_get_uint64(self._d) - - def _get_int64(self): - """ - Return the value associated with the field. - If the field does not exist or is not of the type requested, - the value returned is undefined. To check if an error occured, - use the field_error() function after accessing a field. - """ - - return nbt._bt_ctf_get_int64(self._d) - - def _get_char_array(self): - """ - Return the value associated with the field. - If the field does not exist or is not of the type requested, - the value returned is undefined. To check if an error occurred, - use the field_error() function after accessing a field. - """ - - return nbt._bt_ctf_get_char_array(self._d) - - def _get_str(self): - """ - Return the value associated with the field. - If the field does not exist or is not of the type requested, - the value returned is undefined. To check if an error occurred, - use the field_error() function after accessing a field. - """ - - return nbt._bt_ctf_get_string(self._d) - - def _get_float(self): - """ - Return the value associated with the field. - If the field does not exist or is not of the type requested, - the value returned is undefined. To check if an error occurred, - use the field_error() function after accessing a field. - """ - - return nbt._bt_ctf_get_float(self._d) - - def _get_variant(self): - """ - Return the variant's selected field. - If the field does not exist or is not of the type requested, - the value returned is undefined. To check if an error occurred, - use the field_error() function after accessing a field. - """ - - return nbt._bt_ctf_get_variant(self._d) - - def _get_struct_field_count(self): - """ - Return the number of fields contained in the structure. - If the field does not exist or is not of the type requested, - the value returned is undefined. - """ - - return nbt._bt_ctf_get_struct_field_count(self._d) - - def _get_struct_field_at(self, i): - """ - Return the structure's field at position i. - If the field does not exist or is not of the type requested, - the value returned is undefined. To check if an error occurred, - use the field_error() function after accessing a field. - """ - - return nbt._bt_ctf_get_struct_field_index(self._d, i) - - @property - def value(self): - """ - Return the value associated with the field according to its type. - Return None on error. - """ - - id = self.type - value = None - - if id == CTFTypeId.STRING: - value = self._get_str() - elif id == CTFTypeId.ARRAY: - element_decl = self.declaration.element_declaration - - if ((element_decl.type == CTFTypeId.INTEGER - and element_decl.length == 8) - and (element_decl.encoding == CTFStringEncoding.ASCII or element_decl.encoding == CTFStringEncoding.UTF8)): - value = nbt._bt_python_get_array_string(self._d) - else: - value = [] - - for i in range(self.declaration.length): - element = self._get_array_element_at(i) - value.append(element.value) - elif id == CTFTypeId.INTEGER: - if self.declaration.signedness == 0: - value = self._get_uint64() - else: - value = self._get_int64() - elif id == CTFTypeId.ENUM: - value = self._get_enum_str() - elif id == CTFTypeId.SEQUENCE: - element_decl = self.declaration.element_declaration - - if ((element_decl.type == CTFTypeId.INTEGER - and element_decl.length == 8) - and (element_decl.encoding == CTFStringEncoding.ASCII or element_decl.encoding == CTFStringEncoding.UTF8)): - value = nbt._bt_python_get_sequence_string(self._d) - else: - seq_len = self._get_sequence_len() - value = [] - - for i in range(seq_len): - evDef = self._get_sequence_element_at(i) - value.append(evDef.value) - elif id == CTFTypeId.FLOAT: - value = self._get_float() - elif id == CTFTypeId.VARIANT: - variant = _Definition.__new__(_Definition) - variant._d = self._get_variant() - value = variant.value - elif id == CTFTypeId.STRUCT: - value = {} - - for i in range(self._get_struct_field_count()): - member = _Definition(self._get_struct_field_at(i), self.scope) - value[member.name] = member.value - - if field_error(): - raise FieldError( - "Error occurred while accessing field {} of type {}".format( - self.name, - CTFTypeId.type_name(id))) - - return value - - @property - def scope(self): - """Return the scope of a field or None on error.""" - - return self._s - - -class CTFWriter: - # Used to compare to -1ULL in error checks - _MAX_UINT64 = 0xFFFFFFFFFFFFFFFF - - class EnumerationMapping: - """ - Enumeration mapping class. start and end values are inclusive. - """ - - def __init__(self, name, start, end): - self.name = name - self.start = start - self.end = end - - class Clock: - def __init__(self, name): - self._c = nbt._bt_ctf_clock_create(name) - - if self._c is None: - raise ValueError("Invalid clock name.") - - def __del__(self): - nbt._bt_ctf_clock_put(self._c) - - @property - def name(self): - """ - Get the clock's name. - """ - - name = nbt._bt_ctf_clock_get_name(self._c) - - if name is None: - raise ValueError("Invalid clock instance.") - - return name - - @property - def description(self): - """ - Get the clock's description. None if unset. - """ - - return nbt._bt_ctf_clock_get_description(self._c) - - @description.setter - def description(self, desc): - """ - Set the clock's description. The description appears in the clock's TSDL - meta-data. - """ - - ret = nbt._bt_ctf_clock_set_description(self._c, str(desc)) - - if ret < 0: - raise ValueError("Invalid clock description.") - - @property - def frequency(self): - """ - Get the clock's frequency (Hz). - """ - - freq = nbt._bt_ctf_clock_get_frequency(self._c) - - if freq == CTFWriter._MAX_UINT64: - raise ValueError("Invalid clock instance") - - return freq - - @frequency.setter - def frequency(self, freq): - """ - Set the clock's frequency (Hz). - """ - - ret = nbt._bt_ctf_clock_set_frequency(self._c, freq) - - if ret < 0: - raise ValueError("Invalid frequency value.") - - @property - def precision(self): - """ - Get the clock's precision (in clock ticks). - """ - - precision = nbt._bt_ctf_clock_get_precision(self._c) - - if precision == CTFWriter._MAX_UINT64: - raise ValueError("Invalid clock instance") - - return precision - - @precision.setter - def precision(self, precision): - """ - Set the clock's precision (in clock ticks). - """ - - ret = nbt._bt_ctf_clock_set_precision(self._c, precision) - - @property - def offset_seconds(self): - """ - Get the clock's offset in seconds from POSIX.1 Epoch. - """ - - offset_s = nbt._bt_ctf_clock_get_offset_s(self._c) - - if offset_s == CTFWriter._MAX_UINT64: - raise ValueError("Invalid clock instance") - - return offset_s - - @offset_seconds.setter - def offset_seconds(self, offset_s): - """ - Set the clock's offset in seconds from POSIX.1 Epoch. - """ - - ret = nbt._bt_ctf_clock_set_offset_s(self._c, offset_s) - - if ret < 0: - raise ValueError("Invalid offset value.") - - @property - def offset(self): - """ - Get the clock's offset in ticks from POSIX.1 Epoch + offset in seconds. - """ - - offset = nbt._bt_ctf_clock_get_offset(self._c) - - if offset == CTFWriter._MAX_UINT64: - raise ValueError("Invalid clock instance") - - return offset - - @offset.setter - def offset(self, offset): - """ - Set the clock's offset in ticks from POSIX.1 Epoch + offset in seconds. - """ - - ret = nbt._bt_ctf_clock_set_offset(self._c, offset) - - if ret < 0: - raise ValueError("Invalid offset value.") - - @property - def absolute(self): - """ - Get a clock's absolute attribute. A clock is absolute if the clock - is a global reference across the trace's other clocks. - """ - - is_absolute = nbt._bt_ctf_clock_get_is_absolute(self._c) - - if is_absolute == -1: - raise ValueError("Invalid clock instance") - - return False if is_absolute == 0 else True - - @absolute.setter - def absolute(self, is_absolute): - """ - Set a clock's absolute attribute. A clock is absolute if the clock - is a global reference across the trace's other clocks. - """ - - ret = nbt._bt_ctf_clock_set_is_absolute(self._c, int(is_absolute)) - - if ret < 0: - raise ValueError("Could not set the clock's absolute attribute.") - - @property - def uuid(self): - """ - Get a clock's UUID (an object of type UUID). - """ - - uuid_list = [] - - for i in range(16): - ret, value = nbt._bt_python_ctf_clock_get_uuid_index(self._c, i) - - if ret < 0: - raise ValueError("Invalid clock instance") - - uuid_list.append(value) - - return UUID(bytes=bytes(uuid_list)) - - @uuid.setter - def uuid(self, uuid): - """ - Set a clock's UUID (an object of type UUID). - """ - - uuid_bytes = uuid.bytes - - if len(uuid_bytes) != 16: - raise ValueError("Invalid UUID provided. UUID length must be 16 bytes") - - for i in range(len(uuid_bytes)): - ret = nbt._bt_python_ctf_clock_set_uuid_index(self._c, i, - uuid_bytes[i]) - - if ret < 0: - raise ValueError("Invalid clock instance") - - @property - def time(self): - """ - Get the current time in nanoseconds since the clock's origin (offset and - offset_s attributes). - """ - - time = nbt._bt_ctf_clock_get_time(self._c) - - if time == CTFWriter._MAX_UINT64: - raise ValueError("Invalid clock instance") - - return time - - @time.setter - def time(self, time): - """ - Set the current time in nanoseconds since the clock's origin (offset and - offset_s attributes). The clock's value will be sampled as events are - appended to a stream. - """ - - ret = nbt._bt_ctf_clock_set_time(self._c, time) - - if ret < 0: - raise ValueError("Invalid time value.") - - class FieldDeclaration: - """ - FieldDeclaration should not be instantiated directly. Instantiate - one of the concrete FieldDeclaration classes. - """ - - class IntegerBase: - # These values are based on the bt_ctf_integer_base enum - # declared in event-types.h. - INTEGER_BASE_UNKNOWN = -1 - INTEGER_BASE_BINARY = 2 - INTEGER_BASE_OCTAL = 8 - INTEGER_BASE_DECIMAL = 10 - INTEGER_BASE_HEXADECIMAL = 16 - - def __init__(self): - if self._ft is None: - raise ValueError("FieldDeclaration creation failed.") - - def __del__(self): - nbt._bt_ctf_field_type_put(self._ft) - - @staticmethod - def _create_field_declaration_from_native_instance( - native_field_declaration): - type_dict = { - CTFTypeId.INTEGER: CTFWriter.IntegerFieldDeclaration, - CTFTypeId.FLOAT: CTFWriter.FloatFieldDeclaration, - CTFTypeId.ENUM: CTFWriter.EnumerationFieldDeclaration, - CTFTypeId.STRING: CTFWriter.StringFieldDeclaration, - CTFTypeId.STRUCT: CTFWriter.StructureFieldDeclaration, - CTFTypeId.VARIANT: CTFWriter.VariantFieldDeclaration, - CTFTypeId.ARRAY: CTFWriter.ArrayFieldDeclaration, - CTFTypeId.SEQUENCE: CTFWriter.SequenceFieldDeclaration - } - - field_type_id = nbt._bt_ctf_field_type_get_type_id(native_field_declaration) - - if field_type_id == CTFTypeId.UNKNOWN: - raise TypeError("Invalid field instance") - - declaration = CTFWriter.Field.__new__(CTFWriter.Field) - declaration._ft = native_field_declaration - declaration.__class__ = type_dict[field_type_id] - - return declaration - - @property - def alignment(self): - """ - Get the field declaration's alignment. Returns -1 on error. - """ - - return nbt._bt_ctf_field_type_get_alignment(self._ft) - - @alignment.setter - def alignment(self, alignment): - """ - Set the field declaration's alignment. Defaults to 1 (bit-aligned). However, - some types, such as structures and string, may impose other alignment - constraints. - """ - - ret = nbt._bt_ctf_field_type_set_alignment(self._ft, alignment) - - if ret < 0: - raise ValueError("Invalid alignment value.") - - @property - def byte_order(self): - """ - Get the field declaration's byte order. One of the ByteOrder's constant. - """ - - return nbt._bt_ctf_field_type_get_byte_order(self._ft) - - @byte_order.setter - def byte_order(self, byte_order): - """ - Set the field declaration's byte order. Use constants defined in the ByteOrder - class. - """ - - ret = nbt._bt_ctf_field_type_set_byte_order(self._ft, byte_order) - - if ret < 0: - raise ValueError("Could not set byte order value.") - - class IntegerFieldDeclaration(FieldDeclaration): - def __init__(self, size): - """ - Create a new integer field declaration of the given size. - """ - self._ft = nbt._bt_ctf_field_type_integer_create(size) - super().__init__() - - @property - def size(self): - """ - Get an integer's size. - """ - - ret = nbt._bt_ctf_field_type_integer_get_size(self._ft) - - if ret < 0: - raise ValueError("Could not get Integer's size attribute.") - else: - return ret - - @property - def signed(self): - """ - Get an integer's signedness attribute. - """ - - ret = nbt._bt_ctf_field_type_integer_get_signed(self._ft) - - if ret < 0: - raise ValueError("Could not get Integer's signed attribute.") - elif ret > 0: - return True - else: - return False - - @signed.setter - def signed(self, signed): - """ - Set an integer's signedness attribute. - """ - - ret = nbt._bt_ctf_field_type_integer_set_signed(self._ft, signed) - - if ret < 0: - raise ValueError("Could not set Integer's signed attribute.") - - @property - def base(self): - """ - Get the integer's base used to pretty-print the resulting trace. - Returns a constant from the FieldDeclaration.IntegerBase class. - """ - - return nbt._bt_ctf_field_type_integer_get_base(self._ft) - - @base.setter - def base(self, base): - """ - Set the integer's base used to pretty-print the resulting trace. - The base must be a constant of the FieldDeclarationIntegerBase class. - """ - - ret = nbt._bt_ctf_field_type_integer_set_base(self._ft, base) - - if ret < 0: - raise ValueError("Could not set Integer's base.") - - @property - def encoding(self): - """ - Get the integer's encoding (one of the constants of the - CTFStringEncoding class). - Returns a constant from the CTFStringEncoding class. - """ - - return nbt._bt_ctf_field_type_integer_get_encoding(self._ft) - - @encoding.setter - def encoding(self, encoding): - """ - An integer encoding may be set to signal that the integer must be printed - as a text character. Must be a constant from the CTFStringEncoding class. - """ - - ret = nbt._bt_ctf_field_type_integer_set_encoding(self._ft, encoding) - - if ret < 0: - raise ValueError("Could not set Integer's encoding.") - - class EnumerationFieldDeclaration(FieldDeclaration): - def __init__(self, integer_type): - """ - Create a new enumeration field declaration with the given underlying container type. - """ - isinst = isinstance(integer_type, CTFWriter.IntegerFieldDeclaration) - - if integer_type is None or not isinst: - raise TypeError("Invalid integer container.") - - self._ft = nbt._bt_ctf_field_type_enumeration_create(integer_type._ft) - super().__init__() - - @property - def container(self): - """ - Get the enumeration's underlying container type. - """ - - ret = nbt._bt_ctf_field_type_enumeration_get_container_type(self._ft) - - if ret is None: - raise TypeError("Invalid enumeration declaration") - - return CTFWriter.FieldDeclaration._create_field_declaration_from_native_instance(ret) - - def add_mapping(self, name, range_start, range_end): - """ - Add a mapping to the enumeration. The range's values are inclusive. - """ - - if range_start < 0 or range_end < 0: - ret = nbt._bt_ctf_field_type_enumeration_add_mapping(self._ft, - str(name), - range_start, - range_end) - else: - ret = nbt._bt_ctf_field_type_enumeration_add_mapping_unsigned(self._ft, - str(name), - range_start, - range_end) - - if ret < 0: - raise ValueError("Could not add mapping to enumeration declaration.") - - @property - def mappings(self): - """ - Generator returning instances of EnumerationMapping. - """ - - signed = self.container.signed - - count = nbt._bt_ctf_field_type_enumeration_get_mapping_count(self._ft) - - for i in range(count): - if signed: - ret = nbt._bt_python_ctf_field_type_enumeration_get_mapping(self._ft, i) - else: - ret = nbt._bt_python_ctf_field_type_enumeration_get_mapping_unsigned(self._ft, i) - - if len(ret) != 3: - msg = "Could not get Enumeration mapping at index {}".format(i) - raise TypeError(msg) - - name, range_start, range_end = ret - yield CTFWriter.EnumerationMapping(name, range_start, range_end) - - def get_mapping_by_name(self, name): - """ - Get a mapping by name (EnumerationMapping). - """ - - index = nbt._bt_ctf_field_type_enumeration_get_mapping_index_by_name(self._ft, name) - - if index < 0: - return None - - if self.container.signed: - ret = nbt._bt_python_ctf_field_type_enumeration_get_mapping(self._ft, index) - else: - ret = nbt._bt_python_ctf_field_type_enumeration_get_mapping_unsigned(self._ft, index) - - if len(ret) != 3: - msg = "Could not get Enumeration mapping at index {}".format(i) - raise TypeError(msg) - - name, range_start, range_end = ret - - return CTFWriter.EnumerationMapping(name, range_start, range_end) - - def get_mapping_by_value(self, value): - """ - Get a mapping by value (EnumerationMapping). - """ - - if value < 0: - index = nbt._bt_ctf_field_type_enumeration_get_mapping_index_by_value(self._ft, value) - else: - index = nbt._bt_ctf_field_type_enumeration_get_mapping_index_by_unsigned_value(self._ft, value) - - if index < 0: - return None - - if self.container.signed: - ret = nbt._bt_python_ctf_field_type_enumeration_get_mapping(self._ft, index) - else: - ret = nbt._bt_python_ctf_field_type_enumeration_get_mapping_unsigned(self._ft, index) - - if len(ret) != 3: - msg = "Could not get Enumeration mapping at index {}".format(i) - raise TypeError(msg) - - name, range_start, range_end = ret - - return CTFWriter.EnumerationMapping(name, range_start, range_end) - - class FloatFieldDeclaration(FieldDeclaration): - FLT_EXP_DIG = 8 - DBL_EXP_DIG = 11 - FLT_MANT_DIG = 24 - DBL_MANT_DIG = 53 - - def __init__(self): - """ - Create a new floating point field declaration. - """ - - self._ft = nbt._bt_ctf_field_type_floating_point_create() - super().__init__() - - @property - def exponent_digits(self): - """ - Get the number of exponent digits used to store the floating point field. - """ - - ret = nbt._bt_ctf_field_type_floating_point_get_exponent_digits(self._ft) - - if ret < 0: - raise TypeError( - "Could not get Floating point exponent digit count") - - return ret - - @exponent_digits.setter - def exponent_digits(self, exponent_digits): - """ - Set the number of exponent digits to use to store the floating point field. - The only values currently supported are FLT_EXP_DIG and DBL_EXP_DIG which - are defined as constants of this class. - """ - - ret = nbt._bt_ctf_field_type_floating_point_set_exponent_digits(self._ft, - exponent_digits) - - if ret < 0: - raise ValueError("Could not set exponent digit count.") - - @property - def mantissa_digits(self): - """ - Get the number of mantissa digits used to store the floating point field. - """ - - ret = nbt._bt_ctf_field_type_floating_point_get_mantissa_digits(self._ft) - - if ret < 0: - raise TypeError("Could not get Floating point mantissa digit count") - - return ret - - @mantissa_digits.setter - def mantissa_digits(self, mantissa_digits): - """ - Set the number of mantissa digits to use to store the floating point field. - The only values currently supported are FLT_MANT_DIG and DBL_MANT_DIG which - are defined as constants of this class. - """ - - ret = nbt._bt_ctf_field_type_floating_point_set_mantissa_digits(self._ft, - mantissa_digits) - - if ret < 0: - raise ValueError("Could not set mantissa digit count.") - - class FloatingPointFieldDeclaration(FloatFieldDeclaration): - pass - - class StructureFieldDeclaration(FieldDeclaration): - def __init__(self): - """ - Create a new structure field declaration. - """ - - self._ft = nbt._bt_ctf_field_type_structure_create() - super().__init__() - - def add_field(self, field_type, field_name): - """ - Add a field of type "field_type" to the structure. - """ - - ret = nbt._bt_ctf_field_type_structure_add_field(self._ft, - field_type._ft, - str(field_name)) - - if ret < 0: - raise ValueError("Could not add field to structure.") - - @property - def fields(self): - """ - Generator returning the structure's field as tuples of (field name, field declaration). - """ - - count = nbt._bt_ctf_field_type_structure_get_field_count(self._ft) - - if count < 0: - raise TypeError("Could not get Structure field count") - - for i in range(count): - field_name = nbt._bt_python_ctf_field_type_structure_get_field_name(self._ft, i) - - if field_name is None: - msg = "Could not get Structure field name at index {}".format(i) - raise TypeError(msg) - - field_type_native = nbt._bt_python_ctf_field_type_structure_get_field_type(self._ft, i) - - if field_type_native is None: - msg = "Could not get Structure field type at index {}".format(i) - raise TypeError(msg) - - field_type = CTFWriter.FieldDeclaration._create_field_declaration_from_native_instance(field_type_native) - yield (field_name, field_type) - - def get_field_by_name(self, name): - """ - Get a field declaration by name (FieldDeclaration). - """ - - field_type_native = nbt._bt_ctf_field_type_structure_get_field_type_by_name(self._ft, name) - - if field_type_native is None: - msg = "Could not find Structure field with name {}".format(name) - raise TypeError(msg) - - return CTFWriter.FieldDeclaration._create_field_declaration_from_native_instance(field_type_native) - - class VariantFieldDeclaration(FieldDeclaration): - def __init__(self, enum_tag, tag_name): - """ - Create a new variant field declaration. - """ - - isinst = isinstance(enum_tag, CTFWriter.EnumerationFieldDeclaration) - if enum_tag is None or not isinst: - raise TypeError("Invalid tag type; must be of type EnumerationFieldDeclaration.") - - self._ft = nbt._bt_ctf_field_type_variant_create(enum_tag._ft, - str(tag_name)) - super().__init__() - - @property - def tag_name(self): - """ - Get the variant's tag name. - """ - - ret = nbt._bt_ctf_field_type_variant_get_tag_name(self._ft) - - if ret is None: - raise TypeError("Could not get Variant tag name") - - return ret - - @property - def tag_type(self): - """ - Get the variant's tag type. - """ - - ret = nbt._bt_ctf_field_type_variant_get_tag_type(self._ft) - - if ret is None: - raise TypeError("Could not get Variant tag type") - - return CTFWriter.FieldDeclaration._create_field_declaration_from_native_instance(ret) - - def add_field(self, field_type, field_name): - """ - Add a field of type "field_type" to the variant. - """ - - ret = nbt._bt_ctf_field_type_variant_add_field(self._ft, - field_type._ft, - str(field_name)) - - if ret < 0: - raise ValueError("Could not add field to variant.") - - @property - def fields(self): - """ - Generator returning the variant's field as tuples of (field name, field declaration). - """ - - count = nbt._bt_ctf_field_type_variant_get_field_count(self._ft) - - if count < 0: - raise TypeError("Could not get Variant field count") - - for i in range(count): - field_name = nbt._bt_python_ctf_field_type_variant_get_field_name(self._ft, i) - - if field_name is None: - msg = "Could not get Variant field name at index {}".format(i) - raise TypeError(msg) - - field_type_native = nbt._bt_python_ctf_field_type_variant_get_field_type(self._ft, i) - - if field_type_native is None: - msg = "Could not get Variant field type at index {}".format(i) - raise TypeError(msg) - - field_type = CTFWriter.FieldDeclaration._create_field_declaration_from_native_instance(field_type_native) - yield (field_name, field_type) - - def get_field_by_name(self, name): - """ - Get a field declaration by name (FieldDeclaration). - """ - - field_type_native = nbt._bt_ctf_field_type_variant_get_field_type_by_name(self._ft, - name) - - if field_type_native is None: - msg = "Could not find Variant field with name {}".format(name) - raise TypeError(msg) - - return CTFWriter.FieldDeclaration._create_field_declaration_from_native_instance(field_type_native) - - def get_field_from_tag(self, tag): - """ - Get a field declaration from tag (EnumerationField). - """ - - field_type_native = nbt._bt_ctf_field_type_variant_get_field_type_from_tag(self._ft, tag._f) - - if field_type_native is None: - msg = "Could not find Variant field with tag value {}".format(tag.value) - raise TypeError(msg) - - return CTFWriter.FieldDeclaration._create_field_declaration_from_native_instance(field_type_native) - - class ArrayFieldDeclaration(FieldDeclaration): - def __init__(self, element_type, length): - """ - Create a new array field declaration. - """ - - self._ft = nbt._bt_ctf_field_type_array_create(element_type._ft, - length) - super().__init__() - - @property - def element_type(self): - """ - Get the array's element type. - """ - - ret = nbt._bt_ctf_field_type_array_get_element_type(self._ft) - - if ret is None: - raise TypeError("Could not get Array element type") - - return CTFWriter.FieldDeclaration._create_field_declaration_from_native_instance(ret) - - @property - def length(self): - """ - Get the array's length. - """ - - ret = nbt._bt_ctf_field_type_array_get_length(self._ft) - - if ret < 0: - raise TypeError("Could not get Array length") - - return ret - - class SequenceFieldDeclaration(FieldDeclaration): - def __init__(self, element_type, length_field_name): - """ - Create a new sequence field declaration. - """ - - self._ft = nbt._bt_ctf_field_type_sequence_create(element_type._ft, - str(length_field_name)) - super().__init__() - - @property - def element_type(self): - """ - Get the sequence's element type. - """ - - ret = nbt._bt_ctf_field_type_sequence_get_element_type(self._ft) - - if ret is None: - raise TypeError("Could not get Sequence element type") - - return CTFWriter.FieldDeclaration._create_field_declaration_from_native_instance(ret) - - @property - def length_field_name(self): - """ - Get the sequence's length field name. - """ - - ret = nbt._bt_ctf_field_type_sequence_get_length_field_name(self._ft) - - if ret is None: - raise TypeError("Could not get Sequence length field name") - - return ret - - class StringFieldDeclaration(FieldDeclaration): - def __init__(self): - """ - Create a new string field declaration. - """ - - self._ft = nbt._bt_ctf_field_type_string_create() - super().__init__() - - @property - def encoding(self): - """ - Get a string declaration's encoding (a constant from the CTFStringEncoding class). - """ - - return nbt._bt_ctf_field_type_string_get_encoding(self._ft) - - @encoding.setter - def encoding(self, encoding): - """ - Set a string declaration's encoding. Must be a constant from the CTFStringEncoding class. - """ - - ret = nbt._bt_ctf_field_type_string_set_encoding(self._ft, encoding) - if ret < 0: - raise ValueError("Could not set string encoding.") - - @staticmethod - def create_field(field_type): - """ - Create an instance of a field. - """ - isinst = isinstance(field_type, CTFWriter.FieldDeclaration) - - if field_type is None or not isinst: - raise TypeError("Invalid field_type. Type must be a FieldDeclaration-derived class.") - - if isinstance(field_type, CTFWriter.IntegerFieldDeclaration): - return CTFWriter.IntegerField(field_type) - elif isinstance(field_type, CTFWriter.EnumerationFieldDeclaration): - return CTFWriter.EnumerationField(field_type) - elif isinstance(field_type, CTFWriter.FloatFieldDeclaration): - return CTFWriter.FloatingPointField(field_type) - elif isinstance(field_type, CTFWriter.StructureFieldDeclaration): - return CTFWriter.StructureField(field_type) - elif isinstance(field_type, CTFWriter.VariantFieldDeclaration): - return CTFWriter.VariantField(field_type) - elif isinstance(field_type, CTFWriter.ArrayFieldDeclaration): - return CTFWriter.ArrayField(field_type) - elif isinstance(field_type, CTFWriter.SequenceFieldDeclaration): - return CTFWriter.SequenceField(field_type) - elif isinstance(field_type, CTFWriter.StringFieldDeclaration): - return CTFWriter.StringField(field_type) - - class Field: - """ - Base class, do not instantiate. - """ - - def __init__(self, field_type): - if not isinstance(field_type, CTFWriter.FieldDeclaration): - raise TypeError("Invalid field_type argument.") - - self._f = nbt._bt_ctf_field_create(field_type._ft) - - if self._f is None: - raise ValueError("Field creation failed.") - - def __del__(self): - nbt._bt_ctf_field_put(self._f) - - @staticmethod - def _create_field_from_native_instance(native_field_instance): - type_dict = { - CTFTypeId.INTEGER: CTFWriter.IntegerField, - CTFTypeId.FLOAT: CTFWriter.FloatingPointField, - CTFTypeId.ENUM: CTFWriter.EnumerationField, - CTFTypeId.STRING: CTFWriter.StringField, - CTFTypeId.STRUCT: CTFWriter.StructureField, - CTFTypeId.VARIANT: CTFWriter.VariantField, - CTFTypeId.ARRAY: CTFWriter.ArrayField, - CTFTypeId.SEQUENCE: CTFWriter.SequenceField - } - - field_type = nbt._bt_python_get_field_type(native_field_instance) - - if field_type == CTFTypeId.UNKNOWN: - raise TypeError("Invalid field instance") - - field = CTFWriter.Field.__new__(CTFWriter.Field) - field._f = native_field_instance - field.__class__ = type_dict[field_type] - - return field - - @property - def declaration(self): - native_field_type = nbt._bt_ctf_field_get_type(self._f) - - if native_field_type is None: - raise TypeError("Invalid field instance") - return CTFWriter.FieldDeclaration._create_field_declaration_from_native_instance( - native_field_type) - - class IntegerField(Field): - @property - def value(self): - """ - Get an integer field's value. - """ - - signedness = nbt._bt_python_field_integer_get_signedness(self._f) - - if signedness < 0: - raise TypeError("Invalid integer instance.") - - if signedness == 0: - ret, value = nbt._bt_ctf_field_unsigned_integer_get_value(self._f) - else: - ret, value = nbt._bt_ctf_field_signed_integer_get_value(self._f) - - if ret < 0: - raise ValueError("Could not get integer field value.") - - return value - - @value.setter - def value(self, value): - """ - Set an integer field's value. - """ - - if not isinstance(value, int): - raise TypeError("IntegerField's value must be an int") - - signedness = nbt._bt_python_field_integer_get_signedness(self._f) - if signedness < 0: - raise TypeError("Invalid integer instance.") - - if signedness == 0: - ret = nbt._bt_ctf_field_unsigned_integer_set_value(self._f, value) - else: - ret = nbt._bt_ctf_field_signed_integer_set_value(self._f, value) - - if ret < 0: - raise ValueError("Could not set integer field value.") - - class EnumerationField(Field): - @property - def container(self): - """ - Return the enumeration's underlying container field (an integer field). - """ - - container = CTFWriter.IntegerField.__new__(CTFWriter.IntegerField) - container._f = nbt._bt_ctf_field_enumeration_get_container(self._f) - - if container._f is None: - raise TypeError("Invalid enumeration field type.") - - return container - - @property - def value(self): - """ - Get the enumeration field's mapping name. - """ - - value = nbt._bt_ctf_field_enumeration_get_mapping_name(self._f) - - if value is None: - raise ValueError("Could not get enumeration's mapping name.") - - return value - - @value.setter - def value(self, value): - """ - Set the enumeration field's value. Must be an integer as mapping names - may be ambiguous. - """ - - if not isinstance(value, int): - raise TypeError("EnumerationField value must be an int") - - self.container.value = value - - class FloatingPointField(Field): - @property - def value(self): - """ - Get a floating point field's value. - """ - - ret, value = nbt._bt_ctf_field_floating_point_get_value(self._f) - - if ret < 0: - raise ValueError("Could not get floating point field value.") - - return value - - @value.setter - def value(self, value): - """ - Set a floating point field's value. - """ - - if not isinstance(value, int) and not isinstance(value, float): - raise TypeError("Value must be either a float or an int") - - ret = nbt._bt_ctf_field_floating_point_set_value(self._f, float(value)) - - if ret < 0: - raise ValueError("Could not set floating point field value.") - - # oops!! This class is provided to ensure backward-compatibility since - # a stable release publicly exposed this abomination. - class FloatFieldingPoint(FloatingPointField): - pass - - class StructureField(Field): - def field(self, field_name): - """ - Get the structure's field corresponding to the provided field name. - """ - - native_instance = nbt._bt_ctf_field_structure_get_field(self._f, - str(field_name)) - - if native_instance is None: - raise ValueError("Invalid field_name provided.") - - return CTFWriter.Field._create_field_from_native_instance(native_instance) - - class VariantField(Field): - def field(self, tag): - """ - Return the variant's selected field. The "tag" field is the selector enum field. - """ - - native_instance = nbt._bt_ctf_field_variant_get_field(self._f, tag._f) - - if native_instance is None: - raise ValueError("Invalid tag provided.") - - return CTFWriter.Field._create_field_from_native_instance(native_instance) - - class ArrayField(Field): - def field(self, index): - """ - Return the array's field at position "index". - """ - - native_instance = nbt._bt_ctf_field_array_get_field(self._f, index) - - if native_instance is None: - raise IndexError("Invalid index provided.") - - return CTFWriter.Field._create_field_from_native_instance(native_instance) - - class SequenceField(Field): - @property - def length(self): - """ - Get the sequence's length field (IntegerField). - """ - - native_instance = nbt._bt_ctf_field_sequence_get_length(self._f) - - if native_instance is None: - length = -1 - - return CTFWriter.Field._create_field_from_native_instance(native_instance) - - @length.setter - def length(self, length_field): - """ - Set the sequence's length field (IntegerField). - """ - - if not isinstance(length_field, CTFWriter.IntegerField): - raise TypeError("Invalid length field.") - - if length_field.declaration.signed: - raise TypeError("Sequence field length must be unsigned") - - ret = nbt._bt_ctf_field_sequence_set_length(self._f, length_field._f) - - if ret < 0: - raise ValueError("Could not set sequence length.") - - def field(self, index): - """ - Return the sequence's field at position "index". - """ - - native_instance = nbt._bt_ctf_field_sequence_get_field(self._f, index) - - if native_instance is None: - raise ValueError("Could not get sequence element at index.") - - return CTFWriter.Field._create_field_from_native_instance(native_instance) - - class StringField(Field): - @property - def value(self): - """ - Get a string field's value. - """ - - return nbt._bt_ctf_field_string_get_value(self._f) - - @value.setter - def value(self, value): - """ - Set a string field's value. - """ - - ret = nbt._bt_ctf_field_string_set_value(self._f, str(value)) - - if ret < 0: - raise ValueError("Could not set string field value.") - - class EventClass: - def __init__(self, name): - """ - Create a new event class of the given name. - """ - - self._ec = nbt._bt_ctf_event_class_create(name) - - if self._ec is None: - raise ValueError("Event class creation failed.") - - def __del__(self): - nbt._bt_ctf_event_class_put(self._ec) - - def add_field(self, field_type, field_name): - """ - Add a field of type "field_type" to the event class. - """ - - ret = nbt._bt_ctf_event_class_add_field(self._ec, field_type._ft, - str(field_name)) - - if ret < 0: - raise ValueError("Could not add field to event class.") - - @property - def name(self): - """ - Get the event class' name. - """ - - name = nbt._bt_ctf_event_class_get_name(self._ec) - - if name is None: - raise TypeError("Could not get EventClass name") - - return name - - @property - def id(self): - """ - Get the event class' id. Returns a negative value if unset. - """ - - id = nbt._bt_ctf_event_class_get_id(self._ec) - - if id < 0: - raise TypeError("Could not get EventClass id") - - return id - - @id.setter - def id(self, id): - """ - Set the event class' id. Throws a TypeError if the event class - is already registered to a stream class. - """ - - ret = nbt._bt_ctf_event_class_set_id(self._ec, id) - - if ret < 0: - raise TypeError("Can't change an Event Class's id after it has been assigned to a stream class") - - @property - def stream_class(self): - """ - Get the event class' stream class. Returns None if unset. - """ - stream_class_native = nbt._bt_ctf_event_class_get_stream_class(self._ec) - - if stream_class_native is None: - return None - - stream_class = CTFWriter.StreamClass.__new__(CTFWriter.StreamClass) - stream_class._sc = stream_class_native - - return stream_class - - @property - def fields(self): - """ - Generator returning the event class' fields as tuples of (field name, field declaration). - """ - - count = nbt._bt_ctf_event_class_get_field_count(self._ec) - - if count < 0: - raise TypeError("Could not get EventClass' field count") - - for i in range(count): - field_name = nbt._bt_python_ctf_event_class_get_field_name(self._ec, i) - - if field_name is None: - msg = "Could not get EventClass' field name at index {}".format(i) - raise TypeError(msg) - - field_type_native = nbt._bt_python_ctf_event_class_get_field_type(self._ec, i) - - if field_type_native is None: - msg = "Could not get EventClass' field type at index {}".format(i) - raise TypeError(msg) - - field_type = CTFWriter.FieldDeclaration._create_field_declaration_from_native_instance(field_type_native) - yield (field_name, field_type) - - def get_field_by_name(self, name): - """ - Get a field declaration by name (FieldDeclaration). - """ - - field_type_native = nbt._bt_ctf_event_class_get_field_by_name(self._ec, name) - - if field_type_native is None: - msg = "Could not find EventClass field with name {}".format(name) - raise TypeError(msg) - - return CTFWriter.FieldDeclaration._create_field_declaration_from_native_instance(field_type_native) - - class Event: - def __init__(self, event_class): - """ - Create a new event of the given event class. - """ - - if not isinstance(event_class, CTFWriter.EventClass): - raise TypeError("Invalid event_class argument.") - - self._e = nbt._bt_ctf_event_create(event_class._ec) - - if self._e is None: - raise ValueError("Event creation failed.") - - def __del__(self): - nbt._bt_ctf_event_put(self._e) - - @property - def event_class(self): - """ - Get the event's class. - """ - - event_class_native = nbt._bt_ctf_event_get_class(self._e) - - if event_class_native is None: - return None - - event_class = CTFWriter.EventClass.__new__(CTFWriter.EventClass) - event_class._ec = event_class_native - - return event_class - - def clock(self): - """ - Get a clock from event. Returns None if the event's class - is not registered to a stream class. - """ - - clock_instance = nbt._bt_ctf_event_get_clock(self._e) - - if clock_instance is None: - return None - - clock = CTFWriter.Clock.__new__(CTFWriter.Clock) - clock._c = clock_instance - - return clock - - def payload(self, field_name): - """ - Get a field from event. - """ - - native_instance = nbt._bt_ctf_event_get_payload(self._e, - str(field_name)) - - if native_instance is None: - raise ValueError("Could not get event payload.") - - return CTFWriter.Field._create_field_from_native_instance(native_instance) - - def set_payload(self, field_name, value_field): - """ - Set a manually created field as an event's payload. - """ - - if not isinstance(value, CTFWriter.Field): - raise TypeError("Invalid value type.") - - ret = nbt._bt_ctf_event_set_payload(self._e, str(field_name), - value_field._f) - - if ret < 0: - raise ValueError("Could not set event field payload.") - - class StreamClass: - def __init__(self, name): - """ - Create a new stream class of the given name. - """ - - self._sc = nbt._bt_ctf_stream_class_create(name) - - if self._sc is None: - raise ValueError("Stream class creation failed.") - - def __del__(self): - nbt._bt_ctf_stream_class_put(self._sc) - - @property - def name(self): - """ - Get a stream class' name. - """ - - name = nbt._bt_ctf_stream_class_get_name(self._sc) - - if name is None: - raise TypeError("Could not get StreamClass name") - - return name - - @property - def clock(self): - """ - Get a stream class' clock. - """ - - clock_instance = nbt._bt_ctf_stream_class_get_clock(self._sc) - - if clock_instance is None: - return None - - clock = CTFWriter.Clock.__new__(CTFWriter.Clock) - clock._c = clock_instance - - return clock - - @clock.setter - def clock(self, clock): - """ - Assign a clock to a stream class. - """ - - if not isinstance(clock, CTFWriter.Clock): - raise TypeError("Invalid clock type.") - - ret = nbt._bt_ctf_stream_class_set_clock(self._sc, clock._c) - - if ret < 0: - raise ValueError("Could not set stream class clock.") - - @property - def id(self): - """ - Get a stream class' id. - """ - - ret = nbt._bt_ctf_stream_class_get_id(self._sc) - - if ret < 0: - raise TypeError("Could not get StreamClass id") - - return ret - - @id.setter - def id(self, id): - """ - Assign an id to a stream class. - """ - - ret = nbt._bt_ctf_stream_class_set_id(self._sc, id) - - if ret < 0: - raise TypeError("Could not set stream class id.") - - @property - def event_classes(self): - """ - Generator returning the stream class' event classes. - """ - - count = nbt._bt_ctf_stream_class_get_event_class_count(self._sc) - - if count < 0: - raise TypeError("Could not get StreamClass' event class count") - - for i in range(count): - event_class_native = nbt._bt_ctf_stream_class_get_event_class(self._sc, i) - - if event_class_native is None: - msg = "Could not get StreamClass' event class at index {}".format(i) - raise TypeError(msg) - - event_class = CTFWriter.EventClass.__new__(CTFWriter.EventClass) - event_class._ec = event_class_native - yield event_class - - def add_event_class(self, event_class): - """ - Add an event class to a stream class. New events can be added even after a - stream has been instantiated and events have been appended. However, a stream - will not accept events of a class that has not been added to the stream - class beforehand. - """ - - if not isinstance(event_class, CTFWriter.EventClass): - raise TypeError("Invalid event_class type.") - - ret = nbt._bt_ctf_stream_class_add_event_class(self._sc, - event_class._ec) - - if ret < 0: - raise ValueError("Could not add event class.") - - @property - def packet_context_type(self): - """ - Get the StreamClass' packet context type (StructureFieldDeclaration) - """ - - field_type_native = nbt._bt_ctf_stream_class_get_packet_context_type(self._sc) - - if field_type_native is None: - raise ValueError("Invalid StreamClass") - - field_type = CTFWriter.FieldDeclaration._create_field_declaration_from_native_instance(field_type_native) - - return field_type - - @packet_context_type.setter - def packet_context_type(self, field_type): - """ - Set a StreamClass' packet context type. Must be of type - StructureFieldDeclaration. - """ - - if not isinstance(field_type, CTFWriter.StructureFieldDeclaration): - raise TypeError("field_type argument must be of type StructureFieldDeclaration.") - - ret = nbt._bt_ctf_stream_class_set_packet_context_type(self._sc, - field_type._ft) - - if ret < 0: - raise ValueError("Failed to set packet context type.") - - class Stream: - def __init__(self): - raise NotImplementedError("Stream cannot be instantiated; use Writer.create_stream()") - - def __del__(self): - nbt._bt_ctf_stream_put(self._s) - - @property - def discarded_events(self): - """ - Get a stream's discarded event count. - """ - - ret, count = nbt._bt_ctf_stream_get_discarded_events_count(self._s) - - if ret < 0: - raise ValueError("Could not get the stream's discarded events count") - - return count - - def append_discarded_events(self, event_count): - """ - Increase the current packet's discarded event count. - """ - - nbt._bt_ctf_stream_append_discarded_events(self._s, event_count) - - def append_event(self, event): - """ - Append "event" to the stream's current packet. The stream's associated clock - will be sampled during this call. The event shall not be modified after - being appended to a stream. - """ - - ret = nbt._bt_ctf_stream_append_event(self._s, event._e) - - if ret < 0: - raise ValueError("Could not append event to stream.") - - @property - def packet_context(self): - """ - Get a Stream's packet context field (a StructureField). - """ - - native_field = nbt._bt_ctf_stream_get_packet_context(self._s) - - if native_field is None: - raise ValueError("Invalid Stream.") - - return CTFWriter.Field._create_field_from_native_instance(native_field) - - @packet_context.setter - def packet_context(self, field): - """ - Set a Stream's packet context field (must be a StructureField). - """ - - if not isinstance(field, CTFWriter.StructureField): - raise TypeError("Argument field must be of type StructureField") - - ret = nbt._bt_ctf_stream_set_packet_context(self._s, field._f) - - if ret < 0: - raise ValueError("Invalid packet context field.") - - def flush(self): - """ - The stream's current packet's events will be flushed to disk. Events - subsequently appended to the stream will be added to a new packet. - """ - - ret = nbt._bt_ctf_stream_flush(self._s) - - if ret < 0: - raise ValueError("Could not flush stream.") - - class Writer: - def __init__(self, path): - """ - Create a new writer that will produce a trace in the given path. - """ - - self._w = nbt._bt_ctf_writer_create(path) - - if self._w is None: - raise ValueError("Writer creation failed.") - - def __del__(self): - nbt._bt_ctf_writer_put(self._w) - - def create_stream(self, stream_class): - """ - Create a new stream instance and register it to the writer. - """ - - if not isinstance(stream_class, CTFWriter.StreamClass): - raise TypeError("Invalid stream_class type.") - - stream = CTFWriter.Stream.__new__(CTFWriter.Stream) - stream._s = nbt._bt_ctf_writer_create_stream(self._w, stream_class._sc) - - return stream - - def add_environment_field(self, name, value): - """ - Add an environment field to the trace. - """ - - ret = nbt._bt_ctf_writer_add_environment_field(self._w, str(name), - str(value)) - - if ret < 0: - raise ValueError("Could not add environment field to trace.") - - def add_clock(self, clock): - """ - Add a clock to the trace. Clocks assigned to stream classes must be - registered to the writer. - """ - - ret = nbt._bt_ctf_writer_add_clock(self._w, clock._c) - - if ret < 0: - raise ValueError("Could not add clock to Writer.") - - @property - def metadata(self): - """ - Get the trace's TSDL meta-data. - """ - - return nbt._bt_ctf_writer_get_metadata_string(self._w) - - def flush_metadata(self): - """ - Flush the trace's metadata to the metadata file. - """ - - nbt._bt_ctf_writer_flush_metadata(self._w) - - @property - def byte_order(self): - """ - Get the trace's byte order. Must be a constant from the ByteOrder - class. - """ - - raise NotImplementedError("Getter not implemented.") - - @byte_order.setter - def byte_order(self, byte_order): - """ - Set the trace's byte order. Must be a constant from the ByteOrder - class. Defaults to the host machine's endianness - """ - - ret = nbt._bt_ctf_writer_set_byte_order(self._w, byte_order) - - if ret < 0: - raise ValueError("Could not set trace's byte order.") diff --git a/bindings/python/common.py b/bindings/python/common.py new file mode 100644 index 00000000..c045d3e4 --- /dev/null +++ b/bindings/python/common.py @@ -0,0 +1,168 @@ +# common.py +# +# Babeltrace Python module common definitions +# +# Copyright 2012-2015 EfficiOS Inc. +# +# Author: Danny Serres +# Author: Jérémie Galarneau +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +class CTFStringEncoding: + """ + CTF string encodings. + """ + + #: None + NONE = 0 + + #: UTF-8 + UTF8 = 1 + + #: ASCII + ASCII = 2 + + #: Unknown + UNKNOWN = 3 + + +# Based on the enum in ctf-writer/writer.h +class ByteOrder: + """ + Byte orders. + """ + + #: Native byte order + BYTE_ORDER_NATIVE = 0 + + #: Little-endian + BYTE_ORDER_LITTLE_ENDIAN = 1 + + #: Big-endian + BYTE_ORDER_BIG_ENDIAN = 2 + + #: Network byte order (big-endian) + BYTE_ORDER_NETWORK = 3 + + #: Unknown byte order + BYTE_ORDER_UNKNOWN = 4 # Python-specific entry + + +# enum equivalent, accessible constants +# These are taken directly from ctf/events.h +# All changes to enums must also be made here +class CTFTypeId: + """ + CTF numeric type identifiers. + """ + + #: Unknown type + UNKNOWN = 0 + + #: Integer + INTEGER = 1 + + #: Floating point number + FLOAT = 2 + + #: Enumeration + ENUM = 3 + + #: String + STRING = 4 + + #: Structure + STRUCT = 5 + + #: Untagged variant + UNTAGGED_VARIANT = 6 + + #: Variant + VARIANT = 7 + + #: Array + ARRAY = 8 + + #: Sequence + SEQUENCE = 9 + + NR_CTF_TYPES = 10 + + def type_name(id): + """ + Returns the name of the CTF numeric type identifier *id*. + """ + + name = "UNKNOWN_TYPE" + constants = [ + attr for attr in dir(CTFTypeId) if not callable( + getattr( + CTFTypeId, + attr)) and not attr.startswith("__")] + + for attr in constants: + if getattr(CTFTypeId, attr) == id: + name = attr + break + + return name + + +class CTFScope: + """ + CTF scopes. + """ + + #: Packet header + TRACE_PACKET_HEADER = 0 + + #: Packet context + STREAM_PACKET_CONTEXT = 1 + + #: Event header + STREAM_EVENT_HEADER = 2 + + #: Stream event context + STREAM_EVENT_CONTEXT = 3 + + #: Event context + EVENT_CONTEXT = 4 + + #: Event fields + EVENT_FIELDS = 5 + + def scope_name(scope): + """ + Returns the name of the CTF scope *scope*. + """ + + name = "UNKNOWN_SCOPE" + constants = [ + attr for attr in dir(CTFScope) if not callable( + getattr( + CTFScope, + attr)) and not attr.startswith("__")] + + for attr in constants: + if getattr(CTFScope, attr) == scope: + name = attr + break + + return name diff --git a/bindings/python/reader.py b/bindings/python/reader.py new file mode 100644 index 00000000..285111e8 --- /dev/null +++ b/bindings/python/reader.py @@ -0,0 +1,1223 @@ +# reader.py +# +# Babeltrace reader interface Python module +# +# Copyright 2012-2015 EfficiOS Inc. +# +# Author: Danny Serres +# Author: Jérémie Galarneau +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +import babeltrace.nativebt as nbt +import babeltrace.common as common +import collections +import os +from datetime import datetime + + +class TraceCollection: + """ + A :class:`TraceCollection` is a collection of opened traces. + + Once a trace collection is created, you can add traces to the + collection by using the :meth:`add_trace` or + :meth:`add_traces_recursive`, and then iterate on the merged + events using :attr:`events`. + + You may use :meth:`remove_trace` to close and remove a specific + trace from a trace collection. + """ + + def __init__(self): + """ + Creates an empty trace collection. + """ + + self._tc = nbt._bt_context_create() + + def __del__(self): + nbt._bt_context_put(self._tc) + + def add_trace(self, path, format_str): + """ + Adds a trace to the trace collection. + + *path* is the exact path of the trace on the filesystem. + + *format_str* is a string indicating the type of trace to + add. ``ctf`` is currently the only supported trace format. + + Returns the corresponding :class:`TraceHandle` instance for + this opened trace on success, or ``None`` on error. + + This function **does not** recurse directories to find a + trace. See :meth:`add_traces_recursive` for a recursive + version of this function. + """ + + ret = nbt._bt_context_add_trace(self._tc, path, format_str, + None, None, None) + + if ret < 0: + return None + + th = TraceHandle.__new__(TraceHandle) + th._id = ret + th._trace_collection = self + + return th + + def add_traces_recursive(self, path, format_str): + """ + Adds traces to this trace collection by recursively searching + in the *path* directory. + + *format_str* is a string indicating the type of trace to add. + ``ctf`` is currently the only supported trace format. + + Returns a :class:`dict` object mapping full paths to trace + handles for each trace found, or ``None`` on error. + + See also :meth:`add_trace`. + """ + + trace_handles = {} + noTrace = True + error = False + + for fullpath, dirs, files in os.walk(path): + if "metadata" in files: + trace_handle = self.add_trace(fullpath, format_str) + + if trace_handle is None: + error = True + continue + + trace_handles[fullpath] = trace_handle + noTrace = False + + if noTrace and error: + return None + + return trace_handles + + def remove_trace(self, trace_handle): + """ + Removes a trace from the trace collection using its trace + handle *trace_handle*. + + :class:`TraceHandle` objects are returned by :meth:`add_trace` + and :meth:`add_traces_recursive`. + """ + + try: + nbt._bt_context_remove_trace(self._tc, trace_handle._id) + except AttributeError: + raise TypeError("in remove_trace, argument 2 must be a TraceHandle instance") + + @property + def events(self): + """ + Generates the ordered :class:`Event` objects of all the opened + traces contained in this trace collection. + + Due to limitations of the native Babeltrace API, only one event + may be "alive" at a given time, i.e. a user **should never** + store a copy of the events returned by this function for + ulterior use. Users shall make sure to copy the information + they need *from* an event before accessing the next one. + """ + + begin_pos_ptr = nbt._bt_iter_pos() + end_pos_ptr = nbt._bt_iter_pos() + begin_pos_ptr.type = nbt.SEEK_BEGIN + end_pos_ptr.type = nbt.SEEK_LAST + + for event in self._events(begin_pos_ptr, end_pos_ptr): + yield event + + def events_timestamps(self, timestamp_begin, timestamp_end): + """ + Generates the ordered :class:`Event` objects of all the opened + traces contained in this trace collection from *timestamp_begin* + to *timestamp_end*. + + *timestamp_begin* and *timestamp_end* are given in nanoseconds + since Epoch. + + See :attr:`events` for notes and limitations. + """ + + begin_pos_ptr = nbt._bt_iter_pos() + end_pos_ptr = nbt._bt_iter_pos() + begin_pos_ptr.type = end_pos_ptr.type = nbt.SEEK_TIME + begin_pos_ptr.u.seek_time = timestamp_begin + end_pos_ptr.u.seek_time = timestamp_end + + for event in self._events(begin_pos_ptr, end_pos_ptr): + yield event + + @property + def timestamp_begin(self): + """ + Begin timestamp of this trace collection (nanoseconds since + Epoch). + """ + + pos_ptr = nbt._bt_iter_pos() + pos_ptr.type = nbt.SEEK_BEGIN + + return self._timestamp_at_pos(pos_ptr) + + @property + def timestamp_end(self): + """ + End timestamp of this trace collection (nanoseconds since + Epoch). + """ + + pos_ptr = nbt._bt_iter_pos() + pos_ptr.type = nbt.SEEK_LAST + + return self._timestamp_at_pos(pos_ptr) + + def _timestamp_at_pos(self, pos_ptr): + ctf_it_ptr = nbt._bt_ctf_iter_create(self._tc, pos_ptr, pos_ptr) + + if ctf_it_ptr is None: + raise NotImplementedError("Creation of multiple iterators is unsupported.") + + ev_ptr = nbt._bt_ctf_iter_read_event(ctf_it_ptr) + nbt._bt_ctf_iter_destroy(ctf_it_ptr) + + def _events(self, begin_pos_ptr, end_pos_ptr): + ctf_it_ptr = nbt._bt_ctf_iter_create(self._tc, begin_pos_ptr, end_pos_ptr) + + if ctf_it_ptr is None: + raise NotImplementedError("Creation of multiple iterators is unsupported.") + + while True: + ev_ptr = nbt._bt_ctf_iter_read_event(ctf_it_ptr) + + if ev_ptr is None: + break + + ev = Event.__new__(Event) + ev._e = ev_ptr + + try: + yield ev + except GeneratorExit: + break + + ret = nbt._bt_iter_next(nbt._bt_ctf_get_iter(ctf_it_ptr)) + + if ret != 0: + break + + nbt._bt_ctf_iter_destroy(ctf_it_ptr) + + +# Based on enum bt_clock_type in clock-type.h +class _ClockType: + CLOCK_CYCLES = 0 + CLOCK_REAL = 1 + + +class TraceHandle: + """ + A :class:`TraceHandle` is a handle allowing the user to manipulate + a specific trace directly. It is a unique identifier representing a + trace, and is not meant to be instantiated by the user. + """ + + def __init__(self): + raise NotImplementedError("TraceHandle cannot be instantiated") + + def __repr__(self): + return "Babeltrace TraceHandle: trace_id('{0}')".format(self._id) + + @property + def id(self): + """ + Numeric ID of this trace handle. + """ + + return self._id + + @property + def path(self): + """ + Path of the underlying trace. + """ + + return nbt._bt_trace_handle_get_path(self._trace_collection._tc, + self._id) + + @property + def timestamp_begin(self): + """ + Buffers creation timestamp (nanoseconds since Epoch) of the + underlying trace. + """ + + return nbt._bt_trace_handle_get_timestamp_begin(self._trace_collection._tc, + self._id, + _ClockType.CLOCK_REAL) + + @property + def timestamp_end(self): + """ + Buffers destruction timestamp (nanoseconds since Epoch) of the + underlying trace. + """ + + return nbt._bt_trace_handle_get_timestamp_end(self._trace_collection._tc, + self._id, + _ClockType.CLOCK_REAL) + + @property + def events(self): + """ + Generates all the :class:`EventDeclaration` objects of the + underlying trace. + """ + + ret = nbt._bt_python_event_decl_listcaller(self.id, + self._trace_collection._tc) + + if not isinstance(ret, list): + return + + ptr_list, count = ret + + for i in range(count): + tmp = EventDeclaration.__new__(EventDeclaration) + tmp._ed = nbt._bt_python_decl_one_from_list(ptr_list, i) + yield tmp + + + + +# Priority of the scopes when searching for event fields +_scopes = [ + common.CTFScope.EVENT_FIELDS, + common.CTFScope.EVENT_CONTEXT, + common.CTFScope.STREAM_EVENT_CONTEXT, + common.CTFScope.STREAM_EVENT_HEADER, + common.CTFScope.STREAM_PACKET_CONTEXT, + common.CTFScope.TRACE_PACKET_HEADER +] + + +class Event(collections.Mapping): + """ + An :class:`Event` object represents a trace event. :class:`Event` + objects are returned by :attr:`TraceCollection.events` and are + not meant to be instantiated by the user. + + :class:`Event` has a :class:`dict`-like interface for accessing + an event's field value by field name: + + .. code-block:: python + + event['my_field'] + + If a field name exists in multiple scopes, the value of the first + field found is returned. The scopes are searched in the following + order: + + 1. Event fields (:attr:`babeltrace.common.CTFScope.EVENT_FIELDS`) + 2. Event context (:attr:`babeltrace.common.CTFScope.EVENT_CONTEXT`) + 3. Stream event context (:attr:`babeltrace.common.CTFScope.STREAM_EVENT_CONTEXT`) + 4. Event header (:attr:`babeltrace.common.CTFScope.STREAM_EVENT_HEADER`) + 5. Packet context (:attr:`babeltrace.common.CTFScope.STREAM_PACKET_CONTEXT`) + 6. Packet header (:attr:`babeltrace.common.CTFScope.TRACE_PACKET_HEADER`) + + It is still possible to obtain a field's value from a specific + scope using :meth:`field_with_scope`. + + Field values are returned as native Python types, that is: + + +-----------------------+----------------------------------+ + | Field type | Python type | + +=======================+==================================+ + | Integer | :class:`int` | + +-----------------------+----------------------------------+ + | Floating point number | :class:`float` | + +-----------------------+----------------------------------+ + | Enumeration | :class:`str` (enumeration label) | + +-----------------------+----------------------------------+ + | String | :class:`str` | + +-----------------------+----------------------------------+ + | Array | :class:`list` of native Python | + | | objects | + +-----------------------+----------------------------------+ + | Sequence | :class:`list` of native Python | + | | objects | + +-----------------------+----------------------------------+ + | Structure | :class:`dict` mapping field | + | | names to native Python objects | + +-----------------------+----------------------------------+ + + For example, printing the third element of a sequence named ``seq`` + in a structure named ``my_struct`` of the ``event``'s field named + ``my_field`` is done this way: + + .. code-block:: python + + print(event['my_field']['my_struct']['seq'][2]) + """ + + def __init__(self): + raise NotImplementedError("Event cannot be instantiated") + + @property + def name(self): + """ + Event name or ``None`` on error. + """ + + return nbt._bt_ctf_event_name(self._e) + + @property + def cycles(self): + """ + Event timestamp in cycles or -1 on error. + """ + + return nbt._bt_ctf_get_cycles(self._e) + + @property + def timestamp(self): + """ + Event timestamp (nanoseconds since Epoch) or -1 on error. + """ + + return nbt._bt_ctf_get_timestamp(self._e) + + @property + def datetime(self): + """ + Event timestamp as a standard :class:`datetime.datetime` + object. + + Note that the :class:`datetime.datetime` class' precision + is limited to microseconds, whereas :attr:`timestamp` provides + the event's timestamp with a nanosecond resolution. + """ + + return datetime.fromtimestamp(self.timestamp / 1E9) + + def field_with_scope(self, field_name, scope): + """ + Returns the value of a field named *field_name* within the + scope *scope*, or ``None`` if the field cannot be found. + + *scope* must be one of :class:`babeltrace.common.CTFScope` + constants. + """ + + if scope not in _scopes: + raise ValueError("Invalid scope provided") + + field = self._field_with_scope(field_name, scope) + + if field is not None: + return field.value + + def field_list_with_scope(self, scope): + """ + Returns a list of field names in the scope *scope*. + """ + + if scope not in _scopes: + raise ValueError("Invalid scope provided") + + field_names = [] + + for field in self._field_list_with_scope(scope): + field_names.append(field.name) + + return field_names + + @property + def handle(self): + """ + :class:`TraceHandle` object containing this event, or ``None`` + on error. + """ + + ret = nbt._bt_ctf_event_get_handle_id(self._e) + + if ret < 0: + return None + + th = TraceHandle.__new__(TraceHandle) + th._id = ret + th._trace_collection = self.get_trace_collection() + + return th + + @property + def trace_collection(self): + """ + :class:`TraceCollection` object containing this event, or + ``None`` on error. + """ + + trace_collection = TraceCollection() + trace_collection._tc = nbt._bt_ctf_event_get_context(self._e) + + if trace_collection._tc is not None: + return trace_collection + + def __getitem__(self, field_name): + field = self._field(field_name) + + if field is not None: + return field.value + + raise KeyError(field_name) + + def __iter__(self): + for key in self.keys(): + yield key + + def __len__(self): + count = 0 + + for scope in _scopes: + scope_ptr = nbt._bt_ctf_get_top_level_scope(self._e, scope) + ret = nbt._bt_python_field_listcaller(self._e, scope_ptr) + + if isinstance(ret, list): + count += ret[1] + + return count + + def __contains__(self, field_name): + return self._field(field_name) is not None + + def keys(self): + """ + Returns the list of field names. + + Note: field names are unique within the returned list, although + a field name could exist in multiple scopes. Use + :meth:`field_list_with_scope` to obtain the list of field names + of a given scope. + """ + + field_names = set() + + for scope in _scopes: + for name in self.field_list_with_scope(scope): + field_names.add(name) + + return list(field_names) + + def get(self, field_name, default=None): + """ + Returns the value of the field named *field_name*, or *default* + when not found. + + See :class:`Event` note about how fields are retrieved by + name when multiple fields share the same name in different + scopes. + """ + + field = self._field(field_name) + + if field is None: + return default + + return field.value + + def items(self): + """ + Generates pairs of (field name, field value). + + This method iterates :meth:`keys` to find field names, which + means some fields could be unavailable if other fields share + their names in scopes with higher priorities. + """ + + for field in self.keys(): + yield (field, self[field]) + + def _field_with_scope(self, field_name, scope): + scope_ptr = nbt._bt_ctf_get_top_level_scope(self._e, scope) + + if scope_ptr is None: + return None + + definition_ptr = nbt._bt_ctf_get_field(self._e, scope_ptr, field_name) + + if definition_ptr is None: + return None + + field = _Definition(definition_ptr, scope) + + return field + + def _field(self, field_name): + field = None + + for scope in _scopes: + field = self._field_with_scope(field_name, scope) + + if field is not None: + break + + return field + + def _field_list_with_scope(self, scope): + fields = [] + scope_ptr = nbt._bt_ctf_get_top_level_scope(self._e, scope) + + # Returns a list [list_ptr, count]. If list_ptr is NULL, SWIG will only + # provide the "count" return value + count = 0 + list_ptr = None + ret = nbt._bt_python_field_listcaller(self._e, scope_ptr) + + if isinstance(ret, list): + list_ptr, count = ret + + for i in range(count): + definition_ptr = nbt._bt_python_field_one_from_list(list_ptr, i) + + if definition_ptr is not None: + definition = _Definition(definition_ptr, scope) + fields.append(definition) + + return fields + + +class FieldError(Exception): + """ + Field error, raised when the value of a field cannot be accessed. + """ + + def __init__(self, value): + self.value = value + + def __str__(self): + return repr(self.value) + + +class EventDeclaration: + """ + An event declaration contains the properties of a class of events, + that is, the common properties and fields layout of all the actual + recorded events associated with this declaration. + + This class is not meant to be instantiated by the user. It is + returned by :attr:`TraceHandle.events`. + """ + + MAX_UINT64 = 0xFFFFFFFFFFFFFFFF + + def __init__(self): + raise NotImplementedError("EventDeclaration cannot be instantiated") + + @property + def name(self): + """ + Event name, or ``None`` on error. + """ + + return nbt._bt_ctf_get_decl_event_name(self._ed) + + @property + def id(self): + """ + Event numeric ID, or -1 on error. + """ + + id = nbt._bt_ctf_get_decl_event_id(self._ed) + + if id == self.MAX_UINT64: + id = -1 + + return id + + @property + def fields(self): + """ + Generates all the field declarations of this event, going + through each scope in the following order: + + 1. Event fields (:attr:`babeltrace.common.CTFScope.EVENT_FIELDS`) + 2. Event context (:attr:`babeltrace.common.CTFScope.EVENT_CONTEXT`) + 3. Stream event context (:attr:`babeltrace.common.CTFScope.STREAM_EVENT_CONTEXT`) + 4. Event header (:attr:`babeltrace.common.CTFScope.STREAM_EVENT_HEADER`) + 5. Packet context (:attr:`babeltrace.common.CTFScope.STREAM_PACKET_CONTEXT`) + 6. Packet header (:attr:`babeltrace.common.CTFScope.TRACE_PACKET_HEADER`) + + All the generated field declarations inherit + :class:`FieldDeclaration`, and are among: + + * :class:`IntegerFieldDeclaration` + * :class:`FloatFieldDeclaration` + * :class:`EnumerationFieldDeclaration` + * :class:`StringFieldDeclaration` + * :class:`ArrayFieldDeclaration` + * :class:`SequenceFieldDeclaration` + * :class:`StructureFieldDeclaration` + * :class:`VariantFieldDeclaration` + """ + + for scope in _scopes: + for declaration in self.fields_scope(scope): + yield declaration + + def fields_scope(self, scope): + """ + Generates all the field declarations of the event's scope + *scope*. + + *scope* must be one of :class:`babeltrace.common.CTFScope` constants. + + All the generated field declarations inherit + :class:`FieldDeclaration`, and are among: + + * :class:`IntegerFieldDeclaration` + * :class:`FloatFieldDeclaration` + * :class:`EnumerationFieldDeclaration` + * :class:`StringFieldDeclaration` + * :class:`ArrayFieldDeclaration` + * :class:`SequenceFieldDeclaration` + * :class:`StructureFieldDeclaration` + * :class:`VariantFieldDeclaration` + """ + ret = nbt._by_python_field_decl_listcaller(self._ed, scope) + + if not isinstance(ret, list): + return + + list_ptr, count = ret + + for i in range(count): + field_decl_ptr = nbt._bt_python_field_decl_one_from_list(list_ptr, i) + + if field_decl_ptr is not None: + decl_ptr = nbt._bt_ctf_get_decl_from_field_decl(field_decl_ptr) + name = nbt._bt_ctf_get_decl_field_name(field_decl_ptr) + field_declaration = _create_field_declaration(decl_ptr, name, + scope) + yield field_declaration + + +class FieldDeclaration: + """ + Base class for concrete field declarations. + + This class is not meant to be instantiated by the user. + """ + + def __init__(self): + raise NotImplementedError("FieldDeclaration cannot be instantiated") + + def __repr__(self): + return "({0}) {1} {2}".format(common.CTFScope.scope_name(self.scope), + common.CTFTypeId.type_name(self.type), + self.name) + + @property + def name(self): + """ + Field name, or ``None`` on error. + """ + + return self._name + + @property + def type(self): + """ + Field type (one of :class:`babeltrace.common.CTFTypeId` + constants). + """ + + return nbt._bt_ctf_field_type(self._fd) + + @property + def scope(self): + """ + Field scope (one of:class:`babeltrace.common.CTFScope` + constants). + """ + + return self._s + + +class IntegerFieldDeclaration(FieldDeclaration): + """ + Integer field declaration. + """ + + def __init__(self): + raise NotImplementedError("IntegerFieldDeclaration cannot be instantiated") + + @property + def signedness(self): + """ + 0 if this integer is unsigned, 1 if signed, or -1 on error. + """ + + return nbt._bt_ctf_get_int_signedness(self._fd) + + @property + def base(self): + """ + Integer base (:class:`int`), or a negative value on error. + """ + + return nbt._bt_ctf_get_int_base(self._fd) + + @property + def byte_order(self): + """ + Integer byte order (one of + :class:`babeltrace.common.ByteOrder` constants). + """ + + ret = nbt._bt_ctf_get_int_byte_order(self._fd) + + if ret == 1234: + return common.ByteOrder.BYTE_ORDER_LITTLE_ENDIAN + elif ret == 4321: + return common.ByteOrder.BYTE_ORDER_BIG_ENDIAN + else: + return common.ByteOrder.BYTE_ORDER_UNKNOWN + + @property + def length(self): + """ + Integer size in bits, or a negative value on error. + """ + + return nbt._bt_ctf_get_int_len(self._fd) + + @property + def encoding(self): + """ + Integer encoding (one of + :class:`babeltrace.common.CTFStringEncoding` constants). + """ + + return nbt._bt_ctf_get_encoding(self._fd) + + +class EnumerationFieldDeclaration(FieldDeclaration): + """ + Enumeration field declaration. + + .. note:: + + As of this version, this class is missing some properties. + """ + + def __init__(self): + raise NotImplementedError("EnumerationFieldDeclaration cannot be instantiated") + + +class ArrayFieldDeclaration(FieldDeclaration): + """ + Static array field declaration. + """ + + def __init__(self): + raise NotImplementedError("ArrayFieldDeclaration cannot be instantiated") + + @property + def length(self): + """ + Fixed length of this static array (number of contained + elements), or a negative value on error. + """ + + return nbt._bt_ctf_get_array_len(self._fd) + + @property + def element_declaration(self): + """ + Field declaration of the underlying element. + """ + + field_decl_ptr = nbt._bt_python_get_array_element_declaration(self._fd) + + return _create_field_declaration(field_decl_ptr, "", self.scope) + + +class SequenceFieldDeclaration(FieldDeclaration): + """ + Sequence (dynamic array) field declaration. + + .. note:: + + As of this version, this class is missing some properties. + """ + + def __init__(self): + raise NotImplementedError("SequenceFieldDeclaration cannot be instantiated") + + @property + def element_declaration(self): + """ + Field declaration of the underlying element. + """ + + field_decl_ptr = nbt._bt_python_get_sequence_element_declaration(self._fd) + + return _create_field_declaration(field_decl_ptr, "", self.scope) + + +class FloatFieldDeclaration(FieldDeclaration): + """ + Floating point number field declaration. + + .. note:: + + As of this version, this class is missing some properties. + """ + + def __init__(self): + raise NotImplementedError("FloatFieldDeclaration cannot be instantiated") + + +class StructureFieldDeclaration(FieldDeclaration): + """ + Structure (ordered map of field names to field declarations) field + declaration. + + .. note:: + + As of this version, this class is missing some properties. + """ + + def __init__(self): + raise NotImplementedError("StructureFieldDeclaration cannot be instantiated") + + +class StringFieldDeclaration(FieldDeclaration): + """ + String (NULL-terminated array of bytes) field declaration. + + .. note:: + + As of this version, this class is missing some properties. + """ + + def __init__(self): + raise NotImplementedError("StringFieldDeclaration cannot be instantiated") + + +class VariantFieldDeclaration(FieldDeclaration): + """ + Variant (dynamic selection between different types) field declaration. + + .. note:: + + As of this version, this class is missing some properties. + """ + + def __init__(self): + raise NotImplementedError("VariantFieldDeclaration cannot be instantiated") + + +def field_error(): + """ + Return the last error code encountered while + accessing a field and reset the error flag. + Return 0 if no error, a negative value otherwise. + """ + + return nbt._bt_ctf_field_get_error() + + +def _create_field_declaration(declaration_ptr, name, scope): + """ + Private field declaration factory. + """ + + if declaration_ptr is None: + raise ValueError("declaration_ptr must be valid") + if scope not in _scopes: + raise ValueError("Invalid scope provided") + + type = nbt._bt_ctf_field_type(declaration_ptr) + declaration = None + + if type == common.CTFTypeId.INTEGER: + declaration = IntegerFieldDeclaration.__new__(IntegerFieldDeclaration) + elif type == common.CTFTypeId.ENUM: + declaration = EnumerationFieldDeclaration.__new__(EnumerationFieldDeclaration) + elif type == common.CTFTypeId.ARRAY: + declaration = ArrayFieldDeclaration.__new__(ArrayFieldDeclaration) + elif type == common.CTFTypeId.SEQUENCE: + declaration = SequenceFieldDeclaration.__new__(SequenceFieldDeclaration) + elif type == common.CTFTypeId.FLOAT: + declaration = FloatFieldDeclaration.__new__(FloatFieldDeclaration) + elif type == common.CTFTypeId.STRUCT: + declaration = StructureFieldDeclaration.__new__(StructureFieldDeclaration) + elif type == common.CTFTypeId.STRING: + declaration = StringFieldDeclaration.__new__(StringFieldDeclaration) + elif type == common.CTFTypeId.VARIANT: + declaration = VariantFieldDeclaration.__new__(VariantFieldDeclaration) + else: + return declaration + + declaration._fd = declaration_ptr + declaration._s = scope + declaration._name = name + + return declaration + + +class _Definition: + def __init__(self, definition_ptr, scope): + self._d = definition_ptr + self._s = scope + + if scope not in _scopes: + ValueError("Invalid scope provided") + + @property + def name(self): + """Return the name of a field or None on error.""" + + return nbt._bt_ctf_field_name(self._d) + + @property + def type(self): + """Return the type of a field or -1 if unknown.""" + + return nbt._bt_ctf_field_type(nbt._bt_ctf_get_decl_from_def(self._d)) + + @property + def declaration(self): + """Return the associated Definition object.""" + + return _create_field_declaration( + nbt._bt_ctf_get_decl_from_def(self._d), self.name, self.scope) + + def _get_enum_str(self): + """ + Return the string matching the current enumeration. + Return None on error. + """ + + return nbt._bt_ctf_get_enum_str(self._d) + + def _get_array_element_at(self, index): + """ + Return the array's element at position index. + Return None on error + """ + + array_ptr = nbt._bt_python_get_array_from_def(self._d) + + if array_ptr is None: + return None + + definition_ptr = nbt._bt_array_index(array_ptr, index) + + if definition_ptr is None: + return None + + return _Definition(definition_ptr, self.scope) + + def _get_sequence_len(self): + """ + Return the len of a sequence or a negative + value on error. + """ + + seq = nbt._bt_python_get_sequence_from_def(self._d) + + return nbt._bt_sequence_len(seq) + + def _get_sequence_element_at(self, index): + """ + Return the sequence's element at position index, + otherwise return None + """ + + seq = nbt._bt_python_get_sequence_from_def(self._d) + + if seq is not None: + definition_ptr = nbt._bt_sequence_index(seq, index) + + if definition_ptr is not None: + return _Definition(definition_ptr, self.scope) + + def _get_uint64(self): + """ + Return the value associated with the field. + If the field does not exist or is not of the type requested, + the value returned is undefined. To check if an error occured, + use the field_error() function after accessing a field. + """ + + return nbt._bt_ctf_get_uint64(self._d) + + def _get_int64(self): + """ + Return the value associated with the field. + If the field does not exist or is not of the type requested, + the value returned is undefined. To check if an error occured, + use the field_error() function after accessing a field. + """ + + return nbt._bt_ctf_get_int64(self._d) + + def _get_char_array(self): + """ + Return the value associated with the field. + If the field does not exist or is not of the type requested, + the value returned is undefined. To check if an error occurred, + use the field_error() function after accessing a field. + """ + + return nbt._bt_ctf_get_char_array(self._d) + + def _get_str(self): + """ + Return the value associated with the field. + If the field does not exist or is not of the type requested, + the value returned is undefined. To check if an error occurred, + use the field_error() function after accessing a field. + """ + + return nbt._bt_ctf_get_string(self._d) + + def _get_float(self): + """ + Return the value associated with the field. + If the field does not exist or is not of the type requested, + the value returned is undefined. To check if an error occurred, + use the field_error() function after accessing a field. + """ + + return nbt._bt_ctf_get_float(self._d) + + def _get_variant(self): + """ + Return the variant's selected field. + If the field does not exist or is not of the type requested, + the value returned is undefined. To check if an error occurred, + use the field_error() function after accessing a field. + """ + + return nbt._bt_ctf_get_variant(self._d) + + def _get_struct_field_count(self): + """ + Return the number of fields contained in the structure. + If the field does not exist or is not of the type requested, + the value returned is undefined. + """ + + return nbt._bt_ctf_get_struct_field_count(self._d) + + def _get_struct_field_at(self, i): + """ + Return the structure's field at position i. + If the field does not exist or is not of the type requested, + the value returned is undefined. To check if an error occurred, + use the field_error() function after accessing a field. + """ + + return nbt._bt_ctf_get_struct_field_index(self._d, i) + + @property + def value(self): + """ + Return the value associated with the field according to its type. + Return None on error. + """ + + id = self.type + value = None + + if id == common.CTFTypeId.STRING: + value = self._get_str() + elif id == common.CTFTypeId.ARRAY: + element_decl = self.declaration.element_declaration + + if ((element_decl.type == common.CTFTypeId.INTEGER + and element_decl.length == 8) + and (element_decl.encoding == common.CTFStringEncoding.ASCII or element_decl.encoding == common.CTFStringEncoding.UTF8)): + value = nbt._bt_python_get_array_string(self._d) + else: + value = [] + + for i in range(self.declaration.length): + element = self._get_array_element_at(i) + value.append(element.value) + elif id == common.CTFTypeId.INTEGER: + if self.declaration.signedness == 0: + value = self._get_uint64() + else: + value = self._get_int64() + elif id == common.CTFTypeId.ENUM: + value = self._get_enum_str() + elif id == common.CTFTypeId.SEQUENCE: + element_decl = self.declaration.element_declaration + + if ((element_decl.type == common.CTFTypeId.INTEGER + and element_decl.length == 8) + and (element_decl.encoding == common.CTFStringEncoding.ASCII or element_decl.encoding == common.CTFStringEncoding.UTF8)): + value = nbt._bt_python_get_sequence_string(self._d) + else: + seq_len = self._get_sequence_len() + value = [] + + for i in range(seq_len): + evDef = self._get_sequence_element_at(i) + value.append(evDef.value) + elif id == common.CTFTypeId.FLOAT: + value = self._get_float() + elif id == common.CTFTypeId.VARIANT: + variant = _Definition.__new__(_Definition) + variant._d = self._get_variant() + value = variant.value + elif id == common.CTFTypeId.STRUCT: + value = {} + + for i in range(self._get_struct_field_count()): + member = _Definition(self._get_struct_field_at(i), self.scope) + value[member.name] = member.value + + if field_error(): + raise FieldError( + "Error occurred while accessing field {} of type {}".format( + self.name, + common.CTFTypeId.type_name(id))) + + return value + + @property + def scope(self): + """Return the scope of a field or None on error.""" + + return self._s diff --git a/bindings/python/writer.py b/bindings/python/writer.py new file mode 100644 index 00000000..fbcd6875 --- /dev/null +++ b/bindings/python/writer.py @@ -0,0 +1,1732 @@ +# writer.py +# +# Babeltrace writer interface Python module +# +# Copyright 2012-2015 EfficiOS Inc. +# +# Author: Jérémie Galarneau +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +import babeltrace.nativebt as nbt +import babeltrace.common as common +from uuid import UUID + + +# Used to compare to -1ULL in error checks +_MAX_UINT64 = 0xFFFFFFFFFFFFFFFF + + +class EnumerationMapping: + """ + Enumeration mapping class. start and end values are inclusive. + """ + + def __init__(self, name, start, end): + self.name = name + self.start = start + self.end = end + + +class Clock: + def __init__(self, name): + self._c = nbt._bt_ctf_clock_create(name) + + if self._c is None: + raise ValueError("Invalid clock name.") + + def __del__(self): + nbt._bt_ctf_clock_put(self._c) + + @property + def name(self): + """ + Get the clock's name. + """ + + name = nbt._bt_ctf_clock_get_name(self._c) + + if name is None: + raise ValueError("Invalid clock instance.") + + return name + + @property + def description(self): + """ + Get the clock's description. None if unset. + """ + + return nbt._bt_ctf_clock_get_description(self._c) + + @description.setter + def description(self, desc): + """ + Set the clock's description. The description appears in the clock's TSDL + meta-data. + """ + + ret = nbt._bt_ctf_clock_set_description(self._c, str(desc)) + + if ret < 0: + raise ValueError("Invalid clock description.") + + @property + def frequency(self): + """ + Get the clock's frequency (Hz). + """ + + freq = nbt._bt_ctf_clock_get_frequency(self._c) + + if freq == _MAX_UINT64: + raise ValueError("Invalid clock instance") + + return freq + + @frequency.setter + def frequency(self, freq): + """ + Set the clock's frequency (Hz). + """ + + ret = nbt._bt_ctf_clock_set_frequency(self._c, freq) + + if ret < 0: + raise ValueError("Invalid frequency value.") + + @property + def precision(self): + """ + Get the clock's precision (in clock ticks). + """ + + precision = nbt._bt_ctf_clock_get_precision(self._c) + + if precision == _MAX_UINT64: + raise ValueError("Invalid clock instance") + + return precision + + @precision.setter + def precision(self, precision): + """ + Set the clock's precision (in clock ticks). + """ + + ret = nbt._bt_ctf_clock_set_precision(self._c, precision) + + @property + def offset_seconds(self): + """ + Get the clock's offset in seconds from POSIX.1 Epoch. + """ + + offset_s = nbt._bt_ctf_clock_get_offset_s(self._c) + + if offset_s == _MAX_UINT64: + raise ValueError("Invalid clock instance") + + return offset_s + + @offset_seconds.setter + def offset_seconds(self, offset_s): + """ + Set the clock's offset in seconds from POSIX.1 Epoch. + """ + + ret = nbt._bt_ctf_clock_set_offset_s(self._c, offset_s) + + if ret < 0: + raise ValueError("Invalid offset value.") + + @property + def offset(self): + """ + Get the clock's offset in ticks from POSIX.1 Epoch + offset in seconds. + """ + + offset = nbt._bt_ctf_clock_get_offset(self._c) + + if offset == _MAX_UINT64: + raise ValueError("Invalid clock instance") + + return offset + + @offset.setter + def offset(self, offset): + """ + Set the clock's offset in ticks from POSIX.1 Epoch + offset in seconds. + """ + + ret = nbt._bt_ctf_clock_set_offset(self._c, offset) + + if ret < 0: + raise ValueError("Invalid offset value.") + + @property + def absolute(self): + """ + Get a clock's absolute attribute. A clock is absolute if the clock + is a global reference across the trace's other clocks. + """ + + is_absolute = nbt._bt_ctf_clock_get_is_absolute(self._c) + + if is_absolute == -1: + raise ValueError("Invalid clock instance") + + return False if is_absolute == 0 else True + + @absolute.setter + def absolute(self, is_absolute): + """ + Set a clock's absolute attribute. A clock is absolute if the clock + is a global reference across the trace's other clocks. + """ + + ret = nbt._bt_ctf_clock_set_is_absolute(self._c, int(is_absolute)) + + if ret < 0: + raise ValueError("Could not set the clock's absolute attribute.") + + @property + def uuid(self): + """ + Get a clock's UUID (an object of type UUID). + """ + + uuid_list = [] + + for i in range(16): + ret, value = nbt._bt_python_ctf_clock_get_uuid_index(self._c, i) + + if ret < 0: + raise ValueError("Invalid clock instance") + + uuid_list.append(value) + + return UUID(bytes=bytes(uuid_list)) + + @uuid.setter + def uuid(self, uuid): + """ + Set a clock's UUID (an object of type UUID). + """ + + uuid_bytes = uuid.bytes + + if len(uuid_bytes) != 16: + raise ValueError("Invalid UUID provided. UUID length must be 16 bytes") + + for i in range(len(uuid_bytes)): + ret = nbt._bt_python_ctf_clock_set_uuid_index(self._c, i, + uuid_bytes[i]) + + if ret < 0: + raise ValueError("Invalid clock instance") + + @property + def time(self): + """ + Get the current time in nanoseconds since the clock's origin (offset and + offset_s attributes). + """ + + time = nbt._bt_ctf_clock_get_time(self._c) + + if time == _MAX_UINT64: + raise ValueError("Invalid clock instance") + + return time + + @time.setter + def time(self, time): + """ + Set the current time in nanoseconds since the clock's origin (offset and + offset_s attributes). The clock's value will be sampled as events are + appended to a stream. + """ + + ret = nbt._bt_ctf_clock_set_time(self._c, time) + + if ret < 0: + raise ValueError("Invalid time value.") + + +class FieldDeclaration: + """ + FieldDeclaration should not be instantiated directly. Instantiate + one of the concrete FieldDeclaration classes. + """ + + class IntegerBase: + # These values are based on the bt_ctf_integer_base enum + # declared in event-types.h. + INTEGER_BASE_UNKNOWN = -1 + INTEGER_BASE_BINARY = 2 + INTEGER_BASE_OCTAL = 8 + INTEGER_BASE_DECIMAL = 10 + INTEGER_BASE_HEXADECIMAL = 16 + + def __init__(self): + if self._ft is None: + raise ValueError("FieldDeclaration creation failed.") + + def __del__(self): + nbt._bt_ctf_field_type_put(self._ft) + + @staticmethod + def _create_field_declaration_from_native_instance( + native_field_declaration): + type_dict = { + common.CTFTypeId.INTEGER: IntegerFieldDeclaration, + common.CTFTypeId.FLOAT: FloatFieldDeclaration, + common.CTFTypeId.ENUM: EnumerationFieldDeclaration, + common.CTFTypeId.STRING: StringFieldDeclaration, + common.CTFTypeId.STRUCT: StructureFieldDeclaration, + common.CTFTypeId.VARIANT: VariantFieldDeclaration, + common.CTFTypeId.ARRAY: ArrayFieldDeclaration, + common.CTFTypeId.SEQUENCE: SequenceFieldDeclaration + } + + field_type_id = nbt._bt_ctf_field_type_get_type_id(native_field_declaration) + + if field_type_id == common.CTFTypeId.UNKNOWN: + raise TypeError("Invalid field instance") + + declaration = Field.__new__(Field) + declaration._ft = native_field_declaration + declaration.__class__ = type_dict[field_type_id] + + return declaration + + @property + def alignment(self): + """ + Get the field declaration's alignment. Returns -1 on error. + """ + + return nbt._bt_ctf_field_type_get_alignment(self._ft) + + @alignment.setter + def alignment(self, alignment): + """ + Set the field declaration's alignment. Defaults to 1 (bit-aligned). However, + some types, such as structures and string, may impose other alignment + constraints. + """ + + ret = nbt._bt_ctf_field_type_set_alignment(self._ft, alignment) + + if ret < 0: + raise ValueError("Invalid alignment value.") + + @property + def byte_order(self): + """ + Get the field declaration's byte order. One of the ByteOrder's constant. + """ + + return nbt._bt_ctf_field_type_get_byte_order(self._ft) + + @byte_order.setter + def byte_order(self, byte_order): + """ + Set the field declaration's byte order. Use constants defined in the ByteOrder + class. + """ + + ret = nbt._bt_ctf_field_type_set_byte_order(self._ft, byte_order) + + if ret < 0: + raise ValueError("Could not set byte order value.") + + +class IntegerFieldDeclaration(FieldDeclaration): + def __init__(self, size): + """ + Create a new integer field declaration of the given size. + """ + self._ft = nbt._bt_ctf_field_type_integer_create(size) + super().__init__() + + @property + def size(self): + """ + Get an integer's size. + """ + + ret = nbt._bt_ctf_field_type_integer_get_size(self._ft) + + if ret < 0: + raise ValueError("Could not get Integer's size attribute.") + else: + return ret + + @property + def signed(self): + """ + Get an integer's signedness attribute. + """ + + ret = nbt._bt_ctf_field_type_integer_get_signed(self._ft) + + if ret < 0: + raise ValueError("Could not get Integer's signed attribute.") + elif ret > 0: + return True + else: + return False + + @signed.setter + def signed(self, signed): + """ + Set an integer's signedness attribute. + """ + + ret = nbt._bt_ctf_field_type_integer_set_signed(self._ft, signed) + + if ret < 0: + raise ValueError("Could not set Integer's signed attribute.") + + @property + def base(self): + """ + Get the integer's base used to pretty-print the resulting trace. + Returns a constant from the FieldDeclaration.IntegerBase class. + """ + + return nbt._bt_ctf_field_type_integer_get_base(self._ft) + + @base.setter + def base(self, base): + """ + Set the integer's base used to pretty-print the resulting trace. + The base must be a constant of the FieldDeclarationIntegerBase class. + """ + + ret = nbt._bt_ctf_field_type_integer_set_base(self._ft, base) + + if ret < 0: + raise ValueError("Could not set Integer's base.") + + @property + def encoding(self): + """ + Get the integer's encoding (one of the constants of the + CTFStringEncoding class). + Returns a constant from the CTFStringEncoding class. + """ + + return nbt._bt_ctf_field_type_integer_get_encoding(self._ft) + + @encoding.setter + def encoding(self, encoding): + """ + An integer encoding may be set to signal that the integer must be printed + as a text character. Must be a constant from the CTFStringEncoding class. + """ + + ret = nbt._bt_ctf_field_type_integer_set_encoding(self._ft, encoding) + + if ret < 0: + raise ValueError("Could not set Integer's encoding.") + + +class EnumerationFieldDeclaration(FieldDeclaration): + def __init__(self, integer_type): + """ + Create a new enumeration field declaration with the given underlying container type. + """ + isinst = isinstance(integer_type, IntegerFieldDeclaration) + + if integer_type is None or not isinst: + raise TypeError("Invalid integer container.") + + self._ft = nbt._bt_ctf_field_type_enumeration_create(integer_type._ft) + super().__init__() + + @property + def container(self): + """ + Get the enumeration's underlying container type. + """ + + ret = nbt._bt_ctf_field_type_enumeration_get_container_type(self._ft) + + if ret is None: + raise TypeError("Invalid enumeration declaration") + + return FieldDeclaration._create_field_declaration_from_native_instance(ret) + + def add_mapping(self, name, range_start, range_end): + """ + Add a mapping to the enumeration. The range's values are inclusive. + """ + + if range_start < 0 or range_end < 0: + ret = nbt._bt_ctf_field_type_enumeration_add_mapping(self._ft, + str(name), + range_start, + range_end) + else: + ret = nbt._bt_ctf_field_type_enumeration_add_mapping_unsigned(self._ft, + str(name), + range_start, + range_end) + + if ret < 0: + raise ValueError("Could not add mapping to enumeration declaration.") + + @property + def mappings(self): + """ + Generator returning instances of EnumerationMapping. + """ + + signed = self.container.signed + + count = nbt._bt_ctf_field_type_enumeration_get_mapping_count(self._ft) + + for i in range(count): + if signed: + ret = nbt._bt_python_ctf_field_type_enumeration_get_mapping(self._ft, i) + else: + ret = nbt._bt_python_ctf_field_type_enumeration_get_mapping_unsigned(self._ft, i) + + if len(ret) != 3: + msg = "Could not get Enumeration mapping at index {}".format(i) + raise TypeError(msg) + + name, range_start, range_end = ret + yield EnumerationMapping(name, range_start, range_end) + + def get_mapping_by_name(self, name): + """ + Get a mapping by name (EnumerationMapping). + """ + + index = nbt._bt_ctf_field_type_enumeration_get_mapping_index_by_name(self._ft, name) + + if index < 0: + return None + + if self.container.signed: + ret = nbt._bt_python_ctf_field_type_enumeration_get_mapping(self._ft, index) + else: + ret = nbt._bt_python_ctf_field_type_enumeration_get_mapping_unsigned(self._ft, index) + + if len(ret) != 3: + msg = "Could not get Enumeration mapping at index {}".format(i) + raise TypeError(msg) + + name, range_start, range_end = ret + + return EnumerationMapping(name, range_start, range_end) + + def get_mapping_by_value(self, value): + """ + Get a mapping by value (EnumerationMapping). + """ + + if value < 0: + index = nbt._bt_ctf_field_type_enumeration_get_mapping_index_by_value(self._ft, value) + else: + index = nbt._bt_ctf_field_type_enumeration_get_mapping_index_by_unsigned_value(self._ft, value) + + if index < 0: + return None + + if self.container.signed: + ret = nbt._bt_python_ctf_field_type_enumeration_get_mapping(self._ft, index) + else: + ret = nbt._bt_python_ctf_field_type_enumeration_get_mapping_unsigned(self._ft, index) + + if len(ret) != 3: + msg = "Could not get Enumeration mapping at index {}".format(i) + raise TypeError(msg) + + name, range_start, range_end = ret + + return EnumerationMapping(name, range_start, range_end) + + +class FloatFieldDeclaration(FieldDeclaration): + FLT_EXP_DIG = 8 + DBL_EXP_DIG = 11 + FLT_MANT_DIG = 24 + DBL_MANT_DIG = 53 + + def __init__(self): + """ + Create a new floating point field declaration. + """ + + self._ft = nbt._bt_ctf_field_type_floating_point_create() + super().__init__() + + @property + def exponent_digits(self): + """ + Get the number of exponent digits used to store the floating point field. + """ + + ret = nbt._bt_ctf_field_type_floating_point_get_exponent_digits(self._ft) + + if ret < 0: + raise TypeError( + "Could not get Floating point exponent digit count") + + return ret + + @exponent_digits.setter + def exponent_digits(self, exponent_digits): + """ + Set the number of exponent digits to use to store the floating point field. + The only values currently supported are FLT_EXP_DIG and DBL_EXP_DIG which + are defined as constants of this class. + """ + + ret = nbt._bt_ctf_field_type_floating_point_set_exponent_digits(self._ft, + exponent_digits) + + if ret < 0: + raise ValueError("Could not set exponent digit count.") + + @property + def mantissa_digits(self): + """ + Get the number of mantissa digits used to store the floating point field. + """ + + ret = nbt._bt_ctf_field_type_floating_point_get_mantissa_digits(self._ft) + + if ret < 0: + raise TypeError("Could not get Floating point mantissa digit count") + + return ret + + @mantissa_digits.setter + def mantissa_digits(self, mantissa_digits): + """ + Set the number of mantissa digits to use to store the floating point field. + The only values currently supported are FLT_MANT_DIG and DBL_MANT_DIG which + are defined as constants of this class. + """ + + ret = nbt._bt_ctf_field_type_floating_point_set_mantissa_digits(self._ft, + mantissa_digits) + + if ret < 0: + raise ValueError("Could not set mantissa digit count.") + + +class FloatingPointFieldDeclaration(FloatFieldDeclaration): + pass + + +class StructureFieldDeclaration(FieldDeclaration): + def __init__(self): + """ + Create a new structure field declaration. + """ + + self._ft = nbt._bt_ctf_field_type_structure_create() + super().__init__() + + def add_field(self, field_type, field_name): + """ + Add a field of type "field_type" to the structure. + """ + + ret = nbt._bt_ctf_field_type_structure_add_field(self._ft, + field_type._ft, + str(field_name)) + + if ret < 0: + raise ValueError("Could not add field to structure.") + + @property + def fields(self): + """ + Generator returning the structure's field as tuples of (field name, field declaration). + """ + + count = nbt._bt_ctf_field_type_structure_get_field_count(self._ft) + + if count < 0: + raise TypeError("Could not get Structure field count") + + for i in range(count): + field_name = nbt._bt_python_ctf_field_type_structure_get_field_name(self._ft, i) + + if field_name is None: + msg = "Could not get Structure field name at index {}".format(i) + raise TypeError(msg) + + field_type_native = nbt._bt_python_ctf_field_type_structure_get_field_type(self._ft, i) + + if field_type_native is None: + msg = "Could not get Structure field type at index {}".format(i) + raise TypeError(msg) + + field_type = FieldDeclaration._create_field_declaration_from_native_instance(field_type_native) + yield (field_name, field_type) + + def get_field_by_name(self, name): + """ + Get a field declaration by name (FieldDeclaration). + """ + + field_type_native = nbt._bt_ctf_field_type_structure_get_field_type_by_name(self._ft, name) + + if field_type_native is None: + msg = "Could not find Structure field with name {}".format(name) + raise TypeError(msg) + + return FieldDeclaration._create_field_declaration_from_native_instance(field_type_native) + + +class VariantFieldDeclaration(FieldDeclaration): + def __init__(self, enum_tag, tag_name): + """ + Create a new variant field declaration. + """ + + isinst = isinstance(enum_tag, EnumerationFieldDeclaration) + if enum_tag is None or not isinst: + raise TypeError("Invalid tag type; must be of type EnumerationFieldDeclaration.") + + self._ft = nbt._bt_ctf_field_type_variant_create(enum_tag._ft, + str(tag_name)) + super().__init__() + + @property + def tag_name(self): + """ + Get the variant's tag name. + """ + + ret = nbt._bt_ctf_field_type_variant_get_tag_name(self._ft) + + if ret is None: + raise TypeError("Could not get Variant tag name") + + return ret + + @property + def tag_type(self): + """ + Get the variant's tag type. + """ + + ret = nbt._bt_ctf_field_type_variant_get_tag_type(self._ft) + + if ret is None: + raise TypeError("Could not get Variant tag type") + + return FieldDeclaration._create_field_declaration_from_native_instance(ret) + + def add_field(self, field_type, field_name): + """ + Add a field of type "field_type" to the variant. + """ + + ret = nbt._bt_ctf_field_type_variant_add_field(self._ft, + field_type._ft, + str(field_name)) + + if ret < 0: + raise ValueError("Could not add field to variant.") + + @property + def fields(self): + """ + Generator returning the variant's field as tuples of (field name, field declaration). + """ + + count = nbt._bt_ctf_field_type_variant_get_field_count(self._ft) + + if count < 0: + raise TypeError("Could not get Variant field count") + + for i in range(count): + field_name = nbt._bt_python_ctf_field_type_variant_get_field_name(self._ft, i) + + if field_name is None: + msg = "Could not get Variant field name at index {}".format(i) + raise TypeError(msg) + + field_type_native = nbt._bt_python_ctf_field_type_variant_get_field_type(self._ft, i) + + if field_type_native is None: + msg = "Could not get Variant field type at index {}".format(i) + raise TypeError(msg) + + field_type = FieldDeclaration._create_field_declaration_from_native_instance(field_type_native) + yield (field_name, field_type) + + def get_field_by_name(self, name): + """ + Get a field declaration by name (FieldDeclaration). + """ + + field_type_native = nbt._bt_ctf_field_type_variant_get_field_type_by_name(self._ft, + name) + + if field_type_native is None: + msg = "Could not find Variant field with name {}".format(name) + raise TypeError(msg) + + return FieldDeclaration._create_field_declaration_from_native_instance(field_type_native) + + def get_field_from_tag(self, tag): + """ + Get a field declaration from tag (EnumerationField). + """ + + field_type_native = nbt._bt_ctf_field_type_variant_get_field_type_from_tag(self._ft, tag._f) + + if field_type_native is None: + msg = "Could not find Variant field with tag value {}".format(tag.value) + raise TypeError(msg) + + return FieldDeclaration._create_field_declaration_from_native_instance(field_type_native) + + +class ArrayFieldDeclaration(FieldDeclaration): + def __init__(self, element_type, length): + """ + Create a new array field declaration. + """ + + self._ft = nbt._bt_ctf_field_type_array_create(element_type._ft, + length) + super().__init__() + + @property + def element_type(self): + """ + Get the array's element type. + """ + + ret = nbt._bt_ctf_field_type_array_get_element_type(self._ft) + + if ret is None: + raise TypeError("Could not get Array element type") + + return FieldDeclaration._create_field_declaration_from_native_instance(ret) + + @property + def length(self): + """ + Get the array's length. + """ + + ret = nbt._bt_ctf_field_type_array_get_length(self._ft) + + if ret < 0: + raise TypeError("Could not get Array length") + + return ret + + +class SequenceFieldDeclaration(FieldDeclaration): + def __init__(self, element_type, length_field_name): + """ + Create a new sequence field declaration. + """ + + self._ft = nbt._bt_ctf_field_type_sequence_create(element_type._ft, + str(length_field_name)) + super().__init__() + + @property + def element_type(self): + """ + Get the sequence's element type. + """ + + ret = nbt._bt_ctf_field_type_sequence_get_element_type(self._ft) + + if ret is None: + raise TypeError("Could not get Sequence element type") + + return FieldDeclaration._create_field_declaration_from_native_instance(ret) + + @property + def length_field_name(self): + """ + Get the sequence's length field name. + """ + + ret = nbt._bt_ctf_field_type_sequence_get_length_field_name(self._ft) + + if ret is None: + raise TypeError("Could not get Sequence length field name") + + return ret + + +class StringFieldDeclaration(FieldDeclaration): + def __init__(self): + """ + Create a new string field declaration. + """ + + self._ft = nbt._bt_ctf_field_type_string_create() + super().__init__() + + @property + def encoding(self): + """ + Get a string declaration's encoding (a constant from the CTFStringEncoding class). + """ + + return nbt._bt_ctf_field_type_string_get_encoding(self._ft) + + @encoding.setter + def encoding(self, encoding): + """ + Set a string declaration's encoding. Must be a constant from the CTFStringEncoding class. + """ + + ret = nbt._bt_ctf_field_type_string_set_encoding(self._ft, encoding) + if ret < 0: + raise ValueError("Could not set string encoding.") + + +@staticmethod +def create_field(field_type): + """ + Create an instance of a field. + """ + isinst = isinstance(field_type, FieldDeclaration) + + if field_type is None or not isinst: + raise TypeError("Invalid field_type. Type must be a FieldDeclaration-derived class.") + + if isinstance(field_type, IntegerFieldDeclaration): + return IntegerField(field_type) + elif isinstance(field_type, EnumerationFieldDeclaration): + return EnumerationField(field_type) + elif isinstance(field_type, FloatFieldDeclaration): + return FloatingPointField(field_type) + elif isinstance(field_type, StructureFieldDeclaration): + return StructureField(field_type) + elif isinstance(field_type, VariantFieldDeclaration): + return VariantField(field_type) + elif isinstance(field_type, ArrayFieldDeclaration): + return ArrayField(field_type) + elif isinstance(field_type, SequenceFieldDeclaration): + return SequenceField(field_type) + elif isinstance(field_type, StringFieldDeclaration): + return StringField(field_type) + + +class Field: + """ + Base class, do not instantiate. + """ + + def __init__(self, field_type): + if not isinstance(field_type, FieldDeclaration): + raise TypeError("Invalid field_type argument.") + + self._f = nbt._bt_ctf_field_create(field_type._ft) + + if self._f is None: + raise ValueError("Field creation failed.") + + def __del__(self): + nbt._bt_ctf_field_put(self._f) + + @staticmethod + def _create_field_from_native_instance(native_field_instance): + type_dict = { + common.CTFTypeId.INTEGER: IntegerField, + common.CTFTypeId.FLOAT: FloatingPointField, + common.CTFTypeId.ENUM: EnumerationField, + common.CTFTypeId.STRING: StringField, + common.CTFTypeId.STRUCT: StructureField, + common.CTFTypeId.VARIANT: VariantField, + common.CTFTypeId.ARRAY: ArrayField, + common.CTFTypeId.SEQUENCE: SequenceField + } + + field_type = nbt._bt_python_get_field_type(native_field_instance) + + if field_type == common.CTFTypeId.UNKNOWN: + raise TypeError("Invalid field instance") + + field = Field.__new__(Field) + field._f = native_field_instance + field.__class__ = type_dict[field_type] + + return field + + @property + def declaration(self): + native_field_type = nbt._bt_ctf_field_get_type(self._f) + + if native_field_type is None: + raise TypeError("Invalid field instance") + return FieldDeclaration._create_field_declaration_from_native_instance( + native_field_type) + + +class IntegerField(Field): + @property + def value(self): + """ + Get an integer field's value. + """ + + signedness = nbt._bt_python_field_integer_get_signedness(self._f) + + if signedness < 0: + raise TypeError("Invalid integer instance.") + + if signedness == 0: + ret, value = nbt._bt_ctf_field_unsigned_integer_get_value(self._f) + else: + ret, value = nbt._bt_ctf_field_signed_integer_get_value(self._f) + + if ret < 0: + raise ValueError("Could not get integer field value.") + + return value + + @value.setter + def value(self, value): + """ + Set an integer field's value. + """ + + if not isinstance(value, int): + raise TypeError("IntegerField's value must be an int") + + signedness = nbt._bt_python_field_integer_get_signedness(self._f) + if signedness < 0: + raise TypeError("Invalid integer instance.") + + if signedness == 0: + ret = nbt._bt_ctf_field_unsigned_integer_set_value(self._f, value) + else: + ret = nbt._bt_ctf_field_signed_integer_set_value(self._f, value) + + if ret < 0: + raise ValueError("Could not set integer field value.") + + +class EnumerationField(Field): + @property + def container(self): + """ + Return the enumeration's underlying container field (an integer field). + """ + + container = IntegerField.__new__(IntegerField) + container._f = nbt._bt_ctf_field_enumeration_get_container(self._f) + + if container._f is None: + raise TypeError("Invalid enumeration field type.") + + return container + + @property + def value(self): + """ + Get the enumeration field's mapping name. + """ + + value = nbt._bt_ctf_field_enumeration_get_mapping_name(self._f) + + if value is None: + raise ValueError("Could not get enumeration's mapping name.") + + return value + + @value.setter + def value(self, value): + """ + Set the enumeration field's value. Must be an integer as mapping names + may be ambiguous. + """ + + if not isinstance(value, int): + raise TypeError("EnumerationField value must be an int") + + self.container.value = value + + +class FloatingPointField(Field): + @property + def value(self): + """ + Get a floating point field's value. + """ + + ret, value = nbt._bt_ctf_field_floating_point_get_value(self._f) + + if ret < 0: + raise ValueError("Could not get floating point field value.") + + return value + + @value.setter + def value(self, value): + """ + Set a floating point field's value. + """ + + if not isinstance(value, int) and not isinstance(value, float): + raise TypeError("Value must be either a float or an int") + + ret = nbt._bt_ctf_field_floating_point_set_value(self._f, float(value)) + + if ret < 0: + raise ValueError("Could not set floating point field value.") + + +# oops!! This class is provided to ensure backward-compatibility since +# a stable release publicly exposed this abomination. +class FloatFieldingPoint(FloatingPointField): + pass + + +class StructureField(Field): + def field(self, field_name): + """ + Get the structure's field corresponding to the provided field name. + """ + + native_instance = nbt._bt_ctf_field_structure_get_field(self._f, + str(field_name)) + + if native_instance is None: + raise ValueError("Invalid field_name provided.") + + return Field._create_field_from_native_instance(native_instance) + + +class VariantField(Field): + def field(self, tag): + """ + Return the variant's selected field. The "tag" field is the selector enum field. + """ + + native_instance = nbt._bt_ctf_field_variant_get_field(self._f, tag._f) + + if native_instance is None: + raise ValueError("Invalid tag provided.") + + return Field._create_field_from_native_instance(native_instance) + + +class ArrayField(Field): + def field(self, index): + """ + Return the array's field at position "index". + """ + + native_instance = nbt._bt_ctf_field_array_get_field(self._f, index) + + if native_instance is None: + raise IndexError("Invalid index provided.") + + return Field._create_field_from_native_instance(native_instance) + + +class SequenceField(Field): + @property + def length(self): + """ + Get the sequence's length field (IntegerField). + """ + + native_instance = nbt._bt_ctf_field_sequence_get_length(self._f) + + if native_instance is None: + length = -1 + + return Field._create_field_from_native_instance(native_instance) + + @length.setter + def length(self, length_field): + """ + Set the sequence's length field (IntegerField). + """ + + if not isinstance(length_field, IntegerField): + raise TypeError("Invalid length field.") + + if length_field.declaration.signed: + raise TypeError("Sequence field length must be unsigned") + + ret = nbt._bt_ctf_field_sequence_set_length(self._f, length_field._f) + + if ret < 0: + raise ValueError("Could not set sequence length.") + + def field(self, index): + """ + Return the sequence's field at position "index". + """ + + native_instance = nbt._bt_ctf_field_sequence_get_field(self._f, index) + + if native_instance is None: + raise ValueError("Could not get sequence element at index.") + + return Field._create_field_from_native_instance(native_instance) + + +class StringField(Field): + @property + def value(self): + """ + Get a string field's value. + """ + + return nbt._bt_ctf_field_string_get_value(self._f) + + @value.setter + def value(self, value): + """ + Set a string field's value. + """ + + ret = nbt._bt_ctf_field_string_set_value(self._f, str(value)) + + if ret < 0: + raise ValueError("Could not set string field value.") + + +class EventClass: + def __init__(self, name): + """ + Create a new event class of the given name. + """ + + self._ec = nbt._bt_ctf_event_class_create(name) + + if self._ec is None: + raise ValueError("Event class creation failed.") + + def __del__(self): + nbt._bt_ctf_event_class_put(self._ec) + + def add_field(self, field_type, field_name): + """ + Add a field of type "field_type" to the event class. + """ + + ret = nbt._bt_ctf_event_class_add_field(self._ec, field_type._ft, + str(field_name)) + + if ret < 0: + raise ValueError("Could not add field to event class.") + + @property + def name(self): + """ + Get the event class' name. + """ + + name = nbt._bt_ctf_event_class_get_name(self._ec) + + if name is None: + raise TypeError("Could not get EventClass name") + + return name + + @property + def id(self): + """ + Get the event class' id. Returns a negative value if unset. + """ + + id = nbt._bt_ctf_event_class_get_id(self._ec) + + if id < 0: + raise TypeError("Could not get EventClass id") + + return id + + @id.setter + def id(self, id): + """ + Set the event class' id. Throws a TypeError if the event class + is already registered to a stream class. + """ + + ret = nbt._bt_ctf_event_class_set_id(self._ec, id) + + if ret < 0: + raise TypeError("Can't change an Event Class's id after it has been assigned to a stream class") + + @property + def stream_class(self): + """ + Get the event class' stream class. Returns None if unset. + """ + stream_class_native = nbt._bt_ctf_event_class_get_stream_class(self._ec) + + if stream_class_native is None: + return None + + stream_class = StreamClass.__new__(StreamClass) + stream_class._sc = stream_class_native + + return stream_class + + @property + def fields(self): + """ + Generator returning the event class' fields as tuples of (field name, field declaration). + """ + + count = nbt._bt_ctf_event_class_get_field_count(self._ec) + + if count < 0: + raise TypeError("Could not get EventClass' field count") + + for i in range(count): + field_name = nbt._bt_python_ctf_event_class_get_field_name(self._ec, i) + + if field_name is None: + msg = "Could not get EventClass' field name at index {}".format(i) + raise TypeError(msg) + + field_type_native = nbt._bt_python_ctf_event_class_get_field_type(self._ec, i) + + if field_type_native is None: + msg = "Could not get EventClass' field type at index {}".format(i) + raise TypeError(msg) + + field_type = FieldDeclaration._create_field_declaration_from_native_instance(field_type_native) + yield (field_name, field_type) + + def get_field_by_name(self, name): + """ + Get a field declaration by name (FieldDeclaration). + """ + + field_type_native = nbt._bt_ctf_event_class_get_field_by_name(self._ec, name) + + if field_type_native is None: + msg = "Could not find EventClass field with name {}".format(name) + raise TypeError(msg) + + return FieldDeclaration._create_field_declaration_from_native_instance(field_type_native) + + +class Event: + def __init__(self, event_class): + """ + Create a new event of the given event class. + """ + + if not isinstance(event_class, EventClass): + raise TypeError("Invalid event_class argument.") + + self._e = nbt._bt_ctf_event_create(event_class._ec) + + if self._e is None: + raise ValueError("Event creation failed.") + + def __del__(self): + nbt._bt_ctf_event_put(self._e) + + @property + def event_class(self): + """ + Get the event's class. + """ + + event_class_native = nbt._bt_ctf_event_get_class(self._e) + + if event_class_native is None: + return None + + event_class = EventClass.__new__(EventClass) + event_class._ec = event_class_native + + return event_class + + def clock(self): + """ + Get a clock from event. Returns None if the event's class + is not registered to a stream class. + """ + + clock_instance = nbt._bt_ctf_event_get_clock(self._e) + + if clock_instance is None: + return None + + clock = Clock.__new__(Clock) + clock._c = clock_instance + + return clock + + def payload(self, field_name): + """ + Get a field from event. + """ + + native_instance = nbt._bt_ctf_event_get_payload(self._e, + str(field_name)) + + if native_instance is None: + raise ValueError("Could not get event payload.") + + return Field._create_field_from_native_instance(native_instance) + + def set_payload(self, field_name, value_field): + """ + Set a manually created field as an event's payload. + """ + + if not isinstance(value, Field): + raise TypeError("Invalid value type.") + + ret = nbt._bt_ctf_event_set_payload(self._e, str(field_name), + value_field._f) + + if ret < 0: + raise ValueError("Could not set event field payload.") + + +class StreamClass: + def __init__(self, name): + """ + Create a new stream class of the given name. + """ + + self._sc = nbt._bt_ctf_stream_class_create(name) + + if self._sc is None: + raise ValueError("Stream class creation failed.") + + def __del__(self): + nbt._bt_ctf_stream_class_put(self._sc) + + @property + def name(self): + """ + Get a stream class' name. + """ + + name = nbt._bt_ctf_stream_class_get_name(self._sc) + + if name is None: + raise TypeError("Could not get StreamClass name") + + return name + + @property + def clock(self): + """ + Get a stream class' clock. + """ + + clock_instance = nbt._bt_ctf_stream_class_get_clock(self._sc) + + if clock_instance is None: + return None + + clock = Clock.__new__(Clock) + clock._c = clock_instance + + return clock + + @clock.setter + def clock(self, clock): + """ + Assign a clock to a stream class. + """ + + if not isinstance(clock, Clock): + raise TypeError("Invalid clock type.") + + ret = nbt._bt_ctf_stream_class_set_clock(self._sc, clock._c) + + if ret < 0: + raise ValueError("Could not set stream class clock.") + + @property + def id(self): + """ + Get a stream class' id. + """ + + ret = nbt._bt_ctf_stream_class_get_id(self._sc) + + if ret < 0: + raise TypeError("Could not get StreamClass id") + + return ret + + @id.setter + def id(self, id): + """ + Assign an id to a stream class. + """ + + ret = nbt._bt_ctf_stream_class_set_id(self._sc, id) + + if ret < 0: + raise TypeError("Could not set stream class id.") + + @property + def event_classes(self): + """ + Generator returning the stream class' event classes. + """ + + count = nbt._bt_ctf_stream_class_get_event_class_count(self._sc) + + if count < 0: + raise TypeError("Could not get StreamClass' event class count") + + for i in range(count): + event_class_native = nbt._bt_ctf_stream_class_get_event_class(self._sc, i) + + if event_class_native is None: + msg = "Could not get StreamClass' event class at index {}".format(i) + raise TypeError(msg) + + event_class = EventClass.__new__(EventClass) + event_class._ec = event_class_native + yield event_class + + def add_event_class(self, event_class): + """ + Add an event class to a stream class. New events can be added even after a + stream has been instantiated and events have been appended. However, a stream + will not accept events of a class that has not been added to the stream + class beforehand. + """ + + if not isinstance(event_class, EventClass): + raise TypeError("Invalid event_class type.") + + ret = nbt._bt_ctf_stream_class_add_event_class(self._sc, + event_class._ec) + + if ret < 0: + raise ValueError("Could not add event class.") + + @property + def packet_context_type(self): + """ + Get the StreamClass' packet context type (StructureFieldDeclaration) + """ + + field_type_native = nbt._bt_ctf_stream_class_get_packet_context_type(self._sc) + + if field_type_native is None: + raise ValueError("Invalid StreamClass") + + field_type = FieldDeclaration._create_field_declaration_from_native_instance(field_type_native) + + return field_type + + @packet_context_type.setter + def packet_context_type(self, field_type): + """ + Set a StreamClass' packet context type. Must be of type + StructureFieldDeclaration. + """ + + if not isinstance(field_type, StructureFieldDeclaration): + raise TypeError("field_type argument must be of type StructureFieldDeclaration.") + + ret = nbt._bt_ctf_stream_class_set_packet_context_type(self._sc, + field_type._ft) + + if ret < 0: + raise ValueError("Failed to set packet context type.") + + +class Stream: + def __init__(self): + raise NotImplementedError("Stream cannot be instantiated; use Writer.create_stream()") + + def __del__(self): + nbt._bt_ctf_stream_put(self._s) + + @property + def discarded_events(self): + """ + Get a stream's discarded event count. + """ + + ret, count = nbt._bt_ctf_stream_get_discarded_events_count(self._s) + + if ret < 0: + raise ValueError("Could not get the stream's discarded events count") + + return count + + def append_discarded_events(self, event_count): + """ + Increase the current packet's discarded event count. + """ + + nbt._bt_ctf_stream_append_discarded_events(self._s, event_count) + + def append_event(self, event): + """ + Append "event" to the stream's current packet. The stream's associated clock + will be sampled during this call. The event shall not be modified after + being appended to a stream. + """ + + ret = nbt._bt_ctf_stream_append_event(self._s, event._e) + + if ret < 0: + raise ValueError("Could not append event to stream.") + + @property + def packet_context(self): + """ + Get a Stream's packet context field (a StructureField). + """ + + native_field = nbt._bt_ctf_stream_get_packet_context(self._s) + + if native_field is None: + raise ValueError("Invalid Stream.") + + return Field._create_field_from_native_instance(native_field) + + @packet_context.setter + def packet_context(self, field): + """ + Set a Stream's packet context field (must be a StructureField). + """ + + if not isinstance(field, StructureField): + raise TypeError("Argument field must be of type StructureField") + + ret = nbt._bt_ctf_stream_set_packet_context(self._s, field._f) + + if ret < 0: + raise ValueError("Invalid packet context field.") + + def flush(self): + """ + The stream's current packet's events will be flushed to disk. Events + subsequently appended to the stream will be added to a new packet. + """ + + ret = nbt._bt_ctf_stream_flush(self._s) + + if ret < 0: + raise ValueError("Could not flush stream.") + + +class Writer: + def __init__(self, path): + """ + Create a new writer that will produce a trace in the given path. + """ + + self._w = nbt._bt_ctf_writer_create(path) + + if self._w is None: + raise ValueError("Writer creation failed.") + + def __del__(self): + nbt._bt_ctf_writer_put(self._w) + + def create_stream(self, stream_class): + """ + Create a new stream instance and register it to the writer. + """ + + if not isinstance(stream_class, StreamClass): + raise TypeError("Invalid stream_class type.") + + stream = Stream.__new__(Stream) + stream._s = nbt._bt_ctf_writer_create_stream(self._w, stream_class._sc) + + return stream + + def add_environment_field(self, name, value): + """ + Add an environment field to the trace. + """ + + ret = nbt._bt_ctf_writer_add_environment_field(self._w, str(name), + str(value)) + + if ret < 0: + raise ValueError("Could not add environment field to trace.") + + def add_clock(self, clock): + """ + Add a clock to the trace. Clocks assigned to stream classes must be + registered to the writer. + """ + + ret = nbt._bt_ctf_writer_add_clock(self._w, clock._c) + + if ret < 0: + raise ValueError("Could not add clock to Writer.") + + @property + def metadata(self): + """ + Get the trace's TSDL meta-data. + """ + + return nbt._bt_ctf_writer_get_metadata_string(self._w) + + def flush_metadata(self): + """ + Flush the trace's metadata to the metadata file. + """ + + nbt._bt_ctf_writer_flush_metadata(self._w) + + @property + def byte_order(self): + """ + Get the trace's byte order. Must be a constant from the ByteOrder + class. + """ + + raise NotImplementedError("Getter not implemented.") + + @byte_order.setter + def byte_order(self, byte_order): + """ + Set the trace's byte order. Must be a constant from the ByteOrder + class. Defaults to the host machine's endianness + """ + + ret = nbt._bt_ctf_writer_set_byte_order(self._w, byte_order) + + if ret < 0: + raise ValueError("Could not set trace's byte order.") -- 2.34.1