CUSTOM: relayd protocol: ignore reply on relayd_send_index and relayd_send_close_stream
authorJonathan Rajotte <jonathan.rajotte-julien@efficios.com>
Mon, 18 Jan 2021 19:44:34 +0000 (14:44 -0500)
committerJonathan Rajotte <jonathan.rajotte-julien@efficios.com>
Mon, 25 Jan 2021 19:21:05 +0000 (14:21 -0500)
Note: this patch is not a bugfix, it is a targeted modification to
improve timing and predictability for particular scenarios.

Note: this patch only applies to userspace tracing.

Part of this work might find itself upstream but most probably not in
this form. Deeper work should be done upstream to mitigate the real root
of the problem which is how the network protocol for live works and
lifetime management of the different component required for the live
feature.

Scenario:
=======
   System with high CPU resource restriction (base high workload 99% cpu load),
   High CPU count (32),
   Slowish network (ping ~60-100ms),
   Timing constraint for create, configure, start, stop , destroy , create ... cycles.

A lot of time is wasted waiting for response that do not provide more
information and are essentially not pertinent. Here the focus is done on
the reply for relayd_send_index and relayd_send_close_stream.

relayd_send_index is used at the end of a buffer consumption and do not
require further protocol synchronization on lttng-sessiond side. This
allows us to simply mark the bytes of the response as `to ignore` on the
next real receive operation.

relayd_send_index is also used to send empty index during the live
timer. Again there is now direct value in waiting for the reply here
since tcp guarantee send/receive ordering.

relayd_send_close_stream again does not benefit from waiting on the
reply considering that even in the possibility of error there is not
much we can do since we are closing it on lttng-sessiond side.

NOTE:
   Three call sites are responsible for "purging" the unwanted reply:
   recv_reply, recv_reply_ignore and relayd_close.

   recv_reply simply try to receive all bytes to ignore at the time it
   is called. This is to preserve the protocol integrity.

   recv_reply_ignore will issue a recv call if a cutoff for `bytes to
   ignore` (128 currently) is met. The recv call will recv and discard
   the hald the quantity of the cutoff (64 in this patch). This ensure
   that long period with no real receive operation do not lead to buffer
   bloat. We slowly consume data that is "useless". The amount we
   receive is totally arbitrary and is only set to half of the cutoff to
   `give a chance to the runner` since there is a high probability that
   they are available. In any case, we block if it is not the case.

   relayd_close must wait for all `ignored bytes` to ensure that we do
   not close the socket before the relayd have sent all its reply.

Signed-off-by: Jonathan Rajotte <jonathan.rajotte-julien@efficios.com>
Change-Id: I5da9b364c73498e8c0156c71960432229932ecb5

src/common/relayd/relayd.c
src/common/sessiond-comm/sessiond-comm.h

index afcd2d838b3d0f7b2e256ba368a1fac635651135..97a5941f75cb63fbd8c38baafaea73612f7b9970 100644 (file)
@@ -86,6 +86,53 @@ alloc_error:
        return ret;
 }
 
+static int recv_reply_ignore(struct lttcomm_relayd_sock *rsock, size_t size)
+{
+       int ret;
+       size_t cutfoff = 128;
+
+       /*
+        * To prevent ever growing size of recv_reply to ignore, if the number
+        * of bytes we want to ignore is bigger than `cutoff`, consume half of
+        * the cutoff. We might block on it but still, most of bytes to ignore
+        * should already be ready to consume at this point.
+        *
+        * This kind of scenario can easily happen on stopped session with a
+        * live_timer since no actual receive is done on the socket that would
+        * discard the `ignore` portion.
+        *
+        * TCP guarantee in-order transmission both on send and receive so this
+        * is safe to do.
+        */
+       if (rsock->bytes_to_ignore_on_recv >= cutfoff) {
+               size_t to_discard = cutfoff / 2;
+
+               ret = rsock->sock.ops->recvmsg(&rsock->sock, NULL, to_discard, MSG_TRUNC);
+               if (ret <= 0 || ret != to_discard) {
+                       if (ret == 0) {
+                               /* Orderly shutdown. */
+                               DBG("Socket %d has performed an orderly shutdown", rsock->sock.fd);
+                       } else {
+                               DBG("Receiving reply to discard failed on sock %d for size %zu with ret %d",
+                                               rsock->sock.fd, to_discard, ret);
+                       }
+                       ret = -1;
+                       goto error;
+               }
+
+               DBG("Force discard of %zu bytes for socket %d", to_discard, rsock->sock.fd);
+               rsock->bytes_to_ignore_on_recv -= to_discard;
+       }
+
+       DBG3("Relayd ignore reply of %zu bytes for socket %d.", size, rsock->sock.fd);
+       /* Do not wait for the current reply to be ignored */
+       rsock->bytes_to_ignore_on_recv += size;
+       ret = 0;
+
+error:
+       return ret;
+}
+
 /*
  * Receive reply data on socket. This MUST be call after send_command or else
  * could result in unexpected behavior(s).
@@ -98,6 +145,26 @@ static int recv_reply(struct lttcomm_relayd_sock *rsock, void *data, size_t size
                return -ECONNRESET;
        }
 
+       /*
+        * We have to consume the bytes that are marked to ignore.
+        */
+       if (rsock->bytes_to_ignore_on_recv != 0) {
+               ret = rsock->sock.ops->recvmsg(&rsock->sock, NULL, rsock->bytes_to_ignore_on_recv, MSG_TRUNC);
+               if (ret <= 0 || ret != rsock->bytes_to_ignore_on_recv) {
+                       if (ret == 0) {
+                               /* Orderly shutdown. */
+                               DBG("Socket %d has performed an orderly shutdown", rsock->sock.fd);
+                       } else {
+                               DBG("Receiving reply to skip failed on sock %d for size %zu with ret %d",
+                                               rsock->sock.fd, rsock->bytes_to_ignore_on_recv, ret);
+                       }
+                       ret = -1;
+                       goto error;
+               }
+               DBG("Discarded %zu bytes on sock %d", rsock->bytes_to_ignore_on_recv, rsock->sock.fd);
+               rsock->bytes_to_ignore_on_recv = 0;
+       }
+
        DBG3("Relayd waiting for reply of size %zu", size);
 
        ret = rsock->sock.ops->recvmsg(&rsock->sock, data, size, 0);
