diff --git a/net/tipc/link.c b/net/tipc/link.c
index fb1485dc6736ec84719c262334ead93d659bb7c4..65410e18b8a6e90f52554276db9bdd927d2a63ba 100644 (file)
@@ -36,7 +36,6 @@
 
 #include "core.h"
 #include "link.h"
-#include "port.h"
 #include "socket.h"
 #include "name_distr.h"
 #include "discover.h"
@@ -275,7 +274,7 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
        link_init_max_pkt(l_ptr);
 
        l_ptr->next_out_no = 1;
-       INIT_LIST_HEAD(&l_ptr->waiting_ports);
+       __skb_queue_head_init(&l_ptr->waiting_sks);
 
        link_reset_statistics(l_ptr);
 
@@ -322,66 +321,47 @@ void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down)
 }
 
 /**
- * link_schedule_port - schedule port for deferred sending
- * @l_ptr: pointer to link
- * @origport: reference to sending port
- * @sz: amount of data to be sent
- *
- * Schedules port for renewed sending of messages after link congestion
- * has abated.
+ * link_schedule_user - schedule user for wakeup after congestion
+ * @link: congested link
+ * @oport: sending port
+ * @chain_sz: size of the buffer chain that could not be sent
+ * @imp: importance of the message that could not be sent
+ * Creates a pseudo message to send back to the user when congestion abates
  */
-static int link_schedule_port(struct tipc_link *l_ptr, u32 origport, u32 sz)
+static bool link_schedule_user(struct tipc_link *link, u32 oport,
+                              uint chain_sz, uint imp)
 {
-       struct tipc_port *p_ptr;
-       struct tipc_sock *tsk;
+       struct sk_buff *buf;
 
-       spin_lock_bh(&tipc_port_list_lock);
-       p_ptr = tipc_port_lock(origport);
-       if (p_ptr) {
-               if (!list_empty(&p_ptr->wait_list))
-                       goto exit;
-               tsk = tipc_port_to_sock(p_ptr);
-               tsk->link_cong = 1;
-               p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt);
-               list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports);
-               l_ptr->stats.link_congs++;
-exit:
-               tipc_port_unlock(p_ptr);
-       }
-       spin_unlock_bh(&tipc_port_list_lock);
-       return -ELINKCONG;
+       buf = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0, tipc_own_addr,
+                             tipc_own_addr, oport, 0, 0);
+       if (!buf)
+               return false;
+       TIPC_SKB_CB(buf)->chain_sz = chain_sz;
+       TIPC_SKB_CB(buf)->chain_imp = imp;
+       __skb_queue_tail(&link->waiting_sks, buf);
+       link->stats.link_congs++;
+       return true;
 }
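
The wakeup notice built above is an ordinary sk_buff: a SOCK_WAKEUP pseudo message directed back at the congested sender's own port, with the size and importance of the rejected buffer chain stashed in the buffer's control block (TIPC_SKB_CB is effectively an overlay on skb->cb). A small userspace sketch of that overlay pattern, with made-up names and sizes, for readers less familiar with the cb[] convention:

#include <string.h>

/* Stand-in for struct sk_buff: a buffer carrying a small scratch area
 * that each subsystem may reinterpret as its own metadata, the way
 * TIPC_SKB_CB() reinterprets sk_buff->cb.  Sizes are illustrative. */
struct model_skb {
	unsigned char cb[48];		/* private control block */
	unsigned char data[256];	/* payload would live here */
};

struct model_tipc_cb {
	unsigned int chain_sz;	/* buffers the blocked sender tried to queue */
	unsigned int chain_imp;	/* importance level of that traffic */
};

#define MODEL_TIPC_CB(skb) ((struct model_tipc_cb *)&(skb)->cb[0])

/* Tag a freshly built wakeup notice before it is queued on the link. */
static void tag_wakeup(struct model_skb *skb, unsigned int sz, unsigned int imp)
{
	memset(skb->cb, 0, sizeof(skb->cb));
	MODEL_TIPC_CB(skb)->chain_sz = sz;
	MODEL_TIPC_CB(skb)->chain_imp = imp;
}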
 
-void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all)
+/**
+ * link_prepare_wakeup - prepare users for wakeup after congestion
+ * @link: congested link
+ * Moves as many waiting users as the available send queue space permits
+ * from the link wait queue to the node wait queue for wakeup
+ */
+static void link_prepare_wakeup(struct tipc_link *link)
 {
-       struct tipc_port *p_ptr;
-       struct tipc_sock *tsk;
-       struct tipc_port *temp_p_ptr;
-       int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size;
-
-       if (all)
-               win = 100000;
-       if (win <= 0)
-               return;
-       if (!spin_trylock_bh(&tipc_port_list_lock))
-               return;
-       if (link_congested(l_ptr))
-               goto exit;
-       list_for_each_entry_safe(p_ptr, temp_p_ptr, &l_ptr->waiting_ports,
-                                wait_list) {
-               if (win <= 0)
+       struct sk_buff_head *wq = &link->waiting_sks;
+       struct sk_buff *buf;
+       uint pend_qsz = link->out_queue_size;
+
+       for (buf = skb_peek(wq); buf; buf = skb_peek(wq)) {
+               if (pend_qsz >= link->queue_limit[TIPC_SKB_CB(buf)->chain_imp])
                        break;
-               tsk = tipc_port_to_sock(p_ptr);
-               list_del_init(&p_ptr->wait_list);
-               spin_lock_bh(p_ptr->lock);
-               tsk->link_cong = 0;
-               tipc_sock_wakeup(tsk);
-               win -= p_ptr->waiting_pkts;
-               spin_unlock_bh(p_ptr->lock);
+               pend_qsz += TIPC_SKB_CB(buf)->chain_sz;
+               __skb_queue_tail(&link->owner->waiting_sks, __skb_dequeue(wq));
        }
-
-exit:
-       spin_unlock_bh(&tipc_port_list_lock);
 }
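
Note that link_prepare_wakeup() walks the wait queue strictly in FIFO order and stops at the first waiter whose importance class has no headroom left, even if a later waiter would still fit; no sender is woken ahead of one that blocked earlier. A compilable userspace model of the same budget calculation follows (queue limits, chain sizes and importance levels are made-up numbers, not TIPC defaults):

#include <stdio.h>

struct waiter {
	unsigned int chain_sz;	/* packets this sender still wants to queue */
	unsigned int imp;	/* importance level of its traffic */
};

int main(void)
{
	/* Per-importance send queue limits, lowest importance first. */
	unsigned int queue_limit[4] = { 50, 100, 150, 200 };
	struct waiter wq[] = {		/* FIFO of blocked senders */
		{ 30, 0 }, { 40, 1 }, { 10, 1 },
	};
	unsigned int pend_qsz = 40;	/* packets already in the send queue */
	unsigned int i, woken = 0;

	for (i = 0; i < sizeof(wq) / sizeof(wq[0]); i++) {
		/* Stop at the first waiter whose importance class is out
		 * of headroom; later waiters stay blocked. */
		if (pend_qsz >= queue_limit[wq[i].imp])
			break;
		/* Charge this waiter's pending chain against the queue and
		 * release it (the kernel moves its wakeup buffer onto the
		 * owning node's waiting_sks queue instead). */
		pend_qsz += wq[i].chain_sz;
		woken++;
	}
	printf("woke %u of %zu waiters, projected queue size %u\n",
	       woken, sizeof(wq) / sizeof(wq[0]), pend_qsz);
	return 0;
}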
 
 /**
@@ -423,6 +403,7 @@ void tipc_link_reset(struct tipc_link *l_ptr)
        u32 prev_state = l_ptr->state;
        u32 checkpoint = l_ptr->next_in_no;
        int was_active_link = tipc_link_is_active(l_ptr);
+       struct tipc_node *owner = l_ptr->owner;
 
        msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff));
 
@@ -450,9 +431,10 @@ void tipc_link_reset(struct tipc_link *l_ptr)
        kfree_skb(l_ptr->proto_msg_queue);
        l_ptr->proto_msg_queue = NULL;
        kfree_skb_list(l_ptr->oldest_deferred_in);
-       if (!list_empty(&l_ptr->waiting_ports))
-               tipc_link_wakeup_ports(l_ptr, 1);
-
+       if (!skb_queue_empty(&l_ptr->waiting_sks)) {
+               skb_queue_splice_init(&l_ptr->waiting_sks, &owner->waiting_sks);
+               owner->action_flags |= TIPC_WAKEUP_USERS;
+       }
        l_ptr->retransm_queue_head = 0;
        l_ptr->retransm_queue_size = 0;
        l_ptr->last_out = NULL;
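
On reset the link does not wake any sender itself: the whole wait queue is spliced onto the owning node's waiting_sks queue in one operation and TIPC_WAKEUP_USERS is set, leaving the actual socket wakeups to the node layer outside this code path. A simplified userspace model of that hand-off (list type and flag name are invented for the example):

#include <stddef.h>

/* Minimal singly linked FIFO standing in for sk_buff_head. */
struct notice {
	struct notice *next;
};

struct noticeq {
	struct notice *head;
	struct notice *tail;
};

enum { MODEL_WAKEUP_USERS = 1 << 0 };	/* stand-in for TIPC_WAKEUP_USERS */

/* Move every pending wakeup notice from the link to the node in O(1)
 * and leave a flag so the node layer wakes the blocked senders later. */
static void reset_handoff(struct noticeq *link_wq, struct noticeq *node_wq,
			  unsigned int *node_flags)
{
	if (!link_wq->head)
		return;				/* nobody is waiting */
	if (node_wq->tail)
		node_wq->tail->next = link_wq->head;
	else
		node_wq->head = link_wq->head;
	node_wq->tail = link_wq->tail;
	link_wq->head = link_wq->tail = NULL;	/* splice + reinit */
	*node_flags |= MODEL_WAKEUP_USERS;
}
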
@@ -688,19 +670,23 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
 static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf)
 {
        struct tipc_msg *msg = buf_msg(buf);
-       uint psz = msg_size(msg);
        uint imp = tipc_msg_tot_importance(msg);
        u32 oport = msg_tot_origport(msg);
 
-       if (likely(imp <= TIPC_CRITICAL_IMPORTANCE)) {
-               if (!msg_errcode(msg) && !msg_reroute_cnt(msg)) {
-                       link_schedule_port(link, oport, psz);
-                       return -ELINKCONG;
-               }
-       } else {
+       if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
                pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
                tipc_link_reset(link);
+               goto drop;
        }
+       if (unlikely(msg_errcode(msg)))
+               goto drop;
+       if (unlikely(msg_reroute_cnt(msg)))
+               goto drop;
+       if (TIPC_SKB_CB(buf)->wakeup_pending)
+               return -ELINKCONG;
+       if (link_schedule_user(link, oport, TIPC_SKB_CB(buf)->chain_sz, imp))
+               return -ELINKCONG;
+drop:
        kfree_skb_list(buf);
        return -EHOSTUNREACH;
 }
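
The rewritten congestion handler separates outcomes that the old code folded into one if/else: traffic above critical importance resets the link and is dropped, returned or rerouted messages are dropped, and everything else comes back to the sender as -ELINKCONG, scheduling at most one wakeup notice per blocked chain. A compilable sketch of that decision ladder, with invented constants standing in for the TIPC ones:

#include <errno.h>
#include <stdbool.h>

enum { LOW, MEDIUM, HIGH, CRITICAL };	/* illustrative importance levels */
#define MODEL_ELINKCONG 1000		/* stands in for TIPC's -ELINKCONG */

struct out_msg {
	int importance;		/* importance level of the message */
	bool has_errcode;	/* message already carries an error code */
	bool rerouted;		/* message was already rerouted once */
	bool wakeup_pending;	/* sender already owns a wakeup notice */
	unsigned int chain_sz;	/* length of the rejected buffer chain */
};

/* Stub: in the kernel this is link_schedule_user(), which can fail if no
 * memory is available for the wakeup pseudo message. */
static bool schedule_user(unsigned int chain_sz, int imp)
{
	(void)chain_sz; (void)imp;
	return true;
}

/* Model of tipc_link_cong(): decide what happens to a message chain that
 * hit a full send queue.  Negative return values mimic the kernel's. */
static int handle_congestion(struct out_msg *m)
{
	if (m->importance > CRITICAL)
		goto drop;	/* the kernel also resets the link here */
	if (m->has_errcode || m->rerouted)
		goto drop;	/* never reschedule returned traffic */
	if (m->wakeup_pending)
		return -MODEL_ELINKCONG;	/* one notice is enough */
	if (schedule_user(m->chain_sz, m->importance))
		return -MODEL_ELINKCONG;	/* sender woken up later */
drop:
	return -EHOSTUNREACH;	/* the kernel frees the chain before this */
}
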
@@ -1202,8 +1188,10 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
                if (unlikely(l_ptr->next_out))
                        tipc_link_push_queue(l_ptr);
 
-               if (unlikely(!list_empty(&l_ptr->waiting_ports)))
-                       tipc_link_wakeup_ports(l_ptr, 0);
+               if (released && !skb_queue_empty(&l_ptr->waiting_sks)) {
+                       link_prepare_wakeup(l_ptr);
+                       l_ptr->owner->action_flags |= TIPC_WAKEUP_USERS;
+               }
 
                /* Process the incoming packet */
                if (unlikely(!link_working_working(l_ptr))) {