From bdcbd52e40f0f395f137a20bf94d41c1281715f7 Mon Sep 17 00:00:00 2001 From: Jonathan Rajotte Date: Fri, 24 Nov 2017 15:48:31 -0500 Subject: [PATCH] Fix live-comm: merge TCP socket write-write sequence in a single write MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit The live protocol implementation is often sending content on TCP sockets in two separate writes. One to send a command header, and the second one sending the command's payload. This was presumably done under the assumption that it would not result in two separate TCP packets being sent on the network (or that it would not matter). Delayed ACK-induced delays were observed [1] on the second write of the "write header, write payload" sequence and result in problematic latency build-ups for live clients connected to moderately/highly active sessions. Fundamentally, this problem arises due to the combination of Nagle's algorithm and the delayed ACK mechanism which make write-write-read sequences on TCP sockets problematic as near-constant latency is expected when clients can keep-up with the event production rate. In such a write-write-read sequence, the second write is held up until the first write is acknowledged (TCP ACK). The solution implemented by this patch bundles the writes into a single one [2]. [1] https://github.com/tbricks/wireshark-lttng-plugin Basic Wireshark dissector for lttng-live by Anto Smyk from Itiviti [2] https://lists.freebsd.org/pipermail/freebsd-net/2006-January/009527.html Reported-by: Anton Smyk Signed-off-by: Jonathan Rajotte Signed-off-by: Jérémie Galarneau --- plugins/ctf/lttng-live/viewer-connection.c | 134 ++++++++++++--------- 1 file changed, 78 insertions(+), 56 deletions(-) diff --git a/plugins/ctf/lttng-live/viewer-connection.c b/plugins/ctf/lttng-live/viewer-connection.c index e66bd1e8..93ad97a5 100644 --- a/plugins/ctf/lttng-live/viewer-connection.c +++ b/plugins/ctf/lttng-live/viewer-connection.c @@ -157,6 +157,8 @@ static int lttng_live_handshake(struct bt_live_viewer_connection *viewer_connect { struct lttng_viewer_cmd cmd; struct lttng_viewer_connect connect; + const size_t cmd_buf_len = sizeof(cmd) + sizeof(connect); + char cmd_buf[cmd_buf_len]; int ret; ssize_t ret_len; @@ -169,19 +171,20 @@ static int lttng_live_handshake(struct bt_live_viewer_connection *viewer_connect connect.minor = htobe32(LTTNG_LIVE_MINOR); connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND); - ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); - if (ret_len == BT_SOCKET_ERROR) { - BT_LOGE("Error sending cmd: %s", bt_socket_errormsg()); - goto error; - } - assert(ret_len == sizeof(cmd)); + /* + * Merge the cmd and connection request to prevent a write-write + * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the + * second write to be performed quickly in presence of Nagle's algorithm + */ + memcpy(cmd_buf, &cmd, sizeof(cmd)); + memcpy(cmd_buf + sizeof(cmd), &connect, sizeof(connect)); - ret_len = lttng_live_send(viewer_connection, &connect, sizeof(connect)); + ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); if (ret_len == BT_SOCKET_ERROR) { BT_LOGE("Error sending version: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(connect)); + assert(ret_len == cmd_buf_len); ret_len = lttng_live_recv(viewer_connection, &connect, sizeof(connect)); if (ret_len == 0) { @@ -810,6 +813,8 @@ int lttng_live_attach_session(struct lttng_live_session *session) lttng_live->viewer_connection; uint64_t session_id = session->id; uint32_t streams_count; + const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); + char cmd_buf[cmd_buf_len]; if (session->attached) { return 0; @@ -825,19 +830,20 @@ int lttng_live_attach_session(struct lttng_live_session *session) // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING); rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST); - ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); - if (ret_len == BT_SOCKET_ERROR) { - BT_LOGE("Error sending cmd: %s", bt_socket_errormsg()); - goto error; - } - assert(ret_len == sizeof(cmd)); + /* + * Merge the cmd and connection request to prevent a write-write + * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the + * second write to be performed quickly in presence of Nagle's algorithm. + */ + memcpy(cmd_buf, &cmd, sizeof(cmd)); + memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); - ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq)); + ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); if (ret_len == BT_SOCKET_ERROR) { BT_LOGE("Error sending attach request: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(rq)); + assert(ret_len == cmd_buf_len); ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); if (ret_len == 0) { @@ -896,6 +902,8 @@ int lttng_live_detach_session(struct lttng_live_session *session) struct bt_live_viewer_connection *viewer_connection = lttng_live->viewer_connection; uint64_t session_id = session->id; + const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); + char cmd_buf[cmd_buf_len]; if (!session->attached) { return 0; @@ -908,19 +916,20 @@ int lttng_live_detach_session(struct lttng_live_session *session) memset(&rq, 0, sizeof(rq)); rq.session_id = htobe64(session_id); - ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); - if (ret_len == BT_SOCKET_ERROR) { - BT_LOGE("Error sending cmd: %s", bt_socket_errormsg()); - goto error; - } - assert(ret_len == sizeof(cmd)); + /* + * Merge the cmd and connection request to prevent a write-write + * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the + * second write to be performed quickly in presence of Nagle's algorithm. + */ + memcpy(cmd_buf, &cmd, sizeof(cmd)); + memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); - ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq)); + ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); if (ret_len == BT_SOCKET_ERROR) { BT_LOGE("Error sending detach request: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(rq)); + assert(ret_len == cmd_buf_len); ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); if (ret_len == 0) { @@ -971,25 +980,28 @@ ssize_t lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace, struct lttng_live_metadata *metadata = trace->metadata; struct bt_live_viewer_connection *viewer_connection = lttng_live->viewer_connection; + const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); + char cmd_buf[cmd_buf_len]; rq.stream_id = htobe64(metadata->stream_id); cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA); cmd.data_size = htobe64((uint64_t) sizeof(rq)); cmd.cmd_version = htobe32(0); - ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); - if (ret_len == BT_SOCKET_ERROR) { - BT_LOGE("Error sending cmd: %s", bt_socket_errormsg()); - goto error; - } - assert(ret_len == sizeof(cmd)); + /* + * Merge the cmd and connection request to prevent a write-write + * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the + * second write to be performed quickly in presence of Nagle's algorithm. + */ + memcpy(cmd_buf, &cmd, sizeof(cmd)); + memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); - ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq)); + ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); if (ret_len == BT_SOCKET_ERROR) { BT_LOGE("Error sending get_metadata request: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(rq)); + assert(ret_len == cmd_buf_len); ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); if (ret_len == 0) { @@ -1092,27 +1104,31 @@ enum bt_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_live_c struct bt_live_viewer_connection *viewer_connection = lttng_live->viewer_connection; struct lttng_live_trace *trace = stream->trace; + const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); + char cmd_buf[cmd_buf_len]; cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX); cmd.data_size = htobe64((uint64_t) sizeof(rq)); cmd.cmd_version = htobe32(0); + memset(&rq, 0, sizeof(rq)); rq.stream_id = htobe64(stream->viewer_stream_id); - ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); - if (ret_len == BT_SOCKET_ERROR) { - BT_LOGE("Error sending cmd: %s", bt_socket_errormsg()); - goto error; - } - assert(ret_len == sizeof(cmd)); + /* + * Merge the cmd and connection request to prevent a write-write + * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the + * second write to be performed quickly in presence of Nagle's algorithm. + */ + memcpy(cmd_buf, &cmd, sizeof(cmd)); + memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); - ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq)); + ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); if (ret_len == BT_SOCKET_ERROR) { BT_LOGE("Error sending get_next_index request: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(rq)); + assert(ret_len == cmd_buf_len); ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); if (ret_len == 0) { @@ -1225,6 +1241,8 @@ enum bt_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_live_c struct bt_live_viewer_connection *viewer_connection = lttng_live->viewer_connection; struct lttng_live_trace *trace = stream->trace; + const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); + char cmd_buf[cmd_buf_len]; BT_LOGD("lttng_live_get_stream_bytes: offset=%" PRIu64 ", req_len=%" PRIu64, offset, req_len); @@ -1237,19 +1255,20 @@ enum bt_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_live_c rq.offset = htobe64(offset); rq.len = htobe32(req_len); - ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); - if (ret_len == BT_SOCKET_ERROR) { - BT_LOGE("Error sending cmd: %s", bt_socket_errormsg()); - goto error; - } - assert(ret_len == sizeof(cmd)); + /* + * Merge the cmd and connection request to prevent a write-write + * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the + * second write to be performed quickly in presence of Nagle's algorithm. + */ + memcpy(cmd_buf, &cmd, sizeof(cmd)); + memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); - ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq)); + ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); if (ret_len == BT_SOCKET_ERROR) { BT_LOGE("Error sending get_data request: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(rq)); + assert(ret_len == cmd_buf_len); ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); if (ret_len == 0) { @@ -1348,6 +1367,8 @@ enum bt_lttng_live_iterator_status lttng_live_get_new_streams( struct bt_live_viewer_connection *viewer_connection = lttng_live->viewer_connection; uint32_t streams_count; + const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq); + char cmd_buf[cmd_buf_len]; if (!session->new_streams_needed) { return BT_LTTNG_LIVE_ITERATOR_STATUS_OK; @@ -1360,19 +1381,20 @@ enum bt_lttng_live_iterator_status lttng_live_get_new_streams( memset(&rq, 0, sizeof(rq)); rq.session_id = htobe64(session->id); - ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd)); - if (ret_len == BT_SOCKET_ERROR) { - BT_LOGE("Error sending cmd: %s", bt_socket_errormsg()); - goto error; - } - assert(ret_len == sizeof(cmd)); + /* + * Merge the cmd and connection request to prevent a write-write + * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the + * second write to be performed quickly in presence of Nagle's algorithm. + */ + memcpy(cmd_buf, &cmd, sizeof(cmd)); + memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq)); - ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq)); + ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len); if (ret_len == BT_SOCKET_ERROR) { BT_LOGE("Error sending get_new_streams request: %s", bt_socket_errormsg()); goto error; } - assert(ret_len == sizeof(rq)); + assert(ret_len == cmd_buf_len); ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp)); if (ret_len == 0) { -- 2.34.1