src.ctf.lttng-live: make component class handle interruptions
authorFrancis Deslauriers <francis.deslauriers@efficios.com>
Mon, 4 Nov 2019 17:07:22 +0000 (12:07 -0500)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Fri, 15 Nov 2019 21:10:12 +0000 (16:10 -0500)
Currently, if a `src.ctf.lttng-live` component is interrupted by a
signal during a I/O syscall, it will return an _ERROR status downstream.
This is wrong for two reasons:
  1. Some signals are not problematic (e.g. SIGUSR1) so we want to
     continue as normal, and
  2. we want to return an _AGAIN status when we receive a SIGINT
     signal.

With this commit, when getting interrupted during a `recv()` or `send()`
call, we check if the graph is now in the interrupted state and return
the _AGAIN status if this case.
This commit changes the semantic of the lttng_live_{recv, send}
functions to make them return a status (_OK, _INTERRUPTED, or _ERROR).
The _OK status is only returned if the entire message was received or
sent. This allows us to remove many checks of the length of received or
sent data.

With this commit, once a live iterator is interrupted by SIGINT and
returns _AGAIN it's not in a state that can be recovered if the graph was
to be run again. Further work is needed to make this component class
restartable.

So to prevent user of the graph to call the _NEXT method after an
interruption, we mark the live iterator as interrupted (with the
`was_interrupted` boolean field) and return an error if it's the case.

Signed-off-by: Francis Deslauriers <francis.deslauriers@efficios.com>
Change-Id: I36cba1c3456250099ddfa9a2b15646c3e4f61e94
Reviewed-on: https://review.lttng.org/c/babeltrace/+/2324
Reviewed-by: Simon Marchi <simon.marchi@efficios.com>
src/plugins/ctf/lttng-live/lttng-live.c
src/plugins/ctf/lttng-live/lttng-live.h
src/plugins/ctf/lttng-live/metadata.c
src/plugins/ctf/lttng-live/viewer-connection.c
src/plugins/ctf/lttng-live/viewer-connection.h

index e184b6328815703ef4d3693fac22380cfe896a4a..e236655815a5d68f7b9854bc2b052075a06761a6 100644 (file)
@@ -447,10 +447,10 @@ enum lttng_live_iterator_status lttng_live_get_session(
        uint64_t trace_idx;
 
        if (!session->attached) {
-               enum lttng_live_attach_session_status attach_status =
+               enum lttng_live_viewer_status attach_status =
                        lttng_live_attach_session(session,
                                lttng_live_msg_iter->self_msg_iter);
-               if (attach_status != LTTNG_LIVE_ATTACH_SESSION_STATUS_OK) {
+               if (attach_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
                        if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
                                /*
                                 * Clear any causes appended in
@@ -546,7 +546,8 @@ enum lttng_live_iterator_status
 lttng_live_iterator_handle_new_streams_and_metadata(
                struct lttng_live_msg_iter *lttng_live_msg_iter)
 {
-       enum lttng_live_iterator_status ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
+       enum lttng_live_iterator_status status;
+       enum lttng_live_viewer_status viewer_status;
        bt_logging_level log_level = lttng_live_msg_iter->log_level;
        bt_self_component *self_comp = lttng_live_msg_iter->self_comp;
        uint64_t session_idx = 0, nr_sessions_opened = 0;
@@ -562,17 +563,24 @@ lttng_live_iterator_handle_new_streams_and_metadata(
         */
        if (lttng_live_msg_iter->sessions->len == 0) {
                if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE) {
-                       ret = LTTNG_LIVE_ITERATOR_STATUS_END;
+                       status = LTTNG_LIVE_ITERATOR_STATUS_END;
                        goto end;
                } else {
                        /*
                         * Retry to create a viewer session for the requested
                         * session name.
                         */
-                       if (lttng_live_create_viewer_session(lttng_live_msg_iter)) {
-                               ret = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
-                               BT_COMP_LOGE_APPEND_CAUSE(self_comp,
-                                       "Error creating LTTng live viewer session");
+                       viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
+                       if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
+                               if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
+                                       status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+                                       BT_COMP_LOGE_APPEND_CAUSE(self_comp,
+                                               "Error creating LTTng live viewer session");
+                               } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
+                                       status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+                               } else {
+                                       bt_common_abort();
+                               }
                                goto end;
                        }
                }
@@ -582,12 +590,12 @@ lttng_live_iterator_handle_new_streams_and_metadata(
                        session_idx++) {
                session = g_ptr_array_index(lttng_live_msg_iter->sessions,
                                session_idx);
-               ret = lttng_live_get_session(lttng_live_msg_iter, session);
-               switch (ret) {
+               status = lttng_live_get_session(lttng_live_msg_iter, session);
+               switch (status) {
                case LTTNG_LIVE_ITERATOR_STATUS_OK:
                        break;
                case LTTNG_LIVE_ITERATOR_STATUS_END:
-                       ret = LTTNG_LIVE_ITERATOR_STATUS_OK;
+                       status = LTTNG_LIVE_ITERATOR_STATUS_OK;
                        break;
                default:
                        goto end;
@@ -596,13 +604,16 @@ lttng_live_iterator_handle_new_streams_and_metadata(
                        nr_sessions_opened++;
                }
        }
-end:
-       if (ret == LTTNG_LIVE_ITERATOR_STATUS_OK &&
-                       sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE &&
+
+       if (sess_not_found_act != SESSION_NOT_FOUND_ACTION_CONTINUE &&
                        nr_sessions_opened == 0) {
-               ret = LTTNG_LIVE_ITERATOR_STATUS_END;
+               status = LTTNG_LIVE_ITERATOR_STATUS_END;
+       } else {
+               status = LTTNG_LIVE_ITERATOR_STATUS_OK;
        }
-       return ret;
+
+end:
+       return status;
 }
 
 static
@@ -1259,6 +1270,7 @@ bt_component_class_message_iterator_next_method_status lttng_live_msg_iter_next(
                uint64_t *count)
 {
        bt_component_class_message_iterator_next_method_status status;
+       enum lttng_live_viewer_status viewer_status;
        struct lttng_live_msg_iter *lttng_live_msg_iter =
                bt_self_message_iterator_get_data(self_msg_it);
        struct lttng_live_component *lttng_live =
@@ -1272,6 +1284,21 @@ bt_component_class_message_iterator_next_method_status lttng_live_msg_iter_next(
 
        BT_ASSERT_DBG(lttng_live_msg_iter);
 
+       if (G_UNLIKELY(lttng_live_msg_iter->was_interrupted)) {
+               /*
+                * The iterator was interrupted in a previous call to the
+                * `_next()` method. We currently do not support generating
+                * messages after such event. The babeltrace2 CLI should never
+                * be running the graph after being interrupted. So this check
+                * is to prevent other graph users from using this live
+                * iterator in an messed up internal state.
+                */
+               status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
+               BT_COMP_LOGE_APPEND_CAUSE(self_comp,
+                       "Message iterator was interrupted during a previous call to the `next()` and currently does not support continuing after such event.");
+               goto end;
+       }
+
        /*
         * Clear all the invalid message reference that might be left over in
         * the output array.
@@ -1287,16 +1314,25 @@ bt_component_class_message_iterator_next_method_status lttng_live_msg_iter_next(
                if (lttng_live->params.sess_not_found_act !=
                                SESSION_NOT_FOUND_ACTION_CONTINUE) {
                        status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_END;
-                       goto no_session;
+                       goto end;
                } else {
                        /*
                         * The are no more active session for this session
                         * name. Retry to create a viewer session for the
                         * requested session name.
                         */
-                       if (lttng_live_create_viewer_session(lttng_live_msg_iter)) {
-                               status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
-                               goto no_session;
+                       viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
+                       if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
+                               if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
+                                       status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_ERROR;
+                                       BT_COMP_LOGE_APPEND_CAUSE(self_comp,
+                                               "Error creating LTTng live viewer session");
+                               } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
+                                       status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_AGAIN;
+                               } else {
+                                       bt_common_abort();
+                               }
+                               goto end;
                        }
                }
        }
@@ -1364,7 +1400,7 @@ bt_component_class_message_iterator_next_method_status lttng_live_msg_iter_next(
                        }
 
                        if (stream_iter_status != LTTNG_LIVE_ITERATOR_STATUS_OK) {
-                               goto end;
+                               goto return_status;
                        }
 
                        if (G_UNLIKELY(youngest_stream_iter == NULL) ||
@@ -1407,7 +1443,7 @@ bt_component_class_message_iterator_next_method_status lttng_live_msg_iter_next(
 
                if (!youngest_stream_iter) {
                        stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
-                       goto end;
+                       goto return_status;
                }
 
                BT_ASSERT_DBG(youngest_stream_iter->current_msg);
@@ -1429,17 +1465,20 @@ bt_component_class_message_iterator_next_method_status lttng_live_msg_iter_next(
 
                stream_iter_status = LTTNG_LIVE_ITERATOR_STATUS_OK;
        }
-end:
+
+return_status:
        switch (stream_iter_status) {
        case LTTNG_LIVE_ITERATOR_STATUS_OK:
        case LTTNG_LIVE_ITERATOR_STATUS_AGAIN:
+               /*
+                * If we gathered messages, return _OK even if the graph was
+                * interrupted. This allows for the components downstream to at
+                * least get the thoses messages. If the graph was indeed
+                * interrupted there should not be another _next() call as the
+                * application will tear down the graph. This component class
+                * doesn't support restarting after an interruption.
+                */
                if (*count > 0) {
-                       /*
-                        * We received a again status but we have some messages
-                        * to send downstream. We send them and return OK for
-                        * now. On the next call we return again if there are
-                        * still no new message to send.
-                        */
                        status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_OK;
                } else {
                        status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_NEXT_METHOD_STATUS_AGAIN;
@@ -1462,7 +1501,7 @@ end:
                bt_common_abort();
        }
 
-no_session:
+end:
        return status;
 }
 
@@ -1473,12 +1512,12 @@ bt_component_class_message_iterator_initialize_method_status lttng_live_msg_iter
                bt_self_component_source *self_comp_src,
                bt_self_component_port_output *self_port)
 {
-       bt_component_class_message_iterator_initialize_method_status ret =
-               BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_OK;
+       bt_component_class_message_iterator_initialize_method_status status;
        bt_self_component *self_comp =
                bt_self_component_source_as_self_component(self_comp_src);
        struct lttng_live_component *lttng_live;
        struct lttng_live_msg_iter *lttng_live_msg_iter;
+       enum lttng_live_viewer_status viewer_status;
        bt_logging_level log_level;
 
        BT_ASSERT(self_msg_it);
@@ -1493,7 +1532,9 @@ bt_component_class_message_iterator_initialize_method_status lttng_live_msg_iter
 
        lttng_live_msg_iter = g_new0(struct lttng_live_msg_iter, 1);
        if (!lttng_live_msg_iter) {
-               ret = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_ERROR;
+               status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_MEMORY_ERROR;
+               BT_COMP_LOGE_APPEND_CAUSE(self_comp,
+                       "Failed to allocate lttng_live_msg_iter");
                goto end;
        }
 
@@ -1504,42 +1545,65 @@ bt_component_class_message_iterator_initialize_method_status lttng_live_msg_iter
 
        lttng_live_msg_iter->active_stream_iter = 0;
        lttng_live_msg_iter->last_msg_ts_ns = INT64_MIN;
+       lttng_live_msg_iter->was_interrupted = false;
+
        lttng_live_msg_iter->sessions = g_ptr_array_new_with_free_func(
                (GDestroyNotify) lttng_live_destroy_session);
        BT_ASSERT(lttng_live_msg_iter->sessions);
 
-       lttng_live_msg_iter->viewer_connection =
-               live_viewer_connection_create(lttng_live->params.url->str, false,
-                       lttng_live_msg_iter, self_comp, NULL, log_level);
-       if (!lttng_live_msg_iter->viewer_connection) {
+        viewer_status = live_viewer_connection_create(self_comp, NULL,
+               log_level, lttng_live->params.url->str, false,
+               lttng_live_msg_iter, &lttng_live_msg_iter->viewer_connection);
+       if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
+               if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
+                       BT_COMP_LOGE_APPEND_CAUSE(self_comp,
+                               "Failed to create viewer connection");
+               } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
+                       /*
+                        * Interruption in the _iter_init() method is not
+                        * supported. Return an error.
+                        */
+                       BT_COMP_LOGE_APPEND_CAUSE(self_comp,
+                               "Interrupted while creating viewer connection");
+               }
                goto error;
        }
 
-       if (lttng_live_create_viewer_session(lttng_live_msg_iter)) {
+       viewer_status = lttng_live_create_viewer_session(lttng_live_msg_iter);
+       if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
+               if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
+                       BT_COMP_LOGE_APPEND_CAUSE(self_comp,
+                               "Failed to create viewer session");
+               } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
+                       /*
+                        * Interruption in the _iter_init() method is not
+                        * supported. Return an error.
+                        */
+                       BT_COMP_LOGE_APPEND_CAUSE(self_comp,
+                               "Interrupted when creating viewer session");
+               }
                goto error;
        }
