c0be1ecd11c99ce57360687fa2b34f312c0cf0d6
[deliverable/linux.git] / rds / recv.c
1 /*
2 * Copyright (c) 2006 Oracle. All rights reserved.
3 *
4 * This software is available to you under a choice of one of two
5 * licenses. You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the
8 * OpenIB.org BSD license below:
9 *
10 * Redistribution and use in source and binary forms, with or
11 * without modification, are permitted provided that the following
12 * conditions are met:
13 *
14 * - Redistributions of source code must retain the above
15 * copyright notice, this list of conditions and the following
16 * disclaimer.
17 *
18 * - Redistributions in binary form must reproduce the above
19 * copyright notice, this list of conditions and the following
20 * disclaimer in the documentation and/or other materials
21 * provided with the distribution.
22 *
23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30 * SOFTWARE.
31 *
32 */
33 #include <linux/kernel.h>
34 #include <linux/slab.h>
35 #include <net/sock.h>
36 #include <linux/in.h>
37 #include <linux/export.h>
38 #include <linux/time.h>
39 #include <linux/rds.h>
40
41 #include "rds.h"
42
43 void rds_inc_init(struct rds_incoming *inc, struct rds_connection *conn,
44 __be32 saddr)
45 {
46 atomic_set(&inc->i_refcount, 1);
47 INIT_LIST_HEAD(&inc->i_item);
48 inc->i_conn = conn;
49 inc->i_saddr = saddr;
50 inc->i_rdma_cookie = 0;
51 inc->i_rx_tstamp.tv_sec = 0;
52 inc->i_rx_tstamp.tv_usec = 0;
53 }
54 EXPORT_SYMBOL_GPL(rds_inc_init);
55
56 static void rds_inc_addref(struct rds_incoming *inc)
57 {
58 rdsdebug("addref inc %p ref %d\n", inc, atomic_read(&inc->i_refcount));
59 atomic_inc(&inc->i_refcount);
60 }
61
62 void rds_inc_put(struct rds_incoming *inc)
63 {
64 rdsdebug("put inc %p ref %d\n", inc, atomic_read(&inc->i_refcount));
65 if (atomic_dec_and_test(&inc->i_refcount)) {
66 BUG_ON(!list_empty(&inc->i_item));
67
68 inc->i_conn->c_trans->inc_free(inc);
69 }
70 }
71 EXPORT_SYMBOL_GPL(rds_inc_put);
72
73 static void rds_recv_rcvbuf_delta(struct rds_sock *rs, struct sock *sk,
74 struct rds_cong_map *map,
75 int delta, __be16 port)
76 {
77 int now_congested;
78
79 if (delta == 0)
80 return;
81
82 rs->rs_rcv_bytes += delta;
83 now_congested = rs->rs_rcv_bytes > rds_sk_rcvbuf(rs);
84
85 rdsdebug("rs %p (%pI4:%u) recv bytes %d buf %d "
86 "now_cong %d delta %d\n",
87 rs, &rs->rs_bound_addr,
88 ntohs(rs->rs_bound_port), rs->rs_rcv_bytes,
89 rds_sk_rcvbuf(rs), now_congested, delta);
90
91 /* wasn't -> am congested */
92 if (!rs->rs_congested && now_congested) {
93 rs->rs_congested = 1;
94 rds_cong_set_bit(map, port);
95 rds_cong_queue_updates(map);
96 }
97 /* was -> aren't congested */
98 /* Require more free space before reporting uncongested to prevent
99 bouncing cong/uncong state too often */
100 else if (rs->rs_congested && (rs->rs_rcv_bytes < (rds_sk_rcvbuf(rs)/2))) {
101 rs->rs_congested = 0;
102 rds_cong_clear_bit(map, port);
103 rds_cong_queue_updates(map);
104 }
105
106 /* do nothing if no change in cong state */
107 }
108
109 /*
110 * Process all extension headers that come with this message.
111 */
112 static void rds_recv_incoming_exthdrs(struct rds_incoming *inc, struct rds_sock *rs)
113 {
114 struct rds_header *hdr = &inc->i_hdr;
115 unsigned int pos = 0, type, len;
116 union {
117 struct rds_ext_header_version version;
118 struct rds_ext_header_rdma rdma;
119 struct rds_ext_header_rdma_dest rdma_dest;
120 } buffer;
121
122 while (1) {
123 len = sizeof(buffer);
124 type = rds_message_next_extension(hdr, &pos, &buffer, &len);
125 if (type == RDS_EXTHDR_NONE)
126 break;
127 /* Process extension header here */
128 switch (type) {
129 case RDS_EXTHDR_RDMA:
130 rds_rdma_unuse(rs, be32_to_cpu(buffer.rdma.h_rdma_rkey), 0);
131 break;
132
133 case RDS_EXTHDR_RDMA_DEST:
134 /* We ignore the size for now. We could stash it
135 * somewhere and use it for error checking. */
136 inc->i_rdma_cookie = rds_rdma_make_cookie(
137 be32_to_cpu(buffer.rdma_dest.h_rdma_rkey),
138 be32_to_cpu(buffer.rdma_dest.h_rdma_offset));
139
140 break;
141 }
142 }
143 }
144
145 /*
146 * The transport must make sure that this is serialized against other
147 * rx and conn reset on this specific conn.
148 *
149 * We currently assert that only one fragmented message will be sent
150 * down a connection at a time. This lets us reassemble in the conn
151 * instead of per-flow which means that we don't have to go digging through
152 * flows to tear down partial reassembly progress on conn failure and
153 * we save flow lookup and locking for each frag arrival. It does mean
154 * that small messages will wait behind large ones. Fragmenting at all
155 * is only to reduce the memory consumption of pre-posted buffers.
156 *
157 * The caller passes in saddr and daddr instead of us getting it from the
158 * conn. This lets loopback, who only has one conn for both directions,
159 * tell us which roles the addrs in the conn are playing for this message.
160 */
161 void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr,
162 struct rds_incoming *inc, gfp_t gfp)
163 {
164 struct rds_sock *rs = NULL;
165 struct sock *sk;
166 unsigned long flags;
167
168 inc->i_conn = conn;
169 inc->i_rx_jiffies = jiffies;
170
171 rdsdebug("conn %p next %llu inc %p seq %llu len %u sport %u dport %u "
172 "flags 0x%x rx_jiffies %lu\n", conn,
173 (unsigned long long)conn->c_next_rx_seq,
174 inc,
175 (unsigned long long)be64_to_cpu(inc->i_hdr.h_sequence),
176 be32_to_cpu(inc->i_hdr.h_len),
177 be16_to_cpu(inc->i_hdr.h_sport),
178 be16_to_cpu(inc->i_hdr.h_dport),
179 inc->i_hdr.h_flags,
180 inc->i_rx_jiffies);
181
182 /*
183 * Sequence numbers should only increase. Messages get their
184 * sequence number as they're queued in a sending conn. They
185 * can be dropped, though, if the sending socket is closed before
186 * they hit the wire. So sequence numbers can skip forward
187 * under normal operation. They can also drop back in the conn
188 * failover case as previously sent messages are resent down the
189 * new instance of a conn. We drop those, otherwise we have
190 * to assume that the next valid seq does not come after a
191 * hole in the fragment stream.
192 *
193 * The headers don't give us a way to realize if fragments of
194 * a message have been dropped. We assume that frags that arrive
195 * to a flow are part of the current message on the flow that is
196 * being reassembled. This means that senders can't drop messages
197 * from the sending conn until all their frags are sent.
198 *
199 * XXX we could spend more on the wire to get more robust failure
200 * detection, arguably worth it to avoid data corruption.
201 */
202 if (be64_to_cpu(inc->i_hdr.h_sequence) < conn->c_next_rx_seq &&
203 (inc->i_hdr.h_flags & RDS_FLAG_RETRANSMITTED)) {
204 rds_stats_inc(s_recv_drop_old_seq);
205 goto out;
206 }
207 conn->c_next_rx_seq = be64_to_cpu(inc->i_hdr.h_sequence) + 1;
208
209 if (rds_sysctl_ping_enable && inc->i_hdr.h_dport == 0) {
210 rds_stats_inc(s_recv_ping);
211 rds_send_pong(conn, inc->i_hdr.h_sport);
212 goto out;
213 }
214
215 rs = rds_find_bound(daddr, inc->i_hdr.h_dport);
216 if (!rs) {
217 rds_stats_inc(s_recv_drop_no_sock);
218 goto out;
219 }
220
221 /* Process extension headers */
222 rds_recv_incoming_exthdrs(inc, rs);
223
224 /* We can be racing with rds_release() which marks the socket dead. */
225 sk = rds_rs_to_sk(rs);
226
227 /* serialize with rds_release -> sock_orphan */
228 write_lock_irqsave(&rs->rs_recv_lock, flags);
229 if (!sock_flag(sk, SOCK_DEAD)) {
230 rdsdebug("adding inc %p to rs %p's recv queue\n", inc, rs);
231 rds_stats_inc(s_recv_queued);
232 rds_recv_rcvbuf_delta(rs, sk, inc->i_conn->c_lcong,
233 be32_to_cpu(inc->i_hdr.h_len),
234 inc->i_hdr.h_dport);
235 if (sock_flag(sk, SOCK_RCVTSTAMP))
236 do_gettimeofday(&inc->i_rx_tstamp);
237 rds_inc_addref(inc);
238 list_add_tail(&inc->i_item, &rs->rs_recv_queue);
239 __rds_wake_sk_sleep(sk);
240 } else {
241 rds_stats_inc(s_recv_drop_dead_sock);
242 }
243 write_unlock_irqrestore(&rs->rs_recv_lock, flags);
244
245 out:
246 if (rs)
247 rds_sock_put(rs);
248 }
249 EXPORT_SYMBOL_GPL(rds_recv_incoming);
250
251 /*
252 * be very careful here. This is being called as the condition in
253 * wait_event_*() needs to cope with being called many times.
254 */
255 static int rds_next_incoming(struct rds_sock *rs, struct rds_incoming **inc)
256 {
257 unsigned long flags;
258
259 if (!*inc) {
260 read_lock_irqsave(&rs->rs_recv_lock, flags);
261 if (!list_empty(&rs->rs_recv_queue)) {
262 *inc = list_entry(rs->rs_recv_queue.next,
263 struct rds_incoming,
264 i_item);
265 rds_inc_addref(*inc);
266 }
267 read_unlock_irqrestore(&rs->rs_recv_lock, flags);
268 }
269
270 return *inc != NULL;
271 }
272
273 static int rds_still_queued(struct rds_sock *rs, struct rds_incoming *inc,
274 int drop)
275 {
276 struct sock *sk = rds_rs_to_sk(rs);
277 int ret = 0;
278 unsigned long flags;
279
280 write_lock_irqsave(&rs->rs_recv_lock, flags);
281 if (!list_empty(&inc->i_item)) {
282 ret = 1;
283 if (drop) {
284 /* XXX make sure this i_conn is reliable */
285 rds_recv_rcvbuf_delta(rs, sk, inc->i_conn->c_lcong,
286 -be32_to_cpu(inc->i_hdr.h_len),
287 inc->i_hdr.h_dport);
288 list_del_init(&inc->i_item);
289 rds_inc_put(inc);
290 }
291 }
292 write_unlock_irqrestore(&rs->rs_recv_lock, flags);
293
294 rdsdebug("inc %p rs %p still %d dropped %d\n", inc, rs, ret, drop);
295 return ret;
296 }
297
298 /*
299 * Pull errors off the error queue.
300 * If msghdr is NULL, we will just purge the error queue.
301 */
302 int rds_notify_queue_get(struct rds_sock *rs, struct msghdr *msghdr)
303 {
304 struct rds_notifier *notifier;
305 struct rds_rdma_notify cmsg = { 0 }; /* fill holes with zero */
306 unsigned int count = 0, max_messages = ~0U;
307 unsigned long flags;
308 LIST_HEAD(copy);
309 int err = 0;
310
311
312 /* put_cmsg copies to user space and thus may sleep. We can't do this
313 * with rs_lock held, so first grab as many notifications as we can stuff
314 * in the user provided cmsg buffer. We don't try to copy more, to avoid
315 * losing notifications - except when the buffer is so small that it wouldn't
316 * even hold a single notification. Then we give him as much of this single
317 * msg as we can squeeze in, and set MSG_CTRUNC.
318 */
319 if (msghdr) {
320 max_messages = msghdr->msg_controllen / CMSG_SPACE(sizeof(cmsg));
321 if (!max_messages)
322 max_messages = 1;
323 }
324
325 spin_lock_irqsave(&rs->rs_lock, flags);
326 while (!list_empty(&rs->rs_notify_queue) && count < max_messages) {
327 notifier = list_entry(rs->rs_notify_queue.next,
328 struct rds_notifier, n_list);
329 list_move(&notifier->n_list, &copy);
330 count++;
331 }
332 spin_unlock_irqrestore(&rs->rs_lock, flags);
333
334 if (!count)
335 return 0;
336
337 while (!list_empty(&copy)) {
338 notifier = list_entry(copy.next, struct rds_notifier, n_list);
339
340 if (msghdr) {
341 cmsg.user_token = notifier->n_user_token;
342 cmsg.status = notifier->n_status;
343
344 err = put_cmsg(msghdr, SOL_RDS, RDS_CMSG_RDMA_STATUS,
345 sizeof(cmsg), &cmsg);
346 if (err)
347 break;
348 }
349
350 list_del_init(&notifier->n_list);
351 kfree(notifier);
352 }
353
354 /* If we bailed out because of an error in put_cmsg,
355 * we may be left with one or more notifications that we
356 * didn't process. Return them to the head of the list. */
357 if (!list_empty(&copy)) {
358 spin_lock_irqsave(&rs->rs_lock, flags);
359 list_splice(&copy, &rs->rs_notify_queue);
360 spin_unlock_irqrestore(&rs->rs_lock, flags);
361 }
362
363 return err;
364 }
365
366 /*
367 * Queue a congestion notification
368 */
369 static int rds_notify_cong(struct rds_sock *rs, struct msghdr *msghdr)
370 {
371 uint64_t notify = rs->rs_cong_notify;
372 unsigned long flags;
373 int err;
374
375 err = put_cmsg(msghdr, SOL_RDS, RDS_CMSG_CONG_UPDATE,
376 sizeof(notify), &notify);
377 if (err)
378 return err;
379
380 spin_lock_irqsave(&rs->rs_lock, flags);
381 rs->rs_cong_notify &= ~notify;
382 spin_unlock_irqrestore(&rs->rs_lock, flags);
383
384 return 0;
385 }
386
387 /*
388 * Receive any control messages.
389 */
390 static int rds_cmsg_recv(struct rds_incoming *inc, struct msghdr *msg,
391 struct rds_sock *rs)
392 {
393 int ret = 0;
394
395 if (inc->i_rdma_cookie) {
396 ret = put_cmsg(msg, SOL_RDS, RDS_CMSG_RDMA_DEST,
397 sizeof(inc->i_rdma_cookie), &inc->i_rdma_cookie);
398 if (ret)
399 return ret;
400 }
401
402 if ((inc->i_rx_tstamp.tv_sec != 0) &&
403 sock_flag(rds_rs_to_sk(rs), SOCK_RCVTSTAMP)) {
404 ret = put_cmsg(msg, SOL_SOCKET, SCM_TIMESTAMP,
405 sizeof(struct timeval),
406 &inc->i_rx_tstamp);
407 if (ret)
408 return ret;
409 }
410
411 return 0;
412 }
413
414 int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
415 int msg_flags)
416 {
417 struct sock *sk = sock->sk;
418 struct rds_sock *rs = rds_sk_to_rs(sk);
419 long timeo;
420 int ret = 0, nonblock = msg_flags & MSG_DONTWAIT;
421 DECLARE_SOCKADDR(struct sockaddr_in *, sin, msg->msg_name);
422 struct rds_incoming *inc = NULL;
423
424 /* udp_recvmsg()->sock_recvtimeo() gets away without locking too.. */
425 timeo = sock_rcvtimeo(sk, nonblock);
426
427 rdsdebug("size %zu flags 0x%x timeo %ld\n", size, msg_flags, timeo);
428
429 if (msg_flags & MSG_OOB)
430 goto out;
431
432 while (1) {
433 struct iov_iter save;
434 /* If there are pending notifications, do those - and nothing else */
435 if (!list_empty(&rs->rs_notify_queue)) {
436 ret = rds_notify_queue_get(rs, msg);
437 break;
438 }
439
440 if (rs->rs_cong_notify) {
441 ret = rds_notify_cong(rs, msg);
442 break;
443 }
444
445 if (!rds_next_incoming(rs, &inc)) {
446 if (nonblock) {
447 ret = -EAGAIN;
448 break;
449 }
450
451 timeo = wait_event_interruptible_timeout(*sk_sleep(sk),
452 (!list_empty(&rs->rs_notify_queue) ||
453 rs->rs_cong_notify ||
454 rds_next_incoming(rs, &inc)), timeo);
455 rdsdebug("recvmsg woke inc %p timeo %ld\n", inc,
456 timeo);
457 if (timeo > 0 || timeo == MAX_SCHEDULE_TIMEOUT)
458 continue;
459
460 ret = timeo;
461 if (ret == 0)
462 ret = -ETIMEDOUT;
463 break;
464 }
465
466 rdsdebug("copying inc %p from %pI4:%u to user\n", inc,
467 &inc->i_conn->c_faddr,
468 ntohs(inc->i_hdr.h_sport));
469 save = msg->msg_iter;
470 ret = inc->i_conn->c_trans->inc_copy_to_user(inc, &msg->msg_iter);
471 if (ret < 0)
472 break;
473
474 /*
475 * if the message we just copied isn't at the head of the
476 * recv queue then someone else raced us to return it, try
477 * to get the next message.
478 */
479 if (!rds_still_queued(rs, inc, !(msg_flags & MSG_PEEK))) {
480 rds_inc_put(inc);
481 inc = NULL;
482 rds_stats_inc(s_recv_deliver_raced);
483 msg->msg_iter = save;
484 continue;
485 }
486
487 if (ret < be32_to_cpu(inc->i_hdr.h_len)) {
488 if (msg_flags & MSG_TRUNC)
489 ret = be32_to_cpu(inc->i_hdr.h_len);
490 msg->msg_flags |= MSG_TRUNC;
491 }
492
493 if (rds_cmsg_recv(inc, msg, rs)) {
494 ret = -EFAULT;
495 goto out;
496 }
497
498 rds_stats_inc(s_recv_delivered);
499
500 if (sin) {
501 sin->sin_family = AF_INET;
502 sin->sin_port = inc->i_hdr.h_sport;
503 sin->sin_addr.s_addr = inc->i_saddr;
504 memset(sin->sin_zero, 0, sizeof(sin->sin_zero));
505 msg->msg_namelen = sizeof(*sin);
506 }
507 break;
508 }
509
510 if (inc)
511 rds_inc_put(inc);
512
513 out:
514 return ret;
515 }
516
517 /*
518 * The socket is being shut down and we're asked to drop messages that were
519 * queued for recvmsg. The caller has unbound the socket so the receive path
520 * won't queue any more incoming fragments or messages on the socket.
521 */
522 void rds_clear_recv_queue(struct rds_sock *rs)
523 {
524 struct sock *sk = rds_rs_to_sk(rs);
525 struct rds_incoming *inc, *tmp;
526 unsigned long flags;
527
528 write_lock_irqsave(&rs->rs_recv_lock, flags);
529 list_for_each_entry_safe(inc, tmp, &rs->rs_recv_queue, i_item) {
530 rds_recv_rcvbuf_delta(rs, sk, inc->i_conn->c_lcong,
531 -be32_to_cpu(inc->i_hdr.h_len),
532 inc->i_hdr.h_dport);
533 list_del_init(&inc->i_item);
534 rds_inc_put(inc);
535 }
536 write_unlock_irqrestore(&rs->rs_recv_lock, flags);
537 }
538
539 /*
540 * inc->i_saddr isn't used here because it is only set in the receive
541 * path.
542 */
543 void rds_inc_info_copy(struct rds_incoming *inc,
544 struct rds_info_iterator *iter,
545 __be32 saddr, __be32 daddr, int flip)
546 {
547 struct rds_info_message minfo;
548
549 minfo.seq = be64_to_cpu(inc->i_hdr.h_sequence);
550 minfo.len = be32_to_cpu(inc->i_hdr.h_len);
551
552 if (flip) {
553 minfo.laddr = daddr;
554 minfo.faddr = saddr;
555 minfo.lport = inc->i_hdr.h_dport;
556 minfo.fport = inc->i_hdr.h_sport;
557 } else {
558 minfo.laddr = saddr;
559 minfo.faddr = daddr;
560 minfo.lport = inc->i_hdr.h_sport;
561 minfo.fport = inc->i_hdr.h_dport;
562 }
563
564 rds_info_copy(iter, &minfo, sizeof(minfo));
565 }
This page took 0.041071 seconds and 4 git commands to generate.