Fix live-comm: merge TCP socket write-write sequence in a single write
[babeltrace.git] / formats / lttng-live / lttng-live-comm.c
index 6541facfea57abd01ec711b2426d2ed0bfd3045a..c19cc6460b5b06ee07ed46e6e5b5f54adf17d80a 100644 (file)
@@ -31,7 +31,6 @@
 #include <errno.h>
 #include <inttypes.h>
 #include <fcntl.h>
-#include <sys/mman.h>
 #include <poll.h>
 
 #include <babeltrace/ctf/ctf-index.h>
@@ -53,6 +52,7 @@
 #include <babeltrace/compat/memstream.h>
 #include <babeltrace/compat/send.h>
 #include <babeltrace/compat/string.h>
+#include <babeltrace/compat/mman.h>
 
 #include "lttng-live.h"
 #include "lttng-viewer-abi.h"
@@ -157,6 +157,8 @@ int lttng_live_establish_connection(struct lttng_live_ctx *ctx)
 {
        struct lttng_viewer_cmd cmd;
        struct lttng_viewer_connect connect;
+       const size_t cmd_buf_len = sizeof(cmd) + sizeof(connect);
+       char cmd_buf[cmd_buf_len];
        int ret;
        ssize_t ret_len;
 
@@ -174,19 +176,20 @@ int lttng_live_establish_connection(struct lttng_live_ctx *ctx)
        connect.minor = htobe32(LTTNG_LIVE_MINOR);
        connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);
 
-       ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
-       if (ret_len < 0) {
-               perror("[error] Error sending cmd");
-               goto error;
-       }
-       assert(ret_len == sizeof(cmd));
+       /*
+        * Merge the cmd and connection request to prevent a write-write
+        * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+        * second write to be performed quickly in presence of Nagle's algorithm.
+        */
+       memcpy(cmd_buf, &cmd, sizeof(cmd));
+       memcpy(cmd_buf + sizeof(cmd), &connect, sizeof(connect));
 
-       ret_len = lttng_live_send(ctx->control_sock, &connect, sizeof(connect));
+       ret_len = lttng_live_send(ctx->control_sock, cmd_buf, cmd_buf_len);
        if (ret_len < 0) {
-               perror("[error] Error sending version");
+               perror("[error] Error sending cmd for establishing session");
                goto error;
        }
-       assert(ret_len == sizeof(connect));
+       assert(ret_len == cmd_buf_len);
 
        ret_len = lttng_live_recv(ctx->control_sock, &connect, sizeof(connect));
        if (ret_len == 0) {
@@ -423,6 +426,8 @@ int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id)
        struct lttng_viewer_attach_session_request rq;
        struct lttng_viewer_attach_session_response rp;
        struct lttng_viewer_stream stream;
+       const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+       char cmd_buf[cmd_buf_len];
        int ret, i;
        ssize_t ret_len;
 
@@ -441,19 +446,20 @@ int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id)
        // rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
        rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);
 
-       ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
-       if (ret_len < 0) {
-               perror("[error] Error sending cmd");
-               goto error;
-       }
-       assert(ret_len == sizeof(cmd));
+       /*
+        * Merge the cmd and connection request to prevent a write-write
+        * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+        * second write to be performed quickly in presence of Nagle's algorithm.
+        */
+       memcpy(cmd_buf, &cmd, sizeof(cmd));
+       memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
 
-       ret_len = lttng_live_send(ctx->control_sock, &rq, sizeof(rq));
+       ret_len = lttng_live_send(ctx->control_sock, cmd_buf, cmd_buf_len);
        if (ret_len < 0) {
-               perror("[error] Error sending attach request");
+               perror("[error] Error sending attach command and request");
                goto error;
        }
