return ret;
}
+static
+int rotate_index_file(struct relay_stream *stream)
+{
+ int ret;
+ uint32_t major, minor;
+
+ /* Put ref on previous index_file. */
+ if (stream->index_file) {
+ lttng_index_file_put(stream->index_file);
+ stream->index_file = NULL;
+ }
+ major = stream->trace->session->major;
+ minor = stream->trace->session->minor;
+ stream->index_file = lttng_index_file_create(stream->path_name,
+ stream->channel_name,
+ -1, -1, stream->tracefile_size,
+ tracefile_array_get_file_index_head(stream->tfa),
+ lttng_to_index_major(major, minor),
+ lttng_to_index_minor(major, minor));
+ if (!stream->index_file) {
+ ret = -1;
+ goto end;
+ }
+
+ ret = 0;
+
+end:
+ return ret;
+}
+
+/*
+ * Check if a stream should perform a rotation (for session rotation).
+ * Must be called with the stream lock held.
+ *
+ * Return 0 on success, a negative value on error.
+ */
+static
+int check_rotate_stream(struct relay_stream *stream)
+{
+ int ret;
+
+ /* No rotation expected */
+ if (stream->rotate_at_seq_num == -1ULL) {
+ ret = 0;
+ goto end;
+ }
+
+ if (stream->prev_seq < stream->rotate_at_seq_num) {
+ DBG("Stream %" PRIu64 " no yet ready for rotation",
+ stream->stream_handle);
+ fprintf(stderr, "Stream %" PRIu64 " no yet ready for rotation\n",
+ stream->stream_handle);
+ ret = 0;
+ goto end;
+ } else if (stream->prev_seq > stream->rotate_at_seq_num) {
+ DBG("Rotation after too much data has been written in tracefile "
+ "for stream %" PRIu64 ", need to truncate before "
+ "rotating", stream->stream_handle);
+ fprintf(stderr, "Rotation after too much data has been written in tracefile "
+ "for stream %" PRIu64 ", need to truncate before "
+ "rotating\n", stream->stream_handle);
+ /* TODO */
+ } else {
+ DBG("Stream %" PRIu64 " ready for rotation", stream->stream_handle);
+ fprintf(stderr, "Stream %" PRIu64 " ready for rotation\n", stream->stream_handle);
+ }
+
+ /* Perform the stream rotation. */
+ ret = utils_rotate_stream_file(stream->path_name,
+ stream->channel_name, stream->tracefile_size,
+ stream->tracefile_count, -1,
+ -1, stream->stream_fd->fd,
+ NULL, &stream->stream_fd->fd);
+ if (ret < 0) {
+ ERR("Rotating stream output file");
+ goto end;
+ }
+ stream->tracefile_size_current = 0;
+
+ /* Rotate also the index if the stream is not a metadata stream. */
+ if (!stream->is_metadata) {
+ ret = rotate_index_file(stream);
+ if (ret < 0) {
+ ERR("Failed to rotate index file");
+ goto end;
+ }
+ }
+
+ stream->rotate_at_seq_num = -1ULL;
+
+end:
+ return ret;
+}
+
/*
* relay_recv_metadata: receive the metadata for the session.
*/
DBG2("Relay metadata written. Updated metadata_received %" PRIu64,
metadata_stream->metadata_received);
+ ret = check_rotate_stream(metadata_stream);
+ if (ret < 0) {
+ ERR("Check rotate stream");
+ goto end_put;
+ }
+
end_put:
pthread_mutex_unlock(&metadata_stream->lock);
stream_put(metadata_stream);
return ret;
}
-static
-int rotate_index_file(struct relay_stream *stream)
-{
- int ret;
- uint32_t major, minor;
-
- /* Put ref on previous index_file. */
- if (stream->index_file) {
- lttng_index_file_put(stream->index_file);
- stream->index_file = NULL;
- }
- major = stream->trace->session->major;
- minor = stream->trace->session->minor;
- stream->index_file = lttng_index_file_create(stream->path_name,
- stream->channel_name,
- -1, -1, stream->tracefile_size,
- tracefile_array_get_file_index_head(stream->tfa),
- lttng_to_index_major(major, minor),
- lttng_to_index_minor(major, minor));
- if (!stream->index_file) {
- ret = -1;
- goto end;
- }
-
- ret = 0;
-
-end:
- return ret;
-}
-
/*
* relay_rotate_stream: rotate a stream to a new tracefile for the session
* rotation feature (not the tracefile rotation feature).
goto end;
}
- fprintf(stderr, "Rotating stream %lu to %s/\n",
- be64toh(stream_info.stream_id), stream_info.new_pathname);
+ fprintf(stderr, "Stream %lu needs to rotate to %s (%lu vs %lu)/\n",
+ be64toh(stream_info.stream_id), stream_info.new_pathname,
+ stream->prev_seq, be64toh(stream_info.rotate_at_seq_num));
+ fprintf(stderr, "metadata: %d\n", stream->is_metadata);
pthread_mutex_lock(&stream->lock);
ERR("relay creating output directory");
goto end;
}
- ret = utils_rotate_stream_file(stream->path_name,
- stream->channel_name, stream->tracefile_size,
- stream->tracefile_count, -1,
- -1, stream->stream_fd->fd,
- NULL, &stream->stream_fd->fd);
+ stream->rotate_at_seq_num = be64toh(stream_info.rotate_at_seq_num);
+ stream->chunk_id = be64toh(stream_info.new_chunk_id);
+ /* FIXME: YOU ARE HERE
+ * metadata stream does not have a valid seq_num, but is sent on the control connection */
+ ret = check_rotate_stream(stream);
if (ret < 0) {
- ERR("Rotating stream output file");
+ ERR("Check rotate stream");
goto end_stream_unlock;
}
- stream->tracefile_size_current = 0;
-
- /* Rotate also the index if the stream is not a metadata stream. */
- if (!stream->is_metadata) {
- ret = rotate_index_file(stream);
- if (ret < 0) {
- ERR("Failed to rotate index file");
- goto end_stream_unlock;
- }
- }
end_stream_unlock:
pthread_mutex_unlock(&stream->lock);
stream->prev_seq = net_seq_num;
+ ret = check_rotate_stream(stream);
+ if (ret < 0) {
+ ERR("Check rotate stream");
+ goto end_stream_unlock;
+ }
+
end_stream_unlock:
close_requested = stream->close_requested;
pthread_mutex_unlock(&stream->lock);
stream->tracefile_count = tracefile_count;
stream->path_name = path_name;
stream->channel_name = channel_name;
+ stream->rotate_at_seq_num = -1ULL;
lttng_ht_node_init_u64(&stream->node, stream->stream_handle);
pthread_mutex_init(&stream->lock, NULL);
urcu_ref_init(&stream->ref);
struct lttng_ht_node_u64 node;
bool in_stream_ht; /* is stream in stream hash table. */
struct rcu_head rcu_node; /* For call_rcu teardown. */
+ /*
+ * When we have written the data and index corresponding to this
+ * seq_num, rotate the tracefile (session rotation). The path_name is
+ * already up-to-date.
+ * This is set to -1ULL when no rotation is pending. Always
+ * read/updated with stream lock held.
+ */
+ uint64_t rotate_at_seq_num;
+ /*
+ * This is the id of the chunk where we are writing to if no rotate is
+ * pending (rotate_at_seq_num == -1ULL). If a rotate is pending, this
+ * is the chunk_id we will have after the rotation. It must be updated
+ * atomically with rotate_at_seq_num. Always read/updated with stream
+ * lock held.
+ */
+ uint64_t chunk_id;
};
struct relay_stream *stream_create(struct ctf_trace *trace,
/*
* Ask the consumer to rotate a channel.
* app_pathname only used for UST, it contains the path after /ust/.
+ *
+ * The new_chunk_id is the session->rotate_count that has been incremented
+ * when the rotation started. On the relay, this allows to keep track in which
+ * chunk each stream is currently writing to (for the rotate_pending operation).
*/
int consumer_rotate_channel(struct consumer_socket *socket, uint64_t key,
uid_t uid, gid_t gid, struct consumer_output *output,
- char *app_pathname, uint32_t metadata)
+ char *app_pathname, uint32_t metadata, uint64_t new_chunk_id)
{
int ret;
struct lttcomm_consumer_msg msg;
msg.cmd_type = LTTNG_CONSUMER_ROTATE_CHANNEL;
msg.u.rotate_channel.key = key;
msg.u.rotate_channel.metadata = metadata;
+ msg.u.rotate_channel.new_chunk_id = new_chunk_id;
if (output->type == CONSUMER_DST_NET) {
fprintf(stderr, "BASE: %s\n", output->dst.net.base_dir);
int consumer_rotate_channel(struct consumer_socket *socket, uint64_t key,
uid_t uid, gid_t gid, struct consumer_output *output,
- char *tmp, uint32_t metadata);
+ char *tmp, uint32_t metadata, uint64_t new_chunk_id);
int consumer_rotate_rename(struct consumer_socket *socket, uint64_t session_id,
struct consumer_output *output, char *current_path, char *new_path,
uid_t uid, gid_t gid);
ret = consumer_rotate_channel(socket, chan->fd,
ksess->uid, ksess->gid, ksess->consumer,
- "", 0);
+ "", 0,
+ session->rotate_count);
if (ret < 0) {
ret = LTTNG_ERR_KERN_CONSUMER_FAIL;
pthread_mutex_unlock(socket->lock);
*/
pthread_mutex_lock(socket->lock);
ret = consumer_rotate_channel(socket, ksess->metadata->fd,
- ksess->uid, ksess->gid, ksess->consumer, "", 1);
+ ksess->uid, ksess->gid, ksess->consumer, "", 1,
+ session->rotate_count);
if (ret < 0) {
ret = LTTNG_ERR_KERN_CONSUMER_FAIL;
pthread_mutex_unlock(socket->lock);
ret = consumer_rotate_channel(socket,
reg_chan->consumer_key,
usess->uid, usess->gid,
- usess->consumer, pathname, 0);
+ usess->consumer, pathname, 0,
+ session->rotate_count);
if (ret < 0) {
goto error;
}
ret = consumer_rotate_channel(socket,
reg->registry->reg.ust->metadata_key,
usess->uid, usess->gid,
- usess->consumer, pathname, 1);
+ usess->consumer, pathname, 1,
+ session->rotate_count);
if (ret < 0) {
goto error;
}
}
ret = consumer_rotate_channel(socket, ua_chan->key,
ua_sess->euid, ua_sess->egid,
- ua_sess->consumer, pathname, 0);
+ ua_sess->consumer, pathname, 0,
+ session->rotate_count);
if (ret < 0) {
goto error;
}
(void) push_metadata(registry, usess->consumer);
ret = consumer_rotate_channel(socket, registry->metadata_key,
ua_sess->euid, ua_sess->egid,
- ua_sess->consumer, pathname, 1);
+ ua_sess->consumer, pathname, 1,
+ session->rotate_count);
if (ret < 0) {
goto error;
}
* Returns 0 on success, < 0 on error
*/
int lttng_consumer_rotate_channel(uint64_t key, char *path,
- uint64_t relayd_id, uint32_t metadata,
+ uint64_t relayd_id, uint32_t metadata, uint64_t new_chunk_id,
struct lttng_consumer_local_data *ctx)
{
int ret;
goto end;
}
pthread_mutex_lock(&channel->lock);
+ channel->current_chunk_id = new_chunk_id;
snprintf(channel->pathname, PATH_MAX, "%s", path);
ret = utils_mkdir_recursive(channel->pathname, S_IRWXU | S_IRWXG,
channel->uid, channel->gid);
goto end;
}
+ /* FIXME: chan_ro ? */
ret = relayd_rotate_stream(&relayd->control_sock,
- stream->relayd_stream_id, stream->channel_ro_pathname);
+ stream->relayd_stream_id, stream->channel_ro_pathname,
+ stream->chan->current_chunk_id,
+ stream->last_sequence_number);
end:
return ret;
* daemon that this channel has finished its rotation.
*/
uint64_t nr_stream_rotate_pending;
+
+ /*
+ * The chunk id where we currently write the data. This value is sent
+ * to the relay when we add a stream and when a stream rotates. This
+ * allows to keep track of where each stream on the relay is writing.
+ */
+ uint64_t current_chunk_id;
};
/*
int consumer_create_index_file(struct lttng_consumer_stream *stream);
int lttng_consumer_rotate_channel(uint64_t key, char *path,
uint64_t relayd_id, uint32_t metadata,
- struct lttng_consumer_local_data *ctx);
+ uint64_t new_chunk_id, struct lttng_consumer_local_data *ctx);
int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream,
unsigned long len);
int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
msg.u.rotate_channel.pathname,
msg.u.rotate_channel.relayd_id,
msg.u.rotate_channel.metadata,
+ msg.u.rotate_channel.new_chunk_id,
ctx);
if (ret < 0) {
ERR("Rotate channel failed");
}
int relayd_rotate_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_id,
- const char *new_pathname)
+ const char *new_pathname, uint64_t new_chunk_id,
+ uint64_t seq_num)
{
int ret;
struct lttcomm_relayd_rotate_stream msg;
memset(&msg, 0, sizeof(msg));
msg.stream_id = htobe64(stream_id);
+ msg.rotate_at_seq_num = htobe64(seq_num);
+ msg.new_chunk_id = htobe64(new_chunk_id);
if (lttng_strncpy(msg.new_pathname, new_pathname,
sizeof(msg.new_pathname))) {
ret = -1;
int relayd_reset_metadata(struct lttcomm_relayd_sock *rsock,
uint64_t stream_id, uint64_t version);
int relayd_rotate_stream(struct lttcomm_relayd_sock *sock, uint64_t stream_id,
- const char *new_pathname);
+ const char *new_pathname, uint64_t new_chunk_id, uint64_t seq_num);
int relayd_rotate_rename(struct lttcomm_relayd_sock *sock,
const char *current_path, const char *new_path);
struct lttcomm_relayd_rotate_stream {
uint64_t stream_id;
+ uint64_t rotate_at_seq_num;
+ uint64_t new_chunk_id;
char new_pathname[LTTNG_PATH_MAX];
} LTTNG_PACKED;
uint32_t metadata; /* This is a metadata channel. */
uint64_t relayd_id; /* Relayd id if apply. */
uint64_t key;
+ uint64_t new_chunk_id;
} LTTNG_PACKED rotate_channel;
struct {
char current_path[PATH_MAX];
msg.u.rotate_channel.pathname,
msg.u.rotate_channel.relayd_id,
msg.u.rotate_channel.metadata,
+ msg.u.rotate_channel.new_chunk_id,
ctx);
if (ret < 0) {
ERR("Rotate channel failed");