From 1fc79fb475198741b09a13b5397f018dff4b1aec Mon Sep 17 00:00:00 2001 From: Mathieu Desnoyers Date: Mon, 16 Sep 2013 17:44:22 -0500 Subject: [PATCH] consumerd: register threads to health monitoring Signed-off-by: Mathieu Desnoyers --- src/bin/lttng-consumerd/Makefile.am | 5 ++- src/bin/lttng-consumerd/health-consumerd.h | 37 +++++++++++++++ src/bin/lttng-consumerd/lttng-consumerd.c | 12 +++++ src/common/consumer-timer.c | 7 +++ src/common/consumer.c | 52 ++++++++++++++++++++-- 5 files changed, 108 insertions(+), 5 deletions(-) create mode 100644 src/bin/lttng-consumerd/health-consumerd.h diff --git a/src/bin/lttng-consumerd/Makefile.am b/src/bin/lttng-consumerd/Makefile.am index a7971ade0..9f029a15a 100644 --- a/src/bin/lttng-consumerd/Makefile.am +++ b/src/bin/lttng-consumerd/Makefile.am @@ -2,13 +2,16 @@ AM_CPPFLAGS = -I$(top_srcdir)/include -I$(top_srcdir)/src lttnglibexec_PROGRAMS = lttng-consumerd -lttng_consumerd_SOURCES = lttng-consumerd.c lttng-consumerd.h +lttng_consumerd_SOURCES = lttng-consumerd.c \ + lttng-consumerd.h \ + health-consumerd.h lttng_consumerd_LDADD = \ $(top_builddir)/src/common/libconsumer.la \ $(top_builddir)/src/common/sessiond-comm/libsessiond-comm.la \ $(top_builddir)/src/common/libcommon.la \ $(top_builddir)/src/common/index/libindex.la \ + $(top_builddir)/src/common/health/libhealth.la \ -lrt if HAVE_LIBLTTNG_UST_CTL diff --git a/src/bin/lttng-consumerd/health-consumerd.h b/src/bin/lttng-consumerd/health-consumerd.h new file mode 100644 index 000000000..f5e2a34ce --- /dev/null +++ b/src/bin/lttng-consumerd/health-consumerd.h @@ -0,0 +1,37 @@ +#ifndef HEALTH_CONSUMERD_H +#define HEALTH_CONSUMERD_H + +/* + * Copyright (C) 2012 - David Goulet + * Copyright (C) 2013 - Mathieu Desnoyers + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License, version 2 only, as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include + +enum health_type { + HEALTH_CONSUMERD_TYPE_CHANNEL = 0, + HEALTH_CONSUMERD_TYPE_METADATA = 1, + HEALTH_CONSUMERD_TYPE_DATA = 2, + HEALTH_CONSUMERD_TYPE_SESSIOND = 3, + HEALTH_CONSUMERD_TYPE_METADATA_TIMER = 4, + + NR_HEALTH_CONSUMERD_TYPES, +}; + +/* Consumerd health monitoring */ +struct health_app *health_consumerd; + +#endif /* HEALTH_CONSUMERD_H */ diff --git a/src/bin/lttng-consumerd/lttng-consumerd.c b/src/bin/lttng-consumerd/lttng-consumerd.c index 59397594b..e33a470f5 100644 --- a/src/bin/lttng-consumerd/lttng-consumerd.c +++ b/src/bin/lttng-consumerd/lttng-consumerd.c @@ -49,6 +49,7 @@ #include #include "lttng-consumerd.h" +#include "health-consumerd.h" /* TODO : support UST (all direct kernel-ctl accesses). */ @@ -72,6 +73,9 @@ static enum lttng_consumer_type opt_type = LTTNG_CONSUMER_KERNEL; /* the liblttngconsumerd context */ static struct lttng_consumer_local_data *ctx; +/* Consumerd health monitoring */ +struct health_app *health_consumerd; + /* * Signal handler for the daemon */ @@ -325,6 +329,11 @@ int main(int argc, char **argv) set_ulimit(); } + health_consumerd = health_app_create(NR_HEALTH_CONSUMERD_TYPES); + if (!health_consumerd) { + goto error; + } + /* create the consumer instance with and assign the callbacks */ ctx = lttng_consumer_create(opt_type, lttng_consumer_read_subbuffer, NULL, lttng_consumer_on_recv_stream, NULL); @@ -469,6 +478,9 @@ error: end: lttng_consumer_destroy(ctx); lttng_consumer_cleanup(); + if (health_consumerd) { + health_app_destroy(health_consumerd); + } return ret; } diff --git a/src/common/consumer-timer.c b/src/common/consumer-timer.c index 68b5638dd..b02ccbb11 100644 --- a/src/common/consumer-timer.c +++ b/src/common/consumer-timer.c @@ -28,6 +28,7 @@ #include "consumer-timer.h" #include "ust-consumer/ust-consumer.h" +#include "../bin/lttng-consumerd/health-consumerd.h" static struct timer_signal_data timer_signal = { .tid = 0, @@ -469,6 +470,8 @@ void *consumer_timer_thread(void *data) siginfo_t info; struct lttng_consumer_local_data *ctx = data; + health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA_TIMER); + /* Only self thread will receive signal mask. */ setmask(&mask); CMM_STORE_SHARED(timer_signal.tid, pthread_self()); @@ -494,5 +497,9 @@ void *consumer_timer_thread(void *data) } } + /* Currently never reached */ + health_unregister(health_consumerd); + + /* Never return */ return NULL; } diff --git a/src/common/consumer.c b/src/common/consumer.c index 892c841ba..6abd8b1e8 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -44,6 +44,7 @@ #include "consumer.h" #include "consumer-stream.h" +#include "../bin/lttng-consumerd/health-consumerd.h" struct lttng_consumer_global_data consumer_data = { .stream_count = 0, @@ -2182,7 +2183,7 @@ static void validate_endpoint_status_metadata_stream( */ void *consumer_thread_metadata_poll(void *data) { - int ret, i, pollfd; + int ret, i, pollfd, err = -1; uint32_t revents, nb_fd; struct lttng_consumer_stream *stream = NULL; struct lttng_ht_iter iter; @@ -2193,6 +2194,8 @@ void *consumer_thread_metadata_poll(void *data) rcu_register_thread(); + health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA); + metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); if (!metadata_ht) { /* ENOMEM at this point. Better to bail out. */ @@ -2220,6 +2223,7 @@ void *consumer_thread_metadata_poll(void *data) while (1) { /* Only the metadata pipe is set */ if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) { + err = 0; /* All is OK */ goto end; } @@ -2352,6 +2356,8 @@ restart: } } + /* All is OK */ + err = 0; error: end: DBG("Metadata poll thread exiting"); @@ -2360,6 +2366,11 @@ end: end_poll: destroy_stream_ht(metadata_ht); end_ht: + if (err) { + health_error(); + ERR("Health error occurred in %s", __func__); + } + health_unregister(health_consumerd); rcu_unregister_thread(); return NULL; } @@ -2370,7 +2381,7 @@ end_ht: */ void *consumer_thread_data_poll(void *data) { - int num_rdy, num_hup, high_prio, ret, i; + int num_rdy, num_hup, high_prio, ret, i, err = -1; struct pollfd *pollfd = NULL; /* local view of the streams */ struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL; @@ -2381,6 +2392,8 @@ void *consumer_thread_data_poll(void *data) rcu_register_thread(); + health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_DATA); + data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); if (data_ht == NULL) { /* ENOMEM at this point. Better to bail out. */ @@ -2440,6 +2453,7 @@ void *consumer_thread_data_poll(void *data) /* No FDs and consumer_quit, consumer_cleanup the thread */ if (nb_fd == 0 && consumer_quit == 1) { + err = 0; /* All is OK */ goto end; } /* poll on the array of fds */ @@ -2588,6 +2602,8 @@ void *consumer_thread_data_poll(void *data) } } } + /* All is OK */ + err = 0; end: DBG("polling thread exiting"); free(pollfd); @@ -2605,6 +2621,12 @@ end: destroy_data_stream_ht(data_ht); + if (err) { + health_error(); + ERR("Health error occurred in %s", __func__); + } + health_unregister(health_consumerd); + rcu_unregister_thread(); return NULL; } @@ -2686,7 +2708,7 @@ static void destroy_channel_ht(struct lttng_ht *ht) */ void *consumer_thread_channel_poll(void *data) { - int ret, i, pollfd; + int ret, i, pollfd, err = -1; uint32_t revents, nb_fd; struct lttng_consumer_channel *chan = NULL; struct lttng_ht_iter iter; @@ -2697,6 +2719,8 @@ void *consumer_thread_channel_poll(void *data) rcu_register_thread(); + health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_CHANNEL); + channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64); if (!channel_ht) { /* ENOMEM at this point. Better to bail out. */ @@ -2723,6 +2747,7 @@ void *consumer_thread_channel_poll(void *data) while (1) { /* Only the channel pipe is set */ if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) { + err = 0; /* All is OK */ goto end; } @@ -2880,12 +2905,19 @@ restart: } } + /* All is OK */ + err = 0; end: lttng_poll_clean(&events); end_poll: destroy_channel_ht(channel_ht); end_ht: DBG("Channel poll thread exiting"); + if (err) { + health_error(); + ERR("Health error occurred in %s", __func__); + } + health_unregister(health_consumerd); rcu_unregister_thread(); return NULL; } @@ -2923,7 +2955,7 @@ error: */ void *consumer_thread_sessiond_poll(void *data) { - int sock = -1, client_socket, ret; + int sock = -1, client_socket, ret, err = -1; /* * structure to poll for incoming data on communication socket avoids * making blocking sockets. @@ -2933,6 +2965,8 @@ void *consumer_thread_sessiond_poll(void *data) rcu_register_thread(); + health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_SESSIOND); + DBG("Creating command socket %s", ctx->consumer_command_sock_path); unlink(ctx->consumer_command_sock_path); client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path); @@ -3013,10 +3047,14 @@ void *consumer_thread_sessiond_poll(void *data) } if (consumer_quit) { DBG("consumer_thread_receive_fds received quit from signal"); + err = 0; /* All is OK */ goto end; } DBG("received command on sock"); } + /* All is OK */ + err = 0; + end: DBG("Consumer thread sessiond poll exiting"); @@ -3056,6 +3094,12 @@ end: } } + if (err) { + health_error(); + ERR("Health error occurred in %s", __func__); + } + health_unregister(health_consumerd); + rcu_unregister_thread(); return NULL; } -- 2.34.1