utils._check_uint64(mip_version)
if mip_version > bt2.get_maximal_mip_version():
- raise ValueError('unknown MIP version {}'.format(mip_version))
+ raise ValueError("unknown MIP version {}".format(mip_version))
ptr = native_bt.graph_create(mip_version)
if ptr is None:
- raise bt2._MemoryError('cannot create graph object')
+ raise bt2._MemoryError("cannot create graph object")
super().__init__(ptr)
base_cc_ptr = component_class._bt_component_class_ptr()
if obj is not None and not native_bt.bt2_is_python_component_class(base_cc_ptr):
- raise ValueError('cannot pass a Python object to a non-Python component')
+ raise ValueError("cannot pass a Python object to a non-Python component")
if params is not None and not isinstance(params, (dict, bt2.MapValue)):
raise TypeError("'params' parameter is not a 'dict' or a 'bt2.MapValue'.")
status, comp_ptr = add_fn(
self._ptr, cc_ptr, name, params_ptr, obj, logging_level
)
- utils._handle_func_status(status, 'cannot add component to graph')
+ utils._handle_func_status(status, "cannot add component to graph")
assert comp_ptr
return bt2_component._create_component_from_const_ptr_and_get_ref(
comp_ptr, cc_type
status, conn_ptr = native_bt.graph_connect_ports(
self._ptr, upstream_port._ptr, downstream_port._ptr
)
- utils._handle_func_status(status, 'cannot connect component ports within graph')
+ utils._handle_func_status(status, "cannot connect component ports within graph")
assert conn_ptr
return bt2_connection._ConnectionConst._create_from_ptr_and_get_ref(conn_ptr)
listener_ids = fn(self._ptr, listener_from_native)
if listener_ids is None:
- raise bt2._Error('cannot add listener to graph object')
+ raise bt2._Error("cannot add listener to graph object")
# keep the partial's reference
self._listener_partials.append(listener_from_native)
def run_once(self):
status = native_bt.graph_run_once(self._ptr)
- utils._handle_func_status(status, 'graph object could not run once')
+ utils._handle_func_status(status, "graph object could not run once")
def run(self):
status = native_bt.graph_run(self._ptr)
- utils._handle_func_status(status, 'graph object stopped running')
+ utils._handle_func_status(status, "graph object stopped running")
def add_interrupter(self, interrupter):
utils._check_type(interrupter, bt2_interrupter.Interrupter)