The live protocol implementation often sends content
on TCP sockets in two separate writes: one to send a command header,
and a second one to send the command's payload. This was presumably
done under the assumption that it would not result in two separate
TCP packets being sent on the network (or that it would not matter).
Delays induced by delayed ACKs were observed [1] on the second write of
the "write header, write payload" sequence; they result in problematic
latency build-ups for live clients connected to moderately/highly
active sessions.
Fundamentally, this problem arises from the combination of Nagle's
algorithm and the delayed ACK mechanism, which makes write-write-read
sequences on TCP sockets problematic, as near-constant latency is
expected when clients can keep up with the event production rate.
In such a write-write-read sequence, the second write is held up until
the first write is acknowledged (TCP ACK). The solution implemented
by this patch bundles the writes into a single one [2].
[1] https://github.com/tbricks/wireshark-lttng-plugin
Basic Wireshark dissector for lttng-live by Anton Smyk from Itiviti
[2] https://lists.freebsd.org/pipermail/freebsd-net/2006-January/009527.html
Reported-by: Anton Smyk <anton.smyk@itiviti.com>
Signed-off-by: Jonathan Rajotte <jonathan.rajotte-julien@efficios.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
{
struct lttng_viewer_cmd cmd;
struct lttng_viewer_connect connect;
{
struct lttng_viewer_cmd cmd;
struct lttng_viewer_connect connect;
+ const size_t cmd_buf_len = sizeof(cmd) + sizeof(connect);
+ char cmd_buf[cmd_buf_len];
int ret;
ssize_t ret_len;
int ret;
ssize_t ret_len;
connect.minor = htobe32(LTTNG_LIVE_MINOR);
connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
connect.minor = htobe32(LTTNG_LIVE_MINOR);
connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
- ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
- if (ret_len == BT_SOCKET_ERROR) {
- BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
- goto error;
- }
- assert(ret_len == sizeof(cmd));
+ /*
+ * Merge the cmd and connection request to prevent a write-write
+ * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+ * second write from being performed quickly in the presence of Nagle's
+ * algorithm.
+ */
+ memcpy(cmd_buf, &cmd, sizeof(cmd));
+ memcpy(cmd_buf + sizeof(cmd), &connect, sizeof(connect));
- ret_len = lttng_live_send(viewer_connection, &connect, sizeof(connect));
+ ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
if (ret_len == BT_SOCKET_ERROR) {
BT_LOGE("Error sending version: %s", bt_socket_errormsg());
goto error;
}
if (ret_len == BT_SOCKET_ERROR) {
BT_LOGE("Error sending version: %s", bt_socket_errormsg());
goto error;
}
- assert(ret_len == sizeof(connect));
+ assert(ret_len == cmd_buf_len);
ret_len = lttng_live_recv(viewer_connection, &connect, sizeof(connect));
if (ret_len == 0) {
ret_len = lttng_live_recv(viewer_connection, &connect, sizeof(connect));
if (ret_len == 0) {
lttng_live->viewer_connection;
uint64_t session_id = session->id;
uint32_t streams_count;
lttng_live->viewer_connection;
uint64_t session_id = session->id;
uint32_t streams_count;
+ const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+ char cmd_buf[cmd_buf_len];
if (session->attached) {
return 0;
if (session->attached) {
return 0;
// rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
// rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
- ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
- if (ret_len == BT_SOCKET_ERROR) {
- BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
- goto error;
- }
- assert(ret_len == sizeof(cmd));
+ /*
+ * Merge the cmd and attach request to prevent a write-write
+ * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+ * second write from being performed quickly in the presence of Nagle's
+ * algorithm.
+ */
+ memcpy(cmd_buf, &cmd, sizeof(cmd));
+ memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
- ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
+ ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
if (ret_len == BT_SOCKET_ERROR) {
BT_LOGE("Error sending attach request: %s", bt_socket_errormsg());
goto error;
}
if (ret_len == BT_SOCKET_ERROR) {
BT_LOGE("Error sending attach request: %s", bt_socket_errormsg());
goto error;
}
- assert(ret_len == sizeof(rq));
+ assert(ret_len == cmd_buf_len);
ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
if (ret_len == 0) {
ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
if (ret_len == 0) {
struct bt_live_viewer_connection *viewer_connection =
lttng_live->viewer_connection;
uint64_t session_id = session->id;
struct bt_live_viewer_connection *viewer_connection =
lttng_live->viewer_connection;
uint64_t session_id = session->id;
+ const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+ char cmd_buf[cmd_buf_len];
if (!session->attached) {
return 0;
if (!session->attached) {
return 0;
memset(&rq, 0, sizeof(rq));
rq.session_id = htobe64(session_id);
memset(&rq, 0, sizeof(rq));
rq.session_id = htobe64(session_id);
- ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
- if (ret_len == BT_SOCKET_ERROR) {
- BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
- goto error;
- }
- assert(ret_len == sizeof(cmd));
+ /*
+ * Merge the cmd and detach request to prevent a write-write
+ * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+ * second write from being performed quickly in the presence of Nagle's
+ * algorithm.
+ */
+ memcpy(cmd_buf, &cmd, sizeof(cmd));
+ memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
- ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
+ ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
if (ret_len == BT_SOCKET_ERROR) {
BT_LOGE("Error sending detach request: %s", bt_socket_errormsg());
goto error;
}
if (ret_len == BT_SOCKET_ERROR) {
BT_LOGE("Error sending detach request: %s", bt_socket_errormsg());
goto error;
}
- assert(ret_len == sizeof(rq));
+ assert(ret_len == cmd_buf_len);
ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
if (ret_len == 0) {
ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
if (ret_len == 0) {
struct lttng_live_metadata *metadata = trace->metadata;
struct bt_live_viewer_connection *viewer_connection =
lttng_live->viewer_connection;
struct lttng_live_metadata *metadata = trace->metadata;
struct bt_live_viewer_connection *viewer_connection =
lttng_live->viewer_connection;
+ const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+ char cmd_buf[cmd_buf_len];
rq.stream_id = htobe64(metadata->stream_id);
cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
cmd.data_size = htobe64((uint64_t) sizeof(rq));
cmd.cmd_version = htobe32(0);
rq.stream_id = htobe64(metadata->stream_id);
cmd.cmd = htobe32(LTTNG_VIEWER_GET_METADATA);
cmd.data_size = htobe64((uint64_t) sizeof(rq));
cmd.cmd_version = htobe32(0);
- ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
- if (ret_len == BT_SOCKET_ERROR) {
- BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
- goto error;
- }
- assert(ret_len == sizeof(cmd));
+ /*
+ * Merge the cmd and get_metadata request to prevent a write-write
+ * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+ * second write from being performed quickly in the presence of Nagle's
+ * algorithm.
+ */
+ memcpy(cmd_buf, &cmd, sizeof(cmd));
+ memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
- ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
+ ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
if (ret_len == BT_SOCKET_ERROR) {
BT_LOGE("Error sending get_metadata request: %s", bt_socket_errormsg());
goto error;
}
if (ret_len == BT_SOCKET_ERROR) {
BT_LOGE("Error sending get_metadata request: %s", bt_socket_errormsg());
goto error;
}
- assert(ret_len == sizeof(rq));
+ assert(ret_len == cmd_buf_len);
ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
if (ret_len == 0) {
ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
if (ret_len == 0) {
struct bt_live_viewer_connection *viewer_connection =
lttng_live->viewer_connection;
struct lttng_live_trace *trace = stream->trace;
struct bt_live_viewer_connection *viewer_connection =
lttng_live->viewer_connection;
struct lttng_live_trace *trace = stream->trace;
+ const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+ char cmd_buf[cmd_buf_len];
cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
cmd.data_size = htobe64((uint64_t) sizeof(rq));
cmd.cmd_version = htobe32(0);
cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
cmd.data_size = htobe64((uint64_t) sizeof(rq));
cmd.cmd_version = htobe32(0);
memset(&rq, 0, sizeof(rq));
rq.stream_id = htobe64(stream->viewer_stream_id);
memset(&rq, 0, sizeof(rq));
rq.stream_id = htobe64(stream->viewer_stream_id);
- ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
- if (ret_len == BT_SOCKET_ERROR) {
- BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
- goto error;
- }
- assert(ret_len == sizeof(cmd));
+ /*
+ * Merge the cmd and get_next_index request to prevent a write-write
+ * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+ * second write from being performed quickly in the presence of Nagle's
+ * algorithm.
+ */
+ memcpy(cmd_buf, &cmd, sizeof(cmd));
+ memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
- ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
+ ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
if (ret_len == BT_SOCKET_ERROR) {
BT_LOGE("Error sending get_next_index request: %s", bt_socket_errormsg());
goto error;
}
if (ret_len == BT_SOCKET_ERROR) {
BT_LOGE("Error sending get_next_index request: %s", bt_socket_errormsg());
goto error;
}
- assert(ret_len == sizeof(rq));
+ assert(ret_len == cmd_buf_len);
ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
if (ret_len == 0) {
ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
if (ret_len == 0) {
struct bt_live_viewer_connection *viewer_connection =
lttng_live->viewer_connection;
struct lttng_live_trace *trace = stream->trace;
struct bt_live_viewer_connection *viewer_connection =
lttng_live->viewer_connection;
struct lttng_live_trace *trace = stream->trace;
+ const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+ char cmd_buf[cmd_buf_len];
BT_LOGD("lttng_live_get_stream_bytes: offset=%" PRIu64 ", req_len=%" PRIu64,
offset, req_len);
BT_LOGD("lttng_live_get_stream_bytes: offset=%" PRIu64 ", req_len=%" PRIu64,
offset, req_len);
rq.offset = htobe64(offset);
rq.len = htobe32(req_len);
rq.offset = htobe64(offset);
rq.len = htobe32(req_len);
- ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
- if (ret_len == BT_SOCKET_ERROR) {
- BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
- goto error;
- }
- assert(ret_len == sizeof(cmd));
+ /*
+ * Merge the cmd and get_data request to prevent a write-write
+ * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+ * second write from being performed quickly in the presence of Nagle's
+ * algorithm.
+ */
+ memcpy(cmd_buf, &cmd, sizeof(cmd));
+ memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
- ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
+ ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
if (ret_len == BT_SOCKET_ERROR) {
BT_LOGE("Error sending get_data request: %s", bt_socket_errormsg());
goto error;
}
if (ret_len == BT_SOCKET_ERROR) {
BT_LOGE("Error sending get_data request: %s", bt_socket_errormsg());
goto error;
}
- assert(ret_len == sizeof(rq));
+ assert(ret_len == cmd_buf_len);
ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
if (ret_len == 0) {
ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
if (ret_len == 0) {
struct bt_live_viewer_connection *viewer_connection =
lttng_live->viewer_connection;
uint32_t streams_count;
struct bt_live_viewer_connection *viewer_connection =
lttng_live->viewer_connection;
uint32_t streams_count;
+ const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+ char cmd_buf[cmd_buf_len];
if (!session->new_streams_needed) {
return BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
if (!session->new_streams_needed) {
return BT_LTTNG_LIVE_ITERATOR_STATUS_OK;
memset(&rq, 0, sizeof(rq));
rq.session_id = htobe64(session->id);
memset(&rq, 0, sizeof(rq));
rq.session_id = htobe64(session->id);
- ret_len = lttng_live_send(viewer_connection, &cmd, sizeof(cmd));
- if (ret_len == BT_SOCKET_ERROR) {
- BT_LOGE("Error sending cmd: %s", bt_socket_errormsg());
- goto error;
- }
- assert(ret_len == sizeof(cmd));
+ /*
+ * Merge the cmd and get_new_streams request to prevent a write-write
+ * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+ * second write from being performed quickly in the presence of Nagle's
+ * algorithm.
+ */
+ memcpy(cmd_buf, &cmd, sizeof(cmd));
+ memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
- ret_len = lttng_live_send(viewer_connection, &rq, sizeof(rq));
+ ret_len = lttng_live_send(viewer_connection, &cmd_buf, cmd_buf_len);
if (ret_len == BT_SOCKET_ERROR) {
BT_LOGE("Error sending get_new_streams request: %s", bt_socket_errormsg());
goto error;
}
if (ret_len == BT_SOCKET_ERROR) {
BT_LOGE("Error sending get_new_streams request: %s", bt_socket_errormsg());
goto error;
}
- assert(ret_len == sizeof(rq));
+ assert(ret_len == cmd_buf_len);
ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
if (ret_len == 0) {
ret_len = lttng_live_recv(viewer_connection, &rp, sizeof(rp));
if (ret_len == 0) {