#include <sys/mman.h>
#include <unistd.h>
+#include <common/buffer-view.hpp>
#include <common/common.hpp>
#include <common/consumer/consumer-timer.hpp>
-#include <common/consumer/consumer-timer.hpp>
-#include <common/consumer/consumer.hpp>
#include <common/consumer/consumer.hpp>
#include <common/consumer/metadata-bucket.hpp>
-#include <common/consumer/metadata-bucket.hpp>
#include <common/index/index.hpp>
#include <common/kernel-consumer/kernel-consumer.hpp>
#include <common/kernel-ctl/kernel-ctl.hpp>
#include "consumer-stream.hpp"
+struct metadata_packet_header {
+ uint32_t magic; /* 0x75D11D57 */
+ uint8_t uuid[16]; /* Unique Universal Identifier */
+ uint32_t checksum; /* 0 if unused */
+ uint32_t content_size; /* in bits */
+ uint32_t packet_size; /* in bits */
+ uint8_t compression_scheme; /* 0 if unused */
+ uint8_t encryption_scheme; /* 0 if unused */
+ uint8_t checksum_scheme; /* 0 if unused */
+ uint8_t major; /* CTF spec major version number */
+ uint8_t minor; /* CTF spec minor version number */
+ uint8_t header_end[0];
+};
+
+static size_t metadata_length(void)
+{
+ return offsetof(struct metadata_packet_header, header_end);
+}
+
/*
* RCU call to free stream. MUST only be used with call_rcu().
*/
}
/* Only used for data streams. */
-static int consumer_stream_update_stats(struct lttng_consumer_stream *stream,
- const struct stream_subbuffer *subbuf)
+static int consumer_stream_update_stats(
+ struct lttng_consumer_stream *stream, struct stream_subbuffer *subbuf_)
{
int ret = 0;
uint64_t sequence_number;
+ const stream_subbuffer *subbuf = subbuf_;
const uint64_t discarded_events = subbuf->info.data.events_discarded;
if (!subbuf->info.data.sequence_number.is_set) {
* of the metadata stream in the kernel. If it was updated, set the reset flag
* on the stream.
*/
-static
-int metadata_stream_check_version(struct lttng_consumer_stream *stream,
- const struct stream_subbuffer *subbuffer)
+static void metadata_stream_check_version(
+ struct lttng_consumer_stream *stream, const struct stream_subbuffer *subbuffer)
{
if (stream->metadata_version == subbuffer->info.metadata.version) {
- goto end;
+ return;
}
DBG("New metadata version detected");
if (stream->read_subbuffer_ops.reset_metadata) {
stream->read_subbuffer_ops.reset_metadata(stream);
}
+}
-end:
+static void strip_packet_header_from_subbuffer(struct stream_subbuffer *buffer)
+{
+ /*
+ * Change the view and hide the packer header and padding from the view
+ */
+ size_t new_subbuf_size = buffer->info.metadata.subbuf_size - metadata_length();
+
+ buffer->buffer.buffer = lttng_buffer_view_from_view(
+ &buffer->buffer.buffer, metadata_length(), new_subbuf_size);
+
+ buffer->info.metadata.subbuf_size = new_subbuf_size;
+ /* Padding is not present in the view anymore */
+ buffer->info.metadata.padded_subbuf_size = new_subbuf_size;
+}
+
+static int metadata_stream_pre_consume_ctf1(
+ struct lttng_consumer_stream *stream, struct stream_subbuffer *subbuffer)
+{
+ (void) metadata_stream_check_version(stream, subbuffer);
+ return 0;
+}
+
+static int metadata_stream_pre_consume_ctf2(
+ struct lttng_consumer_stream *stream, struct stream_subbuffer *subbuffer)
+{
+ (void) metadata_stream_check_version(stream, subbuffer);
+ (void) strip_packet_header_from_subbuffer(subbuffer);
return 0;
}
consumer_stream_metadata_unlock_all;
stream->read_subbuffer_ops.assert_locked =
consumer_stream_metadata_assert_locked_all;
- stream->read_subbuffer_ops.pre_consume_subbuffer =
- metadata_stream_check_version;
+ if (trace_format == 1) {
+ stream->read_subbuffer_ops.pre_consume_subbuffer =
+ metadata_stream_pre_consume_ctf1;
+ } else if (trace_format == 2) {
+ stream->read_subbuffer_ops.pre_consume_subbuffer =
+ metadata_stream_pre_consume_ctf2;
+ } else {
+ abort();
+ }
} else {
const post_consume_cb post_consume_index_op = channel->is_live ?
consumer_stream_sync_metadata_index :