raise bt2.Stop
elif status == native_bt.SELF_COMPONENT_STATUS_AGAIN:
raise bt2.TryAgain
- elif status == native_bt.SELF_COMPONENT_STATUS_REFUSE_PORT_CONNECTION:
- raise bt2.PortConnectionRefused
elif status < 0:
raise bt2.Error(gen_error_msg)
return results_addr
- def _query(cls, query_executor, obj, params):
+ def _query(cls, query_executor, obj, params, log_level):
raise NotImplementedError
def _component_class_ptr(self):
def _finalize(self):
pass
- def _accept_port_connection(self, port, other_port):
- return True
-
- def _accept_port_connection_from_native(self, self_port_ptr, self_port_type, other_port_ptr):
- port = bt2.port._create_self_from_ptr_and_get_ref(
- self_port_ptr, self_port_type)
-
- if self_port_type == native_bt.PORT_TYPE_OUTPUT:
- other_port_type = native_bt.PORT_TYPE_INPUT
- else:
- other_port_type = native_bt.PORT_TYPE_OUTPUT
-
- other_port = bt2.port._create_from_ptr_and_get_ref(
- other_port_ptr, other_port_type)
- res = self._accept_port_connection(port, other_port_ptr)
-
- if type(res) is not bool:
- raise TypeError("'{}' is not a 'bool' object")
-
- return res
-
def _port_connected(self, port, other_port):
pass