From 4c66436f61db5fcb293f79313f78b6affc83666a Mon Sep 17 00:00:00 2001 From: Mathieu Desnoyers Date: Tue, 16 May 2017 14:42:18 -0400 Subject: [PATCH] lttng-live: handle EINTR and graph cancelation MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Mathieu Desnoyers Signed-off-by: Jérémie Galarneau --- plugins/ctf/lttng-live/lttng-live-internal.h | 1 + plugins/ctf/lttng-live/lttng-live.c | 38 ++++-- plugins/ctf/lttng-live/metadata.c | 7 ++ plugins/ctf/lttng-live/viewer-connection.c | 123 ++++++++++++------- plugins/ctf/lttng-live/viewer-connection.h | 8 +- 5 files changed, 122 insertions(+), 55 deletions(-) diff --git a/plugins/ctf/lttng-live/lttng-live-internal.h b/plugins/ctf/lttng-live/lttng-live-internal.h index 6b2d71fb..fa540927 100644 --- a/plugins/ctf/lttng-live/lttng-live-internal.h +++ b/plugins/ctf/lttng-live/lttng-live-internal.h @@ -193,6 +193,7 @@ struct lttng_live_component { struct lttng_live_no_stream_iterator *no_stream_iter; struct bt_component *downstream_component; + struct bt_graph *graph; /* weak */ }; enum bt_ctf_lttng_live_iterator_status { diff --git a/plugins/ctf/lttng-live/lttng-live.c b/plugins/ctf/lttng-live/lttng-live.c index 095bef5b..0e64f7d9 100644 --- a/plugins/ctf/lttng-live/lttng-live.c +++ b/plugins/ctf/lttng-live/lttng-live.c @@ -40,6 +40,7 @@ #include #include #include +#include #include #include #include @@ -269,9 +270,11 @@ void lttng_live_destroy_session(struct lttng_live_session *session) BT_LOGI("Destroy session"); if (session->id != -1ULL) { if (lttng_live_detach_session(session)) { - /* Old relayd cannot detach sessions. */ - BT_LOGD("Unable to detach session %" PRIu64, - session->id); + if (!bt_graph_is_canceled(session->lttng_live->graph)) { + /* Old relayd cannot detach sessions. */ + BT_LOGD("Unable to detach session %" PRIu64, + session->id); + } } session->id = -1ULL; } @@ -401,7 +404,11 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_get_session( struct lttng_live_trace *trace, *t; if (lttng_live_attach_session(session)) { - return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR; + if (bt_graph_is_canceled(lttng_live->graph)) { + return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN; + } else { + return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR; + } } status = lttng_live_get_new_streams(session); if (status != BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK && @@ -911,7 +918,7 @@ struct bt_value *lttng_live_query_list_sessions(struct bt_component_class *comp_ goto error; } - viewer_connection = bt_live_viewer_connection_create(url, stderr); + viewer_connection = bt_live_viewer_connection_create(url, NULL); if (!viewer_connection) { ret = BT_COMPONENT_STATUS_NOMEM; goto error; @@ -984,6 +991,7 @@ struct lttng_live_component *lttng_live_component_create(struct bt_value *params struct bt_value *value = NULL; const char *url; enum bt_value_status ret; + struct bt_component *component; lttng_live = g_new0(struct lttng_live_component, 1); if (!lttng_live) { @@ -1007,18 +1015,30 @@ struct lttng_live_component *lttng_live_component_create(struct bt_value *params goto error; } lttng_live->viewer_connection = - bt_live_viewer_connection_create(lttng_live->url->str, - stderr); + bt_live_viewer_connection_create(lttng_live->url->str, lttng_live); if (!lttng_live->viewer_connection) { - ret = BT_COMPONENT_STATUS_NOMEM; + if (bt_graph_is_canceled(lttng_live->graph)) { + ret = BT_COMPONENT_STATUS_AGAIN; + } else { + ret = BT_COMPONENT_STATUS_NOMEM; + } goto error; } if (lttng_live_create_viewer_session(lttng_live)) { - ret = BT_COMPONENT_STATUS_ERROR; + if (bt_graph_is_canceled(lttng_live->graph)) { + ret = BT_COMPONENT_STATUS_AGAIN; + } else { + ret = BT_COMPONENT_STATUS_NOMEM; + } goto error; } lttng_live->private_component = private_component; + component = bt_component_from_private_component(private_component); + lttng_live->graph = bt_component_get_graph(component); + bt_put(lttng_live->graph); /* weak */ + bt_put(component); + goto end; error: diff --git a/plugins/ctf/lttng-live/metadata.c b/plugins/ctf/lttng-live/metadata.c index 3124ff6c..d3b7b095 100644 --- a/plugins/ctf/lttng-live/metadata.c +++ b/plugins/ctf/lttng-live/metadata.c @@ -30,6 +30,7 @@ #include #include #include +#include #define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-METADATA" @@ -155,6 +156,12 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_metadata_update( */ lttng_live_unref_trace(metadata->trace); } + if (errno == EINTR) { + if (bt_graph_is_canceled(session->lttng_live->graph)) { + status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN; + goto end; + } + } } if (bt_close_memstream(&metadata_buf, &size, fp)) { diff --git a/plugins/ctf/lttng-live/viewer-connection.c b/plugins/ctf/lttng-live/viewer-connection.c index aab5c395..9f8823a3 100644 --- a/plugins/ctf/lttng-live/viewer-connection.c +++ b/plugins/ctf/lttng-live/viewer-connection.c @@ -37,6 +37,7 @@ #include #include #include +#include #define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-VIEWER" @@ -46,10 +47,14 @@ #include "data-stream.h" #include "metadata.h" -static ssize_t lttng_live_recv(int fd, void *buf, size_t len) +static ssize_t lttng_live_recv(struct bt_live_viewer_connection *viewer_connection, + void *buf, size_t len) { ssize_t ret; size_t copied = 0, to_copy = len; + struct lttng_live_component *lttng_live = + viewer_connection->lttng_live; + int fd = viewer_connection->control_sock; do { ret = recv(fd, buf + copied, to_copy, 0); @@ -58,21 +63,40 @@ static ssize_t lttng_live_recv(int fd, void *buf, size_t len) copied += ret; to_copy -= ret; } - } while ((ret > 0 && to_copy > 0) - || (ret < 0 && errno == EINTR)); + if (ret < 0 && errno == EINTR) { + if (lttng_live && bt_graph_is_canceled(lttng_live->graph)) { + break; + } else { + continue; + } + } + } while (ret > 0 && to_copy > 0); if (ret > 0) ret = copied; /* ret = 0 means orderly shutdown, ret < 0 is error. */ return ret; } -static ssize_t lttng_live_send(int fd, const void *buf, size_t len) +static ssize_t lttng_live_send(struct bt_live_viewer_connection *viewer_connection, + const void *buf, size_t len) { + struct lttng_live_component *lttng_live = + viewer_connection->lttng_live; + int fd = viewer_connection->control_sock; ssize_t ret; - do { + for (;;) { ret = bt_send_nosigpipe(fd, buf, len); - } while (ret < 0 && errno == EINTR); + if (ret < 0 && errno == EINTR) { + if (lttng_live && bt_graph_is_canceled(lttng_live->graph)) { + break; + } else { + continue; + } + } else { + break; + } + } return ret; } @@ -147,21 +171,21 @@ static int lttng_live_handshake(struct bt_live_viewer_connection *viewer_connect connect.minor = htobe32(LTTNG_LIVE_MINOR); connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND); - ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); + ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); if (ret_len < 0) { BT_LOGE("Error sending cmd: %s", strerror(errno)); goto error; } assert(ret_len == sizeof(cmd)); - ret_len = lttng_live_send(viewer_connection->control_sock, &connect, sizeof(connect)); + ret_len = lttng_live_send(viewer_connection, &connect, sizeof(connect)); if (ret_len < 0) { BT_LOGE("Error sending version: %s", strerror(errno)); goto error; } assert(ret_len == sizeof(connect)); - ret_len = lttng_live_recv(viewer_connection->control_sock, &connect, sizeof(connect)); + ret_len = lttng_live_recv(viewer_connection, &connect, sizeof(connect)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; @@ -547,14 +571,14 @@ struct bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_c cmd.data_size = htobe64((uint64_t) 0); cmd.cmd_version = htobe32(0); - ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); + ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); if (ret_len < 0) { BT_LOGE("Error sending cmd: %s", strerror(errno)); goto error; } assert(ret_len == sizeof(cmd)); - ret_len = lttng_live_recv(viewer_connection->control_sock, &list, sizeof(list)); + ret_len = lttng_live_recv(viewer_connection, &list, sizeof(list)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; @@ -569,7 +593,7 @@ struct bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_c for (i = 0; i < sessions_count; i++) { struct lttng_viewer_session lsession; - ret_len = lttng_live_recv(viewer_connection->control_sock, + ret_len = lttng_live_recv(viewer_connection, &lsession, sizeof(lsession)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); @@ -611,14 +635,14 @@ int lttng_live_query_session_ids(struct lttng_live_component *lttng_live) cmd.data_size = htobe64((uint64_t) 0); cmd.cmd_version = htobe32(0); - ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); + ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); if (ret_len < 0) { BT_LOGE("Error sending cmd: %s", strerror(errno)); goto error; } assert(ret_len == sizeof(cmd)); - ret_len = lttng_live_recv(viewer_connection->control_sock, &list, sizeof(list)); + ret_len = lttng_live_recv(viewer_connection, &list, sizeof(list)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; @@ -631,7 +655,7 @@ int lttng_live_query_session_ids(struct lttng_live_component *lttng_live) sessions_count = be32toh(list.sessions_count); for (i = 0; i < sessions_count; i++) { - ret_len = lttng_live_recv(viewer_connection->control_sock, + ret_len = lttng_live_recv(viewer_connection, &lsession, sizeof(lsession)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); @@ -677,14 +701,14 @@ int lttng_live_create_viewer_session(struct lttng_live_component *lttng_live) cmd.data_size = htobe64((uint64_t) 0); cmd.cmd_version = htobe32(0); - ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); + ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); if (ret_len < 0) { BT_LOGE("Error sending cmd: %s", strerror(errno)); goto error; } assert(ret_len == sizeof(cmd)); - ret_len = lttng_live_recv(viewer_connection->control_sock, &resp, sizeof(resp)); + ret_len = lttng_live_recv(viewer_connection, &resp, sizeof(resp)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; @@ -726,7 +750,7 @@ int receive_streams(struct lttng_live_session *session, uint64_t stream_id; uint64_t ctf_trace_id; - ret_len = lttng_live_recv(viewer_connection->control_sock, &stream, sizeof(stream)); + ret_len = lttng_live_recv(viewer_connection, &stream, sizeof(stream)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; @@ -797,21 +821,21 @@ int lttng_live_attach_session(struct lttng_live_session *session) // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING); rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST); - ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); + ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); if (ret_len < 0) { BT_LOGE("Error sending cmd: %s", strerror(errno)); goto error; } assert(ret_len == sizeof(cmd)); - ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq)); + ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq)); if (ret_len < 0) { BT_LOGE("Error sending attach request: %s", strerror(errno)); goto error; } assert(ret_len == sizeof(rq)); - ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp)); + ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; @@ -880,21 +904,21 @@ int lttng_live_detach_session(struct lttng_live_session *session) memset(&rq, 0, sizeof(rq)); rq.session_id = htobe64(session_id); - ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); + ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); if (ret_len < 0) { BT_LOGE("Error sending cmd: %s", strerror(errno)); goto error; } assert(ret_len == sizeof(cmd)); - ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq)); + ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq)); if (ret_len < 0) { BT_LOGE("Error sending detach request: %s", strerror(errno)); goto error; } assert(ret_len == sizeof(rq)); - ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp)); + ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; @@ -949,21 +973,21 @@ ssize_t lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, cmd.data_size = htobe64((uint64_t) sizeof(rq)); cmd.cmd_version = htobe32(0); - ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); + ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); if (ret_len < 0) { BT_LOGE("Error sending cmd: %s", strerror(errno)); goto error; } assert(ret_len == sizeof(cmd)); - ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq)); + ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq)); if (ret_len < 0) { BT_LOGE("Error sending get_metadata request: %s", strerror(errno)); goto error; } assert(ret_len == sizeof(rq)); - ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp)); + ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; @@ -1001,7 +1025,7 @@ ssize_t lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, BT_LOGE("relay data zmalloc: %s", strerror(errno)); goto error; } - ret_len = lttng_live_recv(viewer_connection->control_sock, data, len); + ret_len = lttng_live_recv(viewer_connection, data, len); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error_free_data; @@ -1072,21 +1096,21 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_li memset(&rq, 0, sizeof(rq)); rq.stream_id = htobe64(stream->viewer_stream_id); - ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); + ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); if (ret_len < 0) { BT_LOGE("Error sending cmd: %s", strerror(errno)); goto error; } assert(ret_len == sizeof(cmd)); - ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq)); + ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq)); if (ret_len < 0) { BT_LOGE("Error sending get_next_index request: %s", strerror(errno)); goto error; } assert(ret_len == sizeof(rq)); - ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp)); + ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; @@ -1175,7 +1199,11 @@ end: return retstatus; error: - retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR; + if (bt_graph_is_canceled(lttng_live->graph)) { + retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN; + } else { + retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR; + } return retstatus; } @@ -1205,21 +1233,21 @@ enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_li rq.offset = htobe64(offset); rq.len = htobe32(req_len); - ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); + ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); if (ret_len < 0) { BT_LOGE("Error sending cmd: %s", strerror(errno)); goto error; } assert(ret_len == sizeof(cmd)); - ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq)); + ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq)); if (ret_len < 0) { BT_LOGE("Error sending get_data request: %s", strerror(errno)); goto error; } assert(ret_len == sizeof(rq)); - ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp)); + ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; @@ -1276,7 +1304,7 @@ enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_li goto error; } - ret_len = lttng_live_recv(viewer_connection->control_sock, buf, req_len); + ret_len = lttng_live_recv(viewer_connection, buf, req_len); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; @@ -1291,7 +1319,11 @@ end: return retstatus; error: - retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_ERROR; + if (bt_graph_is_canceled(lttng_live->graph)) { + retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN; + } else { + retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_ERROR; + } return retstatus; } @@ -1324,21 +1356,21 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_get_new_streams( memset(&rq, 0, sizeof(rq)); rq.session_id = htobe64(session->id); - ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); + ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); if (ret_len < 0) { BT_LOGE("Error sending cmd: %s", strerror(errno)); goto error; } assert(ret_len == sizeof(cmd)); - ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq)); + ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq)); if (ret_len < 0) { BT_LOGE("Error sending get_new_streams request: %s", strerror(errno)); goto error; } assert(ret_len == sizeof(rq)); - ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp)); + ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; @@ -1378,13 +1410,18 @@ end: return status; error: - status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR; + if (bt_graph_is_canceled(lttng_live->graph)) { + status = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN; + } else { + status = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_ERROR; + } return status; } BT_HIDDEN struct bt_live_viewer_connection * - bt_live_viewer_connection_create(const char *url, FILE *error_fp) + bt_live_viewer_connection_create(const char *url, + struct lttng_live_component *lttng_live) { struct bt_live_viewer_connection *viewer_connection; @@ -1393,7 +1430,7 @@ struct bt_live_viewer_connection * bt_object_init(&viewer_connection->obj, connection_release); viewer_connection->control_sock = -1; viewer_connection->port = -1; - viewer_connection->error_fp = error_fp; + viewer_connection->lttng_live = lttng_live; viewer_connection->url = g_string_new(url); if (!viewer_connection->url) { goto error; diff --git a/plugins/ctf/lttng-live/viewer-connection.h b/plugins/ctf/lttng-live/viewer-connection.h index 822c17ed..db9554d7 100644 --- a/plugins/ctf/lttng-live/viewer-connection.h +++ b/plugins/ctf/lttng-live/viewer-connection.h @@ -37,11 +37,11 @@ #define LTTNG_LIVE_MAJOR 2 #define LTTNG_LIVE_MINOR 4 +struct lttng_live_component; + struct bt_live_viewer_connection { struct bt_object obj; - FILE *error_fp; - GString *url; GString *relay_hostname; @@ -53,6 +53,8 @@ struct bt_live_viewer_connection { int32_t major; int32_t minor; + + struct lttng_live_component *lttng_live; }; struct packet_index_time { @@ -75,7 +77,7 @@ struct packet_index { }; struct bt_live_viewer_connection * - bt_live_viewer_connection_create(const char *url, FILE *error_fp); + bt_live_viewer_connection_create(const char *url, struct lttng_live_component *lttng_live); void bt_live_viewer_connection_destroy(struct bt_live_viewer_connection *conn); -- 2.34.1