Fix live-comm: merge TCP socket write-write sequence in a single write
authorJonathan Rajotte <jonathan.rajotte-julien@efficios.com>
Fri, 24 Nov 2017 20:48:31 +0000 (15:48 -0500)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Thu, 31 May 2018 06:45:54 +0000 (02:45 -0400)
The live protocol implementation is often sending content
on TCP sockets in two separate writes. One to send a command header,
and the second one sending the command's payload. This was presumably
done under the assumption that it would not result in two separate
TCP packets being sent on the network (or that it would not matter).

Delayed ACK-induced delays were observed [1] on the second write of the
"write header, write payload" sequence and result in problematic
latency build-ups for live clients connected to moderately/highly
active sessions.

Fundamentally, this problem arises due to the combination of Nagle's
algorithm and the delayed ACK mechanism which make write-write-read
sequences on TCP sockets problematic as near-constant latency is
expected when clients can keep-up with the event production rate.

In such a write-write-read sequence, the second write is held up until
the first write is acknowledged (TCP ACK). The solution implemented
by this patch bundles the writes into a single one [2].

[1] https://github.com/tbricks/wireshark-lttng-plugin
Basic Wireshark dissector for lttng-live by Anto Smyk from Itiviti
[2] https://lists.freebsd.org/pipermail/freebsd-net/2006-January/009527.html

Reported-by: Anton Smyk <anton.smyk@itiviti.com>
Signed-off-by: Jonathan Rajotte <jonathan.rajotte-julien@efficios.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
plugins/ctf/lttng-live/viewer-connection.c

