X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=plugins%2Fctf%2Flttng-live%2Fviewer-connection.c;h=9f8823a3d6d7a9560a6895039b690c3c1d52ec11;hb=4c66436f61db5fcb293f79313f78b6affc83666a;hp=aab5c3958a6402890feaeb846e6178557ca8be88;hpb=5bd230f4f87cd49f82e418fcb1886c255f8acf14;p=babeltrace.git diff --git a/plugins/ctf/lttng-live/viewer-connection.c b/plugins/ctf/lttng-live/viewer-connection.c index aab5c395..9f8823a3 100644 --- a/plugins/ctf/lttng-live/viewer-connection.c +++ b/plugins/ctf/lttng-live/viewer-connection.c @@ -37,6 +37,7 @@ #include #include #include +#include #define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-VIEWER" @@ -46,10 +47,14 @@ #include "data-stream.h" #include "metadata.h" -static ssize_t lttng_live_recv(int fd, void *buf, size_t len) +static ssize_t lttng_live_recv(struct bt_live_viewer_connection *viewer_connection, + void *buf, size_t len) { ssize_t ret; size_t copied = 0, to_copy = len; + struct lttng_live_component *lttng_live = + viewer_connection->lttng_live; + int fd = viewer_connection->control_sock; do { ret = recv(fd, buf + copied, to_copy, 0); @@ -58,21 +63,40 @@ static ssize_t lttng_live_recv(int fd, void *buf, size_t len) copied += ret; to_copy -= ret; } - } while ((ret > 0 && to_copy > 0) - || (ret < 0 && errno == EINTR)); + if (ret < 0 && errno == EINTR) { + if (lttng_live && bt_graph_is_canceled(lttng_live->graph)) { + break; + } else { + continue; + } + } + } while (ret > 0 && to_copy > 0); if (ret > 0) ret = copied; /* ret = 0 means orderly shutdown, ret < 0 is error. */ return ret; } -static ssize_t lttng_live_send(int fd, const void *buf, size_t len) +static ssize_t lttng_live_send(struct bt_live_viewer_connection *viewer_connection, + const void *buf, size_t len) { + struct lttng_live_component *lttng_live = + viewer_connection->lttng_live; + int fd = viewer_connection->control_sock; ssize_t ret; - do { + for (;;) { ret = bt_send_nosigpipe(fd, buf, len); - } while (ret < 0 && errno == EINTR); + if (ret < 0 && errno == EINTR) { + if (lttng_live && bt_graph_is_canceled(lttng_live->graph)) { + break; + } else { + continue; + } + } else { + break; + } + } return ret; } @@ -147,21 +171,21 @@ static int lttng_live_handshake(struct bt_live_viewer_connection *viewer_connect connect.minor = htobe32(LTTNG_LIVE_MINOR); connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND); - ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); + ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); if (ret_len < 0) { BT_LOGE("Error sending cmd: %s", strerror(errno)); goto error; } assert(ret_len == sizeof(cmd)); - ret_len = lttng_live_send(viewer_connection->control_sock, &connect, sizeof(connect)); + ret_len = lttng_live_send(viewer_connection, &connect, sizeof(connect)); if (ret_len < 0) { BT_LOGE("Error sending version: %s", strerror(errno)); goto error; } assert(ret_len == sizeof(connect)); - ret_len = lttng_live_recv(viewer_connection->control_sock, &connect, sizeof(connect)); + ret_len = lttng_live_recv(viewer_connection, &connect, sizeof(connect)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; @@ -547,14 +571,14 @@ struct bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_c cmd.data_size = htobe64((uint64_t) 0); cmd.cmd_version = htobe32(0); - ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); + ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); if (ret_len < 0) { BT_LOGE("Error sending cmd: %s", strerror(errno)); goto error; } assert(ret_len == sizeof(cmd)); - ret_len = lttng_live_recv(viewer_connection->control_sock, &list, sizeof(list)); + ret_len = lttng_live_recv(viewer_connection, &list, sizeof(list)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; @@ -569,7 +593,7 @@ struct bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_c for (i = 0; i < sessions_count; i++) { struct lttng_viewer_session lsession; - ret_len = lttng_live_recv(viewer_connection->control_sock, + ret_len = lttng_live_recv(viewer_connection, &lsession, sizeof(lsession)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); @@ -611,14 +635,14 @@ int lttng_live_query_session_ids(struct lttng_live_component *lttng_live) cmd.data_size = htobe64((uint64_t) 0); cmd.cmd_version = htobe32(0); - ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); + ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); if (ret_len < 0) { BT_LOGE("Error sending cmd: %s", strerror(errno)); goto error; } assert(ret_len == sizeof(cmd)); - ret_len = lttng_live_recv(viewer_connection->control_sock, &list, sizeof(list)); + ret_len = lttng_live_recv(viewer_connection, &list, sizeof(list)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; @@ -631,7 +655,7 @@ int lttng_live_query_session_ids(struct lttng_live_component *lttng_live) sessions_count = be32toh(list.sessions_count); for (i = 0; i < sessions_count; i++) { - ret_len = lttng_live_recv(viewer_connection->control_sock, + ret_len = lttng_live_recv(viewer_connection, &lsession, sizeof(lsession)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); @@ -677,14 +701,14 @@ int lttng_live_create_viewer_session(struct lttng_live_component *lttng_live) cmd.data_size = htobe64((uint64_t) 0); cmd.cmd_version = htobe32(0); - ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); + ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); if (ret_len < 0) { BT_LOGE("Error sending cmd: %s", strerror(errno)); goto error; } assert(ret_len == sizeof(cmd)); - ret_len = lttng_live_recv(viewer_connection->control_sock, &resp, sizeof(resp)); + ret_len = lttng_live_recv(viewer_connection, &resp, sizeof(resp)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; @@ -726,7 +750,7 @@ int receive_streams(struct lttng_live_session *session, uint64_t stream_id; uint64_t ctf_trace_id; - ret_len = lttng_live_recv(viewer_connection->control_sock, &stream, sizeof(stream)); + ret_len = lttng_live_recv(viewer_connection, &stream, sizeof(stream)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; @@ -797,21 +821,21 @@ int lttng_live_attach_session(struct lttng_live_session *session) // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING); rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST); - ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); + ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); if (ret_len < 0) { BT_LOGE("Error sending cmd: %s", strerror(errno)); goto error; } assert(ret_len == sizeof(cmd)); - ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq)); + ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq)); if (ret_len < 0) { BT_LOGE("Error sending attach request: %s", strerror(errno)); goto error; } assert(ret_len == sizeof(rq)); - ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp)); + ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; @@ -880,21 +904,21 @@ int lttng_live_detach_session(struct lttng_live_session *session) memset(&rq, 0, sizeof(rq)); rq.session_id = htobe64(session_id); - ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); + ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); if (ret_len < 0) { BT_LOGE("Error sending cmd: %s", strerror(errno)); goto error; } assert(ret_len == sizeof(cmd)); - ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq)); + ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq)); if (ret_len < 0) { BT_LOGE("Error sending detach request: %s", strerror(errno)); goto error; } assert(ret_len == sizeof(rq)); - ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp)); + ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; @@ -949,21 +973,21 @@ ssize_t lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, cmd.data_size = htobe64((uint64_t) sizeof(rq)); cmd.cmd_version = htobe32(0); - ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); + ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); if (ret_len < 0) { BT_LOGE("Error sending cmd: %s", strerror(errno)); goto error; } assert(ret_len == sizeof(cmd)); - ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq)); + ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq)); if (ret_len < 0) { BT_LOGE("Error sending get_metadata request: %s", strerror(errno)); goto error; } assert(ret_len == sizeof(rq)); - ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp)); + ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; @@ -1001,7 +1025,7 @@ ssize_t lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, BT_LOGE("relay data zmalloc: %s", strerror(errno)); goto error; } - ret_len = lttng_live_recv(viewer_connection->control_sock, data, len); + ret_len = lttng_live_recv(viewer_connection, data, len); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error_free_data; @@ -1072,21 +1096,21 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_li memset(&rq, 0, sizeof(rq)); rq.stream_id = htobe64(stream->viewer_stream_id); - ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); + ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); if (ret_len < 0) { BT_LOGE("Error sending cmd: %s", strerror(errno)); goto error; } assert(ret_len == sizeof(cmd)); - ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq)); + ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq)); if (ret_len < 0) { BT_LOGE("Error sending get_next_index request: %s", strerror(errno)); goto error; } assert(ret_len == sizeof(rq)); - ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp)); + ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; @@ -1175,7 +1199,11 @@ end: return retstatus; error: - retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR; + if (bt_graph_is_canceled(lttng_live->graph)) { + retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN; + } else { + retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR; + } return retstatus; } @@ -1205,21 +1233,21 @@ enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_li rq.offset = htobe64(offset); rq.len = htobe32(req_len); - ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); + ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); if (ret_len < 0) { BT_LOGE("Error sending cmd: %s", strerror(errno)); goto error; } assert(ret_len == sizeof(cmd)); - ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq)); + ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq)); if (ret_len < 0) { BT_LOGE("Error sending get_data request: %s", strerror(errno)); goto error; } assert(ret_len == sizeof(rq)); - ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp)); + ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; @@ -1276,7 +1304,7 @@ enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_li goto error; } - ret_len = lttng_live_recv(viewer_connection->control_sock, buf, req_len); + ret_len = lttng_live_recv(viewer_connection, buf, req_len); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; @@ -1291,7 +1319,11 @@ end: return retstatus; error: - retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_ERROR; + if (bt_graph_is_canceled(lttng_live->graph)) { + retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN; + } else { + retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_ERROR; + } return retstatus; } @@ -1324,21 +1356,21 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_get_new_streams( memset(&rq, 0, sizeof(rq)); rq.session_id = htobe64(session->id); - ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); + ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); if (ret_len < 0) { BT_LOGE("Error sending cmd: %s", strerror(errno)); goto error; } assert(ret_len == sizeof(cmd)); - ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq)); + ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq)); if (ret_len < 0) { BT_LOGE("Error sending get_new_streams request: %s", strerror(errno)); goto error; } assert(ret_len == sizeof(rq)); - ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp)); + ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; @@ -1378,13 +1410,18 @@ end: return status; error: - status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR; + if (bt_graph_is_canceled(lttng_live->graph)) { + status = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN; + } else { + status = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_ERROR; + } return status; } BT_HIDDEN struct bt_live_viewer_connection * - bt_live_viewer_connection_create(const char *url, FILE *error_fp) + bt_live_viewer_connection_create(const char *url, + struct lttng_live_component *lttng_live) { struct bt_live_viewer_connection *viewer_connection; @@ -1393,7 +1430,7 @@ struct bt_live_viewer_connection * bt_object_init(&viewer_connection->obj, connection_release); viewer_connection->control_sock = -1; viewer_connection->port = -1; - viewer_connection->error_fp = error_fp; + viewer_connection->lttng_live = lttng_live; viewer_connection->url = g_string_new(url); if (!viewer_connection->url) { goto error;