CUSTOM: relayd protocol: ignore reply on relayd_send_index and relayd_send_close_stream
[lttng-tools.git] / src / common / relayd / relayd.c
index 2adcbe415c63f16a176dba9167c247f34eb0eba3..97a5941f75cb63fbd8c38baafaea73612f7b9970 100644 (file)
@@ -72,20 +72,67 @@ static int send_command(struct lttcomm_relayd_sock *rsock,
                memcpy(buf + sizeof(header), data, size);
        }
 
+       DBG3("Relayd sending command %d of size %" PRIu64, (int) cmd, buf_size);
        ret = rsock->sock.ops->sendmsg(&rsock->sock, buf, buf_size, flags);
        if (ret < 0) {
+               PERROR("Failed to send command %d of size %" PRIu64,
+                               (int) cmd, buf_size);
                ret = -errno;
                goto error;
        }
-
-       DBG3("Relayd sending command %d of size %" PRIu64, cmd, buf_size);
-
 error:
        free(buf);
 alloc_error:
        return ret;
 }
 
+static int recv_reply_ignore(struct lttcomm_relayd_sock *rsock, size_t size)
+{
+       int ret;
+       size_t cutfoff = 128;
+
+       /*
+        * To prevent ever growing size of recv_reply to ignore, if the number
+        * of bytes we want to ignore is bigger than `cutoff`, consume half of
+        * the cutoff. We might block on it but still, most of bytes to ignore
+        * should already be ready to consume at this point.
+        *
+        * This kind of scenario can easily happen on stopped session with a
+        * live_timer since no actual receive is done on the socket that would
+        * discard the `ignore` portion.
+        *
+        * TCP guarantee in-order transmission both on send and receive so this
+        * is safe to do.
+        */
+       if (rsock->bytes_to_ignore_on_recv >= cutfoff) {
+               size_t to_discard = cutfoff / 2;
+
+               ret = rsock->sock.ops->recvmsg(&rsock->sock, NULL, to_discard, MSG_TRUNC);
+               if (ret <= 0 || ret != to_discard) {
+                       if (ret == 0) {
+                               /* Orderly shutdown. */
+                               DBG("Socket %d has performed an orderly shutdown", rsock->sock.fd);
+                       } else {
+                               DBG("Receiving reply to discard failed on sock %d for size %zu with ret %d",
+                                               rsock->sock.fd, to_discard, ret);
+                       }
+                       ret = -1;
+                       goto error;
+               }
+
+               DBG("Force discard of %zu bytes for socket %d", to_discard, rsock->sock.fd);
+               rsock->bytes_to_ignore_on_recv -= to_discard;
+       }
+
+       DBG3("Relayd ignore reply of %zu bytes for socket %d.", size, rsock->sock.fd);
+       /* Do not wait for the current reply to be ignored */
+       rsock->bytes_to_ignore_on_recv += size;
+       ret = 0;
+
+error:
+       return ret;
+}
+
 /*
  * Receive reply data on socket. This MUST be call after send_command or else
  * could result in unexpected behavior(s).
@@ -98,6 +145,26 @@ static int recv_reply(struct lttcomm_relayd_sock *rsock, void *data, size_t size
                return -ECONNRESET;
        }
 
+       /*
+        * We have to consume the bytes that are marked to ignore.
+        */
+       if (rsock->bytes_to_ignore_on_recv != 0) {
+               ret = rsock->sock.ops->recvmsg(&rsock->sock, NULL, rsock->bytes_to_ignore_on_recv, MSG_TRUNC);
+               if (ret <= 0 || ret != rsock->bytes_to_ignore_on_recv) {
+                       if (ret == 0) {
+                               /* Orderly shutdown. */
+                               DBG("Socket %d has performed an orderly shutdown", rsock->sock.fd);
+                       } else {
+                               DBG("Receiving reply to skip failed on sock %d for size %zu with ret %d",
+                                               rsock->sock.fd, rsock->bytes_to_ignore_on_recv, ret);
+                       }
+                       ret = -1;
+                       goto error;
+               }
+               DBG("Discarded %zu bytes on sock %d", rsock->bytes_to_ignore_on_recv, rsock->sock.fd);
+               rsock->bytes_to_ignore_on_recv = 0;
+       }
+
        DBG3("Relayd waiting for reply of size %zu", size);
 
        ret = rsock->sock.ops->recvmsg(&rsock->sock, data, size, 0);
