Merge branch 'master'
[deliverable/linux.git] / fs / dlm / lockspace.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5 ** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved.
6 **
7 ** This copyrighted material is made available to anyone wishing to use,
8 ** modify, copy, or redistribute it subject to the terms and conditions
9 ** of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "ast.h"
19 #include "dir.h"
20 #include "lowcomms.h"
21 #include "config.h"
22 #include "memory.h"
23 #include "lock.h"
24
#ifdef CONFIG_DLM_DEBUG
/* implemented in debug_fs.c when DLM debugging is enabled */
int dlm_create_debug_file(struct dlm_ls *ls);
void dlm_delete_debug_file(struct dlm_ls *ls);
#else
/* debugfs support compiled out: no-op stubs */
static inline int dlm_create_debug_file(struct dlm_ls *ls) { return 0; }
static inline void dlm_delete_debug_file(struct dlm_ls *ls) { }
#endif
32
/* number of active lockspaces; read/written under ls_lock */
static int ls_count;
/* serializes lockspace creation/release and shared-thread start/stop */
static struct mutex ls_lock;
/* list of all lockspaces; guarded by lslist_lock */
static struct list_head lslist;
static spinlock_t lslist_lock;
/* the dlm_scand kthread, set by dlm_scand_start() */
static struct task_struct * scand_task;
38
39
40 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
41 {
42 ssize_t ret = len;
43 int n = simple_strtol(buf, NULL, 0);
44
45 switch (n) {
46 case 0:
47 dlm_ls_stop(ls);
48 break;
49 case 1:
50 dlm_ls_start(ls);
51 break;
52 default:
53 ret = -EINVAL;
54 }
55 return ret;
56 }
57
58 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
59 {
60 ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
61 set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
62 wake_up(&ls->ls_uevent_wait);
63 return len;
64 }
65
/* sysfs "id" read handler: prints the lockspace global id. */
static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
{
	return sprintf(buf, "%u\n", ls->ls_global_id);
}
70
71 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
72 {
73 ls->ls_global_id = simple_strtoul(buf, NULL, 0);
74 return len;
75 }
76
/* A lockspace sysfs attribute: the generic show/store dispatchers below
   route kobject attribute accesses to these per-attribute callbacks. */
struct dlm_attr {
	struct attribute attr;
	ssize_t (*show)(struct dlm_ls *, char *);
	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};
82
/* "control": write-only; stop/start the lockspace (dlm_control_store) */
static struct dlm_attr dlm_attr_control = {
	.attr = {.name = "control", .mode = S_IWUSR},
	.store = dlm_control_store
};

/* "event_done": write-only; userspace acks a uevent (dlm_event_store) */
static struct dlm_attr dlm_attr_event = {
	.attr = {.name = "event_done", .mode = S_IWUSR},
	.store = dlm_event_store
};

/* "id": read/write; the lockspace global id */
static struct dlm_attr dlm_attr_id = {
	.attr = {.name = "id", .mode = S_IRUGO | S_IWUSR},
	.show = dlm_id_show,
	.store = dlm_id_store
};

/* default attributes attached to every lockspace kobject */
static struct attribute *dlm_attrs[] = {
	&dlm_attr_control.attr,
	&dlm_attr_event.attr,
	&dlm_attr_id.attr,
	NULL,
};
105
106 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
107 char *buf)
108 {
109 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
110 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
111 return a->show ? a->show(ls, buf) : 0;
112 }
113
114 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
115 const char *buf, size_t len)
116 {
117 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
118 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
119 return a->store ? a->store(ls, buf, len) : len;
120 }
121
/* sysfs operations for lockspace kobjects: route through the generic
   dispatchers above */
static struct sysfs_ops dlm_attr_ops = {
	.show = dlm_attr_show,
	.store = dlm_attr_store,
};

/* kobject type for lockspaces: default attrs + dispatch ops */
static struct kobj_type dlm_ktype = {
	.default_attrs = dlm_attrs,
	.sysfs_ops = &dlm_attr_ops,
};

/* the "dlm" kset under the kernel subsystem; each lockspace kobject is
   registered as a child of this */
