/*
 * Copyright (C) 2015, SUSE
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2, or (at your option)
 * any later version.
 */
12 #include <linux/module.h>
13 #include <linux/dlm.h>
14 #include <linux/sched.h>
15 #include <linux/raid/md_p.h>
18 #include "md-cluster.h"
21 #define NEW_DEV_TIMEOUT 5000
23 struct dlm_lock_resource
{
26 char *name
; /* lock name. */
27 uint32_t flags
; /* flags to pass to dlm_lock() */
28 struct completion completion
; /* completion for synchronized locking */
29 void (*bast
)(void *arg
, int mode
); /* blocking AST function pointer*/
30 struct mddev
*mddev
; /* pointing back to mddev. */
38 struct list_head list
;
46 /* md_cluster_info flags */
47 #define MD_CLUSTER_WAITING_FOR_NEWDISK 1
48 #define MD_CLUSTER_SUSPEND_READ_BALANCING 2
49 #define MD_CLUSTER_BEGIN_JOIN_CLUSTER 3
52 struct md_cluster_info
{
53 /* dlm lock space and resources for clustered raid. */
54 dlm_lockspace_t
*lockspace
;
56 struct completion completion
;
57 struct dlm_lock_resource
*bitmap_lockres
;
58 struct dlm_lock_resource
*resync_lockres
;
59 struct list_head suspend_list
;
60 spinlock_t suspend_lock
;
61 struct md_thread
*recovery_thread
;
62 unsigned long recovery_map
;
63 /* communication loc resources */
64 struct dlm_lock_resource
*ack_lockres
;
65 struct dlm_lock_resource
*message_lockres
;
66 struct dlm_lock_resource
*token_lockres
;
67 struct dlm_lock_resource
*no_new_dev_lockres
;
68 struct md_thread
*recv_thread
;
69 struct completion newdisk_completion
;
85 /* TODO: Unionize this for smaller footprint */
92 static void sync_ast(void *arg
)
94 struct dlm_lock_resource
*res
;
97 complete(&res
->completion
);
100 static int dlm_lock_sync(struct dlm_lock_resource
*res
, int mode
)
104 ret
= dlm_lock(res
->ls
, mode
, &res
->lksb
,
105 res
->flags
, res
->name
, strlen(res
->name
),
106 0, sync_ast
, res
, res
->bast
);
109 wait_for_completion(&res
->completion
);
110 if (res
->lksb
.sb_status
== 0)
112 return res
->lksb
.sb_status
;
115 static int dlm_unlock_sync(struct dlm_lock_resource
*res
)
117 return dlm_lock_sync(res
, DLM_LOCK_NL
);
120 static struct dlm_lock_resource
*lockres_init(struct mddev
*mddev
,
121 char *name
, void (*bastfn
)(void *arg
, int mode
), int with_lvb
)
123 struct dlm_lock_resource
*res
= NULL
;
125 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
127 res
= kzalloc(sizeof(struct dlm_lock_resource
), GFP_KERNEL
);
130 init_completion(&res
->completion
);
131 res
->ls
= cinfo
->lockspace
;
133 res
->mode
= DLM_LOCK_IV
;
134 namelen
= strlen(name
);
135 res
->name
= kzalloc(namelen
+ 1, GFP_KERNEL
);
137 pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name
);
140 strlcpy(res
->name
, name
, namelen
+ 1);
142 res
->lksb
.sb_lvbptr
= kzalloc(LVB_SIZE
, GFP_KERNEL
);
143 if (!res
->lksb
.sb_lvbptr
) {
144 pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name
);
147 res
->flags
= DLM_LKF_VALBLK
;
153 res
->flags
|= DLM_LKF_EXPEDITE
;
155 ret
= dlm_lock_sync(res
, DLM_LOCK_NL
);
157 pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name
);
160 res
->flags
&= ~DLM_LKF_EXPEDITE
;
161 res
->flags
|= DLM_LKF_CONVERT
;
165 kfree(res
->lksb
.sb_lvbptr
);
171 static void lockres_free(struct dlm_lock_resource
*res
)
178 /* cancel a lock request or a conversion request that is blocked */
179 res
->flags
|= DLM_LKF_CANCEL
;
181 ret
= dlm_unlock(res
->ls
, res
->lksb
.sb_lkid
, 0, &res
->lksb
, res
);
182 if (unlikely(ret
!= 0)) {
183 pr_info("%s: failed to unlock %s return %d\n", __func__
, res
->name
, ret
);
185 /* if a lock conversion is cancelled, then the lock is put
186 * back to grant queue, need to ensure it is unlocked */
187 if (ret
== -DLM_ECANCEL
)
190 res
->flags
&= ~DLM_LKF_CANCEL
;
191 wait_for_completion(&res
->completion
);
194 kfree(res
->lksb
.sb_lvbptr
);
198 static void add_resync_info(struct dlm_lock_resource
*lockres
,
199 sector_t lo
, sector_t hi
)
201 struct resync_info
*ri
;
203 ri
= (struct resync_info
*)lockres
->lksb
.sb_lvbptr
;
204 ri
->lo
= cpu_to_le64(lo
);
205 ri
->hi
= cpu_to_le64(hi
);
208 static struct suspend_info
*read_resync_info(struct mddev
*mddev
, struct dlm_lock_resource
*lockres
)
210 struct resync_info ri
;
211 struct suspend_info
*s
= NULL
;
214 dlm_lock_sync(lockres
, DLM_LOCK_CR
);
215 memcpy(&ri
, lockres
->lksb
.sb_lvbptr
, sizeof(struct resync_info
));
216 hi
= le64_to_cpu(ri
.hi
);
218 s
= kzalloc(sizeof(struct suspend_info
), GFP_KERNEL
);
222 s
->lo
= le64_to_cpu(ri
.lo
);
224 dlm_unlock_sync(lockres
);
229 static void recover_bitmaps(struct md_thread
*thread
)
231 struct mddev
*mddev
= thread
->mddev
;
232 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
233 struct dlm_lock_resource
*bm_lockres
;
236 struct suspend_info
*s
, *tmp
;
239 while (cinfo
->recovery_map
) {
240 slot
= fls64((u64
)cinfo
->recovery_map
) - 1;
242 /* Clear suspend_area associated with the bitmap */
243 spin_lock_irq(&cinfo
->suspend_lock
);
244 list_for_each_entry_safe(s
, tmp
, &cinfo
->suspend_list
, list
)
245 if (slot
== s
->slot
) {
249 spin_unlock_irq(&cinfo
->suspend_lock
);
251 snprintf(str
, 64, "bitmap%04d", slot
);
252 bm_lockres
= lockres_init(mddev
, str
, NULL
, 1);
254 pr_err("md-cluster: Cannot initialize bitmaps\n");
258 ret
= dlm_lock_sync(bm_lockres
, DLM_LOCK_PW
);
260 pr_err("md-cluster: Could not DLM lock %s: %d\n",
264 ret
= bitmap_copy_from_slot(mddev
, slot
, &lo
, &hi
, true);
266 pr_err("md-cluster: Could not copy data from bitmap %d\n", slot
);
270 /* TODO:Wait for current resync to get over */
271 set_bit(MD_RECOVERY_NEEDED
, &mddev
->recovery
);
272 if (lo
< mddev
->recovery_cp
)
273 mddev
->recovery_cp
= lo
;
274 md_check_recovery(mddev
);
277 dlm_unlock_sync(bm_lockres
);
279 clear_bit(slot
, &cinfo
->recovery_map
);
283 static void recover_prep(void *arg
)
285 struct mddev
*mddev
= arg
;
286 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
287 set_bit(MD_CLUSTER_SUSPEND_READ_BALANCING
, &cinfo
->state
);
290 static void __recover_slot(struct mddev
*mddev
, int slot
)
292 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
294 set_bit(slot
, &cinfo
->recovery_map
);
295 if (!cinfo
->recovery_thread
) {
296 cinfo
->recovery_thread
= md_register_thread(recover_bitmaps
,
298 if (!cinfo
->recovery_thread
) {
299 pr_warn("md-cluster: Could not create recovery thread\n");
303 md_wakeup_thread(cinfo
->recovery_thread
);
306 static void recover_slot(void *arg
, struct dlm_slot
*slot
)
308 struct mddev
*mddev
= arg
;
309 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
311 pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n",
312 mddev
->bitmap_info
.cluster_name
,
313 slot
->nodeid
, slot
->slot
,
315 /* deduct one since dlm slot starts from one while the num of
316 * cluster-md begins with 0 */
317 __recover_slot(mddev
, slot
->slot
- 1);
320 static void recover_done(void *arg
, struct dlm_slot
*slots
,
321 int num_slots
, int our_slot
,
324 struct mddev
*mddev
= arg
;
325 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
327 cinfo
->slot_number
= our_slot
;
328 /* completion is only need to be complete when node join cluster,
329 * it doesn't need to run during another node's failure */
330 if (test_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER
, &cinfo
->state
)) {
331 complete(&cinfo
->completion
);
332 clear_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER
, &cinfo
->state
);
334 clear_bit(MD_CLUSTER_SUSPEND_READ_BALANCING
, &cinfo
->state
);
337 /* the ops is called when node join the cluster, and do lock recovery
338 * if node failure occurs */
339 static const struct dlm_lockspace_ops md_ls_ops
= {
340 .recover_prep
= recover_prep
,
341 .recover_slot
= recover_slot
,
342 .recover_done
= recover_done
,
346 * The BAST function for the ack lock resource
347 * This function wakes up the receive thread in
348 * order to receive and process the message.
350 static void ack_bast(void *arg
, int mode
)
352 struct dlm_lock_resource
*res
= arg
;
353 struct md_cluster_info
*cinfo
= res
->mddev
->cluster_info
;
355 if (mode
== DLM_LOCK_EX
)
356 md_wakeup_thread(cinfo
->recv_thread
);
359 static void __remove_suspend_info(struct md_cluster_info
*cinfo
, int slot
)
361 struct suspend_info
*s
, *tmp
;
363 list_for_each_entry_safe(s
, tmp
, &cinfo
->suspend_list
, list
)
364 if (slot
== s
->slot
) {
371 static void remove_suspend_info(struct mddev
*mddev
, int slot
)
373 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
374 spin_lock_irq(&cinfo
->suspend_lock
);
375 __remove_suspend_info(cinfo
, slot
);
376 spin_unlock_irq(&cinfo
->suspend_lock
);
377 mddev
->pers
->quiesce(mddev
, 2);
381 static void process_suspend_info(struct mddev
*mddev
,
382 int slot
, sector_t lo
, sector_t hi
)
384 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
385 struct suspend_info
*s
;
388 remove_suspend_info(mddev
, slot
);
389 set_bit(MD_RECOVERY_NEEDED
, &mddev
->recovery
);
390 md_wakeup_thread(mddev
->thread
);
393 s
= kzalloc(sizeof(struct suspend_info
), GFP_KERNEL
);
399 mddev
->pers
->quiesce(mddev
, 1);
400 mddev
->pers
->quiesce(mddev
, 0);
401 spin_lock_irq(&cinfo
->suspend_lock
);
402 /* Remove existing entry (if exists) before adding */
403 __remove_suspend_info(cinfo
, slot
);
404 list_add(&s
->list
, &cinfo
->suspend_list
);
405 spin_unlock_irq(&cinfo
->suspend_lock
);
406 mddev
->pers
->quiesce(mddev
, 2);
409 static void process_add_new_disk(struct mddev
*mddev
, struct cluster_msg
*cmsg
)
412 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
413 char event_name
[] = "EVENT=ADD_DEVICE";
415 char *envp
[] = {event_name
, disk_uuid
, raid_slot
, NULL
};
418 len
= snprintf(disk_uuid
, 64, "DEVICE_UUID=");
419 sprintf(disk_uuid
+ len
, "%pU", cmsg
->uuid
);
420 snprintf(raid_slot
, 16, "RAID_DISK=%d", le32_to_cpu(cmsg
->raid_slot
));
421 pr_info("%s:%d Sending kobject change with %s and %s\n", __func__
, __LINE__
, disk_uuid
, raid_slot
);
422 init_completion(&cinfo
->newdisk_completion
);
423 set_bit(MD_CLUSTER_WAITING_FOR_NEWDISK
, &cinfo
->state
);
424 kobject_uevent_env(&disk_to_dev(mddev
->gendisk
)->kobj
, KOBJ_CHANGE
, envp
);
425 wait_for_completion_timeout(&cinfo
->newdisk_completion
,
427 clear_bit(MD_CLUSTER_WAITING_FOR_NEWDISK
, &cinfo
->state
);
431 static void process_metadata_update(struct mddev
*mddev
, struct cluster_msg
*msg
)
433 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
434 md_reload_sb(mddev
, le32_to_cpu(msg
->raid_slot
));
435 dlm_lock_sync(cinfo
->no_new_dev_lockres
, DLM_LOCK_CR
);
438 static void process_remove_disk(struct mddev
*mddev
, struct cluster_msg
*msg
)
440 struct md_rdev
*rdev
= md_find_rdev_nr_rcu(mddev
,
441 le32_to_cpu(msg
->raid_slot
));
444 set_bit(ClusterRemove
, &rdev
->flags
);
445 set_bit(MD_RECOVERY_NEEDED
, &mddev
->recovery
);
446 md_wakeup_thread(mddev
->thread
);
449 pr_warn("%s: %d Could not find disk(%d) to REMOVE\n",
450 __func__
, __LINE__
, le32_to_cpu(msg
->raid_slot
));
453 static void process_readd_disk(struct mddev
*mddev
, struct cluster_msg
*msg
)
455 struct md_rdev
*rdev
= md_find_rdev_nr_rcu(mddev
,
456 le32_to_cpu(msg
->raid_slot
));
458 if (rdev
&& test_bit(Faulty
, &rdev
->flags
))
459 clear_bit(Faulty
, &rdev
->flags
);
461 pr_warn("%s: %d Could not find disk(%d) which is faulty",
462 __func__
, __LINE__
, le32_to_cpu(msg
->raid_slot
));
465 static void process_recvd_msg(struct mddev
*mddev
, struct cluster_msg
*msg
)
467 if (WARN(mddev
->cluster_info
->slot_number
- 1 == le32_to_cpu(msg
->slot
),
468 "node %d received it's own msg\n", le32_to_cpu(msg
->slot
)))
470 switch (le32_to_cpu(msg
->type
)) {
471 case METADATA_UPDATED
:
472 process_metadata_update(mddev
, msg
);
475 process_suspend_info(mddev
, le32_to_cpu(msg
->slot
),
476 le64_to_cpu(msg
->low
),
477 le64_to_cpu(msg
->high
));
480 process_add_new_disk(mddev
, msg
);
483 process_remove_disk(mddev
, msg
);
486 process_readd_disk(mddev
, msg
);
488 case BITMAP_NEEDS_SYNC
:
489 __recover_slot(mddev
, le32_to_cpu(msg
->slot
));
492 pr_warn("%s:%d Received unknown message from %d\n",
493 __func__
, __LINE__
, msg
->slot
);
498 * thread for receiving message
500 static void recv_daemon(struct md_thread
*thread
)
502 struct md_cluster_info
*cinfo
= thread
->mddev
->cluster_info
;
503 struct dlm_lock_resource
*ack_lockres
= cinfo
->ack_lockres
;
504 struct dlm_lock_resource
*message_lockres
= cinfo
->message_lockres
;
505 struct cluster_msg msg
;
508 /*get CR on Message*/
509 if (dlm_lock_sync(message_lockres
, DLM_LOCK_CR
)) {
510 pr_err("md/raid1:failed to get CR on MESSAGE\n");
514 /* read lvb and wake up thread to process this message_lockres */
515 memcpy(&msg
, message_lockres
->lksb
.sb_lvbptr
, sizeof(struct cluster_msg
));
516 process_recvd_msg(thread
->mddev
, &msg
);
518 /*release CR on ack_lockres*/
519 ret
= dlm_unlock_sync(ack_lockres
);
520 if (unlikely(ret
!= 0))
521 pr_info("unlock ack failed return %d\n", ret
);
522 /*up-convert to PR on message_lockres*/
523 ret
= dlm_lock_sync(message_lockres
, DLM_LOCK_PR
);
524 if (unlikely(ret
!= 0))
525 pr_info("lock PR on msg failed return %d\n", ret
);
526 /*get CR on ack_lockres again*/
527 ret
= dlm_lock_sync(ack_lockres
, DLM_LOCK_CR
);
528 if (unlikely(ret
!= 0))
529 pr_info("lock CR on ack failed return %d\n", ret
);
530 /*release CR on message_lockres*/
531 ret
= dlm_unlock_sync(message_lockres
);
532 if (unlikely(ret
!= 0))
533 pr_info("unlock msg failed return %d\n", ret
);
537 * Takes the lock on the TOKEN lock resource so no other
538 * node can communicate while the operation is underway.
539 * If called again, and the TOKEN lock is alread in EX mode
540 * return success. However, care must be taken that unlock_comm()
541 * is called only once.
543 static int lock_comm(struct md_cluster_info
*cinfo
)
547 if (cinfo
->token_lockres
->mode
== DLM_LOCK_EX
)
550 error
= dlm_lock_sync(cinfo
->token_lockres
, DLM_LOCK_EX
);
552 pr_err("md-cluster(%s:%d): failed to get EX on TOKEN (%d)\n",
553 __func__
, __LINE__
, error
);
557 static void unlock_comm(struct md_cluster_info
*cinfo
)
559 WARN_ON(cinfo
->token_lockres
->mode
!= DLM_LOCK_EX
);
560 dlm_unlock_sync(cinfo
->token_lockres
);
564 * This function performs the actual sending of the message. This function is
565 * usually called after performing the encompassing operation
567 * 1. Grabs the message lockresource in EX mode
568 * 2. Copies the message to the message LVB
569 * 3. Downconverts message lockresource to CW
570 * 4. Upconverts ack lock resource from CR to EX. This forces the BAST on other nodes
571 * and the other nodes read the message. The thread will wait here until all other
572 * nodes have released ack lock resource.
573 * 5. Downconvert ack lockresource to CR
575 static int __sendmsg(struct md_cluster_info
*cinfo
, struct cluster_msg
*cmsg
)
578 int slot
= cinfo
->slot_number
- 1;
580 cmsg
->slot
= cpu_to_le32(slot
);
581 /*get EX on Message*/
582 error
= dlm_lock_sync(cinfo
->message_lockres
, DLM_LOCK_EX
);
584 pr_err("md-cluster: failed to get EX on MESSAGE (%d)\n", error
);
588 memcpy(cinfo
->message_lockres
->lksb
.sb_lvbptr
, (void *)cmsg
,
589 sizeof(struct cluster_msg
));
590 /*down-convert EX to CW on Message*/
591 error
= dlm_lock_sync(cinfo
->message_lockres
, DLM_LOCK_CW
);
593 pr_err("md-cluster: failed to convert EX to CW on MESSAGE(%d)\n",
598 /*up-convert CR to EX on Ack*/
599 error
= dlm_lock_sync(cinfo
->ack_lockres
, DLM_LOCK_EX
);
601 pr_err("md-cluster: failed to convert CR to EX on ACK(%d)\n",
606 /*down-convert EX to CR on Ack*/
607 error
= dlm_lock_sync(cinfo
->ack_lockres
, DLM_LOCK_CR
);
609 pr_err("md-cluster: failed to convert EX to CR on ACK(%d)\n",
615 error
= dlm_unlock_sync(cinfo
->message_lockres
);
616 if (unlikely(error
!= 0)) {
617 pr_err("md-cluster: failed convert to NL on MESSAGE(%d)\n",
619 /* in case the message can't be released due to some reason */
/* Send @cmsg under the cluster TOKEN lock (serializes all senders). */
static int sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg)
{
	int ret;

	lock_comm(cinfo);
	ret = __sendmsg(cinfo, cmsg);
	unlock_comm(cinfo);
	return ret;
}
636 static int gather_all_resync_info(struct mddev
*mddev
, int total_slots
)
638 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
640 struct dlm_lock_resource
*bm_lockres
;
641 struct suspend_info
*s
;
646 for (i
= 0; i
< total_slots
; i
++) {
647 memset(str
, '\0', 64);
648 snprintf(str
, 64, "bitmap%04d", i
);
649 bm_lockres
= lockres_init(mddev
, str
, NULL
, 1);
652 if (i
== (cinfo
->slot_number
- 1))
655 bm_lockres
->flags
|= DLM_LKF_NOQUEUE
;
656 ret
= dlm_lock_sync(bm_lockres
, DLM_LOCK_PW
);
657 if (ret
== -EAGAIN
) {
658 memset(bm_lockres
->lksb
.sb_lvbptr
, '\0', LVB_SIZE
);
659 s
= read_resync_info(mddev
, bm_lockres
);
661 pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n",
663 (unsigned long long) s
->lo
,
664 (unsigned long long) s
->hi
, i
);
665 spin_lock_irq(&cinfo
->suspend_lock
);
667 list_add(&s
->list
, &cinfo
->suspend_list
);
668 spin_unlock_irq(&cinfo
->suspend_lock
);
671 lockres_free(bm_lockres
);
675 lockres_free(bm_lockres
);
679 /* Read the disk bitmap sb and check if it needs recovery */
680 ret
= bitmap_copy_from_slot(mddev
, i
, &lo
, &hi
, false);
682 pr_warn("md-cluster: Could not gather bitmaps from slot %d", i
);
683 lockres_free(bm_lockres
);
686 if ((hi
> 0) && (lo
< mddev
->recovery_cp
)) {
687 set_bit(MD_RECOVERY_NEEDED
, &mddev
->recovery
);
688 mddev
->recovery_cp
= lo
;
689 md_check_recovery(mddev
);
692 dlm_unlock_sync(bm_lockres
);
693 lockres_free(bm_lockres
);
699 static int join(struct mddev
*mddev
, int nodes
)
701 struct md_cluster_info
*cinfo
;
705 cinfo
= kzalloc(sizeof(struct md_cluster_info
), GFP_KERNEL
);
709 INIT_LIST_HEAD(&cinfo
->suspend_list
);
710 spin_lock_init(&cinfo
->suspend_lock
);
711 init_completion(&cinfo
->completion
);
712 set_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER
, &cinfo
->state
);
714 mddev
->cluster_info
= cinfo
;
717 sprintf(str
, "%pU", mddev
->uuid
);
718 ret
= dlm_new_lockspace(str
, mddev
->bitmap_info
.cluster_name
,
719 DLM_LSFL_FS
, LVB_SIZE
,
720 &md_ls_ops
, mddev
, &ops_rv
, &cinfo
->lockspace
);
723 wait_for_completion(&cinfo
->completion
);
724 if (nodes
< cinfo
->slot_number
) {
725 pr_err("md-cluster: Slot allotted(%d) is greater than available slots(%d).",
726 cinfo
->slot_number
, nodes
);
730 /* Initiate the communication resources */
732 cinfo
->recv_thread
= md_register_thread(recv_daemon
, mddev
, "cluster_recv");
733 if (!cinfo
->recv_thread
) {
734 pr_err("md-cluster: cannot allocate memory for recv_thread!\n");
737 cinfo
->message_lockres
= lockres_init(mddev
, "message", NULL
, 1);
738 if (!cinfo
->message_lockres
)
740 cinfo
->token_lockres
= lockres_init(mddev
, "token", NULL
, 0);
741 if (!cinfo
->token_lockres
)
743 cinfo
->ack_lockres
= lockres_init(mddev
, "ack", ack_bast
, 0);
744 if (!cinfo
->ack_lockres
)
746 cinfo
->no_new_dev_lockres
= lockres_init(mddev
, "no-new-dev", NULL
, 0);
747 if (!cinfo
->no_new_dev_lockres
)
750 /* get sync CR lock on ACK. */
751 if (dlm_lock_sync(cinfo
->ack_lockres
, DLM_LOCK_CR
))
752 pr_err("md-cluster: failed to get a sync CR lock on ACK!(%d)\n",
754 /* get sync CR lock on no-new-dev. */
755 if (dlm_lock_sync(cinfo
->no_new_dev_lockres
, DLM_LOCK_CR
))
756 pr_err("md-cluster: failed to get a sync CR lock on no-new-dev!(%d)\n", ret
);
759 pr_info("md-cluster: Joined cluster %s slot %d\n", str
, cinfo
->slot_number
);
760 snprintf(str
, 64, "bitmap%04d", cinfo
->slot_number
- 1);
761 cinfo
->bitmap_lockres
= lockres_init(mddev
, str
, NULL
, 1);
762 if (!cinfo
->bitmap_lockres
)
764 if (dlm_lock_sync(cinfo
->bitmap_lockres
, DLM_LOCK_PW
)) {
765 pr_err("Failed to get bitmap lock\n");
770 cinfo
->resync_lockres
= lockres_init(mddev
, "resync", NULL
, 0);
771 if (!cinfo
->resync_lockres
)
774 ret
= gather_all_resync_info(mddev
, nodes
);
780 lockres_free(cinfo
->message_lockres
);
781 lockres_free(cinfo
->token_lockres
);
782 lockres_free(cinfo
->ack_lockres
);
783 lockres_free(cinfo
->no_new_dev_lockres
);
784 lockres_free(cinfo
->resync_lockres
);
785 lockres_free(cinfo
->bitmap_lockres
);
786 if (cinfo
->lockspace
)
787 dlm_release_lockspace(cinfo
->lockspace
, 2);
788 mddev
->cluster_info
= NULL
;
793 static void resync_bitmap(struct mddev
*mddev
)
795 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
796 struct cluster_msg cmsg
= {0};
799 cmsg
.type
= cpu_to_le32(BITMAP_NEEDS_SYNC
);
800 err
= sendmsg(cinfo
, &cmsg
);
802 pr_err("%s:%d: failed to send BITMAP_NEEDS_SYNC message (%d)\n",
803 __func__
, __LINE__
, err
);
806 static int leave(struct mddev
*mddev
)
808 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
813 /* BITMAP_NEEDS_SYNC message should be sent when node
814 * is leaving the cluster with dirty bitmap, also we
815 * can only deliver it when dlm connection is available */
816 if (cinfo
->slot_number
> 0 && mddev
->recovery_cp
!= MaxSector
)
817 resync_bitmap(mddev
);
819 md_unregister_thread(&cinfo
->recovery_thread
);
820 md_unregister_thread(&cinfo
->recv_thread
);
821 lockres_free(cinfo
->message_lockres
);
822 lockres_free(cinfo
->token_lockres
);
823 lockres_free(cinfo
->ack_lockres
);
824 lockres_free(cinfo
->no_new_dev_lockres
);
825 lockres_free(cinfo
->bitmap_lockres
);
826 dlm_release_lockspace(cinfo
->lockspace
, 2);
830 /* slot_number(): Returns the MD slot number to use
831 * DLM starts the slot numbers from 1, wheras cluster-md
832 * wants the number to be from zero, so we deduct one
834 static int slot_number(struct mddev
*mddev
)
836 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
838 return cinfo
->slot_number
- 1;
841 static int metadata_update_start(struct mddev
*mddev
)
843 return lock_comm(mddev
->cluster_info
);
846 static int metadata_update_finish(struct mddev
*mddev
)
848 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
849 struct cluster_msg cmsg
;
850 struct md_rdev
*rdev
;
854 memset(&cmsg
, 0, sizeof(cmsg
));
855 cmsg
.type
= cpu_to_le32(METADATA_UPDATED
);
856 /* Pick up a good active device number to send.
858 rdev_for_each(rdev
, mddev
)
859 if (rdev
->raid_disk
> -1 && !test_bit(Faulty
, &rdev
->flags
)) {
860 raid_slot
= rdev
->desc_nr
;
863 if (raid_slot
>= 0) {
864 cmsg
.raid_slot
= cpu_to_le32(raid_slot
);
865 ret
= __sendmsg(cinfo
, &cmsg
);
867 pr_warn("md-cluster: No good device id found to send\n");
872 static void metadata_update_cancel(struct mddev
*mddev
)
874 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
878 static int resync_start(struct mddev
*mddev
)
880 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
881 cinfo
->resync_lockres
->flags
|= DLM_LKF_NOQUEUE
;
882 return dlm_lock_sync(cinfo
->resync_lockres
, DLM_LOCK_EX
);
885 static int resync_info_update(struct mddev
*mddev
, sector_t lo
, sector_t hi
)
887 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
888 struct resync_info ri
;
889 struct cluster_msg cmsg
= {0};
891 /* do not send zero again, if we have sent before */
893 memcpy(&ri
, cinfo
->bitmap_lockres
->lksb
.sb_lvbptr
, sizeof(struct resync_info
));
894 if (le64_to_cpu(ri
.hi
) == 0)
898 add_resync_info(cinfo
->bitmap_lockres
, lo
, hi
);
899 /* Re-acquire the lock to refresh LVB */
900 dlm_lock_sync(cinfo
->bitmap_lockres
, DLM_LOCK_PW
);
901 cmsg
.type
= cpu_to_le32(RESYNCING
);
902 cmsg
.low
= cpu_to_le64(lo
);
903 cmsg
.high
= cpu_to_le64(hi
);
905 return sendmsg(cinfo
, &cmsg
);
908 static int resync_finish(struct mddev
*mddev
)
910 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
911 cinfo
->resync_lockres
->flags
&= ~DLM_LKF_NOQUEUE
;
912 dlm_unlock_sync(cinfo
->resync_lockres
);
913 return resync_info_update(mddev
, 0, 0);
916 static int area_resyncing(struct mddev
*mddev
, int direction
,
917 sector_t lo
, sector_t hi
)
919 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
921 struct suspend_info
*s
;
923 if ((direction
== READ
) &&
924 test_bit(MD_CLUSTER_SUSPEND_READ_BALANCING
, &cinfo
->state
))
927 spin_lock_irq(&cinfo
->suspend_lock
);
928 if (list_empty(&cinfo
->suspend_list
))
930 list_for_each_entry(s
, &cinfo
->suspend_list
, list
)
931 if (hi
> s
->lo
&& lo
< s
->hi
) {
936 spin_unlock_irq(&cinfo
->suspend_lock
);
940 /* add_new_disk() - initiates a disk add
941 * However, if this fails before writing md_update_sb(),
942 * add_new_disk_cancel() must be called to release token lock
944 static int add_new_disk(struct mddev
*mddev
, struct md_rdev
*rdev
)
946 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
947 struct cluster_msg cmsg
;
949 struct mdp_superblock_1
*sb
= page_address(rdev
->sb_page
);
950 char *uuid
= sb
->device_uuid
;
952 memset(&cmsg
, 0, sizeof(cmsg
));
953 cmsg
.type
= cpu_to_le32(NEWDISK
);
954 memcpy(cmsg
.uuid
, uuid
, 16);
955 cmsg
.raid_slot
= cpu_to_le32(rdev
->desc_nr
);
957 ret
= __sendmsg(cinfo
, &cmsg
);
960 cinfo
->no_new_dev_lockres
->flags
|= DLM_LKF_NOQUEUE
;
961 ret
= dlm_lock_sync(cinfo
->no_new_dev_lockres
, DLM_LOCK_EX
);
962 cinfo
->no_new_dev_lockres
->flags
&= ~DLM_LKF_NOQUEUE
;
963 /* Some node does not "see" the device */
969 dlm_lock_sync(cinfo
->no_new_dev_lockres
, DLM_LOCK_CR
);
973 static void add_new_disk_cancel(struct mddev
*mddev
)
975 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
979 static int new_disk_ack(struct mddev
*mddev
, bool ack
)
981 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
983 if (!test_bit(MD_CLUSTER_WAITING_FOR_NEWDISK
, &cinfo
->state
)) {
984 pr_warn("md-cluster(%s): Spurious cluster confirmation\n", mdname(mddev
));
989 dlm_unlock_sync(cinfo
->no_new_dev_lockres
);
990 complete(&cinfo
->newdisk_completion
);
994 static int remove_disk(struct mddev
*mddev
, struct md_rdev
*rdev
)
996 struct cluster_msg cmsg
= {0};
997 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
998 cmsg
.type
= cpu_to_le32(REMOVE
);
999 cmsg
.raid_slot
= cpu_to_le32(rdev
->desc_nr
);
1000 return __sendmsg(cinfo
, &cmsg
);
1003 static int gather_bitmaps(struct md_rdev
*rdev
)
1007 struct cluster_msg cmsg
= {0};
1008 struct mddev
*mddev
= rdev
->mddev
;
1009 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
1011 cmsg
.type
= cpu_to_le32(RE_ADD
);
1012 cmsg
.raid_slot
= cpu_to_le32(rdev
->desc_nr
);
1013 err
= sendmsg(cinfo
, &cmsg
);
1017 for (sn
= 0; sn
< mddev
->bitmap_info
.nodes
; sn
++) {
1018 if (sn
== (cinfo
->slot_number
- 1))
1020 err
= bitmap_copy_from_slot(mddev
, sn
, &lo
, &hi
, false);
1022 pr_warn("md-cluster: Could not gather bitmaps from slot %d", sn
);
1025 if ((hi
> 0) && (lo
< mddev
->recovery_cp
))
1026 mddev
->recovery_cp
= lo
;
1032 static struct md_cluster_operations cluster_ops
= {
1035 .slot_number
= slot_number
,
1036 .resync_start
= resync_start
,
1037 .resync_finish
= resync_finish
,
1038 .resync_info_update
= resync_info_update
,
1039 .metadata_update_start
= metadata_update_start
,
1040 .metadata_update_finish
= metadata_update_finish
,
1041 .metadata_update_cancel
= metadata_update_cancel
,
1042 .area_resyncing
= area_resyncing
,
1043 .add_new_disk
= add_new_disk
,
1044 .add_new_disk_cancel
= add_new_disk_cancel
,
1045 .new_disk_ack
= new_disk_ack
,
1046 .remove_disk
= remove_disk
,
1047 .gather_bitmaps
= gather_bitmaps
,
1050 static int __init
cluster_init(void)
1052 pr_warn("md-cluster: EXPERIMENTAL. Use with caution\n");
1053 pr_info("Registering Cluster MD functions\n");
1054 register_md_cluster_operations(&cluster_ops
, THIS_MODULE
);
/* Module exit: unregister the cluster operations. */
static void cluster_exit(void)
{
	unregister_md_cluster_operations();
}
1063 module_init(cluster_init
);
1064 module_exit(cluster_exit
);
1065 MODULE_AUTHOR("SUSE");
1066 MODULE_LICENSE("GPL");
1067 MODULE_DESCRIPTION("Clustering support for MD");