* Find a stream. The consumer_data.lock must be locked during this
* call.
*/
-static struct lttng_consumer_stream *consumer_find_stream(int key,
+static struct lttng_consumer_stream *find_stream(int key,
struct lttng_ht *ht)
{
struct lttng_ht_iter iter;
return stream;
}
-void consumer_steal_stream_key(int key, struct lttng_ht *ht)
+static void steal_stream_key(int key, struct lttng_ht *ht)
{
struct lttng_consumer_stream *stream;
rcu_read_lock();
- stream = consumer_find_stream(key, ht);
+ stream = find_stream(key, ht);
if (stream) {
stream->key = -1;
/*
* RCU read side lock MUST be acquired before calling this function and
* protects the channel ptr.
*/
-static struct lttng_consumer_channel *consumer_find_channel(int key)
+struct lttng_consumer_channel *consumer_find_channel(unsigned long key)
{
struct lttng_ht_iter iter;
struct lttng_ht_node_ulong *node;
return NULL;
}
- lttng_ht_lookup(consumer_data.channel_ht, (void *)((unsigned long) key),
- &iter);
+ lttng_ht_lookup(consumer_data.channel_ht, (void *) key, &iter);
node = lttng_ht_iter_get_node_ulong(&iter);
if (node != NULL) {
channel = caa_container_of(node, struct lttng_consumer_channel, node);
return channel;
}
-static void consumer_steal_channel_key(int key)
+static void free_stream_rcu(struct rcu_head *head)
{
- struct lttng_consumer_channel *channel;
+ struct lttng_ht_node_ulong *node =
+ caa_container_of(head, struct lttng_ht_node_ulong, head);
+ struct lttng_consumer_stream *stream =
+ caa_container_of(node, struct lttng_consumer_stream, node);
- rcu_read_lock();
- channel = consumer_find_channel(key);
- if (channel) {
- channel->key = -1;
- /*
- * We don't want the lookup to match, but we still need
- * to iterate on this channel when iterating over the hash table. Just
- * change the node key.
- */
- channel->node.key = -1;
- }
- rcu_read_unlock();
+ free(stream);
}
-static
-void consumer_free_stream(struct rcu_head *head)
+static void free_channel_rcu(struct rcu_head *head)
{
struct lttng_ht_node_ulong *node =
caa_container_of(head, struct lttng_ht_node_ulong, head);
- struct lttng_consumer_stream *stream =
- caa_container_of(node, struct lttng_consumer_stream, node);
+ struct lttng_consumer_channel *channel =
+ caa_container_of(node, struct lttng_consumer_channel, node);
- free(stream);
+ free(channel);
}
/*
* RCU protected relayd socket pair free.
*/
-static void consumer_rcu_free_relayd(struct rcu_head *head)
+static void free_relayd_rcu(struct rcu_head *head)
{
struct lttng_ht_node_ulong *node =
caa_container_of(head, struct lttng_ht_node_ulong, head);
}
/* RCU free() call */
- call_rcu(&relayd->node.head, consumer_rcu_free_relayd);
+ call_rcu(&relayd->node.head, free_relayd_rcu);
+}
+
+/*
+ * Remove a channel from the global list protected by a mutex. This function is
+ * also responsible for freeing its data structures.
+ */
+void consumer_del_channel(struct lttng_consumer_channel *channel)
+{
+ int ret;
+ struct lttng_ht_iter iter;
+
+ DBG("Consumer delete channel key %d", channel->key);
+
+ pthread_mutex_lock(&consumer_data.lock);
+
+ switch (consumer_data.type) {
+ case LTTNG_CONSUMER_KERNEL:
+ break;
+ case LTTNG_CONSUMER32_UST:
+ case LTTNG_CONSUMER64_UST:
+ lttng_ustconsumer_del_channel(channel);
+ break;
+ default:
+ ERR("Unknown consumer_data type");
+ assert(0);
+ goto end;
+ }
+
+ rcu_read_lock();
+ iter.iter.node = &channel->node.node;
+ ret = lttng_ht_del(consumer_data.channel_ht, &iter);
+ assert(!ret);
+ rcu_read_unlock();
+
+ call_rcu(&channel->node.head, free_channel_rcu);
+end:
+ pthread_mutex_unlock(&consumer_data.lock);
}
/*
if (ht == NULL) {
/* Means the stream was allocated but not successfully added */
- goto free_stream;
+ goto free_stream_rcu;
}
pthread_mutex_lock(&consumer_data.lock);
PERROR("close");
}
}
- if (stream->wait_fd >= 0 && !stream->wait_fd_is_copy) {
- ret = close(stream->wait_fd);
- if (ret) {
- PERROR("close");
- }
- }
- if (stream->shm_fd >= 0 && stream->wait_fd != stream->shm_fd) {
- ret = close(stream->shm_fd);
- if (ret) {
- PERROR("close");
- }
- }
/* Check and cleanup relayd */
rcu_read_lock();
uatomic_dec(&stream->chan->refcount);
if (!uatomic_read(&stream->chan->refcount)
- && !uatomic_read(&stream->chan->nb_init_streams)) {
+ && !uatomic_read(&stream->chan->nb_init_stream_left)) {
free_chan = stream->chan;
}
consumer_del_channel(free_chan);
}
-free_stream:
- call_rcu(&stream->node.head, consumer_free_stream);
+free_stream_rcu:
+ call_rcu(&stream->node.head, free_stream_rcu);
}
-struct lttng_consumer_stream *consumer_allocate_stream(
- int channel_key, int stream_key,
- int shm_fd, int wait_fd,
+struct lttng_consumer_stream *consumer_allocate_stream(int channel_key,
+ int stream_key,
enum lttng_consumer_stream_state state,
- uint64_t mmap_len,
- enum lttng_event_output output,
- const char *path_name,
+ const char *channel_name,
uid_t uid,
gid_t gid,
- int net_index,
- int metadata_flag,
+ int relayd_id,
uint64_t session_id,
- int *alloc_ret)
+ int cpu,
+ int *alloc_ret,
+ enum consumer_channel_type type)
{
+ int ret;
struct lttng_consumer_stream *stream;
stream = zmalloc(sizeof(*stream));
if (stream == NULL) {
PERROR("malloc struct lttng_consumer_stream");
- *alloc_ret = -ENOMEM;
+ ret = -ENOMEM;
goto end;
}
rcu_read_lock();
- /*
- * Get stream's channel reference. Needed when adding the stream to the
- * global hash table.
- */
- stream->chan = consumer_find_channel(channel_key);
- if (!stream->chan) {
- *alloc_ret = -ENOENT;
- ERR("Unable to find channel for stream %d", stream_key);
- goto error;
- }
-
stream->key = stream_key;
- stream->shm_fd = shm_fd;
- stream->wait_fd = wait_fd;
stream->out_fd = -1;
stream->out_fd_offset = 0;
stream->state = state;
- stream->mmap_len = mmap_len;
- stream->mmap_base = NULL;
- stream->output = output;
stream->uid = uid;
stream->gid = gid;
- stream->net_seq_idx = net_index;
- stream->metadata_flag = metadata_flag;
+ stream->net_seq_idx = relayd_id;
stream->session_id = session_id;
- strncpy(stream->path_name, path_name, sizeof(stream->path_name));
- stream->path_name[sizeof(stream->path_name) - 1] = '\0';
pthread_mutex_init(&stream->lock, NULL);
- /*
- * Index differently the metadata node because the thread is using an
- * internal hash table to match streams in the metadata_ht to the epoll set
- * file descriptor.
- */
- if (metadata_flag) {
- lttng_ht_node_init_ulong(&stream->node, stream->wait_fd);
+ /* If channel is the metadata, flag this stream as metadata. */
+ if (type == CONSUMER_CHANNEL_TYPE_METADATA) {
+ stream->metadata_flag = 1;
+ /* Metadata is flat out. */
+ strncpy(stream->name, DEFAULT_METADATA_NAME, sizeof(stream->name));
} else {
- lttng_ht_node_init_ulong(&stream->node, stream->key);
+ /* Format stream name to <channel_name>_<cpu_number> */
+ ret = snprintf(stream->name, sizeof(stream->name), "%s_%d",
+ channel_name, cpu);
+ if (ret < 0) {
+ PERROR("snprintf stream name");
+ goto error;
+ }
}
+ /* Key is always the wait_fd for streams. */
+ lttng_ht_node_init_ulong(&stream->node, stream->key);
+
/* Init session id node with the stream session id */
lttng_ht_node_init_ulong(&stream->node_session_id, stream->session_id);
- /*
- * The cpu number is needed before using any ustctl_* actions. Ignored for
- * the kernel so the value does not matter.
- */
- pthread_mutex_lock(&consumer_data.lock);
- stream->cpu = stream->chan->cpucount++;
- pthread_mutex_unlock(&consumer_data.lock);
-
- DBG3("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu,"
- " out_fd %d, net_seq_idx %d, session_id %" PRIu64,
- stream->path_name, stream->key, stream->shm_fd, stream->wait_fd,
- (unsigned long long) stream->mmap_len, stream->out_fd,
- stream->net_seq_idx, stream->session_id);
+ DBG3("Allocated stream %s (key %d, relayd_id %d, session_id %" PRIu64,
+ stream->name, stream->key, stream->net_seq_idx, stream->session_id);
rcu_read_unlock();
return stream;
rcu_read_unlock();
free(stream);
end:
+ if (alloc_ret) {
+ *alloc_ret = ret;
+ }
return NULL;
}
/*
* Add a stream to the global list protected by a mutex.
*/
-static int consumer_add_stream(struct lttng_consumer_stream *stream,
+static int add_stream(struct lttng_consumer_stream *stream,
struct lttng_ht *ht)
{
int ret = 0;
rcu_read_lock();
/* Steal stream identifier to avoid having streams with the same key */
- consumer_steal_stream_key(stream->key, ht);
+ steal_stream_key(stream->key, ht);
lttng_ht_add_unique_ulong(ht, &stream->node);
uatomic_inc(&stream->chan->refcount);
/*
- * When nb_init_streams reaches 0, we don't need to trigger any action in
- * terms of destroying the associated channel, because the action that
+ * When nb_init_stream_left reaches 0, we don't need to trigger any action
+ * in terms of destroying the associated channel, because the action that
* causes the count to become 0 also causes a stream to be added. The
* channel deletion will thus be triggered by the following removal of this
* stream.
*/
- if (uatomic_read(&stream->chan->nb_init_streams) > 0) {
- uatomic_dec(&stream->chan->nb_init_streams);
+ if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
+ uatomic_dec(&stream->chan->nb_init_stream_left);
}
/* Update consumer data once the node is inserted. */
struct lttng_ht_node_ulong *node;
struct lttng_ht_iter iter;
- if (relayd == NULL) {
- ret = -1;
- goto end;
- }
+ assert(relayd);
lttng_ht_lookup(consumer_data.relayd_ht,
(void *)((unsigned long) relayd->net_seq_idx), &iter);
node = lttng_ht_iter_get_node_ulong(&iter);
if (node != NULL) {
- /* Relayd already exist. Ignore the insertion */
goto end;
}
lttng_ht_add_unique_ulong(consumer_data.relayd_ht, &relayd->node);
* this next value, 1 should always be substracted in order to compare
* the last seen sequence number on the relayd side to the last sent.
*/
- data_hdr.net_seq_num = htobe64(stream->next_net_seq_num++);
+ data_hdr.net_seq_num = htobe64(stream->next_net_seq_num);
/* Other fields are zeroed previously */
ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr,
goto error;
}
+ ++stream->next_net_seq_num;
+
/* Set to go on data socket */
outfd = relayd->data_sock.fd;
}
return outfd;
}
-static
-void consumer_free_channel(struct rcu_head *head)
-{
- struct lttng_ht_node_ulong *node =
- caa_container_of(head, struct lttng_ht_node_ulong, head);
- struct lttng_consumer_channel *channel =
- caa_container_of(node, struct lttng_consumer_channel, node);
-
- free(channel);
-}
-
/*
- * Remove a channel from the global list protected by a mutex. This
- * function is also responsible for freeing its data structures.
+ * Allocate and return a new lttng_consumer_channel object using the given key
+ * to initialize the hash table node.
+ *
+ * On error, return NULL.
*/
-void consumer_del_channel(struct lttng_consumer_channel *channel)
-{
- int ret;
- struct lttng_ht_iter iter;
-
- DBG("Consumer delete channel key %d", channel->key);
-
- pthread_mutex_lock(&consumer_data.lock);
-
- switch (consumer_data.type) {
- case LTTNG_CONSUMER_KERNEL:
- break;
- case LTTNG_CONSUMER32_UST:
- case LTTNG_CONSUMER64_UST:
- lttng_ustconsumer_del_channel(channel);
- break;
- default:
- ERR("Unknown consumer_data type");
- assert(0);
- goto end;
- }
-
- rcu_read_lock();
- iter.iter.node = &channel->node.node;
- ret = lttng_ht_del(consumer_data.channel_ht, &iter);
- assert(!ret);
- rcu_read_unlock();
-
- if (channel->mmap_base != NULL) {
- ret = munmap(channel->mmap_base, channel->mmap_len);
- if (ret != 0) {
- PERROR("munmap");
- }
- }
- if (channel->wait_fd >= 0 && !channel->wait_fd_is_copy) {
- ret = close(channel->wait_fd);
- if (ret) {
- PERROR("close");
- }
- }
- if (channel->shm_fd >= 0 && channel->wait_fd != channel->shm_fd) {
- ret = close(channel->shm_fd);
- if (ret) {
- PERROR("close");
- }
- }
-
- call_rcu(&channel->node.head, consumer_free_channel);
-end:
- pthread_mutex_unlock(&consumer_data.lock);
-}
-
-struct lttng_consumer_channel *consumer_allocate_channel(
- int channel_key,
- int shm_fd, int wait_fd,
- uint64_t mmap_len,
- uint64_t max_sb_size,
- unsigned int nb_init_streams)
+struct lttng_consumer_channel *consumer_allocate_channel(unsigned long key,
+ uint64_t session_id,
+ const char *pathname,
+ const char *name,
+ uid_t uid,
+ gid_t gid,
+ int relayd_id,
+ enum lttng_event_output output)
{
struct lttng_consumer_channel *channel;
- int ret;
channel = zmalloc(sizeof(*channel));
if (channel == NULL) {
PERROR("malloc struct lttng_consumer_channel");
goto end;
}
- channel->key = channel_key;
- channel->shm_fd = shm_fd;
- channel->wait_fd = wait_fd;
- channel->mmap_len = mmap_len;
- channel->max_sb_size = max_sb_size;
+
+ channel->key = key;
channel->refcount = 0;
- channel->nb_init_streams = nb_init_streams;
+ channel->session_id = session_id;
+ channel->uid = uid;
+ channel->gid = gid;
+ channel->relayd_id = relayd_id;
+ channel->output = output;
+
+ strncpy(channel->pathname, pathname, sizeof(channel->pathname));
+ channel->pathname[sizeof(channel->pathname) - 1] = '\0';
+
+ strncpy(channel->name, name, sizeof(channel->name));
+ channel->name[sizeof(channel->name) - 1] = '\0';
+
lttng_ht_node_init_ulong(&channel->node, channel->key);
+ CDS_INIT_LIST_HEAD(&channel->streams.head);
+
+ DBG("Allocated channel (key %d)", channel->key)
- switch (consumer_data.type) {
- case LTTNG_CONSUMER_KERNEL:
- channel->mmap_base = NULL;
- channel->mmap_len = 0;
- break;
- case LTTNG_CONSUMER32_UST:
- case LTTNG_CONSUMER64_UST:
- ret = lttng_ustconsumer_allocate_channel(channel);
- if (ret) {
- free(channel);
- return NULL;
- }
- break;
- default:
- ERR("Unknown consumer_data type");
- assert(0);
- goto end;
- }
- DBG("Allocated channel (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, max_sb_size %llu)",
- channel->key, channel->shm_fd, channel->wait_fd,
- (unsigned long long) channel->mmap_len,
- (unsigned long long) channel->max_sb_size);
end:
return channel;
}
*/
int consumer_add_channel(struct lttng_consumer_channel *channel)
{
+ int ret = 0;
struct lttng_ht_node_ulong *node;
struct lttng_ht_iter iter;
pthread_mutex_lock(&consumer_data.lock);
- /* Steal channel identifier, for UST */
- consumer_steal_channel_key(channel->key);
rcu_read_lock();
lttng_ht_lookup(consumer_data.channel_ht,
node = lttng_ht_iter_get_node_ulong(&iter);
if (node != NULL) {
/* Channel already exist. Ignore the insertion */
+ ERR("Consumer add channel key %d already exists!", channel->key);
+ ret = -1;
goto end;
}
rcu_read_unlock();
pthread_mutex_unlock(&consumer_data.lock);
- return 0;
+ return ret;
}
/*
*
* Returns the number of fds in the structures.
*/
-static int consumer_update_poll_array(
- struct lttng_consumer_local_data *ctx, struct pollfd **pollfd,
- struct lttng_consumer_stream **local_stream, struct lttng_ht *ht)
+static int update_poll_array(struct lttng_consumer_local_data *ctx,
+ struct pollfd **pollfd, struct lttng_consumer_stream **local_stream,
+ struct lttng_ht *ht)
{
int i = 0;
struct lttng_ht_iter iter;
struct lttng_consumer_stream *stream;
+ assert(ctx);
+ assert(ht);
+ assert(pollfd);
+ assert(local_stream);
+
DBG("Updating poll fd array");
rcu_read_lock();
cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
/*
* Set the error socket.
*/
-void lttng_consumer_set_error_sock(
- struct lttng_consumer_local_data *ctx, int sock)
+void lttng_consumer_set_error_sock(struct lttng_consumer_local_data *ctx,
+ int sock)
{
ctx->consumer_error_socket = sock;
}
* Send return code to the session daemon.
* If the socket is not defined, we return 0, it is not a fatal error
*/
-int lttng_consumer_send_error(
- struct lttng_consumer_local_data *ctx, int cmd)
+int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, int cmd)
{
if (ctx->consumer_error_socket > 0) {
return lttcomm_send_unix_sock(ctx->consumer_error_socket, &cmd,
void lttng_consumer_cleanup(void)
{
struct lttng_ht_iter iter;
- struct lttng_ht_node_ulong *node;
+ struct lttng_consumer_channel *channel;
rcu_read_lock();
- cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, node,
- node) {
- struct lttng_consumer_channel *channel =
- caa_container_of(node, struct lttng_consumer_channel, node);
+ cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, channel,
+ node.node) {
consumer_del_channel(channel);
}
* Don't care about error values, as these are just hints and ways to
* limit the amount of page cache used.
*/
- if (orig_offset < stream->chan->max_sb_size) {
+ if (orig_offset < stream->max_sb_size) {
return;
}
- lttng_sync_file_range(outfd, orig_offset - stream->chan->max_sb_size,
- stream->chan->max_sb_size,
+ lttng_sync_file_range(outfd, orig_offset - stream->max_sb_size,
+ stream->max_sb_size,
SYNC_FILE_RANGE_WAIT_BEFORE
| SYNC_FILE_RANGE_WRITE
| SYNC_FILE_RANGE_WAIT_AFTER);
* defined. So it can be expected to lead to lower throughput in
* streaming.
*/
- posix_fadvise(outfd, orig_offset - stream->chan->max_sb_size,
- stream->chan->max_sb_size, POSIX_FADV_DONTNEED);
+ posix_fadvise(outfd, orig_offset - stream->max_sb_size,
+ stream->max_sb_size, POSIX_FADV_DONTNEED);
}
/*
*/
static int write_relayd_metadata_id(int fd,
struct lttng_consumer_stream *stream,
- struct consumer_relayd_sock_pair *relayd,
- unsigned long padding)
+ struct consumer_relayd_sock_pair *relayd, unsigned long padding)
{
int ret;
struct lttcomm_relayd_metadata_payload hdr;
unsigned long padding)
{
unsigned long mmap_offset;
+ void *mmap_base;
ssize_t ret = 0, written = 0;
off_t orig_offset = stream->out_fd_offset;
/* Default is on the disk */
/* get the offset inside the fd to mmap */
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
+ mmap_base = stream->mmap_base;
ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
break;
case LTTNG_CONSUMER32_UST:
case LTTNG_CONSUMER64_UST:
- ret = lttng_ustctl_get_mmap_read_offset(stream->chan->handle,
- stream->buf, &mmap_offset);
+ mmap_base = lttng_ustctl_get_mmap_base(stream);
+ if (!mmap_base) {
+ ERR("read mmap get mmap base for stream %s", stream->name);
+ written = -1;
+ goto end;
+ }
+ ret = lttng_ustctl_get_mmap_read_offset(stream, &mmap_offset);
break;
default:
ERR("Unknown consumer_data type");
while (len > 0) {
do {
- ret = write(outfd, stream->mmap_base + mmap_offset, len);
+ ret = write(outfd, mmap_base + mmap_offset, len);
} while (ret < 0 && errno == EINTR);
DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
if (ret < 0) {
*
* Returns 0 on success, < 0 on error
*/
-int lttng_consumer_take_snapshot(struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_stream *stream)
+int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream)
{
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
- return lttng_kconsumer_take_snapshot(ctx, stream);
+ return lttng_kconsumer_take_snapshot(stream);
case LTTNG_CONSUMER32_UST:
case LTTNG_CONSUMER64_UST:
- return lttng_ustconsumer_take_snapshot(ctx, stream);
+ return lttng_ustconsumer_take_snapshot(stream);
default:
ERR("Unknown consumer_data type");
assert(0);
*
* Returns 0 on success, < 0 on error
*/
-int lttng_consumer_get_produced_snapshot(
- struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_stream *stream,
+int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
unsigned long *pos)
{
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
- return lttng_kconsumer_get_produced_snapshot(ctx, stream, pos);
+ return lttng_kconsumer_get_produced_snapshot(stream, pos);
case LTTNG_CONSUMER32_UST:
case LTTNG_CONSUMER64_UST:
- return lttng_ustconsumer_get_produced_snapshot(ctx, stream, pos);
+ return lttng_ustconsumer_get_produced_snapshot(stream, pos);
default:
ERR("Unknown consumer_data type");
assert(0);
if (ht == NULL) {
/* Means the stream was allocated but not successfully added */
- goto free_stream;
+ goto free_stream_rcu;
}
pthread_mutex_lock(&consumer_data.lock);
}
}
- if (stream->wait_fd >= 0 && !stream->wait_fd_is_copy) {
- ret = close(stream->wait_fd);
- if (ret) {
- PERROR("close");
- }
- }
-
- if (stream->shm_fd >= 0 && stream->wait_fd != stream->shm_fd) {
- ret = close(stream->shm_fd);
- if (ret) {
- PERROR("close");
- }
- }
-
/* Check and cleanup relayd */
rcu_read_lock();
relayd = consumer_find_relayd(stream->net_seq_idx);
/* Atomically decrement channel refcount since other threads can use it. */
uatomic_dec(&stream->chan->refcount);
if (!uatomic_read(&stream->chan->refcount)
- && !uatomic_read(&stream->chan->nb_init_streams)) {
+ && !uatomic_read(&stream->chan->nb_init_stream_left)) {
/* Go for channel deletion! */
free_chan = stream->chan;
}
consumer_del_channel(free_chan);
}
-free_stream:
- call_rcu(&stream->node.head, consumer_free_stream);
+free_stream_rcu:
+ call_rcu(&stream->node.head, free_stream_rcu);
}
/*
* Action done with the metadata stream when adding it to the consumer internal
* data structures to handle it.
*/
-static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
+static int add_metadata_stream(struct lttng_consumer_stream *stream,
struct lttng_ht *ht)
{
int ret = 0;
assert(stream);
assert(ht);
- DBG3("Adding metadata stream %d to hash table", stream->wait_fd);
+ DBG3("Adding metadata stream %d to hash table", stream->key);
pthread_mutex_lock(&consumer_data.lock);
pthread_mutex_lock(&stream->lock);
* Lookup the stream just to make sure it does not exist in our internal
* state. This should NEVER happen.
*/
- lttng_ht_lookup(ht, (void *)((unsigned long) stream->wait_fd), &iter);
+ lttng_ht_lookup(ht, (void *)((unsigned long) stream->key), &iter);
node = lttng_ht_iter_get_node_ulong(&iter);
assert(!node);
uatomic_inc(&stream->chan->refcount);
/*
- * When nb_init_streams reaches 0, we don't need to trigger any action in
- * terms of destroying the associated channel, because the action that
+ * When nb_init_stream_left reaches 0, we don't need to trigger any action
+ * in terms of destroying the associated channel, because the action that
* causes the count to become 0 also causes a stream to be added. The
* channel deletion will thus be triggered by the following removal of this
* stream.
*/
- if (uatomic_read(&stream->chan->nb_init_streams) > 0) {
- uatomic_dec(&stream->chan->nb_init_streams);
+ if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
+ uatomic_dec(&stream->chan->nb_init_stream_left);
}
lttng_ht_add_unique_ulong(ht, &stream->node);
DBG("Adding metadata stream %d to poll set",
stream->wait_fd);
- ret = consumer_add_metadata_stream(stream, metadata_ht);
+ ret = add_metadata_stream(stream, metadata_ht);
if (ret) {
ERR("Unable to add metadata stream");
/* Stream was not setup properly. Continuing. */
*/
pthread_mutex_lock(&consumer_data.lock);
if (consumer_data.need_update) {
- if (pollfd != NULL) {
- free(pollfd);
- pollfd = NULL;
- }
- if (local_stream != NULL) {
- free(local_stream);
- local_stream = NULL;
- }
+ free(pollfd);
+ pollfd = NULL;
+
+ free(local_stream);
+ local_stream = NULL;
/* allocate for all fds + 1 for the consumer_data_pipe */
pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
pthread_mutex_unlock(&consumer_data.lock);
goto end;
}
- ret = consumer_update_poll_array(ctx, &pollfd, local_stream,
+ ret = update_poll_array(ctx, &pollfd, local_stream,
data_ht);
if (ret < 0) {
ERR("Error in allocating pollfd or local_outfds");
* array update over low-priority reads.
*/
if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
- size_t pipe_readlen;
+ ssize_t pipe_readlen;
DBG("consumer_data_pipe wake up");
/* Consume 1 byte of pipe data */
continue;
}
- ret = consumer_add_stream(new_stream, data_ht);
+ ret = add_stream(new_stream, data_ht);
if (ret) {
ERR("Consumer add stream %d failed. Continuing",
new_stream->key);
}
end:
DBG("polling thread exiting");
- if (pollfd != NULL) {
- free(pollfd);
- pollfd = NULL;
- }
- if (local_stream != NULL) {
- free(local_stream);
- local_stream = NULL;
- }
+ free(pollfd);
+ free(local_stream);
/*
* Close the write side of the pipe so epoll_wait() in
DBG("consumer_thread_receive_fds received quit from signal");
goto end;
}
- DBG("received fds on sock");
+ DBG("received command on sock");
}
end:
- DBG("consumer_thread_receive_fds exiting");
+ DBG("Consumer thread sessiond poll exiting");
/*
* when all fds have hung up, the polling thread
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
ret = -1;
fd = -1; /* Just in case it gets set with an invalid value. */
- goto error;
+ goto error_close;
}
/* We have the fds without error. Send status back. */
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
}
if (ret < 0) {
+ /*
+ * Close all sockets of a relayd object. It will be freed if it was
+ * created at the error code path or else it will be garbage
+ * collect.
+ */
+ (void) relayd_close(&relayd->control_sock);
+ (void) relayd_close(&relayd->data_sock);
goto error;
}
}
}
+error_close:
if (relayd_created) {
- /* We just want to cleanup. Ignore ret value. */
- (void) relayd_close(&relayd->control_sock);
- (void) relayd_close(&relayd->data_sock);
free(relayd);
}
return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
}
+
+/*
+ * Send a channel status message to the sessiond daemon.
+ *
+ * Return the sendmsg() return value.
+ */
+int consumer_send_status_channel(int sock,
+ struct lttng_consumer_channel *channel)
+{
+ struct lttcomm_consumer_status_channel msg;
+
+ assert(sock >= 0);
+
+ if (!channel) {
+ msg.ret_code = -LTTNG_ERR_UST_CHAN_FAIL;
+ } else {
+ msg.ret_code = LTTNG_OK;
+ msg.key = channel->key;
+ msg.stream_count = channel->streams.count;
+ }
+
+ return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
+}