From 0934dd7e7c31433d5976119c05779e3800573e04 Mon Sep 17 00:00:00 2001 From: Jonathan Rajotte Date: Sun, 17 Jan 2021 17:51:03 -0500 Subject: [PATCH] CUSTOM: consumer: ust: relayd: perform 2 stage relay_add_stream Note: this patch is not a bugfix, it is a targeted modification to improve timing and predictability for particular scenarios. Note: this patch only applies to userspace tracing. Part of this work might find itself upstream but most probably not in this form. Deeper work should be done upstream to mitigate the real root of the problem which is how the network protocol for lttng-live works and lifetime management of the different components required for the live feature. Scenario ======= System with high CPU resource restriction (base high workload 99% cpu load), High CPU count (32), Slowish network (ping ~60-100ms), Timing constraint for create, configure, start, stop, destroy, create ... cycles. A lot of time (3s) is wasted in the iteration loop of stream creation. The consumer sends the add_stream command for a single stream then waits for a reply containing the stream id. Then the next stream is treated. What can be done is to split this operation into two iterations/phases: the send phase and the receive phase. TCP guarantees the ordering of send/recv calls. Thus, we can iterate over the streams and perform the send part. Then iterate over the streams again to perform the receive part. This could lead to deadlock in certain scenarios where the receiving end (lttng-relayd) has a small send buffer and receive buffer and we end up blocking on the send phase on the relayd side. This is more than likely not applicable to our current use case since the replies from lttng-relayd are ~4 bytes long. For 32 streams, we are talking about 128 bytes of data to receive. This mostly disqualifies this solution for upstream as-is since we would need to validate these scenarios further. 
Async communication can ease all this (either on lttng-relayd side or lttng-sessiond), still a lot more work should be put into this for direct upstreaming. Still, it is good enough for our current usage pattern. Signed-off-by: Jonathan Rajotte Change-Id: I3c74fd8086230b753684fc6d8a67348f839011b0 --- src/common/consumer/consumer.c | 66 ++++++++++++++++ src/common/consumer/consumer.h | 1 + src/common/relayd/relayd.c | 103 +++++++++++++++++++++++++ src/common/relayd/relayd.h | 4 + src/common/ust-consumer/ust-consumer.c | 28 +++---- 5 files changed, 183 insertions(+), 19 deletions(-) diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 688492e43..0e34d3a2b 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -730,6 +730,72 @@ end: return ret; } +int consumer_send_relayd_channel_bulk(struct lttng_consumer_channel *channel) +{ + int ret = 0; + struct consumer_relayd_sock_pair *relayd; + struct lttng_consumer_stream *stream; + + assert(channel); + + rcu_read_lock(); + relayd = consumer_find_relayd(channel->relayd_id); + + if (relayd == NULL) { + ERR("relayd ID %" PRIu64 " unknown. Can't send streams.", + channel->relayd_id); + ret = -1; + goto end_rcu_unlock; + } + + /* + * Perform the send part of the relayd_add_stream for all stream. + * + * This ensure that we do not wait for response in between each command + * before sending the next one. + * + * This result in a waterfall of send command and a waterfall of recv. + * + * This leverage the TCP order guarantee for send and receive. + */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + cds_list_for_each_entry(stream, &channel->streams.head, send_node) { + health_code_update(); + ret = relayd_add_stream_send(&relayd->control_sock, stream->name, + stream->chan->pathname, + stream->chan->tracefile_size, stream->chan->tracefile_count); + if (ret < 0) { + ERR("Relayd add stream send failed. 
Cleaning up relayd %" PRIu64".", relayd->id); + lttng_consumer_cleanup_relayd(relayd); + goto end_socket_mutex; + } + } + + /* Perform individual recv part of relayd_add_stream */ + cds_list_for_each_entry(stream, &channel->streams.head, send_node) { + health_code_update(); + + ret = relayd_add_stream_rcv(&relayd->control_sock, &stream->relayd_stream_id); + if (ret < 0) { + ERR("Relayd add stream failed. Cleaning up relayd %" PRIu64".", relayd->id); + lttng_consumer_cleanup_relayd(relayd); + goto end_socket_mutex; + } + + uatomic_inc(&relayd->refcount); + stream->sent_to_relayd = 1; + + DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64, + stream->name, stream->key, stream->relayd_id); + } + +end_socket_mutex: + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); +end_rcu_unlock: + rcu_read_unlock(); + return ret; +} + /* * Find a relayd and send the streams sent message * diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h index a5a34d44f..14b52bb3e 100644 --- a/src/common/consumer/consumer.h +++ b/src/common/consumer/consumer.h @@ -919,5 +919,6 @@ int consumer_add_metadata_stream(struct lttng_consumer_stream *stream); void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream); int consumer_create_index_file(struct lttng_consumer_stream *stream); void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd); +int consumer_send_relayd_channel_bulk(struct lttng_consumer_channel *channel); #endif /* LIB_CONSUMER_H */ diff --git a/src/common/relayd/relayd.c b/src/common/relayd/relayd.c index b936a4d3e..afcd2d838 100644 --- a/src/common/relayd/relayd.c +++ b/src/common/relayd/relayd.c @@ -320,6 +320,109 @@ error: return ret; } +/* + * Add stream on the relayd. Send part. + * + * On success return 0 else return ret_code negative value. 
+ */ +int relayd_add_stream_send(struct lttcomm_relayd_sock *rsock, const char *channel_name, + const char *pathname, uint64_t tracefile_size, uint64_t tracefile_count) +{ + int ret; + struct lttcomm_relayd_add_stream msg; + struct lttcomm_relayd_add_stream_2_2 msg_2_2; + + /* Code flow error. Safety net. */ + assert(rsock); + assert(channel_name); + assert(pathname); + + DBG("Relayd adding stream for channel name %s. Part send", channel_name); + + /* Compat with relayd 2.1 */ + if (rsock->minor == 1) { + memset(&msg, 0, sizeof(msg)); + if (lttng_strncpy(msg.channel_name, channel_name, + sizeof(msg.channel_name))) { + ret = -1; + goto error; + } + if (lttng_strncpy(msg.pathname, pathname, + sizeof(msg.pathname))) { + ret = -1; + goto error; + } + + /* Send command */ + ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg, sizeof(msg), 0); + if (ret < 0) { + goto error; + } + } else { + memset(&msg_2_2, 0, sizeof(msg_2_2)); + /* Compat with relayd 2.2+ */ + if (lttng_strncpy(msg_2_2.channel_name, channel_name, + sizeof(msg_2_2.channel_name))) { + ret = -1; + goto error; + } + if (lttng_strncpy(msg_2_2.pathname, pathname, + sizeof(msg_2_2.pathname))) { + ret = -1; + goto error; + } + msg_2_2.tracefile_size = htobe64(tracefile_size); + msg_2_2.tracefile_count = htobe64(tracefile_count); + + /* Send command */ + ret = send_command(rsock, RELAYD_ADD_STREAM, (void *) &msg_2_2, sizeof(msg_2_2), 0); + if (ret < 0) { + goto error; + } + } + + DBG("Relayd add stream sent for channel name %s.", channel_name); + ret = 0; + +error: + return ret; +} + +int relayd_add_stream_rcv(struct lttcomm_relayd_sock *rsock, uint64_t *_stream_id) +{ + int ret; + struct lttcomm_relayd_status_stream reply; + + /* Code flow error. Safety net. */ + assert(rsock); + + /* Waiting for reply */ + ret = recv_reply(rsock, (void *) &reply, sizeof(reply)); + if (ret < 0) { + goto error; + } + + /* Back to host bytes order. 
*/ + reply.handle = be64toh(reply.handle); + reply.ret_code = be32toh(reply.ret_code); + + /* Return session id or negative ret code. */ + if (reply.ret_code != LTTNG_OK) { + ret = -1; + ERR("Relayd add stream replied error %d", reply.ret_code); + } else { + /* Success */ + ret = 0; + *_stream_id = reply.handle; + } + + DBG("Relayd stream added successfully with handle %" PRIu64, + reply.handle); + +error: + return ret; +} + /* * Inform the relay that all the streams for the current channel has been sent. * diff --git a/src/common/relayd/relayd.h b/src/common/relayd/relayd.h index f090a0db6..4ea2f6345 100644 --- a/src/common/relayd/relayd.h +++ b/src/common/relayd/relayd.h @@ -52,4 +52,8 @@ int relayd_send_index(struct lttcomm_relayd_sock *rsock, int relayd_reset_metadata(struct lttcomm_relayd_sock *rsock, uint64_t stream_id, uint64_t version); +int relayd_add_stream_send(struct lttcomm_relayd_sock *sock, const char *channel_name, + const char *pathname, uint64_t tracefile_size, uint64_t tracefile_count); +int relayd_add_stream_rcv(struct lttcomm_relayd_sock *rsock, uint64_t *_stream_id); + #endif /* _RELAYD_H */ diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 97cd27d9d..c9ce0d1b2 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -537,7 +537,6 @@ static int send_sessiond_channel(int sock, { int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS; struct lttng_consumer_stream *stream; - uint64_t relayd_id = -1ULL; assert(channel); assert(ctx); @@ -546,25 +545,16 @@ static int send_sessiond_channel(int sock, DBG("UST consumer sending channel %s to sessiond", channel->name); if (channel->relayd_id != (uint64_t) -1ULL) { - cds_list_for_each_entry(stream, &channel->streams.head, send_node) { - - health_code_update(); - - /* Try to send the stream to the relayd if one is available. 
*/ - ret = consumer_send_relayd_stream(stream, stream->chan->pathname); - if (ret < 0) { - /* - * Flag that the relayd was the problem here probably due to a - * communicaton error on the socket. - */ - if (relayd_error) { - *relayd_error = 1; - } - ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL; - } - if (relayd_id == -1ULL) { - relayd_id = stream->relayd_id; + ret = consumer_send_relayd_channel_bulk(channel); + if (ret < 0) { + /* + * Flag that the relayd was the problem here probably due to a + * communicaton error on the socket. + */ + if (relayd_error) { + *relayd_error = 1; } + ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL; } } -- 2.34.1