Merge remote-tracking branch 'staging/staging-next'
[deliverable/linux.git] / drivers / staging / lustre / lustre / obdclass / genops.c
index 99c2da632b51431d6ca52c7b2608668fb575f35a..0bc623e5b35a2e51a36663c49410966fd4815584 100644 (file)
@@ -166,10 +166,10 @@ int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
            !type->typ_name)
                goto failed;
 
-       *(type->typ_dt_ops) = *dt_ops;
+       *type->typ_dt_ops = *dt_ops;
        /* md_ops is optional */
        if (md_ops)
-               *(type->typ_md_ops) = *md_ops;
+               *type->typ_md_ops = *md_ops;
        strcpy(type->typ_name, name);
        spin_lock_init(&type->obd_type_lock);
 
@@ -509,7 +509,7 @@ struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
                        continue;
                if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
                        if (next)
-                               *next = i+1;
+                               *next = i + 1;
                        read_unlock(&obd_dev_lock);
                        return obd;
                }
@@ -618,7 +618,7 @@ struct obd_export *class_conn2export(struct lustre_handle *conn)
        }
 
        CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
-       export = class_handle2object(conn->cookie);
+       export = class_handle2object(conn->cookie, NULL);
        return export;
 }
 EXPORT_SYMBOL(class_conn2export);
@@ -1312,3 +1312,135 @@ void obd_zombie_impexp_stop(void)
        obd_zombie_impexp_notify();
        wait_for_completion(&obd_zombie_stop);
 }
+
+struct obd_request_slot_waiter {
+       struct list_head        orsw_entry;
+       wait_queue_head_t       orsw_waitq;
+       bool                    orsw_signaled;
+};
+
+static bool obd_request_slot_avail(struct client_obd *cli,
+                                  struct obd_request_slot_waiter *orsw)
+{
+       bool avail;
+
+       spin_lock(&cli->cl_loi_list_lock);
+       avail = !!list_empty(&orsw->orsw_entry);
+       spin_unlock(&cli->cl_loi_list_lock);
+
+       return avail;
+};
+
+/*
+ * For network flow control, the RPC sponsor needs to acquire a credit
+ * before sending the RPC. The credits count for a connection is defined
+ * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
+ * the subsequent RPC sponsors need to wait until others released their
+ * credits, or the administrator increased the "cl_max_rpcs_in_flight".
+ */
+int obd_get_request_slot(struct client_obd *cli)
+{
+       struct obd_request_slot_waiter orsw;
+       struct l_wait_info lwi;
+       int rc;
+
+       spin_lock(&cli->cl_loi_list_lock);
+       if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
+               cli->cl_r_in_flight++;
+               spin_unlock(&cli->cl_loi_list_lock);
+               return 0;
+       }
+
+       init_waitqueue_head(&orsw.orsw_waitq);
+       list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
+       orsw.orsw_signaled = false;
+       spin_unlock(&cli->cl_loi_list_lock);
+
+       lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
+       rc = l_wait_event(orsw.orsw_waitq,
+                         obd_request_slot_avail(cli, &orsw) ||
+                         orsw.orsw_signaled,
+                         &lwi);
+
+       /*
+        * Here, we must take the lock to avoid the on-stack 'orsw' to be
+        * freed but other (such as obd_put_request_slot) is using it.
+        */
+       spin_lock(&cli->cl_loi_list_lock);
+       if (rc) {
+               if (!orsw.orsw_signaled) {
+                       if (list_empty(&orsw.orsw_entry))
+                               cli->cl_r_in_flight--;
+                       else
+                               list_del(&orsw.orsw_entry);
+               }
+       }
+
+       if (orsw.orsw_signaled) {
+               LASSERT(list_empty(&orsw.orsw_entry));
+
+               rc = -EINTR;
+       }
+       spin_unlock(&cli->cl_loi_list_lock);
+
+       return rc;
+}
+EXPORT_SYMBOL(obd_get_request_slot);
+
+void obd_put_request_slot(struct client_obd *cli)
+{
+       struct obd_request_slot_waiter *orsw;
+
+       spin_lock(&cli->cl_loi_list_lock);
+       cli->cl_r_in_flight--;
+
+       /* If there is free slot, wakeup the first waiter. */
+       if (!list_empty(&cli->cl_loi_read_list) &&
+           likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
+               orsw = list_entry(cli->cl_loi_read_list.next,
+                                 struct obd_request_slot_waiter, orsw_entry);
+               list_del_init(&orsw->orsw_entry);
+               cli->cl_r_in_flight++;
+               wake_up(&orsw->orsw_waitq);
+       }
+       spin_unlock(&cli->cl_loi_list_lock);
+}
+EXPORT_SYMBOL(obd_put_request_slot);
+
+__u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
+{
+       return cli->cl_max_rpcs_in_flight;
+}
+EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
+
+int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
+{
+       struct obd_request_slot_waiter *orsw;
+       __u32 old;
+       int diff;
+       int i;
+
+       if (max > OBD_MAX_RIF_MAX || max < 1)
+               return -ERANGE;
+
+       spin_lock(&cli->cl_loi_list_lock);
+       old = cli->cl_max_rpcs_in_flight;
+       cli->cl_max_rpcs_in_flight = max;
+       diff = max - old;
+
+       /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
+       for (i = 0; i < diff; i++) {
+               if (list_empty(&cli->cl_loi_read_list))
+                       break;
+
+               orsw = list_entry(cli->cl_loi_read_list.next,
+                                 struct obd_request_slot_waiter, orsw_entry);
+               list_del_init(&orsw->orsw_entry);
+               cli->cl_r_in_flight++;
+               wake_up(&orsw->orsw_waitq);
+       }
+       spin_unlock(&cli->cl_loi_list_lock);
+
+       return 0;
+}
+EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
This page took 0.026072 seconds and 5 git commands to generate.