Implement the RELAYD_ROTATE_PENDING relay daemon command
authorJulien Desfossez <jdesfossez@efficios.com>
Thu, 14 Dec 2017 16:00:30 +0000 (11:00 -0500)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Wed, 14 Mar 2018 01:12:33 +0000 (21:12 -0400)
This command allows the sessiond to check if a rotation is complete
from the relayd point of view. There can be a significant delay
between the time the consumer has finished extracting the data from
the buffers and the time the relay has finished writing them on disk,
and we can only inform the user that the rotation is complete when all
the data is on disk. So the RELAYD_ROTATE_PENDING command is used to
poll the relayd after the consumer has finished extracting the data
until everything is on the relayd disk.

This command also takes care of streams that did not exist on the
consumer when the rotation started, or streams that appeared after the
last rotation started. The chunk_id field is used to distinguish those
cases.

Signed-off-by: Julien Desfossez <jdesfossez@efficios.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
src/bin/lttng-relayd/main.c
src/common/sessiond-comm/relayd.h
src/common/sessiond-comm/sessiond-comm.h

index e8283f037e43a3a9429b57ec0e606a44097192f2..a57b1d4b063e2d02f200fcd07f860276b1d6ae64 100644 (file)
@@ -2789,6 +2789,112 @@ end_no_reply:
        return ret;
 }
 
+/*
+ * Check if all the streams in the session have completed the last rotation.
+ * The chunk_id value is used to distinguish the cases where a stream was
+ * closed on the consumerd before the rotation started but it still active on
+ * the relayd, and the case where a stream appeared on the consumerd/relayd
+ * after the last rotation started (in that case, it is already writing in the
+ * new chunk folder).
+ */
+static
+int relay_rotate_pending(struct lttcomm_relayd_hdr *recv_hdr,
+               struct relay_connection *conn)
+{
+       struct relay_session *session = conn->session;
+       struct lttcomm_relayd_rotate_pending msg;
+       struct lttcomm_relayd_generic_reply reply;
+       struct lttng_ht_iter iter;
+       struct relay_stream *stream;
+       int ret;
+       ssize_t network_ret;
+       uint64_t chunk_id;
+        bool rotate_pending = false;
+
+       DBG("Rotate pending command received");
+
+       if (!session || !conn->version_check_done) {
+               ERR("Trying to check for data before version check");
+               ret = -1;
+               goto end_no_reply;
+       }
+
+       if (session->major == 2 && session->minor < 11) {
+               ERR("Unsupported feature before 2.11");
+               ret = -1;
+               goto end_no_reply;
+       }
+
+       network_ret = conn->sock->ops->recvmsg(conn->sock, &msg, sizeof(msg), 0);
+       if (network_ret < (ssize_t) sizeof(msg)) {
+               if (network_ret == 0) {
+                       /* Orderly shutdown. Not necessary to print an error. */
+                       DBG("Socket %d did an orderly shutdown", conn->sock->fd);
+               } else {
+                       ERR("Relay didn't receive valid rotate_pending struct size : %zi",
+                                       network_ret);
+               }
+               ret = -1;
+               goto end_no_reply;
+       }
+
+       chunk_id = be64toh(msg.chunk_id);
+       DBG("Evaluating rotate pending for chunk id %" PRIu64, chunk_id);
+
+       /*
+        * Iterate over all the streams in the session and check if they are
+        * still waiting for data to perform their rotation.
+        */
+       rcu_read_lock();
+       cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
+                       node.node) {
+               if (!stream_get(stream)) {
+                       continue;
+               }
+               if (stream->trace->session != session) {
+                       stream_put(stream);
+                       continue;
+               }
+               pthread_mutex_lock(&stream->lock);
+               if (stream->rotate_at_seq_num != -1ULL) {
+                       /* We have not yet performed the rotation. */
+                       rotate_pending = true;
+                       DBG("Stream %" PRIu64 " is still rotating",
+                                       stream->stream_handle);
+               } else if (stream->chunk_id < chunk_id) {
+                       /*
+                        * Stream closed on the consumer but still active on the
+                        * relay.
+                        */
+                       rotate_pending = true;
+                       DBG("Stream %" PRIu64 " did not exist on the consumer "
+                                       "when the last rotation started, but is"
+                                       "still waiting for data before getting"
+                                       "closed",
+                                       stream->stream_handle);
+               }
+               pthread_mutex_unlock(&stream->lock);
+               stream_put(stream);
+               if (rotate_pending) {
+                       goto send_reply;
+               }
+       }
+
+send_reply:
+       rcu_read_unlock();
+       memset(&reply, 0, sizeof(reply));
+       reply.ret_code = htobe32(rotate_pending ? 1 : 0);
+       network_ret = conn->sock->ops->sendmsg(conn->sock, &reply,
+                       sizeof(reply), 0);
+       if (network_ret < (ssize_t) sizeof(reply)) {
+               ERR("Relay rotate pending ret code failed");
+               ret = -1;
+       }
+
+end_no_reply:
+       return ret;
+}
+
 /*
  * Process the commands received on the control socket
  */
@@ -2843,6 +2949,9 @@ static int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr,
        case RELAYD_ROTATE_RENAME:
                ret = relay_rotate_rename(recv_hdr, conn);
                break;
+       case RELAYD_ROTATE_PENDING:
+               ret = relay_rotate_pending(recv_hdr, conn);
+               break;
        case RELAYD_MKDIR:
                ret = relay_mkdir(recv_hdr, conn);
                break;
index 444c43af0aae454b32ecb8b86aedf33ed681f1b2..9a733df1e7aaa68c5c6e252988012fca0eda6eda 100644 (file)
@@ -212,6 +212,10 @@ struct lttcomm_relayd_rotate_rename {
        char paths[];
 } LTTNG_PACKED;
 
+struct lttcomm_relayd_rotate_pending {
+       uint64_t chunk_id;
+} LTTNG_PACKED;
+
 struct lttcomm_relayd_mkdir {
        /* Includes trailing NULL */
        uint32_t length;
index 78c7f5e2b6b9a1fc3cbf8a5fc55c15f1bc2d87d9..22d04770ffa1cd9bfc40c41c3cf84831dfbf4884 100644 (file)
@@ -127,6 +127,8 @@ enum lttcomm_relayd_command {
        RELAYD_ROTATE_STREAM                = 18,
        /* Rename a chunk after the rotation is completed (2.11+) */
        RELAYD_ROTATE_RENAME                = 19,
+       /* Check if a chunk has data pending (2.11+) */
+       RELAYD_ROTATE_PENDING               = 20,
        /* Create a folder on the relayd FS (2.11+) */
        RELAYD_MKDIR                        = 21,
 };
This page took 0.030359 seconds and 5 git commands to generate.