struct lttng_consumer_local_data *ctx)
{
ssize_t ret;
+ int rotate_ret;
+ bool rotated = false;
pthread_mutex_lock(&stream->lock);
if (stream->metadata_flag) {
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
- ret = lttng_kconsumer_read_subbuffer(stream, ctx);
+ ret = lttng_kconsumer_read_subbuffer(stream, ctx, &rotated);
break;
case LTTNG_CONSUMER32_UST:
case LTTNG_CONSUMER64_UST:
- ret = lttng_ustconsumer_read_subbuffer(stream, ctx);
+ ret = lttng_ustconsumer_read_subbuffer(stream, ctx, &rotated);
break;
default:
ERR("Unknown consumer_data type");
pthread_mutex_unlock(&stream->metadata_rdv_lock);
}
pthread_mutex_unlock(&stream->lock);
+ if (rotated) {
+ rotate_ret = consumer_post_rotation(stream, ctx);
+ if (rotate_ret < 0) {
+ ERR("Failed after a rotation");
+ ret = -1;
+ }
+ }
+
return ret;
}
return start_pos;
}
+/*
+ * Check if a stream is ready to be rotated after extracting it.
+ *
+ * Return 1 if it is ready for rotation, 0 if it is not, a negative value on
+ * error. Stream lock must be held.
+ */
+int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream)
+{
+ int ret;
+ unsigned long consumed_pos;
+
+ if (!stream->rotate_position && !stream->rotate_ready) {
+ ret = 0;
+ goto end;
+ }
+
+ if (stream->rotate_ready) {
+ ret = 1;
+ goto end;
+ }
+
+ /*
+ * If we don't have the rotate_ready flag, check the consumed position
+ * to determine if we need to rotate.
+ */
+ ret = lttng_consumer_sample_snapshot_positions(stream);
+ if (ret < 0) {
+ ERR("Taking snapshot positions");
+ goto end;
+ }
+
+ ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos);
+ if (ret < 0) {
+ ERR("Consumed snapshot position");
+ goto end;
+ }
+
+ /* Rotate position not reached yet (with check for overflow). */
+ if ((long) (consumed_pos - stream->rotate_position) < 0) {
+ ret = 0;
+ goto end;
+ }
+ ret = 1;
+
+end:
+ return ret;
+}
+
/*
* Reset the state for a stream after a rotation occurred.
*/
* Return 0 on success, a negative number of error.
*/
int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_stream *stream)
+ struct lttng_consumer_stream *stream, bool *rotated)
{
int ret;
}
lttng_consumer_reset_stream_rotate_state(stream);
+ if (rotated) {
+ *rotated = true;
+ }
+
ret = 0;
error: