Merge branch 'misc' of git://git.kernel.org/pub/scm/linux/kernel/git/mmarek/kbuild
[deliverable/linux.git] / drivers / md / md-cluster.c
index d6a1126d85ce1b9f5f528214a89af7a090647a1b..0ded8e97751d270dbfdae0aac73a792043f10621 100644 (file)
@@ -48,13 +48,29 @@ struct resync_info {
 #define                MD_CLUSTER_SUSPEND_READ_BALANCING       2
 #define                MD_CLUSTER_BEGIN_JOIN_CLUSTER           3
 
+/* Lock the send communication. This is done through
+ * bit manipulation as opposed to a mutex in order to
+ * accomodate lock and hold. See next comment.
+ */
+#define                MD_CLUSTER_SEND_LOCK                    4
+/* If cluster operations (such as adding a disk) must lock the
+ * communication channel, so as to perform extra operations
+ * (update metadata) and no other operation is allowed on the
+ * MD. Token needs to be locked and held until the operation
+ * completes witha md_update_sb(), which would eventually release
+ * the lock.
+ */
+#define                MD_CLUSTER_SEND_LOCKED_ALREADY          5
+
 
 struct md_cluster_info {
        /* dlm lock space and resources for clustered raid. */
        dlm_lockspace_t *lockspace;
        int slot_number;
        struct completion completion;
+       struct mutex recv_mutex;
        struct dlm_lock_resource *bitmap_lockres;
+       struct dlm_lock_resource **other_bitmap_lockres;
        struct dlm_lock_resource *resync_lockres;
        struct list_head suspend_list;
        spinlock_t suspend_lock;
@@ -67,6 +83,7 @@ struct md_cluster_info {
        struct dlm_lock_resource *no_new_dev_lockres;
        struct md_thread *recv_thread;
        struct completion newdisk_completion;
+       wait_queue_head_t wait;
        unsigned long state;
 };
 
@@ -431,8 +448,10 @@ static void process_add_new_disk(struct mddev *mddev, struct cluster_msg *cmsg)
 static void process_metadata_update(struct mddev *mddev, struct cluster_msg *msg)
 {
        struct md_cluster_info *cinfo = mddev->cluster_info;
-       md_reload_sb(mddev, le32_to_cpu(msg->raid_slot));
+       mddev->good_device_nr = le32_to_cpu(msg->raid_slot);
+       set_bit(MD_RELOAD_SB, &mddev->flags);
        dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
+       md_wakeup_thread(mddev->thread);
 }
 
 static void process_remove_disk(struct mddev *mddev, struct cluster_msg *msg)
@@ -440,8 +459,11 @@ static void process_remove_disk(struct mddev *mddev, struct cluster_msg *msg)
        struct md_rdev *rdev = md_find_rdev_nr_rcu(mddev,
                                                   le32_to_cpu(msg->raid_slot));
 
-       if (rdev)
-               md_kick_rdev_from_array(rdev);
+       if (rdev) {
+               set_bit(ClusterRemove, &rdev->flags);
+               set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
+               md_wakeup_thread(mddev->thread);
+       }
        else
                pr_warn("%s: %d Could not find disk(%d) to REMOVE\n",
                        __func__, __LINE__, le32_to_cpu(msg->raid_slot));
@@ -502,9 +524,11 @@ static void recv_daemon(struct md_thread *thread)
        struct cluster_msg msg;
        int ret;
 
+       mutex_lock(&cinfo->recv_mutex);
        /*get CR on Message*/
        if (dlm_lock_sync(message_lockres, DLM_LOCK_CR)) {
                pr_err("md/raid1:failed to get CR on MESSAGE\n");
+               mutex_unlock(&cinfo->recv_mutex);
                return;
        }
 
@@ -528,33 +552,45 @@ static void recv_daemon(struct md_thread *thread)
        ret = dlm_unlock_sync(message_lockres);
        if (unlikely(ret != 0))
                pr_info("unlock msg failed return %d\n", ret);
+       mutex_unlock(&cinfo->recv_mutex);
 }
 
-/* lock_comm()
+/* lock_token()
  * Takes the lock on the TOKEN lock resource so no other
  * node can communicate while the operation is underway.
- * If called again, and the TOKEN lock is alread in EX mode
- * return success. However, care must be taken that unlock_comm()
- * is called only once.
  */
-static int lock_comm(struct md_cluster_info *cinfo)
+static int lock_token(struct md_cluster_info *cinfo)
 {
        int error;
 
-       if (cinfo->token_lockres->mode == DLM_LOCK_EX)
-               return 0;
-
        error = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
        if (error)
                pr_err("md-cluster(%s:%d): failed to get EX on TOKEN (%d)\n",
                                __func__, __LINE__, error);
+
+       /* Lock the receive sequence */
+       mutex_lock(&cinfo->recv_mutex);
        return error;
 }
 
+/* lock_comm()
+ * Sets the MD_CLUSTER_SEND_LOCK bit to lock the send channel.
+ */
+static int lock_comm(struct md_cluster_info *cinfo)
+{
+       wait_event(cinfo->wait,
+                  !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state));
+
+       return lock_token(cinfo);
+}
+
 static void unlock_comm(struct md_cluster_info *cinfo)
 {
        WARN_ON(cinfo->token_lockres->mode != DLM_LOCK_EX);
+       mutex_unlock(&cinfo->recv_mutex);
        dlm_unlock_sync(cinfo->token_lockres);
+       clear_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state);
+       wake_up(&cinfo->wait);
 }
 
 /* __sendmsg()
@@ -707,6 +743,8 @@ static int join(struct mddev *mddev, int nodes)
        spin_lock_init(&cinfo->suspend_lock);
        init_completion(&cinfo->completion);
        set_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state);
+       init_waitqueue_head(&cinfo->wait);
+       mutex_init(&cinfo->recv_mutex);
 
        mddev->cluster_info = cinfo;
 
@@ -800,6 +838,7 @@ static void resync_bitmap(struct mddev *mddev)
                        __func__, __LINE__, err);
 }
 
+static void unlock_all_bitmaps(struct mddev *mddev);
 static int leave(struct mddev *mddev)
 {
        struct md_cluster_info *cinfo = mddev->cluster_info;
@@ -820,6 +859,7 @@ static int leave(struct mddev *mddev)
        lockres_free(cinfo->ack_lockres);
        lockres_free(cinfo->no_new_dev_lockres);
        lockres_free(cinfo->bitmap_lockres);
+       unlock_all_bitmaps(mddev);
        dlm_release_lockspace(cinfo->lockspace, 2);
        return 0;
 }
@@ -835,9 +875,25 @@ static int slot_number(struct mddev *mddev)
        return cinfo->slot_number - 1;
 }
 
+/*
+ * Check if the communication is already locked, else lock the communication
+ * channel.
+ * If it is already locked, token is in EX mode, and hence lock_token()
+ * should not be called.
+ */
 static int metadata_update_start(struct mddev *mddev)
 {
-       return lock_comm(mddev->cluster_info);
+       struct md_cluster_info *cinfo = mddev->cluster_info;
+
+       wait_event(cinfo->wait,
+                  !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state) ||
+                  test_and_clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state));
+
+       /* If token is already locked, return 0 */
+       if (cinfo->token_lockres->mode == DLM_LOCK_EX)
+               return 0;
+
+       return lock_token(cinfo);
 }
 
 static int metadata_update_finish(struct mddev *mddev)
@@ -862,6 +918,7 @@ static int metadata_update_finish(struct mddev *mddev)
                ret = __sendmsg(cinfo, &cmsg);
        } else
                pr_warn("md-cluster: No good device id found to send\n");
+       clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
        unlock_comm(cinfo);
        return ret;
 }
@@ -869,6 +926,7 @@ static int metadata_update_finish(struct mddev *mddev)
 static void metadata_update_cancel(struct mddev *mddev)
 {
        struct md_cluster_info *cinfo = mddev->cluster_info;
+       clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
        unlock_comm(cinfo);
 }
 
@@ -882,8 +940,16 @@ static int resync_start(struct mddev *mddev)
 static int resync_info_update(struct mddev *mddev, sector_t lo, sector_t hi)
 {
        struct md_cluster_info *cinfo = mddev->cluster_info;
+       struct resync_info ri;
        struct cluster_msg cmsg = {0};
 
+       /* do not send zero again, if we have sent before */
+       if (hi == 0) {
+               memcpy(&ri, cinfo->bitmap_lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
+               if (le64_to_cpu(ri.hi) == 0)
+                       return 0;
+       }
+
        add_resync_info(cinfo->bitmap_lockres, lo, hi);
        /* Re-acquire the lock to refresh LVB */
        dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW);
@@ -954,14 +1020,30 @@ static int add_new_disk(struct mddev *mddev, struct md_rdev *rdev)
                ret = -ENOENT;
        if (ret)
                unlock_comm(cinfo);
-       else
+       else {
                dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
+               /* Since MD_CHANGE_DEVS will be set in add_bound_rdev which
+                * will run soon after add_new_disk, the below path will be
+                * invoked:
+                *   md_wakeup_thread(mddev->thread)
+                *      -> conf->thread (raid1d)
+                *      -> md_check_recovery -> md_update_sb
+                *      -> metadata_update_start/finish
+                * MD_CLUSTER_SEND_LOCKED_ALREADY will be cleared eventually.
+                *
+                * For other failure cases, metadata_update_cancel and
+                * add_new_disk_cancel also clear below bit as well.
+                * */
+               set_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
+               wake_up(&cinfo->wait);
+       }
        return ret;
 }
 
 static void add_new_disk_cancel(struct mddev *mddev)
 {
        struct md_cluster_info *cinfo = mddev->cluster_info;
+       clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
        unlock_comm(cinfo);
 }
 
