.tsc_bits = LTTNG_COMPACT_TSC_BITS,
.alloc = RING_BUFFER_ALLOC_PER_CPU,
- .sync = RING_BUFFER_SYNC_GLOBAL,
+ .sync = RING_BUFFER_SYNC_PER_CPU,
.mode = RING_BUFFER_MODE_TEMPLATE,
.backend = RING_BUFFER_PAGE,
.output = RING_BUFFER_MMAP,
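The client config now selects per-CPU synchronization. Under RING_BUFFER_SYNC_PER_CPU, the reserve paths below publish the new offset with an rseq-protected store when they can, and fall back to the atomic cmpxchg otherwise. A minimal sketch of that gate, reusing the names from the hunks below (the helper itself is illustrative, not part of the patch):

static inline
int reserve_must_use_cmpxchg(const struct lttng_ust_lib_ring_buffer_config *config,
		struct lttng_rseq_state rseq_state,
		struct channel *chan)
{
	return config->sync == RING_BUFFER_SYNC_GLOBAL	/* global sync: always atomic */
		|| rseq_state.cpu_id < 0		/* rseq unavailable for this thread */
		|| uatomic_read(&chan->u.reserve_fallback_ref);	/* a writer forced the atomic path */
}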
{
struct lttng_channel *lttng_chan = channel_get_private(ctx->chan);
struct lttng_rseq_state rseq_state;
- int ret, cpu;
+ int ret, cpu, fallback = 0;
if (lib_ring_buffer_begin(&client_config))
return -EPERM;
} else {
cpu = rseq_cpu_at_start(rseq_state);
}
+fallback:
ctx->cpu = cpu;
switch (lttng_chan->header_type) {
WARN_ON_ONCE(1);
}
+ if (caa_likely(ctx->ctx_len
+ >= sizeof(struct lttng_ust_lib_ring_buffer_ctx)))
+ ctx->rseq_state = rseq_state;
+
ret = lib_ring_buffer_reserve(&client_config, ctx);
- if (caa_unlikely(ret))
- goto put;
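+ /*
+ * -EAGAIN: the rseq-protected reserve was aborted. Take a reference
+ * on reserve_fallback_ref to force all writers onto the atomic
+ * (cmpxchg) path, pin a CPU, and retry once through the fallback
+ * label; assert(!fallback) guards against retrying more than once.
+ */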
+ if (caa_unlikely(ret)) {
+ if (ret == -EAGAIN) {
+ assert(!fallback);
+ fallback = 1;
+ uatomic_inc(&lttng_chan->chan->u.reserve_fallback_ref);
+ cpu = lib_ring_buffer_get_cpu(&client_config);
+ if (caa_unlikely(cpu < 0)) {
+ ret = -EPERM;
+ goto end;
+ }
+ goto fallback;
+ }
+ goto end;
+ }
if (caa_likely(ctx->ctx_len
>= sizeof(struct lttng_ust_lib_ring_buffer_ctx))) {
if (lib_ring_buffer_backend_get_pages(&client_config, ctx,
}
lttng_write_event_header(&client_config, ctx, event_id);
- if (caa_likely(ctx->ctx_len
- >= sizeof(struct lttng_ust_lib_ring_buffer_ctx)))
- ctx->rseq_state = rseq_state;
+ if (caa_unlikely(fallback))
+ uatomic_dec(&lttng_chan->chan->u.reserve_fallback_ref);
return 0;
end:
lib_ring_buffer_end(&client_config);
+ if (fallback)
+ uatomic_dec(&lttng_chan->chan->u.reserve_fallback_ref);
return ret;
}
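For context, a probe would drive this reserve/commit pair roughly as follows (in real probes the calls go through chan->ops; this sketch is not part of the patch and elides payload serialization). Note that -EAGAIN never escapes to the probe, since the fallback above retries internally:

static void probe_sketch(struct lttng_ust_lib_ring_buffer_ctx *ctx,
		uint32_t event_id)
{
	int ret;

	ret = lttng_event_reserve(ctx, event_id);
	if (ret)
		return;		/* -EPERM/-ENOSPC/-ENOBUFS/-EIO: drop the event */
	/* ... write the event payload into the reserved slot ... */
	lttng_event_commit(ctx);
}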
*
* Return :
* 0 on success.
- * -EAGAIN if channel is disabled.
+ * -EPERM if channel is disabled.
* -ENOSPC if event size is too large for packet.
* -ENOBUFS if there is currently not enough space in buffer for the event.
* -EIO if data cannot be written into the buffer for any other reason.
+ * -EAGAIN if the reservation was aborted; the caller should retry.
*/
static inline
struct lttng_ust_lib_ring_buffer *buf;
unsigned long o_begin, o_end, o_old;
size_t before_hdr_pad = 0;
+ struct lttng_rseq_state rseq_state;
- if (caa_unlikely(uatomic_read(&chan->record_disabled)))
- return -EAGAIN;
+ if (caa_likely(ctx->ctx_len
+ >= sizeof(struct lttng_ust_lib_ring_buffer_ctx))) {
+ rseq_state = ctx->rseq_state;
+ } else {
+ rseq_state.cpu_id = -2;
+ rseq_state.event_counter = 0;
+ rseq_state.rseqp = NULL;
+ }
+ if (caa_unlikely(uatomic_read(&chan->record_disabled)))
+ return -EPERM;
if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
buf = shmp(handle, chan->backend.buf[ctx->cpu].shmp);
else
if (caa_unlikely(!buf))
return -EIO;
if (caa_unlikely(uatomic_read(&buf->record_disabled)))
- return -EAGAIN;
+ return -EPERM;
ctx->buf = buf;
/*
&o_end, &o_old, &before_hdr_pad)))
goto slow_path;
- if (caa_unlikely(v_cmpxchg(config, &ctx->buf->offset, o_old, o_end)
- != o_old))
- goto slow_path;
-
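+ /*
+ * Take the atomic cmpxchg path under global sync, when rseq is
+ * unavailable, or while a writer holds reserve_fallback_ref;
+ * otherwise publish the reserved offset with an rseq-protected
+ * store, returning -EAGAIN if the sequence aborts.
+ */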
+ if (caa_unlikely(config->sync == RING_BUFFER_SYNC_GLOBAL
+ || rseq_state.cpu_id < 0
+ || uatomic_read(&chan->u.reserve_fallback_ref))) {
+ if (caa_unlikely(v_cmpxchg(config, &ctx->buf->offset, o_old,
+ o_end) != o_old))
+ goto slow_path;
+ } else {
+ /*
+ * Load reserve_fallback_ref before offset. Matches the
+ * implicit memory barrier after v_cmpxchg of offset.
+ */
+ cmm_smp_rmb();
+ if (caa_unlikely(ctx->buf->offset.a != o_old))
+ return -EAGAIN;
+ if (caa_unlikely(!__rseq_finish(NULL, 0, NULL, NULL, 0,
+ (intptr_t *) &ctx->buf->offset.a,
+ (intptr_t) o_end,
+ rseq_state, RSEQ_FINISH_SINGLE, false)))
+ return -EAGAIN;
+ }
/*
* Atomically update last_tsc. This update races against concurrent
* atomic updates, but the race will always cause supplementary full TSC
rseq_state = ctx->rseq_state;
} else {
rseq_state.cpu_id = -2;
+ rseq_state.event_counter = 0;
+ rseq_state.rseqp = NULL;
}
if (caa_unlikely(!cc_hot))
*/
cmm_smp_wmb();
- if (caa_likely(rseq_state.cpu_id >= 0)) {
+ if (caa_likely(config->sync == RING_BUFFER_SYNC_PER_CPU
+ && rseq_state.cpu_id >= 0)) {
unsigned long newv;
newv = cc_hot->cc_rseq + ctx->slot_size;
size_t priv_data_offset;
unsigned int nr_streams; /* Number of streams */
struct lttng_ust_shm_handle *handle;
- char padding[RB_CHANNEL_PADDING];
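+ /*
+ * reserve_fallback_ref counts writers currently forcing the atomic
+ * reserve path. It overlays the pre-existing padding so the layout
+ * of the following members is unchanged.
+ */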
+ union {
+ unsigned long reserve_fallback_ref;
+ char padding[RB_CHANNEL_PADDING];
+ } u;
/*
* Associated backend contains a variable-length array. Needs to
* be last member.
#include "shm.h"
#include "tlsfixup.h"
#include "../liblttng-ust/compat.h" /* For ENODATA */
+#include "rseq.h"
/* Print DBG() messages about events lost only every 1048576 hits */
#define DBG_PRINT_NR_LOST (1UL << 20)
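Since reserve_fallback_ref is carved out of RB_CHANNEL_PADDING rather than appended, the channel layout in shared memory should be unchanged. A C11 build-time guard along these lines could make that explicit (illustrative only; assumes RB_CHANNEL_PADDING is a multiple of the alignment of unsigned long, and that the type is struct channel as in the fragment above):

/* Fails to compile if the union grew past the padding it replaces. */
_Static_assert(sizeof(((struct channel *)0)->u) == RB_CHANNEL_PADDING,
		"reserve_fallback_ref union must not grow struct channel");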
struct lttng_ust_lib_ring_buffer *buf;
struct switch_offsets offsets;
int ret;
+ struct lttng_rseq_state rseq_state;
+
+ if (caa_likely(ctx->ctx_len
+ >= sizeof(struct lttng_ust_lib_ring_buffer_ctx))) {
+ rseq_state = ctx->rseq_state;
+ } else {
+ rseq_state.cpu_id = -2;
+ rseq_state.event_counter = 0;
+ rseq_state.rseqp = NULL;
+ }
if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
buf = shmp(handle, chan->backend.buf[ctx->cpu].shmp);
offsets.size = 0;
- do {
- ret = lib_ring_buffer_try_reserve_slow(buf, chan, &offsets,
- ctx);
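+ /*
+ * Same gate as the fast path: cmpxchg retry loop under global sync,
+ * without rseq, or while reserve_fallback_ref is elevated; otherwise
+ * a single rseq-protected attempt that returns -EAGAIN on abort.
+ */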
+ if (caa_unlikely(config->sync == RING_BUFFER_SYNC_GLOBAL
+ || rseq_state.cpu_id < 0
+ || uatomic_read(&chan->u.reserve_fallback_ref))) {
+ do {
+ ret = lib_ring_buffer_try_reserve_slow(buf, chan,
+ &offsets, ctx);
+ if (caa_unlikely(ret))
+ return ret;
+ } while (caa_unlikely(v_cmpxchg(config, &buf->offset,
+ offsets.old, offsets.end)
+ != offsets.old));
+ } else {
+ ret = lib_ring_buffer_try_reserve_slow(buf, chan,
+ &offsets, ctx);
if (caa_unlikely(ret))
return ret;
- } while (caa_unlikely(v_cmpxchg(config, &buf->offset, offsets.old,
- offsets.end)
- != offsets.old));
+ if (caa_unlikely(buf->offset.a != offsets.old))
+ return -EAGAIN;
+ if (caa_unlikely(!__rseq_finish(NULL, 0, NULL, NULL, 0,
+ (intptr_t *) &buf->offset.a,
+ (intptr_t) offsets.end,
+ rseq_state, RSEQ_FINISH_SINGLE, false)))
+ return -EAGAIN;
+ }
/*
* Atomically update last_tsc. This update races against concurrent
static inline
long v_read(const struct lttng_ust_lib_ring_buffer_config *config, union v_atomic *v_a)
{
- assert(config->sync != RING_BUFFER_SYNC_PER_CPU);
return uatomic_read(&v_a->a);
}
void v_set(const struct lttng_ust_lib_ring_buffer_config *config, union v_atomic *v_a,
long v)
{
- assert(config->sync != RING_BUFFER_SYNC_PER_CPU);
uatomic_set(&v_a->a, v);
}
static inline
void v_add(const struct lttng_ust_lib_ring_buffer_config *config, long v, union v_atomic *v_a)
{
- assert(config->sync != RING_BUFFER_SYNC_PER_CPU);
uatomic_add(&v_a->a, v);
}
static inline
void v_inc(const struct lttng_ust_lib_ring_buffer_config *config, union v_atomic *v_a)
{
- assert(config->sync != RING_BUFFER_SYNC_PER_CPU);
uatomic_inc(&v_a->a);
}
long v_cmpxchg(const struct lttng_ust_lib_ring_buffer_config *config, union v_atomic *v_a,
long old, long _new)
{
- assert(config->sync != RING_BUFFER_SYNC_PER_CPU);
return uatomic_cmpxchg(&v_a->a, old, _new);
}
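The asserts removed above enforced that the atomic v_* accessors were never used under RING_BUFFER_SYNC_PER_CPU. They must go because the fallback path now legitimately applies uatomic operations to the very words the rseq fast path updates with plain stores. A standalone model of why that mix needs the reserve_fallback_ref gate (C11 atomics stand in for uatomic and rseq; every name here is illustrative):

#include <stdatomic.h>
#include <stdbool.h>

static atomic_ulong offset;		/* models buf->offset.a */
static atomic_ulong fallback_ref;	/* models chan->u.reserve_fallback_ref */

/* Fallback reserve: a real atomic, safe against any concurrent writer. */
static bool reserve_atomic(unsigned long old, unsigned long new)
{
	return atomic_compare_exchange_strong(&offset, &old, new);
}

/*
 * rseq-style reserve: read, check, plain store. Safe only because
 * (a) rseq runs the check+store without preemption or migration in
 *     between, serializing same-CPU writers, and
 * (b) any writer about to use the cmpxchg path first raises
 *     fallback_ref, which this path observes and refuses.
 */
static bool reserve_rseq_model(unsigned long old, unsigned long new)
{
	if (atomic_load(&fallback_ref))
		return false;	/* -EAGAIN upstream: take reserve_atomic() */
	if (atomic_load_explicit(&offset, memory_order_relaxed) != old)
		return false;
	atomic_store_explicit(&offset, new, memory_order_relaxed);
	return true;
}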