static struct kset dlm_kset = {
	.subsys = &kernel_subsys,
	.kobj = {.name = "dlm",},
	.ktype = &dlm_ktype,
};
137
138 static int kobject_setup(struct dlm_ls *ls)
139 {
140 char lsname[DLM_LOCKSPACE_LEN];
141 int error;
142
143 memset(lsname, 0, DLM_LOCKSPACE_LEN);
144 snprintf(lsname, DLM_LOCKSPACE_LEN, "%s", ls->ls_name);
145
146 error = kobject_set_name(&ls->ls_kobj, "%s", lsname);
147 if (error)
148 return error;
149
150 ls->ls_kobj.kset = &dlm_kset;
151 ls->ls_kobj.ktype = &dlm_ktype;
152 return 0;
153 }
154
155 static int do_uevent(struct dlm_ls *ls, int in)
156 {
157 int error;
158
159 if (in)
160 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
161 else
162 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
163
164 error = wait_event_interruptible(ls->ls_uevent_wait,
165 test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
166 if (error)
167 goto out;
168
169 error = ls->ls_uevent_result;
170 out:
171 return error;
172 }
173
174
175 int dlm_lockspace_init(void)
176 {
177 int error;
178
179 ls_count = 0;
180 mutex_init(&ls_lock);
181 INIT_LIST_HEAD(&lslist);
182 spin_lock_init(&lslist_lock);
183
184 error = kset_register(&dlm_kset);
185 if (error)
186 printk("dlm_lockspace_init: cannot register kset %d\n", error);
187 return error;
188 }
189
/* Module exit: unregister the "dlm" kset set up in dlm_lockspace_init(). */
void dlm_lockspace_exit(void)
{
	kset_unregister(&dlm_kset);
}
194
/* Scanner kthread: every dlm_config.scan_secs seconds walk all
   lockspaces and toss unused rsbs (dlm_scan_rsbs).
   NOTE(review): lslist is traversed here without taking lslist_lock;
   this appears to rely on lockspace removal being coordinated with this
   thread elsewhere — confirm against remove_lockspace()/dlm_scand_stop(). */
static int dlm_scand(void *data)
{
	struct dlm_ls *ls;

	while (!kthread_should_stop()) {
		list_for_each_entry(ls, &lslist, ls_list)
			dlm_scan_rsbs(ls);
		schedule_timeout_interruptible(dlm_config.scan_secs * HZ);
	}
	return 0;
}
206
207 static int dlm_scand_start(void)
208 {
209 struct task_struct *p;
210 int error = 0;
211
212 p = kthread_run(dlm_scand, NULL, "dlm_scand");
213 if (IS_ERR(p))
214 error = PTR_ERR(p);
215 else
216 scand_task = p;
217 return error;
218 }
219
/* Stop the scanner kthread; assumes dlm_scand_start() succeeded. */
static void dlm_scand_stop(void)
{
	kthread_stop(scand_task);
}
224
225 static struct dlm_ls *dlm_find_lockspace_name(char *name, int namelen)
226 {
227 struct dlm_ls *ls;
228
229 spin_lock(&lslist_lock);
230
231 list_for_each_entry(ls, &lslist, ls_list) {
232 if (ls->ls_namelen == namelen &&
233 memcmp(ls->ls_name, name, namelen) == 0)
234 goto out;
235 }
236 ls = NULL;
237 out:
238 spin_unlock(&lslist_lock);
239 return ls;
240 }
241
242 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
243 {
244 struct dlm_ls *ls;
245
246 spin_lock(&lslist_lock);
247
248 list_for_each_entry(ls, &lslist, ls_list) {
249 if (ls->ls_global_id == id) {
250 ls->ls_count++;
251 goto out;
252 }
253 }
254 ls = NULL;
255 out:
256 spin_unlock(&lslist_lock);
257 return ls;
258 }
259
/* "Find" a lockspace by its local handle — the handle handed to callers
   is the ls pointer itself — and take a reference on it.  The handle is
   not validated here; the caller must own a live lockspace. */
struct dlm_ls *dlm_find_lockspace_local(void *id)
{
	struct dlm_ls *ls = id;

	spin_lock(&lslist_lock);
	ls->ls_count++;
	spin_unlock(&lslist_lock);
	return ls;
}
269
/* Drop a reference taken by one of the dlm_find_lockspace_*() helpers;
   remove_lockspace() waits for ls_count to reach zero. */
void dlm_put_lockspace(struct dlm_ls *ls)
{
	spin_lock(&lslist_lock);
	ls->ls_count--;
	spin_unlock(&lslist_lock);
}
276
277 static void remove_lockspace(struct dlm_ls *ls)
278 {
279 for (;;) {
280 spin_lock(&lslist_lock);
281 if (ls->ls_count == 0) {
282 list_del(&ls->ls_list);
283 spin_unlock(&lslist_lock);
284 return;
285 }
286 spin_unlock(&lslist_lock);
287 ssleep(1);
288 }
289 }
290
/* Start the three kthreads shared by all lockspaces: astd (AST
   delivery), scand (rsb scanning) and lowcomms (messaging).  On any
   failure the threads already started are stopped again. */
static int threads_start(void)
{
	int error;

	/* Thread which process lock requests for all lockspace's */
	error = dlm_astd_start();
	if (error) {
		log_print("cannot start dlm_astd thread %d", error);
		return error;
	}

	error = dlm_scand_start();
	if (error) {
		log_print("cannot start dlm_scand thread %d", error);
		dlm_astd_stop();
		return error;
	}

	/* Thread for sending/receiving messages for all lockspace's */
	error = dlm_lowcomms_start();
	if (error) {
		log_print("cannot start dlm lowcomms %d", error);
		dlm_scand_stop();
		dlm_astd_stop();
		return error;
	}

	return 0;
}
324
/* Stop the shared kthreads started by threads_start(); called when the
   last lockspace goes away. */
static void threads_stop(void)
{
	dlm_scand_stop();
	dlm_lowcomms_stop();
	dlm_astd_stop();
}
331
/*
 * Allocate and initialize a new lockspace: the rsb/lkb/dir hash tables,
 * membership and recovery state, the per-lockspace recoverd thread, the
 * sysfs kobject, and finally the "online" uevent to userspace.  On
 * success *lockspace receives the new ls.  Returns -EEXIST (with
 * *lockspace set) if a lockspace of this name already exists.
 * Caller (dlm_new_lockspace) holds ls_lock.
 */
static int new_lockspace(char *name, int namelen, void **lockspace,
			 uint32_t flags, int lvblen)
{
	struct dlm_ls *ls;
	int i, size, error = -ENOMEM;

	if (namelen > DLM_LOCKSPACE_LEN)
		return -EINVAL;

	/* lvb length must be a non-zero multiple of 8 */
	if (!lvblen || (lvblen % 8))
		return -EINVAL;

	if (!try_module_get(THIS_MODULE))
		return -EINVAL;

	ls = dlm_find_lockspace_name(name, namelen);
	if (ls) {
		*lockspace = ls;
		module_put(THIS_MODULE);
		return -EEXIST;
	}

	/* the name lives in the tail of the struct (allocation is padded
	   by namelen; kzalloc keeps it NUL-terminated) */
	ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_KERNEL);
	if (!ls)
		goto out;
	memcpy(ls->ls_name, name, namelen);
	ls->ls_namelen = namelen;
	ls->ls_exflags = flags;
	ls->ls_lvblen = lvblen;
	ls->ls_count = 0;
	ls->ls_flags = 0;

	/* resource (rsb) hash table */
	size = dlm_config.rsbtbl_size;
	ls->ls_rsbtbl_size = size;

	ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_KERNEL);
	if (!ls->ls_rsbtbl)
		goto out_lsfree;
	for (i = 0; i < size; i++) {
		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
		rwlock_init(&ls->ls_rsbtbl[i].lock);
	}

	/* lock (lkb) id table */
	size = dlm_config.lkbtbl_size;
	ls->ls_lkbtbl_size = size;

	ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_KERNEL);
	if (!ls->ls_lkbtbl)
		goto out_rsbfree;
	for (i = 0; i < size; i++) {
		INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
		rwlock_init(&ls->ls_lkbtbl[i].lock);
		ls->ls_lkbtbl[i].counter = 1;
	}

	/* resource directory table */
	size = dlm_config.dirtbl_size;
	ls->ls_dirtbl_size = size;

	ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_KERNEL);
	if (!ls->ls_dirtbl)
		goto out_lkbfree;
	for (i = 0; i < size; i++) {
		INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
		rwlock_init(&ls->ls_dirtbl[i].lock);
	}

	INIT_LIST_HEAD(&ls->ls_waiters);
	mutex_init(&ls->ls_waiters_mutex);

	/* membership state */
	INIT_LIST_HEAD(&ls->ls_nodes);
	INIT_LIST_HEAD(&ls->ls_nodes_gone);
	ls->ls_num_nodes = 0;
	ls->ls_low_nodeid = 0;
	ls->ls_total_weight = 0;
	ls->ls_node_array = NULL;

	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
	ls->ls_stub_rsb.res_ls = ls;

	ls->ls_debug_dentry = NULL;

	init_waitqueue_head(&ls->ls_uevent_wait);
	ls->ls_uevent_result = 0;

	/* recovery state */
	ls->ls_recoverd_task = NULL;
	mutex_init(&ls->ls_recoverd_active);
	spin_lock_init(&ls->ls_recover_lock);
	ls->ls_recover_status = 0;
	ls->ls_recover_seq = 0;
	ls->ls_recover_args = NULL;
	init_rwsem(&ls->ls_in_recovery);
	INIT_LIST_HEAD(&ls->ls_requestqueue);
	mutex_init(&ls->ls_requestqueue_mutex);

	ls->ls_recover_buf = kmalloc(dlm_config.buffer_size, GFP_KERNEL);
	if (!ls->ls_recover_buf)
		goto out_dirfree;

	INIT_LIST_HEAD(&ls->ls_recover_list);
	spin_lock_init(&ls->ls_recover_list_lock);
	ls->ls_recover_list_count = 0;
	init_waitqueue_head(&ls->ls_wait_general);
	INIT_LIST_HEAD(&ls->ls_root_list);
	init_rwsem(&ls->ls_root_sem);

	/* hold the lockspace "in recovery" until the first recovery
	   completes */
	down_write(&ls->ls_in_recovery);

	error = dlm_recoverd_start(ls);
	if (error) {
		log_error(ls, "can't start dlm_recoverd %d", error);
		goto out_rcomfree;
	}

	spin_lock(&lslist_lock);
	list_add(&ls->ls_list, &lslist);
	spin_unlock(&lslist_lock);

	dlm_create_debug_file(ls);

	error = kobject_setup(ls);
	if (error)
		goto out_del;

	error = kobject_register(&ls->ls_kobj);
	if (error)
		goto out_del;

	/* tell userspace we're online and wait for its ack */
	error = do_uevent(ls, 1);
	if (error)
		goto out_unreg;

	*lockspace = ls;
	return 0;

	/* error unwinding, in reverse order of setup */
 out_unreg:
	kobject_unregister(&ls->ls_kobj);
 out_del:
	dlm_delete_debug_file(ls);
	spin_lock(&lslist_lock);
	list_del(&ls->ls_list);
	spin_unlock(&lslist_lock);
	dlm_recoverd_stop(ls);
 out_rcomfree:
	kfree(ls->ls_recover_buf);
 out_dirfree:
	kfree(ls->ls_dirtbl);
 out_lkbfree:
	kfree(ls->ls_lkbtbl);
 out_rsbfree:
	kfree(ls->ls_rsbtbl);
 out_lsfree:
	kfree(ls);
 out:
	module_put(THIS_MODULE);
	return error;
}
489
490 int dlm_new_lockspace(char *name, int namelen, void **lockspace,
491 uint32_t flags, int lvblen)
492 {
493 int error = 0;
494
495 mutex_lock(&ls_lock);
496 if (!ls_count)
497 error = threads_start();
498 if (error)
499 goto out;
500
501 error = new_lockspace(name, namelen, lockspace, flags, lvblen);
502 if (!error)
503 ls_count++;
504 out:
505 mutex_unlock(&ls_lock);
506 return error;
507 }
508
509 /* Return 1 if the lockspace still has active remote locks,
510 * 2 if the lockspace still has active local locks.
511 */
512 static int lockspace_busy(struct dlm_ls *ls)
513 {
514 int i, lkb_found = 0;
515 struct dlm_lkb *lkb;
516
517 /* NOTE: We check the lockidtbl here rather than the resource table.
518 This is because there may be LKBs queued as ASTs that have been
519 unlinked from their RSBs and are pending deletion once the AST has
520 been delivered */
521
522 for (i = 0; i < ls->ls_lkbtbl_size; i++) {
523 read_lock(&ls->ls_lkbtbl[i].lock);
524 if (!list_empty(&ls->ls_lkbtbl[i].list)) {
525 lkb_found = 1;
526 list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
527 lkb_idtbl_list) {
528 if (!lkb->lkb_nodeid) {
529 read_unlock(&ls->ls_lkbtbl[i].lock);
530 return 2;
531 }
532 }
533 }
534 read_unlock(&ls->ls_lkbtbl[i].lock);
535 }
536 return lkb_found;
537 }
538
/* Tear down a lockspace: notify userspace (unless forced shutdown),
   stop recovery, drain references, free every lkb/rsb/direntry and the
   tables themselves, then stop the shared threads if this was the last
   lockspace.  force thresholds are documented at dlm_release_lockspace()
   below; returns -EBUSY if the lockspace is busier than force allows. */
