Merge remote-tracking branch 'staging/staging-next'
[deliverable/linux.git] / drivers / staging / lustre / lustre / obdecho / echo_client.c
CommitLineData
d7e09d03
PT
1/*
2 * GPL HEADER START
3 *
4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
6a5b99a4 18 * http://www.gnu.org/licenses/gpl-2.0.html
d7e09d03 19 *
d7e09d03
PT
20 * GPL HEADER END
21 */
22/*
23 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
25 *
1dc563a6 26 * Copyright (c) 2011, 2015, Intel Corporation.
d7e09d03
PT
27 */
28/*
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
31 */
32
33#define DEBUG_SUBSYSTEM S_ECHO
9fdaf8c0 34#include "../../include/linux/libcfs/libcfs.h"
d7e09d03 35
7f2d973a
GKH
36#include "../include/obd.h"
37#include "../include/obd_support.h"
38#include "../include/obd_class.h"
39#include "../include/lustre_debug.h"
40#include "../include/lprocfs_status.h"
41#include "../include/cl_object.h"
7f2d973a
GKH
42#include "../include/lustre_fid.h"
43#include "../include/lustre_acl.h"
8877d3bf 44#include "../include/lustre/lustre_ioctl.h"
7f2d973a 45#include "../include/lustre_net.h"
d7e09d03
PT
46
47#include "echo_internal.h"
48
49/** \defgroup echo_client Echo Client
50 * @{
51 */
52
53struct echo_device {
54 struct cl_device ed_cl;
55 struct echo_client_obd *ed_ec;
56
57 struct cl_site ed_site_myself;
58 struct cl_site *ed_site;
59 struct lu_device *ed_next;
d7e09d03
PT
60};
61
62struct echo_object {
63 struct cl_object eo_cl;
64 struct cl_object_header eo_hdr;
65
66 struct echo_device *eo_dev;
67 struct list_head eo_obj_chain;
68 struct lov_stripe_md *eo_lsm;
69 atomic_t eo_npages;
70 int eo_deleted;
71};
72
73struct echo_object_conf {
74 struct cl_object_conf eoc_cl;
75 struct lov_stripe_md **eoc_md;
76};
77
78struct echo_page {
79 struct cl_page_slice ep_cl;
80 struct mutex ep_lock;
d7e09d03
PT
81};
82
83struct echo_lock {
84 struct cl_lock_slice el_cl;
85 struct list_head el_chain;
86 struct echo_object *el_object;
87 __u64 el_cookie;
88 atomic_t el_refcount;
89};
90
d7e09d03
PT
91static int echo_client_setup(const struct lu_env *env,
92 struct obd_device *obddev,
93 struct lustre_cfg *lcfg);
94static int echo_client_cleanup(struct obd_device *obddev);
95
d7e09d03
PT
96/** \defgroup echo_helpers Helper functions
97 * @{
98 */
99static inline struct echo_device *cl2echo_dev(const struct cl_device *dev)
100{
101 return container_of0(dev, struct echo_device, ed_cl);
102}
103
104static inline struct cl_device *echo_dev2cl(struct echo_device *d)
105{
106 return &d->ed_cl;
107}
108
109static inline struct echo_device *obd2echo_dev(const struct obd_device *obd)
110{
111 return cl2echo_dev(lu2cl_dev(obd->obd_lu_dev));
112}
113
114static inline struct cl_object *echo_obj2cl(struct echo_object *eco)
115{
116 return &eco->eo_cl;
117}
118
119static inline struct echo_object *cl2echo_obj(const struct cl_object *o)
120{
121 return container_of(o, struct echo_object, eo_cl);
122}
123
124static inline struct echo_page *cl2echo_page(const struct cl_page_slice *s)
125{
126 return container_of(s, struct echo_page, ep_cl);
127}
128
129static inline struct echo_lock *cl2echo_lock(const struct cl_lock_slice *s)
130{
131 return container_of(s, struct echo_lock, el_cl);
132}
133
134static inline struct cl_lock *echo_lock2cl(const struct echo_lock *ecl)
135{
136 return ecl->el_cl.cls_lock;
137}
138
139static struct lu_context_key echo_thread_key;
140static inline struct echo_thread_info *echo_env_info(const struct lu_env *env)
141{
142 struct echo_thread_info *info;
ded9f908 143
d7e09d03 144 info = lu_context_key_get(&env->le_ctx, &echo_thread_key);
7ae59eda 145 LASSERT(info);
d7e09d03
PT
146 return info;
147}
148
149static inline
150struct echo_object_conf *cl2echo_conf(const struct cl_object_conf *c)
151{
152 return container_of(c, struct echo_object_conf, eoc_cl);
153}
154
155/** @} echo_helpers */
156
157static struct echo_object *cl_echo_object_find(struct echo_device *d,
158 struct lov_stripe_md **lsm);
159static int cl_echo_object_put(struct echo_object *eco);
21aef7d9 160static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
d7e09d03
PT
161 struct page **pages, int npages, int async);
162
d7e09d03
PT
163struct echo_thread_info {
164 struct echo_object_conf eti_conf;
165 struct lustre_md eti_md;
166
167 struct cl_2queue eti_queue;
168 struct cl_io eti_io;
06563b56 169 struct cl_lock eti_lock;
d7e09d03
PT
170 struct lu_fid eti_fid;
171 struct lu_fid eti_fid2;
d7e09d03
PT
172};
173
/* Per-session data. No session state is used right now: placeholder only. */
struct echo_session_info {
	unsigned long dummy;
};
178
179static struct kmem_cache *echo_lock_kmem;
180static struct kmem_cache *echo_object_kmem;
181static struct kmem_cache *echo_thread_kmem;
182static struct kmem_cache *echo_session_kmem;
d7e09d03
PT
183
184static struct lu_kmem_descr echo_caches[] = {
185 {
186 .ckd_cache = &echo_lock_kmem,
187 .ckd_name = "echo_lock_kmem",
cb7e9f7b 188 .ckd_size = sizeof(struct echo_lock)
d7e09d03
PT
189 },
190 {
191 .ckd_cache = &echo_object_kmem,
192 .ckd_name = "echo_object_kmem",
cb7e9f7b 193 .ckd_size = sizeof(struct echo_object)
d7e09d03
PT
194 },
195 {
196 .ckd_cache = &echo_thread_kmem,
197 .ckd_name = "echo_thread_kmem",
cb7e9f7b 198 .ckd_size = sizeof(struct echo_thread_info)
d7e09d03
PT
199 },
200 {
201 .ckd_cache = &echo_session_kmem,
202 .ckd_name = "echo_session_kmem",
cb7e9f7b 203 .ckd_size = sizeof(struct echo_session_info)
d7e09d03 204 },
d7e09d03
PT
205 {
206 .ckd_cache = NULL
207 }
208};
209
210/** \defgroup echo_page Page operations
211 *
212 * Echo page operations.
213 *
214 * @{
215 */
d7e09d03
PT
216static int echo_page_own(const struct lu_env *env,
217 const struct cl_page_slice *slice,
218 struct cl_io *io, int nonblock)
219{
220 struct echo_page *ep = cl2echo_page(slice);
221
222 if (!nonblock)
223 mutex_lock(&ep->ep_lock);
224 else if (!mutex_trylock(&ep->ep_lock))
225 return -EAGAIN;
226 return 0;
227}
228
229static void echo_page_disown(const struct lu_env *env,
230 const struct cl_page_slice *slice,
231 struct cl_io *io)
232{
233 struct echo_page *ep = cl2echo_page(slice);
234
235 LASSERT(mutex_is_locked(&ep->ep_lock));
236 mutex_unlock(&ep->ep_lock);
237}
238
239static void echo_page_discard(const struct lu_env *env,
240 const struct cl_page_slice *slice,
241 struct cl_io *unused)
242{
243 cl_page_delete(env, slice->cpl_page);
244}
245
246static int echo_page_is_vmlocked(const struct lu_env *env,
247 const struct cl_page_slice *slice)
248{
249 if (mutex_is_locked(&cl2echo_page(slice)->ep_lock))
250 return -EBUSY;
251 return -ENODATA;
252}
253
254static void echo_page_completion(const struct lu_env *env,
255 const struct cl_page_slice *slice,
256 int ioret)
257{
7ae59eda 258 LASSERT(slice->cpl_page->cp_sync_io);
d7e09d03
PT
259}
260
261static void echo_page_fini(const struct lu_env *env,
262 struct cl_page_slice *slice)
263{
d7e09d03 264 struct echo_object *eco = cl2echo_obj(slice->cpl_obj);
d7e09d03
PT
265
266 atomic_dec(&eco->eo_npages);
5f479924 267 put_page(slice->cpl_page->cp_vmpage);
d7e09d03
PT
268}
269
/* cpo_prep: echo pages need no preparation before a transfer; always 0. */
static int echo_page_prep(const struct lu_env *env,
			  const struct cl_page_slice *slice,
			  struct cl_io *unused)
{
	return 0;
}
276
277static int echo_page_print(const struct lu_env *env,
278 const struct cl_page_slice *slice,
279 void *cookie, lu_printer_t printer)
280{
281 struct echo_page *ep = cl2echo_page(slice);
282
283 (*printer)(env, cookie, LUSTRE_ECHO_CLIENT_NAME"-page@%p %d vm@%p\n",
7addf402
JX
284 ep, mutex_is_locked(&ep->ep_lock),
285 slice->cpl_page->cp_vmpage);
d7e09d03
PT
286 return 0;
287}
288
289static const struct cl_page_operations echo_page_ops = {
290 .cpo_own = echo_page_own,
291 .cpo_disown = echo_page_disown,
292 .cpo_discard = echo_page_discard,
d7e09d03
PT
293 .cpo_fini = echo_page_fini,
294 .cpo_print = echo_page_print,
295 .cpo_is_vmlocked = echo_page_is_vmlocked,
296 .io = {
297 [CRT_READ] = {
298 .cpo_prep = echo_page_prep,
299 .cpo_completion = echo_page_completion,
300 },
301 [CRT_WRITE] = {
302 .cpo_prep = echo_page_prep,
303 .cpo_completion = echo_page_completion,
304 }
305 }
306};
c9f6bb96 307
d7e09d03
PT
308/** @} echo_page */
309
310/** \defgroup echo_lock Locking
311 *
312 * echo lock operations
313 *
314 * @{
315 */
316static void echo_lock_fini(const struct lu_env *env,
317 struct cl_lock_slice *slice)
318{
319 struct echo_lock *ecl = cl2echo_lock(slice);
320
321 LASSERT(list_empty(&ecl->el_chain));
50d30362 322 kmem_cache_free(echo_lock_kmem, ecl);
d7e09d03
PT
323}
324
d7e09d03
PT
325static struct cl_lock_operations echo_lock_ops = {
326 .clo_fini = echo_lock_fini,
d7e09d03
PT
327};
328
329/** @} echo_lock */
330
331/** \defgroup echo_cl_ops cl_object operations
332 *
333 * operations for cl_object
334 *
335 * @{
336 */
337static int echo_page_init(const struct lu_env *env, struct cl_object *obj,
7addf402 338 struct cl_page *page, pgoff_t index)
d7e09d03
PT
339{
340 struct echo_page *ep = cl_object_page_slice(obj, page);
341 struct echo_object *eco = cl2echo_obj(obj);
d7e09d03 342
5f479924 343 get_page(page->cp_vmpage);
d7e09d03 344 mutex_init(&ep->ep_lock);
fd7444fe 345 cl_page_slice_add(page, &ep->ep_cl, obj, index, &echo_page_ops);
d7e09d03 346 atomic_inc(&eco->eo_npages);
0a3bdb00 347 return 0;
d7e09d03
PT
348}
349
/* coo_io_init: the echo layer keeps no per-io state; always returns 0. */
static int echo_io_init(const struct lu_env *env, struct cl_object *obj,
			struct cl_io *io)
{
	return 0;
}
355
356static int echo_lock_init(const struct lu_env *env,
357 struct cl_object *obj, struct cl_lock *lock,
358 const struct cl_io *unused)
359{
360 struct echo_lock *el;
d7e09d03 361
38cf948d 362 el = kmem_cache_zalloc(echo_lock_kmem, GFP_NOFS);
7ae59eda 363 if (el) {
d7e09d03
PT
364 cl_lock_slice_add(lock, &el->el_cl, obj, &echo_lock_ops);
365 el->el_object = cl2echo_obj(obj);
366 INIT_LIST_HEAD(&el->el_chain);
367 atomic_set(&el->el_refcount, 0);
368 }
7ae59eda 369 return !el ? -ENOMEM : 0;
d7e09d03
PT
370}
371
/* coo_conf_set: configuration changes are accepted and ignored; returns 0. */
static int echo_conf_set(const struct lu_env *env, struct cl_object *obj,
			 const struct cl_object_conf *conf)
{
	return 0;
}
377
378static const struct cl_object_operations echo_cl_obj_ops = {
379 .coo_page_init = echo_page_init,
380 .coo_lock_init = echo_lock_init,
381 .coo_io_init = echo_io_init,
382 .coo_conf_set = echo_conf_set
383};
c9f6bb96 384
d7e09d03
PT
385/** @} echo_cl_ops */
386
387/** \defgroup echo_lu_ops lu_object operations
388 *
389 * operations for echo lu object.
390 *
391 * @{
392 */
393static int echo_object_init(const struct lu_env *env, struct lu_object *obj,
394 const struct lu_object_conf *conf)
395{
396 struct echo_device *ed = cl2echo_dev(lu2cl_dev(obj->lo_dev));
397 struct echo_client_obd *ec = ed->ed_ec;
398 struct echo_object *eco = cl2echo_obj(lu2cl(obj));
5bd07d99
PT
399 const struct cl_object_conf *cconf;
400 struct echo_object_conf *econf;
d7e09d03
PT
401
402 if (ed->ed_next) {
403 struct lu_object *below;
404 struct lu_device *under;
405
406 under = ed->ed_next;
407 below = under->ld_ops->ldo_object_alloc(env, obj->lo_header,
408 under);
7ae59eda 409 if (!below)
0a3bdb00 410 return -ENOMEM;
d7e09d03
PT
411 lu_object_add(obj, below);
412 }
413
5bd07d99
PT
414 cconf = lu2cl_conf(conf);
415 econf = cl2echo_conf(cconf);
d7e09d03 416
5bd07d99
PT
417 LASSERT(econf->eoc_md);
418 eco->eo_lsm = *econf->eoc_md;
419 /* clear the lsm pointer so that it won't get freed. */
420 *econf->eoc_md = NULL;
d7e09d03
PT
421
422 eco->eo_dev = ed;
423 atomic_set(&eco->eo_npages, 0);
424 cl_object_page_init(lu2cl(obj), sizeof(struct echo_page));
425
426 spin_lock(&ec->ec_lock);
427 list_add_tail(&eco->eo_obj_chain, &ec->ec_objects);
428 spin_unlock(&ec->ec_lock);
429
0a3bdb00 430 return 0;
d7e09d03
PT
431}
432
433/* taken from osc_unpackmd() */
434static int echo_alloc_memmd(struct echo_device *ed,
435 struct lov_stripe_md **lsmp)
436{
437 int lsm_size;
438
d7e09d03 439 /* If export is lov/osc then use their obd method */
7ae59eda 440 if (ed->ed_next)
d7e09d03
PT
441 return obd_alloc_memmd(ed->ed_ec->ec_exp, lsmp);
442 /* OFD has no unpackmd method, do everything here */
443 lsm_size = lov_stripe_md_size(1);
444
7ae59eda 445 LASSERT(!*lsmp);
c5b89520 446 *lsmp = kzalloc(lsm_size, GFP_NOFS);
efeb257d 447 if (!*lsmp)
0a3bdb00 448 return -ENOMEM;
d7e09d03 449
c5b89520 450 (*lsmp)->lsm_oinfo[0] = kzalloc(sizeof(struct lov_oinfo), GFP_NOFS);
efeb257d 451 if (!(*lsmp)->lsm_oinfo[0]) {
c5b89520 452 kfree(*lsmp);
0a3bdb00 453 return -ENOMEM;
d7e09d03
PT
454 }
455
456 loi_init((*lsmp)->lsm_oinfo[0]);
457 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
458 ostid_set_seq_echo(&(*lsmp)->lsm_oi);
459
0a3bdb00 460 return lsm_size;
d7e09d03
PT
461}
462
463static int echo_free_memmd(struct echo_device *ed, struct lov_stripe_md **lsmp)
464{
465 int lsm_size;
466
d7e09d03 467 /* If export is lov/osc then use their obd method */
7ae59eda 468 if (ed->ed_next)
d7e09d03
PT
469 return obd_free_memmd(ed->ed_ec->ec_exp, lsmp);
470 /* OFD has no unpackmd method, do everything here */
471 lsm_size = lov_stripe_md_size(1);
472
c5b89520
JL
473 kfree((*lsmp)->lsm_oinfo[0]);
474 kfree(*lsmp);
d7e09d03 475 *lsmp = NULL;
0a3bdb00 476 return 0;
d7e09d03
PT
477}
478
479static void echo_object_free(const struct lu_env *env, struct lu_object *obj)
480{
481 struct echo_object *eco = cl2echo_obj(lu2cl(obj));
482 struct echo_client_obd *ec = eco->eo_dev->ed_ec;
d7e09d03
PT
483
484 LASSERT(atomic_read(&eco->eo_npages) == 0);
485
486 spin_lock(&ec->ec_lock);
487 list_del_init(&eco->eo_obj_chain);
488 spin_unlock(&ec->ec_lock);
489
490 lu_object_fini(obj);
491 lu_object_header_fini(obj->lo_header);
492
493 if (eco->eo_lsm)
494 echo_free_memmd(eco->eo_dev, &eco->eo_lsm);
50d30362 495 kmem_cache_free(echo_object_kmem, eco);
d7e09d03
PT
496}
497
498static int echo_object_print(const struct lu_env *env, void *cookie,
35d6c026 499 lu_printer_t p, const struct lu_object *o)
d7e09d03
PT
500{
501 struct echo_object *obj = cl2echo_obj(lu2cl(o));
502
503 return (*p)(env, cookie, "echoclient-object@%p", obj);
504}
505
506static const struct lu_object_operations echo_lu_obj_ops = {
507 .loo_object_init = echo_object_init,
508 .loo_object_delete = NULL,
509 .loo_object_release = NULL,
510 .loo_object_free = echo_object_free,
511 .loo_object_print = echo_object_print,
512 .loo_object_invariant = NULL
513};
c9f6bb96 514
d7e09d03
PT
515/** @} echo_lu_ops */
516
517/** \defgroup echo_lu_dev_ops lu_device operations
518 *
519 * Operations for echo lu device.
520 *
521 * @{
522 */
523static struct lu_object *echo_object_alloc(const struct lu_env *env,
524 const struct lu_object_header *hdr,
525 struct lu_device *dev)
526{
527 struct echo_object *eco;
528 struct lu_object *obj = NULL;
d7e09d03
PT
529
530 /* we're the top dev. */
7ae59eda 531 LASSERT(!hdr);
38cf948d 532 eco = kmem_cache_zalloc(echo_object_kmem, GFP_NOFS);
7ae59eda 533 if (eco) {
d7e09d03
PT
534 struct cl_object_header *hdr = &eco->eo_hdr;
535
536 obj = &echo_obj2cl(eco)->co_lu;
537 cl_object_header_init(hdr);
7addf402
JX
538 hdr->coh_page_bufsize = cfs_size_round(sizeof(struct cl_page));
539
d7e09d03
PT
540 lu_object_init(obj, &hdr->coh_lu, dev);
541 lu_object_add_top(&hdr->coh_lu, obj);
542
543 eco->eo_cl.co_ops = &echo_cl_obj_ops;
544 obj->lo_ops = &echo_lu_obj_ops;
545 }
0a3bdb00 546 return obj;
d7e09d03
PT
547}
548
161106c5 549static const struct lu_device_operations echo_device_lu_ops = {
d7e09d03
PT
550 .ldo_object_alloc = echo_object_alloc,
551};
552
553/** @} echo_lu_dev_ops */
554
161106c5 555static const struct cl_device_operations echo_device_cl_ops = {
d7e09d03
PT
556};
557
558/** \defgroup echo_init Setup and teardown
559 *
560 * Init and fini functions for echo client.
561 *
562 * @{
563 */
564static int echo_site_init(const struct lu_env *env, struct echo_device *ed)
565{
566 struct cl_site *site = &ed->ed_site_myself;
567 int rc;
568
569 /* initialize site */
570 rc = cl_site_init(site, &ed->ed_cl);
571 if (rc) {
c4f39553 572 CERROR("Cannot initialize site for echo client(%d)\n", rc);
d7e09d03
PT
573 return rc;
574 }
575
576 rc = lu_site_init_finish(&site->cs_lu);
577 if (rc)
578 return rc;
579
580 ed->ed_site = site;
581 return 0;
582}
583
584static void echo_site_fini(const struct lu_env *env, struct echo_device *ed)
585{
586 if (ed->ed_site) {
5bd07d99 587 cl_site_fini(ed->ed_site);
d7e09d03
PT
588 ed->ed_site = NULL;
589 }
590}
591
592static void *echo_thread_key_init(const struct lu_context *ctx,
35d6c026 593 struct lu_context_key *key)
d7e09d03
PT
594{
595 struct echo_thread_info *info;
596
38cf948d 597 info = kmem_cache_zalloc(echo_thread_kmem, GFP_NOFS);
7ae59eda 598 if (!info)
d7e09d03
PT
599 info = ERR_PTR(-ENOMEM);
600 return info;
601}
602
603static void echo_thread_key_fini(const struct lu_context *ctx,
35d6c026 604 struct lu_context_key *key, void *data)
d7e09d03
PT
605{
606 struct echo_thread_info *info = data;
ded9f908 607
50d30362 608 kmem_cache_free(echo_thread_kmem, info);
d7e09d03
PT
609}
610
/* lct_exit for the thread key: nothing to reset between uses. */
static void echo_thread_key_exit(const struct lu_context *ctx,
				 struct lu_context_key *key, void *data)
{
}
615
616static struct lu_context_key echo_thread_key = {
617 .lct_tags = LCT_CL_THREAD,
618 .lct_init = echo_thread_key_init,
619 .lct_fini = echo_thread_key_fini,
620 .lct_exit = echo_thread_key_exit
621};
622
623static void *echo_session_key_init(const struct lu_context *ctx,
35d6c026 624 struct lu_context_key *key)
d7e09d03
PT
625{
626 struct echo_session_info *session;
627
38cf948d 628 session = kmem_cache_zalloc(echo_session_kmem, GFP_NOFS);
7ae59eda 629 if (!session)
d7e09d03
PT
630 session = ERR_PTR(-ENOMEM);
631 return session;
632}
633
634static void echo_session_key_fini(const struct lu_context *ctx,
35d6c026 635 struct lu_context_key *key, void *data)
d7e09d03
PT
636{
637 struct echo_session_info *session = data;
ded9f908 638
50d30362 639 kmem_cache_free(echo_session_kmem, session);
d7e09d03
PT
640}
641
/* lct_exit for the session key: nothing to reset between uses. */
static void echo_session_key_exit(const struct lu_context *ctx,
				  struct lu_context_key *key, void *data)
{
}
646
647static struct lu_context_key echo_session_key = {
648 .lct_tags = LCT_SESSION,
649 .lct_init = echo_session_key_init,
650 .lct_fini = echo_session_key_fini,
651 .lct_exit = echo_session_key_exit
652};
653
654LU_TYPE_INIT_FINI(echo, &echo_thread_key, &echo_session_key);
655
d7e09d03
PT
656static struct lu_device *echo_device_alloc(const struct lu_env *env,
657 struct lu_device_type *t,
658 struct lustre_cfg *cfg)
659{
660 struct lu_device *next;
661 struct echo_device *ed;
662 struct cl_device *cd;
663 struct obd_device *obd = NULL; /* to keep compiler happy */
664 struct obd_device *tgt;
665 const char *tgt_type_name;
37264706 666 int rc, err;
d7e09d03 667
c5b89520 668 ed = kzalloc(sizeof(*ed), GFP_NOFS);
efeb257d 669 if (!ed) {
abb368ab
JY
670 rc = -ENOMEM;
671 goto out;
672 }
d7e09d03 673
d7e09d03
PT
674 cd = &ed->ed_cl;
675 rc = cl_device_init(cd, t);
676 if (rc)
37264706 677 goto out_free;
d7e09d03
PT
678
679 cd->cd_lu_dev.ld_ops = &echo_device_lu_ops;
680 cd->cd_ops = &echo_device_cl_ops;
681
d7e09d03 682 obd = class_name2obd(lustre_cfg_string(cfg, 0));
7ae59eda
OD
683 LASSERT(obd);
684 LASSERT(env);
d7e09d03
PT
685
686 tgt = class_name2obd(lustre_cfg_string(cfg, 1));
7ae59eda 687 if (!tgt) {
d7e09d03 688 CERROR("Can not find tgt device %s\n",
35d6c026 689 lustre_cfg_string(cfg, 1));
abb368ab 690 rc = -ENODEV;
37264706 691 goto out_device_fini;
d7e09d03
PT
692 }
693
694 next = tgt->obd_lu_dev;
695 if (!strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME)) {
5bd07d99 696 CERROR("echo MDT client must be run on server\n");
abb368ab 697 rc = -EOPNOTSUPP;
37264706 698 goto out_device_fini;
d7e09d03 699 }
5bd07d99
PT
700
701 rc = echo_site_init(env, ed);
702 if (rc)
37264706 703 goto out_device_fini;
d7e09d03
PT
704
705 rc = echo_client_setup(env, obd, cfg);
706 if (rc)
37264706 707 goto out_site_fini;
d7e09d03
PT
708
709 ed->ed_ec = &obd->u.echo_client;
d7e09d03 710
5bd07d99 711 /* if echo client is to be stacked upon ost device, the next is
35f0d1ab
OD
712 * NULL since ost is not a clio device so far
713 */
7ae59eda 714 if (next && !lu_device_is_cl(next))
5bd07d99
PT
715 next = NULL;
716
717 tgt_type_name = tgt->obd_type->typ_name;
7ae59eda
OD
718 if (next) {
719 if (next->ld_site) {
abb368ab 720 rc = -EBUSY;
37264706 721 goto out_cleanup;
abb368ab 722 }
5bd07d99
PT
723
724 next->ld_site = &ed->ed_site->cs_lu;
725 rc = next->ld_type->ldt_ops->ldto_device_init(env, next,
726 next->ld_type->ldt_name,
727 NULL);
728 if (rc)
37264706 729 goto out_cleanup;
5bd07d99 730
d7e09d03 731 } else {
5bd07d99 732 LASSERT(strcmp(tgt_type_name, LUSTRE_OST_NAME) == 0);
d7e09d03
PT
733 }
734
735 ed->ed_next = next;
0a3bdb00 736 return &cd->cd_lu_dev;
d7e09d03 737
37264706
CA
738out_cleanup:
739 err = echo_client_cleanup(obd);
740 if (err)
741 CERROR("Cleanup obd device %s error(%d)\n",
742 obd->obd_name, err);
743out_site_fini:
744 echo_site_fini(env, ed);
745out_device_fini:
746 cl_device_fini(&ed->ed_cl);
747out_free:
748 kfree(ed);
749out:
fbe7c6c7 750 return ERR_PTR(rc);
d7e09d03
PT
751}
752
/*
 * ldto_device_init: must never be called for the echo device; reaching
 * it is treated as a bug.
 */
