X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.h;h=aa8a401a0416efe089057f2e07b6187c38bf78d5;hp=72c580eb2e1be87894d98a4c40e83a09ce07b8a5;hb=6f9449c22eef59294cf1e1dc3610a5cbf14baec0;hpb=c70636a7342f34e3be68fcf411cf3e3718b8e73f diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h index 72c580eb2..aa8a401a0 100644 --- a/src/common/consumer/consumer.h +++ b/src/common/consumer/consumer.h @@ -1,21 +1,11 @@ /* - * Copyright (C) 2011 - Julien Desfossez - * Mathieu Desnoyers - * 2012 - David Goulet - * 2018 - Jérémie Galarneau + * Copyright (C) 2011 Julien Desfossez + * Copyright (C) 2011 Mathieu Desnoyers + * Copyright (C) 2012 David Goulet + * Copyright (C) 2018 Jérémie Galarneau * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License, version 2 only, - * as published by the Free Software Foundation. + * SPDX-License-Identifier: GPL-2.0-only * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for - * more details. - * - * You should have received a copy of the GNU General Public License along - * with this program; if not, write to the Free Software Foundation, Inc., - * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #ifndef LIB_CONSUMER_H @@ -36,6 +26,9 @@ #include #include #include +#include + +struct lttng_consumer_local_data; /* Commands for consumer */ enum lttng_consumer_command { @@ -70,6 +63,7 @@ enum lttng_consumer_command { LTTNG_CONSUMER_CREATE_TRACE_CHUNK, LTTNG_CONSUMER_CLOSE_TRACE_CHUNK, LTTNG_CONSUMER_TRACE_CHUNK_EXISTS, + LTTNG_CONSUMER_CLEAR_CHANNEL, }; enum lttng_consumer_type { @@ -191,6 +185,8 @@ struct lttng_consumer_channel { int live_timer_enabled; timer_t live_timer; int live_timer_error; + /* Channel is part of a live session ? */ + bool is_live; /* For channel monitoring timer. */ int monitor_timer_enabled; @@ -250,6 +246,142 @@ struct lttng_consumer_channel { bool streams_sent_to_relayd; }; +struct stream_subbuffer { + union { + /* + * CONSUMER_CHANNEL_SPLICE + * No ownership assumed. + */ + int fd; + /* CONSUMER_CHANNEL_MMAP */ + struct lttng_buffer_view buffer; + } buffer; + union { + /* + * Common members are fine to access through either + * union entries (as per C11, Common Initial Sequence). + */ + struct { + unsigned long subbuf_size; + unsigned long padded_subbuf_size; + uint64_t version; + } metadata; + struct { + unsigned long subbuf_size; + unsigned long padded_subbuf_size; + uint64_t packet_size; + uint64_t content_size; + uint64_t timestamp_begin; + uint64_t timestamp_end; + uint64_t events_discarded; + /* Left unset when unsupported. */ + LTTNG_OPTIONAL(uint64_t) sequence_number; + uint64_t stream_id; + /* Left unset when unsupported. */ + LTTNG_OPTIONAL(uint64_t) stream_instance_id; + } data; + } info; +}; + +/* + * Perform any operation required to acknowledge + * the wake-up of a consumer stream (e.g. consume a byte on a wake-up pipe). + * + * Stream and channel locks are acquired during this call. + */ +typedef int (*on_wake_up_cb)(struct lttng_consumer_stream *); + +/* + * Perform any operation required before a consumer stream is put + * to sleep before awaiting a data availability notification. + * + * Stream and channel locks are acquired during this call. + */ +typedef int (*on_sleep_cb)(struct lttng_consumer_stream *, + struct lttng_consumer_local_data *); + +/* + * Acquire the subbuffer at the current 'consumed' position. + * + * Stream and channel locks are acquired during this call. + */ +typedef int (*get_next_subbuffer_cb)(struct lttng_consumer_stream *, + struct stream_subbuffer *); + +/* + * Populate the stream_subbuffer's info member. The info to populate + * depends on the type (metadata/data) of the stream. + * + * Stream and channel locks are acquired during this call. + */ +typedef int (*extract_subbuffer_info_cb)( + struct lttng_consumer_stream *, struct stream_subbuffer *); + +/* + * Invoked after a subbuffer's info has been filled. + * + * Stream and channel locks are acquired during this call. + */ +typedef int (*pre_consume_subbuffer_cb)(struct lttng_consumer_stream *, + const struct stream_subbuffer *); + +/* + * Consume subbuffer contents. + * + * Stream and channel locks are acquired during this call. + */ +typedef ssize_t (*consume_subbuffer_cb)(struct lttng_consumer_local_data *, + struct lttng_consumer_stream *, + const struct stream_subbuffer *); + +/* + * Release the current subbuffer and advance the 'consumed' position by + * one subbuffer. + * + * Stream and channel locks are acquired during this call. + */ +typedef int (*put_next_subbuffer_cb)(struct lttng_consumer_stream *, + struct stream_subbuffer *); + +/* + * Invoked after consuming a subbuffer. + * + * Stream and channel locks are acquired during this call. + */ +typedef int (*post_consume_cb)(struct lttng_consumer_stream *, + const struct stream_subbuffer *, + struct lttng_consumer_local_data *); + +/* + * Send a live beacon if no data is available. + * + * Stream and channel locks are acquired during this call. + */ +typedef int (*send_live_beacon_cb)(struct lttng_consumer_stream *); + +/* + * Lock the stream and channel locks and any other stream-type specific + * lock that need to be acquired during the processing of an + * availability notification. + */ +typedef void (*lock_cb)(struct lttng_consumer_stream *); + +/* + * Unlock the stream and channel locks and any other stream-type specific + * lock before sleeping until the next availability notification. + * + * Stream and channel locks are acquired during this call. + */ +typedef void (*unlock_cb)(struct lttng_consumer_stream *); + +/* + * Invoked when a subbuffer's metadata version does not match the last + * known metadata version. + * + * Stream and channel locks are acquired during this call. + */ +typedef void (*reset_metadata_cb)(struct lttng_consumer_stream *); + /* * Internal representation of the streams, sessiond_key is used to identify * uniquely a stream. @@ -304,6 +436,11 @@ struct lttng_consumer_stream { */ bool quiescent; + /* + * True if the sequence number is not available (lttng-modules < 2.8). + */ + bool sequence_number_unavailable; + /* * metadata_timer_lock protects flags waiting_on_metadata and * missed_metadata_flush. @@ -438,12 +575,12 @@ struct lttng_consumer_stream { pthread_mutex_t metadata_rdv_lock; /* - * rotate_position represents the position in the ring-buffer that has to - * be flushed to disk to complete the ongoing rotation. When that position - * is reached, this tracefile can be closed and a new one is created in - * channel_read_only_attributes.path. + * rotate_position represents the packet sequence number of the last + * packet which belongs to the current trace chunk prior to the rotation. + * When that position is reached, this tracefile can be closed and a + * new one is created in channel_read_only_attributes.path. */ - unsigned long rotate_position; + uint64_t rotate_position; /* * Read-only copies of channel values. We cannot safely access the @@ -468,6 +605,24 @@ struct lttng_consumer_stream { * file before writing in it (regeneration). */ unsigned int reset_metadata_flag:1; + struct { + /* + * Invoked in the order of declaration. + * See callback type definitions. + */ + lock_cb lock; + on_wake_up_cb on_wake_up; + get_next_subbuffer_cb get_next_subbuffer; + extract_subbuffer_info_cb extract_subbuffer_info; + pre_consume_subbuffer_cb pre_consume_subbuffer; + reset_metadata_cb reset_metadata; + consume_subbuffer_cb consume_subbuffer; + put_next_subbuffer_cb put_next_subbuffer; + post_consume_cb post_consume; + send_live_beacon_cb send_live_beacon; + on_sleep_cb on_sleep; + unlock_cb unlock; + } read_subbuffer_ops; }; /* @@ -524,7 +679,8 @@ struct lttng_consumer_local_data { * Returns the number of bytes read, or negative error value. */ ssize_t (*on_buffer_ready)(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx); + struct lttng_consumer_local_data *ctx, + bool locked_by_caller); /* * function to call when we receive a new channel, it receives a * newly allocated channel, depending on the return code of this @@ -743,7 +899,9 @@ void consumer_stream_update_channel_attributes( struct lttng_consumer_stream *stream, struct lttng_consumer_channel *channel); -struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, +struct lttng_consumer_stream *consumer_allocate_stream( + struct lttng_consumer_channel *channel, + uint64_t channel_key, uint64_t stream_key, const char *channel_name, uint64_t relayd_id, @@ -765,6 +923,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, uint64_t session_id_per_pid, unsigned int monitor, unsigned int live_timer_interval, + bool is_in_live_session, const char *root_shm_path, const char *shm_path); void consumer_del_stream(struct lttng_consumer_stream *stream, @@ -788,21 +947,21 @@ void consumer_steal_stream_key(int key, struct lttng_ht *ht); struct lttng_consumer_local_data *lttng_consumer_create( enum lttng_consumer_type type, ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx), + struct lttng_consumer_local_data *ctx, + bool locked_by_caller), int (*recv_channel)(struct lttng_consumer_channel *channel), int (*recv_stream)(struct lttng_consumer_stream *stream), int (*update_stream)(uint64_t sessiond_key, uint32_t state)); void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx); ssize_t lttng_consumer_on_read_subbuffer_mmap( struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *stream, unsigned long len, - unsigned long padding, - struct ctf_packet_index *index); + struct lttng_consumer_stream *stream, + const struct lttng_buffer_view *buffer, + unsigned long padding); ssize_t lttng_consumer_on_read_subbuffer_splice( struct lttng_consumer_local_data *ctx, struct lttng_consumer_stream *stream, unsigned long len, - unsigned long padding, - struct ctf_packet_index *index); + unsigned long padding); int lttng_consumer_sample_snapshot_positions(struct lttng_consumer_stream *stream); int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream); int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream, @@ -819,7 +978,8 @@ int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx, int sock, struct pollfd *consumer_sockpoll); ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx); + struct lttng_consumer_local_data *ctx, + bool locked_by_caller); int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream); void consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type, struct lttng_consumer_local_data *ctx, int sock, @@ -870,5 +1030,6 @@ void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd); enum lttcomm_return_code lttng_consumer_init_command( struct lttng_consumer_local_data *ctx, const lttng_uuid sessiond_uuid); +int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel); #endif /* LIB_CONSUMER_H */