static int release_lockspace(struct dlm_ls *ls, int force)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *rsb;
	struct list_head *head;
	int i;
	int busy = lockspace_busy(ls);

	/* busy is 0, 1 (remote locks) or 2 (local locks) */
	if (busy > force)
		return -EBUSY;

	/* skip the offline uevent on forced shutdown (force == 3) */
	if (force < 3)
		do_uevent(ls, 0);

	dlm_recoverd_stop(ls);

	/* waits for all ls_count references to drain, then unlinks
	   the lockspace from lslist */
	remove_lockspace(ls);

	dlm_delete_debug_file(ls);

	/* park astd so it cannot touch lkbs while we free them */
	dlm_astd_suspend();

	kfree(ls->ls_recover_buf);

	/*
	 * Free direntry structs.
	 */

	dlm_dir_clear(ls);
	kfree(ls->ls_dirtbl);

	/*
	 * Free all lkb's on lkbtbl[] lists.
	 */

	for (i = 0; i < ls->ls_lkbtbl_size; i++) {
		head = &ls->ls_lkbtbl[i].list;
		while (!list_empty(head)) {
			lkb = list_entry(head->next, struct dlm_lkb,
					 lkb_idtbl_list);

			list_del(&lkb->lkb_idtbl_list);

			dlm_del_ast(lkb);

			/* master-copy lkbs own their lvb buffer */
			if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
				free_lvb(lkb->lkb_lvbptr);

			free_lkb(lkb);
		}
	}
	dlm_astd_resume();

	kfree(ls->ls_lkbtbl);

	/*
	 * Free all rsb's on rsbtbl[] lists
	 */

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		head = &ls->ls_rsbtbl[i].list;
		while (!list_empty(head)) {
			rsb = list_entry(head->next, struct dlm_rsb,
					 res_hashchain);

			list_del(&rsb->res_hashchain);
			free_rsb(rsb);
		}

		/* rsbs parked on the toss list are freed the same way */
		head = &ls->ls_rsbtbl[i].toss;
		while (!list_empty(head)) {
			rsb = list_entry(head->next, struct dlm_rsb,
					 res_hashchain);
			list_del(&rsb->res_hashchain);
			free_rsb(rsb);
		}
	}

	kfree(ls->ls_rsbtbl);

	/*
	 * Free structures on any other lists
	 */

	kfree(ls->ls_recover_args);
	dlm_clear_free_entries(ls);
	dlm_clear_members(ls);
	dlm_clear_members_gone(ls);
	kfree(ls->ls_node_array);
	kobject_unregister(&ls->ls_kobj);
	kfree(ls);

	/* stop the shared threads when the last lockspace goes away */
	mutex_lock(&ls_lock);
	ls_count--;
	if (!ls_count)
		threads_stop();
	mutex_unlock(&ls_lock);

	module_put(THIS_MODULE);
	return 0;
}
640
641 /*
642 * Called when a system has released all its locks and is not going to use the
643 * lockspace any longer. We free everything we're managing for this lockspace.
644 * Remaining nodes will go through the recovery process as if we'd died. The
645 * lockspace must continue to function as usual, participating in recoveries,
646 * until this returns.
647 *
648 * Force has 4 possible values:
 * 0 - don't destroy lockspace if it has any LKBs
650 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
651 * 2 - destroy lockspace regardless of LKBs
652 * 3 - destroy lockspace as part of a forced shutdown
653 */
654
655 int dlm_release_lockspace(void *lockspace, int force)
656 {
657 struct dlm_ls *ls;
658
659 ls = dlm_find_lockspace_local(lockspace);
660 if (!ls)
661 return -EINVAL;
662 dlm_put_lockspace(ls);
663 return release_lockspace(ls, force);
664 }
665
This page took 0.074266 seconds and 5 git commands to generate.