static
void *relay_thread_worker(void *data)
{
- int i, ret, pollfd, err = -1;
- uint32_t revents, nb_fd;
+ int ret, err = -1, last_seen_data_fd = -1;
+ uint32_t nb_fd;
struct relay_command *relay_connection;
struct lttng_poll_event events;
struct lttng_ht *relay_connections_ht;
goto error;
}
+restart:
while (1) {
+ int idx = -1, i, seen_control = 0, last_notdel_data_fd = -1;
+
/* Infinite blocking call, waiting for transmission */
- restart:
DBG3("Relayd worker thread polling...");
ret = lttng_poll_wait(&events, -1);
if (ret < 0) {
nb_fd = ret;
+ /*
+ * Process control. The control connection is prioritised so we don't
+ * starve it with high throughout put tracing data on the data
+ * connection.
+ */
for (i = 0; i < nb_fd; i++) {
/* Fetch once the poll data */
- revents = LTTNG_POLL_GETEV(&events, i);
- pollfd = LTTNG_POLL_GETFD(&events, i);
+ uint32_t revents = LTTNG_POLL_GETEV(&events, i);
+ int pollfd = LTTNG_POLL_GETFD(&events, i);
/* Thread quit pipe has been closed. Killing thread. */
ret = check_thread_quit_pipe(pollfd, revents);
goto error;
}
}
- } else if (revents > 0) {
+ } else if (revents) {
rcu_read_lock();
lttng_ht_lookup(relay_connections_ht,
(void *)((unsigned long) pollfd),
relay_del_connection(relay_connections_ht,
streams_ht, &iter,
relay_connection);
+ if (last_seen_data_fd == pollfd) {
+ last_seen_data_fd = last_notdel_data_fd;
+ }
} else if (revents & (LPOLLHUP | LPOLLRDHUP)) {
DBG("Socket %d hung up", pollfd);
relay_cleanup_poll_connection(&events, pollfd);
relay_del_connection(relay_connections_ht,
streams_ht, &iter,
relay_connection);
+ if (last_seen_data_fd == pollfd) {
+ last_seen_data_fd = last_notdel_data_fd;
+ }
} else if (revents & LPOLLIN) {
/* control socket */
if (relay_connection->type == RELAY_CONTROL) {
ret = relay_process_control(&recv_hdr,
relay_connection,
streams_ht);
- /*
- * there was an error in processing a control
- * command: clear the session
- * */
if (ret < 0) {
+ /* Clear the session on error. */
relay_cleanup_poll_connection(&events, pollfd);
relay_del_connection(relay_connections_ht,
streams_ht, &iter,
relay_connection);
DBG("Connection closed with %d", pollfd);
}
+ seen_control = 1;
}
- /* data socket */
- } else if (relay_connection->type == RELAY_DATA) {
- ret = relay_process_data(relay_connection, streams_ht);
- /* connection closed */
- if (ret < 0) {
- relay_cleanup_poll_connection(&events, pollfd);
- relay_del_connection(relay_connections_ht,
- streams_ht, &iter,
- relay_connection);
- DBG("Data connection closed with %d", pollfd);
- }
+ } else {
+ /*
+ * Flag the last seen data fd not deleted. It will be
+ * used as the last seen fd if any fd gets deleted in
+ * this first loop.
+ */
+ last_notdel_data_fd = pollfd;
+ }
+ }
+ rcu_read_unlock();
+ }
+ }
+
+ /*
+ * The last loop handled a control request, go back to poll to make
+ * sure we prioritise the control socket.
+ */
+ if (seen_control) {
+ continue;
+ }
+
+ if (last_seen_data_fd >= 0) {
+ for (i = 0; i < nb_fd; i++) {
+ int pollfd = LTTNG_POLL_GETFD(&events, i);
+ if (last_seen_data_fd == pollfd) {
+ idx = i;
+ break;
+ }
+ }
+ }
+
+ /* Process data connection. */
+ for (i = idx + 1; i < nb_fd; i++) {
+ /* Fetch the poll data. */
+ uint32_t revents = LTTNG_POLL_GETEV(&events, i);
+ int pollfd = LTTNG_POLL_GETFD(&events, i);
+
+ /* Skip the command pipe. It's handled in the first loop. */
+ if (pollfd == relay_cmd_pipe[0]) {
+ continue;
+ }
+
+ if (revents) {
+ rcu_read_lock();
+ lttng_ht_lookup(relay_connections_ht,
+ (void *)((unsigned long) pollfd),
+ &iter);
+ node = lttng_ht_iter_get_node_ulong(&iter);
+ if (node == NULL) {
+ /* Skip it. Might be removed before. */
+ rcu_read_unlock();
+ continue;
+ }
+ relay_connection = caa_container_of(node,
+ struct relay_command, sock_n);
+
+ if (revents & LPOLLIN) {
+ if (relay_connection->type != RELAY_DATA) {
+ continue;
+ }
+
+ ret = relay_process_data(relay_connection, streams_ht);
+ /* connection closed */
+ if (ret < 0) {
+ relay_cleanup_poll_connection(&events, pollfd);
+ relay_del_connection(relay_connections_ht,
+ streams_ht, &iter,
+ relay_connection);
+ DBG("Data connection closed with %d", pollfd);
+ /*
+ * Every goto restart call sets the last seen fd where
+ * here we don't really care since we gracefully
+ * continue the loop after the connection is deleted.
+ */
+ } else {
+ /* Keep last seen port. */
+ last_seen_data_fd = pollfd;
+ rcu_read_unlock();
+ goto restart;
}
}
rcu_read_unlock();
}
}
+ last_seen_data_fd = -1;
}
exit: