From: Mathieu Desnoyers Date: Fri, 22 Dec 2023 18:29:16 +0000 (-0500) Subject: Implement agent thread X-Git-Url: http://git.efficios.com/?p=libside.git;a=commitdiff_plain;h=a125f21721e8fe6f37964213a737568a2692723a Implement agent thread Currently uses 1 second polling for state. To be replaced by a proper wait/wakeup. Signed-off-by: Mathieu Desnoyers --- diff --git a/src/side.c b/src/side.c index f7628b0..6295343 100644 --- a/src/side.c +++ b/src/side.c @@ -6,6 +6,8 @@ #include #include #include +#include +#include #include "rcu.h" #include "list.h" @@ -77,6 +79,18 @@ struct side_callback { uint64_t key; }; +enum agent_thread_state { + AGENT_THREAD_STATE_BLOCKED = 0, + AGENT_THREAD_STATE_HANDLE_REQUEST = (1 << 0), + AGENT_THREAD_STATE_EXIT = (1 << 1), +}; + +struct statedump_agent_thread { + long ref; + pthread_t id; + enum agent_thread_state state; +}; + static struct side_rcu_gp_state event_rcu_gp, statedump_rcu_gp; /* @@ -98,6 +112,8 @@ static pthread_mutex_t side_key_lock = PTHREAD_MUTEX_INITIALIZER; /* Dynamic tracer key allocation. */ static uint64_t side_key_next = SIDE_KEY_RESERVED_RANGE_END; +static struct statedump_agent_thread statedump_agent_thread; + static DEFINE_SIDE_LIST_HEAD(side_events_list); static DEFINE_SIDE_LIST_HEAD(side_tracer_list); @@ -556,6 +572,8 @@ void queue_statedump_pending(struct side_statedump_request_handle *handle, uint6 abort(); notif->key = key; side_list_insert_node_tail(&handle->notification_queue, ¬if->node); + if (handle->mode == SIDE_STATEDUMP_MODE_AGENT_THREAD) + (void)__atomic_or_fetch(&statedump_agent_thread.state, AGENT_THREAD_STATE_HANDLE_REQUEST, __ATOMIC_SEQ_CST); } /* Called with side_statedump_lock held. */ @@ -572,6 +590,94 @@ void unqueue_statedump_pending(struct side_statedump_request_handle *handle, uin } } +static +void side_statedump_run(struct side_statedump_request_handle *handle, + struct side_statedump_notification *notif) +{ + /* Invoke the state dump callback specifically for the tracer key. */ + filter_key = notif->key; + side_statedump_event_call(side_statedump_begin, + side_arg_list(side_arg_string(handle->name))); + handle->cb(); + side_statedump_event_call(side_statedump_end, + side_arg_list(side_arg_string(handle->name))); + filter_key = SIDE_KEY_MATCH_ALL; +} + +static +void _side_statedump_run_pending_requests(struct side_statedump_request_handle *handle) +{ + struct side_statedump_notification *notif, *tmp; + DEFINE_SIDE_LIST_HEAD(tmp_head); + + pthread_mutex_lock(&side_statedump_lock); + side_list_splice(&handle->notification_queue, &tmp_head); + side_list_head_init(&handle->notification_queue); + pthread_mutex_unlock(&side_statedump_lock); + + /* We are now sole owner of the tmp_head list. */ + side_list_for_each_entry(notif, &tmp_head, node) + side_statedump_run(handle, notif); + side_list_for_each_entry_safe(notif, tmp, &tmp_head, node) + free(notif); +} + +static +void *statedump_agent_func(void *arg __attribute__((unused))) +{ + for (;;) { + struct side_statedump_request_handle *handle; + struct side_rcu_read_state rcu_read_state; + enum agent_thread_state state; + + /* TODO: futex-based wakeup. */ + state = __atomic_load_n(&statedump_agent_thread.state, __ATOMIC_SEQ_CST); + if (state == AGENT_THREAD_STATE_BLOCKED) { + sleep(1); + continue; + } + if (state & AGENT_THREAD_STATE_EXIT) + break; + (void)__atomic_and_fetch(&statedump_agent_thread.state, ~AGENT_THREAD_STATE_HANDLE_REQUEST, __ATOMIC_SEQ_CST); + side_rcu_read_begin(&statedump_rcu_gp, &rcu_read_state); + side_list_for_each_entry_rcu(handle, &side_statedump_list, node) + _side_statedump_run_pending_requests(handle); + side_rcu_read_end(&statedump_rcu_gp, &rcu_read_state); + } + return NULL; +} + +static +void statedump_agent_thread_get(void) +{ + int ret; + + if (statedump_agent_thread.ref++) + return; + statedump_agent_thread.state = AGENT_THREAD_STATE_BLOCKED; + ret = pthread_create(&statedump_agent_thread.id, NULL, + statedump_agent_func, NULL); + if (ret) { + abort(); + } +} + +static +void statedump_agent_thread_put(void) +{ + int ret; + void *retval; + + if (--statedump_agent_thread.ref) + return; + (void)__atomic_or_fetch(&statedump_agent_thread.state, AGENT_THREAD_STATE_EXIT, __ATOMIC_SEQ_CST); + ret = pthread_join(statedump_agent_thread.id, &retval); + if (ret) { + abort(); + } + statedump_agent_thread.state = AGENT_THREAD_STATE_BLOCKED; +} + struct side_statedump_request_handle * side_statedump_request_notification_register(const char *state_name, void (*statedump_cb)(void), @@ -602,11 +708,27 @@ struct side_statedump_request_handle * side_list_head_init(&handle->notification_queue); pthread_mutex_lock(&side_statedump_lock); + if (mode == SIDE_STATEDUMP_MODE_AGENT_THREAD) + statedump_agent_thread_get(); side_list_insert_node_tail_rcu(&side_statedump_list, &handle->node); /* Queue statedump pending for all tracers. */ queue_statedump_pending(handle, SIDE_KEY_MATCH_ALL); pthread_mutex_unlock(&side_statedump_lock); + if (mode == SIDE_STATEDUMP_MODE_AGENT_THREAD) { + for (;;) { + bool is_empty; + + /* TODO futex based wakeup. */ + pthread_mutex_lock(&side_statedump_lock); + is_empty = side_list_empty(&handle->notification_queue); + pthread_mutex_unlock(&side_statedump_lock); + if (is_empty) + break; + sleep(1); + } + } + return handle; name_nomem: @@ -625,6 +747,8 @@ void side_statedump_request_notification_unregister(struct side_statedump_reques pthread_mutex_lock(&side_statedump_lock); unqueue_statedump_pending(handle, SIDE_KEY_MATCH_ALL); side_list_remove_node_rcu(&handle->node); + if (handle->mode == SIDE_STATEDUMP_MODE_AGENT_THREAD) + statedump_agent_thread_put(); pthread_mutex_unlock(&side_statedump_lock); side_rcu_wait_grace_period(&statedump_rcu_gp); @@ -645,37 +769,6 @@ bool side_statedump_poll_pending_requests(struct side_statedump_request_handle * return ret; } -static -void side_statedump_run(struct side_statedump_request_handle *handle, - struct side_statedump_notification *notif) -{ - /* Invoke the state dump callback specifically for the tracer key. */ - filter_key = notif->key; - side_statedump_event_call(side_statedump_begin, - side_arg_list(side_arg_string(handle->name))); - handle->cb(); - side_statedump_event_call(side_statedump_end, - side_arg_list(side_arg_string(handle->name))); - filter_key = SIDE_KEY_MATCH_ALL; -} - -static -void _side_statedump_run_pending_requests(struct side_statedump_request_handle *handle) -{ - struct side_statedump_notification *notif, *tmp; - DEFINE_SIDE_LIST_HEAD(tmp_head); - - pthread_mutex_lock(&side_statedump_lock); - side_list_splice(&handle->notification_queue, &tmp_head); - pthread_mutex_lock(&side_statedump_lock); - - /* We are now sole owner of the tmp_head list. */ - side_list_for_each_entry(notif, &tmp_head, node) - side_statedump_run(handle, notif); - side_list_for_each_entry_safe(notif, tmp, &tmp_head, node) - free(notif); -} - /* * Only polling mode state dump handles allow application to explicitly handle the * pending requests.