volatile int consumer_quit;
/*
- * The following two hash tables are visible by all threads which are separated
- * in different source files.
- *
* Global hash table containing respectively metadata and data streams. The
* stream element in this ht should only be updated by the metadata poll thread
* for the metadata and the data poll thread for the data.
*/
-struct lttng_ht *metadata_ht;
-struct lttng_ht *data_ht;
+static struct lttng_ht *metadata_ht;
+static struct lttng_ht *data_ht;
/*
* Notify a thread pipe to poll back again. This usually means that some global
* Find a stream. The consumer_data.lock must be locked during this
* call.
*/
-static struct lttng_consumer_stream *consumer_find_stream(int key,
+static struct lttng_consumer_stream *find_stream(int key,
struct lttng_ht *ht)
{
struct lttng_ht_iter iter;
return stream;
}
-void consumer_steal_stream_key(int key, struct lttng_ht *ht)
+static void steal_stream_key(int key, struct lttng_ht *ht)
{
struct lttng_consumer_stream *stream;
rcu_read_lock();
- stream = consumer_find_stream(key, ht);
+ stream = find_stream(key, ht);
if (stream) {
stream->key = -1;
/*
rcu_read_unlock();
}
-static struct lttng_consumer_channel *consumer_find_channel(int key)
+/*
+ * Return a channel object for the given key.
+ *
+ * RCU read side lock MUST be acquired before calling this function and
+ * protects the channel ptr.
+ */
+struct lttng_consumer_channel *consumer_find_channel(unsigned long key)
{
struct lttng_ht_iter iter;
struct lttng_ht_node_ulong *node;
return NULL;
}
- rcu_read_lock();
-
- lttng_ht_lookup(consumer_data.channel_ht, (void *)((unsigned long) key),
- &iter);
+ lttng_ht_lookup(consumer_data.channel_ht, (void *) key, &iter);
node = lttng_ht_iter_get_node_ulong(&iter);
if (node != NULL) {
channel = caa_container_of(node, struct lttng_consumer_channel, node);
}
- rcu_read_unlock();
-
return channel;
}
-static void consumer_steal_channel_key(int key)
+static void free_stream_rcu(struct rcu_head *head)
{
- struct lttng_consumer_channel *channel;
+ struct lttng_ht_node_ulong *node =
+ caa_container_of(head, struct lttng_ht_node_ulong, head);
+ struct lttng_consumer_stream *stream =
+ caa_container_of(node, struct lttng_consumer_stream, node);
- rcu_read_lock();
- channel = consumer_find_channel(key);
- if (channel) {
- channel->key = -1;
- /*
- * We don't want the lookup to match, but we still need
- * to iterate on this channel when iterating over the hash table. Just
- * change the node key.
- */
- channel->node.key = -1;
- }
- rcu_read_unlock();
+ free(stream);
}
-static
-void consumer_free_stream(struct rcu_head *head)
+static void free_channel_rcu(struct rcu_head *head)
{
struct lttng_ht_node_ulong *node =
caa_container_of(head, struct lttng_ht_node_ulong, head);
- struct lttng_consumer_stream *stream =
- caa_container_of(node, struct lttng_consumer_stream, node);
+ struct lttng_consumer_channel *channel =
+ caa_container_of(node, struct lttng_consumer_channel, node);
- free(stream);
+ free(channel);
}
/*
* RCU protected relayd socket pair free.
*/
-static void consumer_rcu_free_relayd(struct rcu_head *head)
+static void free_relayd_rcu(struct rcu_head *head)
{
struct lttng_ht_node_ulong *node =
caa_container_of(head, struct lttng_ht_node_ulong, head);
}
/* RCU free() call */
- call_rcu(&relayd->node.head, consumer_rcu_free_relayd);
+ call_rcu(&relayd->node.head, free_relayd_rcu);
+}
+
+/*
+ * Remove a channel from the global list protected by a mutex. This function is
+ * also responsible for freeing its data structures.
+ */
+void consumer_del_channel(struct lttng_consumer_channel *channel)
+{
+	int ret;
+	struct lttng_ht_iter iter;
+
+	/*
+	 * NOTE(review): the channel key is received as an unsigned long by
+	 * consumer_allocate_channel(); confirm the %d format below matches
+	 * the actual type of the key field.
+	 */
+	DBG("Consumer delete channel key %d", channel->key);
+
+	pthread_mutex_lock(&consumer_data.lock);
+
+	/* Let the domain-specific consumer tear down its own channel state. */
+	switch (consumer_data.type) {
+	case LTTNG_CONSUMER_KERNEL:
+		break;
+	case LTTNG_CONSUMER32_UST:
+	case LTTNG_CONSUMER64_UST:
+		lttng_ustconsumer_del_channel(channel);
+		break;
+	default:
+		ERR("Unknown consumer_data type");
+		assert(0);
+		/* Skip HT removal and free; only release the lock. */
+		goto end;
+	}
+
+	/* Unlink the channel node from the global channel hash table. */
+	rcu_read_lock();
+	iter.iter.node = &channel->node.node;
+	ret = lttng_ht_del(consumer_data.channel_ht, &iter);
+	assert(!ret);
+	rcu_read_unlock();
+
+	/* Defer the free until all current RCU readers are done. */
+	call_rcu(&channel->node.head, free_channel_rcu);
+end:
+	pthread_mutex_unlock(&consumer_data.lock);
+}
+
+/*
+ * Iterate over the relayd hash table and destroy each element. Finally,
+ * destroy the whole hash table.
+ */
+static void cleanup_relayd_ht(void)
+{
+	struct lttng_ht_iter iter;
+	struct consumer_relayd_sock_pair *relayd;
+
+	rcu_read_lock();
+
+	/*
+	 * NOTE(review): no lttng_ht_del() is done here, so this assumes
+	 * destroy_relayd() removes each node from the hash table itself —
+	 * confirm before lttng_ht_destroy() below.
+	 */
+	cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd,
+			node.node) {
+		destroy_relayd(relayd);
+	}
+
+	lttng_ht_destroy(consumer_data.relayd_ht);
+
+	rcu_read_unlock();
}
/*
if (ht == NULL) {
/* Means the stream was allocated but not successfully added */
- goto free_stream;
+ goto free_stream_rcu;
}
- pthread_mutex_lock(&stream->lock);
pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&stream->lock);
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
PERROR("close");
}
}
- if (stream->wait_fd >= 0 && !stream->wait_fd_is_copy) {
- ret = close(stream->wait_fd);
- if (ret) {
- PERROR("close");
- }
- }
- if (stream->shm_fd >= 0 && stream->wait_fd != stream->shm_fd) {
- ret = close(stream->shm_fd);
- if (ret) {
- PERROR("close");
- }
- }
/* Check and cleanup relayd */
rcu_read_lock();
uatomic_dec(&stream->chan->refcount);
if (!uatomic_read(&stream->chan->refcount)
- && !uatomic_read(&stream->chan->nb_init_streams)) {
+ && !uatomic_read(&stream->chan->nb_init_stream_left)) {
free_chan = stream->chan;
}
end:
consumer_data.need_update = 1;
- pthread_mutex_unlock(&consumer_data.lock);
pthread_mutex_unlock(&stream->lock);
+ pthread_mutex_unlock(&consumer_data.lock);
if (free_chan) {
consumer_del_channel(free_chan);
}
-free_stream:
- call_rcu(&stream->node.head, consumer_free_stream);
+free_stream_rcu:
+ call_rcu(&stream->node.head, free_stream_rcu);
}
-struct lttng_consumer_stream *consumer_allocate_stream(
- int channel_key, int stream_key,
- int shm_fd, int wait_fd,
+struct lttng_consumer_stream *consumer_allocate_stream(int channel_key,
+ int stream_key,
enum lttng_consumer_stream_state state,
- uint64_t mmap_len,
- enum lttng_event_output output,
- const char *path_name,
+ const char *channel_name,
uid_t uid,
gid_t gid,
- int net_index,
- int metadata_flag,
+ int relayd_id,
uint64_t session_id,
- int *alloc_ret)
+ int cpu,
+ int *alloc_ret,
+ enum consumer_channel_type type)
{
+ int ret;
struct lttng_consumer_stream *stream;
stream = zmalloc(sizeof(*stream));
if (stream == NULL) {
PERROR("malloc struct lttng_consumer_stream");
- *alloc_ret = -ENOMEM;
+ ret = -ENOMEM;
goto end;
}
- /*
- * Get stream's channel reference. Needed when adding the stream to the
- * global hash table.
- */
- stream->chan = consumer_find_channel(channel_key);
- if (!stream->chan) {
- *alloc_ret = -ENOENT;
- ERR("Unable to find channel for stream %d", stream_key);
- goto error;
- }
+ rcu_read_lock();
stream->key = stream_key;
- stream->shm_fd = shm_fd;
- stream->wait_fd = wait_fd;
stream->out_fd = -1;
stream->out_fd_offset = 0;
stream->state = state;
- stream->mmap_len = mmap_len;
- stream->mmap_base = NULL;
- stream->output = output;
stream->uid = uid;
stream->gid = gid;
- stream->net_seq_idx = net_index;
- stream->metadata_flag = metadata_flag;
+ stream->net_seq_idx = relayd_id;
stream->session_id = session_id;
- strncpy(stream->path_name, path_name, sizeof(stream->path_name));
- stream->path_name[sizeof(stream->path_name) - 1] = '\0';
pthread_mutex_init(&stream->lock, NULL);
- /*
- * Index differently the metadata node because the thread is using an
- * internal hash table to match streams in the metadata_ht to the epoll set
- * file descriptor.
- */
- if (metadata_flag) {
- lttng_ht_node_init_ulong(&stream->node, stream->wait_fd);
+ /* If channel is the metadata, flag this stream as metadata. */
+ if (type == CONSUMER_CHANNEL_TYPE_METADATA) {
+ stream->metadata_flag = 1;
+ /* Metadata is flat out. */
+ strncpy(stream->name, DEFAULT_METADATA_NAME, sizeof(stream->name));
} else {
- lttng_ht_node_init_ulong(&stream->node, stream->key);
+ /* Format stream name to <channel_name>_<cpu_number> */
+ ret = snprintf(stream->name, sizeof(stream->name), "%s_%d",
+ channel_name, cpu);
+ if (ret < 0) {
+ PERROR("snprintf stream name");
+ goto error;
+ }
}
+ /* Key is always the wait_fd for streams. */
+ lttng_ht_node_init_ulong(&stream->node, stream->key);
+
/* Init session id node with the stream session id */
lttng_ht_node_init_ulong(&stream->node_session_id, stream->session_id);
- /*
- * The cpu number is needed before using any ustctl_* actions. Ignored for
- * the kernel so the value does not matter.
- */
- pthread_mutex_lock(&consumer_data.lock);
- stream->cpu = stream->chan->cpucount++;
- pthread_mutex_unlock(&consumer_data.lock);
+ DBG3("Allocated stream %s (key %d, relayd_id %d, session_id %" PRIu64,
+ stream->name, stream->key, stream->net_seq_idx, stream->session_id);
- DBG3("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu,"
- " out_fd %d, net_seq_idx %d, session_id %" PRIu64,
- stream->path_name, stream->key, stream->shm_fd, stream->wait_fd,
- (unsigned long long) stream->mmap_len, stream->out_fd,
- stream->net_seq_idx, stream->session_id);
+ rcu_read_unlock();
return stream;
error:
+ rcu_read_unlock();
free(stream);
end:
+ if (alloc_ret) {
+ *alloc_ret = ret;
+ }
return NULL;
}
/*
* Add a stream to the global list protected by a mutex.
*/
-static int consumer_add_stream(struct lttng_consumer_stream *stream,
+static int add_stream(struct lttng_consumer_stream *stream,
struct lttng_ht *ht)
{
int ret = 0;
DBG3("Adding consumer stream %d", stream->key);
pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&stream->lock);
rcu_read_lock();
/* Steal stream identifier to avoid having streams with the same key */
- consumer_steal_stream_key(stream->key, ht);
+ steal_stream_key(stream->key, ht);
lttng_ht_add_unique_ulong(ht, &stream->node);
uatomic_inc(&stream->chan->refcount);
/*
- * When nb_init_streams reaches 0, we don't need to trigger any action in
- * terms of destroying the associated channel, because the action that
+ * When nb_init_stream_left reaches 0, we don't need to trigger any action
+ * in terms of destroying the associated channel, because the action that
* causes the count to become 0 also causes a stream to be added. The
* channel deletion will thus be triggered by the following removal of this
* stream.
*/
- if (uatomic_read(&stream->chan->nb_init_streams) > 0) {
- uatomic_dec(&stream->chan->nb_init_streams);
+ if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
+ uatomic_dec(&stream->chan->nb_init_stream_left);
}
/* Update consumer data once the node is inserted. */
consumer_data.need_update = 1;
rcu_read_unlock();
+ pthread_mutex_unlock(&stream->lock);
pthread_mutex_unlock(&consumer_data.lock);
return ret;
struct lttng_ht_node_ulong *node;
struct lttng_ht_iter iter;
- if (relayd == NULL) {
- ret = -1;
- goto end;
- }
+ assert(relayd);
lttng_ht_lookup(consumer_data.relayd_ht,
(void *)((unsigned long) relayd->net_seq_idx), &iter);
node = lttng_ht_iter_get_node_ulong(&iter);
if (node != NULL) {
- /* Relayd already exist. Ignore the insertion */
goto end;
}
lttng_ht_add_unique_ulong(consumer_data.relayd_ht, &relayd->node);
data_hdr.stream_id = htobe64(stream->relayd_stream_id);
data_hdr.data_size = htobe32(data_size);
data_hdr.padding_size = htobe32(padding);
- data_hdr.net_seq_num = htobe64(stream->next_net_seq_num++);
+ /*
+ * Note that net_seq_num below is assigned with the *current* value of
+ * next_net_seq_num and only after that the next_net_seq_num will be
+ * increment. This is why when issuing a command on the relayd using
+ * this next value, 1 should always be substracted in order to compare
+ * the last seen sequence number on the relayd side to the last sent.
+ */
+ data_hdr.net_seq_num = htobe64(stream->next_net_seq_num);
/* Other fields are zeroed previously */
ret = relayd_send_data_hdr(&relayd->data_sock, &data_hdr,
goto error;
}
+ ++stream->next_net_seq_num;
+
/* Set to go on data socket */
outfd = relayd->data_sock.fd;
}
return outfd;
}
-static
-void consumer_free_channel(struct rcu_head *head)
-{
- struct lttng_ht_node_ulong *node =
- caa_container_of(head, struct lttng_ht_node_ulong, head);
- struct lttng_consumer_channel *channel =
- caa_container_of(node, struct lttng_consumer_channel, node);
-
- free(channel);
-}
-
/*
- * Remove a channel from the global list protected by a mutex. This
- * function is also responsible for freeing its data structures.
+ * Allocate and return a new lttng_consumer_channel object using the given key
+ * to initialize the hash table node.
+ *
+ * On error, return NULL.
*/
-void consumer_del_channel(struct lttng_consumer_channel *channel)
-{
- int ret;
- struct lttng_ht_iter iter;
-
- pthread_mutex_lock(&consumer_data.lock);
-
- switch (consumer_data.type) {
- case LTTNG_CONSUMER_KERNEL:
- break;
- case LTTNG_CONSUMER32_UST:
- case LTTNG_CONSUMER64_UST:
- lttng_ustconsumer_del_channel(channel);
- break;
- default:
- ERR("Unknown consumer_data type");
- assert(0);
- goto end;
- }
-
- rcu_read_lock();
- iter.iter.node = &channel->node.node;
- ret = lttng_ht_del(consumer_data.channel_ht, &iter);
- assert(!ret);
- rcu_read_unlock();
-
- if (channel->mmap_base != NULL) {
- ret = munmap(channel->mmap_base, channel->mmap_len);
- if (ret != 0) {
- PERROR("munmap");
- }
- }
- if (channel->wait_fd >= 0 && !channel->wait_fd_is_copy) {
- ret = close(channel->wait_fd);
- if (ret) {
- PERROR("close");
- }
- }
- if (channel->shm_fd >= 0 && channel->wait_fd != channel->shm_fd) {
- ret = close(channel->shm_fd);
- if (ret) {
- PERROR("close");
- }
- }
-
- call_rcu(&channel->node.head, consumer_free_channel);
-end:
- pthread_mutex_unlock(&consumer_data.lock);
-}
-
-struct lttng_consumer_channel *consumer_allocate_channel(
-		int channel_key,
-		int shm_fd, int wait_fd,
-		uint64_t mmap_len,
-		uint64_t max_sb_size,
-		unsigned int nb_init_streams)
+struct lttng_consumer_channel *consumer_allocate_channel(unsigned long key,
+		uint64_t session_id,
+		const char *pathname,
+		const char *name,
+		uid_t uid,
+		gid_t gid,
+		int relayd_id,
+		enum lttng_event_output output)
{
	struct lttng_consumer_channel *channel;
-	int ret;
	channel = zmalloc(sizeof(*channel));
	if (channel == NULL) {
		PERROR("malloc struct lttng_consumer_channel");
		goto end;
	}
-	channel->key = channel_key;
-	channel->shm_fd = shm_fd;
-	channel->wait_fd = wait_fd;
-	channel->mmap_len = mmap_len;
-	channel->max_sb_size = max_sb_size;
+
+	channel->key = key;
	channel->refcount = 0;
-	channel->nb_init_streams = nb_init_streams;
+	channel->session_id = session_id;
+	channel->uid = uid;
+	channel->gid = gid;
+	channel->relayd_id = relayd_id;
+	channel->output = output;
+
+	/* Bounded copies; both are explicitly NUL-terminated below. */
+	strncpy(channel->pathname, pathname, sizeof(channel->pathname));
+	channel->pathname[sizeof(channel->pathname) - 1] = '\0';
+
+	strncpy(channel->name, name, sizeof(channel->name));
+	channel->name[sizeof(channel->name) - 1] = '\0';
+
	lttng_ht_node_init_ulong(&channel->node, channel->key);
+	CDS_INIT_LIST_HEAD(&channel->streams.head);
+
+	DBG("Allocated channel (key %d)", channel->key);
-	switch (consumer_data.type) {
-	case LTTNG_CONSUMER_KERNEL:
-		channel->mmap_base = NULL;
-		channel->mmap_len = 0;
-		break;
-	case LTTNG_CONSUMER32_UST:
-	case LTTNG_CONSUMER64_UST:
-		ret = lttng_ustconsumer_allocate_channel(channel);
-		if (ret) {
-			free(channel);
-			return NULL;
-		}
-		break;
-	default:
-		ERR("Unknown consumer_data type");
-		assert(0);
-		goto end;
-	}
-	DBG("Allocated channel (key %d, shm_fd %d, wait_fd %d, mmap_len %llu, max_sb_size %llu)",
-		channel->key, channel->shm_fd, channel->wait_fd,
-		(unsigned long long) channel->mmap_len,
-		(unsigned long long) channel->max_sb_size);
end:
	return channel;
}
*/
int consumer_add_channel(struct lttng_consumer_channel *channel)
{
+ int ret = 0;
struct lttng_ht_node_ulong *node;
struct lttng_ht_iter iter;
pthread_mutex_lock(&consumer_data.lock);
- /* Steal channel identifier, for UST */
- consumer_steal_channel_key(channel->key);
rcu_read_lock();
lttng_ht_lookup(consumer_data.channel_ht,
node = lttng_ht_iter_get_node_ulong(&iter);
if (node != NULL) {
/* Channel already exist. Ignore the insertion */
+ ERR("Consumer add channel key %d already exists!", channel->key);
+ ret = -1;
goto end;
}
rcu_read_unlock();
pthread_mutex_unlock(&consumer_data.lock);
- return 0;
+ return ret;
}
/*
*
* Returns the number of fds in the structures.
*/
-static int consumer_update_poll_array(
- struct lttng_consumer_local_data *ctx, struct pollfd **pollfd,
- struct lttng_consumer_stream **local_stream, struct lttng_ht *ht)
+static int update_poll_array(struct lttng_consumer_local_data *ctx,
+ struct pollfd **pollfd, struct lttng_consumer_stream **local_stream,
+ struct lttng_ht *ht)
{
int i = 0;
struct lttng_ht_iter iter;
struct lttng_consumer_stream *stream;
+ assert(ctx);
+ assert(ht);
+ assert(pollfd);
+ assert(local_stream);
+
DBG("Updating poll fd array");
rcu_read_lock();
cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
* changed where this function will be called back again.
*/
if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM ||
- stream->endpoint_status) {
+ stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
continue;
}
DBG("Active FD %d", stream->wait_fd);
/*
* Set the error socket.
*/
-void lttng_consumer_set_error_sock(
- struct lttng_consumer_local_data *ctx, int sock)
+void lttng_consumer_set_error_sock(struct lttng_consumer_local_data *ctx,
+ int sock)
{
ctx->consumer_error_socket = sock;
}
* Send return code to the session daemon.
* If the socket is not defined, we return 0, it is not a fatal error
*/
-int lttng_consumer_send_error(
- struct lttng_consumer_local_data *ctx, int cmd)
+int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, int cmd)
{
if (ctx->consumer_error_socket > 0) {
return lttcomm_send_unix_sock(ctx->consumer_error_socket, &cmd,
}
/*
- * Close all the tracefiles and stream fds, should be called when all instances
- * are destroyed.
+ * Close all the tracefiles and stream fds and MUST be called when all
+ * instances are destroyed i.e. when all threads were joined and are ended.
*/
void lttng_consumer_cleanup(void)
{
struct lttng_ht_iter iter;
- struct lttng_ht_node_ulong *node;
+ struct lttng_consumer_channel *channel;
rcu_read_lock();
- cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, node,
- node) {
- struct lttng_consumer_channel *channel =
- caa_container_of(node, struct lttng_consumer_channel, node);
+ cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, channel,
+ node.node) {
consumer_del_channel(channel);
}
rcu_read_unlock();
lttng_ht_destroy(consumer_data.channel_ht);
+
+ cleanup_relayd_ht();
+
+ /*
+ * This HT contains streams that are freed by either the metadata thread or
+ * the data thread so we do *nothing* on the hash table and simply destroy
+ * it.
+ */
+ lttng_ht_destroy(consumer_data.stream_list_ht);
}
/*
do {
ret = write(ctx->consumer_should_quit[1], "4", 1);
} while (ret < 0 && errno == EINTR);
- if (ret < 0) {
+ if (ret < 0 || ret != 1) {
PERROR("write consumer quit");
}
* Don't care about error values, as these are just hints and ways to
* limit the amount of page cache used.
*/
- if (orig_offset < stream->chan->max_sb_size) {
+ if (orig_offset < stream->max_sb_size) {
return;
}
- lttng_sync_file_range(outfd, orig_offset - stream->chan->max_sb_size,
- stream->chan->max_sb_size,
+ lttng_sync_file_range(outfd, orig_offset - stream->max_sb_size,
+ stream->max_sb_size,
SYNC_FILE_RANGE_WAIT_BEFORE
| SYNC_FILE_RANGE_WRITE
| SYNC_FILE_RANGE_WAIT_AFTER);
* defined. So it can be expected to lead to lower throughput in
* streaming.
*/
- posix_fadvise(outfd, orig_offset - stream->chan->max_sb_size,
- stream->chan->max_sb_size, POSIX_FADV_DONTNEED);
+ posix_fadvise(outfd, orig_offset - stream->max_sb_size,
+ stream->max_sb_size, POSIX_FADV_DONTNEED);
}
/*
*/
static int write_relayd_metadata_id(int fd,
struct lttng_consumer_stream *stream,
- struct consumer_relayd_sock_pair *relayd,
- unsigned long padding)
+ struct consumer_relayd_sock_pair *relayd, unsigned long padding)
{
int ret;
struct lttcomm_relayd_metadata_payload hdr;
do {
ret = write(fd, (void *) &hdr, sizeof(hdr));
} while (ret < 0 && errno == EINTR);
- if (ret < 0) {
- PERROR("write metadata stream id");
+ if (ret < 0 || ret != sizeof(hdr)) {
+ /*
+ * This error means that the fd's end is closed so ignore the perror
+	 * not to clutter the error output since this can happen in a normal
+ * code path.
+ */
+ if (errno != EPIPE) {
+ PERROR("write metadata stream id");
+ }
+ DBG3("Consumer failed to write relayd metadata id (errno: %d)", errno);
+ /*
+ * Set ret to a negative value because if ret != sizeof(hdr), we don't
+ * handle writting the missing part so report that as an error and
+ * don't lie to the caller.
+ */
+ ret = -1;
goto end;
}
DBG("Metadata stream id %" PRIu64 " with padding %lu written before data",
* core function for writing trace buffers to either the local filesystem or
* the network.
*
+ * It must be called with the stream lock held.
+ *
* Careful review MUST be put if any changes occur!
*
* Returns the number of bytes written
unsigned long padding)
{
unsigned long mmap_offset;
+ void *mmap_base;
ssize_t ret = 0, written = 0;
off_t orig_offset = stream->out_fd_offset;
/* Default is on the disk */
/* RCU lock for the relayd pointer */
rcu_read_lock();
- pthread_mutex_lock(&stream->lock);
-
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != -1) {
relayd = consumer_find_relayd(stream->net_seq_idx);
/* get the offset inside the fd to mmap */
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
+ mmap_base = stream->mmap_base;
ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
break;
case LTTNG_CONSUMER32_UST:
case LTTNG_CONSUMER64_UST:
- ret = lttng_ustctl_get_mmap_read_offset(stream->chan->handle,
- stream->buf, &mmap_offset);
+ mmap_base = lttng_ustctl_get_mmap_base(stream);
+ if (!mmap_base) {
+ ERR("read mmap get mmap base for stream %s", stream->name);
+ written = -1;
+ goto end;
+ }
+ ret = lttng_ustctl_get_mmap_read_offset(stream, &mmap_offset);
break;
default:
ERR("Unknown consumer_data type");
while (len > 0) {
do {
- ret = write(outfd, stream->mmap_base + mmap_offset, len);
+ ret = write(outfd, mmap_base + mmap_offset, len);
} while (ret < 0 && errno == EINTR);
DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
if (ret < 0) {
- PERROR("Error in file write");
+ /*
+ * This is possible if the fd is closed on the other side (outfd)
+ * or any write problem. It can be verbose a bit for a normal
+ * execution if for instance the relayd is stopped abruptly. This
+ * can happen so set this to a DBG statement.
+ */
+ DBG("Error in file write mmap");
if (written == 0) {
written = ret;
}
if (relayd && stream->metadata_flag) {
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
}
- pthread_mutex_unlock(&stream->lock);
rcu_read_unlock();
return written;
/*
* Splice the data from the ring buffer to the tracefile.
*
+ * It must be called with the stream lock held.
+ *
* Returns the number of bytes spliced.
*/
ssize_t lttng_consumer_on_read_subbuffer_splice(
/* RCU lock for the relayd pointer */
rcu_read_lock();
- pthread_mutex_lock(&stream->lock);
-
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != -1) {
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd && stream->metadata_flag) {
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
}
- pthread_mutex_unlock(&stream->lock);
rcu_read_unlock();
return written;
*
* Returns 0 on success, < 0 on error
*/
-int lttng_consumer_take_snapshot(struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_stream *stream)
+int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream)
{
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
- return lttng_kconsumer_take_snapshot(ctx, stream);
+ return lttng_kconsumer_take_snapshot(stream);
case LTTNG_CONSUMER32_UST:
case LTTNG_CONSUMER64_UST:
- return lttng_ustconsumer_take_snapshot(ctx, stream);
+ return lttng_ustconsumer_take_snapshot(stream);
default:
ERR("Unknown consumer_data type");
assert(0);
*
* Returns 0 on success, < 0 on error
*/
-int lttng_consumer_get_produced_snapshot(
- struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_stream *stream,
+int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
unsigned long *pos)
{
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
- return lttng_kconsumer_get_produced_snapshot(ctx, stream, pos);
+ return lttng_kconsumer_get_produced_snapshot(stream, pos);
case LTTNG_CONSUMER32_UST:
case LTTNG_CONSUMER64_UST:
- return lttng_ustconsumer_get_produced_snapshot(ctx, stream, pos);
+ return lttng_ustconsumer_get_produced_snapshot(stream, pos);
default:
ERR("Unknown consumer_data type");
assert(0);
*/
static void destroy_data_stream_ht(struct lttng_ht *ht)
{
- int ret;
struct lttng_ht_iter iter;
struct lttng_consumer_stream *stream;
rcu_read_lock();
cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
- ret = lttng_ht_del(ht, &iter);
- assert(!ret);
-
- call_rcu(&stream->node.head, consumer_free_stream);
+ /*
+ * Ignore return value since we are currently cleaning up so any error
+ * can't be handled.
+ */
+ (void) consumer_del_stream(stream, ht);
}
rcu_read_unlock();
*/
static void destroy_stream_ht(struct lttng_ht *ht)
{
- int ret;
struct lttng_ht_iter iter;
struct lttng_consumer_stream *stream;
rcu_read_lock();
cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
- ret = lttng_ht_del(ht, &iter);
- assert(!ret);
-
- call_rcu(&stream->node.head, consumer_free_stream);
+ /*
+ * Ignore return value since we are currently cleaning up so any error
+ * can't be handled.
+ */
+ (void) consumer_del_metadata_stream(stream, ht);
}
rcu_read_unlock();
if (ht == NULL) {
/* Means the stream was allocated but not successfully added */
- goto free_stream;
+ goto free_stream_rcu;
}
+ pthread_mutex_lock(&consumer_data.lock);
pthread_mutex_lock(&stream->lock);
- pthread_mutex_lock(&consumer_data.lock);
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
if (stream->mmap_base != NULL) {
}
}
- if (stream->wait_fd >= 0 && !stream->wait_fd_is_copy) {
- ret = close(stream->wait_fd);
- if (ret) {
- PERROR("close");
- }
- }
-
- if (stream->shm_fd >= 0 && stream->wait_fd != stream->shm_fd) {
- ret = close(stream->shm_fd);
- if (ret) {
- PERROR("close");
- }
- }
-
/* Check and cleanup relayd */
rcu_read_lock();
relayd = consumer_find_relayd(stream->net_seq_idx);
/* Atomically decrement channel refcount since other threads can use it. */
uatomic_dec(&stream->chan->refcount);
if (!uatomic_read(&stream->chan->refcount)
- && !uatomic_read(&stream->chan->nb_init_streams)) {
+ && !uatomic_read(&stream->chan->nb_init_stream_left)) {
/* Go for channel deletion! */
free_chan = stream->chan;
}
end:
- pthread_mutex_unlock(&consumer_data.lock);
pthread_mutex_unlock(&stream->lock);
+ pthread_mutex_unlock(&consumer_data.lock);
if (free_chan) {
consumer_del_channel(free_chan);
}
-free_stream:
- call_rcu(&stream->node.head, consumer_free_stream);
+free_stream_rcu:
+ call_rcu(&stream->node.head, free_stream_rcu);
}
/*
* Action done with the metadata stream when adding it to the consumer internal
* data structures to handle it.
*/
-static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
+static int add_metadata_stream(struct lttng_consumer_stream *stream,
struct lttng_ht *ht)
{
int ret = 0;
struct consumer_relayd_sock_pair *relayd;
+ struct lttng_ht_iter iter;
+ struct lttng_ht_node_ulong *node;
assert(stream);
assert(ht);
- DBG3("Adding metadata stream %d to hash table", stream->wait_fd);
+ DBG3("Adding metadata stream %d to hash table", stream->key);
pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&stream->lock);
/*
* From here, refcounts are updated so be _careful_ when returning an error
*/
rcu_read_lock();
+
+ /*
+ * Lookup the stream just to make sure it does not exist in our internal
+ * state. This should NEVER happen.
+ */
+ lttng_ht_lookup(ht, (void *)((unsigned long) stream->key), &iter);
+ node = lttng_ht_iter_get_node_ulong(&iter);
+ assert(!node);
+
/* Find relayd and, if one is found, increment refcount. */
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd != NULL) {
uatomic_inc(&stream->chan->refcount);
/*
- * When nb_init_streams reaches 0, we don't need to trigger any action in
- * terms of destroying the associated channel, because the action that
+ * When nb_init_stream_left reaches 0, we don't need to trigger any action
+ * in terms of destroying the associated channel, because the action that
* causes the count to become 0 also causes a stream to be added. The
* channel deletion will thus be triggered by the following removal of this
* stream.
*/
- if (uatomic_read(&stream->chan->nb_init_streams) > 0) {
- uatomic_dec(&stream->chan->nb_init_streams);
+ if (uatomic_read(&stream->chan->nb_init_stream_left) > 0) {
+ uatomic_dec(&stream->chan->nb_init_stream_left);
}
- /* Steal stream identifier to avoid having streams with the same key */
- consumer_steal_stream_key(stream->key, ht);
-
lttng_ht_add_unique_ulong(ht, &stream->node);
/*
rcu_read_unlock();
+ pthread_mutex_unlock(&stream->lock);
pthread_mutex_unlock(&consumer_data.lock);
return ret;
}
rcu_read_lock();
cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
/* Validate delete flag of the stream */
- if (stream->endpoint_status != CONSUMER_ENDPOINT_INACTIVE) {
+ if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
continue;
}
/* Delete it right now */
rcu_read_lock();
cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
/* Validate delete flag of the stream */
- if (!stream->endpoint_status) {
+ if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
continue;
}
/*
rcu_register_thread();
+ metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+ if (!metadata_ht) {
+ /* ENOMEM at this point. Better to bail out. */
+ goto error;
+ }
+
DBG("Thread metadata poll started");
/* Size is set to 1 for the consumer_metadata pipe */
DBG("Metadata main loop started");
while (1) {
- lttng_poll_reset(&events);
-
- nb_fd = LTTNG_POLL_GETNB(&events);
-
/* Only the metadata pipe is set */
- if (nb_fd == 0 && consumer_quit == 1) {
+ if (LTTNG_POLL_GETNB(&events) == 0 && consumer_quit == 1) {
goto end;
}
restart:
- DBG("Metadata poll wait with %d fd(s)", nb_fd);
+ DBG("Metadata poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events));
ret = lttng_poll_wait(&events, -1);
DBG("Metadata event catched in thread");
if (ret < 0) {
goto error;
}
+ nb_fd = ret;
+
/* From here, the event is a metadata wait fd */
for (i = 0; i < nb_fd; i++) {
revents = LTTNG_POLL_GETEV(&events, i);
* since their might be data to consume.
*/
lttng_poll_del(&events, ctx->consumer_metadata_pipe[0]);
- close(ctx->consumer_metadata_pipe[0]);
+ ret = close(ctx->consumer_metadata_pipe[0]);
+ if (ret < 0) {
+ PERROR("close metadata pipe");
+ }
continue;
} else if (revents & LPOLLIN) {
do {
if (stream == NULL) {
/* Check for deleted streams. */
validate_endpoint_status_metadata_stream(&events);
- continue;
+ goto restart;
}
DBG("Adding metadata stream %d to poll set",
stream->wait_fd);
- ret = consumer_add_metadata_stream(stream, metadata_ht);
+ ret = add_metadata_stream(stream, metadata_ht);
if (ret) {
ERR("Unable to add metadata stream");
/* Stream was not setup properly. Continuing. */
DBG("Metadata poll thread exiting");
lttng_poll_clean(&events);
- if (metadata_ht) {
- destroy_stream_ht(metadata_ht);
- }
+ destroy_stream_ht(metadata_ht);
rcu_unregister_thread();
return NULL;
data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
if (data_ht == NULL) {
+ /* ENOMEM at this point. Better to bail out. */
goto end;
}
*/
pthread_mutex_lock(&consumer_data.lock);
if (consumer_data.need_update) {
- if (pollfd != NULL) {
- free(pollfd);
- pollfd = NULL;
- }
- if (local_stream != NULL) {
- free(local_stream);
- local_stream = NULL;
- }
+ free(pollfd);
+ pollfd = NULL;
+
+ free(local_stream);
+ local_stream = NULL;
/* allocate for all fds + 1 for the consumer_data_pipe */
pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
pthread_mutex_unlock(&consumer_data.lock);
goto end;
}
- ret = consumer_update_poll_array(ctx, &pollfd, local_stream,
+ ret = update_poll_array(ctx, &pollfd, local_stream,
data_ht);
if (ret < 0) {
ERR("Error in allocating pollfd or local_outfds");
* array update over low-priority reads.
*/
if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
- size_t pipe_readlen;
+ ssize_t pipe_readlen;
DBG("consumer_data_pipe wake up");
/* Consume 1 byte of pipe data */
pipe_readlen = read(ctx->consumer_data_pipe[0], &new_stream,
sizeof(new_stream));
} while (pipe_readlen == -1 && errno == EINTR);
+ if (pipe_readlen < 0) {
+ PERROR("read consumer data pipe");
+ /* Continue so we can at least handle the current stream(s). */
+ continue;
+ }
/*
* If the stream is NULL, just ignore it. It's also possible that
continue;
}
- ret = consumer_add_stream(new_stream, data_ht);
+ ret = add_stream(new_stream, data_ht);
if (ret) {
ERR("Consumer add stream %d failed. Continuing",
new_stream->key);
}
end:
DBG("polling thread exiting");
- if (pollfd != NULL) {
- free(pollfd);
- pollfd = NULL;
- }
- if (local_stream != NULL) {
- free(local_stream);
- local_stream = NULL;
- }
+ free(pollfd);
+ free(local_stream);
/*
* Close the write side of the pipe so epoll_wait() in
* only tracked fd in the poll set. The thread will take care of closing
* the read side.
*/
- close(ctx->consumer_metadata_pipe[1]);
-
- if (data_ht) {
- destroy_data_stream_ht(data_ht);
+ ret = close(ctx->consumer_metadata_pipe[1]);
+ if (ret < 0) {
+ PERROR("close data pipe");
}
+ destroy_data_stream_ht(data_ht);
+
rcu_unregister_thread();
return NULL;
}
*/
void *consumer_thread_sessiond_poll(void *data)
{
- int sock, client_socket, ret;
+ int sock = -1, client_socket, ret;
/*
* structure to poll for incoming data on communication socket avoids
* making blocking sockets.
/* Blocking call, waiting for transmission */
sock = lttcomm_accept_unix_sock(client_socket);
- if (sock <= 0) {
+ if (sock < 0) {
WARN("On accept");
goto end;
}
goto end;
}
+ /* This socket is not useful anymore. */
+ ret = close(client_socket);
+ if (ret < 0) {
+ PERROR("close client_socket");
+ }
+ client_socket = -1;
+
/* update the polling structure to poll on the established socket */
consumer_sockpoll[1].fd = sock;
consumer_sockpoll[1].events = POLLIN | POLLPRI;
DBG("consumer_thread_receive_fds received quit from signal");
goto end;
}
- DBG("received fds on sock");
+ DBG("received command on sock");
}
end:
- DBG("consumer_thread_receive_fds exiting");
+ DBG("Consumer thread sessiond poll exiting");
/*
* when all fds have hung up, the polling thread
*/
notify_thread_pipe(ctx->consumer_data_pipe[1]);
+ /* Cleaning up possibly open sockets. */
+ if (sock >= 0) {
+ ret = close(sock);
+ if (ret < 0) {
+ PERROR("close sock sessiond poll");
+ }
+ }
+ if (client_socket >= 0) {
+ ret = close(client_socket);
+ if (ret < 0) {
+ PERROR("close client_socket sessiond poll");
+ }
+ }
+
rcu_unregister_thread();
return NULL;
}
ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
struct lttng_consumer_local_data *ctx)
{
+ ssize_t ret;
+
+ pthread_mutex_lock(&stream->lock);
+
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
- return lttng_kconsumer_read_subbuffer(stream, ctx);
+ ret = lttng_kconsumer_read_subbuffer(stream, ctx);
+ break;
case LTTNG_CONSUMER32_UST:
case LTTNG_CONSUMER64_UST:
- return lttng_ustconsumer_read_subbuffer(stream, ctx);
+ ret = lttng_ustconsumer_read_subbuffer(stream, ctx);
+ break;
default:
ERR("Unknown consumer_data type");
assert(0);
- return -ENOSYS;
+ ret = -ENOSYS;
+ break;
}
+
+ pthread_mutex_unlock(&stream->lock);
+ return ret;
}
int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream)
consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
-
- metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
- assert(metadata_ht);
- data_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
- assert(data_ht);
}
/*
*/
int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
struct lttng_consumer_local_data *ctx, int sock,
- struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock)
+ struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock,
+ unsigned int sessiond_id)
{
- int fd, ret = -1;
+ int fd = -1, ret = -1, relayd_created = 0;
+ enum lttng_error_code ret_code = LTTNG_OK;
struct consumer_relayd_sock_pair *relayd;
DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
+ /* First send a status message before receiving the fds. */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto error;
+ }
+
/* Get relayd reference if exists. */
relayd = consumer_find_relayd(net_seq_idx);
if (relayd == NULL) {
relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
if (relayd == NULL) {
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
+ ret = -1;
goto error;
}
+ relayd->sessiond_session_id = (uint64_t) sessiond_id;
+ relayd_created = 1;
}
/* Poll on consumer socket. */
if (ret != sizeof(fd)) {
lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD);
ret = -1;
+ fd = -1; /* Just in case it gets set with an invalid value. */
+ goto error_close;
+ }
+
+ /* We have the fds without error. Send status back. */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
goto error;
}
/* Copy received lttcomm socket */
lttcomm_copy_sock(&relayd->control_sock, relayd_sock);
ret = lttcomm_create_sock(&relayd->control_sock);
+ /* Immediately try to close the created socket if valid. */
+ if (relayd->control_sock.fd >= 0) {
+ if (close(relayd->control_sock.fd)) {
+ PERROR("close relayd control socket");
+ }
+ }
+ /* Handle create_sock error. */
if (ret < 0) {
goto error;
}
- /* Close the created socket fd which is useless */
- close(relayd->control_sock.fd);
-
/* Assign new file descriptor */
relayd->control_sock.fd = fd;
+
+ /*
+ * Create a session on the relayd and store the returned id. Lock the
+ * control socket mutex if the relayd was NOT created before.
+ */
+ if (!relayd_created) {
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ }
+ ret = relayd_create_session(&relayd->control_sock,
+ &relayd->relayd_session_id);
+ if (!relayd_created) {
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ }
+ if (ret < 0) {
+ /*
+ * Close all sockets of a relayd object. It will be freed in the
+ * error path below if it was created here, or else it will be
+ * garbage collected.
+ */
+ (void) relayd_close(&relayd->control_sock);
+ (void) relayd_close(&relayd->data_sock);
+ goto error;
+ }
+
break;
case LTTNG_STREAM_DATA:
/* Copy received lttcomm socket */
lttcomm_copy_sock(&relayd->data_sock, relayd_sock);
ret = lttcomm_create_sock(&relayd->data_sock);
+ /* Immediately try to close the created socket if valid. */
+ if (relayd->data_sock.fd >= 0) {
+ if (close(relayd->data_sock.fd)) {
+ PERROR("close relayd data socket");
+ }
+ }
+ /* Handle create_sock error. */
if (ret < 0) {
goto error;
}
- /* Close the created socket fd which is useless */
- close(relayd->data_sock.fd);
-
/* Assign new file descriptor */
relayd->data_sock.fd = fd;
break;
default:
ERR("Unknown relayd socket type (%d)", sock_type);
+ ret = -1;
goto error;
}
add_relayd(relayd);
/* All good! */
- ret = 0;
+ return 0;
error:
+ /* Close received socket if valid. */
+ if (fd >= 0) {
+ if (close(fd)) {
+ PERROR("close received socket");
+ }
+ }
+
+error_close:
+ if (relayd_created) {
+ free(relayd);
+ }
+
return ret;
}
return ret;
}
+/*
+ * Search for a relayd associated to the session id and return the reference.
+ *
+ * The RCU read-side lock MUST be acquired before calling this function and
+ * held until the relayd object is no longer needed.
+ */
+static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id)
+{
+ struct lttng_ht_iter iter;
+ struct consumer_relayd_sock_pair *relayd = NULL;
+
+ /* Iterate over all relayd since they are indexed by net_seq_idx. */
+ cds_lfht_for_each_entry(consumer_data.relayd_ht->ht, &iter.iter, relayd,
+ node.node) {
+ /*
+ * Check the sessiond session id, which is unique here, whereas the
+ * relayd session id might not be unique across multiple relayds.
+ */
+ if (relayd->sessiond_session_id == id) {
+ /* Found the relayd. There can be only one per id. */
+ goto found;
+ }
+ }
+
+ return NULL;
+
+found:
+ return relayd;
+}
+
/*
* Check if for a given session id there is still data needed to be extract
* from the buffers.
struct lttng_ht_iter iter;
struct lttng_ht *ht;
struct lttng_consumer_stream *stream;
- struct consumer_relayd_sock_pair *relayd;
+ struct consumer_relayd_sock_pair *relayd = NULL;
int (*data_pending)(struct lttng_consumer_stream *);
DBG("Consumer data pending command on session id %" PRIu64, id);
/* Ease our life a bit */
ht = consumer_data.stream_list_ht;
+ relayd = find_relayd_by_session_id(id);
+ if (relayd) {
+ /* Send init command for data pending. */
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_begin_data_pending(&relayd->control_sock,
+ relayd->relayd_session_id);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ if (ret < 0) {
+ /* Communication error with the relayd; report no data pending. */
+ goto data_not_pending;
+ }
+ }
+
cds_lfht_for_each_entry_duplicate(ht->ht,
ht->hash_fct((void *)((unsigned long) id), lttng_ht_seed),
ht->match_fct, (void *)((unsigned long) id),
/* If this call fails, the stream is being used hence data pending. */
ret = stream_try_lock(stream);
if (!ret) {
- goto data_not_pending;
+ goto data_pending;
}
/*
ret = data_pending(stream);
if (ret == 1) {
pthread_mutex_unlock(&stream->lock);
- goto data_not_pending;
+ goto data_pending;
}
}
/* Relayd check */
- if (stream->net_seq_idx != -1) {
- relayd = consumer_find_relayd(stream->net_seq_idx);
- if (!relayd) {
- /*
- * At this point, if the relayd object is not available for the
- * given stream, it is because the relayd is being cleaned up
- * so every stream associated with it (for a session id value)
- * are or will be marked for deletion hence no data pending.
- */
- pthread_mutex_unlock(&stream->lock);
- goto data_not_pending;
- }
-
+ if (relayd) {
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
if (stream->metadata_flag) {
- ret = relayd_quiescent_control(&relayd->control_sock);
+ ret = relayd_quiescent_control(&relayd->control_sock,
+ stream->relayd_stream_id);
} else {
ret = relayd_data_pending(&relayd->control_sock,
- stream->relayd_stream_id, stream->next_net_seq_num);
+ stream->relayd_stream_id,
+ stream->next_net_seq_num - 1);
}
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret == 1) {
pthread_mutex_unlock(&stream->lock);
- goto data_not_pending;
+ goto data_pending;
}
}
pthread_mutex_unlock(&stream->lock);
}
+ if (relayd) {
+ unsigned int is_data_inflight = 0;
+
+ /* Send end command for data pending. */
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_end_data_pending(&relayd->control_sock,
+ relayd->relayd_session_id, &is_data_inflight);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ if (ret < 0) {
+ goto data_not_pending;
+ }
+ if (is_data_inflight) {
+ goto data_pending;
+ }
+ }
+
/*
- * Finding _no_ node in the hash table means that the stream(s) have been
- * removed thus data is guaranteed to be available for analysis from the
- * trace files. This is *only* true for local consumer and not network
- * streaming.
+ * Finding _no_ node in the hash table and no inflight data means that the
+ * stream(s) have been removed thus data is guaranteed to be available for
+ * analysis from the trace files.
*/
+data_not_pending:
/* Data is available to be read by a viewer. */
pthread_mutex_unlock(&consumer_data.lock);
rcu_read_unlock();
return 0;
-data_not_pending:
+data_pending:
/* Data is still being extracted from buffers. */
pthread_mutex_unlock(&consumer_data.lock);
rcu_read_unlock();
return 1;
}
+
+/*
+ * Send a ret code status message to the sessiond daemon.
+ *
+ * Return the sendmsg() return value.
+ */
+int consumer_send_status_msg(int sock, int ret_code)
+{
+ struct lttcomm_consumer_status_msg msg;
+
+ msg.ret_code = ret_code;
+
+ return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
+}
+
+/*
+ * Send a channel status message to the sessiond daemon.
+ *
+ * Return the sendmsg() return value.
+ */
+int consumer_send_status_channel(int sock,
+ struct lttng_consumer_channel *channel)
+{
+ struct lttcomm_consumer_status_channel msg;
+
+ assert(sock >= 0);
+
+ if (!channel) {
+ msg.ret_code = -LTTNG_ERR_UST_CHAN_FAIL;
+ } else {
+ msg.ret_code = LTTNG_OK;
+ msg.key = channel->key;
+ msg.stream_count = channel->streams.count;
+ }
+
+ return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
+}