consumerd: refactor: split read_subbuf into sub-operations
[lttng-tools.git] / src / common / consumer / consumer.h
index 13fc61617881de52d86d2839262aaacca7f0d906..aa8a401a0416efe089057f2e07b6187c38bf78d5 100644 (file)
@@ -1,21 +1,11 @@
 /*
- * Copyright (C) 2011 Julien Desfossez <julien.desfossez@polymtl.ca>
- *                      Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
- *               2012 - David Goulet <dgoulet@efficios.com>
- *               2018 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ * Copyright (C) 2011 Julien Desfossez <julien.desfossez@polymtl.ca>
+ * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
+ * Copyright (C) 2018 Jérémie Galarneau <jeremie.galarneau@efficios.com>
  *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License, version 2 only,
- * as published by the Free Software Foundation.
+ * SPDX-License-Identifier: GPL-2.0-only
  *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
- * more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
 
 #ifndef LIB_CONSUMER_H
 
 #include <common/hashtable/hashtable.h>
 #include <common/compat/fcntl.h>
-#include <common/compat/uuid.h>
+#include <common/uuid.h>
 #include <common/sessiond-comm/sessiond-comm.h>
 #include <common/pipe.h>
 #include <common/index/ctf-index.h>
 #include <common/trace-chunk-registry.h>
 #include <common/credentials.h>
+#include <common/buffer-view.h>
+
+struct lttng_consumer_local_data;
 
 /* Commands for consumer */
 enum lttng_consumer_command {
@@ -70,6 +63,7 @@ enum lttng_consumer_command {
        LTTNG_CONSUMER_CREATE_TRACE_CHUNK,
        LTTNG_CONSUMER_CLOSE_TRACE_CHUNK,
        LTTNG_CONSUMER_TRACE_CHUNK_EXISTS,
+       LTTNG_CONSUMER_CLEAR_CHANNEL,
 };
 
 enum lttng_consumer_type {
@@ -159,7 +153,7 @@ struct lttng_consumer_channel {
        /* For UST */
        uid_t ust_app_uid;      /* Application UID. */
        struct ustctl_consumer_channel *uchan;
-       unsigned char uuid[UUID_STR_LEN];
+       unsigned char uuid[LTTNG_UUID_STR_LEN];
        /*
         * Temporary stream list used to store the streams once created and waiting
         * to be sent to the session daemon by receiving the
@@ -191,6 +185,8 @@ struct lttng_consumer_channel {
        int live_timer_enabled;
        timer_t live_timer;
        int live_timer_error;
+       /* Channel is part of a live session ? */
+       bool is_live;
 
        /* For channel monitoring timer. */
        int monitor_timer_enabled;
@@ -250,6 +246,142 @@ struct lttng_consumer_channel {
        bool streams_sent_to_relayd;
 };
 
+struct stream_subbuffer {
+       union {
+               /*
+                * CONSUMER_CHANNEL_SPLICE
+                * No ownership assumed.
+                */
+               int fd;
+               /* CONSUMER_CHANNEL_MMAP */
+               struct lttng_buffer_view buffer;
+       } buffer;
+       union {
+               /*
+                * Common members are fine to access through either
+                * union entries (as per C11, Common Initial Sequence).
+                */
+               struct {
+                       unsigned long subbuf_size;
+                       unsigned long padded_subbuf_size;
+                       uint64_t version;
+               } metadata;
+               struct {
+                       unsigned long subbuf_size;
+                       unsigned long padded_subbuf_size;
+                       uint64_t packet_size;
+                       uint64_t content_size;
+                       uint64_t timestamp_begin;
+                       uint64_t timestamp_end;
+                       uint64_t events_discarded;
+                       /* Left unset when unsupported. */
+                       LTTNG_OPTIONAL(uint64_t) sequence_number;
+                       uint64_t stream_id;
+                       /* Left unset when unsupported. */
+                       LTTNG_OPTIONAL(uint64_t) stream_instance_id;
+               } data;
+       } info;
+};
+
+/*
+ * Perform any operation required to acknowledge
+ * the wake-up of a consumer stream (e.g. consume a byte on a wake-up pipe).
+ *
+ * Stream and channel locks are acquired during this call.
+ */
+typedef int (*on_wake_up_cb)(struct lttng_consumer_stream *);
+
+/*
+ * Perform any operation required before a consumer stream is put
+ * to sleep before awaiting a data availability notification.
+ *
+ * Stream and channel locks are acquired during this call.
+ */
+typedef int (*on_sleep_cb)(struct lttng_consumer_stream *,
+               struct lttng_consumer_local_data *);
+
+/*
+ * Acquire the subbuffer at the current 'consumed' position.
+ *
+ * Stream and channel locks are acquired during this call.
+ */
+typedef int (*get_next_subbuffer_cb)(struct lttng_consumer_stream *,
+               struct stream_subbuffer *);
+
+/*
+ * Populate the stream_subbuffer's info member. The info to populate
+ * depends on the type (metadata/data) of the stream.
+ *
+ * Stream and channel locks are acquired during this call.
+ */
+typedef int (*extract_subbuffer_info_cb)(
+               struct lttng_consumer_stream *, struct stream_subbuffer *);
+
+/*
+ * Invoked after a subbuffer's info has been filled.
+ *
+ * Stream and channel locks are acquired during this call.
+ */
+typedef int (*pre_consume_subbuffer_cb)(struct lttng_consumer_stream *,
+               const struct stream_subbuffer *);
+
+/*
+ * Consume subbuffer contents.
+ *
+ * Stream and channel locks are acquired during this call.
+ */
+typedef ssize_t (*consume_subbuffer_cb)(struct lttng_consumer_local_data *,
+               struct lttng_consumer_stream *,
+               const struct stream_subbuffer *);
+
+/*
+ * Release the current subbuffer and advance the 'consumed' position by
+ * one subbuffer.
+ *
+ * Stream and channel locks are acquired during this call.
+ */
+typedef int (*put_next_subbuffer_cb)(struct lttng_consumer_stream *,
+               struct stream_subbuffer *);
+
+/*
+ * Invoked after consuming a subbuffer.
+ *
+ * Stream and channel locks are acquired during this call.
+ */
+typedef int (*post_consume_cb)(struct lttng_consumer_stream *,
+               const struct stream_subbuffer *,
+               struct lttng_consumer_local_data *);
+
+/*
+ * Send a live beacon if no data is available.
+ *
+ * Stream and channel locks are acquired during this call.
+ */
+typedef int (*send_live_beacon_cb)(struct lttng_consumer_stream *);
+
+/*
+ * Lock the stream and channel locks and any other stream-type specific
+ * lock that need to be acquired during the processing of an
+ * availability notification.
+ */
+typedef void (*lock_cb)(struct lttng_consumer_stream *);
+
+/*
+ * Unlock the stream and channel locks and any other stream-type specific
+ * lock before sleeping until the next availability notification.
+ *
+ * Stream and channel locks are acquired during this call.
+ */
+typedef void (*unlock_cb)(struct lttng_consumer_stream *);
+
+/*
+ * Invoked when a subbuffer's metadata version does not match the last
+ * known metadata version.
+ *
+ * Stream and channel locks are acquired during this call.
+ */
+typedef void (*reset_metadata_cb)(struct lttng_consumer_stream *);
+
 /*
  * Internal representation of the streams, sessiond_key is used to identify
  * uniquely a stream.
@@ -304,6 +436,11 @@ struct lttng_consumer_stream {
         */
        bool quiescent;
 
+       /*
+        * True if the sequence number is not available (lttng-modules < 2.8).
+        */
+       bool sequence_number_unavailable;
+
        /*
         * metadata_timer_lock protects flags waiting_on_metadata and
         * missed_metadata_flush.
@@ -438,12 +575,12 @@ struct lttng_consumer_stream {
        pthread_mutex_t metadata_rdv_lock;
 
        /*
-        * rotate_position represents the position in the ring-buffer that has to
-        * be flushed to disk to complete the ongoing rotation. When that position
-        * is reached, this tracefile can be closed and a new one is created in
-        * channel_read_only_attributes.path.
+        * rotate_position represents the packet sequence number of the last
+        * packet which belongs to the current trace chunk prior to the rotation.
+        * When that position is reached, this tracefile can be closed and a
+        * new one is created in channel_read_only_attributes.path.
         */
-       unsigned long rotate_position;
+       uint64_t rotate_position;
 
        /*
         * Read-only copies of channel values. We cannot safely access the
@@ -468,6 +605,24 @@ struct lttng_consumer_stream {
         * file before writing in it (regeneration).
         */
        unsigned int reset_metadata_flag:1;
+       struct {
+               /*
+                * Invoked in the order of declaration.
+                * See callback type definitions.
+                */
+               lock_cb lock;
+               on_wake_up_cb on_wake_up;
+               get_next_subbuffer_cb get_next_subbuffer;
+               extract_subbuffer_info_cb extract_subbuffer_info;
+               pre_consume_subbuffer_cb pre_consume_subbuffer;
+               reset_metadata_cb reset_metadata;
+               consume_subbuffer_cb consume_subbuffer;
+               put_next_subbuffer_cb put_next_subbuffer;
+               post_consume_cb post_consume;
+               send_live_beacon_cb send_live_beacon;
+               on_sleep_cb on_sleep;
+               unlock_cb unlock;
+       } read_subbuffer_ops;
 };
 
 /*
@@ -524,7 +679,8 @@ struct lttng_consumer_local_data {
         * Returns the number of bytes read, or negative error value.
         */
        ssize_t (*on_buffer_ready)(struct lttng_consumer_stream *stream,
-                       struct lttng_consumer_local_data *ctx);
+                       struct lttng_consumer_local_data *ctx,
+                       bool locked_by_caller);
        /*
         * function to call when we receive a new channel, it receives a
         * newly allocated channel, depending on the return code of this
@@ -743,7 +899,9 @@ void consumer_stream_update_channel_attributes(
                struct lttng_consumer_stream *stream,
                struct lttng_consumer_channel *channel);
 
-struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
+struct lttng_consumer_stream *consumer_allocate_stream(
+               struct lttng_consumer_channel *channel,
+               uint64_t channel_key,
                uint64_t stream_key,
                const char *channel_name,
                uint64_t relayd_id,
@@ -765,6 +923,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
                uint64_t session_id_per_pid,
                unsigned int monitor,
                unsigned int live_timer_interval,
+               bool is_in_live_session,
                const char *root_shm_path,
                const char *shm_path);
 void consumer_del_stream(struct lttng_consumer_stream *stream,
@@ -788,21 +947,21 @@ void consumer_steal_stream_key(int key, struct lttng_ht *ht);
 struct lttng_consumer_local_data *lttng_consumer_create(
                enum lttng_consumer_type type,
                ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
-                       struct lttng_consumer_local_data *ctx),
+                       struct lttng_consumer_local_data *ctx,
+                       bool locked_by_caller),
                int (*recv_channel)(struct lttng_consumer_channel *channel),
                int (*recv_stream)(struct lttng_consumer_stream *stream),
                int (*update_stream)(uint64_t sessiond_key, uint32_t state));
 void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx);
 ssize_t lttng_consumer_on_read_subbuffer_mmap(
                struct lttng_consumer_local_data *ctx,
-               struct lttng_consumer_stream *stream, unsigned long len,
-               unsigned long padding,
-               struct ctf_packet_index *index);
+               struct lttng_consumer_stream *stream,
+               const struct lttng_buffer_view *buffer,
+               unsigned long padding);
 ssize_t lttng_consumer_on_read_subbuffer_splice(
                struct lttng_consumer_local_data *ctx,
                struct lttng_consumer_stream *stream, unsigned long len,
-               unsigned long padding,
-               struct ctf_packet_index *index);
+               unsigned long padding);
 int lttng_consumer_sample_snapshot_positions(struct lttng_consumer_stream *stream);
 int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream);
 int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
@@ -819,7 +978,8 @@ int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                int sock, struct pollfd *consumer_sockpoll);
 
 ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
-               struct lttng_consumer_local_data *ctx);
+               struct lttng_consumer_local_data *ctx,
+               bool locked_by_caller);
 int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream);
 void consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
                struct lttng_consumer_local_data *ctx, int sock,
@@ -861,7 +1021,8 @@ enum lttcomm_return_code lttng_consumer_create_trace_chunk(
 enum lttcomm_return_code lttng_consumer_close_trace_chunk(
                const uint64_t *relayd_id, uint64_t session_id,
                uint64_t chunk_id, time_t chunk_close_timestamp,
-               const enum lttng_trace_chunk_command_type *close_command);
+               const enum lttng_trace_chunk_command_type *close_command,
+               char *path);
 enum lttcomm_return_code lttng_consumer_trace_chunk_exists(
                const uint64_t *relayd_id, uint64_t session_id,
                uint64_t chunk_id);
@@ -869,5 +1030,6 @@ void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd);
 enum lttcomm_return_code lttng_consumer_init_command(
                struct lttng_consumer_local_data *ctx,
                const lttng_uuid sessiond_uuid);
+int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel);
 
 #endif /* LIB_CONSUMER_H */
This page took 0.028249 seconds and 5 git commands to generate.