return ret;
}
+/*
+ * Ask the consumer to rotate a channel.
+ * app_pathname only used for UST, it contains the path after /ust/.
+ *
+ * The new_chunk_id is the session->rotate_count that has been incremented
+ * when the rotation started. On the relay, this allows to keep track in which
+ * chunk each stream is currently writing to (for the rotate_pending operation).
+ */
int consumer_rotate_channel(struct consumer_socket *socket, uint64_t key,
uid_t uid, gid_t gid, struct consumer_output *output,
- char *tmp, uint32_t metadata)
+ char *app_pathname, uint32_t metadata, uint64_t new_chunk_id,
+ bool *rotate_pending_relay)
{
int ret;
struct lttcomm_consumer_msg msg;
msg.cmd_type = LTTNG_CONSUMER_ROTATE_CHANNEL;
msg.u.rotate_channel.key = key;
msg.u.rotate_channel.metadata = metadata;
-
+ msg.u.rotate_channel.new_chunk_id = new_chunk_id;
if (output->type == CONSUMER_DST_NET) {
- fprintf(stderr, "SUBDIR: %s\n", output->subdir);
+ fprintf(stderr, "BASE: %s\n", output->dst.net.base_dir);
+ fprintf(stderr, "CHUNK: %s\n", output->chunk_path);
msg.u.rotate_channel.relayd_id = output->net_seq_index;
- snprintf(msg.u.rotate_channel.pathname, PATH_MAX, "%s/%s/%s",
- output->subdir,
- output->chunk_path, tmp);
+ snprintf(msg.u.rotate_channel.pathname, PATH_MAX, "%s%s%s",
+ output->dst.net.base_dir,
+ output->chunk_path, app_pathname);
+ fprintf(stderr, "SENDING: %s\n", msg.u.rotate_channel.pathname);
+ *rotate_pending_relay = true;
} else {
msg.u.rotate_channel.relayd_id = (uint64_t) -1ULL;
- snprintf(msg.u.rotate_channel.pathname, PATH_MAX, "%s/%s/%s",
+ snprintf(msg.u.rotate_channel.pathname, PATH_MAX, "%s%s%s",
output->dst.session_root_path,
- output->chunk_path, tmp);
+ output->chunk_path, app_pathname);
fprintf(stderr, "rotate to %s\n",
msg.u.rotate_channel.pathname);
-
}
health_code_update();
int consumer_rotate_rename(struct consumer_socket *socket, uint64_t session_id,
struct consumer_output *output, char *current_path, char *new_path,
- uint32_t create, uid_t uid, gid_t gid)
+ uid_t uid, gid_t gid)
{
int ret;
struct lttcomm_consumer_msg msg;
memset(&msg, 0, sizeof(msg));
msg.cmd_type = LTTNG_CONSUMER_ROTATE_RENAME;
msg.u.rotate_rename.session_id = session_id;
- msg.u.rotate_rename.create = create;
msg.u.rotate_rename.uid = uid;
msg.u.rotate_rename.gid = gid;
+ snprintf(msg.u.rotate_rename.current_path, PATH_MAX, "%s", current_path);
+ snprintf(msg.u.rotate_rename.new_path, PATH_MAX, "%s", new_path);
if (output->type == CONSUMER_DST_NET) {
- fprintf(stderr, "SUBDIR: %s\n", output->subdir);
- fprintf(stderr, "SUBDIR: %s\n", output->dst.net.control.subdir);
- ERR("TODO");
- ret = -1;
msg.u.rotate_rename.relayd_id = output->net_seq_index;
- goto error;
} else {
msg.u.rotate_rename.relayd_id = (uint64_t) -1ULL;
- snprintf(msg.u.rotate_rename.current_path, PATH_MAX, "%s",
- current_path);
- snprintf(msg.u.rotate_rename.new_path, PATH_MAX, "%s",
- new_path);
- fprintf(stderr, "rotate rename from %s to %s\n", current_path,
- new_path);
}
health_code_update();
- fprintf(stderr, "send %d to the consumer\n",
- LTTNG_CONSUMER_ROTATE_RENAME);
+ ret = consumer_send_msg(socket, &msg);
+ if (ret < 0) {
+ goto error;
+ }
- ret = 0;
+error:
+ health_code_update();
+ return ret;
+}
+
+/*
+ * Ask the relay if a rotation is still pending. Must be called with the socket
+ * lock held.
+ *
+ * Return 1 if the rotation is still pending, 0 if finished, a negative value
+ * on error.
+ */
+int consumer_rotate_pending_relay(struct consumer_socket *socket,
+ struct consumer_output *output, uint64_t session_id,
+ uint64_t chunk_id)
+{
+ int ret;
+ struct lttcomm_consumer_msg msg;
+ int32_t ret_code = 0;
+
+ assert(socket);
+
+ DBG("Consumer rotate pending on relay for session %" PRIu64, session_id);
+ assert(output->type == CONSUMER_DST_NET);
+
+ memset(&msg, 0, sizeof(msg));
+ msg.cmd_type = LTTNG_CONSUMER_ROTATE_PENDING_RELAY;
+ msg.u.rotate_pending_relay.session_id = session_id;
+ msg.u.rotate_pending_relay.relayd_id = output->net_seq_index;
+ msg.u.rotate_pending_relay.chunk_id = chunk_id;
+
+ health_code_update();
ret = consumer_send_msg(socket, &msg);
if (ret < 0) {
goto error;
}
+ /*
+ * No need for a recv reply status because the answer to the command is
+ * the reply status message.
+ */
+ ret = consumer_socket_recv(socket, &ret_code, sizeof(ret_code));
+ if (ret < 0) {
+ goto error;
+ }
+
+ ret = ret_code;
+
error:
health_code_update();
return ret;