src.ctf.lttng-live: make component class handle interruptions
[babeltrace.git] / src / plugins / ctf / lttng-live / lttng-live.c
index e184b6328815703ef4d3693fac22380cfe896a4a..e236655815a5d68f7b9854bc2b052075a06761a6 100644 (file)
@@ -447,10 +447,10 @@ enum lttng_live_iterator_status lttng_live_get_session(
        uint64_t trace_idx;
 
        if (!session->attached) {
-               enum lttng_live_attach_session_status attach_status =
+               enum lttng_live_viewer_status attach_status =
                        lttng_live_attach_session(session,
                                lttng_live_msg_iter->self_msg_iter);
-               if (attach_status != LTTNG_LIVE_ATTACH_SESSION_STATUS_OK) {
+               if (attach_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
                        if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
                                /*
                                 * Clear any causes appended in
@@ -546,7 +546,8 @@ enum lttng_live_iterator_status
 lttng_live_iterator_handle_new_streams_and_metadata(
                struct lttng_live_msg_iter *lttng_live_msg_iter)
 {
-       enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
+       enum lttng_live_iterator_status status;
+       enum lttng_live_viewer_status viewer_status;
        bt_logging_level log_level = lttng_live_msg_iter->log_level;
        bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
        uint64_t session_idx = 0, nr_sessions_opened = 0;
@@ -562,17 +563,24 @@ lttng_live_iterator_handle_new_streams_and_metadata(
         */
        if (lttng_live_msg_iter->sessions->len == 0) {
                if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
-                       ret = LTTNG_LIVE_ITERATOR_STATUS_END;
+                       status = LTTNG_LIVE_ITERATOR_STATUS_END;
                        goto end;
                } else {
                        /*
                         * Retry to create a viewer session for the requested
                         * session name.
                         */
-                       if (lttng_live_create_viewer_session(lttng_live_msg_iter)) {
-                               ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
-                               BT_COMP_LOGE_APPEND_CAUSE(self_comp,
-                                       "Error creating LTTng live viewer session");
+                       viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
+                       if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
+                               if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
+                                       status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+                                       BT_COMP_LOGE_APPEND_CAUSE(self_comp,
+                                               "Error creating LTTng live viewer session");
+                               } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
+                                       status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+                               } else {
+                                       bt_common_abort();
+                               }
                                goto end;
                        }
                }
@@ -582,12 +590,12 @@ lttng_live_iterator_handle_new_streams_and_metadata(
                        session_idx++) {
                session = g_ptr_array_index(lttng_live_msg_iter->sessions,
                                session_idx);
-               ret = lttng_live_get_session(lttng_live_msg_iter, session);
-               switch (ret) {
+               status = lttng_live_get_session(lttng_live_msg_iter, session);
+               switch (status) {
                case LTTNG_LIVE_ITERATOR_STATUS_OK:
                        break;
                case LTTNG_LIVE_ITERATOR_STATUS_END:
-                       ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
+                       status = LTTNG_LIVE_ITERATOR_STATUS_OK;
                        break;
                default:
                        goto end;
@@ -596,13 +604,16 @@ lttng_live_iterator_handle_new_streams_and_metadata(
                        nr_sessions_opened++;
                }
        }
-end:
-       if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK &&
-                       sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE &&
+
+       if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE &&
                        nr_sessions_opened == 0) {
-               ret = LTTNG_LIVE_ITERATOR_STATUS_END;
+               status = LTTNG_LIVE_ITERATOR_STATUS_END;
+       } else {
+               status = LTTNG_LIVE_ITERATOR_STATUS_OK;
        }
-       return ret;
+
+end:
+       return status;
 }
 
 static
@@ -1259,6 +1270,7 @@ bt_component_class_message_iterator_next_method_status lttng_live_msg_iter_next(
                uint64_t *count)
 {
        bt_component_class_message_iterator_next_method_status status;
+       enum lttng_live_viewer_status viewer_status;
        struct lttng_live_msg_iter *lttng_live_msg_iter =
                bt_self_message_iterator_get_data(self_msg_it);
        struct lttng_live_component *lttng_live =
@@ -1272,6 +1284,21 @@ bt_component_class_message_iterator_next_method_status lttng_live_msg_iter_next(
 
        BT_ASSERT_DBG(lttng_live_msg_iter);
 
+       if (G_UNLIKELY(lttng_live_msg_iter->was_interrupted)) {
+               /*
+                * The iterator was interrupted in a previous call to the
+                * `_next()` method. We currently do not support generating
+                * messages after such event. The babeltrace2 CLI should never
+                * be running the graph after being interrupted. So this check
+                * is to prevent other graph users from using this live
+                * iterator in an messed up internal state.
+                */
+               status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
+               BT_COMP_LOGE_APPEND_CAUSE(self_comp,
+                       "Message iterator was interrupted during a previous call to the `next()` and currently does not support continuing after such event.");
+               goto end;
+       }
+
        /*
         * Clear all the invalid message reference that might be left over in
         * the output array.
@@ -1287,16 +1314,25 @@ bt_component_class_message_iterator_next_method_status lttng_live_msg_iter_next(
                if (lttng_live->params.sess_not_found_act !=
                                SESSION_NOT_FOUND_ACTION_CONTINUE) {
                        status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_END;
-                       goto no_session;
+                       goto end;
                } else {
                        /*
                         * The are no more active session for this session
                         * name. Retry to create a viewer session for the
                         * requested session name.
                         */
-                       if (lttng_live_create_viewer_session(lttng_live_msg_iter)) {
-                               status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
-                               goto no_session;
+                       viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
+                       if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
+                               if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
+                                       status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
+                                       BT_COMP_LOGE_APPEND_CAUSE(self_comp,
+                                               "Error creating LTTng live viewer session");
+                               } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
+                                       status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_AGAIN;
+                               } else {
+                                       bt_common_abort();
+                               }
+                               goto end;
                        }
                }
        }
@@ -1364,7 +1400,7 @@ bt_component_class_message_iterator_next_method_status lttng_live_msg_iter_next(
                        }
 
                        if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
-                               goto end;
+                               goto return_status;
                        }
 
                        if (G_UNLIKELY(youngest_stream_iter == NULL) ||
@@ -1407,7 +1443,7 @@ bt_component_class_message_iterator_next_method_status lttng_live_msg_iter_next(
 
                if (!youngest_stream_iter) {
                        stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
-                       goto end;
+                       goto return_status;
                }
 
                BT_ASSERT_DBG(youngest_stream_iter->current_msg);
@@ -1429,17 +1465,20 @@ bt_component_class_message_iterator_next_method_status lttng_live_msg_iter_next(
 
                stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
        }
-end:
+
+return_status:
        switch (stream_iter_status) {
        case LTTNG_LIVE_ITERATOR_STATUS_OK:
        case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
+               /*
+                * If we gathered messages, return _OK even if the graph was
+                * interrupted. This allows for the components downstream to at
+                * least get the thoses messages. If the graph was indeed
+                * interrupted there should not be another _next() call as the
+                * application will tear down the graph. This component class
+                * doesn't support restarting after an interruption.
+                */
                if (*count > 0) {
-                       /*
-                        * We received a again status but we have some messages
-                        * to send downstream. We send them and return OK for
-                        * now. On the next call we return again if there are
-                        * still no new message to send.
-                        */
                        status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK;
                } else {
                        status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_AGAIN;
@@ -1462,7 +1501,7 @@ end:
                bt_common_abort();
        }
 
-no_session:
+end:
        return status;
 }
 
@@ -1473,12 +1512,12 @@ bt_component_class_message_iterator_initialize_method_status lttng_live_msg_iter
                bt_self_component_source *self_comp_src,
                bt_self_component_port_output *self_port)
 {
-       bt_component_class_message_iterator_initialize_method_status ret =
-               BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_OK;
+       bt_component_class_message_iterator_initialize_method_status status;
        bt_self_component *self_comp =
                bt_self_component_source_as_self_component(self_comp_src);
        struct lttng_live_component *lttng_live;
        struct lttng_live_msg_iter *lttng_live_msg_iter;
+       enum lttng_live_viewer_status viewer_status;
        bt_logging_level log_level;
 
        BT_ASSERT(self_msg_it);
@@ -1493,7 +1532,9 @@ bt_component_class_message_iterator_initialize_method_status lttng_live_msg_iter
 
        lttng_live_msg_iter = g_new0(struct lttng_live_msg_iter, 1);
        if (!lttng_live_msg_iter) {
-               ret = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_ERROR;
+               status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
+               BT_COMP_LOGE_APPEND_CAUSE(self_comp,
+                       "Failed to allocate lttng_live_msg_iter");
                goto end;
        }
 
@@ -1504,42 +1545,65 @@ bt_component_class_message_iterator_initialize_method_status lttng_live_msg_iter
 
        lttng_live_msg_iter->active_stream_iter = 0;
        lttng_live_msg_iter->last_msg_ts_ns = INT64_MIN;
+       lttng_live_msg_iter->was_interrupted = false;
+
        lttng_live_msg_iter->sessions = g_ptr_array_new_with_free_func(
                (GDestroyNotify) lttng_live_destroy_session);
        BT_ASSERT(lttng_live_msg_iter->sessions);
 
-       lttng_live_msg_iter->viewer_connection =
-               live_viewer_connection_create(lttng_live->params.url->str, false,
-                       lttng_live_msg_iter, self_comp, NULL, log_level);
-       if (!lttng_live_msg_iter->viewer_connection) {
+        viewer_status = live_viewer_connection_create(self_comp, NULL,
+               log_level, lttng_live->params.url->str, false,
+               lttng_live_msg_iter, &lttng_live_msg_iter->viewer_connection);
+       if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
+               if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
+                       BT_COMP_LOGE_APPEND_CAUSE(self_comp,
+                               "Failed to create viewer connection");
+               } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
+                       /*
+                        * Interruption in the _iter_init() method is not
+                        * supported. Return an error.
+                        */
+                       BT_COMP_LOGE_APPEND_CAUSE(self_comp,
+                               "Interrupted while creating viewer connection");
+               }
                goto error;
        }
 
