/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2015, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * Implementation of cl_page for OSC layer.
 *
 * Author: Nikita Danilov <nikita.danilov@sun.com>
 * Author: Jinshan Xiong <jinshan.xiong@intel.com>
 */

#define DEBUG_SUBSYSTEM S_OSC

#include "osc_cl_internal.h"

static void osc_lru_del(struct client_obd *cli, struct osc_page *opg);
static void osc_lru_use(struct client_obd *cli, struct osc_page *opg);
static int osc_lru_reserve(const struct lu_env *env, struct osc_object *obj,
			   struct osc_page *opg);

/*****************************************************************************
 *
 * Page operations.
 *
 */

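/* Note on transfer pinning (summary of the code below): a page submitted
 * for transfer is pinned by taking an extra reference on its cl_page and
 * recording that fact in ops_transfer_pinned, so the page cannot go away
 * while an RPC is in flight; osc_page_transfer_put() drops the pin again.
 */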
static void osc_page_transfer_get(struct osc_page *opg, const char *label)
{
	struct cl_page *page = opg->ops_cl.cpl_page;

	LASSERT(!opg->ops_transfer_pinned);
	cl_page_get(page);
	lu_ref_add_atomic(&page->cp_reference, label, page);
	opg->ops_transfer_pinned = 1;
}

static void osc_page_transfer_put(const struct lu_env *env,
				  struct osc_page *opg)
{
	struct cl_page *page = opg->ops_cl.cpl_page;

	if (opg->ops_transfer_pinned) {
		opg->ops_transfer_pinned = 0;
		lu_ref_del(&page->cp_reference, "transfer", page);
		cl_page_put(env, page);
	}
}

/**
 * This is called once for every page when it is submitted for a transfer
 * either opportunistic (osc_page_cache_add()), or immediate
 * (osc_page_submit()).
 */
static void osc_page_transfer_add(const struct lu_env *env,
				  struct osc_page *opg, enum cl_req_type crt)
{
	struct osc_object *obj = cl2osc(opg->ops_cl.cpl_obj);

	osc_lru_use(osc_cli(obj), opg);

	spin_lock(&obj->oo_seatbelt);
	list_add(&opg->ops_inflight, &obj->oo_inflight[crt]);
	opg->ops_submitter = current;
	spin_unlock(&obj->oo_seatbelt);
}

int osc_page_cache_add(const struct lu_env *env,
		       const struct cl_page_slice *slice, struct cl_io *io)
{
	struct osc_page *opg = cl2osc_page(slice);
	int result;

	osc_page_transfer_get(opg, "transfer\0cache");
	result = osc_queue_async_io(env, io, opg);
	if (result != 0)
		osc_page_transfer_put(env, opg);
	else
		osc_page_transfer_add(env, opg, CRT_WRITE);

	return result;
}

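/* Maps a [start, end] page index range onto the byte-granular LDLM extent
 * that covers it: cl_offset() converts a page index to a byte offset, and
 * the extent end is made inclusive by taking the offset of the page after
 * @end and subtracting one.
 */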
void osc_index2policy(ldlm_policy_data_t *policy, const struct cl_object *obj,
		      pgoff_t start, pgoff_t end)
{
	memset(policy, 0, sizeof(*policy));
	policy->l_extent.start = cl_offset(obj, start);
	policy->l_extent.end = cl_offset(obj, end + 1) - 1;
}

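/* Looks up a cached DLM lock covering this page. On a hit, *max_index is
 * set to the last page index the lock covers, letting the caller skip the
 * lookup for the remaining pages under the same lock; -ENODATA is returned
 * when no lock covers the page.
 */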
static int osc_page_is_under_lock(const struct lu_env *env,
				  const struct cl_page_slice *slice,
				  struct cl_io *unused, pgoff_t *max_index)
{
	struct osc_page *opg = cl2osc_page(slice);
	struct ldlm_lock *dlmlock;
	int result = -ENODATA;

	dlmlock = osc_dlmlock_at_pgoff(env, cl2osc(slice->cpl_obj),
				       osc_index(opg), 1, 0);
	if (dlmlock) {
		*max_index = cl_index(slice->cpl_obj,
				      dlmlock->l_policy_data.l_extent.end);
		LDLM_LOCK_PUT(dlmlock);
		result = 0;
	}
	return result;
}

static const char *osc_list(struct list_head *head)
{
	return list_empty(head) ? "-" : "+";
}

static inline unsigned long osc_submit_duration(struct osc_page *opg)
{
	if (opg->ops_submit_time == 0)
		return 0;

	return (cfs_time_current() - opg->ops_submit_time);
}

static int osc_page_print(const struct lu_env *env,
			  const struct cl_page_slice *slice,
			  void *cookie, lu_printer_t printer)
{
	struct osc_page *opg = cl2osc_page(slice);
	struct osc_async_page *oap = &opg->ops_oap;
	struct osc_object *obj = cl2osc(slice->cpl_obj);
	struct client_obd *cli = &osc_export(obj)->exp_obd->u.cli;

	return (*printer)(env, cookie, LUSTRE_OSC_NAME "-page@%p %lu: 1< %#x %d %u %s %s > 2< %llu %u %u %#x %#x | %p %p %p > 3< %s %p %d %lu %d > 4< %d %d %d %lu %s | %s %s %s %s > 5< %s %s %s %s | %d %s | %d %s %s>\n",
			  opg, osc_index(opg),
			  /* 1 */
			  oap->oap_magic, oap->oap_cmd,
			  oap->oap_interrupted,
			  osc_list(&oap->oap_pending_item),
			  osc_list(&oap->oap_rpc_item),
			  /* 2 */
			  oap->oap_obj_off, oap->oap_page_off, oap->oap_count,
			  oap->oap_async_flags, oap->oap_brw_flags,
			  oap->oap_request, oap->oap_cli, obj,
			  /* 3 */
			  osc_list(&opg->ops_inflight),
			  opg->ops_submitter, opg->ops_transfer_pinned,
			  osc_submit_duration(opg), opg->ops_srvlock,
			  /* 4 */
			  cli->cl_r_in_flight, cli->cl_w_in_flight,
			  cli->cl_max_rpcs_in_flight,
			  cli->cl_avail_grant,
			  osc_list(&cli->cl_cache_waiters),
			  osc_list(&cli->cl_loi_ready_list),
			  osc_list(&cli->cl_loi_hp_ready_list),
			  osc_list(&cli->cl_loi_write_list),
			  osc_list(&cli->cl_loi_read_list),
			  /* 5 */
			  osc_list(&obj->oo_ready_item),
			  osc_list(&obj->oo_hp_ready_item),
			  osc_list(&obj->oo_write_item),
			  osc_list(&obj->oo_read_item),
			  atomic_read(&obj->oo_nr_reads),
			  osc_list(&obj->oo_reading_exts),
			  atomic_read(&obj->oo_nr_writes),
			  osc_list(&obj->oo_hp_exts),
			  osc_list(&obj->oo_urgent_exts));
}

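/* Called when the page is being destroyed: drop the transfer pin, tear
 * down the async page, unlink the page from the in-flight list and the
 * LRU, and finally remove the radix-tree entry for cacheable pages.
 */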
static void osc_page_delete(const struct lu_env *env,
			    const struct cl_page_slice *slice)
{
	struct osc_page *opg = cl2osc_page(slice);
	struct osc_object *obj = cl2osc(opg->ops_cl.cpl_obj);
	int rc;

	CDEBUG(D_TRACE, "%p\n", opg);
	osc_page_transfer_put(env, opg);
	rc = osc_teardown_async_page(env, obj, opg);
	if (rc) {
		CL_PAGE_DEBUG(D_ERROR, env, slice->cpl_page,
			      "Trying to teardown failed: %d\n", rc);
		LASSERT(0);
	}

	spin_lock(&obj->oo_seatbelt);
	if (opg->ops_submitter) {
		LASSERT(!list_empty(&opg->ops_inflight));
		list_del_init(&opg->ops_inflight);
		opg->ops_submitter = NULL;
	}
	spin_unlock(&obj->oo_seatbelt);

	osc_lru_del(osc_cli(obj), opg);

	if (slice->cpl_page->cp_type == CPT_CACHEABLE) {
		void *value;

		spin_lock(&obj->oo_tree_lock);
		value = radix_tree_delete(&obj->oo_tree, osc_index(opg));
		if (value)
			--obj->oo_npages;
		spin_unlock(&obj->oo_tree_lock);

		LASSERT(ergo(value, value == opg));
	}
}

static void osc_page_clip(const struct lu_env *env,
			  const struct cl_page_slice *slice, int from, int to)
{
	struct osc_page *opg = cl2osc_page(slice);
	struct osc_async_page *oap = &opg->ops_oap;

	opg->ops_from = from;
	opg->ops_to = to;
	spin_lock(&oap->oap_lock);
	oap->oap_async_flags |= ASYNC_COUNT_STABLE;
	spin_unlock(&oap->oap_lock);
}

static int osc_page_cancel(const struct lu_env *env,
			   const struct cl_page_slice *slice)
{
	struct osc_page *opg = cl2osc_page(slice);
	int rc = 0;

	/* Check if the transfer against this page has completed, or was
	 * never even queued.
	 */
	if (opg->ops_transfer_pinned)
		/* FIXME: may not be interrupted.. */
		rc = osc_cancel_async_page(env, opg);
	LASSERT(ergo(rc == 0, opg->ops_transfer_pinned == 0));
	return rc;
}

static int osc_page_flush(const struct lu_env *env,
			  const struct cl_page_slice *slice,
			  struct cl_io *io)
{
	struct osc_page *opg = cl2osc_page(slice);
	int rc;

	rc = osc_flush_async_page(env, io, opg);
	return rc;
}

static const struct cl_page_operations osc_page_ops = {
	.cpo_print         = osc_page_print,
	.cpo_delete        = osc_page_delete,
	.cpo_is_under_lock = osc_page_is_under_lock,
	.cpo_clip          = osc_page_clip,
	.cpo_cancel        = osc_page_cancel,
	.cpo_flush         = osc_page_flush
};

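/* Per-page initialization: attach the OSC slice to the cl_page, prep the
 * async-page machinery, and, for cacheable pages, reserve an LRU slot and
 * index the page in the object's radix tree.
 */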
int osc_page_init(const struct lu_env *env, struct cl_object *obj,
		  struct cl_page *page, pgoff_t index)
{
	struct osc_object *osc = cl2osc(obj);
	struct osc_page *opg = cl_object_page_slice(obj, page);
	int result;

	opg->ops_from = 0;
	opg->ops_to = PAGE_SIZE;

	result = osc_prep_async_page(osc, opg, page->cp_vmpage,
				     cl_offset(obj, index));
	if (result == 0) {
		struct osc_io *oio = osc_env_io(env);

		opg->ops_srvlock = osc_io_srvlock(oio);
		cl_page_slice_add(page, &opg->ops_cl, obj, index,
				  &osc_page_ops);
	}
	/* ops_inflight and ops_lru are the same field, but it doesn't
	 * hurt to initialize it twice :-)
	 */
	INIT_LIST_HEAD(&opg->ops_inflight);
	INIT_LIST_HEAD(&opg->ops_lru);

	/* reserve an LRU space for this page */
	if (page->cp_type == CPT_CACHEABLE && result == 0) {
		result = osc_lru_reserve(env, osc, opg);
		if (result == 0) {
			spin_lock(&osc->oo_tree_lock);
			result = radix_tree_insert(&osc->oo_tree, index, opg);
			if (result == 0)
				++osc->oo_npages;
			spin_unlock(&osc->oo_tree_lock);
			LASSERT(result == 0);
		}
	}

	return result;
}

int osc_over_unstable_soft_limit(struct client_obd *cli)
{
	long obd_upages, obd_dpages, osc_upages;

	/* Can't check cli->cl_unstable_count, therefore, no soft limit */
	if (!cli)
		return 0;

	obd_upages = atomic_read(&obd_unstable_pages);
	obd_dpages = atomic_read(&obd_dirty_pages);

	osc_upages = atomic_read(&cli->cl_unstable_count);

	/*
	 * obd_max_dirty_pages is the max number of (dirty + unstable)
	 * pages allowed at any given time. To simulate an unstable-page-only
	 * limit, we subtract the current number of dirty pages from this
	 * max. This difference is roughly the number of pages currently
	 * available for unstable pages. Thus, the soft limit is half of
	 * that difference. Check osc_upages to ensure we don't set
	 * SOFT_SYNC for OSCs without any outstanding unstable pages.
	 */
	return osc_upages != 0 &&
	       obd_upages >= (obd_max_dirty_pages - obd_dpages) / 2;
}

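/* Illustrative numbers (not from the source): with obd_max_dirty_pages ==
 * 1000 and obd_dpages == 600, the unstable soft limit is (1000 - 600) / 2
 * == 200 pages, so OSCs holding unstable pages start seeing
 * OBD_BRW_SOFT_SYNC once the global unstable count reaches 200.
 */
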
/**
 * Helper function called by osc_io_submit() for every page in an immediate
 * transfer (i.e., transferred synchronously).
 */
void osc_page_submit(const struct lu_env *env, struct osc_page *opg,
		     enum cl_req_type crt, int brw_flags)
{
	struct osc_async_page *oap = &opg->ops_oap;

	LASSERTF(oap->oap_magic == OAP_MAGIC,
		 "Bad oap magic: oap %p, magic 0x%x\n",
		 oap, oap->oap_magic);
	LASSERT(oap->oap_async_flags & ASYNC_READY);
	LASSERT(oap->oap_async_flags & ASYNC_COUNT_STABLE);

	oap->oap_cmd = crt == CRT_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ;
	oap->oap_page_off = opg->ops_from;
	oap->oap_count = opg->ops_to - opg->ops_from;
	oap->oap_brw_flags = brw_flags | OBD_BRW_SYNC;

	if (osc_over_unstable_soft_limit(oap->oap_cli))
		oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;

	if (capable(CFS_CAP_SYS_RESOURCE)) {
		oap->oap_brw_flags |= OBD_BRW_NOQUOTA;
		oap->oap_cmd |= OBD_BRW_NOQUOTA;
	}

	opg->ops_submit_time = cfs_time_current();
	osc_page_transfer_get(opg, "transfer\0imm");
	osc_page_transfer_add(env, opg, crt);
}

/* --------------- LRU page management ------------------ */

/* OSC is a natural place to manage LRU pages as applications are specialized
 * to write OSC by OSC. Ideally, if one OSC is used more frequently it should
 * occupy more LRU slots. On the other hand, we should avoid using up all LRU
 * slots (client_obd::cl_lru_left), otherwise processes have to be put to
 * sleep waiting for free LRU slots - this would be very bad - so the
 * algorithm requires each OSC to free slots voluntarily to maintain a
 * reasonable number of free slots at any time.
 */
static DECLARE_WAIT_QUEUE_HEAD(osc_lru_waitq);

/* LRU pages are freed in batch mode. OSC should at least free this
 * number of pages to avoid running out of LRU budget, and ...
 */
static const int lru_shrink_min = 2 << (20 - PAGE_SHIFT);	/* 2M */
/* free at most this number of pages, otherwise it will take too long to
 * finish.
 */
static const int lru_shrink_max = 8 << (20 - PAGE_SHIFT);	/* 8M */

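/* With 4 KiB pages (PAGE_SHIFT == 12) these work out to 2 << 8 == 512
 * pages (2 MiB) for lru_shrink_min and 8 << 8 == 2048 pages (8 MiB) for
 * lru_shrink_max.
 */
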
/* Check if we can free LRU slots from this OSC. If there are LRU waiters,
 * we should free slots aggressively. In this way, slots are freed at a
 * steady pace to maintain fairness among OSCs.
 *
 * Return how many LRU pages should be freed.
 */
static int osc_cache_too_much(struct client_obd *cli)
{
	struct cl_client_cache *cache = cli->cl_cache;
	int pages = atomic_read(&cli->cl_lru_in_list);
	unsigned long budget;

	budget = cache->ccc_lru_max / (atomic_read(&cache->ccc_users) - 2);

	/* if it's going to run out of LRU slots, we should free some, but
	 * not too many, to maintain fairness among OSCs.
	 */
	if (atomic_read(cli->cl_lru_left) < cache->ccc_lru_max >> 4) {
		if (pages >= budget)
			return lru_shrink_max;
		else if (pages >= budget / 2)
			return lru_shrink_min;
	} else if (pages >= budget * 2) {
		return lru_shrink_min;
	}
	return 0;
}

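/* Example budget (illustrative numbers): with ccc_lru_max == 262144 pages
 * (1 GiB of 4 KiB pages) and atomic_read(&ccc_users) == 6, each OSC gets a
 * budget of 262144 / 4 == 65536 pages; the "- 2" appears to discount the
 * cache's non-OSC users from the per-OSC split.
 */
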
int lru_queue_work(const struct lu_env *env, void *data)
{
	struct client_obd *cli = data;

	CDEBUG(D_CACHE, "Run LRU work for client obd %p.\n", cli);

	if (osc_cache_too_much(cli))
		osc_lru_shrink(env, cli, lru_shrink_max, true);

	return 0;
}

void osc_lru_add_batch(struct client_obd *cli, struct list_head *plist)
{
	LIST_HEAD(lru);
	struct osc_async_page *oap;
	int npages = 0;

	list_for_each_entry(oap, plist, oap_pending_item) {
		struct osc_page *opg = oap2osc_page(oap);

		if (!opg->ops_in_lru)
			continue;

		++npages;
		LASSERT(list_empty(&opg->ops_lru));
		list_add(&opg->ops_lru, &lru);
	}

	if (npages > 0) {
		spin_lock(&cli->cl_lru_list_lock);
		list_splice_tail(&lru, &cli->cl_lru_list);
		atomic_sub(npages, &cli->cl_lru_busy);
		atomic_add(npages, &cli->cl_lru_in_list);
		spin_unlock(&cli->cl_lru_list_lock);

		/* XXX: May set force to be true for better performance */
		if (osc_cache_too_much(cli))
			(void)ptlrpcd_queue_work(cli->cl_lru_work);
	}
}

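/* Note the accounting: pages start out as "busy" (cl_lru_busy) and are
 * only moved to the reclaimable list (cl_lru_in_list) here, typically once
 * their transfer has completed; osc_lru_use() moves a page back to busy
 * when it is used again.
 */
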
static void __osc_lru_del(struct client_obd *cli, struct osc_page *opg)
{
	LASSERT(atomic_read(&cli->cl_lru_in_list) > 0);
	list_del_init(&opg->ops_lru);
	atomic_dec(&cli->cl_lru_in_list);
}

/**
 * Page is being destroyed. The page may not be in the LRU list if the
 * transfer never finished (an error occurred).
 */
static void osc_lru_del(struct client_obd *cli, struct osc_page *opg)
{
	if (opg->ops_in_lru) {
		spin_lock(&cli->cl_lru_list_lock);
		if (!list_empty(&opg->ops_lru)) {
			__osc_lru_del(cli, opg);
		} else {
			LASSERT(atomic_read(&cli->cl_lru_busy) > 0);
			atomic_dec(&cli->cl_lru_busy);
		}
		spin_unlock(&cli->cl_lru_list_lock);

		atomic_inc(cli->cl_lru_left);
		/* this is a great place to release more LRU pages if
		 * this osc occupies too many LRU pages and kernel is
		 * stealing one of them.
		 */
		if (!memory_pressure_get())
			(void)ptlrpcd_queue_work(cli->cl_lru_work);
		wake_up(&osc_lru_waitq);
	} else {
		LASSERT(list_empty(&opg->ops_lru));
	}
}

/**
 * Delete page from LRU list for redirty.
 */
static void osc_lru_use(struct client_obd *cli, struct osc_page *opg)
{
	/* If page is being transferred for the first time,
	 * ops_lru should be empty
	 */
	if (opg->ops_in_lru && !list_empty(&opg->ops_lru)) {
		spin_lock(&cli->cl_lru_list_lock);
		__osc_lru_del(cli, opg);
		spin_unlock(&cli->cl_lru_list_lock);
		atomic_inc(&cli->cl_lru_busy);
	}
}

static void discard_pagevec(const struct lu_env *env, struct cl_io *io,
			    struct cl_page **pvec, int max_index)
{
	int i;

	for (i = 0; i < max_index; i++) {
		struct cl_page *page = pvec[i];

		LASSERT(cl_page_is_owned(page, io));
		cl_page_discard(env, io, page);
		cl_page_disown(env, io, page);
		cl_page_put(env, page);

		pvec[i] = NULL;
	}
}

/**
 * Drop at most @target pages from the LRU.
 */
int osc_lru_shrink(const struct lu_env *env, struct client_obd *cli,
		   int target, bool force)
{
	struct cl_io *io;
	struct cl_object *clobj = NULL;
	struct cl_page **pvec;
	struct osc_page *opg;
	struct osc_page *temp;
	int maxscan = 0;
	int count = 0;
	int index = 0;
	int rc = 0;

	LASSERT(atomic_read(&cli->cl_lru_in_list) >= 0);
	if (atomic_read(&cli->cl_lru_in_list) == 0 || target <= 0)
		return 0;

	if (!force) {
		if (atomic_read(&cli->cl_lru_shrinkers) > 0)
			return -EBUSY;

		if (atomic_inc_return(&cli->cl_lru_shrinkers) > 1) {
			atomic_dec(&cli->cl_lru_shrinkers);
			return -EBUSY;
		}
	} else {
		atomic_inc(&cli->cl_lru_shrinkers);
	}

	pvec = (struct cl_page **)osc_env_info(env)->oti_pvec;
	io = &osc_env_info(env)->oti_io;

	spin_lock(&cli->cl_lru_list_lock);
	maxscan = min(target << 1, atomic_read(&cli->cl_lru_in_list));
	list_for_each_entry_safe(opg, temp, &cli->cl_lru_list, ops_lru) {
		struct cl_page *page;
		bool will_free = false;

		if (--maxscan < 0)
			break;

		page = opg->ops_cl.cpl_page;
		if (cl_page_in_use_noref(page)) {
			list_move_tail(&opg->ops_lru, &cli->cl_lru_list);
			continue;
		}

		LASSERT(page->cp_obj);
		if (clobj != page->cp_obj) {
			struct cl_object *tmp = page->cp_obj;

			cl_object_get(tmp);
			spin_unlock(&cli->cl_lru_list_lock);

			if (clobj) {
				discard_pagevec(env, io, pvec, index);
				index = 0;

				cl_io_fini(env, io);
				cl_object_put(env, clobj);
				clobj = NULL;
			}

			clobj = tmp;
			io->ci_obj = clobj;
			io->ci_ignore_layout = 1;
			rc = cl_io_init(env, io, CIT_MISC, clobj);

			spin_lock(&cli->cl_lru_list_lock);
			if (rc != 0)
				break;

			++maxscan;
			continue;
		}

		if (cl_page_own_try(env, io, page) == 0) {
			if (!cl_page_in_use_noref(page)) {
				/* remove it from lru list earlier to avoid
				 * lock contention
				 */
				__osc_lru_del(cli, opg);
				opg->ops_in_lru = 0; /* will be discarded */

				cl_page_get(page);
				will_free = true;
			} else {
				cl_page_disown(env, io, page);
			}
		}

		if (!will_free) {
			list_move_tail(&opg->ops_lru, &cli->cl_lru_list);
			continue;
		}

		/* Don't discard and free the page with cl_lru_list held */
		pvec[index++] = page;
		if (unlikely(index == OTI_PVEC_SIZE)) {
			spin_unlock(&cli->cl_lru_list_lock);
			discard_pagevec(env, io, pvec, index);
			index = 0;

			spin_lock(&cli->cl_lru_list_lock);
		}

		if (++count >= target)
			break;
	}
	spin_unlock(&cli->cl_lru_list_lock);

	if (clobj) {
		discard_pagevec(env, io, pvec, index);

		cl_io_fini(env, io);
		cl_object_put(env, clobj);
	}

	atomic_dec(&cli->cl_lru_shrinkers);
	if (count > 0) {
		atomic_add(count, cli->cl_lru_left);
		wake_up_all(&osc_lru_waitq);
	}
	return count > 0 ? count : rc;
}

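/* The shrinker batches victim pages into the per-environment pvec and
 * drops cl_lru_list_lock before calling discard_pagevec(), since page
 * discard may block; pages still in use are simply rotated to the tail of
 * the LRU list.
 */
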
static inline int max_to_shrink(struct client_obd *cli)
{
	return min(atomic_read(&cli->cl_lru_in_list) >> 1, lru_shrink_max);
}

int osc_lru_reclaim(struct client_obd *cli)
{
	struct cl_env_nest nest;
	struct lu_env *env;
	struct cl_client_cache *cache = cli->cl_cache;
	int max_scans;
	int rc = 0;

	LASSERT(cache);

	env = cl_env_nested_get(&nest);
	if (IS_ERR(env))
		return 0;

	rc = osc_lru_shrink(env, cli, osc_cache_too_much(cli), false);
	if (rc != 0) {
		if (rc == -EBUSY)
			rc = 0;

		CDEBUG(D_CACHE, "%s: Free %d pages from own LRU: %p.\n",
		       cli->cl_import->imp_obd->obd_name, rc, cli);
		goto out;
	}

	CDEBUG(D_CACHE, "%s: cli %p no free slots, pages: %d, busy: %d.\n",
	       cli->cl_import->imp_obd->obd_name, cli,
	       atomic_read(&cli->cl_lru_in_list),
	       atomic_read(&cli->cl_lru_busy));

	/* Reclaim LRU slots from other client_obd as it can't free enough
	 * from its own. This should rarely happen.
	 */
	spin_lock(&cache->ccc_lru_lock);
	LASSERT(!list_empty(&cache->ccc_lru));

	cache->ccc_lru_shrinkers++;
	list_move_tail(&cli->cl_lru_osc, &cache->ccc_lru);

	max_scans = atomic_read(&cache->ccc_users) - 2;
	while (--max_scans > 0 && !list_empty(&cache->ccc_lru)) {
		cli = list_entry(cache->ccc_lru.next, struct client_obd,
				 cl_lru_osc);

		CDEBUG(D_CACHE, "%s: cli %p LRU pages: %d, busy: %d.\n",
		       cli->cl_import->imp_obd->obd_name, cli,
		       atomic_read(&cli->cl_lru_in_list),
		       atomic_read(&cli->cl_lru_busy));

		list_move_tail(&cli->cl_lru_osc, &cache->ccc_lru);
		if (osc_cache_too_much(cli) > 0) {
			spin_unlock(&cache->ccc_lru_lock);

			rc = osc_lru_shrink(env, cli, osc_cache_too_much(cli),
					    true);
			spin_lock(&cache->ccc_lru_lock);
			if (rc >= lru_shrink_min)
				break;
		}
	}
	spin_unlock(&cache->ccc_lru_lock);

out:
	cl_env_nested_put(&nest, env);
	CDEBUG(D_CACHE, "%s: cli %p freed %d pages.\n",
	       cli->cl_import->imp_obd->obd_name, cli, rc);
	return rc;
}

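/* Reserve one LRU slot for a page: first consume any per-IO reservation
 * (oi_lru_reserved), then try to decrement cl_lru_left; if no slot is
 * left, reclaim from this and other OSCs and finally sleep on
 * osc_lru_waitq until a slot is freed.
 */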
static int osc_lru_reserve(const struct lu_env *env, struct osc_object *obj,
			   struct osc_page *opg)
{
	struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
	struct osc_io *oio = osc_env_io(env);
	struct client_obd *cli = osc_cli(obj);
	int rc = 0;

	if (!cli->cl_cache) /* shall not be in LRU */
		return 0;

	if (oio->oi_lru_reserved > 0) {
		--oio->oi_lru_reserved;
		goto out;
	}

	LASSERT(atomic_read(cli->cl_lru_left) >= 0);
	while (!atomic_add_unless(cli->cl_lru_left, -1, 0)) {
		/* run out of LRU spaces, try to drop some by itself */
		rc = osc_lru_reclaim(cli);
		if (rc < 0)
			break;
		if (rc > 0)
			continue;

		cond_resched();

		rc = l_wait_event(osc_lru_waitq,
				  atomic_read(cli->cl_lru_left) > 0,
				  &lwi);
		if (rc < 0)
			break;
	}

out:
	if (rc >= 0) {
		atomic_inc(&cli->cl_lru_busy);
		opg->ops_in_lru = 1;
		rc = 0;
	}
	return rc;
}