From: Julien Desfossez Date: Thu, 24 Aug 2017 21:42:10 +0000 (-0400) Subject: check if the stream is ready for rotation between get_next and put_next X-Git-Url: http://git.efficios.com/?p=deliverable%2Flttng-tools.git;a=commitdiff_plain;h=c1b7c694190d2a182f826b2123f78b7cfbc20427 check if the stream is ready for rotation between get_next and put_next This will allow to send the extended data header to the relay and rotate as soon as the current packet is written to the trace file. Signed-off-by: Julien Desfossez --- diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 40c422953..fa93ab813 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -4025,13 +4025,18 @@ end: } /* - * Performs the stream rotation for the rotate session feature if needed. - * It must be called with the stream and channel locks held. + * Check if a stream is ready to be rotated after extracting it. * - * Return 0 on success, a negative number of error. + * When we are called between get_next_subbuf and put_next_subbuf, the len + * parameter is the subbuf size of the current subbuffer being extracted. This + * len is with padding, so it is normal to see that the current position is + * farther than the expected rotate position. + * + * Return 1 if it is ready for rotation, 0 if it is not, a negative value on + * error. */ -int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *stream) +int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream, + unsigned long len) { int ret; unsigned long consumed_pos; @@ -4049,28 +4054,46 @@ int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx, ret = lttng_consumer_sample_snapshot_positions(stream); if (ret < 0) { ERR("Taking kernel snapshot positions"); - goto error; + goto end; } ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos); if (ret < 0) { ERR("Produced kernel snapshot position"); - goto error; + goto end; } fprintf(stderr, "packet %lu, pos %lu\n", stream->key, consumed_pos); /* Rotate position not reached yet. */ - if (consumed_pos < stream->rotate_position) { + if ((consumed_pos + len) < stream->rotate_position) { ret = 0; goto end; } fprintf(stderr, "Rotate position %lu (expected %lu) reached for stream %lu\n", - consumed_pos, stream->rotate_position, stream->key); + consumed_pos + len, stream->rotate_position, + stream->key); + ret = 1; } else { fprintf(stderr, "Rotate position reached for stream %lu\n", stream->key); + ret = 1; } +end: + return ret; +} + +/* + * Performs the stream rotation for the rotate session feature if needed. + * It must be called with the stream and channel locks held. + * + * Return 0 on success, a negative number of error. + */ +int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream) +{ + int ret; + ret = close(stream->out_fd); if (ret < 0) { PERROR("Closing tracefile"); diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h index 3aa63eed1..7f09bf870 100644 --- a/src/common/consumer/consumer.h +++ b/src/common/consumer/consumer.h @@ -818,6 +818,8 @@ int consumer_create_index_file(struct lttng_consumer_stream *stream); int lttng_consumer_rotate_channel(uint64_t key, char *path, uint64_t relayd_id, uint32_t metadata, struct lttng_consumer_local_data *ctx); +int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream, + unsigned long len); int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream); int lttng_consumer_rotate_ready_streams(uint64_t key, diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index bbce28ccc..2f15f6a25 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -1432,7 +1432,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx) { unsigned long len, subbuf_size, padding; - int err, write_index = 1, rotation_ret; + int err, write_index = 1, rotation_ret, rotate_ready; ssize_t ret = 0; int infd = stream->wait_fd; struct ctf_packet_index index; @@ -1473,6 +1473,24 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, goto error; } + rotate_ready = lttng_consumer_stream_is_rotate_ready(stream, len); + if (rotate_ready < 0) { + ERR("Failed to check if stream is ready for rotation"); + err = kernctl_put_subbuf(infd); + if (err != 0) { + if (err == -EFAULT) { + PERROR("Error in unreserving sub buffer\n"); + } else if (err == -EIO) { + /* Should never happen with newer LTTng versions */ + PERROR("Reader has been pushed by the writer, last sub-buffer corrupted."); + } + ret = err; + goto error; + } + ret = -1; + goto error; + } + if (!stream->metadata_flag) { ret = get_index_values(&index, infd); if (ret < 0) { @@ -1649,10 +1667,13 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, } rotate: - rotation_ret = lttng_consumer_rotate_stream(ctx, stream); - if (rotation_ret < 0) { - ERR("Stream rotation error"); - goto error; + if (rotate_ready) { + rotation_ret = lttng_consumer_rotate_stream(ctx, stream); + if (rotation_ret < 0) { + ERR("Stream rotation error"); + ret = -1; + goto error; + } } error: diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 1ec0a0e46..fab493d1f 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -2580,7 +2580,7 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx) { unsigned long len, subbuf_size, padding; - int err, write_index = 1, rotation_ret; + int err, write_index = 1, rotation_ret, rotate_ready; long ret = 0; struct ustctl_consumer_stream *ustream; struct ctf_packet_index index; @@ -2679,6 +2679,16 @@ retry: assert(len >= subbuf_size); padding = len - subbuf_size; + + rotate_ready = lttng_consumer_stream_is_rotate_ready(stream, len); + if (rotate_ready < 0) { + ERR("Failed to check if stream is ready for rotation"); + ret = -1; + err = ustctl_put_subbuf(ustream); + assert(err == 0); + goto error; + } + /* write the subbuffer to the tracefile */ ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, padding, &index); /* @@ -2752,11 +2762,13 @@ retry: } rotate: - rotation_ret = lttng_consumer_rotate_stream(ctx, stream); - if (rotation_ret < 0) { - ret = -1; - ERR("Stream rotation error"); - goto error; + if (rotate_ready) { + rotation_ret = lttng_consumer_rotate_stream(ctx, stream); + if (rotation_ret < 0) { + ret = -1; + ERR("Stream rotation error"); + goto error; + } } error: return ret;