#include <unistd.h>
#include <common/common.h>
+#include <common/consumer/consumer-timer.h>
+#include <common/consumer/consumer-timer.h>
+#include <common/consumer/consumer.h>
+#include <common/consumer/consumer.h>
+#include <common/consumer/metadata-bucket.h>
+#include <common/consumer/metadata-bucket.h>
#include <common/index/index.h>
#include <common/kernel-consumer/kernel-consumer.h>
+#include <common/kernel-ctl/kernel-ctl.h>
+#include <common/macros.h>
#include <common/relayd/relayd.h>
#include <common/ust-consumer/ust-consumer.h>
#include <common/utils.h>
-#include <common/consumer/consumer.h>
-#include <common/consumer/consumer-timer.h>
-#include <common/consumer/metadata-bucket.h>
-#include <common/kernel-ctl/kernel-ctl.h>
#include "consumer-stream.h"
pthread_mutex_unlock(&stream->chan->lock);
}
+static void consumer_stream_data_assert_locked_all(struct lttng_consumer_stream *stream)
+{
+ ASSERT_LOCKED(stream->lock);
+ ASSERT_LOCKED(stream->chan->lock);
+}
+
static void consumer_stream_metadata_lock_all(struct lttng_consumer_stream *stream)
{
consumer_stream_data_lock_all(stream);
consumer_stream_data_unlock_all(stream);
}
+static void consumer_stream_metadata_assert_locked_all(struct lttng_consumer_stream *stream)
+{
+ ASSERT_LOCKED(stream->metadata_rdv_lock);
+ consumer_stream_data_assert_locked_all(stream);
+}
+
/* Only used for data streams. */
static int consumer_stream_update_stats(struct lttng_consumer_stream *stream,
const struct stream_subbuffer *subbuf)
consumer_stream_metadata_lock_all;
stream->read_subbuffer_ops.unlock =
consumer_stream_metadata_unlock_all;
+ stream->read_subbuffer_ops.assert_locked =
+ consumer_stream_metadata_assert_locked_all;
stream->read_subbuffer_ops.pre_consume_subbuffer =
metadata_stream_check_version;
} else {
stream->read_subbuffer_ops.lock = consumer_stream_data_lock_all;
stream->read_subbuffer_ops.unlock =
consumer_stream_data_unlock_all;
+ stream->read_subbuffer_ops.assert_locked =
+ consumer_stream_data_assert_locked_all;
stream->read_subbuffer_ops.pre_consume_subbuffer =
consumer_stream_update_stats;
}