+static enum relay_connection_status relay_process_data_receive_payload(
+ struct relay_connection *conn)
+{
+ int ret;
+ enum relay_connection_status status = RELAY_CONNECTION_STATUS_OK;
+ struct relay_stream *stream;
+ struct data_connection_state_receive_payload *state =
+ &conn->protocol.data.state.receive_payload;
+ const size_t chunk_size = RECV_DATA_BUFFER_SIZE;
+ char data_buffer[chunk_size];
+ bool partial_recv = false;
+ bool new_stream = false, close_requested = false;
+ uint64_t left_to_receive = state->left_to_receive;
+ struct relay_session *session;
+ int stream_fd = -1;
+
+ DBG3("Receiving data for stream id %" PRIu64 " seqnum %" PRIu64 ", %" PRIu64" bytes received, %" PRIu64 " bytes left to receive",
+ state->header.stream_id, state->header.net_seq_num,
+ state->received, left_to_receive);
+
+ stream = stream_get_by_id(state->header.stream_id);
+ if (!stream) {
+ /* Protocol error. */
+ ERR("relay_process_data_receive_payload: cannot find stream %" PRIu64,
+ state->header.stream_id);
+ status = RELAY_CONNECTION_STATUS_ERROR;
+ goto end;
+ }
+
+ pthread_mutex_lock(&stream->lock);
+ session = stream->trace->session;
+ if (!conn->session) {
+ ret = connection_set_session(conn, session);
+ if (ret) {
+ status = RELAY_CONNECTION_STATUS_ERROR;
+ goto end_stream_unlock;
+ }
+ }
+
+ stream_fd = stream_fd_get_fd(stream->stream_fd);
+ if (stream_fd < 0) {
+ status = RELAY_CONNECTION_STATUS_ERROR;
+ goto end_stream_unlock;
+ }
+
+ /*
+ * The size of the "chunk" received on any iteration is bounded by:
+ * - the data left to receive,
+ * - the data immediately available on the socket,
+ * - the on-stack data buffer
+ */
+ while (left_to_receive > 0 && !partial_recv) {
+ ssize_t write_ret;
+ size_t recv_size = min(left_to_receive, chunk_size);
+
+ ret = conn->sock->ops->recvmsg(conn->sock, data_buffer,
+ recv_size, MSG_DONTWAIT);
+ if (ret < 0) {
+ if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ PERROR("Socket %d error", conn->sock->fd);
+ status = RELAY_CONNECTION_STATUS_ERROR;
+ }
+ goto end_put_fd;
+ } else if (ret == 0) {
+ /* No more data ready to be consumed on socket. */
+ DBG3("No more data ready for consumption on data socket of stream id %" PRIu64,
+ state->header.stream_id);
+ status = RELAY_CONNECTION_STATUS_CLOSED;
+ break;
+ } else if (ret < (int) recv_size) {
+ /*
+ * All the data available on the socket has been
+ * consumed.
+ */
+ partial_recv = true;
+ }
+
+ recv_size = ret;
+
+ /* Write data to stream output fd. */
+ write_ret = lttng_write(stream_fd, data_buffer,
+ recv_size);
+ if (write_ret < (ssize_t) recv_size) {
+ ERR("Relay error writing data to file");
+ status = RELAY_CONNECTION_STATUS_ERROR;
+ goto end_put_fd;
+ }
+
+ left_to_receive -= recv_size;
+ state->received += recv_size;
+ state->left_to_receive = left_to_receive;
+
+ DBG2("Relay wrote %zd bytes to tracefile for stream id %" PRIu64,
+ write_ret, stream->stream_handle);
+ }
+
+ if (state->left_to_receive > 0) {
+ /*
+ * Did not receive all the data expected, wait for more data to
+ * become available on the socket.
+ */
+ DBG3("Partial receive on data connection of stream id %" PRIu64 ", %" PRIu64 " bytes received, %" PRIu64 " bytes left to receive",
+ state->header.stream_id, state->received,
+ state->left_to_receive);
+ goto end_put_fd;
+ }
+
+ ret = write_padding_to_file(stream_fd,
+ state->header.padding_size);
+ if ((int64_t) ret < (int64_t) state->header.padding_size) {
+ ERR("write_padding_to_file: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
+ stream->stream_handle,
+ state->header.net_seq_num, ret);
+ status = RELAY_CONNECTION_STATUS_ERROR;
+ goto end_put_fd;
+ }
+
+
+ if (session->minor >= 4 && !session->snapshot) {
+ ret = handle_index_data(stream, state->header.net_seq_num,
+ state->rotate_index);
+ if (ret < 0) {
+ ERR("handle_index_data: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
+ stream->stream_handle,
+ state->header.net_seq_num, ret);
+ status = RELAY_CONNECTION_STATUS_ERROR;
+ goto end_put_fd;
+ }
+ }
+
+ stream->tracefile_size_current += state->header.data_size +
+ state->header.padding_size;
+
+ if (stream->prev_seq == -1ULL) {
+ new_stream = true;
+ }
+
+ stream->prev_seq = state->header.net_seq_num;
+
+ /*
+ * Resetting the protocol state (to RECEIVE_HEADER) will trash the
+ * contents of *state which are aliased (union) to the same location as
+ * the new state. Don't use it beyond this point.
+ */
+ connection_reset_protocol_state(conn);
+ state = NULL;
+
+end_put_fd:
+ stream_fd_put_fd(stream->stream_fd);
+end_stream_unlock:
+ close_requested = stream->close_requested;
+ pthread_mutex_unlock(&stream->lock);
+ if (close_requested && left_to_receive == 0) {
+ try_stream_close(stream);
+ }
+
+ if (new_stream) {
+ pthread_mutex_lock(&session->lock);
+ uatomic_set(&session->new_streams, 1);
+ pthread_mutex_unlock(&session->lock);
+ }
+
+ stream_put(stream);
+end:
+ return status;
+}
+
+/*
+ * relay_process_data: Process the data received on the data socket
+ */
+static enum relay_connection_status relay_process_data(
+ struct relay_connection *conn)
+{
+ enum relay_connection_status status;
+
+ switch (conn->protocol.data.state_id) {
+ case DATA_CONNECTION_STATE_RECEIVE_HEADER:
+ status = relay_process_data_receive_header(conn);
+ break;
+ case DATA_CONNECTION_STATE_RECEIVE_PAYLOAD:
+ status = relay_process_data_receive_payload(conn);
+ break;
+ default:
+ ERR("Unexpected data connection communication state.");
+ abort();
+ }
+
+ return status;
+}
+
+static void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd)