CUSTOM: consumer: ust: relayd: perform 2 stage relay_add_stream
[lttng-tools.git] / src / common / consumer / consumer.c
index 688492e439d0811cc17308667055a5bcc7c3b9d5..0e34d3a2b65ceb310723650b2d872f07be191157 100644 (file)
@@ -730,6 +730,72 @@ end:
        return ret;
 }
 
+int consumer_send_relayd_channel_bulk(struct lttng_consumer_channel *channel)
+{
+       int ret = 0;
+       struct consumer_relayd_sock_pair *relayd;
+       struct lttng_consumer_stream *stream;
+
+       assert(channel);
+
+       rcu_read_lock();
+       relayd = consumer_find_relayd(channel->relayd_id);
+
+       if (relayd == NULL) {
+               ERR("relayd ID %" PRIu64 " unknown. Can't send streams.",
+                               channel->relayd_id);
+               ret = -1;
+               goto end_rcu_unlock;
+       }
+
+       /*
+        * Perform the send part of the relayd_add_stream for all stream.
+        *
+        * This ensure that we do not wait for response in between each command
+        * before sending the next one.
+        *
+        * This result in a waterfall of send command and a waterfall of recv.
+        *
+        * This leverage the TCP order guarantee for send and receive.
+        */
+       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+       cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+               health_code_update();
+               ret = relayd_add_stream_send(&relayd->control_sock, stream->name,
+                               stream->chan->pathname,
+                               stream->chan->tracefile_size, stream->chan->tracefile_count);
+               if (ret < 0) {
+                       ERR("Relayd add stream send failed. Cleaning up relayd %" PRIu64".", relayd->id);
+                       lttng_consumer_cleanup_relayd(relayd);
+                       goto end_socket_mutex;
+               }
+       }
+
+       /* Perform individual recv part of relayd_add_stream */
+       cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+               health_code_update();
+
+               ret = relayd_add_stream_rcv(&relayd->control_sock, &stream->relayd_stream_id);
+               if (ret < 0) {
+                       ERR("Relayd add stream failed. Cleaning up relayd %" PRIu64".", relayd->id);
+                       lttng_consumer_cleanup_relayd(relayd);
+                       goto end_socket_mutex;
+               }
+
+               uatomic_inc(&relayd->refcount);
+               stream->sent_to_relayd = 1;
+
+               DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64,
+                       stream->name, stream->key, stream->relayd_id);
+       }
+
+end_socket_mutex:
+       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+end_rcu_unlock:
+       rcu_read_unlock();
+       return ret;
+}
+
 /*
  * Find a relayd and send the streams sent message
  *
This page took 0.025613 seconds and 5 git commands to generate.