X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=6f3a02d2619eeef095684fc927399fca7a9404dd;hb=5c635c724d60fe8e8bfeb00907dfa1e113cc3548;hp=6abd8b1e86de34c71d330ac68726846dd971da3b;hpb=1fc79fb475198741b09a13b5397f018dff4b1aec;p=lttng-tools.git diff --git a/src/common/consumer.c b/src/common/consumer.c index 6abd8b1e8..6f3a02d26 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -94,6 +94,18 @@ static void notify_thread_lttng_pipe(struct lttng_pipe *pipe) (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream)); } +static void notify_health_quit_pipe(int *pipe) +{ + int ret; + + do { + ret = write(pipe[1], "4", 1); + } while (ret < 0 && errno == EINTR); + if (ret < 0 || ret != 1) { + PERROR("write consumer health quit"); + } +} + static void notify_channel_pipe(struct lttng_consumer_local_data *ctx, struct lttng_consumer_channel *chan, uint64_t key, @@ -2196,6 +2208,8 @@ void *consumer_thread_metadata_poll(void *data) health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA); + health_code_update(); + metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); if (!metadata_ht) { /* ENOMEM at this point. Better to bail out. */ @@ -2221,6 +2235,8 @@ void *consumer_thread_metadata_poll(void *data) DBG("Metadata main loop started"); while (1) { + health_code_update(); + /* Only the metadata pipe is set */ if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) { err = 0; /* All is OK */ @@ -2229,7 +2245,9 @@ void *consumer_thread_metadata_poll(void *data) restart: DBG("Metadata poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events)); + health_poll_entry(); ret = lttng_poll_wait(&events, -1); + health_poll_exit(); DBG("Metadata event catched in thread"); if (ret < 0) { if (errno == EINTR) { @@ -2243,6 +2261,8 @@ restart: /* From here, the event is a metadata wait fd */ for (i = 0; i < nb_fd; i++) { + health_code_update(); + revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); @@ -2312,6 +2332,8 @@ restart: /* We just flushed the stream now read it. */ do { + health_code_update(); + len = ctx->on_buffer_ready(stream, ctx); /* * We don't check the return value here since if we get @@ -2334,6 +2356,8 @@ restart: assert(stream->wait_fd == pollfd); do { + health_code_update(); + len = ctx->on_buffer_ready(stream, ctx); /* * We don't check the return value here since if we get @@ -2394,6 +2418,8 @@ void *consumer_thread_data_poll(void *data) health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_DATA); + health_code_update(); + data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); if (data_ht == NULL) { /* ENOMEM at this point. Better to bail out. */ @@ -2407,6 +2433,8 @@ void *consumer_thread_data_poll(void *data) } while (1) { + health_code_update(); + high_prio = 0; num_hup = 0; @@ -2459,7 +2487,9 @@ void *consumer_thread_data_poll(void *data) /* poll on the array of fds */ restart: DBG("polling on %d fd", nb_fd + 1); + health_poll_entry(); num_rdy = poll(pollfd, nb_fd + 1, -1); + health_poll_exit(); DBG("poll num_rdy : %d", num_rdy); if (num_rdy == -1) { /* @@ -2509,6 +2539,8 @@ void *consumer_thread_data_poll(void *data) /* Take care of high priority channels first. */ for (i = 0; i < nb_fd; i++) { + health_code_update(); + if (local_stream[i] == NULL) { continue; } @@ -2537,6 +2569,8 @@ void *consumer_thread_data_poll(void *data) /* Take care of low priority channels. */ for (i = 0; i < nb_fd; i++) { + health_code_update(); + if (local_stream[i] == NULL) { continue; } @@ -2557,6 +2591,8 @@ void *consumer_thread_data_poll(void *data) /* Handle hangup and errors */ for (i = 0; i < nb_fd; i++) { + health_code_update(); + if (local_stream[i] == NULL) { continue; } @@ -2721,6 +2757,8 @@ void *consumer_thread_channel_poll(void *data) health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_CHANNEL); + health_code_update(); + channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); if (!channel_ht) { /* ENOMEM at this point. Better to bail out. */ @@ -2745,6 +2783,8 @@ void *consumer_thread_channel_poll(void *data) DBG("Channel main loop started"); while (1) { + health_code_update(); + /* Only the channel pipe is set */ if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) { err = 0; /* All is OK */ @@ -2753,7 +2793,9 @@ void *consumer_thread_channel_poll(void *data) restart: DBG("Channel poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events)); + health_poll_entry(); ret = lttng_poll_wait(&events, -1); + health_poll_exit(); DBG("Channel event catched in thread"); if (ret < 0) { if (errno == EINTR) { @@ -2767,6 +2809,8 @@ restart: /* From here, the event is a channel wait fd */ for (i = 0; i < nb_fd; i++) { + health_code_update(); + revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); @@ -2833,6 +2877,8 @@ restart: /* Delete streams that might have been left in the stream list. */ cds_list_for_each_entry_safe(stream, stmp, &chan->streams.head, send_node) { + health_code_update(); + cds_list_del(&stream->send_node); lttng_ustconsumer_del_stream(stream); uatomic_sub(&stream->chan->refcount, 1); @@ -2967,6 +3013,8 @@ void *consumer_thread_sessiond_poll(void *data) health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_SESSIOND); + health_code_update(); + DBG("Creating command socket %s", ctx->consumer_command_sock_path); unlink(ctx->consumer_command_sock_path); client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path); @@ -3027,7 +3075,12 @@ void *consumer_thread_sessiond_poll(void *data) consumer_sockpoll[1].events = POLLIN | POLLPRI; while (1) { - if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { + health_code_update(); + + health_poll_entry(); + ret = lttng_consumer_poll_socket(consumer_sockpoll); + health_poll_exit(); + if (ret < 0) { goto end; } DBG("Incoming command on sock"); @@ -3080,6 +3133,8 @@ end: notify_channel_pipe(ctx, NULL, -1, CONSUMER_CHANNEL_QUIT); + notify_health_quit_pipe(health_quit_pipe); + /* Cleaning up possibly open sockets. */ if (sock >= 0) { ret = close(sock);