+
        if (lttng_live_msg_iter->sessions->len == 0) {
                switch (lttng_live->params.sess_not_found_act) {
                case SESSION_NOT_FOUND_ACTION_CONTINUE:
-                       BT_COMP_LOGI("Unable to connect to the requested live viewer "
-                               "session. Keep trying to connect because of "
+                       BT_COMP_LOGI("Unable to connect to the requested live viewer session. Keep trying to connect because of "
                                "%s=\"%s\" component parameter: url=\"%s\"",
                                SESS_NOT_FOUND_ACTION_PARAM,
                                SESS_NOT_FOUND_ACTION_CONTINUE_STR,
                                lttng_live->params.url->str);
                        break;
                case SESSION_NOT_FOUND_ACTION_FAIL:
-                       BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Unable to connect to the requested live viewer "
-                               "session. Fail the message iterator"
-                               "initialization because of %s=\"%s\" "
+                       BT_COMP_LOGE_APPEND_CAUSE(self_comp,
+                               "Unable to connect to the requested live viewer session. Fail the message iterator initialization because of %s=\"%s\" "
                                "component parameter: url =\"%s\"",
                                SESS_NOT_FOUND_ACTION_PARAM,
                                SESS_NOT_FOUND_ACTION_FAIL_STR,
                                lttng_live->params.url->str);
                        goto error;
                case SESSION_NOT_FOUND_ACTION_END:
-                       BT_COMP_LOGI("Unable to connect to the requested live viewer "
-                               "session. End gracefully at the first _next() "
+                       BT_COMP_LOGI("Unable to connect to the requested live viewer session. End gracefully at the first _next() "
                                "call because of %s=\"%s\" component parameter: "
                                "url=\"%s\"", SESS_NOT_FOUND_ACTION_PARAM,
                                SESS_NOT_FOUND_ACTION_END_STR,
@@ -1551,13 +1615,14 @@ bt_component_class_message_iterator_initialize_method_status lttng_live_msg_iter
        }
 
        bt_self_message_iterator_set_data(self_msg_it, lttng_live_msg_iter);
-
+       status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_OK;
        goto end;
+
 error:
-       ret = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_ERROR;
+       status = BT_COMPONENT_CLASS_MESSAGE_ITERATOR_INITIALIZE_METHOD_STATUS_ERROR;
        lttng_live_msg_iter_destroy(lttng_live_msg_iter);
 end:
-       return ret;
+       return status;
 }
 
 static struct bt_param_validation_map_value_entry_descr list_sessions_params[] = {
@@ -1575,6 +1640,7 @@ bt_component_class_query_method_status lttng_live_query_list_sessions(
        const bt_value *url_value = NULL;
        const char *url;
        struct live_viewer_connection *viewer_connection = NULL;
+       enum lttng_live_viewer_status viewer_status;
        enum bt_param_validation_status validation_status;
        gchar *validate_error = NULL;
 
@@ -1593,12 +1659,18 @@ bt_component_class_query_method_status lttng_live_query_list_sessions(
        url_value = bt_value_map_borrow_entry_value_const(params, URL_PARAM);
        url = bt_value_string_get(url_value);
 
-       viewer_connection = live_viewer_connection_create(url, true, NULL,
-               NULL, self_comp_class, log_level);
-       if (!viewer_connection) {
-               BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
-                       "Failed to create viewer connection");
-               status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
+       viewer_status = live_viewer_connection_create(NULL, self_comp_class,
+               log_level, url, true, NULL, &viewer_connection);
+       if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
+               if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
+                       BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
+                               "Failed to create viewer connection");
+                       status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
+               } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
+                       status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
+               } else {
+                       bt_common_abort();
+               }
                goto error;
        }
 
index bb1c760ee6a6efe0aaf63bab26213c005711fcba..1447cc5d2815ea7bb16a65a6f934b36191b8d2c5 100644 (file)
@@ -238,6 +238,9 @@ struct lttng_live_msg_iter {
 
        /* Timestamp in nanosecond of the last message sent downstream. */
        int64_t last_msg_ts_ns;
+
+       /* True if the iterator was interrupted. */
+       bool was_interrupted;
 };
 
 enum lttng_live_iterator_status {
@@ -285,18 +288,16 @@ bt_component_class_message_iterator_initialize_method_status lttng_live_msg_iter
 
 void lttng_live_msg_iter_finalize(bt_self_message_iterator *it);
 
-int lttng_live_create_viewer_session(struct lttng_live_msg_iter *lttng_live_msg_iter);
-
-enum lttng_live_attach_session_status {
-       LTTNG_LIVE_ATTACH_SESSION_STATUS_OK     = 0,
-       LTTNG_LIVE_ATTACH_SESSION_STATUS_ERROR  = -2,
-};
+enum lttng_live_viewer_status lttng_live_create_viewer_session(
+               struct lttng_live_msg_iter *lttng_live_msg_iter);
 
-enum lttng_live_attach_session_status lttng_live_attach_session(
+enum lttng_live_viewer_status lttng_live_attach_session(
                struct lttng_live_session *session,
                bt_self_message_iterator *self_msg_iter);
 
-int lttng_live_detach_session(struct lttng_live_session *session);
+enum lttng_live_viewer_status lttng_live_detach_session(
+               struct lttng_live_session *session);
+
 enum lttng_live_iterator_status lttng_live_get_new_streams(
                struct lttng_live_session *session,
                bt_self_message_iterator *self_msg_iter);
@@ -306,20 +307,6 @@ int lttng_live_add_session(struct lttng_live_msg_iter *lttng_live_msg_iter,
                const char *hostname,
                const char *session_name);
 
-enum lttng_live_get_one_metadata_status {
-       /* The end of the metadata stream was reached. */
-       LTTNG_LIVE_GET_ONE_METADATA_STATUS_END      = 1,
-       /* One metadata packet was received and written to file. */
-       LTTNG_LIVE_GET_ONE_METADATA_STATUS_OK       = 0,
-       /* The metadata stream was not found on the relay. */
-       LTTNG_LIVE_GET_ONE_METADATA_STATUS_CLOSED   = -1,
-       /*
-        * A critical error occurred when contacting the relay or while
-        * handling its response.
-        */
-       LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR    = -2,
-};
-
 /*
  * lttng_live_get_one_metadata_packet() asks the Relay Daemon for new metadata.
  * If new metadata is received, the function writes it to the provided file
index 787fc0b322da0c39d68494b636c852a6e6ec8876..7a7a0d8a445c59815ba85e8a540c33b447a48819 100644 (file)
@@ -151,11 +151,15 @@ enum lttng_live_iterator_status lttng_live_metadata_update(
                goto end;
        }
 
-       /* Open for writing */
+       /*
+        * Open a new write only file handle to populate the `metadata_buf`
+        * memory buffer so we can write in loop in it easily.
+        */
        fp = bt_open_memstream(&metadata_buf, &size);
        if (!fp) {
                if (errno == EINTR &&
                                lttng_live_graph_is_canceled(session->lttng_live_msg_iter)) {
+                       session->lttng_live_msg_iter->was_interrupted = true;
                        status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
                } else {
                        BT_COMP_LOGE_APPEND_CAUSE_ERRNO(self_comp,
@@ -210,6 +214,7 @@ enum lttng_live_iterator_status lttng_live_metadata_update(
                goto end;
        }
 
+       /* The memory buffer `metadata_buf` contains all the metadata. */
        if (bt_close_memstream(&metadata_buf, &size, fp)) {
                BT_COMP_LOGE_APPEND_CAUSE_ERRNO(self_comp,
                        "Metadata bt_close_memstream", ".");
@@ -226,10 +231,15 @@ enum lttng_live_iterator_status lttng_live_metadata_update(
                goto end;
        }
 
+       /*
+        * Open a new reading file handle on the `metadata_buf` and pass it to
+        * the metadata decoder.
+        */
        fp = bt_fmemopen(metadata_buf, len_read, "rb");
        if (!fp) {
                if (errno == EINTR &&
                                lttng_live_graph_is_canceled(session->lttng_live_msg_iter)) {
+                       session->lttng_live_msg_iter->was_interrupted = true;
                        status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
                } else {
                        BT_COMP_LOGE_APPEND_CAUSE_ERRNO(self_comp,
index b1f37f04a0516bc04addd95af1f4c48ba2de7678..69a2a25e7dbff67457aec9be1654687171b19623 100644 (file)
 #include "data-stream.h"
 #include "metadata.h"
 
+#define viewer_handle_send_recv_status(_self_comp, _self_comp_class,   \
+               _status, _action, _msg_str)                             \
+do {                                                                   \
+       switch (_status) {                                              \
+       case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:                      \
+               break;                                                  \
+       case LTTNG_LIVE_VIEWER_STATUS_ERROR:                            \
+               BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(_self_comp,     \
+                       _self_comp_class, "Error " _action " " _msg_str); \
+               break;                                                  \
+       default:                                                        \
+                bt_common_abort();                                     \
+       }                                                               \
+} while (0)
+
+#define viewer_handle_send_status(_self_comp, _self_comp_class, _status, _msg_str) \
+       viewer_handle_send_recv_status(_self_comp, _self_comp_class, _status, \
+               "sending", _msg_str)
+
+#define viewer_handle_recv_status(_self_comp, _self_comp_class, _status, _msg_str) \
+       viewer_handle_send_recv_status(_self_comp, _self_comp_class, _status, \
+               "receiving", _msg_str)
+
+#define LTTNG_LIVE_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE_ERRNO(_self_comp, \
+               _self_comp_class, _msg, _fmt, ...) \
+       do {                                                                            \
+               BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(_self_comp, _self_comp_class,   \
+                       _msg ": %s" _fmt, bt_socket_errormsg(), ##__VA_ARGS__);         \
+       } while (0)
+
+static inline
+enum lttng_live_iterator_status viewer_status_to_live_iterator_status(
+               enum lttng_live_viewer_status viewer_status)
+{
+       switch (viewer_status) {
+       case LTTNG_LIVE_VIEWER_STATUS_OK:
+               return LTTNG_LIVE_ITERATOR_STATUS_OK;
+       case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
+               return LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+       case LTTNG_LIVE_VIEWER_STATUS_ERROR:
+               return LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+       default:
+               bt_common_abort();
+       }
+}
+
+static inline
+enum ctf_msg_iter_medium_status viewer_status_to_ctf_msg_iter_medium_status(
+               enum lttng_live_viewer_status viewer_status)
+{
+       switch (viewer_status) {
+       case LTTNG_LIVE_VIEWER_STATUS_OK:
+               return CTF_MSG_ITER_MEDIUM_STATUS_OK;
+       case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
+               return CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
+       case LTTNG_LIVE_VIEWER_STATUS_ERROR:
+               return CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
+       default:
+               bt_common_abort();
+       }
+}
+
+/*
+ * This function receives a message from the Relay daemon.
+ * If it received the entire message, it returns _OK,
+ * If it's interrupted, it returns _INTERRUPTED,
+ * otherwise, it returns _ERROR.
+ */
 static
-ssize_t lttng_live_recv(struct live_viewer_connection *viewer_connection,
+enum lttng_live_viewer_status lttng_live_recv(
+               struct live_viewer_connection *viewer_connection,
                void *buf, size_t len)
 {
-       ssize_t ret;
-       size_t copied = 0, to_copy = len;
+       ssize_t received;
+       bt_self_component_class *self_comp_class =
+               viewer_connection->self_comp_class;
+       bt_self_component *self_comp =
+               viewer_connection->self_comp;
+       size_t total_received = 0, to_receive = len;
        struct lttng_live_msg_iter *lttng_live_msg_iter =
                viewer_connection->lttng_live_msg_iter;
+       enum lttng_live_viewer_status status;
        BT_SOCKET sock = viewer_connection->control_sock;
 
+       /*
+        * Receive a message from the Relay.
+        */
        do {
-               ret = bt_socket_recv(sock, buf + copied, to_copy, 0);
-               if (ret > 0) {
-                       BT_ASSERT(ret <= to_copy);
-                       copied += ret;
-                       to_copy -= ret;
-               }
-               if (ret == BT_SOCKET_ERROR && bt_socket_interrupted()) {
-                       if (!viewer_connection->in_query &&
-                                       lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
-                               break;
+               received = bt_socket_recv(sock, buf + total_received, to_receive, 0);
+               if (received == BT_SOCKET_ERROR) {
+                       if (bt_socket_interrupted()) {
+                               if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
+                                       /*
+                                        * This interruption was due to a
+                                        * SIGINT and the graph is being torn
+                                        * down.
+                                        */
+                                       status = LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED;
+                                       lttng_live_msg_iter->was_interrupted = true;
+                                       goto end;
+                               } else {
+                                       /*
+                                        * A signal was received, but the graph
+                                        * is not being torn down. Carry on.
+                                        */
+                                       continue;
+                               }
                        } else {
-                               continue;
+                               /*
+                                * For any other types of socket error, returng
+                                * an error.
+                                */
+                               LTTNG_LIVE_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE_ERRNO(
+                                       self_comp, self_comp_class,
+                                       "Error receiving from Relay", ".");
+                               status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
+                               goto end;
                        }
+               } else if (received == 0) {
+                       /*
+                        * The recv() call returned 0. This means the
+                        * connection was orderly shutdown from the other peer.
+                        * If that happens when we are trying to receive
+                        * a message from it, it means something when wrong.
+                        */
+                       status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
+                       BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
+                               self_comp_class, "Remote side has closed connection");
+                       goto end;
                }
-       } while (ret > 0 && to_copy > 0);
 
-       if (ret > 0) {
-               ret = copied;
-       }
+               BT_ASSERT(received <= to_receive);
+               total_received += received;
+               to_receive -= received;
 
-       /* ret = 0 means orderly shutdown, ret == BT_SOCKET_ERROR is error. */
-       return ret;
+       } while (to_receive > 0);
+
+       BT_ASSERT(total_received == len);
+       status = LTTNG_LIVE_VIEWER_STATUS_OK;
+
+end:
+       return status;
 }
 
+/*
+ * This function sends a message to the Relay daemon.
+ * If it send the message, it returns _OK,
+ * If it's interrupted, it returns _INTERRUPTED,
+ * otherwise, it returns _ERROR.
+ */
 static
-ssize_t lttng_live_send(struct live_viewer_connection *viewer_connection,
+enum lttng_live_viewer_status lttng_live_send(
+               struct live_viewer_connection *viewer_connection,
                const void *buf, size_t len)
 {
+       enum lttng_live_viewer_status status;
+       bt_self_component_class *self_comp_class =
+               viewer_connection->self_comp_class;
+       bt_self_component *self_comp =
+               viewer_connection->self_comp;
        struct lttng_live_msg_iter *lttng_live_msg_iter =
                viewer_connection->lttng_live_msg_iter;
        BT_SOCKET sock = viewer_connection->control_sock;
-       ssize_t ret;
-
-       for (;;) {
-               ret = bt_socket_send_nosigpipe(sock, buf, len);
-               if (ret == BT_SOCKET_ERROR && bt_socket_interrupted()) {
-                       if (!viewer_connection->in_query &&
-                                       lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
-                               break;
+       size_t to_send = len;
+       ssize_t total_sent = 0;
+
+       do {
+               ssize_t sent = bt_socket_send_nosigpipe(sock, buf + total_sent,
+                       to_send);
+               if (sent == BT_SOCKET_ERROR) {
+                       if (bt_socket_interrupted()) {
+                               if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
+                                       /*
+                                        * This interruption was a SIGINT and
+                                        * the graph is being teared down.
+                                        */
+                                       status = LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED;
+                                       lttng_live_msg_iter->was_interrupted = true;
+                                       goto end;
+                               } else {
+                                       /*
+                                        * A signal was received, but the graph
+                                        * is not being teared down. Carry on.
+                                        */
+                                       continue;
+                               }
                        } else {
-                               continue;
+                               /*
+                                * The send() call returned an error.
+                                */
+                               LTTNG_LIVE_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE_ERRNO(
+                                       self_comp, self_comp_class,
+                                       "Error sending to Relay", ".");
+                               status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
+                               goto end;
                        }
-               } else {
-                       break;
                }
-       }
-       return ret;
+
+               BT_ASSERT(sent <= to_send);
+               total_sent += sent;
+               to_send -= sent;
+
+       } while (to_send > 0);
+
+       BT_ASSERT(total_sent == len);
+       status = LTTNG_LIVE_VIEWER_STATUS_OK;
+
+end:
+       return status;
 }
 
 static
@@ -165,16 +316,16 @@ end:
 }
 
 static
-int lttng_live_handshake(struct live_viewer_connection *viewer_connection)
+enum lttng_live_viewer_status lttng_live_handshake(
+               struct live_viewer_connection *viewer_connection)
 {
        struct lttng_viewer_cmd cmd;
        struct lttng_viewer_connect connect;
+       enum lttng_live_viewer_status status;
        bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
        bt_self_component *self_comp = viewer_connection->self_comp;
        const size_t cmd_buf_len = sizeof(cmd) + sizeof(connect);
        char cmd_buf[cmd_buf_len];
-       int ret;
-       ssize_t ret_len;
 
        cmd.cmd = htobe32(LTTNG_VIEWER_CONNECT);
        cmd.data_size = htobe64((uint64_t) sizeof(connect));
@@ -192,29 +343,20 @@ int lttng_live_handshake(struct live_viewer_connection *viewer_connection)
         */
        memcpy(cmd_buf, &cmd, sizeof(cmd));
        memcpy(cmd_buf + sizeof(cmd), &connect, sizeof(connect));
-       ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
-       if (ret_len == BT_SOCKET_ERROR) {
-               BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
-                       self_comp_class, "Error sending version: %s",
-                       bt_socket_errormsg());
-               goto error;
-       }
-
-       BT_ASSERT(ret_len == cmd_buf_len);
 
-       ret_len = lttng_live_recv(viewer_connection, &connect, sizeof(connect));
-       if (ret_len == 0) {
-               BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
-                       self_comp_class, "Remote side has closed connection");
-               goto error;
+       status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
+       if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
+               viewer_handle_send_status(self_comp, self_comp_class,
+                       status, "viewer connect command");
+               goto end;
        }
-       if (ret_len == BT_SOCKET_ERROR) {
-               BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
-                       self_comp_class, "Error receiving version: %s",
-                       bt_socket_errormsg());
-               goto error;
+
+       status = lttng_live_recv(viewer_connection, &connect, sizeof(connect));
+       if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
+               viewer_handle_recv_status(self_comp, self_comp_class,
+                       status, "viewer connect reply");
+               goto end;
        }
-       BT_ASSERT(ret_len == sizeof(connect));
 
        BT_COMP_LOGI("Received viewer session ID : %" PRIu64,
                        (uint64_t) be64toh(connect.viewer_session_id));
@@ -224,7 +366,8 @@ int lttng_live_handshake(struct live_viewer_connection *viewer_connection)
        if (LTTNG_LIVE_MAJOR != be32toh(connect.major)) {
                BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
                        self_comp_class, "Incompatible lttng-relayd protocol");
-               goto error;
+               status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
+               goto end;
        }
        /* Use the smallest protocol version implemented. */
        if (LTTNG_LIVE_MINOR > be32toh(connect.minor)) {
@@ -233,25 +376,29 @@ int lttng_live_handshake(struct live_viewer_connection *viewer_connection)
                viewer_connection->minor =  LTTNG_LIVE_MINOR;
        }
        viewer_connection->major = LTTNG_LIVE_MAJOR;
-       ret = 0;
-       return ret;
 
-error:
-       return -1;
+       status = LTTNG_LIVE_VIEWER_STATUS_OK;
+
+       goto end;
+
+end:
+       return status;
 }
 
 static
-int lttng_live_connect_viewer(struct live_viewer_connection *viewer_connection)
+enum lttng_live_viewer_status lttng_live_connect_viewer(
+       struct live_viewer_connection *viewer_connection)
 {
        struct hostent *host;
        struct sockaddr_in server_addr;
+       enum lttng_live_viewer_status status;
        bt_self_component_class *self_comp_class = viewer_connection->self_comp_class;
        bt_self_component *self_comp = viewer_connection->self_comp;
-       int ret;
 
        if (parse_url(viewer_connection)) {
                BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
                        self_comp_class, "Failed to parse URL");
+               status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
                goto error;
        }
 
@@ -260,12 +407,14 @@ int lttng_live_connect_viewer(struct live_viewer_connection *viewer_connection)
                BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
                        self_comp_class, "Cannot lookup hostname: hostname=\"%s\"",
                        viewer_connection->relay_hostname->str);
+               status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
                goto error;
        }
 
        if ((viewer_connection->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == BT_INVALID_SOCKET) {
                BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
                        self_comp_class, "Socket creation failed: %s", bt_socket_errormsg());
+               status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
                goto error;
        }
 
@@ -279,17 +428,25 @@ int lttng_live_connect_viewer(struct live_viewer_connection *viewer_connection)
                BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
                        self_comp_class, "Connection failed: %s",
                        bt_socket_errormsg());
+               status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
                goto error;
        }
-       if (lttng_live_handshake(viewer_connection)) {
+
+       status = lttng_live_handshake(viewer_connection);
+
+       /*
+        * Only print error and append cause in case of error. not in case of
+        * interruption.
+        */
+       if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
                BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
                        self_comp_class, "Viewer handshake failed");
                goto error;
+       } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
+               goto end;
        }
 
-       ret = 0;
-
-       return ret;
+       goto end;
 
 error:
        if (viewer_connection->control_sock != BT_INVALID_SOCKET) {
@@ -299,7 +456,8 @@ error:
                }
        }
        viewer_connection->control_sock = BT_INVALID_SOCKET;
-       return -1;
+end:
+       return status;
 }
 
 static
@@ -592,10 +750,10 @@ bt_component_class_query_method_status live_viewer_connection_list_sessions(
        bt_component_class_query_method_status status =
                BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_OK;
        bt_value *result = NULL;
+       enum lttng_live_viewer_status viewer_status;
        struct lttng_viewer_cmd cmd;
        struct lttng_viewer_list_sessions list;
        uint32_t i, sessions_count;
-       ssize_t ret_len;
 
        result = bt_value_array_create();
        if (!result) {
@@ -609,51 +767,44 @@ bt_component_class_query_method_status live_viewer_connection_list_sessions(
        cmd.data_size = htobe64((uint64_t) 0);
        cmd.cmd_version = htobe32(0);
 
-       ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
-       if (ret_len == BT_SOCKET_ERROR) {
+       viewer_status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
+       if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
                BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
-                       "Error sending cmd: %s", bt_socket_errormsg());
+                       "Error sending list sessions command");
                status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
                goto error;
+       } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
+               status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
+               goto error;
        }
-       BT_ASSERT(ret_len == sizeof(cmd));
 
-       ret_len = lttng_live_recv(viewer_connection, &list, sizeof(list));
-       if (ret_len == 0) {
+       viewer_status = lttng_live_recv(viewer_connection, &list, sizeof(list));
+       if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
                BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
-                       "Remote side has closed connection");
+                       "Error receiving session list");
                status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
                goto error;
