Implement agent thread wakeup scheme with pthread cond var
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Fri, 22 Dec 2023 20:03:07 +0000 (15:03 -0500)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Fri, 22 Dec 2023 20:03:07 +0000 (15:03 -0500)
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
src/side.c

index 6295343dc8e98a02368391394cac01253584d090..7f6d58a83370c2351d4180c65886a9e4777a26f6 100644 (file)
@@ -89,6 +89,8 @@ struct statedump_agent_thread {
        long ref;
        pthread_t id;
        enum agent_thread_state state;
+       pthread_cond_t worker_cond;
+       pthread_cond_t waiter_cond;
 };
 
 static struct side_rcu_gp_state event_rcu_gp, statedump_rcu_gp;
@@ -108,6 +110,14 @@ static bool finalized;
 static pthread_mutex_t side_event_lock = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
 static pthread_mutex_t side_statedump_lock = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
 static pthread_mutex_t side_key_lock = PTHREAD_MUTEX_INITIALIZER;
+/*
+ * The side_agent_thread_lock protects the life-time of the agent
+ * thread: reference counting, creation, join. It is not taken by
+ * the agent thread per se so it does not have a circular dependency
+ * with pthread join.
+ * The side_statedump_lock nests inside the side_agent_thread_lock.
+ */
+static pthread_mutex_t side_agent_thread_lock = PTHREAD_MUTEX_INITIALIZER;
 
 /* Dynamic tracer key allocation. */
 static uint64_t side_key_next = SIDE_KEY_RESERVED_RANGE_END;
@@ -572,8 +582,10 @@ void queue_statedump_pending(struct side_statedump_request_handle *handle, uint6
                abort();
        notif->key = key;
        side_list_insert_node_tail(&handle->notification_queue, &notif->node);
-       if (handle->mode == SIDE_STATEDUMP_MODE_AGENT_THREAD)
+       if (handle->mode == SIDE_STATEDUMP_MODE_AGENT_THREAD) {
                (void)__atomic_or_fetch(&statedump_agent_thread.state, AGENT_THREAD_STATE_HANDLE_REQUEST, __ATOMIC_SEQ_CST);
+               pthread_cond_broadcast(&statedump_agent_thread.worker_cond);
+       }
 }
 
 /* Called with side_statedump_lock held. */
@@ -620,6 +632,12 @@ void _side_statedump_run_pending_requests(struct side_statedump_request_handle *
                side_statedump_run(handle, notif);
        side_list_for_each_entry_safe(notif, tmp, &tmp_head, node)
                free(notif);
+
+       if (handle->mode == SIDE_STATEDUMP_MODE_AGENT_THREAD) {
+               pthread_mutex_lock(&side_statedump_lock);
+               pthread_cond_broadcast(&statedump_agent_thread.waiter_cond);
+               pthread_mutex_unlock(&side_statedump_lock);
+       }
 }
 
 static
@@ -630,12 +648,15 @@ void *statedump_agent_func(void *arg __attribute__((unused)))
                struct side_rcu_read_state rcu_read_state;
                enum agent_thread_state state;
 
-               /* TODO: futex-based wakeup. */
-               state = __atomic_load_n(&statedump_agent_thread.state, __ATOMIC_SEQ_CST);
-               if (state == AGENT_THREAD_STATE_BLOCKED) {
-                       sleep(1);
-                       continue;
+               pthread_mutex_lock(&side_statedump_lock);
+               for (;;) {
+                       state = __atomic_load_n(&statedump_agent_thread.state, __ATOMIC_SEQ_CST);
+                       if (state == AGENT_THREAD_STATE_BLOCKED)
+                               pthread_cond_wait(&statedump_agent_thread.worker_cond, &side_statedump_lock);
+                       else
+                               break;
                }
+               pthread_mutex_unlock(&side_statedump_lock);
                if (state & AGENT_THREAD_STATE_EXIT)
                        break;
                (void)__atomic_and_fetch(&statedump_agent_thread.state, ~AGENT_THREAD_STATE_HANDLE_REQUEST, __ATOMIC_SEQ_CST);
@@ -647,6 +668,7 @@ void *statedump_agent_func(void *arg __attribute__((unused)))
        return NULL;
 }
 
