X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=0f584487a09819b58de9602718bb9a197ffc59ef;hb=0048db42e2bd47a76ea60653df79aff94debb5bb;hp=123731e171c662cef4e69f4bacedbd42681be508;hpb=d2d615465f45574460e3846658c67ccc0db8681e;p=deliverable%2Flttng-tools.git diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 123731e17..0f584487a 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -1593,9 +1593,8 @@ int relay_reset_metadata(const struct lttcomm_relayd_hdr *recv_hdr, goto end_unlock; } - ret = utils_rotate_stream_file(stream->path_name, stream->channel_name, - 0, 0, -1, -1, stream->stream_fd->fd, NULL, - &stream->stream_fd->fd); + ret = stream_fd_rotate(stream->stream_fd, + stream->path_name, stream->channel_name, 0, 0, NULL); if (ret < 0) { ERR("Failed to rotate metadata file %s of channel %s", stream->path_name, stream->channel_name); @@ -1715,6 +1714,7 @@ static int relay_recv_metadata(const struct lttcomm_relayd_hdr *recv_hdr, struct lttcomm_relayd_metadata_payload metadata_payload_header; struct relay_stream *metadata_stream; uint64_t metadata_payload_size; + int metadata_fd = -1; if (!session) { ERR("Metadata sent before version check"); @@ -1745,20 +1745,24 @@ static int relay_recv_metadata(const struct lttcomm_relayd_hdr *recv_hdr, pthread_mutex_lock(&metadata_stream->lock); - size_ret = lttng_write(metadata_stream->stream_fd->fd, + metadata_fd = stream_fd_get_fd(metadata_stream->stream_fd); + if (metadata_fd < 0) { + goto end_put; + } + size_ret = lttng_write(metadata_fd, payload->data + sizeof(metadata_payload_header), metadata_payload_size); if (size_ret < metadata_payload_size) { ERR("Relay error writing metadata on file"); ret = -1; - goto end_put; + goto end_put_fd; } - size_ret = write_padding_to_file(metadata_stream->stream_fd->fd, + size_ret = write_padding_to_file(metadata_fd, metadata_payload_header.padding_size); if (size_ret < (int64_t) metadata_payload_header.padding_size) { ret = -1; - goto end_put; + goto end_put_fd; } metadata_stream->metadata_received += @@ -1766,6 +1770,8 @@ static int relay_recv_metadata(const struct lttcomm_relayd_hdr *recv_hdr, DBG2("Relay metadata written. Updated metadata_received %" PRIu64, metadata_stream->metadata_received); +end_put_fd: + stream_fd_put_fd(metadata_stream->stream_fd); end_put: pthread_mutex_unlock(&metadata_stream->lock); stream_put(metadata_stream); @@ -2746,11 +2752,9 @@ static enum relay_connection_status relay_process_data_receive_header( /* new_id is updated by utils_rotate_stream_file. */ new_id = old_id; - ret = utils_rotate_stream_file(stream->path_name, + ret = stream_fd_rotate(stream->stream_fd, stream->path_name, stream->channel_name, stream->tracefile_size, - stream->tracefile_count, -1, - -1, stream->stream_fd->fd, - &new_id, &stream->stream_fd->fd); + stream->tracefile_count, &new_id); if (ret < 0) { ERR("Failed to rotate stream output file"); status = RELAY_CONNECTION_STATUS_ERROR; @@ -2787,6 +2791,7 @@ static enum relay_connection_status relay_process_data_receive_payload( bool new_stream = false, close_requested = false; uint64_t left_to_receive = state->left_to_receive; struct relay_session *session; + int stream_fd = -1; DBG3("Receiving data for stream id %" PRIu64 " seqnum %" PRIu64 ", %" PRIu64" bytes received, %" PRIu64 " bytes left to receive", state->header.stream_id, state->header.net_seq_num, @@ -2811,6 +2816,12 @@ static enum relay_connection_status relay_process_data_receive_payload( } } + stream_fd = stream_fd_get_fd(stream->stream_fd); + if (stream_fd < 0) { + status = RELAY_CONNECTION_STATUS_ERROR; + goto end_stream_unlock; + } + /* * The size of the "chunk" received on any iteration is bounded by: * - the data left to receive, @@ -2828,7 +2839,7 @@ static enum relay_connection_status relay_process_data_receive_payload( PERROR("Socket %d error", conn->sock->fd); status = RELAY_CONNECTION_STATUS_ERROR; } - goto end_stream_unlock; + goto end_put_fd; } else if (ret == 0) { /* No more data ready to be consumed on socket. */ DBG3("No more data ready for consumption on data socket of stream id %" PRIu64, @@ -2846,12 +2857,12 @@ static enum relay_connection_status relay_process_data_receive_payload( recv_size = ret; /* Write data to stream output fd. */ - write_ret = lttng_write(stream->stream_fd->fd, data_buffer, + write_ret = lttng_write(stream_fd, data_buffer, recv_size); if (write_ret < (ssize_t) recv_size) { ERR("Relay error writing data to file"); status = RELAY_CONNECTION_STATUS_ERROR; - goto end_stream_unlock; + goto end_put_fd; } left_to_receive -= recv_size; @@ -2870,17 +2881,17 @@ static enum relay_connection_status relay_process_data_receive_payload( DBG3("Partial receive on data connection of stream id %" PRIu64 ", %" PRIu64 " bytes received, %" PRIu64 " bytes left to receive", state->header.stream_id, state->received, state->left_to_receive); - goto end_stream_unlock; + goto end_put_fd; } - ret = write_padding_to_file(stream->stream_fd->fd, + ret = write_padding_to_file(stream_fd, state->header.padding_size); if ((int64_t) ret < (int64_t) state->header.padding_size) { ERR("write_padding_to_file: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d", stream->stream_handle, state->header.net_seq_num, ret); status = RELAY_CONNECTION_STATUS_ERROR; - goto end_stream_unlock; + goto end_put_fd; } @@ -2892,7 +2903,7 @@ static enum relay_connection_status relay_process_data_receive_payload( stream->stream_handle, state->header.net_seq_num, ret); status = RELAY_CONNECTION_STATUS_ERROR; - goto end_stream_unlock; + goto end_put_fd; } } @@ -2913,6 +2924,8 @@ static enum relay_connection_status relay_process_data_receive_payload( connection_reset_protocol_state(conn); state = NULL; +end_put_fd: + stream_fd_put_fd(stream->stream_fd); end_stream_unlock: close_requested = stream->close_requested; pthread_mutex_unlock(&stream->lock);