@@ -626,6 +693,27 @@ int relayd_close(struct lttcomm_relayd_sock *rsock)
                goto end;
        }
 
+       /*
+        * This ensure that we do not close the socket while the lttng-relayd
+        * expects to be able to send a response that we skipped.
+        * While we loose some time to receive everything, this keep the
+        * protocol intact from the point of view of lttng-relayd.
+        */
+       if (rsock->bytes_to_ignore_on_recv != 0) {
+               ret = rsock->sock.ops->recvmsg(&rsock->sock, NULL, rsock->bytes_to_ignore_on_recv, MSG_TRUNC);
+               if (ret <= 0 || ret != rsock->bytes_to_ignore_on_recv) {
+                       if (ret == 0) {
+                               /* Orderly shutdown. */
+                               DBG("Socket %d has performed an orderly shutdown", rsock->sock.fd);
+                       } else {
+                               DBG("Receiving reply to skip failed on sock %d for size %zu with ret %d",
+                                               rsock->sock.fd, rsock->bytes_to_ignore_on_recv, ret);
+                       }
+               }
+               DBG("Discarded %zu bytes on sock %d", rsock->bytes_to_ignore_on_recv, rsock->sock.fd);
+               rsock->bytes_to_ignore_on_recv = 0;
+       }
+
        DBG3("Relayd closing socket %d", rsock->sock.fd);
 
        if (rsock->sock.ops) {
@@ -707,23 +795,16 @@ int relayd_send_close_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_
                goto error;
        }
 
-       /* Receive response */
-       ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
+       /*
+        * Discard response since we do not really care for it and that TCP
+        * guarantee in-order delivery. As for error handling, there is not much
+        * to do at this point (closing).
+        **/
+       ret = recv_reply_ignore(rsock, sizeof(reply));
        if (ret < 0) {
                goto error;
        }
 
-       reply.ret_code = be32toh(reply.ret_code);
-
-       /* Return session id or negative ret code. */
-       if (reply.ret_code != LTTNG_OK) {
-               ret = -1;
-               ERR("Relayd close stream replied error %d", reply.ret_code);
-       } else {
-               /* Success */
-               ret = 0;
-       }
-
        DBG("Relayd close stream id %" PRIu64 " successfully", stream_id);
 
 error:
@@ -969,23 +1050,18 @@ int relayd_send_index(struct lttcomm_relayd_sock *rsock,
                goto error;
        }
 
-       /* Receive response */
-       ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
+       /*
+        * Ignore the response. TCP guarantee in-order arrival and the overall
+        * protocol do not rely on hard ordering between the control and data
+        * socket for index.
+        * Indexes are sent either at the end of the buffer consumption or
+        * during the live timer.
+        */
+       ret = recv_reply_ignore(rsock, sizeof(reply));
        if (ret < 0) {
                goto error;
        }
 
-       reply.ret_code = be32toh(reply.ret_code);
-
-       /* Return session id or negative ret code. */
-       if (reply.ret_code != LTTNG_OK) {
-               ret = -1;
-               ERR("Relayd send index replied error %d", reply.ret_code);
-       } else {
-               /* Success */
-               ret = 0;
-       }
-
 error:
        return ret;
 }
index ceb56f6717eb0f2e36b18274424178ca3916d87c..18347742f8dfd6882d098e945d06f44b00765fb6 100644 (file)
@@ -208,6 +208,8 @@ struct lttcomm_relayd_sock {
        struct lttcomm_sock sock;
        uint32_t major;
        uint32_t minor;
+       /* The number of bytes to ignore on the next receive. */
+       size_t bytes_to_ignore_on_recv;
 } LTTNG_PACKED;
 
 struct lttcomm_net_family {
This page took 0.030252 seconds and 5 git commands to generate.