+
+/*
+ * Ask the consumer the number of discarded events for a channel.
+ */
+int consumer_get_discarded_events(uint64_t session_id, uint64_t channel_key,
+ struct consumer_output *consumer, uint64_t *discarded)
+{
+ int ret;
+ struct consumer_socket *socket;
+ struct lttng_ht_iter iter;
+ struct lttcomm_consumer_msg msg;
+
+ assert(consumer);
+
+ DBG3("Consumer discarded events id %" PRIu64, session_id);
+
+ memset(&msg, 0, sizeof(msg));
+ msg.cmd_type = LTTNG_CONSUMER_DISCARDED_EVENTS;
+ msg.u.discarded_events.session_id = session_id;
+ msg.u.discarded_events.channel_key = channel_key;
+
+ *discarded = 0;
+
+ /* Send command for each consumer */
+ rcu_read_lock();
+ cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
+ node.node) {
+ uint64_t consumer_discarded = 0;
+ pthread_mutex_lock(socket->lock);
+ ret = consumer_socket_send(socket, &msg, sizeof(msg));
+ if (ret < 0) {
+ pthread_mutex_unlock(socket->lock);
+ goto end;
+ }
+
+ /*
+ * No need for a recv reply status because the answer to the
+ * command is the reply status message.
+ */
+ ret = consumer_socket_recv(socket, &consumer_discarded,
+ sizeof(consumer_discarded));
+ if (ret < 0) {
+ ERR("get discarded events");
+ pthread_mutex_unlock(socket->lock);
+ goto end;
+ }
+ pthread_mutex_unlock(socket->lock);
+ *discarded += consumer_discarded;
+ }
+ ret = 0;
+ DBG("Consumer discarded %" PRIu64 " events in session id %" PRIu64,
+ *discarded, session_id);
+
+end:
+ rcu_read_unlock();
+ return ret;
+}
+
+/*
+ * Ask the consumer the number of lost packets for a channel.
+ */
+int consumer_get_lost_packets(uint64_t session_id, uint64_t channel_key,
+ struct consumer_output *consumer, uint64_t *lost)
+{
+ int ret;
+ struct consumer_socket *socket;
+ struct lttng_ht_iter iter;
+ struct lttcomm_consumer_msg msg;
+
+ assert(consumer);
+
+ DBG3("Consumer lost packets id %" PRIu64, session_id);
+
+ memset(&msg, 0, sizeof(msg));
+ msg.cmd_type = LTTNG_CONSUMER_LOST_PACKETS;
+ msg.u.lost_packets.session_id = session_id;
+ msg.u.lost_packets.channel_key = channel_key;
+
+ *lost = 0;
+
+ /* Send command for each consumer */
+ rcu_read_lock();
+ cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
+ node.node) {
+ uint64_t consumer_lost = 0;
+ pthread_mutex_lock(socket->lock);
+ ret = consumer_socket_send(socket, &msg, sizeof(msg));
+ if (ret < 0) {
+ pthread_mutex_unlock(socket->lock);
+ goto end;
+ }
+
+ /*
+ * No need for a recv reply status because the answer to the
+ * command is the reply status message.
+ */
+ ret = consumer_socket_recv(socket, &consumer_lost,
+ sizeof(consumer_lost));
+ if (ret < 0) {
+ ERR("get lost packets");
+ pthread_mutex_unlock(socket->lock);
+ goto end;
+ }
+ pthread_mutex_unlock(socket->lock);
+ *lost += consumer_lost;
+ }
+ ret = 0;
+ DBG("Consumer lost %" PRIu64 " packets in session id %" PRIu64,
+ *lost, session_id);
+
+end:
+ rcu_read_unlock();
+ return ret;
+}