* Dual LGPL v2.1/GPL v2 license.
*/
-#include <linux/kref.h>
-#include "../../wrapper/ringbuffer/config.h"
-#include "../../wrapper/ringbuffer/backend_types.h"
-#include "../../lib/prio_heap/lttng_prio_heap.h" /* For per-CPU read-side iterator */
+#include <string.h>
+
+#include <urcu/list.h>
+#include <urcu/uatomic.h>
+
+#include <lttng/ringbuffer-config.h>
+#include <usterr-signal-safe.h>
+#include "backend_types.h"
+#include "shm_internal.h"
+#include "vatomic.h"
/*
 * A switch is done during tracing or as a final flush after tracing (so it
 * won't write in the new sub-buffer).
 */
enum switch_mode { SWITCH_ACTIVE, SWITCH_FLUSH };
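+
+/*
+ * Presumed semantics of the two modes: SWITCH_ACTIVE performs a
+ * sub-buffer switch while tracing is active; SWITCH_FLUSH is the final
+ * flush, pushing out even a partially filled sub-buffer so readers can
+ * consume it.
+ */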
-/* channel-level read-side iterator */
-struct channel_iter {
- /* Prio heap of buffers. Lowest timestamps at the top. */
- struct lttng_ptr_heap heap; /* Heap of struct lib_ring_buffer ptrs */
- struct list_head empty_head; /* Empty buffers linked-list head */
- int read_open; /* Opened for reading ? */
- u64 last_qs; /* Last quiescent state timestamp */
- u64 last_timestamp; /* Last timestamp (for WARN_ON) */
- int last_cpu; /* Last timestamp cpu */
- /*
- * read() file operation state.
- */
- unsigned long len_left;
-};
-
/* channel: collection of per-cpu ring buffers. */
+#define RB_CHANNEL_PADDING 64
struct channel {
- atomic_t record_disabled;
+ int record_disabled;
unsigned long commit_count_mask; /*
* Commit count mask, removing
* the MSBs corresponding to
* subbuffer index.
*/
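+					/*
+					 * Presumably computed as
+					 * ~0UL >> num_subbuf_order
+					 * at channel creation, e.g. a
+					 * 61-bit mask for 8 sub-buffers
+					 * on 64-bit.
+					 */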
- struct channel_backend backend; /* Associated backend */
-
unsigned long switch_timer_interval; /* Buffer flush (jiffies) */
unsigned long read_timer_interval; /* Reader wakeup (jiffies) */
- struct notifier_block cpu_hp_notifier; /* CPU hotplug notifier */
- struct notifier_block tick_nohz_notifier; /* CPU nohz notifier */
- struct notifier_block hp_iter_notifier; /* hotplug iterator notifier */
- int cpu_hp_enable:1; /* Enable CPU hotplug notif. */
- int hp_iter_enable:1; /* Enable hp iter notif. */
- wait_queue_head_t read_wait; /* reader wait queue */
- wait_queue_head_t hp_wait; /* CPU hotplug wait queue */
+ //wait_queue_head_t read_wait; /* reader wait queue */
int finalized; /* Has channel been finalized */
- struct channel_iter iter; /* Channel read-side iterator */
- struct kref ref; /* Reference count */
-};
+	size_t priv_data_offset;
+	char padding[RB_CHANNEL_PADDING];
+	/*
+	 * Associated backend contains a variable-length array. Needs to
+	 * be last member.
+	 */
+	struct channel_backend backend;	/* Associated backend */
+} __attribute__((aligned(CAA_CACHE_LINE_SIZE)));
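+
+/*
+ * Note on the padding above (an assumption of this port, not in the
+ * kernel version): the reserved bytes allow extending struct channel
+ * without breaking the layout shared over shm with the consumer, and
+ * the cache-line alignment keeps distinct channels from false-sharing
+ * a line.
+ */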
/* Per-subbuffer commit counters used on the hot path */
+#define RB_COMMIT_COUNT_HOT_PADDING 16
struct commit_counters_hot {
union v_atomic cc; /* Commit counter */
union v_atomic seq; /* Consecutive commits */
-};
+ char padding[RB_COMMIT_COUNT_HOT_PADDING];
+} __attribute__((aligned(CAA_CACHE_LINE_SIZE)));
/* Per-subbuffer commit counters used only on cold paths */
+#define RB_COMMIT_COUNT_COLD_PADDING 24
struct commit_counters_cold {
union v_atomic cc_sb; /* Incremented _once_ at sb switch */
-};
-
-/* Per-buffer read iterator */
-struct lib_ring_buffer_iter {
- u64 timestamp; /* Current record timestamp */
- size_t header_len; /* Current record header length */
- size_t payload_len; /* Current record payload length */
-
- struct list_head empty_node; /* Linked list of empty buffers */
- unsigned long consumed, read_offset, data_size;
- enum {
- ITER_GET_SUBBUF = 0,
- ITER_TEST_RECORD,
- ITER_NEXT_RECORD,
- ITER_PUT_SUBBUF,
- } state;
- int allocated:1;
- int read_open:1; /* Opened for reading ? */
-};
+ char padding[RB_COMMIT_COUNT_COLD_PADDING];
+} __attribute__((aligned(CAA_CACHE_LINE_SIZE)));
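+
+/*
+ * Both commit counter structures are padded and cache-line aligned so
+ * that each sub-buffer's counters live on their own line; presumably
+ * this avoids false sharing between the hot commit path and the cold
+ * sub-buffer-switch path of neighbouring sub-buffers.
+ */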
/* ring buffer state */
-struct lib_ring_buffer {
+#define RB_RING_BUFFER_PADDING 64
+struct lttng_ust_lib_ring_buffer {
/* First 32 bytes cache-hot cacheline */
union v_atomic offset; /* Current offset in the buffer */
- struct commit_counters_hot *commit_hot;
+ DECLARE_SHMP(struct commit_counters_hot, commit_hot);
/* Commit count per sub-buffer */
- atomic_long_t consumed; /*
+ long consumed; /*
* Current offset in the buffer
* standard atomic access (shared)
*/
- atomic_t record_disabled;
+ int record_disabled;
/* End of first 32 bytes cacheline */
union v_atomic last_tsc; /*
* Last timestamp written in the buffer.
*/
- struct lib_ring_buffer_backend backend; /* Associated backend */
+ struct lttng_ust_lib_ring_buffer_backend backend; /* Associated backend */
- struct commit_counters_cold *commit_cold;
+ DECLARE_SHMP(struct commit_counters_cold, commit_cold);
/* Commit count per sub-buffer */
- atomic_long_t active_readers; /*
+ long active_readers; /*
* Active readers count
* standard atomic access (shared)
*/
+	long active_shadow_readers;	/* Active readers on shadow handle */
/* Dropped records */
union v_atomic records_lost_full; /* Buffer full */
union v_atomic records_lost_wrap; /* Nested wrap-around */
union v_atomic records_lost_big; /* Events too big */
union v_atomic records_count; /* Number of records written */
union v_atomic records_overrun; /* Number of overwritten records */
- wait_queue_head_t read_wait; /* reader buffer-level wait queue */
+ //wait_queue_head_t read_wait; /* reader buffer-level wait queue */
int finalized; /* buffer has been finalized */
- struct timer_list switch_timer; /* timer for periodical switch */
- struct timer_list read_timer; /* timer for read poll */
- raw_spinlock_t raw_tick_nohz_spinlock; /* nohz entry lock/trylock */
- struct lib_ring_buffer_iter iter; /* read-side iterator */
+ //struct timer_list switch_timer; /* timer for periodical switch */
+ //struct timer_list read_timer; /* timer for read poll */
unsigned long get_subbuf_consumed; /* Read-side consumed */
unsigned long prod_snapshot; /* Producer count snapshot */
unsigned long cons_snapshot; /* Consumer count snapshot */
- int get_subbuf:1; /* Sub-buffer being held by reader */
- int switch_timer_enabled:1; /* Protected by ring_buffer_nohz_lock */
- int read_timer_enabled:1; /* Protected by ring_buffer_nohz_lock */
-};
+ unsigned int get_subbuf:1, /* Sub-buffer being held by reader */
+ switch_timer_enabled:1, /* Protected by ring_buffer_nohz_lock */
+ read_timer_enabled:1; /* Protected by ring_buffer_nohz_lock */
+ /* shmp pointer to self */
+ DECLARE_SHMP(struct lttng_ust_lib_ring_buffer, self);
+ char padding[RB_RING_BUFFER_PADDING];
+} __attribute__((aligned(CAA_CACHE_LINE_SIZE)));
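+
+/*
+ * DECLARE_SHMP fields hold shared-memory references rather than raw
+ * pointers: the tracer and the consumer map the same shm at different
+ * virtual addresses, so each side resolves the reference through its
+ * own mapping (presumably via the shmp() accessor).
+ */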
static inline
void *channel_get_private(struct channel *chan)
{
- return chan->backend.priv;
+ return ((char *) chan) + chan->priv_data_offset;
}
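+
+/*
+ * Illustrative sketch (hypothetical names, not part of this patch):
+ * the private data is expected to live in the same allocation as the
+ * channel, at priv_data_offset bytes from its start, e.g.:
+ *
+ *	chan = zalloc(chan_size + priv_size);
+ *	chan->priv_data_offset = chan_size;
+ *	memcpy(channel_get_private(chan), priv_init, priv_size);
+ *
+ * where chan_size covers the channel including its variable-length
+ * backend.
+ */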
+#ifndef __rb_same_type
+#define __rb_same_type(a, b) __builtin_types_compatible_p(typeof(a), typeof(b))
+#endif
+
/*
* Issue warnings and disable channels upon internal error.
- * Can receive struct lib_ring_buffer or struct lib_ring_buffer_backend
+ * Can receive struct lttng_ust_lib_ring_buffer or struct lttng_ust_lib_ring_buffer_backend
* parameters.
*/
#define CHAN_WARN_ON(c, cond) \
({ \
struct channel *__chan; \
- int _____ret = unlikely(cond); \
+ int _____ret = caa_unlikely(cond); \
if (_____ret) { \
- if (__same_type(*(c), struct channel_backend)) \
- __chan = container_of((void *) (c), \
+ if (__rb_same_type(*(c), struct channel_backend)) \
+ __chan = caa_container_of((void *) (c), \
struct channel, \
backend); \
- else if (__same_type(*(c), struct channel)) \
+ else if (__rb_same_type(*(c), struct channel)) \
__chan = (void *) (c); \
else \
BUG_ON(1); \
- atomic_inc(&__chan->record_disabled); \
+ uatomic_inc(&__chan->record_disabled); \
WARN_ON(1); \
} \
_____ret; \