rotate pending working on the relay
author Julien Desfossez <jdesfossez@efficios.com>
Mon, 11 Sep 2017 20:21:01 +0000 (16:21 -0400)
committer Julien Desfossez <jdesfossez@efficios.com>
Mon, 11 Sep 2017 20:21:01 +0000 (16:21 -0400)
Signed-off-by: Julien Desfossez <jdesfossez@efficios.com>
src/bin/lttng-relayd/main.c
src/bin/lttng-sessiond/cmd.c
src/bin/lttng-sessiond/consumer.c
src/bin/lttng-sessiond/consumer.h
src/bin/lttng-sessiond/rotate.c
src/bin/lttng-sessiond/rotate.h
src/common/kernel-consumer/kernel-consumer.c
src/common/relayd/relayd.c
src/common/ust-consumer/ust-consumer.c

index 348e161b77899e96500ee9a0f4425effc623bd09..d38cdfecb9c6cd6069d848c5e4cfb5566dc9a04e 100644 (file)
@@ -2452,6 +2452,94 @@ end_no_session:
        return ret;
 }
 
+/*
+ * relay_rotate_pending: handle the RELAYD_ROTATE_PENDING control command.
+ *
+ * Receives a struct lttcomm_relayd_rotate_pending from the connection and
+ * answers with a struct lttcomm_relayd_generic_reply whose ret_code is 1
+ * when at least one stream of the connection's session is still rotating
+ * (or is still on a chunk older than the requested chunk_id), 0 otherwise.
+ *
+ * Requires that the version check was done and that the session minor
+ * version is at least 11.
+ *
+ * Returns -1 on validation or receive error, otherwise the value returned
+ * by the socket sendmsg operation (negative if sending the reply failed).
+ */
+static
+int relay_rotate_pending(struct lttcomm_relayd_hdr *recv_hdr,
+               struct relay_connection *conn)
+{
+       struct relay_session *session = conn->session;
+       struct lttcomm_relayd_rotate_pending msg;
+       struct lttcomm_relayd_generic_reply reply;
+       struct lttng_ht_iter iter;
+       struct relay_stream *stream;
+       int ret;
+       uint64_t chunk_id;
+       uint32_t rotate_pending;
+
+       DBG("Rotate pending command received");
+
+       if (!session || conn->version_check_done == 0) {
+               ERR("Trying to check for data before version check");
+               ret = -1;
+               goto end_no_session;
+       }
+
+       if (session->minor < 11) {
+               ERR("Unsupported feature before 2.11");
+               ret = -1;
+               goto end_no_session;
+       }
+
+       ret = conn->sock->ops->recvmsg(conn->sock, &msg, sizeof(msg), 0);
+       /*
+        * Test for a negative value before comparing against sizeof():
+        * a signed/unsigned comparison would convert a negative error code
+        * to a huge unsigned value and let the error go unnoticed.
+        */
+       if (ret < 0 || (size_t) ret < sizeof(msg)) {
+               if (ret == 0) {
+                       /* Orderly shutdown. Not necessary to print an error. */
+                       DBG("Socket %d did an orderly shutdown", conn->sock->fd);
+               } else {
+                       ERR("Relay didn't receive valid rotate_pending struct size : %d",
+                                       ret);
+               }
+               ret = -1;
+               goto end_no_session;
+       }
+
+       /* The chunk id is received in big-endian byte order. */
+       chunk_id = be64toh(msg.chunk_id);
+
+       rotate_pending = 0;
+
+       rcu_read_lock();
+       cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
+                       node.node) {
+               if (!stream_get(stream)) {
+                       continue;
+               }
+               /* Only the streams of this connection's session matter. */
+               if (stream->trace->session != session) {
+                       stream_put(stream);
+                       continue;
+               }
+               pthread_mutex_lock(&stream->lock);
+               if (stream->rotate_at_seq_num != -1ULL) {
+                       rotate_pending = 1;
+                       DBG("Stream %" PRIu64 " is still rotating",
+                                       stream->stream_handle);
+               } else if (stream->chunk_id < chunk_id) {
+                       rotate_pending = 1;
+                       DBG("Stream %" PRIu64 " did not exist on the consumer "
+                                       "when the last rotation started, but is "
+                                       "still waiting for data before getting "
+                                       "closed",
+                                       stream->stream_handle);
+               }
+               pthread_mutex_unlock(&stream->lock);
+               stream_put(stream);
+               if (rotate_pending) {
+                       /*
+                        * One pending stream is enough. Use break (not a
+                        * goto past the loop) so that rcu_read_unlock()
+                        * below always runs.
+                        */
+                       break;
+               }
+       }
+       rcu_read_unlock();
+
+       memset(&reply, 0, sizeof(reply));
+       reply.ret_code = htobe32(rotate_pending);
+       ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0);
+       if (ret < 0) {
+               ERR("Relay rotate pending ret code failed");
+       }
+
+end_no_session:
+       return ret;
+}
+
 /*
  * Process the commands received on the control socket
  */
