4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2013, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/fid/fid_request.c
38 * Lustre Sequence Manager
40 * Author: Yury Umanets <umka@clusterfs.com>
43 #define DEBUG_SUBSYSTEM S_FID
45 #include "../../include/linux/libcfs/libcfs.h"
46 #include <linux/module.h>
48 #include "../include/obd.h"
49 #include "../include/obd_class.h"
50 #include "../include/obd_support.h"
51 #include "../include/lustre_fid.h"
53 #include "../include/lustre_mdc.h"
54 #include "fid_internal.h"
56 static struct dentry
*seq_debugfs_dir
;
58 static int seq_client_rpc(struct lu_client_seq
*seq
,
59 struct lu_seq_range
*output
, __u32 opc
,
62 struct obd_export
*exp
= seq
->lcs_exp
;
63 struct ptlrpc_request
*req
;
64 struct lu_seq_range
*out
, *in
;
66 unsigned int debug_mask
;
69 req
= ptlrpc_request_alloc_pack(class_exp2cliimp(exp
), &RQF_SEQ_QUERY
,
70 LUSTRE_MDS_VERSION
, SEQ_QUERY
);
74 /* Init operation code */
75 op
= req_capsule_client_get(&req
->rq_pill
, &RMF_SEQ_OPC
);
78 /* Zero out input range, this is not recovery yet. */
79 in
= req_capsule_client_get(&req
->rq_pill
, &RMF_SEQ_RANGE
);
82 ptlrpc_request_set_replen(req
);
84 in
->lsr_index
= seq
->lcs_space
.lsr_index
;
85 if (seq
->lcs_type
== LUSTRE_SEQ_METADATA
)
86 fld_range_set_mdt(in
);
88 fld_range_set_ost(in
);
90 if (opc
== SEQ_ALLOC_SUPER
) {
91 req
->rq_request_portal
= SEQ_CONTROLLER_PORTAL
;
92 req
->rq_reply_portal
= MDC_REPLY_PORTAL
;
93 /* While allocating a super sequence for a data object,
94 * the current thread might hold the export of MDT0 (MDT0
95 * precreating objects on this OST), and it will send the
96 * request to MDT0 here, so we cannot keep resending the
97 * request; otherwise, if MDT0 has failed (been umounted),
98 * it cannot release the export of MDT0 */
99 if (seq
->lcs_type
== LUSTRE_SEQ_DATA
)
100 req
->rq_no_delay
= req
->rq_no_resend
= 1;
101 debug_mask
= D_CONSOLE
;
103 if (seq
->lcs_type
== LUSTRE_SEQ_METADATA
)
104 req
->rq_request_portal
= SEQ_METADATA_PORTAL
;
106 req
->rq_request_portal
= SEQ_DATA_PORTAL
;
110 ptlrpc_at_set_req_timeout(req
);
112 if (seq
->lcs_type
== LUSTRE_SEQ_METADATA
)
113 mdc_get_rpc_lock(exp
->exp_obd
->u
.cli
.cl_rpc_lock
, NULL
);
114 rc
= ptlrpc_queue_wait(req
);
115 if (seq
->lcs_type
== LUSTRE_SEQ_METADATA
)
116 mdc_put_rpc_lock(exp
->exp_obd
->u
.cli
.cl_rpc_lock
, NULL
);
120 out
= req_capsule_server_get(&req
->rq_pill
, &RMF_SEQ_RANGE
);
123 if (!range_is_sane(output
)) {
124 CERROR("%s: Invalid range received from server: "
125 DRANGE
"\n", seq
->lcs_name
, PRANGE(output
));
130 if (range_is_exhausted(output
)) {
131 CERROR("%s: Range received from server is exhausted: "
132 DRANGE
"]\n", seq
->lcs_name
, PRANGE(output
));
137 CDEBUG_LIMIT(debug_mask
, "%s: Allocated %s-sequence "DRANGE
"]\n",
138 seq
->lcs_name
, opcname
, PRANGE(output
));
141 ptlrpc_req_finished(req
);
145 /* Request sequence-controller node to allocate new super-sequence. */
146 int seq_client_alloc_super(struct lu_client_seq
*seq
,
147 const struct lu_env
*env
)
151 mutex_lock(&seq
->lcs_mutex
);
156 /* Check whether the connection to the seq controller has been
157 * set up (lcs_exp != NULL) */
158 if (seq
->lcs_exp
== NULL
) {
159 mutex_unlock(&seq
->lcs_mutex
);
163 rc
= seq_client_rpc(seq
, &seq
->lcs_space
,
164 SEQ_ALLOC_SUPER
, "super");
166 mutex_unlock(&seq
->lcs_mutex
);
170 /* Request sequence-controller node to allocate new meta-sequence. */
171 static int seq_client_alloc_meta(const struct lu_env
*env
,
172 struct lu_client_seq
*seq
)
180 /* If the meta server returns -EINPROGRESS or -EAGAIN,
181 * it means the meta server might not be ready to
182 * allocate a super sequence from the sequence controller
184 rc
= seq_client_rpc(seq
, &seq
->lcs_space
,
185 SEQ_ALLOC_META
, "meta");
186 } while (rc
== -EINPROGRESS
|| rc
== -EAGAIN
);
192 /* Allocate new sequence for client. */
193 static int seq_client_alloc_seq(const struct lu_env
*env
,
194 struct lu_client_seq
*seq
, u64
*seqnr
)
198 LASSERT(range_is_sane(&seq
->lcs_space
));
200 if (range_is_exhausted(&seq
->lcs_space
)) {
201 rc
= seq_client_alloc_meta(env
, seq
);
203 CERROR("%s: Can't allocate new meta-sequence, rc %d\n",
207 CDEBUG(D_INFO
, "%s: New range - "DRANGE
"\n",
208 seq
->lcs_name
, PRANGE(&seq
->lcs_space
));
213 LASSERT(!range_is_exhausted(&seq
->lcs_space
));
214 *seqnr
= seq
->lcs_space
.lsr_start
;
215 seq
->lcs_space
.lsr_start
+= 1;
217 CDEBUG(D_INFO
, "%s: Allocated sequence [%#llx]\n", seq
->lcs_name
,
223 static int seq_fid_alloc_prep(struct lu_client_seq
*seq
,
226 if (seq
->lcs_update
) {
227 add_wait_queue(&seq
->lcs_waitq
, link
);
228 set_current_state(TASK_UNINTERRUPTIBLE
);
229 mutex_unlock(&seq
->lcs_mutex
);
233 mutex_lock(&seq
->lcs_mutex
);
234 remove_wait_queue(&seq
->lcs_waitq
, link
);
235 set_current_state(TASK_RUNNING
);
239 mutex_unlock(&seq
->lcs_mutex
);
243 static void seq_fid_alloc_fini(struct lu_client_seq
*seq
)
245 LASSERT(seq
->lcs_update
== 1);
246 mutex_lock(&seq
->lcs_mutex
);
248 wake_up(&seq
->lcs_waitq
);
252 * Allocate the whole seq to the caller.
254 int seq_client_get_seq(const struct lu_env
*env
,
255 struct lu_client_seq
*seq
, u64
*seqnr
)
260 LASSERT(seqnr
!= NULL
);
261 mutex_lock(&seq
->lcs_mutex
);
262 init_waitqueue_entry(&link
, current
);
265 rc
= seq_fid_alloc_prep(seq
, &link
);
270 rc
= seq_client_alloc_seq(env
, seq
, seqnr
);
272 CERROR("%s: Can't allocate new sequence, rc %d\n",
274 seq_fid_alloc_fini(seq
);
275 mutex_unlock(&seq
->lcs_mutex
);
279 CDEBUG(D_INFO
, "%s: allocate sequence [0x%16.16Lx]\n",
280 seq
->lcs_name
, *seqnr
);
282 /* Since the caller requires the whole seq,
283 * mark this seq as used */
284 if (seq
->lcs_type
== LUSTRE_SEQ_METADATA
)
285 seq
->lcs_fid
.f_oid
= LUSTRE_METADATA_SEQ_MAX_WIDTH
;
287 seq
->lcs_fid
.f_oid
= LUSTRE_DATA_SEQ_MAX_WIDTH
;
289 seq
->lcs_fid
.f_seq
= *seqnr
;
290 seq
->lcs_fid
.f_ver
= 0;
292 * Inform the caller that a sequence switch has been performed to allow it
293 * to set up FLD for it.
295 seq_fid_alloc_fini(seq
);
296 mutex_unlock(&seq
->lcs_mutex
);
300 EXPORT_SYMBOL(seq_client_get_seq
);
302 /* Allocate new fid on passed client @seq and save it to @fid. */
303 int seq_client_alloc_fid(const struct lu_env
*env
,
304 struct lu_client_seq
*seq
, struct lu_fid
*fid
)
309 LASSERT(seq
!= NULL
);
310 LASSERT(fid
!= NULL
);
312 init_waitqueue_entry(&link
, current
);
313 mutex_lock(&seq
->lcs_mutex
);
315 if (OBD_FAIL_CHECK(OBD_FAIL_SEQ_EXHAUST
))
316 seq
->lcs_fid
.f_oid
= seq
->lcs_width
;
321 if (!fid_is_zero(&seq
->lcs_fid
) &&
322 fid_oid(&seq
->lcs_fid
) < seq
->lcs_width
) {
323 /* Just bump last allocated fid and return to caller. */
324 seq
->lcs_fid
.f_oid
+= 1;
329 rc
= seq_fid_alloc_prep(seq
, &link
);
333 rc
= seq_client_alloc_seq(env
, seq
, &seqnr
);
335 CERROR("%s: Can't allocate new sequence, rc %d\n",
337 seq_fid_alloc_fini(seq
);
338 mutex_unlock(&seq
->lcs_mutex
);
342 CDEBUG(D_INFO
, "%s: Switch to sequence [0x%16.16Lx]\n",
343 seq
->lcs_name
, seqnr
);
345 seq
->lcs_fid
.f_oid
= LUSTRE_FID_INIT_OID
;
346 seq
->lcs_fid
.f_seq
= seqnr
;
347 seq
->lcs_fid
.f_ver
= 0;
350 * Inform the caller that a sequence switch has been performed to allow it
351 * to set up FLD for it.
355 seq_fid_alloc_fini(seq
);
360 mutex_unlock(&seq
->lcs_mutex
);
362 CDEBUG(D_INFO
, "%s: Allocated FID "DFID
"\n", seq
->lcs_name
, PFID(fid
));
365 EXPORT_SYMBOL(seq_client_alloc_fid
);
368 * Finish the current sequence due to disconnect.
369 * See mdc_import_event()
371 void seq_client_flush(struct lu_client_seq
*seq
)
375 LASSERT(seq
!= NULL
);
376 init_waitqueue_entry(&link
, current
);
377 mutex_lock(&seq
->lcs_mutex
);
379 while (seq
->lcs_update
) {
380 add_wait_queue(&seq
->lcs_waitq
, &link
);
381 set_current_state(TASK_UNINTERRUPTIBLE
);
382 mutex_unlock(&seq
->lcs_mutex
);
386 mutex_lock(&seq
->lcs_mutex
);
387 remove_wait_queue(&seq
->lcs_waitq
, &link
);
388 set_current_state(TASK_RUNNING
);
391 fid_zero(&seq
->lcs_fid
);
393 * This id should not be used for seq range allocation.
394 * Set to -1 for debug check.
397 seq
->lcs_space
.lsr_index
= -1;
399 range_init(&seq
->lcs_space
);
400 mutex_unlock(&seq
->lcs_mutex
);
402 EXPORT_SYMBOL(seq_client_flush
);
404 static void seq_client_debugfs_fini(struct lu_client_seq
*seq
)
406 if (!IS_ERR_OR_NULL(seq
->lcs_debugfs_entry
))
407 ldebugfs_remove(&seq
->lcs_debugfs_entry
);
410 static int seq_client_debugfs_init(struct lu_client_seq
*seq
)
414 seq
->lcs_debugfs_entry
= ldebugfs_register(seq
->lcs_name
,
418 if (IS_ERR_OR_NULL(seq
->lcs_debugfs_entry
)) {
419 CERROR("%s: LdebugFS failed in seq-init\n", seq
->lcs_name
);
420 rc
= seq
->lcs_debugfs_entry
? PTR_ERR(seq
->lcs_debugfs_entry
)
422 seq
->lcs_debugfs_entry
= NULL
;
426 rc
= ldebugfs_add_vars(seq
->lcs_debugfs_entry
,
427 seq_client_debugfs_list
, seq
);
429 CERROR("%s: Can't init sequence manager debugfs, rc %d\n",
437 seq_client_debugfs_fini(seq
);
441 int seq_client_init(struct lu_client_seq
*seq
,
442 struct obd_export
*exp
,
443 enum lu_cli_type type
,
445 struct lu_server_seq
*srv
)
449 LASSERT(seq
!= NULL
);
450 LASSERT(prefix
!= NULL
);
453 seq
->lcs_type
= type
;
455 mutex_init(&seq
->lcs_mutex
);
456 if (type
== LUSTRE_SEQ_METADATA
)
457 seq
->lcs_width
= LUSTRE_METADATA_SEQ_MAX_WIDTH
;
459 seq
->lcs_width
= LUSTRE_DATA_SEQ_MAX_WIDTH
;
461 init_waitqueue_head(&seq
->lcs_waitq
);
462 /* Make sure that things are clear before work is started. */
463 seq_client_flush(seq
);
466 seq
->lcs_exp
= class_export_get(exp
);
467 else if (type
== LUSTRE_SEQ_METADATA
)
468 LASSERT(seq
->lcs_srv
!= NULL
);
470 snprintf(seq
->lcs_name
, sizeof(seq
->lcs_name
),
473 rc
= seq_client_debugfs_init(seq
);
475 seq_client_fini(seq
);
478 EXPORT_SYMBOL(seq_client_init
);
480 void seq_client_fini(struct lu_client_seq
*seq
)
482 seq_client_debugfs_fini(seq
);
484 if (seq
->lcs_exp
!= NULL
) {
485 class_export_put(seq
->lcs_exp
);
491 EXPORT_SYMBOL(seq_client_fini
);
493 int client_fid_init(struct obd_device
*obd
,
494 struct obd_export
*exp
, enum lu_cli_type type
)
496 struct client_obd
*cli
= &obd
->u
.cli
;
500 cli
->cl_seq
= kzalloc(sizeof(*cli
->cl_seq
), GFP_NOFS
);
504 prefix
= kzalloc(MAX_OBD_NAME
+ 5, GFP_NOFS
);
510 snprintf(prefix
, MAX_OBD_NAME
+ 5, "cli-%s", obd
->obd_name
);
512 /* Init client side sequence-manager */
513 rc
= seq_client_init(cli
->cl_seq
, exp
, type
, prefix
, NULL
);
524 EXPORT_SYMBOL(client_fid_init
);
526 int client_fid_fini(struct obd_device
*obd
)
528 struct client_obd
*cli
= &obd
->u
.cli
;
530 if (cli
->cl_seq
!= NULL
) {
531 seq_client_fini(cli
->cl_seq
);
538 EXPORT_SYMBOL(client_fid_fini
);
540 static int __init
fid_mod_init(void)
542 seq_debugfs_dir
= ldebugfs_register(LUSTRE_SEQ_NAME
,
545 return PTR_ERR_OR_ZERO(seq_debugfs_dir
);
548 static void __exit
fid_mod_exit(void)
550 if (!IS_ERR_OR_NULL(seq_debugfs_dir
))
551 ldebugfs_remove(&seq_debugfs_dir
);
554 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
555 MODULE_DESCRIPTION("Lustre FID Module");
556 MODULE_LICENSE("GPL");
557 MODULE_VERSION("0.1.0");
559 module_init(fid_mod_init
);
560 module_exit(fid_mod_exit
);