Do not consume everything all at once even if there is data left on the
socket. This is to provide fairness to the overall data handling of all
connections.
It also provide a bounded processing execution for a data processing
iteration.
Signed-off-by: Jonathan Rajotte <jonathan.rajotte-julien@efficios.com>
&conn->protocol.data.state.receive_payload;
const size_t chunk_size = RECV_DATA_BUFFER_SIZE;
char data_buffer[chunk_size];
&conn->protocol.data.state.receive_payload;
const size_t chunk_size = RECV_DATA_BUFFER_SIZE;
char data_buffer[chunk_size];
- bool partial_recv = false;
bool new_stream = false, close_requested = false;
uint64_t left_to_receive = state->left_to_receive;
struct relay_session *session;
bool new_stream = false, close_requested = false;
uint64_t left_to_receive = state->left_to_receive;
struct relay_session *session;
* - the data immediately available on the socket,
* - the on-stack data buffer
*/
* - the data immediately available on the socket,
* - the on-stack data buffer
*/
- while (left_to_receive > 0 && !partial_recv) {
+ if (left_to_receive > 0) {
ssize_t write_ret;
size_t recv_size = min(left_to_receive, chunk_size);
ssize_t write_ret;
size_t recv_size = min(left_to_receive, chunk_size);
DBG3("No more data ready for consumption on data socket of stream id %" PRIu64,
state->header.stream_id);
status = RELAY_CONNECTION_STATUS_CLOSED;
DBG3("No more data ready for consumption on data socket of stream id %" PRIu64,
state->header.stream_id);
status = RELAY_CONNECTION_STATUS_CLOSED;
- break;
- } else if (ret < (int) recv_size) {
- /*
- * All the data available on the socket has been
- * consumed.
- */
- partial_recv = true;
+ goto data_left_to_process;
write_ret, stream->stream_handle);
}
write_ret, stream->stream_handle);
}
if (state->left_to_receive > 0) {
/*
* Did not receive all the data expected, wait for more data to
if (state->left_to_receive > 0) {
/*
* Did not receive all the data expected, wait for more data to