-       if (lttng_live_create_viewer_session(lttng_live_msg_iter)) {
+       viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
+       if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
+               if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
+                       BT_COMP_LOGE_APPEND_CAUSE(self_comp,
+                               "Failed to create viewer session");
+               } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
+                       /*
+                        * Interruption in the _iter_init() method is not
+                        * supported. Return an error.
+                        */
+                       BT_COMP_LOGE_APPEND_CAUSE(self_comp,
+                               "Interrupted when creating viewer session");
+               }
                goto error;
        }
+
        if (lttng_live_msg_iter->sessions->len == 0) {
                switch (lttng_live->params.sess_not_found_act) {
                case SESSION_NOT_FOUND_ACTION_CONTINUE:
-                       BT_COMP_LOGI("Unable to connect to the requested live viewer "
-                               "session. Keep trying to connect because of "
+                       BT_COMP_LOGI("Unable to connect to the requested live viewer session. Keep trying to connect because of "
                                "%s=\"%s\" component parameter: url=\"%s\"",
                                SESS_NOT_FOUND_ACTION_PARAM,
                                SESS_NOT_FOUND_ACTION_CONTINUE_STR,
                                lttng_live->params.url->str);
                        break;
                case SESSION_NOT_FOUND_ACTION_FAIL:
-                       BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Unable to connect to the requested live viewer "
-                               "session. Fail the message iterator"
-                               "initialization because of %s=\"%s\" "
+                       BT_COMP_LOGE_APPEND_CAUSE(self_comp,
+                               "Unable to connect to the requested live viewer session. Fail the message iterator initialization because of %s=\"%s\" "
                                "component parameter: url =\"%s\"",
                                SESS_NOT_FOUND_ACTION_PARAM,
                                SESS_NOT_FOUND_ACTION_FAIL_STR,
                                lttng_live->params.url->str);
                        goto error;
                case SESSION_NOT_FOUND_ACTION_END:
-                       BT_COMP_LOGI("Unable to connect to the requested live viewer "
-                               "session. End gracefully at the first _next() "
+                       BT_COMP_LOGI("Unable to connect to the requested live viewer session. End gracefully at the first _next() "
                                "call because of %s=\"%s\" component parameter: "
                                "url=\"%s\"", SESS_NOT_FOUND_ACTION_PARAM,
                                SESS_NOT_FOUND_ACTION_END_STR,
@@ -1551,13 +1615,14 @@ bt_component_class_message_iterator_initialize_method_status lttng_live_msg_iter
        }
 
        bt_self_message_iterator_set_data(self_msg_it, lttng_live_msg_iter);
-
+       status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_OK;
        goto end;
+
 error:
-       ret = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_ERROR;
+       status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_ERROR;
        lttng_live_msg_iter_destroy(lttng_live_msg_iter);
 end:
-       return ret;
+       return status;
 }
 
 static struct bt_param_validation_map_value_entry_descr list_sessions_params[] = {
@@ -1575,6 +1640,7 @@ bt_component_class_query_method_status lttng_live_query_list_sessions(
        const bt_value *url_value = NULL;
        const char *url;
        struct live_viewer_connection *viewer_connection = NULL;
+       enum lttng_live_viewer_status viewer_status;
        enum bt_param_validation_status validation_status;
        gchar *validate_error = NULL;
 
@@ -1593,12 +1659,18 @@ bt_component_class_query_method_status lttng_live_query_list_sessions(
        url_value = bt_value_map_borrow_entry_value_const(params, URL_PARAM);
        url = bt_value_string_get(url_value);
 
-       viewer_connection = live_viewer_connection_create(url, true, NULL,
-               NULL, self_comp_class, log_level);
-       if (!viewer_connection) {
-               BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
-                       "Failed to create viewer connection");
-               status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
+       viewer_status = live_viewer_connection_create(NULL, self_comp_class,
+               log_level, url, true, NULL, &viewer_connection);
+       if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
+               if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
+                       BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
+                               "Failed to create viewer connection");
+                       status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
+               } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
+                       status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
+               } else {
+                       bt_common_abort();
+               }
                goto error;
        }
 
This page took 0.02771 seconds and 4 git commands to generate.