/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2012, Intel Corporation.
 *
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/selftest/framework.c
 *
 * Author: Isaac Huang <isaac@clusterfs.com>
 * Author: Liang Zhen <liangzhen@clusterfs.com>
 */
/* Debug subsystem tag for this file (NOTE(review): presumably consumed by the
 * CDEBUG/CERROR/CWARN logging macros - confirm against the libcfs headers). */
#define DEBUG_SUBSYSTEM S_LNET
46 lst_sid_t LST_INVALID_SID
= {LNET_NID_ANY
, -1};
48 static int session_timeout
= 100;
49 module_param(session_timeout
, int, 0444);
50 MODULE_PARM_DESC(session_timeout
, "test session timeout in seconds (100 by default, 0 == never)");
52 static int rpc_timeout
= 64;
53 module_param(rpc_timeout
, int, 0644);
54 MODULE_PARM_DESC(rpc_timeout
, "rpc timeout in seconds (64 by default, 0 == never)");
/*
 * Byte-swap the nid and pid members of @id in place.
 * @id is an lvalue (not a pointer) and is evaluated more than once.
 * Wrapped in do { } while (0) so it expands to exactly one statement.
 */
#define sfw_unpack_id(id)		\
do {					\
	__swab64s(&(id).nid);		\
	__swab32s(&(id).pid);		\
} while (0)
/*
 * Byte-swap the ses_nid and ses_stamp members of session ID @sid in place.
 * @sid is an lvalue (not a pointer) and is evaluated more than once.
 * Wrapped in do { } while (0) so it expands to exactly one statement.
 */
#define sfw_unpack_sid(sid)		\
do {					\
	__swab64s(&(sid).ses_nid);	\
	__swab64s(&(sid).ses_stamp);	\
} while (0)
/*
 * Byte-swap every member of the framework counters @fc in place.
 * @fc is an lvalue (not a pointer) and is evaluated more than once.
 * Wrapped in do { } while (0) so it expands to exactly one statement.
 */
#define sfw_unpack_fw_counters(fc)		\
do {						\
	__swab32s(&(fc).running_ms);		\
	__swab32s(&(fc).active_batches);	\
	__swab32s(&(fc).zombie_sessions);	\
	__swab32s(&(fc).brw_errors);		\
	__swab32s(&(fc).ping_errors);		\
} while (0)
/*
 * Byte-swap every member of the RPC counters @rc in place
 * (32-bit swaps for the event counts, 64-bit swaps for bulk_get/bulk_put).
 * @rc is an lvalue (not a pointer) and is evaluated more than once.
 * Wrapped in do { } while (0) so it expands to exactly one statement.
 */
#define sfw_unpack_rpc_counters(rc)		\
do {						\
	__swab32s(&(rc).errors);		\
	__swab32s(&(rc).rpcs_sent);		\
	__swab32s(&(rc).rpcs_rcvd);		\
	__swab32s(&(rc).rpcs_dropped);		\
	__swab32s(&(rc).rpcs_expired);		\
	__swab64s(&(rc).bulk_get);		\
	__swab64s(&(rc).bulk_put);		\
} while (0)
/*
 * Byte-swap every member of the LNet counters @lc in place
 * (32-bit swaps for the counts, 64-bit swaps for the *_length members).
 * @lc is an lvalue (not a pointer) and is evaluated more than once.
 * Wrapped in do { } while (0) so it expands to exactly one statement.
 */
