# THE SOFTWARE.
from bt2 import native_bt, object, utils
-import bt2.component
-import bt2.logging
+from bt2 import interrupter as bt2_interrupter
+from bt2 import component as bt2_component
+from bt2 import logging as bt2_logging
+from bt2 import value as bt2_value
import bt2
-class QueryExecutor(object._SharedObject):
- _get_ref = staticmethod(native_bt.query_executor_get_ref)
- _put_ref = staticmethod(native_bt.query_executor_put_ref)
+class _QueryExecutorCommon:
+ @property
+ def _common_ptr(self):
+ return self._as_query_executor_ptr()
- def __init__(self):
- ptr = native_bt.query_executor_create()
+ @property
+ def is_interrupted(self):
+ is_interrupted = native_bt.query_executor_is_interrupted(self._common_ptr)
+ return bool(is_interrupted)
- if ptr is None:
- raise bt2._MemoryError('cannot create query executor object')
+ @property
+ def logging_level(self):
+ return native_bt.query_executor_get_logging_level(self._common_ptr)
- super().__init__(ptr)
- def cancel(self):
- status = native_bt.query_executor_cancel(self._ptr)
- utils._handle_func_status(status, 'cannot cancel query executor object')
+class QueryExecutor(object._SharedObject, _QueryExecutorCommon):
+ _get_ref = staticmethod(native_bt.query_executor_get_ref)
+ _put_ref = staticmethod(native_bt.query_executor_put_ref)
- @property
- def is_canceled(self):
- is_canceled = native_bt.query_executor_is_canceled(self._ptr)
- assert is_canceled >= 0
- return is_canceled > 0
-
- def query(
- self,
- component_class,
- object,
- params=None,
- logging_level=bt2.logging.LoggingLevel.NONE,
- ):
- if self.is_canceled:
- raise bt2.Canceled
-
- if not isinstance(component_class, bt2.component._GenericComponentClass):
+ def _as_query_executor_ptr(self):
+ return self._ptr
+
+ def __init__(self, component_class, object, params=None):
+ if not isinstance(component_class, bt2_component._ComponentClass):
err = False
try:
- if not issubclass(component_class, bt2.component._UserComponent):
+ if not issubclass(component_class, bt2_component._UserComponent):
err = True
except TypeError:
err = True
params = bt2.create_value(params)
params_ptr = params._ptr
- utils._check_log_level(logging_level)
cc_ptr = component_class._bt_component_class_ptr()
+ assert cc_ptr is not None
+ ptr = native_bt.query_executor_create(cc_ptr, object, params_ptr)
+
+ if ptr is None:
+ raise bt2._MemoryError('cannot create query executor object')
+
+ super().__init__(ptr)
+
+ def add_interrupter(self, interrupter):
+ utils._check_type(interrupter, bt2_interrupter.Interrupter)
+ native_bt.query_executor_add_interrupter(self._ptr, interrupter._ptr)
+
+ def interrupt(self):
+ native_bt.query_executor_interrupt(self._ptr)
+
+ def _set_logging_level(self, log_level):
+ utils._check_log_level(log_level)
+ status = native_bt.query_executor_set_logging_level(self._ptr, log_level)
+ utils._handle_func_status(status, "cannot set query executor's logging level")
+
+ logging_level = property(
+ fget=_QueryExecutorCommon.logging_level, fset=_set_logging_level
+ )
- status, result_ptr = native_bt.query_executor_query(
- self._ptr, cc_ptr, object, params_ptr, logging_level
- )
+ @property
+ def is_interrupted(self):
+ is_interrupted = native_bt.query_executor_is_interrupted(self._ptr)
+ return bool(is_interrupted)
+
+ def query(self):
+ status, result_ptr = native_bt.query_executor_query(self._ptr)
utils._handle_func_status(status, 'cannot query component class')
- assert result_ptr
- return bt2.value._create_from_ptr(result_ptr)
+ assert result_ptr is not None
+ return bt2_value._create_from_ptr(result_ptr)
+
+
+class _PrivateQueryExecutor(_QueryExecutorCommon):
+ def __init__(self, ptr):
+ self._ptr = ptr
+
+ def _check_validity(self):
+ if self._ptr is None:
+ raise RuntimeError('this object is not valid anymore')
+
+ def _as_query_executor_ptr(self):
+ self._check_validity()
+ return native_bt.private_query_executor_as_query_executor_const(self._ptr)
+
+ def _invalidate(self):
+ self._ptr = None