/*
- * Copyright (C) 2011 Julien Desfossez <julien.desfossez@polymtl.ca>
+ * Copyright (C) 2011 EfficiOS Inc.
* Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
* Copyright (C) 2013 David Goulet <dgoulet@efficios.com>
*
#include <sys/mman.h>
#include <unistd.h>
-#include <common/common.h>
-#include <common/consumer/consumer-timer.h>
-#include <common/consumer/consumer-timer.h>
-#include <common/consumer/consumer.h>
-#include <common/consumer/consumer.h>
-#include <common/consumer/metadata-bucket.h>
-#include <common/consumer/metadata-bucket.h>
-#include <common/index/index.h>
-#include <common/kernel-consumer/kernel-consumer.h>
-#include <common/kernel-ctl/kernel-ctl.h>
-#include <common/macros.h>
-#include <common/relayd/relayd.h>
-#include <common/ust-consumer/ust-consumer.h>
-#include <common/utils.h>
-
-#include "consumer-stream.h"
+#include <common/buffer-view.hpp>
+#include <common/common.hpp>
+#include <common/consumer/consumer-timer.hpp>
+#include <common/consumer/consumer.hpp>
+#include <common/consumer/metadata-bucket.hpp>
+#include <common/index/index.hpp>
+#include <common/kernel-consumer/kernel-consumer.hpp>
+#include <common/kernel-ctl/kernel-ctl.hpp>
+#include <common/macros.hpp>
+#include <common/relayd/relayd.hpp>
+#include <common/ust-consumer/ust-consumer.hpp>
+#include <common/utils.hpp>
+
+#include "consumer-stream.hpp"
+
+struct metadata_packet_header {
+ uint32_t magic; /* 0x75D11D57 */
+ uint8_t uuid[16]; /* Unique Universal Identifier */
+ uint32_t checksum; /* 0 if unused */
+ uint32_t content_size; /* in bits */
+ uint32_t packet_size; /* in bits */
+ uint8_t compression_scheme; /* 0 if unused */
+ uint8_t encryption_scheme; /* 0 if unused */
+ uint8_t checksum_scheme; /* 0 if unused */
+ uint8_t major; /* CTF spec major version number */
+ uint8_t minor; /* CTF spec minor version number */
+ uint8_t header_end[0];
+};
+
+static size_t metadata_length(void)
+{
+ return offsetof(struct metadata_packet_header, header_end);
+}
/*
* RCU call to free stream. MUST only be used with call_rcu().
static void free_stream_rcu(struct rcu_head *head)
{
struct lttng_ht_node_u64 *node =
- caa_container_of(head, struct lttng_ht_node_u64, head);
+ lttng::utils::container_of(head, <tng_ht_node_u64::head);
struct lttng_consumer_stream *stream =
- caa_container_of(node, struct lttng_consumer_stream, node);
+ lttng::utils::container_of(node, <tng_consumer_stream::node);
pthread_mutex_destroy(&stream->lock);
free(stream);
}
/* Only used for data streams. */
-static int consumer_stream_update_stats(struct lttng_consumer_stream *stream,
- const struct stream_subbuffer *subbuf)
+static int consumer_stream_update_stats(
+ struct lttng_consumer_stream *stream, struct stream_subbuffer *subbuf_)
{
int ret = 0;
uint64_t sequence_number;
+ const stream_subbuffer *subbuf = subbuf_;
const uint64_t discarded_events = subbuf->info.data.events_discarded;
if (!subbuf->info.data.sequence_number.is_set) {
}
static ssize_t consumer_stream_consume_mmap(
- struct lttng_consumer_local_data *ctx,
+ struct lttng_consumer_local_data *ctx __attribute__((unused)),
struct lttng_consumer_stream *stream,
const struct stream_subbuffer *subbuffer)
{
static int consumer_stream_send_index(
struct lttng_consumer_stream *stream,
const struct stream_subbuffer *subbuffer,
- struct lttng_consumer_local_data *ctx)
+ struct lttng_consumer_local_data *ctx __attribute__((unused)))
{
off_t packet_offset = 0;
struct ctf_packet_index index = {};
* of the metadata stream in the kernel. If it was updated, set the reset flag
* on the stream.
*/
-static
-int metadata_stream_check_version(struct lttng_consumer_stream *stream,
- const struct stream_subbuffer *subbuffer)
+static void metadata_stream_check_version(
+ struct lttng_consumer_stream *stream, const struct stream_subbuffer *subbuffer)
{
if (stream->metadata_version == subbuffer->info.metadata.version) {
- goto end;
+ return;
}
DBG("New metadata version detected");
if (stream->read_subbuffer_ops.reset_metadata) {
stream->read_subbuffer_ops.reset_metadata(stream);
}
+}
-end:
+static void strip_packet_header_from_subbuffer(struct stream_subbuffer *buffer)
+{
+ /*
+ * Change the view and hide the packer header and padding from the view
+ */
+ size_t new_subbuf_size = buffer->info.metadata.subbuf_size - metadata_length();
+
+ buffer->buffer.buffer = lttng_buffer_view_from_view(
+ &buffer->buffer.buffer, metadata_length(), new_subbuf_size);
+
+ buffer->info.metadata.subbuf_size = new_subbuf_size;
+ /* Padding is not present in the view anymore */
+ buffer->info.metadata.padded_subbuf_size = new_subbuf_size;
+}
+
+static int metadata_stream_pre_consume_ctf1(
+ struct lttng_consumer_stream *stream, struct stream_subbuffer *subbuffer)
+{
+ (void) metadata_stream_check_version(stream, subbuffer);
+ return 0;
+}
+
+static int metadata_stream_pre_consume_ctf2(
+ struct lttng_consumer_stream *stream, struct stream_subbuffer *subbuffer)
+{
+ (void) metadata_stream_check_version(stream, subbuffer);
+ (void) strip_packet_header_from_subbuffer(subbuffer);
return 0;
}
*/
static
int post_consume_open_new_packet(struct lttng_consumer_stream *stream,
- const struct stream_subbuffer *subbuffer,
- struct lttng_consumer_local_data *ctx)
+ const struct stream_subbuffer *subbuffer __attribute__((unused)),
+ struct lttng_consumer_local_data *ctx __attribute__((unused)))
{
int ret = 0;
return ret;
}
-struct lttng_consumer_stream *consumer_stream_create(
- struct lttng_consumer_channel *channel,
+struct lttng_consumer_stream *consumer_stream_create(struct lttng_consumer_channel *channel,
uint64_t channel_key,
uint64_t stream_key,
const char *channel_name,
int cpu,
int *alloc_ret,
enum consumer_channel_type type,
- unsigned int monitor)
+ unsigned int monitor,
+ int trace_format)
{
int ret;
struct lttng_consumer_stream *stream;
- stream = (lttng_consumer_stream *) zmalloc(sizeof(*stream));
+ stream = zmalloc<lttng_consumer_stream>();
if (stream == NULL) {
PERROR("malloc struct lttng_consumer_stream");
ret = -ENOMEM;
goto error;
}
+ stream->send_node = CDS_LIST_HEAD_INIT(stream->send_node);
stream->chan = channel;
stream->key = stream_key;
stream->trace_chunk = trace_chunk;
consumer_stream_metadata_unlock_all;
stream->read_subbuffer_ops.assert_locked =
consumer_stream_metadata_assert_locked_all;
- stream->read_subbuffer_ops.pre_consume_subbuffer =
- metadata_stream_check_version;
+ if (trace_format == 1) {
+ stream->read_subbuffer_ops.pre_consume_subbuffer =
+ metadata_stream_pre_consume_ctf1;
+ } else if (trace_format == 2) {
+ stream->read_subbuffer_ops.pre_consume_subbuffer =
+ metadata_stream_pre_consume_ctf2;
+ } else {
+ abort();
+ }
} else {
const post_consume_cb post_consume_index_op = channel->is_live ?
consumer_stream_sync_metadata_index :
{
LTTNG_ASSERT(stream);
+ cds_list_del_init(&stream->send_node);
+
/* Stream is in monitor mode. */
if (stream->monitor) {
struct lttng_consumer_channel *free_chan = NULL;
if (stream->globally_visible) {
pthread_mutex_lock(&the_consumer_data.lock);
pthread_mutex_lock(&stream->chan->lock);
+
pthread_mutex_lock(&stream->lock);
/* Remove every reference of the stream in the consumer. */
consumer_stream_delete(stream, ht);
+
destroy_close_stream(stream);
/* Update channel's refcount of the stream. */
* If the stream is not visible globally, this needs to be done
* outside of the consumer data lock section.
*/
+ destroy_close_stream(stream);
free_chan = unref_channel(stream);
}
}
static ssize_t metadata_bucket_consume(
- struct lttng_consumer_local_data *unused,
+ struct lttng_consumer_local_data *unused __attribute__((unused)),
struct lttng_consumer_stream *stream,
const struct stream_subbuffer *subbuffer)
{