4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2012, 2015, Intel Corporation.
30 * This file is part of Lustre, http://www.lustre.org/
31 * Lustre is a trademark of Sun Microsystems, Inc.
33 * osc cache management.
35 * Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
38 #define DEBUG_SUBSYSTEM S_OSC
40 #include "osc_cl_internal.h"
41 #include "osc_internal.h"
43 static int extent_debug
; /* set it to be true for more debug */
45 static void osc_update_pending(struct osc_object
*obj
, int cmd
, int delta
);
46 static int osc_extent_wait(const struct lu_env
*env
, struct osc_extent
*ext
,
48 static void osc_ap_completion(const struct lu_env
*env
, struct client_obd
*cli
,
49 struct osc_async_page
*oap
, int sent
, int rc
);
50 static int osc_make_ready(const struct lu_env
*env
, struct osc_async_page
*oap
,
52 static int osc_refresh_count(const struct lu_env
*env
,
53 struct osc_async_page
*oap
, int cmd
);
54 static int osc_io_unplug_async(const struct lu_env
*env
,
55 struct client_obd
*cli
, struct osc_object
*osc
);
56 static void osc_free_grant(struct client_obd
*cli
, unsigned int nr_pages
,
57 unsigned int lost_grant
);
59 static void osc_extent_tree_dump0(int level
, struct osc_object
*obj
,
60 const char *func
, int line
);
61 #define osc_extent_tree_dump(lvl, obj) \
62 osc_extent_tree_dump0(lvl, obj, __func__, __LINE__)
68 /* ------------------ osc extent ------------------ */
69 static inline char *ext_flags(struct osc_extent
*ext
, char *flags
)
72 *buf
++ = ext
->oe_rw
? 'r' : 'w';
85 if (ext
->oe_trunc_pending
)
87 if (ext
->oe_fsync_wait
)
93 static inline char list_empty_marker(struct list_head
*list
)
95 return list_empty(list
) ? '-' : '+';
98 #define EXTSTR "[%lu -> %lu/%lu]"
99 #define EXTPARA(ext) (ext)->oe_start, (ext)->oe_end, (ext)->oe_max_end
100 static const char *oes_strings
[] = {
101 "inv", "active", "cache", "locking", "lockdone", "rpc", "trunc", NULL
};
103 #define OSC_EXTENT_DUMP(lvl, extent, fmt, ...) do { \
104 struct osc_extent *__ext = (extent); \
108 "extent %p@{" EXTSTR ", " \
109 "[%d|%d|%c|%s|%s|%p], [%d|%d|%c|%c|%p|%u|%p]} " fmt, \
110 /* ----- extent part 0 ----- */ \
111 __ext, EXTPARA(__ext), \
112 /* ----- part 1 ----- */ \
113 atomic_read(&__ext->oe_refc), \
114 atomic_read(&__ext->oe_users), \
115 list_empty_marker(&__ext->oe_link), \
116 oes_strings[__ext->oe_state], ext_flags(__ext, __buf), \
118 /* ----- part 2 ----- */ \
119 __ext->oe_grants, __ext->oe_nr_pages, \
120 list_empty_marker(&__ext->oe_pages), \
121 waitqueue_active(&__ext->oe_waitq) ? '+' : '-', \
122 __ext->oe_dlmlock, __ext->oe_mppr, __ext->oe_owner, \
123 /* ----- part 4 ----- */ \
125 if (lvl == D_ERROR && __ext->oe_dlmlock) \
126 LDLM_ERROR(__ext->oe_dlmlock, "extent: %p", __ext); \
128 LDLM_DEBUG(__ext->oe_dlmlock, "extent: %p", __ext); \
132 #define EASSERTF(expr, ext, fmt, args...) do { \
134 OSC_EXTENT_DUMP(D_ERROR, (ext), fmt, ##args); \
135 osc_extent_tree_dump(D_ERROR, (ext)->oe_obj); \
141 #define EASSERT(expr, ext) EASSERTF(expr, ext, "\n")
143 static inline struct osc_extent
*rb_extent(struct rb_node
*n
)
148 return container_of(n
, struct osc_extent
, oe_node
);
151 static inline struct osc_extent
*next_extent(struct osc_extent
*ext
)
156 LASSERT(ext
->oe_intree
);
157 return rb_extent(rb_next(&ext
->oe_node
));
160 static inline struct osc_extent
*prev_extent(struct osc_extent
*ext
)
165 LASSERT(ext
->oe_intree
);
166 return rb_extent(rb_prev(&ext
->oe_node
));
169 static inline struct osc_extent
*first_extent(struct osc_object
*obj
)
171 return rb_extent(rb_first(&obj
->oo_root
));
174 /* object must be locked by caller. */
175 static int osc_extent_sanity_check0(struct osc_extent
*ext
,
176 const char *func
, const int line
)
178 struct osc_object
*obj
= ext
->oe_obj
;
179 struct osc_async_page
*oap
;
183 if (!osc_object_is_locked(obj
)) {
188 if (ext
->oe_state
>= OES_STATE_MAX
) {
193 if (atomic_read(&ext
->oe_refc
) <= 0) {
198 if (atomic_read(&ext
->oe_refc
) < atomic_read(&ext
->oe_users
)) {
203 switch (ext
->oe_state
) {
205 if (ext
->oe_nr_pages
> 0 || !list_empty(&ext
->oe_pages
))
211 if (atomic_read(&ext
->oe_users
) == 0) {
219 if (ext
->oe_fsync_wait
&& !ext
->oe_urgent
) {
225 if (ext
->oe_grants
== 0) {
229 if (ext
->oe_fsync_wait
&& !ext
->oe_urgent
&& !ext
->oe_hp
) {
234 if (atomic_read(&ext
->oe_users
) > 0) {
240 if (ext
->oe_max_end
< ext
->oe_end
|| ext
->oe_end
< ext
->oe_start
) {
245 if (ext
->oe_sync
&& ext
->oe_grants
> 0) {
250 if (ext
->oe_dlmlock
) {
251 struct ldlm_extent
*extent
;
253 extent
= &ext
->oe_dlmlock
->l_policy_data
.l_extent
;
254 if (!(extent
->start
<= cl_offset(osc2cl(obj
), ext
->oe_start
) &&
255 extent
->end
>= cl_offset(osc2cl(obj
), ext
->oe_max_end
))) {
260 if (!(ext
->oe_dlmlock
->l_granted_mode
& (LCK_PW
| LCK_GROUP
))) {
266 if (ext
->oe_nr_pages
> ext
->oe_mppr
) {
271 /* Do not verify page list if extent is in RPC. This is because an
272 * in-RPC extent is supposed to be exclusively accessible w/o lock.
274 if (ext
->oe_state
> OES_CACHE
) {
285 list_for_each_entry(oap
, &ext
->oe_pages
, oap_pending_item
) {
286 pgoff_t index
= osc_index(oap2osc(oap
));
288 if (index
> ext
->oe_end
|| index
< ext
->oe_start
) {
293 if (page_count
!= ext
->oe_nr_pages
) {
300 OSC_EXTENT_DUMP(D_ERROR
, ext
,
301 "%s:%d sanity check %p failed with rc = %d\n",
302 func
, line
, ext
, rc
);
306 #define sanity_check_nolock(ext) \
307 osc_extent_sanity_check0(ext, __func__, __LINE__)
309 #define sanity_check(ext) ({ \
311 osc_object_lock((ext)->oe_obj); \
312 __res = sanity_check_nolock(ext); \
313 osc_object_unlock((ext)->oe_obj); \
318 * sanity check - to make sure there is no overlapped extent in the tree.
320 static int osc_extent_is_overlapped(struct osc_object
*obj
,
321 struct osc_extent
*ext
)
323 struct osc_extent
*tmp
;
325 LASSERT(osc_object_is_locked(obj
));
330 for (tmp
= first_extent(obj
); tmp
; tmp
= next_extent(tmp
)) {
333 if (tmp
->oe_end
>= ext
->oe_start
&&
334 tmp
->oe_start
<= ext
->oe_end
)
340 static void osc_extent_state_set(struct osc_extent
*ext
, int state
)
342 LASSERT(osc_object_is_locked(ext
->oe_obj
));
343 LASSERT(state
>= OES_INV
&& state
< OES_STATE_MAX
);
345 /* Never try to sanity check a state changing extent :-) */
346 /* LASSERT(sanity_check_nolock(ext) == 0); */
348 /* TODO: validate the state machine */
349 ext
->oe_state
= state
;
350 wake_up_all(&ext
->oe_waitq
);
353 static struct osc_extent
*osc_extent_alloc(struct osc_object
*obj
)
355 struct osc_extent
*ext
;
357 ext
= kmem_cache_zalloc(osc_extent_kmem
, GFP_NOFS
);
361 RB_CLEAR_NODE(&ext
->oe_node
);
363 atomic_set(&ext
->oe_refc
, 1);
364 atomic_set(&ext
->oe_users
, 0);
365 INIT_LIST_HEAD(&ext
->oe_link
);
366 ext
->oe_state
= OES_INV
;
367 INIT_LIST_HEAD(&ext
->oe_pages
);
368 init_waitqueue_head(&ext
->oe_waitq
);
369 ext
->oe_dlmlock
= NULL
;
374 static void osc_extent_free(struct osc_extent
*ext
)
376 kmem_cache_free(osc_extent_kmem
, ext
);
379 static struct osc_extent
*osc_extent_get(struct osc_extent
*ext
)
381 LASSERT(atomic_read(&ext
->oe_refc
) >= 0);
382 atomic_inc(&ext
->oe_refc
);
386 static void osc_extent_put(const struct lu_env
*env
, struct osc_extent
*ext
)
388 LASSERT(atomic_read(&ext
->oe_refc
) > 0);
389 if (atomic_dec_and_test(&ext
->oe_refc
)) {
390 LASSERT(list_empty(&ext
->oe_link
));
391 LASSERT(atomic_read(&ext
->oe_users
) == 0);
392 LASSERT(ext
->oe_state
== OES_INV
);
393 LASSERT(!ext
->oe_intree
);
395 if (ext
->oe_dlmlock
) {
396 lu_ref_add(&ext
->oe_dlmlock
->l_reference
,
398 LDLM_LOCK_PUT(ext
->oe_dlmlock
);
399 ext
->oe_dlmlock
= NULL
;
401 osc_extent_free(ext
);
406 * osc_extent_put_trust() is a special version of osc_extent_put() when
407 * it's known that the caller is not the last user. This is to address the
408 * problem of lacking of lu_env ;-).
410 static void osc_extent_put_trust(struct osc_extent
*ext
)
412 LASSERT(atomic_read(&ext
->oe_refc
) > 1);
413 LASSERT(osc_object_is_locked(ext
->oe_obj
));
414 atomic_dec(&ext
->oe_refc
);
418 * Return the extent which includes pgoff @index, or return the greatest
419 * previous extent in the tree.
421 static struct osc_extent
*osc_extent_search(struct osc_object
*obj
,
424 struct rb_node
*n
= obj
->oo_root
.rb_node
;
425 struct osc_extent
*tmp
, *p
= NULL
;
427 LASSERT(osc_object_is_locked(obj
));
430 if (index
< tmp
->oe_start
) {
432 } else if (index
> tmp
->oe_end
) {
443 * Return the extent covering @index, otherwise return NULL.
444 * caller must have held object lock.
446 static struct osc_extent
*osc_extent_lookup(struct osc_object
*obj
,
449 struct osc_extent
*ext
;
451 ext
= osc_extent_search(obj
, index
);
452 if (ext
&& ext
->oe_start
<= index
&& index
<= ext
->oe_end
)
453 return osc_extent_get(ext
);
457 /* caller must have held object lock. */
458 static void osc_extent_insert(struct osc_object
*obj
, struct osc_extent
*ext
)
460 struct rb_node
**n
= &obj
->oo_root
.rb_node
;
461 struct rb_node
*parent
= NULL
;
462 struct osc_extent
*tmp
;
464 LASSERT(ext
->oe_intree
== 0);
465 LASSERT(ext
->oe_obj
== obj
);
466 LASSERT(osc_object_is_locked(obj
));
471 if (ext
->oe_end
< tmp
->oe_start
)
473 else if (ext
->oe_start
> tmp
->oe_end
)
476 EASSERTF(0, tmp
, EXTSTR
"\n", EXTPARA(ext
));
478 rb_link_node(&ext
->oe_node
, parent
, n
);
479 rb_insert_color(&ext
->oe_node
, &obj
->oo_root
);
484 /* caller must have held object lock. */
485 static void osc_extent_erase(struct osc_extent
*ext
)
487 struct osc_object
*obj
= ext
->oe_obj
;
489 LASSERT(osc_object_is_locked(obj
));
490 if (ext
->oe_intree
) {
491 rb_erase(&ext
->oe_node
, &obj
->oo_root
);
493 /* rbtree held a refcount */
494 osc_extent_put_trust(ext
);
498 static struct osc_extent
*osc_extent_hold(struct osc_extent
*ext
)
500 struct osc_object
*obj
= ext
->oe_obj
;
502 LASSERT(osc_object_is_locked(obj
));
503 LASSERT(ext
->oe_state
== OES_ACTIVE
|| ext
->oe_state
== OES_CACHE
);
504 if (ext
->oe_state
== OES_CACHE
) {
505 osc_extent_state_set(ext
, OES_ACTIVE
);
506 osc_update_pending(obj
, OBD_BRW_WRITE
, -ext
->oe_nr_pages
);
508 atomic_inc(&ext
->oe_users
);
509 list_del_init(&ext
->oe_link
);
510 return osc_extent_get(ext
);
513 static void __osc_extent_remove(struct osc_extent
*ext
)
515 LASSERT(osc_object_is_locked(ext
->oe_obj
));
516 LASSERT(list_empty(&ext
->oe_pages
));
517 osc_extent_erase(ext
);
518 list_del_init(&ext
->oe_link
);
519 osc_extent_state_set(ext
, OES_INV
);
520 OSC_EXTENT_DUMP(D_CACHE
, ext
, "destroyed.\n");
523 static void osc_extent_remove(struct osc_extent
*ext
)
525 struct osc_object
*obj
= ext
->oe_obj
;
527 osc_object_lock(obj
);
528 __osc_extent_remove(ext
);
529 osc_object_unlock(obj
);
533 * This function is used to merge extents to get better performance. It checks
534 * if @cur and @victim are contiguous at chunk level.
536 static int osc_extent_merge(const struct lu_env
*env
, struct osc_extent
*cur
,
537 struct osc_extent
*victim
)
539 struct osc_object
*obj
= cur
->oe_obj
;
544 LASSERT(cur
->oe_state
== OES_CACHE
);
545 LASSERT(osc_object_is_locked(obj
));
549 if (victim
->oe_state
!= OES_CACHE
|| victim
->oe_fsync_wait
)
552 if (cur
->oe_max_end
!= victim
->oe_max_end
)
555 LASSERT(cur
->oe_dlmlock
== victim
->oe_dlmlock
);
556 ppc_bits
= osc_cli(obj
)->cl_chunkbits
- PAGE_SHIFT
;
557 chunk_start
= cur
->oe_start
>> ppc_bits
;
558 chunk_end
= cur
->oe_end
>> ppc_bits
;
559 if (chunk_start
!= (victim
->oe_end
>> ppc_bits
) + 1 &&
560 chunk_end
+ 1 != victim
->oe_start
>> ppc_bits
)
563 OSC_EXTENT_DUMP(D_CACHE
, victim
, "will be merged by %p.\n", cur
);
565 cur
->oe_start
= min(cur
->oe_start
, victim
->oe_start
);
566 cur
->oe_end
= max(cur
->oe_end
, victim
->oe_end
);
567 cur
->oe_grants
+= victim
->oe_grants
;
568 cur
->oe_nr_pages
+= victim
->oe_nr_pages
;
569 /* only the following bits are needed to merge */
570 cur
->oe_urgent
|= victim
->oe_urgent
;
571 cur
->oe_memalloc
|= victim
->oe_memalloc
;
572 list_splice_init(&victim
->oe_pages
, &cur
->oe_pages
);
573 list_del_init(&victim
->oe_link
);
574 victim
->oe_nr_pages
= 0;
576 osc_extent_get(victim
);
577 __osc_extent_remove(victim
);
578 osc_extent_put(env
, victim
);
580 OSC_EXTENT_DUMP(D_CACHE
, cur
, "after merging %p.\n", victim
);
585 * Drop user count of osc_extent, and unplug IO asynchronously.
587 void osc_extent_release(const struct lu_env
*env
, struct osc_extent
*ext
)
589 struct osc_object
*obj
= ext
->oe_obj
;
591 LASSERT(atomic_read(&ext
->oe_users
) > 0);
592 LASSERT(sanity_check(ext
) == 0);
593 LASSERT(ext
->oe_grants
> 0);
595 if (atomic_dec_and_lock(&ext
->oe_users
, &obj
->oo_lock
)) {
596 LASSERT(ext
->oe_state
== OES_ACTIVE
);
597 if (ext
->oe_trunc_pending
) {
598 /* a truncate process is waiting for this extent.
599 * This may happen due to a race, check
600 * osc_cache_truncate_start().
602 osc_extent_state_set(ext
, OES_TRUNC
);
603 ext
->oe_trunc_pending
= 0;
605 osc_extent_state_set(ext
, OES_CACHE
);
606 osc_update_pending(obj
, OBD_BRW_WRITE
,
609 /* try to merge the previous and next extent. */
610 osc_extent_merge(env
, ext
, prev_extent(ext
));
611 osc_extent_merge(env
, ext
, next_extent(ext
));
614 list_move_tail(&ext
->oe_link
,
615 &obj
->oo_urgent_exts
);
617 osc_object_unlock(obj
);
619 osc_io_unplug_async(env
, osc_cli(obj
), obj
);
621 osc_extent_put(env
, ext
);
624 static inline int overlapped(struct osc_extent
*ex1
, struct osc_extent
*ex2
)
626 return !(ex1
->oe_end
< ex2
->oe_start
|| ex2
->oe_end
< ex1
->oe_start
);
630 * Find or create an extent which includes @index, core function to manage
633 static struct osc_extent
*osc_extent_find(const struct lu_env
*env
,
634 struct osc_object
*obj
, pgoff_t index
,
637 struct client_obd
*cli
= osc_cli(obj
);
638 struct osc_lock
*olck
;
639 struct cl_lock_descr
*descr
;
640 struct osc_extent
*cur
;
641 struct osc_extent
*ext
;
642 struct osc_extent
*conflict
= NULL
;
643 struct osc_extent
*found
= NULL
;
646 int max_pages
; /* max_pages_per_rpc */
648 int ppc_bits
; /* pages per chunk bits */
652 cur
= osc_extent_alloc(obj
);
654 return ERR_PTR(-ENOMEM
);
656 olck
= osc_env_io(env
)->oi_write_osclock
;
657 LASSERTF(olck
, "page %lu is not covered by lock\n", index
);
658 LASSERT(olck
->ols_state
== OLS_GRANTED
);
660 descr
= &olck
->ols_cl
.cls_lock
->cll_descr
;
661 LASSERT(descr
->cld_mode
>= CLM_WRITE
);
663 LASSERT(cli
->cl_chunkbits
>= PAGE_SHIFT
);
664 ppc_bits
= cli
->cl_chunkbits
- PAGE_SHIFT
;
665 chunk_mask
= ~((1 << ppc_bits
) - 1);
666 chunksize
= 1 << cli
->cl_chunkbits
;
667 chunk
= index
>> ppc_bits
;
669 /* align end to rpc edge, rpc size may not be a power 2 integer. */
670 max_pages
= cli
->cl_max_pages_per_rpc
;
671 LASSERT((max_pages
& ~chunk_mask
) == 0);
672 max_end
= index
- (index
% max_pages
) + max_pages
- 1;
673 max_end
= min_t(pgoff_t
, max_end
, descr
->cld_end
);
675 /* initialize new extent by parameters so far */
676 cur
->oe_max_end
= max_end
;
677 cur
->oe_start
= index
& chunk_mask
;
678 cur
->oe_end
= ((index
+ ~chunk_mask
+ 1) & chunk_mask
) - 1;
679 if (cur
->oe_start
< descr
->cld_start
)
680 cur
->oe_start
= descr
->cld_start
;
681 if (cur
->oe_end
> max_end
)
682 cur
->oe_end
= max_end
;
684 cur
->oe_mppr
= max_pages
;
685 if (olck
->ols_dlmlock
) {
686 LASSERT(olck
->ols_hold
);
687 cur
->oe_dlmlock
= LDLM_LOCK_GET(olck
->ols_dlmlock
);
688 lu_ref_add(&olck
->ols_dlmlock
->l_reference
, "osc_extent", cur
);
691 /* grants has been allocated by caller */
692 LASSERTF(*grants
>= chunksize
+ cli
->cl_extent_tax
,
693 "%u/%u/%u.\n", *grants
, chunksize
, cli
->cl_extent_tax
);
694 LASSERTF((max_end
- cur
->oe_start
) < max_pages
, EXTSTR
"\n",
698 osc_object_lock(obj
);
699 ext
= osc_extent_search(obj
, cur
->oe_start
);
701 ext
= first_extent(obj
);
703 loff_t ext_chk_start
= ext
->oe_start
>> ppc_bits
;
704 loff_t ext_chk_end
= ext
->oe_end
>> ppc_bits
;
706 LASSERT(sanity_check_nolock(ext
) == 0);
707 if (chunk
> ext_chk_end
+ 1)
710 /* if covering by different locks, no chance to match */
711 if (olck
->ols_dlmlock
!= ext
->oe_dlmlock
) {
712 EASSERTF(!overlapped(ext
, cur
), ext
,
713 EXTSTR
"\n", EXTPARA(cur
));
715 ext
= next_extent(ext
);
719 /* discontiguous chunks? */
720 if (chunk
+ 1 < ext_chk_start
) {
721 ext
= next_extent(ext
);
725 /* ok, from now on, ext and cur have these attrs:
726 * 1. covered by the same lock
727 * 2. contiguous at chunk level or overlapping.
730 if (overlapped(ext
, cur
)) {
731 /* cur is the minimum unit, so overlapping means
734 EASSERTF((ext
->oe_start
<= cur
->oe_start
&&
735 ext
->oe_end
>= cur
->oe_end
),
736 ext
, EXTSTR
"\n", EXTPARA(cur
));
738 if (ext
->oe_state
> OES_CACHE
|| ext
->oe_fsync_wait
) {
739 /* for simplicity, we wait for this extent to
740 * finish before going forward.
742 conflict
= osc_extent_get(ext
);
746 found
= osc_extent_hold(ext
);
750 /* non-overlapped extent */
751 if (ext
->oe_state
!= OES_CACHE
|| ext
->oe_fsync_wait
) {
752 /* we can't do anything for a non OES_CACHE extent, or
753 * if there is someone waiting for this extent to be
754 * flushed, try next one.
756 ext
= next_extent(ext
);
760 /* check if they belong to the same rpc slot before trying to
761 * merge. the extents are not overlapped and contiguous at
762 * chunk level to get here.
764 if (ext
->oe_max_end
!= max_end
) {
765 /* if they don't belong to the same RPC slot or
766 * max_pages_per_rpc has ever changed, do not merge.
768 ext
= next_extent(ext
);
772 /* it's required that an extent must be contiguous at chunk
773 * level so that we know the whole extent is covered by grant
774 * (the pages in the extent are NOT required to be contiguous).
775 * Otherwise, it will be too much difficult to know which
776 * chunks have grants allocated.
779 /* try to do front merge - extend ext's start */
780 if (chunk
+ 1 == ext_chk_start
) {
781 /* ext must be chunk size aligned */
782 EASSERT((ext
->oe_start
& ~chunk_mask
) == 0, ext
);
784 /* pull ext's start back to cover cur */
785 ext
->oe_start
= cur
->oe_start
;
786 ext
->oe_grants
+= chunksize
;
787 *grants
-= chunksize
;
789 found
= osc_extent_hold(ext
);
790 } else if (chunk
== ext_chk_end
+ 1) {
792 ext
->oe_end
= cur
->oe_end
;
793 ext
->oe_grants
+= chunksize
;
794 *grants
-= chunksize
;
796 /* try to merge with the next one because we just fill
799 if (osc_extent_merge(env
, ext
, next_extent(ext
)) == 0)
800 /* we can save extent tax from next extent */
801 *grants
+= cli
->cl_extent_tax
;
803 found
= osc_extent_hold(ext
);
808 ext
= next_extent(ext
);
811 osc_extent_tree_dump(D_CACHE
, obj
);
814 if (!IS_ERR(found
)) {
815 LASSERT(found
->oe_dlmlock
== cur
->oe_dlmlock
);
816 OSC_EXTENT_DUMP(D_CACHE
, found
,
817 "found caching ext for %lu.\n", index
);
819 } else if (!conflict
) {
820 /* create a new extent */
821 EASSERT(osc_extent_is_overlapped(obj
, cur
) == 0, cur
);
822 cur
->oe_grants
= chunksize
+ cli
->cl_extent_tax
;
823 *grants
-= cur
->oe_grants
;
824 LASSERT(*grants
>= 0);
826 cur
->oe_state
= OES_CACHE
;
827 found
= osc_extent_hold(cur
);
828 osc_extent_insert(obj
, cur
);
829 OSC_EXTENT_DUMP(D_CACHE
, cur
, "add into tree %lu/%lu.\n",
830 index
, descr
->cld_end
);
832 osc_object_unlock(obj
);
837 /* waiting for IO to finish. Please notice that it's impossible
838 * to be an OES_TRUNC extent.
840 rc
= osc_extent_wait(env
, conflict
, OES_INV
);
841 osc_extent_put(env
, conflict
);
852 osc_extent_put(env
, cur
);
853 LASSERT(*grants
>= 0);
858 * Called when IO is finished to an extent.
860 int osc_extent_finish(const struct lu_env
*env
, struct osc_extent
*ext
,
863 struct client_obd
*cli
= osc_cli(ext
->oe_obj
);
864 struct osc_async_page
*oap
;
865 struct osc_async_page
*tmp
;
866 int nr_pages
= ext
->oe_nr_pages
;
868 int blocksize
= cli
->cl_import
->imp_obd
->obd_osfs
.os_bsize
? : 4096;
872 OSC_EXTENT_DUMP(D_CACHE
, ext
, "extent finished.\n");
874 ext
->oe_rc
= rc
?: ext
->oe_nr_pages
;
875 EASSERT(ergo(rc
== 0, ext
->oe_state
== OES_RPC
), ext
);
877 osc_lru_add_batch(cli
, &ext
->oe_pages
);
878 list_for_each_entry_safe(oap
, tmp
, &ext
->oe_pages
, oap_pending_item
) {
879 list_del_init(&oap
->oap_rpc_item
);
880 list_del_init(&oap
->oap_pending_item
);
881 if (last_off
<= oap
->oap_obj_off
) {
882 last_off
= oap
->oap_obj_off
;
883 last_count
= oap
->oap_count
;
887 osc_ap_completion(env
, cli
, oap
, sent
, rc
);
889 EASSERT(ext
->oe_nr_pages
== 0, ext
);
892 lost_grant
= ext
->oe_grants
;
893 } else if (blocksize
< PAGE_SIZE
&&
894 last_count
!= PAGE_SIZE
) {
895 /* For short writes we shouldn't count parts of pages that
896 * span a whole chunk on the OST side, or our accounting goes
897 * wrong. Should match the code in filter_grant_check.
899 int offset
= last_off
& ~PAGE_MASK
;
900 int count
= last_count
+ (offset
& (blocksize
- 1));
901 int end
= (offset
+ last_count
) & (blocksize
- 1);
903 count
+= blocksize
- end
;
905 lost_grant
= PAGE_SIZE
- count
;
907 if (ext
->oe_grants
> 0)
908 osc_free_grant(cli
, nr_pages
, lost_grant
);
910 osc_extent_remove(ext
);
911 /* put the refcount for RPC */
912 osc_extent_put(env
, ext
);
916 static int extent_wait_cb(struct osc_extent
*ext
, int state
)
920 osc_object_lock(ext
->oe_obj
);
921 ret
= ext
->oe_state
== state
;
922 osc_object_unlock(ext
->oe_obj
);
928 * Wait for the extent's state to become @state.
930 static int osc_extent_wait(const struct lu_env
*env
, struct osc_extent
*ext
,
933 struct osc_object
*obj
= ext
->oe_obj
;
934 struct l_wait_info lwi
= LWI_TIMEOUT_INTR(cfs_time_seconds(600), NULL
,
935 LWI_ON_SIGNAL_NOOP
, NULL
);
938 osc_object_lock(obj
);
939 LASSERT(sanity_check_nolock(ext
) == 0);
940 /* `Kick' this extent only if the caller is waiting for it to be
943 if (state
== OES_INV
&& !ext
->oe_urgent
&& !ext
->oe_hp
&&
944 !ext
->oe_trunc_pending
) {
945 if (ext
->oe_state
== OES_ACTIVE
) {
947 } else if (ext
->oe_state
== OES_CACHE
) {
949 osc_extent_hold(ext
);
953 osc_object_unlock(obj
);
955 osc_extent_release(env
, ext
);
957 /* wait for the extent until its state becomes @state */
958 rc
= l_wait_event(ext
->oe_waitq
, extent_wait_cb(ext
, state
), &lwi
);
959 if (rc
== -ETIMEDOUT
) {
960 OSC_EXTENT_DUMP(D_ERROR
, ext
,
961 "%s: wait ext to %d timedout, recovery in progress?\n",
962 osc_export(obj
)->exp_obd
->obd_name
, state
);
964 lwi
= LWI_INTR(NULL
, NULL
);
965 rc
= l_wait_event(ext
->oe_waitq
, extent_wait_cb(ext
, state
),
968 if (rc
== 0 && ext
->oe_rc
< 0)
974 * Discard pages with index greater than @size. If @ext is overlapped with
975 * @size, then partial truncate happens.
977 static int osc_extent_truncate(struct osc_extent
*ext
, pgoff_t trunc_index
,
980 struct cl_env_nest nest
;
983 struct osc_object
*obj
= ext
->oe_obj
;
984 struct client_obd
*cli
= osc_cli(obj
);
985 struct osc_async_page
*oap
;
986 struct osc_async_page
*tmp
;
987 int pages_in_chunk
= 0;
988 int ppc_bits
= cli
->cl_chunkbits
- PAGE_SHIFT
;
989 __u64 trunc_chunk
= trunc_index
>> ppc_bits
;
994 LASSERT(sanity_check(ext
) == 0);
995 EASSERT(ext
->oe_state
== OES_TRUNC
, ext
);
996 EASSERT(!ext
->oe_urgent
, ext
);
998 /* Request new lu_env.
999 * We can't use that env from osc_cache_truncate_start() because
1000 * it's from lov_io_sub and not fully initialized.
1002 env
= cl_env_nested_get(&nest
);
1003 io
= &osc_env_info(env
)->oti_io
;
1004 io
->ci_obj
= cl_object_top(osc2cl(obj
));
1005 rc
= cl_io_init(env
, io
, CIT_MISC
, io
->ci_obj
);
1009 /* discard all pages with index greater then trunc_index */
1010 list_for_each_entry_safe(oap
, tmp
, &ext
->oe_pages
, oap_pending_item
) {
1011 pgoff_t index
= osc_index(oap2osc(oap
));
1012 struct cl_page
*page
= oap2cl_page(oap
);
1014 LASSERT(list_empty(&oap
->oap_rpc_item
));
1016 /* only discard the pages with their index greater than
1017 * trunc_index, and ...
1019 if (index
< trunc_index
||
1020 (index
== trunc_index
&& partial
)) {
1021 /* accounting how many pages remaining in the chunk
1022 * so that we can calculate grants correctly. */
1023 if (index
>> ppc_bits
== trunc_chunk
)
1028 list_del_init(&oap
->oap_pending_item
);
1031 lu_ref_add(&page
->cp_reference
, "truncate", current
);
1033 if (cl_page_own(env
, io
, page
) == 0) {
1034 cl_page_discard(env
, io
, page
);
1035 cl_page_disown(env
, io
, page
);
1037 LASSERT(page
->cp_state
== CPS_FREEING
);
1041 lu_ref_del(&page
->cp_reference
, "truncate", current
);
1042 cl_page_put(env
, page
);
1047 EASSERTF(ergo(ext
->oe_start
>= trunc_index
+ !!partial
,
1048 ext
->oe_nr_pages
== 0),
1049 ext
, "trunc_index %lu, partial %d\n", trunc_index
, partial
);
1051 osc_object_lock(obj
);
1052 if (ext
->oe_nr_pages
== 0) {
1053 LASSERT(pages_in_chunk
== 0);
1054 grants
= ext
->oe_grants
;
1056 } else { /* calculate how many grants we can free */
1057 int chunks
= (ext
->oe_end
>> ppc_bits
) - trunc_chunk
;
1060 /* if there is no pages in this chunk, we can also free grants
1061 * for the last chunk
1063 if (pages_in_chunk
== 0) {
1064 /* if this is the 1st chunk and no pages in this chunk,
1065 * ext->oe_nr_pages must be zero, so we should be in
1066 * the other if-clause.
1068 LASSERT(trunc_chunk
> 0);
1073 /* this is what we can free from this extent */
1074 grants
= chunks
<< cli
->cl_chunkbits
;
1075 ext
->oe_grants
-= grants
;
1076 last_index
= ((trunc_chunk
+ 1) << ppc_bits
) - 1;
1077 ext
->oe_end
= min(last_index
, ext
->oe_max_end
);
1078 LASSERT(ext
->oe_end
>= ext
->oe_start
);
1079 LASSERT(ext
->oe_grants
> 0);
1081 osc_object_unlock(obj
);
1083 if (grants
> 0 || nr_pages
> 0)
1084 osc_free_grant(cli
, nr_pages
, grants
);
1087 cl_io_fini(env
, io
);
1088 cl_env_nested_put(&nest
, env
);
1093 * This function is used to make the extent prepared for transfer.
1094 * A race with flushing page - ll_writepage() has to be handled cautiously.
1096 static int osc_extent_make_ready(const struct lu_env
*env
,
1097 struct osc_extent
*ext
)
1099 struct osc_async_page
*oap
;
1100 struct osc_async_page
*last
= NULL
;
1101 struct osc_object
*obj
= ext
->oe_obj
;
1105 /* we're going to grab page lock, so object lock must not be taken. */
1106 LASSERT(sanity_check(ext
) == 0);
1107 /* in locking state, any process should not touch this extent. */
1108 EASSERT(ext
->oe_state
== OES_LOCKING
, ext
);
1109 EASSERT(ext
->oe_owner
, ext
);
1111 OSC_EXTENT_DUMP(D_CACHE
, ext
, "make ready\n");
1113 list_for_each_entry(oap
, &ext
->oe_pages
, oap_pending_item
) {
1115 if (!last
|| last
->oap_obj_off
< oap
->oap_obj_off
)
1118 /* checking ASYNC_READY is race safe */
1119 if ((oap
->oap_async_flags
& ASYNC_READY
) != 0)
1122 rc
= osc_make_ready(env
, oap
, OBD_BRW_WRITE
);
1125 spin_lock(&oap
->oap_lock
);
1126 oap
->oap_async_flags
|= ASYNC_READY
;
1127 spin_unlock(&oap
->oap_lock
);
1130 LASSERT((oap
->oap_async_flags
& ASYNC_READY
) != 0);
1133 LASSERTF(0, "unknown return code: %d\n", rc
);
1137 LASSERT(page_count
== ext
->oe_nr_pages
);
1139 /* the last page is the only one we need to refresh its count by
1142 if (!(last
->oap_async_flags
& ASYNC_COUNT_STABLE
)) {
1143 last
->oap_count
= osc_refresh_count(env
, last
, OBD_BRW_WRITE
);
1144 LASSERT(last
->oap_count
> 0);
1145 LASSERT(last
->oap_page_off
+ last
->oap_count
<= PAGE_SIZE
);
1146 spin_lock(&last
->oap_lock
);
1147 last
->oap_async_flags
|= ASYNC_COUNT_STABLE
;
1148 spin_unlock(&last
->oap_lock
);
1151 /* for the rest of pages, we don't need to call osf_refresh_count()
1152 * because it's known they are not the last page
1154 list_for_each_entry(oap
, &ext
->oe_pages
, oap_pending_item
) {
1155 if (!(oap
->oap_async_flags
& ASYNC_COUNT_STABLE
)) {
1156 oap
->oap_count
= PAGE_SIZE
- oap
->oap_page_off
;
1157 spin_lock(&last
->oap_lock
);
1158 oap
->oap_async_flags
|= ASYNC_COUNT_STABLE
;
1159 spin_unlock(&last
->oap_lock
);
1163 osc_object_lock(obj
);
1164 osc_extent_state_set(ext
, OES_RPC
);
1165 osc_object_unlock(obj
);
1166 /* get a refcount for RPC. */
1167 osc_extent_get(ext
);
1173 * Quick and simple version of osc_extent_find(). This function is frequently
1174 * called to expand the extent for the same IO. To expand the extent, the
1175 * page index must be in the same or next chunk of ext->oe_end.
1177 static int osc_extent_expand(struct osc_extent
*ext
, pgoff_t index
, int *grants
)
1179 struct osc_object
*obj
= ext
->oe_obj
;
1180 struct client_obd
*cli
= osc_cli(obj
);
1181 struct osc_extent
*next
;
1182 int ppc_bits
= cli
->cl_chunkbits
- PAGE_SHIFT
;
1183 pgoff_t chunk
= index
>> ppc_bits
;
1186 int chunksize
= 1 << cli
->cl_chunkbits
;
1189 LASSERT(ext
->oe_max_end
>= index
&& ext
->oe_start
<= index
);
1190 osc_object_lock(obj
);
1191 LASSERT(sanity_check_nolock(ext
) == 0);
1192 end_chunk
= ext
->oe_end
>> ppc_bits
;
1193 if (chunk
> end_chunk
+ 1) {
1198 if (end_chunk
>= chunk
) {
1203 LASSERT(end_chunk
+ 1 == chunk
);
1204 /* try to expand this extent to cover @index */
1205 end_index
= min(ext
->oe_max_end
, ((chunk
+ 1) << ppc_bits
) - 1);
1207 next
= next_extent(ext
);
1208 if (next
&& next
->oe_start
<= end_index
) {
1209 /* complex mode - overlapped with the next extent,
1210 * this case will be handled by osc_extent_find()
1216 ext
->oe_end
= end_index
;
1217 ext
->oe_grants
+= chunksize
;
1218 *grants
-= chunksize
;
1219 LASSERT(*grants
>= 0);
1220 EASSERTF(osc_extent_is_overlapped(obj
, ext
) == 0, ext
,
1221 "overlapped after expanding for %lu.\n", index
);
1224 osc_object_unlock(obj
);
1228 static void osc_extent_tree_dump0(int level
, struct osc_object
*obj
,
1229 const char *func
, int line
)
1231 struct osc_extent
*ext
;
1234 CDEBUG(level
, "Dump object %p extents at %s:%d, mppr: %u.\n",
1235 obj
, func
, line
, osc_cli(obj
)->cl_max_pages_per_rpc
);
1237 /* osc_object_lock(obj); */
1239 for (ext
= first_extent(obj
); ext
; ext
= next_extent(ext
))
1240 OSC_EXTENT_DUMP(level
, ext
, "in tree %d.\n", cnt
++);
1243 list_for_each_entry(ext
, &obj
->oo_hp_exts
, oe_link
)
1244 OSC_EXTENT_DUMP(level
, ext
, "hp %d.\n", cnt
++);
1247 list_for_each_entry(ext
, &obj
->oo_urgent_exts
, oe_link
)
1248 OSC_EXTENT_DUMP(level
, ext
, "urgent %d.\n", cnt
++);
1251 list_for_each_entry(ext
, &obj
->oo_reading_exts
, oe_link
)
1252 OSC_EXTENT_DUMP(level
, ext
, "reading %d.\n", cnt
++);
1253 /* osc_object_unlock(obj); */
1256 /* ------------------ osc extent end ------------------ */
1258 static inline int osc_is_ready(struct osc_object
*osc
)
1260 return !list_empty(&osc
->oo_ready_item
) ||
1261 !list_empty(&osc
->oo_hp_ready_item
);
1264 #define OSC_IO_DEBUG(OSC, STR, args...) \
1265 CDEBUG(D_CACHE, "obj %p ready %d|%c|%c wr %d|%c|%c rd %d|%c " STR, \
1266 (OSC), osc_is_ready(OSC), \
1267 list_empty_marker(&(OSC)->oo_hp_ready_item), \
1268 list_empty_marker(&(OSC)->oo_ready_item), \
1269 atomic_read(&(OSC)->oo_nr_writes), \
1270 list_empty_marker(&(OSC)->oo_hp_exts), \
1271 list_empty_marker(&(OSC)->oo_urgent_exts), \
1272 atomic_read(&(OSC)->oo_nr_reads), \
1273 list_empty_marker(&(OSC)->oo_reading_exts), \
1276 static int osc_make_ready(const struct lu_env
*env
, struct osc_async_page
*oap
,
1279 struct osc_page
*opg
= oap2osc_page(oap
);
1280 struct cl_page
*page
= oap2cl_page(oap
);
1283 LASSERT(cmd
== OBD_BRW_WRITE
); /* no cached reads */
1285 result
= cl_page_make_ready(env
, page
, CRT_WRITE
);
1287 opg
->ops_submit_time
= cfs_time_current();
1291 static int osc_refresh_count(const struct lu_env
*env
,
1292 struct osc_async_page
*oap
, int cmd
)
1294 struct osc_page
*opg
= oap2osc_page(oap
);
1295 pgoff_t index
= osc_index(oap2osc(oap
));
1296 struct cl_object
*obj
;
1297 struct cl_attr
*attr
= &osc_env_info(env
)->oti_attr
;
1302 /* readpage queues with _COUNT_STABLE, shouldn't get here. */
1303 LASSERT(!(cmd
& OBD_BRW_READ
));
1304 obj
= opg
->ops_cl
.cpl_obj
;
1306 cl_object_attr_lock(obj
);
1307 result
= cl_object_attr_get(env
, obj
, attr
);
1308 cl_object_attr_unlock(obj
);
1311 kms
= attr
->cat_kms
;
1312 if (cl_offset(obj
, index
) >= kms
)
1313 /* catch race with truncate */
1315 else if (cl_offset(obj
, index
+ 1) > kms
)
1316 /* catch sub-page write at end of file */
1317 return kms
% PAGE_SIZE
;
1322 static int osc_completion(const struct lu_env
*env
, struct osc_async_page
*oap
,
1325 struct osc_page
*opg
= oap2osc_page(oap
);
1326 struct cl_page
*page
= oap2cl_page(oap
);
1327 struct osc_object
*obj
= cl2osc(opg
->ops_cl
.cpl_obj
);
1328 enum cl_req_type crt
;
1331 cmd
&= ~OBD_BRW_NOQUOTA
;
1332 LASSERTF(equi(page
->cp_state
== CPS_PAGEIN
, cmd
== OBD_BRW_READ
),
1333 "cp_state:%u, cmd:%d\n", page
->cp_state
, cmd
);
1334 LASSERTF(equi(page
->cp_state
== CPS_PAGEOUT
, cmd
== OBD_BRW_WRITE
),
1335 "cp_state:%u, cmd:%d\n", page
->cp_state
, cmd
);
1336 LASSERT(opg
->ops_transfer_pinned
);
1339 * page->cp_req can be NULL if io submission failed before
1340 * cl_req was allocated.
1343 cl_req_page_done(env
, page
);
1344 LASSERT(!page
->cp_req
);
1346 crt
= cmd
== OBD_BRW_READ
? CRT_READ
: CRT_WRITE
;
1347 /* Clear opg->ops_transfer_pinned before VM lock is released. */
1348 opg
->ops_transfer_pinned
= 0;
1350 spin_lock(&obj
->oo_seatbelt
);
1351 LASSERT(opg
->ops_submitter
);
1352 LASSERT(!list_empty(&opg
->ops_inflight
));
1353 list_del_init(&opg
->ops_inflight
);
1354 opg
->ops_submitter
= NULL
;
1355 spin_unlock(&obj
->oo_seatbelt
);
1357 opg
->ops_submit_time
= 0;
1358 srvlock
= oap
->oap_brw_flags
& OBD_BRW_SRVLOCK
;
1361 if (rc
== 0 && srvlock
) {
1362 struct lu_device
*ld
= opg
->ops_cl
.cpl_obj
->co_lu
.lo_dev
;
1363 struct osc_stats
*stats
= &lu2osc_dev(ld
)->od_stats
;
1364 int bytes
= oap
->oap_count
;
1366 if (crt
== CRT_READ
)
1367 stats
->os_lockless_reads
+= bytes
;
1369 stats
->os_lockless_writes
+= bytes
;
1373 * This has to be the last operation with the page, as locks are
1374 * released in cl_page_completion() and nothing except for the
1375 * reference counter protects page from concurrent reclaim.
1377 lu_ref_del(&page
->cp_reference
, "transfer", page
);
1379 cl_page_completion(env
, page
, crt
, rc
);
1384 #define OSC_DUMP_GRANT(lvl, cli, fmt, args...) do { \
1385 struct client_obd *__tmp = (cli); \
1386 CDEBUG(lvl, "%s: grant { dirty: %ld/%ld dirty_pages: %d/%d " \
1387 "dropped: %ld avail: %ld, reserved: %ld, flight: %d }" \
1388 "lru {in list: %d, left: %d, waiters: %d }" fmt, \
1389 __tmp->cl_import->imp_obd->obd_name, \
1390 __tmp->cl_dirty_pages, __tmp->cl_dirty_max_pages, \
1391 atomic_read(&obd_dirty_pages), obd_max_dirty_pages, \
1392 __tmp->cl_lost_grant, __tmp->cl_avail_grant, \
1393 __tmp->cl_reserved_grant, __tmp->cl_w_in_flight, \
1394 atomic_read(&__tmp->cl_lru_in_list), \
1395 atomic_read(&__tmp->cl_lru_busy), \
1396 atomic_read(&__tmp->cl_lru_shrinkers), ##args); \
1399 /* caller must hold loi_list_lock */
1400 static void osc_consume_write_grant(struct client_obd
*cli
,
1401 struct brw_page
*pga
)
1403 assert_spin_locked(&cli
->cl_loi_list_lock
);
1404 LASSERT(!(pga
->flag
& OBD_BRW_FROM_GRANT
));
1405 atomic_inc(&obd_dirty_pages
);
1406 cli
->cl_dirty_pages
++;
1407 pga
->flag
|= OBD_BRW_FROM_GRANT
;
1408 CDEBUG(D_CACHE
, "using %lu grant credits for brw %p page %p\n",
1409 PAGE_SIZE
, pga
, pga
->pg
);
1410 osc_update_next_shrink(cli
);
1413 /* the companion to osc_consume_write_grant, called when a brw has completed.
1414 * must be called with the loi lock held.
1416 static void osc_release_write_grant(struct client_obd
*cli
,
1417 struct brw_page
*pga
)
1419 assert_spin_locked(&cli
->cl_loi_list_lock
);
1420 if (!(pga
->flag
& OBD_BRW_FROM_GRANT
)) {
1424 pga
->flag
&= ~OBD_BRW_FROM_GRANT
;
1425 atomic_dec(&obd_dirty_pages
);
1426 cli
->cl_dirty_pages
--;
1427 if (pga
->flag
& OBD_BRW_NOCACHE
) {
1428 pga
->flag
&= ~OBD_BRW_NOCACHE
;
1429 atomic_dec(&obd_dirty_transit_pages
);
1430 cli
->cl_dirty_transit
--;
1435 * To avoid sleeping with object lock held, it's good for us allocate enough
1436 * grants before entering into critical section.
1438 * spin_lock held by caller
1440 static int osc_reserve_grant(struct client_obd
*cli
, unsigned int bytes
)
1444 if (cli
->cl_avail_grant
>= bytes
) {
1445 cli
->cl_avail_grant
-= bytes
;
1446 cli
->cl_reserved_grant
+= bytes
;
1452 static void __osc_unreserve_grant(struct client_obd
*cli
,
1453 unsigned int reserved
, unsigned int unused
)
1455 /* it's quite normal for us to get more grant than reserved.
1456 * Thinking about a case that two extents merged by adding a new
1457 * chunk, we can save one extent tax. If extent tax is greater than
1458 * one chunk, we can save more grant by adding a new chunk
1460 cli
->cl_reserved_grant
-= reserved
;
1461 if (unused
> reserved
) {
1462 cli
->cl_avail_grant
+= reserved
;
1463 cli
->cl_lost_grant
+= unused
- reserved
;
1465 cli
->cl_avail_grant
+= unused
;
1469 static void osc_unreserve_grant(struct client_obd
*cli
,
1470 unsigned int reserved
, unsigned int unused
)
1472 spin_lock(&cli
->cl_loi_list_lock
);
1473 __osc_unreserve_grant(cli
, reserved
, unused
);
1475 osc_wake_cache_waiters(cli
);
1476 spin_unlock(&cli
->cl_loi_list_lock
);
1480 * Free grant after IO is finished or canceled.
1482 * @lost_grant is used to remember how many grants we have allocated but not
1483 * used, we should return these grants to OST. There're two cases where grants
1486 * 2. blocksize at OST is less than PAGE_SIZE and a partial page was
1487 * written. In this case OST may use less chunks to serve this partial
1488 * write. OSTs don't actually know the page size on the client side. so
1489 * clients have to calculate lost grant by the blocksize on the OST.
1490 * See filter_grant_check() for details.
1492 static void osc_free_grant(struct client_obd
*cli
, unsigned int nr_pages
,
1493 unsigned int lost_grant
)
1495 int grant
= (1 << cli
->cl_chunkbits
) + cli
->cl_extent_tax
;
1497 spin_lock(&cli
->cl_loi_list_lock
);
1498 atomic_sub(nr_pages
, &obd_dirty_pages
);
1499 cli
->cl_dirty_pages
-= nr_pages
;
1500 cli
->cl_lost_grant
+= lost_grant
;
1501 if (cli
->cl_avail_grant
< grant
&& cli
->cl_lost_grant
>= grant
) {
1502 /* borrow some grant from truncate to avoid the case that
1503 * truncate uses up all avail grant
1505 cli
->cl_lost_grant
-= grant
;
1506 cli
->cl_avail_grant
+= grant
;
1508 osc_wake_cache_waiters(cli
);
1509 spin_unlock(&cli
->cl_loi_list_lock
);
1510 CDEBUG(D_CACHE
, "lost %u grant: %lu avail: %lu dirty: %lu\n",
1511 lost_grant
, cli
->cl_lost_grant
,
1512 cli
->cl_avail_grant
, cli
->cl_dirty_pages
<< PAGE_SHIFT
);
1516 * The companion to osc_enter_cache(), called when @oap is no longer part of
1517 * the dirty accounting due to error.
1519 static void osc_exit_cache(struct client_obd
*cli
, struct osc_async_page
*oap
)
1521 spin_lock(&cli
->cl_loi_list_lock
);
1522 osc_release_write_grant(cli
, &oap
->oap_brw_page
);
1523 spin_unlock(&cli
->cl_loi_list_lock
);
1527 * Non-blocking version of osc_enter_cache() that consumes grant only when it
1530 static int osc_enter_cache_try(struct client_obd
*cli
,
1531 struct osc_async_page
*oap
,
1532 int bytes
, int transient
)
1536 OSC_DUMP_GRANT(D_CACHE
, cli
, "need:%d.\n", bytes
);
1538 rc
= osc_reserve_grant(cli
, bytes
);
1542 if (cli
->cl_dirty_pages
<= cli
->cl_dirty_max_pages
&&
1543 atomic_read(&obd_dirty_pages
) + 1 <= obd_max_dirty_pages
) {
1544 osc_consume_write_grant(cli
, &oap
->oap_brw_page
);
1546 cli
->cl_dirty_transit
++;
1547 atomic_inc(&obd_dirty_transit_pages
);
1548 oap
->oap_brw_flags
|= OBD_BRW_NOCACHE
;
1552 __osc_unreserve_grant(cli
, bytes
, bytes
);
1558 static int ocw_granted(struct client_obd
*cli
, struct osc_cache_waiter
*ocw
)
1562 spin_lock(&cli
->cl_loi_list_lock
);
1563 rc
= list_empty(&ocw
->ocw_entry
);
1564 spin_unlock(&cli
->cl_loi_list_lock
);
1569 * The main entry to reserve dirty page accounting. Usually the grant reserved
1570 * in this function will be freed in bulk in osc_free_grant() unless it fails
1571 * to add osc cache, in that case, it will be freed in osc_exit_cache().
1573 * The process will be put into sleep if it's already run out of grant.
1575 static int osc_enter_cache(const struct lu_env
*env
, struct client_obd
*cli
,
1576 struct osc_async_page
*oap
, int bytes
)
1578 struct osc_object
*osc
= oap
->oap_obj
;
1579 struct lov_oinfo
*loi
= osc
->oo_oinfo
;
1580 struct osc_cache_waiter ocw
;
1581 struct l_wait_info lwi
= LWI_TIMEOUT_INTR(cfs_time_seconds(600), NULL
,
1582 LWI_ON_SIGNAL_NOOP
, NULL
);
1585 OSC_DUMP_GRANT(D_CACHE
, cli
, "need:%d.\n", bytes
);
1587 spin_lock(&cli
->cl_loi_list_lock
);
1589 /* force the caller to try sync io. this can jump the list
1590 * of queued writes and create a discontiguous rpc stream
1592 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT
) ||
1593 !cli
->cl_dirty_max_pages
|| cli
->cl_ar
.ar_force_sync
||
1594 loi
->loi_ar
.ar_force_sync
) {
1599 /* Hopefully normal case - cache space and write credits available */
1600 if (osc_enter_cache_try(cli
, oap
, bytes
, 0)) {
1605 /* We can get here for two reasons: too many dirty pages in cache, or
1606 * run out of grants. In both cases we should write dirty pages out.
1607 * Adding a cache waiter will trigger urgent write-out no matter what
1609 * The exiting condition is no avail grants and no dirty pages caching,
1610 * that really means there is no space on the OST.
1612 init_waitqueue_head(&ocw
.ocw_waitq
);
1614 ocw
.ocw_grant
= bytes
;
1615 while (cli
->cl_dirty_pages
> 0 || cli
->cl_w_in_flight
> 0) {
1616 list_add_tail(&ocw
.ocw_entry
, &cli
->cl_cache_waiters
);
1618 spin_unlock(&cli
->cl_loi_list_lock
);
1620 osc_io_unplug_async(env
, cli
, NULL
);
1622 CDEBUG(D_CACHE
, "%s: sleeping for cache space @ %p for %p\n",
1623 cli
->cl_import
->imp_obd
->obd_name
, &ocw
, oap
);
1625 rc
= l_wait_event(ocw
.ocw_waitq
, ocw_granted(cli
, &ocw
), &lwi
);
1627 spin_lock(&cli
->cl_loi_list_lock
);
1629 /* l_wait_event is interrupted by signal, or timed out */
1631 if (rc
== -ETIMEDOUT
) {
1632 OSC_DUMP_GRANT(D_ERROR
, cli
,
1633 "try to reserve %d.\n", bytes
);
1634 osc_extent_tree_dump(D_ERROR
, osc
);
1638 list_del_init(&ocw
.ocw_entry
);
1642 LASSERT(list_empty(&ocw
.ocw_entry
));
1647 if (osc_enter_cache_try(cli
, oap
, bytes
, 0)) {
1653 spin_unlock(&cli
->cl_loi_list_lock
);
1654 OSC_DUMP_GRANT(D_CACHE
, cli
, "returned %d.\n", rc
);
1658 /* caller must hold loi_list_lock */
1659 void osc_wake_cache_waiters(struct client_obd
*cli
)
1661 struct list_head
*l
, *tmp
;
1662 struct osc_cache_waiter
*ocw
;
1664 list_for_each_safe(l
, tmp
, &cli
->cl_cache_waiters
) {
1665 ocw
= list_entry(l
, struct osc_cache_waiter
, ocw_entry
);
1666 list_del_init(&ocw
->ocw_entry
);
1668 ocw
->ocw_rc
= -EDQUOT
;
1669 /* we can't dirty more */
1670 if ((cli
->cl_dirty_pages
> cli
->cl_dirty_max_pages
) ||
1671 (atomic_read(&obd_dirty_pages
) + 1 > obd_max_dirty_pages
)) {
1672 CDEBUG(D_CACHE
, "no dirty room: dirty: %ld osc max %ld, sys max %d\n",
1673 cli
->cl_dirty_pages
, cli
->cl_dirty_max_pages
,
1674 obd_max_dirty_pages
);
1679 if (!osc_enter_cache_try(cli
, ocw
->ocw_oap
, ocw
->ocw_grant
, 0))
1680 ocw
->ocw_rc
= -EDQUOT
;
1683 CDEBUG(D_CACHE
, "wake up %p for oap %p, avail grant %ld, %d\n",
1684 ocw
, ocw
->ocw_oap
, cli
->cl_avail_grant
, ocw
->ocw_rc
);
1686 wake_up(&ocw
->ocw_waitq
);
1690 static int osc_max_rpc_in_flight(struct client_obd
*cli
, struct osc_object
*osc
)
1692 int hprpc
= !!list_empty(&osc
->oo_hp_exts
);
1694 return rpcs_in_flight(cli
) >= cli
->cl_max_rpcs_in_flight
+ hprpc
;
1697 /* This maintains the lists of pending pages to read/write for a given object
1698 * (lop). This is used by osc_check_rpcs->osc_next_obj() and osc_list_maint()
1699 * to quickly find objects that are ready to send an RPC.
1701 static int osc_makes_rpc(struct client_obd
*cli
, struct osc_object
*osc
,
1704 int invalid_import
= 0;
1706 /* if we have an invalid import we want to drain the queued pages
1707 * by forcing them through rpcs that immediately fail and complete
1708 * the pages. recovery relies on this to empty the queued pages
1709 * before canceling the locks and evicting down the llite pages
1711 if (!cli
->cl_import
|| cli
->cl_import
->imp_invalid
)
1714 if (cmd
& OBD_BRW_WRITE
) {
1715 if (atomic_read(&osc
->oo_nr_writes
) == 0)
1717 if (invalid_import
) {
1718 CDEBUG(D_CACHE
, "invalid import forcing RPC\n");
1721 if (!list_empty(&osc
->oo_hp_exts
)) {
1722 CDEBUG(D_CACHE
, "high prio request forcing RPC\n");
1725 if (!list_empty(&osc
->oo_urgent_exts
)) {
1726 CDEBUG(D_CACHE
, "urgent request forcing RPC\n");
1729 /* trigger a write rpc stream as long as there are dirtiers
1730 * waiting for space. as they're waiting, they're not going to
1731 * create more pages to coalesce with what's waiting..
1733 if (!list_empty(&cli
->cl_cache_waiters
)) {
1734 CDEBUG(D_CACHE
, "cache waiters forcing RPC\n");
1737 if (atomic_read(&osc
->oo_nr_writes
) >=
1738 cli
->cl_max_pages_per_rpc
)
1741 if (atomic_read(&osc
->oo_nr_reads
) == 0)
1743 if (invalid_import
) {
1744 CDEBUG(D_CACHE
, "invalid import forcing RPC\n");
1747 /* all read are urgent. */
1748 if (!list_empty(&osc
->oo_reading_exts
))
1755 static void osc_update_pending(struct osc_object
*obj
, int cmd
, int delta
)
1757 struct client_obd
*cli
= osc_cli(obj
);
1759 if (cmd
& OBD_BRW_WRITE
) {
1760 atomic_add(delta
, &obj
->oo_nr_writes
);
1761 atomic_add(delta
, &cli
->cl_pending_w_pages
);
1762 LASSERT(atomic_read(&obj
->oo_nr_writes
) >= 0);
1764 atomic_add(delta
, &obj
->oo_nr_reads
);
1765 atomic_add(delta
, &cli
->cl_pending_r_pages
);
1766 LASSERT(atomic_read(&obj
->oo_nr_reads
) >= 0);
1768 OSC_IO_DEBUG(obj
, "update pending cmd %d delta %d.\n", cmd
, delta
);
1771 static int osc_makes_hprpc(struct osc_object
*obj
)
1773 return !list_empty(&obj
->oo_hp_exts
);
1776 static void on_list(struct list_head
*item
, struct list_head
*list
, int should_be_on
)
1778 if (list_empty(item
) && should_be_on
)
1779 list_add_tail(item
, list
);
1780 else if (!list_empty(item
) && !should_be_on
)
1781 list_del_init(item
);
1784 /* maintain the osc's cli list membership invariants so that osc_send_oap_rpc
1785 * can find pages to build into rpcs quickly
1787 static int __osc_list_maint(struct client_obd
*cli
, struct osc_object
*osc
)
1789 if (osc_makes_hprpc(osc
)) {
1791 on_list(&osc
->oo_ready_item
, &cli
->cl_loi_ready_list
, 0);
1792 on_list(&osc
->oo_hp_ready_item
, &cli
->cl_loi_hp_ready_list
, 1);
1794 on_list(&osc
->oo_hp_ready_item
, &cli
->cl_loi_hp_ready_list
, 0);
1795 on_list(&osc
->oo_ready_item
, &cli
->cl_loi_ready_list
,
1796 osc_makes_rpc(cli
, osc
, OBD_BRW_WRITE
) ||
1797 osc_makes_rpc(cli
, osc
, OBD_BRW_READ
));
1800 on_list(&osc
->oo_write_item
, &cli
->cl_loi_write_list
,
1801 atomic_read(&osc
->oo_nr_writes
) > 0);
1803 on_list(&osc
->oo_read_item
, &cli
->cl_loi_read_list
,
1804 atomic_read(&osc
->oo_nr_reads
) > 0);
1806 return osc_is_ready(osc
);
1809 static int osc_list_maint(struct client_obd
*cli
, struct osc_object
*osc
)
1813 spin_lock(&cli
->cl_loi_list_lock
);
1814 is_ready
= __osc_list_maint(cli
, osc
);
1815 spin_unlock(&cli
->cl_loi_list_lock
);
1820 /* this is trying to propagate async writeback errors back up to the
1821 * application. As an async write fails we record the error code for later if
1822 * the app does an fsync. As long as errors persist we force future rpcs to be
1823 * sync so that the app can get a sync error and break the cycle of queueing
1824 * pages for which writeback will fail.
1826 static void osc_process_ar(struct osc_async_rc
*ar
, __u64 xid
,
1833 ar
->ar_force_sync
= 1;
1834 ar
->ar_min_xid
= ptlrpc_sample_next_xid();
1838 if (ar
->ar_force_sync
&& (xid
>= ar
->ar_min_xid
))
1839 ar
->ar_force_sync
= 0;
1842 /* this must be called holding the loi list lock to give coverage to exit_cache,
1843 * async_flag maintenance, and oap_request
1845 static void osc_ap_completion(const struct lu_env
*env
, struct client_obd
*cli
,
1846 struct osc_async_page
*oap
, int sent
, int rc
)
1848 struct osc_object
*osc
= oap
->oap_obj
;
1849 struct lov_oinfo
*loi
= osc
->oo_oinfo
;
1852 if (oap
->oap_request
) {
1853 xid
= ptlrpc_req_xid(oap
->oap_request
);
1854 ptlrpc_req_finished(oap
->oap_request
);
1855 oap
->oap_request
= NULL
;
1858 /* As the transfer for this page is being done, clear the flags */
1859 spin_lock(&oap
->oap_lock
);
1860 oap
->oap_async_flags
= 0;
1861 spin_unlock(&oap
->oap_lock
);
1862 oap
->oap_interrupted
= 0;
1864 if (oap
->oap_cmd
& OBD_BRW_WRITE
&& xid
> 0) {
1865 spin_lock(&cli
->cl_loi_list_lock
);
1866 osc_process_ar(&cli
->cl_ar
, xid
, rc
);
1867 osc_process_ar(&loi
->loi_ar
, xid
, rc
);
1868 spin_unlock(&cli
->cl_loi_list_lock
);
1871 rc
= osc_completion(env
, oap
, oap
->oap_cmd
, rc
);
1873 CERROR("completion on oap %p obj %p returns %d.\n",
1878 * Try to add extent to one RPC. We need to think about the following things:
1879 * - # of pages must not be over max_pages_per_rpc
1880 * - extent must be compatible with previous ones
1882 static int try_to_add_extent_for_io(struct client_obd
*cli
,
1883 struct osc_extent
*ext
, struct list_head
*rpclist
,
1884 int *pc
, unsigned int *max_pages
)
1886 struct osc_extent
*tmp
;
1887 struct osc_async_page
*oap
= list_first_entry(&ext
->oe_pages
,
1888 struct osc_async_page
,
1891 EASSERT((ext
->oe_state
== OES_CACHE
|| ext
->oe_state
== OES_LOCK_DONE
),
1894 *max_pages
= max(ext
->oe_mppr
, *max_pages
);
1895 if (*pc
+ ext
->oe_nr_pages
> *max_pages
)
1898 list_for_each_entry(tmp
, rpclist
, oe_link
) {
1899 struct osc_async_page
*oap2
;
1901 oap2
= list_first_entry(&tmp
->oe_pages
, struct osc_async_page
,
1903 EASSERT(tmp
->oe_owner
== current
, tmp
);
1904 if (oap2cl_page(oap
)->cp_type
!= oap2cl_page(oap2
)->cp_type
) {
1905 CDEBUG(D_CACHE
, "Do not permit different type of IO"
1906 " for a same RPC\n");
1910 if (tmp
->oe_srvlock
!= ext
->oe_srvlock
||
1911 !tmp
->oe_grants
!= !ext
->oe_grants
)
1914 /* remove break for strict check */
1918 *pc
+= ext
->oe_nr_pages
;
1919 list_move_tail(&ext
->oe_link
, rpclist
);
1920 ext
->oe_owner
= current
;
1925 * In order to prevent multiple ptlrpcd from breaking contiguous extents,
1926 * get_write_extent() takes all appropriate extents in atomic.
1928 * The following policy is used to collect extents for IO:
1929 * 1. Add as many HP extents as possible;
1930 * 2. Add the first urgent extent in urgent extent list and take it out of
1932 * 3. Add subsequent extents of this urgent extent;
1933 * 4. If urgent list is not empty, goto 2;
1934 * 5. Traverse the extent tree from the 1st extent;
1935 * 6. Above steps exit if there is no space in this RPC.
1937 static int get_write_extents(struct osc_object
*obj
, struct list_head
*rpclist
)
1939 struct client_obd
*cli
= osc_cli(obj
);
1940 struct osc_extent
*ext
;
1941 struct osc_extent
*temp
;
1943 unsigned int max_pages
= cli
->cl_max_pages_per_rpc
;
1945 LASSERT(osc_object_is_locked(obj
));
1946 list_for_each_entry_safe(ext
, temp
, &obj
->oo_hp_exts
, oe_link
) {
1947 LASSERT(ext
->oe_state
== OES_CACHE
);
1948 if (!try_to_add_extent_for_io(cli
, ext
, rpclist
, &page_count
,
1951 EASSERT(ext
->oe_nr_pages
<= max_pages
, ext
);
1953 if (page_count
== max_pages
)
1956 while (!list_empty(&obj
->oo_urgent_exts
)) {
1957 ext
= list_entry(obj
->oo_urgent_exts
.next
,
1958 struct osc_extent
, oe_link
);
1959 if (!try_to_add_extent_for_io(cli
, ext
, rpclist
, &page_count
,
1963 if (!ext
->oe_intree
)
1966 while ((ext
= next_extent(ext
)) != NULL
) {
1967 if ((ext
->oe_state
!= OES_CACHE
) ||
1968 (!list_empty(&ext
->oe_link
) &&
1972 if (!try_to_add_extent_for_io(cli
, ext
, rpclist
,
1973 &page_count
, &max_pages
))
1977 if (page_count
== max_pages
)
1980 ext
= first_extent(obj
);
1982 if ((ext
->oe_state
!= OES_CACHE
) ||
1983 /* this extent may be already in current rpclist */
1984 (!list_empty(&ext
->oe_link
) && ext
->oe_owner
)) {
1985 ext
= next_extent(ext
);
1989 if (!try_to_add_extent_for_io(cli
, ext
, rpclist
, &page_count
,
1993 ext
= next_extent(ext
);
1999 osc_send_write_rpc(const struct lu_env
*env
, struct client_obd
*cli
,
2000 struct osc_object
*osc
)
2004 struct osc_extent
*ext
;
2005 struct osc_extent
*tmp
;
2006 struct osc_extent
*first
= NULL
;
2011 LASSERT(osc_object_is_locked(osc
));
2013 page_count
= get_write_extents(osc
, &rpclist
);
2014 LASSERT(equi(page_count
== 0, list_empty(&rpclist
)));
2016 if (list_empty(&rpclist
))
2019 osc_update_pending(osc
, OBD_BRW_WRITE
, -page_count
);
2021 list_for_each_entry(ext
, &rpclist
, oe_link
) {
2022 LASSERT(ext
->oe_state
== OES_CACHE
||
2023 ext
->oe_state
== OES_LOCK_DONE
);
2024 if (ext
->oe_state
== OES_CACHE
)
2025 osc_extent_state_set(ext
, OES_LOCKING
);
2027 osc_extent_state_set(ext
, OES_RPC
);
2030 /* we're going to grab page lock, so release object lock because
2031 * lock order is page lock -> object lock.
2033 osc_object_unlock(osc
);
2035 list_for_each_entry_safe(ext
, tmp
, &rpclist
, oe_link
) {
2036 if (ext
->oe_state
== OES_LOCKING
) {
2037 rc
= osc_extent_make_ready(env
, ext
);
2038 if (unlikely(rc
< 0)) {
2039 list_del_init(&ext
->oe_link
);
2040 osc_extent_finish(env
, ext
, 0, rc
);
2046 srvlock
= ext
->oe_srvlock
;
2048 LASSERT(srvlock
== ext
->oe_srvlock
);
2052 if (!list_empty(&rpclist
)) {
2053 LASSERT(page_count
> 0);
2054 rc
= osc_build_rpc(env
, cli
, &rpclist
, OBD_BRW_WRITE
);
2055 LASSERT(list_empty(&rpclist
));
2058 osc_object_lock(osc
);
2063 * prepare pages for ASYNC io and put pages in send queue.
2065 * \param cmd OBD_BRW_* macroses
2066 * \param lop pending pages
2068 * \return zero if no page added to send queue.
2069 * \return 1 if pages successfully added to send queue.
2070 * \return negative on errors.
2073 osc_send_read_rpc(const struct lu_env
*env
, struct client_obd
*cli
,
2074 struct osc_object
*osc
)
2077 struct osc_extent
*ext
;
2078 struct osc_extent
*next
;
2081 unsigned int max_pages
= cli
->cl_max_pages_per_rpc
;
2084 LASSERT(osc_object_is_locked(osc
));
2085 list_for_each_entry_safe(ext
, next
, &osc
->oo_reading_exts
, oe_link
) {
2086 EASSERT(ext
->oe_state
== OES_LOCK_DONE
, ext
);
2087 if (!try_to_add_extent_for_io(cli
, ext
, &rpclist
, &page_count
,
2090 osc_extent_state_set(ext
, OES_RPC
);
2091 EASSERT(ext
->oe_nr_pages
<= max_pages
, ext
);
2093 LASSERT(page_count
<= max_pages
);
2095 osc_update_pending(osc
, OBD_BRW_READ
, -page_count
);
2097 if (!list_empty(&rpclist
)) {
2098 osc_object_unlock(osc
);
2100 LASSERT(page_count
> 0);
2101 rc
= osc_build_rpc(env
, cli
, &rpclist
, OBD_BRW_READ
);
2102 LASSERT(list_empty(&rpclist
));
2104 osc_object_lock(osc
);
2109 #define list_to_obj(list, item) ({ \
2110 struct list_head *__tmp = (list)->next; \
2111 list_del_init(__tmp); \
2112 list_entry(__tmp, struct osc_object, oo_##item); \
2115 /* This is called by osc_check_rpcs() to find which objects have pages that
2116 * we could be sending. These lists are maintained by osc_makes_rpc().
2118 static struct osc_object
*osc_next_obj(struct client_obd
*cli
)
2120 /* First return objects that have blocked locks so that they
2121 * will be flushed quickly and other clients can get the lock,
2122 * then objects which have pages ready to be stuffed into RPCs
2124 if (!list_empty(&cli
->cl_loi_hp_ready_list
))
2125 return list_to_obj(&cli
->cl_loi_hp_ready_list
, hp_ready_item
);
2126 if (!list_empty(&cli
->cl_loi_ready_list
))
2127 return list_to_obj(&cli
->cl_loi_ready_list
, ready_item
);
2129 /* then if we have cache waiters, return all objects with queued
2130 * writes. This is especially important when many small files
2131 * have filled up the cache and not been fired into rpcs because
2132 * they don't pass the nr_pending/object threshold
2134 if (!list_empty(&cli
->cl_cache_waiters
) &&
2135 !list_empty(&cli
->cl_loi_write_list
))
2136 return list_to_obj(&cli
->cl_loi_write_list
, write_item
);
2138 /* then return all queued objects when we have an invalid import
2139 * so that they get flushed
2141 if (!cli
->cl_import
|| cli
->cl_import
->imp_invalid
) {
2142 if (!list_empty(&cli
->cl_loi_write_list
))
2143 return list_to_obj(&cli
->cl_loi_write_list
, write_item
);
2144 if (!list_empty(&cli
->cl_loi_read_list
))
2145 return list_to_obj(&cli
->cl_loi_read_list
, read_item
);
/* called with the loi list lock held */
static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
	__must_hold(&cli->cl_loi_list_lock)
{
	struct osc_object *osc;
	int rc = 0;

	while ((osc = osc_next_obj(cli)) != NULL) {
		struct cl_object *obj = osc2cl(osc);
		struct lu_ref_link link;

		OSC_IO_DEBUG(osc, "%lu in flight\n", rpcs_in_flight(cli));

		if (osc_max_rpc_in_flight(cli, osc)) {
			__osc_list_maint(cli, osc);
			break;
		}

		cl_object_get(obj);
		spin_unlock(&cli->cl_loi_list_lock);
		lu_object_ref_add_at(&obj->co_lu, &link, "check", current);

		/* attempt some read/write balancing by alternating between
		 * reads and writes in an object. The makes_rpc checks here
		 * would be redundant if we were getting read/write work items
		 * instead of objects. we don't want send_oap_rpc to drain a
		 * partial read pending queue when we're given this object to
		 * do io on writes while there are cache waiters
		 */
		osc_object_lock(osc);
		if (osc_makes_rpc(cli, osc, OBD_BRW_WRITE)) {
			rc = osc_send_write_rpc(env, cli, osc);
			if (rc < 0) {
				CERROR("Write request failed with %d\n", rc);

				/* osc_send_write_rpc failed, mostly because of
				 * memory pressure.
				 *
				 * It can't break here, because if:
				 *   - a page was submitted by osc_io_submit,
				 *     so the page is locked;
				 *   - no request in flight
				 *   - no subsequent request
				 * The system will be in live-lock state,
				 * because there is no chance to call
				 * osc_io_unplug() and osc_check_rpcs() any
				 * more. pdflush can't help in this case,
				 * because it might be blocked at grabbing
				 * the page lock as we mentioned.
				 *
				 * Anyway, continue to drain pages.
				 */
			}
		}
		if (osc_makes_rpc(cli, osc, OBD_BRW_READ)) {
			rc = osc_send_read_rpc(env, cli, osc);
			if (rc < 0)
				CERROR("Read request failed with %d\n", rc);
		}
		osc_object_unlock(osc);

		osc_list_maint(cli, osc);
		lu_object_ref_del_at(&obj->co_lu, &link, "check", current);
		cl_object_put(env, obj);

		spin_lock(&cli->cl_loi_list_lock);
	}
}
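/*
 * A minimal sketch (added for illustration, not in the original source) of
 * the locking pattern above: cl_loi_list_lock is a spinlock and
 * osc_build_rpc() can sleep, so osc_check_rpcs() drops the lock for the
 * duration of RPC submission and retakes it before picking the next
 * object. The cl_object_get()/cl_object_put() pair keeps the object alive
 * across the unlocked window. Error handling is omitted here.
 */
#if 0
	/* caller holds cli->cl_loi_list_lock on entry */
	while ((osc = osc_next_obj(cli)) != NULL) {
		cl_object_get(osc2cl(osc));
		spin_unlock(&cli->cl_loi_list_lock);	/* may sleep below */

		/* ... build and send read/write RPCs for this object ... */

		cl_object_put(env, osc2cl(osc));
		spin_lock(&cli->cl_loi_list_lock);	/* retake for next */
	}
#endif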
static int osc_io_unplug0(const struct lu_env *env, struct client_obd *cli,
			  struct osc_object *osc, int async)
{
	int rc = 0;

	if (osc && osc_list_maint(cli, osc) == 0)
		return 0;

	if (!async) {
		/* disable osc_lru_shrink() temporarily to avoid
		 * potential stack overrun problem. LU-2859
		 */
		atomic_inc(&cli->cl_lru_shrinkers);
		spin_lock(&cli->cl_loi_list_lock);
		osc_check_rpcs(env, cli);
		spin_unlock(&cli->cl_loi_list_lock);
		atomic_dec(&cli->cl_lru_shrinkers);
	} else {
		CDEBUG(D_CACHE, "Queue writeback work for client %p.\n", cli);
		LASSERT(cli->cl_writeback_work);
		rc = ptlrpcd_queue_work(cli->cl_writeback_work);
	}
	return rc;
}
static int osc_io_unplug_async(const struct lu_env *env,
			       struct client_obd *cli, struct osc_object *osc)
{
	return osc_io_unplug0(env, cli, osc, 1);
}

void osc_io_unplug(const struct lu_env *env, struct client_obd *cli,
		   struct osc_object *osc)
{
	(void)osc_io_unplug0(env, cli, osc, 0);
}
int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops,
			struct page *page, loff_t offset)
{
	struct obd_export *exp = osc_export(osc);
	struct osc_async_page *oap = &ops->ops_oap;

	if (!page)
		return cfs_size_round(sizeof(*oap));

	oap->oap_magic = OAP_MAGIC;
	oap->oap_cli = &exp->exp_obd->u.cli;
	oap->oap_obj = osc;

	oap->oap_page = page;
	oap->oap_obj_off = offset;
	LASSERT(!(offset & ~PAGE_MASK));

	if (capable(CFS_CAP_SYS_RESOURCE))
		oap->oap_brw_flags = OBD_BRW_NOQUOTA;

	INIT_LIST_HEAD(&oap->oap_pending_item);
	INIT_LIST_HEAD(&oap->oap_rpc_item);

	spin_lock_init(&oap->oap_lock);
	CDEBUG(D_INFO, "oap %p page %p obj off %llu\n",
	       oap, page, oap->oap_obj_off);
	return 0;
}
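/*
 * Illustrative usage sketch (assumed callers, not copied from this file):
 * with a NULL page the function only reports the rounded size of the
 * osc_async_page slice; with a real page it initializes the oap in place.
 * "vmpage" and "offset" below are hypothetical variables.
 */
#if 0
	int slice_size = osc_prep_async_page(osc, ops, NULL, 0);

	/* real initialization: offset must be page aligned (see the
	 * LASSERT above) */
	rc = osc_prep_async_page(osc, ops, vmpage, offset & PAGE_MASK);
#endif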
int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
		       struct osc_page *ops)
{
	struct osc_io *oio = osc_env_io(env);
	struct osc_extent *ext = NULL;
	struct osc_async_page *oap = &ops->ops_oap;
	struct client_obd *cli = oap->oap_cli;
	struct osc_object *osc = oap->oap_obj;
	pgoff_t index;
	int grants = 0;
	int brw_flags = OBD_BRW_ASYNC;
	int cmd = OBD_BRW_WRITE;
	int need_release = 0;
	int rc = 0;

	if (oap->oap_magic != OAP_MAGIC)
		return -EINVAL;

	if (!cli->cl_import || cli->cl_import->imp_invalid)
		return -EIO;

	if (!list_empty(&oap->oap_pending_item) ||
	    !list_empty(&oap->oap_rpc_item))
		return -EBUSY;

	/* Set the OBD_BRW_SRVLOCK before the page is queued. */
	brw_flags |= ops->ops_srvlock ? OBD_BRW_SRVLOCK : 0;
	if (capable(CFS_CAP_SYS_RESOURCE)) {
		brw_flags |= OBD_BRW_NOQUOTA;
		cmd |= OBD_BRW_NOQUOTA;
	}

	/* check if the file's owner/group is over quota */
	if (!(cmd & OBD_BRW_NOQUOTA)) {
		struct cl_object *obj;
		struct cl_attr *attr;
		unsigned int qid[MAXQUOTAS];

		obj = cl_object_top(&osc->oo_cl);
		attr = &osc_env_info(env)->oti_attr;

		cl_object_attr_lock(obj);
		rc = cl_object_attr_get(env, obj, attr);
		cl_object_attr_unlock(obj);

		qid[USRQUOTA] = attr->cat_uid;
		qid[GRPQUOTA] = attr->cat_gid;
		if (rc == 0 && osc_quota_chkdq(cli, qid) == NO_QUOTA)
			rc = -EDQUOT;
		if (rc)
			return rc;
	}

	oap->oap_cmd = cmd;
	oap->oap_page_off = ops->ops_from;
	oap->oap_count = ops->ops_to - ops->ops_from;
	/*
	 * No need to hold a lock here,
	 * since this page is not in any list yet.
	 */
	oap->oap_async_flags = 0;
	oap->oap_brw_flags = brw_flags;

	OSC_IO_DEBUG(osc, "oap %p page %p added for cmd %d\n",
		     oap, oap->oap_page, oap->oap_cmd & OBD_BRW_RWMASK);

	index = osc_index(oap2osc(oap));

	/* Add this page into extent by the following steps:
	 * 1. if there exists an active extent for this IO, mostly this page
	 *    can be added to the active extent and sometimes we need to
	 *    expand extent to accommodate this page;
	 * 2. otherwise, a new extent will be allocated.
	 */
	ext = oio->oi_active;
	if (ext && ext->oe_start <= index && ext->oe_max_end >= index) {
		/* one chunk plus extent overhead must be enough to write this
		 * page
		 */
		grants = (1 << cli->cl_chunkbits) + cli->cl_extent_tax;
		if (ext->oe_end >= index)
			grants = 0;

		/* it doesn't need any grant to dirty this page */
		spin_lock(&cli->cl_loi_list_lock);
		rc = osc_enter_cache_try(cli, oap, grants, 0);
		spin_unlock(&cli->cl_loi_list_lock);
		if (rc == 0) { /* try failed */
			grants = 0;
			need_release = 1;
		} else if (ext->oe_end < index) {
			int tmp = grants;

			/* try to expand this extent */
			rc = osc_extent_expand(ext, index, &tmp);
			if (rc < 0) {
				need_release = 1;
				/* don't free reserved grant */
			} else {
				OSC_EXTENT_DUMP(D_CACHE, ext,
						"expanded for %lu.\n", index);
				osc_unreserve_grant(cli, grants, tmp);
				grants = 0;
			}
		}
		rc = 0;
	} else if (ext) {
		/* index is located outside of active extent */
		need_release = 1;
	}
	if (need_release) {
		osc_extent_release(env, ext);
		oio->oi_active = NULL;
		ext = NULL;
	}

	if (!ext) {
		int tmp = (1 << cli->cl_chunkbits) + cli->cl_extent_tax;

		/* try to find new extent to cover this page */
		LASSERT(!oio->oi_active);
		/* we may have allocated grant for this page if we failed
		 * to expand the previous active extent.
		 */
		LASSERT(ergo(grants > 0, grants >= tmp));

		rc = 0;
		if (grants == 0) {
			/* we haven't allocated grant for this page. */
			rc = osc_enter_cache(env, cli, oap, tmp);
			if (rc == 0)
				grants = tmp;
		}

		tmp = grants;
		if (rc == 0) {
			ext = osc_extent_find(env, osc, index, &tmp);
			if (IS_ERR(ext)) {
				LASSERT(tmp == grants);
				osc_exit_cache(cli, oap);
				rc = PTR_ERR(ext);
				ext = NULL;
			} else {
				oio->oi_active = ext;
			}
		}
		if (grants > 0)
			osc_unreserve_grant(cli, grants, tmp);
	}

	LASSERT(ergo(rc == 0, ext));
	if (ext) {
		EASSERTF(ext->oe_end >= index && ext->oe_start <= index,
			 ext, "index = %lu.\n", index);
		LASSERT((oap->oap_brw_flags & OBD_BRW_FROM_GRANT) != 0);

		osc_object_lock(osc);
		if (ext->oe_nr_pages == 0)
			ext->oe_srvlock = ops->ops_srvlock;
		else
			LASSERT(ext->oe_srvlock == ops->ops_srvlock);
		++ext->oe_nr_pages;
		list_add_tail(&oap->oap_pending_item, &ext->oe_pages);
		osc_object_unlock(osc);
	}
	return rc;
}
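/*
 * Worked example (added for illustration; the numbers are assumptions, not
 * from this file): with cl_chunkbits = 16 (64 KiB chunks) and an extent tax
 * of, say, 4 KiB, the grant reserved before dirtying a page outside the
 * current extent range is one full chunk plus the per-extent overhead:
 *
 *	grants = (1 << cli->cl_chunkbits) + cli->cl_extent_tax
 *	       = 65536 + 4096 = 69632 bytes
 *
 * If the page already falls inside [oe_start, oe_end], grants is reset to 0
 * because the covering chunk was paid for when the extent grew to its
 * current size.
 */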
int osc_teardown_async_page(const struct lu_env *env,
			    struct osc_object *obj, struct osc_page *ops)
{
	struct osc_async_page *oap = &ops->ops_oap;
	struct osc_extent *ext = NULL;
	int rc = 0;

	LASSERT(oap->oap_magic == OAP_MAGIC);

	CDEBUG(D_INFO, "teardown oap %p page %p at index %lu.\n",
	       oap, ops, osc_index(oap2osc(oap)));

	osc_object_lock(obj);
	if (!list_empty(&oap->oap_rpc_item)) {
		CDEBUG(D_CACHE, "oap %p is not in cache.\n", oap);
		rc = -EBUSY;
	} else if (!list_empty(&oap->oap_pending_item)) {
		ext = osc_extent_lookup(obj, osc_index(oap2osc(oap)));
		/* only truncated pages are allowed to be taken out.
		 * See osc_extent_truncate() and osc_cache_truncate_start()
		 * for details.
		 */
		if (ext && ext->oe_state != OES_TRUNC) {
			OSC_EXTENT_DUMP(D_ERROR, ext, "trunc at %lu.\n",
					osc_index(oap2osc(oap)));
			rc = -EBUSY;
		}
	}
	osc_object_unlock(obj);
	if (ext)
		osc_extent_put(env, ext);
	return rc;
}
/**
 * This is called when a page is picked up by kernel to write out.
 *
 * We should find out the corresponding extent and add the whole extent
 * into urgent list. The extent may be being truncated or used, handle it
 * carefully.
 */
int osc_flush_async_page(const struct lu_env *env, struct cl_io *io,
			 struct osc_page *ops)
{
	struct osc_extent *ext = NULL;
	struct osc_object *obj = cl2osc(ops->ops_cl.cpl_obj);
	struct cl_page *cp = ops->ops_cl.cpl_page;
	pgoff_t index = osc_index(ops);
	struct osc_async_page *oap = &ops->ops_oap;
	bool unplug = false;
	int rc = 0;

	osc_object_lock(obj);
	ext = osc_extent_lookup(obj, index);
	if (!ext) {
		osc_extent_tree_dump(D_ERROR, obj);
		LASSERTF(0, "page index %lu is NOT covered.\n", index);
	}

	switch (ext->oe_state) {
	case OES_RPC:
	case OES_LOCK_DONE:
		CL_PAGE_DEBUG(D_ERROR, env, cp, "flush an in-rpc page?\n");
		LASSERT(0);
		break;
	case OES_LOCKING:
		/* If we know this extent is being written out, we should abort
		 * so that the writer can make this page ready. Otherwise, there
		 * exists a deadlock problem because other process can wait for
		 * page writeback bit holding page lock; and meanwhile in
		 * vvp_page_make_ready(), we need to grab page lock before
		 * really sending the RPC.
		 */
	case OES_TRUNC:
		/* race with truncate, page will be redirtied */
	case OES_ACTIVE:
		/* The extent is active so we need to abort and let the caller
		 * re-dirty the page. If we continued on here, and we were the
		 * one making the extent active, we could deadlock waiting for
		 * the page writeback to clear but it won't because the extent
		 * is active and won't be written out.
		 */
		rc = -EAGAIN;
		goto out;
	default:
		break;
	}

	rc = cl_page_prep(env, io, cp, CRT_WRITE);
	if (rc)
		goto out;

	spin_lock(&oap->oap_lock);
	oap->oap_async_flags |= ASYNC_READY | ASYNC_URGENT;
	spin_unlock(&oap->oap_lock);

	if (memory_pressure_get())
		ext->oe_memalloc = 1;

	ext->oe_urgent = 1;
	if (ext->oe_state == OES_CACHE) {
		OSC_EXTENT_DUMP(D_CACHE, ext,
				"flush page %p make it urgent.\n", oap);
		if (list_empty(&ext->oe_link))
			list_add_tail(&ext->oe_link, &obj->oo_urgent_exts);
		unplug = true;
	}
	rc = 0;

out:
	osc_object_unlock(obj);
	osc_extent_put(env, ext);
	if (unplug)
		osc_io_unplug_async(env, osc_cli(obj), obj);
	return rc;
}
/**
 * this is called when a sync waiter receives an interruption.  Its job is to
 * get the caller woken as soon as possible.  If its page hasn't been put in an
 * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
 * desiring interruption which will forcefully complete the rpc once the rpc
 * has completed.
 */
int osc_cancel_async_page(const struct lu_env *env, struct osc_page *ops)
{
	struct osc_async_page *oap = &ops->ops_oap;
	struct osc_object *obj = oap->oap_obj;
	struct client_obd *cli = osc_cli(obj);
	struct osc_extent *ext;
	struct osc_extent *found = NULL;
	struct list_head *plist;
	pgoff_t index = osc_index(ops);
	int rc = -EBUSY;
	int cmd;

	LASSERT(!oap->oap_interrupted);
	oap->oap_interrupted = 1;

	/* Find out the caching extent */
	osc_object_lock(obj);
	if (oap->oap_cmd & OBD_BRW_WRITE) {
		plist = &obj->oo_urgent_exts;
		cmd = OBD_BRW_WRITE;
	} else {
		plist = &obj->oo_reading_exts;
		cmd = OBD_BRW_READ;
	}
	list_for_each_entry(ext, plist, oe_link) {
		if (ext->oe_start <= index && ext->oe_end >= index) {
			LASSERT(ext->oe_state == OES_LOCK_DONE);
			/* For OES_LOCK_DONE state extent, it has already held
			 * a refcount for RPC.
			 */
			found = osc_extent_get(ext);
			break;
		}
	}
	if (found) {
		list_del_init(&found->oe_link);
		osc_update_pending(obj, cmd, -found->oe_nr_pages);
		osc_object_unlock(obj);

		osc_extent_finish(env, found, 0, -EINTR);
		osc_extent_put(env, found);
		rc = 0;
	} else {
		osc_object_unlock(obj);
		/* ok, it's been put in an rpc. only one oap gets a request
		 * reference
		 */
		if (oap->oap_request) {
			ptlrpc_mark_interrupted(oap->oap_request);
			ptlrpcd_wake(oap->oap_request);
			ptlrpc_req_finished(oap->oap_request);
			oap->oap_request = NULL;
		}
	}

	osc_list_maint(cli, obj);
	return rc;
}
int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj,
			 struct list_head *list, int cmd, int brw_flags)
{
	struct client_obd *cli = osc_cli(obj);
	struct osc_extent *ext;
	struct osc_async_page *oap, *tmp;
	int page_count = 0;
	int mppr = cli->cl_max_pages_per_rpc;
	pgoff_t start = CL_PAGE_EOF;
	pgoff_t end = 0;

	list_for_each_entry(oap, list, oap_pending_item) {
		pgoff_t index = osc_index(oap2osc(oap));

		if (index > end)
			end = index;
		if (index < start)
			start = index;
		++page_count;
		mppr <<= (page_count > mppr);
	}

	ext = osc_extent_alloc(obj);
	if (!ext) {
		list_for_each_entry_safe(oap, tmp, list, oap_pending_item) {
			list_del_init(&oap->oap_pending_item);
			osc_ap_completion(env, cli, oap, 0, -ENOMEM);
		}
		return -ENOMEM;
	}

	ext->oe_rw = !!(cmd & OBD_BRW_READ);
	ext->oe_urgent = 1;
	ext->oe_start = start;
	ext->oe_end = end;
	ext->oe_max_end = end;
	ext->oe_obj = obj;
	ext->oe_srvlock = !!(brw_flags & OBD_BRW_SRVLOCK);
	ext->oe_nr_pages = page_count;
	ext->oe_mppr = mppr;
	list_splice_init(list, &ext->oe_pages);

	osc_object_lock(obj);
	/* Reuse the initial refcount for RPC, don't drop it */
	osc_extent_state_set(ext, OES_LOCK_DONE);
	if (cmd & OBD_BRW_WRITE) {
		list_add_tail(&ext->oe_link, &obj->oo_urgent_exts);
		osc_update_pending(obj, OBD_BRW_WRITE, page_count);
	} else {
		list_add_tail(&ext->oe_link, &obj->oo_reading_exts);
		osc_update_pending(obj, OBD_BRW_READ, page_count);
	}
	osc_object_unlock(obj);

	osc_io_unplug_async(env, cli, obj);
	return 0;
}
/**
 * Called by osc_io_setattr_start() to freeze and destroy covering extents.
 */
int osc_cache_truncate_start(const struct lu_env *env, struct osc_io *oio,
			     struct osc_object *obj, __u64 size)
{
	struct client_obd *cli = osc_cli(obj);
	struct osc_extent *ext;
	struct osc_extent *temp;
	struct osc_extent *waiting = NULL;
	pgoff_t index;
	LIST_HEAD(list);
	int result = 0;
	bool partial;

	/* pages with index greater or equal to index will be truncated. */
	index = cl_index(osc2cl(obj), size);
	partial = size > cl_offset(osc2cl(obj), index);

again:
	osc_object_lock(obj);
	ext = osc_extent_search(obj, index);
	if (!ext)
		ext = first_extent(obj);
	else if (ext->oe_end < index)
		ext = next_extent(ext);
	while (ext) {
		EASSERT(ext->oe_state != OES_TRUNC, ext);

		if (ext->oe_state > OES_CACHE || ext->oe_urgent) {
			/* if ext is in urgent state, it means there must exist
			 * a page already having been flushed by write_page().
			 * We have to wait for this extent because we can't
			 * truncate that page.
			 */
			LASSERT(!ext->oe_hp);
			OSC_EXTENT_DUMP(D_CACHE, ext,
					"waiting for busy extent\n");
			waiting = osc_extent_get(ext);
			break;
		}

		OSC_EXTENT_DUMP(D_CACHE, ext, "try to trunc:%llu.\n", size);

		osc_extent_get(ext);
		if (ext->oe_state == OES_ACTIVE) {
			/* though we grab inode mutex for write path, but we
			 * release it before releasing extent(in osc_io_end()),
			 * so there is a race window that an extent is still
			 * in OES_ACTIVE when truncate starts.
			 */
			LASSERT(!ext->oe_trunc_pending);
			ext->oe_trunc_pending = 1;
		} else {
			EASSERT(ext->oe_state == OES_CACHE, ext);
			osc_extent_state_set(ext, OES_TRUNC);
			osc_update_pending(obj, OBD_BRW_WRITE,
					   -ext->oe_nr_pages);
		}
		EASSERT(list_empty(&ext->oe_link), ext);
		list_add_tail(&ext->oe_link, &list);

		ext = next_extent(ext);
	}
	osc_object_unlock(obj);

	osc_list_maint(cli, obj);

	list_for_each_entry_safe(ext, temp, &list, oe_link) {
		int rc;

		list_del_init(&ext->oe_link);

		/* extent may be in OES_ACTIVE state because inode mutex
		 * is released before osc_io_end() in file write case
		 */
		if (ext->oe_state != OES_TRUNC)
			osc_extent_wait(env, ext, OES_TRUNC);

		rc = osc_extent_truncate(ext, index, partial);
		if (rc < 0) {
			if (result == 0)
				result = rc;

			OSC_EXTENT_DUMP(D_ERROR, ext,
					"truncate error %d\n", rc);
		} else if (ext->oe_nr_pages == 0) {
			osc_extent_remove(ext);
		} else {
			/* this must be an overlapped extent which means only
			 * part of pages in this extent have been truncated.
			 */
			EASSERTF(ext->oe_start <= index, ext,
				 "trunc index = %lu/%d.\n", index, partial);
			/* fix index to skip this partially truncated extent */
			index = ext->oe_end + 1;
			partial = false;

			/* we need to hold this extent in OES_TRUNC state so
			 * that no writeback will happen.
			 */
			LASSERT(!oio->oi_trunc);
			oio->oi_trunc = osc_extent_get(ext);
			OSC_EXTENT_DUMP(D_CACHE, ext,
					"trunc at %llu\n", size);
		}
		osc_extent_put(env, ext);
	}
	if (waiting) {
		int rc;

		/* ignore the result of osc_extent_wait the write initiator
		 * should take care of it.
		 */
		rc = osc_extent_wait(env, waiting, OES_INV);
		if (rc < 0)
			OSC_EXTENT_DUMP(D_CACHE, waiting, "error: %d.\n", rc);

		osc_extent_put(env, waiting);
		waiting = NULL;
		goto again;
	}
	return result;
}
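/*
 * Worked example (illustrative, assuming PAGE_SIZE = 4096): for a truncate
 * to size = 10000, cl_index() maps the size to page index 2 (pages 0 and 1
 * stay whole) and cl_offset(obj, 2) = 8192 < 10000, so "partial" is true:
 * page 2 must survive truncation but have its tail beyond byte 10000
 * discarded. Only when size is exactly page aligned is partial false and
 * every page from "index" onward dropped completely.
 */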
/**
 * Called after osc_io_setattr_end to add oio->oi_trunc back to cache.
 */
void osc_cache_truncate_end(const struct lu_env *env, struct osc_io *oio,
			    struct osc_object *obj)
{
	struct osc_extent *ext = oio->oi_trunc;

	oio->oi_trunc = NULL;
	if (ext) {
		bool unplug = false;

		EASSERT(ext->oe_nr_pages > 0, ext);
		EASSERT(ext->oe_state == OES_TRUNC, ext);
		EASSERT(!ext->oe_urgent, ext);

		OSC_EXTENT_DUMP(D_CACHE, ext, "trunc -> cache.\n");
		osc_object_lock(obj);
		osc_extent_state_set(ext, OES_CACHE);
		if (ext->oe_fsync_wait && !ext->oe_urgent) {
			ext->oe_urgent = 1;
			list_move_tail(&ext->oe_link, &obj->oo_urgent_exts);
			unplug = true;
		}
		osc_update_pending(obj, OBD_BRW_WRITE, ext->oe_nr_pages);
		osc_object_unlock(obj);
		osc_extent_put(env, ext);

		if (unplug)
			osc_io_unplug_async(env, osc_cli(obj), obj);
	}
}
/**
 * Wait for extents in a specific range to be written out.
 * The caller must have called osc_cache_writeback_range() to issue IO
 * otherwise it will take a long time for this function to finish.
 *
 * Caller must hold inode_mutex, or cancel exclusive dlm lock so that
 * nobody else can dirty this range of file while we're waiting for
 * extents to be written.
 */
int osc_cache_wait_range(const struct lu_env *env, struct osc_object *obj,
			 pgoff_t start, pgoff_t end)
{
	struct osc_extent *ext;
	pgoff_t index = start;
	int result = 0;

again:
	osc_object_lock(obj);
	ext = osc_extent_search(obj, index);
	if (!ext)
		ext = first_extent(obj);
	else if (ext->oe_end < index)
		ext = next_extent(ext);
	while (ext) {
		int rc;

		if (ext->oe_start > end)
			break;

		if (!ext->oe_fsync_wait) {
			ext = next_extent(ext);
			continue;
		}

		EASSERT(ergo(ext->oe_state == OES_CACHE,
			     ext->oe_hp || ext->oe_urgent), ext);
		EASSERT(ergo(ext->oe_state == OES_ACTIVE,
			     !ext->oe_hp && ext->oe_urgent), ext);

		index = ext->oe_end + 1;
		osc_extent_get(ext);
		osc_object_unlock(obj);

		rc = osc_extent_wait(env, ext, OES_INV);
		if (result == 0)
			result = rc;
		osc_extent_put(env, ext);
		goto again;
	}
	osc_object_unlock(obj);

	OSC_IO_DEBUG(obj, "sync file range.\n");
	return result;
}
/**
 * Called to write out a range of osc object.
 *
 * @hp     : should be set if this is caused by lock cancel;
 * @discard: is set if dirty pages should be dropped - file will be deleted or
 *	     truncated, this implies there is no partially discarding extents.
 *
 * Return how many pages will be issued, or error code if error occurred.
 */
int osc_cache_writeback_range(const struct lu_env *env, struct osc_object *obj,
			      pgoff_t start, pgoff_t end, int hp, int discard)
{
	struct osc_extent *ext;
	LIST_HEAD(discard_list);
	bool unplug = false;
	int result = 0;

	osc_object_lock(obj);
	ext = osc_extent_search(obj, start);
	if (!ext)
		ext = first_extent(obj);
	else if (ext->oe_end < start)
		ext = next_extent(ext);
	while (ext) {
		if (ext->oe_start > end)
			break;

		ext->oe_fsync_wait = 1;
		switch (ext->oe_state) {
		case OES_CACHE:
			result += ext->oe_nr_pages;
			if (!discard) {
				struct list_head *list = NULL;

				if (hp) {
					EASSERT(!ext->oe_hp, ext);
					ext->oe_hp = 1;
					list = &obj->oo_hp_exts;
				} else if (!ext->oe_urgent) {
					ext->oe_urgent = 1;
					list = &obj->oo_urgent_exts;
				}
				if (list)
					list_move_tail(&ext->oe_link, list);
				unplug = true;
			} else {
				/* the only discarder is lock cancelling, so
				 * [start, end] must contain this extent
				 */
				EASSERT(ext->oe_start >= start &&
					ext->oe_max_end <= end, ext);
				osc_extent_state_set(ext, OES_LOCKING);
				ext->oe_owner = current;
				list_move_tail(&ext->oe_link, &discard_list);
				osc_update_pending(obj, OBD_BRW_WRITE,
						   -ext->oe_nr_pages);
			}
			break;
		case OES_ACTIVE:
			/* It's pretty bad to wait for ACTIVE extents, because
			 * we don't know how long we will wait for it to be
			 * flushed since it may be blocked at awaiting more
			 * grants. We do this for the correctness of fsync.
			 */
			LASSERT(hp == 0 && discard == 0);
			ext->oe_urgent = 1;
			break;
		case OES_TRUNC:
			/* this extent is being truncated, can't do anything
			 * for it now. it will be set to urgent after truncate
			 * is finished in osc_cache_truncate_end().
			 */
		default:
			break;
		}
		ext = next_extent(ext);
	}
	osc_object_unlock(obj);

	LASSERT(ergo(!discard, list_empty(&discard_list)));
	if (!list_empty(&discard_list)) {
		struct osc_extent *tmp;
		int rc;

		osc_list_maint(osc_cli(obj), obj);
		list_for_each_entry_safe(ext, tmp, &discard_list, oe_link) {
			list_del_init(&ext->oe_link);
			EASSERT(ext->oe_state == OES_LOCKING, ext);

			/* Discard caching pages. We don't actually write this
			 * extent out but we complete it as if we did.
			 */
			rc = osc_extent_make_ready(env, ext);
			if (unlikely(rc < 0)) {
				OSC_EXTENT_DUMP(D_ERROR, ext,
						"make_ready returned %d\n", rc);
				if (result >= 0)
					result = rc;
			}

			/* finish the extent as if the pages were sent */
			osc_extent_finish(env, ext, 0, 0);
		}
	}

	if (unplug)
		osc_io_unplug(env, osc_cli(obj), obj);

	if (hp || discard) {
		int rc;

		rc = osc_cache_wait_range(env, obj, start, end);
		if (result >= 0 && rc < 0)
			result = rc;
	}

	OSC_IO_DEBUG(obj, "pageout [%lu, %lu], %d.\n", start, end, result);
	return result;
}
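/*
 * Typical call patterns (sketch only; these call sites are assumptions for
 * illustration, not copied from this file):
 */
#if 0
	/* fsync-style flush: queue everything dirty, no high priority,
	 * keep the pages cached */
	rc = osc_cache_writeback_range(env, obj, 0, CL_PAGE_EOF, 0, 0);

	/* blocking DLM lock cancel: flush at high priority so the waiting
	 * client gets the lock quickly */
	rc = osc_cache_writeback_range(env, obj, start, end, 1, 0);

	/* truncate/destroy under the lock: drop dirty pages instead of
	 * writing them back */
	rc = osc_cache_writeback_range(env, obj, start, end, 0, 1);
#endif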
/**
 * Returns a list of pages by a given [start, end] of \a obj.
 *
 * \param resched If not NULL, then we give up before hogging CPU for too
 * long and set *resched = 1, in that case caller should implement a retry
 * logic.
 *
 * Gang tree lookup (radix_tree_gang_lookup()) optimization is absolutely
 * crucial in the face of [offset, EOF] locks.
 *
 * Return at least one page in @queue unless there is no covered page.
 */
int osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io,
			 struct osc_object *osc, pgoff_t start, pgoff_t end,
			 osc_page_gang_cbt cb, void *cbdata)
{
	struct osc_page *ops;
	void **pvec;
	pgoff_t idx;
	unsigned int nr;
	unsigned int i;
	unsigned int j;
	int res = CLP_GANG_OKAY;
	bool tree_lock = true;

	idx = start;
	pvec = osc_env_info(env)->oti_pvec;
	spin_lock(&osc->oo_tree_lock);
	while ((nr = radix_tree_gang_lookup(&osc->oo_tree, pvec,
					    idx, OTI_PVEC_SIZE)) > 0) {
		struct cl_page *page;
		bool end_of_region = false;

		for (i = 0, j = 0; i < nr; ++i) {
			ops = pvec[i];
			pvec[i] = NULL;

			idx = osc_index(ops);
			if (idx > end) {
				end_of_region = true;
				break;
			}

			page = ops->ops_cl.cpl_page;
			LASSERT(page->cp_type == CPT_CACHEABLE);
			if (page->cp_state == CPS_FREEING)
				continue;

			cl_page_get(page);
			lu_ref_add_atomic(&page->cp_reference,
					  "gang_lookup", current);
			pvec[j++] = ops;
		}
		++idx;

		/*
		 * Here a delicate locking dance is performed. Current thread
		 * holds a reference to a page, but has to own it before it
		 * can be placed into queue. Owning implies waiting, so
		 * radix-tree lock is to be released. After a wait one has to
		 * check that pages weren't truncated (cl_page_own() returns
		 * error in the latter case).
		 */
		spin_unlock(&osc->oo_tree_lock);
		tree_lock = false;

		for (i = 0; i < j; ++i) {
			ops = pvec[i];
			if (res == CLP_GANG_OKAY)
				res = (*cb)(env, io, ops, cbdata);

			page = ops->ops_cl.cpl_page;
			lu_ref_del(&page->cp_reference, "gang_lookup", current);
			cl_page_put(env, page);
		}
		if (nr < OTI_PVEC_SIZE || end_of_region)
			break;

		if (res == CLP_GANG_OKAY && need_resched())
			res = CLP_GANG_RESCHED;
		if (res != CLP_GANG_OKAY)
			break;

		spin_lock(&osc->oo_tree_lock);
		tree_lock = true;
	}
	if (tree_lock)
		spin_unlock(&osc->oo_tree_lock);
	return res;
}
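/*
 * A condensed sketch (not part of the original file) of the two-phase scheme
 * above: phase one fills a private vector under the radix-tree spinlock,
 * taking only references; phase two runs the callback outside the lock,
 * where owning a page may sleep. OTI_PVEC_SIZE bounds each batch.
 */
#if 0
	spin_lock(&osc->oo_tree_lock);
	nr = radix_tree_gang_lookup(&osc->oo_tree, pvec, idx, OTI_PVEC_SIZE);
	/* ... take cl_page references into pvec[0..j) ... */
	spin_unlock(&osc->oo_tree_lock);

	for (i = 0; i < j; ++i)
		res = (*cb)(env, io, pvec[i], cbdata);	/* may sleep */
#endif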
/**
 * Check if page @page is covered by an extra lock or discard it.
 */
static int check_and_discard_cb(const struct lu_env *env, struct cl_io *io,
				struct osc_page *ops, void *cbdata)
{
	struct osc_thread_info *info = osc_env_info(env);
	struct osc_object *osc = cbdata;
	pgoff_t index;

	index = osc_index(ops);
	if (index >= info->oti_fn_index) {
		struct ldlm_lock *tmp;
		struct cl_page *page = ops->ops_cl.cpl_page;

		/* refresh non-overlapped index */
		tmp = osc_dlmlock_at_pgoff(env, osc, index, 0, 0);
		if (tmp) {
			__u64 end = tmp->l_policy_data.l_extent.end;

			/* Cache the first-non-overlapped index so as to skip
			 * all pages within [index, oti_fn_index). This is safe
			 * because if tmp lock is canceled, it will discard
			 * these pages.
			 */
			info->oti_fn_index = cl_index(osc2cl(osc), end + 1);
			if (end == OBD_OBJECT_EOF)
				info->oti_fn_index = CL_PAGE_EOF;
			LDLM_LOCK_PUT(tmp);
		} else if (cl_page_own(env, io, page) == 0) {
			/* discard the page */
			cl_page_discard(env, io, page);
			cl_page_disown(env, io, page);
		} else {
			LASSERT(page->cp_state == CPS_FREEING);
		}
	}

	info->oti_next_index = index + 1;
	return CLP_GANG_OKAY;
}
static int discard_cb(const struct lu_env *env, struct cl_io *io,
		      struct osc_page *ops, void *cbdata)
{
	struct osc_thread_info *info = osc_env_info(env);
	struct cl_page *page = ops->ops_cl.cpl_page;

	/* page is top page. */
	info->oti_next_index = osc_index(ops) + 1;
	if (cl_page_own(env, io, page) == 0) {
		KLASSERT(ergo(page->cp_type == CPT_CACHEABLE,
			      !PageDirty(cl_page_vmpage(page))));

		/* discard the page */
		cl_page_discard(env, io, page);
		cl_page_disown(env, io, page);
	} else {
		LASSERT(page->cp_state == CPS_FREEING);
	}

	return CLP_GANG_OKAY;
}
/**
 * Discard pages protected by the given lock. This function traverses radix
 * tree to find all covering pages and discard them. If a page is being covered
 * by other locks, it should remain in cache.
 *
 * If error happens on any step, the process continues anyway (the reasoning
 * behind this being that lock cancellation cannot be delayed indefinitely).
 */
int osc_lock_discard_pages(const struct lu_env *env, struct osc_object *osc,
			   pgoff_t start, pgoff_t end, enum cl_lock_mode mode)
{
	struct osc_thread_info *info = osc_env_info(env);
	struct cl_io *io = &info->oti_io;
	osc_page_gang_cbt cb;
	int res;
	int result;

	io->ci_obj = cl_object_top(osc2cl(osc));
	io->ci_ignore_layout = 1;
	result = cl_io_init(env, io, CIT_MISC, io->ci_obj);
	if (result != 0)
		goto out;

	cb = mode == CLM_READ ? check_and_discard_cb : discard_cb;
	info->oti_fn_index = start;
	info->oti_next_index = start;
	do {
		res = osc_page_gang_lookup(env, io, osc,
					   info->oti_next_index, end, cb, osc);
		if (info->oti_next_index > end)
			break;

		if (res == CLP_GANG_RESCHED)
			cond_resched();
	} while (res != CLP_GANG_OKAY);
out:
	cl_io_fini(env, io);
	return result;
}