index e66bd1e8aa9c0929b80b97476956af02ebf5a909..93ad97a5903427a6593e2ef9715c6f601df588a8 100644 (file)
@@ -157,6 +157,8 @@ static int lttng_live_handshake(struct bt_live_viewer_connection *viewer_connect
 {
        struct lttng_viewer_cmd cmd;
        struct lttng_viewer_connect connect;
+       const size_t cmd_buf_len = sizeof(cmd) + sizeof(connect);
+       char cmd_buf[cmd_buf_len];
        int ret;
        ssize_t ret_len;
 
@@ -169,19 +171,20 @@ static int lttng_live_handshake(struct bt_live_viewer_connection *viewer_connect
        connect.minor = htobe32(LTTNG_LIVE_MINOR);
        connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
 
-       ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
-       if (ret_len == BT_SOCKET_ERROR) {
-               BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
-               goto error;
-       }
-       assert(ret_len == sizeof(cmd));
+       /*
+        * Merge the cmd and connection request to prevent a write-write
+        * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+        * second write to be performed quickly in presence of Nagle's algorithm
+        */
+       memcpy(cmd_buf, &cmd, sizeof(cmd));
+       memcpy(cmd_buf + sizeof(cmd), &connect, sizeof(connect));
 
-       ret_len = lttng_live_send(viewer_connection, &connect, sizeof(connect));
+       ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
        if (ret_len == BT_SOCKET_ERROR) {
                BT_LOGE("Error sending version: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(connect));
+       assert(ret_len == cmd_buf_len);
 
        ret_len = lttng_live_recv(viewer_connection, &connect, sizeof(connect));
        if (ret_len == 0) {
@@ -810,6 +813,8 @@ int lttng_live_attach_session(struct lttng_live_session *session)
                        lttng_live->viewer_connection;
        uint64_t session_id = session->id;
        uint32_t streams_count;
+       const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+       char cmd_buf[cmd_buf_len];
 
        if (session->attached) {
                return 0;
@@ -825,19 +830,20 @@ int lttng_live_attach_session(struct lttng_live_session *session)
        // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
        rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
 
-       ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
-       if (ret_len == BT_SOCKET_ERROR) {
-               BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
-               goto error;
-       }
-       assert(ret_len == sizeof(cmd));
+       /*
+        * Merge the cmd and connection request to prevent a write-write
+        * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+        * second write to be performed quickly in presence of Nagle's algorithm.
+        */
+       memcpy(cmd_buf, &cmd, sizeof(cmd));
+       memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
 
-       ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
+       ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
        if (ret_len == BT_SOCKET_ERROR) {
                BT_LOGE("Error sending attach request: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(rq));
+       assert(ret_len == cmd_buf_len);
 
        ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
        if (ret_len == 0) {
@@ -896,6 +902,8 @@ int lttng_live_detach_session(struct lttng_live_session *session)
        struct bt_live_viewer_connection *viewer_connection =
                        lttng_live->viewer_connection;
        uint64_t session_id = session->id;
+       const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+       char cmd_buf[cmd_buf_len];
 
        if (!session->attached) {
                return 0;
@@ -908,19 +916,20 @@ int lttng_live_detach_session(struct lttng_live_session *session)
        memset(&rq, 0, sizeof(rq));
        rq.session_id = htobe64(session_id);
 
-       ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
-       if (ret_len == BT_SOCKET_ERROR) {
-               BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
-               goto error;
-       }
-       assert(ret_len == sizeof(cmd));
+       /*
+        * Merge the cmd and connection request to prevent a write-write
+        * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+        * second write to be performed quickly in presence of Nagle's algorithm.
+        */
+       memcpy(cmd_buf, &cmd, sizeof(cmd));
+       memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
 
-       ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
+       ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
        if (ret_len == BT_SOCKET_ERROR) {
                BT_LOGE("Error sending detach request: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(rq));
+       assert(ret_len == cmd_buf_len);
 
        ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
        if (ret_len == 0) {
@@ -971,25 +980,28 @@ ssize_t lttng_live_get_one_metadata_packet(struct lttng_live_trace *trace,
        struct lttng_live_metadata *metadata = trace->metadata;
        struct bt_live_viewer_connection *viewer_connection =
                        lttng_live->viewer_connection;
+       const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+       char cmd_buf[cmd_buf_len];
 
        rq.stream_id = htobe64(metadata->stream_id);
        cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
        cmd.data_size = htobe64((uint64_t) sizeof(rq));
        cmd.cmd_version = htobe32(0);
 
-       ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
-       if (ret_len == BT_SOCKET_ERROR) {
-               BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
-               goto error;
-       }
-       assert(ret_len == sizeof(cmd));
+       /*
+        * Merge the cmd and connection request to prevent a write-write
+        * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+        * second write to be performed quickly in presence of Nagle's algorithm.
+        */
+       memcpy(cmd_buf, &cmd, sizeof(cmd));
+       memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
 
-       ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
+       ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
        if (ret_len == BT_SOCKET_ERROR) {
                BT_LOGE("Error sending get_metadata request: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(rq));
+       assert(ret_len == cmd_buf_len);
 
        ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
        if (ret_len == 0) {
@@ -1092,27 +1104,31 @@ enum bt_lttng_live_iterator_status lttng_live_get_next_index(struct lttng_live_c
        struct bt_live_viewer_connection *viewer_connection =
                        lttng_live->viewer_connection;
        struct lttng_live_trace *trace = stream->trace;
+       const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+       char cmd_buf[cmd_buf_len];
 
        cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
        cmd.data_size = htobe64((uint64_t) sizeof(rq));
        cmd.cmd_version = htobe32(0);
 
+
        memset(&rq, 0, sizeof(rq));
        rq.stream_id = htobe64(stream->viewer_stream_id);
 
-       ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
-       if (ret_len == BT_SOCKET_ERROR) {
-               BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
-               goto error;
-       }
-       assert(ret_len == sizeof(cmd));
+       /*
+        * Merge the cmd and connection request to prevent a write-write
+        * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+        * second write to be performed quickly in presence of Nagle's algorithm.
+        */
+       memcpy(cmd_buf, &cmd, sizeof(cmd));
+       memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
 
-       ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
+       ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
        if (ret_len == BT_SOCKET_ERROR) {
                BT_LOGE("Error sending get_next_index request: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(rq));
+       assert(ret_len == cmd_buf_len);
 
        ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
        if (ret_len == 0) {
@@ -1225,6 +1241,8 @@ enum bt_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_live_c
        struct bt_live_viewer_connection *viewer_connection =
                        lttng_live->viewer_connection;
        struct lttng_live_trace *trace = stream->trace;
+       const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+       char cmd_buf[cmd_buf_len];
 
        BT_LOGD("lttng_live_get_stream_bytes: offset=%" PRIu64 ", req_len=%" PRIu64,
                        offset, req_len);
@@ -1237,19 +1255,20 @@ enum bt_notif_iter_medium_status lttng_live_get_stream_bytes(struct lttng_live_c
        rq.offset = htobe64(offset);
        rq.len = htobe32(req_len);
 
-       ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
-       if (ret_len == BT_SOCKET_ERROR) {
-               BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
-               goto error;
-       }
-       assert(ret_len == sizeof(cmd));
+       /*
+        * Merge the cmd and connection request to prevent a write-write
+        * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+        * second write to be performed quickly in presence of Nagle's algorithm.
+        */
+       memcpy(cmd_buf, &cmd, sizeof(cmd));
+       memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
 
-       ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
+       ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
        if (ret_len == BT_SOCKET_ERROR) {
                BT_LOGE("Error sending get_data request: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(rq));
+       assert(ret_len == cmd_buf_len);
 
        ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
        if (ret_len == 0) {
@@ -1348,6 +1367,8 @@ enum bt_lttng_live_iterator_status lttng_live_get_new_streams(
        struct bt_live_viewer_connection *viewer_connection =
                        lttng_live->viewer_connection;
        uint32_t streams_count;
+       const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+       char cmd_buf[cmd_buf_len];
 
        if (!session->new_streams_needed) {
                return BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
@@ -1360,19 +1381,20 @@ enum bt_lttng_live_iterator_status lttng_live_get_new_streams(
        memset(&rq, 0, sizeof(rq));
        rq.session_id = htobe64(session->id);
 
-       ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
-       if (ret_len == BT_SOCKET_ERROR) {
-               BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
-               goto error;
-       }
-       assert(ret_len == sizeof(cmd));
+       /*
+        * Merge the cmd and connection request to prevent a write-write
+        * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+        * second write to be performed quickly in presence of Nagle's algorithm.
+        */
+       memcpy(cmd_buf, &cmd, sizeof(cmd));
+       memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
 
-       ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
+       ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
        if (ret_len == BT_SOCKET_ERROR) {
                BT_LOGE("Error sending get_new_streams request: %s", bt_socket_errormsg());
                goto error;
        }
-       assert(ret_len == sizeof(rq));
+       assert(ret_len == cmd_buf_len);
 
        ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
        if (ret_len == 0) {
This page took 0.029703 seconds and 4 git commands to generate.