+/*
+ * relay_process_data: Process the data received on the data socket
+ */
+static int relay_process_data(struct relay_connection *conn)
+{
+ int ret = 0, rotate_index = 0;
+ ssize_t size_ret;
+ struct relay_stream *stream;
+ struct lttcomm_relayd_data_hdr data_hdr;
+ uint64_t stream_id;
+ uint64_t net_seq_num;
+ uint32_t data_size;
+ struct relay_session *session;
+ bool new_stream = false, close_requested = false;
+
+ ret = conn->sock->ops->recvmsg(conn->sock, &data_hdr,
+ sizeof(struct lttcomm_relayd_data_hdr), 0);
+ if (ret <= 0) {
+ if (ret == 0) {
+ /* Orderly shutdown. Not necessary to print an error. */
+ DBG("Socket %d did an orderly shutdown", conn->sock->fd);
+ } else {
+ ERR("Unable to receive data header on sock %d", conn->sock->fd);
+ }
+ ret = -1;
+ goto end;
+ }
+
+ stream_id = be64toh(data_hdr.stream_id);
+ stream = stream_get_by_id(stream_id);
+ if (!stream) {
+ ERR("relay_process_data: Cannot find stream %" PRIu64, stream_id);
+ ret = -1;
+ goto end;
+ }
+ session = stream->trace->session;
+ data_size = be32toh(data_hdr.data_size);
+ if (data_buffer_size < data_size) {
+ char *tmp_data_ptr;
+
+ tmp_data_ptr = realloc(data_buffer, data_size);
+ if (!tmp_data_ptr) {
+ ERR("Allocating data buffer");
+ free(data_buffer);
+ ret = -1;
+ goto end_stream_put;
+ }
+ data_buffer = tmp_data_ptr;
+ data_buffer_size = data_size;
+ }
+ memset(data_buffer, 0, data_size);
+
+ net_seq_num = be64toh(data_hdr.net_seq_num);
+
+ DBG3("Receiving data of size %u for stream id %" PRIu64 " seqnum %" PRIu64,
+ data_size, stream_id, net_seq_num);
+ ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, data_size, 0);
+ if (ret <= 0) {
+ if (ret == 0) {
+ /* Orderly shutdown. Not necessary to print an error. */
+ DBG("Socket %d did an orderly shutdown", conn->sock->fd);
+ } else {
+ ERR("Socket %d error %d", conn->sock->fd, ret);
+ }
+ ret = -1;
+ goto end_stream_put;
+ }
+
+ pthread_mutex_lock(&stream->lock);
+
+ /* Check if a rotation is needed. */
+ if (stream->tracefile_size > 0 &&
+ (stream->tracefile_size_current + data_size) >
+ stream->tracefile_size) {
+ uint64_t old_id, new_id;
+
+ old_id = tracefile_array_get_file_index_head(stream->tfa);
+ tracefile_array_file_rotate(stream->tfa);
+
+ /* new_id is updated by utils_rotate_stream_file. */
+ new_id = old_id;
+
+ ret = utils_rotate_stream_file(stream->path_name,
+ stream->channel_name, stream->tracefile_size,
+ stream->tracefile_count, -1,
+ -1, stream->stream_fd->fd,
+ &new_id, &stream->stream_fd->fd);
+ if (ret < 0) {
+ ERR("Rotating stream output file");
+ goto end_stream_unlock;
+ }
+ /*
+ * Reset current size because we just performed a stream
+ * rotation.
+ */
+ stream->tracefile_size_current = 0;
+ rotate_index = 1;
+ }
+
+ /*
+ * Index are handled in protocol version 2.4 and above. Also,
+ * snapshot and index are NOT supported.
+ */
+ if (session->minor >= 4 && !session->snapshot) {
+ ret = handle_index_data(stream, net_seq_num, rotate_index);
+ if (ret < 0) {
+ ERR("handle_index_data: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
+ stream->stream_handle, net_seq_num, ret);
+ goto end_stream_unlock;
+ }
+ }
+
+ /* Write data to stream output fd. */
+ size_ret = lttng_write(stream->stream_fd->fd, data_buffer, data_size);
+ if (size_ret < data_size) {
+ ERR("Relay error writing data to file");
+ ret = -1;
+ goto end_stream_unlock;
+ }
+
+ DBG2("Relay wrote %zd bytes to tracefile for stream id %" PRIu64,
+ size_ret, stream->stream_handle);
+
+ ret = write_padding_to_file(stream->stream_fd->fd,
+ be32toh(data_hdr.padding_size));
+ if (ret < 0) {
+ ERR("write_padding_to_file: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
+ stream->stream_handle, net_seq_num, ret);
+ goto end_stream_unlock;
+ }
+ stream->tracefile_size_current +=
+ data_size + be32toh(data_hdr.padding_size);
+ if (stream->prev_seq == -1ULL) {
+ new_stream = true;
+ }
+
+ stream->prev_seq = net_seq_num;
+
+end_stream_unlock:
+ close_requested = stream->close_requested;
+ pthread_mutex_unlock(&stream->lock);
+ if (close_requested) {
+ try_stream_close(stream);
+ }
+
+ if (new_stream) {
+ pthread_mutex_lock(&session->lock);
+ uatomic_set(&session->new_streams, 1);
+ pthread_mutex_unlock(&session->lock);
+ }
+end_stream_put:
+ stream_put(stream);
+end:
+ return ret;
+}
+
+static void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd)