X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=plugins%2Fctf%2Flttng-live%2Fviewer-connection.c;h=0492340c212c5158e7621fbd4d5c8d973699884c;hb=f6ccaed94e575af57fe6bf38154771bee4871a2a;hp=aab5c3958a6402890feaeb846e6178557ca8be88;hpb=94b828f31a1155cd07721926761e32cfc610bf47;p=babeltrace.git diff --git a/plugins/ctf/lttng-live/viewer-connection.c b/plugins/ctf/lttng-live/viewer-connection.c index aab5c395..0492340c 100644 --- a/plugins/ctf/lttng-live/viewer-connection.c +++ b/plugins/ctf/lttng-live/viewer-connection.c @@ -20,6 +20,9 @@ * SOFTWARE. */ +#define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-SRC-VIEWER" +#include "logging.h" + #include #include #include @@ -27,18 +30,14 @@ #include #include #include -#include #include -#include -#include #include -#include -#include +#include +#include #include #include - -#define BT_LOG_TAG "PLUGIN-CTF-LTTNG-LIVE-VIEWER" +#include #include "lttng-live-internal.h" #include "viewer-connection.h" @@ -46,33 +45,56 @@ #include "data-stream.h" #include "metadata.h" -static ssize_t lttng_live_recv(int fd, void *buf, size_t len) +static ssize_t lttng_live_recv(struct bt_live_viewer_connection *viewer_connection, + void *buf, size_t len) { ssize_t ret; size_t copied = 0, to_copy = len; + struct lttng_live_component *lttng_live = + viewer_connection->lttng_live; + BT_SOCKET sock = viewer_connection->control_sock; do { - ret = recv(fd, buf + copied, to_copy, 0); + ret = bt_socket_recv(sock, buf + copied, to_copy, 0); if (ret > 0) { - assert(ret <= to_copy); + BT_ASSERT(ret <= to_copy); copied += ret; to_copy -= ret; } - } while ((ret > 0 && to_copy > 0) - || (ret < 0 && errno == EINTR)); + if (ret == BT_SOCKET_ERROR && bt_socket_interrupted()) { + if (lttng_live_is_canceled(lttng_live)) { + break; + } else { + continue; + } + } + } while (ret > 0 && to_copy > 0); if (ret > 0) ret = copied; - /* ret = 0 means orderly shutdown, ret < 0 is error. */ + /* ret = 0 means orderly shutdown, ret == BT_SOCKET_ERROR is error. */ return ret; } -static ssize_t lttng_live_send(int fd, const void *buf, size_t len) +static ssize_t lttng_live_send(struct bt_live_viewer_connection *viewer_connection, + const void *buf, size_t len) { + struct lttng_live_component *lttng_live = + viewer_connection->lttng_live; + BT_SOCKET sock = viewer_connection->control_sock; ssize_t ret; - do { - ret = bt_send_nosigpipe(fd, buf, len); - } while (ret < 0 && errno == EINTR); + for (;;) { + ret = bt_socket_send_nosigpipe(sock, buf, len); + if (ret == BT_SOCKET_ERROR && bt_socket_interrupted()) { + if (lttng_live_is_canceled(lttng_live)) { + break; + } else { + continue; + } + } else { + break; + } + } return ret; } @@ -135,6 +157,8 @@ static int lttng_live_handshake(struct bt_live_viewer_connection *viewer_connect { struct lttng_viewer_cmd cmd; struct lttng_viewer_connect connect; + const size_t cmd_buf_len = sizeof(cmd) + sizeof(connect); + char cmd_buf[cmd_buf_len]; int ret; ssize_t ret_len; @@ -147,33 +171,34 @@ static int lttng_live_handshake(struct bt_live_viewer_connection *viewer_connect connect.minor = htobe32(LTTNG_LIVE_MINOR); connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND); - ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); - if (ret_len < 0) { - BT_LOGE("Error sending cmd: %s", strerror(errno)); + /* + * Merge the cmd and connection request to prevent a write-write + * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the + * second write to be performed quickly in presence of Nagle's algorithm + */ + memcpy(cmd_buf, &cmd, sizeof(cmd)); + memcpy(cmd_buf + sizeof(cmd), &connect, sizeof(connect)); + ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error sending version: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(cmd)); - ret_len = lttng_live_send(viewer_connection->control_sock, &connect, sizeof(connect)); - if (ret_len < 0) { - BT_LOGE("Error sending version: %s", strerror(errno)); - goto error; - } - assert(ret_len == sizeof(connect)); + BT_ASSERT(ret_len == cmd_buf_len); - ret_len = lttng_live_recv(viewer_connection->control_sock, &connect, sizeof(connect)); + ret_len = lttng_live_recv(viewer_connection, &connect, sizeof(connect)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving version: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving version: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(connect)); + BT_ASSERT(ret_len == sizeof(connect)); BT_LOGD("Received viewer session ID : %" PRIu64, - be64toh(connect.viewer_session_id)); + (uint64_t) be64toh(connect.viewer_session_id)); BT_LOGD("Relayd version : %u.%u", be32toh(connect.major), be32toh(connect.minor)); @@ -213,8 +238,8 @@ static int lttng_live_connect_viewer(struct bt_live_viewer_connection *viewer_co goto error; } - if ((viewer_connection->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) { - BT_LOGE("Socket creation failed: %s", strerror(errno)); + if ((viewer_connection->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == BT_INVALID_SOCKET) { + BT_LOGE("Socket creation failed: %s", bt_socket_errormsg()); goto error; } @@ -224,8 +249,8 @@ static int lttng_live_connect_viewer(struct bt_live_viewer_connection *viewer_co memset(&(server_addr.sin_zero), 0, 8); if (connect(viewer_connection->control_sock, (struct sockaddr *) &server_addr, - sizeof(struct sockaddr)) == -1) { - BT_LOGE("Connection failed: %s", strerror(errno)); + sizeof(struct sockaddr)) == BT_SOCKET_ERROR) { + BT_LOGE("Connection failed: %s", bt_socket_errormsg()); goto error; } if (lttng_live_handshake(viewer_connection)) { @@ -237,23 +262,23 @@ static int lttng_live_connect_viewer(struct bt_live_viewer_connection *viewer_co return ret; error: - if (viewer_connection->control_sock >= 0) { - if (close(viewer_connection->control_sock)) { - BT_LOGE("Close: %s", strerror(errno)); + if (viewer_connection->control_sock != BT_INVALID_SOCKET) { + if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) { + BT_LOGE("Close: %s", bt_socket_errormsg()); } } - viewer_connection->control_sock = -1; + viewer_connection->control_sock = BT_INVALID_SOCKET; return -1; } static void lttng_live_disconnect_viewer(struct bt_live_viewer_connection *viewer_connection) { - if (viewer_connection->control_sock < 0) { + if (viewer_connection->control_sock == BT_INVALID_SOCKET) { return; } - if (close(viewer_connection->control_sock)) { - BT_LOGE("Close: %s", strerror(errno)); - viewer_connection->control_sock = -1; + if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) { + BT_LOGE("Close: %s", bt_socket_errormsg()); + viewer_connection->control_sock = BT_INVALID_SOCKET; } } @@ -547,39 +572,39 @@ struct bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_c cmd.data_size = htobe64((uint64_t) 0); cmd.cmd_version = htobe32(0); - ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); - if (ret_len < 0) { - BT_LOGE("Error sending cmd: %s", strerror(errno)); + ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error sending cmd: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(cmd)); + BT_ASSERT(ret_len == sizeof(cmd)); - ret_len = lttng_live_recv(viewer_connection->control_sock, &list, sizeof(list)); + ret_len = lttng_live_recv(viewer_connection, &list, sizeof(list)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving session list: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving session list: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(list)); + BT_ASSERT(ret_len == sizeof(list)); sessions_count = be32toh(list.sessions_count); for (i = 0; i < sessions_count; i++) { struct lttng_viewer_session lsession; - ret_len = lttng_live_recv(viewer_connection->control_sock, + ret_len = lttng_live_recv(viewer_connection, &lsession, sizeof(lsession)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving session: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving session: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(lsession)); + BT_ASSERT(ret_len == sizeof(lsession)); lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0'; lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0'; if (list_append_session(results, @@ -611,47 +636,52 @@ int lttng_live_query_session_ids(struct lttng_live_component *lttng_live) cmd.data_size = htobe64((uint64_t) 0); cmd.cmd_version = htobe32(0); - ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); - if (ret_len < 0) { - BT_LOGE("Error sending cmd: %s", strerror(errno)); + ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error sending cmd: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(cmd)); + BT_ASSERT(ret_len == sizeof(cmd)); - ret_len = lttng_live_recv(viewer_connection->control_sock, &list, sizeof(list)); + ret_len = lttng_live_recv(viewer_connection, &list, sizeof(list)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving session list: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving session list: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(list)); + BT_ASSERT(ret_len == sizeof(list)); sessions_count = be32toh(list.sessions_count); for (i = 0; i < sessions_count; i++) { - ret_len = lttng_live_recv(viewer_connection->control_sock, + ret_len = lttng_live_recv(viewer_connection, &lsession, sizeof(lsession)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving session: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving session: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(lsession)); + BT_ASSERT(ret_len == sizeof(lsession)); lsession.hostname[LTTNG_VIEWER_HOST_NAME_MAX - 1] = '\0'; lsession.session_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0'; session_id = be64toh(lsession.id); + BT_LOGD("Adding session %" PRIu64 " hostname: %s session_name: %s", + session_id, lsession.hostname, lsession.session_name); + if ((strncmp(lsession.session_name, viewer_connection->session_name->str, - MAXNAMLEN) == 0) && (strncmp(lsession.hostname, + LTTNG_VIEWER_NAME_MAX) == 0) && (strncmp(lsession.hostname, viewer_connection->target_hostname->str, - MAXNAMLEN) == 0)) { - if (lttng_live_add_session(lttng_live, session_id)) { + LTTNG_VIEWER_HOST_NAME_MAX) == 0)) { + if (lttng_live_add_session(lttng_live, session_id, + lsession.hostname, + lsession.session_name)) { goto error; } } @@ -677,23 +707,23 @@ int lttng_live_create_viewer_session(struct lttng_live_component *lttng_live) cmd.data_size = htobe64((uint64_t) 0); cmd.cmd_version = htobe32(0); - ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); - if (ret_len < 0) { - BT_LOGE("Error sending cmd: %s", strerror(errno)); + ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error sending cmd: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(cmd)); + BT_ASSERT(ret_len == sizeof(cmd)); - ret_len = lttng_live_recv(viewer_connection->control_sock, &resp, sizeof(resp)); + ret_len = lttng_live_recv(viewer_connection, &resp, sizeof(resp)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving create session reply: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving create session reply: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(resp)); + BT_ASSERT(ret_len == sizeof(resp)); if (be32toh(resp.status) != LTTNG_VIEWER_CREATE_SESSION_OK) { BT_LOGE("Error creating viewer session"); @@ -726,16 +756,16 @@ int receive_streams(struct lttng_live_session *session, uint64_t stream_id; uint64_t ctf_trace_id; - ret_len = lttng_live_recv(viewer_connection->control_sock, &stream, sizeof(stream)); + ret_len = lttng_live_recv(viewer_connection, &stream, sizeof(stream)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { + if (ret_len == BT_SOCKET_ERROR) { BT_LOGE("Error receiving stream"); goto error; } - assert(ret_len == sizeof(stream)); + BT_ASSERT(ret_len == sizeof(stream)); stream.path_name[LTTNG_VIEWER_PATH_MAX - 1] = '\0'; stream.channel_name[LTTNG_VIEWER_NAME_MAX - 1] = '\0'; stream_id = be64toh(stream.id); @@ -746,7 +776,8 @@ int receive_streams(struct lttng_live_session *session, stream_id, stream.path_name, stream.channel_name); if (lttng_live_metadata_create_stream(session, - ctf_trace_id, stream_id)) { + ctf_trace_id, stream_id, + stream.path_name)) { BT_LOGE("Error creating metadata stream"); goto error; @@ -782,6 +813,8 @@ int lttng_live_attach_session(struct lttng_live_session *session) lttng_live->viewer_connection; uint64_t session_id = session->id; uint32_t streams_count; + const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); + char cmd_buf[cmd_buf_len]; if (session->attached) { return 0; @@ -797,30 +830,30 @@ int lttng_live_attach_session(struct lttng_live_session *session) // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING); rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST); - ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); - if (ret_len < 0) { - BT_LOGE("Error sending cmd: %s", strerror(errno)); - goto error; - } - assert(ret_len == sizeof(cmd)); - - ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq)); - if (ret_len < 0) { - BT_LOGE("Error sending attach request: %s", strerror(errno)); + /* + * Merge the cmd and connection request to prevent a write-write + * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the + * second write to be performed quickly in presence of Nagle's algorithm. + */ + memcpy(cmd_buf, &cmd, sizeof(cmd)); + memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); + ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error sending attach request: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(rq)); - ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp)); + BT_ASSERT(ret_len == cmd_buf_len); + ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving attach response: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving attach response: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(rp)); + BT_ASSERT(ret_len == sizeof(rp)); streams_count = be32toh(rp.streams_count); switch(be32toh(rp.status)) { @@ -868,6 +901,8 @@ int lttng_live_detach_session(struct lttng_live_session *session) struct bt_live_viewer_connection *viewer_connection = lttng_live->viewer_connection; uint64_t session_id = session->id; + const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); + char cmd_buf[cmd_buf_len]; if (!session->attached) { return 0; @@ -880,30 +915,30 @@ int lttng_live_detach_session(struct lttng_live_session *session) memset(&rq, 0, sizeof(rq)); rq.session_id = htobe64(session_id); - ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); - if (ret_len < 0) { - BT_LOGE("Error sending cmd: %s", strerror(errno)); - goto error; - } - assert(ret_len == sizeof(cmd)); - - ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq)); - if (ret_len < 0) { - BT_LOGE("Error sending detach request: %s", strerror(errno)); + /* + * Merge the cmd and connection request to prevent a write-write + * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the + * second write to be performed quickly in presence of Nagle's algorithm. + */ + memcpy(cmd_buf, &cmd, sizeof(cmd)); + memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); + ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error sending detach request: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(rq)); - ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp)); + BT_ASSERT(ret_len == cmd_buf_len); + ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving detach response: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving detach response: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(rp)); + BT_ASSERT(ret_len == sizeof(rp)); switch(be32toh(rp.status)) { case LTTNG_VIEWER_DETACH_SESSION_OK: @@ -943,36 +978,38 @@ ssize_t lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, struct lttng_live_metadata *metadata = trace->metadata; struct bt_live_viewer_connection *viewer_connection = lttng_live->viewer_connection; + const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); + char cmd_buf[cmd_buf_len]; rq.stream_id = htobe64(metadata->stream_id); cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA); cmd.data_size = htobe64((uint64_t) sizeof(rq)); cmd.cmd_version = htobe32(0); - ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); - if (ret_len < 0) { - BT_LOGE("Error sending cmd: %s", strerror(errno)); - goto error; - } - assert(ret_len == sizeof(cmd)); - - ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq)); - if (ret_len < 0) { - BT_LOGE("Error sending get_metadata request: %s", strerror(errno)); + /* + * Merge the cmd and connection request to prevent a write-write + * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the + * second write to be performed quickly in presence of Nagle's algorithm. + */ + memcpy(cmd_buf, &cmd, sizeof(cmd)); + memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); + ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error sending get_metadata request: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(rq)); - ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp)); + BT_ASSERT(ret_len == cmd_buf_len); + ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving get_metadata response: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving get_metadata response: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(rp)); + BT_ASSERT(ret_len == sizeof(rp)); switch (be32toh(rp.status)) { case LTTNG_VIEWER_METADATA_OK: @@ -1001,16 +1038,16 @@ ssize_t lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, BT_LOGE("relay data zmalloc: %s", strerror(errno)); goto error; } - ret_len = lttng_live_recv(viewer_connection->control_sock, data, len); + ret_len = lttng_live_recv(viewer_connection, data, len); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error_free_data; } - if (ret_len < 0) { - BT_LOGE("Error receiving trace packet: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving trace packet: %s", bt_socket_errormsg()); goto error_free_data; } - assert(ret_len == len); + BT_ASSERT(ret_len == len); do { ret_len = fwrite(data, 1, len, fp); @@ -1019,7 +1056,7 @@ ssize_t lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, BT_LOGE("Writing in the metadata fp"); goto error_free_data; } - assert(ret_len == len); + BT_ASSERT(ret_len == len); free(data); ret = len; end: @@ -1038,8 +1075,8 @@ static void lttng_index_to_packet_index(struct lttng_viewer_index *lindex, struct packet_index *pindex) { - assert(lindex); - assert(pindex); + BT_ASSERT(lindex); + BT_ASSERT(pindex); pindex->offset = be64toh(lindex->offset); pindex->packet_size = be64toh(lindex->packet_size); @@ -1050,7 +1087,7 @@ void lttng_index_to_packet_index(struct lttng_viewer_index *lindex, } BT_HIDDEN -enum bt_ctf_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_live_component *lttng_live, +enum bt_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_live_component *lttng_live, struct lttng_live_stream_iterator *stream, struct packet_index *index) { @@ -1059,43 +1096,46 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_li ssize_t ret_len; struct lttng_viewer_index rp; uint32_t flags, status; - enum bt_ctf_lttng_live_iterator_status retstatus = - BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK; + enum bt_lttng_live_iterator_status retstatus = + BT_LTTNG_LIVE_ITERATOR_STATUS_OK; struct bt_live_viewer_connection *viewer_connection = lttng_live->viewer_connection; struct lttng_live_trace *trace = stream->trace; + const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); + char cmd_buf[cmd_buf_len]; cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX); cmd.data_size = htobe64((uint64_t) sizeof(rq)); cmd.cmd_version = htobe32(0); + memset(&rq, 0, sizeof(rq)); rq.stream_id = htobe64(stream->viewer_stream_id); - ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); - if (ret_len < 0) { - BT_LOGE("Error sending cmd: %s", strerror(errno)); - goto error; - } - assert(ret_len == sizeof(cmd)); - - ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq)); - if (ret_len < 0) { - BT_LOGE("Error sending get_next_index request: %s", strerror(errno)); + /* + * Merge the cmd and connection request to prevent a write-write + * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the + * second write to be performed quickly in presence of Nagle's algorithm. + */ + memcpy(cmd_buf, &cmd, sizeof(cmd)); + memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); + ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error sending get_next_index request: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(rq)); - ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp)); + BT_ASSERT(ret_len == cmd_buf_len); + ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving get_next_index response: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving get_next_index response: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(rp)); + BT_ASSERT(ret_len == sizeof(rp)); flags = be32toh(rp.flags); status = be32toh(rp.status); @@ -1111,7 +1151,7 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_li stream->current_inactivity_timestamp = index->ts_cycles.timestamp_end; ctf_stream_class_id = be64toh(rp.stream_id); if (stream->ctf_stream_class_id != -1ULL) { - assert(stream->ctf_stream_class_id == + BT_ASSERT(stream->ctf_stream_class_id == ctf_stream_class_id); } else { stream->ctf_stream_class_id = ctf_stream_class_id; @@ -1127,7 +1167,7 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_li lttng_index_to_packet_index(&rp, index); ctf_stream_class_id = be64toh(rp.stream_id); if (stream->ctf_stream_class_id != -1ULL) { - assert(stream->ctf_stream_class_id == + BT_ASSERT(stream->ctf_stream_class_id == ctf_stream_class_id); } else { stream->ctf_stream_class_id = ctf_stream_class_id; @@ -1150,14 +1190,14 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_li case LTTNG_VIEWER_INDEX_RETRY: BT_LOGD("get_next_index: retry"); memset(index, 0, sizeof(struct packet_index)); - retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN; + retstatus = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN; stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA; goto end; case LTTNG_VIEWER_INDEX_HUP: BT_LOGD("get_next_index: stream hung up"); memset(index, 0, sizeof(struct packet_index)); index->offset = EOF; - retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END; + retstatus = BT_LTTNG_LIVE_ITERATOR_STATUS_END; stream->state = LTTNG_LIVE_STREAM_EOF; break; case LTTNG_VIEWER_INDEX_ERR: @@ -1175,16 +1215,20 @@ end: return retstatus; error: - retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR; + if (lttng_live_is_canceled(lttng_live)) { + retstatus = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN; + } else { + retstatus = BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR; + } return retstatus; } BT_HIDDEN -enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_live_component *lttng_live, +enum bt_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_live_component *lttng_live, struct lttng_live_stream_iterator *stream, uint8_t *buf, uint64_t offset, uint64_t req_len, uint64_t *recv_len) { - enum bt_ctf_notif_iter_medium_status retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_OK; + enum bt_notif_iter_medium_status retstatus = BT_NOTIF_ITER_MEDIUM_STATUS_OK; struct lttng_viewer_cmd cmd; struct lttng_viewer_get_packet rq; struct lttng_viewer_trace_packet rp; @@ -1193,6 +1237,8 @@ enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_li struct bt_live_viewer_connection *viewer_connection = lttng_live->viewer_connection; struct lttng_live_trace *trace = stream->trace; + const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); + char cmd_buf[cmd_buf_len]; BT_LOGD("lttng_live_get_stream_bytes: offset=%" PRIu64 ", req_len=%" PRIu64, offset, req_len); @@ -1205,27 +1251,27 @@ enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_li rq.offset = htobe64(offset); rq.len = htobe32(req_len); - ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); - if (ret_len < 0) { - BT_LOGE("Error sending cmd: %s", strerror(errno)); - goto error; - } - assert(ret_len == sizeof(cmd)); - - ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq)); - if (ret_len < 0) { - BT_LOGE("Error sending get_data request: %s", strerror(errno)); + /* + * Merge the cmd and connection request to prevent a write-write + * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the + * second write to be performed quickly in presence of Nagle's algorithm. + */ + memcpy(cmd_buf, &cmd, sizeof(cmd)); + memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); + ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error sending get_data request: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(rq)); - ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp)); + BT_ASSERT(ret_len == cmd_buf_len); + ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving get_data response: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving get_data response: %s", bt_socket_errormsg()); goto error; } if (ret_len != sizeof(rp)) { @@ -1246,7 +1292,7 @@ enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_li case LTTNG_VIEWER_GET_PACKET_RETRY: /* Unimplemented by relay daemon */ BT_LOGD("get_data_packet: retry"); - retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN; + retstatus = BT_NOTIF_ITER_MEDIUM_STATUS_AGAIN; goto end; case LTTNG_VIEWER_GET_PACKET_ERR: if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) { @@ -1259,13 +1305,13 @@ enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_li } if (flags & (LTTNG_VIEWER_FLAG_NEW_METADATA | LTTNG_VIEWER_FLAG_NEW_STREAM)) { - retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN; + retstatus = BT_NOTIF_ITER_MEDIUM_STATUS_AGAIN; goto end; } BT_LOGE("get_data_packet: error"); goto error; case LTTNG_VIEWER_GET_PACKET_EOF: - retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_EOF; + retstatus = BT_NOTIF_ITER_MEDIUM_STATUS_EOF; goto end; default: BT_LOGE("get_data_packet: unknown"); @@ -1276,22 +1322,26 @@ enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_li goto error; } - ret_len = lttng_live_recv(viewer_connection->control_sock, buf, req_len); + ret_len = lttng_live_recv(viewer_connection, buf, req_len); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving trace packet: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving trace packet: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == req_len); + BT_ASSERT(ret_len == req_len); *recv_len = ret_len; end: return retstatus; error: - retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_ERROR; + if (lttng_live_is_canceled(lttng_live)) { + retstatus = BT_NOTIF_ITER_MEDIUM_STATUS_AGAIN; + } else { + retstatus = BT_NOTIF_ITER_MEDIUM_STATUS_ERROR; + } return retstatus; } @@ -1299,11 +1349,11 @@ error: * Request new streams for a session. */ BT_HIDDEN -enum bt_ctf_lttng_live_iterator_status lttng_live_get_new_streams( +enum bt_lttng_live_iterator_status lttng_live_get_new_streams( struct lttng_live_session *session) { - enum bt_ctf_lttng_live_iterator_status status = - BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK; + enum bt_lttng_live_iterator_status status = + BT_LTTNG_LIVE_ITERATOR_STATUS_OK; struct lttng_viewer_cmd cmd; struct lttng_viewer_new_streams_request rq; struct lttng_viewer_new_streams_response rp; @@ -1312,9 +1362,11 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_get_new_streams( struct bt_live_viewer_connection *viewer_connection = lttng_live->viewer_connection; uint32_t streams_count; + const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); + char cmd_buf[cmd_buf_len]; if (!session->new_streams_needed) { - return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK; + return BT_LTTNG_LIVE_ITERATOR_STATUS_OK; } cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS); @@ -1324,30 +1376,30 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_get_new_streams( memset(&rq, 0, sizeof(rq)); rq.session_id = htobe64(session->id); - ret_len = lttng_live_send(viewer_connection->control_sock, &cmd, sizeof(cmd)); - if (ret_len < 0) { - BT_LOGE("Error sending cmd: %s", strerror(errno)); - goto error; - } - assert(ret_len == sizeof(cmd)); - - ret_len = lttng_live_send(viewer_connection->control_sock, &rq, sizeof(rq)); - if (ret_len < 0) { - BT_LOGE("Error sending get_new_streams request: %s", strerror(errno)); + /* + * Merge the cmd and connection request to prevent a write-write + * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the + * second write to be performed quickly in presence of Nagle's algorithm. + */ + memcpy(cmd_buf, &cmd, sizeof(cmd)); + memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); + ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error sending get_new_streams request: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(rq)); - ret_len = lttng_live_recv(viewer_connection->control_sock, &rp, sizeof(rp)); + BT_ASSERT(ret_len == cmd_buf_len); + ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { + if (ret_len == BT_SOCKET_ERROR) { BT_LOGE("Error receiving get_new_streams response"); goto error; } - assert(ret_len == sizeof(rp)); + BT_ASSERT(ret_len == sizeof(rp)); streams_count = be32toh(rp.streams_count); @@ -1361,7 +1413,7 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_get_new_streams( case LTTNG_VIEWER_NEW_STREAMS_HUP: session->new_streams_needed = false; session->closed = true; - status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END; + status = BT_LTTNG_LIVE_ITERATOR_STATUS_END; goto end; case LTTNG_VIEWER_NEW_STREAMS_ERR: BT_LOGE("get_new_streams error"); @@ -1378,22 +1430,31 @@ end: return status; error: - status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR; + if (lttng_live_is_canceled(lttng_live)) { + status = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN; + } else { + status = BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR; + } return status; } BT_HIDDEN struct bt_live_viewer_connection * - bt_live_viewer_connection_create(const char *url, FILE *error_fp) + bt_live_viewer_connection_create(const char *url, + struct lttng_live_component *lttng_live) { struct bt_live_viewer_connection *viewer_connection; viewer_connection = g_new0(struct bt_live_viewer_connection, 1); + if (bt_socket_init() != 0) { + goto error; + } + bt_object_init(&viewer_connection->obj, connection_release); - viewer_connection->control_sock = -1; + viewer_connection->control_sock = BT_INVALID_SOCKET; viewer_connection->port = -1; - viewer_connection->error_fp = error_fp; + viewer_connection->lttng_live = lttng_live; viewer_connection->url = g_string_new(url); if (!viewer_connection->url) { goto error; @@ -1429,4 +1490,6 @@ void bt_live_viewer_connection_destroy(struct bt_live_viewer_connection *viewer_ g_string_free(viewer_connection->session_name, TRUE); } g_free(viewer_connection); + + bt_socket_fini(); }