From 61f33cc993ae206a95c4b528e9d1f45ae841435a Mon Sep 17 00:00:00 2001 From: Julien Desfossez Date: Thu, 28 Sep 2017 16:26:42 -0400 Subject: [PATCH] wip rotate consumer pipe Signed-off-by: Julien Desfossez --- src/common/consumer/consumer.c | 149 ++++++++++++++++--- src/common/consumer/consumer.h | 10 ++ src/common/kernel-consumer/kernel-consumer.c | 14 +- 3 files changed, 142 insertions(+), 31 deletions(-) diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index c6fce9777..b37629d4b 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -1134,6 +1134,9 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx, (*pollfd)[i + 1].fd = lttng_pipe_get_readfd(ctx->consumer_wakeup_pipe); (*pollfd)[i + 1].events = POLLIN | POLLPRI; + + (*pollfd)[i + 2].fd = lttng_pipe_get_readfd(ctx->consumer_data_rotate_pipe); + (*pollfd)[i + 2].events = POLLIN | POLLPRI; return i; } @@ -1345,6 +1348,16 @@ struct lttng_consumer_local_data *lttng_consumer_create( goto error_wakeup_pipe; } + ctx->consumer_data_rotate_pipe = lttng_pipe_open(0); + if (!ctx->consumer_data_rotate_pipe) { + goto error_data_rotate_pipe; + } + + ctx->consumer_metadata_rotate_pipe = lttng_pipe_open(0); + if (!ctx->consumer_metadata_rotate_pipe) { + goto error_metadata_rotate_pipe; + } + ret = pipe(ctx->consumer_should_quit); if (ret < 0) { PERROR("Error creating recv pipe"); @@ -1371,6 +1384,10 @@ error_metadata_pipe: error_channel_pipe: utils_close_pipe(ctx->consumer_should_quit); error_quit_pipe: + lttng_pipe_destroy(ctx->consumer_metadata_rotate_pipe); +error_metadata_rotate_pipe: + lttng_pipe_destroy(ctx->consumer_data_rotate_pipe); +error_data_rotate_pipe: lttng_pipe_destroy(ctx->consumer_wakeup_pipe); error_wakeup_pipe: lttng_pipe_destroy(ctx->consumer_data_pipe); @@ -1459,6 +1476,8 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx) lttng_pipe_destroy(ctx->consumer_data_pipe); 
lttng_pipe_destroy(ctx->consumer_metadata_pipe);
 	lttng_pipe_destroy(ctx->consumer_wakeup_pipe);
+	lttng_pipe_destroy(ctx->consumer_data_rotate_pipe);
+	lttng_pipe_destroy(ctx->consumer_metadata_rotate_pipe);
 	utils_close_pipe(ctx->consumer_should_quit);
 
 	unlink(ctx->consumer_command_sock_path);
@@ -2273,6 +2292,55 @@ static void validate_endpoint_status_metadata_stream(
 	rcu_read_unlock();
 }
 
