return ret;
}
+static
+int relay_rotate_pending(struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_connection *conn)
+{
+ struct relay_session *session = conn->session;
+ struct lttcomm_relayd_rotate_pending msg;
+ struct lttcomm_relayd_generic_reply reply;
+ struct lttng_ht_iter iter;
+ struct relay_stream *stream;
+ int ret;
+ uint64_t chunk_id;
+ uint32_t rotate_pending;
+
+ DBG("Rotate pending command received");
+ fprintf(stderr, "Rotate pending command received\n");
+
+ if (!session || conn->version_check_done == 0) {
+ ERR("Trying to check for data before version check");
+ ret = -1;
+ goto end_no_session;
+ }
+
+ if (session->minor < 11) {
+ ERR("Unsupported feature before 2.11");
+ ret = -1;
+ goto end_no_session;
+ }
+
+ ret = conn->sock->ops->recvmsg(conn->sock, &msg, sizeof(msg), 0);
+ if (ret < sizeof(msg)) {
+ if (ret == 0) {
+ /* Orderly shutdown. Not necessary to print an error. */
+ DBG("Socket %d did an orderly shutdown", conn->sock->fd);
+ } else {
+ ERR("Relay didn't receive valid rotate_pending struct size : %d",
+ ret);
+ }
+ ret = -1;
+ goto end_no_session;
+ }
+
+ chunk_id = be64toh(msg.chunk_id);
+
+ rotate_pending = 0;
+
+ rcu_read_lock();
+ cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
+ node.node) {
+ if (!stream_get(stream)) {
+ continue;
+ }
+ if (stream->trace->session != session) {
+ stream_put(stream);
+ continue;
+ }
+ pthread_mutex_lock(&stream->lock);
+ if (stream->rotate_at_seq_num != -1ULL) {
+ rotate_pending = 1;
+ DBG("Stream %" PRIu64 " is still rotating",
+ stream->stream_handle);
+ } else if (stream->chunk_id < chunk_id) {
+ rotate_pending = 1;
+ DBG("Stream %" PRIu64 " did not exist on the consumer "
+ "when the last rotation started, but is"
+ "still waiting for data before getting"
+ "closed",
+ stream->stream_handle);
+ }
+ pthread_mutex_unlock(&stream->lock);
+ stream_put(stream);
+ if (rotate_pending) {
+ goto send_reply;
+ }
+ }
+ rcu_read_unlock();
+
+send_reply:
+ memset(&reply, 0, sizeof(reply));
+ reply.ret_code = htobe32(rotate_pending);
+ ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0);
+ if (ret < 0) {
+ ERR("Relay rotate pending ret code failed");
+ }
+
+end_no_session:
+ return ret;
+}
+
/*
* Process the commands received on the control socket
*/
case RELAYD_ROTATE_RENAME:
ret = relay_rotate_rename(recv_hdr, conn);
break;
+ case RELAYD_ROTATE_PENDING:
+ ret = relay_rotate_pending(recv_hdr, conn);
+ break;
case RELAYD_UPDATE_SYNC_INFO:
default:
ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd));
goto error;
}
+ /*
+ * If the user did not wait for the previous rotation to complete
+ * (--no-wait), we have to ensure now that the relay had time to
+ * receive all the data pending from the previous rotation.
+ */
+ if (session->rotate_pending_relay) {
+ ret = relay_rotate_pending(session,
+ session->rotate_count - 1);
+ if (ret == 0) {
+ DBG("Previous rotation completed on the relay for session %s"
+ ", rotate_id %" PRIu64,
+ session->name,
+ session->rotate_count);
+ session->rotate_pending_relay = 0;
+ } else if (ret == 1) {
+ DBG("Session %s, rotate_id %" PRIu64 " still pending "
+ "on the relay",
+ session->name, session->rotate_count);
+ ret = -LTTNG_ERR_ROTATE_PENDING;
+ goto error;
+ } else {
+ ERR("Failed to check rotate pending on the relay");
+ ret = -LTTNG_ERR_UNK;
+ }
+ }
+
/* Special case for the first rotation. */
if (session->rotate_count == 0) {
const char *base_path = NULL;
} else {
/*
* The consumer finished the rotation, but we don't
- * know if the relay still has data pending. We need
- * to find one consumer_output to talk to the relay
- * and ask it.
+ * know if the relay still has data pending. We need to
+ * find one consumer_output to talk to the relay and
+ * ask it.
+ *
+ * (rotate_count - 1) is the chunk id that we want to
+ * make sure is completely flushed to disk on the
+ * relay.
*/
- ret = relay_rotate_pending(session);
+ ret = relay_rotate_pending(session,
+ session->rotate_count - 1);
if (ret == 0) {
DBG("Rotate completed on the relay for session %s"
", rotate_id %" PRIu64,
session->name,
session->rotate_count);
+ session->rotate_pending_relay = 0;
(*pending_return)->status = LTTNG_ROTATE_COMPLETED;
snprintf((*pending_return)->output_path, PATH_MAX, "%s",
session->rotation_chunk.current_rotate_path);
{
int ret;
struct lttcomm_consumer_msg msg;
- int32_t ret_code = 0;
+ uint32_t pending = 0;
assert(socket);
goto error;
}
- /*
- * No need for a recv reply status because the answer to the command is
- * the reply status message.
- */
- ret = consumer_socket_recv(socket, &ret_code, sizeof(ret_code));
+ ret = consumer_socket_recv(socket, &pending, sizeof(pending));
if (ret < 0) {
goto error;
}
- ret = ret_code;
+ ret = pending;
error:
health_code_update();
int consumer_rotate_rename(struct consumer_socket *socket, uint64_t session_id,
struct consumer_output *output, char *current_path, char *new_path,
uid_t uid, gid_t gid);
-int consumer_rotate_pending_relay(struct consumer_socket *socket,
+int consumer_rotate_pending_relay(struct consumer_socket *socket,
struct consumer_output *output, uint64_t session_id,
uint64_t chunk_id);
return ret;
}
-int relay_rotate_pending(struct ltt_session *session)
+int relay_rotate_pending(struct ltt_session *session, uint64_t chunk_id)
{
int ret;
struct consumer_socket *socket;
goto end;
}
+ ret = -1;
+
rcu_read_lock();
/*
* We have to iterate to find a socket, but we only need to send the
*/
cds_lfht_for_each_entry(output->socks->ht, &iter.iter, socket, node.node) {
pthread_mutex_lock(socket->lock);
- /*
- * (rotate_count - 1) is the chunk id that we want to make sure
- * is completely flushed to disk on the relay.
- */
ret = consumer_rotate_pending_relay(socket, output, session->id,
- session->rotate_count - 1);
+ chunk_id);
pthread_mutex_unlock(socket->lock);
- if (ret) {
- ERR("Consumer rename chunk");
- ret = -1;
- rcu_read_unlock();
- goto end;
- }
break;
}
rcu_read_unlock();
- ret = 0;
-
end:
return ret;
}
int rename_complete_chunk(struct ltt_session *session, time_t ts);
-int relay_rotate_pending(struct ltt_session *session);
+int relay_rotate_pending(struct ltt_session *session, uint64_t chunk_id);
#endif /* ROTATE_H */
/* Somehow, the session daemon is not responding anymore. */
goto end_nosignal;
}
-
+ break;
}
case LTTNG_CONSUMER_ROTATE_PENDING_RELAY:
{
+ uint32_t pending;
+
DBG("Consumer rotate pending on relay for session %" PRIu64,
msg.u.rotate_pending_relay.session_id);
- ret = lttng_consumer_rotate_pending_relay(
+ pending = lttng_consumer_rotate_pending_relay(
msg.u.rotate_pending_relay.session_id,
msg.u.rotate_pending_relay.relayd_id,
msg.u.rotate_pending_relay.chunk_id);
goto end_nosignal;
}
+ /* Send back returned value to session daemon */
+ ret = lttcomm_send_unix_sock(sock, &pending, sizeof(pending));
+ if (ret < 0) {
+ PERROR("send data pending ret code");
+ goto error_fatal;
+ }
+ break;
}
default:
goto end_nosignal;
reply.ret_code = be32toh(reply.ret_code);
/* Return session id or negative ret code. */
- if (reply.ret_code != LTTNG_OK) {
+ if (reply.ret_code >= LTTNG_OK) {
ret = -1;
ERR("Relayd rotate pending replied error %d", reply.ret_code);
} else {
- /* Success */
- ret = 0;
+ /* No error, just rotate pending state */
+ ret = reply.ret_code;
}
DBG("Relayd rotate pending completed successfully");
}
health_code_update();
+ ret_code = ret;
ret = consumer_send_status_msg(sock, ret_code);
if (ret < 0) {