4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2015, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_OSC
35 #include "../../include/linux/libcfs/libcfs.h"
37 #include "../include/lustre_dlm.h"
38 #include "../include/lustre_net.h"
39 #include "../include/lustre/lustre_user.h"
40 #include "../include/obd_cksum.h"
42 #include "../include/lustre_ha.h"
43 #include "../include/lprocfs_status.h"
44 #include "../include/lustre_debug.h"
45 #include "../include/lustre_param.h"
46 #include "../include/lustre_fid.h"
47 #include "../include/obd_class.h"
48 #include "../include/obd.h"
49 #include "osc_internal.h"
50 #include "osc_cl_internal.h"
52 atomic_t osc_pool_req_count
;
53 unsigned int osc_reqpool_maxreqcount
;
54 struct ptlrpc_request_pool
*osc_rq_pool
;
56 /* max memory used for request pool, unit is MB */
57 static unsigned int osc_reqpool_mem_max
= 5;
58 module_param(osc_reqpool_mem_max
, uint
, 0444);
60 struct osc_brw_async_args
{
66 struct brw_page
**aa_ppga
;
67 struct client_obd
*aa_cli
;
68 struct list_head aa_oaps
;
69 struct list_head aa_exts
;
70 struct cl_req
*aa_clerq
;
73 struct osc_async_args
{
74 struct obd_info
*aa_oi
;
77 struct osc_setattr_args
{
79 obd_enqueue_update_f sa_upcall
;
83 struct osc_fsync_args
{
84 struct obd_info
*fa_oi
;
85 obd_enqueue_update_f fa_upcall
;
89 struct osc_enqueue_args
{
90 struct obd_export
*oa_exp
;
91 enum ldlm_type oa_type
;
92 enum ldlm_mode oa_mode
;
94 osc_enqueue_upcall_f oa_upcall
;
96 struct ost_lvb
*oa_lvb
;
97 struct lustre_handle oa_lockh
;
98 unsigned int oa_agl
:1;
101 static void osc_release_ppga(struct brw_page
**ppga
, u32 count
);
102 static int brw_interpret(const struct lu_env
*env
,
103 struct ptlrpc_request
*req
, void *data
, int rc
);
105 /* Pack OSC object metadata for disk storage (LE byte order). */
106 static int osc_packmd(struct obd_export
*exp
, struct lov_mds_md
**lmmp
,
107 struct lov_stripe_md
*lsm
)
111 lmm_size
= sizeof(**lmmp
);
119 } else if (unlikely(lsm
&& ostid_id(&lsm
->lsm_oi
) == 0)) {
124 *lmmp
= kzalloc(lmm_size
, GFP_NOFS
);
130 ostid_cpu_to_le(&lsm
->lsm_oi
, &(*lmmp
)->lmm_oi
);
135 /* Unpack OSC object metadata from disk storage (LE byte order). */
136 static int osc_unpackmd(struct obd_export
*exp
, struct lov_stripe_md
**lsmp
,
137 struct lov_mds_md
*lmm
, int lmm_bytes
)
140 struct obd_import
*imp
= class_exp2cliimp(exp
);
143 if (lmm_bytes
< sizeof(*lmm
)) {
144 CERROR("%s: lov_mds_md too small: %d, need %d\n",
145 exp
->exp_obd
->obd_name
, lmm_bytes
,
149 /* XXX LOV_MAGIC etc check? */
151 if (unlikely(ostid_id(&lmm
->lmm_oi
) == 0)) {
152 CERROR("%s: zero lmm_object_id: rc = %d\n",
153 exp
->exp_obd
->obd_name
, -EINVAL
);
158 lsm_size
= lov_stripe_md_size(1);
163 kfree((*lsmp
)->lsm_oinfo
[0]);
170 *lsmp
= kzalloc(lsm_size
, GFP_NOFS
);
171 if (unlikely(!*lsmp
))
173 (*lsmp
)->lsm_oinfo
[0] = kzalloc(sizeof(struct lov_oinfo
),
175 if (unlikely(!(*lsmp
)->lsm_oinfo
[0])) {
179 loi_init((*lsmp
)->lsm_oinfo
[0]);
180 } else if (unlikely(ostid_id(&(*lsmp
)->lsm_oi
) == 0)) {
185 /* XXX zero *lsmp? */
186 ostid_le_to_cpu(&lmm
->lmm_oi
, &(*lsmp
)->lsm_oi
);
189 (imp
->imp_connect_data
.ocd_connect_flags
& OBD_CONNECT_MAXBYTES
))
190 (*lsmp
)->lsm_maxbytes
= imp
->imp_connect_data
.ocd_maxbytes
;
192 (*lsmp
)->lsm_maxbytes
= LUSTRE_STRIPE_MAXBYTES
;
197 static inline void osc_pack_req_body(struct ptlrpc_request
*req
,
198 struct obd_info
*oinfo
)
200 struct ost_body
*body
;
202 body
= req_capsule_client_get(&req
->rq_pill
, &RMF_OST_BODY
);
205 lustre_set_wire_obdo(&req
->rq_import
->imp_connect_data
, &body
->oa
,
209 static int osc_getattr_interpret(const struct lu_env
*env
,
210 struct ptlrpc_request
*req
,
211 struct osc_async_args
*aa
, int rc
)
213 struct ost_body
*body
;
218 body
= req_capsule_server_get(&req
->rq_pill
, &RMF_OST_BODY
);
220 CDEBUG(D_INODE
, "mode: %o\n", body
->oa
.o_mode
);
221 lustre_get_wire_obdo(&req
->rq_import
->imp_connect_data
,
222 aa
->aa_oi
->oi_oa
, &body
->oa
);
224 /* This should really be sent by the OST */
225 aa
->aa_oi
->oi_oa
->o_blksize
= DT_MAX_BRW_SIZE
;
226 aa
->aa_oi
->oi_oa
->o_valid
|= OBD_MD_FLBLKSZ
;
228 CDEBUG(D_INFO
, "can't unpack ost_body\n");
230 aa
->aa_oi
->oi_oa
->o_valid
= 0;
233 rc
= aa
->aa_oi
->oi_cb_up(aa
->aa_oi
, rc
);
237 static int osc_getattr_async(struct obd_export
*exp
, struct obd_info
*oinfo
,
238 struct ptlrpc_request_set
*set
)
240 struct ptlrpc_request
*req
;
241 struct osc_async_args
*aa
;
244 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
), &RQF_OST_GETATTR
);
248 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_GETATTR
);
250 ptlrpc_request_free(req
);
254 osc_pack_req_body(req
, oinfo
);
256 ptlrpc_request_set_replen(req
);
257 req
->rq_interpret_reply
= (ptlrpc_interpterer_t
)osc_getattr_interpret
;
259 CLASSERT(sizeof(*aa
) <= sizeof(req
->rq_async_args
));
260 aa
= ptlrpc_req_async_args(req
);
263 ptlrpc_set_add_req(set
, req
);
267 static int osc_getattr(const struct lu_env
*env
, struct obd_export
*exp
,
268 struct obd_info
*oinfo
)
270 struct ptlrpc_request
*req
;
271 struct ost_body
*body
;
274 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
), &RQF_OST_GETATTR
);
278 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_GETATTR
);
280 ptlrpc_request_free(req
);
284 osc_pack_req_body(req
, oinfo
);
286 ptlrpc_request_set_replen(req
);
288 rc
= ptlrpc_queue_wait(req
);
292 body
= req_capsule_server_get(&req
->rq_pill
, &RMF_OST_BODY
);
298 CDEBUG(D_INODE
, "mode: %o\n", body
->oa
.o_mode
);
299 lustre_get_wire_obdo(&req
->rq_import
->imp_connect_data
, oinfo
->oi_oa
,
302 oinfo
->oi_oa
->o_blksize
= cli_brw_size(exp
->exp_obd
);
303 oinfo
->oi_oa
->o_valid
|= OBD_MD_FLBLKSZ
;
306 ptlrpc_req_finished(req
);
310 static int osc_setattr(const struct lu_env
*env
, struct obd_export
*exp
,
311 struct obd_info
*oinfo
, struct obd_trans_info
*oti
)
313 struct ptlrpc_request
*req
;
314 struct ost_body
*body
;
317 LASSERT(oinfo
->oi_oa
->o_valid
& OBD_MD_FLGROUP
);
319 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
), &RQF_OST_SETATTR
);
323 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_SETATTR
);
325 ptlrpc_request_free(req
);
329 osc_pack_req_body(req
, oinfo
);
331 ptlrpc_request_set_replen(req
);
333 rc
= ptlrpc_queue_wait(req
);
337 body
= req_capsule_server_get(&req
->rq_pill
, &RMF_OST_BODY
);
343 lustre_get_wire_obdo(&req
->rq_import
->imp_connect_data
, oinfo
->oi_oa
,
347 ptlrpc_req_finished(req
);
351 static int osc_setattr_interpret(const struct lu_env
*env
,
352 struct ptlrpc_request
*req
,
353 struct osc_setattr_args
*sa
, int rc
)
355 struct ost_body
*body
;
360 body
= req_capsule_server_get(&req
->rq_pill
, &RMF_OST_BODY
);
366 lustre_get_wire_obdo(&req
->rq_import
->imp_connect_data
, sa
->sa_oa
,
369 rc
= sa
->sa_upcall(sa
->sa_cookie
, rc
);
373 int osc_setattr_async_base(struct obd_export
*exp
, struct obd_info
*oinfo
,
374 struct obd_trans_info
*oti
,
375 obd_enqueue_update_f upcall
, void *cookie
,
376 struct ptlrpc_request_set
*rqset
)
378 struct ptlrpc_request
*req
;
379 struct osc_setattr_args
*sa
;
382 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
), &RQF_OST_SETATTR
);
386 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_SETATTR
);
388 ptlrpc_request_free(req
);
392 if (oti
&& oinfo
->oi_oa
->o_valid
& OBD_MD_FLCOOKIE
)
393 oinfo
->oi_oa
->o_lcookie
= *oti
->oti_logcookies
;
395 osc_pack_req_body(req
, oinfo
);
397 ptlrpc_request_set_replen(req
);
399 /* do mds to ost setattr asynchronously */
401 /* Do not wait for response. */
402 ptlrpcd_add_req(req
);
404 req
->rq_interpret_reply
=
405 (ptlrpc_interpterer_t
)osc_setattr_interpret
;
407 CLASSERT(sizeof(*sa
) <= sizeof(req
->rq_async_args
));
408 sa
= ptlrpc_req_async_args(req
);
409 sa
->sa_oa
= oinfo
->oi_oa
;
410 sa
->sa_upcall
= upcall
;
411 sa
->sa_cookie
= cookie
;
413 if (rqset
== PTLRPCD_SET
)
414 ptlrpcd_add_req(req
);
416 ptlrpc_set_add_req(rqset
, req
);
422 static int osc_setattr_async(struct obd_export
*exp
, struct obd_info
*oinfo
,
423 struct obd_trans_info
*oti
,
424 struct ptlrpc_request_set
*rqset
)
426 return osc_setattr_async_base(exp
, oinfo
, oti
,
427 oinfo
->oi_cb_up
, oinfo
, rqset
);
430 static int osc_real_create(struct obd_export
*exp
, struct obdo
*oa
,
431 struct lov_stripe_md
**ea
,
432 struct obd_trans_info
*oti
)
434 struct ptlrpc_request
*req
;
435 struct ost_body
*body
;
436 struct lov_stripe_md
*lsm
;
444 rc
= obd_alloc_memmd(exp
, &lsm
);
449 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
), &RQF_OST_CREATE
);
455 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_CREATE
);
457 ptlrpc_request_free(req
);
461 body
= req_capsule_client_get(&req
->rq_pill
, &RMF_OST_BODY
);
464 lustre_set_wire_obdo(&req
->rq_import
->imp_connect_data
, &body
->oa
, oa
);
466 ptlrpc_request_set_replen(req
);
468 if ((oa
->o_valid
& OBD_MD_FLFLAGS
) &&
469 oa
->o_flags
== OBD_FL_DELORPHAN
) {
471 "delorphan from OST integration");
472 /* Don't resend the delorphan req */
473 req
->rq_no_resend
= 1;
474 req
->rq_no_delay
= 1;
477 rc
= ptlrpc_queue_wait(req
);
481 body
= req_capsule_server_get(&req
->rq_pill
, &RMF_OST_BODY
);
487 CDEBUG(D_INFO
, "oa flags %x\n", oa
->o_flags
);
488 lustre_get_wire_obdo(&req
->rq_import
->imp_connect_data
, oa
, &body
->oa
);
490 oa
->o_blksize
= cli_brw_size(exp
->exp_obd
);
491 oa
->o_valid
|= OBD_MD_FLBLKSZ
;
493 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
494 * have valid lsm_oinfo data structs, so don't go touching that.
495 * This needs to be fixed in a big way.
497 lsm
->lsm_oi
= oa
->o_oi
;
501 oti
->oti_transno
= lustre_msg_get_transno(req
->rq_repmsg
);
503 if (oa
->o_valid
& OBD_MD_FLCOOKIE
) {
504 if (!oti
->oti_logcookies
)
505 oti_alloc_cookies(oti
, 1);
506 *oti
->oti_logcookies
= oa
->o_lcookie
;
510 CDEBUG(D_HA
, "transno: %lld\n",
511 lustre_msg_get_transno(req
->rq_repmsg
));
513 ptlrpc_req_finished(req
);
516 obd_free_memmd(exp
, &lsm
);
520 int osc_punch_base(struct obd_export
*exp
, struct obd_info
*oinfo
,
521 obd_enqueue_update_f upcall
, void *cookie
,
522 struct ptlrpc_request_set
*rqset
)
524 struct ptlrpc_request
*req
;
525 struct osc_setattr_args
*sa
;
526 struct ost_body
*body
;
529 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
), &RQF_OST_PUNCH
);
533 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_PUNCH
);
535 ptlrpc_request_free(req
);
538 req
->rq_request_portal
= OST_IO_PORTAL
; /* bug 7198 */
539 ptlrpc_at_set_req_timeout(req
);
541 body
= req_capsule_client_get(&req
->rq_pill
, &RMF_OST_BODY
);
543 lustre_set_wire_obdo(&req
->rq_import
->imp_connect_data
, &body
->oa
,
546 ptlrpc_request_set_replen(req
);
548 req
->rq_interpret_reply
= (ptlrpc_interpterer_t
)osc_setattr_interpret
;
549 CLASSERT(sizeof(*sa
) <= sizeof(req
->rq_async_args
));
550 sa
= ptlrpc_req_async_args(req
);
551 sa
->sa_oa
= oinfo
->oi_oa
;
552 sa
->sa_upcall
= upcall
;
553 sa
->sa_cookie
= cookie
;
554 if (rqset
== PTLRPCD_SET
)
555 ptlrpcd_add_req(req
);
557 ptlrpc_set_add_req(rqset
, req
);
562 static int osc_sync_interpret(const struct lu_env
*env
,
563 struct ptlrpc_request
*req
,
566 struct osc_fsync_args
*fa
= arg
;
567 struct ost_body
*body
;
572 body
= req_capsule_server_get(&req
->rq_pill
, &RMF_OST_BODY
);
574 CERROR("can't unpack ost_body\n");
579 *fa
->fa_oi
->oi_oa
= body
->oa
;
581 rc
= fa
->fa_upcall(fa
->fa_cookie
, rc
);
585 int osc_sync_base(struct obd_export
*exp
, struct obd_info
*oinfo
,
586 obd_enqueue_update_f upcall
, void *cookie
,
587 struct ptlrpc_request_set
*rqset
)
589 struct ptlrpc_request
*req
;
590 struct ost_body
*body
;
591 struct osc_fsync_args
*fa
;
594 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
), &RQF_OST_SYNC
);
598 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_SYNC
);
600 ptlrpc_request_free(req
);
604 /* overload the size and blocks fields in the oa with start/end */
605 body
= req_capsule_client_get(&req
->rq_pill
, &RMF_OST_BODY
);
607 lustre_set_wire_obdo(&req
->rq_import
->imp_connect_data
, &body
->oa
,
610 ptlrpc_request_set_replen(req
);
611 req
->rq_interpret_reply
= osc_sync_interpret
;
613 CLASSERT(sizeof(*fa
) <= sizeof(req
->rq_async_args
));
614 fa
= ptlrpc_req_async_args(req
);
616 fa
->fa_upcall
= upcall
;
617 fa
->fa_cookie
= cookie
;
619 if (rqset
== PTLRPCD_SET
)
620 ptlrpcd_add_req(req
);
622 ptlrpc_set_add_req(rqset
, req
);
627 /* Find and locally cancel locks matched by @mode in the resource named by
628 * the object id of @oa. Found locks are added to the @cancels list. Returns
629 * the number of locks added to the @cancels list.
631 static int osc_resource_get_unused(struct obd_export
*exp
, struct obdo
*oa
,
632 struct list_head
*cancels
,
633 enum ldlm_mode mode
, __u64 lock_flags
)
635 struct ldlm_namespace
*ns
= exp
->exp_obd
->obd_namespace
;
636 struct ldlm_res_id res_id
;
637 struct ldlm_resource
*res
;
640 /* Return, i.e. cancel nothing, only if ELC is supported (flag in
641 * export) but disabled through procfs (flag in NS).
643 * This distinguishes from a case when ELC is not supported originally,
644 * when we still want to cancel locks in advance and just cancel them
645 * locally, without sending any RPC.
647 if (exp_connect_cancelset(exp
) && !ns_connect_cancelset(ns
))
650 ostid_build_res_name(&oa
->o_oi
, &res_id
);
651 res
= ldlm_resource_get(ns
, NULL
, &res_id
, 0, 0);
655 LDLM_RESOURCE_ADDREF(res
);
656 count
= ldlm_cancel_resource_local(res
, cancels
, NULL
, mode
,
657 lock_flags
, 0, NULL
);
658 LDLM_RESOURCE_DELREF(res
);
659 ldlm_resource_putref(res
);
663 static int osc_destroy_interpret(const struct lu_env
*env
,
664 struct ptlrpc_request
*req
, void *data
,
667 struct client_obd
*cli
= &req
->rq_import
->imp_obd
->u
.cli
;
669 atomic_dec(&cli
->cl_destroy_in_flight
);
670 wake_up(&cli
->cl_destroy_waitq
);
674 static int osc_can_send_destroy(struct client_obd
*cli
)
676 if (atomic_inc_return(&cli
->cl_destroy_in_flight
) <=
677 cli
->cl_max_rpcs_in_flight
) {
678 /* The destroy request can be sent */
681 if (atomic_dec_return(&cli
->cl_destroy_in_flight
) <
682 cli
->cl_max_rpcs_in_flight
) {
684 * The counter has been modified between the two atomic
687 wake_up(&cli
->cl_destroy_waitq
);
692 static int osc_create(const struct lu_env
*env
, struct obd_export
*exp
,
693 struct obdo
*oa
, struct lov_stripe_md
**ea
,
694 struct obd_trans_info
*oti
)
700 LASSERT(oa
->o_valid
& OBD_MD_FLGROUP
);
702 if ((oa
->o_valid
& OBD_MD_FLFLAGS
) &&
703 oa
->o_flags
== OBD_FL_RECREATE_OBJS
) {
704 return osc_real_create(exp
, oa
, ea
, oti
);
707 if (!fid_seq_is_mdt(ostid_seq(&oa
->o_oi
)))
708 return osc_real_create(exp
, oa
, ea
, oti
);
710 /* we should not get here anymore */
716 /* Destroy requests can be async always on the client, and we don't even really
717 * care about the return code since the client cannot do anything at all about
719 * When the MDS is unlinking a filename, it saves the file objects into a
720 * recovery llog, and these object records are cancelled when the OST reports
721 * they were destroyed and sync'd to disk (i.e. transaction committed).
722 * If the client dies, or the OST is down when the object should be destroyed,
723 * the records are not cancelled, and when the OST reconnects to the MDS next,
724 * it will retrieve the llog unlink logs and then sends the log cancellation
725 * cookies to the MDS after committing destroy transactions.
727 static int osc_destroy(const struct lu_env
*env
, struct obd_export
*exp
,
728 struct obdo
*oa
, struct lov_stripe_md
*ea
,
729 struct obd_trans_info
*oti
, struct obd_export
*md_export
)
731 struct client_obd
*cli
= &exp
->exp_obd
->u
.cli
;
732 struct ptlrpc_request
*req
;
733 struct ost_body
*body
;
738 CDEBUG(D_INFO
, "oa NULL\n");
742 count
= osc_resource_get_unused(exp
, oa
, &cancels
, LCK_PW
,
743 LDLM_FL_DISCARD_DATA
);
745 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
), &RQF_OST_DESTROY
);
747 ldlm_lock_list_put(&cancels
, l_bl_ast
, count
);
751 rc
= ldlm_prep_elc_req(exp
, req
, LUSTRE_OST_VERSION
, OST_DESTROY
,
754 ptlrpc_request_free(req
);
758 req
->rq_request_portal
= OST_IO_PORTAL
; /* bug 7198 */
759 ptlrpc_at_set_req_timeout(req
);
761 if (oti
&& oa
->o_valid
& OBD_MD_FLCOOKIE
)
762 oa
->o_lcookie
= *oti
->oti_logcookies
;
763 body
= req_capsule_client_get(&req
->rq_pill
, &RMF_OST_BODY
);
765 lustre_set_wire_obdo(&req
->rq_import
->imp_connect_data
, &body
->oa
, oa
);
767 ptlrpc_request_set_replen(req
);
769 /* If osc_destroy is destroying an unlink orphan (sent from MDT to
770 * OST), it should not be blocked here, because the process might
771 * be triggered by ptlrpcd, and it is not good to block the ptlrpcd
772 * thread (b=16006)
774 if (!(oa
->o_flags
& OBD_FL_DELORPHAN
)) {
775 req
->rq_interpret_reply
= osc_destroy_interpret
;
776 if (!osc_can_send_destroy(cli
)) {
777 struct l_wait_info lwi
= LWI_INTR(LWI_ON_SIGNAL_NOOP
,
781 * Wait until the number of on-going destroy RPCs drops
782 * under max_rpc_in_flight
784 l_wait_event_exclusive(cli
->cl_destroy_waitq
,
785 osc_can_send_destroy(cli
), &lwi
);
789 /* Do not wait for response */
790 ptlrpcd_add_req(req
);
794 static void osc_announce_cached(struct client_obd
*cli
, struct obdo
*oa
,
797 u32 bits
= OBD_MD_FLBLOCKS
|OBD_MD_FLGRANT
;
799 LASSERT(!(oa
->o_valid
& bits
));
802 spin_lock(&cli
->cl_loi_list_lock
);
803 oa
->o_dirty
= cli
->cl_dirty
;
804 if (unlikely(cli
->cl_dirty
- cli
->cl_dirty_transit
>
805 cli
->cl_dirty_max
)) {
806 CERROR("dirty %lu - %lu > dirty_max %lu\n",
807 cli
->cl_dirty
, cli
->cl_dirty_transit
, cli
->cl_dirty_max
);
809 } else if (unlikely(atomic_read(&obd_unstable_pages
) +
810 atomic_read(&obd_dirty_pages
) -
811 atomic_read(&obd_dirty_transit_pages
) >
812 (long)(obd_max_dirty_pages
+ 1))) {
813 /* The atomic_read() and atomic_inc() calls on these counters are
814 * not covered by a lock, so they may race harmlessly and trip this
815 * CERROR() unless we add in a small fudge factor (+1).
817 CERROR("%s: dirty %d + %d - %d > system dirty_max %d\n",
818 cli
->cl_import
->imp_obd
->obd_name
,
819 atomic_read(&obd_unstable_pages
),
820 atomic_read(&obd_dirty_pages
),
821 atomic_read(&obd_dirty_transit_pages
),
822 obd_max_dirty_pages
);
824 } else if (unlikely(cli
->cl_dirty_max
- cli
->cl_dirty
> 0x7fffffff)) {
825 CERROR("dirty %lu - dirty_max %lu too big???\n",
826 cli
->cl_dirty
, cli
->cl_dirty_max
);
829 long max_in_flight
= (cli
->cl_max_pages_per_rpc
<<
831 (cli
->cl_max_rpcs_in_flight
+ 1);
832 oa
->o_undirty
= max(cli
->cl_dirty_max
, max_in_flight
);
834 oa
->o_grant
= cli
->cl_avail_grant
+ cli
->cl_reserved_grant
;
835 oa
->o_dropped
= cli
->cl_lost_grant
;
836 cli
->cl_lost_grant
= 0;
837 spin_unlock(&cli
->cl_loi_list_lock
);
838 CDEBUG(D_CACHE
, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
839 oa
->o_dirty
, oa
->o_undirty
, oa
->o_dropped
, oa
->o_grant
);
842 void osc_update_next_shrink(struct client_obd
*cli
)
844 cli
->cl_next_shrink_grant
=
845 cfs_time_shift(cli
->cl_grant_shrink_interval
);
846 CDEBUG(D_CACHE
, "next time %ld to shrink grant\n",
847 cli
->cl_next_shrink_grant
);
850 static void __osc_update_grant(struct client_obd
*cli
, u64 grant
)
852 spin_lock(&cli
->cl_loi_list_lock
);
853 cli
->cl_avail_grant
+= grant
;
854 spin_unlock(&cli
->cl_loi_list_lock
);
857 static void osc_update_grant(struct client_obd
*cli
, struct ost_body
*body
)
859 if (body
->oa
.o_valid
& OBD_MD_FLGRANT
) {
860 CDEBUG(D_CACHE
, "got %llu extra grant\n", body
->oa
.o_grant
);
861 __osc_update_grant(cli
, body
->oa
.o_grant
);
865 static int osc_set_info_async(const struct lu_env
*env
, struct obd_export
*exp
,
866 u32 keylen
, void *key
, u32 vallen
,
867 void *val
, struct ptlrpc_request_set
*set
);
869 static int osc_shrink_grant_interpret(const struct lu_env
*env
,
870 struct ptlrpc_request
*req
,
873 struct client_obd
*cli
= &req
->rq_import
->imp_obd
->u
.cli
;
874 struct obdo
*oa
= ((struct osc_brw_async_args
*)aa
)->aa_oa
;
875 struct ost_body
*body
;
878 __osc_update_grant(cli
, oa
->o_grant
);
882 body
= req_capsule_server_get(&req
->rq_pill
, &RMF_OST_BODY
);
884 osc_update_grant(cli
, body
);
886 kmem_cache_free(obdo_cachep
, oa
);
890 static void osc_shrink_grant_local(struct client_obd
*cli
, struct obdo
*oa
)
892 spin_lock(&cli
->cl_loi_list_lock
);
893 oa
->o_grant
= cli
->cl_avail_grant
/ 4;
894 cli
->cl_avail_grant
-= oa
->o_grant
;
895 spin_unlock(&cli
->cl_loi_list_lock
);
896 if (!(oa
->o_valid
& OBD_MD_FLFLAGS
)) {
897 oa
->o_valid
|= OBD_MD_FLFLAGS
;
900 oa
->o_flags
|= OBD_FL_SHRINK_GRANT
;
901 osc_update_next_shrink(cli
);
904 /* Shrink the current grant, either from some large amount to enough for a
905 * full set of in-flight RPCs, or if we have already shrunk to that limit
906 * then to enough for a single RPC. This avoids keeping more grant than
907 * needed, and avoids shrinking the grant piecemeal.
909 static int osc_shrink_grant(struct client_obd
*cli
)
911 __u64 target_bytes
= (cli
->cl_max_rpcs_in_flight
+ 1) *
912 (cli
->cl_max_pages_per_rpc
<< PAGE_SHIFT
);
914 spin_lock(&cli
->cl_loi_list_lock
);
915 if (cli
->cl_avail_grant
<= target_bytes
)
916 target_bytes
= cli
->cl_max_pages_per_rpc
<< PAGE_SHIFT
;
917 spin_unlock(&cli
->cl_loi_list_lock
);
919 return osc_shrink_grant_to_target(cli
, target_bytes
);
922 int osc_shrink_grant_to_target(struct client_obd
*cli
, __u64 target_bytes
)
925 struct ost_body
*body
;
927 spin_lock(&cli
->cl_loi_list_lock
);
928 /* Don't shrink if we are already at or below the desired limit.
929 * We don't want to shrink below a single RPC, as that will negatively
930 * impact block allocation and long-term performance.
932 if (target_bytes
< cli
->cl_max_pages_per_rpc
<< PAGE_SHIFT
)
933 target_bytes
= cli
->cl_max_pages_per_rpc
<< PAGE_SHIFT
;
935 if (target_bytes
>= cli
->cl_avail_grant
) {
936 spin_unlock(&cli
->cl_loi_list_lock
);
939 spin_unlock(&cli
->cl_loi_list_lock
);
941 body
= kzalloc(sizeof(*body
), GFP_NOFS
);
945 osc_announce_cached(cli
, &body
->oa
, 0);
947 spin_lock(&cli
->cl_loi_list_lock
);
948 body
->oa
.o_grant
= cli
->cl_avail_grant
- target_bytes
;
949 cli
->cl_avail_grant
= target_bytes
;
950 spin_unlock(&cli
->cl_loi_list_lock
);
951 if (!(body
->oa
.o_valid
& OBD_MD_FLFLAGS
)) {
952 body
->oa
.o_valid
|= OBD_MD_FLFLAGS
;
953 body
->oa
.o_flags
= 0;
955 body
->oa
.o_flags
|= OBD_FL_SHRINK_GRANT
;
956 osc_update_next_shrink(cli
);
958 rc
= osc_set_info_async(NULL
, cli
->cl_import
->imp_obd
->obd_self_export
,
959 sizeof(KEY_GRANT_SHRINK
), KEY_GRANT_SHRINK
,
960 sizeof(*body
), body
, NULL
);
962 __osc_update_grant(cli
, body
->oa
.o_grant
);
967 static int osc_should_shrink_grant(struct client_obd
*client
)
969 unsigned long time
= cfs_time_current();
970 unsigned long next_shrink
= client
->cl_next_shrink_grant
;
972 if ((client
->cl_import
->imp_connect_data
.ocd_connect_flags
&
973 OBD_CONNECT_GRANT_SHRINK
) == 0)
976 if (cfs_time_aftereq(time
, next_shrink
- 5 * CFS_TICK
)) {
977 /* Get the current RPC size directly, instead of going via:
978 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
979 * Keep comment here so that it can be found by searching.
981 int brw_size
= client
->cl_max_pages_per_rpc
<< PAGE_SHIFT
;
983 if (client
->cl_import
->imp_state
== LUSTRE_IMP_FULL
&&
984 client
->cl_avail_grant
> brw_size
)
987 osc_update_next_shrink(client
);
992 static int osc_grant_shrink_grant_cb(struct timeout_item
*item
, void *data
)
994 struct client_obd
*client
;
996 list_for_each_entry(client
, &item
->ti_obd_list
, cl_grant_shrink_list
) {
997 if (osc_should_shrink_grant(client
))
998 osc_shrink_grant(client
);
1003 static int osc_add_shrink_grant(struct client_obd
*client
)
1007 rc
= ptlrpc_add_timeout_client(client
->cl_grant_shrink_interval
,
1009 osc_grant_shrink_grant_cb
, NULL
,
1010 &client
->cl_grant_shrink_list
);
1012 CERROR("add grant client %s error %d\n",
1013 client
->cl_import
->imp_obd
->obd_name
, rc
);
1016 CDEBUG(D_CACHE
, "add grant client %s\n",
1017 client
->cl_import
->imp_obd
->obd_name
);
1018 osc_update_next_shrink(client
);
1022 static int osc_del_shrink_grant(struct client_obd
*client
)
1024 return ptlrpc_del_timeout_client(&client
->cl_grant_shrink_list
,
1028 static void osc_init_grant(struct client_obd
*cli
, struct obd_connect_data
*ocd
)
1031 * ocd_grant is the total grant amount we expect to hold: if we've
1032 * been evicted, it's the new avail_grant amount, cl_dirty will drop
1033 * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1035 * race is tolerable here: if we're evicted, but imp_state already
1036 * left EVICTED state, then cl_dirty must be 0 already.
1038 spin_lock(&cli
->cl_loi_list_lock
);
1039 if (cli
->cl_import
->imp_state
== LUSTRE_IMP_EVICTED
)
1040 cli
->cl_avail_grant
= ocd
->ocd_grant
;
1042 cli
->cl_avail_grant
= ocd
->ocd_grant
- cli
->cl_dirty
;
1044 if (cli
->cl_avail_grant
< 0) {
1045 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
1046 cli
->cl_import
->imp_obd
->obd_name
, cli
->cl_avail_grant
,
1047 ocd
->ocd_grant
, cli
->cl_dirty
);
1048 /* workaround for servers which do not have the patch from
1051 cli
->cl_avail_grant
= ocd
->ocd_grant
;
1054 /* determine the appropriate chunk size used by osc_extent. */
1055 cli
->cl_chunkbits
= max_t(int, PAGE_SHIFT
, ocd
->ocd_blocksize
);
1056 spin_unlock(&cli
->cl_loi_list_lock
);
1058 CDEBUG(D_CACHE
, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld chunk bits: %d\n",
1059 cli
->cl_import
->imp_obd
->obd_name
,
1060 cli
->cl_avail_grant
, cli
->cl_lost_grant
, cli
->cl_chunkbits
);
1062 if (ocd
->ocd_connect_flags
& OBD_CONNECT_GRANT_SHRINK
&&
1063 list_empty(&cli
->cl_grant_shrink_list
))
1064 osc_add_shrink_grant(cli
);
1067 /* We assume that the reason this OSC got a short read is because it read
1068 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1069 * via the LOV, and it _knows_ it's reading inside the file, it's just that
1070 * this stripe never got written at or beyond this stripe offset yet.
1072 static void handle_short_read(int nob_read
, u32 page_count
,
1073 struct brw_page
**pga
)
1078 /* skip bytes read OK */
1079 while (nob_read
> 0) {
1080 LASSERT(page_count
> 0);
1082 if (pga
[i
]->count
> nob_read
) {
1083 /* EOF inside this page */
1084 ptr
= kmap(pga
[i
]->pg
) +
1085 (pga
[i
]->off
& ~PAGE_MASK
);
1086 memset(ptr
+ nob_read
, 0, pga
[i
]->count
- nob_read
);
1093 nob_read
-= pga
[i
]->count
;
1098 /* zero remaining pages */
1099 while (page_count
-- > 0) {
1100 ptr
= kmap(pga
[i
]->pg
) + (pga
[i
]->off
& ~PAGE_MASK
);
1101 memset(ptr
, 0, pga
[i
]->count
);
1107 static int check_write_rcs(struct ptlrpc_request
*req
,
1108 int requested_nob
, int niocount
,
1109 u32 page_count
, struct brw_page
**pga
)
1114 remote_rcs
= req_capsule_server_sized_get(&req
->rq_pill
, &RMF_RCS
,
1115 sizeof(*remote_rcs
) *
1118 CDEBUG(D_INFO
, "Missing/short RC vector on BRW_WRITE reply\n");
1122 /* return error if any niobuf was in error */
1123 for (i
= 0; i
< niocount
; i
++) {
1124 if ((int)remote_rcs
[i
] < 0)
1125 return remote_rcs
[i
];
1127 if (remote_rcs
[i
] != 0) {
1128 CDEBUG(D_INFO
, "rc[%d] invalid (%d) req %p\n",
1129 i
, remote_rcs
[i
], req
);
1134 if (req
->rq_bulk
->bd_nob_transferred
!= requested_nob
) {
1135 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1136 req
->rq_bulk
->bd_nob_transferred
, requested_nob
);
1143 static inline int can_merge_pages(struct brw_page
*p1
, struct brw_page
*p2
)
1145 if (p1
->flag
!= p2
->flag
) {
1146 unsigned mask
= ~(OBD_BRW_FROM_GRANT
| OBD_BRW_NOCACHE
|
1147 OBD_BRW_SYNC
| OBD_BRW_ASYNC
|
1148 OBD_BRW_NOQUOTA
| OBD_BRW_SOFT_SYNC
);
1150 /* warn if we try to combine flags that we don't know to be
1153 if (unlikely((p1
->flag
& mask
) != (p2
->flag
& mask
))) {
1154 CWARN("Saw flags 0x%x and 0x%x in the same brw, please report this at http://bugs.whamcloud.com/\n",
1155 p1
->flag
, p2
->flag
);
1160 return (p1
->off
+ p1
->count
== p2
->off
);
1163 static u32
osc_checksum_bulk(int nob
, u32 pg_count
,
1164 struct brw_page
**pga
, int opc
,
1165 enum cksum_type cksum_type
)
1169 struct cfs_crypto_hash_desc
*hdesc
;
1170 unsigned int bufsize
;
1172 unsigned char cfs_alg
= cksum_obd2cfs(cksum_type
);
1174 LASSERT(pg_count
> 0);
1176 hdesc
= cfs_crypto_hash_init(cfs_alg
, NULL
, 0);
1177 if (IS_ERR(hdesc
)) {
1178 CERROR("Unable to initialize checksum hash %s\n",
1179 cfs_crypto_hash_name(cfs_alg
));
1180 return PTR_ERR(hdesc
);
1183 while (nob
> 0 && pg_count
> 0) {
1184 int count
= pga
[i
]->count
> nob
? nob
: pga
[i
]->count
;
1186 /* corrupt the data before we compute the checksum, to
1187 * simulate an OST->client data error
1189 if (i
== 0 && opc
== OST_READ
&&
1190 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE
)) {
1191 unsigned char *ptr
= kmap(pga
[i
]->pg
);
1192 int off
= pga
[i
]->off
& ~PAGE_MASK
;
1194 memcpy(ptr
+ off
, "bad1", min(4, nob
));
1197 cfs_crypto_hash_update_page(hdesc
, pga
[i
]->pg
,
1198 pga
[i
]->off
& ~PAGE_MASK
,
1201 "page %p map %p index %lu flags %lx count %u priv %0lx: off %d\n",
1202 pga
[i
]->pg
, pga
[i
]->pg
->mapping
, pga
[i
]->pg
->index
,
1203 (long)pga
[i
]->pg
->flags
, page_count(pga
[i
]->pg
),
1204 page_private(pga
[i
]->pg
),
1205 (int)(pga
[i
]->off
& ~PAGE_MASK
));
1207 nob
-= pga
[i
]->count
;
1212 bufsize
= sizeof(cksum
);
1213 err
= cfs_crypto_hash_final(hdesc
, (unsigned char *)&cksum
, &bufsize
);
1215 /* For sending we only compute the wrong checksum instead
1216 * of corrupting the data so it is still correct on a redo
1218 if (opc
== OST_WRITE
&& OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND
))
/*
 * osc_brw_prep_request() - build a bulk read/write (BRW) request for the
 * pages in @pga.
 *
 * Visible steps:
 *  - allocate the request: from the request pool (RQF_OST_BRW_WRITE) when
 *    @cmd has OBD_BRW_WRITE set, with ptlrpc_request_alloc()
 *    (RQF_OST_BRW_READ) otherwise;
 *  - count the remote niobufs needed by walking @pga with
 *    can_merge_pages(), then size and pack the request capsule;
 *  - attach a bulk descriptor, pin every page into it and fill in the
 *    ost_body, obd_ioobj and niobuf_remote buffers;
 *  - for writes with checksums enabled (and no bulk security flavor),
 *    compute the bulk checksum and record it in both body->oa and oa;
 *  - store requested_nob, niocount and page_count in the async args.
 *
 * Returns -ENOMEM or -EINVAL when the OBD_FAIL_OSC_BRW_PREP_REQ or
 * OBD_FAIL_OSC_BRW_PREP_REQ2 fault injection point fires.
 *
 * NOTE(review): this listing omits some source lines (braces, else
 * branches, further returns and part of the parameter list), so the
 * remaining return paths and the hand-off through @reqp cannot be
 * confirmed from here.
 */
1224 static int osc_brw_prep_request(int cmd
, struct client_obd
*cli
,
1226 struct lov_stripe_md
*lsm
, u32 page_count
,
1227 struct brw_page
**pga
,
1228 struct ptlrpc_request
**reqp
,
1232 struct ptlrpc_request
*req
;
1233 struct ptlrpc_bulk_desc
*desc
;
1234 struct ost_body
*body
;
1235 struct obd_ioobj
*ioobj
;
1236 struct niobuf_remote
*niobuf
;
1237 int niocount
, i
, requested_nob
, opc
, rc
;
1238 struct osc_brw_async_args
*aa
;
1239 struct req_capsule
*pill
;
1240 struct brw_page
*pg_prev
;
1242 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ
))
1243 return -ENOMEM
; /* Recoverable */
1244 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2
))
1245 return -EINVAL
; /* Fatal */
1247 if ((cmd
& OBD_BRW_WRITE
) != 0) {
1249 req
= ptlrpc_request_alloc_pool(cli
->cl_import
,
1251 &RQF_OST_BRW_WRITE
);
1254 req
= ptlrpc_request_alloc(cli
->cl_import
, &RQF_OST_BRW_READ
);
1259 for (niocount
= i
= 1; i
< page_count
; i
++) {
1260 if (!can_merge_pages(pga
[i
- 1], pga
[i
]))
1264 pill
= &req
->rq_pill
;
1265 req_capsule_set_size(pill
, &RMF_OBD_IOOBJ
, RCL_CLIENT
,
1267 req_capsule_set_size(pill
, &RMF_NIOBUF_REMOTE
, RCL_CLIENT
,
1268 niocount
* sizeof(*niobuf
));
1270 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, opc
);
1272 ptlrpc_request_free(req
);
1275 req
->rq_request_portal
= OST_IO_PORTAL
; /* bug 7198 */
1276 ptlrpc_at_set_req_timeout(req
);
1277 /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1280 req
->rq_no_retry_einprogress
= 1;
1282 desc
= ptlrpc_prep_bulk_imp(req
, page_count
,
1283 cli
->cl_import
->imp_connect_data
.ocd_brw_size
>> LNET_MTU_BITS
,
1284 opc
== OST_WRITE
? BULK_GET_SOURCE
: BULK_PUT_SINK
,
1291 /* NB request now owns desc and will free it when it gets freed */
1293 body
= req_capsule_client_get(pill
, &RMF_OST_BODY
);
1294 ioobj
= req_capsule_client_get(pill
, &RMF_OBD_IOOBJ
);
1295 niobuf
= req_capsule_client_get(pill
, &RMF_NIOBUF_REMOTE
);
1296 LASSERT(body
&& ioobj
&& niobuf
);
1298 lustre_set_wire_obdo(&req
->rq_import
->imp_connect_data
, &body
->oa
, oa
);
1300 obdo_to_ioobj(oa
, ioobj
);
1301 ioobj
->ioo_bufcnt
= niocount
;
1302 /* The high bits of ioo_max_brw tells server _maximum_ number of bulks
1303 * that might be sent for this request. The actual number is decided
1304 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1305 * "max - 1" for old client compatibility sending "0", and also so the
1306 * actual maximum is a power-of-two number, not one less. LU-1431
1308 ioobj_max_brw_set(ioobj
, desc
->bd_md_max_brw
);
1309 LASSERT(page_count
> 0);
1311 for (requested_nob
= i
= 0; i
< page_count
; i
++, niobuf
++) {
1312 struct brw_page
*pg
= pga
[i
];
1313 int poff
= pg
->off
& ~PAGE_MASK
;
1315 LASSERT(pg
->count
> 0);
1316 /* make sure there is no gap in the middle of page array */
1317 LASSERTF(page_count
== 1 ||
1318 (ergo(i
== 0, poff
+ pg
->count
== PAGE_SIZE
) &&
1319 ergo(i
> 0 && i
< page_count
- 1,
1320 poff
== 0 && pg
->count
== PAGE_SIZE
) &&
1321 ergo(i
== page_count
- 1, poff
== 0)),
1322 "i: %d/%d pg: %p off: %llu, count: %u\n",
1323 i
, page_count
, pg
, pg
->off
, pg
->count
);
1324 LASSERTF(i
== 0 || pg
->off
> pg_prev
->off
,
1325 "i %d p_c %u pg %p [pri %lu ind %lu] off %llu prev_pg %p [pri %lu ind %lu] off %llu\n",
1327 pg
->pg
, page_private(pg
->pg
), pg
->pg
->index
, pg
->off
,
1328 pg_prev
->pg
, page_private(pg_prev
->pg
),
1329 pg_prev
->pg
->index
, pg_prev
->off
);
1330 LASSERT((pga
[0]->flag
& OBD_BRW_SRVLOCK
) ==
1331 (pg
->flag
& OBD_BRW_SRVLOCK
));
1333 ptlrpc_prep_bulk_page_pin(desc
, pg
->pg
, poff
, pg
->count
);
1334 requested_nob
+= pg
->count
;
1336 if (i
> 0 && can_merge_pages(pg_prev
, pg
)) {
1338 niobuf
->len
+= pg
->count
;
1340 niobuf
->offset
= pg
->off
;
1341 niobuf
->len
= pg
->count
;
1342 niobuf
->flags
= pg
->flag
;
1347 LASSERTF((void *)(niobuf
- niocount
) ==
1348 req_capsule_client_get(&req
->rq_pill
, &RMF_NIOBUF_REMOTE
),
1349 "want %p - real %p\n", req_capsule_client_get(&req
->rq_pill
,
1350 &RMF_NIOBUF_REMOTE
), (void *)(niobuf
- niocount
));
1352 osc_announce_cached(cli
, &body
->oa
, opc
== OST_WRITE
? requested_nob
:0);
1354 if ((body
->oa
.o_valid
& OBD_MD_FLFLAGS
) == 0) {
1355 body
->oa
.o_valid
|= OBD_MD_FLFLAGS
;
1356 body
->oa
.o_flags
= 0;
1358 body
->oa
.o_flags
|= OBD_FL_RECOV_RESEND
;
1361 if (osc_should_shrink_grant(cli
))
1362 osc_shrink_grant_local(cli
, &body
->oa
);
1364 /* size[REQ_REC_OFF] still sizeof (*body) */
1365 if (opc
== OST_WRITE
) {
1366 if (cli
->cl_checksum
&&
1367 !sptlrpc_flavor_has_bulk(&req
->rq_flvr
)) {
1368 /* store cl_cksum_type in a local variable since
1369 * it can be changed via lprocfs
1371 enum cksum_type cksum_type
= cli
->cl_cksum_type
;
1373 if ((body
->oa
.o_valid
& OBD_MD_FLFLAGS
) == 0) {
1374 oa
->o_flags
&= OBD_FL_LOCAL_MASK
;
1375 body
->oa
.o_flags
= 0;
1377 body
->oa
.o_flags
|= cksum_type_pack(cksum_type
);
1378 body
->oa
.o_valid
|= OBD_MD_FLCKSUM
| OBD_MD_FLFLAGS
;
1379 body
->oa
.o_cksum
= osc_checksum_bulk(requested_nob
,
1383 CDEBUG(D_PAGE
, "checksum at write origin: %x\n",
1385 /* save this in 'oa', too, for later checking */
1386 oa
->o_valid
|= OBD_MD_FLCKSUM
| OBD_MD_FLFLAGS
;
1387 oa
->o_flags
|= cksum_type_pack(cksum_type
);
1389 /* clear out the checksum flag, in case this is a
1390 * resend but cl_checksum is no longer set. b=11238
1392 oa
->o_valid
&= ~OBD_MD_FLCKSUM
;
1394 oa
->o_cksum
= body
->oa
.o_cksum
;
1395 /* 1 RC per niobuf */
1396 req_capsule_set_size(pill
, &RMF_RCS
, RCL_SERVER
,
1397 sizeof(__u32
) * niocount
);
1399 if (cli
->cl_checksum
&&
1400 !sptlrpc_flavor_has_bulk(&req
->rq_flvr
)) {
1401 if ((body
->oa
.o_valid
& OBD_MD_FLFLAGS
) == 0)
1402 body
->oa
.o_flags
= 0;
1403 body
->oa
.o_flags
|= cksum_type_pack(cli
->cl_cksum_type
);
1404 body
->oa
.o_valid
|= OBD_MD_FLCKSUM
| OBD_MD_FLFLAGS
;
1407 ptlrpc_request_set_replen(req
);
1409 CLASSERT(sizeof(*aa
) <= sizeof(req
->rq_async_args
));
1410 aa
= ptlrpc_req_async_args(req
);
1412 aa
->aa_requested_nob
= requested_nob
;
1413 aa
->aa_nio_count
= niocount
;
1414 aa
->aa_page_count
= page_count
;
1418 INIT_LIST_HEAD(&aa
->aa_oaps
);
1424 ptlrpc_req_finished(req
);
/*
 * check_write_checksum() - compare the client and server checksums of a
 * bulk write and report a mismatch.
 *
 * When @server_cksum equals @client_cksum only a D_PAGE debug message is
 * logged. Otherwise the checksum is recomputed over @pga with
 * osc_checksum_bulk() and a diagnostic is chosen from these cases:
 *  - the server used a checksum type other than @client_cksum_type;
 *  - the recomputed value matches the server (data changed on the client
 *    after it was checksummed);
 *  - the recomputed value matches the original client value (data changed
 *    in transit);
 *  - neither matches.
 * The diagnostic is printed on the console together with the peer NID,
 * the parent FID (when OBD_MD_FLFID is set), the object id and the byte
 * extent covered by @pga, followed by a CERROR with all three checksums.
 *
 * NOTE(review): the return statements are not visible in this listing;
 * osc_brw_fini_request() uses the result as a truth value.
 */
1428 static int check_write_checksum(struct obdo
*oa
, const lnet_process_id_t
*peer
,
1429 __u32 client_cksum
, __u32 server_cksum
, int nob
,
1430 u32 page_count
, struct brw_page
**pga
,
1431 enum cksum_type client_cksum_type
)
1435 enum cksum_type cksum_type
;
1437 if (server_cksum
== client_cksum
) {
1438 CDEBUG(D_PAGE
, "checksum %x confirmed\n", client_cksum
);
1442 cksum_type
= cksum_type_unpack(oa
->o_valid
& OBD_MD_FLFLAGS
?
1444 new_cksum
= osc_checksum_bulk(nob
, page_count
, pga
, OST_WRITE
,
1447 if (cksum_type
!= client_cksum_type
)
1448 msg
= "the server did not use the checksum type specified in the original request - likely a protocol problem"
1450 else if (new_cksum
== server_cksum
)
1451 msg
= "changed on the client after we checksummed it - likely false positive due to mmap IO (bug 11742)"
1453 else if (new_cksum
== client_cksum
)
1454 msg
= "changed in transit before arrival at OST";
1456 msg
= "changed in transit AND doesn't match the original - likely false positive due to mmap IO (bug 11742)"
1459 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1460 " object "DOSTID
" extent [%llu-%llu]\n",
1461 msg
, libcfs_nid2str(peer
->nid
),
1462 oa
->o_valid
& OBD_MD_FLFID
? oa
->o_parent_seq
: (__u64
)0,
1463 oa
->o_valid
& OBD_MD_FLFID
? oa
->o_parent_oid
: 0,
1464 oa
->o_valid
& OBD_MD_FLFID
? oa
->o_parent_ver
: 0,
1465 POSTID(&oa
->o_oi
), pga
[0]->off
,
1466 pga
[page_count
-1]->off
+ pga
[page_count
-1]->count
- 1);
1467 CERROR("original client csum %x (type %x), server csum %x (type %x), client csum now %x\n",
1468 client_cksum
, client_cksum_type
,
1469 server_cksum
, cksum_type
, new_cksum
);
1473 /* Note rc enters this function as number of bytes transferred */
/*
 * osc_brw_fini_request() - post-process the reply of a BRW request.
 *
 * @req: the completed request; its async args hold the BRW bookkeeping
 * @rc:  bytes transferred, or a negative errno
 *
 * Visible steps:
 *  - log failures other than -EDQUOT;
 *  - unpack the reply ost_body;
 *  - for OST_WRITE replies carrying user/group quota flags, pass the
 *    uid/gid pair to osc_quota_setdq();
 *  - update the grant with osc_update_grant();
 *  - OST_WRITE: assert that the bulk size equals the requested size,
 *    unwrap the bulk, run check_write_checksum() when a checksum was
 *    sent, then take rc from check_write_rcs();
 *  - OST_READ: unwrap the bulk, sanity-check rc against the requested and
 *    transferred byte counts, call handle_short_read() on a short read,
 *    and verify the checksum when the server supplied one; a missing
 *    checksum that was requested is reported with exponentially
 *    decreasing frequency (only when the miss counter is a power of two);
 *  - copy the wire obdo from the reply back into aa->aa_oa.
 *
 * NOTE(review): return/goto lines are omitted from this listing, so the
 * exact value returned on each path cannot be confirmed from here.
 */
1474 static int osc_brw_fini_request(struct ptlrpc_request
*req
, int rc
)
1476 struct osc_brw_async_args
*aa
= (void *)&req
->rq_async_args
;
1477 const lnet_process_id_t
*peer
=
1478 &req
->rq_import
->imp_connection
->c_peer
;
1479 struct client_obd
*cli
= aa
->aa_cli
;
1480 struct ost_body
*body
;
1481 __u32 client_cksum
= 0;
1483 if (rc
< 0 && rc
!= -EDQUOT
) {
1484 DEBUG_REQ(D_INFO
, req
, "Failed request with rc = %d\n", rc
);
1488 LASSERTF(req
->rq_repmsg
, "rc = %d\n", rc
);
1489 body
= req_capsule_server_get(&req
->rq_pill
, &RMF_OST_BODY
);
1491 DEBUG_REQ(D_INFO
, req
, "Can't unpack body\n");
1495 /* set/clear over quota flag for a uid/gid */
1496 if (lustre_msg_get_opc(req
->rq_reqmsg
) == OST_WRITE
&&
1497 body
->oa
.o_valid
& (OBD_MD_FLUSRQUOTA
| OBD_MD_FLGRPQUOTA
)) {
1498 unsigned int qid
[MAXQUOTAS
] = { body
->oa
.o_uid
, body
->oa
.o_gid
};
1500 CDEBUG(D_QUOTA
, "setdq for [%u %u] with valid %#llx, flags %x\n",
1501 body
->oa
.o_uid
, body
->oa
.o_gid
, body
->oa
.o_valid
,
1503 osc_quota_setdq(cli
, qid
, body
->oa
.o_valid
, body
->oa
.o_flags
);
1506 osc_update_grant(cli
, body
);
1511 if (aa
->aa_oa
->o_valid
& OBD_MD_FLCKSUM
)
1512 client_cksum
= aa
->aa_oa
->o_cksum
; /* save for later */
1514 if (lustre_msg_get_opc(req
->rq_reqmsg
) == OST_WRITE
) {
1516 CERROR("Unexpected +ve rc %d\n", rc
);
1519 LASSERT(req
->rq_bulk
->bd_nob
== aa
->aa_requested_nob
);
1521 if (sptlrpc_cli_unwrap_bulk_write(req
, req
->rq_bulk
))
1524 if ((aa
->aa_oa
->o_valid
& OBD_MD_FLCKSUM
) && client_cksum
&&
1525 check_write_checksum(&body
->oa
, peer
, client_cksum
,
1526 body
->oa
.o_cksum
, aa
->aa_requested_nob
,
1527 aa
->aa_page_count
, aa
->aa_ppga
,
1528 cksum_type_unpack(aa
->aa_oa
->o_flags
)))
1531 rc
= check_write_rcs(req
, aa
->aa_requested_nob
,
1533 aa
->aa_page_count
, aa
->aa_ppga
);
1537 /* The rest of this function executes only for OST_READs */
1539 /* if unwrap_bulk failed, return -EAGAIN to retry */
1540 rc
= sptlrpc_cli_unwrap_bulk_read(req
, req
->rq_bulk
, rc
);
1546 if (rc
> aa
->aa_requested_nob
) {
1547 CERROR("Unexpected rc %d (%d requested)\n", rc
,
1548 aa
->aa_requested_nob
);
1552 if (rc
!= req
->rq_bulk
->bd_nob_transferred
) {
1553 CERROR("Unexpected rc %d (%d transferred)\n",
1554 rc
, req
->rq_bulk
->bd_nob_transferred
);
1558 if (rc
< aa
->aa_requested_nob
)
1559 handle_short_read(rc
, aa
->aa_page_count
, aa
->aa_ppga
);
1561 if (body
->oa
.o_valid
& OBD_MD_FLCKSUM
) {
1562 static int cksum_counter
;
1563 __u32 server_cksum
= body
->oa
.o_cksum
;
1566 enum cksum_type cksum_type
;
1568 cksum_type
= cksum_type_unpack(body
->oa
.o_valid
&OBD_MD_FLFLAGS
?
1569 body
->oa
.o_flags
: 0);
1570 client_cksum
= osc_checksum_bulk(rc
, aa
->aa_page_count
,
1571 aa
->aa_ppga
, OST_READ
,
1574 if (peer
->nid
!= req
->rq_bulk
->bd_sender
) {
1576 router
= libcfs_nid2str(req
->rq_bulk
->bd_sender
);
1579 if (server_cksum
!= client_cksum
) {
1580 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from %s%s%s inode " DFID
" object " DOSTID
" extent [%llu-%llu]\n",
1581 req
->rq_import
->imp_obd
->obd_name
,
1582 libcfs_nid2str(peer
->nid
),
1584 body
->oa
.o_valid
& OBD_MD_FLFID
?
1585 body
->oa
.o_parent_seq
: (__u64
)0,
1586 body
->oa
.o_valid
& OBD_MD_FLFID
?
1587 body
->oa
.o_parent_oid
: 0,
1588 body
->oa
.o_valid
& OBD_MD_FLFID
?
1589 body
->oa
.o_parent_ver
: 0,
1590 POSTID(&body
->oa
.o_oi
),
1591 aa
->aa_ppga
[0]->off
,
1592 aa
->aa_ppga
[aa
->aa_page_count
-1]->off
+
1593 aa
->aa_ppga
[aa
->aa_page_count
-1]->count
-
1595 CERROR("client %x, server %x, cksum_type %x\n",
1596 client_cksum
, server_cksum
, cksum_type
);
1598 aa
->aa_oa
->o_cksum
= client_cksum
;
1602 CDEBUG(D_PAGE
, "checksum %x confirmed\n", client_cksum
);
1605 } else if (unlikely(client_cksum
)) {
1606 static int cksum_missed
;
1609 if ((cksum_missed
& (-cksum_missed
)) == cksum_missed
)
1610 CERROR("Checksum %u requested from %s but not sent\n",
1611 cksum_missed
, libcfs_nid2str(peer
->nid
));
1617 lustre_get_wire_obdo(&req
->rq_import
->imp_connect_data
,
1618 aa
->aa_oa
, &body
->oa
);
/*
 * osc_brw_redo_request() - rebuild and requeue a BRW request after a
 * recoverable error @rc.
 *
 * Visible steps:
 *  - log the redo (D_RPCTRACE for -EINPROGRESS, D_ERROR otherwise);
 *  - build a replacement with osc_brw_prep_request() from the same
 *    client, obdo and page array held in @aa;
 *  - walk aa->aa_oaps: every oap holding a request must reference
 *    @request, and the new request is dropped if an oap was interrupted;
 *  - copy the interpret and commit callbacks and the async args;
 *  - delay the resend by aa->aa_resends seconds, capped at the new
 *    request's timeout, and inherit the import generation;
 *  - move the aa_oaps and aa_exts lists to the new request (splice, since
 *    copying a list_head is not enough) and swap each oap's request
 *    reference to the new request;
 *  - queue the new request with ptlrpcd_add_req().
 *
 * NOTE(review): error-return lines are omitted from this listing.
 */
1623 static int osc_brw_redo_request(struct ptlrpc_request
*request
,
1624 struct osc_brw_async_args
*aa
, int rc
)
1626 struct ptlrpc_request
*new_req
;
1627 struct osc_brw_async_args
*new_aa
;
1628 struct osc_async_page
*oap
;
1630 DEBUG_REQ(rc
== -EINPROGRESS
? D_RPCTRACE
: D_ERROR
, request
,
1631 "redo for recoverable error %d", rc
);
1633 rc
= osc_brw_prep_request(lustre_msg_get_opc(request
->rq_reqmsg
) ==
1634 OST_WRITE
? OBD_BRW_WRITE
: OBD_BRW_READ
,
1635 aa
->aa_cli
, aa
->aa_oa
,
1636 NULL
/* lsm unused by osc currently */,
1637 aa
->aa_page_count
, aa
->aa_ppga
,
1642 list_for_each_entry(oap
, &aa
->aa_oaps
, oap_rpc_item
) {
1643 if (oap
->oap_request
) {
1644 LASSERTF(request
== oap
->oap_request
,
1645 "request %p != oap_request %p\n",
1646 request
, oap
->oap_request
);
1647 if (oap
->oap_interrupted
) {
1648 ptlrpc_req_finished(new_req
);
1653 /* New request takes over pga and oaps from old request.
1654 * Note that copying a list_head doesn't work, need to move it...
1657 new_req
->rq_interpret_reply
= request
->rq_interpret_reply
;
1658 new_req
->rq_async_args
= request
->rq_async_args
;
1659 new_req
->rq_commit_cb
= request
->rq_commit_cb
;
1660 /* cap resend delay to the current request timeout, this is similar to
1661 * what ptlrpc does (see after_reply())
1663 if (aa
->aa_resends
> new_req
->rq_timeout
)
1664 new_req
->rq_sent
= ktime_get_real_seconds() + new_req
->rq_timeout
;
1666 new_req
->rq_sent
= ktime_get_real_seconds() + aa
->aa_resends
;
1667 new_req
->rq_generation_set
= 1;
1668 new_req
->rq_import_generation
= request
->rq_import_generation
;
1670 new_aa
= ptlrpc_req_async_args(new_req
);
1672 INIT_LIST_HEAD(&new_aa
->aa_oaps
);
1673 list_splice_init(&aa
->aa_oaps
, &new_aa
->aa_oaps
);
1674 INIT_LIST_HEAD(&new_aa
->aa_exts
);
1675 list_splice_init(&aa
->aa_exts
, &new_aa
->aa_exts
);
1676 new_aa
->aa_resends
= aa
->aa_resends
;
1678 list_for_each_entry(oap
, &new_aa
->aa_oaps
, oap_rpc_item
) {
1679 if (oap
->oap_request
) {
1680 ptlrpc_req_finished(oap
->oap_request
);
1681 oap
->oap_request
= ptlrpc_request_addref(new_req
);
1685 /* XXX: This code will run into problem if we're going to support
1686 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1687 * and wait for all of them to be finished. We should inherit request
1688 * set from old request.
1690 ptlrpcd_add_req(new_req
);
1692 DEBUG_REQ(D_INFO
, new_req
, "new request");
1697 * Ugh, we want disk allocation on the target to happen in offset order. We'll
1698 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1699 * fine for our small page arrays and doesn't require allocation. It's an
1700 * insertion sort that swaps elements that are strides apart, shrinking the
1701 * stride down until it's '1' and the array is sorted.
/*
 * sort_brw_pages() - sort @array (@num brw_page pointers) in place by
 * ascending ->off.
 *
 * The stride is first grown through the sequence 1, 4, 13, ... (s * 3 + 1)
 * until it reaches @num; the do/while loop then runs one insertion pass
 * per stride until the stride is down to 1. Within a pass, elements
 * whose ->off is larger than that of the element being placed are
 * shifted up by one stride.
 *
 * NOTE(review): the lines that shrink the stride and store tmp back are
 * omitted from this listing.
 */
1703 static void sort_brw_pages(struct brw_page
**array
, int num
)
1706 struct brw_page
*tmp
;
1710 for (stride
= 1; stride
< num
; stride
= (stride
* 3) + 1)
1715 for (i
= stride
; i
< num
; i
++) {
1718 while (j
>= stride
&& array
[j
- stride
]->off
> tmp
->off
) {
1719 array
[j
] = array
[j
- stride
];
1724 } while (stride
> 1);
/*
 * osc_release_ppga() - release the brw_page pointer array @ppga.
 *
 * NOTE(review): the body is omitted from this listing; the description
 * is inferred from the name and from the call in brw_interpret(), which
 * passes aa->aa_ppga and aa->aa_page_count once the request has been
 * completed. Confirm against the full source.
 */
1727 static void osc_release_ppga(struct brw_page
**ppga
, u32 count
)
/*
 * brw_interpret() - reply interpreter for BRW requests; osc_build_rpc()
 * installs it as rq_interpret_reply and @data is the request's
 * osc_brw_async_args.
 *
 * Visible steps:
 *  - run osc_brw_fini_request() on the reply;
 *  - on a recoverable error: log a resend that crosses an eviction
 *    (import generation changed), otherwise redo the request with
 *    osc_brw_redo_request() when rc is -EINPROGRESS or
 *    client_should_resend() allows it, otherwise report too many resends;
 *  - update the cl_object attributes (blocks, mtime, atime, ctime and,
 *    for writes, size and KMS) from aa->aa_oa under the attribute lock;
 *  - free aa->aa_oa and finish every extent on aa->aa_exts;
 *  - complete the cl_req, release the page array and record the transfer
 *    in the BRW statistics;
 *  - under cl_loi_list_lock decrement the read or write in-flight counter
 *    and wake cache waiters, then unplug further I/O.
 *
 * NOTE(review): several conditions, else branches and returns are
 * omitted from this listing.
 */
1733 static int brw_interpret(const struct lu_env
*env
,
1734 struct ptlrpc_request
*req
, void *data
, int rc
)
1736 struct osc_brw_async_args
*aa
= data
;
1737 struct osc_extent
*ext
;
1738 struct osc_extent
*tmp
;
1739 struct client_obd
*cli
= aa
->aa_cli
;
1741 rc
= osc_brw_fini_request(req
, rc
);
1742 CDEBUG(D_INODE
, "request %p aa %p rc %d\n", req
, aa
, rc
);
1743 /* When server return -EINPROGRESS, client should always retry
1744 * regardless of the number of times the bulk was resent already.
1746 if (osc_recoverable_error(rc
)) {
1747 if (req
->rq_import_generation
!=
1748 req
->rq_import
->imp_generation
) {
1749 CDEBUG(D_HA
, "%s: resend cross eviction for object: " DOSTID
", rc = %d.\n",
1750 req
->rq_import
->imp_obd
->obd_name
,
1751 POSTID(&aa
->aa_oa
->o_oi
), rc
);
1752 } else if (rc
== -EINPROGRESS
||
1753 client_should_resend(aa
->aa_resends
, aa
->aa_cli
)) {
1754 rc
= osc_brw_redo_request(req
, aa
, rc
);
1756 CERROR("%s: too many resent retries for object: %llu:%llu, rc = %d.\n",
1757 req
->rq_import
->imp_obd
->obd_name
,
1758 POSTID(&aa
->aa_oa
->o_oi
), rc
);
1763 else if (rc
== -EAGAIN
|| rc
== -EINPROGRESS
)
1768 struct obdo
*oa
= aa
->aa_oa
;
1769 struct cl_attr
*attr
= &osc_env_info(env
)->oti_attr
;
1770 unsigned long valid
= 0;
1771 struct cl_object
*obj
;
1772 struct osc_async_page
*last
;
1774 last
= brw_page2oap(aa
->aa_ppga
[aa
->aa_page_count
- 1]);
1775 obj
= osc2cl(last
->oap_obj
);
1777 cl_object_attr_lock(obj
);
1778 if (oa
->o_valid
& OBD_MD_FLBLOCKS
) {
1779 attr
->cat_blocks
= oa
->o_blocks
;
1780 valid
|= CAT_BLOCKS
;
1782 if (oa
->o_valid
& OBD_MD_FLMTIME
) {
1783 attr
->cat_mtime
= oa
->o_mtime
;
1786 if (oa
->o_valid
& OBD_MD_FLATIME
) {
1787 attr
->cat_atime
= oa
->o_atime
;
1790 if (oa
->o_valid
& OBD_MD_FLCTIME
) {
1791 attr
->cat_ctime
= oa
->o_ctime
;
1795 if (lustre_msg_get_opc(req
->rq_reqmsg
) == OST_WRITE
) {
1796 struct lov_oinfo
*loi
= cl2osc(obj
)->oo_oinfo
;
1797 loff_t last_off
= last
->oap_count
+ last
->oap_obj_off
;
1799 /* Change file size if this is an out of quota or
1800 * direct IO write and it extends the file size
1802 if (loi
->loi_lvb
.lvb_size
< last_off
) {
1803 attr
->cat_size
= last_off
;
1806 /* Extend KMS if it's not a lockless write */
1807 if (loi
->loi_kms
< last_off
&&
1808 oap2osc_page(last
)->ops_srvlock
== 0) {
1809 attr
->cat_kms
= last_off
;
1815 cl_object_attr_set(env
, obj
, attr
, valid
);
1816 cl_object_attr_unlock(obj
);
1818 kmem_cache_free(obdo_cachep
, aa
->aa_oa
);
1820 list_for_each_entry_safe(ext
, tmp
, &aa
->aa_exts
, oe_link
) {
1821 list_del_init(&ext
->oe_link
);
1822 osc_extent_finish(env
, ext
, 1, rc
);
1824 LASSERT(list_empty(&aa
->aa_exts
));
1825 LASSERT(list_empty(&aa
->aa_oaps
));
1827 cl_req_completion(env
, aa
->aa_clerq
, rc
< 0 ? rc
:
1828 req
->rq_bulk
->bd_nob_transferred
);
1829 osc_release_ppga(aa
->aa_ppga
, aa
->aa_page_count
);
1830 ptlrpc_lprocfs_brw(req
, req
->rq_bulk
->bd_nob_transferred
);
1832 spin_lock(&cli
->cl_loi_list_lock
);
1833 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1834 * is called so we know whether to go to sync BRWs or wait for more
1837 if (lustre_msg_get_opc(req
->rq_reqmsg
) == OST_WRITE
)
1838 cli
->cl_w_in_flight
--;
1840 cli
->cl_r_in_flight
--;
1841 osc_wake_cache_waiters(cli
);
1842 spin_unlock(&cli
->cl_loi_list_lock
);
1844 osc_io_unplug(env
, cli
, NULL
);
/*
 * brw_commit() - commit callback for BRW requests; osc_build_rpc()
 * installs it as rq_commit_cb.
 *
 * Takes req->rq_lock. If rq_unstable is set, the lock is dropped around
 * the call to osc_dec_unstable_pages() and then retaken. rq_committed is
 * set to 1 before the lock is released.
 *
 * NOTE(review): lines between these statements are omitted from this
 * listing; whether rq_committed is set on every path should be confirmed
 * against the full source.
 */
1848 static void brw_commit(struct ptlrpc_request
*req
)
1850 spin_lock(&req
->rq_lock
);
1852 * If osc_inc_unstable_pages (via osc_extent_finish) races with
1853 * this called via the rq_commit_cb, I need to ensure
1854 * osc_dec_unstable_pages is still called. Otherwise unstable
1855 * pages may be leaked.
1857 if (req
->rq_unstable
) {
1858 spin_unlock(&req
->rq_lock
);
1859 osc_dec_unstable_pages(req
);
1860 spin_lock(&req
->rq_lock
);
1862 req
->rq_committed
= 1;
1864 spin_unlock(&req
->rq_lock
);
1868 * Build an RPC by the list of extent @ext_list. The caller must ensure
1869 * that the total pages in this list are NOT over max pages per RPC.
1870 * Extents in the list must be in OES_RPC state.
/*
 * osc_build_rpc() - visible steps, in order:
 *  - move every page of every extent on @ext_list onto a local rpc_list,
 *    tracking the lowest starting offset and highest ending offset and
 *    whether any extent was flagged oe_memalloc;
 *  - allocate the cl_req_attr, the brw_page pointer array and the obdo
 *    (all GFP_NOFS);
 *  - for each page: allocate the cl_req (1-object RPCs only), fill the
 *    pga slot from the oap and add the page to the cl_req;
 *  - set the obdo attributes, copy the lock handle into oa->o_handle,
 *    run cl_req_prep(), sort the pages by offset and build the request
 *    with osc_brw_prep_request();
 *  - install brw_commit() / brw_interpret() as callbacks, refresh the
 *    m/c/atime in the packed body, set the job id;
 *  - splice rpc_list and @ext_list into the async args, mark the request
 *    interrupted if any oap was, and give one oap a request reference;
 *  - under cl_loi_list_lock bump the read or write in-flight counter and
 *    tally the page-count, RPC and offset histograms;
 *  - queue the request with ptlrpcd_add_req().
 * The visible cleanup code frees the obdo, finishes every remaining
 * extent with rc and completes the cl_req.
 *
 * NOTE(review): allocation-failure checks, labels and returns are
 * omitted from this listing.
 */
1872 int osc_build_rpc(const struct lu_env
*env
, struct client_obd
*cli
,
1873 struct list_head
*ext_list
, int cmd
)
1875 struct ptlrpc_request
*req
= NULL
;
1876 struct osc_extent
*ext
;
1877 struct brw_page
**pga
= NULL
;
1878 struct osc_brw_async_args
*aa
= NULL
;
1879 struct obdo
*oa
= NULL
;
1880 struct osc_async_page
*oap
;
1881 struct osc_async_page
*tmp
;
1882 struct cl_req
*clerq
= NULL
;
1883 enum cl_req_type crt
= (cmd
& OBD_BRW_WRITE
) ? CRT_WRITE
: CRT_READ
;
1884 struct ldlm_lock
*lock
= NULL
;
1885 struct cl_req_attr
*crattr
= NULL
;
1886 u64 starting_offset
= OBD_OBJECT_EOF
;
1887 u64 ending_offset
= 0;
1893 struct ost_body
*body
;
1894 LIST_HEAD(rpc_list
);
1896 LASSERT(!list_empty(ext_list
));
1898 /* add pages into rpc_list to build BRW rpc */
1899 list_for_each_entry(ext
, ext_list
, oe_link
) {
1900 LASSERT(ext
->oe_state
== OES_RPC
);
1901 mem_tight
|= ext
->oe_memalloc
;
1902 list_for_each_entry(oap
, &ext
->oe_pages
, oap_pending_item
) {
1904 list_add_tail(&oap
->oap_rpc_item
, &rpc_list
);
1905 if (starting_offset
> oap
->oap_obj_off
)
1906 starting_offset
= oap
->oap_obj_off
;
1908 LASSERT(oap
->oap_page_off
== 0);
1909 if (ending_offset
< oap
->oap_obj_off
+ oap
->oap_count
)
1910 ending_offset
= oap
->oap_obj_off
+
1913 LASSERT(oap
->oap_page_off
+ oap
->oap_count
==
1919 mpflag
= cfs_memory_pressure_get_and_set();
1921 crattr
= kzalloc(sizeof(*crattr
), GFP_NOFS
);
1927 pga
= kcalloc(page_count
, sizeof(*pga
), GFP_NOFS
);
1933 oa
= kmem_cache_zalloc(obdo_cachep
, GFP_NOFS
);
1940 list_for_each_entry(oap
, &rpc_list
, oap_rpc_item
) {
1941 struct cl_page
*page
= oap2cl_page(oap
);
1944 clerq
= cl_req_alloc(env
, page
, crt
,
1945 1 /* only 1-object rpcs for now */);
1946 if (IS_ERR(clerq
)) {
1947 rc
= PTR_ERR(clerq
);
1950 lock
= oap
->oap_ldlm_lock
;
1953 oap
->oap_brw_flags
|= OBD_BRW_MEMALLOC
;
1954 pga
[i
] = &oap
->oap_brw_page
;
1955 pga
[i
]->off
= oap
->oap_obj_off
+ oap
->oap_page_off
;
1956 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1957 pga
[i
]->pg
, oap
->oap_page
->index
, oap
,
1960 cl_req_page_add(env
, clerq
, page
);
1963 /* always get the data for the obdo for the rpc */
1965 crattr
->cra_oa
= oa
;
1966 cl_req_attr_set(env
, clerq
, crattr
, ~0ULL);
1968 oa
->o_handle
= lock
->l_remote_handle
;
1969 oa
->o_valid
|= OBD_MD_FLHANDLE
;
1972 rc
= cl_req_prep(env
, clerq
);
1974 CERROR("cl_req_prep failed: %d\n", rc
);
1978 sort_brw_pages(pga
, page_count
);
1979 rc
= osc_brw_prep_request(cmd
, cli
, oa
, NULL
, page_count
,
1982 CERROR("prep_req failed: %d\n", rc
);
1986 req
->rq_commit_cb
= brw_commit
;
1987 req
->rq_interpret_reply
= brw_interpret
;
1990 req
->rq_memalloc
= 1;
1992 /* Need to update the timestamps after the request is built in case
1993 * we race with setattr (locally or in queue at OST). If OST gets
1994 * later setattr before earlier BRW (as determined by the request xid),
1995 * the OST will not use BRW timestamps. Sadly, there is no obvious
1996 * way to do this in a single call. bug 10150
1998 body
= req_capsule_client_get(&req
->rq_pill
, &RMF_OST_BODY
);
1999 crattr
->cra_oa
= &body
->oa
;
2000 cl_req_attr_set(env
, clerq
, crattr
,
2001 OBD_MD_FLMTIME
|OBD_MD_FLCTIME
|OBD_MD_FLATIME
);
2003 lustre_msg_set_jobid(req
->rq_reqmsg
, crattr
->cra_jobid
);
2005 CLASSERT(sizeof(*aa
) <= sizeof(req
->rq_async_args
));
2006 aa
= ptlrpc_req_async_args(req
);
2007 INIT_LIST_HEAD(&aa
->aa_oaps
);
2008 list_splice_init(&rpc_list
, &aa
->aa_oaps
);
2009 INIT_LIST_HEAD(&aa
->aa_exts
);
2010 list_splice_init(ext_list
, &aa
->aa_exts
);
2011 aa
->aa_clerq
= clerq
;
2013 /* queued sync pages can be torn down while the pages
2014 * were between the pending list and the rpc
2017 list_for_each_entry(oap
, &aa
->aa_oaps
, oap_rpc_item
) {
2018 /* only one oap gets a request reference */
2021 if (oap
->oap_interrupted
&& !req
->rq_intr
) {
2022 CDEBUG(D_INODE
, "oap %p in req %p interrupted\n",
2024 ptlrpc_mark_interrupted(req
);
2028 tmp
->oap_request
= ptlrpc_request_addref(req
);
2030 spin_lock(&cli
->cl_loi_list_lock
);
2031 starting_offset
>>= PAGE_SHIFT
;
2032 if (cmd
== OBD_BRW_READ
) {
2033 cli
->cl_r_in_flight
++;
2034 lprocfs_oh_tally_log2(&cli
->cl_read_page_hist
, page_count
);
2035 lprocfs_oh_tally(&cli
->cl_read_rpc_hist
, cli
->cl_r_in_flight
);
2036 lprocfs_oh_tally_log2(&cli
->cl_read_offset_hist
,
2037 starting_offset
+ 1);
2039 cli
->cl_w_in_flight
++;
2040 lprocfs_oh_tally_log2(&cli
->cl_write_page_hist
, page_count
);
2041 lprocfs_oh_tally(&cli
->cl_write_rpc_hist
, cli
->cl_w_in_flight
);
2042 lprocfs_oh_tally_log2(&cli
->cl_write_offset_hist
,
2043 starting_offset
+ 1);
2045 spin_unlock(&cli
->cl_loi_list_lock
);
2047 DEBUG_REQ(D_INODE
, req
, "%d pages, aa %p. now %dr/%dw in flight",
2048 page_count
, aa
, cli
->cl_r_in_flight
,
2049 cli
->cl_w_in_flight
);
2051 ptlrpcd_add_req(req
);
2056 cfs_memory_pressure_restore(mpflag
);
2064 kmem_cache_free(obdo_cachep
, oa
);
2066 /* this should happen rarely and is pretty bad, it makes the
2067 * pending list not follow the dirty order
2069 while (!list_empty(ext_list
)) {
2070 ext
= list_entry(ext_list
->next
, struct osc_extent
,
2072 list_del_init(&ext
->oe_link
);
2073 osc_extent_finish(env
, ext
, 0, rc
);
2075 if (clerq
&& !IS_ERR(clerq
))
2076 cl_req_completion(env
, clerq
, rc
);
/*
 * osc_set_lock_data_with_check() - attach einfo->ei_cbdata to @lock as
 * its l_ast_data if none is set yet.
 *
 * Asserts that the blocking, completion and glimpse callbacks and the
 * resource type of @lock match those in @einfo. With the resource and
 * lock held, an empty l_ast_data is set to the callback data, and
 * l_ast_data is then compared with that data.
 *
 * NOTE(review): the statements that record and return the result are
 * omitted from this listing; presumably the function reports whether
 * l_ast_data equals the callback data. Confirm against the full source.
 */
2081 static int osc_set_lock_data_with_check(struct ldlm_lock
*lock
,
2082 struct ldlm_enqueue_info
*einfo
)
2084 void *data
= einfo
->ei_cbdata
;
2087 LASSERT(lock
->l_blocking_ast
== einfo
->ei_cb_bl
);
2088 LASSERT(lock
->l_resource
->lr_type
== einfo
->ei_type
);
2089 LASSERT(lock
->l_completion_ast
== einfo
->ei_cb_cp
);
2090 LASSERT(lock
->l_glimpse_ast
== einfo
->ei_cb_gl
);
2092 lock_res_and_lock(lock
);
2094 if (!lock
->l_ast_data
)
2095 lock
->l_ast_data
= data
;
2096 if (lock
->l_ast_data
== data
)
2099 unlock_res_and_lock(lock
);
/*
 * osc_set_data_with_check() - handle-based wrapper around
 * osc_set_lock_data_with_check().
 *
 * Resolves @lockh with ldlm_handle2lock(), runs the check on the lock
 * and drops the lock reference with LDLM_LOCK_PUT(). A CERROR mentioning
 * a possible client eviction is also visible.
 *
 * NOTE(review): the condition guarding the CERROR and the return
 * statement are omitted from this listing; presumably the error is
 * logged when the handle lookup fails. Confirm against the full source.
 */
2104 static int osc_set_data_with_check(struct lustre_handle
*lockh
,
2105 struct ldlm_enqueue_info
*einfo
)
2107 struct ldlm_lock
*lock
= ldlm_handle2lock(lockh
);
2111 set
= osc_set_lock_data_with_check(lock
, einfo
);
2112 LDLM_LOCK_PUT(lock
);
2114 CERROR("lockh %p, data %p - client evicted?\n",
2115 lockh
, einfo
->ei_cbdata
);
2119 /* find any ldlm lock of the inode in osc
/*
 * osc_find_cbdata() - run the iterator @replace with @data over the ldlm
 * locks of the object described by @lsm.
 *
 * The resource name is built from lsm->lsm_oi and the locks of that
 * resource in the namespace of the export's obd device are walked with
 * ldlm_resource_iterate(). The iterator result is then checked against
 * LDLM_ITER_STOP and LDLM_ITER_CONTINUE.
 *
 * NOTE(review): the values returned for each iterator result are omitted
 * from this listing.
 */
2124 static int osc_find_cbdata(struct obd_export
*exp
, struct lov_stripe_md
*lsm
,
2125 ldlm_iterator_t replace
, void *data
)
2127 struct ldlm_res_id res_id
;
2128 struct obd_device
*obd
= class_exp2obd(exp
);
2131 ostid_build_res_name(&lsm
->lsm_oi
, &res_id
);
2132 rc
= ldlm_resource_iterate(obd
->obd_namespace
, &res_id
, replace
, data
);
2133 if (rc
== LDLM_ITER_STOP
)
2135 if (rc
== LDLM_ITER_CONTINUE
)
/*
 * osc_enqueue_fini() - finish a lock enqueue and run the caller upcall.
 *
 * For an intent enqueue that came back as ELDLM_LOCK_ABORTED, the status
 * in the reply's lock_policy_res1 is converted with ptlrpc_status_ntoh()
 * and, when non-zero, replaces @errcode; LDLM_FL_LVB_READY is set in
 * *@flags. For ELDLM_OK, LDLM_FL_LVB_READY is set as well.
 *
 * @upcall is then invoked with (@cookie, @lockh, errcode). Afterwards the
 * reference taken in ldlm_cli_enqueue() is dropped with
 * ldlm_lock_decref() when errcode is ELDLM_OK and @lockh is in use.
 *
 * NOTE(review): the statement under the ELDLM_LOCK_MATCHED test and the
 * return are omitted from this listing.
 */
2140 static int osc_enqueue_fini(struct ptlrpc_request
*req
,
2141 osc_enqueue_upcall_f upcall
, void *cookie
,
2142 struct lustre_handle
*lockh
, enum ldlm_mode mode
,
2143 __u64
*flags
, int agl
, int errcode
)
2145 bool intent
= *flags
& LDLM_FL_HAS_INTENT
;
2148 /* The request was created before ldlm_cli_enqueue call. */
2149 if (intent
&& errcode
== ELDLM_LOCK_ABORTED
) {
2150 struct ldlm_reply
*rep
;
2152 rep
= req_capsule_server_get(&req
->rq_pill
, &RMF_DLM_REP
);
2154 rep
->lock_policy_res1
=
2155 ptlrpc_status_ntoh(rep
->lock_policy_res1
);
2156 if (rep
->lock_policy_res1
)
2157 errcode
= rep
->lock_policy_res1
;
2159 *flags
|= LDLM_FL_LVB_READY
;
2160 } else if (errcode
== ELDLM_OK
) {
2161 *flags
|= LDLM_FL_LVB_READY
;
2164 /* Call the update callback. */
2165 rc
= (*upcall
)(cookie
, lockh
, errcode
);
2166 /* release the reference taken in ldlm_cli_enqueue() */
2167 if (errcode
== ELDLM_LOCK_MATCHED
)
2169 if (errcode
== ELDLM_OK
&& lustre_handle_is_used(lockh
))
2170 ldlm_lock_decref(lockh
, mode
);
/*
 * osc_enqueue_interpret() - reply interpreter for asynchronous lock
 * enqueues; osc_enqueue_base() installs it as rq_interpret_reply and
 * fills in @aa.
 *
 * Visible steps:
 *  - resolve aa->oa_lockh to a lock (asserted to exist) and take an extra
 *    reference in aa->oa_mode, so that a blocking AST posted for a failed
 *    lock arrives only after the upcall has run;
 *  - complete the ldlm side with ldlm_cli_enqueue_fini();
 *  - complete the osc side with osc_enqueue_fini(), which runs the upcall;
 *  - drop the extra lock reference and the lock pointer.
 * A local flags word is substituted for aa->oa_flags on one path (the
 * assertions show this applies when oa_lvb and oa_flags are both unset).
 *
 * NOTE(review): the condition guarding that substitution and the return
 * are omitted from this listing.
 */
2175 static int osc_enqueue_interpret(const struct lu_env
*env
,
2176 struct ptlrpc_request
*req
,
2177 struct osc_enqueue_args
*aa
, int rc
)
2179 struct ldlm_lock
*lock
;
2180 struct lustre_handle
*lockh
= &aa
->oa_lockh
;
2181 enum ldlm_mode mode
= aa
->oa_mode
;
2182 struct ost_lvb
*lvb
= aa
->oa_lvb
;
2183 __u32 lvb_len
= sizeof(*lvb
);
2187 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2190 lock
= ldlm_handle2lock(lockh
);
2191 LASSERTF(lock
, "lockh %llx, req %p, aa %p - client evicted?\n",
2192 lockh
->cookie
, req
, aa
);
2194 /* Take an additional reference so that a blocking AST that
2195 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2196 * to arrive after an upcall has been executed by
2197 * osc_enqueue_fini().
2199 ldlm_lock_addref(lockh
, mode
);
2201 /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
2202 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG
, 2);
2204 /* Let CP AST to grant the lock first. */
2205 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE
, 1);
2208 LASSERT(!aa
->oa_lvb
);
2209 LASSERT(!aa
->oa_flags
);
2210 aa
->oa_flags
= &flags
;
2213 /* Complete obtaining the lock procedure. */
2214 rc
= ldlm_cli_enqueue_fini(aa
->oa_exp
, req
, aa
->oa_type
, 1,
2215 aa
->oa_mode
, aa
->oa_flags
, lvb
, lvb_len
,
2217 /* Complete osc stuff. */
2218 rc
= osc_enqueue_fini(req
, aa
->oa_upcall
, aa
->oa_cookie
, lockh
, mode
,
2219 aa
->oa_flags
, aa
->oa_agl
, rc
);
2221 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE
, 10);
2223 ldlm_lock_decref(lockh
, mode
);
2224 LDLM_LOCK_PUT(lock
);
/*
 * Sentinel request-set pointer: it has the fixed value (void *)1 and is
 * only compared against, e.g. osc_enqueue_base() queues the request with
 * ptlrpcd_add_req() when its rqset argument equals PTLRPCD_SET and adds
 * it to the given set otherwise.
 */
2228 struct ptlrpc_request_set
*PTLRPCD_SET
= (void *)1;
2230 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2231 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2232 * other synchronous requests, however keeping some locks and trying to obtain
2233 * others may take a considerable amount of time in a case of ost failure; and
2234 * when other sync requests do not get released lock from a client, the client
2235 * is evicted from the cluster -- such scenarios make life difficult, so
2236 * release locks just after they are obtained.
/*
 * osc_enqueue_base() - obtain an extent lock, reusing a cached lock when
 * one matches and otherwise enqueueing a new one.
 *
 * Visible steps:
 *  - round the requested extent out to page boundaries;
 *  - look for an existing lock with ldlm_lock_match(); on a match either
 *    drop it again (AGL case, or when the callback data cannot be
 *    attached) or set LDLM_FL_LVB_READY and call @upcall with
 *    ELDLM_LOCK_MATCHED before dropping the match reference;
 *  - otherwise allocate and prepare an RQF_LDLM_ENQUEUE_LVB request,
 *    clear LDLM_FL_BLOCK_GRANTED from *@flags and call
 *    ldlm_cli_enqueue();
 *  - async case: fill the request's osc_enqueue_args, install
 *    osc_enqueue_interpret() and queue the request through ptlrpcd (when
 *    @rqset is PTLRPCD_SET) or on @rqset;
 *  - otherwise finish with osc_enqueue_fini() and release the request.
 *
 * NOTE(review): many conditions, else branches and returns are omitted
 * from this listing, so the exact guards of each step cannot be
 * confirmed from here.
 */
2238 int osc_enqueue_base(struct obd_export
*exp
, struct ldlm_res_id
*res_id
,
2239 __u64
*flags
, ldlm_policy_data_t
*policy
,
2240 struct ost_lvb
*lvb
, int kms_valid
,
2241 osc_enqueue_upcall_f upcall
, void *cookie
,
2242 struct ldlm_enqueue_info
*einfo
,
2243 struct ptlrpc_request_set
*rqset
, int async
, int agl
)
2245 struct obd_device
*obd
= exp
->exp_obd
;
2246 struct lustre_handle lockh
= { 0 };
2247 struct ptlrpc_request
*req
= NULL
;
2248 int intent
= *flags
& LDLM_FL_HAS_INTENT
;
2249 __u64 match_flags
= *flags
;
2250 enum ldlm_mode mode
;
2253 /* Filesystem lock extents are extended to page boundaries so that
2254 * dealing with the page cache is a little smoother.
2256 policy
->l_extent
.start
-= policy
->l_extent
.start
& ~PAGE_MASK
;
2257 policy
->l_extent
.end
|= ~PAGE_MASK
;
2260 * kms is not valid when either object is completely fresh (so that no
2261 * locks are cached), or object was evicted. In the latter case cached
2262 * lock cannot be used, because it would prime inode state with
2263 * potentially stale LVB.
2268 /* Next, search for already existing extent locks that will cover us */
2269 /* If we're trying to read, we also search for an existing PW lock. The
2270 * VFS and page cache already protect us locally, so lots of readers/
2271 * writers can share a single PW lock.
2273 * There are problems with conversion deadlocks, so instead of
2274 * converting a read lock to a write lock, we'll just enqueue a new
2277 * At some point we should cancel the read lock instead of making them
2278 * send us a blocking callback, but there are problems with canceling
2279 * locks out from other users right now, too.
2281 mode
= einfo
->ei_mode
;
2282 if (einfo
->ei_mode
== LCK_PR
)
2285 match_flags
|= LDLM_FL_LVB_READY
;
2287 match_flags
|= LDLM_FL_BLOCK_GRANTED
;
2288 mode
= ldlm_lock_match(obd
->obd_namespace
, match_flags
, res_id
,
2289 einfo
->ei_type
, policy
, mode
, &lockh
, 0);
2291 struct ldlm_lock
*matched
;
2293 if (*flags
& LDLM_FL_TEST_LOCK
)
2296 matched
= ldlm_handle2lock(&lockh
);
2298 /* AGL enqueues DLM locks speculatively. Therefore if
2299 * it already exists a DLM lock, it will just inform the
2300 * caller to cancel the AGL process for this stripe.
2302 ldlm_lock_decref(&lockh
, mode
);
2303 LDLM_LOCK_PUT(matched
);
2305 } else if (osc_set_lock_data_with_check(matched
, einfo
)) {
2306 *flags
|= LDLM_FL_LVB_READY
;
2307 /* We already have a lock, and it's referenced. */
2308 (*upcall
)(cookie
, &lockh
, ELDLM_LOCK_MATCHED
);
2310 ldlm_lock_decref(&lockh
, mode
);
2311 LDLM_LOCK_PUT(matched
);
2314 ldlm_lock_decref(&lockh
, mode
);
2315 LDLM_LOCK_PUT(matched
);
2320 if (*flags
& LDLM_FL_TEST_LOCK
)
2323 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
),
2324 &RQF_LDLM_ENQUEUE_LVB
);
2328 rc
= ldlm_prep_enqueue_req(exp
, req
, NULL
, 0);
2330 ptlrpc_request_free(req
);
2334 req_capsule_set_size(&req
->rq_pill
, &RMF_DLM_LVB
, RCL_SERVER
,
2336 ptlrpc_request_set_replen(req
);
2339 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2340 *flags
&= ~LDLM_FL_BLOCK_GRANTED
;
2342 rc
= ldlm_cli_enqueue(exp
, &req
, einfo
, res_id
, policy
, flags
, lvb
,
2343 sizeof(*lvb
), LVB_T_OST
, &lockh
, async
);
2346 struct osc_enqueue_args
*aa
;
2348 CLASSERT(sizeof(*aa
) <= sizeof(req
->rq_async_args
));
2349 aa
= ptlrpc_req_async_args(req
);
2351 aa
->oa_mode
= einfo
->ei_mode
;
2352 aa
->oa_type
= einfo
->ei_type
;
2353 lustre_handle_copy(&aa
->oa_lockh
, &lockh
);
2354 aa
->oa_upcall
= upcall
;
2355 aa
->oa_cookie
= cookie
;
2358 aa
->oa_flags
= flags
;
2361 /* AGL is essentially to enqueue an DLM lock
2362 * in advance, so we don't care about the
2363 * result of AGL enqueue.
2366 aa
->oa_flags
= NULL
;
2369 req
->rq_interpret_reply
=
2370 (ptlrpc_interpterer_t
)osc_enqueue_interpret
;
2371 if (rqset
== PTLRPCD_SET
)
2372 ptlrpcd_add_req(req
);
2374 ptlrpc_set_add_req(rqset
, req
);
2375 } else if (intent
) {
2376 ptlrpc_req_finished(req
);
2381 rc
= osc_enqueue_fini(req
, upcall
, cookie
, &lockh
, einfo
->ei_mode
,
2384 ptlrpc_req_finished(req
);
/*
 * osc_match_base() - look for an already granted lock covering @policy.
 *
 * The extent is first rounded out to page boundaries, then
 * ldlm_lock_match() is called with a copy of *@flags. After a match:
 *  - if osc_set_data_with_check() fails for @data, the match reference
 *    is dropped again (unless LDLM_FL_TEST_LOCK is set);
 *  - if the matched mode differs from @mode (and LDLM_FL_TEST_LOCK is
 *    not set), an LCK_PR reference is taken and the LCK_PW reference is
 *    released.
 *
 * NOTE(review): the remaining parameter(s), the mode selection before
 * the match call, the guards around the steps above and the returns are
 * omitted from this listing.
 */
2389 int osc_match_base(struct obd_export
*exp
, struct ldlm_res_id
*res_id
,
2390 __u32 type
, ldlm_policy_data_t
*policy
, __u32 mode
,
2391 __u64
*flags
, void *data
, struct lustre_handle
*lockh
,
2394 struct obd_device
*obd
= exp
->exp_obd
;
2395 __u64 lflags
= *flags
;
2398 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH
))
2401 /* Filesystem lock extents are extended to page boundaries so that
2402 * dealing with the page cache is a little smoother
2404 policy
->l_extent
.start
-= policy
->l_extent
.start
& ~PAGE_MASK
;
2405 policy
->l_extent
.end
|= ~PAGE_MASK
;
2407 /* Next, search for already existing extent locks that will cover us */
2408 /* If we're trying to read, we also search for an existing PW lock. The
2409 * VFS and page cache already protect us locally, so lots of readers/
2410 * writers can share a single PW lock.
2415 rc
= ldlm_lock_match(obd
->obd_namespace
, lflags
,
2416 res_id
, type
, policy
, rc
, lockh
, unref
);
2419 if (!osc_set_data_with_check(lockh
, data
)) {
2420 if (!(lflags
& LDLM_FL_TEST_LOCK
))
2421 ldlm_lock_decref(lockh
, rc
);
2425 if (!(lflags
& LDLM_FL_TEST_LOCK
) && mode
!= rc
) {
2426 ldlm_lock_addref(lockh
, LCK_PR
);
2427 ldlm_lock_decref(lockh
, LCK_PW
);
2434 int osc_cancel_base(struct lustre_handle
*lockh
, __u32 mode
)
2436 if (unlikely(mode
== LCK_GROUP
))
2437 ldlm_lock_decref_and_cancel(lockh
, mode
);
2439 ldlm_lock_decref(lockh
, mode
);
2444 static int osc_statfs_interpret(const struct lu_env
*env
,
2445 struct ptlrpc_request
*req
,
2446 struct osc_async_args
*aa
, int rc
)
2448 struct obd_statfs
*msfs
;
2451 /* The request has in fact never been sent
2452 * due to issues at a higher level (LOV).
2453 * Exit immediately since the caller is
2454 * aware of the problem and takes care
2459 if ((rc
== -ENOTCONN
|| rc
== -EAGAIN
) &&
2460 (aa
->aa_oi
->oi_flags
& OBD_STATFS_NODELAY
)) {
2468 msfs
= req_capsule_server_get(&req
->rq_pill
, &RMF_OBD_STATFS
);
2474 *aa
->aa_oi
->oi_osfs
= *msfs
;
2476 rc
= aa
->aa_oi
->oi_cb_up(aa
->aa_oi
, rc
);
2480 static int osc_statfs_async(struct obd_export
*exp
,
2481 struct obd_info
*oinfo
, __u64 max_age
,
2482 struct ptlrpc_request_set
*rqset
)
2484 struct obd_device
*obd
= class_exp2obd(exp
);
2485 struct ptlrpc_request
*req
;
2486 struct osc_async_args
*aa
;
2489 /* We could possibly pass max_age in the request (as an absolute
2490 * timestamp or a "seconds.usec ago") so the target can avoid doing
2491 * extra calls into the filesystem if that isn't necessary (e.g.
2492 * during mount that would help a bit). Having relative timestamps
2493 * is not so great if request processing is slow, while absolute
2494 * timestamps are not ideal because they need time synchronization.
2496 req
= ptlrpc_request_alloc(obd
->u
.cli
.cl_import
, &RQF_OST_STATFS
);
2500 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_STATFS
);
2502 ptlrpc_request_free(req
);
2505 ptlrpc_request_set_replen(req
);
2506 req
->rq_request_portal
= OST_CREATE_PORTAL
;
2507 ptlrpc_at_set_req_timeout(req
);
2509 if (oinfo
->oi_flags
& OBD_STATFS_NODELAY
) {
2510 /* procfs requests not want stat in wait for avoid deadlock */
2511 req
->rq_no_resend
= 1;
2512 req
->rq_no_delay
= 1;
2515 req
->rq_interpret_reply
= (ptlrpc_interpterer_t
)osc_statfs_interpret
;
2516 CLASSERT(sizeof(*aa
) <= sizeof(req
->rq_async_args
));
2517 aa
= ptlrpc_req_async_args(req
);
2520 ptlrpc_set_add_req(rqset
, req
);
2524 static int osc_statfs(const struct lu_env
*env
, struct obd_export
*exp
,
2525 struct obd_statfs
*osfs
, __u64 max_age
, __u32 flags
)
2527 struct obd_device
*obd
= class_exp2obd(exp
);
2528 struct obd_statfs
*msfs
;
2529 struct ptlrpc_request
*req
;
2530 struct obd_import
*imp
= NULL
;
2533 /* Since the request might also come from lprocfs, so we need
2534 * sync this with client_disconnect_export Bug15684
2536 down_read(&obd
->u
.cli
.cl_sem
);
2537 if (obd
->u
.cli
.cl_import
)
2538 imp
= class_import_get(obd
->u
.cli
.cl_import
);
2539 up_read(&obd
->u
.cli
.cl_sem
);
2543 /* We could possibly pass max_age in the request (as an absolute
2544 * timestamp or a "seconds.usec ago") so the target can avoid doing
2545 * extra calls into the filesystem if that isn't necessary (e.g.
2546 * during mount that would help a bit). Having relative timestamps
2547 * is not so great if request processing is slow, while absolute
2548 * timestamps are not ideal because they need time synchronization.
2550 req
= ptlrpc_request_alloc(imp
, &RQF_OST_STATFS
);
2552 class_import_put(imp
);
2557 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_STATFS
);
2559 ptlrpc_request_free(req
);
2562 ptlrpc_request_set_replen(req
);
2563 req
->rq_request_portal
= OST_CREATE_PORTAL
;
2564 ptlrpc_at_set_req_timeout(req
);
2566 if (flags
& OBD_STATFS_NODELAY
) {
2567 /* procfs requests not want stat in wait for avoid deadlock */
2568 req
->rq_no_resend
= 1;
2569 req
->rq_no_delay
= 1;
2572 rc
= ptlrpc_queue_wait(req
);
2576 msfs
= req_capsule_server_get(&req
->rq_pill
, &RMF_OBD_STATFS
);
2585 ptlrpc_req_finished(req
);
2589 /* Retrieve object striping information.
2591 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
2592 * the maximum number of OST indices which will fit in the user buffer.
2593 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2595 static int osc_getstripe(struct lov_stripe_md
*lsm
,
2596 struct lov_user_md __user
*lump
)
2598 /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2599 struct lov_user_md_v3 lum
, *lumk
;
2600 struct lov_user_ost_data_v1
*lmm_objects
;
2601 int rc
= 0, lum_size
;
2606 /* we only need the header part from user space to get lmm_magic and
2607 * lmm_stripe_count, (the header part is common to v1 and v3)
2609 lum_size
= sizeof(struct lov_user_md_v1
);
2610 if (copy_from_user(&lum
, lump
, lum_size
))
2613 if ((lum
.lmm_magic
!= LOV_USER_MAGIC_V1
) &&
2614 (lum
.lmm_magic
!= LOV_USER_MAGIC_V3
))
2617 /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2618 LASSERT(sizeof(struct lov_user_md_v1
) == sizeof(struct lov_mds_md_v1
));
2619 LASSERT(sizeof(struct lov_user_md_v3
) == sizeof(struct lov_mds_md_v3
));
2620 LASSERT(sizeof(lum
.lmm_objects
[0]) == sizeof(lumk
->lmm_objects
[0]));
2622 /* we can use lov_mds_md_size() to compute lum_size
2623 * because lov_user_md_vX and lov_mds_md_vX have the same size
2625 if (lum
.lmm_stripe_count
> 0) {
2626 lum_size
= lov_mds_md_size(lum
.lmm_stripe_count
, lum
.lmm_magic
);
2627 lumk
= kzalloc(lum_size
, GFP_NOFS
);
2631 if (lum
.lmm_magic
== LOV_USER_MAGIC_V1
)
2633 &(((struct lov_user_md_v1
*)lumk
)->lmm_objects
[0]);
2635 lmm_objects
= &(lumk
->lmm_objects
[0]);
2636 lmm_objects
->l_ost_oi
= lsm
->lsm_oi
;
2638 lum_size
= lov_mds_md_size(0, lum
.lmm_magic
);
2642 lumk
->lmm_oi
= lsm
->lsm_oi
;
2643 lumk
->lmm_stripe_count
= 1;
2645 if (copy_to_user(lump
, lumk
, lum_size
))
2654 static int osc_iocontrol(unsigned int cmd
, struct obd_export
*exp
, int len
,
2655 void *karg
, void __user
*uarg
)
2657 struct obd_device
*obd
= exp
->exp_obd
;
2658 struct obd_ioctl_data
*data
= karg
;
2661 if (!try_module_get(THIS_MODULE
)) {
2662 CERROR("%s: cannot get module '%s'\n", obd
->obd_name
,
2663 module_name(THIS_MODULE
));
2667 case OBD_IOC_LOV_GET_CONFIG
: {
2669 struct lov_desc
*desc
;
2670 struct obd_uuid uuid
;
2674 if (obd_ioctl_getdata(&buf
, &len
, uarg
)) {
2679 data
= (struct obd_ioctl_data
*)buf
;
2681 if (sizeof(*desc
) > data
->ioc_inllen1
) {
2682 obd_ioctl_freedata(buf
, len
);
2687 if (data
->ioc_inllen2
< sizeof(uuid
)) {
2688 obd_ioctl_freedata(buf
, len
);
2693 desc
= (struct lov_desc
*)data
->ioc_inlbuf1
;
2694 desc
->ld_tgt_count
= 1;
2695 desc
->ld_active_tgt_count
= 1;
2696 desc
->ld_default_stripe_count
= 1;
2697 desc
->ld_default_stripe_size
= 0;
2698 desc
->ld_default_stripe_offset
= 0;
2699 desc
->ld_pattern
= 0;
2700 memcpy(&desc
->ld_uuid
, &obd
->obd_uuid
, sizeof(uuid
));
2702 memcpy(data
->ioc_inlbuf2
, &obd
->obd_uuid
, sizeof(uuid
));
2704 err
= copy_to_user(uarg
, buf
, len
);
2707 obd_ioctl_freedata(buf
, len
);
2710 case LL_IOC_LOV_SETSTRIPE
:
2711 err
= obd_alloc_memmd(exp
, karg
);
2715 case LL_IOC_LOV_GETSTRIPE
:
2716 err
= osc_getstripe(karg
, uarg
);
2718 case OBD_IOC_CLIENT_RECOVER
:
2719 err
= ptlrpc_recover_import(obd
->u
.cli
.cl_import
,
2720 data
->ioc_inlbuf1
, 0);
2724 case IOC_OSC_SET_ACTIVE
:
2725 err
= ptlrpc_set_import_active(obd
->u
.cli
.cl_import
,
2728 case OBD_IOC_POLL_QUOTACHECK
:
2729 err
= osc_quota_poll_check(exp
, karg
);
2731 case OBD_IOC_PING_TARGET
:
2732 err
= ptlrpc_obd_ping(obd
);
2735 CDEBUG(D_INODE
, "unrecognised ioctl %#x by %s\n",
2736 cmd
, current_comm());
2741 module_put(THIS_MODULE
);
2745 static int osc_get_info(const struct lu_env
*env
, struct obd_export
*exp
,
2746 u32 keylen
, void *key
, __u32
*vallen
, void *val
,
2747 struct lov_stripe_md
*lsm
)
2749 if (!vallen
|| !val
)
2752 if (KEY_IS(KEY_LOCK_TO_STRIPE
)) {
2753 __u32
*stripe
= val
;
2754 *vallen
= sizeof(*stripe
);
2757 } else if (KEY_IS(KEY_LAST_ID
)) {
2758 struct ptlrpc_request
*req
;
2763 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
),
2764 &RQF_OST_GET_INFO_LAST_ID
);
2768 req_capsule_set_size(&req
->rq_pill
, &RMF_SETINFO_KEY
,
2769 RCL_CLIENT
, keylen
);
2770 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_GET_INFO
);
2772 ptlrpc_request_free(req
);
2776 tmp
= req_capsule_client_get(&req
->rq_pill
, &RMF_SETINFO_KEY
);
2777 memcpy(tmp
, key
, keylen
);
2779 req
->rq_no_delay
= 1;
2780 req
->rq_no_resend
= 1;
2781 ptlrpc_request_set_replen(req
);
2782 rc
= ptlrpc_queue_wait(req
);
2786 reply
= req_capsule_server_get(&req
->rq_pill
, &RMF_OBD_ID
);
2792 *((u64
*)val
) = *reply
;
2794 ptlrpc_req_finished(req
);
2796 } else if (KEY_IS(KEY_FIEMAP
)) {
2797 struct ll_fiemap_info_key
*fm_key
= key
;
2798 struct ldlm_res_id res_id
;
2799 ldlm_policy_data_t policy
;
2800 struct lustre_handle lockh
;
2801 enum ldlm_mode mode
= 0;
2802 struct ptlrpc_request
*req
;
2803 struct ll_user_fiemap
*reply
;
2807 if (!(fm_key
->fiemap
.fm_flags
& FIEMAP_FLAG_SYNC
))
2810 policy
.l_extent
.start
= fm_key
->fiemap
.fm_start
&
2813 if (OBD_OBJECT_EOF
- fm_key
->fiemap
.fm_length
<=
2814 fm_key
->fiemap
.fm_start
+ PAGE_SIZE
- 1)
2815 policy
.l_extent
.end
= OBD_OBJECT_EOF
;
2817 policy
.l_extent
.end
= (fm_key
->fiemap
.fm_start
+
2818 fm_key
->fiemap
.fm_length
+
2819 PAGE_SIZE
- 1) & PAGE_MASK
;
2821 ostid_build_res_name(&fm_key
->oa
.o_oi
, &res_id
);
2822 mode
= ldlm_lock_match(exp
->exp_obd
->obd_namespace
,
2823 LDLM_FL_BLOCK_GRANTED
|
2825 &res_id
, LDLM_EXTENT
, &policy
,
2826 LCK_PR
| LCK_PW
, &lockh
, 0);
2827 if (mode
) { /* lock is cached on client */
2828 if (mode
!= LCK_PR
) {
2829 ldlm_lock_addref(&lockh
, LCK_PR
);
2830 ldlm_lock_decref(&lockh
, LCK_PW
);
2832 } else { /* no cached lock, needs acquire lock on server side */
2833 fm_key
->oa
.o_valid
|= OBD_MD_FLFLAGS
;
2834 fm_key
->oa
.o_flags
|= OBD_FL_SRVLOCK
;
2838 req
= ptlrpc_request_alloc(class_exp2cliimp(exp
),
2839 &RQF_OST_GET_INFO_FIEMAP
);
2845 req_capsule_set_size(&req
->rq_pill
, &RMF_FIEMAP_KEY
,
2846 RCL_CLIENT
, keylen
);
2847 req_capsule_set_size(&req
->rq_pill
, &RMF_FIEMAP_VAL
,
2848 RCL_CLIENT
, *vallen
);
2849 req_capsule_set_size(&req
->rq_pill
, &RMF_FIEMAP_VAL
,
2850 RCL_SERVER
, *vallen
);
2852 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_GET_INFO
);
2854 ptlrpc_request_free(req
);
2858 tmp
= req_capsule_client_get(&req
->rq_pill
, &RMF_FIEMAP_KEY
);
2859 memcpy(tmp
, key
, keylen
);
2860 tmp
= req_capsule_client_get(&req
->rq_pill
, &RMF_FIEMAP_VAL
);
2861 memcpy(tmp
, val
, *vallen
);
2863 ptlrpc_request_set_replen(req
);
2864 rc
= ptlrpc_queue_wait(req
);
2868 reply
= req_capsule_server_get(&req
->rq_pill
, &RMF_FIEMAP_VAL
);
2874 memcpy(val
, reply
, *vallen
);
2876 ptlrpc_req_finished(req
);
2879 ldlm_lock_decref(&lockh
, LCK_PR
);
2886 static int osc_set_info_async(const struct lu_env
*env
, struct obd_export
*exp
,
2887 u32 keylen
, void *key
, u32 vallen
,
2888 void *val
, struct ptlrpc_request_set
*set
)
2890 struct ptlrpc_request
*req
;
2891 struct obd_device
*obd
= exp
->exp_obd
;
2892 struct obd_import
*imp
= class_exp2cliimp(exp
);
2896 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN
, 10);
2898 if (KEY_IS(KEY_CHECKSUM
)) {
2899 if (vallen
!= sizeof(int))
2901 exp
->exp_obd
->u
.cli
.cl_checksum
= (*(int *)val
) ? 1 : 0;
2905 if (KEY_IS(KEY_SPTLRPC_CONF
)) {
2906 sptlrpc_conf_client_adapt(obd
);
2910 if (KEY_IS(KEY_FLUSH_CTX
)) {
2911 sptlrpc_import_flush_my_ctx(imp
);
2915 if (KEY_IS(KEY_CACHE_SET
)) {
2916 struct client_obd
*cli
= &obd
->u
.cli
;
2918 LASSERT(!cli
->cl_cache
); /* only once */
2919 cli
->cl_cache
= val
;
2920 cl_cache_incref(cli
->cl_cache
);
2921 cli
->cl_lru_left
= &cli
->cl_cache
->ccc_lru_left
;
2923 /* add this osc into entity list */
2924 LASSERT(list_empty(&cli
->cl_lru_osc
));
2925 spin_lock(&cli
->cl_cache
->ccc_lru_lock
);
2926 list_add(&cli
->cl_lru_osc
, &cli
->cl_cache
->ccc_lru
);
2927 spin_unlock(&cli
->cl_cache
->ccc_lru_lock
);
2932 if (KEY_IS(KEY_CACHE_LRU_SHRINK
)) {
2933 struct client_obd
*cli
= &obd
->u
.cli
;
2934 int nr
= atomic_read(&cli
->cl_lru_in_list
) >> 1;
2935 int target
= *(int *)val
;
2937 nr
= osc_lru_shrink(env
, cli
, min(nr
, target
), true);
2942 if (!set
&& !KEY_IS(KEY_GRANT_SHRINK
))
2945 /* We pass all other commands directly to OST. Since nobody calls osc
2946 * methods directly and everybody is supposed to go through LOV, we
2947 * assume lov checked invalid values for us.
2948 * The only recognised values so far are evict_by_nid and mds_conn.
2949 * Even if something bad goes through, we'd get a -EINVAL from OST
2953 req
= ptlrpc_request_alloc(imp
, KEY_IS(KEY_GRANT_SHRINK
) ?
2954 &RQF_OST_SET_GRANT_INFO
:
2959 req_capsule_set_size(&req
->rq_pill
, &RMF_SETINFO_KEY
,
2960 RCL_CLIENT
, keylen
);
2961 if (!KEY_IS(KEY_GRANT_SHRINK
))
2962 req_capsule_set_size(&req
->rq_pill
, &RMF_SETINFO_VAL
,
2963 RCL_CLIENT
, vallen
);
2964 rc
= ptlrpc_request_pack(req
, LUSTRE_OST_VERSION
, OST_SET_INFO
);
2966 ptlrpc_request_free(req
);
2970 tmp
= req_capsule_client_get(&req
->rq_pill
, &RMF_SETINFO_KEY
);
2971 memcpy(tmp
, key
, keylen
);
2972 tmp
= req_capsule_client_get(&req
->rq_pill
, KEY_IS(KEY_GRANT_SHRINK
) ?
2975 memcpy(tmp
, val
, vallen
);
2977 if (KEY_IS(KEY_GRANT_SHRINK
)) {
2978 struct osc_brw_async_args
*aa
;
2981 CLASSERT(sizeof(*aa
) <= sizeof(req
->rq_async_args
));
2982 aa
= ptlrpc_req_async_args(req
);
2983 oa
= kmem_cache_zalloc(obdo_cachep
, GFP_NOFS
);
2985 ptlrpc_req_finished(req
);
2988 *oa
= ((struct ost_body
*)val
)->oa
;
2990 req
->rq_interpret_reply
= osc_shrink_grant_interpret
;
2993 ptlrpc_request_set_replen(req
);
2994 if (!KEY_IS(KEY_GRANT_SHRINK
)) {
2996 ptlrpc_set_add_req(set
, req
);
2997 ptlrpc_check_set(NULL
, set
);
2999 ptlrpcd_add_req(req
);
3005 static int osc_reconnect(const struct lu_env
*env
,
3006 struct obd_export
*exp
, struct obd_device
*obd
,
3007 struct obd_uuid
*cluuid
,
3008 struct obd_connect_data
*data
,
3011 struct client_obd
*cli
= &obd
->u
.cli
;
3013 if (data
&& (data
->ocd_connect_flags
& OBD_CONNECT_GRANT
)) {
3016 spin_lock(&cli
->cl_loi_list_lock
);
3017 data
->ocd_grant
= (cli
->cl_avail_grant
+ cli
->cl_dirty
) ?:
3018 2 * cli_brw_size(obd
);
3019 lost_grant
= cli
->cl_lost_grant
;
3020 cli
->cl_lost_grant
= 0;
3021 spin_unlock(&cli
->cl_loi_list_lock
);
3023 CDEBUG(D_RPCTRACE
, "ocd_connect_flags: %#llx ocd_version: %d ocd_grant: %d, lost: %ld.\n",
3024 data
->ocd_connect_flags
,
3025 data
->ocd_version
, data
->ocd_grant
, lost_grant
);
3031 static int osc_disconnect(struct obd_export
*exp
)
3033 struct obd_device
*obd
= class_exp2obd(exp
);
3036 rc
= client_disconnect_export(exp
);
3038 * Initially we put del_shrink_grant before disconnect_export, but it
3039 * causes the following problem if setup (connect) and cleanup
3040 * (disconnect) are tangled together.
3041 * connect p1 disconnect p2
3042 * ptlrpc_connect_import
3043 * ............... class_manual_cleanup
3046 * ptlrpc_connect_interrupt
3048 * add this client to shrink list
3050 * Bang! pinger trigger the shrink.
3051 * So the osc should be disconnected from the shrink list, after we
3052 * are sure the import has been destroyed. BUG18662
3054 if (!obd
->u
.cli
.cl_import
)
3055 osc_del_shrink_grant(&obd
->u
.cli
);
3059 static int osc_import_event(struct obd_device
*obd
,
3060 struct obd_import
*imp
,
3061 enum obd_import_event event
)
3063 struct client_obd
*cli
;
3066 LASSERT(imp
->imp_obd
== obd
);
3069 case IMP_EVENT_DISCON
: {
3071 spin_lock(&cli
->cl_loi_list_lock
);
3072 cli
->cl_avail_grant
= 0;
3073 cli
->cl_lost_grant
= 0;
3074 spin_unlock(&cli
->cl_loi_list_lock
);
3077 case IMP_EVENT_INACTIVE
: {
3078 rc
= obd_notify_observer(obd
, obd
, OBD_NOTIFY_INACTIVE
, NULL
);
3081 case IMP_EVENT_INVALIDATE
: {
3082 struct ldlm_namespace
*ns
= obd
->obd_namespace
;
3086 env
= cl_env_get(&refcheck
);
3090 /* all pages go to failing rpcs due to the invalid
3093 osc_io_unplug(env
, cli
, NULL
);
3095 ldlm_namespace_cleanup(ns
, LDLM_FL_LOCAL_ONLY
);
3096 cl_env_put(env
, &refcheck
);
3102 case IMP_EVENT_ACTIVE
: {
3103 rc
= obd_notify_observer(obd
, obd
, OBD_NOTIFY_ACTIVE
, NULL
);
3106 case IMP_EVENT_OCD
: {
3107 struct obd_connect_data
*ocd
= &imp
->imp_connect_data
;
3109 if (ocd
->ocd_connect_flags
& OBD_CONNECT_GRANT
)
3110 osc_init_grant(&obd
->u
.cli
, ocd
);
3113 if (ocd
->ocd_connect_flags
& OBD_CONNECT_REQPORTAL
)
3114 imp
->imp_client
->cli_request_portal
= OST_REQUEST_PORTAL
;
3116 rc
= obd_notify_observer(obd
, obd
, OBD_NOTIFY_OCD
, NULL
);
3119 case IMP_EVENT_DEACTIVATE
: {
3120 rc
= obd_notify_observer(obd
, obd
, OBD_NOTIFY_DEACTIVATE
, NULL
);
3123 case IMP_EVENT_ACTIVATE
: {
3124 rc
= obd_notify_observer(obd
, obd
, OBD_NOTIFY_ACTIVATE
, NULL
);
3128 CERROR("Unknown import event %d\n", event
);
3135 * Determine whether the lock can be canceled before replaying the lock
3136 * during recovery, see bug16774 for detailed information.
3138 * \retval zero the lock can't be canceled
3139 * \retval other ok to cancel
3141 static int osc_cancel_weight(struct ldlm_lock
*lock
)
3144 * Cancel all unused and granted extent lock.
3146 if (lock
->l_resource
->lr_type
== LDLM_EXTENT
&&
3147 lock
->l_granted_mode
== lock
->l_req_mode
&&
3148 osc_ldlm_weigh_ast(lock
) == 0)
3154 static int brw_queue_work(const struct lu_env
*env
, void *data
)
3156 struct client_obd
*cli
= data
;
3158 CDEBUG(D_CACHE
, "Run writeback work for client obd %p.\n", cli
);
3160 osc_io_unplug(env
, cli
, NULL
);
3164 int osc_setup(struct obd_device
*obd
, struct lustre_cfg
*lcfg
)
3166 struct lprocfs_static_vars lvars
= { NULL
};
3167 struct client_obd
*cli
= &obd
->u
.cli
;
3174 rc
= ptlrpcd_addref();
3178 rc
= client_obd_setup(obd
, lcfg
);
3182 handler
= ptlrpcd_alloc_work(cli
->cl_import
, brw_queue_work
, cli
);
3183 if (IS_ERR(handler
)) {
3184 rc
= PTR_ERR(handler
);
3185 goto out_client_setup
;
3187 cli
->cl_writeback_work
= handler
;
3189 handler
= ptlrpcd_alloc_work(cli
->cl_import
, lru_queue_work
, cli
);
3190 if (IS_ERR(handler
)) {
3191 rc
= PTR_ERR(handler
);
3192 goto out_ptlrpcd_work
;
3195 cli
->cl_lru_work
= handler
;
3197 rc
= osc_quota_setup(obd
);
3199 goto out_ptlrpcd_work
;
3201 cli
->cl_grant_shrink_interval
= GRANT_SHRINK_INTERVAL
;
3202 lprocfs_osc_init_vars(&lvars
);
3203 if (lprocfs_obd_setup(obd
, lvars
.obd_vars
, lvars
.sysfs_vars
) == 0) {
3204 lproc_osc_attach_seqstat(obd
);
3205 sptlrpc_lprocfs_cliobd_attach(obd
);
3206 ptlrpc_lprocfs_register_obd(obd
);
3210 * We try to control the total number of requests with a upper limit
3211 * osc_reqpool_maxreqcount. There might be some race which will cause
3212 * over-limit allocation, but it is fine.
3214 req_count
= atomic_read(&osc_pool_req_count
);
3215 if (req_count
< osc_reqpool_maxreqcount
) {
3216 adding
= cli
->cl_max_rpcs_in_flight
+ 2;
3217 if (req_count
+ adding
> osc_reqpool_maxreqcount
)
3218 adding
= osc_reqpool_maxreqcount
- req_count
;
3220 added
= ptlrpc_add_rqs_to_pool(osc_rq_pool
, adding
);
3221 atomic_add(added
, &osc_pool_req_count
);
3224 INIT_LIST_HEAD(&cli
->cl_grant_shrink_list
);
3225 ns_register_cancel(obd
->obd_namespace
, osc_cancel_weight
);
3229 if (cli
->cl_writeback_work
) {
3230 ptlrpcd_destroy_work(cli
->cl_writeback_work
);
3231 cli
->cl_writeback_work
= NULL
;
3233 if (cli
->cl_lru_work
) {
3234 ptlrpcd_destroy_work(cli
->cl_lru_work
);
3235 cli
->cl_lru_work
= NULL
;
3238 client_obd_cleanup(obd
);
3244 static int osc_precleanup(struct obd_device
*obd
, enum obd_cleanup_stage stage
)
3247 case OBD_CLEANUP_EARLY
: {
3248 struct obd_import
*imp
;
3250 imp
= obd
->u
.cli
.cl_import
;
3251 CDEBUG(D_HA
, "Deactivating import %s\n", obd
->obd_name
);
3252 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3253 ptlrpc_deactivate_import(imp
);
3254 spin_lock(&imp
->imp_lock
);
3255 imp
->imp_pingable
= 0;
3256 spin_unlock(&imp
->imp_lock
);
3259 case OBD_CLEANUP_EXPORTS
: {
3260 struct client_obd
*cli
= &obd
->u
.cli
;
3262 * for echo client, export may be on zombie list, wait for
3263 * zombie thread to cull it, because cli.cl_import will be
3264 * cleared in client_disconnect_export():
3265 * class_export_destroy() -> obd_cleanup() ->
3266 * echo_device_free() -> echo_client_cleanup() ->
3267 * obd_disconnect() -> osc_disconnect() ->
3268 * client_disconnect_export()
3270 obd_zombie_barrier();
3271 if (cli
->cl_writeback_work
) {
3272 ptlrpcd_destroy_work(cli
->cl_writeback_work
);
3273 cli
->cl_writeback_work
= NULL
;
3275 if (cli
->cl_lru_work
) {
3276 ptlrpcd_destroy_work(cli
->cl_lru_work
);
3277 cli
->cl_lru_work
= NULL
;
3279 obd_cleanup_client_import(obd
);
3280 ptlrpc_lprocfs_unregister_obd(obd
);
3281 lprocfs_obd_cleanup(obd
);
3288 static int osc_cleanup(struct obd_device
*obd
)
3290 struct client_obd
*cli
= &obd
->u
.cli
;
3294 if (cli
->cl_cache
) {
3295 LASSERT(atomic_read(&cli
->cl_cache
->ccc_users
) > 0);
3296 spin_lock(&cli
->cl_cache
->ccc_lru_lock
);
3297 list_del_init(&cli
->cl_lru_osc
);
3298 spin_unlock(&cli
->cl_cache
->ccc_lru_lock
);
3299 cli
->cl_lru_left
= NULL
;
3300 cl_cache_decref(cli
->cl_cache
);
3301 cli
->cl_cache
= NULL
;
3304 /* free memory of osc quota cache */
3305 osc_quota_cleanup(obd
);
3307 rc
= client_obd_cleanup(obd
);
3313 int osc_process_config_base(struct obd_device
*obd
, struct lustre_cfg
*lcfg
)
3315 struct lprocfs_static_vars lvars
= { NULL
};
3318 lprocfs_osc_init_vars(&lvars
);
3320 switch (lcfg
->lcfg_command
) {
3322 rc
= class_process_proc_param(PARAM_OSC
, lvars
.obd_vars
,
3332 static int osc_process_config(struct obd_device
*obd
, u32 len
, void *buf
)
3334 return osc_process_config_base(obd
, buf
);
3337 static struct obd_ops osc_obd_ops
= {
3338 .owner
= THIS_MODULE
,
3340 .precleanup
= osc_precleanup
,
3341 .cleanup
= osc_cleanup
,
3342 .add_conn
= client_import_add_conn
,
3343 .del_conn
= client_import_del_conn
,
3344 .connect
= client_connect_import
,
3345 .reconnect
= osc_reconnect
,
3346 .disconnect
= osc_disconnect
,
3347 .statfs
= osc_statfs
,
3348 .statfs_async
= osc_statfs_async
,
3349 .packmd
= osc_packmd
,
3350 .unpackmd
= osc_unpackmd
,
3351 .create
= osc_create
,
3352 .destroy
= osc_destroy
,
3353 .getattr
= osc_getattr
,
3354 .getattr_async
= osc_getattr_async
,
3355 .setattr
= osc_setattr
,
3356 .setattr_async
= osc_setattr_async
,
3357 .find_cbdata
= osc_find_cbdata
,
3358 .iocontrol
= osc_iocontrol
,
3359 .get_info
= osc_get_info
,
3360 .set_info_async
= osc_set_info_async
,
3361 .import_event
= osc_import_event
,
3362 .process_config
= osc_process_config
,
3363 .quotactl
= osc_quotactl
,
3364 .quotacheck
= osc_quotacheck
,
3367 extern struct lu_kmem_descr osc_caches
[];
3368 extern struct lock_class_key osc_ast_guard_class
;
3370 static int __init
osc_init(void)
3372 struct lprocfs_static_vars lvars
= { NULL
};
3373 unsigned int reqpool_size
;
3374 unsigned int reqsize
;
3377 /* print an address of _any_ initialized kernel symbol from this
3378 * module, to allow debugging with gdb that doesn't support data
3379 * symbols from modules.
3381 CDEBUG(D_INFO
, "Lustre OSC module (%p).\n", &osc_caches
);
3383 rc
= lu_kmem_init(osc_caches
);
3387 lprocfs_osc_init_vars(&lvars
);
3389 rc
= class_register_type(&osc_obd_ops
, NULL
,
3390 LUSTRE_OSC_NAME
, &osc_device_type
);
3394 /* This is obviously too much memory, only prevent overflow here */
3395 if (osc_reqpool_mem_max
>= 1 << 12 || osc_reqpool_mem_max
== 0) {
3400 reqpool_size
= osc_reqpool_mem_max
<< 20;
3403 while (reqsize
< OST_MAXREQSIZE
)
3404 reqsize
= reqsize
<< 1;
3407 * We don't enlarge the request count in OSC pool according to
3408 * cl_max_rpcs_in_flight. The allocation from the pool will only be
3409 * tried after normal allocation failed. So a small OSC pool won't
3410 * cause much performance degression in most of cases.
3412 osc_reqpool_maxreqcount
= reqpool_size
/ reqsize
;
3414 atomic_set(&osc_pool_req_count
, 0);
3415 osc_rq_pool
= ptlrpc_init_rq_pool(0, OST_MAXREQSIZE
,
3416 ptlrpc_add_rqs_to_pool
);
3424 class_unregister_type(LUSTRE_OSC_NAME
);
3426 lu_kmem_fini(osc_caches
);
3430 static void /*__exit*/ osc_exit(void)
3432 class_unregister_type(LUSTRE_OSC_NAME
);
3433 lu_kmem_fini(osc_caches
);
3434 ptlrpc_free_rq_pool(osc_rq_pool
);
3437 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3438 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3439 MODULE_LICENSE("GPL");
3440 MODULE_VERSION(LUSTRE_VERSION_STRING
);
3442 module_init(osc_init
);
3443 module_exit(osc_exit
);