+/*
+ * Read one stream pointer from a rotate wakeup pipe and rotate that stream.
+ *
+ * Shared by the data and metadata poll threads; stream_pipe is the rotate
+ * pipe the calling thread polls on.
+ *
+ * NOTE(review): lttng_consumer_rotate_ready_streams() used to hold
+ * stream->lock around lttng_consumer_rotate_stream() and to call
+ * consumer_post_rotation() afterwards; neither happens here. Confirm that
+ * this is intended.
+ *
+ * Return 0 on success or a negative value on error.
+ */
+static
+int handle_rotate_wakeup_pipe(struct lttng_consumer_local_data *ctx,
+		struct lttng_pipe *stream_pipe)
+{
+	int ret;
+	ssize_t pipe_len;
+	struct lttng_consumer_stream *stream;
+
+	pipe_len = lttng_pipe_read(stream_pipe, &stream, sizeof(stream));
+	/*
+	 * Compare as signed: against a size_t, a negative pipe_len would be
+	 * converted to a huge unsigned value and a failed read would slip
+	 * through with 'stream' left uninitialized.
+	 */
+	if (pipe_len < (ssize_t) sizeof(stream)) {
+		if (pipe_len < 0) {
+			PERROR("read stream on rotate pipe");
+		}
+		ERR("Failed to read stream on rotate pipe");
+		ret = -1;
+		goto end;
+	}
+
+	fprintf(stderr, "Rotate wakeup pipe, stream %" PRIu64 "\n", stream->key);
+	ret = lttng_consumer_rotate_stream(ctx, stream);
+	if (ret < 0) {
+		ERR("Failed to rotate stream");
+		goto end;
+	}
+
+	ret = 0;
+
+end:
+	return ret;
+}
+
 /*
  * Thread polls on metadata file descriptor and write them on disk or on the
  * network.
@@ -2301,7 +2351,7 @@ void *consumer_thread_metadata_poll(void *data) DBG("Thread metadata poll started"); /* Size is set to 1 for the consumer_metadata pipe */ - ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC); + ret = lttng_poll_create(&events, 3, LTTNG_CLOEXEC); if (ret < 0) { ERR("Poll set creation failed"); goto end_poll; @@ -2313,6 +2363,12 @@ void *consumer_thread_metadata_poll(void *data) goto end; } + ret = lttng_poll_add(&events, + lttng_pipe_get_readfd(ctx->consumer_metadata_rotate_pipe), LPOLLIN); + if (ret < 0) { + goto end; + } + /* Main loop */ DBG("Metadata main loop started"); @@ -2401,6 +2457,35 @@ restart: /* Handle other stream */ continue; + } else if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_rotate_pipe)) { + if (revents & LPOLLIN) { + ret = handle_rotate_wakeup_pipe(ctx, + ctx->consumer_metadata_rotate_pipe); + if (ret < 0) { + ERR("Failed to rotate metadata stream"); + lttng_poll_del(&events, + lttng_pipe_get_readfd( + ctx->consumer_metadata_rotate_pipe)); + lttng_pipe_read_close( + ctx->consumer_metadata_rotate_pipe); + goto end; + } + } else if (revents & (LPOLLERR | LPOLLHUP)) { + DBG("Metadata rotate pipe hung up"); + fprintf(stderr, "Metadata rotate pipe hung up"); + /* + * Remove the pipe from the poll set and continue the loop + * since their might be data to consume. + */ + lttng_poll_del(&events, + lttng_pipe_get_readfd(ctx->consumer_metadata_rotate_pipe)); + lttng_pipe_read_close(ctx->consumer_metadata_rotate_pipe); + continue; + } else { + ERR("Unexpected poll events %u for sock %d", revents, pollfd); + goto end; + } + continue; } rcu_read_lock(); @@ -2543,17 +2628,19 @@ void *consumer_thread_data_poll(void *data) local_stream = NULL; /* - * Allocate for all fds +1 for the consumer_data_pipe and +1 for - * wake up pipe. + * Allocate for all fds and: + * +1 for the consumer_data_pipe + * +1 for wake up pipe + * +1 for consumer_data_rotate_pipe. 
*/ - pollfd = zmalloc((consumer_data.stream_count + 2) * sizeof(struct pollfd)); + pollfd = zmalloc((consumer_data.stream_count + 3) * sizeof(struct pollfd)); if (pollfd == NULL) { PERROR("pollfd malloc"); pthread_mutex_unlock(&consumer_data.lock); goto end; } - local_stream = zmalloc((consumer_data.stream_count + 2) * + local_stream = zmalloc((consumer_data.stream_count + 3) * sizeof(struct lttng_consumer_stream *)); if (local_stream == NULL) { PERROR("local_stream malloc"); @@ -2654,6 +2741,17 @@ void *consumer_thread_data_poll(void *data) ctx->has_wakeup = 0; } + /* Handle consumer_data_rotate_pipe. */ + if (pollfd[nb_fd + 2].revents & (POLLIN | POLLPRI)) { + fprintf(stderr, "data wakeup pipe\n"); + ret = handle_rotate_wakeup_pipe(ctx, + ctx->consumer_data_rotate_pipe); + if (ret < 0) { + ERR("Failed to rotate metadata stream"); + goto end; + } + } + /* Take care of high priority channels first. */ for (i = 0; i < nb_fd; i++) { health_code_update(); @@ -2772,6 +2870,7 @@ end: * the read side. 
*/ (void) lttng_pipe_write_close(ctx->consumer_metadata_pipe); + (void) lttng_pipe_write_close(ctx->consumer_metadata_rotate_pipe); error_testpoint: if (err) { @@ -3319,6 +3418,7 @@ int rotate_notify_sessiond(struct lttng_consumer_local_data *ctx, { int ret; + fprintf(stderr, "Notif send\n"); do { ret = write(ctx->channel_rotate_pipe, &key, sizeof(key)); } while (ret == -1 && errno == EINTR); @@ -3328,6 +3428,7 @@ int rotate_notify_sessiond(struct lttng_consumer_local_data *ctx, DBG("Sent channel rotation notification for channel key %" PRIu64, key); } + fprintf(stderr, "Notif done\n"); return ret; } @@ -3371,6 +3472,7 @@ int consumer_post_rotation(struct lttng_consumer_stream *stream, abort(); } + fprintf(stderr, "nr_pending: %lu\n", stream->chan->nr_stream_rotate_pending); if (--stream->chan->nr_stream_rotate_pending == 0) { ret = rotate_notify_sessiond(ctx, stream->chan->key); } @@ -3411,6 +3513,7 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, pthread_cond_broadcast(&stream->metadata_rdv); pthread_mutex_unlock(&stream->metadata_rdv_lock); } + fprintf(stderr, "rotated: %d\n", stream->rotated); pthread_mutex_unlock(&stream->lock); rotate_ret = consumer_post_rotation(stream, ctx); @@ -3945,7 +4048,7 @@ int lttng_consumer_rotate_channel(uint64_t key, char *path, struct lttng_ht_iter iter; struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht; - DBG("Kernel consumer sample rotate position for channel %" PRIu64, key); + DBG("Consumer sample rotate position for channel %" PRIu64, key); rcu_read_lock(); @@ -3984,14 +4087,14 @@ int lttng_consumer_rotate_channel(uint64_t key, char *path, memcpy(stream->channel_ro_pathname, channel->pathname, PATH_MAX); ret = lttng_consumer_sample_snapshot_positions(stream); if (ret < 0) { - ERR("Taking kernel snapshot positions"); + ERR("Taking snapshot positions"); goto end_unlock; } ret = lttng_consumer_get_produced_snapshot(stream, &stream->rotate_position); if (ret < 0) { - ERR("Produced kernel 
snapshot position"); + ERR("Produced snapshot position"); goto end_unlock; } fprintf(stderr, "Stream %lu should rotate after %lu to %s\n", @@ -4005,6 +4108,7 @@ int lttng_consumer_rotate_channel(uint64_t key, char *path, fprintf(stderr, "Stream %lu ready to rotate to %s\n", stream->key, channel->pathname); } + fprintf(stderr, "before increasinc nr_pending: %lu\n", channel->nr_stream_rotate_pending); channel->nr_stream_rotate_pending++; ret = consumer_flush_buffer(stream, 1); @@ -4243,6 +4347,7 @@ int lttng_consumer_rotate_ready_streams(uint64_t key, int ret; struct lttng_consumer_channel *channel; struct lttng_consumer_stream *stream; + struct lttng_pipe *stream_pipe; struct lttng_ht_iter iter; struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht; @@ -4254,33 +4359,31 @@ int lttng_consumer_rotate_ready_streams(uint64_t key, ret = -1; goto end; } + + if (channel->metadata_stream) { + fprintf(stderr, "M\n"); + stream_pipe = ctx->consumer_metadata_rotate_pipe; + } else { + fprintf(stderr, "D\n"); + stream_pipe = ctx->consumer_data_rotate_pipe; + } + cds_lfht_for_each_entry_duplicate(ht->ht, ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct, &channel->key, &iter.iter, stream, node_channel_id.node) { health_code_update(); - /* - * Lock stream because we are about to change its state. 
- */ - pthread_mutex_lock(&stream->lock); if (stream->rotate_ready == 0) { - pthread_mutex_unlock(&stream->lock); continue; } - ret = lttng_consumer_rotate_stream(ctx, stream); - if (ret < 0) { - pthread_mutex_unlock(&stream->lock); - ERR("Stream rotation error"); - goto end; - } - - pthread_mutex_unlock(&stream->lock); - ret = consumer_post_rotation(stream, ctx); + fprintf(stderr, "send stream %lu on wakeup pipe\n", stream->key); + ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream)); if (ret < 0) { - ERR("Failed after a rotation"); + ERR("Failed to wakeup consumer rotate pipe"); goto end; } + fprintf(stderr, "done sending stream %lu on wakeup pipe\n", stream->key); } ret = 0; diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h index 0ee937c7a..8b87838e6 100644 --- a/src/common/consumer/consumer.h +++ b/src/common/consumer/consumer.h @@ -606,6 +606,16 @@ struct lttng_consumer_local_data { * its rotation (write-only). */ int channel_rotate_pipe; + /* + * Pipe to wakeup the data thread if a stream needs to rotated + * immediately (vs waiting for more data). + */ + struct lttng_pipe *consumer_data_rotate_pipe; + /* + * Pipe to wakeup the metadata thread if a stream needs to rotated + * immediately (vs waiting for more data). + */ + struct lttng_pipe *consumer_metadata_rotate_pipe; }; /* diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index b95730925..f33372a49 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -1156,19 +1156,14 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, goto end_nosignal; } - /* - * Rotate the streams that are ready right now. - * FIXME: this is a second consecutive iteration over the - * streams in a channel, there is probably a better way to - * handle this, but it needs to be after the - * consumer_send_status_msg() call. 
- */ + /* Rotate the streams that are ready right now. */ ret = lttng_consumer_rotate_ready_streams( msg.u.rotate_channel.key, ctx); if (ret < 0) { - ERR("Rotate channel failed"); + ERR("Rotate ready streams failed"); ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; } + break; } case LTTNG_CONSUMER_ROTATE_RENAME: @@ -1509,6 +1504,8 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, } rotate_ready = lttng_consumer_stream_is_rotate_ready(stream, len); + fprintf(stderr, "consumer read stream %lu, len %lu, ready = %d\n", stream->key, + len, rotate_ready); if (rotate_ready < 0) { ERR("Failed to check if stream is ready for rotation"); err = kernctl_put_subbuf(infd); @@ -1666,6 +1663,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, ret = err; goto error; } + fprintf(stderr, "consumer read stream %lu done\n", stream->key); /* Write index if needed. */ if (!write_index) { -- 2.34.1