struct lttng_consumer_local_data *ctx)
{
ssize_t ret;
+ int rotation_ret;
pthread_mutex_lock(&stream->chan->lock);
pthread_mutex_lock(&stream->lock);
pthread_mutex_lock(&stream->metadata_rdv_lock);
}
+ /*
+ * If the stream was flagged to be ready for rotation before we extract
+ * the next packet, rotate it now.
+ */
+ if (stream->rotate_ready) {
+ DBG("Rotate stream before consuming data");
+ ret = lttng_consumer_rotate_stream(ctx, stream);
+ if (ret < 0) {
+ ERR("Stream rotation error before consuming data");
+ goto end;
+ }
+ }
+
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
ret = lttng_kconsumer_read_subbuffer(stream, ctx);
break;
}
+ if (ret < 0) {
+ goto end;
+ }
+
+ /*
+ * After extracting the packet, we check if the stream is now ready to
+ * be rotated and perform the action immediately.
+ *
+ * Don't overwrite `ret` as callers expect the number of bytes
+ * consumed to be returned on success.
+ */
+ rotation_ret = lttng_consumer_stream_is_rotate_ready(stream);
+ if (rotation_ret == 1) {
+ rotation_ret = lttng_consumer_rotate_stream(ctx, stream);
+ if (rotation_ret < 0) {
+ ret = rotation_ret;
+ ERR("Stream rotation error after consuming data");
+ goto end;
+ }
+ } else if (rotation_ret < 0) {
+ ret = rotation_ret;
+ ERR("Failed to check if stream was ready to rotate after consuming data");
+ goto end;
+ }
+
+end:
if (stream->metadata_flag) {
pthread_cond_broadcast(&stream->metadata_rdv);
pthread_mutex_unlock(&stream->metadata_rdv_lock);
}
pthread_mutex_unlock(&stream->lock);
pthread_mutex_unlock(&stream->chan->lock);
-
return ret;
}