+ int ret = 0;
+ uint64_t data_offset;
+ struct relay_index *index;
+
+ /* Get data offset because we are about to update the index. */
+ data_offset = htobe64(stream->tracefile_size_current);
+
+ DBG("handle_index_data: stream %" PRIu64 " net_seq_num %" PRIu64 " data offset %" PRIu64,
+ stream->stream_handle, net_seq_num, stream->tracefile_size_current);
+
+ /*
+ * Lookup for an existing index for that stream id/sequence
+ * number. If it exists, the control thread has already received the
+ * data for it, thus we need to write it to disk.
+ */
+ index = relay_index_get_by_id_or_create(stream, net_seq_num);
+ if (!index) {
+ ret = -1;
+ goto end;
+ }
+
+ if (rotate_index || !stream->index_fd) {
+ int fd;
+
+ /* Put ref on previous index_fd. */
+ if (stream->index_fd) {
+ stream_fd_put(stream->index_fd);
+ stream->index_fd = NULL;
+ }
+
+ fd = index_create_file(stream->path_name, stream->channel_name,
+ -1, -1, stream->tracefile_size,
+ tracefile_array_get_file_index_head(stream->tfa));
+ if (fd < 0) {
+ ret = -1;
+ /* Put self-ref for this index due to error. */
+ relay_index_put(index);
+ goto end;
+ }
+ stream->index_fd = stream_fd_create(fd);
+ if (!stream->index_fd) {
+ ret = -1;
+ if (close(fd)) {
+ PERROR("Error closing FD %d", fd);
+ }
+ /* Put self-ref for this index due to error. */
+ relay_index_put(index);
+ /* Will put the local ref. */
+ goto end;
+ }
+ }
+
+ if (relay_index_set_fd(index, stream->index_fd, data_offset)) {
+ ret = -1;
+ /* Put self-ref for this index due to error. */
+ relay_index_put(index);
+ goto end;
+ }
+
+ ret = relay_index_try_flush(index);
+ if (ret == 0) {
+ tracefile_array_commit_seq(stream->tfa);
+ stream->index_received_seqcount++;
+ } else if (ret > 0) {
+ /* No flush. */
+ ret = 0;
+ } else {
+ /* Put self-ref for this index due to error. */
+ relay_index_put(index);
+ ret = -1;
+ }
+end:
+ return ret;
+}
+
+/*
+ * relay_process_data: Process the data received on the data socket
+ */
+static int relay_process_data(struct relay_connection *conn)
+{
+ int ret = 0, rotate_index = 0;
+ ssize_t size_ret;
+ struct relay_stream *stream;
+ struct lttcomm_relayd_data_hdr data_hdr;
+ uint64_t stream_id;
+ uint64_t net_seq_num;
+ uint32_t data_size;
+ struct relay_session *session;
+ bool new_stream = false, close_requested = false;
+
+ ret = conn->sock->ops->recvmsg(conn->sock, &data_hdr,
+ sizeof(struct lttcomm_relayd_data_hdr), 0);
+ if (ret <= 0) {
+ if (ret == 0) {
+ /* Orderly shutdown. Not necessary to print an error. */
+ DBG("Socket %d did an orderly shutdown", conn->sock->fd);
+ } else {
+ ERR("Unable to receive data header on sock %d", conn->sock->fd);
+ }
+ ret = -1;
+ goto end;
+ }
+
+ stream_id = be64toh(data_hdr.stream_id);
+ stream = stream_get_by_id(stream_id);
+ if (!stream) {
+ ERR("relay_process_data: Cannot find stream %" PRIu64, stream_id);
+ ret = -1;
+ goto end;
+ }
+ session = stream->trace->session;
+ data_size = be32toh(data_hdr.data_size);
+ if (data_buffer_size < data_size) {
+ char *tmp_data_ptr;
+
+ tmp_data_ptr = realloc(data_buffer, data_size);
+ if (!tmp_data_ptr) {
+ ERR("Allocating data buffer");
+ free(data_buffer);
+ ret = -1;
+ goto end_stream_put;
+ }
+ data_buffer = tmp_data_ptr;
+ data_buffer_size = data_size;
+ }
+ memset(data_buffer, 0, data_size);
+
+ net_seq_num = be64toh(data_hdr.net_seq_num);