From b9478f8d577c88b514733987ba67ca424877331c Mon Sep 17 00:00:00 2001 From: Julien Desfossez Date: Wed, 26 Jul 2017 16:01:04 -0400 Subject: [PATCH] post-rotation cleanup Signed-off-by: Julien Desfossez --- src/common/consumer/consumer.c | 126 +++++++++++++------ src/common/consumer/consumer.h | 6 + src/common/kernel-consumer/kernel-consumer.c | 39 +++--- src/common/ust-consumer/ust-consumer.c | 28 ++--- 4 files changed, 125 insertions(+), 74 deletions(-) diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 90e632dc3..1a8dadad8 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -3312,10 +3312,76 @@ error_testpoint: return NULL; } +static +int rotate_notify_sessiond(struct lttng_consumer_local_data *ctx, + uint64_t key) +{ + int ret; + + do { + ret = write(ctx->channel_rotate_pipe, &key, sizeof(key)); + } while (ret == -1 && errno == EINTR); + if (ret == -1) { + PERROR("write to the channel rotate pipe"); + } else { + DBG("Sent channel rotation notification for channel key %" + PRIu64, key); + } + + return ret; +} + +/* + * Perform operations that need to be done after a stream has + * rotated and released the stream lock. + * + * Multiple rotations cannot occur simultaneously, so we know the state of the + * "rotated" stream flag cannot change. + * + * This MUST be called WITHOUT the stream lock held. + */ +static +int consumer_post_rotation(struct lttng_consumer_stream *stream, + struct lttng_consumer_local_data *ctx) +{ + int ret = 0; + + if (!stream->rotated) { + goto end; + } + + pthread_mutex_lock(&stream->chan->lock); + switch (consumer_data.type) { + case LTTNG_CONSUMER_KERNEL: + break; + case LTTNG_CONSUMER32_UST: + case LTTNG_CONSUMER64_UST: + /* + * Wakeup the metadata thread so it dumps the metadata cache + * to file again. + */ + consumer_metadata_wakeup_pipe(stream->chan); + break; + default: + ERR("Unknown consumer_data type"); + abort(); + } + + if (--stream->chan->nr_stream_rotate_pending == 0) { + ret = rotate_notify_sessiond(ctx, stream->chan->key); + } + pthread_mutex_unlock(&stream->chan->lock); + stream->rotated = 0; + +end: + return ret; +} + ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, struct lttng_consumer_local_data *ctx) { ssize_t ret; + int rotate_ret; pthread_mutex_lock(&stream->lock); if (stream->metadata_flag) { @@ -3342,6 +3408,13 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, pthread_mutex_unlock(&stream->metadata_rdv_lock); } pthread_mutex_unlock(&stream->lock); + + rotate_ret = consumer_post_rotation(stream, ctx); + if (rotate_ret < 0) { + ERR("Failed after a rotation"); + ret = -1; + } + return ret; } @@ -3828,7 +3901,7 @@ unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos, } static -int flush_buffer(struct lttng_consumer_stream *stream, int producer_active) +int consumer_flush_buffer(struct lttng_consumer_stream *stream, int producer_active) { int ret = 0; @@ -3891,6 +3964,7 @@ int lttng_consumer_rotate_channel(uint64_t key, char *path, * Lock stream because we are about to change its state. */ pthread_mutex_lock(&stream->lock); + memcpy(stream->channel_ro_pathname, channel->pathname, PATH_MAX); ret = lttng_consumer_sample_snapshot_positions(stream); if (ret < 0) { ERR("Taking kernel snapshot positions"); @@ -3918,7 +3992,7 @@ int lttng_consumer_rotate_channel(uint64_t key, char *path, } channel->nr_stream_rotate_pending++; - ret = flush_buffer(stream, 1); + ret = consumer_flush_buffer(stream, 1); if (ret < 0) { ERR("Failed to flush stream"); goto end_unlock; @@ -3939,25 +4013,6 @@ end: return ret; } -static -int rotate_notify_sessiond(struct lttng_consumer_local_data *ctx, - uint64_t key) -{ - int ret; - - do { - ret = write(ctx->channel_rotate_pipe, &key, sizeof(key)); - } while (ret == -1 && errno == EINTR); - if (ret == -1) { - PERROR("write to the channel rotate pipe"); - } else { - DBG("Sent channel rotation notification for channel key %" - PRIu64, key); - } - - return ret; -} - /* * Performs the stream rotation for the rotate session feature if needed. * It must be called with the stream and channel locks held. @@ -4012,9 +4067,9 @@ int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx, } fprintf(stderr, "Rotating stream %lu to %s/%s\n", stream->key, - stream->chan->pathname, stream->name); - ret = utils_create_stream_file(stream->chan->pathname, stream->name, - stream->chan->tracefile_size, stream->tracefile_count_current, + stream->channel_ro_pathname, stream->name); + ret = utils_create_stream_file(stream->channel_ro_pathname, stream->name, + stream->channel_ro_tracefile_size, stream->tracefile_count_current, stream->uid, stream->gid, NULL); if (ret < 0) { goto error; @@ -4027,9 +4082,9 @@ int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx, lttng_index_file_put(stream->index_file); - index_file = lttng_index_file_create(stream->chan->pathname, + index_file = lttng_index_file_create(stream->channel_ro_pathname, stream->name, stream->uid, stream->gid, - stream->chan->tracefile_size, + stream->channel_ro_tracefile_size, stream->tracefile_count_current, CTF_INDEX_MAJOR, CTF_INDEX_MINOR); if (!index_file) { @@ -4046,7 +4101,7 @@ int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx, */ ret = kernctl_metadata_cache_dump(stream->wait_fd); if (ret < 0) { - ERR("Failed to dump the metadata cache after rotation"); + ERR("Failed to dump the kernel metadata cache after rotation"); goto error; } break; @@ -4057,12 +4112,6 @@ int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx, * will write from the beginning on the next push. */ stream->ust_metadata_pushed = 0; - /* - * Wakeup the metadata thread so it dumps the metadata cache - * to file again. - * FIXME: post-pone that after we have released the stream lock. - */ - consumer_metadata_wakeup_pipe(stream->chan); break; default: ERR("Unknown consumer_data type"); @@ -4072,11 +4121,7 @@ int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx, stream->rotate_position = 0; stream->rotate_ready = 0; - - if (--stream->chan->nr_stream_rotate_pending == 0) { - rotate_notify_sessiond(ctx, stream->chan->key); - fprintf(stderr, "SENT %lu\n", stream->chan->key); - } + stream->rotated = 1; ret = 0; goto end; @@ -4135,6 +4180,11 @@ int lttng_consumer_rotate_ready_streams(uint64_t key, } pthread_mutex_unlock(&stream->lock); + ret = consumer_post_rotation(stream, ctx); + if (ret < 0) { + ERR("Failed after a rotation"); + goto end; + } } ret = 0; diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h index 77256c146..d8df7adb4 100644 --- a/src/common/consumer/consumer.h +++ b/src/common/consumer/consumer.h @@ -446,6 +446,12 @@ struct lttng_consumer_stream { * use the produced/consumed positions as reference. */ unsigned int rotate_ready:1; + /* + * Flag set to 1 if the stream just got rotated. This is used to + * perform actions on the channel after a rotation without needing + * to nest the channel lock inside the stream lock. + */ + unsigned int rotated:1; /* Indicate if the stream still has some data to be read. */ unsigned int has_data:1; diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 89be8de0a..76c031063 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -1428,7 +1428,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, DBG("Reserving sub buffer failed (everything is normal, " "it is due to concurrency)"); ret = err; - goto end; + goto error; } /* Get the full subbuffer size including padding */ @@ -1444,10 +1444,10 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, PERROR("Reader has been pushed by the writer, last sub-buffer corrupted."); } ret = err; - goto end; + goto error; } ret = err; - goto end; + goto error; } if (!stream->metadata_flag) { @@ -1462,9 +1462,9 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, PERROR("Reader has been pushed by the writer, last sub-buffer corrupted."); } ret = err; - goto end; + goto error; } - goto end; + goto error; } ret = update_stream_stats(stream); if (ret < 0) { @@ -1477,9 +1477,9 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, PERROR("Reader has been pushed by the writer, last sub-buffer corrupted."); } ret = err; - goto end; + goto error; } - goto end; + goto error; } } else { write_index = 0; @@ -1494,9 +1494,9 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, PERROR("Reader has been pushed by the writer, last sub-buffer corrupted."); } ret = err; - goto end; + goto error; } - goto end; + goto error; } } @@ -1541,10 +1541,10 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, PERROR("Reader has been pushed by the writer, last sub-buffer corrupted."); } ret = err; - goto end; + goto error; } ret = err; - goto end; + goto error; } /* Make sure the tracer is not gone mad on us! */ @@ -1587,12 +1587,12 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, PERROR("Reader has been pushed by the writer, last sub-buffer corrupted."); } ret = err; - goto end; + goto error; } /* Write index if needed. */ if (!write_index) { - goto end; + goto rotate; } if (stream->chan->live_timer_interval && !stream->metadata_flag) { @@ -1616,24 +1616,23 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, pthread_mutex_unlock(&stream->metadata_timer_lock); } if (err < 0) { - goto end; + goto error; } } err = consumer_stream_write_index(stream, &index); if (err < 0) { - goto end; + goto error; } -end: - pthread_mutex_lock(&stream->chan->lock); +rotate: rotation_ret = lttng_consumer_rotate_stream(ctx, stream); if (rotation_ret < 0) { - pthread_mutex_unlock(&stream->chan->lock); ERR("Stream rotation error"); - goto end; + goto error; } - pthread_mutex_unlock(&stream->chan->lock); + +error: return ret; } diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 668de5775..db6ec8bff 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -2589,7 +2589,7 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream, readlen = lttng_read(stream->wait_fd, &dummy, 1); if (readlen < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { ret = readlen; - goto end; + goto error; } } @@ -2604,7 +2604,7 @@ retry: if (stream->metadata_flag) { ret = commit_one_metadata_packet(stream); if (ret <= 0) { - goto end; + goto error; } ustctl_flush_buffer(stream->ustream, 1); goto retry; @@ -2619,7 +2619,7 @@ retry: */ DBG("Reserving sub buffer failed (everything is normal, " "it is due to concurrency) [ret: %d]", err); - goto end; + goto error; } assert(stream->chan->output == CONSUMER_CHANNEL_MMAP); @@ -2629,7 +2629,7 @@ retry: if (ret < 0) { err = ustctl_put_subbuf(ustream); assert(err == 0); - goto end; + goto error; } /* Update the stream's sequence and discarded events count. */ @@ -2638,7 +2638,7 @@ retry: PERROR("kernctl_get_events_discarded"); err = ustctl_put_subbuf(ustream); assert(err == 0); - goto end; + goto error; } } else { write_index = 0; @@ -2687,13 +2687,13 @@ retry: if (!stream->metadata_flag) { ret = notify_if_more_data(stream, ctx); if (ret < 0) { - goto end; + goto error; } } /* Write index if needed. */ if (!write_index) { - goto end; + goto rotate; } if (stream->chan->live_timer_interval && !stream->metadata_flag) { @@ -2718,28 +2718,24 @@ retry: } if (err < 0) { - goto end; + goto error; } } assert(!stream->metadata_flag); err = consumer_stream_write_index(stream, &index); if (err < 0) { - goto end; + goto error; } -end: - /* FIXME: do we need this lock, it causes deadlocks when called - * at the same time with lttng_ustconsumer_rotate_channel ? */ -// pthread_mutex_lock(&stream->chan->lock); +rotate: rotation_ret = lttng_consumer_rotate_stream(ctx, stream); if (rotation_ret < 0) { -// pthread_mutex_unlock(&stream->chan->lock); ret = -1; ERR("Stream rotation error"); - goto end; + goto error; } -// pthread_mutex_unlock(&stream->chan->lock); +error: return ret; } -- 2.34.1