consumerd: refactor: split read_subbuf into sub-operations
[lttng-tools.git] / src / common / consumer / consumer.h
index 000040982d5dfb4bba6abaa2bb2a6a9e1f3d5b75..aa8a401a0416efe089057f2e07b6187c38bf78d5 100644 (file)
@@ -28,6 +28,8 @@
 #include <common/credentials.h>
 #include <common/buffer-view.h>
 
+struct lttng_consumer_local_data;
+
 /* Commands for consumer */
 enum lttng_consumer_command {
        LTTNG_CONSUMER_ADD_CHANNEL,
@@ -244,6 +246,142 @@ struct lttng_consumer_channel {
        bool streams_sent_to_relayd;
 };
 
+struct stream_subbuffer {
+       union {
+               /*
+                * CONSUMER_CHANNEL_SPLICE
+                * No ownership assumed.
+                */
+               int fd;
+               /* CONSUMER_CHANNEL_MMAP */
+               struct lttng_buffer_view buffer;
+       } buffer;
+       union {
+               /*
+                * Common members are fine to access through either
+                * union entries (as per C11, Common Initial Sequence).
+                */
+               struct {
+                       unsigned long subbuf_size;
+                       unsigned long padded_subbuf_size;
+                       uint64_t version;
+               } metadata;
+               struct {
+                       unsigned long subbuf_size;
+                       unsigned long padded_subbuf_size;
+                       uint64_t packet_size;
+                       uint64_t content_size;
+                       uint64_t timestamp_begin;
+                       uint64_t timestamp_end;
+                       uint64_t events_discarded;
+                       /* Left unset when unsupported. */
+                       LTTNG_OPTIONAL(uint64_t) sequence_number;
+                       uint64_t stream_id;
+                       /* Left unset when unsupported. */
+                       LTTNG_OPTIONAL(uint64_t) stream_instance_id;
+               } data;
+       } info;
+};
+
+/*
+ * Perform any operation required to acknowledge
+ * the wake-up of a consumer stream (e.g. consume a byte on a wake-up pipe).
+ *
+ * Stream and channel locks are acquired during this call.
+ */
+typedef int (*on_wake_up_cb)(struct lttng_consumer_stream *);
+
+/*
+ * Perform any operation required before a consumer stream is put
+ * to sleep before awaiting a data availability notification.
+ *
+ * Stream and channel locks are acquired during this call.
+ */
+typedef int (*on_sleep_cb)(struct lttng_consumer_stream *,
+               struct lttng_consumer_local_data *);
+
+/*
+ * Acquire the subbuffer at the current 'consumed' position.
+ *
+ * Stream and channel locks are acquired during this call.
+ */
+typedef int (*get_next_subbuffer_cb)(struct lttng_consumer_stream *,
+               struct stream_subbuffer *);
+
+/*
+ * Populate the stream_subbuffer's info member. The info to populate
+ * depends on the type (metadata/data) of the stream.
+ *
+ * Stream and channel locks are acquired during this call.
+ */
+typedef int (*extract_subbuffer_info_cb)(
+               struct lttng_consumer_stream *, struct stream_subbuffer *);
+
+/*
+ * Invoked after a subbuffer's info has been filled.
+ *
+ * Stream and channel locks are acquired during this call.
+ */
+typedef int (*pre_consume_subbuffer_cb)(struct lttng_consumer_stream *,
+               const struct stream_subbuffer *);
+
+/*
+ * Consume subbuffer contents.
+ *
+ * Stream and channel locks are acquired during this call.
+ */
+typedef ssize_t (*consume_subbuffer_cb)(struct lttng_consumer_local_data *,
+               struct lttng_consumer_stream *,
+               const struct stream_subbuffer *);
+
+/*
+ * Release the current subbuffer and advance the 'consumed' position by
+ * one subbuffer.
+ *
+ * Stream and channel locks are acquired during this call.
+ */
+typedef int (*put_next_subbuffer_cb)(struct lttng_consumer_stream *,
+               struct stream_subbuffer *);
+
+/*
+ * Invoked after consuming a subbuffer.
+ *
+ * Stream and channel locks are acquired during this call.
+ */
+typedef int (*post_consume_cb)(struct lttng_consumer_stream *,
+               const struct stream_subbuffer *,
+               struct lttng_consumer_local_data *);
+
+/*
+ * Send a live beacon if no data is available.
+ *
+ * Stream and channel locks are acquired during this call.
+ */
+typedef int (*send_live_beacon_cb)(struct lttng_consumer_stream *);
+
+/*
+ * Lock the stream and channel locks and any other stream-type specific
+ * lock that need to be acquired during the processing of an
+ * availability notification.
+ */
+typedef void (*lock_cb)(struct lttng_consumer_stream *);
+
+/*
+ * Unlock the stream and channel locks and any other stream-type specific
+ * lock before sleeping until the next availability notification.
+ *
+ * Stream and channel locks are acquired during this call.
+ */
+typedef void (*unlock_cb)(struct lttng_consumer_stream *);
+
+/*
+ * Invoked when a subbuffer's metadata version does not match the last
+ * known metadata version.
+ *
+ * Stream and channel locks are acquired during this call.
+ */
+typedef void (*reset_metadata_cb)(struct lttng_consumer_stream *);
+
 /*
  * Internal representation of the streams, sessiond_key is used to identify
  * uniquely a stream.
@@ -467,6 +605,24 @@ struct lttng_consumer_stream {
         * file before writing in it (regeneration).
         */
        unsigned int reset_metadata_flag:1;
+       struct {
+               /*
+                * Invoked in the order of declaration.
+                * See callback type definitions.
+                */
+               lock_cb lock;
+               on_wake_up_cb on_wake_up;
+               get_next_subbuffer_cb get_next_subbuffer;
+               extract_subbuffer_info_cb extract_subbuffer_info;
+               pre_consume_subbuffer_cb pre_consume_subbuffer;
+               reset_metadata_cb reset_metadata;
+               consume_subbuffer_cb consume_subbuffer;
+               put_next_subbuffer_cb put_next_subbuffer;
+               post_consume_cb post_consume;
+               send_live_beacon_cb send_live_beacon;
+               on_sleep_cb on_sleep;
+               unlock_cb unlock;
+       } read_subbuffer_ops;
 };
 
 /*
@@ -523,7 +679,8 @@ struct lttng_consumer_local_data {
         * Returns the number of bytes read, or negative error value.
         */
        ssize_t (*on_buffer_ready)(struct lttng_consumer_stream *stream,
-                       struct lttng_consumer_local_data *ctx);
+                       struct lttng_consumer_local_data *ctx,
+                       bool locked_by_caller);
        /*
         * function to call when we receive a new channel, it receives a
         * newly allocated channel, depending on the return code of this
@@ -790,7 +947,8 @@ void consumer_steal_stream_key(int key, struct lttng_ht *ht);
 struct lttng_consumer_local_data *lttng_consumer_create(
                enum lttng_consumer_type type,
                ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
-                       struct lttng_consumer_local_data *ctx),
+                       struct lttng_consumer_local_data *ctx,
+                       bool locked_by_caller),
                int (*recv_channel)(struct lttng_consumer_channel *channel),
                int (*recv_stream)(struct lttng_consumer_stream *stream),
                int (*update_stream)(uint64_t sessiond_key, uint32_t state));
@@ -799,13 +957,11 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream,
                const struct lttng_buffer_view *buffer,
-               unsigned long padding,
-               struct ctf_packet_index *index);
+               unsigned long padding);
 ssize_t lttng_consumer_on_read_subbuffer_splice(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream, unsigned long len,
-               unsigned long padding,
-               struct ctf_packet_index *index);
+               unsigned long padding);
 int lttng_consumer_sample_snapshot_positions(struct lttng_consumer_stream *stream);
 int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream);
 int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
@@ -822,7 +978,8 @@ int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                int sock, struct pollfd *consumer_sockpoll);
 
 ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
-               struct lttng_consumer_local_data *ctx);
+               struct lttng_consumer_local_data *ctx,
+               bool locked_by_caller);
 int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream);
 void consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
                struct lttng_consumer_local_data *ctx, int sock,
This page took 0.03691 seconds and 5 git commands to generate.