X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=e80ac6be751fbba7eebc98404d520f9ea62c4cd5;hp=ede214c8025b264fcdf5a538a565b2ab5b710eed;hb=02b3d1769d5f8a33e4109b1e681141c9295dfda6;hpb=2c521c63a46d6c79a7819a822d109ccdd48b6064 diff --git a/src/common/consumer.c b/src/common/consumer.c index ede214c80..e80ac6be7 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -1091,6 +1091,9 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx, */ (*pollfd)[i].fd = lttng_pipe_get_readfd(ctx->consumer_data_pipe); (*pollfd)[i].events = POLLIN | POLLPRI; + + (*pollfd)[i + 1].fd = lttng_pipe_get_readfd(ctx->consumer_wakeup_pipe); + (*pollfd)[i + 1].events = POLLIN | POLLPRI; return i; } @@ -1287,6 +1290,11 @@ struct lttng_consumer_local_data *lttng_consumer_create( goto error_poll_pipe; } + ctx->consumer_wakeup_pipe = lttng_pipe_open(0); + if (!ctx->consumer_wakeup_pipe) { + goto error_wakeup_pipe; + } + ret = pipe(ctx->consumer_should_quit); if (ret < 0) { PERROR("Error creating recv pipe"); @@ -1326,6 +1334,8 @@ error_channel_pipe: error_thread_pipe: utils_close_pipe(ctx->consumer_should_quit); error_quit_pipe: + lttng_pipe_destroy(ctx->consumer_wakeup_pipe); +error_wakeup_pipe: lttng_pipe_destroy(ctx->consumer_data_pipe); error_poll_pipe: free(ctx); @@ -1408,6 +1418,7 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx) utils_close_pipe(ctx->consumer_channel_pipe); lttng_pipe_destroy(ctx->consumer_data_pipe); lttng_pipe_destroy(ctx->consumer_metadata_pipe); + lttng_pipe_destroy(ctx->consumer_wakeup_pipe); utils_close_pipe(ctx->consumer_should_quit); utils_close_pipe(ctx->consumer_splice_metadata_pipe); @@ -2402,16 +2413,18 @@ void *consumer_thread_data_poll(void *data) free(local_stream); local_stream = NULL; - /* allocate for all fds + 1 for the consumer_data_pipe */ - pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd)); + /* + * Allocate for all fds +1 for the consumer_data_pipe and +1 for + * wake up pipe. + */ + pollfd = zmalloc((consumer_data.stream_count + 2) * sizeof(struct pollfd)); if (pollfd == NULL) { PERROR("pollfd malloc"); pthread_mutex_unlock(&consumer_data.lock); goto end; } - /* allocate for all fds + 1 for the consumer_data_pipe */ - local_stream = zmalloc((consumer_data.stream_count + 1) * + local_stream = zmalloc((consumer_data.stream_count + 2) * sizeof(struct lttng_consumer_stream *)); if (local_stream == NULL) { PERROR("local_stream malloc"); @@ -2438,9 +2451,9 @@ void *consumer_thread_data_poll(void *data) } /* poll on the array of fds */ restart: - DBG("polling on %d fd", nb_fd + 1); + DBG("polling on %d fd", nb_fd + 2); health_poll_entry(); - num_rdy = poll(pollfd, nb_fd + 1, -1); + num_rdy = poll(pollfd, nb_fd + 2, -1); health_poll_exit(); DBG("poll num_rdy : %d", num_rdy); if (num_rdy == -1) { @@ -2489,6 +2502,20 @@ void *consumer_thread_data_poll(void *data) continue; } + /* Handle wakeup pipe. */ + if (pollfd[nb_fd + 1].revents & (POLLIN | POLLPRI)) { + char dummy; + ssize_t pipe_readlen; + + pipe_readlen = lttng_pipe_read(ctx->consumer_wakeup_pipe, &dummy, + sizeof(dummy)); + if (pipe_readlen < 0) { + PERROR("Consumer data wakeup pipe"); + } + /* We've been awakened to handle stream(s). */ + ctx->has_wakeup = 0; + } + /* Take care of high priority channels first. */ for (i = 0; i < nb_fd; i++) { health_code_update(); @@ -2527,7 +2554,8 @@ void *consumer_thread_data_poll(void *data) continue; } if ((pollfd[i].revents & POLLIN) || - local_stream[i]->hangup_flush_done) { + local_stream[i]->hangup_flush_done || + local_stream[i]->has_data) { DBG("Normal read on fd %d", pollfd[i].fd); len = ctx->on_buffer_ready(local_stream[i], ctx); /* it's ok to have an unavailable sub-buffer */