#include <common/compat/fcntl.h>
#include <common/sessiond-comm/sessiond-comm.h>
-/*
- * When the receiving thread dies, we need to have a way to make the polling
- * thread exit eventually. If all FDs hang up (normal case when the
- * lttng-sessiond stops), we can exit cleanly, but if there is a problem and
- * for whatever reason some FDs remain open, the consumer should still exit
- * eventually.
- *
- * If the timeout is reached, it means that during this period no events
- * occurred on the FDs so we need to force an exit. This case should not happen
- * but it is a safety to ensure we won't block the consumer indefinitely.
- *
- * The value of 2 seconds is an arbitrary choice.
- */
-#define LTTNG_CONSUMER_POLL_TIMEOUT 2000
-
/* Commands for consumer */
enum lttng_consumer_command {
LTTNG_CONSUMER_ADD_CHANNEL,
LTTNG_CONSUMER_ADD_RELAYD_SOCKET,
/* Inform the consumer to kill a specific relayd connection */
LTTNG_CONSUMER_DESTROY_RELAYD,
+ /* Return to the sessiond if there is data pending for a session */
+ LTTNG_CONSUMER_DATA_PENDING,
};
/* State of each fd in consumer */
LTTNG_CONSUMER32_UST,
};
+enum consumer_endpoint_status {
+ CONSUMER_ENDPOINT_ACTIVE,
+ CONSUMER_ENDPOINT_INACTIVE,
+};
+
struct lttng_consumer_channel {
struct lttng_ht_node_ulong node;
int key;
* uniquely a stream.
*/
struct lttng_consumer_stream {
+ /* HT node used by the data_ht and metadata_ht */
struct lttng_ht_node_ulong node;
- struct lttng_ht_node_ulong waitfd_node;
+ /* HT node used in consumer_data.stream_list_ht */
+ struct lttng_ht_node_ulong node_session_id;
struct lttng_consumer_channel *chan; /* associated channel */
/*
* key is the key used by the session daemon to refer to the
uint64_t relayd_stream_id;
/* Next sequence number to use for trace packet */
uint64_t next_net_seq_num;
+ /*
+ * Lock to use the stream FDs since they are used between threads.
+ *
+ * This is nested INSIDE the consumer_data lock.
+ * This is nested OUTSIDE consumer_relayd_sock_pair lock.
+ */
+ pthread_mutex_t lock;
+ /* Tracing session id */
+ uint64_t session_id;
+ /*
+ * Indicates if the stream end point is still active or not (network
+ * streaming or local file system). The thread "owning" the stream is
+ * handling this status and can be notified of a state change through the
+ * consumer data appropriate pipe.
+ */
+ enum consumer_endpoint_status endpoint_status;
};
/*
* between threads sending data to the relayd. Since metadata data is sent
* over that socket, at least two sendmsg() are needed (header + data)
* creating a race for packets to overlap between threads using it.
+ *
+ * This is nested INSIDE the consumer_data lock.
+ * This is nested INSIDE the stream lock.
*/
pthread_mutex_t ctrl_sock_mutex;
*/
struct lttcomm_sock data_sock;
struct lttng_ht_node_ulong node;
+
+ /* Session id on both sides for the sockets. */
+ uint64_t relayd_session_id;
+ uint64_t sessiond_session_id;
};
/*
* and number of element in the hash table. It's also a protection for
* concurrent read/write between threads.
*
- * XXX: We need to see if this lock is still needed with the lockless RCU
- * hash tables.
+ * This is nested OUTSIDE the stream lock.
+ * This is nested OUTSIDE the consumer_relayd_sock_pair lock.
*/
pthread_mutex_t lock;
/*
- * Number of streams in the hash table. Protected by consumer_data.lock.
+ * Number of streams in the data stream hash table declared outside.
+ * Protected by consumer_data.lock.
*/
int stream_count;
- /*
- * Hash tables of streams and channels. Protected by consumer_data.lock.
- */
- struct lttng_ht *stream_ht;
+
+ /* Channel hash table protected by consumer_data.lock. */
struct lttng_ht *channel_ht;
/*
* Flag specifying if the local array of FDs needs update in the
* stream has an index which associate the right relayd socket to use.
*/
struct lttng_ht *relayd_ht;
+
+ /*
+ * This hash table contains all streams (metadata and data) indexed by
+ * session id. In other words, the ht is indexed by session id and each
+ * bucket contains the list of associated streams.
+ *
+ * This HT uses the "node_session_id" of the consumer stream.
+ */
+ struct lttng_ht *stream_list_ht;
};
-/* Defined in consumer.c and coupled with explanations */
-extern struct lttng_ht *metadata_ht;
-extern struct lttng_ht *data_ht;
+/*
+ * Session id mapping structure storred in relayd_session_id_ht.
+ */
+struct consumer_relayd_session_id {
+ uint64_t sessiond_id;
+ uint64_t relayd_id;
+ struct lttng_ht_node_ulong node;
+};
/*
* Init consumer data structures.
gid_t gid,
int net_index,
int metadata_flag,
+ uint64_t session_id,
int *alloc_ret);
extern void consumer_del_stream(struct lttng_consumer_stream *stream,
struct lttng_ht *ht);
extern void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
struct lttng_ht *ht);
-extern void consumer_change_stream_state(int stream_key,
- enum lttng_consumer_stream_state state);
extern void consumer_del_channel(struct lttng_consumer_channel *channel);
extern struct lttng_consumer_channel *consumer_allocate_channel(
int channel_key,
int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream);
int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
struct lttng_consumer_local_data *ctx, int sock,
- struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock);
+ struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock,
+ unsigned int sessiond_id);
void consumer_flag_relayd_for_destroy(
struct consumer_relayd_sock_pair *relayd);
+int consumer_data_pending(uint64_t id);
+int consumer_send_status_msg(int sock, int ret_code);
#endif /* LIB_CONSUMER_H */