From: Julien Desfossez
Date: Mon, 11 Sep 2017 20:21:01 +0000 (-0400)
Subject: rotate pending working on the relay
X-Git-Url: http://git.efficios.com/?a=commitdiff_plain;ds=sidebyside;h=735c026fa8751c367790182f5c46c9f79ed5e62c;p=deliverable%2Flttng-tools.git

rotate pending working on the relay

Signed-off-by: Julien Desfossez
---

diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c
index 348e161b7..d38cdfecb 100644
--- a/src/bin/lttng-relayd/main.c
+++ b/src/bin/lttng-relayd/main.c
@@ -2452,6 +2452,112 @@ end_no_session:
 	return ret;
 }
 
+/*
+ * relay_rotate_pending: handle the RELAYD_ROTATE_PENDING command.
+ *
+ * Receive a struct lttcomm_relayd_rotate_pending from the peer, then check
+ * whether any stream of the connection's session is still rotating or is
+ * still in a chunk older than the requested chunk id. The answer (1 if a
+ * rotation is still pending, 0 otherwise) is sent back in the ret_code field
+ * of a struct lttcomm_relayd_generic_reply.
+ *
+ * Return the value of the reply sendmsg() call (negative on error), or -1
+ * if the command is rejected or its payload cannot be received.
+ */
+static
+int relay_rotate_pending(struct lttcomm_relayd_hdr *recv_hdr,
+		struct relay_connection *conn)
+{
+	struct relay_session *session = conn->session;
+	struct lttcomm_relayd_rotate_pending msg;
+	struct lttcomm_relayd_generic_reply reply;
+	struct lttng_ht_iter iter;
+	struct relay_stream *stream;
+	int ret;
+	uint64_t chunk_id;
+	uint32_t rotate_pending = 0;
+
+	DBG("Rotate pending command received");
+
+	if (!session || conn->version_check_done == 0) {
+		ERR("Trying to check for data before version check");
+		ret = -1;
+		goto end_no_session;
+	}
+
+	if (session->minor < 11) {
+		ERR("Unsupported feature before 2.11");
+		ret = -1;
+		goto end_no_session;
+	}
+
+	ret = conn->sock->ops->recvmsg(conn->sock, &msg, sizeof(msg), 0);
+	/*
+	 * Test for a negative value first: comparing a negative int directly
+	 * against sizeof() converts it to a large unsigned value, which would
+	 * let a receive error go undetected.
+	 */
+	if (ret < 0 || (size_t) ret < sizeof(msg)) {
+		if (ret == 0) {
+			/* Orderly shutdown. Not necessary to print an error. */
+			DBG("Socket %d did an orderly shutdown", conn->sock->fd);
+		} else {
+			ERR("Relay didn't receive valid rotate_pending struct size : %d",
+					ret);
+		}
+		ret = -1;
+		goto end_no_session;
+	}
+
+	chunk_id = be64toh(msg.chunk_id);
+
+	rcu_read_lock();
+	cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
+			node.node) {
+		if (!stream_get(stream)) {
+			continue;
+		}
+		if (stream->trace->session != session) {
+			stream_put(stream);
+			continue;
+		}
+		pthread_mutex_lock(&stream->lock);
+		if (stream->rotate_at_seq_num != -1ULL) {
+			rotate_pending = 1;
+			DBG("Stream %" PRIu64 " is still rotating",
+					stream->stream_handle);
+		} else if (stream->chunk_id < chunk_id) {
+			rotate_pending = 1;
+			DBG("Stream %" PRIu64 " did not exist on the consumer "
+					"when the last rotation started, but is "
+					"still waiting for data before getting "
+					"closed",
+					stream->stream_handle);
+		}
+		pthread_mutex_unlock(&stream->lock);
+		stream_put(stream);
+		if (rotate_pending) {
+			/*
+			 * One pending stream is enough to answer. Leave the
+			 * loop with break so that rcu_read_unlock() below
+			 * still runs.
+			 */
+			break;
+		}
+	}
+	rcu_read_unlock();
+
+	memset(&reply, 0, sizeof(reply));
+	reply.ret_code = htobe32(rotate_pending);
+	ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0);
+	if (ret < 0) {
+		ERR("Relay rotate pending ret code failed");
+	}
+
+end_no_session:
+	return ret;
+}
+
 /*
  * Process the commands received on the control socket
  */
