Use rseq for reserve position
diff --git a/libringbuffer/frontend_api.h b/libringbuffer/frontend_api.h
index 19fc09e9ae5f29d779a78c737e08e8530c795ae4..84be505548dde3ab6607173d68942aca5a63eb2e 100644
--- a/libringbuffer/frontend_api.h
+++ b/libringbuffer/frontend_api.h
@@ -33,6 +33,7 @@
 #include "frontend.h"
 #include <urcu-bp.h>
 #include <urcu/compiler.h>
+#include "rseq.h"
 
 static inline
 int lib_ring_buffer_get_cpu(const struct lttng_ust_lib_ring_buffer_config *config)
@@ -149,10 +150,11 @@ int lib_ring_buffer_try_reserve(const struct lttng_ust_lib_ring_buffer_config *c
  *
  * Return :
  *  0 on success.
- * -EAGAIN if channel is disabled.
+ * -EPERM if channel is disabled.
  * -ENOSPC if event size is too large for packet.
  * -ENOBUFS if there is currently not enough space in buffer for the event.
  * -EIO if data cannot be written into the buffer for any other reason.
+ * -EAGAIN if the reserve was aborted and should be attempted again.
  */
 
 static inline
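With the hunk above, -EAGAIN no longer means "channel disabled" (that is now -EPERM) but "the rseq critical section was aborted, retry the reservation". A minimal caller sketch under that contract; example_reserve() and example_record_event() are stand-ins invented for illustration, not lttng-ust functions:

#include <errno.h>
#include <stdbool.h>

/* Stand-in for lib_ring_buffer_reserve(); only the return codes matter. */
static int example_reserve(void *ctx)
{
	(void)ctx;
	return 0;	/* pretend the reservation always succeeds */
}

/*
 * -EPERM: tracing is disabled, drop the event, no point retrying.
 * -EAGAIN: the rseq sequence aborted (preemption, signal, migration),
 *          so the reservation is simply attempted again.
 * Other negative values (-ENOSPC, -ENOBUFS, -EIO) are hard failures.
 */
static bool example_record_event(void *ctx)
{
	for (;;) {
		int ret = example_reserve(ctx);

		if (!ret)
			return true;	/* slot reserved, write the payload */
		if (ret == -EAGAIN)
			continue;	/* transient abort: retry */
		return false;		/* disabled or hard error: give up */
	}
}

int main(void)
{
	return example_record_event((void *)0) ? 0 : 1;
}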
@@ -164,10 +166,19 @@ int lib_ring_buffer_reserve(const struct lttng_ust_lib_ring_buffer_config *confi
        struct lttng_ust_lib_ring_buffer *buf;
        unsigned long o_begin, o_end, o_old;
        size_t before_hdr_pad = 0;
+       struct lttng_rseq_state rseq_state;
+
+       if (caa_likely(ctx->ctx_len
+                       >= sizeof(struct lttng_ust_lib_ring_buffer_ctx))) {
+               rseq_state = ctx->rseq_state;
+       } else {
+               rseq_state.cpu_id = -2;
+               rseq_state.event_counter = 0;
+               rseq_state.rseqp = NULL;
+       }
 
        if (caa_unlikely(uatomic_read(&chan->record_disabled)))
-               return -EAGAIN;
-
+               return -EPERM;
        if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
                buf = shmp(handle, chan->backend.buf[ctx->cpu].shmp);
        else
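The ctx_len test added at the top of lib_ring_buffer_reserve() is a struct-extension compatibility check: a probe built against older headers passes a shorter context, so the new rseq_state member must not be read, and the sentinel cpu_id = -2 steers execution onto the cmpxchg fallback. A sketch of the idiom with invented stand-in types; the real definitions live in the lttng-ust headers and carry more fields:

#include <stddef.h>
#include <stdint.h>
#include <string.h>

struct example_rseq_state {
	int32_t cpu_id;			/* < 0: rseq not usable, take fallback */
	uint32_t event_counter;		/* used to detect aborted sequences */
	void *rseqp;			/* per-thread rseq area, may be NULL */
};

struct example_ctx {			/* "new" layout: member appended at end */
	size_t ctx_len;			/* caller stores sizeof() of its layout */
	int cpu;
	struct example_rseq_state rseq_state;
};

static struct example_rseq_state
example_get_rseq_state(const struct example_ctx *ctx)
{
	struct example_rseq_state state;

	/* Only touch the new member if the caller's struct is large enough. */
	if (ctx->ctx_len >= sizeof(struct example_ctx))
		return ctx->rseq_state;
	state.cpu_id = -2;		/* sentinel: force the cmpxchg path */
	state.event_counter = 0;
	state.rseqp = NULL;
	return state;
}

int main(void)
{
	struct example_ctx ctx;

	memset(&ctx, 0, sizeof(ctx));
	ctx.ctx_len = offsetof(struct example_ctx, rseq_state); /* old caller */
	return example_get_rseq_state(&ctx).cpu_id == -2 ? 0 : 1;
}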
@@ -175,7 +186,7 @@ int lib_ring_buffer_reserve(const struct lttng_ust_lib_ring_buffer_config *confi
        if (caa_unlikely(!buf))
                return -EIO;
        if (caa_unlikely(uatomic_read(&buf->record_disabled)))
-               return -EAGAIN;
+               return -EPERM;
        ctx->buf = buf;
 
        /*
@@ -185,10 +196,26 @@ int lib_ring_buffer_reserve(const struct lttng_ust_lib_ring_buffer_config *confi
                                                 &o_end, &o_old, &before_hdr_pad)))
                goto slow_path;
 
-       if (caa_unlikely(v_cmpxchg(config, &ctx->buf->offset, o_old, o_end)
-                    != o_old))
-               goto slow_path;
-
+       if (caa_unlikely(config->sync == RING_BUFFER_SYNC_GLOBAL
+                       || rseq_state.cpu_id < 0
+                       || uatomic_read(&chan->u.reserve_fallback_ref))) {
+               if (caa_unlikely(v_cmpxchg(config, &ctx->buf->offset, o_old,
+                               o_end) != o_old))
+                       goto slow_path;
+       } else {
+               /*
+                * Load reserve_fallback_ref before offset. Matches the
+                * implicit memory barrier after v_cmpxchg of offset.
+                */
+               cmm_smp_rmb();
+               if (caa_unlikely(ctx->buf->offset.a != o_old))
+                       return -EAGAIN;
+               if (caa_unlikely(!__rseq_finish(NULL, 0, NULL, NULL, 0,
+                               (intptr_t *) &ctx->buf->offset.a,
+                               (intptr_t) o_end,
+                               rseq_state, RSEQ_FINISH_SINGLE, false)))
+                       return -EAGAIN;
+       }
        /*
         * Atomically update last_tsc. This update races against concurrent
         * atomic updates, but the race will always cause supplementary full TSC
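The new reserve fast path above replaces the cmpxchg on buf->offset with an rseq commit: after try-reserve computes o_old and o_end, the offset is re-checked and then published with a single store that __rseq_finish() performs inside an abortable critical section, while global-sync buffers, threads without a usable rseq state, and the reserve_fallback_ref case keep the cmpxchg. The sketch below shows only the shape of that decision; C11 atomics stand in for the rseq assembly and do not give the same atomicity between check and store, so this is an illustration, not the real mechanism:

#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

static bool example_publish_offset(_Atomic uintptr_t *offset,
				   uintptr_t o_old, uintptr_t o_end,
				   bool rseq_usable)
{
	if (!rseq_usable) {
		/* Fallback path: classic compare-and-swap on the offset. */
		uintptr_t expected = o_old;

		return atomic_compare_exchange_strong(offset, &expected, o_end);
	}
	/*
	 * Fast path: re-check the offset, then commit with one store.
	 * With real rseq the check and the store sit inside one critical
	 * section that aborts on preemption or migration, and the caller
	 * retries after receiving -EAGAIN.
	 */
	if (atomic_load_explicit(offset, memory_order_relaxed) != o_old)
		return false;	/* would translate to -EAGAIN */
	atomic_store_explicit(offset, o_end, memory_order_relaxed);
	return true;
}

int main(void)
{
	_Atomic uintptr_t offset = 128;

	if (!example_publish_offset(&offset, 128, 192, true))
		return 1;
	printf("offset now %lu\n", (unsigned long)atomic_load(&offset));
	return 0;
}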
@@ -259,6 +286,16 @@ void lib_ring_buffer_commit(const struct lttng_ust_lib_ring_buffer_config *confi
        unsigned long commit_count;
        struct commit_counters_hot *cc_hot = shmp_index(handle,
                                                buf->commit_hot, endidx);
+       struct lttng_rseq_state rseq_state;
+
+       if (caa_likely(ctx->ctx_len
+                       >= sizeof(struct lttng_ust_lib_ring_buffer_ctx))) {
+               rseq_state = ctx->rseq_state;
+       } else {
+               rseq_state.cpu_id = -2;
+               rseq_state.event_counter = 0;
+               rseq_state.rseqp = NULL;
+       }
 
        if (caa_unlikely(!cc_hot))
                return;
@@ -274,7 +311,19 @@ void lib_ring_buffer_commit(const struct lttng_ust_lib_ring_buffer_config *confi
         */
        cmm_smp_wmb();
 
+       if (caa_likely(config->sync == RING_BUFFER_SYNC_PER_CPU
+                       && rseq_state.cpu_id >= 0)) {
+               unsigned long newv;
+
+               newv = cc_hot->cc_rseq + ctx->slot_size;
+               if (caa_likely(__rseq_finish(NULL, 0, NULL, NULL, 0,
+                               (intptr_t *)&cc_hot->cc_rseq,
+                               (intptr_t) newv,
+                               rseq_state, RSEQ_FINISH_SINGLE, false)))
+                       goto add_done;
+       }
        v_add(config, ctx->slot_size, &cc_hot->cc);
+add_done:
 
        /*
         * commit count read can race with concurrent OOO commit count updates.
@@ -295,6 +344,7 @@ void lib_ring_buffer_commit(const struct lttng_ust_lib_ring_buffer_config *confi
         *   which is completely independent of the order.
         */
        commit_count = v_read(config, &cc_hot->cc);
+       commit_count += cc_hot->cc_rseq;
 
        lib_ring_buffer_check_deliver(config, buf, chan, offset_end - 1,
                                      commit_count, endidx, handle, ctx->tsc);
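On the commit side the hot counter is effectively split in two: the rseq fast path adds the slot size to cc_hot->cc_rseq with a single-store commit, the fallback keeps the atomic v_add on cc_hot->cc, and the delivery check therefore sums both. A small sketch of that accounting, with invented names standing in for the lttng-ust types:

#include <stdint.h>
#include <stdio.h>

/* Stand-in for struct commit_counters_hot after this patch. */
struct example_commit_counters_hot {
	uint64_t cc;		/* atomic-add fallback path (v_add) */
	uint64_t cc_rseq;	/* rseq single-store fast path */
};

/* Mirrors: commit_count = v_read(&cc_hot->cc) + cc_hot->cc_rseq; */
static uint64_t example_commit_count(const struct example_commit_counters_hot *hot)
{
	return hot->cc + hot->cc_rseq;
}

int main(void)
{
	/* Two 64-byte slots went through rseq, one through the fallback. */
	struct example_commit_counters_hot hot = { .cc = 64, .cc_rseq = 128 };

	printf("logical commit count: %llu\n",
	       (unsigned long long)example_commit_count(&hot));
	return 0;
}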