CUSTOM: consumer: ust: relayd: perform 2 stage relay_add_stream
authorJonathan Rajotte <jonathan.rajotte-julien@efficios.com>
Sun, 17 Jan 2021 22:51:03 +0000 (17:51 -0500)
committerJonathan Rajotte <jonathan.rajotte-julien@efficios.com>
Mon, 25 Jan 2021 19:12:49 +0000 (14:12 -0500)
Note: this patch is not a bugfix, it is a targeted modification to
improve timing and predictability for particular scenarios.

Note: this patch only applies to userspace tracing.

Part of this work might find its way upstream, but most probably not in
this form. Deeper work should be done upstream to address the real root
of the problem, which is how the network protocol for lttng-live works and
how the lifetime of the different components required for the live
feature is managed.

Scenario
=======
   System with high CPU resource restriction (base high workload 99% cpu load),
   High CPU count (32),
   Slowish network (ping ~60-100ms),
   Timing constraint for create, configure, start, stop, destroy, create ... cycles.

A lot of time (3s) is wasted in the iteration loop of stream creation.
The consumer sends the add_stream command for a single stream, then waits
for a reply containing the stream id. Only then is the next stream handled.

What can be done is to split this operation into two iterations/phases:
the send phase and the receive phase. TCP guarantees the ordering of
send/recv calls. Thus, we can iterate over the streams and perform the send
part, then iterate over the streams again to perform the receive part.

This could lead to a deadlock in certain scenarios where the receiving end
(lttng-relayd) has a small send buffer and receive buffer and we end up
blocking on the send phase on the relayd side. This is more than likely
not applicable to our current use case since the replies from lttng-relayd
are ~4 bytes long. For 32 streams, we are talking about 128 bytes of
data to receive. This mostly disqualifies this solution for upstream as-is
since we would need to validate these scenarios further. Async
communication can ease all this (either on the lttng-relayd side or
the lttng-sessiond side); still, a lot more work should be put into this
for direct upstreaming.

Still, it is good enough for our current usage pattern.

Signed-off-by: Jonathan Rajotte <jonathan.rajotte-julien@efficios.com>
Change-Id: I3c74fd8086230b753684fc6d8a67348f839011b0

src/common/consumer/consumer.c
src/common/consumer/consumer.h
src/common/relayd/relayd.c
src/common/relayd/relayd.h
src/common/ust-consumer/ust-consumer.c

index 688492e439d0811cc17308667055a5bcc7c3b9d5..0e34d3a2b65ceb310723650b2d872f07be191157 100644 (file)
@@ -730,6 +730,72 @@ end:
        return ret;
 }
 