@@ -2506,6 +2612,9 @@ static int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr,
 	case RELAYD_ROTATE_RENAME:
 		ret = relay_rotate_rename(recv_hdr, conn);
 		break;
+	case RELAYD_ROTATE_PENDING:
+		ret = relay_rotate_pending(recv_hdr, conn);
+		break;
 	case RELAYD_UPDATE_SYNC_INFO:
 	default:
 		ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd));
diff --git a/src/bin/lttng-sessiond/cmd.c b/src/bin/lttng-sessiond/cmd.c
index 88f33c156..cb32f7f7f 100644
--- a/src/bin/lttng-sessiond/cmd.c
+++ b/src/bin/lttng-sessiond/cmd.c
@@ -4177,6 +4177,32 @@ int cmd_rotate_session(struct ltt_session *session,
 		goto error;
 	}
 
+	
/* + * If the user did not wait for the previous rotation to complete + * (--no-wait), we have to ensure now that the relay had time to + * receive all the data pending from the previous rotation. + */ + if (session->rotate_pending_relay) { + ret = relay_rotate_pending(session, + session->rotate_count - 1); + if (ret == 0) { + DBG("Previous rotation completed on the relay for session %s" + ", rotate_id %" PRIu64, + session->name, + session->rotate_count); + session->rotate_pending_relay = 0; + } else if (ret == 1) { + DBG("Session %s, rotate_id %" PRIu64 " still pending " + "on the relay", + session->name, session->rotate_count); + ret = -LTTNG_ERR_ROTATE_PENDING; + goto error; + } else { + ERR("Failed to check rotate pending on the relay"); + ret = -LTTNG_ERR_UNK; + } + } + /* Special case for the first rotation. */ if (session->rotate_count == 0) { const char *base_path = NULL; @@ -4310,16 +4336,22 @@ int cmd_rotate_pending(struct ltt_session *session, } else { /* * The consumer finished the rotation, but we don't - * know if the relay still has data pending. We need - * to find one consumer_output to talk to the relay - * and ask it. + * know if the relay still has data pending. We need to + * find one consumer_output to talk to the relay and + * ask it. + * + * (rotate_count - 1) is the chunk id that we want to + * make sure is completely flushed to disk on the + * relay. 
*/ - ret = relay_rotate_pending(session); + ret = relay_rotate_pending(session, + session->rotate_count - 1); if (ret == 0) { DBG("Rotate completed on the relay for session %s" ", rotate_id %" PRIu64, session->name, session->rotate_count); + session->rotate_pending_relay = 0; (*pending_return)->status = LTTNG_ROTATE_COMPLETED; snprintf((*pending_return)->output_path, PATH_MAX, "%s", session->rotation_chunk.current_rotate_path); diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index f901466f8..9cc035013 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -1697,7 +1697,7 @@ int consumer_rotate_pending_relay(struct consumer_socket *socket, { int ret; struct lttcomm_consumer_msg msg; - int32_t ret_code = 0; + uint32_t pending = 0; assert(socket); @@ -1716,16 +1716,12 @@ int consumer_rotate_pending_relay(struct consumer_socket *socket, goto error; } - /* - * No need for a recv reply status because the answer to the command is - * the reply status message. 
- */ - ret = consumer_socket_recv(socket, &ret_code, sizeof(ret_code)); + ret = consumer_socket_recv(socket, &pending, sizeof(pending)); if (ret < 0) { goto error; } - ret = ret_code; + ret = pending; error: health_code_update(); diff --git a/src/bin/lttng-sessiond/consumer.h b/src/bin/lttng-sessiond/consumer.h index 60f84a5fa..1c6e8a346 100644 --- a/src/bin/lttng-sessiond/consumer.h +++ b/src/bin/lttng-sessiond/consumer.h @@ -331,7 +331,7 @@ int consumer_rotate_channel(struct consumer_socket *socket, uint64_t key, int consumer_rotate_rename(struct consumer_socket *socket, uint64_t session_id, struct consumer_output *output, char *current_path, char *new_path, uid_t uid, gid_t gid); -int consumer_rotate_pending_relay(struct consumer_socket *socket, +int consumer_rotate_pending_relay(struct consumer_socket *socket, struct consumer_output *output, uint64_t session_id, uint64_t chunk_id); diff --git a/src/bin/lttng-sessiond/rotate.c b/src/bin/lttng-sessiond/rotate.c index 498a3b67e..98406f429 100644 --- a/src/bin/lttng-sessiond/rotate.c +++ b/src/bin/lttng-sessiond/rotate.c @@ -295,7 +295,7 @@ end: return ret; } -int relay_rotate_pending(struct ltt_session *session) +int relay_rotate_pending(struct ltt_session *session, uint64_t chunk_id) { int ret; struct consumer_socket *socket; @@ -320,6 +320,8 @@ int relay_rotate_pending(struct ltt_session *session) goto end; } + ret = -1; + rcu_read_lock(); /* * We have to iterate to find a socket, but we only need to send the @@ -328,25 +330,13 @@ int relay_rotate_pending(struct ltt_session *session) */ cds_lfht_for_each_entry(output->socks->ht, &iter.iter, socket, node.node) { pthread_mutex_lock(socket->lock); - /* - * (rotate_count - 1) is the chunk id that we want to make sure - * is completely flushed to disk on the relay. 
- */ ret = consumer_rotate_pending_relay(socket, output, session->id, - session->rotate_count - 1); + chunk_id); pthread_mutex_unlock(socket->lock); - if (ret) { - ERR("Consumer rename chunk"); - ret = -1; - rcu_read_unlock(); - goto end; - } break; } rcu_read_unlock(); - ret = 0; - end: return ret; } diff --git a/src/bin/lttng-sessiond/rotate.h b/src/bin/lttng-sessiond/rotate.h index 652ff4d3e..ecec8024d 100644 --- a/src/bin/lttng-sessiond/rotate.h +++ b/src/bin/lttng-sessiond/rotate.h @@ -32,6 +32,6 @@ int session_rename_chunk(struct ltt_session *session, char *current_path, int rename_complete_chunk(struct ltt_session *session, time_t ts); -int relay_rotate_pending(struct ltt_session *session); +int relay_rotate_pending(struct ltt_session *session, uint64_t chunk_id); #endif /* ROTATE_H */ diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 91722f3a3..b95730925 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -1192,13 +1192,15 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* Somehow, the session daemon is not responding anymore. 
*/ goto end_nosignal; } - + break; } case LTTNG_CONSUMER_ROTATE_PENDING_RELAY: { + uint32_t pending; + DBG("Consumer rotate pending on relay for session %" PRIu64, msg.u.rotate_pending_relay.session_id); - ret = lttng_consumer_rotate_pending_relay( + pending = lttng_consumer_rotate_pending_relay( msg.u.rotate_pending_relay.session_id, msg.u.rotate_pending_relay.relayd_id, msg.u.rotate_pending_relay.chunk_id); @@ -1215,6 +1217,13 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, goto end_nosignal; } + /* Send back returned value to session daemon */ + ret = lttcomm_send_unix_sock(sock, &pending, sizeof(pending)); + if (ret < 0) { + PERROR("send data pending ret code"); + goto error_fatal; + } + break; } default: goto end_nosignal; diff --git a/src/common/relayd/relayd.c b/src/common/relayd/relayd.c index 339ed85ab..af9f17835 100644 --- a/src/common/relayd/relayd.c +++ b/src/common/relayd/relayd.c @@ -1117,12 +1117,12 @@ int relayd_rotate_pending(struct lttcomm_relayd_sock *rsock, uint64_t chunk_id) reply.ret_code = be32toh(reply.ret_code); /* Return session id or negative ret code. */ - if (reply.ret_code != LTTNG_OK) { + if (reply.ret_code >= LTTNG_OK) { ret = -1; ERR("Relayd rotate pending replied error %d", reply.ret_code); } else { - /* Success */ - ret = 0; + /* No error, just rotate pending state */ + ret = reply.ret_code; } DBG("Relayd rotate pending completed successfully"); diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 0c022ade7..d2552cb63 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -2006,6 +2006,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } health_code_update(); + ret_code = ret; ret = consumer_send_status_msg(sock, ret_code); if (ret < 0) {