goto end;
}
- DBG("File descriptor count limits are %lu (soft) and %lu (hard)",
- rlimit.rlim_cur, rlimit.rlim_max);
+ DBG("File descriptor count limits are %" PRIu64 " (soft) and %" PRIu64 " (hard)",
+ (uint64_t) rlimit.rlim_cur, (uint64_t) rlimit.rlim_max);
if (lttng_opt_fd_pool_size == -1) {
/* Use default value (soft limit - reserve). */
if (rlimit.rlim_cur < DEFAULT_RELAYD_MIN_FD_POOL_SIZE) {
- ERR("The process' file number limit is too low (%lu). The process' file number limit must be set to at least %i.",
- rlimit.rlim_cur, DEFAULT_RELAYD_MIN_FD_POOL_SIZE);
+ ERR("The process' file number limit is too low (%" PRIu64 "). The process' file number limit must be set to at least %i.",
+ (uint64_t) rlimit.rlim_cur,
+ DEFAULT_RELAYD_MIN_FD_POOL_SIZE);
ret = -1;
goto end;
}
/* no flush. */
ret = 0;
} else {
+ /*
+ * ret < 0
+ *
+ * relay_index_try_flush is responsible for the self-reference
+ * put of the index object on error.
+ */
ERR("relay_index_try_flush error %d", ret);
- relay_index_put(index);
ret = -1;
}
/* No flush. */
ret = 0;
} else {
- /* Put self-ref for this index due to error. */
- relay_index_put(index);
- index = NULL;
+ /*
+ * ret < 0
+ *
+ * relay_index_try_flush is responsible for the self-reference
+ * put of the index object on error.
+ */
+ ERR("relay_index_try_flush error %d", ret);
ret = -1;
}
end:
*/
static void *relay_thread_worker(void *data)
{
- int ret, err = -1, last_seen_data_fd = -1;
+ int ret, err = -1;
uint32_t nb_fd;
struct lttng_poll_event events;
struct lttng_ht *relay_connections_ht;
struct lttng_ht_iter iter;
- struct relay_connection *destroy_conn = NULL;
+ struct relay_connection *tmp_conn = NULL;
+ uint64_t relay_conn_pipe_activity_phase = 0;
+ uint64_t current_activity_phase = 1;
DBG("[thread] Relay worker started");
restart:
while (1) {
- int idx = -1, i, seen_control = 0, last_notdel_data_fd = -1;
+ int i;
+ bool at_least_one_event_processed = false;
health_code_update();
nb_fd = ret;
/*
- * Process control. The control connection is
- * prioritized so we don't starve it with high
- * throughput tracing data on the data connection.
+ * Processes FDs that were not yet active during the
+ * current activity phase. Increment the activity phase when all
+ * events are already processed for the current activity
+ * phase. This result in fairness across all connections and
+ * connection pipe.
*/
for (i = 0; i < nb_fd; i++) {
/* Fetch once the poll data */
if (revents & LPOLLIN) {
struct relay_connection *conn;
+ if (relay_conn_pipe_activity_phase == current_activity_phase) {
+ /*
+ * Only consider once per
+ * activity phase for fairness.
+ */
+ DBG("Skipping adding reception, already happened in activity phase %" PRIu64, current_activity_phase);
+ continue;
+ }
+
ret = lttng_read(relay_conn_pipe[0], &conn, sizeof(conn));
if (ret < 0) {
goto error;
}
+ /*
+ * For now we prefer fairness over
+ * "immediate" action of new
+ * connection. Set activity phase to the
+ * current phase.
+ */
+ conn->activity_phase = current_activity_phase;
lttng_poll_add(&events, conn->sock->fd,
LPOLLIN | LPOLLRDHUP);
connection_ht_add(relay_connections_ht, conn);
DBG("Connection socket %d added", conn->sock->fd);
+ relay_conn_pipe_activity_phase = current_activity_phase;
+ at_least_one_event_processed = true;
} else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
ERR("Relay connection pipe error");
goto error;
goto error;
}
} else {
- struct relay_connection *ctrl_conn;
+ struct relay_connection *connection;
- ctrl_conn = connection_get_by_sock(relay_connections_ht, pollfd);
+ connection = connection_get_by_sock(relay_connections_ht, pollfd);
/* If not found, there is a synchronization issue. */
- assert(ctrl_conn);
-
- if (ctrl_conn->type == RELAY_DATA) {
- if (revents & LPOLLIN) {
- /*
- * Flag the last seen data fd not deleted. It will be
- * used as the last seen fd if any fd gets deleted in
- * this first loop.
- */
- last_notdel_data_fd = pollfd;
- }
- goto put_ctrl_connection;
+ assert(connection);
+ if (connection->activity_phase == current_activity_phase) {
+ DBG3("Skipping connection %d, already processed in activity phase %" PRIu64, connection->sock->fd, current_activity_phase);
+ goto connection_put;
}
- assert(ctrl_conn->type == RELAY_CONTROL);
if (revents & LPOLLIN) {
enum relay_connection_status status;
- status = relay_process_control(ctrl_conn);
+ at_least_one_event_processed = true;
+ connection->activity_phase = current_activity_phase;
+
+ if (connection->type == RELAY_DATA) {
+ status = relay_process_data(connection);
+ } else {
+ status = relay_process_control(connection);
+ }
if (status != RELAY_CONNECTION_STATUS_OK) {
/*
* On socket error flag the session as aborted to force
* connection is closed (uncleanly) before the packet's
* data provided.
*
- * Since the control connection encountered an error,
+ * Since the connection encountered an error,
* it is okay to be conservative and close the
* session right now as we can't rely on the protocol
* being respected anymore.
*/
if (status == RELAY_CONNECTION_STATUS_ERROR) {
- session_abort(ctrl_conn->session);
+ session_abort(connection->session);
}
/* Clear the connection on error or close. */
relay_thread_close_connection(&events,
pollfd,
- ctrl_conn);
+ connection);
}
- seen_control = 1;
} else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
relay_thread_close_connection(&events,
- pollfd, ctrl_conn);
- if (last_seen_data_fd == pollfd) {
- last_seen_data_fd = last_notdel_data_fd;
- }
+ pollfd, connection);
} else {
- ERR("Unexpected poll events %u for control sock %d",
+ ERR("Unexpected poll events %u for sock %d",
revents, pollfd);
- connection_put(ctrl_conn);
+ connection_put(connection);
goto error;
}
- put_ctrl_connection:
- connection_put(ctrl_conn);
- }
- }
-
- /*
- * The last loop handled a control request, go back to poll to make
- * sure we prioritise the control socket.
- */
- if (seen_control) {
- continue;
- }
-
- if (last_seen_data_fd >= 0) {
- for (i = 0; i < nb_fd; i++) {
- int pollfd = LTTNG_POLL_GETFD(&events, i);
-
- health_code_update();
-
- if (last_seen_data_fd == pollfd) {
- idx = i;
- break;
- }
+ connection_put:
+ connection_put(connection);
}
}
- /* Process data connection. */
- for (i = idx + 1; i < nb_fd; i++) {
- /* Fetch the poll data. */
- uint32_t revents = LTTNG_POLL_GETEV(&events, i);
- int pollfd = LTTNG_POLL_GETFD(&events, i);
- struct relay_connection *data_conn;
-
- health_code_update();
-
- if (!revents) {
- /* No activity for this FD (poll implementation). */
- continue;
- }
-
- /* Skip the command pipe. It's handled in the first loop. */
- if (pollfd == relay_conn_pipe[0]) {
- continue;
- }
-
- data_conn = connection_get_by_sock(relay_connections_ht, pollfd);
- if (!data_conn) {
- /* Skip it. Might be removed before. */
- continue;
- }
- if (data_conn->type == RELAY_CONTROL) {
- goto put_data_connection;
- }
- assert(data_conn->type == RELAY_DATA);
-
- if (revents & LPOLLIN) {
- enum relay_connection_status status;
-
- status = relay_process_data(data_conn);
- /* Connection closed or error. */
- if (status != RELAY_CONNECTION_STATUS_OK) {
- /*
- * On socket error flag the session as aborted to force
- * the cleanup of its stream otherwise it can leak
- * during the lifetime of the relayd.
- *
- * This prevents situations in which streams can be
- * left opened because an index was received, the
- * control connection is closed, and the data
- * connection is closed (uncleanly) before the packet's
- * data provided.
- *
- * Since the data connection encountered an error,
- * it is okay to be conservative and close the
- * session right now as we can't rely on the protocol
- * being respected anymore.
- */
- if (status == RELAY_CONNECTION_STATUS_ERROR) {
- session_abort(data_conn->session);
- }
- relay_thread_close_connection(&events, pollfd,
- data_conn);
- /*
- * Every goto restart call sets the last seen fd where
- * here we don't really care since we gracefully
- * continue the loop after the connection is deleted.
- */
- } else {
- /* Keep last seen port. */
- last_seen_data_fd = pollfd;
- connection_put(data_conn);
- goto restart;
- }
- } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
- relay_thread_close_connection(&events, pollfd,
- data_conn);
- } else {
- ERR("Unknown poll events %u for data sock %d",
- revents, pollfd);
- }
- put_data_connection:
- connection_put(data_conn);
+ if (!at_least_one_event_processed) {
+ current_activity_phase++;
+ DBG3("Incrementing activity phase: %" PRIu64, current_activity_phase);
}
- last_seen_data_fd = -1;
}
/* Normal exit, no error */
/* Cleanup reamaining connection object. */
rcu_read_lock();
cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter,
- destroy_conn,
+ tmp_conn,
sock_n.node) {
health_code_update();
- session_abort(destroy_conn->session);
+ session_abort(tmp_conn->session);
/*
* No need to grab another ref, because we own
- * destroy_conn.
+ * tmp conn.
*/
- relay_thread_close_connection(&events, destroy_conn->sock->fd,
- destroy_conn);
+ relay_thread_close_connection(&events, tmp_conn->sock->fd,
+ tmp_conn);
}
rcu_read_unlock();