lttng-live: handle EINTR and graph cancelation
[babeltrace.git] / plugins / ctf / lttng-live / viewer-connection.c
index aab5c3958a6402890feaeb846e6178557ca8be88..9f8823a3d6d7a9560a6895039b690c3c1d52ec11 100644 (file)
@@ -37,6 +37,7 @@
 #include <babeltrace/compat/send-internal.h>
 #include <babeltrace/compiler-internal.h>
 #include <babeltrace/common-internal.h>
+#include <babeltrace/graph/graph.h>
 
 #define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-VIEWER"
 
 #include "data-stream.h"
 #include "metadata.h"
 
-static ssize_t lttng_live_recv(int fd, void *buf, size_t len)
+static ssize_t lttng_live_recv(struct bt_live_viewer_connection *viewer_connection,
+               void *buf, size_t len)
 {
        ssize_t ret;
        size_t copied = 0, to_copy = len;
+       struct lttng_live_component *lttng_live =
+                       viewer_connection->lttng_live;
+       int fd = viewer_connection->control_sock;
 
        do {
                ret = recv(fd, buf + copied, to_copy, 0);
@@ -58,21 +63,40 @@ static ssize_t lttng_live_recv(int fd, void *buf, size_t len)
                        copied += ret;
                        to_copy -= ret;
                }
-       } while ((ret > 0 && to_copy > 0)
-               || (ret < 0 && errno == EINTR));
+               if (ret < 0 && errno == EINTR) {
+                       if (lttng_live && bt_graph_is_canceled(lttng_live->graph)) {
+                               break;
+                       } else {
+                               continue;
+                       }
+               }
+       } while (ret > 0 && to_copy > 0);
        if (ret > 0)
                ret = copied;
        /* ret = 0 means orderly shutdown, ret < 0 is error. */
        return ret;
 }
 
-static ssize_t lttng_live_send(int fd, const void *buf, size_t len)
+static ssize_t lttng_live_send(struct bt_live_viewer_connection *viewer_connection,
+               const void *buf, size_t len)
 {
+       struct lttng_live_component *lttng_live =
+                       viewer_connection->lttng_live;
+       int fd = viewer_connection->control_sock;
        ssize_t ret;
 
-       do {
+       for (;;) {
                ret = bt_send_nosigpipe(fd, buf, len);
-       } while (ret < 0 && errno == EINTR);
+               if (ret < 0 && errno == EINTR) {
+                       if (lttng_live && bt_graph_is_canceled(lttng_live->graph)) {
+                               break;
+                       } else {
+                               continue;
+                       }
+               } else {
+                       break;
+               }
+       }
        return ret;
 }
 
