X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=2f20ffb5d1d97fb585803d1721bdfeb7e494ffb9;hp=6abd8b1e86de34c71d330ac68726846dd971da3b;hb=9ce5646a7ef9b8d7936c46649a21ee546fadd538;hpb=1fc79fb475198741b09a13b5397f018dff4b1aec diff --git a/src/common/consumer.c b/src/common/consumer.c index 6abd8b1e8..2f20ffb5d 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -2196,6 +2196,8 @@ void *consumer_thread_metadata_poll(void *data) health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA); + health_code_update(); + metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); if (!metadata_ht) { /* ENOMEM at this point. Better to bail out. */ @@ -2221,6 +2223,8 @@ void *consumer_thread_metadata_poll(void *data) DBG("Metadata main loop started"); while (1) { + health_code_update(); + /* Only the metadata pipe is set */ if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) { err = 0; /* All is OK */ @@ -2229,7 +2233,9 @@ void *consumer_thread_metadata_poll(void *data) restart: DBG("Metadata poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events)); + health_poll_entry(); ret = lttng_poll_wait(&events, -1); + health_poll_exit(); DBG("Metadata event catched in thread"); if (ret < 0) { if (errno == EINTR) { @@ -2243,6 +2249,8 @@ restart: /* From here, the event is a metadata wait fd */ for (i = 0; i < nb_fd; i++) { + health_code_update(); + revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); @@ -2312,6 +2320,8 @@ restart: /* We just flushed the stream now read it. */ do { + health_code_update(); + len = ctx->on_buffer_ready(stream, ctx); /* * We don't check the return value here since if we get @@ -2334,6 +2344,8 @@ restart: assert(stream->wait_fd == pollfd); do { + health_code_update(); + len = ctx->on_buffer_ready(stream, ctx); /* * We don't check the return value here since if we get @@ -2394,6 +2406,8 @@ void *consumer_thread_data_poll(void *data) health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_DATA); + health_code_update(); + data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); if (data_ht == NULL) { /* ENOMEM at this point. Better to bail out. */ @@ -2407,6 +2421,8 @@ void *consumer_thread_data_poll(void *data) } while (1) { + health_code_update(); + high_prio = 0; num_hup = 0; @@ -2459,7 +2475,9 @@ void *consumer_thread_data_poll(void *data) /* poll on the array of fds */ restart: DBG("polling on %d fd", nb_fd + 1); + health_poll_entry(); num_rdy = poll(pollfd, nb_fd + 1, -1); + health_poll_exit(); DBG("poll num_rdy : %d", num_rdy); if (num_rdy == -1) { /* @@ -2509,6 +2527,8 @@ void *consumer_thread_data_poll(void *data) /* Take care of high priority channels first. */ for (i = 0; i < nb_fd; i++) { + health_code_update(); + if (local_stream[i] == NULL) { continue; } @@ -2537,6 +2557,8 @@ void *consumer_thread_data_poll(void *data) /* Take care of low priority channels. */ for (i = 0; i < nb_fd; i++) { + health_code_update(); + if (local_stream[i] == NULL) { continue; } @@ -2557,6 +2579,8 @@ void *consumer_thread_data_poll(void *data) /* Handle hangup and errors */ for (i = 0; i < nb_fd; i++) { + health_code_update(); + if (local_stream[i] == NULL) { continue; } @@ -2721,6 +2745,8 @@ void *consumer_thread_channel_poll(void *data) health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_CHANNEL); + health_code_update(); + channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); if (!channel_ht) { /* ENOMEM at this point. Better to bail out. */ @@ -2745,6 +2771,8 @@ void *consumer_thread_channel_poll(void *data) DBG("Channel main loop started"); while (1) { + health_code_update(); + /* Only the channel pipe is set */ if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) { err = 0; /* All is OK */ @@ -2753,7 +2781,9 @@ void *consumer_thread_channel_poll(void *data) restart: DBG("Channel poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events)); + health_poll_entry(); ret = lttng_poll_wait(&events, -1); + health_poll_exit(); DBG("Channel event catched in thread"); if (ret < 0) { if (errno == EINTR) { @@ -2767,6 +2797,8 @@ restart: /* From here, the event is a channel wait fd */ for (i = 0; i < nb_fd; i++) { + health_code_update(); + revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); @@ -2833,6 +2865,8 @@ restart: /* Delete streams that might have been left in the stream list. */ cds_list_for_each_entry_safe(stream, stmp, &chan->streams.head, send_node) { + health_code_update(); + cds_list_del(&stream->send_node); lttng_ustconsumer_del_stream(stream); uatomic_sub(&stream->chan->refcount, 1); @@ -2967,6 +3001,8 @@ void *consumer_thread_sessiond_poll(void *data) health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_SESSIOND); + health_code_update(); + DBG("Creating command socket %s", ctx->consumer_command_sock_path); unlink(ctx->consumer_command_sock_path); client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path); @@ -3027,7 +3063,12 @@ void *consumer_thread_sessiond_poll(void *data) consumer_sockpoll[1].events = POLLIN | POLLPRI; while (1) { - if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { + health_code_update(); + + health_poll_entry(); + ret = lttng_consumer_poll_socket(consumer_sockpoll); + health_poll_exit(); + if (ret < 0) { goto end; } DBG("Incoming command on sock");