uint64_t metadata_id;
struct consumer_relayd_sock_pair *relayd = NULL;
+ /* RCU lock for the relayd pointer */
+ rcu_read_lock();
+
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != -1) {
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd == NULL) {
+ ERR("UST consumer mmap(), unable to find relay for index %d",
+ stream->net_seq_idx);
goto end;
}
}
/* Handle stream on the relayd if the output is on the network */
if (relayd) {
+ unsigned long netlen = len;
+
if (stream->metadata_flag) {
/* Only lock if metadata since we use the control socket. */
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ netlen += sizeof(stream->relayd_stream_id);
}
- ret = consumer_handle_stream_before_relayd(stream, len);
+ ret = consumer_handle_stream_before_relayd(stream, netlen);
if (ret >= 0) {
outfd = ret;
if (relayd && stream->metadata_flag) {
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
}
+ rcu_read_unlock();
return written;
}
ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
if (ret != sizeof(msg)) {
+ DBG("Consumer received unexpected message size %zd (expects %zu)",
+ ret, sizeof(msg));
lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
return ret;
}
return -ENOENT;
}
+ /* relayd need RCU read-side lock */
+ rcu_read_lock();
+
switch (msg.cmd_type) {
case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
{
int fds[1];
size_t nb_fd = 1;
+ DBG("UST Consumer adding channel");
+
/* block */
if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
return -EINTR;
size_t nb_fd = 2;
struct consumer_relayd_sock_pair *relayd = NULL;
+ DBG("UST Consumer adding stream");
+
/* block */
if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
return -EINTR;
return ret;
}
+ DBG("consumer_add_stream chan %d stream %d",
+ msg.u.stream.channel_key,
+ msg.u.stream.stream_key);
+
assert(msg.u.stream.output == LTTNG_EVENT_MMAP);
- new_stream = consumer_allocate_stream(msg.u.channel.channel_key,
+ new_stream = consumer_allocate_stream(msg.u.stream.channel_key,
msg.u.stream.stream_key,
fds[0], fds[1],
msg.u.stream.state,
new_stream->relayd_stream_id);
break;
}
+ case LTTNG_CONSUMER_DESTROY_RELAYD:
+ {
+ struct consumer_relayd_sock_pair *relayd;
+
+ DBG("UST consumer destroying relayd %zu",
+ msg.u.destroy_relayd.net_seq_idx);
+
+ /* Get relayd reference if exists. */
+ relayd = consumer_find_relayd(msg.u.destroy_relayd.net_seq_idx);
+ if (relayd == NULL) {
+ ERR("Unable to find relayd %zu",
+ msg.u.destroy_relayd.net_seq_idx);
+ }
+
+ /* Set destroy flag for this object */
+ uatomic_set(&relayd->destroy_flag, 1);
+
+ /* Destroy the relayd if refcount is 0 else set the destroy flag. */
+ if (uatomic_read(&relayd->refcount) == 0) {
+ consumer_destroy_relayd(relayd);
+ }
+ break;
+ }
case LTTNG_CONSUMER_UPDATE_STREAM:
{
return -ENOSYS;
ret = write(ctx->consumer_poll_pipe[1], "", 1);
} while (ret < 0 && errno == EINTR);
end_nosignal:
+ rcu_read_unlock();
return 0;
}