+ chan->ops->event_write(&ctx, str, reserve_len);
+ chan->ops->event_commit(&ctx);
+
+end:
+ return reserve_len;
+}
+
+int ustctl_channel_close_wait_fd(struct ustctl_consumer_channel *consumer_chan)
+{
+ struct channel *chan;
+ int ret;
+
+ chan = consumer_chan->chan->chan;
+ ret = ring_buffer_channel_close_wait_fd(&chan->backend.config,
+ chan, chan->handle);
+ if (!ret)
+ consumer_chan->wait_fd = -1;
+ return ret;
+}
+
+int ustctl_channel_close_wakeup_fd(struct ustctl_consumer_channel *consumer_chan)
+{
+ struct channel *chan;
+ int ret;
+
+ chan = consumer_chan->chan->chan;
+ ret = ring_buffer_channel_close_wakeup_fd(&chan->backend.config,
+ chan, chan->handle);
+ if (!ret)
+ consumer_chan->wakeup_fd = -1;
+ return ret;
+}
+
+int ustctl_stream_close_wait_fd(struct ustctl_consumer_stream *stream)
+{
+ struct channel *chan;
+
+ chan = stream->chan->chan->chan;
+ return ring_buffer_stream_close_wait_fd(&chan->backend.config,
+ chan, stream->handle, stream->cpu);
+}
+
+int ustctl_stream_close_wakeup_fd(struct ustctl_consumer_stream *stream)
+{
+ struct channel *chan;
+
+ chan = stream->chan->chan->chan;
+ return ring_buffer_stream_close_wakeup_fd(&chan->backend.config,
+ chan, stream->handle, stream->cpu);
+}
+
+struct ustctl_consumer_stream *
+ ustctl_create_stream(struct ustctl_consumer_channel *channel,
+ int cpu)
+{
+ struct ustctl_consumer_stream *stream;
+ struct lttng_ust_shm_handle *handle;
+ struct channel *chan;
+ int shm_fd, wait_fd, wakeup_fd;
+ uint64_t memory_map_size;
+ struct lttng_ust_lib_ring_buffer *buf;
+ int ret;
+
+ if (!channel)
+ return NULL;
+ handle = channel->chan->handle;
+ if (!handle)
+ return NULL;
+
+ chan = channel->chan->chan;
+ buf = channel_get_ring_buffer(&chan->backend.config,
+ chan, cpu, handle, &shm_fd, &wait_fd,
+ &wakeup_fd, &memory_map_size);
+ if (!buf)
+ return NULL;
+ ret = lib_ring_buffer_open_read(buf, handle);
+ if (ret)
+ return NULL;
+
+ stream = zmalloc(sizeof(*stream));
+ if (!stream)
+ goto alloc_error;
+ stream->handle = handle;
+ stream->buf = buf;
+ stream->chan = channel;
+ stream->shm_fd = shm_fd;
+ stream->wait_fd = wait_fd;
+ stream->wakeup_fd = wakeup_fd;
+ stream->memory_map_size = memory_map_size;
+ stream->cpu = cpu;
+ return stream;
+
+alloc_error:
+ return NULL;
+}
+
+void ustctl_destroy_stream(struct ustctl_consumer_stream *stream)
+{
+ struct lttng_ust_lib_ring_buffer *buf;
+ struct ustctl_consumer_channel *consumer_chan;
+
+ assert(stream);
+ buf = stream->buf;
+ consumer_chan = stream->chan;
+ (void) ustctl_stream_close_wait_fd(stream);
+ (void) ustctl_stream_close_wakeup_fd(stream);
+ lib_ring_buffer_release_read(buf, consumer_chan->chan->handle);
+ free(stream);
+}
+
+int ustctl_channel_get_wait_fd(struct ustctl_consumer_channel *chan)
+{
+ if (!chan)
+ return -EINVAL;
+ return shm_get_wait_fd(chan->chan->handle,
+ &chan->chan->handle->chan._ref);
+}
+
+int ustctl_channel_get_wakeup_fd(struct ustctl_consumer_channel *chan)
+{
+ if (!chan)
+ return -EINVAL;
+ return shm_get_wakeup_fd(chan->chan->handle,
+ &chan->chan->handle->chan._ref);
+}
+
+int ustctl_stream_get_wait_fd(struct ustctl_consumer_stream *stream)
+{
+ struct lttng_ust_lib_ring_buffer *buf;
+ struct ustctl_consumer_channel *consumer_chan;
+
+ if (!stream)
+ return -EINVAL;
+ buf = stream->buf;
+ consumer_chan = stream->chan;
+ return shm_get_wait_fd(consumer_chan->chan->handle, &buf->self._ref);
+}
+
+int ustctl_stream_get_wakeup_fd(struct ustctl_consumer_stream *stream)
+{
+ struct lttng_ust_lib_ring_buffer *buf;
+ struct ustctl_consumer_channel *consumer_chan;
+
+ if (!stream)
+ return -EINVAL;
+ buf = stream->buf;
+ consumer_chan = stream->chan;
+ return shm_get_wakeup_fd(consumer_chan->chan->handle, &buf->self._ref);
+}
+
+/* For mmap mode, readable without "get" operation */
+
+void *ustctl_get_mmap_base(struct ustctl_consumer_stream *stream)
+{
+ struct lttng_ust_lib_ring_buffer *buf;
+ struct ustctl_consumer_channel *consumer_chan;
+
+ if (!stream)
+ return NULL;
+ buf = stream->buf;
+ consumer_chan = stream->chan;
+ return shmp(consumer_chan->chan->handle, buf->backend.memory_map);
+}
+
+/* returns the length to mmap. */
+int ustctl_get_mmap_len(struct ustctl_consumer_stream *stream,
+ unsigned long *len)
+{
+ struct ustctl_consumer_channel *consumer_chan;
+ unsigned long mmap_buf_len;
+ struct channel *chan;
+
+ if (!stream)
+ return -EINVAL;
+ consumer_chan = stream->chan;
+ chan = consumer_chan->chan->chan;
+ if (chan->backend.config.output != RING_BUFFER_MMAP)
+ return -EINVAL;
+ mmap_buf_len = chan->backend.buf_size;
+ if (chan->backend.extra_reader_sb)
+ mmap_buf_len += chan->backend.subbuf_size;
+ if (mmap_buf_len > INT_MAX)
+ return -EFBIG;
+ *len = mmap_buf_len;
+ return 0;
+}
+
+/* returns the maximum size for sub-buffers. */
+int ustctl_get_max_subbuf_size(struct ustctl_consumer_stream *stream,
+ unsigned long *len)
+{
+ struct ustctl_consumer_channel *consumer_chan;
+ struct channel *chan;
+
+ if (!stream)
+ return -EINVAL;
+ consumer_chan = stream->chan;
+ chan = consumer_chan->chan->chan;
+ *len = chan->backend.subbuf_size;
+ return 0;
+}
+
+/*
+ * For mmap mode, operate on the current packet (between get/put or
+ * get_next/put_next).
+ */
+
+/* returns the offset of the subbuffer belonging to the mmap reader. */
+int ustctl_get_mmap_read_offset(struct ustctl_consumer_stream *stream,
+ unsigned long *off)
+{
+ struct channel *chan;
+ unsigned long sb_bindex;
+ struct lttng_ust_lib_ring_buffer *buf;
+ struct ustctl_consumer_channel *consumer_chan;
+ struct lttng_ust_lib_ring_buffer_backend_pages_shmp *barray_idx;
+ struct lttng_ust_lib_ring_buffer_backend_pages *pages;
+
+ if (!stream)
+ return -EINVAL;
+ buf = stream->buf;
+ consumer_chan = stream->chan;
+ chan = consumer_chan->chan->chan;
+ if (chan->backend.config.output != RING_BUFFER_MMAP)
+ return -EINVAL;
+ sb_bindex = subbuffer_id_get_index(&chan->backend.config,
+ buf->backend.buf_rsb.id);
+ barray_idx = shmp_index(consumer_chan->chan->handle, buf->backend.array,
+ sb_bindex);
+ if (!barray_idx)
+ return -EINVAL;
+ pages = shmp(consumer_chan->chan->handle, barray_idx->shmp);
+ if (!pages)
+ return -EINVAL;
+ *off = pages->mmap_offset;
+ return 0;
+}
+
+/* returns the size of the current sub-buffer, without padding (for mmap). */
+int ustctl_get_subbuf_size(struct ustctl_consumer_stream *stream,
+ unsigned long *len)
+{
+ struct ustctl_consumer_channel *consumer_chan;
+ struct channel *chan;
+ struct lttng_ust_lib_ring_buffer *buf;
+
+ if (!stream)
+ return -EINVAL;
+
+ buf = stream->buf;
+ consumer_chan = stream->chan;
+ chan = consumer_chan->chan->chan;
+ *len = lib_ring_buffer_get_read_data_size(&chan->backend.config, buf,
+ consumer_chan->chan->handle);
+ return 0;
+}
+
+/* returns the size of the current sub-buffer, without padding (for mmap). */
+int ustctl_get_padded_subbuf_size(struct ustctl_consumer_stream *stream,
+ unsigned long *len)
+{
+ struct ustctl_consumer_channel *consumer_chan;
+ struct channel *chan;
+ struct lttng_ust_lib_ring_buffer *buf;
+
+ if (!stream)
+ return -EINVAL;
+ buf = stream->buf;
+ consumer_chan = stream->chan;
+ chan = consumer_chan->chan->chan;
+ *len = lib_ring_buffer_get_read_data_size(&chan->backend.config, buf,
+ consumer_chan->chan->handle);
+ *len = LTTNG_UST_PAGE_ALIGN(*len);
+ return 0;
+}
+
+/* Get exclusive read access to the next sub-buffer that can be read. */
+int ustctl_get_next_subbuf(struct ustctl_consumer_stream *stream)
+{
+ struct lttng_ust_lib_ring_buffer *buf;
+ struct ustctl_consumer_channel *consumer_chan;
+
+ if (!stream)
+ return -EINVAL;
+ buf = stream->buf;
+ consumer_chan = stream->chan;
+ return lib_ring_buffer_get_next_subbuf(buf,
+ consumer_chan->chan->handle);
+}
+
+
+/* Release exclusive sub-buffer access, move consumer forward. */
+int ustctl_put_next_subbuf(struct ustctl_consumer_stream *stream)
+{
+ struct lttng_ust_lib_ring_buffer *buf;
+ struct ustctl_consumer_channel *consumer_chan;
+
+ if (!stream)
+ return -EINVAL;
+ buf = stream->buf;
+ consumer_chan = stream->chan;
+ lib_ring_buffer_put_next_subbuf(buf, consumer_chan->chan->handle);
+ return 0;
+}
+
+/* snapshot */
+
+/* Get a snapshot of the current ring buffer producer and consumer positions */
+int ustctl_snapshot(struct ustctl_consumer_stream *stream)
+{
+ struct lttng_ust_lib_ring_buffer *buf;
+ struct ustctl_consumer_channel *consumer_chan;
+
+ if (!stream)
+ return -EINVAL;
+ buf = stream->buf;
+ consumer_chan = stream->chan;
+ return lib_ring_buffer_snapshot(buf, &buf->cons_snapshot,
+ &buf->prod_snapshot, consumer_chan->chan->handle);
+}
+
+/*
+ * Get a snapshot of the current ring buffer producer and consumer positions
+ * even if the consumed and produced positions are contained within the same
+ * subbuffer.
+ */
+int ustctl_snapshot_sample_positions(struct ustctl_consumer_stream *stream)
+{
+ struct lttng_ust_lib_ring_buffer *buf;
+ struct ustctl_consumer_channel *consumer_chan;
+
+ if (!stream)
+ return -EINVAL;
+ buf = stream->buf;
+ consumer_chan = stream->chan;
+ return lib_ring_buffer_snapshot_sample_positions(buf,
+ &buf->cons_snapshot, &buf->prod_snapshot,
+ consumer_chan->chan->handle);
+}
+
+/* Get the consumer position (iteration start) */
+int ustctl_snapshot_get_consumed(struct ustctl_consumer_stream *stream,
+ unsigned long *pos)
+{
+ struct lttng_ust_lib_ring_buffer *buf;
+
+ if (!stream)
+ return -EINVAL;
+ buf = stream->buf;
+ *pos = buf->cons_snapshot;
+ return 0;
+}
+
+/* Get the producer position (iteration end) */
+int ustctl_snapshot_get_produced(struct ustctl_consumer_stream *stream,
+ unsigned long *pos)
+{
+ struct lttng_ust_lib_ring_buffer *buf;
+
+ if (!stream)
+ return -EINVAL;
+ buf = stream->buf;
+ *pos = buf->prod_snapshot;
+ return 0;
+}
+
+/* Get exclusive read access to the specified sub-buffer position */
+int ustctl_get_subbuf(struct ustctl_consumer_stream *stream,
+ unsigned long *pos)
+{
+ struct lttng_ust_lib_ring_buffer *buf;
+ struct ustctl_consumer_channel *consumer_chan;
+
+ if (!stream)
+ return -EINVAL;
+ buf = stream->buf;
+ consumer_chan = stream->chan;
+ return lib_ring_buffer_get_subbuf(buf, *pos,
+ consumer_chan->chan->handle);
+}
+
+/* Release exclusive sub-buffer access */
+int ustctl_put_subbuf(struct ustctl_consumer_stream *stream)
+{
+ struct lttng_ust_lib_ring_buffer *buf;
+ struct ustctl_consumer_channel *consumer_chan;
+
+ if (!stream)
+ return -EINVAL;
+ buf = stream->buf;
+ consumer_chan = stream->chan;
+ lib_ring_buffer_put_subbuf(buf, consumer_chan->chan->handle);
+ return 0;
+}
+
+void ustctl_flush_buffer(struct ustctl_consumer_stream *stream,
+ int producer_active)
+{
+ struct lttng_ust_lib_ring_buffer *buf;
+ struct ustctl_consumer_channel *consumer_chan;
+
+ assert(stream);
+ buf = stream->buf;
+ consumer_chan = stream->chan;
+ lib_ring_buffer_switch_slow(buf,
+ producer_active ? SWITCH_ACTIVE : SWITCH_FLUSH,
+ consumer_chan->chan->handle);
+}
+
+void ustctl_clear_buffer(struct ustctl_consumer_stream *stream)
+{
+ struct lttng_ust_lib_ring_buffer *buf;
+ struct ustctl_consumer_channel *consumer_chan;
+
+ assert(stream);
+ buf = stream->buf;
+ consumer_chan = stream->chan;
+ lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE,
+ consumer_chan->chan->handle);
+ lib_ring_buffer_clear_reader(buf, consumer_chan->chan->handle);
+}
+
+static
+struct lttng_ust_client_lib_ring_buffer_client_cb *get_client_cb(
+ struct lttng_ust_lib_ring_buffer *buf,
+ struct lttng_ust_shm_handle *handle)
+{
+ struct channel *chan;
+ const struct lttng_ust_lib_ring_buffer_config *config;
+ struct lttng_ust_client_lib_ring_buffer_client_cb *client_cb;
+
+ chan = shmp(handle, buf->backend.chan);
+ if (!chan)
+ return NULL;
+ config = &chan->backend.config;
+ if (!config->cb_ptr)