Handle statedump agent thread state across fork
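
The statedump agent thread must not race with fork(): the pre-fork handler
pauses it and waits for an acknowledgment, the parent resumes it after the
fork, and the child re-creates it (a forked child only inherits the calling
thread). As a standalone illustration of that pause/acknowledge handshake,
here is a minimal sketch using generic names and C11 atomics instead of the
libside symbols and GCC __atomic builtins added in the diff below:

#include <poll.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

enum { WORKER_PAUSE = 1 << 0, WORKER_PAUSE_ACK = 1 << 1 };

static atomic_int worker_state;
static pthread_t worker_id;

static void *worker(void *arg)
{
	(void)arg;
	for (;;) {
		int state = atomic_load(&worker_state);

		if (state & WORKER_PAUSE) {
			/* Acknowledge, then wait until the pause flag is cleared. */
			atomic_fetch_or(&worker_state, WORKER_PAUSE_ACK);
			while (atomic_load(&worker_state) & WORKER_PAUSE)
				(void)poll(NULL, 0, 1);
			continue;
		}
		/* ... service pending requests here ... */
		(void)poll(NULL, 0, 1);
	}
	return NULL;
}

static void before_fork(void)
{
	/* Pause the worker and wait until it acknowledges. */
	atomic_fetch_or(&worker_state, WORKER_PAUSE);
	while (!(atomic_load(&worker_state) & WORKER_PAUSE_ACK))
		(void)poll(NULL, 0, 1);
}

static void after_fork_parent(void)
{
	/* The worker still exists in the parent: simply resume it. */
	atomic_fetch_and(&worker_state, ~(WORKER_PAUSE | WORKER_PAUSE_ACK));
}

static void after_fork_child(void)
{
	/* The worker does not exist in the child: re-create it. */
	atomic_store(&worker_state, 0);
	if (pthread_create(&worker_id, NULL, worker, NULL))
		abort();
}

int main(void)
{
	if (pthread_atfork(before_fork, after_fork_parent, after_fork_child))
		abort();
	if (pthread_create(&worker_id, NULL, worker, NULL))
		abort();
	if (fork() == 0)
		printf("child: worker re-created\n");
	else
		printf("parent: worker resumed\n");
	sleep(1);
	return 0;
}

The real implementation below additionally serializes against
side_agent_thread_lock and side_statedump_lock and wakes the agent thread
through its condition variable; the sketch keeps only the state-flag
protocol.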
[libside.git] / src / side.c
index f7628b07e15c537056d353afc80a4114a2691a6a..60865be5476eb3a89a81cfd4f6ec4078ccf6bf45 100644 (file)
@@ -6,7 +6,11 @@
 #include <side/trace.h>
 #include <string.h>
 #include <assert.h>
+#include <pthread.h>
+#include <unistd.h>
+#include <poll.h>
 
+#include "compiler.h"
 #include "rcu.h"
 #include "list.h"
 #include "rculist.h"
@@ -37,6 +41,9 @@
 /* Key 0x2 is reserved for ptrace. */
 #define SIDE_KEY_PTRACE                                        0x2
 
+#define SIDE_RETRY_BUSY_LOOP_ATTEMPTS                  100
+#define SIDE_RETRY_DELAY_MS                            1
+
 struct side_events_register_handle {
        struct side_list_node node;
        struct side_event_description **events;
@@ -77,6 +84,22 @@ struct side_callback {
        uint64_t key;
 };
 
+enum agent_thread_state {
+       AGENT_THREAD_STATE_BLOCKED = 0,
+       AGENT_THREAD_STATE_HANDLE_REQUEST = (1 << 0),
+       AGENT_THREAD_STATE_EXIT = (1 << 1),
+       AGENT_THREAD_STATE_PAUSE = (1 << 2),
+       AGENT_THREAD_STATE_PAUSE_ACK = (1 << 3),
+};
+
+struct statedump_agent_thread {
+       long ref;
+       pthread_t id;
+       enum agent_thread_state state;
+       pthread_cond_t worker_cond;
+       pthread_cond_t waiter_cond;
+};
+
 static struct side_rcu_gp_state event_rcu_gp, statedump_rcu_gp;
 
 /*
@@ -94,10 +117,20 @@ static bool finalized;
 static pthread_mutex_t side_event_lock = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
 static pthread_mutex_t side_statedump_lock = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
 static pthread_mutex_t side_key_lock = PTHREAD_MUTEX_INITIALIZER;
+/*
+ * The side_agent_thread_lock protects the lifetime of the agent
+ * thread: reference counting, creation, and join. It is never taken
+ * by the agent thread itself, so there is no circular dependency
+ * with pthread_join().
+ * The side_statedump_lock nests inside the side_agent_thread_lock.
+ */
+static pthread_mutex_t side_agent_thread_lock = PTHREAD_MUTEX_INITIALIZER;
 
 /* Dynamic tracer key allocation. */
 static uint64_t side_key_next = SIDE_KEY_RESERVED_RANGE_END;
 
+static struct statedump_agent_thread statedump_agent_thread;
+
 static DEFINE_SIDE_LIST_HEAD(side_events_list);
 static DEFINE_SIDE_LIST_HEAD(side_tracer_list);
 
@@ -556,6 +589,10 @@ void queue_statedump_pending(struct side_statedump_request_handle *handle, uint6
                abort();
        notif->key = key;
        side_list_insert_node_tail(&handle->notification_queue, &notif->node);
+       if (handle->mode == SIDE_STATEDUMP_MODE_AGENT_THREAD) {
+               (void)__atomic_or_fetch(&statedump_agent_thread.state, AGENT_THREAD_STATE_HANDLE_REQUEST, __ATOMIC_SEQ_CST);
+               pthread_cond_broadcast(&statedump_agent_thread.worker_cond);
+       }
 }
 
 /* Called with side_statedump_lock held. */
@@ -572,6 +609,151 @@ void unqueue_statedump_pending(struct side_statedump_request_handle *handle, uin
        }
 }
 
+static
+void side_statedump_run(struct side_statedump_request_handle *handle,
+               struct side_statedump_notification *notif)
+{
+       /* Invoke the state dump callback specifically for the tracer key. */
+       filter_key = notif->key;
+       side_statedump_event_call(side_statedump_begin,
+               side_arg_list(side_arg_string(handle->name)));
+       handle->cb();
+       side_statedump_event_call(side_statedump_end,
+               side_arg_list(side_arg_string(handle->name)));
+       filter_key = SIDE_KEY_MATCH_ALL;
+}
+
+static
+void _side_statedump_run_pending_requests(struct side_statedump_request_handle *handle)
+{
+       struct side_statedump_notification *notif, *tmp;
+       DEFINE_SIDE_LIST_HEAD(tmp_head);
+
+       pthread_mutex_lock(&side_statedump_lock);
+       side_list_splice(&handle->notification_queue, &tmp_head);
+       side_list_head_init(&handle->notification_queue);
+       pthread_mutex_unlock(&side_statedump_lock);
+
+       /* We are now sole owner of the tmp_head list. */
+       side_list_for_each_entry(notif, &tmp_head, node)
+               side_statedump_run(handle, notif);
+       side_list_for_each_entry_safe(notif, tmp, &tmp_head, node)
+               free(notif);
+
+       if (handle->mode == SIDE_STATEDUMP_MODE_AGENT_THREAD) {
+               pthread_mutex_lock(&side_statedump_lock);
+               pthread_cond_broadcast(&statedump_agent_thread.waiter_cond);
+               pthread_mutex_unlock(&side_statedump_lock);
+       }
+}
+
+static
+void *statedump_agent_func(void *arg __attribute__((unused)))
+{
+       for (;;) {
+               struct side_statedump_request_handle *handle;
+               struct side_rcu_read_state rcu_read_state;
+               enum agent_thread_state state;
+
+               pthread_mutex_lock(&side_statedump_lock);
+               for (;;) {
+                       state = __atomic_load_n(&statedump_agent_thread.state, __ATOMIC_SEQ_CST);
+                       if (state == AGENT_THREAD_STATE_BLOCKED)
+                               pthread_cond_wait(&statedump_agent_thread.worker_cond, &side_statedump_lock);
+                       else
+                               break;
+               }
+               pthread_mutex_unlock(&side_statedump_lock);
+               if (state & AGENT_THREAD_STATE_EXIT)
+                       break;
+               if (state & AGENT_THREAD_STATE_PAUSE) {
+                       int attempt = 0;
+
+                       (void)__atomic_or_fetch(&statedump_agent_thread.state, AGENT_THREAD_STATE_PAUSE_ACK, __ATOMIC_SEQ_CST);
+                       for (;;) {
+                               state = __atomic_load_n(&statedump_agent_thread.state, __ATOMIC_SEQ_CST);
+                               if (!(state & AGENT_THREAD_STATE_PAUSE))
+                                       break;
+                               if (attempt > SIDE_RETRY_BUSY_LOOP_ATTEMPTS) {
+                                       (void)poll(NULL, 0, SIDE_RETRY_DELAY_MS);
+                                       continue;
+                               }
+                               attempt++;
+                               side_cpu_relax();
+                       }
+                       continue;
+               }
+               (void)__atomic_and_fetch(&statedump_agent_thread.state, ~AGENT_THREAD_STATE_HANDLE_REQUEST, __ATOMIC_SEQ_CST);
+               side_rcu_read_begin(&statedump_rcu_gp, &rcu_read_state);
+               side_list_for_each_entry_rcu(handle, &side_statedump_list, node)
+                       _side_statedump_run_pending_requests(handle);
+               side_rcu_read_end(&statedump_rcu_gp, &rcu_read_state);
+       }
+       return NULL;
+}
+
+static
+void statedump_agent_thread_init(void)
+{
+       pthread_cond_init(&statedump_agent_thread.worker_cond, NULL);
+       pthread_cond_init(&statedump_agent_thread.waiter_cond, NULL);
+       statedump_agent_thread.state = AGENT_THREAD_STATE_BLOCKED;
+}
+
+/* Called with side_agent_thread_lock and side_statedump_lock held. */
+static
+void statedump_agent_thread_get(void)
+{
+       int ret;
+
+       if (statedump_agent_thread.ref++)
+               return;
+       statedump_agent_thread_init();
+       ret = pthread_create(&statedump_agent_thread.id, NULL,
+                       statedump_agent_func, NULL);
+       if (ret) {
+               abort();
+       }
+}
+
+/*
+ * Called with side_agent_thread_lock and side_statedump_lock held.
+ * Returns true if the caller needs to join the agent thread.
+ */
+static
+bool statedump_agent_thread_put(void)
+{
+       if (--statedump_agent_thread.ref)
+               return false;
+       (void)__atomic_or_fetch(&statedump_agent_thread.state, AGENT_THREAD_STATE_EXIT, __ATOMIC_SEQ_CST);
+       pthread_cond_broadcast(&statedump_agent_thread.worker_cond);
+       return true;
+}
+
+static
+void statedump_agent_thread_fini(void)
+{
+       statedump_agent_thread.state = AGENT_THREAD_STATE_BLOCKED;
+       if (pthread_cond_destroy(&statedump_agent_thread.worker_cond))
+               abort();
+       if (pthread_cond_destroy(&statedump_agent_thread.waiter_cond))
+               abort();
+}
+
+/* Called with side_agent_thread_lock held. */
+static
+void statedump_agent_thread_join(void)
+{
+       int ret;
+       void *retval;
+
+       ret = pthread_join(statedump_agent_thread.id, &retval);
+       if (ret) {
+               abort();
+       }
+       statedump_agent_thread_fini();
+}
+
 struct side_statedump_request_handle *
        side_statedump_request_notification_register(const char *state_name,
                void (*statedump_cb)(void),
@@ -601,12 +783,25 @@ struct side_statedump_request_handle *
        handle->mode = mode;
        side_list_head_init(&handle->notification_queue);
 
+       if (mode == SIDE_STATEDUMP_MODE_AGENT_THREAD)
+               pthread_mutex_lock(&side_agent_thread_lock);
        pthread_mutex_lock(&side_statedump_lock);
+       if (mode == SIDE_STATEDUMP_MODE_AGENT_THREAD)
+               statedump_agent_thread_get();
        side_list_insert_node_tail_rcu(&side_statedump_list, &handle->node);
        /* Queue statedump pending for all tracers. */
        queue_statedump_pending(handle, SIDE_KEY_MATCH_ALL);
        pthread_mutex_unlock(&side_statedump_lock);
 
+       if (mode == SIDE_STATEDUMP_MODE_AGENT_THREAD) {
+               pthread_mutex_unlock(&side_agent_thread_lock);
+
+               pthread_mutex_lock(&side_statedump_lock);
+               while (!side_list_empty(&handle->notification_queue))
+                       pthread_cond_wait(&statedump_agent_thread.waiter_cond, &side_statedump_lock);
+               pthread_mutex_unlock(&side_statedump_lock);
+       }
+
        return handle;
 
 name_nomem:
@@ -616,16 +811,26 @@ name_nomem:
 
 void side_statedump_request_notification_unregister(struct side_statedump_request_handle *handle)
 {
+       bool join = false;
+
        if (finalized)
                return;
        if (!initialized)
                side_init();
        assert(!filter_key);
 
+       if (handle->mode == SIDE_STATEDUMP_MODE_AGENT_THREAD)
+               pthread_mutex_lock(&side_agent_thread_lock);
        pthread_mutex_lock(&side_statedump_lock);
        unqueue_statedump_pending(handle, SIDE_KEY_MATCH_ALL);
        side_list_remove_node_rcu(&handle->node);
+       if (handle->mode == SIDE_STATEDUMP_MODE_AGENT_THREAD)
+               join = statedump_agent_thread_put();
        pthread_mutex_unlock(&side_statedump_lock);
+       if (join)
+               statedump_agent_thread_join();
+       if (handle->mode == SIDE_STATEDUMP_MODE_AGENT_THREAD)
+               pthread_mutex_unlock(&side_agent_thread_lock);
 
        side_rcu_wait_grace_period(&statedump_rcu_gp);
        free(handle->name);
@@ -645,37 +850,6 @@ bool side_statedump_poll_pending_requests(struct side_statedump_request_handle *
        return ret;
 }
 
-static
-void side_statedump_run(struct side_statedump_request_handle *handle,
-               struct side_statedump_notification *notif)
-{
-       /* Invoke the state dump callback specifically for the tracer key. */
-       filter_key = notif->key;
-       side_statedump_event_call(side_statedump_begin,
-               side_arg_list(side_arg_string(handle->name)));
-       handle->cb();
-       side_statedump_event_call(side_statedump_end,
-               side_arg_list(side_arg_string(handle->name)));
-       filter_key = SIDE_KEY_MATCH_ALL;
-}
-
-static
-void _side_statedump_run_pending_requests(struct side_statedump_request_handle *handle)
-{
-       struct side_statedump_notification *notif, *tmp;
-       DEFINE_SIDE_LIST_HEAD(tmp_head);
-
-       pthread_mutex_lock(&side_statedump_lock);
-       side_list_splice(&handle->notification_queue, &tmp_head);
-       pthread_mutex_unlock(&side_statedump_lock);
-
-       /* We are now sole owner of the tmp_head list. */
-       side_list_for_each_entry(notif, &tmp_head, node)
-               side_statedump_run(handle, notif);
-       side_list_for_each_entry_safe(notif, tmp, &tmp_head, node)
-               free(notif);
-}
-
 /*
  * Only polling mode state dump handles allow the application to explicitly handle the
  * pending requests.
@@ -739,12 +913,76 @@ end:
        return ret;
 }
 
+/*
+ * Use of pthread_atfork depends on glibc >= 2.24 to eliminate hangs
+ * that occur when waiting for the agent thread while that thread
+ * calls malloc. This is corrected by GNU libc
+ * commit 8a727af925be63aa6ea0f5f90e16751fd541626b.
+ * Ref. https://bugzilla.redhat.com/show_bug.cgi?id=906468
+ */
+static
+void side_before_fork(void)
+{
+       int attempt = 0;
+
+       pthread_mutex_lock(&side_agent_thread_lock);
+       if (!statedump_agent_thread.ref)
+               return;
+       /* Pause agent thread. */
+       pthread_mutex_lock(&side_statedump_lock);
+       (void)__atomic_or_fetch(&statedump_agent_thread.state, AGENT_THREAD_STATE_PAUSE, __ATOMIC_SEQ_CST);
+       pthread_cond_broadcast(&statedump_agent_thread.worker_cond);
+       pthread_mutex_unlock(&side_statedump_lock);
+       /* Wait for the agent thread to acknowledge. */
+       while (!(__atomic_load_n(&statedump_agent_thread.state, __ATOMIC_SEQ_CST) & AGENT_THREAD_STATE_PAUSE_ACK)) {
+               if (attempt > SIDE_RETRY_BUSY_LOOP_ATTEMPTS) {
+                       (void)poll(NULL, 0, SIDE_RETRY_DELAY_MS);
+                       continue;
+               }
+               attempt++;
+               side_cpu_relax();
+       }
+}
+
+static
+void side_after_fork_parent(void)
+{
+       if (statedump_agent_thread.ref)
+               (void)__atomic_and_fetch(&statedump_agent_thread.state,
+                       ~(AGENT_THREAD_STATE_PAUSE | AGENT_THREAD_STATE_PAUSE_ACK),
+                       __ATOMIC_SEQ_CST);
+       pthread_mutex_unlock(&side_agent_thread_lock);
+}
+
+/*
+ * The agent thread does not exist in the child process after a fork.
+ * Re-initialize its data structures and create a new agent thread.
+ */
+static
+void side_after_fork_child(void)
+{
+       if (statedump_agent_thread.ref) {
+               int ret;
+
+               statedump_agent_thread_fini();
+               statedump_agent_thread_init();
+               ret = pthread_create(&statedump_agent_thread.id, NULL,
+                               statedump_agent_func, NULL);
+               if (ret) {
+                       abort();
+               }
+       }
+       pthread_mutex_unlock(&side_agent_thread_lock);
+}
+
 void side_init(void)
 {
        if (initialized)
                return;
        side_rcu_gp_init(&event_rcu_gp);
        side_rcu_gp_init(&statedump_rcu_gp);
+       if (pthread_atfork(side_before_fork, side_after_fork_parent, side_after_fork_child))
+               abort();
        initialized = true;
 }
 
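For context, here is a minimal sketch of the scenario this change makes
safe: an application registers a statedump handle in
SIDE_STATEDUMP_MODE_AGENT_THREAD and later forks. The parameters of
side_statedump_request_notification_register() after the callback are
truncated in the hunk above, so passing the mode as the final argument is
an assumption of this sketch:

#include <side/trace.h>
#include <stdlib.h>
#include <sys/wait.h>
#include <unistd.h>

static void app_statedump_cb(void)
{
	/* Emit the application's statedump events for the requesting tracer. */
}

int main(void)
{
	struct side_statedump_request_handle *handle;
	pid_t pid;

	/* Assumption: the mode follows the callback in the argument list. */
	handle = side_statedump_request_notification_register("app-state",
			app_statedump_cb, SIDE_STATEDUMP_MODE_AGENT_THREAD);
	if (!handle)
		abort();

	pid = fork();
	if (pid == 0) {
		/*
		 * Child: the after-fork handler re-created the agent thread,
		 * so statedump requests keep being serviced here.
		 */
		_exit(EXIT_SUCCESS);
	}
	(void)waitpid(pid, NULL, 0);

	side_statedump_request_notification_unregister(handle);
	return 0;
}

Without the atfork handling added above, the child process would be left
without an agent thread to service these requests.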