Merge branch 'master' of git://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux-2.6
[deliverable/linux.git] / fs / ocfs2 / dlm / dlmdomain.c
1 /* -*- mode: c; c-basic-offset: 8; -*-
2 * vim: noexpandtab sw=8 ts=8 sts=0:
3 *
4 * dlmdomain.c
5 *
6 * defines domain join / leave apis
7 *
8 * Copyright (C) 2004 Oracle. All rights reserved.
9 *
10 * This program is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU General Public
12 * License as published by the Free Software Foundation; either
13 * version 2 of the License, or (at your option) any later version.
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * General Public License for more details.
19 *
20 * You should have received a copy of the GNU General Public
21 * License along with this program; if not, write to the
22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23 * Boston, MA 02111-1307, USA.
24 *
25 */
26
27 #include <linux/module.h>
28 #include <linux/types.h>
29 #include <linux/slab.h>
30 #include <linux/highmem.h>
31 #include <linux/init.h>
32 #include <linux/spinlock.h>
33 #include <linux/delay.h>
34 #include <linux/err.h>
35 #include <linux/debugfs.h>
36
37 #include "cluster/heartbeat.h"
38 #include "cluster/nodemanager.h"
39 #include "cluster/tcp.h"
40
41 #include "dlmapi.h"
42 #include "dlmcommon.h"
43 #include "dlmdomain.h"
44 #include "dlmdebug.h"
45
46 #include "dlmver.h"
47
48 #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_DOMAIN)
49 #include "cluster/masklog.h"
50
51 /*
52 * ocfs2 node maps are array of long int, which limits to send them freely
53 * across the wire due to endianness issues. To workaround this, we convert
54 * long ints to byte arrays. Following 3 routines are helper functions to
55 * set/test/copy bits within those array of bytes
56 */
57 static inline void byte_set_bit(u8 nr, u8 map[])
58 {
59 map[nr >> 3] |= (1UL << (nr & 7));
60 }
61
62 static inline int byte_test_bit(u8 nr, u8 map[])
63 {
64 return ((1UL << (nr & 7)) & (map[nr >> 3])) != 0;
65 }
66
67 static inline void byte_copymap(u8 dmap[], unsigned long smap[],
68 unsigned int sz)
69 {
70 unsigned int nn;
71
72 if (!sz)
73 return;
74
75 memset(dmap, 0, ((sz + 7) >> 3));
76 for (nn = 0 ; nn < sz; nn++)
77 if (test_bit(nn, smap))
78 byte_set_bit(nn, dmap);
79 }
80
/*
 * Free the first 'pages' pages referenced by 'vec' (highest index first)
 * and then the vector itself.
 */
static void dlm_free_pagevec(void **vec, int pages)
{
	while (pages) {
		pages--;
		free_page((unsigned long)vec[pages]);
	}

	kfree(vec);
}
87
88 static void **dlm_alloc_pagevec(int pages)
89 {
90 void **vec = kmalloc(pages * sizeof(void *), GFP_KERNEL);
91 int i;
92
93 if (!vec)
94 return NULL;
95
96 for (i = 0; i < pages; i++)
97 if (!(vec[i] = (void *)__get_free_page(GFP_KERNEL)))
98 goto out_free;
99
100 mlog(0, "Allocated DLM hash pagevec; %d pages (%lu expected), %lu buckets per page\n",
101 pages, (unsigned long)DLM_HASH_PAGES,
102 (unsigned long)DLM_BUCKETS_PER_PAGE);
103 return vec;
104 out_free:
105 dlm_free_pagevec(vec, i);
106 return NULL;
107 }
108
/*
 * Spinlock ordering.  Whenever more than one of these locks is needed,
 * they must be taken in the order listed here:
 *
 *	dlm_domain_lock
 *	struct dlm_ctxt->spinlock
 *	struct dlm_lock_resource->spinlock
 *	struct dlm_ctxt->master_lock
 *	struct dlm_ctxt->ast_lock
 *	dlm_master_list_entry->spinlock
 *	dlm_lock->spinlock
 */

