goto exit;
}
- if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
- ERR("socket poll error");
- goto error;
- } else if (revents & LPOLLIN) {
+ if (revents & LPOLLIN) {
/*
* A new connection is requested, therefore a
* sessiond/consumerd connection is allocated in
* exchange in cds_wfcq_enqueue.
*/
futex_nto1_wake(&relay_conn_queue.futex);
+ } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+ ERR("socket poll error");
+ goto error;
+ } else {
+ ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+ goto error;
}
}
}
stream->last_net_seq_num = be64toh(stream_info.last_net_seq_num);
pthread_mutex_unlock(&stream->lock);
- stream_close(stream);
-
+ /*
+ * This is one of the conditions which may trigger a stream close
+ * with the others being:
+ * 1) A close command is received for a stream
+ * 2) The control connection owning the stream is closed
+ * 3) We have received all of the stream's data _after_ a close
+ * request.
+ */
+ try_stream_close(stream);
if (stream->is_metadata) {
struct relay_viewer_stream *vstream;
static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr,
struct relay_connection *conn)
{
- int ret = htobe32(LTTNG_OK);
+ int ret = 0;
ssize_t size_ret;
struct relay_session *session = conn->session;
struct lttcomm_relayd_metadata_payload *metadata_struct;
}
memset(data_buffer, 0, data_size);
DBG2("Relay receiving metadata, waiting for %" PRIu64 " bytes", data_size);
- ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, data_size, 0);
- if (ret < 0 || ret != data_size) {
- if (ret == 0) {
+ size_ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, data_size, 0);
+ if (size_ret < 0 || size_ret != data_size) {
+ if (size_ret == 0) {
/* Orderly shutdown. Not necessary to print an error. */
DBG("Socket %d did an orderly shutdown", conn->sock->fd);
} else {
goto end_put;
}
- ret = write_padding_to_file(metadata_stream->stream_fd->fd,
+ size_ret = write_padding_to_file(metadata_stream->stream_fd->fd,
be32toh(metadata_struct->padding_size));
- if (ret < 0) {
+ if (size_ret < 0) {
goto end_put;
}
end_put:
pthread_mutex_unlock(&metadata_stream->lock);
stream_put(metadata_stream);
-
end:
return ret;
}
uint64_t net_seq_num;
uint32_t data_size;
struct relay_session *session;
- bool new_stream = false;
+ bool new_stream = false, close_requested = false;
ret = conn->sock->ops->recvmsg(conn->sock, &data_hdr,
sizeof(struct lttcomm_relayd_data_hdr), 0);
stream->prev_seq = net_seq_num;
end_stream_unlock:
+ close_requested = stream->close_requested;
pthread_mutex_unlock(&stream->lock);
+ if (close_requested) {
+ try_stream_close(stream);
+ }
+
if (new_stream) {
pthread_mutex_lock(&session->lock);
uatomic_set(&session->new_streams, 1);
/* Inspect the relay conn pipe for new connection */
if (pollfd == relay_conn_pipe[0]) {
- if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
- ERR("Relay connection pipe error");
- goto error;
- } else if (revents & LPOLLIN) {
+ if (revents & LPOLLIN) {
struct relay_connection *conn;
ret = lttng_read(relay_conn_pipe[0], &conn, sizeof(conn));
LPOLLIN | LPOLLRDHUP);
connection_ht_add(relay_connections_ht, conn);
DBG("Connection socket %d added", conn->sock->fd);
+ } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+ ERR("Relay connection pipe error");
+ goto error;
+ } else {
+ ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+ goto error;
}
} else {
struct relay_connection *ctrl_conn;
/* If not found, there is a synchronization issue. */
assert(ctrl_conn);
- if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
- relay_thread_close_connection(&events, pollfd, ctrl_conn);
- if (last_seen_data_fd == pollfd) {
- last_seen_data_fd = last_notdel_data_fd;
- }
- } else if (revents & LPOLLIN) {
- if (ctrl_conn->type == RELAY_CONTROL) {
- ret = ctrl_conn->sock->ops->recvmsg(ctrl_conn->sock, &recv_hdr,
- sizeof(recv_hdr), 0);
- if (ret <= 0) {
- /* Connection closed */
- relay_thread_close_connection(&events, pollfd,
- ctrl_conn);
- } else {
- ret = relay_process_control(&recv_hdr, ctrl_conn);
- if (ret < 0) {
- /* Clear the session on error. */
- relay_thread_close_connection(&events, pollfd,
- ctrl_conn);
- }
- seen_control = 1;
- }
- } else {
+ if (ctrl_conn->type == RELAY_DATA) {
+ if (revents & LPOLLIN) {
/*
* Flag the last seen data fd not deleted. It will be
* used as the last seen fd if any fd gets deleted in
*/
last_notdel_data_fd = pollfd;
}
+ goto put_ctrl_connection;
+ }
+ assert(ctrl_conn->type == RELAY_CONTROL);
+
+ if (revents & LPOLLIN) {
+ ret = ctrl_conn->sock->ops->recvmsg(ctrl_conn->sock,
+ &recv_hdr, sizeof(recv_hdr), 0);
+ if (ret <= 0) {
+ /* Connection closed */
+ relay_thread_close_connection(&events, pollfd,
+ ctrl_conn);
+ } else {
+ ret = relay_process_control(&recv_hdr, ctrl_conn);
+ if (ret < 0) {
+ /* Clear the session on error. */
+ relay_thread_close_connection(&events,
+ pollfd, ctrl_conn);
+ }
+ seen_control = 1;
+ }
+ } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+ relay_thread_close_connection(&events,
+ pollfd, ctrl_conn);
+ if (last_seen_data_fd == pollfd) {
+ last_seen_data_fd = last_notdel_data_fd;
+ }
} else {
- ERR("Unknown poll events %u for sock %d", revents, pollfd);
+ ERR("Unexpected poll events %u for control sock %d",
+ revents, pollfd);
+ connection_put(ctrl_conn);
+ goto error;
}
+ put_ctrl_connection:
connection_put(ctrl_conn);
}
}
/* Skip it. Might be removed before. */
continue;
}
+ if (data_conn->type == RELAY_CONTROL) {
+ goto put_data_connection;
+ }
+ assert(data_conn->type == RELAY_DATA);
if (revents & LPOLLIN) {
- if (data_conn->type != RELAY_DATA) {
- goto put_connection;
- }
-
ret = relay_process_data(data_conn);
/* Connection closed */
if (ret < 0) {
relay_thread_close_connection(&events, pollfd,
- data_conn);
+ data_conn);
/*
* Every goto restart call sets the last seen fd where
* here we don't really care since we gracefully
connection_put(data_conn);
goto restart;
}
+ } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+ relay_thread_close_connection(&events, pollfd,
+ data_conn);
+ } else {
+ ERR("Unknown poll events %u for data sock %d",
+ revents, pollfd);
}
- put_connection:
+ put_data_connection:
connection_put(data_conn);
}
last_seen_data_fd = -1;