@@ -986,7 +1068,59 @@ static int remove_disk(struct mddev *mddev, struct md_rdev *rdev)
        struct md_cluster_info *cinfo = mddev->cluster_info;
        cmsg.type = cpu_to_le32(REMOVE);
        cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
-       return __sendmsg(cinfo, &cmsg);
+       return sendmsg(cinfo, &cmsg);
+}
+
+static int lock_all_bitmaps(struct mddev *mddev)
+{
+       int slot, my_slot, ret, held = 1, i = 0;
+       char str[64];
+       struct md_cluster_info *cinfo = mddev->cluster_info;
+
+       cinfo->other_bitmap_lockres = kzalloc((mddev->bitmap_info.nodes - 1) *
+                                            sizeof(struct dlm_lock_resource *),
+                                            GFP_KERNEL);
+       if (!cinfo->other_bitmap_lockres) {
+               pr_err("md: can't alloc mem for other bitmap locks\n");
+               return 0;
+       }
+
+       my_slot = slot_number(mddev);
+       for (slot = 0; slot < mddev->bitmap_info.nodes; slot++) {
+               if (slot == my_slot)
+                       continue;
+
+               memset(str, '\0', 64);
+               snprintf(str, 64, "bitmap%04d", slot);
+               cinfo->other_bitmap_lockres[i] = lockres_init(mddev, str, NULL, 1);
+               if (!cinfo->other_bitmap_lockres[i])
+                       return -ENOMEM;
+
+               cinfo->other_bitmap_lockres[i]->flags |= DLM_LKF_NOQUEUE;
+               ret = dlm_lock_sync(cinfo->other_bitmap_lockres[i], DLM_LOCK_PW);
+               if (ret)
+                       held = -1;
+               i++;
+       }
+
+       return held;
+}
+
+static void unlock_all_bitmaps(struct mddev *mddev)
+{
+       struct md_cluster_info *cinfo = mddev->cluster_info;
+       int i;
+
+       /* release other node's bitmap lock if they are existed */
+       if (cinfo->other_bitmap_lockres) {
+               for (i = 0; i < mddev->bitmap_info.nodes - 1; i++) {
+                       if (cinfo->other_bitmap_lockres[i]) {
+                               dlm_unlock_sync(cinfo->other_bitmap_lockres[i]);
+                               lockres_free(cinfo->other_bitmap_lockres[i]);
+                       }
+               }
+               kfree(cinfo->other_bitmap_lockres);
+       }
 }
 
 static int gather_bitmaps(struct md_rdev *rdev)
@@ -1034,6 +1168,8 @@ static struct md_cluster_operations cluster_ops = {
        .new_disk_ack = new_disk_ack,
        .remove_disk = remove_disk,
        .gather_bitmaps = gather_bitmaps,
+       .lock_all_bitmaps = lock_all_bitmaps,
+       .unlock_all_bitmaps = unlock_all_bitmaps,
 };
 
 static int __init cluster_init(void)
This page took 0.027883 seconds and 5 git commands to generate.