libceph: abstract out ceph_osd_request enqueue logic
[deliverable/linux.git] / net / ceph / osd_client.c
index 05be0c1816958b0d0db6b2c319631d41f273e3d0..648a215d734e8f1e22a93e17eb210fff69014e3c 100644 (file)
@@ -297,12 +297,21 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
 /*
  * requests
  */
-void ceph_osdc_release_request(struct kref *kref)
+static void ceph_osdc_release_request(struct kref *kref)
 {
-       struct ceph_osd_request *req;
+       struct ceph_osd_request *req = container_of(kref,
+                                           struct ceph_osd_request, r_kref);
        unsigned int which;
 
-       req = container_of(kref, struct ceph_osd_request, r_kref);
+       dout("%s %p (r_request %p r_reply %p)\n", __func__, req,
+            req->r_request, req->r_reply);
+       WARN_ON(!RB_EMPTY_NODE(&req->r_node));
+       WARN_ON(!list_empty(&req->r_req_lru_item));
+       WARN_ON(!list_empty(&req->r_osd_item));
+       WARN_ON(!list_empty(&req->r_linger_item));
+       WARN_ON(!list_empty(&req->r_linger_osd_item));
+       WARN_ON(req->r_osd);
+
        if (req->r_request)
                ceph_msg_put(req->r_request);
        if (req->r_reply) {
@@ -320,7 +329,22 @@ void ceph_osdc_release_request(struct kref *kref)
                kmem_cache_free(ceph_osd_request_cache, req);
 
 }
-EXPORT_SYMBOL(ceph_osdc_release_request);
+
+void ceph_osdc_get_request(struct ceph_osd_request *req)
+{
+       dout("%s %p (was %d)\n", __func__, req,
+            atomic_read(&req->r_kref.refcount));
+       kref_get(&req->r_kref);
+}
+EXPORT_SYMBOL(ceph_osdc_get_request);
+
+void ceph_osdc_put_request(struct ceph_osd_request *req)
+{
+       dout("%s %p (was %d)\n", __func__, req,
+            atomic_read(&req->r_kref.refcount));
+       kref_put(&req->r_kref, ceph_osdc_release_request);
+}
+EXPORT_SYMBOL(ceph_osdc_put_request);
 
 struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
                                               struct ceph_snap_context *snapc,
@@ -364,7 +388,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
        RB_CLEAR_NODE(&req->r_node);
        INIT_LIST_HEAD(&req->r_unsafe_item);
        INIT_LIST_HEAD(&req->r_linger_item);
-       INIT_LIST_HEAD(&req->r_linger_osd);
+       INIT_LIST_HEAD(&req->r_linger_osd_item);
        INIT_LIST_HEAD(&req->r_req_lru_item);
        INIT_LIST_HEAD(&req->r_osd_item);
 
@@ -916,7 +940,7 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
         * list at the end to keep things in tid order.
         */
        list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
-                                r_linger_osd) {
+                                r_linger_osd_item) {
                /*
                 * reregister request prior to unregistering linger so
                 * that r_osd is preserved.
@@ -1008,6 +1032,8 @@ static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
 {
        dout("__remove_osd %p\n", osd);
        BUG_ON(!list_empty(&osd->o_requests));
+       BUG_ON(!list_empty(&osd->o_linger_requests));
+
        rb_erase(&osd->o_node, &osdc->osds);
        list_del_init(&osd->o_osd_lru);
        ceph_con_close(&osd->o_con);
@@ -1029,12 +1055,23 @@ static void remove_all_osds(struct ceph_osd_client *osdc)
 static void __move_osd_to_lru(struct ceph_osd_client *osdc,
                              struct ceph_osd *osd)
 {
-       dout("__move_osd_to_lru %p\n", osd);
+       dout("%s %p\n", __func__, osd);
        BUG_ON(!list_empty(&osd->o_osd_lru));
+
        list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
        osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ;
 }
 
+static void maybe_move_osd_to_lru(struct ceph_osd_client *osdc,
+                                 struct ceph_osd *osd)
+{
+       dout("%s %p\n", __func__, osd);
+
+       if (list_empty(&osd->o_requests) &&
+           list_empty(&osd->o_linger_requests))
+               __move_osd_to_lru(osdc, osd);
+}
+
 static void __remove_osd_from_lru(struct ceph_osd *osd)
 {
        dout("__remove_osd_from_lru %p\n", osd);
@@ -1175,6 +1212,7 @@ static void __unregister_request(struct ceph_osd_client *osdc,
 
        dout("__unregister_request %p tid %lld\n", req, req->r_tid);
        rb_erase(&req->r_node, &osdc->requests);
+       RB_CLEAR_NODE(&req->r_node);
        osdc->num_requests--;
 
        if (req->r_osd) {
@@ -1182,12 +1220,8 @@ static void __unregister_request(struct ceph_osd_client *osdc,
                ceph_msg_revoke(req->r_request);
 
                list_del_init(&req->r_osd_item);
-               if (list_empty(&req->r_osd->o_requests) &&
-                   list_empty(&req->r_osd->o_linger_requests)) {
-                       dout("moving osd to %p lru\n", req->r_osd);
-                       __move_osd_to_lru(osdc, req->r_osd);
-               }
-               if (list_empty(&req->r_linger_item))
+               maybe_move_osd_to_lru(osdc, req->r_osd);
+               if (list_empty(&req->r_linger_osd_item))
                        req->r_osd = NULL;
        }
 
@@ -1214,45 +1248,39 @@ static void __cancel_request(struct ceph_osd_request *req)
 static void __register_linger_request(struct ceph_osd_client *osdc,
                                    struct ceph_osd_request *req)
 {
-       dout("__register_linger_request %p\n", req);
+       dout("%s %p tid %llu\n", __func__, req, req->r_tid);
+       WARN_ON(!req->r_linger);
+
        ceph_osdc_get_request(req);
        list_add_tail(&req->r_linger_item, &osdc->req_linger);
        if (req->r_osd)
-               list_add_tail(&req->r_linger_osd,
+               list_add_tail(&req->r_linger_osd_item,
                              &req->r_osd->o_linger_requests);
 }
 
 static void __unregister_linger_request(struct ceph_osd_client *osdc,
                                        struct ceph_osd_request *req)
 {
-       dout("__unregister_linger_request %p\n", req);
+       WARN_ON(!req->r_linger);
+
+       if (list_empty(&req->r_linger_item)) {
+               dout("%s %p tid %llu not registered\n", __func__, req,
+                    req->r_tid);
+               return;
+       }
+
+       dout("%s %p tid %llu\n", __func__, req, req->r_tid);
        list_del_init(&req->r_linger_item);
-       if (req->r_osd) {
-               list_del_init(&req->r_linger_osd);
 
-               if (list_empty(&req->r_osd->o_requests) &&
-                   list_empty(&req->r_osd->o_linger_requests)) {
-                       dout("moving osd to %p lru\n", req->r_osd);
-                       __move_osd_to_lru(osdc, req->r_osd);
-               }
+       if (req->r_osd) {
+               list_del_init(&req->r_linger_osd_item);
+               maybe_move_osd_to_lru(osdc, req->r_osd);
                if (list_empty(&req->r_osd_item))
                        req->r_osd = NULL;
        }
        ceph_osdc_put_request(req);
 }
 
-void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
-                                        struct ceph_osd_request *req)
-{
-       mutex_lock(&osdc->request_mutex);
-       if (req->r_linger) {
-               req->r_linger = 0;
-               __unregister_linger_request(osdc, req);
-       }
-       mutex_unlock(&osdc->request_mutex);
-}
-EXPORT_SYMBOL(ceph_osdc_unregister_linger_request);
-
 void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
                                  struct ceph_osd_request *req)
 {
@@ -1318,6 +1346,22 @@ static int __calc_request_pg(struct ceph_osdmap *osdmap,
                                   &req->r_target_oid, pg_out);
 }
 
+static void __enqueue_request(struct ceph_osd_request *req)
+{
+       struct ceph_osd_client *osdc = req->r_osdc;
+
+       dout("%s %p tid %llu to osd%d\n", __func__, req, req->r_tid,
+            req->r_osd ? req->r_osd->o_osd : -1);
+
+       if (req->r_osd) {
+               __remove_osd_from_lru(req->r_osd);
+               list_add_tail(&req->r_osd_item, &req->r_osd->o_requests);
+               list_move_tail(&req->r_req_lru_item, &osdc->req_unsent);
+       } else {
+               list_move_tail(&req->r_req_lru_item, &osdc->req_notarget);
+       }
+}
+
 /*
  * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
  * (as needed), and set the request r_osd appropriately.  If there is
@@ -1395,13 +1439,7 @@ static int __map_request(struct ceph_osd_client *osdc,
                              &osdc->osdmap->osd_addr[o]);
        }
 
-       if (req->r_osd) {
-               __remove_osd_from_lru(req->r_osd);
-               list_add_tail(&req->r_osd_item, &req->r_osd->o_requests);
-               list_move_tail(&req->r_req_lru_item, &osdc->req_unsent);
-       } else {
-               list_move_tail(&req->r_req_lru_item, &osdc->req_notarget);
-       }
+       __enqueue_request(req);
        err = 1;   /* osd or pg changed */
 
 out:
@@ -2429,6 +2467,25 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
 }
 EXPORT_SYMBOL(ceph_osdc_start_request);
 
