def help(self):
return native_bt.component_class_get_help(self._ptr)
- def query(self, obj, params=None):
- return _query(self._ptr, obj, params)
-
def __eq__(self, other):
if not isinstance(other, _GenericComponentClass):
try:
assert(port_ptr)
if comp_ports._is_private:
- port_pub_ptr = native_bt.port_from_private_port(port_ptr)
+ port_pub_ptr = native_bt.port_from_private(port_ptr)
name = native_bt.port_get_name(port_pub_ptr)
native_bt.put(port_pub_ptr)
else:
def __len__(self):
if self._is_private:
- pub_ptr = native_bt.component_from_private_component(self._component._ptr)
+ pub_ptr = native_bt.component_from_private(self._component._ptr)
count = self._get_port_count_fn(pub_ptr)
native_bt.put(pub_ptr)
else:
return '\n'.join(trimmed)
-def _query(comp_cls_ptr, obj, params):
- utils._check_str(obj)
-
- if params is None:
- params_ptr = native_bt.value_null
- else:
- params = bt2.create_value(params)
- params_ptr = params._ptr
-
- results_ptr = native_bt.component_class_query(comp_cls_ptr, obj,
- params_ptr)
-
- if results_ptr is None:
- raise bt2.Error('cannot query info with object "{}"'.format(obj))
-
- return bt2.values._create_from_ptr(results_ptr)
-
-
# Metaclass for component classes defined by Python code.
#
# The Python user can create a standard Python class which inherits one
def addr(cls):
return int(cls._cc_ptr)
- def query(cls, obj, params=None):
- return _query(cls._cc_ptr, obj, params)
-
- def _query_from_native(cls, obj, params_ptr):
+ def _query_from_native(cls, query_exec_ptr, obj, params_ptr):
# this can raise, in which case the native call to
# bt_component_class_query() returns NULL
if params_ptr is not None:
else:
params = None
- try:
- results = cls._query(obj, params)
- except:
- if not _NO_PRINT_TRACEBACK:
- traceback.print_exc()
+ native_bt.get(query_exec_ptr)
+ query_exec = bt2.QueryExecutor._create_from_ptr(query_exec_ptr)
- return
+ # this can raise, but the native side checks the exception
+ results = cls._query(query_exec, obj, params)
if results is NotImplemented:
return results
- try:
- results = bt2.create_value(results)
- except:
- if not _NO_PRINT_TRACEBACK:
- traceback.print_exc()
-
- return
+ # this can raise, but the native side checks the exception
+ results = bt2.create_value(results)
if results is None:
results_addr = int(native_bt.value_null)
return results_addr
- @classmethod
- def _query(cls, obj, params):
+ def _query(cls, query_executor, obj, params):
# BT catches this and returns NULL to the user
return NotImplemented
class _UserComponent(metaclass=_UserComponentType):
@property
def name(self):
- pub_ptr = native_bt.component_from_private_component(self._ptr)
+ pub_ptr = native_bt.component_from_private(self._ptr)
name = native_bt.component_get_name(pub_ptr)
native_bt.put(pub_ptr)
assert(name is not None)
@property
def graph(self):
- pub_ptr = native_bt.component_from_private_component(self._ptr)
+ pub_ptr = native_bt.component_from_private(self._ptr)
ptr = native_bt.component_get_graph(pub_ptr)
native_bt.put(pub_ptr)
assert(ptr)
@property
def component_class(self):
- pub_ptr = native_bt.component_from_private_component(self._ptr)
+ pub_ptr = native_bt.component_from_private(self._ptr)
cc_ptr = native_bt.component_get_class(pub_ptr)
native_bt.put(pub_ptr)
assert(cc_ptr)
other_port = bt2.port._create_from_ptr(other_port_ptr)
try:
- self._port_connected(port, other_port_ptr)
+ self._port_connected(port, other_port)
except:
if not _NO_PRINT_TRACEBACK:
traceback.print_exc()