Consumer perform the rotation when extracting a packet
[lttng-tools.git] / src / common / consumer / consumer.c
index 8e6b63b011436a06a80a455aad802d23276a4470..1ea14f8ca5991af15f4d8b7d094ab9c85168c163 100644 (file)
@@ -3392,6 +3392,8 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx)
 {
        ssize_t ret;
+       int rotate_ret;
+       bool rotated = false;
 
        pthread_mutex_lock(&stream->lock);
        if (stream->metadata_flag) {
@@ -3400,11 +3402,11 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
 
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
-               ret = lttng_kconsumer_read_subbuffer(stream, ctx);
+               ret = lttng_kconsumer_read_subbuffer(stream, ctx, &rotated);
                break;
        case LTTNG_CONSUMER32_UST:
        case LTTNG_CONSUMER64_UST:
-               ret = lttng_ustconsumer_read_subbuffer(stream, ctx);
+               ret = lttng_ustconsumer_read_subbuffer(stream, ctx, &rotated);
                break;
        default:
                ERR("Unknown consumer_data type");
@@ -3418,6 +3420,14 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                pthread_mutex_unlock(&stream->metadata_rdv_lock);
        }
        pthread_mutex_unlock(&stream->lock);
+       if (rotated) {
+               rotate_ret = consumer_post_rotation(stream, ctx);
+               if (rotate_ret < 0) {
+                       ERR("Failed after a rotation");
+                       ret = -1;
+               }
+       }
+
        return ret;
 }
 
@@ -3903,6 +3913,54 @@ unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos,
        return start_pos;
 }
 
+/*
+ * Check if a stream is ready to be rotated after extracting it.
+ *
+ * Return 1 if it is ready for rotation, 0 if it is not, a negative value on
+ * error. Stream lock must be held.
+ */
+int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream)
+{
+       int ret;
+       unsigned long consumed_pos;
+
+       if (!stream->rotate_position && !stream->rotate_ready) {
+               ret = 0;
+               goto end;
+       }
+
+       if (stream->rotate_ready) {
+               ret = 1;
+               goto end;
+       }
+
+       /*
+        * If we don't have the rotate_ready flag, check the consumed position
+        * to determine if we need to rotate.
+        */
+       ret = lttng_consumer_sample_snapshot_positions(stream);
+       if (ret < 0) {
+               ERR("Taking snapshot positions");
+               goto end;
+       }
+
+       ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos);
+       if (ret < 0) {
+               ERR("Consumed snapshot position");
+               goto end;
+       }
+
+       /* Rotate position not reached yet (with check for overflow). */
+       if ((long) (consumed_pos - stream->rotate_position) < 0) {
+               ret = 0;
+               goto end;
+       }
+       ret = 1;
+
+end:
+       return ret;
+}
+
 /*
  * Reset the state for a stream after a rotation occurred.
  */
@@ -4014,7 +4072,7 @@ end:
  * Return 0 on success, a negative number of error.
  */
 int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
-               struct lttng_consumer_stream *stream)
+               struct lttng_consumer_stream *stream, bool *rotated)
 {
        int ret;
 
@@ -4058,6 +4116,10 @@ int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
        }
        lttng_consumer_reset_stream_rotate_state(stream);
 
+       if (rotated) {
+               *rotated = true;
+       }
+
        ret = 0;
 
 error:
This page took 0.025375 seconds and 5 git commands to generate.