4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2015, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_OSC
39 #include "../../include/linux/libcfs/libcfs.h"
41 #include "../include/lustre_dlm.h"
42 #include "../include/lustre_net.h"
43 #include "../include/lustre/lustre_user.h"
44 #include "../include/obd_cksum.h"
46 #include "../include/lustre_ha.h"
47 #include "../include/lprocfs_status.h"
48 #include "../include/lustre_debug.h"
49 #include "../include/lustre_param.h"
50 #include "../include/lustre_fid.h"
51 #include "../include/obd_class.h"
52 #include "../include/obd.h"
53 #include "osc_internal.h"
54 #include "osc_cl_internal.h"
56 atomic_t osc_pool_req_count
;
57 unsigned int osc_reqpool_maxreqcount
;
58 struct ptlrpc_request_pool
*osc_rq_pool
;
60 /* max memory used for request pool, unit is MB */
61 static unsigned int osc_reqpool_mem_max
= 5;
62 module_param(osc_reqpool_mem_max
, uint
, 0444);
64 struct osc_brw_async_args
{
70 struct brw_page
**aa_ppga
;
71 struct client_obd
*aa_cli
;
72 struct list_head aa_oaps
;
73 struct list_head aa_exts
;
74 struct cl_req
*aa_clerq
;
77 struct osc_async_args
{
78 struct obd_info
*aa_oi
;
81 struct osc_setattr_args
{
83 obd_enqueue_update_f sa_upcall
;
87 struct osc_fsync_args
{
88 struct obd_info
*fa_oi
;
89 obd_enqueue_update_f fa_upcall
;
93 struct osc_enqueue_args
{
94 struct obd_export
*oa_exp
;
96 obd_enqueue_update_f oa_upcall
;
98 struct ost_lvb
*oa_lvb
;
99 struct lustre_handle
*oa_lockh
;
100 struct ldlm_enqueue_info
*oa_ei
;
101 unsigned int oa_agl
:1;
104 static void osc_release_ppga(struct brw_page
**ppga
, u32 count
);
105 static int brw_interpret(const struct lu_env
*env
,
106 struct ptlrpc_request
*req
, void *data
, int rc
);
108 /* Pack OSC object metadata for disk storage (LE byte order). */
109 static int osc_packmd(struct obd_export
*exp
, struct lov_mds_md
**lmmp
,
110 struct lov_stripe_md
*lsm
)
114 lmm_size
= sizeof(**lmmp
);
122 } else if (unlikely(lsm
&& ostid_id(&lsm
->lsm_oi
) == 0)) {
127 *lmmp
= kzalloc(lmm_size
, GFP_NOFS
);
133 ostid_cpu_to_le(&lsm
->lsm_oi
, &(*lmmp
)->lmm_oi
);
138 /* Unpack OSC object metadata from disk storage (LE byte order). */
139 static int osc_unpackmd(struct obd_export
*exp
, struct lov_stripe_md
**lsmp
,
140 struct lov_mds_md
*lmm
, int lmm_bytes
)
143 struct obd_import
*imp
= class_exp2cliimp(exp
);
146 if (lmm_bytes
< sizeof(*lmm
)) {
147 CERROR("%s: lov_mds_md too small: %d, need %d\n",
148 exp
->exp_obd
->obd_name
, lmm_bytes
,
152 /* XXX LOV_MAGIC etc check? */
154 if (unlikely(ostid_id(&lmm
->lmm_oi
) == 0)) {
155 CERROR("%s: zero lmm_object_id: rc = %d\n",
156 exp
->exp_obd
->obd_name
, -EINVAL
);
161 lsm_size
= lov_stripe_md_size(1);
166 kfree((*lsmp
)->lsm_oinfo
[0]);
173 *lsmp
= kzalloc(lsm_size
, GFP_NOFS
);
174 if (unlikely(!*lsmp
))
176 (*lsmp
)->lsm_oinfo
[0] = kzalloc(sizeof(struct lov_oinfo
),
178 if (unlikely(!(*lsmp
)->lsm_oinfo
[0])) {
182 loi_init((*lsmp
)->lsm_oinfo
[0]);
183 } else if (unlikely(ostid_id(&(*lsmp
)->lsm_oi
) == 0)) {
188 /* XXX zero *lsmp? */
189 ostid_le_to_cpu(&lmm
->lmm_oi
, &(*lsmp
)->lsm_oi
);
192 (imp
->imp_connect_data
.ocd_connect_flags
& OBD_CONNECT_MAXBYTES
))
193 (*lsmp
)->lsm_maxbytes
= imp
->imp_connect_data
.ocd_maxbytes
;
195 (*lsmp
)->lsm_maxbytes
= LUSTRE_STRIPE_MAXBYTES
;
200 static inline void osc_pack_req_body(struct ptlrpc_request
*req
,
201 struct obd_info
*oinfo
)
203 struct ost_body
*body
;
205 body
= req_capsule_client_get(&req
->rq_pill
, &RMF_OST_BODY
);
208 lustre_set_wire_obdo(&req
->rq_import
->imp_connect_data
, &body
->oa
,
212 static int osc_getattr_interpret(const struct lu_env
*env
,
213 struct ptlrpc_request
*req
,
214 struct osc_async_args
*aa
, int rc
)
216 struct ost_body
*body
;
221 body
= req_capsule_server_get(&req
->rq_pill
, &RMF_OST_BODY
);
223 CDEBUG(D_INODE
, "mode: %o\n", body
->oa
.o_mode
);
224 lustre_get_wire_obdo(&req
->rq_import
->imp_connect_data
,
225 aa
->aa_oi
->oi_oa
, &body
->oa
);
227 /* This should really be sent by the OST */
228 aa
->aa_oi
->oi_oa
->o_blksize
= DT_MAX_BRW_SIZE
;
229 aa
->aa_oi
->oi_oa
->o_valid
|= OBD_MD_FLBLKSZ
;
231 CDEBUG(D_INFO
, "can't unpack ost_body\n");
233 aa
->aa_oi
->oi_oa
->o_valid
= 0;
236 rc
= aa
->aa_oi
->oi_cb_up(aa
->aa_oi
, rc
);
240 static int osc_getattr_async(struct obd_export
*exp
, struct obd_info
*oinfo
,
241 struct ptlrpc_request_set
*set
)
243 struct ptlrpc_request
*req
;
244 struct osc_async_args
*aa
;
247 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
), &RQF_OST_GETATTR
);
251 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_GETATTR
);
253 ptlrpc_request_free(req
);
257 osc_pack_req_body(req
, oinfo
);
259 ptlrpc_request_set_replen(req
);
260 req
->rq_interpret_reply
= (ptlrpc_interpterer_t
)osc_getattr_interpret
;
262 CLASSERT(sizeof(*aa
) <= sizeof(req
->rq_async_args
));
263 aa
= ptlrpc_req_async_args(req
);
266 ptlrpc_set_add_req(set
, req
);
270 static int osc_getattr(const struct lu_env
*env
, struct obd_export
*exp
,
271 struct obd_info
*oinfo
)
273 struct ptlrpc_request
*req
;
274 struct ost_body
*body
;
277 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
), &RQF_OST_GETATTR
);
281 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_GETATTR
);
283 ptlrpc_request_free(req
);
287 osc_pack_req_body(req
, oinfo
);
289 ptlrpc_request_set_replen(req
);
291 rc
= ptlrpc_queue_wait(req
);
295 body
= req_capsule_server_get(&req
->rq_pill
, &RMF_OST_BODY
);
301 CDEBUG(D_INODE
, "mode: %o\n", body
->oa
.o_mode
);
302 lustre_get_wire_obdo(&req
->rq_import
->imp_connect_data
, oinfo
->oi_oa
,
305 oinfo
->oi_oa
->o_blksize
= cli_brw_size(exp
->exp_obd
);
306 oinfo
->oi_oa
->o_valid
|= OBD_MD_FLBLKSZ
;
309 ptlrpc_req_finished(req
);
313 static int osc_setattr(const struct lu_env
*env
, struct obd_export
*exp
,
314 struct obd_info
*oinfo
, struct obd_trans_info
*oti
)
316 struct ptlrpc_request
*req
;
317 struct ost_body
*body
;
320 LASSERT(oinfo
->oi_oa
->o_valid
& OBD_MD_FLGROUP
);
322 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
), &RQF_OST_SETATTR
);
326 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_SETATTR
);
328 ptlrpc_request_free(req
);
332 osc_pack_req_body(req
, oinfo
);
334 ptlrpc_request_set_replen(req
);
336 rc
= ptlrpc_queue_wait(req
);
340 body
= req_capsule_server_get(&req
->rq_pill
, &RMF_OST_BODY
);
346 lustre_get_wire_obdo(&req
->rq_import
->imp_connect_data
, oinfo
->oi_oa
,
350 ptlrpc_req_finished(req
);
354 static int osc_setattr_interpret(const struct lu_env
*env
,
355 struct ptlrpc_request
*req
,
356 struct osc_setattr_args
*sa
, int rc
)
358 struct ost_body
*body
;
363 body
= req_capsule_server_get(&req
->rq_pill
, &RMF_OST_BODY
);
369 lustre_get_wire_obdo(&req
->rq_import
->imp_connect_data
, sa
->sa_oa
,
372 rc
= sa
->sa_upcall(sa
->sa_cookie
, rc
);
376 int osc_setattr_async_base(struct obd_export
*exp
, struct obd_info
*oinfo
,
377 struct obd_trans_info
*oti
,
378 obd_enqueue_update_f upcall
, void *cookie
,
379 struct ptlrpc_request_set
*rqset
)
381 struct ptlrpc_request
*req
;
382 struct osc_setattr_args
*sa
;
385 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
), &RQF_OST_SETATTR
);
389 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_SETATTR
);
391 ptlrpc_request_free(req
);
395 if (oti
&& oinfo
->oi_oa
->o_valid
& OBD_MD_FLCOOKIE
)
396 oinfo
->oi_oa
->o_lcookie
= *oti
->oti_logcookies
;
398 osc_pack_req_body(req
, oinfo
);
400 ptlrpc_request_set_replen(req
);
402 /* do mds to ost setattr asynchronously */
404 /* Do not wait for response. */
405 ptlrpcd_add_req(req
);
407 req
->rq_interpret_reply
=
408 (ptlrpc_interpterer_t
)osc_setattr_interpret
;
410 CLASSERT(sizeof(*sa
) <= sizeof(req
->rq_async_args
));
411 sa
= ptlrpc_req_async_args(req
);
412 sa
->sa_oa
= oinfo
->oi_oa
;
413 sa
->sa_upcall
= upcall
;
414 sa
->sa_cookie
= cookie
;
416 if (rqset
== PTLRPCD_SET
)
417 ptlrpcd_add_req(req
);
419 ptlrpc_set_add_req(rqset
, req
);
425 static int osc_setattr_async(struct obd_export
*exp
, struct obd_info
*oinfo
,
426 struct obd_trans_info
*oti
,
427 struct ptlrpc_request_set
*rqset
)
429 return osc_setattr_async_base(exp
, oinfo
, oti
,
430 oinfo
->oi_cb_up
, oinfo
, rqset
);
433 static int osc_real_create(struct obd_export
*exp
, struct obdo
*oa
,
434 struct lov_stripe_md
**ea
,
435 struct obd_trans_info
*oti
)
437 struct ptlrpc_request
*req
;
438 struct ost_body
*body
;
439 struct lov_stripe_md
*lsm
;
447 rc
= obd_alloc_memmd(exp
, &lsm
);
452 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
), &RQF_OST_CREATE
);
458 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_CREATE
);
460 ptlrpc_request_free(req
);
464 body
= req_capsule_client_get(&req
->rq_pill
, &RMF_OST_BODY
);
467 lustre_set_wire_obdo(&req
->rq_import
->imp_connect_data
, &body
->oa
, oa
);
469 ptlrpc_request_set_replen(req
);
471 if ((oa
->o_valid
& OBD_MD_FLFLAGS
) &&
472 oa
->o_flags
== OBD_FL_DELORPHAN
) {
474 "delorphan from OST integration");
475 /* Don't resend the delorphan req */
476 req
->rq_no_resend
= req
->rq_no_delay
= 1;
479 rc
= ptlrpc_queue_wait(req
);
483 body
= req_capsule_server_get(&req
->rq_pill
, &RMF_OST_BODY
);
489 CDEBUG(D_INFO
, "oa flags %x\n", oa
->o_flags
);
490 lustre_get_wire_obdo(&req
->rq_import
->imp_connect_data
, oa
, &body
->oa
);
492 oa
->o_blksize
= cli_brw_size(exp
->exp_obd
);
493 oa
->o_valid
|= OBD_MD_FLBLKSZ
;
495 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
496 * have valid lsm_oinfo data structs, so don't go touching that.
497 * This needs to be fixed in a big way.
499 lsm
->lsm_oi
= oa
->o_oi
;
503 oti
->oti_transno
= lustre_msg_get_transno(req
->rq_repmsg
);
505 if (oa
->o_valid
& OBD_MD_FLCOOKIE
) {
506 if (!oti
->oti_logcookies
)
507 oti_alloc_cookies(oti
, 1);
508 *oti
->oti_logcookies
= oa
->o_lcookie
;
512 CDEBUG(D_HA
, "transno: %lld\n",
513 lustre_msg_get_transno(req
->rq_repmsg
));
515 ptlrpc_req_finished(req
);
518 obd_free_memmd(exp
, &lsm
);
522 int osc_punch_base(struct obd_export
*exp
, struct obd_info
*oinfo
,
523 obd_enqueue_update_f upcall
, void *cookie
,
524 struct ptlrpc_request_set
*rqset
)
526 struct ptlrpc_request
*req
;
527 struct osc_setattr_args
*sa
;
528 struct ost_body
*body
;
531 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
), &RQF_OST_PUNCH
);
535 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_PUNCH
);
537 ptlrpc_request_free(req
);
540 req
->rq_request_portal
= OST_IO_PORTAL
; /* bug 7198 */
541 ptlrpc_at_set_req_timeout(req
);
543 body
= req_capsule_client_get(&req
->rq_pill
, &RMF_OST_BODY
);
545 lustre_set_wire_obdo(&req
->rq_import
->imp_connect_data
, &body
->oa
,
548 ptlrpc_request_set_replen(req
);
550 req
->rq_interpret_reply
= (ptlrpc_interpterer_t
)osc_setattr_interpret
;
551 CLASSERT(sizeof(*sa
) <= sizeof(req
->rq_async_args
));
552 sa
= ptlrpc_req_async_args(req
);
553 sa
->sa_oa
= oinfo
->oi_oa
;
554 sa
->sa_upcall
= upcall
;
555 sa
->sa_cookie
= cookie
;
556 if (rqset
== PTLRPCD_SET
)
557 ptlrpcd_add_req(req
);
559 ptlrpc_set_add_req(rqset
, req
);
564 static int osc_sync_interpret(const struct lu_env
*env
,
565 struct ptlrpc_request
*req
,
568 struct osc_fsync_args
*fa
= arg
;
569 struct ost_body
*body
;
574 body
= req_capsule_server_get(&req
->rq_pill
, &RMF_OST_BODY
);
576 CERROR("can't unpack ost_body\n");
581 *fa
->fa_oi
->oi_oa
= body
->oa
;
583 rc
= fa
->fa_upcall(fa
->fa_cookie
, rc
);
587 int osc_sync_base(struct obd_export
*exp
, struct obd_info
*oinfo
,
588 obd_enqueue_update_f upcall
, void *cookie
,
589 struct ptlrpc_request_set
*rqset
)
591 struct ptlrpc_request
*req
;
592 struct ost_body
*body
;
593 struct osc_fsync_args
*fa
;
596 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
), &RQF_OST_SYNC
);
600 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_SYNC
);
602 ptlrpc_request_free(req
);
606 /* overload the size and blocks fields in the oa with start/end */
607 body
= req_capsule_client_get(&req
->rq_pill
, &RMF_OST_BODY
);
609 lustre_set_wire_obdo(&req
->rq_import
->imp_connect_data
, &body
->oa
,
612 ptlrpc_request_set_replen(req
);
613 req
->rq_interpret_reply
= osc_sync_interpret
;
615 CLASSERT(sizeof(*fa
) <= sizeof(req
->rq_async_args
));
616 fa
= ptlrpc_req_async_args(req
);
618 fa
->fa_upcall
= upcall
;
619 fa
->fa_cookie
= cookie
;
621 if (rqset
== PTLRPCD_SET
)
622 ptlrpcd_add_req(req
);
624 ptlrpc_set_add_req(rqset
, req
);
/* Find and locally cancel the locks matching @mode in the resource named by
 * the object id of @oa. Matching locks are added to the @cancels list.
 * Returns the number of locks added to @cancels.
 */
633 static int osc_resource_get_unused(struct obd_export
*exp
, struct obdo
*oa
,
634 struct list_head
*cancels
,
635 enum ldlm_mode mode
, __u64 lock_flags
)
637 struct ldlm_namespace
*ns
= exp
->exp_obd
->obd_namespace
;
638 struct ldlm_res_id res_id
;
639 struct ldlm_resource
*res
;
642 /* Return, i.e. cancel nothing, only if ELC is supported (flag in
643 * export) but disabled through procfs (flag in NS).
645 * This distinguishes from a case when ELC is not supported originally,
646 * when we still want to cancel locks in advance and just cancel them
647 * locally, without sending any RPC.
649 if (exp_connect_cancelset(exp
) && !ns_connect_cancelset(ns
))
652 ostid_build_res_name(&oa
->o_oi
, &res_id
);
653 res
= ldlm_resource_get(ns
, NULL
, &res_id
, 0, 0);
657 LDLM_RESOURCE_ADDREF(res
);
658 count
= ldlm_cancel_resource_local(res
, cancels
, NULL
, mode
,
659 lock_flags
, 0, NULL
);
660 LDLM_RESOURCE_DELREF(res
);
661 ldlm_resource_putref(res
);
665 static int osc_destroy_interpret(const struct lu_env
*env
,
666 struct ptlrpc_request
*req
, void *data
,
669 struct client_obd
*cli
= &req
->rq_import
->imp_obd
->u
.cli
;
671 atomic_dec(&cli
->cl_destroy_in_flight
);
672 wake_up(&cli
->cl_destroy_waitq
);
676 static int osc_can_send_destroy(struct client_obd
*cli
)
678 if (atomic_inc_return(&cli
->cl_destroy_in_flight
) <=
679 cli
->cl_max_rpcs_in_flight
) {
680 /* The destroy request can be sent */
683 if (atomic_dec_return(&cli
->cl_destroy_in_flight
) <
684 cli
->cl_max_rpcs_in_flight
) {
686 * The counter has been modified between the two atomic
689 wake_up(&cli
->cl_destroy_waitq
);
694 static int osc_create(const struct lu_env
*env
, struct obd_export
*exp
,
695 struct obdo
*oa
, struct lov_stripe_md
**ea
,
696 struct obd_trans_info
*oti
)
702 LASSERT(oa
->o_valid
& OBD_MD_FLGROUP
);
704 if ((oa
->o_valid
& OBD_MD_FLFLAGS
) &&
705 oa
->o_flags
== OBD_FL_RECREATE_OBJS
) {
706 return osc_real_create(exp
, oa
, ea
, oti
);
709 if (!fid_seq_is_mdt(ostid_seq(&oa
->o_oi
)))
710 return osc_real_create(exp
, oa
, ea
, oti
);
712 /* we should not get here anymore */
/* Destroy requests can be async always on the client, and we don't even really
 * care about the return code since the client cannot do anything at all about
 * a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then send the log cancellation
 * cookies to the MDS after committing destroy transactions.
 */
729 static int osc_destroy(const struct lu_env
*env
, struct obd_export
*exp
,
730 struct obdo
*oa
, struct lov_stripe_md
*ea
,
731 struct obd_trans_info
*oti
, struct obd_export
*md_export
)
733 struct client_obd
*cli
= &exp
->exp_obd
->u
.cli
;
734 struct ptlrpc_request
*req
;
735 struct ost_body
*body
;
740 CDEBUG(D_INFO
, "oa NULL\n");
744 count
= osc_resource_get_unused(exp
, oa
, &cancels
, LCK_PW
,
745 LDLM_FL_DISCARD_DATA
);
747 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
), &RQF_OST_DESTROY
);
749 ldlm_lock_list_put(&cancels
, l_bl_ast
, count
);
753 rc
= ldlm_prep_elc_req(exp
, req
, LUSTRE_OST_VERSION
, OST_DESTROY
,
756 ptlrpc_request_free(req
);
760 req
->rq_request_portal
= OST_IO_PORTAL
; /* bug 7198 */
761 ptlrpc_at_set_req_timeout(req
);
763 if (oti
&& oa
->o_valid
& OBD_MD_FLCOOKIE
)
764 oa
->o_lcookie
= *oti
->oti_logcookies
;
765 body
= req_capsule_client_get(&req
->rq_pill
, &RMF_OST_BODY
);
767 lustre_set_wire_obdo(&req
->rq_import
->imp_connect_data
, &body
->oa
, oa
);
769 ptlrpc_request_set_replen(req
);
771 /* If osc_destroy is for destroying the unlink orphan,
772 * sent from MDT to OST, which should not be blocked here,
773 * because the process might be triggered by ptlrpcd, and
774 * it is not good to block ptlrpcd thread (b=16006
776 if (!(oa
->o_flags
& OBD_FL_DELORPHAN
)) {
777 req
->rq_interpret_reply
= osc_destroy_interpret
;
778 if (!osc_can_send_destroy(cli
)) {
779 struct l_wait_info lwi
= LWI_INTR(LWI_ON_SIGNAL_NOOP
,
783 * Wait until the number of on-going destroy RPCs drops
784 * under max_rpc_in_flight
786 l_wait_event_exclusive(cli
->cl_destroy_waitq
,
787 osc_can_send_destroy(cli
), &lwi
);
791 /* Do not wait for response */
792 ptlrpcd_add_req(req
);
796 static void osc_announce_cached(struct client_obd
*cli
, struct obdo
*oa
,
799 u32 bits
= OBD_MD_FLBLOCKS
|OBD_MD_FLGRANT
;
801 LASSERT(!(oa
->o_valid
& bits
));
804 client_obd_list_lock(&cli
->cl_loi_list_lock
);
805 oa
->o_dirty
= cli
->cl_dirty
;
806 if (unlikely(cli
->cl_dirty
- cli
->cl_dirty_transit
>
807 cli
->cl_dirty_max
)) {
808 CERROR("dirty %lu - %lu > dirty_max %lu\n",
809 cli
->cl_dirty
, cli
->cl_dirty_transit
, cli
->cl_dirty_max
);
811 } else if (unlikely(atomic_read(&obd_dirty_pages
) -
812 atomic_read(&obd_dirty_transit_pages
) >
813 (long)(obd_max_dirty_pages
+ 1))) {
814 /* The atomic_read() allowing the atomic_inc() are
815 * not covered by a lock thus they may safely race and trip
816 * this CERROR() unless we add in a small fudge factor (+1).
818 CERROR("dirty %d - %d > system dirty_max %d\n",
819 atomic_read(&obd_dirty_pages
),
820 atomic_read(&obd_dirty_transit_pages
),
821 obd_max_dirty_pages
);
823 } else if (unlikely(cli
->cl_dirty_max
- cli
->cl_dirty
> 0x7fffffff)) {
824 CERROR("dirty %lu - dirty_max %lu too big???\n",
825 cli
->cl_dirty
, cli
->cl_dirty_max
);
828 long max_in_flight
= (cli
->cl_max_pages_per_rpc
<<
830 (cli
->cl_max_rpcs_in_flight
+ 1);
831 oa
->o_undirty
= max(cli
->cl_dirty_max
, max_in_flight
);
833 oa
->o_grant
= cli
->cl_avail_grant
+ cli
->cl_reserved_grant
;
834 oa
->o_dropped
= cli
->cl_lost_grant
;
835 cli
->cl_lost_grant
= 0;
836 client_obd_list_unlock(&cli
->cl_loi_list_lock
);
837 CDEBUG(D_CACHE
, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
838 oa
->o_dirty
, oa
->o_undirty
, oa
->o_dropped
, oa
->o_grant
);
842 void osc_update_next_shrink(struct client_obd
*cli
)
844 cli
->cl_next_shrink_grant
=
845 cfs_time_shift(cli
->cl_grant_shrink_interval
);
846 CDEBUG(D_CACHE
, "next time %ld to shrink grant\n",
847 cli
->cl_next_shrink_grant
);
850 static void __osc_update_grant(struct client_obd
*cli
, u64 grant
)
852 client_obd_list_lock(&cli
->cl_loi_list_lock
);
853 cli
->cl_avail_grant
+= grant
;
854 client_obd_list_unlock(&cli
->cl_loi_list_lock
);
857 static void osc_update_grant(struct client_obd
*cli
, struct ost_body
*body
)
859 if (body
->oa
.o_valid
& OBD_MD_FLGRANT
) {
860 CDEBUG(D_CACHE
, "got %llu extra grant\n", body
->oa
.o_grant
);
861 __osc_update_grant(cli
, body
->oa
.o_grant
);
865 static int osc_set_info_async(const struct lu_env
*env
, struct obd_export
*exp
,
866 u32 keylen
, void *key
, u32 vallen
,
867 void *val
, struct ptlrpc_request_set
*set
);
869 static int osc_shrink_grant_interpret(const struct lu_env
*env
,
870 struct ptlrpc_request
*req
,
873 struct client_obd
*cli
= &req
->rq_import
->imp_obd
->u
.cli
;
874 struct obdo
*oa
= ((struct osc_brw_async_args
*)aa
)->aa_oa
;
875 struct ost_body
*body
;
878 __osc_update_grant(cli
, oa
->o_grant
);
882 body
= req_capsule_server_get(&req
->rq_pill
, &RMF_OST_BODY
);
884 osc_update_grant(cli
, body
);
886 kmem_cache_free(obdo_cachep
, oa
);
890 static void osc_shrink_grant_local(struct client_obd
*cli
, struct obdo
*oa
)
892 client_obd_list_lock(&cli
->cl_loi_list_lock
);
893 oa
->o_grant
= cli
->cl_avail_grant
/ 4;
894 cli
->cl_avail_grant
-= oa
->o_grant
;
895 client_obd_list_unlock(&cli
->cl_loi_list_lock
);
896 if (!(oa
->o_valid
& OBD_MD_FLFLAGS
)) {
897 oa
->o_valid
|= OBD_MD_FLFLAGS
;
900 oa
->o_flags
|= OBD_FL_SHRINK_GRANT
;
901 osc_update_next_shrink(cli
);
904 /* Shrink the current grant, either from some large amount to enough for a
905 * full set of in-flight RPCs, or if we have already shrunk to that limit
906 * then to enough for a single RPC. This avoids keeping more grant than
907 * needed, and avoids shrinking the grant piecemeal.
909 static int osc_shrink_grant(struct client_obd
*cli
)
911 __u64 target_bytes
= (cli
->cl_max_rpcs_in_flight
+ 1) *
912 (cli
->cl_max_pages_per_rpc
<< PAGE_CACHE_SHIFT
);
914 client_obd_list_lock(&cli
->cl_loi_list_lock
);
915 if (cli
->cl_avail_grant
<= target_bytes
)
916 target_bytes
= cli
->cl_max_pages_per_rpc
<< PAGE_CACHE_SHIFT
;
917 client_obd_list_unlock(&cli
->cl_loi_list_lock
);
919 return osc_shrink_grant_to_target(cli
, target_bytes
);
922 int osc_shrink_grant_to_target(struct client_obd
*cli
, __u64 target_bytes
)
925 struct ost_body
*body
;
927 client_obd_list_lock(&cli
->cl_loi_list_lock
);
928 /* Don't shrink if we are already above or below the desired limit
929 * We don't want to shrink below a single RPC, as that will negatively
930 * impact block allocation and long-term performance.
932 if (target_bytes
< cli
->cl_max_pages_per_rpc
<< PAGE_CACHE_SHIFT
)
933 target_bytes
= cli
->cl_max_pages_per_rpc
<< PAGE_CACHE_SHIFT
;
935 if (target_bytes
>= cli
->cl_avail_grant
) {
936 client_obd_list_unlock(&cli
->cl_loi_list_lock
);
939 client_obd_list_unlock(&cli
->cl_loi_list_lock
);
941 body
= kzalloc(sizeof(*body
), GFP_NOFS
);
945 osc_announce_cached(cli
, &body
->oa
, 0);
947 client_obd_list_lock(&cli
->cl_loi_list_lock
);
948 body
->oa
.o_grant
= cli
->cl_avail_grant
- target_bytes
;
949 cli
->cl_avail_grant
= target_bytes
;
950 client_obd_list_unlock(&cli
->cl_loi_list_lock
);
951 if (!(body
->oa
.o_valid
& OBD_MD_FLFLAGS
)) {
952 body
->oa
.o_valid
|= OBD_MD_FLFLAGS
;
953 body
->oa
.o_flags
= 0;
955 body
->oa
.o_flags
|= OBD_FL_SHRINK_GRANT
;
956 osc_update_next_shrink(cli
);
958 rc
= osc_set_info_async(NULL
, cli
->cl_import
->imp_obd
->obd_self_export
,
959 sizeof(KEY_GRANT_SHRINK
), KEY_GRANT_SHRINK
,
960 sizeof(*body
), body
, NULL
);
962 __osc_update_grant(cli
, body
->oa
.o_grant
);
967 static int osc_should_shrink_grant(struct client_obd
*client
)
969 unsigned long time
= cfs_time_current();
970 unsigned long next_shrink
= client
->cl_next_shrink_grant
;
972 if ((client
->cl_import
->imp_connect_data
.ocd_connect_flags
&
973 OBD_CONNECT_GRANT_SHRINK
) == 0)
976 if (cfs_time_aftereq(time
, next_shrink
- 5 * CFS_TICK
)) {
977 /* Get the current RPC size directly, instead of going via:
978 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
979 * Keep comment here so that it can be found by searching.
981 int brw_size
= client
->cl_max_pages_per_rpc
<< PAGE_CACHE_SHIFT
;
983 if (client
->cl_import
->imp_state
== LUSTRE_IMP_FULL
&&
984 client
->cl_avail_grant
> brw_size
)
987 osc_update_next_shrink(client
);
992 static int osc_grant_shrink_grant_cb(struct timeout_item
*item
, void *data
)
994 struct client_obd
*client
;
996 list_for_each_entry(client
, &item
->ti_obd_list
, cl_grant_shrink_list
) {
997 if (osc_should_shrink_grant(client
))
998 osc_shrink_grant(client
);
1003 static int osc_add_shrink_grant(struct client_obd
*client
)
1007 rc
= ptlrpc_add_timeout_client(client
->cl_grant_shrink_interval
,
1009 osc_grant_shrink_grant_cb
, NULL
,
1010 &client
->cl_grant_shrink_list
);
1012 CERROR("add grant client %s error %d\n",
1013 client
->cl_import
->imp_obd
->obd_name
, rc
);
1016 CDEBUG(D_CACHE
, "add grant client %s\n",
1017 client
->cl_import
->imp_obd
->obd_name
);
1018 osc_update_next_shrink(client
);
1022 static int osc_del_shrink_grant(struct client_obd
*client
)
1024 return ptlrpc_del_timeout_client(&client
->cl_grant_shrink_list
,
1028 static void osc_init_grant(struct client_obd
*cli
, struct obd_connect_data
*ocd
)
1031 * ocd_grant is the total grant amount we're expect to hold: if we've
1032 * been evicted, it's the new avail_grant amount, cl_dirty will drop
1033 * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1035 * race is tolerable here: if we're evicted, but imp_state already
1036 * left EVICTED state, then cl_dirty must be 0 already.
1038 client_obd_list_lock(&cli
->cl_loi_list_lock
);
1039 if (cli
->cl_import
->imp_state
== LUSTRE_IMP_EVICTED
)
1040 cli
->cl_avail_grant
= ocd
->ocd_grant
;
1042 cli
->cl_avail_grant
= ocd
->ocd_grant
- cli
->cl_dirty
;
1044 if (cli
->cl_avail_grant
< 0) {
1045 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
1046 cli
->cl_import
->imp_obd
->obd_name
, cli
->cl_avail_grant
,
1047 ocd
->ocd_grant
, cli
->cl_dirty
);
1048 /* workaround for servers which do not have the patch from
1051 cli
->cl_avail_grant
= ocd
->ocd_grant
;
1054 /* determine the appropriate chunk size used by osc_extent. */
1055 cli
->cl_chunkbits
= max_t(int, PAGE_CACHE_SHIFT
, ocd
->ocd_blocksize
);
1056 client_obd_list_unlock(&cli
->cl_loi_list_lock
);
1058 CDEBUG(D_CACHE
, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld chunk bits: %d\n",
1059 cli
->cl_import
->imp_obd
->obd_name
,
1060 cli
->cl_avail_grant
, cli
->cl_lost_grant
, cli
->cl_chunkbits
);
1062 if (ocd
->ocd_connect_flags
& OBD_CONNECT_GRANT_SHRINK
&&
1063 list_empty(&cli
->cl_grant_shrink_list
))
1064 osc_add_shrink_grant(cli
);
/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet.
 */
1072 static void handle_short_read(int nob_read
, u32 page_count
,
1073 struct brw_page
**pga
)
1078 /* skip bytes read OK */
1079 while (nob_read
> 0) {
1080 LASSERT(page_count
> 0);
1082 if (pga
[i
]->count
> nob_read
) {
1083 /* EOF inside this page */
1084 ptr
= kmap(pga
[i
]->pg
) +
1085 (pga
[i
]->off
& ~CFS_PAGE_MASK
);
1086 memset(ptr
+ nob_read
, 0, pga
[i
]->count
- nob_read
);
1093 nob_read
-= pga
[i
]->count
;
1098 /* zero remaining pages */
1099 while (page_count
-- > 0) {
1100 ptr
= kmap(pga
[i
]->pg
) + (pga
[i
]->off
& ~CFS_PAGE_MASK
);
1101 memset(ptr
, 0, pga
[i
]->count
);
1107 static int check_write_rcs(struct ptlrpc_request
*req
,
1108 int requested_nob
, int niocount
,
1109 u32 page_count
, struct brw_page
**pga
)
1114 remote_rcs
= req_capsule_server_sized_get(&req
->rq_pill
, &RMF_RCS
,
1115 sizeof(*remote_rcs
) *
1118 CDEBUG(D_INFO
, "Missing/short RC vector on BRW_WRITE reply\n");
1122 /* return error if any niobuf was in error */
1123 for (i
= 0; i
< niocount
; i
++) {
1124 if ((int)remote_rcs
[i
] < 0)
1125 return remote_rcs
[i
];
1127 if (remote_rcs
[i
] != 0) {
1128 CDEBUG(D_INFO
, "rc[%d] invalid (%d) req %p\n",
1129 i
, remote_rcs
[i
], req
);
1134 if (req
->rq_bulk
->bd_nob_transferred
!= requested_nob
) {
1135 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1136 req
->rq_bulk
->bd_nob_transferred
, requested_nob
);
1143 static inline int can_merge_pages(struct brw_page
*p1
, struct brw_page
*p2
)
1145 if (p1
->flag
!= p2
->flag
) {
1146 unsigned mask
= ~(OBD_BRW_FROM_GRANT
| OBD_BRW_NOCACHE
|
1147 OBD_BRW_SYNC
| OBD_BRW_ASYNC
|OBD_BRW_NOQUOTA
);
1149 /* warn if we try to combine flags that we don't know to be
1152 if (unlikely((p1
->flag
& mask
) != (p2
->flag
& mask
))) {
1153 CWARN("Saw flags 0x%x and 0x%x in the same brw, please report this at http://bugs.whamcloud.com/\n",
1154 p1
->flag
, p2
->flag
);
1159 return (p1
->off
+ p1
->count
== p2
->off
);
1162 static u32
osc_checksum_bulk(int nob
, u32 pg_count
,
1163 struct brw_page
**pga
, int opc
,
1164 enum cksum_type cksum_type
)
1168 struct cfs_crypto_hash_desc
*hdesc
;
1169 unsigned int bufsize
;
1171 unsigned char cfs_alg
= cksum_obd2cfs(cksum_type
);
1173 LASSERT(pg_count
> 0);
1175 hdesc
= cfs_crypto_hash_init(cfs_alg
, NULL
, 0);
1176 if (IS_ERR(hdesc
)) {
1177 CERROR("Unable to initialize checksum hash %s\n",
1178 cfs_crypto_hash_name(cfs_alg
));
1179 return PTR_ERR(hdesc
);
1182 while (nob
> 0 && pg_count
> 0) {
1183 int count
= pga
[i
]->count
> nob
? nob
: pga
[i
]->count
;
1185 /* corrupt the data before we compute the checksum, to
1186 * simulate an OST->client data error
1188 if (i
== 0 && opc
== OST_READ
&&
1189 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE
)) {
1190 unsigned char *ptr
= kmap(pga
[i
]->pg
);
1191 int off
= pga
[i
]->off
& ~CFS_PAGE_MASK
;
1193 memcpy(ptr
+ off
, "bad1", min(4, nob
));
1196 cfs_crypto_hash_update_page(hdesc
, pga
[i
]->pg
,
1197 pga
[i
]->off
& ~CFS_PAGE_MASK
,
1200 "page %p map %p index %lu flags %lx count %u priv %0lx: off %d\n",
1201 pga
[i
]->pg
, pga
[i
]->pg
->mapping
, pga
[i
]->pg
->index
,
1202 (long)pga
[i
]->pg
->flags
, page_count(pga
[i
]->pg
),
1203 page_private(pga
[i
]->pg
),
1204 (int)(pga
[i
]->off
& ~CFS_PAGE_MASK
));
1206 nob
-= pga
[i
]->count
;
1212 err
= cfs_crypto_hash_final(hdesc
, (unsigned char *)&cksum
, &bufsize
);
1215 cfs_crypto_hash_final(hdesc
, NULL
, NULL
);
1217 /* For sending we only compute the wrong checksum instead
1218 * of corrupting the data so it is still correct on a redo
1220 if (opc
== OST_WRITE
&& OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND
))
/*
 * Build an OST bulk read/write (BRW) request for the @page_count pages in
 * @pga on behalf of client @cli.
 *
 * Visible steps: allocate the request (from the request pool for writes,
 * with RQF_OST_BRW_READ otherwise), size and pack the capsule, attach a
 * bulk descriptor, fill in the ost_body / obd_ioobj / niobuf_remote
 * buffers, pin every page into the descriptor, set up the checksum
 * fields, and record the totals in the request's osc_brw_async_args.
 *
 * Returns -ENOMEM or -EINVAL when the corresponding fault-injection point
 * fires; the remaining return paths are not visible in this excerpt.
 * NOTE(review): 'oa' and 'opc' are used below, but the lines declaring /
 * assigning them are not part of this excerpt - confirm against the full
 * source.
 */
1226 static int osc_brw_prep_request(int cmd
, struct client_obd
*cli
,
1228 struct lov_stripe_md
*lsm
, u32 page_count
,
1229 struct brw_page
**pga
,
1230 struct ptlrpc_request
**reqp
,
1234 struct ptlrpc_request
*req
;
1235 struct ptlrpc_bulk_desc
*desc
;
1236 struct ost_body
*body
;
1237 struct obd_ioobj
*ioobj
;
1238 struct niobuf_remote
*niobuf
;
1239 int niocount
, i
, requested_nob
, opc
, rc
;
1240 struct osc_brw_async_args
*aa
;
1241 struct req_capsule
*pill
;
1242 struct brw_page
*pg_prev
;
/* Fault-injection hooks: simulate a recoverable and a fatal failure. */
1244 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ
))
1245 return -ENOMEM
; /* Recoverable */
1246 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2
))
1247 return -EINVAL
; /* Fatal */
1249 if ((cmd
& OBD_BRW_WRITE
) != 0) {
1251 req
= ptlrpc_request_alloc_pool(cli
->cl_import
,
1253 &RQF_OST_BRW_WRITE
);
1256 req
= ptlrpc_request_alloc(cli
->cl_import
, &RQF_OST_BRW_READ
);
/*
 * Walk adjacent page pairs with can_merge_pages() to size the niobuf
 * array; presumably niocount grows for each pair that cannot be merged
 * (the increment itself is not part of this excerpt).
 */
1261 for (niocount
= i
= 1; i
< page_count
; i
++) {
1262 if (!can_merge_pages(pga
[i
- 1], pga
[i
]))
1266 pill
= &req
->rq_pill
;
1267 req_capsule_set_size(pill
, &RMF_OBD_IOOBJ
, RCL_CLIENT
,
1269 req_capsule_set_size(pill
, &RMF_NIOBUF_REMOTE
, RCL_CLIENT
,
1270 niocount
* sizeof(*niobuf
));
1272 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, opc
);
1274 ptlrpc_request_free(req
);
1277 req
->rq_request_portal
= OST_IO_PORTAL
; /* bug 7198 */
1278 ptlrpc_at_set_req_timeout(req
);
1279 /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1282 req
->rq_no_retry_einprogress
= 1;
1284 desc
= ptlrpc_prep_bulk_imp(req
, page_count
,
1285 cli
->cl_import
->imp_connect_data
.ocd_brw_size
>> LNET_MTU_BITS
,
1286 opc
== OST_WRITE
? BULK_GET_SOURCE
: BULK_PUT_SINK
,
1293 /* NB request now owns desc and will free it when it gets freed */
1295 body
= req_capsule_client_get(pill
, &RMF_OST_BODY
);
1296 ioobj
= req_capsule_client_get(pill
, &RMF_OBD_IOOBJ
);
1297 niobuf
= req_capsule_client_get(pill
, &RMF_NIOBUF_REMOTE
);
1298 LASSERT(body
&& ioobj
&& niobuf
);
1300 lustre_set_wire_obdo(&req
->rq_import
->imp_connect_data
, &body
->oa
, oa
);
1302 obdo_to_ioobj(oa
, ioobj
);
1303 ioobj
->ioo_bufcnt
= niocount
;
1304 /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1305 * that might be send for this request. The actual number is decided
1306 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1307 * "max - 1" for old client compatibility sending "0", and also so the
1308 * the actual maximum is a power-of-two number, not one less. LU-1431
1310 ioobj_max_brw_set(ioobj
, desc
->bd_md_max_brw
);
1311 LASSERT(page_count
> 0);
1313 for (requested_nob
= i
= 0; i
< page_count
; i
++, niobuf
++) {
1314 struct brw_page
*pg
= pga
[i
];
1315 int poff
= pg
->off
& ~CFS_PAGE_MASK
;
1317 LASSERT(pg
->count
> 0);
1318 /* make sure there is no gap in the middle of page array */
1319 LASSERTF(page_count
== 1 ||
1320 (ergo(i
== 0, poff
+ pg
->count
== PAGE_CACHE_SIZE
) &&
1321 ergo(i
> 0 && i
< page_count
- 1,
1322 poff
== 0 && pg
->count
== PAGE_CACHE_SIZE
) &&
1323 ergo(i
== page_count
- 1, poff
== 0)),
1324 "i: %d/%d pg: %p off: %llu, count: %u\n",
1325 i
, page_count
, pg
, pg
->off
, pg
->count
);
1326 LASSERTF(i
== 0 || pg
->off
> pg_prev
->off
,
1327 "i %d p_c %u pg %p [pri %lu ind %lu] off %llu prev_pg %p [pri %lu ind %lu] off %llu\n",
1329 pg
->pg
, page_private(pg
->pg
), pg
->pg
->index
, pg
->off
,
1330 pg_prev
->pg
, page_private(pg_prev
->pg
),
1331 pg_prev
->pg
->index
, pg_prev
->off
);
1332 LASSERT((pga
[0]->flag
& OBD_BRW_SRVLOCK
) ==
1333 (pg
->flag
& OBD_BRW_SRVLOCK
));
/* Pin the page into the bulk descriptor and account for its bytes. */
1335 ptlrpc_prep_bulk_page_pin(desc
, pg
->pg
, poff
, pg
->count
);
1336 requested_nob
+= pg
->count
;
1338 if (i
> 0 && can_merge_pages(pg_prev
, pg
)) {
1340 niobuf
->len
+= pg
->count
;
1342 niobuf
->offset
= pg
->off
;
1343 niobuf
->len
= pg
->count
;
1344 niobuf
->flags
= pg
->flag
;
1349 LASSERTF((void *)(niobuf
- niocount
) ==
1350 req_capsule_client_get(&req
->rq_pill
, &RMF_NIOBUF_REMOTE
),
1351 "want %p - real %p\n", req_capsule_client_get(&req
->rq_pill
,
1352 &RMF_NIOBUF_REMOTE
), (void *)(niobuf
- niocount
));
1354 osc_announce_cached(cli
, &body
->oa
, opc
== OST_WRITE
? requested_nob
:0);
1356 if ((body
->oa
.o_valid
& OBD_MD_FLFLAGS
) == 0) {
1357 body
->oa
.o_valid
|= OBD_MD_FLFLAGS
;
1358 body
->oa
.o_flags
= 0;
1360 body
->oa
.o_flags
|= OBD_FL_RECOV_RESEND
;
1363 if (osc_should_shrink_grant(cli
))
1364 osc_shrink_grant_local(cli
, &body
->oa
);
1366 /* size[REQ_REC_OFF] still sizeof (*body) */
1367 if (opc
== OST_WRITE
) {
1368 if (cli
->cl_checksum
&&
1369 !sptlrpc_flavor_has_bulk(&req
->rq_flvr
)) {
1370 /* store cl_cksum_type in a local variable since
1371 * it can be changed via lprocfs
1373 enum cksum_type cksum_type
= cli
->cl_cksum_type
;
1375 if ((body
->oa
.o_valid
& OBD_MD_FLFLAGS
) == 0) {
1376 oa
->o_flags
&= OBD_FL_LOCAL_MASK
;
1377 body
->oa
.o_flags
= 0;
1379 body
->oa
.o_flags
|= cksum_type_pack(cksum_type
);
1380 body
->oa
.o_valid
|= OBD_MD_FLCKSUM
| OBD_MD_FLFLAGS
;
1381 body
->oa
.o_cksum
= osc_checksum_bulk(requested_nob
,
1385 CDEBUG(D_PAGE
, "checksum at write origin: %x\n",
1387 /* save this in 'oa', too, for later checking */
1388 oa
->o_valid
|= OBD_MD_FLCKSUM
| OBD_MD_FLFLAGS
;
1389 oa
->o_flags
|= cksum_type_pack(cksum_type
);
1391 /* clear out the checksum flag, in case this is a
1392 * resend but cl_checksum is no longer set. b=11238
1394 oa
->o_valid
&= ~OBD_MD_FLCKSUM
;
1396 oa
->o_cksum
= body
->oa
.o_cksum
;
1397 /* 1 RC per niobuf */
1398 req_capsule_set_size(pill
, &RMF_RCS
, RCL_SERVER
,
1399 sizeof(__u32
) * niocount
);
1401 if (cli
->cl_checksum
&&
1402 !sptlrpc_flavor_has_bulk(&req
->rq_flvr
)) {
1403 if ((body
->oa
.o_valid
& OBD_MD_FLFLAGS
) == 0)
1404 body
->oa
.o_flags
= 0;
1405 body
->oa
.o_flags
|= cksum_type_pack(cli
->cl_cksum_type
);
1406 body
->oa
.o_valid
|= OBD_MD_FLCKSUM
| OBD_MD_FLFLAGS
;
1409 ptlrpc_request_set_replen(req
);
/* Record the totals in the request's async args. */
1411 CLASSERT(sizeof(*aa
) <= sizeof(req
->rq_async_args
));
1412 aa
= ptlrpc_req_async_args(req
);
1414 aa
->aa_requested_nob
= requested_nob
;
1415 aa
->aa_nio_count
= niocount
;
1416 aa
->aa_page_count
= page_count
;
1420 INIT_LIST_HEAD(&aa
->aa_oaps
);
1426 ptlrpc_req_finished(req
);
/*
 * Diagnose a write whose server-side checksum may differ from the one the
 * client sent.  When @server_cksum equals @client_cksum the match is only
 * logged at D_PAGE.  Otherwise the pages in @pga are checksummed again
 * with osc_checksum_bulk() and the checksum types and the three checksum
 * values are compared to choose an explanatory message, which is reported
 * through LCONSOLE_ERROR_MSG and CERROR.
 *
 * NOTE(review): the return statements are not part of this excerpt; the
 * caller in osc_brw_fini_request() uses the result as a boolean condition.
 */
1430 static int check_write_checksum(struct obdo
*oa
, const lnet_process_id_t
*peer
,
1431 __u32 client_cksum
, __u32 server_cksum
, int nob
,
1432 u32 page_count
, struct brw_page
**pga
,
1433 enum cksum_type client_cksum_type
)
1437 enum cksum_type cksum_type
;
1439 if (server_cksum
== client_cksum
) {
1440 CDEBUG(D_PAGE
, "checksum %x confirmed\n", client_cksum
);
1444 cksum_type
= cksum_type_unpack(oa
->o_valid
& OBD_MD_FLFLAGS
?
1446 new_cksum
= osc_checksum_bulk(nob
, page_count
, pga
, OST_WRITE
,
/* Compare the checksum types, then the recomputed value, to pick a message. */
1449 if (cksum_type
!= client_cksum_type
)
1450 msg
= "the server did not use the checksum type specified in the original request - likely a protocol problem"
1452 else if (new_cksum
== server_cksum
)
1453 msg
= "changed on the client after we checksummed it - likely false positive due to mmap IO (bug 11742)"
1455 else if (new_cksum
== client_cksum
)
1456 msg
= "changed in transit before arrival at OST";
1458 msg
= "changed in transit AND doesn't match the original - likely false positive due to mmap IO (bug 11742)"
1461 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1462 " object "DOSTID
" extent [%llu-%llu]\n",
1463 msg
, libcfs_nid2str(peer
->nid
),
1464 oa
->o_valid
& OBD_MD_FLFID
? oa
->o_parent_seq
: (__u64
)0,
1465 oa
->o_valid
& OBD_MD_FLFID
? oa
->o_parent_oid
: 0,
1466 oa
->o_valid
& OBD_MD_FLFID
? oa
->o_parent_ver
: 0,
1467 POSTID(&oa
->o_oi
), pga
[0]->off
,
1468 pga
[page_count
-1]->off
+ pga
[page_count
-1]->count
- 1);
1469 CERROR("original client csum %x (type %x), server csum %x (type %x), client csum now %x\n",
1470 client_cksum
, client_cksum_type
,
1471 server_cksum
, cksum_type
, new_cksum
);
/*
 * Post-process the reply of a BRW request.
 *
 * Visible steps: special handling when rc is negative and not -EDQUOT,
 * unpacking of the ost_body from the reply, quota flag update for writes
 * whose reply carries OBD_MD_FLUSRQUOTA/OBD_MD_FLGRPQUOTA, grant update,
 * and then:
 *  - OST_WRITE: unwrap the bulk, verify the write checksum with
 *    check_write_checksum() and the per-niobuf return codes with
 *    check_write_rcs();
 *  - OST_READ: unwrap the bulk, sanity-check rc against the requested and
 *    transferred byte counts, handle short reads, and compare the server
 *    checksum with one computed locally over the received pages.
 * The reply obdo is copied into aa->aa_oa with lustre_get_wire_obdo().
 *
 * NOTE(review): the return statements are not part of this excerpt.
 */
1475 /* Note rc enters this function as number of bytes transferred */
1476 static int osc_brw_fini_request(struct ptlrpc_request
*req
, int rc
)
1478 struct osc_brw_async_args
*aa
= (void *)&req
->rq_async_args
;
1479 const lnet_process_id_t
*peer
=
1480 &req
->rq_import
->imp_connection
->c_peer
;
1481 struct client_obd
*cli
= aa
->aa_cli
;
1482 struct ost_body
*body
;
1483 __u32 client_cksum
= 0;
1485 if (rc
< 0 && rc
!= -EDQUOT
) {
1486 DEBUG_REQ(D_INFO
, req
, "Failed request with rc = %d\n", rc
);
1490 LASSERTF(req
->rq_repmsg
, "rc = %d\n", rc
);
1491 body
= req_capsule_server_get(&req
->rq_pill
, &RMF_OST_BODY
);
1493 DEBUG_REQ(D_INFO
, req
, "Can't unpack body\n");
1497 /* set/clear over quota flag for a uid/gid */
1498 if (lustre_msg_get_opc(req
->rq_reqmsg
) == OST_WRITE
&&
1499 body
->oa
.o_valid
& (OBD_MD_FLUSRQUOTA
| OBD_MD_FLGRPQUOTA
)) {
1500 unsigned int qid
[MAXQUOTAS
] = { body
->oa
.o_uid
, body
->oa
.o_gid
};
1502 CDEBUG(D_QUOTA
, "setdq for [%u %u] with valid %#llx, flags %x\n",
1503 body
->oa
.o_uid
, body
->oa
.o_gid
, body
->oa
.o_valid
,
1505 osc_quota_setdq(cli
, qid
, body
->oa
.o_valid
, body
->oa
.o_flags
);
1508 osc_update_grant(cli
, body
);
/* Keep the checksum from the request obdo; aa_oa is overwritten at the end. */
1513 if (aa
->aa_oa
->o_valid
& OBD_MD_FLCKSUM
)
1514 client_cksum
= aa
->aa_oa
->o_cksum
; /* save for later */
1516 if (lustre_msg_get_opc(req
->rq_reqmsg
) == OST_WRITE
) {
1518 CERROR("Unexpected +ve rc %d\n", rc
);
1521 LASSERT(req
->rq_bulk
->bd_nob
== aa
->aa_requested_nob
);
1523 if (sptlrpc_cli_unwrap_bulk_write(req
, req
->rq_bulk
))
1526 if ((aa
->aa_oa
->o_valid
& OBD_MD_FLCKSUM
) && client_cksum
&&
1527 check_write_checksum(&body
->oa
, peer
, client_cksum
,
1528 body
->oa
.o_cksum
, aa
->aa_requested_nob
,
1529 aa
->aa_page_count
, aa
->aa_ppga
,
1530 cksum_type_unpack(aa
->aa_oa
->o_flags
)))
1533 rc
= check_write_rcs(req
, aa
->aa_requested_nob
,
1535 aa
->aa_page_count
, aa
->aa_ppga
);
1539 /* The rest of this function executes only for OST_READs */
1541 /* if unwrap_bulk failed, return -EAGAIN to retry */
1542 rc
= sptlrpc_cli_unwrap_bulk_read(req
, req
->rq_bulk
, rc
);
1548 if (rc
> aa
->aa_requested_nob
) {
1549 CERROR("Unexpected rc %d (%d requested)\n", rc
,
1550 aa
->aa_requested_nob
);
1554 if (rc
!= req
->rq_bulk
->bd_nob_transferred
) {
1555 CERROR("Unexpected rc %d (%d transferred)\n",
1556 rc
, req
->rq_bulk
->bd_nob_transferred
);
1560 if (rc
< aa
->aa_requested_nob
)
1561 handle_short_read(rc
, aa
->aa_page_count
, aa
->aa_ppga
);
1563 if (body
->oa
.o_valid
& OBD_MD_FLCKSUM
) {
1564 static int cksum_counter
;
1565 __u32 server_cksum
= body
->oa
.o_cksum
;
1568 enum cksum_type cksum_type
;
1570 cksum_type
= cksum_type_unpack(body
->oa
.o_valid
&OBD_MD_FLFLAGS
?
1571 body
->oa
.o_flags
: 0);
1572 client_cksum
= osc_checksum_bulk(rc
, aa
->aa_page_count
,
1573 aa
->aa_ppga
, OST_READ
,
/* The bulk came from a NID other than the peer's: report it as a router. */
1576 if (peer
->nid
!= req
->rq_bulk
->bd_sender
) {
1578 router
= libcfs_nid2str(req
->rq_bulk
->bd_sender
);
1581 if (server_cksum
!= client_cksum
) {
1582 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from %s%s%s inode " DFID
" object " DOSTID
" extent [%llu-%llu]\n",
1583 req
->rq_import
->imp_obd
->obd_name
,
1584 libcfs_nid2str(peer
->nid
),
1586 body
->oa
.o_valid
& OBD_MD_FLFID
?
1587 body
->oa
.o_parent_seq
: (__u64
)0,
1588 body
->oa
.o_valid
& OBD_MD_FLFID
?
1589 body
->oa
.o_parent_oid
: 0,
1590 body
->oa
.o_valid
& OBD_MD_FLFID
?
1591 body
->oa
.o_parent_ver
: 0,
1592 POSTID(&body
->oa
.o_oi
),
1593 aa
->aa_ppga
[0]->off
,
1594 aa
->aa_ppga
[aa
->aa_page_count
-1]->off
+
1595 aa
->aa_ppga
[aa
->aa_page_count
-1]->count
-
1597 CERROR("client %x, server %x, cksum_type %x\n",
1598 client_cksum
, server_cksum
, cksum_type
);
1600 aa
->aa_oa
->o_cksum
= client_cksum
;
1604 CDEBUG(D_PAGE
, "checksum %x confirmed\n", client_cksum
);
1607 } else if (unlikely(client_cksum
)) {
1608 static int cksum_missed
;
/*
 * (x & -x) == x holds only when x is zero or a power of two, so the
 * message below is emitted at exponentially growing intervals.
 */
1611 if ((cksum_missed
& (-cksum_missed
)) == cksum_missed
)
1612 CERROR("Checksum %u requested from %s but not sent\n",
1613 cksum_missed
, libcfs_nid2str(peer
->nid
));
1619 lustre_get_wire_obdo(&req
->rq_import
->imp_connect_data
,
1620 aa
->aa_oa
, &body
->oa
);
/*
 * Resend a BRW after a recoverable error @rc.
 *
 * A fresh request for the same pages is built with
 * osc_brw_prep_request().  The interpret callback, the async args, the
 * oap list and the extent list are carried over from @request, as are the
 * import generation and the resend count.  rq_sent is set to the current
 * time plus aa_resends seconds, capped at the new request's rq_timeout,
 * and the new request is handed to ptlrpcd_add_req().
 *
 * If an oap that references @request is marked interrupted, the new
 * request is dropped with ptlrpc_req_finished().
 * NOTE(review): the return statements are not part of this excerpt.
 */
1625 static int osc_brw_redo_request(struct ptlrpc_request
*request
,
1626 struct osc_brw_async_args
*aa
, int rc
)
1628 struct ptlrpc_request
*new_req
;
1629 struct osc_brw_async_args
*new_aa
;
1630 struct osc_async_page
*oap
;
1632 DEBUG_REQ(rc
== -EINPROGRESS
? D_RPCTRACE
: D_ERROR
, request
,
1633 "redo for recoverable error %d", rc
);
1635 rc
= osc_brw_prep_request(lustre_msg_get_opc(request
->rq_reqmsg
) ==
1636 OST_WRITE
? OBD_BRW_WRITE
: OBD_BRW_READ
,
1637 aa
->aa_cli
, aa
->aa_oa
,
1638 NULL
/* lsm unused by osc currently */,
1639 aa
->aa_page_count
, aa
->aa_ppga
,
1644 list_for_each_entry(oap
, &aa
->aa_oaps
, oap_rpc_item
) {
1645 if (oap
->oap_request
) {
1646 LASSERTF(request
== oap
->oap_request
,
1647 "request %p != oap_request %p\n",
1648 request
, oap
->oap_request
);
1649 if (oap
->oap_interrupted
) {
1650 ptlrpc_req_finished(new_req
);
1655 /* New request takes over pga and oaps from old request.
1656 * Note that copying a list_head doesn't work, need to move it...
1659 new_req
->rq_interpret_reply
= request
->rq_interpret_reply
;
1660 new_req
->rq_async_args
= request
->rq_async_args
;
1661 /* cap resend delay to the current request timeout, this is similar to
1662 * what ptlrpc does (see after_reply())
1664 if (aa
->aa_resends
> new_req
->rq_timeout
)
1665 new_req
->rq_sent
= ktime_get_real_seconds() + new_req
->rq_timeout
;
1667 new_req
->rq_sent
= ktime_get_real_seconds() + aa
->aa_resends
;
1668 new_req
->rq_generation_set
= 1;
1669 new_req
->rq_import_generation
= request
->rq_import_generation
;
1671 new_aa
= ptlrpc_req_async_args(new_req
);
1673 INIT_LIST_HEAD(&new_aa
->aa_oaps
);
1674 list_splice_init(&aa
->aa_oaps
, &new_aa
->aa_oaps
);
1675 INIT_LIST_HEAD(&new_aa
->aa_exts
);
1676 list_splice_init(&aa
->aa_exts
, &new_aa
->aa_exts
);
1677 new_aa
->aa_resends
= aa
->aa_resends
;
1679 list_for_each_entry(oap
, &new_aa
->aa_oaps
, oap_rpc_item
) {
1680 if (oap
->oap_request
) {
1681 ptlrpc_req_finished(oap
->oap_request
);
1682 oap
->oap_request
= ptlrpc_request_addref(new_req
);
1686 /* XXX: This code will run into problem if we're going to support
1687 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1688 * and wait for all of them to be finished. We should inherit request
1689 * set from old request.
1691 ptlrpcd_add_req(new_req
);
1693 DEBUG_REQ(D_INFO
, new_req
, "new request");
1698 * Ugh, we want disk allocation on the target to happen in offset order. We'll
1699 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1700 * fine for our small page arrays and doesn't require allocation. It's an
1701 * insertion sort that swaps elements that are strides apart, shrinking the
1702 * stride down until it's '1' and the array is sorted.
/*
 * Sort the @num brw_page pointers in @array in place by ascending ->off.
 * The stride is first grown as stride * 3 + 1 while it is below @num;
 * each pass then shifts entries whose ->off is larger than the held
 * element's one stride to the right, repeating while stride > 1.
 * NOTE(review): the line that shrinks the stride between passes is not
 * part of this excerpt.
 */
1704 static void sort_brw_pages(struct brw_page
**array
, int num
)
1707 struct brw_page
*tmp
;
1711 for (stride
= 1; stride
< num
; stride
= (stride
* 3) + 1)
1716 for (i
= stride
; i
< num
; i
++) {
1719 while (j
>= stride
&& array
[j
- stride
]->off
> tmp
->off
) {
1720 array
[j
] = array
[j
- stride
];
1725 } while (stride
> 1);
/*
 * Release the brw_page pointer array @ppga holding @count entries.
 * NOTE(review): the body is not part of this excerpt; the description is
 * inferred from the name and from the call in brw_interpret() - confirm.
 */
1728 static void osc_release_ppga(struct brw_page
**ppga
, u32 count
)
/*
 * Reply interpreter for BRW requests (osc_build_rpc() installs it as
 * rq_interpret_reply).
 *
 * Runs osc_brw_fini_request() and, for a recoverable error, either logs
 * that the resend would cross an eviction (import generation changed),
 * retries through osc_brw_redo_request() (always for -EINPROGRESS,
 * otherwise while client_should_resend() allows), or reports too many
 * retries.  Afterwards every extent on aa->aa_exts is finished, the
 * blocks/mtime/atime/ctime fields that the obdo marks valid are pushed
 * into the cl_object attributes, the obdo is freed, the cl_req is
 * completed, the page array is released, the read or write in-flight
 * counter is decremented under cl_loi_list_lock, and osc_io_unplug() is
 * called.
 *
 * NOTE(review): the "too many resent retries" message formats POSTID()
 * with "%llu:%llu" while the eviction message in this function uses
 * DOSTID - confirm the two formats are meant to differ.
 */
1734 static int brw_interpret(const struct lu_env
*env
,
1735 struct ptlrpc_request
*req
, void *data
, int rc
)
1737 struct osc_brw_async_args
*aa
= data
;
1738 struct osc_extent
*ext
;
1739 struct osc_extent
*tmp
;
1740 struct cl_object
*obj
= NULL
;
1741 struct client_obd
*cli
= aa
->aa_cli
;
1743 rc
= osc_brw_fini_request(req
, rc
);
1744 CDEBUG(D_INODE
, "request %p aa %p rc %d\n", req
, aa
, rc
);
1745 /* When server return -EINPROGRESS, client should always retry
1746 * regardless of the number of times the bulk was resent already.
1748 if (osc_recoverable_error(rc
)) {
1749 if (req
->rq_import_generation
!=
1750 req
->rq_import
->imp_generation
) {
1751 CDEBUG(D_HA
, "%s: resend cross eviction for object: " DOSTID
", rc = %d.\n",
1752 req
->rq_import
->imp_obd
->obd_name
,
1753 POSTID(&aa
->aa_oa
->o_oi
), rc
);
1754 } else if (rc
== -EINPROGRESS
||
1755 client_should_resend(aa
->aa_resends
, aa
->aa_cli
)) {
1756 rc
= osc_brw_redo_request(req
, aa
, rc
);
1758 CERROR("%s: too many resent retries for object: %llu:%llu, rc = %d.\n",
1759 req
->rq_import
->imp_obd
->obd_name
,
1760 POSTID(&aa
->aa_oa
->o_oi
), rc
);
1765 else if (rc
== -EAGAIN
|| rc
== -EINPROGRESS
)
1769 list_for_each_entry_safe(ext
, tmp
, &aa
->aa_exts
, oe_link
) {
1770 if (!obj
&& rc
== 0) {
1771 obj
= osc2cl(ext
->oe_obj
);
1775 list_del_init(&ext
->oe_link
);
1776 osc_extent_finish(env
, ext
, 1, rc
);
1778 LASSERT(list_empty(&aa
->aa_exts
));
1779 LASSERT(list_empty(&aa
->aa_oaps
));
1782 struct obdo
*oa
= aa
->aa_oa
;
1783 struct cl_attr
*attr
= &osc_env_info(env
)->oti_attr
;
1784 unsigned long valid
= 0;
1787 if (oa
->o_valid
& OBD_MD_FLBLOCKS
) {
1788 attr
->cat_blocks
= oa
->o_blocks
;
1789 valid
|= CAT_BLOCKS
;
1791 if (oa
->o_valid
& OBD_MD_FLMTIME
) {
1792 attr
->cat_mtime
= oa
->o_mtime
;
1795 if (oa
->o_valid
& OBD_MD_FLATIME
) {
1796 attr
->cat_atime
= oa
->o_atime
;
1799 if (oa
->o_valid
& OBD_MD_FLCTIME
) {
1800 attr
->cat_ctime
= oa
->o_ctime
;
1804 cl_object_attr_lock(obj
);
1805 cl_object_attr_set(env
, obj
, attr
, valid
);
1806 cl_object_attr_unlock(obj
);
1808 cl_object_put(env
, obj
);
1810 kmem_cache_free(obdo_cachep
, aa
->aa_oa
);
1812 cl_req_completion(env
, aa
->aa_clerq
, rc
< 0 ? rc
:
1813 req
->rq_bulk
->bd_nob_transferred
);
1814 osc_release_ppga(aa
->aa_ppga
, aa
->aa_page_count
);
1815 ptlrpc_lprocfs_brw(req
, req
->rq_bulk
->bd_nob_transferred
);
1817 client_obd_list_lock(&cli
->cl_loi_list_lock
);
1818 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1819 * is called so we know whether to go to sync BRWs or wait for more
1822 if (lustre_msg_get_opc(req
->rq_reqmsg
) == OST_WRITE
)
1823 cli
->cl_w_in_flight
--;
1825 cli
->cl_r_in_flight
--;
1826 osc_wake_cache_waiters(cli
);
1827 client_obd_list_unlock(&cli
->cl_loi_list_lock
);
1829 osc_io_unplug(env
, cli
, NULL
);
1834 * Build an RPC by the list of extent @ext_list. The caller must ensure
1835 * that the total pages in this list are NOT over max pages per RPC.
1836 * Extents in the list must be in OES_RPC state.
/*
 * Turn the extents on @ext_list into a single BRW RPC for client @cli.
 *
 * Visible steps: move every page of every extent onto a local rpc_list
 * while tracking the lowest start and highest end offset; allocate the
 * cl_req_attr, the page pointer array and the obdo; create the cl_req
 * and fill @pga from the oaps; set the obdo attributes and lock handle;
 * sort the pages by offset; build the ptlrpc request with
 * osc_brw_prep_request() and install brw_interpret() as its reply
 * handler; refresh the timestamps in the request body; hand the oaps and
 * extents over to the request's async args; bump the in-flight counter
 * and histograms under cl_loi_list_lock; and queue the request with
 * ptlrpcd_add_req().
 *
 * On the visible failure path the obdo is freed, every extent still on
 * @ext_list is finished with rc, and a valid cl_req is completed with rc.
 * NOTE(review): several locals used below (i, rc, page_count, mem_tight,
 * mpflag) are declared on lines that are not part of this excerpt.
 */
1838 int osc_build_rpc(const struct lu_env
*env
, struct client_obd
*cli
,
1839 struct list_head
*ext_list
, int cmd
)
1841 struct ptlrpc_request
*req
= NULL
;
1842 struct osc_extent
*ext
;
1843 struct brw_page
**pga
= NULL
;
1844 struct osc_brw_async_args
*aa
= NULL
;
1845 struct obdo
*oa
= NULL
;
1846 struct osc_async_page
*oap
;
1847 struct osc_async_page
*tmp
;
1848 struct cl_req
*clerq
= NULL
;
1849 enum cl_req_type crt
= (cmd
& OBD_BRW_WRITE
) ? CRT_WRITE
: CRT_READ
;
1850 struct ldlm_lock
*lock
= NULL
;
1851 struct cl_req_attr
*crattr
= NULL
;
1852 u64 starting_offset
= OBD_OBJECT_EOF
;
1853 u64 ending_offset
= 0;
1859 struct ost_body
*body
;
1860 LIST_HEAD(rpc_list
);
1862 LASSERT(!list_empty(ext_list
));
1864 /* add pages into rpc_list to build BRW rpc */
1865 list_for_each_entry(ext
, ext_list
, oe_link
) {
1866 LASSERT(ext
->oe_state
== OES_RPC
);
1867 mem_tight
|= ext
->oe_memalloc
;
1868 list_for_each_entry(oap
, &ext
->oe_pages
, oap_pending_item
) {
1870 list_add_tail(&oap
->oap_rpc_item
, &rpc_list
);
1871 if (starting_offset
> oap
->oap_obj_off
)
1872 starting_offset
= oap
->oap_obj_off
;
1874 LASSERT(oap
->oap_page_off
== 0);
1875 if (ending_offset
< oap
->oap_obj_off
+ oap
->oap_count
)
1876 ending_offset
= oap
->oap_obj_off
+
1879 LASSERT(oap
->oap_page_off
+ oap
->oap_count
==
1885 mpflag
= cfs_memory_pressure_get_and_set();
1887 crattr
= kzalloc(sizeof(*crattr
), GFP_NOFS
);
1893 pga
= kcalloc(page_count
, sizeof(*pga
), GFP_NOFS
);
1899 oa
= kmem_cache_zalloc(obdo_cachep
, GFP_NOFS
);
/* Fill pga[] from the oaps and add each page to the cl_req. */
1906 list_for_each_entry(oap
, &rpc_list
, oap_rpc_item
) {
1907 struct cl_page
*page
= oap2cl_page(oap
);
1910 clerq
= cl_req_alloc(env
, page
, crt
,
1911 1 /* only 1-object rpcs for now */);
1912 if (IS_ERR(clerq
)) {
1913 rc
= PTR_ERR(clerq
);
1916 lock
= oap
->oap_ldlm_lock
;
1919 oap
->oap_brw_flags
|= OBD_BRW_MEMALLOC
;
1920 pga
[i
] = &oap
->oap_brw_page
;
1921 pga
[i
]->off
= oap
->oap_obj_off
+ oap
->oap_page_off
;
1922 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1923 pga
[i
]->pg
, page_index(oap
->oap_page
), oap
,
1926 cl_req_page_add(env
, clerq
, page
);
1929 /* always get the data for the obdo for the rpc */
1931 crattr
->cra_oa
= oa
;
1932 cl_req_attr_set(env
, clerq
, crattr
, ~0ULL);
1934 oa
->o_handle
= lock
->l_remote_handle
;
1935 oa
->o_valid
|= OBD_MD_FLHANDLE
;
1938 rc
= cl_req_prep(env
, clerq
);
1940 CERROR("cl_req_prep failed: %d\n", rc
);
1944 sort_brw_pages(pga
, page_count
);
1945 rc
= osc_brw_prep_request(cmd
, cli
, oa
, NULL
, page_count
,
1948 CERROR("prep_req failed: %d\n", rc
);
1952 req
->rq_interpret_reply
= brw_interpret
;
1955 req
->rq_memalloc
= 1;
1957 /* Need to update the timestamps after the request is built in case
1958 * we race with setattr (locally or in queue at OST). If OST gets
1959 * later setattr before earlier BRW (as determined by the request xid),
1960 * the OST will not use BRW timestamps. Sadly, there is no obvious
1961 * way to do this in a single call. bug 10150
1963 body
= req_capsule_client_get(&req
->rq_pill
, &RMF_OST_BODY
);
1964 crattr
->cra_oa
= &body
->oa
;
1965 cl_req_attr_set(env
, clerq
, crattr
,
1966 OBD_MD_FLMTIME
|OBD_MD_FLCTIME
|OBD_MD_FLATIME
);
1968 lustre_msg_set_jobid(req
->rq_reqmsg
, crattr
->cra_jobid
);
1970 CLASSERT(sizeof(*aa
) <= sizeof(req
->rq_async_args
));
1971 aa
= ptlrpc_req_async_args(req
);
1972 INIT_LIST_HEAD(&aa
->aa_oaps
);
1973 list_splice_init(&rpc_list
, &aa
->aa_oaps
);
1974 INIT_LIST_HEAD(&aa
->aa_exts
);
1975 list_splice_init(ext_list
, &aa
->aa_exts
);
1976 aa
->aa_clerq
= clerq
;
1978 /* queued sync pages can be torn down while the pages
1979 * were between the pending list and the rpc
1982 list_for_each_entry(oap
, &aa
->aa_oaps
, oap_rpc_item
) {
1983 /* only one oap gets a request reference */
1986 if (oap
->oap_interrupted
&& !req
->rq_intr
) {
1987 CDEBUG(D_INODE
, "oap %p in req %p interrupted\n",
1989 ptlrpc_mark_interrupted(req
);
1993 tmp
->oap_request
= ptlrpc_request_addref(req
);
1995 client_obd_list_lock(&cli
->cl_loi_list_lock
);
1996 starting_offset
>>= PAGE_CACHE_SHIFT
;
1997 if (cmd
== OBD_BRW_READ
) {
1998 cli
->cl_r_in_flight
++;
1999 lprocfs_oh_tally_log2(&cli
->cl_read_page_hist
, page_count
);
2000 lprocfs_oh_tally(&cli
->cl_read_rpc_hist
, cli
->cl_r_in_flight
);
2001 lprocfs_oh_tally_log2(&cli
->cl_read_offset_hist
,
2002 starting_offset
+ 1);
2004 cli
->cl_w_in_flight
++;
2005 lprocfs_oh_tally_log2(&cli
->cl_write_page_hist
, page_count
);
2006 lprocfs_oh_tally(&cli
->cl_write_rpc_hist
, cli
->cl_w_in_flight
);
2007 lprocfs_oh_tally_log2(&cli
->cl_write_offset_hist
,
2008 starting_offset
+ 1);
2010 client_obd_list_unlock(&cli
->cl_loi_list_lock
);
2012 DEBUG_REQ(D_INODE
, req
, "%d pages, aa %p. now %dr/%dw in flight",
2013 page_count
, aa
, cli
->cl_r_in_flight
,
2014 cli
->cl_w_in_flight
);
2016 ptlrpcd_add_req(req
);
2021 cfs_memory_pressure_restore(mpflag
);
2029 kmem_cache_free(obdo_cachep
, oa
);
2031 /* this should happen rarely and is pretty bad, it makes the
2032 * pending list not follow the dirty order
2034 while (!list_empty(ext_list
)) {
2035 ext
= list_entry(ext_list
->next
, struct osc_extent
,
2037 list_del_init(&ext
->oe_link
);
2038 osc_extent_finish(env
, ext
, 0, rc
);
2040 if (clerq
&& !IS_ERR(clerq
))
2041 cl_req_completion(env
, clerq
, rc
);
/*
 * Bind einfo->ei_cbdata to @lock as its AST data.
 *
 * Asserts that the lock's blocking, completion and glimpse callbacks and
 * its resource type match @einfo.  Then, with the resource lock and
 * osc_ast_guard held, stores the data if l_ast_data is still unset and
 * tests whether l_ast_data equals the data.
 *
 * NOTE(review): the result variable and the return are not part of this
 * excerpt; presumably the function returns non-zero when l_ast_data ends
 * up equal to the data - confirm.
 */
2046 static int osc_set_lock_data_with_check(struct ldlm_lock
*lock
,
2047 struct ldlm_enqueue_info
*einfo
)
2049 void *data
= einfo
->ei_cbdata
;
2052 LASSERT(lock
->l_blocking_ast
== einfo
->ei_cb_bl
);
2053 LASSERT(lock
->l_resource
->lr_type
== einfo
->ei_type
);
2054 LASSERT(lock
->l_completion_ast
== einfo
->ei_cb_cp
);
2055 LASSERT(lock
->l_glimpse_ast
== einfo
->ei_cb_gl
);
2057 lock_res_and_lock(lock
);
2058 spin_lock(&osc_ast_guard
);
2060 if (!lock
->l_ast_data
)
2061 lock
->l_ast_data
= data
;
2062 if (lock
->l_ast_data
== data
)
2065 spin_unlock(&osc_ast_guard
);
2066 unlock_res_and_lock(lock
);
/*
 * Handle-based wrapper around osc_set_lock_data_with_check(): resolves
 * @lockh with ldlm_handle2lock(), runs the check, and drops the lock
 * reference with LDLM_LOCK_PUT().
 *
 * NOTE(review): the branch that selects the CERROR below is not part of
 * this excerpt; the message suggests it covers a handle that no longer
 * resolves to a lock - confirm.
 */
2071 static int osc_set_data_with_check(struct lustre_handle
*lockh
,
2072 struct ldlm_enqueue_info
*einfo
)
2074 struct ldlm_lock
*lock
= ldlm_handle2lock(lockh
);
2078 set
= osc_set_lock_data_with_check(lock
, einfo
);
2079 LDLM_LOCK_PUT(lock
);
2081 CERROR("lockh %p, data %p - client evicted?\n",
2082 lockh
, einfo
->ei_cbdata
);
/*
 * Build the ldlm resource name from lsm->lsm_oi and run
 * ldlm_resource_iterate() over that resource in the export's obd
 * namespace, passing @replace and @data through.  The LDLM_ITER_STOP and
 * LDLM_ITER_CONTINUE results are each handled separately.
 * NOTE(review): the values returned for those two cases are not part of
 * this excerpt.
 */
2086 /* find any ldlm lock of the inode in osc
2091 static int osc_find_cbdata(struct obd_export
*exp
, struct lov_stripe_md
*lsm
,
2092 ldlm_iterator_t replace
, void *data
)
2094 struct ldlm_res_id res_id
;
2095 struct obd_device
*obd
= class_exp2obd(exp
);
2098 ostid_build_res_name(&lsm
->lsm_oi
, &res_id
);
2099 rc
= ldlm_resource_iterate(obd
->obd_namespace
, &res_id
, replace
, data
);
2100 if (rc
== LDLM_ITER_STOP
)
2102 if (rc
== LDLM_ITER_CONTINUE
)
/*
 * Finish the OSC side of a lock enqueue.
 *
 * When rc is ELDLM_LOCK_ABORTED, the ldlm_reply is fetched from the
 * request, its lock_policy_res1 is converted with ptlrpc_status_ntoh(),
 * and a non-zero value replaces rc.  LDLM_FL_LVB_READY is set in *flags
 * for an aborted non-AGL intent enqueue (a second alternative in that
 * condition is not part of this excerpt) and the LVB contents are logged.
 * Finally @upcall is invoked with @cookie and rc, and its result becomes
 * the new rc.
 */
2107 static int osc_enqueue_fini(struct ptlrpc_request
*req
, struct ost_lvb
*lvb
,
2108 obd_enqueue_update_f upcall
, void *cookie
,
2109 __u64
*flags
, int agl
, int rc
)
2111 int intent
= *flags
& LDLM_FL_HAS_INTENT
;
2114 /* The request was created before ldlm_cli_enqueue call. */
2115 if (rc
== ELDLM_LOCK_ABORTED
) {
2116 struct ldlm_reply
*rep
;
2118 rep
= req_capsule_server_get(&req
->rq_pill
,
2121 rep
->lock_policy_res1
=
2122 ptlrpc_status_ntoh(rep
->lock_policy_res1
);
2123 if (rep
->lock_policy_res1
)
2124 rc
= rep
->lock_policy_res1
;
2128 if ((intent
!= 0 && rc
== ELDLM_LOCK_ABORTED
&& agl
== 0) ||
2130 *flags
|= LDLM_FL_LVB_READY
;
2131 CDEBUG(D_INODE
, "got kms %llu blocks %llu mtime %llu\n",
2132 lvb
->lvb_size
, lvb
->lvb_blocks
, lvb
->lvb_mtime
);
2135 /* Call the update callback. */
2136 rc
= (*upcall
)(cookie
, rc
);
/*
 * Reply interpreter for asynchronous lock enqueues (osc_enqueue_base()
 * installs it as rq_interpret_reply).
 *
 * Copies the lock handle and mode out of @aa, looks the lock up, takes an
 * extra reference on it, completes the enqueue with
 * ldlm_cli_enqueue_fini() and osc_enqueue_fini(), and then drops the
 * references again with ldlm_lock_decref() and LDLM_LOCK_PUT().
 * NOTE(review): the return statement and the assignment of 'lvb' are not
 * part of this excerpt.
 */
2140 static int osc_enqueue_interpret(const struct lu_env
*env
,
2141 struct ptlrpc_request
*req
,
2142 struct osc_enqueue_args
*aa
, int rc
)
2144 struct ldlm_lock
*lock
;
2145 struct lustre_handle handle
;
2147 struct ost_lvb
*lvb
;
2149 __u64
*flags
= aa
->oa_flags
;
2151 /* Make a local copy of a lock handle and a mode, because aa->oa_*
2152 * might be freed anytime after lock upcall has been called.
2154 lustre_handle_copy(&handle
, aa
->oa_lockh
);
2155 mode
= aa
->oa_ei
->ei_mode
;
2157 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2160 lock
= ldlm_handle2lock(&handle
);
2162 /* Take an additional reference so that a blocking AST that
2163 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2164 * to arrive after an upcall has been executed by
2165 * osc_enqueue_fini().
2167 ldlm_lock_addref(&handle
, mode
);
2169 /* Let CP AST to grant the lock first. */
2170 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE
, 1);
2172 if (aa
->oa_agl
&& rc
== ELDLM_LOCK_ABORTED
) {
2177 lvb_len
= sizeof(*aa
->oa_lvb
);
2180 /* Complete obtaining the lock procedure. */
2181 rc
= ldlm_cli_enqueue_fini(aa
->oa_exp
, req
, aa
->oa_ei
->ei_type
, 1,
2182 mode
, flags
, lvb
, lvb_len
, &handle
, rc
);
2183 /* Complete osc stuff. */
2184 rc
= osc_enqueue_fini(req
, aa
->oa_lvb
, aa
->oa_upcall
, aa
->oa_cookie
,
2185 flags
, aa
->oa_agl
, rc
);
2187 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE
, 10);
2189 /* Release the lock for async request. */
2190 if (lustre_handle_is_used(&handle
) && rc
== ELDLM_OK
)
2192 * Releases a reference taken by ldlm_cli_enqueue(), if it is
2193 * not already released by
2194 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2196 ldlm_lock_decref(&handle
, mode
);
2198 LASSERTF(lock
, "lockh %p, req %p, aa %p - client evicted?\n",
2199 aa
->oa_lockh
, req
, aa
);
2200 ldlm_lock_decref(&handle
, mode
);
2201 LDLM_LOCK_PUT(lock
);
/*
 * Sentinel request-set pointer.  osc_enqueue_base() compares its rqset
 * argument against this value and, on a match, queues the request with
 * ptlrpcd_add_req(); ptlrpc_set_add_req() is presumably used otherwise
 * (the else arm is not visible in this excerpt).
 */
2205 struct ptlrpc_request_set
*PTLRPCD_SET
= (void *)1;
2207 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2208 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2209 * other synchronous requests, however keeping some locks and trying to obtain
2210 * others may take a considerable amount of time in a case of ost failure; and
2211 * when other sync requests do not get released lock from a client, the client
2212 * is excluded from the cluster -- such scenarios make life difficult, so
2213 * release locks just after they are obtained.
/*
 * Obtain an extent lock on @res_id, reusing a cached lock when possible.
 *
 * The requested extent in @policy is first widened to page boundaries.
 * ldlm_lock_match() is then tried; for a matched lock:
 *  - with @agl set and LDLM_FL_LVB_READY clear on the lock, the
 *    references are dropped (the in-code comment says -ECANCELED is
 *    returned to the caller);
 *  - if osc_set_lock_data_with_check() accepts the lock, LVB_READY is
 *    set in *flags, @upcall is called with ELDLM_OK and the lock
 *    references are adjusted for the PR/PW and async cases;
 *  - otherwise the match references are dropped.
 * When no cached lock is used, an LDLM_ENQUEUE request is allocated for
 * intent enqueues and ldlm_cli_enqueue() is called.  On the async path
 * the arguments are stored in the request's osc_enqueue_args,
 * osc_enqueue_interpret() is installed and the request is queued;
 * otherwise osc_enqueue_fini() runs directly.
 *
 * NOTE(review): the result of ldlm_handle2lock() is dereferenced as
 * 'matched->l_flags' with no visible NULL check - confirm that a matched
 * handle always resolves.  Several conditions, 'rc' and 'cancels' are
 * declared or tested on lines that are not part of this excerpt.
 */
2215 int osc_enqueue_base(struct obd_export
*exp
, struct ldlm_res_id
*res_id
,
2216 __u64
*flags
, ldlm_policy_data_t
*policy
,
2217 struct ost_lvb
*lvb
, int kms_valid
,
2218 obd_enqueue_update_f upcall
, void *cookie
,
2219 struct ldlm_enqueue_info
*einfo
,
2220 struct lustre_handle
*lockh
,
2221 struct ptlrpc_request_set
*rqset
, int async
, int agl
)
2223 struct obd_device
*obd
= exp
->exp_obd
;
2224 struct ptlrpc_request
*req
= NULL
;
2225 int intent
= *flags
& LDLM_FL_HAS_INTENT
;
2226 __u64 match_lvb
= (agl
!= 0 ? 0 : LDLM_FL_LVB_READY
);
2227 enum ldlm_mode mode
;
2230 /* Filesystem lock extents are extended to page boundaries so that
2231 * dealing with the page cache is a little smoother.
2233 policy
->l_extent
.start
-= policy
->l_extent
.start
& ~CFS_PAGE_MASK
;
2234 policy
->l_extent
.end
|= ~CFS_PAGE_MASK
;
2237 * kms is not valid when either object is completely fresh (so that no
2238 * locks are cached), or object was evicted. In the latter case cached
2239 * lock cannot be used, because it would prime inode state with
2240 * potentially stale LVB.
2245 /* Next, search for already existing extent locks that will cover us */
2246 /* If we're trying to read, we also search for an existing PW lock. The
2247 * VFS and page cache already protect us locally, so lots of readers/
2248 * writers can share a single PW lock.
2250 * There are problems with conversion deadlocks, so instead of
2251 * converting a read lock to a write lock, we'll just enqueue a new
2254 * At some point we should cancel the read lock instead of making them
2255 * send us a blocking callback, but there are problems with canceling
2256 * locks out from other users right now, too.
2258 mode
= einfo
->ei_mode
;
2259 if (einfo
->ei_mode
== LCK_PR
)
2261 mode
= ldlm_lock_match(obd
->obd_namespace
, *flags
| match_lvb
, res_id
,
2262 einfo
->ei_type
, policy
, mode
, lockh
, 0);
2264 struct ldlm_lock
*matched
= ldlm_handle2lock(lockh
);
2266 if ((agl
!= 0) && !(matched
->l_flags
& LDLM_FL_LVB_READY
)) {
2267 /* For AGL, if enqueue RPC is sent but the lock is not
2268 * granted, then skip to process this strpe.
2269 * Return -ECANCELED to tell the caller.
2271 ldlm_lock_decref(lockh
, mode
);
2272 LDLM_LOCK_PUT(matched
);
2276 if (osc_set_lock_data_with_check(matched
, einfo
)) {
2277 *flags
|= LDLM_FL_LVB_READY
;
2278 /* addref the lock only if not async requests and PW
2279 * lock is matched whereas we asked for PR.
2281 if (!rqset
&& einfo
->ei_mode
!= mode
)
2282 ldlm_lock_addref(lockh
, LCK_PR
);
2284 /* I would like to be able to ASSERT here that
2285 * rss <= kms, but I can't, for reasons which
2286 * are explained in lov_enqueue()
2290 /* We already have a lock, and it's referenced.
2292 * At this point, the cl_lock::cll_state is CLS_QUEUING,
2293 * AGL upcall may change it to CLS_HELD directly.
2295 (*upcall
)(cookie
, ELDLM_OK
);
2297 if (einfo
->ei_mode
!= mode
)
2298 ldlm_lock_decref(lockh
, LCK_PW
);
2300 /* For async requests, decref the lock. */
2301 ldlm_lock_decref(lockh
, einfo
->ei_mode
);
2302 LDLM_LOCK_PUT(matched
);
2306 ldlm_lock_decref(lockh
, mode
);
2307 LDLM_LOCK_PUT(matched
);
2314 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
),
2315 &RQF_LDLM_ENQUEUE_LVB
);
2319 rc
= ldlm_prep_enqueue_req(exp
, req
, &cancels
, 0);
2321 ptlrpc_request_free(req
);
2325 req_capsule_set_size(&req
->rq_pill
, &RMF_DLM_LVB
, RCL_SERVER
,
2327 ptlrpc_request_set_replen(req
);
2330 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2331 *flags
&= ~LDLM_FL_BLOCK_GRANTED
;
2333 rc
= ldlm_cli_enqueue(exp
, &req
, einfo
, res_id
, policy
, flags
, lvb
,
2334 sizeof(*lvb
), LVB_T_OST
, lockh
, async
);
2337 struct osc_enqueue_args
*aa
;
2339 CLASSERT (sizeof(*aa
) <= sizeof(req
->rq_async_args
));
2340 aa
= ptlrpc_req_async_args(req
);
2343 aa
->oa_flags
= flags
;
2344 aa
->oa_upcall
= upcall
;
2345 aa
->oa_cookie
= cookie
;
2347 aa
->oa_lockh
= lockh
;
2350 req
->rq_interpret_reply
=
2351 (ptlrpc_interpterer_t
)osc_enqueue_interpret
;
2352 if (rqset
== PTLRPCD_SET
)
2353 ptlrpcd_add_req(req
);
2355 ptlrpc_set_add_req(rqset
, req
);
2356 } else if (intent
) {
2357 ptlrpc_req_finished(req
);
2362 rc
= osc_enqueue_fini(req
, lvb
, upcall
, cookie
, flags
, agl
, rc
);
2364 ptlrpc_req_finished(req
);
/*
 * Look for an already cached lock covering @policy on @res_id.
 *
 * The extent is widened to page boundaries and ldlm_lock_match() is
 * called; its result is kept in rc.  For a match, if the data cannot be
 * attached with osc_set_data_with_check() the reference is dropped again
 * (unless LDLM_FL_TEST_LOCK was given).  If the matched mode differs
 * from the requested @mode, the reference is converted from LCK_PW to
 * LCK_PR (again unless LDLM_FL_TEST_LOCK was given).
 *
 * NOTE(review): the trailing parameter(s), the initial value of rc
 * passed as the mode argument to ldlm_lock_match(), and the return
 * statements are not part of this excerpt.
 */
2369 int osc_match_base(struct obd_export
*exp
, struct ldlm_res_id
*res_id
,
2370 __u32 type
, ldlm_policy_data_t
*policy
, __u32 mode
,
2371 __u64
*flags
, void *data
, struct lustre_handle
*lockh
,
2374 struct obd_device
*obd
= exp
->exp_obd
;
2375 __u64 lflags
= *flags
;
2378 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH
))
2381 /* Filesystem lock extents are extended to page boundaries so that
2382 * dealing with the page cache is a little smoother
2384 policy
->l_extent
.start
-= policy
->l_extent
.start
& ~CFS_PAGE_MASK
;
2385 policy
->l_extent
.end
|= ~CFS_PAGE_MASK
;
2387 /* Next, search for already existing extent locks that will cover us */
2388 /* If we're trying to read, we also search for an existing PW lock. The
2389 * VFS and page cache already protect us locally, so lots of readers/
2390 * writers can share a single PW lock.
2395 rc
= ldlm_lock_match(obd
->obd_namespace
, lflags
,
2396 res_id
, type
, policy
, rc
, lockh
, unref
);
2399 if (!osc_set_data_with_check(lockh
, data
)) {
2400 if (!(lflags
& LDLM_FL_TEST_LOCK
))
2401 ldlm_lock_decref(lockh
, rc
);
2405 if (!(lflags
& LDLM_FL_TEST_LOCK
) && mode
!= rc
) {
2406 ldlm_lock_addref(lockh
, LCK_PR
);
2407 ldlm_lock_decref(lockh
, LCK_PW
);
2414 int osc_cancel_base(struct lustre_handle
*lockh
, __u32 mode
)
2416 if (unlikely(mode
== LCK_GROUP
))
2417 ldlm_lock_decref_and_cancel(lockh
, mode
);
2419 ldlm_lock_decref(lockh
, mode
);
2424 static int osc_statfs_interpret(const struct lu_env
*env
,
2425 struct ptlrpc_request
*req
,
2426 struct osc_async_args
*aa
, int rc
)
2428 struct obd_statfs
*msfs
;
2431 /* The request has in fact never been sent
2432 * due to issues at a higher level (LOV).
2433 * Exit immediately since the caller is
2434 * aware of the problem and takes care
2439 if ((rc
== -ENOTCONN
|| rc
== -EAGAIN
) &&
2440 (aa
->aa_oi
->oi_flags
& OBD_STATFS_NODELAY
)) {
2448 msfs
= req_capsule_server_get(&req
->rq_pill
, &RMF_OBD_STATFS
);
2454 *aa
->aa_oi
->oi_osfs
= *msfs
;
2456 rc
= aa
->aa_oi
->oi_cb_up(aa
->aa_oi
, rc
);
2460 static int osc_statfs_async(struct obd_export
*exp
,
2461 struct obd_info
*oinfo
, __u64 max_age
,
2462 struct ptlrpc_request_set
*rqset
)
2464 struct obd_device
*obd
= class_exp2obd(exp
);
2465 struct ptlrpc_request
*req
;
2466 struct osc_async_args
*aa
;
2469 /* We could possibly pass max_age in the request (as an absolute
2470 * timestamp or a "seconds.usec ago") so the target can avoid doing
2471 * extra calls into the filesystem if that isn't necessary (e.g.
2472 * during mount that would help a bit). Having relative timestamps
2473 * is not so great if request processing is slow, while absolute
2474 * timestamps are not ideal because they need time synchronization.
2476 req
= ptlrpc_request_alloc(obd
->u
.cli
.cl_import
, &RQF_OST_STATFS
);
2480 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_STATFS
);
2482 ptlrpc_request_free(req
);
2485 ptlrpc_request_set_replen(req
);
2486 req
->rq_request_portal
= OST_CREATE_PORTAL
;
2487 ptlrpc_at_set_req_timeout(req
);
2489 if (oinfo
->oi_flags
& OBD_STATFS_NODELAY
) {
2490 /* procfs requests not want stat in wait for avoid deadlock */
2491 req
->rq_no_resend
= 1;
2492 req
->rq_no_delay
= 1;
2495 req
->rq_interpret_reply
= (ptlrpc_interpterer_t
)osc_statfs_interpret
;
2496 CLASSERT (sizeof(*aa
) <= sizeof(req
->rq_async_args
));
2497 aa
= ptlrpc_req_async_args(req
);
2500 ptlrpc_set_add_req(rqset
, req
);
2504 static int osc_statfs(const struct lu_env
*env
, struct obd_export
*exp
,
2505 struct obd_statfs
*osfs
, __u64 max_age
, __u32 flags
)
2507 struct obd_device
*obd
= class_exp2obd(exp
);
2508 struct obd_statfs
*msfs
;
2509 struct ptlrpc_request
*req
;
2510 struct obd_import
*imp
= NULL
;
2513 /* Since the request might also come from lprocfs, so we need
2514 * sync this with client_disconnect_export Bug15684
2516 down_read(&obd
->u
.cli
.cl_sem
);
2517 if (obd
->u
.cli
.cl_import
)
2518 imp
= class_import_get(obd
->u
.cli
.cl_import
);
2519 up_read(&obd
->u
.cli
.cl_sem
);
2523 /* We could possibly pass max_age in the request (as an absolute
2524 * timestamp or a "seconds.usec ago") so the target can avoid doing
2525 * extra calls into the filesystem if that isn't necessary (e.g.
2526 * during mount that would help a bit). Having relative timestamps
2527 * is not so great if request processing is slow, while absolute
2528 * timestamps are not ideal because they need time synchronization.
2530 req
= ptlrpc_request_alloc(imp
, &RQF_OST_STATFS
);
2532 class_import_put(imp
);
2537 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_STATFS
);
2539 ptlrpc_request_free(req
);
2542 ptlrpc_request_set_replen(req
);
2543 req
->rq_request_portal
= OST_CREATE_PORTAL
;
2544 ptlrpc_at_set_req_timeout(req
);
2546 if (flags
& OBD_STATFS_NODELAY
) {
2547 /* procfs requests not want stat in wait for avoid deadlock */
2548 req
->rq_no_resend
= 1;
2549 req
->rq_no_delay
= 1;
2552 rc
= ptlrpc_queue_wait(req
);
2556 msfs
= req_capsule_server_get(&req
->rq_pill
, &RMF_OBD_STATFS
);
2565 ptlrpc_req_finished(req
);
2569 /* Retrieve object striping information.
2571 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
2572 * the maximum number of OST indices which will fit in the user buffer.
2573 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2575 static int osc_getstripe(struct lov_stripe_md
*lsm
,
2576 struct lov_user_md __user
*lump
)
2578 /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2579 struct lov_user_md_v3 lum
, *lumk
;
2580 struct lov_user_ost_data_v1
*lmm_objects
;
2581 int rc
= 0, lum_size
;
2586 /* we only need the header part from user space to get lmm_magic and
2587 * lmm_stripe_count, (the header part is common to v1 and v3)
2589 lum_size
= sizeof(struct lov_user_md_v1
);
2590 if (copy_from_user(&lum
, lump
, lum_size
))
2593 if ((lum
.lmm_magic
!= LOV_USER_MAGIC_V1
) &&
2594 (lum
.lmm_magic
!= LOV_USER_MAGIC_V3
))
2597 /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2598 LASSERT(sizeof(struct lov_user_md_v1
) == sizeof(struct lov_mds_md_v1
));
2599 LASSERT(sizeof(struct lov_user_md_v3
) == sizeof(struct lov_mds_md_v3
));
2600 LASSERT(sizeof(lum
.lmm_objects
[0]) == sizeof(lumk
->lmm_objects
[0]));
2602 /* we can use lov_mds_md_size() to compute lum_size
2603 * because lov_user_md_vX and lov_mds_md_vX have the same size
2605 if (lum
.lmm_stripe_count
> 0) {
2606 lum_size
= lov_mds_md_size(lum
.lmm_stripe_count
, lum
.lmm_magic
);
2607 lumk
= kzalloc(lum_size
, GFP_NOFS
);
2611 if (lum
.lmm_magic
== LOV_USER_MAGIC_V1
)
2613 &(((struct lov_user_md_v1
*)lumk
)->lmm_objects
[0]);
2615 lmm_objects
= &(lumk
->lmm_objects
[0]);
2616 lmm_objects
->l_ost_oi
= lsm
->lsm_oi
;
2618 lum_size
= lov_mds_md_size(0, lum
.lmm_magic
);
2622 lumk
->lmm_oi
= lsm
->lsm_oi
;
2623 lumk
->lmm_stripe_count
= 1;
2625 if (copy_to_user(lump
, lumk
, lum_size
))
2634 static int osc_iocontrol(unsigned int cmd
, struct obd_export
*exp
, int len
,
2635 void *karg
, void __user
*uarg
)
2637 struct obd_device
*obd
= exp
->exp_obd
;
2638 struct obd_ioctl_data
*data
= karg
;
2641 if (!try_module_get(THIS_MODULE
)) {
2642 CERROR("%s: cannot get module '%s'\n", obd
->obd_name
,
2643 module_name(THIS_MODULE
));
2647 case OBD_IOC_LOV_GET_CONFIG
: {
2649 struct lov_desc
*desc
;
2650 struct obd_uuid uuid
;
2654 if (obd_ioctl_getdata(&buf
, &len
, uarg
)) {
2659 data
= (struct obd_ioctl_data
*)buf
;
2661 if (sizeof(*desc
) > data
->ioc_inllen1
) {
2662 obd_ioctl_freedata(buf
, len
);
2667 if (data
->ioc_inllen2
< sizeof(uuid
)) {
2668 obd_ioctl_freedata(buf
, len
);
2673 desc
= (struct lov_desc
*)data
->ioc_inlbuf1
;
2674 desc
->ld_tgt_count
= 1;
2675 desc
->ld_active_tgt_count
= 1;
2676 desc
->ld_default_stripe_count
= 1;
2677 desc
->ld_default_stripe_size
= 0;
2678 desc
->ld_default_stripe_offset
= 0;
2679 desc
->ld_pattern
= 0;
2680 memcpy(&desc
->ld_uuid
, &obd
->obd_uuid
, sizeof(uuid
));
2682 memcpy(data
->ioc_inlbuf2
, &obd
->obd_uuid
, sizeof(uuid
));
2684 err
= copy_to_user(uarg
, buf
, len
);
2687 obd_ioctl_freedata(buf
, len
);
2690 case LL_IOC_LOV_SETSTRIPE
:
2691 err
= obd_alloc_memmd(exp
, karg
);
2695 case LL_IOC_LOV_GETSTRIPE
:
2696 err
= osc_getstripe(karg
, uarg
);
2698 case OBD_IOC_CLIENT_RECOVER
:
2699 err
= ptlrpc_recover_import(obd
->u
.cli
.cl_import
,
2700 data
->ioc_inlbuf1
, 0);
2704 case IOC_OSC_SET_ACTIVE
:
2705 err
= ptlrpc_set_import_active(obd
->u
.cli
.cl_import
,
2708 case OBD_IOC_POLL_QUOTACHECK
:
2709 err
= osc_quota_poll_check(exp
, karg
);
2711 case OBD_IOC_PING_TARGET
:
2712 err
= ptlrpc_obd_ping(obd
);
2715 CDEBUG(D_INODE
, "unrecognised ioctl %#x by %s\n",
2716 cmd
, current_comm());
2721 module_put(THIS_MODULE
);
2725 static int osc_get_info(const struct lu_env
*env
, struct obd_export
*exp
,
2726 u32 keylen
, void *key
, __u32
*vallen
, void *val
,
2727 struct lov_stripe_md
*lsm
)
2729 if (!vallen
|| !val
)
2732 if (KEY_IS(KEY_LOCK_TO_STRIPE
)) {
2733 __u32
*stripe
= val
;
2734 *vallen
= sizeof(*stripe
);
2737 } else if (KEY_IS(KEY_LAST_ID
)) {
2738 struct ptlrpc_request
*req
;
2743 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
),
2744 &RQF_OST_GET_INFO_LAST_ID
);
2748 req_capsule_set_size(&req
->rq_pill
, &RMF_SETINFO_KEY
,
2749 RCL_CLIENT
, keylen
);
2750 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_GET_INFO
);
2752 ptlrpc_request_free(req
);
2756 tmp
= req_capsule_client_get(&req
->rq_pill
, &RMF_SETINFO_KEY
);
2757 memcpy(tmp
, key
, keylen
);
2759 req
->rq_no_delay
= req
->rq_no_resend
= 1;
2760 ptlrpc_request_set_replen(req
);
2761 rc
= ptlrpc_queue_wait(req
);
2765 reply
= req_capsule_server_get(&req
->rq_pill
, &RMF_OBD_ID
);
2771 *((u64
*)val
) = *reply
;
2773 ptlrpc_req_finished(req
);
2775 } else if (KEY_IS(KEY_FIEMAP
)) {
2776 struct ll_fiemap_info_key
*fm_key
= key
;
2777 struct ldlm_res_id res_id
;
2778 ldlm_policy_data_t policy
;
2779 struct lustre_handle lockh
;
2780 enum ldlm_mode mode
= 0;
2781 struct ptlrpc_request
*req
;
2782 struct ll_user_fiemap
*reply
;
2786 if (!(fm_key
->fiemap
.fm_flags
& FIEMAP_FLAG_SYNC
))
2789 policy
.l_extent
.start
= fm_key
->fiemap
.fm_start
&
2792 if (OBD_OBJECT_EOF
- fm_key
->fiemap
.fm_length
<=
2793 fm_key
->fiemap
.fm_start
+ PAGE_CACHE_SIZE
- 1)
2794 policy
.l_extent
.end
= OBD_OBJECT_EOF
;
2796 policy
.l_extent
.end
= (fm_key
->fiemap
.fm_start
+
2797 fm_key
->fiemap
.fm_length
+
2798 PAGE_CACHE_SIZE
- 1) & CFS_PAGE_MASK
;
2800 ostid_build_res_name(&fm_key
->oa
.o_oi
, &res_id
);
2801 mode
= ldlm_lock_match(exp
->exp_obd
->obd_namespace
,
2802 LDLM_FL_BLOCK_GRANTED
|
2804 &res_id
, LDLM_EXTENT
, &policy
,
2805 LCK_PR
| LCK_PW
, &lockh
, 0);
2806 if (mode
) { /* lock is cached on client */
2807 if (mode
!= LCK_PR
) {
2808 ldlm_lock_addref(&lockh
, LCK_PR
);
2809 ldlm_lock_decref(&lockh
, LCK_PW
);
2811 } else { /* no cached lock, needs acquire lock on server side */
2812 fm_key
->oa
.o_valid
|= OBD_MD_FLFLAGS
;
2813 fm_key
->oa
.o_flags
|= OBD_FL_SRVLOCK
;
2817 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
),
2818 &RQF_OST_GET_INFO_FIEMAP
);
2824 req_capsule_set_size(&req
->rq_pill
, &RMF_FIEMAP_KEY
,
2825 RCL_CLIENT
, keylen
);
2826 req_capsule_set_size(&req
->rq_pill
, &RMF_FIEMAP_VAL
,
2827 RCL_CLIENT
, *vallen
);
2828 req_capsule_set_size(&req
->rq_pill
, &RMF_FIEMAP_VAL
,
2829 RCL_SERVER
, *vallen
);
2831 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_GET_INFO
);
2833 ptlrpc_request_free(req
);
2837 tmp
= req_capsule_client_get(&req
->rq_pill
, &RMF_FIEMAP_KEY
);
2838 memcpy(tmp
, key
, keylen
);
2839 tmp
= req_capsule_client_get(&req
->rq_pill
, &RMF_FIEMAP_VAL
);
2840 memcpy(tmp
, val
, *vallen
);
2842 ptlrpc_request_set_replen(req
);
2843 rc
= ptlrpc_queue_wait(req
);
2847 reply
= req_capsule_server_get(&req
->rq_pill
, &RMF_FIEMAP_VAL
);
2853 memcpy(val
, reply
, *vallen
);
2855 ptlrpc_req_finished(req
);
2858 ldlm_lock_decref(&lockh
, LCK_PR
);
2865 static int osc_set_info_async(const struct lu_env
*env
, struct obd_export
*exp
,
2866 u32 keylen
, void *key
, u32 vallen
,
2867 void *val
, struct ptlrpc_request_set
*set
)
2869 struct ptlrpc_request
*req
;
2870 struct obd_device
*obd
= exp
->exp_obd
;
2871 struct obd_import
*imp
= class_exp2cliimp(exp
);
2875 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN
, 10);
2877 if (KEY_IS(KEY_CHECKSUM
)) {
2878 if (vallen
!= sizeof(int))
2880 exp
->exp_obd
->u
.cli
.cl_checksum
= (*(int *)val
) ? 1 : 0;
2884 if (KEY_IS(KEY_SPTLRPC_CONF
)) {
2885 sptlrpc_conf_client_adapt(obd
);
2889 if (KEY_IS(KEY_FLUSH_CTX
)) {
2890 sptlrpc_import_flush_my_ctx(imp
);
2894 if (KEY_IS(KEY_CACHE_SET
)) {
2895 struct client_obd
*cli
= &obd
->u
.cli
;
2897 LASSERT(!cli
->cl_cache
); /* only once */
2898 cli
->cl_cache
= val
;
2899 atomic_inc(&cli
->cl_cache
->ccc_users
);
2900 cli
->cl_lru_left
= &cli
->cl_cache
->ccc_lru_left
;
2902 /* add this osc into entity list */
2903 LASSERT(list_empty(&cli
->cl_lru_osc
));
2904 spin_lock(&cli
->cl_cache
->ccc_lru_lock
);
2905 list_add(&cli
->cl_lru_osc
, &cli
->cl_cache
->ccc_lru
);
2906 spin_unlock(&cli
->cl_cache
->ccc_lru_lock
);
2911 if (KEY_IS(KEY_CACHE_LRU_SHRINK
)) {
2912 struct client_obd
*cli
= &obd
->u
.cli
;
2913 int nr
= atomic_read(&cli
->cl_lru_in_list
) >> 1;
2914 int target
= *(int *)val
;
2916 nr
= osc_lru_shrink(cli
, min(nr
, target
));
2921 if (!set
&& !KEY_IS(KEY_GRANT_SHRINK
))
2924 /* We pass all other commands directly to OST. Since nobody calls osc
2925 * methods directly and everybody is supposed to go through LOV, we
2926 * assume lov checked invalid values for us.
2927 * The only recognised values so far are evict_by_nid and mds_conn.
2928 * Even if something bad goes through, we'd get a -EINVAL from OST
2932 req
= ptlrpc_request_alloc(imp
, KEY_IS(KEY_GRANT_SHRINK
) ?
2933 &RQF_OST_SET_GRANT_INFO
:
2938 req_capsule_set_size(&req
->rq_pill
, &RMF_SETINFO_KEY
,
2939 RCL_CLIENT
, keylen
);
2940 if (!KEY_IS(KEY_GRANT_SHRINK
))
2941 req_capsule_set_size(&req
->rq_pill
, &RMF_SETINFO_VAL
,
2942 RCL_CLIENT
, vallen
);
2943 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_SET_INFO
);
2945 ptlrpc_request_free(req
);
2949 tmp
= req_capsule_client_get(&req
->rq_pill
, &RMF_SETINFO_KEY
);
2950 memcpy(tmp
, key
, keylen
);
2951 tmp
= req_capsule_client_get(&req
->rq_pill
, KEY_IS(KEY_GRANT_SHRINK
) ?
2954 memcpy(tmp
, val
, vallen
);
2956 if (KEY_IS(KEY_GRANT_SHRINK
)) {
2957 struct osc_brw_async_args
*aa
;
2960 CLASSERT(sizeof(*aa
) <= sizeof(req
->rq_async_args
));
2961 aa
= ptlrpc_req_async_args(req
);
2962 oa
= kmem_cache_zalloc(obdo_cachep
, GFP_NOFS
);
2964 ptlrpc_req_finished(req
);
2967 *oa
= ((struct ost_body
*)val
)->oa
;
2969 req
->rq_interpret_reply
= osc_shrink_grant_interpret
;
2972 ptlrpc_request_set_replen(req
);
2973 if (!KEY_IS(KEY_GRANT_SHRINK
)) {
2975 ptlrpc_set_add_req(set
, req
);
2976 ptlrpc_check_set(NULL
, set
);
2978 ptlrpcd_add_req(req
);
2984 static int osc_reconnect(const struct lu_env
*env
,
2985 struct obd_export
*exp
, struct obd_device
*obd
,
2986 struct obd_uuid
*cluuid
,
2987 struct obd_connect_data
*data
,
2990 struct client_obd
*cli
= &obd
->u
.cli
;
2992 if (data
&& (data
->ocd_connect_flags
& OBD_CONNECT_GRANT
)) {
2995 client_obd_list_lock(&cli
->cl_loi_list_lock
);
2996 data
->ocd_grant
= (cli
->cl_avail_grant
+ cli
->cl_dirty
) ?:
2997 2 * cli_brw_size(obd
);
2998 lost_grant
= cli
->cl_lost_grant
;
2999 cli
->cl_lost_grant
= 0;
3000 client_obd_list_unlock(&cli
->cl_loi_list_lock
);
3002 CDEBUG(D_RPCTRACE
, "ocd_connect_flags: %#llx ocd_version: %d ocd_grant: %d, lost: %ld.\n",
3003 data
->ocd_connect_flags
,
3004 data
->ocd_version
, data
->ocd_grant
, lost_grant
);
3010 static int osc_disconnect(struct obd_export
*exp
)
3012 struct obd_device
*obd
= class_exp2obd(exp
);
3015 rc
= client_disconnect_export(exp
);
3017 * Initially we put del_shrink_grant before disconnect_export, but it
3018 * causes the following problem if setup (connect) and cleanup
3019 * (disconnect) are tangled together.
3020 * connect p1 disconnect p2
3021 * ptlrpc_connect_import
3022 * ............... class_manual_cleanup
3025 * ptlrpc_connect_interrupt
3027 * add this client to shrink list
3029 * Bang! pinger trigger the shrink.
3030 * So the osc should be disconnected from the shrink list, after we
3031 * are sure the import has been destroyed. BUG18662
3033 if (!obd
->u
.cli
.cl_import
)
3034 osc_del_shrink_grant(&obd
->u
.cli
);
3038 static int osc_import_event(struct obd_device
*obd
,
3039 struct obd_import
*imp
,
3040 enum obd_import_event event
)
3042 struct client_obd
*cli
;
3045 LASSERT(imp
->imp_obd
== obd
);
3048 case IMP_EVENT_DISCON
: {
3050 client_obd_list_lock(&cli
->cl_loi_list_lock
);
3051 cli
->cl_avail_grant
= 0;
3052 cli
->cl_lost_grant
= 0;
3053 client_obd_list_unlock(&cli
->cl_loi_list_lock
);
3056 case IMP_EVENT_INACTIVE
: {
3057 rc
= obd_notify_observer(obd
, obd
, OBD_NOTIFY_INACTIVE
, NULL
);
3060 case IMP_EVENT_INVALIDATE
: {
3061 struct ldlm_namespace
*ns
= obd
->obd_namespace
;
3065 env
= cl_env_get(&refcheck
);
3069 /* all pages go to failing rpcs due to the invalid
3072 osc_io_unplug(env
, cli
, NULL
);
3074 ldlm_namespace_cleanup(ns
, LDLM_FL_LOCAL_ONLY
);
3075 cl_env_put(env
, &refcheck
);
3080 case IMP_EVENT_ACTIVE
: {
3081 rc
= obd_notify_observer(obd
, obd
, OBD_NOTIFY_ACTIVE
, NULL
);
3084 case IMP_EVENT_OCD
: {
3085 struct obd_connect_data
*ocd
= &imp
->imp_connect_data
;
3087 if (ocd
->ocd_connect_flags
& OBD_CONNECT_GRANT
)
3088 osc_init_grant(&obd
->u
.cli
, ocd
);
3091 if (ocd
->ocd_connect_flags
& OBD_CONNECT_REQPORTAL
)
3092 imp
->imp_client
->cli_request_portal
= OST_REQUEST_PORTAL
;
3094 rc
= obd_notify_observer(obd
, obd
, OBD_NOTIFY_OCD
, NULL
);
3097 case IMP_EVENT_DEACTIVATE
: {
3098 rc
= obd_notify_observer(obd
, obd
, OBD_NOTIFY_DEACTIVATE
, NULL
);
3101 case IMP_EVENT_ACTIVATE
: {
3102 rc
= obd_notify_observer(obd
, obd
, OBD_NOTIFY_ACTIVATE
, NULL
);
3106 CERROR("Unknown import event %d\n", event
);
3113 * Determine whether the lock can be canceled before replaying the lock
3114 * during recovery, see bug16774 for detailed information.
3116 * \retval zero the lock can't be canceled
3117 * \retval other ok to cancel
3119 static int osc_cancel_for_recovery(struct ldlm_lock
*lock
)
3121 check_res_locked(lock
->l_resource
);
3124 * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
3126 * XXX as a future improvement, we can also cancel unused write lock
3127 * if it doesn't have dirty data and active mmaps.
3129 if (lock
->l_resource
->lr_type
== LDLM_EXTENT
&&
3130 (lock
->l_granted_mode
== LCK_PR
||
3131 lock
->l_granted_mode
== LCK_CR
) &&
3132 (osc_dlm_lock_pageref(lock
) == 0))
3138 static int brw_queue_work(const struct lu_env
*env
, void *data
)
3140 struct client_obd
*cli
= data
;
3142 CDEBUG(D_CACHE
, "Run writeback work for client obd %p.\n", cli
);
3144 osc_io_unplug(env
, cli
, NULL
);
3148 int osc_setup(struct obd_device
*obd
, struct lustre_cfg
*lcfg
)
3150 struct lprocfs_static_vars lvars
= { NULL
};
3151 struct client_obd
*cli
= &obd
->u
.cli
;
3158 rc
= ptlrpcd_addref();
3162 rc
= client_obd_setup(obd
, lcfg
);
3166 handler
= ptlrpcd_alloc_work(cli
->cl_import
, brw_queue_work
, cli
);
3167 if (IS_ERR(handler
)) {
3168 rc
= PTR_ERR(handler
);
3169 goto out_client_setup
;
3171 cli
->cl_writeback_work
= handler
;
3173 rc
= osc_quota_setup(obd
);
3175 goto out_ptlrpcd_work
;
3177 cli
->cl_grant_shrink_interval
= GRANT_SHRINK_INTERVAL
;
3178 lprocfs_osc_init_vars(&lvars
);
3179 if (lprocfs_obd_setup(obd
, lvars
.obd_vars
, lvars
.sysfs_vars
) == 0) {
3180 lproc_osc_attach_seqstat(obd
);
3181 sptlrpc_lprocfs_cliobd_attach(obd
);
3182 ptlrpc_lprocfs_register_obd(obd
);
3186 * We try to control the total number of requests with a upper limit
3187 * osc_reqpool_maxreqcount. There might be some race which will cause
3188 * over-limit allocation, but it is fine.
3190 req_count
= atomic_read(&osc_pool_req_count
);
3191 if (req_count
< osc_reqpool_maxreqcount
) {
3192 adding
= cli
->cl_max_rpcs_in_flight
+ 2;
3193 if (req_count
+ adding
> osc_reqpool_maxreqcount
)
3194 adding
= osc_reqpool_maxreqcount
- req_count
;
3196 added
= ptlrpc_add_rqs_to_pool(osc_rq_pool
, adding
);
3197 atomic_add(added
, &osc_pool_req_count
);
3200 INIT_LIST_HEAD(&cli
->cl_grant_shrink_list
);
3201 ns_register_cancel(obd
->obd_namespace
, osc_cancel_for_recovery
);
3205 ptlrpcd_destroy_work(handler
);
3207 client_obd_cleanup(obd
);
3213 static int osc_precleanup(struct obd_device
*obd
, enum obd_cleanup_stage stage
)
3216 case OBD_CLEANUP_EARLY
: {
3217 struct obd_import
*imp
;
3219 imp
= obd
->u
.cli
.cl_import
;
3220 CDEBUG(D_HA
, "Deactivating import %s\n", obd
->obd_name
);
3221 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3222 ptlrpc_deactivate_import(imp
);
3223 spin_lock(&imp
->imp_lock
);
3224 imp
->imp_pingable
= 0;
3225 spin_unlock(&imp
->imp_lock
);
3228 case OBD_CLEANUP_EXPORTS
: {
3229 struct client_obd
*cli
= &obd
->u
.cli
;
3231 * for echo client, export may be on zombie list, wait for
3232 * zombie thread to cull it, because cli.cl_import will be
3233 * cleared in client_disconnect_export():
3234 * class_export_destroy() -> obd_cleanup() ->
3235 * echo_device_free() -> echo_client_cleanup() ->
3236 * obd_disconnect() -> osc_disconnect() ->
3237 * client_disconnect_export()
3239 obd_zombie_barrier();
3240 if (cli
->cl_writeback_work
) {
3241 ptlrpcd_destroy_work(cli
->cl_writeback_work
);
3242 cli
->cl_writeback_work
= NULL
;
3244 obd_cleanup_client_import(obd
);
3245 ptlrpc_lprocfs_unregister_obd(obd
);
3246 lprocfs_obd_cleanup(obd
);
3253 static int osc_cleanup(struct obd_device
*obd
)
3255 struct client_obd
*cli
= &obd
->u
.cli
;
3259 if (cli
->cl_cache
) {
3260 LASSERT(atomic_read(&cli
->cl_cache
->ccc_users
) > 0);
3261 spin_lock(&cli
->cl_cache
->ccc_lru_lock
);
3262 list_del_init(&cli
->cl_lru_osc
);
3263 spin_unlock(&cli
->cl_cache
->ccc_lru_lock
);
3264 cli
->cl_lru_left
= NULL
;
3265 atomic_dec(&cli
->cl_cache
->ccc_users
);
3266 cli
->cl_cache
= NULL
;
3269 /* free memory of osc quota cache */
3270 osc_quota_cleanup(obd
);
3272 rc
= client_obd_cleanup(obd
);
3278 int osc_process_config_base(struct obd_device
*obd
, struct lustre_cfg
*lcfg
)
3280 struct lprocfs_static_vars lvars
= { NULL
};
3283 lprocfs_osc_init_vars(&lvars
);
3285 switch (lcfg
->lcfg_command
) {
3287 rc
= class_process_proc_param(PARAM_OSC
, lvars
.obd_vars
,
3297 static int osc_process_config(struct obd_device
*obd
, u32 len
, void *buf
)
3299 return osc_process_config_base(obd
, buf
);
3302 static struct obd_ops osc_obd_ops
= {
3303 .owner
= THIS_MODULE
,
3305 .precleanup
= osc_precleanup
,
3306 .cleanup
= osc_cleanup
,
3307 .add_conn
= client_import_add_conn
,
3308 .del_conn
= client_import_del_conn
,
3309 .connect
= client_connect_import
,
3310 .reconnect
= osc_reconnect
,
3311 .disconnect
= osc_disconnect
,
3312 .statfs
= osc_statfs
,
3313 .statfs_async
= osc_statfs_async
,
3314 .packmd
= osc_packmd
,
3315 .unpackmd
= osc_unpackmd
,
3316 .create
= osc_create
,
3317 .destroy
= osc_destroy
,
3318 .getattr
= osc_getattr
,
3319 .getattr_async
= osc_getattr_async
,
3320 .setattr
= osc_setattr
,
3321 .setattr_async
= osc_setattr_async
,
3322 .find_cbdata
= osc_find_cbdata
,
3323 .iocontrol
= osc_iocontrol
,
3324 .get_info
= osc_get_info
,
3325 .set_info_async
= osc_set_info_async
,
3326 .import_event
= osc_import_event
,
3327 .process_config
= osc_process_config
,
3328 .quotactl
= osc_quotactl
,
3329 .quotacheck
= osc_quotacheck
,
3332 extern struct lu_kmem_descr osc_caches
[];
3333 extern spinlock_t osc_ast_guard
;
3334 extern struct lock_class_key osc_ast_guard_class
;
3336 static int __init
osc_init(void)
3338 struct lprocfs_static_vars lvars
= { NULL
};
3339 unsigned int reqpool_size
;
3340 unsigned int reqsize
;
3343 /* print an address of _any_ initialized kernel symbol from this
3344 * module, to allow debugging with gdb that doesn't support data
3345 * symbols from modules.
3347 CDEBUG(D_INFO
, "Lustre OSC module (%p).\n", &osc_caches
);
3349 rc
= lu_kmem_init(osc_caches
);
3353 lprocfs_osc_init_vars(&lvars
);
3355 rc
= class_register_type(&osc_obd_ops
, NULL
,
3356 LUSTRE_OSC_NAME
, &osc_device_type
);
3360 spin_lock_init(&osc_ast_guard
);
3361 lockdep_set_class(&osc_ast_guard
, &osc_ast_guard_class
);
3363 /* This is obviously too much memory, only prevent overflow here */
3364 if (osc_reqpool_mem_max
>= 1 << 12 || osc_reqpool_mem_max
== 0) {
3369 reqpool_size
= osc_reqpool_mem_max
<< 20;
3372 while (reqsize
< OST_MAXREQSIZE
)
3373 reqsize
= reqsize
<< 1;
3376 * We don't enlarge the request count in OSC pool according to
3377 * cl_max_rpcs_in_flight. The allocation from the pool will only be
3378 * tried after normal allocation failed. So a small OSC pool won't
3379 * cause much performance degression in most of cases.
3381 osc_reqpool_maxreqcount
= reqpool_size
/ reqsize
;
3383 atomic_set(&osc_pool_req_count
, 0);
3384 osc_rq_pool
= ptlrpc_init_rq_pool(0, OST_MAXREQSIZE
,
3385 ptlrpc_add_rqs_to_pool
);
3393 class_unregister_type(LUSTRE_OSC_NAME
);
3395 lu_kmem_fini(osc_caches
);
3399 static void /*__exit*/ osc_exit(void)
3401 class_unregister_type(LUSTRE_OSC_NAME
);
3402 lu_kmem_fini(osc_caches
);
3403 ptlrpc_free_rq_pool(osc_rq_pool
);
3406 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3407 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3408 MODULE_LICENSE("GPL");
3409 MODULE_VERSION(LUSTRE_VERSION_STRING
);
3411 module_init(osc_init
);
3412 module_exit(osc_exit
);