/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * Implementation of cl_lock for LOV layer.
 *
 *   Author: Nikita Danilov <nikita.danilov@sun.com>
 */
#define DEBUG_SUBSYSTEM S_LOV

#include "lov_cl_internal.h"
static struct cl_lock_closure *lov_closure_get(const struct lu_env *env,
                                               struct cl_lock *parent);

static int lov_lock_unuse(const struct lu_env *env,
                          const struct cl_lock_slice *slice);
/*****************************************************************************
 *
 * Lov lock operations.
 *
 */
static struct lov_sublock_env *lov_sublock_env_get(const struct lu_env *env,
                                                   struct cl_lock *parent,
                                                   struct lov_lock_sub *lls)
{
        struct lov_sublock_env *subenv;
        struct lov_io          *lio = lov_env_io(env);
        struct cl_io           *io  = lio->lis_cl.cis_io;
        struct lov_io_sub      *sub;

        subenv = &lov_env_session(env)->ls_subenv;

        /*
         * FIXME: We tend to use the subio's env & io to call the sublock
         * lock operations because osc lock sometimes stores some control
         * variables in thread's IO information (now only lockless
         * information). However, if the lock's host (object) is different
         * from the object for current IO, we have no way to get the subenv
         * and subio because they are not initialized at all. As a temp fix,
         * in this case, we still borrow the parent's env to call sublock
         * operations.
         */
        if (!io || !cl_object_same(io->ci_obj, parent->cll_descr.cld_obj)) {
                subenv->lse_env = env;
                subenv->lse_io  = io;
                subenv->lse_sub = NULL;
        } else {
                sub = lov_sub_get(env, lio, lls->sub_stripe);
                if (!IS_ERR(sub)) {
                        subenv->lse_env = sub->sub_env;
                        subenv->lse_io  = sub->sub_io;
                        subenv->lse_sub = sub;
                } else {
                        subenv = (void *)sub;
                }
        }
        return subenv;
}
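/*
 * Note on the pair lov_sublock_env_get()/lov_sublock_env_put(): the get
 * either hands back the sub-IO's own (env, io) with lse_sub pointing at the
 * lov_io_sub reference it took, or (when the current IO targets a different
 * object, see the FIXME above) falls back to the caller's env with
 * lse_sub == NULL; the put only drops the lov_io_sub reference in the former
 * case.
 */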
static void lov_sublock_env_put(struct lov_sublock_env *subenv)
{
        if (subenv && subenv->lse_sub)
                lov_sub_put(subenv->lse_sub);
}
static void lov_sublock_adopt(const struct lu_env *env, struct lov_lock *lck,
                              struct cl_lock *sublock, int idx,
                              struct lov_lock_link *link)
{
        struct lovsub_lock *lsl;
        struct cl_lock     *parent = lck->lls_cl.cls_lock;
        int                 rc;

        LASSERT(cl_lock_is_mutexed(parent));
        LASSERT(cl_lock_is_mutexed(sublock));

        lsl = cl2sub_lock(sublock);
        /*
         * check that sub-lock doesn't have lock link to this top-lock.
         */
        LASSERT(lov_lock_link_find(env, lck, lsl) == NULL);
        LASSERT(idx < lck->lls_nr);

        lck->lls_sub[idx].sub_lock = lsl;
        lck->lls_nr_filled++;
        LASSERT(lck->lls_nr_filled <= lck->lls_nr);
        list_add_tail(&link->lll_list, &lsl->lss_parents);
        link->lll_idx = idx;
        link->lll_super = lck;
        cl_lock_get(parent);
        lu_ref_add(&parent->cll_reference, "lov-child", sublock);
        lck->lls_sub[idx].sub_flags |= LSF_HELD;
        cl_lock_user_add(env, sublock);

        rc = lov_sublock_modify(env, lck, lsl, &sublock->cll_descr, idx);
        LASSERT(rc == 0); /* there is no way this can fail, currently */
}
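/*
 * The bookkeeping done by lov_sublock_adopt() (link on lss_parents, reference
 * and "lov-child" lu_ref on the parent top-lock) is undone symmetrically by
 * lov_lock_unlink() when the sub-lock is detached from its top-lock.
 */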
static struct cl_lock *lov_sublock_alloc(const struct lu_env *env,
                                         const struct cl_io *io,
                                         struct lov_lock *lck,
                                         int idx, struct lov_lock_link **out)
{
        struct cl_lock       *sublock;
        struct cl_lock       *parent;
        struct lov_lock_link *link;

        LASSERT(idx < lck->lls_nr);

        link = kmem_cache_alloc(lov_lock_link_kmem, GFP_NOFS | __GFP_ZERO);
        if (link != NULL) {
                struct lov_sublock_env *subenv;
                struct lov_lock_sub    *lls;
                struct cl_lock_descr   *descr;

                parent = lck->lls_cl.cls_lock;
                lls    = &lck->lls_sub[idx];
                descr  = &lls->sub_got;

                subenv = lov_sublock_env_get(env, parent, lls);
                if (!IS_ERR(subenv)) {
                        /* CAVEAT: Don't try to add a field in lov_lock_sub
                         * to remember the subio. This is because lock is able
                         * to be cached, but this is not true for IO. This
                         * further means a sublock might be referenced in
                         * different io context. -jay */
                        sublock = cl_lock_hold(subenv->lse_env, subenv->lse_io,
                                               descr, "lov-parent", parent);
                        lov_sublock_env_put(subenv);
                } else {
                        sublock = (void *)subenv;
                }

                if (!IS_ERR(sublock))
                        *out = link;
                else
                        kmem_cache_free(lov_lock_link_kmem, link);
        } else
                sublock = ERR_PTR(-ENOMEM);
        return sublock;
}
static void lov_sublock_unlock(const struct lu_env *env,
                               struct lovsub_lock *lsl,
                               struct cl_lock_closure *closure,
                               struct lov_sublock_env *subenv)
{
        lov_sublock_env_put(subenv);
        lsl->lss_active = NULL;
        cl_lock_disclosure(env, closure);
}
static int lov_sublock_lock(const struct lu_env *env,
                            struct lov_lock *lck,
                            struct lov_lock_sub *lls,
                            struct cl_lock_closure *closure,
                            struct lov_sublock_env **lsep)
{
        struct lovsub_lock *sublock;
        struct cl_lock     *child;
        int                 result = 0;

        LASSERT(list_empty(&closure->clc_list));

        sublock = lls->sub_lock;
        child = sublock->lss_cl.cls_lock;
        result = cl_lock_closure_build(env, child, closure);
        if (result == 0) {
                struct cl_lock *parent = closure->clc_origin;

                LASSERT(cl_lock_is_mutexed(child));
                sublock->lss_active = parent;

                if (unlikely((child->cll_state == CLS_FREEING) ||
                             (child->cll_flags & CLF_CANCELLED))) {
                        struct lov_lock_link *link;
                        /*
                         * we could race with lock deletion which temporarily
                         * put the lock in freeing state, bug 19080.
                         */
                        LASSERT(!(lls->sub_flags & LSF_HELD));

                        link = lov_lock_link_find(env, lck, sublock);
                        LASSERT(link != NULL);
                        lov_lock_unlink(env, link, sublock);
                        lov_sublock_unlock(env, sublock, closure, NULL);
                        lck->lls_cancel_race = 1;
                        result = CLO_REPEAT;
                } else if (lsep) {
                        struct lov_sublock_env *subenv;

                        subenv = lov_sublock_env_get(env, parent, lls);
                        if (IS_ERR(subenv)) {
                                lov_sublock_unlock(env, sublock,
                                                   closure, NULL);
                                result = PTR_ERR(subenv);
                        } else {
                                *lsep = subenv;
                        }
                }
        }
        return result;
}
/**
 * Updates the result of a top-lock operation from a result of sub-lock
 * sub-operations. Top-operations like lov_lock_{enqueue,use,unuse}() iterate
 * over sub-locks and lov_subresult() is used to calculate return value of a
 * top-operation. To this end, possible return values of sub-operations are
 * ordered as
 *
 *     - 0            success
 *     - CLO_WAIT     wait for event
 *     - CLO_REPEAT   repeat top-operation
 *     - -ne          fundamental error
 *
 * Top-level return code can only go down through this list. CLO_REPEAT
 * overwrites CLO_WAIT, because lock mutex was released and sleeping condition
 * has to be rechecked by the upper layer.
 */
static int lov_subresult(int result, int rc)
{
        int result_rank;
        int rc_rank;

        LASSERTF(result <= 0 || result == CLO_REPEAT || result == CLO_WAIT,
                 "result = %d", result);
        LASSERTF(rc <= 0 || rc == CLO_REPEAT || rc == CLO_WAIT,
                 "rc = %d\n", rc);
        CLASSERT(CLO_WAIT < CLO_REPEAT);

        /* calculate ranks in the ordering above */
        result_rank = result < 0 ? 1 + CLO_REPEAT : result;
        rc_rank = rc < 0 ? 1 + CLO_REPEAT : rc;

        if (result_rank < rc_rank)
                result = rc;
        return result;
}
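/*
 * For example, folding a CLO_WAIT already accumulated in "result" with a
 * CLO_REPEAT returned by a later sub-lock yields CLO_REPEAT, while folding
 * either of them with a fundamental error such as -ENOMEM yields the error:
 * negative codes rank above both special return values in the ordering above.
 */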
/**
 * Creates sub-locks for a given lov_lock for the first time.
 *
 * Goes through all sub-objects of top-object, and creates sub-locks on every
 * sub-object intersecting with top-lock extent. This is complicated by the
 * fact that top-lock (that is being created) can be accessed concurrently
 * through already created sub-locks (possibly shared with other top-locks).
 */
static int lov_lock_sub_init(const struct lu_env *env,
                             struct lov_lock *lck, const struct cl_io *io)
{
        int result = 0;
        int i;
        int nr;
        u64 start;
        u64 end;
        u64 file_start;
        u64 file_end;

        struct lov_object       *loo    = cl2lov(lck->lls_cl.cls_obj);
        struct lov_layout_raid0 *r0     = lov_r0(loo);
        struct cl_lock          *parent = lck->lls_cl.cls_lock;

        lck->lls_orig = parent->cll_descr;
        file_start = cl_offset(lov2cl(loo), parent->cll_descr.cld_start);
        file_end   = cl_offset(lov2cl(loo), parent->cll_descr.cld_end + 1) - 1;

        for (i = 0, nr = 0; i < r0->lo_nr; i++) {
                /*
                 * XXX for wide striping smarter algorithm is desirable,
                 * breaking out of the loop, early.
                 */
                if (likely(r0->lo_sub[i] != NULL) &&
                    lov_stripe_intersects(loo->lo_lsm, i,
                                          file_start, file_end, &start, &end))
                        nr++;
        }
        LASSERT(nr > 0);
        lck->lls_sub = libcfs_kvzalloc(nr * sizeof(lck->lls_sub[0]), GFP_NOFS);
        if (lck->lls_sub == NULL)
                return -ENOMEM;

        lck->lls_nr = nr;
        /*
         * First, fill in sub-lock descriptions in
         * lck->lls_sub[].sub_descr. They are used by lov_sublock_alloc()
         * (called below in this function, and by lov_lock_enqueue()) to
         * create sub-locks. At this moment, no other thread can access
         * top-lock.
         */
        for (i = 0, nr = 0; i < r0->lo_nr; ++i) {
                if (likely(r0->lo_sub[i] != NULL) &&
                    lov_stripe_intersects(loo->lo_lsm, i,
                                          file_start, file_end, &start, &end)) {
                        struct cl_lock_descr *descr;

                        descr = &lck->lls_sub[nr].sub_descr;

                        LASSERT(descr->cld_obj == NULL);
                        descr->cld_obj   = lovsub2cl(r0->lo_sub[i]);
                        descr->cld_start = cl_index(descr->cld_obj, start);
                        descr->cld_end   = cl_index(descr->cld_obj, end);
                        descr->cld_mode  = parent->cll_descr.cld_mode;
                        descr->cld_gid   = parent->cll_descr.cld_gid;
                        descr->cld_enq_flags = parent->cll_descr.cld_enq_flags;
                        /* XXX has no effect */
                        lck->lls_sub[nr].sub_got = *descr;
                        lck->lls_sub[nr].sub_stripe = i;
                        nr++;
                }
        }
        LASSERT(nr == lck->lls_nr);

        /*
         * Some sub-locks can be missing at this point. This is not a problem,
         * because enqueue will create them anyway. Main duty of this function
         * is to fill in sub-lock descriptions in a race free manner.
         */
        return result;
}
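/*
 * lov_lock_sub_init() is intentionally a two-pass walk over the RAID0
 * stripes: the first pass only counts stripes intersecting the top-lock
 * extent, so that lls_sub[] can be allocated with the exact size, and the
 * second pass fills in the per-stripe cl_lock_descr for each of them.
 */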
static int lov_sublock_release(const struct lu_env *env, struct lov_lock *lck,
                               int i, int deluser, int rc)
{
        struct cl_lock *parent = lck->lls_cl.cls_lock;

        LASSERT(cl_lock_is_mutexed(parent));

        if (lck->lls_sub[i].sub_flags & LSF_HELD) {
                struct cl_lock *sublock;
                int dying;

                LASSERT(lck->lls_sub[i].sub_lock != NULL);
                sublock = lck->lls_sub[i].sub_lock->lss_cl.cls_lock;
                LASSERT(cl_lock_is_mutexed(sublock));

                lck->lls_sub[i].sub_flags &= ~LSF_HELD;
                if (deluser)
                        cl_lock_user_del(env, sublock);
                /*
                 * If the last hold is released, and cancellation is pending
                 * for a sub-lock, release parent mutex, to avoid keeping it
                 * while sub-lock is being paged out.
                 */
                dying = (sublock->cll_descr.cld_mode == CLM_PHANTOM ||
                         sublock->cll_descr.cld_mode == CLM_GROUP ||
                         (sublock->cll_flags & (CLF_CANCELPEND|CLF_DOOMED))) &&
                        sublock->cll_holds == 1;
                if (dying)
                        cl_lock_mutex_put(env, parent);
                cl_lock_unhold(env, sublock, "lov-parent", parent);
                if (dying) {
                        cl_lock_mutex_get(env, parent);
                        rc = lov_subresult(rc, CLO_REPEAT);
                }
                /*
                 * From now on lck->lls_sub[i].sub_lock is a "weak" pointer,
                 * not backed by a reference on a
                 * sub-lock. lovsub_lock_delete() will clear
                 * lck->lls_sub[i].sub_lock under semaphores, just before
                 * sub-lock is destroyed.
                 */
        }
        return rc;
}
static void lov_sublock_hold(const struct lu_env *env, struct lov_lock *lck,
                             int i)
{
        struct cl_lock *parent = lck->lls_cl.cls_lock;

        LASSERT(cl_lock_is_mutexed(parent));

        if (!(lck->lls_sub[i].sub_flags & LSF_HELD)) {
                struct cl_lock *sublock;

                LASSERT(lck->lls_sub[i].sub_lock != NULL);
                sublock = lck->lls_sub[i].sub_lock->lss_cl.cls_lock;
                LASSERT(cl_lock_is_mutexed(sublock));
                LASSERT(sublock->cll_state != CLS_FREEING);

                lck->lls_sub[i].sub_flags |= LSF_HELD;

                cl_lock_get_trust(sublock);
                cl_lock_hold_add(env, sublock, "lov-parent", parent);
                cl_lock_user_add(env, sublock);
                cl_lock_put(env, sublock);
        }
}
static void lov_lock_fini(const struct lu_env *env,
                          struct cl_lock_slice *slice)
{
        struct lov_lock *lck;
        int i;

        lck = cl2lov_lock(slice);
        LASSERT(lck->lls_nr_filled == 0);
        if (lck->lls_sub != NULL) {
                for (i = 0; i < lck->lls_nr; ++i)
                        /*
                         * No sub-locks exist at this point, as sub-lock has
                         * a reference on its parent.
                         */
                        LASSERT(lck->lls_sub[i].sub_lock == NULL);
                kvfree(lck->lls_sub);
        }
        kmem_cache_free(lov_lock_kmem, lck);
}
static int lov_lock_enqueue_wait(const struct lu_env *env,
                                 struct lov_lock *lck,
                                 struct cl_lock *sublock)
{
        struct cl_lock *lock = lck->lls_cl.cls_lock;
        int result;

        LASSERT(cl_lock_is_mutexed(lock));

        cl_lock_mutex_put(env, lock);
        result = cl_lock_enqueue_wait(env, sublock, 0);
        cl_lock_mutex_get(env, lock);
        return result ?: CLO_REPEAT;
}
/**
 * Tries to advance a state machine of a given sub-lock toward enqueuing of
 * the top-lock.
 *
 * \retval 0 if state-transition can proceed
 *
 * \retval -ve otherwise.
 */
static int lov_lock_enqueue_one(const struct lu_env *env, struct lov_lock *lck,
                                struct cl_lock *sublock,
                                struct cl_io *io, __u32 enqflags, int last)
{
        int result;

        /* first, try to enqueue a sub-lock ... */
        result = cl_enqueue_try(env, sublock, io, enqflags);
        if ((sublock->cll_state == CLS_ENQUEUED) && !(enqflags & CEF_AGL)) {
                /* if it is enqueued, try to `wait' on it---maybe it's already
                 * granted */
                result = cl_wait_try(env, sublock);
                if (result == CLO_REENQUEUED)
                        result = CLO_WAIT;
        }
        /*
         * If CEF_ASYNC flag is set, then all sub-locks can be enqueued in
         * parallel, otherwise---enqueue has to wait until sub-lock is granted
         * before proceeding to the next one.
         */
        if ((result == CLO_WAIT) && (sublock->cll_state <= CLS_HELD) &&
            (enqflags & CEF_ASYNC) && (!last || (enqflags & CEF_AGL)))
                result = 0;
        return result;
}
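/*
 * The value returned above is either 0 (the sub-lock was granted, or an
 * asynchronous enqueue may move on to the next stripe), one of the CLO_*
 * codes asking the caller to retry, or a negative error; lov_lock_enqueue()
 * folds it into the top-lock result via lov_subresult().
 */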
/**
 * Helper function for lov_lock_enqueue() that creates missing sub-lock.
 */
static int lov_sublock_fill(const struct lu_env *env, struct cl_lock *parent,
                            struct cl_io *io, struct lov_lock *lck, int idx)
{
        struct lov_lock_link *link = NULL;
        struct cl_lock       *sublock;
        int                   result;

        LASSERT(parent->cll_depth == 1);
        cl_lock_mutex_put(env, parent);
        sublock = lov_sublock_alloc(env, io, lck, idx, &link);
        if (!IS_ERR(sublock))
                cl_lock_mutex_get(env, sublock);
        cl_lock_mutex_get(env, parent);

        if (!IS_ERR(sublock)) {
                cl_lock_get_trust(sublock);
                if (parent->cll_state == CLS_QUEUING &&
                    lck->lls_sub[idx].sub_lock == NULL) {
                        lov_sublock_adopt(env, lck, sublock, idx, link);
                } else {
                        kmem_cache_free(lov_lock_link_kmem, link);
                        /* other thread allocated sub-lock, or enqueue is no
                         * longer going on */
                        cl_lock_mutex_put(env, parent);
                        cl_lock_unhold(env, sublock, "lov-parent", parent);
                        cl_lock_mutex_get(env, parent);
                }
                cl_lock_mutex_put(env, sublock);
                cl_lock_put(env, sublock);
                result = CLO_REPEAT;
        } else
                result = PTR_ERR(sublock);
        return result;
}
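/*
 * Note that lov_sublock_fill() drops and re-takes the parent mutex around the
 * allocation, so on success it deliberately returns CLO_REPEAT: the caller in
 * lov_lock_enqueue() must restart its scan of the sub-lock array.
 */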
/**
 * Implementation of cl_lock_operations::clo_enqueue() for lov layer. This
 * function is rather subtle, as it enqueues top-lock (i.e., advances top-lock
 * state machine from CLS_QUEUING to CLS_ENQUEUED states) by juggling sub-lock
 * state machines in the face of sub-locks sharing (by multiple top-locks),
 * and concurrent sub-lock cancellations.
 */
static int lov_lock_enqueue(const struct lu_env *env,
                            const struct cl_lock_slice *slice,
                            struct cl_io *io, __u32 enqflags)
{
        struct cl_lock         *lock    = slice->cls_lock;
        struct lov_lock        *lck     = cl2lov_lock(slice);
        struct cl_lock_closure *closure = lov_closure_get(env, lock);
        int i;
        int result;
        enum cl_lock_state minstate;

        for (result = 0, minstate = CLS_FREEING, i = 0; i < lck->lls_nr; ++i) {
                int rc;
                struct lovsub_lock     *sub;
                struct lov_lock_sub    *lls;
                struct cl_lock         *sublock;
                struct lov_sublock_env *subenv;

                if (lock->cll_state != CLS_QUEUING) {
                        /*
                         * Lock might have left QUEUING state if previous
                         * iteration released its mutex. Stop enqueuing in
                         * this case and let the upper layer decide what to
                         * do.
                         */
                        LASSERT(i > 0 && result != 0);
                        break;
                }

                lls = &lck->lls_sub[i];
                sub = lls->sub_lock;
                /*
                 * Sub-lock might have been canceled, while top-lock was
                 * cached.
                 */
                if (sub == NULL) {
                        result = lov_sublock_fill(env, lock, io, lck, i);
                        /* lov_sublock_fill() released @lock mutex,
                         * restart. */
                        break;
                }
                sublock = sub->lss_cl.cls_lock;
                rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
                if (rc == 0) {
                        lov_sublock_hold(env, lck, i);
                        rc = lov_lock_enqueue_one(subenv->lse_env, lck,
                                                  sublock, subenv->lse_io,
                                                  enqflags,
                                                  i == lck->lls_nr - 1);
                        minstate = min(minstate, sublock->cll_state);
                        if (rc == CLO_WAIT) {
                                switch (sublock->cll_state) {
                                case CLS_QUEUING:
                                        /* take recursive mutex, the lock is
                                         * released in lov_lock_enqueue_wait.
                                         */
                                        cl_lock_mutex_get(env, sublock);
                                        lov_sublock_unlock(env, sub, closure,
                                                           subenv);
                                        rc = lov_lock_enqueue_wait(env, lck,
                                                                   sublock);
                                        break;
                                case CLS_CACHED:
                                        cl_lock_get(sublock);
                                        /* take recursive mutex of sublock */
                                        cl_lock_mutex_get(env, sublock);
                                        /* need to release all locks in closure
                                         * otherwise it may deadlock. LU-2683.*/
                                        lov_sublock_unlock(env, sub, closure,
                                                           subenv);
                                        /* sublock and parent are held. */
                                        rc = lov_sublock_release(env, lck, i,
                                                                 1, rc);
                                        cl_lock_mutex_put(env, sublock);
                                        cl_lock_put(env, sublock);
                                        break;
                                default:
                                        lov_sublock_unlock(env, sub, closure,
                                                           subenv);
                                        break;
                                }
                        } else {
                                LASSERT(sublock->cll_conflict == NULL);
                                lov_sublock_unlock(env, sub, closure, subenv);
                        }
                }
                result = lov_subresult(result, rc);
                if (result != 0)
                        break;
        }
        cl_lock_closure_fini(closure);
        return result ?: minstate >= CLS_ENQUEUED ? 0 : CLO_WAIT;
}
static int lov_lock_unuse(const struct lu_env *env,
                          const struct cl_lock_slice *slice)
{
        struct lov_lock        *lck     = cl2lov_lock(slice);
        struct cl_lock_closure *closure = lov_closure_get(env, slice->cls_lock);
        int i;
        int result;

        for (result = 0, i = 0; i < lck->lls_nr; ++i) {
                int rc;
                struct lovsub_lock     *sub;
                struct cl_lock         *sublock;
                struct lov_lock_sub    *lls;
                struct lov_sublock_env *subenv;

                /* top-lock state cannot change concurrently, because single
                 * thread (one that released the last hold) carries unlocking
                 * to the completion. */
                LASSERT(slice->cls_lock->cll_state == CLS_INTRANSIT);
                lls = &lck->lls_sub[i];
                sub = lls->sub_lock;
                if (sub == NULL)
                        continue;

                sublock = sub->lss_cl.cls_lock;
                rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
                if (rc == 0) {
                        if (lls->sub_flags & LSF_HELD) {
                                LASSERT(sublock->cll_state == CLS_HELD ||
                                        sublock->cll_state == CLS_ENQUEUED);
                                rc = cl_unuse_try(subenv->lse_env, sublock);
                                rc = lov_sublock_release(env, lck, i, 0, rc);
                        }
                        lov_sublock_unlock(env, sub, closure, subenv);
                }
                result = lov_subresult(result, rc);
        }

        if (result == 0 && lck->lls_cancel_race) {
                lck->lls_cancel_race = 0;
                result = -ESTALE;
        }
        cl_lock_closure_fini(closure);
        return result;
}
static void lov_lock_cancel(const struct lu_env *env,
                            const struct cl_lock_slice *slice)
{
        struct lov_lock        *lck     = cl2lov_lock(slice);
        struct cl_lock_closure *closure = lov_closure_get(env, slice->cls_lock);
        int i;
        int result;

        for (result = 0, i = 0; i < lck->lls_nr; ++i) {
                int rc;
                struct lovsub_lock     *sub;
                struct cl_lock         *sublock;
                struct lov_lock_sub    *lls;
                struct lov_sublock_env *subenv;

                /* top-lock state cannot change concurrently, because single
                 * thread (one that released the last hold) carries unlocking
                 * to the completion. */
                lls = &lck->lls_sub[i];
                sub = lls->sub_lock;
                if (sub == NULL)
                        continue;

                sublock = sub->lss_cl.cls_lock;
                rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
                if (rc == 0) {
                        if (!(lls->sub_flags & LSF_HELD)) {
                                lov_sublock_unlock(env, sub, closure, subenv);
                                continue;
                        }

                        switch (sublock->cll_state) {
                        case CLS_HELD:
                                rc = cl_unuse_try(subenv->lse_env, sublock);
                                lov_sublock_release(env, lck, i, 0, 0);
                                break;
                        default:
                                lov_sublock_release(env, lck, i, 1, 0);
                                break;
                        }
                        lov_sublock_unlock(env, sub, closure, subenv);
                }

                if (rc == CLO_REPEAT) {
                        --i;
                        continue;
                }

                result = lov_subresult(result, rc);
        }

        if (result)
                CL_LOCK_DEBUG(D_ERROR, env, slice->cls_lock,
                              "lov_lock_cancel fails with %d.\n", result);

        cl_lock_closure_fini(closure);
}
static int lov_lock_wait(const struct lu_env *env,
                         const struct cl_lock_slice *slice)
{
        struct lov_lock        *lck     = cl2lov_lock(slice);
        struct cl_lock_closure *closure = lov_closure_get(env, slice->cls_lock);
        enum cl_lock_state      minstate;
        int                     reenqueued;
        int                     result;
        int                     i;

again:
        for (result = 0, minstate = CLS_FREEING, i = 0, reenqueued = 0;
             i < lck->lls_nr; ++i) {
                int rc;
                struct lovsub_lock     *sub;
                struct cl_lock         *sublock;
                struct lov_lock_sub    *lls;
                struct lov_sublock_env *subenv;

                lls = &lck->lls_sub[i];
                sub = lls->sub_lock;
                LASSERT(sub != NULL);
                sublock = sub->lss_cl.cls_lock;
                rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
                if (rc == 0) {
                        LASSERT(sublock->cll_state >= CLS_ENQUEUED);
                        if (sublock->cll_state < CLS_HELD)
                                rc = cl_wait_try(env, sublock);

                        minstate = min(minstate, sublock->cll_state);
                        lov_sublock_unlock(env, sub, closure, subenv);
                }
                if (rc == CLO_REENQUEUED) {
                        reenqueued++;
                        rc = 0;
                }
                result = lov_subresult(result, rc);
                if (result != 0)
                        break;
        }
        /* Each sublock can only be reenqueued once, so this will not loop
         * forever. */
        if (result == 0 && reenqueued != 0)
                goto again;
        cl_lock_closure_fini(closure);
        return result ?: minstate >= CLS_HELD ? 0 : CLO_WAIT;
}
static int lov_lock_use(const struct lu_env *env,
                        const struct cl_lock_slice *slice)
{
        struct lov_lock        *lck     = cl2lov_lock(slice);
        struct cl_lock_closure *closure = lov_closure_get(env, slice->cls_lock);
        int result;
        int i;

        LASSERT(slice->cls_lock->cll_state == CLS_INTRANSIT);

        for (result = 0, i = 0; i < lck->lls_nr; ++i) {
                int rc;
                struct lovsub_lock     *sub;
                struct cl_lock         *sublock;
                struct lov_lock_sub    *lls;
                struct lov_sublock_env *subenv;

                LASSERT(slice->cls_lock->cll_state == CLS_INTRANSIT);

                lls = &lck->lls_sub[i];
                sub = lls->sub_lock;
                if (sub == NULL) {
                        /*
                         * Sub-lock might have been canceled, while top-lock
                         * was cached.
                         */
                        result = -ESTALE;
                        continue;
                }

                sublock = sub->lss_cl.cls_lock;
                rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
                if (rc == 0) {
                        LASSERT(sublock->cll_state != CLS_FREEING);
                        lov_sublock_hold(env, lck, i);
                        if (sublock->cll_state == CLS_CACHED) {
                                rc = cl_use_try(subenv->lse_env, sublock, 0);
                                if (rc != 0)
                                        rc = lov_sublock_release(env, lck,
                                                                 i, 1, rc);
                        } else if (sublock->cll_state == CLS_NEW) {
                                /* Sub-lock might have been canceled, while
                                 * top-lock was cached. */
                                result = -ESTALE;
                                lov_sublock_release(env, lck, i, 1, result);
                        }
                        lov_sublock_unlock(env, sub, closure, subenv);
                }
                result = lov_subresult(result, rc);
                if (result != 0)
                        break;
        }

        if (lck->lls_cancel_race) {
                /*
                 * If unlocking happened at the same time, then sublock_lock
                 * state should be FREEING, and lov_sublock_lock should return
                 * CLO_REPEAT. In this case, it should return ESTALE, and up
                 * layer should reset the lock state to be NEW.
                 */
                lck->lls_cancel_race = 0;
                LASSERT(result != 0);
                result = -ESTALE;
        }
        cl_lock_closure_fini(closure);
        return result;
}
#if 0
static int lock_lock_multi_match()
{
        struct cl_lock          *lock    = slice->cls_lock;
        struct cl_lock_descr    *subneed = &lov_env_info(env)->lti_ldescr;
        struct lov_object       *loo     = cl2lov(lov->lls_cl.cls_obj);
        struct lov_layout_raid0 *r0      = lov_r0(loo);
        struct lov_lock_sub     *sub;
        struct cl_object        *subobj;
        u64 fstart;
        u64 fend;
        u64 start;
        u64 end;
        int i;

        fstart = cl_offset(need->cld_obj, need->cld_start);
        fend   = cl_offset(need->cld_obj, need->cld_end + 1) - 1;
        subneed->cld_mode = need->cld_mode;
        cl_lock_mutex_get(env, lock);
        for (i = 0; i < lov->lls_nr; ++i) {
                sub = &lov->lls_sub[i];
                if (sub->sub_lock == NULL)
                        continue;
                subobj = sub->sub_descr.cld_obj;
                if (!lov_stripe_intersects(loo->lo_lsm, sub->sub_stripe,
                                           fstart, fend, &start, &end))
                        continue;
                subneed->cld_start = cl_index(subobj, start);
                subneed->cld_end   = cl_index(subobj, end);
                subneed->cld_obj   = subobj;
                if (!cl_lock_ext_match(&sub->sub_got, subneed)) {
                        result = 0;
                        break;
                }
        }
        cl_lock_mutex_put(env, lock);
}
#endif
/**
 * Check if the extent region \a descr is covered by \a child against the
 * specific \a stripe.
 */
static int lov_lock_stripe_is_matching(const struct lu_env *env,
                                       struct lov_object *lov, int stripe,
                                       const struct cl_lock_descr *child,
                                       const struct cl_lock_descr *descr)
{
        struct lov_stripe_md *lsm = lov->lo_lsm;
        u64 start;
        u64 end;
        int result;

        if (lov_r0(lov)->lo_nr == 1)
                return cl_lock_ext_match(child, descr);

        /*
         * For a multi-stripe object:
         *   - make sure the descr only covers child's stripe, and
         *   - check if extent is matching.
         */
        start = cl_offset(&lov->lo_cl, descr->cld_start);
        end   = cl_offset(&lov->lo_cl, descr->cld_end + 1) - 1;
        result = 0;
        /* glimpse should work on the object with LOV EA hole. */
        if (end - start <= lsm->lsm_stripe_size) {
                int idx;

                idx = lov_stripe_number(lsm, start);
                if (idx == stripe ||
                    unlikely(lov_r0(lov)->lo_sub[idx] == NULL)) {
                        idx = lov_stripe_number(lsm, end);
                        if (idx == stripe ||
                            unlikely(lov_r0(lov)->lo_sub[idx] == NULL))
                                result = 1;
                }
        }

        if (result != 0) {
                struct cl_lock_descr *subd = &lov_env_info(env)->lti_ldescr;
                u64 sub_start;
                u64 sub_end;

                subd->cld_obj  = NULL;   /* don't need sub object at all */
                subd->cld_mode = descr->cld_mode;
                subd->cld_gid  = descr->cld_gid;
                result = lov_stripe_intersects(lsm, stripe, start, end,
                                               &sub_start, &sub_end);
                LASSERT(result);
                subd->cld_start = cl_index(child->cld_obj, sub_start);
                subd->cld_end   = cl_index(child->cld_obj, sub_end);
                result = cl_lock_ext_match(child, subd);
        }
        return result;
}
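/*
 * Put differently: for a single-stripe object the check above degenerates to
 * a plain cl_lock_ext_match(); for a striped object the \a descr extent must
 * map entirely onto the given stripe (LOV EA holes are tolerated, so that
 * glimpse keeps working) and its intersection with that stripe must be
 * covered by \a child.
 */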
/**
 * An implementation of cl_lock_operations::clo_fits_into() method.
 *
 * Checks whether a lock (given by \a slice) is suitable for \a
 * io. Multi-stripe locks can be used only for "quick" io, like truncate, or
 * O_APPEND write.
 *
 * \see ccc_lock_fits_into().
 */
static int lov_lock_fits_into(const struct lu_env *env,
                              const struct cl_lock_slice *slice,
                              const struct cl_lock_descr *need,
                              const struct cl_io *io)
{
        struct lov_lock   *lov = cl2lov_lock(slice);
        struct lov_object *obj = cl2lov(slice->cls_obj);
        int result;

        LASSERT(cl_object_same(need->cld_obj, slice->cls_obj));
        LASSERT(lov->lls_nr > 0);

        /* for top lock, it's necessary to match enq flags otherwise it will
         * run into problem if a sublock is missing and reenqueue. */
        if (need->cld_enq_flags != lov->lls_orig.cld_enq_flags)
                return 0;

        if (need->cld_mode == CLM_GROUP)
                /*
                 * always allow to match group lock.
                 */
                result = cl_lock_ext_match(&lov->lls_orig, need);
        else if (lov->lls_nr == 1) {
                struct cl_lock_descr *got = &lov->lls_sub[0].sub_got;

                result = lov_lock_stripe_is_matching(env,
                                                     cl2lov(slice->cls_obj),
                                                     lov->lls_sub[0].sub_stripe,
                                                     got, need);
        } else if (io->ci_type != CIT_SETATTR && io->ci_type != CIT_MISC &&
                   !cl_io_is_append(io) && need->cld_mode != CLM_PHANTOM)
                /*
                 * Multi-stripe locks are only suitable for `quick' IO and for
                 * glimpse.
                 */
                result = 0;
        else
                /*
                 * Most general case: multi-stripe existing lock, and
                 * (potentially) multi-stripe @need lock. Check that @need is
                 * covered by @lov's sub-locks.
                 *
                 * For now, ignore lock expansions made by the server, and
                 * match against original lock extent.
                 */
                result = cl_lock_ext_match(&lov->lls_orig, need);
        CDEBUG(D_DLMTRACE, DDESCR"/"DDESCR" %d %d/%d: %d\n",
               PDESCR(&lov->lls_orig), PDESCR(&lov->lls_sub[0].sub_got),
               lov->lls_sub[0].sub_stripe, lov->lls_nr, lov_r0(obj)->lo_nr,
               result);
        return result;
}
void lov_lock_unlink(const struct lu_env *env,
                     struct lov_lock_link *link, struct lovsub_lock *sub)
{
        struct lov_lock *lck    = link->lll_super;
        struct cl_lock  *parent = lck->lls_cl.cls_lock;

        LASSERT(cl_lock_is_mutexed(parent));
        LASSERT(cl_lock_is_mutexed(sub->lss_cl.cls_lock));

        list_del_init(&link->lll_list);
        LASSERT(lck->lls_sub[link->lll_idx].sub_lock == sub);
        /* yank this sub-lock from parent's array */
        lck->lls_sub[link->lll_idx].sub_lock = NULL;
        LASSERT(lck->lls_nr_filled > 0);
        lck->lls_nr_filled--;
        lu_ref_del(&parent->cll_reference, "lov-child", sub->lss_cl.cls_lock);
        cl_lock_put(env, parent);
        kmem_cache_free(lov_lock_link_kmem, link);
}
struct lov_lock_link *lov_lock_link_find(const struct lu_env *env,
                                         struct lov_lock *lck,
                                         struct lovsub_lock *sub)
{
        struct lov_lock_link *scan;

        LASSERT(cl_lock_is_mutexed(sub->lss_cl.cls_lock));

        list_for_each_entry(scan, &sub->lss_parents, lll_list) {
                if (scan->lll_super == lck)
                        return scan;
        }
        return NULL;
}
/**
 * An implementation of cl_lock_operations::clo_delete() method. This is
 * invoked for "top-to-bottom" delete, when lock destruction starts from the
 * top-lock, e.g., as a result of inode destruction.
 *
 * Unlinks top-lock from all its sub-locks. Sub-locks are not deleted there:
 * this is done separately elsewhere:
 *
 *     - for inode destruction, lov_object_delete() calls cl_object_kill() for
 *       each sub-object, purging its locks;
 *
 *     - in other cases (e.g., a fatal error with a top-lock) sub-locks are
 *       left in the cache.
 */
static void lov_lock_delete(const struct lu_env *env,
                            const struct cl_lock_slice *slice)
{
        struct lov_lock        *lck     = cl2lov_lock(slice);
        struct cl_lock_closure *closure = lov_closure_get(env, slice->cls_lock);
        struct lov_lock_link   *link;
        int rc;
        int i;

        LASSERT(slice->cls_lock->cll_state == CLS_FREEING);

        for (i = 0; i < lck->lls_nr; ++i) {
                struct lov_lock_sub *lls = &lck->lls_sub[i];
                struct lovsub_lock  *lsl = lls->sub_lock;

                if (lsl == NULL) /* already removed */
                        continue;

                rc = lov_sublock_lock(env, lck, lls, closure, NULL);
                if (rc == CLO_REPEAT) {
                        --i;
                        continue;
                }

                LASSERT(rc == 0);
                LASSERT(lsl->lss_cl.cls_lock->cll_state < CLS_FREEING);

                if (lls->sub_flags & LSF_HELD)
                        lov_sublock_release(env, lck, i, 1, 0);

                link = lov_lock_link_find(env, lck, lsl);
                LASSERT(link != NULL);
                lov_lock_unlink(env, link, lsl);
                LASSERT(lck->lls_sub[i].sub_lock == NULL);

                lov_sublock_unlock(env, lsl, closure, NULL);
        }

        cl_lock_closure_fini(closure);
}
static int lov_lock_print(const struct lu_env *env, void *cookie,
                          lu_printer_t p, const struct cl_lock_slice *slice)
{
        struct lov_lock *lck = cl2lov_lock(slice);
        int i;

        (*p)(env, cookie, "%d\n", lck->lls_nr);
        for (i = 0; i < lck->lls_nr; ++i) {
                struct lov_lock_sub *sub;

                sub = &lck->lls_sub[i];
                (*p)(env, cookie, " %d %x: ", i, sub->sub_flags);
                if (sub->sub_lock != NULL)
                        cl_lock_print(env, cookie, p,
                                      sub->sub_lock->lss_cl.cls_lock);
                else
                        (*p)(env, cookie, "---\n");
        }
        return 0;
}
static const struct cl_lock_operations lov_lock_ops = {
        .clo_fini      = lov_lock_fini,
        .clo_enqueue   = lov_lock_enqueue,
        .clo_wait      = lov_lock_wait,
        .clo_use       = lov_lock_use,
        .clo_unuse     = lov_lock_unuse,
        .clo_cancel    = lov_lock_cancel,
        .clo_fits_into = lov_lock_fits_into,
        .clo_delete    = lov_lock_delete,
        .clo_print     = lov_lock_print
};
int lov_lock_init_raid0(const struct lu_env *env, struct cl_object *obj,
                        struct cl_lock *lock, const struct cl_io *io)
{
        struct lov_lock *lck;
        int result;

        lck = kmem_cache_alloc(lov_lock_kmem, GFP_NOFS | __GFP_ZERO);
        if (lck != NULL) {
                cl_lock_slice_add(lock, &lck->lls_cl, obj, &lov_lock_ops);
                result = lov_lock_sub_init(env, lck, io);
        } else
                result = -ENOMEM;
        return result;
}
*env
,
1152 struct cl_lock_slice
*slice
)
1154 struct lov_lock
*lck
= cl2lov_lock(slice
);
1156 kmem_cache_free(lov_lock_kmem
, lck
);
1159 static int lov_empty_lock_print(const struct lu_env
*env
, void *cookie
,
1160 lu_printer_t p
, const struct cl_lock_slice
*slice
)
1162 (*p
)(env
, cookie
, "empty\n");
/* XXX: more methods will be added later. */
static const struct cl_lock_operations lov_empty_lock_ops = {
        .clo_fini  = lov_empty_lock_fini,
        .clo_print = lov_empty_lock_print
};
int lov_lock_init_empty(const struct lu_env *env, struct cl_object *obj,
                        struct cl_lock *lock, const struct cl_io *io)
{
        struct lov_lock *lck;
        int result = -ENOMEM;

        lck = kmem_cache_alloc(lov_lock_kmem, GFP_NOFS | __GFP_ZERO);
        if (lck != NULL) {
                cl_lock_slice_add(lock, &lck->lls_cl, obj, &lov_empty_lock_ops);
                lck->lls_orig = lock->cll_descr;
                result = 0;
        }
        return result;
}
static struct cl_lock_closure *lov_closure_get(const struct lu_env *env,
                                               struct cl_lock *parent)
{
        struct cl_lock_closure *closure;

        closure = &lov_env_info(env)->lti_closure;
        LASSERT(list_empty(&closure->clc_list));
        cl_lock_closure_init(env, closure, parent, 1);
        return closure;
}
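/*
 * lov_closure_get() hands out the single closure stored in the per-thread
 * state returned by lov_env_info(), which is why it asserts that clc_list is
 * empty and why every caller in this file finishes with
 * cl_lock_closure_fini() before returning.
 */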
, 1);