relayd: add health instrumentation to threads
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Mon, 16 Sep 2013 14:47:16 +0000 (09:47 -0500)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Wed, 9 Oct 2013 13:16:58 +0000 (09:16 -0400)
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
src/bin/lttng-relayd/main.c

index bf7be3e25c77b299e139ce8c46d49e6b8b9bb667..e7c57f32e2457595b31d20709837c5b7727741be 100644 (file)
@@ -519,6 +519,8 @@ void *relay_thread_listener(void *data)
 
        health_register(health_relayd, HEALTH_RELAYD_TYPE_LISTENER);
 
+       health_code_update();
+
        control_sock = relay_init_sock(control_uri);
        if (!control_sock) {
                goto error_sock_control;
@@ -550,10 +552,14 @@ void *relay_thread_listener(void *data)
        }
 
        while (1) {
+               health_code_update();
+
                DBG("Listener accepting connections");
 
 restart:
+               health_poll_entry();
                ret = lttng_poll_wait(&events, -1);
+               health_poll_exit();
                if (ret < 0) {
                        /*
                         * Restart interrupted system call.
@@ -568,6 +574,8 @@ restart:
 
                DBG("Relay new connection received");
                for (i = 0; i < nb_fd; i++) {
+                       health_code_update();
+
                        /* Fetch once the poll data */
                        revents = LTTNG_POLL_GETEV(&events, i);
                        pollfd = LTTNG_POLL_GETFD(&events, i);
@@ -662,7 +670,8 @@ error_sock_relay:
        lttcomm_destroy_sock(control_sock);
 error_sock_control:
        if (err) {
-               DBG("Thread exited with error");
+               health_error();
+               ERR("Health error occurred in %s", __func__);
        }
        health_unregister(health_relayd);
        DBG("Relay listener thread cleanup complete");
@@ -676,7 +685,7 @@ error_sock_control:
 static
 void *relay_thread_dispatcher(void *data)
 {
-       int ret;
+       int ret, err = -1;
        struct cds_wfq_node *node;
        struct relay_command *relay_cmd = NULL;
 
@@ -684,11 +693,17 @@ void *relay_thread_dispatcher(void *data)
 
        health_register(health_relayd, HEALTH_RELAYD_TYPE_DISPATCHER);
 
+       health_code_update();
+
        while (!CMM_LOAD_SHARED(dispatch_thread_exit)) {
+               health_code_update();
+
                /* Atomically prepare the queue futex */
                futex_nto1_prepare(&relay_cmd_queue.futex);
 
                do {
+                       health_code_update();
+
                        /* Dequeue commands */
                        node = cds_wfq_dequeue_blocking(&relay_cmd_queue.queue);
                        if (node == NULL) {
@@ -717,10 +732,19 @@ void *relay_thread_dispatcher(void *data)
                } while (node != NULL);
 
                /* Futex wait on queue. Blocking call on futex() */
+               health_poll_entry();
                futex_nto1_wait(&relay_cmd_queue.futex);
+               health_poll_exit();
        }
 
+       /* Normal exit, no error */
+       err = 0;
+
 error:
+       if (err) {
+               health_error();
+               ERR("Health error occurred in %s", __func__);
+       }
        health_unregister(health_relayd);
        DBG("Dispatch thread dying");
        stop_threads();
@@ -2156,6 +2180,8 @@ void *relay_thread_worker(void *data)
 
        health_register(health_relayd, HEALTH_RELAYD_TYPE_WORKER);
 
+       health_code_update();
+
        /* table of connections indexed on socket */
        relay_connections_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
        if (!relay_connections_ht) {
@@ -2182,9 +2208,13 @@ restart:
        while (1) {
                int idx = -1, i, seen_control = 0, last_notdel_data_fd = -1;
 
+               health_code_update();
+
                /* Infinite blocking call, waiting for transmission */
                DBG3("Relayd worker thread polling...");
+               health_poll_entry();
                ret = lttng_poll_wait(&events, -1);
+               health_poll_exit();
                if (ret < 0) {
                        /*
                         * Restart interrupted system call.
@@ -2207,6 +2237,8 @@ restart:
                        uint32_t revents = LTTNG_POLL_GETEV(&events, i);
                        int pollfd = LTTNG_POLL_GETFD(&events, i);
 
+                       health_code_update();
+
                        /* Thread quit pipe has been closed. Killing thread. */
                        ret = check_thread_quit_pipe(pollfd, revents);
                        if (ret) {
@@ -2309,6 +2341,9 @@ restart:
                if (last_seen_data_fd >= 0) {
                        for (i = 0; i < nb_fd; i++) {
                                int pollfd = LTTNG_POLL_GETFD(&events, i);
+
+                               health_code_update();
+
                                if (last_seen_data_fd == pollfd) {
                                        idx = i;
                                        break;
@@ -2322,6 +2357,8 @@ restart:
                        uint32_t revents = LTTNG_POLL_GETEV(&events, i);
                        int pollfd = LTTNG_POLL_GETFD(&events, i);
 
+                       health_code_update();
+
                        /* Skip the command pipe. It's handled in the first loop. */
                        if (pollfd == relay_cmd_pipe[0]) {
                                continue;
@@ -2371,6 +2408,9 @@ restart:
                last_seen_data_fd = -1;
        }
 
+       /* Normal exit, no error */
+       ret = 0;
+
 exit:
 error:
        lttng_poll_clean(&events);
@@ -2378,6 +2418,8 @@ error:
        /* empty the hash table and free the memory */
        rcu_read_lock();
        cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter, node, node) {
+               health_code_update();
+
                node = lttng_ht_iter_get_node_ulong(&iter);
                if (node) {
                        relay_connection = caa_container_of(node,
@@ -2397,11 +2439,15 @@ relay_connections_ht_error:
        if (err) {
                DBG("Thread exited with error");
        }
-       health_unregister(health_relayd);
        DBG("Worker thread cleanup complete");
        free(data_buffer);
-       stop_threads();
+       if (err) {
+               health_error();
+               ERR("Health error occurred in %s", __func__);
+       }
+       health_unregister(health_relayd);
        rcu_unregister_thread();
+       stop_threads();
        return NULL;
 }
 
This page took 0.030301 seconds and 5 git commands to generate.