Backport: relayd: replace lttng_index_file with relay_index_file
[lttng-tools.git] / src / bin / lttng-relayd / main.c
index 30eb5c7aee48a3a648686fd7c4745e82fe94954d..d1ee84eae836739c3addd70cb7ec63a337af682c 100644 (file)
@@ -1073,9 +1073,8 @@ restart:
                                } else {
                                        assert(pollfd == control_sock->fd);
                                        type = RELAY_CONTROL;
-                                       newsock = control_sock->ops->accept(control_sock);
-                                       DBG("Relay control connection accepted, socket %d",
-                                                       newsock->fd);
+                                       newsock = accept_relayd_sock(control_sock,
+                                                       "Control socket to relayd");
                                }
                                if (!newsock) {
                                        PERROR("accepting sock");
@@ -1594,9 +1593,8 @@ int relay_reset_metadata(const struct lttcomm_relayd_hdr *recv_hdr,
                goto end_unlock;
        }
 
-       ret = utils_rotate_stream_file(stream->path_name, stream->channel_name,
-                       0, 0, -1, -1, stream->stream_fd->fd, NULL,
-                       &stream->stream_fd->fd);
+       ret = stream_fd_rotate(stream->stream_fd,
+                       stream->path_name, stream->channel_name, 0, 0, NULL);
        if (ret < 0) {
                ERR("Failed to rotate metadata file %s of channel %s",
                                stream->path_name, stream->channel_name);
@@ -1716,6 +1714,7 @@ static int relay_recv_metadata(const struct lttcomm_relayd_hdr *recv_hdr,
        struct lttcomm_relayd_metadata_payload metadata_payload_header;
        struct relay_stream *metadata_stream;
        uint64_t metadata_payload_size;
+       int metadata_fd = -1;
 
        if (!session) {
                ERR("Metadata sent before version check");
@@ -1746,20 +1745,24 @@ static int relay_recv_metadata(const struct lttcomm_relayd_hdr *recv_hdr,
 
        pthread_mutex_lock(&metadata_stream->lock);
 
-       size_ret = lttng_write(metadata_stream->stream_fd->fd,
+       metadata_fd = stream_fd_get_fd(metadata_stream->stream_fd);
+       if (metadata_fd < 0) {
+               goto end_put;
+       }
+       size_ret = lttng_write(metadata_fd,
                        payload->data + sizeof(metadata_payload_header),
                        metadata_payload_size);
        if (size_ret < metadata_payload_size) {
                ERR("Relay error writing metadata on file");
                ret = -1;
-               goto end_put;
+               goto end_put_fd;
        }
 
-       size_ret = write_padding_to_file(metadata_stream->stream_fd->fd,
+       size_ret = write_padding_to_file(metadata_fd,
                        metadata_payload_header.padding_size);
        if (size_ret < (int64_t) metadata_payload_header.padding_size) {
                ret = -1;
-               goto end_put;
+               goto end_put_fd;
        }
 
        metadata_stream->metadata_received +=
@@ -1767,6 +1770,8 @@ static int relay_recv_metadata(const struct lttcomm_relayd_hdr *recv_hdr,
        DBG2("Relay metadata written. Updated metadata_received %" PRIu64,
                metadata_stream->metadata_received);
 
+end_put_fd:
+       stream_fd_put_fd(metadata_stream->stream_fd);
 end_put:
        pthread_mutex_unlock(&metadata_stream->lock);
        stream_put(metadata_stream);
@@ -2612,14 +2617,14 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
 
                /* Put ref on previous index_file. */
                if (stream->index_file) {
-                       lttng_index_file_put(stream->index_file);
+                       relay_index_file_put(stream->index_file);
                        stream->index_file = NULL;
                }
                major = stream->trace->session->major;
                minor = stream->trace->session->minor;
-               stream->index_file = lttng_index_file_create(stream->path_name,
+               stream->index_file = relay_index_file_create(stream->path_name,
                                stream->channel_name,
-                               -1, -1, stream->tracefile_size,
+                               stream->tracefile_size,
                                tracefile_array_get_file_index_head(stream->tfa),
                                lttng_to_index_major(major, minor),
                                lttng_to_index_minor(major, minor));
@@ -2747,11 +2752,9 @@ static enum relay_connection_status relay_process_data_receive_header(
                /* new_id is updated by utils_rotate_stream_file. */
                new_id = old_id;
 
-               ret = utils_rotate_stream_file(stream->path_name,
+               ret = stream_fd_rotate(stream->stream_fd, stream->path_name,
                                stream->channel_name, stream->tracefile_size,
-                               stream->tracefile_count, -1,
-                               -1, stream->stream_fd->fd,
-                               &new_id, &stream->stream_fd->fd);
+                               stream->tracefile_count, &new_id);
                if (ret < 0) {
                        ERR("Failed to rotate stream output file");
                        status = RELAY_CONNECTION_STATUS_ERROR;
@@ -2788,6 +2791,7 @@ static enum relay_connection_status relay_process_data_receive_payload(
        bool new_stream = false, close_requested = false;
        uint64_t left_to_receive = state->left_to_receive;
        struct relay_session *session;
+       int stream_fd = -1;
 
        DBG3("Receiving data for stream id %" PRIu64 " seqnum %" PRIu64 ", %" PRIu64" bytes received, %" PRIu64 " bytes left to receive",
                        state->header.stream_id, state->header.net_seq_num,
@@ -2812,6 +2816,12 @@ static enum relay_connection_status relay_process_data_receive_payload(
                }
        }
 
+       stream_fd = stream_fd_get_fd(stream->stream_fd);
+       if (stream_fd < 0) {
+               status = RELAY_CONNECTION_STATUS_ERROR;
+               goto end_stream_unlock;
+       }
+
        /*
         * The size of the "chunk" received on any iteration is bounded by:
         *   - the data left to receive,
@@ -2829,7 +2839,7 @@ static enum relay_connection_status relay_process_data_receive_payload(
                                PERROR("Socket %d error", conn->sock->fd);
                                status = RELAY_CONNECTION_STATUS_ERROR;
                        }
-                       goto end_stream_unlock;
+                       goto end_put_fd;
                } else if (ret == 0) {
                        /* No more data ready to be consumed on socket. */
                        DBG3("No more data ready for consumption on data socket of stream id %" PRIu64,
@@ -2847,12 +2857,12 @@ static enum relay_connection_status relay_process_data_receive_payload(
                recv_size = ret;
 
                /* Write data to stream output fd. */
-               write_ret = lttng_write(stream->stream_fd->fd, data_buffer,
+               write_ret = lttng_write(stream_fd, data_buffer,
                                recv_size);
                if (write_ret < (ssize_t) recv_size) {
                        ERR("Relay error writing data to file");
                        status = RELAY_CONNECTION_STATUS_ERROR;
-                       goto end_stream_unlock;
+                       goto end_put_fd;
                }
 
                left_to_receive -= recv_size;
@@ -2871,17 +2881,17 @@ static enum relay_connection_status relay_process_data_receive_payload(
                DBG3("Partial receive on data connection of stream id %" PRIu64 ", %" PRIu64 " bytes received, %" PRIu64 " bytes left to receive",
                                state->header.stream_id, state->received,
                                state->left_to_receive);
-               goto end_stream_unlock;
+               goto end_put_fd;
        }
 
-       ret = write_padding_to_file(stream->stream_fd->fd,
+       ret = write_padding_to_file(stream_fd,
                        state->header.padding_size);
        if ((int64_t) ret < (int64_t) state->header.padding_size) {
                ERR("write_padding_to_file: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
                                stream->stream_handle,
                                state->header.net_seq_num, ret);
                status = RELAY_CONNECTION_STATUS_ERROR;
-               goto end_stream_unlock;
+               goto end_put_fd;
        }
 
 
@@ -2893,7 +2903,7 @@ static enum relay_connection_status relay_process_data_receive_payload(
                                        stream->stream_handle,
                                        state->header.net_seq_num, ret);
                        status = RELAY_CONNECTION_STATUS_ERROR;
-                       goto end_stream_unlock;
+                       goto end_put_fd;
                }
        }
 
@@ -2914,6 +2924,8 @@ static enum relay_connection_status relay_process_data_receive_payload(
        connection_reset_protocol_state(conn);
        state = NULL;
 
+end_put_fd:
+       stream_fd_put_fd(stream->stream_fd);
 end_stream_unlock:
        close_requested = stream->close_requested;
        pthread_mutex_unlock(&stream->lock);
This page took 0.028491 seconds and 5 git commands to generate.