check if the stream is ready for rotation between get_next and put_next
authorJulien Desfossez <jdesfossez@efficios.com>
Thu, 24 Aug 2017 21:42:10 +0000 (17:42 -0400)
committerJulien Desfossez <jdesfossez@efficios.com>
Wed, 6 Sep 2017 17:59:58 +0000 (13:59 -0400)
This will allow to send the extended data header to the relay and rotate
as soon as the current packet is written to the trace file.

Signed-off-by: Julien Desfossez <jdesfossez@efficios.com>
src/common/consumer/consumer.c
src/common/consumer/consumer.h
src/common/kernel-consumer/kernel-consumer.c
src/common/ust-consumer/ust-consumer.c

index 40c4229533d1c9b4cc5a36a3f14296ac6786b725..fa93ab81358c08bd1ce0c833e268ba81558933d5 100644 (file)
@@ -4025,13 +4025,18 @@ end:
 }
 
 /*
- * Performs the stream rotation for the rotate session feature if needed.
- * It must be called with the stream and channel locks held.
+ * Check if a stream is ready to be rotated after extracting it.
  *
- * Return 0 on success, a negative number of error.
+ * When we are called between get_next_subbuf and put_next_subbuf, the len
+ * parameter is the subbuf size of the current subbuffer being extracted. This
+ * len is with padding, so it is normal to see that the current position is
+ * farther than the expected rotate position.
+ *
+ * Return 1 if it is ready for rotation, 0 if it is not, a negative value on
+ * error.
  */
-int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
-               struct lttng_consumer_stream *stream)
+int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream,
+               unsigned long len)
 {
        int ret;
        unsigned long consumed_pos;
@@ -4049,28 +4054,46 @@ int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
                ret = lttng_consumer_sample_snapshot_positions(stream);
                if (ret < 0) {
                        ERR("Taking kernel snapshot positions");
-                       goto error;
+                       goto end;
                }
 
                ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos);
                if (ret < 0) {
                        ERR("Produced kernel snapshot position");
-                       goto error;
+                       goto end;
                }
 
                fprintf(stderr, "packet %lu, pos %lu\n", stream->key, consumed_pos);
                /* Rotate position not reached yet. */
-               if (consumed_pos < stream->rotate_position) {
+               if ((consumed_pos + len) < stream->rotate_position) {
                        ret = 0;
                        goto end;
                }
                fprintf(stderr, "Rotate position %lu (expected %lu) reached for stream %lu\n",
-                               consumed_pos, stream->rotate_position, stream->key);
+                               consumed_pos + len, stream->rotate_position,
+                               stream->key);
+               ret = 1;
        } else {
                fprintf(stderr, "Rotate position reached for stream %lu\n",
                                stream->key);
+               ret = 1;
        }
 
+end:
+       return ret;
+}
+
+/*
+ * Performs the stream rotation for the rotate session feature if needed.
+ * It must be called with the stream and channel locks held.
+ *
+ * Return 0 on success, a negative number of error.
+ */
+int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
+               struct lttng_consumer_stream *stream)
+{
+       int ret;
+
        ret = close(stream->out_fd);
        if (ret < 0) {
                PERROR("Closing tracefile");
index 3aa63eed1fecc21dc415a8dbf1af68c09cf40524..7f09bf870effd1a2e80013912d4701692d7e12ea 100644 (file)
@@ -818,6 +818,8 @@ int consumer_create_index_file(struct lttng_consumer_stream *stream);
 int lttng_consumer_rotate_channel(uint64_t key, char *path,
                uint64_t relayd_id, uint32_t metadata,
                struct lttng_consumer_local_data *ctx);
+int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream,
+               unsigned long len);
 int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream);
 int lttng_consumer_rotate_ready_streams(uint64_t key,
index bbce28ccc406de613e93af354d6e130ef11041d5..2f15f6a25067493be6b0e9ec22ef73d33debcf61 100644 (file)
@@ -1432,7 +1432,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx)
 {
        unsigned long len, subbuf_size, padding;
-       int err, write_index = 1, rotation_ret;
+       int err, write_index = 1, rotation_ret, rotate_ready;
        ssize_t ret = 0;
        int infd = stream->wait_fd;
        struct ctf_packet_index index;
@@ -1473,6 +1473,24 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                goto error;
        }
 
+       rotate_ready = lttng_consumer_stream_is_rotate_ready(stream, len);
+       if (rotate_ready < 0) {
+               ERR("Failed to check if stream is ready for rotation");
+               err = kernctl_put_subbuf(infd);
+               if (err != 0) {
+                       if (err == -EFAULT) {
+                               PERROR("Error in unreserving sub buffer\n");
+                       } else if (err == -EIO) {
+                               /* Should never happen with newer LTTng versions */
+                               PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
+                       }
+                       ret = err;
+                       goto error;
+               }
+               ret = -1;
+               goto error;
+       }
+
        if (!stream->metadata_flag) {
                ret = get_index_values(&index, infd);
                if (ret < 0) {
@@ -1649,10 +1667,13 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
        }
 
 rotate:
-       rotation_ret = lttng_consumer_rotate_stream(ctx, stream);
-       if (rotation_ret < 0) {
-               ERR("Stream rotation error");
-               goto error;
+       if (rotate_ready) {
+               rotation_ret = lttng_consumer_rotate_stream(ctx, stream);
+               if (rotation_ret < 0) {
+                       ERR("Stream rotation error");
+                       ret = -1;
+                       goto error;
+               }
        }
 
 error:
index 1ec0a0e4670ce94a6eddfb57bc68b18a506b7be8..fab493d1fe159d4902c6434b37828ef3f2e69cae 100644 (file)
@@ -2580,7 +2580,7 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx)
 {
        unsigned long len, subbuf_size, padding;
-       int err, write_index = 1, rotation_ret;
+       int err, write_index = 1, rotation_ret, rotate_ready;
        long ret = 0;
        struct ustctl_consumer_stream *ustream;
        struct ctf_packet_index index;
@@ -2679,6 +2679,16 @@ retry:
        assert(len >= subbuf_size);
 
        padding = len - subbuf_size;
+
+       rotate_ready = lttng_consumer_stream_is_rotate_ready(stream, len);
+       if (rotate_ready < 0) {
+               ERR("Failed to check if stream is ready for rotation");
+               ret = -1;
+               err = ustctl_put_subbuf(ustream);
+               assert(err == 0);
+               goto error;
+       }
+
        /* write the subbuffer to the tracefile */
        ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, padding, &index);
        /*
@@ -2752,11 +2762,13 @@ retry:
        }
 
 rotate:
-       rotation_ret = lttng_consumer_rotate_stream(ctx, stream);
-       if (rotation_ret < 0) {
-               ret = -1;
-               ERR("Stream rotation error");
-               goto error;
+       if (rotate_ready) {
+               rotation_ret = lttng_consumer_rotate_stream(ctx, stream);
+               if (rotation_ret < 0) {
+                       ret = -1;
+                       ERR("Stream rotation error");
+                       goto error;
+               }
        }
 error:
        return ret;
This page took 0.032025 seconds and 5 git commands to generate.