/*
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
/*
 * This file contains the guts of the RPC RDMA protocol, and
 * does marshaling/unmarshaling, etc. It is also where interfacing
 * to the Linux RPC framework lives.
 */

#include "xprt_rdma.h"

#include <linux/highmem.h>

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif
enum rpcrdma_chunktype {
	rpcrdma_noch = 0,
	rpcrdma_readch,
	rpcrdma_areadch,
	rpcrdma_writech,
	rpcrdma_replych
};

static const char transfertypes[][12] = {
	"inline",	/* no chunks */
	"read list",	/* some argument via rdma read */
	"*read list",	/* entire request via rdma read */
	"write list",	/* some result via rdma write */
	"reply chunk"	/* entire reply via rdma write */
};
/* Returns size of largest RPC-over-RDMA header in a Call message
 *
 * The largest Call header contains a full-size Read list and a
 * minimal Reply chunk.
 */
static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
{
	unsigned int size;

	/* Fixed header fields and list discriminators */
	size = RPCRDMA_HDRLEN_MIN;

	/* Maximum Read list size */
	maxsegs += 2;	/* segment for head and tail buffers */
	size = maxsegs * sizeof(struct rpcrdma_read_chunk);

	/* Minimal Read chunk size */
	size += sizeof(__be32);	/* segment count */
	size += sizeof(struct rpcrdma_segment);
	size += sizeof(__be32);	/* list discriminator */

	dprintk("RPC: %s: max call header size = %u\n",
		__func__, size);
	return size;
}
/* Returns size of largest RPC-over-RDMA header in a Reply message
 *
 * There is only one Write list or one Reply chunk per Reply
 * message. The larger list is the Write list.
 */
static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
{
	unsigned int size;

	/* Fixed header fields and list discriminators */
	size = RPCRDMA_HDRLEN_MIN;

	/* Maximum Write list size */
	maxsegs += 2;	/* segment for head and tail buffers */
	size = sizeof(__be32);	/* segment count */
	size += maxsegs * sizeof(struct rpcrdma_segment);
	size += sizeof(__be32);	/* list discriminator */

	dprintk("RPC: %s: max reply header size = %u\n",
		__func__, size);
	return size;
}
void rpcrdma_set_max_header_sizes(struct rpcrdma_ia *ia,
				  struct rpcrdma_create_data_internal *cdata,
				  unsigned int maxsegs)
{
	ia->ri_max_inline_write = cdata->inline_wsize -
				  rpcrdma_max_call_header_size(maxsegs);
	ia->ri_max_inline_read = cdata->inline_rsize -
				 rpcrdma_max_reply_header_size(maxsegs);
}
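/* In other words (with hypothetical numbers for illustration): if the
 * negotiated inline send size is 1024 bytes and the worst-case Call
 * header for this mount works out to 264 bytes, an RPC whose marshaled
 * arguments exceed 760 bytes can no longer be sent inline and must use
 * a Read chunk instead.
 */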
/* The client can send a request inline as long as the RPCRDMA header
 * plus the RPC call fit under the transport's inline limit. If the
 * combined call message size exceeds that limit, the client must use
 * the read chunk list for this operation.
 */
static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
				struct rpc_rqst *rqst)
{
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;

	return rqst->rq_snd_buf.len <= ia->ri_max_inline_write;
}
/* The client can't know how large the actual reply will be. Thus it
 * plans for the largest possible reply for that particular ULP
 * operation. If the maximum combined reply message size exceeds that
 * limit, the client must provide a write list or a reply chunk for
 * this request.
 */
static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
				   struct rpc_rqst *rqst)
{
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;

	return rqst->rq_rcv_buf.buflen <= ia->ri_max_inline_read;
}
static unsigned long
rpcrdma_tail_pullup(struct xdr_buf *buf)
{
	size_t tlen = buf->tail[0].iov_len;
	size_t skip = tlen & 3;

	/* Do not include the tail if it is only an XDR pad */
	if (tlen < 4)
		return 0;

	/* xdr_write_pages() adds a pad at the beginning of the tail
	 * if the content in "buf->pages" is unaligned. Force the
	 * tail's actual content to land at the next XDR position
	 * after the head instead.
	 */
	if (skip) {
		unsigned char *src, *dst;
		unsigned int count;

		src = buf->tail[0].iov_base;
		dst = buf->head[0].iov_base;
		dst += buf->head[0].iov_len;

		src += skip;
		tlen -= skip;

		dprintk("RPC: %s: skip=%zu, memmove(%p, %p, %zu)\n",
			__func__, skip, dst, src, tlen);

		for (count = tlen; count; count--)
			*dst++ = *src++;
	}

	return tlen;
}
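/* For example (illustrative sizes): a 7-byte tail whose first 3 bytes
 * are the XDR pad for an unaligned page list is shifted so that only
 * the 4 bytes of real content land directly after the head iovec, and
 * 4 is returned to the caller as the number of tail bytes to send.
 */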
/* Split "vec" on page boundaries into segments. FMR registers pages,
 * not a byte range. Other modes coalesce these segments into a single
 * MR when they can.
 */
static int
rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg,
		     int n, int nsegs)
{
	size_t page_offset;
	u32 remaining;
	char *base;

	base = vec->iov_base;
	page_offset = offset_in_page(base);
	remaining = vec->iov_len;
	while (remaining && n < nsegs) {
		seg[n].mr_page = NULL;
		seg[n].mr_offset = base;
		seg[n].mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining);
		remaining -= seg[n].mr_len;
		base += seg[n].mr_len;
		++n;
		page_offset = 0;
	}
	return n;
}
/*
 * Chunk assembly from upper layer xdr_buf.
 *
 * Prepare the passed-in xdr_buf into representation as RPC/RDMA chunk
 * elements. Segments are then coalesced when registered, if possible
 * within the selected memreg mode.
 *
 * Returns positive number of segments converted, or a negative errno.
 */

static int
rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
	enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg, int nsegs)
{
	int len, n = 0, p, page_base;
	struct page **ppages;

	if (pos == 0) {
		n = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, n, nsegs);
		if (n == nsegs)
			return -EIO;
	}

	len = xdrbuf->page_len;
	ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
	page_base = xdrbuf->page_base & ~PAGE_MASK;
	p = 0;
	while (len && n < nsegs) {
		if (!ppages[p]) {
			/* alloc the pagelist for receiving buffer */
			ppages[p] = alloc_page(GFP_ATOMIC);
			if (!ppages[p])
				return -ENOMEM;
		}
		seg[n].mr_page = ppages[p];
		seg[n].mr_offset = (void *)(unsigned long) page_base;
		seg[n].mr_len = min_t(u32, PAGE_SIZE - page_base, len);
		if (seg[n].mr_len > PAGE_SIZE)
			return -EIO;
		len -= seg[n].mr_len;
		++n;
		++p;
		page_base = 0;	/* page offset only applies to first page */
	}

	/* Message overflows the seg array */
	if (len && n == nsegs)
		return -EIO;

	/* When encoding the read list, the tail is always sent inline */
	if (type == rpcrdma_readch)
		return n;

	if (xdrbuf->tail[0].iov_len) {
		/* the rpcrdma protocol allows us to omit any trailing
		 * xdr pad bytes, saving the server an RDMA operation. */
		if (xdrbuf->tail[0].iov_len < 4 && xprt_rdma_pad_optimize)
			return n;
		n = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, n, nsegs);
		if (n == nsegs)
			return -EIO;
	}

	return n;
}
/*
 * Create read/write chunk lists, and reply chunks, for RDMA
 *
 * Assume check against THRESHOLD has been done, and chunks are required.
 * Assume only encoding one list entry for read|write chunks. The NFSv3
 * protocol is simple enough to allow this as it only has a single "bulk
 * result" in each procedure - complicated NFSv4 COMPOUNDs are not. (The
 * RDMA/Sessions NFSv4 proposal addresses this for future v4 revs.)
 *
 * When used for a single reply chunk (which is a special write
 * chunk used for the entire reply, rather than just the data), it
 * is used primarily for READDIR and READLINK which would otherwise
 * be severely size-limited by a small rdma inline read max. The server
 * response will come back as an RDMA Write, followed by a message
 * of type RDMA_NOMSG carrying the xid and length. As a result, reply
 * chunks do not provide data alignment, however they do not require
 * "fixup" (moving the response to the upper layer buffer) either.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Read chunklist (a linked list):
 *   N elements, position P (same P for all chunks of same arg!):
 *    1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
 *
 *  Write chunklist (a list of (one) counted array):
 *   N elements:
 *    1 - N - HLOO - HLOO - ... - HLOO - 0
 *
 *  Reply chunk (a counted array):
 *   N elements:
 *    1 - N - HLOO - HLOO - ... - HLOO
 *
 * Returns positive RPC/RDMA header size, or negative errno.
 */
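/* For illustration only: a Read list carrying two segments at XDR
 * position 20 would appear on the wire as
 *
 *    1, 20, H1, L1, O1,  1, 20, H2, L2, O2,  0
 *
 * while a Write list holding one chunk of two segments would be
 *
 *    1, 2, H1, L1, O1, H2, L2, O2,  0
 *
 * where each H and L is one 32-bit XDR word and each O is a 64-bit
 * XDR hyper, matching the HLOO key above.
 */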
static ssize_t
rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
		struct rpcrdma_msg *headerp, enum rpcrdma_chunktype type)
{
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
	int n, nsegs, nchunks = 0;
	unsigned int pos;
	struct rpcrdma_mr_seg *seg = req->rl_segments;
	struct rpcrdma_read_chunk *cur_rchunk = NULL;
	struct rpcrdma_write_array *warray = NULL;
	struct rpcrdma_write_chunk *cur_wchunk = NULL;
	__be32 *iptr = headerp->rm_body.rm_chunks;
	int (*map)(struct rpcrdma_xprt *, struct rpcrdma_mr_seg *, int, bool);

	if (type == rpcrdma_readch || type == rpcrdma_areadch) {
		/* a read chunk - server will RDMA Read our memory */
		cur_rchunk = (struct rpcrdma_read_chunk *) iptr;
	} else {
		/* a write or reply chunk - server will RDMA Write our memory */
		*iptr++ = xdr_zero;	/* encode a NULL read chunk list */
		if (type == rpcrdma_replych)
			*iptr++ = xdr_zero;	/* a NULL write chunk list */
		warray = (struct rpcrdma_write_array *) iptr;
		cur_wchunk = (struct rpcrdma_write_chunk *) (warray + 1);
	}

	if (type == rpcrdma_replych || type == rpcrdma_areadch)
		pos = 0;
	else
		pos = target->head[0].iov_len;

	nsegs = rpcrdma_convert_iovs(target, pos, type, seg, RPCRDMA_MAX_SEGS);
	if (nsegs < 0)
		return nsegs;

	map = r_xprt->rx_ia.ri_ops->ro_map;
	do {
		n = map(r_xprt, seg, nsegs, cur_wchunk != NULL);
		if (n <= 0)
			goto out;
		if (cur_rchunk) {	/* read */
			cur_rchunk->rc_discrim = xdr_one;
			/* all read chunks have the same "position" */
			cur_rchunk->rc_position = cpu_to_be32(pos);
			cur_rchunk->rc_target.rs_handle =
						cpu_to_be32(seg->mr_rkey);
			cur_rchunk->rc_target.rs_length =
						cpu_to_be32(seg->mr_len);
			xdr_encode_hyper(
					(__be32 *)&cur_rchunk->rc_target.rs_offset,
					seg->mr_base);
			dprintk("RPC: %s: read chunk "
				"elem %d@0x%llx:0x%x pos %u (%s)\n", __func__,
				seg->mr_len, (unsigned long long)seg->mr_base,
				seg->mr_rkey, pos, n < nsegs ? "more" : "last");
			cur_rchunk++;
			r_xprt->rx_stats.read_chunk_count++;
		} else {		/* write/reply */
			cur_wchunk->wc_target.rs_handle =
						cpu_to_be32(seg->mr_rkey);
			cur_wchunk->wc_target.rs_length =
						cpu_to_be32(seg->mr_len);
			xdr_encode_hyper(
					(__be32 *)&cur_wchunk->wc_target.rs_offset,
					seg->mr_base);
			dprintk("RPC: %s: %s chunk "
				"elem %d@0x%llx:0x%x (%s)\n", __func__,
				(type == rpcrdma_replych) ? "reply" : "write",
				seg->mr_len, (unsigned long long)seg->mr_base,
				seg->mr_rkey, n < nsegs ? "more" : "last");
			cur_wchunk++;
			if (type == rpcrdma_replych)
				r_xprt->rx_stats.reply_chunk_count++;
			else
				r_xprt->rx_stats.write_chunk_count++;
			r_xprt->rx_stats.total_rdma_request += seg->mr_len;
		}
		nchunks++;
		seg += n;
		nsegs -= n;
	} while (nsegs);

	/* success. all failures return above */
	req->rl_nchunks = nchunks;

	/*
	 * finish off header. If write, marshal discrim and nchunks.
	 */
	if (cur_rchunk) {
		iptr = (__be32 *) cur_rchunk;
		*iptr++ = xdr_zero;	/* finish the read chunk list */
		*iptr++ = xdr_zero;	/* encode a NULL write chunk list */
		*iptr++ = xdr_zero;	/* encode a NULL reply chunk */
	} else {
		warray->wc_discrim = xdr_one;
		warray->wc_nchunks = cpu_to_be32(nchunks);
		iptr = (__be32 *) cur_wchunk;
		if (type == rpcrdma_writech) {
			*iptr++ = xdr_zero; /* finish the write chunk list */
			*iptr++ = xdr_zero; /* encode a NULL reply chunk */
		}
	}

	/*
	 * Return header size.
	 */
	return (unsigned char *)iptr - (unsigned char *)headerp;

out:
	for (pos = 0; nchunks--;)
		pos += r_xprt->rx_ia.ri_ops->ro_unmap(r_xprt,
						      &req->rl_segments[pos]);
	return n;
}
static inline __be32 *
xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mr_seg *seg)
{
	*iptr++ = cpu_to_be32(seg->mr_rkey);
	*iptr++ = cpu_to_be32(seg->mr_len);
	return xdr_encode_hyper(iptr, seg->mr_base);
}
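/* Wire layout produced by the helper above, shown for reference:
 *
 *	iptr[0]: rkey (the segment's Handle, 32 bits)
 *	iptr[1]: Length, 32 bits
 *	iptr[2]-iptr[3]: Offset, a 64-bit XDR hyper
 *
 * which is exactly one "HLOO" element from the encoding key comments.
 */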
/* XDR-encode the Read list. Supports encoding a list of read
 * segments that belong to a single read chunk.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Read chunklist (a linked list):
 *   N elements, position P (same P for all chunks of same arg!):
 *    1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
 *
 * Returns a pointer to the XDR word in the RDMA header following
 * the end of the Read list, or an error pointer.
 */
static __be32 *
rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
			 struct rpcrdma_req *req, struct rpc_rqst *rqst,
			 __be32 *iptr, enum rpcrdma_chunktype rtype)
{
	struct rpcrdma_mr_seg *seg = req->rl_nextseg;
	unsigned int pos;
	int n, nsegs;

	if (rtype == rpcrdma_noch) {
		*iptr++ = xdr_zero;	/* item not present */
		return iptr;
	}

	pos = rqst->rq_snd_buf.head[0].iov_len;
	if (rtype == rpcrdma_areadch)
		pos = 0;
	nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg,
				     RPCRDMA_MAX_SEGS - req->rl_nchunks);
	if (nsegs < 0)
		return ERR_PTR(nsegs);

	do {
		n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, false);
		if (n <= 0)
			return ERR_PTR(n);

		*iptr++ = xdr_one;	/* item present */

		/* All read segments in this chunk
		 * have the same "position".
		 */
		*iptr++ = cpu_to_be32(pos);
		iptr = xdr_encode_rdma_segment(iptr, seg);

		dprintk("RPC: %5u %s: read segment pos %u "
			"%d@0x%016llx:0x%08x (%s)\n",
			rqst->rq_task->tk_pid, __func__, pos,
			seg->mr_len, (unsigned long long)seg->mr_base,
			seg->mr_rkey, n < nsegs ? "more" : "last");

		r_xprt->rx_stats.read_chunk_count++;
		req->rl_nchunks++;
		seg += n;
		nsegs -= n;
	} while (nsegs);
	req->rl_nextseg = seg;

	/* Finish Read list */
	*iptr++ = xdr_zero;	/* Next item not present */
	return iptr;
}
/* XDR-encode the Write list. Supports encoding a list containing
 * one array of plain segments that belong to a single write chunk.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Write chunklist (a list of (one) counted array):
 *   N elements:
 *    1 - N - HLOO - HLOO - ... - HLOO - 0
 *
 * Returns a pointer to the XDR word in the RDMA header following
 * the end of the Write list, or an error pointer.
 */
static __be32 *
rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
			  struct rpc_rqst *rqst, __be32 *iptr,
			  enum rpcrdma_chunktype wtype)
{
	struct rpcrdma_mr_seg *seg = req->rl_nextseg;
	int n, nsegs, nchunks;
	__be32 *segcount;

	if (wtype != rpcrdma_writech) {
		*iptr++ = xdr_zero;	/* no Write list present */
		return iptr;
	}

	nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf,
				     rqst->rq_rcv_buf.head[0].iov_len,
				     wtype, seg,
				     RPCRDMA_MAX_SEGS - req->rl_nchunks);
	if (nsegs < 0)
		return ERR_PTR(nsegs);

	*iptr++ = xdr_one;	/* Write list present */
	segcount = iptr++;	/* save location of segment count */

	nchunks = 0;
	do {
		n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, true);
		if (n <= 0)
			return ERR_PTR(n);

		iptr = xdr_encode_rdma_segment(iptr, seg);

		dprintk("RPC: %5u %s: write segment "
			"%d@0x%016llx:0x%08x (%s)\n",
			rqst->rq_task->tk_pid, __func__,
			seg->mr_len, (unsigned long long)seg->mr_base,
			seg->mr_rkey, n < nsegs ? "more" : "last");

		r_xprt->rx_stats.write_chunk_count++;
		r_xprt->rx_stats.total_rdma_request += seg->mr_len;
		req->rl_nchunks++;
		nchunks++;
		seg += n;
		nsegs -= n;
	} while (nsegs);
	req->rl_nextseg = seg;

	/* Update count of segments in this Write chunk */
	*segcount = cpu_to_be32(nchunks);

	/* Finish Write list */
	*iptr++ = xdr_zero;	/* Next item not present */
	return iptr;
}
/* XDR-encode the Reply chunk. Supports encoding an array of plain
 * segments that belong to a single write (reply) chunk.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Reply chunk (a counted array):
 *   N elements:
 *    1 - N - HLOO - HLOO - ... - HLOO
 *
 * Returns a pointer to the XDR word in the RDMA header following
 * the end of the Reply chunk, or an error pointer.
 */
static __be32 *
rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
			   struct rpcrdma_req *req, struct rpc_rqst *rqst,
			   __be32 *iptr, enum rpcrdma_chunktype wtype)
{
	struct rpcrdma_mr_seg *seg = req->rl_nextseg;
	int n, nsegs, nchunks;
	__be32 *segcount;

	if (wtype != rpcrdma_replych) {
		*iptr++ = xdr_zero;	/* no Reply chunk present */
		return iptr;
	}

	nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg,
				     RPCRDMA_MAX_SEGS - req->rl_nchunks);
	if (nsegs < 0)
		return ERR_PTR(nsegs);

	*iptr++ = xdr_one;	/* Reply chunk present */
	segcount = iptr++;	/* save location of segment count */

	nchunks = 0;
	do {
		n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, true);
		if (n <= 0)
			return ERR_PTR(n);

		iptr = xdr_encode_rdma_segment(iptr, seg);

		dprintk("RPC: %5u %s: reply segment "
			"%d@0x%016llx:0x%08x (%s)\n",
			rqst->rq_task->tk_pid, __func__,
			seg->mr_len, (unsigned long long)seg->mr_base,
			seg->mr_rkey, n < nsegs ? "more" : "last");

		r_xprt->rx_stats.reply_chunk_count++;
		r_xprt->rx_stats.total_rdma_request += seg->mr_len;
		req->rl_nchunks++;
		nchunks++;
		seg += n;
		nsegs -= n;
	} while (nsegs);
	req->rl_nextseg = seg;

	/* Update count of segments in the Reply chunk */
	*segcount = cpu_to_be32(nchunks);

	return iptr;
}
/*
 * Copy write data inline.
 * This function is used for "small" requests. Data which is passed
 * to RPC via iovecs (or page list) is copied directly into the
 * pre-registered memory buffer for this request. For small amounts
 * of data, this is efficient. The cutoff value is tunable.
 */
static void rpcrdma_inline_pullup(struct rpc_rqst *rqst)
{
	int i, npages, curlen;
	int copy_len;
	unsigned char *srcp, *destp;
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
	int page_base;
	struct page **ppages;

	destp = rqst->rq_svec[0].iov_base;
	curlen = rqst->rq_svec[0].iov_len;
	destp += curlen;

	dprintk("RPC: %s: destp 0x%p len %d hdrlen %d\n",
		__func__, destp, rqst->rq_slen, curlen);

	copy_len = rqst->rq_snd_buf.page_len;

	if (rqst->rq_snd_buf.tail[0].iov_len) {
		curlen = rqst->rq_snd_buf.tail[0].iov_len;
		if (destp + copy_len != rqst->rq_snd_buf.tail[0].iov_base) {
			memmove(destp + copy_len,
				rqst->rq_snd_buf.tail[0].iov_base, curlen);
			r_xprt->rx_stats.pullup_copy_count += curlen;
		}
		dprintk("RPC: %s: tail destp 0x%p len %d\n",
			__func__, destp + copy_len, curlen);
		rqst->rq_svec[0].iov_len += curlen;
	}
	r_xprt->rx_stats.pullup_copy_count += copy_len;

	page_base = rqst->rq_snd_buf.page_base;
	ppages = rqst->rq_snd_buf.pages + (page_base >> PAGE_SHIFT);
	page_base &= ~PAGE_MASK;
	npages = PAGE_ALIGN(page_base + copy_len) >> PAGE_SHIFT;
	for (i = 0; copy_len && i < npages; i++) {
		curlen = PAGE_SIZE - page_base;
		if (curlen > copy_len)
			curlen = copy_len;
		dprintk("RPC: %s: page %d destp 0x%p len %d curlen %d\n",
			__func__, i, destp, copy_len, curlen);
		srcp = kmap_atomic(ppages[i]);
		memcpy(destp, srcp + page_base, curlen);
		kunmap_atomic(srcp);
		rqst->rq_svec[0].iov_len += curlen;
		destp += curlen;
		copy_len -= curlen;
		page_base = 0;
	}
	/* header now contains entire send message */
}
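/* For example, a small write whose xdr_buf arrives here as
 *
 *	head[0] (RPC header) + pages[] (payload) + tail[0] (trailing bytes)
 *
 * leaves this function as one contiguous buffer described by
 * rq_svec[0], so the whole Call can be sent inline with no Read
 * chunk at all.
 */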
/*
 * Marshal a request: the primary job of this routine is to choose
 * the transfer modes. See comments below.
 *
 * Prepares up to two IOVs per Call message:
 *
 *  [0] -- RPC RDMA header
 *  [1] -- the RPC header/data
 *
 * Returns zero on success, otherwise a negative errno.
 */

int
rpcrdma_marshal_req(struct rpc_rqst *rqst)
{
	struct rpc_xprt *xprt = rqst->rq_xprt;
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	enum rpcrdma_chunktype rtype, wtype;
	struct rpcrdma_msg *headerp;
	unsigned int pos;
	ssize_t hdrlen;
	size_t rpclen;
	__be32 *iptr;

#if defined(CONFIG_SUNRPC_BACKCHANNEL)
	if (test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state))
		return rpcrdma_bc_marshal_reply(rqst);
#endif

	headerp = rdmab_to_msg(req->rl_rdmabuf);
	/* don't byte-swap XID, it's already done in request */
	headerp->rm_xid = rqst->rq_xid;
	headerp->rm_vers = rpcrdma_version;
	headerp->rm_credit = cpu_to_be32(r_xprt->rx_buf.rb_max_requests);
	headerp->rm_type = rdma_msg;

	/*
	 * Chunks needed for results?
	 *
	 * o If the expected result is under the inline threshold, all ops
	 *   return as inline.
	 * o Large read ops return data as write chunk(s), header as
	 *   inline.
	 * o Large non-read ops return as a single reply chunk.
	 */
	if (rpcrdma_results_inline(r_xprt, rqst))
		wtype = rpcrdma_noch;
	else if (rqst->rq_rcv_buf.flags & XDRBUF_READ)
		wtype = rpcrdma_writech;
	else
		wtype = rpcrdma_replych;

	/*
	 * Chunks needed for arguments?
	 *
	 * o If the total request is under the inline threshold, all ops
	 *   are sent as inline.
	 * o Large write ops transmit data as read chunk(s), header as
	 *   inline.
	 * o Large non-write ops are sent with the entire message as a
	 *   single read chunk (protocol 0-position special case).
	 *
	 * This assumes that the upper layer does not present a request
	 * that both has a data payload, and whose non-data arguments
	 * by themselves are larger than the inline threshold.
	 */
	if (rpcrdma_args_inline(r_xprt, rqst)) {
		rtype = rpcrdma_noch;
		rpcrdma_inline_pullup(rqst);
		rpclen = rqst->rq_svec[0].iov_len;
	} else if (rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
		rtype = rpcrdma_readch;
		rpclen = rqst->rq_svec[0].iov_len;
		rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf);
	} else {
		r_xprt->rx_stats.nomsg_call_count++;
		headerp->rm_type = htonl(RDMA_NOMSG);
		rtype = rpcrdma_areadch;
		rpclen = 0;
	}

	/* This implementation supports the following combinations
	 * of chunk lists in one RPC-over-RDMA Call message:
	 *
	 *   - Read list
	 *   - Write list
	 *   - Reply chunk
	 *   - Read list + Reply chunk
	 *
	 * It might not yet support the following combinations:
	 *
	 *   - Read list + Write list
	 *
	 * It does not support the following combinations:
	 *
	 *   - Write list + Reply chunk
	 *   - Read list + Write list + Reply chunk
	 *
	 * This implementation supports only a single chunk in each
	 * Read or Write list. Thus for example the client cannot
	 * send a Call message with a Position Zero Read chunk and a
	 * regular Read chunk at the same time.
	 */
	req->rl_nchunks = 0;
	req->rl_nextseg = req->rl_segments;
	iptr = headerp->rm_body.rm_chunks;
	iptr = rpcrdma_encode_read_list(r_xprt, req, rqst, iptr, rtype);
	if (IS_ERR(iptr))
		goto out_unmap;
	iptr = rpcrdma_encode_write_list(r_xprt, req, rqst, iptr, wtype);
	if (IS_ERR(iptr))
		goto out_unmap;
	iptr = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, iptr, wtype);
	if (IS_ERR(iptr))
		goto out_unmap;
	hdrlen = (unsigned char *)iptr - (unsigned char *)headerp;

	if (hdrlen + rpclen > RPCRDMA_INLINE_WRITE_THRESHOLD(rqst))
		goto out_overflow;

	dprintk("RPC: %5u %s: %s/%s: hdrlen %zd rpclen %zd\n",
		rqst->rq_task->tk_pid, __func__,
		transfertypes[rtype], transfertypes[wtype],
		hdrlen, rpclen);

	req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
	req->rl_send_iov[0].length = hdrlen;
	req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);

	req->rl_niovs = 1;
	if (rtype == rpcrdma_areadch)
		return 0;

	req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
	req->rl_send_iov[1].length = rpclen;
	req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);

	req->rl_niovs = 2;
	return 0;

out_overflow:
	pr_err("rpcrdma: send overflow: hdrlen %zd rpclen %zu %s/%s\n",
	       hdrlen, rpclen, transfertypes[rtype], transfertypes[wtype]);
	/* Terminate this RPC. Chunks registered above will be
	 * released by xprt_release -> xprt_rdma_free.
	 */
	return -EIO;

out_unmap:
	for (pos = 0; req->rl_nchunks--;)
		pos += r_xprt->rx_ia.ri_ops->ro_unmap(r_xprt,
						      &req->rl_segments[pos]);
	return PTR_ERR(iptr);
}
/*
 * Chase down a received write or reply chunklist to get length
 * RDMA'd by server. See map at rpcrdma_create_chunks()! :-)
 */
static int
rpcrdma_count_chunks(struct rpcrdma_rep *rep, unsigned int max, int wrchunk, __be32 **iptrp)
{
	unsigned int i, total_len;
	struct rpcrdma_write_chunk *cur_wchunk;
	char *base = (char *)rdmab_to_msg(rep->rr_rdmabuf);

	i = be32_to_cpu(**iptrp);
	if (i > max)
		return -1;
	cur_wchunk = (struct rpcrdma_write_chunk *) (*iptrp + 1);
	total_len = 0;
	while (i--) {
		struct rpcrdma_segment *seg = &cur_wchunk->wc_target;
		ifdebug(FACILITY) {
			u64 off;
			xdr_decode_hyper((__be32 *)&seg->rs_offset, &off);
			dprintk("RPC: %s: chunk %d@0x%llx:0x%x\n",
				__func__,
				be32_to_cpu(seg->rs_length),
				(unsigned long long)off,
				be32_to_cpu(seg->rs_handle));
		}
		total_len += be32_to_cpu(seg->rs_length);
		++cur_wchunk;
	}
	/* check and adjust for properly terminated write chunk */
	if (wrchunk) {
		__be32 *w = (__be32 *) cur_wchunk;
		if (*w++ != xdr_zero)
			return -1;
		cur_wchunk = (struct rpcrdma_write_chunk *) w;
	}
	if ((char *)cur_wchunk > base + rep->rr_len)
		return -1;

	*iptrp = (__be32 *) cur_wchunk;
	return total_len;
}
/*
 * Scatter inline received data back into provided iov's.
 */
static void
rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
{
	int i, npages, curlen, olen;
	char *destp;
	struct page **ppages;
	int page_base;

	curlen = rqst->rq_rcv_buf.head[0].iov_len;
	if (curlen > copy_len) {	/* write chunk header fixup */
		curlen = copy_len;
		rqst->rq_rcv_buf.head[0].iov_len = curlen;
	}

	dprintk("RPC: %s: srcp 0x%p len %d hdrlen %d\n",
		__func__, srcp, copy_len, curlen);

	/* Shift pointer for first receive segment only */
	rqst->rq_rcv_buf.head[0].iov_base = srcp;

	srcp += curlen;
	copy_len -= curlen;

	olen = copy_len;
	i = 0;
	rpcx_to_rdmax(rqst->rq_xprt)->rx_stats.fixup_copy_count += olen;
	page_base = rqst->rq_rcv_buf.page_base;
	ppages = rqst->rq_rcv_buf.pages + (page_base >> PAGE_SHIFT);
	page_base &= ~PAGE_MASK;

	if (copy_len && rqst->rq_rcv_buf.page_len) {
		npages = PAGE_ALIGN(page_base +
				rqst->rq_rcv_buf.page_len) >> PAGE_SHIFT;
		for (; i < npages; i++) {
			curlen = PAGE_SIZE - page_base;
			if (curlen > copy_len)
				curlen = copy_len;
			dprintk("RPC: %s: page %d"
				" srcp 0x%p len %d curlen %d\n",
				__func__, i, srcp, copy_len, curlen);
			destp = kmap_atomic(ppages[i]);
			memcpy(destp + page_base, srcp, curlen);
			flush_dcache_page(ppages[i]);
			kunmap_atomic(destp);
			srcp += curlen;
			copy_len -= curlen;
			if (copy_len == 0)
				break;
			page_base = 0;
		}
	}

	if (copy_len && rqst->rq_rcv_buf.tail[0].iov_len) {
		curlen = copy_len;
		if (curlen > rqst->rq_rcv_buf.tail[0].iov_len)
			curlen = rqst->rq_rcv_buf.tail[0].iov_len;
		if (rqst->rq_rcv_buf.tail[0].iov_base != srcp)
			memmove(rqst->rq_rcv_buf.tail[0].iov_base, srcp, curlen);
		dprintk("RPC: %s: tail srcp 0x%p len %d curlen %d\n",
			__func__, srcp, copy_len, curlen);
		rqst->rq_rcv_buf.tail[0].iov_len = curlen;
		copy_len -= curlen; ++i;
	} else
		rqst->rq_rcv_buf.tail[0].iov_len = 0;

	if (pad) {
		/* implicit padding on terminal chunk */
		unsigned char *p = rqst->rq_rcv_buf.tail[0].iov_base;
		while (pad--)
			p[rqst->rq_rcv_buf.tail[0].iov_len++] = 0;
	}

	if (copy_len)
		dprintk("RPC: %s: %d bytes in"
			" %d extra segments (%d lost)\n",
			__func__, olen, i, copy_len);

	/* TBD avoid a warning from call_decode() */
	rqst->rq_private_buf = rqst->rq_rcv_buf;
}
void
rpcrdma_connect_worker(struct work_struct *work)
{
	struct rpcrdma_ep *ep =
		container_of(work, struct rpcrdma_ep, rep_connect_worker.work);
	struct rpcrdma_xprt *r_xprt =
		container_of(ep, struct rpcrdma_xprt, rx_ep);
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;

	spin_lock_bh(&xprt->transport_lock);
	if (++xprt->connect_cookie == 0)	/* maintain a reserved value */
		++xprt->connect_cookie;
	if (ep->rep_connected > 0) {
		if (!xprt_test_and_set_connected(xprt))
			xprt_wake_pending_tasks(xprt, 0);
	} else {
		if (xprt_test_and_clear_connected(xprt))
			xprt_wake_pending_tasks(xprt, -ENOTCONN);
	}
	spin_unlock_bh(&xprt->transport_lock);
}
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
/* By convention, backchannel calls arrive via rdma_msg type
 * messages, and never populate the chunk lists. This makes
 * the RPC/RDMA header small and fixed in size, so it is
 * straightforward to check the RPC header's direction field.
 */
static bool
rpcrdma_is_bcall(struct rpcrdma_msg *headerp)
{
	__be32 *p = (__be32 *)headerp;

	if (headerp->rm_type != rdma_msg)
		return false;
	if (headerp->rm_body.rm_chunks[0] != xdr_zero)
		return false;
	if (headerp->rm_body.rm_chunks[1] != xdr_zero)
		return false;
	if (headerp->rm_body.rm_chunks[2] != xdr_zero)
		return false;

	/* sanity */
	if (p[7] != headerp->rm_xid)
		return false;
	/* call direction */
	if (p[8] != cpu_to_be32(RPC_CALL))
		return false;

	return true;
}
#endif	/* CONFIG_SUNRPC_BACKCHANNEL */
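/* For reference: in the fixed-size header tested above, words p[0]
 * through p[3] carry the RPC/RDMA xid, version, credits and message
 * type, and p[4] through p[6] are the three empty chunk lists, so
 * p[7] is the embedded RPC message's xid and p[8] is its direction
 * word (RPC_CALL).
 */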
/*
 * This function is called when an async event is posted to
 * the connection which changes the connection state. All it
 * does at this point is mark the connection up/down, the rpc
 * timers do the rest.
 */
void
rpcrdma_conn_func(struct rpcrdma_ep *ep)
{
	schedule_delayed_work(&ep->rep_connect_worker, 0);
}
/* Process received RPC/RDMA messages.
 *
 * Errors must result in the RPC task either being awakened, or
 * allowed to timeout, to discover the errors at that time.
 */
void
rpcrdma_reply_handler(struct rpcrdma_rep *rep)
{
	struct rpcrdma_msg *headerp;
	struct rpcrdma_req *req;
	struct rpc_rqst *rqst;
	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	__be32 *iptr;
	int rdmalen, status, rmerr;
	unsigned long cwnd;

	dprintk("RPC: %s: incoming rep %p\n", __func__, rep);

	if (rep->rr_len == RPCRDMA_BAD_LEN)
		goto out_badstatus;
	if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
		goto out_shortreply;

	headerp = rdmab_to_msg(rep->rr_rdmabuf);
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
	if (rpcrdma_is_bcall(headerp))
		goto out_bcall;
#endif

	/* Match incoming rpcrdma_rep to an rpcrdma_req to
	 * get context for handling any incoming chunks.
	 */
	spin_lock_bh(&xprt->transport_lock);
	rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
	if (!rqst)
		goto out_nomatch;

	req = rpcr_to_rdmar(rqst);
	if (req->rl_reply)
		goto out_duplicate;

	/* Sanity checking has passed. We are now committed
	 * to complete this transaction.
	 */
	list_del_init(&rqst->rq_list);
	spin_unlock_bh(&xprt->transport_lock);
	dprintk("RPC: %s: reply %p completes request %p (xid 0x%08x)\n",
		__func__, rep, req, be32_to_cpu(headerp->rm_xid));

	/* from here on, the reply is no longer an orphan */
	req->rl_reply = rep;
	xprt->reestablish_timeout = 0;

	if (headerp->rm_vers != rpcrdma_version)
		goto out_badversion;

	/* check for expected message types */
	/* The order of some of these tests is important. */
	switch (headerp->rm_type) {
	case rdma_msg:
		/* never expect read chunks */
		/* never expect reply chunks (two ways to check) */
		/* never expect write chunks without having offered RDMA */
		if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
		    (headerp->rm_body.rm_chunks[1] == xdr_zero &&
		     headerp->rm_body.rm_chunks[2] != xdr_zero) ||
		    (headerp->rm_body.rm_chunks[1] != xdr_zero &&
		     req->rl_nchunks == 0))
			goto badheader;
		if (headerp->rm_body.rm_chunks[1] != xdr_zero) {
			/* count any expected write chunks in read reply */
			/* start at write chunk array count */
			iptr = &headerp->rm_body.rm_chunks[2];
			rdmalen = rpcrdma_count_chunks(rep,
						req->rl_nchunks, 1, &iptr);
			/* check for validity, and no reply chunk after */
			if (rdmalen < 0 || *iptr++ != xdr_zero)
				goto badheader;
			rep->rr_len -=
			    ((unsigned char *)iptr - (unsigned char *)headerp);
			status = rep->rr_len + rdmalen;
			r_xprt->rx_stats.total_rdma_reply += rdmalen;
			/* special case - last chunk may omit padding */
			if (rdmalen &= 3) {
				rdmalen = 4 - rdmalen;
				status += rdmalen;
			}
		} else {
			/* else ordinary inline */
			rdmalen = 0;
			iptr = (__be32 *)((unsigned char *)headerp +
							RPCRDMA_HDRLEN_MIN);
			rep->rr_len -= RPCRDMA_HDRLEN_MIN;
			status = rep->rr_len;
		}
		/* Fix up the rpc results for upper layer */
		rpcrdma_inline_fixup(rqst, (char *)iptr, rep->rr_len, rdmalen);
		break;

	case rdma_nomsg:
		/* never expect read or write chunks, always reply chunks */
		if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
		    headerp->rm_body.rm_chunks[1] != xdr_zero ||
		    headerp->rm_body.rm_chunks[2] != xdr_one ||
		    req->rl_nchunks == 0)
			goto badheader;
		iptr = (__be32 *)((unsigned char *)headerp +
							RPCRDMA_HDRLEN_MIN);
		rdmalen = rpcrdma_count_chunks(rep, req->rl_nchunks, 0, &iptr);
		if (rdmalen < 0)
			goto badheader;
		r_xprt->rx_stats.total_rdma_reply += rdmalen;
		/* Reply chunk buffer already is the reply vector - no fixup. */
		status = rdmalen;
		break;

	case rdma_error:
		goto out_rdmaerr;

badheader:
	default:
		dprintk("%s: invalid rpcrdma reply header (type %d):"
				" chunks[012] == %d %d %d"
				" expected chunks <= %d\n",
				__func__, be32_to_cpu(headerp->rm_type),
				headerp->rm_body.rm_chunks[0],
				headerp->rm_body.rm_chunks[1],
				headerp->rm_body.rm_chunks[2],
				req->rl_nchunks);
		status = -EIO;
		r_xprt->rx_stats.bad_reply_count++;
		break;
	}

out:
	/* Invalidate and flush the data payloads before waking the
	 * waiting application. This guarantees the memory region is
	 * properly fenced from the server before the application
	 * accesses the data. It also ensures proper send flow
	 * control: waking the next RPC waits until this RPC has
	 * relinquished all its Send Queue entries.
	 */
	if (req->rl_nchunks)
		r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, req);

	spin_lock_bh(&xprt->transport_lock);
	cwnd = xprt->cwnd;
	xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
	if (xprt->cwnd > cwnd)
		xprt_release_rqst_cong(rqst->rq_task);

	xprt_complete_rqst(rqst->rq_task, status);
	spin_unlock_bh(&xprt->transport_lock);
	dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
		__func__, xprt, rqst, status);
	return;

out_badstatus:
	rpcrdma_recv_buffer_put(rep);
	if (r_xprt->rx_ep.rep_connected == 1) {
		r_xprt->rx_ep.rep_connected = -EIO;
		rpcrdma_conn_func(&r_xprt->rx_ep);
	}
	return;

#if defined(CONFIG_SUNRPC_BACKCHANNEL)
out_bcall:
	rpcrdma_bc_receive_call(r_xprt, rep);
	return;
#endif

/* If the incoming reply terminated a pending RPC, the next
 * RPC call will post a replacement receive buffer as it is
 * being marshaled.
 */
out_badversion:
	dprintk("RPC: %s: invalid version %d\n",
		__func__, be32_to_cpu(headerp->rm_vers));
	status = -EIO;
	r_xprt->rx_stats.bad_reply_count++;
	goto out;

out_rdmaerr:
	rmerr = be32_to_cpu(headerp->rm_body.rm_error.rm_err);
	switch (rmerr) {
	case ERR_VERS:
		pr_err("%s: server reports header version error (%u-%u)\n",
		       __func__,
		       be32_to_cpu(headerp->rm_body.rm_error.rm_vers_low),
		       be32_to_cpu(headerp->rm_body.rm_error.rm_vers_high));
		break;
	case ERR_CHUNK:
		pr_err("%s: server reports header decoding error\n",
		       __func__);
		break;
	default:
		pr_err("%s: server reports unknown error %d\n",
		       __func__, rmerr);
	}
	status = -EREMOTEIO;
	r_xprt->rx_stats.bad_reply_count++;
	goto out;

/* If no pending RPC transaction was matched, post a replacement
 * receive buffer before returning.
 */
out_shortreply:
	dprintk("RPC: %s: short/invalid reply\n", __func__);
	goto repost;

out_nomatch:
	spin_unlock_bh(&xprt->transport_lock);
	dprintk("RPC: %s: no match for incoming xid 0x%08x len %d\n",
		__func__, be32_to_cpu(headerp->rm_xid),
		rep->rr_len);
	goto repost;

out_duplicate:
	spin_unlock_bh(&xprt->transport_lock);
	dprintk("RPC: %s: "
		"duplicate reply %p to RPC request %p: xid 0x%08x\n",
		__func__, rep, req, be32_to_cpu(headerp->rm_xid));

repost:
	r_xprt->rx_stats.bad_reply_count++;
	if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
		rpcrdma_recv_buffer_put(rep);
}