free(stream);
}
-static
-void consumer_free_metadata_stream(struct rcu_head *head)
-{
- struct lttng_ht_node_ulong *node =
- caa_container_of(head, struct lttng_ht_node_ulong, head);
- struct lttng_consumer_stream *stream =
- caa_container_of(node, struct lttng_consumer_stream, waitfd_node);
-
- free(stream);
-}
-
/*
* RCU protected relayd socket pair free.
*/
iter.iter.node = &stream->node.node;
ret = lttng_ht_del(ht, &iter);
assert(!ret);
-
rcu_read_unlock();
- if (consumer_data.stream_count <= 0) {
- goto end;
- }
+ assert(consumer_data.stream_count > 0);
consumer_data.stream_count--;
- if (!stream) {
- goto end;
- }
+
if (stream->out_fd >= 0) {
ret = close(stream->out_fd);
if (ret) {
stream->metadata_flag = metadata_flag;
strncpy(stream->path_name, path_name, sizeof(stream->path_name));
stream->path_name[sizeof(stream->path_name) - 1] = '\0';
- lttng_ht_node_init_ulong(&stream->waitfd_node, stream->wait_fd);
- lttng_ht_node_init_ulong(&stream->node, stream->key);
+
+ /*
+ * Index differently the metadata node because the thread is using an
+ * internal hash table to match streams in the metadata_ht to the epoll set
+ * file descriptor.
+ */
+ if (metadata_flag) {
+ lttng_ht_node_init_ulong(&stream->node, stream->wait_fd);
+ } else {
+ lttng_ht_node_init_ulong(&stream->node, stream->key);
+ }
/*
* The cpu number is needed before using any ustctl_* actions. Ignored for
rcu_read_unlock();
/*
- * Insert the consumer_poll_pipe at the end of the array and don't
+ * Insert the consumer_data_pipe at the end of the array and don't
* increment i so nb_fd is the number of real FD.
*/
- (*pollfd)[i].fd = ctx->consumer_poll_pipe[0];
+ (*pollfd)[i].fd = ctx->consumer_data_pipe[0];
(*pollfd)[i].events = POLLIN | POLLPRI;
return i;
}
ctx->on_recv_stream = recv_stream;
ctx->on_update_stream = update_stream;
- ret = pipe(ctx->consumer_poll_pipe);
+ ret = pipe(ctx->consumer_data_pipe);
if (ret < 0) {
PERROR("Error creating poll pipe");
goto error_poll_pipe;
}
/* set read end of the pipe to non-blocking */
- ret = fcntl(ctx->consumer_poll_pipe[0], F_SETFL, O_NONBLOCK);
+ ret = fcntl(ctx->consumer_data_pipe[0], F_SETFL, O_NONBLOCK);
if (ret < 0) {
PERROR("fcntl O_NONBLOCK");
goto error_poll_fcntl;
}
/* set write end of the pipe to non-blocking */
- ret = fcntl(ctx->consumer_poll_pipe[1], F_SETFL, O_NONBLOCK);
+ ret = fcntl(ctx->consumer_data_pipe[1], F_SETFL, O_NONBLOCK);
if (ret < 0) {
PERROR("fcntl O_NONBLOCK");
goto error_poll_fcntl;
for (i = 0; i < 2; i++) {
int err;
- err = close(ctx->consumer_poll_pipe[i]);
+ err = close(ctx->consumer_data_pipe[i]);
if (err) {
PERROR("close");
}
if (ret) {
PERROR("close");
}
- ret = close(ctx->consumer_poll_pipe[0]);
+ ret = close(ctx->consumer_data_pipe[0]);
if (ret) {
PERROR("close");
}
- ret = close(ctx->consumer_poll_pipe[1]);
+ ret = close(ctx->consumer_data_pipe[1]);
if (ret) {
PERROR("close");
}
}
rcu_read_lock();
- cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, waitfd_node.node) {
+ cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) {
ret = lttng_ht_del(ht, &iter);
assert(!ret);
- call_rcu(&stream->waitfd_node.head, consumer_free_metadata_stream);
+ call_rcu(&stream->node.head, consumer_free_stream);
}
rcu_read_unlock();
}
rcu_read_lock();
- iter.iter.node = &stream->waitfd_node.node;
+ iter.iter.node = &stream->node.node;
ret = lttng_ht_del(ht, &iter);
assert(!ret);
rcu_read_unlock();
}
free_stream:
- call_rcu(&stream->waitfd_node.head, consumer_free_metadata_stream);
+ call_rcu(&stream->node.head, consumer_free_stream);
}
/*
/* Steal stream identifier to avoid having streams with the same key */
consumer_steal_stream_key(stream->key, ht);
- lttng_ht_add_unique_ulong(ht, &stream->waitfd_node);
+ lttng_ht_add_unique_ulong(ht, &stream->node);
rcu_read_unlock();
pthread_mutex_unlock(&consumer_data.lock);
assert(node);
stream = caa_container_of(node, struct lttng_consumer_stream,
- waitfd_node);
+ node);
/* Check for error event */
if (revents & (LPOLLERR | LPOLLHUP)) {
local_stream = NULL;
}
- /* allocate for all fds + 1 for the consumer_poll_pipe */
+ /* allocate for all fds + 1 for the consumer_data_pipe */
pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
if (pollfd == NULL) {
PERROR("pollfd malloc");
goto end;
}
- /* allocate for all fds + 1 for the consumer_poll_pipe */
+ /* allocate for all fds + 1 for the consumer_data_pipe */
local_stream = zmalloc((consumer_data.stream_count + 1) *
sizeof(struct lttng_consumer_stream));
if (local_stream == NULL) {
}
/*
- * If the consumer_poll_pipe triggered poll go directly to the
+ * If the consumer_data_pipe triggered poll go directly to the
* beginning of the loop to update the array. We want to prioritize
* array update over low-priority reads.
*/
if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
size_t pipe_readlen;
- DBG("consumer_poll_pipe wake up");
+ DBG("consumer_data_pipe wake up");
/* Consume 1 byte of pipe data */
do {
- pipe_readlen = read(ctx->consumer_poll_pipe[0], &new_stream,
+ pipe_readlen = read(ctx->consumer_data_pipe[0], &new_stream,
sizeof(new_stream));
} while (pipe_readlen == -1 && errno == EINTR);
do {
struct lttng_consumer_stream *null_stream = NULL;
- ret = write(ctx->consumer_poll_pipe[1], &null_stream,
+ ret = write(ctx->consumer_data_pipe[1], &null_stream,
sizeof(null_stream));
} while (ret < 0 && errno == EINTR);