}
/*
- * Performs the stream rotation for the rotate session feature if needed.
- * It must be called with the stream and channel locks held.
+ * Check if a stream is ready to be rotated after extracting it.
*
- * Return 0 on success, a negative number of error.
+ * When we are called between get_next_subbuf and put_next_subbuf, the len
+ * parameter is the subbuf size of the current subbuffer being extracted. This
+ * len is with padding, so it is normal to see that the current position is
+ * farther than the expected rotate position.
+ *
+ * Return 1 if it is ready for rotation, 0 if it is not, a negative value on
+ * error.
*/
-int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_stream *stream)
+int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream,
+ unsigned long len)
{
int ret;
unsigned long consumed_pos;
ret = lttng_consumer_sample_snapshot_positions(stream);
if (ret < 0) {
ERR("Taking kernel snapshot positions");
- goto error;
+ goto end;
}
ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos);
if (ret < 0) {
ERR("Produced kernel snapshot position");
- goto error;
+ goto end;
}
fprintf(stderr, "packet %lu, pos %lu\n", stream->key, consumed_pos);
/* Rotate position not reached yet. */
- if (consumed_pos < stream->rotate_position) {
+ if ((consumed_pos + len) < stream->rotate_position) {
ret = 0;
goto end;
}
fprintf(stderr, "Rotate position %lu (expected %lu) reached for stream %lu\n",
- consumed_pos, stream->rotate_position, stream->key);
+ consumed_pos + len, stream->rotate_position,
+ stream->key);
+ ret = 1;
} else {
fprintf(stderr, "Rotate position reached for stream %lu\n",
stream->key);
+ ret = 1;
}
+end:
+ return ret;
+}
+
+/*
+ * Performs the stream rotation for the rotate session feature if needed.
+ * It must be called with the stream and channel locks held.
+ *
+ * Return 0 on success, a negative number of error.
+ */
+int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
+ struct lttng_consumer_stream *stream)
+{
+ int ret;
+
ret = close(stream->out_fd);
if (ret < 0) {
PERROR("Closing tracefile");
int lttng_consumer_rotate_channel(uint64_t key, char *path,
uint64_t relayd_id, uint32_t metadata,
struct lttng_consumer_local_data *ctx);
+int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream,
+ unsigned long len);
int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *stream);
int lttng_consumer_rotate_ready_streams(uint64_t key,
struct lttng_consumer_local_data *ctx)
{
unsigned long len, subbuf_size, padding;
- int err, write_index = 1, rotation_ret;
+ int err, write_index = 1, rotation_ret, rotate_ready;
ssize_t ret = 0;
int infd = stream->wait_fd;
struct ctf_packet_index index;
goto error;
}
+ rotate_ready = lttng_consumer_stream_is_rotate_ready(stream, len);
+ if (rotate_ready < 0) {
+ ERR("Failed to check if stream is ready for rotation");
+ err = kernctl_put_subbuf(infd);
+ if (err != 0) {
+ if (err == -EFAULT) {
+ PERROR("Error in unreserving sub buffer\n");
+ } else if (err == -EIO) {
+ /* Should never happen with newer LTTng versions */
+ PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
+ }
+ ret = err;
+ goto error;
+ }
+ ret = -1;
+ goto error;
+ }
+
if (!stream->metadata_flag) {
ret = get_index_values(&index, infd);
if (ret < 0) {
}
rotate:
- rotation_ret = lttng_consumer_rotate_stream(ctx, stream);
- if (rotation_ret < 0) {
- ERR("Stream rotation error");
- goto error;
+ if (rotate_ready) {
+ rotation_ret = lttng_consumer_rotate_stream(ctx, stream);
+ if (rotation_ret < 0) {
+ ERR("Stream rotation error");
+ ret = -1;
+ goto error;
+ }
}
error:
struct lttng_consumer_local_data *ctx)
{
unsigned long len, subbuf_size, padding;
- int err, write_index = 1, rotation_ret;
+ int err, write_index = 1, rotation_ret, rotate_ready;
long ret = 0;
struct ustctl_consumer_stream *ustream;
struct ctf_packet_index index;
assert(len >= subbuf_size);
padding = len - subbuf_size;
+
+ rotate_ready = lttng_consumer_stream_is_rotate_ready(stream, len);
+ if (rotate_ready < 0) {
+ ERR("Failed to check if stream is ready for rotation");
+ ret = -1;
+ err = ustctl_put_subbuf(ustream);
+ assert(err == 0);
+ goto error;
+ }
+
/* write the subbuffer to the tracefile */
ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, padding, &index);
/*
}
rotate:
- rotation_ret = lttng_consumer_rotate_stream(ctx, stream);
- if (rotation_ret < 0) {
- ret = -1;
- ERR("Stream rotation error");
- goto error;
+ if (rotate_ready) {
+ rotation_ret = lttng_consumer_rotate_stream(ctx, stream);
+ if (rotation_ret < 0) {
+ ret = -1;
+ ERR("Stream rotation error");
+ goto error;
+ }
}
error:
return ret;