4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2015, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_ECHO
34 #include "../../include/linux/libcfs/libcfs.h"
36 #include "../include/obd.h"
37 #include "../include/obd_support.h"
38 #include "../include/obd_class.h"
39 #include "../include/lustre_debug.h"
40 #include "../include/lprocfs_status.h"
41 #include "../include/cl_object.h"
42 #include "../include/lustre_fid.h"
43 #include "../include/lustre_acl.h"
44 #include "../include/lustre_net.h"
46 #include "echo_internal.h"
48 /** \defgroup echo_client Echo Client
53 struct cl_device ed_cl
;
54 struct echo_client_obd
*ed_ec
;
56 struct cl_site ed_site_myself
;
57 struct cl_site
*ed_site
;
58 struct lu_device
*ed_next
;
62 struct cl_object eo_cl
;
63 struct cl_object_header eo_hdr
;
65 struct echo_device
*eo_dev
;
66 struct list_head eo_obj_chain
;
67 struct lov_stripe_md
*eo_lsm
;
72 struct echo_object_conf
{
73 struct cl_object_conf eoc_cl
;
74 struct lov_stripe_md
**eoc_md
;
78 struct cl_page_slice ep_cl
;
83 struct cl_lock_slice el_cl
;
84 struct list_head el_chain
;
85 struct echo_object
*el_object
;
/*
 * Forward declarations of the obd-level setup/teardown entry points; they
 * are called from echo_device_alloc() and echo_device_free() below.
 */
static int echo_client_setup(const struct lu_env *env,
			     struct obd_device *obddev,
			     struct lustre_cfg *lcfg);

