From 9b4f9b425f2efce9a6ccc25f7ae062ebc1116a7d Mon Sep 17 00:00:00 2001 From: Philippe Proulx Date: Sat, 20 Jul 2019 21:30:30 -0400 Subject: [PATCH] lib: add bt_{graph,query_executor}_add_interrupter() This patch makes it possible to add interrupter objects to a graph or to a query executor. To do this, you need to create an interrupter object (bt_interrupter_create()), add it to a graph or to a query executor with the new bt_graph_add_interrupter() and bt_query_executor_add_interrupter() functions. Then you can interrupt the object with bt_interrupter_set(). Within the whole project, the "cancel" terminology is renamed to "interrupt", as many times it is possible to resume an interrupted operation. For example, if you interrupt a running graph and bt_graph_run() returns `BT_GRAPH_RUN_STATUS_AGAIN`, then you can resume the graph's operation by calling bt_graph_run() again. A graph or a query executor is deemed interrupted when any of its interrupters is set. This makes it possible, for example, to add both a global interrupter and a thread-specific interrupter to a graph object so that either the thread-specific action or a global signal handler can interrupt the graph. As a convenience, bt_graph_interrupt() (which was bt_graph_cancel()) and bt_query_executor_interrupt() (which was bt_query_executor_cancel()) are kept: they set a default interrupter which is always part of the object's interrupter set. Conceptually, when a graph runs, each execution context is considered independently interruptible: * The execution of a message iterator. * The execution of a sink component. * The execution of the bt_graph_run() loop itself. bt_graph_add_interrupter() conceptually adds the given interrupter to: * All the graph's message iterators, current and future. * All the graph's sink components. * Itself, for bt_graph_run(). This is needed because message iterators typically won't check if they are interrupted without performing "long" tasks. Because we cannot guarantee that, bt_graph_run() checks if it's interrupted for each sink consuming iteration. This is why bt_graph_is_canceled() and bt_component_graph_is_canceled() do not exist anymore, while the new bt_self_message_iterator_is_interrupted() and bt_self_component_sink_is_interrupted() functions are introduced. However, because we don't need per-message iterator or per-sink component interruption yet, bt_self_message_iterator_is_interrupted() and bt_self_component_sink_is_interrupted() simply check their graph's interrupters directly. The two functions exist so that, in the future, we can add interrupters to a single message iterator (and its upstream message iterators) or to a single sink component if needed. As of this patch, you cannot remove an interrupter from a graph or from a query executor as the project does not need this currently. A message iterator, a sink component, a graph, or a query executor never returns an "interrupted" status. It is the job of the actor which checks the interruption status to return an appropriate status: * Whenever possible, returning an "again" status is the cleanest approach: this indicates that the operation can resume later, even if it was interrupted. * Otherwise, return an error status. The distinction between a "real" "again" status and an "again" status caused by an interruption is easy to make from the side controlling the interrupter: check the interrupter's state to discover it. For example (Python): while True: try: my_graph.run() except bt2.TryAgain: if my_interrupter: print('interrupted by user') sys.exit(1) else: time.sleep(.1) The same should be checked when getting an error, as an error can be the consequence of an interruption (this is what `src.ctf.lttng-live` does currently as it cannot safely interrupt some network interchanges and resume them later). The CLI is changed to have a single, global interrupter object. This interrupter is added to any query executor or graph. The `SIGINT` signal handler sets this interrupter. When getting an "again" or an error status, the CLI checks the interrupter's state to discover if this is the result of a user action, for example: case BT_GRAPH_RUN_STATUS_AGAIN: if (bt_interrupter_is_set(the_interrupter)) { BT_CLI_LOGW_APPEND_CAUSE("Graph was interrupted by user."); goto error; } /* ... */ In the `bt2` Python package: * The `bt2.Canceled` exception is removed as it's not needed anymore. The tests are adapted to demonstrate that. * Graph.cancel() is changed to Graph.interrupt(). * Graph.is_canceled() is removed. * There's a new Graph.add_interrupter() method. * QueryExecutor.cancel() is changed to QueryExecutor.interrupt(). * There's a new QueryExecutor.add_interrupter() method. * There's a new protected _UserMessageIterator._is_interrupted() method. * There's a new protected _UserSinkComponent._is_interrupted() method. Signed-off-by: Philippe Proulx Change-Id: I4d6631d39b585bd440457e7fea53a939ec9aaf3a Reviewed-on: https://review.lttng.org/c/babeltrace/+/1735 Tested-by: jenkins --- CONTRIBUTING.adoc | 2 +- include/babeltrace2/babeltrace.h | 14 +- include/babeltrace2/func-status.h | 10 +- include/babeltrace2/graph/component-const.h | 3 - include/babeltrace2/graph/graph-const.h | 2 - include/babeltrace2/graph/graph.h | 15 +- .../babeltrace2/graph/query-executor-const.h | 9 +- include/babeltrace2/graph/query-executor.h | 16 ++- .../babeltrace2/graph/self-component-sink.h | 3 + .../babeltrace2/graph/self-message-iterator.h | 3 + include/babeltrace2/value-const.h | 2 +- include/babeltrace2/value.h | 2 +- src/bindings/python/bt2/bt2/__init__.py.in | 4 - src/bindings/python/bt2/bt2/component.py | 4 + src/bindings/python/bt2/bt2/graph.py | 18 +-- .../python/bt2/bt2/message_iterator.py | 4 + .../bt2/bt2/native_bt_component_class.i | 2 - src/bindings/python/bt2/bt2/query_executor.py | 20 +-- src/bindings/python/bt2/bt2/utils.py | 5 - src/cli/babeltrace2.c | 130 +++++++++--------- src/common/common.h | 4 +- src/lib/func-status.h | 14 +- src/lib/graph/component-sink.c | 12 ++ src/lib/graph/component.c | 9 -- src/lib/graph/connection.c | 3 +- src/lib/graph/graph.c | 91 +++++++----- src/lib/graph/graph.h | 18 ++- src/lib/graph/interrupter.h | 21 +++ src/lib/graph/iterator.c | 20 ++- src/lib/graph/query-executor.c | 62 +++++++-- src/lib/graph/query-executor.h | 19 ++- src/lib/lib-logging.c | 4 +- src/lib/value.c | 4 +- src/plugins/ctf/lttng-live/lttng-live.c | 21 +-- src/plugins/ctf/lttng-live/lttng-live.h | 2 +- src/plugins/ctf/lttng-live/metadata.c | 5 +- .../ctf/lttng-live/viewer-connection.c | 16 +-- tests/bindings/python/bt2/test_graph.py | 72 +++++++--- .../python/bt2/test_query_executor.py | 39 ++++-- tests/lib/test_bt_values.c | 2 +- 40 files changed, 430 insertions(+), 276 deletions(-) diff --git a/CONTRIBUTING.adoc b/CONTRIBUTING.adoc index 46984b34..d5d86e54 100644 --- a/CONTRIBUTING.adoc +++ b/CONTRIBUTING.adoc @@ -1353,7 +1353,7 @@ an _INFO_ build. * Object copying (except fields and values). * Object freezing (whatever the type, as freezing only occurs in developer mode). -* Object cancellation. +* Object interruption. * Calling user methods and logging the result. * Setting object properties (except fields and values). | diff --git a/include/babeltrace2/babeltrace.h b/include/babeltrace2/babeltrace.h index 1be65906..e0db1254 100644 --- a/include/babeltrace2/babeltrace.h +++ b/include/babeltrace2/babeltrace.h @@ -157,16 +157,16 @@ #include /* Cancel private definitions */ -#undef __BT_FUNC_STATUS_OVERFLOW -#undef __BT_FUNC_STATUS_INVALID_PARAMS +#undef __BT_FUNC_STATUS_AGAIN +#undef __BT_FUNC_STATUS_END +#undef __BT_FUNC_STATUS_END +#undef __BT_FUNC_STATUS_ERROR +#undef __BT_FUNC_STATUS_ERROR +#undef __BT_FUNC_STATUS_INTERRUPTED #undef __BT_FUNC_STATUS_INVALID_OBJECT #undef __BT_FUNC_STATUS_MEMORY_ERROR -#undef __BT_FUNC_STATUS_ERROR -#undef __BT_FUNC_STATUS_OK -#undef __BT_FUNC_STATUS_END #undef __BT_FUNC_STATUS_NOT_FOUND -#undef __BT_FUNC_STATUS_AGAIN -#undef __BT_FUNC_STATUS_CANCELED +#undef __BT_FUNC_STATUS_OK #undef __BT_IN_BABELTRACE_H #undef __BT_UPCAST #undef __BT_UPCAST_CONST diff --git a/include/babeltrace2/func-status.h b/include/babeltrace2/func-status.h index c817b388..78a2f0a4 100644 --- a/include/babeltrace2/func-status.h +++ b/include/babeltrace2/func-status.h @@ -76,12 +76,12 @@ # define __BT_FUNC_STATUS_NOT_FOUND 2 #endif +/* Object is interrupted */ +#ifndef __BT_FUNC_STATUS_INTERRUPTED +# define __BT_FUNC_STATUS_INTERRUPTED 4 +#endif + /* Try operation again later */ #ifndef __BT_FUNC_STATUS_AGAIN # define __BT_FUNC_STATUS_AGAIN 11 #endif - -/* Object is canceled */ -#ifndef __BT_FUNC_STATUS_CANCELED -# define __BT_FUNC_STATUS_CANCELED 125 -#endif diff --git a/include/babeltrace2/graph/component-const.h b/include/babeltrace2/graph/component-const.h index 6ade2d3f..11554b5e 100644 --- a/include/babeltrace2/graph/component-const.h +++ b/include/babeltrace2/graph/component-const.h @@ -79,9 +79,6 @@ bt_bool bt_component_is_sink(const bt_component *component) BT_COMPONENT_CLASS_TYPE_SINK; } -extern bt_bool bt_component_graph_is_canceled( - const bt_component *component); - extern void bt_component_get_ref(const bt_component *component); extern void bt_component_put_ref(const bt_component *component); diff --git a/include/babeltrace2/graph/graph-const.h b/include/babeltrace2/graph/graph-const.h index 44933ffd..168ed2b2 100644 --- a/include/babeltrace2/graph/graph-const.h +++ b/include/babeltrace2/graph/graph-const.h @@ -33,8 +33,6 @@ extern "C" { #endif -extern bt_bool bt_graph_is_canceled(const bt_graph *graph); - extern void bt_graph_get_ref(const bt_graph *graph); extern void bt_graph_put_ref(const bt_graph *graph); diff --git a/include/babeltrace2/graph/graph.h b/include/babeltrace2/graph/graph.h index 059992c2..46aebf98 100644 --- a/include/babeltrace2/graph/graph.h +++ b/include/babeltrace2/graph/graph.h @@ -145,7 +145,6 @@ bt_graph_add_sink_component_with_init_method_data( typedef enum bt_graph_connect_ports_status { BT_GRAPH_CONNECT_PORTS_STATUS_OK = __BT_FUNC_STATUS_OK, BT_GRAPH_CONNECT_PORTS_STATUS_ERROR = __BT_FUNC_STATUS_ERROR, - BT_GRAPH_CONNECT_PORTS_STATUS_CANCELED = __BT_FUNC_STATUS_CANCELED, BT_GRAPH_CONNECT_PORTS_STATUS_MEMORY_ERROR = __BT_FUNC_STATUS_MEMORY_ERROR, } bt_graph_connect_ports_status; @@ -160,7 +159,6 @@ typedef enum bt_graph_run_status { BT_GRAPH_RUN_STATUS_MEMORY_ERROR = __BT_FUNC_STATUS_MEMORY_ERROR, BT_GRAPH_RUN_STATUS_AGAIN = __BT_FUNC_STATUS_AGAIN, BT_GRAPH_RUN_STATUS_END = __BT_FUNC_STATUS_END, - BT_GRAPH_RUN_STATUS_CANCELED = __BT_FUNC_STATUS_CANCELED, } bt_graph_run_status; extern bt_graph_run_status bt_graph_run(bt_graph *graph); @@ -171,7 +169,6 @@ typedef enum bt_graph_consume_status { BT_GRAPH_CONSUME_STATUS_MEMORY_ERROR = __BT_FUNC_STATUS_MEMORY_ERROR, BT_GRAPH_CONSUME_STATUS_AGAIN = __BT_FUNC_STATUS_AGAIN, BT_GRAPH_CONSUME_STATUS_END = __BT_FUNC_STATUS_END, - BT_GRAPH_CONSUME_STATUS_CANCELED = __BT_FUNC_STATUS_CANCELED, } bt_graph_consume_status; extern bt_graph_consume_status bt_graph_consume(bt_graph *graph); @@ -237,11 +234,15 @@ bt_graph_add_filter_sink_component_ports_connected_listener( bt_graph_listener_removed_func listener_removed, void *data, int *listener_id); -typedef enum bt_graph_cancel_status { - BT_GRAPH_CANCEL_STATUS_OK = __BT_FUNC_STATUS_OK, -} bt_graph_cancel_status; +typedef enum bt_graph_add_interrupter_status { + BT_GRAPH_ADD_INTERRUPTER_STATUS_OK = __BT_FUNC_STATUS_OK, + BT_GRAPH_ADD_INTERRUPTER_MEMORY_ERROR = __BT_FUNC_STATUS_MEMORY_ERROR, +} bt_graph_add_interrupter_status; -extern bt_graph_cancel_status bt_graph_cancel(bt_graph *graph); +extern bt_graph_add_interrupter_status bt_graph_add_interrupter(bt_graph *graph, + const bt_interrupter *interrupter); + +extern void bt_graph_interrupt(bt_graph *graph); #ifdef __cplusplus } diff --git a/include/babeltrace2/graph/query-executor-const.h b/include/babeltrace2/graph/query-executor-const.h index fde9289b..9b6be7e5 100644 --- a/include/babeltrace2/graph/query-executor-const.h +++ b/include/babeltrace2/graph/query-executor-const.h @@ -33,15 +33,12 @@ extern "C" { #endif -extern -bt_bool bt_query_executor_is_canceled( +extern bt_bool bt_query_executor_is_interrupted( const bt_query_executor *query_executor); -extern void bt_query_executor_get_ref( - const bt_query_executor *query_executor); +extern void bt_query_executor_get_ref(const bt_query_executor *query_executor); -extern void bt_query_executor_put_ref( - const bt_query_executor *query_executor); +extern void bt_query_executor_put_ref(const bt_query_executor *query_executor); #define BT_QUERY_EXECUTOR_PUT_REF_AND_RESET(_var) \ do { \ diff --git a/include/babeltrace2/graph/query-executor.h b/include/babeltrace2/graph/query-executor.h index 332bd62b..ef250462 100644 --- a/include/babeltrace2/graph/query-executor.h +++ b/include/babeltrace2/graph/query-executor.h @@ -40,7 +40,6 @@ bt_query_executor *bt_query_executor_create(void); typedef enum bt_query_executor_query_status { BT_QUERY_EXECUTOR_QUERY_STATUS_OK = __BT_FUNC_STATUS_OK, BT_QUERY_EXECUTOR_QUERY_STATUS_AGAIN = __BT_FUNC_STATUS_AGAIN, - BT_QUERY_EXECUTOR_QUERY_STATUS_CANCELED = __BT_FUNC_STATUS_CANCELED, BT_QUERY_EXECUTOR_QUERY_STATUS_ERROR = __BT_FUNC_STATUS_ERROR, BT_QUERY_EXECUTOR_QUERY_STATUS_MEMORY_ERROR = __BT_FUNC_STATUS_MEMORY_ERROR, BT_QUERY_EXECUTOR_QUERY_STATUS_INVALID_OBJECT = __BT_FUNC_STATUS_INVALID_OBJECT, @@ -54,13 +53,16 @@ bt_query_executor_query_status bt_query_executor_query( const char *object, const bt_value *params, bt_logging_level logging_level, const bt_value **result); -typedef enum bt_query_executor_cancel_status { - BT_QUERY_EXECUTOR_CANCEL_STATUS_OK = __BT_FUNC_STATUS_OK, -} bt_query_executor_cancel_status; +typedef enum bt_query_executor_add_interrupter_status { + BT_QUERY_EXECUTOR_ADD_INTERRUPTER_STATUS_OK = __BT_FUNC_STATUS_OK, + BT_QUERY_EXECUTOR_ADD_INTERRUPTER_MEMORY_ERROR = __BT_FUNC_STATUS_MEMORY_ERROR, +} bt_query_executor_add_interrupter_status; -extern -bt_query_executor_cancel_status bt_query_executor_cancel( - bt_query_executor *query_executor); +extern bt_query_executor_add_interrupter_status +bt_query_executor_add_interrupter(bt_query_executor *query_executor, + const bt_interrupter *interrupter); + +extern void bt_query_executor_interrupt(bt_query_executor *query_executor); #ifdef __cplusplus } diff --git a/include/babeltrace2/graph/self-component-sink.h b/include/babeltrace2/graph/self-component-sink.h index 3513d8a0..224c6781 100644 --- a/include/babeltrace2/graph/self-component-sink.h +++ b/include/babeltrace2/graph/self-component-sink.h @@ -66,6 +66,9 @@ bt_self_component_sink_add_input_port( const char *name, void *user_data, bt_self_component_port_input **self_component_port); +extern bt_bool bt_self_component_sink_is_interrupted( + const bt_self_component_sink *self_component); + #ifdef __cplusplus } #endif diff --git a/include/babeltrace2/graph/self-message-iterator.h b/include/babeltrace2/graph/self-message-iterator.h index a77901da..025eb278 100644 --- a/include/babeltrace2/graph/self-message-iterator.h +++ b/include/babeltrace2/graph/self-message-iterator.h @@ -33,6 +33,9 @@ extern "C" { #endif +extern bt_bool bt_self_message_iterator_is_interrupted( + const bt_self_message_iterator *message_iterator); + extern bt_self_component * bt_self_message_iterator_borrow_component( bt_self_message_iterator *message_iterator); diff --git a/include/babeltrace2/value-const.h b/include/babeltrace2/value-const.h index 3ac4783d..67c05795 100644 --- a/include/babeltrace2/value-const.h +++ b/include/babeltrace2/value-const.h @@ -161,7 +161,7 @@ typedef bt_bool (* bt_value_map_foreach_entry_const_func)(const char *key, typedef enum bt_value_map_foreach_entry_const_status { BT_VALUE_MAP_FOREACH_ENTRY_CONST_STATUS_MEMORY_ERROR = __BT_FUNC_STATUS_MEMORY_ERROR, BT_VALUE_MAP_FOREACH_ENTRY_CONST_STATUS_OK = __BT_FUNC_STATUS_OK, - BT_VALUE_MAP_FOREACH_ENTRY_CONST_STATUS_CANCELED = __BT_FUNC_STATUS_CANCELED, + BT_VALUE_MAP_FOREACH_ENTRY_CONST_STATUS_INTERRUPTED = __BT_FUNC_STATUS_INTERRUPTED, } bt_value_map_foreach_entry_const_status; extern bt_value_map_foreach_entry_const_status bt_value_map_foreach_entry_const( diff --git a/include/babeltrace2/value.h b/include/babeltrace2/value.h index c5e8d701..c37c090a 100644 --- a/include/babeltrace2/value.h +++ b/include/babeltrace2/value.h @@ -130,7 +130,7 @@ typedef bt_bool (* bt_value_map_foreach_entry_func)(const char *key, typedef enum bt_value_map_foreach_entry_status { BT_VALUE_MAP_FOREACH_ENTRY_STATUS_MEMORY_ERROR = __BT_FUNC_STATUS_MEMORY_ERROR, BT_VALUE_MAP_FOREACH_ENTRY_STATUS_OK = __BT_FUNC_STATUS_OK, - BT_VALUE_MAP_FOREACH_ENTRY_STATUS_CANCELED = __BT_FUNC_STATUS_CANCELED, + BT_VALUE_MAP_FOREACH_ENTRY_STATUS_INTERRUPTED = __BT_FUNC_STATUS_INTERRUPTED, } bt_value_map_foreach_entry_status; extern bt_value_map_foreach_entry_status bt_value_map_foreach_entry( diff --git a/src/bindings/python/bt2/bt2/__init__.py.in b/src/bindings/python/bt2/bt2/__init__.py.in index a801446c..01c87e51 100644 --- a/src/bindings/python/bt2/bt2/__init__.py.in +++ b/src/bindings/python/bt2/bt2/__init__.py.in @@ -98,10 +98,6 @@ class IncompleteUserClass(Exception): pass -class Canceled(Exception): - pass - - class _ListenerHandle: def __init__(self, listener_id, obj): self._listener_id = listener_id diff --git a/src/bindings/python/bt2/bt2/component.py b/src/bindings/python/bt2/bt2/component.py index 10b11663..43e3ce16 100644 --- a/src/bindings/python/bt2/bt2/component.py +++ b/src/bindings/python/bt2/bt2/component.py @@ -864,3 +864,7 @@ class _UserSinkComponent(_UserComponent, _SinkComponent): raise bt2.CreationError('cannot create message iterator object') return bt2.message_iterator._UserComponentInputPortMessageIterator(msg_iter_ptr) + + @property + def _is_interrupted(self): + return bool(native_bt.self_component_sink_is_interrupted(self._bt_ptr)) diff --git a/src/bindings/python/bt2/bt2/graph.py b/src/bindings/python/bt2/bt2/graph.py index e9659bd2..6d083228 100644 --- a/src/bindings/python/bt2/bt2/graph.py +++ b/src/bindings/python/bt2/bt2/graph.py @@ -21,6 +21,7 @@ # THE SOFTWARE. from bt2 import native_bt, object, utils +import bt2.interrupter import bt2.connection import bt2.component import functools @@ -170,24 +171,19 @@ class Graph(object._SharedObject): status = native_bt.graph_run(self._ptr) try: - utils._handle_func_status( - status, 'graph object stopped running because of an unexpected error' - ) + utils._handle_func_status(status, 'graph object stopped running') except bt2.Stop: # done return except Exception: raise - def cancel(self): - status = native_bt.graph_cancel(self._ptr) - utils._handle_func_status(status, 'cannot cancel graph object') + def add_interrupter(self, interrupter): + utils._check_type(interrupter, bt2.interrupter.Interrupter) + native_bt.graph_add_interrupter(self._ptr, interrupter._ptr) - @property - def is_canceled(self): - is_canceled = native_bt.graph_is_canceled(self._ptr) - assert is_canceled >= 0 - return is_canceled > 0 + def interrupt(self): + native_bt.graph_interrupt(self._ptr) def create_output_port_message_iterator(self, output_port): utils._check_type(output_port, bt2.port._OutputPort) diff --git a/src/bindings/python/bt2/bt2/message_iterator.py b/src/bindings/python/bt2/bt2/message_iterator.py index 07c707db..25d3cbb1 100644 --- a/src/bindings/python/bt2/bt2/message_iterator.py +++ b/src/bindings/python/bt2/bt2/message_iterator.py @@ -138,6 +138,10 @@ class _UserMessageIterator(_MessageIterator): def addr(self): return int(self._bt_ptr) + @property + def _is_interrupted(self): + return bool(native_bt.self_message_iterator_is_interrupted(self._bt_ptr)) + def _finalize(self): pass diff --git a/src/bindings/python/bt2/bt2/native_bt_component_class.i b/src/bindings/python/bt2/bt2/native_bt_component_class.i index c33b9caf..63dcc0ba 100644 --- a/src/bindings/python/bt2/bt2/native_bt_component_class.i +++ b/src/bindings/python/bt2/bt2/native_bt_component_class.i @@ -97,7 +97,6 @@ static PyObject *py_mod_bt2_exc_error_type = NULL; static PyObject *py_mod_bt2_exc_memory_error = NULL; static PyObject *py_mod_bt2_exc_try_again_type = NULL; static PyObject *py_mod_bt2_exc_stop_type = NULL; -static PyObject *py_mod_bt2_exc_msg_iter_canceled_type = NULL; static PyObject *py_mod_bt2_exc_invalid_object_type = NULL; static PyObject *py_mod_bt2_exc_invalid_params_type = NULL; @@ -154,7 +153,6 @@ void bt_bt2_cc_exit_handler(void) Py_XDECREF(py_mod_bt2_exc_error_type); Py_XDECREF(py_mod_bt2_exc_try_again_type); Py_XDECREF(py_mod_bt2_exc_stop_type); - Py_XDECREF(py_mod_bt2_exc_msg_iter_canceled_type); Py_XDECREF(py_mod_bt2_exc_invalid_object_type); Py_XDECREF(py_mod_bt2_exc_invalid_params_type); } diff --git a/src/bindings/python/bt2/bt2/query_executor.py b/src/bindings/python/bt2/bt2/query_executor.py index 6e2bbc81..e9d3d761 100644 --- a/src/bindings/python/bt2/bt2/query_executor.py +++ b/src/bindings/python/bt2/bt2/query_executor.py @@ -21,6 +21,7 @@ # THE SOFTWARE. from bt2 import native_bt, object, utils +import bt2.interrupter import bt2.component import bt2.logging import bt2 @@ -38,15 +39,17 @@ class QueryExecutor(object._SharedObject): super().__init__(ptr) - def cancel(self): - status = native_bt.query_executor_cancel(self._ptr) - utils._handle_func_status(status, 'cannot cancel query executor object') + def add_interrupter(self, interrupter): + utils._check_type(interrupter, bt2.interrupter.Interrupter) + native_bt.query_executor_add_interrupter(self._ptr, interrupter._ptr) + + def interrupt(self): + native_bt.query_executor_interrupt(self._ptr) @property - def is_canceled(self): - is_canceled = native_bt.query_executor_is_canceled(self._ptr) - assert is_canceled >= 0 - return is_canceled > 0 + def is_interrupted(self): + is_interrupted = native_bt.query_executor_is_interrupted(self._ptr) + return bool(is_interrupted) def query( self, @@ -55,9 +58,6 @@ class QueryExecutor(object._SharedObject): params=None, logging_level=bt2.logging.LoggingLevel.NONE, ): - if self.is_canceled: - raise bt2.Canceled - if not isinstance(component_class, bt2.component._GenericComponentClass): err = False diff --git a/src/bindings/python/bt2/bt2/utils.py b/src/bindings/python/bt2/bt2/utils.py index bd4dd7a9..aa871042 100644 --- a/src/bindings/python/bt2/bt2/utils.py +++ b/src/bindings/python/bt2/bt2/utils.py @@ -151,11 +151,6 @@ def _handle_func_status(status, msg=None): raise bt2.TryAgain else: raise bt2.TryAgain(msg) - elif status == native_bt.__BT_FUNC_STATUS_CANCELED: - if msg is None: - raise bt2.Canceled - else: - raise bt2.Canceled(msg) elif status == native_bt.__BT_FUNC_STATUS_OVERFLOW: if msg is None: raise bt2.OverflowError diff --git a/src/cli/babeltrace2.c b/src/cli/babeltrace2.c index 0eeb8770..9699b0c3 100644 --- a/src/cli/babeltrace2.c +++ b/src/cli/babeltrace2.c @@ -55,10 +55,9 @@ static const char* log_level_env_var_names[] = { NULL, }; -/* Application's processing graph (weak) */ -static bt_graph *the_graph; -static bt_query_executor *the_query_executor; -static bool canceled = false; +/* Application's interrupter (owned by this) */ +static bt_interrupter *the_interrupter; +static volatile bool interrupted = false; #ifdef __MINGW32__ @@ -66,12 +65,11 @@ static bool canceled = false; static BOOL WINAPI signal_handler(DWORD signal) { - if (the_graph) { - bt_graph_cancel(the_graph); + if (the_interrupter) { + bt_interrupter_set(the_interrupter); } - canceled = true; - + interrupted = true; return TRUE; } @@ -92,15 +90,11 @@ void signal_handler(int signum) return; } - if (the_graph) { - bt_graph_cancel(the_graph); - } - - if (the_query_executor) { - bt_query_executor_cancel(the_query_executor); + if (the_interrupter) { + bt_interrupter_set(the_interrupter); } - canceled = true; + interrupted = true; } static @@ -120,26 +114,6 @@ void set_signal_handler(void) #endif /* __MINGW32__ */ -static -int create_the_query_executor(void) -{ - int ret = 0; - - the_query_executor = bt_query_executor_create(); - if (!the_query_executor) { - BT_CLI_LOGE_APPEND_CAUSE("Cannot create a query executor."); - ret = -1; - } - - return ret; -} - -static -void destroy_the_query_executor(void) -{ - BT_QUERY_EXECUTOR_PUT_REF_AND_RESET(the_query_executor); -} - static int query(struct bt_config *cfg, const bt_component_class *comp_cls, const char *obj, const bt_value *params, @@ -147,30 +121,33 @@ int query(struct bt_config *cfg, const bt_component_class *comp_cls, { const bt_value *result = NULL; bt_query_executor_query_status query_status; + bt_query_executor *query_exec; *fail_reason = "unknown error"; int ret = 0; BT_ASSERT(fail_reason); BT_ASSERT(user_result); - ret = create_the_query_executor(); - if (ret) { - /* create_the_query_executor() logs errors */ - goto end; + query_exec = bt_query_executor_create(); + if (!query_exec) { + BT_CLI_LOGE_APPEND_CAUSE("Cannot create a query executor."); + goto error; } - if (canceled) { + bt_query_executor_add_interrupter(query_exec, the_interrupter); + + if (interrupted) { BT_CLI_LOGW_APPEND_CAUSE( - "Canceled by user before executing the query: " + "Interrupted by user before executing the query: " "comp-cls-addr=%p, comp-cls-name=\"%s\", " "query-obj=\"%s\"", comp_cls, bt_component_class_get_name(comp_cls), obj); - *fail_reason = "canceled by user"; + *fail_reason = "interrupted by user"; goto error; } while (true) { query_status = bt_query_executor_query( - the_query_executor, comp_cls, obj, params, + query_exec, comp_cls, obj, params, cfg->log_level, &result); switch (query_status) { case BT_QUERY_EXECUTOR_QUERY_STATUS_OK: @@ -179,29 +156,36 @@ int query(struct bt_config *cfg, const bt_component_class *comp_cls, { const uint64_t sleep_time_us = 100000; + if (bt_interrupter_is_set(the_interrupter)) { + *fail_reason = "interrupted by user"; + goto error; + } + /* Wait 100 ms and retry */ BT_LOGD("Got BT_QUERY_EXECUTOR_QUERY_STATUS_AGAIN: sleeping: " "time-us=%" PRIu64, sleep_time_us); if (usleep(sleep_time_us)) { - if (bt_query_executor_is_canceled(the_query_executor)) { + if (bt_interrupter_is_set(the_interrupter)) { BT_CLI_LOGW_APPEND_CAUSE( - "Query was canceled by user: " + "Query was interrupted by user: " "comp-cls-addr=%p, comp-cls-name=\"%s\", " "query-obj=\"%s\"", comp_cls, bt_component_class_get_name(comp_cls), obj); - *fail_reason = "canceled by user"; + *fail_reason = "interrupted by user"; goto error; } } continue; } - case BT_QUERY_EXECUTOR_QUERY_STATUS_CANCELED: - *fail_reason = "canceled by user"; - goto error; case BT_QUERY_EXECUTOR_QUERY_STATUS_ERROR: + if (bt_interrupter_is_set(the_interrupter)) { + *fail_reason = "interrupted by user"; + goto error; + } + goto error; case BT_QUERY_EXECUTOR_QUERY_STATUS_INVALID_OBJECT: *fail_reason = "invalid or unknown query object"; @@ -228,7 +212,7 @@ error: ret = -1; end: - destroy_the_query_executor(); + bt_query_executor_put_ref(query_exec); bt_value_put_ref(result); return ret; } @@ -1604,9 +1588,6 @@ int cmd_run_ctx_connect_upstream_port_to_downstream_component( switch (connect_ports_status) { case BT_GRAPH_CONNECT_PORTS_STATUS_OK: break; - case BT_GRAPH_CONNECT_PORTS_STATUS_CANCELED: - BT_CLI_LOGW_APPEND_CAUSE("Graph was canceled by user."); - break; default: BT_CLI_LOGE_APPEND_CAUSE( "Cannot create connection: graph refuses to connect ports: " @@ -1826,7 +1807,6 @@ void cmd_run_ctx_destroy(struct cmd_run_ctx *ctx) } BT_GRAPH_PUT_REF_AND_RESET(ctx->graph); - the_graph = NULL; ctx->cfg = NULL; } @@ -1870,7 +1850,7 @@ int cmd_run_ctx_init(struct cmd_run_ctx *ctx, struct bt_config *cfg) goto error; } - the_graph = ctx->graph; + bt_graph_add_interrupter(ctx->graph, the_interrupter); add_listener_status = bt_graph_add_source_component_output_port_added_listener( ctx->graph, graph_source_output_port_added_listener, NULL, ctx, NULL); @@ -2331,9 +2311,9 @@ int cmd_run(struct bt_config *cfg) goto error; } - if (canceled) { + if (interrupted) { BT_CLI_LOGW_APPEND_CAUSE( - "Canceled by user before creating components."); + "Interrupted by user before creating components."); goto error; } @@ -2345,9 +2325,9 @@ int cmd_run(struct bt_config *cfg) goto error; } - if (canceled) { + if (interrupted) { BT_CLI_LOGW_APPEND_CAUSE( - "Canceled by user before connecting components."); + "Interrupted by user before connecting components."); goto error; } @@ -2360,9 +2340,9 @@ int cmd_run(struct bt_config *cfg) goto error; } - if (canceled) { + if (interrupted) { BT_CLI_LOGW_APPEND_CAUSE( - "Canceled by user before running the graph."); + "Interrupted by user before running the graph."); goto error; } @@ -2385,13 +2365,10 @@ int cmd_run(struct bt_config *cfg) switch (run_status) { case BT_GRAPH_RUN_STATUS_OK: break; - case BT_GRAPH_RUN_STATUS_CANCELED: - BT_CLI_LOGW_APPEND_CAUSE("Graph was canceled by user."); - goto error; case BT_GRAPH_RUN_STATUS_AGAIN: - if (bt_graph_is_canceled(ctx.graph)) { + if (bt_interrupter_is_set(the_interrupter)) { BT_CLI_LOGW_APPEND_CAUSE( - "Graph was canceled by user."); + "Graph was interrupted by user."); goto error; } @@ -2401,9 +2378,9 @@ int cmd_run(struct bt_config *cfg) cfg->cmd_data.run.retry_duration_us); if (usleep(cfg->cmd_data.run.retry_duration_us)) { - if (bt_graph_is_canceled(ctx.graph)) { + if (bt_interrupter_is_set(the_interrupter)) { BT_CLI_LOGW_APPEND_CAUSE( - "Graph was canceled by user."); + "Graph was interrupted by user."); goto error; } } @@ -2412,6 +2389,14 @@ int cmd_run(struct bt_config *cfg) case BT_GRAPH_RUN_STATUS_END: goto end; default: + if (bt_interrupter_is_set(the_interrupter)) { + BT_CLI_LOGW_APPEND_CAUSE( + "Graph was interrupted by user and failed: " + "status=%s", + bt_common_func_status_string(run_status)); + goto error; + } + BT_CLI_LOGE_APPEND_CAUSE( "Graph failed to complete successfully"); goto error; @@ -2705,6 +2690,14 @@ int main(int argc, const char **argv) } } + BT_ASSERT(!the_interrupter); + the_interrupter = bt_interrupter_create(); + if (!the_interrupter) { + BT_CLI_LOGE_APPEND_CAUSE("Failed to create an interrupter object."); + retcode = 1; + goto end; + } + BT_LOGI("Executing command: cmd=%d, command-name=\"%s\"", cfg->command, cfg->command_name); @@ -2740,6 +2733,7 @@ int main(int argc, const char **argv) end: BT_OBJECT_PUT_REF_AND_RESET(cfg); fini_loaded_plugins(); + bt_interrupter_put_ref(the_interrupter); if (retcode != 0) { print_error_causes(); diff --git a/src/common/common.h b/src/common/common.h index 5156e1b0..8429145c 100644 --- a/src/common/common.h +++ b/src/common/common.h @@ -619,8 +619,8 @@ const char *bt_common_func_status_string(int status) return "NOT_FOUND"; case __BT_FUNC_STATUS_AGAIN: return "AGAIN"; - case __BT_FUNC_STATUS_CANCELED: - return "CANCELED"; + case __BT_FUNC_STATUS_INTERRUPTED: + return "INTERRUPTED"; default: return "(unknown)"; } diff --git a/src/lib/func-status.h b/src/lib/func-status.h index f5132851..658a82d7 100644 --- a/src/lib/func-status.h +++ b/src/lib/func-status.h @@ -30,15 +30,15 @@ * Aliases without a `__` prefix for internal code: this is just easier * to read. */ -#define BT_FUNC_STATUS_OVERFLOW __BT_FUNC_STATUS_OVERFLOW -#define BT_FUNC_STATUS_INVALID_PARAMS __BT_FUNC_STATUS_INVALID_PARAMS +#define BT_FUNC_STATUS_AGAIN __BT_FUNC_STATUS_AGAIN +#define BT_FUNC_STATUS_END __BT_FUNC_STATUS_END +#define BT_FUNC_STATUS_ERROR __BT_FUNC_STATUS_ERROR +#define BT_FUNC_STATUS_INTERRUPTED __BT_FUNC_STATUS_INTERRUPTED #define BT_FUNC_STATUS_INVALID_OBJECT __BT_FUNC_STATUS_INVALID_OBJECT +#define BT_FUNC_STATUS_INVALID_PARAMS __BT_FUNC_STATUS_INVALID_PARAMS #define BT_FUNC_STATUS_MEMORY_ERROR __BT_FUNC_STATUS_MEMORY_ERROR -#define BT_FUNC_STATUS_ERROR __BT_FUNC_STATUS_ERROR -#define BT_FUNC_STATUS_OK __BT_FUNC_STATUS_OK -#define BT_FUNC_STATUS_END __BT_FUNC_STATUS_END #define BT_FUNC_STATUS_NOT_FOUND __BT_FUNC_STATUS_NOT_FOUND -#define BT_FUNC_STATUS_AGAIN __BT_FUNC_STATUS_AGAIN -#define BT_FUNC_STATUS_CANCELED __BT_FUNC_STATUS_CANCELED +#define BT_FUNC_STATUS_OK __BT_FUNC_STATUS_OK +#define BT_FUNC_STATUS_OVERFLOW __BT_FUNC_STATUS_OVERFLOW #endif /* BABELTRACE_FUNC_STATUS_INTERNAL_H */ diff --git a/src/lib/graph/component-sink.c b/src/lib/graph/component-sink.c index b218144d..9d4e11d4 100644 --- a/src/lib/graph/component-sink.c +++ b/src/lib/graph/component-sink.c @@ -26,6 +26,7 @@ #include "common/assert.h" #include "lib/assert-pre.h" +#include "lib/assert-post.h" #include "compat/compiler.h" #include #include @@ -34,6 +35,7 @@ #include "component-sink.h" #include "component.h" +#include "graph.h" #include "lib/func-status.h" BT_HIDDEN @@ -141,6 +143,16 @@ end: return status; } +bt_bool bt_self_component_sink_is_interrupted( + const struct bt_self_component_sink *self_comp) +{ + struct bt_component *comp = (void *) self_comp; + + BT_ASSERT_PRE_NON_NULL(comp, "Component"); + return (bt_bool) bt_graph_is_interrupted( + bt_component_borrow_graph(comp)); +} + void bt_component_sink_get_ref( const struct bt_component_sink *component_sink) { diff --git a/src/lib/graph/component.c b/src/lib/graph/component.c index 22119689..11144fb4 100644 --- a/src/lib/graph/component.c +++ b/src/lib/graph/component.c @@ -208,9 +208,6 @@ enum bt_self_component_add_port_status add_port( BT_ASSERT_PRE_NON_NULL(name, "Name"); BT_ASSERT_PRE(strlen(name) > 0, "Name is empty"); graph = bt_component_borrow_graph(component); - BT_ASSERT_PRE(graph && !bt_graph_is_canceled(graph), - "Component's graph is canceled: %![comp-]+c, %![graph-]+g", - component, graph); BT_ASSERT_PRE( graph->config_state == BT_GRAPH_CONFIGURATION_STATE_CONFIGURING, "Component's graph is already configured: " @@ -392,12 +389,6 @@ void bt_component_set_graph(struct bt_component *component, graph ? &graph->base : NULL); } -bt_bool bt_component_graph_is_canceled(const struct bt_component *component) -{ - return bt_graph_is_canceled( - (void *) bt_object_borrow_parent(&component->base)); -} - static struct bt_port *borrow_port_by_name(GPtrArray *ports, const char *name) diff --git a/src/lib/graph/connection.c b/src/lib/graph/connection.c index a50828fb..64b0fda0 100644 --- a/src/lib/graph/connection.c +++ b/src/lib/graph/connection.c @@ -50,8 +50,7 @@ void destroy_connection(struct bt_object *obj) /* * Make sure that each message iterator which was created for * this connection is finalized before we destroy it. Once a - * message iterator is finalized, all its method return NULL or - * the BT_MESSAGE_ITERATOR_STATUS_CANCELED status. + * message iterator is finalized, you cannot use it. * * Because connections are destroyed before components within a * graph, this ensures that message iterators are always diff --git a/src/lib/graph/graph.c b/src/lib/graph/graph.c index af6b2b2d..192ec9e5 100644 --- a/src/lib/graph/graph.c +++ b/src/lib/graph/graph.c @@ -46,6 +46,7 @@ #include "component-sink.h" #include "connection.h" #include "graph.h" +#include "interrupter.h" #include "message/event.h" #include "message/packet.h" @@ -133,12 +134,7 @@ void destroy_graph(struct bt_object *obj) */ BT_LIB_LOGI("Destroying graph: %!+g", graph); obj->ref_count++; - - /* - * Cancel the graph to disallow some operations, like creating - * message iterators and adding ports to components. - */ - (void) bt_graph_cancel((void *) graph); + graph->config_state = BT_GRAPH_CONFIGURATION_STATE_DESTROYING; /* Call all remove listeners */ CALL_REMOVE_LISTENERS(struct bt_graph_listener_port_added, @@ -175,6 +171,14 @@ void destroy_graph(struct bt_object *obj) graph->components = NULL; } + if (graph->interrupters) { + BT_LOGD_STR("Putting interrupters."); + g_ptr_array_free(graph->interrupters, TRUE); + graph->interrupters = NULL; + } + + BT_OBJECT_PUT_REF_AND_RESET(graph->default_interrupter); + if (graph->sinks_to_consume) { g_queue_free(graph->sinks_to_consume); graph->sinks_to_consume = NULL; @@ -353,6 +357,21 @@ struct bt_graph *bt_graph_create(void) goto error; } + graph->interrupters = g_ptr_array_new_with_free_func( + (GDestroyNotify) bt_object_put_no_null_check); + if (!graph->interrupters) { + BT_LIB_LOGE_APPEND_CAUSE("Failed to allocate one GPtrArray."); + goto error; + } + + graph->default_interrupter = bt_interrupter_create(); + if (!graph->default_interrupter) { + BT_LIB_LOGE_APPEND_CAUSE( + "Failed to create one interrupter object."); + goto error; + } + + bt_graph_add_interrupter(graph, graph->default_interrupter); ret = bt_object_pool_initialize(&graph->event_msg_pool, (bt_object_pool_new_object_func) bt_message_event_new, (bt_object_pool_destroy_object_func) destroy_message_event, @@ -417,7 +436,6 @@ enum bt_graph_connect_ports_status bt_graph_connect_ports( BT_ASSERT_PRE_NON_NULL(graph, "Graph"); BT_ASSERT_PRE_NON_NULL(upstream_port, "Upstream port"); BT_ASSERT_PRE_NON_NULL(downstream_port, "Downstream port port"); - BT_ASSERT_PRE(!graph->canceled, "Graph is canceled: %!+g", graph); BT_ASSERT_PRE( graph->config_state == BT_GRAPH_CONFIGURATION_STATE_CONFIGURING, "Graph is not in the \"configuring\" state: %!+g", graph); @@ -687,7 +705,6 @@ enum bt_graph_consume_status bt_graph_consume(struct bt_graph *graph) enum bt_graph_consume_status status; BT_ASSERT_PRE_DEV_NON_NULL(graph, "Graph"); - BT_ASSERT_PRE_DEV(!graph->canceled, "Graph is canceled: %!+g", graph); BT_ASSERT_PRE_DEV(graph->can_consume, "Cannot consume graph in its current state: %!+g", graph); BT_ASSERT_PRE_DEV(graph->config_state != @@ -712,7 +729,6 @@ enum bt_graph_run_status bt_graph_run(struct bt_graph *graph) enum bt_graph_run_status status; BT_ASSERT_PRE_NON_NULL(graph, "Graph"); - BT_ASSERT_PRE(!graph->canceled, "Graph is canceled: %!+g", graph); BT_ASSERT_PRE(graph->can_consume, "Cannot consume graph in its current state: %!+g", graph); BT_ASSERT_PRE(graph->config_state != BT_GRAPH_CONFIGURATION_STATE_FAULTY, @@ -728,15 +744,15 @@ enum bt_graph_run_status bt_graph_run(struct bt_graph *graph) do { /* - * Check if the graph is canceled at each iteration. If - * the graph was canceled by another thread or by a - * signal handler, this is not a warning nor an error, - * it was intentional: log with a DEBUG level only. + * Check if the graph is interrupted at each iteration. + * If the graph was interrupted by another thread or by + * a signal handler, this is NOT a warning nor an error; + * it was intentional: log with an INFO level only. */ - if (G_UNLIKELY(graph->canceled)) { - BT_LIB_LOGI("Stopping the graph: graph is canceled: " - "%!+g", graph); - status = BT_FUNC_STATUS_CANCELED; + if (G_UNLIKELY(bt_graph_is_interrupted(graph))) { + BT_LIB_LOGI("Stopping the graph: " + "graph was interrupted: %!+g", graph); + status = BT_FUNC_STATUS_AGAIN; goto end; } @@ -1209,20 +1225,6 @@ end: return status; } -enum bt_graph_cancel_status bt_graph_cancel(struct bt_graph *graph) -{ - BT_ASSERT_PRE_NON_NULL(graph, "Graph"); - graph->canceled = true; - BT_LIB_LOGI("Canceled graph: %!+i", graph); - return BT_FUNC_STATUS_OK; -} - -bt_bool bt_graph_is_canceled(const struct bt_graph *graph) -{ - BT_ASSERT_PRE_DEV_NON_NULL(graph, "Graph"); - return graph->canceled ? BT_TRUE : BT_FALSE; -} - BT_HIDDEN void bt_graph_remove_connection(struct bt_graph *graph, struct bt_connection *connection) @@ -1275,7 +1277,6 @@ int add_component_with_init_method_data( BT_ASSERT(comp_cls); BT_ASSERT_PRE_NON_NULL(graph, "Graph"); BT_ASSERT_PRE_NON_NULL(name, "Name"); - BT_ASSERT_PRE(!graph->canceled, "Graph is canceled: %!+g", graph); BT_ASSERT_PRE( graph->config_state == BT_GRAPH_CONFIGURATION_STATE_CONFIGURING, "Graph is not in the \"configuring\" state: %!+g", graph); @@ -1559,6 +1560,32 @@ void bt_graph_add_message(struct bt_graph *graph, g_ptr_array_add(graph->messages, msg); } +BT_HIDDEN +bool bt_graph_is_interrupted(const struct bt_graph *graph) +{ + BT_ASSERT(graph); + return bt_interrupter_array_any_is_set(graph->interrupters); +} + +enum bt_graph_add_interrupter_status bt_graph_add_interrupter( + struct bt_graph *graph, const struct bt_interrupter *intr) +{ + BT_ASSERT_PRE_NON_NULL(graph, "Graph"); + BT_ASSERT_PRE_NON_NULL(intr, "Interrupter"); + g_ptr_array_add(graph->interrupters, (void *) intr); + bt_object_get_no_null_check(intr); + BT_LIB_LOGD("Added interrupter to graph: %![graph-]+g, %![intr-]+z", + graph, intr); + return BT_FUNC_STATUS_OK; +} + +void bt_graph_interrupt(struct bt_graph *graph) +{ + BT_ASSERT_PRE_NON_NULL(graph, "Graph"); + bt_interrupter_set(graph->default_interrupter); + BT_LIB_LOGI("Interrupted graph: %!+g", graph); +} + void bt_graph_get_ref(const struct bt_graph *graph) { bt_object_get_ref(graph); diff --git a/src/lib/graph/graph.h b/src/lib/graph/graph.h index 74c79470..4e500d49 100644 --- a/src/lib/graph/graph.h +++ b/src/lib/graph/graph.h @@ -67,6 +67,7 @@ enum bt_graph_configuration_state { BT_GRAPH_CONFIGURATION_STATE_PARTIALLY_CONFIGURED, BT_GRAPH_CONFIGURATION_STATE_CONFIGURED, BT_GRAPH_CONFIGURATION_STATE_FAULTY, + BT_GRAPH_CONFIGURATION_STATE_DESTROYING, }; struct bt_graph { @@ -89,7 +90,19 @@ struct bt_graph { /* Queue of pointers (weak references) to sink bt_components. */ GQueue *sinks_to_consume; - bool canceled; + /* + * Array of `struct bt_interrupter *`, each one owned by this. + * If any interrupter is set, then this graph is deemed + * interrupted. + */ + GPtrArray *interrupters; + + /* + * Default interrupter to support bt_graph_interrupt(); owned + * by this. + */ + struct bt_interrupter *default_interrupter; + bool in_remove_listener; bool has_sink; @@ -184,6 +197,9 @@ BT_HIDDEN void bt_graph_add_message(struct bt_graph *graph, struct bt_message *msg); +BT_HIDDEN +bool bt_graph_is_interrupted(const struct bt_graph *graph); + static inline const char *bt_graph_configuration_state_string( enum bt_graph_configuration_state state) diff --git a/src/lib/graph/interrupter.h b/src/lib/graph/interrupter.h index 46fd153b..149d0c11 100644 --- a/src/lib/graph/interrupter.h +++ b/src/lib/graph/interrupter.h @@ -33,4 +33,25 @@ struct bt_interrupter { bool is_set; }; +static inline +bool bt_interrupter_array_any_is_set(const GPtrArray *interrupters) +{ + bool is_set = false; + uint64_t i; + + BT_ASSERT(interrupters); + + for (i = 0; i < interrupters->len; i++) { + const struct bt_interrupter *intr = interrupters->pdata[i]; + + if (intr->is_set) { + is_set = true; + goto end; + } + } + +end: + return is_set; +} + #endif /* BABELTRACE_GRAPH_INTERRUPTER_INTERNAL_H */ diff --git a/src/lib/graph/iterator.c b/src/lib/graph/iterator.c index 254e422f..b5033083 100644 --- a/src/lib/graph/iterator.c +++ b/src/lib/graph/iterator.c @@ -348,17 +348,16 @@ create_self_component_input_port_message_iterator( "Input port is not connected: %![port-]+p", port); BT_ASSERT_PRE(comp, "Input port is not part of a component: %![port-]+p", port); - BT_ASSERT_PRE(!bt_component_graph_is_canceled(comp), - "Input port's component's graph is canceled: " - "%![port-]+p, %![comp-]+c", port, comp); BT_ASSERT(port->connection); upstream_port = port->connection->upstream_port; BT_ASSERT(upstream_port); upstream_comp = bt_port_borrow_component_inline(upstream_port); BT_ASSERT(upstream_comp); BT_ASSERT_PRE( - bt_component_borrow_graph(upstream_comp)->config_state != - BT_GRAPH_CONFIGURATION_STATE_CONFIGURING, + bt_component_borrow_graph(upstream_comp)->config_state == + BT_GRAPH_CONFIGURATION_STATE_PARTIALLY_CONFIGURED || + bt_component_borrow_graph(upstream_comp)->config_state == + BT_GRAPH_CONFIGURATION_STATE_CONFIGURED, "Graph is not configured: %!+g", bt_component_borrow_graph(upstream_comp)); upstream_comp_cls = upstream_comp->class; @@ -964,7 +963,6 @@ enum bt_message_iterator_next_status bt_port_output_message_iterator_next( graph_status = bt_graph_consume_sink_no_check(iterator->graph, iterator->colander); switch (graph_status) { - case BT_FUNC_STATUS_CANCELED: case BT_FUNC_STATUS_AGAIN: case BT_FUNC_STATUS_END: case BT_FUNC_STATUS_MEMORY_ERROR: @@ -2119,6 +2117,16 @@ bt_port_output_message_iterator_seek_beginning( iterator)); } +bt_bool bt_self_message_iterator_is_interrupted( + const struct bt_self_message_iterator *self_msg_iter) +{ + const struct bt_self_component_port_input_message_iterator *iterator = + (const void *) self_msg_iter; + + BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator"); + return (bt_bool) bt_graph_is_interrupted(iterator->graph); +} + void bt_port_output_message_iterator_get_ref( const struct bt_port_output_message_iterator *iterator) { diff --git a/src/lib/graph/query-executor.c b/src/lib/graph/query-executor.c index 3a00c9b8..1addeeb3 100644 --- a/src/lib/graph/query-executor.c +++ b/src/lib/graph/query-executor.c @@ -37,6 +37,7 @@ #include "component-class.h" #include "query-executor.h" +#include "interrupter.h" #include "lib/func-status.h" static @@ -46,6 +47,15 @@ void bt_query_executor_destroy(struct bt_object *obj) container_of(obj, struct bt_query_executor, base); BT_LOGD("Destroying query executor: addr=%p", query_exec); + + if (query_exec->interrupters) { + BT_LOGD_STR("Putting interrupters."); + g_ptr_array_free(query_exec->interrupters, TRUE); + query_exec->interrupters = NULL; + } + + BT_OBJECT_PUT_REF_AND_RESET(query_exec->default_interrupter); + g_free(query_exec); } @@ -61,6 +71,24 @@ struct bt_query_executor *bt_query_executor_create(void) goto end; } + query_exec->interrupters = g_ptr_array_new_with_free_func( + (GDestroyNotify) bt_object_put_no_null_check); + if (!query_exec->interrupters) { + BT_LIB_LOGE_APPEND_CAUSE("Failed to allocate one GPtrArray."); + BT_OBJECT_PUT_REF_AND_RESET(query_exec); + goto end; + } + + query_exec->default_interrupter = bt_interrupter_create(); + if (!query_exec->default_interrupter) { + BT_LIB_LOGE_APPEND_CAUSE( + "Failed to create one interrupter object."); + BT_OBJECT_PUT_REF_AND_RESET(query_exec); + goto end; + } + + bt_query_executor_add_interrupter(query_exec, + query_exec->default_interrupter); bt_object_init_shared(&query_exec->base, bt_query_executor_destroy); BT_LOGD("Created query executor: addr=%p", query_exec); @@ -88,7 +116,6 @@ enum bt_query_executor_query_status bt_query_executor_query( BT_ASSERT_PRE_NON_NULL(comp_cls, "Component class"); BT_ASSERT_PRE_NON_NULL(object, "Object"); BT_ASSERT_PRE_NON_NULL(user_result, "Result (output)"); - BT_ASSERT_PRE(!query_exec->canceled, "Query executor is canceled."); if (!params) { params = bt_value_null; @@ -141,29 +168,38 @@ enum bt_query_executor_query_status bt_query_executor_query( BT_ASSERT_POST(query_status != BT_FUNC_STATUS_OK || *user_result, "User method returned `BT_FUNC_STATUS_OK` without a result."); status = (int) query_status; - if (query_exec->canceled) { - BT_OBJECT_PUT_REF_AND_RESET(*user_result); - status = BT_FUNC_STATUS_CANCELED; - goto end; - } end: return status; } -enum bt_query_executor_cancel_status bt_query_executor_cancel( - struct bt_query_executor *query_exec) +enum bt_query_executor_add_interrupter_status bt_query_executor_add_interrupter( + struct bt_query_executor *query_exec, + const struct bt_interrupter *intr) { BT_ASSERT_PRE_NON_NULL(query_exec, "Query executor"); - query_exec->canceled = BT_TRUE; - BT_LOGI("Canceled query executor: addr=%p", query_exec); + BT_ASSERT_PRE_NON_NULL(intr, "Interrupter"); + g_ptr_array_add(query_exec->interrupters, (void *) intr); + bt_object_get_no_null_check(intr); + BT_LIB_LOGD("Added interrupter to query executor: " + "query-exec-addr=%p, %![intr-]+z", + query_exec, intr); return BT_FUNC_STATUS_OK; } -bt_bool bt_query_executor_is_canceled(const struct bt_query_executor *query_exec) +bt_bool bt_query_executor_is_interrupted(const struct bt_query_executor *query_exec) { - BT_ASSERT_PRE_DEV_NON_NULL(query_exec, "Query executor"); - return query_exec->canceled; + BT_ASSERT_PRE_NON_NULL(query_exec, "Query executor"); + return (bt_bool) bt_interrupter_array_any_is_set( + query_exec->interrupters); +} + +void bt_query_executor_interrupt(struct bt_query_executor *query_exec) +{ + BT_ASSERT_PRE_NON_NULL(query_exec, "Query executor"); + bt_interrupter_set(query_exec->default_interrupter); + BT_LIB_LOGI("Interrupted query executor: query-exec-addr=%p", + query_exec); } void bt_query_executor_get_ref(const struct bt_query_executor *query_executor) diff --git a/src/lib/graph/query-executor.h b/src/lib/graph/query-executor.h index 875c26c0..c1dcc9c4 100644 --- a/src/lib/graph/query-executor.h +++ b/src/lib/graph/query-executor.h @@ -23,14 +23,29 @@ * SOFTWARE. */ +#include + #include -#include "lib/object.h" #include #include +#include "lib/object.h" + struct bt_query_executor { struct bt_object base; - bool canceled; + + /* + * Array of `struct bt_interrupter *`, each one owned by this. + * If any interrupter is set, then this query executor is deemed + * interrupted. + */ + GPtrArray *interrupters; + + /* + * Default interrupter to support bt_query_executor_interrupt(); + * owned by this. + */ + struct bt_interrupter *default_interrupter; }; #endif /* BABELTRACE_GRAPH_QUERY_EXECUTOR_INTERNAL_H */ diff --git a/src/lib/lib-logging.c b/src/lib/lib-logging.c index 91db4a31..e9afb806 100644 --- a/src/lib/lib-logging.c +++ b/src/lib/lib-logging.c @@ -1120,9 +1120,7 @@ static inline void format_graph(char **buf_ch, bool extended, { char tmp_prefix[TMP_PREFIX_LEN]; - BUF_APPEND(", %sis-canceled=%d, %scan-consume=%d, " - "%sconfig-state=%s", - PRFIELD(graph->canceled), + BUF_APPEND(", %scan-consume=%d, %sconfig-state=%s", PRFIELD(graph->can_consume), PRFIELD(bt_graph_configuration_state_string(graph->config_state))); diff --git a/src/lib/value.c b/src/lib/value.c index 183d2081..8057663c 100644 --- a/src/lib/value.c +++ b/src/lib/value.c @@ -1203,10 +1203,10 @@ enum bt_value_map_foreach_entry_status bt_value_map_foreach_entry( const char *key_str = g_quark_to_string(GPOINTER_TO_UINT(key)); if (!func(key_str, element_obj, data)) { - BT_LOGT("User canceled the loop: key=\"%s\", " + BT_LOGT("User interrupted the loop: key=\"%s\", " "value-addr=%p, data=%p", key_str, element_obj, data); - ret = BT_FUNC_STATUS_CANCELED; + ret = BT_FUNC_STATUS_INTERRUPTED; break; } } diff --git a/src/plugins/ctf/lttng-live/lttng-live.c b/src/plugins/ctf/lttng-live/lttng-live.c index 0cabf25d..11b80f47 100644 --- a/src/plugins/ctf/lttng-live/lttng-live.c +++ b/src/plugins/ctf/lttng-live/lttng-live.c @@ -109,18 +109,17 @@ const char *print_state(struct lttng_live_stream_iterator *s) } while (0); BT_HIDDEN -bool lttng_live_graph_is_canceled(struct lttng_live_component *lttng_live) +bool lttng_live_graph_is_canceled(struct lttng_live_msg_iter *msg_iter) { - const bt_component *component; bool ret; - if (!lttng_live) { + if (!msg_iter) { ret = false; goto end; } - component = bt_self_component_as_component(lttng_live->self_comp); - ret = bt_component_graph_is_canceled(component); + ret = bt_self_message_iterator_is_interrupted( + msg_iter->self_msg_iter); end: return ret; @@ -259,7 +258,6 @@ end: static void lttng_live_destroy_session(struct lttng_live_session *session) { - struct lttng_live_component *live_comp; bt_logging_level log_level; bt_self_component *self_comp; @@ -272,9 +270,9 @@ void lttng_live_destroy_session(struct lttng_live_session *session) BT_COMP_LOGD("Destroy lttng live session"); if (session->id != -1ULL) { if (lttng_live_detach_session(session)) { - live_comp = session->lttng_live_msg_iter->lttng_live_comp; if (session->lttng_live_msg_iter && - !lttng_live_graph_is_canceled(live_comp)) { + !lttng_live_graph_is_canceled( + session->lttng_live_msg_iter)) { /* Old relayd cannot detach sessions. */ BT_COMP_LOGD("Unable to detach lttng live session %" PRIu64, session->id); @@ -442,7 +440,7 @@ enum lttng_live_iterator_status lttng_live_get_session( ret = lttng_live_attach_session(session); if (ret) { if (lttng_live_msg_iter && lttng_live_graph_is_canceled( - lttng_live_msg_iter->lttng_live_comp)) { + lttng_live_msg_iter)) { status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN; } else { status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; @@ -1589,11 +1587,6 @@ bt_component_class_init_method_status lttng_live_component_init( goto error; } - if (lttng_live_graph_is_canceled(lttng_live)) { - ret = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR; - goto error; - } - add_port_status = bt_self_component_source_add_output_port( self_comp_src, "out", NULL, NULL); switch (add_port_status) { diff --git a/src/plugins/ctf/lttng-live/lttng-live.h b/src/plugins/ctf/lttng-live/lttng-live.h index 62e1e752..77632daf 100644 --- a/src/plugins/ctf/lttng-live/lttng-live.h +++ b/src/plugins/ctf/lttng-live/lttng-live.h @@ -313,6 +313,6 @@ struct lttng_live_trace *lttng_live_borrow_trace( struct lttng_live_session *session, uint64_t trace_id); void lttng_live_need_new_streams(struct lttng_live_msg_iter *lttng_live_msg_iter); -bool lttng_live_graph_is_canceled(struct lttng_live_component *lttng_live); +bool lttng_live_graph_is_canceled(struct lttng_live_msg_iter *msg_iter); #endif /* BABELTRACE_PLUGIN_CTF_LTTNG_LIVE_H */ diff --git a/src/plugins/ctf/lttng-live/metadata.c b/src/plugins/ctf/lttng-live/metadata.c index 8475c569..b31cd96a 100644 --- a/src/plugins/ctf/lttng-live/metadata.c +++ b/src/plugins/ctf/lttng-live/metadata.c @@ -120,8 +120,6 @@ enum lttng_live_iterator_status lttng_live_metadata_update( { struct lttng_live_session *session = trace->session; struct lttng_live_metadata *metadata = trace->metadata; - struct lttng_live_component *lttng_live = - session->lttng_live_msg_iter->lttng_live_comp; ssize_t ret = 0; size_t size, len_read = 0; char *metadata_buf = NULL; @@ -188,7 +186,8 @@ enum lttng_live_iterator_status lttng_live_metadata_update( metadata->trace = NULL; } if (errno == EINTR) { - if (lttng_live_graph_is_canceled(lttng_live)) { + if (lttng_live_graph_is_canceled( + session->lttng_live_msg_iter)) { status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN; goto end; } diff --git a/src/plugins/ctf/lttng-live/viewer-connection.c b/src/plugins/ctf/lttng-live/viewer-connection.c index ac335427..42ea78c0 100644 --- a/src/plugins/ctf/lttng-live/viewer-connection.c +++ b/src/plugins/ctf/lttng-live/viewer-connection.c @@ -67,7 +67,7 @@ ssize_t lttng_live_recv(struct live_viewer_connection *viewer_connection, } if (ret == BT_SOCKET_ERROR && bt_socket_interrupted()) { if (!viewer_connection->in_query && - lttng_live_graph_is_canceled(lttng_live_msg_iter->lttng_live_comp)) { + lttng_live_graph_is_canceled(lttng_live_msg_iter)) { break; } else { continue; @@ -93,7 +93,7 @@ ssize_t lttng_live_send(struct live_viewer_connection *viewer_connection, ret = bt_socket_send_nosigpipe(sock, buf, len); if (ret == BT_SOCKET_ERROR && bt_socket_interrupted()) { if (!viewer_connection->in_query && - lttng_live_graph_is_canceled(lttng_live_msg_iter->lttng_live_comp)) { + lttng_live_graph_is_canceled(lttng_live_msg_iter)) { break; } else { continue; @@ -1125,8 +1125,6 @@ enum lttng_live_iterator_status lttng_live_get_next_index( struct lttng_live_trace *trace = stream->trace; const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); char cmd_buf[cmd_buf_len]; - struct lttng_live_component *lttng_live = - lttng_live_msg_iter->lttng_live_comp; cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX); cmd.data_size = htobe64((uint64_t) sizeof(rq)); @@ -1239,7 +1237,7 @@ end: return retstatus; error: - if (lttng_live_graph_is_canceled(lttng_live)) { + if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) { retstatus = LTTNG_LIVE_ITERATOR_STATUS_AGAIN; } else { retstatus = LTTNG_LIVE_ITERATOR_STATUS_ERROR; @@ -1264,8 +1262,6 @@ enum bt_msg_iter_medium_status lttng_live_get_stream_bytes( struct lttng_live_trace *trace = stream->trace; const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); char cmd_buf[cmd_buf_len]; - struct lttng_live_component *lttng_live = - lttng_live_msg_iter->lttng_live_comp; BT_COMP_LOGD("lttng_live_get_stream_bytes: offset=%" PRIu64 ", req_len=%" PRIu64, offset, req_len); @@ -1364,7 +1360,7 @@ end: return retstatus; error: - if (lttng_live_graph_is_canceled(lttng_live)) { + if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) { retstatus = BT_MSG_ITER_MEDIUM_STATUS_AGAIN; } else { retstatus = BT_MSG_ITER_MEDIUM_STATUS_ERROR; @@ -1389,8 +1385,6 @@ enum lttng_live_iterator_status lttng_live_get_new_streams( session->lttng_live_msg_iter; struct live_viewer_connection *viewer_connection = lttng_live_msg_iter->viewer_connection; - struct lttng_live_component *lttng_live = - lttng_live_msg_iter->lttng_live_comp; uint32_t streams_count; const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); char cmd_buf[cmd_buf_len]; @@ -1461,7 +1455,7 @@ end: return status; error: - if (lttng_live_graph_is_canceled(lttng_live)) { + if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) { status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN; } else { status = LTTNG_LIVE_ITERATOR_STATUS_ERROR; diff --git a/tests/bindings/python/bt2/test_graph.py b/tests/bindings/python/bt2/test_graph.py index 4f6aade1..7a403b5c 100644 --- a/tests/bindings/python/bt2/test_graph.py +++ b/tests/bindings/python/bt2/test_graph.py @@ -196,14 +196,52 @@ class GraphTestCase(unittest.TestCase): sink.input_ports['in'], src.output_ports['out'] ) - def test_cancel(self): - self.assertFalse(self._graph.is_canceled) - self._graph.cancel() - self.assertTrue(self._graph.is_canceled) - - # Test that Graph.run() raises bt2.Canceled if the graph gets canceled - # during execution. - def test_cancel_while_running(self): + def test_add_interrupter(self): + class MyIter(bt2._UserMessageIterator): + def __next__(self): + raise TypeError + + class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter): + def __init__(self, params): + self._add_output_port('out') + + class MySink(bt2._UserSinkComponent): + def __init__(self, params): + self._add_input_port('in') + + def _consume(self): + next(self._msg_iter) + + def _graph_is_configured(self): + self._msg_iter = self._create_input_port_message_iterator( + self._input_ports['in'] + ) + + # add two interrupters, set one of them + interrupter1 = bt2.Interrupter() + interrupter2 = bt2.Interrupter() + self._graph.add_interrupter(interrupter1) + src = self._graph.add_component(MySource, 'src') + sink = self._graph.add_component(MySink, 'sink') + self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in']) + self._graph.add_interrupter(interrupter2) + + with self.assertRaises(bt2._Error): + self._graph.run() + + interrupter2.set() + + with self.assertRaises(bt2.TryAgain): + self._graph.run() + + interrupter2.reset() + + with self.assertRaises(bt2._Error): + self._graph.run() + + # Test that Graph.run() raises bt2.Interrupted if the graph gets + # interrupted during execution. + def test_interrupt_while_running(self): class MyIter(_MyIter): def __next__(self): return self._create_stream_beginning_message(self._stream) @@ -217,10 +255,9 @@ class GraphTestCase(unittest.TestCase): self._add_input_port('in') def _consume(self): - # Pretend that somebody asynchronously cancelled the graph. + # Pretend that somebody asynchronously interrupted the graph. nonlocal graph - graph.cancel() - + graph.interrupt() return next(self._msg_iter) def _graph_is_configured(self): @@ -228,12 +265,13 @@ class GraphTestCase(unittest.TestCase): self._input_ports['in'] ) - graph = bt2.Graph() - up = graph.add_component(MySource, 'down') - down = graph.add_component(MySink, 'up') - graph.connect_ports(up.output_ports['out'], down.input_ports['in']) - with self.assertRaises(bt2.Canceled): - graph.run() + graph = self._graph + up = self._graph.add_component(MySource, 'down') + down = self._graph.add_component(MySink, 'up') + self._graph.connect_ports(up.output_ports['out'], down.input_ports['in']) + + with self.assertRaises(bt2.TryAgain): + self._graph.run() def test_run(self): class MyIter(_MyIter): diff --git a/tests/bindings/python/bt2/test_query_executor.py b/tests/bindings/python/bt2/test_query_executor.py index 09774309..bc20a719 100644 --- a/tests/bindings/python/bt2/test_query_executor.py +++ b/tests/bindings/python/bt2/test_query_executor.py @@ -183,13 +183,32 @@ class QueryExecutorTestCase(unittest.TestCase): with self.assertRaises(bt2.TryAgain): res = bt2.QueryExecutor().query(MySink, 'obj', [17, 23]) - def test_cancel_no_query(self): + def test_query_add_interrupter(self): + class MySink(bt2._UserSinkComponent): + def _consume(self): + pass + + def _graph_is_configured(self): + pass + + @classmethod + def _query(cls, query_exec, obj, params, log_level): + nonlocal interrupter2 + test_self.assertFalse(query_exec.is_interrupted) + interrupter2.set() + test_self.assertTrue(query_exec.is_interrupted) + interrupter2.reset() + test_self.assertFalse(query_exec.is_interrupted) + + interrupter1 = bt2.Interrupter() + interrupter2 = bt2.Interrupter() + test_self = self query_exec = bt2.QueryExecutor() - self.assertFalse(query_exec.is_canceled) - query_exec.cancel() - self.assertTrue(query_exec.is_canceled) + query_exec.add_interrupter(interrupter1) + query_exec.add_interrupter(interrupter2) + query_exec.query(MySink, 'obj', [17, 23]) - def test_query_canceled(self): + def test_query_interrupt(self): class MySink(bt2._UserSinkComponent): def _consume(self): pass @@ -199,10 +218,10 @@ class QueryExecutorTestCase(unittest.TestCase): @classmethod def _query(cls, query_exec, obj, params, log_level): - raise bt2.TryAgain + test_self.assertFalse(query_exec.is_interrupted) + query_exec.interrupt() + test_self.assertTrue(query_exec.is_interrupted) + test_self = self query_exec = bt2.QueryExecutor() - query_exec.cancel() - - with self.assertRaises(bt2.Canceled): - res = query_exec.query(MySink, 'obj', [17, 23]) + query_exec.query(MySink, 'obj', [17, 23]) diff --git a/tests/lib/test_bt_values.c b/tests/lib/test_bt_values.c index e170a5f3..39427f46 100644 --- a/tests/lib/test_bt_values.c +++ b/tests/lib/test_bt_values.c @@ -651,7 +651,7 @@ void test_map(void) ret = bt_value_map_foreach_entry(map_obj, test_map_foreach_cb_count, &count); - ok(ret == BT_VALUE_MAP_FOREACH_ENTRY_STATUS_CANCELED && count == 3, + ok(ret == BT_VALUE_MAP_FOREACH_ENTRY_STATUS_INTERRUPTED && count == 3, "bt_value_map_foreach_entry() breaks the loop when the user function returns BT_FALSE"); memset(&checklist, 0, sizeof(checklist)); -- 2.34.1