Implement RCU wait/wakeup scheme based on futex
author Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Mon, 31 Oct 2022 16:16:45 +0000 (12:16 -0400)
committer Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Mon, 31 Oct 2022 16:18:02 +0000 (12:18 -0400)
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
src/rcu.c
src/rcu.h
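
The hunks below replace the 10 ms polling loop in the grace period wait with a futex-based wait/wakeup scheme: the single grace-period waiter publishes -1 in gp_state->futex before re-checking the reader counters, and any reader that finishes a read-side critical section while the futex is -1 resets it to 0 and issues FUTEX_WAKE. What follows is a minimal, self-contained sketch of that protocol, independent of the RCU code in the patch; the names (demo_futex, demo_sys_futex, demo_wake, demo_wait, condition_met) are illustrative only, and every ordering is simplified to __ATOMIC_SEQ_CST instead of the membarrier()-based pairing used in the patch itself.

#include <errno.h>
#include <linux/futex.h>
#include <stdint.h>
#include <stdlib.h>
#include <sys/syscall.h>
#include <unistd.h>

static int32_t demo_futex;

static long demo_sys_futex(int32_t *uaddr, int op, int32_t val)
{
        return syscall(__NR_futex, uaddr, op, val, NULL, NULL, 0);
}

/*
 * Waker side: called by each thread after it updates (with a SEQ_CST
 * store) the shared state that the waiter is checking.
 */
static void demo_wake(void)
{
        /* Only issue the syscall if the waiter announced itself with -1. */
        if (__atomic_load_n(&demo_futex, __ATOMIC_SEQ_CST) == -1) {
                __atomic_store_n(&demo_futex, 0, __ATOMIC_SEQ_CST);
                (void) demo_sys_futex(&demo_futex, FUTEX_WAKE, 1);
        }
}

/*
 * Single waiter: block until condition_met() (which loads the shared
 * state with SEQ_CST) observes the condition.
 */
static void demo_wait(int (*condition_met)(void))
{
        for (;;) {
                /* Publish intent to sleep before re-checking the condition. */
                __atomic_store_n(&demo_futex, -1, __ATOMIC_SEQ_CST);
                if (condition_met()) {
                        __atomic_store_n(&demo_futex, 0, __ATOMIC_SEQ_CST);
                        return;
                }
                /* A waker may already have cleared the flag: skip the syscall. */
                if (__atomic_load_n(&demo_futex, __ATOMIC_SEQ_CST) != -1)
                        continue;
                if (!demo_sys_futex(&demo_futex, FUTEX_WAIT, -1))
                        continue;
                switch (errno) {
                case EWOULDBLOCK:       /* Futex value changed before we slept. */
                case EINTR:             /* Interrupted by a signal; retry. */
                        continue;
                default:
                        abort();
                }
        }
}

In the patch itself, the heavyweight ordering on the waiter's slow path is provided by membarrier(MEMBARRIER_CMD_PRIVATE_EXPEDITED) when available (falling back to a SEQ_CST fence), so the reader fast path can keep relying on rseq_barrier(), a compiler barrier, rather than a full memory barrier.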

index 3c6dafbb35b4f6f6d1b51dd56648111909a57381..2e8be1c1b238a17ea337104ca4b90d4defb93b2b 100644
--- a/src/rcu.c
+++ b/src/rcu.c
@@ -30,6 +30,83 @@ membarrier(int cmd, unsigned int flags, int cpu_id)
        return syscall(__NR_membarrier, cmd, flags, cpu_id);
 }
 
+/*
+ * Wait/wakeup scheme with single waiter/many wakers.
+ */
+static
+void wait_gp_prepare(struct side_rcu_gp_state *gp_state)
+{
+       __atomic_store_n(&gp_state->futex, -1, __ATOMIC_RELAXED);
+       /*
+        * This memory barrier (H) pairs with memory barrier (F). It
+        * orders store to futex before load of RCU reader's counter
+        * state, thus ensuring that load of RCU reader's counters does
+        * not leak outside of futex state=-1.
+        */
+       if (side_rcu_rseq_membarrier_available) {
+               if (membarrier(MEMBARRIER_CMD_PRIVATE_EXPEDITED, 0, 0)) {
+                       perror("membarrier");
+                       abort();
+               }
+       } else {
+               __atomic_thread_fence(__ATOMIC_SEQ_CST);
+       }
+}
+
+static
+void wait_gp_end(struct side_rcu_gp_state *gp_state)
+{
+       /*
+        * This memory barrier (G) pairs with memory barrier (F). It
+        * orders load of RCU reader's counter state before storing the
+        * futex value, thus ensuring that load of RCU reader's counters
+        * does not leak outside of futex state=-1.
+        */
+       if (side_rcu_rseq_membarrier_available) {
+               if (membarrier(MEMBARRIER_CMD_PRIVATE_EXPEDITED, 0, 0)) {
+                       perror("membarrier");
+                       abort();
+               }
+       } else {
+               __atomic_thread_fence(__ATOMIC_SEQ_CST);
+       }
+       __atomic_store_n(&gp_state->futex, 0, __ATOMIC_RELAXED);
+}
+
+static
+void wait_gp(struct side_rcu_gp_state *gp_state)
+{
+       /*
+        * This memory barrier (G) pairs with memory barrier (F). It
+        * orders load of RCU reader's counter state before loading the
+        * futex value.
+        */
+       if (side_rcu_rseq_membarrier_available) {
+               if (membarrier(MEMBARRIER_CMD_PRIVATE_EXPEDITED, 0, 0)) {
+                       perror("membarrier");
+                       abort();
+               }
+       } else {
+               __atomic_thread_fence(__ATOMIC_SEQ_CST);
+       }
+       if (__atomic_load_n(&gp_state->futex, __ATOMIC_RELAXED) != -1)
+               return;
+       while (futex(&gp_state->futex, FUTEX_WAIT, -1, NULL, NULL, 0)) {
+               switch (errno) {
+               case EWOULDBLOCK:
+                       /* Value already changed. */
+                       return;
+               case EINTR:
+                       /* Retry if interrupted by signal. */
+                       break;  /* Get out of switch. */
+               default:
+                       /* Unexpected error. */
+                       abort();
+               }
+       }
+       return;
+}
+
 /* active_readers is an input/output parameter. */
 static
 void check_active_readers(struct side_rcu_gp_state *gp_state, bool *active_readers)
@@ -109,11 +186,13 @@ void wait_for_prev_period_readers(struct side_rcu_gp_state *gp_state, bool *acti
         * previous period.
         */
        for (;;) {
+               wait_gp_prepare(gp_state);
                check_active_readers(gp_state, active_readers);
-               if (!active_readers[prev_period])
+               if (!active_readers[prev_period]) {
+                       wait_gp_end(gp_state);
                        break;
-               /* Retry after 10ms. */
-               poll(NULL, 0, 10);
+               }
+               wait_gp(gp_state);
        }
 }
 
index 6e61ea348ea7a1d31a5f416c4d169558a1a9298f..6d4e4ecf0eb874d2c004a45d92841e737451028a 100644
--- a/src/rcu.h
+++ b/src/rcu.h
 #include <poll.h>
 #include <side/trace.h>
 #include <rseq/rseq.h>
+#include <linux/futex.h>
+#include <sys/time.h>
+#include <unistd.h>
+#include <sys/syscall.h>
 
 #define SIDE_CACHE_LINE_SIZE           256
 
@@ -30,13 +34,34 @@ struct side_rcu_cpu_gp_state {
 struct side_rcu_gp_state {
        struct side_rcu_cpu_gp_state *percpu_state;
        int nr_cpus;
+       int32_t futex;
        unsigned int period;
        pthread_mutex_t gp_lock;
 };
 
 extern unsigned int side_rcu_rseq_membarrier_available __attribute__((visibility("hidden")));
 
-//TODO: implement wait/wakeup for grace period using sys_futex
+static inline
+int futex(int32_t *uaddr, int op, int32_t val,
+       const struct timespec *timeout, int32_t *uaddr2, int32_t val3)
+{
+       return syscall(__NR_futex, uaddr, op, val, timeout, uaddr2, val3);
+}
+
+/*
+ * Wake up side_rcu_wait_grace_period(). Called concurrently from many
+ * threads.
+ */
+static inline
+void side_rcu_wake_up_gp(struct side_rcu_gp_state *gp_state)
+{
+       if (side_unlikely(__atomic_load_n(&gp_state->futex, __ATOMIC_RELAXED) == -1)) {
+               __atomic_store_n(&gp_state->futex, 0, __ATOMIC_RELAXED);
+               /* TODO: handle futex return values. */
+               (void) futex(&gp_state->futex, FUTEX_WAKE, 1, NULL, NULL, 0);
+       }
+}
+
 static inline
 unsigned int side_rcu_read_begin(struct side_rcu_gp_state *gp_state)
 {
@@ -97,8 +122,15 @@ void side_rcu_read_end(struct side_rcu_gp_state *gp_state, unsigned int period)
                rseq_barrier();
                cpu = rseq_cpu_start();
                cpu_gp_state = &gp_state->percpu_state[cpu];
-               if (side_likely(!rseq_addv((intptr_t *)&cpu_gp_state->count[period].rseq_end, 1, cpu)))
-                       return;
+               if (side_likely(!rseq_addv((intptr_t *)&cpu_gp_state->count[period].rseq_end, 1, cpu))) {
+                       /*
+                        * This barrier (F) is paired with membarrier()
+                        * at (G). It orders increment of the begin/end
+                        * counters before load/store to the futex.
+                        */
+                       rseq_barrier();
+                       goto end;
+               }
        }
        /* Fallback to atomic increment and SEQ_CST. */
        cpu = sched_getcpu();
@@ -106,6 +138,14 @@ void side_rcu_read_end(struct side_rcu_gp_state *gp_state, unsigned int period)
                cpu = 0;
        cpu_gp_state = &gp_state->percpu_state[cpu];
        (void) __atomic_add_fetch(&cpu_gp_state->count[period].end, 1, __ATOMIC_SEQ_CST);
+       /*
+        * This barrier (F) is paired with SEQ_CST barrier or
+        * membarrier() at (G). It orders increment of the begin/end
+        * counters before load/store to the futex.
+        */
+       __atomic_thread_fence(__ATOMIC_SEQ_CST);
+end:
+       side_rcu_wake_up_gp(gp_state);
 }
 
 #define side_rcu_dereference(p) \
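
For orientation, here is a hedged usage sketch of the API this header exposes, showing the interplay the futex now mediates: side_rcu_read_end() calls side_rcu_wake_up_gp(), which wakes an updater sleeping in wait_gp() instead of letting it poll every 10 ms. side_rcu_read_begin(), side_rcu_read_end() and side_rcu_dereference() appear in this patch; side_rcu_wait_grace_period() is only named in the wake-up comment above, so its exact signature, the "rcu.h" include path, and the config structure are assumptions made for illustration.

/* Illustrative only; not part of this commit. */
#include <stdlib.h>
#include "rcu.h"

struct config {
        int value;
};

static struct config *current_config;          /* RCU-protected pointer. */
static struct side_rcu_gp_state rcu_gp;        /* Assumed initialized elsewhere. */

/*
 * Reader: the begin/end pair brackets the read-side critical section.
 * side_rcu_read_end() now wakes a waiter sleeping on rcu_gp.futex.
 */
static int read_config_value(void)
{
        unsigned int period;
        struct config *cfg;
        int v;

        period = side_rcu_read_begin(&rcu_gp);
        cfg = side_rcu_dereference(current_config);
        v = cfg ? cfg->value : -1;
        side_rcu_read_end(&rcu_gp, period);
        return v;
}

/*
 * Updater: publish a new version, wait for a grace period, then reclaim.
 * The grace period wait now blocks in wait_gp() on rcu_gp.futex rather
 * than polling every 10 ms.
 */
static void update_config_value(int value)
{
        struct config *new_cfg, *old_cfg;

        new_cfg = malloc(sizeof(*new_cfg));
        if (!new_cfg)
                abort();
        new_cfg->value = value;
        old_cfg = __atomic_exchange_n(&current_config, new_cfg, __ATOMIC_SEQ_CST);
        side_rcu_wait_grace_period(&rcu_gp);   /* Signature assumed. */
        free(old_cfg);
}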