From: Mathieu Desnoyers Date: Fri, 22 Dec 2023 20:03:07 +0000 (-0500) Subject: Implement agent thread wakeup scheme with pthread cond var X-Git-Url: http://git.efficios.com/?p=libside.git;a=commitdiff_plain;h=76dd11f984e13ab51cf11700974f17903cdcd32e Implement agent thread wakeup scheme with pthread cond var Signed-off-by: Mathieu Desnoyers --- diff --git a/src/side.c b/src/side.c index 6295343..7f6d58a 100644 --- a/src/side.c +++ b/src/side.c @@ -89,6 +89,8 @@ struct statedump_agent_thread { long ref; pthread_t id; enum agent_thread_state state; + pthread_cond_t worker_cond; + pthread_cond_t waiter_cond; }; static struct side_rcu_gp_state event_rcu_gp, statedump_rcu_gp; @@ -108,6 +110,14 @@ static bool finalized; static pthread_mutex_t side_event_lock = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP; static pthread_mutex_t side_statedump_lock = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP; static pthread_mutex_t side_key_lock = PTHREAD_MUTEX_INITIALIZER; +/* + * The side_agent_thread_lock protects the life-time of the agent + * thread: reference counting, creation, join. It is not taken by + * the agent thread per se so it does not have a circular dependency + * with pthread join. + * The side_statedump_lock nests inside the side_agent_thread_lock. + */ +static pthread_mutex_t side_agent_thread_lock = PTHREAD_MUTEX_INITIALIZER; /* Dynamic tracer key allocation. */ static uint64_t side_key_next = SIDE_KEY_RESERVED_RANGE_END; @@ -572,8 +582,10 @@ void queue_statedump_pending(struct side_statedump_request_handle *handle, uint6 abort(); notif->key = key; side_list_insert_node_tail(&handle->notification_queue, ¬if->node); - if (handle->mode == SIDE_STATEDUMP_MODE_AGENT_THREAD) + if (handle->mode == SIDE_STATEDUMP_MODE_AGENT_THREAD) { (void)__atomic_or_fetch(&statedump_agent_thread.state, AGENT_THREAD_STATE_HANDLE_REQUEST, __ATOMIC_SEQ_CST); + pthread_cond_broadcast(&statedump_agent_thread.worker_cond); + } } /* Called with side_statedump_lock held. */ @@ -620,6 +632,12 @@ void _side_statedump_run_pending_requests(struct side_statedump_request_handle * side_statedump_run(handle, notif); side_list_for_each_entry_safe(notif, tmp, &tmp_head, node) free(notif); + + if (handle->mode == SIDE_STATEDUMP_MODE_AGENT_THREAD) { + pthread_mutex_lock(&side_statedump_lock); + pthread_cond_broadcast(&statedump_agent_thread.waiter_cond); + pthread_mutex_unlock(&side_statedump_lock); + } } static @@ -630,12 +648,15 @@ void *statedump_agent_func(void *arg __attribute__((unused))) struct side_rcu_read_state rcu_read_state; enum agent_thread_state state; - /* TODO: futex-based wakeup. */ - state = __atomic_load_n(&statedump_agent_thread.state, __ATOMIC_SEQ_CST); - if (state == AGENT_THREAD_STATE_BLOCKED) { - sleep(1); - continue; + pthread_mutex_lock(&side_statedump_lock); + for (;;) { + state = __atomic_load_n(&statedump_agent_thread.state, __ATOMIC_SEQ_CST); + if (state == AGENT_THREAD_STATE_BLOCKED) + pthread_cond_wait(&statedump_agent_thread.worker_cond, &side_statedump_lock); + else + break; } + pthread_mutex_unlock(&side_statedump_lock); if (state & AGENT_THREAD_STATE_EXIT) break; (void)__atomic_and_fetch(&statedump_agent_thread.state, ~AGENT_THREAD_STATE_HANDLE_REQUEST, __ATOMIC_SEQ_CST); @@ -647,6 +668,7 @@ void *statedump_agent_func(void *arg __attribute__((unused))) return NULL; } +/* Called with side_agent_thread_lock and side_statedump_lock held. */ static void statedump_agent_thread_get(void) { @@ -654,6 +676,8 @@ void statedump_agent_thread_get(void) if (statedump_agent_thread.ref++) return; + pthread_cond_init(&statedump_agent_thread.worker_cond, NULL); + pthread_cond_init(&statedump_agent_thread.waiter_cond, NULL); statedump_agent_thread.state = AGENT_THREAD_STATE_BLOCKED; ret = pthread_create(&statedump_agent_thread.id, NULL, statedump_agent_func, NULL); @@ -662,20 +686,34 @@ void statedump_agent_thread_get(void) } } +/* + * Called with side_agent_thread_lock and side_statedump_lock held. + * Returns true if join for agent thread is needed. + */ +static +bool statedump_agent_thread_put(void) +{ + if (--statedump_agent_thread.ref) + return false; + (void)__atomic_or_fetch(&statedump_agent_thread.state, AGENT_THREAD_STATE_EXIT, __ATOMIC_SEQ_CST); + pthread_cond_broadcast(&statedump_agent_thread.worker_cond); + return true; +} + +/* Called with side_agent_thread_lock held. */ static -void statedump_agent_thread_put(void) +void statedump_agent_thread_join(void) { int ret; void *retval; - if (--statedump_agent_thread.ref) - return; - (void)__atomic_or_fetch(&statedump_agent_thread.state, AGENT_THREAD_STATE_EXIT, __ATOMIC_SEQ_CST); ret = pthread_join(statedump_agent_thread.id, &retval); if (ret) { abort(); } statedump_agent_thread.state = AGENT_THREAD_STATE_BLOCKED; + pthread_cond_destroy(&statedump_agent_thread.worker_cond); + pthread_cond_destroy(&statedump_agent_thread.waiter_cond); } struct side_statedump_request_handle * @@ -707,6 +745,8 @@ struct side_statedump_request_handle * handle->mode = mode; side_list_head_init(&handle->notification_queue); + if (mode == SIDE_STATEDUMP_MODE_AGENT_THREAD) + pthread_mutex_lock(&side_agent_thread_lock); pthread_mutex_lock(&side_statedump_lock); if (mode == SIDE_STATEDUMP_MODE_AGENT_THREAD) statedump_agent_thread_get(); @@ -716,17 +756,12 @@ struct side_statedump_request_handle * pthread_mutex_unlock(&side_statedump_lock); if (mode == SIDE_STATEDUMP_MODE_AGENT_THREAD) { - for (;;) { - bool is_empty; + pthread_mutex_unlock(&side_agent_thread_lock); - /* TODO futex based wakeup. */ - pthread_mutex_lock(&side_statedump_lock); - is_empty = side_list_empty(&handle->notification_queue); - pthread_mutex_unlock(&side_statedump_lock); - if (is_empty) - break; - sleep(1); - } + pthread_mutex_lock(&side_statedump_lock); + while (!side_list_empty(&handle->notification_queue)) + pthread_cond_wait(&statedump_agent_thread.waiter_cond, &side_statedump_lock); + pthread_mutex_unlock(&side_statedump_lock); } return handle; @@ -738,18 +773,26 @@ name_nomem: void side_statedump_request_notification_unregister(struct side_statedump_request_handle *handle) { + bool join = false; + if (finalized) return; if (!initialized) side_init(); assert(!filter_key); + if (handle->mode == SIDE_STATEDUMP_MODE_AGENT_THREAD) + pthread_mutex_lock(&side_agent_thread_lock); pthread_mutex_lock(&side_statedump_lock); unqueue_statedump_pending(handle, SIDE_KEY_MATCH_ALL); side_list_remove_node_rcu(&handle->node); if (handle->mode == SIDE_STATEDUMP_MODE_AGENT_THREAD) - statedump_agent_thread_put(); + join = statedump_agent_thread_put(); pthread_mutex_unlock(&side_statedump_lock); + if (join) + statedump_agent_thread_join(); + if (handle->mode == SIDE_STATEDUMP_MODE_AGENT_THREAD) + pthread_mutex_unlock(&side_agent_thread_lock); side_rcu_wait_grace_period(&statedump_rcu_gp); free(handle->name);