From: Julien Desfossez Date: Wed, 6 Sep 2017 20:30:27 +0000 (-0400) Subject: start to handle async rotation on the relay + basics for remote rotate_pending X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=commitdiff_plain;h=2d7d73bc2e90de495e009cc2f864f271244d5d96 start to handle async rotation on the relay + basics for remote rotate_pending Signed-off-by: Julien Desfossez --- diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index c1b044f57..c6d61a186 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -1507,6 +1507,100 @@ end: return ret; } +static +int rotate_index_file(struct relay_stream *stream) +{ + int ret; + uint32_t major, minor; + + /* Put ref on previous index_file. */ + if (stream->index_file) { + lttng_index_file_put(stream->index_file); + stream->index_file = NULL; + } + major = stream->trace->session->major; + minor = stream->trace->session->minor; + stream->index_file = lttng_index_file_create(stream->path_name, + stream->channel_name, + -1, -1, stream->tracefile_size, + tracefile_array_get_file_index_head(stream->tfa), + lttng_to_index_major(major, minor), + lttng_to_index_minor(major, minor)); + if (!stream->index_file) { + ret = -1; + goto end; + } + + ret = 0; + +end: + return ret; +} + +/* + * Check if a stream should perform a rotation (for session rotation). + * Must be called with the stream lock held. + * + * Return 0 on success, a negative value on error. + */ +static +int check_rotate_stream(struct relay_stream *stream) +{ + int ret; + + /* No rotation expected */ + if (stream->rotate_at_seq_num == -1ULL) { + ret = 0; + goto end; + } + + if (stream->prev_seq < stream->rotate_at_seq_num) { + DBG("Stream %" PRIu64 " no yet ready for rotation", + stream->stream_handle); + fprintf(stderr, "Stream %" PRIu64 " no yet ready for rotation\n", + stream->stream_handle); + ret = 0; + goto end; + } else if (stream->prev_seq > stream->rotate_at_seq_num) { + DBG("Rotation after too much data has been written in tracefile " + "for stream %" PRIu64 ", need to truncate before " + "rotating", stream->stream_handle); + fprintf(stderr, "Rotation after too much data has been written in tracefile " + "for stream %" PRIu64 ", need to truncate before " + "rotating\n", stream->stream_handle); + /* TODO */ + } else { + DBG("Stream %" PRIu64 " ready for rotation", stream->stream_handle); + fprintf(stderr, "Stream %" PRIu64 " ready for rotation\n", stream->stream_handle); + } + + /* Perform the stream rotation. */ + ret = utils_rotate_stream_file(stream->path_name, + stream->channel_name, stream->tracefile_size, + stream->tracefile_count, -1, + -1, stream->stream_fd->fd, + NULL, &stream->stream_fd->fd); + if (ret < 0) { + ERR("Rotating stream output file"); + goto end; + } + stream->tracefile_size_current = 0; + + /* Rotate also the index if the stream is not a metadata stream. */ + if (!stream->is_metadata) { + ret = rotate_index_file(stream); + if (ret < 0) { + ERR("Failed to rotate index file"); + goto end; + } + } + + stream->rotate_at_seq_num = -1ULL; + +end: + return ret; +} + /* * relay_recv_metadata: receive the metadata for the session. */ @@ -1590,6 +1684,12 @@ static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr, DBG2("Relay metadata written. Updated metadata_received %" PRIu64, metadata_stream->metadata_received); + ret = check_rotate_stream(metadata_stream); + if (ret < 0) { + ERR("Check rotate stream"); + goto end_put; + } + end_put: pthread_mutex_unlock(&metadata_stream->lock); stream_put(metadata_stream); @@ -2122,36 +2222,6 @@ end_no_session: return ret; } -static -int rotate_index_file(struct relay_stream *stream) -{ - int ret; - uint32_t major, minor; - - /* Put ref on previous index_file. */ - if (stream->index_file) { - lttng_index_file_put(stream->index_file); - stream->index_file = NULL; - } - major = stream->trace->session->major; - minor = stream->trace->session->minor; - stream->index_file = lttng_index_file_create(stream->path_name, - stream->channel_name, - -1, -1, stream->tracefile_size, - tracefile_array_get_file_index_head(stream->tfa), - lttng_to_index_major(major, minor), - lttng_to_index_minor(major, minor)); - if (!stream->index_file) { - ret = -1; - goto end; - } - - ret = 0; - -end: - return ret; -} - /* * relay_rotate_stream: rotate a stream to a new tracefile for the session * rotation feature (not the tracefile rotation feature). @@ -2208,8 +2278,10 @@ static int relay_rotate_session_stream(struct lttcomm_relayd_hdr *recv_hdr, goto end; } - fprintf(stderr, "Rotating stream %lu to %s/\n", - be64toh(stream_info.stream_id), stream_info.new_pathname); + fprintf(stderr, "Stream %lu needs to rotate to %s (%lu vs %lu)/\n", + be64toh(stream_info.stream_id), stream_info.new_pathname, + stream->prev_seq, be64toh(stream_info.rotate_at_seq_num)); + fprintf(stderr, "metadata: %d\n", stream->is_metadata); pthread_mutex_lock(&stream->lock); @@ -2226,25 +2298,15 @@ static int relay_rotate_session_stream(struct lttcomm_relayd_hdr *recv_hdr, ERR("relay creating output directory"); goto end; } - ret = utils_rotate_stream_file(stream->path_name, - stream->channel_name, stream->tracefile_size, - stream->tracefile_count, -1, - -1, stream->stream_fd->fd, - NULL, &stream->stream_fd->fd); + stream->rotate_at_seq_num = be64toh(stream_info.rotate_at_seq_num); + stream->chunk_id = be64toh(stream_info.new_chunk_id); + /* FIXME: YOU ARE HERE + * metadata stream does not have a valid seq_num, but is sent on the control connection */ + ret = check_rotate_stream(stream); if (ret < 0) { - ERR("Rotating stream output file"); + ERR("Check rotate stream"); goto end_stream_unlock; } - stream->tracefile_size_current = 0; - - /* Rotate also the index if the stream is not a metadata stream. */ - if (!stream->is_metadata) { - ret = rotate_index_file(stream); - if (ret < 0) { - ERR("Failed to rotate index file"); - goto end_stream_unlock; - } - } end_stream_unlock: pthread_mutex_unlock(&stream->lock); @@ -2643,6 +2705,12 @@ static int relay_process_data(struct relay_connection *conn) stream->prev_seq = net_seq_num; + ret = check_rotate_stream(stream); + if (ret < 0) { + ERR("Check rotate stream"); + goto end_stream_unlock; + } + end_stream_unlock: close_requested = stream->close_requested; pthread_mutex_unlock(&stream->lock); diff --git a/src/bin/lttng-relayd/stream.c b/src/bin/lttng-relayd/stream.c index 2bc815813..5ed37c58e 100644 --- a/src/bin/lttng-relayd/stream.c +++ b/src/bin/lttng-relayd/stream.c @@ -88,6 +88,7 @@ struct relay_stream *stream_create(struct ctf_trace *trace, stream->tracefile_count = tracefile_count; stream->path_name = path_name; stream->channel_name = channel_name; + stream->rotate_at_seq_num = -1ULL; lttng_ht_node_init_u64(&stream->node, stream->stream_handle); pthread_mutex_init(&stream->lock, NULL); urcu_ref_init(&stream->ref); diff --git a/src/bin/lttng-relayd/stream.h b/src/bin/lttng-relayd/stream.h index e385032cb..e9ddb5f21 100644 --- a/src/bin/lttng-relayd/stream.h +++ b/src/bin/lttng-relayd/stream.h @@ -129,6 +129,22 @@ struct relay_stream { struct lttng_ht_node_u64 node; bool in_stream_ht; /* is stream in stream hash table. */ struct rcu_head rcu_node; /* For call_rcu teardown. */ + /* + * When we have written the data and index corresponding to this + * seq_num, rotate the tracefile (session rotation). The path_name is + * already up-to-date. + * This is set to -1ULL when no rotation is pending. Always + * read/updated with stream lock held. + */ + uint64_t rotate_at_seq_num; + /* + * This is the id of the chunk where we are writing to if no rotate is + * pending (rotate_at_seq_num == -1ULL). If a rotate is pending, this + * is the chunk_id we will have after the rotation. It must be updated + * atomically with rotate_at_seq_num. Always read/updated with stream + * lock held. + */ + uint64_t chunk_id; }; struct relay_stream *stream_create(struct ctf_trace *trace, diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index 23103ffa7..537090b70 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -1594,10 +1594,14 @@ end: /* * Ask the consumer to rotate a channel. * app_pathname only used for UST, it contains the path after /ust/. + * + * The new_chunk_id is the session->rotate_count that has been incremented + * when the rotation started. On the relay, this allows to keep track in which + * chunk each stream is currently writing to (for the rotate_pending operation). */ int consumer_rotate_channel(struct consumer_socket *socket, uint64_t key, uid_t uid, gid_t gid, struct consumer_output *output, - char *app_pathname, uint32_t metadata) + char *app_pathname, uint32_t metadata, uint64_t new_chunk_id) { int ret; struct lttcomm_consumer_msg msg; @@ -1611,6 +1615,7 @@ int consumer_rotate_channel(struct consumer_socket *socket, uint64_t key, msg.cmd_type = LTTNG_CONSUMER_ROTATE_CHANNEL; msg.u.rotate_channel.key = key; msg.u.rotate_channel.metadata = metadata; + msg.u.rotate_channel.new_chunk_id = new_chunk_id; if (output->type == CONSUMER_DST_NET) { fprintf(stderr, "BASE: %s\n", output->dst.net.base_dir); diff --git a/src/bin/lttng-sessiond/consumer.h b/src/bin/lttng-sessiond/consumer.h index 66ee67310..ae303222e 100644 --- a/src/bin/lttng-sessiond/consumer.h +++ b/src/bin/lttng-sessiond/consumer.h @@ -326,7 +326,7 @@ int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key, int consumer_rotate_channel(struct consumer_socket *socket, uint64_t key, uid_t uid, gid_t gid, struct consumer_output *output, - char *tmp, uint32_t metadata); + char *tmp, uint32_t metadata, uint64_t new_chunk_id); int consumer_rotate_rename(struct consumer_socket *socket, uint64_t session_id, struct consumer_output *output, char *current_path, char *new_path, uid_t uid, gid_t gid); diff --git a/src/bin/lttng-sessiond/kernel.c b/src/bin/lttng-sessiond/kernel.c index ef5733c66..43fc93553 100644 --- a/src/bin/lttng-sessiond/kernel.c +++ b/src/bin/lttng-sessiond/kernel.c @@ -1179,7 +1179,8 @@ int kernel_rotate_session(struct ltt_session *session) ret = consumer_rotate_channel(socket, chan->fd, ksess->uid, ksess->gid, ksess->consumer, - "", 0); + "", 0, + session->rotate_count); if (ret < 0) { ret = LTTNG_ERR_KERN_CONSUMER_FAIL; pthread_mutex_unlock(socket->lock); @@ -1194,7 +1195,8 @@ int kernel_rotate_session(struct ltt_session *session) */ pthread_mutex_lock(socket->lock); ret = consumer_rotate_channel(socket, ksess->metadata->fd, - ksess->uid, ksess->gid, ksess->consumer, "", 1); + ksess->uid, ksess->gid, ksess->consumer, "", 1, + session->rotate_count); if (ret < 0) { ret = LTTNG_ERR_KERN_CONSUMER_FAIL; pthread_mutex_unlock(socket->lock); diff --git a/src/bin/lttng-sessiond/ust-app.c b/src/bin/lttng-sessiond/ust-app.c index 368e58667..2283f980a 100644 --- a/src/bin/lttng-sessiond/ust-app.c +++ b/src/bin/lttng-sessiond/ust-app.c @@ -6359,7 +6359,8 @@ int ust_app_rotate_session(struct ltt_session *session) ret = consumer_rotate_channel(socket, reg_chan->consumer_key, usess->uid, usess->gid, - usess->consumer, pathname, 0); + usess->consumer, pathname, 0, + session->rotate_count); if (ret < 0) { goto error; } @@ -6370,7 +6371,8 @@ int ust_app_rotate_session(struct ltt_session *session) ret = consumer_rotate_channel(socket, reg->registry->reg.ust->metadata_key, usess->uid, usess->gid, - usess->consumer, pathname, 1); + usess->consumer, pathname, 1, + session->rotate_count); if (ret < 0) { goto error; } @@ -6441,7 +6443,8 @@ int ust_app_rotate_session(struct ltt_session *session) } ret = consumer_rotate_channel(socket, ua_chan->key, ua_sess->euid, ua_sess->egid, - ua_sess->consumer, pathname, 0); + ua_sess->consumer, pathname, 0, + session->rotate_count); if (ret < 0) { goto error; } @@ -6451,7 +6454,8 @@ int ust_app_rotate_session(struct ltt_session *session) (void) push_metadata(registry, usess->consumer); ret = consumer_rotate_channel(socket, registry->metadata_key, ua_sess->euid, ua_sess->egid, - ua_sess->consumer, pathname, 1); + ua_sess->consumer, pathname, 1, + session->rotate_count); if (ret < 0) { goto error; } diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index f187584f3..f8b25665f 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -3936,7 +3936,7 @@ end: * Returns 0 on success, < 0 on error */ int lttng_consumer_rotate_channel(uint64_t key, char *path, - uint64_t relayd_id, uint32_t metadata, + uint64_t relayd_id, uint32_t metadata, uint64_t new_chunk_id, struct lttng_consumer_local_data *ctx) { int ret; @@ -3956,6 +3956,7 @@ int lttng_consumer_rotate_channel(uint64_t key, char *path, goto end; } pthread_mutex_lock(&channel->lock); + channel->current_chunk_id = new_chunk_id; snprintf(channel->pathname, PATH_MAX, "%s", path); ret = utils_mkdir_recursive(channel->pathname, S_IRWXU | S_IRWXG, channel->uid, channel->gid); @@ -4163,8 +4164,11 @@ int rotate_relay_stream(struct lttng_consumer_local_data *ctx, goto end; } + /* FIXME: chan_ro ? */ ret = relayd_rotate_stream(&relayd->control_sock, - stream->relayd_stream_id, stream->channel_ro_pathname); + stream->relayd_stream_id, stream->channel_ro_pathname, + stream->chan->current_chunk_id, + stream->last_sequence_number); end: return ret; diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h index 92552504e..63254007a 100644 --- a/src/common/consumer/consumer.h +++ b/src/common/consumer/consumer.h @@ -236,6 +236,13 @@ struct lttng_consumer_channel { * daemon that this channel has finished its rotation. */ uint64_t nr_stream_rotate_pending; + + /* + * The chunk id where we currently write the data. This value is sent + * to the relay when we add a stream and when a stream rotates. This + * allows to keep track of where each stream on the relay is writing. + */ + uint64_t current_chunk_id; }; /* @@ -818,7 +825,7 @@ void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream); int consumer_create_index_file(struct lttng_consumer_stream *stream); int lttng_consumer_rotate_channel(uint64_t key, char *path, uint64_t relayd_id, uint32_t metadata, - struct lttng_consumer_local_data *ctx); + uint64_t new_chunk_id, struct lttng_consumer_local_data *ctx); int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream, unsigned long len); int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx, diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index bdca67087..4fb586c08 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -1141,6 +1141,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.rotate_channel.pathname, msg.u.rotate_channel.relayd_id, msg.u.rotate_channel.metadata, + msg.u.rotate_channel.new_chunk_id, ctx); if (ret < 0) { ERR("Rotate channel failed"); diff --git a/src/common/relayd/relayd.c b/src/common/relayd/relayd.c index dad845d5c..6e3e91bfe 100644 --- a/src/common/relayd/relayd.c +++ b/src/common/relayd/relayd.c @@ -979,7 +979,8 @@ error: } int relayd_rotate_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_id, - const char *new_pathname) + const char *new_pathname, uint64_t new_chunk_id, + uint64_t seq_num) { int ret; struct lttcomm_relayd_rotate_stream msg; @@ -992,6 +993,8 @@ int relayd_rotate_stream(struct lttcomm_relayd_sock *rsock, uint64_t stream_id, memset(&msg, 0, sizeof(msg)); msg.stream_id = htobe64(stream_id); + msg.rotate_at_seq_num = htobe64(seq_num); + msg.new_chunk_id = htobe64(new_chunk_id); if (lttng_strncpy(msg.new_pathname, new_pathname, sizeof(msg.new_pathname))) { ret = -1; diff --git a/src/common/relayd/relayd.h b/src/common/relayd/relayd.h index 4c5c2daf5..f94a8e227 100644 --- a/src/common/relayd/relayd.h +++ b/src/common/relayd/relayd.h @@ -53,7 +53,7 @@ int relayd_send_index(struct lttcomm_relayd_sock *rsock, int relayd_reset_metadata(struct lttcomm_relayd_sock *rsock, uint64_t stream_id, uint64_t version); int relayd_rotate_stream(struct lttcomm_relayd_sock *sock, uint64_t stream_id, - const char *new_pathname); + const char *new_pathname, uint64_t new_chunk_id, uint64_t seq_num); int relayd_rotate_rename(struct lttcomm_relayd_sock *sock, const char *current_path, const char *new_path); diff --git a/src/common/sessiond-comm/relayd.h b/src/common/sessiond-comm/relayd.h index bc754ee5c..921ff182a 100644 --- a/src/common/sessiond-comm/relayd.h +++ b/src/common/sessiond-comm/relayd.h @@ -209,6 +209,8 @@ struct lttcomm_relayd_reset_metadata { struct lttcomm_relayd_rotate_stream { uint64_t stream_id; + uint64_t rotate_at_seq_num; + uint64_t new_chunk_id; char new_pathname[LTTNG_PATH_MAX]; } LTTNG_PACKED; diff --git a/src/common/sessiond-comm/sessiond-comm.h b/src/common/sessiond-comm/sessiond-comm.h index c9c39d1f1..5e045be45 100644 --- a/src/common/sessiond-comm/sessiond-comm.h +++ b/src/common/sessiond-comm/sessiond-comm.h @@ -549,6 +549,7 @@ struct lttcomm_consumer_msg { uint32_t metadata; /* This is a metadata channel. */ uint64_t relayd_id; /* Relayd id if apply. */ uint64_t key; + uint64_t new_chunk_id; } LTTNG_PACKED rotate_channel; struct { char current_path[PATH_MAX]; diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 3a67c7d5b..2a02568b0 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -1939,6 +1939,7 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, msg.u.rotate_channel.pathname, msg.u.rotate_channel.relayd_id, msg.u.rotate_channel.metadata, + msg.u.rotate_channel.new_chunk_id, ctx); if (ret < 0) { ERR("Rotate channel failed");