/*
 * Copyright (C) 2015, SUSE
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2, or (at your option)
 * any later version.
 */

#include <linux/module.h>
#include <linux/dlm.h>
#include <linux/sched.h>
#include "md-cluster.h"
21 struct dlm_lock_resource
{
24 char *name
; /* lock name. */
25 uint32_t flags
; /* flags to pass to dlm_lock() */
26 struct completion completion
; /* completion for synchronized locking */
27 void (*bast
)(void *arg
, int mode
); /* blocking AST function pointer*/
28 struct mddev
*mddev
; /* pointing back to mddev. */
35 struct list_head list
;
43 struct md_cluster_info
{
44 /* dlm lock space and resources for clustered raid. */
45 dlm_lockspace_t
*lockspace
;
47 struct completion completion
;
48 struct dlm_lock_resource
*sb_lock
;
49 struct mutex sb_mutex
;
50 struct dlm_lock_resource
*bitmap_lockres
;
51 struct list_head suspend_list
;
52 spinlock_t suspend_lock
;
53 struct md_thread
*recovery_thread
;
54 unsigned long recovery_map
;
55 /* communication loc resources */
56 struct dlm_lock_resource
*ack_lockres
;
57 struct dlm_lock_resource
*message_lockres
;
58 struct dlm_lock_resource
*token_lockres
;
59 struct md_thread
*recv_thread
;
74 static void sync_ast(void *arg
)
76 struct dlm_lock_resource
*res
;
78 res
= (struct dlm_lock_resource
*) arg
;
79 complete(&res
->completion
);
82 static int dlm_lock_sync(struct dlm_lock_resource
*res
, int mode
)
86 init_completion(&res
->completion
);
87 ret
= dlm_lock(res
->ls
, mode
, &res
->lksb
,
88 res
->flags
, res
->name
, strlen(res
->name
),
89 0, sync_ast
, res
, res
->bast
);
92 wait_for_completion(&res
->completion
);
93 return res
->lksb
.sb_status
;
96 static int dlm_unlock_sync(struct dlm_lock_resource
*res
)
98 return dlm_lock_sync(res
, DLM_LOCK_NL
);
101 static struct dlm_lock_resource
*lockres_init(struct mddev
*mddev
,
102 char *name
, void (*bastfn
)(void *arg
, int mode
), int with_lvb
)
104 struct dlm_lock_resource
*res
= NULL
;
106 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
108 res
= kzalloc(sizeof(struct dlm_lock_resource
), GFP_KERNEL
);
111 res
->ls
= cinfo
->lockspace
;
113 namelen
= strlen(name
);
114 res
->name
= kzalloc(namelen
+ 1, GFP_KERNEL
);
116 pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name
);
119 strlcpy(res
->name
, name
, namelen
+ 1);
121 res
->lksb
.sb_lvbptr
= kzalloc(LVB_SIZE
, GFP_KERNEL
);
122 if (!res
->lksb
.sb_lvbptr
) {
123 pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name
);
126 res
->flags
= DLM_LKF_VALBLK
;
132 res
->flags
|= DLM_LKF_EXPEDITE
;
134 ret
= dlm_lock_sync(res
, DLM_LOCK_NL
);
136 pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name
);
139 res
->flags
&= ~DLM_LKF_EXPEDITE
;
140 res
->flags
|= DLM_LKF_CONVERT
;
144 kfree(res
->lksb
.sb_lvbptr
);
150 static void lockres_free(struct dlm_lock_resource
*res
)
155 init_completion(&res
->completion
);
156 dlm_unlock(res
->ls
, res
->lksb
.sb_lkid
, 0, &res
->lksb
, res
);
157 wait_for_completion(&res
->completion
);
160 kfree(res
->lksb
.sb_lvbptr
);
/*
 * Render the 16 raw uuid bytes in @src into @dest as the canonical
 * 8-4-4-4-12 lowercase-hex form ("xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx").
 * @dest must hold at least 37 bytes (36 chars + NUL). Returns @dest.
 */