#define sfw_unpack_lnet_counters(lc)		\
do {						\
	__swab32s(&(lc).errors);		\
	__swab32s(&(lc).msgs_max);		\
	__swab32s(&(lc).msgs_alloc);		\
	__swab32s(&(lc).send_count);		\
	__swab32s(&(lc).recv_count);		\
	__swab32s(&(lc).drop_count);		\
	__swab32s(&(lc).route_count);		\
	__swab64s(&(lc).send_length);		\
	__swab64s(&(lc).recv_length);		\
	__swab64s(&(lc).drop_length);		\
	__swab64s(&(lc).route_length);		\
} while (0)
/* Current value of test instance @t's tsi_nactive counter; non-zero is treated as "active". */
#define sfw_test_active(t)	(atomic_read(&(t)->tsi_nactive))
/* Current value of batch @b's bat_nactive counter; non-zero is treated as "active". */
#define sfw_batch_active(b)	(atomic_read(&(b)->bat_nactive))
106 static struct smoketest_framework
{
107 struct list_head fw_zombie_rpcs
; /* RPCs to be recycled */
108 struct list_head fw_zombie_sessions
; /* stopping sessions */
109 struct list_head fw_tests
; /* registered test cases */
110 atomic_t fw_nzombies
; /* # zombie sessions */
111 spinlock_t fw_lock
; /* serialise */
112 sfw_session_t
*fw_session
; /* _the_ session */
113 int fw_shuttingdown
; /* shutdown in progress */
114 struct srpc_server_rpc
*fw_active_srpc
;/* running RPC */
118 int sfw_stop_batch(sfw_batch_t
*tsb
, int force
);
119 void sfw_destroy_session(sfw_session_t
*sn
);
121 static inline sfw_test_case_t
*
122 sfw_find_test_case(int id
)
124 sfw_test_case_t
*tsc
;
126 LASSERT(id
<= SRPC_SERVICE_MAX_ID
);
127 LASSERT(id
> SRPC_FRAMEWORK_SERVICE_MAX_ID
);
129 list_for_each_entry(tsc
, &sfw_data
.fw_tests
, tsc_list
) {
130 if (tsc
->tsc_srv_service
->sv_id
== id
)
138 sfw_register_test(srpc_service_t
*service
, sfw_test_client_ops_t
*cliops
)
140 sfw_test_case_t
*tsc
;
142 if (sfw_find_test_case(service
->sv_id
)) {
143 CERROR("Failed to register test %s (%d)\n",
144 service
->sv_name
, service
->sv_id
);
148 LIBCFS_ALLOC(tsc
, sizeof(sfw_test_case_t
));
152 tsc
->tsc_cli_ops
= cliops
;
153 tsc
->tsc_srv_service
= service
;
155 list_add_tail(&tsc
->tsc_list
, &sfw_data
.fw_tests
);
160 sfw_add_session_timer(void)
162 sfw_session_t
*sn
= sfw_data
.fw_session
;
163 struct stt_timer
*timer
= &sn
->sn_timer
;
165 LASSERT(!sfw_data
.fw_shuttingdown
);
167 if (!sn
|| !sn
->sn_timeout
)
170 LASSERT(!sn
->sn_timer_active
);
172 sn
->sn_timer_active
= 1;
173 timer
->stt_expires
= ktime_get_real_seconds() + sn
->sn_timeout
;
174 stt_add_timer(timer
);
178 sfw_del_session_timer(void)
180 sfw_session_t
*sn
= sfw_data
.fw_session
;
182 if (!sn
|| !sn
->sn_timer_active
)
185 LASSERT(sn
->sn_timeout
);
187 if (stt_del_timer(&sn
->sn_timer
)) { /* timer defused */
188 sn
->sn_timer_active
= 0;
192 return EBUSY
; /* racing with sfw_session_expired() */
196 sfw_deactivate_session(void)
197 __must_hold(&sfw_data
.fw_lock
)
199 sfw_session_t
*sn
= sfw_data
.fw_session
;
202 sfw_test_case_t
*tsc
;
207 LASSERT(!sn
->sn_timer_active
);
209 sfw_data
.fw_session
= NULL
;
210 atomic_inc(&sfw_data
.fw_nzombies
);
211 list_add(&sn
->sn_list
, &sfw_data
.fw_zombie_sessions
);
213 spin_unlock(&sfw_data
.fw_lock
);
215 list_for_each_entry(tsc
, &sfw_data
.fw_tests
, tsc_list
) {
216 srpc_abort_service(tsc
->tsc_srv_service
);
219 spin_lock(&sfw_data
.fw_lock
);
221 list_for_each_entry(tsb
, &sn
->sn_batches
, bat_list
) {
222 if (sfw_batch_active(tsb
)) {
224 sfw_stop_batch(tsb
, 1);
229 return; /* wait for active batches to stop */
231 list_del_init(&sn
->sn_list
);
232 spin_unlock(&sfw_data
.fw_lock
);
234 sfw_destroy_session(sn
);
236 spin_lock(&sfw_data
.fw_lock
);
240 sfw_session_expired(void *data
)
242 sfw_session_t
*sn
= data
;
244 spin_lock(&sfw_data
.fw_lock
);
246 LASSERT(sn
->sn_timer_active
);
247 LASSERT(sn
== sfw_data
.fw_session
);
249 CWARN("Session expired! sid: %s-%llu, name: %s\n",
250 libcfs_nid2str(sn
->sn_id
.ses_nid
),
251 sn
->sn_id
.ses_stamp
, &sn
->sn_name
[0]);
253 sn
->sn_timer_active
= 0;
254 sfw_deactivate_session();
256 spin_unlock(&sfw_data
.fw_lock
);
260 sfw_init_session(sfw_session_t
*sn
, lst_sid_t sid
,
261 unsigned features
, const char *name
)
263 struct stt_timer
*timer
= &sn
->sn_timer
;
265 memset(sn
, 0, sizeof(sfw_session_t
));
266 INIT_LIST_HEAD(&sn
->sn_list
);
267 INIT_LIST_HEAD(&sn
->sn_batches
);
268 atomic_set(&sn
->sn_refcount
, 1); /* +1 for caller */
269 atomic_set(&sn
->sn_brw_errors
, 0);
270 atomic_set(&sn
->sn_ping_errors
, 0);
271 strlcpy(&sn
->sn_name
[0], name
, sizeof(sn
->sn_name
));
273 sn
->sn_timer_active
= 0;
275 sn
->sn_features
= features
;
276 sn
->sn_timeout
= session_timeout
;
277 sn
->sn_started
= cfs_time_current();
279 timer
->stt_data
= sn
;
280 timer
->stt_func
= sfw_session_expired
;
281 INIT_LIST_HEAD(&timer
->stt_list
);
284 /* completion handler for incoming framework RPCs */
286 sfw_server_rpc_done(struct srpc_server_rpc
*rpc
)
288 struct srpc_service
*sv
= rpc
->srpc_scd
->scd_svc
;
289 int status
= rpc
->srpc_status
;
291 CDEBUG(D_NET
, "Incoming framework RPC done: service %s, peer %s, status %s:%d\n",
292 sv
->sv_name
, libcfs_id2str(rpc
->srpc_peer
),
293 swi_state2str(rpc
->srpc_wi
.swi_state
),
301 sfw_client_rpc_fini(srpc_client_rpc_t
*rpc
)
303 LASSERT(!rpc
->crpc_bulk
.bk_niov
);
304 LASSERT(list_empty(&rpc
->crpc_list
));
305 LASSERT(!atomic_read(&rpc
->crpc_refcount
));
307 CDEBUG(D_NET
, "Outgoing framework RPC done: service %d, peer %s, status %s:%d:%d\n",
308 rpc
->crpc_service
, libcfs_id2str(rpc
->crpc_dest
),
309 swi_state2str(rpc
->crpc_wi
.swi_state
),
310 rpc
->crpc_aborted
, rpc
->crpc_status
);
312 spin_lock(&sfw_data
.fw_lock
);
314 /* my callers must finish all RPCs before shutting me down */
315 LASSERT(!sfw_data
.fw_shuttingdown
);
316 list_add(&rpc
->crpc_list
, &sfw_data
.fw_zombie_rpcs
);
318 spin_unlock(&sfw_data
.fw_lock
);
322 sfw_find_batch(lst_bid_t bid
)
324 sfw_session_t
*sn
= sfw_data
.fw_session
;
329 list_for_each_entry(bat
, &sn
->sn_batches
, bat_list
) {
330 if (bat
->bat_id
.bat_id
== bid
.bat_id
)
338 sfw_bid2batch(lst_bid_t bid
)
340 sfw_session_t
*sn
= sfw_data
.fw_session
;
345 bat
= sfw_find_batch(bid
);
349 LIBCFS_ALLOC(bat
, sizeof(sfw_batch_t
));
354 bat
->bat_session
= sn
;
356 atomic_set(&bat
->bat_nactive
, 0);
357 INIT_LIST_HEAD(&bat
->bat_tests
);
359 list_add_tail(&bat
->bat_list
, &sn
->sn_batches
);
364 sfw_get_stats(srpc_stat_reqst_t
*request
, srpc_stat_reply_t
*reply
)
366 sfw_session_t
*sn
= sfw_data
.fw_session
;
367 sfw_counters_t
*cnt
= &reply
->str_fw
;
370 reply
->str_sid
= !sn
? LST_INVALID_SID
: sn
->sn_id
;
372 if (request
->str_sid
.ses_nid
== LNET_NID_ANY
) {
373 reply
->str_status
= EINVAL
;
377 if (!sn
|| !sfw_sid_equal(request
->str_sid
, sn
->sn_id
)) {
378 reply
->str_status
= ESRCH
;
382 lnet_counters_get(&reply
->str_lnet
);
383 srpc_get_counters(&reply
->str_rpc
);
386 * send over the msecs since the session was started
387 * with 32 bits to send, this is ~49 days
389 cnt
->running_ms
= jiffies_to_msecs(jiffies
- sn
->sn_started
);
390 cnt
->brw_errors
= atomic_read(&sn
->sn_brw_errors
);
391 cnt
->ping_errors
= atomic_read(&sn
->sn_ping_errors
);
392 cnt
->zombie_sessions
= atomic_read(&sfw_data
.fw_nzombies
);
394 cnt
->active_batches
= 0;
395 list_for_each_entry(bat
, &sn
->sn_batches
, bat_list
) {
396 if (atomic_read(&bat
->bat_nactive
) > 0)
397 cnt
->active_batches
++;
400 reply
->str_status
= 0;
405 sfw_make_session(srpc_mksn_reqst_t
*request
, srpc_mksn_reply_t
*reply
)
407 sfw_session_t
*sn
= sfw_data
.fw_session
;
408 srpc_msg_t
*msg
= container_of(request
, srpc_msg_t
,
409 msg_body
.mksn_reqst
);
412 if (request
->mksn_sid
.ses_nid
== LNET_NID_ANY
) {
413 reply
->mksn_sid
= !sn
? LST_INVALID_SID
: sn
->sn_id
;
414 reply
->mksn_status
= EINVAL
;
419 reply
->mksn_status
= 0;
420 reply
->mksn_sid
= sn
->sn_id
;
421 reply
->mksn_timeout
= sn
->sn_timeout
;
423 if (sfw_sid_equal(request
->mksn_sid
, sn
->sn_id
)) {
424 atomic_inc(&sn
->sn_refcount
);
428 if (!request
->mksn_force
) {
429 reply
->mksn_status
= EBUSY
;
430 cplen
= strlcpy(&reply
->mksn_name
[0], &sn
->sn_name
[0],
431 sizeof(reply
->mksn_name
));
432 if (cplen
>= sizeof(reply
->mksn_name
))
439 * reject the request if it requires unknown features
440 * NB: old version will always accept all features because it's not
441 * aware of srpc_msg_t::msg_ses_feats, it's a defect but it's also
442 * harmless because it will return zero feature to console, and it's
443 * console's responsibility to make sure all nodes in a session have
446 if (msg
->msg_ses_feats
& ~LST_FEATS_MASK
) {
447 reply
->mksn_status
= EPROTO
;
451 /* brand new or create by force */
452 LIBCFS_ALLOC(sn
, sizeof(sfw_session_t
));
454 CERROR("dropping RPC mksn under memory pressure\n");
458 sfw_init_session(sn
, request
->mksn_sid
,
459 msg
->msg_ses_feats
, &request
->mksn_name
[0]);
461 spin_lock(&sfw_data
.fw_lock
);
463 sfw_deactivate_session();
464 LASSERT(!sfw_data
.fw_session
);
465 sfw_data
.fw_session
= sn
;
467 spin_unlock(&sfw_data
.fw_lock
);
469 reply
->mksn_status
= 0;
470 reply
->mksn_sid
= sn
->sn_id
;
471 reply
->mksn_timeout
= sn
->sn_timeout
;
476 sfw_remove_session(srpc_rmsn_reqst_t
*request
, srpc_rmsn_reply_t
*reply
)
478 sfw_session_t
*sn
= sfw_data
.fw_session
;
480 reply
->rmsn_sid
= !sn
? LST_INVALID_SID
: sn
->sn_id
;
482 if (request
->rmsn_sid
.ses_nid
== LNET_NID_ANY
) {
483 reply
->rmsn_status
= EINVAL
;
487 if (!sn
|| !sfw_sid_equal(request
->rmsn_sid
, sn
->sn_id
)) {
488 reply
->rmsn_status
= !sn
? ESRCH
: EBUSY
;
492 if (!atomic_dec_and_test(&sn
->sn_refcount
)) {
493 reply
->rmsn_status
= 0;
497 spin_lock(&sfw_data
.fw_lock
);
498 sfw_deactivate_session();
499 spin_unlock(&sfw_data
.fw_lock
);
501 reply
->rmsn_status
= 0;
502 reply
->rmsn_sid
= LST_INVALID_SID
;
503 LASSERT(!sfw_data
.fw_session
);
508 sfw_debug_session(srpc_debug_reqst_t
*request
, srpc_debug_reply_t
*reply
)
510 sfw_session_t
*sn
= sfw_data
.fw_session
;
513 reply
->dbg_status
= ESRCH
;
514 reply
->dbg_sid
= LST_INVALID_SID
;
518 reply
->dbg_status
= 0;
519 reply
->dbg_sid
= sn
->sn_id
;
520 reply
->dbg_timeout
= sn
->sn_timeout
;
521 if (strlcpy(reply
->dbg_name
, &sn
->sn_name
[0], sizeof(reply
->dbg_name
))
522 >= sizeof(reply
->dbg_name
))
529 sfw_test_rpc_fini(srpc_client_rpc_t
*rpc
)
531 sfw_test_unit_t
*tsu
= rpc
->crpc_priv
;
532 sfw_test_instance_t
*tsi
= tsu
->tsu_instance
;
534 /* Called with hold of tsi->tsi_lock */
535 LASSERT(list_empty(&rpc
->crpc_list
));
536 list_add(&rpc
->crpc_list
, &tsi
->tsi_free_rpcs
);
540 sfw_test_buffers(sfw_test_instance_t
*tsi
)
542 struct sfw_test_case
*tsc
;
543 struct srpc_service
*svc
;
547 tsc
= sfw_find_test_case(tsi
->tsi_service
);
549 svc
= tsc
->tsc_srv_service
;
552 nbuf
= min(svc
->sv_wi_total
, tsi
->tsi_loop
) / svc
->sv_ncpts
;
553 return max(SFW_TEST_WI_MIN
, nbuf
+ SFW_TEST_WI_EXTRA
);
557 sfw_load_test(struct sfw_test_instance
*tsi
)
559 struct sfw_test_case
*tsc
;
560 struct srpc_service
*svc
;
565 tsc
= sfw_find_test_case(tsi
->tsi_service
);
566 nbuf
= sfw_test_buffers(tsi
);
568 svc
= tsc
->tsc_srv_service
;
570 if (tsi
->tsi_is_client
) {
571 tsi
->tsi_ops
= tsc
->tsc_cli_ops
;
575 rc
= srpc_service_add_buffers(svc
, nbuf
);
577 CWARN("Failed to reserve enough buffers: service %s, %d needed: %d\n",
578 svc
->sv_name
, nbuf
, rc
);
580 * NB: this error handler is not strictly correct, because
581 * it may release more buffers than already allocated,
582 * but it doesn't matter because request portal should
583 * be lazy portal and will grow buffers if necessary.
585 srpc_service_remove_buffers(svc
, nbuf
);
589 CDEBUG(D_NET
, "Reserved %d buffers for test %s\n",
590 nbuf
* (srpc_serv_is_framework(svc
) ?
591 2 : cfs_cpt_number(cfs_cpt_table
)), svc
->sv_name
);
596 sfw_unload_test(struct sfw_test_instance
*tsi
)
598 struct sfw_test_case
*tsc
;
601 tsc
= sfw_find_test_case(tsi
->tsi_service
);
604 if (tsi
->tsi_is_client
)
608 * shrink buffers, because request portal is lazy portal
609 * which can grow buffers at runtime so we may leave
610 * some buffers behind, but never mind...
612 srpc_service_remove_buffers(tsc
->tsc_srv_service
,
613 sfw_test_buffers(tsi
));
617 sfw_destroy_test_instance(sfw_test_instance_t
*tsi
)
619 srpc_client_rpc_t
*rpc
;
620 sfw_test_unit_t
*tsu
;
622 if (!tsi
->tsi_is_client
)
625 tsi
->tsi_ops
->tso_fini(tsi
);
627 LASSERT(!tsi
->tsi_stopping
);
628 LASSERT(list_empty(&tsi
->tsi_active_rpcs
));
629 LASSERT(!sfw_test_active(tsi
));
631 while (!list_empty(&tsi
->tsi_units
)) {
632 tsu
= list_entry(tsi
->tsi_units
.next
,
633 sfw_test_unit_t
, tsu_list
);
634 list_del(&tsu
->tsu_list
);
635 LIBCFS_FREE(tsu
, sizeof(*tsu
));
638 while (!list_empty(&tsi
->tsi_free_rpcs
)) {
639 rpc
= list_entry(tsi
->tsi_free_rpcs
.next
,
640 srpc_client_rpc_t
, crpc_list
);
641 list_del(&rpc
->crpc_list
);
642 LIBCFS_FREE(rpc
, srpc_client_rpc_size(rpc
));
646 sfw_unload_test(tsi
);
647 LIBCFS_FREE(tsi
, sizeof(*tsi
));
651 sfw_destroy_batch(sfw_batch_t
*tsb
)
653 sfw_test_instance_t
*tsi
;
655 LASSERT(!sfw_batch_active(tsb
));
656 LASSERT(list_empty(&tsb
->bat_list
));
658 while (!list_empty(&tsb
->bat_tests
)) {
659 tsi
= list_entry(tsb
->bat_tests
.next
,
660 sfw_test_instance_t
, tsi_list
);
661 list_del_init(&tsi
->tsi_list
);
662 sfw_destroy_test_instance(tsi
);
665 LIBCFS_FREE(tsb
, sizeof(sfw_batch_t
));
669 sfw_destroy_session(sfw_session_t
*sn
)
673 LASSERT(list_empty(&sn
->sn_list
));
674 LASSERT(sn
!= sfw_data
.fw_session
);
676 while (!list_empty(&sn
->sn_batches
)) {
677 batch
= list_entry(sn
->sn_batches
.next
,
678 sfw_batch_t
, bat_list
);
679 list_del_init(&batch
->bat_list
);
680 sfw_destroy_batch(batch
);
683 LIBCFS_FREE(sn
, sizeof(*sn
));
684 atomic_dec(&sfw_data
.fw_nzombies
);
688 sfw_unpack_addtest_req(srpc_msg_t
*msg
)
690 srpc_test_reqst_t
*req
= &msg
->msg_body
.tes_reqst
;
692 LASSERT(msg
->msg_type
== SRPC_MSG_TEST_REQST
);
693 LASSERT(req
->tsr_is_client
);
695 if (msg
->msg_magic
== SRPC_MSG_MAGIC
)
696 return; /* no flipping needed */
698 LASSERT(msg
->msg_magic
== __swab32(SRPC_MSG_MAGIC
));
700 if (req
->tsr_service
== SRPC_SERVICE_BRW
) {
701 if (!(msg
->msg_ses_feats
& LST_FEAT_BULK_LEN
)) {
702 test_bulk_req_t
*bulk
= &req
->tsr_u
.bulk_v0
;
704 __swab32s(&bulk
->blk_opc
);
705 __swab32s(&bulk
->blk_npg
);
706 __swab32s(&bulk
->blk_flags
);
709 test_bulk_req_v1_t
*bulk
= &req
->tsr_u
.bulk_v1
;
711 __swab16s(&bulk
->blk_opc
);
712 __swab16s(&bulk
->blk_flags
);
713 __swab32s(&bulk
->blk_offset
);
714 __swab32s(&bulk
->blk_len
);
720 if (req
->tsr_service
== SRPC_SERVICE_PING
) {
721 test_ping_req_t
*ping
= &req
->tsr_u
.ping
;
723 __swab32s(&ping
->png_size
);
724 __swab32s(&ping
->png_flags
);
732 sfw_add_test_instance(sfw_batch_t
*tsb
, struct srpc_server_rpc
*rpc
)
734 srpc_msg_t
*msg
= &rpc
->srpc_reqstbuf
->buf_msg
;
735 srpc_test_reqst_t
*req
= &msg
->msg_body
.tes_reqst
;
736 srpc_bulk_t
*bk
= rpc
->srpc_bulk
;
737 int ndest
= req
->tsr_ndest
;
738 sfw_test_unit_t
*tsu
;
739 sfw_test_instance_t
*tsi
;
743 LIBCFS_ALLOC(tsi
, sizeof(*tsi
));
745 CERROR("Can't allocate test instance for batch: %llu\n",
750 spin_lock_init(&tsi
->tsi_lock
);
751 atomic_set(&tsi
->tsi_nactive
, 0);
752 INIT_LIST_HEAD(&tsi
->tsi_units
);
753 INIT_LIST_HEAD(&tsi
->tsi_free_rpcs
);
754 INIT_LIST_HEAD(&tsi
->tsi_active_rpcs
);
756 tsi
->tsi_stopping
= 0;
757 tsi
->tsi_batch
= tsb
;
758 tsi
->tsi_loop
= req
->tsr_loop
;
759 tsi
->tsi_concur
= req
->tsr_concur
;
760 tsi
->tsi_service
= req
->tsr_service
;
761 tsi
->tsi_is_client
= !!(req
->tsr_is_client
);
762 tsi
->tsi_stoptsu_onerr
= !!(req
->tsr_stop_onerr
);
764 rc
= sfw_load_test(tsi
);
766 LIBCFS_FREE(tsi
, sizeof(*tsi
));
770 LASSERT(!sfw_batch_active(tsb
));
772 if (!tsi
->tsi_is_client
) {
773 /* it's test server, just add it to tsb */
774 list_add_tail(&tsi
->tsi_list
, &tsb
->bat_tests
);
779 LASSERT(bk
->bk_niov
* SFW_ID_PER_PAGE
>= (unsigned int)ndest
);
780 LASSERT((unsigned int)bk
->bk_len
>=
781 sizeof(lnet_process_id_packed_t
) * ndest
);
783 sfw_unpack_addtest_req(msg
);
784 memcpy(&tsi
->tsi_u
, &req
->tsr_u
, sizeof(tsi
->tsi_u
));
786 for (i
= 0; i
< ndest
; i
++) {
787 lnet_process_id_packed_t
*dests
;
788 lnet_process_id_packed_t id
;
791 dests
= page_address(bk
->bk_iovs
[i
/ SFW_ID_PER_PAGE
].kiov_page
);
792 LASSERT(dests
); /* my pages are within KVM always */
793 id
= dests
[i
% SFW_ID_PER_PAGE
];
794 if (msg
->msg_magic
!= SRPC_MSG_MAGIC
)
797 for (j
= 0; j
< tsi
->tsi_concur
; j
++) {
798 LIBCFS_ALLOC(tsu
, sizeof(sfw_test_unit_t
));
801 CERROR("Can't allocate tsu for %d\n",
806 tsu
->tsu_dest
.nid
= id
.nid
;
807 tsu
->tsu_dest
.pid
= id
.pid
;
808 tsu
->tsu_instance
= tsi
;
809 tsu
->tsu_private
= NULL
;
810 list_add_tail(&tsu
->tsu_list
, &tsi
->tsi_units
);
814 rc
= tsi
->tsi_ops
->tso_init(tsi
);
816 list_add_tail(&tsi
->tsi_list
, &tsb
->bat_tests
);
822 sfw_destroy_test_instance(tsi
);
827 sfw_test_unit_done(sfw_test_unit_t
*tsu
)
829 sfw_test_instance_t
*tsi
= tsu
->tsu_instance
;
830 sfw_batch_t
*tsb
= tsi
->tsi_batch
;
831 sfw_session_t
*sn
= tsb
->bat_session
;
833 LASSERT(sfw_test_active(tsi
));
835 if (!atomic_dec_and_test(&tsi
->tsi_nactive
))
838 /* the test instance is done */
839 spin_lock(&tsi
->tsi_lock
);
841 tsi
->tsi_stopping
= 0;
843 spin_unlock(&tsi
->tsi_lock
);
845 spin_lock(&sfw_data
.fw_lock
);
847 if (!atomic_dec_and_test(&tsb
->bat_nactive
) ||/* tsb still active */
848 sn
== sfw_data
.fw_session
) { /* sn also active */
849 spin_unlock(&sfw_data
.fw_lock
);
853 LASSERT(!list_empty(&sn
->sn_list
)); /* I'm a zombie! */
855 list_for_each_entry(tsb
, &sn
->sn_batches
, bat_list
) {
856 if (sfw_batch_active(tsb
)) {
857 spin_unlock(&sfw_data
.fw_lock
);
862 list_del_init(&sn
->sn_list
);
863 spin_unlock(&sfw_data
.fw_lock
);
865 sfw_destroy_session(sn
);
869 sfw_test_rpc_done(srpc_client_rpc_t
*rpc
)
871 sfw_test_unit_t
*tsu
= rpc
->crpc_priv
;
872 sfw_test_instance_t
*tsi
= tsu
->tsu_instance
;
875 tsi
->tsi_ops
->tso_done_rpc(tsu
, rpc
);
877 spin_lock(&tsi
->tsi_lock
);
879 LASSERT(sfw_test_active(tsi
));
880 LASSERT(!list_empty(&rpc
->crpc_list
));
882 list_del_init(&rpc
->crpc_list
);
884 /* batch is stopping or loop is done or get error */
885 if (tsi
->tsi_stopping
|| !tsu
->tsu_loop
||
886 (rpc
->crpc_status
&& tsi
->tsi_stoptsu_onerr
))
889 /* dec ref for poster */
890 srpc_client_rpc_decref(rpc
);
892 spin_unlock(&tsi
->tsi_lock
);
895 swi_schedule_workitem(&tsu
->tsu_worker
);
899 sfw_test_unit_done(tsu
);
903 sfw_create_test_rpc(sfw_test_unit_t
*tsu
, lnet_process_id_t peer
,
904 unsigned features
, int nblk
, int blklen
,
905 srpc_client_rpc_t
**rpcpp
)
907 srpc_client_rpc_t
*rpc
= NULL
;
908 sfw_test_instance_t
*tsi
= tsu
->tsu_instance
;
910 spin_lock(&tsi
->tsi_lock
);
912 LASSERT(sfw_test_active(tsi
));
913 /* pick request from buffer */
914 rpc
= list_first_entry_or_null(&tsi
->tsi_free_rpcs
,
915 srpc_client_rpc_t
, crpc_list
);
917 LASSERT(nblk
== rpc
->crpc_bulk
.bk_niov
);
918 list_del_init(&rpc
->crpc_list
);
921 spin_unlock(&tsi
->tsi_lock
);
924 rpc
= srpc_create_client_rpc(peer
, tsi
->tsi_service
, nblk
,
925 blklen
, sfw_test_rpc_done
,
926 sfw_test_rpc_fini
, tsu
);
928 srpc_init_client_rpc(rpc
, peer
, tsi
->tsi_service
, nblk
,
929 blklen
, sfw_test_rpc_done
,
930 sfw_test_rpc_fini
, tsu
);
934 CERROR("Can't create rpc for test %d\n", tsi
->tsi_service
);
938 rpc
->crpc_reqstmsg
.msg_ses_feats
= features
;
945 sfw_run_test(swi_workitem_t
*wi
)
947 sfw_test_unit_t
*tsu
= wi
->swi_workitem
.wi_data
;
948 sfw_test_instance_t
*tsi
= tsu
->tsu_instance
;
949 srpc_client_rpc_t
*rpc
= NULL
;
951 LASSERT(wi
== &tsu
->tsu_worker
);
953 if (tsi
->tsi_ops
->tso_prep_rpc(tsu
, tsu
->tsu_dest
, &rpc
)) {
960 spin_lock(&tsi
->tsi_lock
);
962 if (tsi
->tsi_stopping
) {
963 list_add(&rpc
->crpc_list
, &tsi
->tsi_free_rpcs
);
964 spin_unlock(&tsi
->tsi_lock
);
968 if (tsu
->tsu_loop
> 0)
971 list_add_tail(&rpc
->crpc_list
, &tsi
->tsi_active_rpcs
);
972 spin_unlock(&tsi
->tsi_lock
);
974 spin_lock(&rpc
->crpc_lock
);
975 rpc
->crpc_timeout
= rpc_timeout
;
977 spin_unlock(&rpc
->crpc_lock
);
982 * No one can schedule me now since:
983 * - previous RPC, if any, has done and
984 * - no new RPC is initiated.
985 * - my batch is still active; no one can run it again now.
986 * Cancel pending schedules and prevent future schedule attempts:
988 swi_exit_workitem(wi
);
989 sfw_test_unit_done(tsu
);
994 sfw_run_batch(sfw_batch_t
*tsb
)
997 sfw_test_unit_t
*tsu
;
998 sfw_test_instance_t
*tsi
;
1000 if (sfw_batch_active(tsb
)) {
1001 CDEBUG(D_NET
, "Batch already active: %llu (%d)\n",
1002 tsb
->bat_id
.bat_id
, atomic_read(&tsb
->bat_nactive
));
1006 list_for_each_entry(tsi
, &tsb
->bat_tests
, tsi_list
) {
1007 if (!tsi
->tsi_is_client
) /* skip server instances */
1010 LASSERT(!tsi
->tsi_stopping
);
1011 LASSERT(!sfw_test_active(tsi
));
1013 atomic_inc(&tsb
->bat_nactive
);
1015 list_for_each_entry(tsu
, &tsi
->tsi_units
, tsu_list
) {
1016 atomic_inc(&tsi
->tsi_nactive
);
1017 tsu
->tsu_loop
= tsi
->tsi_loop
;
1018 wi
= &tsu
->tsu_worker
;
1019 swi_init_workitem(wi
, tsu
, sfw_run_test
,
1020 lst_sched_test
[lnet_cpt_of_nid(tsu
->tsu_dest
.nid
)]);
1021 swi_schedule_workitem(wi
);
1029 sfw_stop_batch(sfw_batch_t
*tsb
, int force
)
1031 sfw_test_instance_t
*tsi
;
1032 srpc_client_rpc_t
*rpc
;
1034 if (!sfw_batch_active(tsb
)) {
1035 CDEBUG(D_NET
, "Batch %llu inactive\n", tsb
->bat_id
.bat_id
);
1039 list_for_each_entry(tsi
, &tsb
->bat_tests
, tsi_list
) {
1040 spin_lock(&tsi
->tsi_lock
);
1042 if (!tsi
->tsi_is_client
||
1043 !sfw_test_active(tsi
) || tsi
->tsi_stopping
) {
1044 spin_unlock(&tsi
->tsi_lock
);
1048 tsi
->tsi_stopping
= 1;
1051 spin_unlock(&tsi
->tsi_lock
);
1055 /* abort launched rpcs in the test */
1056 list_for_each_entry(rpc
, &tsi
->tsi_active_rpcs
, crpc_list
) {
1057 spin_lock(&rpc
->crpc_lock
);
1059 srpc_abort_rpc(rpc
, -EINTR
);
1061 spin_unlock(&rpc
->crpc_lock
);
1064 spin_unlock(&tsi
->tsi_lock
);
1071 sfw_query_batch(sfw_batch_t
*tsb
, int testidx
, srpc_batch_reply_t
*reply
)
1073 sfw_test_instance_t
*tsi
;
1079 reply
->bar_active
= atomic_read(&tsb
->bat_nactive
);
1083 list_for_each_entry(tsi
, &tsb
->bat_tests
, tsi_list
) {
1087 reply
->bar_active
= atomic_read(&tsi
->tsi_nactive
);
1095 sfw_free_pages(struct srpc_server_rpc
*rpc
)
1097 srpc_free_bulk(rpc
->srpc_bulk
);
1098 rpc
->srpc_bulk
= NULL
;
1102 sfw_alloc_pages(struct srpc_server_rpc
*rpc
, int cpt
, int npages
, int len
,
1105 LASSERT(!rpc
->srpc_bulk
);
1106 LASSERT(npages
> 0 && npages
<= LNET_MAX_IOV
);
1108 rpc
->srpc_bulk
= srpc_alloc_bulk(cpt
, npages
, len
, sink
);
1109 if (!rpc
->srpc_bulk
)
1116 sfw_add_test(struct srpc_server_rpc
*rpc
)
1118 sfw_session_t
*sn
= sfw_data
.fw_session
;
1119 srpc_test_reply_t
*reply
= &rpc
->srpc_replymsg
.msg_body
.tes_reply
;
1120 srpc_test_reqst_t
*request
;
1124 request
= &rpc
->srpc_reqstbuf
->buf_msg
.msg_body
.tes_reqst
;
1125 reply
->tsr_sid
= !sn
? LST_INVALID_SID
: sn
->sn_id
;
1127 if (!request
->tsr_loop
||
1128 !request
->tsr_concur
||
1129 request
->tsr_sid
.ses_nid
== LNET_NID_ANY
||
1130 request
->tsr_ndest
> SFW_MAX_NDESTS
||
1131 (request
->tsr_is_client
&& !request
->tsr_ndest
) ||
1132 request
->tsr_concur
> SFW_MAX_CONCUR
||
1133 request
->tsr_service
> SRPC_SERVICE_MAX_ID
||
1134 request
->tsr_service
<= SRPC_FRAMEWORK_SERVICE_MAX_ID
) {
1135 reply
->tsr_status
= EINVAL
;
1139 if (!sn
|| !sfw_sid_equal(request
->tsr_sid
, sn
->sn_id
) ||
1140 !sfw_find_test_case(request
->tsr_service
)) {
1141 reply
->tsr_status
= ENOENT
;
1145 bat
= sfw_bid2batch(request
->tsr_bid
);
1147 CERROR("dropping RPC %s from %s under memory pressure\n",
1148 rpc
->srpc_scd
->scd_svc
->sv_name
,
1149 libcfs_id2str(rpc
->srpc_peer
));
1153 if (sfw_batch_active(bat
)) {
1154 reply
->tsr_status
= EBUSY
;
1158 if (request
->tsr_is_client
&& !rpc
->srpc_bulk
) {
1159 /* rpc will be resumed later in sfw_bulk_ready */
1160 int npg
= sfw_id_pages(request
->tsr_ndest
);
1163 if (!(sn
->sn_features
& LST_FEAT_BULK_LEN
)) {
1164 len
= npg
* PAGE_SIZE
;
1167 len
= sizeof(lnet_process_id_packed_t
) *
1171 return sfw_alloc_pages(rpc
, CFS_CPT_ANY
, npg
, len
, 1);
1174 rc
= sfw_add_test_instance(bat
, rpc
);
1175 CDEBUG(!rc
? D_NET
: D_WARNING
,
1176 "%s test: sv %d %s, loop %d, concur %d, ndest %d\n",
1177 !rc
? "Added" : "Failed to add", request
->tsr_service
,
1178 request
->tsr_is_client
? "client" : "server",
1179 request
->tsr_loop
, request
->tsr_concur
, request
->tsr_ndest
);
1181 reply
->tsr_status
= (rc
< 0) ? -rc
: rc
;
1186 sfw_control_batch(srpc_batch_reqst_t
*request
, srpc_batch_reply_t
*reply
)
1188 sfw_session_t
*sn
= sfw_data
.fw_session
;
1192 reply
->bar_sid
= !sn
? LST_INVALID_SID
: sn
->sn_id
;
1194 if (!sn
|| !sfw_sid_equal(request
->bar_sid
, sn
->sn_id
)) {
1195 reply
->bar_status
= ESRCH
;
1199 bat
= sfw_find_batch(request
->bar_bid
);
1201 reply
->bar_status
= ENOENT
;
1205 switch (request
->bar_opc
) {
1206 case SRPC_BATCH_OPC_RUN
:
1207 rc
= sfw_run_batch(bat
);
1210 case SRPC_BATCH_OPC_STOP
:
1211 rc
= sfw_stop_batch(bat
, request
->bar_arg
);
1214 case SRPC_BATCH_OPC_QUERY
:
1215 rc
= sfw_query_batch(bat
, request
->bar_testidx
, reply
);
1219 return -EINVAL
; /* drop it */
1222 reply
->bar_status
= (rc
< 0) ? -rc
: rc
;
1227 sfw_handle_server_rpc(struct srpc_server_rpc
*rpc
)
1229 struct srpc_service
*sv
= rpc
->srpc_scd
->scd_svc
;
1230 srpc_msg_t
*reply
= &rpc
->srpc_replymsg
;
1231 srpc_msg_t
*request
= &rpc
->srpc_reqstbuf
->buf_msg
;
1232 unsigned features
= LST_FEATS_MASK
;
1235 LASSERT(!sfw_data
.fw_active_srpc
);
1236 LASSERT(sv
->sv_id
<= SRPC_FRAMEWORK_SERVICE_MAX_ID
);
1238 spin_lock(&sfw_data
.fw_lock
);
1240 if (sfw_data
.fw_shuttingdown
) {
1241 spin_unlock(&sfw_data
.fw_lock
);
1245 /* Remove timer to avoid racing with it or expiring active session */
1246 if (sfw_del_session_timer()) {
1247 CERROR("Dropping RPC (%s) from %s: racing with expiry timer.",
1248 sv
->sv_name
, libcfs_id2str(rpc
->srpc_peer
));
1249 spin_unlock(&sfw_data
.fw_lock
);
1253 sfw_data
.fw_active_srpc
= rpc
;
1254 spin_unlock(&sfw_data
.fw_lock
);
1256 sfw_unpack_message(request
);
1257 LASSERT(request
->msg_type
== srpc_service2request(sv
->sv_id
));
1259 /* rpc module should have checked this */
1260 LASSERT(request
->msg_version
== SRPC_MSG_VERSION
);
1262 if (sv
->sv_id
!= SRPC_SERVICE_MAKE_SESSION
&&
1263 sv
->sv_id
!= SRPC_SERVICE_DEBUG
) {
1264 sfw_session_t
*sn
= sfw_data
.fw_session
;
1267 sn
->sn_features
!= request
->msg_ses_feats
) {
1268 CNETERR("Features of framework RPC don't match features of current session: %x/%x\n",
1269 request
->msg_ses_feats
, sn
->sn_features
);
1270 reply
->msg_body
.reply
.status
= EPROTO
;
1271 reply
->msg_body
.reply
.sid
= sn
->sn_id
;
1275 } else if (request
->msg_ses_feats
& ~LST_FEATS_MASK
) {
1277 * NB: at this point, old version will ignore features and
1278 * create new session anyway, so console should be able
1281 reply
->msg_body
.reply
.status
= EPROTO
;
1285 switch (sv
->sv_id
) {
1288 case SRPC_SERVICE_TEST
:
1289 rc
= sfw_add_test(rpc
);
1292 case SRPC_SERVICE_BATCH
:
1293 rc
= sfw_control_batch(&request
->msg_body
.bat_reqst
,
1294 &reply
->msg_body
.bat_reply
);
1297 case SRPC_SERVICE_QUERY_STAT
:
1298 rc
= sfw_get_stats(&request
->msg_body
.stat_reqst
,
1299 &reply
->msg_body
.stat_reply
);
1302 case SRPC_SERVICE_DEBUG
:
1303 rc
= sfw_debug_session(&request
->msg_body
.dbg_reqst
,
1304 &reply
->msg_body
.dbg_reply
);
1307 case SRPC_SERVICE_MAKE_SESSION
:
1308 rc
= sfw_make_session(&request
->msg_body
.mksn_reqst
,
1309 &reply
->msg_body
.mksn_reply
);
1312 case SRPC_SERVICE_REMOVE_SESSION
:
1313 rc
= sfw_remove_session(&request
->msg_body
.rmsn_reqst
,
1314 &reply
->msg_body
.rmsn_reply
);
1318 if (sfw_data
.fw_session
)
1319 features
= sfw_data
.fw_session
->sn_features
;
1321 reply
->msg_ses_feats
= features
;
1322 rpc
->srpc_done
= sfw_server_rpc_done
;
1323 spin_lock(&sfw_data
.fw_lock
);
1325 if (!sfw_data
.fw_shuttingdown
)
1326 sfw_add_session_timer();
1328 sfw_data
.fw_active_srpc
= NULL
;
1329 spin_unlock(&sfw_data
.fw_lock
);
1334 sfw_bulk_ready(struct srpc_server_rpc
*rpc
, int status
)
1336 struct srpc_service
*sv
= rpc
->srpc_scd
->scd_svc
;
1339 LASSERT(rpc
->srpc_bulk
);
1340 LASSERT(sv
->sv_id
== SRPC_SERVICE_TEST
);
1341 LASSERT(!sfw_data
.fw_active_srpc
);
1342 LASSERT(rpc
->srpc_reqstbuf
->buf_msg
.msg_body
.tes_reqst
.tsr_is_client
);
1344 spin_lock(&sfw_data
.fw_lock
);
1347 CERROR("Bulk transfer failed for RPC: service %s, peer %s, status %d\n",
1348 sv
->sv_name
, libcfs_id2str(rpc
->srpc_peer
), status
);
1349 spin_unlock(&sfw_data
.fw_lock
);
1353 if (sfw_data
.fw_shuttingdown
) {
1354 spin_unlock(&sfw_data
.fw_lock
);
1358 if (sfw_del_session_timer()) {
1359 CERROR("dropping RPC %s from %s: racing with expiry timer\n",
1360 sv
->sv_name
, libcfs_id2str(rpc
->srpc_peer
));
1361 spin_unlock(&sfw_data
.fw_lock
);
1365 sfw_data
.fw_active_srpc
= rpc
;
1366 spin_unlock(&sfw_data
.fw_lock
);
1368 rc
= sfw_add_test(rpc
);
1370 spin_lock(&sfw_data
.fw_lock
);
1372 if (!sfw_data
.fw_shuttingdown
)
1373 sfw_add_session_timer();
1375 sfw_data
.fw_active_srpc
= NULL
;
1376 spin_unlock(&sfw_data
.fw_lock
);
/*
 * sfw_create_rpc - obtain a client RPC for one of the framework services.
 *
 * @peer:     destination process id, handed to the srpc layer.
 * @service:  service id; asserted to be <= SRPC_FRAMEWORK_SERVICE_MAX_ID.
 * @features: stored in crpc_reqstmsg.msg_ses_feats of the resulting RPC.
 * @nbulkiov: bulk iov count passed to srpc_create_client_rpc().
 * @bulklen:  bulk length passed to srpc_create_client_rpc().
 * @done:     completion callback for the RPC.
 * @priv:     private data handed to srpc_init_client_rpc() in the reuse path.
 *
 * Must not be called while the framework is shutting down (asserted under
 * sfw_data.fw_lock).  When no bulk is requested and fw_zombie_rpcs is not
 * empty, the first cached RPC is unlinked from that list and re-initialised
 * with srpc_init_client_rpc() while the lock is held.
 *
 * NOTE(review): the guard around the srpc_create_client_rpc() fallback, part
 * of its argument list and the function's return statement are not present
 * in this extract; presumably the fallback runs only when no cached RPC was
 * reused - confirm against the full file.
 */
1381 sfw_create_rpc(lnet_process_id_t peer
, int service
,
1382 unsigned features
, int nbulkiov
, int bulklen
,
1383 void (*done
)(srpc_client_rpc_t
*), void *priv
)
1385 srpc_client_rpc_t
*rpc
= NULL
;
1387 spin_lock(&sfw_data
.fw_lock
);
1389 LASSERT(!sfw_data
.fw_shuttingdown
);
1390 LASSERT(service
<= SRPC_FRAMEWORK_SERVICE_MAX_ID
);
/* bulk-less request: try to recycle an RPC parked on the zombie list */
1392 if (!nbulkiov
&& !list_empty(&sfw_data
.fw_zombie_rpcs
)) {
1393 rpc
= list_entry(sfw_data
.fw_zombie_rpcs
.next
,
1394 srpc_client_rpc_t
, crpc_list
);
1395 list_del(&rpc
->crpc_list
);
1397 srpc_init_client_rpc(rpc
, peer
, service
, 0, 0,
1398 done
, sfw_client_rpc_fini
, priv
);
1401 spin_unlock(&sfw_data
.fw_lock
);
1404 rpc
= srpc_create_client_rpc(peer
, service
,
1405 nbulkiov
, bulklen
, done
,
1407 sfw_client_rpc_fini
,
1411 if (rpc
) /* "session" is concept in framework */
1412 rpc
->crpc_reqstmsg
.msg_ses_feats
= features
;
/*
 * sfw_unpack_message - byte-swap the body of a selftest message if needed.
 *
 * @msg: message to fix up; modified in place.
 *
 * Nothing is done when msg_magic already equals SRPC_MSG_MAGIC.  Otherwise
 * msg_magic is asserted to be the byte-swapped SRPC_MSG_MAGIC, and the
 * fields of the body selected by msg_type are swabbed with __swab32s() /
 * __swab64s() and the sfw_unpack_*() helper macros.  Request/reply pairs
 * handled here: STAT, MKSN, RMSN, DEBUG, BATCH, TEST and JOIN.
 *
 * NOTE(review): the lines that end each per-type branch and whatever
 * happens for an unrecognised msg_type are not present in this extract -
 * confirm against the full file.
 */
1418 sfw_unpack_message(srpc_msg_t
*msg
)
1420 if (msg
->msg_magic
== SRPC_MSG_MAGIC
)
1421 return; /* no flipping needed */
1423 /* srpc module should guarantee I wouldn't get crap */
1424 LASSERT(msg
->msg_magic
== __swab32(SRPC_MSG_MAGIC
));
1426 if (msg
->msg_type
== SRPC_MSG_STAT_REQST
) {
1427 srpc_stat_reqst_t
*req
= &msg
->msg_body
.stat_reqst
;
1429 __swab32s(&req
->str_type
);
1430 __swab64s(&req
->str_rpyid
);
1431 sfw_unpack_sid(req
->str_sid
);
1435 if (msg
->msg_type
== SRPC_MSG_STAT_REPLY
) {
1436 srpc_stat_reply_t
*rep
= &msg
->msg_body
.stat_reply
;
1438 __swab32s(&rep
->str_status
);
1439 sfw_unpack_sid(rep
->str_sid
);
1440 sfw_unpack_fw_counters(rep
->str_fw
);
1441 sfw_unpack_rpc_counters(rep
->str_rpc
);
1442 sfw_unpack_lnet_counters(rep
->str_lnet
);
1446 if (msg
->msg_type
== SRPC_MSG_MKSN_REQST
) {
1447 srpc_mksn_reqst_t
*req
= &msg
->msg_body
.mksn_reqst
;
1449 __swab64s(&req
->mksn_rpyid
);
1450 __swab32s(&req
->mksn_force
);
1451 sfw_unpack_sid(req
->mksn_sid
);
1455 if (msg
->msg_type
== SRPC_MSG_MKSN_REPLY
) {
1456 srpc_mksn_reply_t
*rep
= &msg
->msg_body
.mksn_reply
;
1458 __swab32s(&rep
->mksn_status
);
1459 __swab32s(&rep
->mksn_timeout
);
1460 sfw_unpack_sid(rep
->mksn_sid
);
1464 if (msg
->msg_type
== SRPC_MSG_RMSN_REQST
) {
1465 srpc_rmsn_reqst_t
*req
= &msg
->msg_body
.rmsn_reqst
;
1467 __swab64s(&req
->rmsn_rpyid
);
1468 sfw_unpack_sid(req
->rmsn_sid
);
1472 if (msg
->msg_type
== SRPC_MSG_RMSN_REPLY
) {
1473 srpc_rmsn_reply_t
*rep
= &msg
->msg_body
.rmsn_reply
;
1475 __swab32s(&rep
->rmsn_status
);
1476 sfw_unpack_sid(rep
->rmsn_sid
);
1480 if (msg
->msg_type
== SRPC_MSG_DEBUG_REQST
) {
1481 srpc_debug_reqst_t
*req
= &msg
->msg_body
.dbg_reqst
;
1483 __swab64s(&req
->dbg_rpyid
);
1484 __swab32s(&req
->dbg_flags
);
1485 sfw_unpack_sid(req
->dbg_sid
);
1489 if (msg
->msg_type
== SRPC_MSG_DEBUG_REPLY
) {
1490 srpc_debug_reply_t
*rep
= &msg
->msg_body
.dbg_reply
;
1492 __swab32s(&rep
->dbg_nbatch
);
1493 __swab32s(&rep
->dbg_timeout
);
1494 sfw_unpack_sid(rep
->dbg_sid
);
1498 if (msg
->msg_type
== SRPC_MSG_BATCH_REQST
) {
1499 srpc_batch_reqst_t
*req
= &msg
->msg_body
.bat_reqst
;
1501 __swab32s(&req
->bar_opc
);
1502 __swab64s(&req
->bar_rpyid
);
1503 __swab32s(&req
->bar_testidx
);
1504 __swab32s(&req
->bar_arg
);
1505 sfw_unpack_sid(req
->bar_sid
);
1506 __swab64s(&req
->bar_bid
.bat_id
);
1510 if (msg
->msg_type
== SRPC_MSG_BATCH_REPLY
) {
1511 srpc_batch_reply_t
*rep
= &msg
->msg_body
.bat_reply
;
1513 __swab32s(&rep
->bar_status
);
1514 sfw_unpack_sid(rep
->bar_sid
);
1518 if (msg
->msg_type
== SRPC_MSG_TEST_REQST
) {
1519 srpc_test_reqst_t
*req
= &msg
->msg_body
.tes_reqst
;
1521 __swab64s(&req
->tsr_rpyid
);
1522 __swab64s(&req
->tsr_bulkid
);
1523 __swab32s(&req
->tsr_loop
);
1524 __swab32s(&req
->tsr_ndest
);
1525 __swab32s(&req
->tsr_concur
);
1526 __swab32s(&req
->tsr_service
);
1527 sfw_unpack_sid(req
->tsr_sid
);
1528 __swab64s(&req
->tsr_bid
.bat_id
);
1532 if (msg
->msg_type
== SRPC_MSG_TEST_REPLY
) {
1533 srpc_test_reply_t
*rep
= &msg
->msg_body
.tes_reply
;
1535 __swab32s(&rep
->tsr_status
);
1536 sfw_unpack_sid(rep
->tsr_sid
);
1540 if (msg
->msg_type
== SRPC_MSG_JOIN_REQST
) {
1541 srpc_join_reqst_t
*req
= &msg
->msg_body
.join_reqst
;
1543 __swab64s(&req
->join_rpyid
);
1544 sfw_unpack_sid(req
->join_sid
);
1548 if (msg
->msg_type
== SRPC_MSG_JOIN_REPLY
) {
1549 srpc_join_reply_t
*rep
= &msg
->msg_body
.join_reply
;
1551 __swab32s(&rep
->join_status
);
1552 __swab32s(&rep
->join_timeout
);
1553 sfw_unpack_sid(rep
->join_sid
);
/*
 * sfw_abort_rpc - abort a framework client RPC with -EINTR.
 *
 * @rpc: client RPC to abort; asserted to still hold at least one reference
 *       and to target a framework service
 *       (crpc_service <= SRPC_FRAMEWORK_SERVICE_MAX_ID).
 *
 * srpc_abort_rpc() is invoked with rpc->crpc_lock held.
 */
1561 sfw_abort_rpc(srpc_client_rpc_t
*rpc
)
1563 LASSERT(atomic_read(&rpc
->crpc_refcount
) > 0);
1564 LASSERT(rpc
->crpc_service
<= SRPC_FRAMEWORK_SERVICE_MAX_ID
);
1566 spin_lock(&rpc
->crpc_lock
);
1567 srpc_abort_rpc(rpc
, -EINTR
);
1568 spin_unlock(&rpc
->crpc_lock
);
/*
 * sfw_post_rpc - prepare a framework client RPC for posting.
 *
 * @rpc: client RPC; with crpc_lock held it is asserted to be neither closed
 *       nor aborted and to be off every list, and the framework must not be
 *       shutting down.
 *
 * The RPC timeout is taken from the rpc_timeout module parameter.
 *
 * NOTE(review): the statement between setting crpc_timeout and releasing the
 * lock (presumably the actual post to the srpc layer) is not present in this
 * extract - confirm against the full file.
 */
1572 sfw_post_rpc(srpc_client_rpc_t
*rpc
)
1574 spin_lock(&rpc
->crpc_lock
);
1576 LASSERT(!rpc
->crpc_closed
);
1577 LASSERT(!rpc
->crpc_aborted
);
1578 LASSERT(list_empty(&rpc
->crpc_list
));
1579 LASSERT(!sfw_data
.fw_shuttingdown
);
1581 rpc
->crpc_timeout
= rpc_timeout
;
1584 spin_unlock(&rpc
->crpc_lock
);
/*
 * Table of the framework's own services, one id/name pair per entry:
 * debug, query stats, make session, remove session, batch and test.
 * Start-up code walks this table, fills in sv_handler, sv_wi_total and
 * (for SRPC_SERVICE_TEST only) sv_bulk_ready, then registers each entry
 * with srpc_add_service().
 *
 * NOTE(review): the loops over this table are written without a bound in
 * the for-statement, so a terminating entry is presumably present; it is
 * not visible in this extract - confirm against the full file.
 */
1587 static srpc_service_t sfw_services
[] = {
1589 /* sv_id */ SRPC_SERVICE_DEBUG
,
1590 /* sv_name */ "debug",
1594 /* sv_id */ SRPC_SERVICE_QUERY_STAT
,
1595 /* sv_name */ "query stats",
1599 /* sv_id */ SRPC_SERVICE_MAKE_SESSION
,
1600 /* sv_name */ "make session",
1604 /* sv_id */ SRPC_SERVICE_REMOVE_SESSION
,
1605 /* sv_name */ "remove session",
1609 /* sv_id */ SRPC_SERVICE_BATCH
,
1610 /* sv_name */ "batch service",
1614 /* sv_id */ SRPC_SERVICE_TEST
,
1615 /* sv_name */ "test service",
/*
 * Framework start-up sequence.
 * NOTE(review): the function header is not present in this extract;
 * presumably this is the body of sfw_startup() - TODO confirm.
 *
 * Visible steps:
 *  1. Complain about negative session_timeout / rpc_timeout module
 *     parameters and warn when a timeout is zero (never expires).
 *  2. Zero sfw_data, then initialise its lock, zombie counter and the
 *     fw_tests, fw_zombie_rpcs and fw_zombie_sessions lists.
 *  3. Initialise the brw and ping test client/service pairs and register
 *     them through sfw_register_test().
 *  4. Add the server-side service of every registered test case with
 *     srpc_add_service(); -EBUSY is asserted never to be returned.
 *  5. For every entry of sfw_services: install sfw_handle_server_rpc as
 *     handler, set sv_wi_total to SFW_FRWK_WI_MAX, hook sfw_bulk_ready for
 *     the test service only, add the service and reserve sv_wi_total
 *     buffers via srpc_service_add_buffers().
 *
 * NOTE(review): the error-handling branches (returns, conditions around the
 * CWARN calls, loop terminators) are not present in this extract.
 */
1632 sfw_test_case_t
*tsc
;
1634 if (session_timeout
< 0) {
1635 CERROR("Session timeout must be non-negative: %d\n",
1640 if (rpc_timeout
< 0) {
1641 CERROR("RPC timeout must be non-negative: %d\n",
1646 if (!session_timeout
)
1647 CWARN("Zero session_timeout specified - test sessions never expire.\n");
1650 CWARN("Zero rpc_timeout specified - test RPC never expire.\n");
1652 memset(&sfw_data
, 0, sizeof(struct smoketest_framework
));
1654 sfw_data
.fw_session
= NULL
;
1655 sfw_data
.fw_active_srpc
= NULL
;
1656 spin_lock_init(&sfw_data
.fw_lock
);
1657 atomic_set(&sfw_data
.fw_nzombies
, 0);
1658 INIT_LIST_HEAD(&sfw_data
.fw_tests
);
1659 INIT_LIST_HEAD(&sfw_data
.fw_zombie_rpcs
);
1660 INIT_LIST_HEAD(&sfw_data
.fw_zombie_sessions
);
/* built-in test cases: bulk read/write (brw) and ping */
1662 brw_init_test_client();
1663 brw_init_test_service();
1664 rc
= sfw_register_test(&brw_test_service
, &brw_test_client
);
1667 ping_init_test_client();
1668 ping_init_test_service();
1669 rc
= sfw_register_test(&ping_test_service
, &ping_test_client
);
/* add the server side of every registered test case */
1673 list_for_each_entry(tsc
, &sfw_data
.fw_tests
, tsc_list
) {
1674 sv
= tsc
->tsc_srv_service
;
1676 rc
= srpc_add_service(sv
);
1677 LASSERT(rc
!= -EBUSY
);
1679 CWARN("Failed to add %s service: %d\n",
/* add the framework's own services from the sfw_services table */
1685 for (i
= 0; ; i
++) {
1686 sv
= &sfw_services
[i
];
1690 sv
->sv_bulk_ready
= NULL
;
1691 sv
->sv_handler
= sfw_handle_server_rpc
;
1692 sv
->sv_wi_total
= SFW_FRWK_WI_MAX
;
1693 if (sv
->sv_id
== SRPC_SERVICE_TEST
)
1694 sv
->sv_bulk_ready
= sfw_bulk_ready
;
1696 rc
= srpc_add_service(sv
);
1697 LASSERT(rc
!= -EBUSY
);
1699 CWARN("Failed to add %s service: %d\n",
1704 /* about to sfw_shutdown, no need to add buffer */
1708 rc
= srpc_service_add_buffers(sv
, sv
->sv_wi_total
);
1710 CWARN("Failed to reserve enough buffers: service %s, %d needed: %d\n",
1711 sv
->sv_name
, sv
->sv_wi_total
, rc
);
1725 sfw_test_case_t
*tsc
;
1728 spin_lock(&sfw_data
.fw_lock
);
1730 sfw_data
.fw_shuttingdown
= 1;
1731 lst_wait_until(!sfw_data
.fw_active_srpc
, sfw_data
.fw_lock
,
1732 "waiting for active RPC to finish.\n");
1734 if (sfw_del_session_timer())
1735 lst_wait_until(!sfw_data
.fw_session
, sfw_data
.fw_lock
,
1736 "waiting for session timer to explode.\n");
1738 sfw_deactivate_session();
1739 lst_wait_until(!atomic_read(&sfw_data
.fw_nzombies
),
1741 "waiting for %d zombie sessions to die.\n",
1742 atomic_read(&sfw_data
.fw_nzombies
));
1744 spin_unlock(&sfw_data
.fw_lock
);
1746 for (i
= 0; ; i
++) {
1747 sv
= &sfw_services
[i
];
1751 srpc_shutdown_service(sv
);
1752 srpc_remove_service(sv
);
1755 list_for_each_entry(tsc
, &sfw_data
.fw_tests
, tsc_list
) {
1756 sv
= tsc
->tsc_srv_service
;
1757 srpc_shutdown_service(sv
);
1758 srpc_remove_service(sv
);
1761 while (!list_empty(&sfw_data
.fw_zombie_rpcs
)) {
1762 srpc_client_rpc_t
*rpc
;
1764 rpc
= list_entry(sfw_data
.fw_zombie_rpcs
.next
,
1765 srpc_client_rpc_t
, crpc_list
);
1766 list_del(&rpc
->crpc_list
);
1768 LIBCFS_FREE(rpc
, srpc_client_rpc_size(rpc
));
1771 for (i
= 0; ; i
++) {
1772 sv
= &sfw_services
[i
];
1776 srpc_wait_service_shutdown(sv
);
1779 while (!list_empty(&sfw_data
.fw_tests
)) {
1780 tsc
= list_entry(sfw_data
.fw_tests
.next
,
1781 sfw_test_case_t
, tsc_list
);
1783 srpc_wait_service_shutdown(tsc
->tsc_srv_service
);
1785 list_del(&tsc
->tsc_list
);
1786 LIBCFS_FREE(tsc
, sizeof(*tsc
));