CUSTOM: relayd protocol: ignore reply on relayd_send_index and relayd_send_close_stream
[lttng-tools.git] / src / common / relayd / relayd.c
index afcd2d838b3d0f7b2e256ba368a1fac635651135..97a5941f75cb63fbd8c38baafaea73612f7b9970 100644 (file)
@@ -86,6 +86,53 @@ alloc_error:
        return ret;
 }
 
+static int recv_reply_ignore(struct lttcomm_relayd_sock *rsock, size_t size)
+{
+       int ret;
+       size_t cutfoff = 128;
+
+       /*
+        * To prevent ever growing size of recv_reply to ignore, if the number
+        * of bytes we want to ignore is bigger than `cutoff`, consume half of
+        * the cutoff. We might block on it but still, most of bytes to ignore
+        * should already be ready to consume at this point.
+        *
+        * This kind of scenario can easily happen on stopped session with a
+        * live_timer since no actual receive is done on the socket that would
+        * discard the `ignore` portion.
+        *
+        * TCP guarantee in-order transmission both on send and receive so this
+        * is safe to do.
+        */
+       if (rsock->bytes_to_ignore_on_recv >= cutfoff) {
+               size_t to_discard = cutfoff / 2;
+
+               ret = rsock->sock.ops->recvmsg(&rsock->sock, NULL, to_discard, MSG_TRUNC);
+               if (ret <= 0 || ret != to_discard) {
+                       if (ret == 0) {
+                               /* Orderly shutdown. */
+                               DBG("Socket %d has performed an orderly shutdown", rsock->sock.fd);
+                       } else {
+                               DBG("Receiving reply to discard failed on sock %d for size %zu with ret %d",
+                                               rsock->sock.fd, to_discard, ret);
+                       }
+                       ret = -1;
+                       goto error;
+               }
+
+               DBG("Force discard of %zu bytes for socket %d", to_discard, rsock->sock.fd);
+               rsock->bytes_to_ignore_on_recv -= to_discard;
+       }
+
+       DBG3("Relayd ignore reply of %zu bytes for socket %d.", size, rsock->sock.fd);
+       /* Do not wait for the current reply to be ignored */
+       rsock->bytes_to_ignore_on_recv += size;
+       ret = 0;
+
+error:
+       return ret;
+}
+
 /*
  * Receive reply data on socket. This MUST be call after send_command or else
  * could result in unexpected behavior(s).
@@ -98,6 +145,26 @@ static int recv_reply(struct lttcomm_relayd_sock *rsock, void *data, size_t size
                return -ECONNRESET;
        }
 
+       /*
+        * We have to consume the bytes that are marked to ignore.
+        */
+       if (rsock->bytes_to_ignore_on_recv != 0) {
+               ret = rsock->sock.ops->recvmsg(&rsock->sock, NULL, rsock->bytes_to_ignore_on_recv, MSG_TRUNC);
+               if (ret <= 0 || ret != rsock->bytes_to_ignore_on_recv) {
+                       if (ret == 0) {
+                               /* Orderly shutdown. */
+                               DBG("Socket %d has performed an orderly shutdown", rsock->sock.fd);
+                       } else {
+                               DBG("Receiving reply to skip failed on sock %d for size %zu with ret %d",
+                                               rsock->sock.fd, rsock->bytes_to_ignore_on_recv, ret);
+                       }
+                       ret = -1;
+                       goto error;
+               }
+               DBG("Discarded %zu bytes on sock %d", rsock->bytes_to_ignore_on_recv, rsock->sock.fd);
+               rsock->bytes_to_ignore_on_recv = 0;
+       }
+
        DBG3("Relayd waiting for reply of size %zu", size);
 
        ret = rsock->sock.ops->recvmsg(&rsock->sock, data, size, 0);
@@ -626,6 +693,27 @@ int relayd_close(struct lttcomm_relayd_sock *rsock)
                goto end;
        }
 
+       /*
+        * This ensure that we do not close the socket while the lttng-relayd
+        * expects to be able to send a response that we skipped.
+        * While we loose some time to receive everything, this keep the
+        * protocol intact from the point of view of lttng-relayd.
+        */
+       if (rsock->bytes_to_ignore_on_recv != 0) {
+               ret = rsock->sock.ops->recvmsg(&rsock->sock, NULL, rsock->bytes_to_ignore_on_recv, MSG_TRUNC);
+               if (ret <= 0 || ret != rsock->bytes_to_ignore_on_recv) {
+                       if (ret == 0) {
+                               /* Orderly shutdown. */
+                               DBG("Socket %d has performed an orderly shutdown", rsock->sock.fd);
+                       } else {
+                               DBG("Receiving reply to skip failed on sock %d for size %zu with ret %d",
+                                               rsock->sock.fd, rsock->bytes_to_ignore_on_recv, ret);
+                       }
+               }
+               DBG("Discarded %zu bytes on sock %d", rsock->bytes_to_ignore_on_recv, rsock->sock.fd);
+               rsock->bytes_to_ignore_on_recv = 0;
+       }
+
        DBG3("Relayd closing socket %d", rsock->sock.fd);
 
        if (rsock->sock.ops) {
@@ -707,23 +795,16 @@ int relayd_send_close_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_
                goto error;
        }
 
-       /* Receive response */
-       ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
+       /*
+        * Discard response since we do not really care for it and that TCP
+        * guarantee in-order delivery. As for error handling, there is not much
+        * to do at this point (closing).
+        **/
+       ret = recv_reply_ignore(rsock, sizeof(reply));
        if (ret < 0) {
                goto error;
        }
 
-       reply.ret_code = be32toh(reply.ret_code);
-
-       /* Return session id or negative ret code. */
-       if (reply.ret_code != LTTNG_OK) {
-               ret = -1;
-               ERR("Relayd close stream replied error %d", reply.ret_code);
-       } else {
-               /* Success */
-               ret = 0;
-       }
-
        DBG("Relayd close stream id %" PRIu64 " successfully", stream_id);
 
 error:
@@ -969,23 +1050,18 @@ int relayd_send_index(struct lttcomm_relayd_sock *rsock,
                goto error;
        }
 
-       /* Receive response */
-       ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
+       /*
+        * Ignore the response. TCP guarantee in-order arrival and the overall
+        * protocol do not rely on hard ordering between the control and data
+        * socket for index.
+        * Indexes are sent either at the end of the buffer consumption or
+        * during the live timer.
+        */
+       ret = recv_reply_ignore(rsock, sizeof(reply));
        if (ret < 0) {
                goto error;
        }
 
-       reply.ret_code = be32toh(reply.ret_code);
-
-       /* Return session id or negative ret code. */
-       if (reply.ret_code != LTTNG_OK) {
-               ret = -1;
-               ERR("Relayd send index replied error %d", reply.ret_code);
-       } else {
-               /* Success */
-               ret = 0;
-       }
-
 error:
        return ret;
 }
This page took 0.040347 seconds and 5 git commands to generate.