@@ -147,21 +171,21 @@ static int lttng_live_handshake(struct bt_live_viewer_connection *viewer_connect
        connect.minor = htobe32(LTTNG_LIVE_MINOR);
        connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
 
-       ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
+       ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
        if (ret_len < 0) {
                BT_LOGE("Error sending cmd: %s", strerror(errno));
                goto error;
        }
        assert(ret_len == sizeof(cmd));
 
-       ret_len = lttng_live_send(viewer_connection->control_sock, &connect, sizeof(connect));
+       ret_len = lttng_live_send(viewer_connection, &connect, sizeof(connect));
        if (ret_len < 0) {
                BT_LOGE("Error sending version: %s", strerror(errno));
                goto error;
        }
        assert(ret_len == sizeof(connect));
 
-       ret_len = lttng_live_recv(viewer_connection->control_sock, &connect, sizeof(connect));
+       ret_len = lttng_live_recv(viewer_connection, &connect, sizeof(connect));
        if (ret_len == 0) {
                BT_LOGI("Remote side has closed connection");
                goto error;
@@ -547,14 +571,14 @@ struct bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_c
        cmd.data_size = htobe64((uint64_t) 0);
        cmd.cmd_version = htobe32(0);
 
-       ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
+       ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
        if (ret_len < 0) {
                BT_LOGE("Error sending cmd: %s", strerror(errno));
                goto error;
        }
        assert(ret_len == sizeof(cmd));
 
-       ret_len = lttng_live_recv(viewer_connection->control_sock, &list, sizeof(list));
+       ret_len = lttng_live_recv(viewer_connection, &list, sizeof(list));
        if (ret_len == 0) {
                BT_LOGI("Remote side has closed connection");
                goto error;
@@ -569,7 +593,7 @@ struct bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_c
        for (i = 0; i < sessions_count; i++) {
                struct lttng_viewer_session lsession;
 
-               ret_len = lttng_live_recv(viewer_connection->control_sock,
+               ret_len = lttng_live_recv(viewer_connection,
                                &lsession, sizeof(lsession));
                if (ret_len == 0) {
                        BT_LOGI("Remote side has closed connection");
@@ -611,14 +635,14 @@ int lttng_live_query_session_ids(struct lttng_live_component *lttng_live)
        cmd.data_size = htobe64((uint64_t) 0);
        cmd.cmd_version = htobe32(0);
 
-       ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
+       ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
        if (ret_len < 0) {
                BT_LOGE("Error sending cmd: %s", strerror(errno));
                goto error;
        }
        assert(ret_len == sizeof(cmd));
 
-       ret_len = lttng_live_recv(viewer_connection->control_sock, &list, sizeof(list));
+       ret_len = lttng_live_recv(viewer_connection, &list, sizeof(list));
        if (ret_len == 0) {
                BT_LOGI("Remote side has closed connection");
                goto error;
@@ -631,7 +655,7 @@ int lttng_live_query_session_ids(struct lttng_live_component *lttng_live)
 
        sessions_count = be32toh(list.sessions_count);
        for (i = 0; i < sessions_count; i++) {
-               ret_len = lttng_live_recv(viewer_connection->control_sock,
+               ret_len = lttng_live_recv(viewer_connection,
                                &lsession, sizeof(lsession));
                if (ret_len == 0) {
                        BT_LOGI("Remote side has closed connection");
@@ -677,14 +701,14 @@ int lttng_live_create_viewer_session(struct lttng_live_component *lttng_live)
        cmd.data_size = htobe64((uint64_t) 0);
        cmd.cmd_version = htobe32(0);
 
-       ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
+       ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
        if (ret_len < 0) {
                BT_LOGE("Error sending cmd: %s", strerror(errno));
                goto error;
        }
        assert(ret_len == sizeof(cmd));
 
-       ret_len = lttng_live_recv(viewer_connection->control_sock, &resp, sizeof(resp));
+       ret_len = lttng_live_recv(viewer_connection, &resp, sizeof(resp));
        if (ret_len == 0) {
                BT_LOGI("Remote side has closed connection");
                goto error;
@@ -726,7 +750,7 @@ int receive_streams(struct lttng_live_session *session,
                uint64_t stream_id;
                uint64_t ctf_trace_id;
 
-               ret_len = lttng_live_recv(viewer_connection->control_sock, &stream, sizeof(stream));
+               ret_len = lttng_live_recv(viewer_connection, &stream, sizeof(stream));
                if (ret_len == 0) {
                        BT_LOGI("Remote side has closed connection");
                        goto error;
@@ -797,21 +821,21 @@ int lttng_live_attach_session(struct lttng_live_session *session)
        // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
        rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
 
-       ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
+       ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
        if (ret_len < 0) {
                BT_LOGE("Error sending cmd: %s", strerror(errno));
                goto error;
        }
        assert(ret_len == sizeof(cmd));
 
-       ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq));
+       ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
        if (ret_len < 0) {
                BT_LOGE("Error sending attach request: %s", strerror(errno));
                goto error;
        }
        assert(ret_len == sizeof(rq));
 
-       ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp));
+       ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
        if (ret_len == 0) {
                BT_LOGI("Remote side has closed connection");
                goto error;
@@ -880,21 +904,21 @@ int lttng_live_detach_session(struct lttng_live_session *session)
        memset(&rq, 0, sizeof(rq));
        rq.session_id = htobe64(session_id);
 
-       ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
+       ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
        if (ret_len < 0) {
                BT_LOGE("Error sending cmd: %s", strerror(errno));
                goto error;
        }
        assert(ret_len == sizeof(cmd));
 
-       ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq));
+       ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
        if (ret_len < 0) {
                BT_LOGE("Error sending detach request: %s", strerror(errno));
                goto error;
        }
        assert(ret_len == sizeof(rq));
 
-       ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp));
+       ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
        if (ret_len == 0) {
                BT_LOGI("Remote side has closed connection");
                goto error;
@@ -949,21 +973,21 @@ ssize_t lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace,
        cmd.data_size = htobe64((uint64_t) sizeof(rq));
        cmd.cmd_version = htobe32(0);
 
-       ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
+       ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
        if (ret_len < 0) {
                BT_LOGE("Error sending cmd: %s", strerror(errno));
                goto error;
        }
        assert(ret_len == sizeof(cmd));
 
-       ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq));
+       ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
        if (ret_len < 0) {
                BT_LOGE("Error sending get_metadata request: %s", strerror(errno));
                goto error;
        }
        assert(ret_len == sizeof(rq));
 
-       ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp));
+       ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
        if (ret_len == 0) {
                BT_LOGI("Remote side has closed connection");
                goto error;
@@ -1001,7 +1025,7 @@ ssize_t lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace,
                BT_LOGE("relay data zmalloc: %s", strerror(errno));
                goto error;
        }
-       ret_len = lttng_live_recv(viewer_connection->control_sock, data, len);
+       ret_len = lttng_live_recv(viewer_connection, data, len);
        if (ret_len == 0) {
                BT_LOGI("Remote side has closed connection");
                goto error_free_data;
@@ -1072,21 +1096,21 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_li
        memset(&rq, 0, sizeof(rq));
        rq.stream_id = htobe64(stream->viewer_stream_id);
 
-       ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
+       ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
        if (ret_len < 0) {
                BT_LOGE("Error sending cmd: %s", strerror(errno));
                goto error;
        }
        assert(ret_len == sizeof(cmd));
 
-       ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq));
+       ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
        if (ret_len < 0) {
                BT_LOGE("Error sending get_next_index request: %s", strerror(errno));
                goto error;
        }
        assert(ret_len == sizeof(rq));
 
-       ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp));
+       ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
        if (ret_len == 0) {
                BT_LOGI("Remote side has closed connection");
                goto error;
@@ -1175,7 +1199,11 @@ end:
        return retstatus;
 
 error:
-       retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+       if (bt_graph_is_canceled(lttng_live->graph)) {
+               retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN;
+       } else {
+               retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+       }
        return retstatus;
 }
 