static int echo_device_init(const struct lu_env *env, struct lu_device *d,
			    const char *name, struct lu_device *next)
{
	LBUG();
	return 0;
}
759
760static struct lu_device *echo_device_fini(const struct lu_env *env,
761 struct lu_device *d)
762{
763 struct echo_device *ed = cl2echo_dev(lu2cl_dev(d));
764 struct lu_device *next = ed->ed_next;
765
5bd07d99 766 while (next)
d7e09d03
PT
767 next = next->ld_type->ldt_ops->ldto_device_fini(env, next);
768 return NULL;
769}
770
/*
 * Release the cl_lock behind echo lock @ecl.
 * @still_used is accepted but not consulted.
 */
static void echo_lock_release(const struct lu_env *env,
			      struct echo_lock *ecl,
			      int still_used)
{
	cl_lock_release(env, echo_lock2cl(ecl));
}
779
780static struct lu_device *echo_device_free(const struct lu_env *env,
781 struct lu_device *d)
782{
783 struct echo_device *ed = cl2echo_dev(lu2cl_dev(d));
784 struct echo_client_obd *ec = ed->ed_ec;
785 struct echo_object *eco;
786 struct lu_device *next = ed->ed_next;
787
788 CDEBUG(D_INFO, "echo device:%p is going to be freed, next = %p\n",
789 ed, next);
790
791 lu_site_purge(env, &ed->ed_site->cs_lu, -1);
792
793 /* check if there are objects still alive.
794 * It shouldn't have any object because lu_site_purge would cleanup
795 * all of cached objects. Anyway, probably the echo device is being
796 * parallelly accessed.
797 */
798 spin_lock(&ec->ec_lock);
799 list_for_each_entry(eco, &ec->ec_objects, eo_obj_chain)
800 eco->eo_deleted = 1;
801 spin_unlock(&ec->ec_lock);
802
803 /* purge again */
804 lu_site_purge(env, &ed->ed_site->cs_lu, -1);
805
806 CDEBUG(D_INFO,
807 "Waiting for the reference of echo object to be dropped\n");
808
809 /* Wait for the last reference to be dropped. */
810 spin_lock(&ec->ec_lock);
811 while (!list_empty(&ec->ec_objects)) {
812 spin_unlock(&ec->ec_lock);
2d00bd17 813 CERROR("echo_client still has objects at cleanup time, wait for 1 second\n");
18fd5baa
PT
814 set_current_state(TASK_UNINTERRUPTIBLE);
815 schedule_timeout(cfs_time_seconds(1));
d7e09d03
PT
816 lu_site_purge(env, &ed->ed_site->cs_lu, -1);
817 spin_lock(&ec->ec_lock);
818 }
819 spin_unlock(&ec->ec_lock);
820
821 LASSERT(list_empty(&ec->ec_locks));
822
823 CDEBUG(D_INFO, "No object exists, exiting...\n");
824
825 echo_client_cleanup(d->ld_obd);
5bd07d99
PT
826
827 while (next)
d7e09d03
PT
828 next = next->ld_type->ldt_ops->ldto_device_free(env, next);
829
830 LASSERT(ed->ed_site == lu2cl_site(d->ld_site));
831 echo_site_fini(env, ed);
832 cl_device_fini(&ed->ed_cl);
c5b89520 833 kfree(ed);
d7e09d03
PT
834
835 return NULL;
836}
837
838static const struct lu_device_type_operations echo_device_type_ops = {
839 .ldto_init = echo_type_init,
840 .ldto_fini = echo_type_fini,
841
842 .ldto_start = echo_type_start,
843 .ldto_stop = echo_type_stop,
844
845 .ldto_device_alloc = echo_device_alloc,
846 .ldto_device_free = echo_device_free,
847 .ldto_device_init = echo_device_init,
848 .ldto_device_fini = echo_device_fini
849};
850
851static struct lu_device_type echo_device_type = {
852 .ldt_tags = LU_DEVICE_CL,
853 .ldt_name = LUSTRE_ECHO_CLIENT_NAME,
854 .ldt_ops = &echo_device_type_ops,
5bd07d99 855 .ldt_ctx_tags = LCT_CL_THREAD,
d7e09d03 856};
c9f6bb96 857
d7e09d03
PT
858/** @} echo_init */
859
860/** \defgroup echo_exports Exported operations
861 *
862 * exporting functions to echo client
863 *
864 * @{
865 */
866
867/* Interfaces to echo client obd device */
868static struct echo_object *cl_echo_object_find(struct echo_device *d,
869 struct lov_stripe_md **lsmp)
870{
871 struct lu_env *env;
872 struct echo_thread_info *info;
873 struct echo_object_conf *conf;
874 struct lov_stripe_md *lsm;
875 struct echo_object *eco;
876 struct cl_object *obj;
877 struct lu_fid *fid;
878 int refcheck;
879 int rc;
d7e09d03
PT
880
881 LASSERT(lsmp);
882 lsm = *lsmp;
883 LASSERT(lsm);
8f405503 884 LASSERTF(ostid_id(&lsm->lsm_oi) != 0, DOSTID"\n", POSTID(&lsm->lsm_oi));
885 LASSERTF(ostid_seq(&lsm->lsm_oi) == FID_SEQ_ECHO, DOSTID"\n",
886 POSTID(&lsm->lsm_oi));
d7e09d03
PT
887
888 /* Never return an object if the obd is to be freed. */
889 if (echo_dev2cl(d)->cd_lu_dev.ld_obd->obd_stopping)
0a3bdb00 890 return ERR_PTR(-ENODEV);
d7e09d03
PT
891
892 env = cl_env_get(&refcheck);
893 if (IS_ERR(env))
0a3bdb00 894 return (void *)env;
d7e09d03
PT
895
896 info = echo_env_info(env);
897 conf = &info->eti_conf;
898 if (d->ed_next) {
f833ea10 899 struct lov_oinfo *oinfo = lsm->lsm_oinfo[0];
ded9f908 900
f833ea10
JH
901 LASSERT(oinfo);
902 oinfo->loi_oi = lsm->lsm_oi;
903 conf->eoc_cl.u.coc_oinfo = oinfo;
d7e09d03
PT
904 }
905 conf->eoc_md = lsmp;
906
907 fid = &info->eti_fid;
908 rc = ostid_to_fid(fid, &lsm->lsm_oi, 0);
abb368ab
JY
909 if (rc != 0) {
910 eco = ERR_PTR(rc);
911 goto out;
912 }
d7e09d03
PT
913
914 /* In the function below, .hs_keycmp resolves to
35f0d1ab
OD
915 * lu_obj_hop_keycmp()
916 */
d7e09d03
PT
917 /* coverity[overrun-buffer-val] */
918 obj = cl_object_find(env, echo_dev2cl(d), fid, &conf->eoc_cl);
abb368ab
JY
919 if (IS_ERR(obj)) {
920 eco = (void *)obj;
921 goto out;
922 }
d7e09d03
PT
923
924 eco = cl2echo_obj(obj);
925 if (eco->eo_deleted) {
926 cl_object_put(env, obj);
927 eco = ERR_PTR(-EAGAIN);
928 }
929
930out:
931 cl_env_put(env, &refcheck);
0a3bdb00 932 return eco;
d7e09d03
PT
933}
934
935static int cl_echo_object_put(struct echo_object *eco)
936{
937 struct lu_env *env;
938 struct cl_object *obj = echo_obj2cl(eco);
939 int refcheck;
d7e09d03
PT
940
941 env = cl_env_get(&refcheck);
942 if (IS_ERR(env))
0a3bdb00 943 return PTR_ERR(env);
d7e09d03
PT
944
945 /* an external function to kill an object? */
946 if (eco->eo_deleted) {
947 struct lu_object_header *loh = obj->co_lu.lo_header;
ded9f908 948
d7e09d03
PT
949 LASSERT(&eco->eo_hdr == luh2coh(loh));
950 set_bit(LU_OBJECT_HEARD_BANSHEE, &loh->loh_flags);
951 }
952
953 cl_object_put(env, obj);
954 cl_env_put(env, &refcheck);
0a3bdb00 955 return 0;
d7e09d03
PT
956}
957
958static int cl_echo_enqueue0(struct lu_env *env, struct echo_object *eco,
21aef7d9 959 u64 start, u64 end, int mode,
d841438a 960 __u64 *cookie, __u32 enqflags)
d7e09d03
PT
961{
962 struct cl_io *io;
963 struct cl_lock *lck;
964 struct cl_object *obj;
965 struct cl_lock_descr *descr;
966 struct echo_thread_info *info;
967 int rc = -ENOMEM;
d7e09d03
PT
968
969 info = echo_env_info(env);
970 io = &info->eti_io;
06563b56 971 lck = &info->eti_lock;
d7e09d03
PT
972 obj = echo_obj2cl(eco);
973
06563b56
JX
974 memset(lck, 0, sizeof(*lck));
975 descr = &lck->cll_descr;
d7e09d03
PT
976 descr->cld_obj = obj;
977 descr->cld_start = cl_index(obj, start);
978 descr->cld_end = cl_index(obj, end);
979 descr->cld_mode = mode == LCK_PW ? CLM_WRITE : CLM_READ;
980 descr->cld_enq_flags = enqflags;
981 io->ci_obj = obj;
982
06563b56
JX
983 rc = cl_lock_request(env, io, lck);
984 if (rc == 0) {
d7e09d03
PT
985 struct echo_client_obd *ec = eco->eo_dev->ed_ec;
986 struct echo_lock *el;
987
06563b56
JX
988 el = cl2echo_lock(cl_lock_at(lck, &echo_device_type));
989 spin_lock(&ec->ec_lock);
990 if (list_empty(&el->el_chain)) {
991 list_add(&el->el_chain, &ec->ec_locks);
992 el->el_cookie = ++ec->ec_unique;
d7e09d03 993 }
06563b56
JX
994 atomic_inc(&el->el_refcount);
995 *cookie = el->el_cookie;
996 spin_unlock(&ec->ec_lock);
d7e09d03 997 }
0a3bdb00 998 return rc;
d7e09d03
PT
999}
1000
d7e09d03
PT
1001static int cl_echo_cancel0(struct lu_env *env, struct echo_device *ed,
1002 __u64 cookie)
1003{
1004 struct echo_client_obd *ec = ed->ed_ec;
1005 struct echo_lock *ecl = NULL;
1006 struct list_head *el;
1007 int found = 0, still_used = 0;
d7e09d03 1008
d7e09d03 1009 spin_lock(&ec->ec_lock);
cb7e9f7b
DU
1010 list_for_each(el, &ec->ec_locks) {
1011 ecl = list_entry(el, struct echo_lock, el_chain);
55f5a824 1012 CDEBUG(D_INFO, "ecl: %p, cookie: %#llx\n", ecl, ecl->el_cookie);
d7e09d03
PT
1013 found = (ecl->el_cookie == cookie);
1014 if (found) {
1015 if (atomic_dec_and_test(&ecl->el_refcount))
1016 list_del_init(&ecl->el_chain);
1017 else
1018 still_used = 1;
1019 break;
1020 }
1021 }
1022 spin_unlock(&ec->ec_lock);
1023
1024 if (!found)
0a3bdb00 1025 return -ENOENT;
d7e09d03
PT
1026
1027 echo_lock_release(env, ecl, still_used);
0a3bdb00 1028 return 0;
d7e09d03
PT
1029}
1030
77605e41
JX
1031static void echo_commit_callback(const struct lu_env *env, struct cl_io *io,
1032 struct cl_page *page)
d7e09d03 1033{
77605e41
JX
1034 struct echo_thread_info *info;
1035 struct cl_2queue *queue;
d7e09d03 1036
77605e41
JX
1037 info = echo_env_info(env);
1038 LASSERT(io == &info->eti_io);
ded9f908 1039
77605e41
JX
1040 queue = &info->eti_queue;
1041 cl_page_list_add(&queue->c2_qout, page);
d7e09d03
PT
1042}
1043
21aef7d9 1044static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
d7e09d03
PT
1045 struct page **pages, int npages, int async)
1046{
1047 struct lu_env *env;
1048 struct echo_thread_info *info;
1049 struct cl_object *obj = echo_obj2cl(eco);
1050 struct echo_device *ed = eco->eo_dev;
1051 struct cl_2queue *queue;
1052 struct cl_io *io;
1053 struct cl_page *clp;
1054 struct lustre_handle lh = { 0 };
1055 int page_size = cl_page_size(obj);
1056 int refcheck;
1057 int rc;
1058 int i;
d7e09d03 1059
616387e8 1060 LASSERT((offset & ~PAGE_MASK) == 0);
7ae59eda 1061 LASSERT(ed->ed_next);
d7e09d03
PT
1062 env = cl_env_get(&refcheck);
1063 if (IS_ERR(env))
0a3bdb00 1064 return PTR_ERR(env);
d7e09d03
PT
1065
1066 info = echo_env_info(env);
1067 io = &info->eti_io;
1068 queue = &info->eti_queue;
1069
1070 cl_2queue_init(queue);
1071
1072 io->ci_ignore_layout = 1;
1073 rc = cl_io_init(env, io, CIT_MISC, obj);
1074 if (rc < 0)
abb368ab 1075 goto out;
d7e09d03
PT
1076 LASSERT(rc == 0);
1077
d7e09d03 1078 rc = cl_echo_enqueue0(env, eco, offset,
09cbfeaf 1079 offset + npages * PAGE_SIZE - 1,
d7e09d03
PT
1080 rw == READ ? LCK_PR : LCK_PW, &lh.cookie,
1081 CEF_NEVER);
1082 if (rc < 0)
abb368ab 1083 goto error_lock;
d7e09d03
PT
1084
1085 for (i = 0; i < npages; i++) {
1086 LASSERT(pages[i]);
1087 clp = cl_page_find(env, obj, cl_index(obj, offset),
1088 pages[i], CPT_TRANSIENT);
1089 if (IS_ERR(clp)) {
1090 rc = PTR_ERR(clp);
1091 break;
1092 }
1093 LASSERT(clp->cp_type == CPT_TRANSIENT);
1094
1095 rc = cl_page_own(env, io, clp);
1096 if (rc) {
1097 LASSERT(clp->cp_state == CPS_FREEING);
1098 cl_page_put(env, clp);
1099 break;
1100 }
53f1a127
SB
1101 /*
1102 * Add a page to the incoming page list of 2-queue.
1103 */
1104 cl_page_list_add(&queue->c2_qin, clp);
d7e09d03
PT
1105
1106 /* drop the reference count for cl_page_find, so that the page
35f0d1ab
OD
1107 * will be freed in cl_2queue_fini.
1108 */
d7e09d03
PT
1109 cl_page_put(env, clp);
1110 cl_page_clip(env, clp, 0, page_size);
1111
1112 offset += page_size;
1113 }
1114
1115 if (rc == 0) {
1116 enum cl_req_type typ = rw == READ ? CRT_READ : CRT_WRITE;
1117
1118 async = async && (typ == CRT_WRITE);
1119 if (async)
77605e41
JX
1120 rc = cl_io_commit_async(env, io, &queue->c2_qin,
1121 0, PAGE_SIZE,
1122 echo_commit_callback);
d7e09d03
PT
1123 else
1124 rc = cl_io_submit_sync(env, io, typ, queue, 0);
1125 CDEBUG(D_INFO, "echo_client %s write returns %d\n",
1126 async ? "async" : "sync", rc);
1127 }
1128
1129 cl_echo_cancel0(env, ed, lh.cookie);
d7e09d03
PT
1130error_lock:
1131 cl_2queue_discard(env, io, queue);
1132 cl_2queue_disown(env, io, queue);
1133 cl_2queue_fini(env, queue);
1134 cl_io_fini(env, io);
1135out:
1136 cl_env_put(env, &refcheck);
1137 return rc;
1138}
c9f6bb96 1139
d7e09d03
PT
1140/** @} echo_exports */
1141
21aef7d9 1142static u64 last_object_id;
d7e09d03 1143
d7e09d03 1144static int echo_create_object(const struct lu_env *env, struct echo_device *ed,
f833ea10 1145 struct obdo *oa, struct obd_trans_info *oti)
d7e09d03
PT
1146{
1147 struct echo_object *eco;
1148 struct echo_client_obd *ec = ed->ed_ec;
1149 struct lov_stripe_md *lsm = NULL;
1150 int rc;
1151 int created = 0;
d7e09d03 1152
01de911e
OD
1153 if (!(oa->o_valid & OBD_MD_FLID) ||
1154 !(oa->o_valid & OBD_MD_FLGROUP) ||
1155 !fid_seq_is_echo(ostid_seq(&oa->o_oi))) {
1156 CERROR("invalid oid " DOSTID "\n", POSTID(&oa->o_oi));
0a3bdb00 1157 return -EINVAL;
d7e09d03
PT
1158 }
1159
1160 rc = echo_alloc_memmd(ed, &lsm);
1161 if (rc < 0) {
1162 CERROR("Cannot allocate md: rc = %d\n", rc);
abb368ab 1163 goto failed;
d7e09d03
PT
1164 }
1165
f833ea10 1166 /* setup object ID here */
61ece0e6 1167 lsm->lsm_oi = oa->o_oi;
d7e09d03
PT
1168
1169 if (ostid_id(&lsm->lsm_oi) == 0)
1170 ostid_set_id(&lsm->lsm_oi, ++last_object_id);
1171
f833ea10
JH
1172 rc = obd_create(env, ec->ec_exp, oa, &lsm, oti);
1173 if (rc != 0) {
1174 CERROR("Cannot create objects: rc = %d\n", rc);
1175 goto failed;
d7e09d03 1176 }
f833ea10 1177 created = 1;
d7e09d03
PT
1178
1179 /* See what object ID we were given */
1180 oa->o_oi = lsm->lsm_oi;
1181 oa->o_valid |= OBD_MD_FLID;
1182
1183 eco = cl_echo_object_find(ed, &lsm);
abb368ab
JY
1184 if (IS_ERR(eco)) {
1185 rc = PTR_ERR(eco);
1186 goto failed;
1187 }
d7e09d03
PT
1188 cl_echo_object_put(eco);
1189
1190 CDEBUG(D_INFO, "oa oid "DOSTID"\n", POSTID(&oa->o_oi));
d7e09d03
PT
1191
1192 failed:
1193 if (created && rc)
ef2e0f55 1194 obd_destroy(env, ec->ec_exp, oa, lsm, oti, NULL);
d7e09d03
PT
1195 if (lsm)
1196 echo_free_memmd(ed, &lsm);
1197 if (rc)
1198 CERROR("create object failed with: rc = %d\n", rc);
fbe7c6c7 1199 return rc;
d7e09d03
PT
1200}
1201
1202static int echo_get_object(struct echo_object **ecop, struct echo_device *ed,
1203 struct obdo *oa)
1204{
1205 struct lov_stripe_md *lsm = NULL;
1206 struct echo_object *eco;
1207 int rc;
d7e09d03
PT
1208
1209 if ((oa->o_valid & OBD_MD_FLID) == 0 || ostid_id(&oa->o_oi) == 0) {
1210 /* disallow use of object id 0 */
d841438a 1211 CERROR("No valid oid\n");
0a3bdb00 1212 return -EINVAL;
d7e09d03
PT
1213 }
1214
1215 rc = echo_alloc_memmd(ed, &lsm);
1216 if (rc < 0)
0a3bdb00 1217 return rc;
d7e09d03
PT
1218
1219 lsm->lsm_oi = oa->o_oi;
1220 if (!(oa->o_valid & OBD_MD_FLGROUP))
1221 ostid_set_seq_echo(&lsm->lsm_oi);
1222
1223 rc = 0;
1224 eco = cl_echo_object_find(ed, &lsm);
1225 if (!IS_ERR(eco))
1226 *ecop = eco;
1227 else
1228 rc = PTR_ERR(eco);
1229 if (lsm)
1230 echo_free_memmd(ed, &lsm);
0a3bdb00 1231 return rc;
d7e09d03
PT
1232}
1233
1234static void echo_put_object(struct echo_object *eco)
1235{
19b2056f
JN
1236 int rc;
1237
1238 rc = cl_echo_object_put(eco);
1239 if (rc)
1240 CERROR("%s: echo client drop an object failed: rc = %d\n",
1241 eco->eo_dev->ed_ec->ec_exp->exp_obd->obd_name, rc);
d7e09d03
PT
1242}
1243
1244static void
f833ea10 1245echo_client_page_debug_setup(struct page *page, int rw, u64 id,
21aef7d9 1246 u64 offset, u64 count)
d7e09d03
PT
1247{
1248 char *addr;
21aef7d9
OD
1249 u64 stripe_off;
1250 u64 stripe_id;
d7e09d03
PT
1251 int delta;
1252
1253 /* no partial pages on the client */
09cbfeaf 1254 LASSERT(count == PAGE_SIZE);
d7e09d03
PT
1255
1256 addr = kmap(page);
1257
09cbfeaf 1258 for (delta = 0; delta < PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
d7e09d03
PT
1259 if (rw == OBD_BRW_WRITE) {
1260 stripe_off = offset + delta;
1261 stripe_id = id;
d7e09d03
PT
1262 } else {
1263 stripe_off = 0xdeadbeef00c0ffeeULL;
1264 stripe_id = 0xdeadbeef00c0ffeeULL;
1265 }
1266 block_debug_setup(addr + delta, OBD_ECHO_BLOCK_SIZE,
1267 stripe_off, stripe_id);
1268 }
1269
1270 kunmap(page);
1271}
1272
f833ea10 1273static int echo_client_page_debug_check(struct page *page, u64 id,
21aef7d9 1274 u64 offset, u64 count)
d7e09d03 1275{
21aef7d9
OD
1276 u64 stripe_off;
1277 u64 stripe_id;
d7e09d03
PT
1278 char *addr;
1279 int delta;
1280 int rc;
1281 int rc2;
1282
1283 /* no partial pages on the client */
09cbfeaf 1284 LASSERT(count == PAGE_SIZE);
d7e09d03
PT
1285
1286 addr = kmap(page);
1287
09cbfeaf 1288 for (rc = delta = 0; delta < PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
d7e09d03
PT
1289 stripe_off = offset + delta;
1290 stripe_id = id;
d7e09d03
PT
1291
1292 rc2 = block_debug_check("test_brw",
1293 addr + delta, OBD_ECHO_BLOCK_SIZE,
1294 stripe_off, stripe_id);
1295 if (rc2 != 0) {
d841438a 1296 CERROR("Error in echo object %#llx\n", id);
d7e09d03
PT
1297 rc = rc2;
1298 }
1299 }
1300
1301 kunmap(page);
1302 return rc;
1303}
1304
1305static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa,
21aef7d9
OD
1306 struct echo_object *eco, u64 offset,
1307 u64 count, int async,
d7e09d03
PT
1308 struct obd_trans_info *oti)
1309{
21aef7d9 1310 u32 npages;
d7e09d03
PT
1311 struct brw_page *pga;
1312 struct brw_page *pgp;
1313 struct page **pages;
21aef7d9 1314 u64 off;
d7e09d03
PT
1315 int i;
1316 int rc;
1317 int verify;
cad7aa13 1318 gfp_t gfp_mask;
d7e09d03 1319 int brw_flags = 0;
d7e09d03
PT
1320
1321 verify = (ostid_id(&oa->o_oi) != ECHO_PERSISTENT_OBJID &&
1322 (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
1323 (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
1324
40113370 1325 gfp_mask = ((ostid_id(&oa->o_oi) & 2) == 0) ? GFP_KERNEL : GFP_HIGHUSER;
d7e09d03
PT
1326
1327 LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);
d7e09d03
PT
1328
1329 if (count <= 0 ||
616387e8 1330 (count & (~PAGE_MASK)) != 0)
0a3bdb00 1331 return -EINVAL;
d7e09d03
PT
1332
1333 /* XXX think again with misaligned I/O */
09cbfeaf 1334 npages = count >> PAGE_SHIFT;
d7e09d03
PT
1335
1336 if (rw == OBD_BRW_WRITE)
1337 brw_flags = OBD_BRW_ASYNC;
1338
c5b89520 1339 pga = kcalloc(npages, sizeof(*pga), GFP_NOFS);
7ae59eda 1340 if (!pga)
0a3bdb00 1341 return -ENOMEM;
d7e09d03 1342
c5b89520 1343 pages = kcalloc(npages, sizeof(*pages), GFP_NOFS);
7ae59eda 1344 if (!pages) {
c5b89520 1345 kfree(pga);
0a3bdb00 1346 return -ENOMEM;
d7e09d03
PT
1347 }
1348
1349 for (i = 0, pgp = pga, off = offset;
1350 i < npages;
09cbfeaf 1351 i++, pgp++, off += PAGE_SIZE) {
7ae59eda 1352 LASSERT(!pgp->pg); /* for cleanup */
d7e09d03
PT
1353
1354 rc = -ENOMEM;
0c52e423 1355 pgp->pg = alloc_page(gfp_mask);
7ae59eda 1356 if (!pgp->pg)
d7e09d03
PT
1357 goto out;
1358
1359 pages[i] = pgp->pg;
09cbfeaf 1360 pgp->count = PAGE_SIZE;
d7e09d03
PT
1361 pgp->off = off;
1362 pgp->flag = brw_flags;
1363
1364 if (verify)
f833ea10 1365 echo_client_page_debug_setup(pgp->pg, rw,
d7e09d03
PT
1366 ostid_id(&oa->o_oi), off,
1367 pgp->count);
1368 }
1369
1370 /* brw mode can only be used at client */
7ae59eda 1371 LASSERT(ed->ed_next);
d7e09d03
PT
1372 rc = cl_echo_object_brw(eco, rw, offset, pages, npages, async);
1373
1374 out:
1375 if (rc != 0 || rw != OBD_BRW_READ)
1376 verify = 0;
1377
1378 for (i = 0, pgp = pga; i < npages; i++, pgp++) {
7ae59eda 1379 if (!pgp->pg)
d7e09d03
PT
1380 continue;
1381
1382 if (verify) {
1383 int vrc;
ded9f908 1384
f833ea10 1385 vrc = echo_client_page_debug_check(pgp->pg,
d7e09d03
PT
1386 ostid_id(&oa->o_oi),
1387 pgp->off, pgp->count);
1388 if (vrc != 0 && rc == 0)
1389 rc = vrc;
1390 }
0c52e423 1391 __free_page(pgp->pg);
d7e09d03 1392 }
c5b89520
JL
1393 kfree(pga);
1394 kfree(pages);
0a3bdb00 1395 return rc;
d7e09d03
PT
1396}
1397
1398static int echo_client_prep_commit(const struct lu_env *env,
1399 struct obd_export *exp, int rw,
1400 struct obdo *oa, struct echo_object *eco,
21aef7d9
OD
1401 u64 offset, u64 count,
1402 u64 batch, struct obd_trans_info *oti,
d7e09d03
PT
1403 int async)
1404{
d7e09d03
PT
1405 struct obd_ioobj ioo;
1406 struct niobuf_local *lnb;
1407 struct niobuf_remote *rnb;
21aef7d9
OD
1408 u64 off;
1409 u64 npages, tot_pages;
d7e09d03
PT
1410 int i, ret = 0, brw_flags = 0;
1411
616387e8 1412 if (count <= 0 || (count & (~PAGE_MASK)) != 0)
0a3bdb00 1413 return -EINVAL;
d7e09d03 1414
09cbfeaf
KS
1415 npages = batch >> PAGE_SHIFT;
1416 tot_pages = count >> PAGE_SHIFT;
d7e09d03 1417
c5b89520
JL
1418 lnb = kcalloc(npages, sizeof(struct niobuf_local), GFP_NOFS);
1419 rnb = kcalloc(npages, sizeof(struct niobuf_remote), GFP_NOFS);
d7e09d03 1420
7ae59eda 1421 if (!lnb || !rnb) {
abb368ab
JY
1422 ret = -ENOMEM;
1423 goto out;
1424 }
d7e09d03
PT
1425
1426 if (rw == OBD_BRW_WRITE && async)
1427 brw_flags |= OBD_BRW_ASYNC;
1428
1429 obdo_to_ioobj(oa, &ioo);
1430
1431 off = offset;
1432
a58a38ac 1433 for (; tot_pages; tot_pages -= npages) {
d7e09d03
PT
1434 int lpages;
1435
1436 if (tot_pages < npages)
1437 npages = tot_pages;
1438
09cbfeaf 1439 for (i = 0; i < npages; i++, off += PAGE_SIZE) {
d7e09d03 1440 rnb[i].offset = off;
09cbfeaf 1441 rnb[i].len = PAGE_SIZE;
d7e09d03
PT
1442 rnb[i].flags = brw_flags;
1443 }
1444
1445 ioo.ioo_bufcnt = npages;
d7e09d03
PT
1446
1447 lpages = npages;
1448 ret = obd_preprw(env, rw, exp, oa, 1, &ioo, rnb, &lpages,
ef2e0f55 1449 lnb, oti);
d7e09d03 1450 if (ret != 0)
abb368ab 1451 goto out;
d7e09d03
PT
1452 LASSERT(lpages == npages);
1453
1454 for (i = 0; i < lpages; i++) {
1455 struct page *page = lnb[i].page;
1456
1457 /* read past eof? */
7ae59eda 1458 if (!page && lnb[i].rc == 0)
d7e09d03
PT
1459 continue;
1460
1461 if (async)
1462 lnb[i].flags |= OBD_BRW_ASYNC;
1463
1464 if (ostid_id(&oa->o_oi) == ECHO_PERSISTENT_OBJID ||
1465 (oa->o_valid & OBD_MD_FLFLAGS) == 0 ||
1466 (oa->o_flags & OBD_FL_DEBUG_CHECK) == 0)
1467 continue;
1468
1469 if (rw == OBD_BRW_WRITE)
f833ea10 1470 echo_client_page_debug_setup(page, rw,
d7e09d03
PT
1471 ostid_id(&oa->o_oi),
1472 rnb[i].offset,
1473 rnb[i].len);
1474 else
f833ea10 1475 echo_client_page_debug_check(page,
d7e09d03
PT
1476 ostid_id(&oa->o_oi),
1477 rnb[i].offset,
1478 rnb[i].len);
1479 }
1480
1481 ret = obd_commitrw(env, rw, exp, oa, 1, &ioo,
1482 rnb, npages, lnb, oti, ret);
1483 if (ret != 0)
abb368ab 1484 goto out;
d7e09d03
PT
1485
1486 /* Reset oti otherwise it would confuse ldiskfs. */
1487 memset(oti, 0, sizeof(*oti));
09b0170e
JY
1488
1489 /* Reuse env context. */
1490 lu_context_exit((struct lu_context *)&env->le_ctx);
1491 lu_context_enter((struct lu_context *)&env->le_ctx);
d7e09d03
PT
1492 }
1493
1494out:
627ef6ea
JL
1495 kfree(lnb);
1496 kfree(rnb);
0a3bdb00 1497 return ret;
d7e09d03
PT
1498}
1499
1500static int echo_client_brw_ioctl(const struct lu_env *env, int rw,
1501 struct obd_export *exp,
1502 struct obd_ioctl_data *data,
1503 struct obd_trans_info *dummy_oti)
1504{
1505 struct obd_device *obd = class_exp2obd(exp);
1506 struct echo_device *ed = obd2echo_dev(obd);
1507 struct echo_client_obd *ec = ed->ed_ec;
1508 struct obdo *oa = &data->ioc_obdo1;
1509 struct echo_object *eco;
1510 int rc;
1511 int async = 1;
1512 long test_mode;
d7e09d03
PT
1513
1514 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
1515
1516 rc = echo_get_object(&eco, ed, oa);
1517 if (rc)
0a3bdb00 1518 return rc;
d7e09d03
PT
1519
1520 oa->o_valid &= ~OBD_MD_FLHANDLE;
1521
1522 /* OFD/obdfilter works only via prep/commit */
1523 test_mode = (long)data->ioc_pbuf1;
1524 if (test_mode == 1)
1525 async = 0;
1526
7ae59eda 1527 if (!ed->ed_next && test_mode != 3) {
d7e09d03
PT
1528 test_mode = 3;
1529 data->ioc_plen1 = data->ioc_count;
1530 }
1531
1532 /* Truncate batch size to maximum */
1533 if (data->ioc_plen1 > PTLRPC_MAX_BRW_SIZE)
1534 data->ioc_plen1 = PTLRPC_MAX_BRW_SIZE;
1535
1536 switch (test_mode) {
1537 case 1:
1538 /* fall through */
1539 case 2:
1540 rc = echo_client_kbrw(ed, rw, oa,
1541 eco, data->ioc_offset,
1542 data->ioc_count, async, dummy_oti);
1543 break;
1544 case 3:
1545 rc = echo_client_prep_commit(env, ec->ec_exp, rw, oa,
1546 eco, data->ioc_offset,
1547 data->ioc_count, data->ioc_plen1,
1548 dummy_oti, async);
1549 break;
1550 default:
1551 rc = -EINVAL;
1552 }
1553 echo_put_object(eco);
0a3bdb00 1554 return rc;
d7e09d03
PT
1555}
1556
d7e09d03
PT
1557static int
1558echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
e09bee34 1559 void *karg, void __user *uarg)
d7e09d03
PT
1560{
1561 struct obd_device *obd = exp->exp_obd;
1562 struct echo_device *ed = obd2echo_dev(obd);
1563 struct echo_client_obd *ec = ed->ed_ec;
1564 struct echo_object *eco;
1565 struct obd_ioctl_data *data = karg;
1566 struct obd_trans_info dummy_oti;
1567 struct lu_env *env;
1568 struct oti_req_ack_lock *ack_lock;
1569 struct obdo *oa;
1570 struct lu_fid fid;
1571 int rw = OBD_BRW_READ;
1572 int rc = 0;
1573 int i;
d7e09d03
PT
1574
1575 memset(&dummy_oti, 0, sizeof(dummy_oti));
1576
1577 oa = &data->ioc_obdo1;
1578 if (!(oa->o_valid & OBD_MD_FLGROUP)) {
1579 oa->o_valid |= OBD_MD_FLGROUP;
1580 ostid_set_seq_echo(&oa->o_oi);
1581 }
1582
1583 /* This FID is unpacked just for validation at this point */
1584 rc = ostid_to_fid(&fid, &oa->o_oi, 0);
1585 if (rc < 0)
0a3bdb00 1586 return rc;
d7e09d03 1587
c5b89520 1588 env = kzalloc(sizeof(*env), GFP_NOFS);
efeb257d 1589 if (!env)
0a3bdb00 1590 return -ENOMEM;
d7e09d03
PT
1591
1592 rc = lu_env_init(env, LCT_DT_THREAD);
abb368ab
JY
1593 if (rc) {
1594 rc = -ENOMEM;
1595 goto out;
1596 }
d7e09d03
PT
1597
1598 switch (cmd) {
1599 case OBD_IOC_CREATE: /* may create echo object */
abb368ab
JY
1600 if (!capable(CFS_CAP_SYS_ADMIN)) {
1601 rc = -EPERM;
1602 goto out;
1603 }
d7e09d03 1604
f833ea10 1605 rc = echo_create_object(env, ed, oa, &dummy_oti);
abb368ab 1606 goto out;
d7e09d03 1607
d7e09d03 1608 case OBD_IOC_DESTROY:
abb368ab
JY
1609 if (!capable(CFS_CAP_SYS_ADMIN)) {
1610 rc = -EPERM;
1611 goto out;
1612 }
d7e09d03
PT
1613
1614 rc = echo_get_object(&eco, ed, oa);
1615 if (rc == 0) {
f833ea10 1616 rc = obd_destroy(env, ec->ec_exp, oa, NULL,
ef2e0f55 1617 &dummy_oti, NULL);
d7e09d03
PT
1618 if (rc == 0)
1619 eco->eo_deleted = 1;
1620 echo_put_object(eco);
1621 }
abb368ab 1622 goto out;
d7e09d03
PT
1623
1624 case OBD_IOC_GETATTR:
1625 rc = echo_get_object(&eco, ed, oa);
1626 if (rc == 0) {
f833ea10
JH
1627 struct obd_info oinfo = {
1628 .oi_oa = oa,
1629 };
ded9f908 1630
d7e09d03
PT
1631 rc = obd_getattr(env, ec->ec_exp, &oinfo);
1632 echo_put_object(eco);
1633 }
abb368ab 1634 goto out;
d7e09d03
PT
1635
1636 case OBD_IOC_SETATTR:
abb368ab
JY
1637 if (!capable(CFS_CAP_SYS_ADMIN)) {
1638 rc = -EPERM;
1639 goto out;
1640 }
d7e09d03
PT
1641
1642 rc = echo_get_object(&eco, ed, oa);
1643 if (rc == 0) {
f833ea10
JH
1644 struct obd_info oinfo = {
1645 .oi_oa = oa,
1646 };
d7e09d03
PT
1647
1648 rc = obd_setattr(env, ec->ec_exp, &oinfo, NULL);
1649 echo_put_object(eco);
1650 }
abb368ab 1651 goto out;
d7e09d03
PT
1652
1653 case OBD_IOC_BRW_WRITE:
abb368ab
JY
1654 if (!capable(CFS_CAP_SYS_ADMIN)) {
1655 rc = -EPERM;
1656 goto out;
1657 }
d7e09d03
PT
1658
1659 rw = OBD_BRW_WRITE;
1660 /* fall through */
1661 case OBD_IOC_BRW_READ:
1662 rc = echo_client_brw_ioctl(env, rw, exp, data, &dummy_oti);
abb368ab 1663 goto out;
d7e09d03 1664
d7e09d03 1665 default:
d841438a 1666 CERROR("echo_ioctl(): unrecognised ioctl %#x\n", cmd);
abb368ab
JY
1667 rc = -ENOTTY;
1668 goto out;
d7e09d03
PT
1669 }
1670
d7e09d03
PT
1671out:
1672 lu_env_fini(env);
c5b89520 1673 kfree(env);
d7e09d03
PT
1674
1675 /* XXX this should be in a helper also called by target_send_reply */
1676 for (ack_lock = dummy_oti.oti_ack_locks, i = 0; i < 4;
1677 i++, ack_lock++) {
1678 if (!ack_lock->mode)
1679 break;
1680 ldlm_lock_decref(&ack_lock->lock, ack_lock->mode);
1681 }
1682
1683 return rc;
1684}
1685
1686static int echo_client_setup(const struct lu_env *env,
1687 struct obd_device *obddev, struct lustre_cfg *lcfg)
1688{
1689 struct echo_client_obd *ec = &obddev->u.echo_client;
1690 struct obd_device *tgt;
1691 struct obd_uuid echo_uuid = { "ECHO_UUID" };
1692 struct obd_connect_data *ocd = NULL;
1693 int rc;
d7e09d03
PT
1694
1695 if (lcfg->lcfg_bufcount < 2 || LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
1696 CERROR("requires a TARGET OBD name\n");
0a3bdb00 1697 return -EINVAL;
d7e09d03
PT
1698 }
1699
1700 tgt = class_name2obd(lustre_cfg_string(lcfg, 1));
1701 if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) {
1702 CERROR("device not attached or not set up (%s)\n",
1703 lustre_cfg_string(lcfg, 1));
0a3bdb00 1704 return -EINVAL;
d7e09d03
PT
1705 }
1706
1707 spin_lock_init(&ec->ec_lock);
d841438a
YT
1708 INIT_LIST_HEAD(&ec->ec_objects);
1709 INIT_LIST_HEAD(&ec->ec_locks);
d7e09d03 1710 ec->ec_unique = 0;
d7e09d03 1711
c5b89520 1712 ocd = kzalloc(sizeof(*ocd), GFP_NOFS);
2f27a3e2 1713 if (!ocd)
d7e09d03 1714 return -ENOMEM;
d7e09d03
PT
1715
1716 ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_REQPORTAL |
1717 OBD_CONNECT_BRW_SIZE |
1718 OBD_CONNECT_GRANT | OBD_CONNECT_FULL20 |
3b2f75fd 1719 OBD_CONNECT_64BITHASH | OBD_CONNECT_LVB_TYPE |
1720 OBD_CONNECT_FID;
d7e09d03
PT
1721 ocd->ocd_brw_size = DT_MAX_BRW_SIZE;
1722 ocd->ocd_version = LUSTRE_VERSION_CODE;
1723 ocd->ocd_group = FID_SEQ_ECHO;
1724
1725 rc = obd_connect(env, &ec->ec_exp, tgt, &echo_uuid, ocd, NULL);
d7e09d03 1726
c5b89520 1727 kfree(ocd);
d7e09d03
PT
1728
1729 if (rc != 0) {
1730 CERROR("fail to connect to device %s\n",
1731 lustre_cfg_string(lcfg, 1));
fbe7c6c7 1732 return rc;
d7e09d03
PT
1733 }
1734
0a3bdb00 1735 return rc;
d7e09d03
PT
1736}
1737
1738static int echo_client_cleanup(struct obd_device *obddev)
1739{
d7e09d03
PT
1740 struct echo_client_obd *ec = &obddev->u.echo_client;
1741 int rc;
d7e09d03 1742
d7e09d03
PT
1743 if (!list_empty(&obddev->obd_exports)) {
1744 CERROR("still has clients!\n");
0a3bdb00 1745 return -EBUSY;
d7e09d03
PT
1746 }
1747
1748 LASSERT(atomic_read(&ec->ec_exp->exp_refcount) > 0);
1749 rc = obd_disconnect(ec->ec_exp);
1750 if (rc != 0)
1751 CERROR("fail to disconnect device: %d\n", rc);
1752
0a3bdb00 1753 return rc;
d7e09d03
PT
1754}
1755
1756static int echo_client_connect(const struct lu_env *env,
1757 struct obd_export **exp,
1758 struct obd_device *src, struct obd_uuid *cluuid,
1759 struct obd_connect_data *data, void *localdata)
1760{
1761 int rc;
1762 struct lustre_handle conn = { 0 };
1763
d7e09d03
PT
1764 rc = class_connect(&conn, src, cluuid);
1765 if (rc == 0) {
1766 *exp = class_conn2export(&conn);
1767 }
1768
0a3bdb00 1769 return rc;
d7e09d03
PT
1770}
1771
1772static int echo_client_disconnect(struct obd_export *exp)
1773{
d7e09d03 1774 int rc;
d7e09d03 1775
7ae59eda 1776 if (!exp) {
abb368ab
JY
1777 rc = -EINVAL;
1778 goto out;
1779 }
d7e09d03 1780
d7e09d03 1781 rc = class_disconnect(exp);
abb368ab 1782 goto out;
d7e09d03
PT
1783 out:
1784 return rc;
1785}
1786
1787static struct obd_ops echo_client_obd_ops = {
a13b1f32
DC
1788 .owner = THIS_MODULE,
1789 .iocontrol = echo_client_iocontrol,
1790 .connect = echo_client_connect,
1791 .disconnect = echo_client_disconnect
d7e09d03
PT
1792};
1793
5b34cd29 1794static int echo_client_init(void)
d7e09d03 1795{
d7e09d03
PT
1796 int rc;
1797
d7e09d03
PT
1798 rc = lu_kmem_init(echo_caches);
1799 if (rc == 0) {
1800 rc = class_register_type(&echo_client_obd_ops, NULL,
d7e09d03
PT
1801 LUSTRE_ECHO_CLIENT_NAME,
1802 &echo_device_type);
1803 if (rc)
1804 lu_kmem_fini(echo_caches);
1805 }
1806 return rc;
1807}
1808
5b34cd29 1809static void echo_client_exit(void)
d7e09d03
PT
1810{
1811 class_unregister_type(LUSTRE_ECHO_CLIENT_NAME);
1812 lu_kmem_fini(echo_caches);
1813}
1814
1815static int __init obdecho_init(void)
1816{
d7e09d03
PT
1817 LCONSOLE_INFO("Echo OBD driver; http://www.lustre.org/\n");
1818
09cbfeaf 1819 LASSERT(PAGE_SIZE % OBD_ECHO_BLOCK_SIZE == 0);
d7e09d03 1820
d9518b76 1821 return echo_client_init();
d7e09d03
PT
1822}
1823
/* Module exit point: tear down what obdecho_init() set up. */
static void /*__exit*/ obdecho_exit(void)
{
	echo_client_exit();
}
1828
a0455471 1829MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
57878e17 1830MODULE_DESCRIPTION("Lustre Echo Client test driver");
6960736c 1831MODULE_VERSION(LUSTRE_VERSION_STRING);
5b0e50b9 1832MODULE_LICENSE("GPL");
d7e09d03 1833
6960736c
GKH
1834module_init(obdecho_init);
1835module_exit(obdecho_exit);
d7e09d03
PT
1836
1837/** @} echo_client */
This page took 0.797032 seconds and 5 git commands to generate.