Consumer perform the rotation when extracting a packet
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
index 5e1ff896c4e37102bf895819717eda89aefbbba3..6030ca182fd19e5c3078473611ffe1e1ab1c6f8d 100644 (file)
@@ -32,6 +32,7 @@
 #include <unistd.h>
 #include <urcu/list.h>
 #include <signal.h>
+#include <stdbool.h>
 
 #include <bin/lttng-consumerd/health-consumerd.h>
 #include <common/common.h>
@@ -2594,10 +2595,10 @@ end:
  * Return 0 on success else a negative value.
  */
 int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
-               struct lttng_consumer_local_data *ctx)
+               struct lttng_consumer_local_data *ctx, bool *rotated)
 {
        unsigned long len, subbuf_size, padding;
-       int err, write_index = 1;
+       int err, write_index = 1, rotation_ret;
        long ret = 0;
        struct ustctl_consumer_stream *ustream;
        struct ctf_packet_index index;
@@ -2629,7 +2630,21 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                readlen = lttng_read(stream->wait_fd, &dummy, 1);
                if (readlen < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
                        ret = readlen;
-                       goto end;
+                       goto error;
+               }
+       }
+
+       /*
+        * If the stream was flagged to be ready for rotation before we extract the
+        * next packet, rotate it now.
+        */
+       if (stream->rotate_ready) {
+               DBG("Rotate stream before extracting data");
+               rotation_ret = lttng_consumer_rotate_stream(ctx, stream, rotated);
+               if (rotation_ret < 0) {
+                       ERR("Stream rotation error");
+                       ret = -1;
+                       goto error;
                }
        }
 
@@ -2644,7 +2659,7 @@ retry:
                if (stream->metadata_flag) {
                        ret = commit_one_metadata_packet(stream);
                        if (ret <= 0) {
-                               goto end;
+                               goto error;
                        }
                        ustctl_flush_buffer(stream->ustream, 1);
                        goto retry;
@@ -2659,7 +2674,7 @@ retry:
                 */
                DBG("Reserving sub buffer failed (everything is normal, "
                                "it is due to concurrency) [ret: %d]", err);
-               goto end;
+               goto error;
        }
        assert(stream->chan->output == CONSUMER_CHANNEL_MMAP);
 
@@ -2669,7 +2684,7 @@ retry:
                if (ret < 0) {
                        err = ustctl_put_subbuf(ustream);
                        assert(err == 0);
-                       goto end;
+                       goto error;
                }
 
                /* Update the stream's sequence and discarded events count. */
@@ -2678,7 +2693,7 @@ retry:
                        PERROR("kernctl_get_events_discarded");
                        err = ustctl_put_subbuf(ustream);
                        assert(err == 0);
-                       goto end;
+                       goto error;
                }
        } else {
                write_index = 0;
@@ -2696,6 +2711,7 @@ retry:
        assert(len >= subbuf_size);
 
        padding = len - subbuf_size;
+
        /* write the subbuffer to the tracefile */
        ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, padding, &index);
        /*
@@ -2727,13 +2743,13 @@ retry:
        if (!stream->metadata_flag) {
                ret = notify_if_more_data(stream, ctx);
                if (ret < 0) {
-                       goto end;
+                       goto error;
                }
        }
 
        /* Write index if needed. */
        if (!write_index) {
-               goto end;
+               goto rotate;
        }
 
        if (stream->chan->live_timer_interval && !stream->metadata_flag) {
@@ -2758,17 +2774,35 @@ retry:
                }
 
                if (err < 0) {
-                       goto end;
+                       goto error;
                }
        }
 
        assert(!stream->metadata_flag);
        err = consumer_stream_write_index(stream, &index);
        if (err < 0) {
-               goto end;
+               goto error;
        }
 
-end:
+rotate:
+       /*
+        * After extracting the packet, we check if the stream is now ready to be
+        * rotated and perform the action immediately.
+        */
+       rotation_ret = lttng_consumer_stream_is_rotate_ready(stream);
+       if (rotation_ret == 1) {
+               rotation_ret = lttng_consumer_rotate_stream(ctx, stream, rotated);
+               if (rotation_ret < 0) {
+                       ERR("Stream rotation error");
+                       ret = -1;
+                       goto error;
+               }
+       } else if (rotation_ret < 0) {
+               ERR("Checking if stream is ready to rotate");
+               ret = -1;
+               goto error;
+       }
+error:
        return ret;
 }
 
This page took 0.027516 seconds and 5 git commands to generate.