static int echo_client_cleanup(struct obd_device *obddev);
95 /** \defgroup echo_helpers Helper functions
98 static inline struct echo_device
*cl2echo_dev(const struct cl_device
*dev
)
100 return container_of0(dev
, struct echo_device
, ed_cl
);
103 static inline struct cl_device
*echo_dev2cl(struct echo_device
*d
)
108 static inline struct echo_device
*obd2echo_dev(const struct obd_device
*obd
)
110 return cl2echo_dev(lu2cl_dev(obd
->obd_lu_dev
));
113 static inline struct cl_object
*echo_obj2cl(struct echo_object
*eco
)
118 static inline struct echo_object
*cl2echo_obj(const struct cl_object
*o
)
120 return container_of(o
, struct echo_object
, eo_cl
);
123 static inline struct echo_page
*cl2echo_page(const struct cl_page_slice
*s
)
125 return container_of(s
, struct echo_page
, ep_cl
);
128 static inline struct echo_lock
*cl2echo_lock(const struct cl_lock_slice
*s
)
130 return container_of(s
, struct echo_lock
, el_cl
);
133 static inline struct cl_lock
*echo_lock2cl(const struct echo_lock
*ecl
)
135 return ecl
->el_cl
.cls_lock
;
138 static struct lu_context_key echo_thread_key
;
139 static inline struct echo_thread_info
*echo_env_info(const struct lu_env
*env
)
141 struct echo_thread_info
*info
;
143 info
= lu_context_key_get(&env
->le_ctx
, &echo_thread_key
);
149 struct echo_object_conf
*cl2echo_conf(const struct cl_object_conf
*c
)
151 return container_of(c
, struct echo_object_conf
, eoc_cl
);
154 /** @} echo_helpers */
156 static struct echo_object
*cl_echo_object_find(struct echo_device
*d
,
157 struct lov_stripe_md
**lsm
);
158 static int cl_echo_object_put(struct echo_object
*eco
);
159 static int cl_echo_object_brw(struct echo_object
*eco
, int rw
, u64 offset
,
160 struct page
**pages
, int npages
, int async
);
162 struct echo_thread_info
{
163 struct echo_object_conf eti_conf
;
164 struct lustre_md eti_md
;
166 struct cl_2queue eti_queue
;
168 struct cl_lock eti_lock
;
169 struct lu_fid eti_fid
;
170 struct lu_fid eti_fid2
;
173 /* No session used right now */
174 struct echo_session_info
{
/* Slab caches; each is sized and named by its entry in echo_caches[]. */
static struct kmem_cache *echo_lock_kmem;	/* struct echo_lock */
static struct kmem_cache *echo_object_kmem;	/* struct echo_object */
static struct kmem_cache *echo_thread_kmem;	/* struct echo_thread_info */
static struct kmem_cache *echo_session_kmem;	/* struct echo_session_info */
183 static struct lu_kmem_descr echo_caches
[] = {
185 .ckd_cache
= &echo_lock_kmem
,
186 .ckd_name
= "echo_lock_kmem",
187 .ckd_size
= sizeof(struct echo_lock
)
190 .ckd_cache
= &echo_object_kmem
,
191 .ckd_name
= "echo_object_kmem",
192 .ckd_size
= sizeof(struct echo_object
)
195 .ckd_cache
= &echo_thread_kmem
,
196 .ckd_name
= "echo_thread_kmem",
197 .ckd_size
= sizeof(struct echo_thread_info
)
200 .ckd_cache
= &echo_session_kmem
,
201 .ckd_name
= "echo_session_kmem",
202 .ckd_size
= sizeof(struct echo_session_info
)
209 /** \defgroup echo_page Page operations
211 * Echo page operations.
215 static int echo_page_own(const struct lu_env
*env
,
216 const struct cl_page_slice
*slice
,
217 struct cl_io
*io
, int nonblock
)
219 struct echo_page
*ep
= cl2echo_page(slice
);
222 mutex_lock(&ep
->ep_lock
);
223 else if (!mutex_trylock(&ep
->ep_lock
))
228 static void echo_page_disown(const struct lu_env
*env
,
229 const struct cl_page_slice
*slice
,
232 struct echo_page
*ep
= cl2echo_page(slice
);
234 LASSERT(mutex_is_locked(&ep
->ep_lock
));
235 mutex_unlock(&ep
->ep_lock
);
238 static void echo_page_discard(const struct lu_env
*env
,
239 const struct cl_page_slice
*slice
,
240 struct cl_io
*unused
)
242 cl_page_delete(env
, slice
->cpl_page
);
245 static int echo_page_is_vmlocked(const struct lu_env
*env
,
246 const struct cl_page_slice
*slice
)
248 if (mutex_is_locked(&cl2echo_page(slice
)->ep_lock
))
253 static void echo_page_completion(const struct lu_env
*env
,
254 const struct cl_page_slice
*slice
,
257 LASSERT(slice
->cpl_page
->cp_sync_io
);
260 static void echo_page_fini(const struct lu_env
*env
,
261 struct cl_page_slice
*slice
)
263 struct echo_object
*eco
= cl2echo_obj(slice
->cpl_obj
);
265 atomic_dec(&eco
->eo_npages
);
266 put_page(slice
->cpl_page
->cp_vmpage
);
269 static int echo_page_prep(const struct lu_env
*env
,
270 const struct cl_page_slice
*slice
,
271 struct cl_io
*unused
)
276 static int echo_page_print(const struct lu_env
*env
,
277 const struct cl_page_slice
*slice
,
278 void *cookie
, lu_printer_t printer
)
280 struct echo_page
*ep
= cl2echo_page(slice
);
282 (*printer
)(env
, cookie
, LUSTRE_ECHO_CLIENT_NAME
"-page@%p %d vm@%p\n",
283 ep
, mutex_is_locked(&ep
->ep_lock
),
284 slice
->cpl_page
->cp_vmpage
);
288 static const struct cl_page_operations echo_page_ops
= {
289 .cpo_own
= echo_page_own
,
290 .cpo_disown
= echo_page_disown
,
291 .cpo_discard
= echo_page_discard
,
292 .cpo_fini
= echo_page_fini
,
293 .cpo_print
= echo_page_print
,
294 .cpo_is_vmlocked
= echo_page_is_vmlocked
,
297 .cpo_prep
= echo_page_prep
,
298 .cpo_completion
= echo_page_completion
,
301 .cpo_prep
= echo_page_prep
,
302 .cpo_completion
= echo_page_completion
,
309 /** \defgroup echo_lock Locking
311 * echo lock operations
315 static void echo_lock_fini(const struct lu_env
*env
,
316 struct cl_lock_slice
*slice
)
318 struct echo_lock
*ecl
= cl2echo_lock(slice
);
320 LASSERT(list_empty(&ecl
->el_chain
));
321 kmem_cache_free(echo_lock_kmem
, ecl
);
324 static struct cl_lock_operations echo_lock_ops
= {
325 .clo_fini
= echo_lock_fini
,
330 /** \defgroup echo_cl_ops cl_object operations
332 * operations for cl_object
336 static int echo_page_init(const struct lu_env
*env
, struct cl_object
*obj
,
337 struct cl_page
*page
, pgoff_t index
)
339 struct echo_page
*ep
= cl_object_page_slice(obj
, page
);
340 struct echo_object
*eco
= cl2echo_obj(obj
);
342 get_page(page
->cp_vmpage
);
343 mutex_init(&ep
->ep_lock
);
344 cl_page_slice_add(page
, &ep
->ep_cl
, obj
, index
, &echo_page_ops
);
345 atomic_inc(&eco
->eo_npages
);
349 static int echo_io_init(const struct lu_env
*env
, struct cl_object
*obj
,
355 static int echo_lock_init(const struct lu_env
*env
,
356 struct cl_object
*obj
, struct cl_lock
*lock
,
357 const struct cl_io
*unused
)
359 struct echo_lock
*el
;
361 el
= kmem_cache_zalloc(echo_lock_kmem
, GFP_NOFS
);
363 cl_lock_slice_add(lock
, &el
->el_cl
, obj
, &echo_lock_ops
);
364 el
->el_object
= cl2echo_obj(obj
);
365 INIT_LIST_HEAD(&el
->el_chain
);
366 atomic_set(&el
->el_refcount
, 0);
368 return !el
? -ENOMEM
: 0;
371 static int echo_conf_set(const struct lu_env
*env
, struct cl_object
*obj
,
372 const struct cl_object_conf
*conf
)
377 static const struct cl_object_operations echo_cl_obj_ops
= {
378 .coo_page_init
= echo_page_init
,
379 .coo_lock_init
= echo_lock_init
,
380 .coo_io_init
= echo_io_init
,
381 .coo_conf_set
= echo_conf_set
384 /** @} echo_cl_ops */
386 /** \defgroup echo_lu_ops lu_object operations
388 * operations for echo lu object.
392 static int echo_object_init(const struct lu_env
*env
, struct lu_object
*obj
,
393 const struct lu_object_conf
*conf
)
395 struct echo_device
*ed
= cl2echo_dev(lu2cl_dev(obj
->lo_dev
));
396 struct echo_client_obd
*ec
= ed
->ed_ec
;
397 struct echo_object
*eco
= cl2echo_obj(lu2cl(obj
));
398 const struct cl_object_conf
*cconf
;
399 struct echo_object_conf
*econf
;
402 struct lu_object
*below
;
403 struct lu_device
*under
;
406 below
= under
->ld_ops
->ldo_object_alloc(env
, obj
->lo_header
,
410 lu_object_add(obj
, below
);
413 cconf
= lu2cl_conf(conf
);
414 econf
= cl2echo_conf(cconf
);
416 LASSERT(econf
->eoc_md
);
417 eco
->eo_lsm
= *econf
->eoc_md
;
418 /* clear the lsm pointer so that it won't get freed. */
419 *econf
->eoc_md
= NULL
;
422 atomic_set(&eco
->eo_npages
, 0);
423 cl_object_page_init(lu2cl(obj
), sizeof(struct echo_page
));
425 spin_lock(&ec
->ec_lock
);
426 list_add_tail(&eco
->eo_obj_chain
, &ec
->ec_objects
);
427 spin_unlock(&ec
->ec_lock
);
432 /* taken from osc_unpackmd() */
433 static int echo_alloc_memmd(struct echo_device
*ed
,
434 struct lov_stripe_md
**lsmp
)
438 /* If export is lov/osc then use their obd method */
440 return obd_alloc_memmd(ed
->ed_ec
->ec_exp
, lsmp
);
441 /* OFD has no unpackmd method, do everything here */
442 lsm_size
= lov_stripe_md_size(1);
445 *lsmp
= kzalloc(lsm_size
, GFP_NOFS
);
449 (*lsmp
)->lsm_oinfo
[0] = kzalloc(sizeof(struct lov_oinfo
), GFP_NOFS
);
450 if (!(*lsmp
)->lsm_oinfo
[0]) {
455 loi_init((*lsmp
)->lsm_oinfo
[0]);
456 (*lsmp
)->lsm_maxbytes
= LUSTRE_STRIPE_MAXBYTES
;
457 ostid_set_seq_echo(&(*lsmp
)->lsm_oi
);
462 static int echo_free_memmd(struct echo_device
*ed
, struct lov_stripe_md
**lsmp
)
466 /* If export is lov/osc then use their obd method */
468 return obd_free_memmd(ed
->ed_ec
->ec_exp
, lsmp
);
469 /* OFD has no unpackmd method, do everything here */
470 lsm_size
= lov_stripe_md_size(1);
472 kfree((*lsmp
)->lsm_oinfo
[0]);
478 static void echo_object_free(const struct lu_env
*env
, struct lu_object
*obj
)
480 struct echo_object
*eco
= cl2echo_obj(lu2cl(obj
));
481 struct echo_client_obd
*ec
= eco
->eo_dev
->ed_ec
;
483 LASSERT(atomic_read(&eco
->eo_npages
) == 0);
485 spin_lock(&ec
->ec_lock
);
486 list_del_init(&eco
->eo_obj_chain
);
487 spin_unlock(&ec
->ec_lock
);
490 lu_object_header_fini(obj
->lo_header
);
493 echo_free_memmd(eco
->eo_dev
, &eco
->eo_lsm
);
494 kmem_cache_free(echo_object_kmem
, eco
);
497 static int echo_object_print(const struct lu_env
*env
, void *cookie
,
498 lu_printer_t p
, const struct lu_object
*o
)
500 struct echo_object
*obj
= cl2echo_obj(lu2cl(o
));
502 return (*p
)(env
, cookie
, "echoclient-object@%p", obj
);
505 static const struct lu_object_operations echo_lu_obj_ops
= {
506 .loo_object_init
= echo_object_init
,
507 .loo_object_delete
= NULL
,
508 .loo_object_release
= NULL
,
509 .loo_object_free
= echo_object_free
,
510 .loo_object_print
= echo_object_print
,
511 .loo_object_invariant
= NULL
514 /** @} echo_lu_ops */
516 /** \defgroup echo_lu_dev_ops lu_device operations
518 * Operations for echo lu device.
522 static struct lu_object
*echo_object_alloc(const struct lu_env
*env
,
523 const struct lu_object_header
*hdr
,
524 struct lu_device
*dev
)
526 struct echo_object
*eco
;
527 struct lu_object
*obj
= NULL
;
529 /* we're the top dev. */
531 eco
= kmem_cache_zalloc(echo_object_kmem
, GFP_NOFS
);
533 struct cl_object_header
*hdr
= &eco
->eo_hdr
;
535 obj
= &echo_obj2cl(eco
)->co_lu
;
536 cl_object_header_init(hdr
);
537 hdr
->coh_page_bufsize
= cfs_size_round(sizeof(struct cl_page
));
539 lu_object_init(obj
, &hdr
->coh_lu
, dev
);
540 lu_object_add_top(&hdr
->coh_lu
, obj
);
542 eco
->eo_cl
.co_ops
= &echo_cl_obj_ops
;
543 obj
->lo_ops
= &echo_lu_obj_ops
;
548 static const struct lu_device_operations echo_device_lu_ops
= {
549 .ldo_object_alloc
= echo_object_alloc
,
552 /** @} echo_lu_dev_ops */
554 static const struct cl_device_operations echo_device_cl_ops
= {
557 /** \defgroup echo_init Setup and teardown
559 * Init and fini functions for echo client.
563 static int echo_site_init(const struct lu_env
*env
, struct echo_device
*ed
)
565 struct cl_site
*site
= &ed
->ed_site_myself
;
568 /* initialize site */
569 rc
= cl_site_init(site
, &ed
->ed_cl
);
571 CERROR("Cannot initialize site for echo client(%d)\n", rc
);
575 rc
= lu_site_init_finish(&site
->cs_lu
);
583 static void echo_site_fini(const struct lu_env
*env
, struct echo_device
*ed
)
586 cl_site_fini(ed
->ed_site
);
591 static void *echo_thread_key_init(const struct lu_context
*ctx
,
592 struct lu_context_key
*key
)
594 struct echo_thread_info
*info
;
596 info
= kmem_cache_zalloc(echo_thread_kmem
, GFP_NOFS
);
598 info
= ERR_PTR(-ENOMEM
);
602 static void echo_thread_key_fini(const struct lu_context
*ctx
,
603 struct lu_context_key
*key
, void *data
)
605 struct echo_thread_info
*info
= data
;
607 kmem_cache_free(echo_thread_kmem
, info
);
/* lct_exit handler for echo_thread_key: nothing to do on context exit. */
static void echo_thread_key_exit(const struct lu_context *ctx,
				 struct lu_context_key *key, void *data)
{
}
615 static struct lu_context_key echo_thread_key
= {
616 .lct_tags
= LCT_CL_THREAD
,
617 .lct_init
= echo_thread_key_init
,
618 .lct_fini
= echo_thread_key_fini
,
619 .lct_exit
= echo_thread_key_exit
622 static void *echo_session_key_init(const struct lu_context
*ctx
,
623 struct lu_context_key
*key
)
625 struct echo_session_info
*session
;
627 session
= kmem_cache_zalloc(echo_session_kmem
, GFP_NOFS
);
629 session
= ERR_PTR(-ENOMEM
);
633 static void echo_session_key_fini(const struct lu_context
*ctx
,
634 struct lu_context_key
*key
, void *data
)
636 struct echo_session_info
*session
= data
;
638 kmem_cache_free(echo_session_kmem
, session
);
/* lct_exit handler for echo_session_key: nothing to do on context exit. */
static void echo_session_key_exit(const struct lu_context *ctx,
				  struct lu_context_key *key, void *data)
{
}
646 static struct lu_context_key echo_session_key
= {
647 .lct_tags
= LCT_SESSION
,
648 .lct_init
= echo_session_key_init
,
649 .lct_fini
= echo_session_key_fini
,
650 .lct_exit
= echo_session_key_exit
653 LU_TYPE_INIT_FINI(echo
, &echo_thread_key
, &echo_session_key
);
655 static struct lu_device
*echo_device_alloc(const struct lu_env
*env
,
656 struct lu_device_type
*t
,
657 struct lustre_cfg
*cfg
)
659 struct lu_device
*next
;
660 struct echo_device
*ed
;
661 struct cl_device
*cd
;
662 struct obd_device
*obd
= NULL
; /* to keep compiler happy */
663 struct obd_device
*tgt
;
664 const char *tgt_type_name
;
667 ed
= kzalloc(sizeof(*ed
), GFP_NOFS
);
674 rc
= cl_device_init(cd
, t
);
678 cd
->cd_lu_dev
.ld_ops
= &echo_device_lu_ops
;
679 cd
->cd_ops
= &echo_device_cl_ops
;
681 obd
= class_name2obd(lustre_cfg_string(cfg
, 0));
685 tgt
= class_name2obd(lustre_cfg_string(cfg
, 1));
687 CERROR("Can not find tgt device %s\n",
688 lustre_cfg_string(cfg
, 1));
690 goto out_device_fini
;
693 next
= tgt
->obd_lu_dev
;
694 if (!strcmp(tgt
->obd_type
->typ_name
, LUSTRE_MDT_NAME
)) {
695 CERROR("echo MDT client must be run on server\n");
697 goto out_device_fini
;
700 rc
= echo_site_init(env
, ed
);
702 goto out_device_fini
;
704 rc
= echo_client_setup(env
, obd
, cfg
);
708 ed
->ed_ec
= &obd
->u
.echo_client
;
710 /* if echo client is to be stacked upon ost device, the next is
711 * NULL since ost is not a clio device so far
713 if (next
&& !lu_device_is_cl(next
))
716 tgt_type_name
= tgt
->obd_type
->typ_name
;
723 next
->ld_site
= &ed
->ed_site
->cs_lu
;
724 rc
= next
->ld_type
->ldt_ops
->ldto_device_init(env
, next
,
725 next
->ld_type
->ldt_name
,
731 LASSERT(strcmp(tgt_type_name
, LUSTRE_OST_NAME
) == 0);
735 return &cd
->cd_lu_dev
;
738 err
= echo_client_cleanup(obd
);
740 CERROR("Cleanup obd device %s error(%d)\n",
743 echo_site_fini(env
, ed
);
745 cl_device_fini(&ed
->ed_cl
);
752 static int echo_device_init(const struct lu_env
*env
, struct lu_device
*d
,
753 const char *name
, struct lu_device
*next
)
759 static struct lu_device
*echo_device_fini(const struct lu_env
*env
,
762 struct echo_device
*ed
= cl2echo_dev(lu2cl_dev(d
));
763 struct lu_device
*next
= ed
->ed_next
;
766 next
= next
->ld_type
->ldt_ops
->ldto_device_fini(env
, next
);
770 static void echo_lock_release(const struct lu_env
*env
,
771 struct echo_lock
*ecl
,
774 struct cl_lock
*clk
= echo_lock2cl(ecl
);
776 cl_lock_release(env
, clk
);
779 static struct lu_device
*echo_device_free(const struct lu_env
*env
,
782 struct echo_device
*ed
= cl2echo_dev(lu2cl_dev(d
));
783 struct echo_client_obd
*ec
= ed
->ed_ec
;
784 struct echo_object
*eco
;
785 struct lu_device
*next
= ed
->ed_next
;
787 CDEBUG(D_INFO
, "echo device:%p is going to be freed, next = %p\n",
790 lu_site_purge(env
, &ed
->ed_site
->cs_lu
, -1);
	/* Check whether any objects are still alive.
	 * There should be none, because lu_site_purge() cleans up all
	 * cached objects.  If some remain, the echo device is probably
	 * still being accessed concurrently.
	 */
797 spin_lock(&ec
->ec_lock
);
798 list_for_each_entry(eco
, &ec
->ec_objects
, eo_obj_chain
)
800 spin_unlock(&ec
->ec_lock
);
803 lu_site_purge(env
, &ed
->ed_site
->cs_lu
, -1);
806 "Waiting for the reference of echo object to be dropped\n");
808 /* Wait for the last reference to be dropped. */
809 spin_lock(&ec
->ec_lock
);
810 while (!list_empty(&ec
->ec_objects
)) {
811 spin_unlock(&ec
->ec_lock
);
812 CERROR("echo_client still has objects at cleanup time, wait for 1 second\n");
813 set_current_state(TASK_UNINTERRUPTIBLE
);
814 schedule_timeout(cfs_time_seconds(1));
815 lu_site_purge(env
, &ed
->ed_site
->cs_lu
, -1);
816 spin_lock(&ec
->ec_lock
);
818 spin_unlock(&ec
->ec_lock
);
820 LASSERT(list_empty(&ec
->ec_locks
));
822 CDEBUG(D_INFO
, "No object exists, exiting...\n");
824 echo_client_cleanup(d
->ld_obd
);
827 next
= next
->ld_type
->ldt_ops
->ldto_device_free(env
, next
);
829 LASSERT(ed
->ed_site
== lu2cl_site(d
->ld_site
));
830 echo_site_fini(env
, ed
);
831 cl_device_fini(&ed
->ed_cl
);
837 static const struct lu_device_type_operations echo_device_type_ops
= {
838 .ldto_init
= echo_type_init
,
839 .ldto_fini
= echo_type_fini
,
841 .ldto_start
= echo_type_start
,
842 .ldto_stop
= echo_type_stop
,
844 .ldto_device_alloc
= echo_device_alloc
,
845 .ldto_device_free
= echo_device_free
,
846 .ldto_device_init
= echo_device_init
,
847 .ldto_device_fini
= echo_device_fini
850 static struct lu_device_type echo_device_type
= {
851 .ldt_tags
= LU_DEVICE_CL
,
852 .ldt_name
= LUSTRE_ECHO_CLIENT_NAME
,
853 .ldt_ops
= &echo_device_type_ops
,
854 .ldt_ctx_tags
= LCT_CL_THREAD
,
859 /** \defgroup echo_exports Exported operations
861 * exporting functions to echo client
866 /* Interfaces to echo client obd device */
867 static struct echo_object
*cl_echo_object_find(struct echo_device
*d
,
868 struct lov_stripe_md
**lsmp
)
871 struct echo_thread_info
*info
;
872 struct echo_object_conf
*conf
;
873 struct lov_stripe_md
*lsm
;
874 struct echo_object
*eco
;
875 struct cl_object
*obj
;
883 LASSERTF(ostid_id(&lsm
->lsm_oi
) != 0, DOSTID
"\n", POSTID(&lsm
->lsm_oi
));
884 LASSERTF(ostid_seq(&lsm
->lsm_oi
) == FID_SEQ_ECHO
, DOSTID
"\n",
885 POSTID(&lsm
->lsm_oi
));
887 /* Never return an object if the obd is to be freed. */
888 if (echo_dev2cl(d
)->cd_lu_dev
.ld_obd
->obd_stopping
)
889 return ERR_PTR(-ENODEV
);
891 env
= cl_env_get(&refcheck
);
895 info
= echo_env_info(env
);
896 conf
= &info
->eti_conf
;
898 struct lov_oinfo
*oinfo
= lsm
->lsm_oinfo
[0];
901 oinfo
->loi_oi
= lsm
->lsm_oi
;
902 conf
->eoc_cl
.u
.coc_oinfo
= oinfo
;
906 fid
= &info
->eti_fid
;
907 rc
= ostid_to_fid(fid
, &lsm
->lsm_oi
, 0);
913 /* In the function below, .hs_keycmp resolves to
914 * lu_obj_hop_keycmp()
916 /* coverity[overrun-buffer-val] */
917 obj
= cl_object_find(env
, echo_dev2cl(d
), fid
, &conf
->eoc_cl
);
923 eco
= cl2echo_obj(obj
);
924 if (eco
->eo_deleted
) {
925 cl_object_put(env
, obj
);
926 eco
= ERR_PTR(-EAGAIN
);
930 cl_env_put(env
, &refcheck
);
934 static int cl_echo_object_put(struct echo_object
*eco
)
937 struct cl_object
*obj
= echo_obj2cl(eco
);
940 env
= cl_env_get(&refcheck
);
944 /* an external function to kill an object? */
945 if (eco
->eo_deleted
) {
946 struct lu_object_header
*loh
= obj
->co_lu
.lo_header
;
948 LASSERT(&eco
->eo_hdr
== luh2coh(loh
));
949 set_bit(LU_OBJECT_HEARD_BANSHEE
, &loh
->loh_flags
);
952 cl_object_put(env
, obj
);
953 cl_env_put(env
, &refcheck
);
957 static int cl_echo_enqueue0(struct lu_env
*env
, struct echo_object
*eco
,
958 u64 start
, u64 end
, int mode
,
959 __u64
*cookie
, __u32 enqflags
)
963 struct cl_object
*obj
;
964 struct cl_lock_descr
*descr
;
965 struct echo_thread_info
*info
;
968 info
= echo_env_info(env
);
970 lck
= &info
->eti_lock
;
971 obj
= echo_obj2cl(eco
);
973 memset(lck
, 0, sizeof(*lck
));
974 descr
= &lck
->cll_descr
;
975 descr
->cld_obj
= obj
;
976 descr
->cld_start
= cl_index(obj
, start
);
977 descr
->cld_end
= cl_index(obj
, end
);
978 descr
->cld_mode
= mode
== LCK_PW
? CLM_WRITE
: CLM_READ
;
979 descr
->cld_enq_flags
= enqflags
;
982 rc
= cl_lock_request(env
, io
, lck
);
984 struct echo_client_obd
*ec
= eco
->eo_dev
->ed_ec
;
985 struct echo_lock
*el
;
987 el
= cl2echo_lock(cl_lock_at(lck
, &echo_device_type
));
988 spin_lock(&ec
->ec_lock
);
989 if (list_empty(&el
->el_chain
)) {
990 list_add(&el
->el_chain
, &ec
->ec_locks
);
991 el
->el_cookie
= ++ec
->ec_unique
;
993 atomic_inc(&el
->el_refcount
);
994 *cookie
= el
->el_cookie
;
995 spin_unlock(&ec
->ec_lock
);
1000 static int cl_echo_cancel0(struct lu_env
*env
, struct echo_device
*ed
,
1003 struct echo_client_obd
*ec
= ed
->ed_ec
;
1004 struct echo_lock
*ecl
= NULL
;
1005 struct list_head
*el
;
1006 int found
= 0, still_used
= 0;
1008 spin_lock(&ec
->ec_lock
);
1009 list_for_each(el
, &ec
->ec_locks
) {
1010 ecl
= list_entry(el
, struct echo_lock
, el_chain
);
1011 CDEBUG(D_INFO
, "ecl: %p, cookie: %#llx\n", ecl
, ecl
->el_cookie
);
1012 found
= (ecl
->el_cookie
== cookie
);
1014 if (atomic_dec_and_test(&ecl
->el_refcount
))
1015 list_del_init(&ecl
->el_chain
);
1021 spin_unlock(&ec
->ec_lock
);
1026 echo_lock_release(env
, ecl
, still_used
);
1030 static void echo_commit_callback(const struct lu_env
*env
, struct cl_io
*io
,
1031 struct cl_page
*page
)
1033 struct echo_thread_info
*info
;
1034 struct cl_2queue
*queue
;
1036 info
= echo_env_info(env
);
1037 LASSERT(io
== &info
->eti_io
);
1039 queue
= &info
->eti_queue
;
1040 cl_page_list_add(&queue
->c2_qout
, page
);
1043 static int cl_echo_object_brw(struct echo_object
*eco
, int rw
, u64 offset
,
1044 struct page
**pages
, int npages
, int async
)
1047 struct echo_thread_info
*info
;
1048 struct cl_object
*obj
= echo_obj2cl(eco
);
1049 struct echo_device
*ed
= eco
->eo_dev
;
1050 struct cl_2queue
*queue
;
1052 struct cl_page
*clp
;
1053 struct lustre_handle lh
= { 0 };
1054 int page_size
= cl_page_size(obj
);
1059 LASSERT((offset
& ~PAGE_MASK
) == 0);
1060 LASSERT(ed
->ed_next
);
1061 env
= cl_env_get(&refcheck
);
1063 return PTR_ERR(env
);
1065 info
= echo_env_info(env
);
1067 queue
= &info
->eti_queue
;
1069 cl_2queue_init(queue
);
1071 io
->ci_ignore_layout
= 1;
1072 rc
= cl_io_init(env
, io
, CIT_MISC
, obj
);
1077 rc
= cl_echo_enqueue0(env
, eco
, offset
,
1078 offset
+ npages
* PAGE_SIZE
- 1,
1079 rw
== READ
? LCK_PR
: LCK_PW
, &lh
.cookie
,
1084 for (i
= 0; i
< npages
; i
++) {
1086 clp
= cl_page_find(env
, obj
, cl_index(obj
, offset
),
1087 pages
[i
], CPT_TRANSIENT
);
1092 LASSERT(clp
->cp_type
== CPT_TRANSIENT
);
1094 rc
= cl_page_own(env
, io
, clp
);
1096 LASSERT(clp
->cp_state
== CPS_FREEING
);
1097 cl_page_put(env
, clp
);
1101 * Add a page to the incoming page list of 2-queue.
1103 cl_page_list_add(&queue
->c2_qin
, clp
);
1105 /* drop the reference count for cl_page_find, so that the page
1106 * will be freed in cl_2queue_fini.
1108 cl_page_put(env
, clp
);
1109 cl_page_clip(env
, clp
, 0, page_size
);
1111 offset
+= page_size
;
1115 enum cl_req_type typ
= rw
== READ
? CRT_READ
: CRT_WRITE
;
1117 async
= async
&& (typ
== CRT_WRITE
);
1119 rc
= cl_io_commit_async(env
, io
, &queue
->c2_qin
,
1121 echo_commit_callback
);
1123 rc
= cl_io_submit_sync(env
, io
, typ
, queue
, 0);
1124 CDEBUG(D_INFO
, "echo_client %s write returns %d\n",
1125 async
? "async" : "sync", rc
);
1128 cl_echo_cancel0(env
, ed
, lh
.cookie
);
1130 cl_2queue_discard(env
, io
, queue
);
1131 cl_2queue_disown(env
, io
, queue
);
1132 cl_2queue_fini(env
, queue
);
1133 cl_io_fini(env
, io
);
1135 cl_env_put(env
, &refcheck
);
1139 /** @} echo_exports */
1141 static u64 last_object_id
;
1143 static int echo_create_object(const struct lu_env
*env
, struct echo_device
*ed
,
1144 struct obdo
*oa
, struct obd_trans_info
*oti
)
1146 struct echo_object
*eco
;
1147 struct echo_client_obd
*ec
= ed
->ed_ec
;
1148 struct lov_stripe_md
*lsm
= NULL
;
1152 if (!(oa
->o_valid
& OBD_MD_FLID
) ||
1153 !(oa
->o_valid
& OBD_MD_FLGROUP
) ||
1154 !fid_seq_is_echo(ostid_seq(&oa
->o_oi
))) {
1155 CERROR("invalid oid " DOSTID
"\n", POSTID(&oa
->o_oi
));
1159 rc
= echo_alloc_memmd(ed
, &lsm
);
1161 CERROR("Cannot allocate md: rc = %d\n", rc
);
1165 /* setup object ID here */
1166 lsm
->lsm_oi
= oa
->o_oi
;
1168 if (ostid_id(&lsm
->lsm_oi
) == 0)
1169 ostid_set_id(&lsm
->lsm_oi
, ++last_object_id
);
1171 rc
= obd_create(env
, ec
->ec_exp
, oa
, &lsm
, oti
);
1173 CERROR("Cannot create objects: rc = %d\n", rc
);
1178 /* See what object ID we were given */
1179 oa
->o_oi
= lsm
->lsm_oi
;
1180 oa
->o_valid
|= OBD_MD_FLID
;
1182 eco
= cl_echo_object_find(ed
, &lsm
);
1187 cl_echo_object_put(eco
);
1189 CDEBUG(D_INFO
, "oa oid "DOSTID
"\n", POSTID(&oa
->o_oi
));
1193 obd_destroy(env
, ec
->ec_exp
, oa
, lsm
, oti
, NULL
);
1195 echo_free_memmd(ed
, &lsm
);
1197 CERROR("create object failed with: rc = %d\n", rc
);
1201 static int echo_get_object(struct echo_object
**ecop
, struct echo_device
*ed
,
1204 struct lov_stripe_md
*lsm
= NULL
;
1205 struct echo_object
*eco
;
1208 if ((oa
->o_valid
& OBD_MD_FLID
) == 0 || ostid_id(&oa
->o_oi
) == 0) {
1209 /* disallow use of object id 0 */
1210 CERROR("No valid oid\n");
1214 rc
= echo_alloc_memmd(ed
, &lsm
);
1218 lsm
->lsm_oi
= oa
->o_oi
;
1219 if (!(oa
->o_valid
& OBD_MD_FLGROUP
))
1220 ostid_set_seq_echo(&lsm
->lsm_oi
);
1223 eco
= cl_echo_object_find(ed
, &lsm
);
1229 echo_free_memmd(ed
, &lsm
);
1233 static void echo_put_object(struct echo_object
*eco
)
1237 rc
= cl_echo_object_put(eco
);
1239 CERROR("%s: echo client drop an object failed: rc = %d\n",
1240 eco
->eo_dev
->ed_ec
->ec_exp
->exp_obd
->obd_name
, rc
);
1244 echo_client_page_debug_setup(struct page
*page
, int rw
, u64 id
,
1245 u64 offset
, u64 count
)
1252 /* no partial pages on the client */
1253 LASSERT(count
== PAGE_SIZE
);
1257 for (delta
= 0; delta
< PAGE_SIZE
; delta
+= OBD_ECHO_BLOCK_SIZE
) {
1258 if (rw
== OBD_BRW_WRITE
) {
1259 stripe_off
= offset
+ delta
;
1262 stripe_off
= 0xdeadbeef00c0ffeeULL
;
1263 stripe_id
= 0xdeadbeef00c0ffeeULL
;
1265 block_debug_setup(addr
+ delta
, OBD_ECHO_BLOCK_SIZE
,
1266 stripe_off
, stripe_id
);
1272 static int echo_client_page_debug_check(struct page
*page
, u64 id
,
1273 u64 offset
, u64 count
)
1282 /* no partial pages on the client */
1283 LASSERT(count
== PAGE_SIZE
);
1287 for (rc
= delta
= 0; delta
< PAGE_SIZE
; delta
+= OBD_ECHO_BLOCK_SIZE
) {
1288 stripe_off
= offset
+ delta
;
1291 rc2
= block_debug_check("test_brw",
1292 addr
+ delta
, OBD_ECHO_BLOCK_SIZE
,
1293 stripe_off
, stripe_id
);
1295 CERROR("Error in echo object %#llx\n", id
);
1304 static int echo_client_kbrw(struct echo_device
*ed
, int rw
, struct obdo
*oa
,
1305 struct echo_object
*eco
, u64 offset
,
1306 u64 count
, int async
,
1307 struct obd_trans_info
*oti
)
1310 struct brw_page
*pga
;
1311 struct brw_page
*pgp
;
1312 struct page
**pages
;
1320 verify
= (ostid_id(&oa
->o_oi
) != ECHO_PERSISTENT_OBJID
&&
1321 (oa
->o_valid
& OBD_MD_FLFLAGS
) != 0 &&
1322 (oa
->o_flags
& OBD_FL_DEBUG_CHECK
) != 0);
1324 gfp_mask
= ((ostid_id(&oa
->o_oi
) & 2) == 0) ? GFP_KERNEL
: GFP_HIGHUSER
;
1326 LASSERT(rw
== OBD_BRW_WRITE
|| rw
== OBD_BRW_READ
);
1329 (count
& (~PAGE_MASK
)) != 0)
1332 /* XXX think again with misaligned I/O */
1333 npages
= count
>> PAGE_SHIFT
;
1335 if (rw
== OBD_BRW_WRITE
)
1336 brw_flags
= OBD_BRW_ASYNC
;
1338 pga
= kcalloc(npages
, sizeof(*pga
), GFP_NOFS
);
1342 pages
= kcalloc(npages
, sizeof(*pages
), GFP_NOFS
);
1348 for (i
= 0, pgp
= pga
, off
= offset
;
1350 i
++, pgp
++, off
+= PAGE_SIZE
) {
1351 LASSERT(!pgp
->pg
); /* for cleanup */
1354 pgp
->pg
= alloc_page(gfp_mask
);
1359 pgp
->count
= PAGE_SIZE
;
1361 pgp
->flag
= brw_flags
;
1364 echo_client_page_debug_setup(pgp
->pg
, rw
,
1365 ostid_id(&oa
->o_oi
), off
,
1369 /* brw mode can only be used at client */
1370 LASSERT(ed
->ed_next
);
1371 rc
= cl_echo_object_brw(eco
, rw
, offset
, pages
, npages
, async
);
1374 if (rc
!= 0 || rw
!= OBD_BRW_READ
)
1377 for (i
= 0, pgp
= pga
; i
< npages
; i
++, pgp
++) {
1384 vrc
= echo_client_page_debug_check(pgp
->pg
,
1385 ostid_id(&oa
->o_oi
),
1386 pgp
->off
, pgp
->count
);
1387 if (vrc
!= 0 && rc
== 0)
1390 __free_page(pgp
->pg
);
1397 static int echo_client_prep_commit(const struct lu_env
*env
,
1398 struct obd_export
*exp
, int rw
,
1399 struct obdo
*oa
, struct echo_object
*eco
,
1400 u64 offset
, u64 count
,
1401 u64 batch
, struct obd_trans_info
*oti
,
1404 struct obd_ioobj ioo
;
1405 struct niobuf_local
*lnb
;
1406 struct niobuf_remote
*rnb
;
1408 u64 npages
, tot_pages
;
1409 int i
, ret
= 0, brw_flags
= 0;
1411 if (count
<= 0 || (count
& (~PAGE_MASK
)) != 0)
1414 npages
= batch
>> PAGE_SHIFT
;
1415 tot_pages
= count
>> PAGE_SHIFT
;
1417 lnb
= kcalloc(npages
, sizeof(struct niobuf_local
), GFP_NOFS
);
1418 rnb
= kcalloc(npages
, sizeof(struct niobuf_remote
), GFP_NOFS
);
1425 if (rw
== OBD_BRW_WRITE
&& async
)
1426 brw_flags
|= OBD_BRW_ASYNC
;
1428 obdo_to_ioobj(oa
, &ioo
);
1432 for (; tot_pages
; tot_pages
-= npages
) {
1435 if (tot_pages
< npages
)
1438 for (i
= 0; i
< npages
; i
++, off
+= PAGE_SIZE
) {
1439 rnb
[i
].offset
= off
;
1440 rnb
[i
].len
= PAGE_SIZE
;
1441 rnb
[i
].flags
= brw_flags
;
1444 ioo
.ioo_bufcnt
= npages
;
1445 oti
->oti_transno
= 0;
1448 ret
= obd_preprw(env
, rw
, exp
, oa
, 1, &ioo
, rnb
, &lpages
,
1452 LASSERT(lpages
== npages
);
1454 for (i
= 0; i
< lpages
; i
++) {
1455 struct page
*page
= lnb
[i
].page
;
1457 /* read past eof? */
1458 if (!page
&& lnb
[i
].rc
== 0)
1462 lnb
[i
].flags
|= OBD_BRW_ASYNC
;
1464 if (ostid_id(&oa
->o_oi
) == ECHO_PERSISTENT_OBJID
||
1465 (oa
->o_valid
& OBD_MD_FLFLAGS
) == 0 ||
1466 (oa
->o_flags
& OBD_FL_DEBUG_CHECK
) == 0)
1469 if (rw
== OBD_BRW_WRITE
)
1470 echo_client_page_debug_setup(page
, rw
,
1471 ostid_id(&oa
->o_oi
),
1475 echo_client_page_debug_check(page
,
1476 ostid_id(&oa
->o_oi
),
1481 ret
= obd_commitrw(env
, rw
, exp
, oa
, 1, &ioo
,
1482 rnb
, npages
, lnb
, oti
, ret
);
1486 /* Reset oti otherwise it would confuse ldiskfs. */
1487 memset(oti
, 0, sizeof(*oti
));
1489 /* Reuse env context. */
1490 lu_context_exit((struct lu_context
*)&env
->le_ctx
);
1491 lu_context_enter((struct lu_context
*)&env
->le_ctx
);
/*
 * Run a bulk read/write test request against an echo object.
 *
 * The object is described by data->ioc_obdo1 and is looked up with
 * echo_get_object(); the matching echo_put_object() is called after the
 * test-mode switch.  The test mode is taken from data->ioc_pbuf1 and
 * selects between echo_client_kbrw() and echo_client_prep_commit().
 * The batch size handed to echo_client_prep_commit() is data->ioc_plen1,
 * capped at PTLRPC_MAX_BRW_SIZE.
 *
 * env:       passed through to echo_client_prep_commit()
 * rw:        transfer direction forwarded to the worker functions
 * exp:       export used to locate the obd/echo device
 * data:      ioctl payload (object, offset, count, batch size, test mode)
 * dummy_oti: transaction info forwarded to echo_client_kbrw()
 *
 * NOTE(review): the case labels of the switch, the error exits and the
 * final return are not visible here -- presumably rc is returned; confirm.
 */
1500 static int echo_client_brw_ioctl(const struct lu_env
*env
, int rw
,
1501 struct obd_export
*exp
,
1502 struct obd_ioctl_data
*data
,
1503 struct obd_trans_info
*dummy_oti
)
1505 struct obd_device
*obd
= class_exp2obd(exp
);
1506 struct echo_device
*ed
= obd2echo_dev(obd
);
1507 struct echo_client_obd
*ec
= ed
->ed_ec
;
1508 struct obdo
*oa
= &data
->ioc_obdo1
;
1509 struct echo_object
*eco
;
/* The caller must already have flagged the object id group as valid. */
1514 LASSERT(oa
->o_valid
& OBD_MD_FLGROUP
);
1516 rc
= echo_get_object(&eco
, ed
, oa
);
/* Clear the handle-valid flag on the obdo before running the test. */
1520 oa
->o_valid
&= ~OBD_MD_FLHANDLE
;
1522 /* OFD/obdfilter works only via prep/commit */
1523 test_mode
= (long)data
->ioc_pbuf1
;
/*
 * With no next device below the echo device and a test mode other than 3,
 * the batch size is set to the full request size.
 * NOTE(review): presumably test_mode is also forced here -- confirm.
 */
1527 if (!ed
->ed_next
&& test_mode
!= 3) {
1529 data
->ioc_plen1
= data
->ioc_count
;
1532 /* Truncate batch size to maximum */
1533 if (data
->ioc_plen1
> PTLRPC_MAX_BRW_SIZE
)
1534 data
->ioc_plen1
= PTLRPC_MAX_BRW_SIZE
;
1536 switch (test_mode
) {
1540 rc
= echo_client_kbrw(ed
, rw
, oa
,
1541 eco
, data
->ioc_offset
,
1542 data
->ioc_count
, async
, dummy_oti
);
1545 rc
= echo_client_prep_commit(env
, ec
->ec_exp
, rw
, oa
,
1546 eco
, data
->ioc_offset
,
1547 data
->ioc_count
, data
->ioc_plen1
,
/* Pairs with the echo_get_object() call above. */
1553 echo_put_object(eco
);
/*
 * ioctl handler of the echo client; installed as .iocontrol in
 * echo_client_obd_ops.
 *
 * karg is used as a struct obd_ioctl_data and the object operated on is
 * data->ioc_obdo1.  If OBD_MD_FLGROUP is not set in o_valid, the flag is
 * set and ostid_set_seq_echo() is applied to the object id.  The id is
 * then converted with ostid_to_fid() purely as a validity check.  A
 * lu_env is allocated with kzalloc() and initialised with LCT_DT_THREAD
 * before the command is dispatched.
 *
 * Commands visible in the dispatch:
 *   OBD_IOC_CREATE     echo_create_object()
 *   OBD_IOC_DESTROY    obd_destroy(), then the echo object is flagged
 *                      eo_deleted
 *   OBD_IOC_GETATTR    obd_getattr()
 *   OBD_IOC_SETATTR    obd_setattr()
 *   OBD_IOC_BRW_WRITE  capability check only in the visible lines
 *   OBD_IOC_BRW_READ   echo_client_brw_ioctl()
 * CREATE, DESTROY, SETATTR and BRW_WRITE test capable(CFS_CAP_SYS_ADMIN)
 * first.
 *
 * After dispatch, up to 4 entries of dummy_oti.oti_ack_locks are walked
 * and ldlm_lock_decref() is called on entries whose mode is non-zero.
 *
 * NOTE(review): the return type line, the switch head, the default label
 * in front of the "unrecognised ioctl" message, the env teardown and the
 * return statement are not visible here -- confirm before relying on
 * error codes or on how rw is changed for OBD_IOC_BRW_WRITE.
 */
1558 echo_client_iocontrol(unsigned int cmd
, struct obd_export
*exp
, int len
,
1559 void *karg
, void __user
*uarg
)
1561 struct obd_device
*obd
= exp
->exp_obd
;
1562 struct echo_device
*ed
= obd2echo_dev(obd
);
1563 struct echo_client_obd
*ec
= ed
->ed_ec
;
1564 struct echo_object
*eco
;
1565 struct obd_ioctl_data
*data
= karg
;
1566 struct obd_trans_info dummy_oti
;
1568 struct oti_req_ack_lock
*ack_lock
;
/* Default direction is read. */
1571 int rw
= OBD_BRW_READ
;
1575 memset(&dummy_oti
, 0, sizeof(dummy_oti
));
1577 oa
= &data
->ioc_obdo1
;
/* Callers that did not supply a group get the echo sequence. */
1578 if (!(oa
->o_valid
& OBD_MD_FLGROUP
)) {
1579 oa
->o_valid
|= OBD_MD_FLGROUP
;
1580 ostid_set_seq_echo(&oa
->o_oi
);
1583 /* This FID is unpacked just for validation at this point */
1584 rc
= ostid_to_fid(&fid
, &oa
->o_oi
, 0);
1588 env
= kzalloc(sizeof(*env
), GFP_NOFS
);
1592 rc
= lu_env_init(env
, LCT_DT_THREAD
);
1599 case OBD_IOC_CREATE
: /* may create echo object */
1600 if (!capable(CFS_CAP_SYS_ADMIN
)) {
1605 rc
= echo_create_object(env
, ed
, oa
, &dummy_oti
);
1608 case OBD_IOC_DESTROY
:
1609 if (!capable(CFS_CAP_SYS_ADMIN
)) {
1614 rc
= echo_get_object(&eco
, ed
, oa
);
1616 rc
= obd_destroy(env
, ec
->ec_exp
, oa
, NULL
,
/* NOTE(review): presumably guarded by the obd_destroy() result -- confirm */
1619 eco
->eo_deleted
= 1;
1620 echo_put_object(eco
);
1624 case OBD_IOC_GETATTR
:
1625 rc
= echo_get_object(&eco
, ed
, oa
);
1627 struct obd_info oinfo
= {
1631 rc
= obd_getattr(env
, ec
->ec_exp
, &oinfo
);
1632 echo_put_object(eco
);
1636 case OBD_IOC_SETATTR
:
1637 if (!capable(CFS_CAP_SYS_ADMIN
)) {
1642 rc
= echo_get_object(&eco
, ed
, oa
);
1644 struct obd_info oinfo
= {
1648 rc
= obd_setattr(env
, ec
->ec_exp
, &oinfo
, NULL
);
1649 echo_put_object(eco
);
1653 case OBD_IOC_BRW_WRITE
:
1654 if (!capable(CFS_CAP_SYS_ADMIN
)) {
1661 case OBD_IOC_BRW_READ
:
1662 rc
= echo_client_brw_ioctl(env
, rw
, exp
, data
, &dummy_oti
);
1666 CERROR("echo_ioctl(): unrecognised ioctl %#x\n", cmd
);
1675 /* XXX this should be in a helper also called by target_send_reply */
1676 for (ack_lock
= dummy_oti
.oti_ack_locks
, i
= 0; i
< 4;
1678 if (!ack_lock
->mode
)
1680 ldlm_lock_decref(&ack_lock
->lock
, ack_lock
->mode
);
/*
 * Set up an echo client device and connect it to a named target OBD.
 *
 * Configuration buffer 1 of lcfg must hold the target device name.  The
 * target is looked up with class_name2obd() and has to be both attached
 * and set up.  The echo client state embedded in obddev
 * (obddev->u.echo_client) gets its spinlock and its object and lock lists
 * initialised.  A struct obd_connect_data is then allocated, filled with
 * the connect flags, brw size, version and FID_SEQ_ECHO group, and passed
 * to obd_connect() together with the "ECHO_UUID" uuid and &ec->ec_exp as
 * the export argument.
 *
 * env:    passed through to obd_connect()
 * obddev: the echo client obd being set up
 * lcfg:   configuration record carrying the target name in buffer 1
 *
 * Each failure that is visible here is logged with CERROR().
 * NOTE(review): the error codes returned, and whether ocd is freed after
 * the connect, are not visible here -- confirm.
 */
1686 static int echo_client_setup(const struct lu_env
*env
,
1687 struct obd_device
*obddev
, struct lustre_cfg
*lcfg
)
1689 struct echo_client_obd
*ec
= &obddev
->u
.echo_client
;
1690 struct obd_device
*tgt
;
1691 struct obd_uuid echo_uuid
= { "ECHO_UUID" };
1692 struct obd_connect_data
*ocd
= NULL
;
/* Need at least two config buffers and a non-empty buffer 1. */
1695 if (lcfg
->lcfg_bufcount
< 2 || LUSTRE_CFG_BUFLEN(lcfg
, 1) < 1) {
1696 CERROR("requires a TARGET OBD name\n");
1700 tgt
= class_name2obd(lustre_cfg_string(lcfg
, 1));
1701 if (!tgt
|| !tgt
->obd_attached
|| !tgt
->obd_set_up
) {
1702 CERROR("device not attached or not set up (%s)\n",
1703 lustre_cfg_string(lcfg
, 1));
1707 spin_lock_init(&ec
->ec_lock
);
1708 INIT_LIST_HEAD(&ec
->ec_objects
);
1709 INIT_LIST_HEAD(&ec
->ec_locks
);
1712 ocd
= kzalloc(sizeof(*ocd
), GFP_NOFS
);
1716 ocd
->ocd_connect_flags
= OBD_CONNECT_VERSION
| OBD_CONNECT_REQPORTAL
|
1717 OBD_CONNECT_BRW_SIZE
|
1718 OBD_CONNECT_GRANT
| OBD_CONNECT_FULL20
|
1719 OBD_CONNECT_64BITHASH
| OBD_CONNECT_LVB_TYPE
|
1721 ocd
->ocd_brw_size
= DT_MAX_BRW_SIZE
;
1722 ocd
->ocd_version
= LUSTRE_VERSION_CODE
;
1723 ocd
->ocd_group
= FID_SEQ_ECHO
;
1725 rc
= obd_connect(env
, &ec
->ec_exp
, tgt
, &echo_uuid
, ocd
, NULL
);
1730 CERROR("fail to connect to device %s\n",
1731 lustre_cfg_string(lcfg
, 1));
/*
 * Tear down an echo client device by disconnecting from its target.
 *
 * If obddev still has entries on its obd_exports list, "still has
 * clients!" is logged.  Otherwise the export stored in ec->ec_exp is
 * asserted to hold a positive reference count and is handed to
 * obd_disconnect(); a failure of that call is logged with its rc.
 *
 * NOTE(review): the return statements are not visible here -- presumably
 * the busy case returns an error and the normal path returns rc; confirm.
 */
1738 static int echo_client_cleanup(struct obd_device
*obddev
)
1740 struct echo_client_obd
*ec
= &obddev
->u
.echo_client
;
1743 if (!list_empty(&obddev
->obd_exports
)) {
1744 CERROR("still has clients!\n");
1748 LASSERT(atomic_read(&ec
->ec_exp
->exp_refcount
) > 0);
1749 rc
= obd_disconnect(ec
->ec_exp
);
1751 CERROR("fail to disconnect device: %d\n", rc
);
/*
 * Connect handler of the echo client; installed as .connect in
 * echo_client_obd_ops.
 *
 * Creates a connection to src for cluuid with class_connect() using a
 * zero-initialised handle, then stores the export obtained from that
 * handle via class_conn2export() in *exp.
 *
 * env, data and localdata are not referenced in the visible lines.
 * NOTE(review): presumably *exp is only written when class_connect()
 * succeeded and rc is returned -- confirm.
 */
1756 static int echo_client_connect(const struct lu_env
*env
,
1757 struct obd_export
**exp
,
1758 struct obd_device
*src
, struct obd_uuid
*cluuid
,
1759 struct obd_connect_data
*data
, void *localdata
)
1762 struct lustre_handle conn
= { 0 };
1764 rc
= class_connect(&conn
, src
, cluuid
);
1766 *exp
= class_conn2export(&conn
);
/*
 * Disconnect handler of the echo client; installed as .disconnect in
 * echo_client_obd_ops.  Hands the export to class_disconnect().
 *
 * NOTE(review): any argument checking ahead of the call and the return
 * statement are not visible here -- presumably rc is returned; confirm.
 */
1772 static int echo_client_disconnect(struct obd_export
*exp
)
1781 rc
= class_disconnect(exp
);
/*
 * OBD method table of the echo client.  It is the table passed to
 * class_register_type() in echo_client_init() and provides the ioctl,
 * connect and disconnect handlers defined in this file.
 */
1787 static struct obd_ops echo_client_obd_ops
= {
1788 .owner
= THIS_MODULE
,
1789 .iocontrol
= echo_client_iocontrol
,
1790 .connect
= echo_client_connect
,
1791 .disconnect
= echo_client_disconnect
/*
 * Initialise the echo client: set up the slab caches listed in
 * echo_caches with lu_kmem_init(), then register the echo client OBD
 * type under LUSTRE_ECHO_CLIENT_NAME with echo_client_obd_ops.
 *
 * NOTE(review): the conditions around the calls are not visible here --
 * presumably registration only happens when lu_kmem_init() succeeded, and
 * lu_kmem_fini() undoes the cache setup when registration fails; confirm.
 */
1794 static int echo_client_init(void)
1798 rc
= lu_kmem_init(echo_caches
);
1800 rc
= class_register_type(&echo_client_obd_ops
, NULL
,
1801 LUSTRE_ECHO_CLIENT_NAME
,
1804 lu_kmem_fini(echo_caches
);
/*
 * Undo echo_client_init(): unregister the LUSTRE_ECHO_CLIENT_NAME OBD
 * type first, then release the echo_caches slab caches.
 */
1809 static void echo_client_exit(void)
1811 class_unregister_type(LUSTRE_ECHO_CLIENT_NAME
);
1812 lu_kmem_fini(echo_caches
);
/*
 * Module entry point (registered with module_init()).  Prints the driver
 * banner, asserts that PAGE_SIZE is a whole multiple of
 * OBD_ECHO_BLOCK_SIZE, and returns the result of echo_client_init().
 */
1815 static int __init
obdecho_init(void)
1817 LCONSOLE_INFO("Echo OBD driver; http://www.lustre.org/\n");
1819 LASSERT(PAGE_SIZE
% OBD_ECHO_BLOCK_SIZE
== 0);
1821 return echo_client_init();
/*
 * Module exit point (registered with module_exit()).
 * NOTE(review): the body is not visible here -- presumably it calls
 * echo_client_exit(); confirm.
 */
1824 static void /*__exit*/ obdecho_exit(void)
/*
 * Module metadata, followed by registration of obdecho_init() and
 * obdecho_exit() as the module load and unload hooks.
 */
1829 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
1830 MODULE_DESCRIPTION("Lustre Echo Client test driver");
1831 MODULE_VERSION(LUSTRE_VERSION_STRING
);
1832 MODULE_LICENSE("GPL");
1834 module_init(obdecho_init
);
1835 module_exit(obdecho_exit
);
1837 /** @} echo_client */