rcu_read_unlock();
}
+static
+int rotate_notify_sessiond(struct lttng_consumer_local_data *ctx,
+ uint64_t key)
+{
+ int ret;
+
+ fprintf(stderr, "Notif send\n");
+ do {
+ ret = write(ctx->channel_rotate_pipe, &key, sizeof(key));
+ } while (ret == -1 && errno == EINTR);
+ if (ret == -1) {
+ PERROR("write to the channel rotate pipe");
+ } else {
+ DBG("Sent channel rotation notification for channel key %"
+ PRIu64, key);
+ }
+ fprintf(stderr, "Notif done\n");
+
+ return ret;
+}
+
+/*
+ * Perform operations that need to be done after a stream has
+ * rotated and released the stream lock.
+ *
+ * Multiple rotations cannot occur simultaneously, so we know the state of the
+ * "rotated" stream flag cannot change.
+ *
+ * This MUST be called WITHOUT the stream lock held.
+ */
+static
+int consumer_post_rotation(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_local_data *ctx)
+{
+ int ret = 0;
+
+ if (!stream->rotated) {
+ goto end;
+ }
+
+ pthread_mutex_lock(&stream->chan->lock);
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ break;
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ /*
+ * The ust_metadata_pushed counter has been reset to 0, so now
+ * we can wakeup the metadata thread so it dumps the metadata
+ * cache to the new file.
+ */
+ if (stream->metadata_flag) {
+ consumer_metadata_wakeup_pipe(stream->chan);
+ }
+ break;
+ default:
+ ERR("Unknown consumer_data type");
+ abort();
+ }
+
+ fprintf(stderr, "nr_pending: %lu\n", stream->chan->nr_stream_rotate_pending);
+ if (--stream->chan->nr_stream_rotate_pending == 0) {
+ ret = rotate_notify_sessiond(ctx, stream->chan->key);
+ }
+ pthread_mutex_unlock(&stream->chan->lock);
+ stream->rotated = 0;
+
+end:
+ return ret;
+}
+
static
int handle_rotate_wakeup_pipe(struct lttng_consumer_local_data *ctx,
struct lttng_pipe *stream_pipe)
goto end;
}
+ pthread_mutex_lock(&stream->lock);
fprintf(stderr, "Rotate wakeup pipe, stream %lu\n", stream->key);
ret = lttng_consumer_rotate_stream(ctx, stream);
+ pthread_mutex_unlock(&stream->lock);
if (ret < 0) {
ERR("Failed to rotate metadata stream");
goto end;
}
+ ret = consumer_post_rotation(stream, ctx);
+ if (ret < 0) {
+ ERR("Failed after a rotation");
+ ret = -1;
+ }
ret = 0;
/* local view of the streams */
struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
/* local view of consumer_data.fds_count */
- int nb_fd = 0;
+ int nb_fd = 0, nb_pipes_fd;
struct lttng_consumer_local_data *ctx = data;
ssize_t len;
local_stream = NULL;
/*
- * Allocate for all fds and:
+ * Allocate for all fds + 3:
* +1 for the consumer_data_pipe
* +1 for wake up pipe
* +1 for consumer_data_rotate_pipe.
*/
- pollfd = zmalloc((consumer_data.stream_count + 3) * sizeof(struct pollfd));
+ nb_pipes_fd = 3;
+ pollfd = zmalloc((consumer_data.stream_count + nb_pipes_fd) * sizeof(struct pollfd));
if (pollfd == NULL) {
PERROR("pollfd malloc");
pthread_mutex_unlock(&consumer_data.lock);
goto end;
}
- local_stream = zmalloc((consumer_data.stream_count + 3) *
+ local_stream = zmalloc((consumer_data.stream_count + nb_pipes_fd) *
sizeof(struct lttng_consumer_stream *));
if (local_stream == NULL) {
PERROR("local_stream malloc");
}
/* poll on the array of fds */
restart:
- DBG("polling on %d fd", nb_fd + 2);
+ DBG("polling on %d fd", nb_fd + nb_pipes_fd);
if (testpoint(consumerd_thread_data_poll)) {
goto end;
}
health_poll_entry();
- num_rdy = poll(pollfd, nb_fd + 2, -1);
+ num_rdy = poll(pollfd, nb_fd + nb_pipes_fd, -1);
health_poll_exit();
DBG("poll num_rdy : %d", num_rdy);
if (num_rdy == -1) {
return NULL;
}
-static
-int rotate_notify_sessiond(struct lttng_consumer_local_data *ctx,
- uint64_t key)
-{
- int ret;
-
- fprintf(stderr, "Notif send\n");
- do {
- ret = write(ctx->channel_rotate_pipe, &key, sizeof(key));
- } while (ret == -1 && errno == EINTR);
- if (ret == -1) {
- PERROR("write to the channel rotate pipe");
- } else {
- DBG("Sent channel rotation notification for channel key %"
- PRIu64, key);
- }
- fprintf(stderr, "Notif done\n");
-
- return ret;
-}
-
-/*
- * Perform operations that need to be done after a stream has
- * rotated and released the stream lock.
- *
- * Multiple rotations cannot occur simultaneously, so we know the state of the
- * "rotated" stream flag cannot change.
- *
- * This MUST be called WITHOUT the stream lock held.
- */
-static
-int consumer_post_rotation(struct lttng_consumer_stream *stream,
- struct lttng_consumer_local_data *ctx)
-{
- int ret = 0;
-
- if (!stream->rotated) {
- goto end;
- }
-
- pthread_mutex_lock(&stream->chan->lock);
- switch (consumer_data.type) {
- case LTTNG_CONSUMER_KERNEL:
- break;
- case LTTNG_CONSUMER32_UST:
- case LTTNG_CONSUMER64_UST:
- /*
- * The ust_metadata_pushed counter has been reset to 0, so now
- * we can wakeup the metadata thread so it dumps the metadata
- * cache to the new file.
- */
- if (stream->metadata_flag) {
- consumer_metadata_wakeup_pipe(stream->chan);
- }
- break;
- default:
- ERR("Unknown consumer_data type");
- abort();
- }
-
- fprintf(stderr, "nr_pending: %lu\n", stream->chan->nr_stream_rotate_pending);
- if (--stream->chan->nr_stream_rotate_pending == 0) {
- ret = rotate_notify_sessiond(ctx, stream->chan->key);
- }
- pthread_mutex_unlock(&stream->chan->lock);
- stream->rotated = 0;
-
-end:
- return ret;
-}
-
ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
struct lttng_consumer_local_data *ctx)
{
/*
* Check if a stream is ready to be rotated after extracting it.
*
- * When we are called between get_next_subbuf and put_next_subbuf, the len
- * parameter is the subbuf size of the current subbuffer being extracted. This
- * len is with padding, so it is normal to see that the current position is
- * farther than the expected rotate position.
- *
* Return 1 if it is ready for rotation, 0 if it is not, a negative value on
- * error.
+ * error. Stream lock must be held.
*/
-int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream,
- unsigned long len)
+int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream)
{
int ret;
unsigned long consumed_pos;
ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos);
if (ret < 0) {
- ERR("Produced kernel snapshot position");
+ ERR("Consumed kernel snapshot position");
goto end;
}
fprintf(stderr, "packet %lu, pos %lu\n", stream->key, consumed_pos);
/* Rotate position not reached yet. */
- if ((consumed_pos + len) < stream->rotate_position) {
+ if (consumed_pos < stream->rotate_position) {
ret = 0;
goto end;
}
fprintf(stderr, "Rotate position %lu (expected %lu) reached for stream %lu\n",
- consumed_pos + len, stream->rotate_position,
+ consumed_pos, stream->rotate_position,
stream->key);
ret = 1;
struct lttng_consumer_local_data *ctx)
{
unsigned long len, subbuf_size, padding;
- int err, write_index = 1, rotation_ret, rotate_ready;
+ int err, write_index = 1, rotation_ret;
ssize_t ret = 0;
int infd = stream->wait_fd;
struct ctf_packet_index index;
goto error;
}
- rotate_ready = lttng_consumer_stream_is_rotate_ready(stream, len);
- fprintf(stderr, "consumer read stream %lu, len %lu, ready = %d\n", stream->key,
- len, rotate_ready);
- if (rotate_ready < 0) {
- ERR("Failed to check if stream is ready for rotation");
- err = kernctl_put_subbuf(infd);
- if (err != 0) {
- if (err == -EFAULT) {
- PERROR("Error in unreserving sub buffer\n");
- } else if (err == -EIO) {
- /* Should never happen with newer LTTng versions */
- PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
- }
- ret = err;
- goto error;
- }
- ret = -1;
- goto error;
- }
- stream->rotate_ready = rotate_ready;
-
if (!stream->metadata_flag) {
ret = get_index_values(&index, infd);
if (ret < 0) {
}
rotate:
- if (stream->rotate_ready) {
+ rotation_ret = lttng_consumer_stream_is_rotate_ready(stream);
+ if (rotation_ret == 1) {
rotation_ret = lttng_consumer_rotate_stream(ctx, stream);
if (rotation_ret < 0) {
ERR("Stream rotation error");
ret = -1;
goto error;
}
+ } else if (rotation_ret < 0) {
+ ERR("Checking if stream is ready to rotate");
+ ret = -1;
+ goto error;
}
error: