return ret;
}
+/*
+ * Send every stream of a channel to its relayd in bulk: issue the
+ * "add stream" send half for all streams first, then collect all the
+ * replies. The relayd control-socket mutex is held across both loops so
+ * no other command can interleave on the socket.
+ *
+ * Returns 0 on success, a negative value on error (unknown relayd id, or
+ * a send/recv failure — in which case the relayd connection is cleaned
+ * up via lttng_consumer_cleanup_relayd()).
+ */
+int consumer_send_relayd_channel_bulk(struct lttng_consumer_channel *channel)
+{
+ int ret = 0;
+ struct consumer_relayd_sock_pair *relayd;
+ struct lttng_consumer_stream *stream;
+
+ assert(channel);
+
+ /* RCU read side protects the relayd lookup and the stream list walk. */
+ rcu_read_lock();
+ relayd = consumer_find_relayd(channel->relayd_id);
+
+ if (relayd == NULL) {
+ ERR("relayd ID %" PRIu64 " unknown. Can't send streams.",
+ channel->relayd_id);
+ ret = -1;
+ goto end_rcu_unlock;
+ }
+
+ /*
+ * Perform the send part of the relayd_add_stream command for all
+ * streams.
+ *
+ * This ensures that we do not wait for a response in between each
+ * command before sending the next one.
+ *
+ * This results in a waterfall of send commands followed by a waterfall
+ * of recvs.
+ *
+ * This leverages the TCP ordering guarantee: replies arrive in the
+ * same order the commands were sent.
+ */
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+ health_code_update();
+ ret = relayd_add_stream_send(&relayd->control_sock, stream->name,
+ stream->chan->pathname,
+ stream->chan->tracefile_size, stream->chan->tracefile_count);
+ if (ret < 0) {
+ ERR("Relayd add stream send failed. Cleaning up relayd %" PRIu64".", relayd->id);
+ lttng_consumer_cleanup_relayd(relayd);
+ goto end_socket_mutex;
+ }
+ }
+
+ /*
+ * Perform the individual recv part of relayd_add_stream. Replies are
+ * matched to streams by iterating the list in the same order as the
+ * send loop above.
+ */
+ cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+ health_code_update();
+
+ /* NOTE(review): presumably fills in the relayd-assigned stream id on
+ * success — confirm against relayd_add_stream_rcv()'s contract. */
+ ret = relayd_add_stream_rcv(&relayd->control_sock, &stream->relayd_stream_id);
+ if (ret < 0) {
+ ERR("Relayd add stream failed. Cleaning up relayd %" PRIu64".", relayd->id);
+ lttng_consumer_cleanup_relayd(relayd);
+ goto end_socket_mutex;
+ }
+
+ /* One refcount per stream now known to the relayd. */
+ uatomic_inc(&relayd->refcount);
+ stream->sent_to_relayd = 1;
+
+ DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64,
+ stream->name, stream->key, stream->relayd_id);
+ }
+
+end_socket_mutex:
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+end_rcu_unlock:
+ rcu_read_unlock();
+ return ret;
+}
+
/*
* Find a relayd and send the streams sent message
*