/* Held around lookups and updates of dlm_domains and dlm_ctxt->dlm_state. */
DEFINE_SPINLOCK(dlm_domain_lock);
/* List of dlm contexts, linked through dlm_ctxt->list. */
LIST_HEAD(dlm_domains);
/* Woken when a context is taken off dlm_domains or is freed. */
static DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events);
125
126 /*
127 * The supported protocol version for DLM communication. Running domains
128 * will have a negotiated version with the same major number and a minor
129 * number equal or smaller. The dlm_ctxt->dlm_locking_proto field should
130 * be used to determine what a running domain is actually using.
131 *
132 * New in version 1.1:
133 * - Message DLM_QUERY_REGION added to support global heartbeat
134 * - Message DLM_QUERY_NODEINFO added to allow online node removes
135 */
136 static const struct dlm_protocol_version dlm_protocol = {
137 .pv_major = 1,
138 .pv_minor = 1,
139 };
140
141 #define DLM_DOMAIN_BACKOFF_MS 200
142
143 static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
144 void **ret_data);
145 static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
146 void **ret_data);
147 static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
148 void **ret_data);
149 static int dlm_query_region_handler(struct o2net_msg *msg, u32 len,
150 void *data, void **ret_data);
151 static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
152 void **ret_data);
153 static int dlm_protocol_compare(struct dlm_protocol_version *existing,
154 struct dlm_protocol_version *request);
155
156 static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm);
157
158 void __dlm_unhash_lockres(struct dlm_lock_resource *lockres)
159 {
160 if (!hlist_unhashed(&lockres->hash_node)) {
161 hlist_del_init(&lockres->hash_node);
162 dlm_lockres_put(lockres);
163 }
164 }
165
166 void __dlm_insert_lockres(struct dlm_ctxt *dlm,
167 struct dlm_lock_resource *res)
168 {
169 struct hlist_head *bucket;
170 struct qstr *q;
171
172 assert_spin_locked(&dlm->spinlock);
173
174 q = &res->lockname;
175 bucket = dlm_lockres_hash(dlm, q->hash);
176
177 /* get a reference for our hashtable */
178 dlm_lockres_get(res);
179
180 hlist_add_head(&res->hash_node, bucket);
181 }
182
183 struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm,
184 const char *name,
185 unsigned int len,
186 unsigned int hash)
187 {
188 struct hlist_head *bucket;
189 struct hlist_node *list;
190
191 mlog_entry("%.*s\n", len, name);
192
193 assert_spin_locked(&dlm->spinlock);
194
195 bucket = dlm_lockres_hash(dlm, hash);
196
197 hlist_for_each(list, bucket) {
198 struct dlm_lock_resource *res = hlist_entry(list,
199 struct dlm_lock_resource, hash_node);
200 if (res->lockname.name[0] != name[0])
201 continue;
202 if (unlikely(res->lockname.len != len))
203 continue;
204 if (memcmp(res->lockname.name + 1, name + 1, len - 1))
205 continue;
206 dlm_lockres_get(res);
207 return res;
208 }
209 return NULL;
210 }
211
212 /* intended to be called by functions which do not care about lock
213 * resources which are being purged (most net _handler functions).
214 * this will return NULL for any lock resource which is found but
215 * currently in the process of dropping its mastery reference.
216 * use __dlm_lookup_lockres_full when you need the lock resource
217 * regardless (e.g. dlm_get_lock_resource) */
218 struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
219 const char *name,
220 unsigned int len,
221 unsigned int hash)
222 {
223 struct dlm_lock_resource *res = NULL;
224
225 mlog_entry("%.*s\n", len, name);
226
227 assert_spin_locked(&dlm->spinlock);
228
229 res = __dlm_lookup_lockres_full(dlm, name, len, hash);
230 if (res) {
231 spin_lock(&res->spinlock);
232 if (res->state & DLM_LOCK_RES_DROPPING_REF) {
233 spin_unlock(&res->spinlock);
234 dlm_lockres_put(res);
235 return NULL;
236 }
237 spin_unlock(&res->spinlock);
238 }
239
240 return res;
241 }
242
243 struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm,
244 const char *name,
245 unsigned int len)
246 {
247 struct dlm_lock_resource *res;
248 unsigned int hash = dlm_lockid_hash(name, len);
249
250 spin_lock(&dlm->spinlock);
251 res = __dlm_lookup_lockres(dlm, name, len, hash);
252 spin_unlock(&dlm->spinlock);
253 return res;
254 }
255
256 static struct dlm_ctxt * __dlm_lookup_domain_full(const char *domain, int len)
257 {
258 struct dlm_ctxt *tmp = NULL;
259 struct list_head *iter;
260
261 assert_spin_locked(&dlm_domain_lock);
262
263 /* tmp->name here is always NULL terminated,
264 * but domain may not be! */
265 list_for_each(iter, &dlm_domains) {
266 tmp = list_entry (iter, struct dlm_ctxt, list);
267 if (strlen(tmp->name) == len &&
268 memcmp(tmp->name, domain, len)==0)
269 break;
270 tmp = NULL;
271 }
272
273 return tmp;
274 }
275
276 /* For null terminated domain strings ONLY */
277 static struct dlm_ctxt * __dlm_lookup_domain(const char *domain)
278 {
279 assert_spin_locked(&dlm_domain_lock);
280
281 return __dlm_lookup_domain_full(domain, strlen(domain));
282 }
283
284
285 /* returns true on one of two conditions:
286 * 1) the domain does not exist
287 * 2) the domain exists and it's state is "joined" */
288 static int dlm_wait_on_domain_helper(const char *domain)
289 {
290 int ret = 0;
291 struct dlm_ctxt *tmp = NULL;
292
293 spin_lock(&dlm_domain_lock);
294
295 tmp = __dlm_lookup_domain(domain);
296 if (!tmp)
297 ret = 1;
298 else if (tmp->dlm_state == DLM_CTXT_JOINED)
299 ret = 1;
300
301 spin_unlock(&dlm_domain_lock);
302 return ret;
303 }
304
305 static void dlm_free_ctxt_mem(struct dlm_ctxt *dlm)
306 {
307 dlm_destroy_debugfs_subroot(dlm);
308
309 if (dlm->lockres_hash)
310 dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES);
311
312 if (dlm->master_hash)
313 dlm_free_pagevec((void **)dlm->master_hash, DLM_HASH_PAGES);
314
315 if (dlm->name)
316 kfree(dlm->name);
317
318 kfree(dlm);
319 }
320
321 /* A little strange - this function will be called while holding
322 * dlm_domain_lock and is expected to be holding it on the way out. We
323 * will however drop and reacquire it multiple times */
324 static void dlm_ctxt_release(struct kref *kref)
325 {
326 struct dlm_ctxt *dlm;
327
328 dlm = container_of(kref, struct dlm_ctxt, dlm_refs);
329
330 BUG_ON(dlm->num_joins);
331 BUG_ON(dlm->dlm_state == DLM_CTXT_JOINED);
332
333 /* we may still be in the list if we hit an error during join. */
334 list_del_init(&dlm->list);
335
336 spin_unlock(&dlm_domain_lock);
337
338 mlog(0, "freeing memory from domain %s\n", dlm->name);
339
340 wake_up(&dlm_domain_events);
341
342 dlm_free_ctxt_mem(dlm);
343
344 spin_lock(&dlm_domain_lock);
345 }
346
347 void dlm_put(struct dlm_ctxt *dlm)
348 {
349 spin_lock(&dlm_domain_lock);
350 kref_put(&dlm->dlm_refs, dlm_ctxt_release);
351 spin_unlock(&dlm_domain_lock);
352 }
353
354 static void __dlm_get(struct dlm_ctxt *dlm)
355 {
356 kref_get(&dlm->dlm_refs);
357 }
358
359 /* given a questionable reference to a dlm object, gets a reference if
360 * it can find it in the list, otherwise returns NULL in which case
361 * you shouldn't trust your pointer. */
362 struct dlm_ctxt *dlm_grab(struct dlm_ctxt *dlm)
363 {
364 struct list_head *iter;
365 struct dlm_ctxt *target = NULL;
366
367 spin_lock(&dlm_domain_lock);
368
369 list_for_each(iter, &dlm_domains) {
370 target = list_entry (iter, struct dlm_ctxt, list);
371
372 if (target == dlm) {
373 __dlm_get(target);
374 break;
375 }
376
377 target = NULL;
378 }
379
380 spin_unlock(&dlm_domain_lock);
381
382 return target;
383 }
384
385 int dlm_domain_fully_joined(struct dlm_ctxt *dlm)
386 {
387 int ret;
388
389 spin_lock(&dlm_domain_lock);
390 ret = (dlm->dlm_state == DLM_CTXT_JOINED) ||
391 (dlm->dlm_state == DLM_CTXT_IN_SHUTDOWN);
392 spin_unlock(&dlm_domain_lock);
393
394 return ret;
395 }
396
397 static void dlm_destroy_dlm_worker(struct dlm_ctxt *dlm)
398 {
399 if (dlm->dlm_worker) {
400 flush_workqueue(dlm->dlm_worker);
401 destroy_workqueue(dlm->dlm_worker);
402 dlm->dlm_worker = NULL;
403 }
404 }
405
406 static void dlm_complete_dlm_shutdown(struct dlm_ctxt *dlm)
407 {
408 dlm_unregister_domain_handlers(dlm);
409 dlm_debug_shutdown(dlm);
410 dlm_complete_thread(dlm);
411 dlm_complete_recovery_thread(dlm);
412 dlm_destroy_dlm_worker(dlm);
413
414 /* We've left the domain. Now we can take ourselves out of the
415 * list and allow the kref stuff to help us free the
416 * memory. */
417 spin_lock(&dlm_domain_lock);
418 list_del_init(&dlm->list);
419 spin_unlock(&dlm_domain_lock);
420
421 /* Wake up anyone waiting for us to remove this domain */
422 wake_up(&dlm_domain_events);
423 }
424
425 static int dlm_migrate_all_locks(struct dlm_ctxt *dlm)
426 {
427 int i, num, n, ret = 0;
428 struct dlm_lock_resource *res;
429 struct hlist_node *iter;
430 struct hlist_head *bucket;
431 int dropped;
432
433 mlog(0, "Migrating locks from domain %s\n", dlm->name);
434
435 num = 0;
436 spin_lock(&dlm->spinlock);
437 for (i = 0; i < DLM_HASH_BUCKETS; i++) {
438 redo_bucket:
439 n = 0;
440 bucket = dlm_lockres_hash(dlm, i);
441 iter = bucket->first;
442 while (iter) {
443 n++;
444 res = hlist_entry(iter, struct dlm_lock_resource,
445 hash_node);
446 dlm_lockres_get(res);
447 /* migrate, if necessary. this will drop the dlm
448 * spinlock and retake it if it does migration. */
449 dropped = dlm_empty_lockres(dlm, res);
450
451 spin_lock(&res->spinlock);
452 __dlm_lockres_calc_usage(dlm, res);
453 iter = res->hash_node.next;
454 spin_unlock(&res->spinlock);
455
456 dlm_lockres_put(res);
457
458 if (dropped)
459 goto redo_bucket;
460 }
461 cond_resched_lock(&dlm->spinlock);
462 num += n;
463 mlog(0, "%s: touched %d lockreses in bucket %d "
464 "(tot=%d)\n", dlm->name, n, i, num);
465 }
466 spin_unlock(&dlm->spinlock);
467 wake_up(&dlm->dlm_thread_wq);
468
469 /* let the dlm thread take care of purging, keep scanning until
470 * nothing remains in the hash */
471 if (num) {
472 mlog(0, "%s: %d lock resources in hash last pass\n",
473 dlm->name, num);
474 ret = -EAGAIN;
475 }
476 mlog(0, "DONE Migrating locks from domain %s\n", dlm->name);
477 return ret;
478 }
479
480 static int dlm_no_joining_node(struct dlm_ctxt *dlm)
481 {
482 int ret;
483
484 spin_lock(&dlm->spinlock);
485 ret = dlm->joining_node == DLM_LOCK_RES_OWNER_UNKNOWN;
486 spin_unlock(&dlm->spinlock);
487
488 return ret;
489 }
490
491 static void dlm_mark_domain_leaving(struct dlm_ctxt *dlm)
492 {
493 /* Yikes, a double spinlock! I need domain_lock for the dlm
494 * state and the dlm spinlock for join state... Sorry! */
495 again:
496 spin_lock(&dlm_domain_lock);
497 spin_lock(&dlm->spinlock);
498
499 if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
500 mlog(0, "Node %d is joining, we wait on it.\n",
501 dlm->joining_node);
502 spin_unlock(&dlm->spinlock);
503 spin_unlock(&dlm_domain_lock);
504
505 wait_event(dlm->dlm_join_events, dlm_no_joining_node(dlm));
506 goto again;
507 }
508
509 dlm->dlm_state = DLM_CTXT_LEAVING;
510 spin_unlock(&dlm->spinlock);
511 spin_unlock(&dlm_domain_lock);
512 }
513
514 static void __dlm_print_nodes(struct dlm_ctxt *dlm)
515 {
516 int node = -1;
517
518 assert_spin_locked(&dlm->spinlock);
519
520 printk(KERN_NOTICE "o2dlm: Nodes in domain %s: ", dlm->name);
521
522 while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES,
523 node + 1)) < O2NM_MAX_NODES) {
524 printk("%d ", node);
525 }
526 printk("\n");
527 }
528
529 static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
530 void **ret_data)
531 {
532 struct dlm_ctxt *dlm = data;
533 unsigned int node;
534 struct dlm_exit_domain *exit_msg = (struct dlm_exit_domain *) msg->buf;
535
536 mlog_entry("%p %u %p", msg, len, data);
537
538 if (!dlm_grab(dlm))
539 return 0;
540
541 node = exit_msg->node_idx;
542
543 printk(KERN_NOTICE "o2dlm: Node %u leaves domain %s\n", node, dlm->name);
544
545 spin_lock(&dlm->spinlock);
546 clear_bit(node, dlm->domain_map);
547 __dlm_print_nodes(dlm);
548
549 /* notify anything attached to the heartbeat events */
550 dlm_hb_event_notify_attached(dlm, node, 0);
551
552 spin_unlock(&dlm->spinlock);
553
554 dlm_put(dlm);
555
556 return 0;
557 }
558
559 static int dlm_send_one_domain_exit(struct dlm_ctxt *dlm,
560 unsigned int node)
561 {
562 int status;
563 struct dlm_exit_domain leave_msg;
564
565 mlog(0, "Asking node %u if we can leave the domain %s me = %u\n",
566 node, dlm->name, dlm->node_num);
567
568 memset(&leave_msg, 0, sizeof(leave_msg));
569 leave_msg.node_idx = dlm->node_num;
570
571 status = o2net_send_message(DLM_EXIT_DOMAIN_MSG, dlm->key,
572 &leave_msg, sizeof(leave_msg), node,
573 NULL);
574 if (status < 0)
575 mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to "
576 "node %u\n", status, DLM_EXIT_DOMAIN_MSG, dlm->key, node);
577 mlog(0, "status return %d from o2net_send_message\n", status);
578
579 return status;
580 }
581
582
583 static void dlm_leave_domain(struct dlm_ctxt *dlm)
584 {
585 int node, clear_node, status;
586
587 /* At this point we've migrated away all our locks and won't
588 * accept mastership of new ones. The dlm is responsible for
589 * almost nothing now. We make sure not to confuse any joining
590 * nodes and then commence shutdown procedure. */
591
592 spin_lock(&dlm->spinlock);
593 /* Clear ourselves from the domain map */
594 clear_bit(dlm->node_num, dlm->domain_map);
595 while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES,
596 0)) < O2NM_MAX_NODES) {
597 /* Drop the dlm spinlock. This is safe wrt the domain_map.
598 * -nodes cannot be added now as the
599 * query_join_handlers knows to respond with OK_NO_MAP
600 * -we catch the right network errors if a node is
601 * removed from the map while we're sending him the
602 * exit message. */
603 spin_unlock(&dlm->spinlock);
604
605 clear_node = 1;
606
607 status = dlm_send_one_domain_exit(dlm, node);
608 if (status < 0 &&
609 status != -ENOPROTOOPT &&
610 status != -ENOTCONN) {
611 mlog(ML_NOTICE, "Error %d sending domain exit message "
612 "to node %d\n", status, node);
613
614 /* Not sure what to do here but lets sleep for
615 * a bit in case this was a transient
616 * error... */
617 msleep(DLM_DOMAIN_BACKOFF_MS);
618 clear_node = 0;
619 }
620
621 spin_lock(&dlm->spinlock);
622 /* If we're not clearing the node bit then we intend
623 * to loop back around to try again. */
624 if (clear_node)
625 clear_bit(node, dlm->domain_map);
626 }
627 spin_unlock(&dlm->spinlock);
628 }
629
630 int dlm_joined(struct dlm_ctxt *dlm)
631 {
632 int ret = 0;
633
634 spin_lock(&dlm_domain_lock);
635
636 if (dlm->dlm_state == DLM_CTXT_JOINED)
637 ret = 1;
638
639 spin_unlock(&dlm_domain_lock);
640
641 return ret;
642 }
643
644 int dlm_shutting_down(struct dlm_ctxt *dlm)
645 {
646 int ret = 0;
647
648 spin_lock(&dlm_domain_lock);
649
650 if (dlm->dlm_state == DLM_CTXT_IN_SHUTDOWN)
651 ret = 1;
652
653 spin_unlock(&dlm_domain_lock);
654
655 return ret;
656 }
657
658 void dlm_unregister_domain(struct dlm_ctxt *dlm)
659 {
660 int leave = 0;
661 struct dlm_lock_resource *res;
662
663 spin_lock(&dlm_domain_lock);
664 BUG_ON(dlm->dlm_state != DLM_CTXT_JOINED);
665 BUG_ON(!dlm->num_joins);
666
667 dlm->num_joins--;
668 if (!dlm->num_joins) {
669 /* We mark it "in shutdown" now so new register
670 * requests wait until we've completely left the
671 * domain. Don't use DLM_CTXT_LEAVING yet as we still
672 * want new domain joins to communicate with us at
673 * least until we've completed migration of our
674 * resources. */
675 dlm->dlm_state = DLM_CTXT_IN_SHUTDOWN;
676 leave = 1;
677 }
678 spin_unlock(&dlm_domain_lock);
679
680 if (leave) {
681 mlog(0, "shutting down domain %s\n", dlm->name);
682
683 /* We changed dlm state, notify the thread */
684 dlm_kick_thread(dlm, NULL);
685
686 while (dlm_migrate_all_locks(dlm)) {
687 /* Give dlm_thread time to purge the lockres' */
688 msleep(500);
689 mlog(0, "%s: more migration to do\n", dlm->name);
690 }
691
692 /* This list should be empty. If not, print remaining lockres */
693 if (!list_empty(&dlm->tracking_list)) {
694 mlog(ML_ERROR, "Following lockres' are still on the "
695 "tracking list:\n");
696 list_for_each_entry(res, &dlm->tracking_list, tracking)
697 dlm_print_one_lock_resource(res);
698 }
699
700 dlm_mark_domain_leaving(dlm);
701 dlm_leave_domain(dlm);
702 dlm_force_free_mles(dlm);
703 dlm_complete_dlm_shutdown(dlm);
704 }
705 dlm_put(dlm);
706 }
707 EXPORT_SYMBOL_GPL(dlm_unregister_domain);
708
709 static int dlm_query_join_proto_check(char *proto_type, int node,
710 struct dlm_protocol_version *ours,
711 struct dlm_protocol_version *request)
712 {
713 int rc;
714 struct dlm_protocol_version proto = *request;
715
716 if (!dlm_protocol_compare(ours, &proto)) {
717 mlog(0,
718 "node %u wanted to join with %s locking protocol "
719 "%u.%u, we respond with %u.%u\n",
720 node, proto_type,
721 request->pv_major,
722 request->pv_minor,
723 proto.pv_major, proto.pv_minor);
724 request->pv_minor = proto.pv_minor;
725 rc = 0;
726 } else {
727 mlog(ML_NOTICE,
728 "Node %u wanted to join with %s locking "
729 "protocol %u.%u, but we have %u.%u, disallowing\n",
730 node, proto_type,
731 request->pv_major,
732 request->pv_minor,
733 ours->pv_major,
734 ours->pv_minor);
735 rc = 1;
736 }
737
738 return rc;
739 }
740
741 /*
742 * struct dlm_query_join_packet is made up of four one-byte fields. They
743 * are effectively in big-endian order already. However, little-endian
744 * machines swap them before putting the packet on the wire (because
745 * query_join's response is a status, and that status is treated as a u32
746 * on the wire). Thus, a big-endian and little-endian machines will treat
747 * this structure differently.
748 *
749 * The solution is to have little-endian machines swap the structure when
750 * converting from the structure to the u32 representation. This will
751 * result in the structure having the correct format on the wire no matter
752 * the host endian format.
753 */
754 static void dlm_query_join_packet_to_wire(struct dlm_query_join_packet *packet,
755 u32 *wire)
756 {
757 union dlm_query_join_response response;
758
759 response.packet = *packet;
760 *wire = cpu_to_be32(response.intval);
761 }
762
763 static void dlm_query_join_wire_to_packet(u32 wire,
764 struct dlm_query_join_packet *packet)
765 {
766 union dlm_query_join_response response;
767
768 response.intval = cpu_to_be32(wire);
769 *packet = response.packet;
770 }
771
772 static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
773 void **ret_data)
774 {
775 struct dlm_query_join_request *query;
776 struct dlm_query_join_packet packet = {
777 .code = JOIN_DISALLOW,
778 };
779 struct dlm_ctxt *dlm = NULL;
780 u32 response;
781 u8 nodenum;
782
783 query = (struct dlm_query_join_request *) msg->buf;
784
785 mlog(0, "node %u wants to join domain %s\n", query->node_idx,
786 query->domain);
787
788 /*
789 * If heartbeat doesn't consider the node live, tell it
790 * to back off and try again. This gives heartbeat a chance
791 * to catch up.
792 */
793 if (!o2hb_check_node_heartbeating(query->node_idx)) {
794 mlog(0, "node %u is not in our live map yet\n",
795 query->node_idx);
796
797 packet.code = JOIN_DISALLOW;
798 goto respond;
799 }
800
801 packet.code = JOIN_OK_NO_MAP;
802
803 spin_lock(&dlm_domain_lock);
804 dlm = __dlm_lookup_domain_full(query->domain, query->name_len);
805 if (!dlm)
806 goto unlock_respond;
807
808 /*
809 * There is a small window where the joining node may not see the
810 * node(s) that just left but still part of the cluster. DISALLOW
811 * join request if joining node has different node map.
812 */
813 nodenum=0;
814 while (nodenum < O2NM_MAX_NODES) {
815 if (test_bit(nodenum, dlm->domain_map)) {
816 if (!byte_test_bit(nodenum, query->node_map)) {
817 mlog(0, "disallow join as node %u does not "
818 "have node %u in its nodemap\n",
819 query->node_idx, nodenum);
820 packet.code = JOIN_DISALLOW;
821 goto unlock_respond;
822 }
823 }
824 nodenum++;
825 }
826
827 /* Once the dlm ctxt is marked as leaving then we don't want
828 * to be put in someone's domain map.
829 * Also, explicitly disallow joining at certain troublesome
830 * times (ie. during recovery). */
831 if (dlm && dlm->dlm_state != DLM_CTXT_LEAVING) {
832 int bit = query->node_idx;
833 spin_lock(&dlm->spinlock);
834
835 if (dlm->dlm_state == DLM_CTXT_NEW &&
836 dlm->joining_node == DLM_LOCK_RES_OWNER_UNKNOWN) {
837 /*If this is a brand new context and we
838 * haven't started our join process yet, then
839 * the other node won the race. */
840 packet.code = JOIN_OK_NO_MAP;
841 } else if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
842 /* Disallow parallel joins. */
843 packet.code = JOIN_DISALLOW;
844 } else if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) {
845 mlog(0, "node %u trying to join, but recovery "
846 "is ongoing.\n", bit);
847 packet.code = JOIN_DISALLOW;
848 } else if (test_bit(bit, dlm->recovery_map)) {
849 mlog(0, "node %u trying to join, but it "
850 "still needs recovery.\n", bit);
851 packet.code = JOIN_DISALLOW;
852 } else if (test_bit(bit, dlm->domain_map)) {
853 mlog(0, "node %u trying to join, but it "
854 "is still in the domain! needs recovery?\n",
855 bit);
856 packet.code = JOIN_DISALLOW;
857 } else {
858 /* Alright we're fully a part of this domain
859 * so we keep some state as to who's joining
860 * and indicate to him that needs to be fixed
861 * up. */
862
863 /* Make sure we speak compatible locking protocols. */
864 if (dlm_query_join_proto_check("DLM", bit,
865 &dlm->dlm_locking_proto,
866 &query->dlm_proto)) {
867 packet.code = JOIN_PROTOCOL_MISMATCH;
868 } else if (dlm_query_join_proto_check("fs", bit,
869 &dlm->fs_locking_proto,
870 &query->fs_proto)) {
871 packet.code = JOIN_PROTOCOL_MISMATCH;
872 } else {
873 packet.dlm_minor = query->dlm_proto.pv_minor;
874 packet.fs_minor = query->fs_proto.pv_minor;
875 packet.code = JOIN_OK;
876 __dlm_set_joining_node(dlm, query->node_idx);
877 }
878 }
879
880 spin_unlock(&dlm->spinlock);
881 }
882 unlock_respond:
883 spin_unlock(&dlm_domain_lock);
884
885 respond:
886 mlog(0, "We respond with %u\n", packet.code);
887
888 dlm_query_join_packet_to_wire(&packet, &response);
889 return response;
890 }
891
892 static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
893 void **ret_data)
894 {
895 struct dlm_assert_joined *assert;
896 struct dlm_ctxt *dlm = NULL;
897
898 assert = (struct dlm_assert_joined *) msg->buf;
899
900 mlog(0, "node %u asserts join on domain %s\n", assert->node_idx,
901 assert->domain);
902
903 spin_lock(&dlm_domain_lock);
904 dlm = __dlm_lookup_domain_full(assert->domain, assert->name_len);
905 /* XXX should we consider no dlm ctxt an error? */
906 if (dlm) {
907 spin_lock(&dlm->spinlock);
908
909 /* Alright, this node has officially joined our
910 * domain. Set him in the map and clean up our
911 * leftover join state. */
912 BUG_ON(dlm->joining_node != assert->node_idx);
913 set_bit(assert->node_idx, dlm->domain_map);
914 __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
915
916 printk(KERN_NOTICE "o2dlm: Node %u joins domain %s\n",
917 assert->node_idx, dlm->name);
918 __dlm_print_nodes(dlm);
919
920 /* notify anything attached to the heartbeat events */
921 dlm_hb_event_notify_attached(dlm, assert->node_idx, 1);
922
923 spin_unlock(&dlm->spinlock);
924 }
925 spin_unlock(&dlm_domain_lock);
926
927 return 0;
928 }
929
930 static int dlm_match_regions(struct dlm_ctxt *dlm,
931 struct dlm_query_region *qr)
932 {
933 char *local = NULL, *remote = qr->qr_regions;
934 char *l, *r;
935 int localnr, i, j, foundit;
936 int status = 0;
937
938 if (!o2hb_global_heartbeat_active()) {
939 if (qr->qr_numregions) {
940 mlog(ML_ERROR, "Domain %s: Joining node %d has global "
941 "heartbeat enabled but local node %d does not\n",
942 qr->qr_domain, qr->qr_node, dlm->node_num);
943 status = -EINVAL;
944 }
945 goto bail;
946 }
947
948 if (o2hb_global_heartbeat_active() && !qr->qr_numregions) {
949 mlog(ML_ERROR, "Domain %s: Local node %d has global "
950 "heartbeat enabled but joining node %d does not\n",
951 qr->qr_domain, dlm->node_num, qr->qr_node);
952 status = -EINVAL;
953 goto bail;
954 }
955
956 r = remote;
957 for (i = 0; i < qr->qr_numregions; ++i) {
958 mlog(0, "Region %.*s\n", O2HB_MAX_REGION_NAME_LEN, r);
959 r += O2HB_MAX_REGION_NAME_LEN;
960 }
961
962 local = kmalloc(sizeof(qr->qr_regions), GFP_ATOMIC);
963 if (!local) {
964 status = -ENOMEM;
965 goto bail;
966 }
967
968 localnr = o2hb_get_all_regions(local, O2NM_MAX_REGIONS);
969
970 /* compare local regions with remote */
971 l = local;
972 for (i = 0; i < localnr; ++i) {
973 foundit = 0;
974 r = remote;
975 for (j = 0; j <= qr->qr_numregions; ++j) {
976 if (!memcmp(l, r, O2HB_MAX_REGION_NAME_LEN)) {
977 foundit = 1;
978 break;
979 }
980 r += O2HB_MAX_REGION_NAME_LEN;
981 }
982 if (!foundit) {
983 status = -EINVAL;
984 mlog(ML_ERROR, "Domain %s: Region '%.*s' registered "
985 "in local node %d but not in joining node %d\n",
986 qr->qr_domain, O2HB_MAX_REGION_NAME_LEN, l,
987 dlm->node_num, qr->qr_node);
988 goto bail;
989 }
990 l += O2HB_MAX_REGION_NAME_LEN;
991 }
992
993 /* compare remote with local regions */
994 r = remote;
995 for (i = 0; i < qr->qr_numregions; ++i) {
996 foundit = 0;
997 l = local;
998 for (j = 0; j < localnr; ++j) {
999 if (!memcmp(r, l, O2HB_MAX_REGION_NAME_LEN)) {
1000 foundit = 1;
1001 break;
1002 }
1003 l += O2HB_MAX_REGION_NAME_LEN;
1004 }
1005 if (!foundit) {
1006 status = -EINVAL;
1007 mlog(ML_ERROR, "Domain %s: Region '%.*s' registered "
1008 "in joining node %d but not in local node %d\n",
1009 qr->qr_domain, O2HB_MAX_REGION_NAME_LEN, r,
1010 qr->qr_node, dlm->node_num);
1011 goto bail;
1012 }
1013 r += O2HB_MAX_REGION_NAME_LEN;
1014 }
1015
1016 bail:
1017 kfree(local);
1018
1019 return status;
1020 }
1021
1022 static int dlm_send_regions(struct dlm_ctxt *dlm, unsigned long *node_map)
1023 {
1024 struct dlm_query_region *qr = NULL;
1025 int status, ret = 0, i;
1026 char *p;
1027
1028 if (find_next_bit(node_map, O2NM_MAX_NODES, 0) >= O2NM_MAX_NODES)
1029 goto bail;
1030
1031 qr = kzalloc(sizeof(struct dlm_query_region), GFP_KERNEL);
1032 if (!qr) {
1033 ret = -ENOMEM;
1034 mlog_errno(ret);
1035 goto bail;
1036 }
1037
1038 qr->qr_node = dlm->node_num;
1039 qr->qr_namelen = strlen(dlm->name);
1040 memcpy(qr->qr_domain, dlm->name, qr->qr_namelen);
1041 /* if local hb, the numregions will be zero */
1042 if (o2hb_global_heartbeat_active())
1043 qr->qr_numregions = o2hb_get_all_regions(qr->qr_regions,
1044 O2NM_MAX_REGIONS);
1045
1046 p = qr->qr_regions;
1047 for (i = 0; i < qr->qr_numregions; ++i, p += O2HB_MAX_REGION_NAME_LEN)
1048 mlog(0, "Region %.*s\n", O2HB_MAX_REGION_NAME_LEN, p);
1049
1050 i = -1;
1051 while ((i = find_next_bit(node_map, O2NM_MAX_NODES,
1052 i + 1)) < O2NM_MAX_NODES) {
1053 if (i == dlm->node_num)
1054 continue;
1055
1056 mlog(0, "Sending regions to node %d\n", i);
1057
1058 ret = o2net_send_message(DLM_QUERY_REGION, DLM_MOD_KEY, qr,
1059 sizeof(struct dlm_query_region),
1060 i, &status);
1061 if (ret >= 0)
1062 ret = status;
1063 if (ret) {
1064 mlog(ML_ERROR, "Region mismatch %d, node %d\n",
1065 ret, i);
1066 break;
1067 }
1068 }
1069
1070 bail:
1071 kfree(qr);
1072 return ret;
1073 }
1074
1075 static int dlm_query_region_handler(struct o2net_msg *msg, u32 len,
1076 void *data, void **ret_data)
1077 {
1078 struct dlm_query_region *qr;
1079 struct dlm_ctxt *dlm = NULL;
1080 int status = 0;
1081 int locked = 0;
1082
1083 qr = (struct dlm_query_region *) msg->buf;
1084
1085 mlog(0, "Node %u queries hb regions on domain %s\n", qr->qr_node,
1086 qr->qr_domain);
1087
1088 status = -EINVAL;
1089
1090 spin_lock(&dlm_domain_lock);
1091 dlm = __dlm_lookup_domain_full(qr->qr_domain, qr->qr_namelen);
1092 if (!dlm) {
1093 mlog(ML_ERROR, "Node %d queried hb regions on domain %s "
1094 "before join domain\n", qr->qr_node, qr->qr_domain);
1095 goto bail;
1096 }
1097
1098 spin_lock(&dlm->spinlock);
1099 locked = 1;
1100 if (dlm->joining_node != qr->qr_node) {
1101 mlog(ML_ERROR, "Node %d queried hb regions on domain %s "
1102 "but joining node is %d\n", qr->qr_node, qr->qr_domain,
1103 dlm->joining_node);
1104 goto bail;
1105 }
1106
1107 /* Support for global heartbeat was added in 1.1 */
1108 if (dlm->dlm_locking_proto.pv_major == 1 &&
1109 dlm->dlm_locking_proto.pv_minor == 0) {
1110 mlog(ML_ERROR, "Node %d queried hb regions on domain %s "
1111 "but active dlm protocol is %d.%d\n", qr->qr_node,
1112 qr->qr_domain, dlm->dlm_locking_proto.pv_major,
1113 dlm->dlm_locking_proto.pv_minor);
1114 goto bail;
1115 }
1116
1117 status = dlm_match_regions(dlm, qr);
1118
1119 bail:
1120 if (locked)
1121 spin_unlock(&dlm->spinlock);
1122 spin_unlock(&dlm_domain_lock);
1123
1124 return status;
1125 }
1126
1127 static int dlm_match_nodes(struct dlm_ctxt *dlm, struct dlm_query_nodeinfo *qn)
1128 {
1129 struct o2nm_node *local;
1130 struct dlm_node_info *remote;
1131 int i, j;
1132 int status = 0;
1133
1134 for (j = 0; j < qn->qn_numnodes; ++j)
1135 mlog(0, "Node %3d, %pI4:%u\n", qn->qn_nodes[j].ni_nodenum,
1136 &(qn->qn_nodes[j].ni_ipv4_address),
1137 ntohs(qn->qn_nodes[j].ni_ipv4_port));
1138
1139 for (i = 0; i < O2NM_MAX_NODES && !status; ++i) {
1140 local = o2nm_get_node_by_num(i);
1141 remote = NULL;
1142 for (j = 0; j < qn->qn_numnodes; ++j) {
1143 if (qn->qn_nodes[j].ni_nodenum == i) {
1144 remote = &(qn->qn_nodes[j]);
1145 break;
1146 }
1147 }
1148
1149 if (!local && !remote)
1150 continue;
1151
1152 if ((local && !remote) || (!local && remote))
1153 status = -EINVAL;
1154
1155 if (!status &&
1156 ((remote->ni_nodenum != local->nd_num) ||
1157 (remote->ni_ipv4_port != local->nd_ipv4_port) ||
1158 (remote->ni_ipv4_address != local->nd_ipv4_address)))
1159 status = -EINVAL;
1160
1161 if (status) {
1162 if (remote && !local)
1163 mlog(ML_ERROR, "Domain %s: Node %d (%pI4:%u) "
1164 "registered in joining node %d but not in "
1165 "local node %d\n", qn->qn_domain,
1166 remote->ni_nodenum,
1167 &(remote->ni_ipv4_address),
1168 ntohs(remote->ni_ipv4_port),
1169 qn->qn_nodenum, dlm->node_num);
1170 if (local && !remote)
1171 mlog(ML_ERROR, "Domain %s: Node %d (%pI4:%u) "
1172 "registered in local node %d but not in "
1173 "joining node %d\n", qn->qn_domain,
1174 local->nd_num, &(local->nd_ipv4_address),
1175 ntohs(local->nd_ipv4_port),
1176 dlm->node_num, qn->qn_nodenum);
1177 BUG_ON((!local && !remote));
1178 }
1179
1180 if (local)
1181 o2nm_node_put(local);
1182 }
1183
1184 return status;
1185 }
1186
1187 static int dlm_send_nodeinfo(struct dlm_ctxt *dlm, unsigned long *node_map)
1188 {
1189 struct dlm_query_nodeinfo *qn = NULL;
1190 struct o2nm_node *node;
1191 int ret = 0, status, count, i;
1192
1193 if (find_next_bit(node_map, O2NM_MAX_NODES, 0) >= O2NM_MAX_NODES)
1194 goto bail;
1195
1196 qn = kzalloc(sizeof(struct dlm_query_nodeinfo), GFP_KERNEL);
1197 if (!qn) {
1198 ret = -ENOMEM;
1199 mlog_errno(ret);
1200 goto bail;
1201 }
1202
1203 for (i = 0, count = 0; i < O2NM_MAX_NODES; ++i) {
1204 node = o2nm_get_node_by_num(i);
1205 if (!node)
1206 continue;
1207 qn->qn_nodes[count].ni_nodenum = node->nd_num;
1208 qn->qn_nodes[count].ni_ipv4_port = node->nd_ipv4_port;
1209 qn->qn_nodes[count].ni_ipv4_address = node->nd_ipv4_address;
1210 mlog(0, "Node %3d, %pI4:%u\n", node->nd_num,
1211 &(node->nd_ipv4_address), ntohs(node->nd_ipv4_port));
1212 ++count;
1213 o2nm_node_put(node);
1214 }
1215
1216 qn->qn_nodenum = dlm->node_num;
1217 qn->qn_numnodes = count;
1218 qn->qn_namelen = strlen(dlm->name);
1219 memcpy(qn->qn_domain, dlm->name, qn->qn_namelen);
1220
1221 i = -1;
1222 while ((i = find_next_bit(node_map, O2NM_MAX_NODES,
1223 i + 1)) < O2NM_MAX_NODES) {
1224 if (i == dlm->node_num)
1225 continue;
1226
1227 mlog(0, "Sending nodeinfo to node %d\n", i);
1228
1229 ret = o2net_send_message(DLM_QUERY_NODEINFO, DLM_MOD_KEY,
1230 qn, sizeof(struct dlm_query_nodeinfo),
1231 i, &status);
1232 if (ret >= 0)
1233 ret = status;
1234 if (ret) {
1235 mlog(ML_ERROR, "node mismatch %d, node %d\n", ret, i);
1236 break;
1237 }
1238 }
1239
1240 bail:
1241 kfree(qn);
1242 return ret;
1243 }
1244
1245 static int dlm_query_nodeinfo_handler(struct o2net_msg *msg, u32 len,
1246 void *data, void **ret_data)
1247 {
1248 struct dlm_query_nodeinfo *qn;
1249 struct dlm_ctxt *dlm = NULL;
1250 int locked = 0, status = -EINVAL;
1251
1252 qn = (struct dlm_query_nodeinfo *) msg->buf;
1253
1254 mlog(0, "Node %u queries nodes on domain %s\n", qn->qn_nodenum,
1255 qn->qn_domain);
1256
1257 spin_lock(&dlm_domain_lock);
1258 dlm = __dlm_lookup_domain_full(qn->qn_domain, qn->qn_namelen);
1259 if (!dlm) {
1260 mlog(ML_ERROR, "Node %d queried nodes on domain %s before "
1261 "join domain\n", qn->qn_nodenum, qn->qn_domain);
1262 goto bail;
1263 }
1264
1265 spin_lock(&dlm->spinlock);
1266 locked = 1;
1267 if (dlm->joining_node != qn->qn_nodenum) {
1268 mlog(ML_ERROR, "Node %d queried nodes on domain %s but "
1269 "joining node is %d\n", qn->qn_nodenum, qn->qn_domain,
1270 dlm->joining_node);
1271 goto bail;
1272 }
1273
1274 /* Support for node query was added in 1.1 */
1275 if (dlm->dlm_locking_proto.pv_major == 1 &&
1276 dlm->dlm_locking_proto.pv_minor == 0) {
1277 mlog(ML_ERROR, "Node %d queried nodes on domain %s "
1278 "but active dlm protocol is %d.%d\n", qn->qn_nodenum,
1279 qn->qn_domain, dlm->dlm_locking_proto.pv_major,
1280 dlm->dlm_locking_proto.pv_minor);
1281 goto bail;
1282 }
1283
1284 status = dlm_match_nodes(dlm, qn);
1285
1286 bail:
1287 if (locked)
1288 spin_unlock(&dlm->spinlock);
1289 spin_unlock(&dlm_domain_lock);
1290
1291 return status;
1292 }
1293
1294 static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
1295 void **ret_data)
1296 {
1297 struct dlm_cancel_join *cancel;
1298 struct dlm_ctxt *dlm = NULL;
1299
1300 cancel = (struct dlm_cancel_join *) msg->buf;
1301
1302 mlog(0, "node %u cancels join on domain %s\n", cancel->node_idx,
1303 cancel->domain);
1304
1305 spin_lock(&dlm_domain_lock);
1306 dlm = __dlm_lookup_domain_full(cancel->domain, cancel->name_len);
1307
1308 if (dlm) {
1309 spin_lock(&dlm->spinlock);
1310
1311 /* Yikes, this guy wants to cancel his join. No
1312 * problem, we simply cleanup our join state. */
1313 BUG_ON(dlm->joining_node != cancel->node_idx);
1314 __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
1315
1316 spin_unlock(&dlm->spinlock);
1317 }
1318 spin_unlock(&dlm_domain_lock);
1319
1320 return 0;
1321 }
1322
1323 static int dlm_send_one_join_cancel(struct dlm_ctxt *dlm,
1324 unsigned int node)
1325 {
1326 int status;
1327 struct dlm_cancel_join cancel_msg;
1328
1329 memset(&cancel_msg, 0, sizeof(cancel_msg));
1330 cancel_msg.node_idx = dlm->node_num;
1331 cancel_msg.name_len = strlen(dlm->name);
1332 memcpy(cancel_msg.domain, dlm->name, cancel_msg.name_len);
1333
1334 status = o2net_send_message(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
1335 &cancel_msg, sizeof(cancel_msg), node,
1336 NULL);
1337 if (status < 0) {
1338 mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to "
1339 "node %u\n", status, DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
1340 node);
1341 goto bail;
1342 }
1343
1344 bail:
1345 return status;
1346 }
1347
1348 /* map_size should be in bytes. */
1349 static int dlm_send_join_cancels(struct dlm_ctxt *dlm,
1350 unsigned long *node_map,
1351 unsigned int map_size)
1352 {
1353 int status, tmpstat;
1354 unsigned int node;
1355
1356 if (map_size != (BITS_TO_LONGS(O2NM_MAX_NODES) *
1357 sizeof(unsigned long))) {
1358 mlog(ML_ERROR,
1359 "map_size %u != BITS_TO_LONGS(O2NM_MAX_NODES) %u\n",
1360 map_size, (unsigned)BITS_TO_LONGS(O2NM_MAX_NODES));
1361 return -EINVAL;
1362 }
1363
1364 status = 0;
1365 node = -1;
1366 while ((node = find_next_bit(node_map, O2NM_MAX_NODES,
1367 node + 1)) < O2NM_MAX_NODES) {
1368 if (node == dlm->node_num)
1369 continue;
1370
1371 tmpstat = dlm_send_one_join_cancel(dlm, node);
1372 if (tmpstat) {
1373 mlog(ML_ERROR, "Error return %d cancelling join on "
1374 "node %d\n", tmpstat, node);
1375 if (!status)
1376 status = tmpstat;
1377 }
1378 }
1379
1380 if (status)
1381 mlog_errno(status);
1382 return status;
1383 }
1384
1385 static int dlm_request_join(struct dlm_ctxt *dlm,
1386 int node,
1387 enum dlm_query_join_response_code *response)
1388 {
1389 int status;
1390 struct dlm_query_join_request join_msg;
1391 struct dlm_query_join_packet packet;
1392 u32 join_resp;
1393
1394 mlog(0, "querying node %d\n", node);
1395
1396 memset(&join_msg, 0, sizeof(join_msg));
1397 join_msg.node_idx = dlm->node_num;
1398 join_msg.name_len = strlen(dlm->name);
1399 memcpy(join_msg.domain, dlm->name, join_msg.name_len);
1400 join_msg.dlm_proto = dlm->dlm_locking_proto;
1401 join_msg.fs_proto = dlm->fs_locking_proto;
1402
1403 /* copy live node map to join message */
1404 byte_copymap(join_msg.node_map, dlm->live_nodes_map, O2NM_MAX_NODES);
1405
1406 status = o2net_send_message(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, &join_msg,
1407 sizeof(join_msg), node, &join_resp);
1408 if (status < 0 && status != -ENOPROTOOPT) {
1409 mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to "
1410 "node %u\n", status, DLM_QUERY_JOIN_MSG, DLM_MOD_KEY,
1411 node);
1412 goto bail;
1413 }
1414 dlm_query_join_wire_to_packet(join_resp, &packet);
1415
1416 /* -ENOPROTOOPT from the net code means the other side isn't
1417 listening for our message type -- that's fine, it means
1418 his dlm isn't up, so we can consider him a 'yes' but not
1419 joined into the domain. */
1420 if (status == -ENOPROTOOPT) {
1421 status = 0;
1422 *response = JOIN_OK_NO_MAP;
1423 } else if (packet.code == JOIN_DISALLOW ||
1424 packet.code == JOIN_OK_NO_MAP) {
1425 *response = packet.code;
1426 } else if (packet.code == JOIN_PROTOCOL_MISMATCH) {
1427 mlog(ML_NOTICE,
1428 "This node requested DLM locking protocol %u.%u and "
1429 "filesystem locking protocol %u.%u. At least one of "
1430 "the protocol versions on node %d is not compatible, "
1431 "disconnecting\n",
1432 dlm->dlm_locking_proto.pv_major,
1433 dlm->dlm_locking_proto.pv_minor,
1434 dlm->fs_locking_proto.pv_major,
1435 dlm->fs_locking_proto.pv_minor,
1436 node);
1437 status = -EPROTO;
1438 *response = packet.code;
1439 } else if (packet.code == JOIN_OK) {
1440 *response = packet.code;
1441 /* Use the same locking protocol as the remote node */
1442 dlm->dlm_locking_proto.pv_minor = packet.dlm_minor;
1443 dlm->fs_locking_proto.pv_minor = packet.fs_minor;
1444 mlog(0,
1445 "Node %d responds JOIN_OK with DLM locking protocol "
1446 "%u.%u and fs locking protocol %u.%u\n",
1447 node,
1448 dlm->dlm_locking_proto.pv_major,
1449 dlm->dlm_locking_proto.pv_minor,
1450 dlm->fs_locking_proto.pv_major,
1451 dlm->fs_locking_proto.pv_minor);
1452 } else {
1453 status = -EINVAL;
1454 mlog(ML_ERROR, "invalid response %d from node %u\n",
1455 packet.code, node);
1456 }
1457
1458 mlog(0, "status %d, node %d response is %d\n", status, node,
1459 *response);
1460
1461 bail:
1462 return status;
1463 }
1464
1465 static int dlm_send_one_join_assert(struct dlm_ctxt *dlm,
1466 unsigned int node)
1467 {
1468 int status;
1469 struct dlm_assert_joined assert_msg;
1470
1471 mlog(0, "Sending join assert to node %u\n", node);
1472
1473 memset(&assert_msg, 0, sizeof(assert_msg));
1474 assert_msg.node_idx = dlm->node_num;
1475 assert_msg.name_len = strlen(dlm->name);
1476 memcpy(assert_msg.domain, dlm->name, assert_msg.name_len);
1477
1478 status = o2net_send_message(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
1479 &assert_msg, sizeof(assert_msg), node,
1480 NULL);
1481 if (status < 0)
1482 mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to "
1483 "node %u\n", status, DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
1484 node);
1485
1486 return status;
1487 }
1488
1489 static void dlm_send_join_asserts(struct dlm_ctxt *dlm,
1490 unsigned long *node_map)
1491 {
1492 int status, node, live;
1493
1494 status = 0;
1495 node = -1;
1496 while ((node = find_next_bit(node_map, O2NM_MAX_NODES,
1497 node + 1)) < O2NM_MAX_NODES) {
1498 if (node == dlm->node_num)
1499 continue;
1500
1501 do {
1502 /* It is very important that this message be
1503 * received so we spin until either the node
1504 * has died or it gets the message. */
1505 status = dlm_send_one_join_assert(dlm, node);
1506
1507 spin_lock(&dlm->spinlock);
1508 live = test_bit(node, dlm->live_nodes_map);
1509 spin_unlock(&dlm->spinlock);
1510
1511 if (status) {
1512 mlog(ML_ERROR, "Error return %d asserting "
1513 "join on node %d\n", status, node);
1514
1515 /* give us some time between errors... */
1516 if (live)
1517 msleep(DLM_DOMAIN_BACKOFF_MS);
1518 }
1519 } while (status && live);
1520 }
1521 }
1522
1523 struct domain_join_ctxt {
1524 unsigned long live_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
1525 unsigned long yes_resp_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
1526 };
1527
1528 static int dlm_should_restart_join(struct dlm_ctxt *dlm,
1529 struct domain_join_ctxt *ctxt,
1530 enum dlm_query_join_response_code response)
1531 {
1532 int ret;
1533
1534 if (response == JOIN_DISALLOW) {
1535 mlog(0, "Latest response of disallow -- should restart\n");
1536 return 1;
1537 }
1538
1539 spin_lock(&dlm->spinlock);
1540 /* For now, we restart the process if the node maps have
1541 * changed at all */
1542 ret = memcmp(ctxt->live_map, dlm->live_nodes_map,
1543 sizeof(dlm->live_nodes_map));
1544 spin_unlock(&dlm->spinlock);
1545
1546 if (ret)
1547 mlog(0, "Node maps changed -- should restart\n");
1548
1549 return ret;
1550 }
1551
1552 static int dlm_try_to_join_domain(struct dlm_ctxt *dlm)
1553 {
1554 int status = 0, tmpstat, node;
1555 struct domain_join_ctxt *ctxt;
1556 enum dlm_query_join_response_code response = JOIN_DISALLOW;
1557
1558 mlog_entry("%p", dlm);
1559
1560 ctxt = kzalloc(sizeof(*ctxt), GFP_KERNEL);
1561 if (!ctxt) {
1562 status = -ENOMEM;
1563 mlog_errno(status);
1564 goto bail;
1565 }
1566
1567 /* group sem locking should work for us here -- we're already
1568 * registered for heartbeat events so filling this should be
1569 * atomic wrt getting those handlers called. */
1570 o2hb_fill_node_map(dlm->live_nodes_map, sizeof(dlm->live_nodes_map));
1571
1572 spin_lock(&dlm->spinlock);
1573 memcpy(ctxt->live_map, dlm->live_nodes_map, sizeof(ctxt->live_map));
1574
1575 __dlm_set_joining_node(dlm, dlm->node_num);
1576
1577 spin_unlock(&dlm->spinlock);
1578
1579 node = -1;
1580 while ((node = find_next_bit(ctxt->live_map, O2NM_MAX_NODES,
1581 node + 1)) < O2NM_MAX_NODES) {
1582 if (node == dlm->node_num)
1583 continue;
1584
1585 status = dlm_request_join(dlm, node, &response);
1586 if (status < 0) {
1587 mlog_errno(status);
1588 goto bail;
1589 }
1590
1591 /* Ok, either we got a response or the node doesn't have a
1592 * dlm up. */
1593 if (response == JOIN_OK)
1594 set_bit(node, ctxt->yes_resp_map);
1595
1596 if (dlm_should_restart_join(dlm, ctxt, response)) {
1597 status = -EAGAIN;
1598 goto bail;
1599 }
1600 }
1601
1602 mlog(0, "Yay, done querying nodes!\n");
1603
1604 /* Yay, everyone agree's we can join the domain. My domain is
1605 * comprised of all nodes who were put in the
1606 * yes_resp_map. Copy that into our domain map and send a join
1607 * assert message to clean up everyone elses state. */
1608 spin_lock(&dlm->spinlock);
1609 memcpy(dlm->domain_map, ctxt->yes_resp_map,
1610 sizeof(ctxt->yes_resp_map));
1611 set_bit(dlm->node_num, dlm->domain_map);
1612 spin_unlock(&dlm->spinlock);
1613
1614 /* Support for global heartbeat and node info was added in 1.1 */
1615 if (dlm_protocol.pv_major > 1 || dlm_protocol.pv_minor > 0) {
1616 status = dlm_send_nodeinfo(dlm, ctxt->yes_resp_map);
1617 if (status) {
1618 mlog_errno(status);
1619 goto bail;
1620 }
1621 status = dlm_send_regions(dlm, ctxt->yes_resp_map);
1622 if (status) {
1623 mlog_errno(status);
1624 goto bail;
1625 }
1626 }
1627
1628 dlm_send_join_asserts(dlm, ctxt->yes_resp_map);
1629
1630 /* Joined state *must* be set before the joining node
1631 * information, otherwise the query_join handler may read no
1632 * current joiner but a state of NEW and tell joining nodes
1633 * we're not in the domain. */
1634 spin_lock(&dlm_domain_lock);
1635 dlm->dlm_state = DLM_CTXT_JOINED;
1636 dlm->num_joins++;
1637 spin_unlock(&dlm_domain_lock);
1638
1639 bail:
1640 spin_lock(&dlm->spinlock);
1641 __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
1642 if (!status)
1643 __dlm_print_nodes(dlm);
1644 spin_unlock(&dlm->spinlock);
1645
1646 if (ctxt) {
1647 /* Do we need to send a cancel message to any nodes? */
1648 if (status < 0) {
1649 tmpstat = dlm_send_join_cancels(dlm,
1650 ctxt->yes_resp_map,
1651 sizeof(ctxt->yes_resp_map));
1652 if (tmpstat < 0)
1653 mlog_errno(tmpstat);
1654 }
1655 kfree(ctxt);
1656 }
1657
1658 mlog(0, "returning %d\n", status);
1659 return status;
1660 }
1661
1662 static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm)
1663 {
1664 o2hb_unregister_callback(NULL, &dlm->dlm_hb_up);
1665 o2hb_unregister_callback(NULL, &dlm->dlm_hb_down);
1666 o2net_unregister_handler_list(&dlm->dlm_domain_handlers);
1667 }
1668
1669 static int dlm_register_domain_handlers(struct dlm_ctxt *dlm)
1670 {
1671 int status;
1672
1673 mlog(0, "registering handlers.\n");
1674
1675 o2hb_setup_callback(&dlm->dlm_hb_down, O2HB_NODE_DOWN_CB,
1676 dlm_hb_node_down_cb, dlm, DLM_HB_NODE_DOWN_PRI);
1677 status = o2hb_register_callback(NULL, &dlm->dlm_hb_down);
1678 if (status)
1679 goto bail;
1680
1681 o2hb_setup_callback(&dlm->dlm_hb_up, O2HB_NODE_UP_CB,
1682 dlm_hb_node_up_cb, dlm, DLM_HB_NODE_UP_PRI);
1683 status = o2hb_register_callback(NULL, &dlm->dlm_hb_up);
1684 if (status)
1685 goto bail;
1686
1687 status = o2net_register_handler(DLM_MASTER_REQUEST_MSG, dlm->key,
1688 sizeof(struct dlm_master_request),
1689 dlm_master_request_handler,
1690 dlm, NULL, &dlm->dlm_domain_handlers);
1691 if (status)
1692 goto bail;
1693
1694 status = o2net_register_handler(DLM_ASSERT_MASTER_MSG, dlm->key,
1695 sizeof(struct dlm_assert_master),
1696 dlm_assert_master_handler,
1697 dlm, dlm_assert_master_post_handler,
1698 &dlm->dlm_domain_handlers);
1699 if (status)
1700 goto bail;
1701
1702 status = o2net_register_handler(DLM_CREATE_LOCK_MSG, dlm->key,
1703 sizeof(struct dlm_create_lock),
1704 dlm_create_lock_handler,
1705 dlm, NULL, &dlm->dlm_domain_handlers);
1706 if (status)
1707 goto bail;
1708
1709 status = o2net_register_handler(DLM_CONVERT_LOCK_MSG, dlm->key,
1710 DLM_CONVERT_LOCK_MAX_LEN,
1711 dlm_convert_lock_handler,
1712 dlm, NULL, &dlm->dlm_domain_handlers);
1713 if (status)
1714 goto bail;
1715
1716 status = o2net_register_handler(DLM_UNLOCK_LOCK_MSG, dlm->key,
1717 DLM_UNLOCK_LOCK_MAX_LEN,
1718 dlm_unlock_lock_handler,
1719 dlm, NULL, &dlm->dlm_domain_handlers);
1720 if (status)
1721 goto bail;
1722
1723 status = o2net_register_handler(DLM_PROXY_AST_MSG, dlm->key,
1724 DLM_PROXY_AST_MAX_LEN,
1725 dlm_proxy_ast_handler,
1726 dlm, NULL, &dlm->dlm_domain_handlers);
1727 if (status)
1728 goto bail;
1729
1730 status = o2net_register_handler(DLM_EXIT_DOMAIN_MSG, dlm->key,
1731 sizeof(struct dlm_exit_domain),
1732 dlm_exit_domain_handler,
1733 dlm, NULL, &dlm->dlm_domain_handlers);
1734 if (status)
1735 goto bail;
1736
1737 status = o2net_register_handler(DLM_DEREF_LOCKRES_MSG, dlm->key,
1738 sizeof(struct dlm_deref_lockres),
1739 dlm_deref_lockres_handler,
1740 dlm, NULL, &dlm->dlm_domain_handlers);
1741 if (status)
1742 goto bail;
1743
1744 status = o2net_register_handler(DLM_MIGRATE_REQUEST_MSG, dlm->key,
1745 sizeof(struct dlm_migrate_request),
1746 dlm_migrate_request_handler,
1747 dlm, NULL, &dlm->dlm_domain_handlers);
1748 if (status)
1749 goto bail;
1750
1751 status = o2net_register_handler(DLM_MIG_LOCKRES_MSG, dlm->key,
1752 DLM_MIG_LOCKRES_MAX_LEN,
1753 dlm_mig_lockres_handler,
1754 dlm, NULL, &dlm->dlm_domain_handlers);
1755 if (status)
1756 goto bail;
1757
1758 status = o2net_register_handler(DLM_MASTER_REQUERY_MSG, dlm->key,
1759 sizeof(struct dlm_master_requery),
1760 dlm_master_requery_handler,
1761 dlm, NULL, &dlm->dlm_domain_handlers);
1762 if (status)
1763 goto bail;
1764
1765 status = o2net_register_handler(DLM_LOCK_REQUEST_MSG, dlm->key,
1766 sizeof(struct dlm_lock_request),
1767 dlm_request_all_locks_handler,
1768 dlm, NULL, &dlm->dlm_domain_handlers);
1769 if (status)
1770 goto bail;
1771
1772 status = o2net_register_handler(DLM_RECO_DATA_DONE_MSG, dlm->key,
1773 sizeof(struct dlm_reco_data_done),
1774 dlm_reco_data_done_handler,
1775 dlm, NULL, &dlm->dlm_domain_handlers);
1776 if (status)
1777 goto bail;
1778
1779 status = o2net_register_handler(DLM_BEGIN_RECO_MSG, dlm->key,
1780 sizeof(struct dlm_begin_reco),
1781 dlm_begin_reco_handler,
1782 dlm, NULL, &dlm->dlm_domain_handlers);
1783 if (status)
1784 goto bail;
1785
1786 status = o2net_register_handler(DLM_FINALIZE_RECO_MSG, dlm->key,
1787 sizeof(struct dlm_finalize_reco),
1788 dlm_finalize_reco_handler,
1789 dlm, NULL, &dlm->dlm_domain_handlers);
1790 if (status)
1791 goto bail;
1792
1793 bail:
1794 if (status)
1795 dlm_unregister_domain_handlers(dlm);
1796
1797 return status;
1798 }
1799
1800 static int dlm_join_domain(struct dlm_ctxt *dlm)
1801 {
1802 int status;
1803 unsigned int backoff;
1804 unsigned int total_backoff = 0;
1805
1806 BUG_ON(!dlm);
1807
1808 mlog(0, "Join domain %s\n", dlm->name);
1809
1810 status = dlm_register_domain_handlers(dlm);
1811 if (status) {
1812 mlog_errno(status);
1813 goto bail;
1814 }
1815
1816 status = dlm_debug_init(dlm);
1817 if (status < 0) {
1818 mlog_errno(status);
1819 goto bail;
1820 }
1821
1822 status = dlm_launch_thread(dlm);
1823 if (status < 0) {
1824 mlog_errno(status);
1825 goto bail;
1826 }
1827
1828 status = dlm_launch_recovery_thread(dlm);
1829 if (status < 0) {
1830 mlog_errno(status);
1831 goto bail;
1832 }
1833
1834 dlm->dlm_worker = create_singlethread_workqueue("dlm_wq");
1835 if (!dlm->dlm_worker) {
1836 status = -ENOMEM;
1837 mlog_errno(status);
1838 goto bail;
1839 }
1840
1841 do {
1842 status = dlm_try_to_join_domain(dlm);
1843
1844 /* If we're racing another node to the join, then we
1845 * need to back off temporarily and let them
1846 * complete. */
1847 #define DLM_JOIN_TIMEOUT_MSECS 90000
1848 if (status == -EAGAIN) {
1849 if (signal_pending(current)) {
1850 status = -ERESTARTSYS;
1851 goto bail;
1852 }
1853
1854 if (total_backoff >
1855 msecs_to_jiffies(DLM_JOIN_TIMEOUT_MSECS)) {
1856 status = -ERESTARTSYS;
1857 mlog(ML_NOTICE, "Timed out joining dlm domain "
1858 "%s after %u msecs\n", dlm->name,
1859 jiffies_to_msecs(total_backoff));
1860 goto bail;
1861 }
1862
1863 /*
1864 * <chip> After you!
1865 * <dale> No, after you!
1866 * <chip> I insist!
1867 * <dale> But you first!
1868 * ...
1869 */
1870 backoff = (unsigned int)(jiffies & 0x3);
1871 backoff *= DLM_DOMAIN_BACKOFF_MS;
1872 total_backoff += backoff;
1873 mlog(0, "backoff %d\n", backoff);
1874 msleep(backoff);
1875 }
1876 } while (status == -EAGAIN);
1877
1878 if (status < 0) {
1879 mlog_errno(status);
1880 goto bail;
1881 }
1882
1883 status = 0;
1884 bail:
1885 wake_up(&dlm_domain_events);
1886
1887 if (status) {
1888 dlm_unregister_domain_handlers(dlm);
1889 dlm_debug_shutdown(dlm);
1890 dlm_complete_thread(dlm);
1891 dlm_complete_recovery_thread(dlm);
1892 dlm_destroy_dlm_worker(dlm);
1893 }
1894
1895 return status;
1896 }
1897
1898 static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain,
1899 u32 key)
1900 {
1901 int i;
1902 int ret;
1903 struct dlm_ctxt *dlm = NULL;
1904
1905 dlm = kzalloc(sizeof(*dlm), GFP_KERNEL);
1906 if (!dlm) {
1907 mlog_errno(-ENOMEM);
1908 goto leave;
1909 }
1910
1911 dlm->name = kstrdup(domain, GFP_KERNEL);
1912 if (dlm->name == NULL) {
1913 mlog_errno(-ENOMEM);
1914 kfree(dlm);
1915 dlm = NULL;
1916 goto leave;
1917 }
1918
1919 dlm->lockres_hash = (struct hlist_head **)dlm_alloc_pagevec(DLM_HASH_PAGES);
1920 if (!dlm->lockres_hash) {
1921 mlog_errno(-ENOMEM);
1922 kfree(dlm->name);
1923 kfree(dlm);
1924 dlm = NULL;
1925 goto leave;
1926 }
1927
1928 for (i = 0; i < DLM_HASH_BUCKETS; i++)
1929 INIT_HLIST_HEAD(dlm_lockres_hash(dlm, i));
1930
1931 dlm->master_hash = (struct hlist_head **)
1932 dlm_alloc_pagevec(DLM_HASH_PAGES);
1933 if (!dlm->master_hash) {
1934 mlog_errno(-ENOMEM);
1935 dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES);
1936 kfree(dlm->name);
1937 kfree(dlm);
1938 dlm = NULL;
1939 goto leave;
1940 }
1941
1942 for (i = 0; i < DLM_HASH_BUCKETS; i++)
1943 INIT_HLIST_HEAD(dlm_master_hash(dlm, i));
1944
1945 dlm->key = key;
1946 dlm->node_num = o2nm_this_node();
1947
1948 ret = dlm_create_debugfs_subroot(dlm);
1949 if (ret < 0) {
1950 dlm_free_pagevec((void **)dlm->master_hash, DLM_HASH_PAGES);
1951 dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES);
1952 kfree(dlm->name);
1953 kfree(dlm);
1954 dlm = NULL;
1955 goto leave;
1956 }
1957
1958 spin_lock_init(&dlm->spinlock);
1959 spin_lock_init(&dlm->master_lock);
1960 spin_lock_init(&dlm->ast_lock);
1961 spin_lock_init(&dlm->track_lock);
1962 INIT_LIST_HEAD(&dlm->list);
1963 INIT_LIST_HEAD(&dlm->dirty_list);
1964 INIT_LIST_HEAD(&dlm->reco.resources);
1965 INIT_LIST_HEAD(&dlm->reco.received);
1966 INIT_LIST_HEAD(&dlm->reco.node_data);
1967 INIT_LIST_HEAD(&dlm->purge_list);
1968 INIT_LIST_HEAD(&dlm->dlm_domain_handlers);
1969 INIT_LIST_HEAD(&dlm->tracking_list);
1970 dlm->reco.state = 0;
1971
1972 INIT_LIST_HEAD(&dlm->pending_asts);
1973 INIT_LIST_HEAD(&dlm->pending_basts);
1974
1975 mlog(0, "dlm->recovery_map=%p, &(dlm->recovery_map[0])=%p\n",
1976 dlm->recovery_map, &(dlm->recovery_map[0]));
1977
1978 memset(dlm->recovery_map, 0, sizeof(dlm->recovery_map));
1979 memset(dlm->live_nodes_map, 0, sizeof(dlm->live_nodes_map));
1980 memset(dlm->domain_map, 0, sizeof(dlm->domain_map));
1981
1982 dlm->dlm_thread_task = NULL;
1983 dlm->dlm_reco_thread_task = NULL;
1984 dlm->dlm_worker = NULL;
1985 init_waitqueue_head(&dlm->dlm_thread_wq);
1986 init_waitqueue_head(&dlm->dlm_reco_thread_wq);
1987 init_waitqueue_head(&dlm->reco.event);
1988 init_waitqueue_head(&dlm->ast_wq);
1989 init_waitqueue_head(&dlm->migration_wq);
1990 INIT_LIST_HEAD(&dlm->mle_hb_events);
1991
1992 dlm->joining_node = DLM_LOCK_RES_OWNER_UNKNOWN;
1993 init_waitqueue_head(&dlm->dlm_join_events);
1994
1995 dlm->reco.new_master = O2NM_INVALID_NODE_NUM;
1996 dlm->reco.dead_node = O2NM_INVALID_NODE_NUM;
1997
1998 atomic_set(&dlm->res_tot_count, 0);
1999 atomic_set(&dlm->res_cur_count, 0);
2000 for (i = 0; i < DLM_MLE_NUM_TYPES; ++i) {
2001 atomic_set(&dlm->mle_tot_count[i], 0);
2002 atomic_set(&dlm->mle_cur_count[i], 0);
2003 }
2004
2005 spin_lock_init(&dlm->work_lock);
2006 INIT_LIST_HEAD(&dlm->work_list);
2007 INIT_WORK(&dlm->dispatched_work, dlm_dispatch_work);
2008
2009 kref_init(&dlm->dlm_refs);
2010 dlm->dlm_state = DLM_CTXT_NEW;
2011
2012 INIT_LIST_HEAD(&dlm->dlm_eviction_callbacks);
2013
2014 mlog(0, "context init: refcount %u\n",
2015 atomic_read(&dlm->dlm_refs.refcount));
2016
2017 leave:
2018 return dlm;
2019 }
2020
2021 /*
2022 * Compare a requested locking protocol version against the current one.
2023 *
2024 * If the major numbers are different, they are incompatible.
2025 * If the current minor is greater than the request, they are incompatible.
2026 * If the current minor is less than or equal to the request, they are
2027 * compatible, and the requester should run at the current minor version.
2028 */
2029 static int dlm_protocol_compare(struct dlm_protocol_version *existing,
2030 struct dlm_protocol_version *request)
2031 {
2032 if (existing->pv_major != request->pv_major)
2033 return 1;
2034
2035 if (existing->pv_minor > request->pv_minor)
2036 return 1;
2037
2038 if (existing->pv_minor < request->pv_minor)
2039 request->pv_minor = existing->pv_minor;
2040
2041 return 0;
2042 }
2043
2044 /*
2045 * dlm_register_domain: one-time setup per "domain".
2046 *
2047 * The filesystem passes in the requested locking version via proto.
2048 * If registration was successful, proto will contain the negotiated
2049 * locking protocol.
2050 */
2051 struct dlm_ctxt * dlm_register_domain(const char *domain,
2052 u32 key,
2053 struct dlm_protocol_version *fs_proto)
2054 {
2055 int ret;
2056 struct dlm_ctxt *dlm = NULL;
2057 struct dlm_ctxt *new_ctxt = NULL;
2058
2059 if (strlen(domain) >= O2NM_MAX_NAME_LEN) {
2060 ret = -ENAMETOOLONG;
2061 mlog(ML_ERROR, "domain name length too long\n");
2062 goto leave;
2063 }
2064
2065 if (!o2hb_check_local_node_heartbeating()) {
2066 mlog(ML_ERROR, "the local node has not been configured, or is "
2067 "not heartbeating\n");
2068 ret = -EPROTO;
2069 goto leave;
2070 }
2071
2072 mlog(0, "register called for domain \"%s\"\n", domain);
2073
2074 retry:
2075 dlm = NULL;
2076 if (signal_pending(current)) {
2077 ret = -ERESTARTSYS;
2078 mlog_errno(ret);
2079 goto leave;
2080 }
2081
2082 spin_lock(&dlm_domain_lock);
2083
2084 dlm = __dlm_lookup_domain(domain);
2085 if (dlm) {
2086 if (dlm->dlm_state != DLM_CTXT_JOINED) {
2087 spin_unlock(&dlm_domain_lock);
2088
2089 mlog(0, "This ctxt is not joined yet!\n");
2090 wait_event_interruptible(dlm_domain_events,
2091 dlm_wait_on_domain_helper(
2092 domain));
2093 goto retry;
2094 }
2095
2096 if (dlm_protocol_compare(&dlm->fs_locking_proto, fs_proto)) {
2097 spin_unlock(&dlm_domain_lock);
2098 mlog(ML_ERROR,
2099 "Requested locking protocol version is not "
2100 "compatible with already registered domain "
2101 "\"%s\"\n", domain);
2102 ret = -EPROTO;
2103 goto leave;
2104 }
2105
2106 __dlm_get(dlm);
2107 dlm->num_joins++;
2108
2109 spin_unlock(&dlm_domain_lock);
2110
2111 ret = 0;
2112 goto leave;
2113 }
2114
2115 /* doesn't exist */
2116 if (!new_ctxt) {
2117 spin_unlock(&dlm_domain_lock);
2118
2119 new_ctxt = dlm_alloc_ctxt(domain, key);
2120 if (new_ctxt)
2121 goto retry;
2122
2123 ret = -ENOMEM;
2124 mlog_errno(ret);
2125 goto leave;
2126 }
2127
2128 /* a little variable switch-a-roo here... */
2129 dlm = new_ctxt;
2130 new_ctxt = NULL;
2131
2132 /* add the new domain */
2133 list_add_tail(&dlm->list, &dlm_domains);
2134 spin_unlock(&dlm_domain_lock);
2135
2136 /*
2137 * Pass the locking protocol version into the join. If the join
2138 * succeeds, it will have the negotiated protocol set.
2139 */
2140 dlm->dlm_locking_proto = dlm_protocol;
2141 dlm->fs_locking_proto = *fs_proto;
2142
2143 ret = dlm_join_domain(dlm);
2144 if (ret) {
2145 mlog_errno(ret);
2146 dlm_put(dlm);
2147 goto leave;
2148 }
2149
2150 /* Tell the caller what locking protocol we negotiated */
2151 *fs_proto = dlm->fs_locking_proto;
2152
2153 ret = 0;
2154 leave:
2155 if (new_ctxt)
2156 dlm_free_ctxt_mem(new_ctxt);
2157
2158 if (ret < 0)
2159 dlm = ERR_PTR(ret);
2160
2161 return dlm;
2162 }
2163 EXPORT_SYMBOL_GPL(dlm_register_domain);
2164
2165 static LIST_HEAD(dlm_join_handlers);
2166
2167 static void dlm_unregister_net_handlers(void)
2168 {
2169 o2net_unregister_handler_list(&dlm_join_handlers);
2170 }
2171
2172 static int dlm_register_net_handlers(void)
2173 {
2174 int status = 0;
2175
2176 status = o2net_register_handler(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY,
2177 sizeof(struct dlm_query_join_request),
2178 dlm_query_join_handler,
2179 NULL, NULL, &dlm_join_handlers);
2180 if (status)
2181 goto bail;
2182
2183 status = o2net_register_handler(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
2184 sizeof(struct dlm_assert_joined),
2185 dlm_assert_joined_handler,
2186 NULL, NULL, &dlm_join_handlers);
2187 if (status)
2188 goto bail;
2189
2190 status = o2net_register_handler(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
2191 sizeof(struct dlm_cancel_join),
2192 dlm_cancel_join_handler,
2193 NULL, NULL, &dlm_join_handlers);
2194 if (status)
2195 goto bail;
2196
2197 status = o2net_register_handler(DLM_QUERY_REGION, DLM_MOD_KEY,
2198 sizeof(struct dlm_query_region),
2199 dlm_query_region_handler,
2200 NULL, NULL, &dlm_join_handlers);
2201
2202 if (status)
2203 goto bail;
2204
2205 status = o2net_register_handler(DLM_QUERY_NODEINFO, DLM_MOD_KEY,
2206 sizeof(struct dlm_query_nodeinfo),
2207 dlm_query_nodeinfo_handler,
2208 NULL, NULL, &dlm_join_handlers);
2209 bail:
2210 if (status < 0)
2211 dlm_unregister_net_handlers();
2212
2213 return status;
2214 }
2215
/* Domain eviction callback handling.
 *
 * The file system requires notification of node death *before* the
 * dlm completes its recovery work, otherwise it may be able to
 * acquire locks on resources requiring recovery. Since the dlm can
 * evict a node from its domain *before* heartbeat fires, a similar
 * mechanism is required. */