+/*
+ * Unregister a registered request.  The request is not completed (i.e.
+ * no callbacks or wakeups) - higher layers are supposed to know what
+ * they are canceling.
+ */
+void ceph_osdc_cancel_request(struct ceph_osd_request *req)
+{
+       struct ceph_osd_client *osdc = req->r_osdc;
+
+       mutex_lock(&osdc->request_mutex);
+       if (req->r_linger)
+               __unregister_linger_request(osdc, req);
+       __unregister_request(osdc, req);
+       mutex_unlock(&osdc->request_mutex);
+
+       dout("%s %p tid %llu canceled\n", __func__, req, req->r_tid);
+}
+EXPORT_SYMBOL(ceph_osdc_cancel_request);
+
 /*
  * wait for a request to complete
  */
@@ -2437,18 +2494,18 @@ int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
 {
        int rc;
 
+       dout("%s %p tid %llu\n", __func__, req, req->r_tid);
+
        rc = wait_for_completion_interruptible(&req->r_completion);
        if (rc < 0) {
-               mutex_lock(&osdc->request_mutex);
-               __cancel_request(req);
-               __unregister_request(osdc, req);
-               mutex_unlock(&osdc->request_mutex);
+               dout("%s %p tid %llu interrupted\n", __func__, req, req->r_tid);
+               ceph_osdc_cancel_request(req);
                complete_request(req);
-               dout("wait_request tid %llu canceled/timed out\n", req->r_tid);
                return rc;
        }
 
-       dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result);
+       dout("%s %p tid %llu result %d\n", __func__, req, req->r_tid,
+            req->r_result);
        return req->r_result;
 }
 EXPORT_SYMBOL(ceph_osdc_wait_request);
This page took 0.027642 seconds and 5 git commands to generate.