}
session->rotate_count++;
- session->rotate_pending = 1;
+ session->rotate_pending = true;
session->rotate_status = LTTNG_ROTATE_STARTED;
/*
} else if (session->rotate_status == LTTNG_ROTATE_EMPTY) {
DBG("Nothing to rotate");
(*pending_return)->status = LTTNG_ROTATE_EMPTY;
+ /* Rotate with a relay */
+ } else if (session->rotate_pending_relay) {
+ /* The consumer has not finished the rotation. */
+ if (session->rotate_pending) {
+ DBG("Session %s, rotate_id %" PRIu64 " still pending",
+ session->name, session->rotate_count);
+ (*pending_return)->status = LTTNG_ROTATE_STARTED;
+ } else {
+ /*
+ * The consumer finished the rotation, but we don't
+ * know if the relay still has data pending. We need
+ * to find one consumer_output to talk to the relay
+ * and ask it.
+ */
+ ret = relay_rotate_pending(session);
+ if (ret == 0) {
+ DBG("Rotate completed on the relay for session %s"
+ ", rotate_id %" PRIu64,
+ session->name,
+ session->rotate_count);
+ (*pending_return)->status = LTTNG_ROTATE_COMPLETED;
+ snprintf((*pending_return)->output_path, PATH_MAX, "%s",
+ session->rotation_chunk.current_rotate_path);
+ } else if (ret == 1) {
+ DBG("Session %s, rotate_id %" PRIu64 " still pending "
+ "on the relay",
+ session->name, session->rotate_count);
+ (*pending_return)->status = LTTNG_ROTATE_STARTED;
+ } else {
+ ERR("Failed to check rotate pending on the relay");
+ (*pending_return)->status = LTTNG_ROTATE_ERROR;
+ }
+ }
} else if (session->rotate_pending) {
DBG("Session %s, rotate_id %" PRIu64 " still pending",
session->name, session->rotate_count);
*/
int consumer_rotate_channel(struct consumer_socket *socket, uint64_t key,
uid_t uid, gid_t gid, struct consumer_output *output,
- char *app_pathname, uint32_t metadata, uint64_t new_chunk_id)
+ char *app_pathname, uint32_t metadata, uint64_t new_chunk_id,
+ bool *rotate_pending_relay)
{
int ret;
struct lttcomm_consumer_msg msg;
output->dst.net.base_dir,
output->chunk_path, app_pathname);
fprintf(stderr, "SENDING: %s\n", msg.u.rotate_channel.pathname);
+ *rotate_pending_relay = true;
} else {
msg.u.rotate_channel.relayd_id = (uint64_t) -1ULL;
snprintf(msg.u.rotate_channel.pathname, PATH_MAX, "%s%s%s",
msg.u.rotate_rename.gid = gid;
snprintf(msg.u.rotate_rename.current_path, PATH_MAX, "%s", current_path);
snprintf(msg.u.rotate_rename.new_path, PATH_MAX, "%s", new_path);
- fprintf(stderr, "rotate rename from %s to %s\n", current_path,
- new_path);
-
if (output->type == CONSUMER_DST_NET) {
msg.u.rotate_rename.relayd_id = output->net_seq_index;
}
health_code_update();
- fprintf(stderr, "send %d to the consumer\n",
- LTTNG_CONSUMER_ROTATE_RENAME);
+ ret = consumer_send_msg(socket, &msg);
+ if (ret < 0) {
+ goto error;
+ }
- ret = 0;
+error:
+ health_code_update();
+ return ret;
+}
+
+/*
+ * Ask the relay if a rotation is still pending. Must be called with the socket
+ * lock held.
+ *
+ * Return 1 if the rotation is still pending, 0 if finished, a negative value
+ * on error.
+ */
+int consumer_rotate_pending_relay(struct consumer_socket *socket,
+ struct consumer_output *output, uint64_t session_id,
+ uint64_t chunk_id)
+{
+ int ret;
+ struct lttcomm_consumer_msg msg;
+ int32_t ret_code = 0;
+
+ assert(socket);
+
+ DBG("Consumer rotate pending on relay for session %" PRIu64, session_id);
+ assert(output->type == CONSUMER_DST_NET);
+
+ memset(&msg, 0, sizeof(msg));
+ msg.cmd_type = LTTNG_CONSUMER_ROTATE_PENDING_RELAY;
+ msg.u.rotate_pending_relay.session_id = session_id;
+ msg.u.rotate_pending_relay.relayd_id = output->net_seq_index;
+ msg.u.rotate_pending_relay.chunk_id = chunk_id;
+
+ health_code_update();
ret = consumer_send_msg(socket, &msg);
if (ret < 0) {
goto error;
}
+ /*
+ * No need for a recv reply status because the answer to the command is
+ * the reply status message.
+ */
+ ret = consumer_socket_recv(socket, &ret_code, sizeof(ret_code));
+ if (ret < 0) {
+ goto error;
+ }
+
+ ret = ret_code;
+
error:
health_code_update();
return ret;
int consumer_rotate_channel(struct consumer_socket *socket, uint64_t key,
uid_t uid, gid_t gid, struct consumer_output *output,
- char *tmp, uint32_t metadata, uint64_t new_chunk_id);
+ char *tmp, uint32_t metadata, uint64_t new_chunk_id,
+ bool *rotate_pending_relay);
int consumer_rotate_rename(struct consumer_socket *socket, uint64_t session_id,
struct consumer_output *output, char *current_path, char *new_path,
uid_t uid, gid_t gid);
+int consumer_rotate_pending_relay(struct consumer_socket *socket,
+ struct consumer_output *output, uint64_t session_id,
+ uint64_t chunk_id);
#endif /* _CONSUMER_H */
ret = consumer_rotate_channel(socket, chan->fd,
ksess->uid, ksess->gid, ksess->consumer,
- "", 0,
- session->rotate_count);
+ "", 0, session->rotate_count,
+ &session->rotate_pending_relay);
if (ret < 0) {
ret = LTTNG_ERR_KERN_CONSUMER_FAIL;
pthread_mutex_unlock(socket->lock);
pthread_mutex_lock(socket->lock);
ret = consumer_rotate_channel(socket, ksess->metadata->fd,
ksess->uid, ksess->gid, ksess->consumer, "", 1,
- session->rotate_count);
+ session->rotate_count,
+ &session->rotate_pending_relay);
if (ret < 0) {
ret = LTTNG_ERR_KERN_CONSUMER_FAIL;
pthread_mutex_unlock(socket->lock);
char *new_path = NULL;
int ret;
- /*
- * TODO 2: on first rotate, the current_rotate_path is the session root
- * path, so move the kernel/ and ust/ folders inside the
- * "session->last_chunk_start_ts-now()"
- */
-
timeinfo = localtime(&ts);
strftime(datetime, sizeof(datetime), "%Y%m%d-%H%M%S", timeinfo);
return ret;
}
+/*
+ * Ask the relay, through the first consumer socket of the session's consumer
+ * output, whether chunk (rotate_count - 1) still has data pending.
+ *
+ * Return 1 if the rotation is still pending on the relay, 0 if it is
+ * completed, -1 on error (no consumer output, no consumer socket, or the
+ * consumer reported a failure).
+ */
+int relay_rotate_pending(struct ltt_session *session)
+{
+	int ret = -1;
+	int asked = 0;
+	struct consumer_socket *socket;
+	/* Stays NULL if no sub-session exists and assert() is compiled out. */
+	struct consumer_output *output = NULL;
+	struct lttng_ht_iter iter;
+
+	/*
+	 * Either one of the sessions is enough to find the consumer_output.
+	 */
+	if (session->kernel_session) {
+		output = session->kernel_session->consumer;
+	} else if (session->ust_session) {
+		output = session->ust_session->consumer;
+	} else {
+		assert(0);
+	}
+
+	if (!output || !output->socks) {
+		ERR("No consumer output found");
+		ret = -1;
+		goto end;
+	}
+
+	rcu_read_lock();
+	/*
+	 * We have to iterate to find a socket, but we only need to send the
+	 * rotate pending command to one consumer, so we break after the first
+	 * one.
+	 */
+	cds_lfht_for_each_entry(output->socks->ht, &iter.iter, socket, node.node) {
+		pthread_mutex_lock(socket->lock);
+		/*
+		 * (rotate_count - 1) is the chunk id that we want to make sure
+		 * is completely flushed to disk on the relay.
+		 */
+		ret = consumer_rotate_pending_relay(socket, output, session->id,
+				session->rotate_count - 1);
+		pthread_mutex_unlock(socket->lock);
+		asked = 1;
+		break;
+	}
+	rcu_read_unlock();
+
+	if (!asked) {
+		/* Nobody could be asked: do not pretend the chunk is complete. */
+		ERR("No consumer socket found to check rotate pending on the relay");
+		ret = -1;
+	} else if (ret < 0) {
+		ERR("Failed to ask the consumer if the rotation is pending on the relay");
+		ret = -1;
+	}
+	/*
+	 * Otherwise ret is the consumer's answer (0: completed, 1: pending)
+	 * and is handed back to the caller untouched.
+	 */
+
+end:
+	return ret;
+}
+
int rename_complete_chunk(struct ltt_session *session, time_t ts);
+int relay_rotate_pending(struct ltt_session *session);
+
#endif /* ROTATE_H */
ERR("Failed to rename completed rotation chunk");
goto end;
}
- channel_info->session->rotate_pending = 0;
+ channel_info->session->rotate_pending = false;
}
channel_rotation_info_destroy(channel_info);
goto error;
}
+ new_session->rotate_pending = false;
+ new_session->rotate_pending_relay = false;
+
/* Add new session to the session list */
session_lock_list();
new_session->id = add_session_list(new_session);
* Number of session rotation for this session.
*/
uint64_t rotate_count;
- unsigned int rotate_pending:1;
+ bool rotate_pending;
+ bool rotate_pending_relay;
enum lttng_rotate_status rotate_status;
/*
* Number of channels waiting for a rotate.
* When this number reaches 0, we can handle the rename of the chunk
* folder and inform the client that the rotate is finished.
- *
- * TODO: replace rotate_pending checks by that.
*/
unsigned int nr_chan_rotate_pending;
struct ltt_session_chunk rotation_chunk;
reg_chan->consumer_key,
usess->uid, usess->gid,
usess->consumer, pathname, 0,
- session->rotate_count);
+ session->rotate_count,
+ &session->rotate_pending_relay);
if (ret < 0) {
goto error;
}
reg->registry->reg.ust->metadata_key,
usess->uid, usess->gid,
usess->consumer, pathname, 1,
- session->rotate_count);
+ session->rotate_count,
+ &session->rotate_pending_relay);
if (ret < 0) {
goto error;
}
ret = consumer_rotate_channel(socket, ua_chan->key,
ua_sess->euid, ua_sess->egid,
ua_sess->consumer, pathname, 0,
- session->rotate_count);
+ session->rotate_count,
+ &session->rotate_pending_relay);
if (ret < 0) {
goto error;
}
ret = consumer_rotate_channel(socket, registry->metadata_key,
ua_sess->euid, ua_sess->egid,
ua_sess->consumer, pathname, 1,
- session->rotate_count);
+ session->rotate_count,
+ &session->rotate_pending_relay);
if (ret < 0) {
goto error;
}
}
if (nr_app == 0 && nr_channels == 0) {
- session->rotate_pending = 0;
+ session->rotate_pending = false;
session->rotate_status = LTTNG_ROTATE_EMPTY;
}
{
return 0;
}
+
+/*
+ * No-op variant of the relay rotate pending check: ignores the session and
+ * always returns 0.
+ * NOTE(review): presumably the !HAVE_LIBLTTNG_UST_CTL stub - confirm.
+ */
+static inline
+int ust_app_relay_rotate_pending(struct ltt_session *session)
+{
+	(void) session;
+
+	return 0;
+}
#endif /* HAVE_LIBLTTNG_UST_CTL */
#endif /* _LTT_UST_APP_H */
return rotate_rename_local(current_path, new_path, uid, gid);
}
}
+
+/*
+ * Look up the relayd socket pair identified by relayd_id and forward the
+ * rotate pending query for chunk_id on its control socket.
+ *
+ * Return the value of relayd_rotate_pending(), or -1 if no relayd matches
+ * relayd_id. session_id is not used by this function.
+ */
+int lttng_consumer_rotate_pending_relay(uint64_t session_id,
+		uint64_t relayd_id, uint64_t chunk_id)
+{
+	struct consumer_relayd_sock_pair *relayd_pair;
+
+	relayd_pair = consumer_find_relayd(relayd_id);
+	if (relayd_pair == NULL) {
+		ERR("Failed to find relayd");
+		return -1;
+	}
+
+	return relayd_rotate_pending(&relayd_pair->control_sock, chunk_id);
+}
LTTNG_CONSUMER_SET_CHANNEL_ROTATE_PIPE,
LTTNG_CONSUMER_ROTATE_CHANNEL,
LTTNG_CONSUMER_ROTATE_RENAME,
+ LTTNG_CONSUMER_ROTATE_PENDING_RELAY,
};
/* State of each fd in consumer */
struct lttng_consumer_local_data *ctx);
int lttng_consumer_rotate_rename(char *current_path, char *new_path,
uid_t uid, gid_t gid, uint64_t relayd_id);
+int lttng_consumer_rotate_pending_relay( uint64_t session_id,
+ uint64_t relayd_id, uint64_t chunk_id);
void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream);
#endif /* LIB_CONSUMER_H */
goto end_nosignal;
}
+ }
+ case LTTNG_CONSUMER_ROTATE_PENDING_RELAY:
+ {
+ DBG("Consumer rotate pending on relay for session %" PRIu64,
+ msg.u.rotate_pending_relay.session_id);
+ ret = lttng_consumer_rotate_pending_relay(
+ msg.u.rotate_pending_relay.session_id,
+ msg.u.rotate_pending_relay.relayd_id,
+ msg.u.rotate_pending_relay.chunk_id);
+ if (ret < 0) {
+ ERR("Rotate pending relay failed");
+ ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+ }
+
+ health_code_update();
+
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto end_nosignal;
+ }
+
}
default:
goto end_nosignal;
msg.stream_id = htobe64(stream_id);
msg.new_chunk_id = htobe64(new_chunk_id);
/*
- * the seq_num is invalid for metadata streams, but it is ignored on
+ * The seq_num is invalid for metadata streams, but it is ignored on
* the relay.
*/
msg.rotate_at_seq_num = htobe64(seq_num);
return ret;
}
+
+/*
+ * Send a RELAYD_ROTATE_PENDING command for chunk_id on the relayd control
+ * socket and wait for the generic reply.
+ *
+ * Return 0 when the relay replies LTTNG_OK, a negative value if sending or
+ * receiving fails or if the relay replies with any other code.
+ *
+ * NOTE(review): no distinct "still pending" value is produced here; confirm
+ * how the relay is expected to report a chunk that is not yet complete.
+ */
+int relayd_rotate_pending(struct lttcomm_relayd_sock *rsock, uint64_t chunk_id)
+{
+	int ret;
+	struct lttcomm_relayd_rotate_pending msg;
+	struct lttcomm_relayd_generic_reply reply;
+
+	/* Code flow error. Safety net. */
+	assert(rsock);
+
+	DBG("Relayd rotate pending");
+
+	memset(&msg, 0, sizeof(msg));
+	/* The chunk id travels in big-endian on the wire. */
+	msg.chunk_id = htobe64(chunk_id);
+
+	/* Send command */
+	ret = send_command(rsock, RELAYD_ROTATE_PENDING, (void *) &msg, sizeof(msg), 0);
+	if (ret < 0) {
+		goto error;
+	}
+
+	/* Receive response */
+	ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
+	if (ret < 0) {
+		goto error;
+	}
+
+	reply.ret_code = be32toh(reply.ret_code);
+
+	/* Return 0 on LTTNG_OK, a negative value otherwise. */
+	if (reply.ret_code != LTTNG_OK) {
+		ret = -1;
+		ERR("Relayd rotate pending replied error %d", reply.ret_code);
+		goto error;
+	}
+
+	/* Success: only log completion when the relay actually accepted. */
+	ret = 0;
+	DBG("Relayd rotate pending completed successfully");
+
+error:
+	return ret;
+}
const char *new_pathname, uint64_t new_chunk_id, uint64_t seq_num);
int relayd_rotate_rename(struct lttcomm_relayd_sock *sock,
const char *current_path, const char *new_path);
+int relayd_rotate_pending(struct lttcomm_relayd_sock *sock,
+ uint64_t chunk_id);
#endif /* _RELAYD_H */
char new_path[LTTNG_PATH_MAX];
} LTTNG_PACKED;
+struct lttcomm_relayd_rotate_pending { /* Payload of RELAYD_ROTATE_PENDING. */
+ uint64_t chunk_id; /* Chunk to query; set with htobe64() by the sender. */
+} LTTNG_PACKED;
+
#endif /* _RELAYD_COMM */
RELAYD_ROTATE_STREAM = 18,
/* Rename a chunk after the rotation is completed (2.11+) */
RELAYD_ROTATE_RENAME = 19,
+ /* Check if a chunk has data pending (2.11+) */
+ RELAYD_ROTATE_PENDING = 20,
};
/*
char new_path[PATH_MAX];
uint64_t relayd_id; /* Relayd id if apply. */
uint64_t session_id;
- uint32_t create; /* Create new_path before move. */
uint32_t uid;
uint32_t gid;
} LTTNG_PACKED rotate_rename;
+ struct {
+ uint64_t relayd_id;
+ uint64_t session_id;
+ uint64_t chunk_id;
+ } LTTNG_PACKED rotate_pending_relay;
} u;
} LTTNG_PACKED;
goto end_nosignal;
}
+ }
+ case LTTNG_CONSUMER_ROTATE_PENDING_RELAY:
+ {
+ DBG("Consumer rotate pending on relay for session %" PRIu64,
+ msg.u.rotate_pending_relay.session_id);
+ ret = lttng_consumer_rotate_pending_relay(
+ msg.u.rotate_pending_relay.session_id,
+ msg.u.rotate_pending_relay.relayd_id,
+ msg.u.rotate_pending_relay.chunk_id);
+ if (ret < 0) {
+ ERR("Rotate pending relay failed");
+ ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+ }
+
+ health_code_update();
+
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto end_nosignal;
+ }
+
}
default:
break;