return NULL;
}
+static
+int rotate_notify_sessiond(struct lttng_consumer_local_data *ctx,
+ uint64_t key)
+{
+ int ret;
+
+ do {
+ ret = write(ctx->channel_rotate_pipe, &key, sizeof(key));
+ } while (ret == -1 && errno == EINTR);
+ if (ret == -1) {
+ PERROR("write to the channel rotate pipe");
+ } else {
+ DBG("Sent channel rotation notification for channel key %"
+ PRIu64, key);
+ }
+
+ return ret;
+}
+
+/*
+ * Perform operations that need to be done after a stream has
+ * rotated and released the stream lock.
+ *
+ * Multiple rotations cannot occur simultaneously, so we know the state of the
+ * "rotated" stream flag cannot change.
+ *
+ * This MUST be called WITHOUT the stream lock held.
+ */
+static
+int consumer_post_rotation(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_local_data *ctx)
+{
+ int ret = 0;
+
+ if (!stream->rotated) {
+ goto end;
+ }
+
+ pthread_mutex_lock(&stream->chan->lock);
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ break;
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ /*
+ * Wakeup the metadata thread so it dumps the metadata cache
+ * to file again.
+ */
+ consumer_metadata_wakeup_pipe(stream->chan);
+ break;
+ default:
+ ERR("Unknown consumer_data type");
+ abort();
+ }
+
+ if (--stream->chan->nr_stream_rotate_pending == 0) {
+ ret = rotate_notify_sessiond(ctx, stream->chan->key);
+ }
+ pthread_mutex_unlock(&stream->chan->lock);
+ stream->rotated = 0;
+
+end:
+ return ret;
+}
+
ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
struct lttng_consumer_local_data *ctx)
{
ssize_t ret;
+ int rotate_ret;
pthread_mutex_lock(&stream->lock);
if (stream->metadata_flag) {
pthread_mutex_unlock(&stream->metadata_rdv_lock);
}
pthread_mutex_unlock(&stream->lock);
+
+ rotate_ret = consumer_post_rotation(stream, ctx);
+ if (rotate_ret < 0) {
+ ERR("Failed after a rotation");
+ ret = -1;
+ }
+
return ret;
}
}
static
-int flush_buffer(struct lttng_consumer_stream *stream, int producer_active)
+int consumer_flush_buffer(struct lttng_consumer_stream *stream, int producer_active)
{
int ret = 0;
* Lock stream because we are about to change its state.
*/
pthread_mutex_lock(&stream->lock);
+ memcpy(stream->channel_ro_pathname, channel->pathname, PATH_MAX);
ret = lttng_consumer_sample_snapshot_positions(stream);
if (ret < 0) {
ERR("Taking kernel snapshot positions");
}
channel->nr_stream_rotate_pending++;
- ret = flush_buffer(stream, 1);
+ ret = consumer_flush_buffer(stream, 1);
if (ret < 0) {
ERR("Failed to flush stream");
goto end_unlock;
return ret;
}
-static
-int rotate_notify_sessiond(struct lttng_consumer_local_data *ctx,
- uint64_t key)
-{
- int ret;
-
- do {
- ret = write(ctx->channel_rotate_pipe, &key, sizeof(key));
- } while (ret == -1 && errno == EINTR);
- if (ret == -1) {
- PERROR("write to the channel rotate pipe");
- } else {
- DBG("Sent channel rotation notification for channel key %"
- PRIu64, key);
- }
-
- return ret;
-}
-
/*
* Performs the stream rotation for the rotate session feature if needed.
* It must be called with the stream and channel locks held.
}
fprintf(stderr, "Rotating stream %lu to %s/%s\n", stream->key,
- stream->chan->pathname, stream->name);
- ret = utils_create_stream_file(stream->chan->pathname, stream->name,
- stream->chan->tracefile_size, stream->tracefile_count_current,
+ stream->channel_ro_pathname, stream->name);
+ ret = utils_create_stream_file(stream->channel_ro_pathname, stream->name,
+ stream->channel_ro_tracefile_size, stream->tracefile_count_current,
stream->uid, stream->gid, NULL);
if (ret < 0) {
goto error;
lttng_index_file_put(stream->index_file);
- index_file = lttng_index_file_create(stream->chan->pathname,
+ index_file = lttng_index_file_create(stream->channel_ro_pathname,
stream->name, stream->uid, stream->gid,
- stream->chan->tracefile_size,
+ stream->channel_ro_tracefile_size,
stream->tracefile_count_current,
CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
if (!index_file) {
*/
ret = kernctl_metadata_cache_dump(stream->wait_fd);
if (ret < 0) {
- ERR("Failed to dump the metadata cache after rotation");
+ ERR("Failed to dump the kernel metadata cache after rotation");
goto error;
}
break;
* will write from the beginning on the next push.
*/
stream->ust_metadata_pushed = 0;
- /*
- * Wakeup the metadata thread so it dumps the metadata cache
- * to file again.
- * FIXME: post-pone that after we have released the stream lock.
- */
- consumer_metadata_wakeup_pipe(stream->chan);
break;
default:
ERR("Unknown consumer_data type");
stream->rotate_position = 0;
stream->rotate_ready = 0;
-
- if (--stream->chan->nr_stream_rotate_pending == 0) {
- rotate_notify_sessiond(ctx, stream->chan->key);
- fprintf(stderr, "SENT %lu\n", stream->chan->key);
- }
+ stream->rotated = 1;
ret = 0;
goto end;
}
pthread_mutex_unlock(&stream->lock);
+ ret = consumer_post_rotation(stream, ctx);
+ if (ret < 0) {
+ ERR("Failed after a rotation");
+ goto end;
+ }
}
ret = 0;
DBG("Reserving sub buffer failed (everything is normal, "
"it is due to concurrency)");
ret = err;
- goto end;
+ goto error;
}
/* Get the full subbuffer size including padding */
PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
}
ret = err;
- goto end;
+ goto error;
}
ret = err;
- goto end;
+ goto error;
}
if (!stream->metadata_flag) {
PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
}
ret = err;
- goto end;
+ goto error;
}
- goto end;
+ goto error;
}
ret = update_stream_stats(stream);
if (ret < 0) {
PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
}
ret = err;
- goto end;
+ goto error;
}
- goto end;
+ goto error;
}
} else {
write_index = 0;
PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
}
ret = err;
- goto end;
+ goto error;
}
- goto end;
+ goto error;
}
}
PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
}
ret = err;
- goto end;
+ goto error;
}
ret = err;
- goto end;
+ goto error;
}
/* Make sure the tracer is not gone mad on us! */
PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
}
ret = err;
- goto end;
+ goto error;
}
/* Write index if needed. */
if (!write_index) {
- goto end;
+ goto rotate;
}
if (stream->chan->live_timer_interval && !stream->metadata_flag) {
pthread_mutex_unlock(&stream->metadata_timer_lock);
}
if (err < 0) {
- goto end;
+ goto error;
}
}
err = consumer_stream_write_index(stream, &index);
if (err < 0) {
- goto end;
+ goto error;
}
-end:
- pthread_mutex_lock(&stream->chan->lock);
+rotate:
rotation_ret = lttng_consumer_rotate_stream(ctx, stream);
if (rotation_ret < 0) {
- pthread_mutex_unlock(&stream->chan->lock);
ERR("Stream rotation error");
- goto end;
+ goto error;
}
- pthread_mutex_unlock(&stream->chan->lock);
+
+error:
return ret;
}
readlen = lttng_read(stream->wait_fd, &dummy, 1);
if (readlen < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
ret = readlen;
- goto end;
+ goto error;
}
}
if (stream->metadata_flag) {
ret = commit_one_metadata_packet(stream);
if (ret <= 0) {
- goto end;
+ goto error;
}
ustctl_flush_buffer(stream->ustream, 1);
goto retry;
*/
DBG("Reserving sub buffer failed (everything is normal, "
"it is due to concurrency) [ret: %d]", err);
- goto end;
+ goto error;
}
assert(stream->chan->output == CONSUMER_CHANNEL_MMAP);
if (ret < 0) {
err = ustctl_put_subbuf(ustream);
assert(err == 0);
- goto end;
+ goto error;
}
/* Update the stream's sequence and discarded events count. */
PERROR("kernctl_get_events_discarded");
err = ustctl_put_subbuf(ustream);
assert(err == 0);
- goto end;
+ goto error;
}
} else {
write_index = 0;
if (!stream->metadata_flag) {
ret = notify_if_more_data(stream, ctx);
if (ret < 0) {
- goto end;
+ goto error;
}
}
/* Write index if needed. */
if (!write_index) {
- goto end;
+ goto rotate;
}
if (stream->chan->live_timer_interval && !stream->metadata_flag) {
}
if (err < 0) {
- goto end;
+ goto error;
}
}
assert(!stream->metadata_flag);
err = consumer_stream_write_index(stream, &index);
if (err < 0) {
- goto end;
+ goto error;
}
-end:
- /* FIXME: do we need this lock, it causes deadlocks when called
- * at the same time with lttng_ustconsumer_rotate_channel ? */
-// pthread_mutex_lock(&stream->chan->lock);
+rotate:
rotation_ret = lttng_consumer_rotate_stream(ctx, stream);
if (rotation_ret < 0) {
-// pthread_mutex_unlock(&stream->chan->lock);
ret = -1;
ERR("Stream rotation error");
- goto end;
+ goto error;
}
-// pthread_mutex_unlock(&stream->chan->lock);
+error:
return ret;
}