#include <side/trace.h>
#include <string.h>
#include <assert.h>
+#include <pthread.h>
+#include <unistd.h>
#include "rcu.h"
#include "list.h"
uint64_t key;
};
+enum agent_thread_state {
+ AGENT_THREAD_STATE_BLOCKED = 0,
+ AGENT_THREAD_STATE_HANDLE_REQUEST = (1 << 0),
+ AGENT_THREAD_STATE_EXIT = (1 << 1),
+};
+
+struct statedump_agent_thread {
+ long ref;
+ pthread_t id;
+ enum agent_thread_state state;
+};
+
static struct side_rcu_gp_state event_rcu_gp, statedump_rcu_gp;
/*
/* Dynamic tracer key allocation. */
static uint64_t side_key_next = SIDE_KEY_RESERVED_RANGE_END;
+static struct statedump_agent_thread statedump_agent_thread;
+
static DEFINE_SIDE_LIST_HEAD(side_events_list);
static DEFINE_SIDE_LIST_HEAD(side_tracer_list);
abort();
notif->key = key;
side_list_insert_node_tail(&handle->notification_queue, ¬if->node);
+ if (handle->mode == SIDE_STATEDUMP_MODE_AGENT_THREAD)
+ (void)__atomic_or_fetch(&statedump_agent_thread.state, AGENT_THREAD_STATE_HANDLE_REQUEST, __ATOMIC_SEQ_CST);
}
/* Called with side_statedump_lock held. */
}
}
+static
+void side_statedump_run(struct side_statedump_request_handle *handle,
+ struct side_statedump_notification *notif)
+{
+ /* Invoke the state dump callback specifically for the tracer key. */
+ filter_key = notif->key;
+ side_statedump_event_call(side_statedump_begin,
+ side_arg_list(side_arg_string(handle->name)));
+ handle->cb();
+ side_statedump_event_call(side_statedump_end,
+ side_arg_list(side_arg_string(handle->name)));
+ filter_key = SIDE_KEY_MATCH_ALL;
+}
+
+static
+void _side_statedump_run_pending_requests(struct side_statedump_request_handle *handle)
+{
+ struct side_statedump_notification *notif, *tmp;
+ DEFINE_SIDE_LIST_HEAD(tmp_head);
+
+ pthread_mutex_lock(&side_statedump_lock);
+ side_list_splice(&handle->notification_queue, &tmp_head);
+ side_list_head_init(&handle->notification_queue);
+ pthread_mutex_unlock(&side_statedump_lock);
+
+ /* We are now sole owner of the tmp_head list. */
+ side_list_for_each_entry(notif, &tmp_head, node)
+ side_statedump_run(handle, notif);
+ side_list_for_each_entry_safe(notif, tmp, &tmp_head, node)
+ free(notif);
+}
+
+static
+void *statedump_agent_func(void *arg __attribute__((unused)))
+{
+ for (;;) {
+ struct side_statedump_request_handle *handle;
+ struct side_rcu_read_state rcu_read_state;
+ enum agent_thread_state state;
+
+ /* TODO: futex-based wakeup. */
+ state = __atomic_load_n(&statedump_agent_thread.state, __ATOMIC_SEQ_CST);
+ if (state == AGENT_THREAD_STATE_BLOCKED) {
+ sleep(1);
+ continue;
+ }
+ if (state & AGENT_THREAD_STATE_EXIT)
+ break;
+ (void)__atomic_and_fetch(&statedump_agent_thread.state, ~AGENT_THREAD_STATE_HANDLE_REQUEST, __ATOMIC_SEQ_CST);
+ side_rcu_read_begin(&statedump_rcu_gp, &rcu_read_state);
+ side_list_for_each_entry_rcu(handle, &side_statedump_list, node)
+ _side_statedump_run_pending_requests(handle);
+ side_rcu_read_end(&statedump_rcu_gp, &rcu_read_state);
+ }
+ return NULL;
+}
+
+static
+void statedump_agent_thread_get(void)
+{
+ int ret;
+
+ if (statedump_agent_thread.ref++)
+ return;
+ statedump_agent_thread.state = AGENT_THREAD_STATE_BLOCKED;
+ ret = pthread_create(&statedump_agent_thread.id, NULL,
+ statedump_agent_func, NULL);
+ if (ret) {
+ abort();
+ }
+}
+
+static
+void statedump_agent_thread_put(void)
+{
+ int ret;
+ void *retval;
+
+ if (--statedump_agent_thread.ref)
+ return;
+ (void)__atomic_or_fetch(&statedump_agent_thread.state, AGENT_THREAD_STATE_EXIT, __ATOMIC_SEQ_CST);
+ ret = pthread_join(statedump_agent_thread.id, &retval);
+ if (ret) {
+ abort();
+ }
+ statedump_agent_thread.state = AGENT_THREAD_STATE_BLOCKED;
+}
+
struct side_statedump_request_handle *
side_statedump_request_notification_register(const char *state_name,
void (*statedump_cb)(void),
side_list_head_init(&handle->notification_queue);
pthread_mutex_lock(&side_statedump_lock);
+ if (mode == SIDE_STATEDUMP_MODE_AGENT_THREAD)
+ statedump_agent_thread_get();
side_list_insert_node_tail_rcu(&side_statedump_list, &handle->node);
/* Queue statedump pending for all tracers. */
queue_statedump_pending(handle, SIDE_KEY_MATCH_ALL);
pthread_mutex_unlock(&side_statedump_lock);
+ if (mode == SIDE_STATEDUMP_MODE_AGENT_THREAD) {
+ for (;;) {
+ bool is_empty;
+
+ /* TODO futex based wakeup. */
+ pthread_mutex_lock(&side_statedump_lock);
+ is_empty = side_list_empty(&handle->notification_queue);
+ pthread_mutex_unlock(&side_statedump_lock);
+ if (is_empty)
+ break;
+ sleep(1);
+ }
+ }
+
return handle;
name_nomem:
pthread_mutex_lock(&side_statedump_lock);
unqueue_statedump_pending(handle, SIDE_KEY_MATCH_ALL);
side_list_remove_node_rcu(&handle->node);
+ if (handle->mode == SIDE_STATEDUMP_MODE_AGENT_THREAD)
+ statedump_agent_thread_put();
pthread_mutex_unlock(&side_statedump_lock);
side_rcu_wait_grace_period(&statedump_rcu_gp);
return ret;
}
-static
-void side_statedump_run(struct side_statedump_request_handle *handle,
- struct side_statedump_notification *notif)
-{
- /* Invoke the state dump callback specifically for the tracer key. */
- filter_key = notif->key;
- side_statedump_event_call(side_statedump_begin,
- side_arg_list(side_arg_string(handle->name)));
- handle->cb();
- side_statedump_event_call(side_statedump_end,
- side_arg_list(side_arg_string(handle->name)));
- filter_key = SIDE_KEY_MATCH_ALL;
-}
-
-static
-void _side_statedump_run_pending_requests(struct side_statedump_request_handle *handle)
-{
- struct side_statedump_notification *notif, *tmp;
- DEFINE_SIDE_LIST_HEAD(tmp_head);
-
- pthread_mutex_lock(&side_statedump_lock);
- side_list_splice(&handle->notification_queue, &tmp_head);
- pthread_mutex_lock(&side_statedump_lock);
-
- /* We are now sole owner of the tmp_head list. */
- side_list_for_each_entry(notif, &tmp_head, node)
- side_statedump_run(handle, notif);
- side_list_for_each_entry_safe(notif, tmp, &tmp_head, node)
- free(notif);
-}
-
/*
* Only polling mode state dump handles allow application to explicitly handle the
* pending requests.