-       assert(ret_len == sizeof(rq));
+       assert(ret_len == cmd_buf_len);
 
        ret_len = lttng_live_recv(ctx->control_sock, &rp, sizeof(rp));
        if (ret_len == 0) {
@@ -641,6 +647,8 @@ int get_data_packet(struct lttng_live_ctx *ctx,
        struct lttng_viewer_cmd cmd;
        struct lttng_viewer_get_packet rq;
        struct lttng_viewer_trace_packet rp;
+       const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+       char cmd_buf[cmd_buf_len];
        ssize_t ret_len;
        int ret;
 
@@ -650,17 +658,6 @@ retry:
                goto end;
        }
 
-       /*
-        * Flush the output between attempts to grab a packet, thus
-        * ensuring we flush at least at the periodical timer period.
-        * This ensures the output remains reactive for interactive users and
-        * that the output is flushed when redirected to a file by the shell.
-        */
-       if (fflush(LTTNG_LIVE_OUTPUT_FP) < 0) {
-               perror("fflush");
-               goto error;
-       }
-
        cmd.cmd = htobe32(LTTNG_VIEWER_GET_PACKET);
        cmd.data_size = htobe64((uint64_t) sizeof(rq));
        cmd.cmd_version = htobe32(0);
@@ -670,19 +667,20 @@ retry:
        rq.offset = htobe64(offset);
        rq.len = htobe32(len);
 
-       ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
-       if (ret_len < 0) {
-               perror("[error] Error sending cmd");
-               goto error;
-       }
-       assert(ret_len == sizeof(cmd));
+       /*
+        * Merge the cmd and connection request to prevent a write-write
+        * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+        * second write to be performed quickly in presence of Nagle's algorithm.
+        */
+       memcpy(cmd_buf, &cmd, sizeof(cmd));
+       memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
 
-       ret_len = lttng_live_send(ctx->control_sock, &rq, sizeof(rq));
+       ret_len = lttng_live_send(ctx->control_sock, cmd_buf, cmd_buf_len);
        if (ret_len < 0) {
-               perror("[error] Error sending get_data_packet request");
+               perror("[error] Error sending get_data_packet cmd and request");
                goto error;
        }
-       assert(ret_len == sizeof(rq));
+       assert(ret_len == cmd_buf_len);
 
        ret_len = lttng_live_recv(ctx->control_sock, &rp, sizeof(rp));
        if (ret_len == 0) {
@@ -806,6 +804,8 @@ int get_one_metadata_packet(struct lttng_live_ctx *ctx,
        struct lttng_viewer_metadata_packet rp;
        char *data = NULL;
        ssize_t ret_len;
+       const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+       char cmd_buf[cmd_buf_len];
 
        if (lttng_live_should_quit()) {
                ret = -1;
@@ -817,19 +817,20 @@ int get_one_metadata_packet(struct lttng_live_ctx *ctx,
        cmd.data_size = htobe64((uint64_t) sizeof(rq));
        cmd.cmd_version = htobe32(0);
 
-       ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
-       if (ret_len < 0) {
-               perror("[error] Error sending cmd");
-               goto error;
-       }
-       assert(ret_len == sizeof(cmd));
+       /*
+        * Merge the cmd and connection request to prevent a write-write
+        * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+        * second write to be performed quickly in presence of Nagle's algorithm.
+        */
+       memcpy(cmd_buf, &cmd, sizeof(cmd));
+       memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
 
-       ret_len = lttng_live_send(ctx->control_sock, &rq, sizeof(rq));
+       ret_len = lttng_live_send(ctx->control_sock, cmd_buf, cmd_buf_len);
        if (ret_len < 0) {
-               perror("[error] Error sending get_metadata request");
+               perror("[error] Error sending get_metadata cmd and request");
                goto error;
        }
-       assert(ret_len == sizeof(rq));
+       assert(ret_len == cmd_buf_len);
 
        ret_len = lttng_live_recv(ctx->control_sock, &rp, sizeof(rp));
        if (ret_len == 0) {
@@ -987,6 +988,8 @@ int get_next_index(struct lttng_live_ctx *ctx,
        int ret;
        ssize_t ret_len;
        struct lttng_viewer_index *rp = &viewer_stream->current_index;
+       const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+       char cmd_buf[cmd_buf_len];
 
        cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
        cmd.data_size = htobe64((uint64_t) sizeof(rq));
@@ -995,24 +998,24 @@ int get_next_index(struct lttng_live_ctx *ctx,
        memset(&rq, 0, sizeof(rq));
        rq.stream_id = htobe64(viewer_stream->id);
 
+       /*
+        * Merge the cmd and connection request to prevent a write-write
+        * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+        * second write to be performed quickly in presence of Nagle's algorithm.
+        */
+       memcpy(cmd_buf, &cmd, sizeof(cmd));
+       memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
 retry:
        if (lttng_live_should_quit()) {
                ret = -1;
                goto end;
        }
-       ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
+       ret_len = lttng_live_send(ctx->control_sock, &cmd_buf, cmd_buf_len);
        if (ret_len < 0) {
-               perror("[error] Error sending cmd");
+               perror("[error] Error sending get_next_index cmd and request");
                goto error;
        }
-       assert(ret_len == sizeof(cmd));
-
-       ret_len = lttng_live_send(ctx->control_sock, &rq, sizeof(rq));
-       if (ret_len < 0) {
-               perror("[error] Error sending get_next_index request");
-               goto error;
-       }
-       assert(ret_len == sizeof(rq));
+       assert(ret_len == cmd_buf_len);
 
        ret_len = lttng_live_recv(ctx->control_sock, rp, sizeof(*rp));
        if (ret_len == 0) {
@@ -1293,6 +1296,17 @@ retry:
                                cur_index->ts_real.timestamp_begin;
        }
 
+       /*
+        * Flush the output between attempts to grab a packet, thus
+        * ensuring we flush at least at the periodical timer period.
+        * This ensures the output remains reactive for interactive users and
+        * that the output is flushed when redirected to a file by the shell.
+        */
+       if (fflush(LTTNG_LIVE_OUTPUT_FP) < 0) {
+               perror("fflush");
+               goto end;
+       }
+
        if (pos->packet_size == 0 || pos->offset == EOF) {
                goto end;
        }
@@ -1515,6 +1529,8 @@ int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id)
        int ret, i, nb_streams = 0;
        ssize_t ret_len;
        uint32_t stream_count;
+       const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
+       char cmd_buf[cmd_buf_len];
 
        if (lttng_live_should_quit()) {
                ret = -1;
@@ -1528,19 +1544,20 @@ int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id)
        memset(&rq, 0, sizeof(rq));
        rq.session_id = htobe64(id);
 
-       ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
-       if (ret_len < 0) {
-               perror("[error] Error sending cmd");
-               goto error;
-       }
-       assert(ret_len == sizeof(cmd));
+       /*
+        * Merge the cmd and connection request to prevent a write-write
+        * sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
+        * second write to be performed quickly in presence of Nagle's algorithm.
+        */
+       memcpy(cmd_buf, &cmd, sizeof(cmd));
+       memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
 
-       ret_len = lttng_live_send(ctx->control_sock, &rq, sizeof(rq));
+       ret_len = lttng_live_send(ctx->control_sock, cmd_buf, cmd_buf_len);
        if (ret_len < 0) {
-               perror("[error] Error sending get_new_streams request");
+               perror("[error] Error sending get_new_streams cmd and request");
                goto error;
        }
-       assert(ret_len == sizeof(rq));
+       assert(ret_len == cmd_buf_len);
 
        ret_len = lttng_live_recv(ctx->control_sock, &rp, sizeof(rp));
        if (ret_len == 0) {
This page took 0.026616 seconds and 4 git commands to generate.