+/*
+ * Add every stream of a channel to its relayd in two passes over the
+ * channel's stream list: first send one add-stream command per stream, then
+ * read back one reply per stream, in the same order.
+ *
+ * The relayd control socket mutex is held across both passes and the RCU
+ * read-side lock is held for the whole call.
+ *
+ * Return 0 on success or a negative value on error. When a send or a receive
+ * fails, lttng_consumer_cleanup_relayd() is called on the relayd and the
+ * remaining streams are not processed.
+ */
+int consumer_send_relayd_channel_bulk(struct lttng_consumer_channel *channel)
+{
+       struct lttng_consumer_stream *cur_stream;
+       struct consumer_relayd_sock_pair *relayd_pair;
+       int status = 0;
+
+       assert(channel);
+
+       rcu_read_lock();
+
+       relayd_pair = consumer_find_relayd(channel->relayd_id);
+       if (!relayd_pair) {
+               ERR("relayd ID %" PRIu64 " unknown. Can't send streams.",
+                               channel->relayd_id);
+               status = -1;
+               goto unlock_rcu;
+       }
+
+       pthread_mutex_lock(&relayd_pair->ctrl_sock_mutex);
+
+       /*
+        * First pass: queue the send half of relayd_add_stream for every
+        * stream, without waiting for a reply between two commands.
+        *
+        * The replies are collected by the second pass below; this relies on
+        * TCP preserving the order of the commands and of their replies.
+        */
+       cds_list_for_each_entry(cur_stream, &channel->streams.head, send_node) {
+               health_code_update();
+
+               status = relayd_add_stream_send(&relayd_pair->control_sock,
+                               cur_stream->name, cur_stream->chan->pathname,
+                               cur_stream->chan->tracefile_size,
+                               cur_stream->chan->tracefile_count);
+               if (status < 0) {
+                       ERR("Relayd add stream send failed. Cleaning up relayd %" PRIu64".", relayd_pair->id);
+                       lttng_consumer_cleanup_relayd(relayd_pair);
+                       goto unlock_ctrl_sock;
+               }
+       }
+
+       /* Second pass: read one reply per stream, in sending order. */
+       cds_list_for_each_entry(cur_stream, &channel->streams.head, send_node) {
+               health_code_update();
+
+               status = relayd_add_stream_rcv(&relayd_pair->control_sock,
+                               &cur_stream->relayd_stream_id);
+               if (status < 0) {
+                       ERR("Relayd add stream failed. Cleaning up relayd %" PRIu64".", relayd_pair->id);
+                       lttng_consumer_cleanup_relayd(relayd_pair);
+                       goto unlock_ctrl_sock;
+               }
+
+               /* Each stream added to the relayd takes a reference on it. */
+               uatomic_inc(&relayd_pair->refcount);
+               cur_stream->sent_to_relayd = 1;
+
+               DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64,
+                       cur_stream->name, cur_stream->key, cur_stream->relayd_id);
+       }
+
+unlock_ctrl_sock:
+       pthread_mutex_unlock(&relayd_pair->ctrl_sock_mutex);
+unlock_rcu:
+       rcu_read_unlock();
+       return status;
+}
+
+
 /*
  * Find a relayd and send the streams sent message
  *
index a5a34d44f75cfbf68f700249fac50b3dd296d94b..14b52bb3ee925ecec208c6e3894159a753265b66 100644 (file)
@@ -919,5 +919,6 @@ int consumer_add_metadata_stream(struct lttng_consumer_stream *stream);
 void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream);
 int consumer_create_index_file(struct lttng_consumer_stream *stream);
 void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd);
+int consumer_send_relayd_channel_bulk(struct lttng_consumer_channel *channel); /* Adds all streams of a channel to its relayd: sends every add-stream command first, then reads the replies. */
 
 #endif /* LIB_CONSUMER_H */
index b936a4d3e7798e35e90983b6e20692d2b291bfc8..afcd2d838b3d0f7b2e256ba368a1fac635651135 100644 (file)
@@ -320,6 +320,109 @@ error:
        return ret;
 }
 
