projects
/
lttng-tools.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Backport: relayd: use the fd-tracker to track stream_fd fds
[lttng-tools.git]
/
src
/
bin
/
lttng-relayd
/
main.c
diff --git
a/src/bin/lttng-relayd/main.c
b/src/bin/lttng-relayd/main.c
index 123731e171c662cef4e69f4bacedbd42681be508..0f584487a09819b58de9602718bb9a197ffc59ef 100644
(file)
--- a/
src/bin/lttng-relayd/main.c
+++ b/
src/bin/lttng-relayd/main.c
@@
-1593,9
+1593,8
@@
int relay_reset_metadata(const struct lttcomm_relayd_hdr *recv_hdr,
goto end_unlock;
}
goto end_unlock;
}
- ret = utils_rotate_stream_file(stream->path_name, stream->channel_name,
- 0, 0, -1, -1, stream->stream_fd->fd, NULL,
- &stream->stream_fd->fd);
+ ret = stream_fd_rotate(stream->stream_fd,
+ stream->path_name, stream->channel_name, 0, 0, NULL);
if (ret < 0) {
ERR("Failed to rotate metadata file %s of channel %s",
stream->path_name, stream->channel_name);
if (ret < 0) {
ERR("Failed to rotate metadata file %s of channel %s",
stream->path_name, stream->channel_name);
@@
-1715,6
+1714,7
@@
static int relay_recv_metadata(const struct lttcomm_relayd_hdr *recv_hdr,
struct lttcomm_relayd_metadata_payload metadata_payload_header;
struct relay_stream *metadata_stream;
uint64_t metadata_payload_size;
struct lttcomm_relayd_metadata_payload metadata_payload_header;
struct relay_stream *metadata_stream;
uint64_t metadata_payload_size;
+ int metadata_fd = -1;
if (!session) {
ERR("Metadata sent before version check");
if (!session) {
ERR("Metadata sent before version check");
@@
-1745,20
+1745,24
@@
static int relay_recv_metadata(const struct lttcomm_relayd_hdr *recv_hdr,
pthread_mutex_lock(&metadata_stream->lock);
pthread_mutex_lock(&metadata_stream->lock);
- size_ret = lttng_write(metadata_stream->stream_fd->fd,
+ metadata_fd = stream_fd_get_fd(metadata_stream->stream_fd);
+ if (metadata_fd < 0) {
+ goto end_put;
+ }
+ size_ret = lttng_write(metadata_fd,
payload->data + sizeof(metadata_payload_header),
metadata_payload_size);
if (size_ret < metadata_payload_size) {
ERR("Relay error writing metadata on file");
ret = -1;
payload->data + sizeof(metadata_payload_header),
metadata_payload_size);
if (size_ret < metadata_payload_size) {
ERR("Relay error writing metadata on file");
ret = -1;
- goto end_put;
+ goto end_put
_fd
;
}
}
- size_ret = write_padding_to_file(metadata_
stream->stream_fd->
fd,
+ size_ret = write_padding_to_file(metadata_fd,
metadata_payload_header.padding_size);
if (size_ret < (int64_t) metadata_payload_header.padding_size) {
ret = -1;
metadata_payload_header.padding_size);
if (size_ret < (int64_t) metadata_payload_header.padding_size) {
ret = -1;
- goto end_put;
+ goto end_put
_fd
;
}
metadata_stream->metadata_received +=
}
metadata_stream->metadata_received +=
@@
-1766,6
+1770,8
@@
static int relay_recv_metadata(const struct lttcomm_relayd_hdr *recv_hdr,
DBG2("Relay metadata written. Updated metadata_received %" PRIu64,
metadata_stream->metadata_received);
DBG2("Relay metadata written. Updated metadata_received %" PRIu64,
metadata_stream->metadata_received);
+end_put_fd:
+ stream_fd_put_fd(metadata_stream->stream_fd);
end_put:
pthread_mutex_unlock(&metadata_stream->lock);
stream_put(metadata_stream);
end_put:
pthread_mutex_unlock(&metadata_stream->lock);
stream_put(metadata_stream);
@@
-2746,11
+2752,9
@@
static enum relay_connection_status relay_process_data_receive_header(
/* new_id is updated by utils_rotate_stream_file. */
new_id = old_id;
/* new_id is updated by utils_rotate_stream_file. */
new_id = old_id;
- ret =
utils_rotate_stream_file(
stream->path_name,
+ ret =
stream_fd_rotate(stream->stream_fd,
stream->path_name,
stream->channel_name, stream->tracefile_size,
stream->channel_name, stream->tracefile_size,
- stream->tracefile_count, -1,
- -1, stream->stream_fd->fd,
- &new_id, &stream->stream_fd->fd);
+ stream->tracefile_count, &new_id);
if (ret < 0) {
ERR("Failed to rotate stream output file");
status = RELAY_CONNECTION_STATUS_ERROR;
if (ret < 0) {
ERR("Failed to rotate stream output file");
status = RELAY_CONNECTION_STATUS_ERROR;
@@
-2787,6
+2791,7
@@
static enum relay_connection_status relay_process_data_receive_payload(
bool new_stream = false, close_requested = false;
uint64_t left_to_receive = state->left_to_receive;
struct relay_session *session;
bool new_stream = false, close_requested = false;
uint64_t left_to_receive = state->left_to_receive;
struct relay_session *session;
+ int stream_fd = -1;
DBG3("Receiving data for stream id %" PRIu64 " seqnum %" PRIu64 ", %" PRIu64" bytes received, %" PRIu64 " bytes left to receive",
state->header.stream_id, state->header.net_seq_num,
DBG3("Receiving data for stream id %" PRIu64 " seqnum %" PRIu64 ", %" PRIu64" bytes received, %" PRIu64 " bytes left to receive",
state->header.stream_id, state->header.net_seq_num,
@@
-2811,6
+2816,12
@@
static enum relay_connection_status relay_process_data_receive_payload(
}
}
}
}
+ stream_fd = stream_fd_get_fd(stream->stream_fd);
+ if (stream_fd < 0) {
+ status = RELAY_CONNECTION_STATUS_ERROR;
+ goto end_stream_unlock;
+ }
+
/*
* The size of the "chunk" received on any iteration is bounded by:
* - the data left to receive,
/*
* The size of the "chunk" received on any iteration is bounded by:
* - the data left to receive,
@@
-2828,7
+2839,7
@@
static enum relay_connection_status relay_process_data_receive_payload(
PERROR("Socket %d error", conn->sock->fd);
status = RELAY_CONNECTION_STATUS_ERROR;
}
PERROR("Socket %d error", conn->sock->fd);
status = RELAY_CONNECTION_STATUS_ERROR;
}
- goto end_
stream_unlock
;
+ goto end_
put_fd
;
} else if (ret == 0) {
/* No more data ready to be consumed on socket. */
DBG3("No more data ready for consumption on data socket of stream id %" PRIu64,
} else if (ret == 0) {
/* No more data ready to be consumed on socket. */
DBG3("No more data ready for consumption on data socket of stream id %" PRIu64,
@@
-2846,12
+2857,12
@@
static enum relay_connection_status relay_process_data_receive_payload(
recv_size = ret;
/* Write data to stream output fd. */
recv_size = ret;
/* Write data to stream output fd. */
- write_ret = lttng_write(stream
->stream_fd->
fd, data_buffer,
+ write_ret = lttng_write(stream
_
fd, data_buffer,
recv_size);
if (write_ret < (ssize_t) recv_size) {
ERR("Relay error writing data to file");
status = RELAY_CONNECTION_STATUS_ERROR;
recv_size);
if (write_ret < (ssize_t) recv_size) {
ERR("Relay error writing data to file");
status = RELAY_CONNECTION_STATUS_ERROR;
- goto end_
stream_unlock
;
+ goto end_
put_fd
;
}
left_to_receive -= recv_size;
}
left_to_receive -= recv_size;
@@
-2870,17
+2881,17
@@
static enum relay_connection_status relay_process_data_receive_payload(
DBG3("Partial receive on data connection of stream id %" PRIu64 ", %" PRIu64 " bytes received, %" PRIu64 " bytes left to receive",
state->header.stream_id, state->received,
state->left_to_receive);
DBG3("Partial receive on data connection of stream id %" PRIu64 ", %" PRIu64 " bytes received, %" PRIu64 " bytes left to receive",
state->header.stream_id, state->received,
state->left_to_receive);
- goto end_
stream_unlock
;
+ goto end_
put_fd
;
}
}
- ret = write_padding_to_file(stream
->stream_fd->
fd,
+ ret = write_padding_to_file(stream
_
fd,
state->header.padding_size);
if ((int64_t) ret < (int64_t) state->header.padding_size) {
ERR("write_padding_to_file: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
stream->stream_handle,
state->header.net_seq_num, ret);
status = RELAY_CONNECTION_STATUS_ERROR;
state->header.padding_size);
if ((int64_t) ret < (int64_t) state->header.padding_size) {
ERR("write_padding_to_file: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
stream->stream_handle,
state->header.net_seq_num, ret);
status = RELAY_CONNECTION_STATUS_ERROR;
- goto end_
stream_unlock
;
+ goto end_
put_fd
;
}
}
@@
-2892,7
+2903,7
@@
static enum relay_connection_status relay_process_data_receive_payload(
stream->stream_handle,
state->header.net_seq_num, ret);
status = RELAY_CONNECTION_STATUS_ERROR;
stream->stream_handle,
state->header.net_seq_num, ret);
status = RELAY_CONNECTION_STATUS_ERROR;
- goto end_
stream_unlock
;
+ goto end_
put_fd
;
}
}
}
}
@@
-2913,6
+2924,8
@@
static enum relay_connection_status relay_process_data_receive_payload(
connection_reset_protocol_state(conn);
state = NULL;
connection_reset_protocol_state(conn);
state = NULL;
+end_put_fd:
+ stream_fd_put_fd(stream->stream_fd);
end_stream_unlock:
close_requested = stream->close_requested;
pthread_mutex_unlock(&stream->lock);
end_stream_unlock:
close_requested = stream->close_requested;
pthread_mutex_unlock(&stream->lock);
This page took
0.028693 seconds
and
5
git commands to generate.