consumerd: register threads to health monitoring
[lttng-tools.git] / src / common / consumer.c
index b8695698da7301b69b114fe373680b248294b486..6abd8b1e86de34c71d330ac68726846dd971da3b 100644 (file)
@@ -44,6 +44,7 @@
 
 #include "consumer.h"
 #include "consumer-stream.h"
+#include "../bin/lttng-consumerd/health-consumerd.h"
 
 struct lttng_consumer_global_data consumer_data = {
        .stream_count = 0,
@@ -520,6 +521,9 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
                stream->metadata_flag = 1;
                /* Metadata is flat out. */
                strncpy(stream->name, DEFAULT_METADATA_NAME, sizeof(stream->name));
+               /* Live rendez-vous point. */
+               pthread_cond_init(&stream->metadata_rdv, NULL);
+               pthread_mutex_init(&stream->metadata_rdv_lock, NULL);
        } else {
                /* Format stream name to <channel_name>_<cpu_number> */
                ret = snprintf(stream->name, sizeof(stream->name), "%s_%d",
@@ -2179,7 +2183,7 @@ static void validate_endpoint_status_metadata_stream(
  */
 void *consumer_thread_metadata_poll(void *data)
 {
-       int ret, i, pollfd;
+       int ret, i, pollfd, err = -1;
        uint32_t revents, nb_fd;
        struct lttng_consumer_stream *stream = NULL;
        struct lttng_ht_iter iter;
@@ -2190,6 +2194,8 @@ void *consumer_thread_metadata_poll(void *data)
 
        rcu_register_thread();
 
+       health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA);
+
        metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
        if (!metadata_ht) {
                /* ENOMEM at this point. Better to bail out. */
@@ -2217,6 +2223,7 @@ void *consumer_thread_metadata_poll(void *data)
        while (1) {
                /* Only the metadata pipe is set */
                if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
+                       err = 0;        /* All is OK */
                        goto end;
                }
 
@@ -2349,6 +2356,8 @@ restart:
                }
        }
 
+       /* All is OK */
+       err = 0;
 error:
 end:
        DBG("Metadata poll thread exiting");
@@ -2357,6 +2366,11 @@ end:
 end_poll:
        destroy_stream_ht(metadata_ht);
 end_ht:
+       if (err) {
+               health_error();
+               ERR("Health error occurred in %s", __func__);
+       }
+       health_unregister(health_consumerd);
        rcu_unregister_thread();
        return NULL;
 }
@@ -2367,7 +2381,7 @@ end_ht:
  */
 void *consumer_thread_data_poll(void *data)
 {
-       int num_rdy, num_hup, high_prio, ret, i;
+       int num_rdy, num_hup, high_prio, ret, i, err = -1;
        struct pollfd *pollfd = NULL;
        /* local view of the streams */
        struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
@@ -2378,6 +2392,8 @@ void *consumer_thread_data_poll(void *data)
 
        rcu_register_thread();
 
+       health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_DATA);
+
        data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
        if (data_ht == NULL) {
                /* ENOMEM at this point. Better to bail out. */
@@ -2437,6 +2453,7 @@ void *consumer_thread_data_poll(void *data)
 
                /* No FDs and consumer_quit, consumer_cleanup the thread */
                if (nb_fd == 0 && consumer_quit == 1) {
+                       err = 0;        /* All is OK */
                        goto end;
                }
                /* poll on the array of fds */
@@ -2585,6 +2602,8 @@ void *consumer_thread_data_poll(void *data)
                        }
                }
        }
+       /* All is OK */
+       err = 0;
 end:
        DBG("polling thread exiting");
        free(pollfd);
@@ -2602,6 +2621,12 @@ end:
 
        destroy_data_stream_ht(data_ht);
 
+       if (err) {
+               health_error();
+               ERR("Health error occurred in %s", __func__);
+       }
+       health_unregister(health_consumerd);
+
        rcu_unregister_thread();
        return NULL;
 }
@@ -2683,7 +2708,7 @@ static void destroy_channel_ht(struct lttng_ht *ht)
  */
 void *consumer_thread_channel_poll(void *data)
 {
-       int ret, i, pollfd;
+       int ret, i, pollfd, err = -1;
        uint32_t revents, nb_fd;
        struct lttng_consumer_channel *chan = NULL;
        struct lttng_ht_iter iter;
@@ -2694,6 +2719,8 @@ void *consumer_thread_channel_poll(void *data)
 
        rcu_register_thread();
 
+       health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_CHANNEL);
+
        channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
        if (!channel_ht) {
                /* ENOMEM at this point. Better to bail out. */
@@ -2720,6 +2747,7 @@ void *consumer_thread_channel_poll(void *data)
        while (1) {
                /* Only the channel pipe is set */
                if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
+                       err = 0;        /* All is OK */
                        goto end;
                }
 
@@ -2877,12 +2905,19 @@ restart:
                }
        }
 
+       /* All is OK */
+       err = 0;
 end:
        lttng_poll_clean(&events);
 end_poll:
        destroy_channel_ht(channel_ht);
 end_ht:
        DBG("Channel poll thread exiting");
+       if (err) {
+               health_error();
+               ERR("Health error occurred in %s", __func__);
+       }
+       health_unregister(health_consumerd);
        rcu_unregister_thread();
        return NULL;
 }
@@ -2920,7 +2955,7 @@ error:
  */
 void *consumer_thread_sessiond_poll(void *data)
 {
-       int sock = -1, client_socket, ret;
+       int sock = -1, client_socket, ret, err = -1;
        /*
         * structure to poll for incoming data on communication socket avoids
         * making blocking sockets.
@@ -2930,6 +2965,8 @@ void *consumer_thread_sessiond_poll(void *data)
 
        rcu_register_thread();
 
+       health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_SESSIOND);
+
        DBG("Creating command socket %s", ctx->consumer_command_sock_path);
        unlink(ctx->consumer_command_sock_path);
        client_socket = lttcomm_create_unix_sock(ctx->consumer_command_sock_path);
@@ -3005,14 +3042,19 @@ void *consumer_thread_sessiond_poll(void *data)
                         * ERR() here.
                         */
                        DBG("Communication interrupted on command socket");
+                       err = 0;
                        goto end;
                }
                if (consumer_quit) {
                        DBG("consumer_thread_receive_fds received quit from signal");
+                       err = 0;        /* All is OK */
                        goto end;
                }
                DBG("received command on sock");
        }
+       /* All is OK */
+       err = 0;
+
 end:
        DBG("Consumer thread sessiond poll exiting");
 
@@ -3052,6 +3094,12 @@ end:
                }
        }
 
+       if (err) {
+               health_error();
+               ERR("Health error occurred in %s", __func__);
+       }
+       health_unregister(health_consumerd);
+
        rcu_unregister_thread();
        return NULL;
 }
@@ -3062,6 +3110,9 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
        ssize_t ret;
 
        pthread_mutex_lock(&stream->lock);
+       if (stream->metadata_flag) {
+               pthread_mutex_lock(&stream->metadata_rdv_lock);
+       }
 
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
@@ -3078,6 +3129,10 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                break;
        }
 
+       if (stream->metadata_flag) {
+               pthread_cond_broadcast(&stream->metadata_rdv);
+               pthread_mutex_unlock(&stream->metadata_rdv_lock);
+       }
        pthread_mutex_unlock(&stream->lock);
        return ret;
 }
This page took 0.029126 seconds and 5 git commands to generate.