+/*
+ * Send half of the "add stream" exchange with the relayd.
+ *
+ * Builds the add-stream payload matching the relayd minor version (the 2.1
+ * layout when rsock->minor is 1, the 2.2+ layout otherwise) and sends it as
+ * a RELAYD_ADD_STREAM command. No reply is read here.
+ *
+ * Return 0 on success or a negative value on error; a channel name or
+ * pathname that lttng_strncpy() rejects yields -1 and nothing is sent.
+ */
+int relayd_add_stream_send(struct lttcomm_relayd_sock *rsock, const char *channel_name,
+               const char *pathname, uint64_t tracefile_size, uint64_t tracefile_count)
+{
+       int ret;
+       void *payload;
+       size_t payload_len;
+       struct lttcomm_relayd_add_stream legacy_msg;
+       struct lttcomm_relayd_add_stream_2_2 current_msg;
+
+       /* Code flow error. Safety net. */
+       assert(rsock);
+       assert(channel_name);
+       assert(pathname);
+
+       DBG("Relayd adding stream for channel name %s. Part send", channel_name);
+
+       if (rsock->minor == 1) {
+               /* Compat with relayd 2.1 */
+               memset(&legacy_msg, 0, sizeof(legacy_msg));
+               if (lttng_strncpy(legacy_msg.channel_name, channel_name,
+                               sizeof(legacy_msg.channel_name))) {
+                       return -1;
+               }
+               if (lttng_strncpy(legacy_msg.pathname, pathname,
+                               sizeof(legacy_msg.pathname))) {
+                       return -1;
+               }
+
+               payload = &legacy_msg;
+               payload_len = sizeof(legacy_msg);
+       } else {
+               /* Compat with relayd 2.2+ */
+               memset(&current_msg, 0, sizeof(current_msg));
+               if (lttng_strncpy(current_msg.channel_name, channel_name,
+                               sizeof(current_msg.channel_name))) {
+                       return -1;
+               }
+               if (lttng_strncpy(current_msg.pathname, pathname,
+                               sizeof(current_msg.pathname))) {
+                       return -1;
+               }
+               /* The tracefile limits travel in big-endian byte order. */
+               current_msg.tracefile_size = htobe64(tracefile_size);
+               current_msg.tracefile_count = htobe64(tracefile_count);
+
+               payload = &current_msg;
+               payload_len = sizeof(current_msg);
+       }
+
+       /* Send command */
+       ret = send_command(rsock, RELAYD_ADD_STREAM, payload, payload_len, 0);
+       if (ret < 0) {
+               return ret;
+       }
+
+       DBG("Relayd add stream sent for channel name %s.", channel_name);
+       return 0;
+}
+
+
+/*
+ * Add stream on the relayd. Receive part.
+ *
+ * Reads one add-stream reply from rsock. When the reply carries LTTNG_OK,
+ * the stream handle it contains is stored in *_stream_id; *_stream_id is
+ * left untouched on error.
+ *
+ * On success return 0 else return a negative value.
+ */
+int relayd_add_stream_rcv(struct lttcomm_relayd_sock *rsock, uint64_t *_stream_id)
+{
+       int ret;
+       struct lttcomm_relayd_status_stream reply;
+
+       /* Code flow error. Safety net. */
+       assert(rsock);
+       assert(_stream_id);
+
+       /* Waiting for reply */
+       ret = recv_reply(rsock, (void *) &reply, sizeof(reply));
+       if (ret < 0) {
+               goto error;
+       }
+
+       /* Back to host bytes order. */
+       reply.handle = be64toh(reply.handle);
+       reply.ret_code = be32toh(reply.ret_code);
+
+       /* The relayd refused the stream: report it, do not claim success. */
+       if (reply.ret_code != LTTNG_OK) {
+               ret = -1;
+               ERR("Relayd add stream replied error %d", reply.ret_code);
+               goto error;
+       }
+
+       /* Success: hand the stream id back to the caller. */
+       ret = 0;
+       *_stream_id = reply.handle;
+
+       DBG("Relayd stream added successfully with handle %" PRIu64,
+               reply.handle);
+
+error:
+       return ret;
+}
+
+
 /*
  * Inform the relay that all the streams for the current channel has been sent.
  *
index f090a0db63681b3b83ec451113039b3d40dd0b75..4ea2f6345ba712041489bc9d213996ef8e949e99 100644 (file)
@@ -52,4 +52,8 @@ int relayd_send_index(struct lttcomm_relayd_sock *rsock,
 int relayd_reset_metadata(struct lttcomm_relayd_sock *rsock,
                uint64_t stream_id, uint64_t version);
 
+/* Two-stage add stream: send the command, then receive its reply. */
+int relayd_add_stream_send(struct lttcomm_relayd_sock *rsock, const char *channel_name,
+               const char *pathname, uint64_t tracefile_size, uint64_t tracefile_count);
+int relayd_add_stream_rcv(struct lttcomm_relayd_sock *rsock, uint64_t *_stream_id);
+
 #endif /* _RELAYD_H */
index 97cd27d9dd02cf5022923ce2159b870f3c976156..c9ce0d1b295b03ef641a640b85009d60ba7de099 100644 (file)
@@ -537,7 +537,6 @@ static int send_sessiond_channel(int sock,
 {
        int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
        struct lttng_consumer_stream *stream;
-       uint64_t relayd_id = -1ULL;
 
        assert(channel);
        assert(ctx);
@@ -546,25 +545,16 @@ static int send_sessiond_channel(int sock,
        DBG("UST consumer sending channel %s to sessiond", channel->name);
 
        if (channel->relayd_id != (uint64_t) -1ULL) {
-               cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
-
-                       health_code_update();
-
-                       /* Try to send the stream to the relayd if one is available. */
-                       ret = consumer_send_relayd_stream(stream, stream->chan->pathname);
-                       if (ret < 0) {
-                               /*
-                                * Flag that the relayd was the problem here probably due to a
-                                * communicaton error on the socket.
-                                */
-                               if (relayd_error) {
-                                       *relayd_error = 1;
-                               }
-                               ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
-                       }
-                       if (relayd_id == -1ULL) {
-                               relayd_id = stream->relayd_id;
+               ret = consumer_send_relayd_channel_bulk(channel);
+               if (ret < 0) {
+                       /*
+                        * Flag that the relayd was the problem here probably due to a
+                        * communicaton error on the socket.
+                        */
+                       if (relayd_error) {
+                               *relayd_error = 1;
                        }
+                       ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
                }
        }
 
This page took 0.044933 seconds and 5 git commands to generate.