X-Git-Url: http://git.efficios.com/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=d1ee84eae836739c3addd70cb7ec63a337af682c;hb=f465e9787f5b70d4a8f3761c60b8d6e4161e9628;hp=d6a8923e19aa1c56667838fe7710def869c57456;hpb=fa7f1bce9446b781b5a5cf22fa2e43d2bcdf24ca;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index d6a8923e1..d1ee84eae 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -862,6 +862,23 @@ static int close_sock(void *data, int *in_fd) return sock->ops->close(sock); } +static int accept_sock(void *data, int *out_fd) +{ + int ret = 0; + /* Socks is an array of in_sock, out_sock. */ + struct lttcomm_sock **socks = data; + struct lttcomm_sock *in_sock = socks[0]; + + socks[1] = in_sock->ops->accept(in_sock); + if (!socks[1]) { + ret = -1; + goto end; + } + *out_fd = socks[1]->fd; +end: + return ret; +} + /* * Create and init socket from uri. */ @@ -919,6 +936,27 @@ error: return NULL; } +static +struct lttcomm_sock *accept_relayd_sock(struct lttcomm_sock *listening_sock, + const char *name) +{ + int out_fd, ret; + struct lttcomm_sock *socks[2] = { listening_sock, NULL }; + struct lttcomm_sock *new_sock = NULL; + + ret = fd_tracker_open_unsuspendable_fd( + the_fd_tracker, &out_fd, + (const char **) &name, + 1, accept_sock, &socks); + if (ret) { + goto end; + } + new_sock = socks[1]; + DBG("%s accepted, socket %d", name, new_sock->fd); +end: + return new_sock; +} + /* * This thread manages the listening for new connections on the network */ @@ -1025,20 +1063,18 @@ restart: */ int val = 1; struct relay_connection *new_conn; - struct lttcomm_sock *newsock; + struct lttcomm_sock *newsock = NULL; enum connection_type type; if (pollfd == data_sock->fd) { type = RELAY_DATA; - newsock = data_sock->ops->accept(data_sock); - DBG("Relay data connection accepted, socket %d", - newsock->fd); + newsock = accept_relayd_sock(data_sock, + "Data socket to relayd"); } else { assert(pollfd == control_sock->fd); type = RELAY_CONTROL; - newsock = control_sock->ops->accept(control_sock); - DBG("Relay control connection accepted, socket %d", - newsock->fd); + newsock = accept_relayd_sock(control_sock, + "Control socket to relayd"); } if (!newsock) { PERROR("accepting sock"); @@ -1557,9 +1593,8 @@ int relay_reset_metadata(const struct lttcomm_relayd_hdr *recv_hdr, goto end_unlock; } - ret = utils_rotate_stream_file(stream->path_name, stream->channel_name, - 0, 0, -1, -1, stream->stream_fd->fd, NULL, - &stream->stream_fd->fd); + ret = stream_fd_rotate(stream->stream_fd, + stream->path_name, stream->channel_name, 0, 0, NULL); if (ret < 0) { ERR("Failed to rotate metadata file %s of channel %s", stream->path_name, stream->channel_name); @@ -1679,6 +1714,7 @@ static int relay_recv_metadata(const struct lttcomm_relayd_hdr *recv_hdr, struct lttcomm_relayd_metadata_payload metadata_payload_header; struct relay_stream *metadata_stream; uint64_t metadata_payload_size; + int metadata_fd = -1; if (!session) { ERR("Metadata sent before version check"); @@ -1709,20 +1745,24 @@ static int relay_recv_metadata(const struct lttcomm_relayd_hdr *recv_hdr, pthread_mutex_lock(&metadata_stream->lock); - size_ret = lttng_write(metadata_stream->stream_fd->fd, + metadata_fd = stream_fd_get_fd(metadata_stream->stream_fd); + if (metadata_fd < 0) { + goto end_put; + } + size_ret = lttng_write(metadata_fd, payload->data + sizeof(metadata_payload_header), metadata_payload_size); if (size_ret < metadata_payload_size) { ERR("Relay error writing metadata on file"); ret = -1; - goto end_put; + goto end_put_fd; } - size_ret = write_padding_to_file(metadata_stream->stream_fd->fd, + size_ret = write_padding_to_file(metadata_fd, metadata_payload_header.padding_size); if (size_ret < (int64_t) metadata_payload_header.padding_size) { ret = -1; - goto end_put; + goto end_put_fd; } metadata_stream->metadata_received += @@ -1730,6 +1770,8 @@ static int relay_recv_metadata(const struct lttcomm_relayd_hdr *recv_hdr, DBG2("Relay metadata written. Updated metadata_received %" PRIu64, metadata_stream->metadata_received); +end_put_fd: + stream_fd_put_fd(metadata_stream->stream_fd); end_put: pthread_mutex_unlock(&metadata_stream->lock); stream_put(metadata_stream); @@ -2575,14 +2617,14 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, /* Put ref on previous index_file. */ if (stream->index_file) { - lttng_index_file_put(stream->index_file); + relay_index_file_put(stream->index_file); stream->index_file = NULL; } major = stream->trace->session->major; minor = stream->trace->session->minor; - stream->index_file = lttng_index_file_create(stream->path_name, + stream->index_file = relay_index_file_create(stream->path_name, stream->channel_name, - -1, -1, stream->tracefile_size, + stream->tracefile_size, tracefile_array_get_file_index_head(stream->tfa), lttng_to_index_major(major, minor), lttng_to_index_minor(major, minor)); @@ -2710,11 +2752,9 @@ static enum relay_connection_status relay_process_data_receive_header( /* new_id is updated by utils_rotate_stream_file. */ new_id = old_id; - ret = utils_rotate_stream_file(stream->path_name, + ret = stream_fd_rotate(stream->stream_fd, stream->path_name, stream->channel_name, stream->tracefile_size, - stream->tracefile_count, -1, - -1, stream->stream_fd->fd, - &new_id, &stream->stream_fd->fd); + stream->tracefile_count, &new_id); if (ret < 0) { ERR("Failed to rotate stream output file"); status = RELAY_CONNECTION_STATUS_ERROR; @@ -2751,6 +2791,7 @@ static enum relay_connection_status relay_process_data_receive_payload( bool new_stream = false, close_requested = false; uint64_t left_to_receive = state->left_to_receive; struct relay_session *session; + int stream_fd = -1; DBG3("Receiving data for stream id %" PRIu64 " seqnum %" PRIu64 ", %" PRIu64" bytes received, %" PRIu64 " bytes left to receive", state->header.stream_id, state->header.net_seq_num, @@ -2775,6 +2816,12 @@ static enum relay_connection_status relay_process_data_receive_payload( } } + stream_fd = stream_fd_get_fd(stream->stream_fd); + if (stream_fd < 0) { + status = RELAY_CONNECTION_STATUS_ERROR; + goto end_stream_unlock; + } + /* * The size of the "chunk" received on any iteration is bounded by: * - the data left to receive, @@ -2792,7 +2839,7 @@ static enum relay_connection_status relay_process_data_receive_payload( PERROR("Socket %d error", conn->sock->fd); status = RELAY_CONNECTION_STATUS_ERROR; } - goto end_stream_unlock; + goto end_put_fd; } else if (ret == 0) { /* No more data ready to be consumed on socket. */ DBG3("No more data ready for consumption on data socket of stream id %" PRIu64, @@ -2810,12 +2857,12 @@ static enum relay_connection_status relay_process_data_receive_payload( recv_size = ret; /* Write data to stream output fd. */ - write_ret = lttng_write(stream->stream_fd->fd, data_buffer, + write_ret = lttng_write(stream_fd, data_buffer, recv_size); if (write_ret < (ssize_t) recv_size) { ERR("Relay error writing data to file"); status = RELAY_CONNECTION_STATUS_ERROR; - goto end_stream_unlock; + goto end_put_fd; } left_to_receive -= recv_size; @@ -2834,17 +2881,17 @@ static enum relay_connection_status relay_process_data_receive_payload( DBG3("Partial receive on data connection of stream id %" PRIu64 ", %" PRIu64 " bytes received, %" PRIu64 " bytes left to receive", state->header.stream_id, state->received, state->left_to_receive); - goto end_stream_unlock; + goto end_put_fd; } - ret = write_padding_to_file(stream->stream_fd->fd, + ret = write_padding_to_file(stream_fd, state->header.padding_size); if ((int64_t) ret < (int64_t) state->header.padding_size) { ERR("write_padding_to_file: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d", stream->stream_handle, state->header.net_seq_num, ret); status = RELAY_CONNECTION_STATUS_ERROR; - goto end_stream_unlock; + goto end_put_fd; } @@ -2856,7 +2903,7 @@ static enum relay_connection_status relay_process_data_receive_payload( stream->stream_handle, state->header.net_seq_num, ret); status = RELAY_CONNECTION_STATUS_ERROR; - goto end_stream_unlock; + goto end_put_fd; } } @@ -2877,6 +2924,8 @@ static enum relay_connection_status relay_process_data_receive_payload( connection_reset_protocol_state(conn); state = NULL; +end_put_fd: + stream_fd_put_fd(stream->stream_fd); end_stream_unlock: close_requested = stream->close_requested; pthread_mutex_unlock(&stream->lock); @@ -2924,7 +2973,8 @@ static void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollf (void) lttng_poll_del(events, pollfd); - ret = close(pollfd); + ret = fd_tracker_close_unsuspendable_fd(the_fd_tracker, &pollfd, 1, + fd_tracker_util_close_fd, NULL); if (ret < 0) { ERR("Closing pollfd %d", pollfd); }