lib: add bt_{graph,query_executor}_add_interrupter()
authorPhilippe Proulx <eeppeliteloop@gmail.com>
Sun, 21 Jul 2019 01:30:30 +0000 (21:30 -0400)
committerPhilippe Proulx <eeppeliteloop@gmail.com>
Wed, 24 Jul 2019 14:17:16 +0000 (10:17 -0400)
This patch makes it possible to add interrupter objects to a graph or to
a query executor.

To do this, you need to create an interrupter object
(bt_interrupter_create()), add it to a graph or to a query executor with
the new bt_graph_add_interrupter() and
bt_query_executor_add_interrupter() functions. Then you can interrupt
the object with bt_interrupter_set().

Within the whole project, the "cancel" terminology is renamed to
"interrupt", as many times it is possible to resume an interrupted
operation. For example, if you interrupt a running graph and
bt_graph_run() returns `BT_GRAPH_RUN_STATUS_AGAIN`, then you can resume
the graph's operation by calling bt_graph_run() again.

A graph or a query executor is deemed interrupted when any of its
interrupters is set. This makes it possible, for example, to add both a
global interrupter and a thread-specific interrupter to a graph object
so that either the thread-specific action or a global signal handler can
interrupt the graph.

As a convenience, bt_graph_interrupt() (which was bt_graph_cancel()) and
bt_query_executor_interrupt() (which was bt_query_executor_cancel()) are
kept: they set a default interrupter which is always part of the
object's interrupter set.

Conceptually, when a graph runs, each execution context is considered
independently interruptible:

* The execution of a message iterator.
* The execution of a sink component.
* The execution of the bt_graph_run() loop itself.

bt_graph_add_interrupter() conceptually adds the given interrupter to:

* All the graph's message iterators, current and future.

* All the graph's sink components.

* Itself, for bt_graph_run().

  This is needed because message iterators typically won't check if they
  are interrupted without performing "long" tasks. Because we cannot
  guarantee that, bt_graph_run() checks if it's interrupted for each
  sink consuming iteration.

This is why bt_graph_is_canceled() and bt_component_graph_is_canceled()
do not exist anymore, while the new
bt_self_message_iterator_is_interrupted() and
bt_self_component_sink_is_interrupted() functions are introduced.
However, because we don't need per-message iterator or per-sink
component interruption yet, bt_self_message_iterator_is_interrupted()
and bt_self_component_sink_is_interrupted() simply check their graph's
interrupters directly. The two functions exist so that, in the future,
we can add interrupters to a single message iterator (and its upstream
message iterators) or to a single sink component if needed.

As of this patch, you cannot remove an interrupter from a graph or from
a query executor as the project does not need this currently.

A message iterator, a sink component, a graph, or a query executor never
returns an "interrupted" status. It is the job of the actor which checks
the interruption status to return an appropriate status:

* Whenever possible, returning an "again" status is the cleanest
  approach: this indicates that the operation can resume later, even if
  it was interrupted.

* Otherwise, return an error status.

The distinction between a "real" "again" status and an "again" status
caused by an interruption is easy to make from the side controlling the
interrupter: check the interrupter's state to discover it. For example
(Python):

    while True:
        try:
            my_graph.run()
        except bt2.TryAgain:
            if my_interrupter:
                print('interrupted by user')
                sys.exit(1)
            else:
                time.sleep(.1)

The same should be checked when getting an error, as an error can be the
consequence of an interruption (this is what `src.ctf.lttng-live` does
currently as it cannot safely interrupt some network interchanges and
resume them later).

The CLI is changed to have a single, global interrupter object. This
interrupter is added to any query executor or graph. The `SIGINT` signal
handler sets this interrupter. When getting an "again" or an error
status, the CLI checks the interrupter's state to discover if this is
the result of a user action, for example:

    case BT_GRAPH_RUN_STATUS_AGAIN:
        if (bt_interrupter_is_set(the_interrupter)) {
            BT_CLI_LOGW_APPEND_CAUSE("Graph was interrupted by user.");
            goto error;
        }

        /* ... */

In the `bt2` Python package:

* The `bt2.Canceled` exception is removed as it's not needed anymore.
  The tests are adapted to demonstrate that.

* Graph.cancel() is changed to Graph.interrupt().

* Graph.is_canceled() is removed.

* There's a new Graph.add_interrupter() method.

* QueryExecutor.cancel() is changed to QueryExecutor.interrupt().

* There's a new QueryExecutor.add_interrupter() method.

* There's a new protected _UserMessageIterator._is_interrupted() method.

* There's a new protected _UserSinkComponent._is_interrupted() method.

Signed-off-by: Philippe Proulx <eeppeliteloop@gmail.com>
Change-Id: I4d6631d39b585bd440457e7fea53a939ec9aaf3a
Reviewed-on: https://review.lttng.org/c/babeltrace/+/1735
Tested-by: jenkins <jenkins@lttng.org>
40 files changed:
CONTRIBUTING.adoc
include/babeltrace2/babeltrace.h
include/babeltrace2/func-status.h
include/babeltrace2/graph/component-const.h
include/babeltrace2/graph/graph-const.h
include/babeltrace2/graph/graph.h
include/babeltrace2/graph/query-executor-const.h
include/babeltrace2/graph/query-executor.h
include/babeltrace2/graph/self-component-sink.h
include/babeltrace2/graph/self-message-iterator.h
include/babeltrace2/value-const.h
include/babeltrace2/value.h
src/bindings/python/bt2/bt2/__init__.py.in
src/bindings/python/bt2/bt2/component.py
src/bindings/python/bt2/bt2/graph.py
src/bindings/python/bt2/bt2/message_iterator.py
src/bindings/python/bt2/bt2/native_bt_component_class.i
src/bindings/python/bt2/bt2/query_executor.py
src/bindings/python/bt2/bt2/utils.py
src/cli/babeltrace2.c
src/common/common.h
src/lib/func-status.h
src/lib/graph/component-sink.c
src/lib/graph/component.c
src/lib/graph/connection.c
src/lib/graph/graph.c
src/lib/graph/graph.h
src/lib/graph/interrupter.h
src/lib/graph/iterator.c
src/lib/graph/query-executor.c
src/lib/graph/query-executor.h
src/lib/lib-logging.c
src/lib/value.c
src/plugins/ctf/lttng-live/lttng-live.c
src/plugins/ctf/lttng-live/lttng-live.h
src/plugins/ctf/lttng-live/metadata.c
src/plugins/ctf/lttng-live/viewer-connection.c
tests/bindings/python/bt2/test_graph.py
tests/bindings/python/bt2/test_query_executor.py
tests/lib/test_bt_values.c

index 46984b34ad1ab0b0079b682073ff60d3cf45b6fa..d5d86e54a2a9e4daa19b21d22de1edbbc4ce28be 100644 (file)
@@ -1353,7 +1353,7 @@ an _INFO_ build.
 * Object copying (except fields and values).
 * Object freezing (whatever the type, as freezing only occurs in
   developer mode).
-* Object cancellation.
+* Object interruption.
 * Calling user methods and logging the result.
 * Setting object properties (except fields and values).
 |
index 1be659069912eb7b01d772f75711435fa9a1dce9..e0db1254e4a5551db646a1312cb7196275c7d1c3 100644 (file)
 #include <babeltrace2/plugin/plugin-dev.h>
 
 /* Cancel private definitions */
-#undef __BT_FUNC_STATUS_OVERFLOW
-#undef __BT_FUNC_STATUS_INVALID_PARAMS
+#undef __BT_FUNC_STATUS_AGAIN
+#undef __BT_FUNC_STATUS_END
+#undef __BT_FUNC_STATUS_END
+#undef __BT_FUNC_STATUS_ERROR
+#undef __BT_FUNC_STATUS_ERROR
+#undef __BT_FUNC_STATUS_INTERRUPTED
 #undef __BT_FUNC_STATUS_INVALID_OBJECT
 #undef __BT_FUNC_STATUS_MEMORY_ERROR
-#undef __BT_FUNC_STATUS_ERROR
-#undef __BT_FUNC_STATUS_OK
-#undef __BT_FUNC_STATUS_END
 #undef __BT_FUNC_STATUS_NOT_FOUND
-#undef __BT_FUNC_STATUS_AGAIN
-#undef __BT_FUNC_STATUS_CANCELED
+#undef __BT_FUNC_STATUS_OK
 #undef __BT_IN_BABELTRACE_H
 #undef __BT_UPCAST
 #undef __BT_UPCAST_CONST
index c817b38842a7fdad7fb5d188a17b588342abaa69..78a2f0a473c13f37d27bc3acb443e18f16598a4b 100644 (file)
 # define __BT_FUNC_STATUS_NOT_FOUND            2
 #endif
 
+/* Object is interrupted */
+#ifndef __BT_FUNC_STATUS_INTERRUPTED
+# define __BT_FUNC_STATUS_INTERRUPTED          4
+#endif
+
 /* Try operation again later */
 #ifndef __BT_FUNC_STATUS_AGAIN
 # define __BT_FUNC_STATUS_AGAIN                        11
 #endif
-
-/* Object is canceled */
-#ifndef __BT_FUNC_STATUS_CANCELED
-# define __BT_FUNC_STATUS_CANCELED             125
-#endif
index 6ade2d3fb025983ac2cec23b0976e7e08941be58..11554b5e3b21306a9557c90b111171809a537766 100644 (file)
@@ -79,9 +79,6 @@ bt_bool bt_component_is_sink(const bt_component *component)
                BT_COMPONENT_CLASS_TYPE_SINK;
 }
 
-extern bt_bool bt_component_graph_is_canceled(
-               const bt_component *component);
-
 extern void bt_component_get_ref(const bt_component *component);
 
 extern void bt_component_put_ref(const bt_component *component);
