X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=6abd8b1e86de34c71d330ac68726846dd971da3b;hp=892c841ba0f8958ae46cff1f4d577f0426df1415;hb=1fc79fb475198741b09a13b5397f018dff4b1aec;hpb=f385ae0a8f5c34cc33ca57cdb412393f90f9c0a8 diff --git a/src/common/consumer.c b/src/common/consumer.c index 892c841ba..6abd8b1e8 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -44,6 +44,7 @@ #include "consumer.h" #include "consumer-stream.h" +#include "../bin/lttng-consumerd/health-consumerd.h" struct lttng_consumer_global_data consumer_data = { .stream_count = 0, @@ -2182,7 +2183,7 @@ static void validate_endpoint_status_metadata_stream( */ void *consumer_thread_metadata_poll(void *data) { - int ret, i, pollfd; + int ret, i, pollfd, err = -1; uint32_t revents, nb_fd; struct lttng_consumer_stream *stream = NULL; struct lttng_ht_iter iter; @@ -2193,6 +2194,8 @@ void *consumer_thread_metadata_poll(void *data) rcu_register_thread(); + health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA); + metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); if (!metadata_ht) { /* ENOMEM at this point. Better to bail out. */ @@ -2220,6 +2223,7 @@ void *consumer_thread_metadata_poll(void *data) while (1) { /* Only the metadata pipe is set */ if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) { + err = 0; /* All is OK */ goto end; } @@ -2352,6 +2356,8 @@ restart: } } + /* All is OK */ + err = 0; error: end: DBG("Metadata poll thread exiting"); @@ -2360,6 +2366,11 @@ end: end_poll: destroy_stream_ht(metadata_ht); end_ht: + if (err) { + health_error(); + ERR("Health error occurred in %s", __func__); + } + health_unregister(health_consumerd); rcu_unregister_thread(); return NULL; } @@ -2370,7 +2381,7 @@ end_ht: */ void *consumer_thread_data_poll(void *data) { - int num_rdy, num_hup, high_prio, ret, i; + int num_rdy, num_hup, high_prio, ret, i, err = -1; struct pollfd *pollfd = NULL; /* local view of the streams */ struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL; @@ -2381,6 +2392,8 @@ void *consumer_thread_data_poll(void *data) rcu_register_thread(); + health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_DATA); + data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); if (data_ht == NULL) { /* ENOMEM at this point. Better to bail out. */ @@ -2440,6 +2453,7 @@ void *consumer_thread_data_poll(void *data) /* No FDs and consumer_quit, consumer_cleanup the thread */ if (nb_fd == 0 && consumer_quit == 1) { + err = 0; /* All is OK */ goto end; } /* poll on the array of fds */ @@ -2588,6 +2602,8 @@ void *consumer_thread_data_poll(void *data) } } } + /* All is OK */ + err = 0; end: DBG("polling thread exiting"); free(pollfd); @@ -2605,6 +2621,12 @@ end: destroy_data_stream_ht(data_ht); + if (err) { + health_error(); + ERR("Health error occurred in %s", __func__); + } + health_unregister(health_consumerd); + rcu_unregister_thread(); return NULL; } @@ -2686,7 +2708,7 @@ static void destroy_channel_ht(struct lttng_ht *ht) */ void *consumer_thread_channel_poll(void *data) { - int ret, i, pollfd; + int ret, i, pollfd, err = -1; uint32_t revents, nb_fd; struct lttng_consumer_channel *chan = NULL; struct lttng_ht_iter iter; @@ -2697,6 +2719,8 @@ void *consumer_thread_channel_poll(void *data) rcu_register_thread(); + health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_CHANNEL); + channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); if (!channel_ht) { /* ENOMEM at this point. Better to bail out. */ @@ -2723,6 +2747,7 @@ void *consumer_thread_channel_poll(void *data) while (1) { /* Only the channel pipe is set */ if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) { + err = 0; /* All is OK */ goto end; } @@ -2880,12 +2905,19 @@ restart: } } + /* All is OK */ + err = 0; end: lttng_poll_clean(&events); end_poll: destroy_channel_ht(channel_ht); end_ht: DBG("Channel poll thread exiting"); + if (err) { + health_error(); + ERR("Health error occurred in %s", __func__); + } + health_unregister(health_consumerd); rcu_unregister_thread(); return NULL; } @@ -2923,7 +2955,7 @@ error: */ void *consumer_thread_sessiond_poll(void *data) { - int sock = -1, client_socket, ret; + int sock = -1, client_socket, ret, err = -1; /* * structure to poll for incoming data on communication socket avoids * making blocking sockets. @@ -2933,6 +2965,8 @@ void *consumer_thread_sessiond_poll(void *data) rcu_register_thread(); + health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_SESSIOND); + DBG("Creating command socket %s", ctx->consumer_command_sock_path); unlink(ctx->consumer_command_sock_path); client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path); @@ -3013,10 +3047,14 @@ void *consumer_thread_sessiond_poll(void *data) } if (consumer_quit) { DBG("consumer_thread_receive_fds received quit from signal"); + err = 0; /* All is OK */ goto end; } DBG("received command on sock"); } + /* All is OK */ + err = 0; + end: DBG("Consumer thread sessiond poll exiting"); @@ -3056,6 +3094,12 @@ end: } } + if (err) { + health_error(); + ERR("Health error occurred in %s", __func__); + } + health_unregister(health_consumerd); + rcu_unregister_thread(); return NULL; }