@@ -2506,6 +2594,9 @@ static int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr,
        case RELAYD_ROTATE_RENAME:
                ret = relay_rotate_rename(recv_hdr, conn);
                break;
+       case RELAYD_ROTATE_PENDING:
+               ret = relay_rotate_pending(recv_hdr, conn);
+               break;
        case RELAYD_UPDATE_SYNC_INFO:
        default:
                ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd));
index 88f33c1568a77b21630f57d9486deafd61536763..cb32f7f7fb63419373af1be65635576a13cfc49a 100644 (file)
@@ -4177,6 +4177,32 @@ int cmd_rotate_session(struct ltt_session *session,
                goto error;
        }
 
+       /*
+        * If the user did not wait for the previous rotation to complete
+        * (--no-wait), we have to ensure now that the relay had time to
+        * receive all the data pending from the previous rotation.
+        */
+       if (session->rotate_pending_relay) {
+               ret = relay_rotate_pending(session,
+                               session->rotate_count - 1);
+               if (ret == 0) {
+                       DBG("Previous rotation completed on the relay for session %s"
+                                       ", rotate_id %" PRIu64,
+                                       session->name,
+                                       session->rotate_count);
+                       session->rotate_pending_relay = 0;
+               } else if (ret == 1) {
+                       DBG("Session %s, rotate_id %" PRIu64 " still pending "
+                                       "on the relay",
+                                       session->name, session->rotate_count);
+                       ret = -LTTNG_ERR_ROTATE_PENDING;
+                       goto error;
+               } else {
+                       ERR("Failed to check rotate pending on the relay");
+                       ret = -LTTNG_ERR_UNK;
+               }
+       }
+
        /* Special case for the first rotation. */
        if (session->rotate_count == 0) {
                const char *base_path = NULL;
@@ -4310,16 +4336,22 @@ int cmd_rotate_pending(struct ltt_session *session,
                } else {
                        /*
                         * The consumer finished the rotation, but we don't
-                        * know if the relay still has data pending. We need
-                        * to find one consumer_output to talk to the relay
-                        * and ask it.
+                        * know if the relay still has data pending. We need to
+                        * find one consumer_output to talk to the relay and
+                        * ask it.
+                        *
+                        * (rotate_count - 1) is the chunk id that we want to
+                        * make sure is completely flushed to disk on the
+                        * relay.
                         */
-                       ret = relay_rotate_pending(session);
+                       ret = relay_rotate_pending(session,
+                                       session->rotate_count - 1);
                        if (ret == 0) {
                                DBG("Rotate completed on the relay for session %s"
                                                ", rotate_id %" PRIu64,
                                                session->name,
                                                session->rotate_count);
+                               session->rotate_pending_relay = 0;
                                (*pending_return)->status = LTTNG_ROTATE_COMPLETED;
                                snprintf((*pending_return)->output_path, PATH_MAX, "%s",
                                                session->rotation_chunk.current_rotate_path);
index f901466f8d96e9f35590506b829e436a4d9bcd03..9cc0350130915feda11e375a32e95e5efcbb2c94 100644 (file)
@@ -1697,7 +1697,7 @@ int consumer_rotate_pending_relay(struct consumer_socket *socket,
 {
        int ret;
        struct lttcomm_consumer_msg msg;
-       int32_t ret_code = 0;
+       uint32_t pending = 0;
 
        assert(socket);
 
@@ -1716,16 +1716,12 @@ int consumer_rotate_pending_relay(struct consumer_socket *socket,
                goto error;
        }
 
-       /*
-        * No need for a recv reply status because the answer to the command is
-        * the reply status message.
-        */
-       ret = consumer_socket_recv(socket, &ret_code, sizeof(ret_code));
+       ret = consumer_socket_recv(socket, &pending, sizeof(pending));
        if (ret < 0) {
                goto error;
        }
 
-       ret = ret_code;
+       ret = pending;
 
 error:
        health_code_update();
index 60f84a5fa839b7cb558df0507aae43e45df0ac99..1c6e8a3464bb2f97f6915a68919b928cf2584292 100644 (file)
@@ -331,7 +331,7 @@ int consumer_rotate_channel(struct consumer_socket *socket, uint64_t key,
 int consumer_rotate_rename(struct consumer_socket *socket, uint64_t session_id,
                struct consumer_output *output, char *current_path, char *new_path,
                uid_t uid, gid_t gid);
-int consumer_rotate_pending_relay(struct consumer_socket *socket, 
+int consumer_rotate_pending_relay(struct consumer_socket *socket,
                struct consumer_output *output, uint64_t session_id,
                uint64_t chunk_id);
 
index 498a3b67e0e2c564fb15fe48d97788783d94ab21..98406f4298fee1b55b91362e6714460d2c7f9495 100644 (file)
@@ -295,7 +295,7 @@ end:
        return ret;
 }
 
-int relay_rotate_pending(struct ltt_session *session)
+int relay_rotate_pending(struct ltt_session *session, uint64_t chunk_id)
 {
        int ret;
        struct consumer_socket *socket;
@@ -320,6 +320,8 @@ int relay_rotate_pending(struct ltt_session *session)
                goto end;
        }
 
+       ret = -1;
+
        rcu_read_lock();
        /*
         * We have to iterate to find a socket, but we only need to send the
@@ -328,25 +330,13 @@ int relay_rotate_pending(struct ltt_session *session)
         */
        cds_lfht_for_each_entry(output->socks->ht, &iter.iter, socket, node.node) {
                pthread_mutex_lock(socket->lock);
-               /*
-                * (rotate_count - 1) is the chunk id that we want to make sure
-                * is completely flushed to disk on the relay.
-                */
                ret = consumer_rotate_pending_relay(socket, output, session->id,
-                               session->rotate_count - 1);
+                               chunk_id);
                pthread_mutex_unlock(socket->lock);
-               if (ret) {
-                       ERR("Consumer rename chunk");
-                       ret = -1;
-                       rcu_read_unlock();
-                       goto end;
-               }
                break;
        }
        rcu_read_unlock();
 
-       ret = 0;
-
 end:
        return ret;
 }
index 652ff4d3ec89c531c3b032389b708347a07d88ff..ecec8024da9d9463345e6179fe0b154fff306232 100644 (file)
@@ -32,6 +32,6 @@ int session_rename_chunk(struct ltt_session *session, char *current_path,
 
 int rename_complete_chunk(struct ltt_session *session, time_t ts);
 
-int relay_rotate_pending(struct ltt_session *session);
+int relay_rotate_pending(struct ltt_session *session, uint64_t chunk_id);
 
 #endif /* ROTATE_H */
index 91722f3a32a58a11c0a0135dbb80ca4feff7f934..b957309258f0c38654020abd33355c0b4b3ee1c6 100644 (file)
@@ -1192,13 +1192,15 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        /* Somehow, the session daemon is not responding anymore. */
                        goto end_nosignal;
                }
-
+               break;
        }
        case LTTNG_CONSUMER_ROTATE_PENDING_RELAY:
        {
+               uint32_t pending;
+
                DBG("Consumer rotate pending on relay for session %" PRIu64,
                                msg.u.rotate_pending_relay.session_id);
-               ret = lttng_consumer_rotate_pending_relay(
+               pending = lttng_consumer_rotate_pending_relay(
                                msg.u.rotate_pending_relay.session_id,
                                msg.u.rotate_pending_relay.relayd_id,
                                msg.u.rotate_pending_relay.chunk_id);
@@ -1215,6 +1217,13 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        goto end_nosignal;
                }
 
+               /* Send back returned value to session daemon */
+               ret = lttcomm_send_unix_sock(sock, &pending, sizeof(pending));
+               if (ret < 0) {
+                       PERROR("send data pending ret code");
+                       goto error_fatal;
+               }
+               break;
        }
        default:
                goto end_nosignal;
index 339ed85ab535080ee79320a59e20e534956b8bd8..af9f1783527dc46ef46718094e5e23bb0faf4fec 100644 (file)
@@ -1117,12 +1117,12 @@ int relayd_rotate_pending(struct lttcomm_relayd_sock *rsock, uint64_t chunk_id)
        reply.ret_code = be32toh(reply.ret_code);
 
        /* Return session id or negative ret code. */
-       if (reply.ret_code != LTTNG_OK) {
+       if (reply.ret_code >= LTTNG_OK) {
                ret = -1;
                ERR("Relayd rotate pending replied error %d", reply.ret_code);
        } else {
-               /* Success */
-               ret = 0;
+               /* No error, just rotate pending state */
+               ret = reply.ret_code;
        }
 
        DBG("Relayd rotate pending completed successfully");
index 0c022ade7e9eaa60798ceffebd450270a5799f5a..d2552cb63f451a8d7416ca4d0a696e0bc4904244 100644 (file)
@@ -2006,6 +2006,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                }
 
                health_code_update();
+               ret_code = ret;
 
                ret = consumer_send_status_msg(sock, ret_code);
                if (ret < 0) {
This page took 0.034155 seconds and 5 git commands to generate.