#include <common/common.hpp>
#include <common/defaults.hpp>
-#include <common/uri.hpp>
#include <common/relayd/relayd.hpp>
#include <common/string-utils/format.hpp>
+#include <common/uri.hpp>
+#include <lttng/trace-format-descriptor-internal.hpp>
#include "consumer.hpp"
#include "health-sessiond.hpp"
const char *root_shm_path,
const char *shm_path,
struct lttng_trace_chunk *trace_chunk,
- const struct lttng_credentials *buffer_credentials)
+ const struct lttng_credentials *buffer_credentials,
+ const lttng::trace_format_descriptor& trace_format)
{
LTTNG_ASSERT(msg);
msg->u.ask_channel.monitor = monitor;
msg->u.ask_channel.ust_app_uid = ust_app_uid;
msg->u.ask_channel.blocking_timeout = blocking_timeout;
+ if (trace_format.type() == LTTNG_TRACE_FORMAT_DESCRIPTOR_TYPE_CTF_1) {
+ msg->u.ask_channel.trace_format = 1;
+ } else {
+ msg->u.ask_channel.trace_format = 2;
+ }
std::copy(uuid.begin(), uuid.end(), msg->u.ask_channel.uuid);
unsigned int live_timer_interval,
bool is_in_live_session,
unsigned int monitor_timer_interval,
- struct lttng_trace_chunk *trace_chunk)
+ struct lttng_trace_chunk *trace_chunk,
+ const lttng::trace_format_descriptor& trace_format)
{
LTTNG_ASSERT(msg);
msg->u.channel.live_timer_interval = live_timer_interval;
msg->u.channel.is_live = is_in_live_session;
msg->u.channel.monitor_timer_interval = monitor_timer_interval;
+ if (trace_format.type() == LTTNG_TRACE_FORMAT_DESCRIPTOR_TYPE_CTF_1) {
+ msg->u.channel.trace_format = 1;
+ } else {
+ msg->u.channel.trace_format = 2;
+ }
strncpy(msg->u.channel.pathname, pathname,
sizeof(msg->u.channel.pathname));
#ifndef _CONSUMER_H
#define _CONSUMER_H
+#include <algorithm>
#include <common/consumer/consumer.hpp>
#include <common/hashtable/hashtable.hpp>
#include <lttng/lttng.h>
+#include <lttng/trace-format-descriptor-internal.hpp>
#include <urcu/ref.h>
-#include <algorithm>
#include "snapshot.hpp"
const char *root_shm_path,
const char *shm_path,
struct lttng_trace_chunk *trace_chunk,
- const struct lttng_credentials *buffer_credentials);
+ const struct lttng_credentials *buffer_credentials,
+ const lttng::trace_format_descriptor& trace_format);
void consumer_init_add_stream_comm_msg(struct lttcomm_consumer_msg *msg,
uint64_t channel_key,
uint64_t stream_key,
unsigned int live_timer_interval,
bool is_in_live_session,
unsigned int monitor_timer_interval,
- struct lttng_trace_chunk *trace_chunk);
+ struct lttng_trace_chunk *trace_chunk,
+ const lttng::trace_format_descriptor& trace_format);
int consumer_is_data_pending(uint64_t session_id,
struct consumer_output *consumer);
int consumer_close_metadata(struct consumer_socket *socket,
}
/* Prep channel message structure */
- consumer_init_add_channel_comm_msg(&lkm,
- channel->key,
- ksession->id,
- &pathname[consumer_path_offset],
- consumer->net_seq_index,
- channel->channel->name,
- channel->stream_count,
- channel->channel->attr.output,
- CONSUMER_CHANNEL_TYPE_DATA,
+ consumer_init_add_channel_comm_msg(&lkm, channel->key, ksession->id,
+ &pathname[consumer_path_offset], consumer->net_seq_index,
+ channel->channel->name, channel->stream_count,
+ channel->channel->attr.output, CONSUMER_CHANNEL_TYPE_DATA,
channel->channel->attr.tracefile_size,
- channel->channel->attr.tracefile_count,
- monitor,
- channel->channel->attr.live_timer_interval,
- ksession->is_live_session,
+ channel->channel->attr.tracefile_count, monitor,
+ channel->channel->attr.live_timer_interval, ksession->is_live_session,
channel_attr_extended->monitor_timer_interval,
- ksession->current_trace_chunk);
+ ksession->current_trace_chunk, *ksession->trace_format);
health_code_update();
consumer = ksession->consumer;
/* Prep channel message structure */
- consumer_init_add_channel_comm_msg(&lkm,
- ksession->metadata->key,
- ksession->id,
- "",
- consumer->net_seq_index,
- ksession->metadata->conf->name,
- 1,
- ksession->metadata->conf->attr.output,
- CONSUMER_CHANNEL_TYPE_METADATA,
+ consumer_init_add_channel_comm_msg(&lkm, ksession->metadata->key, ksession->id, "",
+ consumer->net_seq_index, ksession->metadata->conf->name, 1,
+ ksession->metadata->conf->attr.output, CONSUMER_CHANNEL_TYPE_METADATA,
ksession->metadata->conf->attr.tracefile_size,
- ksession->metadata->conf->attr.tracefile_count,
- monitor,
+ ksession->metadata->conf->attr.tracefile_count, monitor,
ksession->metadata->conf->attr.live_timer_interval,
- ksession->is_live_session,
- 0,
- ksession->current_trace_chunk);
+ ksession->is_live_session, 0, ksession->current_trace_chunk,
+ *ksession->trace_format);
health_code_update();
return ret;
}
-struct lttng_consumer_stream *consumer_stream_create(
- struct lttng_consumer_channel *channel,
+struct lttng_consumer_stream *consumer_stream_create(struct lttng_consumer_channel *channel,
uint64_t channel_key,
uint64_t stream_key,
const char *channel_name,
int cpu,
int *alloc_ret,
enum consumer_channel_type type,
- unsigned int monitor)
+ unsigned int monitor,
+ int trace_format)
{
int ret;
struct lttng_consumer_stream *stream;
*
* The channel lock MUST be acquired.
*/
-struct lttng_consumer_stream *consumer_stream_create(
- struct lttng_consumer_channel *channel,
+struct lttng_consumer_stream *consumer_stream_create(struct lttng_consumer_channel *channel,
uint64_t channel_key,
uint64_t stream_key,
const char *channel_name,
int cpu,
int *alloc_ret,
enum consumer_channel_type type,
- unsigned int monitor);
+ unsigned int monitor,
+ int trace_format);
/*
* Close stream's file descriptors and, if needed, close stream also on the
unsigned int live_timer_interval,
bool is_in_live_session,
const char *root_shm_path,
- const char *shm_path)
+ const char *shm_path,
+ int trace_format)
{
struct lttng_consumer_channel *channel = NULL;
struct lttng_trace_chunk *trace_chunk = NULL;
channel->monitor = monitor;
channel->live_timer_interval = live_timer_interval;
channel->is_live = is_in_live_session;
+ channel->trace_format = trace_format;
pthread_mutex_init(&channel->lock, NULL);
pthread_mutex_init(&channel->timer_lock, NULL);
bool streams_sent_to_relayd;
uint64_t last_consumed_size_sample_sent;
+
+ int trace_format;
};
struct stream_subbuffer {
unsigned int live_timer_interval,
bool is_in_live_session,
const char *root_shm_path,
- const char *shm_path);
+ const char *shm_path,
+ int trace_format);
void consumer_del_stream(struct lttng_consumer_stream *stream,
struct lttng_ht *ht);
void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
DBG("consumer_add_channel %" PRIu64, msg.u.channel.channel_key);
new_channel = consumer_allocate_channel(msg.u.channel.channel_key,
msg.u.channel.session_id,
- msg.u.channel.chunk_id.is_set ?
- &chunk_id : NULL,
- msg.u.channel.pathname,
- msg.u.channel.name,
- msg.u.channel.relayd_id, msg.u.channel.output,
- msg.u.channel.tracefile_size,
- msg.u.channel.tracefile_count, 0,
- msg.u.channel.monitor,
- msg.u.channel.live_timer_interval,
- msg.u.channel.is_live,
- NULL, NULL);
+ msg.u.channel.chunk_id.is_set ? &chunk_id : NULL,
+ msg.u.channel.pathname, msg.u.channel.name, msg.u.channel.relayd_id,
+ msg.u.channel.output, msg.u.channel.tracefile_size,
+ msg.u.channel.tracefile_count, 0, msg.u.channel.monitor,
+ msg.u.channel.live_timer_interval, msg.u.channel.is_live, NULL,
+ NULL, msg.u.channel.trace_format);
if (new_channel == NULL) {
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
goto end_nosignal;
health_code_update();
pthread_mutex_lock(&channel->lock);
- new_stream = consumer_stream_create(
- channel,
- channel->key,
- fd,
- channel->name,
- channel->relayd_id,
- channel->session_id,
- channel->trace_chunk,
- msg.u.stream.cpu,
- &alloc_ret,
- channel->type,
- channel->monitor);
+ new_stream = consumer_stream_create(channel, channel->key, fd, channel->name,
+ channel->relayd_id, channel->session_id, channel->trace_chunk,
+ msg.u.stream.cpu, &alloc_ret, channel->type, channel->monitor,
+ channel->trace_format);
if (new_stream == NULL) {
switch (alloc_ret) {
case -ENOMEM:
uint8_t is_live;
/* timer to sample a channel's positions (usec). */
unsigned int monitor_timer_interval;
+ int trace_format;
} LTTNG_PACKED channel; /* Only used by Kernel. */
struct {
uint64_t stream_key;
int64_t blocking_timeout;
char root_shm_path[PATH_MAX];
char shm_path[PATH_MAX];
+ int trace_format;
} LTTNG_PACKED ask_channel;
struct {
uint64_t key;
LTTNG_ASSERT(channel);
LTTNG_ASSERT(ctx);
- stream = consumer_stream_create(
- channel,
- channel->key,
- key,
- channel->name,
- channel->relayd_id,
- channel->session_id,
- channel->trace_chunk,
- cpu,
- &alloc_ret,
- channel->type,
- channel->monitor);
+ stream = consumer_stream_create(channel, channel->key, key, channel->name,
+ channel->relayd_id, channel->session_id, channel->trace_chunk, cpu,
+ &alloc_ret, channel->type, channel->monitor, channel->trace_format);
if (stream == NULL) {
switch (alloc_ret) {
case -ENOENT:
};
/* Create a plain object and reserve a channel key. */
- channel = consumer_allocate_channel(
- msg.u.ask_channel.key,
+ channel = consumer_allocate_channel(msg.u.ask_channel.key,
msg.u.ask_channel.session_id,
- msg.u.ask_channel.chunk_id.is_set ?
- &chunk_id : NULL,
- msg.u.ask_channel.pathname,
- msg.u.ask_channel.name,
+ msg.u.ask_channel.chunk_id.is_set ? &chunk_id : NULL,
+ msg.u.ask_channel.pathname, msg.u.ask_channel.name,
msg.u.ask_channel.relayd_id,
(enum lttng_event_output) msg.u.ask_channel.output,
- msg.u.ask_channel.tracefile_size,
- msg.u.ask_channel.tracefile_count,
- msg.u.ask_channel.session_id_per_pid,
- msg.u.ask_channel.monitor,
- msg.u.ask_channel.live_timer_interval,
- msg.u.ask_channel.is_live,
- msg.u.ask_channel.root_shm_path,
- msg.u.ask_channel.shm_path);
+ msg.u.ask_channel.tracefile_size, msg.u.ask_channel.tracefile_count,
+ msg.u.ask_channel.session_id_per_pid, msg.u.ask_channel.monitor,
+ msg.u.ask_channel.live_timer_interval, msg.u.ask_channel.is_live,
+ msg.u.ask_channel.root_shm_path, msg.u.ask_channel.shm_path,
+ msg.u.ask_channel.trace_format);
if (!channel) {
goto end_channel_error;
}