relayd: do not prioritize control events over data.
authorJonathan Rajotte <jonathan.rajotte-julien@efficios.com>
Mon, 11 Feb 2019 19:43:54 +0000 (14:43 -0500)
committerJonathan Rajotte <jonathan.rajotte-julien@efficios.com>
Tue, 25 Jun 2019 15:15:44 +0000 (11:15 -0400)
Simplify the algorithm used by relayd for control and data connections
handling.

Use the notion of activity phase. An activity phase represent a phase
for which all connections with activity (poll/epoll) are not yet processed.

When an active connection is processed, her activity phase is set to the
current activity phase to prevent further progress during the same
activity phase.

Once all active connections (poll events) have been processed during
the current activity phase, the current activity phase is incremented.

This give fairness across all connections during a given activity phase.

This can also serve as a base for future work toward resources based
prioritizing.

Signed-off-by: Jonathan Rajotte <jonathan.rajotte-julien@efficios.com>
src/bin/lttng-relayd/connection.c
src/bin/lttng-relayd/connection.h
src/bin/lttng-relayd/main.c

index 6a2c27ff6c984565f5b1880d6afe8bcc138b9b29..6d6107eb66ed065aff6c9503b5027c78cf596501 100644 (file)
@@ -122,6 +122,7 @@ struct relay_connection *connection_create(struct lttcomm_sock *sock,
        if (conn->type == RELAY_CONTROL) {
                lttng_dynamic_buffer_init(&conn->protocol.ctrl.reception_buffer);
        }
+       conn->activity_phase = 0;
        connection_reset_protocol_state(conn);
 end:
        return conn;
index d0edf82146ba4de10826443c4c4e46d0fd9e1799..d189e1549672d8016ab80adfa9bb25f4537390d3 100644 (file)
@@ -138,6 +138,11 @@ struct relay_connection {
                        struct lttng_dynamic_buffer reception_buffer;
                } ctrl;
        } protocol;
