Use rseq for reserve position

Author:     Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
AuthorDate: Mon, 21 Nov 2016 21:15:59 +0000 (16:15 -0500)
Commit:     Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
CommitDate: Tue, 22 Nov 2016 00:11:35 +0000 (19:11 -0500)
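
Reserve the ring-buffer write position with restartable sequences (rseq)
when the client is configured for per-CPU synchronization, instead of a
global cmpxchg on the buffer offset. When the rseq fast path cannot
complete (the offset moved underneath us or the rseq critical section was
aborted), lib_ring_buffer_reserve() returns -EAGAIN; the client then takes
a reference on chan->u.reserve_fallback_ref (carved out of the existing
channel padding), queries the current CPU with lib_ring_buffer_get_cpu(),
and retries on the atomic cmpxchg path. Fast-path writers that observe a
non-zero reserve_fallback_ref also take the cmpxchg path. A disabled
channel or buffer now reports -EPERM, freeing -EAGAIN to mean "reserve
aborted, try again", and the RING_BUFFER_SYNC_PER_CPU asserts in
vatomic.h are dropped since the fallback path now uses the v_*() atomics
on per-CPU-sync buffers.

The sketch below illustrates the resulting reserve/fallback shape. It is
a simplified stand-alone example, not code from this tree:
sketch_reserve(), try_rseq_reserve() and cmpxchg_reserve() are placeholder
names standing in for the logic spread across lttng_event_reserve(),
lib_ring_buffer_reserve() and lib_ring_buffer_reserve_slow().

#include <errno.h>
#include <stdatomic.h>
#include <stddef.h>

struct sketch_channel {
	atomic_ulong reserve_fallback_ref;	/* mirrors chan->u.reserve_fallback_ref */
	atomic_long offset;			/* mirrors buf->offset */
};

/* Placeholder for the rseq-based per-CPU update; pretend it aborted. */
static int try_rseq_reserve(struct sketch_channel *chan, size_t size, long *slot)
{
	(void) chan; (void) size; (void) slot;
	return -EAGAIN;
}

/* Placeholder for the classic cmpxchg loop on the buffer offset. */
static int cmpxchg_reserve(struct sketch_channel *chan, size_t size, long *slot)
{
	long old = atomic_load(&chan->offset);

	/* On failure, old is reloaded with the current offset; retry. */
	while (!atomic_compare_exchange_weak(&chan->offset, &old, old + (long) size))
		;
	*slot = old;
	return 0;
}

int sketch_reserve(struct sketch_channel *chan, size_t size, long *slot)
{
	int ret;

	/* Fast path: per-CPU offset update inside an rseq critical section. */
	ret = try_rseq_reserve(chan, size, slot);
	if (ret != -EAGAIN)
		return ret;	/* success or a hard error (-EPERM, -ENOBUFS, ...) */

	/*
	 * Fallback: advertise that a writer is on the atomic path so that
	 * concurrent fast-path writers also switch to cmpxchg, then retry.
	 */
	atomic_fetch_add(&chan->reserve_fallback_ref, 1);
	ret = cmpxchg_reserve(chan, size, slot);
	atomic_fetch_sub(&chan->reserve_fallback_ref, 1);
	return ret;
}
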
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
liblttng-ust/lttng-ring-buffer-client.h
libringbuffer/frontend_api.h
libringbuffer/frontend_types.h
libringbuffer/ring_buffer_frontend.c
libringbuffer/vatomic.h

liblttng-ust/lttng-ring-buffer-client.h
index 5fde855cea6947bc1f24f0a2d75388d777930298..f014f4e46158e9706873e9ae5bc7a74d67a72303 100644
@@ -637,7 +637,7 @@ static const struct lttng_ust_lib_ring_buffer_config client_config = {
 
        .tsc_bits = LTTNG_COMPACT_TSC_BITS,
        .alloc = RING_BUFFER_ALLOC_PER_CPU,
-       .sync = RING_BUFFER_SYNC_GLOBAL,
+       .sync = RING_BUFFER_SYNC_PER_CPU,
        .mode = RING_BUFFER_MODE_TEMPLATE,
        .backend = RING_BUFFER_PAGE,
        .output = RING_BUFFER_MMAP,
@@ -696,7 +696,7 @@ int lttng_event_reserve(struct lttng_ust_lib_ring_buffer_ctx *ctx,
 {
        struct lttng_channel *lttng_chan = channel_get_private(ctx->chan);
        struct lttng_rseq_state rseq_state;
-       int ret, cpu;
+       int ret, cpu, fallback = 0;
 
        if (lib_ring_buffer_begin(&client_config))
                return -EPERM;
@@ -716,6 +716,7 @@ retry:
        } else {
                cpu = rseq_cpu_at_start(rseq_state);
        }
+fallback:
        ctx->cpu = cpu;
 
        switch (lttng_chan->header_type) {
@@ -731,9 +732,25 @@ retry:
                WARN_ON_ONCE(1);
        }
 
+       if (caa_likely(ctx->ctx_len
+                       >= sizeof(struct lttng_ust_lib_ring_buffer_ctx)))
+               ctx->rseq_state = rseq_state;
+
        ret = lib_ring_buffer_reserve(&client_config, ctx);
-       if (caa_unlikely(ret))
-               goto put;
+       if (caa_unlikely(ret)) {
+               if (ret == -EAGAIN) {
+                       assert(!fallback);
+                       fallback = 1;
+                       uatomic_inc(&lttng_chan->chan->u.reserve_fallback_ref);
+                       cpu = lib_ring_buffer_get_cpu(&client_config);
+                       if (caa_unlikely(cpu < 0)) {
+                               ret = -EPERM;
+                               goto end;
+                       }
+                       goto fallback;
+               }
+               goto end;
+       }
        if (caa_likely(ctx->ctx_len
                        >= sizeof(struct lttng_ust_lib_ring_buffer_ctx))) {
                if (lib_ring_buffer_backend_get_pages(&client_config, ctx,
@@ -744,13 +761,14 @@ retry:
        }
        lttng_write_event_header(&client_config, ctx, event_id);
 
-       if (caa_likely(ctx->ctx_len
-                       >= sizeof(struct lttng_ust_lib_ring_buffer_ctx)))
-               ctx->rseq_state = rseq_state;
+       if (caa_unlikely(fallback))
+               uatomic_dec(&lttng_chan->chan->u.reserve_fallback_ref);
 
        return 0;
 end:
        lib_ring_buffer_end(&client_config);
+       if (fallback)
+               uatomic_dec(&lttng_chan->chan->u.reserve_fallback_ref);
        return ret;
 }
 
libringbuffer/frontend_api.h
index 6ff98da0d1ed22a60800cb2dc7008e4f49e46ff0..84be505548dde3ab6607173d68942aca5a63eb2e 100644
@@ -150,10 +150,11 @@ int lib_ring_buffer_try_reserve(const struct lttng_ust_lib_ring_buffer_config *c
  *
  * Return :
  *  0 on success.
- * -EAGAIN if channel is disabled.
+ * -EPERM if channel is disabled.
  * -ENOSPC if event size is too large for packet.
  * -ENOBUFS if there is currently not enough space in buffer for the event.
  * -EIO if data cannot be written into the buffer for any other reason.
+ * -EAGAIN if the reserve was aborted; it should be attempted again.
  */
 
 static inline
@@ -165,10 +166,19 @@ int lib_ring_buffer_reserve(const struct lttng_ust_lib_ring_buffer_config *confi
        struct lttng_ust_lib_ring_buffer *buf;
        unsigned long o_begin, o_end, o_old;
        size_t before_hdr_pad = 0;
+       struct lttng_rseq_state rseq_state;
 
-       if (caa_unlikely(uatomic_read(&chan->record_disabled)))
-               return -EAGAIN;
+       if (caa_likely(ctx->ctx_len
+                       >= sizeof(struct lttng_ust_lib_ring_buffer_ctx))) {
+               rseq_state = ctx->rseq_state;
+       } else {
+               rseq_state.cpu_id = -2;
+               rseq_state.event_counter = 0;
+               rseq_state.rseqp = NULL;
+       }
 
+       if (caa_unlikely(uatomic_read(&chan->record_disabled)))
+               return -EPERM;
        if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
                buf = shmp(handle, chan->backend.buf[ctx->cpu].shmp);
        else
@@ -176,7 +186,7 @@ int lib_ring_buffer_reserve(const struct lttng_ust_lib_ring_buffer_config *confi
        if (caa_unlikely(!buf))
                return -EIO;
        if (caa_unlikely(uatomic_read(&buf->record_disabled)))
-               return -EAGAIN;
+               return -EPERM;
        ctx->buf = buf;
 
        /*
@@ -186,10 +196,26 @@ int lib_ring_buffer_reserve(const struct lttng_ust_lib_ring_buffer_config *confi
                                                 &o_end, &o_old, &before_hdr_pad)))
                goto slow_path;
 
-       if (caa_unlikely(v_cmpxchg(config, &ctx->buf->offset, o_old, o_end)
-                    != o_old))
-               goto slow_path;
-
+       if (caa_unlikely(config->sync == RING_BUFFER_SYNC_GLOBAL
+                       || rseq_state.cpu_id < 0
+                       || uatomic_read(&chan->u.reserve_fallback_ref))) {
+               if (caa_unlikely(v_cmpxchg(config, &ctx->buf->offset, o_old,
+                               o_end) != o_old))
+                       goto slow_path;
+       } else {
+               /*
+                * Load reserve_fallback_ref before offset. Matches the
+                * implicit memory barrier after v_cmpxchg of offset.
+                */
+               cmm_smp_rmb();
+               if (caa_unlikely(ctx->buf->offset.a != o_old))
+                       return -EAGAIN;
+               if (caa_unlikely(!__rseq_finish(NULL, 0, NULL, NULL, 0,
+                               (intptr_t *) &ctx->buf->offset.a,
+                               (intptr_t) o_end,
+                               rseq_state, RSEQ_FINISH_SINGLE, false)))
+                       return -EAGAIN;
+       }
        /*
         * Atomically update last_tsc. This update races against concurrent
         * atomic updates, but the race will always cause supplementary full TSC
@@ -267,6 +293,8 @@ void lib_ring_buffer_commit(const struct lttng_ust_lib_ring_buffer_config *confi
                rseq_state = ctx->rseq_state;
        } else {
                rseq_state.cpu_id = -2;
+               rseq_state.event_counter = 0;
+               rseq_state.rseqp = NULL;
        }
 
        if (caa_unlikely(!cc_hot))
@@ -283,7 +311,8 @@ void lib_ring_buffer_commit(const struct lttng_ust_lib_ring_buffer_config *confi
         */
        cmm_smp_wmb();
 
-       if (caa_likely(rseq_state.cpu_id >= 0)) {
+       if (caa_likely(config->sync == RING_BUFFER_SYNC_PER_CPU
+                       && rseq_state.cpu_id >= 0)) {
                unsigned long newv;
 
                newv = cc_hot->cc_rseq + ctx->slot_size;

libringbuffer/frontend_types.h
index c5c716f753cdd0774319b45fc462ca5f86ff0a60..b99fb8cba204687da8e4900da80c01da4c533a51 100644
@@ -70,7 +70,10 @@ struct channel {
        size_t priv_data_offset;
        unsigned int nr_streams;                /* Number of streams */
        struct lttng_ust_shm_handle *handle;
-       char padding[RB_CHANNEL_PADDING];
+       union {
+               unsigned long reserve_fallback_ref;
+               char padding[RB_CHANNEL_PADDING];
+       } u;
        /*
         * Associated backend contains a variable-length array. Needs to
         * be last member.

libringbuffer/ring_buffer_frontend.c
index 46751a2d8f58269a940c6efd3fa42f9a64a335c4..cda91dabcc8588d9c3f6b154357044b1ab37c236 100644
@@ -74,6 +74,7 @@
 #include "shm.h"
 #include "tlsfixup.h"
 #include "../liblttng-ust/compat.h"    /* For ENODATA */
+#include "rseq.h"
 
 /* Print DBG() messages about events lost only every 1048576 hits */
 #define DBG_PRINT_NR_LOST      (1UL << 20)
@@ -2193,6 +2194,16 @@ int lib_ring_buffer_reserve_slow(struct lttng_ust_lib_ring_buffer_ctx *ctx)
        struct lttng_ust_lib_ring_buffer *buf;
        struct switch_offsets offsets;
        int ret;
+       struct lttng_rseq_state rseq_state;
+
+       if (caa_likely(ctx->ctx_len
+                       >= sizeof(struct lttng_ust_lib_ring_buffer_ctx))) {
+               rseq_state = ctx->rseq_state;
+       } else {
+               rseq_state.cpu_id = -2;
+               rseq_state.event_counter = 0;
+               rseq_state.rseqp = NULL;
+       }
 
        if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
                buf = shmp(handle, chan->backend.buf[ctx->cpu].shmp);
@@ -2204,14 +2215,30 @@ int lib_ring_buffer_reserve_slow(struct lttng_ust_lib_ring_buffer_ctx *ctx)
 
        offsets.size = 0;
 
-       do {
-               ret = lib_ring_buffer_try_reserve_slow(buf, chan, &offsets,
-                                                      ctx);
+       if (caa_unlikely(config->sync == RING_BUFFER_SYNC_GLOBAL
+                       || rseq_state.cpu_id < 0
+                       || uatomic_read(&chan->u.reserve_fallback_ref))) {
+               do {
+                       ret = lib_ring_buffer_try_reserve_slow(buf, chan,
+                                       &offsets, ctx);
+                       if (caa_unlikely(ret))
+                               return ret;
+               } while (caa_unlikely(v_cmpxchg(config, &buf->offset,
+                                       offsets.old, offsets.end)
+                               != offsets.old));
+       } else {
+               ret = lib_ring_buffer_try_reserve_slow(buf, chan,
+                               &offsets, ctx);
                if (caa_unlikely(ret))
                        return ret;
-       } while (caa_unlikely(v_cmpxchg(config, &buf->offset, offsets.old,
-                                   offsets.end)
-                         != offsets.old));
+               if (caa_unlikely(buf->offset.a != offsets.old))
+                       return -EAGAIN;
+               if (caa_unlikely(!__rseq_finish(NULL, 0, NULL, NULL, 0,
+                               (intptr_t *) &buf->offset.a,
+                               (intptr_t) offsets.end,
+                               rseq_state, RSEQ_FINISH_SINGLE, false)))
+                       return -EAGAIN;
+       }
 
        /*
         * Atomically update last_tsc. This update races against concurrent

libringbuffer/vatomic.h
index 019ea06c213033ce7e80f944bbcff9c4d4a86d39..890b3d183f25385523050b16a2c2a19f95278203 100644
@@ -44,7 +44,6 @@ union v_atomic {
 static inline
 long v_read(const struct lttng_ust_lib_ring_buffer_config *config, union v_atomic *v_a)
 {
-       assert(config->sync != RING_BUFFER_SYNC_PER_CPU);
        return uatomic_read(&v_a->a);
 }
 
@@ -52,21 +51,18 @@ static inline
 void v_set(const struct lttng_ust_lib_ring_buffer_config *config, union v_atomic *v_a,
           long v)
 {
-       assert(config->sync != RING_BUFFER_SYNC_PER_CPU);
        uatomic_set(&v_a->a, v);
 }
 
 static inline
 void v_add(const struct lttng_ust_lib_ring_buffer_config *config, long v, union v_atomic *v_a)
 {
-       assert(config->sync != RING_BUFFER_SYNC_PER_CPU);
        uatomic_add(&v_a->a, v);
 }
 
 static inline
 void v_inc(const struct lttng_ust_lib_ring_buffer_config *config, union v_atomic *v_a)
 {
-       assert(config->sync != RING_BUFFER_SYNC_PER_CPU);
        uatomic_inc(&v_a->a);
 }
 
@@ -83,7 +79,6 @@ static inline
 long v_cmpxchg(const struct lttng_ust_lib_ring_buffer_config *config, union v_atomic *v_a,
               long old, long _new)
 {
-       assert(config->sync != RING_BUFFER_SYNC_PER_CPU);
        return uatomic_cmpxchg(&v_a->a, old, _new);
 }
 