#include <common/compat/fcntl.h>
#include <common/sessiond-comm/sessiond-comm.h>
-/*
- * When the receiving thread dies, we need to have a way to make the polling
- * thread exit eventually. If all FDs hang up (normal case when the
- * lttng-sessiond stops), we can exit cleanly, but if there is a problem and
- * for whatever reason some FDs remain open, the consumer should still exit
- * eventually.
- *
- * If the timeout is reached, it means that during this period no events
- * occurred on the FDs so we need to force an exit. This case should not happen
- * but it is a safety to ensure we won't block the consumer indefinitely.
- *
- * The value of 2 seconds is an arbitrary choice.
- */
-#define LTTNG_CONSUMER_POLL_TIMEOUT 2000
-
/* Commands for consumer */
enum lttng_consumer_command {
LTTNG_CONSUMER_ADD_CHANNEL,
/* Inform the consumer to kill a specific relayd connection */
LTTNG_CONSUMER_DESTROY_RELAYD,
/* Return to the sessiond if there is data pending for a session */
- LTTNG_CONSUMER_DATA_AVAILABLE,
+ LTTNG_CONSUMER_DATA_PENDING,
};
/* State of each fd in consumer */
/* Next sequence number to use for trace packet */
uint64_t next_net_seq_num;
/*
- * Lock to use the stream FDs since they are used between threads. Using
- * this lock with network streaming, when using the control mutex of a
- * consumer_relayd_sock_pair, make sure to acquire this lock BEFORE locking
- * it and releasing it AFTER the control mutex unlock.
+ * Lock to use the stream FDs since they are used between threads.
+ *
+ * This is nested INSIDE the consumer_data lock.
+ * This is nested OUTSIDE consumer_relayd_sock_pair lock.
*/
pthread_mutex_t lock;
/* Tracing session id */
* between threads sending data to the relayd. Since metadata data is sent
* over that socket, at least two sendmsg() are needed (header + data)
* creating a race for packets to overlap between threads using it.
+ *
+ * This is nested INSIDE the consumer_data lock.
+ * This is nested INSIDE the stream lock.
*/
pthread_mutex_t ctrl_sock_mutex;
*/
struct lttcomm_sock data_sock;
struct lttng_ht_node_ulong node;
+
+ /* Session id on both sides for the sockets. */
+ uint64_t relayd_session_id;
+ uint64_t sessiond_session_id;
};
/*
* and number of element in the hash table. It's also a protection for
* concurrent read/write between threads.
*
- * XXX: We need to see if this lock is still needed with the lockless RCU
- * hash tables.
+ * This is nested OUTSIDE the stream lock.
+ * This is nested OUTSIDE the consumer_relayd_sock_pair lock.
*/
pthread_mutex_t lock;
struct lttng_ht *stream_list_ht;
};
-/* Defined in consumer.c and coupled with explanations */
-extern struct lttng_ht *metadata_ht;
-extern struct lttng_ht *data_ht;
+/*
+ * Session id mapping structure storred in relayd_session_id_ht.
+ */
+struct consumer_relayd_session_id {
+ uint64_t sessiond_id;
+ uint64_t relayd_id;
+ struct lttng_ht_node_ulong node;
+};
/*
* Init consumer data structures.
int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream);
int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
struct lttng_consumer_local_data *ctx, int sock,
- struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock);
+ struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock,
+ unsigned int sessiond_id);
void consumer_flag_relayd_for_destroy(
struct consumer_relayd_sock_pair *relayd);
-int consumer_data_available(uint64_t id);
+int consumer_data_pending(uint64_t id);
+int consumer_send_status_msg(int sock, int ret_code);
#endif /* LIB_CONSUMER_H */