2223
2224 /* Eviction is not expected to happen often, so a per-domain lock is
2225 * not necessary. Eviction callbacks are allowed to sleep for short
2226 * periods of time. */
2227 static DECLARE_RWSEM(dlm_callback_sem);
2228
2229 void dlm_fire_domain_eviction_callbacks(struct dlm_ctxt *dlm,
2230 int node_num)
2231 {
2232 struct list_head *iter;
2233 struct dlm_eviction_cb *cb;
2234
2235 down_read(&dlm_callback_sem);
2236 list_for_each(iter, &dlm->dlm_eviction_callbacks) {
2237 cb = list_entry(iter, struct dlm_eviction_cb, ec_item);
2238
2239 cb->ec_func(node_num, cb->ec_data);
2240 }
2241 up_read(&dlm_callback_sem);
2242 }
2243
2244 void dlm_setup_eviction_cb(struct dlm_eviction_cb *cb,
2245 dlm_eviction_func *f,
2246 void *data)
2247 {
2248 INIT_LIST_HEAD(&cb->ec_item);
2249 cb->ec_func = f;
2250 cb->ec_data = data;
2251 }
2252 EXPORT_SYMBOL_GPL(dlm_setup_eviction_cb);
2253
2254 void dlm_register_eviction_cb(struct dlm_ctxt *dlm,
2255 struct dlm_eviction_cb *cb)
2256 {
2257 down_write(&dlm_callback_sem);
2258 list_add_tail(&cb->ec_item, &dlm->dlm_eviction_callbacks);
2259 up_write(&dlm_callback_sem);
2260 }
2261 EXPORT_SYMBOL_GPL(dlm_register_eviction_cb);
2262
2263 void dlm_unregister_eviction_cb(struct dlm_eviction_cb *cb)
2264 {
2265 down_write(&dlm_callback_sem);
2266 list_del_init(&cb->ec_item);
2267 up_write(&dlm_callback_sem);
2268 }
2269 EXPORT_SYMBOL_GPL(dlm_unregister_eviction_cb);
2270
2271 static int __init dlm_init(void)
2272 {
2273 int status;
2274
2275 dlm_print_version();
2276
2277 status = dlm_init_mle_cache();
2278 if (status) {
2279 mlog(ML_ERROR, "Could not create o2dlm_mle slabcache\n");
2280 goto error;
2281 }
2282
2283 status = dlm_init_master_caches();
2284 if (status) {
2285 mlog(ML_ERROR, "Could not create o2dlm_lockres and "
2286 "o2dlm_lockname slabcaches\n");
2287 goto error;
2288 }
2289
2290 status = dlm_init_lock_cache();
2291 if (status) {
2292 mlog(ML_ERROR, "Count not create o2dlm_lock slabcache\n");
2293 goto error;
2294 }
2295
2296 status = dlm_register_net_handlers();
2297 if (status) {
2298 mlog(ML_ERROR, "Unable to register network handlers\n");
2299 goto error;
2300 }
2301
2302 status = dlm_create_debugfs_root();
2303 if (status)
2304 goto error;
2305
2306 return 0;
2307 error:
2308 dlm_unregister_net_handlers();
2309 dlm_destroy_lock_cache();
2310 dlm_destroy_master_caches();
2311 dlm_destroy_mle_cache();
2312 return -1;
2313 }
2314
2315 static void __exit dlm_exit (void)
2316 {
2317 dlm_destroy_debugfs_root();
2318 dlm_unregister_net_handlers();
2319 dlm_destroy_lock_cache();
2320 dlm_destroy_master_caches();
2321 dlm_destroy_mle_cache();
2322 }
2323
2324 MODULE_AUTHOR("Oracle");
2325 MODULE_LICENSE("GPL");
2326
2327 module_init(dlm_init);
2328 module_exit(dlm_exit);
This page took 0.078115 seconds and 5 git commands to generate.