index 44933ffd944a0ce69d9225fe963cba53ccae0abb..168ed2b25c5a8bed26e90487935d76e72263cc70 100644 (file)
@@ -33,8 +33,6 @@
 extern "C" {
 #endif
 
-extern bt_bool bt_graph_is_canceled(const bt_graph *graph);
-
 extern void bt_graph_get_ref(const bt_graph *graph);
 
 extern void bt_graph_put_ref(const bt_graph *graph);
index 059992c22244c3d2652cf012d4b0620e9a12b481..46aebf9896a7cc82ab06d1274ce8e0369c1466b1 100644 (file)
@@ -145,7 +145,6 @@ bt_graph_add_sink_component_with_init_method_data(
 typedef enum bt_graph_connect_ports_status {
        BT_GRAPH_CONNECT_PORTS_STATUS_OK                = __BT_FUNC_STATUS_OK,
        BT_GRAPH_CONNECT_PORTS_STATUS_ERROR             = __BT_FUNC_STATUS_ERROR,
-       BT_GRAPH_CONNECT_PORTS_STATUS_CANCELED          = __BT_FUNC_STATUS_CANCELED,
        BT_GRAPH_CONNECT_PORTS_STATUS_MEMORY_ERROR      = __BT_FUNC_STATUS_MEMORY_ERROR,
 } bt_graph_connect_ports_status;
 
@@ -160,7 +159,6 @@ typedef enum bt_graph_run_status {
        BT_GRAPH_RUN_STATUS_MEMORY_ERROR        = __BT_FUNC_STATUS_MEMORY_ERROR,
        BT_GRAPH_RUN_STATUS_AGAIN               = __BT_FUNC_STATUS_AGAIN,
        BT_GRAPH_RUN_STATUS_END                 = __BT_FUNC_STATUS_END,
-       BT_GRAPH_RUN_STATUS_CANCELED            = __BT_FUNC_STATUS_CANCELED,
 } bt_graph_run_status;
 
 extern bt_graph_run_status bt_graph_run(bt_graph *graph);
@@ -171,7 +169,6 @@ typedef enum bt_graph_consume_status {
        BT_GRAPH_CONSUME_STATUS_MEMORY_ERROR    = __BT_FUNC_STATUS_MEMORY_ERROR,
        BT_GRAPH_CONSUME_STATUS_AGAIN           = __BT_FUNC_STATUS_AGAIN,
        BT_GRAPH_CONSUME_STATUS_END             = __BT_FUNC_STATUS_END,
-       BT_GRAPH_CONSUME_STATUS_CANCELED        = __BT_FUNC_STATUS_CANCELED,
 } bt_graph_consume_status;
 
 extern bt_graph_consume_status bt_graph_consume(bt_graph *graph);
@@ -237,11 +234,15 @@ bt_graph_add_filter_sink_component_ports_connected_listener(
                bt_graph_listener_removed_func listener_removed, void *data,
                int *listener_id);
 
-typedef enum bt_graph_cancel_status {
-       BT_GRAPH_CANCEL_STATUS_OK       = __BT_FUNC_STATUS_OK,
-} bt_graph_cancel_status;
+typedef enum bt_graph_add_interrupter_status {
+       BT_GRAPH_ADD_INTERRUPTER_STATUS_OK      = __BT_FUNC_STATUS_OK,
+       BT_GRAPH_ADD_INTERRUPTER_MEMORY_ERROR   = __BT_FUNC_STATUS_MEMORY_ERROR,
+} bt_graph_add_interrupter_status;
 
-extern bt_graph_cancel_status bt_graph_cancel(bt_graph *graph);
+extern bt_graph_add_interrupter_status bt_graph_add_interrupter(bt_graph *graph,
+               const bt_interrupter *interrupter);
+
+extern void bt_graph_interrupt(bt_graph *graph);
 
 #ifdef __cplusplus
 }
index fde9289bfa60c0482437314665e2a4708a609452..9b6be7e5bc1133b04f5822a464b9d2cfcf5b824b 100644 (file)
 extern "C" {
 #endif
 
-extern
-bt_bool bt_query_executor_is_canceled(
+extern bt_bool bt_query_executor_is_interrupted(
                const bt_query_executor *query_executor);
 
-extern void bt_query_executor_get_ref(
-               const bt_query_executor *query_executor);
+extern void bt_query_executor_get_ref(const bt_query_executor *query_executor);
 
-extern void bt_query_executor_put_ref(
-               const bt_query_executor *query_executor);
+extern void bt_query_executor_put_ref(const bt_query_executor *query_executor);
 
 #define BT_QUERY_EXECUTOR_PUT_REF_AND_RESET(_var)              \
        do {                                                    \
index 332bd62b340223f349031cde25ee64512fe6222a..ef250462cc5fa79776291468c608110a07780012 100644 (file)
@@ -40,7 +40,6 @@ bt_query_executor *bt_query_executor_create(void);
 typedef enum bt_query_executor_query_status {
        BT_QUERY_EXECUTOR_QUERY_STATUS_OK               = __BT_FUNC_STATUS_OK,
        BT_QUERY_EXECUTOR_QUERY_STATUS_AGAIN            = __BT_FUNC_STATUS_AGAIN,
-       BT_QUERY_EXECUTOR_QUERY_STATUS_CANCELED         = __BT_FUNC_STATUS_CANCELED,
        BT_QUERY_EXECUTOR_QUERY_STATUS_ERROR            = __BT_FUNC_STATUS_ERROR,
        BT_QUERY_EXECUTOR_QUERY_STATUS_MEMORY_ERROR     = __BT_FUNC_STATUS_MEMORY_ERROR,
        BT_QUERY_EXECUTOR_QUERY_STATUS_INVALID_OBJECT   = __BT_FUNC_STATUS_INVALID_OBJECT,
@@ -54,13 +53,16 @@ bt_query_executor_query_status bt_query_executor_query(
                const char *object, const bt_value *params,
                bt_logging_level logging_level, const bt_value **result);
 
-typedef enum bt_query_executor_cancel_status {
-       BT_QUERY_EXECUTOR_CANCEL_STATUS_OK      = __BT_FUNC_STATUS_OK,
-} bt_query_executor_cancel_status;
+typedef enum bt_query_executor_add_interrupter_status {
+       BT_QUERY_EXECUTOR_ADD_INTERRUPTER_STATUS_OK     = __BT_FUNC_STATUS_OK,
+       BT_QUERY_EXECUTOR_ADD_INTERRUPTER_MEMORY_ERROR  = __BT_FUNC_STATUS_MEMORY_ERROR,
+} bt_query_executor_add_interrupter_status;
 
-extern
-bt_query_executor_cancel_status bt_query_executor_cancel(
-               bt_query_executor *query_executor);
+extern bt_query_executor_add_interrupter_status
+bt_query_executor_add_interrupter(bt_query_executor *query_executor,
+               const bt_interrupter *interrupter);
+
+extern void bt_query_executor_interrupt(bt_query_executor *query_executor);
 
 #ifdef __cplusplus
 }
index 3513d8a0f8b6df8fe82dd2f769a8363912fd432f..224c6781b9ebeb5fd57c2d9db869420885a389a7 100644 (file)
@@ -66,6 +66,9 @@ bt_self_component_sink_add_input_port(
                const char *name, void *user_data,
                bt_self_component_port_input **self_component_port);
 
+extern bt_bool bt_self_component_sink_is_interrupted(
+               const bt_self_component_sink *self_component);
+
 #ifdef __cplusplus
 }
 #endif
index a77901da4d28b948757dd75fdf99549ee5d8e8ff..025eb278a8dfb311e40f3196b467051f7f63d6d1 100644 (file)
@@ -33,6 +33,9 @@
 extern "C" {
 #endif
 
+extern bt_bool bt_self_message_iterator_is_interrupted(
+               const bt_self_message_iterator *message_iterator);
+
 extern bt_self_component *
 bt_self_message_iterator_borrow_component(
                bt_self_message_iterator *message_iterator);
index 3ac4783d49b198fa98567d146cee7f413506d981..67c05795cf126cca9f60554f744c37b46afb9c71 100644 (file)
@@ -161,7 +161,7 @@ typedef bt_bool (* bt_value_map_foreach_entry_const_func)(const char *key,
 typedef enum bt_value_map_foreach_entry_const_status {
        BT_VALUE_MAP_FOREACH_ENTRY_CONST_STATUS_MEMORY_ERROR    = __BT_FUNC_STATUS_MEMORY_ERROR,
        BT_VALUE_MAP_FOREACH_ENTRY_CONST_STATUS_OK              = __BT_FUNC_STATUS_OK,
-       BT_VALUE_MAP_FOREACH_ENTRY_CONST_STATUS_CANCELED        = __BT_FUNC_STATUS_CANCELED,
+       BT_VALUE_MAP_FOREACH_ENTRY_CONST_STATUS_INTERRUPTED     = __BT_FUNC_STATUS_INTERRUPTED,
 } bt_value_map_foreach_entry_const_status;
 
 extern bt_value_map_foreach_entry_const_status bt_value_map_foreach_entry_const(
index c5e8d701966cabe904fc3dba187783eafebf6a43..c37c090a67a5755e61b3f413fba60be45acdf499 100644 (file)
@@ -130,7 +130,7 @@ typedef bt_bool (* bt_value_map_foreach_entry_func)(const char *key,
 typedef enum bt_value_map_foreach_entry_status {
        BT_VALUE_MAP_FOREACH_ENTRY_STATUS_MEMORY_ERROR  = __BT_FUNC_STATUS_MEMORY_ERROR,
        BT_VALUE_MAP_FOREACH_ENTRY_STATUS_OK            = __BT_FUNC_STATUS_OK,
-       BT_VALUE_MAP_FOREACH_ENTRY_STATUS_CANCELED      = __BT_FUNC_STATUS_CANCELED,
+       BT_VALUE_MAP_FOREACH_ENTRY_STATUS_INTERRUPTED   = __BT_FUNC_STATUS_INTERRUPTED,
 } bt_value_map_foreach_entry_status;
 
 extern bt_value_map_foreach_entry_status bt_value_map_foreach_entry(
index a801446ce44b1acde318fcd556e38c238f66dcd6..01c87e51c238f932ccaf16803dccf6e019a2d64b 100644 (file)
@@ -98,10 +98,6 @@ class IncompleteUserClass(Exception):
     pass
 
 
-class Canceled(Exception):
-    pass
-
-
 class _ListenerHandle:
     def __init__(self, listener_id, obj):
         self._listener_id = listener_id
index 10b1166300e337ee0fbfe82e828b9beb70e4f848..43e3ce1645bb90fe4c4a80f9e4b853ea70640af1 100644 (file)
@@ -864,3 +864,7 @@ class _UserSinkComponent(_UserComponent, _SinkComponent):
             raise bt2.CreationError('cannot create message iterator object')
 
         return bt2.message_iterator._UserComponentInputPortMessageIterator(msg_iter_ptr)
+
+    @property
+    def _is_interrupted(self):
+        return bool(native_bt.self_component_sink_is_interrupted(self._bt_ptr))
index e9659bd2f242bedbad3509a567be476507b36fb8..6d083228ba3c6e951a6d09bb024cbb94496144c1 100644 (file)
@@ -21,6 +21,7 @@
 # THE SOFTWARE.
 
 from bt2 import native_bt, object, utils
+import bt2.interrupter
 import bt2.connection
 import bt2.component
 import functools
@@ -170,24 +171,19 @@ class Graph(object._SharedObject):
         status = native_bt.graph_run(self._ptr)
 
         try:
-            utils._handle_func_status(
-                status, 'graph object stopped running because of an unexpected error'
-            )
+            utils._handle_func_status(status, 'graph object stopped running')
         except bt2.Stop:
             # done
             return
         except Exception:
             raise
 
-    def cancel(self):
-        status = native_bt.graph_cancel(self._ptr)
-        utils._handle_func_status(status, 'cannot cancel graph object')
+    def add_interrupter(self, interrupter):
+        utils._check_type(interrupter, bt2.interrupter.Interrupter)
+        native_bt.graph_add_interrupter(self._ptr, interrupter._ptr)
 
-    @property
-    def is_canceled(self):
-        is_canceled = native_bt.graph_is_canceled(self._ptr)
-        assert is_canceled >= 0
-        return is_canceled > 0
+    def interrupt(self):
+        native_bt.graph_interrupt(self._ptr)
 
     def create_output_port_message_iterator(self, output_port):
         utils._check_type(output_port, bt2.port._OutputPort)
index 07c707dba31c013182c0f9ffc3177ebc72b12108..25d3cbb1d94a0fcc5f0a3532ea7de74ceca15c2b 100644 (file)
@@ -138,6 +138,10 @@ class _UserMessageIterator(_MessageIterator):
     def addr(self):
         return int(self._bt_ptr)
 
+    @property
+    def _is_interrupted(self):
+        return bool(native_bt.self_message_iterator_is_interrupted(self._bt_ptr))
+
     def _finalize(self):
         pass
 
index c33b9caf87d87ac90c16756ff80494c945f90bc6..63dcc0ba29378f5e3612297fef2b6841364ea6da 100644 (file)
@@ -97,7 +97,6 @@ static PyObject *py_mod_bt2_exc_error_type = NULL;
 static PyObject *py_mod_bt2_exc_memory_error = NULL;
 static PyObject *py_mod_bt2_exc_try_again_type = NULL;
 static PyObject *py_mod_bt2_exc_stop_type = NULL;
-static PyObject *py_mod_bt2_exc_msg_iter_canceled_type = NULL;
 static PyObject *py_mod_bt2_exc_invalid_object_type = NULL;
 static PyObject *py_mod_bt2_exc_invalid_params_type = NULL;
 
@@ -154,7 +153,6 @@ void bt_bt2_cc_exit_handler(void)
        Py_XDECREF(py_mod_bt2_exc_error_type);
        Py_XDECREF(py_mod_bt2_exc_try_again_type);
        Py_XDECREF(py_mod_bt2_exc_stop_type);
-       Py_XDECREF(py_mod_bt2_exc_msg_iter_canceled_type);
        Py_XDECREF(py_mod_bt2_exc_invalid_object_type);
        Py_XDECREF(py_mod_bt2_exc_invalid_params_type);
 }
index 6e2bbc8108731090924d04dba4ff6b9049fe414a..e9d3d761b16fba6f0a251598412737193c303cea 100644 (file)
@@ -21,6 +21,7 @@
 # THE SOFTWARE.
 
 from bt2 import native_bt, object, utils
+import bt2.interrupter
 import bt2.component
 import bt2.logging
 import bt2
@@ -38,15 +39,17 @@ class QueryExecutor(object._SharedObject):
 
         super().__init__(ptr)
 
-    def cancel(self):
-        status = native_bt.query_executor_cancel(self._ptr)
-        utils._handle_func_status(status, 'cannot cancel query executor object')
+    def add_interrupter(self, interrupter):
+        utils._check_type(interrupter, bt2.interrupter.Interrupter)
+        native_bt.query_executor_add_interrupter(self._ptr, interrupter._ptr)
+
+    def interrupt(self):
+        native_bt.query_executor_interrupt(self._ptr)
 
     @property
-    def is_canceled(self):
-        is_canceled = native_bt.query_executor_is_canceled(self._ptr)
-        assert is_canceled >= 0
-        return is_canceled > 0
+    def is_interrupted(self):
+        is_interrupted = native_bt.query_executor_is_interrupted(self._ptr)
+        return bool(is_interrupted)
 
     def query(
         self,
@@ -55,9 +58,6 @@ class QueryExecutor(object._SharedObject):
         params=None,
         logging_level=bt2.logging.LoggingLevel.NONE,
     ):
-        if self.is_canceled:
-            raise bt2.Canceled
-
         if not isinstance(component_class, bt2.component._GenericComponentClass):
             err = False
 
index bd4dd7a9406c3749dc1e5865d98e6f560326769e..aa871042bc136a73ad66ae908b62c148b190a573 100644 (file)
@@ -151,11 +151,6 @@ def _handle_func_status(status, msg=None):
             raise bt2.TryAgain
         else:
             raise bt2.TryAgain(msg)
-    elif status == native_bt.__BT_FUNC_STATUS_CANCELED:
-        if msg is None:
-            raise bt2.Canceled
-        else:
-            raise bt2.Canceled(msg)
     elif status == native_bt.__BT_FUNC_STATUS_OVERFLOW:
         if msg is None:
             raise bt2.OverflowError
index 0eeb8770d4707d218f141045ce9562c88cc978ad..9699b0c3b5e46fba695fbbf6d4614df0f413f559 100644 (file)
@@ -55,10 +55,9 @@ static const char* log_level_env_var_names[] = {
        NULL,
 };
 
-/* Application's processing graph (weak) */
-static bt_graph *the_graph;
-static bt_query_executor *the_query_executor;
-static bool canceled = false;
+/* Application's interrupter (owned by this) */
+static bt_interrupter *the_interrupter;
+static volatile bool interrupted = false;
 
 #ifdef __MINGW32__
 
@@ -66,12 +65,11 @@ static bool canceled = false;
 
 static
 BOOL WINAPI signal_handler(DWORD signal) {
-       if (the_graph) {
-               bt_graph_cancel(the_graph);
+       if (the_interrupter) {
+               bt_interrupter_set(the_interrupter);
        }
 
-       canceled = true;
-
+       interrupted = true;
        return TRUE;
 }
 
@@ -92,15 +90,11 @@ void signal_handler(int signum)
                return;
        }
 
-       if (the_graph) {
-               bt_graph_cancel(the_graph);
-       }
-
-       if (the_query_executor) {
-               bt_query_executor_cancel(the_query_executor);
+       if (the_interrupter) {
+               bt_interrupter_set(the_interrupter);
        }
 
-       canceled = true;
+       interrupted = true;
 }
 
 static
@@ -120,26 +114,6 @@ void set_signal_handler(void)
 
 #endif /* __MINGW32__ */
 
-static
-int create_the_query_executor(void)
-{
-       int ret = 0;
-
-       the_query_executor = bt_query_executor_create();
-       if (!the_query_executor) {
-               BT_CLI_LOGE_APPEND_CAUSE("Cannot create a query executor.");
-               ret = -1;
-       }
-
-       return ret;
-}
-
-static
-void destroy_the_query_executor(void)
-{
-       BT_QUERY_EXECUTOR_PUT_REF_AND_RESET(the_query_executor);
-}
-
 static
 int query(struct bt_config *cfg, const bt_component_class *comp_cls,
                const char *obj, const bt_value *params,
@@ -147,30 +121,33 @@ int query(struct bt_config *cfg, const bt_component_class *comp_cls,
 {
        const bt_value *result = NULL;
        bt_query_executor_query_status query_status;
+       bt_query_executor *query_exec;
        *fail_reason = "unknown error";
        int ret = 0;
 
        BT_ASSERT(fail_reason);
        BT_ASSERT(user_result);
-       ret = create_the_query_executor();
-       if (ret) {
-               /* create_the_query_executor() logs errors */
-               goto end;
+       query_exec = bt_query_executor_create();
+       if (!query_exec) {
+               BT_CLI_LOGE_APPEND_CAUSE("Cannot create a query executor.");
+               goto error;
        }
 
-       if (canceled) {
+       bt_query_executor_add_interrupter(query_exec, the_interrupter);
+
+       if (interrupted) {
                BT_CLI_LOGW_APPEND_CAUSE(
-                       "Canceled by user before executing the query: "
+                       "Interrupted by user before executing the query: "
                        "comp-cls-addr=%p, comp-cls-name=\"%s\", "
                        "query-obj=\"%s\"", comp_cls,
                        bt_component_class_get_name(comp_cls), obj);
-               *fail_reason = "canceled by user";
+               *fail_reason = "interrupted by user";
                goto error;
        }
 
        while (true) {
                query_status = bt_query_executor_query(
-                       the_query_executor, comp_cls, obj, params,
+                       query_exec, comp_cls, obj, params,
                        cfg->log_level, &result);
                switch (query_status) {
                case BT_QUERY_EXECUTOR_QUERY_STATUS_OK:
@@ -179,29 +156,36 @@ int query(struct bt_config *cfg, const bt_component_class *comp_cls,
                {
                        const uint64_t sleep_time_us = 100000;
 
+                       if (bt_interrupter_is_set(the_interrupter)) {
+                               *fail_reason = "interrupted by user";
+                               goto error;
+                       }
+
                        /* Wait 100 ms and retry */
                        BT_LOGD("Got BT_QUERY_EXECUTOR_QUERY_STATUS_AGAIN: sleeping: "
                                "time-us=%" PRIu64, sleep_time_us);
 
                        if (usleep(sleep_time_us)) {
-                               if (bt_query_executor_is_canceled(the_query_executor)) {
+                               if (bt_interrupter_is_set(the_interrupter)) {
                                        BT_CLI_LOGW_APPEND_CAUSE(
-                                               "Query was canceled by user: "
+                                               "Query was interrupted by user: "
                                                "comp-cls-addr=%p, comp-cls-name=\"%s\", "
                                                "query-obj=\"%s\"", comp_cls,
                                                bt_component_class_get_name(comp_cls),
                                                obj);
-                                       *fail_reason = "canceled by user";
+                                       *fail_reason = "interrupted by user";
                                        goto error;
                                }
                        }
 
                        continue;
                }
-               case BT_QUERY_EXECUTOR_QUERY_STATUS_CANCELED:
-                       *fail_reason = "canceled by user";
-                       goto error;
                case BT_QUERY_EXECUTOR_QUERY_STATUS_ERROR:
+                       if (bt_interrupter_is_set(the_interrupter)) {
+                               *fail_reason = "interrupted by user";
+                               goto error;
+                       }
+
                        goto error;
                case BT_QUERY_EXECUTOR_QUERY_STATUS_INVALID_OBJECT:
                        *fail_reason = "invalid or unknown query object";
@@ -228,7 +212,7 @@ error:
        ret = -1;
 
 end:
-       destroy_the_query_executor();
+       bt_query_executor_put_ref(query_exec);
        bt_value_put_ref(result);
        return ret;
 }
@@ -1604,9 +1588,6 @@ int cmd_run_ctx_connect_upstream_port_to_downstream_component(
                switch (connect_ports_status) {
                case BT_GRAPH_CONNECT_PORTS_STATUS_OK:
                        break;
-               case BT_GRAPH_CONNECT_PORTS_STATUS_CANCELED:
-                       BT_CLI_LOGW_APPEND_CAUSE("Graph was canceled by user.");
-                       break;
                default:
                        BT_CLI_LOGE_APPEND_CAUSE(
                                "Cannot create connection: graph refuses to connect ports: "
@@ -1826,7 +1807,6 @@ void cmd_run_ctx_destroy(struct cmd_run_ctx *ctx)
        }
 
        BT_GRAPH_PUT_REF_AND_RESET(ctx->graph);
-       the_graph = NULL;
        ctx->cfg = NULL;
 }
 
@@ -1870,7 +1850,7 @@ int cmd_run_ctx_init(struct cmd_run_ctx *ctx, struct bt_config *cfg)
                goto error;
        }
 
-       the_graph = ctx->graph;
+       bt_graph_add_interrupter(ctx->graph, the_interrupter);
        add_listener_status = bt_graph_add_source_component_output_port_added_listener(
                ctx->graph, graph_source_output_port_added_listener, NULL, ctx,
                NULL);
@@ -2331,9 +2311,9 @@ int cmd_run(struct bt_config *cfg)
                goto error;
        }
 
-       if (canceled) {
+       if (interrupted) {
                BT_CLI_LOGW_APPEND_CAUSE(
-                       "Canceled by user before creating components.");
+                       "Interrupted by user before creating components.");
                goto error;
        }
 
@@ -2345,9 +2325,9 @@ int cmd_run(struct bt_config *cfg)
                goto error;
        }
 
-       if (canceled) {
+       if (interrupted) {
                BT_CLI_LOGW_APPEND_CAUSE(
-                       "Canceled by user before connecting components.");
+                       "Interrupted by user before connecting components.");
                goto error;
        }
 
@@ -2360,9 +2340,9 @@ int cmd_run(struct bt_config *cfg)
                goto error;
        }
 
-       if (canceled) {
+       if (interrupted) {
                BT_CLI_LOGW_APPEND_CAUSE(
-                       "Canceled by user before running the graph.");
+                       "Interrupted by user before running the graph.");
                goto error;
        }
 
@@ -2385,13 +2365,10 @@ int cmd_run(struct bt_config *cfg)
                switch (run_status) {
                case BT_GRAPH_RUN_STATUS_OK:
                        break;
-               case BT_GRAPH_RUN_STATUS_CANCELED:
-                       BT_CLI_LOGW_APPEND_CAUSE("Graph was canceled by user.");
-                       goto error;
                case BT_GRAPH_RUN_STATUS_AGAIN:
-                       if (bt_graph_is_canceled(ctx.graph)) {
+                       if (bt_interrupter_is_set(the_interrupter)) {
                                BT_CLI_LOGW_APPEND_CAUSE(
-                                       "Graph was canceled by user.");
+                                       "Graph was interrupted by user.");
                                goto error;
                        }
 
@@ -2401,9 +2378,9 @@ int cmd_run(struct bt_config *cfg)
                                        cfg->cmd_data.run.retry_duration_us);
 
                                if (usleep(cfg->cmd_data.run.retry_duration_us)) {
-                                       if (bt_graph_is_canceled(ctx.graph)) {
+                                       if (bt_interrupter_is_set(the_interrupter)) {
                                                BT_CLI_LOGW_APPEND_CAUSE(
-                                                       "Graph was canceled by user.");
+                                                       "Graph was interrupted by user.");
                                                goto error;
                                        }
                                }
@@ -2412,6 +2389,14 @@ int cmd_run(struct bt_config *cfg)
                case BT_GRAPH_RUN_STATUS_END:
                        goto end;
                default:
+                       if (bt_interrupter_is_set(the_interrupter)) {
+                               BT_CLI_LOGW_APPEND_CAUSE(
+                                       "Graph was interrupted by user and failed: "
+                                       "status=%s",
+                                       bt_common_func_status_string(run_status));
+                               goto error;
+                       }
+
                        BT_CLI_LOGE_APPEND_CAUSE(
                                "Graph failed to complete successfully");
                        goto error;
@@ -2705,6 +2690,14 @@ int main(int argc, const char **argv)
                }
        }
 
+       BT_ASSERT(!the_interrupter);
+       the_interrupter = bt_interrupter_create();
+       if (!the_interrupter) {
+               BT_CLI_LOGE_APPEND_CAUSE("Failed to create an interrupter object.");
+               retcode = 1;
+               goto end;
+       }
+
        BT_LOGI("Executing command: cmd=%d, command-name=\"%s\"",
                cfg->command, cfg->command_name);
 
@@ -2740,6 +2733,7 @@ int main(int argc, const char **argv)
 end:
        BT_OBJECT_PUT_REF_AND_RESET(cfg);
        fini_loaded_plugins();
+       bt_interrupter_put_ref(the_interrupter);
 
        if (retcode != 0) {
                print_error_causes();
index 5156e1b0be565312ddcaac487f75102e5f59f40a..8429145c5134c4ce5033a360934bda9ff654c220 100644 (file)
@@ -619,8 +619,8 @@ const char *bt_common_func_status_string(int status)
                return "NOT_FOUND";
        case __BT_FUNC_STATUS_AGAIN:
                return "AGAIN";
-       case __BT_FUNC_STATUS_CANCELED:
-               return "CANCELED";
+       case __BT_FUNC_STATUS_INTERRUPTED:
+               return "INTERRUPTED";
        default:
                return "(unknown)";
        }
index f51328514c6cadee25e72e3f354da07e96336aca..658a82d7ff4a0996820e978eb3603ec95cbef733 100644 (file)
  * Aliases without a `__` prefix for internal code: this is just easier
  * to read.
  */
-#define BT_FUNC_STATUS_OVERFLOW                __BT_FUNC_STATUS_OVERFLOW
-#define BT_FUNC_STATUS_INVALID_PARAMS  __BT_FUNC_STATUS_INVALID_PARAMS
+#define BT_FUNC_STATUS_AGAIN           __BT_FUNC_STATUS_AGAIN
+#define BT_FUNC_STATUS_END             __BT_FUNC_STATUS_END
+#define BT_FUNC_STATUS_ERROR           __BT_FUNC_STATUS_ERROR
+#define BT_FUNC_STATUS_INTERRUPTED     __BT_FUNC_STATUS_INTERRUPTED
 #define BT_FUNC_STATUS_INVALID_OBJECT  __BT_FUNC_STATUS_INVALID_OBJECT
+#define BT_FUNC_STATUS_INVALID_PARAMS  __BT_FUNC_STATUS_INVALID_PARAMS
 #define BT_FUNC_STATUS_MEMORY_ERROR    __BT_FUNC_STATUS_MEMORY_ERROR
-#define BT_FUNC_STATUS_ERROR           __BT_FUNC_STATUS_ERROR
-#define BT_FUNC_STATUS_OK              __BT_FUNC_STATUS_OK
-#define BT_FUNC_STATUS_END             __BT_FUNC_STATUS_END
 #define BT_FUNC_STATUS_NOT_FOUND       __BT_FUNC_STATUS_NOT_FOUND
-#define BT_FUNC_STATUS_AGAIN           __BT_FUNC_STATUS_AGAIN
-#define BT_FUNC_STATUS_CANCELED                __BT_FUNC_STATUS_CANCELED
+#define BT_FUNC_STATUS_OK              __BT_FUNC_STATUS_OK
+#define BT_FUNC_STATUS_OVERFLOW                __BT_FUNC_STATUS_OVERFLOW
 
 #endif /* BABELTRACE_FUNC_STATUS_INTERNAL_H */
index b218144d145d863f3630843c21880a3b6d2e6d87..9d4e11d47cc17d9a4859cb77db6d6f8f0ddb02ca 100644 (file)
@@ -26,6 +26,7 @@
 
 #include "common/assert.h"
 #include "lib/assert-pre.h"
+#include "lib/assert-post.h"
 #include "compat/compiler.h"
 #include <babeltrace2/value.h>
 #include <babeltrace2/graph/self-component-sink.h>
@@ -34,6 +35,7 @@
 
 #include "component-sink.h"
 #include "component.h"
+#include "graph.h"
 #include "lib/func-status.h"
 
 BT_HIDDEN
@@ -141,6 +143,16 @@ end:
        return status;
 }
 
+bt_bool bt_self_component_sink_is_interrupted(
+               const struct bt_self_component_sink *self_comp)
+{
+       struct bt_component *comp = (void *) self_comp;
+
+       BT_ASSERT_PRE_NON_NULL(comp, "Component");
+       return (bt_bool) bt_graph_is_interrupted(
+               bt_component_borrow_graph(comp));
+}
+
 void bt_component_sink_get_ref(
                const struct bt_component_sink *component_sink)
 {
index 22119689dd4873fea9a94c55c682b6d741f0fa42..11144fb42c400eb531b8fa8340f43ef3d0a1eaec 100644 (file)
@@ -208,9 +208,6 @@ enum bt_self_component_add_port_status add_port(
        BT_ASSERT_PRE_NON_NULL(name, "Name");
        BT_ASSERT_PRE(strlen(name) > 0, "Name is empty");
        graph = bt_component_borrow_graph(component);
-       BT_ASSERT_PRE(graph && !bt_graph_is_canceled(graph),
-               "Component's graph is canceled: %![comp-]+c, %![graph-]+g",
-               component, graph);
        BT_ASSERT_PRE(
                graph->config_state == BT_GRAPH_CONFIGURATION_STATE_CONFIGURING,
                "Component's graph is already configured: "
@@ -392,12 +389,6 @@ void bt_component_set_graph(struct bt_component *component,
                graph ? &graph->base : NULL);
 }
 
-bt_bool bt_component_graph_is_canceled(const struct bt_component *component)
-{
-       return bt_graph_is_canceled(
-               (void *) bt_object_borrow_parent(&component->base));
-}
-
 static
 struct bt_port *borrow_port_by_name(GPtrArray *ports,
                const char *name)
index a50828fbdc415be82d11e798ba6c5be9c5b597b5..64b0fda042e3e565f4f854633c450768455ea945 100644 (file)
@@ -50,8 +50,7 @@ void destroy_connection(struct bt_object *obj)
        /*
         * Make sure that each message iterator which was created for
         * this connection is finalized before we destroy it. Once a
-        * message iterator is finalized, all its method return NULL or
-        * the BT_MESSAGE_ITERATOR_STATUS_CANCELED status.
+        * message iterator is finalized, you cannot use it.
         *
         * Because connections are destroyed before components within a
         * graph, this ensures that message iterators are always
index af6b2b2dc804a973d1ce6540e9013744cc4017bf..192ec9e59489e01e5c61bdacbb05609224e4aa39 100644 (file)
@@ -46,6 +46,7 @@
 #include "component-sink.h"
 #include "connection.h"
 #include "graph.h"
+#include "interrupter.h"
 #include "message/event.h"
 #include "message/packet.h"
 
@@ -133,12 +134,7 @@ void destroy_graph(struct bt_object *obj)
         */
        BT_LIB_LOGI("Destroying graph: %!+g", graph);
        obj->ref_count++;
-
-       /*
-        * Cancel the graph to disallow some operations, like creating
-        * message iterators and adding ports to components.
-        */
-       (void) bt_graph_cancel((void *) graph);
+       graph->config_state = BT_GRAPH_CONFIGURATION_STATE_DESTROYING;
 
        /* Call all remove listeners */
        CALL_REMOVE_LISTENERS(struct bt_graph_listener_port_added,
@@ -175,6 +171,14 @@ void destroy_graph(struct bt_object *obj)
                graph->components = NULL;
        }
 
+       if (graph->interrupters) {
+               BT_LOGD_STR("Putting interrupters.");
+               g_ptr_array_free(graph->interrupters, TRUE);
+               graph->interrupters = NULL;
+       }
+
+       BT_OBJECT_PUT_REF_AND_RESET(graph->default_interrupter);
+
        if (graph->sinks_to_consume) {
                g_queue_free(graph->sinks_to_consume);
                graph->sinks_to_consume = NULL;
@@ -353,6 +357,21 @@ struct bt_graph *bt_graph_create(void)
                goto error;
        }
 
+       graph->interrupters = g_ptr_array_new_with_free_func(
+               (GDestroyNotify) bt_object_put_no_null_check);
+       if (!graph->interrupters) {
+               BT_LIB_LOGE_APPEND_CAUSE("Failed to allocate one GPtrArray.");
+               goto error;
+       }
+
+       graph->default_interrupter = bt_interrupter_create();
+       if (!graph->default_interrupter) {
+               BT_LIB_LOGE_APPEND_CAUSE(
+                       "Failed to create one interrupter object.");
+               goto error;
+       }
+
+       bt_graph_add_interrupter(graph, graph->default_interrupter);
        ret = bt_object_pool_initialize(&graph->event_msg_pool,
                (bt_object_pool_new_object_func) bt_message_event_new,
                (bt_object_pool_destroy_object_func) destroy_message_event,
@@ -417,7 +436,6 @@ enum bt_graph_connect_ports_status bt_graph_connect_ports(
        BT_ASSERT_PRE_NON_NULL(graph, "Graph");
        BT_ASSERT_PRE_NON_NULL(upstream_port, "Upstream port");
        BT_ASSERT_PRE_NON_NULL(downstream_port, "Downstream port port");
-       BT_ASSERT_PRE(!graph->canceled, "Graph is canceled: %!+g", graph);
        BT_ASSERT_PRE(
                graph->config_state == BT_GRAPH_CONFIGURATION_STATE_CONFIGURING,
                "Graph is not in the \"configuring\" state: %!+g", graph);
@@ -687,7 +705,6 @@ enum bt_graph_consume_status bt_graph_consume(struct bt_graph *graph)
        enum bt_graph_consume_status status;
 
        BT_ASSERT_PRE_DEV_NON_NULL(graph, "Graph");
-       BT_ASSERT_PRE_DEV(!graph->canceled, "Graph is canceled: %!+g", graph);
        BT_ASSERT_PRE_DEV(graph->can_consume,
                "Cannot consume graph in its current state: %!+g", graph);
        BT_ASSERT_PRE_DEV(graph->config_state !=
@@ -712,7 +729,6 @@ enum bt_graph_run_status bt_graph_run(struct bt_graph *graph)
        enum bt_graph_run_status status;
 
        BT_ASSERT_PRE_NON_NULL(graph, "Graph");
-       BT_ASSERT_PRE(!graph->canceled, "Graph is canceled: %!+g", graph);
        BT_ASSERT_PRE(graph->can_consume,
                "Cannot consume graph in its current state: %!+g", graph);
        BT_ASSERT_PRE(graph->config_state != BT_GRAPH_CONFIGURATION_STATE_FAULTY,
@@ -728,15 +744,15 @@ enum bt_graph_run_status bt_graph_run(struct bt_graph *graph)
 
        do {
                /*
-                * Check if the graph is canceled at each iteration. If
-                * the graph was canceled by another thread or by a
-                * signal handler, this is not a warning nor an error,
-                * it was intentional: log with a DEBUG level only.
+                * Check if the graph is interrupted at each iteration.
+                * If the graph was interrupted by another thread or by
+                * a signal handler, this is NOT a warning nor an error;
+                * it was intentional: log with an INFO level only.
                 */
-               if (G_UNLIKELY(graph->canceled)) {
-                       BT_LIB_LOGI("Stopping the graph: graph is canceled: "
-                               "%!+g", graph);
-                       status = BT_FUNC_STATUS_CANCELED;
+               if (G_UNLIKELY(bt_graph_is_interrupted(graph))) {
+                       BT_LIB_LOGI("Stopping the graph: "
+                               "graph was interrupted: %!+g", graph);
+                       status = BT_FUNC_STATUS_AGAIN;
                        goto end;
                }
 
@@ -1209,20 +1225,6 @@ end:
        return status;
 }
 
-enum bt_graph_cancel_status bt_graph_cancel(struct bt_graph *graph)
-{
-       BT_ASSERT_PRE_NON_NULL(graph, "Graph");
-       graph->canceled = true;
-       BT_LIB_LOGI("Canceled graph: %!+i", graph);
-       return BT_FUNC_STATUS_OK;
-}
-
-bt_bool bt_graph_is_canceled(const struct bt_graph *graph)
-{
-       BT_ASSERT_PRE_DEV_NON_NULL(graph, "Graph");
-       return graph->canceled ? BT_TRUE : BT_FALSE;
-}
-
 BT_HIDDEN
 void bt_graph_remove_connection(struct bt_graph *graph,
                struct bt_connection *connection)
@@ -1275,7 +1277,6 @@ int add_component_with_init_method_data(
        BT_ASSERT(comp_cls);
        BT_ASSERT_PRE_NON_NULL(graph, "Graph");
        BT_ASSERT_PRE_NON_NULL(name, "Name");
-       BT_ASSERT_PRE(!graph->canceled, "Graph is canceled: %!+g", graph);
        BT_ASSERT_PRE(
                graph->config_state == BT_GRAPH_CONFIGURATION_STATE_CONFIGURING,
                "Graph is not in the \"configuring\" state: %!+g", graph);
@@ -1559,6 +1560,32 @@ void bt_graph_add_message(struct bt_graph *graph,
        g_ptr_array_add(graph->messages, msg);
 }
 
+BT_HIDDEN
+bool bt_graph_is_interrupted(const struct bt_graph *graph)
+{
+       BT_ASSERT(graph);
+       return bt_interrupter_array_any_is_set(graph->interrupters);
+}
+
+enum bt_graph_add_interrupter_status bt_graph_add_interrupter(
+               struct bt_graph *graph, const struct bt_interrupter *intr)
+{
+       BT_ASSERT_PRE_NON_NULL(graph, "Graph");
+       BT_ASSERT_PRE_NON_NULL(intr, "Interrupter");
+       g_ptr_array_add(graph->interrupters, (void *) intr);
+       bt_object_get_no_null_check(intr);
+       BT_LIB_LOGD("Added interrupter to graph: %![graph-]+g, %![intr-]+z",
+               graph, intr);
+       return BT_FUNC_STATUS_OK;
+}
+
+void bt_graph_interrupt(struct bt_graph *graph)
+{
+       BT_ASSERT_PRE_NON_NULL(graph, "Graph");
+       bt_interrupter_set(graph->default_interrupter);
+       BT_LIB_LOGI("Interrupted graph: %!+g", graph);
+}
+
 void bt_graph_get_ref(const struct bt_graph *graph)
 {
        bt_object_get_ref(graph);
index 74c794707206e6770fa12f616ead62c74e3bfa9e..4e500d49002417b44854a324134c1e8b2f90f698 100644 (file)
@@ -67,6 +67,7 @@ enum bt_graph_configuration_state {
        BT_GRAPH_CONFIGURATION_STATE_PARTIALLY_CONFIGURED,
        BT_GRAPH_CONFIGURATION_STATE_CONFIGURED,
        BT_GRAPH_CONFIGURATION_STATE_FAULTY,
+       BT_GRAPH_CONFIGURATION_STATE_DESTROYING,
 };
 
 struct bt_graph {
@@ -89,7 +90,19 @@ struct bt_graph {
        /* Queue of pointers (weak references) to sink bt_components. */
        GQueue *sinks_to_consume;
 
-       bool canceled;
+       /*
+        * Array of `struct bt_interrupter *`, each one owned by this.
+        * If any interrupter is set, then this graph is deemed
+        * interrupted.
+        */
+       GPtrArray *interrupters;
+
+       /*
+        * Default interrupter to support bt_graph_interrupt(); owned
+        * by this.
+        */
+       struct bt_interrupter *default_interrupter;
+
        bool in_remove_listener;
        bool has_sink;
 
@@ -184,6 +197,9 @@ BT_HIDDEN
 void bt_graph_add_message(struct bt_graph *graph,
                struct bt_message *msg);
 
+BT_HIDDEN
+bool bt_graph_is_interrupted(const struct bt_graph *graph);
+
 static inline
 const char *bt_graph_configuration_state_string(
                enum bt_graph_configuration_state state)
index 46fd153ba68661da2b2d706009e938e278103ef8..149d0c1121fd6c59f1dc34a920e18ed01479b0f1 100644 (file)
@@ -33,4 +33,25 @@ struct bt_interrupter {
        bool is_set;
 };
 
+static inline
+bool bt_interrupter_array_any_is_set(const GPtrArray *interrupters)
+{
+       bool is_set = false;
+       uint64_t i;
+
+       BT_ASSERT(interrupters);
+
+       for (i = 0; i < interrupters->len; i++) {
+               const struct bt_interrupter *intr = interrupters->pdata[i];
+
+               if (intr->is_set) {
+                       is_set = true;
+                       goto end;
+               }
+       }
+
+end:
+       return is_set;
+}
+
 #endif /* BABELTRACE_GRAPH_INTERRUPTER_INTERNAL_H */
index 254e422f9889ddabcadc7b25afd957c49abb662a..b5033083fdaeb25d44121cfbbd8852003fc86c33 100644 (file)
@@ -348,17 +348,16 @@ create_self_component_input_port_message_iterator(
                "Input port is not connected: %![port-]+p", port);
        BT_ASSERT_PRE(comp, "Input port is not part of a component: %![port-]+p",
                port);
-       BT_ASSERT_PRE(!bt_component_graph_is_canceled(comp),
-               "Input port's component's graph is canceled: "
-               "%![port-]+p, %![comp-]+c", port, comp);
        BT_ASSERT(port->connection);
        upstream_port = port->connection->upstream_port;
        BT_ASSERT(upstream_port);
        upstream_comp = bt_port_borrow_component_inline(upstream_port);
        BT_ASSERT(upstream_comp);
        BT_ASSERT_PRE(
-               bt_component_borrow_graph(upstream_comp)->config_state !=
-                       BT_GRAPH_CONFIGURATION_STATE_CONFIGURING,
+               bt_component_borrow_graph(upstream_comp)->config_state ==
+                       BT_GRAPH_CONFIGURATION_STATE_PARTIALLY_CONFIGURED ||
+               bt_component_borrow_graph(upstream_comp)->config_state ==
+                       BT_GRAPH_CONFIGURATION_STATE_CONFIGURED,
                "Graph is not configured: %!+g",
                bt_component_borrow_graph(upstream_comp));
        upstream_comp_cls = upstream_comp->class;
@@ -964,7 +963,6 @@ enum bt_message_iterator_next_status bt_port_output_message_iterator_next(
        graph_status = bt_graph_consume_sink_no_check(iterator->graph,
                iterator->colander);
        switch (graph_status) {
-       case BT_FUNC_STATUS_CANCELED:
        case BT_FUNC_STATUS_AGAIN:
        case BT_FUNC_STATUS_END:
        case BT_FUNC_STATUS_MEMORY_ERROR:
@@ -2119,6 +2117,16 @@ bt_port_output_message_iterator_seek_beginning(
                        iterator));
 }
 
+bt_bool bt_self_message_iterator_is_interrupted(
+               const struct bt_self_message_iterator *self_msg_iter)
+{
+       const struct bt_self_component_port_input_message_iterator *iterator =
+               (const void *) self_msg_iter;
+
+       BT_ASSERT_PRE_NON_NULL(iterator, "Message iterator");
+       return (bt_bool) bt_graph_is_interrupted(iterator->graph);
+}
+
 void bt_port_output_message_iterator_get_ref(
                const struct bt_port_output_message_iterator *iterator)
 {
index 3a00c9b80985abe0af2e4cf2a4f6a96a3d3b0d54..1addeeb302311ddb7ce71c2dc5e114e5d4771ec6 100644 (file)
@@ -37,6 +37,7 @@
 
 #include "component-class.h"
 #include "query-executor.h"
+#include "interrupter.h"
 #include "lib/func-status.h"
 
 static
@@ -46,6 +47,15 @@ void bt_query_executor_destroy(struct bt_object *obj)
                container_of(obj, struct bt_query_executor, base);
 
        BT_LOGD("Destroying query executor: addr=%p", query_exec);
+
+       if (query_exec->interrupters) {
+               BT_LOGD_STR("Putting interrupters.");
+               g_ptr_array_free(query_exec->interrupters, TRUE);
+               query_exec->interrupters = NULL;
+       }
+
+       BT_OBJECT_PUT_REF_AND_RESET(query_exec->default_interrupter);
+
        g_free(query_exec);
 }
 
@@ -61,6 +71,24 @@ struct bt_query_executor *bt_query_executor_create(void)
                goto end;
        }
 
+       query_exec->interrupters = g_ptr_array_new_with_free_func(
+               (GDestroyNotify) bt_object_put_no_null_check);
+       if (!query_exec->interrupters) {
+               BT_LIB_LOGE_APPEND_CAUSE("Failed to allocate one GPtrArray.");
+               BT_OBJECT_PUT_REF_AND_RESET(query_exec);
+               goto end;
+       }
+
+       query_exec->default_interrupter = bt_interrupter_create();
+       if (!query_exec->default_interrupter) {
+               BT_LIB_LOGE_APPEND_CAUSE(
+                       "Failed to create one interrupter object.");
+               BT_OBJECT_PUT_REF_AND_RESET(query_exec);
+               goto end;
+       }
+
+       bt_query_executor_add_interrupter(query_exec,
+               query_exec->default_interrupter);
        bt_object_init_shared(&query_exec->base,
                bt_query_executor_destroy);
        BT_LOGD("Created query executor: addr=%p", query_exec);
@@ -88,7 +116,6 @@ enum bt_query_executor_query_status bt_query_executor_query(
        BT_ASSERT_PRE_NON_NULL(comp_cls, "Component class");
        BT_ASSERT_PRE_NON_NULL(object, "Object");
        BT_ASSERT_PRE_NON_NULL(user_result, "Result (output)");
-       BT_ASSERT_PRE(!query_exec->canceled, "Query executor is canceled.");
 
        if (!params) {
                params = bt_value_null;
@@ -141,29 +168,38 @@ enum bt_query_executor_query_status bt_query_executor_query(
        BT_ASSERT_POST(query_status != BT_FUNC_STATUS_OK || *user_result,
                "User method returned `BT_FUNC_STATUS_OK` without a result.");
        status = (int) query_status;
-       if (query_exec->canceled) {
-               BT_OBJECT_PUT_REF_AND_RESET(*user_result);
-               status = BT_FUNC_STATUS_CANCELED;
-               goto end;
-       }
 
 end:
        return status;
 }
 
-enum bt_query_executor_cancel_status bt_query_executor_cancel(
-               struct bt_query_executor *query_exec)
+enum bt_query_executor_add_interrupter_status bt_query_executor_add_interrupter(
+               struct bt_query_executor *query_exec,
+               const struct bt_interrupter *intr)
 {
        BT_ASSERT_PRE_NON_NULL(query_exec, "Query executor");
-       query_exec->canceled = BT_TRUE;
-       BT_LOGI("Canceled query executor: addr=%p", query_exec);
+       BT_ASSERT_PRE_NON_NULL(intr, "Interrupter");
+       g_ptr_array_add(query_exec->interrupters, (void *) intr);
+       bt_object_get_no_null_check(intr);
+       BT_LIB_LOGD("Added interrupter to query executor: "
+               "query-exec-addr=%p, %![intr-]+z",
+               query_exec, intr);
        return BT_FUNC_STATUS_OK;
 }
 
-bt_bool bt_query_executor_is_canceled(const struct bt_query_executor *query_exec)
+bt_bool bt_query_executor_is_interrupted(const struct bt_query_executor *query_exec)
 {
-       BT_ASSERT_PRE_DEV_NON_NULL(query_exec, "Query executor");
-       return query_exec->canceled;
+       BT_ASSERT_PRE_NON_NULL(query_exec, "Query executor");
+       return (bt_bool) bt_interrupter_array_any_is_set(
+               query_exec->interrupters);
+}
+
+void bt_query_executor_interrupt(struct bt_query_executor *query_exec)
+{
+       BT_ASSERT_PRE_NON_NULL(query_exec, "Query executor");
+       bt_interrupter_set(query_exec->default_interrupter);
+       BT_LIB_LOGI("Interrupted query executor: query-exec-addr=%p",
+               query_exec);
 }
 
 void bt_query_executor_get_ref(const struct bt_query_executor *query_executor)
index 875c26c090e2dc8bfe72850c8aa553ea5923ec53..c1dcc9c4a943dfe5015d3261339e1a215ed53416 100644 (file)
  * SOFTWARE.
  */
 
+#include <glib.h>
+
 #include <babeltrace2/types.h>
-#include "lib/object.h"
 #include <babeltrace2/graph/query-executor.h>
 #include <babeltrace2/graph/component-class.h>
 
+#include "lib/object.h"
+
 struct bt_query_executor {
        struct bt_object base;
-       bool canceled;
+
+       /*
+        * Array of `struct bt_interrupter *`, each one owned by this.
+        * If any interrupter is set, then this query executor is deemed
+        * interrupted.
+        */
+       GPtrArray *interrupters;
+
+       /*
+        * Default interrupter to support bt_query_executor_interrupt();
+        * owned by this.
+        */
+       struct bt_interrupter *default_interrupter;
 };
 
 #endif /* BABELTRACE_GRAPH_QUERY_EXECUTOR_INTERNAL_H */
index 91db4a31e4cc43fc504c92243db310cff12e6feb..e9afb80656da61d7bb1cdbbc5dbdd28183e540d4 100644 (file)
@@ -1120,9 +1120,7 @@ static inline void format_graph(char **buf_ch, bool extended,
 {
        char tmp_prefix[TMP_PREFIX_LEN];
 
-       BUF_APPEND(", %sis-canceled=%d, %scan-consume=%d, "
-               "%sconfig-state=%s",
-               PRFIELD(graph->canceled),
+       BUF_APPEND(", %scan-consume=%d, %sconfig-state=%s",
                PRFIELD(graph->can_consume),
                PRFIELD(bt_graph_configuration_state_string(graph->config_state)));
 
index 183d2081f32de63eca90b14e2b5032b635353712..8057663cb3e03fc878e16d600cb2f7ca50b62a2a 100644 (file)
@@ -1203,10 +1203,10 @@ enum bt_value_map_foreach_entry_status bt_value_map_foreach_entry(
                const char *key_str = g_quark_to_string(GPOINTER_TO_UINT(key));
 
                if (!func(key_str, element_obj, data)) {
-                       BT_LOGT("User canceled the loop: key=\"%s\", "
+                       BT_LOGT("User interrupted the loop: key=\"%s\", "
                                "value-addr=%p, data=%p",
                                key_str, element_obj, data);
-                       ret = BT_FUNC_STATUS_CANCELED;
+                       ret = BT_FUNC_STATUS_INTERRUPTED;
                        break;
                }
        }
index 0cabf25d01751e9298c5a95726dac6e4f7ed7ce0..11b80f474d3c78c5986d21ab3c9cc89824bc7ccb 100644 (file)
@@ -109,18 +109,17 @@ const char *print_state(struct lttng_live_stream_iterator *s)
        } while (0);
 
 BT_HIDDEN
-bool lttng_live_graph_is_canceled(struct lttng_live_component *lttng_live)
+bool lttng_live_graph_is_canceled(struct lttng_live_msg_iter *msg_iter)
 {
-       const bt_component *component;
        bool ret;
 
-       if (!lttng_live) {
+       if (!msg_iter) {
                ret = false;
                goto end;
        }
 
-       component = bt_self_component_as_component(lttng_live->self_comp);
-       ret = bt_component_graph_is_canceled(component);
+       ret = bt_self_message_iterator_is_interrupted(
+               msg_iter->self_msg_iter);
 
 end:
        return ret;
@@ -259,7 +258,6 @@ end:
 static
 void lttng_live_destroy_session(struct lttng_live_session *session)
 {
-       struct lttng_live_component *live_comp;
        bt_logging_level log_level;
        bt_self_component *self_comp;
 
@@ -272,9 +270,9 @@ void lttng_live_destroy_session(struct lttng_live_session *session)
        BT_COMP_LOGD("Destroy lttng live session");
        if (session->id != -1ULL) {
                if (lttng_live_detach_session(session)) {
-                       live_comp = session->lttng_live_msg_iter->lttng_live_comp;
                        if (session->lttng_live_msg_iter &&
-                                       !lttng_live_graph_is_canceled(live_comp)) {
+                                       !lttng_live_graph_is_canceled(
+                                               session->lttng_live_msg_iter)) {
                                /* Old relayd cannot detach sessions. */
                                BT_COMP_LOGD("Unable to detach lttng live session %" PRIu64,
                                        session->id);
@@ -442,7 +440,7 @@ enum lttng_live_iterator_status lttng_live_get_session(
                ret = lttng_live_attach_session(session);
                if (ret) {
                        if (lttng_live_msg_iter && lttng_live_graph_is_canceled(
-                                               lttng_live_msg_iter->lttng_live_comp)) {
+                                       lttng_live_msg_iter)) {
                                status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
                        } else {
                                status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
@@ -1589,11 +1587,6 @@ bt_component_class_init_method_status lttng_live_component_init(
                goto error;
        }
 
-       if (lttng_live_graph_is_canceled(lttng_live)) {
-               ret = BT_COMPONENT_CLASS_INIT_METHOD_STATUS_ERROR;
-               goto error;
-       }
-
        add_port_status = bt_self_component_source_add_output_port(
                self_comp_src, "out", NULL, NULL);
        switch (add_port_status) {
index 62e1e752205bcb60b226a5e9c7589ec31741e4a8..77632daf59a2ee8214072b18987e5e4c3af2ca26 100644 (file)
@@ -313,6 +313,6 @@ struct lttng_live_trace *lttng_live_borrow_trace(
                struct lttng_live_session *session, uint64_t trace_id);
 void lttng_live_need_new_streams(struct lttng_live_msg_iter *lttng_live_msg_iter);
 
-bool lttng_live_graph_is_canceled(struct lttng_live_component *lttng_live);
+bool lttng_live_graph_is_canceled(struct lttng_live_msg_iter *msg_iter);
 
 #endif /* BABELTRACE_PLUGIN_CTF_LTTNG_LIVE_H */
index 8475c569633bd022170793dbf5003a1118a90d2d..b31cd96adbabfcdd2d254ae7988eb58a8b28a9ab 100644 (file)
@@ -120,8 +120,6 @@ enum lttng_live_iterator_status lttng_live_metadata_update(
 {
        struct lttng_live_session *session = trace->session;
        struct lttng_live_metadata *metadata = trace->metadata;
-       struct lttng_live_component *lttng_live =
-               session->lttng_live_msg_iter->lttng_live_comp;
        ssize_t ret = 0;
        size_t size, len_read = 0;
        char *metadata_buf = NULL;
@@ -188,7 +186,8 @@ enum lttng_live_iterator_status lttng_live_metadata_update(
                        metadata->trace = NULL;
                }
                if (errno == EINTR) {
-                       if (lttng_live_graph_is_canceled(lttng_live)) {
+                       if (lttng_live_graph_is_canceled(
+                                       session->lttng_live_msg_iter)) {
                                status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
                                goto end;
                        }
index ac3354274f79179a7317bf22cb6b0ff663ff6bb5..42ea78c0ad7fd9cca3d4287fff86c91e22960cbd 100644 (file)
@@ -67,7 +67,7 @@ ssize_t lttng_live_recv(struct live_viewer_connection *viewer_connection,
                }
                if (ret == BT_SOCKET_ERROR && bt_socket_interrupted()) {
                        if (!viewer_connection->in_query &&
-                                       lttng_live_graph_is_canceled(lttng_live_msg_iter->lttng_live_comp)) {
+                                       lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
                                break;
                        } else {
                                continue;
@@ -93,7 +93,7 @@ ssize_t lttng_live_send(struct live_viewer_connection *viewer_connection,
                ret = bt_socket_send_nosigpipe(sock, buf, len);
                if (ret == BT_SOCKET_ERROR && bt_socket_interrupted()) {
                        if (!viewer_connection->in_query &&
-                                       lttng_live_graph_is_canceled(lttng_live_msg_iter->lttng_live_comp)) {
+                                       lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
                                break;
                        } else {
                                continue;
@@ -1125,8 +1125,6 @@ enum lttng_live_iterator_status lttng_live_get_next_index(
        struct lttng_live_trace *trace = stream->trace;
        const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
        char cmd_buf[cmd_buf_len];
-       struct lttng_live_component *lttng_live =
-               lttng_live_msg_iter->lttng_live_comp;
 
        cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
        cmd.data_size = htobe64((uint64_t) sizeof(rq));
@@ -1239,7 +1237,7 @@ end:
        return retstatus;
 
 error:
-       if (lttng_live_graph_is_canceled(lttng_live)) {
+       if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
                retstatus = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
        } else {
                retstatus = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
@@ -1264,8 +1262,6 @@ enum bt_msg_iter_medium_status lttng_live_get_stream_bytes(
        struct lttng_live_trace *trace = stream->trace;
        const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
        char cmd_buf[cmd_buf_len];
-       struct lttng_live_component *lttng_live =
-               lttng_live_msg_iter->lttng_live_comp;
 
        BT_COMP_LOGD("lttng_live_get_stream_bytes: offset=%" PRIu64 ", req_len=%" PRIu64,
                        offset, req_len);
@@ -1364,7 +1360,7 @@ end:
        return retstatus;
 
 error:
-       if (lttng_live_graph_is_canceled(lttng_live)) {
+       if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
                retstatus = BT_MSG_ITER_MEDIUM_STATUS_AGAIN;
        } else {
                retstatus = BT_MSG_ITER_MEDIUM_STATUS_ERROR;
@@ -1389,8 +1385,6 @@ enum lttng_live_iterator_status lttng_live_get_new_streams(
                session->lttng_live_msg_iter;
        struct live_viewer_connection *viewer_connection =
                lttng_live_msg_iter->viewer_connection;
-       struct lttng_live_component *lttng_live =
-               lttng_live_msg_iter->lttng_live_comp;
        uint32_t streams_count;
        const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
        char cmd_buf[cmd_buf_len];
@@ -1461,7 +1455,7 @@ end:
        return status;
 
 error:
-       if (lttng_live_graph_is_canceled(lttng_live)) {
+       if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
                status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
        } else {
                status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
index 4f6aade1febc787f8544e736beac3edd5cb55647..7a403b5cd3e68dd8b949cd88e72c78659c8cabd7 100644 (file)
@@ -196,14 +196,52 @@ class GraphTestCase(unittest.TestCase):
                 sink.input_ports['in'], src.output_ports['out']
             )
 
-    def test_cancel(self):
-        self.assertFalse(self._graph.is_canceled)
-        self._graph.cancel()
-        self.assertTrue(self._graph.is_canceled)
-
-    # Test that Graph.run() raises bt2.Canceled if the graph gets canceled
-    # during execution.
-    def test_cancel_while_running(self):
+    def test_add_interrupter(self):
+        class MyIter(bt2._UserMessageIterator):
+            def __next__(self):
+                raise TypeError
+
+        class MySource(bt2._UserSourceComponent, message_iterator_class=MyIter):
+            def __init__(self, params):
+                self._add_output_port('out')
+
+        class MySink(bt2._UserSinkComponent):
+            def __init__(self, params):
+                self._add_input_port('in')
+
+            def _consume(self):
+                next(self._msg_iter)
+
+            def _graph_is_configured(self):
+                self._msg_iter = self._create_input_port_message_iterator(
+                    self._input_ports['in']
+                )
+
+        # add two interrupters, set one of them
+        interrupter1 = bt2.Interrupter()
+        interrupter2 = bt2.Interrupter()
+        self._graph.add_interrupter(interrupter1)
+        src = self._graph.add_component(MySource, 'src')
+        sink = self._graph.add_component(MySink, 'sink')
+        self._graph.connect_ports(src.output_ports['out'], sink.input_ports['in'])
+        self._graph.add_interrupter(interrupter2)
+
+        with self.assertRaises(bt2._Error):
+            self._graph.run()
+
+        interrupter2.set()
+
+        with self.assertRaises(bt2.TryAgain):
+            self._graph.run()
+
+        interrupter2.reset()
+
+        with self.assertRaises(bt2._Error):
+            self._graph.run()
+
+    # Test that Graph.run() raises bt2.Interrupted if the graph gets
+    # interrupted during execution.
+    def test_interrupt_while_running(self):
         class MyIter(_MyIter):
             def __next__(self):
                 return self._create_stream_beginning_message(self._stream)
@@ -217,10 +255,9 @@ class GraphTestCase(unittest.TestCase):
                 self._add_input_port('in')
 
             def _consume(self):
-                # Pretend that somebody asynchronously cancelled the graph.
+                # Pretend that somebody asynchronously interrupted the graph.
                 nonlocal graph
-                graph.cancel()
-
+                graph.interrupt()
                 return next(self._msg_iter)
 
             def _graph_is_configured(self):
@@ -228,12 +265,13 @@ class GraphTestCase(unittest.TestCase):
                     self._input_ports['in']
                 )
 
-        graph = bt2.Graph()
-        up = graph.add_component(MySource, 'down')
-        down = graph.add_component(MySink, 'up')
-        graph.connect_ports(up.output_ports['out'], down.input_ports['in'])
-        with self.assertRaises(bt2.Canceled):
-            graph.run()
+        graph = self._graph
+        up = self._graph.add_component(MySource, 'down')
+        down = self._graph.add_component(MySink, 'up')
+        self._graph.connect_ports(up.output_ports['out'], down.input_ports['in'])
+
+        with self.assertRaises(bt2.TryAgain):
+            self._graph.run()
 
     def test_run(self):
         class MyIter(_MyIter):
index 0977430940f6afef0b07881cb104f634f77ea5d5..bc20a719b3dca0000ee9a1fa123ed51041ddc0ef 100644 (file)
@@ -183,13 +183,32 @@ class QueryExecutorTestCase(unittest.TestCase):
         with self.assertRaises(bt2.TryAgain):
             res = bt2.QueryExecutor().query(MySink, 'obj', [17, 23])
 
-    def test_cancel_no_query(self):
+    def test_query_add_interrupter(self):
+        class MySink(bt2._UserSinkComponent):
+            def _consume(self):
+                pass
+
+            def _graph_is_configured(self):
+                pass
+
+            @classmethod
+            def _query(cls, query_exec, obj, params, log_level):
+                nonlocal interrupter2
+                test_self.assertFalse(query_exec.is_interrupted)
+                interrupter2.set()
+                test_self.assertTrue(query_exec.is_interrupted)
+                interrupter2.reset()
+                test_self.assertFalse(query_exec.is_interrupted)
+
+        interrupter1 = bt2.Interrupter()
+        interrupter2 = bt2.Interrupter()
+        test_self = self
         query_exec = bt2.QueryExecutor()
-        self.assertFalse(query_exec.is_canceled)
-        query_exec.cancel()
-        self.assertTrue(query_exec.is_canceled)
+        query_exec.add_interrupter(interrupter1)
+        query_exec.add_interrupter(interrupter2)
+        query_exec.query(MySink, 'obj', [17, 23])
 
-    def test_query_canceled(self):
+    def test_query_interrupt(self):
         class MySink(bt2._UserSinkComponent):
             def _consume(self):
                 pass
@@ -199,10 +218,10 @@ class QueryExecutorTestCase(unittest.TestCase):
 
             @classmethod
             def _query(cls, query_exec, obj, params, log_level):
-                raise bt2.TryAgain
+                test_self.assertFalse(query_exec.is_interrupted)
+                query_exec.interrupt()
+                test_self.assertTrue(query_exec.is_interrupted)
 
+        test_self = self
         query_exec = bt2.QueryExecutor()
-        query_exec.cancel()
-
-        with self.assertRaises(bt2.Canceled):
-            res = query_exec.query(MySink, 'obj', [17, 23])
+        query_exec.query(MySink, 'obj', [17, 23])
index e170a5f39db935d4358c5b5393cb2f6e860259b7..39427f46e4b568fb2d03200a5c5dc672300e88bd 100644 (file)
@@ -651,7 +651,7 @@ void test_map(void)
 
        ret = bt_value_map_foreach_entry(map_obj, test_map_foreach_cb_count,
                &count);
-       ok(ret == BT_VALUE_MAP_FOREACH_ENTRY_STATUS_CANCELED && count == 3,
+       ok(ret == BT_VALUE_MAP_FOREACH_ENTRY_STATUS_INTERRUPTED && count == 3,
                "bt_value_map_foreach_entry() breaks the loop when the user function returns BT_FALSE");
 
        memset(&checklist, 0, sizeof(checklist));
This page took 0.054381 seconds and 4 git commands to generate.