From 13886d2d433762b7cf54f8e812f747ec2829de57 Mon Sep 17 00:00:00 2001 From: David Goulet Date: Tue, 14 May 2013 11:27:26 -0400 Subject: [PATCH] Change consumer_metadata_pipe to be a lttng_pipe The read() call in the metadata thread is also changed to use the lttng pipe read wrapper. Signed-off-by: David Goulet --- src/common/consumer.c | 58 +++++++------------- src/common/consumer.h | 2 +- src/common/kernel-consumer/kernel-consumer.c | 2 +- src/common/ust-consumer/ust-consumer.c | 2 +- 4 files changed, 22 insertions(+), 42 deletions(-) diff --git a/src/common/consumer.c b/src/common/consumer.c index d1b7ba29b..044a504ce 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -76,21 +76,6 @@ volatile int consumer_quit; static struct lttng_ht *metadata_ht; static struct lttng_ht *data_ht; -/* - * Notify a thread pipe to poll back again. This usually means that some global - * state has changed so we just send back the thread in a poll wait call. - */ -static void notify_thread_pipe(int wpipe) -{ - int ret; - - do { - struct lttng_consumer_stream *null_stream = NULL; - - ret = write(wpipe, &null_stream, sizeof(null_stream)); - } while (ret < 0 && errno == EINTR); -} - /* * Notify a thread lttng pipe to poll back again. This usually means that some * global state has changed so we just send back the thread in a poll wait @@ -423,7 +408,7 @@ static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd, */ if (ctx) { notify_thread_lttng_pipe(ctx->consumer_data_pipe); - notify_thread_pipe(ctx->consumer_metadata_pipe[1]); + notify_thread_lttng_pipe(ctx->consumer_metadata_pipe); } } @@ -1203,8 +1188,8 @@ struct lttng_consumer_local_data *lttng_consumer_create( goto error_channel_pipe; } - ret = utils_create_pipe(ctx->consumer_metadata_pipe); - if (ret < 0) { + ctx->consumer_metadata_pipe = lttng_pipe_open(0); + if (!ctx->consumer_metadata_pipe) { goto error_metadata_pipe; } @@ -1216,7 +1201,7 @@ struct lttng_consumer_local_data *lttng_consumer_create( return ctx; error_splice_pipe: - utils_close_pipe(ctx->consumer_metadata_pipe); + lttng_pipe_destroy(ctx->consumer_metadata_pipe); error_metadata_pipe: utils_close_pipe(ctx->consumer_channel_pipe); error_channel_pipe: @@ -1251,6 +1236,7 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx) utils_close_pipe(ctx->consumer_thread_pipe); utils_close_pipe(ctx->consumer_channel_pipe); lttng_pipe_destroy(ctx->consumer_data_pipe); + lttng_pipe_destroy(ctx->consumer_metadata_pipe); utils_close_pipe(ctx->consumer_should_quit); utils_close_pipe(ctx->consumer_splice_metadata_pipe); @@ -2131,7 +2117,8 @@ void *consumer_thread_metadata_poll(void *data) goto end_poll; } - ret = lttng_poll_add(&events, ctx->consumer_metadata_pipe[0], LPOLLIN); + ret = lttng_poll_add(&events, + lttng_pipe_get_readfd(ctx->consumer_metadata_pipe), LPOLLIN); if (ret < 0) { goto end; } @@ -2169,30 +2156,26 @@ restart: continue; } - if (pollfd == ctx->consumer_metadata_pipe[0]) { + if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) { if (revents & (LPOLLERR | LPOLLHUP )) { DBG("Metadata thread pipe hung up"); /* * Remove the pipe from the poll set and continue the loop * since their might be data to consume. */ - lttng_poll_del(&events, ctx->consumer_metadata_pipe[0]); - ret = close(ctx->consumer_metadata_pipe[0]); - if (ret < 0) { - PERROR("close metadata pipe"); - } + lttng_poll_del(&events, + lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)); + lttng_pipe_read_close(ctx->consumer_metadata_pipe); continue; } else if (revents & LPOLLIN) { - do { - /* Get the stream pointer received */ - ret = read(pollfd, &stream, sizeof(stream)); - } while (ret < 0 && errno == EINTR); - if (ret < 0 || - ret < sizeof(struct lttng_consumer_stream *)) { - PERROR("read metadata stream"); + ssize_t pipe_len; + + pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe, + &stream, sizeof(stream)); + if (pipe_len < 0) { + ERR("read metadata stream, ret: %ld", pipe_len); /* - * Let's continue here and hope we can still work - * without stopping the consumer. XXX: Should we? + * Continue here to handle the rest of the streams. */ continue; } @@ -2540,10 +2523,7 @@ end: * only tracked fd in the poll set. The thread will take care of closing * the read side. */ - ret = close(ctx->consumer_metadata_pipe[1]); - if (ret < 0) { - PERROR("close data pipe"); - } + (void) lttng_pipe_write_close(ctx->consumer_metadata_pipe); destroy_data_stream_ht(data_ht); diff --git a/src/common/consumer.h b/src/common/consumer.h index 91039e8e9..3726fd1e6 100644 --- a/src/common/consumer.h +++ b/src/common/consumer.h @@ -351,7 +351,7 @@ struct lttng_consumer_local_data { /* to let the signal handler wake up the fd receiver thread */ int consumer_should_quit[2]; /* Metadata poll thread pipe. Transfer metadata stream to it */ - int consumer_metadata_pipe[2]; + struct lttng_pipe *consumer_metadata_pipe; }; /* diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index d8aec492f..f23fc9c57 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -288,7 +288,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* Get the right pipe where the stream will be sent. */ if (new_stream->metadata_flag) { - stream_pipe = ctx->consumer_metadata_pipe[1]; + stream_pipe = lttng_pipe_get_writefd(ctx->consumer_metadata_pipe); } else { stream_pipe = lttng_pipe_get_writefd(ctx->consumer_data_pipe); } diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 8bc69006b..3731cbb26 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -189,7 +189,7 @@ static int send_stream_to_thread(struct lttng_consumer_stream *stream, /* Get the right pipe where the stream will be sent. */ if (stream->metadata_flag) { - stream_pipe = ctx->consumer_metadata_pipe[1]; + stream_pipe = lttng_pipe_get_writefd(ctx->consumer_metadata_pipe); } else { stream_pipe = lttng_pipe_get_writefd(ctx->consumer_data_pipe); } -- 2.34.1