/* Poll on consumer socket. */
if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+ rcu_read_unlock();
return -EINTR;
}
/* block */
if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+ rcu_read_unlock();
return -EINTR;
}
ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1);
if (ret != sizeof(fd)) {
lttng_consumer_send_error(ctx, CONSUMERD_ERROR_RECV_FD);
+ rcu_read_unlock();
return ret;
}
msg.u.stream.metadata_flag);
if (new_stream == NULL) {
lttng_consumer_send_error(ctx, CONSUMERD_OUTFD_ERROR);
- goto end;
+ goto end_nosignal;
}
/* The stream is not metadata. Get relayd reference if exists. */
&new_stream->relayd_stream_id);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret < 0) {
- goto end;
+ goto end_nosignal;
}
} else if (msg.u.stream.net_index != -1) {
ERR("Network sequence index %d unknown. Not adding stream.",
msg.u.stream.net_index);
free(new_stream);
- goto end;
+ goto end_nosignal;
}
if (ctx->on_recv_stream != NULL) {
if (ret == 0) {
consumer_add_stream(new_stream);
} else if (ret < 0) {
- goto end;
+ goto end_nosignal;
}
} else {
consumer_add_stream(new_stream);
}
case LTTNG_CONSUMER_UPDATE_STREAM:
{
- if (ctx->on_update_stream != NULL) {
- ret = ctx->on_update_stream(msg.u.stream.stream_key, msg.u.stream.state);
- if (ret == 0) {
- consumer_change_stream_state(msg.u.stream.stream_key, msg.u.stream.state);
- } else if (ret < 0) {
- goto end;
- }
- } else {
- consumer_change_stream_state(msg.u.stream.stream_key,
- msg.u.stream.state);
+ rcu_read_unlock();
+ return -ENOSYS;
+ }
+ case LTTNG_CONSUMER_DESTROY_RELAYD:
+ {
+ struct consumer_relayd_sock_pair *relayd;
+
+ DBG("Kernel consumer destroying relayd %zu",
+ msg.u.destroy_relayd.net_seq_idx);
+
+ /* Get relayd reference if exists. */
+ relayd = consumer_find_relayd(msg.u.destroy_relayd.net_seq_idx);
+ if (relayd == NULL) {
+ ERR("Unable to find relayd %zu", msg.u.destroy_relayd.net_seq_idx);
+ goto end_nosignal;
}
- break;
+
+ /* Set destroy flag for this object */
+ uatomic_set(&relayd->destroy_flag, 1);
+
+ /* Destroy the relayd if refcount is 0 else set the destroy flag. */
+ if (uatomic_read(&relayd->refcount) == 0) {
+ consumer_destroy_relayd(relayd);
+ }
+ goto end_nosignal;
}
default:
- break;
+ goto end_nosignal;
}
-end:
+
/*
- * Wake-up the other end by writing a null byte in the pipe
- * (non-blocking). Important note: Because writing into the
- * pipe is non-blocking (and therefore we allow dropping wakeup
- * data, as long as there is wakeup data present in the pipe
- * buffer to wake up the other end), the other end should
- * perform the following sequence for waiting:
+ * Wake-up the other end by writing a null byte in the pipe (non-blocking).
+ * Important note: Because writing into the pipe is non-blocking (and
+ * therefore we allow dropping wakeup data, as long as there is wakeup data
+ * present in the pipe buffer to wake up the other end), the other end
+ * should perform the following sequence for waiting:
+ *
* 1) empty the pipe (reads).
* 2) perform update operation.
* 3) wait on the pipe (poll).