+       /*
+        * The activity phase for which the connection was last active.
+        * This is used to ensure fairness across connections.
+        */
+       uint64_t activity_phase;
 };
 
 struct relay_connection *connection_create(struct lttcomm_sock *sock,
index 7f091c2b792891f523a22aad61a60749869d8be1..19b1342ac5deeffa9ac2fc8c886779c2fa5fbd1d 100644 (file)
@@ -3058,12 +3058,14 @@ static void relay_thread_close_connection(struct lttng_poll_event *events,
  */
 static void *relay_thread_worker(void *data)
 {
-       int ret, err = -1, last_seen_data_fd = -1;
+       int ret, err = -1;
        uint32_t nb_fd;
        struct lttng_poll_event events;
        struct lttng_ht *relay_connections_ht;
        struct lttng_ht_iter iter;
-       struct relay_connection *destroy_conn = NULL;
+       struct relay_connection *tmp_conn = NULL;
+       uint64_t relay_conn_pipe_activity_phase = 0;
+       uint64_t current_activity_phase = 1;
 
        DBG("[thread] Relay worker started");
 
@@ -3095,7 +3097,8 @@ static void *relay_thread_worker(void *data)
 
 restart:
        while (1) {
-               int idx = -1, i, seen_control = 0, last_notdel_data_fd = -1;
+               int i;
+               bool at_least_one_event_processed = false;
 
                health_code_update();
 
@@ -3117,9 +3120,11 @@ restart:
                nb_fd = ret;
 
                /*
-                * Process control. The control connection is
-                * prioritized so we don't starve it with high
-                * throughput tracing data on the data connection.
+                * Processes FDs that were not yet active during the
+                * current activity phase. Increment the activity phase when all
+                * events are already processed for the current activity
+                * phase. This result in fairness across all connections and
+                * connection pipe.
                 */
                for (i = 0; i < nb_fd; i++) {
                        /* Fetch once the poll data */
@@ -3148,14 +3153,32 @@ restart:
                                if (revents & LPOLLIN) {
                                        struct relay_connection *conn;
 
+                                       if (relay_conn_pipe_activity_phase == current_activity_phase) {
+                                               /*
+                                                * Only consider once per
+                                                * activity phase for fairness.
+                                                */
+                                               DBG("Skipping adding reception, already happened in activity phase %" PRIu64, current_activity_phase);
+                                               continue;
+                                       }
+
                                        ret = lttng_read(relay_conn_pipe[0], &conn, sizeof(conn));
                                        if (ret < 0) {
                                                goto error;
                                        }
+                                       /*
+                                        * For now we prefer fairness over
+                                        * "immediate" action of new
+                                        * connection. Set activity phase to the
+                                        * current phase.
+                                        */
+                                       conn->activity_phase = current_activity_phase;
                                        lttng_poll_add(&events, conn->sock->fd,
                                                        LPOLLIN | LPOLLRDHUP);
                                        connection_ht_add(relay_connections_ht, conn);
                                        DBG("Connection socket %d added", conn->sock->fd);
+                                       relay_conn_pipe_activity_phase = current_activity_phase;
+                                       at_least_one_event_processed = true;
                                } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
                                        ERR("Relay connection pipe error");
                                        goto error;
@@ -3164,29 +3187,27 @@ restart:
                                        goto error;
                                }
                        } else {
-                               struct relay_connection *ctrl_conn;
+                               struct relay_connection *connection;
 
-                               ctrl_conn = connection_get_by_sock(relay_connections_ht, pollfd);
+                               connection = connection_get_by_sock(relay_connections_ht, pollfd);
                                /* If not found, there is a synchronization issue. */
-                               assert(ctrl_conn);
-
-                               if (ctrl_conn->type == RELAY_DATA) {
-                                       if (revents & LPOLLIN) {
-                                               /*
-                                                * Flag the last seen data fd not deleted. It will be
-                                                * used as the last seen fd if any fd gets deleted in
-                                                * this first loop.
-                                                */
-                                               last_notdel_data_fd = pollfd;
-                                       }
-                                       goto put_ctrl_connection;
+                               assert(connection);
+                               if (connection->activity_phase == current_activity_phase) {
+                                       DBG3("Skipping connection %d, already processed in activity phase %" PRIu64, connection->sock->fd, current_activity_phase);
+                                       goto connection_put;
                                }
-                               assert(ctrl_conn->type == RELAY_CONTROL);
 
                                if (revents & LPOLLIN) {
                                        enum relay_connection_status status;
 
-                                       status = relay_process_control(ctrl_conn);
+                                       at_least_one_event_processed = true;
+                                       connection->activity_phase = current_activity_phase;
+
+                                       if (connection->type == RELAY_DATA) {
+                                               status = relay_process_data(connection);
+                                       } else {
+                                               status = relay_process_control(connection);
+                                       }
                                        if (status != RELAY_CONNECTION_STATUS_OK) {
                                                /*
                                                 * On socket error flag the session as aborted to force
@@ -3199,137 +3220,38 @@ restart:
                                                 * connection is closed (uncleanly) before the packet's
                                                 * data provided.
                                                 *
-                                                * Since the control connection encountered an error,
+                                                * Since the connection encountered an error,
                                                 * it is okay to be conservative and close the
                                                 * session right now as we can't rely on the protocol
                                                 * being respected anymore.
                                                 */
                                                if (status == RELAY_CONNECTION_STATUS_ERROR) {
-                                                       session_abort(ctrl_conn->session);
+                                                       session_abort(connection->session);
                                                }
 
                                                /* Clear the connection on error or close. */
                                                relay_thread_close_connection(&events,
                                                                pollfd,
-                                                               ctrl_conn);
+                                                               connection);
                                        }
-                                       seen_control = 1;
                                } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
                                        relay_thread_close_connection(&events,
-                                                       pollfd, ctrl_conn);
-                                       if (last_seen_data_fd == pollfd) {
-                                               last_seen_data_fd = last_notdel_data_fd;
-                                       }
+                                                       pollfd, connection);
                                } else {
-                                       ERR("Unexpected poll events %u for control sock %d",
+                                       ERR("Unexpected poll events %u for sock %d",
                                                        revents, pollfd);
-                                       connection_put(ctrl_conn);
+                                       connection_put(connection);
                                        goto error;
                                }
-                       put_ctrl_connection:
-                               connection_put(ctrl_conn);
+                       connection_put:
+                               connection_put(connection);
                        }
                }
 
-               /*
-                * The last loop handled a control request, go back to poll to make
-                * sure we prioritise the control socket.
-                */
-               if (seen_control) {
-                       continue;
-               }
-
-               if (last_seen_data_fd >= 0) {
-                       for (i = 0; i < nb_fd; i++) {
-                               int pollfd = LTTNG_POLL_GETFD(&events, i);
-
-                               health_code_update();
-
-                               if (last_seen_data_fd == pollfd) {
-                                       idx = i;
-                                       break;
-                               }
-                       }
-               }
-
-               /* Process data connection. */
-               for (i = idx + 1; i < nb_fd; i++) {
-                       /* Fetch the poll data. */
-                       uint32_t revents = LTTNG_POLL_GETEV(&events, i);
-                       int pollfd = LTTNG_POLL_GETFD(&events, i);
-                       struct relay_connection *data_conn;
-
-                       health_code_update();
-
-                       if (!revents) {
-                               /* No activity for this FD (poll implementation). */
-                               continue;
-                       }
-
-                       /* Skip the command pipe. It's handled in the first loop. */
-                       if (pollfd == relay_conn_pipe[0]) {
-                               continue;
-                       }
-
-                       data_conn = connection_get_by_sock(relay_connections_ht, pollfd);
-                       if (!data_conn) {
-                               /* Skip it. Might be removed before. */
-                               continue;
-                       }
-                       if (data_conn->type == RELAY_CONTROL) {
-                               goto put_data_connection;
-                       }
-                       assert(data_conn->type == RELAY_DATA);
-
-                       if (revents & LPOLLIN) {
-                               enum relay_connection_status status;
-
-                               status = relay_process_data(data_conn);
-                               /* Connection closed or error. */
-                               if (status != RELAY_CONNECTION_STATUS_OK) {
-                                       /*
-                                        * On socket error flag the session as aborted to force
-                                        * the cleanup of its stream otherwise it can leak
-                                        * during the lifetime of the relayd.
-                                        *
-                                        * This prevents situations in which streams can be
-                                        * left opened because an index was received, the
-                                        * control connection is closed, and the data
-                                        * connection is closed (uncleanly) before the packet's
-                                        * data provided.
-                                        *
-                                        * Since the data connection encountered an error,
-                                        * it is okay to be conservative and close the
-                                        * session right now as we can't rely on the protocol
-                                        * being respected anymore.
-                                        */
-                                       if (status == RELAY_CONNECTION_STATUS_ERROR) {
-                                               session_abort(data_conn->session);
-                                       }
-                                       relay_thread_close_connection(&events, pollfd,
-                                                       data_conn);
-                                       /*
-                                        * Every goto restart call sets the last seen fd where
-                                        * here we don't really care since we gracefully
-                                        * continue the loop after the connection is deleted.
-                                        */
-                               } else {
-                                       /* Keep last seen port. */
-                                       last_seen_data_fd = pollfd;
-                                       connection_put(data_conn);
-                                       goto restart;
-                               }
-                       } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
-                               relay_thread_close_connection(&events, pollfd,
-                                               data_conn);
-                       } else {
-                               ERR("Unknown poll events %u for data sock %d",
-                                               revents, pollfd);
-                       }
-               put_data_connection:
-                       connection_put(data_conn);
+               if (!at_least_one_event_processed) {
+                       current_activity_phase++;
+                       DBG3("Incrementing activity phase: %" PRIu64, current_activity_phase);
                }
-               last_seen_data_fd = -1;
        }
 
        /* Normal exit, no error */
@@ -3340,18 +3262,18 @@ error:
        /* Cleanup reamaining connection object. */
        rcu_read_lock();
        cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter,
-                       destroy_conn,
+                       tmp_conn,
                        sock_n.node) {
                health_code_update();
 
-               session_abort(destroy_conn->session);
+               session_abort(tmp_conn->session);
 
                /*
                 * No need to grab another ref, because we own
-                * destroy_conn.
+                * tmp conn.
                 */
-               relay_thread_close_connection(&events, destroy_conn->sock->fd,
-                               destroy_conn);
+               relay_thread_close_connection(&events, tmp_conn->sock->fd,
+                               tmp_conn);
        }
        rcu_read_unlock();
 
This page took 0.034384 seconds and 5 git commands to generate.