-       }
-       if (ret_len == BT_SOCKET_ERROR) {
-               BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
-                       "Error receiving session list: %s",
-                       bt_socket_errormsg());
-               status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
+       } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
+               status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
                goto error;
        }
-       BT_ASSERT(ret_len == sizeof(list));
 
        sessions_count = be32toh(list.sessions_count);
        for (i = 0; i < sessions_count; i++) {
                struct lttng_viewer_session lsession;
 
-               ret_len = lttng_live_recv(viewer_connection, &lsession,
+               viewer_status = lttng_live_recv(viewer_connection, &lsession,
                        sizeof(lsession));
-               if (ret_len == 0) {
+               if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
                        BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
-                               "Remote side has closed connection");
+                               "Error receiving session:");
                        status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
                        goto error;
-               }
-               if (ret_len == BT_SOCKET_ERROR) {
-                       BT_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp_class,
-                               "Error receiving session: %s",
-                               bt_socket_errormsg());
-                       status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_ERROR;
+               } else if (viewer_status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
+                       status = BT_COMPONENT_CLASS_QUERY_METHOD_STATUS_AGAIN;
                        goto error;
                }
-               BT_ASSERT(ret_len == sizeof(lsession));
+
                lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
                lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
                if (list_append_session(result, viewer_connection->url,
@@ -674,60 +825,48 @@ end:
 }
 
 static
-int lttng_live_query_session_ids(struct lttng_live_msg_iter *lttng_live_msg_iter)
+enum lttng_live_viewer_status lttng_live_query_session_ids(
+               struct lttng_live_msg_iter *lttng_live_msg_iter)
 {
        struct lttng_viewer_cmd cmd;
        struct lttng_viewer_list_sessions list;
        struct lttng_viewer_session lsession;
        uint32_t i, sessions_count;
-       ssize_t ret_len;
        uint64_t session_id;
+       enum lttng_live_viewer_status status;
        struct live_viewer_connection *viewer_connection =
                lttng_live_msg_iter->viewer_connection;
        bt_self_component *self_comp = viewer_connection->self_comp;
+       bt_self_component_class *self_comp_class =
+               viewer_connection->self_comp_class;
 
        cmd.cmd = htobe32(LTTNG_VIEWER_LIST_SESSIONS);
        cmd.data_size = htobe64((uint64_t) 0);
        cmd.cmd_version = htobe32(0);
 
-       ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
-       if (ret_len == BT_SOCKET_ERROR) {
-               BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error sending cmd: %s",
-                       bt_socket_errormsg());
-               goto error;
+       status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
+       if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
+               viewer_handle_send_status(self_comp, self_comp_class,
+                       status, "list sessions command");
+               goto end;
        }
-       BT_ASSERT(ret_len == sizeof(cmd));
 
-       ret_len = lttng_live_recv(viewer_connection, &list, sizeof(list));
-       if (ret_len == 0) {
-               BT_COMP_LOGE_APPEND_CAUSE(self_comp,
-                       "Remote side has closed connection");
-               goto error;
-       }
-       if (ret_len == BT_SOCKET_ERROR) {
-               BT_COMP_LOGE_APPEND_CAUSE(self_comp,
-                       "Error receiving session list: %s",
-                       bt_socket_errormsg());
-               goto error;
+       status = lttng_live_recv(viewer_connection, &list, sizeof(list));
+       if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
+               viewer_handle_recv_status(self_comp, self_comp_class,
+                       status, "session list reply");
+               goto end;
        }
-       BT_ASSERT(ret_len == sizeof(list));
 
        sessions_count = be32toh(list.sessions_count);
        for (i = 0; i < sessions_count; i++) {
-               ret_len = lttng_live_recv(viewer_connection,
-                               &lsession, sizeof(lsession));
-               if (ret_len == 0) {
-                       BT_COMP_LOGE_APPEND_CAUSE(self_comp,
-                               "Remote side has closed connection");
-                       goto error;
-               }
-               if (ret_len == BT_SOCKET_ERROR) {
-                       BT_COMP_LOGE_APPEND_CAUSE(self_comp,
-                               "Error receiving session: %s",
-                               bt_socket_errormsg());
-                       goto error;
+               status = lttng_live_recv(viewer_connection, &lsession,
+                       sizeof(lsession));
+               if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
+                       viewer_handle_recv_status(self_comp, self_comp_class,
+                               status, "session reply");
+                       goto end;
                }
-               BT_ASSERT(ret_len == sizeof(lsession));
                lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0';
                lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
                session_id = be64toh(lsession.id);
@@ -745,80 +884,78 @@ int lttng_live_query_session_ids(struct lttng_live_msg_iter *lttng_live_msg_iter
                                        lsession.session_name)) {
                                BT_COMP_LOGE_APPEND_CAUSE(self_comp,
                                        "Failed to add live session");
-                               goto error;
+                               status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
+                               goto end;
                        }
                }
        }
 