+/* Called with side_agent_thread_lock and side_statedump_lock held. */
 static
 void statedump_agent_thread_get(void)
 {
@@ -654,6 +676,8 @@ void statedump_agent_thread_get(void)
 
        if (statedump_agent_thread.ref++)
                return;
+       pthread_cond_init(&statedump_agent_thread.worker_cond, NULL);
+       pthread_cond_init(&statedump_agent_thread.waiter_cond, NULL);
        statedump_agent_thread.state = AGENT_THREAD_STATE_BLOCKED;
        ret = pthread_create(&statedump_agent_thread.id, NULL,
                        statedump_agent_func, NULL);
@@ -662,20 +686,34 @@ void statedump_agent_thread_get(void)
        }
 }
 
+/*
+ * Called with side_agent_thread_lock and side_statedump_lock held.
+ * Returns true if join for agent thread is needed.
+ */
+static
+bool statedump_agent_thread_put(void)
+{
+       if (--statedump_agent_thread.ref)
+               return false;
+       (void)__atomic_or_fetch(&statedump_agent_thread.state, AGENT_THREAD_STATE_EXIT, __ATOMIC_SEQ_CST);
+       pthread_cond_broadcast(&statedump_agent_thread.worker_cond);
+       return true;
+}
+
+/* Called with side_agent_thread_lock held. */
 static
-void statedump_agent_thread_put(void)
+void statedump_agent_thread_join(void)
 {
        int ret;
        void *retval;
 
-       if (--statedump_agent_thread.ref)
-               return;
-       (void)__atomic_or_fetch(&statedump_agent_thread.state, AGENT_THREAD_STATE_EXIT, __ATOMIC_SEQ_CST);
        ret = pthread_join(statedump_agent_thread.id, &retval);
        if (ret) {
                abort();
        }
        statedump_agent_thread.state = AGENT_THREAD_STATE_BLOCKED;
+       pthread_cond_destroy(&statedump_agent_thread.worker_cond);
+       pthread_cond_destroy(&statedump_agent_thread.waiter_cond);
 }
 
 struct side_statedump_request_handle *
@@ -707,6 +745,8 @@ struct side_statedump_request_handle *
        handle->mode = mode;
        side_list_head_init(&handle->notification_queue);
 
+       if (mode == SIDE_STATEDUMP_MODE_AGENT_THREAD)
+               pthread_mutex_lock(&side_agent_thread_lock);
        pthread_mutex_lock(&side_statedump_lock);
        if (mode == SIDE_STATEDUMP_MODE_AGENT_THREAD)
                statedump_agent_thread_get();
@@ -716,17 +756,12 @@ struct side_statedump_request_handle *
        pthread_mutex_unlock(&side_statedump_lock);
 
        if (mode == SIDE_STATEDUMP_MODE_AGENT_THREAD) {
-               for (;;) {
-                       bool is_empty;
+               pthread_mutex_unlock(&side_agent_thread_lock);
 
-                       /* TODO futex based wakeup. */
-                       pthread_mutex_lock(&side_statedump_lock);
-                       is_empty = side_list_empty(&handle->notification_queue);
-                       pthread_mutex_unlock(&side_statedump_lock);
-                       if (is_empty)
-                               break;
-                       sleep(1);
-               }
+               pthread_mutex_lock(&side_statedump_lock);
+               while (!side_list_empty(&handle->notification_queue))
+                       pthread_cond_wait(&statedump_agent_thread.waiter_cond, &side_statedump_lock);
+               pthread_mutex_unlock(&side_statedump_lock);
        }
 
        return handle;
@@ -738,18 +773,26 @@ name_nomem:
 
 void side_statedump_request_notification_unregister(struct side_statedump_request_handle *handle)
 {
+       bool join = false;
+
        if (finalized)
                return;
        if (!initialized)
                side_init();
        assert(!filter_key);
 
+       if (handle->mode == SIDE_STATEDUMP_MODE_AGENT_THREAD)
+               pthread_mutex_lock(&side_agent_thread_lock);
        pthread_mutex_lock(&side_statedump_lock);
        unqueue_statedump_pending(handle, SIDE_KEY_MATCH_ALL);
        side_list_remove_node_rcu(&handle->node);
        if (handle->mode == SIDE_STATEDUMP_MODE_AGENT_THREAD)
-               statedump_agent_thread_put();
+               join = statedump_agent_thread_put();
        pthread_mutex_unlock(&side_statedump_lock);
+       if (join)
+               statedump_agent_thread_join();
+       if (handle->mode == SIDE_STATEDUMP_MODE_AGENT_THREAD)
+               pthread_mutex_unlock(&side_agent_thread_lock);
 
        side_rcu_wait_grace_period(&statedump_rcu_gp);
        free(handle->name);
This page took 0.027091 seconds and 4 git commands to generate.