X-Git-Url: http://git.efficios.com/?p=lttng-tools.git;a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;fp=src%2Fbin%2Flttng-relayd%2Fmain.c;h=19b1342ac5deeffa9ac2fc8c886779c2fa5fbd1d;hp=7f091c2b792891f523a22aad61a60749869d8be1;hb=d9b9088d65b39e019f0d663df9fc5383408d60a3;hpb=2ec8403bb9647a2ad22dcfc8bbc79db15b179783 diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 7f091c2b7..19b1342ac 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -3058,12 +3058,14 @@ static void relay_thread_close_connection(struct lttng_poll_event *events, */ static void *relay_thread_worker(void *data) { - int ret, err = -1, last_seen_data_fd = -1; + int ret, err = -1; uint32_t nb_fd; struct lttng_poll_event events; struct lttng_ht *relay_connections_ht; struct lttng_ht_iter iter; - struct relay_connection *destroy_conn = NULL; + struct relay_connection *tmp_conn = NULL; + uint64_t relay_conn_pipe_activity_phase = 0; + uint64_t current_activity_phase = 1; DBG("[thread] Relay worker started"); @@ -3095,7 +3097,8 @@ static void *relay_thread_worker(void *data) restart: while (1) { - int idx = -1, i, seen_control = 0, last_notdel_data_fd = -1; + int i; + bool at_least_one_event_processed = false; health_code_update(); @@ -3117,9 +3120,11 @@ restart: nb_fd = ret; /* - * Process control. The control connection is - * prioritized so we don't starve it with high - * throughput tracing data on the data connection. + * Processes FDs that were not yet active during the + * current activity phase. Increment the activity phase when all + * events are already processed for the current activity + * phase. This results in fairness across all connections and + * the connection pipe. 
*/ for (i = 0; i < nb_fd; i++) { /* Fetch once the poll data */ @@ -3148,14 +3153,32 @@ restart: if (revents & LPOLLIN) { struct relay_connection *conn; + if (relay_conn_pipe_activity_phase == current_activity_phase) { + /* + * Only consider once per + * activity phase for fairness. + */ + DBG("Skipping adding reception, already happened in activity phase %" PRIu64, current_activity_phase); + continue; + } + ret = lttng_read(relay_conn_pipe[0], &conn, sizeof(conn)); if (ret < 0) { goto error; } + /* + * For now we prefer fairness over + * "immediate" action of new + * connection. Set activity phase to the + * current phase. + */ + conn->activity_phase = current_activity_phase; lttng_poll_add(&events, conn->sock->fd, LPOLLIN | LPOLLRDHUP); connection_ht_add(relay_connections_ht, conn); DBG("Connection socket %d added", conn->sock->fd); + relay_conn_pipe_activity_phase = current_activity_phase; + at_least_one_event_processed = true; } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { ERR("Relay connection pipe error"); goto error; @@ -3164,29 +3187,27 @@ restart: goto error; } } else { - struct relay_connection *ctrl_conn; + struct relay_connection *connection; - ctrl_conn = connection_get_by_sock(relay_connections_ht, pollfd); + connection = connection_get_by_sock(relay_connections_ht, pollfd); /* If not found, there is a synchronization issue. */ - assert(ctrl_conn); - - if (ctrl_conn->type == RELAY_DATA) { - if (revents & LPOLLIN) { - /* - * Flag the last seen data fd not deleted. It will be - * used as the last seen fd if any fd gets deleted in - * this first loop. 
- */ - last_notdel_data_fd = pollfd; - } - goto put_ctrl_connection; + assert(connection); + if (connection->activity_phase == current_activity_phase) { + DBG3("Skipping connection %d, already processed in activity phase %" PRIu64, connection->sock->fd, current_activity_phase); + goto connection_put; } - assert(ctrl_conn->type == RELAY_CONTROL); if (revents & LPOLLIN) { enum relay_connection_status status; - status = relay_process_control(ctrl_conn); + at_least_one_event_processed = true; + connection->activity_phase = current_activity_phase; + + if (connection->type == RELAY_DATA) { + status = relay_process_data(connection); + } else { + status = relay_process_control(connection); + } if (status != RELAY_CONNECTION_STATUS_OK) { /* * On socket error flag the session as aborted to force @@ -3199,137 +3220,38 @@ restart: * connection is closed (uncleanly) before the packet's * data provided. * - * Since the control connection encountered an error, + * Since the connection encountered an error, * it is okay to be conservative and close the * session right now as we can't rely on the protocol * being respected anymore. */ if (status == RELAY_CONNECTION_STATUS_ERROR) { - session_abort(ctrl_conn->session); + session_abort(connection->session); } /* Clear the connection on error or close. 
*/ relay_thread_close_connection(&events, pollfd, - ctrl_conn); + connection); } - seen_control = 1; } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { relay_thread_close_connection(&events, - pollfd, ctrl_conn); - if (last_seen_data_fd == pollfd) { - last_seen_data_fd = last_notdel_data_fd; - } + pollfd, connection); } else { - ERR("Unexpected poll events %u for control sock %d", + ERR("Unexpected poll events %u for sock %d", revents, pollfd); - connection_put(ctrl_conn); + connection_put(connection); goto error; } - put_ctrl_connection: - connection_put(ctrl_conn); + connection_put: + connection_put(connection); } } - /* - * The last loop handled a control request, go back to poll to make - * sure we prioritise the control socket. - */ - if (seen_control) { - continue; - } - - if (last_seen_data_fd >= 0) { - for (i = 0; i < nb_fd; i++) { - int pollfd = LTTNG_POLL_GETFD(&events, i); - - health_code_update(); - - if (last_seen_data_fd == pollfd) { - idx = i; - break; - } - } - } - - /* Process data connection. */ - for (i = idx + 1; i < nb_fd; i++) { - /* Fetch the poll data. */ - uint32_t revents = LTTNG_POLL_GETEV(&events, i); - int pollfd = LTTNG_POLL_GETFD(&events, i); - struct relay_connection *data_conn; - - health_code_update(); - - if (!revents) { - /* No activity for this FD (poll implementation). */ - continue; - } - - /* Skip the command pipe. It's handled in the first loop. */ - if (pollfd == relay_conn_pipe[0]) { - continue; - } - - data_conn = connection_get_by_sock(relay_connections_ht, pollfd); - if (!data_conn) { - /* Skip it. Might be removed before. */ - continue; - } - if (data_conn->type == RELAY_CONTROL) { - goto put_data_connection; - } - assert(data_conn->type == RELAY_DATA); - - if (revents & LPOLLIN) { - enum relay_connection_status status; - - status = relay_process_data(data_conn); - /* Connection closed or error. 
*/ - if (status != RELAY_CONNECTION_STATUS_OK) { - /* - * On socket error flag the session as aborted to force - * the cleanup of its stream otherwise it can leak - * during the lifetime of the relayd. - * - * This prevents situations in which streams can be - * left opened because an index was received, the - * control connection is closed, and the data - * connection is closed (uncleanly) before the packet's - * data provided. - * - * Since the data connection encountered an error, - * it is okay to be conservative and close the - * session right now as we can't rely on the protocol - * being respected anymore. - */ - if (status == RELAY_CONNECTION_STATUS_ERROR) { - session_abort(data_conn->session); - } - relay_thread_close_connection(&events, pollfd, - data_conn); - /* - * Every goto restart call sets the last seen fd where - * here we don't really care since we gracefully - * continue the loop after the connection is deleted. - */ - } else { - /* Keep last seen port. */ - last_seen_data_fd = pollfd; - connection_put(data_conn); - goto restart; - } - } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { - relay_thread_close_connection(&events, pollfd, - data_conn); - } else { - ERR("Unknown poll events %u for data sock %d", - revents, pollfd); - } - put_data_connection: - connection_put(data_conn); + if (!at_least_one_event_processed) { + current_activity_phase++; + DBG3("Incrementing activity phase: %" PRIu64, current_activity_phase); } - last_seen_data_fd = -1; } /* Normal exit, no error */ @@ -3340,18 +3262,18 @@ error: /* Cleanup reamaining connection object. */ rcu_read_lock(); cds_lfht_for_each_entry(relay_connections_ht->ht, &iter.iter, - destroy_conn, + tmp_conn, sock_n.node) { health_code_update(); - session_abort(destroy_conn->session); + session_abort(tmp_conn->session); /* * No need to grab another ref, because we own - * destroy_conn. + * tmp conn. 
*/ - relay_thread_close_connection(&events, destroy_conn->sock->fd, - destroy_conn); + relay_thread_close_connection(&events, tmp_conn->sock->fd, + tmp_conn); } rcu_read_unlock();