static char *pretty_uuid(char *dest, char *src)
{
	int len = 0;
	int i;

	for (i = 0; i < 16; i++) {
		/* a dash precedes bytes 4, 6, 8 and 10 */
		if (i == 4 || i == 6 || i == 8 || i == 10)
			len += sprintf(dest + len, "-");
		len += sprintf(dest + len, "%02x", (unsigned char)src[i]);
	}
	return dest;
}
176 static void add_resync_info(struct mddev
*mddev
, struct dlm_lock_resource
*lockres
,
177 sector_t lo
, sector_t hi
)
179 struct resync_info
*ri
;
181 ri
= (struct resync_info
*)lockres
->lksb
.sb_lvbptr
;
182 ri
->lo
= cpu_to_le64(lo
);
183 ri
->hi
= cpu_to_le64(hi
);
186 static struct suspend_info
*read_resync_info(struct mddev
*mddev
, struct dlm_lock_resource
*lockres
)
188 struct resync_info ri
;
189 struct suspend_info
*s
= NULL
;
192 dlm_lock_sync(lockres
, DLM_LOCK_CR
);
193 memcpy(&ri
, lockres
->lksb
.sb_lvbptr
, sizeof(struct resync_info
));
194 hi
= le64_to_cpu(ri
.hi
);
196 s
= kzalloc(sizeof(struct suspend_info
), GFP_KERNEL
);
200 s
->lo
= le64_to_cpu(ri
.lo
);
202 dlm_unlock_sync(lockres
);
207 void recover_bitmaps(struct md_thread
*thread
)
209 struct mddev
*mddev
= thread
->mddev
;
210 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
211 struct dlm_lock_resource
*bm_lockres
;
214 struct suspend_info
*s
, *tmp
;
217 while (cinfo
->recovery_map
) {
218 slot
= fls64((u64
)cinfo
->recovery_map
) - 1;
220 /* Clear suspend_area associated with the bitmap */
221 spin_lock_irq(&cinfo
->suspend_lock
);
222 list_for_each_entry_safe(s
, tmp
, &cinfo
->suspend_list
, list
)
223 if (slot
== s
->slot
) {
227 spin_unlock_irq(&cinfo
->suspend_lock
);
229 snprintf(str
, 64, "bitmap%04d", slot
);
230 bm_lockres
= lockres_init(mddev
, str
, NULL
, 1);
232 pr_err("md-cluster: Cannot initialize bitmaps\n");
236 ret
= dlm_lock_sync(bm_lockres
, DLM_LOCK_PW
);
238 pr_err("md-cluster: Could not DLM lock %s: %d\n",
242 ret
= bitmap_copy_from_slot(mddev
, slot
, &lo
, &hi
);
244 pr_err("md-cluster: Could not copy data from bitmap %d\n", slot
);
248 /* TODO:Wait for current resync to get over */
249 set_bit(MD_RECOVERY_NEEDED
, &mddev
->recovery
);
250 if (lo
< mddev
->recovery_cp
)
251 mddev
->recovery_cp
= lo
;
252 md_check_recovery(mddev
);
255 dlm_unlock_sync(bm_lockres
);
257 clear_bit(slot
, &cinfo
->recovery_map
);
/* DLM recover_prep callback: nothing to prepare before recovery. */
static void recover_prep(void *arg)
{
}
265 static void recover_slot(void *arg
, struct dlm_slot
*slot
)
267 struct mddev
*mddev
= arg
;
268 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
270 pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n",
271 mddev
->bitmap_info
.cluster_name
,
272 slot
->nodeid
, slot
->slot
,
274 set_bit(slot
->slot
- 1, &cinfo
->recovery_map
);
275 if (!cinfo
->recovery_thread
) {
276 cinfo
->recovery_thread
= md_register_thread(recover_bitmaps
,
278 if (!cinfo
->recovery_thread
) {
279 pr_warn("md-cluster: Could not create recovery thread\n");
283 md_wakeup_thread(cinfo
->recovery_thread
);
286 static void recover_done(void *arg
, struct dlm_slot
*slots
,
287 int num_slots
, int our_slot
,
290 struct mddev
*mddev
= arg
;
291 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
293 cinfo
->slot_number
= our_slot
;
294 complete(&cinfo
->completion
);
297 static const struct dlm_lockspace_ops md_ls_ops
= {
298 .recover_prep
= recover_prep
,
299 .recover_slot
= recover_slot
,
300 .recover_done
= recover_done
,
304 * The BAST function for the ack lock resource
305 * This function wakes up the receive thread in
306 * order to receive and process the message.
308 static void ack_bast(void *arg
, int mode
)
310 struct dlm_lock_resource
*res
= (struct dlm_lock_resource
*)arg
;
311 struct md_cluster_info
*cinfo
= res
->mddev
->cluster_info
;
313 if (mode
== DLM_LOCK_EX
)
314 md_wakeup_thread(cinfo
->recv_thread
);
317 static void process_recvd_msg(struct mddev
*mddev
, struct cluster_msg
*msg
)
320 case METADATA_UPDATED
:
321 pr_info("%s: %d Received message: METADATA_UPDATE from %d\n",
322 __func__
, __LINE__
, msg
->slot
);
326 pr_info("%s: %d Received message: RESYNCING from %d\n",
327 __func__
, __LINE__
, msg
->slot
);
333 * thread for receiving message
335 static void recv_daemon(struct md_thread
*thread
)
337 struct md_cluster_info
*cinfo
= thread
->mddev
->cluster_info
;
338 struct dlm_lock_resource
*ack_lockres
= cinfo
->ack_lockres
;
339 struct dlm_lock_resource
*message_lockres
= cinfo
->message_lockres
;
340 struct cluster_msg msg
;
342 /*get CR on Message*/
343 if (dlm_lock_sync(message_lockres
, DLM_LOCK_CR
)) {
344 pr_err("md/raid1:failed to get CR on MESSAGE\n");
348 /* read lvb and wake up thread to process this message_lockres */
349 memcpy(&msg
, message_lockres
->lksb
.sb_lvbptr
, sizeof(struct cluster_msg
));
350 process_recvd_msg(thread
->mddev
, &msg
);
352 /*release CR on ack_lockres*/
353 dlm_unlock_sync(ack_lockres
);
354 /*up-convert to EX on message_lockres*/
355 dlm_lock_sync(message_lockres
, DLM_LOCK_EX
);
356 /*get CR on ack_lockres again*/
357 dlm_lock_sync(ack_lockres
, DLM_LOCK_CR
);
358 /*release CR on message_lockres*/
359 dlm_unlock_sync(message_lockres
);
363 * Takes the lock on the TOKEN lock resource so no other
364 * node can communicate while the operation is underway.
366 static int lock_comm(struct md_cluster_info
*cinfo
)
370 error
= dlm_lock_sync(cinfo
->token_lockres
, DLM_LOCK_EX
);
372 pr_err("md-cluster(%s:%d): failed to get EX on TOKEN (%d)\n",
373 __func__
, __LINE__
, error
);
377 static void unlock_comm(struct md_cluster_info
*cinfo
)
379 dlm_unlock_sync(cinfo
->token_lockres
);
383 * This function performs the actual sending of the message. This function is
384 * usually called after performing the encompassing operation
386 * 1. Grabs the message lockresource in EX mode
387 * 2. Copies the message to the message LVB
388 * 3. Downconverts message lockresource to CR
389 * 4. Upconverts ack lock resource from CR to EX. This forces the BAST on other nodes
390 * and the other nodes read the message. The thread will wait here until all other
391 * nodes have released ack lock resource.
392 * 5. Downconvert ack lockresource to CR
394 static int __sendmsg(struct md_cluster_info
*cinfo
, struct cluster_msg
*cmsg
)
397 int slot
= cinfo
->slot_number
- 1;
399 cmsg
->slot
= cpu_to_le32(slot
);
400 /*get EX on Message*/
401 error
= dlm_lock_sync(cinfo
->message_lockres
, DLM_LOCK_EX
);
403 pr_err("md-cluster: failed to get EX on MESSAGE (%d)\n", error
);
407 memcpy(cinfo
->message_lockres
->lksb
.sb_lvbptr
, (void *)cmsg
,
408 sizeof(struct cluster_msg
));
409 /*down-convert EX to CR on Message*/
410 error
= dlm_lock_sync(cinfo
->message_lockres
, DLM_LOCK_CR
);
412 pr_err("md-cluster: failed to convert EX to CR on MESSAGE(%d)\n",
417 /*up-convert CR to EX on Ack*/
418 error
= dlm_lock_sync(cinfo
->ack_lockres
, DLM_LOCK_EX
);
420 pr_err("md-cluster: failed to convert CR to EX on ACK(%d)\n",
425 /*down-convert EX to CR on Ack*/
426 error
= dlm_lock_sync(cinfo
->ack_lockres
, DLM_LOCK_CR
);
428 pr_err("md-cluster: failed to convert EX to CR on ACK(%d)\n",
434 dlm_unlock_sync(cinfo
->message_lockres
);
/* Send a message under the cluster-wide TOKEN lock. */
static int sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg)
{
	int ret;

	lock_comm(cinfo);
	ret = __sendmsg(cinfo, cmsg);
	unlock_comm(cinfo);
	return ret;
}
449 static int gather_all_resync_info(struct mddev
*mddev
, int total_slots
)
451 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
453 struct dlm_lock_resource
*bm_lockres
;
454 struct suspend_info
*s
;
458 for (i
= 0; i
< total_slots
; i
++) {
459 memset(str
, '\0', 64);
460 snprintf(str
, 64, "bitmap%04d", i
);
461 bm_lockres
= lockres_init(mddev
, str
, NULL
, 1);
464 if (i
== (cinfo
->slot_number
- 1))
467 bm_lockres
->flags
|= DLM_LKF_NOQUEUE
;
468 ret
= dlm_lock_sync(bm_lockres
, DLM_LOCK_PW
);
469 if (ret
== -EAGAIN
) {
470 memset(bm_lockres
->lksb
.sb_lvbptr
, '\0', LVB_SIZE
);
471 s
= read_resync_info(mddev
, bm_lockres
);
473 pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n",
475 (unsigned long long) s
->lo
,
476 (unsigned long long) s
->hi
, i
);
477 spin_lock_irq(&cinfo
->suspend_lock
);
479 list_add(&s
->list
, &cinfo
->suspend_list
);
480 spin_unlock_irq(&cinfo
->suspend_lock
);
483 lockres_free(bm_lockres
);
488 /* TODO: Read the disk bitmap sb and check if it needs recovery */
489 dlm_unlock_sync(bm_lockres
);
490 lockres_free(bm_lockres
);
496 static int join(struct mddev
*mddev
, int nodes
)
498 struct md_cluster_info
*cinfo
;
502 if (!try_module_get(THIS_MODULE
))
505 cinfo
= kzalloc(sizeof(struct md_cluster_info
), GFP_KERNEL
);
509 init_completion(&cinfo
->completion
);
511 mutex_init(&cinfo
->sb_mutex
);
512 mddev
->cluster_info
= cinfo
;
515 pretty_uuid(str
, mddev
->uuid
);
516 ret
= dlm_new_lockspace(str
, mddev
->bitmap_info
.cluster_name
,
517 DLM_LSFL_FS
, LVB_SIZE
,
518 &md_ls_ops
, mddev
, &ops_rv
, &cinfo
->lockspace
);
521 wait_for_completion(&cinfo
->completion
);
522 if (nodes
<= cinfo
->slot_number
) {
523 pr_err("md-cluster: Slot allotted(%d) greater than available slots(%d)", cinfo
->slot_number
- 1,
528 cinfo
->sb_lock
= lockres_init(mddev
, "cmd-super",
530 if (!cinfo
->sb_lock
) {
534 /* Initiate the communication resources */
536 cinfo
->recv_thread
= md_register_thread(recv_daemon
, mddev
, "cluster_recv");
537 if (!cinfo
->recv_thread
) {
538 pr_err("md-cluster: cannot allocate memory for recv_thread!\n");
541 cinfo
->message_lockres
= lockres_init(mddev
, "message", NULL
, 1);
542 if (!cinfo
->message_lockres
)
544 cinfo
->token_lockres
= lockres_init(mddev
, "token", NULL
, 0);
545 if (!cinfo
->token_lockres
)
547 cinfo
->ack_lockres
= lockres_init(mddev
, "ack", ack_bast
, 0);
548 if (!cinfo
->ack_lockres
)
550 /* get sync CR lock on ACK. */
551 if (dlm_lock_sync(cinfo
->ack_lockres
, DLM_LOCK_CR
))
552 pr_err("md-cluster: failed to get a sync CR lock on ACK!(%d)\n",
555 pr_info("md-cluster: Joined cluster %s slot %d\n", str
, cinfo
->slot_number
);
556 snprintf(str
, 64, "bitmap%04d", cinfo
->slot_number
- 1);
557 cinfo
->bitmap_lockres
= lockres_init(mddev
, str
, NULL
, 1);
558 if (!cinfo
->bitmap_lockres
)
560 if (dlm_lock_sync(cinfo
->bitmap_lockres
, DLM_LOCK_PW
)) {
561 pr_err("Failed to get bitmap lock\n");
566 INIT_LIST_HEAD(&cinfo
->suspend_list
);
567 spin_lock_init(&cinfo
->suspend_lock
);
569 ret
= gather_all_resync_info(mddev
, nodes
);
575 lockres_free(cinfo
->message_lockres
);
576 lockres_free(cinfo
->token_lockres
);
577 lockres_free(cinfo
->ack_lockres
);
578 lockres_free(cinfo
->bitmap_lockres
);
579 lockres_free(cinfo
->sb_lock
);
580 if (cinfo
->lockspace
)
581 dlm_release_lockspace(cinfo
->lockspace
, 2);
582 mddev
->cluster_info
= NULL
;
584 module_put(THIS_MODULE
);
588 static int leave(struct mddev
*mddev
)
590 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
594 md_unregister_thread(&cinfo
->recovery_thread
);
595 md_unregister_thread(&cinfo
->recv_thread
);
596 lockres_free(cinfo
->message_lockres
);
597 lockres_free(cinfo
->token_lockres
);
598 lockres_free(cinfo
->ack_lockres
);
599 lockres_free(cinfo
->sb_lock
);
600 lockres_free(cinfo
->bitmap_lockres
);
601 dlm_release_lockspace(cinfo
->lockspace
, 2);
605 /* slot_number(): Returns the MD slot number to use
606 * DLM starts the slot numbers from 1, wheras cluster-md
607 * wants the number to be from zero, so we deduct one
609 static int slot_number(struct mddev
*mddev
)
611 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
613 return cinfo
->slot_number
- 1;
616 static void resync_info_update(struct mddev
*mddev
, sector_t lo
, sector_t hi
)
618 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
620 add_resync_info(mddev
, cinfo
->bitmap_lockres
, lo
, hi
);
621 /* Re-acquire the lock to refresh LVB */
622 dlm_lock_sync(cinfo
->bitmap_lockres
, DLM_LOCK_PW
);
625 static int metadata_update_start(struct mddev
*mddev
)
627 return lock_comm(mddev
->cluster_info
);
630 static int metadata_update_finish(struct mddev
*mddev
)
632 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
633 struct cluster_msg cmsg
;
636 memset(&cmsg
, 0, sizeof(cmsg
));
637 cmsg
.type
= cpu_to_le32(METADATA_UPDATED
);
638 ret
= __sendmsg(cinfo
, &cmsg
);
643 static int metadata_update_cancel(struct mddev
*mddev
)
645 struct md_cluster_info
*cinfo
= mddev
->cluster_info
;
647 return dlm_unlock_sync(cinfo
->token_lockres
);
650 static struct md_cluster_operations cluster_ops
= {
653 .slot_number
= slot_number
,
654 .resync_info_update
= resync_info_update
,
655 .metadata_update_start
= metadata_update_start
,
656 .metadata_update_finish
= metadata_update_finish
,
657 .metadata_update_cancel
= metadata_update_cancel
,
660 static int __init
cluster_init(void)
662 pr_warn("md-cluster: EXPERIMENTAL. Use with caution\n");
663 pr_info("Registering Cluster MD functions\n");
664 register_md_cluster_operations(&cluster_ops
, THIS_MODULE
);
/* Module exit: unhook the cluster operations from md core. */
static void cluster_exit(void)
{
	unregister_md_cluster_operations();
}
module_init(cluster_init);
module_exit(cluster_exit);
MODULE_LICENSE("GPL");
MODULE_DESCRIPTION("Clustering support for MD");