return ret;
}
+static
+int rotate_index_file(struct relay_stream *stream)
+{
+ int ret;
+ uint32_t major, minor;
+
+ /* Put ref on previous index_file. */
+ if (stream->index_file) {
+ lttng_index_file_put(stream->index_file);
+ stream->index_file = NULL;
+ }
+ major = stream->trace->session->major;
+ minor = stream->trace->session->minor;
+ stream->index_file = lttng_index_file_create(stream->path_name,
+ stream->channel_name,
+ -1, -1, stream->tracefile_size,
+ tracefile_array_get_file_index_head(stream->tfa),
+ lttng_to_index_major(major, minor),
+ lttng_to_index_minor(major, minor));
+ if (!stream->index_file) {
+ ret = -1;
+ goto end;
+ }
+
+ ret = 0;
+
+end:
+ return ret;
+}
+
/*
* relay_rotate_stream: rotate a stream to a new tracefile for the session
* rotation feature (not the tracefile rotation feature).
be64toh(stream_info.stream_id), stream_info.new_pathname);
pthread_mutex_lock(&stream->lock);
+
+ /* Update the trace path (just the folder, the stream name does not change). */
free(stream->path_name);
stream->path_name = create_output_path(stream_info.new_pathname);
if (!stream->path_name) {
goto end_stream_unlock;
}
+ /* Rotate also the index if the stream is not a metadata stream. */
+ if (!stream->is_metadata) {
+ ret = rotate_index_file(stream);
+ if (ret < 0) {
+ ERR("Failed to rotate index file");
+ goto end_stream_unlock;
+ }
+ }
+
end_stream_unlock:
pthread_mutex_unlock(&stream->lock);
stream_put(stream);
return ret;
}
+/*
+ * relay_rotate_rename: rename the trace folder after the rotation is
+ * complete. We are not closing any fd here, just moving the folder, so it
+ * works even if data is still in flight.
+ */
+static int relay_rotate_rename(struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_connection *conn)
+{
+ int ret, send_ret;
+ struct relay_session *session = conn->session;
+ struct lttcomm_relayd_rotate_rename stream_info;
+ struct lttcomm_relayd_generic_reply reply;
+ size_t len;
+ char *old = NULL, *new = NULL;
+
+ DBG("Rotate rename received");
+
+ if (!session || conn->version_check_done == 0) {
+ ERR("Trying to rename before version check");
+ ret = -1;
+ goto end_no_session;
+ }
+
+ if (session->minor < 11) {
+ ERR("Unsupported feature before 2.11");
+ ret = -1;
+ goto end_no_session;
+ }
+
+ ret = conn->sock->ops->recvmsg(conn->sock, &stream_info,
+ sizeof(stream_info), 0);
+ if (ret < sizeof(stream_info)) {
+ if (ret == 0) {
+ /* Orderly shutdown. Not necessary to print an error. */
+ DBG("Socket %d did an orderly shutdown", conn->sock->fd);
+ } else {
+ ERR("Relay didn't receive valid rotate_rename struct size : %d", ret);
+ }
+ ret = -1;
+ goto end_no_session;
+ }
+
+ len = lttng_strnlen(stream_info.current_path,
+ sizeof(stream_info.current_path));
+ /* Ensure that NULL-terminated and fits in local filename length. */
+ if (len == sizeof(stream_info.current_path) || len >= LTTNG_NAME_MAX) {
+ ret = -ENAMETOOLONG;
+ ERR("Path name too long");
+ goto end;
+ }
+
+ len = lttng_strnlen(stream_info.new_path,
+ sizeof(stream_info.new_path));
+ /* Ensure that NULL-terminated and fits in local filename length. */
+ if (len == sizeof(stream_info.new_path) || len >= LTTNG_NAME_MAX) {
+ ret = -ENAMETOOLONG;
+ ERR("Path name too long");
+ goto end;
+ }
+
+ fprintf(stderr, "Renaming %s to %s/\n", stream_info.current_path,
+ stream_info.new_path);
+
+ old = create_output_path(stream_info.current_path);
+ if (!old) {
+ ERR("Failed to create current output path");
+ ret = -1;
+ goto end;
+ }
+
+ new = create_output_path(stream_info.new_path);
+ if (!new) {
+ ERR("Failed to create new output path");
+ ret = -1;
+ goto end;
+ }
+
+ ret = utils_mkdir_recursive(new, S_IRWXU | S_IRWXG,
+ -1, -1);
+ if (ret < 0) {
+ ERR("relay creating output directory");
+ goto end;
+ }
+
+ ret = rename(old, new);
+ if (ret < 0 && errno != ENOENT) {
+ PERROR("Rename completed rotation chunk");
+ goto end;
+ }
+
+end:
+ memset(&reply, 0, sizeof(reply));
+ if (ret < 0) {
+ reply.ret_code = htobe32(LTTNG_ERR_UNK);
+ } else {
+ reply.ret_code = htobe32(LTTNG_OK);
+ }
+ send_ret = conn->sock->ops->sendmsg(conn->sock, &reply,
+ sizeof(struct lttcomm_relayd_generic_reply), 0);
+ if (send_ret < 0) {
+ ERR("Relay sending stream id");
+ ret = send_ret;
+ }
+
+end_no_session:
+ free(old);
+ free(new);
+ return ret;
+}
/*
* Process the commands received on the control socket
case RELAYD_ROTATE_STREAM:
ret = relay_rotate_session_stream(recv_hdr, conn);
break;
+ case RELAYD_ROTATE_RENAME:
+ ret = relay_rotate_rename(recv_hdr, conn);
+ break;
case RELAYD_UPDATE_SYNC_INFO:
default:
ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd));
}
if (rotate_index || !stream->index_file) {
- uint32_t major, minor;
-
- /* Put ref on previous index_file. */
- if (stream->index_file) {
- lttng_index_file_put(stream->index_file);
- stream->index_file = NULL;
- }
- major = stream->trace->session->major;
- minor = stream->trace->session->minor;
- stream->index_file = lttng_index_file_create(stream->path_name,
- stream->channel_name,
- -1, -1, stream->tracefile_size,
- tracefile_array_get_file_index_head(stream->tfa),
- lttng_to_index_major(major, minor),
- lttng_to_index_minor(major, minor));
- if (!stream->index_file) {
- ret = -1;
+ ret = rotate_index_file(stream);
+ if (ret < 0) {
+ ERR("Failed to rotate index");
/* Put self-ref for this index due to error. */
relay_index_put(index);
index = NULL;
return 0;
}
+static
+const char *get_base_path(struct ltt_session *session,
+ struct consumer_output *consumer)
+{
+ if (session->net_handle > 0) {
+ return consumer->dst.net.base_dir;
+ } else {
+ return consumer->dst.session_root_path;
+ }
+}
+
/*
* Command LTTNG_ROTATE_SESSION from the lttng-ctl library.
*
/* Special case for the first rotation. */
if (session->rotate_count == 0) {
+ const char *base_path = NULL;
+
/* Either one of the two sessions is enough to get the root path. */
if (session->kernel_session) {
- snprintf(session->rotation_chunk.current_rotate_path,
- PATH_MAX, "%s",
- session->kernel_session->consumer->dst.session_root_path);
+ base_path = get_base_path(session, session->kernel_session->consumer);
} else if (session->ust_session) {
- snprintf(session->rotation_chunk.current_rotate_path,
- PATH_MAX, "%s",
- session->ust_session->consumer->dst.session_root_path);
+ base_path = get_base_path(session, session->ust_session->consumer);
} else {
assert(0);
}
+ assert(base_path);
+ snprintf(session->rotation_chunk.current_rotate_path,
+ PATH_MAX, "%s",
+ base_path);
+ fprintf(stderr, "b: %s\n", base_path);
} else {
/*
* The currently active tracing path is now the folder we
/* The active path for the next rotation/destroy. */
snprintf(session->rotation_chunk.active_tracing_path,
PATH_MAX, "%s/%s-",
- session->kernel_session->consumer->dst.session_root_path,
+ get_base_path(session, session->kernel_session->consumer),
datetime);
/* The sub-directory for the consumer. */
snprintf(session->kernel_session->consumer->chunk_path,
if (session->ust_session) {
snprintf(session->rotation_chunk.active_tracing_path,
PATH_MAX, "%s/%s-",
- session->ust_session->consumer->dst.session_root_path,
+ get_base_path(session, session->ust_session->consumer),
datetime);
snprintf(session->ust_session->consumer->chunk_path,
PATH_MAX, "/%s-", datetime);
int consumer_rotate_rename(struct consumer_socket *socket, uint64_t session_id,
struct consumer_output *output, char *current_path, char *new_path,
- uint32_t create, uid_t uid, gid_t gid)
+ uid_t uid, gid_t gid)
{
int ret;
struct lttcomm_consumer_msg msg;
memset(&msg, 0, sizeof(msg));
msg.cmd_type = LTTNG_CONSUMER_ROTATE_RENAME;
msg.u.rotate_rename.session_id = session_id;
- msg.u.rotate_rename.create = create;
msg.u.rotate_rename.uid = uid;
msg.u.rotate_rename.gid = gid;
+ snprintf(msg.u.rotate_rename.current_path, PATH_MAX, "%s", current_path);
+ snprintf(msg.u.rotate_rename.new_path, PATH_MAX, "%s", new_path);
+ fprintf(stderr, "rotate rename from %s to %s\n", current_path,
+ new_path);
+
if (output->type == CONSUMER_DST_NET) {
- fprintf(stderr, "SUBDIR: %s\n", output->subdir);
- fprintf(stderr, "SUBDIR: %s\n", output->dst.net.control.subdir);
- ERR("TODO");
- ret = -1;
msg.u.rotate_rename.relayd_id = output->net_seq_index;
- goto error;
} else {
msg.u.rotate_rename.relayd_id = (uint64_t) -1ULL;
- snprintf(msg.u.rotate_rename.current_path, PATH_MAX, "%s",
- current_path);
- snprintf(msg.u.rotate_rename.new_path, PATH_MAX, "%s",
- new_path);
- fprintf(stderr, "rotate rename from %s to %s\n", current_path,
- new_path);
}
health_code_update();
char *tmp, uint32_t metadata);
int consumer_rotate_rename(struct consumer_socket *socket, uint64_t session_id,
struct consumer_output *output, char *current_path, char *new_path,
- uint32_t create, uid_t uid, gid_t gid);
+ uid_t uid, gid_t gid);
#endif /* _CONSUMER_H */
}
int session_rename_chunk(struct ltt_session *session, char *current_path,
- char *new_path, uint32_t create)
+ char *new_path)
{
int ret;
struct consumer_socket *socket;
cds_lfht_for_each_entry(output->socks->ht, &iter.iter, socket, node.node) {
pthread_mutex_lock(socket->lock);
ret = consumer_rotate_rename(socket, session->id, output,
- current_path, new_path, create, uid, gid);
+ current_path, new_path, uid, gid);
pthread_mutex_unlock(socket->lock);
if (ret) {
ERR("Consumer rename chunk");
}
/* Current domain path: <session>/kernel */
- snprintf(tmppath, PATH_MAX, "%s/%s",
- consumer->dst.session_root_path, consumer->subdir);
+ if (session->net_handle > 0) {
+ snprintf(tmppath, PATH_MAX, "%s/%s",
+ consumer->dst.net.base_dir, consumer->subdir);
+ } else {
+ snprintf(tmppath, PATH_MAX, "%s/%s",
+ consumer->dst.session_root_path, consumer->subdir);
+ }
/* New domain path: <session>/<start-date>-<end-date>-<rotate-count>/kernel */
snprintf(tmppath2, PATH_MAX, "%s/%s",
new_path, consumer->subdir);
+ fprintf(stderr, "A: %s, B: %s, C: %s\n",
+ consumer->dst.net.base_dir, consumer->subdir, new_path);
/*
* Move the per-domain folder inside the first rotation
* folder.
*/
- ret = session_rename_chunk(session, tmppath, tmppath2, 1);
+ ret = session_rename_chunk(session, tmppath, tmppath2);
if (ret < 0) {
ERR("Rename first trace directory");
ret = -LTTNG_ERR_ROTATE_NO_DATA;
ret = session_rename_chunk(session,
session->rotation_chunk.current_rotate_path,
- new_path, 0);
+ new_path);
if (ret) {
ERR("Session rename");
ret = 0;
struct ltt_session *session);
int session_rename_chunk(struct ltt_session *session, char *current_path,
- char *new_path, uint32_t create);
+ char *new_path);
int rename_complete_chunk(struct ltt_session *session, time_t ts);
return ret;
}
-int lttng_consumer_rotate_rename(char *current_path, char *new_path,
- uint32_t create, uid_t uid, gid_t gid)
+static
+int rotate_rename_local(char *current_path, char *new_path,
+ uid_t uid, gid_t gid)
{
int ret;
- if (create) {
- ret = utils_mkdir_recursive(new_path, S_IRWXU | S_IRWXG,
- uid, gid);
- if (ret < 0) {
- ERR("Create directory on rotate");
- goto end;
- }
-
+ ret = utils_mkdir_recursive(new_path, S_IRWXU | S_IRWXG,
+ uid, gid);
+ if (ret < 0) {
+ ERR("Create directory on rotate");
+ goto end;
}
+
ret = rename(current_path, new_path);
/*
* If a domain has not yet created its channel, the domain-specific
end:
return ret;
}
+
+static
+int rotate_rename_relay(char *current_path, char *new_path, uint64_t relayd_id)
+{
+ int ret;
+ struct consumer_relayd_sock_pair *relayd;
+
+ relayd = consumer_find_relayd(relayd_id);
+ if (!relayd) {
+ ERR("Failed to find relayd");
+ ret = -1;
+ goto end;
+ }
+
+ ret = relayd_rotate_rename(&relayd->control_sock, current_path, new_path);
+
+end:
+ return ret;
+
+}
+
+int lttng_consumer_rotate_rename(char *current_path, char *new_path,
+ uid_t uid, gid_t gid, uint64_t relayd_id)
+{
+ if (relayd_id != (uint64_t) -1ULL) {
+ return rotate_rename_relay(current_path, new_path, relayd_id);
+ } else {
+ return rotate_rename_local(current_path, new_path, uid, gid);
+ }
+}
int lttng_consumer_rotate_ready_streams(uint64_t key,
struct lttng_consumer_local_data *ctx);
int lttng_consumer_rotate_rename(char *current_path, char *new_path,
- uint32_t create, uid_t uid, gid_t gid);
+ uid_t uid, gid_t gid, uint64_t relayd_id);
void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream);
#endif /* LIB_CONSUMER_H */
msg.u.rotate_rename.session_id);
ret = lttng_consumer_rotate_rename(msg.u.rotate_rename.current_path,
msg.u.rotate_rename.new_path,
- msg.u.rotate_rename.create,
msg.u.rotate_rename.uid,
- msg.u.rotate_rename.gid);
+ msg.u.rotate_rename.gid,
+ msg.u.rotate_rename.relayd_id);
if (ret < 0) {
ERR("Rotate rename failed");
ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
DBG("Relayd rotated stream id %" PRIu64 " successfully", stream_id);
+error:
+ return ret;
+}
+
+int relayd_rotate_rename(struct lttcomm_relayd_sock *rsock,
+ const char *current_path, const char *new_path)
+{
+ int ret;
+ struct lttcomm_relayd_rotate_rename msg;
+ struct lttcomm_relayd_generic_reply reply;
+
+ /* Code flow error. Safety net. */
+ assert(rsock);
+
+ DBG("Relayd rename chunk %s to %s", current_path, new_path);
+
+ memset(&msg, 0, sizeof(msg));
+ if (lttng_strncpy(msg.current_path, current_path,
+ sizeof(msg.current_path))) {
+ ret = -1;
+ goto error;
+ }
+ if (lttng_strncpy(msg.new_path, new_path,
+ sizeof(msg.new_path))) {
+ ret = -1;
+ goto error;
+ }
+
+ /* Send command */
+ ret = send_command(rsock, RELAYD_ROTATE_RENAME, (void *) &msg, sizeof(msg), 0);
+ if (ret < 0) {
+ goto error;
+ }
+
+ /* Receive response */
+ ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
+ if (ret < 0) {
+ goto error;
+ }
+
+ reply.ret_code = be32toh(reply.ret_code);
+
+ /* Return session id or negative ret code. */
+ if (reply.ret_code != LTTNG_OK) {
+ ret = -1;
+ ERR("Relayd rotate rename replied error %d", reply.ret_code);
+ } else {
+ /* Success */
+ ret = 0;
+ }
+
+ DBG("Relayd rotate rename completed successfully");
+
error:
return ret;
uint64_t stream_id, uint64_t version);
int relayd_rotate_stream(struct lttcomm_relayd_sock *sock, uint64_t stream_id,
const char *new_pathname);
+int relayd_rotate_rename(struct lttcomm_relayd_sock *sock,
+ const char *current_path, const char *new_path);
#endif /* _RELAYD_H */
char new_pathname[LTTNG_PATH_MAX];
} LTTNG_PACKED;
+struct lttcomm_relayd_rotate_rename {
+ char current_path[LTTNG_PATH_MAX];
+ char new_path[LTTNG_PATH_MAX];
+} LTTNG_PACKED;
+
#endif /* _RELAYD_COMM */
RELAYD_RESET_METADATA = 17,
/* Ask the relay to rotate a stream file (2.11+) */
RELAYD_ROTATE_STREAM = 18,
+ /* Rename a chunk after the rotation is completed (2.11+) */
+ RELAYD_ROTATE_RENAME = 19,
};
/*
msg.u.rotate_rename.session_id);
ret = lttng_consumer_rotate_rename(msg.u.rotate_rename.current_path,
msg.u.rotate_rename.new_path,
- msg.u.rotate_rename.create,
msg.u.rotate_rename.uid,
- msg.u.rotate_rename.gid);
+ msg.u.rotate_rename.gid,
+ msg.u.rotate_rename.relayd_id);
if (ret < 0) {
ERR("Rotate rename failed");
ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;