@@ -320,6 +387,109 @@ error:
        return ret;
 }
 
+/*
+ * Add stream on the relayd. Send part.
+ *
+ * On success return 0 else return ret_code negative value.
+ */
+int relayd_add_stream_send(struct lttcomm_relayd_sock *rsock, const char *channel_name,
+               const char *pathname, uint64_t tracefile_size, uint64_t tracefile_count)
+{
+       int ret;
+       struct lttcomm_relayd_add_stream msg;
+       struct lttcomm_relayd_add_stream_2_2 msg_2_2;
+
+       /* Code flow error. Safety net. */
+       assert(rsock);
+       assert(channel_name);
+       assert(pathname);
+
+       DBG("Relayd adding stream for channel name %s. Part send", channel_name);
+
+       /* Compat with relayd 2.1 */
+       if (rsock->minor == 1) {
+               memset(&msg, 0, sizeof(msg));
+               if (lttng_strncpy(msg.channel_name, channel_name,
+                               sizeof(msg.channel_name))) {
+                       ret = -1;
+                       goto error;
+               }
+               if (lttng_strncpy(msg.pathname, pathname,
+                               sizeof(msg.pathname))) {
+                       ret = -1;
+                       goto error;
+               }
+
+               /* Send command */
+               ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0);
+               if (ret < 0) {
+                       goto error;
+               }
+       } else {
+               memset(&msg_2_2, 0, sizeof(msg_2_2));
+               /* Compat with relayd 2.2+ */
+               if (lttng_strncpy(msg_2_2.channel_name, channel_name,
+                               sizeof(msg_2_2.channel_name))) {
+                       ret = -1;
+                       goto error;
+               }
+               if (lttng_strncpy(msg_2_2.pathname, pathname,
+                               sizeof(msg_2_2.pathname))) {
+                       ret = -1;
+                       goto error;
+               }
+               msg_2_2.tracefile_size = htobe64(tracefile_size);
+               msg_2_2.tracefile_count = htobe64(tracefile_count);
+
+               /* Send command */
+               ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg_2_2, sizeof(msg_2_2), 0);
+               if (ret < 0) {
+                       goto error;
+               }
+       }
+
+       DBG("Relayd add stream sent for channel name %s.", channel_name);
+       ret = 0;
+
+error:
+       return ret;
+}
+
+int relayd_add_stream_rcv(struct lttcomm_relayd_sock *rsock, uint64_t *_stream_id)
+{
+       int ret;
+       struct lttcomm_relayd_status_stream reply;
+
+       /* Code flow error. Safety net. */
+       assert(rsock);
+
+       /* Waiting for reply */
+       ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
+       if (ret < 0) {
+               goto error;
+       }
+
+       /* Back to host bytes order. */
+       reply.handle = be64toh(reply.handle);
+       reply.ret_code = be32toh(reply.ret_code);
+
+       /* Return session id or negative ret code. */
+       if (reply.ret_code != LTTNG_OK) {
+               ret = -1;
+               ERR("Relayd add stream replied error %d", reply.ret_code);
+       } else {
+               /* Success */
+               ret = 0;
+               *_stream_id = reply.handle;
+       }
+
+       DBG("Relayd stream added successfully with handle %" PRIu64,
+               reply.handle);
+
+error:
+       return ret;
+}
+
 /*
  * Inform the relay that all the streams for the current channel has been sent.
  *
@@ -378,7 +548,8 @@ end:
  * If major versions are compatible, we assign minor_to_use to the
  * minor version of the procotol we are going to use for this session.
  *
- * Return 0 if compatible else negative value.
+ * Return 0 if the two daemons are compatible, LTTNG_ERR_RELAYD_VERSION_FAIL
+ * otherwise, or a negative value on network errors.
  */
 int relayd_version_check(struct lttcomm_relayd_sock *rsock)
 {
@@ -420,7 +591,7 @@ int relayd_version_check(struct lttcomm_relayd_sock *rsock)
         */
        if (msg.major != rsock->major) {
                /* Not compatible */
-               ret = -1;
+               ret = LTTNG_ERR_RELAYD_VERSION_FAIL;
                DBG2("Relayd version is NOT compatible. Relayd version %u != %u (us)",
                                msg.major, rsock->major);
                goto error;
@@ -522,6 +693,27 @@ int relayd_close(struct lttcomm_relayd_sock *rsock)
                goto end;
        }
 
+       /*
+        * This ensure that we do not close the socket while the lttng-relayd
+        * expects to be able to send a response that we skipped.
+        * While we loose some time to receive everything, this keep the
+        * protocol intact from the point of view of lttng-relayd.
+        */
+       if (rsock->bytes_to_ignore_on_recv != 0) {
+               ret = rsock->sock.ops->recvmsg(&rsock->sock, NULL, rsock->bytes_to_ignore_on_recv, MSG_TRUNC);
+               if (ret <= 0 || ret != rsock->bytes_to_ignore_on_recv) {
+                       if (ret == 0) {
+                               /* Orderly shutdown. */
+                               DBG("Socket %d has performed an orderly shutdown", rsock->sock.fd);
+                       } else {
+                               DBG("Receiving reply to skip failed on sock %d for size %zu with ret %d",
+                                               rsock->sock.fd, rsock->bytes_to_ignore_on_recv, ret);
+                       }
+               }
+               DBG("Discarded %zu bytes on sock %d", rsock->bytes_to_ignore_on_recv, rsock->sock.fd);
+               rsock->bytes_to_ignore_on_recv = 0;
+       }
+
        DBG3("Relayd closing socket %d", rsock->sock.fd);
 
        if (rsock->sock.ops) {
@@ -603,23 +795,16 @@ int relayd_send_close_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_
                goto error;
        }
 
-       /* Receive response */
-       ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
+       /*
+        * Discard response since we do not really care for it and that TCP
+        * guarantee in-order delivery. As for error handling, there is not much
+        * to do at this point (closing).
+        **/
+       ret = recv_reply_ignore(rsock, sizeof(reply));
        if (ret < 0) {
                goto error;
        }
 
-       reply.ret_code = be32toh(reply.ret_code);
-
-       /* Return session id or negative ret code. */
-       if (reply.ret_code != LTTNG_OK) {
-               ret = -1;
-               ERR("Relayd close stream replied error %d", reply.ret_code);
-       } else {
-               /* Success */
-               ret = 0;
-       }
-
        DBG("Relayd close stream id %" PRIu64 " successfully", stream_id);
 
 error:
@@ -865,23 +1050,18 @@ int relayd_send_index(struct lttcomm_relayd_sock *rsock,
                goto error;
        }
 
-       /* Receive response */
-       ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
+       /*
+        * Ignore the response. TCP guarantee in-order arrival and the overall
+        * protocol do not rely on hard ordering between the control and data
+        * socket for index.
+        * Indexes are sent either at the end of the buffer consumption or
+        * during the live timer.
+        */
+       ret = recv_reply_ignore(rsock, sizeof(reply));
        if (ret < 0) {
                goto error;
        }
 
-       reply.ret_code = be32toh(reply.ret_code);
-
-       /* Return session id or negative ret code. */
-       if (reply.ret_code != LTTNG_OK) {
-               ret = -1;
-               ERR("Relayd send index replied error %d", reply.ret_code);
-       } else {
-               /* Success */
-               ret = 0;
-       }
-
 error:
        return ret;
 }
This page took 0.028322 seconds and 5 git commands to generate.