-       return 0;
+       status = LTTNG_LIVE_VIEWER_STATUS_OK;
 
-error:
-       return -1;
+end:
+       return status;
 }
 
 BT_HIDDEN
-int lttng_live_create_viewer_session(
+enum lttng_live_viewer_status lttng_live_create_viewer_session(
                struct lttng_live_msg_iter *lttng_live_msg_iter)
 {
        struct lttng_viewer_cmd cmd;
        struct lttng_viewer_create_session_response resp;
-       ssize_t ret_len;
+       enum lttng_live_viewer_status status;
        struct live_viewer_connection *viewer_connection =
                lttng_live_msg_iter->viewer_connection;
        bt_self_component *self_comp = viewer_connection->self_comp;
+       bt_self_component_class *self_comp_class =
+               viewer_connection->self_comp_class;
 
        cmd.cmd = htobe32(LTTNG_VIEWER_CREATE_SESSION);
        cmd.data_size = htobe64((uint64_t) 0);
        cmd.cmd_version = htobe32(0);
 
-       ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
-       if (ret_len == BT_SOCKET_ERROR) {
-               BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error sending cmd: %s",
-                       bt_socket_errormsg());
-               goto error;
+       status = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
+       if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
+               viewer_handle_send_status(self_comp, self_comp_class,
+                       status, "create session command");
+               goto end;
        }
-       BT_ASSERT(ret_len == sizeof(cmd));
 
-       ret_len = lttng_live_recv(viewer_connection, &resp, sizeof(resp));
-       if (ret_len == 0) {
-               BT_COMP_LOGE_APPEND_CAUSE(self_comp,
-                       "Remote side has closed connection");
-               goto error;
-       }
-       if (ret_len == BT_SOCKET_ERROR) {
-               BT_COMP_LOGE_APPEND_CAUSE(self_comp,
-                       "Error receiving create session reply: %s",
-                       bt_socket_errormsg());
-               goto error;
+       status = lttng_live_recv(viewer_connection, &resp, sizeof(resp));
+       if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
+               viewer_handle_recv_status(self_comp, self_comp_class,
+                       status, "create session reply");
+               goto end;
        }
-       BT_ASSERT(ret_len == sizeof(resp));
 
        if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) {
                BT_COMP_LOGE_APPEND_CAUSE(self_comp,
                        "Error creating viewer session");
-               goto error;
+               status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
+               goto end;
        }
-       if (lttng_live_query_session_ids(lttng_live_msg_iter)) {
+
+       status = lttng_live_query_session_ids(lttng_live_msg_iter);
+       if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
                BT_COMP_LOGE_APPEND_CAUSE(self_comp,
                        "Failed to query live viewer session ids");
-               goto error;
+               goto end;
+       } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
+               goto end;
        }
 
-       return 0;
-
-error:
-       return -1;
+end:
+       return status;
 }
 
 static
-int receive_streams(struct lttng_live_session *session,
+enum lttng_live_viewer_status receive_streams(struct lttng_live_session *session,
                uint32_t stream_count,
                bt_self_message_iterator *self_msg_iter)
 {
-       ssize_t ret_len;
        uint32_t i;
        struct lttng_live_msg_iter *lttng_live_msg_iter =
                session->lttng_live_msg_iter;
+       enum lttng_live_viewer_status status;
        struct live_viewer_connection *viewer_connection =
                lttng_live_msg_iter->viewer_connection;
        bt_self_component *self_comp = viewer_connection->self_comp;
@@ -830,18 +967,13 @@ int receive_streams(struct lttng_live_session *session,
                uint64_t stream_id;
                uint64_t ctf_trace_id;
 
-               ret_len = lttng_live_recv(viewer_connection, &stream, sizeof(stream));
-               if (ret_len == 0) {
-                       BT_COMP_LOGE_APPEND_CAUSE(self_comp,
-                               "Remote side has closed connection");
-                       goto error;
-               }
-               if (ret_len == BT_SOCKET_ERROR) {
-                       BT_COMP_LOGE_APPEND_CAUSE(self_comp,
-                               "Error receiving stream");
-                       goto error;
+               status = lttng_live_recv(viewer_connection, &stream,
+                       sizeof(stream));
+               if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
+                       viewer_handle_recv_status(self_comp, NULL,
+                               status, "stream reply");
+                       goto end;
                }
-               BT_ASSERT(ret_len == sizeof(stream));
                stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0';
                stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0';
                stream_id = be64toh(stream.id);
@@ -855,8 +987,8 @@ int receive_streams(struct lttng_live_session *session,
                                        stream.path_name)) {
                                BT_COMP_LOGE_APPEND_CAUSE(self_comp,
                                        "Error creating metadata stream");
-
-                               goto error;
+                               status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
+                               goto end;
                        }
                        session->lazy_stream_msg_init = true;
                } else {
@@ -867,23 +999,24 @@ int receive_streams(struct lttng_live_session *session,
                        if (!live_stream) {
                                BT_COMP_LOGE_APPEND_CAUSE(self_comp,
                                        "Error creating stream");
-                               goto error;
+                               status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
+                               goto end;
                        }
                }
        }
-       return 0;
+       status = LTTNG_LIVE_VIEWER_STATUS_OK;
 
-error:
-       return -1;
+end:
+       return status;
 }
 
 BT_HIDDEN
