From 6fd21280083aa3a69348275cd00b21c0396967a5 Mon Sep 17 00:00:00 2001
From: Mathieu Desnoyers
Date: Mon, 21 Nov 2016 16:15:59 -0500
Subject: [PATCH] Use rseq for reserve position

Signed-off-by: Mathieu Desnoyers
---
 liblttng-ust/lttng-ring-buffer-client.h | 32 +++++++++++++----
 libringbuffer/frontend_api.h            | 47 ++++++++++++++++++++-----
 libringbuffer/frontend_types.h          |  5 ++-
 libringbuffer/ring_buffer_frontend.c    | 39 ++++++++++++++++----
 libringbuffer/vatomic.h                 |  5 ---
 5 files changed, 100 insertions(+), 28 deletions(-)

diff --git a/liblttng-ust/lttng-ring-buffer-client.h b/liblttng-ust/lttng-ring-buffer-client.h
index 5fde855c..f014f4e4 100644
--- a/liblttng-ust/lttng-ring-buffer-client.h
+++ b/liblttng-ust/lttng-ring-buffer-client.h
@@ -637,7 +637,7 @@ static const struct lttng_ust_lib_ring_buffer_config client_config = {
 	.tsc_bits = LTTNG_COMPACT_TSC_BITS,
 	.alloc = RING_BUFFER_ALLOC_PER_CPU,
-	.sync = RING_BUFFER_SYNC_GLOBAL,
+	.sync = RING_BUFFER_SYNC_PER_CPU,
 	.mode = RING_BUFFER_MODE_TEMPLATE,
 	.backend = RING_BUFFER_PAGE,
 	.output = RING_BUFFER_MMAP,
@@ -696,7 +696,7 @@ int lttng_event_reserve(struct lttng_ust_lib_ring_buffer_ctx *ctx,
 {
 	struct lttng_channel *lttng_chan = channel_get_private(ctx->chan);
 	struct lttng_rseq_state rseq_state;
-	int ret, cpu;
+	int ret, cpu, fallback = 0;
 
 	if (lib_ring_buffer_begin(&client_config))
 		return -EPERM;
@@ -716,6 +716,7 @@ retry:
 	} else {
 		cpu = rseq_cpu_at_start(rseq_state);
 	}
+fallback:
 	ctx->cpu = cpu;
 
 	switch (lttng_chan->header_type) {
@@ -731,9 +732,25 @@ retry:
 		WARN_ON_ONCE(1);
 	}
 
+	if (caa_likely(ctx->ctx_len
+			>= sizeof(struct lttng_ust_lib_ring_buffer_ctx)))
+		ctx->rseq_state = rseq_state;
+
 	ret = lib_ring_buffer_reserve(&client_config, ctx);
-	if (caa_unlikely(ret))
-		goto put;
+	if (caa_unlikely(ret)) {
+		if (ret == -EAGAIN) {
+			assert(!fallback);
+			fallback = 1;
+			uatomic_inc(&lttng_chan->chan->u.reserve_fallback_ref);
+			cpu = lib_ring_buffer_get_cpu(&client_config);
+			if (caa_unlikely(cpu < 0)) {
+				ret = -EPERM;
+				goto end;
+			}
+			goto fallback;
+		}
+		goto end;
+	}
 	if (caa_likely(ctx->ctx_len
 			>= sizeof(struct lttng_ust_lib_ring_buffer_ctx))) {
 		if (lib_ring_buffer_backend_get_pages(&client_config, ctx,
@@ -744,13 +761,14 @@ retry:
 	}
 	lttng_write_event_header(&client_config, ctx, event_id);
 
-	if (caa_likely(ctx->ctx_len
-			>= sizeof(struct lttng_ust_lib_ring_buffer_ctx)))
-		ctx->rseq_state = rseq_state;
+	if (caa_unlikely(fallback))
+		uatomic_dec(&lttng_chan->chan->u.reserve_fallback_ref);
 	return 0;
 end:
 	lib_ring_buffer_end(&client_config);
+	if (fallback)
+		uatomic_dec(&lttng_chan->chan->u.reserve_fallback_ref);
 	return ret;
 }
diff --git a/libringbuffer/frontend_api.h b/libringbuffer/frontend_api.h
index 6ff98da0..84be5055 100644
--- a/libringbuffer/frontend_api.h
+++ b/libringbuffer/frontend_api.h
@@ -150,10 +150,11 @@ int lib_ring_buffer_try_reserve(const struct lttng_ust_lib_ring_buffer_config *c
  *
  * Return :
  *	0 on success.
- *	-EAGAIN if channel is disabled.
+ *	-EPERM if channel is disabled.
  *	-ENOSPC if event size is too large for packet.
  *	-ENOBUFS if there is currently not enough space in buffer for the event.
  *	-EIO if data cannot be written into the buffer for any other reason.
+ *	-EAGAIN reserve aborted, should be attempted again.
  */
 static inline
@@ -165,10 +166,19 @@ int lib_ring_buffer_reserve(const struct lttng_ust_lib_ring_buffer_config *confi
 	struct lttng_ust_lib_ring_buffer *buf;
 	unsigned long o_begin, o_end, o_old;
 	size_t before_hdr_pad = 0;
+	struct lttng_rseq_state rseq_state;
 
-	if (caa_unlikely(uatomic_read(&chan->record_disabled)))
-		return -EAGAIN;
+	if (caa_likely(ctx->ctx_len
+			>= sizeof(struct lttng_ust_lib_ring_buffer_ctx))) {
+		rseq_state = ctx->rseq_state;
+	} else {
+		rseq_state.cpu_id = -2;
+		rseq_state.event_counter = 0;
+		rseq_state.rseqp = NULL;
+	}
 
+	if (caa_unlikely(uatomic_read(&chan->record_disabled)))
+		return -EPERM;
 	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
 		buf = shmp(handle, chan->backend.buf[ctx->cpu].shmp);
 	else
@@ -176,7 +186,7 @@ int lib_ring_buffer_reserve(const struct lttng_ust_lib_ring_buffer_config *confi
 	if (caa_unlikely(!buf))
 		return -EIO;
 	if (caa_unlikely(uatomic_read(&buf->record_disabled)))
-		return -EAGAIN;
+		return -EPERM;
 	ctx->buf = buf;
 
 	/*
@@ -186,10 +196,26 @@ int lib_ring_buffer_reserve(const struct lttng_ust_lib_ring_buffer_config *confi
 					&o_end, &o_old, &before_hdr_pad)))
 		goto slow_path;
-	if (caa_unlikely(v_cmpxchg(config, &ctx->buf->offset, o_old, o_end) != o_old))
-		goto slow_path;
-
+	if (caa_unlikely(config->sync == RING_BUFFER_SYNC_GLOBAL
+			|| rseq_state.cpu_id < 0
+			|| uatomic_read(&chan->u.reserve_fallback_ref))) {
+		if (caa_unlikely(v_cmpxchg(config, &ctx->buf->offset, o_old,
+				o_end) != o_old))
+			goto slow_path;
+	} else {
+		/*
+		 * Load reserve_fallback_ref before offset. Matches the
+		 * implicit memory barrier after v_cmpxchg of offset.
+		 */
+		cmm_smp_rmb();
+		if (caa_unlikely(ctx->buf->offset.a != o_old))
+			return -EAGAIN;
+		if (caa_unlikely(!__rseq_finish(NULL, 0, NULL, NULL, 0,
+				(intptr_t *) &ctx->buf->offset.a,
+				(intptr_t) o_end,
+				rseq_state, RSEQ_FINISH_SINGLE, false)))
+			return -EAGAIN;
+	}
 	/*
 	 * Atomically update last_tsc. This update races against concurrent
 	 * atomic updates, but the race will always cause supplementary full TSC
@@ -267,6 +293,8 @@ void lib_ring_buffer_commit(const struct lttng_ust_lib_ring_buffer_config *confi
 		rseq_state = ctx->rseq_state;
 	} else {
 		rseq_state.cpu_id = -2;
+		rseq_state.event_counter = 0;
+		rseq_state.rseqp = NULL;
 	}
 
 	if (caa_unlikely(!cc_hot))
@@ -283,7 +311,8 @@ void lib_ring_buffer_commit(const struct lttng_ust_lib_ring_buffer_config *confi
 	 */
 	cmm_smp_wmb();
 
-	if (caa_likely(rseq_state.cpu_id >= 0)) {
+	if (caa_likely(config->sync == RING_BUFFER_SYNC_PER_CPU
+			&& rseq_state.cpu_id >= 0)) {
 		unsigned long newv;
 
 		newv = cc_hot->cc_rseq + ctx->slot_size;
diff --git a/libringbuffer/frontend_types.h b/libringbuffer/frontend_types.h
index c5c716f7..b99fb8cb 100644
--- a/libringbuffer/frontend_types.h
+++ b/libringbuffer/frontend_types.h
@@ -70,7 +70,10 @@ struct channel {
 	size_t priv_data_offset;
 	unsigned int nr_streams;	/* Number of streams */
 	struct lttng_ust_shm_handle *handle;
-	char padding[RB_CHANNEL_PADDING];
+	union {
+		unsigned long reserve_fallback_ref;
+		char padding[RB_CHANNEL_PADDING];
+	} u;
 	/*
 	 * Associated backend contains a variable-length array. Needs to
 	 * be last member.
diff --git a/libringbuffer/ring_buffer_frontend.c b/libringbuffer/ring_buffer_frontend.c
index 46751a2d..cda91dab 100644
--- a/libringbuffer/ring_buffer_frontend.c
+++ b/libringbuffer/ring_buffer_frontend.c
@@ -74,6 +74,7 @@
 #include "shm.h"
 #include "tlsfixup.h"
 #include "../liblttng-ust/compat.h"	/* For ENODATA */
+#include "rseq.h"
 
 /* Print DBG() messages about events lost only every 1048576 hits */
 #define DBG_PRINT_NR_LOST	(1UL << 20)
@@ -2193,6 +2194,16 @@ int lib_ring_buffer_reserve_slow(struct lttng_ust_lib_ring_buffer_ctx *ctx)
 	struct lttng_ust_lib_ring_buffer *buf;
 	struct switch_offsets offsets;
 	int ret;
+	struct lttng_rseq_state rseq_state;
+
+	if (caa_likely(ctx->ctx_len
+			>= sizeof(struct lttng_ust_lib_ring_buffer_ctx))) {
+		rseq_state = ctx->rseq_state;
+	} else {
+		rseq_state.cpu_id = -2;
+		rseq_state.event_counter = 0;
+		rseq_state.rseqp = NULL;
+	}
 
 	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
 		buf = shmp(handle, chan->backend.buf[ctx->cpu].shmp);
@@ -2204,14 +2215,30 @@ int lib_ring_buffer_reserve_slow(struct lttng_ust_lib_ring_buffer_ctx *ctx)
 
 	offsets.size = 0;
-	do {
-		ret = lib_ring_buffer_try_reserve_slow(buf, chan, &offsets,
-						       ctx);
+	if (caa_unlikely(config->sync == RING_BUFFER_SYNC_GLOBAL
+			|| rseq_state.cpu_id < 0
+			|| uatomic_read(&chan->u.reserve_fallback_ref))) {
+		do {
+			ret = lib_ring_buffer_try_reserve_slow(buf, chan,
+					&offsets, ctx);
+			if (caa_unlikely(ret))
+				return ret;
+		} while (caa_unlikely(v_cmpxchg(config, &buf->offset,
+				offsets.old, offsets.end)
+				!= offsets.old));
+	} else {
+		ret = lib_ring_buffer_try_reserve_slow(buf, chan,
+				&offsets, ctx);
 		if (caa_unlikely(ret))
 			return ret;
-	} while (caa_unlikely(v_cmpxchg(config, &buf->offset, offsets.old,
-					offsets.end)
-			!= offsets.old));
+		if (caa_unlikely(buf->offset.a != offsets.old))
+			return -EAGAIN;
+		if (caa_unlikely(!__rseq_finish(NULL, 0, NULL, NULL, 0,
+				(intptr_t *) &buf->offset.a,
+				(intptr_t) offsets.end,
+				rseq_state, RSEQ_FINISH_SINGLE, false)))
+			return -EAGAIN;
+	}
 
 	/*
 	 * Atomically update last_tsc. This update races against concurrent
diff --git a/libringbuffer/vatomic.h b/libringbuffer/vatomic.h
index 019ea06c..890b3d18 100644
--- a/libringbuffer/vatomic.h
+++ b/libringbuffer/vatomic.h
@@ -44,7 +44,6 @@ union v_atomic {
 static inline
 long v_read(const struct lttng_ust_lib_ring_buffer_config *config, union v_atomic *v_a)
 {
-	assert(config->sync != RING_BUFFER_SYNC_PER_CPU);
 	return uatomic_read(&v_a->a);
 }
 
@@ -52,21 +51,18 @@ static inline
 void v_set(const struct lttng_ust_lib_ring_buffer_config *config, union v_atomic *v_a,
 	   long v)
 {
-	assert(config->sync != RING_BUFFER_SYNC_PER_CPU);
 	uatomic_set(&v_a->a, v);
 }
 
 static inline
 void v_add(const struct lttng_ust_lib_ring_buffer_config *config, long v, union v_atomic *v_a)
 {
-	assert(config->sync != RING_BUFFER_SYNC_PER_CPU);
 	uatomic_add(&v_a->a, v);
 }
 
 static inline
 void v_inc(const struct lttng_ust_lib_ring_buffer_config *config, union v_atomic *v_a)
 {
-	assert(config->sync != RING_BUFFER_SYNC_PER_CPU);
 	uatomic_inc(&v_a->a);
 }
 
@@ -83,7 +79,6 @@ static inline
 long v_cmpxchg(const struct lttng_ust_lib_ring_buffer_config *config, union v_atomic *v_a,
 	       long old, long _new)
 {
-	assert(config->sync != RING_BUFFER_SYNC_PER_CPU);
 	return uatomic_cmpxchg(&v_a->a, old, _new);
 }
-- 
2.34.1
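
Note on the reservation scheme used above: a writer first tries to publish the new write position from an rseq critical section, which only commits if the thread was neither preempted nor migrated since it sampled the per-CPU state. If that commit aborts (-EAGAIN), the writer retries on a global compare-and-swap path, and it advertises this by incrementing chan->u.reserve_fallback_ref so that concurrent fast-path writers on the same channel also divert to cmpxchg until the fallback reservation drains; this mixing of per-CPU and atomic updates is also why the RING_BUFFER_SYNC_PER_CPU asserts can be dropped from vatomic.h. The sketch below is a minimal, self-contained illustration of that fallback-refcount pattern using C11 atomics. All names in it (struct ring, reserve(), try_commit_offset()) are hypothetical stand-ins rather than lttng-ust APIs, and try_commit_offset() approximates the __rseq_finish() single-store commit with a compare-and-swap so the example builds without rseq support.

/*
 * Sketch of "rseq fast path with refcounted cmpxchg fallback" reservation.
 * Hypothetical names; not lttng-ust code.
 */
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

struct ring {
	_Atomic unsigned long offset;               /* current write position */
	_Atomic unsigned long reserve_fallback_ref; /* writers on the cmpxchg path */
};

/*
 * Stand-in for the __rseq_finish() commit: publish the new offset only if it
 * still equals `old`. A real rseq commit would instead abort the store when
 * the thread was preempted or migrated since `old` was sampled.
 */
bool try_commit_offset(struct ring *r, unsigned long old, unsigned long new)
{
	return atomic_compare_exchange_strong(&r->offset, &old, new);
}

/* Reserve `len` bytes and return the start offset of the reserved slot. */
unsigned long reserve(struct ring *r, size_t len, bool rseq_usable)
{
	unsigned long old, new;

	/*
	 * Fast path: only taken while no writer is on the fallback path,
	 * mirroring the reserve_fallback_ref check in lib_ring_buffer_reserve().
	 */
	if (rseq_usable && atomic_load(&r->reserve_fallback_ref) == 0) {
		old = atomic_load(&r->offset);
		new = old + len;
		if (try_commit_offset(r, old, new))
			return old;
		/* Commit aborted: fall through to the shared cmpxchg path. */
	}

	/*
	 * Fallback: announce ourselves first so concurrent fast-path writers
	 * divert to cmpxchg too, then reserve with a retry loop.
	 */
	atomic_fetch_add(&r->reserve_fallback_ref, 1);
	old = atomic_load(&r->offset);
	do {
		new = old + len;
	} while (!atomic_compare_exchange_weak(&r->offset, &old, new));
	atomic_fetch_sub(&r->reserve_fallback_ref, 1);
	return old;
}

The sketch only shows the control flow of the fast path, the fallback, and the refcount handshake between them; it deliberately omits the memory-ordering details the patch relies on, such as the cmm_smp_rmb() pairing and the offset re-check documented in lib_ring_buffer_reserve().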