/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2015, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include "../../include/linux/libcfs/libcfs.h"

#include "../include/lustre_dlm.h"
#include "../include/lustre_net.h"
#include "../include/lustre/lustre_user.h"
#include "../include/obd_cksum.h"

#include "../include/lustre_ha.h"
#include "../include/lprocfs_status.h"
#include "../include/lustre/lustre_ioctl.h"
#include "../include/lustre_debug.h"
#include "../include/lustre_param.h"
#include "../include/lustre_fid.h"
#include "../include/obd_class.h"
#include "../include/obd.h"
#include "osc_internal.h"
#include "osc_cl_internal.h"

atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
struct ptlrpc_request_pool *osc_rq_pool;

/* max memory used for request pool, unit is MB */
static unsigned int osc_reqpool_mem_max = 5;
module_param(osc_reqpool_mem_max, uint, 0444);

struct osc_brw_async_args {
	struct obdo *aa_oa;
	int aa_requested_nob;
	int aa_nio_count;
	u32 aa_page_count;
	int aa_resends;
	struct brw_page **aa_ppga;
	struct client_obd *aa_cli;
	struct list_head aa_oaps;
	struct list_head aa_exts;
	struct cl_req *aa_clerq;
};

struct osc_async_args {
	struct obd_info *aa_oi;
};

struct osc_setattr_args {
	struct obdo *sa_oa;
	obd_enqueue_update_f sa_upcall;
	void *sa_cookie;
};

struct osc_fsync_args {
	struct obd_info *fa_oi;
	obd_enqueue_update_f fa_upcall;
	void *fa_cookie;
};

struct osc_enqueue_args {
	struct obd_export *oa_exp;
	enum ldlm_type oa_type;
	enum ldlm_mode oa_mode;
	__u64 *oa_flags;
	osc_enqueue_upcall_f oa_upcall;
	void *oa_cookie;
	struct ost_lvb *oa_lvb;
	struct lustre_handle oa_lockh;
	unsigned int oa_agl:1;
};

static void osc_release_ppga(struct brw_page **ppga, u32 count);
static int brw_interpret(const struct lu_env *env,
			 struct ptlrpc_request *req, void *data, int rc);

/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
		      struct lov_stripe_md *lsm)
{
	int lmm_size;

	lmm_size = sizeof(**lmmp);
	if (!lmmp)
		return lmm_size;

	if (*lmmp && !lsm) {
		kfree(*lmmp);
		*lmmp = NULL;
		return 0;
	} else if (unlikely(lsm && ostid_id(&lsm->lsm_oi) == 0)) {
		return -EBADF;
	}

	if (!*lmmp) {
		*lmmp = kzalloc(lmm_size, GFP_NOFS);
		if (!*lmmp)
			return -ENOMEM;
	}

	if (lsm)
		ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);

	return lmm_size;
}

/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
			struct lov_mds_md *lmm, int lmm_bytes)
{
	int lsm_size;
	struct obd_import *imp = class_exp2cliimp(exp);

	if (lmm) {
		if (lmm_bytes < sizeof(*lmm)) {
			CERROR("%s: lov_mds_md too small: %d, need %d\n",
			       exp->exp_obd->obd_name, lmm_bytes,
			       (int)sizeof(*lmm));
			return -EINVAL;
		}
		/* XXX LOV_MAGIC etc check? */

		if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
			CERROR("%s: zero lmm_object_id: rc = %d\n",
			       exp->exp_obd->obd_name, -EINVAL);
			return -EINVAL;
		}
	}

	lsm_size = lov_stripe_md_size(1);
	if (!lsmp)
		return lsm_size;

	if (*lsmp && !lmm) {
		kfree((*lsmp)->lsm_oinfo[0]);
		kfree(*lsmp);
		*lsmp = NULL;
		return 0;
	}

	if (!*lsmp) {
		*lsmp = kzalloc(lsm_size, GFP_NOFS);
		if (unlikely(!*lsmp))
			return -ENOMEM;
		(*lsmp)->lsm_oinfo[0] = kzalloc(sizeof(struct lov_oinfo),
						GFP_NOFS);
		if (unlikely(!(*lsmp)->lsm_oinfo[0])) {
			kfree(*lsmp);
			return -ENOMEM;
		}
		loi_init((*lsmp)->lsm_oinfo[0]);
	} else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
		return -EBADF;
	}

	if (lmm)
		/* XXX zero *lsmp? */
		ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);

	if (imp &&
	    (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
		(*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
	else
		(*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

	return lsm_size;
}

static inline void osc_pack_req_body(struct ptlrpc_request *req,
				     struct obd_info *oinfo)
{
	struct ost_body *body;

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
			     oinfo->oi_oa);
}

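/*
 * Reply interpreter for an async OST_GETATTR: unpack the ost_body from
 * the reply, copy the returned attributes into the caller's obdo and
 * invoke the oi_cb_up completion callback with the final status.
 */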
static int osc_getattr_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req,
				 struct osc_async_args *aa, int rc)
{
	struct ost_body *body;

	if (rc != 0)
		goto out;

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body) {
		CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
		lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
				     aa->aa_oi->oi_oa, &body->oa);

		/* This should really be sent by the OST */
		aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
		aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
	} else {
		CDEBUG(D_INFO, "can't unpack ost_body\n");
		rc = -EPROTO;
		aa->aa_oi->oi_oa->o_valid = 0;
	}
out:
	rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
	return rc;
}

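/* Queue an OST_GETATTR RPC on @set; the reply is handled asynchronously
 * by osc_getattr_interpret().
 */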
static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
			     struct ptlrpc_request_set *set)
{
	struct ptlrpc_request *req;
	struct osc_async_args *aa;
	int rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
	if (!req)
		return -ENOMEM;

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	osc_pack_req_body(req, oinfo);

	ptlrpc_request_set_replen(req);
	req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	aa->aa_oi = oinfo;

	ptlrpc_set_add_req(set, req);
	return 0;
}

static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
		       struct obd_info *oinfo)
{
	struct ptlrpc_request *req;
	struct ost_body *body;
	int rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
	if (!req)
		return -ENOMEM;

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	osc_pack_req_body(req, oinfo);

	ptlrpc_request_set_replen(req);

	rc = ptlrpc_queue_wait(req);
	if (rc)
		goto out;

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (!body) {
		rc = -EPROTO;
		goto out;
	}

	CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
			     &body->oa);

	oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
	oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

out:
	ptlrpc_req_finished(req);
	return rc;
}

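/* Synchronous OST_SETATTR: pack the attributes from @oinfo, wait for the
 * reply and copy the server's view of the obdo back to the caller.
 */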
static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
		       struct obd_info *oinfo, struct obd_trans_info *oti)
{
	struct ptlrpc_request *req;
	struct ost_body *body;
	int rc;

	LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
	if (!req)
		return -ENOMEM;

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	osc_pack_req_body(req, oinfo);

	ptlrpc_request_set_replen(req);

	rc = ptlrpc_queue_wait(req);
	if (rc)
		goto out;

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (!body) {
		rc = -EPROTO;
		goto out;
	}

	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
			     &body->oa);

out:
	ptlrpc_req_finished(req);
	return rc;
}

static int osc_setattr_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req,
				 struct osc_setattr_args *sa, int rc)
{
	struct ost_body *body;

	if (rc != 0)
		goto out;

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (!body) {
		rc = -EPROTO;
		goto out;
	}

	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
			     &body->oa);
out:
	rc = sa->sa_upcall(sa->sa_cookie, rc);
	return rc;
}

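/* Issue an OST_SETATTR without blocking: if @rqset is NULL or PTLRPCD_SET
 * the request is handed to ptlrpcd, otherwise it is added to @rqset;
 * @upcall(@cookie, rc) runs from osc_setattr_interpret() on completion.
 */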
int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
			   struct obd_trans_info *oti,
			   obd_enqueue_update_f upcall, void *cookie,
			   struct ptlrpc_request_set *rqset)
{
	struct ptlrpc_request *req;
	struct osc_setattr_args *sa;
	int rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
	if (!req)
		return -ENOMEM;

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
		oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

	osc_pack_req_body(req, oinfo);

	ptlrpc_request_set_replen(req);

	/* do mds to ost setattr asynchronously */
	if (!rqset) {
		/* Do not wait for response. */
		ptlrpcd_add_req(req);
	} else {
		req->rq_interpret_reply =
			(ptlrpc_interpterer_t)osc_setattr_interpret;

		CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
		sa = ptlrpc_req_async_args(req);
		sa->sa_oa = oinfo->oi_oa;
		sa->sa_upcall = upcall;
		sa->sa_cookie = cookie;

		if (rqset == PTLRPCD_SET)
			ptlrpcd_add_req(req);
		else
			ptlrpc_set_add_req(rqset, req);
	}

	return 0;
}

static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
			     struct obd_trans_info *oti,
			     struct ptlrpc_request_set *rqset)
{
	return osc_setattr_async_base(exp, oinfo, oti,
				      oinfo->oi_cb_up, oinfo, rqset);
}

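/* Synchronously create an OST object for @oa, allocating the striping
 * metadata first if the caller did not pass one in via @ea.
 */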
static int osc_real_create(struct obd_export *exp, struct obdo *oa,
			   struct lov_stripe_md **ea,
			   struct obd_trans_info *oti)
{
	struct ptlrpc_request *req;
	struct ost_body *body;
	struct lov_stripe_md *lsm;
	int rc;

	LASSERT(oa);
	LASSERT(ea);

	lsm = *ea;
	if (!lsm) {
		rc = obd_alloc_memmd(exp, &lsm);
		if (rc < 0)
			return rc;
	}

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
	if (!req) {
		rc = -ENOMEM;
		goto out;
	}

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
	if (rc) {
		ptlrpc_request_free(req);
		goto out;
	}

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	ptlrpc_request_set_replen(req);

	if ((oa->o_valid & OBD_MD_FLFLAGS) &&
	    oa->o_flags == OBD_FL_DELORPHAN) {
		DEBUG_REQ(D_HA, req,
			  "delorphan from OST integration");
		/* Don't resend the delorphan req */
		req->rq_no_resend = 1;
		req->rq_no_delay = 1;
	}

	rc = ptlrpc_queue_wait(req);
	if (rc)
		goto out_req;

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (!body) {
		rc = -EPROTO;
		goto out_req;
	}

	CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

	oa->o_blksize = cli_brw_size(exp->exp_obd);
	oa->o_valid |= OBD_MD_FLBLKSZ;

	/* XXX LOV STACKING: the lsm that is passed to us from LOV does not
	 * have valid lsm_oinfo data structs, so don't go touching that.
	 * This needs to be fixed in a big way.
	 */
	lsm->lsm_oi = oa->o_oi;
	*ea = lsm;

	if (oti && oa->o_valid & OBD_MD_FLCOOKIE) {
		if (!oti->oti_logcookies)
			oti->oti_logcookies = &oti->oti_onecookie;
		*oti->oti_logcookies = oa->o_lcookie;
	}

	CDEBUG(D_HA, "transno: %lld\n",
	       lustre_msg_get_transno(req->rq_repmsg));
out_req:
	ptlrpc_req_finished(req);
out:
	if (rc && !*ea)
		obd_free_memmd(exp, &lsm);
	return rc;
}

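/* Send an OST_PUNCH (truncate) request; completion is reported through
 * osc_setattr_interpret(), which reuses struct osc_setattr_args.
 */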
int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
		   obd_enqueue_update_f upcall, void *cookie,
		   struct ptlrpc_request_set *rqset)
{
	struct ptlrpc_request *req;
	struct osc_setattr_args *sa;
	struct ost_body *body;
	int rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
	if (!req)
		return -ENOMEM;

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}
	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
	ptlrpc_at_set_req_timeout(req);

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
			     oinfo->oi_oa);

	ptlrpc_request_set_replen(req);

	req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
	CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
	sa = ptlrpc_req_async_args(req);
	sa->sa_oa = oinfo->oi_oa;
	sa->sa_upcall = upcall;
	sa->sa_cookie = cookie;
	if (rqset == PTLRPCD_SET)
		ptlrpcd_add_req(req);
	else
		ptlrpc_set_add_req(rqset, req);

	return 0;
}

static int osc_sync_interpret(const struct lu_env *env,
			      struct ptlrpc_request *req,
			      void *arg, int rc)
{
	struct osc_fsync_args *fa = arg;
	struct ost_body *body;

	if (rc)
		goto out;

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (!body) {
		CERROR("can't unpack ost_body\n");
		rc = -EPROTO;
		goto out;
	}

	*fa->fa_oi->oi_oa = body->oa;
out:
	rc = fa->fa_upcall(fa->fa_cookie, rc);
	return rc;
}

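/* Send an OST_SYNC request for the range packed into the size and blocks
 * fields of @oinfo->oi_oa; @upcall(@cookie, rc) runs when the reply
 * arrives, via osc_sync_interpret().
 */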
int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
		  obd_enqueue_update_f upcall, void *cookie,
		  struct ptlrpc_request_set *rqset)
{
	struct ptlrpc_request *req;
	struct ost_body *body;
	struct osc_fsync_args *fa;
	int rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
	if (!req)
		return -ENOMEM;

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	/* overload the size and blocks fields in the oa with start/end */
	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
			     oinfo->oi_oa);

	ptlrpc_request_set_replen(req);
	req->rq_interpret_reply = osc_sync_interpret;

	CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
	fa = ptlrpc_req_async_args(req);
	fa->fa_oi = oinfo;
	fa->fa_upcall = upcall;
	fa->fa_cookie = cookie;

	if (rqset == PTLRPCD_SET)
		ptlrpcd_add_req(req);
	else
		ptlrpc_set_add_req(rqset, req);

	return 0;
}

/* Find and cancel locally locks matched by @mode in the resource found by
 * @objid. Found locks are added into @cancel list. Returns the amount of
 * locks added to @cancels list.
 */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
				   struct list_head *cancels,
				   enum ldlm_mode mode, __u64 lock_flags)
{
	struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
	struct ldlm_res_id res_id;
	struct ldlm_resource *res;
	int count;

	/* Return, i.e. cancel nothing, only if ELC is supported (flag in
	 * export) but disabled through procfs (flag in NS).
	 *
	 * This distinguishes from a case when ELC is not supported originally,
	 * when we still want to cancel locks in advance and just cancel them
	 * locally, without sending any RPC.
	 */
	if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
		return 0;

	ostid_build_res_name(&oa->o_oi, &res_id);
	res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
	if (IS_ERR(res))
		return 0;

	LDLM_RESOURCE_ADDREF(res);
	count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
					   lock_flags, 0, NULL);
	LDLM_RESOURCE_DELREF(res);
	ldlm_resource_putref(res);
	return count;
}

static int osc_destroy_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req, void *data,
				 int rc)
{
	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

	atomic_dec(&cli->cl_destroy_in_flight);
	wake_up(&cli->cl_destroy_waitq);
	return 0;
}

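/* Throttle destroy RPCs to cl_max_rpcs_in_flight: returns 1 if another
 * OST_DESTROY may be sent now, 0 otherwise (undoing its own increment and
 * waking a waiter if the in-flight count dropped in between).
 */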
static int osc_can_send_destroy(struct client_obd *cli)
{
	if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
	    cli->cl_max_rpcs_in_flight) {
		/* The destroy request can be sent */
		return 1;
	}
	if (atomic_dec_return(&cli->cl_destroy_in_flight) <
	    cli->cl_max_rpcs_in_flight) {
		/*
		 * The counter has been modified between the two atomic
		 * operations.
		 */
		wake_up(&cli->cl_destroy_waitq);
	}
	return 0;
}

static int osc_create(const struct lu_env *env, struct obd_export *exp,
		      struct obdo *oa, struct lov_stripe_md **ea,
		      struct obd_trans_info *oti)
{
	int rc = 0;

	LASSERT(oa);
	LASSERT(ea);
	LASSERT(oa->o_valid & OBD_MD_FLGROUP);

	if ((oa->o_valid & OBD_MD_FLFLAGS) &&
	    oa->o_flags == OBD_FL_RECREATE_OBJS) {
		return osc_real_create(exp, oa, ea, oti);
	}

	if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
		return osc_real_create(exp, oa, ea, oti);

	/* we should not get here anymore */
	LBUG();

	return rc;
}

/* Destroy requests can be async always on the client, and we don't even really
 * care about the return code since the client cannot do anything at all about
 * a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then sends the log cancellation
 * cookies to the MDS after committing destroy transactions.
 */
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
		       struct obdo *oa, struct lov_stripe_md *ea,
		       struct obd_trans_info *oti, struct obd_export *md_export)
{
	struct client_obd *cli = &exp->exp_obd->u.cli;
	struct ptlrpc_request *req;
	struct ost_body *body;
	LIST_HEAD(cancels);
	int rc, count;

	if (!oa) {
		CDEBUG(D_INFO, "oa NULL\n");
		return -EINVAL;
	}

	count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
					LDLM_FL_DISCARD_DATA);

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
	if (!req) {
		ldlm_lock_list_put(&cancels, l_bl_ast, count);
		return -ENOMEM;
	}

	rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
			       0, &cancels, count);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
	ptlrpc_at_set_req_timeout(req);

	if (oti && oa->o_valid & OBD_MD_FLCOOKIE)
		oa->o_lcookie = *oti->oti_logcookies;
	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	ptlrpc_request_set_replen(req);

	/* If osc_destroy is for destroying the unlink orphan,
	 * sent from MDT to OST, it should not be blocked here,
	 * because the process might be triggered by ptlrpcd, and
	 * it is not good to block the ptlrpcd thread (b=16006)
	 */
	if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
		req->rq_interpret_reply = osc_destroy_interpret;
		if (!osc_can_send_destroy(cli)) {
			struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
							  NULL);

			/*
			 * Wait until the number of on-going destroy RPCs drops
			 * under max_rpc_in_flight
			 */
			l_wait_event_exclusive(cli->cl_destroy_waitq,
					       osc_can_send_destroy(cli), &lwi);
		}
	}

	/* Do not wait for response */
	ptlrpcd_add_req(req);
	return 0;
}

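/* Advertise the client's dirty-cache and grant state in @oa so that each
 * RPC piggy-backs the accounting the server needs for grant management;
 * inconsistent counters are clamped and logged rather than trusted.
 */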
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
				long writing_bytes)
{
	u32 bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

	LASSERT(!(oa->o_valid & bits));

	oa->o_valid |= bits;
	spin_lock(&cli->cl_loi_list_lock);
	oa->o_dirty = cli->cl_dirty_pages << PAGE_SHIFT;
	if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
		     cli->cl_dirty_max_pages)) {
		CERROR("dirty %lu - %lu > dirty_max %lu\n",
		       cli->cl_dirty_pages, cli->cl_dirty_transit,
		       cli->cl_dirty_max_pages);
		oa->o_undirty = 0;
	} else if (unlikely(atomic_read(&obd_dirty_pages) -
			    atomic_read(&obd_dirty_transit_pages) >
			    (long)(obd_max_dirty_pages + 1))) {
		/* The atomic_read() allowing the atomic_inc() are
		 * not covered by a lock thus they may safely race and trip
		 * this CERROR() unless we add in a small fudge factor (+1).
		 */
		CERROR("%s: dirty %d + %d > system dirty_max %d\n",
		       cli->cl_import->imp_obd->obd_name,
		       atomic_read(&obd_dirty_pages),
		       atomic_read(&obd_dirty_transit_pages),
		       obd_max_dirty_pages);
		oa->o_undirty = 0;
	} else if (unlikely(cli->cl_dirty_max_pages - cli->cl_dirty_pages >
			    0x7fffffff)) {
		CERROR("dirty %lu - dirty_max %lu too big???\n",
		       cli->cl_dirty_pages, cli->cl_dirty_max_pages);
		oa->o_undirty = 0;
	} else {
		long max_in_flight = (cli->cl_max_pages_per_rpc <<
				      PAGE_SHIFT) *
				     (cli->cl_max_rpcs_in_flight + 1);
		oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_SHIFT,
				    max_in_flight);
	}
	oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
	oa->o_dropped = cli->cl_lost_grant;
	cli->cl_lost_grant = 0;
	spin_unlock(&cli->cl_loi_list_lock);
	CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
	       oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}

void osc_update_next_shrink(struct client_obd *cli)
{
	cli->cl_next_shrink_grant =
		cfs_time_shift(cli->cl_grant_shrink_interval);
	CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
	       cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
	spin_lock(&cli->cl_loi_list_lock);
	cli->cl_avail_grant += grant;
	spin_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
	if (body->oa.o_valid & OBD_MD_FLGRANT) {
		CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
		__osc_update_grant(cli, body->oa.o_grant);
	}
}

static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
			      u32 keylen, void *key, u32 vallen,
			      void *val, struct ptlrpc_request_set *set);

static int osc_shrink_grant_interpret(const struct lu_env *env,
				      struct ptlrpc_request *req,
				      void *aa, int rc)
{
	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
	struct obdo *oa = ((struct osc_brw_async_args *)aa)->aa_oa;
	struct ost_body *body;

	if (rc != 0) {
		__osc_update_grant(cli, oa->o_grant);
		goto out;
	}

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	osc_update_grant(cli, body);
out:
	kmem_cache_free(obdo_cachep, oa);
	return rc;
}

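/* Give a quarter of the currently available grant back to the server by
 * packing it into @oa before the next RPC goes out.
 */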
static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
	spin_lock(&cli->cl_loi_list_lock);
	oa->o_grant = cli->cl_avail_grant / 4;
	cli->cl_avail_grant -= oa->o_grant;
	spin_unlock(&cli->cl_loi_list_lock);
	if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
		oa->o_valid |= OBD_MD_FLFLAGS;
		oa->o_flags = 0;
	}
	oa->o_flags |= OBD_FL_SHRINK_GRANT;
	osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC. This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal.
 */
static int osc_shrink_grant(struct client_obd *cli)
{
	__u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
			     (cli->cl_max_pages_per_rpc << PAGE_SHIFT);

	spin_lock(&cli->cl_loi_list_lock);
	if (cli->cl_avail_grant <= target_bytes)
		target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;
	spin_unlock(&cli->cl_loi_list_lock);

	return osc_shrink_grant_to_target(cli, target_bytes);
}

int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
	int rc = 0;
	struct ost_body *body;

	spin_lock(&cli->cl_loi_list_lock);
	/* Don't shrink if we are already above or below the desired limit
	 * We don't want to shrink below a single RPC, as that will negatively
	 * impact block allocation and long-term performance.
	 */
	if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_SHIFT)
		target_bytes = cli->cl_max_pages_per_rpc << PAGE_SHIFT;

	if (target_bytes >= cli->cl_avail_grant) {
		spin_unlock(&cli->cl_loi_list_lock);
		return 0;
	}
	spin_unlock(&cli->cl_loi_list_lock);

	body = kzalloc(sizeof(*body), GFP_NOFS);
	if (!body)
		return -ENOMEM;

	osc_announce_cached(cli, &body->oa, 0);

	spin_lock(&cli->cl_loi_list_lock);
	body->oa.o_grant = cli->cl_avail_grant - target_bytes;
	cli->cl_avail_grant = target_bytes;
	spin_unlock(&cli->cl_loi_list_lock);
	if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
		body->oa.o_valid |= OBD_MD_FLFLAGS;
		body->oa.o_flags = 0;
	}
	body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
	osc_update_next_shrink(cli);

	rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
				sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
				sizeof(*body), body, NULL);
	if (rc != 0)
		__osc_update_grant(cli, body->oa.o_grant);
	kfree(body);
	return rc;
}

static int osc_should_shrink_grant(struct client_obd *client)
{
	unsigned long time = cfs_time_current();
	unsigned long next_shrink = client->cl_next_shrink_grant;

	if ((client->cl_import->imp_connect_data.ocd_connect_flags &
	     OBD_CONNECT_GRANT_SHRINK) == 0)
		return 0;

	if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
		/* Get the current RPC size directly, instead of going via:
		 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
		 * Keep comment here so that it can be found by searching.
		 */
		int brw_size = client->cl_max_pages_per_rpc << PAGE_SHIFT;

		if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
		    client->cl_avail_grant > brw_size)
			return 1;

		osc_update_next_shrink(client);
	}
	return 0;
}

static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
	struct client_obd *client;

	list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
		if (osc_should_shrink_grant(client))
			osc_shrink_grant(client);
	}
	return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
	int rc;

	rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
				       TIMEOUT_GRANT,
				       osc_grant_shrink_grant_cb, NULL,
				       &client->cl_grant_shrink_list);
	if (rc) {
		CERROR("add grant client %s error %d\n",
		       client->cl_import->imp_obd->obd_name, rc);
		return rc;
	}
	CDEBUG(D_CACHE, "add grant client %s\n",
	       client->cl_import->imp_obd->obd_name);
	osc_update_next_shrink(client);
	return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
	return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
					 TIMEOUT_GRANT);
}

static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
	/*
	 * ocd_grant is the total grant amount we're expect to hold: if we've
	 * been evicted, it's the new avail_grant amount, cl_dirty_pages will
	 * drop to 0 as inflight RPCs fail out; otherwise, it's avail_grant +
	 * dirty.
	 *
	 * race is tolerable here: if we're evicted, but imp_state already
	 * left EVICTED state, then cl_dirty_pages must be 0 already.
	 */
	spin_lock(&cli->cl_loi_list_lock);
	if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
		cli->cl_avail_grant = ocd->ocd_grant;
	else
		cli->cl_avail_grant = ocd->ocd_grant -
				      (cli->cl_dirty_pages << PAGE_SHIFT);

	if (cli->cl_avail_grant < 0) {
		CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
		      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
		      ocd->ocd_grant, cli->cl_dirty_pages << PAGE_SHIFT);
		/* workaround for servers which do not have the patch from
		 * LU-2679
		 */
		cli->cl_avail_grant = ocd->ocd_grant;
	}

	/* determine the appropriate chunk size used by osc_extent. */
	cli->cl_chunkbits = max_t(int, PAGE_SHIFT, ocd->ocd_blocksize);
	spin_unlock(&cli->cl_loi_list_lock);

	CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld chunk bits: %d\n",
	       cli->cl_import->imp_obd->obd_name,
	       cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);

	if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
	    list_empty(&cli->cl_grant_shrink_list))
		osc_add_shrink_grant(cli);
}

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet.
 */
static void handle_short_read(int nob_read, u32 page_count,
			      struct brw_page **pga)
{
	char *ptr;
	int i = 0;

	/* skip bytes read OK */
	while (nob_read > 0) {
		LASSERT(page_count > 0);

		if (pga[i]->count > nob_read) {
			/* EOF inside this page */
			ptr = kmap(pga[i]->pg) +
			      (pga[i]->off & ~PAGE_MASK);
			memset(ptr + nob_read, 0, pga[i]->count - nob_read);
			kunmap(pga[i]->pg);
			page_count--;
			i++;
			break;
		}

		nob_read -= pga[i]->count;
		page_count--;
		i++;
	}

	/* zero remaining pages */
	while (page_count-- > 0) {
		ptr = kmap(pga[i]->pg) + (pga[i]->off & ~PAGE_MASK);
		memset(ptr, 0, pga[i]->count);
		kunmap(pga[i]->pg);
		i++;
	}
}

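/* Verify the per-niobuf return codes and the transferred byte count of a
 * BRW_WRITE reply; any inconsistency is treated as a protocol error.
 */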
static int check_write_rcs(struct ptlrpc_request *req,
			   int requested_nob, int niocount,
			   u32 page_count, struct brw_page **pga)
{
	int i;
	__u32 *remote_rcs;

	remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
						  sizeof(*remote_rcs) *
						  niocount);
	if (!remote_rcs) {
		CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
		return -EPROTO;
	}

	/* return error if any niobuf was in error */
	for (i = 0; i < niocount; i++) {
		if ((int)remote_rcs[i] < 0)
			return remote_rcs[i];

		if (remote_rcs[i] != 0) {
			CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
			       i, remote_rcs[i], req);
			return -EPROTO;
		}
	}

	if (req->rq_bulk->bd_nob_transferred != requested_nob) {
		CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
		       req->rq_bulk->bd_nob_transferred, requested_nob);
		return -EPROTO;
	}

	return 0;
}

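/* Two brw_pages can share one remote niobuf only if they are contiguous
 * in file offset and carry compatible OBD_BRW_* flags.
 */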
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
	if (p1->flag != p2->flag) {
		unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
				  OBD_BRW_SYNC | OBD_BRW_ASYNC |
				  OBD_BRW_NOQUOTA | OBD_BRW_SOFT_SYNC);

		/* warn if we try to combine flags that we don't know to be
		 * safe to combine
		 */
		if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
			CWARN("Saw flags 0x%x and 0x%x in the same brw, please report this at http://bugs.whamcloud.com/\n",
			      p1->flag, p2->flag);
		}
		return 0;
	}

	return (p1->off + p1->count == p2->off);
}

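/* Compute the bulk checksum over the first @nob bytes of @pga using the
 * algorithm selected by @cksum_type; OBD_FAIL hooks can deliberately
 * corrupt the data (reads) or the checksum (writes) for testing.
 */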
static u32 osc_checksum_bulk(int nob, u32 pg_count,
			     struct brw_page **pga, int opc,
			     enum cksum_type cksum_type)
{
	__u32 cksum;
	int i = 0;
	struct cfs_crypto_hash_desc *hdesc;
	unsigned int bufsize;
	int err;
	unsigned char cfs_alg = cksum_obd2cfs(cksum_type);

	LASSERT(pg_count > 0);

	hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
	if (IS_ERR(hdesc)) {
		CERROR("Unable to initialize checksum hash %s\n",
		       cfs_crypto_hash_name(cfs_alg));
		return PTR_ERR(hdesc);
	}

	while (nob > 0 && pg_count > 0) {
		int count = pga[i]->count > nob ? nob : pga[i]->count;

		/* corrupt the data before we compute the checksum, to
		 * simulate an OST->client data error
		 */
		if (i == 0 && opc == OST_READ &&
		    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
			unsigned char *ptr = kmap(pga[i]->pg);
			int off = pga[i]->off & ~PAGE_MASK;

			memcpy(ptr + off, "bad1", min(4, nob));
			kunmap(pga[i]->pg);
		}
		cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
					    pga[i]->off & ~PAGE_MASK,
					    count);
		CDEBUG(D_PAGE,
		       "page %p map %p index %lu flags %lx count %u priv %0lx: off %d\n",
		       pga[i]->pg, pga[i]->pg->mapping, pga[i]->pg->index,
		       (long)pga[i]->pg->flags, page_count(pga[i]->pg),
		       page_private(pga[i]->pg),
		       (int)(pga[i]->off & ~PAGE_MASK));

		nob -= pga[i]->count;
		pg_count--;
		i++;
	}

	bufsize = sizeof(cksum);
	err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

	/* For sending we only compute the wrong checksum instead
	 * of corrupting the data so it is still correct on a redo
	 */
	if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
		cksum++;

	return cksum;
}

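/* Build an OST_READ/OST_WRITE RPC for @page_count pages: allocate the
 * request (from osc_rq_pool for writes), attach the bulk descriptor,
 * merge contiguous pages into niobufs, checksum the data if enabled and
 * stash the I/O state in osc_brw_async_args for the interpret callback.
 */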
static int osc_brw_prep_request(int cmd, struct client_obd *cli,
				struct obdo *oa,
				struct lov_stripe_md *lsm, u32 page_count,
				struct brw_page **pga,
				struct ptlrpc_request **reqp,
				int reserve,
				int resend)
{
	struct ptlrpc_request *req;
	struct ptlrpc_bulk_desc *desc;
	struct ost_body *body;
	struct obd_ioobj *ioobj;
	struct niobuf_remote *niobuf;
	int niocount, i, requested_nob, opc, rc;
	struct osc_brw_async_args *aa;
	struct req_capsule *pill;
	struct brw_page *pg_prev;

	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
		return -ENOMEM; /* Recoverable */
	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
		return -EINVAL; /* Fatal */

	if ((cmd & OBD_BRW_WRITE) != 0) {
		opc = OST_WRITE;
		req = ptlrpc_request_alloc_pool(cli->cl_import,
						osc_rq_pool,
						&RQF_OST_BRW_WRITE);
	} else {
		opc = OST_READ;
		req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
	}
	if (!req)
		return -ENOMEM;

	for (niocount = i = 1; i < page_count; i++) {
		if (!can_merge_pages(pga[i - 1], pga[i]))
			niocount++;
	}

	pill = &req->rq_pill;
	req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
			     sizeof(*ioobj));
	req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
			     niocount * sizeof(*niobuf));

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}
	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
	ptlrpc_at_set_req_timeout(req);
	/* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
	 * retry logic
	 */
	req->rq_no_retry_einprogress = 1;

	desc = ptlrpc_prep_bulk_imp(req, page_count,
		cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
		opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
		OST_BULK_PORTAL);

	if (!desc) {
		rc = -ENOMEM;
		goto out;
	}
	/* NB request now owns desc and will free it when it gets freed */

	body = req_capsule_client_get(pill, &RMF_OST_BODY);
	ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
	niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
	LASSERT(body && ioobj && niobuf);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	obdo_to_ioobj(oa, ioobj);
	ioobj->ioo_bufcnt = niocount;
	/* The high bits of ioo_max_brw tells server _maximum_ number of bulks
	 * that might be send for this request. The actual number is decided
	 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
	 * "max - 1" for old client compatibility sending "0", and also so
	 * the actual maximum is a power-of-two number, not one less. LU-1431
	 */
	ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
	LASSERT(page_count > 0);
	pg_prev = pga[0];
	for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
		struct brw_page *pg = pga[i];
		int poff = pg->off & ~PAGE_MASK;

		LASSERT(pg->count > 0);
		/* make sure there is no gap in the middle of page array */
		LASSERTF(page_count == 1 ||
			 (ergo(i == 0, poff + pg->count == PAGE_SIZE) &&
			  ergo(i > 0 && i < page_count - 1,
			       poff == 0 && pg->count == PAGE_SIZE) &&
			  ergo(i == page_count - 1, poff == 0)),
			 "i: %d/%d pg: %p off: %llu, count: %u\n",
			 i, page_count, pg, pg->off, pg->count);
		LASSERTF(i == 0 || pg->off > pg_prev->off,
			 "i %d p_c %u pg %p [pri %lu ind %lu] off %llu prev_pg %p [pri %lu ind %lu] off %llu\n",
			 i, page_count,
			 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
			 pg_prev->pg, page_private(pg_prev->pg),
			 pg_prev->pg->index, pg_prev->off);
		LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
			(pg->flag & OBD_BRW_SRVLOCK));

		ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
		requested_nob += pg->count;

		if (i > 0 && can_merge_pages(pg_prev, pg)) {
			niobuf--;
			niobuf->len += pg->count;
		} else {
			niobuf->offset = pg->off;
			niobuf->len = pg->count;
			niobuf->flags = pg->flag;
		}
		pg_prev = pg;
	}

	LASSERTF((void *)(niobuf - niocount) ==
		 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
		 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
		 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

	osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);
	if (resend) {
		if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
			body->oa.o_valid |= OBD_MD_FLFLAGS;
			body->oa.o_flags = 0;
		}
		body->oa.o_flags |= OBD_FL_RECOV_RESEND;
	}

	if (osc_should_shrink_grant(cli))
		osc_shrink_grant_local(cli, &body->oa);

	/* size[REQ_REC_OFF] still sizeof (*body) */
	if (opc == OST_WRITE) {
		if (cli->cl_checksum &&
		    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
			/* store cl_cksum_type in a local variable since
			 * it can be changed via lprocfs
			 */
			enum cksum_type cksum_type = cli->cl_cksum_type;

			if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
				oa->o_flags &= OBD_FL_LOCAL_MASK;
				body->oa.o_flags = 0;
			}
			body->oa.o_flags |= cksum_type_pack(cksum_type);
			body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
			body->oa.o_cksum = osc_checksum_bulk(requested_nob,
							     page_count, pga,
							     OST_WRITE,
							     cksum_type);
			CDEBUG(D_PAGE, "checksum at write origin: %x\n",
			       body->oa.o_cksum);
			/* save this in 'oa', too, for later checking */
			oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
			oa->o_flags |= cksum_type_pack(cksum_type);
		} else {
			/* clear out the checksum flag, in case this is a
			 * resend but cl_checksum is no longer set. b=11238
			 */
			oa->o_valid &= ~OBD_MD_FLCKSUM;
		}
		oa->o_cksum = body->oa.o_cksum;
		/* 1 RC per niobuf */
		req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
				     sizeof(__u32) * niocount);
	} else {
		if (cli->cl_checksum &&
		    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
			if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
				body->oa.o_flags = 0;
			body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
			body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
		}
	}
	ptlrpc_request_set_replen(req);

	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	aa->aa_oa = oa;
	aa->aa_requested_nob = requested_nob;
	aa->aa_nio_count = niocount;
	aa->aa_page_count = page_count;
	aa->aa_resends = 0;
	aa->aa_ppga = pga;
	aa->aa_cli = cli;
	INIT_LIST_HEAD(&aa->aa_oaps);

	*reqp = req;
	return 0;

out:
	ptlrpc_req_finished(req);
	return rc;
}

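/* A write checksum mismatch was reported: recompute the checksum locally
 * to decide whether the data changed on the client, changed in transit,
 * or the server used a different checksum type, and log the verdict.
 * Returns 1 if the write needs to be resent.
 */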
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
				__u32 client_cksum, __u32 server_cksum, int nob,
				u32 page_count, struct brw_page **pga,
				enum cksum_type client_cksum_type)
{
	__u32 new_cksum;
	char *msg;
	enum cksum_type cksum_type;

	if (server_cksum == client_cksum) {
		CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
		return 0;
	}

	cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
				       oa->o_flags : 0);
	new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
				      cksum_type);

	if (cksum_type != client_cksum_type)
		msg = "the server did not use the checksum type specified in the original request - likely a protocol problem";
	else if (new_cksum == server_cksum)
		msg = "changed on the client after we checksummed it - likely false positive due to mmap IO (bug 11742)";
	else if (new_cksum == client_cksum)
		msg = "changed in transit before arrival at OST";
	else
		msg = "changed in transit AND doesn't match the original - likely false positive due to mmap IO (bug 11742)";

	LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
			   " object "DOSTID" extent [%llu-%llu]\n",
			   msg, libcfs_nid2str(peer->nid),
			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
			   POSTID(&oa->o_oi), pga[0]->off,
			   pga[page_count - 1]->off +
			   pga[page_count - 1]->count - 1);
	CERROR("original client csum %x (type %x), server csum %x (type %x), client csum now %x\n",
	       client_cksum, client_cksum_type,
	       server_cksum, cksum_type, new_cksum);
	return 1;
}

/* Note rc enters this function as number of bytes transferred */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
	struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
	const lnet_process_id_t *peer =
			&req->rq_import->imp_connection->c_peer;
	struct client_obd *cli = aa->aa_cli;
	struct ost_body *body;
	__u32 client_cksum = 0;

	if (rc < 0 && rc != -EDQUOT) {
		DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
		return rc;
	}

	LASSERTF(req->rq_repmsg, "rc = %d\n", rc);
	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (!body) {
		DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
		return -EPROTO;
	}

	/* set/clear over quota flag for a uid/gid */
	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
	    body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
		unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };

		CDEBUG(D_QUOTA, "setdq for [%u %u] with valid %#llx, flags %x\n",
		       body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
		       body->oa.o_flags);
		osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
	}

	osc_update_grant(cli, body);

	if (rc < 0)
		return rc;

	if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
		client_cksum = aa->aa_oa->o_cksum; /* save for later */

	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
		if (rc > 0) {
			CERROR("Unexpected +ve rc %d\n", rc);
			return -EPROTO;
		}
		LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);

		if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
			return -EAGAIN;

		if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
		    check_write_checksum(&body->oa, peer, client_cksum,
					 body->oa.o_cksum, aa->aa_requested_nob,
					 aa->aa_page_count, aa->aa_ppga,
					 cksum_type_unpack(aa->aa_oa->o_flags)))
			return -EAGAIN;

		rc = check_write_rcs(req, aa->aa_requested_nob,
				     aa->aa_nio_count,
				     aa->aa_page_count, aa->aa_ppga);
		goto out;
	}

	/* The rest of this function executes only for OST_READs */

	/* if unwrap_bulk failed, return -EAGAIN to retry */
	rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
	if (rc < 0) {
		rc = -EAGAIN;
		goto out;
	}

	if (rc > aa->aa_requested_nob) {
		CERROR("Unexpected rc %d (%d requested)\n", rc,
		       aa->aa_requested_nob);
		return -EPROTO;
	}

	if (rc != req->rq_bulk->bd_nob_transferred) {
		CERROR("Unexpected rc %d (%d transferred)\n",
		       rc, req->rq_bulk->bd_nob_transferred);
		return -EPROTO;
	}

	if (rc < aa->aa_requested_nob)
		handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);

	if (body->oa.o_valid & OBD_MD_FLCKSUM) {
		static int cksum_counter;
		__u32 server_cksum = body->oa.o_cksum;
		char *via = "";
		char *router = "";
		enum cksum_type cksum_type;

		cksum_type = cksum_type_unpack(body->oa.o_valid &
					       OBD_MD_FLFLAGS ?
					       body->oa.o_flags : 0);
		client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
						 aa->aa_ppga, OST_READ,
						 cksum_type);

		if (peer->nid != req->rq_bulk->bd_sender) {
			via = " via ";
			router = libcfs_nid2str(req->rq_bulk->bd_sender);
		}

		if (server_cksum != client_cksum) {
			LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from %s%s%s inode " DFID " object " DOSTID " extent [%llu-%llu]\n",
					   req->rq_import->imp_obd->obd_name,
					   libcfs_nid2str(peer->nid),
					   via, router,
					   body->oa.o_valid & OBD_MD_FLFID ?
					   body->oa.o_parent_seq : (__u64)0,
					   body->oa.o_valid & OBD_MD_FLFID ?
					   body->oa.o_parent_oid : 0,
					   body->oa.o_valid & OBD_MD_FLFID ?
					   body->oa.o_parent_ver : 0,
					   POSTID(&body->oa.o_oi),
					   aa->aa_ppga[0]->off,
					   aa->aa_ppga[aa->aa_page_count - 1]->off +
					   aa->aa_ppga[aa->aa_page_count - 1]->count -
					   1);
			CERROR("client %x, server %x, cksum_type %x\n",
			       client_cksum, server_cksum, cksum_type);
			cksum_counter = 0;
			aa->aa_oa->o_cksum = client_cksum;
			rc = -EAGAIN;
		} else {
			cksum_counter++;
			CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
			rc = 0;
		}
	} else if (unlikely(client_cksum)) {
		static int cksum_missed;

		cksum_missed++;
		if ((cksum_missed & (-cksum_missed)) == cksum_missed)
			CERROR("Checksum %u requested from %s but not sent\n",
			       cksum_missed, libcfs_nid2str(peer->nid));
	} else {
		rc = 0;
	}
out:
	if (rc >= 0)
		lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
				     aa->aa_oa, &body->oa);

	return rc;
}

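/* Resend a failed bulk RPC: build a fresh request with the same pages and
 * transfer the async args, extents and page references over to it before
 * handing it back to ptlrpcd.
 */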
d7e09d03
PT
1625static int osc_brw_redo_request(struct ptlrpc_request *request,
1626 struct osc_brw_async_args *aa, int rc)
1627{
1628 struct ptlrpc_request *new_req;
1629 struct osc_brw_async_args *new_aa;
1630 struct osc_async_page *oap;
d7e09d03
PT
1631
1632 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1633 "redo for recoverable error %d", rc);
1634
1635 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
b2952d62 1636 OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
d7e09d03
PT
1637 aa->aa_cli, aa->aa_oa,
1638 NULL /* lsm unused by osc currently */,
1639 aa->aa_page_count, aa->aa_ppga,
ef2e0f55 1640 &new_req, 0, 1);
d7e09d03 1641 if (rc)
0a3bdb00 1642 return rc;
d7e09d03
PT
1643
1644 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
7f1ae4c0 1645 if (oap->oap_request) {
d7e09d03
PT
1646 LASSERTF(request == oap->oap_request,
1647 "request %p != oap_request %p\n",
1648 request, oap->oap_request);
1649 if (oap->oap_interrupted) {
1650 ptlrpc_req_finished(new_req);
0a3bdb00 1651 return -EINTR;
d7e09d03
PT
1652 }
1653 }
1654 }
1655 /* New request takes over pga and oaps from old request.
30aa9c52
OD
1656 * Note that copying a list_head doesn't work, need to move it...
1657 */
d7e09d03
PT
1658 aa->aa_resends++;
1659 new_req->rq_interpret_reply = request->rq_interpret_reply;
1660 new_req->rq_async_args = request->rq_async_args;
ac5b1481 1661 new_req->rq_commit_cb = request->rq_commit_cb;
d7e09d03 1662 /* cap resend delay to the current request timeout, this is similar to
30aa9c52
OD
1663 * what ptlrpc does (see after_reply())
1664 */
d7e09d03 1665 if (aa->aa_resends > new_req->rq_timeout)
219e6de6 1666 new_req->rq_sent = ktime_get_real_seconds() + new_req->rq_timeout;
d7e09d03 1667 else
219e6de6 1668 new_req->rq_sent = ktime_get_real_seconds() + aa->aa_resends;
d7e09d03
PT
	new_req->rq_generation_set = 1;
	new_req->rq_import_generation = request->rq_import_generation;

	new_aa = ptlrpc_req_async_args(new_req);

	INIT_LIST_HEAD(&new_aa->aa_oaps);
	list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
	INIT_LIST_HEAD(&new_aa->aa_exts);
	list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
	new_aa->aa_resends = aa->aa_resends;

	list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
		if (oap->oap_request) {
			ptlrpc_req_finished(oap->oap_request);
			oap->oap_request = ptlrpc_request_addref(new_req);
		}
	}

	/* XXX: This code will run into problems if we ever support adding
	 * a series of BRW RPCs into a self-defined ptlrpc_request_set and
	 * waiting for all of them to finish. We should inherit the request
	 * set from the old request.
	 */
	ptlrpcd_add_req(new_req);

	DEBUG_REQ(D_INFO, new_req, "new request");
	return 0;
}

/*
 * Ugh, we want disk allocation on the target to happen in offset order. We'll
 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
 * fine for our small page arrays and doesn't require allocation. It's an
 * insertion sort that swaps elements that are strides apart, shrinking the
 * stride down until it's 1 and the array is sorted.
 */
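/*
 * The stride sequence below (1, 4, 13, 40, ...; h = 3h + 1) is the shellsort
 * gap sequence commonly credited to Knuth.
 */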
static void sort_brw_pages(struct brw_page **array, int num)
{
	int stride, i, j;
	struct brw_page *tmp;

	if (num == 1)
		return;
	for (stride = 1; stride < num; stride = (stride * 3) + 1)
		;

	do {
		stride /= 3;
		for (i = stride; i < num; i++) {
			tmp = array[i];
			j = i;
			while (j >= stride && array[j - stride]->off > tmp->off) {
				array[j] = array[j - stride];
				j -= stride;
			}
			array[j] = tmp;
		}
	} while (stride > 1);
}

static void osc_release_ppga(struct brw_page **ppga, u32 count)
{
	LASSERT(ppga);
	kfree(ppga);
}

static int brw_interpret(const struct lu_env *env,
			 struct ptlrpc_request *req, void *data, int rc)
{
	struct osc_brw_async_args *aa = data;
	struct osc_extent *ext;
	struct osc_extent *tmp;
	struct client_obd *cli = aa->aa_cli;

	rc = osc_brw_fini_request(req, rc);
	CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
	/* When the server returns -EINPROGRESS, the client should always
	 * retry regardless of how many times the bulk was resent already.
	 */
	if (osc_recoverable_error(rc)) {
		if (req->rq_import_generation !=
		    req->rq_import->imp_generation) {
			CDEBUG(D_HA, "%s: resend cross eviction for object: " DOSTID ", rc = %d.\n",
			       req->rq_import->imp_obd->obd_name,
			       POSTID(&aa->aa_oa->o_oi), rc);
		} else if (rc == -EINPROGRESS ||
			   client_should_resend(aa->aa_resends, aa->aa_cli)) {
			rc = osc_brw_redo_request(req, aa, rc);
		} else {
			CERROR("%s: too many resent retries for object: %llu:%llu, rc = %d.\n",
			       req->rq_import->imp_obd->obd_name,
			       POSTID(&aa->aa_oa->o_oi), rc);
		}

		if (rc == 0)
			return 0;
		else if (rc == -EAGAIN || rc == -EINPROGRESS)
			rc = -EIO;
	}

	if (rc == 0) {
		struct obdo *oa = aa->aa_oa;
		struct cl_attr *attr = &osc_env_info(env)->oti_attr;
		unsigned long valid = 0;
		struct cl_object *obj;
		struct osc_async_page *last;

		last = brw_page2oap(aa->aa_ppga[aa->aa_page_count - 1]);
		obj = osc2cl(last->oap_obj);

		cl_object_attr_lock(obj);
		if (oa->o_valid & OBD_MD_FLBLOCKS) {
			attr->cat_blocks = oa->o_blocks;
			valid |= CAT_BLOCKS;
		}
		if (oa->o_valid & OBD_MD_FLMTIME) {
			attr->cat_mtime = oa->o_mtime;
			valid |= CAT_MTIME;
		}
		if (oa->o_valid & OBD_MD_FLATIME) {
			attr->cat_atime = oa->o_atime;
			valid |= CAT_ATIME;
		}
		if (oa->o_valid & OBD_MD_FLCTIME) {
			attr->cat_ctime = oa->o_ctime;
			valid |= CAT_CTIME;
		}

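		/* Writes may also need size updates. KMS is the client's
		 * "known minimum size" lower bound on the OST object size;
		 * srvlock (lockless) I/O skips it because no client-held DLM
		 * lock keeps it coherent.
		 */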
		if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
			struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
			loff_t last_off = last->oap_count + last->oap_obj_off;

			/* Change file size if this is an out of quota or
			 * direct IO write and it extends the file size
			 */
			if (loi->loi_lvb.lvb_size < last_off) {
				attr->cat_size = last_off;
				valid |= CAT_SIZE;
			}
			/* Extend KMS if it's not a lockless write */
			if (loi->loi_kms < last_off &&
			    oap2osc_page(last)->ops_srvlock == 0) {
				attr->cat_kms = last_off;
				valid |= CAT_KMS;
			}
		}

		if (valid != 0)
			cl_object_attr_set(env, obj, attr, valid);
		cl_object_attr_unlock(obj);
	}
	kmem_cache_free(obdo_cachep, aa->aa_oa);

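	/* A successful write leaves "unstable" pages behind: pages the
	 * server has received but not yet committed to disk. They stay
	 * accounted against this client until brw_commit() runs.
	 */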
	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
		osc_inc_unstable_pages(req);

	list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
		list_del_init(&ext->oe_link);
		osc_extent_finish(env, ext, 1, rc);
	}
	LASSERT(list_empty(&aa->aa_exts));
	LASSERT(list_empty(&aa->aa_oaps));

	cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
			  req->rq_bulk->bd_nob_transferred);
	osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
	ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);

	spin_lock(&cli->cl_loi_list_lock);
	/* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
	 * is called so we know whether to go to sync BRWs or wait for more
	 * RPCs to complete
	 */
	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
		cli->cl_w_in_flight--;
	else
		cli->cl_r_in_flight--;
	osc_wake_cache_waiters(cli);
	spin_unlock(&cli->cl_loi_list_lock);

	osc_io_unplug(env, cli, NULL);
	return rc;
}

static void brw_commit(struct ptlrpc_request *req)
{
	/*
	 * If osc_inc_unstable_pages (via osc_extent_finish) races with
	 * this callback (invoked via rq_commit_cb), we need to ensure
	 * osc_dec_unstable_pages is still called. Otherwise unstable
	 * pages may be leaked.
	 */
	spin_lock(&req->rq_lock);
	if (unlikely(req->rq_unstable)) {
		req->rq_unstable = 0;
		spin_unlock(&req->rq_lock);
		osc_dec_unstable_pages(req);
	} else {
		req->rq_committed = 1;
		spin_unlock(&req->rq_lock);
	}
}

/**
 * Build an RPC from the list of extents @ext_list. The caller must ensure
 * that the total pages in this list are NOT over max pages per RPC.
 * Extents in the list must be in OES_RPC state.
 */
int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
		  struct list_head *ext_list, int cmd)
{
	struct ptlrpc_request *req = NULL;
	struct osc_extent *ext;
	struct brw_page **pga = NULL;
	struct osc_brw_async_args *aa = NULL;
	struct obdo *oa = NULL;
	struct osc_async_page *oap;
	struct osc_async_page *tmp;
	struct cl_req *clerq = NULL;
	enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
	struct cl_req_attr *crattr = NULL;
	u64 starting_offset = OBD_OBJECT_EOF;
	u64 ending_offset = 0;
	int mpflag = 0;
	int mem_tight = 0;
	int page_count = 0;
	bool soft_sync = false;
	int i;
	int rc;
	struct ost_body *body;
	LIST_HEAD(rpc_list);

	LASSERT(!list_empty(ext_list));

	/* add pages into rpc_list to build BRW rpc */
	list_for_each_entry(ext, ext_list, oe_link) {
		LASSERT(ext->oe_state == OES_RPC);
		mem_tight |= ext->oe_memalloc;
		list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
			++page_count;
			list_add_tail(&oap->oap_rpc_item, &rpc_list);
			if (starting_offset > oap->oap_obj_off)
				starting_offset = oap->oap_obj_off;
			else
				LASSERT(oap->oap_page_off == 0);
			if (ending_offset < oap->oap_obj_off + oap->oap_count)
				ending_offset = oap->oap_obj_off +
						oap->oap_count;
			else
				LASSERT(oap->oap_page_off + oap->oap_count ==
					PAGE_SIZE);
		}
	}

	soft_sync = osc_over_unstable_soft_limit(cli);
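	/* Extents dirtied under memory pressure carry oe_memalloc; propagate
	 * that to the pages (OBD_BRW_MEMALLOC) and, below, to the request
	 * (rq_memalloc) so the RPC layer knows this write is part of
	 * memory reclaim.
	 */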
	if (mem_tight)
		mpflag = cfs_memory_pressure_get_and_set();

	crattr = kzalloc(sizeof(*crattr), GFP_NOFS);
	if (!crattr) {
		rc = -ENOMEM;
		goto out;
	}

	pga = kcalloc(page_count, sizeof(*pga), GFP_NOFS);
	if (!pga) {
		rc = -ENOMEM;
		goto out;
	}

	oa = kmem_cache_zalloc(obdo_cachep, GFP_NOFS);
	if (!oa) {
		rc = -ENOMEM;
		goto out;
	}

	i = 0;
	list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
		struct cl_page *page = oap2cl_page(oap);

		if (!clerq) {
			clerq = cl_req_alloc(env, page, crt,
					     1 /* only 1-object rpcs for now */);
			if (IS_ERR(clerq)) {
				rc = PTR_ERR(clerq);
				goto out;
			}
		}
		if (mem_tight)
			oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
		if (soft_sync)
			oap->oap_brw_flags |= OBD_BRW_SOFT_SYNC;
		pga[i] = &oap->oap_brw_page;
		pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
		CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
		       pga[i]->pg, oap->oap_page->index, oap,
		       pga[i]->flag);
		i++;
		cl_req_page_add(env, clerq, page);
	}

	/* always get the data for the obdo for the rpc */
	LASSERT(clerq);
	crattr->cra_oa = oa;
	cl_req_attr_set(env, clerq, crattr, ~0ULL);

	rc = cl_req_prep(env, clerq);
	if (rc != 0) {
		CERROR("cl_req_prep failed: %d\n", rc);
		goto out;
	}

	sort_brw_pages(pga, page_count);
	rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
				  pga, &req, 1, 0);
	if (rc != 0) {
		CERROR("prep_req failed: %d\n", rc);
		goto out;
	}

	req->rq_commit_cb = brw_commit;
	req->rq_interpret_reply = brw_interpret;

	if (mem_tight != 0)
		req->rq_memalloc = 1;

	/* Need to update the timestamps after the request is built in case
	 * we race with setattr (locally or in queue at OST). If OST gets
	 * later setattr before earlier BRW (as determined by the request xid),
	 * the OST will not use BRW timestamps. Sadly, there is no obvious
	 * way to do this in a single call. bug 10150
	 */
	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	crattr->cra_oa = &body->oa;
	cl_req_attr_set(env, clerq, crattr,
			OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLATIME);

	lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);

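	/* The async args live inline in the request itself, so assert at
	 * compile time that our per-RPC state fits in rq_async_args.
	 */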
	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	INIT_LIST_HEAD(&aa->aa_oaps);
	list_splice_init(&rpc_list, &aa->aa_oaps);
	INIT_LIST_HEAD(&aa->aa_exts);
	list_splice_init(ext_list, &aa->aa_exts);
	aa->aa_clerq = clerq;

	/* Queued sync pages can be torn down while the pages are in flight
	 * between the pending list and the rpc
	 */
	tmp = NULL;
	list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
		/* only one oap gets a request reference */
		if (!tmp)
			tmp = oap;
		if (oap->oap_interrupted && !req->rq_intr) {
			CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
			       oap, req);
			ptlrpc_mark_interrupted(req);
		}
	}
	if (tmp)
		tmp->oap_request = ptlrpc_request_addref(req);

	spin_lock(&cli->cl_loi_list_lock);
	starting_offset >>= PAGE_SHIFT;
	if (cmd == OBD_BRW_READ) {
		cli->cl_r_in_flight++;
		lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
		lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
		lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
				      starting_offset + 1);
	} else {
		cli->cl_w_in_flight++;
		lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
		lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
		lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
				      starting_offset + 1);
	}
	spin_unlock(&cli->cl_loi_list_lock);

	DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
		  page_count, aa, cli->cl_r_in_flight,
		  cli->cl_w_in_flight);

	ptlrpcd_add_req(req);
	rc = 0;

out:
	if (mem_tight != 0)
		cfs_memory_pressure_restore(mpflag);

	kfree(crattr);

	if (rc != 0) {
		LASSERT(!req);

		if (oa)
			kmem_cache_free(obdo_cachep, oa);
		kfree(pga);
		/* this should happen rarely and is pretty bad, it makes the
		 * pending list not follow the dirty order
		 */
		while (!list_empty(ext_list)) {
			ext = list_entry(ext_list->next, struct osc_extent,
					 oe_link);
			list_del_init(&ext->oe_link);
			osc_extent_finish(env, ext, 0, rc);
		}
		if (clerq && !IS_ERR(clerq))
			cl_req_completion(env, clerq, rc);
	}
	return rc;
}

static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
					struct ldlm_enqueue_info *einfo)
{
	void *data = einfo->ei_cbdata;
	int set = 0;

	LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
	LASSERT(lock->l_resource->lr_type == einfo->ei_type);
	LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
	LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);

	lock_res_and_lock(lock);

	if (!lock->l_ast_data)
		lock->l_ast_data = data;
	if (lock->l_ast_data == data)
		set = 1;

	unlock_res_and_lock(lock);

	return set;
}

static int osc_set_data_with_check(struct lustre_handle *lockh,
				   struct ldlm_enqueue_info *einfo)
{
	struct ldlm_lock *lock = ldlm_handle2lock(lockh);
	int set = 0;

	if (lock) {
		set = osc_set_lock_data_with_check(lock, einfo);
		LDLM_LOCK_PUT(lock);
	} else {
		CERROR("lockh %p, data %p - client evicted?\n",
		       lockh, einfo->ei_cbdata);
	}
	return set;
}

static int osc_enqueue_fini(struct ptlrpc_request *req,
			    osc_enqueue_upcall_f upcall, void *cookie,
			    struct lustre_handle *lockh, enum ldlm_mode mode,
			    __u64 *flags, int agl, int errcode)
{
	bool intent = *flags & LDLM_FL_HAS_INTENT;
	int rc;

	/* The request was created before ldlm_cli_enqueue call. */
	if (intent && errcode == ELDLM_LOCK_ABORTED) {
		struct ldlm_reply *rep;

		rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);

		rep->lock_policy_res1 =
			ptlrpc_status_ntoh(rep->lock_policy_res1);
		if (rep->lock_policy_res1)
			errcode = rep->lock_policy_res1;
		if (!agl)
			*flags |= LDLM_FL_LVB_READY;
	} else if (errcode == ELDLM_OK) {
		*flags |= LDLM_FL_LVB_READY;
	}

	/* Call the update callback. */
	rc = (*upcall)(cookie, lockh, errcode);
	/* release the reference taken in ldlm_cli_enqueue() */
	if (errcode == ELDLM_LOCK_MATCHED)
		errcode = ELDLM_OK;
	if (errcode == ELDLM_OK && lustre_handle_is_used(lockh))
		ldlm_lock_decref(lockh, mode);

	return rc;
}

static int osc_enqueue_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req,
				 struct osc_enqueue_args *aa, int rc)
{
	struct ldlm_lock *lock;
	struct lustre_handle *lockh = &aa->oa_lockh;
	enum ldlm_mode mode = aa->oa_mode;
	struct ost_lvb *lvb = aa->oa_lvb;
	__u32 lvb_len = sizeof(*lvb);
	__u64 flags = 0;

	/* ldlm_cli_enqueue is holding a reference on the lock, so it must
	 * be valid.
	 */
	lock = ldlm_handle2lock(lockh);
	LASSERTF(lock, "lockh %llx, req %p, aa %p - client evicted?\n",
		 lockh->cookie, req, aa);

	/* Take an additional reference so that a blocking AST that
	 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
	 * to arrive after an upcall has been executed by
	 * osc_enqueue_fini().
	 */
	ldlm_lock_addref(lockh, mode);

	/* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */
	OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_HANG, 2);

	/* Let the CP AST grant the lock first. */
	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);

	if (aa->oa_agl) {
		LASSERT(!aa->oa_lvb);
		LASSERT(!aa->oa_flags);
		aa->oa_flags = &flags;
	}

	/* Complete obtaining the lock procedure. */
	rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1,
				   aa->oa_mode, aa->oa_flags, lvb, lvb_len,
				   lockh, rc);
	/* Complete osc stuff. */
	rc = osc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode,
			      aa->oa_flags, aa->oa_agl, rc);

	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);

	ldlm_lock_decref(lockh, mode);
	LDLM_LOCK_PUT(lock);
	return rc;
}

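/* Not a real request set: a sentinel telling osc_enqueue_base() to hand the
 * request straight to the ptlrpcd daemon instead of to a caller-owned set.
 */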
struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;

/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of ost failure; and
 * when other sync requests do not get released lock from a client, the client
 * is evicted from the cluster -- such scenarios make life difficult, so
 * release locks just after they are obtained.
 */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
		     __u64 *flags, ldlm_policy_data_t *policy,
		     struct ost_lvb *lvb, int kms_valid,
		     osc_enqueue_upcall_f upcall, void *cookie,
		     struct ldlm_enqueue_info *einfo,
		     struct ptlrpc_request_set *rqset, int async, int agl)
{
	struct obd_device *obd = exp->exp_obd;
	struct lustre_handle lockh = { 0 };
	struct ptlrpc_request *req = NULL;
	int intent = *flags & LDLM_FL_HAS_INTENT;
	__u64 match_flags = *flags;
	enum ldlm_mode mode;
	int rc;

	/* Filesystem lock extents are extended to page boundaries so that
	 * dealing with the page cache is a little smoother.
	 */
	policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
	policy->l_extent.end |= ~PAGE_MASK;

	/*
	 * kms is not valid when either object is completely fresh (so that no
	 * locks are cached), or object was evicted. In the latter case cached
	 * lock cannot be used, because it would prime inode state with
	 * potentially stale LVB.
	 */
	if (!kms_valid)
		goto no_match;

	/* Next, search for already existing extent locks that will cover us */
	/* If we're trying to read, we also search for an existing PW lock. The
	 * VFS and page cache already protect us locally, so lots of readers/
	 * writers can share a single PW lock.
	 *
	 * There are problems with conversion deadlocks, so instead of
	 * converting a read lock to a write lock, we'll just enqueue a new
	 * one.
	 *
	 * At some point we should cancel the read lock instead of making them
	 * send us a blocking callback, but there are problems with canceling
	 * locks out from other users right now, too.
	 */
	mode = einfo->ei_mode;
	if (einfo->ei_mode == LCK_PR)
		mode |= LCK_PW;
	if (agl == 0)
		match_flags |= LDLM_FL_LVB_READY;
	if (intent != 0)
		match_flags |= LDLM_FL_BLOCK_GRANTED;
	mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
			       einfo->ei_type, policy, mode, &lockh, 0);
	if (mode) {
		struct ldlm_lock *matched;

		if (*flags & LDLM_FL_TEST_LOCK)
			return ELDLM_OK;

		matched = ldlm_handle2lock(&lockh);
		if (agl) {
			/* AGL (asynchronous glimpse lock) enqueues DLM locks
			 * speculatively. Therefore, if a DLM lock already
			 * exists, just inform the caller to cancel the AGL
			 * process for this stripe.
			 */
			ldlm_lock_decref(&lockh, mode);
			LDLM_LOCK_PUT(matched);
			return -ECANCELED;
		} else if (osc_set_lock_data_with_check(matched, einfo)) {
			*flags |= LDLM_FL_LVB_READY;
			/* We already have a lock, and it's referenced. */
			(*upcall)(cookie, &lockh, ELDLM_LOCK_MATCHED);

			ldlm_lock_decref(&lockh, mode);
			LDLM_LOCK_PUT(matched);
			return ELDLM_OK;
		} else {
			ldlm_lock_decref(&lockh, mode);
			LDLM_LOCK_PUT(matched);
		}
	}

no_match:
	if (*flags & LDLM_FL_TEST_LOCK)
		return -ENOLCK;
	if (intent) {
		req = ptlrpc_request_alloc(class_exp2cliimp(exp),
					   &RQF_LDLM_ENQUEUE_LVB);
		if (!req)
			return -ENOMEM;

		rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
		if (rc) {
			ptlrpc_request_free(req);
			return rc;
		}

		req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
				     sizeof(*lvb));
		ptlrpc_request_set_replen(req);
	}

	/* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
	*flags &= ~LDLM_FL_BLOCK_GRANTED;

	rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
			      sizeof(*lvb), LVB_T_OST, &lockh, async);
	if (async) {
		if (!rc) {
			struct osc_enqueue_args *aa;

			CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
			aa = ptlrpc_req_async_args(req);
			aa->oa_exp = exp;
			aa->oa_mode = einfo->ei_mode;
			aa->oa_type = einfo->ei_type;
			lustre_handle_copy(&aa->oa_lockh, &lockh);
			aa->oa_upcall = upcall;
			aa->oa_cookie = cookie;
			aa->oa_agl = !!agl;
			if (!agl) {
				aa->oa_flags = flags;
				aa->oa_lvb = lvb;
			} else {
				/* AGL is essentially to enqueue a DLM lock
				 * in advance, so we don't care about the
				 * result of the AGL enqueue.
				 */
				aa->oa_lvb = NULL;
				aa->oa_flags = NULL;
			}

			req->rq_interpret_reply =
				(ptlrpc_interpterer_t)osc_enqueue_interpret;
			if (rqset == PTLRPCD_SET)
				ptlrpcd_add_req(req);
			else
				ptlrpc_set_add_req(rqset, req);
		} else if (intent) {
			ptlrpc_req_finished(req);
		}
		return rc;
	}

	rc = osc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode,
			      flags, agl, rc);
	if (intent)
		ptlrpc_req_finished(req);

	return rc;
}

int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
		   __u32 type, ldlm_policy_data_t *policy, __u32 mode,
		   __u64 *flags, void *data, struct lustre_handle *lockh,
		   int unref)
{
	struct obd_device *obd = exp->exp_obd;
	__u64 lflags = *flags;
	enum ldlm_mode rc;

	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
		return -EIO;

	/* Filesystem lock extents are extended to page boundaries so that
	 * dealing with the page cache is a little smoother
	 */
	policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
	policy->l_extent.end |= ~PAGE_MASK;

	/* Next, search for already existing extent locks that will cover us */
	/* If we're trying to read, we also search for an existing PW lock. The
	 * VFS and page cache already protect us locally, so lots of readers/
	 * writers can share a single PW lock.
	 */
	rc = mode;
	if (mode == LCK_PR)
		rc |= LCK_PW;
	rc = ldlm_lock_match(obd->obd_namespace, lflags,
			     res_id, type, policy, rc, lockh, unref);
	if (rc) {
		if (data) {
			if (!osc_set_data_with_check(lockh, data)) {
				if (!(lflags & LDLM_FL_TEST_LOCK))
					ldlm_lock_decref(lockh, rc);
				return 0;
			}
		}
		if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
			ldlm_lock_addref(lockh, LCK_PR);
			ldlm_lock_decref(lockh, LCK_PW);
		}
		return rc;
	}
	return rc;
}

int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
{
	if (unlikely(mode == LCK_GROUP))
		ldlm_lock_decref_and_cancel(lockh, mode);
	else
		ldlm_lock_decref(lockh, mode);

	return 0;
}

static int osc_statfs_interpret(const struct lu_env *env,
				struct ptlrpc_request *req,
				struct osc_async_args *aa, int rc)
{
	struct obd_statfs *msfs;

	if (rc == -EBADR)
		/* The request has in fact never been sent
		 * due to issues at a higher level (LOV).
		 * Exit immediately since the caller is
		 * aware of the problem and takes care
		 * of the clean up
		 */
		return rc;

	if ((rc == -ENOTCONN || rc == -EAGAIN) &&
	    (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY)) {
		rc = 0;
		goto out;
	}

	if (rc != 0)
		goto out;

	msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
	if (!msfs) {
		rc = -EPROTO;
		goto out;
	}

	*aa->aa_oi->oi_osfs = *msfs;
out:
	rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
	return rc;
}

static int osc_statfs_async(struct obd_export *exp,
			    struct obd_info *oinfo, __u64 max_age,
			    struct ptlrpc_request_set *rqset)
{
	struct obd_device *obd = class_exp2obd(exp);
	struct ptlrpc_request *req;
	struct osc_async_args *aa;
	int rc;

	/* We could possibly pass max_age in the request (as an absolute
	 * timestamp or a "seconds.usec ago") so the target can avoid doing
	 * extra calls into the filesystem if that isn't necessary (e.g.
	 * during mount that would help a bit). Having relative timestamps
	 * is not so great if request processing is slow, while absolute
	 * timestamps are not ideal because they need time synchronization.
	 */
	req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
	if (!req)
		return -ENOMEM;

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}
	ptlrpc_request_set_replen(req);
	req->rq_request_portal = OST_CREATE_PORTAL;
	ptlrpc_at_set_req_timeout(req);

	if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
		/* procfs requests must not wait for stats, to avoid a
		 * deadlock
		 */
		req->rq_no_resend = 1;
		req->rq_no_delay = 1;
	}

	req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	aa->aa_oi = oinfo;

	ptlrpc_set_add_req(rqset, req);
	return 0;
}

static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
		      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
{
	struct obd_device *obd = class_exp2obd(exp);
	struct obd_statfs *msfs;
	struct ptlrpc_request *req;
	struct obd_import *imp = NULL;
	int rc;

	/* The request might also come from lprocfs, so we need to sync
	 * this with client_disconnect_export (bug 15684)
	 */
	down_read(&obd->u.cli.cl_sem);
	if (obd->u.cli.cl_import)
		imp = class_import_get(obd->u.cli.cl_import);
	up_read(&obd->u.cli.cl_sem);
	if (!imp)
		return -ENODEV;

	/* We could possibly pass max_age in the request (as an absolute
	 * timestamp or a "seconds.usec ago") so the target can avoid doing
	 * extra calls into the filesystem if that isn't necessary (e.g.
	 * during mount that would help a bit). Having relative timestamps
	 * is not so great if request processing is slow, while absolute
	 * timestamps are not ideal because they need time synchronization.
	 */
	req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);

	class_import_put(imp);

	if (!req)
		return -ENOMEM;

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}
	ptlrpc_request_set_replen(req);
	req->rq_request_portal = OST_CREATE_PORTAL;
	ptlrpc_at_set_req_timeout(req);

	if (flags & OBD_STATFS_NODELAY) {
		/* procfs requests must not wait for stats, to avoid a
		 * deadlock
		 */
		req->rq_no_resend = 1;
		req->rq_no_delay = 1;
	}

	rc = ptlrpc_queue_wait(req);
	if (rc)
		goto out;

	msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
	if (!msfs) {
		rc = -EPROTO;
		goto out;
	}

	*osfs = *msfs;

out:
	ptlrpc_req_finished(req);
	return rc;
}

/* Retrieve object striping information.
 *
 * @lump is a pointer to an in-core struct with lmm_ost_count indicating
 * the maximum number of OST indices which will fit in the user buffer.
 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
 */
static int osc_getstripe(struct lov_stripe_md *lsm,
			 struct lov_user_md __user *lump)
{
	/* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
	struct lov_user_md_v3 lum, *lumk;
	struct lov_user_ost_data_v1 *lmm_objects;
	int rc = 0, lum_size;

	if (!lsm)
		return -ENODATA;

	/* we only need the header part from user space to get lmm_magic and
	 * lmm_stripe_count, (the header part is common to v1 and v3)
	 */
	lum_size = sizeof(struct lov_user_md_v1);
	if (copy_from_user(&lum, lump, lum_size))
		return -EFAULT;

	if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
	    (lum.lmm_magic != LOV_USER_MAGIC_V3))
		return -EINVAL;

	/* lov_user_md_vX and lov_mds_md_vX must have the same size */
	LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
	LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
	LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));

	/* we can use lov_mds_md_size() to compute lum_size
	 * because lov_user_md_vX and lov_mds_md_vX have the same size
	 */
	if (lum.lmm_stripe_count > 0) {
		lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
		lumk = kzalloc(lum_size, GFP_NOFS);
		if (!lumk)
			return -ENOMEM;

		if (lum.lmm_magic == LOV_USER_MAGIC_V1)
			lmm_objects =
			    &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
		else
			lmm_objects = &lumk->lmm_objects[0];
		lmm_objects->l_ost_oi = lsm->lsm_oi;
	} else {
		lum_size = lov_mds_md_size(0, lum.lmm_magic);
		lumk = &lum;
	}

	lumk->lmm_oi = lsm->lsm_oi;
	lumk->lmm_stripe_count = 1;

	if (copy_to_user(lump, lumk, lum_size))
		rc = -EFAULT;

	if (lumk != &lum)
		kfree(lumk);

	return rc;
}

static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
			 void *karg, void __user *uarg)
{
	struct obd_device *obd = exp->exp_obd;
	struct obd_ioctl_data *data = karg;
	int err = 0;

	if (!try_module_get(THIS_MODULE)) {
		CERROR("%s: cannot get module '%s'\n", obd->obd_name,
		       module_name(THIS_MODULE));
		return -EINVAL;
	}
	switch (cmd) {
	case OBD_IOC_LOV_GET_CONFIG: {
		char *buf;
		struct lov_desc *desc;
		struct obd_uuid uuid;

		buf = NULL;
		len = 0;
		if (obd_ioctl_getdata(&buf, &len, uarg)) {
			err = -EINVAL;
			goto out;
		}

		data = (struct obd_ioctl_data *)buf;

		if (sizeof(*desc) > data->ioc_inllen1) {
			obd_ioctl_freedata(buf, len);
			err = -EINVAL;
			goto out;
		}

		if (data->ioc_inllen2 < sizeof(uuid)) {
			obd_ioctl_freedata(buf, len);
			err = -EINVAL;
			goto out;
		}

		desc = (struct lov_desc *)data->ioc_inlbuf1;
		desc->ld_tgt_count = 1;
		desc->ld_active_tgt_count = 1;
		desc->ld_default_stripe_count = 1;
		desc->ld_default_stripe_size = 0;
		desc->ld_default_stripe_offset = 0;
		desc->ld_pattern = 0;
		memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));

		memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));

		err = copy_to_user(uarg, buf, len);
		if (err)
			err = -EFAULT;
		obd_ioctl_freedata(buf, len);
		goto out;
	}
	case LL_IOC_LOV_SETSTRIPE:
		err = obd_alloc_memmd(exp, karg);
		if (err > 0)
			err = 0;
		goto out;
	case LL_IOC_LOV_GETSTRIPE:
		err = osc_getstripe(karg, uarg);
		goto out;
	case OBD_IOC_CLIENT_RECOVER:
		err = ptlrpc_recover_import(obd->u.cli.cl_import,
					    data->ioc_inlbuf1, 0);
		if (err > 0)
			err = 0;
		goto out;
	case IOC_OSC_SET_ACTIVE:
		err = ptlrpc_set_import_active(obd->u.cli.cl_import,
					       data->ioc_offset);
		goto out;
	case OBD_IOC_POLL_QUOTACHECK:
		err = osc_quota_poll_check(exp, karg);
		goto out;
	case OBD_IOC_PING_TARGET:
		err = ptlrpc_obd_ping(obd);
		goto out;
	default:
		CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
		       cmd, current_comm());
		err = -ENOTTY;
		goto out;
	}
out:
	module_put(THIS_MODULE);
	return err;
}

static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
			u32 keylen, void *key, __u32 *vallen, void *val,
			struct lov_stripe_md *lsm)
{
	if (!vallen || !val)
		return -EFAULT;

	if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
		__u32 *stripe = val;
		*vallen = sizeof(*stripe);
		*stripe = 0;
		return 0;
	} else if (KEY_IS(KEY_LAST_ID)) {
		struct ptlrpc_request *req;
		u64 *reply;
		char *tmp;
		int rc;

		req = ptlrpc_request_alloc(class_exp2cliimp(exp),
					   &RQF_OST_GET_INFO_LAST_ID);
		if (!req)
			return -ENOMEM;

		req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
				     RCL_CLIENT, keylen);
		rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
		if (rc) {
			ptlrpc_request_free(req);
			return rc;
		}

		tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
		memcpy(tmp, key, keylen);

		req->rq_no_delay = 1;
		req->rq_no_resend = 1;
		ptlrpc_request_set_replen(req);
		rc = ptlrpc_queue_wait(req);
		if (rc)
			goto out;

		reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
		if (!reply) {
			rc = -EPROTO;
			goto out;
		}

		*((u64 *)val) = *reply;
out:
		ptlrpc_req_finished(req);
		return rc;
	} else if (KEY_IS(KEY_FIEMAP)) {
		struct ll_fiemap_info_key *fm_key = key;
		struct ldlm_res_id res_id;
		ldlm_policy_data_t policy;
		struct lustre_handle lockh;
		enum ldlm_mode mode = 0;
		struct ptlrpc_request *req;
		struct ll_user_fiemap *reply;
		char *tmp;
		int rc;

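		/* With FIEMAP_FLAG_SYNC the extent data must be coherent:
		 * reuse a client-cached PR/PW lock if one covers the range,
		 * otherwise fall through and ask the server to take the lock
		 * on our behalf (OBD_FL_SRVLOCK).
		 */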
		if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
			goto skip_locking;

		policy.l_extent.start = fm_key->fiemap.fm_start &
					PAGE_MASK;

		if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
		    fm_key->fiemap.fm_start + PAGE_SIZE - 1)
			policy.l_extent.end = OBD_OBJECT_EOF;
		else
			policy.l_extent.end = (fm_key->fiemap.fm_start +
					       fm_key->fiemap.fm_length +
					       PAGE_SIZE - 1) & PAGE_MASK;

		ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
		mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
				       LDLM_FL_BLOCK_GRANTED |
				       LDLM_FL_LVB_READY,
				       &res_id, LDLM_EXTENT, &policy,
				       LCK_PR | LCK_PW, &lockh, 0);
		if (mode) { /* lock is cached on client */
			if (mode != LCK_PR) {
				ldlm_lock_addref(&lockh, LCK_PR);
				ldlm_lock_decref(&lockh, LCK_PW);
			}
		} else { /* no cached lock, acquire the lock on server side */
			fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
			fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
		}

skip_locking:
		req = ptlrpc_request_alloc(class_exp2cliimp(exp),
					   &RQF_OST_GET_INFO_FIEMAP);
		if (!req) {
			rc = -ENOMEM;
			goto drop_lock;
		}

		req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
				     RCL_CLIENT, keylen);
		req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
				     RCL_CLIENT, *vallen);
		req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
				     RCL_SERVER, *vallen);

		rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
		if (rc) {
			ptlrpc_request_free(req);
			goto drop_lock;
		}

		tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
		memcpy(tmp, key, keylen);
		tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
		memcpy(tmp, val, *vallen);

		ptlrpc_request_set_replen(req);
		rc = ptlrpc_queue_wait(req);
		if (rc)
			goto fini_req;

		reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
		if (!reply) {
			rc = -EPROTO;
			goto fini_req;
		}

		memcpy(val, reply, *vallen);
fini_req:
		ptlrpc_req_finished(req);
drop_lock:
		if (mode)
			ldlm_lock_decref(&lockh, LCK_PR);
		return rc;
	}

	return -EINVAL;
}

static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
			      u32 keylen, void *key, u32 vallen,
			      void *val, struct ptlrpc_request_set *set)
{
	struct ptlrpc_request *req;
	struct obd_device *obd = exp->exp_obd;
	struct obd_import *imp = class_exp2cliimp(exp);
	char *tmp;
	int rc;

	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

	if (KEY_IS(KEY_CHECKSUM)) {
		if (vallen != sizeof(int))
			return -EINVAL;
		exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
		return 0;
	}

	if (KEY_IS(KEY_SPTLRPC_CONF)) {
		sptlrpc_conf_client_adapt(obd);
		return 0;
	}

	if (KEY_IS(KEY_FLUSH_CTX)) {
		sptlrpc_import_flush_my_ctx(imp);
		return 0;
	}

	if (KEY_IS(KEY_CACHE_SET)) {
		struct client_obd *cli = &obd->u.cli;

		LASSERT(!cli->cl_cache); /* only once */
		cli->cl_cache = val;
		cl_cache_incref(cli->cl_cache);
		cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;

		/* add this osc into entity list */
		LASSERT(list_empty(&cli->cl_lru_osc));
		spin_lock(&cli->cl_cache->ccc_lru_lock);
		list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
		spin_unlock(&cli->cl_cache->ccc_lru_lock);

		return 0;
	}

	if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
		struct client_obd *cli = &obd->u.cli;
		int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
		int target = *(int *)val;

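		/* Shrink at most half of the cached LRU pages, capped at the
		 * caller's target; report back how many were freed by
		 * decrementing the target in place.
		 */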
		nr = osc_lru_shrink(env, cli, min(nr, target), true);
		*(int *)val -= nr;
		return 0;
	}

	if (!set && !KEY_IS(KEY_GRANT_SHRINK))
		return -EINVAL;

2927 /* We pass all other commands directly to OST. Since nobody calls osc
30aa9c52
OD
2928 * methods directly and everybody is supposed to go through LOV, we
2929 * assume lov checked invalid values for us.
2930 * The only recognised values so far are evict_by_nid and mds_conn.
2931 * Even if something bad goes through, we'd get a -EINVAL from OST
2932 * anyway.
2933 */
d7e09d03
PT
2934
2935 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2936 &RQF_OST_SET_GRANT_INFO :
2937 &RQF_OBD_SET_INFO);
7f1ae4c0 2938 if (!req)
0a3bdb00 2939 return -ENOMEM;
d7e09d03
PT
2940
2941 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2942 RCL_CLIENT, keylen);
2943 if (!KEY_IS(KEY_GRANT_SHRINK))
2944 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2945 RCL_CLIENT, vallen);
2946 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2947 if (rc) {
2948 ptlrpc_request_free(req);
0a3bdb00 2949 return rc;
d7e09d03
PT
2950 }
2951
2952 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2953 memcpy(tmp, key, keylen);
2954 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2955 &RMF_OST_BODY :
2956 &RMF_SETINFO_VAL);
2957 memcpy(tmp, val, vallen);
2958
2959 if (KEY_IS(KEY_GRANT_SHRINK)) {
f024bad4 2960 struct osc_brw_async_args *aa;
d7e09d03
PT
2961 struct obdo *oa;
2962
2963 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2964 aa = ptlrpc_req_async_args(req);
c4418dac 2965 oa = kmem_cache_zalloc(obdo_cachep, GFP_NOFS);
d7e09d03
PT
2966 if (!oa) {
2967 ptlrpc_req_finished(req);
0a3bdb00 2968 return -ENOMEM;
d7e09d03
PT
2969 }
2970 *oa = ((struct ost_body *)val)->oa;
2971 aa->aa_oa = oa;
2972 req->rq_interpret_reply = osc_shrink_grant_interpret;
2973 }
2974
2975 ptlrpc_request_set_replen(req);
2976 if (!KEY_IS(KEY_GRANT_SHRINK)) {
7f1ae4c0 2977 LASSERT(set);
d7e09d03
PT
2978 ptlrpc_set_add_req(set, req);
2979 ptlrpc_check_set(NULL, set);
c5c4c6fa
OW
2980 } else {
2981 ptlrpcd_add_req(req);
2982 }
d7e09d03 2983
0a3bdb00 2984 return 0;
d7e09d03
PT
2985}

static int osc_reconnect(const struct lu_env *env,
			 struct obd_export *exp, struct obd_device *obd,
			 struct obd_uuid *cluuid,
			 struct obd_connect_data *data,
			 void *localdata)
{
	struct client_obd *cli = &obd->u.cli;

	if (data && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
		long lost_grant;

		spin_lock(&cli->cl_loi_list_lock);
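		/* Grant is space the OST reserves for this client's cached
		 * writes. On reconnect, request what we currently hold plus
		 * enough for our dirty pages, or fall back to two BRW-sized
		 * chunks if that would be zero.
		 */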
		data->ocd_grant = (cli->cl_avail_grant +
				   (cli->cl_dirty_pages << PAGE_SHIFT)) ?:
				   2 * cli_brw_size(obd);
		lost_grant = cli->cl_lost_grant;
		cli->cl_lost_grant = 0;
		spin_unlock(&cli->cl_loi_list_lock);

		CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d ocd_grant: %d, lost: %ld.\n",
		       data->ocd_connect_flags,
		       data->ocd_version, data->ocd_grant, lost_grant);
	}

	return 0;
}

static int osc_disconnect(struct obd_export *exp)
{
	struct obd_device *obd = class_exp2obd(exp);
	int rc;

	rc = client_disconnect_export(exp);
	/**
	 * Initially we put del_shrink_grant before disconnect_export, but it
	 * causes the following problem if setup (connect) and cleanup
	 * (disconnect) are tangled together.
	 *	connect p1			disconnect p2
	 *	ptlrpc_connect_import
	 *	...............			class_manual_cleanup
	 *					osc_disconnect
	 *					del_shrink_grant
	 *	ptlrpc_connect_interrupt
	 *	init_grant_shrink
	 *	add this client to shrink list
	 *					cleanup_osc
	 * Bang! The pinger triggers the shrink.
	 * So the osc should be disconnected from the shrink list, after we
	 * are sure the import has been destroyed. BUG18662
	 */
	if (!obd->u.cli.cl_import)
		osc_del_shrink_grant(&obd->u.cli);
	return rc;
}

static int osc_import_event(struct obd_device *obd,
			    struct obd_import *imp,
			    enum obd_import_event event)
{
	struct client_obd *cli;
	int rc = 0;

	LASSERT(imp->imp_obd == obd);

	switch (event) {
	case IMP_EVENT_DISCON: {
		cli = &obd->u.cli;
		spin_lock(&cli->cl_loi_list_lock);
		cli->cl_avail_grant = 0;
		cli->cl_lost_grant = 0;
		spin_unlock(&cli->cl_loi_list_lock);
		break;
	}
	case IMP_EVENT_INACTIVE: {
		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
		break;
	}
	case IMP_EVENT_INVALIDATE: {
		struct ldlm_namespace *ns = obd->obd_namespace;
		struct lu_env *env;
		int refcheck;

		env = cl_env_get(&refcheck);
		if (!IS_ERR(env)) {
			/* Reset grants */
			cli = &obd->u.cli;
			/* all pages go to failing rpcs due to the invalid
			 * import
			 */
			osc_io_unplug(env, cli, NULL);

			ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
			cl_env_put(env, &refcheck);
		} else {
			rc = PTR_ERR(env);
		}
		break;
	}
	case IMP_EVENT_ACTIVE: {
		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
		break;
	}
	case IMP_EVENT_OCD: {
		struct obd_connect_data *ocd = &imp->imp_connect_data;

		if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
			osc_init_grant(&obd->u.cli, ocd);

		/* See bug 7198 */
		if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
			imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;

		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
		break;
	}
	case IMP_EVENT_DEACTIVATE: {
		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
		break;
	}
	case IMP_EVENT_ACTIVATE: {
		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
		break;
	}
	default:
		CERROR("Unknown import event %d\n", event);
		LBUG();
	}
	return rc;
}

/**
 * Determine whether the lock can be canceled before replaying the lock
 * during recovery, see bug16774 for detailed information.
 *
 * \retval zero the lock can't be canceled
 * \retval other ok to cancel
 */
static int osc_cancel_weight(struct ldlm_lock *lock)
{
	/*
	 * Cancel all unused and granted extent locks.
	 */
	if (lock->l_resource->lr_type == LDLM_EXTENT &&
	    lock->l_granted_mode == lock->l_req_mode &&
	    osc_ldlm_weigh_ast(lock) == 0)
		return 1;

	return 0;
}

static int brw_queue_work(const struct lu_env *env, void *data)
{
	struct client_obd *cli = data;

	CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);

	osc_io_unplug(env, cli, NULL);
	return 0;
}

int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
	struct lprocfs_static_vars lvars = { NULL };
	struct client_obd *cli = &obd->u.cli;
	void *handler;
	int rc;
	int adding;
	int added;
	int req_count;

	rc = ptlrpcd_addref();
	if (rc)
		return rc;

	rc = client_obd_setup(obd, lcfg);
	if (rc)
		goto out_ptlrpcd;

	handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
	if (IS_ERR(handler)) {
		rc = PTR_ERR(handler);
		goto out_client_setup;
	}
	cli->cl_writeback_work = handler;

	handler = ptlrpcd_alloc_work(cli->cl_import, lru_queue_work, cli);
	if (IS_ERR(handler)) {
		rc = PTR_ERR(handler);
		goto out_ptlrpcd_work;
	}

	cli->cl_lru_work = handler;

	rc = osc_quota_setup(obd);
	if (rc)
		goto out_ptlrpcd_work;

	cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
	lprocfs_osc_init_vars(&lvars);
	if (lprocfs_obd_setup(obd, lvars.obd_vars, lvars.sysfs_vars) == 0) {
		lproc_osc_attach_seqstat(obd);
		sptlrpc_lprocfs_cliobd_attach(obd);
		ptlrpc_lprocfs_register_obd(obd);
	}

	/*
	 * We try to control the total number of requests with an upper limit
	 * osc_reqpool_maxreqcount. There might be some race which will cause
	 * over-limit allocation, but it is fine.
	 */
	req_count = atomic_read(&osc_pool_req_count);
	if (req_count < osc_reqpool_maxreqcount) {
		adding = cli->cl_max_rpcs_in_flight + 2;
		if (req_count + adding > osc_reqpool_maxreqcount)
			adding = osc_reqpool_maxreqcount - req_count;

		added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
		atomic_add(added, &osc_pool_req_count);
	}

	INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
	ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
	return rc;

out_ptlrpcd_work:
	if (cli->cl_writeback_work) {
		ptlrpcd_destroy_work(cli->cl_writeback_work);
		cli->cl_writeback_work = NULL;
	}
	if (cli->cl_lru_work) {
		ptlrpcd_destroy_work(cli->cl_lru_work);
		cli->cl_lru_work = NULL;
	}
out_client_setup:
	client_obd_cleanup(obd);
out_ptlrpcd:
	ptlrpcd_decref();
	return rc;
}

static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
{
	switch (stage) {
	case OBD_CLEANUP_EARLY: {
		struct obd_import *imp;

		imp = obd->u.cli.cl_import;
		CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
		/* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
		ptlrpc_deactivate_import(imp);
		spin_lock(&imp->imp_lock);
		imp->imp_pingable = 0;
		spin_unlock(&imp->imp_lock);
		break;
	}
	case OBD_CLEANUP_EXPORTS: {
		struct client_obd *cli = &obd->u.cli;
		/* LU-464
		 * for echo client, export may be on zombie list, wait for
		 * zombie thread to cull it, because cli.cl_import will be
		 * cleared in client_disconnect_export():
		 * class_export_destroy() -> obd_cleanup() ->
		 * echo_device_free() -> echo_client_cleanup() ->
		 * obd_disconnect() -> osc_disconnect() ->
		 * client_disconnect_export()
		 */
		obd_zombie_barrier();
		if (cli->cl_writeback_work) {
			ptlrpcd_destroy_work(cli->cl_writeback_work);
			cli->cl_writeback_work = NULL;
		}
		if (cli->cl_lru_work) {
			ptlrpcd_destroy_work(cli->cl_lru_work);
			cli->cl_lru_work = NULL;
		}
		obd_cleanup_client_import(obd);
		ptlrpc_lprocfs_unregister_obd(obd);
		lprocfs_obd_cleanup(obd);
		break;
	}
	}
	return 0;
}

static int osc_cleanup(struct obd_device *obd)
{
	struct client_obd *cli = &obd->u.cli;
	int rc;

	/* lru cleanup */
	if (cli->cl_cache) {
		LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
		spin_lock(&cli->cl_cache->ccc_lru_lock);
		list_del_init(&cli->cl_lru_osc);
		spin_unlock(&cli->cl_cache->ccc_lru_lock);
		cli->cl_lru_left = NULL;
		cl_cache_decref(cli->cl_cache);
		cli->cl_cache = NULL;
	}

	/* free memory of osc quota cache */
	osc_quota_cleanup(obd);

	rc = client_obd_cleanup(obd);

	ptlrpcd_decref();
	return rc;
}

int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
{
	struct lprocfs_static_vars lvars = { NULL };
	int rc = 0;

	lprocfs_osc_init_vars(&lvars);

	switch (lcfg->lcfg_command) {
	default:
		rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
					      lcfg, obd);
		if (rc > 0)
			rc = 0;
		break;
	}

	return rc;
}

static int osc_process_config(struct obd_device *obd, u32 len, void *buf)
{
	return osc_process_config_base(obd, buf);
}

static struct obd_ops osc_obd_ops = {
	.owner		= THIS_MODULE,
	.setup		= osc_setup,
	.precleanup	= osc_precleanup,
	.cleanup	= osc_cleanup,
	.add_conn	= client_import_add_conn,
	.del_conn	= client_import_del_conn,
	.connect	= client_connect_import,
	.reconnect	= osc_reconnect,
	.disconnect	= osc_disconnect,
	.statfs		= osc_statfs,
	.statfs_async	= osc_statfs_async,
	.packmd		= osc_packmd,
	.unpackmd	= osc_unpackmd,
	.create		= osc_create,
	.destroy	= osc_destroy,
	.getattr	= osc_getattr,
	.getattr_async	= osc_getattr_async,
	.setattr	= osc_setattr,
	.setattr_async	= osc_setattr_async,
	.iocontrol	= osc_iocontrol,
	.get_info	= osc_get_info,
	.set_info_async	= osc_set_info_async,
	.import_event	= osc_import_event,
	.process_config	= osc_process_config,
	.quotactl	= osc_quotactl,
	.quotacheck	= osc_quotacheck,
};

extern struct lu_kmem_descr osc_caches[];
extern struct lock_class_key osc_ast_guard_class;

static int __init osc_init(void)
{
	struct lprocfs_static_vars lvars = { NULL };
	unsigned int reqpool_size;
	unsigned int reqsize;
	int rc;

	/* print an address of _any_ initialized kernel symbol from this
	 * module, to allow debugging with gdb that doesn't support data
	 * symbols from modules.
	 */
	CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);

	rc = lu_kmem_init(osc_caches);
	if (rc)
		return rc;

	lprocfs_osc_init_vars(&lvars);

	rc = class_register_type(&osc_obd_ops, NULL,
				 LUSTRE_OSC_NAME, &osc_device_type);
	if (rc)
		goto out_kmem;

	/* This is obviously too much memory, only prevent overflow here */
	if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0) {
		rc = -EINVAL;
		goto out_type;
	}

	reqpool_size = osc_reqpool_mem_max << 20;

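	/* Compute the smallest power of two >= OST_MAXREQSIZE; it is only
	 * used below to estimate how many requests fit in the pool budget.
	 */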
	reqsize = 1;
	while (reqsize < OST_MAXREQSIZE)
		reqsize = reqsize << 1;

	/*
	 * We don't enlarge the request count in OSC pool according to
	 * cl_max_rpcs_in_flight. The allocation from the pool will only be
	 * tried after normal allocation failed. So a small OSC pool won't
	 * cause much performance degradation in most cases.
	 */
	osc_reqpool_maxreqcount = reqpool_size / reqsize;

	atomic_set(&osc_pool_req_count, 0);
	osc_rq_pool = ptlrpc_init_rq_pool(0, OST_MAXREQSIZE,
					  ptlrpc_add_rqs_to_pool);

	if (osc_rq_pool)
		return 0;

	rc = -ENOMEM;

out_type:
	class_unregister_type(LUSTRE_OSC_NAME);
out_kmem:
	lu_kmem_fini(osc_caches);
	return rc;
}

static void /*__exit*/ osc_exit(void)
{
	class_unregister_type(LUSTRE_OSC_NAME);
	lu_kmem_fini(osc_caches);
	ptlrpc_free_rq_pool(osc_rq_pool);
}

MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");
MODULE_VERSION(LUSTRE_VERSION_STRING);

module_init(osc_init);
module_exit(osc_exit);