-enum lttng_live_attach_session_status lttng_live_attach_session(
+enum lttng_live_viewer_status lttng_live_attach_session(
                struct lttng_live_session *session,
                bt_self_message_iterator *self_msg_iter)
 {
        struct lttng_viewer_cmd cmd;
-       enum lttng_live_attach_session_status attach_status;
+       enum lttng_live_viewer_status status;
        struct lttng_viewer_attach_session_request rq;
        struct lttng_viewer_attach_session_response rp;
        struct lttng_live_msg_iter *lttng_live_msg_iter =
@@ -895,7 +1028,6 @@ enum lttng_live_attach_session_status lttng_live_attach_session(
        uint32_t streams_count;
        const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
        char cmd_buf[cmd_buf_len];
-       ssize_t ret_len;
 
        cmd.cmd = htobe32(LTTNG_VIEWER_ATTACH_SESSION);
        cmd.data_size = htobe64((uint64_t) sizeof(rq));
@@ -914,28 +1046,19 @@ enum lttng_live_attach_session_status lttng_live_attach_session(
         */
        memcpy(cmd_buf, &cmd, sizeof(cmd));
        memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
-       ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
-       if (ret_len == BT_SOCKET_ERROR) {
-               BT_COMP_LOGE_APPEND_CAUSE(self_comp,
-                       "Error sending attach request: %s",
-                       bt_socket_errormsg());
-               goto error;
+       status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
+       if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
+               viewer_handle_send_status(self_comp, NULL,
+                       status, "attach session command");
+               goto end;
        }
 
-       BT_ASSERT(ret_len == cmd_buf_len);
-       ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
-       if (ret_len == 0) {
-               BT_COMP_LOGE_APPEND_CAUSE(self_comp,
-                       "Remote side has closed connection");
-               goto error;
-       }
-       if (ret_len == BT_SOCKET_ERROR) {
-               BT_COMP_LOGE_APPEND_CAUSE(self_comp,
-                       "Error receiving attach response: %s",
-                       bt_socket_errormsg());
-               goto error;
+       status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
+       if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
+               viewer_handle_recv_status(self_comp, NULL,
+                       status, "attach session reply");
+               goto end;
        }
-       BT_ASSERT(ret_len == sizeof(rp));
 
        streams_count = be32toh(rp.streams_count);
        switch(be32toh(rp.status)) {
@@ -944,51 +1067,60 @@ enum lttng_live_attach_session_status lttng_live_attach_session(
        case LTTNG_VIEWER_ATTACH_UNK:
                BT_COMP_LOGE_APPEND_CAUSE(self_comp,
                        "Session id %" PRIu64 " is unknown", session_id);
-               goto error;
+               status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
+               goto end;
        case LTTNG_VIEWER_ATTACH_ALREADY:
                BT_COMP_LOGE_APPEND_CAUSE(self_comp,
                        "There is already a viewer attached to this session");
-               goto error;
+               status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
+               goto end;
        case LTTNG_VIEWER_ATTACH_NOT_LIVE:
                BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Not a live session");
-               goto error;
+               status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
+               goto end;
        case LTTNG_VIEWER_ATTACH_SEEK_ERR:
                BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Wrong seek parameter");
-               goto error;
+               status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
+               goto end;
        default:
                BT_COMP_LOGE_APPEND_CAUSE(self_comp,
                        "Unknown attach return code %u", be32toh(rp.status));
-               goto error;
+               status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
+               goto end;
        }
 
        /* We receive the initial list of streams. */
-       if (receive_streams(session, streams_count, self_msg_iter)) {
+       status = receive_streams(session, streams_count, self_msg_iter);
+       switch (status) {
+       case LTTNG_LIVE_VIEWER_STATUS_OK:
+               break;
+       case LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED:
+               goto end;
+       case LTTNG_LIVE_VIEWER_STATUS_ERROR:
                BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error receiving streams");
-               goto error;
+               goto end;
+       default:
+               bt_common_abort();
        }
 
        session->attached = true;
        session->new_streams_needed = false;
 
-       attach_status = LTTNG_LIVE_ATTACH_SESSION_STATUS_OK;
-       goto end;
-
-error:
-       attach_status = LTTNG_LIVE_ATTACH_SESSION_STATUS_ERROR;
-
 end:
-       return attach_status;
+       return status;
 }
 
 BT_HIDDEN
-int lttng_live_detach_session(struct lttng_live_session *session)
+enum lttng_live_viewer_status lttng_live_detach_session(
+               struct lttng_live_session *session)
 {
        struct lttng_viewer_cmd cmd;
+       enum lttng_live_viewer_status status;
        struct lttng_viewer_detach_session_request rq;
        struct lttng_viewer_detach_session_response rp;
-       ssize_t ret_len;
        struct lttng_live_msg_iter *lttng_live_msg_iter =
                session->lttng_live_msg_iter;
+       bt_self_component *self_comp = session->self_comp;
        struct live_viewer_connection *viewer_connection =
                lttng_live_msg_iter->viewer_connection;
        uint64_t session_id = session->id;
@@ -1013,46 +1145,43 @@ int lttng_live_detach_session(struct lttng_live_session *session)
         */
        memcpy(cmd_buf, &cmd, sizeof(cmd));
        memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
-       ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
-       if (ret_len == BT_SOCKET_ERROR) {
-               BT_COMP_LOGE("Error sending detach request: %s",
-                       bt_socket_errormsg());
-               goto error;
+       status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
+       if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
+               viewer_handle_send_status(self_comp, NULL,
+                       status, "detach session command");
+               goto end;
        }
 
-       BT_ASSERT(ret_len == cmd_buf_len);
-       ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
-       if (ret_len == 0) {
-               BT_COMP_LOGE("Remote side has closed connection");
-               goto error;
-       }
-       if (ret_len == BT_SOCKET_ERROR) {
-               BT_COMP_LOGE("Error receiving detach response: %s",
-                       bt_socket_errormsg());
-               goto error;
+       status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
+       if (status != LTTNG_LIVE_VIEWER_STATUS_OK) {
+               viewer_handle_recv_status(self_comp, NULL,
+                       status, "detach session reply");
+               goto end;
        }
-       BT_ASSERT(ret_len == sizeof(rp));
 
        switch(be32toh(rp.status)) {
        case LTTNG_VIEWER_DETACH_SESSION_OK:
                break;
        case LTTNG_VIEWER_DETACH_SESSION_UNK:
                BT_COMP_LOGW("Session id %" PRIu64 " is unknown", session_id);
-               goto error;
+               status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
+               goto end;
        case LTTNG_VIEWER_DETACH_SESSION_ERR:
                BT_COMP_LOGW("Error detaching session id %" PRIu64 "", session_id);
-               goto error;
+               status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
+               goto end;
        default:
                BT_COMP_LOGE("Unknown detach return code %u", be32toh(rp.status));
-               goto error;
+               status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
+               goto end;
        }
 
        session->attached = false;
 
-       return 0;
+       status = LTTNG_LIVE_VIEWER_STATUS_OK;
 
-error:
-       return -1;
+end:
+       return status;
 }
 
 BT_HIDDEN
@@ -1060,7 +1189,8 @@ enum lttng_live_get_one_metadata_status lttng_live_get_one_metadata_packet(
                struct lttng_live_trace *trace, FILE *fp, size_t *reply_len)
 {
        uint64_t len = 0;
-       enum lttng_live_get_one_metadata_status metadata_status;
+       enum lttng_live_get_one_metadata_status status;
+       enum lttng_live_viewer_status viewer_status;
        struct lttng_viewer_cmd cmd;
        struct lttng_viewer_get_metadata rq;
        struct lttng_viewer_metadata_packet rp;
@@ -1088,28 +1218,21 @@ enum lttng_live_get_one_metadata_status lttng_live_get_one_metadata_packet(
         */
        memcpy(cmd_buf, &cmd, sizeof(cmd));
        memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
-       ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
-       if (ret_len == BT_SOCKET_ERROR) {
-               BT_COMP_LOGE_APPEND_CAUSE(self_comp,
-                       "Error sending get_metadata request: %s",
-                       bt_socket_errormsg());
-               goto error;
+       viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
+       if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
+               viewer_handle_send_status(self_comp, NULL,
+                       viewer_status, "get metadata command");
+               status = (enum lttng_live_get_one_metadata_status) viewer_status;
+               goto end;
        }
 
-       BT_ASSERT(ret_len == cmd_buf_len);
-       ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
-       if (ret_len == 0) {
-               BT_COMP_LOGE_APPEND_CAUSE(self_comp,
-                       "Remote side has closed connection");
-               goto error;
-       }
-       if (ret_len == BT_SOCKET_ERROR) {
-               BT_COMP_LOGE_APPEND_CAUSE(self_comp,
-                       "Error receiving get_metadata response: %s",
-                       bt_socket_errormsg());
-               goto error;
+       viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
+       if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
+               viewer_handle_recv_status(self_comp, NULL,
+                       viewer_status, "get metadata reply");
+               status = (enum lttng_live_get_one_metadata_status) viewer_status;
+               goto end;
        }
-       BT_ASSERT(ret_len == sizeof(rp));
 
        switch (be32toh(rp.status)) {
                case LTTNG_VIEWER_METADATA_OK:
@@ -1117,7 +1240,7 @@ enum lttng_live_get_one_metadata_status lttng_live_get_one_metadata_packet(
                        break;
                case LTTNG_VIEWER_NO_NEW_METADATA:
                        BT_COMP_LOGD("Received get_metadata response: no new");
-                       metadata_status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_END;
+                       status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_END;
                        goto end;
                case LTTNG_VIEWER_METADATA_ERR:
                        /*
@@ -1126,11 +1249,12 @@ enum lttng_live_get_one_metadata_status lttng_live_get_one_metadata_packet(
                         * in a per-pid session.
                         */
                        BT_COMP_LOGD("Received get_metadata response: error");
-                       metadata_status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_CLOSED;
+                       status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_CLOSED;
                        goto end;
                default:
                        BT_COMP_LOGE_APPEND_CAUSE(self_comp,
                                "Received get_metadata response: unknown");
+                       status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
                        goto error;
        }
 
@@ -1139,6 +1263,7 @@ enum lttng_live_get_one_metadata_status lttng_live_get_one_metadata_packet(
        if (len <= 0) {
                BT_COMP_LOGE_APPEND_CAUSE(self_comp,
                        "Erroneous response length");
+               status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
                goto error;
        }
 
@@ -1146,42 +1271,39 @@ enum lttng_live_get_one_metadata_status lttng_live_get_one_metadata_packet(
        if (!data) {
                BT_COMP_LOGE_APPEND_CAUSE_ERRNO(self_comp,
                        "Failed to allocate data buffer", ".");
+               status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
                goto error;
        }
-       ret_len = lttng_live_recv(viewer_connection, data, len);
-       if (ret_len == 0) {
-               BT_COMP_LOGI("Remote side has closed connection");
-               goto error_free_data;
-       }
-       if (ret_len == BT_SOCKET_ERROR) {
-               BT_COMP_LOGE_APPEND_CAUSE(self_comp,
-                       "Error receiving trace packet: %s", bt_socket_errormsg());
-               goto error_free_data;
+
+       viewer_status = lttng_live_recv(viewer_connection, data, len);
+       if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
+               viewer_handle_recv_status(self_comp, NULL,
+                       viewer_status, "get metadata packet");
+               status = (enum lttng_live_get_one_metadata_status) viewer_status;
+               goto error;
        }
-       BT_ASSERT(ret_len == len);
 
+       /*
+        * Write the metadata to the file handle.
+        */
        do {
                ret_len = fwrite(data, 1, len, fp);
        } while (ret_len < 0 && errno == EINTR);
        if (ret_len < 0) {
                BT_COMP_LOGE_APPEND_CAUSE(self_comp,
                        "Writing in the metadata file stream");
-               goto error_free_data;
+               status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
+               goto error;
        }
        BT_ASSERT(ret_len == len);
-       free(data);
        *reply_len = len;
-       metadata_status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_OK;
+       status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_OK;
 
-       goto end;
-
-error_free_data:
-       free(data);
 error:
-       metadata_status = LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR;
+       free(data);
 
 end:
-       return metadata_status;
+       return status;
 }
 
 /*
@@ -1210,16 +1332,16 @@ enum lttng_live_iterator_status lttng_live_get_next_index(
 {
        struct lttng_viewer_cmd cmd;
        struct lttng_viewer_get_next_index rq;
+       enum lttng_live_viewer_status viewer_status;
        struct lttng_viewer_index rp;
-       enum lttng_live_iterator_status retstatus = LTTNG_LIVE_ITERATOR_STATUS_OK;
+       enum lttng_live_iterator_status status;
        struct live_viewer_connection *viewer_connection =
                lttng_live_msg_iter->viewer_connection;
        bt_self_component *self_comp = viewer_connection->self_comp;
        struct lttng_live_trace *trace = stream->trace;
        const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
        char cmd_buf[cmd_buf_len];
-       uint32_t flags, status;
-       ssize_t ret_len;
+       uint32_t flags, rp_status;
 
        cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
        cmd.data_size = htobe64((uint64_t) sizeof(rq));
@@ -1236,33 +1358,24 @@ enum lttng_live_iterator_status lttng_live_get_next_index(
         */
        memcpy(cmd_buf, &cmd, sizeof(cmd));
        memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
-       ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
-       if (ret_len == BT_SOCKET_ERROR) {
-               BT_COMP_LOGE_APPEND_CAUSE(self_comp,
-                       "Error sending get_next_index request: %s",
-                       bt_socket_errormsg());
+       viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
+       if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
+               viewer_handle_send_status(self_comp, NULL,
+                       viewer_status, "get next index command");
                goto error;
        }
 
-       BT_ASSERT(ret_len == cmd_buf_len);
-       ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
-       if (ret_len == 0) {
-               BT_COMP_LOGE_APPEND_CAUSE(self_comp,
-                       "Remote side has closed connection");
-               goto error;
-       }
-       if (ret_len == BT_SOCKET_ERROR) {
-               BT_COMP_LOGE_APPEND_CAUSE(self_comp,
-                       "Error receiving get_next_index response: %s",
-                       bt_socket_errormsg());
+       viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
+       if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
+               viewer_handle_recv_status(self_comp, NULL,
+                       viewer_status, "get next index reply");
                goto error;
        }
-       BT_ASSERT(ret_len == sizeof(rp));
 
        flags = be32toh(rp.flags);
-       status = be32toh(rp.status);
+       rp_status = be32toh(rp.status);
 
-       switch (status) {
+       switch (rp_status) {
        case LTTNG_VIEWER_INDEX_INACTIVE:
        {
                uint64_t ctf_stream_class_id;
@@ -1279,6 +1392,7 @@ enum lttng_live_iterator_status lttng_live_get_next_index(
                        stream->ctf_stream_class_id = ctf_stream_class_id;
                }
                stream->state = LTTNG_LIVE_STREAM_QUIESCENT;
+               status = LTTNG_LIVE_ITERATOR_STATUS_OK;
                break;
        }
        case LTTNG_VIEWER_INDEX_OK:
@@ -1305,43 +1419,42 @@ enum lttng_live_iterator_status lttng_live_get_next_index(
                        BT_COMP_LOGD("Received get_next_index response: new streams needed");
                        lttng_live_need_new_streams(lttng_live_msg_iter);
                }
+               status = LTTNG_LIVE_ITERATOR_STATUS_OK;
                break;
        }
        case LTTNG_VIEWER_INDEX_RETRY:
                BT_COMP_LOGD("Received get_next_index response: retry");
                memset(index, 0, sizeof(struct packet_index));
-               retstatus = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
                stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA;
+               status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
                goto end;
        case LTTNG_VIEWER_INDEX_HUP:
                BT_COMP_LOGD("Received get_next_index response: stream hung up");
                memset(index, 0, sizeof(struct packet_index));
                index->offset = EOF;
-               retstatus = LTTNG_LIVE_ITERATOR_STATUS_END;
                stream->state = LTTNG_LIVE_STREAM_EOF;
                stream->has_stream_hung_up = true;
+               status = LTTNG_LIVE_ITERATOR_STATUS_END;
                break;
        case LTTNG_VIEWER_INDEX_ERR:
                BT_COMP_LOGD("Received get_next_index response: error");
                memset(index, 0, sizeof(struct packet_index));
                stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA;
-               goto error;
+               status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+               goto end;
        default:
                BT_COMP_LOGD("Received get_next_index response: unknown value");
                memset(index, 0, sizeof(struct packet_index));
                stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA;
-               goto error;
+               status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+               goto end;
        }
-end:
-       return retstatus;
+       goto end;
 
 error:
-       if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
-               retstatus = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
-       } else {
-               retstatus = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
-       }
-       return retstatus;
+       status = viewer_status_to_live_iterator_status(viewer_status);
+end:
+       return status;
 }
 
 BT_HIDDEN
@@ -1350,7 +1463,8 @@ enum ctf_msg_iter_medium_status lttng_live_get_stream_bytes(
                struct lttng_live_stream_iterator *stream, uint8_t *buf,
                uint64_t offset, uint64_t req_len, uint64_t *recv_len)
 {
-       enum ctf_msg_iter_medium_status retstatus = CTF_MSG_ITER_MEDIUM_STATUS_OK;
+       enum ctf_msg_iter_medium_status status;
+       enum lttng_live_viewer_status viewer_status;
        struct lttng_viewer_trace_packet rp;
        struct lttng_viewer_cmd cmd;
        struct lttng_viewer_get_packet rq;
@@ -1360,8 +1474,7 @@ enum ctf_msg_iter_medium_status lttng_live_get_stream_bytes(
        struct lttng_live_trace *trace = stream->trace;
        const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
        char cmd_buf[cmd_buf_len];
-       uint32_t flags, status;
-       ssize_t ret_len;
+       uint32_t flags, rp_status;
 
        BT_COMP_LOGD("lttng_live_get_stream_bytes: offset=%" PRIu64 ", req_len=%" PRIu64,
                        offset, req_len);
@@ -1381,46 +1494,34 @@ enum ctf_msg_iter_medium_status lttng_live_get_stream_bytes(
         */
        memcpy(cmd_buf, &cmd, sizeof(cmd));
        memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
-       ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
-       if (ret_len == BT_SOCKET_ERROR) {
-               BT_COMP_LOGE_APPEND_CAUSE(self_comp,
-                       "Error sending get_data_packet request: %s",
-                       bt_socket_errormsg());
+       viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
+       if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
+               viewer_handle_send_status(self_comp, NULL,
+                       viewer_status, "get data packet command");
                goto error;
        }
 
-       BT_ASSERT(ret_len == cmd_buf_len);
-       ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
-       if (ret_len == 0) {
-               BT_COMP_LOGE_APPEND_CAUSE(self_comp,
-                       "Remote side has closed connection");
-               goto error;
-       }
-       if (ret_len == BT_SOCKET_ERROR) {
-               BT_COMP_LOGE_APPEND_CAUSE(self_comp,
-                       "Socket error receiving get_data response: %s",
-                       bt_socket_errormsg());
-               goto error;
-       }
-       if (ret_len != sizeof(rp)) {
-               BT_COMP_LOGE_APPEND_CAUSE(self_comp, "get_data_packet: "
-                       "expected %zu, received %zd", sizeof(rp), ret_len);
+       viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
+       if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
+               viewer_handle_recv_status(self_comp, NULL,
+                       viewer_status, "get data packet reply");
                goto error;
        }
 
        flags = be32toh(rp.flags);
-       status = be32toh(rp.status);
+       rp_status = be32toh(rp.status);
 
-       switch (status) {
+       switch (rp_status) {
        case LTTNG_VIEWER_GET_PACKET_OK:
                req_len = be32toh(rp.len);
                BT_COMP_LOGD("Received get_data_packet response: Ok, "
                        "packet size : %" PRIu64 "", req_len);
+               status = CTF_MSG_ITER_MEDIUM_STATUS_OK;
                break;
        case LTTNG_VIEWER_GET_PACKET_RETRY:
                /* Unimplemented by relay daemon */
                BT_COMP_LOGD("Received get_data_packet response: retry");
-               retstatus = CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
+               status = CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
                goto end;
        case LTTNG_VIEWER_GET_PACKET_ERR:
                if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) {
@@ -1433,49 +1534,43 @@ enum ctf_msg_iter_medium_status lttng_live_get_stream_bytes(
                }
                if (flags & (LTTNG_VIEWER_FLAG_NEW_METADATA
                                | LTTNG_VIEWER_FLAG_NEW_STREAM)) {
-                       retstatus = CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
+                       status = CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
                        goto end;
                }
                BT_COMP_LOGE_APPEND_CAUSE(self_comp,
                        "Received get_data_packet response: error");
-               goto error;
+               status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
+               goto end;
        case LTTNG_VIEWER_GET_PACKET_EOF:
-               retstatus = CTF_MSG_ITER_MEDIUM_STATUS_EOF;
+               status = CTF_MSG_ITER_MEDIUM_STATUS_EOF;
                goto end;
        default:
                BT_COMP_LOGE_APPEND_CAUSE(self_comp,
                        "Received get_data_packet response: unknown");
+               status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
                goto error;
        }
 
        if (req_len == 0) {
+               status = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
                goto error;
        }
 
-       ret_len = lttng_live_recv(viewer_connection, buf, req_len);
-       if (ret_len == 0) {
-               BT_COMP_LOGE_APPEND_CAUSE(self_comp,
-                       "Remote side has closed connection");
+       viewer_status = lttng_live_recv(viewer_connection, buf, req_len);
+       if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
+               viewer_handle_recv_status(self_comp, NULL,
+                       viewer_status, "get data packet");
                goto error;
        }
-       if (ret_len == BT_SOCKET_ERROR) {
-               BT_COMP_LOGE_APPEND_CAUSE(self_comp,
-                       "Error receiving trace packet: %s",
-                       bt_socket_errormsg());
-               goto error;
-       }
-       BT_ASSERT(ret_len == req_len);
-       *recv_len = ret_len;
-end:
-       return retstatus;
+       *recv_len = req_len;
 
+       status = CTF_MSG_ITER_MEDIUM_STATUS_OK;
+       goto end;
 error:
-       if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
-               retstatus = CTF_MSG_ITER_MEDIUM_STATUS_AGAIN;
-       } else {
-               retstatus = CTF_MSG_ITER_MEDIUM_STATUS_ERROR;
-       }
-       return retstatus;
+
+       status = viewer_status_to_ctf_msg_iter_medium_status(viewer_status);
+end:
+       return status;
 }
 
 /*
@@ -1493,16 +1588,17 @@ enum lttng_live_iterator_status lttng_live_get_new_streams(
        struct lttng_viewer_new_streams_response rp;
        struct lttng_live_msg_iter *lttng_live_msg_iter =
                session->lttng_live_msg_iter;
+       enum lttng_live_viewer_status viewer_status;
        struct live_viewer_connection *viewer_connection =
                lttng_live_msg_iter->viewer_connection;
        bt_self_component *self_comp = viewer_connection->self_comp;
        uint32_t streams_count;
        const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
        char cmd_buf[cmd_buf_len];
-       ssize_t ret_len;
 
        if (!session->new_streams_needed) {
-               return LTTNG_LIVE_ITERATOR_STATUS_OK;
+               status = LTTNG_LIVE_ITERATOR_STATUS_OK;
+               goto end;
        }
 
        cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS);
@@ -1519,27 +1615,21 @@ enum lttng_live_iterator_status lttng_live_get_new_streams(
         */
        memcpy(cmd_buf, &cmd, sizeof(cmd));
        memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
-       ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
-       if (ret_len == BT_SOCKET_ERROR) {
-               BT_COMP_LOGE_APPEND_CAUSE(self_comp,
-                       "Error sending get_new_streams request: %s",
-                       bt_socket_errormsg());
-               goto error;
+       viewer_status = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
+       if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
+               viewer_handle_send_status(self_comp, NULL,
+                       viewer_status, "get new streams command");
+               status = viewer_status_to_live_iterator_status(viewer_status);
+               goto end;
        }
 
-       BT_ASSERT(ret_len == cmd_buf_len);
-       ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
-       if (ret_len == 0) {
-               BT_COMP_LOGE_APPEND_CAUSE(self_comp,
-                       "Remote side has closed connection");
-               goto error;
-       }
-       if (ret_len == BT_SOCKET_ERROR) {
-               BT_COMP_LOGE_APPEND_CAUSE(self_comp,
-                       "Error receiving get_new_streams response");
-               goto error;
+       viewer_status = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
+       if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
+               viewer_handle_recv_status(self_comp, NULL,
+                       viewer_status, "get new streams reply");
+               status = viewer_status_to_live_iterator_status(viewer_status);
+               goto end;
        }
-       BT_ASSERT(ret_len == sizeof(rp));
 
        streams_count = be32toh(rp.streams_count);
 
@@ -1557,46 +1647,47 @@ enum lttng_live_iterator_status lttng_live_get_new_streams(
                goto end;
        case LTTNG_VIEWER_NEW_STREAMS_ERR:
                BT_COMP_LOGD("Received get_new_streams response: error");
-               goto error;
+               status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+               goto end;
        default:
                BT_COMP_LOGE_APPEND_CAUSE(self_comp,
                        "Received get_new_streams response: Unknown:"
                        "return code %u", be32toh(rp.status));
-               goto error;
+               status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+               goto end;
        }
 
-       if (receive_streams(session, streams_count, self_msg_iter)) {
-               BT_COMP_LOGE_APPEND_CAUSE(self_comp, "Error receiving streams");
-               goto error;
+       viewer_status = receive_streams(session, streams_count, self_msg_iter);
+       if (viewer_status != LTTNG_LIVE_VIEWER_STATUS_OK) {
+               viewer_handle_recv_status(self_comp, NULL,
+                       viewer_status, "new streams");
+               status = viewer_status_to_live_iterator_status(viewer_status);
+               goto end;
        }
-end:
-       return status;
 
-error:
-       if (lttng_live_graph_is_canceled(lttng_live_msg_iter)) {
-               status = LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
-       } else {
-               status = LTTNG_LIVE_ITERATOR_STATUS_ERROR;
-               BT_COMP_LOGE("Error receiving streams.");
-       }
+       status = LTTNG_LIVE_ITERATOR_STATUS_OK;
+end:
        return status;
 }
 
 BT_HIDDEN
-struct live_viewer_connection *live_viewer_connection_create(
-               const char *url, bool in_query,
-               struct lttng_live_msg_iter *lttng_live_msg_iter,
+enum lttng_live_viewer_status live_viewer_connection_create(
                bt_self_component *self_comp,
                bt_self_component_class *self_comp_class,
-               bt_logging_level log_level)
+               bt_logging_level log_level,
+               const char *url, bool in_query,
+               struct lttng_live_msg_iter *lttng_live_msg_iter,
+               struct live_viewer_connection **viewer)
 {
        struct live_viewer_connection *viewer_connection;
+       enum lttng_live_viewer_status status;
 
        viewer_connection = g_new0(struct live_viewer_connection, 1);
 
        if (bt_socket_init(log_level) != 0) {
                BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
                        self_comp_class, "Failed to init socket");
+               status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
                goto error;
        }
 
@@ -1613,25 +1704,36 @@ struct live_viewer_connection *live_viewer_connection_create(
        if (!viewer_connection->url) {
                BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
                        self_comp_class, "Failed to allocate URL buffer");
+               status = LTTNG_LIVE_VIEWER_STATUS_ERROR;
                goto error;
        }
 
        BT_COMP_LOGI("Establishing connection to url \"%s\"...", url);
-       if (lttng_live_connect_viewer(viewer_connection)) {
+       status = lttng_live_connect_viewer(viewer_connection);
+       /*
+        * Only print error and append cause in case of error. not in case of
+        * interruption.
+        */
+       if (status == LTTNG_LIVE_VIEWER_STATUS_ERROR) {
                BT_COMP_OR_COMP_CLASS_LOGE_APPEND_CAUSE(self_comp,
                        self_comp_class, "Failed to establish connection: "
                        "url=\"%s\"", url);
                goto error;
+       } else if (status == LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED) {
+               goto error;
        }
        BT_COMP_LOGI("Connection to url \"%s\" is established", url);
-       return viewer_connection;
+
+       *viewer = viewer_connection;
+       status = LTTNG_LIVE_VIEWER_STATUS_OK;
+       goto end;
 
 error:
        if (viewer_connection) {
                live_viewer_connection_destroy(viewer_connection);
        }
-
-       return NULL;
+end:
+       return status;
 }
 
 BT_HIDDEN
index ca5883e840491bc7ae6b9a95442a37abb2056536..0a7aa91ad350750d376f2088e2bc26c16e5ecb4a 100644 (file)
 #define LTTNG_LIVE_MAJOR                       2
 #define LTTNG_LIVE_MINOR                       4
 
+enum lttng_live_viewer_status {
+       LTTNG_LIVE_VIEWER_STATUS_OK             = 0,
+       LTTNG_LIVE_VIEWER_STATUS_ERROR          = -1,
+       LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED    = -2,
+};
+
+enum lttng_live_get_one_metadata_status {
+       /* The end of the metadata stream was reached. */
+       LTTNG_LIVE_GET_ONE_METADATA_STATUS_END          = 1,
+       /* One metadata packet was received and written to file. */
+       LTTNG_LIVE_GET_ONE_METADATA_STATUS_OK           = LTTNG_LIVE_VIEWER_STATUS_OK,
+       /*
+        * A critical error occurred when contacting the relay or while
+        * handling its response.
+        */
+       LTTNG_LIVE_GET_ONE_METADATA_STATUS_ERROR        = LTTNG_LIVE_VIEWER_STATUS_ERROR,
+
+       LTTNG_LIVE_GET_ONE_METADATA_STATUS_INTERRUPTED  = LTTNG_LIVE_VIEWER_STATUS_INTERRUPTED,
+
+       /* The metadata stream was not found on the relay. */
+       LTTNG_LIVE_GET_ONE_METADATA_STATUS_CLOSED       = -3,
+};
+
 struct lttng_live_component;
 
 struct live_viewer_connection {
@@ -81,12 +104,13 @@ struct packet_index {
        uint64_t packet_seq_num;        /* packet sequence number */
 };
 
-struct live_viewer_connection * live_viewer_connection_create(
-               const char *url, bool in_query,
-               struct lttng_live_msg_iter *lttng_live_msg_iter,
+enum lttng_live_viewer_status live_viewer_connection_create(
                bt_self_component *self_comp,
                bt_self_component_class *self_comp_class,
-               bt_logging_level log_level);
+               bt_logging_level log_level,
+               const char *url, bool in_query,
+               struct lttng_live_msg_iter *lttng_live_msg_iter,
+               struct live_viewer_connection **viewer_connection);
 
 void live_viewer_connection_destroy(
                struct live_viewer_connection *conn);
This page took 0.054383 seconds and 4 git commands to generate.