+ pthread_mutex_lock(&stream->lock);
+ session = stream->trace->session;
+ if (!conn->session) {
+ ret = connection_set_session(conn, session);
+ if (ret) {
+ status = RELAY_CONNECTION_STATUS_ERROR;
+ goto end_stream_unlock;
+ }
+ }
+
+ /*
+ * The size of the "chunk" received on any iteration is bounded by:
+ * - the data left to receive,
+ * - the data immediately available on the socket,
+ * - the on-stack data buffer
+ */
+ while (left_to_receive > 0 && !partial_recv) {
+ ssize_t write_ret;
+ size_t recv_size = min(left_to_receive, chunk_size);
+
+ ret = conn->sock->ops->recvmsg(conn->sock, data_buffer,
+ recv_size, MSG_DONTWAIT);
+ if (ret < 0) {
+ if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ PERROR("Socket %d error", conn->sock->fd);
+ status = RELAY_CONNECTION_STATUS_ERROR;
+ }
+ goto end_stream_unlock;
+ } else if (ret == 0) {
+ /* No more data ready to be consumed on socket. */
+ DBG3("No more data ready for consumption on data socket of stream id %" PRIu64,
+ state->header.stream_id);
+ status = RELAY_CONNECTION_STATUS_CLOSED;
+ break;
+ } else if (ret < (int) recv_size) {
+ /*
+ * All the data available on the socket has been
+ * consumed.
+ */
+ partial_recv = true;
+ }
+
+ recv_size = ret;
+
+ /* Write data to stream output fd. */
+ write_ret = lttng_write(stream->stream_fd->fd, data_buffer,
+ recv_size);
+ if (write_ret < (ssize_t) recv_size) {
+ ERR("Relay error writing data to file");
+ status = RELAY_CONNECTION_STATUS_ERROR;
+ goto end_stream_unlock;
+ }
+
+ left_to_receive -= recv_size;
+ state->received += recv_size;
+ state->left_to_receive = left_to_receive;
+
+ DBG2("Relay wrote %zd bytes to tracefile for stream id %" PRIu64,
+ write_ret, stream->stream_handle);
+ }
+
+ if (state->left_to_receive > 0) {
+ /*
+ * Did not receive all the data expected, wait for more data to
+ * become available on the socket.
+ */
+ DBG3("Partial receive on data connection of stream id %" PRIu64 ", %" PRIu64 " bytes received, %" PRIu64 " bytes left to receive",
+ state->header.stream_id, state->received,
+ state->left_to_receive);
+ goto end_stream_unlock;
+ }
+
+ ret = write_padding_to_file(stream->stream_fd->fd,
+ state->header.padding_size);
+ if ((int64_t) ret < (int64_t) state->header.padding_size) {
+ ERR("write_padding_to_file: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
+ stream->stream_handle,
+ state->header.net_seq_num, ret);
+ status = RELAY_CONNECTION_STATUS_ERROR;
+ goto end_stream_unlock;
+ }
+
+
+ if (session->minor >= 4 && !session->snapshot) {
+ ret = handle_index_data(stream, state->header.net_seq_num,
+ state->rotate_index);
+ if (ret < 0) {
+ ERR("handle_index_data: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
+ stream->stream_handle,
+ state->header.net_seq_num, ret);
+ status = RELAY_CONNECTION_STATUS_ERROR;
+ goto end_stream_unlock;
+ }
+ }
+
+ stream->tracefile_size_current += state->header.data_size +
+ state->header.padding_size;
+
+ if (stream->prev_seq == -1ULL) {
+ new_stream = true;
+ }
+
+ stream->prev_seq = state->header.net_seq_num;
+
+ /*
+ * Resetting the protocol state (to RECEIVE_HEADER) will trash the
+ * contents of *state which are aliased (union) to the same location as
+ * the new state. Don't use it beyond this point.
+ */
+ connection_reset_protocol_state(conn);
+ state = NULL;
+
+end_stream_unlock:
+ close_requested = stream->close_requested;
+ pthread_mutex_unlock(&stream->lock);
+ if (close_requested && left_to_receive == 0) {
+ try_stream_close(stream);
+ }
+
+ if (new_stream) {
+ pthread_mutex_lock(&session->lock);
+ uatomic_set(&session->new_streams, 1);
+ pthread_mutex_unlock(&session->lock);
+ }
+
+ stream_put(stream);
+end:
+ return status;
+}
+
+/*
+ * relay_process_data: Process the data received on the data socket
+ */
+static enum relay_connection_status relay_process_data(
+ struct relay_connection *conn)
+{
+ enum relay_connection_status status;
+
+ switch (conn->protocol.data.state_id) {
+ case DATA_CONNECTION_STATE_RECEIVE_HEADER:
+ status = relay_process_data_receive_header(conn);
+ break;
+ case DATA_CONNECTION_STATE_RECEIVE_PAYLOAD:
+ status = relay_process_data_receive_payload(conn);
+ break;
+ default:
+ ERR("Unexpected data connection communication state.");
+ abort();
+ }
+
+ return status;
+}
+
+static void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd)
+{
+ int ret;
+
+ (void) lttng_poll_del(events, pollfd);
+
+ ret = close(pollfd);
+ if (ret < 0) {
+ ERR("Closing pollfd %d", pollfd);
+ }
+}
+
+static void relay_thread_close_connection(struct lttng_poll_event *events,
+ int pollfd, struct relay_connection *conn)
+{
+ const char *type_str;
+
+ switch (conn->type) {
+ case RELAY_DATA:
+ type_str = "Data";
+ break;
+ case RELAY_CONTROL:
+ type_str = "Control";
+ break;
+ case RELAY_VIEWER_COMMAND:
+ type_str = "Viewer Command";
+ break;
+ case RELAY_VIEWER_NOTIFICATION:
+ type_str = "Viewer Notification";
+ break;
+ default:
+ type_str = "Unknown";
+ }
+ cleanup_connection_pollfd(events, pollfd);
+ connection_put(conn);
+ DBG("%s connection closed with %d", type_str, pollfd);