@@ -1205,21 +1233,21 @@ enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_li
        rq.offset = htobe64(offset);
        rq.len = htobe32(req_len);
 
-       ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
+       ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
        if (ret_len < 0) {
                BT_LOGE("Error sending cmd: %s", strerror(errno));
                goto error;
        }
        assert(ret_len == sizeof(cmd));
 
-       ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq));
+       ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
        if (ret_len < 0) {
                BT_LOGE("Error sending get_data request: %s", strerror(errno));
                goto error;
        }
        assert(ret_len == sizeof(rq));
 
-       ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp));
+       ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
        if (ret_len == 0) {
                BT_LOGI("Remote side has closed connection");
                goto error;
@@ -1276,7 +1304,7 @@ enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_li
                goto error;
        }
 
-       ret_len = lttng_live_recv(viewer_connection->control_sock, buf, req_len);
+       ret_len = lttng_live_recv(viewer_connection, buf, req_len);
        if (ret_len == 0) {
                BT_LOGI("Remote side has closed connection");
                goto error;
@@ -1291,7 +1319,11 @@ end:
        return retstatus;
 
 error:
-       retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_ERROR;
+       if (bt_graph_is_canceled(lttng_live->graph)) {
+               retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN;
+       } else {
+               retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_ERROR;
+       }
        return retstatus;
 }
 
@@ -1324,21 +1356,21 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_get_new_streams(
        memset(&rq, 0, sizeof(rq));
        rq.session_id = htobe64(session->id);
 
-       ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd));
+       ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
        if (ret_len < 0) {
                BT_LOGE("Error sending cmd: %s", strerror(errno));
                goto error;
        }
        assert(ret_len == sizeof(cmd));
 
-       ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq));
+       ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
        if (ret_len < 0) {
                BT_LOGE("Error sending get_new_streams request: %s", strerror(errno));
                goto error;
        }
        assert(ret_len == sizeof(rq));
 
-       ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp));
+       ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
        if (ret_len == 0) {
                BT_LOGI("Remote side has closed connection");
                goto error;
@@ -1378,13 +1410,18 @@ end:
        return status;
 
 error:
-       status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR;
+       if (bt_graph_is_canceled(lttng_live->graph)) {
+               status = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN;
+       } else {
+               status = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_ERROR;
+       }
        return status;
 }
 
 BT_HIDDEN
 struct bt_live_viewer_connection *
-       bt_live_viewer_connection_create(const char *url, FILE *error_fp)
+       bt_live_viewer_connection_create(const char *url,
+                       struct lttng_live_component *lttng_live)
 {
        struct bt_live_viewer_connection *viewer_connection;
 
@@ -1393,7 +1430,7 @@ struct bt_live_viewer_connection *
        bt_object_init(&viewer_connection->obj, connection_release);
        viewer_connection->control_sock = -1;
        viewer_connection->port = -1;
-       viewer_connection->error_fp = error_fp;
+       viewer_connection->lttng_live = lttng_live;
        viewer_connection->url = g_string_new(url);
        if (!viewer_connection->url) {
                goto error;
This page took 0.029724 seconds and 4 git commands to generate.