X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=plugins%2Fctf%2Flttng-live%2Fviewer-connection.c;h=93ad97a5903427a6593e2ef9715c6f601df588a8;hb=464ebc311d460b29f681703aea0aa00eef9e6475;hp=aca5a3e3b2b5c40bb4b96617e207cf6d1f1c0dab;hpb=6f79a7cf4f5f065267895765f2277c6a12437f37;p=babeltrace.git diff --git a/plugins/ctf/lttng-live/viewer-connection.c b/plugins/ctf/lttng-live/viewer-connection.c index aca5a3e3..93ad97a5 100644 --- a/plugins/ctf/lttng-live/viewer-connection.c +++ b/plugins/ctf/lttng-live/viewer-connection.c @@ -30,17 +30,14 @@ #include #include #include -#include #include -#include -#include #include -#include -#include +#include +#include #include #include -#include +#include #include "lttng-live-internal.h" #include "viewer-connection.h" @@ -55,16 +52,16 @@ static ssize_t lttng_live_recv(struct bt_live_viewer_connection *viewer_connecti size_t copied = 0, to_copy = len; struct lttng_live_component *lttng_live = viewer_connection->lttng_live; - int fd = viewer_connection->control_sock; + BT_SOCKET sock = viewer_connection->control_sock; do { - ret = recv(fd, buf + copied, to_copy, 0); + ret = bt_socket_recv(sock, buf + copied, to_copy, 0); if (ret > 0) { assert(ret <= to_copy); copied += ret; to_copy -= ret; } - if (ret < 0 && errno == EINTR) { + if (ret == BT_SOCKET_ERROR && bt_socket_interrupted()) { if (lttng_live_is_canceled(lttng_live)) { break; } else { @@ -74,7 +71,7 @@ static ssize_t lttng_live_recv(struct bt_live_viewer_connection *viewer_connecti } while (ret > 0 && to_copy > 0); if (ret > 0) ret = copied; - /* ret = 0 means orderly shutdown, ret < 0 is error. */ + /* ret = 0 means orderly shutdown, ret == BT_SOCKET_ERROR is error. */ return ret; } @@ -83,12 +80,12 @@ static ssize_t lttng_live_send(struct bt_live_viewer_connection *viewer_connecti { struct lttng_live_component *lttng_live = viewer_connection->lttng_live; - int fd = viewer_connection->control_sock; + BT_SOCKET sock = viewer_connection->control_sock; ssize_t ret; for (;;) { - ret = bt_send_nosigpipe(fd, buf, len); - if (ret < 0 && errno == EINTR) { + ret = bt_socket_send_nosigpipe(sock, buf, len); + if (ret == BT_SOCKET_ERROR && bt_socket_interrupted()) { if (lttng_live_is_canceled(lttng_live)) { break; } else { @@ -160,6 +157,8 @@ static int lttng_live_handshake(struct bt_live_viewer_connection *viewer_connect { struct lttng_viewer_cmd cmd; struct lttng_viewer_connect connect; + const size_t cmd_buf_len = sizeof(cmd) + sizeof(connect); + char cmd_buf[cmd_buf_len]; int ret; ssize_t ret_len; @@ -172,33 +171,34 @@ static int lttng_live_handshake(struct bt_live_viewer_connection *viewer_connect connect.minor = htobe32(LTTNG_LIVE_MINOR); connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND); - ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); - if (ret_len < 0) { - BT_LOGE("Error sending cmd: %s", strerror(errno)); - goto error; - } - assert(ret_len == sizeof(cmd)); + /* + * Merge the cmd and connection request to prevent a write-write + * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the + * second write to be performed quickly in presence of Nagle's algorithm + */ + memcpy(cmd_buf, &cmd, sizeof(cmd)); + memcpy(cmd_buf + sizeof(cmd), &connect, sizeof(connect)); - ret_len = lttng_live_send(viewer_connection, &connect, sizeof(connect)); - if (ret_len < 0) { - BT_LOGE("Error sending version: %s", strerror(errno)); + ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error sending version: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(connect)); + assert(ret_len == cmd_buf_len); ret_len = lttng_live_recv(viewer_connection, &connect, sizeof(connect)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving version: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving version: %s", bt_socket_errormsg()); goto error; } assert(ret_len == sizeof(connect)); BT_LOGD("Received viewer session ID : %" PRIu64, - be64toh(connect.viewer_session_id)); + (uint64_t) be64toh(connect.viewer_session_id)); BT_LOGD("Relayd version : %u.%u", be32toh(connect.major), be32toh(connect.minor)); @@ -238,8 +238,8 @@ static int lttng_live_connect_viewer(struct bt_live_viewer_connection *viewer_co goto error; } - if ((viewer_connection->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) { - BT_LOGE("Socket creation failed: %s", strerror(errno)); + if ((viewer_connection->control_sock = socket(AF_INET, SOCK_STREAM, 0)) == BT_INVALID_SOCKET) { + BT_LOGE("Socket creation failed: %s", bt_socket_errormsg()); goto error; } @@ -249,8 +249,8 @@ static int lttng_live_connect_viewer(struct bt_live_viewer_connection *viewer_co memset(&(server_addr.sin_zero), 0, 8); if (connect(viewer_connection->control_sock, (struct sockaddr *) &server_addr, - sizeof(struct sockaddr)) == -1) { - BT_LOGE("Connection failed: %s", strerror(errno)); + sizeof(struct sockaddr)) == BT_SOCKET_ERROR) { + BT_LOGE("Connection failed: %s", bt_socket_errormsg()); goto error; } if (lttng_live_handshake(viewer_connection)) { @@ -262,23 +262,23 @@ static int lttng_live_connect_viewer(struct bt_live_viewer_connection *viewer_co return ret; error: - if (viewer_connection->control_sock >= 0) { - if (close(viewer_connection->control_sock)) { - BT_LOGE("Close: %s", strerror(errno)); + if (viewer_connection->control_sock != BT_INVALID_SOCKET) { + if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) { + BT_LOGE("Close: %s", bt_socket_errormsg()); } } - viewer_connection->control_sock = -1; + viewer_connection->control_sock = BT_INVALID_SOCKET; return -1; } static void lttng_live_disconnect_viewer(struct bt_live_viewer_connection *viewer_connection) { - if (viewer_connection->control_sock < 0) { + if (viewer_connection->control_sock == BT_INVALID_SOCKET) { return; } - if (close(viewer_connection->control_sock)) { - BT_LOGE("Close: %s", strerror(errno)); - viewer_connection->control_sock = -1; + if (bt_socket_close(viewer_connection->control_sock) == BT_SOCKET_ERROR) { + BT_LOGE("Close: %s", bt_socket_errormsg()); + viewer_connection->control_sock = BT_INVALID_SOCKET; } } @@ -573,8 +573,8 @@ struct bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_c cmd.cmd_version = htobe32(0); ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); - if (ret_len < 0) { - BT_LOGE("Error sending cmd: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error sending cmd: %s", bt_socket_errormsg()); goto error; } assert(ret_len == sizeof(cmd)); @@ -584,8 +584,8 @@ struct bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_c BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving session list: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving session list: %s", bt_socket_errormsg()); goto error; } assert(ret_len == sizeof(list)); @@ -600,8 +600,8 @@ struct bt_value *bt_live_viewer_connection_list_sessions(struct bt_live_viewer_c BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving session: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving session: %s", bt_socket_errormsg()); goto error; } assert(ret_len == sizeof(lsession)); @@ -637,8 +637,8 @@ int lttng_live_query_session_ids(struct lttng_live_component *lttng_live) cmd.cmd_version = htobe32(0); ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); - if (ret_len < 0) { - BT_LOGE("Error sending cmd: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error sending cmd: %s", bt_socket_errormsg()); goto error; } assert(ret_len == sizeof(cmd)); @@ -648,8 +648,8 @@ int lttng_live_query_session_ids(struct lttng_live_component *lttng_live) BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving session list: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving session list: %s", bt_socket_errormsg()); goto error; } assert(ret_len == sizeof(list)); @@ -662,8 +662,8 @@ int lttng_live_query_session_ids(struct lttng_live_component *lttng_live) BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving session: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving session: %s", bt_socket_errormsg()); goto error; } assert(ret_len == sizeof(lsession)); @@ -676,9 +676,9 @@ int lttng_live_query_session_ids(struct lttng_live_component *lttng_live) if ((strncmp(lsession.session_name, viewer_connection->session_name->str, - MAXNAMLEN) == 0) && (strncmp(lsession.hostname, + LTTNG_VIEWER_NAME_MAX) == 0) && (strncmp(lsession.hostname, viewer_connection->target_hostname->str, - MAXNAMLEN) == 0)) { + LTTNG_VIEWER_HOST_NAME_MAX) == 0)) { if (lttng_live_add_session(lttng_live, session_id, lsession.hostname, lsession.session_name)) { @@ -708,8 +708,8 @@ int lttng_live_create_viewer_session(struct lttng_live_component *lttng_live) cmd.cmd_version = htobe32(0); ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); - if (ret_len < 0) { - BT_LOGE("Error sending cmd: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error sending cmd: %s", bt_socket_errormsg()); goto error; } assert(ret_len == sizeof(cmd)); @@ -719,8 +719,8 @@ int lttng_live_create_viewer_session(struct lttng_live_component *lttng_live) BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving create session reply: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving create session reply: %s", bt_socket_errormsg()); goto error; } assert(ret_len == sizeof(resp)); @@ -761,7 +761,7 @@ int receive_streams(struct lttng_live_session *session, BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { + if (ret_len == BT_SOCKET_ERROR) { BT_LOGE("Error receiving stream"); goto error; } @@ -813,6 +813,8 @@ int lttng_live_attach_session(struct lttng_live_session *session) lttng_live->viewer_connection; uint64_t session_id = session->id; uint32_t streams_count; + const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); + char cmd_buf[cmd_buf_len]; if (session->attached) { return 0; @@ -828,27 +830,28 @@ int lttng_live_attach_session(struct lttng_live_session *session) // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING); rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST); - ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); - if (ret_len < 0) { - BT_LOGE("Error sending cmd: %s", strerror(errno)); - goto error; - } - assert(ret_len == sizeof(cmd)); + /* + * Merge the cmd and connection request to prevent a write-write + * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the + * second write to be performed quickly in presence of Nagle's algorithm. + */ + memcpy(cmd_buf, &cmd, sizeof(cmd)); + memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); - ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq)); - if (ret_len < 0) { - BT_LOGE("Error sending attach request: %s", strerror(errno)); + ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error sending attach request: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(rq)); + assert(ret_len == cmd_buf_len); ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving attach response: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving attach response: %s", bt_socket_errormsg()); goto error; } assert(ret_len == sizeof(rp)); @@ -899,6 +902,8 @@ int lttng_live_detach_session(struct lttng_live_session *session) struct bt_live_viewer_connection *viewer_connection = lttng_live->viewer_connection; uint64_t session_id = session->id; + const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); + char cmd_buf[cmd_buf_len]; if (!session->attached) { return 0; @@ -911,27 +916,28 @@ int lttng_live_detach_session(struct lttng_live_session *session) memset(&rq, 0, sizeof(rq)); rq.session_id = htobe64(session_id); - ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); - if (ret_len < 0) { - BT_LOGE("Error sending cmd: %s", strerror(errno)); - goto error; - } - assert(ret_len == sizeof(cmd)); + /* + * Merge the cmd and connection request to prevent a write-write + * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the + * second write to be performed quickly in presence of Nagle's algorithm. + */ + memcpy(cmd_buf, &cmd, sizeof(cmd)); + memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); - ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq)); - if (ret_len < 0) { - BT_LOGE("Error sending detach request: %s", strerror(errno)); + ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error sending detach request: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(rq)); + assert(ret_len == cmd_buf_len); ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving detach response: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving detach response: %s", bt_socket_errormsg()); goto error; } assert(ret_len == sizeof(rp)); @@ -974,33 +980,36 @@ ssize_t lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, struct lttng_live_metadata *metadata = trace->metadata; struct bt_live_viewer_connection *viewer_connection = lttng_live->viewer_connection; + const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); + char cmd_buf[cmd_buf_len]; rq.stream_id = htobe64(metadata->stream_id); cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA); cmd.data_size = htobe64((uint64_t) sizeof(rq)); cmd.cmd_version = htobe32(0); - ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); - if (ret_len < 0) { - BT_LOGE("Error sending cmd: %s", strerror(errno)); - goto error; - } - assert(ret_len == sizeof(cmd)); + /* + * Merge the cmd and connection request to prevent a write-write + * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the + * second write to be performed quickly in presence of Nagle's algorithm. + */ + memcpy(cmd_buf, &cmd, sizeof(cmd)); + memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); - ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq)); - if (ret_len < 0) { - BT_LOGE("Error sending get_metadata request: %s", strerror(errno)); + ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error sending get_metadata request: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(rq)); + assert(ret_len == cmd_buf_len); ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving get_metadata response: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving get_metadata response: %s", bt_socket_errormsg()); goto error; } assert(ret_len == sizeof(rp)); @@ -1037,8 +1046,8 @@ ssize_t lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, BT_LOGI("Remote side has closed connection"); goto error_free_data; } - if (ret_len < 0) { - BT_LOGE("Error receiving trace packet: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving trace packet: %s", bt_socket_errormsg()); goto error_free_data; } assert(ret_len == len); @@ -1081,7 +1090,7 @@ void lttng_index_to_packet_index(struct lttng_viewer_index *lindex, } BT_HIDDEN -enum bt_ctf_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_live_component *lttng_live, +enum bt_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_live_component *lttng_live, struct lttng_live_stream_iterator *stream, struct packet_index *index) { @@ -1090,40 +1099,44 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_li ssize_t ret_len; struct lttng_viewer_index rp; uint32_t flags, status; - enum bt_ctf_lttng_live_iterator_status retstatus = - BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK; + enum bt_lttng_live_iterator_status retstatus = + BT_LTTNG_LIVE_ITERATOR_STATUS_OK; struct bt_live_viewer_connection *viewer_connection = lttng_live->viewer_connection; struct lttng_live_trace *trace = stream->trace; + const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); + char cmd_buf[cmd_buf_len]; cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX); cmd.data_size = htobe64((uint64_t) sizeof(rq)); cmd.cmd_version = htobe32(0); + memset(&rq, 0, sizeof(rq)); rq.stream_id = htobe64(stream->viewer_stream_id); - ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); - if (ret_len < 0) { - BT_LOGE("Error sending cmd: %s", strerror(errno)); - goto error; - } - assert(ret_len == sizeof(cmd)); + /* + * Merge the cmd and connection request to prevent a write-write + * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the + * second write to be performed quickly in presence of Nagle's algorithm. + */ + memcpy(cmd_buf, &cmd, sizeof(cmd)); + memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); - ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq)); - if (ret_len < 0) { - BT_LOGE("Error sending get_next_index request: %s", strerror(errno)); + ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error sending get_next_index request: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(rq)); + assert(ret_len == cmd_buf_len); ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving get_next_index response: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving get_next_index response: %s", bt_socket_errormsg()); goto error; } assert(ret_len == sizeof(rp)); @@ -1181,14 +1194,14 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_li case LTTNG_VIEWER_INDEX_RETRY: BT_LOGD("get_next_index: retry"); memset(index, 0, sizeof(struct packet_index)); - retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN; + retstatus = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN; stream->state = LTTNG_LIVE_STREAM_ACTIVE_NO_DATA; goto end; case LTTNG_VIEWER_INDEX_HUP: BT_LOGD("get_next_index: stream hung up"); memset(index, 0, sizeof(struct packet_index)); index->offset = EOF; - retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END; + retstatus = BT_LTTNG_LIVE_ITERATOR_STATUS_END; stream->state = LTTNG_LIVE_STREAM_EOF; break; case LTTNG_VIEWER_INDEX_ERR: @@ -1207,19 +1220,19 @@ end: error: if (lttng_live_is_canceled(lttng_live)) { - retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN; + retstatus = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN; } else { - retstatus = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR; + retstatus = BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR; } return retstatus; } BT_HIDDEN -enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_live_component *lttng_live, +enum bt_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_live_component *lttng_live, struct lttng_live_stream_iterator *stream, uint8_t *buf, uint64_t offset, uint64_t req_len, uint64_t *recv_len) { - enum bt_ctf_notif_iter_medium_status retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_OK; + enum bt_notif_iter_medium_status retstatus = BT_NOTIF_ITER_MEDIUM_STATUS_OK; struct lttng_viewer_cmd cmd; struct lttng_viewer_get_packet rq; struct lttng_viewer_trace_packet rp; @@ -1228,6 +1241,8 @@ enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_li struct bt_live_viewer_connection *viewer_connection = lttng_live->viewer_connection; struct lttng_live_trace *trace = stream->trace; + const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); + char cmd_buf[cmd_buf_len]; BT_LOGD("lttng_live_get_stream_bytes: offset=%" PRIu64 ", req_len=%" PRIu64, offset, req_len); @@ -1240,27 +1255,28 @@ enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_li rq.offset = htobe64(offset); rq.len = htobe32(req_len); - ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); - if (ret_len < 0) { - BT_LOGE("Error sending cmd: %s", strerror(errno)); - goto error; - } - assert(ret_len == sizeof(cmd)); + /* + * Merge the cmd and connection request to prevent a write-write + * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the + * second write to be performed quickly in presence of Nagle's algorithm. + */ + memcpy(cmd_buf, &cmd, sizeof(cmd)); + memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); - ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq)); - if (ret_len < 0) { - BT_LOGE("Error sending get_data request: %s", strerror(errno)); + ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error sending get_data request: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(rq)); + assert(ret_len == cmd_buf_len); ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving get_data response: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving get_data response: %s", bt_socket_errormsg()); goto error; } if (ret_len != sizeof(rp)) { @@ -1281,7 +1297,7 @@ enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_li case LTTNG_VIEWER_GET_PACKET_RETRY: /* Unimplemented by relay daemon */ BT_LOGD("get_data_packet: retry"); - retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN; + retstatus = BT_NOTIF_ITER_MEDIUM_STATUS_AGAIN; goto end; case LTTNG_VIEWER_GET_PACKET_ERR: if (flags & LTTNG_VIEWER_FLAG_NEW_METADATA) { @@ -1294,13 +1310,13 @@ enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_li } if (flags & (LTTNG_VIEWER_FLAG_NEW_METADATA | LTTNG_VIEWER_FLAG_NEW_STREAM)) { - retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN; + retstatus = BT_NOTIF_ITER_MEDIUM_STATUS_AGAIN; goto end; } BT_LOGE("get_data_packet: error"); goto error; case LTTNG_VIEWER_GET_PACKET_EOF: - retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_EOF; + retstatus = BT_NOTIF_ITER_MEDIUM_STATUS_EOF; goto end; default: BT_LOGE("get_data_packet: unknown"); @@ -1316,8 +1332,8 @@ enum bt_ctf_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_li BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { - BT_LOGE("Error receiving trace packet: %s", strerror(errno)); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error receiving trace packet: %s", bt_socket_errormsg()); goto error; } assert(ret_len == req_len); @@ -1327,9 +1343,9 @@ end: error: if (lttng_live_is_canceled(lttng_live)) { - retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_AGAIN; + retstatus = BT_NOTIF_ITER_MEDIUM_STATUS_AGAIN; } else { - retstatus = BT_CTF_NOTIF_ITER_MEDIUM_STATUS_ERROR; + retstatus = BT_NOTIF_ITER_MEDIUM_STATUS_ERROR; } return retstatus; } @@ -1338,11 +1354,11 @@ error: * Request new streams for a session. */ BT_HIDDEN -enum bt_ctf_lttng_live_iterator_status lttng_live_get_new_streams( +enum bt_lttng_live_iterator_status lttng_live_get_new_streams( struct lttng_live_session *session) { - enum bt_ctf_lttng_live_iterator_status status = - BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK; + enum bt_lttng_live_iterator_status status = + BT_LTTNG_LIVE_ITERATOR_STATUS_OK; struct lttng_viewer_cmd cmd; struct lttng_viewer_new_streams_request rq; struct lttng_viewer_new_streams_response rp; @@ -1351,9 +1367,11 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_get_new_streams( struct bt_live_viewer_connection *viewer_connection = lttng_live->viewer_connection; uint32_t streams_count; + const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); + char cmd_buf[cmd_buf_len]; if (!session->new_streams_needed) { - return BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_OK; + return BT_LTTNG_LIVE_ITERATOR_STATUS_OK; } cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEW_STREAMS); @@ -1363,26 +1381,27 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_get_new_streams( memset(&rq, 0, sizeof(rq)); rq.session_id = htobe64(session->id); - ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); - if (ret_len < 0) { - BT_LOGE("Error sending cmd: %s", strerror(errno)); - goto error; - } - assert(ret_len == sizeof(cmd)); + /* + * Merge the cmd and connection request to prevent a write-write + * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the + * second write to be performed quickly in presence of Nagle's algorithm. + */ + memcpy(cmd_buf, &cmd, sizeof(cmd)); + memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); - ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq)); - if (ret_len < 0) { - BT_LOGE("Error sending get_new_streams request: %s", strerror(errno)); + ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); + if (ret_len == BT_SOCKET_ERROR) { + BT_LOGE("Error sending get_new_streams request: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(rq)); + assert(ret_len == cmd_buf_len); ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); if (ret_len == 0) { BT_LOGI("Remote side has closed connection"); goto error; } - if (ret_len < 0) { + if (ret_len == BT_SOCKET_ERROR) { BT_LOGE("Error receiving get_new_streams response"); goto error; } @@ -1400,7 +1419,7 @@ enum bt_ctf_lttng_live_iterator_status lttng_live_get_new_streams( case LTTNG_VIEWER_NEW_STREAMS_HUP: session->new_streams_needed = false; session->closed = true; - status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_END; + status = BT_LTTNG_LIVE_ITERATOR_STATUS_END; goto end; case LTTNG_VIEWER_NEW_STREAMS_ERR: BT_LOGE("get_new_streams error"); @@ -1418,9 +1437,9 @@ end: error: if (lttng_live_is_canceled(lttng_live)) { - status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_AGAIN; + status = BT_LTTNG_LIVE_ITERATOR_STATUS_AGAIN; } else { - status = BT_CTF_LTTNG_LIVE_ITERATOR_STATUS_ERROR; + status = BT_LTTNG_LIVE_ITERATOR_STATUS_ERROR; } return status; } @@ -1434,8 +1453,12 @@ struct bt_live_viewer_connection * viewer_connection = g_new0(struct bt_live_viewer_connection, 1); + if (bt_socket_init() != 0) { + goto error; + } + bt_object_init(&viewer_connection->obj, connection_release); - viewer_connection->control_sock = -1; + viewer_connection->control_sock = BT_INVALID_SOCKET; viewer_connection->port = -1; viewer_connection->lttng_live = lttng_live; viewer_connection->url = g_string_new(url); @@ -1473,4 +1496,6 @@ void bt_live_viewer_connection_destroy(struct bt_live_viewer_connection *viewer_ g_string_free(viewer_connection->session_name, TRUE); } g_free(viewer_connection); + + bt_socket_fini(); }