/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2015, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * cl code shared between vvp and liblustre (and other Lustre clients in the
 * future).
 *
 *   Author: Nikita Danilov <nikita.danilov@sun.com>
 */
#define DEBUG_SUBSYSTEM S_LLITE

#include "../../include/linux/libcfs/libcfs.h"
# include <linux/fs.h>
# include <linux/sched.h>
# include <linux/mm.h>
# include <linux/quotaops.h>
# include <linux/highmem.h>
# include <linux/pagemap.h>
# include <linux/rbtree.h>

#include "../include/obd.h"
#include "../include/obd_support.h"
#include "../include/lustre_fid.h"
#include "../include/lustre_lite.h"
#include "../include/lustre_dlm.h"
#include "../include/lustre_ver.h"
#include "../include/lustre_mdc.h"
#include "../include/cl_object.h"

#include "../include/lclient.h"

#include "../llite/llite_internal.h"
static const struct cl_req_operations ccc_req_ops;

/*
 * ccc_ prefix stands for "Common Client Code".
 */
static struct kmem_cache *ccc_lock_kmem;
static struct kmem_cache *ccc_object_kmem;
static struct kmem_cache *ccc_thread_kmem;
static struct kmem_cache *ccc_session_kmem;
static struct kmem_cache *ccc_req_kmem;
static struct lu_kmem_descr ccc_caches[] = {
	{
		.ckd_cache = &ccc_lock_kmem,
		.ckd_name  = "ccc_lock_kmem",
		.ckd_size  = sizeof(struct ccc_lock)
	},
	{
		.ckd_cache = &ccc_object_kmem,
		.ckd_name  = "ccc_object_kmem",
		.ckd_size  = sizeof(struct ccc_object)
	},
	{
		.ckd_cache = &ccc_thread_kmem,
		.ckd_name  = "ccc_thread_kmem",
		.ckd_size  = sizeof(struct ccc_thread_info),
	},
	{
		.ckd_cache = &ccc_session_kmem,
		.ckd_name  = "ccc_session_kmem",
		.ckd_size  = sizeof(struct ccc_session)
	},
	{
		.ckd_cache = &ccc_req_kmem,
		.ckd_name  = "ccc_req_kmem",
		.ckd_size  = sizeof(struct ccc_req)
	},
	{
		.ckd_cache = NULL
	}
};
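/*
 * Note: a lu_kmem_descr array is terminated by a NULL ->ckd_cache entry,
 * as above; ccc_global_init() below registers the whole array with
 * lu_kmem_init() and ccc_global_fini() releases it with lu_kmem_fini().
 */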
/*****************************************************************************
 *
 * Vvp device and device type functions.
 *
 */
void *ccc_key_init(const struct lu_context *ctx, struct lu_context_key *key)
{
	struct ccc_thread_info *info;

	info = kmem_cache_zalloc(ccc_thread_kmem, GFP_NOFS);
	if (!info)
		info = ERR_PTR(-ENOMEM);
	return info;
}
void ccc_key_fini(const struct lu_context *ctx,
		  struct lu_context_key *key, void *data)
{
	struct ccc_thread_info *info = data;

	kmem_cache_free(ccc_thread_kmem, info);
}
void *ccc_session_key_init(const struct lu_context *ctx,
			   struct lu_context_key *key)
{
	struct ccc_session *session;

	session = kmem_cache_zalloc(ccc_session_kmem, GFP_NOFS);
	if (!session)
		session = ERR_PTR(-ENOMEM);
	return session;
}
void ccc_session_key_fini(const struct lu_context *ctx,
			  struct lu_context_key *key, void *data)
{
	struct ccc_session *session = data;

	kmem_cache_free(ccc_session_kmem, session);
}
struct lu_context_key ccc_key = {
	.lct_tags = LCT_CL_THREAD,
	.lct_init = ccc_key_init,
	.lct_fini = ccc_key_fini
};

struct lu_context_key ccc_session_key = {
	.lct_tags = LCT_SESSION,
	.lct_init = ccc_session_key_init,
	.lct_fini = ccc_session_key_fini
};
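/*
 * Roughly: ccc_key hangs a ccc_thread_info scratch area off each lu_env
 * (fetched via ccc_env_info()), while ccc_session_key provides the
 * ccc_session whose embedded ccc_io is what ccc_env_io() hands back
 * throughout this file.
 */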
/* type constructor/destructor: ccc_type_{init,fini,start,stop}(). */
/* LU_TYPE_INIT_FINI(ccc, &ccc_key, &ccc_session_key); */
int ccc_device_init(const struct lu_env *env, struct lu_device *d,
		    const char *name, struct lu_device *next)
{
	struct ccc_device *vdv;
	int rc;

	vdv = lu2ccc_dev(d);
	vdv->cdv_next = lu2cl_dev(next);

	LASSERT(d->ld_site && next->ld_type);
	next->ld_site = d->ld_site;
	rc = next->ld_type->ldt_ops->ldto_device_init(env, next,
						      next->ld_type->ldt_name,
						      NULL);
	if (rc == 0) {
		lu_device_get(next);
		lu_ref_add(&next->ld_reference, "lu-stack", &lu_site_init);
	}
	return rc;
}
struct lu_device *ccc_device_fini(const struct lu_env *env,
				  struct lu_device *d)
{
	return cl2lu_dev(lu2ccc_dev(d)->cdv_next);
}
struct lu_device *ccc_device_alloc(const struct lu_env *env,
				   struct lu_device_type *t,
				   struct lustre_cfg *cfg,
				   const struct lu_device_operations *luops,
				   const struct cl_device_operations *clops)
{
	struct ccc_device *vdv;
	struct lu_device  *lud;
	struct cl_site    *site;
	int rc;

	vdv = kzalloc(sizeof(*vdv), GFP_NOFS);
	if (!vdv)
		return ERR_PTR(-ENOMEM);

	lud = &vdv->cdv_cl.cd_lu_dev;
	cl_device_init(&vdv->cdv_cl, t);
	ccc2lu_dev(vdv)->ld_ops = luops;
	vdv->cdv_cl.cd_ops = clops;

	site = kzalloc(sizeof(*site), GFP_NOFS);
	if (site) {
		rc = cl_site_init(site, &vdv->cdv_cl);
		if (rc == 0) {
			rc = lu_site_init_finish(&site->cs_lu);
		} else {
			LASSERT(!lud->ld_site);
			CERROR("Cannot init lu_site, rc %d.\n", rc);
			kfree(site);
		}
	} else {
		rc = -ENOMEM;
	}
	if (rc != 0) {
		ccc_device_free(env, lud);
		lud = ERR_PTR(rc);
	}
	return lud;
}
struct lu_device *ccc_device_free(const struct lu_env *env,
				  struct lu_device *d)
{
	struct ccc_device *vdv  = lu2ccc_dev(d);
	struct cl_site    *site = lu2cl_site(d->ld_site);
	struct lu_device  *next = cl2lu_dev(vdv->cdv_next);

	if (d->ld_site) {
		cl_site_fini(site);
		kfree(site);
	}
	cl_device_fini(lu2cl_dev(d));
	kfree(vdv);
	return next;
}
int ccc_req_init(const struct lu_env *env, struct cl_device *dev,
		 struct cl_req *req)
{
	struct ccc_req *vrq;
	int result;

	vrq = kmem_cache_zalloc(ccc_req_kmem, GFP_NOFS);
	if (vrq) {
		cl_req_slice_add(req, &vrq->crq_cl, dev, &ccc_req_ops);
		result = 0;
	} else {
		result = -ENOMEM;
	}
	return result;
}
/**
 * An `emergency' environment used by ccc_inode_fini() when cl_env_get()
 * fails. Access to this environment is serialized by ccc_inode_fini_guard
 * mutex.
 */
static struct lu_env *ccc_inode_fini_env;

/**
 * A mutex serializing calls to slp_inode_fini() under extreme memory
 * pressure, when environments cannot be allocated.
 */
static DEFINE_MUTEX(ccc_inode_fini_guard);
static int dummy_refcheck;
int ccc_global_init(struct lu_device_type *device_type)
{
	int result;

	result = lu_kmem_init(ccc_caches);
	if (result)
		return result;

	result = lu_device_type_init(device_type);
	if (result)
		goto out_kmem;

	ccc_inode_fini_env = cl_env_alloc(&dummy_refcheck,
					  LCT_REMEMBER | LCT_NOREF);
	if (IS_ERR(ccc_inode_fini_env)) {
		result = PTR_ERR(ccc_inode_fini_env);
		goto out_device;
	}

	ccc_inode_fini_env->le_ctx.lc_cookie = 0x4;
	return 0;
out_device:
	lu_device_type_fini(device_type);
out_kmem:
	lu_kmem_fini(ccc_caches);
	return result;
}
void ccc_global_fini(struct lu_device_type *device_type)
{
	if (ccc_inode_fini_env) {
		cl_env_put(ccc_inode_fini_env, &dummy_refcheck);
		ccc_inode_fini_env = NULL;
	}
	lu_device_type_fini(device_type);
	lu_kmem_fini(ccc_caches);
}
/*****************************************************************************
 *
 * Object operations.
 *
 */

struct lu_object *ccc_object_alloc(const struct lu_env *env,
				   const struct lu_object_header *unused,
				   struct lu_device *dev,
				   const struct cl_object_operations *clops,
				   const struct lu_object_operations *luops)
{
	struct ccc_object *vob;
	struct lu_object  *obj;

	vob = kmem_cache_zalloc(ccc_object_kmem, GFP_NOFS);
	if (vob) {
		struct cl_object_header *hdr;

		obj = ccc2lu(vob);
		hdr = &vob->cob_header;
		cl_object_header_init(hdr);
		lu_object_init(obj, &hdr->coh_lu, dev);
		lu_object_add_top(&hdr->coh_lu, obj);

		vob->cob_cl.co_ops = clops;
		obj->lo_ops = luops;
	} else {
		obj = NULL;
	}
	return obj;
}
int ccc_object_init0(const struct lu_env *env,
		     struct ccc_object *vob,
		     const struct cl_object_conf *conf)
{
	vob->cob_inode = conf->coc_inode;
	vob->cob_transient_pages = 0;
	cl_object_page_init(&vob->cob_cl, sizeof(struct ccc_page));
	return 0;
}
int ccc_object_init(const struct lu_env *env, struct lu_object *obj,
		    const struct lu_object_conf *conf)
{
	struct ccc_device *dev = lu2ccc_dev(obj->lo_dev);
	struct ccc_object *vob = lu2ccc(obj);
	struct lu_object  *below;
	struct lu_device  *under;
	int result;

	under = &dev->cdv_next->cd_lu_dev;
	below = under->ld_ops->ldo_object_alloc(env, obj->lo_header, under);
	if (below) {
		const struct cl_object_conf *cconf;

		cconf = lu2cl_conf(conf);
		INIT_LIST_HEAD(&vob->cob_pending_list);
		lu_object_add(obj, below);
		result = ccc_object_init0(env, vob, cconf);
	} else {
		result = -ENOMEM;
	}
	return result;
}
void ccc_object_free(const struct lu_env *env, struct lu_object *obj)
{
	struct ccc_object *vob = lu2ccc(obj);

	lu_object_fini(obj);
	lu_object_header_fini(obj->lo_header);
	kmem_cache_free(ccc_object_kmem, vob);
}
int ccc_lock_init(const struct lu_env *env,
		  struct cl_object *obj, struct cl_lock *lock,
		  const struct cl_io *unused,
		  const struct cl_lock_operations *lkops)
{
	struct ccc_lock *clk;
	int result;

	CLOBINVRNT(env, obj, ccc_object_invariant(obj));

	clk = kmem_cache_zalloc(ccc_lock_kmem, GFP_NOFS);
	if (clk) {
		cl_lock_slice_add(lock, &clk->clk_cl, obj, lkops);
		result = 0;
	} else {
		result = -ENOMEM;
	}
	return result;
}
int ccc_object_glimpse(const struct lu_env *env,
		       const struct cl_object *obj, struct ost_lvb *lvb)
{
	struct inode *inode = ccc_object_inode(obj);

	lvb->lvb_mtime = cl_inode_mtime(inode);
	lvb->lvb_atime = cl_inode_atime(inode);
	lvb->lvb_ctime = cl_inode_ctime(inode);
	/*
	 * LU-417: Add dirty pages block count lest i_blocks reports 0, some
	 * "cp" or "tar" on remote node may think it's a completely sparse file
	 * and skip it.
	 */
	if (lvb->lvb_size > 0 && lvb->lvb_blocks == 0)
		lvb->lvb_blocks = dirty_cnt(inode);
	return 0;
}
static void ccc_object_size_lock(struct cl_object *obj)
{
	struct inode *inode = ccc_object_inode(obj);

	ll_inode_size_lock(inode);
	cl_object_attr_lock(obj);
}

static void ccc_object_size_unlock(struct cl_object *obj)
{
	struct inode *inode = ccc_object_inode(obj);

	cl_object_attr_unlock(obj);
	ll_inode_size_unlock(inode);
}
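/*
 * Note the ordering established by the two helpers above:
 * ll_inode_size_lock() is always taken before cl_object_attr_lock() and
 * released after it, keeping i_size and the cl_object attributes in sync.
 */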
/*****************************************************************************
 *
 * Page operations.
 *
 */

struct page *ccc_page_vmpage(const struct lu_env *env,
			     const struct cl_page_slice *slice)
{
	return cl2vm_page(slice);
}
int ccc_page_is_under_lock(const struct lu_env *env,
			   const struct cl_page_slice *slice,
			   struct cl_io *io)
{
	struct ccc_io        *cio  = ccc_env_io(env);
	struct cl_lock_descr *desc = &ccc_env_info(env)->cti_descr;
	struct cl_page       *page = slice->cpl_page;
	int result;

	if (io->ci_type == CIT_READ || io->ci_type == CIT_WRITE ||
	    io->ci_type == CIT_FAULT) {
		if (cio->cui_fd->fd_flags & LL_FILE_GROUP_LOCKED) {
			result = -EBUSY;
		} else {
			desc->cld_start = page->cp_index;
			desc->cld_end   = page->cp_index;
			desc->cld_obj   = page->cp_obj;
			desc->cld_mode  = CLM_READ;
			result = cl_queue_match(&io->ci_lockset.cls_done,
						desc) ? -EBUSY : 0;
		}
	} else {
		result = 0;
	}
	return result;
}
int ccc_fail(const struct lu_env *env, const struct cl_page_slice *slice)
{
	/*
	 * Cached read?
	 */
	LBUG();
	return 0;
}
int ccc_transient_page_prep(const struct lu_env *env,
			    const struct cl_page_slice *slice,
			    struct cl_io *unused)
{
	/* transient page should always be sent. */
	return 0;
}
/*****************************************************************************
 *
 * Lock operations.
 *
 */

void ccc_lock_delete(const struct lu_env *env,
		     const struct cl_lock_slice *slice)
{
	CLOBINVRNT(env, slice->cls_obj, ccc_object_invariant(slice->cls_obj));
}

void ccc_lock_fini(const struct lu_env *env, struct cl_lock_slice *slice)
{
	struct ccc_lock *clk = cl2ccc_lock(slice);

	kmem_cache_free(ccc_lock_kmem, clk);
}

int ccc_lock_enqueue(const struct lu_env *env,
		     const struct cl_lock_slice *slice,
		     struct cl_io *unused, __u32 enqflags)
{
	CLOBINVRNT(env, slice->cls_obj, ccc_object_invariant(slice->cls_obj));
	return 0;
}

int ccc_lock_use(const struct lu_env *env, const struct cl_lock_slice *slice)
{
	CLOBINVRNT(env, slice->cls_obj, ccc_object_invariant(slice->cls_obj));
	return 0;
}

int ccc_lock_unuse(const struct lu_env *env, const struct cl_lock_slice *slice)
{
	CLOBINVRNT(env, slice->cls_obj, ccc_object_invariant(slice->cls_obj));
	return 0;
}

int ccc_lock_wait(const struct lu_env *env, const struct cl_lock_slice *slice)
{
	CLOBINVRNT(env, slice->cls_obj, ccc_object_invariant(slice->cls_obj));
	return 0;
}
/**
 * Implementation of cl_lock_operations::clo_fits_into() methods for ccc
 * layer. This function is executed every time io finds an existing lock in
 * the lock cache while creating a new lock. This function has to decide
 * whether the cached lock "fits" into io.
 *
 * \param slice lock to be checked
 * \param io    IO that wants a lock.
 *
 * \see lov_lock_fits_into().
 */
int ccc_lock_fits_into(const struct lu_env *env,
		       const struct cl_lock_slice *slice,
		       const struct cl_lock_descr *need,
		       const struct cl_io *io)
{
	const struct cl_lock       *lock  = slice->cls_lock;
	const struct cl_lock_descr *descr = &lock->cll_descr;
	const struct ccc_io        *cio   = ccc_env_io(env);
	int result;

	/*
	 * Work around DLM peculiarity: it assumes that glimpse
	 * (LDLM_FL_HAS_INTENT) lock is always LCK_PR, and returns a read
	 * lock when asked for an LCK_PW lock with LDLM_FL_HAS_INTENT flag
	 * set. Make sure that glimpse doesn't get CLM_WRITE top-lock, so
	 * that it doesn't enqueue CLM_WRITE sub-locks.
	 */
	if (cio->cui_glimpse)
		result = descr->cld_mode != CLM_WRITE;
	/*
	 * Also, don't match incomplete write locks for read, otherwise read
	 * would enqueue missing sub-locks in the write mode.
	 */
	else if (need->cld_mode != descr->cld_mode)
		result = lock->cll_state >= CLS_ENQUEUED;
	else
		result = 1;
	return result;
}
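/*
 * In short: a glimpse fits any cached lock that is not CLM_WRITE; a mode
 * mismatch fits only once the cached lock has reached CLS_ENQUEUED (so its
 * sub-locks already exist); everything else fits unconditionally.
 */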
/**
 * Implements cl_lock_operations::clo_state() method for ccc layer, invoked
 * whenever lock state changes. Transfers object attributes, that might be
 * updated as a result of lock acquiring, into inode.
 */
void ccc_lock_state(const struct lu_env *env,
		    const struct cl_lock_slice *slice,
		    enum cl_lock_state state)
{
	struct cl_lock *lock = slice->cls_lock;

	/*
	 * Refresh inode attributes when the lock is moving into CLS_HELD
	 * state, and only when this is a result of real enqueue, rather than
	 * of finding lock in the cache.
	 */
	if (state == CLS_HELD && lock->cll_state < CLS_HELD) {
		struct cl_object *obj;
		struct inode     *inode;

		obj = slice->cls_obj;
		inode = ccc_object_inode(obj);

		/* vmtruncate() sets the i_size under both a DLM lock and
		 * the ll_inode_size_lock(). If we don't get the
		 * ll_inode_size_lock() here we can match the DLM lock and
		 * reset i_size. generic_file_write can then trust the
		 * stale i_size when doing appending writes and effectively
		 * cancel the result of the truncate. Getting the
		 * ll_inode_size_lock() after the enqueue maintains the DLM
		 * -> ll_inode_size_lock() acquiring order.
		 */
		if (lock->cll_descr.cld_start == 0 &&
		    lock->cll_descr.cld_end == CL_PAGE_EOF)
			cl_merge_lvb(env, inode);
	}
}
/*****************************************************************************
 *
 * io operations.
 *
 */

int ccc_io_one_lock_index(const struct lu_env *env, struct cl_io *io,
			  __u32 enqflags, enum cl_lock_mode mode,
			  pgoff_t start, pgoff_t end)
{
	struct ccc_io        *cio   = ccc_env_io(env);
	struct cl_lock_descr *descr = &cio->cui_link.cill_descr;
	struct cl_object     *obj   = io->ci_obj;

	CLOBINVRNT(env, obj, ccc_object_invariant(obj));

	CDEBUG(D_VFSTRACE, "lock: %d [%lu, %lu]\n", mode, start, end);

	memset(&cio->cui_link, 0, sizeof(cio->cui_link));

	if (cio->cui_fd && (cio->cui_fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
		descr->cld_mode = CLM_GROUP;
		descr->cld_gid  = cio->cui_fd->fd_grouplock.cg_gid;
	} else {
		descr->cld_mode = mode;
	}
	descr->cld_obj   = obj;
	descr->cld_start = start;
	descr->cld_end   = end;
	descr->cld_enq_flags = enqflags;

	cl_io_lock_add(env, io, &cio->cui_link);
	return 0;
}
void ccc_io_update_iov(const struct lu_env *env,
		       struct ccc_io *cio, struct cl_io *io)
{
	size_t size = io->u.ci_rw.crw_count;

	if (!cl_is_normalio(env, io) || !cio->cui_iter)
		return;

	iov_iter_truncate(cio->cui_iter, size);
}
int ccc_io_one_lock(const struct lu_env *env, struct cl_io *io,
		    __u32 enqflags, enum cl_lock_mode mode,
		    loff_t start, loff_t end)
{
	struct cl_object *obj = io->ci_obj;

	return ccc_io_one_lock_index(env, io, enqflags, mode,
				     cl_index(obj, start), cl_index(obj, end));
}
void ccc_io_end(const struct lu_env *env, const struct cl_io_slice *ios)
{
	CLOBINVRNT(env, ios->cis_io->ci_obj,
		   ccc_object_invariant(ios->cis_io->ci_obj));
}
void ccc_io_advance(const struct lu_env *env,
		    const struct cl_io_slice *ios,
		    size_t nob)
{
	struct ccc_io    *cio = cl2ccc_io(env, ios);
	struct cl_io     *io  = ios->cis_io;
	struct cl_object *obj = ios->cis_io->ci_obj;

	CLOBINVRNT(env, obj, ccc_object_invariant(obj));

	if (!cl_is_normalio(env, io))
		return;

	iov_iter_reexpand(cio->cui_iter, cio->cui_tot_count -= nob);
}
/**
 * Helper function that, if necessary, adjusts the file size (inode->i_size)
 * when the position at offset \a pos is accessed. The file size can be
 * arbitrarily stale on a Lustre client, but the client at least knows KMS.
 * If the accessed area is inside [0, KMS], set the file size to KMS,
 * otherwise glimpse the file size.
 *
 * Locking: cl_isize_lock is used to serialize changes to inode size and to
 * protect consistency between inode size and cl_object
 * attributes. cl_object_size_lock() protects consistency between cl_attr's of
 * top-object and sub-objects.
 */
int ccc_prep_size(const struct lu_env *env, struct cl_object *obj,
		  struct cl_io *io, loff_t start, size_t count, int *exceed)
{
	struct cl_attr *attr  = ccc_env_thread_attr(env);
	struct inode   *inode = ccc_object_inode(obj);
	loff_t          pos   = start + count - 1;
	loff_t kms;
	int result;

	/*
	 * Consistency guarantees: following possibilities exist for the
	 * relation between region being accessed and real file size at this
	 * moment:
	 *
	 * (A): the region is completely inside of the file;
	 *
	 * (B-x): x bytes of region are inside of the file, the rest is
	 * outside;
	 *
	 * (C): the region is completely outside of the file.
	 *
	 * This classification is stable under DLM lock already acquired by
	 * the caller, because to change the class, other client has to take
	 * DLM lock conflicting with our lock. Also, any updates to ->i_size
	 * by other threads on this client are serialized by
	 * ll_inode_size_lock(). This guarantees that short reads are handled
	 * correctly in the face of concurrent writes and truncates.
	 */
	ccc_object_size_lock(obj);
	result = cl_object_attr_get(env, obj, attr);
	if (result == 0) {
		kms = attr->cat_kms;
		if (pos > kms) {
			/*
			 * A glimpse is necessary to determine whether we
			 * return a short read (B) or some zeroes at the end
			 * of the buffer (C).
			 */
			ccc_object_size_unlock(obj);
			result = cl_glimpse_lock(env, io, inode, obj, 0);
			if (result == 0 && exceed) {
				/* If objective page index exceed end-of-file
				 * page index, return directly. Do not expect
				 * kernel will check such case correctly.
				 * linux-2.6.18-128.1.1 miss to do that.
				 */
				loff_t size = cl_isize_read(inode);
				loff_t cur_index = start >> PAGE_CACHE_SHIFT;
				loff_t size_index = (size - 1) >>
						    PAGE_CACHE_SHIFT;

				if ((size == 0 && cur_index != 0) ||
				    size_index < cur_index)
					*exceed = 1;
			}
			return result;
		}
		/*
		 * region is within kms and, hence, within real file
		 * size (A). We need to increase i_size to cover the
		 * read region so that generic_file_read() will do its
		 * job, but that doesn't mean the kms size is
		 * _correct_, it is only the _minimum_ size. If
		 * someone does a stat they will get the correct size
		 * which will always be >= the kms value here.
		 */
		if (cl_isize_read(inode) < kms) {
			cl_isize_write_nolock(inode, kms);
			CDEBUG(D_VFSTRACE,
			       DFID " updating i_size %llu\n",
			       PFID(lu_object_fid(&obj->co_lu)),
			       (__u64)cl_isize_read(inode));
		}
	}
	ccc_object_size_unlock(obj);
	return result;
}
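/*
 * Worked example: with kms == 6000, preparing a read of [4096, 8191] gives
 * pos == 8191 > kms, so the real size must be glimpsed from the server;
 * a read of [0, 4095] lies within kms (case A) and at most bumps i_size
 * up to kms.
 */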
/*****************************************************************************
 *
 * Transfer operations.
 *
 */

void ccc_req_completion(const struct lu_env *env,
			const struct cl_req_slice *slice, int ioret)
{
	struct ccc_req *vrq;

	if (ioret > 0)
		cl_stats_tally(slice->crs_dev, slice->crs_req->crq_type, ioret);

	vrq = cl2ccc_req(slice);
	kmem_cache_free(ccc_req_kmem, vrq);
}
/**
 * Implementation of struct cl_req_operations::cro_attr_set() for ccc
 * layer. ccc is responsible for the attributes that can be derived from the
 * VFS inode: type, timestamps, ownership, the parent FID and, for writes
 * under an I/O epoch, o_ioepoch.
 */
void ccc_req_attr_set(const struct lu_env *env,
		      const struct cl_req_slice *slice,
		      const struct cl_object *obj,
		      struct cl_req_attr *attr, u64 flags)
{
	struct inode *inode;
	struct obdo  *oa;
	u32           valid_flags;

	oa = attr->cra_oa;
	inode = ccc_object_inode(obj);
	valid_flags = OBD_MD_FLTYPE;

	if (slice->crs_req->crq_type == CRT_WRITE) {
		if (flags & OBD_MD_FLEPOCH) {
			oa->o_valid |= OBD_MD_FLEPOCH;
			oa->o_ioepoch = cl_i2info(inode)->lli_ioepoch;
			valid_flags |= OBD_MD_FLMTIME | OBD_MD_FLCTIME |
				       OBD_MD_FLUID | OBD_MD_FLGID;
		}
	}
	obdo_from_inode(oa, inode, valid_flags & flags);
	obdo_set_parent_fid(oa, &cl_i2info(inode)->lli_fid);
	memcpy(attr->cra_jobid, cl_i2info(inode)->lli_jobid,
	       JOBSTATS_JOBID_SIZE);
}
static const struct cl_req_operations ccc_req_ops = {
	.cro_attr_set   = ccc_req_attr_set,
	.cro_completion = ccc_req_completion
};
int cl_setattr_ost(struct inode *inode, const struct iattr *attr)
{
	struct lu_env *env;
	struct cl_io  *io;
	int            result;
	int            refcheck;

	env = cl_env_get(&refcheck);
	if (IS_ERR(env))
		return PTR_ERR(env);

	io = ccc_env_thread_io(env);
	io->ci_obj = cl_i2info(inode)->lli_clob;

	io->u.ci_setattr.sa_attr.lvb_atime = LTIME_S(attr->ia_atime);
	io->u.ci_setattr.sa_attr.lvb_mtime = LTIME_S(attr->ia_mtime);
	io->u.ci_setattr.sa_attr.lvb_ctime = LTIME_S(attr->ia_ctime);
	io->u.ci_setattr.sa_attr.lvb_size = attr->ia_size;
	io->u.ci_setattr.sa_valid = attr->ia_valid;

again:
	if (cl_io_init(env, io, CIT_SETATTR, io->ci_obj) == 0) {
		struct ccc_io *cio = ccc_env_io(env);

		if (attr->ia_valid & ATTR_FILE)
			/* populate the file descriptor for ftruncate to honor
			 * group lock - see LU-787
			 */
			cio->cui_fd = cl_iattr2fd(inode, attr);

		result = cl_io_loop(env, io);
	} else {
		result = io->ci_result;
	}
	cl_io_fini(env, io);
	if (unlikely(io->ci_need_restart))
		goto again;
	/* HSM import case: file is released, cannot be restored
	 * no need to fail except if restore registration failed
	 * with -ENODATA
	 */
	if (result == -ENODATA && io->ci_restore_needed &&
	    io->ci_result != -ENODATA)
		result = 0;
	cl_env_put(env, &refcheck);
	return result;
}
/*****************************************************************************
 *
 * Type conversions.
 *
 */

struct lu_device *ccc2lu_dev(struct ccc_device *vdv)
{
	return &vdv->cdv_cl.cd_lu_dev;
}

struct ccc_device *lu2ccc_dev(const struct lu_device *d)
{
	return container_of0(d, struct ccc_device, cdv_cl.cd_lu_dev);
}

struct ccc_device *cl2ccc_dev(const struct cl_device *d)
{
	return container_of0(d, struct ccc_device, cdv_cl);
}

struct lu_object *ccc2lu(struct ccc_object *vob)
{
	return &vob->cob_cl.co_lu;
}

struct ccc_object *lu2ccc(const struct lu_object *obj)
{
	return container_of0(obj, struct ccc_object, cob_cl.co_lu);
}

struct ccc_object *cl2ccc(const struct cl_object *obj)
{
	return container_of0(obj, struct ccc_object, cob_cl);
}

struct ccc_lock *cl2ccc_lock(const struct cl_lock_slice *slice)
{
	return container_of(slice, struct ccc_lock, clk_cl);
}

struct ccc_io *cl2ccc_io(const struct lu_env *env,
			 const struct cl_io_slice *slice)
{
	struct ccc_io *cio;

	cio = container_of(slice, struct ccc_io, cui_cl);
	LASSERT(cio == ccc_env_io(env));
	return cio;
}

struct ccc_req *cl2ccc_req(const struct cl_req_slice *slice)
{
	return container_of0(slice, struct ccc_req, crq_cl);
}

struct page *cl2vm_page(const struct cl_page_slice *slice)
{
	return cl2ccc_page(slice)->cpg_page;
}
/*****************************************************************************
 *
 * Misc.
 *
 */
int ccc_object_invariant(const struct cl_object *obj)
{
	struct inode         *inode = ccc_object_inode(obj);
	struct cl_inode_info *lli   = cl_i2info(inode);

	return (S_ISREG(cl_inode_mode(inode)) ||
		/* i_mode of unlinked inode is zeroed. */
		cl_inode_mode(inode) == 0) && lli->lli_clob == obj;
}

struct inode *ccc_object_inode(const struct cl_object *obj)
{
	return cl2ccc(obj)->cob_inode;
}
/**
 * Initialize or update CLIO structures for regular files when new
 * meta-data arrives from the server.
 *
 * \param inode regular file inode
 * \param md    new file metadata from MDS
 * - allocates cl_object if necessary,
 * - updates layout, if object was already here.
 */
int cl_file_inode_init(struct inode *inode, struct lustre_md *md)
{
	struct lu_env        *env;
	struct cl_inode_info *lli;
	struct cl_object     *clob;
	struct lu_site       *site;
	struct lu_fid        *fid;
	struct cl_object_conf conf = {
		.coc_inode = inode,
	};
	int result = 0;
	int refcheck;

	LASSERT(md->body->valid & OBD_MD_FLID);
	LASSERT(S_ISREG(cl_inode_mode(inode)));

	env = cl_env_get(&refcheck);
	if (IS_ERR(env))
		return PTR_ERR(env);

	site = cl_i2sbi(inode)->ll_site;
	lli  = cl_i2info(inode);
	fid  = &lli->lli_fid;
	LASSERT(fid_is_sane(fid));

	if (!lli->lli_clob) {
		/* clob is slave of inode, empty lli_clob means for new inode,
		 * there is no clob in cache with the given fid, so it is
		 * unnecessary to perform lookup-alloc-lookup-insert, just
		 * alloc and insert directly.
		 */
		LASSERT(inode->i_state & I_NEW);
		conf.coc_lu.loc_flags = LOC_F_NEW;
		clob = cl_object_find(env, lu2cl_dev(site->ls_top_dev),
				      fid, &conf);
		if (!IS_ERR(clob)) {
			/*
			 * No locking is necessary, as new inode is
			 * locked by I_NEW bit.
			 */
			lli->lli_clob = clob;
			lli->lli_has_smd = lsm_has_objects(md->lsm);
			lu_object_ref_add(&clob->co_lu, "inode", inode);
		} else {
			result = PTR_ERR(clob);
		}
	} else {
		result = cl_conf_set(env, lli->lli_clob, &conf);
	}

	cl_env_put(env, &refcheck);

	if (result != 0)
		CERROR("Failure to initialize cl object "DFID": %d\n",
		       PFID(fid), result);
	return result;
}
/**
 * Wait for others to drop their references to the object first, then drop
 * the last one ourselves, which leads to the object being destroyed
 * immediately. Must be called after cl_object_kill() against this object.
 *
 * The reason we want to do this is: destroying the top object will wait for
 * sub-objects being destroyed first, so we can't let the bottom layer (e.g.
 * from ASTs) initiate top object destruction, which may deadlock. See
 * bz22520.
 */
static void cl_object_put_last(struct lu_env *env, struct cl_object *obj)
{
	struct lu_object_header *header = obj->co_lu.lo_header;
	wait_queue_t waiter;

	if (unlikely(atomic_read(&header->loh_ref) != 1)) {
		struct lu_site *site = obj->co_lu.lo_dev->ld_site;
		struct lu_site_bkt_data *bkt;

		bkt = lu_site_bkt_from_fid(site, &header->loh_fid);

		init_waitqueue_entry(&waiter, current);
		add_wait_queue(&bkt->lsb_marche_funebre, &waiter);

		while (1) {
			set_current_state(TASK_UNINTERRUPTIBLE);
			if (atomic_read(&header->loh_ref) == 1)
				break;
			schedule();
		}

		set_current_state(TASK_RUNNING);
		remove_wait_queue(&bkt->lsb_marche_funebre, &waiter);
	}

	cl_object_put(env, obj);
}
void cl_inode_fini(struct inode *inode)
{
	struct lu_env        *env;
	struct cl_inode_info *lli  = cl_i2info(inode);
	struct cl_object     *clob = lli->lli_clob;
	int refcheck;
	int emergency;

	if (clob) {
		void *cookie;

		cookie = cl_env_reenter();
		env = cl_env_get(&refcheck);
		emergency = IS_ERR(env);
		if (emergency) {
			mutex_lock(&ccc_inode_fini_guard);
			LASSERT(ccc_inode_fini_env);
			cl_env_implant(ccc_inode_fini_env, &refcheck);
			env = ccc_inode_fini_env;
		}
		/*
		 * cl_object cache is a slave to inode cache (which, in turn
		 * is a slave to dentry cache), don't keep cl_object in memory
		 * when its master is evicted.
		 */
		cl_object_kill(env, clob);
		lu_object_ref_del(&clob->co_lu, "inode", inode);
		cl_object_put_last(env, clob);
		lli->lli_clob = NULL;
		if (emergency) {
			cl_env_unplant(ccc_inode_fini_env, &refcheck);
			mutex_unlock(&ccc_inode_fini_guard);
		} else {
			cl_env_put(env, &refcheck);
		}
		cl_env_reexit(cookie);
	}
}
/**
 * return IF_* type for given lu_dirent entry.
 * IF_* flag should be converted to particular OS file type in
 * platform llite module.
 */
__u16 ll_dirent_type_get(struct lu_dirent *ent)
{
	__u16 type = 0;
	struct luda_type *lt;
	int len = 0;

	if (le32_to_cpu(ent->lde_attrs) & LUDA_TYPE) {
		const unsigned align = sizeof(struct luda_type) - 1;

		len = le16_to_cpu(ent->lde_namelen);
		len = (len + align) & ~align;
		lt = (void *)ent->lde_name + len;
		type = IFTODT(le16_to_cpu(lt->lt_type));
	}
	return type;
}
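/*
 * Note: sizeof(struct luda_type) is 2, so align is 1 and the name length
 * is rounded up to the next even value (e.g. a 5-byte name is padded to
 * 6), keeping the __u16 lt_type that follows the name naturally aligned.
 */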
/**
 * build inode number from passed @fid
 */
__u64 cl_fid_build_ino(const struct lu_fid *fid, int api32)
{
	if (BITS_PER_LONG == 32 || api32)
		return fid_flatten32(fid);

	return fid_flatten(fid);
}
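/*
 * fid_flatten() maps the full FID into a 64-bit inode number, while
 * fid_flatten32() squeezes it into 32 bits for 32-bit hosts or api32
 * clients; the latter is lossy, which is why cl_fid_build_gen() below
 * returns a non-zero generation to tell such collisions apart.
 */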
/**
 * build inode generation from passed @fid. If our FID overflows the 32-bit
 * inode number then return a non-zero generation to distinguish them.
 */
__u32 cl_fid_build_gen(const struct lu_fid *fid)
{
	__u32 gen;

	if (fid_is_igif(fid)) {
		gen = lu_igif_gen(fid);
		return gen;
	}

	gen = fid_flatten(fid) >> 32;
	return gen;
}
/* lsm is unreliable after hsm implementation as layout can be changed at
 * any time. This is only to support old, non-clio-ized interfaces. It will
 * cause deadlock if clio operations are called with this extra layout refcount
 * because in case the layout changed during the IO, ll_layout_refresh() will
 * have to wait for the refcount to become zero to destroy the older layout.
 *
 * Notice that the lsm returned by this function may not be valid unless called
 * inside layout lock - MDS_INODELOCK_LAYOUT.
 */
struct lov_stripe_md *ccc_inode_lsm_get(struct inode *inode)
{
	return lov_lsm_get(cl_i2info(inode)->lli_clob);
}
inline void ccc_inode_lsm_put(struct inode *inode, struct lov_stripe_md *lsm)
{
	lov_lsm_put(cl_i2info(inode)->lli_clob, lsm);
}