consumerd: move rotation logic to domain-agnostic read path
[lttng-tools.git] / src / common / consumer / consumer.c
index e2e7438f62d29a942740f37dcd255c6e9242cbf7..f13e90a6881ac4ab87312c61a7e632004d97e81d 100644 (file)
@@ -3413,6 +3413,7 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx)
 {
        ssize_t ret;
+       int rotation_ret;
 
        pthread_mutex_lock(&stream->chan->lock);
        pthread_mutex_lock(&stream->lock);
@@ -3420,6 +3421,19 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                pthread_mutex_lock(&stream->metadata_rdv_lock);
        }
 
+       /*
+        * If the stream was flagged to be ready for rotation before we extract
+        * the next packet, rotate it now.
+        */
+       if (stream->rotate_ready) {
+               DBG("Rotate stream before consuming data");
+               ret = lttng_consumer_rotate_stream(ctx, stream);
+               if (ret < 0) {
+                       ERR("Stream rotation error before consuming data");
+                       goto end;
+               }
+       }
+
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
                ret = lttng_kconsumer_read_subbuffer(stream, ctx);
@@ -3435,13 +3449,38 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                break;
        }
 
+       if (ret < 0) {
+               goto end;
+       }
+
+       /*
+        * After extracting the packet, we check if the stream is now ready to
+        * be rotated and perform the action immediately.
+        *
+        * Don't overwrite `ret` as callers expect the number of bytes
+        * consumed to be returned on success.
+        */
+       rotation_ret = lttng_consumer_stream_is_rotate_ready(stream);
+       if (rotation_ret == 1) {
+               rotation_ret = lttng_consumer_rotate_stream(ctx, stream);
+               if (rotation_ret < 0) {
+                       ret = rotation_ret;
+                       ERR("Stream rotation error after consuming data");
+                       goto end;
+               }
+       } else if (rotation_ret < 0) {
+               ret = rotation_ret;
+               ERR("Failed to check if stream was ready to rotate after consuming data");
+               goto end;
+       }
+
+end:
        if (stream->metadata_flag) {
                pthread_cond_broadcast(&stream->metadata_rdv);
                pthread_mutex_unlock(&stream->metadata_rdv_lock);
        }
        pthread_mutex_unlock(&stream->lock);
        pthread_mutex_unlock(&stream->chan->lock);
-
        return ret;
 }
 
This page took 0.025129 seconds and 5 git commands to generate.