Use rseq for reserve position
[lttng-ust.git] / libringbuffer / frontend_api.h
index 6a06bba2c2393ba139bc506a8f88bb4027b59ea9..84be505548dde3ab6607173d68942aca5a63eb2e 100644 (file)
 #include "frontend.h"
 #include <urcu-bp.h>
 #include <urcu/compiler.h>
+#include "rseq.h"
+
+static inline
+int lib_ring_buffer_get_cpu(const struct lttng_ust_lib_ring_buffer_config *config)
+{
+       return lttng_ust_get_cpu();
+}
 
 /**
- * lib_ring_buffer_get_cpu - Precedes ring buffer reserve/commit.
+ * lib_ring_buffer_begin - Precedes ring buffer reserve/commit.
  *
- * Grabs RCU read-side lock and keeps a ring buffer nesting count as
- * supplementary safety net to ensure tracer client code will never
- * trigger an endless recursion. Returns the processor ID on success,
- * -EPERM on failure (nesting count too high).
+ * Keeps a ring buffer nesting count as supplementary safety net to
+ * ensure tracer client code will never trigger an endless recursion.
+ * Returns 0 on success, -EPERM on failure (nesting count too high).
  *
  * asm volatile and "memory" clobber prevent the compiler from moving
  * instructions out of the ring buffer nesting count. This is required to ensure
  * that probe side-effects which can cause recursion (e.g. unforeseen traps,
  * divisions by 0, ...) are triggered within the incremented nesting count
  * section.
  */
 static inline
-int lib_ring_buffer_get_cpu(const struct lttng_ust_lib_ring_buffer_config *config)
+int lib_ring_buffer_begin(const struct lttng_ust_lib_ring_buffer_config *config)
 {
-       int cpu, nesting;
+       int nesting;
 
-       rcu_read_lock();
-       cpu = lttng_ust_get_cpu();
-       nesting = ++lib_ring_buffer_nesting;    /* TLS */
+       nesting = ++URCU_TLS(lib_ring_buffer_nesting);
        cmm_barrier();
 
        if (caa_unlikely(nesting > 4)) {
                WARN_ON_ONCE(1);
-               lib_ring_buffer_nesting--;      /* TLS */
-               rcu_read_unlock();
+               URCU_TLS(lib_ring_buffer_nesting)--;
                return -EPERM;
-       } else
-               return cpu;
+       }
+       return 0;
 }
 
 /**
- * lib_ring_buffer_put_cpu - Follows ring buffer reserve/commit.
+ * lib_ring_buffer_end - Follows ring buffer reserve/commit.
  */
 static inline
-void lib_ring_buffer_put_cpu(const struct lttng_ust_lib_ring_buffer_config *config)
+void lib_ring_buffer_end(const struct lttng_ust_lib_ring_buffer_config *config)
 {
        cmm_barrier();
-       lib_ring_buffer_nesting--;              /* TLS */
-       rcu_read_unlock();
+       URCU_TLS(lib_ring_buffer_nesting)--;            /* TLS */
 }
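
To make the begin/end contract above concrete, here is a standalone sketch of the same nesting-guard pattern: a per-thread counter bounds re-entrancy so a probe firing from inside another probe cannot recurse without limit. tracer_begin()/tracer_end() and the bare __thread counter are illustrative stand-ins rather than the lttng-ust API; the limit of 4 and the compiler barriers mirror the code above.

#include <errno.h>

static __thread int ring_buffer_nesting;	/* per-thread, like URCU_TLS(lib_ring_buffer_nesting) */

static inline int tracer_begin(void)
{
	int nesting;

	nesting = ++ring_buffer_nesting;
	__asm__ __volatile__ ("" : : : "memory");	/* compiler barrier, as cmm_barrier() */
	if (nesting > 4) {				/* nested too deep: refuse to trace */
		ring_buffer_nesting--;
		return -EPERM;
	}
	return 0;
}

static inline void tracer_end(void)
{
	__asm__ __volatile__ ("" : : : "memory");
	ring_buffer_nesting--;
}

int main(void)
{
	if (!tracer_begin()) {
		/* ... reserve a slot, write the event payload, commit ... */
		tracer_end();
	}
	return 0;
}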
 
 /*
@@ -147,10 +150,11 @@ int lib_ring_buffer_try_reserve(const struct lttng_ust_lib_ring_buffer_config *c
  *
  * Return :
  *  0 on success.
- * -EAGAIN if channel is disabled.
+ * -EPERM if channel is disabled.
  * -ENOSPC if event size is too large for packet.
  * -ENOBUFS if there is currently not enough space in buffer for the event.
  * -EIO if data cannot be written into the buffer for any other reason.
+ * -EAGAIN if the reserve was aborted; it should be attempted again.
  */
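
A hedged sketch of what these return codes imply for a caller: with this patch a disabled channel or buffer is a hard -EPERM, while -EAGAIN only means the rseq fast path was aborted (preemption, migration or a signal) and the reservation is worth retrying. do_reserve() below is a hypothetical stand-in for the real lib_ring_buffer_reserve() call, not its actual signature.

#include <errno.h>

static int do_reserve(void)
{
	return 0;	/* stand-in: pretend the reservation succeeded */
}

static int reserve_event(void)
{
	int ret;

	do {
		ret = do_reserve();
	} while (ret == -EAGAIN);	/* aborted rseq critical section: try again */

	return ret;	/* 0 on success, or -EPERM/-ENOSPC/-ENOBUFS/-EIO */
}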
 
 static inline
@@ -162,16 +166,27 @@ int lib_ring_buffer_reserve(const struct lttng_ust_lib_ring_buffer_config *confi
        struct lttng_ust_lib_ring_buffer *buf;
        unsigned long o_begin, o_end, o_old;
        size_t before_hdr_pad = 0;
-
-       if (uatomic_read(&chan->record_disabled))
-               return -EAGAIN;
-
+       struct lttng_rseq_state rseq_state;
+
+       if (caa_likely(ctx->ctx_len
+                       >= sizeof(struct lttng_ust_lib_ring_buffer_ctx))) {
+               rseq_state = ctx->rseq_state;
+       } else {
+               rseq_state.cpu_id = -2;
+               rseq_state.event_counter = 0;
+               rseq_state.rseqp = NULL;
+       }
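
The ctx_len test above is a plain struct-extension compatibility check: the ring buffer context structure grew an rseq_state field, so callers built against the older layout are detected by comparing the length they recorded at initialization against the current structure size and are routed to the fallback path (cpu_id = -2 meaning "no usable rseq state"). A simplified sketch of the pattern, with hypothetical ctx_v1/ctx_v2 types rather than the real definitions:

#include <stddef.h>
#include <stdint.h>

struct ctx_v1 {
	size_t ctx_len;		/* caller sets this to sizeof() at init time */
	/* ... original fields ... */
};

struct ctx_v2 {
	size_t ctx_len;
	/* ... original fields ... */
	int32_t rseq_cpu_id;	/* new field, valid only if ctx_len covers it */
};

static int32_t ctx_rseq_cpu(const struct ctx_v2 *ctx)
{
	if (ctx->ctx_len >= sizeof(struct ctx_v2))
		return ctx->rseq_cpu_id;	/* caller knows the new layout */
	return -2;				/* old caller: take the fallback path */
}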
+
+       if (caa_unlikely(uatomic_read(&chan->record_disabled)))
+               return -EPERM;
        if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
                buf = shmp(handle, chan->backend.buf[ctx->cpu].shmp);
        else
                buf = shmp(handle, chan->backend.buf[0].shmp);
-       if (uatomic_read(&buf->record_disabled))
-               return -EAGAIN;
+       if (caa_unlikely(!buf))
+               return -EIO;
+       if (caa_unlikely(uatomic_read(&buf->record_disabled)))
+               return -EPERM;
        ctx->buf = buf;
 
        /*
@@ -181,10 +196,26 @@ int lib_ring_buffer_reserve(const struct lttng_ust_lib_ring_buffer_config *confi
                                                 &o_end, &o_old, &before_hdr_pad)))
                goto slow_path;
 
-       if (caa_unlikely(v_cmpxchg(config, &ctx->buf->offset, o_old, o_end)
-                    != o_old))
-               goto slow_path;
-
+       if (caa_unlikely(config->sync == RING_BUFFER_SYNC_GLOBAL
+                       || rseq_state.cpu_id < 0
+                       || uatomic_read(&chan->u.reserve_fallback_ref))) {
+               if (caa_unlikely(v_cmpxchg(config, &ctx->buf->offset, o_old,
+                               o_end) != o_old))
+                       goto slow_path;
+       } else {
+               /*
+                * Load reserve_fallback_ref before offset. Matches the
+                * implicit memory barrier after v_cmpxchg of offset.
+                */
+               cmm_smp_rmb();
+               if (caa_unlikely(ctx->buf->offset.a != o_old))
+                       return -EAGAIN;
+               if (caa_unlikely(!__rseq_finish(NULL, 0, NULL, NULL, 0,
+                               (intptr_t *) &ctx->buf->offset.a,
+                               (intptr_t) o_end,
+                               rseq_state, RSEQ_FINISH_SINGLE, false)))
+                       return -EAGAIN;
+       }
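
For readers new to rseq, the fast path above has roughly the effect of the single-word compare-and-swap below: advance the reserve offset from o_old to o_end only if no concurrent writer moved it first. The rseq variant avoids the locked instruction because the kernel aborts the critical section on preemption, migration or signal delivery, which surfaces here as -EAGAIN. This is a C11-atomics sketch under that assumption, not the v_cmpxchg()/__rseq_finish() code itself.

#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

static bool try_advance_offset(_Atomic uintptr_t *offset,
			       uintptr_t o_old, uintptr_t o_end)
{
	uintptr_t expected = o_old;

	/* Succeeds only if *offset is still o_old, then publishes o_end. */
	return atomic_compare_exchange_strong(offset, &expected, o_end);
}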
        /*
         * Atomically update last_tsc. This update races against concurrent
         * atomic updates, but the race will always cause supplementary full TSC
@@ -253,11 +284,26 @@ void lib_ring_buffer_commit(const struct lttng_ust_lib_ring_buffer_config *confi
        unsigned long offset_end = ctx->buf_offset;
        unsigned long endidx = subbuf_index(offset_end - 1, chan);
        unsigned long commit_count;
+       struct commit_counters_hot *cc_hot = shmp_index(handle,
+                                               buf->commit_hot, endidx);
+       struct lttng_rseq_state rseq_state;
+
+       if (caa_likely(ctx->ctx_len
+                       >= sizeof(struct lttng_ust_lib_ring_buffer_ctx))) {
+               rseq_state = ctx->rseq_state;
+       } else {
+               rseq_state.cpu_id = -2;
+               rseq_state.event_counter = 0;
+               rseq_state.rseqp = NULL;
+       }
+
+       if (caa_unlikely(!cc_hot))
+               return;
 
        /*
         * Must count record before incrementing the commit count.
         */
-       subbuffer_count_record(config, &buf->backend, endidx, handle);
+       subbuffer_count_record(config, ctx, &buf->backend, endidx, handle);
 
        /*
         * Order all writes to buffer before the commit count update that will
@@ -265,7 +311,19 @@ void lib_ring_buffer_commit(const struct lttng_ust_lib_ring_buffer_config *confi
         */
        cmm_smp_wmb();
 
-       v_add(config, ctx->slot_size, &shmp_index(handle, buf->commit_hot, endidx)->cc);
+       if (caa_likely(config->sync == RING_BUFFER_SYNC_PER_CPU
+                       && rseq_state.cpu_id >= 0)) {
+               unsigned long newv;
+
+               newv = cc_hot->cc_rseq + ctx->slot_size;
+               if (caa_likely(__rseq_finish(NULL, 0, NULL, NULL, 0,
+                               (intptr_t *)&cc_hot->cc_rseq,
+                               (intptr_t) newv,
+                               rseq_state, RSEQ_FINISH_SINGLE, false)))
+                       goto add_done;
+       }
+       v_add(config, ctx->slot_size, &cc_hot->cc);
+add_done:
 
        /*
         * commit count read can race with concurrent OOO commit count updates.
@@ -285,17 +343,17 @@ void lib_ring_buffer_commit(const struct lttng_ust_lib_ring_buffer_config *confi
         *   count reaches back the reserve offset for a specific sub-buffer,
         *   which is completely independent of the order.
         */
-       commit_count = v_read(config, &shmp_index(handle, buf->commit_hot, endidx)->cc);
+       commit_count = v_read(config, &cc_hot->cc);
+       commit_count += cc_hot->cc_rseq;
 
        lib_ring_buffer_check_deliver(config, buf, chan, offset_end - 1,
-                                     commit_count, endidx, handle);
+                                     commit_count, endidx, handle, ctx->tsc);
        /*
         * Update used size at each commit. It's needed only for extracting
         * ring_buffer buffers from vmcore, after crash.
         */
-       lib_ring_buffer_write_commit_counter(config, buf, chan, endidx,
-                                            ctx->buf_offset, commit_count,
-                                            ctx->slot_size, handle);
+       lib_ring_buffer_write_commit_counter(config, buf, chan,
+                       offset_end, commit_count, handle, cc_hot);
 }
 
 /**